Files
X-Financial/docs/plans/phase-2-backend-core/README.md

835 lines
26 KiB
Markdown
Raw Normal View History

# Phase 2: 后端核心服务W2-W3
> **目标:** 实现所有后端业务 API包括任务管理、文件上传、OCR 集成、规则引擎、影子账本、补件与提交。
> **周期:** 第 2 ~ 3 周
> **任务数:** 6 个
> **可并行:** Task 2.1 / 2.2 / 2.3 可并行Task 2.4 依赖 2.1Task 2.5 依赖 2.2 + 2.4
> **前置依赖:** Phase 1 完成
---
## 本阶段交付物
| 交付物 | 说明 |
|---|---|
| 报销任务 API | 创建/查询/列表 |
| 文件上传 API | MinIO 存储 + 票据管理 |
| OCR 服务 | 百度云 + Mock Provider |
| 规则引擎 | 6 条核心规则 + 管理 API |
| 影子账本 API | 草稿/预审结果查询 |
| 补件 + 提交 API | 补件交互 + 模拟同步 |
---
## 任务清单
### Task 2.1: 报销任务管理 API
**负责人:** 后端工程师 A
**预计工时:** 1.5 天
**前置依赖:** Phase 1
**Files:**
- Create: `backend/app/schemas/task.py`
- Create: `backend/app/services/task_service.py`
- Create: `backend/app/api/v1/tasks.py`
- Modify: `backend/app/api/v1/router.py`
- Test: `backend/tests/test_task_api.py`
- [ ] **Step 1: 定义 Pydantic schemas**
`backend/app/schemas/task.py`:
```python
from pydantic import BaseModel
from datetime import datetime
class TaskCreateRequest(BaseModel):
user_id: str
company_id: str
user_intent: str
entry_channel: str = "web"
class TaskResponse(BaseModel):
task_id: str
status: str
class Config:
from_attributes = True
class TaskDetailResponse(BaseModel):
id: str
user_id: str
company_id: str
task_type: str
status: str
user_intent: str | None
current_agent: str | None
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class TaskListResponse(BaseModel):
total: int
items: list[TaskDetailResponse]
```
- [ ] **Step 2: 实现 TaskService 业务逻辑**
`backend/app/services/task_service.py`:
```python
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from app.models.task import ReimbursementTask
from app.models.enums import TaskStatus
from app.schemas.task import TaskCreateRequest
class TaskService:
def __init__(self, db: AsyncSession):
self.db = db
async def create_task(self, req: TaskCreateRequest) -> ReimbursementTask:
task = ReimbursementTask(
user_id=req.user_id,
company_id=req.company_id,
user_intent=req.user_intent,
status=TaskStatus.MATERIAL_COLLECTING,
)
self.db.add(task)
await self.db.commit()
await self.db.refresh(task)
return task
async def get_task(self, task_id: str) -> ReimbursementTask | None:
result = await self.db.execute(
select(ReimbursementTask).where(ReimbursementTask.id == task_id)
)
return result.scalar_one_or_none()
async def list_tasks(self, user_id: str | None = None, status: str | None = None,
page: int = 1, size: int = 20) -> tuple[list[ReimbursementTask], int]:
query = select(ReimbursementTask)
count_query = select(func.count()).select_from(ReimbursementTask)
if user_id:
query = query.where(ReimbursementTask.user_id == user_id)
count_query = count_query.where(ReimbursementTask.user_id == user_id)
if status:
query = query.where(ReimbursementTask.status == status)
count_query = count_query.where(ReimbursementTask.status == status)
total_result = await self.db.execute(count_query)
total = total_result.scalar() or 0
query = query.offset((page - 1) * size).limit(size).order_by(ReimbursementTask.created_at.desc())
result = await self.db.execute(query)
return result.scalars().all(), total
async def update_status(self, task_id: str, status: TaskStatus, current_agent: str | None = None) -> ReimbursementTask | None:
task = await self.get_task(task_id)
if not task:
return None
task.status = status
task.current_agent = current_agent
await self.db.commit()
await self.db.refresh(task)
return task
```
- [ ] **Step 3: 实现 API 路由**
`backend/app/api/v1/tasks.py`:
```python
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.schemas.task import TaskCreateRequest, TaskResponse, TaskDetailResponse, TaskListResponse
from app.services.task_service import TaskService
router = APIRouter(prefix="/reimbursement/tasks", tags=["tasks"])
@router.post("", response_model=TaskResponse, status_code=201)
async def create_task(req: TaskCreateRequest, db: AsyncSession = Depends(get_db)):
svc = TaskService(db)
task = await svc.create_task(req)
return TaskResponse(task_id=task.id, status=task.status.value)
@router.get("/{task_id}", response_model=TaskDetailResponse)
async def get_task(task_id: str, db: AsyncSession = Depends(get_db)):
svc = TaskService(db)
task = await svc.get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@router.get("", response_model=TaskListResponse)
async def list_tasks(user_id: str | None = None, status: str | None = None,
page: int = 1, size: int = 20, db: AsyncSession = Depends(get_db)):
svc = TaskService(db)
items, total = await svc.list_tasks(user_id, status, page, size)
return TaskListResponse(total=total, items=items)
```
更新 `backend/app/api/v1/router.py`:
```python
from fastapi import APIRouter
from app.api.v1.tasks import router as tasks_router
api_router = APIRouter()
api_router.include_router(tasks_router)
```
- [ ] **Step 4: 编写测试**
```python
# backend/tests/test_task_api.py
import pytest
@pytest.mark.asyncio
async def test_create_task(client):
response = await client.post("/api/v1/reimbursement/tasks", json={
"user_id": "U001",
"company_id": "C001",
"user_intent": "我要报这次北京出差的费用",
"entry_channel": "web"
})
assert response.status_code == 201
data = response.json()
assert "task_id" in data
assert data["status"] == "material_collecting"
@pytest.mark.asyncio
async def test_get_task(client):
# 先创建
create_resp = await client.post("/api/v1/reimbursement/tasks", json={
"user_id": "U001", "company_id": "C001", "user_intent": "test"
})
task_id = create_resp.json()["task_id"]
# 再查询
get_resp = await client.get(f"/api/v1/reimbursement/tasks/{task_id}")
assert get_resp.status_code == 200
assert get_resp.json()["user_id"] == "U001"
@pytest.mark.asyncio
async def test_list_tasks(client):
response = await client.get("/api/v1/reimbursement/tasks")
assert response.status_code == 200
assert "total" in response.json()
assert "items" in response.json()
```
- [ ] **Step 5: 运行测试**
Run: `cd backend && pytest tests/test_task_api.py -v`
Expected: All PASS
- [ ] **Step 6: Commit**
```bash
git add backend/
git commit -m "feat: 实现报销任务管理 API创建/查询/列表)"
```
---
### Task 2.2: 文件上传与票据管理 API
**负责人:** 后端工程师 B
**预计工时:** 1.5 天
**前置依赖:** Phase 1
**Files:**
- Create: `backend/app/schemas/document.py`
- Create: `backend/app/services/document_service.py`
- Create: `backend/app/services/storage_service.py`
- Create: `backend/app/api/v1/documents.py`
- Modify: `backend/app/api/v1/router.py`
- Test: `backend/tests/test_document_api.py`
- [ ] **Step 1: 实现 MinIO 存储服务**
`backend/app/services/storage_service.py` — 封装 MinIO 操作:
- `upload_file(bucket, file_name, file_data, content_type)` → 上传文件到 MinIO
- `get_file_url(bucket, file_name)` → 获取文件访问 URL
- `delete_file(bucket, file_name)` → 删除文件
- `ensure_bucket(bucket)` → 确保 bucket 存在
开发阶段可使用 mock 实现(本地文件系统存储)。
- [ ] **Step 2: 实现文档服务**
`backend/app/services/document_service.py`:
- `upload_document(task_id, file, document_type)` → 保存文件到 MinIO创建 DB 记录
- `get_documents(task_id)` → 查询任务下所有票据
- `get_document(document_id)` → 查询单个票据
- `update_ocr_result(document_id, ocr_result)` → 更新 OCR 识别结果
- [ ] **Step 3: 定义 Pydantic schemas**
`backend/app/schemas/document.py`:
```python
from pydantic import BaseModel
from datetime import date
from decimal import Decimal
class DocumentUploadResponse(BaseModel):
document_id: str
ocr_status: str
class DocumentResponse(BaseModel):
id: str
task_id: str
document_type: str
file_url: str
ocr_status: str
invoice_code: str | None
invoice_number: str | None
invoice_date: date | None
amount: Decimal | None
seller_name: str | None
class Config:
from_attributes = True
```
- [ ] **Step 4: 实现 API 路由**
`backend/app/api/v1/documents.py`:
```python
from fastapi import APIRouter, Depends, UploadFile, File, Form, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.schemas.document import DocumentUploadResponse, DocumentResponse
from app.services.document_service import DocumentService
router = APIRouter(prefix="/reimbursement/tasks/{task_id}/documents", tags=["documents"])
@router.post("", response_model=DocumentUploadResponse, status_code=201)
async def upload_document(
task_id: str,
file: UploadFile = File(...),
document_type: str = Form(...),
db: AsyncSession = Depends(get_db)
):
svc = DocumentService(db)
doc = await svc.upload_document(task_id, file, document_type)
return DocumentUploadResponse(document_id=doc.id, ocr_status=doc.ocr_status)
@router.get("", response_model=list[DocumentResponse])
async def list_documents(task_id: str, db: AsyncSession = Depends(get_db)):
svc = DocumentService(db)
docs = await svc.get_documents(task_id)
return docs
```
`router.py` 中注册 documents_router。
- [ ] **Step 5: 编写测试(使用 mock MinIO**
- [ ] **Step 6: Commit**
```bash
git add backend/
git commit -m "feat: 实现文件上传与票据管理 APIMinIO 存储)"
```
---
### Task 2.3: OCR 服务集成
**负责人:** 后端工程师 B
**预计工时:** 2 天
**前置依赖:** Task 2.2(需要 document_service
**可并行于:** Task 2.1、Task 2.4
**Files:**
- Create: `backend/app/services/ocr_service.py`
- Create: `backend/app/services/ocr_providers/__init__.py`
- Create: `backend/app/services/ocr_providers/base.py`
- Create: `backend/app/services/ocr_providers/baidu.py`
- Create: `backend/app/services/ocr_providers/mock.py`
- Test: `backend/tests/test_ocr_service.py`
- [ ] **Step 1: 定义 OCR Provider 抽象接口**
`backend/app/services/ocr_providers/base.py`:
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
@dataclass
class OCRResult:
document_type: str # 识别出的票据类型
raw_text: str # 原始文字
fields: dict = field(default_factory=dict) # 结构化字段
confidence: float = 0.0 # 整体置信度 0-1
provider: str = "" # 提供商名称
class OCRProvider(ABC):
@abstractmethod
async def recognize(self, file_url: str, document_type: str | None = None) -> OCRResult:
...
@abstractmethod
async def recognize_vat_invoice(self, file_url: str) -> OCRResult:
...
@abstractmethod
async def recognize_train_ticket(self, file_url: str) -> OCRResult:
...
```
- [ ] **Step 2: 实现 Mock OCR Provider**
`backend/app/services/ocr_providers/mock.py` — 根据文件名/类型返回预定义的结构化数据:
```python
from app.services.ocr_providers.base import OCRProvider, OCRResult
class MockOCRProvider(OCRProvider):
async def recognize(self, file_url: str, document_type: str | None = None) -> OCRResult:
if document_type == "vat_invoice" or "invoice" in file_url:
return await self.recognize_vat_invoice(file_url)
elif document_type == "train_ticket" or "train" in file_url:
return await self.recognize_train_ticket(file_url)
return OCRResult(document_type="unknown", raw_text="", confidence=0.0, provider="mock")
async def recognize_vat_invoice(self, file_url: str) -> OCRResult:
return OCRResult(
document_type="vat_invoice",
raw_text="增值税电子普通发票",
fields={
"invoice_code": "050002100311",
"invoice_number": "23912077",
"invoice_date": "2026-04-20",
"amount": "1061.95",
"tax_amount": "61.95",
"total_amount": "1123.90",
"seller_name": "北京XX酒店管理有限公司",
"buyer_name": "XX科技有限公司",
"check_code": "1234567890",
},
confidence=0.95,
provider="mock"
)
async def recognize_train_ticket(self, file_url: str) -> OCRResult:
return OCRResult(
document_type="train_ticket",
raw_text="火车票",
fields={
"train_number": "G101",
"departure_station": "北京南",
"arrival_station": "上海虹桥",
"departure_date": "2026-04-18",
"departure_time": "07:00",
"seat_type": "二等座",
"amount": "553.00",
"passenger_name": "张三",
"id_number": "****1234",
},
confidence=0.90,
provider="mock"
)
```
- [ ] **Step 3: 实现百度 OCR Provider**
`backend/app/services/ocr_providers/baidu.py` — 调用百度云 OCR API
- `recognize_vat_invoice()` → 调用增值税发票识别接口
- `recognize_train_ticket()` → 调用火车票识别接口
- `recognize()` → 自动判断票据类型,调用对应接口
- 将百度返回结果标准化为 `OCRResult`
- 包含 access_token 获取和缓存逻辑
- [ ] **Step 4: 实现 OCR Service 门面**
`backend/app/services/ocr_service.py`:
```python
from app.core.config import settings
from app.services.ocr_providers.base import OCRResult
from app.services.ocr_providers.mock import MockOCRProvider
from app.services.ocr_providers.baidu import BaiduOCRProvider
class OCRService:
def __init__(self):
self._provider = self._create_provider()
def _create_provider(self):
if settings.OCR_PROVIDER == "mock":
return MockOCRProvider()
elif settings.OCR_PROVIDER == "baidu":
return BaiduOCRProvider(
api_key=settings.BAIDU_OCR_API_KEY,
secret_key=settings.BAIDU_OCR_SECRET_KEY
)
raise ValueError(f"Unknown OCR provider: {settings.OCR_PROVIDER}")
async def recognize(self, file_url: str, document_type: str | None = None) -> OCRResult:
return await self._provider.recognize(file_url, document_type)
```
- [ ] **Step 5: 编写测试**
使用 Mock Provider 测试完整 OCR 流程。
- [ ] **Step 6: Commit**
```bash
git add backend/
git commit -m "feat: 实现 OCR 服务(百度云 + Mock Provider"
```
---
### Task 2.4: 规则引擎
**负责人:** 后端工程师 A
**预计工时:** 3 天
**前置依赖:** Task 2.1(需要 task 模型)
**Files:**
- Create: `backend/app/services/rule_engine.py`
- Create: `backend/app/services/rule_checkers/__init__.py`
- Create: `backend/app/services/rule_checkers/base.py`
- Create: `backend/app/services/rule_checkers/required_fields.py`
- Create: `backend/app/services/rule_checkers/attachment_check.py`
- Create: `backend/app/services/rule_checkers/duplicate_invoice.py`
- Create: `backend/app/services/rule_checkers/amount_limit.py`
- Create: `backend/app/services/rule_checkers/date_validity.py`
- Create: `backend/app/services/rule_checkers/expense_type_match.py`
- Create: `backend/app/schemas/rule.py`
- Create: `backend/app/api/v1/rules.py`
- Modify: `backend/app/api/v1/router.py`
- Test: `backend/tests/test_rule_engine.py`
- [ ] **Step 1: 定义规则检查器基类**
`backend/app/services/rule_checkers/base.py`:
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class RuleCheckResult:
rule_code: str
severity: str # low / medium / high / blocked
action: str # pass / warn / require_explanation / require_attachment / require_approval / block
message: str
suggestion: str
policy_ref: str
hit_detail: dict
class RuleChecker(ABC):
@abstractmethod
async def check(self, context: dict) -> RuleCheckResult | None:
"""检查规则,命中返回结果,未命中返回 None"""
...
```
- [ ] **Step 2: 实现 RuleEngine 核心引擎**
`backend/app/services/rule_engine.py`:
```python
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.rule_checkers.base import RuleChecker, RuleCheckResult
from dataclasses import dataclass, field
@dataclass
class PrecheckResult:
precheck_status: str # pass / need_supplement / blocked
risk_level: str # low / medium / high / blocked
rule_hits: list[RuleCheckResult] = field(default_factory=list)
summary: str = ""
class RuleEngine:
def __init__(self, db: AsyncSession):
self.db = db
self.checkers: list[RuleChecker] = []
def register_checker(self, checker: RuleChecker):
self.checkers.append(checker)
async def run_precheck(self, context: dict) -> PrecheckResult:
"""执行完整预审,遍历所有注册的 checker"""
hits: list[RuleCheckResult] = []
for checker in self.checkers:
result = await checker.check(context)
if result:
hits.append(result)
risk_level = self._calculate_overall_risk(hits)
status = self._determine_status(hits)
summary = self._generate_summary(hits)
return PrecheckResult(
precheck_status=status,
risk_level=risk_level,
rule_hits=hits,
summary=summary
)
def _calculate_overall_risk(self, hits: list[RuleCheckResult]) -> str:
if not hits:
return "low"
severity_order = {"blocked": 4, "high": 3, "medium": 2, "low": 1}
max_severity = max(hits, key=lambda h: severity_order.get(h.severity, 0))
return max_severity.severity
def _determine_status(self, hits: list[RuleCheckResult]) -> str:
if not hits:
return "pass"
if any(h.action == "block" for h in hits):
return "blocked"
return "need_supplement"
def _generate_summary(self, hits: list[RuleCheckResult]) -> str:
if not hits:
return "预审通过,未发现风险。"
blocked = sum(1 for h in hits if h.action == "block")
warnings = sum(1 for h in hits if h.action in ("warn", "require_explanation"))
supplements = sum(1 for h in hits if h.action == "require_attachment")
parts = []
if blocked:
parts.append(f"{blocked} 个阻断项")
if supplements:
parts.append(f"{supplements} 个缺件")
if warnings:
parts.append(f"{warnings} 个风险提示")
return f"当前报销单存在{'、'.join(parts)}。"
```
- [ ] **Step 3: 实现 6 条核心规则检查器**
1. **`required_fields.py`** — `RequiredFieldsChecker` — 必填字段校验
- 检查报销人、部门、事由、费用明细是否有空值
- 命中返回 `require_explanation`
2. **`attachment_check.py`** — `AttachmentCheckChecker` — 附件完整性校验
- 住宿费必须上传酒店流水
- 交通费必须上传对应票据
- 命中返回 `require_attachment`
3. **`duplicate_invoice.py`** — `DuplicateInvoiceChecker` — 重复发票检查
- 检查 invoice_code + invoice_number + amount 是否重复
- 命中返回 `block`
4. **`amount_limit.py`** — `AmountLimitChecker` — 金额超标校验
- 按城市等级和费用类型检查标准
- 住宿费按每晚金额检查
- 命中返回 `require_explanation`
5. **`date_validity.py`** — `DateValidityChecker` — 日期合理性校验
- 费用日期不能晚于今天
- 费用日期应在出差期间内
- 命中返回 `warn`
6. **`expense_type_match.py`** — `ExpenseTypeMatchChecker` — 费用类型匹配校验
- 住宿费应关联 hotel_bill 类型票据
- 交通费应关联 train_ticket / flight_itinerary / taxi_receipt
- 命中返回 `warn`
- [ ] **Step 4: 实现规则管理 API**
- `GET /api/v1/rules` — 列出所有规则
- `POST /api/v1/rules` — 创建规则
- `PUT /api/v1/rules/{rule_id}` — 更新规则
- `PATCH /api/v1/rules/{rule_id}/toggle` — 启用/禁用规则
- [ ] **Step 5: 编写测试**
对每条规则编写单元测试:
```python
@pytest.mark.asyncio
async def test_duplicate_invoice_checker():
checker = DuplicateInvoiceChecker()
# 模拟重复发票场景
context = {"items": [...], "existing_invoices": [...]}
result = await checker.check(context)
assert result is not None
assert result.action == "block"
@pytest.mark.asyncio
async def test_no_duplicate():
checker = DuplicateInvoiceChecker()
context = {"items": [...], "existing_invoices": []} # 无重复
result = await checker.check(context)
assert result is None
```
- [ ] **Step 6: Commit**
```bash
git add backend/
git commit -m "feat: 实现规则引擎6 条核心规则 + 管理 API"
```
---
### Task 2.5: 影子报销账本 CRUD
**负责人:** 后端工程师 A
**预计工时:** 1.5 天
**前置依赖:** Task 2.1 + Task 2.4
**Files:**
- Create: `backend/app/schemas/reimbursement.py`
- Create: `backend/app/services/ledger_service.py`
- Create: `backend/app/api/v1/ledger.py`
- Modify: `backend/app/api/v1/router.py`
- Test: `backend/tests/test_ledger_api.py`
- [ ] **Step 1: 定义 Pydantic schemas**
`backend/app/schemas/reimbursement.py`:
```python
from pydantic import BaseModel
from datetime import date, datetime
from decimal import Decimal
class ReimbursementItemResponse(BaseModel):
id: str
expense_type: str
amount: Decimal
tax_amount: Decimal | None
occurred_at: date | None
city: str | None
vendor_name: str | None
risk_level: str | None
remark: str | None
class Config:
from_attributes = True
class ReimbursementDraftResponse(BaseModel):
reimbursement_id: str
reason: str | None
total_amount: Decimal
precheck_status: str | None
risk_level: str | None
items: list[ReimbursementItemResponse]
created_at: datetime
class Config:
from_attributes = True
class PrecheckResultResponse(BaseModel):
precheck_status: str
risk_level: str
summary: str
rule_hits: list[dict]
```
- [ ] **Step 2: 实现 LedgerService**
`backend/app/services/ledger_service.py` — 核心方法:
- `create_shadow_reimbursement(task_id, data)` → 创建影子报销记录
- `get_draft(reimbursement_id)` → 获取报销草稿
- `get_draft_by_task(task_id)` → 通过任务 ID 获取草稿
- `update_precheck_result(reimbursement_id, result)` → 更新预审结果
- `add_item(reimbursement_id, item_data)` → 添加报销明细
- [ ] **Step 3: 实现 API 路由**
`GET /api/v1/reimbursement/tasks/{task_id}/draft` — 获取报销草稿(对应文档 8.4
`GET /api/v1/reimbursement/tasks/{task_id}/precheck-result` — 获取预审结果(对应文档 8.5
- [ ] **Step 4: 编写测试**
- [ ] **Step 5: Commit**
```bash
git add backend/
git commit -m "feat: 实现影子报销账本 CRUD API"
```
---
### Task 2.6: 补件与提交 API
**负责人:** 后端工程师 A
**预计工时:** 1.5 天
**前置依赖:** Task 2.5
**Files:**
- Create: `backend/app/api/v1/supplements.py`
- Create: `backend/app/services/supplement_service.py`
- Create: `backend/app/services/sync_service.py`
- Modify: `backend/app/api/v1/router.py`
- Test: `backend/tests/test_supplement_api.py`
- [ ] **Step 1: 实现补件服务**
`backend/app/services/supplement_service.py`:
- `create_supplement_request(reimbursement_id, items)` → 创建补件请求
- `respond_supplement(request_id, response_text, document_ids)` → 用户补件响应
- `get_supplement_requests(task_id)` → 查询补件请求列表
- [ ] **Step 2: 实现同步服务MVP 阶段为模拟)**
`backend/app/services/sync_service.py`:
```python
import uuid
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
class SyncService:
def __init__(self, db: AsyncSession):
self.db = db
async def mock_sync_to_backend(self, reimbursement_id: str) -> dict:
"""模拟后端同步,生成假的 backend_bill_id"""
backend_bill_id = f"BX{datetime.now().strftime('%Y%m%d')}{str(uuid.uuid4())[:6]}"
return {
"sync_status": "success",
"target_system": "expense_system",
"backend_bill_id": backend_bill_id,
}
async def get_sync_status(self, task_id: str) -> dict | None:
"""查询同步状态"""
# 从 sync_record 表查询
...
```
- [ ] **Step 3: 实现 API 路由**
`POST /api/v1/reimbursement/tasks/{task_id}/supplements` — 用户补件(对应文档 8.6
`POST /api/v1/reimbursement/tasks/{task_id}/submit` — 用户确认提交(对应文档 8.7
`GET /api/v1/reimbursement/tasks/{task_id}/sync-status` — 查询同步状态(对应文档 8.8
- [ ] **Step 4: 编写测试**
- [ ] **Step 5: Commit**
```bash
git add backend/
git commit -m "feat: 实现补件与提交确认 API含模拟同步"
```
---
## 本阶段完成检查
- [ ] `POST /api/v1/reimbursement/tasks` 创建任务返回 201
- [ ] `POST /api/v1/reimbursement/tasks/{id}/documents` 上传文件返回 201
- [ ] OCR Service 对 Mock Provider 正常返回结构化数据
- [ ] 规则引擎对 6 条规则命中/未命中的测试全部通过
- [ ] `GET /api/v1/reimbursement/tasks/{id}/draft` 返回草稿数据
- [ ] `POST /api/v1/reimbursement/tasks/{id}/supplements` 补件返回 received
- [ ] `POST /api/v1/reimbursement/tasks/{id}/submit` 提交返回 submitting
- [ ] 所有测试 `pytest tests/ -v` 全部通过