Files
X-Agents/core/agents/agent/loop.py
DESKTOP-72TV0V4\caoxiaozhu 0e0f988264 feat: 增强 Agent 意图识别和上下文管理
- 新增 intent_router.py 意图路由模块
- 优化 context.py 上下文管理
- 增强 loop.py Agent 运行循环
- 更新 memory.py 记忆模块
- 修复 builtin.py 工具函数

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 21:44:00 +08:00

705 lines
27 KiB
Python

"""Agent run loop - complete implementation."""
import asyncio
import json
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Awaitable, AsyncGenerator
from agents.agent.context import ContextBuilder
from agents.agent.memory import AgentMemory
from agents.agent.intent_router import IntentRouter, create_intent_router, IntentType
from agents.llm import LLMProvider, LLMResponse, ProviderFactory
from agents.tools import ToolRegistry
logger = logging.getLogger(__name__)
class AgentLoop:
    """Agent loop with message processing, LLM calls, tool execution, and streaming.

    A request goes through three stages:

    1. Optional per-request provider resolution (when ``api_key`` or
       ``model_provider`` is supplied a temporary provider is created).
    2. Session history loading from ``AgentMemory`` (skipped for the
       ``"default"`` session key) and prompt assembly via ``ContextBuilder``.
    3. An LLM/tool iteration loop bounded by ``max_iterations``. When intent
       routing is enabled and the latest user message is classified as
       simple, tool definitions are withheld from the model.
    """

    # Tool output longer than this many characters is cut before being fed
    # back to the model so a single tool call cannot flood the prompt.
    _TOOL_RESULT_MAX_CHARS = 10000

    def __init__(
        self,
        provider: LLMProvider,
        model: str,
        workspace: Path | None = None,
        max_iterations: int = 10,
        tools: ToolRegistry | None = None,
        enable_intent_routing: bool = True,
    ):
        """Initialize the agent loop.

        Args:
            provider: LLM provider (OpenAI, Anthropic, etc.) used when a
                request does not supply its own provider parameters.
            model: Model name to use with ``provider``.
            workspace: Workspace directory for memory and configs; defaults
                to the current working directory.
            max_iterations: Maximum number of LLM calls per request.
            tools: Tool registry. When ``None`` the agent runs without tools
                (no default registry is created here).
            enable_intent_routing: Enable intent recognition, which can
                withhold tool definitions for requests classified as simple.
        """
        self.provider = provider
        self.model = model
        self.workspace = workspace or Path.cwd()
        self.max_iterations = max_iterations
        self.tools = tools
        self.enable_intent_routing = enable_intent_routing
        self.context = ContextBuilder(self.workspace)
        self.memory = AgentMemory(self.workspace)
        # Only build the router when routing is enabled; the loops check for
        # None before routing.
        if enable_intent_routing:
            self.intent_router = create_intent_router(llm_provider=provider)
        else:
            self.intent_router = None

    # ------------------------------------------------------------------
    # Public entry points
    # ------------------------------------------------------------------

    async def chat(
        self,
        message: str,
        history: list[dict[str, Any]] | None = None,
        session_key: str = "default",
        on_progress: Callable[..., Awaitable[None]] | None = None,
        model_id: str | None = None,
        model_name: str | None = None,
        model_provider: str | None = None,
        api_key: str | None = None,
        base_url: str | None = None,
        use_xbot: bool = False,
    ) -> str:
        """Process a chat message and return the response.

        A temporary provider is created when ``api_key`` or
        ``model_provider`` is given; otherwise the provider and model passed
        to ``__init__`` are used.

        Args:
            message: User message.
            history: Conversation history; it is placed after any stored
                session history.
            session_key: Session identifier. History is loaded from memory
                under this key (skipped for ``"default"``) and the new turn is
                saved under it.
            on_progress: Optional async callback. It is called as
                ``on_progress(text)`` for intermediate model text and as
                ``on_progress(hint, tool_hint=True)`` for tool-call hints.
            model_id: Accepted for API compatibility; currently unused.
            model_name: Model for the temporary provider; ignored unless
                ``api_key`` or ``model_provider`` is set.
            model_provider: Provider name for the temporary provider
                (``"openai"`` is used when only ``api_key`` is given).
            api_key: API key for the temporary provider.
            base_url: Custom API base URL for the temporary provider.
            use_xbot: Accepted for API compatibility; currently unused.

        Returns:
            Agent response content, or ``"No response generated."`` when the
            model produced no text.
        """
        provider, model = self._resolve_dynamic_provider(
            model_provider, model_name, api_key, base_url
        )
        if provider is None:
            logger.info(f"Using static provider: {type(self.provider).__name__}, model={self.model}")
        # Pass the caller's history unchanged: _chat_with_provider loads the
        # session history itself, so loading it here as well would duplicate it.
        return await self._chat_with_provider(
            message=message,
            history=history,
            session_key=session_key,
            on_progress=on_progress,
            provider=provider,
            model=model,
        )

    async def _chat_with_provider(
        self,
        message: str,
        history: list[dict[str, Any]] | None = None,
        session_key: str = "default",
        on_progress: Callable[..., Awaitable[None]] | None = None,
        provider: LLMProvider | None = None,
        model: str | None = None,
    ) -> str:
        """Run one non-streaming turn with a specific provider.

        Args:
            message: User message.
            history: Conversation history, placed after stored session history.
            session_key: Session identifier used to load and save history.
            on_progress: Optional async progress callback (see ``chat``).
            provider: LLM provider to use; defaults to ``self.provider``.
            model: Model name to use; defaults to ``self.model``.

        Returns:
            Agent response content, or ``"No response generated."``.
        """
        history = self._load_session_history(history, session_key)
        messages = self.context.build_messages(
            history=history,
            current_message=message,
        )
        # Everything from the current user message onward is new this turn.
        # Assumes build_messages appends the current message last; it may also
        # prepend e.g. a system prompt, which is why len(history) is not used.
        turn_start = max(len(messages) - 1, 0)
        final_content, _tools_used, all_messages = await self._run_loop(
            messages, on_progress, provider=provider, model=model
        )
        self._save_history(session_key, all_messages, turn_start)
        return final_content or "No response generated."

    async def chat_stream(
        self,
        message: str,
        history: list[dict[str, Any]] | None = None,
        session_key: str = "default",
        model_id: str | None = None,
        model_name: str | None = None,
        model_provider: str | None = None,
        api_key: str | None = None,
        base_url: str | None = None,
        use_xbot: bool = False,
    ) -> AsyncGenerator[str, None]:
        """Process a chat message with a streaming response.

        Provider selection works as in ``chat``. NOTE(review): unlike
        ``chat``, this path does not save the turn to memory; confirm the
        caller persists streamed replies if that is required.

        Args:
            message: User message.
            history: Conversation history, placed after stored session history.
            session_key: Session identifier used to load stored history.
            model_id: Accepted for API compatibility; currently unused.
            model_name: Model for the temporary provider (see ``chat``).
            model_provider: Provider name for the temporary provider.
            api_key: API key for the temporary provider.
            base_url: Custom API base URL for the temporary provider.
            use_xbot: Accepted for API compatibility; currently unused.

        Yields:
            Response content chunks.
        """
        provider, model = self._resolve_dynamic_provider(
            model_provider, model_name, api_key, base_url, log_prefix="[stream] "
        )
        # The helper loads session history itself; pass the caller's history
        # through untouched to avoid loading it twice.
        async for chunk in self._chat_stream_with_provider(
            message=message,
            history=history,
            session_key=session_key,
            provider=provider,
            model=model,
        ):
            yield chunk

    async def _chat_stream_with_provider(
        self,
        message: str,
        history: list[dict[str, Any]] | None = None,
        session_key: str = "default",
        provider: LLMProvider | None = None,
        model: str | None = None,
    ) -> AsyncGenerator[str, None]:
        """Stream one turn with a specific provider.

        Args:
            message: User message.
            history: Conversation history, placed after stored session history.
            session_key: Session identifier used to load stored history.
            provider: LLM provider to use; defaults to ``self.provider``.
            model: Model name to use; defaults to ``self.model``.

        Yields:
            Response content chunks.
        """
        history = self._load_session_history(history, session_key, log_prefix="[stream] ")
        messages = self.context.build_messages(
            history=history,
            current_message=message,
        )
        async for chunk in self._run_loop_stream(messages, provider=provider, model=model):
            yield chunk

    # ------------------------------------------------------------------
    # Request preparation helpers
    # ------------------------------------------------------------------

    def _resolve_dynamic_provider(
        self,
        model_provider: str | None,
        model_name: str | None,
        api_key: str | None,
        base_url: str | None,
        log_prefix: str = "",
    ) -> tuple[LLMProvider | None, str | None]:
        """Create a per-request provider when provider parameters are given.

        Returns:
            ``(provider, model)`` for the temporary provider, or
            ``(None, None)`` when neither ``api_key`` nor ``model_provider``
            is set, meaning the defaults from ``__init__`` apply.
        """
        if not (api_key or model_provider):
            return None, None
        logger.info(f"{log_prefix}Using dynamic provider: model_provider={model_provider}, model_name={model_name}, base_url={base_url}")
        temp_provider = ProviderFactory.create(
            provider=model_provider or "openai",
            api_key=api_key,
            api_base=base_url,
        )
        temp_model = model_name or temp_provider.get_default_model()
        logger.info(f"{log_prefix}Created temp provider with model: {temp_model}")
        return temp_provider, temp_model

    def _load_session_history(
        self,
        history: list[dict[str, Any]] | None,
        session_key: str,
        log_prefix: str = "",
    ) -> list[dict[str, Any]]:
        """Return stored session history followed by ``history``.

        At most 20 stored messages are read from memory, unless
        ``session_key`` is empty or ``"default"``. Split assistant messages in
        the stored history are re-joined with ``_merge_history_messages``.
        """
        combined = list(history or [])
        if not session_key or session_key == "default":
            return combined
        loaded = self.memory.get_history(session_key, max_messages=20)
        if not loaded:
            return combined
        loaded = self._merge_history_messages(loaded)
        logger.info(f"{log_prefix}Loaded {len(loaded)} messages from session history")
        return loaded + combined

    def _select_tool_defs(self, messages: list[dict], stream: bool = False) -> list[dict]:
        """Return the tool definitions to offer the model for this request.

        With intent routing enabled, the most recent user message is
        classified once. A simple intent yields an empty list, so the model
        answers without tools. If routing raises, the failure is logged and
        all tools are kept.

        Args:
            messages: The prompt messages for this request.
            stream: Selects the ``[stream]`` log prefix and wording.
        """
        tool_defs = self.tools.get_definitions() if self.tools else []
        if not (self.enable_intent_routing and self.intent_router):
            return tool_defs
        user_message = self._latest_user_message(messages)
        if not user_message:
            return tool_defs
        prefix = "[stream] " if stream else ""
        available_tools = [t.get("function", {}).get("name", "") for t in tool_defs]
        try:
            decision = self.intent_router.route(
                user_message,
                available_tools=available_tools,
            )
            intent = decision.get("intent", IntentType.SIMPLE.value)
        except Exception as e:
            # Routing only narrows the tool set; a failure must not fail the request.
            logger.warning(f"{prefix}Intent routing failed: {e}, continuing with normal flow")
            return tool_defs
        logger.info(f"{prefix}Intent recognized: {intent} for message: {str(user_message)[:50]}...")
        if intent != IntentType.SIMPLE.value:
            return tool_defs
        if stream:
            logger.info("[stream] Simple intent detected - disabling tool definitions")
        else:
            logger.info("Simple intent detected - disabling tool definitions for this request")
        return []

    @staticmethod
    def _latest_user_message(messages: list[dict]) -> Any:
        """Return the content of the most recent ``user`` message, or ``""``."""
        for msg in reversed(messages):
            if msg.get("role") == "user":
                return msg.get("content", "")
        return ""

    @classmethod
    def _truncate_tool_result(cls, result: Any) -> Any:
        """Cut string tool results longer than ``_TOOL_RESULT_MAX_CHARS``.

        Non-string results are returned unchanged.
        """
        if isinstance(result, str) and len(result) > cls._TOOL_RESULT_MAX_CHARS:
            return result[:cls._TOOL_RESULT_MAX_CHARS] + "\n... (truncated)"
        return result

    def _max_iterations_message(self) -> str:
        """User-facing text returned when the iteration budget is exhausted."""
        return (
            f"I reached the maximum number of iterations ({self.max_iterations}) "
            "without completing the task."
        )

    # ------------------------------------------------------------------
    # Iteration loops
    # ------------------------------------------------------------------

    async def _run_loop(
        self,
        initial_messages: list[dict],
        on_progress: Callable[..., Awaitable[None]] | None = None,
        provider: LLMProvider | None = None,
        model: str | None = None,
    ) -> tuple[str | None, list[str], list[dict]]:
        """Run the agent iteration loop.

        Each iteration makes one LLM call. Tool calls are executed and their
        results appended; a reply without tool calls ends the loop.

        Args:
            initial_messages: Initial message list.
            on_progress: Progress callback, called with intermediate text and
                with tool hints (``tool_hint=True``).
            provider: Optional LLM provider (defaults to ``self.provider``).
            model: Optional model name (defaults to ``self.model``).

        Returns:
            Tuple of ``(final_content, tools_used, all_messages)``.
        """
        provider = provider or self.provider
        model = model or self.model
        messages = initial_messages
        final_content = None
        tools_used: list[str] = []
        # Route once per request, on the current (latest) user message.
        tool_defs = self._select_tool_defs(messages)

        for _ in range(self.max_iterations):
            response = await provider.chat_with_retry(
                messages=messages,
                tools=tool_defs or None,
                model=model,
            )

            if not response.has_tool_calls:
                clean = self._strip_think(response.content)
                if response.finish_reason == "error":
                    logger.error(f"LLM error: {clean}")
                    final_content = clean or "Sorry, I encountered an error calling the AI model."
                    break
                messages = self.context.add_assistant_message(
                    messages, clean, reasoning_content=response.reasoning_content
                )
                final_content = clean
                break

            if on_progress:
                thought = self._strip_think(response.content)
                if thought:
                    await on_progress(thought)
                await on_progress(self._tool_hint(response.tool_calls), tool_hint=True)

            # The assistant message carrying tool_calls must precede the tool results.
            tool_call_dicts = [tc.to_dict() for tc in response.tool_calls]
            messages = self.context.add_assistant_message(
                messages,
                response.content,
                tool_call_dicts,
                reasoning_content=response.reasoning_content,
            )
            for tool_call in response.tool_calls:
                tools_used.append(tool_call.name)
                args = tool_call.arguments
                logger.info(f"Tool call: {tool_call.name}({args})")
                result = self._truncate_tool_result(
                    await self._execute_tool(tool_call.name, args)
                )
                messages = self.context.add_tool_result(
                    messages, tool_call.id, tool_call.name, result
                )
        else:
            # Budget exhausted without a final (non-tool) reply.
            logger.warning(f"Max iterations ({self.max_iterations}) reached")
            final_content = self._max_iterations_message()

        return final_content, tools_used, messages

    async def _run_loop_stream(
        self,
        initial_messages: list[dict],
        provider: LLMProvider | None = None,
        model: str | None = None,
    ) -> AsyncGenerator[str, None]:
        """Run the agent loop and yield the final reply.

        Tool calls are resolved iteratively (at most ``max_iterations`` LLM
        calls). The final reply is yielded as a single chunk, because the
        provider call used here returns the whole response at once.

        Args:
            initial_messages: Initial message list.
            provider: Optional LLM provider (defaults to ``self.provider``).
            model: Optional model name (defaults to ``self.model``).

        Yields:
            Response content chunks.
        """
        provider = provider or self.provider
        model = model or self.model
        messages = initial_messages
        tool_defs = self._select_tool_defs(messages, stream=True)

        for _ in range(self.max_iterations):
            response = await provider.chat_with_retry(
                messages=messages,
                tools=tool_defs or None,
                model=model,
            )

            if not response.has_tool_calls:
                content = self._strip_think(response.content)
                if response.finish_reason == "error":
                    logger.error(f"LLM error: {content}")
                    content = content or "Sorry, I encountered an error calling the AI model."
                if content:
                    yield content
                return

            # Record the assistant tool-call message before its tool results;
            # without it the next request has orphaned tool messages.
            tool_call_dicts = [tc.to_dict() for tc in response.tool_calls]
            messages = self.context.add_assistant_message(
                messages,
                response.content,
                tool_call_dicts,
                reasoning_content=response.reasoning_content,
            )
            for tool_call in response.tool_calls:
                logger.info(f"Tool call: {tool_call.name}")
                result = self._truncate_tool_result(
                    await self._execute_tool(tool_call.name, tool_call.arguments)
                )
                messages = self.context.add_tool_result(
                    messages, tool_call.id, tool_call.name, result
                )

        logger.warning(f"Max iterations ({self.max_iterations}) reached")
        yield self._max_iterations_message()

    async def _execute_tool(self, tool_name: str, args: dict) -> str:
        """Execute a tool through the registry.

        Args:
            tool_name: Name of the tool to execute.
            args: Tool arguments.

        Returns:
            The registry's result, or a JSON error string when no registry
            is configured.
        """
        if self.tools:
            return await self.tools.execute(tool_name, args)
        return json.dumps({"error": "No tools registered"})

    # ------------------------------------------------------------------
    # Formatting helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _strip_think(text: str | None) -> str | None:
        """Remove ``<think>`` reasoning blocks some models embed in content.

        Complete ``<think>...</think>`` spans are removed first, then any
        leftover lone opening or closing tag. Returns ``None`` when nothing
        but whitespace remains.
        """
        if not text:
            return None
        text = re.sub(r"<think>[\s\S]*?</think>", "", text)
        text = re.sub(r"<\/?think>", "", text)
        return text.strip() or None

    @staticmethod
    def _tool_hint(tool_calls: list) -> str:
        """Format tool calls as a concise hint such as ``search("query")``.

        Only the first argument value is shown, and only when it is a string;
        values longer than 40 characters are shortened with ``...``.
        """
        def _fmt(tc):
            args = tc.arguments or {}
            val = next(iter(args.values()), None) if isinstance(args, dict) else None
            if not isinstance(val, str):
                return tc.name
            return f'{tc.name}("{val[:40]}...")' if len(val) > 40 else f'{tc.name}("{val}")'

        return ", ".join(_fmt(tc) for tc in tool_calls)

    # ------------------------------------------------------------------
    # History persistence
    # ------------------------------------------------------------------

    @staticmethod
    def _merge_history_messages(messages: list[dict]) -> list[dict]:
        """Merge an assistant tool-call message with assistant messages that follow it.

        Stored history may split one assistant turn into several entries.
        For every assistant message that has ``tool_calls``, the run of
        assistant messages immediately after it is absorbed. The first
        non-empty ``content`` among them fills in the tool-call message's
        content if it has none. Only the tool_calls-first ordering is
        handled. Input dicts are copied, not mutated.

        Args:
            messages: List of message dictionaries.

        Returns:
            Merged list of messages.
        """
        if not messages:
            return messages
        merged: list[dict] = []
        total = len(messages)
        i = 0
        while i < total:
            current = messages[i].copy()
            i += 1
            if current.get("role") == "assistant" and current.get("tool_calls"):
                first_absorbed = i
                while i < total and messages[i].get("role") == "assistant":
                    follower = messages[i]
                    if follower.get("content") and not current.get("content"):
                        current["content"] = follower.get("content")
                    i += 1
                if i > first_absorbed:
                    logger.debug(f"Merged {i - first_absorbed + 1} assistant messages")
            # Keep every message, including the merged tool-call message itself.
            merged.append(current)
        return merged

    def _save_history(
        self,
        session_key: str,
        messages: list[dict],
        skip: int = 0,
    ) -> None:
        """Persist messages to memory under ``session_key``.

        Only ``user``, ``assistant`` and ``tool`` roles are stored; other
        roles are skipped.
        - User text is truncated to 1000 characters.
        - Assistant entries are stored as a JSON object with optional
          ``content`` (truncated to 1000 characters) and ``tool_calls`` keys.
        - Tool results are stored as ``[tool_result]`` followed by a JSON
          object.

        Args:
            session_key: Session identifier.
            messages: Messages to save.
            skip: Index of the first message to persist.
        """
        for m in messages[skip:]:
            role = m.get("role")
            content = m.get("content")
            if role == "user" and content:
                self.memory.add_to_history("user", str(content)[:1000], session_key)
            elif role == "assistant":
                # Content and tool_calls are stored together in one entry.
                msg_data = {}
                if content:
                    msg_data["content"] = str(content)[:1000]
                if m.get("tool_calls"):
                    msg_data["tool_calls"] = m.get("tool_calls", [])
                if msg_data:
                    self.memory.add_to_history("assistant", json.dumps(msg_data), session_key)
            elif role == "tool":
                # Tool results are kept so multi-turn tool conversations can be replayed.
                tool_result_str = json.dumps({
                    "tool_call_id": m.get("tool_call_id", ""),
                    "name": m.get("name", ""),
                    "content": m.get("content", ""),
                })
                self.memory.add_to_history("tool", f"[tool_result]{tool_result_str}", session_key)