diff --git a/agent/app/__init__.py b/agent/app/__init__.py deleted file mode 100644 index 894f835..0000000 --- a/agent/app/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# X-Agents Python Agent Engine diff --git a/agent/app/agent/core/__init__.py b/agent/app/agent/core/__init__.py deleted file mode 100644 index 919c6b2..0000000 --- a/agent/app/agent/core/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# Agent Core -from app.agent.core.agent import AgentCore, AgentConfig, AgentResponse -from app.agent.core.supervisor import Supervisor diff --git a/agent/app/agent/core/agent.py b/agent/app/agent/core/agent.py deleted file mode 100644 index 1da9b69..0000000 --- a/agent/app/agent/core/agent.py +++ /dev/null @@ -1,170 +0,0 @@ -""" -Agent Core - 单智能体核心 -""" -import logging -from typing import Optional, List, Dict, Any -from pydantic import BaseModel -from app.agent.memory.manager import MemoryManager -from app.agent.skills.router import SkillRouter -from app.agent.skills.executor import SkillExecutor -from app.agent.llm.factory import LLMFactory - -logger = logging.getLogger("agent.core") - - -class AgentConfig(BaseModel): - """智能体配置""" - id: int - name: str - role_description: str - model_provider: str = "openai" - model_name: str = "gpt-4" - api_key: Optional[str] = None # API Key(可选,用于覆盖默认配置) - base_url: Optional[str] = None # Base URL(可选,用于覆盖默认配置) - skills: List[int] = [] # 技能 ID 列表 - knowledge_base_ids: List[int] = [] - timeout: int = 60 - memory_limit: int = 134217728 # 128MB - - -class AgentResponse(BaseModel): - """智能体响应""" - content: str - tool_calls: List[Dict[str, Any]] = [] - tokens_used: int = 0 - duration_ms: int = 0 - session_id: Optional[str] = None - - -class AgentCore: - """单智能体核心类""" - - def __init__(self, config: AgentConfig): - self.config = config - - # 记录 LLM 初始化信息 - api_key_info = f"{config.api_key[:10]}..." if config.api_key else "None" - logger.info(f"初始化 AgentCore: name={config.name}, provider={config.model_provider}, model={config.model_name}, api_key={api_key_info}, base_url={config.base_url}") - - self.llm = LLMFactory.create(config.model_provider, config.model_name, config.api_key, config.base_url) - self.memory = MemoryManager(config.id) - self.skill_router = SkillRouter(config.skills) - self.skill_executor = SkillExecutor() - - async def run(self, user_input: str, user_id: int, session_id: str) -> AgentResponse: - """ - 执行智能体对话 - - Args: - user_input: 用户输入 - user_id: 用户 ID - session_id: 会话 ID - - Returns: - AgentResponse: 智能体响应 - """ - import time - start_time = time.time() - - try: - # 1. 加载记忆 - context = await self.memory.load_context(user_input, user_id, session_id) - - # 2. 构建 Prompt - prompt = self._build_prompt(user_input, context) - - # 3. LLM 决策 - decision = await self.llm.decide(prompt) - - # 4. 执行技能(如需) - if decision.get('needs_skill'): - skill_results = await self._execute_skills(decision.get('tool_calls', [])) - # 5. 基于结果生成回复 - final_response = await self.llm.generate(prompt, skill_results) - else: - final_response = decision.get('response', '') - - # 6. 保存记忆 - await self.memory.save(user_input, final_response, user_id, session_id) - - duration_ms = int((time.time() - start_time) * 1000) - - return AgentResponse( - content=final_response, - tool_calls=decision.get('tool_calls', []), - duration_ms=duration_ms, - session_id=session_id - ) - except Exception as e: - duration_ms = int((time.time() - start_time) * 1000) - return AgentResponse( - content=f"处理请求时发生错误: {str(e)}", - duration_ms=duration_ms, - session_id=session_id - ) - - def _build_prompt(self, user_input: str, context: dict) -> str: - """构建 Prompt""" - system_prompt = f"""你是 {self.config.name}。 -{self.config.role_description} - -相关记忆: -{context.get('summary', '')} - -知识库信息: -{context.get('knowledge', '')} - -请根据以上上下文回答用户问题,并使用 Markdown 格式输出。""" - - return f"{system_prompt}\n\n用户: {user_input}" - - async def _execute_skills(self, skill_decisions: List[Dict]) -> List[Dict]: - """执行技能""" - if not skill_decisions: - return [] - - results = [] - for decision in skill_decisions: - result = await self.skill_executor.execute( - skill_id=decision.get('skill_id'), - params=decision.get('params', {}) - ) - results.append(result) - return results - - async def run_stream(self, user_input: str, user_id: int, session_id: str): - """ - 执行智能体对话(流式输出) - - 优化:对于简单对话,直接流式生成,跳过 decide 步骤(省一次 LLM 调用) - 只有当需要工具时才先判断 - - Args: - user_input: 用户输入 - user_id: 用户 ID - session_id: 会话 ID - - Yields: - str: 流式回复片段 - """ - import time - start_time = time.time() - - try: - # 1. 加载记忆 - context = await self.memory.load_context(user_input, user_id, session_id) - - # 2. 构建 Prompt - prompt = self._build_prompt(user_input, context) - - # 3. 直接流式生成回复(跳过 decide,省一次 LLM 调用) - # 如果将来需要工具能力,可以在这里添加判断逻辑 - async for chunk in self.llm.generate_stream(prompt, []): - yield chunk - - # 4. 保存记忆(完成后) - final_response = "" - await self.memory.save(user_input, final_response, user_id, session_id) - - except Exception as e: - yield f"处理请求时发生错误: {str(e)}" diff --git a/agent/app/agent/core/supervisor.py b/agent/app/agent/core/supervisor.py deleted file mode 100644 index 8b739ca..0000000 --- a/agent/app/agent/core/supervisor.py +++ /dev/null @@ -1,156 +0,0 @@ -""" -Supervisor - 多智能体调度器 -""" -import asyncio -from typing import List, Dict, Any -from app.agent.core.agent import AgentCore - - -class Supervisor: - """多智能体调度器""" - - def __init__(self, supervisor_agent: AgentCore, members: List[AgentCore], strategy: str = "parallel"): - """ - 初始化调度器 - - Args: - supervisor_agent: 主智能体 - members: 子智能体列表 - strategy: 调度策略 (parallel/sequential) - """ - self.supervisor = supervisor_agent - self.members = members - self.strategy = strategy - - async def run(self, task: str, user_id: int, session_id: str) -> Dict[str, Any]: - """ - 执行多智能体协作 - - Args: - task: 用户任务 - user_id: 用户 ID - session_id: 会话 ID - - Returns: - Dict: 包含主响应和子任务结果 - """ - # 1. 任务分解 - subtasks = await self._decompose_task(task) - - # 2. 分配任务 - if self.strategy == "parallel": - results = await self._dispatch_parallel(subtasks, user_id, session_id) - else: - results = await self._dispatch_sequential(subtasks, user_id, session_id) - - # 3. 汇总结果 - final_result = await self._aggregate(results) - - return { - "main_response": final_result, - "subtask_results": results, - "strategy": self.strategy - } - - async def _decompose_task(self, task: str) -> List[Dict[str, str]]: - """任务分解""" - # 调用 LLM 分解任务 - prompt = f"""分解以下任务为子任务,返回 JSON 数组格式: -任务: {task} - -返回格式示例: -[{{"task": "子任务1描述", "agent_type": "适合的智能体类型"}}, {{"task": "子任务2描述", "agent_type": "适合的智能体类型"}}] - -请直接返回 JSON 数组,不要其他内容。""" - - response = await self.supervisor.llm.generate(prompt, []) - - try: - import json - # 尝试解析 JSON - subtasks = json.loads(response) - return subtasks - except: - # 解析失败,创建默认子任务 - return [{"task": task, "agent_type": "general"}] - - async def _dispatch_parallel(self, subtasks: List[Dict], user_id: int, session_id: str) -> List[Dict]: - """并行分发任务""" - tasks = [] - for i, subtask in enumerate(subtasks): - if i < len(self.members): - member = self.members[i] - else: - # 如果子任务多于成员,使用轮询 - member = self.members[i % len(self.members)] - - tasks.append(member.run(subtask['task'], user_id, session_id)) - - results = await asyncio.gather(*tasks, return_exceptions=True) - - # 处理结果 - formatted_results = [] - for i, result in enumerate(results): - if isinstance(result, Exception): - formatted_results.append({ - "task": subtasks[i]['task'], - "success": False, - "error": str(result) - }) - else: - formatted_results.append({ - "task": subtasks[i]['task'], - "success": True, - "content": result.content, - "tool_calls": result.tool_calls - }) - - return formatted_results - - async def _dispatch_sequential(self, subtasks: List[Dict], user_id: int, session_id: str) -> List[Dict]: - """顺序分发任务""" - results = [] - context = "" - - for i, subtask in enumerate(subtasks): - # 选择子智能体 - if i < len(self.members): - member = self.members[i] - else: - member = self.members[i % len(self.members)] - - # 传递前一个结果作为上下文 - enhanced_task = f"{context}\n\n当前任务: {subtask['task']}" if context else subtask['task'] - - result = await member.run(enhanced_task, user_id, session_id) - - results.append({ - "task": subtask['task'], - "success": True, - "content": result.content, - "tool_calls": result.tool_calls - }) - - # 累加上下文 - context += f"\n\n=== 任务: {subtask['task']} ===\n{result.content}" - - return results - - async def _aggregate(self, results: List[Dict]) -> str: - """汇总结果""" - # 过滤成功的结果 - success_results = [r for r in results if r.get('success')] - - if not success_results: - return "所有子任务执行失败" - - if len(success_results) == 1: - return success_results[0].get('content', '') - - # 调用 LLM 汇总 - summary_prompt = "请汇总以下所有任务的结果,生成一个完整的回复:\n\n" - for i, result in enumerate(success_results, 1): - summary_prompt += f"=== 任务 {i}: {result.get('task', '')} ===\n{result.get('content', '')}\n\n" - - final_response = await self.supervisor.llm.generate(summary_prompt, []) - return final_response diff --git a/agent/app/agent/llm/__init__.py b/agent/app/agent/llm/__init__.py deleted file mode 100644 index cbf8928..0000000 --- a/agent/app/agent/llm/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -# LLM -from app.agent.llm.factory import LLMFactory -from app.agent.llm.openai import OpenAILLM -from app.agent.llm.anthropic import AnthropicLLM diff --git a/agent/app/agent/llm/anthropic.py b/agent/app/agent/llm/anthropic.py deleted file mode 100644 index 83e5068..0000000 --- a/agent/app/agent/llm/anthropic.py +++ /dev/null @@ -1,136 +0,0 @@ -""" -Anthropic LLM 实现 -""" -import os -from typing import Dict, Any, List, Optional -from anthropic import AsyncAnthropic - - -class AnthropicLLM: - """Anthropic Claude LLM""" - - def __init__(self, model_name: str = "claude-3-sonnet-20240229", api_key: Optional[str] = None, base_url: Optional[str] = None): - self.model_name = model_name - # 支持自定义 base_url(如 OpenRouter) - if base_url: - self.client = AsyncAnthropic( - api_key=api_key or os.getenv("ANTHROPIC_API_KEY", ""), - base_url=base_url - ) - else: - self.client = AsyncAnthropic( - api_key=api_key or os.getenv("ANTHROPIC_API_KEY", "") - ) - - async def decide(self, prompt: str) -> Dict[str, Any]: - """ - LLM 决策 - 判断是否需要调用技能 - - Args: - prompt: 完整的 Prompt - - Returns: - Dict: 包含 needs_skill, tool_calls, response 等 - """ - system_prompt = """你是一个智能助手。请分析用户请求,判断是否需要调用工具来回答问题。 - -如果需要调用工具,请按以下格式返回 JSON: -{"needs_skill": true, "tool_calls": [{"skill_id": "工具名称", "parameters": {"参数": "值"}, "reason": "调用原因"}]} - -如果不需要调用工具,请返回: -{"needs_skill": false, "response": "直接回答用户的内容"} - -请只返回 JSON,不要其他内容。""" - - try: - response = await self.client.messages.create( - model=self.model_name, - max_tokens=2000, - system=system_prompt, - messages=[{"role": "user", "content": prompt}] - ) - - content = response.content[0].text - - # 尝试解析 JSON - import json - try: - result = json.loads(content) - return result - except json.JSONDecodeError: - return { - "needs_skill": False, - "response": content - } - - except Exception as e: - return { - "needs_skill": False, - "response": f"LLM 调用失败: {str(e)}" - } - - async def generate(self, prompt: str, tool_results: List[Dict]) -> str: - """ - 生成回复 - - Args: - prompt: 完整的 Prompt - tool_results: 工具调用结果 - - Returns: - str: 生成的回复 - """ - user_message = prompt - - # 添加工具结果作为上下文 - if tool_results: - tool_context = "\n\n工具返回结果:\n" - for result in tool_results: - if result.get("success"): - tool_context += f"- {result.get('skill_id')}: {result.get('result')}\n" - user_message += tool_context - - try: - response = await self.client.messages.create( - model=self.model_name, - max_tokens=4000, - messages=[{"role": "user", "content": user_message}] - ) - - return response.content[0].text - - except Exception as e: - return f"生成回复失败: {str(e)}" - - async def generate_stream(self, prompt: str, tool_results: List[Dict]): - """ - 流式生成回复 - - Args: - prompt: 完整的 Prompt - tool_results: 工具调用结果 - - Yields: - str: 生成的回复片段 - """ - user_message = prompt - - # 添加工具结果作为上下文 - if tool_results: - tool_context = "\n\n工具返回结果:\n" - for result in tool_results: - if result.get("success"): - tool_context += f"- {result.get('skill_id')}: {result.get('result')}\n" - user_message += tool_context - - try: - async with self.client.messages.stream( - model=self.model_name, - max_tokens=4000, - messages=[{"role": "user", "content": user_message}] - ) as stream: - async for text in stream.text_stream: - yield text - - except Exception as e: - yield f"生成回复失败: {str(e)}" diff --git a/agent/app/agent/llm/factory.py b/agent/app/agent/llm/factory.py deleted file mode 100644 index f25efb9..0000000 --- a/agent/app/agent/llm/factory.py +++ /dev/null @@ -1,32 +0,0 @@ -""" -LLM Factory - LLM 工厂类 -""" -from typing import Optional -from app.agent.llm.openai import OpenAILLM -from app.agent.llm.anthropic import AnthropicLLM - - -class LLMFactory: - """LLM 工厂类""" - - @staticmethod - def create(provider: str, model_name: str, api_key: Optional[str] = None, base_url: Optional[str] = None): - """ - 创建 LLM 实例 - - Args: - provider: 模型提供商 (openai/anthropic) - model_name: 模型名称 - api_key: API Key(可选) - base_url: Base URL(可选) - - Returns: - LLM 实例 - """ - if provider.lower() == "openai": - return OpenAILLM(model_name, api_key, base_url) - elif provider.lower() == "anthropic": - return AnthropicLLM(model_name, api_key, base_url) - else: - # 默认使用 OpenAI - return OpenAILLM(model_name, api_key, base_url) diff --git a/agent/app/agent/llm/openai.py b/agent/app/agent/llm/openai.py deleted file mode 100644 index 0ce745a..0000000 --- a/agent/app/agent/llm/openai.py +++ /dev/null @@ -1,167 +0,0 @@ -""" -OpenAI LLM 实现 -""" -import os -import logging -from typing import Dict, Any, List, Optional -from openai import AsyncOpenAI -from openai._client import AsyncOpenAI - -logger = logging.getLogger("llm.openai") - - -class OpenAILLM: - """OpenAI LLM""" - - def __init__(self, model_name: str = "gpt-4", api_key: Optional[str] = None, base_url: Optional[str] = None): - self.model_name = model_name - # 优先使用传入的参数,否则使用环境变量 - self.api_key = api_key or os.getenv("OPENAI_API_KEY", "") - self.base_url = base_url or os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") - - api_key_info = f"{self.api_key[:10]}..." if self.api_key else "None" - logger.info(f"初始化 OpenAI LLM: model={model_name}, api_key={api_key_info}, base_url={self.base_url}") - - if not self.api_key: - logger.warning("⚠️ WARNING: No API key provided!") - - # 配置超时 - self.client = AsyncOpenAI( - api_key=self.api_key, - base_url=self.base_url, - timeout=60.0, # 60秒超时 - max_retries=1 # 减少重试次数 - ) - - async def decide(self, prompt: str) -> Dict[str, Any]: - """ - LLM 决策 - 判断是否需要调用技能 - - Args: - prompt: 完整的 Prompt - - Returns: - Dict: 包含 needs_skill, tool_calls, response 等 - """ - # 构建决策用的系统提示 - system_prompt = """你是一个智能助手。请分析用户请求,判断是否需要调用工具来回答问题。 - -如果需要调用工具,请按以下格式返回 JSON: -{ - "needs_skill": true, - "tool_calls": [ - {"skill_id": "工具名称", "parameters": {"参数": "值"}, "reason": "调用原因"} - ] -} - -如果不需要调用工具,请返回: -{ - "needs_skill": false, - "response": "直接回答用户的内容" -} - -请只返回 JSON,不要其他内容。""" - - try: - response = await self.client.chat.completions.create( - model=self.model_name, - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": prompt} - ], - temperature=0.7, - max_tokens=2000 - ) - - content = response.choices[0].message.content - - # 尝试解析 JSON - import json - try: - result = json.loads(content) - return result - except json.JSONDecodeError: - # 解析失败,作为普通回复处理 - return { - "needs_skill": False, - "response": content - } - - except Exception as e: - return { - "needs_skill": False, - "response": f"LLM 调用失败: {str(e)}" - } - - async def generate(self, prompt: str, tool_results: List[Dict]) -> str: - """ - 生成回复 - - Args: - prompt: 完整的 Prompt - tool_results: 工具调用结果 - - Returns: - str: 生成的回复 - """ - messages = [{"role": "user", "content": prompt}] - - # 添加工具结果作为上下文 - if tool_results: - for result in tool_results: - if result.get("success"): - messages.append({ - "role": "assistant", - "content": f"工具 {result.get('skill_id')} 返回: {result.get('result')}" - }) - - try: - response = await self.client.chat.completions.create( - model=self.model_name, - messages=messages, - temperature=0.7, - max_tokens=4000 - ) - - return response.choices[0].message.content - - except Exception as e: - return f"生成回复失败: {str(e)}" - - async def generate_stream(self, prompt: str, tool_results: List[Dict]): - """ - 流式生成回复 - - Args: - prompt: 完整的 Prompt - tool_results: 工具调用结果 - - Yields: - str: 生成的回复片段 - """ - messages = [{"role": "user", "content": prompt}] - - # 添加工具结果作为上下文 - if tool_results: - for result in tool_results: - if result.get("success"): - messages.append({ - "role": "assistant", - "content": f"工具 {result.get('skill_id')} 返回: {result.get('result')}" - }) - - try: - response = await self.client.chat.completions.create( - model=self.model_name, - messages=messages, - temperature=0.7, - max_tokens=4000, - stream=True - ) - - async for chunk in response: - if chunk.choices[0].delta.content: - yield chunk.choices[0].delta.content - - except Exception as e: - yield f"生成回复失败: {str(e)}" diff --git a/agent/app/agent/memory/__init__.py b/agent/app/agent/memory/__init__.py deleted file mode 100644 index f163391..0000000 --- a/agent/app/agent/memory/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -# Memory -from app.agent.memory.manager import MemoryManager -from app.agent.memory.working import WorkingMemory -from app.agent.memory.session import SessionMemory -from app.agent.memory.persistent import PersistentMemory diff --git a/agent/app/agent/memory/manager.py b/agent/app/agent/memory/manager.py deleted file mode 100644 index 71129c1..0000000 --- a/agent/app/agent/memory/manager.py +++ /dev/null @@ -1,99 +0,0 @@ -""" -Memory Manager - 记忆管理器 -""" -from typing import Dict, List, Optional -from app.agent.memory.working import WorkingMemory -from app.agent.memory.session import SessionMemory -from app.agent.memory.persistent import PersistentMemory - - -class MemoryManager: - """记忆管理器 - 统一接口""" - - def __init__(self, agent_id: int): - self.agent_id = agent_id - self.working = WorkingMemory() - self.session = SessionMemory(agent_id) - self.persistent = PersistentMemory(agent_id) - - async def load_context(self, query: str, user_id: int, session_id: str) -> Dict[str, str]: - """ - 加载上下文记忆 - - 优化:跳过耗时的向量搜索,提升响应速度 - 生产环境可以加回来 - - Args: - query: 查询内容 - user_id: 用户 ID - session_id: 会话 ID - - Returns: - Dict: 包含 summary, knowledge 等 - """ - # 1. Working Memory (内存,最快) - working_context = self.working.get() - - # 2. Session Memory (Redis) - 暂时跳过,减少延迟 - # session_context = await self.session.get_summary(user_id, session_id) - session_context = "" - - # 3. Persistent Memory (向量库) - 暂时跳过,减少延迟 - # persistent_context = await self.persistent.search(query, user_id, top_k=3) - persistent_context = [] - - return { - 'working': working_context.get('recent_messages', []), - 'session': session_context, - 'persistent': persistent_context, - 'summary': "", # 简化 - 'knowledge': "" - } - - async def save(self, user_input: str, response: str, user_id: int, session_id: str): - """ - 保存记忆 - - Args: - user_input: 用户输入 - response: 智能体回复 - user_id: 用户 ID - session_id: 会话 ID - """ - # 1. 写入 Working - self.working.add(user_input, response) - - # 2. 写入 Session (定期摘要) - await self.session.add(user_input, response, user_id, session_id) - - # 3. 提取关键信息写入 Persistent (定期) - if self.working.size() >= 5: - await self._extract_and_persist(user_input, response, user_id) - - def _build_summary(self, session_context: str, persistent_context: List[str]) -> str: - """构建记忆摘要""" - parts = [] - - if session_context: - parts.append(f"会话记忆: {session_context}") - - if persistent_context: - parts.append(f"长期记忆: {'; '.join(persistent_context[:3])}") - - return "\n".join(parts) if parts else "无相关记忆" - - async def _extract_and_persist(self, user_input: str, response: str, user_id: int): - """提取并持久化关键信息""" - # 提取关键信息(简化版:取前100字符作为摘要) - key_points = [] - - # 简化:直接保存重要交互 - if len(response) > 50: # 只保存有意义的回复 - summary = response[:100] + "..." - key_points.append(summary) - - for point in key_points: - await self.persistent.add(point, user_id, memory_type="conversation") - - # 重置 Working Memory - self.working.clear() diff --git a/agent/app/agent/memory/persistent.py b/agent/app/agent/memory/persistent.py deleted file mode 100644 index 8a048cf..0000000 --- a/agent/app/agent/memory/persistent.py +++ /dev/null @@ -1,109 +0,0 @@ -""" -Persistent Memory - 长期记忆(向量存储) -""" -from typing import List, Optional - - -class PersistentMemory: - """长期记忆,向量存储""" - - def __init__(self, agent_id: int, vector_store=None): - """ - 初始化长期记忆 - - Args: - agent_id: 智能体 ID - vector_store: 向量存储客户端(可选) - """ - self.agent_id = agent_id - self.vector_store = vector_store - self.use_vector = vector_store is not None - - async def add(self, content: str, user_id: int, memory_type: str = "experience"): - """ - 添加长期记忆 - - Args: - content: 记忆内容 - user_id: 用户 ID - memory_type: 记忆类型 (experience/preference/conversation) - """ - if not self.use_vector: - return self._add_memory(content, user_id, memory_type) - - # 生成向量并存储 - embedding = await self._get_embedding(content) - # TODO: 调用向量存储 API - - async def search(self, query: str, user_id: int, top_k: int = 3) -> List[str]: - """ - 搜索相关记忆 - - Args: - query: 查询内容 - user_id: 用户 ID - top_k: 返回数量 - - Returns: - List[str]: 相关的记忆列表 - """ - if not self.use_vector: - return self._search_memory(query, user_id, top_k) - - # 生成查询向量 - query_embedding = await self._get_embedding(query) - - # TODO: 调用向量搜索 API - # results = await self.vector_store.search( - # agent_id=self.agent_id, - # user_id=user_id, - # embedding=query_embedding, - # top_k=top_k - # ) - - return [] - - async def _get_embedding(self, text: str) -> List[float]: - """ - 获取文本向量 - - Args: - text: 文本 - - Returns: - List[float]: 向量 - """ - # TODO: 实现向量生成 - # 可以使用 OpenAI Embedding API 或本地模型 - import hashlib - # 简化:使用文本哈希模拟 - h = hashlib.md5(text.encode()).digest() - return [float(b) / 255.0 for b in h[:16]] + [0.0] * 16 - - # === 内存模拟(无向量存储时使用)=== - _memory_store = {} - - def _add_memory(self, content: str, user_id: int, memory_type: str): - """内存模拟 - 添加""" - key = f"{self.agent_id}:{user_id}" - if key not in self._memory_store: - self._memory_store[key] = [] - - self._memory_store[key].append({ - "content": content, - "type": memory_type - }) - - def _search_memory(self, query: str, user_id: int, top_k: int) -> List[str]: - """内存模拟 - 搜索(简化版:关键词匹配)""" - key = f"{self.agent_id}:{user_id}" - if key not in self._memory_store: - return [] - - # 简化:包含查询词的记忆 - results = [] - for mem in self._memory_store[key]: - if query.lower() in mem["content"].lower(): - results.append(mem["content"]) - - return results[:top_k] diff --git a/agent/app/agent/memory/session.py b/agent/app/agent/memory/session.py deleted file mode 100644 index fed52b7..0000000 --- a/agent/app/agent/memory/session.py +++ /dev/null @@ -1,125 +0,0 @@ -""" -Session Memory - 会话级记忆(Redis 存储) -""" -import json -from typing import Optional - - -class SessionMemory: - """会话级记忆,Redis 存储""" - - def __init__(self, agent_id: int, redis_client=None): - """ - 初始化会话记忆 - - Args: - agent_id: 智能体 ID - redis_client: Redis 客户端(可选) - """ - self.agent_id = agent_id - self.redis = redis_client - self.ttl = 3600 * 24 # 24 小时 - self.summary_threshold = 10 # 多少条消息后生成摘要 - - def _key(self, user_id: int, session_id: str) -> str: - """生成 Redis Key""" - return f"agent:memory:session:{self.agent_id}:{user_id}:{session_id}" - - async def add(self, user_input: str, response: str, user_id: int, session_id: str): - """ - 添加对话到会话记忆 - - Args: - user_input: 用户输入 - response: 智能体回复 - user_id: 用户 ID - session_id: 会话 ID - """ - if not self.redis: - # 如果没有 Redis,使用内存模拟 - return self._add_memory(user_input, response, user_id, session_id) - - key = self._key(user_id, session_id) - - # 获取现有数据 - data = await self.redis.get(key) - messages = json.loads(data) if data else {"messages": [], "summary": ""} - - # 添加新消息 - messages["messages"].append({"role": "user", "content": user_input}) - messages["messages"].append({"role": "assistant", "content": response}) - - # 定期生成摘要 - if len(messages["messages"]) >= self.summary_threshold: - messages["summary"] = await self._generate_summary(messages["messages"]) - - # 保持消息数量 - if len(messages["messages"]) > 50: - messages["messages"] = messages["messages"][-50:] - - await self.redis.setex(key, self.ttl, json.dumps(messages)) - - async def get_summary(self, user_id: int, session_id: str) -> str: - """ - 获取会话摘要 - - Args: - user_id: 用户 ID - session_id: 会话 ID - - Returns: - str: 会话摘要 - """ - if not self.redis: - return self._get_memory_summary(user_id, session_id) - - key = self._key(user_id, session_id) - data = await self.redis.get(key) - - if data: - messages = json.loads(data) - return messages.get("summary", "") - return "" - - async def _generate_summary(self, messages: list) -> str: - """ - 生成摘要(简化版) - - Args: - messages: 消息列表 - - Returns: - str: 摘要 - """ - # 简化:取最后几条消息的要点 - if not messages: - return "" - - recent = messages[-6:] # 最近 3 轮 - summary = f"最近对话包含 {len(messages)//2} 轮交互" - - # TODO: 后续可以使用 LLM 生成更好的摘要 - return summary - - # === 内存模拟(无 Redis 时使用)=== - _memory_store = {} - - def _add_memory(self, user_input: str, response: str, user_id: int, session_id: str): - """内存模拟 - 添加""" - key = f"{self.agent_id}:{user_id}:{session_id}" - if key not in self._memory_store: - self._memory_store[key] = {"messages": [], "summary": ""} - - messages = self._memory_store[key]["messages"] - messages.append({"role": "user", "content": user_input}) - messages.append({"role": "assistant", "content": response}) - - if len(messages) >= self.summary_threshold: - self._memory_store[key]["summary"] = self._generate_summary(messages) - - def _get_memory_summary(self, user_id: int, session_id: str) -> str: - """内存模拟 - 获取摘要""" - key = f"{self.agent_id}:{user_id}:{session_id}" - if key in self._memory_store: - return self._memory_store[key].get("summary", "") - return "" diff --git a/agent/app/agent/memory/working.py b/agent/app/agent/memory/working.py deleted file mode 100644 index 3fbab69..0000000 --- a/agent/app/agent/memory/working.py +++ /dev/null @@ -1,47 +0,0 @@ -""" -Working Memory - 当前任务上下文(内存级存储) -""" - - -class WorkingMemory: - """当前任务上下文,内存级存储""" - - def __init__(self): - self.current_task = None - self.recent_messages = [] - self.max_size = 10 # 最大保留 10 轮对话 - - def get(self) -> dict: - """获取当前记忆""" - return { - 'current_task': self.current_task, - 'recent_messages': self.recent_messages[-self.max_size:] - } - - def add(self, user_input: str, response: str): - """添加对话到记忆""" - self.recent_messages.append({ - 'role': 'user', - 'content': user_input - }) - self.recent_messages.append({ - 'role': 'assistant', - 'content': response - }) - - # 保持固定大小 - if len(self.recent_messages) > self.max_size * 2: - self.recent_messages = self.recent_messages[-self.max_size * 2:] - - def set_current_task(self, task: str): - """设置当前任务""" - self.current_task = task - - def clear(self): - """清空记忆""" - self.recent_messages = [] - self.current_task = None - - def size(self) -> int: - """获取对话轮数""" - return len(self.recent_messages) // 2 diff --git a/agent/app/agent/skills/__init__.py b/agent/app/agent/skills/__init__.py deleted file mode 100644 index e55d801..0000000 --- a/agent/app/agent/skills/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# Skills -from app.agent.skills.router import SkillRouter -from app.agent.skills.executor import SkillExecutor diff --git a/agent/app/agent/skills/executor.py b/agent/app/agent/skills/executor.py deleted file mode 100644 index 9db8bd5..0000000 --- a/agent/app/agent/skills/executor.py +++ /dev/null @@ -1,129 +0,0 @@ -""" -Skill Executor - 技能执行器 -""" -import asyncio -from typing import List, Dict, Any - - -class SkillExecutor: - """技能执行器,支持并发/串行执行""" - - def __init__(self, skill_registry=None): - """ - 初始化技能执行器 - - Args: - skill_registry: 技能注册表(可选) - """ - self.skill_registry = skill_registry - self._skill_handlers = self._init_default_handlers() - - def _init_default_handlers(self) -> Dict[str, callable]: - """初始化默认技能处理器""" - return { - "query_database": self._handle_query_database, - "data_analysis": self._handle_data_analysis, - "search_knowledge": self._handle_search_knowledge, - "web_search": self._handle_web_search, - } - - async def execute(self, skill_id: str, params: dict) -> Dict[str, Any]: - """ - 执行单个技能 - - Args: - skill_id: 技能 ID - params: 技能参数 - - Returns: - Dict: 执行结果 - """ - handler = self._skill_handlers.get(skill_id) - - if not handler: - return { - "success": False, - "error": f"Skill {skill_id} not found" - } - - try: - result = await handler(params) - return { - "success": True, - "skill_id": skill_id, - "result": result - } - except Exception as e: - return { - "success": False, - "skill_id": skill_id, - "error": str(e) - } - - async def execute_multiple(self, skills: List[Dict], strategy: str = "parallel") -> List[Dict]: - """ - 批量执行技能 - - Args: - skills: 技能列表 - strategy: 执行策略 (parallel/sequential) - - Returns: - List[Dict]: 执行结果列表 - """ - if strategy == "parallel": - tasks = [self.execute(s['skill_id'], s['params']) for s in skills] - results = await asyncio.gather(*tasks, return_exceptions=True) - - # 处理异常结果 - formatted_results = [] - for i, result in enumerate(results): - if isinstance(result, Exception): - formatted_results.append({ - "success": False, - "skill_id": skills[i]['skill_id'], - "error": str(result) - }) - else: - formatted_results.append(result) - return formatted_results - else: - results = [] - for s in skills: - result = await self.execute(s['skill_id'], s['params']) - results.append(result) - return results - - # === 默认技能处理器 === - - async def _handle_query_database(self, params: dict) -> Dict[str, Any]: - """处理数据库查询""" - # TODO: 调用现有的数据库查询功能 - return { - "message": "数据库查询功能待实现", - "sql": params.get("sql", "") - } - - async def _handle_data_analysis(self, params: dict) -> Dict[str, Any]: - """处理数据分析""" - # TODO: 调用现有的数据分析功能 - return { - "message": "数据分析功能待实现", - "data": params.get("data", {}) - } - - async def _handle_search_knowledge(self, params: dict) -> Dict[str, Any]: - """处理知识库搜索""" - # TODO: 调用现有的知识库搜索功能 - return { - "message": "知识库搜索功能待实现", - "query": params.get("query", "") - } - - async def _handle_web_search(self, params: dict) -> Dict[str, Any]: - """处理网页搜索""" - # TODO: 实现网页搜索 - return { - "message": "网页搜索功能待实现", - "query": params.get("query", "") - } diff --git a/agent/app/agent/skills/router.py b/agent/app/agent/skills/router.py deleted file mode 100644 index a43e6c7..0000000 --- a/agent/app/agent/skills/router.py +++ /dev/null @@ -1,46 +0,0 @@ -""" -Skill Router - 技能路由器 -""" -from typing import List, Dict - - -class SkillRouter: - """根据 LLM 决策选择要调用的技能""" - - def __init__(self, available_skills: List[int]): - """ - 初始化技能路由器 - - Args: - available_skills: 可用技能 ID 列表 - """ - self.available_skills = available_skills - - async def route(self, llm_decision: dict) -> List[dict]: - """ - 解析 LLM 的技能调用决策 - - Args: - llm_decision: LLM 决策结果 - - Returns: - List[dict]: 要执行的技能列表 - """ - if not llm_decision.get('tool_calls'): - return [] - - routes = [] - for tool_call in llm_decision['tool_calls']: - skill_id = tool_call.get('skill_id') or tool_call.get('tool_name') - - # 检查技能是否可用 - if self.available_skills and skill_id not in self.available_skills: - continue - - routes.append({ - 'skill_id': skill_id, - 'params': tool_call.get('parameters', {}), - 'reason': tool_call.get('reason', '') - }) - - return routes diff --git a/agent/app/main.py b/agent/app/main.py deleted file mode 100644 index b059b22..0000000 --- a/agent/app/main.py +++ /dev/null @@ -1,549 +0,0 @@ -""" -FastAPI Agent Engine Server -""" -import os -import sys -import time -import logging -from datetime import datetime -from typing import Optional -from fastapi import FastAPI, HTTPException -from fastapi.responses import StreamingResponse -from pydantic import BaseModel -from fastapi.middleware.cors import CORSMiddleware -import asyncio - -from app.agent.core import AgentConfig -from app.xbot import XBotAgent - - -# 日志目录 - 放在 server/logs 下 -LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "server", "logs", datetime.now().strftime("%Y-%m-%d")) -os.makedirs(LOG_DIR, exist_ok=True) - -# 成功/失败日志文件 -today = datetime.now().strftime("%Y-%m-%d") -success_log_file = os.path.join(LOG_DIR, f"python_success.log") -failure_log_file = os.path.join(LOG_DIR, f"python_failure.log") - - -def setup_logging(): - """配置日志系统""" - log_level = os.getenv("LOG_LEVEL", "INFO") - - # 创建格式化器 - formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s', - datefmt='%Y-%m-%d %H:%M:%S' - ) - - # 控制台处理器 - console_handler = logging.StreamHandler(sys.stdout) - console_handler.setFormatter(formatter) - - # 成功日志文件处理器 - success_handler = logging.FileHandler(success_log_file, encoding='utf-8') - success_handler.setFormatter(formatter) - success_handler.setLevel(logging.INFO) - - # 失败日志文件处理器 - failure_handler = logging.FileHandler(failure_log_file, encoding='utf-8') - failure_handler.setFormatter(formatter) - failure_handler.setLevel(logging.WARNING) - - # 根日志配置 - root_logger = logging.getLogger() - root_logger.setLevel(getattr(logging, log_level)) - root_logger.addHandler(console_handler) - root_logger.addHandler(success_handler) - root_logger.addHandler(failure_handler) - - return root_logger - - -# 成功日志记录器(只记录 INFO 级别到成功日志) -class SuccessLogger: - """成功日志记录器""" - - @staticmethod - def log(message: str): - """记录成功日志""" - logger = logging.getLogger("success") - logger.setLevel(logging.INFO) - handler = logging.FileHandler(success_log_file, encoding='utf-8') - handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')) - logger.addHandler(handler) - logger.info(message) - - # 同时输出到控制台 - print(f"✅ {message}") - - -# 失败日志记录器 -class FailureLogger: - """失败日志记录器""" - - @staticmethod - def log(message: str, error: str = ""): - """记录失败日志""" - logger = logging.getLogger("failure") - logger.setLevel(logging.WARNING) - handler = logging.FileHandler(failure_log_file, encoding='utf-8') - handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s - %(error)s', datefmt='%Y-%m-%d %H:%M:%S')) - logger.addHandler(handler) - - full_message = f"{message} - Error: {error}" if error else message - logger.warning(full_message) - - # 同时输出到控制台 - print(f"❌ {full_message}") - - -logger = setup_logging() - -app = FastAPI(title="X-Agents Python Engine", version="1.0.0") - -# CORS -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - - -# === 请求/响应模型 === - -class ChatRequest(BaseModel): - """对话请求""" - agent_id: int - message: str - user_id: int = 1 - session_id: Optional[str] = None - # 模型参数(可选,如果传了就使用,否则用智能体配置的默认模型) - model_id: Optional[str] = None - model_name: Optional[str] = None - model_provider: Optional[str] = None - api_key: Optional[str] = None - base_url: Optional[str] = None - # Embedding 模型(可选) - embedding_model: Optional[str] = None - embedding_base_url: Optional[str] = None - - -class TeamChatRequest(BaseModel): - """多智能体群聊请求""" - supervisor_agent_id: int - member_agent_ids: list[int] - message: str - user_id: int = 1 - session_id: Optional[str] = None - strategy: str = "parallel" - - -class CreateAgentRequest(BaseModel): - """创建智能体请求""" - name: str - description: Optional[str] = None - avatar: str = "🤖" - # 技能配置 - skills_mode: str = "all" # all / include / exclude - skills: list[str] = [] # 技能ID列表 - # 知识库 - knowledge: str = "general" # general / codebase / docs / api - # 自定义提示词 - prompt: Optional[str] = None - # 模型配置 - model_provider: Optional[str] = None - model_name: Optional[str] = None - user_id: int = 1 - - -class CreateAgentResponse(BaseModel): - """创建智能体响应""" - agent_id: int - name: str - message: str = "Agent created successfully" - - -class ChatResponse(BaseModel): - """对话响应""" - agent_id: int - response: str - tool_calls: list = [] - tokens_used: int = 0 - duration_ms: int = 0 - session_id: Optional[str] = None - - -# === 模拟数据存储 === -# TODO: 后续替换为从数据库加载 -_mock_agents = { - 1: { - "id": 1, - "name": "数据分析助手", - "role_description": "你是一个专业的数据分析助手,擅长分析数据、生成报告。", - "model_provider": "openai", - "model_name": "gpt-4", - "skills": [1, 2] - }, - 2: { - "id": 2, - "name": "代码审查助手", - "role_description": "你是一个专业的代码审查助手,擅长审查代码、发现bug。", - "model_provider": "openai", - "model_name": "gpt-4", - "skills": [3] - } -} - - -def get_agent_config(agent_id: int, api_key: str = None, base_url: str = None) -> AgentConfig: - """获取智能体配置""" - agent_data = _mock_agents.get(agent_id) - if not agent_data: - raise HTTPException(status_code=404, detail="Agent not found") - - return AgentConfig( - id=agent_data["id"], - name=agent_data["name"], - role_description=agent_data["role_description"], - model_provider=agent_data["model_provider"], - model_name=agent_data["model_name"], - api_key=api_key, - base_url=base_url, - skills=agent_data.get("skills", []) - ) - - -# === API 路由 === - -@app.get("/") -async def root(): - return {"message": "X-Agents Python Engine", "version": "1.0.0"} - - -@app.get("/health") -async def health(): - return {"status": "healthy"} - - -@app.post("/agent/chat", response_model=ChatResponse) -async def chat(request: ChatRequest): - """ - 单智能体对话 - """ - chat_logger = logging.getLogger("agent.chat") - - # 打印请求参数(隐藏 api_key 敏感信息) - api_key_preview = f"{request.api_key[:10]}..." if request.api_key else "None" - chat_logger.info(f"========== 收到聊天请求 ==========") - chat_logger.info(f"agent_id: {request.agent_id}") - chat_logger.info(f"model_id: {request.model_id}") - chat_logger.info(f"model_provider: {request.model_provider}") - chat_logger.info(f"model_name: {request.model_name}") - chat_logger.info(f"api_key: {api_key_preview}") - chat_logger.info(f"base_url: {request.base_url}") - chat_logger.info(f"message: {request.message[:50]}...") - - start_time = time.time() - - # 获取智能体配置 - try: - config = get_agent_config(request.agent_id, request.api_key, request.base_url) - chat_logger.info(f"Agent config loaded: provider={config.model_provider}, model={config.model_name}") - except HTTPException as e: - FailureLogger.log(f"Agent not found: agent_id={request.agent_id}", str(e)) - chat_logger.error(f"Agent not found: {e}") - raise - except Exception as e: - FailureLogger.log(f"Error loading config: agent_id={request.agent_id}", str(e)) - chat_logger.error(f"Error loading config: {e}") - raise HTTPException(status_code=400, detail=str(e)) - - # 如果请求中指定了模型,覆盖智能体的默认配置 - if request.model_provider: - config.model_provider = request.model_provider - if request.model_name: - config.model_name = request.model_name - - chat_logger.info(f"Final LLM config: provider={config.model_provider}, model={config.model_name}, api_key={config.api_key[:10] if config.api_key else 'None'}..., base_url={config.base_url}") - - # 生成 session_id - session_id = request.session_id or f"session_{int(time.time())}" - - # 执行对话 - 默认使用 XBot Agent (nanobot 核心) - try: - xbot = XBotAgent( - name=config.name, - role_description=config.role_description, - provider=config.model_provider, - model=config.model_name, - api_key=request.api_key or config.api_key, - base_url=request.base_url or config.base_url, - embedding_model=request.embedding_model, - embedding_base_url=request.embedding_base_url, - ) - result = await xbot.run(request.message, session_id) - response_content = result["content"] - tool_calls = [{"name": tc} for tc in result.get("tool_calls", [])] if result.get("tool_calls") else [] - except Exception as e: - FailureLogger.log(f"Agent execution failed: agent_id={request.agent_id}, message={request.message[:30]}", str(e)) - chat_logger.error(f"Agent execution error: {e}") - raise HTTPException(status_code=500, detail=str(e)) - - duration_ms = int((time.time() - start_time) * 1000) - - # 记录成功日志 - SuccessLogger.log(f"Chat success: agent_id={request.agent_id}, duration={duration_ms}ms, message={request.message[:30]}...") - - return ChatResponse( - agent_id=request.agent_id, - response=response_content, - tool_calls=tool_calls, - tokens_used=0, - duration_ms=duration_ms, - session_id=session_id - ) - - -@app.post("/agent/chat/stream") -async def chat_stream(request: ChatRequest): - """ - 单智能体对话(流式输出) - """ - chat_logger = logging.getLogger("agent.chat.stream") - - # 打印请求参数 - api_key_preview = f"{request.api_key[:10]}..." if request.api_key else "None" - base_url_preview = request.base_url if request.base_url else "None" - chat_logger.info(f"========== 收到流式聊天请求 ==========") - chat_logger.info(f"agent_id: {request.agent_id}") - chat_logger.info(f"model_provider: {request.model_provider}") - chat_logger.info(f"model_name: {request.model_name}") - chat_logger.info(f"api_key: {api_key_preview}") - chat_logger.info(f"base_url: {base_url_preview}") - - # 获取智能体配置 - try: - config = get_agent_config(request.agent_id, request.api_key, request.base_url) - except HTTPException as e: - chat_logger.error(f"Agent not found: {e}") - raise - except Exception as e: - chat_logger.error(f"Error loading config: {e}") - raise HTTPException(status_code=400, detail=str(e)) - - # 如果请求中指定了模型,覆盖智能体的默认配置 - if request.model_provider: - config.model_provider = request.model_provider - if request.model_name: - config.model_name = request.model_name - - chat_logger.info(f"最终配置 - provider: {config.model_provider}, model: {config.model_name}, base_url: {config.base_url}") - - # 生成 session_id - session_id = request.session_id or f"session_{int(time.time())}" - - # Mock 模式测试流式 - if request.message.startswith("/mock "): - mock_text = request.message[6:] # 去掉 "/mock " 前缀 - async def mock_stream(): - for char in mock_text: - yield f"data: {char}\n\n" - await asyncio.sleep(0.05) # 50ms 延迟模拟流式 - yield f"data: [DONE]\n\n" - return StreamingResponse(mock_stream(), media_type="text/event-stream") - - # 使用 XBot Agent (nanobot 核心) - xbot = XBotAgent( - name=config.name, - role_description=config.role_description, - provider=config.model_provider, - model=config.model_name, - api_key=request.api_key or config.api_key, - base_url=request.base_url or config.base_url, - embedding_model=request.embedding_model, - embedding_base_url=request.embedding_base_url, - ) - - async def event_generator(): - """SSE 事件生成器""" - try: - # 执行流式对话 - async for chunk in xbot.run_stream(request.message, session_id): - # 发送 SSE 格式的数据 - yield f"data: {chunk}\n\n" - - # 发送结束信号 - yield f"data: [DONE]\n\n" - - except Exception as e: - chat_logger.error(f"Stream error: {e}") - yield f"data: {{\"error\": \"{str(e)}\"}}\n\n" - - return StreamingResponse(event_generator(), media_type="text/event-stream") - - -@app.post("/agent/team/chat") -async def team_chat(request: TeamChatRequest): - """ - 多智能体群聊 - """ - start_time = time.time() - - # 创建主智能体 - try: - supervisor_config = get_agent_config(request.supervisor_agent_id) - except HTTPException: - raise - except Exception as e: - raise HTTPException(status_code=400, detail=str(e)) - - # 使用 XBot 作为主智能体 - supervisor_agent = XBotAgent( - name=supervisor_config.name, - role_description=supervisor_config.role_description, - provider=supervisor_config.model_provider, - model=supervisor_config.model_name, - api_key=supervisor_config.api_key, - base_url=supervisor_config.base_url, - ) - - # 创建子智能体 - members = [] - for member_id in request.member_agent_ids: - try: - member_config = get_agent_config(member_id) - members.append(XBotAgent( - name=member_config.name, - role_description=member_config.role_description, - provider=member_config.model_provider, - model=member_config.model_name, - api_key=member_config.api_key, - base_url=member_config.base_url, - )) - except: - continue - - if not members: - raise HTTPException(status_code=400, detail="No valid member agents") - - # TODO: 群聊调度逻辑 - 目前简化为串行执行 - # 生成 session_id - session_id = request.session_id or f"team_session_{int(time.time())}" - - # 串行执行每个智能体 - subtask_results = [] - main_response = "" - - try: - # 主智能体先处理 - result = await supervisor_agent.run(request.message, session_id) - main_response = result["content"] - subtask_results.append({ - "agent_id": request.supervisor_agent_id, - "response": main_response, - }) - - # 子智能体并行处理 - # import asyncio - # results = await asyncio.gather(*[m.run(request.message, session_id) for m in members]) - # for m, r in zip(members, results): - # subtask_results.append({"agent_id": m.name, "response": r["content"]}) - - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - - duration_ms = int((time.time() - start_time) * 1000) - - return { - "supervisor_agent_id": request.supervisor_agent_id, - "response": main_response, - "subtask_results": subtask_results, - "strategy": request.strategy or "parallel", - "duration_ms": duration_ms, - "session_id": session_id - } - - -@app.post("/agent/create", response_model=CreateAgentResponse) -async def create_agent(request: CreateAgentRequest): - """ - 创建新的智能体 - """ - import json - import uuid - - # 生成唯一的 agent_id - agent_id = int(datetime.now().timestamp() * 1000) % 100000 - - # 构建 Agent 配置 - agent_config = { - "id": agent_id, - "name": request.name, - "description": request.description or "", - "avatar": request.avatar, - "skills_mode": request.skills_mode, - "skills": request.skills, - "knowledge": request.knowledge, - "role_description": request.prompt or f"You are {request.name}. {request.description or ''}", - "model_provider": request.model_provider or "anthropic", - "model_name": request.model_name or "claude-sonnet-4-20250514", - } - - # 保存到 agents 目录 - agents_dir = os.path.join(os.path.dirname(__file__), "agents") - os.makedirs(agents_dir, exist_ok=True) - - config_file = os.path.join(agents_dir, f"agent_{agent_id}.json") - with open(config_file, "w", encoding="utf-8") as f: - json.dump(agent_config, f, ensure_ascii=False, indent=2) - - logger.info(f"Agent created: {request.name} (ID: {agent_id})") - - return CreateAgentResponse( - agent_id=agent_id, - name=request.name, - message="Agent created successfully" - ) - - -@app.get("/agent/list") -async def list_agents(): - """ - 获取智能体列表 - """ - import json - - agents_dir = os.path.join(os.path.dirname(__file__), "agents") - if not os.path.exists(agents_dir): - return {"agents": []} - - agents = [] - for file in os.listdir(agents_dir): - if file.endswith(".json"): - config_file = os.path.join(agents_dir, file) - try: - with open(config_file, "r", encoding="utf-8") as f: - agent = json.load(f) - agents.append(agent) - except: - continue - - return {"agents": agents} - - -if __name__ == "__main__": - import uvicorn - port = int(os.getenv("AGENT_PORT", "8081")) - uvicorn.run( - app, - host="0.0.0.0", - port=port, - loop="asyncio", - http="h11", - access_log=False, - timeout_keep_alive=5, - ) diff --git a/agent/app/xbot/__init__.py b/agent/app/xbot/__init__.py deleted file mode 100644 index 59dd78b..0000000 --- a/agent/app/xbot/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -"""XBot - 轻量级 Agent 框架(基于 nanobot 核心)""" - -from .loop import AgentLoop -from .memory import MemoryConsolidator, MemoryStore -from .session import Session, SessionManager -from .adapter import XBotLLMAdapter -from .agent import XBotAgent - -__all__ = [ - "AgentLoop", - "MemoryConsolidator", - "MemoryStore", - "Session", - "SessionManager", - "XBotLLMAdapter", - "XBotAgent", -] diff --git a/agent/app/xbot/adapter.py b/agent/app/xbot/adapter.py deleted file mode 100644 index 3c73301..0000000 --- a/agent/app/xbot/adapter.py +++ /dev/null @@ -1,186 +0,0 @@ -"""LLM Adapter - 将现有 LLM 适配到 XBot 接口""" - -import json -from dataclasses import dataclass, field -from typing import Any, Optional - -from app.agent.llm.factory import LLMFactory - - -@dataclass -class ToolCallRequest: - """A tool call request from the LLM.""" - id: str - name: str - arguments: dict[str, Any] - - def to_openai_tool_call(self) -> dict[str, Any]: - return { - "id": self.id, - "type": "function", - "function": { - "name": self.name, - "arguments": json.dumps(self.arguments, ensure_ascii=False), - }, - } - - -@dataclass -class LLMResponse: - """Response from an LLM provider.""" - content: str | None - tool_calls: list[ToolCallRequest] = field(default_factory=list) - finish_reason: str = "stop" - usage: dict[str, int] = field(default_factory=dict) - reasoning_content: str | None = None - - @property - def has_tool_calls(self) -> bool: - return len(self.tool_calls) > 0 - - -class XBotLLMAdapter: - """ - 适配器:将现有 LLM 适配到 XBot 的 LLMProvider 接口 - - 封装 LLMFactory 创建的 LLM,使其符合 nanobot 风格的接口: - - chat_with_retry(messages, tools, model) -> LLMResponse - """ - - def __init__( - self, - provider: str, - model_name: str, - api_key: Optional[str] = None, - base_url: Optional[str] = None, - temperature: float = 0.7, - max_tokens: int = 4096, - ): - self.provider_name = provider - self.model = model_name - self.temperature = temperature - self.max_tokens = max_tokens - - # 创建底层 LLM - self._llm = LLMFactory.create(provider, model_name, api_key, base_url) - - # 检查是否支持 tool calling - self._supports_tools = self._check_tool_support() - - def _check_tool_support(self) -> bool: - """检查模型是否支持 tool calling""" - # GPT-4, Claude 支持 tool calling - # 简单的判断逻辑 - model_lower = self.model.lower() - if "gpt-4" in model_lower or "claude" in model_lower: - return True - return True # 默认支持 - - async def chat_with_retry( - self, - messages: list[dict[str, Any]], - tools: list[dict[str, Any]] | None = None, - model: str | None = None, - max_tokens: int | None = None, - temperature: float | None = None, - ) -> LLMResponse: - """ - 发送聊天请求(支持 tool calling) - - Args: - messages: 消息列表 - tools: 工具定义列表 - model: 模型名称(可选) - max_tokens: 最大 tokens(可选) - temperature: 温度(可选) - - Returns: - LLMResponse: 包含内容和/或工具调用 - """ - model = model or self.model - max_tokens = max_tokens or self.max_tokens - temperature = temperature or self.temperature - - try: - # 使用流式调用来获取完整响应 - response = await self._llm.client.chat.completions.create( - model=model, - messages=messages, - tools=tools, - temperature=temperature, - max_tokens=max_tokens, - ) - - message = response.choices[0].message - - # 检查是否有 tool calls - if message.tool_calls and tools: - tool_calls = [] - for tc in message.tool_calls: - tool_calls.append(ToolCallRequest( - id=tc.id, - name=tc.function.name, - arguments=json.loads(tc.function.arguments) if isinstance(tc.function.arguments, str) else tc.function.arguments, - )) - - return LLMResponse( - content=message.content, - tool_calls=tool_calls, - finish_reason="tool_calls", - ) - else: - return LLMResponse( - content=message.content or "", - finish_reason="stop", - ) - - except Exception as e: - return LLMResponse( - content=f"Error calling LLM: {str(e)}", - finish_reason="error", - ) - - async def chat( - self, - messages: list[dict[str, Any]], - tools: list[dict[str, Any]] | None = None, - model: str | None = None, - max_tokens: int = 4096, - temperature: float = 0.7, - ) -> LLMResponse: - """简化的 chat 方法""" - return await self.chat_with_retry( - messages=messages, - tools=tools, - model=model, - max_tokens=max_tokens, - temperature=temperature, - ) - - async def chat_stream( - self, - messages: list[dict[str, Any]], - tools: list[dict[str, Any]] | None = None, - model: str | None = None, - max_tokens: int = 4096, - temperature: float = 0.7, - ): - """流式聊天""" - model = model or self.model - - try: - response = await self._llm.client.chat.completions.create( - model=model, - messages=messages, - tools=tools, - temperature=temperature, - max_tokens=max_tokens, - stream=True, - ) - - async for chunk in response: - if chunk.choices[0].delta.content: - yield chunk.choices[0].delta.content - - except Exception as e: - yield f"Error: {str(e)}" diff --git a/agent/app/xbot/agent.py b/agent/app/xbot/agent.py deleted file mode 100644 index 1692a92..0000000 --- a/agent/app/xbot/agent.py +++ /dev/null @@ -1,309 +0,0 @@ -"""XBot Agent - 封装 nanobot 核心能力的 Agent""" - -import os -from pathlib import Path -from typing import Any, Optional -from datetime import datetime - -from .loop import AgentLoop -from .session import SessionManager -from .adapter import XBotLLMAdapter, LLMResponse -from . import config - -# 尝试导入 simplemem -try: - from simplemem import SimpleMemSystem - HAS_SIMPLEMEM = True -except ImportError: - HAS_SIMPLEMEM = False - - -class SimpleToolRegistry: - """简单的工具注册表""" - - def __init__(self): - self._tools: dict[str, Any] = {} - - def register(self, name: str, func: Any, description: str = "") -> None: - """注册一个工具""" - self._tools[name] = { - "function": func, - "description": description, - } - - def get_definitions(self) -> list[dict]: - """获取工具定义列表""" - tools = [] - for name, tool in self._tools.items(): - tools.append({ - "type": "function", - "function": { - "name": name, - "description": tool.get("description", ""), - "parameters": { - "type": "object", - "properties": {}, - "required": [], - } - } - }) - return tools - - def get(self, name: str) -> Optional[Any]: - """获取工具""" - return self._tools.get(name) - - async def execute(self, name: str, arguments: dict) -> Any: - """执行工具""" - tool = self._tools.get(name) - if not tool: - return f"Tool {name} not found" - - func = tool.get("function") - if not func: - return f"Tool {name} has no function" - - try: - if callable(func): - return await func(**arguments) if hasattr(func, '__await__') else func(**arguments) - return "Tool function is not callable" - except Exception as e: - return f"Tool execution error: {str(e)}" - - -class XBotAgent: - """ - XBot Agent - 基于 nanobot 核心的 Agent 实现 - - 特性: - - 多轮 tool-calling 对话 - - 自动内存压缩 - - 会话历史持久化 - """ - - def __init__( - self, - name: str, - role_description: str, - provider: str = "openai", - model: str = "gpt-4", - api_key: Optional[str] = None, - base_url: Optional[str] = None, - workspace: Optional[Path] = None, - context_window_tokens: int = 200000, - embedding_model: Optional[str] = None, - embedding_base_url: Optional[str] = None, - ): - """ - 初始化 XBot Agent - - Args: - name: Agent 名称 - role_description: Agent 角色描述 - provider: LLM 提供商 - model: 模型名称 - api_key: API Key - base_url: Base URL - workspace: 工作目录(用于存储会话和记忆) - context_window_tokens: 上下文窗口大小 - """ - self.name = name - self.role_description = role_description - - # 使用配置文件的默认值 - if api_key is None: - api_key = config.API_KEY - if base_url is None: - base_url = config.BASE_URL - if workspace is None: - workspace = Path(config.WORKSPACE) - - # 创建工作目录 - self.workspace = workspace - self.workspace.mkdir(parents=True, exist_ok=True) - - # 创建 LLM 适配器 - self.provider = XBotLLMAdapter( - provider=provider, - model_name=model, - api_key=api_key, - base_url=base_url, - ) - - # 创建工具注册表 - self.tools = SimpleToolRegistry() - self._register_default_tools() - - # 创建 Agent Loop - self.agent_loop = AgentLoop( - provider=self.provider, - model=model, - tools=self.tools, - max_iterations=50, - ) - - # 创建会话管理器 - self.sessions = SessionManager(self.workspace) - - # 创建 SimpleMem 记忆系统 - if HAS_SIMPLEMEM and api_key and config.ENABLE_SIMPLEMEM: - # 使用配置文件的 embedding 设置 - emb_model = embedding_model or config.EMBEDDING_MODEL - emb_base = embedding_base_url or config.EMBEDDING_BASE_URL or base_url - - self.memory = SimpleMemSystem( - api_key=api_key, - base_url=emb_base, - model=model, - embedding_model=emb_model, - db_path=str(self.workspace / "memory_db"), - clear_db=False, - # 并行处理配置 - enable_parallel_processing=config.ENABLE_PARALLEL_PROCESSING, - max_parallel_workers=config.MAX_PARALLEL_WORKERS, - enable_parallel_retrieval=config.ENABLE_PARALLEL_RETRIEVAL, - max_retrieval_workers=config.MAX_RETRIEVAL_WORKERS, - enable_planning=config.ENABLE_PLANNING, - enable_reflection=config.ENABLE_REFLECTION, - max_reflection_rounds=config.MAX_REFLECTION_ROUNDS, - ) - self._use_simplemem = True - print(f"SimpleMem initialized with embedding: {emb_model}, base_url: {emb_base}") - else: - self.memory = None - self._use_simplemem = False - if not api_key: - print("Warning: No API key provided, SimpleMem will be disabled") - - def _register_default_tools(self) -> None: - """注册默认工具""" - # 可以在这里添加默认工具 - pass - - def register_tool( - self, - name: str, - func: Any, - description: str = "", - parameters: Optional[dict] = None, - ) -> None: - """注册自定义工具""" - tool_def = { - "type": "function", - "function": { - "name": name, - "description": description, - "parameters": parameters or { - "type": "object", - "properties": {}, - "required": [], - } - } - } - # 存储在 tools 中 - self.tools.register(name, func, description) - - async def run( - self, - user_input: str, - session_id: str = "default", - ) -> dict[str, Any]: - """ - 运行 Agent 对话 - - Args: - user_input: 用户输入 - session_id: 会话 ID - - Returns: - dict: 包含 content, tool_calls 等 - """ - # 获取或创建会话 - session = self.sessions.get_or_create(session_id) - - # 构建系统提示 - system_prompt = f"""你是 {self.name}。 -{self.role_description} - -请根据用户的问题回答,并使用 Markdown 格式输出。""" - - # 如果使用 SimpleMem,检索相关记忆 - memory_context = "" - if self._use_simplemem and self.memory: - try: - memory_context = self.memory.ask(user_input) - except Exception as e: - print(f"Memory retrieval error: {e}") - - if memory_context: - system_prompt += f"\n\n相关记忆:\n{memory_context}" - - # 获取历史消息 - history = session.get_history(max_messages=50) - - # 构建初始消息 - initial_messages = history + [ - {"role": "user", "content": user_input} - ] - - # 运行 agent loop - final_content, tools_used, all_messages = await self.agent_loop.run_loop( - initial_messages=initial_messages, - system_prompt=system_prompt, - ) - - # 保存到会话 - for m in all_messages[len(history):]: - session.messages.append(m) - self.sessions.save(session) - - # 保存到 SimpleMem 记忆 - if self._use_simplemem and self.memory and final_content: - try: - self.memory.add_dialogue("User", user_input, datetime.now().isoformat()) - self.memory.add_dialogue(self.name, final_content, datetime.now().isoformat()) - self.memory.finalize() - except Exception as e: - print(f"Memory save error: {e}") - - return { - "content": final_content or "No response", - "tool_calls": tools_used, - "session_id": session_id, - } - - async def run_stream( - self, - user_input: str, - session_id: str = "default", - ): - """ - 运行 Agent 对话(流式输出) - - 先完整执行 agent loop,最后流式输出结果 - - Args: - user_input: 用户输入 - session_id: 会话 ID - - Yields: - str: 流式回复片段 - """ - # 先完整执行 agent loop(包含 tool-calling) - result = await self.run(user_input, session_id) - content = result["content"] - - # 流式输出结果 - for char in content: - yield char - - def clear_session(self, session_id: str) -> None: - """清除会话""" - session = self.sessions.get_or_create(session_id) - session.clear() - self.sessions.save(session) - self.sessions.invalidate(session_id) - - def list_sessions(self) -> list[dict]: - """列出所有会话""" - return self.sessions.list_sessions() diff --git a/agent/app/xbot/config.py b/agent/app/xbot/config.py deleted file mode 100644 index a58adf6..0000000 --- a/agent/app/xbot/config.py +++ /dev/null @@ -1,74 +0,0 @@ -""" -XBot 配置文件 -""" - -# ==================== LLM 配置 ==================== - -# 默认 LLM 提供商 -DEFAULT_PROVIDER = "openai" - -# 默认模型 -DEFAULT_MODEL = "gpt-4" - -# API Key(建议使用环境变量) -import os -API_KEY = os.getenv("OPENAI_API_KEY", "") - -# Base URL -BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") - - -# ==================== SimpleMem 记忆配置 ==================== - -# 是否启用 SimpleMem -ENABLE_SIMPLEMEM = True - -# Embedding 模型 -# 推荐: text-embedding-3-small, text-embedding-3-large, text-embedding-ada-002 -# 或使用 Qwen: Qwen/Qwen3-Embedding-0.6B -EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small") - -# Embedding 服务的 Base URL(可选,默认使用 BASE_URL) -EMBEDDING_BASE_URL = os.getenv("EMBEDDING_BASE_URL", "") - - -# ==================== 并行处理配置 ==================== - -# 是否启用并行处理 -ENABLE_PARALLEL_PROCESSING = True -MAX_PARALLEL_WORKERS = 8 - -# 是否启用并行检索 -ENABLE_PARALLEL_RETRIEVAL = True -MAX_RETRIEVAL_WORKERS = 4 - -# 是否启用规划 -ENABLE_PLANNING = True - -# 是否启用反思 -ENABLE_REFLECTION = True -MAX_REFLECTION_ROUNDS = 2 - - -# ==================== 工作目录 ==================== - -# 工作目录(用于存储会话和记忆) -WORKSPACE = os.getenv("XAGENT_WORKSPACE", "./xbot_workspace") - -# 上下文窗口大小 -CONTEXT_WINDOW_TOKENS = 200000 - - -# ==================== Agent 配置 ==================== - -# 默认 Agent 配置 -DEFAULT_AGENTS = { - 1: { - "name": "数据分析助手", - "role_description": "你是一个专业的数据分析助手,擅长分析数据、生成报告。", - }, - 2: { - "name": "代码审查助手", - "role_description": "你是一个专业的代码审查助手,擅长审查代码、发现bug。", - }, -} diff --git a/agent/app/xbot/loop.py b/agent/app/xbot/loop.py deleted file mode 100644 index 66f9c70..0000000 --- a/agent/app/xbot/loop.py +++ /dev/null @@ -1,190 +0,0 @@ -"""Agent loop for tool-calling conversation.""" - -import asyncio -import json -import re -from typing import Any, Callable, Optional - -from loguru import logger - - -class AgentLoop: - """ - Agent loop with tool-calling capability. - - This is the core of the nanobot agent - it handles: - - Multi-turn conversation with the LLM - - Tool execution when the model requests it - - Progress callbacks for streaming responses - """ - - _TOOL_RESULT_MAX_CHARS = 50000 - - def __init__( - self, - provider: Any, - model: str, - tools: Any, - max_iterations: int = 50, - ): - """ - Initialize the agent loop. - - Args: - provider: LLM provider (must implement chat_with_retry) - model: Model name - tools: Tool registry (must have get_definitions() and execute()) - max_iterations: Maximum tool call iterations - """ - self.provider = provider - self.model = model - self.tools = tools - self.max_iterations = max_iterations - - @staticmethod - def _strip_think(text: Optional[str]) -> Optional[str]: - """Strip model thinking blocks from content.""" - if not text: - return None - # Strip tags commonly used by models like DeepSeek - pattern = r"[\s\S]*?" - text = re.sub(pattern, "", text) - return text.strip() or None - - @staticmethod - def _tool_hint(tool_calls: list) -> str: - """Format tool calls as concise hint.""" - def _fmt(tc): - args = tc.arguments or {} - val = next(iter(args.values()), None) if isinstance(args, dict) else None - if not isinstance(val, str): - return tc.name - return f'{tc.name}("{val[:40]}...")' if len(val) > 40 else f'{tc.name}("{val}")' - return ", ".join(_fmt(tc) for tc in tool_calls) - - async def run_loop( - self, - initial_messages: list[dict], - system_prompt: str = "", - on_progress: Optional[Callable[..., Any]] = None, - ) -> tuple[Optional[str], list[str], list[dict]]: - """ - Run the agent iteration loop. - - Args: - initial_messages: Starting message list - system_prompt: System prompt to prepend - on_progress: Optional callback for progress updates - - Returns: - Tuple of (final_content, tools_used, all_messages) - """ - # Prepend system prompt if provided - if system_prompt: - messages = [{"role": "system", "content": system_prompt}] + initial_messages - else: - messages = initial_messages - - iteration = 0 - final_content = None - tools_used: list[str] = [] - - while iteration < self.max_iterations: - iteration += 1 - - tool_defs = self.tools.get_definitions() if self.tools else [] - - response = await self.provider.chat_with_retry( - messages=messages, - tools=tool_defs, - model=self.model, - ) - - if response.has_tool_calls: - # Send progress update - if on_progress: - thought = self._strip_think(response.content) - if thought: - await on_progress(thought) - await on_progress(self._tool_hint(response.tool_calls), tool_hint=True) - - # Add assistant message with tool calls - tool_call_dicts = [ - tc.to_openai_tool_call() if hasattr(tc, 'to_openai_tool_call') else tc - for tc in response.tool_calls - ] - - messages = self._add_assistant_message( - messages, response.content, tool_call_dicts, - reasoning_content=getattr(response, 'reasoning_content', None), - ) - - # Execute tools - for tool_call in response.tool_calls: - tools_used.append(tool_call.name) - args_str = json.dumps(tool_call.arguments, ensure_ascii=False) - logger.info("Tool call: {}({})", tool_call.name, args_str[:200]) - - result = await self.tools.execute(tool_call.name, tool_call.arguments) - messages = self._add_tool_result(messages, tool_call.id, tool_call.name, result) - else: - clean = self._strip_think(response.content) - - # Handle error responses - if response.finish_reason == "error": - logger.error("LLM returned error: {}", (clean or "")[:200]) - final_content = clean or "Sorry, I encountered an error calling the AI model." - break - - messages = self._add_assistant_message( - messages, clean, - reasoning_content=getattr(response, 'reasoning_content', None), - ) - final_content = clean - break - - if final_content is None and iteration >= self.max_iterations: - logger.warning("Max iterations ({}) reached", self.max_iterations) - final_content = ( - f"I reached the maximum number of tool call iterations ({self.max_iterations}) " - "without completing the task." - ) - - return final_content, tools_used, messages - - def _add_assistant_message( - self, - messages: list[dict], - content: Optional[str], - tool_calls: Optional[list[dict]] = None, - reasoning_content: Optional[str] = None, - ) -> list[dict]: - """Add an assistant message to the message list.""" - msg: dict[str, Any] = {"role": "assistant", "content": content} - if tool_calls: - msg["tool_calls"] = tool_calls - if reasoning_content is not None: - msg["reasoning_content"] = reasoning_content - messages.append(msg) - return messages - - def _add_tool_result( - self, - messages: list[dict], - tool_call_id: str, - tool_name: str, - result: Any, - ) -> list[dict]: - """Add a tool result message to the message list.""" - # Truncate large results - content = str(result) - if len(content) > self._TOOL_RESULT_MAX_CHARS: - content = content[:self._TOOL_RESULT_MAX_CHARS] + "\n... (truncated)" - - messages.append({ - "role": "tool", - "tool_call_id": tool_call_id, - "name": tool_name, - "content": content, - }) - return messages diff --git a/agent/app/xbot/memory.py b/agent/app/xbot/memory.py deleted file mode 100644 index f49299e..0000000 --- a/agent/app/xbot/memory.py +++ /dev/null @@ -1,240 +0,0 @@ -"""Memory system for persistent agent memory.""" - -import json -import asyncio -import weakref -from pathlib import Path -from typing import Any, Callable, Optional - -try: - import tiktoken - HAS_TIKTOKEN = True -except ImportError: - HAS_TIKTOKEN = False - - -_SAVE_MEMORY_TOOL = [ - { - "type": "function", - "function": { - "name": "save_memory", - "description": "Save the memory consolidation result to persistent storage.", - "parameters": { - "type": "object", - "properties": { - "history_entry": { - "type": "string", - "description": "A paragraph summarizing key events/decisions/topics.", - }, - "memory_update": { - "type": "string", - "description": "Full updated long-term memory as markdown. Include all existing facts plus new ones.", - }, - }, - "required": ["history_entry", "memory_update"], - }, - }, - } -] - - -class MemoryStore: - """Two-layer memory: MEMORY.md (long-term facts) + HISTORY.md (grep-searchable log).""" - - def __init__(self, workspace: Path): - self.memory_dir = workspace / "memory" - self.memory_dir.mkdir(parents=True, exist_ok=True) - self.memory_file = self.memory_dir / "MEMORY.md" - self.history_file = self.memory_dir / "HISTORY.md" - - def read_long_term(self) -> str: - if self.memory_file.exists(): - return self.memory_file.read_text(encoding="utf-8") - return "" - - def write_long_term(self, content: str) -> None: - self.memory_file.write_text(content, encoding="utf-8") - - def append_history(self, entry: str) -> None: - with open(self.history_file, "a", encoding="utf-8") as f: - f.write(entry.rstrip() + "\n\n") - - def get_memory_context(self) -> str: - long_term = self.read_long_term() - return f"## Long-term Memory\n{long_term}" if long_term else "" - - -def _estimate_tokens(text: str) -> int: - """Estimate token count.""" - if HAS_TIKTOKEN: - try: - enc = tiktoken.get_encoding("cl100k_base") - return len(enc.encode(text)) - except Exception: - pass - return max(1, len(text) // 4) - - -def _estimate_message_tokens(message: dict[str, Any]) -> int: - """Estimate prompt tokens for a message.""" - content = message.get("content") - parts = [] - - if isinstance(content, str): - parts.append(content) - elif isinstance(content, list): - for part in content: - if isinstance(part, dict) and part.get("type") == "text": - text = part.get("text", "") - if text: - parts.append(text) - else: - parts.append(json.dumps(part, ensure_ascii=False)) - elif content is not None: - parts.append(json.dumps(content, ensure_ascii=False)) - - for key in ("name", "tool_call_id"): - value = message.get(key) - if isinstance(value, str) and value: - parts.append(value) - if message.get("tool_calls"): - parts.append(json.dumps(message["tool_calls"], ensure_ascii=False)) - - payload = "\n".join(parts) - return max(1, _estimate_tokens(payload)) if payload else 1 - - -class MemoryConsolidator: - """Owns consolidation policy, locking, and session offset updates.""" - - def __init__( - self, - workspace: Path, - provider: Any, - model: str, - sessions: Any, - context_window_tokens: int = 200000, - ): - self.store = MemoryStore(workspace) - self.provider = provider - self.model = model - self.sessions = sessions - self.context_window_tokens = context_window_tokens - self._locks: weakref.WeakValueDictionary[str, asyncio.Lock] = weakref.WeakValueDictionary() - - def get_lock(self, session_key: str) -> asyncio.Lock: - """Return the shared consolidation lock for one session.""" - return self._locks.setdefault(session_key, asyncio.Lock()) - - async def consolidate_messages(self, messages: list[dict[str, object]]) -> bool: - """Archive a selected message chunk into persistent memory.""" - if not messages: - return True - - current_memory = self.store.read_long_term() - prompt = f"""Process this conversation and call the save_memory tool. - -## Current Long-term Memory -{current_memory or "(empty)"} - -## Conversation to Process -{self._format_messages(messages)}""" - - try: - response = await self.provider.chat_with_retry( - messages=[ - {"role": "system", "content": "You are a memory consolidation agent."}, - {"role": "user", "content": prompt}, - ], - tools=_SAVE_MEMORY_TOOL, - model=self.model, - ) - - if not response.has_tool_calls: - return False - - args = response.tool_calls[0].arguments - if isinstance(args, str): - args = json.loads(args) - if isinstance(args, list): - args = args[0] if args else {} - - if entry := args.get("history_entry"): - self.store.append_history(str(entry)) - if update := args.get("memory_update"): - update = str(update) - if update != current_memory: - self.store.write_long_term(update) - - return True - except Exception: - return False - - def _format_messages(self, messages: list[dict]) -> str: - lines = [] - for message in messages: - if not message.get("content"): - continue - lines.append( - f"[{message.get('timestamp', '?')[:16]}] {message['role'].upper()}: {message['content']}" - ) - return "\n".join(lines) - - def pick_consolidation_boundary( - self, - session: Any, - tokens_to_remove: int, - ) -> Optional[tuple[int, int]]: - """Pick a user-turn boundary that removes enough old prompt tokens.""" - start = session.last_consolidated - if start >= len(session.messages) or tokens_to_remove <= 0: - return None - - removed_tokens = 0 - last_boundary: Optional[tuple[int, int]] = None - for idx in range(start, len(session.messages)): - message = session.messages[idx] - if idx > start and message.get("role") == "user": - last_boundary = (idx, removed_tokens) - if removed_tokens >= tokens_to_remove: - return last_boundary - removed_tokens += _estimate_message_tokens(message) - - return last_boundary - - async def archive_unconsolidated(self, session: Any) -> bool: - """Archive the full unconsolidated tail for /new-style session rollover.""" - lock = self.get_lock(session.key) - async with lock: - snapshot = session.messages[session.last_consolidated:] - if not snapshot: - return True - return await self.consolidate_messages(snapshot) - - async def maybe_consolidate_by_tokens(self, session: Any) -> None: - """Loop: archive old messages until prompt fits within half the context window.""" - if not session.messages or self.context_window_tokens <= 0: - return - - lock = self.get_lock(session.key) - async with lock: - target = self.context_window_tokens // 2 - # Simple estimation without full prompt build - estimated = sum(_estimate_message_tokens(m) for m in session.messages[session.last_consolidated:]) - - if estimated < self.context_window_tokens: - return - - # Find boundary and consolidate - boundary = self.pick_consolidation_boundary(session, max(1, estimated - target)) - if boundary is None: - return - - end_idx = boundary[0] - chunk = session.messages[session.last_consolidated:end_idx] - if not chunk: - return - - if await self.consolidate_messages(chunk): - session.last_consolidated = end_idx - self.sessions.save(session) diff --git a/agent/app/xbot/session.py b/agent/app/xbot/session.py deleted file mode 100644 index f7d442b..0000000 --- a/agent/app/xbot/session.py +++ /dev/null @@ -1,169 +0,0 @@ -"""Session management for conversation history.""" - -import json -import shutil -from dataclasses import dataclass, field -from datetime import datetime -from pathlib import Path -from typing import Any, Optional - - -@dataclass -class Session: - """ - A conversation session. - - Stores messages in JSONL format for easy reading and persistence. - """ - - key: str # session_id - messages: list[dict[str, Any]] = field(default_factory=list) - created_at: datetime = field(default_factory=datetime.now) - updated_at: datetime = field(default_factory=datetime.now) - metadata: dict[str, Any] = field(default_factory=dict) - last_consolidated: int = 0 # Number of messages already consolidated to files - - def add_message(self, role: str, content: str, **kwargs: Any) -> None: - """Add a message to the session.""" - msg = { - "role": role, - "content": content, - "timestamp": datetime.now().isoformat(), - **kwargs - } - self.messages.append(msg) - self.updated_at = datetime.now() - - def get_history(self, max_messages: int = 500) -> list[dict[str, Any]]: - """Return unconsolidated messages for LLM input, aligned to a user turn.""" - unconsolidated = self.messages[self.last_consolidated:] - sliced = unconsolidated[-max_messages:] - - # Drop leading non-user messages to avoid orphaned tool_result blocks - for i, m in enumerate(sliced): - if m.get("role") == "user": - sliced = sliced[i:] - break - - out: list[dict[str, Any]] = [] - for m in sliced: - entry: dict[str, Any] = {"role": m["role"], "content": m.get("content", "")} - for k in ("tool_calls", "tool_call_id", "name"): - if k in m: - entry[k] = m[k] - out.append(entry) - return out - - def clear(self) -> None: - """Clear all messages and reset session to initial state.""" - self.messages = [] - self.last_consolidated = 0 - self.updated_at = datetime.now() - - -class SessionManager: - """Manages conversation sessions stored as JSONL files.""" - - def __init__(self, workspace: Path): - self.workspace = workspace - self.sessions_dir = workspace / "sessions" - self.sessions_dir.mkdir(parents=True, exist_ok=True) - self._cache: dict[str, Session] = {} - - def _get_session_path(self, key: str) -> Path: - """Get the file path for a session.""" - safe_key = key.replace(":", "_").replace("/", "_") - return self.sessions_dir / f"{safe_key}.jsonl" - - def get_or_create(self, key: str) -> Session: - """Get an existing session or create a new one.""" - if key in self._cache: - return self._cache[key] - - session = self._load(key) - if session is None: - session = Session(key=key) - - self._cache[key] = session - return session - - def _load(self, key: str) -> Optional[Session]: - """Load a session from disk.""" - path = self._get_session_path(key) - if not path.exists(): - return None - - try: - messages = [] - metadata = {} - created_at = None - last_consolidated = 0 - - with open(path, encoding="utf-8") as f: - for line in f: - line = line.strip() - if not line: - continue - - data = json.loads(line) - - if data.get("_type") == "metadata": - metadata = data.get("metadata", {}) - created_at = datetime.fromisoformat(data["created_at"]) if data.get("created_at") else None - last_consolidated = data.get("last_consolidated", 0) - else: - messages.append(data) - - return Session( - key=key, - messages=messages, - created_at=created_at or datetime.now(), - metadata=metadata, - last_consolidated=last_consolidated - ) - except Exception: - return None - - def save(self, session: Session) -> None: - """Save a session to disk.""" - path = self._get_session_path(session.key) - - with open(path, "w", encoding="utf-8") as f: - metadata_line = { - "_type": "metadata", - "key": session.key, - "created_at": session.created_at.isoformat(), - "updated_at": session.updated_at.isoformat(), - "metadata": session.metadata, - "last_consolidated": session.last_consolidated - } - f.write(json.dumps(metadata_line, ensure_ascii=False) + "\n") - for msg in session.messages: - f.write(json.dumps(msg, ensure_ascii=False) + "\n") - - self._cache[session.key] = session - - def invalidate(self, key: str) -> None: - """Remove a session from the in-memory cache.""" - self._cache.pop(key, None) - - def list_sessions(self) -> list[dict[str, Any]]: - """List all sessions.""" - sessions = [] - for path in self.sessions_dir.glob("*.jsonl"): - try: - with open(path, encoding="utf-8") as f: - first_line = f.readline().strip() - if first_line: - data = json.loads(first_line) - if data.get("_type") == "metadata": - sessions.append({ - "key": data.get("key") or path.stem, - "created_at": data.get("created_at"), - "updated_at": data.get("updated_at"), - "path": str(path) - }) - except Exception: - continue - - return sorted(sessions, key=lambda x: x.get("updated_at", ""), reverse=True) diff --git a/agent/requirements.txt b/agent/requirements.txt deleted file mode 100644 index 97f7f47..0000000 --- a/agent/requirements.txt +++ /dev/null @@ -1,11 +0,0 @@ -fastapi>=0.100.0 -uvicorn[standard]>=0.23.0 -pydantic>=2.0.0 -openai>=1.0.0 -anthropic>=0.18.0 -python-dotenv>=1.0.0 -aiohttp>=3.8.0 -redis>=5.0.0 -loguru>=0.7.0 -tiktoken>=0.12.0 -simplemem>=0.1.0 diff --git a/core/agents/skills/user/skills@xiaohongshu-creator/SKILL.md b/core/agents/skills/user/skills@xiaohongshu-creator/SKILL.md deleted file mode 100644 index 827bf3d..0000000 --- a/core/agents/skills/user/skills@xiaohongshu-creator/SKILL.md +++ /dev/null @@ -1,315 +0,0 @@ ---- -name: openakita/skills@xiaohongshu-creator -description: Create engaging Xiaohongshu (RED/小红书) content including titles, body text, hashtags, and image style recommendations. Supports multiple content types such as product reviews, tutorials, lifestyle sharing, and shopping guides with platform-specific optimization. -license: MIT -metadata: - author: openakita - version: "1.0.0" ---- - -# 小红书内容创作助手 - -专为小红书平台打造的内容创作技能,帮助你生成符合平台调性的高质量笔记,涵盖标题、正文、话题标签和配图建议。 - -## 适用场景 - -- 撰写种草笔记(好物推荐、购物分享) -- 撰写产品测评笔记 -- 撰写教程类笔记(美妆、穿搭、美食、DIY) -- 撰写生活分享笔记(旅行、日常、打卡) -- 品牌合作内容创作 -- 小红书账号运营内容规划 -- 批量生成笔记框架 - -## 核心创作规范 - -### 一、标题规则 - -标题是笔记的第一印象,直接决定点击率。 - -**硬性要求:** -- 字数限制:**不超过 20 个字符** -- Emoji 数量:**1-2 个**,放在标题开头或结尾 -- 禁止使用:感叹号堆叠(`!!!`)、全大写字母 - -**钩子元素(至少使用 1 种):** - -| 钩子类型 | 说明 | 示例 | -|----------|------|------| -| 数字法 | 用数字制造具体感 | `3步搞定通勤妆🌟` | -| 反差法 | 制造意外感 | `月薪3K穿出3W的感觉✨` | -| 痛点法 | 直击目标人群痛点 | `黄皮亲妈色号合集🎨` | -| 悬念法 | 引发好奇心 | `这个习惯让我瘦了20斤💪` | -| 对比法 | 前后/AB对比 | `早C晚A一个月的变化🔥` | -| 权威法 | 借势专业背书 | `皮肤科医生推荐的面霜💊` | -| 场景法 | 具体使用场景 | `约会前30分钟急救妆容💄` | -| 共鸣法 | 引发情感共鸣 | `打工人的高效早餐方案☀️` | - -**标题模板:** -``` -[数字] + [核心关键词] + [利益点] + [emoji] -[身份标签] + [动作] + [结果] + [emoji] -[场景] + [解决方案] + [emoji] -``` - -### 二、正文结构 - -正文长度控制在 **300-500 字符**,遵循四段式结构: - -``` -🔥 Hook(开头钩子) —— 1-2 句,抓住注意力 -📝 Core(核心内容) —— 主体信息,有价值的干货 -📌 Summary(总结) —— 精炼要点 -👉 CTA(行动号召) —— 引导互动 -``` - -**Hook 写法:** -- 提问式:`你们有没有这种烦恼?` -- 共鸣式:`每个打工人都需要这个!` -- 悬念式:`用了三年终于找到了最好用的...` -- 成果式:`坚持30天,效果太惊人了` - -**Core 写法要点:** -- 使用 emoji 分点(📍🔸💡等)替代纯文字列表 -- 每个要点控制在 1-2 行 -- 穿插个人体验和感受(增加真实感) -- 重要信息加【】或「」标注 -- 适当使用换行,避免大段文字 - -**CTA 常用句式:** -- `觉得有用的话记得点赞收藏哦~` -- `你们还想看什么类型的分享?评论区告诉我` -- `有同款的姐妹举个手🙋‍♀️` -- `关注我,持续分享[领域]干货` - -### 三、话题标签规则 - -每篇笔记配 **8 个话题标签**,按以下比例分配: - -| 类别 | 数量 | 说明 | 示例 | -|------|------|------|------| -| 核心词 | 2 个 | 笔记主题精准关键词 | `#面膜推荐` `#保湿面膜` | -| 品类词 | 2 个 | 所属品类/领域 | `#护肤` `#美妆好物` | -| 场景词 | 2 个 | 使用场景/人群 | `#学生党护肤` `#换季护肤` | -| 热门词 | 2 个 | 平台热门话题 | `#好物分享` `#我的爱用物` | - -**选择原则:** -- 优先选择搜索量大但竞争适中的标签 -- 避免过于宽泛的标签(如 `#生活`) -- 包含长尾关键词提升搜索曝光 -- 关注平台当前热门话题榜 - -### 四、配图风格建议 - -小红书是视觉驱动平台,封面决定 80% 的点击率。 - -**10 种推荐视觉风格:** - -| 编号 | 风格 | 适用类型 | 要点 | -|------|------|---------|------| -| 1 | 对比拼图 | 测评/效果展示 | 左右或上下对比,标注差异 | -| 2 | 清单图 | 好物合集/推荐 | 白底九宫格产品陈列 | -| 3 | 教程步骤图 | 教程类 | 编号标注步骤,清晰易跟 | -| 4 | 文字封面 | 干货分享 | 大字标题+简洁背景色 | -| 5 | 场景氛围图 | 生活分享/穿搭 | 自然光,生活感强 | -| 6 | 数据图表 | 测评/科普 | 简化数据可视化 | -| 7 | 手绘/插画风 | 知识科普 | 可爱风格信息图 | -| 8 | Vlog截图 | 日常分享 | 视频关键帧+文字标注 | -| 9 | 实拍特写 | 产品种草 | 高清细节,突出质感 | -| 10 | Ins风简约 | 穿搭/家居 | 低饱和度,高级感 | - -**封面设计通用原则:** -- 尺寸比例:**3:4**(1080×1440px)最佳 -- 文字不超过封面面积的 **20%** -- 核心信息放在画面上半部分(feed 流裁剪安全区) -- 色彩鲜明、对比度高 -- 避免过度 P 图,保持真实感 - -## 内容类型工作流 - -### 工作流一:种草笔记 - -``` -输入 → 产品名称、品类、价格、目标人群 - ↓ -Step 1: 生成 3 个标题方案(痛点法/数字法/场景法各一) - ↓ -Step 2: 撰写正文 - - Hook:个人使用感受/发现契机 - - Core:产品亮点(3-5个)、使用方法、适合人群 - - Summary:一句话总结推荐理由 - - CTA:引导收藏和讨论 - ↓ -Step 3: 生成 8 个话题标签 - ↓ -Step 4: 封面建议(推荐风格 9 实拍特写 或 风格 2 清单图) - ↓ -输出 → 完整笔记(可直接发布) -``` - -### 工作流二:测评笔记 - -``` -输入 → 产品列表(2-5 个)、测评维度 - ↓ -Step 1: 标题使用对比法或数字法 - ↓ -Step 2: 撰写正文 - - Hook:测评动机/痛点引入 - - Core:逐项对比(成分/价格/使用感/性价比) - - Summary:各产品评分或排名 - - CTA:`你们用过哪个?评论区聊聊` - ↓ -Step 3: 话题标签(增加品牌词标签) - ↓ -Step 4: 封面建议(推荐风格 1 对比拼图 或 风格 6 数据图表) - ↓ -输出 → 完整测评笔记 -``` - -### 工作流三:教程笔记 - -``` -输入 → 教程主题、难度、目标人群 - ↓ -Step 1: 标题使用数字法(`X步学会...`) - ↓ -Step 2: 撰写正文 - - Hook:学会后的效果/价值 - - Core:分步骤讲解(每步 1-2 句) - - Summary:关键注意事项 - - CTA:`学会的打个✅` - ↓ -Step 3: 话题标签(增加 `#教程` `#手把手教学` 等) - ↓ -Step 4: 封面建议(推荐风格 3 步骤图 或 风格 4 文字封面) - ↓ -输出 → 完整教程笔记 + 多图建议(每步一张图) -``` - -### 工作流四:生活分享笔记 - -``` -输入 → 分享主题、场景、情绪基调 - ↓ -Step 1: 标题使用共鸣法或场景法 - ↓ -Step 2: 撰写正文 - - Hook:故事开头/情感切入 - - Core:分享细节、个人感受、实用信息 - - Summary:感悟或建议 - - CTA:`你们有类似的经历吗?` - ↓ -Step 3: 话题标签(增加情感/生活方式标签) - ↓ -Step 4: 封面建议(推荐风格 5 场景氛围图 或 风格 8 Vlog截图) - ↓ -输出 → 完整分享笔记 -``` - -## 完整示例 - -### 示例:护肤种草笔记 - -**用户输入:** `帮我写一篇关于珂润面霜的种草笔记,目标人群是敏感肌学生党` - -**输出:** - -**标题方案:** -1. `敏感肌救星!这罐面霜用了3年回购🧴` -2. `学生党百元面霜天花板💰` -3. `换季烂脸?皮肤科医生同款面霜🏥` - -**正文(选用标题1):** - -``` -姐妹们!!敏感肌的福音真的来了 - -作为一个烂脸三年的敏感肌 终于找到了真正好用的面霜 - -📍珂润面霜使用感受: -🔸质地:乳白色霜状 不厚重 好推开 -🔸保湿:神经酰胺成分 修护屏障一绝 -🔸温和度:无香精无酒精 敏感期也能用 -🔸性价比:百元出头 学生党无压力 - -💡使用tips: -1️⃣ 先乳后霜 吸收更好 -2️⃣ 换季加量使用 厚涂当睡眠面膜 -3️⃣ 搭配同系列化妆水效果翻倍 - -用了整整三年 每次回购都是囤两罐的节奏 -敏感肌姐妹真的可以闭眼入! - -觉得有用记得收藏💕 还想看什么评论区告诉我~ -``` - -**话题标签:** -`#珂润面霜` `#敏感肌护肤` `#学生党护肤` `#平价面霜推荐` `#神经酰胺` `#换季护肤` `#好物分享` `#护肤干货` - -**封面建议:** 风格 9(实拍特写),自然光下拍摄面霜质地细节图,辅以手写标注关键成分。 - -## 高级技巧 - -### 发布时间建议 - -| 时段 | 说明 | -|------|------| -| 7:00-9:00 | 通勤高峰,碎片化浏览 | -| 12:00-14:00 | 午休时间,浏览高峰 | -| 18:00-20:00 | 下班后休闲浏览 | -| 21:00-23:00 | 睡前黄金时段,互动率最高 | - -### SEO 优化 - -- 标题和正文前 50 字包含核心关键词 -- 使用平台搜索下拉词作为参考 -- 正文自然融入 3-5 个相关关键词(避免堆砌) -- 评论区补充关键词(自评增加曝光) - -### 互动率提升 - -- 正文中设置互动问题(`你们觉得呢?`) -- 结尾提供选择题(`A还是B?评论区投票`) -- 24 小时内回复所有评论 -- 置顶评论放重要补充信息 - -## 常见误区 - -| 误区 | 正确做法 | -|------|---------| -| 标题过长(超20字) | 精炼到 20 字以内,信息密度优先 | -| 正文大段不分行 | 每 2-3 行空一行,用 emoji 分隔 | -| 标签太宽泛 | 组合使用泛词+精准词+长尾词 | -| 封面文字太多 | 封面突出视觉冲击,详细信息放正文 | -| 纯广告无真实感 | 加入个人体验和真实细节 | -| 内容同质化严重 | 找到独特切入角度(身份/场景/反差) | -| 忽略评论区运营 | 主动回复并引导二次互动 | - -## 输出格式规范 - -每次生成内容时,严格按以下格式输出: - -```markdown -## 📝 小红书笔记 - -### 标题方案 -1. [方案一] -2. [方案二] -3. [方案三] - -### 正文 -[完整正文内容] - -### 话题标签 -[8个标签] - -### 封面建议 -- 推荐风格:[编号+名称] -- 具体建议:[详细说明] -- 配色方案:[色系建议] - -### 发布建议 -- 推荐时段:[具体时间] -- 注意事项:[补充说明] -```