Files
X-Financial/server/src/app/services/orchestrator.py
caoxiaozhu edb484e2f6 refactor: update orchestrator service and travel form view
- services/orchestrator.py: update orchestrator service
- views/TravelReimbursementCreateView.vue: update travel form view
- views/scripts/TravelReimbursementCreateView.js: update travel form script
2026-05-13 15:40:41 +00:00

1450 lines
58 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from time import perf_counter
from typing import Any
from sqlalchemy import and_, func, or_, select
from sqlalchemy.orm import Session
from app.core.agent_enums import (
AgentAssetStatus,
AgentAssetType,
AgentName,
AgentPermissionLevel,
AgentRunSource,
AgentRunStatus,
AgentToolType,
)
from app.core.logging import get_logger
from app.models.employee import Employee
from app.models.financial_record import (
AccountsPayableRecord,
AccountsReceivableRecord,
ExpenseClaim,
)
from app.schemas.agent_asset import AgentAssetListItem, AgentAssetRead
from app.schemas.ontology import OntologyParseRequest, OntologyParseResult
from app.schemas.orchestrator import (
OrchestratorRequest,
OrchestratorResponse,
OrchestratorTraceSummary,
)
from app.schemas.user_agent import UserAgentRequest, UserAgentResponse
from app.services.agent_assets import AgentAssetService
from app.services.agent_conversations import AgentConversationService
from app.services.expense_claims import ExpenseClaimService
from app.services.agent_foundation import AgentFoundationService
from app.services.agent_runs import AgentRunService
from app.services.ontology import SemanticOntologyService
from app.services.user_agent import UserAgentService
# Module logger for orchestrator runs (used to log failed runs in OrchestratorService.run).
logger = get_logger("app.services.orchestrator")
# Maps an ontology scenario to the asset "domain" filter used when selecting
# capabilities. OrchestratorService._select_capabilities special-cases two
# values: rules drop the filter for "knowledge" and "system", skills drop it
# only for "system". Unknown scenarios map to None (no domain filter).
SCENARIO_TO_DOMAIN = {
    "expense": "expense",
    "accounts_receivable": "ar",
    "accounts_payable": "ap",
    "knowledge": "knowledge",
    "unknown": "system",
}
@dataclass(slots=True)
class ExecutionOutcome:
    """Result of executing one routed request, before it is persisted on the run.

    Produced by the execution helpers of ``OrchestratorService`` (and by the
    permission/clarification branches of ``OrchestratorService.run``) and then
    used by ``run`` to update the agent run and build the response.
    """

    # An AgentRunStatus value, e.g. SUCCEEDED or BLOCKED.
    status: str
    # Payload returned to the caller; ``run`` reads its "message" and
    # "draft_payload" keys.
    result: dict[str, Any]
    # True when at least one tool call used its fallback (degraded) response.
    degraded: bool
    # Number of tool calls made while producing this outcome.
    tool_count: int
    # Number of those tool calls that failed and fell back.
    failed_tool_count: int
# Role codes treated as privileged for expense queries.
# NOTE(review): not referenced in the visible code; presumably consumed by the
# expense query-scope logic to allow queries beyond the requester's own
# claims — confirm.
PRIVILEGED_EXPENSE_QUERY_ROLE_CODES = {"finance"}
# Chinese phrases that refer to the requester themself: "my", "myself",
# "oneself", "under my name", "look it up for me", "I submitted",
# "I applied for".
# NOTE(review): not referenced in the visible code; presumably used to scope
# a query to the current user — confirm.
SELF_REFERENCE_KEYWORDS = ("我的", "我自己", "本人", "我名下", "给我查", "我提交", "我申请")
# Look-back length in days for expense queries that give neither a claim
# number nor a time range; the window ends at the next UTC midnight.
EXPENSE_QUERY_RECENT_WINDOW_DAYS = 10
# Maximum number of claims returned in an expense query preview list.
EXPENSE_QUERY_PREVIEW_LIMIT = 20
# Display labels per claim status: draft, submitted, under review, approved,
# rejected, paid.
EXPENSE_STATUS_LABELS = {
    "draft": "草稿",
    "submitted": "已提交",
    "review": "审核中",
    "approved": "已通过",
    "rejected": "已驳回",
    "paid": "已付款",
}
# Display labels per status group: draft, in approval, approval completed,
# other statuses.
EXPENSE_STATUS_GROUP_LABELS = {
    "draft": "草稿",
    "in_progress": "审批中",
    "completed": "审批完成",
    "other": "其他状态",
}
# Order in which (non-empty) status groups are reported.
EXPENSE_STATUS_GROUP_ORDER = ("draft", "in_progress", "completed", "other")
# Display labels per expense type: travel, accommodation, transport, meals,
# meetings/conferences, business entertainment, office, training,
# communication, staff welfare, other.
EXPENSE_TYPE_LABELS = {
    "travel": "差旅费",
    "hotel": "住宿费",
    "transport": "交通费",
    "meal": "餐费",
    "meeting": "会务费",
    "entertainment": "业务招待费",
    "office": "办公费",
    "training": "培训费",
    "communication": "通讯费",
    "welfare": "福利费",
    "other": "其他费用",
}
class OrchestratorService:
def __init__(self, db: Session) -> None:
    """Create the orchestrator and its collaborating services.

    Every collaborator is constructed with the same SQLAlchemy ``Session`` so
    that all reads and writes made during one orchestrator run share it.

    Args:
        db: Database session shared by this service and its collaborators.
    """
    self.db = db
    self.asset_service = AgentAssetService(db)
    self.conversation_service = AgentConversationService(db)
    self.expense_claim_service = ExpenseClaimService(db)
    self.run_service = AgentRunService(db)
    self.ontology_service = SemanticOntologyService(db)
    self.user_agent_service = UserAgentService(db)
def run(self, payload: OrchestratorRequest) -> OrchestratorResponse:
    """Route one orchestrator request to an agent, execute it, and persist the outcome.

    Steps:
    1. Call ``AgentFoundationService.ensure_foundation_ready()``. For user-message
       requests, load or create the conversation and merge its stored context into
       ``context_json``.
    2. Create the agent run (status RUNNING) before any parsing, so that later
       failures can be recorded against a concrete ``run_id``.
    3. Resolve the message text, parse it into an ontology, and choose an agent
       plus capabilities.
    4. Execute exactly one of these paths: permission block, clarification,
       Hermes, or User Agent.
    5. Update the run record. For conversations, also persist state and the
       assistant reply.

    Exceptions raised inside the main ``try`` block are handled as follows:
    - they are logged;
    - the run is recorded as FAILED;
    - the failure is mirrored into the conversation, if there is one;
    - a ``status="failed"`` response is returned instead of raising.

    Exceptions raised before the run exists propagate to the caller. These come
    from the foundation check, the conversation lookup, or ``create_run``.

    Args:
        payload: The inbound orchestrator request.

    Returns:
        An ``OrchestratorResponse`` describing routing, status and result.
    """
    AgentFoundationService(self.db).ensure_foundation_ready()
    context_json = dict(payload.context_json or {})
    conversation_id = str(payload.conversation_id or "").strip() or None
    conversation = None
    # Only user-message runs are bound to a persisted conversation.
    if payload.source == AgentRunSource.USER_MESSAGE.value:
        conversation = self.conversation_service.get_or_create_conversation(
            conversation_id=conversation_id,
            user_id=payload.user_id,
            source=payload.source,
            context_json=context_json,
        )
        conversation_id = conversation.conversation_id
        context_json = self.conversation_service.hydrate_context_json(
            conversation=conversation,
            context_json=context_json,
        )
    route_json: dict[str, Any] = {
        "orchestrated_by": AgentName.ORCHESTRATOR.value,
        "stage": "created",
    }
    if conversation_id:
        route_json["conversation_id"] = conversation_id
    run = self.run_service.create_run(
        agent=AgentName.ORCHESTRATOR.value,
        source=payload.source,
        user_id=payload.user_id,
        task_id=payload.task_id,
        ontology_json={},
        route_json=route_json,
        permission_level=AgentPermissionLevel.READ.value,
        status=AgentRunStatus.RUNNING.value,
        result_summary="Orchestrator 已接收请求。",
    )
    try:
        message, task_asset = self._resolve_message(payload)
        if conversation is not None:
            self.conversation_service.append_message(
                conversation_id=conversation.conversation_id,
                role="user",
                content=message,
                run_id=run.run_id,
                message_json={
                    "attachment_names": context_json.get("attachment_names", []),
                    "attachment_count": context_json.get("attachment_count", 0),
                    "ocr_summary": context_json.get("ocr_summary", ""),
                },
            )
        ontology = self.ontology_service.parse_for_run(
            OntologyParseRequest(
                query=message,
                user_id=payload.user_id,
                context_json=context_json,
            ),
            run_id=run.run_id,
        )
        # Test hook: lets callers force the failure path below via context_json.
        if context_json.get("simulate_orchestrator_exception"):
            raise RuntimeError("simulated orchestrator exception")
        selected_agent, route_reason = self._select_agent(payload, ontology)
        capabilities = self._select_capabilities(
            payload=payload,
            ontology=ontology,
            task_asset=task_asset,
        )
        selected_capability_codes = self._flatten_capability_codes(capabilities)
        requires_confirmation = (
            ontology.permission.level == AgentPermissionLevel.APPROVAL_REQUIRED.value
        )
        route_json = {
            "orchestrated_by": AgentName.ORCHESTRATOR.value,
            "stage": "routed",
            "selected_agent": selected_agent,
            "route_reason": route_reason,
            "selected_capability_codes": selected_capability_codes,
            "ontology_run_id": ontology.run_id,
        }
        # route_json was rebuilt above. The final update_run and the failure-path
        # update_run both persist this dict, so carry the conversation link forward.
        if conversation_id:
            route_json["conversation_id"] = conversation_id
        if ontology.permission.level == AgentPermissionLevel.FORBIDDEN.value:
            outcome = ExecutionOutcome(
                status=AgentRunStatus.BLOCKED.value,
                result={
                    "message": ontology.permission.reason,
                    "clarification_question": ontology.clarification_question,
                    "degraded": False,
                },
                degraded=False,
                tool_count=0,
                failed_tool_count=0,
            )
            selected_agent = None
            route_reason = "permission_forbidden"
            route_json["stage"] = "blocked"
            route_json["route_reason"] = route_reason
        elif ontology.clarification_required:
            # For expense requests, the User Agent phrases the clarification
            # question itself. Every other case returns the ontology's question
            # directly.
            if selected_agent == AgentName.USER_AGENT.value and ontology.scenario == "expense":
                clarification_response = self.user_agent_service.respond(
                    UserAgentRequest(
                        run_id=run.run_id,
                        user_id=payload.user_id,
                        message=payload.message or "",
                        ontology=ontology,
                        context_json=context_json,
                        tool_payload={"clarification_required": True},
                        selected_capability_codes=selected_capability_codes,
                        degraded=False,
                        requires_confirmation=requires_confirmation,
                    )
                )
                clarification_result = self._build_user_agent_result(
                    clarification_response,
                    degraded=False,
                )
                clarification_result.update(
                    {
                        "clarification_required": True,
                        "missing_slots": ontology.missing_slots,
                        "ambiguity": ontology.ambiguity,
                        "parse_strategy": ontology.parse_strategy,
                    }
                )
                outcome = ExecutionOutcome(
                    status=AgentRunStatus.BLOCKED.value,
                    result=clarification_result,
                    degraded=False,
                    tool_count=0,
                    failed_tool_count=0,
                )
            else:
                outcome = ExecutionOutcome(
                    status=AgentRunStatus.BLOCKED.value,
                    result={
                        "message": ontology.clarification_question or "需要补充更多上下文。",
                        "clarification_required": True,
                        "missing_slots": ontology.missing_slots,
                        "ambiguity": ontology.ambiguity,
                        "parse_strategy": ontology.parse_strategy,
                        "degraded": False,
                    },
                    degraded=False,
                    tool_count=0,
                    failed_tool_count=0,
                )
            route_reason = "clarification_required"
            route_json["stage"] = "clarification"
            route_json["route_reason"] = route_reason
        elif selected_agent == AgentName.HERMES.value:
            outcome = self._execute_hermes(
                payload=payload,
                run_id=run.run_id,
                ontology=ontology,
                capabilities=capabilities,
                requires_confirmation=requires_confirmation,
                task_asset=task_asset,
                context_json=context_json,
            )
        else:
            outcome = self._execute_user_agent(
                payload=payload,
                run_id=run.run_id,
                ontology=ontology,
                capabilities=capabilities,
                requires_confirmation=requires_confirmation,
                context_json=context_json,
            )
        # Defensive downgrade: a SUCCEEDED outcome for an approval-required
        # request is reported as BLOCKED. The execution helpers already return
        # BLOCKED when confirmation is required.
        final_status = (
            AgentRunStatus.BLOCKED.value
            if requires_confirmation
            and outcome.status == AgentRunStatus.SUCCEEDED.value
            and ontology.permission.level == AgentPermissionLevel.APPROVAL_REQUIRED.value
            else outcome.status
        )
        response_status = self._normalize_response_status(final_status)
        result_message = (
            str(outcome.result.get("message", "")).strip()
            or "Orchestrator 执行完成。"
        )
        trace_summary = OrchestratorTraceSummary(
            scenario=ontology.scenario,
            intent=ontology.intent,
            tool_count=outcome.tool_count,
            failed_tool_count=outcome.failed_tool_count,
            selected_capability_codes=selected_capability_codes,
            degraded=outcome.degraded,
        )
        self.run_service.update_run(
            run.run_id,
            agent=selected_agent or AgentName.ORCHESTRATOR.value,
            ontology_json=self._build_ontology_json(ontology),
            route_json={
                **route_json,
                "requires_confirmation": requires_confirmation,
                "degraded": outcome.degraded,
            },
            permission_level=ontology.permission.level,
            status=final_status,
            result_summary=result_message,
            error_message=None,
            finished_at=datetime.now(UTC),
        )
        if conversation is not None and conversation_id:
            draft_payload = outcome.result.get("draft_payload")
            self.conversation_service.update_state(
                conversation_id=conversation_id,
                run_id=run.run_id,
                scenario=ontology.scenario,
                intent=ontology.intent,
                context_json=context_json,
                draft_payload=draft_payload if isinstance(draft_payload, dict) else None,
            )
            self.conversation_service.append_message(
                conversation_id=conversation_id,
                role="assistant",
                content=result_message,
                run_id=run.run_id,
                message_json={
                    "status": final_status,
                    "scenario": ontology.scenario,
                    "intent": ontology.intent,
                    "attachment_names": context_json.get("attachment_names", []),
                    "attachment_count": context_json.get("attachment_count", 0),
                    "draft_payload": draft_payload if isinstance(draft_payload, dict) else None,
                    "orchestrator_payload": {
                        "run_id": run.run_id,
                        "conversation_id": conversation_id,
                        "selected_agent": selected_agent,
                        "route_reason": route_reason,
                        "permission_level": ontology.permission.level,
                        "status": response_status,
                        "requires_confirmation": requires_confirmation,
                        "trace_summary": trace_summary.model_dump(),
                        "result": outcome.result,
                    },
                },
            )
        return OrchestratorResponse(
            run_id=run.run_id,
            conversation_id=conversation_id,
            selected_agent=selected_agent,
            route_reason=route_reason,
            permission_level=ontology.permission.level,
            status=response_status,
            result=outcome.result,
            requires_confirmation=requires_confirmation,
            trace_summary=trace_summary,
        )
    except Exception as exc:
        # Top-level boundary: record the failure on the run and the conversation,
        # then return a failed response instead of raising.
        logger.exception("Orchestrator run failed run_id=%s", run.run_id)
        self.run_service.update_run(
            run.run_id,
            agent=AgentName.ORCHESTRATOR.value,
            route_json={**route_json, "stage": "failed"},
            status=AgentRunStatus.FAILED.value,
            result_summary="Orchestrator 执行失败。",
            error_message=str(exc),
            finished_at=datetime.now(UTC),
        )
        if conversation is not None and conversation_id:
            self.conversation_service.update_state(
                conversation_id=conversation_id,
                run_id=run.run_id,
                scenario=None,
                intent=None,
                context_json=context_json,
                draft_payload=None,
            )
            self.conversation_service.append_message(
                conversation_id=conversation_id,
                role="assistant",
                content=f"Orchestrator 执行失败:{exc}",
                run_id=run.run_id,
                message_json={"status": AgentRunStatus.FAILED.value},
            )
        return OrchestratorResponse(
            run_id=run.run_id,
            conversation_id=conversation_id,
            selected_agent=None,
            route_reason="orchestrator_exception",
            permission_level=AgentPermissionLevel.READ.value,
            status="failed",
            result={"message": f"Orchestrator 执行失败:{exc}"},
            requires_confirmation=False,
            trace_summary=OrchestratorTraceSummary(
                scenario="unknown",
                intent="query",
                tool_count=0,
                failed_tool_count=0,
                selected_capability_codes=[],
                degraded=False,
            ),
        )
def _resolve_message(
self,
payload: OrchestratorRequest,
) -> tuple[str, AgentAssetRead | None]:
task_asset = None
if payload.task_id:
task_asset = self.asset_service.get_asset(payload.task_id)
if payload.message and payload.message.strip():
return payload.message.strip(), task_asset
if task_asset is not None:
description = str(task_asset.description or "").strip()
scenario_text = " ".join(str(item) for item in task_asset.scenario_json)
message = f"{task_asset.name} {description} {scenario_text}".strip()
return message, task_asset
if payload.source == AgentRunSource.SCHEDULE.value:
return "定时风险巡检任务", task_asset
raise ValueError("message 或 task_id 至少需要提供一个。")
@staticmethod
def _select_agent(
    payload: OrchestratorRequest,
    ontology: OntologyParseResult,
) -> tuple[str, str]:
    """Choose which agent handles the request and explain why.

    Routing rules, in order:
    * any ``schedule`` source goes to Hermes;
    * a ``system_event`` with intent ``risk_check`` goes to Hermes;
    * everything else goes to the User Agent. The reason names the intent
      when it is one of query/explain/draft/compare/operate.

    A scheduled ``risk_check`` is already covered by the first rule. A separate
    branch for it was unreachable and has been removed.

    Returns:
        ``(agent_name, route_reason)``.
    """
    if payload.source == AgentRunSource.SCHEDULE.value:
        return AgentName.HERMES.value, "schedule_source_defaults_to_hermes"
    if payload.source == AgentRunSource.SYSTEM_EVENT.value and ontology.intent == "risk_check":
        return AgentName.HERMES.value, "system_event_risk_check_routes_to_hermes"
    if ontology.intent in {"query", "explain", "draft", "compare", "operate"}:
        return AgentName.USER_AGENT.value, f"{ontology.intent}_routes_to_user_agent"
    return AgentName.USER_AGENT.value, "user_message_defaults_to_user_agent"
def _select_capabilities(
    self,
    *,
    payload: OrchestratorRequest,
    ontology: OntologyParseResult,
    task_asset: AgentAssetRead | None,
) -> dict[str, list[AgentAssetListItem | AgentAssetRead]]:
    """Collect the active rules, skills, MCPs and tasks relevant to this request.

    All asset lists are restricted to ACTIVE status and ordered by
    ``_rank_assets``.

    Domain filtering (from ``SCENARIO_TO_DOMAIN``):
    * rules are filtered by domain except for "knowledge" and "system";
    * skills are filtered except for "system";
    * MCPs and tasks are never filtered by domain.

    Task selection:
    * an explicit active ``task_asset`` wins;
    * otherwise scheduled runs get the ranked active task list;
    * any other run gets no tasks.
    """
    active_status = AgentAssetStatus.ACTIVE.value
    domain = SCENARIO_TO_DOMAIN.get(ontology.scenario)

    def fetch_ranked(asset_type: str, **filters: Any) -> list[AgentAssetListItem]:
        # ``domain`` is only forwarded when a caller passes it explicitly.
        listed = self.asset_service.list_assets(
            asset_type=asset_type,
            status=active_status,
            **filters,
        )
        return self._rank_assets(listed, ontology)

    rules = fetch_ranked(
        AgentAssetType.RULE.value,
        domain=None if domain in {"knowledge", "system"} else domain,
    )
    skills = fetch_ranked(
        AgentAssetType.SKILL.value,
        domain=None if domain == "system" else domain,
    )
    mcps = fetch_ranked(AgentAssetType.MCP.value)
    tasks: list[AgentAssetListItem | AgentAssetRead]
    if task_asset is not None and task_asset.status == active_status:
        tasks = [task_asset]
    elif payload.source == AgentRunSource.SCHEDULE.value:
        tasks = fetch_ranked(AgentAssetType.TASK.value)
    else:
        tasks = []
    return {"rules": rules, "skills": skills, "mcps": mcps, "tasks": tasks}
def _execute_user_agent(
    self,
    *,
    payload: OrchestratorRequest,
    run_id: str,
    ontology: OntologyParseResult,
    capabilities: dict[str, list[AgentAssetListItem | AgentAssetRead]],
    requires_confirmation: bool,
    context_json: dict[str, Any],
) -> ExecutionOutcome:
    """Execute a User Agent request with exactly one tool call.

    When ``requires_confirmation`` is set, only a confirmation summary is
    produced (recorded as a placeholder LLM tool call) and the outcome is
    BLOCKED.

    Otherwise ``_resolve_next_step`` picks the tool:
    * ``query_database``: ``_build_database_answer``;
    * ``search_knowledge``: ``_build_knowledge_answer``;
    * ``run_rule``: ``_build_rule_answer``;
    * any other step: draft creation. The ``expense`` scenario persists an
      expense-claim draft; other scenarios produce a placeholder draft.

    The tool payload is then handed to the User Agent for the final reply.

    Returns:
        An ``ExecutionOutcome`` with ``tool_count=1``. ``failed_tool_count`` is 1
        when the tool fell back to its degraded response.
    """
    selected_capability_codes = self._flatten_capability_codes(capabilities)
    if requires_confirmation:
        response, degraded = self._invoke_tool(
            run_id=run_id,
            tool_type=AgentToolType.LLM.value,
            tool_name="user_agent.confirmation_placeholder",
            request_json={
                "message": payload.message,
                "permission_level": ontology.permission.level,
            },
            context_json=context_json,
            executor=lambda: {
                "confirmation_title": "操作需要确认",
                "message": f"{ontology.permission.reason} 当前仅返回确认摘要,不直接执行动作。",
            },
            fallback_factory=lambda exc: {
                "confirmation_title": "操作需要确认",
                "message": f"确认摘要生成失败,已阻断自动执行:{exc}",
            },
        )
        return ExecutionOutcome(
            status=AgentRunStatus.BLOCKED.value,
            result={**response, "degraded": degraded},
            degraded=degraded,
            tool_count=1,
            failed_tool_count=1 if degraded else 0,
        )

    def run_single_tool(
        *,
        tool_type: str,
        tool_name: str,
        executor,
        fallback_factory,
    ) -> ExecutionOutcome:
        # Shared path for every non-confirmation step:
        # invoke one tool, pass its payload to the User Agent, wrap the reply.
        tool_payload, degraded = self._invoke_tool(
            run_id=run_id,
            tool_type=tool_type,
            tool_name=tool_name,
            request_json=self._build_ontology_json(ontology),
            context_json=context_json,
            executor=executor,
            fallback_factory=fallback_factory,
        )
        reply = self.user_agent_service.respond(
            UserAgentRequest(
                run_id=run_id,
                user_id=payload.user_id,
                message=payload.message or "",
                ontology=ontology,
                context_json=context_json,
                tool_payload=tool_payload,
                selected_capability_codes=selected_capability_codes,
                degraded=degraded,
                requires_confirmation=requires_confirmation,
            )
        )
        return ExecutionOutcome(
            status=AgentRunStatus.SUCCEEDED.value,
            result=self._build_user_agent_result(reply, degraded=degraded),
            degraded=degraded,
            tool_count=1,
            failed_tool_count=1 if degraded else 0,
        )

    next_step = self._resolve_next_step(ontology, payload.source)
    if next_step == "query_database":
        return run_single_tool(
            tool_type=AgentToolType.DATABASE.value,
            tool_name=self._database_tool_name(ontology.scenario),
            executor=lambda: self._build_database_answer(
                ontology,
                user_id=payload.user_id,
                context_json=context_json,
                message=payload.message or "",
            ),
            fallback_factory=lambda exc: {
                "message": f"数据库查询暂时不可用,已返回降级说明:{exc}",
                "degraded": True,
            },
        )
    if next_step == "search_knowledge":
        return run_single_tool(
            tool_type=AgentToolType.DATABASE.value,
            tool_name="knowledge.search",
            executor=lambda: self._build_knowledge_answer(ontology, capabilities),
            fallback_factory=lambda exc: {
                "message": f"知识检索暂时不可用,建议稍后重试:{exc}",
                "degraded": True,
            },
        )
    if next_step == "run_rule":
        return run_single_tool(
            tool_type=AgentToolType.RULE_ENGINE.value,
            tool_name=self._rule_tool_name(capabilities),
            executor=lambda: self._build_rule_answer(ontology),
            fallback_factory=lambda exc: {
                "message": f"规则检查暂时不可用,已返回人工复核建议:{exc}",
                "degraded": True,
            },
        )
    # Every remaining step (create_draft and anything unrecognised) produces a draft.
    if ontology.scenario == "expense":
        return run_single_tool(
            tool_type=AgentToolType.DATABASE.value,
            tool_name="database.expense_claims.upsert_draft",
            executor=lambda: self.expense_claim_service.upsert_draft_from_ontology(
                run_id=run_id,
                user_id=payload.user_id,
                message=payload.message or "",
                ontology=ontology,
                context_json=context_json,
            ),
            fallback_factory=lambda exc: {
                "message": f"报销草稿落库失败,请稍后再试:{exc}",
                "degraded": True,
            },
        )
    return run_single_tool(
        tool_type=AgentToolType.LLM.value,
        tool_name="user_agent.draft_placeholder",
        executor=lambda: {
            "message": (
                f"已生成 {ontology.scenario} 场景草稿,"
                "占位能力后续由 Day 5 User Agent 接管。"
            ),
            "draft_only": True,
        },
        fallback_factory=lambda exc: {
            "message": f"草稿生成暂时不可用,请稍后再试:{exc}",
            "degraded": True,
        },
    )
def _execute_hermes(
    self,
    *,
    payload: OrchestratorRequest,
    run_id: str,
    ontology: OntologyParseResult,
    capabilities: dict[str, list[AgentAssetListItem | AgentAssetRead]],
    requires_confirmation: bool,
    task_asset: AgentAssetRead | None,
    context_json: dict[str, Any],
) -> ExecutionOutcome:
    """Run the Hermes pipeline: a rule-engine check followed by one MCP call.

    Requests that require confirmation are refused (BLOCKED, no tool calls).
    Otherwise both tools always run. Each falls back to a degraded response on
    failure, and the outcome is SUCCEEDED with ``tool_count=2``.
    ``payload`` is accepted for signature parity but not read here.
    """
    if requires_confirmation:
        return ExecutionOutcome(
            status=AgentRunStatus.BLOCKED.value,
            result={
                "message": "Hermes 不会自动执行需要确认的高风险动作,已阻断。",
                "degraded": False,
            },
            degraded=False,
            tool_count=0,
            failed_tool_count=0,
        )
    rule_payload, rule_failed = self._invoke_tool(
        run_id=run_id,
        tool_type=AgentToolType.RULE_ENGINE.value,
        tool_name=self._rule_tool_name(capabilities),
        request_json=self._build_ontology_json(ontology),
        context_json=context_json,
        executor=lambda: self._build_rule_answer(ontology),
        fallback_factory=lambda exc: {
            "message": f"规则巡检失败,已降级为待人工复核:{exc}",
            "degraded": True,
        },
    )
    has_task = task_asset is not None
    mcp_payload, mcp_failed = self._invoke_tool(
        run_id=run_id,
        tool_type=AgentToolType.MCP.value,
        tool_name=self._mcp_tool_name(capabilities),
        request_json={
            "task_code": task_asset.code if has_task else "",
            "scenario": ontology.scenario,
        },
        context_json=context_json,
        executor=lambda: self._build_mcp_answer(task_asset, ontology),
        fallback_factory=lambda exc: {
            "message": f"MCP 调用失败,已使用缓存快照降级:{exc}",
            "fallback": "used_cached_snapshot",
        },
    )
    any_failed = rule_failed or mcp_failed
    failures = sum((int(rule_failed), int(mcp_failed)))
    summary = self._build_hermes_message(
        task_asset=task_asset,
        ontology=ontology,
        rule_response=rule_payload,
        mcp_response=mcp_payload,
        degraded=any_failed,
    )
    report = {
        "message": summary,
        "report_type": task_asset.code if has_task else "hermes_runtime",
        "degraded": any_failed,
    }
    return ExecutionOutcome(
        status=AgentRunStatus.SUCCEEDED.value,
        result=report,
        degraded=any_failed,
        tool_count=2,
        failed_tool_count=failures,
    )
@staticmethod
def _resolve_next_step(ontology: OntologyParseResult, source: str) -> str:
if ontology.clarification_required:
return "ask_clarification"
if ontology.intent == "draft":
return "create_draft"
if ontology.scenario == "knowledge" or ontology.intent == "explain":
return "search_knowledge"
if ontology.intent == "risk_check" or source == AgentRunSource.SCHEDULE.value:
return "run_rule"
if ontology.intent in {"query", "compare"}:
return "query_database"
return "create_draft"
@staticmethod
def _flatten_capability_codes(
capabilities: dict[str, list[AgentAssetListItem | AgentAssetRead]],
) -> list[str]:
codes: list[str] = []
for items in capabilities.values():
for item in items[:2]:
if item.code not in codes:
codes.append(item.code)
return codes
def _rank_assets(
self,
items: list[AgentAssetListItem],
ontology: OntologyParseResult,
) -> list[AgentAssetListItem]:
def score(item: AgentAssetListItem) -> tuple[int, str]:
item_tags = {str(value) for value in item.scenario_json or []}
weight = 0
if ontology.scenario in item_tags:
weight += 3
if ontology.intent in item_tags:
weight += 2
for risk_flag in ontology.risk_flags:
if risk_flag in item_tags:
weight += 4
return weight, item.code
ranked = sorted(items, key=score, reverse=True)
if not ranked:
return []
scored = [item for item in ranked if score(item)[0] > 0]
return scored or ranked[:1]
def _invoke_tool(
self,
*,
run_id: str,
tool_type: str,
tool_name: str,
request_json: dict[str, Any],
context_json: dict[str, Any],
executor,
fallback_factory,
) -> tuple[dict[str, Any], bool]:
started = perf_counter()
try:
self._maybe_raise_simulated_failure(tool_type, context_json)
response = executor()
duration_ms = int((perf_counter() - started) * 1000)
self.run_service.record_tool_call(
run_id=run_id,
tool_type=tool_type,
tool_name=tool_name,
request_json=request_json,
response_json=response,
status="succeeded",
duration_ms=duration_ms,
)
return response, False
except Exception as exc:
duration_ms = int((perf_counter() - started) * 1000)
response = fallback_factory(exc)
self.run_service.record_tool_call(
run_id=run_id,
tool_type=tool_type,
tool_name=tool_name,
request_json=request_json,
response_json=response,
status="failed",
duration_ms=duration_ms,
error_message=str(exc),
)
return response, True
@staticmethod
def _maybe_raise_simulated_failure(tool_type: str, context_json: dict[str, Any]) -> None:
expected = str(context_json.get("simulate_tool_failure") or "").strip().lower()
if not expected:
return
if expected == tool_type.lower():
raise RuntimeError(f"simulated {tool_type} failure")
def _build_database_answer(
    self,
    ontology: OntologyParseResult,
    *,
    user_id: str | None,
    context_json: dict[str, Any],
    message: str,
) -> dict[str, Any]:
    """Answer a read-only data question directly from the database.

    * ``expense``: a scoped, optionally time-windowed claim list
      (see ``_build_expense_database_answer``).
    * ``accounts_receivable``: record count and outstanding total over all AR records.
    * any other scenario: record count and outstanding total over all AP records.

    Returns:
        A plain dict handed to the User Agent as its tool payload.
    """
    if ontology.scenario == "expense":
        return self._build_expense_database_answer(
            ontology,
            user_id=user_id,
            context_json=context_json,
            message=message,
        )
    if ontology.scenario == "accounts_receivable":
        return self._summarize_outstanding_records(AccountsReceivableRecord)
    # NOTE(review): every scenario other than expense/AR (including "unknown")
    # falls through to accounts payable. Confirm this default is intended.
    return self._summarize_outstanding_records(AccountsPayableRecord)

def _build_expense_database_answer(
    self,
    ontology: OntologyParseResult,
    *,
    user_id: str | None,
    context_json: dict[str, Any],
    message: str,
) -> dict[str, Any]:
    """Build the expense-claim list answer for ``_build_database_answer``.

    Counts and sums all claims matching the query scope. When the query has
    neither an explicit claim number nor a time range, the displayed figures,
    preview rows and status groups are narrowed to the recent window.
    """
    conditions, scope_label, scoped_to_current_user = self._build_expense_query_scope(
        ontology=ontology,
        user_id=user_id,
        context_json=context_json,
        message=message,
    )
    total_count, total_amount = self._aggregate_expense_claims(conditions)
    recent_window_applied = self._should_limit_expense_query_to_recent_window(ontology)
    display_conditions = list(conditions)
    display_count, display_amount = total_count, total_amount
    window_start_date: str | None = None
    window_end_date: str | None = None
    if recent_window_applied:
        reference_now = self._resolve_reference_now(context_json)
        window_start, window_end = self._resolve_expense_recent_window_bounds(reference_now)
        display_conditions.append(
            self._build_expense_recent_window_condition(window_start, window_end)
        )
        window_start_date = window_start.date().isoformat()
        # window_end is exclusive (next midnight), so report the last day included.
        window_end_date = (window_end - timedelta(microseconds=1)).date().isoformat()
        display_count, display_amount = self._aggregate_expense_claims(display_conditions)
    # Matches hidden by the recent window; 0 when no window is applied.
    older_record_count = max(0, total_count - display_count)
    preview_stmt = (
        select(ExpenseClaim)
        .order_by(
            func.coalesce(
                ExpenseClaim.submitted_at,
                ExpenseClaim.created_at,
                ExpenseClaim.occurred_at,
            ).desc(),
            ExpenseClaim.occurred_at.desc(),
        )
        .limit(EXPENSE_QUERY_PREVIEW_LIMIT)
    )
    for condition in display_conditions:
        preview_stmt = preview_stmt.where(condition)
    preview_claims = list(self.db.scalars(preview_stmt).all())
    status_groups = self._build_expense_status_groups(display_conditions)
    return {
        "result_type": "expense_claim_list",
        "record_count": display_count,
        "total_amount": round(display_amount, 2),
        "scope_label": scope_label,
        "scoped_to_current_user": scoped_to_current_user,
        "recent_window_applied": recent_window_applied,
        "window_days": EXPENSE_QUERY_RECENT_WINDOW_DAYS if recent_window_applied else None,
        "window_start_date": window_start_date,
        "window_end_date": window_end_date,
        "preview_count": len(preview_claims),
        "older_record_count": older_record_count,
        "records": [
            self._build_expense_query_record(claim)
            for claim in preview_claims
        ],
        "status_groups": status_groups,
        "has_more_in_window": display_count > len(preview_claims),
        "total_matched_count": total_count,
    }

def _aggregate_expense_claims(self, conditions: list[Any]) -> tuple[int, float]:
    """Return ``(count, summed amount)`` of expense claims matching all ``conditions``."""
    count_stmt = select(func.count()).select_from(ExpenseClaim)
    amount_stmt = select(func.coalesce(func.sum(ExpenseClaim.amount), 0)).select_from(ExpenseClaim)
    for condition in conditions:
        count_stmt = count_stmt.where(condition)
        amount_stmt = amount_stmt.where(condition)
    claim_count = int(self.db.scalar(count_stmt) or 0)
    claim_amount = float(self.db.scalar(amount_stmt) or 0)
    return claim_count, claim_amount

def _summarize_outstanding_records(self, model: Any) -> dict[str, Any]:
    """Count all rows of ``model`` and total their ``amount_outstanding`` (AR/AP tables)."""
    record_count = int(self.db.scalar(select(func.count()).select_from(model)) or 0)
    outstanding = float(
        self.db.scalar(select(func.coalesce(func.sum(model.amount_outstanding), 0)))
        or 0
    )
    return {
        "record_count": record_count,
        "outstanding_amount": round(outstanding, 2),
    }
@staticmethod
def _should_limit_expense_query_to_recent_window(
ontology: OntologyParseResult,
) -> bool:
has_explicit_claim_no = any(
item.type == "expense_claim"
and str(item.normalized_value or item.value or "").strip()
for item in ontology.entities
)
has_explicit_time_range = bool(
ontology.time_range.start_date or ontology.time_range.end_date
)
return not has_explicit_claim_no and not has_explicit_time_range
@staticmethod
def _resolve_reference_now(context_json: dict[str, Any]) -> datetime:
raw_value = str(context_json.get("client_now_iso") or "").strip()
if raw_value:
normalized = raw_value.replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
if parsed.tzinfo is None:
return parsed.replace(tzinfo=UTC)
return parsed.astimezone(UTC)
except ValueError:
pass
return datetime.now(UTC)
@staticmethod
def _resolve_expense_recent_window_bounds(
    reference_now: datetime,
) -> tuple[datetime, datetime]:
    """Return ``[start, end)`` bounds of the recent expense-query window in UTC.

    ``end`` is the UTC midnight that follows ``reference_now``'s UTC date.
    ``start`` is ``EXPENSE_QUERY_RECENT_WINDOW_DAYS`` days before ``end``.
    """
    utc_now = reference_now.astimezone(UTC)
    start_of_today = datetime(utc_now.year, utc_now.month, utc_now.day, tzinfo=UTC)
    window_end = start_of_today + timedelta(days=1)
    return window_end - timedelta(days=EXPENSE_QUERY_RECENT_WINDOW_DAYS), window_end
@staticmethod
def _build_expense_recent_window_condition(
    window_start: datetime,
    window_end: datetime,
) -> Any:
    """Build a SQL condition keeping claims whose document time is in ``[start, end)``.

    The document time is the first non-NULL of submitted_at, created_at and
    occurred_at. This matches the ordering used for the preview list.
    """
    effective_at = func.coalesce(
        ExpenseClaim.submitted_at,
        ExpenseClaim.created_at,
        ExpenseClaim.occurred_at,
    )
    lower_bound = effective_at >= window_start
    upper_bound = effective_at < window_end
    return and_(lower_bound, upper_bound)
def _build_expense_status_groups(
    self,
    conditions: list[Any],
) -> list[dict[str, Any]]:
    """Count matching expense claims per status group.

    Groups are returned in ``EXPENSE_STATUS_GROUP_ORDER``; empty groups are
    omitted.
    """
    query = (
        select(ExpenseClaim.status, func.count())
        .select_from(ExpenseClaim)
        .group_by(ExpenseClaim.status)
    )
    for clause in conditions:
        query = query.where(clause)
    totals = dict.fromkeys(EXPENSE_STATUS_GROUP_ORDER, 0)
    for raw_status, row_count in self.db.execute(query).all():
        bucket = self._resolve_expense_status_group(str(raw_status or "").strip())[0]
        totals[bucket] = totals.get(bucket, 0) + int(row_count or 0)
    groups: list[dict[str, Any]] = []
    for key in EXPENSE_STATUS_GROUP_ORDER:
        count = totals.get(key, 0)
        if count > 0:
            groups.append(
                {"key": key, "label": EXPENSE_STATUS_GROUP_LABELS[key], "count": count}
            )
    return groups
@staticmethod
def _resolve_expense_status_group(status: str) -> tuple[str, str]:
    """Map a raw claim status to ``(group_key, group_label)``.

    The status is compared case-insensitively, after stripping whitespace:
    * "draft" maps to draft;
    * "submitted" and "review" map to in_progress;
    * "approved" and "paid" map to completed;
    * anything else, including "rejected", maps to other.
    """
    normalized = str(status or "").strip().lower()
    group = {
        "draft": "draft",
        "submitted": "in_progress",
        "review": "in_progress",
        "approved": "completed",
        "paid": "completed",
    }.get(normalized, "other")
    return group, EXPENSE_STATUS_GROUP_LABELS[group]
@staticmethod
def _resolve_expense_query_document_datetime(
claim: ExpenseClaim,
) -> datetime | None:
return claim.submitted_at or claim.created_at or claim.occurred_at
def _build_expense_query_record(
    self,
    claim: ExpenseClaim,
) -> dict[str, Any]:
    """Serialize one expense claim into a preview row for query answers.

    Adds display labels for expense type, status and status group. Dates are
    rendered as ISO dates, or "" when missing.
    """
    group_key, group_label = self._resolve_expense_status_group(claim.status)
    doc_at = self._resolve_expense_query_document_datetime(claim)
    expense_type = claim.expense_type
    status = claim.status
    occurred = claim.occurred_at
    return {
        "claim_id": claim.id,
        "claim_no": claim.claim_no,
        "employee_name": claim.employee_name,
        "expense_type": expense_type,
        "expense_type_label": EXPENSE_TYPE_LABELS.get(expense_type, expense_type or "报销"),
        "amount": round(float(claim.amount), 2),
        "status": status,
        "status_label": EXPENSE_STATUS_LABELS.get(status, status or "处理中"),
        "status_group": group_key,
        "status_group_label": group_label,
        "approval_stage": claim.approval_stage,
        "document_date": doc_at.date().isoformat() if doc_at else "",
        "occurred_at": occurred.date().isoformat() if occurred else "",
        "reason": claim.reason,
        "location": claim.location,
    }
def _build_expense_query_scope(
    self,
    *,
    ontology: OntologyParseResult,
    user_id: str | None,
    context_json: dict[str, Any],
    message: str,
) -> tuple[list[Any], str, bool]:
    """Translate a parsed ontology into SQLAlchemy filters on ``ExpenseClaim``.

    These filters are always added when present:

    * claim numbers (upper-cased),
    * expense types,
    * ``status =`` constraints,
    * numeric ``amount`` comparisons,
    * the ontology time range, applied to ``occurred_at`` as UTC day bounds.

    Visibility is then narrowed as follows:

    * callers without a privileged role only ever see their own claims;
    * privileged callers who name employees see those employees' claims;
    * privileged callers whose message refers to themselves see their own;
    * any other privileged query sees all claims.

    Args:
        ontology: Parsed query (entities, constraints, time range).
        user_id: Identifier of the requesting user; may be empty.
        context_json: Request context; its ``role_codes`` decide privilege.
        message: Raw user message, used to detect self-referencing queries.

    Returns:
        ``(conditions, scope_label, scoped_to_current_user)``. ``conditions``
        is a list of SQLAlchemy expressions intended to be AND-ed together.
    """

    def distinct_non_empty(values: Any) -> list[str]:
        # Drop blanks and duplicates while keeping first-seen order.
        return list(dict.fromkeys(value for value in values if value))

    conditions: list[Any] = []

    explicit_employee_names = distinct_non_empty(
        str(item.value or "").strip()
        for item in ontology.entities
        if item.type == "employee"
    )
    expense_claim_nos = distinct_non_empty(
        str(item.normalized_value or item.value or "").strip().upper()
        for item in ontology.entities
        if item.type == "expense_claim"
    )
    expense_types = distinct_non_empty(
        str(item.normalized_value or item.value or "").strip()
        for item in ontology.entities
        if item.type == "expense_type"
    )
    # Skip None explicitly: str(None) would otherwise become a literal "None" filter.
    status_values = distinct_non_empty(
        str(item.value).strip()
        for item in ontology.constraints
        if item.field == "status" and item.operator == "=" and item.value is not None
    )
    amount_constraints = [
        item
        for item in ontology.constraints
        if item.field == "amount" and item.operator in {">", ">=", "<", "<=", "="}
    ]

    if expense_claim_nos:
        conditions.append(ExpenseClaim.claim_no.in_(expense_claim_nos))
    if expense_types:
        conditions.append(ExpenseClaim.expense_type.in_(expense_types))
    if status_values:
        conditions.append(ExpenseClaim.status.in_(status_values))

    for item in amount_constraints:
        try:
            amount_value = float(item.value)
        except (TypeError, ValueError):
            # A malformed amount from the parser should not abort the whole
            # query; drop only that constraint and record why.
            logger.warning(
                "Ignoring non-numeric expense amount constraint: %s %r",
                item.operator,
                item.value,
            )
            continue
        if item.operator == ">":
            conditions.append(ExpenseClaim.amount > amount_value)
        elif item.operator == ">=":
            conditions.append(ExpenseClaim.amount >= amount_value)
        elif item.operator == "<":
            conditions.append(ExpenseClaim.amount < amount_value)
        elif item.operator == "<=":
            conditions.append(ExpenseClaim.amount <= amount_value)
        else:
            conditions.append(ExpenseClaim.amount == amount_value)

    if ontology.time_range.start_date:
        conditions.append(
            ExpenseClaim.occurred_at
            >= datetime.fromisoformat(f"{ontology.time_range.start_date}T00:00:00+00:00")
        )
    if ontology.time_range.end_date:
        conditions.append(
            ExpenseClaim.occurred_at
            <= datetime.fromisoformat(f"{ontology.time_range.end_date}T23:59:59.999999+00:00")
        )

    privileged = self._has_privileged_expense_query_access(context_json)
    if privileged and explicit_employee_names:
        conditions.append(ExpenseClaim.employee_name.in_(explicit_employee_names))
        # Separate multiple names so the label stays readable.
        return conditions, f"{'、'.join(explicit_employee_names)}的报销单", False

    # Non-privileged callers are always pinned to their own claims, whatever
    # employees they mention; privileged callers only when asking about themselves.
    if not privileged or self._is_self_expense_query(message):
        owner_condition, scope_label = self._build_current_user_scope_condition(
            user_id=user_id,
            context_json=context_json,
        )
        conditions.append(owner_condition)
        return conditions, scope_label, True

    return conditions, "全部报销单", False

def _build_current_user_scope_condition(
    self,
    *,
    user_id: str | None,
    context_json: dict[str, Any],
) -> tuple[Any, str]:
    """Return one filter restricting claims to the current user, plus its label.

    When no ownership condition can be derived, the filter matches an
    impossible claim id, so the query returns nothing instead of every claim.
    """
    owner_conditions, owner_label = self._build_current_user_claim_conditions(
        user_id=user_id,
        context_json=context_json,
    )
    if owner_conditions:
        return or_(*owner_conditions), owner_label
    return ExpenseClaim.id == "__no_visible_claim__", "你的报销单"
def _build_current_user_claim_conditions(
    self,
    *,
    user_id: str | None,
    context_json: dict[str, Any],
) -> tuple[list[Any], str]:
    """Build OR-able ownership filters that identify the current user's claims.

    ``user_id`` is matched case-insensitively against ``Employee.email``.

    * If an employee is found, claims match on the employee id or the email
      stored as ``employee_name``. They also match on the employee's display
      name, but only when that name is unique.
    * Otherwise the raw ``user_id`` is tried as both ``employee_id`` and
      ``employee_name``.

    Blank values are skipped, and duplicate ``(field, value)`` pairs are
    dropped, comparing values case-insensitively.

    Returns:
        ``(conditions, label)``. ``conditions`` may be empty when no identity
        is available. ``context_json`` is accepted but not read here.
    """
    lookup_key = str(user_id or "").strip()
    matched_employee = (
        self.db.scalar(
            select(Employee)
            .where(func.lower(Employee.email) == lookup_key.lower())
            .limit(1)
        )
        if lookup_key
        else None
    )

    if matched_employee is None:
        candidates: list[tuple[str, Any]] = [
            ("employee_id", lookup_key),
            ("employee_name", lookup_key),
        ]
    else:
        candidates = [
            ("employee_id", matched_employee.id),
            ("employee_name", matched_employee.email),
        ]
        # Matching by display name is only safe when no other employee shares it.
        if self._employee_name_is_unique(matched_employee):
            candidates.append(("employee_name", matched_employee.name))

    filters: list[Any] = []
    already_added: set[tuple[str, str]] = set()
    for column_key, raw_value in candidates:
        text = str(raw_value or "").strip()
        if not text:
            continue
        dedupe_key = (column_key, text.lower())
        if dedupe_key in already_added:
            continue
        already_added.add(dedupe_key)
        column = (
            ExpenseClaim.employee_id
            if column_key == "employee_id"
            else ExpenseClaim.employee_name
        )
        filters.append(column == text)

    employee_display = matched_employee.name if matched_employee is not None else ""
    label = "你的报销单" if (employee_display or lookup_key) else "当前用户的报销单"
    return filters, label
def _employee_name_is_unique(self, employee: Employee) -> bool:
normalized_name = str(employee.name or "").strip()
if not normalized_name:
return False
same_name_count = int(
self.db.scalar(
select(func.count()).select_from(Employee).where(Employee.name == normalized_name)
)
or 0
)
return same_name_count == 1
@staticmethod
def _has_privileged_expense_query_access(context_json: dict[str, Any]) -> bool:
role_codes = {
str(item).strip().lower()
for item in context_json.get("role_codes", [])
if str(item).strip()
}
return bool(role_codes & PRIVILEGED_EXPENSE_QUERY_ROLE_CODES)
@staticmethod
def _is_self_expense_query(message: str) -> bool:
    """Return True when the message mentions any ``SELF_REFERENCE_KEYWORDS``.

    All whitespace is removed from the message first, so keywords still match
    when the user inserted spaces between characters.
    """
    squeezed = "".join(str(message or "").split())
    for keyword in SELF_REFERENCE_KEYWORDS:
        if keyword in squeezed:
            return True
    return False
@staticmethod
def _build_user_query_result(
ontology: OntologyParseResult,
response: dict[str, Any],
) -> dict[str, Any]:
if ontology.scenario == "expense":
return {
"message": (
f"已路由到 User Agent占位查询结果命中 {response['record_count']} 笔报销,"
f"金额合计 {response['total_amount']} 元。"
),
"data": response,
}
if ontology.scenario == "accounts_receivable":
return {
"message": (
f"已路由到 User Agent占位查询结果命中 {response['record_count']} 条应收,"
f"未回款金额 {response['outstanding_amount']} 元。"
),
"data": response,
}
return {
"message": (
f"已路由到 User Agent占位查询结果命中 {response['record_count']} 条应付,"
f"待付金额 {response['outstanding_amount']} 元。"
),
"data": response,
}
@staticmethod
def _build_user_agent_result(
response: UserAgentResponse,
*,
degraded: bool,
) -> dict[str, Any]:
result = {
"message": response.answer,
"answer": response.answer,
"citations": [item.model_dump() for item in response.citations],
"suggested_actions": [item.model_dump() for item in response.suggested_actions],
"risk_flags": response.risk_flags,
"requires_confirmation": response.requires_confirmation,
"degraded": degraded,
}
if response.query_payload is not None:
result["query_payload"] = response.query_payload.model_dump()
if response.draft_payload is not None:
result["draft_payload"] = response.draft_payload.model_dump()
if response.review_payload is not None:
result["review_payload"] = response.review_payload.model_dump()
return result
@staticmethod
def _build_knowledge_answer(
ontology: OntologyParseResult,
capabilities: dict[str, list[AgentAssetListItem | AgentAssetRead]],
) -> dict[str, Any]:
referenced = [item.code for item in capabilities["rules"][:1]] or [
"knowledge.policy.default"
]
return {
"message": f"已路由到 User Agent占位知识结果建议先查看 {', '.join(referenced)}",
"references": referenced,
}
@staticmethod
def _build_rule_answer(ontology: OntologyParseResult) -> dict[str, Any]:
risk_text = (
"".join(ontology.risk_flags)
if ontology.risk_flags
else "未识别到明确风险标签"
)
return {
"message": f"已完成占位规则检查,风险标签:{risk_text}",
"risk_flags": ontology.risk_flags,
}
@staticmethod
def _build_mcp_answer(
task_asset: AgentAssetRead | None,
ontology: OntologyParseResult,
) -> dict[str, Any]:
return {
"message": (
f"已调用占位 MCP 快照,任务={task_asset.code if task_asset else 'none'}"
f"scenario={ontology.scenario}"
),
"snapshot": "stubbed",
}
@staticmethod
def _build_hermes_message(
*,
task_asset: AgentAssetRead | None,
ontology: OntologyParseResult,
rule_response: dict[str, Any],
mcp_response: dict[str, Any],
degraded: bool,
) -> str:
task_code = task_asset.code if task_asset is not None else "task.unspecified"
suffix = ",其中部分能力已降级。" if degraded else ""
return (
f"Hermes 占位执行完成:任务 {task_code}"
f"场景 {ontology.scenario},规则结果={rule_response.get('message', '')}"
f"MCP 结果={mcp_response.get('message', '')}{suffix}"
)
@staticmethod
def _database_tool_name(scenario: str) -> str:
if scenario == "expense":
return "database.expense_claims.lookup"
if scenario == "accounts_receivable":
return "database.accounts_receivable.lookup"
return "database.accounts_payable.lookup"
@staticmethod
def _rule_tool_name(
capabilities: dict[str, list[AgentAssetListItem | AgentAssetRead]],
) -> str:
if capabilities["rules"]:
return capabilities["rules"][0].code
return "rule_engine.default_risk_check"
@staticmethod
def _mcp_tool_name(
capabilities: dict[str, list[AgentAssetListItem | AgentAssetRead]],
) -> str:
if capabilities["mcps"]:
return capabilities["mcps"][0].code
return "mcp.default_snapshot"
@staticmethod
def _build_ontology_json(ontology: OntologyParseResult) -> dict[str, Any]:
return {
"scenario": ontology.scenario,
"intent": ontology.intent,
"entities": [item.model_dump() for item in ontology.entities],
"time_range": ontology.time_range.model_dump(),
"metrics": [item.model_dump() for item in ontology.metrics],
"constraints": [item.model_dump() for item in ontology.constraints],
"risk_flags": ontology.risk_flags,
"permission": ontology.permission.model_dump(),
}
@staticmethod
def _normalize_response_status(status: str) -> str:
    """Collapse an agent-run status into the response vocabulary.

    ``FAILED`` maps to ``"failed"`` and ``BLOCKED`` maps to ``"blocked"``.
    Every other status is reported as ``"succeeded"``.
    """
    terminal_overrides = {
        AgentRunStatus.FAILED.value: "failed",
        AgentRunStatus.BLOCKED.value: "blocked",
    }
    return terminal_overrides.get(status, "succeeded")