"""Agent executor - runs the core logic of an Agent."""

import inspect
from typing import Any, Optional

from app.llm.factory import LLMFactory
from app.agent.tools.registry import ToolRegistry
from app.agent.memory.session import SessionManager
from app.security.audit import AuditLogger


class AgentExecutor:
    """Execute a single agent turn: call the LLM, run requested tools, reply."""

    def __init__(
        self,
        agent_id: str,
        llm_factory: LLMFactory,
        tool_registry: ToolRegistry,
        session_manager: SessionManager,
        audit_logger: AuditLogger,
        config: dict,
    ):
        """Store the collaborators and obtain the LLM from the factory.

        Args:
            agent_id: Identifier included in audit log entries.
            llm_factory: Factory whose ``get_llm()`` supplies the model.
            tool_registry: Registry used to look up tool definitions and
                callables by name.
            session_manager: Store for per-session message history.
            audit_logger: Sink for error audit records.
            config: Agent configuration; the keys read by this class are
                ``"system_prompt"`` and ``"tools"``.
        """
        self.agent_id = agent_id
        self.llm_factory = llm_factory
        self.tool_registry = tool_registry
        self.session_manager = session_manager
        self.audit_logger = audit_logger
        self.config = config

        # Resolve the LLM once, at construction time.
        self.llm = self.llm_factory.get_llm()

    async def run(
        self,
        message: str,
        session_id: str,
        context: dict,
    ) -> dict[str, Any]:
        """Run the agent for one user message.

        The LLM is called with the session history and the configured tools.
        If the response carries tool calls, each tool is executed, its result
        is appended to the conversation, and the LLM is called a second time
        (without tools) to produce the final reply. The user message and the
        reply are then stored in the session history.

        Args:
            message: The user's message for this turn.
            session_id: Session whose history is read and extended.
            context: Accepted for interface compatibility; not used here.

        Returns:
            A dict with ``"reply"`` (the final text), ``"tools_used"`` (tool
            names in call order) and ``"metadata"`` (always an empty dict).

        Raises:
            PermissionError: If a requested tool requires approval.
            Exception: Any error raised by the LLM calls, tool lookup or
                history persistence is written to the audit log and re-raised.
        """
        tools_used = []

        # 1. Load the session history.
        history = self.session_manager.get_history(session_id)

        # 2. Build the message list.
        messages = self._build_messages(message, history)

        # 3. Collect the tool definitions available to this agent.
        available_tools = self._get_available_tools()

        # 4. Call the LLM (with tools).
        try:
            response = await self.llm.agenerate(
                messages=messages,
                tools=available_tools,
            )
            response_message = response.generations[0][0]

            tool_calls = getattr(response_message, "tool_calls", None)
            if tool_calls:
                # The assistant message that requested the tools is added
                # exactly once, followed by one tool message per call.
                messages.append(response_message)

                for tool_call in tool_calls:
                    tool_name = tool_call.name
                    tool_args = tool_call.arguments

                    # Record the tool before running it, so attempted tools
                    # are tracked even when execution reports an error.
                    tools_used.append(tool_name)

                    tool_result = await self._execute_tool(tool_name, tool_args)

                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": str(tool_result),
                    })

                # Call the LLM again to turn the tool results into a reply.
                final_response = await self.llm.agenerate(messages=messages)
                reply = final_response.generations[0][0].text
            else:
                # No tool calls: the first response is the reply.
                reply = response_message.text

            # Persist the exchange only after a reply was produced.
            self.session_manager.add_message(session_id, "user", message)
            self.session_manager.add_message(session_id, "assistant", reply)

            return {
                "reply": reply,
                "tools_used": tools_used,
                "metadata": {},
            }

        except Exception as e:
            # Audit the failure, then let the caller handle it.
            self.audit_logger.log(
                action="agent_error",
                agent_id=self.agent_id,
                session_id=session_id,
                details={"error": str(e)},
            )
            raise

    def _build_messages(self, message: str, history: list) -> list:
        """Build the LLM message list: system prompt, history, user message.

        Args:
            message: The current user message, appended last.
            history: Prior messages, inserted unchanged after the system
                prompt. ``None`` is treated as an empty history.

        Returns:
            A new list; ``history`` itself is not modified.
        """
        system_prompt = self.config.get(
            "system_prompt", "You are a helpful assistant."
        )
        messages = [{"role": "system", "content": system_prompt}]
        messages.extend(history or [])
        messages.append({"role": "user", "content": message})
        return messages

    def _get_available_tools(self) -> list:
        """Return the definitions of the tools named in ``config["tools"]``.

        Names for which the registry returns a falsy definition are skipped.
        """
        return [
            tool_def
            for tool_name in self.config.get("tools", [])
            if (tool_def := self.tool_registry.get_tool_definition(tool_name))
        ]

    async def _execute_tool(self, tool_name: str, args: dict) -> Any:
        """Look up and run a tool, reporting execution errors as data.

        Args:
            tool_name: Name passed to the registry's ``get_tool``.
            args: Keyword arguments for the tool; ``None`` or an empty
                mapping calls the tool with no arguments.

        Returns:
            The tool's result (awaited when the tool returns an awaitable),
            or ``{"error": "<message>"}`` if the tool raised.

        Raises:
            PermissionError: If the tool's metadata has ``require_approval``
                set.
        """
        # Security check: lookup errors and approval failures propagate.
        tool_func, metadata = self.tool_registry.get_tool(tool_name)

        if metadata.require_approval:
            raise PermissionError(
                f"Tool '{tool_name}' requires approval before execution"
            )

        try:
            result = tool_func(**(args or {}))
            # Async tools hand back an awaitable; without awaiting it the
            # tool body would never run and its repr would be sent onward.
            if inspect.isawaitable(result):
                result = await result
            return result
        except Exception as e:
            # Deliberately best-effort: the error is returned so it can be
            # passed back as the tool result instead of aborting the turn.
            return {"error": str(e)}