WIN-JHFT4D3SIVT\caoxiaozhu 7141e1d11a feat: refactor monolithic App.vue into modular Vue component architecture
- Extract 711-line App.vue into 15+ focused files across 5 directories
- Add data layer (icons, metrics, policies, auditTrail, requests)
- Add composables (useNavigation, useRequests, useChat, useToast)
- Add layout components (SidebarRail, TopBar, FilterBar)
- Add shared components (PanelHead, InfoRow, ToastNotification)
- Add business component (RequestTable) and 5 view components
- Extract global CSS to assets/styles/global.css
- Add start.sh with WSL/Windows cross-platform support
- Add .gitignore for node_modules, dist, and IDE dirs
2026-04-28 17:20:52 +08:00

Phase 3: Agent Orchestration (W3-W4)

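Goal: Implement the Agent Orchestrator state machine, 5 business Agents, the LLM integration layer and audit logging, which together deliver the core intelligent-processing capability.
Duration: Weeks 3-4
Number of tasks: 4
Parallelizable: Tasks 3.3 / 3.4 can run in parallel with Task 3.2
Prerequisite: Phase 2 complete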


Deliverables for this phase

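Deliverable | Description
Orchestrator state machine | Task status transitions + Agent scheduling
5 Agents | Intake / parsing / rule check / explanation & supplement requests / sync
LLM integration layer | Multi-provider support + prompt templates
Audit log | Trail of every key operation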

Task list

Task 3.1: Agent Orchestrator State Machine

Owner: Full-stack / Agent engineer
Estimated effort: 2 days
Prerequisite: Phase 2 (all services ready)

Files:

  • Create: backend/app/agents/__init__.py

  • Create: backend/app/agents/state.py

  • Create: backend/app/agents/base_agent.py

  • Create: backend/app/agents/orchestrator.py

  • Create: backend/app/api/v1/agent.py

  • Modify: backend/app/api/v1/router.py

  • Test: backend/tests/test_orchestrator.py

  • Step 1: Define the Agent state and context

backend/app/agents/state.py:

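from dataclasses import dataclass, field
from app.models.enums import TaskStatus

@dataclass
class AgentContext:
    """Mutable state shared by all agents during one orchestration run."""
    task_id: str
    status: TaskStatus                         # current task status
    user_intent: str | None = None             # the user's free-text request
    current_agent: str | None = None           # name of the agent currently running
    task_info: dict | None = None              # IntakeAgent output (type, trip details)
    ocr_results: list[dict] = field(default_factory=list)
    reimbursement_data: dict | None = None
    precheck_result: dict | None = None
    supplement_requests: list[dict] = field(default_factory=list)
    sync_result: dict | None = None            # SyncAgent output
    error_message: str | None = None
    retry_count: int = 0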
  • Step 2: Define the Agent base class and result

backend/app/agents/base_agent.py:

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.state import AgentContext

@dataclass
class AgentResult:
    success: bool
    data: dict = field(default_factory=dict)
    next_action: str = "continue"  # continue / wait_user / need_supplement / retry
    error: str | None = None

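class BaseAgent(ABC):
    name: str = ""  # registry key used by the Orchestrator, e.g. "intake_agent"

    @abstractmethod
    async def execute(self, context: AgentContext, db: AsyncSession) -> AgentResult:
        """Run this agent's step: read inputs from context, return outputs in AgentResult.data."""
        ...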
  • Step 3: Implement the Orchestrator state machine

backend/app/agents/orchestrator.py: core orchestration logic.

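State transition diagram (corresponds to Section 4.2 of the development document):

CREATED → MATERIAL_COLLECTING → PARSING → DRAFT_GENERATED → PRECHECKING
PRECHECKING → PENDING_USER_CONFIRM → SUBMITTING → SYNCED       (precheck passed)
PRECHECKING → NEED_SUPPLEMENT → MATERIAL_COLLECTING            (materials missing: loop back)
SUBMITTING → SYNC_FAILED → SUBMITTING                          (retry)
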
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.state import AgentContext
from app.agents.base_agent import BaseAgent, AgentResult
from app.models.enums import TaskStatus
from app.services.task_service import TaskService

# Agents run for each entry point (start_from)
AGENT_SEQUENCES: dict[str, list[str]] = {
    "intake": ["intake_agent", "parse_agent", "rule_check_agent", "explain_agent"],
    "precheck": ["rule_check_agent", "explain_agent"],
    "submit": ["sync_agent"],
}

# Status written when each agent starts
AGENT_START_STATUS: dict[str, TaskStatus] = {
    "intake_agent": TaskStatus.MATERIAL_COLLECTING,
    "parse_agent": TaskStatus.PARSING,
    "rule_check_agent": TaskStatus.PRECHECKING,
    "explain_agent": TaskStatus.PRECHECKING,
    "sync_agent": TaskStatus.SUBMITTING,
}

# Status written when a sequence finishes without interruption
SEQUENCE_DONE_STATUS: dict[str, TaskStatus] = {
    "intake": TaskStatus.PENDING_USER_CONFIRM,   # precheck passed, user confirms next
    "precheck": TaskStatus.PENDING_USER_CONFIRM,
    "submit": TaskStatus.SYNCED,
}

# AgentResult.data keys that are copied onto the context
MERGEABLE_KEYS = ("task_info", "ocr_results", "reimbursement_data",
                  "precheck_result", "supplement_requests", "sync_result")


class Orchestrator:
    def __init__(self, db: AsyncSession):
        self.db = db
        self.task_svc = TaskService(db)
        self.agents: dict[str, BaseAgent] = {}

    def register_agent(self, agent: BaseAgent):
        self.agents[agent.name] = agent

    async def run(self, task_id: str, start_from: str = "intake") -> AgentContext:
        """Run the agent sequence for start_from and return the final context."""
        task = await self.task_svc.get_task(task_id)
        if not task:
            raise ValueError(f"Task {task_id} not found")
        if start_from not in AGENT_SEQUENCES:
            start_from = "intake"  # unknown entry points fall back to the full flow

        context = AgentContext(task_id=task_id, status=task.status, user_intent=task.user_intent)

        for agent_name in AGENT_SEQUENCES[start_from]:
            agent = self.agents.get(agent_name)
            if agent is None:
                continue  # not registered yet (e.g. before Task 3.2 lands)

            result = await self._run_agent(agent, context)

            if not result.success:
                context.error_message = result.error
                if agent_name == "sync_agent":
                    await self._set_status(context, TaskStatus.SYNC_FAILED, agent_name)
                return context

            if agent_name == "parse_agent":
                # Shadow reimbursement created: the draft now exists
                await self._set_status(context, TaskStatus.DRAFT_GENERATED, agent_name)

            if result.next_action == "wait_user":
                await self._set_status(context, TaskStatus.PENDING_USER_CONFIRM, agent_name)
                return context

            if result.next_action == "need_supplement":
                await self._set_status(context, TaskStatus.NEED_SUPPLEMENT, agent_name)
                return context

        if context.current_agent is not None:  # at least one agent actually ran
            await self._set_status(context, SEQUENCE_DONE_STATUS[start_from], context.current_agent)
        return context

    async def _run_agent(self, agent: BaseAgent, context: AgentContext) -> AgentResult:
        """Write the agent's start status, execute it, and merge a successful result."""
        context.current_agent = agent.name
        status = AGENT_START_STATUS.get(agent.name, TaskStatus.PRECHECKING)
        await self._set_status(context, status, agent.name)
        result = await agent.execute(context, self.db)
        if result.success:
            self._merge_result(context, result)
        return result

    async def _set_status(self, context: AgentContext, status: TaskStatus, agent_name: str) -> None:
        """Persist the status and mirror it on the context so callers see the latest value."""
        await self.task_svc.update_status(context.task_id, status, agent_name)
        context.status = status

    @staticmethod
    def _merge_result(context: AgentContext, result: AgentResult) -> None:
        """Copy the known keys of result.data onto the context."""
        for key in MERGEABLE_KEYS:
            if key in result.data:
                setattr(context, key, result.data[key])
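The transition diagram can also be kept as data, so that an illegal status write fails fast instead of silently corrupting a task. This is a minimal sketch that assumes `TaskStatus` has exactly the members shown in the diagram. The enum below is a local stand-in for `app.models.enums.TaskStatus`, and its string values are assumptions. `ALLOWED_TRANSITIONS`, `check_transition` and `InvalidTransition` are hypothetical names.

```python
from __future__ import annotations

from enum import Enum


class TaskStatus(str, Enum):
    """Local stand-in for app.models.enums.TaskStatus; the values are assumptions."""
    CREATED = "created"
    MATERIAL_COLLECTING = "material_collecting"
    PARSING = "parsing"
    DRAFT_GENERATED = "draft_generated"
    PRECHECKING = "prechecking"
    NEED_SUPPLEMENT = "need_supplement"
    PENDING_USER_CONFIRM = "pending_user_confirm"
    SUBMITTING = "submitting"
    SYNCED = "synced"
    SYNC_FAILED = "sync_failed"


S = TaskStatus

# One entry per arrow in the transition diagram
ALLOWED_TRANSITIONS: dict[TaskStatus, frozenset[TaskStatus]] = {
    S.CREATED: frozenset({S.MATERIAL_COLLECTING}),
    S.MATERIAL_COLLECTING: frozenset({S.PARSING}),
    S.PARSING: frozenset({S.DRAFT_GENERATED}),
    S.DRAFT_GENERATED: frozenset({S.PRECHECKING}),
    S.PRECHECKING: frozenset({S.NEED_SUPPLEMENT, S.PENDING_USER_CONFIRM}),
    S.NEED_SUPPLEMENT: frozenset({S.MATERIAL_COLLECTING}),
    S.PENDING_USER_CONFIRM: frozenset({S.SUBMITTING}),
    S.SUBMITTING: frozenset({S.SYNCED, S.SYNC_FAILED}),
    S.SYNC_FAILED: frozenset({S.SUBMITTING}),
    S.SYNCED: frozenset(),  # terminal state
}


class InvalidTransition(ValueError):
    pass


def check_transition(current: TaskStatus, new: TaskStatus) -> None:
    """Raise InvalidTransition unless current -> new is an arrow in the diagram.

    Re-entering the same status is allowed, because rule_check_agent and
    explain_agent both run under PRECHECKING.
    """
    if new == current:
        return
    if new not in ALLOWED_TRANSITIONS[current]:
        raise InvalidTransition(f"{current.name} -> {new.name} is not allowed")
```

A hypothetical `TaskService.update_status()` could call `check_transition(task.status, new_status)` before writing. The table encodes only the arrows in the diagram. Re-entry points such as `start_from="precheck"` on an already-confirmed task would need entries of their own.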
  • Step 4: Implement the Agent run API

backend/app/api/v1/agent.py:

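from typing import Literal

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession

from app.agents.orchestrator import Orchestrator
from app.core.database import get_db

router = APIRouter(prefix="/reimbursement/tasks/{task_id}/agent", tags=["agent"])


class AgentRunRequest(BaseModel):
    start_from: Literal["intake", "precheck", "submit"] = "intake"
    mode: str = "precheck"  # reserved: not read by Orchestrator.run() yet


class AgentRunResponse(BaseModel):
    task_id: str
    status: str
    current_agent: str | None
    error_message: str | None = None


def create_orchestrator(db: AsyncSession) -> Orchestrator:
    """Build an Orchestrator; the Task 3.2 agents are registered here as they land."""
    orchestrator = Orchestrator(db)
    # e.g. orchestrator.register_agent(IntakeAgent())
    return orchestrator


@router.post("/run", response_model=AgentRunResponse)
async def run_agent(task_id: str, req: AgentRunRequest, db: AsyncSession = Depends(get_db)):
    orchestrator = create_orchestrator(db)
    try:
        context = await orchestrator.run(task_id, start_from=req.start_from)
    except ValueError as exc:  # Orchestrator.run() raises ValueError for an unknown task
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    return AgentRunResponse(
        task_id=context.task_id,
        status=context.status.value,
        current_agent=context.current_agent,
        error_message=context.error_message,
    )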
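  • Step 5: Write state machine transition tests

Paths to cover:

  • Normal path: created → parsing → draft → precheck → passed → submitting → synced

  • Supplement path: precheck → needs supplement → waiting for the user

  • Retry path: submitting → sync failed → retry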
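The orchestrator touches the database only through `TaskService`, so these paths can be exercised without a DB by swapping in test doubles. Because the sketch below is self-contained, it does not import `Orchestrator`. Instead it drives a simplified loop (`run_sequence`, hypothetical) that follows the paths listed above. `ScriptedAgent`, `FakeTaskService` and `Result` (a trimmed stand-in for `AgentResult`) are also assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Result:
    """Trimmed stand-in for AgentResult."""
    success: bool = True
    next_action: str = "continue"


class ScriptedAgent:
    """Test double that returns pre-recorded results, one per call."""

    def __init__(self, name: str, *results: Result):
        self.name = name
        self._results = list(results)

    async def execute(self, context=None, db=None) -> Result:
        return self._results.pop(0)


@dataclass
class FakeTaskService:
    """Records status writes in memory instead of touching a database."""
    history: list = field(default_factory=list)

    async def update_status(self, status: str) -> None:
        self.history.append(status)


SEQUENCES = {
    "intake": ["intake_agent", "parse_agent", "rule_check_agent", "explain_agent"],
    "submit": ["sync_agent"],
}
START_STATUS = {
    "intake_agent": "MATERIAL_COLLECTING", "parse_agent": "PARSING",
    "rule_check_agent": "PRECHECKING", "explain_agent": "PRECHECKING",
    "sync_agent": "SUBMITTING",
}
DONE_STATUS = {"intake": "PENDING_USER_CONFIRM", "submit": "SYNCED"}


async def run_sequence(svc: FakeTaskService, agents: dict, start_from: str) -> None:
    """Simplified loop that follows the documented paths (hypothetical)."""
    for name in SEQUENCES[start_from]:
        await svc.update_status(START_STATUS[name])
        result = await agents[name].execute()
        if not result.success:
            if name == "sync_agent":
                await svc.update_status("SYNC_FAILED")
            return
        if name == "parse_agent":
            await svc.update_status("DRAFT_GENERATED")
        if result.next_action == "need_supplement":
            await svc.update_status("NEED_SUPPLEMENT")
            return
    await svc.update_status(DONE_STATUS[start_from])


def intake_agents(explain_action: str = "continue") -> dict:
    agents = {name: ScriptedAgent(name, Result()) for name in SEQUENCES["intake"]}
    agents["explain_agent"] = ScriptedAgent("explain_agent", Result(next_action=explain_action))
    return agents


def test_normal_path():
    svc = FakeTaskService()
    asyncio.run(run_sequence(svc, intake_agents(), "intake"))
    asyncio.run(run_sequence(svc, {"sync_agent": ScriptedAgent("sync_agent", Result())}, "submit"))
    assert svc.history == ["MATERIAL_COLLECTING", "PARSING", "DRAFT_GENERATED", "PRECHECKING",
                           "PRECHECKING", "PENDING_USER_CONFIRM", "SUBMITTING", "SYNCED"]


def test_supplement_path():
    svc = FakeTaskService()
    asyncio.run(run_sequence(svc, intake_agents("need_supplement"), "intake"))
    assert svc.history[-1] == "NEED_SUPPLEMENT"
    assert "PENDING_USER_CONFIRM" not in svc.history


def test_retry_path():
    svc = FakeTaskService()
    sync = {"sync_agent": ScriptedAgent("sync_agent", Result(success=False), Result())}
    asyncio.run(run_sequence(svc, sync, "submit"))  # first attempt fails
    asyncio.run(run_sequence(svc, sync, "submit"))  # retry succeeds
    assert svc.history == ["SUBMITTING", "SYNC_FAILED", "SUBMITTING", "SYNCED"]
```

In the real `test_orchestrator.py`, the same doubles would be registered on an `Orchestrator` whose `TaskService` has been replaced, for example with `unittest.mock.patch`.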

  • Step 6: Commit

git add backend/
git commit -m "feat: implement Agent Orchestrator state machine"

Task 3.2: Implement the 5 Agents

Owner: Full-stack / Agent engineer
Estimated effort: 3 days
Prerequisite: Task 3.1 (Orchestrator ready)

Files:

  • Create: backend/app/agents/intake_agent.py

  • Create: backend/app/agents/parse_agent.py

  • Create: backend/app/agents/rule_check_agent.py

  • Create: backend/app/agents/explain_agent.py

  • Create: backend/app/agents/sync_agent.py

  • Test: backend/tests/test_agents.py

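  • Step 1: Implement IntakeAgent (intake Agent)

backend/app/agents/intake_agent.py:

  • Analyze the user_intent text and extract the reimbursement type and business-trip details

  • Call the LLM for intent classification

  • Return structured task information (reimbursement type, destination city, date range, etc.)

  • Output: AgentResult(data={"task_info": {...}})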
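The LLM reply should be validated before it becomes `task_info`, because the model can return an unknown type, a malformed date or a low confidence score. This is a minimal sketch that assumes the reply follows the `INTENT_CLASSIFICATION_PROMPT` schema from Task 3.3. `build_task_info`, `TaskInfo` and the 0.6 confidence threshold are hypothetical choices, not values taken from this plan.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import date

# Types the intent-classification prompt is allowed to return
KNOWN_TYPES = {"travel_expense", "office_expense", "business_meal", "other"}


@dataclass
class TaskInfo:
    reimbursement_type: str
    destination: str | None
    start_date: date | None
    end_date: date | None
    purpose: str | None
    confidence: float
    needs_confirmation: bool


def _parse_date(value: object) -> date | None:
    """Accept 'YYYY-MM-DD' strings; null or malformed values become None."""
    if not isinstance(value, str):
        return None
    try:
        return date.fromisoformat(value)
    except ValueError:
        return None


def build_task_info(reply: dict, min_confidence: float = 0.6) -> TaskInfo:
    """Normalize the LLM's intent JSON into a TaskInfo (hypothetical helper)."""
    rtype = reply.get("reimbursement_type")
    if rtype not in KNOWN_TYPES:
        rtype = "other"
    travel = reply.get("travel_info") or {}
    try:
        confidence = float(reply.get("confidence", 0.0))
    except (TypeError, ValueError):
        confidence = 0.0
    start = _parse_date(travel.get("start_date"))
    end = _parse_date(travel.get("end_date"))
    dates_ok = not (start and end and end < start)
    return TaskInfo(
        reimbursement_type=rtype,
        destination=travel.get("destination"),
        start_date=start,
        end_date=end,
        purpose=travel.get("purpose"),
        confidence=confidence,
        # Unclear results are kept but flagged for the user to confirm
        needs_confirmation=confidence < min_confidence or rtype == "other" or not dates_ok,
    )
```

Flagging unclear results with `needs_confirmation`, instead of rejecting them, keeps the user in the loop. This fits naturally with the `wait_user` value of `AgentResult.next_action`.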

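  • Step 2: Implement ParseAgent (document parsing Agent)

backend/app/agents/parse_agent.py:

  • Iterate over every document in the task and call ocr_service.recognize()

  • Aggregate the OCR results into reimbursement line items

  • Call ledger_service.create_shadow_reimbursement() to create the shadow record

  • Call ledger_service.add_item() for each line item

  • Identify the expense type automatically (the LLM can assist)

  • Output: AgentResult(data={"ocr_results": [...], "reimbursement_data": {...}})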
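Amounts read by OCR arrive as text and should be aggregated with `Decimal` rather than `float` to avoid rounding drift. This is a minimal sketch of the aggregation step. It assumes each OCR result is a dict with `document_id`, `amount`, `expense_type` and `invoice_no` keys; the real output format of `ocr_service.recognize()` is not specified in this plan. `summarize_ocr_results` and `to_amount` are hypothetical helpers.

```python
from __future__ import annotations

from collections import defaultdict
from decimal import Decimal, InvalidOperation


def to_amount(raw: object) -> Decimal | None:
    """Parse an OCR amount such as '¥1,280.50' into a 2-decimal Decimal; None if unreadable."""
    if raw is None:
        return None
    text = str(raw).replace("¥", "").replace("￥", "").replace(",", "").strip()
    try:
        return Decimal(text).quantize(Decimal("0.01"))
    except InvalidOperation:
        return None


def summarize_ocr_results(ocr_results: list[dict]) -> dict:
    """Turn per-document OCR dicts into line items plus per-type and overall totals."""
    items, unreadable = [], []
    totals: dict[str, Decimal] = defaultdict(Decimal)
    for doc in ocr_results:
        amount = to_amount(doc.get("amount"))
        if amount is None:
            unreadable.append(doc.get("document_id"))  # needs manual entry or re-upload
            continue
        expense_type = doc.get("expense_type") or "other"  # e.g. filled by the LLM mapping step
        items.append({"document_id": doc.get("document_id"), "expense_type": expense_type,
                      "amount": amount, "invoice_no": doc.get("invoice_no")})
        totals[expense_type] += amount
    return {
        "items": items,
        "totals": dict(totals),
        "total_amount": sum(totals.values(), Decimal("0.00")),
        "unreadable_documents": unreadable,
    }
```

Documents with an unreadable amount are listed separately rather than dropped silently, so that they can be surfaced to the user.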

  • Step 3: Implement RuleCheckAgent (rule-check Agent)

backend/app/agents/rule_check_agent.py:

  • Build the context dict (reimbursement data + receipt data + list of existing invoices)

  • Register the 6 RuleCheckers with the RuleEngine

  • Call rule_engine.run_precheck(context)

  • Save the RuleHit records to the DB

  • Update the precheck status of the shadow_reimbursement

  • Output: AgentResult(data={"precheck_result": {...}})
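If the rule checkers share one small interface, registering all 6 with the engine becomes uniform. This sketch rests on several assumptions. The Phase 2 `RuleEngine` API is not shown in this plan, so `RuleHit`, `RuleChecker`, `RuleEngine` and the duplicate-invoice rule below are hypothetical stand-ins. The `issue_type` labels are assumed, and the `policy_ref` text is a placeholder.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Protocol


@dataclass
class RuleHit:
    rule_name: str
    issue_type: str              # assumed labels: "block", "require_attachment", "warning"
    policy_ref: str
    detail: dict = field(default_factory=dict)


class RuleChecker(Protocol):
    """Interface each of the 6 checkers would implement (hypothetical)."""
    name: str

    def check(self, context: dict) -> list[RuleHit]: ...


class DuplicateInvoiceChecker:
    """Flags invoice numbers already reimbursed or repeated within this claim."""
    name = "duplicate_invoice"

    def check(self, context: dict) -> list[RuleHit]:
        seen = set(context.get("existing_invoice_nos", []))
        hits = []
        for item in context.get("items", []):
            invoice_no = item.get("invoice_no")
            if not invoice_no:
                continue  # receipts without an invoice number are another rule's concern
            if invoice_no in seen:
                hits.append(RuleHit(self.name, "block", "<policy clause>",
                                    {"invoice_no": invoice_no}))
            seen.add(invoice_no)
        return hits


class RuleEngine:
    def __init__(self) -> None:
        self._checkers: list[RuleChecker] = []

    def register(self, checker: RuleChecker) -> None:
        self._checkers.append(checker)

    def run_precheck(self, context: dict) -> dict:
        """Run every registered checker; in this sketch 'passed' means no hits at all."""
        hits = [hit for checker in self._checkers for hit in checker.check(context)]
        return {"passed": not hits, "hits": hits}
```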

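  • Step 4: Implement ExplainAgent (explanation and supplement Agent)

backend/app/agents/explain_agent.py:

  • Iterate over rule_hits and use the LLM to generate a natural-language explanation for each

  • Automatically create a supplement_request for each hit of type require_attachment

  • Generate suggested fixes

  • Decide next_action from the precheck result:

    • All passed → continue
    • Supplements required → need_supplement
    • Blocking hits → need_supplement
  • Output: AgentResult(data={"supplement_requests": [...]}, next_action="need_supplement")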
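The decision rules above fit in one function. This sketch assumes each hit carries an `issue_type` of `"require_attachment"`, `"block"` or a non-blocking value such as `"warning"`. The plan does not say how warnings are treated; here they do not stop the flow. `Hit` and `plan_explain_outcome` are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Hit:
    rule_name: str
    issue_type: str  # assumed values: "warning", "require_attachment", "block"


def plan_explain_outcome(hits: list[Hit]) -> tuple[str, list[dict]]:
    """Return (next_action, supplement_requests) according to the rules above."""
    # Only attachment-type hits turn into supplement requests
    supplements = [
        {"rule_name": h.rule_name, "reason": "attachment required"}
        for h in hits
        if h.issue_type == "require_attachment"
    ]
    blocking = any(h.issue_type == "block" for h in hits)
    next_action = "need_supplement" if supplements or blocking else "continue"
    return next_action, supplements
```

A blocking hit produces no supplement request but still stops the flow. The user then acts on the explanation and the suggested fix.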

  • Step 5: Implement SyncAgent (sync execution Agent)

backend/app/agents/sync_agent.py:

  • Map the ShadowReimbursement data to the standard reimbursement form format

  • Call sync_service.mock_sync_to_backend()

  • Record a SyncRecord

  • Update sync_status and backend_bill_id on the shadow_reimbursement

  • Handle sync-failure retries: while retry_count < 3, mark the record as retrying

  • Output: AgentResult(data={"sync_result": {...}})
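Each sync attempt ends in one of three states. The sketch below shows that decision. It assumes `send` stands in for `sync_service.mock_sync_to_backend()` and either returns a backend bill id or raises on failure; the real signature is not given here. `attempt_sync` and `SyncOutcome` are hypothetical names, and only the limit of 3 comes from the plan.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass

MAX_RETRIES = 3  # from the plan: keep retrying while retry_count < 3


@dataclass
class SyncOutcome:
    sync_status: str                 # "synced" / "retrying" / "failed"
    retry_count: int
    backend_bill_id: str | None = None
    error: str | None = None


async def attempt_sync(payload: dict, send: Callable[[dict], Awaitable[str]],
                       retry_count: int) -> SyncOutcome:
    """Make one sync attempt and decide the next sync_status (hypothetical helper)."""
    try:
        bill_id = await send(payload)
    except Exception as exc:  # any backend error counts as a failed attempt
        if retry_count < MAX_RETRIES:
            return SyncOutcome("retrying", retry_count + 1, error=str(exc))
        return SyncOutcome("failed", retry_count, error=str(exc))
    return SyncOutcome("synced", retry_count, backend_bill_id=bill_id)


if __name__ == "__main__":
    async def flaky_backend(payload: dict) -> str:
        raise RuntimeError("backend timeout")

    print(asyncio.run(attempt_sync({"amount": "1333.50"}, flaky_backend, retry_count=0)))
```

Making one attempt per run keeps SyncAgent simple. The next `start_from="submit"` run performs the retry, which matches the SYNC_FAILED → SUBMITTING arrow in the state diagram.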

  • Step 6: Write unit tests for each Agent

Test each Agent's inputs and outputs with a mock DB, mock OCR and mock LLM.
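If the LLM client is injected through the constructor, it is easy to replace with `AsyncMock`. Below is a sketch of such a test. It uses a hypothetical `IntakeAgentSketch` in place of the real `IntakeAgent`, whose constructor this plan does not specify. `Mock`, `AsyncMock`, `assert_awaited_once` and `await_args` are standard `unittest.mock` APIs.

```python
import asyncio
from unittest.mock import AsyncMock, Mock


class IntakeAgentSketch:
    """Hypothetical stand-in for IntakeAgent, with the LLM client injected."""
    name = "intake_agent"

    def __init__(self, llm):
        self.llm = llm

    async def execute(self, user_intent: str) -> dict:
        reply = await self.llm.chat_json("Classify the reimbursement intent as JSON.", user_intent)
        return {"success": True, "data": {"task_info": reply}}


def test_intake_agent_returns_llm_classification():
    reply = {"reimbursement_type": "travel_expense", "confidence": 0.9}
    llm = Mock(chat_json=AsyncMock(return_value=reply))  # no network call is made
    agent = IntakeAgentSketch(llm)

    result = asyncio.run(agent.execute("Beijing business trip, 3 days"))

    assert result["data"]["task_info"] == reply
    llm.chat_json.assert_awaited_once()
    # The second positional argument is the user's text
    assert llm.chat_json.await_args.args[1] == "Beijing business trip, 3 days"
```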

  • Step 7: Commit
git add backend/
git commit -m "feat: implement 5 Agents (intake / parsing / rule check / explanation & supplement / sync)"

Task 3.3: LLM Integration Layer

Owner: Full-stack / Agent engineer
Estimated effort: 2 days
Prerequisite: Phase 2
Can run in parallel with: Task 3.2

Files:

  • Create: backend/app/services/llm_service.py

  • Create: backend/app/services/llm_prompts/__init__.py

  • Create: backend/app/services/llm_prompts/intent_classification.py

  • Create: backend/app/services/llm_prompts/risk_explanation.py

  • Create: backend/app/services/llm_prompts/expense_type_mapping.py

  • Test: backend/tests/test_llm_service.py

  • Step 1: Implement the LLM Service wrapper

backend/app/services/llm_service.py:

import json

import httpx

from app.core.config import settings


class LLMService:
    """Async client for any OpenAI-compatible /chat/completions endpoint.

    Providers are switched through configuration (LLM_BASE_URL, LLM_MODEL, LLM_API_KEY).
    """

    def __init__(self):
        self.api_key = settings.LLM_API_KEY
        self.model = settings.LLM_MODEL
        # Default to OpenAI; strip a trailing "/" so the URL join below stays clean
        self.base_url = (settings.LLM_BASE_URL or "https://api.openai.com/v1").rstrip("/")

    async def chat(self, system_prompt: str, user_message: str, json_mode: bool = False) -> str:
        """Send one system + user message pair and return the assistant's reply text."""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message},
            ],
        }
        if json_mode:
            # The prompt itself must still ask for JSON: OpenAI rejects json_object
            # requests whose messages never mention "JSON"
            payload["response_format"] = {"type": "json_object"}

        async with httpx.AsyncClient(timeout=60.0) as client:
            resp = await client.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=payload,
            )
            resp.raise_for_status()  # surface 4xx/5xx as httpx.HTTPStatusError
            data = resp.json()
        return data["choices"][0]["message"]["content"]

    async def chat_json(self, system_prompt: str, user_message: str) -> dict:
        """Call the LLM in JSON mode and parse the reply into a dict."""
        raw = await self.chat(system_prompt, user_message, json_mode=True)
        return json.loads(raw)
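`json.loads(raw)` fails if a provider ignores `response_format` and wraps the JSON in a Markdown code fence. A defensive parser is one possible fallback. `extract_json` below is a hypothetical helper and is not part of the planned `LLMService`.

```python
import json
import re

FENCE = "`" * 3  # a Markdown code fence, built this way to keep the example readable
# Captures the body of a fenced block, with or without a "json" info string
_FENCED = re.compile(FENCE + r"(?:json)?\s*(.*?)\s*" + FENCE, re.DOTALL)


def extract_json(raw: str) -> dict:
    """Parse an LLM reply as a JSON object, tolerating a surrounding code fence."""
    text = raw.strip()
    match = _FENCED.search(text)
    if match:
        text = match.group(1)
    data = json.loads(text)  # json.JSONDecodeError (a ValueError) on malformed output
    if not isinstance(data, dict):
        raise ValueError(f"expected a JSON object, got {type(data).__name__}")
    return data
```

With this helper, `chat_json()` could return `extract_json(raw)` instead of `json.loads(raw)`.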
  • Step 2: Write the prompt templates

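intent_classification.py: analyze the user's intent and identify the reimbursement type.

# Literal braces are doubled so that INTENT_CLASSIFICATION_PROMPT.format(user_intent=...) works
INTENT_CLASSIFICATION_PROMPT = """You are a reimbursement intent-recognition assistant. Based on the user's description, identify the reimbursement type and the key information.

Output strictly in the following JSON format:
{{
  "reimbursement_type": "travel_expense" | "office_expense" | "business_meal" | "other",
  "travel_info": {{
    "destination": "city name",
    "start_date": "YYYY-MM-DD" or null,
    "end_date": "YYYY-MM-DD" or null,
    "purpose": "purpose of the trip"
  }},
  "confidence": 0.0-1.0
}}

User description: {user_intent}
"""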

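risk_explanation.py: turn rule-hit results into natural-language explanations.

RISK_EXPLANATION_PROMPT = """You are a reimbursement policy explanation assistant. Based on the rule-hit result, explain the problem to the user in concise, easy-to-understand language.

Rule hit information:
- Rule name: {rule_name}
- Issue type: {issue_type}
- Policy basis: {policy_ref}
- Specific data: {hit_detail}

Explain in 2-3 sentences:
1. What the problem is
2. What the policy standard is
3. How to resolve it
"""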

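expense_type_mapping.py: match the expense type from the OCR result.

# The JSON example braces are doubled so that .format(ocr_result=...) works
EXPENSE_TYPE_MAPPING_PROMPT = """Determine the expense type from the receipt OCR result.

Available expense types:
- travel_transport: travel transportation (train tickets, air tickets, taxis)
- travel_hotel: travel accommodation (hotel invoices)
- travel_meal: travel meal allowance
- local_transport: local transportation
- business_meal: business entertainment
- office_supply: office supplies
- communication: communication expenses
- other: other

OCR result: {ocr_result}

Output JSON: {{"expense_type": "type code", "confidence": 0.0-1.0}}
"""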
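  • Step 3: Write tests (with mocked LLM responses)
import json
from unittest.mock import AsyncMock, patch

import pytest

from app.services.llm_service import LLMService


@pytest.mark.asyncio
async def test_chat_json_parses_llm_reply():
    reply = {
        "reimbursement_type": "travel_expense",
        "travel_info": {"destination": "Beijing", "purpose": "business trip"},
        "confidence": 0.9,
    }
    # Mock the HTTP-level chat() so that chat_json()'s own parsing is what gets tested
    with patch.object(LLMService, "chat", new_callable=AsyncMock) as mock_chat:
        mock_chat.return_value = json.dumps(reply)
        llm = LLMService()
        result = await llm.chat_json("system prompt", "I want to claim my Beijing business trip expenses")

    assert result["reimbursement_type"] == "travel_expense"
    mock_chat.assert_awaited_once_with(
        "system prompt", "I want to claim my Beijing business trip expenses", json_mode=True
    )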
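The prompt templates in Step 2 use `{user_intent}`-style placeholders, which suggests they are filled with `str.format`. If so, any literal brace in a prompt must be doubled (`{{`, `}}`), because a single brace is read as a placeholder and raises `KeyError`. Below is a small self-contained demonstration using a hypothetical mini-template in the same style.

```python
import json

# Hypothetical mini-template: literal JSON braces are doubled,
# while the placeholder uses single braces.
TEMPLATE = """Classify the receipt below.

Reply with JSON: {{"expense_type": "type code", "confidence": 0.0}}

OCR result: {ocr_result}
"""

ocr = {"seller": "Hotel {Demo}", "amount": "1280.50"}
# Braces inside substituted values are inserted verbatim and never re-parsed
prompt = TEMPLATE.format(ocr_result=json.dumps(ocr, ensure_ascii=False))
print(prompt)

# Without doubling, format() treats {"expense_type": ...} as a placeholder name
try:
    '{"expense_type": "x"} {ocr_result}'.format(ocr_result="...")
except KeyError as exc:
    print("KeyError:", exc)
```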
  • Step 4: Commit
git add backend/
git commit -m "feat: implement LLM integration layer (multi-provider + prompt templates)"

Task 3.4: Audit Log

Owner: Backend engineer B
Estimated effort: 1 day
Prerequisite: Phase 2
Can run in parallel with: Tasks 3.1, 3.2, 3.3

Files:

  • Create: backend/app/services/audit_service.py

  • Create: backend/app/api/v1/audit.py

  • Modify: backend/app/api/v1/router.py

  • Test: backend/tests/test_audit.py

  • Step 1: Implement AuditService

backend/app/services/audit_service.py:

import json
from datetime import datetime

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.audit import AuditLog


class AuditService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def log(self, action: str, actor: str, target_type: str, target_id: str,
                  detail: dict | None = None) -> AuditLog:
        """Record one audit log entry."""
        log_entry = AuditLog(
            action=action,
            actor=actor,
            target_type=target_type,
            target_id=target_id,
            detail=json.dumps(detail, ensure_ascii=False) if detail is not None else None,
        )
        self.db.add(log_entry)
        await self.db.flush()  # flush only: the caller commits as part of its own transaction
        return log_entry

    async def query_logs(self, target_type: str | None = None, target_id: str | None = None,
                         actor: str | None = None, start_time: datetime | None = None,
                         end_time: datetime | None = None, page: int = 1,
                         size: int = 50) -> list[AuditLog]:
        """Query audit logs, newest first; all filters are optional, time range is [start_time, end_time)."""
        query = select(AuditLog)
        if target_type:
            query = query.where(AuditLog.target_type == target_type)
        if target_id:
            query = query.where(AuditLog.target_id == target_id)
        if actor:
            query = query.where(AuditLog.actor == actor)
        if start_time:
            query = query.where(AuditLog.created_at >= start_time)
        if end_time:
            query = query.where(AuditLog.created_at < end_time)
        query = query.order_by(AuditLog.created_at.desc()).offset((page - 1) * size).limit(size)
        result = await self.db.execute(query)
        return list(result.scalars().all())
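  • Step 2: Define the audit action constants
class AuditAction:
    """Values for AuditLog.action; plain strings, so they can be passed straight to AuditService.log()."""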
    FILE_UPLOAD = "file_upload"
    OCR_RECOGNIZE = "ocr_recognize"
    AGENT_CALL = "agent_call"
    RULE_HIT = "rule_hit"
    SUPPLEMENT_REQUEST = "supplement_request"
    SUPPLEMENT_RESPOND = "supplement_respond"
    USER_CONFIRM = "user_confirm"
    BACKEND_SYNC = "backend_sync"
  • Step 3: Instrument the key paths

Call audit_service.log() at the following locations:

  • document_service.upload_document() → FILE_UPLOAD

  • ocr_service.recognize() → OCR_RECOGNIZE

  • orchestrator._run_agent() → AGENT_CALL

  • rule_engine.run_precheck() → RULE_HIT (one record per hit)

  • supplement_service.create_supplement_request() → SUPPLEMENT_REQUEST

  • supplement_service.respond_supplement() → SUPPLEMENT_RESPOND

  • sync_service.mock_sync_to_backend() → BACKEND_SYNC
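To keep these call sites uniform, the logging can be attached with a decorator. This sketch rests on assumptions. `InMemoryAudit` mimics the `AuditService.log()` signature from Step 1, while `audited` and the demo `upload_document` are hypothetical. The real signature of `document_service.upload_document()` is not given in this plan.

```python
from __future__ import annotations

import asyncio
import functools
from collections.abc import Awaitable, Callable
from typing import Any


class InMemoryAudit:
    """Stand-in with the same log() signature as AuditService; keeps entries in a list."""

    def __init__(self) -> None:
        self.entries: list[dict] = []

    async def log(self, action: str, actor: str, target_type: str, target_id: str,
                  detail: dict | None = None) -> None:
        self.entries.append({"action": action, "actor": actor, "target_type": target_type,
                             "target_id": target_id, "detail": detail})


def audited(action: str, target_type: str, target_id_of: Callable[[Any], str]):
    """Decorator (hypothetical): log one audit entry after the wrapped coroutine succeeds.

    The wrapper takes two extra keyword arguments, `audit` and `actor`, and derives
    the target id from the wrapped function's return value.
    """
    def decorator(func: Callable[..., Awaitable[Any]]):
        @functools.wraps(func)
        async def wrapper(*args, audit, actor: str, **kwargs):
            result = await func(*args, **kwargs)
            await audit.log(action, actor, target_type, target_id_of(result),
                            detail={"call": func.__name__})
            return result
        return wrapper
    return decorator


# "file_upload" is the same value as AuditAction.FILE_UPLOAD
@audited("file_upload", target_type="document", target_id_of=lambda doc: doc["id"])
async def upload_document(filename: str) -> dict:
    # Hypothetical upload: the real service would store the file and a DB row
    return {"id": "doc-1", "filename": filename}


if __name__ == "__main__":
    audit = InMemoryAudit()
    asyncio.run(upload_document("hotel.pdf", audit=audit, actor="user-42"))
    print(audit.entries)
```

This version records only successful calls; wrapping the call in `try`/`finally` would log failures as well. With the real `AuditService`, `log()` only flushes, so the surrounding request still has to commit.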

  • Step 4: Implement the audit log query API

GET /api/v1/audit/logs: supports filtering by target_type, target_id, actor and date_range

  • Step 5: Write tests

  • Step 6: Commit

git add backend/
git commit -m "feat: implement audit logging service (recording + query API)"

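Phase completion checklist

  • Tests pass for every Orchestrator state machine transition path
  • Each of the 5 Agents runs independently and returns correct results
  • LLM Service can call the model and parse JSON responses
  • Audit logs are written on every key path
  • POST /api/v1/reimbursement/tasks/{id}/agent/run starts the full Agent flow
  • pytest tests/ -v passes in full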