from __future__ import annotations import json from datetime import UTC, date, datetime from decimal import Decimal from sqlalchemy import select from sqlalchemy.orm import Session from app.core.agent_enums import ( AgentAssetContentType, AgentAssetDomain, AgentAssetStatus, AgentAssetType, AgentName, AgentPermissionLevel, AgentReviewStatus, AgentRunSource, AgentRunStatus, AgentToolType, ) from app.core.config import get_settings from app.core.logging import get_logger from app.db.base import Base from app.db.session import get_session_factory from app.models.agent_asset import AgentAsset, AgentAssetReview, AgentAssetVersion from app.models.agent_run import AgentRun, AgentToolCall, SemanticParseLog from app.models.audit_log import AuditLog from app.models.financial_record import ( AccountsPayableRecord, AccountsReceivableRecord, ExpenseClaim, ExpenseClaimItem, ) logger = get_logger("app.services.agent_foundation") def prepare_agent_foundation() -> None: settings = get_settings() if not settings.setup_completed: logger.info("Agent foundation bootstrap skipped because setup is incomplete") return session_factory = get_session_factory() with session_factory() as db: AgentFoundationService(db).ensure_foundation_ready() class AgentFoundationService: def __init__(self, db: Session) -> None: self.db = db def ensure_foundation_ready(self) -> None: try: Base.metadata.create_all(bind=self.db.get_bind()) self._seed_agent_assets() self._seed_financial_records() self._seed_runs_and_logs() self.db.commit() except Exception: self.db.rollback() logger.exception("Failed to prepare agent foundation") raise def _seed_agent_assets(self) -> None: existing_codes = set(self.db.scalars(select(AgentAsset.code)).all()) if existing_codes: self._top_up_agent_assets(existing_codes) return approved_rule = AgentAsset( asset_type=AgentAssetType.RULE.value, code="rule.expense.duplicate_expense_check", name="重复报销识别规则", description="识别同一员工短时间内同金额、同地点、同理由的重复报销风险。", domain=AgentAssetDomain.EXPENSE.value, scenario_json=["expense", "risk_check", "duplicate_expense"], owner="财务共享中心", reviewer="张晓晴", status=AgentAssetStatus.ACTIVE.value, current_version="v1.1.0", config_json={"severity": "high", "enabled": True}, ) pending_rule = AgentAsset( asset_type=AgentAssetType.RULE.value, code="rule.expense.travel_receipt_requirements", name="差旅票据完整性规则", description="检查差旅报销是否附齐发票、行程单和住宿凭证。", domain=AgentAssetDomain.EXPENSE.value, scenario_json=["expense", "explain", "invoice_anomaly"], owner="费用运营组", reviewer="高嘉禾", status=AgentAssetStatus.REVIEW.value, current_version="v1.0.0", config_json={"severity": "medium", "enabled": False}, ) rejected_rule = AgentAsset( asset_type=AgentAssetType.RULE.value, code="rule.ap.payment_dual_review", name="付款双人复核规则", description="大额付款必须由两名财务人员复核后再进入付款建议。", domain=AgentAssetDomain.AP.value, scenario_json=["accounts_payable", "approval_required"], owner="付款管理组", reviewer="孙楠", status=AgentAssetStatus.DRAFT.value, current_version="v0.9.0", config_json={"amount_threshold": 50000}, ) skill_expense_asset = AgentAsset( asset_type=AgentAssetType.SKILL.value, code="skill.expense.summary_lookup", name="报销汇总查询技能", description="根据时间、员工和部门汇总报销金额与单据数量。", domain=AgentAssetDomain.EXPENSE.value, scenario_json=["expense", "query", "summary"], owner="平台研发组", reviewer="陈硕", status=AgentAssetStatus.ACTIVE.value, current_version="v1.0.0", config_json={"input_schema": ["time_range", "employee", "department"]}, ) skill_ar_asset = AgentAsset( asset_type=AgentAssetType.SKILL.value, code="skill.ar.aging_summary", name="应收账龄汇总技能", description="按客户、账龄和逾期状态汇总应收风险分布。", domain=AgentAssetDomain.AR.value, scenario_json=["accounts_receivable", "query", "aging_summary"], owner="平台研发组", reviewer="陈硕", status=AgentAssetStatus.ACTIVE.value, current_version="v1.0.0", config_json={"input_schema": ["customer", "aging_bucket", "status"]}, ) invoice_mcp_asset = AgentAsset( asset_type=AgentAssetType.MCP.value, code="mcp.invoice.verify_mock", name="发票验真 Mock 服务", description="模拟发票验真、发票状态查询和异常降级说明。", domain=AgentAssetDomain.SYSTEM.value, scenario_json=["expense", "invoice_validation"], owner="平台研发组", reviewer="周悦宁", status=AgentAssetStatus.ACTIVE.value, current_version="v1.0.0", config_json={"endpoint": "mock://invoice/verify", "timeout_ms": 1200}, ) ledger_mcp_asset = AgentAsset( asset_type=AgentAssetType.MCP.value, code="mcp.ledger.snapshot_mock", name="总账快照 Mock 服务", description="模拟返回应收、应付和费用汇总快照,供 Agent 查询和巡检。", domain=AgentAssetDomain.SYSTEM.value, scenario_json=["expense", "accounts_receivable", "accounts_payable"], owner="平台研发组", reviewer="周悦宁", status=AgentAssetStatus.ACTIVE.value, current_version="v1.0.0", config_json={"endpoint": "mock://ledger/snapshot", "timeout_ms": 1500}, ) task_asset = AgentAsset( asset_type=AgentAssetType.TASK.value, code="task.hermes.daily_risk_scan", name="Hermes 每日风险巡检", description="每天早上巡检重复报销、金额超标、逾期应收和异常付款。", domain=AgentAssetDomain.SYSTEM.value, scenario_json=["schedule", "risk_check"], owner="风控与审计部", reviewer="顾承宇", status=AgentAssetStatus.ACTIVE.value, current_version="v1.0.0", config_json={"cron": "0 9 * * *", "agent": AgentName.HERMES.value}, ) ar_summary_task = AgentAsset( asset_type=AgentAssetType.TASK.value, code="task.hermes.weekly_ar_summary", name="Hermes 每周应收账龄汇总", description="每周汇总逾期应收、账龄分布和客户风险变化。", domain=AgentAssetDomain.SYSTEM.value, scenario_json=["schedule", "accounts_receivable", "summary"], owner="风控与审计部", reviewer="顾承宇", status=AgentAssetStatus.ACTIVE.value, current_version="v1.0.0", config_json={"cron": "0 10 * * 1", "agent": AgentName.HERMES.value}, ) rule_digest_task = AgentAsset( asset_type=AgentAssetType.TASK.value, code="task.hermes.rule_review_digest", name="Hermes 规则待审摘要", description="每天汇总待审规则、待补样例和被拒规则修订建议。", domain=AgentAssetDomain.SYSTEM.value, scenario_json=["schedule", "rule_center", "review_digest"], owner="风控与审计部", reviewer="顾承宇", status=AgentAssetStatus.ACTIVE.value, current_version="v1.0.0", config_json={"cron": "0 18 * * *", "agent": AgentName.HERMES.value}, ) self.db.add_all( [ approved_rule, pending_rule, rejected_rule, skill_expense_asset, skill_ar_asset, invoice_mcp_asset, ledger_mcp_asset, task_asset, ar_summary_task, rule_digest_task, ] ) self.db.flush() self.db.add_all( [ AgentAssetVersion( asset=approved_rule, version="v1.0.0", content=self._markdown_content( "# 重复报销识别规则\n\n" "- 检查员工、金额、地点、发生日期是否高度重复。\n" "- 命中后输出 `duplicate_expense` 风险标签。" ), content_type=AgentAssetContentType.MARKDOWN.value, change_note="初始化生产规则版本。", created_by="系统初始化", ), AgentAssetVersion( asset=approved_rule, version="v1.1.0", content=self._markdown_content( "# 重复报销识别规则\n\n" "- 检查员工、金额、地点、发生日期是否高度重复。\n" "- 新增对同项目、同金额、跨单重复提交的识别。\n" "- 命中后输出 `duplicate_expense` 风险标签。" ), content_type=AgentAssetContentType.MARKDOWN.value, change_note="补充跨单重复提交判断。", created_by="系统初始化", ), AgentAssetVersion( asset=pending_rule, version="v0.9.0", content=self._markdown_content( "# 差旅票据完整性规则\n\n" "- 差旅报销必须具备发票、行程单、住宿凭证。\n" "- 缺失时输出 `invoice_anomaly`。" ), content_type=AgentAssetContentType.MARKDOWN.value, change_note="首版草稿。", created_by="高嘉禾", ), AgentAssetVersion( asset=pending_rule, version="v1.0.0", content=self._markdown_content( "# 差旅票据完整性规则\n\n" "- 差旅报销必须具备发票、行程单、住宿凭证。\n" "- 新增高铁改签和住宿分拆票据的补件说明。\n" "- 缺失时输出 `invoice_anomaly`。" ), content_type=AgentAssetContentType.MARKDOWN.value, change_note="补充差旅特殊票据口径,待审核。", created_by="高嘉禾", ), AgentAssetVersion( asset=rejected_rule, version="v0.8.0", content=self._markdown_content( "# 付款双人复核规则\n\n" "- 单笔付款超过阈值时必须双人复核。\n" "- 本版本规则口径过宽,待修订。" ), content_type=AgentAssetContentType.MARKDOWN.value, change_note="首版方案。", created_by="孙楠", ), AgentAssetVersion( asset=rejected_rule, version="v0.9.0", content=self._markdown_content( "# 付款双人复核规则\n\n" "- 单笔付款超过阈值时必须双人复核。\n" "- 新增跨币种付款也进入复核队列。\n" "- 当前阈值定义仍不清晰,需继续修订。" ), content_type=AgentAssetContentType.MARKDOWN.value, change_note="补充跨币种场景,但阈值仍待明确。", created_by="孙楠", ), AgentAssetVersion( asset=skill_expense_asset, version="v1.0.0", content=self._json_content( { "inputs": ["time_range", "employee", "department"], "outputs": ["total_amount", "claim_count"], "dependencies": ["database.expense_claims"], } ), content_type=AgentAssetContentType.JSON.value, change_note="初始化技能快照。", created_by="系统初始化", ), AgentAssetVersion( asset=skill_ar_asset, version="v1.0.0", content=self._json_content( { "inputs": ["customer", "aging_bucket", "status"], "outputs": ["receivable_total", "overdue_total", "customer_count"], "dependencies": ["database.accounts_receivable"], } ), content_type=AgentAssetContentType.JSON.value, change_note="初始化应收账龄技能快照。", created_by="系统初始化", ), AgentAssetVersion( asset=invoice_mcp_asset, version="v1.0.0", content=self._json_content( { "service_type": "mock", "auth_mode": "none", "degrade_strategy": "return_stub_with_warning", } ), content_type=AgentAssetContentType.JSON.value, change_note="初始化 MCP 快照。", created_by="系统初始化", ), AgentAssetVersion( asset=ledger_mcp_asset, version="v1.0.0", content=self._json_content( { "service_type": "mock", "auth_mode": "service_account", "degrade_strategy": "return_cached_snapshot_with_warning", } ), content_type=AgentAssetContentType.JSON.value, change_note="初始化总账快照 MCP。", created_by="系统初始化", ), AgentAssetVersion( asset=task_asset, version="v1.0.0", content=self._json_content( { "task_type": "daily_risk_scan", "schedule": "0 9 * * *", "target_agent": AgentName.HERMES.value, } ), content_type=AgentAssetContentType.JSON.value, change_note="初始化任务快照。", created_by="系统初始化", ), AgentAssetVersion( asset=ar_summary_task, version="v1.0.0", content=self._json_content( { "task_type": "weekly_ar_summary", "schedule": "0 10 * * 1", "target_agent": AgentName.HERMES.value, } ), content_type=AgentAssetContentType.JSON.value, change_note="初始化应收账龄汇总任务。", created_by="系统初始化", ), AgentAssetVersion( asset=rule_digest_task, version="v1.0.0", content=self._json_content( { "task_type": "rule_review_digest", "schedule": "0 18 * * *", "target_agent": AgentName.HERMES.value, } ), content_type=AgentAssetContentType.JSON.value, change_note="初始化规则待审摘要任务。", created_by="系统初始化", ), ] ) self.db.add_all( [ AgentAssetReview( asset=approved_rule, version="v1.1.0", reviewer="张晓晴", review_status=AgentReviewStatus.APPROVED.value, review_note="规则口径清晰,可上线。", reviewed_at=datetime.now(UTC), ), AgentAssetReview( asset=pending_rule, version="v1.0.0", reviewer="高嘉禾", review_status=AgentReviewStatus.PENDING.value, review_note="等待补充票据异常样例。", reviewed_at=None, ), AgentAssetReview( asset=rejected_rule, version="v0.9.0", reviewer="孙楠", review_status=AgentReviewStatus.REJECTED.value, review_note="阈值定义不清,暂不通过。", reviewed_at=datetime.now(UTC), ), ] ) def _seed_financial_records(self) -> None: if self.db.scalar(select(ExpenseClaim.id).limit(1)) is not None: return claim_1 = ExpenseClaim( claim_no="EXP-202605-001", employee_name="张三", department_name="财务共享中心", project_code="PRJ-EXP-01", expense_type="travel", reason="华南客户拜访差旅报销", location="深圳", amount=Decimal("3280.00"), currency="CNY", invoice_count=3, occurred_at=datetime(2026, 5, 6, 9, 0, tzinfo=UTC), submitted_at=datetime(2026, 5, 7, 10, 20, tzinfo=UTC), status="submitted", approval_stage="finance_review", risk_flags_json=["amount_over_limit"], ) claim_1.items = [ ExpenseClaimItem( item_date=date(2026, 5, 5), item_type="hotel", item_reason="客户拜访住宿", item_location="深圳", item_amount=Decimal("1880.00"), invoice_id="INV-HOTEL-001", ), ExpenseClaimItem( item_date=date(2026, 5, 6), item_type="transport", item_reason="往返交通", item_location="深圳", item_amount=Decimal("1400.00"), invoice_id="INV-TRANS-009", ), ] claim_2 = ExpenseClaim( claim_no="EXP-202605-002", employee_name="李四", department_name="华东销售部", project_code="PRJ-SALES-02", expense_type="meal", reason="客户路演餐费", location="上海", amount=Decimal("860.00"), currency="CNY", invoice_count=1, occurred_at=datetime(2026, 5, 8, 12, 0, tzinfo=UTC), submitted_at=datetime(2026, 5, 8, 18, 30, tzinfo=UTC), status="approved", approval_stage="completed", risk_flags_json=[], ) claim_3 = ExpenseClaim( claim_no="EXP-202605-003", employee_name="王五", department_name="市场品牌部", project_code="PRJ-MKT-08", expense_type="travel", reason="市场活动会务差旅", location="北京", amount=Decimal("3280.00"), currency="CNY", invoice_count=2, occurred_at=datetime(2026, 5, 6, 11, 30, tzinfo=UTC), submitted_at=datetime(2026, 5, 8, 9, 10, tzinfo=UTC), status="review", approval_stage="risk_check", risk_flags_json=["duplicate_expense"], ) ar_records = [ AccountsReceivableRecord( receivable_no="AR-202605-001", customer_id="CUS-A", customer_name="客户A", contract_no="CTR-AR-1001", invoice_no="INV-AR-9001", amount_receivable=Decimal("120000.00"), amount_received=Decimal("70000.00"), amount_outstanding=Decimal("50000.00"), currency="CNY", posting_date=date(2026, 4, 1), due_date=date(2026, 4, 30), aging_days=11, status="partial", risk_flags_json=[], ), AccountsReceivableRecord( receivable_no="AR-202605-002", customer_id="CUS-B", customer_name="客户B", contract_no="CTR-AR-1002", invoice_no="INV-AR-9002", amount_receivable=Decimal("88000.00"), amount_received=Decimal("10000.00"), amount_outstanding=Decimal("78000.00"), currency="CNY", posting_date=date(2026, 3, 15), due_date=date(2026, 4, 15), aging_days=26, status="overdue", risk_flags_json=["ar_overdue"], ), ] ap_records = [ AccountsPayableRecord( payable_no="AP-202605-001", vendor_id="VEN-A", vendor_name="供应商A", invoice_no="INV-AP-5001", amount_payable=Decimal("43000.00"), amount_paid=Decimal("10000.00"), amount_outstanding=Decimal("33000.00"), currency="CNY", posting_date=date(2026, 4, 20), due_date=date(2026, 5, 12), aging_days=0, status="scheduled", risk_flags_json=[], ), AccountsPayableRecord( payable_no="AP-202605-002", vendor_id="VEN-B", vendor_name="供应商B", invoice_no="INV-AP-5002", amount_payable=Decimal("96000.00"), amount_paid=Decimal("0.00"), amount_outstanding=Decimal("96000.00"), currency="CNY", posting_date=date(2026, 4, 10), due_date=date(2026, 5, 5), aging_days=6, status="overdue", risk_flags_json=["ap_overdue"], ), ] self.db.add_all([claim_1, claim_2, claim_3, *ar_records, *ap_records]) def _seed_runs_and_logs(self) -> None: if self.db.scalar(select(AgentRun.id).limit(1)) is not None: return task_asset = self.db.scalar( select(AgentAsset).where(AgentAsset.code == "task.hermes.daily_risk_scan") ) user_run = AgentRun( run_id="run_user_20260511_001", agent=AgentName.USER_AGENT.value, source=AgentRunSource.USER_MESSAGE.value, user_id="emp_001", task_id=None, ontology_json={"scenario": "expense", "intent": "query"}, route_json={"selected_agent": AgentName.USER_AGENT.value, "route_reason": "user query"}, permission_level=AgentPermissionLevel.READ.value, status=AgentRunStatus.SUCCEEDED.value, result_summary="已返回本周报销金额和风险摘要。", started_at=datetime(2026, 5, 11, 8, 35, tzinfo=UTC), finished_at=datetime(2026, 5, 11, 8, 35, 2, tzinfo=UTC), ) hermes_run = AgentRun( run_id="run_hermes_20260511_001", agent=AgentName.HERMES.value, source=AgentRunSource.SCHEDULE.value, user_id=None, task_id=task_asset.id if task_asset else None, ontology_json={"scenario": "expense", "intent": "risk_check"}, route_json={ "selected_agent": AgentName.HERMES.value, "route_reason": "scheduled risk scan", }, permission_level=AgentPermissionLevel.READ.value, status=AgentRunStatus.SUCCEEDED.value, result_summary="Hermes 已生成今日风险巡检摘要。", started_at=datetime(2026, 5, 11, 9, 0, tzinfo=UTC), finished_at=datetime(2026, 5, 11, 9, 0, 4, tzinfo=UTC), ) blocked_run = AgentRun( run_id="run_user_20260511_002", agent=AgentName.ORCHESTRATOR.value, source=AgentRunSource.USER_MESSAGE.value, user_id="emp_002", task_id=None, ontology_json={"scenario": "accounts_payable", "intent": "operate"}, route_json={ "selected_agent": AgentName.USER_AGENT.value, "route_reason": "payment request", }, permission_level=AgentPermissionLevel.APPROVAL_REQUIRED.value, status=AgentRunStatus.BLOCKED.value, result_summary="动作需要人工确认。", error_message="直接付款属于高风险动作,已阻断自动执行。", started_at=datetime(2026, 5, 11, 10, 5, tzinfo=UTC), finished_at=datetime(2026, 5, 11, 10, 5, 1, tzinfo=UTC), ) self.db.add_all([user_run, hermes_run, blocked_run]) self.db.flush() self.db.add_all( [ AgentToolCall( run_id=user_run.run_id, tool_type=AgentToolType.DATABASE.value, tool_name="expense_claims.lookup", request_json={"time_range": "this_week", "employee": "all"}, response_json={"claim_count": 3, "total_amount": "7420.00"}, status="succeeded", duration_ms=48, ), AgentToolCall( run_id=hermes_run.run_id, tool_type=AgentToolType.MCP.value, tool_name="invoice.verify_mock", request_json={"claim_no": "EXP-202605-003"}, response_json={ "warning": "external service degraded", "fallback": "used mock response", }, status="failed", duration_ms=132, error_message="mock upstream timeout", ), AgentToolCall( run_id=blocked_run.run_id, tool_type=AgentToolType.RULE_ENGINE.value, tool_name="permission.guard", request_json={"action": "direct_payment"}, response_json={"requires_confirmation": True}, status="succeeded", duration_ms=5, ), SemanticParseLog( run_id=user_run.run_id, user_id="emp_001", raw_query="查一下本周报销超标风险", scenario="expense", intent="risk_check", entities_json=[], time_range_json={"start_date": "2026-05-11", "end_date": "2026-05-17"}, metrics_json=["amount"], constraints_json=[], risk_flags_json=["amount_over_limit"], permission_json={"level": AgentPermissionLevel.READ.value}, confidence=0.93, ), SemanticParseLog( run_id=blocked_run.run_id, user_id="emp_002", raw_query="帮我直接付款给供应商B", scenario="accounts_payable", intent="operate", entities_json=[{"type": "vendor", "value": "供应商B"}], time_range_json={}, metrics_json=["amount"], constraints_json=[], risk_flags_json=["ap_overdue"], permission_json={"level": AgentPermissionLevel.APPROVAL_REQUIRED.value}, confidence=0.96, ), ] ) if self.db.scalar(select(AuditLog.id).limit(1)) is None: self.db.add_all( [ AuditLog( actor="系统初始化", action="save_rule_markdown", resource_type="rule", resource_id="rule.expense.duplicate_expense_check", before_json=None, after_json={"version": "v1.0.0"}, request_id="seed-audit-001", ), AuditLog( actor="张晓晴", action="review_rule", resource_type="rule", resource_id="rule.expense.duplicate_expense_check", before_json={"review_status": "pending"}, after_json={"review_status": "approved"}, request_id="seed-audit-002", ), AuditLog( actor="系统初始化", action="activate_rule", resource_type="rule", resource_id="rule.expense.duplicate_expense_check", before_json={"status": "review"}, after_json={"status": "active"}, request_id="seed-audit-003", ), AuditLog( actor="Hermes", action="update_task_status", resource_type="task", resource_id="task.hermes.daily_risk_scan", before_json={"status": "idle"}, after_json={"status": "succeeded"}, request_id="seed-audit-004", ), ] ) def _top_up_agent_assets(self, existing_codes: set[str]) -> None: approved_rule = self.db.scalar( select(AgentAsset).where(AgentAsset.code == "rule.expense.duplicate_expense_check") ) pending_rule = self.db.scalar( select(AgentAsset).where(AgentAsset.code == "rule.expense.travel_receipt_requirements") ) rejected_rule = self.db.scalar( select(AgentAsset).where(AgentAsset.code == "rule.ap.payment_dual_review") ) if approved_rule is not None: self._ensure_asset_version( approved_rule, version="v1.1.0", content=self._markdown_content( "# 重复报销识别规则\n\n" "- 检查员工、金额、地点、发生日期是否高度重复。\n" "- 新增对同项目、同金额、跨单重复提交的识别。\n" "- 命中后输出 `duplicate_expense` 风险标签。" ), content_type=AgentAssetContentType.MARKDOWN.value, change_note="补充跨单重复提交判断。", created_by="系统初始化", ) if pending_rule is not None: self._ensure_asset_version( pending_rule, version="v1.0.0", content=self._markdown_content( "# 差旅票据完整性规则\n\n" "- 差旅报销必须具备发票、行程单、住宿凭证。\n" "- 新增高铁改签和住宿分拆票据的补件说明。\n" "- 缺失时输出 `invoice_anomaly`。" ), content_type=AgentAssetContentType.MARKDOWN.value, change_note="补充差旅特殊票据口径,待审核。", created_by="高嘉禾", ) if rejected_rule is not None: self._ensure_asset_version( rejected_rule, version="v0.9.0", content=self._markdown_content( "# 付款双人复核规则\n\n" "- 单笔付款超过阈值时必须双人复核。\n" "- 新增跨币种付款也进入复核队列。\n" "- 当前阈值定义仍不清晰,需继续修订。" ), content_type=AgentAssetContentType.MARKDOWN.value, change_note="补充跨币种场景,但阈值仍待明确。", created_by="孙楠", ) if "skill.ar.aging_summary" not in existing_codes: asset = self._create_seed_asset( asset_type=AgentAssetType.SKILL.value, code="skill.ar.aging_summary", name="应收账龄汇总技能", description="按客户、账龄和逾期状态汇总应收风险分布。", domain=AgentAssetDomain.AR.value, scenario_json=["accounts_receivable", "query", "aging_summary"], owner="平台研发组", reviewer="陈硕", status=AgentAssetStatus.ACTIVE.value, current_version="v1.0.0", config_json={"input_schema": ["customer", "aging_bucket", "status"]}, ) self._ensure_asset_version( asset, version="v1.0.0", content=self._json_content( { "inputs": ["customer", "aging_bucket", "status"], "outputs": ["receivable_total", "overdue_total", "customer_count"], "dependencies": ["database.accounts_receivable"], } ), content_type=AgentAssetContentType.JSON.value, change_note="初始化应收账龄技能快照。", created_by="系统初始化", ) if "mcp.ledger.snapshot_mock" not in existing_codes: asset = self._create_seed_asset( asset_type=AgentAssetType.MCP.value, code="mcp.ledger.snapshot_mock", name="总账快照 Mock 服务", description="模拟返回应收、应付和费用汇总快照,供 Agent 查询和巡检。", domain=AgentAssetDomain.SYSTEM.value, scenario_json=["expense", "accounts_receivable", "accounts_payable"], owner="平台研发组", reviewer="周悦宁", status=AgentAssetStatus.ACTIVE.value, current_version="v1.0.0", config_json={"endpoint": "mock://ledger/snapshot", "timeout_ms": 1500}, ) self._ensure_asset_version( asset, version="v1.0.0", content=self._json_content( { "service_type": "mock", "auth_mode": "service_account", "degrade_strategy": "return_cached_snapshot_with_warning", } ), content_type=AgentAssetContentType.JSON.value, change_note="初始化总账快照 MCP。", created_by="系统初始化", ) if "task.hermes.weekly_ar_summary" not in existing_codes: asset = self._create_seed_asset( asset_type=AgentAssetType.TASK.value, code="task.hermes.weekly_ar_summary", name="Hermes 每周应收账龄汇总", description="每周汇总逾期应收、账龄分布和客户风险变化。", domain=AgentAssetDomain.SYSTEM.value, scenario_json=["schedule", "accounts_receivable", "summary"], owner="风控与审计部", reviewer="顾承宇", status=AgentAssetStatus.ACTIVE.value, current_version="v1.0.0", config_json={"cron": "0 10 * * 1", "agent": AgentName.HERMES.value}, ) self._ensure_asset_version( asset, version="v1.0.0", content=self._json_content( { "task_type": "weekly_ar_summary", "schedule": "0 10 * * 1", "target_agent": AgentName.HERMES.value, } ), content_type=AgentAssetContentType.JSON.value, change_note="初始化应收账龄汇总任务。", created_by="系统初始化", ) if "task.hermes.rule_review_digest" not in existing_codes: asset = self._create_seed_asset( asset_type=AgentAssetType.TASK.value, code="task.hermes.rule_review_digest", name="Hermes 规则待审摘要", description="每天汇总待审规则、待补样例和被拒规则修订建议。", domain=AgentAssetDomain.SYSTEM.value, scenario_json=["schedule", "rule_center", "review_digest"], owner="风控与审计部", reviewer="顾承宇", status=AgentAssetStatus.ACTIVE.value, current_version="v1.0.0", config_json={"cron": "0 18 * * *", "agent": AgentName.HERMES.value}, ) self._ensure_asset_version( asset, version="v1.0.0", content=self._json_content( { "task_type": "rule_review_digest", "schedule": "0 18 * * *", "target_agent": AgentName.HERMES.value, } ), content_type=AgentAssetContentType.JSON.value, change_note="初始化规则待审摘要任务。", created_by="系统初始化", ) def _create_seed_asset( self, *, asset_type: str, code: str, name: str, description: str, domain: str, scenario_json: list[str], owner: str, reviewer: str, status: str, current_version: str, config_json: dict[str, object], ) -> AgentAsset: asset = AgentAsset( asset_type=asset_type, code=code, name=name, description=description, domain=domain, scenario_json=scenario_json, owner=owner, reviewer=reviewer, status=status, current_version=current_version, config_json=config_json, ) self.db.add(asset) self.db.flush() return asset def _ensure_asset_version( self, asset: AgentAsset, *, version: str, content: str, content_type: str, change_note: str, created_by: str, ) -> None: existing = self.db.scalar( select(AgentAssetVersion).where( AgentAssetVersion.asset_id == asset.id, AgentAssetVersion.version == version, ) ) if existing is not None: return self.db.add( AgentAssetVersion( asset_id=asset.id, version=version, content=content, content_type=content_type, change_note=change_note, created_by=created_by, ) ) @staticmethod def _markdown_content(content: str) -> str: return content @staticmethod def _json_content(content: dict[str, object]) -> str: return json.dumps(content, ensure_ascii=False, sort_keys=True, indent=2)