# Files
# X-Agents/core/agents/agent/intelligent_memory.py
#
# 522 lines
# 15 KiB
# Python
# Raw Normal View History

"""Intelligent memory summarization and compression system."""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Any
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
# Default template sent to the LLM; ``{content}`` is replaced with the
# formatted conversation via ``str.format``.
_DEFAULT_SUMMARY_PROMPT = "\n".join(
    [
        "Please summarize the following conversation, preserving key information, "
        "decisions, and important details. Focus on:",
        "- User preferences and requirements",
        "- Important decisions made",
        "- Technical details and configurations",
        "- Any follow-up tasks or action items",
        "",
        "Conversation:",
        "{content}",
        "",
        "Provide a concise summary:",
    ]
)


@dataclass
class SummarizationConfig:
    """Tunable settings shared by the summarizer, compressor, decay and
    evergreen components of the memory system.

    The flush trigger used by the compressor is derived from the three
    token-budget fields as
    ``context_window - reserve_tokens - soft_threshold``.
    """

    # --- Token budget -----------------------------------------------------
    # Total context window of the target model, in tokens.
    context_window: int = 200000
    # Tokens held back for the system prompt.
    reserve_tokens: int = 20000
    # Safety margin: summarization triggers this many tokens before the limit.
    soft_threshold: int = 4000

    # --- Summary ----------------------------------------------------------
    # Token budget of most-recent messages kept verbatim when compressing.
    keep_recent_tokens: int = 20000
    # Prompt template; must contain a ``{content}`` placeholder.
    summary_prompt: str = _DEFAULT_SUMMARY_PROMPT

    # --- Evergreen --------------------------------------------------------
    # Memories at or above this importance are treated as evergreen.
    evergreen_importance_threshold: int = 8

    # --- Decay ------------------------------------------------------------
    # Days without access before importance decay is considered.
    decay_days_no_activity: int = 30
    # Multiplicative importance factor applied per elapsed decay period.
    decay_factor: float = 0.9
class MemorySummarizer:
    """LLM-based memory summarizer.

    Flattens chat messages into a single prompt, asks the configured LLM
    provider for a summary, and offers a rough character-based token
    estimator.
    """

    def __init__(self, llm_provider=None, config: SummarizationConfig | None = None):
        """Initialize memory summarizer.

        Args:
            llm_provider: LLM provider for generating summaries. It is called
                as ``await llm_provider.chat(messages=..., max_tokens=...,
                temperature=...)`` and the result's ``content`` attribute is
                used as the summary.
            config: Summarization configuration; defaults are used when
                omitted.
        """
        self.llm_provider = llm_provider
        self.config = config or SummarizationConfig()

    async def summarize_conversation(
        self,
        messages: list[dict[str, Any]],
    ) -> str | None:
        """Summarize a conversation.

        This is best-effort: any exception raised while building the prompt,
        calling the provider or reading its response is logged and turned
        into ``None``.

        Args:
            messages: List of conversation messages (dicts with ``role`` and
                ``content`` keys).

        Returns:
            Stripped summary text, or ``None`` when no provider is configured,
            there is nothing to summarize, the provider returned no content,
            or the call failed.
        """
        if not self.llm_provider:
            logger.warning("No LLM provider configured for summarization")
            return None
        if not messages:
            return None

        content = self._format_messages(messages)
        if not content:
            # Every message was empty; do not spend an LLM call on nothing.
            return None

        try:
            prompt = self.config.summary_prompt.format(content=content)
            response = await self.llm_provider.chat(
                messages=[{"role": "user", "content": prompt}],
                max_tokens=1024,
                temperature=0.5,
            )
            if response and response.content:
                return response.content.strip()
        except Exception as exc:
            # logger.exception keeps the traceback, which the plain error
            # message alone would lose.
            logger.exception("Summarization failed: %s", exc)
        return None

    def _format_messages(
        self,
        messages: list[dict[str, Any]],
        max_chars: int = 500,
    ) -> str:
        """Format messages for the summarization prompt.

        Args:
            messages: Conversation messages to flatten.
            max_chars: Maximum number of characters kept per message.

        Returns:
            One ``"role: content"`` line per non-empty message, joined with
            newlines. Messages whose content is missing or empty are skipped.
        """
        lines = []
        for msg in messages:
            content = msg.get("content")
            if not content:
                continue
            role = msg.get("role", "unknown")
            # Non-string content is rendered as text before truncating, so
            # the cap always counts characters.
            text = content if isinstance(content, str) else str(content)
            lines.append(f"{role}: {text[:max_chars]}")
        return "\n".join(lines)

    def estimate_tokens(self, text: str) -> int:
        """Estimate token count (rough approximation).

        Args:
            text: Text to estimate. ``None`` or empty input counts as zero
                tokens; non-string input is measured on its ``str()`` form.

        Returns:
            Estimated token count, assuming roughly 4 characters per token.
        """
        if not text:
            return 0
        if not isinstance(text, str):
            text = str(text)
        return len(text) // 4
class ContextCompressor:
    """Context compression manager for agent memory.

    When the running token count reaches the flush threshold, the
    conversation is split into an older prefix, which is summarized, and a
    recent suffix, which is kept verbatim. Both halves come from the same
    split point, so every message lands in exactly one of them.
    """

    def __init__(
        self,
        summarizer: MemorySummarizer,
        config: SummarizationConfig | None = None,
    ):
        """Initialize context compressor.

        Args:
            summarizer: Memory summarizer; its ``estimate_tokens`` and
                ``summarize_conversation`` methods are used.
            config: Summarization configuration; defaults are used when
                omitted.
        """
        self.summarizer = summarizer
        self.config = config or SummarizationConfig()
        self._compaction_count = 0

    @property
    def flush_trigger_tokens(self) -> int:
        """Calculate token threshold for triggering memory flush."""
        return (
            self.config.context_window
            - self.config.reserve_tokens
            - self.config.soft_threshold
        )

    def should_flush(self, current_tokens: int) -> bool:
        """Check if memory flush should be triggered.

        Args:
            current_tokens: Current token count.

        Returns:
            True if ``current_tokens`` has reached ``flush_trigger_tokens``.
        """
        return current_tokens >= self.flush_trigger_tokens

    async def compress_context(
        self,
        messages: list[dict[str, Any]],
        current_tokens: int,
    ) -> tuple[list[dict[str, Any]], str | None]:
        """Compress context when approaching token limit.

        Args:
            messages: Current conversation messages, oldest first.
            current_tokens: Current token count.

        Returns:
            Tuple of (compressed messages, summary). Below the flush
            threshold the input list is returned unchanged with ``None``.
            Otherwise the recent messages are returned, preceded by a system
            message carrying the summary when one was produced.
        """
        if not self.should_flush(current_tokens):
            return messages, None

        self._compaction_count += 1
        logger.info(
            "Triggering context compression (count: %s)", self._compaction_count
        )

        budget = self.config.keep_recent_tokens
        recent_messages = self._keep_recent_messages(messages, budget)
        older_messages = self._get_older_messages(messages, budget)
        if not older_messages:
            return recent_messages, None

        summary = await self.summarizer.summarize_conversation(older_messages)

        compressed = list(recent_messages)
        if summary:
            # Add summary as a system message ahead of the kept messages.
            compressed.insert(0, {
                "role": "system",
                "content": f"[Previous conversation summary]\n{summary}",
            })
            logger.info(f"Context compressed: {len(older_messages)} messages summarized")
        else:
            # The older messages are still removed to stay within the window;
            # make the loss visible instead of silent.
            logger.warning(
                "No summary produced; %d older messages dropped without a summary",
                len(older_messages),
            )
        return compressed, summary

    def _message_tokens(self, msg: dict[str, Any]) -> int:
        """Estimate the tokens of one message, tolerating missing content."""
        # ``get("content", "")`` still yields None when the key is present
        # with a None value, so normalise explicitly.
        content = msg.get("content") or ""
        if not isinstance(content, str):
            content = str(content)
        return self.summarizer.estimate_tokens(content)

    def _split_index(self, messages: list[dict[str, Any]], keep_tokens: int) -> int:
        """Return the index of the first message that is kept verbatim.

        Walks from newest to oldest, accumulating estimated tokens, and stops
        at the first message that would push the total above ``keep_tokens``.
        """
        kept_tokens = 0
        kept_count = 0
        for msg in reversed(messages):
            tokens = self._message_tokens(msg)
            if kept_tokens + tokens > keep_tokens:
                break
            kept_tokens += tokens
            kept_count += 1
        return len(messages) - kept_count

    def _keep_recent_messages(
        self,
        messages: list[dict[str, Any]],
        max_tokens: int,
    ) -> list[dict[str, Any]]:
        """Keep the newest messages that fit within ``max_tokens``.

        Returns a new list in the original (oldest-first) order.
        """
        return messages[self._split_index(messages, max_tokens):]

    def _get_older_messages(
        self,
        messages: list[dict[str, Any]],
        keep_tokens: int,
    ) -> list[dict[str, Any]]:
        """Get older messages that should be summarized.

        These are exactly the messages that ``_keep_recent_messages`` does
        not keep for the same budget, in oldest-first order.
        """
        return messages[:self._split_index(messages, keep_tokens)]

    def get_compaction_count(self) -> int:
        """Get number of compactions performed."""
        return self._compaction_count
class MemoryDecayManager:
    """Memory importance decay manager.

    Importance is left untouched while a memory has been accessed within
    ``decay_days_no_activity`` days. After that it is multiplied by
    ``decay_factor`` once per further full period of
    ``decay_days_no_activity`` days, and never drops below 1.
    """

    def __init__(self, config: SummarizationConfig | None = None):
        """Initialize decay manager.

        Args:
            config: Summarization configuration; defaults are used when
                omitted.
        """
        self.config = config or SummarizationConfig()

    @staticmethod
    def _days_since(last_accessed: datetime) -> int:
        """Return the whole days elapsed since ``last_accessed``."""
        # "Now" is built with the same tzinfo as the input: subtracting an
        # aware datetime from a naive one raises TypeError. For naive input
        # tzinfo is None and this is plain ``datetime.now()``.
        return (datetime.now(tz=last_accessed.tzinfo) - last_accessed).days

    def calculate_decay(
        self,
        importance: int,
        last_accessed: datetime,
        is_evergreen: bool = False,
    ) -> int:
        """Calculate decayed importance.

        Args:
            importance: Original importance (1-10).
            last_accessed: Last access timestamp, naive or timezone-aware.
            is_evergreen: Whether memory is marked as evergreen.

        Returns:
            ``importance`` unchanged for evergreen or recently accessed
            memories, otherwise the decayed importance (at least 1).
        """
        if is_evergreen:
            return importance

        period_days = self.config.decay_days_no_activity
        days_since = self._days_since(last_accessed)
        if days_since < period_days:
            return importance

        # Full periods elapsed beyond the initial inactivity window; this is
        # 0 until a second full period has passed.
        decay_periods = (days_since - period_days) // period_days
        decayed = int(importance * self.config.decay_factor ** decay_periods)

        # Ensure minimum importance of 1.
        return max(1, decayed)

    def should_archive(
        self,
        importance: int,
        last_accessed: datetime,
        is_evergreen: bool = False,
    ) -> bool:
        """Check if memory should be archived.

        Args:
            importance: Current importance.
            last_accessed: Last access timestamp, naive or timezone-aware.
            is_evergreen: Whether memory is marked as evergreen; evergreen
                memories are never archived.

        Returns:
            True if the decayed importance is 1 and the memory has not been
            accessed for more than three inactivity periods.
        """
        if is_evergreen:
            return False
        decayed = self.calculate_decay(importance, last_accessed)
        days_since = self._days_since(last_accessed)
        return decayed == 1 and days_since > self.config.decay_days_no_activity * 3
class EvergreenManager:
    """Decides which memories are evergreen (persistent) and renders them."""

    def __init__(self, config: SummarizationConfig | None = None):
        """Set up the manager.

        Args:
            config: Summarization configuration; a default instance is
                created when none is given.
        """
        self.config = config or SummarizationConfig()

    def should_mark_evergreen(
        self,
        importance: int,
        memory_type: str,
        content: str,
    ) -> bool:
        """Tell whether a memory qualifies as evergreen.

        A memory qualifies when its importance reaches the configured
        threshold, when its type is one of the persistent types, or when its
        lower-cased content contains one of the marker phrases.

        Args:
            importance: Importance score.
            memory_type: Type of memory.
            content: Memory content.

        Returns:
            True if the memory should be evergreen.
        """
        reaches_threshold = importance >= self.config.evergreen_importance_threshold
        if reaches_threshold or memory_type in {"preference", "identity", "configuration"}:
            return True

        # Fall back to a case-insensitive substring scan of the content.
        lowered = content.lower()
        markers = (
            "always", "never", "permanent", "fixed",
            "my name is", "i am", "preference",
        )
        return any(marker in lowered for marker in markers)

    def format_evergreen_prompt(self, memories: list[dict[str, Any]]) -> str:
        """Render evergreen memories as a block for the system prompt.

        Args:
            memories: List of evergreen memories.

        Returns:
            An empty string for no memories; otherwise a header line followed
            by one ``- [type] content`` bullet per memory.
        """
        if not memories:
            return ""
        bullets = [
            f"- [{entry.get('memory_type', 'general')}] {entry.get('content', '')}"
            for entry in memories
        ]
        return "\n".join(["[Evergreen Memories]", *bullets])
class IntelligentMemorySystem:
    """Complete intelligent memory management system.

    Wires together the summarizer, context compressor, decay manager and
    evergreen manager around one shared configuration.
    """

    def __init__(
        self,
        llm_provider=None,
        config: SummarizationConfig | None = None,
    ):
        """Initialize intelligent memory system.

        Args:
            llm_provider: LLM provider for summarization.
            config: System configuration; defaults are used when omitted.
        """
        self.config = config or SummarizationConfig()

        # All components share the same configuration object.
        self.summarizer = MemorySummarizer(llm_provider, self.config)
        self.compressor = ContextCompressor(self.summarizer, self.config)
        self.decay_manager = MemoryDecayManager(self.config)
        self.evergreen_manager = EvergreenManager(self.config)

    async def process_message(
        self,
        messages: list[dict[str, Any]],
        current_tokens: int,
        agent_id: str,
        user_id: str = "default",
    ) -> tuple[list[dict[str, Any]], dict[str, Any] | None]:
        """Process incoming message with intelligent memory management.

        Args:
            messages: Current conversation messages.
            current_tokens: Current token count.
            agent_id: Agent ID.
            user_id: User ID.

        Returns:
            Tuple of (processed messages, memory to save). The second item is
            a ``summary`` memory dict when compression produced a summary,
            otherwise ``None``.
        """
        processed_messages, summary = await self.compressor.compress_context(
            messages,
            current_tokens,
        )
        if not summary:
            return processed_messages, None

        memory_to_save = {
            "content": f"[Conversation Summary]\n{summary}",
            "agent_id": agent_id,
            "user_id": user_id,
            "memory_type": "summary",
            "importance": 5,
        }
        return processed_messages, memory_to_save

    def get_evergreen_context(
        self,
        memories: list[dict[str, Any]],
    ) -> str:
        """Get evergreen memories formatted for context.

        A memory is included when it is flagged ``is_evergreen`` or when the
        evergreen manager classifies it as evergreen.

        Args:
            memories: List of all memories.

        Returns:
            Formatted evergreen context.
        """
        evergreen = [
            m for m in memories
            if m.get("is_evergreen", False)
            or self.evergreen_manager.should_mark_evergreen(
                m.get("importance", 5),
                m.get("memory_type", ""),
                # A key stored with a None value must not reach the text scan.
                m.get("content") or "",
            )
        ]
        return self.evergreen_manager.format_evergreen_prompt(evergreen)

    @staticmethod
    def _resolve_last_accessed(raw: Any) -> datetime:
        """Turn a stored ``last_accessed_at`` value into a naive local datetime.

        ISO strings are parsed. Missing or unparseable values fall back to
        the current time, which means no decay. Timezone-aware values are
        converted to naive local time so they can be compared with
        ``datetime.now()``.
        """
        if isinstance(raw, str):
            try:
                raw = datetime.fromisoformat(raw)
            except ValueError:
                logger.warning("Ignoring unparseable last_accessed_at value: %r", raw)
                return datetime.now()
        elif not raw:
            return datetime.now()

        if isinstance(raw, datetime) and raw.tzinfo is not None:
            raw = raw.astimezone().replace(tzinfo=None)
        return raw

    def apply_decay(
        self,
        memories: list[dict[str, Any]],
    ) -> list[dict[str, Any]]:
        """Apply decay to memories.

        Each memory dict is updated in place: ``importance`` receives the
        decayed value and ``should_archive`` is set. Evergreen memories are
        never flagged for archiving.

        Args:
            memories: List of memories.

        Returns:
            A new list containing the same (mutated) memory dicts.
        """
        # NOTE(review): the decayed value overwrites ``importance`` while
        # decay is computed from the total time since last access, so calling
        # this repeatedly on the same records compounds the decay. Confirm
        # callers run it on freshly loaded base values.
        updated = []
        for mem in memories:
            last_accessed = self._resolve_last_accessed(mem.get("last_accessed_at"))
            is_evergreen = mem.get("is_evergreen", False)

            new_importance = self.decay_manager.calculate_decay(
                mem.get("importance", 5),
                last_accessed,
                is_evergreen,
            )
            mem["importance"] = new_importance
            mem["should_archive"] = (not is_evergreen) and self.decay_manager.should_archive(
                new_importance,
                last_accessed,
            )
            updated.append(mem)
        return updated
def create_intelligent_memory_system(
    llm_provider=None,
    context_window: int = 200000,
    reserve_tokens: int = 20000,
) -> IntelligentMemorySystem:
    """Build an ``IntelligentMemorySystem`` from the common sizing options.

    Only the two token-budget values are taken from the arguments; every
    other setting keeps its ``SummarizationConfig`` default.

    Args:
        llm_provider: LLM provider passed through to the system.
        context_window: Model context window size.
        reserve_tokens: Reserved tokens.

    Returns:
        Configured IntelligentMemorySystem.
    """
    sizing = {
        "context_window": context_window,
        "reserve_tokens": reserve_tokens,
    }
    return IntelligentMemorySystem(
        llm_provider=llm_provider,
        config=SummarizationConfig(**sizing),
    )