chore: 更新配置和构建脚本
This commit is contained in:
@@ -1,12 +1,12 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from datetime import UTC, datetime
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.core.agent_enums import AgentPermissionLevel, AgentRunStatus
|
||||
from app.core.agent_enums import AgentName, AgentPermissionLevel, AgentRunStatus
|
||||
from app.core.logging import get_logger
|
||||
from app.models.agent_run import AgentRun, AgentToolCall, SemanticParseLog
|
||||
from app.repositories.agent_run import AgentRunRepository
|
||||
@@ -15,6 +15,8 @@ from app.services.agent_foundation import AgentFoundationService
|
||||
|
||||
logger = get_logger("app.services.agent_runs")
|
||||
|
||||
KNOWLEDGE_SYNC_HEARTBEAT_TIMEOUT = timedelta(minutes=30)
|
||||
|
||||
|
||||
class AgentRunService:
|
||||
def __init__(self, db: Session) -> None:
|
||||
@@ -30,11 +32,13 @@ class AgentRunService:
|
||||
limit: int = 20,
|
||||
) -> list[AgentRunRead]:
|
||||
self._ensure_ready()
|
||||
self._reconcile_stale_knowledge_index_runs()
|
||||
runs = self.repository.list(agent=agent, status=status, source=source, limit=limit)
|
||||
return [self._serialize_run(item) for item in runs]
|
||||
|
||||
def get_run(self, run_id: str) -> AgentRunRead | None:
|
||||
self._ensure_ready()
|
||||
self._reconcile_stale_knowledge_index_runs(target_run_id=run_id)
|
||||
run = self.repository.get_by_run_id(run_id)
|
||||
if run is None:
|
||||
return None
|
||||
@@ -174,6 +178,35 @@ class AgentRunService:
|
||||
logger.info("Recorded tool call run_id=%s tool=%s", run_id, tool_name)
|
||||
return AgentToolCallRead.model_validate(created)
|
||||
|
||||
def update_tool_call(
|
||||
self,
|
||||
tool_call_id: str,
|
||||
*,
|
||||
request_json: dict[str, Any] | None = None,
|
||||
response_json: dict[str, Any] | None = None,
|
||||
status: str | None = None,
|
||||
duration_ms: int | None = None,
|
||||
error_message: str | None = None,
|
||||
) -> AgentToolCallRead:
|
||||
self._ensure_ready()
|
||||
tool_call = self.repository.get_tool_call(tool_call_id)
|
||||
if tool_call is None:
|
||||
raise LookupError("Tool call not found")
|
||||
|
||||
if request_json is not None:
|
||||
tool_call.request_json = request_json
|
||||
if response_json is not None:
|
||||
tool_call.response_json = response_json
|
||||
if status is not None:
|
||||
tool_call.status = status
|
||||
if duration_ms is not None:
|
||||
tool_call.duration_ms = duration_ms
|
||||
tool_call.error_message = error_message
|
||||
|
||||
updated = self.repository.save_tool_call(tool_call)
|
||||
logger.info("Updated tool call id=%s status=%s", updated.id, updated.status)
|
||||
return AgentToolCallRead.model_validate(updated)
|
||||
|
||||
def record_semantic_parse(
|
||||
self,
|
||||
*,
|
||||
@@ -214,6 +247,73 @@ class AgentRunService:
|
||||
def _ensure_ready(self) -> None:
|
||||
AgentFoundationService(self.db).ensure_foundation_ready()
|
||||
|
||||
def _reconcile_stale_knowledge_index_runs(self, *, target_run_id: str | None = None) -> None:
|
||||
runs = self.repository.list(
|
||||
agent=AgentName.HERMES.value,
|
||||
status=AgentRunStatus.RUNNING.value,
|
||||
limit=200,
|
||||
)
|
||||
now = datetime.now(UTC)
|
||||
|
||||
for run in runs:
|
||||
if target_run_id is not None and run.run_id != target_run_id:
|
||||
continue
|
||||
|
||||
route_json = dict(run.route_json or {})
|
||||
if str(route_json.get("job_type") or "").strip() != "knowledge_index_sync":
|
||||
continue
|
||||
|
||||
heartbeat_at = self._parse_heartbeat_time(
|
||||
str(route_json.get("heartbeat_at") or "").strip()
|
||||
)
|
||||
last_seen_at = heartbeat_at or run.started_at
|
||||
if last_seen_at.tzinfo is None:
|
||||
last_seen_at = last_seen_at.replace(tzinfo=UTC)
|
||||
|
||||
if now - last_seen_at <= KNOWLEDGE_SYNC_HEARTBEAT_TIMEOUT:
|
||||
continue
|
||||
|
||||
stale_document_ids = [
|
||||
str(document_id).strip()
|
||||
for document_id in list(route_json.get("requested_document_ids") or [])
|
||||
if str(document_id).strip()
|
||||
]
|
||||
if stale_document_ids:
|
||||
from app.services.knowledge import (
|
||||
KNOWLEDGE_INGEST_STATUS_FAILED,
|
||||
KnowledgeService,
|
||||
)
|
||||
|
||||
KnowledgeService(db=self.db).set_document_ingest_statuses(
|
||||
stale_document_ids,
|
||||
KNOWLEDGE_INGEST_STATUS_FAILED,
|
||||
agent_run_id=run.run_id,
|
||||
)
|
||||
|
||||
route_json.update(
|
||||
{
|
||||
"phase": "stale_failed",
|
||||
"heartbeat_at": now.isoformat(),
|
||||
}
|
||||
)
|
||||
run.route_json = route_json
|
||||
run.status = AgentRunStatus.FAILED.value
|
||||
run.result_summary = "知识归纳任务长时间无心跳,系统已自动标记失败。"
|
||||
run.error_message = "Knowledge index heartbeat timed out."
|
||||
run.finished_at = now
|
||||
self.repository.save_run(run)
|
||||
logger.warning("Marked stale knowledge index run as failed run_id=%s", run.run_id)
|
||||
|
||||
@staticmethod
|
||||
def _parse_heartbeat_time(raw_value: str) -> datetime | None:
|
||||
normalized = str(raw_value or "").strip()
|
||||
if not normalized:
|
||||
return None
|
||||
try:
|
||||
return datetime.fromisoformat(normalized)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _serialize_run(run: AgentRun) -> AgentRunRead:
|
||||
semantic_parse = run.semantic_parse_logs[0] if run.semantic_parse_logs else None
|
||||
|
||||
Reference in New Issue
Block a user