Files
X-Financial/server/src/app/services/agent_runs.py

438 lines
16 KiB
Python
Raw Normal View History

2026-05-11 03:51:24 +00:00
from __future__ import annotations
import uuid
2026-05-18 02:53:06 +00:00
from datetime import UTC, datetime, timedelta
2026-05-11 03:51:24 +00:00
from typing import Any
from sqlalchemy.orm import Session
from app.core.config import get_settings
2026-05-18 02:53:06 +00:00
from app.core.agent_enums import AgentName, AgentPermissionLevel, AgentRunStatus
2026-05-11 03:51:24 +00:00
from app.core.logging import get_logger
from app.models.agent_run import AgentRun, AgentToolCall, SemanticParseLog
from app.repositories.agent_run import AgentRunRepository
from app.schemas.agent_run import (
AgentRunRead,
AgentRunStatsRead,
AgentToolCallRead,
SemanticParseRead,
)
2026-05-11 03:51:24 +00:00
from app.services.agent_foundation import AgentFoundationService
from app.services.knowledge_ingest_log import enrich_knowledge_ingest_route_json
2026-05-11 03:51:24 +00:00
# Module-level logger shared by every AgentRunService method in this file.
logger = get_logger("app.services.agent_runs")
2026-05-18 02:53:06 +00:00
# Maximum time allowed since a knowledge sync run's last heartbeat (or, when no
# heartbeat is recorded, its start time) before the run is marked as failed.
KNOWLEDGE_SYNC_HEARTBEAT_TIMEOUT = timedelta(minutes=30)
# route_json["job_type"] values that are subject to stale-heartbeat reconciliation.
KNOWLEDGE_SYNC_JOB_TYPES = {"knowledge_index_sync", "llm_wiki_sync"}
2026-05-18 02:53:06 +00:00
2026-05-11 03:51:24 +00:00
class AgentRunService:
def __init__(self, db: Session) -> None:
    """Bind the service to a database session.

    Args:
        db: Session stored on the instance and handed to the run repository.
    """
    self.db = db
    self.repository = AgentRunRepository(db)
def list_runs(
    self,
    *,
    agent: str | None = None,
    status: str | None = None,
    source: str | None = None,
    limit: int = 20,
) -> list[AgentRunRead]:
    """Return serialized agent runs matching the optional filters.

    Stale knowledge sync runs are reconciled first so the returned
    statuses reflect any run that was just marked as failed.

    Args:
        agent: Optional agent filter forwarded to the repository.
        status: Optional status filter forwarded to the repository.
        source: Optional source filter forwarded to the repository.
        limit: Maximum number of runs requested from the repository.

    Returns:
        One ``AgentRunRead`` per run returned by the repository.
    """
    self._ensure_ready()
    self._reconcile_stale_knowledge_index_runs()
    matching_runs = self.repository.list(
        agent=agent,
        status=status,
        source=source,
        limit=limit,
    )
    return list(map(self._serialize_run, matching_runs))
def get_run(self, run_id: str) -> AgentRunRead | None:
    """Fetch a single run by its ``run_id``.

    Only the requested run is considered for stale-heartbeat
    reconciliation before it is loaded. The serialized result has its
    ``route_json`` enriched with knowledge-ingest details.

    Args:
        run_id: Public identifier of the run.

    Returns:
        The serialized run, or ``None`` when no run has that identifier.
    """
    self._ensure_ready()
    self._reconcile_stale_knowledge_index_runs(target_run_id=run_id)
    found = self.repository.get_by_run_id(run_id)
    return (
        None
        if found is None
        else self._serialize_run(found, enrich_knowledge_ingest=True)
    )
2026-05-11 03:51:24 +00:00
def summarize_runs(
    self,
    *,
    agent: str | None = None,
    status: str | None = None,
    source: str | None = None,
    limit: int = 200,
) -> AgentRunStatsRead:
    """Aggregate counters over the runs matching the optional filters.

    The statistics cover only the runs returned by the repository for the
    given ``limit``; that window size is reported back as ``window_limit``.

    Args:
        agent: Optional agent filter forwarded to the repository.
        status: Optional status filter forwarded to the repository.
        source: Optional source filter forwarded to the repository.
        limit: Maximum number of runs included in the statistics window.

    Returns:
        Run, tool-call and LLM-call counters, per-agent / per-status
        breakdowns, and at most ten collected error entries.
    """
    self._ensure_ready()
    self._reconcile_stale_knowledge_index_runs()
    window = self.repository.list(agent=agent, status=status, source=source, limit=limit)

    def bump(counter: dict[str, int], key: str) -> None:
        # Increment a per-key tally, creating the key on first sight.
        counter[key] = counter.get(key, 0) + 1

    per_agent: dict[str, int] = {}
    per_status: dict[str, int] = {}
    per_tool_status: dict[str, int] = {}
    total_tool_calls = 0
    total_failed_tool_calls = 0
    total_llm_calls = 0
    total_failed_llm_calls = 0
    fallback_runs = 0
    guardrail_runs = 0
    collected_errors: list[dict[str, Any]] = []
    failed_run_status = AgentRunStatus.FAILED.value

    for run in window:
        bump(per_agent, run.agent)
        bump(per_status, run.status)

        ontology = run.ontology_json or {}
        if ontology.get("parse_strategy") == "rule_fallback":
            fallback_runs += 1
        invocation_summary = ontology.get("model_invocation_summary")
        if isinstance(invocation_summary, dict) and invocation_summary.get(
            "model_guardrail_reason"
        ):
            guardrail_runs += 1

        # Run-level error: only failed runs that carry a message are reported.
        if run.status == failed_run_status and run.error_message:
            collected_errors.append(
                {
                    "run_id": run.run_id,
                    "agent": run.agent,
                    "stage": (run.route_json or {}).get("stage"),
                    "message": run.error_message,
                }
            )

        for call in run.tool_calls:
            total_tool_calls += 1
            bump(per_tool_status, call.status)
            call_failed = call.status == "failed"
            call_is_llm = call.tool_type == "llm"
            if call_failed:
                total_failed_tool_calls += 1
            if call_is_llm:
                total_llm_calls += 1
                if call_failed:
                    total_failed_llm_calls += 1
            # Tool-level error: reported whenever a message exists,
            # regardless of the tool call's status.
            if call.error_message:
                collected_errors.append(
                    {
                        "run_id": run.run_id,
                        "agent": run.agent,
                        "tool_name": call.tool_name,
                        "tool_type": call.tool_type,
                        "message": call.error_message,
                    }
                )

    return AgentRunStatsRead(
        window_limit=limit,
        total_runs=len(window),
        succeeded_runs=per_status.get(AgentRunStatus.SUCCEEDED.value, 0),
        blocked_runs=per_status.get(AgentRunStatus.BLOCKED.value, 0),
        failed_runs=per_status.get(failed_run_status, 0),
        tool_call_count=total_tool_calls,
        failed_tool_call_count=total_failed_tool_calls,
        llm_call_count=total_llm_calls,
        failed_llm_call_count=total_failed_llm_calls,
        model_fallback_count=fallback_runs,
        model_guardrail_count=guardrail_runs,
        agents=per_agent,
        statuses=per_status,
        tool_statuses=per_tool_status,
        # Only the first ten collected entries are exposed.
        recent_errors=collected_errors[:10],
    )
2026-05-11 03:51:24 +00:00
def create_run(
    self,
    *,
    agent: str,
    source: str,
    user_id: str | None = None,
    task_id: str | None = None,
    ontology_json: dict[str, Any] | None = None,
    route_json: dict[str, Any] | None = None,
    permission_level: str = AgentPermissionLevel.READ.value,
    status: str = AgentRunStatus.RUNNING.value,
    result_summary: str | None = None,
    error_message: str | None = None,
    started_at: datetime | None = None,
    finished_at: datetime | None = None,
) -> AgentRunRead:
    """Persist a new agent run and return its serialized form.

    Args:
        agent: Name of the agent executing the run.
        source: Origin of the run.
        user_id: Optional identifier of the requesting user.
        task_id: Optional identifier of the related task.
        ontology_json: Optional ontology payload; stored as ``{}`` when falsy.
        route_json: Optional routing payload; stored as ``{}`` when falsy.
        permission_level: Permission level recorded for the run.
        status: Initial run status.
        result_summary: Optional result summary.
        error_message: Optional error message.
        started_at: Start time; defaults to the current UTC time when falsy.
        finished_at: Optional finish time.

    Returns:
        The created run as ``AgentRunRead``.
    """
    self._ensure_ready()
    # Public identifier: "run_" followed by 16 hex characters of a random UUID.
    new_run_id = f"run_{uuid.uuid4().hex[:16]}"
    effective_start = started_at or datetime.now(UTC)
    pending = AgentRun(
        run_id=new_run_id,
        agent=agent,
        source=source,
        user_id=user_id,
        task_id=task_id,
        ontology_json=ontology_json or {},
        route_json=route_json or {},
        permission_level=permission_level,
        status=status,
        result_summary=result_summary,
        error_message=error_message,
        started_at=effective_start,
        finished_at=finished_at,
    )
    persisted = self.repository.create_run(pending)
    logger.info("Created agent run id=%s run_id=%s", persisted.id, persisted.run_id)
    return self._serialize_run(persisted)
def update_run(
    self,
    run_id: str,
    *,
    agent: str | None = None,
    ontology_json: dict[str, Any] | None = None,
    route_json: dict[str, Any] | None = None,
    permission_level: str | None = None,
    status: str | None = None,
    result_summary: str | None = None,
    error_message: str | None = None,
    finished_at: datetime | None = None,
) -> AgentRunRead:
    """Apply a partial update to an existing run.

    Each keyword argument left as ``None`` keeps the stored value, so this
    method cannot be used to reset a field back to ``None``.

    Args:
        run_id: Public identifier of the run to update.
        agent: New agent name.
        ontology_json: Replacement ontology payload.
        route_json: Replacement routing payload (replaces, does not merge).
        permission_level: New permission level.
        status: New status.
        result_summary: New result summary.
        error_message: New error message.
        finished_at: New finish time.

    Returns:
        The updated run as ``AgentRunRead``.

    Raises:
        LookupError: If no run exists with ``run_id``.
    """
    self._ensure_ready()
    run = self.repository.get_by_run_id(run_id)
    if run is None:
        raise LookupError("Run not found")

    requested_changes: dict[str, Any] = {
        "agent": agent,
        "ontology_json": ontology_json,
        "route_json": route_json,
        "permission_level": permission_level,
        "status": status,
        "result_summary": result_summary,
        "error_message": error_message,
        "finished_at": finished_at,
    }
    for field_name, new_value in requested_changes.items():
        if new_value is not None:
            setattr(run, field_name, new_value)

    saved = self.repository.save_run(run)
    logger.info("Updated agent run run_id=%s status=%s", saved.run_id, saved.status)
    return self._serialize_run(saved)
def merge_route_json(
    self,
    run_id: str,
    route_patch: dict[str, Any],
    *,
    status: str | None = None,
    result_summary: str | None = None,
    error_message: str | None = None,
    finished_at: datetime | None = None,
) -> AgentRunRead:
    """Shallow-merge ``route_patch`` into a run's ``route_json``.

    Keys in ``route_patch`` overwrite existing top-level keys; other keys
    are kept. The optional keyword arguments are applied only when they
    are not ``None``.

    Args:
        run_id: Public identifier of the run to update.
        route_patch: Top-level keys to add or overwrite; a falsy value
            leaves the existing keys unchanged.
        status: New status.
        result_summary: New result summary.
        error_message: New error message.
        finished_at: New finish time.

    Returns:
        The updated run as ``AgentRunRead``.

    Raises:
        LookupError: If no run exists with ``run_id``.
    """
    self._ensure_ready()
    run = self.repository.get_by_run_id(run_id)
    if run is None:
        raise LookupError("Run not found")

    # Work on a copy and assign a new dict object back to the attribute
    # instead of mutating the stored one in place.
    merged_route = dict(run.route_json or {})
    merged_route.update(route_patch or {})
    run.route_json = merged_route

    optional_fields: dict[str, Any] = {
        "status": status,
        "result_summary": result_summary,
        "error_message": error_message,
        "finished_at": finished_at,
    }
    for field_name, new_value in optional_fields.items():
        if new_value is not None:
            setattr(run, field_name, new_value)

    saved = self.repository.save_run(run)
    logger.info(
        "Merged route_json for agent run run_id=%s status=%s", saved.run_id, saved.status
    )
    return self._serialize_run(saved)
2026-05-11 03:51:24 +00:00
def record_tool_call(
    self,
    *,
    run_id: str,
    tool_type: str,
    tool_name: str,
    request_json: dict[str, Any] | None = None,
    response_json: dict[str, Any] | None = None,
    status: str,
    duration_ms: int = 0,
    error_message: str | None = None,
) -> AgentToolCallRead:
    """Persist one tool invocation that belongs to a run.

    Args:
        run_id: Public identifier of the owning run.
        tool_type: Category of the tool.
        tool_name: Name of the invoked tool.
        request_json: Request payload; stored as ``{}`` when falsy.
        response_json: Response payload; stored as ``{}`` when falsy.
        status: Outcome status of the call.
        duration_ms: Call duration in milliseconds.
        error_message: Optional error message.

    Returns:
        The stored tool call as ``AgentToolCallRead``.
    """
    self._ensure_ready()
    call_fields: dict[str, Any] = {
        "run_id": run_id,
        "tool_type": tool_type,
        "tool_name": tool_name,
        "request_json": request_json or {},
        "response_json": response_json or {},
        "status": status,
        "duration_ms": duration_ms,
        "error_message": error_message,
    }
    stored = self.repository.create_tool_call(AgentToolCall(**call_fields))
    logger.info("Recorded tool call run_id=%s tool=%s", run_id, tool_name)
    return AgentToolCallRead.model_validate(stored)
2026-05-18 02:53:06 +00:00
def update_tool_call(
    self,
    tool_call_id: str,
    *,
    request_json: dict[str, Any] | None = None,
    response_json: dict[str, Any] | None = None,
    status: str | None = None,
    duration_ms: int | None = None,
    error_message: str | None = None,
) -> AgentToolCallRead:
    """Update a previously recorded tool call.

    ``request_json``, ``response_json``, ``status`` and ``duration_ms`` are
    written only when they are not ``None``. ``error_message`` is always
    written, so omitting it clears any stored message.

    Args:
        tool_call_id: Identifier of the tool call to update.
        request_json: Replacement request payload.
        response_json: Replacement response payload.
        status: New status.
        duration_ms: New duration in milliseconds.
        error_message: New error message, or ``None`` to clear it.

    Returns:
        The updated tool call as ``AgentToolCallRead``.

    Raises:
        LookupError: If no tool call exists with ``tool_call_id``.
    """
    self._ensure_ready()
    record = self.repository.get_tool_call(tool_call_id)
    if record is None:
        raise LookupError("Tool call not found")

    guarded_fields: dict[str, Any] = {
        "request_json": request_json,
        "response_json": response_json,
        "status": status,
        "duration_ms": duration_ms,
    }
    for field_name, new_value in guarded_fields.items():
        if new_value is not None:
            setattr(record, field_name, new_value)

    # NOTE(review): unlike update_run, the error message is assigned without
    # a None guard, which resets it when the caller omits it. Confirm this
    # is the intended way to clear errors on a later successful update.
    record.error_message = error_message

    saved = self.repository.save_tool_call(record)
    logger.info("Updated tool call id=%s status=%s", saved.id, saved.status)
    return AgentToolCallRead.model_validate(saved)
2026-05-11 03:51:24 +00:00
def record_semantic_parse(
    self,
    *,
    run_id: str,
    user_id: str | None,
    raw_query: str,
    scenario: str,
    intent: str,
    entities_json: list[Any] | None = None,
    time_range_json: dict[str, Any] | None = None,
    metrics_json: list[Any] | None = None,
    constraints_json: list[Any] | None = None,
    risk_flags_json: list[Any] | None = None,
    permission_json: dict[str, Any] | None = None,
    confidence: float = 0.0,
) -> SemanticParseRead:
    """Persist the semantic parse result produced for a run.

    Falsy list payloads are stored as ``[]`` and falsy dict payloads as
    ``{}``.

    Args:
        run_id: Public identifier of the owning run.
        user_id: Identifier of the requesting user, if any.
        raw_query: Original query text.
        scenario: Scenario assigned to the query.
        intent: Intent assigned to the query.
        entities_json: Extracted entities.
        time_range_json: Extracted time range.
        metrics_json: Extracted metrics.
        constraints_json: Extracted constraints.
        risk_flags_json: Risk flags raised for the query.
        permission_json: Permission details for the query.
        confidence: Confidence value recorded with the parse.

    Returns:
        The stored parse log as ``SemanticParseRead``.
    """
    self._ensure_ready()
    parse_fields: dict[str, Any] = {
        "run_id": run_id,
        "user_id": user_id,
        "raw_query": raw_query,
        "scenario": scenario,
        "intent": intent,
        "entities_json": entities_json or [],
        "time_range_json": time_range_json or {},
        "metrics_json": metrics_json or [],
        "constraints_json": constraints_json or [],
        "risk_flags_json": risk_flags_json or [],
        "permission_json": permission_json or {},
        "confidence": confidence,
    }
    stored = self.repository.create_semantic_parse(SemanticParseLog(**parse_fields))
    logger.info(
        "Recorded semantic parse run_id=%s scenario=%s intent=%s", run_id, scenario, intent
    )
    return SemanticParseRead.model_validate(stored)
def _ensure_ready(self) -> None:
    """Run the agent foundation readiness check for this service's session."""
    foundation = AgentFoundationService(self.db)
    foundation.ensure_foundation_ready()
2026-05-18 02:53:06 +00:00
def _reconcile_stale_knowledge_index_runs(self, *, target_run_id: str | None = None) -> None:
    """Mark running knowledge sync runs without a recent heartbeat as failed.

    Up to 200 running HERMES runs are inspected. A run is considered stale
    when its ``route_json["job_type"]`` is one of
    ``KNOWLEDGE_SYNC_JOB_TYPES`` and its last heartbeat (or its start time
    when no parseable heartbeat exists) is older than
    ``KNOWLEDGE_SYNC_HEARTBEAT_TIMEOUT``. Stale runs are saved with status
    FAILED, and any documents listed under ``requested_document_ids`` get
    their ingest status set to failed.

    Args:
        target_run_id: When given, only the run with this ``run_id`` is
            considered; other inspected runs are skipped.
    """
    candidates = self.repository.list(
        agent=AgentName.HERMES.value,
        status=AgentRunStatus.RUNNING.value,
        limit=200,
    )
    checked_at = datetime.now(UTC)

    for run in candidates:
        if target_run_id is not None and run.run_id != target_run_id:
            continue

        route = dict(run.route_json or {})
        job_type = str(route.get("job_type") or "").strip()
        if job_type not in KNOWLEDGE_SYNC_JOB_TYPES:
            continue

        raw_heartbeat = str(route.get("heartbeat_at") or "").strip()
        # Fall back to the start time when no usable heartbeat was recorded.
        # Assumes run.started_at is always populated - TODO confirm.
        last_seen = self._parse_heartbeat_time(raw_heartbeat) or run.started_at
        if last_seen.tzinfo is None:
            # Naive timestamps are interpreted as UTC so they can be
            # subtracted from the timezone-aware "now".
            last_seen = last_seen.replace(tzinfo=UTC)
        if checked_at - last_seen <= KNOWLEDGE_SYNC_HEARTBEAT_TIMEOUT:
            continue

        # Collect the non-blank document ids requested by the stale run.
        affected_document_ids = []
        for raw_id in route.get("requested_document_ids") or []:
            cleaned_id = str(raw_id).strip()
            if cleaned_id:
                affected_document_ids.append(cleaned_id)

        if affected_document_ids:
            # Imported at call time rather than at module import time.
            from app.services.knowledge import (
                KNOWLEDGE_INGEST_STATUS_FAILED,
                KnowledgeService,
            )

            knowledge_service = KnowledgeService(db=self.db)
            knowledge_service.set_document_ingest_statuses(
                affected_document_ids,
                KNOWLEDGE_INGEST_STATUS_FAILED,
                agent_run_id=run.run_id,
            )

        route["phase"] = "stale_failed"
        route["heartbeat_at"] = checked_at.isoformat()
        run.route_json = route
        run.status = AgentRunStatus.FAILED.value
        run.result_summary = "知识归纳任务长时间无心跳,系统已自动标记失败。"
        run.error_message = "Knowledge index heartbeat timed out."
        run.finished_at = checked_at
        self.repository.save_run(run)
        logger.warning("Marked stale knowledge index run as failed run_id=%s", run.run_id)
@staticmethod
def _parse_heartbeat_time(raw_value: str) -> datetime | None:
normalized = str(raw_value or "").strip()
if not normalized:
return None
try:
return datetime.fromisoformat(normalized)
except ValueError:
return None
def _serialize_run(
    self,
    run: AgentRun,
    *,
    enrich_knowledge_ingest: bool = False,
) -> AgentRunRead:
    """Convert an ``AgentRun`` row into its ``AgentRunRead`` schema.

    Args:
        run: The run to serialize, including its tool calls and semantic
            parse logs.
        enrich_knowledge_ingest: When true, a copy of ``route_json`` is
            passed through ``enrich_knowledge_ingest_route_json`` using the
            configured storage root; otherwise ``route_json`` is returned
            as stored.

    Returns:
        The serialized run. ``semantic_parse`` is built from the first
        semantic parse log, or is ``None`` when the run has none.
    """
    parse_logs = run.semantic_parse_logs
    first_parse = parse_logs[0] if parse_logs else None

    if enrich_knowledge_ingest:
        # Enrich a copy so the stored route_json on the run is left untouched.
        route_copy = dict(run.route_json or {})
        route_payload = enrich_knowledge_ingest_route_json(
            route_copy,
            storage_root=get_settings().resolved_storage_root_dir,
        )
    else:
        route_payload = run.route_json

    tool_call_reads = [AgentToolCallRead.model_validate(call) for call in run.tool_calls]
    parse_read = SemanticParseRead.model_validate(first_parse) if first_parse else None

    return AgentRunRead(
        id=run.id,
        run_id=run.run_id,
        agent=run.agent,
        source=run.source,
        user_id=run.user_id,
        task_id=run.task_id,
        ontology_json=run.ontology_json,
        route_json=route_payload,
        permission_level=run.permission_level,
        status=run.status,
        result_summary=run.result_summary,
        error_message=run.error_message,
        started_at=run.started_at,
        finished_at=run.finished_at,
        tool_calls=tool_call_reads,
        semantic_parse=parse_read,
    )