from __future__ import annotations from dataclasses import dataclass from datetime import UTC, datetime, timedelta from time import perf_counter from typing import Any from sqlalchemy import and_, func, or_, select from sqlalchemy.orm import Session from app.core.agent_enums import ( AgentAssetStatus, AgentAssetType, AgentName, AgentPermissionLevel, AgentRunSource, AgentRunStatus, AgentToolType, ) from app.core.logging import get_logger from app.models.employee import Employee from app.models.financial_record import ( AccountsPayableRecord, AccountsReceivableRecord, ExpenseClaim, ) from app.schemas.agent_asset import AgentAssetListItem, AgentAssetRead from app.schemas.ontology import OntologyParseRequest, OntologyParseResult from app.schemas.orchestrator import ( OrchestratorRequest, OrchestratorResponse, OrchestratorTraceSummary, ) from app.schemas.user_agent import UserAgentRequest, UserAgentResponse from app.services.agent_assets import AgentAssetService from app.services.agent_conversations import AgentConversationService from app.services.expense_claims import ExpenseClaimService from app.services.agent_foundation import AgentFoundationService from app.services.agent_runs import AgentRunService from app.services.knowledge import KnowledgeService from app.services.ontology import SemanticOntologyService from app.services.user_agent import UserAgentService logger = get_logger("app.services.orchestrator") SCENARIO_TO_DOMAIN = { "expense": "expense", "accounts_receivable": "ar", "accounts_payable": "ap", "knowledge": "knowledge", "unknown": "system", } @dataclass(slots=True) class ExecutionOutcome: status: str result: dict[str, Any] degraded: bool tool_count: int failed_tool_count: int PRIVILEGED_EXPENSE_QUERY_ROLE_CODES = {"finance"} SELF_REFERENCE_KEYWORDS = ("我的", "我自己", "本人", "我名下", "给我查", "我提交", "我申请") EXPENSE_QUERY_RECENT_WINDOW_DAYS = 10 EXPENSE_QUERY_PREVIEW_LIMIT = 20 EXPENSE_STATUS_LABELS = { "draft": "草稿", "submitted": "已提交", "review": "审核中", "approved": "已通过", "rejected": "已驳回", "paid": "已付款", } EXPENSE_STATUS_GROUP_LABELS = { "draft": "草稿", "in_progress": "审批中", "completed": "审批完成", "other": "其他状态", } EXPENSE_STATUS_GROUP_ORDER = ("draft", "in_progress", "completed", "other") EXPENSE_TYPE_LABELS = { "travel": "差旅费", "hotel": "住宿费", "transport": "交通费", "meal": "餐费", "meeting": "会务费", "entertainment": "业务招待费", "office": "办公费", "training": "培训费", "communication": "通讯费", "welfare": "福利费", "other": "其他费用", } class OrchestratorService: def __init__(self, db: Session) -> None: self.db = db self.asset_service = AgentAssetService(db) self.conversation_service = AgentConversationService(db) self.expense_claim_service = ExpenseClaimService(db) self.knowledge_service = KnowledgeService(db=db) self.run_service = AgentRunService(db) self.ontology_service = SemanticOntologyService(db) self.user_agent_service = UserAgentService(db) def run(self, payload: OrchestratorRequest) -> OrchestratorResponse: AgentFoundationService(self.db).ensure_foundation_ready() context_json = dict(payload.context_json or {}) conversation_id = str(payload.conversation_id or "").strip() or None conversation = None if payload.source == AgentRunSource.USER_MESSAGE.value: conversation = self.conversation_service.get_or_create_conversation( conversation_id=conversation_id, user_id=payload.user_id, source=payload.source, context_json=context_json, ) conversation_id = conversation.conversation_id context_json = self.conversation_service.hydrate_context_json( conversation=conversation, context_json=context_json, message=payload.message, ) route_json: dict[str, Any] = { "orchestrated_by": AgentName.ORCHESTRATOR.value, "stage": "created", } if conversation_id: route_json["conversation_id"] = conversation_id run = self.run_service.create_run( agent=AgentName.ORCHESTRATOR.value, source=payload.source, user_id=payload.user_id, task_id=payload.task_id, ontology_json={}, route_json=route_json, permission_level=AgentPermissionLevel.READ.value, status=AgentRunStatus.RUNNING.value, result_summary="Orchestrator 已接收请求。", ) try: message, task_asset = self._resolve_message(payload) if conversation is not None: self.conversation_service.append_message( conversation_id=conversation.conversation_id, role="user", content=message, run_id=run.run_id, message_json={ "attachment_names": context_json.get("attachment_names", []), "attachment_count": context_json.get("attachment_count", 0), "ocr_summary": context_json.get("ocr_summary", ""), }, ) ontology = self.ontology_service.parse_for_run( OntologyParseRequest( query=message, user_id=payload.user_id, context_json=context_json, ), run_id=run.run_id, ) if context_json.get("simulate_orchestrator_exception"): raise RuntimeError("simulated orchestrator exception") selected_agent, route_reason = self._select_agent(payload, ontology) capabilities = self._select_capabilities( payload=payload, ontology=ontology, task_asset=task_asset, ) selected_capability_codes = self._flatten_capability_codes(capabilities) is_expense_review_action = self._is_expense_review_action(context_json) requires_confirmation = ( ontology.permission.level == AgentPermissionLevel.APPROVAL_REQUIRED.value and not is_expense_review_action ) route_json = { "orchestrated_by": AgentName.ORCHESTRATOR.value, "stage": "routed", "selected_agent": selected_agent, "route_reason": route_reason, "selected_capability_codes": selected_capability_codes, "ontology_run_id": ontology.run_id, } if ontology.permission.level == AgentPermissionLevel.FORBIDDEN.value: outcome = ExecutionOutcome( status=AgentRunStatus.BLOCKED.value, result={ "message": ontology.permission.reason, "clarification_question": ontology.clarification_question, "degraded": False, }, degraded=False, tool_count=0, failed_tool_count=0, ) selected_agent = None route_reason = "permission_forbidden" route_json["stage"] = "blocked" route_json["route_reason"] = route_reason elif ontology.clarification_required: if selected_agent == AgentName.USER_AGENT.value and ontology.scenario == "expense": clarification_response = self.user_agent_service.respond( UserAgentRequest( run_id=run.run_id, user_id=payload.user_id, message=payload.message or "", ontology=ontology, context_json=context_json, tool_payload={"clarification_required": True}, selected_capability_codes=selected_capability_codes, degraded=False, requires_confirmation=requires_confirmation, ) ) clarification_result = self._build_user_agent_result( clarification_response, degraded=False, ) clarification_result.update( { "clarification_required": True, "missing_slots": ontology.missing_slots, "ambiguity": ontology.ambiguity, "parse_strategy": ontology.parse_strategy, } ) outcome = ExecutionOutcome( status=AgentRunStatus.BLOCKED.value, result=clarification_result, degraded=False, tool_count=0, failed_tool_count=0, ) else: outcome = ExecutionOutcome( status=AgentRunStatus.BLOCKED.value, result={ "message": ontology.clarification_question or "需要补充更多上下文。", "clarification_required": True, "missing_slots": ontology.missing_slots, "ambiguity": ontology.ambiguity, "parse_strategy": ontology.parse_strategy, "degraded": False, }, degraded=False, tool_count=0, failed_tool_count=0, ) route_reason = "clarification_required" route_json["stage"] = "clarification" route_json["route_reason"] = route_reason elif selected_agent == AgentName.HERMES.value: outcome = self._execute_hermes( payload=payload, run_id=run.run_id, ontology=ontology, capabilities=capabilities, requires_confirmation=requires_confirmation, task_asset=task_asset, context_json=context_json, ) else: outcome = self._execute_user_agent( payload=payload, run_id=run.run_id, ontology=ontology, capabilities=capabilities, requires_confirmation=requires_confirmation, context_json=context_json, ) final_status = ( AgentRunStatus.BLOCKED.value if requires_confirmation and outcome.status == AgentRunStatus.SUCCEEDED.value and ontology.permission.level == AgentPermissionLevel.APPROVAL_REQUIRED.value else outcome.status ) response_status = self._normalize_response_status(final_status) result_message = ( str(outcome.result.get("message", "")).strip() or "Orchestrator 执行完成。" ) trace_summary = OrchestratorTraceSummary( scenario=ontology.scenario, intent=ontology.intent, tool_count=outcome.tool_count, failed_tool_count=outcome.failed_tool_count, selected_capability_codes=selected_capability_codes, degraded=outcome.degraded, ) self.run_service.update_run( run.run_id, agent=selected_agent or AgentName.ORCHESTRATOR.value, ontology_json=self._build_ontology_json(ontology), route_json={ **route_json, "requires_confirmation": requires_confirmation, "degraded": outcome.degraded, }, permission_level=ontology.permission.level, status=final_status, result_summary=result_message, error_message=None, finished_at=datetime.now(UTC), ) if conversation is not None and conversation_id: draft_payload = outcome.result.get("draft_payload") self.conversation_service.update_state( conversation_id=conversation_id, run_id=run.run_id, scenario=ontology.scenario, intent=ontology.intent, context_json=context_json, draft_payload=draft_payload if isinstance(draft_payload, dict) else None, ) self.conversation_service.append_message( conversation_id=conversation_id, role="assistant", content=result_message, run_id=run.run_id, message_json={ "status": final_status, "scenario": ontology.scenario, "intent": ontology.intent, "attachment_names": context_json.get("attachment_names", []), "attachment_count": context_json.get("attachment_count", 0), "draft_payload": draft_payload if isinstance(draft_payload, dict) else None, "orchestrator_payload": { "run_id": run.run_id, "conversation_id": conversation_id, "selected_agent": selected_agent, "route_reason": route_reason, "permission_level": ontology.permission.level, "status": response_status, "requires_confirmation": requires_confirmation, "trace_summary": trace_summary.model_dump(), "result": outcome.result, }, }, ) return OrchestratorResponse( run_id=run.run_id, conversation_id=conversation_id, selected_agent=selected_agent, route_reason=route_reason, permission_level=ontology.permission.level, status=response_status, result=outcome.result, requires_confirmation=requires_confirmation, trace_summary=trace_summary, ) except Exception as exc: logger.exception("Orchestrator run failed run_id=%s", run.run_id) self.run_service.update_run( run.run_id, agent=AgentName.ORCHESTRATOR.value, route_json={**route_json, "stage": "failed"}, status=AgentRunStatus.FAILED.value, result_summary="Orchestrator 执行失败。", error_message=str(exc), finished_at=datetime.now(UTC), ) if conversation is not None and conversation_id: self.conversation_service.update_state( conversation_id=conversation_id, run_id=run.run_id, scenario=None, intent=None, context_json=context_json, draft_payload=None, ) self.conversation_service.append_message( conversation_id=conversation_id, role="assistant", content=f"Orchestrator 执行失败:{exc}", run_id=run.run_id, message_json={"status": AgentRunStatus.FAILED.value}, ) return OrchestratorResponse( run_id=run.run_id, conversation_id=conversation_id, selected_agent=None, route_reason="orchestrator_exception", permission_level=AgentPermissionLevel.READ.value, status="failed", result={"message": f"Orchestrator 执行失败:{exc}"}, requires_confirmation=False, trace_summary=OrchestratorTraceSummary( scenario="unknown", intent="query", tool_count=0, failed_tool_count=0, selected_capability_codes=[], degraded=False, ), ) def _resolve_message( self, payload: OrchestratorRequest, ) -> tuple[str, AgentAssetRead | None]: task_asset = None if payload.task_id: task_asset = self.asset_service.get_asset(payload.task_id) if payload.message and payload.message.strip(): return payload.message.strip(), task_asset if task_asset is not None: description = str(task_asset.description or "").strip() scenario_text = " ".join(str(item) for item in task_asset.scenario_json) message = f"{task_asset.name} {description} {scenario_text}".strip() return message, task_asset if payload.source == AgentRunSource.SCHEDULE.value: return "定时风险巡检任务", task_asset raise ValueError("message 或 task_id 至少需要提供一个。") @staticmethod def _select_agent( payload: OrchestratorRequest, ontology: OntologyParseResult, ) -> tuple[str, str]: if payload.source == AgentRunSource.SCHEDULE.value: return AgentName.HERMES.value, "schedule_source_defaults_to_hermes" if payload.source == AgentRunSource.SYSTEM_EVENT.value and ontology.intent == "risk_check": return AgentName.HERMES.value, "system_event_risk_check_routes_to_hermes" if ontology.intent == "risk_check" and payload.source == AgentRunSource.SCHEDULE.value: return AgentName.HERMES.value, "scheduled_risk_check_routes_to_hermes" if ontology.intent in {"query", "explain", "draft", "compare", "operate"}: return AgentName.USER_AGENT.value, f"{ontology.intent}_routes_to_user_agent" return AgentName.USER_AGENT.value, "user_message_defaults_to_user_agent" def _select_capabilities( self, *, payload: OrchestratorRequest, ontology: OntologyParseResult, task_asset: AgentAssetRead | None, ) -> dict[str, list[AgentAssetListItem | AgentAssetRead]]: domain_value = SCENARIO_TO_DOMAIN.get(ontology.scenario) rules = self._rank_assets( self.asset_service.list_assets( asset_type=AgentAssetType.RULE.value, status=AgentAssetStatus.ACTIVE.value, domain=domain_value if domain_value not in {"knowledge", "system"} else None, ), ontology, ) skills = self._rank_assets( self.asset_service.list_assets( asset_type=AgentAssetType.SKILL.value, status=AgentAssetStatus.ACTIVE.value, domain=domain_value if domain_value not in {"system"} else None, ), ontology, ) mcps = self._rank_assets( self.asset_service.list_assets( asset_type=AgentAssetType.MCP.value, status=AgentAssetStatus.ACTIVE.value, ), ontology, ) tasks: list[AgentAssetListItem | AgentAssetRead] = [] if task_asset is not None and task_asset.status == AgentAssetStatus.ACTIVE.value: tasks.append(task_asset) elif payload.source == AgentRunSource.SCHEDULE.value: tasks = self._rank_assets( self.asset_service.list_assets( asset_type=AgentAssetType.TASK.value, status=AgentAssetStatus.ACTIVE.value, ), ontology, ) return { "rules": rules, "skills": skills, "mcps": mcps, "tasks": tasks, } def _execute_user_agent( self, *, payload: OrchestratorRequest, run_id: str, ontology: OntologyParseResult, capabilities: dict[str, list[AgentAssetListItem | AgentAssetRead]], requires_confirmation: bool, context_json: dict[str, Any], ) -> ExecutionOutcome: selected_capability_codes = self._flatten_capability_codes(capabilities) if requires_confirmation: response, degraded = self._invoke_tool( run_id=run_id, tool_type=AgentToolType.LLM.value, tool_name="user_agent.confirmation_placeholder", request_json={ "message": payload.message, "permission_level": ontology.permission.level, }, context_json=context_json, executor=lambda: { "confirmation_title": "操作需要确认", "message": f"{ontology.permission.reason} 当前仅返回确认摘要,不直接执行动作。", }, fallback_factory=lambda exc: { "confirmation_title": "操作需要确认", "message": f"确认摘要生成失败,已阻断自动执行:{exc}", }, ) return ExecutionOutcome( status=AgentRunStatus.BLOCKED.value, result={**response, "degraded": degraded}, degraded=degraded, tool_count=1, failed_tool_count=1 if degraded else 0, ) next_step = self._resolve_next_step( ontology, payload.source, context_json=context_json, ) if next_step == "query_database": tool_payload, degraded = self._invoke_tool( run_id=run_id, tool_type=AgentToolType.DATABASE.value, tool_name=self._database_tool_name(ontology.scenario), request_json=self._build_ontology_json(ontology), context_json=context_json, executor=lambda: self._build_database_answer( ontology, user_id=payload.user_id, context_json=context_json, message=payload.message or "", ), fallback_factory=lambda exc: { "message": f"数据库查询暂时不可用,已返回降级说明:{exc}", "degraded": True, }, ) result = self._build_user_agent_result( self.user_agent_service.respond( UserAgentRequest( run_id=run_id, user_id=payload.user_id, message=payload.message or "", ontology=ontology, context_json=context_json, tool_payload=tool_payload, selected_capability_codes=selected_capability_codes, degraded=degraded, requires_confirmation=requires_confirmation, ) ), degraded=degraded, ) return ExecutionOutcome( status=AgentRunStatus.SUCCEEDED.value, result=result, degraded=degraded, tool_count=1, failed_tool_count=1 if degraded else 0, ) if next_step == "search_knowledge": tool_payload, degraded = self._invoke_tool( run_id=run_id, tool_type=AgentToolType.DATABASE.value, tool_name="knowledge.search", request_json=self._build_ontology_json(ontology), context_json=context_json, executor=lambda: self._build_knowledge_answer( message=payload.message or "", ontology=ontology, capabilities=capabilities, context_json=context_json, ), fallback_factory=lambda exc: { "message": f"知识检索暂时不可用,建议稍后重试:{exc}", "degraded": True, }, ) result = self._build_user_agent_result( self.user_agent_service.respond( UserAgentRequest( run_id=run_id, user_id=payload.user_id, message=payload.message or "", ontology=ontology, context_json=context_json, tool_payload=tool_payload, selected_capability_codes=selected_capability_codes, degraded=degraded, requires_confirmation=requires_confirmation, ) ), degraded=degraded, ) return ExecutionOutcome( status=AgentRunStatus.SUCCEEDED.value, result=result, degraded=degraded, tool_count=1, failed_tool_count=1 if degraded else 0, ) if next_step == "run_rule": tool_payload, degraded = self._invoke_tool( run_id=run_id, tool_type=AgentToolType.RULE_ENGINE.value, tool_name=self._rule_tool_name(capabilities), request_json=self._build_ontology_json(ontology), context_json=context_json, executor=lambda: self._build_rule_answer(ontology), fallback_factory=lambda exc: { "message": f"规则检查暂时不可用,已返回人工复核建议:{exc}", "degraded": True, }, ) result = self._build_user_agent_result( self.user_agent_service.respond( UserAgentRequest( run_id=run_id, user_id=payload.user_id, message=payload.message or "", ontology=ontology, context_json=context_json, tool_payload=tool_payload, selected_capability_codes=selected_capability_codes, degraded=degraded, requires_confirmation=requires_confirmation, ) ), degraded=degraded, ) return ExecutionOutcome( status=AgentRunStatus.SUCCEEDED.value, result=result, degraded=degraded, tool_count=1, failed_tool_count=1 if degraded else 0, ) tool_type = AgentToolType.LLM.value tool_name = "user_agent.draft_placeholder" executor = lambda: { "message": ( f"已生成 {ontology.scenario} 场景草稿," "占位能力后续由 Day 5 User Agent 接管。" ), "draft_only": True, } fallback_factory = lambda exc: { "message": f"草稿生成暂时不可用,请稍后再试:{exc}", "degraded": True, } if ontology.scenario == "expense" or self._is_expense_review_action(context_json): tool_type = AgentToolType.DATABASE.value tool_name = "database.expense_claims.save_or_submit" executor = lambda: self.expense_claim_service.save_or_submit_from_ontology( run_id=run_id, user_id=payload.user_id, message=payload.message or "", ontology=ontology, context_json=context_json, ) fallback_factory = lambda exc: { "message": f"报销草稿落库失败,请稍后再试:{exc}", "degraded": True, } tool_payload, degraded = self._invoke_tool( run_id=run_id, tool_type=tool_type, tool_name=tool_name, request_json=self._build_ontology_json(ontology), context_json=context_json, executor=executor, fallback_factory=fallback_factory, ) result = self._build_user_agent_result( self.user_agent_service.respond( UserAgentRequest( run_id=run_id, user_id=payload.user_id, message=payload.message or "", ontology=ontology, context_json=context_json, tool_payload=tool_payload, selected_capability_codes=selected_capability_codes, degraded=degraded, requires_confirmation=requires_confirmation, ) ), degraded=degraded, ) return ExecutionOutcome( status=AgentRunStatus.SUCCEEDED.value, result=result, degraded=degraded, tool_count=1, failed_tool_count=1 if degraded else 0, ) def _execute_hermes( self, *, payload: OrchestratorRequest, run_id: str, ontology: OntologyParseResult, capabilities: dict[str, list[AgentAssetListItem | AgentAssetRead]], requires_confirmation: bool, task_asset: AgentAssetRead | None, context_json: dict[str, Any], ) -> ExecutionOutcome: if requires_confirmation: return ExecutionOutcome( status=AgentRunStatus.BLOCKED.value, result={ "message": "Hermes 不会自动执行需要确认的高风险动作,已阻断。", "degraded": False, }, degraded=False, tool_count=0, failed_tool_count=0, ) rule_response, rule_degraded = self._invoke_tool( run_id=run_id, tool_type=AgentToolType.RULE_ENGINE.value, tool_name=self._rule_tool_name(capabilities), request_json=self._build_ontology_json(ontology), context_json=context_json, executor=lambda: self._build_rule_answer(ontology), fallback_factory=lambda exc: { "message": f"规则巡检失败,已降级为待人工复核:{exc}", "degraded": True, }, ) mcp_response, mcp_degraded = self._invoke_tool( run_id=run_id, tool_type=AgentToolType.MCP.value, tool_name=self._mcp_tool_name(capabilities), request_json={ "task_code": task_asset.code if task_asset is not None else "", "scenario": ontology.scenario, }, context_json=context_json, executor=lambda: self._build_mcp_answer(task_asset, ontology), fallback_factory=lambda exc: { "message": f"MCP 调用失败,已使用缓存快照降级:{exc}", "fallback": "used_cached_snapshot", }, ) degraded = rule_degraded or mcp_degraded failed_tool_count = int(rule_degraded) + int(mcp_degraded) result = { "message": self._build_hermes_message( task_asset=task_asset, ontology=ontology, rule_response=rule_response, mcp_response=mcp_response, degraded=degraded, ), "report_type": task_asset.code if task_asset is not None else "hermes_runtime", "degraded": degraded, } return ExecutionOutcome( status=AgentRunStatus.SUCCEEDED.value, result=result, degraded=degraded, tool_count=2, failed_tool_count=failed_tool_count, ) @staticmethod def _resolve_next_step( ontology: OntologyParseResult, source: str, *, context_json: dict[str, Any] | None = None, ) -> str: if OrchestratorService._is_expense_review_action(context_json or {}): return "create_draft" if ontology.clarification_required: return "ask_clarification" if ontology.intent == "draft": return "create_draft" if ontology.scenario == "knowledge" or ontology.intent == "explain": return "search_knowledge" if ontology.intent == "risk_check" or source == AgentRunSource.SCHEDULE.value: return "run_rule" if ontology.intent in {"query", "compare"}: return "query_database" return "create_draft" @staticmethod def _is_expense_review_action(context_json: dict[str, Any]) -> bool: review_action = str((context_json or {}).get("review_action") or "").strip() return review_action in { "save_draft", "next_step", "edit_review", "link_to_existing_draft", "create_new_claim_from_documents", } @staticmethod def _flatten_capability_codes( capabilities: dict[str, list[AgentAssetListItem | AgentAssetRead]], ) -> list[str]: codes: list[str] = [] for items in capabilities.values(): for item in items[:2]: if item.code not in codes: codes.append(item.code) return codes def _rank_assets( self, items: list[AgentAssetListItem], ontology: OntologyParseResult, ) -> list[AgentAssetListItem]: def score(item: AgentAssetListItem) -> tuple[int, str]: item_tags = {str(value) for value in item.scenario_json or []} weight = 0 if ontology.scenario in item_tags: weight += 3 if ontology.intent in item_tags: weight += 2 for risk_flag in ontology.risk_flags: if risk_flag in item_tags: weight += 4 return weight, item.code ranked = sorted(items, key=score, reverse=True) if not ranked: return [] scored = [item for item in ranked if score(item)[0] > 0] return scored or ranked[:1] def _invoke_tool( self, *, run_id: str, tool_type: str, tool_name: str, request_json: dict[str, Any], context_json: dict[str, Any], executor, fallback_factory, ) -> tuple[dict[str, Any], bool]: started = perf_counter() try: self._maybe_raise_simulated_failure(tool_type, context_json) response = executor() duration_ms = int((perf_counter() - started) * 1000) self.run_service.record_tool_call( run_id=run_id, tool_type=tool_type, tool_name=tool_name, request_json=request_json, response_json=response, status="succeeded", duration_ms=duration_ms, ) return response, False except Exception as exc: duration_ms = int((perf_counter() - started) * 1000) response = fallback_factory(exc) self.run_service.record_tool_call( run_id=run_id, tool_type=tool_type, tool_name=tool_name, request_json=request_json, response_json=response, status="failed", duration_ms=duration_ms, error_message=str(exc), ) return response, True @staticmethod def _maybe_raise_simulated_failure(tool_type: str, context_json: dict[str, Any]) -> None: expected = str(context_json.get("simulate_tool_failure") or "").strip().lower() if not expected: return if expected == tool_type.lower(): raise RuntimeError(f"simulated {tool_type} failure") def _build_database_answer( self, ontology: OntologyParseResult, *, user_id: str | None, context_json: dict[str, Any], message: str, ) -> dict[str, Any]: if ontology.scenario == "expense": conditions, scope_label, scoped_to_current_user = self._build_expense_query_scope( ontology=ontology, user_id=user_id, context_json=context_json, message=message, ) count_stmt = select(func.count()).select_from(ExpenseClaim) amount_stmt = select(func.coalesce(func.sum(ExpenseClaim.amount), 0)).select_from(ExpenseClaim) for condition in conditions: count_stmt = count_stmt.where(condition) amount_stmt = amount_stmt.where(condition) total_count = int(self.db.scalar(count_stmt) or 0) total_amount = float(self.db.scalar(amount_stmt) or 0) recent_window_applied = self._should_limit_expense_query_to_recent_window(ontology) display_count = total_count display_amount = total_amount older_record_count = 0 display_conditions = list(conditions) window_start_date: str | None = None window_end_date: str | None = None if recent_window_applied: reference_now = self._resolve_reference_now(context_json) recent_window_start, recent_window_end = self._resolve_expense_recent_window_bounds(reference_now) recent_condition = self._build_expense_recent_window_condition( recent_window_start, recent_window_end, ) display_conditions.append(recent_condition) window_start_date = recent_window_start.date().isoformat() window_end_date = (recent_window_end - timedelta(microseconds=1)).date().isoformat() recent_count_stmt = select(func.count()).select_from(ExpenseClaim).where(recent_condition) recent_amount_stmt = select(func.coalesce(func.sum(ExpenseClaim.amount), 0)).select_from(ExpenseClaim).where( recent_condition ) for condition in conditions: recent_count_stmt = recent_count_stmt.where(condition) recent_amount_stmt = recent_amount_stmt.where(condition) display_count = int(self.db.scalar(recent_count_stmt) or 0) display_amount = float(self.db.scalar(recent_amount_stmt) or 0) older_record_count = max(0, total_count - display_count) preview_stmt = ( select(ExpenseClaim) .order_by( func.coalesce( ExpenseClaim.submitted_at, ExpenseClaim.created_at, ExpenseClaim.occurred_at, ).desc(), ExpenseClaim.occurred_at.desc(), ) .limit(EXPENSE_QUERY_PREVIEW_LIMIT) ) for condition in display_conditions: preview_stmt = preview_stmt.where(condition) preview_claims = list(self.db.scalars(preview_stmt).all()) status_groups = self._build_expense_status_groups(display_conditions) return { "result_type": "expense_claim_list", "record_count": display_count, "total_amount": round(display_amount, 2), "scope_label": scope_label, "scoped_to_current_user": scoped_to_current_user, "recent_window_applied": recent_window_applied, "window_days": EXPENSE_QUERY_RECENT_WINDOW_DAYS if recent_window_applied else None, "window_start_date": window_start_date, "window_end_date": window_end_date, "preview_count": len(preview_claims), "older_record_count": older_record_count, "records": [ self._build_expense_query_record(claim) for claim in preview_claims ], "status_groups": status_groups, "has_more_in_window": display_count > len(preview_claims), "total_matched_count": total_count, } if ontology.scenario == "accounts_receivable": total_count = int( self.db.scalar( select(func.count()).select_from(AccountsReceivableRecord) ) or 0 ) total_amount = float( self.db.scalar( select(func.coalesce(func.sum(AccountsReceivableRecord.amount_outstanding), 0)) ) or 0 ) return { "record_count": total_count, "outstanding_amount": round(total_amount, 2), } total_count = int( self.db.scalar(select(func.count()).select_from(AccountsPayableRecord)) or 0 ) total_amount = float( self.db.scalar( select(func.coalesce(func.sum(AccountsPayableRecord.amount_outstanding), 0)) ) or 0 ) return { "record_count": total_count, "outstanding_amount": round(total_amount, 2), } @staticmethod def _should_limit_expense_query_to_recent_window( ontology: OntologyParseResult, ) -> bool: has_explicit_claim_no = any( item.type == "expense_claim" and str(item.normalized_value or item.value or "").strip() for item in ontology.entities ) has_explicit_time_range = bool( ontology.time_range.start_date or ontology.time_range.end_date ) return not has_explicit_claim_no and not has_explicit_time_range @staticmethod def _resolve_reference_now(context_json: dict[str, Any]) -> datetime: raw_value = str(context_json.get("client_now_iso") or "").strip() if raw_value: normalized = raw_value.replace("Z", "+00:00") try: parsed = datetime.fromisoformat(normalized) if parsed.tzinfo is None: return parsed.replace(tzinfo=UTC) return parsed.astimezone(UTC) except ValueError: pass return datetime.now(UTC) @staticmethod def _resolve_expense_recent_window_bounds( reference_now: datetime, ) -> tuple[datetime, datetime]: normalized_now = reference_now.astimezone(UTC) window_end = normalized_now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) window_start = window_end - timedelta(days=EXPENSE_QUERY_RECENT_WINDOW_DAYS) return window_start, window_end @staticmethod def _build_expense_recent_window_condition( window_start: datetime, window_end: datetime, ) -> Any: document_datetime = func.coalesce( ExpenseClaim.submitted_at, ExpenseClaim.created_at, ExpenseClaim.occurred_at, ) return and_(document_datetime >= window_start, document_datetime < window_end) def _build_expense_status_groups( self, conditions: list[Any], ) -> list[dict[str, Any]]: stmt = select(ExpenseClaim.status, func.count()).select_from(ExpenseClaim).group_by(ExpenseClaim.status) for condition in conditions: stmt = stmt.where(condition) grouped_counts = { key: 0 for key in EXPENSE_STATUS_GROUP_ORDER } for status, count in self.db.execute(stmt).all(): group_key, _ = self._resolve_expense_status_group(str(status or "").strip()) grouped_counts[group_key] = grouped_counts.get(group_key, 0) + int(count or 0) return [ { "key": key, "label": EXPENSE_STATUS_GROUP_LABELS[key], "count": grouped_counts.get(key, 0), } for key in EXPENSE_STATUS_GROUP_ORDER if grouped_counts.get(key, 0) > 0 ] @staticmethod def _resolve_expense_status_group(status: str) -> tuple[str, str]: normalized = str(status or "").strip().lower() if normalized == "draft": return "draft", EXPENSE_STATUS_GROUP_LABELS["draft"] if normalized in {"submitted", "review"}: return "in_progress", EXPENSE_STATUS_GROUP_LABELS["in_progress"] if normalized in {"approved", "paid"}: return "completed", EXPENSE_STATUS_GROUP_LABELS["completed"] return "other", EXPENSE_STATUS_GROUP_LABELS["other"] @staticmethod def _resolve_expense_query_document_datetime( claim: ExpenseClaim, ) -> datetime | None: return claim.submitted_at or claim.created_at or claim.occurred_at def _build_expense_query_record( self, claim: ExpenseClaim, ) -> dict[str, Any]: status_group, status_group_label = self._resolve_expense_status_group(claim.status) document_datetime = self._resolve_expense_query_document_datetime(claim) return { "claim_id": claim.id, "claim_no": claim.claim_no, "employee_name": claim.employee_name, "expense_type": claim.expense_type, "expense_type_label": EXPENSE_TYPE_LABELS.get(claim.expense_type, claim.expense_type or "报销"), "amount": round(float(claim.amount), 2), "status": claim.status, "status_label": EXPENSE_STATUS_LABELS.get(claim.status, claim.status or "处理中"), "status_group": status_group, "status_group_label": status_group_label, "approval_stage": claim.approval_stage, "document_date": document_datetime.date().isoformat() if document_datetime else "", "occurred_at": claim.occurred_at.date().isoformat() if claim.occurred_at else "", "reason": claim.reason, "location": claim.location, } def _build_expense_query_scope( self, *, ontology: OntologyParseResult, user_id: str | None, context_json: dict[str, Any], message: str, ) -> tuple[list[Any], str, bool]: conditions: list[Any] = [] explicit_employee_names = list( dict.fromkeys( str(item.value or "").strip() for item in ontology.entities if item.type == "employee" and str(item.value or "").strip() ) ) expense_claim_nos = list( dict.fromkeys( str(item.normalized_value or item.value or "").strip().upper() for item in ontology.entities if item.type == "expense_claim" and str(item.normalized_value or item.value or "").strip() ) ) expense_types = list( dict.fromkeys( str(item.normalized_value or item.value or "").strip() for item in ontology.entities if item.type == "expense_type" and str(item.normalized_value or item.value or "").strip() ) ) status_values = list( dict.fromkeys( str(item.value).strip() for item in ontology.constraints if item.field == "status" and item.operator == "=" and str(item.value).strip() ) ) amount_constraints = [ item for item in ontology.constraints if item.field == "amount" and item.operator in {">", ">=", "<", "<=", "="} ] scope_label = "报销单" scoped_to_current_user = False if expense_claim_nos: conditions.append(ExpenseClaim.claim_no.in_(expense_claim_nos)) if expense_types: conditions.append(ExpenseClaim.expense_type.in_(expense_types)) if status_values: conditions.append(ExpenseClaim.status.in_(status_values)) for item in amount_constraints: amount_value = float(item.value) if item.operator == ">": conditions.append(ExpenseClaim.amount > amount_value) elif item.operator == ">=": conditions.append(ExpenseClaim.amount >= amount_value) elif item.operator == "<": conditions.append(ExpenseClaim.amount < amount_value) elif item.operator == "<=": conditions.append(ExpenseClaim.amount <= amount_value) else: conditions.append(ExpenseClaim.amount == amount_value) if ontology.time_range.start_date: conditions.append( ExpenseClaim.occurred_at >= datetime.fromisoformat(f"{ontology.time_range.start_date}T00:00:00+00:00") ) if ontology.time_range.end_date: conditions.append( ExpenseClaim.occurred_at <= datetime.fromisoformat(f"{ontology.time_range.end_date}T23:59:59.999999+00:00") ) has_privileged_access = self._has_privileged_expense_query_access(context_json) refers_to_self = self._is_self_expense_query(message) if not has_privileged_access: owner_conditions, owner_label = self._build_current_user_claim_conditions( user_id=user_id, context_json=context_json, ) if owner_conditions: conditions.append(or_(*owner_conditions)) scope_label = owner_label scoped_to_current_user = True else: conditions.append(ExpenseClaim.id == "__no_visible_claim__") scope_label = "你的报销单" scoped_to_current_user = True elif explicit_employee_names: conditions.append(ExpenseClaim.employee_name.in_(explicit_employee_names)) scope_label = f"{'、'.join(explicit_employee_names)}的报销单" elif refers_to_self: owner_conditions, owner_label = self._build_current_user_claim_conditions( user_id=user_id, context_json=context_json, ) if owner_conditions: conditions.append(or_(*owner_conditions)) scope_label = owner_label scoped_to_current_user = True else: conditions.append(ExpenseClaim.id == "__no_visible_claim__") scope_label = "你的报销单" scoped_to_current_user = True else: scope_label = "全部报销单" return conditions, scope_label, scoped_to_current_user def _build_current_user_claim_conditions( self, *, user_id: str | None, context_json: dict[str, Any], ) -> tuple[list[Any], str]: normalized_user_id = str(user_id or "").strip() employee = None if normalized_user_id: employee = self.db.scalar( select(Employee) .where(func.lower(Employee.email) == normalized_user_id.lower()) .limit(1) ) conditions: list[Any] = [] seen: set[tuple[str, str]] = set() def add_condition(field_name: str, value: str | None) -> None: normalized = str(value or "").strip() if not normalized: return marker = (field_name, normalized.lower()) if marker in seen: return seen.add(marker) if field_name == "employee_id": conditions.append(ExpenseClaim.employee_id == normalized) return conditions.append(ExpenseClaim.employee_name == normalized) if employee is not None: add_condition("employee_id", employee.id) add_condition("employee_name", employee.email) if self._employee_name_is_unique(employee): add_condition("employee_name", employee.name) else: add_condition("employee_id", normalized_user_id) add_condition("employee_name", normalized_user_id) subject_name = (employee.name if employee is not None else "") or normalized_user_id if subject_name: return conditions, "你的报销单" return conditions, "当前用户的报销单" def _employee_name_is_unique(self, employee: Employee) -> bool: normalized_name = str(employee.name or "").strip() if not normalized_name: return False same_name_count = int( self.db.scalar( select(func.count()).select_from(Employee).where(Employee.name == normalized_name) ) or 0 ) return same_name_count == 1 @staticmethod def _has_privileged_expense_query_access(context_json: dict[str, Any]) -> bool: role_codes = { str(item).strip().lower() for item in context_json.get("role_codes", []) if str(item).strip() } return bool(role_codes & PRIVILEGED_EXPENSE_QUERY_ROLE_CODES) @staticmethod def _is_self_expense_query(message: str) -> bool: compact_message = "".join(str(message or "").split()) return any(keyword in compact_message for keyword in SELF_REFERENCE_KEYWORDS) @staticmethod def _build_user_query_result( ontology: OntologyParseResult, response: dict[str, Any], ) -> dict[str, Any]: if ontology.scenario == "expense": return { "message": ( f"已路由到 User Agent,占位查询结果:命中 {response['record_count']} 笔报销," f"金额合计 {response['total_amount']} 元。" ), "data": response, } if ontology.scenario == "accounts_receivable": return { "message": ( f"已路由到 User Agent,占位查询结果:命中 {response['record_count']} 条应收," f"未回款金额 {response['outstanding_amount']} 元。" ), "data": response, } return { "message": ( f"已路由到 User Agent,占位查询结果:命中 {response['record_count']} 条应付," f"待付金额 {response['outstanding_amount']} 元。" ), "data": response, } @staticmethod def _build_user_agent_result( response: UserAgentResponse, *, degraded: bool, ) -> dict[str, Any]: result = { "message": response.answer, "answer": response.answer, "citations": [item.model_dump() for item in response.citations], "suggested_actions": [item.model_dump() for item in response.suggested_actions], "risk_flags": response.risk_flags, "requires_confirmation": response.requires_confirmation, "degraded": degraded, } if response.query_payload is not None: result["query_payload"] = response.query_payload.model_dump() if response.draft_payload is not None: result["draft_payload"] = response.draft_payload.model_dump() if response.review_payload is not None: result["review_payload"] = response.review_payload.model_dump() return result def _build_knowledge_answer( self, *, message: str, ontology: OntologyParseResult, capabilities: dict[str, list[AgentAssetListItem | AgentAssetRead]], context_json: dict[str, Any], ) -> dict[str, Any]: del ontology, capabilities conversation_history = context_json.get("conversation_history") if not isinstance(conversation_history, list): conversation_history = None payload = self.knowledge_service.search_knowledge( message, conversation_history=conversation_history, limit=8, ) references = [str(item).strip() for item in list(payload.get("references") or []) if str(item).strip()] if references: payload["references"] = references return payload @staticmethod def _build_rule_answer(ontology: OntologyParseResult) -> dict[str, Any]: risk_text = ( "、".join(ontology.risk_flags) if ontology.risk_flags else "未识别到明确风险标签" ) return { "message": f"已完成占位规则检查,风险标签:{risk_text}。", "risk_flags": ontology.risk_flags, } @staticmethod def _build_mcp_answer( task_asset: AgentAssetRead | None, ontology: OntologyParseResult, ) -> dict[str, Any]: return { "message": ( f"已调用占位 MCP 快照,任务={task_asset.code if task_asset else 'none'}," f"scenario={ontology.scenario}。" ), "snapshot": "stubbed", } @staticmethod def _build_hermes_message( *, task_asset: AgentAssetRead | None, ontology: OntologyParseResult, rule_response: dict[str, Any], mcp_response: dict[str, Any], degraded: bool, ) -> str: task_code = task_asset.code if task_asset is not None else "task.unspecified" suffix = ",其中部分能力已降级。" if degraded else "。" return ( f"Hermes 占位执行完成:任务 {task_code}," f"场景 {ontology.scenario},规则结果={rule_response.get('message', '')}," f"MCP 结果={mcp_response.get('message', '')}{suffix}" ) @staticmethod def _database_tool_name(scenario: str) -> str: if scenario == "expense": return "database.expense_claims.lookup" if scenario == "accounts_receivable": return "database.accounts_receivable.lookup" return "database.accounts_payable.lookup" @staticmethod def _rule_tool_name( capabilities: dict[str, list[AgentAssetListItem | AgentAssetRead]], ) -> str: if capabilities["rules"]: return capabilities["rules"][0].code return "rule_engine.default_risk_check" @staticmethod def _mcp_tool_name( capabilities: dict[str, list[AgentAssetListItem | AgentAssetRead]], ) -> str: if capabilities["mcps"]: return capabilities["mcps"][0].code return "mcp.default_snapshot" @staticmethod def _build_ontology_json(ontology: OntologyParseResult) -> dict[str, Any]: return { "scenario": ontology.scenario, "intent": ontology.intent, "entities": [item.model_dump() for item in ontology.entities], "time_range": ontology.time_range.model_dump(), "metrics": [item.model_dump() for item in ontology.metrics], "constraints": [item.model_dump() for item in ontology.constraints], "risk_flags": ontology.risk_flags, "permission": ontology.permission.model_dump(), } @staticmethod def _normalize_response_status(status: str) -> str: if status == AgentRunStatus.FAILED.value: return "failed" if status == AgentRunStatus.BLOCKED.value: return "blocked" return "succeeded"