from __future__ import annotations import json import re import shutil from pathlib import Path from app.core.config import get_settings from app.models.financial_record import ExpenseClaim, ExpenseClaimItem class ExpenseClaimAttachmentStorage: """Centralizes filesystem operations for expense claim attachments.""" def root(self) -> Path: return (get_settings().resolved_storage_root_dir / "expense_claims").resolve() def build_item_dir(self, claim_id: str, item_id: str) -> Path: return (self.root() / claim_id / item_id).resolve() def delete_claim_files(self, claim: ExpenseClaim) -> None: for item in list(claim.items or []): self.delete_item_files(item) self.delete_claim_root(claim.id) def delete_claim_root(self, claim_id: str) -> None: claim_root = self._assert_child(self.root() / claim_id) self._delete_path(claim_root) @staticmethod def normalize_filename(filename: str | None) -> str: normalized = Path(str(filename or "").strip()).name normalized = re.sub(r"[^\w.\-\u4e00-\u9fff]+", "_", normalized).strip("._") suffix = Path(normalized).suffix if normalized: return normalized return f"attachment{suffix or '.bin'}" def resolve_path(self, storage_key: str | None) -> Path | None: normalized = str(storage_key or "").strip() if not normalized: return None root = self.root() path = (root / normalized).resolve() try: path.relative_to(root) except ValueError as exc: raise FileNotFoundError("Attachment path is invalid") from exc return path def resolve_item_path(self, item: ExpenseClaimItem) -> Path | None: if not str(item.invoice_id or "").strip(): return None file_path = self.resolve_path(item.invoice_id) if file_path is not None and file_path.exists(): return file_path filename = self.normalize_filename(item.invoice_id) if not filename: return file_path fallback_path = (self.build_item_dir(item.claim_id, item.id) / filename).resolve() try: fallback_path.relative_to(self.root()) except ValueError as exc: raise FileNotFoundError("Attachment path is invalid") from exc return fallback_path def to_storage_key(self, file_path: Path) -> str: return file_path.resolve().relative_to(self.root()).as_posix() def delete_item_files(self, item: ExpenseClaimItem) -> None: file_path = self.resolve_item_path(item) if file_path is None: return root = self.root() if file_path.parent == root: self._delete_path(file_path) self._delete_path(self.meta_path(file_path)) return self._delete_path(file_path.parent) @staticmethod def meta_path(file_path: Path) -> Path: return file_path.with_name(f"{file_path.name}.meta.json") def write_meta(self, file_path: Path, payload: dict) -> None: meta_path = self.meta_path(file_path) meta_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") def read_meta(self, file_path: Path) -> dict: meta_path = self.meta_path(file_path) if not meta_path.exists(): return {} try: payload = json.loads(meta_path.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError): return {} return payload if isinstance(payload, dict) else {} def _assert_child(self, path: Path) -> Path: root = self.root() resolved = path.resolve() try: resolved.relative_to(root) except ValueError as exc: raise FileNotFoundError("Attachment path is invalid") from exc return resolved def _delete_path(self, path: Path | None) -> None: if path is None: return target = self._assert_child(path) if not target.exists(): return if target.is_dir(): shutil.rmtree(target) else: target.unlink() if target.exists(): raise OSError(f"Attachment path was not deleted: {target}")