Files
X-Financial/server/src/app/services/agent_assets.py
caoxiaozhu 9f7b8b46a3 Refine travel reimbursement steward flow
Align planner, runtime rules, and policy assets so travel guidance
matches the updated reimbursement workflow.
2026-06-15 22:55:18 +08:00

857 lines
35 KiB
Python

from __future__ import annotations
import json
from collections import defaultdict
from datetime import UTC, datetime
from typing import Any
from sqlalchemy.orm import Session
from app.core.agent_enums import (
AgentAssetContentType,
AgentAssetStatus,
AgentAssetType,
AgentReviewStatus,
)
from app.core.logging import get_logger
from app.models.agent_asset import AgentAsset, AgentAssetReview, AgentAssetVersion
from app.repositories.agent_asset import AgentAssetRepository
from app.schemas.agent_asset import (
AgentAssetCreate,
AgentAssetListItem,
AgentAssetRead,
AgentAssetReviewCreate,
AgentAssetReviewRead,
AgentAssetUpdate,
AgentAssetVersionCreate,
AgentAssetVersionRead,
)
from app.services.agent_asset_json_rules import AgentAssetJsonRuleMixin
from app.services.agent_asset_onlyoffice import AgentAssetOnlyOfficeMixin
from app.services.agent_asset_risk_rule_feedback import AgentAssetRiskRuleFeedbackMixin
from app.services.agent_asset_risk_rule_level import AgentAssetRiskRuleLevelMixin
from app.services.agent_asset_risk_rule_publish import AgentAssetRiskRulePublishMixin
from app.services.agent_asset_risk_rule_simulation import AgentAssetRiskRuleSimulationMixin
from app.services.agent_asset_risk_rule_testing import AgentAssetRiskRuleTestingMixin
from app.services.agent_asset_rule_library import AgentAssetRuleLibraryManager
from app.services.agent_asset_spreadsheet import AgentAssetSpreadsheetManager
from app.services.agent_asset_spreadsheet_helpers import AgentAssetSpreadsheetHelperMixin
from app.services.agent_asset_timeline import AgentAssetTimelineMixin
from app.services.agent_foundation import AgentFoundationService
from app.services.audit import AuditLogService
from app.services.pagination import PageResult, normalize_page_params
from app.services.risk_rule_manifest_classifier import is_budget_risk_manifest
from app.services.risk_rule_score_backfill import backfill_missing_risk_rule_score
logger = get_logger("app.services.agent_assets")
class AgentAssetService(
AgentAssetOnlyOfficeMixin,
AgentAssetSpreadsheetHelperMixin,
AgentAssetRiskRuleLevelMixin,
AgentAssetRiskRulePublishMixin,
AgentAssetRiskRuleFeedbackMixin,
AgentAssetRiskRuleTestingMixin,
AgentAssetRiskRuleSimulationMixin,
AgentAssetTimelineMixin,
AgentAssetJsonRuleMixin,
):
def __init__(self, db: Session) -> None:
self.db = db
self.repository = AgentAssetRepository(db)
self.audit_service = AuditLogService(db)
self.spreadsheet_manager = AgentAssetSpreadsheetManager()
self.rule_library_manager = AgentAssetRuleLibraryManager()
def list_assets(
self,
*,
asset_type: str | None = None,
status: str | None = None,
domain: str | None = None,
keyword: str | None = None,
) -> list[AgentAssetListItem]:
self._ensure_ready()
if asset_type in {None, "", AgentAssetType.RULE.value}:
self.sync_rule_assets_from_libraries()
assets = self.repository.list(
asset_type=asset_type, status=status, domain=domain, keyword=keyword
)
assets = self._filter_excluded_risk_assets(assets)
version_stats = self._collect_version_stats(assets)
return [self._serialize_list_item(asset, version_stats.get(asset.id)) for asset in assets]
def list_assets_page(
self,
*,
asset_type: str | None = None,
status: str | None = None,
domain: str | None = None,
keyword: str | None = None,
page: int | None,
page_size: int | None,
) -> PageResult[AgentAssetListItem]:
self._ensure_ready()
if asset_type in {None, "", AgentAssetType.RULE.value}:
self.sync_rule_assets_from_libraries()
assets = self.repository.list(
asset_type=asset_type,
status=status,
domain=domain,
keyword=keyword,
)
assets = self._filter_excluded_risk_assets(assets)
page_params = normalize_page_params(page, page_size)
paged_assets = assets[page_params.offset : page_params.offset + page_params.page_size]
version_stats = self._collect_version_stats(paged_assets)
return PageResult(
items=[
self._serialize_list_item(asset, version_stats.get(asset.id))
for asset in paged_assets
],
total=len(assets),
page=page_params.page,
page_size=page_params.page_size,
)
def get_asset(self, asset_id: str) -> AgentAssetRead | None:
self._ensure_ready()
asset = self.repository.get(asset_id)
if asset is None:
return None
try:
if backfill_missing_risk_rule_score(asset):
asset = self.repository.save_asset(asset)
except Exception:
logger.warning("Failed to backfill risk rule score asset_id=%s", asset_id, exc_info=True)
working_version = self._resolve_working_version(asset)
recent_versions = self._sort_versions(
self.repository.list_versions(asset_id, limit=5),
working_version,
)
latest_review = (
self.repository.get_review(asset_id, working_version)
if working_version
else next(iter(self.repository.list_reviews(asset_id, limit=1)), None)
)
current_version = (
self.repository.get_version(asset_id, working_version) if working_version else None
)
version_stats = self._collect_version_stats([asset]).get(asset.id)
return AgentAssetRead(
**self._serialize_list_item(asset, version_stats).model_dump(),
current_version_content=self._deserialize_content(current_version)
if current_version
else None,
current_version_content_type=current_version.content_type if current_version else None,
current_version_change_note=current_version.change_note if current_version else None,
recent_versions=[self._serialize_version(item, asset) for item in recent_versions],
latest_review=AgentAssetReviewRead.model_validate(latest_review)
if latest_review
else None,
latest_test_summary=self.get_latest_risk_rule_test_summary(asset)
if str((asset.config_json or {}).get("detail_mode") or "").strip().lower()
== "json_risk"
and working_version
and asset.status
not in {AgentAssetStatus.GENERATING.value, AgentAssetStatus.FAILED.value}
else None,
)
@staticmethod
def _filter_excluded_risk_assets(assets: list[AgentAsset]) -> list[AgentAsset]:
return [asset for asset in assets if not AgentAssetService._is_excluded_budget_risk_asset(asset)]
@staticmethod
def _is_excluded_budget_risk_asset(asset: AgentAsset) -> bool:
if asset.asset_type != AgentAssetType.RULE.value:
return False
config_json = asset.config_json if isinstance(asset.config_json, dict) else {}
if str(config_json.get("detail_mode") or "").strip().lower() != "json_risk":
return False
manifest_like = {
**config_json,
"rule_code": str(asset.code or "").strip(),
"name": str(asset.name or "").strip(),
"description": str(asset.description or "").strip(),
"metadata": config_json,
}
return is_budget_risk_manifest(manifest_like)
def create_asset(
self,
payload: AgentAssetCreate,
*,
actor: str,
request_id: str | None = None,
) -> AgentAssetRead:
self._ensure_ready()
if self.repository.get_by_code(payload.code):
raise ValueError(f"资产编码 {payload.code} 已存在")
if payload.status == AgentAssetStatus.ACTIVE:
raise ValueError("请先创建资产并完成审核,再通过上线接口激活。")
asset = AgentAsset(
asset_type=payload.asset_type.value,
code=payload.code,
name=payload.name,
description=payload.description,
domain=payload.domain.value,
scenario_json=payload.scenario_json,
owner=payload.owner,
reviewer=payload.reviewer,
status=payload.status.value,
config_json=payload.config_json,
)
created = self.repository.create_asset(asset)
self.audit_service.log_action(
actor=actor,
action="create_agent_asset",
resource_type=created.asset_type,
resource_id=created.id,
before_json=None,
after_json=self._asset_snapshot(created),
request_id=request_id,
)
logger.info("Created agent asset id=%s code=%s", created.id, created.code)
return self.get_asset(created.id) # type: ignore[return-value]
def update_asset(
self,
asset_id: str,
payload: AgentAssetUpdate,
*,
actor: str,
request_id: str | None = None,
) -> AgentAssetRead:
self._ensure_ready()
asset = self.repository.get(asset_id)
if asset is None:
raise LookupError("Asset not found")
before = self._asset_snapshot(asset)
if payload.status == AgentAssetStatus.ACTIVE:
raise ValueError("请使用上线接口激活资产。")
for field_name in (
"name",
"description",
"owner",
"reviewer",
"current_version",
"published_version",
"working_version",
"config_json",
"scenario_json",
):
value = getattr(payload, field_name)
if value is not None:
setattr(asset, field_name, value)
if payload.domain is not None:
asset.domain = payload.domain.value
if payload.status is not None:
asset.status = payload.status.value
if payload.current_version is not None and not self.repository.get_version(
asset_id, payload.current_version
):
raise LookupError(f"版本 {payload.current_version} 不存在")
if payload.published_version is not None and not self.repository.get_version(
asset_id, payload.published_version
):
raise LookupError(f"版本 {payload.published_version} 不存在")
if payload.working_version is not None and not self.repository.get_version(
asset_id, payload.working_version
):
raise LookupError(f"版本 {payload.working_version} 不存在")
if payload.current_version is not None and payload.working_version is None:
asset.working_version = payload.current_version
if payload.working_version is not None:
asset.current_version = payload.working_version
updated = self.repository.save_asset(asset)
self.audit_service.log_action(
actor=actor,
action="update_agent_asset",
resource_type=updated.asset_type,
resource_id=updated.id,
before_json=before,
after_json=self._asset_snapshot(updated),
request_id=request_id,
)
logger.info("Updated agent asset id=%s code=%s", updated.id, updated.code)
return self.get_asset(updated.id) # type: ignore[return-value]
def list_versions(self, asset_id: str, *, limit: int = 20) -> list[AgentAssetVersionRead]:
self._ensure_ready()
asset = self.repository.get(asset_id)
if asset is None:
raise LookupError("Asset not found")
versions = self._sort_versions(
self.repository.list_versions(asset_id, limit=limit),
self._resolve_working_version(asset),
)
return [self._serialize_version(item, asset) for item in versions]
def create_version(
self,
asset_id: str,
payload: AgentAssetVersionCreate,
*,
actor: str,
request_id: str | None = None,
) -> AgentAssetVersionRead:
self._ensure_ready()
asset = self.repository.get(asset_id)
if asset is None:
raise LookupError("Asset not found")
if self.repository.get_version(asset_id, payload.version):
raise ValueError(f"版本号 {payload.version} 已存在")
self._validate_version_payload(asset, payload)
serialized_content = self._serialize_content(payload.content, payload.content_type.value)
version = AgentAssetVersion(
asset_id=asset_id,
version=payload.version,
content=serialized_content,
content_type=payload.content_type.value,
change_note=payload.change_note,
created_by=payload.created_by,
)
created = self.repository.create_version(version)
before = self._asset_snapshot(asset)
asset.working_version = payload.version
asset.current_version = payload.version
updated_asset = self.repository.save_asset(asset)
self.audit_service.log_action(
actor=actor,
action="save_agent_asset_version",
resource_type=updated_asset.asset_type,
resource_id=updated_asset.id,
before_json=before,
after_json={
"current_version": updated_asset.current_version,
"working_version": updated_asset.working_version,
"published_version": updated_asset.published_version,
"status": updated_asset.status,
},
request_id=request_id,
)
logger.info("Created agent asset version asset_id=%s version=%s", asset_id, payload.version)
return self._serialize_version(created, updated_asset)
def create_review(
self,
asset_id: str,
payload: AgentAssetReviewCreate,
*,
actor: str,
request_id: str | None = None,
) -> AgentAssetReviewRead:
self._ensure_ready()
asset = self.repository.get(asset_id)
if asset is None:
raise LookupError("Asset not found")
if (
asset.asset_type == AgentAssetType.RULE.value
and payload.review_status == AgentReviewStatus.PENDING
and payload.version != self._resolve_working_version(asset)
):
if self.repository.get_version(asset_id, payload.version) is not None:
raise ValueError(f"版本 {payload.version} 已存在,不能重复送审。")
asset = self._create_named_working_copy_for_review(
asset,
target_version=payload.version,
actor=actor,
request_id=request_id,
)
if self.repository.get_version(asset_id, payload.version) is None:
raise LookupError(f"版本 {payload.version} 不存在")
if asset.asset_type == AgentAssetType.RULE.value:
if (
str((asset.config_json or {}).get("detail_mode") or "").strip().lower()
== "json_risk"
and payload.review_status == AgentReviewStatus.PENDING
and not self.get_latest_risk_rule_test_summary(asset).test_passed
):
raise PermissionError("当前规则版本尚未完成测试通过确认,不能提交审核。")
working_version = self._resolve_working_version(asset)
if payload.version != working_version:
raise ValueError("只能对当前工作版本发起审核。")
review = AgentAssetReview(
asset_id=asset_id,
version=payload.version,
reviewer=payload.reviewer,
review_status=payload.review_status.value,
review_note=payload.review_note,
reviewed_at=None
if payload.review_status == AgentReviewStatus.PENDING
else datetime.now(UTC),
)
created = self.repository.create_review(review)
before = self._asset_snapshot(asset)
asset.reviewer = payload.reviewer
if payload.review_status == AgentReviewStatus.PENDING:
if not asset.published_version:
asset.status = AgentAssetStatus.REVIEW.value
elif payload.review_status == AgentReviewStatus.REJECTED:
if not asset.published_version:
asset.status = AgentAssetStatus.DRAFT.value
elif not asset.published_version:
asset.status = AgentAssetStatus.REVIEW.value
self.repository.save_asset(asset)
self.audit_service.log_action(
actor=actor,
action="review_agent_asset",
resource_type=asset.asset_type,
resource_id=asset.id,
before_json=before,
after_json={
"review_version": payload.version,
"review_status": payload.review_status.value,
"asset_status": asset.status,
},
request_id=request_id,
)
logger.info(
"Created review asset_id=%s version=%s status=%s",
asset_id,
payload.version,
payload.review_status.value,
)
return AgentAssetReviewRead.model_validate(created)
def _create_named_working_copy_for_review(
self,
asset: AgentAsset,
*,
target_version: str,
actor: str,
request_id: str | None = None,
) -> AgentAsset:
working_version = self._resolve_working_version(asset)
if not working_version:
raise ValueError("当前规则尚未配置工作版本,无法提交审核。")
source = self.repository.get_version(asset.id, working_version)
if source is None:
raise LookupError(f"版本 {working_version} 不存在")
is_spreadsheet_rule = (
asset.asset_type == AgentAssetType.RULE.value
and str((asset.config_json or {}).get("detail_mode") or "").strip().lower()
== "spreadsheet"
)
if is_spreadsheet_rule:
_, metadata = self._resolve_spreadsheet_version_meta(asset, version=working_version)
file_path = self.spreadsheet_manager.resolve_storage_path(metadata.storage_key)
if not file_path.exists():
raise FileNotFoundError(metadata.file_name)
snapshot_meta = self.spreadsheet_manager.store_rule_library_spreadsheet_snapshot(
library=self._resolve_spreadsheet_rule_library(asset),
asset_id=asset.id,
version=target_version,
file_name=metadata.file_name,
content=file_path.read_bytes(),
actor_name=actor,
source="review-submit",
)
next_content = self.spreadsheet_manager.build_version_markdown(
rule_name=asset.name,
version=target_version,
metadata=snapshot_meta,
)
next_content_type = AgentAssetContentType.MARKDOWN
else:
next_content = self._deserialize_content(source)
next_content_type = AgentAssetContentType(source.content_type)
self.create_version(
asset.id,
AgentAssetVersionCreate(
version=target_version,
content=next_content,
content_type=next_content_type,
change_note=f"提交审核前固化工作稿为 {target_version}",
created_by=actor,
),
actor=actor,
request_id=request_id,
)
refreshed = self.repository.get(asset.id)
if refreshed is None:
raise LookupError("Asset not found")
if is_spreadsheet_rule:
config_json = dict(refreshed.config_json or {})
current_document_meta = self._read_current_rule_document_meta(refreshed)
if current_document_meta is not None:
rule_document = self.spreadsheet_manager.build_rule_document_config(
current_document_meta,
asset_version=target_version,
)
rule_document["storage_key"] = current_document_meta.storage_key
config_json["rule_document"] = rule_document
refreshed.config_json = config_json
self.repository.save_asset(refreshed)
return refreshed
def activate_asset(
self,
asset_id: str,
*,
actor: str,
request_id: str | None = None,
) -> AgentAssetRead:
self._ensure_ready()
asset = self.repository.get(asset_id)
if asset is None:
raise LookupError("Asset not found")
candidate_version = self._resolve_working_version(asset)
if not candidate_version:
raise ValueError("资产尚未设置工作版本,无法上线。")
if asset.asset_type == AgentAssetType.RULE.value:
review = self.repository.get_review(
asset.id, candidate_version, AgentReviewStatus.APPROVED.value
)
if review is None:
raise PermissionError("规则工作版本尚未审核通过,不能上线。")
before = self._asset_snapshot(asset)
asset.published_version = candidate_version
asset.status = AgentAssetStatus.ACTIVE.value
updated = self.repository.save_asset(asset)
self.audit_service.log_action(
actor=actor,
action="activate_agent_asset",
resource_type=updated.asset_type,
resource_id=updated.id,
before_json=before,
after_json=self._asset_snapshot(updated),
request_id=request_id,
)
logger.info("Activated agent asset id=%s code=%s", updated.id, updated.code)
return self.get_asset(updated.id) # type: ignore[return-value]
def _ensure_ready(self) -> None:
AgentFoundationService(self.db).ensure_foundation_ready()
def sync_platform_risk_rules_from_library(self) -> int:
manifest_count = AgentFoundationService(self.db).sync_platform_risk_rules_from_library()
self.db.commit()
return manifest_count
def sync_rule_assets_from_libraries(self) -> int:
foundation = AgentFoundationService(self.db)
synced_count = foundation.sync_finance_rule_assets_from_catalog()
synced_count += foundation.sync_platform_risk_rules_from_library()
self.db.commit()
return synced_count
def _validate_version_payload(
self, asset: AgentAsset, payload: AgentAssetVersionCreate
) -> None:
if (
asset.asset_type == AgentAssetType.RULE.value
and payload.content_type != AgentAssetContentType.MARKDOWN
):
raise ValueError("规则资产版本内容必须使用 markdown。")
if (
asset.asset_type not in {AgentAssetType.RULE.value, AgentAssetType.TASK.value}
and payload.content_type != AgentAssetContentType.JSON
):
raise ValueError("技能、MCP 资产版本内容必须使用 json。")
if payload.content_type == AgentAssetContentType.MARKDOWN and not isinstance(
payload.content, str
):
raise ValueError("Markdown 内容必须是字符串。")
if payload.content_type == AgentAssetContentType.JSON and not isinstance(
payload.content, (dict, list)
):
raise ValueError("JSON 内容必须是对象或数组。")
def restore_version_as_working_copy(
self,
asset_id: str,
source_version: str,
*,
actor: str,
request_id: str | None = None,
) -> AgentAssetRead:
self._ensure_ready()
asset = self.repository.get(asset_id)
if asset is None:
raise LookupError("Asset not found")
source = self.repository.get_version(asset_id, source_version)
if source is None:
raise LookupError(f"版本 {source_version} 不存在")
if (
asset.asset_type == AgentAssetType.RULE.value
and str((asset.config_json or {}).get("detail_mode") or "").strip().lower()
== "spreadsheet"
):
metadata = self.spreadsheet_manager.parse_version_markdown(str(source.content or ""))
if metadata is None:
raise FileNotFoundError("历史规则表快照不存在,无法恢复。")
file_path = self.spreadsheet_manager.resolve_storage_path(metadata.storage_key)
if not file_path.exists():
raise FileNotFoundError(metadata.file_name)
restored = self.upload_rule_spreadsheet(
asset.id,
filename=metadata.file_name,
content=file_path.read_bytes(),
actor=actor,
request_id=request_id,
change_note=f"基于历史版本 {source_version} 恢复生成工作稿",
source="restore",
)
self.audit_service.log_action(
actor=actor,
action="restore_agent_asset_version",
resource_type=asset.asset_type,
resource_id=asset.id,
before_json={"source_version": source_version},
after_json={"working_version": restored.working_version},
request_id=request_id,
)
return restored
next_version = self._increment_version(self._resolve_working_version(asset))
self.create_version(
asset.id,
AgentAssetVersionCreate(
version=next_version,
content=self._deserialize_content(source),
content_type=AgentAssetContentType(source.content_type),
change_note=f"基于历史版本 {source_version} 恢复生成工作稿",
created_by=actor,
),
actor=actor,
request_id=request_id,
)
restored = self.get_asset(asset.id)
self.audit_service.log_action(
actor=actor,
action="restore_agent_asset_version",
resource_type=asset.asset_type,
resource_id=asset.id,
before_json={"source_version": source_version},
after_json={"working_version": next_version},
request_id=request_id,
)
return restored # type: ignore[return-value]
def _serialize_version(
self, version: AgentAssetVersion, asset: AgentAsset
) -> AgentAssetVersionRead:
latest_review = self.repository.get_review(asset.id, version.version)
working_version = self._resolve_working_version(asset)
published_version = self._resolve_published_version(asset)
return AgentAssetVersionRead(
id=version.id,
asset_id=version.asset_id,
version=version.version,
content=self._deserialize_content(version),
content_type=version.content_type,
change_note=version.change_note,
created_by=version.created_by,
created_at=version.created_at,
is_current=version.version == working_version,
is_published=version.version == published_version,
is_working=version.version == working_version,
lifecycle_state=self._resolve_version_lifecycle_state(
version.version,
working_version=working_version,
published_version=published_version,
latest_review_status=latest_review.review_status if latest_review else "",
),
)
def _collect_version_stats(self, assets: list[AgentAsset]) -> dict[str, dict[str, Any]]:
asset_ids = [item.id for item in assets]
versions = self.repository.list_versions_for_assets(asset_ids)
reviews = self.repository.list_reviews_for_assets(asset_ids)
spreadsheet_logs = self.audit_service.repository.list_for_resources(
resource_type=AgentAssetType.RULE.value,
resource_ids=[
item.id
for item in assets
if item.asset_type == AgentAssetType.RULE.value
and str((item.config_json or {}).get("detail_mode") or "").strip().lower()
== "spreadsheet"
],
action="edit_rule_spreadsheet",
)
working_versions = {item.id: self._resolve_working_version(item) for item in assets}
version_counts: dict[str, int] = defaultdict(int)
modified_by: dict[str, str | None] = {item.id: None for item in assets}
published_versions = {item.id: self._resolve_published_version(item) for item in assets}
published_by: dict[str, str | None] = {}
published_at: dict[str, datetime | None] = {}
spreadsheet_edit_counts: dict[str, int] = defaultdict(int)
spreadsheet_last_actor: dict[str, str | None] = {}
spreadsheet_last_changed_at: dict[str, datetime] = {}
for version in versions:
version_counts[version.asset_id] += 1
if modified_by.get(
version.asset_id
) is None and version.version == working_versions.get(version.asset_id):
modified_by[version.asset_id] = version.created_by
for review in reviews:
if review.asset_id in published_at:
continue
if review.version != published_versions.get(review.asset_id):
continue
if review.review_status != AgentReviewStatus.APPROVED.value:
continue
published_by[review.asset_id] = review.reviewer
published_at[review.asset_id] = review.reviewed_at or review.created_at
for log in spreadsheet_logs:
spreadsheet_edit_counts[log.resource_id] += 1
last_changed_at = spreadsheet_last_changed_at.get(log.resource_id)
if last_changed_at is None or log.created_at >= last_changed_at:
spreadsheet_last_changed_at[log.resource_id] = log.created_at
spreadsheet_last_actor[log.resource_id] = log.actor
return {
item.id: {
"change_count": (
spreadsheet_edit_counts.get(item.id, 0)
if item.asset_type == AgentAssetType.RULE.value
and str((item.config_json or {}).get("detail_mode") or "").strip().lower()
== "spreadsheet"
and spreadsheet_edit_counts.get(item.id, 0) > 0
else max(version_counts.get(item.id, 0) - 1, 0)
),
"modified_by": (
spreadsheet_last_actor.get(item.id)
if item.asset_type == AgentAssetType.RULE.value
and str((item.config_json or {}).get("detail_mode") or "").strip().lower()
== "spreadsheet"
and spreadsheet_last_actor.get(item.id)
else modified_by.get(item.id)
),
"published_by": published_by.get(item.id),
"published_at": published_at.get(item.id),
}
for item in assets
}
@staticmethod
def _serialize_list_item(
asset: AgentAsset,
version_stats: dict[str, int | str | None] | None = None,
) -> AgentAssetListItem:
payload = AgentAssetListItem.model_validate(asset).model_dump()
payload["change_count"] = int((version_stats or {}).get("change_count") or 0)
payload["modified_by"] = str((version_stats or {}).get("modified_by") or "").strip() or None
payload["published_by"] = (
str((version_stats or {}).get("published_by") or "").strip() or None
)
payload["published_at"] = (version_stats or {}).get("published_at")
return AgentAssetListItem.model_validate(payload)
@staticmethod
def _sort_versions(
versions: list[AgentAssetVersion], current_version: str | None
) -> list[AgentAssetVersion]:
return sorted(
versions,
key=lambda item: (item.version == current_version, item.created_at),
reverse=True,
)
@staticmethod
def _serialize_content(content: Any, content_type: str) -> str:
if content_type == AgentAssetContentType.MARKDOWN.value:
return str(content)
return json.dumps(content, ensure_ascii=False, sort_keys=True, indent=2)
@staticmethod
def _deserialize_content(version: AgentAssetVersion | None) -> Any:
if version is None:
return None
if version.content_type == AgentAssetContentType.MARKDOWN.value:
return version.content
return json.loads(version.content)
@staticmethod
def _increment_version(version: str | None) -> str:
normalized = str(version or "").strip().removeprefix("v")
parts = normalized.split(".")
if len(parts) != 3 or not all(item.isdigit() for item in parts):
return "v1.0.0"
major, minor, patch = [int(item) for item in parts]
return f"v{major}.{minor}.{patch + 1}"
@staticmethod
def _hash_bytes(content: bytes) -> str:
import hashlib
return hashlib.sha256(content).hexdigest()
@staticmethod
def _asset_snapshot(asset: AgentAsset) -> dict[str, Any]:
return {
"asset_type": asset.asset_type,
"code": asset.code,
"name": asset.name,
"status": asset.status,
"current_version": asset.current_version,
"published_version": asset.published_version,
"working_version": asset.working_version,
"domain": asset.domain,
"owner": asset.owner,
"reviewer": asset.reviewer,
}
@staticmethod
def _resolve_working_version(asset: AgentAsset) -> str:
return str(asset.working_version or asset.current_version or "").strip()
@staticmethod
def _resolve_published_version(asset: AgentAsset) -> str:
return str(asset.published_version or "").strip()
@staticmethod
def _resolve_version_lifecycle_state(
version: str,
*,
working_version: str,
published_version: str,
latest_review_status: str,
) -> str:
if version == published_version:
return "published"
if version != working_version:
return "history"
if latest_review_status == AgentReviewStatus.PENDING.value:
return "pending_review"
if latest_review_status == AgentReviewStatus.APPROVED.value:
return "approved"
if latest_review_status == AgentReviewStatus.REJECTED.value:
return "rejected"
return "draft"
def _next_available_version(self, asset: AgentAsset) -> str:
candidate = self._increment_version(self._resolve_working_version(asset))
while self.repository.get_version(asset.id, candidate) is not None:
candidate = self._increment_version(candidate)
return candidate