Files
X-Agents/agent/app/agent/memory/session.py
DESKTOP-72TV0V4\caoxiaozhu c6a4b28bf6 refactor: 重构 Agent 模块
- 删除旧的 agent 核心文件
- 新增 supervisor, memory, skills 等模块
- 重构 main.py 服务入口

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 16:25:37 +08:00

126 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Session Memory - session-level memory (Redis storage)
"""
import json
from typing import Optional
class SessionMemory:
    """Session-level conversation memory stored in Redis.

    Each session is stored as a JSON document of the form
    ``{"messages": [...], "summary": "..."}`` under a key derived from the
    agent, user and session identifiers. When no Redis client is supplied,
    a process-local dictionary is used instead.
    """

    # Upper bound on the number of messages kept per session; the oldest
    # messages are dropped once the history grows beyond this.
    max_messages = 50

    # In-memory fallback used when no Redis client is configured. It is a
    # class attribute, so it is shared by every instance in the process;
    # entries are keyed by "agent_id:user_id:session_id". Unlike the Redis
    # path, entries stored here are never expired.
    _memory_store = {}

    def __init__(self, agent_id: int, redis_client=None):
        """Initialise the session memory.

        Args:
            agent_id: Agent ID.
            redis_client: Redis client (optional). It is awaited on, so it
                must expose awaitable ``get`` and ``setex`` methods.
        """
        self.agent_id = agent_id
        self.redis = redis_client
        self.ttl = 3600 * 24  # 24 hours, in seconds
        # Number of stored messages from which a summary is (re)generated.
        self.summary_threshold = 10

    def _key(self, user_id: int, session_id: str) -> str:
        """Build the Redis key for one user's session."""
        return f"agent:memory:session:{self.agent_id}:{user_id}:{session_id}"

    async def add(self, user_input: str, response: str, user_id: int, session_id: str):
        """Append one user/assistant exchange to the session memory.

        Args:
            user_input: User input.
            response: Agent reply.
            user_id: User ID.
            session_id: Session ID.
        """
        if not self.redis:
            # No Redis available: fall back to the in-memory store.
            return self._add_memory(user_input, response, user_id, session_id)

        key = self._key(user_id, session_id)

        # Load the existing record, or start a fresh one.
        raw = await self.redis.get(key)
        record = json.loads(raw) if raw else {"messages": [], "summary": ""}

        history = record["messages"]
        history.append({"role": "user", "content": user_input})
        history.append({"role": "assistant", "content": response})

        # Refresh the summary once enough messages have accumulated. This
        # runs before trimming, so the summary sees the untrimmed history.
        if len(history) >= self.summary_threshold:
            record["summary"] = await self._generate_summary(history)

        # Keep only the most recent messages.
        if len(history) > self.max_messages:
            record["messages"] = history[-self.max_messages:]

        # Writing with setex also resets the key's time-to-live.
        await self.redis.setex(key, self.ttl, json.dumps(record))

    async def get_summary(self, user_id: int, session_id: str) -> str:
        """Return the stored summary of a session.

        Args:
            user_id: User ID.
            session_id: Session ID.

        Returns:
            str: The session summary, or an empty string when the session
            is unknown or has no summary yet.
        """
        if not self.redis:
            return self._get_memory_summary(user_id, session_id)

        raw = await self.redis.get(self._key(user_id, session_id))
        if not raw:
            return ""
        return json.loads(raw).get("summary", "")

    @staticmethod
    def _summarize(messages: list) -> str:
        """Build the simplified summary text synchronously.

        Kept separate from ``_generate_summary`` so that the synchronous
        in-memory path can produce a summary without awaiting a coroutine.

        Args:
            messages: Message list.

        Returns:
            str: The summary, or an empty string for an empty list.
        """
        if not messages:
            return ""
        # Each exchange contributes two messages (user + assistant).
        return f"最近对话包含 {len(messages)//2} 轮交互"

    async def _generate_summary(self, messages: list) -> str:
        """Generate a summary (simplified version).

        Args:
            messages: Message list.

        Returns:
            str: Summary.
        """
        # TODO: an LLM could be used later to produce a better summary.
        return self._summarize(messages)

    # === In-memory simulation (used when Redis is not available) ===

    def _add_memory(self, user_input: str, response: str, user_id: int, session_id: str):
        """In-memory simulation - append one exchange."""
        key = f"{self.agent_id}:{user_id}:{session_id}"
        record = self._memory_store.setdefault(key, {"messages": [], "summary": ""})

        history = record["messages"]
        history.append({"role": "user", "content": user_input})
        history.append({"role": "assistant", "content": response})

        if len(history) >= self.summary_threshold:
            # This method is synchronous, so it must not call the async
            # _generate_summary: that would store an un-awaited coroutine
            # object instead of the summary string.
            record["summary"] = self._summarize(history)

        # Apply the same cap as the Redis path so the store cannot grow
        # without bound; trim in place because `history` is the stored list.
        if len(history) > self.max_messages:
            del history[:-self.max_messages]

    def _get_memory_summary(self, user_id: int, session_id: str) -> str:
        """In-memory simulation - fetch the summary."""
        key = f"{self.agent_id}:{user_id}:{session_id}"
        record = self._memory_store.get(key)
        if record is None:
            return ""
        return record.get("summary", "")