"""Plugin API 路由 - Phase 8.6""" import os from typing import Any from fastapi import APIRouter, HTTPException from pydantic import BaseModel from app.agents.plugins import get_plugin_manager, PluginManifest router = APIRouter(prefix="/api/plugins", tags=["Plugins"]) class PluginInfo(BaseModel): """插件信息""" id: str name: str version: str description: str author: str enabled: bool main: str class PluginInstallRequest(BaseModel): """插件安装请求""" plugin_path: str class PluginListResponse(BaseModel): """插件列表响应""" plugins: list[dict[str, Any]] count: int # 全局插件市场(简单内存实现) _plugin_marketplace: list[dict[str, str]] = [] def _manifest_to_dict(manifest: PluginManifest, enabled: bool) -> dict[str, Any]: """将 PluginManifest 转换为字典""" return { "id": manifest.id, "name": manifest.name, "version": manifest.version, "description": manifest.description, "author": manifest.author, "enabled": enabled, "main": manifest.main, } @router.get("", response_model=PluginListResponse) async def list_plugins() -> PluginListResponse: """列出所有已安装的插件""" manager = get_plugin_manager() plugins = manager.list_plugins() result = [] for p in plugins: enabled = manager.is_enabled(p.id) result.append(_manifest_to_dict(p, enabled)) return PluginListResponse(plugins=result, count=len(result)) @router.get("/{plugin_id}", response_model=dict[str, Any]) async def get_plugin(plugin_id: str) -> dict[str, Any]: """获取指定插件信息""" manager = get_plugin_manager() manifest = manager.get_plugin(plugin_id) if not manifest: raise HTTPException(status_code=404, detail=f"Plugin '{plugin_id}' not found") enabled = manager.is_enabled(plugin_id) return _manifest_to_dict(manifest, enabled) @router.post("/install", response_model=dict[str, str]) async def install_plugin(request: PluginInstallRequest) -> dict[str, str]: """安装插件""" manager = get_plugin_manager() if not os.path.exists(request.plugin_path): raise HTTPException(status_code=400, detail="Plugin path does not exist") if manager.install(request.plugin_path): return {"status": "installed", "path": request.plugin_path} raise HTTPException(status_code=500, detail="Failed to install plugin") @router.post("/{plugin_id}/enable", response_model=dict[str, str]) async def enable_plugin(plugin_id: str) -> dict[str, str]: """启用插件""" manager = get_plugin_manager() if manager.enable(plugin_id): return {"status": "enabled", "plugin_id": plugin_id} raise HTTPException(status_code=404, detail=f"Plugin '{plugin_id}' not found") @router.post("/{plugin_id}/disable", response_model=dict[str, str]) async def disable_plugin(plugin_id: str) -> dict[str, str]: """禁用插件""" manager = get_plugin_manager() if manager.disable(plugin_id): return {"status": "disabled", "plugin_id": plugin_id} raise HTTPException(status_code=404, detail=f"Plugin '{plugin_id}' not found") @router.delete("/{plugin_id}", response_model=dict[str, str]) async def uninstall_plugin(plugin_id: str) -> dict[str, str]: """卸载插件""" manager = get_plugin_manager() if manager.uninstall(plugin_id): return {"status": "uninstalled", "plugin_id": plugin_id} raise HTTPException(status_code=404, detail=f"Plugin '{plugin_id}' not found") @router.post("/{plugin_id}/reload", response_model=dict[str, str]) async def reload_plugin(plugin_id: str) -> dict[str, str]: """重新加载插件""" manager = get_plugin_manager() if manager.reload(plugin_id): return {"status": "reloaded", "plugin_id": plugin_id} raise HTTPException(status_code=404, detail=f"Plugin '{plugin_id}' not found") # === Plugin Marketplace === _marketplace_router = APIRouter(prefix="/api/marketplace", tags=["Plugin Marketplace"]) @_marketplace_router.get("/plugins", response_model=dict[str, Any]) async def search_marketplace_plugins( query: str | None = None, category: str | None = None, ) -> dict[str, Any]: """搜索插件市场""" results = _plugin_marketplace if query: results = [ p for p in results if query.lower() in p.get("name", "").lower() or query.lower() in p.get("description", "").lower() ] if category: results = [p for p in results if p.get("category") == category] return {"plugins": results, "count": len(results)} @_marketplace_router.get("/plugins/{plugin_id}", response_model=dict[str, Any]) async def get_marketplace_plugin(plugin_id: str) -> dict[str, Any]: """获取市场中的插件详情""" for plugin in _plugin_marketplace: if plugin.get("id") == plugin_id: return plugin raise HTTPException(status_code=404, detail=f"Plugin '{plugin_id}' not found in marketplace") @_marketplace_router.post("/plugins", response_model=dict[str, str]) async def add_to_marketplace(plugin: dict[str, str]) -> dict[str, str]: """添加插件到市场(仅供测试/开发)""" if "id" not in plugin or "name" not in plugin: raise HTTPException(status_code=400, detail="Plugin must have id and name") # 移除已存在的同 ID 插件 global _plugin_marketplace _plugin_marketplace = [p for p in _plugin_marketplace if p.get("id") != plugin["id"]] _plugin_marketplace.append(plugin) return {"status": "added", "id": plugin["id"]}