from __future__ import annotations import json from datetime import UTC, datetime from typing import Any from sqlalchemy.orm import Session from app.core.agent_enums import ( AgentAssetContentType, AgentAssetStatus, AgentAssetType, AgentReviewStatus, ) from app.core.logging import get_logger from app.models.agent_asset import AgentAsset, AgentAssetReview, AgentAssetVersion from app.repositories.agent_asset import AgentAssetRepository from app.schemas.agent_asset import ( AgentAssetCreate, AgentAssetListItem, AgentAssetRead, AgentAssetReviewCreate, AgentAssetReviewRead, AgentAssetUpdate, AgentAssetVersionCreate, AgentAssetVersionRead, ) from app.services.agent_foundation import AgentFoundationService from app.services.audit import AuditLogService logger = get_logger("app.services.agent_assets") class AgentAssetService: def __init__(self, db: Session) -> None: self.db = db self.repository = AgentAssetRepository(db) self.audit_service = AuditLogService(db) def list_assets( self, *, asset_type: str | None = None, status: str | None = None, domain: str | None = None, keyword: str | None = None, ) -> list[AgentAssetListItem]: self._ensure_ready() items = self.repository.list( asset_type=asset_type, status=status, domain=domain, keyword=keyword ) return [AgentAssetListItem.model_validate(item) for item in items] def get_asset(self, asset_id: str) -> AgentAssetRead | None: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: return None recent_versions = self._sort_versions( self.repository.list_versions(asset_id, limit=5), asset.current_version, ) latest_review = next(iter(self.repository.list_reviews(asset_id, limit=1)), None) current_version = ( self.repository.get_version(asset_id, asset.current_version) if asset.current_version else None ) return AgentAssetRead( **AgentAssetListItem.model_validate(asset).model_dump(), current_version_content=self._deserialize_content(current_version) if current_version else None, current_version_content_type=current_version.content_type if current_version else None, current_version_change_note=current_version.change_note if current_version else None, recent_versions=[ self._serialize_version(item, asset.current_version) for item in recent_versions ], latest_review=AgentAssetReviewRead.model_validate(latest_review) if latest_review else None, ) def create_asset( self, payload: AgentAssetCreate, *, actor: str, request_id: str | None = None, ) -> AgentAssetRead: self._ensure_ready() if self.repository.get_by_code(payload.code): raise ValueError(f"资产编码 {payload.code} 已存在") if payload.status == AgentAssetStatus.ACTIVE: raise ValueError("请先创建资产并完成审核,再通过上线接口激活。") asset = AgentAsset( asset_type=payload.asset_type.value, code=payload.code, name=payload.name, description=payload.description, domain=payload.domain.value, scenario_json=payload.scenario_json, owner=payload.owner, reviewer=payload.reviewer, status=payload.status.value, config_json=payload.config_json, ) created = self.repository.create_asset(asset) self.audit_service.log_action( actor=actor, action="create_agent_asset", resource_type=created.asset_type, resource_id=created.id, before_json=None, after_json=self._asset_snapshot(created), request_id=request_id, ) logger.info("Created agent asset id=%s code=%s", created.id, created.code) return self.get_asset(created.id) # type: ignore[return-value] def update_asset( self, asset_id: str, payload: AgentAssetUpdate, *, actor: str, request_id: str | None = None, ) -> AgentAssetRead: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") before = self._asset_snapshot(asset) if payload.status == AgentAssetStatus.ACTIVE: raise ValueError("请使用上线接口激活资产。") for field_name in ( "name", "description", "owner", "reviewer", "current_version", "config_json", "scenario_json", ): value = getattr(payload, field_name) if value is not None: setattr(asset, field_name, value) if payload.domain is not None: asset.domain = payload.domain.value if payload.status is not None: asset.status = payload.status.value if payload.current_version is not None and not self.repository.get_version( asset_id, payload.current_version ): raise LookupError(f"版本 {payload.current_version} 不存在") updated = self.repository.save_asset(asset) self.audit_service.log_action( actor=actor, action="update_agent_asset", resource_type=updated.asset_type, resource_id=updated.id, before_json=before, after_json=self._asset_snapshot(updated), request_id=request_id, ) logger.info("Updated agent asset id=%s code=%s", updated.id, updated.code) return self.get_asset(updated.id) # type: ignore[return-value] def list_versions(self, asset_id: str, *, limit: int = 20) -> list[AgentAssetVersionRead]: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") versions = self._sort_versions( self.repository.list_versions(asset_id, limit=limit), asset.current_version, ) return [self._serialize_version(item, asset.current_version) for item in versions] def create_version( self, asset_id: str, payload: AgentAssetVersionCreate, *, actor: str, request_id: str | None = None, ) -> AgentAssetVersionRead: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") if self.repository.get_version(asset_id, payload.version): raise ValueError(f"版本号 {payload.version} 已存在") self._validate_version_payload(asset, payload) serialized_content = self._serialize_content(payload.content, payload.content_type.value) version = AgentAssetVersion( asset_id=asset_id, version=payload.version, content=serialized_content, content_type=payload.content_type.value, change_note=payload.change_note, created_by=payload.created_by, ) created = self.repository.create_version(version) before = self._asset_snapshot(asset) asset.current_version = payload.version if ( asset.asset_type == AgentAssetType.RULE.value and asset.status == AgentAssetStatus.ACTIVE.value ): asset.status = AgentAssetStatus.REVIEW.value updated_asset = self.repository.save_asset(asset) self.audit_service.log_action( actor=actor, action="save_agent_asset_version", resource_type=updated_asset.asset_type, resource_id=updated_asset.id, before_json=before, after_json={ "current_version": updated_asset.current_version, "status": updated_asset.status, }, request_id=request_id, ) logger.info("Created agent asset version asset_id=%s version=%s", asset_id, payload.version) return self._serialize_version(created, updated_asset.current_version) def create_review( self, asset_id: str, payload: AgentAssetReviewCreate, *, actor: str, request_id: str | None = None, ) -> AgentAssetReviewRead: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") if self.repository.get_version(asset_id, payload.version) is None: raise LookupError(f"版本 {payload.version} 不存在") review = AgentAssetReview( asset_id=asset_id, version=payload.version, reviewer=payload.reviewer, review_status=payload.review_status.value, review_note=payload.review_note, reviewed_at=None if payload.review_status == AgentReviewStatus.PENDING else datetime.now(UTC), ) created = self.repository.create_review(review) before = self._asset_snapshot(asset) asset.reviewer = payload.reviewer if payload.review_status == AgentReviewStatus.PENDING: asset.status = AgentAssetStatus.REVIEW.value elif payload.review_status == AgentReviewStatus.REJECTED: asset.status = AgentAssetStatus.DRAFT.value elif asset.status != AgentAssetStatus.ACTIVE.value: asset.status = AgentAssetStatus.REVIEW.value self.repository.save_asset(asset) self.audit_service.log_action( actor=actor, action="review_agent_asset", resource_type=asset.asset_type, resource_id=asset.id, before_json=before, after_json={ "review_version": payload.version, "review_status": payload.review_status.value, "asset_status": asset.status, }, request_id=request_id, ) logger.info( "Created review asset_id=%s version=%s status=%s", asset_id, payload.version, payload.review_status.value, ) return AgentAssetReviewRead.model_validate(created) def activate_asset( self, asset_id: str, *, actor: str, request_id: str | None = None, ) -> AgentAssetRead: self._ensure_ready() asset = self.repository.get(asset_id) if asset is None: raise LookupError("Asset not found") if not asset.current_version: raise ValueError("资产尚未设置当前版本,无法上线。") if asset.asset_type == AgentAssetType.RULE.value: review = self.repository.get_review( asset.id, asset.current_version, AgentReviewStatus.APPROVED.value ) if review is None: raise PermissionError("规则当前版本尚未审核通过,不能上线。") before = self._asset_snapshot(asset) asset.status = AgentAssetStatus.ACTIVE.value updated = self.repository.save_asset(asset) self.audit_service.log_action( actor=actor, action="activate_agent_asset", resource_type=updated.asset_type, resource_id=updated.id, before_json=before, after_json=self._asset_snapshot(updated), request_id=request_id, ) logger.info("Activated agent asset id=%s code=%s", updated.id, updated.code) return self.get_asset(updated.id) # type: ignore[return-value] def _ensure_ready(self) -> None: AgentFoundationService(self.db).ensure_foundation_ready() def _validate_version_payload( self, asset: AgentAsset, payload: AgentAssetVersionCreate ) -> None: if ( asset.asset_type == AgentAssetType.RULE.value and payload.content_type != AgentAssetContentType.MARKDOWN ): raise ValueError("规则资产版本内容必须使用 markdown。") if ( asset.asset_type != AgentAssetType.RULE.value and payload.content_type != AgentAssetContentType.JSON ): raise ValueError("技能、MCP、任务资产版本内容必须使用 json。") if payload.content_type == AgentAssetContentType.MARKDOWN and not isinstance( payload.content, str ): raise ValueError("Markdown 内容必须是字符串。") if payload.content_type == AgentAssetContentType.JSON and not isinstance( payload.content, (dict, list) ): raise ValueError("JSON 内容必须是对象或数组。") def _serialize_version( self, version: AgentAssetVersion, current_version: str | None ) -> AgentAssetVersionRead: return AgentAssetVersionRead( id=version.id, asset_id=version.asset_id, version=version.version, content=self._deserialize_content(version), content_type=version.content_type, change_note=version.change_note, created_by=version.created_by, created_at=version.created_at, is_current=version.version == current_version, ) @staticmethod def _sort_versions( versions: list[AgentAssetVersion], current_version: str | None ) -> list[AgentAssetVersion]: return sorted( versions, key=lambda item: (item.version == current_version, item.created_at), reverse=True, ) @staticmethod def _serialize_content(content: Any, content_type: str) -> str: if content_type == AgentAssetContentType.MARKDOWN.value: return str(content) return json.dumps(content, ensure_ascii=False, sort_keys=True, indent=2) @staticmethod def _deserialize_content(version: AgentAssetVersion | None) -> Any: if version is None: return None if version.content_type == AgentAssetContentType.MARKDOWN.value: return version.content return json.loads(version.content) @staticmethod def _asset_snapshot(asset: AgentAsset) -> dict[str, Any]: return { "asset_type": asset.asset_type, "code": asset.code, "name": asset.name, "status": asset.status, "current_version": asset.current_version, "domain": asset.domain, "owner": asset.owner, "reviewer": asset.reviewer, }