from __future__ import annotations from datetime import UTC, datetime from pathlib import Path from typing import Any from app.core.agent_enums import AgentAssetType from app.models.agent_asset import AgentAsset from app.schemas.agent_asset import ( AgentAssetSpreadsheetDiffCellRead, AgentAssetSpreadsheetDiffSheetRead, ) from app.services.agent_asset_spreadsheet import ( COMPANY_COMMUNICATION_EXPENSE_RULE_CODE, COMPANY_COMMUNICATION_EXPENSE_RULE_FILENAME, COMPANY_TRAVEL_ALLOWANCE_RULE_CODE, COMPANY_TRAVEL_ALLOWANCE_RULE_FILENAME, COMPANY_TRAVEL_EXPENSE_RULE_CODE, COMPANY_TRAVEL_EXPENSE_RULE_FILENAME, COMPANY_TRAVEL_GRADE_MAPPING_RULE_CODE, COMPANY_TRAVEL_GRADE_MAPPING_RULE_FILENAME, COMPANY_TRAVEL_SEASON_MAPPING_RULE_CODE, COMPANY_TRAVEL_SEASON_MAPPING_RULE_FILENAME, COMPANY_TRAVEL_TRANSPORT_ESTIMATE_RULE_CODE, COMPANY_TRAVEL_TRANSPORT_ESTIMATE_RULE_FILENAME, COMPANY_TRAVEL_TRANSPORT_RULE_CODE, COMPANY_TRAVEL_TRANSPORT_RULE_FILENAME, FINANCE_RULES_LIBRARY, RULE_LIBRARY_NAMES, SPREADSHEET_MIME_TYPE, AgentAssetSpreadsheetManager, RuleSpreadsheetMeta, ) class AgentAssetSpreadsheetHelperMixin: def _require_spreadsheet_rule(self, asset_id: str) -> AgentAsset: asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") if asset.asset_type != AgentAssetType.RULE.value: raise ValueError("仅规则资产支持 Excel 规则表。") detail_mode = str((asset.config_json or {}).get("detail_mode") or "").strip().lower() if detail_mode != "spreadsheet": raise ValueError("当前规则未配置 Excel 规则表。") return asset def _resolve_spreadsheet_version_meta( self, asset: AgentAsset, *, version: str | None = None, ) -> tuple[str, RuleSpreadsheetMeta]: resolved_version = str(version or self._resolve_working_version(asset) or "").strip() if not resolved_version: raise ValueError("当前规则尚未配置表格版本。") version_row = self.repository.get_version(asset.id, resolved_version) if version_row is None: raise LookupError(f"版本 {resolved_version} 不存在") # 版本记录中的快照才是不变的事实来源。`/rules` 下的工作簿只是当前 # 可编辑副本,后续写入不应该反向污染某个已存在版本的内容。 metadata = self.spreadsheet_manager.parse_version_markdown(str(version_row.content or "")) if metadata is None and self._resolve_working_version(asset) == resolved_version: metadata = self._read_current_rule_document_meta(asset) if metadata is None: raise FileNotFoundError("规则表版本快照不存在。") return resolved_version, metadata def _resolve_current_spreadsheet_meta( self, asset: AgentAsset, ) -> tuple[str, RuleSpreadsheetMeta]: config_json = dict(asset.config_json or {}) current_meta = self._read_current_rule_document_meta(asset) file_name = ( current_meta.file_name if current_meta is not None and current_meta.file_name else self._resolve_default_spreadsheet_file_name(asset) ) library = self._resolve_spreadsheet_rule_library(asset) storage_key = (Path("rules") / library / file_name).as_posix() file_path = self.spreadsheet_manager.resolve_storage_path(storage_key) if not file_path.exists(): content: bytes | None = None if current_meta is not None and current_meta.storage_key: try: legacy_path = self.spreadsheet_manager.resolve_storage_path( current_meta.storage_key ) except FileNotFoundError: legacy_path = None if legacy_path is not None and legacy_path.exists(): content = legacy_path.read_bytes() if content is None: content = AgentAssetSpreadsheetManager.build_blank_rule_workbook( Path(file_name).stem or "规则表" ) meta = self.spreadsheet_manager.store_rule_library_spreadsheet( library=library, file_name=file_name, content=content, actor_name=( current_meta.updated_by if current_meta is not None and current_meta.updated_by else "system" ), source="current-rule", ) else: content = file_path.read_bytes() meta = RuleSpreadsheetMeta( file_name=file_name, storage_key=storage_key, mime_type=( current_meta.mime_type if current_meta is not None and current_meta.mime_type else SPREADSHEET_MIME_TYPE ), size_bytes=file_path.stat().st_size, checksum=self._hash_bytes(content), updated_at=datetime.fromtimestamp(file_path.stat().st_mtime, UTC).isoformat(), updated_by=( current_meta.updated_by if current_meta is not None and current_meta.updated_by else "system" ), source=( current_meta.source if current_meta is not None and current_meta.source else "current-rule" ), ) expected_document = { **self.spreadsheet_manager.build_rule_document_config( meta, asset_version="current", ), "storage_key": meta.storage_key, } if config_json.get("rule_document") != expected_document: config_json["detail_mode"] = "spreadsheet" config_json["tag"] = str(config_json.get("tag") or "基础规则").strip() or "基础规则" config_json["rule_library"] = library config_json["rule_document"] = expected_document asset.config_json = config_json self.repository.save_asset(asset) return "current", meta def _store_current_rule_spreadsheet( self, asset: AgentAsset, *, file_name: str, content: bytes, actor: str, source: str, ) -> RuleSpreadsheetMeta: library = self._resolve_spreadsheet_rule_library(asset) metadata = self.spreadsheet_manager.store_rule_library_spreadsheet( library=library, file_name=file_name, content=content, actor_name=actor, source=source, ) config_json = dict(asset.config_json or {}) config_json["detail_mode"] = "spreadsheet" config_json["tag"] = str(config_json.get("tag") or "基础规则").strip() or "基础规则" config_json["rule_library"] = library config_json["rule_document"] = { **self.spreadsheet_manager.build_rule_document_config( metadata, asset_version="current", ), "storage_key": metadata.storage_key, } asset.config_json = config_json self.repository.save_asset(asset) return metadata @staticmethod def _resolve_spreadsheet_rule_library(asset: AgentAsset) -> str: config_json = dict(asset.config_json or {}) library = str(config_json.get("rule_library") or FINANCE_RULES_LIBRARY).strip() if library not in RULE_LIBRARY_NAMES: return FINANCE_RULES_LIBRARY return library @staticmethod def _resolve_default_spreadsheet_file_name(asset: AgentAsset) -> str: if asset.code == COMPANY_TRAVEL_EXPENSE_RULE_CODE: return COMPANY_TRAVEL_EXPENSE_RULE_FILENAME if asset.code == COMPANY_COMMUNICATION_EXPENSE_RULE_CODE: return COMPANY_COMMUNICATION_EXPENSE_RULE_FILENAME if asset.code == COMPANY_TRAVEL_ALLOWANCE_RULE_CODE: return COMPANY_TRAVEL_ALLOWANCE_RULE_FILENAME if asset.code == COMPANY_TRAVEL_TRANSPORT_RULE_CODE: return COMPANY_TRAVEL_TRANSPORT_RULE_FILENAME if asset.code == COMPANY_TRAVEL_TRANSPORT_ESTIMATE_RULE_CODE: return COMPANY_TRAVEL_TRANSPORT_ESTIMATE_RULE_FILENAME if asset.code == COMPANY_TRAVEL_GRADE_MAPPING_RULE_CODE: return COMPANY_TRAVEL_GRADE_MAPPING_RULE_FILENAME if asset.code == COMPANY_TRAVEL_SEASON_MAPPING_RULE_CODE: return COMPANY_TRAVEL_SEASON_MAPPING_RULE_FILENAME fallback = Path(str(asset.name or "规则表").strip()).name return fallback if fallback.lower().endswith(".xlsx") else f"{fallback}.xlsx" def _load_spreadsheet_for_compare(self, metadata: RuleSpreadsheetMeta): from io import BytesIO from openpyxl import load_workbook file_path = self.spreadsheet_manager.resolve_storage_path(metadata.storage_key) if not file_path.exists(): raise FileNotFoundError(metadata.file_name) return load_workbook(BytesIO(file_path.read_bytes()), read_only=False, data_only=False) def _collect_workbook_changes_from_content( self, base_metadata: RuleSpreadsheetMeta, target_content: bytes, ) -> tuple[list[AgentAssetSpreadsheetDiffSheetRead], list[AgentAssetSpreadsheetDiffCellRead]]: from io import BytesIO from openpyxl import load_workbook base_workbook = self._load_spreadsheet_for_compare(base_metadata) target_workbook = load_workbook(BytesIO(target_content), read_only=False, data_only=False) return self._collect_workbook_changes(base_workbook, target_workbook) def _collect_workbook_changes( self, base_workbook, target_workbook ) -> tuple[list[AgentAssetSpreadsheetDiffSheetRead], list[AgentAssetSpreadsheetDiffCellRead]]: base_sheet_names = set(base_workbook.sheetnames) target_sheet_names = set(target_workbook.sheetnames) sheet_changes: list[AgentAssetSpreadsheetDiffSheetRead] = [] for sheet_name in sorted(target_sheet_names - base_sheet_names): sheet_changes.append( AgentAssetSpreadsheetDiffSheetRead(sheet_name=sheet_name, change_type="added") ) for sheet_name in sorted(base_sheet_names - target_sheet_names): sheet_changes.append( AgentAssetSpreadsheetDiffSheetRead(sheet_name=sheet_name, change_type="removed") ) cell_changes: list[AgentAssetSpreadsheetDiffCellRead] = [] for sheet_name in sorted(base_sheet_names & target_sheet_names): base_sheet = base_workbook[sheet_name] target_sheet = target_workbook[sheet_name] max_row = max(base_sheet.max_row, target_sheet.max_row) max_column = max(base_sheet.max_column, target_sheet.max_column) for row_index in range(1, max_row + 1): for column_index in range(1, max_column + 1): before_value = base_sheet.cell(row=row_index, column=column_index).value after_value = target_sheet.cell(row=row_index, column=column_index).value if before_value == after_value: continue if before_value in (None, ""): change_type = "added" elif after_value in (None, ""): change_type = "removed" else: change_type = "modified" cell_changes.append( AgentAssetSpreadsheetDiffCellRead( sheet_name=sheet_name, cell=target_sheet.cell(row=row_index, column=column_index).coordinate, change_type=change_type, before_value=before_value, after_value=after_value, ) ) for sheet_name in sorted({item.sheet_name for item in cell_changes}): sheet_changes.append( AgentAssetSpreadsheetDiffSheetRead(sheet_name=sheet_name, change_type="modified") ) return sheet_changes, cell_changes @staticmethod def _count_changed_sheets( sheet_changes: list[AgentAssetSpreadsheetDiffSheetRead], cell_changes: list[AgentAssetSpreadsheetDiffCellRead], ) -> int: return len( {item.sheet_name for item in sheet_changes} | {item.sheet_name for item in cell_changes} ) @staticmethod def _build_spreadsheet_change_summary( sheet_changes: list[AgentAssetSpreadsheetDiffSheetRead], cell_changes: list[AgentAssetSpreadsheetDiffCellRead], ) -> str: sheet_names = sorted( {item.sheet_name for item in sheet_changes} | {item.sheet_name for item in cell_changes} ) if not sheet_names: return "文件内容已保存,未发现单元格级差异。" preview = "、".join(sheet_names[:3]) if len(sheet_names) > 3: preview = f"{preview} 等" sheet_text = f"涉及 {len(sheet_names)} 个工作表({preview})" if cell_changes: return f"{sheet_text},共 {len(cell_changes)} 处单元格改动。" return f"{sheet_text},工作表结构发生变化。"