chore: 删除旧版 agent/app 模块

移除已废弃的旧版 agent 实现,使用新的 core/agents 模块替代

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-13 21:24:20 +08:00
parent 7b5d4b20a5
commit 194fe22e26
26 changed files with 0 additions and 3292 deletions

View File

@@ -1 +0,0 @@
# X-Agents Python Agent Engine

View File

@@ -1,3 +0,0 @@
# Agent Core
from app.agent.core.agent import AgentCore, AgentConfig, AgentResponse
from app.agent.core.supervisor import Supervisor

View File

@@ -1,170 +0,0 @@
"""
Agent Core - 单智能体核心
"""
import logging
from typing import Optional, List, Dict, Any
from pydantic import BaseModel
from app.agent.memory.manager import MemoryManager
from app.agent.skills.router import SkillRouter
from app.agent.skills.executor import SkillExecutor
from app.agent.llm.factory import LLMFactory
logger = logging.getLogger("agent.core")
class AgentConfig(BaseModel):
"""智能体配置"""
id: int
name: str
role_description: str
model_provider: str = "openai"
model_name: str = "gpt-4"
api_key: Optional[str] = None # API Key可选用于覆盖默认配置
base_url: Optional[str] = None # Base URL可选用于覆盖默认配置
skills: List[int] = [] # 技能 ID 列表
knowledge_base_ids: List[int] = []
timeout: int = 60
memory_limit: int = 134217728 # 128MB
class AgentResponse(BaseModel):
"""智能体响应"""
content: str
tool_calls: List[Dict[str, Any]] = []
tokens_used: int = 0
duration_ms: int = 0
session_id: Optional[str] = None
class AgentCore:
"""单智能体核心类"""
def __init__(self, config: AgentConfig):
self.config = config
# 记录 LLM 初始化信息
api_key_info = f"{config.api_key[:10]}..." if config.api_key else "None"
logger.info(f"初始化 AgentCore: name={config.name}, provider={config.model_provider}, model={config.model_name}, api_key={api_key_info}, base_url={config.base_url}")
self.llm = LLMFactory.create(config.model_provider, config.model_name, config.api_key, config.base_url)
self.memory = MemoryManager(config.id)
self.skill_router = SkillRouter(config.skills)
self.skill_executor = SkillExecutor()
async def run(self, user_input: str, user_id: int, session_id: str) -> AgentResponse:
"""
执行智能体对话
Args:
user_input: 用户输入
user_id: 用户 ID
session_id: 会话 ID
Returns:
AgentResponse: 智能体响应
"""
import time
start_time = time.time()
try:
# 1. 加载记忆
context = await self.memory.load_context(user_input, user_id, session_id)
# 2. 构建 Prompt
prompt = self._build_prompt(user_input, context)
# 3. LLM 决策
decision = await self.llm.decide(prompt)
# 4. 执行技能(如需)
if decision.get('needs_skill'):
skill_results = await self._execute_skills(decision.get('tool_calls', []))
# 5. 基于结果生成回复
final_response = await self.llm.generate(prompt, skill_results)
else:
final_response = decision.get('response', '')
# 6. 保存记忆
await self.memory.save(user_input, final_response, user_id, session_id)
duration_ms = int((time.time() - start_time) * 1000)
return AgentResponse(
content=final_response,
tool_calls=decision.get('tool_calls', []),
duration_ms=duration_ms,
session_id=session_id
)
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
return AgentResponse(
content=f"处理请求时发生错误: {str(e)}",
duration_ms=duration_ms,
session_id=session_id
)
def _build_prompt(self, user_input: str, context: dict) -> str:
"""构建 Prompt"""
system_prompt = f"""你是 {self.config.name}
{self.config.role_description}
相关记忆:
{context.get('summary', '')}
知识库信息:
{context.get('knowledge', '')}
请根据以上上下文回答用户问题,并使用 Markdown 格式输出。"""
return f"{system_prompt}\n\n用户: {user_input}"
async def _execute_skills(self, skill_decisions: List[Dict]) -> List[Dict]:
"""执行技能"""
if not skill_decisions:
return []
results = []
for decision in skill_decisions:
result = await self.skill_executor.execute(
skill_id=decision.get('skill_id'),
params=decision.get('params', {})
)
results.append(result)
return results
async def run_stream(self, user_input: str, user_id: int, session_id: str):
"""
执行智能体对话(流式输出)
优化:对于简单对话,直接流式生成,跳过 decide 步骤(省一次 LLM 调用)
只有当需要工具时才先判断
Args:
user_input: 用户输入
user_id: 用户 ID
session_id: 会话 ID
Yields:
str: 流式回复片段
"""
import time
start_time = time.time()
try:
# 1. 加载记忆
context = await self.memory.load_context(user_input, user_id, session_id)
# 2. 构建 Prompt
prompt = self._build_prompt(user_input, context)
# 3. 直接流式生成回复(跳过 decide省一次 LLM 调用)
# 如果将来需要工具能力,可以在这里添加判断逻辑
async for chunk in self.llm.generate_stream(prompt, []):
yield chunk
# 4. 保存记忆(完成后)
final_response = ""
await self.memory.save(user_input, final_response, user_id, session_id)
except Exception as e:
yield f"处理请求时发生错误: {str(e)}"

View File

@@ -1,156 +0,0 @@
"""
Supervisor - 多智能体调度器
"""
import asyncio
from typing import List, Dict, Any
from app.agent.core.agent import AgentCore
class Supervisor:
"""多智能体调度器"""
def __init__(self, supervisor_agent: AgentCore, members: List[AgentCore], strategy: str = "parallel"):
"""
初始化调度器
Args:
supervisor_agent: 主智能体
members: 子智能体列表
strategy: 调度策略 (parallel/sequential)
"""
self.supervisor = supervisor_agent
self.members = members
self.strategy = strategy
async def run(self, task: str, user_id: int, session_id: str) -> Dict[str, Any]:
"""
执行多智能体协作
Args:
task: 用户任务
user_id: 用户 ID
session_id: 会话 ID
Returns:
Dict: 包含主响应和子任务结果
"""
# 1. 任务分解
subtasks = await self._decompose_task(task)
# 2. 分配任务
if self.strategy == "parallel":
results = await self._dispatch_parallel(subtasks, user_id, session_id)
else:
results = await self._dispatch_sequential(subtasks, user_id, session_id)
# 3. 汇总结果
final_result = await self._aggregate(results)
return {
"main_response": final_result,
"subtask_results": results,
"strategy": self.strategy
}
async def _decompose_task(self, task: str) -> List[Dict[str, str]]:
"""任务分解"""
# 调用 LLM 分解任务
prompt = f"""分解以下任务为子任务,返回 JSON 数组格式:
任务: {task}
返回格式示例:
[{{"task": "子任务1描述", "agent_type": "适合的智能体类型"}}, {{"task": "子任务2描述", "agent_type": "适合的智能体类型"}}]
请直接返回 JSON 数组,不要其他内容。"""
response = await self.supervisor.llm.generate(prompt, [])
try:
import json
# 尝试解析 JSON
subtasks = json.loads(response)
return subtasks
except:
# 解析失败,创建默认子任务
return [{"task": task, "agent_type": "general"}]
async def _dispatch_parallel(self, subtasks: List[Dict], user_id: int, session_id: str) -> List[Dict]:
"""并行分发任务"""
tasks = []
for i, subtask in enumerate(subtasks):
if i < len(self.members):
member = self.members[i]
else:
# 如果子任务多于成员,使用轮询
member = self.members[i % len(self.members)]
tasks.append(member.run(subtask['task'], user_id, session_id))
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
formatted_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
formatted_results.append({
"task": subtasks[i]['task'],
"success": False,
"error": str(result)
})
else:
formatted_results.append({
"task": subtasks[i]['task'],
"success": True,
"content": result.content,
"tool_calls": result.tool_calls
})
return formatted_results
async def _dispatch_sequential(self, subtasks: List[Dict], user_id: int, session_id: str) -> List[Dict]:
"""顺序分发任务"""
results = []
context = ""
for i, subtask in enumerate(subtasks):
# 选择子智能体
if i < len(self.members):
member = self.members[i]
else:
member = self.members[i % len(self.members)]
# 传递前一个结果作为上下文
enhanced_task = f"{context}\n\n当前任务: {subtask['task']}" if context else subtask['task']
result = await member.run(enhanced_task, user_id, session_id)
results.append({
"task": subtask['task'],
"success": True,
"content": result.content,
"tool_calls": result.tool_calls
})
# 累加上下文
context += f"\n\n=== 任务: {subtask['task']} ===\n{result.content}"
return results
async def _aggregate(self, results: List[Dict]) -> str:
"""汇总结果"""
# 过滤成功的结果
success_results = [r for r in results if r.get('success')]
if not success_results:
return "所有子任务执行失败"
if len(success_results) == 1:
return success_results[0].get('content', '')
# 调用 LLM 汇总
summary_prompt = "请汇总以下所有任务的结果,生成一个完整的回复:\n\n"
for i, result in enumerate(success_results, 1):
summary_prompt += f"=== 任务 {i}: {result.get('task', '')} ===\n{result.get('content', '')}\n\n"
final_response = await self.supervisor.llm.generate(summary_prompt, [])
return final_response

View File

@@ -1,4 +0,0 @@
# LLM
from app.agent.llm.factory import LLMFactory
from app.agent.llm.openai import OpenAILLM
from app.agent.llm.anthropic import AnthropicLLM

View File

@@ -1,136 +0,0 @@
"""
Anthropic LLM 实现
"""
import os
from typing import Dict, Any, List, Optional
from anthropic import AsyncAnthropic
class AnthropicLLM:
"""Anthropic Claude LLM"""
def __init__(self, model_name: str = "claude-3-sonnet-20240229", api_key: Optional[str] = None, base_url: Optional[str] = None):
self.model_name = model_name
# 支持自定义 base_url如 OpenRouter
if base_url:
self.client = AsyncAnthropic(
api_key=api_key or os.getenv("ANTHROPIC_API_KEY", ""),
base_url=base_url
)
else:
self.client = AsyncAnthropic(
api_key=api_key or os.getenv("ANTHROPIC_API_KEY", "")
)
async def decide(self, prompt: str) -> Dict[str, Any]:
"""
LLM 决策 - 判断是否需要调用技能
Args:
prompt: 完整的 Prompt
Returns:
Dict: 包含 needs_skill, tool_calls, response 等
"""
system_prompt = """你是一个智能助手。请分析用户请求,判断是否需要调用工具来回答问题。
如果需要调用工具,请按以下格式返回 JSON
{"needs_skill": true, "tool_calls": [{"skill_id": "工具名称", "parameters": {"参数": ""}, "reason": "调用原因"}]}
如果不需要调用工具,请返回:
{"needs_skill": false, "response": "直接回答用户的内容"}
请只返回 JSON不要其他内容。"""
try:
response = await self.client.messages.create(
model=self.model_name,
max_tokens=2000,
system=system_prompt,
messages=[{"role": "user", "content": prompt}]
)
content = response.content[0].text
# 尝试解析 JSON
import json
try:
result = json.loads(content)
return result
except json.JSONDecodeError:
return {
"needs_skill": False,
"response": content
}
except Exception as e:
return {
"needs_skill": False,
"response": f"LLM 调用失败: {str(e)}"
}
async def generate(self, prompt: str, tool_results: List[Dict]) -> str:
"""
生成回复
Args:
prompt: 完整的 Prompt
tool_results: 工具调用结果
Returns:
str: 生成的回复
"""
user_message = prompt
# 添加工具结果作为上下文
if tool_results:
tool_context = "\n\n工具返回结果:\n"
for result in tool_results:
if result.get("success"):
tool_context += f"- {result.get('skill_id')}: {result.get('result')}\n"
user_message += tool_context
try:
response = await self.client.messages.create(
model=self.model_name,
max_tokens=4000,
messages=[{"role": "user", "content": user_message}]
)
return response.content[0].text
except Exception as e:
return f"生成回复失败: {str(e)}"
async def generate_stream(self, prompt: str, tool_results: List[Dict]):
"""
流式生成回复
Args:
prompt: 完整的 Prompt
tool_results: 工具调用结果
Yields:
str: 生成的回复片段
"""
user_message = prompt
# 添加工具结果作为上下文
if tool_results:
tool_context = "\n\n工具返回结果:\n"
for result in tool_results:
if result.get("success"):
tool_context += f"- {result.get('skill_id')}: {result.get('result')}\n"
user_message += tool_context
try:
async with self.client.messages.stream(
model=self.model_name,
max_tokens=4000,
messages=[{"role": "user", "content": user_message}]
) as stream:
async for text in stream.text_stream:
yield text
except Exception as e:
yield f"生成回复失败: {str(e)}"

View File

@@ -1,32 +0,0 @@
"""
LLM Factory - LLM 工厂类
"""
from typing import Optional
from app.agent.llm.openai import OpenAILLM
from app.agent.llm.anthropic import AnthropicLLM
class LLMFactory:
"""LLM 工厂类"""
@staticmethod
def create(provider: str, model_name: str, api_key: Optional[str] = None, base_url: Optional[str] = None):
"""
创建 LLM 实例
Args:
provider: 模型提供商 (openai/anthropic)
model_name: 模型名称
api_key: API Key可选
base_url: Base URL可选
Returns:
LLM 实例
"""
if provider.lower() == "openai":
return OpenAILLM(model_name, api_key, base_url)
elif provider.lower() == "anthropic":
return AnthropicLLM(model_name, api_key, base_url)
else:
# 默认使用 OpenAI
return OpenAILLM(model_name, api_key, base_url)

View File

@@ -1,167 +0,0 @@
"""
OpenAI LLM 实现
"""
import os
import logging
from typing import Dict, Any, List, Optional
from openai import AsyncOpenAI
from openai._client import AsyncOpenAI
logger = logging.getLogger("llm.openai")
class OpenAILLM:
"""OpenAI LLM"""
def __init__(self, model_name: str = "gpt-4", api_key: Optional[str] = None, base_url: Optional[str] = None):
self.model_name = model_name
# 优先使用传入的参数,否则使用环境变量
self.api_key = api_key or os.getenv("OPENAI_API_KEY", "")
self.base_url = base_url or os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
api_key_info = f"{self.api_key[:10]}..." if self.api_key else "None"
logger.info(f"初始化 OpenAI LLM: model={model_name}, api_key={api_key_info}, base_url={self.base_url}")
if not self.api_key:
logger.warning("⚠️ WARNING: No API key provided!")
# 配置超时
self.client = AsyncOpenAI(
api_key=self.api_key,
base_url=self.base_url,
timeout=60.0, # 60秒超时
max_retries=1 # 减少重试次数
)
async def decide(self, prompt: str) -> Dict[str, Any]:
"""
LLM 决策 - 判断是否需要调用技能
Args:
prompt: 完整的 Prompt
Returns:
Dict: 包含 needs_skill, tool_calls, response 等
"""
# 构建决策用的系统提示
system_prompt = """你是一个智能助手。请分析用户请求,判断是否需要调用工具来回答问题。
如果需要调用工具,请按以下格式返回 JSON
{
"needs_skill": true,
"tool_calls": [
{"skill_id": "工具名称", "parameters": {"参数": ""}, "reason": "调用原因"}
]
}
如果不需要调用工具,请返回:
{
"needs_skill": false,
"response": "直接回答用户的内容"
}
请只返回 JSON不要其他内容。"""
try:
response = await self.client.chat.completions.create(
model=self.model_name,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt}
],
temperature=0.7,
max_tokens=2000
)
content = response.choices[0].message.content
# 尝试解析 JSON
import json
try:
result = json.loads(content)
return result
except json.JSONDecodeError:
# 解析失败,作为普通回复处理
return {
"needs_skill": False,
"response": content
}
except Exception as e:
return {
"needs_skill": False,
"response": f"LLM 调用失败: {str(e)}"
}
async def generate(self, prompt: str, tool_results: List[Dict]) -> str:
"""
生成回复
Args:
prompt: 完整的 Prompt
tool_results: 工具调用结果
Returns:
str: 生成的回复
"""
messages = [{"role": "user", "content": prompt}]
# 添加工具结果作为上下文
if tool_results:
for result in tool_results:
if result.get("success"):
messages.append({
"role": "assistant",
"content": f"工具 {result.get('skill_id')} 返回: {result.get('result')}"
})
try:
response = await self.client.chat.completions.create(
model=self.model_name,
messages=messages,
temperature=0.7,
max_tokens=4000
)
return response.choices[0].message.content
except Exception as e:
return f"生成回复失败: {str(e)}"
async def generate_stream(self, prompt: str, tool_results: List[Dict]):
"""
流式生成回复
Args:
prompt: 完整的 Prompt
tool_results: 工具调用结果
Yields:
str: 生成的回复片段
"""
messages = [{"role": "user", "content": prompt}]
# 添加工具结果作为上下文
if tool_results:
for result in tool_results:
if result.get("success"):
messages.append({
"role": "assistant",
"content": f"工具 {result.get('skill_id')} 返回: {result.get('result')}"
})
try:
response = await self.client.chat.completions.create(
model=self.model_name,
messages=messages,
temperature=0.7,
max_tokens=4000,
stream=True
)
async for chunk in response:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
except Exception as e:
yield f"生成回复失败: {str(e)}"

View File

@@ -1,5 +0,0 @@
# Memory
from app.agent.memory.manager import MemoryManager
from app.agent.memory.working import WorkingMemory
from app.agent.memory.session import SessionMemory
from app.agent.memory.persistent import PersistentMemory

View File

@@ -1,99 +0,0 @@
"""
Memory Manager - 记忆管理器
"""
from typing import Dict, List, Optional
from app.agent.memory.working import WorkingMemory
from app.agent.memory.session import SessionMemory
from app.agent.memory.persistent import PersistentMemory
class MemoryManager:
"""记忆管理器 - 统一接口"""
def __init__(self, agent_id: int):
self.agent_id = agent_id
self.working = WorkingMemory()
self.session = SessionMemory(agent_id)
self.persistent = PersistentMemory(agent_id)
async def load_context(self, query: str, user_id: int, session_id: str) -> Dict[str, str]:
"""
加载上下文记忆
优化:跳过耗时的向量搜索,提升响应速度
生产环境可以加回来
Args:
query: 查询内容
user_id: 用户 ID
session_id: 会话 ID
Returns:
Dict: 包含 summary, knowledge 等
"""
# 1. Working Memory (内存,最快)
working_context = self.working.get()
# 2. Session Memory (Redis) - 暂时跳过,减少延迟
# session_context = await self.session.get_summary(user_id, session_id)
session_context = ""
# 3. Persistent Memory (向量库) - 暂时跳过,减少延迟
# persistent_context = await self.persistent.search(query, user_id, top_k=3)
persistent_context = []
return {
'working': working_context.get('recent_messages', []),
'session': session_context,
'persistent': persistent_context,
'summary': "", # 简化
'knowledge': ""
}
async def save(self, user_input: str, response: str, user_id: int, session_id: str):
"""
保存记忆
Args:
user_input: 用户输入
response: 智能体回复
user_id: 用户 ID
session_id: 会话 ID
"""
# 1. 写入 Working
self.working.add(user_input, response)
# 2. 写入 Session (定期摘要)
await self.session.add(user_input, response, user_id, session_id)
# 3. 提取关键信息写入 Persistent (定期)
if self.working.size() >= 5:
await self._extract_and_persist(user_input, response, user_id)
def _build_summary(self, session_context: str, persistent_context: List[str]) -> str:
"""构建记忆摘要"""
parts = []
if session_context:
parts.append(f"会话记忆: {session_context}")
if persistent_context:
parts.append(f"长期记忆: {'; '.join(persistent_context[:3])}")
return "\n".join(parts) if parts else "无相关记忆"
async def _extract_and_persist(self, user_input: str, response: str, user_id: int):
"""提取并持久化关键信息"""
# 提取关键信息简化版取前100字符作为摘要
key_points = []
# 简化:直接保存重要交互
if len(response) > 50: # 只保存有意义的回复
summary = response[:100] + "..."
key_points.append(summary)
for point in key_points:
await self.persistent.add(point, user_id, memory_type="conversation")
# 重置 Working Memory
self.working.clear()

View File

@@ -1,109 +0,0 @@
"""
Persistent Memory - 长期记忆(向量存储)
"""
from typing import List, Optional
class PersistentMemory:
"""长期记忆,向量存储"""
def __init__(self, agent_id: int, vector_store=None):
"""
初始化长期记忆
Args:
agent_id: 智能体 ID
vector_store: 向量存储客户端(可选)
"""
self.agent_id = agent_id
self.vector_store = vector_store
self.use_vector = vector_store is not None
async def add(self, content: str, user_id: int, memory_type: str = "experience"):
"""
添加长期记忆
Args:
content: 记忆内容
user_id: 用户 ID
memory_type: 记忆类型 (experience/preference/conversation)
"""
if not self.use_vector:
return self._add_memory(content, user_id, memory_type)
# 生成向量并存储
embedding = await self._get_embedding(content)
# TODO: 调用向量存储 API
async def search(self, query: str, user_id: int, top_k: int = 3) -> List[str]:
"""
搜索相关记忆
Args:
query: 查询内容
user_id: 用户 ID
top_k: 返回数量
Returns:
List[str]: 相关的记忆列表
"""
if not self.use_vector:
return self._search_memory(query, user_id, top_k)
# 生成查询向量
query_embedding = await self._get_embedding(query)
# TODO: 调用向量搜索 API
# results = await self.vector_store.search(
# agent_id=self.agent_id,
# user_id=user_id,
# embedding=query_embedding,
# top_k=top_k
# )
return []
async def _get_embedding(self, text: str) -> List[float]:
"""
获取文本向量
Args:
text: 文本
Returns:
List[float]: 向量
"""
# TODO: 实现向量生成
# 可以使用 OpenAI Embedding API 或本地模型
import hashlib
# 简化:使用文本哈希模拟
h = hashlib.md5(text.encode()).digest()
return [float(b) / 255.0 for b in h[:16]] + [0.0] * 16
# === 内存模拟(无向量存储时使用)===
_memory_store = {}
def _add_memory(self, content: str, user_id: int, memory_type: str):
"""内存模拟 - 添加"""
key = f"{self.agent_id}:{user_id}"
if key not in self._memory_store:
self._memory_store[key] = []
self._memory_store[key].append({
"content": content,
"type": memory_type
})
def _search_memory(self, query: str, user_id: int, top_k: int) -> List[str]:
"""内存模拟 - 搜索(简化版:关键词匹配)"""
key = f"{self.agent_id}:{user_id}"
if key not in self._memory_store:
return []
# 简化:包含查询词的记忆
results = []
for mem in self._memory_store[key]:
if query.lower() in mem["content"].lower():
results.append(mem["content"])
return results[:top_k]

View File

@@ -1,125 +0,0 @@
"""
Session Memory - 会话级记忆Redis 存储)
"""
import json
from typing import Optional
class SessionMemory:
"""会话级记忆Redis 存储"""
def __init__(self, agent_id: int, redis_client=None):
"""
初始化会话记忆
Args:
agent_id: 智能体 ID
redis_client: Redis 客户端(可选)
"""
self.agent_id = agent_id
self.redis = redis_client
self.ttl = 3600 * 24 # 24 小时
self.summary_threshold = 10 # 多少条消息后生成摘要
def _key(self, user_id: int, session_id: str) -> str:
"""生成 Redis Key"""
return f"agent:memory:session:{self.agent_id}:{user_id}:{session_id}"
async def add(self, user_input: str, response: str, user_id: int, session_id: str):
"""
添加对话到会话记忆
Args:
user_input: 用户输入
response: 智能体回复
user_id: 用户 ID
session_id: 会话 ID
"""
if not self.redis:
# 如果没有 Redis使用内存模拟
return self._add_memory(user_input, response, user_id, session_id)
key = self._key(user_id, session_id)
# 获取现有数据
data = await self.redis.get(key)
messages = json.loads(data) if data else {"messages": [], "summary": ""}
# 添加新消息
messages["messages"].append({"role": "user", "content": user_input})
messages["messages"].append({"role": "assistant", "content": response})
# 定期生成摘要
if len(messages["messages"]) >= self.summary_threshold:
messages["summary"] = await self._generate_summary(messages["messages"])
# 保持消息数量
if len(messages["messages"]) > 50:
messages["messages"] = messages["messages"][-50:]
await self.redis.setex(key, self.ttl, json.dumps(messages))
async def get_summary(self, user_id: int, session_id: str) -> str:
"""
获取会话摘要
Args:
user_id: 用户 ID
session_id: 会话 ID
Returns:
str: 会话摘要
"""
if not self.redis:
return self._get_memory_summary(user_id, session_id)
key = self._key(user_id, session_id)
data = await self.redis.get(key)
if data:
messages = json.loads(data)
return messages.get("summary", "")
return ""
async def _generate_summary(self, messages: list) -> str:
"""
生成摘要(简化版)
Args:
messages: 消息列表
Returns:
str: 摘要
"""
# 简化:取最后几条消息的要点
if not messages:
return ""
recent = messages[-6:] # 最近 3 轮
summary = f"最近对话包含 {len(messages)//2} 轮交互"
# TODO: 后续可以使用 LLM 生成更好的摘要
return summary
# === 内存模拟(无 Redis 时使用)===
_memory_store = {}
def _add_memory(self, user_input: str, response: str, user_id: int, session_id: str):
"""内存模拟 - 添加"""
key = f"{self.agent_id}:{user_id}:{session_id}"
if key not in self._memory_store:
self._memory_store[key] = {"messages": [], "summary": ""}
messages = self._memory_store[key]["messages"]
messages.append({"role": "user", "content": user_input})
messages.append({"role": "assistant", "content": response})
if len(messages) >= self.summary_threshold:
self._memory_store[key]["summary"] = self._generate_summary(messages)
def _get_memory_summary(self, user_id: int, session_id: str) -> str:
"""内存模拟 - 获取摘要"""
key = f"{self.agent_id}:{user_id}:{session_id}"
if key in self._memory_store:
return self._memory_store[key].get("summary", "")
return ""

View File

@@ -1,47 +0,0 @@
"""
Working Memory - 当前任务上下文(内存级存储)
"""
class WorkingMemory:
"""当前任务上下文,内存级存储"""
def __init__(self):
self.current_task = None
self.recent_messages = []
self.max_size = 10 # 最大保留 10 轮对话
def get(self) -> dict:
"""获取当前记忆"""
return {
'current_task': self.current_task,
'recent_messages': self.recent_messages[-self.max_size:]
}
def add(self, user_input: str, response: str):
"""添加对话到记忆"""
self.recent_messages.append({
'role': 'user',
'content': user_input
})
self.recent_messages.append({
'role': 'assistant',
'content': response
})
# 保持固定大小
if len(self.recent_messages) > self.max_size * 2:
self.recent_messages = self.recent_messages[-self.max_size * 2:]
def set_current_task(self, task: str):
"""设置当前任务"""
self.current_task = task
def clear(self):
"""清空记忆"""
self.recent_messages = []
self.current_task = None
def size(self) -> int:
"""获取对话轮数"""
return len(self.recent_messages) // 2

View File

@@ -1,3 +0,0 @@
# Skills
from app.agent.skills.router import SkillRouter
from app.agent.skills.executor import SkillExecutor

View File

@@ -1,129 +0,0 @@
"""
Skill Executor - 技能执行器
"""
import asyncio
from typing import List, Dict, Any
class SkillExecutor:
"""技能执行器,支持并发/串行执行"""
def __init__(self, skill_registry=None):
"""
初始化技能执行器
Args:
skill_registry: 技能注册表(可选)
"""
self.skill_registry = skill_registry
self._skill_handlers = self._init_default_handlers()
def _init_default_handlers(self) -> Dict[str, callable]:
"""初始化默认技能处理器"""
return {
"query_database": self._handle_query_database,
"data_analysis": self._handle_data_analysis,
"search_knowledge": self._handle_search_knowledge,
"web_search": self._handle_web_search,
}
async def execute(self, skill_id: str, params: dict) -> Dict[str, Any]:
"""
执行单个技能
Args:
skill_id: 技能 ID
params: 技能参数
Returns:
Dict: 执行结果
"""
handler = self._skill_handlers.get(skill_id)
if not handler:
return {
"success": False,
"error": f"Skill {skill_id} not found"
}
try:
result = await handler(params)
return {
"success": True,
"skill_id": skill_id,
"result": result
}
except Exception as e:
return {
"success": False,
"skill_id": skill_id,
"error": str(e)
}
async def execute_multiple(self, skills: List[Dict], strategy: str = "parallel") -> List[Dict]:
"""
批量执行技能
Args:
skills: 技能列表
strategy: 执行策略 (parallel/sequential)
Returns:
List[Dict]: 执行结果列表
"""
if strategy == "parallel":
tasks = [self.execute(s['skill_id'], s['params']) for s in skills]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理异常结果
formatted_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
formatted_results.append({
"success": False,
"skill_id": skills[i]['skill_id'],
"error": str(result)
})
else:
formatted_results.append(result)
return formatted_results
else:
results = []
for s in skills:
result = await self.execute(s['skill_id'], s['params'])
results.append(result)
return results
# === 默认技能处理器 ===
async def _handle_query_database(self, params: dict) -> Dict[str, Any]:
"""处理数据库查询"""
# TODO: 调用现有的数据库查询功能
return {
"message": "数据库查询功能待实现",
"sql": params.get("sql", "")
}
async def _handle_data_analysis(self, params: dict) -> Dict[str, Any]:
"""处理数据分析"""
# TODO: 调用现有的数据分析功能
return {
"message": "数据分析功能待实现",
"data": params.get("data", {})
}
async def _handle_search_knowledge(self, params: dict) -> Dict[str, Any]:
"""处理知识库搜索"""
# TODO: 调用现有的知识库搜索功能
return {
"message": "知识库搜索功能待实现",
"query": params.get("query", "")
}
async def _handle_web_search(self, params: dict) -> Dict[str, Any]:
"""处理网页搜索"""
# TODO: 实现网页搜索
return {
"message": "网页搜索功能待实现",
"query": params.get("query", "")
}

View File

@@ -1,46 +0,0 @@
"""
Skill Router - 技能路由器
"""
from typing import List, Dict
class SkillRouter:
"""根据 LLM 决策选择要调用的技能"""
def __init__(self, available_skills: List[int]):
"""
初始化技能路由器
Args:
available_skills: 可用技能 ID 列表
"""
self.available_skills = available_skills
async def route(self, llm_decision: dict) -> List[dict]:
"""
解析 LLM 的技能调用决策
Args:
llm_decision: LLM 决策结果
Returns:
List[dict]: 要执行的技能列表
"""
if not llm_decision.get('tool_calls'):
return []
routes = []
for tool_call in llm_decision['tool_calls']:
skill_id = tool_call.get('skill_id') or tool_call.get('tool_name')
# 检查技能是否可用
if self.available_skills and skill_id not in self.available_skills:
continue
routes.append({
'skill_id': skill_id,
'params': tool_call.get('parameters', {}),
'reason': tool_call.get('reason', '')
})
return routes

View File

@@ -1,549 +0,0 @@
"""
FastAPI Agent Engine Server
"""
import os
import sys
import time
import logging
from datetime import datetime
from typing import Optional
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
import asyncio
from app.agent.core import AgentConfig
from app.xbot import XBotAgent
# 日志目录 - 放在 server/logs 下
LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "server", "logs", datetime.now().strftime("%Y-%m-%d"))
os.makedirs(LOG_DIR, exist_ok=True)
# 成功/失败日志文件
today = datetime.now().strftime("%Y-%m-%d")
success_log_file = os.path.join(LOG_DIR, f"python_success.log")
failure_log_file = os.path.join(LOG_DIR, f"python_failure.log")
def setup_logging():
"""配置日志系统"""
log_level = os.getenv("LOG_LEVEL", "INFO")
# 创建格式化器
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
# 成功日志文件处理器
success_handler = logging.FileHandler(success_log_file, encoding='utf-8')
success_handler.setFormatter(formatter)
success_handler.setLevel(logging.INFO)
# 失败日志文件处理器
failure_handler = logging.FileHandler(failure_log_file, encoding='utf-8')
failure_handler.setFormatter(formatter)
failure_handler.setLevel(logging.WARNING)
# 根日志配置
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, log_level))
root_logger.addHandler(console_handler)
root_logger.addHandler(success_handler)
root_logger.addHandler(failure_handler)
return root_logger
# 成功日志记录器(只记录 INFO 级别到成功日志)
class SuccessLogger:
"""成功日志记录器"""
@staticmethod
def log(message: str):
"""记录成功日志"""
logger = logging.getLogger("success")
logger.setLevel(logging.INFO)
handler = logging.FileHandler(success_log_file, encoding='utf-8')
handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
logger.addHandler(handler)
logger.info(message)
# 同时输出到控制台
print(f"{message}")
# 失败日志记录器
class FailureLogger:
"""失败日志记录器"""
@staticmethod
def log(message: str, error: str = ""):
"""记录失败日志"""
logger = logging.getLogger("failure")
logger.setLevel(logging.WARNING)
handler = logging.FileHandler(failure_log_file, encoding='utf-8')
handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s - %(error)s', datefmt='%Y-%m-%d %H:%M:%S'))
logger.addHandler(handler)
full_message = f"{message} - Error: {error}" if error else message
logger.warning(full_message)
# 同时输出到控制台
print(f"{full_message}")
logger = setup_logging()
app = FastAPI(title="X-Agents Python Engine", version="1.0.0")
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# === 请求/响应模型 ===
class ChatRequest(BaseModel):
"""对话请求"""
agent_id: int
message: str
user_id: int = 1
session_id: Optional[str] = None
# 模型参数(可选,如果传了就使用,否则用智能体配置的默认模型)
model_id: Optional[str] = None
model_name: Optional[str] = None
model_provider: Optional[str] = None
api_key: Optional[str] = None
base_url: Optional[str] = None
# Embedding 模型(可选)
embedding_model: Optional[str] = None
embedding_base_url: Optional[str] = None
class TeamChatRequest(BaseModel):
"""多智能体群聊请求"""
supervisor_agent_id: int
member_agent_ids: list[int]
message: str
user_id: int = 1
session_id: Optional[str] = None
strategy: str = "parallel"
class CreateAgentRequest(BaseModel):
"""创建智能体请求"""
name: str
description: Optional[str] = None
avatar: str = "🤖"
# 技能配置
skills_mode: str = "all" # all / include / exclude
skills: list[str] = [] # 技能ID列表
# 知识库
knowledge: str = "general" # general / codebase / docs / api
# 自定义提示词
prompt: Optional[str] = None
# 模型配置
model_provider: Optional[str] = None
model_name: Optional[str] = None
user_id: int = 1
class CreateAgentResponse(BaseModel):
"""创建智能体响应"""
agent_id: int
name: str
message: str = "Agent created successfully"
class ChatResponse(BaseModel):
"""对话响应"""
agent_id: int
response: str
tool_calls: list = []
tokens_used: int = 0
duration_ms: int = 0
session_id: Optional[str] = None
# === 模拟数据存储 ===
# TODO: 后续替换为从数据库加载
_mock_agents = {
1: {
"id": 1,
"name": "数据分析助手",
"role_description": "你是一个专业的数据分析助手,擅长分析数据、生成报告。",
"model_provider": "openai",
"model_name": "gpt-4",
"skills": [1, 2]
},
2: {
"id": 2,
"name": "代码审查助手",
"role_description": "你是一个专业的代码审查助手擅长审查代码、发现bug。",
"model_provider": "openai",
"model_name": "gpt-4",
"skills": [3]
}
}
def get_agent_config(agent_id: int, api_key: str = None, base_url: str = None) -> AgentConfig:
"""获取智能体配置"""
agent_data = _mock_agents.get(agent_id)
if not agent_data:
raise HTTPException(status_code=404, detail="Agent not found")
return AgentConfig(
id=agent_data["id"],
name=agent_data["name"],
role_description=agent_data["role_description"],
model_provider=agent_data["model_provider"],
model_name=agent_data["model_name"],
api_key=api_key,
base_url=base_url,
skills=agent_data.get("skills", [])
)
# === API 路由 ===
@app.get("/")
async def root():
return {"message": "X-Agents Python Engine", "version": "1.0.0"}
@app.get("/health")
async def health():
return {"status": "healthy"}
@app.post("/agent/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
"""
单智能体对话
"""
chat_logger = logging.getLogger("agent.chat")
# 打印请求参数(隐藏 api_key 敏感信息)
api_key_preview = f"{request.api_key[:10]}..." if request.api_key else "None"
chat_logger.info(f"========== 收到聊天请求 ==========")
chat_logger.info(f"agent_id: {request.agent_id}")
chat_logger.info(f"model_id: {request.model_id}")
chat_logger.info(f"model_provider: {request.model_provider}")
chat_logger.info(f"model_name: {request.model_name}")
chat_logger.info(f"api_key: {api_key_preview}")
chat_logger.info(f"base_url: {request.base_url}")
chat_logger.info(f"message: {request.message[:50]}...")
start_time = time.time()
# 获取智能体配置
try:
config = get_agent_config(request.agent_id, request.api_key, request.base_url)
chat_logger.info(f"Agent config loaded: provider={config.model_provider}, model={config.model_name}")
except HTTPException as e:
FailureLogger.log(f"Agent not found: agent_id={request.agent_id}", str(e))
chat_logger.error(f"Agent not found: {e}")
raise
except Exception as e:
FailureLogger.log(f"Error loading config: agent_id={request.agent_id}", str(e))
chat_logger.error(f"Error loading config: {e}")
raise HTTPException(status_code=400, detail=str(e))
# 如果请求中指定了模型,覆盖智能体的默认配置
if request.model_provider:
config.model_provider = request.model_provider
if request.model_name:
config.model_name = request.model_name
chat_logger.info(f"Final LLM config: provider={config.model_provider}, model={config.model_name}, api_key={config.api_key[:10] if config.api_key else 'None'}..., base_url={config.base_url}")
# 生成 session_id
session_id = request.session_id or f"session_{int(time.time())}"
# 执行对话 - 默认使用 XBot Agent (nanobot 核心)
try:
xbot = XBotAgent(
name=config.name,
role_description=config.role_description,
provider=config.model_provider,
model=config.model_name,
api_key=request.api_key or config.api_key,
base_url=request.base_url or config.base_url,
embedding_model=request.embedding_model,
embedding_base_url=request.embedding_base_url,
)
result = await xbot.run(request.message, session_id)
response_content = result["content"]
tool_calls = [{"name": tc} for tc in result.get("tool_calls", [])] if result.get("tool_calls") else []
except Exception as e:
FailureLogger.log(f"Agent execution failed: agent_id={request.agent_id}, message={request.message[:30]}", str(e))
chat_logger.error(f"Agent execution error: {e}")
raise HTTPException(status_code=500, detail=str(e))
duration_ms = int((time.time() - start_time) * 1000)
# 记录成功日志
SuccessLogger.log(f"Chat success: agent_id={request.agent_id}, duration={duration_ms}ms, message={request.message[:30]}...")
return ChatResponse(
agent_id=request.agent_id,
response=response_content,
tool_calls=tool_calls,
tokens_used=0,
duration_ms=duration_ms,
session_id=session_id
)
@app.post("/agent/chat/stream")
async def chat_stream(request: ChatRequest):
"""
单智能体对话(流式输出)
"""
chat_logger = logging.getLogger("agent.chat.stream")
# 打印请求参数
api_key_preview = f"{request.api_key[:10]}..." if request.api_key else "None"
base_url_preview = request.base_url if request.base_url else "None"
chat_logger.info(f"========== 收到流式聊天请求 ==========")
chat_logger.info(f"agent_id: {request.agent_id}")
chat_logger.info(f"model_provider: {request.model_provider}")
chat_logger.info(f"model_name: {request.model_name}")
chat_logger.info(f"api_key: {api_key_preview}")
chat_logger.info(f"base_url: {base_url_preview}")
# 获取智能体配置
try:
config = get_agent_config(request.agent_id, request.api_key, request.base_url)
except HTTPException as e:
chat_logger.error(f"Agent not found: {e}")
raise
except Exception as e:
chat_logger.error(f"Error loading config: {e}")
raise HTTPException(status_code=400, detail=str(e))
# 如果请求中指定了模型,覆盖智能体的默认配置
if request.model_provider:
config.model_provider = request.model_provider
if request.model_name:
config.model_name = request.model_name
chat_logger.info(f"最终配置 - provider: {config.model_provider}, model: {config.model_name}, base_url: {config.base_url}")
# 生成 session_id
session_id = request.session_id or f"session_{int(time.time())}"
# Mock 模式测试流式
if request.message.startswith("/mock "):
mock_text = request.message[6:] # 去掉 "/mock " 前缀
async def mock_stream():
for char in mock_text:
yield f"data: {char}\n\n"
await asyncio.sleep(0.05) # 50ms 延迟模拟流式
yield f"data: [DONE]\n\n"
return StreamingResponse(mock_stream(), media_type="text/event-stream")
# 使用 XBot Agent (nanobot 核心)
xbot = XBotAgent(
name=config.name,
role_description=config.role_description,
provider=config.model_provider,
model=config.model_name,
api_key=request.api_key or config.api_key,
base_url=request.base_url or config.base_url,
embedding_model=request.embedding_model,
embedding_base_url=request.embedding_base_url,
)
async def event_generator():
"""SSE 事件生成器"""
try:
# 执行流式对话
async for chunk in xbot.run_stream(request.message, session_id):
# 发送 SSE 格式的数据
yield f"data: {chunk}\n\n"
# 发送结束信号
yield f"data: [DONE]\n\n"
except Exception as e:
chat_logger.error(f"Stream error: {e}")
yield f"data: {{\"error\": \"{str(e)}\"}}\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.post("/agent/team/chat")
async def team_chat(request: TeamChatRequest):
"""
多智能体群聊
"""
start_time = time.time()
# 创建主智能体
try:
supervisor_config = get_agent_config(request.supervisor_agent_id)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
# 使用 XBot 作为主智能体
supervisor_agent = XBotAgent(
name=supervisor_config.name,
role_description=supervisor_config.role_description,
provider=supervisor_config.model_provider,
model=supervisor_config.model_name,
api_key=supervisor_config.api_key,
base_url=supervisor_config.base_url,
)
# 创建子智能体
members = []
for member_id in request.member_agent_ids:
try:
member_config = get_agent_config(member_id)
members.append(XBotAgent(
name=member_config.name,
role_description=member_config.role_description,
provider=member_config.model_provider,
model=member_config.model_name,
api_key=member_config.api_key,
base_url=member_config.base_url,
))
except:
continue
if not members:
raise HTTPException(status_code=400, detail="No valid member agents")
# TODO: 群聊调度逻辑 - 目前简化为串行执行
# 生成 session_id
session_id = request.session_id or f"team_session_{int(time.time())}"
# 串行执行每个智能体
subtask_results = []
main_response = ""
try:
# 主智能体先处理
result = await supervisor_agent.run(request.message, session_id)
main_response = result["content"]
subtask_results.append({
"agent_id": request.supervisor_agent_id,
"response": main_response,
})
# 子智能体并行处理
# import asyncio
# results = await asyncio.gather(*[m.run(request.message, session_id) for m in members])
# for m, r in zip(members, results):
# subtask_results.append({"agent_id": m.name, "response": r["content"]})
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
duration_ms = int((time.time() - start_time) * 1000)
return {
"supervisor_agent_id": request.supervisor_agent_id,
"response": main_response,
"subtask_results": subtask_results,
"strategy": request.strategy or "parallel",
"duration_ms": duration_ms,
"session_id": session_id
}
@app.post("/agent/create", response_model=CreateAgentResponse)
async def create_agent(request: CreateAgentRequest):
"""
创建新的智能体
"""
import json
import uuid
# 生成唯一的 agent_id
agent_id = int(datetime.now().timestamp() * 1000) % 100000
# 构建 Agent 配置
agent_config = {
"id": agent_id,
"name": request.name,
"description": request.description or "",
"avatar": request.avatar,
"skills_mode": request.skills_mode,
"skills": request.skills,
"knowledge": request.knowledge,
"role_description": request.prompt or f"You are {request.name}. {request.description or ''}",
"model_provider": request.model_provider or "anthropic",
"model_name": request.model_name or "claude-sonnet-4-20250514",
}
# 保存到 agents 目录
agents_dir = os.path.join(os.path.dirname(__file__), "agents")
os.makedirs(agents_dir, exist_ok=True)
config_file = os.path.join(agents_dir, f"agent_{agent_id}.json")
with open(config_file, "w", encoding="utf-8") as f:
json.dump(agent_config, f, ensure_ascii=False, indent=2)
logger.info(f"Agent created: {request.name} (ID: {agent_id})")
return CreateAgentResponse(
agent_id=agent_id,
name=request.name,
message="Agent created successfully"
)
@app.get("/agent/list")
async def list_agents():
"""
获取智能体列表
"""
import json
agents_dir = os.path.join(os.path.dirname(__file__), "agents")
if not os.path.exists(agents_dir):
return {"agents": []}
agents = []
for file in os.listdir(agents_dir):
if file.endswith(".json"):
config_file = os.path.join(agents_dir, file)
try:
with open(config_file, "r", encoding="utf-8") as f:
agent = json.load(f)
agents.append(agent)
except:
continue
return {"agents": agents}
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("AGENT_PORT", "8081"))
uvicorn.run(
app,
host="0.0.0.0",
port=port,
loop="asyncio",
http="h11",
access_log=False,
timeout_keep_alive=5,
)

View File

@@ -1,17 +0,0 @@
"""XBot - 轻量级 Agent 框架(基于 nanobot 核心)"""
from .loop import AgentLoop
from .memory import MemoryConsolidator, MemoryStore
from .session import Session, SessionManager
from .adapter import XBotLLMAdapter
from .agent import XBotAgent
__all__ = [
"AgentLoop",
"MemoryConsolidator",
"MemoryStore",
"Session",
"SessionManager",
"XBotLLMAdapter",
"XBotAgent",
]

View File

@@ -1,186 +0,0 @@
"""LLM Adapter - 将现有 LLM 适配到 XBot 接口"""
import json
from dataclasses import dataclass, field
from typing import Any, Optional
from app.agent.llm.factory import LLMFactory
@dataclass
class ToolCallRequest:
"""A tool call request from the LLM."""
id: str
name: str
arguments: dict[str, Any]
def to_openai_tool_call(self) -> dict[str, Any]:
return {
"id": self.id,
"type": "function",
"function": {
"name": self.name,
"arguments": json.dumps(self.arguments, ensure_ascii=False),
},
}
@dataclass
class LLMResponse:
"""Response from an LLM provider."""
content: str | None
tool_calls: list[ToolCallRequest] = field(default_factory=list)
finish_reason: str = "stop"
usage: dict[str, int] = field(default_factory=dict)
reasoning_content: str | None = None
@property
def has_tool_calls(self) -> bool:
return len(self.tool_calls) > 0
class XBotLLMAdapter:
"""
适配器:将现有 LLM 适配到 XBot 的 LLMProvider 接口
封装 LLMFactory 创建的 LLM使其符合 nanobot 风格的接口:
- chat_with_retry(messages, tools, model) -> LLMResponse
"""
def __init__(
self,
provider: str,
model_name: str,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
temperature: float = 0.7,
max_tokens: int = 4096,
):
self.provider_name = provider
self.model = model_name
self.temperature = temperature
self.max_tokens = max_tokens
# 创建底层 LLM
self._llm = LLMFactory.create(provider, model_name, api_key, base_url)
# 检查是否支持 tool calling
self._supports_tools = self._check_tool_support()
def _check_tool_support(self) -> bool:
"""检查模型是否支持 tool calling"""
# GPT-4, Claude 支持 tool calling
# 简单的判断逻辑
model_lower = self.model.lower()
if "gpt-4" in model_lower or "claude" in model_lower:
return True
return True # 默认支持
async def chat_with_retry(
self,
messages: list[dict[str, Any]],
tools: list[dict[str, Any]] | None = None,
model: str | None = None,
max_tokens: int | None = None,
temperature: float | None = None,
) -> LLMResponse:
"""
发送聊天请求(支持 tool calling
Args:
messages: 消息列表
tools: 工具定义列表
model: 模型名称(可选)
max_tokens: 最大 tokens可选
temperature: 温度(可选)
Returns:
LLMResponse: 包含内容和/或工具调用
"""
model = model or self.model
max_tokens = max_tokens or self.max_tokens
temperature = temperature or self.temperature
try:
# 使用流式调用来获取完整响应
response = await self._llm.client.chat.completions.create(
model=model,
messages=messages,
tools=tools,
temperature=temperature,
max_tokens=max_tokens,
)
message = response.choices[0].message
# 检查是否有 tool calls
if message.tool_calls and tools:
tool_calls = []
for tc in message.tool_calls:
tool_calls.append(ToolCallRequest(
id=tc.id,
name=tc.function.name,
arguments=json.loads(tc.function.arguments) if isinstance(tc.function.arguments, str) else tc.function.arguments,
))
return LLMResponse(
content=message.content,
tool_calls=tool_calls,
finish_reason="tool_calls",
)
else:
return LLMResponse(
content=message.content or "",
finish_reason="stop",
)
except Exception as e:
return LLMResponse(
content=f"Error calling LLM: {str(e)}",
finish_reason="error",
)
async def chat(
self,
messages: list[dict[str, Any]],
tools: list[dict[str, Any]] | None = None,
model: str | None = None,
max_tokens: int = 4096,
temperature: float = 0.7,
) -> LLMResponse:
"""简化的 chat 方法"""
return await self.chat_with_retry(
messages=messages,
tools=tools,
model=model,
max_tokens=max_tokens,
temperature=temperature,
)
async def chat_stream(
self,
messages: list[dict[str, Any]],
tools: list[dict[str, Any]] | None = None,
model: str | None = None,
max_tokens: int = 4096,
temperature: float = 0.7,
):
"""流式聊天"""
model = model or self.model
try:
response = await self._llm.client.chat.completions.create(
model=model,
messages=messages,
tools=tools,
temperature=temperature,
max_tokens=max_tokens,
stream=True,
)
async for chunk in response:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
except Exception as e:
yield f"Error: {str(e)}"

View File

@@ -1,309 +0,0 @@
"""XBot Agent - 封装 nanobot 核心能力的 Agent"""
import os
from pathlib import Path
from typing import Any, Optional
from datetime import datetime
from .loop import AgentLoop
from .session import SessionManager
from .adapter import XBotLLMAdapter, LLMResponse
from . import config
# 尝试导入 simplemem
try:
from simplemem import SimpleMemSystem
HAS_SIMPLEMEM = True
except ImportError:
HAS_SIMPLEMEM = False
class SimpleToolRegistry:
"""简单的工具注册表"""
def __init__(self):
self._tools: dict[str, Any] = {}
def register(self, name: str, func: Any, description: str = "") -> None:
"""注册一个工具"""
self._tools[name] = {
"function": func,
"description": description,
}
def get_definitions(self) -> list[dict]:
"""获取工具定义列表"""
tools = []
for name, tool in self._tools.items():
tools.append({
"type": "function",
"function": {
"name": name,
"description": tool.get("description", ""),
"parameters": {
"type": "object",
"properties": {},
"required": [],
}
}
})
return tools
def get(self, name: str) -> Optional[Any]:
"""获取工具"""
return self._tools.get(name)
async def execute(self, name: str, arguments: dict) -> Any:
"""执行工具"""
tool = self._tools.get(name)
if not tool:
return f"Tool {name} not found"
func = tool.get("function")
if not func:
return f"Tool {name} has no function"
try:
if callable(func):
return await func(**arguments) if hasattr(func, '__await__') else func(**arguments)
return "Tool function is not callable"
except Exception as e:
return f"Tool execution error: {str(e)}"
class XBotAgent:
"""
XBot Agent - 基于 nanobot 核心的 Agent 实现
特性:
- 多轮 tool-calling 对话
- 自动内存压缩
- 会话历史持久化
"""
def __init__(
self,
name: str,
role_description: str,
provider: str = "openai",
model: str = "gpt-4",
api_key: Optional[str] = None,
base_url: Optional[str] = None,
workspace: Optional[Path] = None,
context_window_tokens: int = 200000,
embedding_model: Optional[str] = None,
embedding_base_url: Optional[str] = None,
):
"""
初始化 XBot Agent
Args:
name: Agent 名称
role_description: Agent 角色描述
provider: LLM 提供商
model: 模型名称
api_key: API Key
base_url: Base URL
workspace: 工作目录(用于存储会话和记忆)
context_window_tokens: 上下文窗口大小
"""
self.name = name
self.role_description = role_description
# 使用配置文件的默认值
if api_key is None:
api_key = config.API_KEY
if base_url is None:
base_url = config.BASE_URL
if workspace is None:
workspace = Path(config.WORKSPACE)
# 创建工作目录
self.workspace = workspace
self.workspace.mkdir(parents=True, exist_ok=True)
# 创建 LLM 适配器
self.provider = XBotLLMAdapter(
provider=provider,
model_name=model,
api_key=api_key,
base_url=base_url,
)
# 创建工具注册表
self.tools = SimpleToolRegistry()
self._register_default_tools()
# 创建 Agent Loop
self.agent_loop = AgentLoop(
provider=self.provider,
model=model,
tools=self.tools,
max_iterations=50,
)
# 创建会话管理器
self.sessions = SessionManager(self.workspace)
# 创建 SimpleMem 记忆系统
if HAS_SIMPLEMEM and api_key and config.ENABLE_SIMPLEMEM:
# 使用配置文件的 embedding 设置
emb_model = embedding_model or config.EMBEDDING_MODEL
emb_base = embedding_base_url or config.EMBEDDING_BASE_URL or base_url
self.memory = SimpleMemSystem(
api_key=api_key,
base_url=emb_base,
model=model,
embedding_model=emb_model,
db_path=str(self.workspace / "memory_db"),
clear_db=False,
# 并行处理配置
enable_parallel_processing=config.ENABLE_PARALLEL_PROCESSING,
max_parallel_workers=config.MAX_PARALLEL_WORKERS,
enable_parallel_retrieval=config.ENABLE_PARALLEL_RETRIEVAL,
max_retrieval_workers=config.MAX_RETRIEVAL_WORKERS,
enable_planning=config.ENABLE_PLANNING,
enable_reflection=config.ENABLE_REFLECTION,
max_reflection_rounds=config.MAX_REFLECTION_ROUNDS,
)
self._use_simplemem = True
print(f"SimpleMem initialized with embedding: {emb_model}, base_url: {emb_base}")
else:
self.memory = None
self._use_simplemem = False
if not api_key:
print("Warning: No API key provided, SimpleMem will be disabled")
def _register_default_tools(self) -> None:
"""注册默认工具"""
# 可以在这里添加默认工具
pass
def register_tool(
self,
name: str,
func: Any,
description: str = "",
parameters: Optional[dict] = None,
) -> None:
"""注册自定义工具"""
tool_def = {
"type": "function",
"function": {
"name": name,
"description": description,
"parameters": parameters or {
"type": "object",
"properties": {},
"required": [],
}
}
}
# 存储在 tools 中
self.tools.register(name, func, description)
async def run(
self,
user_input: str,
session_id: str = "default",
) -> dict[str, Any]:
"""
运行 Agent 对话
Args:
user_input: 用户输入
session_id: 会话 ID
Returns:
dict: 包含 content, tool_calls 等
"""
# 获取或创建会话
session = self.sessions.get_or_create(session_id)
# 构建系统提示
system_prompt = f"""你是 {self.name}
{self.role_description}
请根据用户的问题回答,并使用 Markdown 格式输出。"""
# 如果使用 SimpleMem检索相关记忆
memory_context = ""
if self._use_simplemem and self.memory:
try:
memory_context = self.memory.ask(user_input)
except Exception as e:
print(f"Memory retrieval error: {e}")
if memory_context:
system_prompt += f"\n\n相关记忆:\n{memory_context}"
# 获取历史消息
history = session.get_history(max_messages=50)
# 构建初始消息
initial_messages = history + [
{"role": "user", "content": user_input}
]
# 运行 agent loop
final_content, tools_used, all_messages = await self.agent_loop.run_loop(
initial_messages=initial_messages,
system_prompt=system_prompt,
)
# 保存到会话
for m in all_messages[len(history):]:
session.messages.append(m)
self.sessions.save(session)
# 保存到 SimpleMem 记忆
if self._use_simplemem and self.memory and final_content:
try:
self.memory.add_dialogue("User", user_input, datetime.now().isoformat())
self.memory.add_dialogue(self.name, final_content, datetime.now().isoformat())
self.memory.finalize()
except Exception as e:
print(f"Memory save error: {e}")
return {
"content": final_content or "No response",
"tool_calls": tools_used,
"session_id": session_id,
}
async def run_stream(
self,
user_input: str,
session_id: str = "default",
):
"""
运行 Agent 对话(流式输出)
先完整执行 agent loop最后流式输出结果
Args:
user_input: 用户输入
session_id: 会话 ID
Yields:
str: 流式回复片段
"""
# 先完整执行 agent loop包含 tool-calling
result = await self.run(user_input, session_id)
content = result["content"]
# 流式输出结果
for char in content:
yield char
def clear_session(self, session_id: str) -> None:
"""清除会话"""
session = self.sessions.get_or_create(session_id)
session.clear()
self.sessions.save(session)
self.sessions.invalidate(session_id)
def list_sessions(self) -> list[dict]:
"""列出所有会话"""
return self.sessions.list_sessions()

View File

@@ -1,74 +0,0 @@
"""
XBot 配置文件
"""
# ==================== LLM 配置 ====================
# 默认 LLM 提供商
DEFAULT_PROVIDER = "openai"
# 默认模型
DEFAULT_MODEL = "gpt-4"
# API Key建议使用环境变量
import os
API_KEY = os.getenv("OPENAI_API_KEY", "")
# Base URL
BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
# ==================== SimpleMem 记忆配置 ====================
# 是否启用 SimpleMem
ENABLE_SIMPLEMEM = True
# Embedding 模型
# 推荐: text-embedding-3-small, text-embedding-3-large, text-embedding-ada-002
# 或使用 Qwen: Qwen/Qwen3-Embedding-0.6B
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small")
# Embedding 服务的 Base URL可选默认使用 BASE_URL
EMBEDDING_BASE_URL = os.getenv("EMBEDDING_BASE_URL", "")
# ==================== 并行处理配置 ====================
# 是否启用并行处理
ENABLE_PARALLEL_PROCESSING = True
MAX_PARALLEL_WORKERS = 8
# 是否启用并行检索
ENABLE_PARALLEL_RETRIEVAL = True
MAX_RETRIEVAL_WORKERS = 4
# 是否启用规划
ENABLE_PLANNING = True
# 是否启用反思
ENABLE_REFLECTION = True
MAX_REFLECTION_ROUNDS = 2
# ==================== 工作目录 ====================
# 工作目录(用于存储会话和记忆)
WORKSPACE = os.getenv("XAGENT_WORKSPACE", "./xbot_workspace")
# 上下文窗口大小
CONTEXT_WINDOW_TOKENS = 200000
# ==================== Agent 配置 ====================
# 默认 Agent 配置
DEFAULT_AGENTS = {
1: {
"name": "数据分析助手",
"role_description": "你是一个专业的数据分析助手,擅长分析数据、生成报告。",
},
2: {
"name": "代码审查助手",
"role_description": "你是一个专业的代码审查助手擅长审查代码、发现bug。",
},
}

View File

@@ -1,190 +0,0 @@
"""Agent loop for tool-calling conversation."""
import asyncio
import json
import re
from typing import Any, Callable, Optional
from loguru import logger
class AgentLoop:
"""
Agent loop with tool-calling capability.
This is the core of the nanobot agent - it handles:
- Multi-turn conversation with the LLM
- Tool execution when the model requests it
- Progress callbacks for streaming responses
"""
_TOOL_RESULT_MAX_CHARS = 50000
def __init__(
self,
provider: Any,
model: str,
tools: Any,
max_iterations: int = 50,
):
"""
Initialize the agent loop.
Args:
provider: LLM provider (must implement chat_with_retry)
model: Model name
tools: Tool registry (must have get_definitions() and execute())
max_iterations: Maximum tool call iterations
"""
self.provider = provider
self.model = model
self.tools = tools
self.max_iterations = max_iterations
@staticmethod
def _strip_think(text: Optional[str]) -> Optional[str]:
"""Strip model thinking blocks from content."""
if not text:
return None
# Strip <thinking> tags commonly used by models like DeepSeek
pattern = r"<thinking>[\s\S]*?</thinking>"
text = re.sub(pattern, "", text)
return text.strip() or None
@staticmethod
def _tool_hint(tool_calls: list) -> str:
"""Format tool calls as concise hint."""
def _fmt(tc):
args = tc.arguments or {}
val = next(iter(args.values()), None) if isinstance(args, dict) else None
if not isinstance(val, str):
return tc.name
return f'{tc.name}("{val[:40]}...")' if len(val) > 40 else f'{tc.name}("{val}")'
return ", ".join(_fmt(tc) for tc in tool_calls)
async def run_loop(
self,
initial_messages: list[dict],
system_prompt: str = "",
on_progress: Optional[Callable[..., Any]] = None,
) -> tuple[Optional[str], list[str], list[dict]]:
"""
Run the agent iteration loop.
Args:
initial_messages: Starting message list
system_prompt: System prompt to prepend
on_progress: Optional callback for progress updates
Returns:
Tuple of (final_content, tools_used, all_messages)
"""
# Prepend system prompt if provided
if system_prompt:
messages = [{"role": "system", "content": system_prompt}] + initial_messages
else:
messages = initial_messages
iteration = 0
final_content = None
tools_used: list[str] = []
while iteration < self.max_iterations:
iteration += 1
tool_defs = self.tools.get_definitions() if self.tools else []
response = await self.provider.chat_with_retry(
messages=messages,
tools=tool_defs,
model=self.model,
)
if response.has_tool_calls:
# Send progress update
if on_progress:
thought = self._strip_think(response.content)
if thought:
await on_progress(thought)
await on_progress(self._tool_hint(response.tool_calls), tool_hint=True)
# Add assistant message with tool calls
tool_call_dicts = [
tc.to_openai_tool_call() if hasattr(tc, 'to_openai_tool_call') else tc
for tc in response.tool_calls
]
messages = self._add_assistant_message(
messages, response.content, tool_call_dicts,
reasoning_content=getattr(response, 'reasoning_content', None),
)
# Execute tools
for tool_call in response.tool_calls:
tools_used.append(tool_call.name)
args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
logger.info("Tool call: {}({})", tool_call.name, args_str[:200])
result = await self.tools.execute(tool_call.name, tool_call.arguments)
messages = self._add_tool_result(messages, tool_call.id, tool_call.name, result)
else:
clean = self._strip_think(response.content)
# Handle error responses
if response.finish_reason == "error":
logger.error("LLM returned error: {}", (clean or "")[:200])
final_content = clean or "Sorry, I encountered an error calling the AI model."
break
messages = self._add_assistant_message(
messages, clean,
reasoning_content=getattr(response, 'reasoning_content', None),
)
final_content = clean
break
if final_content is None and iteration >= self.max_iterations:
logger.warning("Max iterations ({}) reached", self.max_iterations)
final_content = (
f"I reached the maximum number of tool call iterations ({self.max_iterations}) "
"without completing the task."
)
return final_content, tools_used, messages
def _add_assistant_message(
self,
messages: list[dict],
content: Optional[str],
tool_calls: Optional[list[dict]] = None,
reasoning_content: Optional[str] = None,
) -> list[dict]:
"""Add an assistant message to the message list."""
msg: dict[str, Any] = {"role": "assistant", "content": content}
if tool_calls:
msg["tool_calls"] = tool_calls
if reasoning_content is not None:
msg["reasoning_content"] = reasoning_content
messages.append(msg)
return messages
def _add_tool_result(
self,
messages: list[dict],
tool_call_id: str,
tool_name: str,
result: Any,
) -> list[dict]:
"""Add a tool result message to the message list."""
# Truncate large results
content = str(result)
if len(content) > self._TOOL_RESULT_MAX_CHARS:
content = content[:self._TOOL_RESULT_MAX_CHARS] + "\n... (truncated)"
messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"name": tool_name,
"content": content,
})
return messages

View File

@@ -1,240 +0,0 @@
"""Memory system for persistent agent memory."""
import json
import asyncio
import weakref
from pathlib import Path
from typing import Any, Callable, Optional
try:
import tiktoken
HAS_TIKTOKEN = True
except ImportError:
HAS_TIKTOKEN = False
_SAVE_MEMORY_TOOL = [
{
"type": "function",
"function": {
"name": "save_memory",
"description": "Save the memory consolidation result to persistent storage.",
"parameters": {
"type": "object",
"properties": {
"history_entry": {
"type": "string",
"description": "A paragraph summarizing key events/decisions/topics.",
},
"memory_update": {
"type": "string",
"description": "Full updated long-term memory as markdown. Include all existing facts plus new ones.",
},
},
"required": ["history_entry", "memory_update"],
},
},
}
]
class MemoryStore:
"""Two-layer memory: MEMORY.md (long-term facts) + HISTORY.md (grep-searchable log)."""
def __init__(self, workspace: Path):
self.memory_dir = workspace / "memory"
self.memory_dir.mkdir(parents=True, exist_ok=True)
self.memory_file = self.memory_dir / "MEMORY.md"
self.history_file = self.memory_dir / "HISTORY.md"
def read_long_term(self) -> str:
if self.memory_file.exists():
return self.memory_file.read_text(encoding="utf-8")
return ""
def write_long_term(self, content: str) -> None:
self.memory_file.write_text(content, encoding="utf-8")
def append_history(self, entry: str) -> None:
with open(self.history_file, "a", encoding="utf-8") as f:
f.write(entry.rstrip() + "\n\n")
def get_memory_context(self) -> str:
long_term = self.read_long_term()
return f"## Long-term Memory\n{long_term}" if long_term else ""
def _estimate_tokens(text: str) -> int:
"""Estimate token count."""
if HAS_TIKTOKEN:
try:
enc = tiktoken.get_encoding("cl100k_base")
return len(enc.encode(text))
except Exception:
pass
return max(1, len(text) // 4)
def _estimate_message_tokens(message: dict[str, Any]) -> int:
"""Estimate prompt tokens for a message."""
content = message.get("content")
parts = []
if isinstance(content, str):
parts.append(content)
elif isinstance(content, list):
for part in content:
if isinstance(part, dict) and part.get("type") == "text":
text = part.get("text", "")
if text:
parts.append(text)
else:
parts.append(json.dumps(part, ensure_ascii=False))
elif content is not None:
parts.append(json.dumps(content, ensure_ascii=False))
for key in ("name", "tool_call_id"):
value = message.get(key)
if isinstance(value, str) and value:
parts.append(value)
if message.get("tool_calls"):
parts.append(json.dumps(message["tool_calls"], ensure_ascii=False))
payload = "\n".join(parts)
return max(1, _estimate_tokens(payload)) if payload else 1
class MemoryConsolidator:
"""Owns consolidation policy, locking, and session offset updates."""
def __init__(
self,
workspace: Path,
provider: Any,
model: str,
sessions: Any,
context_window_tokens: int = 200000,
):
self.store = MemoryStore(workspace)
self.provider = provider
self.model = model
self.sessions = sessions
self.context_window_tokens = context_window_tokens
self._locks: weakref.WeakValueDictionary[str, asyncio.Lock] = weakref.WeakValueDictionary()
def get_lock(self, session_key: str) -> asyncio.Lock:
"""Return the shared consolidation lock for one session."""
return self._locks.setdefault(session_key, asyncio.Lock())
async def consolidate_messages(self, messages: list[dict[str, object]]) -> bool:
"""Archive a selected message chunk into persistent memory."""
if not messages:
return True
current_memory = self.store.read_long_term()
prompt = f"""Process this conversation and call the save_memory tool.
## Current Long-term Memory
{current_memory or "(empty)"}
## Conversation to Process
{self._format_messages(messages)}"""
try:
response = await self.provider.chat_with_retry(
messages=[
{"role": "system", "content": "You are a memory consolidation agent."},
{"role": "user", "content": prompt},
],
tools=_SAVE_MEMORY_TOOL,
model=self.model,
)
if not response.has_tool_calls:
return False
args = response.tool_calls[0].arguments
if isinstance(args, str):
args = json.loads(args)
if isinstance(args, list):
args = args[0] if args else {}
if entry := args.get("history_entry"):
self.store.append_history(str(entry))
if update := args.get("memory_update"):
update = str(update)
if update != current_memory:
self.store.write_long_term(update)
return True
except Exception:
return False
def _format_messages(self, messages: list[dict]) -> str:
lines = []
for message in messages:
if not message.get("content"):
continue
lines.append(
f"[{message.get('timestamp', '?')[:16]}] {message['role'].upper()}: {message['content']}"
)
return "\n".join(lines)
def pick_consolidation_boundary(
self,
session: Any,
tokens_to_remove: int,
) -> Optional[tuple[int, int]]:
"""Pick a user-turn boundary that removes enough old prompt tokens."""
start = session.last_consolidated
if start >= len(session.messages) or tokens_to_remove <= 0:
return None
removed_tokens = 0
last_boundary: Optional[tuple[int, int]] = None
for idx in range(start, len(session.messages)):
message = session.messages[idx]
if idx > start and message.get("role") == "user":
last_boundary = (idx, removed_tokens)
if removed_tokens >= tokens_to_remove:
return last_boundary
removed_tokens += _estimate_message_tokens(message)
return last_boundary
async def archive_unconsolidated(self, session: Any) -> bool:
"""Archive the full unconsolidated tail for /new-style session rollover."""
lock = self.get_lock(session.key)
async with lock:
snapshot = session.messages[session.last_consolidated:]
if not snapshot:
return True
return await self.consolidate_messages(snapshot)
async def maybe_consolidate_by_tokens(self, session: Any) -> None:
"""Loop: archive old messages until prompt fits within half the context window."""
if not session.messages or self.context_window_tokens <= 0:
return
lock = self.get_lock(session.key)
async with lock:
target = self.context_window_tokens // 2
# Simple estimation without full prompt build
estimated = sum(_estimate_message_tokens(m) for m in session.messages[session.last_consolidated:])
if estimated < self.context_window_tokens:
return
# Find boundary and consolidate
boundary = self.pick_consolidation_boundary(session, max(1, estimated - target))
if boundary is None:
return
end_idx = boundary[0]
chunk = session.messages[session.last_consolidated:end_idx]
if not chunk:
return
if await self.consolidate_messages(chunk):
session.last_consolidated = end_idx
self.sessions.save(session)

View File

@@ -1,169 +0,0 @@
"""Session management for conversation history."""
import json
import shutil
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
@dataclass
class Session:
"""
A conversation session.
Stores messages in JSONL format for easy reading and persistence.
"""
key: str # session_id
messages: list[dict[str, Any]] = field(default_factory=list)
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
metadata: dict[str, Any] = field(default_factory=dict)
last_consolidated: int = 0 # Number of messages already consolidated to files
def add_message(self, role: str, content: str, **kwargs: Any) -> None:
"""Add a message to the session."""
msg = {
"role": role,
"content": content,
"timestamp": datetime.now().isoformat(),
**kwargs
}
self.messages.append(msg)
self.updated_at = datetime.now()
def get_history(self, max_messages: int = 500) -> list[dict[str, Any]]:
"""Return unconsolidated messages for LLM input, aligned to a user turn."""
unconsolidated = self.messages[self.last_consolidated:]
sliced = unconsolidated[-max_messages:]
# Drop leading non-user messages to avoid orphaned tool_result blocks
for i, m in enumerate(sliced):
if m.get("role") == "user":
sliced = sliced[i:]
break
out: list[dict[str, Any]] = []
for m in sliced:
entry: dict[str, Any] = {"role": m["role"], "content": m.get("content", "")}
for k in ("tool_calls", "tool_call_id", "name"):
if k in m:
entry[k] = m[k]
out.append(entry)
return out
def clear(self) -> None:
"""Clear all messages and reset session to initial state."""
self.messages = []
self.last_consolidated = 0
self.updated_at = datetime.now()
class SessionManager:
"""Manages conversation sessions stored as JSONL files."""
def __init__(self, workspace: Path):
self.workspace = workspace
self.sessions_dir = workspace / "sessions"
self.sessions_dir.mkdir(parents=True, exist_ok=True)
self._cache: dict[str, Session] = {}
def _get_session_path(self, key: str) -> Path:
"""Get the file path for a session."""
safe_key = key.replace(":", "_").replace("/", "_")
return self.sessions_dir / f"{safe_key}.jsonl"
def get_or_create(self, key: str) -> Session:
"""Get an existing session or create a new one."""
if key in self._cache:
return self._cache[key]
session = self._load(key)
if session is None:
session = Session(key=key)
self._cache[key] = session
return session
def _load(self, key: str) -> Optional[Session]:
"""Load a session from disk."""
path = self._get_session_path(key)
if not path.exists():
return None
try:
messages = []
metadata = {}
created_at = None
last_consolidated = 0
with open(path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
data = json.loads(line)
if data.get("_type") == "metadata":
metadata = data.get("metadata", {})
created_at = datetime.fromisoformat(data["created_at"]) if data.get("created_at") else None
last_consolidated = data.get("last_consolidated", 0)
else:
messages.append(data)
return Session(
key=key,
messages=messages,
created_at=created_at or datetime.now(),
metadata=metadata,
last_consolidated=last_consolidated
)
except Exception:
return None
def save(self, session: Session) -> None:
"""Save a session to disk."""
path = self._get_session_path(session.key)
with open(path, "w", encoding="utf-8") as f:
metadata_line = {
"_type": "metadata",
"key": session.key,
"created_at": session.created_at.isoformat(),
"updated_at": session.updated_at.isoformat(),
"metadata": session.metadata,
"last_consolidated": session.last_consolidated
}
f.write(json.dumps(metadata_line, ensure_ascii=False) + "\n")
for msg in session.messages:
f.write(json.dumps(msg, ensure_ascii=False) + "\n")
self._cache[session.key] = session
def invalidate(self, key: str) -> None:
"""Remove a session from the in-memory cache."""
self._cache.pop(key, None)
def list_sessions(self) -> list[dict[str, Any]]:
"""List all sessions."""
sessions = []
for path in self.sessions_dir.glob("*.jsonl"):
try:
with open(path, encoding="utf-8") as f:
first_line = f.readline().strip()
if first_line:
data = json.loads(first_line)
if data.get("_type") == "metadata":
sessions.append({
"key": data.get("key") or path.stem,
"created_at": data.get("created_at"),
"updated_at": data.get("updated_at"),
"path": str(path)
})
except Exception:
continue
return sorted(sessions, key=lambda x: x.get("updated_at", ""), reverse=True)

View File

@@ -1,11 +0,0 @@
fastapi>=0.100.0
uvicorn[standard]>=0.23.0
pydantic>=2.0.0
openai>=1.0.0
anthropic>=0.18.0
python-dotenv>=1.0.0
aiohttp>=3.8.0
redis>=5.0.0
loguru>=0.7.0
tiktoken>=0.12.0
simplemem>=0.1.0

View File

@@ -1,315 +0,0 @@
---
name: openakita/skills@xiaohongshu-creator
description: Create engaging Xiaohongshu (RED/小红书) content including titles, body text, hashtags, and image style recommendations. Supports multiple content types such as product reviews, tutorials, lifestyle sharing, and shopping guides with platform-specific optimization.
license: MIT
metadata:
author: openakita
version: "1.0.0"
---
# 小红书内容创作助手
专为小红书平台打造的内容创作技能,帮助你生成符合平台调性的高质量笔记,涵盖标题、正文、话题标签和配图建议。
## 适用场景
- 撰写种草笔记(好物推荐、购物分享)
- 撰写产品测评笔记
- 撰写教程类笔记美妆、穿搭、美食、DIY
- 撰写生活分享笔记(旅行、日常、打卡)
- 品牌合作内容创作
- 小红书账号运营内容规划
- 批量生成笔记框架
## 核心创作规范
### 一、标题规则
标题是笔记的第一印象,直接决定点击率。
**硬性要求:**
- 字数限制:**不超过 20 个字符**
- Emoji 数量:**1-2 个**,放在标题开头或结尾
- 禁止使用:感叹号堆叠(`!!!`)、全大写字母
**钩子元素(至少使用 1 种):**
| 钩子类型 | 说明 | 示例 |
|----------|------|------|
| 数字法 | 用数字制造具体感 | `3步搞定通勤妆🌟` |
| 反差法 | 制造意外感 | `月薪3K穿出3W的感觉✨` |
| 痛点法 | 直击目标人群痛点 | `黄皮亲妈色号合集🎨` |
| 悬念法 | 引发好奇心 | `这个习惯让我瘦了20斤💪` |
| 对比法 | 前后/AB对比 | `早C晚A一个月的变化🔥` |
| 权威法 | 借势专业背书 | `皮肤科医生推荐的面霜💊` |
| 场景法 | 具体使用场景 | `约会前30分钟急救妆容💄` |
| 共鸣法 | 引发情感共鸣 | `打工人的高效早餐方案☀️` |
**标题模板:**
```
[数字] + [核心关键词] + [利益点] + [emoji]
[身份标签] + [动作] + [结果] + [emoji]
[场景] + [解决方案] + [emoji]
```
### 二、正文结构
正文长度控制在 **300-500 字符**,遵循四段式结构:
```
🔥 Hook开头钩子 —— 1-2 句,抓住注意力
📝 Core核心内容 —— 主体信息,有价值的干货
📌 Summary总结 —— 精炼要点
👉 CTA行动号召 —— 引导互动
```
**Hook 写法:**
- 提问式:`你们有没有这种烦恼?`
- 共鸣式:`每个打工人都需要这个!`
- 悬念式:`用了三年终于找到了最好用的...`
- 成果式:`坚持30天效果太惊人了`
**Core 写法要点:**
- 使用 emoji 分点(📍🔸💡等)替代纯文字列表
- 每个要点控制在 1-2 行
- 穿插个人体验和感受(增加真实感)
- 重要信息加【】或「」标注
- 适当使用换行,避免大段文字
**CTA 常用句式:**
- `觉得有用的话记得点赞收藏哦~`
- `你们还想看什么类型的分享?评论区告诉我`
- `有同款的姐妹举个手🙋‍♀️`
- `关注我,持续分享[领域]干货`
### 三、话题标签规则
每篇笔记配 **8 个话题标签**,按以下比例分配:
| 类别 | 数量 | 说明 | 示例 |
|------|------|------|------|
| 核心词 | 2 个 | 笔记主题精准关键词 | `#面膜推荐` `#保湿面膜` |
| 品类词 | 2 个 | 所属品类/领域 | `#护肤` `#美妆好物` |
| 场景词 | 2 个 | 使用场景/人群 | `#学生党护肤` `#换季护肤` |
| 热门词 | 2 个 | 平台热门话题 | `#好物分享` `#我的爱用物` |
**选择原则:**
- 优先选择搜索量大但竞争适中的标签
- 避免过于宽泛的标签(如 `#生活`
- 包含长尾关键词提升搜索曝光
- 关注平台当前热门话题榜
### 四、配图风格建议
小红书是视觉驱动平台,封面决定 80% 的点击率。
**10 种推荐视觉风格:**
| 编号 | 风格 | 适用类型 | 要点 |
|------|------|---------|------|
| 1 | 对比拼图 | 测评/效果展示 | 左右或上下对比,标注差异 |
| 2 | 清单图 | 好物合集/推荐 | 白底九宫格产品陈列 |
| 3 | 教程步骤图 | 教程类 | 编号标注步骤,清晰易跟 |
| 4 | 文字封面 | 干货分享 | 大字标题+简洁背景色 |
| 5 | 场景氛围图 | 生活分享/穿搭 | 自然光,生活感强 |
| 6 | 数据图表 | 测评/科普 | 简化数据可视化 |
| 7 | 手绘/插画风 | 知识科普 | 可爱风格信息图 |
| 8 | Vlog截图 | 日常分享 | 视频关键帧+文字标注 |
| 9 | 实拍特写 | 产品种草 | 高清细节,突出质感 |
| 10 | Ins风简约 | 穿搭/家居 | 低饱和度,高级感 |
**封面设计通用原则:**
- 尺寸比例:**3:4**1080×1440px最佳
- 文字不超过封面面积的 **20%**
- 核心信息放在画面上半部分feed 流裁剪安全区)
- 色彩鲜明、对比度高
- 避免过度 P 图,保持真实感
## 内容类型工作流
### 工作流一:种草笔记
```
输入 → 产品名称、品类、价格、目标人群
Step 1: 生成 3 个标题方案(痛点法/数字法/场景法各一)
Step 2: 撰写正文
- Hook个人使用感受/发现契机
- Core产品亮点3-5个、使用方法、适合人群
- Summary一句话总结推荐理由
- CTA引导收藏和讨论
Step 3: 生成 8 个话题标签
Step 4: 封面建议(推荐风格 9 实拍特写 或 风格 2 清单图)
输出 → 完整笔记(可直接发布)
```
### 工作流二:测评笔记
```
输入 → 产品列表2-5 个)、测评维度
Step 1: 标题使用对比法或数字法
Step 2: 撰写正文
- Hook测评动机/痛点引入
- Core逐项对比成分/价格/使用感/性价比)
- Summary各产品评分或排名
- CTA`你们用过哪个?评论区聊聊`
Step 3: 话题标签(增加品牌词标签)
Step 4: 封面建议(推荐风格 1 对比拼图 或 风格 6 数据图表)
输出 → 完整测评笔记
```
### 工作流三:教程笔记
```
输入 → 教程主题、难度、目标人群
Step 1: 标题使用数字法(`X步学会...`
Step 2: 撰写正文
- Hook学会后的效果/价值
- Core分步骤讲解每步 1-2 句)
- Summary关键注意事项
- CTA`学会的打个✅`
Step 3: 话题标签(增加 `#教程` `#手把手教学` 等)
Step 4: 封面建议(推荐风格 3 步骤图 或 风格 4 文字封面)
输出 → 完整教程笔记 + 多图建议(每步一张图)
```
### 工作流四:生活分享笔记
```
输入 → 分享主题、场景、情绪基调
Step 1: 标题使用共鸣法或场景法
Step 2: 撰写正文
- Hook故事开头/情感切入
- Core分享细节、个人感受、实用信息
- Summary感悟或建议
- CTA`你们有类似的经历吗?`
Step 3: 话题标签(增加情感/生活方式标签)
Step 4: 封面建议(推荐风格 5 场景氛围图 或 风格 8 Vlog截图
输出 → 完整分享笔记
```
## 完整示例
### 示例:护肤种草笔记
**用户输入:** `帮我写一篇关于珂润面霜的种草笔记,目标人群是敏感肌学生党`
**输出:**
**标题方案:**
1. `敏感肌救星这罐面霜用了3年回购🧴`
2. `学生党百元面霜天花板💰`
3. `换季烂脸?皮肤科医生同款面霜🏥`
**正文选用标题1**
```
姐妹们!!敏感肌的福音真的来了
作为一个烂脸三年的敏感肌 终于找到了真正好用的面霜
📍珂润面霜使用感受:
🔸质地:乳白色霜状 不厚重 好推开
🔸保湿:神经酰胺成分 修护屏障一绝
🔸温和度:无香精无酒精 敏感期也能用
🔸性价比:百元出头 学生党无压力
💡使用tips
1⃣ 先乳后霜 吸收更好
2⃣ 换季加量使用 厚涂当睡眠面膜
3⃣ 搭配同系列化妆水效果翻倍
用了整整三年 每次回购都是囤两罐的节奏
敏感肌姐妹真的可以闭眼入!
觉得有用记得收藏💕 还想看什么评论区告诉我~
```
**话题标签:**
`#珂润面霜` `#敏感肌护肤` `#学生党护肤` `#平价面霜推荐` `#神经酰胺` `#换季护肤` `#好物分享` `#护肤干货`
**封面建议:** 风格 9实拍特写自然光下拍摄面霜质地细节图辅以手写标注关键成分。
## 高级技巧
### 发布时间建议
| 时段 | 说明 |
|------|------|
| 7:00-9:00 | 通勤高峰,碎片化浏览 |
| 12:00-14:00 | 午休时间,浏览高峰 |
| 18:00-20:00 | 下班后休闲浏览 |
| 21:00-23:00 | 睡前黄金时段,互动率最高 |
### SEO 优化
- 标题和正文前 50 字包含核心关键词
- 使用平台搜索下拉词作为参考
- 正文自然融入 3-5 个相关关键词(避免堆砌)
- 评论区补充关键词(自评增加曝光)
### 互动率提升
- 正文中设置互动问题(`你们觉得呢?`
- 结尾提供选择题(`A还是B评论区投票`
- 24 小时内回复所有评论
- 置顶评论放重要补充信息
## 常见误区
| 误区 | 正确做法 |
|------|---------|
| 标题过长超20字 | 精炼到 20 字以内,信息密度优先 |
| 正文大段不分行 | 每 2-3 行空一行,用 emoji 分隔 |
| 标签太宽泛 | 组合使用泛词+精准词+长尾词 |
| 封面文字太多 | 封面突出视觉冲击,详细信息放正文 |
| 纯广告无真实感 | 加入个人体验和真实细节 |
| 内容同质化严重 | 找到独特切入角度(身份/场景/反差) |
| 忽略评论区运营 | 主动回复并引导二次互动 |
## 输出格式规范
每次生成内容时,严格按以下格式输出:
```markdown
## 📝 小红书笔记
### 标题方案
1. [方案一]
2. [方案二]
3. [方案三]
### 正文
[完整正文内容]
### 话题标签
[8个标签]
### 封面建议
- 推荐风格:[编号+名称]
- 具体建议:[详细说明]
- 配色方案:[色系建议]
### 发布建议
- 推荐时段:[具体时间]
- 注意事项:[补充说明]
```