# Phase F.2:API 增强与安全 日期:2026-04-04 状态:待开始 依赖:F.1(待完成) 前置:routers/forum.py --- ## 1. 本阶段目的 增强 Jarvis Forum API 的功能和安全: - 引入文件锁机制(并发控制) - 强化输入验证 - 扩展 API 端点 - 实现缓存机制 - 添加限流保护 --- ## 2. Forum Service 架构 ### 2.1 目录结构 ``` backend/app/services/ └── forum_service.py # 新增 ``` ### 2.2 核心服务类 ```python class ForumLockManager: """文件锁管理器 - 防止并发写入冲突""" def __init__(self, max_concurrent: int = 5, timeout: int = 10): self.locks: Dict[str, LockInfo] = {} self.max_concurrent = max_concurrent self.timeout = timeout async def acquire_lock(self, resource_id: str) -> bool: """获取锁""" def release_lock(self, resource_id: str) -> None: """释放锁""" def cleanup_stale_locks(self) -> None: """清理过期锁""" class ForumService: """论坛服务主类""" def __init__( self, db: AsyncSession, lock_manager: ForumLockManager, ): self.db = db self.lock = lock_manager # === 板块操作 === async def create_board(self, data: ForumBoardCreate) -> ForumBoard: """创建板块""" async def list_boards(self) -> List[ForumBoard]: """列出板块""" # === 帖子操作 === async def create_post( self, user_id: str, data: ForumPostCreate, ) -> ForumPost: """创建帖子(带锁)""" async with self.lock.acquire_lock(f"post:{user_id}"): # 创建帖子逻辑 async def get_posts( self, board_id: Optional[str] = None, category: Optional[str] = None, tags: Optional[List[str]] = None, page: int = 1, page_size: int = 20, ) -> PaginatedResult[ForumPostOut]: """分页获取帖子""" async def get_post(self, post_id: str, user_id: Optional[str] = None) -> ForumPostOut: """获取帖子详情(增加浏览量)""" async def update_post( self, post_id: str, user_id: str, data: ForumPostUpdate, ) -> ForumPost: """更新帖子""" async def delete_post(self, post_id: str, user_id: str) -> None: """删除帖子(软删除)""" async def pin_post(self, post_id: str, is_pinned: bool) -> None: """置顶/取消置顶""" async def lock_post(self, post_id: str, is_locked: bool) -> None: """锁定/解锁帖子""" # === 回复操作 === async def create_reply( self, post_id: str, user_id: Optional[str], data: ForumReplyCreate, ) -> ForumReply: """创建回复(带锁)""" async def get_replies( self, post_id: str, page: int = 1, page_size: int = 50, ) -> PaginatedResult[ForumReplyOut]: """获取回复列表""" async def update_reply( self, reply_id: str, user_id: str, content: str, ) -> ForumReply: """更新回复""" async def delete_reply(self, reply_id: str, user_id: str) -> None: """删除回复""" # === 点赞操作 === async def toggle_like( self, user_id: str, post_id: Optional[str] = None, reply_id: Optional[str] = None, ) -> bool: """切换点赞状态""" # === 标签操作 === async def create_tag(self, name: str, color: str = "#666666") -> ForumTag: """创建标签""" async def search_tags(self, query: str) -> List[ForumTag]: """搜索标签""" async def add_tags_to_post( self, post_id: str, user_id: str, tags: List[str], ) -> None: """为帖子添加标签""" ``` --- ## 3. 安全机制 ### 3.1 输入验证 ```python # Forum 安全配置 FORUM_CONFIG = { "MAX_CONTENT_LENGTH": 50000, # 内容最大 50KB "MAX_TITLE_LENGTH": 200, # 标题最大 200 字符 "MAX_TAGS_PER_POST": 10, # 每帖最多标签 "MAX_REPLIES_PER_POST": 500, # 每帖最多回复 "MAX_POSTS_PER_USER_PER_DAY": 50, # 每人每天最多发帖 "MAX_REPLIES_PER_USER_PER_DAY": 200, # 每人每天最多回复 } def sanitize_input(text: str, max_length: int) -> str: """清理用户输入""" if not text or not isinstance(text, str): return "" # 移除控制字符 text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text) # 限制长度 return text[:max_length] def validate_post_data(data: ForumPostCreate) -> ForumPostCreate: """验证帖子数据""" # 标题验证 data.title = sanitize_input(data.title, FORUM_CONFIG["MAX_TITLE_LENGTH"]) if len(data.title.strip()) < 3: raise ValueError("标题至少 3 个字符") # 内容验证 data.content = sanitize_input(data.content, FORUM_CONFIG["MAX_CONTENT_LENGTH"]) if len(data.content.strip()) < 10: raise ValueError("内容至少 10 个字符") # 标签验证 if len(data.tags) > FORUM_CONFIG["MAX_TAGS_PER_POST"]: raise ValueError(f"最多 {FORUM_CONFIG['MAX_TAGS_PER_POST']} 个标签") data.tags = [sanitize_input(t, 50) for t in data.tags] return data ``` ### 3.2 并发控制 ```python class RateLimiter: """简单的限流器""" def __init__(self): self.user_requests: Dict[str, List[datetime]] = defaultdict(list) def check_rate_limit( self, user_id: str, action: str, max_requests: int, window_seconds: int = 86400, ) -> bool: """检查是否超过限流""" now = datetime.utcnow() cutoff = now - timedelta(seconds=window_seconds) # 清理过期记录 self.user_requests[user_id] = [ t for t in self.user_requests[user_id] if t > cutoff ] if len(self.user_requests[user_id]) >= max_requests: return False self.user_requests[user_id].append(now) return True ``` ### 3.3 缓存策略 ```python from functools import lru_cache from typing import Optional import json class ForumCache: """论坛缓存""" def __init__(self): self.cache: Dict[str, CacheEntry] = {} self.max_size = 1000 self.default_ttl = 300 # 5 分钟 async def get(self, key: str) -> Optional[Any]: """获取缓存""" if key in self.cache: entry = self.cache[key] if entry.is_expired(): del self.cache[key] else: return entry.value return None async def set(self, key: str, value: Any, ttl: int = None) -> None: """设置缓存""" if len(self.cache) >= self.max_size: # LRU 清理 oldest = min(self.cache.items(), key=lambda x: x[1].created_at) del self.cache[oldest[0]] self.cache[key] = CacheEntry( value=value, ttl=ttl or self.default_ttl, ) async def invalidate(self, pattern: str) -> None: """清除匹配模式的缓存""" for key in list(self.cache.keys()): if pattern.format(key): del self.cache[key] ``` --- ## 4. API 端点扩展 ### 4.1 板块 API ```python @router.get("/boards", response_model=list[ForumBoardOut]) async def list_boards( current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """列出所有板块""" service = ForumService(db, lock_manager) return await service.list_boards() @router.post("/boards", response_model=ForumBoardOut, status_code=201) async def create_board( data: ForumBoardCreate, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """创建板块(仅管理员)""" if current_user.role != "admin": raise HTTPException(status_code=403, detail="需要管理员权限") service = ForumService(db, lock_manager) return await service.create_board(data) ``` ### 4.2 帖子 API 扩展 ```python @router.get("/posts", response_model=PaginatedResponse[ForumPostOut]) async def list_posts( board_id: Optional[str] = None, category: Optional[str] = None, tags: Optional[str] = None, # comma-separated page: int = 1, page_size: int = 20, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """分页获取帖子""" service = ForumService(db, lock_manager) tag_list = tags.split(",") if tags else None return await service.get_posts( board_id=board_id, category=category, tags=tag_list, page=page, page_size=page_size, ) @router.post("/posts", response_model=ForumPostOut, status_code=201) async def create_post( data: ForumPostCreate, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """创建帖子""" # 限流检查 if not rate_limiter.check_rate_limit( current_user.id, "post", FORUM_CONFIG["MAX_POSTS_PER_USER_PER_DAY"] ): raise HTTPException(status_code=429, detail="今日发帖次数已达上限") # 验证数据 data = validate_post_data(data) service = ForumService(db, lock_manager) return await service.create_post(current_user.id, data) @router.patch("/posts/{post_id}/pin") async def pin_post( post_id: str, is_pinned: bool, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """置顶/取消置顶(仅版主+)""" service = ForumService(db, lock_manager) await service.pin_post(post_id, is_pinned) return {"success": True} @router.patch("/posts/{post_id}/lock") async def lock_post( post_id: str, is_locked: bool, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """锁定/解锁帖子""" service = ForumService(db, lock_manager) await service.lock_post(post_id, is_locked) return {"success": True} ``` ### 4.3 标签 API ```python @router.get("/tags", response_model=list[ForumTagOut]) async def list_tags( query: Optional[str] = None, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """搜索/列出标签""" service = ForumService(db, lock_manager) if query: return await service.search_tags(query) return await service.list_tags() @router.post("/posts/{post_id}/tags") async def add_tags( post_id: str, tags: List[str], current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """为帖子添加标签""" service = ForumService(db, lock_manager) await service.add_tags_to_post(post_id, current_user.id, tags) return {"success": True} ``` ### 4.4 点赞 API ```python @router.post("/like") async def toggle_like( post_id: Optional[str] = None, reply_id: Optional[str] = None, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): """切换点赞状态""" if not post_id and not reply_id: raise HTTPException(status_code=400, detail="需要指定 post_id 或 reply_id") service = ForumService(db, lock_manager) liked = await service.toggle_like( user_id=current_user.id, post_id=post_id, reply_id=reply_id, ) return {"liked": liked} ``` --- ## 5. 实现步骤 | 步骤 | 任务 | 优先级 | |------|------|--------| | 1 | 创建 ForumLockManager | 🟢 高 | | 2 | 创建 ForumService | 🟢 高 | | 3 | 实现输入验证函数 | 🟢 高 | | 4 | 实现限流器 | 🟡 中 | | 5 | 实现缓存 | 🟡 中 | | 6 | 扩展 Router 端点 | 🟢 高 | | 7 | 单元测试 | 🟡 中 | --- ## 6. 核心文件变更 | 文件 | 变更 | |------|------| | `services/forum_service.py` | 新增 | | `services/__init__.py` | 添加 export | | `routers/forum.py` | 扩展端点,集成服务 | --- ## 7. 工作量估算 | 任务 | 工作量 | |------|--------| | ForumLockManager | 0.5 天 | | ForumService 核心 | 1 天 | | 输入验证/限流/缓存 | 0.5 天 | | API 端点扩展 | 0.5 天 | | 单元测试 | 0.5 天 | | **总计** | **3 天** | --- ## 8. 验收标准 - [ ] ForumLockManager 可正确管理并发锁 - [ ] 输入验证可过滤危险字符 - [ ] 限流器可防止滥用 - [ ] 缓存可提升热门帖子读取速度 - [ ] 所有新增 API 端点正常工作 - [ ] 现有 API 保持向后兼容 - [ ] 单元测试覆盖核心逻辑