"""FastAPI routes mounted under ``/agent-assets``.

The handlers in this module are thin HTTP adapters around
``AgentAssetService`` and ``RiskRuleGenerationJobService``: they resolve the
audit actor, enforce a few role checks, delegate to the service layer and
translate service exceptions into HTTP errors via ``_handle_asset_error``.
"""

from __future__ import annotations

from typing import Annotated, Any, NoReturn

from fastapi import APIRouter, BackgroundTasks, Body, Depends, Header, HTTPException, Query, status
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session

from app.api.deps import (
    CurrentUserContext,
    get_current_user,
    get_db,
    require_platform_admin_user,
    require_rule_editor_user,
    require_rule_reviewer_user,
)
from app.db.session import get_session_factory
from app.schemas.agent_asset import (
    AgentAssetCreate,
    AgentAssetListItem,
    AgentAssetOnlyOfficeCallbackRead,
    AgentAssetOnlyOfficeCallbackWrite,
    AgentAssetOnlyOfficeConfigRead,
    AgentAssetRead,
    AgentAssetReviewCreate,
    AgentAssetReviewRead,
    AgentAssetRiskRuleEnabledUpdate,
    AgentAssetRiskRuleGenerateRequest,
    AgentAssetRiskRuleLatestTestSummary,
    AgentAssetRiskRuleLevelUpdate,
    AgentAssetRiskRuleReportRequest,
    AgentAssetRiskRuleReturnRequest,
    AgentAssetRiskRuleSampleTestRequest,
    AgentAssetRiskRuleScenarioTestRequest,
    AgentAssetRiskRuleSimulationRead,
    AgentAssetRiskRuleSimulationRequest,
    AgentAssetRiskRuleTestRunRead,
    AgentAssetRuleJsonRead,
    AgentAssetRuleJsonWrite,
    AgentAssetSpreadsheetChangeRecordRead,
    AgentAssetUpdate,
    AgentAssetVersionCreate,
    AgentAssetVersionRead,
    AgentAssetVersionTimelineItemRead,
)
from app.schemas.common import ErrorResponse
from app.services.agent_assets import AgentAssetService
from app.services.risk_rule_generation_jobs import RiskRuleGenerationJobService

router = APIRouter(prefix="/agent-assets")

# Reusable dependency / header aliases shared by the handlers below.
DbSession = Annotated[Session, Depends(get_db)]
ActorHeader = Annotated[
    str | None,
    Header(description="审计操作者。未传时回退到请求体中的 owner / reviewer 或 `system`。"),
]
RequestIdHeader = Annotated[
    str | None,
    Header(description="外部请求 ID,用于串联审计日志和上游调用链。"),
]
CurrentUser = Annotated[CurrentUserContext, Depends(get_current_user)]
PlatformAdminUser = Annotated[CurrentUserContext, Depends(require_platform_admin_user)]
RuleEditorUser = Annotated[CurrentUserContext, Depends(require_rule_editor_user)]
RuleReviewerUser = Annotated[CurrentUserContext, Depends(require_rule_reviewer_user)]

# Actor recorded in the audit trail when no usable name is available.
_DEFAULT_ACTOR = "system"


def _resolve_actor(*candidates: str | None) -> str:
    """Return the first non-blank candidate (stripped), or ``"system"``.

    ``None`` and whitespace-only candidates are skipped, so a missing header
    combined with a missing body field can no longer raise ``AttributeError``.
    """
    for candidate in candidates:
        cleaned = (candidate or "").strip()
        if cleaned:
            return cleaned
    return _DEFAULT_ACTOR


def _has_any_role(user: CurrentUserContext, *role_codes: str) -> bool:
    """Return ``True`` when ``user`` is an admin or holds any of ``role_codes``.

    Role codes on the user are stripped before comparison.
    """
    if user.is_admin:
        return True
    # `or ()` tolerates a user context whose role list is unset.
    granted = {code.strip() for code in (user.role_codes or ())}
    return not granted.isdisjoint(role_codes)


def _handle_asset_error(exc: Exception) -> NoReturn:
    """Translate a service-layer exception into an ``HTTPException``.

    Mapping:
        * ``LookupError`` / ``FileNotFoundError`` -> 404 Not Found
        * ``PermissionError`` -> 403 Forbidden
        * ``ValueError`` -> 400 Bad Request

    Any other exception is re-raised unchanged. This function never returns.
    """
    if isinstance(exc, (LookupError, FileNotFoundError)):
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
    if isinstance(exc, PermissionError):
        # Authorization failures are 403, not 400: the request itself is well-formed.
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc)) from exc
    if isinstance(exc, ValueError):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
    raise exc


def _complete_risk_rule_generation_task(
    asset_id: str,
    payload: dict[str, Any],
    actor: str,
    request_id: str | None,
) -> None:
    """Background task: finish generating a risk rule for ``asset_id``.

    Opens its own session from the session factory (the request-scoped session
    is not passed in), re-validates ``payload`` into
    ``AgentAssetRiskRuleGenerateRequest`` and always closes the session.
    """
    db = get_session_factory()()
    try:
        body = AgentAssetRiskRuleGenerateRequest.model_validate(payload)
        RiskRuleGenerationJobService(db).complete_rule_asset_generation(
            asset_id,
            body,
            actor=actor,
            request_id=request_id,
        )
    finally:
        db.close()


@router.get(
    "",
    response_model=list[AgentAssetListItem],
    summary="查询 Agent 资产列表",
    description="按资产类型、状态、领域和关键字筛选规则、技能、MCP 与任务资产。",
)
def list_agent_assets(
    db: DbSession,
    asset_type: Annotated[
        str | None,
        Query(description="资产类型:`rule`、`skill`、`mcp`、`task`。"),
    ] = None,
    status_value: Annotated[
        str | None,
        Query(alias="status", description="资产状态筛选。"),
    ] = None,
    domain: Annotated[
        str | None,
        Query(description="业务领域筛选,例如 `expense`、`ar`、`ap`。"),
    ] = None,
    keyword: Annotated[
        str | None,
        Query(description="资产编码、名称关键字模糊查询。"),
    ] = None,
) -> list[AgentAssetListItem]:
    """List assets, forwarding the optional filters to the service."""
    return AgentAssetService(db).list_assets(
        asset_type=asset_type,
        status=status_value,
        domain=domain,
        keyword=keyword,
    )


@router.get(
    "/{asset_id}",
    response_model=AgentAssetRead,
    summary="读取 Agent 资产详情",
    description="返回资产当前版本正文、最近版本列表和最近一次审核信息。",
    responses={
        status.HTTP_404_NOT_FOUND: {
            "model": ErrorResponse,
            "description": "资产不存在。",
        }
    },
)
def get_agent_asset(asset_id: str, db: DbSession) -> AgentAssetRead:
    """Return one asset, or respond 404 when the service returns ``None``."""
    asset = AgentAssetService(db).get_asset(asset_id)
    if asset is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Asset not found")
    return asset


@router.get(
    "/{asset_id}/rule-json",
    response_model=AgentAssetRuleJsonRead,
    summary="读取风险规则 JSON",
    description="读取 JSON 风险规则资产绑定的规则文件内容。",
)
def get_agent_asset_rule_json(
    asset_id: str,
    _: CurrentUser,
    db: DbSession,
) -> AgentAssetRuleJsonRead:
    """Read the rule JSON bound to the asset (requires an authenticated user)."""
    try:
        return AgentAssetService(db).read_rule_json(asset_id)
    except Exception as exc:
        _handle_asset_error(exc)


@router.get(
    "/{asset_id}/risk-rule-tests/latest",
    response_model=AgentAssetRiskRuleLatestTestSummary,
    summary="读取风险规则最近测试摘要",
    description="返回当前风险规则工作版本最近一次样例测试、场景试运行和测试报告。",
)
def get_agent_asset_risk_rule_latest_test(
    asset_id: str,
    _: CurrentUser,
    db: DbSession,
) -> AgentAssetRiskRuleLatestTestSummary:
    """Return the latest risk-rule test summary for the asset."""
    try:
        return AgentAssetService(db).get_latest_risk_rule_test_summary(asset_id)
    except Exception as exc:
        _handle_asset_error(exc)


@router.post(
    "/{asset_id}/risk-rule-tests/simulate",
    response_model=AgentAssetRiskRuleSimulationRead,
    summary="执行风险规则对话仿真",
    description="基于临时对话输入和附件元信息执行风险识别,不创建业务单据,不写入测试记录。",
)
def simulate_agent_asset_risk_rule_test(
    asset_id: str,
    payload: AgentAssetRiskRuleSimulationRequest,
    _: PlatformAdminUser,
    db: DbSession,
) -> AgentAssetRiskRuleSimulationRead:
    """Run a risk-rule message simulation (platform admin only)."""
    try:
        return AgentAssetService(db).simulate_risk_rule_message(asset_id, payload)
    except Exception as exc:
        _handle_asset_error(exc)


@router.post(
    "/{asset_id}/risk-rule-tests/sample",
    response_model=AgentAssetRiskRuleTestRunRead,
    summary="执行风险规则快速样例测试",
    description="使用人工样例或系统默认样例执行当前 JSON 风险规则,不依赖大模型判断结果。",
)
def run_agent_asset_risk_rule_sample_test(
    asset_id: str,
    payload: AgentAssetRiskRuleSampleTestRequest,
    current_user: PlatformAdminUser,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetRiskRuleTestRunRead:
    """Run a sample test for the risk rule (platform admin only)."""
    try:
        return AgentAssetService(db).run_risk_rule_sample_test(
            asset_id,
            payload,
            actor=_resolve_actor(x_actor, current_user.name),
            request_id=x_request_id,
        )
    except Exception as exc:
        _handle_asset_error(exc)


@router.post(
    "/{asset_id}/risk-rule-tests/scenario",
    response_model=AgentAssetRiskRuleTestRunRead,
    summary="执行风险规则真实场景试运行",
    description="按测试意图读取真实业务样本并沙盒执行风险规则,不写回业务单据。",
)
def run_agent_asset_risk_rule_scenario_test(
    asset_id: str,
    payload: AgentAssetRiskRuleScenarioTestRequest,
    current_user: PlatformAdminUser,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetRiskRuleTestRunRead:
    """Run a scenario test for the risk rule (platform admin only)."""
    try:
        return AgentAssetService(db).run_risk_rule_scenario_test(
            asset_id,
            payload,
            actor=_resolve_actor(x_actor, current_user.name),
            request_id=x_request_id,
        )
    except Exception as exc:
        _handle_asset_error(exc)


@router.post(
    "/{asset_id}/risk-rule-tests/report",
    response_model=AgentAssetRiskRuleTestRunRead,
    summary="确认风险规则测试报告",
    description="在样例测试和真实场景试运行通过后,保存当前版本测试通过记录。",
)
def confirm_agent_asset_risk_rule_test_report(
    asset_id: str,
    payload: AgentAssetRiskRuleReportRequest,
    current_user: PlatformAdminUser,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetRiskRuleTestRunRead:
    """Confirm the risk-rule test report (platform admin only)."""
    try:
        return AgentAssetService(db).confirm_risk_rule_test_report(
            asset_id,
            payload,
            actor=_resolve_actor(x_actor, current_user.name),
            request_id=x_request_id,
        )
    except Exception as exc:
        _handle_asset_error(exc)


@router.put(
    "/{asset_id}/rule-json",
    response_model=AgentAssetRuleJsonRead,
    summary="保存风险规则 JSON",
    description="保存 JSON 风险规则资产绑定的规则文件内容,并写入审计日志。",
)
def save_agent_asset_rule_json(
    asset_id: str,
    payload: AgentAssetRuleJsonWrite,
    current_user: RuleEditorUser,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetRuleJsonRead:
    """Write the rule JSON for the asset (rule editor only)."""
    try:
        return AgentAssetService(db).write_rule_json(
            asset_id,
            body=payload,
            actor=_resolve_actor(x_actor, current_user.name),
            request_id=x_request_id,
        )
    except Exception as exc:
        _handle_asset_error(exc)


@router.post(
    "/risk-rules/generate",
    response_model=AgentAssetRead,
    status_code=status.HTTP_201_CREATED,
    summary="根据自然语言新建风险规则草稿",
    description="根据业务域、自然语言描述和风险评分模型生成 JSON 风险规则,并保存为待上线草稿资产。",
)
def generate_agent_asset_risk_rule(
    payload: AgentAssetRiskRuleGenerateRequest,
    background_tasks: BackgroundTasks,
    current_user: RuleReviewerUser,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetRead:
    """Enqueue risk-rule generation and return the newly created asset.

    The generation itself is completed by ``_complete_risk_rule_generation_task``
    scheduled as a background task; the payload is passed as a JSON-mode dump
    and re-validated inside the task.
    """
    try:
        actor = _resolve_actor(x_actor, current_user.name)
        asset_id = RiskRuleGenerationJobService(db).enqueue_rule_asset_generation(
            payload,
            actor=actor,
            request_id=x_request_id,
        )
        background_tasks.add_task(
            _complete_risk_rule_generation_task,
            asset_id,
            payload.model_dump(mode="json"),
            actor,
            x_request_id,
        )
        asset = AgentAssetService(db).get_asset(asset_id)
        if asset is None:
            raise LookupError("Asset not found")
        return asset
    except Exception as exc:
        _handle_asset_error(exc)


@router.get(
    "/{asset_id}/spreadsheet/onlyoffice-config",
    response_model=AgentAssetOnlyOfficeConfigRead,
    summary="读取规则 Excel 的 ONLYOFFICE 配置",
    description="为规则详情页中的 Excel 规则表生成 ONLYOFFICE 配置。",
)
def get_agent_asset_spreadsheet_onlyoffice_config(
    asset_id: str,
    current_user: CurrentUser,
    db: DbSession,
    version: Annotated[
        str | None,
        Query(description="兼容旧前端的可选参数;表格规则始终打开当前规则表。"),
    ] = None,
) -> AgentAssetOnlyOfficeConfigRead:
    """Build the ONLYOFFICE editor configuration for the rule spreadsheet."""
    try:
        return AgentAssetService(db).build_rule_spreadsheet_onlyoffice_config(
            asset_id,
            current_user,
            version=version,
        )
    except Exception as exc:
        _handle_asset_error(exc)


@router.get(
    "/{asset_id}/spreadsheet/content",
    response_class=FileResponse,
    summary="下载或预览规则 Excel 文件",
    description="返回当前规则 Excel 文件,用于浏览器预览或下载。",
)
def get_agent_asset_spreadsheet_content(
    asset_id: str,
    _: CurrentUser,
    db: DbSession,
    version: Annotated[
        str | None,
        Query(description="兼容旧前端的可选参数;不传时返回当前规则表。"),
    ] = None,
) -> FileResponse:
    """Serve the rule spreadsheet file to an authenticated user."""
    try:
        file_path, media_type, filename = AgentAssetService(db).get_rule_spreadsheet_content(
            asset_id,
            version=version,
        )
    except Exception as exc:
        _handle_asset_error(exc)
    return FileResponse(file_path, media_type=media_type, filename=filename)


@router.get(
    "/{asset_id}/spreadsheet/onlyoffice/content",
    response_class=FileResponse,
    summary="供 ONLYOFFICE 读取规则 Excel 源文件",
    description="使用短时令牌供 ONLYOFFICE 拉取规则表源文件。",
)
def get_agent_asset_spreadsheet_onlyoffice_content(
    asset_id: str,
    db: DbSession,
    access_token: Annotated[
        str,
        Query(min_length=1, description="ONLYOFFICE 临时访问令牌。"),
    ],
    version: Annotated[
        str | None,
        Query(description="兼容旧 ONLYOFFICE URL;当前表格模式不再使用。"),
    ] = None,
) -> FileResponse:
    """Serve the rule spreadsheet file, authorised by ``access_token``.

    Unlike the other handlers, a ``ValueError`` here is reported as 401
    (it is raised after/while validating the access token); a missing file is
    404 and everything else goes through ``_handle_asset_error``.
    """
    try:
        service = AgentAssetService(db)
        service.validate_rule_spreadsheet_access_token(asset_id, access_token)
        file_path, media_type, filename = service.get_rule_spreadsheet_content(
            asset_id,
            version=version,
        )
    except FileNotFoundError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(exc)) from exc
    except Exception as exc:
        _handle_asset_error(exc)
    return FileResponse(file_path, media_type=media_type, filename=filename)


@router.post(
    "/{asset_id}/spreadsheet/upload",
    response_model=AgentAssetRead,
    status_code=status.HTTP_201_CREATED,
    summary="上传规则 Excel 文件",
    description="为指定规则上传新的 Excel 文件,并记录本次表格修改。",
)
def upload_agent_asset_spreadsheet(
    asset_id: str,
    content: Annotated[
        bytes,
        Body(
            media_type="application/octet-stream",
            description="待上传的 Excel 文件二进制内容。",
        ),
    ],
    filename: Annotated[str, Query(min_length=1, description="原始文件名。")],
    current_user: RuleEditorUser,
    db: DbSession,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetRead:
    """Upload a new spreadsheet for the rule (rule editor only).

    The audit actor is always the authenticated user's name; no ``X-Actor``
    override is accepted on this endpoint.
    """
    try:
        return AgentAssetService(db).upload_rule_spreadsheet(
            asset_id,
            filename=filename,
            content=content,
            actor=current_user.name,
            request_id=x_request_id,
        )
    except Exception as exc:
        _handle_asset_error(exc)


@router.post(
    "/{asset_id}/spreadsheet/import-content",
    response_model=AgentAssetRead,
    status_code=status.HTTP_201_CREATED,
    summary="导入规则 Excel 表格内容",
    description="读取上传 Excel 中的工作表内容,写回当前规则表;保留当前规则文件名与规则身份。",
)
def import_agent_asset_spreadsheet_content(
    asset_id: str,
    content: Annotated[
        bytes,
        Body(
            media_type="application/octet-stream",
            description="待导入的 Excel 文件二进制内容。",
        ),
    ],
    filename: Annotated[str, Query(min_length=1, description="上传文件原始文件名。")],
    current_user: RuleEditorUser,
    db: DbSession,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetRead:
    """Import sheet content from an uploaded spreadsheet (rule editor only).

    The audit actor is always the authenticated user's name; no ``X-Actor``
    override is accepted on this endpoint.
    """
    try:
        return AgentAssetService(db).import_rule_spreadsheet_content(
            asset_id,
            filename=filename,
            content=content,
            actor=current_user.name,
            request_id=x_request_id,
        )
    except Exception as exc:
        _handle_asset_error(exc)


@router.post(
    "/{asset_id}/spreadsheet/onlyoffice/callback",
    response_model=AgentAssetOnlyOfficeCallbackRead,
    summary="接收规则 Excel 的 ONLYOFFICE 回调",
    description="接收 ONLYOFFICE 回写内容,并记录本次表格修改。",
)
def handle_agent_asset_spreadsheet_onlyoffice_callback(
    asset_id: str,
    payload: AgentAssetOnlyOfficeCallbackWrite,
    db: DbSession,
    version: Annotated[
        str | None,
        Query(description="兼容旧 ONLYOFFICE 回调;当前表格模式不再使用。"),
    ] = None,
    actor_name: Annotated[
        str | None,
        Query(description="发起编辑的用户显示名。"),
    ] = None,
) -> AgentAssetOnlyOfficeCallbackRead:
    """Forward an ONLYOFFICE callback to the service and acknowledge it.

    NOTE(review): this handler declares no user dependency and validates no
    token itself; ``actor_name`` comes from the query string. Confirm that the
    service (or an outer layer) authenticates the callback.
    """
    try:
        AgentAssetService(db).handle_rule_spreadsheet_onlyoffice_callback(
            asset_id,
            version=version,
            payload=payload.model_dump(),
            actor_name=actor_name,
        )
    except Exception as exc:
        _handle_asset_error(exc)
    return AgentAssetOnlyOfficeCallbackRead()


@router.get(
    "/{asset_id}/spreadsheet/change-records",
    response_model=list[AgentAssetSpreadsheetChangeRecordRead],
    summary="读取规则表最近修改记录",
    description="返回最近 30 次 ONLYOFFICE 保存级修改记录,用于展示操作者、时间和具体差异。",
)
def list_agent_asset_spreadsheet_change_records(
    asset_id: str,
    _: CurrentUser,
    db: DbSession,
    limit: Annotated[int, Query(ge=1, le=30, description="返回条数,最多 30 条。")] = 30,
) -> list[AgentAssetSpreadsheetChangeRecordRead]:
    """List up to ``limit`` (1-30) spreadsheet change records for the asset."""
    try:
        return AgentAssetService(db).list_spreadsheet_change_records(asset_id, limit=limit)
    except Exception as exc:
        _handle_asset_error(exc)


@router.post(
    "",
    response_model=AgentAssetRead,
    status_code=status.HTTP_201_CREATED,
    summary="创建 Agent 资产",
    description="创建新的规则、技能、MCP 或任务资产,并自动记录审计日志。",
    responses={
        status.HTTP_400_BAD_REQUEST: {
            "model": ErrorResponse,
            "description": "资产编码冲突或请求字段不合法。",
        }
    },
)
def create_agent_asset(
    payload: AgentAssetCreate,
    current_user: RuleReviewerUser,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetRead:
    """Create an asset (rule reviewer only).

    The audit actor falls back from ``X-Actor`` to the current user's name,
    then ``payload.owner``, then ``"system"``.
    """
    try:
        return AgentAssetService(db).create_asset(
            payload,
            actor=_resolve_actor(x_actor, current_user.name, payload.owner),
            request_id=x_request_id,
        )
    except Exception as exc:
        _handle_asset_error(exc)


@router.patch(
    "/{asset_id}",
    response_model=AgentAssetRead,
    summary="更新 Agent 资产",
    description="更新资产基础信息、当前版本、状态和配置,并写入审计日志。",
    responses={
        status.HTTP_400_BAD_REQUEST: {
            "model": ErrorResponse,
            "description": "状态更新非法或请求字段不合法。",
        },
        status.HTTP_404_NOT_FOUND: {
            "model": ErrorResponse,
            "description": "资产或指定版本不存在。",
        },
    },
)
def update_agent_asset(
    asset_id: str,
    payload: AgentAssetUpdate,
    current_user: CurrentUser,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetRead:
    """Update an asset.

    Changing ``status`` or ``published_version`` additionally requires the
    caller to be an admin or hold the ``manager`` role; otherwise the request
    is rejected with 403.
    """
    try:
        changes_publication = (
            payload.status is not None or payload.published_version is not None
        )
        if changes_publication and not _has_any_role(current_user, "manager"):
            raise PermissionError("只有高级管理员或 admin 管理员可以更改规则上线状态。")
        return AgentAssetService(db).update_asset(
            asset_id,
            payload,
            actor=_resolve_actor(x_actor, current_user.name),
            request_id=x_request_id,
        )
    except Exception as exc:
        _handle_asset_error(exc)


@router.get(
    "/{asset_id}/versions",
    response_model=list[AgentAssetVersionRead],
    summary="查询资产版本列表",
    description="返回指定资产的版本历史,默认按最近版本优先排序。",
    responses={
        status.HTTP_404_NOT_FOUND: {
            "model": ErrorResponse,
            "description": "资产不存在。",
        }
    },
)
def list_agent_asset_versions(
    asset_id: str,
    db: DbSession,
    limit: Annotated[
        int,
        Query(ge=1, le=100, description="返回版本数量上限。"),
    ] = 20,
) -> list[AgentAssetVersionRead]:
    """List up to ``limit`` (1-100, default 20) versions of the asset."""
    try:
        return AgentAssetService(db).list_versions(asset_id, limit=limit)
    except Exception as exc:
        _handle_asset_error(exc)


@router.post(
    "/{asset_id}/versions",
    response_model=AgentAssetVersionRead,
    status_code=status.HTTP_201_CREATED,
    summary="创建资产版本",
    description="为指定资产创建新版本;规则和任务源文件可使用 Markdown,技能与 MCP 使用 JSON 快照。",
    responses={
        status.HTTP_400_BAD_REQUEST: {
            "model": ErrorResponse,
            "description": "版本号重复或内容类型不匹配。",
        },
        status.HTTP_404_NOT_FOUND: {
            "model": ErrorResponse,
            "description": "资产不存在。",
        },
    },
)
def create_agent_asset_version(
    asset_id: str,
    payload: AgentAssetVersionCreate,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetVersionRead:
    """Create a new version of the asset.

    The audit actor falls back from ``X-Actor`` to ``payload.created_by``,
    then ``"system"``.

    NOTE(review): no user dependency is declared on this write endpoint —
    confirm authentication is enforced where the router is mounted.
    """
    try:
        return AgentAssetService(db).create_version(
            asset_id,
            payload,
            actor=_resolve_actor(x_actor, payload.created_by),
            request_id=x_request_id,
        )
    except Exception as exc:
        _handle_asset_error(exc)


@router.post(
    "/{asset_id}/reviews",
    response_model=AgentAssetReviewRead,
    status_code=status.HTTP_201_CREATED,
    summary="创建资产审核记录",
    description="为指定资产版本写入审核结果,并联动更新资产状态。",
    responses={
        status.HTTP_400_BAD_REQUEST: {
            "model": ErrorResponse,
            "description": "审核参数不合法。",
        },
        status.HTTP_404_NOT_FOUND: {
            "model": ErrorResponse,
            "description": "资产或版本不存在。",
        },
    },
)
def create_agent_asset_review(
    asset_id: str,
    payload: AgentAssetReviewCreate,
    current_user: CurrentUser,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetReviewRead:
    """Create a review record for the asset.

    Role rules enforced here (admins always pass):
        * ``review_status == "pending"`` (submitting for review) requires the
          ``manager`` or ``finance`` role;
        * any other review status requires the ``manager`` role.

    The audit actor falls back from ``X-Actor`` to ``payload.reviewer``,
    then ``"system"``.
    """
    try:
        if payload.review_status.value == "pending":
            if not _has_any_role(current_user, "manager", "finance"):
                raise PermissionError("只有财务人员或高级财务人员可以提交审核。")
        elif not _has_any_role(current_user, "manager"):
            raise PermissionError("只有高级财务人员可以审核规则。")
        return AgentAssetService(db).create_review(
            asset_id,
            payload,
            actor=_resolve_actor(x_actor, payload.reviewer),
            request_id=x_request_id,
        )
    except Exception as exc:
        _handle_asset_error(exc)


@router.post(
    "/{asset_id}/activate",
    response_model=AgentAssetRead,
    summary="激活资产当前版本",
    description="将资产当前版本切换为上线状态;规则资产必须已有 `approved` 审核记录。",
    responses={
        status.HTTP_400_BAD_REQUEST: {
            "model": ErrorResponse,
            "description": "审核未通过或当前版本未设置。",
        },
        status.HTTP_404_NOT_FOUND: {
            "model": ErrorResponse,
            "description": "资产不存在。",
        },
    },
)
def activate_agent_asset(
    asset_id: str,
    current_user: RuleReviewerUser,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetRead:
    """Activate the asset's current version (rule reviewer only).

    The audit actor falls back to the authenticated reviewer's name, matching
    the other reviewer endpoints, instead of recording ``"system"``.
    """
    try:
        return AgentAssetService(db).activate_asset(
            asset_id,
            actor=_resolve_actor(x_actor, current_user.name),
            request_id=x_request_id,
        )
    except Exception as exc:
        _handle_asset_error(exc)


@router.post(
    "/{asset_id}/risk-rule-enabled",
    response_model=AgentAssetRead,
    summary="设置风险规则启用状态",
    description=(
        "高级财务人员可独立启用或停用 JSON 风险规则;停用后即使已上线也不会进入真实业务扫描。"
    ),
)
def set_agent_asset_risk_rule_enabled(
    asset_id: str,
    payload: AgentAssetRiskRuleEnabledUpdate,
    current_user: RuleReviewerUser,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetRead:
    """Enable or disable the risk rule, then return the refreshed asset detail."""
    try:
        service = AgentAssetService(db)
        asset = service.set_risk_rule_enabled(
            asset_id,
            enabled=payload.enabled,
            actor=_resolve_actor(x_actor, current_user.name),
            request_id=x_request_id,
        )
        # Re-read so the response carries the full detail view of the asset.
        detail = service.get_asset(asset.id)
        if detail is None:
            raise LookupError("Asset not found")
        return detail
    except Exception as exc:
        _handle_asset_error(exc)


@router.post(
    "/{asset_id}/risk-rule-level",
    response_model=AgentAssetRead,
    summary="风险规则风险等级已由评分模型接管",
    description="风险规则等级和分数由自然语言规则评分模型生成,不再允许人工调整。",
)
def set_agent_asset_risk_rule_level(
    asset_id: str,
    payload: AgentAssetRiskRuleLevelUpdate,
    current_user: RuleEditorUser,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetRead:
    """Always reject with 400: the risk level can no longer be set manually.

    The route and its signature are kept so existing clients get a clear
    error; none of the arguments are used.
    """
    del asset_id, payload, current_user, db, x_actor, x_request_id
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="风险等级和分数由评分模型自动计算,不能手动修改。",
    )


@router.post(
    "/{asset_id}/return",
    response_model=AgentAssetRiskRuleLatestTestSummary,
    summary="回退待审核风险规则",
    description="高级财务人员将待审核风险规则回退到草稿,并记录回退原因。",
)
def return_agent_asset_risk_rule(
    asset_id: str,
    payload: AgentAssetRiskRuleReturnRequest,
    current_user: RuleReviewerUser,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetRiskRuleLatestTestSummary:
    """Return the risk rule with the reviewer's note (rule reviewer only)."""
    try:
        return AgentAssetService(db).return_risk_rule(
            asset_id,
            note=payload.note,
            actor=_resolve_actor(x_actor, current_user.name),
            request_id=x_request_id,
        )
    except Exception as exc:
        _handle_asset_error(exc)


@router.post(
    "/{asset_id}/publish",
    response_model=AgentAssetRead,
    summary="审核并发布风险规则",
    description="高级财务人员确认测试通过后,将待审核风险规则一次性审核通过并发布上线。",
)
def publish_agent_asset_risk_rule(
    asset_id: str,
    current_user: RuleReviewerUser,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetRead:
    """Publish the risk rule, then return the refreshed asset detail."""
    try:
        service = AgentAssetService(db)
        asset = service.publish_risk_rule(
            asset_id,
            actor=_resolve_actor(x_actor, current_user.name),
            request_id=x_request_id,
        )
        # Re-read so the response carries the full detail view of the asset.
        detail = service.get_asset(asset.id)
        if detail is None:
            raise LookupError("Asset not found")
        return detail
    except Exception as exc:
        _handle_asset_error(exc)


@router.delete(
    "/{asset_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="删除未发布风险规则",
    description="仅允许删除从未发布过的 JSON 风险规则,并同步删除规则 JSON 文件。",
)
def delete_agent_asset(
    asset_id: str,
    current_user: PlatformAdminUser,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> None:
    """Delete an unpublished asset (platform admin only); responds 204."""
    try:
        AgentAssetService(db).delete_unpublished_asset(
            asset_id,
            actor=_resolve_actor(x_actor, current_user.name),
            request_id=x_request_id,
        )
    except Exception as exc:
        _handle_asset_error(exc)


@router.post(
    "/{asset_id}/versions/{version}/restore",
    response_model=AgentAssetRead,
    summary="基于历史版本恢复工作稿",
    description="复制指定历史版本内容生成新的工作版本,用于误上线后的快速恢复与重新审核。",
)
def restore_agent_asset_version(
    asset_id: str,
    version: str,
    current_user: RuleReviewerUser,
    db: DbSession,
    x_actor: ActorHeader = None,
    x_request_id: RequestIdHeader = None,
) -> AgentAssetRead:
    """Restore ``version`` as a new working copy (rule reviewer only)."""
    try:
        return AgentAssetService(db).restore_version_as_working_copy(
            asset_id,
            version,
            actor=_resolve_actor(x_actor, current_user.name),
            request_id=x_request_id,
        )
    except Exception as exc:
        _handle_asset_error(exc)


@router.get(
    "/{asset_id}/version-timeline",
    response_model=list[AgentAssetVersionTimelineItemRead],
    summary="读取规则版本流转时间线",
    description="返回规则版本创建、提交审核、审核结果和正式上线等流转事件。",
)
def get_agent_asset_version_timeline(
    asset_id: str,
    _: CurrentUser,
    db: DbSession,
) -> list[AgentAssetVersionTimelineItemRead]:
    """Return the version timeline for the asset."""
    try:
        return AgentAssetService(db).list_version_timeline(asset_id)
    except Exception as exc:
        _handle_asset_error(exc)