Files
X-Financial/docs/plans/phase-3-agent-orchestration/README.md

569 lines
18 KiB
Markdown
Raw Normal View History

# Phase 3: Agent 编排W3-W4
> **目标:** 实现 Agent Orchestrator 状态机、5 个业务 Agent、LLM 集成层和审计日志,完成核心智能处理能力。
> **周期:** 第 3 ~ 4 周
> **任务数:** 4 个
> **可并行:** Task 3.3 / 3.4 可与 Task 3.2 并行
> **前置依赖:** Phase 2 完成
---
## 本阶段交付物
| 交付物 | 说明 |
|---|---|
| Orchestrator 状态机 | 任务状态流转 + Agent 调度 |
| 5 个 Agent | 受理 / 解析 / 规则校验 / 解释补件 / 同步 |
| LLM 集成层 | 多 Provider 支持 + Prompt 模板 |
| 审计日志 | 所有关键操作留痕 |
---
## 任务清单
### Task 3.1: Agent Orchestrator 状态机
**负责人:** 全栈/Agent 工程师
**预计工时:** 2 天
**前置依赖:** Phase 2所有 service 就绪)
**Files:**
- Create: `backend/app/agents/__init__.py`
- Create: `backend/app/agents/state.py`
- Create: `backend/app/agents/orchestrator.py`
- Create: `backend/app/api/v1/agent.py`
- Modify: `backend/app/api/v1/router.py`
- Test: `backend/tests/test_orchestrator.py`
- [ ] **Step 1: 定义 Agent 状态和上下文**
`backend/app/agents/state.py`:
```python
from dataclasses import dataclass, field
from app.models.enums import TaskStatus
@dataclass
class AgentContext:
task_id: str
status: TaskStatus
user_intent: str | None = None
current_agent: str | None = None
ocr_results: list[dict] = field(default_factory=list)
reimbursement_data: dict | None = None
precheck_result: dict | None = None
supplement_requests: list[dict] = field(default_factory=list)
error_message: str | None = None
retry_count: int = 0
```
- [ ] **Step 2: 定义 Agent 基类和结果**
`backend/app/agents/base_agent.py`:
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.state import AgentContext
@dataclass
class AgentResult:
success: bool
data: dict = field(default_factory=dict)
next_action: str = "continue" # continue / wait_user / need_supplement / retry
error: str | None = None
class BaseAgent(ABC):
name: str = ""
@abstractmethod
async def execute(self, context: AgentContext, db: AsyncSession) -> AgentResult:
...
```
- [ ] **Step 3: 实现 Orchestrator 状态机**
`backend/app/agents/orchestrator.py` — 核心编排逻辑:
状态转换图(对应开发文档 4.2 节):
```
CREATED → MATERIAL_COLLECTING → PARSING → DRAFT_GENERATED → PRECHECKING
↑ ↓
└─── MATERIAL_COLLECTING ←── NEED_SUPPLEMENT ←────────────────┘
PENDING_USER_CONFIRM → SUBMITTING → SYNCED
SYNC_FAILED → SUBMITTING重试
```
```python
from sqlalchemy.ext.asyncio import AsyncSession
from app.agents.state import AgentContext
from app.agents.base_agent import BaseAgent, AgentResult
from app.models.enums import TaskStatus
from app.services.task_service import TaskService
class Orchestrator:
def __init__(self, db: AsyncSession):
self.db = db
self.agents: dict[str, BaseAgent] = {}
def register_agent(self, agent: BaseAgent):
self.agents[agent.name] = agent
async def run(self, task_id: str, start_from: str = "intake") -> AgentContext:
"""启动编排流程"""
task_svc = TaskService(self.db)
task = await task_svc.get_task(task_id)
if not task:
raise ValueError(f"Task {task_id} not found")
context = AgentContext(
task_id=task_id,
status=task.status,
user_intent=task.user_intent,
)
# 根据 start_from 决定从哪个状态开始
agent_sequence = self._get_agent_sequence(start_from)
for agent_name in agent_sequence:
context.current_agent = agent_name
await task_svc.update_status(task_id, self._agent_to_status(agent_name), agent_name)
agent = self.agents.get(agent_name)
if not agent:
continue
result = await agent.execute(context, self.db)
if not result.success:
context.error_message = result.error
break
context = self._merge_result(context, result)
if result.next_action == "wait_user":
await task_svc.update_status(task_id, TaskStatus.PENDING_USER_CONFIRM, agent_name)
break
if result.next_action == "need_supplement":
await task_svc.update_status(task_id, TaskStatus.NEED_SUPPLEMENT, agent_name)
break
return context
def _get_agent_sequence(self, start_from: str) -> list[str]:
sequences = {
"intake": ["intake_agent", "parse_agent", "rule_check_agent", "explain_agent"],
"precheck": ["rule_check_agent", "explain_agent"],
"submit": ["sync_agent"],
}
return sequences.get(start_from, sequences["intake"])
def _agent_to_status(self, agent_name: str) -> TaskStatus:
mapping = {
"intake_agent": TaskStatus.MATERIAL_COLLECTING,
"parse_agent": TaskStatus.PARSING,
"rule_check_agent": TaskStatus.PRECHECKING,
"explain_agent": TaskStatus.PRECHECKING,
"sync_agent": TaskStatus.SUBMITTING,
}
return mapping.get(agent_name, TaskStatus.PRECHECKING)
def _merge_result(self, context: AgentContext, result: AgentResult) -> AgentContext:
"""将 Agent 结果合并到上下文"""
data = result.data
if "ocr_results" in data:
context.ocr_results = data["ocr_results"]
if "reimbursement_data" in data:
context.reimbursement_data = data["reimbursement_data"]
if "precheck_result" in data:
context.precheck_result = data["precheck_result"]
if "supplement_requests" in data:
context.supplement_requests = data["supplement_requests"]
return context
```
- [ ] **Step 4: 实现 Agent 启动 API**
`backend/app/api/v1/agent.py`:
```python
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from app.core.database import get_db
router = APIRouter(prefix="/reimbursement/tasks/{task_id}/agent", tags=["agent"])
class AgentRunRequest(BaseModel):
start_from: str = "intake" # intake / precheck / submit
mode: str = "precheck"
class AgentRunResponse(BaseModel):
task_id: str
status: str
current_agent: str | None
@router.post("/run", response_model=AgentRunResponse)
async def run_agent(task_id: str, req: AgentRunRequest, db: AsyncSession = Depends(get_db)):
orchestrator = create_orchestrator(db)
context = await orchestrator.run(task_id, start_from=req.start_from)
return AgentRunResponse(
task_id=context.task_id,
status=context.status.value,
current_agent=context.current_agent,
)
```
- [ ] **Step 5: 编写状态机转换测试**
覆盖路径:
- 正常路径:创建 → 解析 → 草稿 → 预审 → 通过 → 提交 → 同步
- 补件路径:预审 → 需补件 → 等待用户
- 重试路径:提交 → 同步失败 → 重试
- [ ] **Step 6: Commit**
```bash
git add backend/
git commit -m "feat: 实现 Agent Orchestrator 状态机编排"
```
---
### Task 3.2: 5 个 Agent 实现
**负责人:** 全栈/Agent 工程师
**预计工时:** 3 天
**前置依赖:** Task 3.1Orchestrator 就绪)
**Files:**
- Create: `backend/app/agents/intake_agent.py`
- Create: `backend/app/agents/parse_agent.py`
- Create: `backend/app/agents/rule_check_agent.py`
- Create: `backend/app/agents/explain_agent.py`
- Create: `backend/app/agents/sync_agent.py`
- Test: `backend/tests/test_agents.py`
- [ ] **Step 1: 实现 IntakeAgent受理 Agent**
`backend/app/agents/intake_agent.py`:
- 分析 user_intent 文本,提取报销类型、出差信息
- 调用 LLM 做 intent classification
- 返回结构化任务信息(报销类型、出差城市、日期范围等)
- 输出:`AgentResult(data={"task_info": {...}})`
- [ ] **Step 2: 实现 ParseAgent单据解析 Agent**
`backend/app/agents/parse_agent.py`:
- 遍历任务下所有 document调用 `ocr_service.recognize()`
- 将 OCR 结果汇总为报销明细
- 调用 `ledger_service.create_shadow_reimbursement()` 创建影子记录
- 调用 `ledger_service.add_item()` 添加每条明细
- 自动识别费用类型(可调用 LLM 辅助)
- 输出:`AgentResult(data={"ocr_results": [...], "reimbursement_data": {...}})`
- [ ] **Step 3: 实现 RuleCheckAgent规则校验 Agent**
`backend/app/agents/rule_check_agent.py`:
- 构建 context dict报销数据 + 票据数据 + 已有发票列表)
- 注册 6 个 RuleChecker 到 RuleEngine
- 调用 `rule_engine.run_precheck(context)`
- 保存 RuleHit 记录到 DB
- 更新 shadow_reimbursement 的预审状态
- 输出:`AgentResult(data={"precheck_result": {...}})`
- [ ] **Step 4: 实现 ExplainAgent解释与补件 Agent**
`backend/app/agents/explain_agent.py`:
- 遍历 rule_hits使用 LLM 生成自然语言解释
-`require_attachment` 类型的命中自动创建 supplement_request
- 生成修改建议
- 根据预审结果决定 next_action
- 全部通过 → `continue`
- 有需补件的 → `need_supplement`
- 有阻断的 → `need_supplement`
- 输出:`AgentResult(data={"supplement_requests": [...]}, next_action="need_supplement")`
- [ ] **Step 5: 实现 SyncAgent同步执行 Agent**
`backend/app/agents/sync_agent.py`:
- 将 ShadowReimbursement 数据映射为标准报销单格式
- 调用 `sync_service.mock_sync_to_backend()`
- 记录 SyncRecord
- 更新 shadow_reimbursement 的 sync_status 和 backend_bill_id
- 处理同步失败重试retry_count < 3 时标记 retrying
- 输出:`AgentResult(data={"sync_result": {...}})`
- [ ] **Step 6: 编写每个 Agent 的单元测试**
使用 mock DB、mock OCR、mock LLM 测试每个 Agent 的输入输出。
- [ ] **Step 7: Commit**
```bash
git add backend/
git commit -m "feat: 实现 5 个 Agent受理/解析/规则校验/解释补件/同步)"
```
---
### Task 3.3: LLM 集成层
**负责人:** 全栈/Agent 工程师
**预计工时:** 2 天
**前置依赖:** Phase 2
**可并行于:** Task 3.2
**Files:**
- Create: `backend/app/services/llm_service.py`
- Create: `backend/app/services/llm_prompts/__init__.py`
- Create: `backend/app/services/llm_prompts/intent_classification.py`
- Create: `backend/app/services/llm_prompts/risk_explanation.py`
- Create: `backend/app/services/llm_prompts/expense_type_mapping.py`
- Test: `backend/tests/test_llm_service.py`
- [ ] **Step 1: 实现 LLM Service 封装**
`backend/app/services/llm_service.py`:
```python
import httpx
import json
from app.core.config import settings
class LLMService:
def __init__(self):
self.api_key = settings.LLM_API_KEY
self.model = settings.LLM_MODEL
self.base_url = settings.LLM_BASE_URL or "https://api.openai.com/v1"
async def chat(self, system_prompt: str, user_message: str, json_mode: bool = False) -> str:
"""调用 LLM API"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload = {
"model": self.model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message},
],
}
if json_mode:
payload["response_format"] = {"type": "json_object"}
async with httpx.AsyncClient(timeout=60.0) as client:
resp = await client.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=payload
)
resp.raise_for_status()
data = resp.json()
return data["choices"][0]["message"]["content"]
async def chat_json(self, system_prompt: str, user_message: str) -> dict:
"""调用 LLM 并解析 JSON 响应"""
raw = await self.chat(system_prompt, user_message, json_mode=True)
return json.loads(raw)
```
- [ ] **Step 2: 编写 Prompt 模板**
**`intent_classification.py`** — 分析用户意图,识别报销类型:
```python
INTENT_CLASSIFICATION_PROMPT = """你是一个报销意图识别助手。根据用户的描述,识别报销类型和关键信息。
请严格按以下 JSON 格式输出:
{
"reimbursement_type": "travel_expense" | "office_expense" | "business_meal" | "other",
"travel_info": {
"destination": "城市名",
"start_date": "YYYY-MM-DD" 或 null,
"end_date": "YYYY-MM-DD" 或 null,
"purpose": "出差事由"
},
"confidence": 0.0-1.0
}
用户描述:{user_intent}
"""
```
**`risk_explanation.py`** — 将规则命中结果转为自然语言解释:
```python
RISK_EXPLANATION_PROMPT = """你是一个报销制度解释助手。请根据规则命中结果,用简洁易懂的语言向用户解释问题。
规则命中信息:
- 规则名称:{rule_name}
- 问题类型:{issue_type}
- 制度依据:{policy_ref}
- 具体数据:{hit_detail}
请用 2-3 句话解释:
1. 存在什么问题
2. 制度标准是什么
3. 建议如何处理
"""
```
**`expense_type_mapping.py`** — 根据 OCR 结果匹配费用类型:
```python
EXPENSE_TYPE_MAPPING_PROMPT = """根据票据 OCR 识别结果,判断费用类型。
可选费用类型:
- travel_transport: 差旅交通费(火车票、机票、打车)
- travel_hotel: 差旅住宿费(酒店发票)
- travel_meal: 差旅餐补
- local_transport: 市内交通费
- business_meal: 业务招待费
- office_supply: 办公用品费
- communication: 通讯费
- other: 其他
OCR 识别结果:{ocr_result}
请输出 JSON{"expense_type": "类型编码", "confidence": 0.0-1.0}
"""
```
- [ ] **Step 3: 编写测试(使用 mock LLM 响应)**
```python
import pytest
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_llm_intent_classification():
with patch("app.services.llm_service.LLMService.chat_json", new_callable=AsyncMock) as mock_chat:
mock_chat.return_value = {
"reimbursement_type": "travel_expense",
"travel_info": {"destination": "北京", "purpose": "商务出差"},
"confidence": 0.9
}
llm = LLMService()
result = await llm.chat_json("system prompt", "我要报北京出差费用")
assert result["reimbursement_type"] == "travel_expense"
```
- [ ] **Step 4: Commit**
```bash
git add backend/
git commit -m "feat: 实现 LLM 集成层(多 Provider + Prompt 模板)"
```
---
### Task 3.4: 审计日志
**负责人:** 后端工程师 B
**预计工时:** 1 天
**前置依赖:** Phase 2
**可并行于:** Task 3.1、3.2、3.3
**Files:**
- Create: `backend/app/services/audit_service.py`
- Create: `backend/app/api/v1/audit.py`
- Modify: `backend/app/api/v1/router.py`
- Test: `backend/tests/test_audit.py`
- [ ] **Step 1: 实现 AuditService**
`backend/app/services/audit_service.py`:
```python
import json
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import Mapped
from app.models.audit import AuditLog
class AuditService:
def __init__(self, db: AsyncSession):
self.db = db
async def log(self, action: str, actor: str, target_type: str, target_id: str, detail: dict | None = None):
"""记录审计日志"""
log_entry = AuditLog(
action=action,
actor=actor,
target_type=target_type,
target_id=target_id,
detail=json.dumps(detail, ensure_ascii=False) if detail else None,
)
self.db.add(log_entry)
await self.db.flush() # 不 commit让调用方统一 commit
async def query_logs(self, target_type: str | None = None, target_id: str | None = None,
actor: str | None = None, page: int = 1, size: int = 50):
"""查询审计日志"""
query = select(AuditLog)
if target_type:
query = query.where(AuditLog.target_type == target_type)
if target_id:
query = query.where(AuditLog.target_id == target_id)
if actor:
query = query.where(AuditLog.actor == actor)
query = query.order_by(AuditLog.created_at.desc()).offset((page - 1) * size).limit(size)
result = await self.db.execute(query)
return result.scalars().all()
```
- [ ] **Step 2: 定义审计动作枚举**
```python
class AuditAction:
FILE_UPLOAD = "file_upload"
OCR_RECOGNIZE = "ocr_recognize"
AGENT_CALL = "agent_call"
RULE_HIT = "rule_hit"
SUPPLEMENT_REQUEST = "supplement_request"
SUPPLEMENT_RESPOND = "supplement_respond"
USER_CONFIRM = "user_confirm"
BACKEND_SYNC = "backend_sync"
```
- [ ] **Step 3: 在关键路径埋点**
在以下位置调用 `audit_service.log()`
- `document_service.upload_document()``FILE_UPLOAD`
- `ocr_service.recognize()``OCR_RECOGNIZE`
- `orchestrator._run_agent()``AGENT_CALL`
- `rule_engine.run_precheck()``RULE_HIT`(每条命中记录一条)
- `supplement_service.create_supplement_request()``SUPPLEMENT_REQUEST`
- `supplement_service.respond_supplement()``SUPPLEMENT_RESPOND`
- `sync_service.mock_sync_to_backend()``BACKEND_SYNC`
- [ ] **Step 4: 实现审计日志查询 API**
`GET /api/v1/audit/logs` — 支持按 target_type、target_id、actor、date_range 过滤
- [ ] **Step 5: 编写测试**
- [ ] **Step 6: Commit**
```bash
git add backend/
git commit -m "feat: 实现审计日志服务(记录 + 查询 API"
```
---
## 本阶段完成检查
- [ ] Orchestrator 状态机所有转换路径测试通过
- [ ] 5 个 Agent 能独立执行并返回正确结果
- [ ] LLM Service 能调用大模型并解析 JSON 响应
- [ ] 审计日志在所有关键路径都有记录
- [ ] `POST /api/v1/reimbursement/tasks/{id}/agent/run` 能启动完整 Agent 流程
- [ ] 所有测试 `pytest tests/ -v` 全部通过