from __future__ import annotations import json from dataclasses import dataclass from datetime import UTC, datetime from pathlib import Path from typing import Any from urllib.request import Request, urlopen import jwt from sqlalchemy.orm import Session from app.api.deps import CurrentUserContext from app.core.agent_enums import ( AgentAssetContentType, AgentAssetStatus, AgentAssetType, AgentReviewStatus, ) from app.core.config import get_settings from app.core.logging import get_logger from app.models.agent_asset import AgentAsset, AgentAssetReview, AgentAssetVersion from app.repositories.agent_asset import AgentAssetRepository from app.schemas.agent_asset import ( AgentAssetCreate, AgentAssetListItem, AgentAssetOnlyOfficeConfigRead, AgentAssetRead, AgentAssetReviewCreate, AgentAssetReviewRead, AgentAssetSpreadsheetDiffCellRead, AgentAssetSpreadsheetDiffSheetRead, AgentAssetUpdate, AgentAssetVersionCompareRead, AgentAssetVersionCreate, AgentAssetVersionRead, AgentAssetVersionTimelineItemRead, ) from app.services.agent_asset_spreadsheet import ( AgentAssetSpreadsheetManager, COMPANY_TRAVEL_EXPENSE_RULE_FILENAME, RULE_LIBRARY_NAMES, RuleSpreadsheetMeta, SPREADSHEET_MIME_TYPE, ) from app.services.agent_foundation import AgentFoundationService from app.services.audit import AuditLogService from app.services.settings import resolve_onlyoffice_settings logger = get_logger("app.services.agent_assets") PREVIEW_RULE_ASSET_ID = "preview-rule-expense-company-travel-expense" PREVIEW_RULE_CURRENT_VERSION = "v1.2.0" PREVIEW_RULE_VERSION_FILENAMES = { PREVIEW_RULE_CURRENT_VERSION: COMPANY_TRAVEL_EXPENSE_RULE_FILENAME, "v1.1.0": "公司差旅费报销规则-v1.1.0.xlsx", "v1.0.0": "公司差旅费报销规则-v1.0.0.xlsx", } @dataclass(slots=True) class OnlyOfficeCallbackPayload: status: int download_url: str users: list[str] class AgentAssetService: def __init__(self, db: Session) -> None: self.db = db self.repository = AgentAssetRepository(db) self.audit_service = AuditLogService(db) self.spreadsheet_manager = AgentAssetSpreadsheetManager() def list_assets( self, *, asset_type: str | None = None, status: str | None = None, domain: str | None = None, keyword: str | None = None, ) -> list[AgentAssetListItem]: self._ensure_ready() items = self.repository.list( asset_type=asset_type, status=status, domain=domain, keyword=keyword ) return [AgentAssetListItem.model_validate(item) for item in items] def get_asset(self, asset_id: str) -> AgentAssetRead | None: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: return None working_version = self._resolve_working_version(asset) recent_versions = self._sort_versions( self.repository.list_versions(asset_id, limit=5), working_version, ) latest_review = ( self.repository.get_review(asset_id, working_version) if working_version else next(iter(self.repository.list_reviews(asset_id, limit=1)), None) ) current_version = ( self.repository.get_version(asset_id, working_version) if working_version else None ) return AgentAssetRead( **AgentAssetListItem.model_validate(asset).model_dump(), current_version_content=self._deserialize_content(current_version) if current_version else None, current_version_content_type=current_version.content_type if current_version else None, current_version_change_note=current_version.change_note if current_version else None, recent_versions=[ self._serialize_version(item, asset) for item in recent_versions ], latest_review=AgentAssetReviewRead.model_validate(latest_review) if latest_review else None, ) def create_asset( self, payload: AgentAssetCreate, *, actor: str, request_id: str | None = None, ) -> AgentAssetRead: self._ensure_ready() if self.repository.get_by_code(payload.code): raise ValueError(f"资产编码 {payload.code} 已存在") if payload.status == AgentAssetStatus.ACTIVE: raise ValueError("请先创建资产并完成审核,再通过上线接口激活。") asset = AgentAsset( asset_type=payload.asset_type.value, code=payload.code, name=payload.name, description=payload.description, domain=payload.domain.value, scenario_json=payload.scenario_json, owner=payload.owner, reviewer=payload.reviewer, status=payload.status.value, config_json=payload.config_json, ) created = self.repository.create_asset(asset) self.audit_service.log_action( actor=actor, action="create_agent_asset", resource_type=created.asset_type, resource_id=created.id, before_json=None, after_json=self._asset_snapshot(created), request_id=request_id, ) logger.info("Created agent asset id=%s code=%s", created.id, created.code) return self.get_asset(created.id) # type: ignore[return-value] def update_asset( self, asset_id: str, payload: AgentAssetUpdate, *, actor: str, request_id: str | None = None, ) -> AgentAssetRead: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") before = self._asset_snapshot(asset) if payload.status == AgentAssetStatus.ACTIVE: raise ValueError("请使用上线接口激活资产。") for field_name in ( "name", "description", "owner", "reviewer", "current_version", "published_version", "working_version", "config_json", "scenario_json", ): value = getattr(payload, field_name) if value is not None: setattr(asset, field_name, value) if payload.domain is not None: asset.domain = payload.domain.value if payload.status is not None: asset.status = payload.status.value if payload.current_version is not None and not self.repository.get_version( asset_id, payload.current_version ): raise LookupError(f"版本 {payload.current_version} 不存在") if payload.published_version is not None and not self.repository.get_version( asset_id, payload.published_version ): raise LookupError(f"版本 {payload.published_version} 不存在") if payload.working_version is not None and not self.repository.get_version( asset_id, payload.working_version ): raise LookupError(f"版本 {payload.working_version} 不存在") if payload.current_version is not None and payload.working_version is None: asset.working_version = payload.current_version if payload.working_version is not None: asset.current_version = payload.working_version updated = self.repository.save_asset(asset) self.audit_service.log_action( actor=actor, action="update_agent_asset", resource_type=updated.asset_type, resource_id=updated.id, before_json=before, after_json=self._asset_snapshot(updated), request_id=request_id, ) logger.info("Updated agent asset id=%s code=%s", updated.id, updated.code) return self.get_asset(updated.id) # type: ignore[return-value] def list_versions(self, asset_id: str, *, limit: int = 20) -> list[AgentAssetVersionRead]: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") versions = self._sort_versions( self.repository.list_versions(asset_id, limit=limit), self._resolve_working_version(asset), ) return [self._serialize_version(item, asset) for item in versions] def create_version( self, asset_id: str, payload: AgentAssetVersionCreate, *, actor: str, request_id: str | None = None, ) -> AgentAssetVersionRead: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") if self.repository.get_version(asset_id, payload.version): raise ValueError(f"版本号 {payload.version} 已存在") self._validate_version_payload(asset, payload) serialized_content = self._serialize_content(payload.content, payload.content_type.value) version = AgentAssetVersion( asset_id=asset_id, version=payload.version, content=serialized_content, content_type=payload.content_type.value, change_note=payload.change_note, created_by=payload.created_by, ) created = self.repository.create_version(version) before = self._asset_snapshot(asset) asset.working_version = payload.version asset.current_version = payload.version updated_asset = self.repository.save_asset(asset) self.audit_service.log_action( actor=actor, action="save_agent_asset_version", resource_type=updated_asset.asset_type, resource_id=updated_asset.id, before_json=before, after_json={ "current_version": updated_asset.current_version, "working_version": updated_asset.working_version, "published_version": updated_asset.published_version, "status": updated_asset.status, }, request_id=request_id, ) logger.info("Created agent asset version asset_id=%s version=%s", asset_id, payload.version) return self._serialize_version(created, updated_asset) def create_review( self, asset_id: str, payload: AgentAssetReviewCreate, *, actor: str, request_id: str | None = None, ) -> AgentAssetReviewRead: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") if self.repository.get_version(asset_id, payload.version) is None: raise LookupError(f"版本 {payload.version} 不存在") if asset.asset_type == AgentAssetType.RULE.value: working_version = self._resolve_working_version(asset) if payload.version != working_version: raise ValueError("只能对当前工作版本发起审核。") review = AgentAssetReview( asset_id=asset_id, version=payload.version, reviewer=payload.reviewer, review_status=payload.review_status.value, review_note=payload.review_note, reviewed_at=None if payload.review_status == AgentReviewStatus.PENDING else datetime.now(UTC), ) created = self.repository.create_review(review) before = self._asset_snapshot(asset) asset.reviewer = payload.reviewer if payload.review_status == AgentReviewStatus.PENDING: if not asset.published_version: asset.status = AgentAssetStatus.REVIEW.value elif payload.review_status == AgentReviewStatus.REJECTED: if not asset.published_version: asset.status = AgentAssetStatus.DRAFT.value elif not asset.published_version: asset.status = AgentAssetStatus.REVIEW.value self.repository.save_asset(asset) self.audit_service.log_action( actor=actor, action="review_agent_asset", resource_type=asset.asset_type, resource_id=asset.id, before_json=before, after_json={ "review_version": payload.version, "review_status": payload.review_status.value, "asset_status": asset.status, }, request_id=request_id, ) logger.info( "Created review asset_id=%s version=%s status=%s", asset_id, payload.version, payload.review_status.value, ) return AgentAssetReviewRead.model_validate(created) def activate_asset( self, asset_id: str, *, actor: str, request_id: str | None = None, ) -> AgentAssetRead: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") candidate_version = self._resolve_working_version(asset) if not candidate_version: raise ValueError("资产尚未设置工作版本,无法上线。") if asset.asset_type == AgentAssetType.RULE.value: review = self.repository.get_review( asset.id, candidate_version, AgentReviewStatus.APPROVED.value ) if review is None: raise PermissionError("规则工作版本尚未审核通过,不能上线。") before = self._asset_snapshot(asset) asset.published_version = candidate_version asset.status = AgentAssetStatus.ACTIVE.value updated = self.repository.save_asset(asset) self.audit_service.log_action( actor=actor, action="activate_agent_asset", resource_type=updated.asset_type, resource_id=updated.id, before_json=before, after_json=self._asset_snapshot(updated), request_id=request_id, ) logger.info("Activated agent asset id=%s code=%s", updated.id, updated.code) return self.get_asset(updated.id) # type: ignore[return-value] def build_rule_spreadsheet_onlyoffice_config( self, asset_id: str, current_user: CurrentUserContext, *, version: str | None = None, ) -> AgentAssetOnlyOfficeConfigRead: self._ensure_ready() if asset_id == PREVIEW_RULE_ASSET_ID: resolved_version, metadata = self._ensure_preview_rule_spreadsheet(version=version) return self._build_onlyoffice_spreadsheet_config( asset_id=asset_id, current_user=current_user, resolved_version=resolved_version, metadata=metadata, editable=resolved_version == PREVIEW_RULE_CURRENT_VERSION, ) asset = self._require_spreadsheet_rule(asset_id) resolved_version, metadata = self._resolve_spreadsheet_version_meta(asset, version=version) editable = self._can_edit_spreadsheet_version(asset, current_user, resolved_version) return self._build_onlyoffice_spreadsheet_config( asset_id=asset.id, current_user=current_user, resolved_version=resolved_version, metadata=metadata, editable=editable, ) def get_rule_spreadsheet_content( self, asset_id: str, *, version: str | None = None, ) -> tuple[Path, str, str]: self._ensure_ready() if asset_id == PREVIEW_RULE_ASSET_ID: _, metadata = self._ensure_preview_rule_spreadsheet(version=version) file_path = self.spreadsheet_manager.resolve_storage_path(metadata.storage_key) if not file_path.exists(): raise FileNotFoundError(metadata.file_name) return file_path, metadata.mime_type, metadata.file_name asset = self._require_spreadsheet_rule(asset_id) _, metadata = self._resolve_spreadsheet_version_meta(asset, version=version) file_path = self.spreadsheet_manager.resolve_storage_path(metadata.storage_key) if not file_path.exists(): raise FileNotFoundError(metadata.file_name) return file_path, metadata.mime_type, metadata.file_name def validate_rule_spreadsheet_access_token( self, asset_id: str, version: str, access_token: str, ) -> None: onlyoffice_settings = resolve_onlyoffice_settings() try: payload = jwt.decode( access_token, onlyoffice_settings.jwt_secret, algorithms=["HS256"], ) except jwt.PyJWTError as exc: raise ValueError("ONLYOFFICE 文件访问令牌无效。") from exc if ( payload.get("scope") != "agent-asset-spreadsheet" or payload.get("asset_id") != asset_id or payload.get("version") != version ): raise ValueError("ONLYOFFICE 文件访问令牌无效。") def upload_rule_spreadsheet( self, asset_id: str, *, filename: str, content: bytes, actor: str, request_id: str | None = None, change_note: str | None = None, source: str = "upload", ) -> AgentAssetRead: self._ensure_ready() asset = self._require_spreadsheet_rule(asset_id) normalized_name = Path(str(filename or "").strip()).name.strip() if not normalized_name: raise ValueError("规则表文件名不能为空。") if Path(normalized_name).suffix.lower() != ".xlsx": raise ValueError("当前仅支持上传 .xlsx 格式的规则表。") if not content: raise ValueError("规则表文件内容不能为空。") next_version = self._increment_version(self._resolve_working_version(asset)) metadata = self.spreadsheet_manager.store_spreadsheet( asset_id=asset.id, version=next_version, file_name=normalized_name, content=content, actor_name=actor, source=source, ) markdown = self.spreadsheet_manager.build_version_markdown( rule_name=asset.name, version=next_version, metadata=metadata, ) self.create_version( asset.id, AgentAssetVersionCreate( version=next_version, content=markdown, content_type=AgentAssetContentType.MARKDOWN, change_note=change_note or f"上传 Excel 规则表:{normalized_name}", created_by=actor, ), actor=actor, request_id=request_id, ) refreshed = self.repository.get(asset.id) if refreshed is None: raise LookupError("Asset not found") config_json = dict(refreshed.config_json or {}) config_json["detail_mode"] = "spreadsheet" config_json["tag"] = str(config_json.get("tag") or "财务规则").strip() or "财务规则" current_document_meta = metadata rule_library = str(config_json.get("rule_library") or "").strip() if rule_library in RULE_LIBRARY_NAMES: current_document_meta = self.spreadsheet_manager.store_rule_library_spreadsheet( library=rule_library, file_name=normalized_name, content=content, actor_name=actor, source=source, ) rule_document = self.spreadsheet_manager.build_rule_document_config( current_document_meta, asset_version=next_version, ) rule_document["storage_key"] = current_document_meta.storage_key config_json["rule_document"] = rule_document refreshed.config_json = config_json self.repository.save_asset(refreshed) return self.get_asset(asset.id) # type: ignore[return-value] def import_rule_spreadsheet_content( self, asset_id: str, *, filename: str, content: bytes, actor: str, request_id: str | None = None, ) -> AgentAssetRead: self._ensure_ready() asset = self._require_spreadsheet_rule(asset_id) normalized_name = Path(str(filename or "").strip()).name.strip() if not normalized_name: raise ValueError("待导入表格文件名不能为空。") if Path(normalized_name).suffix.lower() != ".xlsx": raise ValueError("当前仅支持导入 .xlsx 格式的规则表。") _, current_metadata = self._resolve_spreadsheet_version_meta( asset, version=self._resolve_working_version(asset), ) imported_content = self.spreadsheet_manager.rebuild_from_uploaded_content(content) return self.upload_rule_spreadsheet( asset.id, filename=current_metadata.file_name, content=imported_content, actor=actor, request_id=request_id, change_note=f"导入 Excel 表格内容:{normalized_name}", source="content-import", ) def handle_rule_spreadsheet_onlyoffice_callback( self, asset_id: str, *, version: str, payload: dict[str, Any], ) -> None: self._ensure_ready() if asset_id == PREVIEW_RULE_ASSET_ID: self._handle_preview_rule_spreadsheet_onlyoffice_callback( version=version, payload=payload, ) return asset = self._require_spreadsheet_rule(asset_id) callback = self._parse_onlyoffice_callback(payload) if callback.status not in {2, 6} or not callback.download_url: return if self._resolve_working_version(asset) != str(version or "").strip(): return _, current_metadata = self._resolve_spreadsheet_version_meta(asset, version=version) request = Request( callback.download_url, headers={"User-Agent": "x-financial-onlyoffice-agent-asset"}, ) with urlopen(request, timeout=30) as response: # noqa: S310 content = response.read() if current_metadata.checksum and current_metadata.checksum == self._hash_bytes(content): return actor_name = callback.users[0] if callback.users else "ONLYOFFICE" self.upload_rule_spreadsheet( asset.id, filename=current_metadata.file_name, content=content, actor=actor_name, change_note="ONLYOFFICE 编辑保存规则表。", source="onlyoffice", ) def _ensure_ready(self) -> None: AgentFoundationService(self.db).ensure_foundation_ready() def _validate_version_payload( self, asset: AgentAsset, payload: AgentAssetVersionCreate ) -> None: if ( asset.asset_type == AgentAssetType.RULE.value and payload.content_type != AgentAssetContentType.MARKDOWN ): raise ValueError("规则资产版本内容必须使用 markdown。") if ( asset.asset_type != AgentAssetType.RULE.value and payload.content_type != AgentAssetContentType.JSON ): raise ValueError("技能、MCP、任务资产版本内容必须使用 json。") if payload.content_type == AgentAssetContentType.MARKDOWN and not isinstance( payload.content, str ): raise ValueError("Markdown 内容必须是字符串。") if payload.content_type == AgentAssetContentType.JSON and not isinstance( payload.content, (dict, list) ): raise ValueError("JSON 内容必须是对象或数组。") def restore_version_as_working_copy( self, asset_id: str, source_version: str, *, actor: str, request_id: str | None = None, ) -> AgentAssetRead: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") source = self.repository.get_version(asset_id, source_version) if source is None: raise LookupError(f"版本 {source_version} 不存在") if ( asset.asset_type == AgentAssetType.RULE.value and str((asset.config_json or {}).get("detail_mode") or "").strip().lower() == "spreadsheet" ): metadata = self.spreadsheet_manager.parse_version_markdown(str(source.content or "")) if metadata is None: raise FileNotFoundError("历史规则表快照不存在,无法恢复。") file_path = self.spreadsheet_manager.resolve_storage_path(metadata.storage_key) if not file_path.exists(): raise FileNotFoundError(metadata.file_name) restored = self.upload_rule_spreadsheet( asset.id, filename=metadata.file_name, content=file_path.read_bytes(), actor=actor, request_id=request_id, change_note=f"基于历史版本 {source_version} 恢复生成工作稿", source="restore", ) self.audit_service.log_action( actor=actor, action="restore_agent_asset_version", resource_type=asset.asset_type, resource_id=asset.id, before_json={"source_version": source_version}, after_json={"working_version": restored.working_version}, request_id=request_id, ) return restored next_version = self._increment_version(self._resolve_working_version(asset)) self.create_version( asset.id, AgentAssetVersionCreate( version=next_version, content=self._deserialize_content(source), content_type=AgentAssetContentType(source.content_type), change_note=f"基于历史版本 {source_version} 恢复生成工作稿", created_by=actor, ), actor=actor, request_id=request_id, ) restored = self.get_asset(asset.id) self.audit_service.log_action( actor=actor, action="restore_agent_asset_version", resource_type=asset.asset_type, resource_id=asset.id, before_json={"source_version": source_version}, after_json={"working_version": next_version}, request_id=request_id, ) return restored # type: ignore[return-value] def list_version_timeline(self, asset_id: str) -> list[AgentAssetVersionTimelineItemRead]: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") events: list[AgentAssetVersionTimelineItemRead] = [] versions = self.repository.list_versions(asset_id) for version in versions: source_version = self._extract_restore_source_version(version.change_note) events.append( AgentAssetVersionTimelineItemRead( event_type="restored" if source_version else "created", version=version.version, actor=version.created_by, event_time=version.created_at, title="恢复生成工作稿" if source_version else "创建工作版本", description=version.change_note or "生成新版本", note=version.change_note, source_version=source_version, ) ) for review in self.repository.list_reviews(asset_id): event_type = { AgentReviewStatus.PENDING.value: "submitted", AgentReviewStatus.APPROVED.value: "approved", AgentReviewStatus.REJECTED.value: "rejected", }.get(review.review_status, "reviewed") title = { "submitted": "提交审核", "approved": "审核通过", "rejected": "审核驳回", }.get(event_type, "审核处理") events.append( AgentAssetVersionTimelineItemRead( event_type=event_type, version=review.version, actor=review.reviewer, event_time=review.reviewed_at or review.created_at, title=title, description=review.review_note or "", note=review.review_note, ) ) audit_logs = self.audit_service.repository.list( resource_type=asset.asset_type, resource_id=asset.id, limit=200, ) for log in audit_logs: if log.action != "activate_agent_asset": continue after_json = log.after_json or {} version = str( after_json.get("published_version") or after_json.get("current_version") or "" ).strip() if not version: continue events.append( AgentAssetVersionTimelineItemRead( event_type="published", version=version, actor=log.actor, event_time=log.created_at, title="正式上线", description="该版本已切换为线上正式版本。", ) ) return sorted(events, key=lambda item: item.event_time) def compare_spreadsheet_versions( self, asset_id: str, *, base_version: str, target_version: str, ) -> AgentAssetVersionCompareRead: self._ensure_ready() asset = self._require_spreadsheet_rule(asset_id) resolved_base, base_meta = self._resolve_spreadsheet_version_meta(asset, version=base_version) resolved_target, target_meta = self._resolve_spreadsheet_version_meta(asset, version=target_version) base_workbook = self._load_spreadsheet_for_compare(base_meta) target_workbook = self._load_spreadsheet_for_compare(target_meta) base_sheet_names = set(base_workbook.sheetnames) target_sheet_names = set(target_workbook.sheetnames) sheet_changes: list[AgentAssetSpreadsheetDiffSheetRead] = [] for sheet_name in sorted(target_sheet_names - base_sheet_names): sheet_changes.append( AgentAssetSpreadsheetDiffSheetRead(sheet_name=sheet_name, change_type="added") ) for sheet_name in sorted(base_sheet_names - target_sheet_names): sheet_changes.append( AgentAssetSpreadsheetDiffSheetRead(sheet_name=sheet_name, change_type="removed") ) cell_changes: list[AgentAssetSpreadsheetDiffCellRead] = [] changed_sheets: set[str] = set() for sheet_name in sorted(base_sheet_names & target_sheet_names): base_sheet = base_workbook[sheet_name] target_sheet = target_workbook[sheet_name] max_row = max(base_sheet.max_row, target_sheet.max_row) max_column = max(base_sheet.max_column, target_sheet.max_column) for row_index in range(1, max_row + 1): for column_index in range(1, max_column + 1): before_value = base_sheet.cell(row=row_index, column=column_index).value after_value = target_sheet.cell(row=row_index, column=column_index).value if before_value == after_value: continue changed_sheets.add(sheet_name) if before_value in (None, ""): change_type = "added" elif after_value in (None, ""): change_type = "removed" else: change_type = "modified" cell_changes.append( AgentAssetSpreadsheetDiffCellRead( sheet_name=sheet_name, cell=target_sheet.cell(row=row_index, column=column_index).coordinate, change_type=change_type, before_value=before_value, after_value=after_value, ) ) return AgentAssetVersionCompareRead( base_version=resolved_base, target_version=resolved_target, added_sheet_count=len(target_sheet_names - base_sheet_names), removed_sheet_count=len(base_sheet_names - target_sheet_names), changed_sheet_count=len(changed_sheets), changed_cell_count=len(cell_changes), sheet_changes=sheet_changes, cell_changes=cell_changes[:500], ) def _serialize_version( self, version: AgentAssetVersion, asset: AgentAsset ) -> AgentAssetVersionRead: latest_review = self.repository.get_review(asset.id, version.version) working_version = self._resolve_working_version(asset) published_version = self._resolve_published_version(asset) return AgentAssetVersionRead( id=version.id, asset_id=version.asset_id, version=version.version, content=self._deserialize_content(version), content_type=version.content_type, change_note=version.change_note, created_by=version.created_by, created_at=version.created_at, is_current=version.version == working_version, is_published=version.version == published_version, is_working=version.version == working_version, lifecycle_state=self._resolve_version_lifecycle_state( version.version, working_version=working_version, published_version=published_version, latest_review_status=latest_review.review_status if latest_review else "", ), ) @staticmethod def _sort_versions( versions: list[AgentAssetVersion], current_version: str | None ) -> list[AgentAssetVersion]: return sorted( versions, key=lambda item: (item.version == current_version, item.created_at), reverse=True, ) @staticmethod def _serialize_content(content: Any, content_type: str) -> str: if content_type == AgentAssetContentType.MARKDOWN.value: return str(content) return json.dumps(content, ensure_ascii=False, sort_keys=True, indent=2) @staticmethod def _deserialize_content(version: AgentAssetVersion | None) -> Any: if version is None: return None if version.content_type == AgentAssetContentType.MARKDOWN.value: return version.content return json.loads(version.content) def _require_spreadsheet_rule(self, asset_id: str) -> AgentAsset: asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") if asset.asset_type != AgentAssetType.RULE.value: raise ValueError("仅规则资产支持 Excel 规则表。") detail_mode = str((asset.config_json or {}).get("detail_mode") or "").strip().lower() if detail_mode != "spreadsheet": raise ValueError("当前规则未配置 Excel 规则表。") return asset def _resolve_spreadsheet_version_meta( self, asset: AgentAsset, *, version: str | None = None, ) -> tuple[str, RuleSpreadsheetMeta]: resolved_version = str(version or self._resolve_working_version(asset) or "").strip() if not resolved_version: raise ValueError("当前规则尚未配置表格版本。") version_row = self.repository.get_version(asset.id, resolved_version) if version_row is None: raise LookupError(f"版本 {resolved_version} 不存在") # 版本记录中的快照才是不变的事实来源。`/rules` 下的工作簿只是当前 # 可编辑副本,后续写入不应该反向污染某个已存在版本的内容。 metadata = self.spreadsheet_manager.parse_version_markdown(str(version_row.content or "")) if metadata is None and self._resolve_working_version(asset) == resolved_version: metadata = self._read_current_rule_document_meta(asset) if metadata is None: raise FileNotFoundError("规则表版本快照不存在。") return resolved_version, metadata def _build_onlyoffice_spreadsheet_config( self, *, asset_id: str, current_user: CurrentUserContext, resolved_version: str, metadata: RuleSpreadsheetMeta, editable: bool, ) -> AgentAssetOnlyOfficeConfigRead: onlyoffice_settings = resolve_onlyoffice_settings() settings = get_settings() if not onlyoffice_settings.enabled: raise ValueError("ONLYOFFICE 预览未启用。") if not onlyoffice_settings.public_url or not onlyoffice_settings.backend_url: raise ValueError("ONLYOFFICE 地址配置不完整。") if not onlyoffice_settings.jwt_secret: raise ValueError("ONLYOFFICE JWT 密钥未配置。") backend_base_url = onlyoffice_settings.backend_url.rstrip("/") public_url = onlyoffice_settings.public_url.rstrip("/") access_token = self._build_onlyoffice_access_token(asset_id, resolved_version) document_url = ( f"{backend_base_url}{settings.api_v1_prefix}/agent-assets/{asset_id}/spreadsheet/onlyoffice/content" f"?version={resolved_version}&access_token={access_token}" ) callback_url = ( f"{backend_base_url}{settings.api_v1_prefix}/agent-assets/{asset_id}/spreadsheet/onlyoffice/callback" f"?version={resolved_version}" ) config: dict[str, Any] = { "documentType": "cell", "document": { "fileType": Path(metadata.file_name).suffix.lstrip(".").lower() or "xlsx", "key": self._build_onlyoffice_document_key(asset_id, resolved_version, metadata), "title": metadata.file_name, "url": document_url, "permissions": { "download": True, "edit": editable, "print": True, "copy": True, }, }, "editorConfig": { "mode": "edit" if editable else "view", "lang": "zh-CN", "callbackUrl": callback_url, "user": { "id": current_user.username, "name": current_user.name, }, "customization": { "compactHeader": True, "compactToolbar": False, "toolbarNoTabs": False, "autosave": False, "forcesave": False, }, }, "width": "100%", "height": "100%", } config["token"] = jwt.encode(config, onlyoffice_settings.jwt_secret, algorithm="HS256") return AgentAssetOnlyOfficeConfigRead(documentServerUrl=public_url, config=config) def _ensure_preview_rule_spreadsheet( self, *, version: str | None = None, ) -> tuple[str, RuleSpreadsheetMeta]: resolved_version = str(version or PREVIEW_RULE_CURRENT_VERSION).strip() if resolved_version not in PREVIEW_RULE_VERSION_FILENAMES: raise LookupError(f"版本 {resolved_version} 不存在") file_name = PREVIEW_RULE_VERSION_FILENAMES[resolved_version] storage_key = ( Path("agent_assets") / PREVIEW_RULE_ASSET_ID / "rule_spreadsheets" / resolved_version / file_name ).as_posix() try: file_path = self.spreadsheet_manager.resolve_storage_path(storage_key) except FileNotFoundError: file_path = None if file_path is not None and file_path.exists(): content = file_path.read_bytes() updated_at = datetime.fromtimestamp(file_path.stat().st_mtime, UTC).isoformat() return resolved_version, RuleSpreadsheetMeta( file_name=file_name, storage_key=storage_key, mime_type=SPREADSHEET_MIME_TYPE, size_bytes=file_path.stat().st_size, checksum=self._hash_bytes(content), updated_at=updated_at, updated_by="ONLYOFFICE 预览", source="preview", ) metadata = self.spreadsheet_manager.store_spreadsheet( asset_id=PREVIEW_RULE_ASSET_ID, version=resolved_version, file_name=file_name, content=AgentAssetSpreadsheetManager.build_company_travel_rule_template(), actor_name="ONLYOFFICE 预览", source="preview", ) return resolved_version, metadata def _handle_preview_rule_spreadsheet_onlyoffice_callback( self, *, version: str, payload: dict[str, Any], ) -> None: callback = self._parse_onlyoffice_callback(payload) if callback.status not in {2, 6} or not callback.download_url: return resolved_version, metadata = self._ensure_preview_rule_spreadsheet(version=version) request = Request( callback.download_url, headers={"User-Agent": "x-financial-onlyoffice-agent-asset-preview"}, ) with urlopen(request, timeout=30) as response: # noqa: S310 content = response.read() if metadata.checksum and metadata.checksum == self._hash_bytes(content): return actor_name = callback.users[0] if callback.users else "ONLYOFFICE" self.spreadsheet_manager.store_spreadsheet( asset_id=PREVIEW_RULE_ASSET_ID, version=resolved_version, file_name=metadata.file_name, content=content, actor_name=actor_name, source="onlyoffice-preview", ) @staticmethod def _read_current_rule_document_meta(asset: AgentAsset) -> RuleSpreadsheetMeta | None: payload = (asset.config_json or {}).get("rule_document") if not isinstance(payload, dict): return None return RuleSpreadsheetMeta( file_name=str(payload.get("file_name") or "").strip(), storage_key=str(payload.get("storage_key") or "").strip(), mime_type=( str(payload.get("mime_type") or "").strip() or "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), size_bytes=int(payload.get("size_bytes") or 0), checksum=str(payload.get("checksum") or "").strip(), updated_at=str(payload.get("updated_at") or "").strip(), updated_by=str(payload.get("updated_by") or "system").strip() or "system", source=str(payload.get("source") or "upload").strip() or "upload", ) @staticmethod def _increment_version(version: str | None) -> str: normalized = str(version or "").strip().removeprefix("v") parts = normalized.split(".") if len(parts) != 3 or not all(item.isdigit() for item in parts): return "v1.0.0" major, minor, patch = [int(item) for item in parts] return f"v{major}.{minor}.{patch + 1}" @staticmethod def _can_edit_spreadsheet_version( asset: AgentAsset, current_user: CurrentUserContext, version: str, ) -> bool: role_codes = {str(item).strip() for item in current_user.role_codes} can_edit = current_user.is_admin or "manager" in role_codes or "finance" in role_codes return can_edit and AgentAssetService._resolve_working_version(asset) == str(version or "").strip() @staticmethod def _build_onlyoffice_document_key( asset_id: str, version: str, metadata: RuleSpreadsheetMeta, ) -> str: raw_key = f"{asset_id}-{version}-{metadata.checksum or metadata.updated_at or metadata.file_name}" return "".join( character if character.isalnum() or character in {"-", "_", ".", "="} else "_" for character in raw_key ) @staticmethod def _build_onlyoffice_access_token(asset_id: str, version: str) -> str: onlyoffice_settings = resolve_onlyoffice_settings() payload = { "scope": "agent-asset-spreadsheet", "asset_id": asset_id, "version": version, } return jwt.encode(payload, onlyoffice_settings.jwt_secret, algorithm="HS256") @staticmethod def _parse_onlyoffice_callback(payload: dict[str, Any]) -> OnlyOfficeCallbackPayload: return OnlyOfficeCallbackPayload( status=int(payload.get("status") or 0), download_url=str(payload.get("url") or "").strip(), users=[str(item).strip() for item in payload.get("users") or [] if str(item).strip()], ) @staticmethod def _hash_bytes(content: bytes) -> str: import hashlib return hashlib.sha256(content).hexdigest() @staticmethod def _asset_snapshot(asset: AgentAsset) -> dict[str, Any]: return { "asset_type": asset.asset_type, "code": asset.code, "name": asset.name, "status": asset.status, "current_version": asset.current_version, "published_version": asset.published_version, "working_version": asset.working_version, "domain": asset.domain, "owner": asset.owner, "reviewer": asset.reviewer, } @staticmethod def _resolve_working_version(asset: AgentAsset) -> str: return str(asset.working_version or asset.current_version or "").strip() @staticmethod def _resolve_published_version(asset: AgentAsset) -> str: return str(asset.published_version or "").strip() @staticmethod def _resolve_version_lifecycle_state( version: str, *, working_version: str, published_version: str, latest_review_status: str, ) -> str: if version == published_version: return "published" if version != working_version: return "history" if latest_review_status == AgentReviewStatus.PENDING.value: return "pending_review" if latest_review_status == AgentReviewStatus.APPROVED.value: return "approved" if latest_review_status == AgentReviewStatus.REJECTED.value: return "rejected" return "draft" def _load_spreadsheet_for_compare(self, metadata: RuleSpreadsheetMeta): from io import BytesIO from openpyxl import load_workbook file_path = self.spreadsheet_manager.resolve_storage_path(metadata.storage_key) if not file_path.exists(): raise FileNotFoundError(metadata.file_name) return load_workbook(BytesIO(file_path.read_bytes()), read_only=False, data_only=False) @staticmethod def _extract_restore_source_version(change_note: str | None) -> str | None: normalized = str(change_note or "").strip() prefix = "基于历史版本 " suffix = " 恢复生成工作稿" if not normalized.startswith(prefix) or suffix not in normalized: return None return normalized.removeprefix(prefix).split(suffix, 1)[0].strip() or None