Files
X-Financial/server/tests/test_attachment_association_jobs.py
caoxiaozhu 9a5ed0e94a feat(server): 系统缓存清理接口与 OCR 文本层兜底增强
- 新增 system_cache 模块与 POST /settings/cache/clear,管理员可一键清理 OCR 结果/运行时配置/模型失败冷却/知识库索引/地点语义等进程内缓存
- 各服务暴露 clear_*_cache 方法(ocr/runtime_settings/runtime_chat/knowledge/application_location_semantic),SettingsCacheClearRead 汇总清理项
- OCR 转图片失败时尝试用 PDF 文本层兜底构建识别文档(有效字符≥8),并写结果缓存;OcrService 暴露 clear_result_cache
- receipt_folder 车票过滤补充身份证号关键词,附件文档/操作/展示模块同步适配
- 新增 system_cache_endpoints 测试,更新 openapi_schema/ocr/receipt_folder/attachment_association_jobs 测试
2026-06-24 12:35:51 +08:00

517 lines
20 KiB
Python

from __future__ import annotations
import base64
from collections.abc import Generator
from datetime import UTC, date, datetime
from decimal import Decimal
from fastapi.testclient import TestClient
from sqlalchemy import select
from sqlalchemy.orm import Session, selectinload
from app.api.deps import CurrentUserContext, get_db
from app.api.v1.endpoints import attachment_association_jobs as attachment_jobs_endpoint
from app.core.config import get_settings
from app.main import create_app
from app.models.employee import Employee
from app.models.financial_record import ExpenseClaim, ExpenseClaimItem
from app.schemas.ocr import OcrRecognizeBatchRead, OcrRecognizeDocumentRead, OcrRecognizeFieldRead
from app.services.attachment_association_jobs import clear_attachment_association_jobs_for_tests
from app.services.expense_claims import ExpenseClaimService
from app.services.expense_claim_attachment_storage import ExpenseClaimAttachmentStorage
from app.services.ocr import OcrService
from app.services.receipt_folder import ReceiptFolderService
from app.test_helpers.db import build_in_memory_session_factory
def build_client(monkeypatch) -> tuple[TestClient, object]:
    """Build a ``TestClient`` backed by an in-memory session factory.

    The ``get_db`` dependency is overridden so request handlers receive
    sessions from the in-memory factory, and the ``get_session_factory``
    attribute of the attachment-association-jobs endpoint module is patched
    to return that same factory.

    Args:
        monkeypatch: pytest ``monkeypatch`` fixture used to patch the endpoint module.

    Returns:
        A ``(client, session_factory)`` pair.
    """
    session_factory = build_in_memory_session_factory()
    application = create_app()

    def override_db() -> Generator[Session, None, None]:
        # Hand out one session per dependency resolution and always close it.
        session = session_factory()
        try:
            yield session
        finally:
            session.close()

    def provide_session_factory():
        # Stand-in for the endpoint module's factory lookup.
        return session_factory

    application.dependency_overrides[get_db] = override_db
    monkeypatch.setattr(attachment_jobs_endpoint, "get_session_factory", provide_session_factory)
    return TestClient(application), session_factory
def seed_travel_claim(db: Session) -> ExpenseClaim:
    """Persist one employee and one draft travel claim holding a single item.

    Args:
        db: Session the rows are added to; it is committed before returning.

    Returns:
        The committed ``ExpenseClaim`` (id ``claim-bg-association``) whose
        ``items`` list contains the one train-ticket item created here.
    """
    employee_id = "emp-bg-association"
    employee_name = "张三"
    claim_id = "claim-bg-association"

    employee = Employee(
        id=employee_id,
        employee_no="E10001",
        name=employee_name,
        email="zhangsan@example.com",
        position="实施顾问",
        grade="P4",
    )

    # The item starts without an attachment (invoice_id is None).
    ticket_item = ExpenseClaimItem(
        id="item-bg-association-1",
        claim_id=claim_id,
        item_date=date(2026, 2, 20),
        item_type="train_ticket",
        item_reason="武汉至上海高铁",
        item_location="上海",
        item_amount=Decimal("0.00"),
        invoice_id=None,
    )

    claim = ExpenseClaim(
        id=claim_id,
        claim_no="BX-20260220-001",
        employee_id=employee_id,
        employee_name=employee_name,
        department_id="dept-delivery",
        department_name="交付部",
        project_code=None,
        expense_type="travel",
        reason="辅助国网仿生产服务器部署,武汉往返上海",
        location="上海",
        amount=Decimal("0.00"),
        currency="CNY",
        invoice_count=0,
        occurred_at=datetime(2026, 2, 20, tzinfo=UTC),
        submitted_at=None,
        status="draft",
        approval_stage="待提交",
        risk_flags_json=[],
    )
    claim.items = [ticket_item]

    db.add(employee)
    db.add(claim)
    db.commit()
    return claim
def save_train_receipt(
    *,
    service: ReceiptFolderService,
    current_user: CurrentUserContext,
    filename: str,
    route: str,
    trip_date: str,
) -> str:
    """Save a fake train-ticket PDF through the receipt folder service.

    Args:
        service: Receipt folder service whose ``save_receipt`` is called.
        current_user: User context passed through to ``save_receipt``.
        filename: Name used for the stored file and the recognised document.
        route: Route text placed in the document text, summary and ``route`` field.
        trip_date: Date text placed in the document text and ``date`` field.

    Returns:
        The ``id`` of the receipt returned by ``save_receipt``.
    """
    pdf_media_type = "application/pdf"
    recognised_fields = [
        OcrRecognizeFieldRead(key="date", label="列车出发时间", value=trip_date),
        OcrRecognizeFieldRead(key="route", label="行程", value=route),
        OcrRecognizeFieldRead(key="amount", label="金额", value="354元"),
    ]
    recognised_document = OcrRecognizeDocumentRead(
        filename=filename,
        media_type=pdf_media_type,
        text=f"电子发票(铁路电子客票) {route} {trip_date} 票价 354 元",
        summary=f"铁路电子客票,{route},票价 354 元。",
        avg_score=0.96,
        line_count=1,
        page_count=1,
        document_type="train_ticket",
        document_type_label="火车/高铁票",
        scene_code="travel",
        scene_label="差旅票据",
        document_fields=recognised_fields,
    )
    # The payload is placeholder bytes derived from the file name, not a real PDF.
    payload = f"fake-pdf-{filename}".encode("utf-8")
    saved = service.save_receipt(
        filename=filename,
        content=payload,
        media_type=pdf_media_type,
        current_user=current_user,
        document=recognised_document,
    )
    return saved.id
def fake_ocr_recognize(
    self,
    files: list[tuple[str, bytes, str | None]],
) -> OcrRecognizeBatchRead:
    """Return a canned one-document OCR batch describing a train ticket.

    Used as a replacement for ``OcrService.recognize_files``. Only the first
    entry of ``files`` is read: its name becomes the document filename and its
    media type is used when truthy, otherwise ``application/pdf``.

    Args:
        self: Receiver slot of the patched method; unused.
        files: ``(filename, content, media_type)`` tuples; must be non-empty.

    Returns:
        A batch with ``total_file_count`` and ``success_count`` of 1.
    """
    first_file = files[0]
    filename = first_file[0]
    media_type = first_file[2] or "application/pdf"

    ticket_fields = [
        OcrRecognizeFieldRead(key="date", label="列车出发时间", value="2026-02-20"),
        OcrRecognizeFieldRead(key="route", label="行程", value="武汉-上海"),
        OcrRecognizeFieldRead(key="amount", label="金额", value="354元"),
    ]
    ticket_document = OcrRecognizeDocumentRead(
        filename=filename,
        media_type=media_type,
        text="电子发票(铁路电子客票) 武汉 上海 2026-02-20 票价 354 元",
        summary="铁路电子客票,武汉至上海,票价 354 元。",
        avg_score=0.96,
        line_count=1,
        page_count=1,
        document_type="train_ticket",
        document_type_label="火车/高铁票",
        scene_code="travel",
        scene_label="差旅票据",
        document_fields=ticket_fields,
    )
    return OcrRecognizeBatchRead(
        total_file_count=1,
        success_count=1,
        documents=[ticket_document],
    )
def fake_ocr_recognize_without_preview(
    self,
    files: list[tuple[str, bytes, str | None]],
) -> OcrRecognizeBatchRead:
    """Delegate to ``fake_ocr_recognize`` and return its batch unchanged.

    The document built by ``fake_ocr_recognize`` sets no ``preview_kind`` or
    ``preview_data_url``.
    """
    batch = fake_ocr_recognize(self, files)
    return batch
def test_attachment_association_job_links_receipts_after_conversation_exit(monkeypatch, tmp_path) -> None:
    """An association job attaches two saved train receipts to the seeded draft claim.

    After posting the job, its status must be ``succeeded`` with
    ``uploaded_count`` of 2, two claim items must carry an ``invoice_id``, and
    both receipts must be listed as linked to the claim.
    """

    def attachments_root(self):
        # Keep attachment files under the per-test temporary directory.
        return tmp_path / "attachments"

    monkeypatch.setenv("STORAGE_ROOT_DIR", str(tmp_path / "storage"))
    get_settings.cache_clear()
    clear_attachment_association_jobs_for_tests()
    monkeypatch.setattr(OcrService, "recognize_files", fake_ocr_recognize)
    monkeypatch.setattr(ExpenseClaimAttachmentStorage, "root", attachments_root)

    jobs_url = "/api/v1/reimbursements/attachment-association-jobs"
    expected_claim_id = "claim-bg-association"
    # (filename, route, trip_date) for the outbound and return tickets.
    trips = (
        ("2月20 武汉-上海.pdf", "武汉-上海", "2026-02-20"),
        ("2月23 上海-武汉.pdf", "上海-武汉", "2026-02-23"),
    )
    try:
        client, session_factory = build_client(monkeypatch)
        current_user = CurrentUserContext(
            username="zhangsan@example.com",
            name="张三",
            role_codes=["user"],
            is_admin=False,
            employee_no="E10001",
        )
        with session_factory() as db:
            seed_travel_claim(db)

        receipt_service = ReceiptFolderService()
        receipt_ids = [
            save_train_receipt(
                service=receipt_service,
                current_user=current_user,
                filename=trip_filename,
                route=trip_route,
                trip_date=trip_day,
            )
            for trip_filename, trip_route, trip_day in trips
        ]

        headers = {
            "x-auth-username": "zhangsan@example.com",
            "x-auth-name": "Zhang San",
            "x-auth-employee-no": "E10001",
            "x-auth-role-codes": "user",
        }
        request_body = {
            "receipt_ids": receipt_ids,
            "prompt": "请帮我处理已上传的附件。",
            "conversation_id": "inline-test",
        }
        response = client.post(jobs_url, headers=headers, json=request_body)
        assert response.status_code == 202
        job_id = response.json()["job_id"]

        # The job is expected to be finished by the time its status is read.
        status_response = client.get(f"{jobs_url}/{job_id}", headers=headers)
        assert status_response.status_code == 200
        payload = status_response.json()
        assert payload["status"] == "succeeded"
        assert payload["claim_id"] == expected_claim_id
        assert payload["claim_no"] == "BX-20260220-001"
        assert payload["uploaded_count"] == 2

        with session_factory() as db:
            claim_query = (
                select(ExpenseClaim)
                .options(selectinload(ExpenseClaim.items))
                .where(ExpenseClaim.id == expected_claim_id)
            )
            claim = db.scalar(claim_query)
            assert claim is not None
            attached_items = [entry for entry in claim.items if entry.invoice_id]
            assert len(attached_items) == 2

        linked_receipts = receipt_service.list_receipts(current_user=current_user, status_filter="linked")
        linked_ids = {entry.id for entry in linked_receipts}
        linked_claim_ids = {entry.linked_claim_id for entry in linked_receipts}
        assert linked_ids == set(receipt_ids)
        assert linked_claim_ids == {expected_claim_id}
    finally:
        clear_attachment_association_jobs_for_tests()
        get_settings.cache_clear()
def test_attachment_association_keeps_receipt_folder_preview_and_fields_after_cache_clear(
    monkeypatch,
    tmp_path,
) -> None:
    """Preview image and recognised fields saved with a receipt survive association.

    The receipt is saved with an image preview and three document fields, the
    OCR result cache is cleared, and ``recognize_files`` is patched with a fake
    whose document carries no preview. After the job succeeds, the claim item's
    attachment metadata and preview content must match what was saved with the
    receipt.
    """

    def attachments_root(self):
        # Keep attachment files under the per-test temporary directory.
        return tmp_path / "attachments"

    preview_bytes = b"receipt-folder-preview-png"
    encoded_preview = base64.b64encode(preview_bytes).decode("ascii")
    preview_data_url = f"data:image/png;base64,{encoded_preview}"

    monkeypatch.setenv("STORAGE_ROOT_DIR", str(tmp_path / "storage"))
    get_settings.cache_clear()
    clear_attachment_association_jobs_for_tests()
    monkeypatch.setattr(OcrService, "recognize_files", fake_ocr_recognize_without_preview)
    monkeypatch.setattr(ExpenseClaimAttachmentStorage, "root", attachments_root)

    jobs_url = "/api/v1/reimbursements/attachment-association-jobs"
    receipt_filename = "2月20 武汉-上海.pdf"
    expected_fields = {
        ("列车出发时间", "2026-02-20 07:55"),
        ("行程", "武汉-上海"),
        ("金额", "354元"),
    }
    try:
        client, session_factory = build_client(monkeypatch)
        current_user = CurrentUserContext(
            username="zhangsan@example.com",
            name="张三",
            role_codes=["user"],
            is_admin=False,
            employee_no="E10001",
        )
        with session_factory() as db:
            seed_travel_claim(db)

        receipt_document = OcrRecognizeDocumentRead(
            filename=receipt_filename,
            media_type="application/pdf",
            text="电子发票(铁路电子客票) 武汉站 G458 上海虹桥站 2026年02月20日 07:55开 二等座 票价 354.00",
            summary="铁路电子客票,武汉-上海,票价 354 元。",
            avg_score=0.96,
            line_count=1,
            page_count=1,
            document_type="train_ticket",
            document_type_label="火车/高铁票",
            scene_code="travel",
            scene_label="差旅票据",
            preview_kind="image",
            preview_data_url=preview_data_url,
            document_fields=[
                OcrRecognizeFieldRead(key="date", label="列车出发时间", value="2026-02-20 07:55"),
                OcrRecognizeFieldRead(key="route", label="行程", value="武汉-上海"),
                OcrRecognizeFieldRead(key="amount", label="金额", value="354元"),
            ],
        )
        receipt = ReceiptFolderService().save_receipt(
            filename=receipt_filename,
            content=b"%PDF-1.7 fake-ticket",
            media_type="application/pdf",
            current_user=current_user,
            document=receipt_document,
        )

        # Clear the OCR result cache before the job runs.
        OcrService.clear_result_cache()

        headers = {
            "x-auth-username": "zhangsan@example.com",
            "x-auth-name": "Zhang San",
            "x-auth-employee-no": "E10001",
            "x-auth-role-codes": "user",
        }
        request_body = {
            "receipt_ids": [receipt.id],
            "prompt": "请帮我处理已上传的附件。",
            "conversation_id": "inline-test",
        }
        response = client.post(jobs_url, headers=headers, json=request_body)
        assert response.status_code == 202
        job_id = response.json()["job_id"]

        status_response = client.get(f"{jobs_url}/{job_id}", headers=headers)
        assert status_response.status_code == 200
        assert status_response.json()["status"] == "succeeded"

        with session_factory() as db:
            claim_query = (
                select(ExpenseClaim)
                .options(selectinload(ExpenseClaim.items))
                .where(ExpenseClaim.id == "claim-bg-association")
            )
            claim = db.scalar(claim_query)
            assert claim is not None
            attached_item = next(entry for entry in claim.items if entry.invoice_id)

            metadata = ExpenseClaimService(db).get_claim_item_attachment_meta(
                claim_id=claim.id,
                item_id=attached_item.id,
                current_user=current_user,
            )
            assert metadata is not None
            assert metadata["preview_kind"] == "image"
            document_info = metadata["document_info"]
            assert document_info["document_type"] == "train_ticket"
            assert document_info["document_type_label"] == "火车/高铁票"
            actual_fields = {(field["label"], field["value"]) for field in document_info["fields"]}
            # Extra fields are allowed; the three saved ones must be present.
            assert expected_fields.issubset(actual_fields)

            preview = ExpenseClaimService(db).get_claim_item_attachment_preview_content(
                claim_id=claim.id,
                item_id=attached_item.id,
                current_user=current_user,
            )
            preview_path, media_type, filename = preview
            assert media_type == "image/png"
            assert filename.endswith(".png")
            assert preview_path.read_bytes() == preview_bytes
    finally:
        clear_attachment_association_jobs_for_tests()
        get_settings.cache_clear()
def test_attachment_meta_repairs_existing_pdf_fallback_from_source_receipt(
    monkeypatch,
    tmp_path,
) -> None:
    """Stale PDF-fallback attachment metadata is replaced by the source receipt's data.

    An attachment file is written by hand with metadata claiming a ``pdf``
    preview and an ``other`` document type, plus a ``source_receipt_id``
    pointing at a receipt saved with an image preview and train-ticket fields.
    Reading the metadata and preview through ``ExpenseClaimService`` must
    return the receipt's values.
    """

    def attachments_root(self):
        # Keep attachment files under the per-test temporary directory.
        return tmp_path / "attachments"

    preview_bytes = b"legacy-repaired-preview-png"
    encoded_preview = base64.b64encode(preview_bytes).decode("ascii")
    preview_data_url = f"data:image/png;base64,{encoded_preview}"

    monkeypatch.setenv("STORAGE_ROOT_DIR", str(tmp_path / "storage"))
    get_settings.cache_clear()
    monkeypatch.setattr(ExpenseClaimAttachmentStorage, "root", attachments_root)

    receipt_filename = "2月20 武汉-上海.pdf"
    expected_fields = {
        ("列车出发时间", "2026-02-20 07:55"),
        ("行程", "武汉-上海"),
        ("金额", "354元"),
    }
    try:
        current_user = CurrentUserContext(
            username="zhangsan@example.com",
            name="张三",
            role_codes=["user"],
            is_admin=False,
            employee_no="E10001",
        )
        # Only the session factory is used here; the HTTP client is closed right away.
        client, session_factory = build_client(monkeypatch)
        client.close()

        with session_factory() as db:
            claim = seed_travel_claim(db)
            item = claim.items[0]

            receipt_document = OcrRecognizeDocumentRead(
                filename=receipt_filename,
                media_type="application/pdf",
                text="电子发票(铁路电子客票) 武汉站 G458 上海虹桥站 2026年02月20日 07:55开 二等座 票价 354.00",
                summary="铁路电子客票,武汉-上海,票价 354 元。",
                avg_score=0.96,
                line_count=1,
                page_count=1,
                document_type="train_ticket",
                document_type_label="火车/高铁票",
                scene_code="travel",
                scene_label="差旅票据",
                preview_kind="image",
                preview_data_url=preview_data_url,
                document_fields=[
                    OcrRecognizeFieldRead(key="date", label="列车出发时间", value="2026-02-20 07:55"),
                    OcrRecognizeFieldRead(key="route", label="行程", value="武汉-上海"),
                    OcrRecognizeFieldRead(key="amount", label="金额", value="354元"),
                ],
            )
            receipt = ReceiptFolderService().save_receipt(
                filename=receipt_filename,
                content=b"%PDF-1.7 fake-ticket",
                media_type="application/pdf",
                current_user=current_user,
                document=receipt_document,
            )

            # Write the attachment file directly under the patched storage root.
            attachment_dir = tmp_path / "attachments" / claim.id / item.id
            attachment_dir.mkdir(parents=True)
            file_path = attachment_dir / "2月20_武汉-上海.pdf"
            file_path.write_bytes(b"%PDF-1.7 persisted-but-bad-meta")

            storage = ExpenseClaimAttachmentStorage()
            item.invoice_id = storage.to_storage_key(file_path)
            # Metadata that disagrees with the source receipt: pdf preview, "other" type, no fields.
            stale_meta = {
                "file_name": file_path.name,
                "storage_key": storage.to_storage_key(file_path),
                "media_type": "application/pdf",
                "size_bytes": file_path.stat().st_size,
                "previewable": True,
                "preview_kind": "pdf",
                "preview_storage_key": storage.to_storage_key(file_path),
                "preview_media_type": "application/pdf",
                "preview_file_name": file_path.name,
                "document_info": {
                    "document_type": "other",
                    "document_type_label": "其他单据",
                    "scene_code": "other",
                    "scene_label": "其他票据",
                    "fields": [],
                },
                "source_receipt_id": receipt.id,
            }
            storage.write_meta(file_path, stale_meta)
            db.commit()

            service = ExpenseClaimService(db)
            metadata = service.get_claim_item_attachment_meta(
                claim_id=claim.id,
                item_id=item.id,
                current_user=current_user,
            )
            assert metadata is not None
            assert metadata["preview_kind"] == "image"
            document_info = metadata["document_info"]
            assert document_info["document_type"] == "train_ticket"
            assert document_info["document_type_label"] == "火车/高铁票"
            actual_fields = {(field["label"], field["value"]) for field in document_info["fields"]}
            # Extra fields are allowed; the three receipt fields must be present.
            assert expected_fields.issubset(actual_fields)

            preview = service.get_claim_item_attachment_preview_content(
                claim_id=claim.id,
                item_id=item.id,
                current_user=current_user,
            )
            preview_path, media_type, filename = preview
            assert media_type == "image/png"
            assert filename.endswith(".png")
            assert preview_path.read_bytes() == preview_bytes
    finally:
        get_settings.cache_clear()
def test_attachment_association_job_fails_without_editable_claim(monkeypatch, tmp_path) -> None:
    """With no claim seeded, the association job ends as ``failed``.

    The job is still accepted with HTTP 202; the failure shows up in the
    status payload, whose message must contain the expected text.
    """
    monkeypatch.setenv("STORAGE_ROOT_DIR", str(tmp_path / "storage"))
    get_settings.cache_clear()
    clear_attachment_association_jobs_for_tests()

    jobs_url = "/api/v1/reimbursements/attachment-association-jobs"
    headers = {
        "x-auth-username": "zhangsan@example.com",
        "x-auth-name": "Zhang San",
        "x-auth-employee-no": "E10001",
        "x-auth-role-codes": "user",
    }
    try:
        # The session factory is not needed: no claim is seeded in this test.
        client, _session_factory = build_client(monkeypatch)
        current_user = CurrentUserContext(
            username="zhangsan@example.com",
            name="张三",
            role_codes=["user"],
            is_admin=False,
            employee_no="E10001",
        )
        receipt_id = save_train_receipt(
            service=ReceiptFolderService(),
            current_user=current_user,
            filename="2月20 武汉-上海.pdf",
            route="武汉-上海",
            trip_date="2026-02-20",
        )

        request_body = {"receipt_ids": [receipt_id], "conversation_id": "inline-empty"}
        response = client.post(jobs_url, headers=headers, json=request_body)
        assert response.status_code == 202

        job_id = response.json()["job_id"]
        status_response = client.get(f"{jobs_url}/{job_id}", headers=headers)
        assert status_response.status_code == 200
        payload = status_response.json()
        assert payload["status"] == "failed"
        assert "没有找到可自动关联的报销草稿" in payload["message"]
    finally:
        clear_attachment_association_jobs_for_tests()
        get_settings.cache_clear()