# Phase 3: Agent Orchestration (W3-W4)

> **Goal:** Implement the Agent Orchestrator state machine, the 5 business Agents, the LLM integration layer, and audit logging, delivering the core intelligent-processing capability.
> **Duration:** Weeks 3 to 4
> **Tasks:** 4
> **Parallelizable:** Task 3.3 / 3.4 can run in parallel with Task 3.2
> **Prerequisite:** Phase 2 complete

---

## Deliverables for This Phase

| Deliverable | Description |
|---|---|
| Orchestrator state machine | Task status transitions + Agent scheduling |
| 5 Agents | Intake / Parse / Rule check / Explain and supplement / Sync |
| LLM integration layer | Multi-provider support + prompt templates |
| Audit log | Every key operation leaves a trace |

---

## Task List

### Task 3.1: Agent Orchestrator State Machine

**Owner:** Full-stack/Agent engineer
**Estimated effort:** 2 days
**Prerequisite:** Phase 2 (all services ready)

**Files:**
- Create: `backend/app/agents/__init__.py`
- Create: `backend/app/agents/state.py`
- Create: `backend/app/agents/base_agent.py`
- Create: `backend/app/agents/orchestrator.py`
- Create: `backend/app/api/v1/agent.py`
- Modify: `backend/app/api/v1/router.py`
- Test: `backend/tests/test_orchestrator.py`

- [ ] **Step 1: Define the Agent state and context**

`backend/app/agents/state.py`:

```python
from dataclasses import dataclass, field

from app.models.enums import TaskStatus


@dataclass
class AgentContext:
    """Shared state passed from one Agent to the next during a run."""
    task_id: str
    status: TaskStatus
    user_intent: str | None = None
    current_agent: str | None = None
    ocr_results: list[dict] = field(default_factory=list)
    reimbursement_data: dict | None = None
    precheck_result: dict | None = None
    supplement_requests: list[dict] = field(default_factory=list)
    error_message: str | None = None
    retry_count: int = 0
```

- [ ] **Step 2: Define the Agent base class and result type**

`backend/app/agents/base_agent.py`:

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field

from sqlalchemy.ext.asyncio import AsyncSession

from app.agents.state import AgentContext


@dataclass
class AgentResult:
    success: bool
    data: dict = field(default_factory=dict)
    next_action: str = "continue"  # continue / wait_user / need_supplement / retry
    error: str | None = None


class BaseAgent(ABC):
    # Registry key used by the Orchestrator; each subclass sets its own name
    name: str = ""

    @abstractmethod
    async def execute(self, context: AgentContext, db: AsyncSession) -> AgentResult:
        ...
```

- [ ] **Step 3: Implement the Orchestrator state machine**

`backend/app/agents/orchestrator.py`: the core orchestration logic.

State transition diagram (see section 4.2 of the development document):

```
CREATED → MATERIAL_COLLECTING → PARSING → DRAFT_GENERATED → PRECHECKING
                  ↑                                              ↓
                  └─── MATERIAL_COLLECTING ←── NEED_SUPPLEMENT ←─┘
                                                                 ↓
                                                       PENDING_USER_CONFIRM → SUBMITTING → SYNCED
                                                                                  ↓
                                                                             SYNC_FAILED → SUBMITTING (retry)
```

```python
from sqlalchemy.ext.asyncio import AsyncSession

from app.agents.base_agent import AgentResult, BaseAgent
from app.agents.state import AgentContext
from app.models.enums import TaskStatus
from app.services.task_service import TaskService


class Orchestrator:
    def __init__(self, db: AsyncSession):
        self.db = db
        self.agents: dict[str, BaseAgent] = {}

    def register_agent(self, agent: BaseAgent):
        self.agents[agent.name] = agent

    async def run(self, task_id: str, start_from: str = "intake") -> AgentContext:
        """Start the orchestration flow."""
        task_svc = TaskService(self.db)
        task = await task_svc.get_task(task_id)
        if not task:
            raise ValueError(f"Task {task_id} not found")

        context = AgentContext(
            task_id=task_id,
            status=task.status,
            user_intent=task.user_intent,
        )

        # start_from selects which Agent sequence to run
        agent_sequence = self._get_agent_sequence(start_from)

        for agent_name in agent_sequence:
            context.current_agent = agent_name
            await self._set_status(task_svc, context, self._agent_to_status(agent_name))

            agent = self.agents.get(agent_name)
            if not agent:
                # Skip Agents that have not been registered
                continue

            result = await agent.execute(context, self.db)

            if not result.success:
                context.error_message = result.error
                break

            context = self._merge_result(context, result)

            if result.next_action == "wait_user":
                await self._set_status(task_svc, context, TaskStatus.PENDING_USER_CONFIRM)
                break
            if result.next_action == "need_supplement":
                await self._set_status(task_svc, context, TaskStatus.NEED_SUPPLEMENT)
                break

        return context

    async def _set_status(self, task_svc: TaskService, context: AgentContext, status: TaskStatus):
        """Persist the new status and mirror it on the context so callers see the latest value."""
        await task_svc.update_status(context.task_id, status, context.current_agent)
        context.status = status

    def _get_agent_sequence(self, start_from: str) -> list[str]:
        sequences = {
            "intake": ["intake_agent", "parse_agent", "rule_check_agent", "explain_agent"],
            "precheck": ["rule_check_agent", "explain_agent"],
            "submit": ["sync_agent"],
        }
        # Unknown entry points fall back to the full intake sequence
        return sequences.get(start_from, sequences["intake"])

    def _agent_to_status(self, agent_name: str) -> TaskStatus:
        mapping = {
            "intake_agent": TaskStatus.MATERIAL_COLLECTING,
            "parse_agent": TaskStatus.PARSING,
            "rule_check_agent": TaskStatus.PRECHECKING,
            "explain_agent": TaskStatus.PRECHECKING,
            "sync_agent": TaskStatus.SUBMITTING,
        }
        return mapping.get(agent_name, TaskStatus.PRECHECKING)

    def _merge_result(self, context: AgentContext, result: AgentResult) -> AgentContext:
        """Merge an Agent's result into the shared context."""
        data = result.data
        if "ocr_results" in data:
            context.ocr_results = data["ocr_results"]
        if "reimbursement_data" in data:
            context.reimbursement_data = data["reimbursement_data"]
        if "precheck_result" in data:
            context.precheck_result = data["precheck_result"]
        if "supplement_requests" in data:
            context.supplement_requests = data["supplement_requests"]
        return context
```

- [ ] **Step 4: Implement the Agent launch API**

`backend/app/api/v1/agent.py`:

```python
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession

from app.agents.orchestrator import Orchestrator
from app.core.database import get_db

router = APIRouter(prefix="/reimbursement/tasks/{task_id}/agent", tags=["agent"])


class AgentRunRequest(BaseModel):
    start_from: str = "intake"  # intake / precheck / submit
    mode: str = "precheck"


class AgentRunResponse(BaseModel):
    task_id: str
    status: str
    current_agent: str | None


def create_orchestrator(db: AsyncSession) -> Orchestrator:
    """Build an Orchestrator. Register the five Agents from Task 3.2 here once they exist."""
    return Orchestrator(db)


@router.post("/run", response_model=AgentRunResponse)
async def run_agent(task_id: str, req: AgentRunRequest, db: AsyncSession = Depends(get_db)):
    orchestrator = create_orchestrator(db)
    try:
        context = await orchestrator.run(task_id, start_from=req.start_from)
    except ValueError as exc:
        # Orchestrator.run raises ValueError when the task does not exist
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    return AgentRunResponse(
        task_id=context.task_id,
        status=context.status.value,
        current_agent=context.current_agent,
    )
```

- [ ] **Step 5: Write state machine transition tests**

Paths to cover:
- Happy path: created → parsing → draft → precheck → passed → submitting → synced
- Supplement path: precheck → need supplement → waiting for user
- Retry path: submitting → sync failed → retry

- [ ] **Step 6: Commit**

```bash
git add backend/
git commit -m "feat: implement Agent Orchestrator state machine"
```

---

### Task 3.2: Implementing the 5 Agents

**Owner:** Full-stack/Agent engineer
**Estimated effort:** 3 days
**Prerequisite:** Task 3.1 (Orchestrator ready)

**Files:**
- Create: `backend/app/agents/intake_agent.py`
- Create: `backend/app/agents/parse_agent.py`
- Create: `backend/app/agents/rule_check_agent.py`
- Create: `backend/app/agents/explain_agent.py`
- Create: `backend/app/agents/sync_agent.py`
- Test: `backend/tests/test_agents.py`

- [ ] **Step 1: Implement IntakeAgent (intake Agent)**

`backend/app/agents/intake_agent.py`:
- Analyze the `user_intent` text and extract the reimbursement type and trip information
- Call the LLM for intent classification
- Return structured task information (reimbursement type, destination city, date range, and so on)
- Output: `AgentResult(data={"task_info": {...}})`

- [ ] **Step 2: Implement ParseAgent (document parsing Agent)**

`backend/app/agents/parse_agent.py`:
- Iterate over every document attached to the task and call `ocr_service.recognize()`
- Aggregate the OCR results into reimbursement line items
- Call `ledger_service.create_shadow_reimbursement()` to create the shadow record
- Call `ledger_service.add_item()` to add each line item
- Identify the expense type automatically (the LLM may be called to assist)
- Output: `AgentResult(data={"ocr_results": [...], "reimbursement_data": {...}})`

- [ ] **Step 3: Implement RuleCheckAgent (rule check Agent)**

`backend/app/agents/rule_check_agent.py`:
- Build the context dict (reimbursement data + receipt data + list of existing invoices)
- Register the 6 RuleCheckers with the RuleEngine
- Call `rule_engine.run_precheck(context)`
- Save the RuleHit records to the DB
- Update the precheck status of the shadow_reimbursement
- Output: `AgentResult(data={"precheck_result": {...}})`

- [ ] **Step 4: Implement ExplainAgent (explanation and supplement Agent)**

`backend/app/agents/explain_agent.py`:
- Iterate over rule_hits and use the LLM to generate a natural-language explanation for each
- Automatically create a supplement_request for hits of type `require_attachment`
- Generate suggested corrections
- Decide next_action from the precheck result:
  - All rules pass → `continue`
  - Any hit requires a supplement → `need_supplement`
  - Any hit is blocking → `need_supplement`
- Output: `AgentResult(data={"supplement_requests": [...]}, next_action="need_supplement")`

- [ ] **Step 5: Implement SyncAgent (sync execution Agent)**

`backend/app/agents/sync_agent.py`:
- Map the ShadowReimbursement data to the standard reimbursement form format
- Call `sync_service.mock_sync_to_backend()`
- Record a SyncRecord
- Update sync_status and backend_bill_id on the shadow_reimbursement
- Handle retries after a sync failure (mark as retrying while retry_count < 3)
- Output: `AgentResult(data={"sync_result": {...}})`

- [ ]
**Step 6: Write unit tests for each Agent**

Test each Agent's inputs and outputs using a mock DB, mock OCR, and mock LLM.

- [ ] **Step 7: Commit**

```bash
git add backend/
git commit -m "feat: implement 5 Agents (intake/parse/rule check/explain and supplement/sync)"
```

---

### Task 3.3: LLM Integration Layer

**Owner:** Full-stack/Agent engineer
**Estimated effort:** 2 days
**Prerequisite:** Phase 2
**Can run in parallel with:** Task 3.2

**Files:**
- Create: `backend/app/services/llm_service.py`
- Create: `backend/app/services/llm_prompts/__init__.py`
- Create: `backend/app/services/llm_prompts/intent_classification.py`
- Create: `backend/app/services/llm_prompts/risk_explanation.py`
- Create: `backend/app/services/llm_prompts/expense_type_mapping.py`
- Test: `backend/tests/test_llm_service.py`

- [ ] **Step 1: Implement the LLM Service wrapper**

`backend/app/services/llm_service.py`:

```python
import json

import httpx

from app.core.config import settings


class LLMService:
    def __init__(self):
        self.api_key = settings.LLM_API_KEY
        self.model = settings.LLM_MODEL
        # Fall back to the OpenAI endpoint when no base URL is configured
        self.base_url = settings.LLM_BASE_URL or "https://api.openai.com/v1"

    async def chat(self, system_prompt: str, user_message: str, json_mode: bool = False) -> str:
        """Call the LLM API and return the content of the first choice."""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message},
            ],
        }
        if json_mode:
            payload["response_format"] = {"type": "json_object"}

        async with httpx.AsyncClient(timeout=60.0) as client:
            resp = await client.post(
                f"{self.base_url}/chat/completions", headers=headers, json=payload
            )
            resp.raise_for_status()  # surface HTTP errors to the caller
            data = resp.json()
            return data["choices"][0]["message"]["content"]

    async def chat_json(self, system_prompt: str, user_message: str) -> dict:
        """Call the LLM in JSON mode and parse the response."""
        raw = await self.chat(system_prompt, user_message, json_mode=True)
        return json.loads(raw)
```

- [ ] **Step 2: Write the prompt templates**

**`intent_classification.py`**: analyzes the user's intent and identifies the reimbursement type:

```python
INTENT_CLASSIFICATION_PROMPT = """You are a reimbursement intent recognition assistant. Based on the user's description, identify the reimbursement type and key information.

Output strictly in the following JSON format:
{
  "reimbursement_type": "travel_expense" | "office_expense" | "business_meal" | "other",
  "travel_info": {
    "destination": "city name",
    "start_date": "YYYY-MM-DD" or null,
    "end_date": "YYYY-MM-DD" or null,
    "purpose": "purpose of the trip"
  },
  "confidence": 0.0-1.0
}

User description: {user_intent}
"""
# The template contains literal JSON braces, so fill {user_intent} with
# str.replace() rather than str.format().
```

**`risk_explanation.py`**: turns rule hits into natural-language explanations:

```python
RISK_EXPLANATION_PROMPT = """You are a reimbursement policy explanation assistant. Based on the rule hit below, explain the problem to the user in concise, plain language.

Rule hit information:
- Rule name: {rule_name}
- Issue type: {issue_type}
- Policy reference: {policy_ref}
- Specific data: {hit_detail}

Explain in 2-3 sentences:
1. What the problem is
2. What the policy standard is
3. How to resolve it
"""
```

**`expense_type_mapping.py`**: matches an expense type from the OCR result:

```python
EXPENSE_TYPE_MAPPING_PROMPT = """Determine the expense type from the receipt OCR result.

Available expense types:
- travel_transport: travel transportation (train tickets, air tickets, taxis)
- travel_hotel: travel accommodation (hotel invoices)
- travel_meal: travel meal allowance
- local_transport: local transportation
- business_meal: business entertainment
- office_supply: office supplies
- communication: communication expenses
- other: other

OCR result: {ocr_result}

Output JSON: {"expense_type": "type code", "confidence": 0.0-1.0}
"""
# The template contains literal JSON braces, so fill {ocr_result} with
# str.replace() rather than str.format().
```

- [ ] **Step 3: Write tests (using mock LLM responses)**

```python
import pytest
from unittest.mock import AsyncMock, patch

from app.services.llm_service import LLMService


@pytest.mark.asyncio
async def test_llm_intent_classification():
    with patch("app.services.llm_service.LLMService.chat_json", new_callable=AsyncMock) as mock_chat:
        mock_chat.return_value = {
            "reimbursement_type": "travel_expense",
            "travel_info": {"destination": "Beijing", "purpose": "business trip"},
            "confidence": 0.9
        }
        llm = LLMService()
        result = await llm.chat_json("system prompt", "I want to claim expenses for my Beijing business trip")
        assert result["reimbursement_type"] == "travel_expense"
```

- [ ] **Step 4: Commit**

```bash
git add backend/
git commit -m "feat: implement LLM integration layer (multi-provider + prompt templates)"
```

---

### Task 3.4: Audit Log

**Owner:** Backend engineer B
**Estimated effort:** 1 day
**Prerequisite:** Phase 2
**Can run in parallel with:** Task 3.1, 3.2, 3.3

**Files:**
- Create: `backend/app/services/audit_service.py`
- Create: `backend/app/api/v1/audit.py`
- Modify: `backend/app/api/v1/router.py`
- Test: `backend/tests/test_audit.py`

- [ ] **Step 1: Implement AuditService**

`backend/app/services/audit_service.py`:

```python
import json

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.audit import AuditLog


class AuditService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def log(self, action: str, actor: str, target_type: str,
                  target_id: str, detail: dict | None = None):
        """Record an audit log entry."""
        log_entry = AuditLog(
            action=action,
            actor=actor,
            target_type=target_type,
            target_id=target_id,
            detail=json.dumps(detail, ensure_ascii=False) if detail else None,
        )
        self.db.add(log_entry)
        await self.db.flush()
        # No commit here; the caller commits once for the whole unit of work

    async def query_logs(self, target_type: str | None = None,
                         target_id: str | None = None,
                         actor: str | None = None,
                         page: int = 1, size: int = 50):
        """Query audit logs, newest first, with optional filters and pagination."""
        query = select(AuditLog)
        if target_type:
            query = query.where(AuditLog.target_type == target_type)
        if target_id:
            query = query.where(AuditLog.target_id == target_id)
        if actor:
            query = query.where(AuditLog.actor == actor)
        query = query.order_by(AuditLog.created_at.desc()).offset((page - 1) * size).limit(size)
        result = await self.db.execute(query)
        return result.scalars().all()
```

- [ ] **Step 2: Define the audit action enum**

```python
class AuditAction:
    FILE_UPLOAD = "file_upload"
    OCR_RECOGNIZE = "ocr_recognize"
    AGENT_CALL = "agent_call"
    RULE_HIT = "rule_hit"
    SUPPLEMENT_REQUEST = "supplement_request"
    SUPPLEMENT_RESPOND = "supplement_respond"
    USER_CONFIRM = "user_confirm"
    BACKEND_SYNC = "backend_sync"
```

- [ ] **Step 3: Add audit hooks on the key paths**

Call `audit_service.log()` at the following points:
- `document_service.upload_document()` → `FILE_UPLOAD`
- `ocr_service.recognize()` → `OCR_RECOGNIZE`
- `orchestrator.run()` (around each `agent.execute()` call) → `AGENT_CALL`
- `rule_engine.run_precheck()` → `RULE_HIT` (one entry per hit)
- `supplement_service.create_supplement_request()` → `SUPPLEMENT_REQUEST`
- `supplement_service.respond_supplement()` → `SUPPLEMENT_RESPOND`
- `sync_service.mock_sync_to_backend()` → `BACKEND_SYNC`

- [ ] **Step 4: Implement the audit log query API**

`GET /api/v1/audit/logs`: supports filtering by target_type, target_id, actor, and date_range.

- [ ] **Step 5: Write tests**

- [ ] **Step 6: Commit**

```bash
git add backend/
git commit -m "feat: implement audit log service (recording + query API)"
```

---

## Phase Completion Checklist

- [ ] All Orchestrator state machine transition paths pass their tests
- [ ] Each of the 5 Agents can run independently and return correct results
- [ ] LLM Service can call the model and parse the JSON response
- [ ] Audit logs are recorded on every key path
- [ ] `POST /api/v1/reimbursement/tasks/{id}/agent/run` starts the full Agent flow
- [ ] All tests pass with `pytest tests/ -v`
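
The first item in the checklist above depends on knowing which transitions are legal. As a minimal sketch, the state diagram from Task 3.1 could be written down as a transition table and used to validate a path in tests. The `ALLOWED_TRANSITIONS` table, the `is_valid_path` helper, and the local `TaskStatus` stand-in (including its string values) are illustrative assumptions, not part of the project code; the real enum lives in `app.models.enums`.

```python
from enum import Enum


class TaskStatus(str, Enum):
    # Stand-in for app.models.enums.TaskStatus; member names follow the
    # diagram in Task 3.1, the string values are assumed.
    CREATED = "created"
    MATERIAL_COLLECTING = "material_collecting"
    PARSING = "parsing"
    DRAFT_GENERATED = "draft_generated"
    PRECHECKING = "prechecking"
    NEED_SUPPLEMENT = "need_supplement"
    PENDING_USER_CONFIRM = "pending_user_confirm"
    SUBMITTING = "submitting"
    SYNCED = "synced"
    SYNC_FAILED = "sync_failed"


S = TaskStatus

# Hypothetical transition table read off the state diagram.
ALLOWED_TRANSITIONS: dict[TaskStatus, set[TaskStatus]] = {
    S.CREATED: {S.MATERIAL_COLLECTING},
    S.MATERIAL_COLLECTING: {S.PARSING},
    S.PARSING: {S.DRAFT_GENERATED},
    S.DRAFT_GENERATED: {S.PRECHECKING},
    S.PRECHECKING: {S.NEED_SUPPLEMENT, S.PENDING_USER_CONFIRM},
    S.NEED_SUPPLEMENT: {S.MATERIAL_COLLECTING},
    S.PENDING_USER_CONFIRM: {S.SUBMITTING},
    S.SUBMITTING: {S.SYNCED, S.SYNC_FAILED},
    S.SYNC_FAILED: {S.SUBMITTING},  # retry
    S.SYNCED: set(),                # terminal state
}


def is_valid_path(path: list[TaskStatus]) -> bool:
    """Return True if every consecutive pair in path is an allowed transition."""
    return all(
        nxt in ALLOWED_TRANSITIONS.get(cur, set())
        for cur, nxt in zip(path, path[1:])
    )


happy_path = [
    S.CREATED, S.MATERIAL_COLLECTING, S.PARSING, S.DRAFT_GENERATED,
    S.PRECHECKING, S.PENDING_USER_CONFIRM, S.SUBMITTING, S.SYNCED,
]
retry_path = [S.SUBMITTING, S.SYNC_FAILED, S.SUBMITTING, S.SYNCED]
```

A test in `backend/tests/test_orchestrator.py` could then assert `is_valid_path(...)` over the sequence of statuses recorded during a run, which keeps the three paths from Task 3.1 Step 5 checked against a single source of truth.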