Files
X-Financial/docs/plans/phase-2-backend-core/README.md
WIN-JHFT4D3SIVT\caoxiaozhu 7141e1d11a feat: refactor monolithic App.vue into modular Vue component architecture
- Extract 711-line App.vue into 15+ focused files across 5 directories
- Add data layer (icons, metrics, policies, auditTrail, requests)
- Add composables (useNavigation, useRequests, useChat, useToast)
- Add layout components (SidebarRail, TopBar, FilterBar)
- Add shared components (PanelHead, InfoRow, ToastNotification)
- Add business component (RequestTable) and 5 view components
- Extract global CSS to assets/styles/global.css
- Add start.sh with WSL/Windows cross-platform support
- Add .gitignore for node_modules, dist, and IDE dirs
2026-04-28 17:20:52 +08:00

835 lines
26 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Phase 2: 后端核心服务W2-W3
> **目标:** 实现所有后端业务 API包括任务管理、文件上传、OCR 集成、规则引擎、影子账本、补件与提交。
> **周期:** 第 2 ~ 3 周
> **任务数:** 6 个
> **可并行:** Task 2.1 / 2.2 / 2.3 可并行Task 2.4 依赖 2.1Task 2.5 依赖 2.2 + 2.4
> **前置依赖:** Phase 1 完成
---
## 本阶段交付物
| 交付物 | 说明 |
|---|---|
| 报销任务 API | 创建/查询/列表 |
| 文件上传 API | MinIO 存储 + 票据管理 |
| OCR 服务 | 百度云 + Mock Provider |
| 规则引擎 | 6 条核心规则 + 管理 API |
| 影子账本 API | 草稿/预审结果查询 |
| 补件 + 提交 API | 补件交互 + 模拟同步 |
---
## 任务清单
### Task 2.1: 报销任务管理 API
**负责人:** 后端工程师 A
**预计工时:** 1.5 天
**前置依赖:** Phase 1
**Files:**
- Create: `backend/app/schemas/task.py`
- Create: `backend/app/services/task_service.py`
- Create: `backend/app/api/v1/tasks.py`
- Modify: `backend/app/api/v1/router.py`
- Test: `backend/tests/test_task_api.py`
- [ ] **Step 1: 定义 Pydantic schemas**
`backend/app/schemas/task.py`:
```python
from pydantic import BaseModel
from datetime import datetime
class TaskCreateRequest(BaseModel):
user_id: str
company_id: str
user_intent: str
entry_channel: str = "web"
class TaskResponse(BaseModel):
task_id: str
status: str
class Config:
from_attributes = True
class TaskDetailResponse(BaseModel):
id: str
user_id: str
company_id: str
task_type: str
status: str
user_intent: str | None
current_agent: str | None
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class TaskListResponse(BaseModel):
total: int
items: list[TaskDetailResponse]
```
- [ ] **Step 2: 实现 TaskService 业务逻辑**
`backend/app/services/task_service.py`:
```python
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from app.models.task import ReimbursementTask
from app.models.enums import TaskStatus
from app.schemas.task import TaskCreateRequest
class TaskService:
def __init__(self, db: AsyncSession):
self.db = db
async def create_task(self, req: TaskCreateRequest) -> ReimbursementTask:
task = ReimbursementTask(
user_id=req.user_id,
company_id=req.company_id,
user_intent=req.user_intent,
status=TaskStatus.MATERIAL_COLLECTING,
)
self.db.add(task)
await self.db.commit()
await self.db.refresh(task)
return task
async def get_task(self, task_id: str) -> ReimbursementTask | None:
result = await self.db.execute(
select(ReimbursementTask).where(ReimbursementTask.id == task_id)
)
return result.scalar_one_or_none()
async def list_tasks(self, user_id: str | None = None, status: str | None = None,
page: int = 1, size: int = 20) -> tuple[list[ReimbursementTask], int]:
query = select(ReimbursementTask)
count_query = select(func.count()).select_from(ReimbursementTask)
if user_id:
query = query.where(ReimbursementTask.user_id == user_id)
count_query = count_query.where(ReimbursementTask.user_id == user_id)
if status:
query = query.where(ReimbursementTask.status == status)
count_query = count_query.where(ReimbursementTask.status == status)
total_result = await self.db.execute(count_query)
total = total_result.scalar() or 0
query = query.offset((page - 1) * size).limit(size).order_by(ReimbursementTask.created_at.desc())
result = await self.db.execute(query)
return result.scalars().all(), total
async def update_status(self, task_id: str, status: TaskStatus, current_agent: str | None = None) -> ReimbursementTask | None:
task = await self.get_task(task_id)
if not task:
return None
task.status = status
task.current_agent = current_agent
await self.db.commit()
await self.db.refresh(task)
return task
```
- [ ] **Step 3: 实现 API 路由**
`backend/app/api/v1/tasks.py`:
```python
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.schemas.task import TaskCreateRequest, TaskResponse, TaskDetailResponse, TaskListResponse
from app.services.task_service import TaskService
router = APIRouter(prefix="/reimbursement/tasks", tags=["tasks"])
@router.post("", response_model=TaskResponse, status_code=201)
async def create_task(req: TaskCreateRequest, db: AsyncSession = Depends(get_db)):
svc = TaskService(db)
task = await svc.create_task(req)
return TaskResponse(task_id=task.id, status=task.status.value)
@router.get("/{task_id}", response_model=TaskDetailResponse)
async def get_task(task_id: str, db: AsyncSession = Depends(get_db)):
svc = TaskService(db)
task = await svc.get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@router.get("", response_model=TaskListResponse)
async def list_tasks(user_id: str | None = None, status: str | None = None,
page: int = 1, size: int = 20, db: AsyncSession = Depends(get_db)):
svc = TaskService(db)
items, total = await svc.list_tasks(user_id, status, page, size)
return TaskListResponse(total=total, items=items)
```
更新 `backend/app/api/v1/router.py`:
```python
from fastapi import APIRouter
from app.api.v1.tasks import router as tasks_router
api_router = APIRouter()
api_router.include_router(tasks_router)
```
- [ ] **Step 4: 编写测试**
```python
# backend/tests/test_task_api.py
import pytest
@pytest.mark.asyncio
async def test_create_task(client):
response = await client.post("/api/v1/reimbursement/tasks", json={
"user_id": "U001",
"company_id": "C001",
"user_intent": "我要报这次北京出差的费用",
"entry_channel": "web"
})
assert response.status_code == 201
data = response.json()
assert "task_id" in data
assert data["status"] == "material_collecting"
@pytest.mark.asyncio
async def test_get_task(client):
# 先创建
create_resp = await client.post("/api/v1/reimbursement/tasks", json={
"user_id": "U001", "company_id": "C001", "user_intent": "test"
})
task_id = create_resp.json()["task_id"]
# 再查询
get_resp = await client.get(f"/api/v1/reimbursement/tasks/{task_id}")
assert get_resp.status_code == 200
assert get_resp.json()["user_id"] == "U001"
@pytest.mark.asyncio
async def test_list_tasks(client):
response = await client.get("/api/v1/reimbursement/tasks")
assert response.status_code == 200
assert "total" in response.json()
assert "items" in response.json()
```
- [ ] **Step 5: 运行测试**
Run: `cd backend && pytest tests/test_task_api.py -v`
Expected: All PASS
- [ ] **Step 6: Commit**
```bash
git add backend/
git commit -m "feat: 实现报销任务管理 API创建/查询/列表)"
```
---
### Task 2.2: 文件上传与票据管理 API
**负责人:** 后端工程师 B
**预计工时:** 1.5 天
**前置依赖:** Phase 1
**Files:**
- Create: `backend/app/schemas/document.py`
- Create: `backend/app/services/document_service.py`
- Create: `backend/app/services/storage_service.py`
- Create: `backend/app/api/v1/documents.py`
- Modify: `backend/app/api/v1/router.py`
- Test: `backend/tests/test_document_api.py`
- [ ] **Step 1: 实现 MinIO 存储服务**
`backend/app/services/storage_service.py` — 封装 MinIO 操作:
- `upload_file(bucket, file_name, file_data, content_type)` → 上传文件到 MinIO
- `get_file_url(bucket, file_name)` → 获取文件访问 URL
- `delete_file(bucket, file_name)` → 删除文件
- `ensure_bucket(bucket)` → 确保 bucket 存在
开发阶段可使用 mock 实现(本地文件系统存储)。
- [ ] **Step 2: 实现文档服务**
`backend/app/services/document_service.py`:
- `upload_document(task_id, file, document_type)` → 保存文件到 MinIO创建 DB 记录
- `get_documents(task_id)` → 查询任务下所有票据
- `get_document(document_id)` → 查询单个票据
- `update_ocr_result(document_id, ocr_result)` → 更新 OCR 识别结果
- [ ] **Step 3: 定义 Pydantic schemas**
`backend/app/schemas/document.py`:
```python
from pydantic import BaseModel
from datetime import date
from decimal import Decimal
class DocumentUploadResponse(BaseModel):
document_id: str
ocr_status: str
class DocumentResponse(BaseModel):
id: str
task_id: str
document_type: str
file_url: str
ocr_status: str
invoice_code: str | None
invoice_number: str | None
invoice_date: date | None
amount: Decimal | None
seller_name: str | None
class Config:
from_attributes = True
```
- [ ] **Step 4: 实现 API 路由**
`backend/app/api/v1/documents.py`:
```python
from fastapi import APIRouter, Depends, UploadFile, File, Form, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.schemas.document import DocumentUploadResponse, DocumentResponse
from app.services.document_service import DocumentService
router = APIRouter(prefix="/reimbursement/tasks/{task_id}/documents", tags=["documents"])
@router.post("", response_model=DocumentUploadResponse, status_code=201)
async def upload_document(
task_id: str,
file: UploadFile = File(...),
document_type: str = Form(...),
db: AsyncSession = Depends(get_db)
):
svc = DocumentService(db)
doc = await svc.upload_document(task_id, file, document_type)
return DocumentUploadResponse(document_id=doc.id, ocr_status=doc.ocr_status)
@router.get("", response_model=list[DocumentResponse])
async def list_documents(task_id: str, db: AsyncSession = Depends(get_db)):
svc = DocumentService(db)
docs = await svc.get_documents(task_id)
return docs
```
`router.py` 中注册 documents_router。
- [ ] **Step 5: 编写测试(使用 mock MinIO**
- [ ] **Step 6: Commit**
```bash
git add backend/
git commit -m "feat: 实现文件上传与票据管理 APIMinIO 存储)"
```
---
### Task 2.3: OCR 服务集成
**负责人:** 后端工程师 B
**预计工时:** 2 天
**前置依赖:** Task 2.2(需要 document_service
**可并行于:** Task 2.1、Task 2.4
**Files:**
- Create: `backend/app/services/ocr_service.py`
- Create: `backend/app/services/ocr_providers/__init__.py`
- Create: `backend/app/services/ocr_providers/base.py`
- Create: `backend/app/services/ocr_providers/baidu.py`
- Create: `backend/app/services/ocr_providers/mock.py`
- Test: `backend/tests/test_ocr_service.py`
- [ ] **Step 1: 定义 OCR Provider 抽象接口**
`backend/app/services/ocr_providers/base.py`:
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
@dataclass
class OCRResult:
document_type: str # 识别出的票据类型
raw_text: str # 原始文字
fields: dict = field(default_factory=dict) # 结构化字段
confidence: float = 0.0 # 整体置信度 0-1
provider: str = "" # 提供商名称
class OCRProvider(ABC):
@abstractmethod
async def recognize(self, file_url: str, document_type: str | None = None) -> OCRResult:
...
@abstractmethod
async def recognize_vat_invoice(self, file_url: str) -> OCRResult:
...
@abstractmethod
async def recognize_train_ticket(self, file_url: str) -> OCRResult:
...
```
- [ ] **Step 2: 实现 Mock OCR Provider**
`backend/app/services/ocr_providers/mock.py` — 根据文件名/类型返回预定义的结构化数据:
```python
from app.services.ocr_providers.base import OCRProvider, OCRResult
class MockOCRProvider(OCRProvider):
async def recognize(self, file_url: str, document_type: str | None = None) -> OCRResult:
if document_type == "vat_invoice" or "invoice" in file_url:
return await self.recognize_vat_invoice(file_url)
elif document_type == "train_ticket" or "train" in file_url:
return await self.recognize_train_ticket(file_url)
return OCRResult(document_type="unknown", raw_text="", confidence=0.0, provider="mock")
async def recognize_vat_invoice(self, file_url: str) -> OCRResult:
return OCRResult(
document_type="vat_invoice",
raw_text="增值税电子普通发票",
fields={
"invoice_code": "050002100311",
"invoice_number": "23912077",
"invoice_date": "2026-04-20",
"amount": "1061.95",
"tax_amount": "61.95",
"total_amount": "1123.90",
"seller_name": "北京XX酒店管理有限公司",
"buyer_name": "XX科技有限公司",
"check_code": "1234567890",
},
confidence=0.95,
provider="mock"
)
async def recognize_train_ticket(self, file_url: str) -> OCRResult:
return OCRResult(
document_type="train_ticket",
raw_text="火车票",
fields={
"train_number": "G101",
"departure_station": "北京南",
"arrival_station": "上海虹桥",
"departure_date": "2026-04-18",
"departure_time": "07:00",
"seat_type": "二等座",
"amount": "553.00",
"passenger_name": "张三",
"id_number": "****1234",
},
confidence=0.90,
provider="mock"
)
```
- [ ] **Step 3: 实现百度 OCR Provider**
`backend/app/services/ocr_providers/baidu.py` — 调用百度云 OCR API
- `recognize_vat_invoice()` → 调用增值税发票识别接口
- `recognize_train_ticket()` → 调用火车票识别接口
- `recognize()` → 自动判断票据类型,调用对应接口
- 将百度返回结果标准化为 `OCRResult`
- 包含 access_token 获取和缓存逻辑
- [ ] **Step 4: 实现 OCR Service 门面**
`backend/app/services/ocr_service.py`:
```python
from app.core.config import settings
from app.services.ocr_providers.base import OCRResult
from app.services.ocr_providers.mock import MockOCRProvider
from app.services.ocr_providers.baidu import BaiduOCRProvider
class OCRService:
def __init__(self):
self._provider = self._create_provider()
def _create_provider(self):
if settings.OCR_PROVIDER == "mock":
return MockOCRProvider()
elif settings.OCR_PROVIDER == "baidu":
return BaiduOCRProvider(
api_key=settings.BAIDU_OCR_API_KEY,
secret_key=settings.BAIDU_OCR_SECRET_KEY
)
raise ValueError(f"Unknown OCR provider: {settings.OCR_PROVIDER}")
async def recognize(self, file_url: str, document_type: str | None = None) -> OCRResult:
return await self._provider.recognize(file_url, document_type)
```
- [ ] **Step 5: 编写测试**
使用 Mock Provider 测试完整 OCR 流程。
- [ ] **Step 6: Commit**
```bash
git add backend/
git commit -m "feat: 实现 OCR 服务(百度云 + Mock Provider"
```
---
### Task 2.4: 规则引擎
**负责人:** 后端工程师 A
**预计工时:** 3 天
**前置依赖:** Task 2.1(需要 task 模型)
**Files:**
- Create: `backend/app/services/rule_engine.py`
- Create: `backend/app/services/rule_checkers/__init__.py`
- Create: `backend/app/services/rule_checkers/base.py`
- Create: `backend/app/services/rule_checkers/required_fields.py`
- Create: `backend/app/services/rule_checkers/attachment_check.py`
- Create: `backend/app/services/rule_checkers/duplicate_invoice.py`
- Create: `backend/app/services/rule_checkers/amount_limit.py`
- Create: `backend/app/services/rule_checkers/date_validity.py`
- Create: `backend/app/services/rule_checkers/expense_type_match.py`
- Create: `backend/app/schemas/rule.py`
- Create: `backend/app/api/v1/rules.py`
- Modify: `backend/app/api/v1/router.py`
- Test: `backend/tests/test_rule_engine.py`
- [ ] **Step 1: 定义规则检查器基类**
`backend/app/services/rule_checkers/base.py`:
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class RuleCheckResult:
rule_code: str
severity: str # low / medium / high / blocked
action: str # pass / warn / require_explanation / require_attachment / require_approval / block
message: str
suggestion: str
policy_ref: str
hit_detail: dict
class RuleChecker(ABC):
@abstractmethod
async def check(self, context: dict) -> RuleCheckResult | None:
"""检查规则,命中返回结果,未命中返回 None"""
...
```
- [ ] **Step 2: 实现 RuleEngine 核心引擎**
`backend/app/services/rule_engine.py`:
```python
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.rule_checkers.base import RuleChecker, RuleCheckResult
from dataclasses import dataclass, field
@dataclass
class PrecheckResult:
precheck_status: str # pass / need_supplement / blocked
risk_level: str # low / medium / high / blocked
rule_hits: list[RuleCheckResult] = field(default_factory=list)
summary: str = ""
class RuleEngine:
def __init__(self, db: AsyncSession):
self.db = db
self.checkers: list[RuleChecker] = []
def register_checker(self, checker: RuleChecker):
self.checkers.append(checker)
async def run_precheck(self, context: dict) -> PrecheckResult:
"""执行完整预审,遍历所有注册的 checker"""
hits: list[RuleCheckResult] = []
for checker in self.checkers:
result = await checker.check(context)
if result:
hits.append(result)
risk_level = self._calculate_overall_risk(hits)
status = self._determine_status(hits)
summary = self._generate_summary(hits)
return PrecheckResult(
precheck_status=status,
risk_level=risk_level,
rule_hits=hits,
summary=summary
)
def _calculate_overall_risk(self, hits: list[RuleCheckResult]) -> str:
if not hits:
return "low"
severity_order = {"blocked": 4, "high": 3, "medium": 2, "low": 1}
max_severity = max(hits, key=lambda h: severity_order.get(h.severity, 0))
return max_severity.severity
def _determine_status(self, hits: list[RuleCheckResult]) -> str:
if not hits:
return "pass"
if any(h.action == "block" for h in hits):
return "blocked"
return "need_supplement"
def _generate_summary(self, hits: list[RuleCheckResult]) -> str:
if not hits:
return "预审通过,未发现风险。"
blocked = sum(1 for h in hits if h.action == "block")
warnings = sum(1 for h in hits if h.action in ("warn", "require_explanation"))
supplements = sum(1 for h in hits if h.action == "require_attachment")
parts = []
if blocked:
parts.append(f"{blocked} 个阻断项")
if supplements:
parts.append(f"{supplements} 个缺件")
if warnings:
parts.append(f"{warnings} 个风险提示")
return f"当前报销单存在{''.join(parts)}"
```
- [ ] **Step 3: 实现 6 条核心规则检查器**
1. **`required_fields.py`** — `RequiredFieldsChecker` — 必填字段校验
- 检查报销人、部门、事由、费用明细是否有空值
- 命中返回 `require_explanation`
2. **`attachment_check.py`** — `AttachmentCheckChecker` — 附件完整性校验
- 住宿费必须上传酒店流水
- 交通费必须上传对应票据
- 命中返回 `require_attachment`
3. **`duplicate_invoice.py`** — `DuplicateInvoiceChecker` — 重复发票检查
- 检查 invoice_code + invoice_number + amount 是否重复
- 命中返回 `block`
4. **`amount_limit.py`** — `AmountLimitChecker` — 金额超标校验
- 按城市等级和费用类型检查标准
- 住宿费按每晚金额检查
- 命中返回 `require_explanation`
5. **`date_validity.py`** — `DateValidityChecker` — 日期合理性校验
- 费用日期不能晚于今天
- 费用日期应在出差期间内
- 命中返回 `warn`
6. **`expense_type_match.py`** — `ExpenseTypeMatchChecker` — 费用类型匹配校验
- 住宿费应关联 hotel_bill 类型票据
- 交通费应关联 train_ticket / flight_itinerary / taxi_receipt
- 命中返回 `warn`
- [ ] **Step 4: 实现规则管理 API**
- `GET /api/v1/rules` — 列出所有规则
- `POST /api/v1/rules` — 创建规则
- `PUT /api/v1/rules/{rule_id}` — 更新规则
- `PATCH /api/v1/rules/{rule_id}/toggle` — 启用/禁用规则
- [ ] **Step 5: 编写测试**
对每条规则编写单元测试:
```python
@pytest.mark.asyncio
async def test_duplicate_invoice_checker():
checker = DuplicateInvoiceChecker()
# 模拟重复发票场景
context = {"items": [...], "existing_invoices": [...]}
result = await checker.check(context)
assert result is not None
assert result.action == "block"
@pytest.mark.asyncio
async def test_no_duplicate():
checker = DuplicateInvoiceChecker()
context = {"items": [...], "existing_invoices": []} # 无重复
result = await checker.check(context)
assert result is None
```
- [ ] **Step 6: Commit**
```bash
git add backend/
git commit -m "feat: 实现规则引擎6 条核心规则 + 管理 API"
```
---
### Task 2.5: 影子报销账本 CRUD
**负责人:** 后端工程师 A
**预计工时:** 1.5 天
**前置依赖:** Task 2.1 + Task 2.4
**Files:**
- Create: `backend/app/schemas/reimbursement.py`
- Create: `backend/app/services/ledger_service.py`
- Create: `backend/app/api/v1/ledger.py`
- Modify: `backend/app/api/v1/router.py`
- Test: `backend/tests/test_ledger_api.py`
- [ ] **Step 1: 定义 Pydantic schemas**
`backend/app/schemas/reimbursement.py`:
```python
from pydantic import BaseModel
from datetime import date, datetime
from decimal import Decimal
class ReimbursementItemResponse(BaseModel):
id: str
expense_type: str
amount: Decimal
tax_amount: Decimal | None
occurred_at: date | None
city: str | None
vendor_name: str | None
risk_level: str | None
remark: str | None
class Config:
from_attributes = True
class ReimbursementDraftResponse(BaseModel):
reimbursement_id: str
reason: str | None
total_amount: Decimal
precheck_status: str | None
risk_level: str | None
items: list[ReimbursementItemResponse]
created_at: datetime
class Config:
from_attributes = True
class PrecheckResultResponse(BaseModel):
precheck_status: str
risk_level: str
summary: str
rule_hits: list[dict]
```
- [ ] **Step 2: 实现 LedgerService**
`backend/app/services/ledger_service.py` — 核心方法:
- `create_shadow_reimbursement(task_id, data)` → 创建影子报销记录
- `get_draft(reimbursement_id)` → 获取报销草稿
- `get_draft_by_task(task_id)` → 通过任务 ID 获取草稿
- `update_precheck_result(reimbursement_id, result)` → 更新预审结果
- `add_item(reimbursement_id, item_data)` → 添加报销明细
- [ ] **Step 3: 实现 API 路由**
`GET /api/v1/reimbursement/tasks/{task_id}/draft` — 获取报销草稿(对应文档 8.4
`GET /api/v1/reimbursement/tasks/{task_id}/precheck-result` — 获取预审结果(对应文档 8.5
- [ ] **Step 4: 编写测试**
- [ ] **Step 5: Commit**
```bash
git add backend/
git commit -m "feat: 实现影子报销账本 CRUD API"
```
---
### Task 2.6: 补件与提交 API
**负责人:** 后端工程师 A
**预计工时:** 1.5 天
**前置依赖:** Task 2.5
**Files:**
- Create: `backend/app/api/v1/supplements.py`
- Create: `backend/app/services/supplement_service.py`
- Create: `backend/app/services/sync_service.py`
- Modify: `backend/app/api/v1/router.py`
- Test: `backend/tests/test_supplement_api.py`
- [ ] **Step 1: 实现补件服务**
`backend/app/services/supplement_service.py`:
- `create_supplement_request(reimbursement_id, items)` → 创建补件请求
- `respond_supplement(request_id, response_text, document_ids)` → 用户补件响应
- `get_supplement_requests(task_id)` → 查询补件请求列表
- [ ] **Step 2: 实现同步服务MVP 阶段为模拟)**
`backend/app/services/sync_service.py`:
```python
import uuid
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
class SyncService:
def __init__(self, db: AsyncSession):
self.db = db
async def mock_sync_to_backend(self, reimbursement_id: str) -> dict:
"""模拟后端同步,生成假的 backend_bill_id"""
backend_bill_id = f"BX{datetime.now().strftime('%Y%m%d')}{str(uuid.uuid4())[:6]}"
return {
"sync_status": "success",
"target_system": "expense_system",
"backend_bill_id": backend_bill_id,
}
async def get_sync_status(self, task_id: str) -> dict | None:
"""查询同步状态"""
# 从 sync_record 表查询
...
```
- [ ] **Step 3: 实现 API 路由**
`POST /api/v1/reimbursement/tasks/{task_id}/supplements` — 用户补件(对应文档 8.6
`POST /api/v1/reimbursement/tasks/{task_id}/submit` — 用户确认提交(对应文档 8.7
`GET /api/v1/reimbursement/tasks/{task_id}/sync-status` — 查询同步状态(对应文档 8.8
- [ ] **Step 4: 编写测试**
- [ ] **Step 5: Commit**
```bash
git add backend/
git commit -m "feat: 实现补件与提交确认 API含模拟同步"
```
---
## 本阶段完成检查
- [ ] `POST /api/v1/reimbursement/tasks` 创建任务返回 201
- [ ] `POST /api/v1/reimbursement/tasks/{id}/documents` 上传文件返回 201
- [ ] OCR Service 对 Mock Provider 正常返回结构化数据
- [ ] 规则引擎对 6 条规则命中/未命中的测试全部通过
- [ ] `GET /api/v1/reimbursement/tasks/{id}/draft` 返回草稿数据
- [ ] `POST /api/v1/reimbursement/tasks/{id}/supplements` 补件返回 received
- [ ] `POST /api/v1/reimbursement/tasks/{id}/submit` 提交返回 submitting
- [ ] 所有测试 `pytest tests/ -v` 全部通过