"""Intelligent memory summarization and compression system.""" import asyncio import logging from datetime import datetime, timedelta from typing import Any from dataclasses import dataclass, field logger = logging.getLogger(__name__) @dataclass class SummarizationConfig: """Configuration for memory summarization.""" # Token thresholds context_window: int = 200000 # Model's context window reserve_tokens: int = 20000 # Reserved tokens for system prompt soft_threshold: int = 4000 # Trigger summarization before hitting limit # Summary settings keep_recent_tokens: int = 20000 # Keep recent N tokens summary_prompt: str = ( "Please summarize the following conversation, preserving key information, " "decisions, and important details. Focus on:\n" "- User preferences and requirements\n" "- Important decisions made\n" "- Technical details and configurations\n" "- Any follow-up tasks or action items\n\n" "Conversation:\n{content}\n\n" "Provide a concise summary:" ) # Evergreen settings evergreen_importance_threshold: int = 8 # Auto-mark high importance as evergreen # Decay settings decay_days_no_activity: int = 30 # Days without activity before decay starts decay_factor: float = 0.9 # Importance decay factor per period class MemorySummarizer: """LLM-based memory summarizer.""" def __init__(self, llm_provider=None, config: SummarizationConfig | None = None): """Initialize memory summarizer. Args: llm_provider: LLM provider for generating summaries config: Summarization configuration """ self.llm_provider = llm_provider self.config = config or SummarizationConfig() async def summarize_conversation( self, messages: list[dict[str, Any]], ) -> str | None: """Summarize a conversation. Args: messages: List of conversation messages Returns: Summary string or None if failed """ if not self.llm_provider: logger.warning("No LLM provider configured for summarization") return None if not messages: return None # Format messages for summarization content = self._format_messages(messages) # Generate summary using LLM try: prompt = self.config.summary_prompt.format(content=content) response = await self.llm_provider.chat( messages=[{"role": "user", "content": prompt}], max_tokens=1024, temperature=0.5, ) if response and response.content: return response.content.strip() except Exception as e: logger.error(f"Summarization failed: {e}") return None def _format_messages(self, messages: list[dict[str, Any]]) -> str: """Format messages for summarization prompt.""" lines = [] for msg in messages: role = msg.get("role", "unknown") content = msg.get("content", "") if content: lines.append(f"{role}: {content[:500]}") # Truncate long messages return "\n".join(lines) def estimate_tokens(self, text: str) -> int: """Estimate token count (rough approximation). Args: text: Text to estimate Returns: Estimated token count """ # Rough estimate: ~4 characters per token return len(text) // 4 class ContextCompressor: """Context compression manager for agent memory.""" def __init__( self, summarizer: MemorySummarizer, config: SummarizationConfig | None = None, ): """Initialize context compressor. Args: summarizer: Memory summarizer config: Summarization configuration """ self.summarizer = summarizer self.config = config or SummarizationConfig() self._compaction_count = 0 @property def flush_trigger_tokens(self) -> int: """Calculate token threshold for triggering memory flush.""" return ( self.config.context_window - self.config.reserve_tokens - self.config.soft_threshold ) def should_flush(self, current_tokens: int) -> bool: """Check if memory flush should be triggered. Args: current_tokens: Current token count Returns: True if flush should be triggered """ return current_tokens >= self.flush_trigger_tokens async def compress_context( self, messages: list[dict[str, Any]], current_tokens: int, ) -> tuple[list[dict[str, Any]], str | None]: """Compress context when approaching token limit. Args: messages: Current conversation messages current_tokens: Current token count Returns: Tuple of (compressed messages, summary) """ if not self.should_flush(current_tokens): return messages, None self._compaction_count += 1 logger.info(f"Triggering context compression (count: {self._compaction_count})") # Keep recent messages recent_messages = self._keep_recent_messages( messages, self.config.keep_recent_tokens, ) # Summarize older messages older_messages = self._get_older_messages( messages, self.config.keep_recent_tokens, ) if not older_messages: return recent_messages, None summary = await self.summarizer.summarize_conversation(older_messages) # Create compressed context compressed = recent_messages.copy() if summary: # Add summary as a system message compressed.insert(0, { "role": "system", "content": f"[Previous conversation summary]\n{summary}", }) logger.info(f"Context compressed: {len(older_messages)} messages summarized") return compressed, summary def _keep_recent_messages( self, messages: list[dict[str, Any]], max_tokens: int, ) -> list[dict[str, Any]]: """Keep recent messages within token limit.""" result = [] total_tokens = 0 # Process from newest to oldest for msg in reversed(messages): content = msg.get("content", "") tokens = self.summarizer.estimate_tokens(content) if total_tokens + tokens > max_tokens: break result.insert(0, msg) total_tokens += tokens return result def _get_older_messages( self, messages: list[dict[str, Any]], keep_tokens: int, ) -> list[dict[str, Any]]: """Get older messages that should be summarized.""" result = [] total_tokens = 0 # Process from oldest to newest for msg in messages: content = msg.get("content", "") tokens = self.summarizer.estimate_tokens(content) if total_tokens + tokens > keep_tokens: result.append(msg) total_tokens += tokens return result def get_compaction_count(self) -> int: """Get number of compactions performed.""" return self._compaction_count class MemoryDecayManager: """Memory importance decay manager.""" def __init__(self, config: SummarizationConfig | None = None): """Initialize decay manager. Args: config: Summarization configuration """ self.config = config or SummarizationConfig() def calculate_decay( self, importance: int, last_accessed: datetime, is_evergreen: bool = False, ) -> int: """Calculate decayed importance. Args: importance: Original importance (1-10) last_accessed: Last access timestamp is_evergreen: Whether memory is marked as evergreen Returns: Decayed importance """ if is_evergreen: return importance # Calculate days since last access days_since = (datetime.now() - last_accessed).days if days_since < self.config.decay_days_no_activity: return importance # Calculate decay periods decay_periods = ( days_since - self.config.decay_days_no_activity ) // self.config.decay_days_no_activity # Apply decay decay_factor = self.config.decay_factor ** decay_periods decayed = int(importance * decay_factor) # Ensure minimum importance of 1 return max(1, decayed) def should_archive(self, importance: int, last_accessed: datetime) -> bool: """Check if memory should be archived. Args: importance: Current importance last_accessed: Last access timestamp Returns: True if should be archived """ # Archive if importance has decayed to 1 and no recent access decayed = self.calculate_decay(importance, last_accessed) days_since = (datetime.now() - last_accessed).days return decayed == 1 and days_since > self.config.decay_days_no_activity * 3 class EvergreenManager: """Evergreen (persistent) memory manager.""" def __init__(self, config: SummarizationConfig | None = None): """Initialize evergreen manager. Args: config: Summarization configuration """ self.config = config or SummarizationConfig() def should_mark_evergreen( self, importance: int, memory_type: str, content: str, ) -> bool: """Determine if memory should be marked as evergreen. Args: importance: Importance score memory_type: Type of memory content: Memory content Returns: True if should be evergreen """ # High importance memories are evergreen if importance >= self.config.evergreen_importance_threshold: return True # Certain memory types are typically evergreen evergreen_types = {"preference", "identity", "configuration"} if memory_type in evergreen_types: return True # Check for evergreen keywords in content evergreen_keywords = [ "always", "never", "permanent", "fixed", "my name is", "i am", "preference", ] content_lower = content.lower() if any(kw in content_lower for kw in evergreen_keywords): return True return False def format_evergreen_prompt(self, memories: list[dict[str, Any]]) -> str: """Format evergreen memories for system prompt. Args: memories: List of evergreen memories Returns: Formatted prompt """ if not memories: return "" lines = ["[Evergreen Memories]"] for mem in memories: content = mem.get("content", "") memory_type = mem.get("memory_type", "general") lines.append(f"- [{memory_type}] {content}") return "\n".join(lines) class IntelligentMemorySystem: """Complete intelligent memory management system.""" def __init__( self, llm_provider=None, config: SummarizationConfig | None = None, ): """Initialize intelligent memory system. Args: llm_provider: LLM provider for summarization config: System configuration """ self.config = config or SummarizationConfig() # Initialize components self.summarizer = MemorySummarizer(llm_provider, self.config) self.compressor = ContextCompressor(self.summarizer, self.config) self.decay_manager = MemoryDecayManager(self.config) self.evergreen_manager = EvergreenManager(self.config) async def process_message( self, messages: list[dict[str, Any]], current_tokens: int, agent_id: str, user_id: str = "default", ) -> tuple[list[dict[str, Any]], dict[str, Any] | None]: """Process incoming message with intelligent memory management. Args: messages: Current conversation messages current_tokens: Current token count agent_id: Agent ID user_id: User ID Returns: Tuple of (processed messages, memory to save) """ # Check if compression needed processed_messages, summary = await self.compressor.compress_context( messages, current_tokens, ) memory_to_save = None if summary: memory_to_save = { "content": f"[Conversation Summary]\n{summary}", "agent_id": agent_id, "user_id": user_id, "memory_type": "summary", "importance": 5, } return processed_messages, memory_to_save def get_evergreen_context( self, memories: list[dict[str, Any]], ) -> str: """Get evergreen memories formatted for context. Args: memories: List of all memories Returns: Formatted evergreen context """ evergreen = [ m for m in memories if m.get("is_evergreen", False) or self.evergreen_manager.should_mark_evergreen( m.get("importance", 5), m.get("memory_type", ""), m.get("content", ""), ) ] return self.evergreen_manager.format_evergreen_prompt(evergreen) def apply_decay( self, memories: list[dict[str, Any]], ) -> list[dict[str, Any]]: """Apply decay to memories. Args: memories: List of memories Returns: Memories with updated importance """ updated = [] for mem in memories: last_accessed = mem.get("last_accessed_at") if isinstance(last_accessed, str): last_accessed = datetime.fromisoformat(last_accessed) elif not last_accessed: last_accessed = datetime.now() is_evergreen = mem.get("is_evergreen", False) new_importance = self.decay_manager.calculate_decay( mem.get("importance", 5), last_accessed, is_evergreen, ) mem["importance"] = new_importance mem["should_archive"] = self.decay_manager.should_archive( new_importance, last_accessed, ) updated.append(mem) return updated def create_intelligent_memory_system( llm_provider=None, context_window: int = 200000, reserve_tokens: int = 20000, ) -> IntelligentMemorySystem: """Create intelligent memory system with configuration. Args: llm_provider: LLM provider context_window: Model context window size reserve_tokens: Reserved tokens Returns: Configured IntelligentMemorySystem """ config = SummarizationConfig( context_window=context_window, reserve_tokens=reserve_tokens, ) return IntelligentMemorySystem(llm_provider=llm_provider, config=config)