feat: 新增 Agent 应用核心代码

- supervisor, memory, skills 模块
- LLM 工厂

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-11 16:26:33 +08:00
parent 0cab33b16b
commit f9660a3d7b
14 changed files with 835 additions and 0 deletions

1
agent/app/__init__.py Normal file
View File

@@ -0,0 +1 @@
# X-Agents Python Agent Engine

View File

@@ -0,0 +1,3 @@
# Agent Core
from app.agent.core.agent import AgentCore, AgentConfig, AgentResponse
from app.agent.core.supervisor import Supervisor

View File

@@ -0,0 +1,156 @@
"""
Supervisor - 多智能体调度器
"""
import asyncio
from typing import List, Dict, Any
from app.agent.core.agent import AgentCore
class Supervisor:
"""多智能体调度器"""
def __init__(self, supervisor_agent: AgentCore, members: List[AgentCore], strategy: str = "parallel"):
"""
初始化调度器
Args:
supervisor_agent: 主智能体
members: 子智能体列表
strategy: 调度策略 (parallel/sequential)
"""
self.supervisor = supervisor_agent
self.members = members
self.strategy = strategy
async def run(self, task: str, user_id: int, session_id: str) -> Dict[str, Any]:
"""
执行多智能体协作
Args:
task: 用户任务
user_id: 用户 ID
session_id: 会话 ID
Returns:
Dict: 包含主响应和子任务结果
"""
# 1. 任务分解
subtasks = await self._decompose_task(task)
# 2. 分配任务
if self.strategy == "parallel":
results = await self._dispatch_parallel(subtasks, user_id, session_id)
else:
results = await self._dispatch_sequential(subtasks, user_id, session_id)
# 3. 汇总结果
final_result = await self._aggregate(results)
return {
"main_response": final_result,
"subtask_results": results,
"strategy": self.strategy
}
async def _decompose_task(self, task: str) -> List[Dict[str, str]]:
"""任务分解"""
# 调用 LLM 分解任务
prompt = f"""分解以下任务为子任务,返回 JSON 数组格式:
任务: {task}
返回格式示例:
[{{"task": "子任务1描述", "agent_type": "适合的智能体类型"}}, {{"task": "子任务2描述", "agent_type": "适合的智能体类型"}}]
请直接返回 JSON 数组,不要其他内容。"""
response = await self.supervisor.llm.generate(prompt, [])
try:
import json
# 尝试解析 JSON
subtasks = json.loads(response)
return subtasks
except:
# 解析失败,创建默认子任务
return [{"task": task, "agent_type": "general"}]
async def _dispatch_parallel(self, subtasks: List[Dict], user_id: int, session_id: str) -> List[Dict]:
"""并行分发任务"""
tasks = []
for i, subtask in enumerate(subtasks):
if i < len(self.members):
member = self.members[i]
else:
# 如果子任务多于成员,使用轮询
member = self.members[i % len(self.members)]
tasks.append(member.run(subtask['task'], user_id, session_id))
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
formatted_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
formatted_results.append({
"task": subtasks[i]['task'],
"success": False,
"error": str(result)
})
else:
formatted_results.append({
"task": subtasks[i]['task'],
"success": True,
"content": result.content,
"tool_calls": result.tool_calls
})
return formatted_results
async def _dispatch_sequential(self, subtasks: List[Dict], user_id: int, session_id: str) -> List[Dict]:
"""顺序分发任务"""
results = []
context = ""
for i, subtask in enumerate(subtasks):
# 选择子智能体
if i < len(self.members):
member = self.members[i]
else:
member = self.members[i % len(self.members)]
# 传递前一个结果作为上下文
enhanced_task = f"{context}\n\n当前任务: {subtask['task']}" if context else subtask['task']
result = await member.run(enhanced_task, user_id, session_id)
results.append({
"task": subtask['task'],
"success": True,
"content": result.content,
"tool_calls": result.tool_calls
})
# 累加上下文
context += f"\n\n=== 任务: {subtask['task']} ===\n{result.content}"
return results
async def _aggregate(self, results: List[Dict]) -> str:
"""汇总结果"""
# 过滤成功的结果
success_results = [r for r in results if r.get('success')]
if not success_results:
return "所有子任务执行失败"
if len(success_results) == 1:
return success_results[0].get('content', '')
# 调用 LLM 汇总
summary_prompt = "请汇总以下所有任务的结果,生成一个完整的回复:\n\n"
for i, result in enumerate(success_results, 1):
summary_prompt += f"=== 任务 {i}: {result.get('task', '')} ===\n{result.get('content', '')}\n\n"
final_response = await self.supervisor.llm.generate(summary_prompt, [])
return final_response

View File

@@ -0,0 +1,4 @@
# LLM
from app.agent.llm.factory import LLMFactory
from app.agent.llm.openai import OpenAILLM
from app.agent.llm.anthropic import AnthropicLLM

View File

@@ -0,0 +1,96 @@
"""
Anthropic LLM 实现
"""
import os
from typing import Dict, Any, List
from anthropic import AsyncAnthropic
class AnthropicLLM:
"""Anthropic Claude LLM"""
def __init__(self, model_name: str = "claude-3-sonnet-20240229"):
self.model_name = model_name
self.client = AsyncAnthropic(
api_key=os.getenv("ANTHROPIC_API_KEY", "")
)
async def decide(self, prompt: str) -> Dict[str, Any]:
"""
LLM 决策 - 判断是否需要调用技能
Args:
prompt: 完整的 Prompt
Returns:
Dict: 包含 needs_skill, tool_calls, response 等
"""
system_prompt = """你是一个智能助手。请分析用户请求,判断是否需要调用工具来回答问题。
如果需要调用工具,请按以下格式返回 JSON
{"needs_skill": true, "tool_calls": [{"skill_id": "工具名称", "parameters": {"参数": ""}, "reason": "调用原因"}]}
如果不需要调用工具,请返回:
{"needs_skill": false, "response": "直接回答用户的内容"}
请只返回 JSON不要其他内容。"""
try:
response = await self.client.messages.create(
model=self.model_name,
max_tokens=2000,
system=system_prompt,
messages=[{"role": "user", "content": prompt}]
)
content = response.content[0].text
# 尝试解析 JSON
import json
try:
result = json.loads(content)
return result
except json.JSONDecodeError:
return {
"needs_skill": False,
"response": content
}
except Exception as e:
return {
"needs_skill": False,
"response": f"LLM 调用失败: {str(e)}"
}
async def generate(self, prompt: str, tool_results: List[Dict]) -> str:
"""
生成回复
Args:
prompt: 完整的 Prompt
tool_results: 工具调用结果
Returns:
str: 生成的回复
"""
user_message = prompt
# 添加工具结果作为上下文
if tool_results:
tool_context = "\n\n工具返回结果:\n"
for result in tool_results:
if result.get("success"):
tool_context += f"- {result.get('skill_id')}: {result.get('result')}\n"
user_message += tool_context
try:
response = await self.client.messages.create(
model=self.model_name,
max_tokens=4000,
messages=[{"role": "user", "content": user_message}]
)
return response.content[0].text
except Exception as e:
return f"生成回复失败: {str(e)}"

View File

@@ -0,0 +1,30 @@
"""
LLM Factory - LLM 工厂类
"""
from typing import Dict, Any
from app.agent.llm.openai import OpenAILLM
from app.agent.llm.anthropic import AnthropicLLM
class LLMFactory:
"""LLM 工厂类"""
@staticmethod
def create(provider: str, model_name: str):
"""
创建 LLM 实例
Args:
provider: 模型提供商 (openai/anthropic)
model_name: 模型名称
Returns:
LLM 实例
"""
if provider.lower() == "openai":
return OpenAILLM(model_name)
elif provider.lower() == "anthropic":
return AnthropicLLM(model_name)
else:
# 默认使用 OpenAI
return OpenAILLM(model_name)

View File

@@ -0,0 +1,112 @@
"""
OpenAI LLM 实现
"""
import os
from typing import Dict, Any, List, Optional
from openai import AsyncOpenAI
class OpenAILLM:
"""OpenAI LLM"""
def __init__(self, model_name: str = "gpt-4"):
self.model_name = model_name
self.client = AsyncOpenAI(
api_key=os.getenv("OPENAI_API_KEY", ""),
base_url=os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
)
async def decide(self, prompt: str) -> Dict[str, Any]:
"""
LLM 决策 - 判断是否需要调用技能
Args:
prompt: 完整的 Prompt
Returns:
Dict: 包含 needs_skill, tool_calls, response 等
"""
# 构建决策用的系统提示
system_prompt = """你是一个智能助手。请分析用户请求,判断是否需要调用工具来回答问题。
如果需要调用工具,请按以下格式返回 JSON
{
"needs_skill": true,
"tool_calls": [
{"skill_id": "工具名称", "parameters": {"参数": ""}, "reason": "调用原因"}
]
}
如果不需要调用工具,请返回:
{
"needs_skill": false,
"response": "直接回答用户的内容"
}
请只返回 JSON不要其他内容。"""
try:
response = await self.client.chat.completions.create(
model=self.model_name,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt}
],
temperature=0.7,
max_tokens=2000
)
content = response.choices[0].message.content
# 尝试解析 JSON
import json
try:
result = json.loads(content)
return result
except json.JSONDecodeError:
# 解析失败,作为普通回复处理
return {
"needs_skill": False,
"response": content
}
except Exception as e:
return {
"needs_skill": False,
"response": f"LLM 调用失败: {str(e)}"
}
async def generate(self, prompt: str, tool_results: List[Dict]) -> str:
"""
生成回复
Args:
prompt: 完整的 Prompt
tool_results: 工具调用结果
Returns:
str: 生成的回复
"""
messages = [{"role": "user", "content": prompt}]
# 添加工具结果作为上下文
if tool_results:
for result in tool_results:
if result.get("success"):
messages.append({
"role": "assistant",
"content": f"工具 {result.get('skill_id')} 返回: {result.get('result')}"
})
try:
response = await self.client.chat.completions.create(
model=self.model_name,
messages=messages,
temperature=0.7,
max_tokens=4000
)
return response.choices[0].message.content
except Exception as e:
return f"生成回复失败: {str(e)}"

View File

@@ -0,0 +1,5 @@
# Memory
from app.agent.memory.manager import MemoryManager
from app.agent.memory.working import WorkingMemory
from app.agent.memory.session import SessionMemory
from app.agent.memory.persistent import PersistentMemory

View File

@@ -0,0 +1,94 @@
"""
Memory Manager - 记忆管理器
"""
from typing import Dict, List, Optional
from app.agent.memory.working import WorkingMemory
from app.agent.memory.session import SessionMemory
from app.agent.memory.persistent import PersistentMemory
class MemoryManager:
"""记忆管理器 - 统一接口"""
def __init__(self, agent_id: int):
self.agent_id = agent_id
self.working = WorkingMemory()
self.session = SessionMemory(agent_id)
self.persistent = PersistentMemory(agent_id)
async def load_context(self, query: str, user_id: int, session_id: str) -> Dict[str, str]:
"""
加载上下文记忆
Args:
query: 查询内容
user_id: 用户 ID
session_id: 会话 ID
Returns:
Dict: 包含 summary, knowledge 等
"""
# 1. Working Memory (内存,最快)
working_context = self.working.get()
# 2. Session Memory (Redis)
session_context = await self.session.get_summary(user_id, session_id)
# 3. Persistent Memory (向量库) - 按需检索
persistent_context = await self.persistent.search(query, user_id, top_k=3)
return {
'working': working_context.get('recent_messages', []),
'session': session_context,
'persistent': persistent_context,
'summary': self._build_summary(session_context, persistent_context),
'knowledge': persistent_context # TODO: 后续对接知识库
}
async def save(self, user_input: str, response: str, user_id: int, session_id: str):
"""
保存记忆
Args:
user_input: 用户输入
response: 智能体回复
user_id: 用户 ID
session_id: 会话 ID
"""
# 1. 写入 Working
self.working.add(user_input, response)
# 2. 写入 Session (定期摘要)
await self.session.add(user_input, response, user_id, session_id)
# 3. 提取关键信息写入 Persistent (定期)
if self.working.size() >= 5:
await self._extract_and_persist(user_input, response, user_id)
def _build_summary(self, session_context: str, persistent_context: List[str]) -> str:
"""构建记忆摘要"""
parts = []
if session_context:
parts.append(f"会话记忆: {session_context}")
if persistent_context:
parts.append(f"长期记忆: {'; '.join(persistent_context[:3])}")
return "\n".join(parts) if parts else "无相关记忆"
async def _extract_and_persist(self, user_input: str, response: str, user_id: int):
"""提取并持久化关键信息"""
# 提取关键信息简化版取前100字符作为摘要
key_points = []
# 简化:直接保存重要交互
if len(response) > 50: # 只保存有意义的回复
summary = response[:100] + "..."
key_points.append(summary)
for point in key_points:
await self.persistent.add(point, user_id, memory_type="conversation")
# 重置 Working Memory
self.working.clear()

View File

@@ -0,0 +1,109 @@
"""
Persistent Memory - 长期记忆(向量存储)
"""
from typing import List, Optional
class PersistentMemory:
"""长期记忆,向量存储"""
def __init__(self, agent_id: int, vector_store=None):
"""
初始化长期记忆
Args:
agent_id: 智能体 ID
vector_store: 向量存储客户端(可选)
"""
self.agent_id = agent_id
self.vector_store = vector_store
self.use_vector = vector_store is not None
async def add(self, content: str, user_id: int, memory_type: str = "experience"):
"""
添加长期记忆
Args:
content: 记忆内容
user_id: 用户 ID
memory_type: 记忆类型 (experience/preference/conversation)
"""
if not self.use_vector:
return self._add_memory(content, user_id, memory_type)
# 生成向量并存储
embedding = await self._get_embedding(content)
# TODO: 调用向量存储 API
async def search(self, query: str, user_id: int, top_k: int = 3) -> List[str]:
"""
搜索相关记忆
Args:
query: 查询内容
user_id: 用户 ID
top_k: 返回数量
Returns:
List[str]: 相关的记忆列表
"""
if not self.use_vector:
return self._search_memory(query, user_id, top_k)
# 生成查询向量
query_embedding = await self._get_embedding(query)
# TODO: 调用向量搜索 API
# results = await self.vector_store.search(
# agent_id=self.agent_id,
# user_id=user_id,
# embedding=query_embedding,
# top_k=top_k
# )
return []
async def _get_embedding(self, text: str) -> List[float]:
"""
获取文本向量
Args:
text: 文本
Returns:
List[float]: 向量
"""
# TODO: 实现向量生成
# 可以使用 OpenAI Embedding API 或本地模型
import hashlib
# 简化:使用文本哈希模拟
h = hashlib.md5(text.encode()).digest()
return [float(b) / 255.0 for b in h[:16]] + [0.0] * 16
# === 内存模拟(无向量存储时使用)===
_memory_store = {}
def _add_memory(self, content: str, user_id: int, memory_type: str):
"""内存模拟 - 添加"""
key = f"{self.agent_id}:{user_id}"
if key not in self._memory_store:
self._memory_store[key] = []
self._memory_store[key].append({
"content": content,
"type": memory_type
})
def _search_memory(self, query: str, user_id: int, top_k: int) -> List[str]:
"""内存模拟 - 搜索(简化版:关键词匹配)"""
key = f"{self.agent_id}:{user_id}"
if key not in self._memory_store:
return []
# 简化:包含查询词的记忆
results = []
for mem in self._memory_store[key]:
if query.lower() in mem["content"].lower():
results.append(mem["content"])
return results[:top_k]

View File

@@ -0,0 +1,47 @@
"""
Working Memory - 当前任务上下文(内存级存储)
"""
class WorkingMemory:
"""当前任务上下文,内存级存储"""
def __init__(self):
self.current_task = None
self.recent_messages = []
self.max_size = 10 # 最大保留 10 轮对话
def get(self) -> dict:
"""获取当前记忆"""
return {
'current_task': self.current_task,
'recent_messages': self.recent_messages[-self.max_size:]
}
def add(self, user_input: str, response: str):
"""添加对话到记忆"""
self.recent_messages.append({
'role': 'user',
'content': user_input
})
self.recent_messages.append({
'role': 'assistant',
'content': response
})
# 保持固定大小
if len(self.recent_messages) > self.max_size * 2:
self.recent_messages = self.recent_messages[-self.max_size * 2:]
def set_current_task(self, task: str):
"""设置当前任务"""
self.current_task = task
def clear(self):
"""清空记忆"""
self.recent_messages = []
self.current_task = None
def size(self) -> int:
"""获取对话轮数"""
return len(self.recent_messages) // 2

View File

@@ -0,0 +1,3 @@
# Skills
from app.agent.skills.router import SkillRouter
from app.agent.skills.executor import SkillExecutor

View File

@@ -0,0 +1,129 @@
"""
Skill Executor - 技能执行器
"""
import asyncio
from typing import List, Dict, Any
class SkillExecutor:
"""技能执行器,支持并发/串行执行"""
def __init__(self, skill_registry=None):
"""
初始化技能执行器
Args:
skill_registry: 技能注册表(可选)
"""
self.skill_registry = skill_registry
self._skill_handlers = self._init_default_handlers()
def _init_default_handlers(self) -> Dict[str, callable]:
"""初始化默认技能处理器"""
return {
"query_database": self._handle_query_database,
"data_analysis": self._handle_data_analysis,
"search_knowledge": self._handle_search_knowledge,
"web_search": self._handle_web_search,
}
async def execute(self, skill_id: str, params: dict) -> Dict[str, Any]:
"""
执行单个技能
Args:
skill_id: 技能 ID
params: 技能参数
Returns:
Dict: 执行结果
"""
handler = self._skill_handlers.get(skill_id)
if not handler:
return {
"success": False,
"error": f"Skill {skill_id} not found"
}
try:
result = await handler(params)
return {
"success": True,
"skill_id": skill_id,
"result": result
}
except Exception as e:
return {
"success": False,
"skill_id": skill_id,
"error": str(e)
}
async def execute_multiple(self, skills: List[Dict], strategy: str = "parallel") -> List[Dict]:
"""
批量执行技能
Args:
skills: 技能列表
strategy: 执行策略 (parallel/sequential)
Returns:
List[Dict]: 执行结果列表
"""
if strategy == "parallel":
tasks = [self.execute(s['skill_id'], s['params']) for s in skills]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理异常结果
formatted_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
formatted_results.append({
"success": False,
"skill_id": skills[i]['skill_id'],
"error": str(result)
})
else:
formatted_results.append(result)
return formatted_results
else:
results = []
for s in skills:
result = await self.execute(s['skill_id'], s['params'])
results.append(result)
return results
# === 默认技能处理器 ===
async def _handle_query_database(self, params: dict) -> Dict[str, Any]:
"""处理数据库查询"""
# TODO: 调用现有的数据库查询功能
return {
"message": "数据库查询功能待实现",
"sql": params.get("sql", "")
}
async def _handle_data_analysis(self, params: dict) -> Dict[str, Any]:
"""处理数据分析"""
# TODO: 调用现有的数据分析功能
return {
"message": "数据分析功能待实现",
"data": params.get("data", {})
}
async def _handle_search_knowledge(self, params: dict) -> Dict[str, Any]:
"""处理知识库搜索"""
# TODO: 调用现有的知识库搜索功能
return {
"message": "知识库搜索功能待实现",
"query": params.get("query", "")
}
async def _handle_web_search(self, params: dict) -> Dict[str, Any]:
"""处理网页搜索"""
# TODO: 实现网页搜索
return {
"message": "网页搜索功能待实现",
"query": params.get("query", "")
}

View File

@@ -0,0 +1,46 @@
"""
Skill Router - 技能路由器
"""
from typing import List, Dict
class SkillRouter:
"""根据 LLM 决策选择要调用的技能"""
def __init__(self, available_skills: List[int]):
"""
初始化技能路由器
Args:
available_skills: 可用技能 ID 列表
"""
self.available_skills = available_skills
async def route(self, llm_decision: dict) -> List[dict]:
"""
解析 LLM 的技能调用决策
Args:
llm_decision: LLM 决策结果
Returns:
List[dict]: 要执行的技能列表
"""
if not llm_decision.get('tool_calls'):
return []
routes = []
for tool_call in llm_decision['tool_calls']:
skill_id = tool_call.get('skill_id') or tool_call.get('tool_name')
# 检查技能是否可用
if self.available_skills and skill_id not in self.available_skills:
continue
routes.append({
'skill_id': skill_id,
'params': tool_call.get('parameters', {}),
'reason': tool_call.get('reason', '')
})
return routes