Files
X-Agents/agent/app/xbot/agent.py
DESKTOP-72TV0V4\caoxiaozhu 465fdf2e6c feat: 新增xbot agent核心代码
新增agent/app/xbot模块,包含:
- agent.py: agent核心逻辑
- config.py: 配置管理
- session.py: 会话管理
- memory.py: 记忆管理
- loop.py: 循环任务
- adapter.py: 适配器

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 15:23:05 +08:00

310 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""XBot Agent - 封装 nanobot 核心能力的 Agent"""
import os
from pathlib import Path
from typing import Any, Optional
from datetime import datetime
from .loop import AgentLoop
from .session import SessionManager
from .adapter import XBotLLMAdapter, LLMResponse
from . import config
# 尝试导入 simplemem
try:
from simplemem import SimpleMemSystem
HAS_SIMPLEMEM = True
except ImportError:
HAS_SIMPLEMEM = False
class SimpleToolRegistry:
"""简单的工具注册表"""
def __init__(self):
self._tools: dict[str, Any] = {}
def register(self, name: str, func: Any, description: str = "") -> None:
"""注册一个工具"""
self._tools[name] = {
"function": func,
"description": description,
}
def get_definitions(self) -> list[dict]:
"""获取工具定义列表"""
tools = []
for name, tool in self._tools.items():
tools.append({
"type": "function",
"function": {
"name": name,
"description": tool.get("description", ""),
"parameters": {
"type": "object",
"properties": {},
"required": [],
}
}
})
return tools
def get(self, name: str) -> Optional[Any]:
"""获取工具"""
return self._tools.get(name)
async def execute(self, name: str, arguments: dict) -> Any:
"""执行工具"""
tool = self._tools.get(name)
if not tool:
return f"Tool {name} not found"
func = tool.get("function")
if not func:
return f"Tool {name} has no function"
try:
if callable(func):
return await func(**arguments) if hasattr(func, '__await__') else func(**arguments)
return "Tool function is not callable"
except Exception as e:
return f"Tool execution error: {str(e)}"
class XBotAgent:
"""
XBot Agent - 基于 nanobot 核心的 Agent 实现
特性:
- 多轮 tool-calling 对话
- 自动内存压缩
- 会话历史持久化
"""
def __init__(
self,
name: str,
role_description: str,
provider: str = "openai",
model: str = "gpt-4",
api_key: Optional[str] = None,
base_url: Optional[str] = None,
workspace: Optional[Path] = None,
context_window_tokens: int = 200000,
embedding_model: Optional[str] = None,
embedding_base_url: Optional[str] = None,
):
"""
初始化 XBot Agent
Args:
name: Agent 名称
role_description: Agent 角色描述
provider: LLM 提供商
model: 模型名称
api_key: API Key
base_url: Base URL
workspace: 工作目录(用于存储会话和记忆)
context_window_tokens: 上下文窗口大小
"""
self.name = name
self.role_description = role_description
# 使用配置文件的默认值
if api_key is None:
api_key = config.API_KEY
if base_url is None:
base_url = config.BASE_URL
if workspace is None:
workspace = Path(config.WORKSPACE)
# 创建工作目录
self.workspace = workspace
self.workspace.mkdir(parents=True, exist_ok=True)
# 创建 LLM 适配器
self.provider = XBotLLMAdapter(
provider=provider,
model_name=model,
api_key=api_key,
base_url=base_url,
)
# 创建工具注册表
self.tools = SimpleToolRegistry()
self._register_default_tools()
# 创建 Agent Loop
self.agent_loop = AgentLoop(
provider=self.provider,
model=model,
tools=self.tools,
max_iterations=50,
)
# 创建会话管理器
self.sessions = SessionManager(self.workspace)
# 创建 SimpleMem 记忆系统
if HAS_SIMPLEMEM and api_key and config.ENABLE_SIMPLEMEM:
# 使用配置文件的 embedding 设置
emb_model = embedding_model or config.EMBEDDING_MODEL
emb_base = embedding_base_url or config.EMBEDDING_BASE_URL or base_url
self.memory = SimpleMemSystem(
api_key=api_key,
base_url=emb_base,
model=model,
embedding_model=emb_model,
db_path=str(self.workspace / "memory_db"),
clear_db=False,
# 并行处理配置
enable_parallel_processing=config.ENABLE_PARALLEL_PROCESSING,
max_parallel_workers=config.MAX_PARALLEL_WORKERS,
enable_parallel_retrieval=config.ENABLE_PARALLEL_RETRIEVAL,
max_retrieval_workers=config.MAX_RETRIEVAL_WORKERS,
enable_planning=config.ENABLE_PLANNING,
enable_reflection=config.ENABLE_REFLECTION,
max_reflection_rounds=config.MAX_REFLECTION_ROUNDS,
)
self._use_simplemem = True
print(f"SimpleMem initialized with embedding: {emb_model}, base_url: {emb_base}")
else:
self.memory = None
self._use_simplemem = False
if not api_key:
print("Warning: No API key provided, SimpleMem will be disabled")
def _register_default_tools(self) -> None:
"""注册默认工具"""
# 可以在这里添加默认工具
pass
def register_tool(
self,
name: str,
func: Any,
description: str = "",
parameters: Optional[dict] = None,
) -> None:
"""注册自定义工具"""
tool_def = {
"type": "function",
"function": {
"name": name,
"description": description,
"parameters": parameters or {
"type": "object",
"properties": {},
"required": [],
}
}
}
# 存储在 tools 中
self.tools.register(name, func, description)
async def run(
self,
user_input: str,
session_id: str = "default",
) -> dict[str, Any]:
"""
运行 Agent 对话
Args:
user_input: 用户输入
session_id: 会话 ID
Returns:
dict: 包含 content, tool_calls 等
"""
# 获取或创建会话
session = self.sessions.get_or_create(session_id)
# 构建系统提示
system_prompt = f"""你是 {self.name}
{self.role_description}
请根据用户的问题回答,并使用 Markdown 格式输出。"""
# 如果使用 SimpleMem检索相关记忆
memory_context = ""
if self._use_simplemem and self.memory:
try:
memory_context = self.memory.ask(user_input)
except Exception as e:
print(f"Memory retrieval error: {e}")
if memory_context:
system_prompt += f"\n\n相关记忆:\n{memory_context}"
# 获取历史消息
history = session.get_history(max_messages=50)
# 构建初始消息
initial_messages = history + [
{"role": "user", "content": user_input}
]
# 运行 agent loop
final_content, tools_used, all_messages = await self.agent_loop.run_loop(
initial_messages=initial_messages,
system_prompt=system_prompt,
)
# 保存到会话
for m in all_messages[len(history):]:
session.messages.append(m)
self.sessions.save(session)
# 保存到 SimpleMem 记忆
if self._use_simplemem and self.memory and final_content:
try:
self.memory.add_dialogue("User", user_input, datetime.now().isoformat())
self.memory.add_dialogue(self.name, final_content, datetime.now().isoformat())
self.memory.finalize()
except Exception as e:
print(f"Memory save error: {e}")
return {
"content": final_content or "No response",
"tool_calls": tools_used,
"session_id": session_id,
}
async def run_stream(
self,
user_input: str,
session_id: str = "default",
):
"""
运行 Agent 对话(流式输出)
先完整执行 agent loop最后流式输出结果
Args:
user_input: 用户输入
session_id: 会话 ID
Yields:
str: 流式回复片段
"""
# 先完整执行 agent loop包含 tool-calling
result = await self.run(user_input, session_id)
content = result["content"]
# 流式输出结果
for char in content:
yield char
def clear_session(self, session_id: str) -> None:
"""清除会话"""
session = self.sessions.get_or_create(session_id)
session.clear()
self.sessions.save(session)
self.sessions.invalidate(session_id)
def list_sessions(self) -> list[dict]:
"""列出所有会话"""
return self.sessions.list_sessions()