from __future__ import annotations import json import hashlib import mimetypes import re import shutil from datetime import UTC, datetime from pathlib import Path from typing import Any from uuid import uuid4 from app.api.deps import CurrentUserContext from app.core.config import get_settings from app.schemas.ocr import OcrRecognizeBatchRead, OcrRecognizeDocumentRead, OcrRecognizeFieldRead from app.schemas.receipt_folder import ( ReceiptFolderDeleteResponse, ReceiptFolderDetailRead, ReceiptFolderFieldRead, ReceiptFolderItemRead, ReceiptFolderUpdate, ) from app.services.document_preview import DocumentPreviewAssets from app.services.document_intelligence import build_document_insight from app.services.ocr import SUPPORTED_SUFFIXES RECEIPT_DATE_PATTERN = re.compile( r"((?:20\d{2}|19\d{2})(?:[-/年.]|\s+)(?:1[0-2]|0?[1-9])" r"(?:[-/月.]|\s+)(?:3[01]|[12]\d|0?[1-9])日?)" ) RECEIPT_TIME_PATTERN = re.compile(r"(?|—|–|-)\s*" r"([\u4e00-\u9fa5]{2,12})站?" ) TRAIN_ROUTE_WITH_NO_PATTERN = re.compile( r"([\u4e00-\u9fa5]{2,12})站?\s+[GCDZKTLYS]\d{1,5}\s+" r"([\u4e00-\u9fa5]{2,12})站?", re.IGNORECASE, ) TRAIN_NO_PATTERN = re.compile(r"(?:车次|列车号)\s*[::]?\s*([GCDZKTLYS]\d{1,5})", re.IGNORECASE) TRAIN_STANDALONE_NO_PATTERN = re.compile(r"(? str: normalized = Path(str(filename or "").strip()).name normalized = re.sub(r"[^\w.\-\u4e00-\u9fff]+", "_", normalized).strip("._") return normalized or "receipt.bin" @staticmethod def resolve_media_type(filename: str, fallback: str | None = None) -> str: return str(mimetypes.guess_type(filename)[0] or fallback or "application/octet-stream") def _owner_root(self, owner_key: str) -> Path: return self._assert_child(self.root / owner_key) def _receipt_dir(self, owner_key: str, receipt_id: str) -> Path: normalized = str(receipt_id or "").strip() if not re.fullmatch(r"[0-9a-fA-F-]{32,36}", normalized): raise FileNotFoundError("Receipt not found") path = self._assert_child(self._owner_root(owner_key) / normalized) if not path.exists() or not path.is_dir(): raise FileNotFoundError("Receipt not found") return path def _assert_child(self, path: Path) -> Path: self.root.mkdir(parents=True, exist_ok=True) resolved = path.resolve() try: resolved.relative_to(self.root) except ValueError as exc: raise FileNotFoundError("Receipt path is invalid") from exc return resolved @staticmethod def _owner_key(current_user: CurrentUserContext) -> str: raw = str(current_user.username or current_user.name or "anonymous").strip().lower() normalized = re.sub(r"[^\w.\-\u4e00-\u9fff]+", "_", raw).strip("._") return normalized or "anonymous" @staticmethod def _should_persist_source(filename: str, content: bytes) -> bool: if not content: return False return Path(str(filename or "")).suffix.lower() in SUPPORTED_SUFFIXES def _write_preview_asset( self, *, receipt_dir: Path, source_path: Path, media_type: str, document: Any | None, ) -> dict[str, Any]: preview_data_url = str(getattr(document, "preview_data_url", "") or "").strip() preview_asset = DocumentPreviewAssets.write_data_url_preview( preview_dir=receipt_dir, preview_name_stem="preview", preview_data_url=preview_data_url, ) if preview_asset is not None: _, preview_media_type, preview_name = preview_asset return { "previewable": True, "preview_kind": "image", "preview_file_name": preview_name, "preview_media_type": preview_media_type, "preview_rendered_with": DocumentPreviewAssets.renderer_id_for_source(media_type), } if str(media_type or "").strip() == "application/pdf": preview_path = receipt_dir / f"preview{DocumentPreviewAssets.PDF_PREVIEW_SUFFIX}" try: DocumentPreviewAssets.render_pdf_first_page( pdf_path=source_path, preview_path=preview_path, timeout_seconds=get_settings().ocr_timeout_seconds, ) except Exception: return { "previewable": True, "preview_kind": "pdf", "preview_file_name": source_path.name, "preview_media_type": media_type, "preview_rendered_with": "", } return { "previewable": True, "preview_kind": "image", "preview_file_name": preview_path.name, "preview_media_type": DocumentPreviewAssets.PDF_PREVIEW_MEDIA_TYPE, "preview_rendered_with": DocumentPreviewAssets.PDF_RENDERER_ID, } if self._is_previewable(media_type): return { "previewable": True, "preview_kind": "image" if media_type.startswith("image/") else "pdf", "preview_file_name": source_path.name, "preview_media_type": media_type, "preview_rendered_with": "", } return { "previewable": False, "preview_kind": "", "preview_file_name": "", "preview_media_type": "", "preview_rendered_with": "", } def _refresh_pdf_preview_asset_if_needed( self, *, receipt_dir: Path, meta: dict[str, Any], ) -> dict[str, Any]: source_name = str(meta.get("source_file_name") or meta.get("file_name") or "").strip() if not source_name: return meta source_path = self._assert_child(receipt_dir / source_name) source_media_type = self.resolve_media_type(source_path.name, str(meta.get("media_type") or "")) if source_media_type != "application/pdf" or not source_path.exists(): return meta preview_name = str(meta.get("preview_file_name") or "").strip() preview_path = self._assert_child(receipt_dir / preview_name) if preview_name else None if ( preview_path is not None and preview_path.exists() and str(meta.get("preview_kind") or "").strip() == "image" and str(meta.get("preview_media_type") or "").strip() == DocumentPreviewAssets.PDF_PREVIEW_MEDIA_TYPE and str(meta.get("preview_rendered_with") or "").strip() == DocumentPreviewAssets.PDF_RENDERER_ID ): return meta if not preview_name or not preview_name.lower().endswith(DocumentPreviewAssets.PDF_PREVIEW_SUFFIX): preview_name = f"preview{DocumentPreviewAssets.PDF_PREVIEW_SUFFIX}" preview_path = self._assert_child(receipt_dir / preview_name) try: DocumentPreviewAssets.render_pdf_first_page( pdf_path=source_path, preview_path=preview_path, timeout_seconds=get_settings().ocr_timeout_seconds, ) except Exception: meta.update( { "previewable": True, "preview_kind": "pdf", "preview_file_name": source_path.name, "preview_media_type": "application/pdf", "preview_rendered_with": "", } ) self._write_meta(receipt_dir, meta) return meta meta.update( { "previewable": True, "preview_kind": "image", "preview_file_name": preview_path.name, "preview_media_type": DocumentPreviewAssets.PDF_PREVIEW_MEDIA_TYPE, "preview_rendered_with": DocumentPreviewAssets.PDF_RENDERER_ID, } ) self._write_meta(receipt_dir, meta) return meta @staticmethod def _is_previewable(media_type: str) -> bool: return str(media_type or "").startswith("image/") or str(media_type or "") == "application/pdf" @classmethod def _build_document_meta(cls, document: Any | None) -> dict[str, Any]: fields = [] for field in list(getattr(document, "document_fields", []) or []): if isinstance(field, dict): fields.append( { "key": str(field.get("key") or "").strip(), "label": str(field.get("label") or "").strip(), "value": str(field.get("value") or "").strip(), } ) else: fields.append( { "key": str(getattr(field, "key", "") or "").strip(), "label": str(getattr(field, "label", "") or "").strip(), "value": str(getattr(field, "value", "") or "").strip(), } ) fields = [field for field in fields if field["label"] and field["value"]] ocr_text = str(getattr(document, "text", "") or "") summary = str(getattr(document, "summary", "") or "") document_type = str(getattr(document, "document_type", "") or "other") document_type_label = str(getattr(document, "document_type_label", "") or "其他单据") scene_label = str(getattr(document, "scene_label", "") or "其他票据") if cls._is_train_ticket_values( document_type=document_type, document_type_label=document_type_label, scene_label=scene_label, text=f"{summary}\n{ocr_text}", ): fields = cls._enrich_train_ticket_field_dicts( fields, text=f"{ocr_text}\n{summary}\n{str(getattr(document, 'filename', '') or '')}", ) return { "engine": str(getattr(document, "engine", "") or ""), "model": str(getattr(document, "model", "") or ""), "ocr_text": ocr_text, "summary": summary, "ocr_avg_score": float(getattr(document, "avg_score", 0.0) or 0.0), "ocr_line_count": int(getattr(document, "line_count", 0) or 0), "page_count": int(getattr(document, "page_count", 1) or 1), "document_type": document_type, "document_type_label": document_type_label, "scene_code": str(getattr(document, "scene_code", "") or "other"), "scene_label": scene_label, "ocr_classification_source": str(getattr(document, "classification_source", "") or ""), "ocr_classification_confidence": float(getattr(document, "classification_confidence", 0.0) or 0.0), "ocr_classification_evidence": [ str(value) for value in list(getattr(document, "classification_evidence", []) or []) if str(value).strip() ], "document_fields": fields, "editable_fields": {}, "ocr_warnings": [str(value) for value in list(getattr(document, "warnings", []) or []) if str(value).strip()], } def _iter_owner_meta(self, owner_key: str) -> list[dict[str, Any]]: owner_root = self._owner_root(owner_key) if not owner_root.exists(): return [] metas = [] for meta_path in owner_root.glob("*/meta.json"): meta = self._read_meta(meta_path.parent) if meta: metas.append(meta) return metas def _read_receipt_meta(self, receipt_id: str, current_user: CurrentUserContext) -> dict[str, Any]: return self._read_meta(self._receipt_dir(self._owner_key(current_user), receipt_id)) def _resolve_existing_item( self, receipt_id: str | None, current_user: CurrentUserContext, ) -> ReceiptFolderItemRead | None: normalized = str(receipt_id or "").strip() if not normalized: return None try: return self._build_item(self._read_receipt_meta(normalized, current_user)) except FileNotFoundError: return None @staticmethod def _meta_path(receipt_dir: Path) -> Path: return receipt_dir / "meta.json" def _read_meta(self, receipt_dir: Path) -> dict[str, Any]: meta_path = self._meta_path(receipt_dir) if not meta_path.exists(): raise FileNotFoundError("Receipt not found") try: payload = json.loads(meta_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError) as exc: raise FileNotFoundError("Receipt metadata not found") from exc return payload if isinstance(payload, dict) else {} def _write_meta(self, receipt_dir: Path, payload: dict[str, Any]) -> None: self._meta_path(receipt_dir).write_text( json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8", ) @staticmethod def _content_hash(content: bytes) -> str: return hashlib.sha256(content or b"").hexdigest() if content else "" @staticmethod def _operator_label(current_user: CurrentUserContext) -> str: return str(current_user.name or current_user.username or "当前用户").strip() or "当前用户" class ReceiptFolderItemMixin: @staticmethod def _matches_status(meta: dict[str, Any], status_filter: str) -> bool: if status_filter in {"", "all"}: return True return str(meta.get("status") or "unlinked").strip().lower() == status_filter def _build_item(self, meta: dict[str, Any]) -> ReceiptFolderItemRead: receipt_id = str(meta.get("id") or "").strip() status_value = str(meta.get("status") or "unlinked").strip() or "unlinked" identity = self._resolve_receipt_document_identity(meta) return ReceiptFolderItemRead( id=receipt_id, file_name=str(meta.get("file_name") or ""), media_type=str(meta.get("media_type") or "application/octet-stream"), size_bytes=int(meta.get("size_bytes") or 0), status=status_value, status_label="已关联" if status_value == "linked" else "未关联", document_type=identity["document_type"], document_type_label=identity["document_type_label"], scene_code=identity["scene_code"], scene_label=identity["scene_label"], summary=str(meta.get("summary") or ""), amount=self._resolve_editable_or_field(meta, "amount", labels=("金额", "价税合计", "票价")), document_date=self._resolve_receipt_document_date(meta), merchant_name=self._resolve_receipt_merchant_name(meta), avg_score=float(meta.get("ocr_avg_score") or 0.0), uploaded_at=self._parse_datetime(meta.get("uploaded_at")), linked_at=self._parse_datetime(meta.get("linked_at")), linked_claim_id=str(meta.get("linked_claim_id") or ""), linked_claim_no=str(meta.get("linked_claim_no") or ""), previewable=bool(meta.get("previewable")), preview_kind=str(meta.get("preview_kind") or ""), preview_url=f"/receipt-folder/{receipt_id}/preview" if bool(meta.get("previewable")) and receipt_id else "", source_url=f"/receipt-folder/{receipt_id}/source" if receipt_id else "", warnings=[str(value) for value in list(meta.get("ocr_warnings") or []) if str(value).strip()], ) def _resolve_receipt_document_identity(self, meta: dict[str, Any]) -> dict[str, str]: document_type = str(meta.get("document_type") or "other").strip() or "other" document_type_label = str(meta.get("document_type_label") or "其他单据").strip() or "其他单据" scene_code = str(meta.get("scene_code") or "other").strip() or "other" scene_label = str(meta.get("scene_label") or "其他票据").strip() or "其他票据" if document_type not in {"", "other"} and document_type_label != "其他单据": return { "document_type": document_type, "document_type_label": document_type_label, "scene_code": scene_code, "scene_label": scene_label, } insight = build_document_insight( filename=str(meta.get("file_name") or ""), summary=str(meta.get("summary") or ""), text=self._receipt_text(meta), ) if insight.document_type in {"", "other"}: return { "document_type": document_type, "document_type_label": document_type_label, "scene_code": scene_code, "scene_label": scene_label, } return { "document_type": insight.document_type, "document_type_label": insight.document_type_label, "scene_code": insight.scene_code, "scene_label": insight.scene_label, } def _resolve_fields(self, meta: dict[str, Any]) -> list[ReceiptFolderFieldRead]: fields = [ ReceiptFolderFieldRead( key=str(field.get("key") or ""), label=str(field.get("label") or ""), value=str(field.get("value") or ""), ) for field in list(meta.get("document_fields") or []) if isinstance(field, dict) and str(field.get("label") or "").strip() ] if self._is_train_ticket_meta(meta): return [ ReceiptFolderFieldRead(**field) for field in self._enrich_train_ticket_field_dicts( [field.model_dump() for field in fields], text=self._receipt_text(meta), ) ] return fields def _resolve_edit_logs(self, meta: dict[str, Any]) -> list[dict[str, Any]]: logs = [] for log in list(meta.get("edit_logs") or []): if not isinstance(log, dict): continue changes = [ { "key": str(change.get("key") or ""), "label": str(change.get("label") or ""), "before": str(change.get("before") or ""), "after": str(change.get("after") or ""), } for change in list(log.get("changes") or []) if isinstance(change, dict) and str(change.get("label") or change.get("key") or "").strip() ] if not changes: continue logs.append( { "operated_at": self._parse_datetime(log.get("operated_at")), "operator": str(log.get("operator") or "当前用户").strip() or "当前用户", "changes": changes, } ) return logs def _build_edit_changes(self, before_meta: dict[str, Any], after_meta: dict[str, Any]) -> list[dict[str, str]]: before_values = self._flatten_editable_receipt_values(before_meta) after_values = self._flatten_editable_receipt_values(after_meta) changes = [] for key in sorted(set(before_values) | set(after_values)): before = before_values.get(key, {}) after = after_values.get(key, {}) before_value = str(before.get("value") or "").strip() after_value = str(after.get("value") or "").strip() if before_value == after_value: continue label = str(after.get("label") or before.get("label") or key).strip() changes.append( { "key": key, "label": label, "before": before_value, "after": after_value, } ) return changes def _flatten_editable_receipt_values(self, meta: dict[str, Any]) -> dict[str, dict[str, str]]: values = { "document_type_label": { "label": "票据类型", "value": str(meta.get("document_type_label") or "").strip(), }, "scene_label": { "label": "费用场景", "value": str(meta.get("scene_label") or "").strip(), }, "summary": { "label": "摘要", "value": str(meta.get("summary") or "").strip(), }, "amount": { "label": "金额", "value": self._resolve_editable_or_field(meta, "amount", labels=("金额", "价税合计", "票价")), }, "document_date": { "label": "票据日期", "value": self._resolve_receipt_document_date(meta), }, "merchant_name": { "label": "商户", "value": self._resolve_receipt_merchant_name(meta), }, } for index, field in enumerate(list(meta.get("document_fields") or [])): if not isinstance(field, dict): continue key = str(field.get("key") or "").strip() label = str(field.get("label") or "").strip() value = str(field.get("value") or "").strip() stable_key = key or f"field_{index}_{label}" if not stable_key and not label: continue values[stable_key] = { "label": label or stable_key, "value": value, } return values def _resolve_receipt_document_date(self, meta: dict[str, Any]) -> str: editable = meta.get("editable_fields") if isinstance(editable, dict): value = str(editable.get("document_date") or "").strip() if value: return value fields = self._resolve_fields(meta) for field in fields: if field.key in {"invoice_date", "issue_date"} or field.label in {"开票日期", "发票日期"}: return self._normalize_receipt_date_value(field.value) if self._is_train_ticket_meta(meta): invoice_date = self._extract_train_invoice_date(self._receipt_text(meta)) if invoice_date: return invoice_date for field in fields: if field.key == "document_date" or field.label in {"日期", "乘车日期", "列车出发时间", "行程日期"}: return self._normalize_receipt_date_value(field.value) return "" def _resolve_receipt_merchant_name(self, meta: dict[str, Any]) -> str: value = self._resolve_editable_or_field(meta, "merchant_name", labels=("商户", "销售方", "收款方", "开票方")) if value: return value if self._is_train_ticket_meta(meta): return "中国铁路" return "" def _resolve_editable_or_field(self, meta: dict[str, Any], key: str, *, labels: tuple[str, ...]) -> str: editable = meta.get("editable_fields") if isinstance(editable, dict): value = str(editable.get(key) or "").strip() if value: return value label_set = set(labels) for field in self._resolve_fields(meta): if field.label in label_set or field.key == key: return field.value return "" class ReceiptFolderTrainTicketMixin: @classmethod def _enrich_train_ticket_field_dicts( cls, fields: list[dict[str, Any]], *, text: str, ) -> list[dict[str, str]]: normalized: list[dict[str, str]] = [] for field in fields: key = str(field.get("key") or "").strip() label = str(field.get("label") or "").strip() value = str(field.get("value") or "").strip() if not label or not value: continue if key == "merchant_name" or label == "商户": continue if not cls._should_keep_train_ticket_field(key=key, label=label, value=value): continue if key == "trip_no" and label == "车次/航班": label = "车次" if key == "route" and label == "行程": label = "行程" normalized.append({"key": key, "label": label, "value": value}) def add_field(key: str, label: str, value: str) -> None: cleaned = str(value or "").strip() if not cleaned: return if any(item["key"] == key for item in normalized if item["key"]): return if any(item["label"] == label for item in normalized): return normalized.append({"key": key, "label": label, "value": cleaned}) add_field("merchant_name", "商户", "中国铁路") invoice_date = cls._extract_train_invoice_date(text) add_field("invoice_date", "开票日期", invoice_date) trip_datetime = cls._extract_train_trip_datetime(text) add_field("trip_date", "列车出发时间", trip_datetime) departure, arrival = cls._extract_train_route_points(text) add_field("departure_station", "出发地点", departure) add_field("arrival_station", "到达地点", arrival) if departure and arrival: add_field("route", "行程", f"{departure}-{arrival}") add_field("train_no", "车次", cls._extract_first(TRAIN_NO_PATTERN, text) or cls._extract_first(TRAIN_STANDALONE_NO_PATTERN, text)) id_number = cls._extract_train_id_number(text) add_field("passenger_name", "乘车人", cls._extract_train_passenger_name(text, id_number=id_number)) add_field("id_number", "身份证号", id_number) add_field("electronic_ticket_no", "电子客票号", cls._extract_first(TRAIN_ETICKET_PATTERN, text)) add_field("seat_class", "席别", cls._extract_first(TRAIN_SEAT_CLASS_PATTERN, text)) carriage_no, seat_no = cls._extract_train_carriage_and_seat(text) add_field("carriage_no", "车厢", carriage_no) add_field("seat_no", "座位号", seat_no) add_field("fare", "票价", cls._extract_train_fare(text)) return normalized @staticmethod def _is_train_ticket_values( *, document_type: str, document_type_label: str, scene_label: str, text: str, ) -> bool: if str(document_type or "").strip().lower() == "train_ticket": return True compact = "".join([document_type_label, scene_label, text]).replace(" ", "") if any(token in compact for token in ("火车", "高铁", "动车", "铁路", "电子客票", "车次")): return True lower_compact = compact.lower() return bool(re.search(r"[GCDZKTLYS]\d{1,5}", compact, flags=re.IGNORECASE)) and ( "12306" in compact or "95306" in compact or re.search(r"[\u4e00-\u9fa5]{2,12}(?:至|到|→|->|—|–|-)[\u4e00-\u9fa5]{2,12}", compact) or ("wuhan" in lower_compact and "shanghai" in lower_compact) ) @classmethod def _is_train_ticket_meta(cls, meta: dict[str, Any]) -> bool: return cls._is_train_ticket_values( document_type=str(meta.get("document_type") or ""), document_type_label=str(meta.get("document_type_label") or ""), scene_label=str(meta.get("scene_label") or ""), text=cls._receipt_text(meta), ) @staticmethod def _receipt_text(meta: dict[str, Any]) -> str: field_text = "\n".join( f"{field.get('label', '')} {field.get('value', '')}" for field in list(meta.get("document_fields") or []) if isinstance(field, dict) ) return "\n".join( value for value in ( str(meta.get("ocr_text") or ""), str(meta.get("summary") or ""), str(meta.get("file_name") or ""), field_text, ) if value ) @classmethod def _extract_train_invoice_date(cls, text: str) -> str: match = TRAIN_INVOICE_DATE_PATTERN.search(str(text or "")) if not match: return "" return cls._normalize_receipt_date_value(match.group(1)) @classmethod def _extract_train_trip_datetime(cls, text: str) -> str: raw_text = str(text or "") candidates: list[tuple[int, int, str]] = [] for index, match in enumerate(RECEIPT_DATE_PATTERN.finditer(raw_text)): window = raw_text[max(0, match.start() - 14): match.end() + 8].replace(" ", "") if any(token in window for token in ("开票日期", "发票日期", "开票时间")): continue value = cls._format_date_match_with_time(raw_text, match) score = 0 nearby = raw_text[max(0, match.start() - 32): match.end() + 32] compact = nearby.replace(" ", "") if ":" in value or ":" in value: score += 8 if any(token in compact for token in ("开车时间", "发车时间", "乘车日期", "乘车时间", "检票", "车次")): score += 6 if any(token in compact for token in ("二等座", "一等座", "商务座", "硬座", "软卧", "硬卧")): score += 3 candidates.append((score, -index, value)) if not candidates: return "" return max(candidates, key=lambda item: (item[0], item[1]))[2] @classmethod def _format_date_match_with_time(cls, text: str, match: re.Match[str]) -> str: date_value = cls._normalize_receipt_date_value(match.group(1)) if not date_value: return "" surrounding = str(text or "")[max(0, match.start() - 18): match.end() + 24] time_match = RECEIPT_TIME_PATTERN.search(surrounding) if not time_match: return date_value return f"{date_value} {str(time_match.group(1)).zfill(2)}:{str(time_match.group(2)).zfill(2)}" @staticmethod def _normalize_receipt_date_value(value: str) -> str: raw = str(value or "").strip() match = RECEIPT_DATE_PATTERN.search(raw) if not match: return raw normalized = match.group(1).replace("年", "-").replace("月", "-").replace("日", "") normalized = normalized.replace("/", "-").replace(".", "-") normalized = re.sub(r"\s+", "-", normalized) parts = [part for part in normalized.split("-") if part] if len(parts) != 3: return match.group(1) year, month, day = parts return f"{year.zfill(4)}-{month.zfill(2)}-{day.zfill(2)}" @classmethod def _extract_train_route_points(cls, text: str) -> tuple[str, str]: raw_text = str(text or "") split_line_match = TRAIN_ROUTE_WITH_NO_PATTERN.search(raw_text) if split_line_match: departure = cls._clean_train_station(split_line_match.group(1)) arrival = cls._clean_train_station(split_line_match.group(2)) if cls._is_valid_train_station_value(departure) and cls._is_valid_train_station_value(arrival) and departure != arrival: return departure, arrival station_candidates: list[str] = [] for line in raw_text.replace("\r", "\n").splitlines(): candidate = cls._clean_train_station(line) if not candidate or candidate in station_candidates: continue if not str(line or "").strip().endswith("站"): continue if not cls._is_valid_train_station_value(candidate): continue station_candidates.append(candidate) if len(station_candidates) >= 2: return station_candidates[0], station_candidates[1] match = TRAIN_ROUTE_PATTERN.search(raw_text) if match: departure = cls._clean_train_station(match.group(1)) arrival = cls._clean_train_station(match.group(2)) if cls._is_valid_train_station_value(departure) and cls._is_valid_train_station_value(arrival) and departure != arrival: return departure, arrival return "", "" @staticmethod def _clean_train_station(value: str) -> str: cleaned = re.sub(r"[^A-Za-z0-9\u4e00-\u9fa5()()·]", "", str(value or "")) cleaned = re.sub(r"(?:火车站|高铁站|站)$", "", cleaned) return cleaned.strip() @classmethod def _should_keep_train_ticket_field(cls, *, key: str, label: str, value: str) -> bool: if key in TRAIN_STATION_FIELD_KEYS or label in TRAIN_STATION_FIELD_LABELS: return cls._is_valid_train_station_value(value) if key == "passenger_name" or label == "乘车人": return bool(cls._clean_train_passenger_candidate(value)) return True @classmethod def _is_valid_train_station_value(cls, value: str) -> bool: cleaned = cls._clean_train_station(value) if not 2 <= len(cleaned) <= 12: return False if any(token in cleaned for token in TRAIN_INVALID_STATION_TOKENS): return False if re.search(r"[A-Za-z0-9]", cleaned): return False return True @staticmethod def _extract_first(pattern: re.Pattern[str], text: str) -> str: match = pattern.search(str(text or "")) return str(match.group(1) or "").strip() if match else "" @classmethod def _extract_train_passenger_name(cls, text: str, *, id_number: str = "") -> str: lines = [line.strip() for line in str(text or "").replace("\r", "\n").splitlines() if line.strip()] for line in lines: labeled = cls._clean_train_passenger_candidate(cls._extract_first(TRAIN_PASSENGER_PATTERN, line)) if labeled: return labeled if id_number: for index, line in enumerate(lines): if id_number not in line: continue candidate = cls._clean_train_passenger_candidate(line.replace(id_number, " ")) if candidate: return candidate for offset in (1, -1, 2): target_index = index + offset if target_index < 0 or target_index >= len(lines): continue candidate = cls._clean_train_passenger_candidate(lines[target_index]) if candidate: return candidate for line in lines: purchase_match = TRAIN_PURCHASER_NAME_PATTERN.search(line) if purchase_match: candidate = cls._clean_train_passenger_candidate(purchase_match.group(1)) if candidate: return candidate return "" @staticmethod def _clean_train_passenger_candidate(value: str) -> str: cleaned = re.sub(r"[^·\u4e00-\u9fa5]", "", str(value or "")).strip() if not 2 <= len(cleaned) <= 8: return "" if any( token in cleaned for token in ( "电子", "客票", "铁路", "发票", "税务", "湖北省", "中国铁路", "开票", "日期", "车厢", "席别", "二等座", "一等座", "商务座", "特等座", "软座", "硬座", "无座", "软卧", "硬卧", "座位", "票价", "金额", "行程", "出发", "到达", "车次", "公司", "信用代码", "纳税人", "扫码", "无效", "二维码", "座席", "身份", "身份证号", "证件", ) ): return "" return cleaned @classmethod def _extract_train_id_number(cls, text: str) -> str: labeled = cls._extract_first(TRAIN_ID_PATTERN, text) if labeled: return labeled fallback = "" for line in str(text or "").replace("\r", "\n").splitlines(): compact_line = line.replace(" ", "") if any(token in compact_line for token in ("发票号码", "电子客票号", "客票号", "订单号")): continue match = TRAIN_ID_FALLBACK_PATTERN.search(compact_line) if not match: continue candidate = str(match.group(1) or "").strip() if "*" in candidate: return candidate if not fallback: fallback = candidate return fallback @staticmethod def _extract_train_carriage_and_seat(text: str) -> tuple[str, str]: combined_match = TRAIN_COMBINED_SEAT_PATTERN.search(str(text or "")) if combined_match: return f"{combined_match.group(1)}车", combined_match.group(2) loose_match = TRAIN_LOOSE_SEAT_PATTERN.search(str(text or "")) if loose_match: return f"{loose_match.group(1).zfill(2)}车", loose_match.group(2).upper() carriage_no = ReceiptFolderService._extract_first(TRAIN_CARRIAGE_PATTERN, text).replace(" ", "") seat_no = ReceiptFolderService._extract_first(TRAIN_SEAT_NO_PATTERN, text) return carriage_no, seat_no @staticmethod def _extract_train_fare(text: str) -> str: match = TRAIN_FARE_PATTERN.search(str(text or "")) if not match: match = max( list(TRAIN_LOOSE_FARE_PATTERN.finditer(str(text or ""))), key=lambda item: float(str(item.group(1) or "0").replace(",", ".")), default=None, ) if not match: return "" value = str(match.group(1) or "").replace(",", ".").strip() return f"{value}元" if value else "" @staticmethod def _parse_datetime(value: Any) -> datetime | None: raw = str(value or "").strip() if not raw: return None try: return datetime.fromisoformat(raw) except ValueError: return None class ReceiptFolderService(ReceiptFolderStorageMixin, ReceiptFolderItemMixin, ReceiptFolderTrainTicketMixin): def __init__(self) -> None: self.root = (get_settings().resolved_storage_root_dir / "receipt_folder").resolve() def persist_ocr_batch( self, *, files: list[tuple[str, bytes, str | None]], result: OcrRecognizeBatchRead, current_user: CurrentUserContext, receipt_ids: list[str] | None = None, ) -> OcrRecognizeBatchRead: documents = list(result.documents or []) enriched: list[OcrRecognizeDocumentRead] = [] for index, document in enumerate(documents): if index >= len(files): enriched.append(document) continue existing_receipt = self._resolve_existing_item( receipt_ids[index] if receipt_ids and index < len(receipt_ids) else "", current_user, ) if existing_receipt is not None: enriched.append( self._enrich_ocr_document_with_receipt( document, receipt=existing_receipt, current_user=current_user, ) ) continue filename, content, media_type = files[index] if not self._should_persist_source(filename, content): enriched.append(document) continue duplicate_receipt = self.find_duplicate_receipt( filename=filename, content=content, current_user=current_user, ) if duplicate_receipt is not None: duplicate_receipt = self._refresh_duplicate_receipt_from_document_if_stronger( receipt=duplicate_receipt, document=document, current_user=current_user, ) warning = "已上传过同样的单据,请不要重复上传。" existing_warnings = [str(item) for item in list(document.warnings or []) if str(item).strip()] enriched.append( self._enrich_ocr_document_with_receipt( document, receipt=duplicate_receipt, current_user=current_user, extra_warnings=[*existing_warnings, warning], ) ) continue receipt = self.save_receipt( filename=filename, content=content, media_type=media_type or document.media_type, document=document, current_user=current_user, ) enriched.append( self._enrich_ocr_document_with_receipt( document, receipt=receipt, current_user=current_user, ) ) return result.model_copy(update={"documents": enriched}) def _enrich_ocr_document_with_receipt( self, document: OcrRecognizeDocumentRead, *, receipt: ReceiptFolderItemRead, current_user: CurrentUserContext, extra_warnings: list[str] | None = None, ) -> OcrRecognizeDocumentRead: update: dict[str, Any] = { "receipt_id": receipt.id, "receipt_status": receipt.status, "receipt_preview_url": receipt.preview_url, "receipt_source_url": receipt.source_url, } try: meta = self._read_receipt_meta(receipt.id, current_user) except FileNotFoundError: meta = {} if meta: update.update( { "text": str(meta.get("ocr_text") or document.text or ""), "summary": str(meta.get("summary") or document.summary or ""), "document_type": str(meta.get("document_type") or document.document_type or "other"), "document_type_label": str(meta.get("document_type_label") or document.document_type_label or "其他单据"), "scene_code": str(meta.get("scene_code") or document.scene_code or "other"), "scene_label": str(meta.get("scene_label") or document.scene_label or "其他票据"), "classification_source": str(meta.get("ocr_classification_source") or document.classification_source or ""), "classification_confidence": float( meta.get("ocr_classification_confidence") or document.classification_confidence or 0.0 ), "classification_evidence": [ str(value) for value in list(meta.get("ocr_classification_evidence") or document.classification_evidence or []) if str(value).strip() ], "document_fields": self._build_ocr_document_fields_from_meta(meta), "preview_kind": str(meta.get("preview_kind") or document.preview_kind or ""), } ) warnings = [ str(item) for item in list(extra_warnings if extra_warnings is not None else document.warnings or []) if str(item).strip() ] if warnings: update["warnings"] = list(dict.fromkeys(warnings)) return document.model_copy(update=update) def _refresh_duplicate_receipt_from_document_if_stronger( self, *, receipt: ReceiptFolderItemRead, document: OcrRecognizeDocumentRead, current_user: CurrentUserContext, ) -> ReceiptFolderItemRead: try: meta = self._read_receipt_meta(receipt.id, current_user) except FileNotFoundError: return receipt incoming_meta = self._build_document_meta(document) if not self._is_incoming_document_meta_stronger(meta, incoming_meta): return receipt for key in ( "engine", "model", "ocr_text", "summary", "ocr_avg_score", "ocr_line_count", "page_count", "document_type", "document_type_label", "scene_code", "scene_label", "ocr_classification_source", "ocr_classification_confidence", "ocr_classification_evidence", "document_fields", "ocr_warnings", ): meta[key] = incoming_meta[key] meta["updated_at"] = datetime.now(UTC).isoformat() self._write_meta(self._receipt_dir(self._owner_key(current_user), receipt.id), meta) return self._build_item(meta) @staticmethod def _is_incoming_document_meta_stronger(existing_meta: dict[str, Any], incoming_meta: dict[str, Any]) -> bool: existing_type = str(existing_meta.get("document_type") or "other").strip() or "other" incoming_type = str(incoming_meta.get("document_type") or "other").strip() or "other" existing_fields = [field for field in list(existing_meta.get("document_fields") or []) if isinstance(field, dict)] incoming_fields = [field for field in list(incoming_meta.get("document_fields") or []) if isinstance(field, dict)] existing_text = str(existing_meta.get("ocr_text") or "").strip() incoming_text = str(incoming_meta.get("ocr_text") or "").strip() if incoming_type != "other" and existing_type == "other": return True if incoming_fields and not existing_fields: return True if incoming_text and not existing_text: return True return False def _build_ocr_document_fields_from_meta(self, meta: dict[str, Any]) -> list[OcrRecognizeFieldRead]: return [ OcrRecognizeFieldRead( key=field.key, label=field.label, value=field.value, ) for field in self._resolve_fields(meta) if field.label and field.value ] def save_receipt( self, *, filename: str, content: bytes, media_type: str | None, document: Any | None, current_user: CurrentUserContext, linked_claim_id: str = "", linked_claim_no: str = "", linked_item_id: str = "", ) -> ReceiptFolderItemRead: owner_key = self._owner_key(current_user) receipt_id = str(uuid4()) receipt_dir = self._owner_root(owner_key) / receipt_id receipt_dir.mkdir(parents=True, exist_ok=True) normalized_name = self.normalize_filename(filename) source_path = receipt_dir / normalized_name source_path.write_bytes(content) resolved_media_type = self.resolve_media_type(normalized_name, media_type) preview_meta = self._write_preview_asset( receipt_dir=receipt_dir, source_path=source_path, media_type=resolved_media_type, document=document, ) now = datetime.now(UTC) linked = bool(str(linked_claim_id or "").strip()) meta = { "id": receipt_id, "owner_key": owner_key, "file_name": normalized_name, "source_file_name": normalized_name, "media_type": resolved_media_type, "size_bytes": len(content), "file_sha256": self._content_hash(content), "uploaded_at": now.isoformat(), "status": "linked" if linked else "unlinked", "linked_claim_id": str(linked_claim_id or "").strip(), "linked_claim_no": str(linked_claim_no or "").strip(), "linked_item_id": str(linked_item_id or "").strip(), "linked_at": now.isoformat() if linked else "", **self._build_document_meta(document), **preview_meta, } self._write_meta(receipt_dir, meta) return self._build_item(meta) def save_linked_attachment( self, *, file_path: Path, media_type: str, document: Any | None, current_user: CurrentUserContext, claim_id: str, claim_no: str, item_id: str, source_receipt_id: str = "", ) -> ReceiptFolderItemRead | None: if not file_path.exists() or not file_path.is_file(): return None if str(source_receipt_id or "").strip(): try: return self.mark_receipt_linked( receipt_id=source_receipt_id, current_user=current_user, claim_id=claim_id, claim_no=claim_no, item_id=item_id, ) except FileNotFoundError: pass storage_root = get_settings().resolved_storage_root_dir try: file_path.resolve().relative_to(storage_root) except ValueError: return None return self.save_receipt( filename=file_path.name, content=file_path.read_bytes(), media_type=media_type, document=document, current_user=current_user, linked_claim_id=claim_id, linked_claim_no=claim_no, linked_item_id=item_id, ) def mark_receipt_linked( self, *, receipt_id: str, current_user: CurrentUserContext, claim_id: str, claim_no: str, item_id: str, ) -> ReceiptFolderItemRead: owner_key = self._owner_key(current_user) receipt_dir = self._receipt_dir(owner_key, receipt_id) meta = self._read_meta(receipt_dir) meta["status"] = "linked" meta["linked_claim_id"] = str(claim_id or "").strip() meta["linked_claim_no"] = str(claim_no or "").strip() meta["linked_item_id"] = str(item_id or "").strip() meta["linked_at"] = datetime.now(UTC).isoformat() self._write_meta(receipt_dir, meta) return self._build_item(meta) def list_receipts( self, *, current_user: CurrentUserContext, status_filter: str = "all", ) -> list[ReceiptFolderItemRead]: status_filter = str(status_filter or "all").strip().lower() items = [ self._build_item(meta) for meta in self._iter_owner_meta(self._owner_key(current_user)) if self._matches_status(meta, status_filter) ] return sorted(items, key=lambda item: item.uploaded_at or datetime.min.replace(tzinfo=UTC), reverse=True) def get_receipt(self, receipt_id: str, current_user: CurrentUserContext) -> ReceiptFolderDetailRead: meta = self._read_receipt_meta(receipt_id, current_user) item = self._build_item(meta) return ReceiptFolderDetailRead( **item.model_dump(), engine=str(meta.get("engine") or ""), model=str(meta.get("model") or ""), ocr_text=str(meta.get("ocr_text") or ""), line_count=int(meta.get("ocr_line_count") or 0), page_count=max(1, int(meta.get("page_count") or 1)), classification_confidence=float(meta.get("ocr_classification_confidence") or 0.0), classification_evidence=[ str(value) for value in list(meta.get("ocr_classification_evidence") or []) if str(value).strip() ], fields=self._resolve_fields(meta), raw_meta=meta, edit_logs=self._resolve_edit_logs(meta), ) def find_duplicate_receipt( self, *, filename: str, content: bytes, current_user: CurrentUserContext, ) -> ReceiptFolderItemRead | None: if not self._should_persist_source(filename, content): return None file_hash = self._content_hash(content) for meta in self._iter_owner_meta(self._owner_key(current_user)): if file_hash and str(meta.get("file_sha256") or "").strip() == file_hash: return self._build_item(meta) return None def update_receipt( self, *, receipt_id: str, payload: ReceiptFolderUpdate, current_user: CurrentUserContext, ) -> ReceiptFolderDetailRead: owner_key = self._owner_key(current_user) receipt_dir = self._receipt_dir(owner_key, receipt_id) meta = self._read_meta(receipt_dir) before_meta = json.loads(json.dumps(meta, ensure_ascii=False)) updates = payload.model_dump(exclude_unset=True) for key in ("document_type", "document_type_label", "scene_code", "scene_label", "summary"): if key in updates and updates[key] is not None: meta[key] = str(updates[key] or "").strip() editable = dict(meta.get("editable_fields") or {}) for key in ("amount", "document_date", "merchant_name"): if key in updates and updates[key] is not None: editable[key] = str(updates[key] or "").strip() if "fields" in updates and updates["fields"] is not None: meta["document_fields"] = [ field.model_dump() if isinstance(field, ReceiptFolderFieldRead) else dict(field) for field in payload.fields or [] ] meta["editable_fields"] = editable changes = self._build_edit_changes(before_meta, meta) if changes: logs = list(meta.get("edit_logs") or []) logs.insert( 0, { "operated_at": datetime.now(UTC).isoformat(), "operator": self._operator_label(current_user), "changes": changes, }, ) meta["edit_logs"] = logs[:50] meta["updated_at"] = datetime.now(UTC).isoformat() self._write_meta(receipt_dir, meta) return self.get_receipt(receipt_id, current_user) def delete_receipt( self, *, receipt_id: str, current_user: CurrentUserContext, ) -> ReceiptFolderDeleteResponse: owner_key = self._owner_key(current_user) receipt_dir = self._receipt_dir(owner_key, receipt_id) shutil.rmtree(receipt_dir) return ReceiptFolderDeleteResponse(message="票据已删除。", receipt_id=receipt_id) def unlink_receipts_for_claim(self, claim_id: str) -> int: normalized_claim_id = str(claim_id or "").strip() if not normalized_claim_id: return 0 unlinked_count = 0 self.root.mkdir(parents=True, exist_ok=True) for meta_path in list(self.root.glob("*/*/meta.json")): try: meta = self._read_meta(meta_path.parent) except FileNotFoundError: continue if str(meta.get("linked_claim_id") or "").strip() != normalized_claim_id: continue meta["status"] = "unlinked" meta["linked_claim_id"] = "" meta["linked_claim_no"] = "" meta["linked_item_id"] = "" meta["linked_at"] = "" meta["updated_at"] = datetime.now(UTC).isoformat() self._write_meta(meta_path.parent, meta) unlinked_count += 1 return unlinked_count def delete_receipts_for_claim(self, claim_id: str) -> int: return self.unlink_receipts_for_claim(claim_id) def resolve_source(self, receipt_id: str, current_user: CurrentUserContext) -> tuple[Path, str, str]: meta = self._read_receipt_meta(receipt_id, current_user) receipt_dir = self._receipt_dir(self._owner_key(current_user), receipt_id) file_name = str(meta.get("source_file_name") or meta.get("file_name") or "").strip() path = self._assert_child(receipt_dir / file_name) if not path.exists(): raise FileNotFoundError("Receipt source not found") media_type = self.resolve_media_type(path.name, str(meta.get("media_type") or "")) return path, media_type, str(meta.get("file_name") or path.name) def resolve_preview(self, receipt_id: str, current_user: CurrentUserContext) -> tuple[Path, str, str]: meta = self._read_receipt_meta(receipt_id, current_user) receipt_dir = self._receipt_dir(self._owner_key(current_user), receipt_id) meta = self._refresh_pdf_preview_asset_if_needed(receipt_dir=receipt_dir, meta=meta) preview_name = str(meta.get("preview_file_name") or "").strip() if preview_name: preview_path = self._assert_child(receipt_dir / preview_name) if preview_path.exists(): return ( preview_path, self.resolve_media_type(preview_path.name, str(meta.get("preview_media_type") or "")), preview_path.name, ) source_path, source_media_type, source_name = self.resolve_source(receipt_id, current_user) if self._is_previewable(source_media_type): return source_path, source_media_type, source_name raise FileNotFoundError("Receipt preview not found")