from __future__ import annotations import json from collections import defaultdict from dataclasses import dataclass from datetime import UTC, datetime from pathlib import Path from typing import Any from urllib.parse import quote from urllib.request import Request, urlopen import jwt from sqlalchemy.orm import Session from app.api.deps import CurrentUserContext from app.core.agent_enums import ( AgentAssetContentType, AgentAssetStatus, AgentAssetType, AgentReviewStatus, ) from app.core.config import get_settings from app.core.logging import get_logger from app.models.agent_asset import AgentAsset, AgentAssetReview, AgentAssetVersion from app.repositories.agent_asset import AgentAssetRepository from app.schemas.agent_asset import ( AgentAssetCreate, AgentAssetListItem, AgentAssetOnlyOfficeConfigRead, AgentAssetRead, AgentAssetReviewCreate, AgentAssetReviewRead, AgentAssetRuleJsonRead, AgentAssetRuleJsonWrite, AgentAssetSpreadsheetChangeRecordRead, AgentAssetSpreadsheetDiffCellRead, AgentAssetSpreadsheetDiffSheetRead, AgentAssetUpdate, AgentAssetVersionCompareRead, AgentAssetVersionCreate, AgentAssetVersionRead, AgentAssetVersionTimelineItemRead, ) from app.services.agent_asset_rule_library import AgentAssetRuleLibraryManager from app.services.agent_asset_spreadsheet import ( AgentAssetSpreadsheetManager, COMPANY_COMMUNICATION_EXPENSE_RULE_CODE, COMPANY_COMMUNICATION_EXPENSE_RULE_FILENAME, COMPANY_TRAVEL_EXPENSE_RULE_CODE, COMPANY_TRAVEL_EXPENSE_RULE_FILENAME, FINANCE_RULES_LIBRARY, RISK_RULES_LIBRARY, RULE_LIBRARY_NAMES, RuleSpreadsheetMeta, SPREADSHEET_MIME_TYPE, ) from app.services.agent_foundation import AgentFoundationService from app.services.audit import AuditLogService from app.services.settings import resolve_onlyoffice_settings logger = get_logger("app.services.agent_assets") PREVIEW_RULE_ASSET_ID = "preview-rule-expense-company-travel-expense" PREVIEW_RULE_CURRENT_VERSION = "v1.2.0" PREVIEW_RULE_VERSION_FILENAMES = { PREVIEW_RULE_CURRENT_VERSION: COMPANY_TRAVEL_EXPENSE_RULE_FILENAME, "v1.1.0": "公司差旅费报销规则-v1.1.0.xlsx", "v1.0.0": "公司差旅费报销规则-v1.0.0.xlsx", } @dataclass(slots=True) class OnlyOfficeCallbackPayload: status: int download_url: str users: list[str] class AgentAssetService: def __init__(self, db: Session) -> None: self.db = db self.repository = AgentAssetRepository(db) self.audit_service = AuditLogService(db) self.spreadsheet_manager = AgentAssetSpreadsheetManager() self.rule_library_manager = AgentAssetRuleLibraryManager() def list_assets( self, *, asset_type: str | None = None, status: str | None = None, domain: str | None = None, keyword: str | None = None, ) -> list[AgentAssetListItem]: self._ensure_ready() if asset_type in {None, "", AgentAssetType.RULE.value}: self.sync_platform_risk_rules_from_library() assets = self.repository.list( asset_type=asset_type, status=status, domain=domain, keyword=keyword ) version_stats = self._collect_version_stats(assets) return [ self._serialize_list_item(asset, version_stats.get(asset.id)) for asset in assets ] def get_asset(self, asset_id: str) -> AgentAssetRead | None: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: return None working_version = self._resolve_working_version(asset) recent_versions = self._sort_versions( self.repository.list_versions(asset_id, limit=5), working_version, ) latest_review = ( self.repository.get_review(asset_id, working_version) if working_version else next(iter(self.repository.list_reviews(asset_id, limit=1)), None) ) current_version = ( self.repository.get_version(asset_id, working_version) if working_version else None ) version_stats = self._collect_version_stats([asset]).get(asset.id) return AgentAssetRead( **self._serialize_list_item(asset, version_stats).model_dump(), current_version_content=self._deserialize_content(current_version) if current_version else None, current_version_content_type=current_version.content_type if current_version else None, current_version_change_note=current_version.change_note if current_version else None, recent_versions=[ self._serialize_version(item, asset) for item in recent_versions ], latest_review=AgentAssetReviewRead.model_validate(latest_review) if latest_review else None, ) def create_asset( self, payload: AgentAssetCreate, *, actor: str, request_id: str | None = None, ) -> AgentAssetRead: self._ensure_ready() if self.repository.get_by_code(payload.code): raise ValueError(f"资产编码 {payload.code} 已存在") if payload.status == AgentAssetStatus.ACTIVE: raise ValueError("请先创建资产并完成审核,再通过上线接口激活。") asset = AgentAsset( asset_type=payload.asset_type.value, code=payload.code, name=payload.name, description=payload.description, domain=payload.domain.value, scenario_json=payload.scenario_json, owner=payload.owner, reviewer=payload.reviewer, status=payload.status.value, config_json=payload.config_json, ) created = self.repository.create_asset(asset) self.audit_service.log_action( actor=actor, action="create_agent_asset", resource_type=created.asset_type, resource_id=created.id, before_json=None, after_json=self._asset_snapshot(created), request_id=request_id, ) logger.info("Created agent asset id=%s code=%s", created.id, created.code) return self.get_asset(created.id) # type: ignore[return-value] def update_asset( self, asset_id: str, payload: AgentAssetUpdate, *, actor: str, request_id: str | None = None, ) -> AgentAssetRead: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") before = self._asset_snapshot(asset) if payload.status == AgentAssetStatus.ACTIVE: raise ValueError("请使用上线接口激活资产。") for field_name in ( "name", "description", "owner", "reviewer", "current_version", "published_version", "working_version", "config_json", "scenario_json", ): value = getattr(payload, field_name) if value is not None: setattr(asset, field_name, value) if payload.domain is not None: asset.domain = payload.domain.value if payload.status is not None: asset.status = payload.status.value if payload.current_version is not None and not self.repository.get_version( asset_id, payload.current_version ): raise LookupError(f"版本 {payload.current_version} 不存在") if payload.published_version is not None and not self.repository.get_version( asset_id, payload.published_version ): raise LookupError(f"版本 {payload.published_version} 不存在") if payload.working_version is not None and not self.repository.get_version( asset_id, payload.working_version ): raise LookupError(f"版本 {payload.working_version} 不存在") if payload.current_version is not None and payload.working_version is None: asset.working_version = payload.current_version if payload.working_version is not None: asset.current_version = payload.working_version updated = self.repository.save_asset(asset) self.audit_service.log_action( actor=actor, action="update_agent_asset", resource_type=updated.asset_type, resource_id=updated.id, before_json=before, after_json=self._asset_snapshot(updated), request_id=request_id, ) logger.info("Updated agent asset id=%s code=%s", updated.id, updated.code) return self.get_asset(updated.id) # type: ignore[return-value] def list_versions(self, asset_id: str, *, limit: int = 20) -> list[AgentAssetVersionRead]: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") versions = self._sort_versions( self.repository.list_versions(asset_id, limit=limit), self._resolve_working_version(asset), ) return [self._serialize_version(item, asset) for item in versions] def create_version( self, asset_id: str, payload: AgentAssetVersionCreate, *, actor: str, request_id: str | None = None, ) -> AgentAssetVersionRead: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") if self.repository.get_version(asset_id, payload.version): raise ValueError(f"版本号 {payload.version} 已存在") self._validate_version_payload(asset, payload) serialized_content = self._serialize_content(payload.content, payload.content_type.value) version = AgentAssetVersion( asset_id=asset_id, version=payload.version, content=serialized_content, content_type=payload.content_type.value, change_note=payload.change_note, created_by=payload.created_by, ) created = self.repository.create_version(version) before = self._asset_snapshot(asset) asset.working_version = payload.version asset.current_version = payload.version updated_asset = self.repository.save_asset(asset) self.audit_service.log_action( actor=actor, action="save_agent_asset_version", resource_type=updated_asset.asset_type, resource_id=updated_asset.id, before_json=before, after_json={ "current_version": updated_asset.current_version, "working_version": updated_asset.working_version, "published_version": updated_asset.published_version, "status": updated_asset.status, }, request_id=request_id, ) logger.info("Created agent asset version asset_id=%s version=%s", asset_id, payload.version) return self._serialize_version(created, updated_asset) def create_review( self, asset_id: str, payload: AgentAssetReviewCreate, *, actor: str, request_id: str | None = None, ) -> AgentAssetReviewRead: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") if ( asset.asset_type == AgentAssetType.RULE.value and payload.review_status == AgentReviewStatus.PENDING and payload.version != self._resolve_working_version(asset) ): if self.repository.get_version(asset_id, payload.version) is not None: raise ValueError(f"版本 {payload.version} 已存在,不能重复送审。") asset = self._create_named_working_copy_for_review( asset, target_version=payload.version, actor=actor, request_id=request_id, ) if self.repository.get_version(asset_id, payload.version) is None: raise LookupError(f"版本 {payload.version} 不存在") if asset.asset_type == AgentAssetType.RULE.value: working_version = self._resolve_working_version(asset) if payload.version != working_version: raise ValueError("只能对当前工作版本发起审核。") review = AgentAssetReview( asset_id=asset_id, version=payload.version, reviewer=payload.reviewer, review_status=payload.review_status.value, review_note=payload.review_note, reviewed_at=None if payload.review_status == AgentReviewStatus.PENDING else datetime.now(UTC), ) created = self.repository.create_review(review) before = self._asset_snapshot(asset) asset.reviewer = payload.reviewer if payload.review_status == AgentReviewStatus.PENDING: if not asset.published_version: asset.status = AgentAssetStatus.REVIEW.value elif payload.review_status == AgentReviewStatus.REJECTED: if not asset.published_version: asset.status = AgentAssetStatus.DRAFT.value elif not asset.published_version: asset.status = AgentAssetStatus.REVIEW.value self.repository.save_asset(asset) self.audit_service.log_action( actor=actor, action="review_agent_asset", resource_type=asset.asset_type, resource_id=asset.id, before_json=before, after_json={ "review_version": payload.version, "review_status": payload.review_status.value, "asset_status": asset.status, }, request_id=request_id, ) logger.info( "Created review asset_id=%s version=%s status=%s", asset_id, payload.version, payload.review_status.value, ) return AgentAssetReviewRead.model_validate(created) def _create_named_working_copy_for_review( self, asset: AgentAsset, *, target_version: str, actor: str, request_id: str | None = None, ) -> AgentAsset: working_version = self._resolve_working_version(asset) if not working_version: raise ValueError("当前规则尚未配置工作版本,无法提交审核。") source = self.repository.get_version(asset.id, working_version) if source is None: raise LookupError(f"版本 {working_version} 不存在") is_spreadsheet_rule = ( asset.asset_type == AgentAssetType.RULE.value and str((asset.config_json or {}).get("detail_mode") or "").strip().lower() == "spreadsheet" ) if is_spreadsheet_rule: _, metadata = self._resolve_spreadsheet_version_meta(asset, version=working_version) file_path = self.spreadsheet_manager.resolve_storage_path(metadata.storage_key) if not file_path.exists(): raise FileNotFoundError(metadata.file_name) snapshot_meta = self.spreadsheet_manager.store_spreadsheet( asset_id=asset.id, version=target_version, file_name=metadata.file_name, content=file_path.read_bytes(), actor_name=actor, source="review-submit", ) next_content = self.spreadsheet_manager.build_version_markdown( rule_name=asset.name, version=target_version, metadata=snapshot_meta, ) next_content_type = AgentAssetContentType.MARKDOWN else: next_content = self._deserialize_content(source) next_content_type = AgentAssetContentType(source.content_type) self.create_version( asset.id, AgentAssetVersionCreate( version=target_version, content=next_content, content_type=next_content_type, change_note=f"提交审核前固化工作稿为 {target_version}", created_by=actor, ), actor=actor, request_id=request_id, ) refreshed = self.repository.get(asset.id) if refreshed is None: raise LookupError("Asset not found") if is_spreadsheet_rule: config_json = dict(refreshed.config_json or {}) current_document_meta = self._read_current_rule_document_meta(refreshed) if current_document_meta is not None: rule_document = self.spreadsheet_manager.build_rule_document_config( current_document_meta, asset_version=target_version, ) rule_document["storage_key"] = current_document_meta.storage_key config_json["rule_document"] = rule_document refreshed.config_json = config_json self.repository.save_asset(refreshed) return refreshed def activate_asset( self, asset_id: str, *, actor: str, request_id: str | None = None, ) -> AgentAssetRead: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") candidate_version = self._resolve_working_version(asset) if not candidate_version: raise ValueError("资产尚未设置工作版本,无法上线。") if asset.asset_type == AgentAssetType.RULE.value: review = self.repository.get_review( asset.id, candidate_version, AgentReviewStatus.APPROVED.value ) if review is None: raise PermissionError("规则工作版本尚未审核通过,不能上线。") before = self._asset_snapshot(asset) asset.published_version = candidate_version asset.status = AgentAssetStatus.ACTIVE.value updated = self.repository.save_asset(asset) self.audit_service.log_action( actor=actor, action="activate_agent_asset", resource_type=updated.asset_type, resource_id=updated.id, before_json=before, after_json=self._asset_snapshot(updated), request_id=request_id, ) logger.info("Activated agent asset id=%s code=%s", updated.id, updated.code) return self.get_asset(updated.id) # type: ignore[return-value] def build_rule_spreadsheet_onlyoffice_config( self, asset_id: str, current_user: CurrentUserContext, *, version: str | None = None, ) -> AgentAssetOnlyOfficeConfigRead: self._ensure_ready() if asset_id == PREVIEW_RULE_ASSET_ID: resolved_version, metadata = self._ensure_preview_rule_spreadsheet(version=version) return self._build_onlyoffice_spreadsheet_config( asset_id=asset_id, current_user=current_user, resolved_version=resolved_version, metadata=metadata, editable=resolved_version == PREVIEW_RULE_CURRENT_VERSION, ) asset = self._require_spreadsheet_rule(asset_id) resolved_version, metadata = self._resolve_current_spreadsheet_meta(asset) editable = self._can_edit_current_spreadsheet(current_user) return self._build_onlyoffice_spreadsheet_config( asset_id=asset.id, current_user=current_user, resolved_version=resolved_version, metadata=metadata, editable=editable, ) def get_rule_spreadsheet_content( self, asset_id: str, *, version: str | None = None, ) -> tuple[Path, str, str]: self._ensure_ready() if asset_id == PREVIEW_RULE_ASSET_ID: _, metadata = self._ensure_preview_rule_spreadsheet(version=version) file_path = self.spreadsheet_manager.resolve_storage_path(metadata.storage_key) if not file_path.exists(): raise FileNotFoundError(metadata.file_name) return file_path, metadata.mime_type, metadata.file_name asset = self._require_spreadsheet_rule(asset_id) _, metadata = self._resolve_current_spreadsheet_meta(asset) file_path = self.spreadsheet_manager.resolve_storage_path(metadata.storage_key) if not file_path.exists(): raise FileNotFoundError(metadata.file_name) return file_path, metadata.mime_type, metadata.file_name def validate_rule_spreadsheet_access_token( self, asset_id: str, version: str, access_token: str, ) -> None: onlyoffice_settings = resolve_onlyoffice_settings() try: payload = jwt.decode( access_token, onlyoffice_settings.jwt_secret, algorithms=["HS256"], ) except jwt.PyJWTError as exc: raise ValueError("ONLYOFFICE 文件访问令牌无效。") from exc if ( payload.get("scope") != "agent-asset-spreadsheet" or payload.get("asset_id") != asset_id or payload.get("version") != version ): raise ValueError("ONLYOFFICE 文件访问令牌无效。") def upload_rule_spreadsheet( self, asset_id: str, *, filename: str, content: bytes, actor: str, request_id: str | None = None, change_note: str | None = None, source: str = "upload", ) -> AgentAssetRead: self._ensure_ready() asset = self._require_spreadsheet_rule(asset_id) normalized_name = Path(str(filename or "").strip()).name.strip() if not normalized_name: raise ValueError("规则表文件名不能为空。") if Path(normalized_name).suffix.lower() != ".xlsx": raise ValueError("当前仅支持上传 .xlsx 格式的规则表。") if not content: raise ValueError("规则表文件内容不能为空。") _, current_metadata = self._resolve_current_spreadsheet_meta(asset) file_name = current_metadata.file_name or self._resolve_default_spreadsheet_file_name(asset) metadata = self._store_current_rule_spreadsheet( asset, file_name=file_name, content=content, actor=actor, source=source, ) self.audit_service.log_action( actor=actor, action="edit_rule_spreadsheet", resource_type=asset.asset_type, resource_id=asset.id, before_json={"storage_key": current_metadata.storage_key}, after_json={ "summary": change_note or f"上传并覆盖当前规则表:{normalized_name}", "changed_sheet_count": 0, "changed_cell_count": 0, "sheet_changes": [], "cell_changes": [], "storage_key": metadata.storage_key, }, request_id=request_id, ) return self.get_asset(asset.id) # type: ignore[return-value] def import_rule_spreadsheet_content( self, asset_id: str, *, filename: str, content: bytes, actor: str, request_id: str | None = None, ) -> AgentAssetRead: self._ensure_ready() asset = self._require_spreadsheet_rule(asset_id) normalized_name = Path(str(filename or "").strip()).name.strip() if not normalized_name: raise ValueError("待导入表格文件名不能为空。") if Path(normalized_name).suffix.lower() != ".xlsx": raise ValueError("当前仅支持导入 .xlsx 格式的规则表。") _, current_metadata = self._resolve_current_spreadsheet_meta(asset) imported_content = self.spreadsheet_manager.rebuild_from_uploaded_content(content) return self.upload_rule_spreadsheet( asset.id, filename=current_metadata.file_name, content=imported_content, actor=actor, request_id=request_id, change_note=f"导入 Excel 表格内容:{normalized_name}", source="content-import", ) def handle_rule_spreadsheet_onlyoffice_callback( self, asset_id: str, *, version: str, payload: dict[str, Any], actor_name: str | None = None, ) -> None: self._ensure_ready() if asset_id == PREVIEW_RULE_ASSET_ID: self._handle_preview_rule_spreadsheet_onlyoffice_callback( version=version, payload=payload, ) return asset = self._require_spreadsheet_rule(asset_id) callback = self._parse_onlyoffice_callback(payload) if callback.status not in {2, 6} or not callback.download_url: return if str(version or "").strip() not in {"", "current", self._resolve_working_version(asset)}: return _, current_metadata = self._resolve_current_spreadsheet_meta(asset) request = Request( callback.download_url, headers={"User-Agent": "x-financial-onlyoffice-agent-asset"}, ) with urlopen(request, timeout=30) as response: # noqa: S310 content = response.read() if current_metadata.checksum and current_metadata.checksum == self._hash_bytes(content): return from io import BytesIO from openpyxl import load_workbook try: base_workbook = self._load_spreadsheet_for_compare(current_metadata) target_workbook = load_workbook(BytesIO(content), read_only=False, data_only=False) sheet_changes, cell_changes = self._collect_workbook_changes( base_workbook, target_workbook ) changed_sheet_count = len( {item.sheet_name for item in sheet_changes} | {item.sheet_name for item in cell_changes} ) changed_cell_count = len(cell_changes) if changed_cell_count > 0 or changed_sheet_count > 0: change_note = f"ONLYOFFICE 在线编辑:涉及 {changed_sheet_count} 个 Sheet,共 {changed_cell_count} 处改动。" else: change_note = "ONLYOFFICE 在线编辑保存。" except Exception: sheet_changes = [] cell_changes = [] changed_sheet_count = 0 changed_cell_count = 0 change_note = "ONLYOFFICE 在线编辑保存。" resolved_actor_name = str(actor_name or "").strip() or ( callback.users[0] if callback.users else "ONLYOFFICE" ) self._store_current_rule_spreadsheet( asset, file_name=current_metadata.file_name, content=content, actor=resolved_actor_name, source="onlyoffice", ) if changed_sheet_count > 0 or changed_cell_count > 0: self.audit_service.log_action( actor=resolved_actor_name, action="edit_rule_spreadsheet", resource_type=asset.asset_type, resource_id=asset.id, before_json={"storage_key": current_metadata.storage_key}, after_json={ "summary": change_note, "changed_sheet_count": changed_sheet_count, "changed_cell_count": changed_cell_count, "sheet_changes": [item.model_dump() for item in sheet_changes], "cell_changes": [item.model_dump() for item in cell_changes[:500]], }, ) def _ensure_ready(self) -> None: AgentFoundationService(self.db).ensure_foundation_ready() def sync_platform_risk_rules_from_library(self) -> int: manifest_count = AgentFoundationService(self.db).sync_platform_risk_rules_from_library() self.db.commit() return manifest_count def _validate_version_payload( self, asset: AgentAsset, payload: AgentAssetVersionCreate ) -> None: if ( asset.asset_type == AgentAssetType.RULE.value and payload.content_type != AgentAssetContentType.MARKDOWN ): raise ValueError("规则资产版本内容必须使用 markdown。") if ( asset.asset_type != AgentAssetType.RULE.value and payload.content_type != AgentAssetContentType.JSON ): raise ValueError("技能、MCP、任务资产版本内容必须使用 json。") if payload.content_type == AgentAssetContentType.MARKDOWN and not isinstance( payload.content, str ): raise ValueError("Markdown 内容必须是字符串。") if payload.content_type == AgentAssetContentType.JSON and not isinstance( payload.content, (dict, list) ): raise ValueError("JSON 内容必须是对象或数组。") def restore_version_as_working_copy( self, asset_id: str, source_version: str, *, actor: str, request_id: str | None = None, ) -> AgentAssetRead: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") source = self.repository.get_version(asset_id, source_version) if source is None: raise LookupError(f"版本 {source_version} 不存在") if ( asset.asset_type == AgentAssetType.RULE.value and str((asset.config_json or {}).get("detail_mode") or "").strip().lower() == "spreadsheet" ): metadata = self.spreadsheet_manager.parse_version_markdown(str(source.content or "")) if metadata is None: raise FileNotFoundError("历史规则表快照不存在,无法恢复。") file_path = self.spreadsheet_manager.resolve_storage_path(metadata.storage_key) if not file_path.exists(): raise FileNotFoundError(metadata.file_name) restored = self.upload_rule_spreadsheet( asset.id, filename=metadata.file_name, content=file_path.read_bytes(), actor=actor, request_id=request_id, change_note=f"基于历史版本 {source_version} 恢复生成工作稿", source="restore", ) self.audit_service.log_action( actor=actor, action="restore_agent_asset_version", resource_type=asset.asset_type, resource_id=asset.id, before_json={"source_version": source_version}, after_json={"working_version": restored.working_version}, request_id=request_id, ) return restored next_version = self._increment_version(self._resolve_working_version(asset)) self.create_version( asset.id, AgentAssetVersionCreate( version=next_version, content=self._deserialize_content(source), content_type=AgentAssetContentType(source.content_type), change_note=f"基于历史版本 {source_version} 恢复生成工作稿", created_by=actor, ), actor=actor, request_id=request_id, ) restored = self.get_asset(asset.id) self.audit_service.log_action( actor=actor, action="restore_agent_asset_version", resource_type=asset.asset_type, resource_id=asset.id, before_json={"source_version": source_version}, after_json={"working_version": next_version}, request_id=request_id, ) return restored # type: ignore[return-value] def list_version_timeline(self, asset_id: str) -> list[AgentAssetVersionTimelineItemRead]: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") events: list[AgentAssetVersionTimelineItemRead] = [] versions = self.repository.list_versions(asset_id) for version in versions: source_version = self._extract_restore_source_version(version.change_note) events.append( AgentAssetVersionTimelineItemRead( event_type="restored" if source_version else "created", version=version.version, actor=version.created_by, event_time=version.created_at, title="恢复生成工作稿" if source_version else "创建工作版本", description=version.change_note or "生成新版本", note=version.change_note, source_version=source_version, ) ) for review in self.repository.list_reviews(asset_id): event_type = { AgentReviewStatus.PENDING.value: "submitted", AgentReviewStatus.APPROVED.value: "approved", AgentReviewStatus.REJECTED.value: "rejected", }.get(review.review_status, "reviewed") title = { "submitted": "提交审核", "approved": "审核通过", "rejected": "审核驳回", }.get(event_type, "审核处理") events.append( AgentAssetVersionTimelineItemRead( event_type=event_type, version=review.version, actor=review.reviewer, event_time=review.reviewed_at or review.created_at, title=title, description=review.review_note or "", note=review.review_note, ) ) audit_logs = self.audit_service.repository.list( resource_type=asset.asset_type, resource_id=asset.id, limit=200, ) for log in audit_logs: if log.action != "activate_agent_asset": continue after_json = log.after_json or {} version = str( after_json.get("published_version") or after_json.get("current_version") or "" ).strip() if not version: continue events.append( AgentAssetVersionTimelineItemRead( event_type="published", version=version, actor=log.actor, event_time=log.created_at, title="正式上线", description="该版本已切换为线上正式版本。", ) ) return sorted(events, key=lambda item: item.event_time) def compare_spreadsheet_versions( self, asset_id: str, *, base_version: str, target_version: str, ) -> AgentAssetVersionCompareRead: self._ensure_ready() asset = self._require_spreadsheet_rule(asset_id) resolved_base, base_meta = self._resolve_spreadsheet_version_meta(asset, version=base_version) resolved_target, target_meta = self._resolve_spreadsheet_version_meta(asset, version=target_version) base_workbook = self._load_spreadsheet_for_compare(base_meta) target_workbook = self._load_spreadsheet_for_compare(target_meta) base_sheet_names = set(base_workbook.sheetnames) target_sheet_names = set(target_workbook.sheetnames) sheet_changes: list[AgentAssetSpreadsheetDiffSheetRead] = [] for sheet_name in sorted(target_sheet_names - base_sheet_names): sheet_changes.append( AgentAssetSpreadsheetDiffSheetRead(sheet_name=sheet_name, change_type="added") ) for sheet_name in sorted(base_sheet_names - target_sheet_names): sheet_changes.append( AgentAssetSpreadsheetDiffSheetRead(sheet_name=sheet_name, change_type="removed") ) cell_changes: list[AgentAssetSpreadsheetDiffCellRead] = [] changed_sheets: set[str] = set() for sheet_name in sorted(base_sheet_names & target_sheet_names): base_sheet = base_workbook[sheet_name] target_sheet = target_workbook[sheet_name] max_row = max(base_sheet.max_row, target_sheet.max_row) max_column = max(base_sheet.max_column, target_sheet.max_column) for row_index in range(1, max_row + 1): for column_index in range(1, max_column + 1): before_value = base_sheet.cell(row=row_index, column=column_index).value after_value = target_sheet.cell(row=row_index, column=column_index).value if before_value == after_value: continue changed_sheets.add(sheet_name) if before_value in (None, ""): change_type = "added" elif after_value in (None, ""): change_type = "removed" else: change_type = "modified" cell_changes.append( AgentAssetSpreadsheetDiffCellRead( sheet_name=sheet_name, cell=target_sheet.cell(row=row_index, column=column_index).coordinate, change_type=change_type, before_value=before_value, after_value=after_value, ) ) return AgentAssetVersionCompareRead( base_version=resolved_base, target_version=resolved_target, added_sheet_count=len(target_sheet_names - base_sheet_names), removed_sheet_count=len(base_sheet_names - target_sheet_names), changed_sheet_count=len(changed_sheets), changed_cell_count=len(cell_changes), sheet_changes=sheet_changes, cell_changes=cell_changes[:500], ) def list_spreadsheet_change_records( self, asset_id: str, *, limit: int = 30, ) -> list[AgentAssetSpreadsheetChangeRecordRead]: self._ensure_ready() asset = self._require_spreadsheet_rule(asset_id) logs = self.audit_service.repository.list( resource_type=asset.asset_type, resource_id=asset.id, action="edit_rule_spreadsheet", limit=min(max(limit, 1), 30), ) return [ AgentAssetSpreadsheetChangeRecordRead( id=log.id, actor=log.actor, changed_at=log.created_at, summary=str((log.after_json or {}).get("summary") or "ONLYOFFICE 在线编辑保存。"), sheet_changes=[ AgentAssetSpreadsheetDiffSheetRead.model_validate(item) for item in ((log.after_json or {}).get("sheet_changes") or []) ], cell_changes=[ AgentAssetSpreadsheetDiffCellRead.model_validate(item) for item in ((log.after_json or {}).get("cell_changes") or []) ], changed_sheet_count=int((log.after_json or {}).get("changed_sheet_count") or 0), changed_cell_count=int((log.after_json or {}).get("changed_cell_count") or 0), ) for log in logs ] def _serialize_version( self, version: AgentAssetVersion, asset: AgentAsset ) -> AgentAssetVersionRead: latest_review = self.repository.get_review(asset.id, version.version) working_version = self._resolve_working_version(asset) published_version = self._resolve_published_version(asset) return AgentAssetVersionRead( id=version.id, asset_id=version.asset_id, version=version.version, content=self._deserialize_content(version), content_type=version.content_type, change_note=version.change_note, created_by=version.created_by, created_at=version.created_at, is_current=version.version == working_version, is_published=version.version == published_version, is_working=version.version == working_version, lifecycle_state=self._resolve_version_lifecycle_state( version.version, working_version=working_version, published_version=published_version, latest_review_status=latest_review.review_status if latest_review else "", ), ) def _collect_version_stats( self, assets: list[AgentAsset] ) -> dict[str, dict[str, int | str | None]]: asset_ids = [item.id for item in assets] versions = self.repository.list_versions_for_assets(asset_ids) spreadsheet_logs = self.audit_service.repository.list_for_resources( resource_type=AgentAssetType.RULE.value, resource_ids=[ item.id for item in assets if item.asset_type == AgentAssetType.RULE.value and str((item.config_json or {}).get("detail_mode") or "").strip().lower() == "spreadsheet" ], action="edit_rule_spreadsheet", ) working_versions = { item.id: self._resolve_working_version(item) for item in assets } version_counts: dict[str, int] = defaultdict(int) modified_by: dict[str, str | None] = {item.id: None for item in assets} spreadsheet_edit_counts: dict[str, int] = defaultdict(int) spreadsheet_last_actor: dict[str, str | None] = {} spreadsheet_last_changed_at: dict[str, datetime] = {} for version in versions: version_counts[version.asset_id] += 1 if ( modified_by.get(version.asset_id) is None and version.version == working_versions.get(version.asset_id) ): modified_by[version.asset_id] = version.created_by for log in spreadsheet_logs: spreadsheet_edit_counts[log.resource_id] += 1 last_changed_at = spreadsheet_last_changed_at.get(log.resource_id) if last_changed_at is None or log.created_at >= last_changed_at: spreadsheet_last_changed_at[log.resource_id] = log.created_at spreadsheet_last_actor[log.resource_id] = log.actor return { item.id: { "change_count": ( spreadsheet_edit_counts.get(item.id, 0) if item.asset_type == AgentAssetType.RULE.value and str((item.config_json or {}).get("detail_mode") or "").strip().lower() == "spreadsheet" and spreadsheet_edit_counts.get(item.id, 0) > 0 else max(version_counts.get(item.id, 0) - 1, 0) ), "modified_by": ( spreadsheet_last_actor.get(item.id) if item.asset_type == AgentAssetType.RULE.value and str((item.config_json or {}).get("detail_mode") or "").strip().lower() == "spreadsheet" and spreadsheet_last_actor.get(item.id) else modified_by.get(item.id) ), } for item in assets } @staticmethod def _serialize_list_item( asset: AgentAsset, version_stats: dict[str, int | str | None] | None = None, ) -> AgentAssetListItem: payload = AgentAssetListItem.model_validate(asset).model_dump() payload["change_count"] = int((version_stats or {}).get("change_count") or 0) payload["modified_by"] = ( str((version_stats or {}).get("modified_by") or "").strip() or None ) return AgentAssetListItem.model_validate(payload) @staticmethod def _sort_versions( versions: list[AgentAssetVersion], current_version: str | None ) -> list[AgentAssetVersion]: return sorted( versions, key=lambda item: (item.version == current_version, item.created_at), reverse=True, ) @staticmethod def _serialize_content(content: Any, content_type: str) -> str: if content_type == AgentAssetContentType.MARKDOWN.value: return str(content) return json.dumps(content, ensure_ascii=False, sort_keys=True, indent=2) @staticmethod def _deserialize_content(version: AgentAssetVersion | None) -> Any: if version is None: return None if version.content_type == AgentAssetContentType.MARKDOWN.value: return version.content return json.loads(version.content) def _require_spreadsheet_rule(self, asset_id: str) -> AgentAsset: asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") if asset.asset_type != AgentAssetType.RULE.value: raise ValueError("仅规则资产支持 Excel 规则表。") detail_mode = str((asset.config_json or {}).get("detail_mode") or "").strip().lower() if detail_mode != "spreadsheet": raise ValueError("当前规则未配置 Excel 规则表。") return asset def _resolve_spreadsheet_version_meta( self, asset: AgentAsset, *, version: str | None = None, ) -> tuple[str, RuleSpreadsheetMeta]: resolved_version = str(version or self._resolve_working_version(asset) or "").strip() if not resolved_version: raise ValueError("当前规则尚未配置表格版本。") version_row = self.repository.get_version(asset.id, resolved_version) if version_row is None: raise LookupError(f"版本 {resolved_version} 不存在") # 版本记录中的快照才是不变的事实来源。`/rules` 下的工作簿只是当前 # 可编辑副本,后续写入不应该反向污染某个已存在版本的内容。 metadata = self.spreadsheet_manager.parse_version_markdown(str(version_row.content or "")) if metadata is None and self._resolve_working_version(asset) == resolved_version: metadata = self._read_current_rule_document_meta(asset) if metadata is None: raise FileNotFoundError("规则表版本快照不存在。") return resolved_version, metadata def _resolve_current_spreadsheet_meta( self, asset: AgentAsset, ) -> tuple[str, RuleSpreadsheetMeta]: config_json = dict(asset.config_json or {}) current_meta = self._read_current_rule_document_meta(asset) file_name = ( current_meta.file_name if current_meta is not None and current_meta.file_name else self._resolve_default_spreadsheet_file_name(asset) ) library = self._resolve_spreadsheet_rule_library(asset) storage_key = (Path("rules") / library / file_name).as_posix() file_path = self.spreadsheet_manager.resolve_storage_path(storage_key) if not file_path.exists(): content: bytes | None = None if current_meta is not None and current_meta.storage_key: try: legacy_path = self.spreadsheet_manager.resolve_storage_path( current_meta.storage_key ) except FileNotFoundError: legacy_path = None if legacy_path is not None and legacy_path.exists(): content = legacy_path.read_bytes() if content is None: content = AgentAssetSpreadsheetManager.build_blank_rule_workbook( Path(file_name).stem or "规则表" ) meta = self.spreadsheet_manager.store_rule_library_spreadsheet( library=library, file_name=file_name, content=content, actor_name=( current_meta.updated_by if current_meta is not None and current_meta.updated_by else "system" ), source="current-rule", ) else: content = file_path.read_bytes() meta = RuleSpreadsheetMeta( file_name=file_name, storage_key=storage_key, mime_type=( current_meta.mime_type if current_meta is not None and current_meta.mime_type else SPREADSHEET_MIME_TYPE ), size_bytes=file_path.stat().st_size, checksum=self._hash_bytes(content), updated_at=datetime.fromtimestamp(file_path.stat().st_mtime, UTC).isoformat(), updated_by=( current_meta.updated_by if current_meta is not None and current_meta.updated_by else "system" ), source=( current_meta.source if current_meta is not None and current_meta.source else "current-rule" ), ) expected_document = { **self.spreadsheet_manager.build_rule_document_config( meta, asset_version="current", ), "storage_key": meta.storage_key, } if config_json.get("rule_document") != expected_document: config_json["detail_mode"] = "spreadsheet" config_json["tag"] = str(config_json.get("tag") or "财务规则").strip() or "财务规则" config_json["rule_library"] = library config_json["rule_document"] = expected_document asset.config_json = config_json self.repository.save_asset(asset) return "current", meta def _store_current_rule_spreadsheet( self, asset: AgentAsset, *, file_name: str, content: bytes, actor: str, source: str, ) -> RuleSpreadsheetMeta: library = self._resolve_spreadsheet_rule_library(asset) metadata = self.spreadsheet_manager.store_rule_library_spreadsheet( library=library, file_name=file_name, content=content, actor_name=actor, source=source, ) config_json = dict(asset.config_json or {}) config_json["detail_mode"] = "spreadsheet" config_json["tag"] = str(config_json.get("tag") or "财务规则").strip() or "财务规则" config_json["rule_library"] = library config_json["rule_document"] = { **self.spreadsheet_manager.build_rule_document_config( metadata, asset_version="current", ), "storage_key": metadata.storage_key, } asset.config_json = config_json self.repository.save_asset(asset) return metadata @staticmethod def _resolve_spreadsheet_rule_library(asset: AgentAsset) -> str: config_json = dict(asset.config_json or {}) library = str(config_json.get("rule_library") or FINANCE_RULES_LIBRARY).strip() if library not in RULE_LIBRARY_NAMES: return FINANCE_RULES_LIBRARY return library @staticmethod def _resolve_default_spreadsheet_file_name(asset: AgentAsset) -> str: if asset.code == COMPANY_TRAVEL_EXPENSE_RULE_CODE: return COMPANY_TRAVEL_EXPENSE_RULE_FILENAME if asset.code == COMPANY_COMMUNICATION_EXPENSE_RULE_CODE: return COMPANY_COMMUNICATION_EXPENSE_RULE_FILENAME fallback = Path(str(asset.name or "规则表").strip()).name return fallback if fallback.lower().endswith(".xlsx") else f"{fallback}.xlsx" def _build_onlyoffice_spreadsheet_config( self, *, asset_id: str, current_user: CurrentUserContext, resolved_version: str, metadata: RuleSpreadsheetMeta, editable: bool, ) -> AgentAssetOnlyOfficeConfigRead: onlyoffice_settings = resolve_onlyoffice_settings() settings = get_settings() if not onlyoffice_settings.enabled: raise ValueError("ONLYOFFICE 预览未启用。") if not onlyoffice_settings.public_url or not onlyoffice_settings.backend_url: raise ValueError("ONLYOFFICE 地址配置不完整。") if not onlyoffice_settings.jwt_secret: raise ValueError("ONLYOFFICE JWT 密钥未配置。") backend_base_url = onlyoffice_settings.backend_url.rstrip("/") public_url = onlyoffice_settings.public_url.rstrip("/") access_token = self._build_onlyoffice_access_token(asset_id, resolved_version) document_url = ( f"{backend_base_url}{settings.api_v1_prefix}/agent-assets/{asset_id}/spreadsheet/onlyoffice/content" f"?version={resolved_version}&access_token={access_token}" ) callback_url = ( f"{backend_base_url}{settings.api_v1_prefix}/agent-assets/{asset_id}/spreadsheet/onlyoffice/callback" f"?version={resolved_version}&actor_name={quote(current_user.name)}" ) config: dict[str, Any] = { "documentType": "cell", "document": { "fileType": Path(metadata.file_name).suffix.lstrip(".").lower() or "xlsx", "key": self._build_onlyoffice_document_key(asset_id, resolved_version, metadata), "title": metadata.file_name, "url": document_url, "permissions": { "download": True, "edit": editable, "print": True, "copy": True, }, }, "editorConfig": { "mode": "edit" if editable else "view", "lang": "zh-CN", "callbackUrl": callback_url, "user": { "id": current_user.username, "name": current_user.name, }, "customization": { "compactHeader": True, "compactToolbar": False, "toolbarNoTabs": False, "autosave": False, "forcesave": editable, }, }, "width": "100%", "height": "100%", } config["token"] = jwt.encode(config, onlyoffice_settings.jwt_secret, algorithm="HS256") return AgentAssetOnlyOfficeConfigRead(documentServerUrl=public_url, config=config) def _ensure_preview_rule_spreadsheet( self, *, version: str | None = None, ) -> tuple[str, RuleSpreadsheetMeta]: resolved_version = str(version or PREVIEW_RULE_CURRENT_VERSION).strip() if resolved_version not in PREVIEW_RULE_VERSION_FILENAMES: raise LookupError(f"版本 {resolved_version} 不存在") file_name = PREVIEW_RULE_VERSION_FILENAMES[resolved_version] storage_key = ( Path("agent_assets") / PREVIEW_RULE_ASSET_ID / "rule_spreadsheets" / resolved_version / file_name ).as_posix() try: file_path = self.spreadsheet_manager.resolve_storage_path(storage_key) except FileNotFoundError: file_path = None if file_path is not None and file_path.exists(): content = file_path.read_bytes() updated_at = datetime.fromtimestamp(file_path.stat().st_mtime, UTC).isoformat() return resolved_version, RuleSpreadsheetMeta( file_name=file_name, storage_key=storage_key, mime_type=SPREADSHEET_MIME_TYPE, size_bytes=file_path.stat().st_size, checksum=self._hash_bytes(content), updated_at=updated_at, updated_by="ONLYOFFICE 预览", source="preview", ) metadata = self.spreadsheet_manager.store_spreadsheet( asset_id=PREVIEW_RULE_ASSET_ID, version=resolved_version, file_name=file_name, content=AgentAssetSpreadsheetManager.build_company_travel_rule_template(), actor_name="ONLYOFFICE 预览", source="preview", ) return resolved_version, metadata def _handle_preview_rule_spreadsheet_onlyoffice_callback( self, *, version: str, payload: dict[str, Any], ) -> None: callback = self._parse_onlyoffice_callback(payload) if callback.status not in {2, 6} or not callback.download_url: return resolved_version, metadata = self._ensure_preview_rule_spreadsheet(version=version) request = Request( callback.download_url, headers={"User-Agent": "x-financial-onlyoffice-agent-asset-preview"}, ) with urlopen(request, timeout=30) as response: # noqa: S310 content = response.read() if metadata.checksum and metadata.checksum == self._hash_bytes(content): return actor_name = callback.users[0] if callback.users else "ONLYOFFICE" self.spreadsheet_manager.store_spreadsheet( asset_id=PREVIEW_RULE_ASSET_ID, version=resolved_version, file_name=metadata.file_name, content=content, actor_name=actor_name, source="onlyoffice-preview", ) @staticmethod def _read_current_rule_document_meta(asset: AgentAsset) -> RuleSpreadsheetMeta | None: payload = (asset.config_json or {}).get("rule_document") if not isinstance(payload, dict): return None return RuleSpreadsheetMeta( file_name=str(payload.get("file_name") or "").strip(), storage_key=str(payload.get("storage_key") or "").strip(), mime_type=( str(payload.get("mime_type") or "").strip() or "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), size_bytes=int(payload.get("size_bytes") or 0), checksum=str(payload.get("checksum") or "").strip(), updated_at=str(payload.get("updated_at") or "").strip(), updated_by=str(payload.get("updated_by") or "system").strip() or "system", source=str(payload.get("source") or "upload").strip() or "upload", ) @staticmethod def _increment_version(version: str | None) -> str: normalized = str(version or "").strip().removeprefix("v") parts = normalized.split(".") if len(parts) != 3 or not all(item.isdigit() for item in parts): return "v1.0.0" major, minor, patch = [int(item) for item in parts] return f"v{major}.{minor}.{patch + 1}" @staticmethod def _can_edit_spreadsheet_version( asset: AgentAsset, current_user: CurrentUserContext, version: str, ) -> bool: role_codes = {str(item).strip() for item in current_user.role_codes} can_edit = current_user.is_admin or "manager" in role_codes or "finance" in role_codes return can_edit and AgentAssetService._resolve_working_version(asset) == str(version or "").strip() @staticmethod def _can_edit_current_spreadsheet(current_user: CurrentUserContext) -> bool: role_codes = {str(item).strip() for item in current_user.role_codes} return current_user.is_admin or "manager" in role_codes or "finance" in role_codes @staticmethod def _build_onlyoffice_document_key( asset_id: str, version: str, metadata: RuleSpreadsheetMeta, ) -> str: raw_key = f"{asset_id}-{version}-{metadata.checksum or metadata.updated_at or metadata.file_name}" return "".join( character if character.isalnum() or character in {"-", "_", ".", "="} else "_" for character in raw_key ) @staticmethod def _build_onlyoffice_access_token(asset_id: str, version: str) -> str: onlyoffice_settings = resolve_onlyoffice_settings() payload = { "scope": "agent-asset-spreadsheet", "asset_id": asset_id, "version": version, } return jwt.encode(payload, onlyoffice_settings.jwt_secret, algorithm="HS256") @staticmethod def _parse_onlyoffice_callback(payload: dict[str, Any]) -> OnlyOfficeCallbackPayload: return OnlyOfficeCallbackPayload( status=int(payload.get("status") or 0), download_url=str(payload.get("url") or "").strip(), users=[str(item).strip() for item in payload.get("users") or [] if str(item).strip()], ) @staticmethod def _hash_bytes(content: bytes) -> str: import hashlib return hashlib.sha256(content).hexdigest() @staticmethod def _asset_snapshot(asset: AgentAsset) -> dict[str, Any]: return { "asset_type": asset.asset_type, "code": asset.code, "name": asset.name, "status": asset.status, "current_version": asset.current_version, "published_version": asset.published_version, "working_version": asset.working_version, "domain": asset.domain, "owner": asset.owner, "reviewer": asset.reviewer, } @staticmethod def _resolve_working_version(asset: AgentAsset) -> str: return str(asset.working_version or asset.current_version or "").strip() @staticmethod def _resolve_published_version(asset: AgentAsset) -> str: return str(asset.published_version or "").strip() @staticmethod def _resolve_version_lifecycle_state( version: str, *, working_version: str, published_version: str, latest_review_status: str, ) -> str: if version == published_version: return "published" if version != working_version: return "history" if latest_review_status == AgentReviewStatus.PENDING.value: return "pending_review" if latest_review_status == AgentReviewStatus.APPROVED.value: return "approved" if latest_review_status == AgentReviewStatus.REJECTED.value: return "rejected" return "draft" def _load_spreadsheet_for_compare(self, metadata: RuleSpreadsheetMeta): from io import BytesIO from openpyxl import load_workbook file_path = self.spreadsheet_manager.resolve_storage_path(metadata.storage_key) if not file_path.exists(): raise FileNotFoundError(metadata.file_name) return load_workbook(BytesIO(file_path.read_bytes()), read_only=False, data_only=False) def _collect_workbook_changes( self, base_workbook, target_workbook ) -> tuple[list[AgentAssetSpreadsheetDiffSheetRead], list[AgentAssetSpreadsheetDiffCellRead]]: base_sheet_names = set(base_workbook.sheetnames) target_sheet_names = set(target_workbook.sheetnames) sheet_changes: list[AgentAssetSpreadsheetDiffSheetRead] = [] for sheet_name in sorted(target_sheet_names - base_sheet_names): sheet_changes.append( AgentAssetSpreadsheetDiffSheetRead(sheet_name=sheet_name, change_type="added") ) for sheet_name in sorted(base_sheet_names - target_sheet_names): sheet_changes.append( AgentAssetSpreadsheetDiffSheetRead(sheet_name=sheet_name, change_type="removed") ) cell_changes: list[AgentAssetSpreadsheetDiffCellRead] = [] for sheet_name in sorted(base_sheet_names & target_sheet_names): base_sheet = base_workbook[sheet_name] target_sheet = target_workbook[sheet_name] max_row = max(base_sheet.max_row, target_sheet.max_row) max_column = max(base_sheet.max_column, target_sheet.max_column) for row_index in range(1, max_row + 1): for column_index in range(1, max_column + 1): before_value = base_sheet.cell(row=row_index, column=column_index).value after_value = target_sheet.cell(row=row_index, column=column_index).value if before_value == after_value: continue if before_value in (None, ""): change_type = "added" elif after_value in (None, ""): change_type = "removed" else: change_type = "modified" cell_changes.append( AgentAssetSpreadsheetDiffCellRead( sheet_name=sheet_name, cell=target_sheet.cell(row=row_index, column=column_index).coordinate, change_type=change_type, before_value=before_value, after_value=after_value, ) ) return sheet_changes, cell_changes @staticmethod def _extract_restore_source_version(change_note: str | None) -> str | None: normalized = str(change_note or "").strip() prefix = "基于历史版本 " suffix = " 恢复生成工作稿" if not normalized.startswith(prefix) or suffix not in normalized: return None return normalized.removeprefix(prefix).split(suffix, 1)[0].strip() or None def _resolve_json_risk_rule_document(self, asset: AgentAsset) -> tuple[str, str]: config_json = dict(asset.config_json or {}) detail_mode = str(config_json.get("detail_mode") or "").strip().lower() if detail_mode != "json_risk": raise ValueError("当前资产不是 JSON 风险规则。") rule_library = str(config_json.get("rule_library") or RISK_RULES_LIBRARY).strip() if rule_library not in RULE_LIBRARY_NAMES: raise ValueError("规则库目录不合法。") rule_document = config_json.get("rule_document") if not isinstance(rule_document, dict): raise ValueError("规则资产缺少 rule_document 配置。") file_name = str(rule_document.get("file_name") or "").strip() if not file_name: raise ValueError("规则资产缺少 JSON 文件名。") return rule_library, file_name def read_rule_json(self, asset_id: str) -> AgentAssetRuleJsonRead: asset = self.repository.get(asset_id) if asset is None: raise LookupError("资产不存在。") rule_library, file_name = self._resolve_json_risk_rule_document(asset) payload = self.rule_library_manager.read_rule_library_json( library=rule_library, file_name=file_name, ) return AgentAssetRuleJsonRead( file_name=file_name, rule_code=str(payload.get("rule_code") or asset.code or ""), name=str(payload.get("name") or asset.name or ""), description=str(payload.get("description") or asset.description or "").strip(), evaluator=str(payload.get("evaluator") or ""), ontology_signal=str(payload.get("ontology_signal") or "") or None, inputs=payload.get("inputs") if isinstance(payload.get("inputs"), dict) else {}, outcomes=payload.get("outcomes") if isinstance(payload.get("outcomes"), dict) else {}, payload=payload, ) def write_rule_json( self, asset_id: str, *, body: AgentAssetRuleJsonWrite, actor: str, request_id: str | None = None, ) -> AgentAssetRuleJsonRead: asset = self.repository.get(asset_id) if asset is None: raise LookupError("资产不存在。") rule_library, file_name = self._resolve_json_risk_rule_document(asset) payload = dict(body.payload or {}) asset_code = str(asset.code or "").strip() if asset_code and str(payload.get("rule_code") or "").strip() not in {"", asset_code}: raise ValueError("规则 JSON 的 rule_code 必须与资产编码一致。") if asset_code and not str(payload.get("rule_code") or "").strip(): payload["rule_code"] = asset_code saved = self.rule_library_manager.write_rule_library_json( library=rule_library, file_name=file_name, payload=payload, ) rule_description = str(saved.get("description") or "").strip() if rule_description: asset.description = rule_description rule_name = str(saved.get("name") or "").strip() if rule_name: asset.name = rule_name risk_category = str(saved.get("risk_category") or "").strip() if risk_category: config_json = dict(asset.config_json or {}) config_json["risk_category"] = risk_category asset.config_json = config_json asset.scenario_json = [risk_category] self.audit_service.log_action( actor=actor, action="update_agent_asset_rule_json", resource_type=asset.asset_type, resource_id=asset.id, before_json={"file_name": file_name}, after_json={"file_name": file_name, "rule_code": saved.get("rule_code")}, request_id=request_id, ) self.db.commit() return self.read_rule_json(asset_id)