Files
JARVIS/development-doc/plan/forum-update/phase-f-2-forum-api.md

13 KiB
Raw Permalink Blame History

Phase F.2API 增强与安全

日期2026-04-04 状态:待开始 依赖F.1(待完成) 前置routers/forum.py


1. 本阶段目的

增强 Jarvis Forum API 的功能和安全:

  • 引入文件锁机制(并发控制)
  • 强化输入验证
  • 扩展 API 端点
  • 实现缓存机制
  • 添加限流保护

2. Forum Service 架构

2.1 目录结构

backend/app/services/
└── forum_service.py  # 新增

2.2 核心服务类

class ForumLockManager:
    """文件锁管理器 - 防止并发写入冲突"""
    
    def __init__(self, max_concurrent: int = 5, timeout: int = 10):
        self.locks: Dict[str, LockInfo] = {}
        self.max_concurrent = max_concurrent
        self.timeout = timeout
    
    async def acquire_lock(self, resource_id: str) -> bool:
        """获取锁"""
    
    def release_lock(self, resource_id: str) -> None:
        """释放锁"""
    
    def cleanup_stale_locks(self) -> None:
        """清理过期锁"""


class ForumService:
    """论坛服务主类"""
    
    def __init__(
        self,
        db: AsyncSession,
        lock_manager: ForumLockManager,
    ):
        self.db = db
        self.lock = lock_manager
    
    # === 板块操作 ===
    async def create_board(self, data: ForumBoardCreate) -> ForumBoard:
        """创建板块"""
    
    async def list_boards(self) -> List[ForumBoard]:
        """列出板块"""
    
    # === 帖子操作 ===
    async def create_post(
        self,
        user_id: str,
        data: ForumPostCreate,
    ) -> ForumPost:
        """创建帖子(带锁)"""
        async with self.lock.acquire_lock(f"post:{user_id}"):
            # 创建帖子逻辑
    
    async def get_posts(
        self,
        board_id: Optional[str] = None,
        category: Optional[str] = None,
        tags: Optional[List[str]] = None,
        page: int = 1,
        page_size: int = 20,
    ) -> PaginatedResult[ForumPostOut]:
        """分页获取帖子"""
    
    async def get_post(self, post_id: str, user_id: Optional[str] = None) -> ForumPostOut:
        """获取帖子详情(增加浏览量)"""
    
    async def update_post(
        self,
        post_id: str,
        user_id: str,
        data: ForumPostUpdate,
    ) -> ForumPost:
        """更新帖子"""
    
    async def delete_post(self, post_id: str, user_id: str) -> None:
        """删除帖子(软删除)"""
    
    async def pin_post(self, post_id: str, is_pinned: bool) -> None:
        """置顶/取消置顶"""
    
    async def lock_post(self, post_id: str, is_locked: bool) -> None:
        """锁定/解锁帖子"""
    
    # === 回复操作 ===
    async def create_reply(
        self,
        post_id: str,
        user_id: Optional[str],
        data: ForumReplyCreate,
    ) -> ForumReply:
        """创建回复(带锁)"""
    
    async def get_replies(
        self,
        post_id: str,
        page: int = 1,
        page_size: int = 50,
    ) -> PaginatedResult[ForumReplyOut]:
        """获取回复列表"""
    
    async def update_reply(
        self,
        reply_id: str,
        user_id: str,
        content: str,
    ) -> ForumReply:
        """更新回复"""
    
    async def delete_reply(self, reply_id: str, user_id: str) -> None:
        """删除回复"""
    
    # === 点赞操作 ===
    async def toggle_like(
        self,
        user_id: str,
        post_id: Optional[str] = None,
        reply_id: Optional[str] = None,
    ) -> bool:
        """切换点赞状态"""
    
    # === 标签操作 ===
    async def create_tag(self, name: str, color: str = "#666666") -> ForumTag:
        """创建标签"""
    
    async def search_tags(self, query: str) -> List[ForumTag]:
        """搜索标签"""
    
    async def add_tags_to_post(
        self,
        post_id: str,
        user_id: str,
        tags: List[str],
    ) -> None:
        """为帖子添加标签"""

3. 安全机制

3.1 输入验证

# Forum 安全配置
FORUM_CONFIG = {
    "MAX_CONTENT_LENGTH": 50000,      # 内容最大 50KB
    "MAX_TITLE_LENGTH": 200,           # 标题最大 200 字符
    "MAX_TAGS_PER_POST": 10,           # 每帖最多标签
    "MAX_REPLIES_PER_POST": 500,        # 每帖最多回复
    "MAX_POSTS_PER_USER_PER_DAY": 50,   # 每人每天最多发帖
    "MAX_REPLIES_PER_USER_PER_DAY": 200, # 每人每天最多回复
}


def sanitize_input(text: str, max_length: int) -> str:
    """清理用户输入"""
    if not text or not isinstance(text, str):
        return ""
    # 移除控制字符
    text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text)
    # 限制长度
    return text[:max_length]


def validate_post_data(data: ForumPostCreate) -> ForumPostCreate:
    """验证帖子数据"""
    # 标题验证
    data.title = sanitize_input(data.title, FORUM_CONFIG["MAX_TITLE_LENGTH"])
    if len(data.title.strip()) < 3:
        raise ValueError("标题至少 3 个字符")
    
    # 内容验证
    data.content = sanitize_input(data.content, FORUM_CONFIG["MAX_CONTENT_LENGTH"])
    if len(data.content.strip()) < 10:
        raise ValueError("内容至少 10 个字符")
    
    # 标签验证
    if len(data.tags) > FORUM_CONFIG["MAX_TAGS_PER_POST"]:
        raise ValueError(f"最多 {FORUM_CONFIG['MAX_TAGS_PER_POST']} 个标签")
    data.tags = [sanitize_input(t, 50) for t in data.tags]
    
    return data

3.2 并发控制

class RateLimiter:
    """简单的限流器"""
    
    def __init__(self):
        self.user_requests: Dict[str, List[datetime]] = defaultdict(list)
    
    def check_rate_limit(
        self,
        user_id: str,
        action: str,
        max_requests: int,
        window_seconds: int = 86400,
    ) -> bool:
        """检查是否超过限流"""
        now = datetime.utcnow()
        cutoff = now - timedelta(seconds=window_seconds)
        
        # 清理过期记录
        self.user_requests[user_id] = [
            t for t in self.user_requests[user_id] if t > cutoff
        ]
        
        if len(self.user_requests[user_id]) >= max_requests:
            return False
        
        self.user_requests[user_id].append(now)
        return True

3.3 缓存策略

from functools import lru_cache
from typing import Optional
import json


class ForumCache:
    """论坛缓存"""
    
    def __init__(self):
        self.cache: Dict[str, CacheEntry] = {}
        self.max_size = 1000
        self.default_ttl = 300  # 5 分钟
    
    async def get(self, key: str) -> Optional[Any]:
        """获取缓存"""
        if key in self.cache:
            entry = self.cache[key]
            if entry.is_expired():
                del self.cache[key]
            else:
                return entry.value
        return None
    
    async def set(self, key: str, value: Any, ttl: int = None) -> None:
        """设置缓存"""
        if len(self.cache) >= self.max_size:
            # LRU 清理
            oldest = min(self.cache.items(), key=lambda x: x[1].created_at)
            del self.cache[oldest[0]]
        
        self.cache[key] = CacheEntry(
            value=value,
            ttl=ttl or self.default_ttl,
        )
    
    async def invalidate(self, pattern: str) -> None:
        """清除匹配模式的缓存"""
        for key in list(self.cache.keys()):
            if pattern.format(key):
                del self.cache[key]

4. API 端点扩展

4.1 板块 API

@router.get("/boards", response_model=list[ForumBoardOut])
async def list_boards(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """列出所有板块"""
    service = ForumService(db, lock_manager)
    return await service.list_boards()


@router.post("/boards", response_model=ForumBoardOut, status_code=201)
async def create_board(
    data: ForumBoardCreate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """创建板块(仅管理员)"""
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="需要管理员权限")
    service = ForumService(db, lock_manager)
    return await service.create_board(data)

4.2 帖子 API 扩展

@router.get("/posts", response_model=PaginatedResponse[ForumPostOut])
async def list_posts(
    board_id: Optional[str] = None,
    category: Optional[str] = None,
    tags: Optional[str] = None,  # comma-separated
    page: int = 1,
    page_size: int = 20,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """分页获取帖子"""
    service = ForumService(db, lock_manager)
    tag_list = tags.split(",") if tags else None
    return await service.get_posts(
        board_id=board_id,
        category=category,
        tags=tag_list,
        page=page,
        page_size=page_size,
    )


@router.post("/posts", response_model=ForumPostOut, status_code=201)
async def create_post(
    data: ForumPostCreate,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """创建帖子"""
    # 限流检查
    if not rate_limiter.check_rate_limit(
        current_user.id, "post", FORUM_CONFIG["MAX_POSTS_PER_USER_PER_DAY"]
    ):
        raise HTTPException(status_code=429, detail="今日发帖次数已达上限")
    
    # 验证数据
    data = validate_post_data(data)
    
    service = ForumService(db, lock_manager)
    return await service.create_post(current_user.id, data)


@router.patch("/posts/{post_id}/pin")
async def pin_post(
    post_id: str,
    is_pinned: bool,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """置顶/取消置顶(仅版主+"""
    service = ForumService(db, lock_manager)
    await service.pin_post(post_id, is_pinned)
    return {"success": True}


@router.patch("/posts/{post_id}/lock")
async def lock_post(
    post_id: str,
    is_locked: bool,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """锁定/解锁帖子"""
    service = ForumService(db, lock_manager)
    await service.lock_post(post_id, is_locked)
    return {"success": True}

4.3 标签 API

@router.get("/tags", response_model=list[ForumTagOut])
async def list_tags(
    query: Optional[str] = None,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """搜索/列出标签"""
    service = ForumService(db, lock_manager)
    if query:
        return await service.search_tags(query)
    return await service.list_tags()


@router.post("/posts/{post_id}/tags")
async def add_tags(
    post_id: str,
    tags: List[str],
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """为帖子添加标签"""
    service = ForumService(db, lock_manager)
    await service.add_tags_to_post(post_id, current_user.id, tags)
    return {"success": True}

4.4 点赞 API

@router.post("/like")
async def toggle_like(
    post_id: Optional[str] = None,
    reply_id: Optional[str] = None,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """切换点赞状态"""
    if not post_id and not reply_id:
        raise HTTPException(status_code=400, detail="需要指定 post_id 或 reply_id")
    
    service = ForumService(db, lock_manager)
    liked = await service.toggle_like(
        user_id=current_user.id,
        post_id=post_id,
        reply_id=reply_id,
    )
    return {"liked": liked}

5. 实现步骤

步骤 任务 优先级
1 创建 ForumLockManager 🟢
2 创建 ForumService 🟢
3 实现输入验证函数 🟢
4 实现限流器 🟡
5 实现缓存 🟡
6 扩展 Router 端点 🟢
7 单元测试 🟡

6. 核心文件变更

文件 变更
services/forum_service.py 新增
services/__init__.py 添加 export
routers/forum.py 扩展端点,集成服务

7. 工作量估算

任务 工作量
ForumLockManager 0.5 天
ForumService 核心 1 天
输入验证/限流/缓存 0.5 天
API 端点扩展 0.5 天
单元测试 0.5 天
总计 3 天

8. 验收标准

  • ForumLockManager 可正确管理并发锁
  • 输入验证可过滤危险字符
  • 限流器可防止滥用
  • 缓存可提升热门帖子读取速度
  • 所有新增 API 端点正常工作
  • 现有 API 保持向后兼容
  • 单元测试覆盖核心逻辑