# Stats Dashboard Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Implement a statistics page that shows metrics across six tabs: system health, conversation trends, knowledge base, kanban, community, and personal insights.

**Architecture:**
- Backend: 6 statistics API endpoints, grouped by module
- Frontend: `StatsView.vue` with 6 tabs, using ECharts to render line charts
- Data aggregation: SQL `GROUP BY` on the calendar day (`date_trunc('day')`; the service code in Task 2 uses `func.date(...)`)

**Tech Stack:** FastAPI, SQLAlchemy, ECharts, Vue 3, Element Plus

---

## File Structure

```
backend/app/
├── routers/
│   └── stats.py           # new: stats API routes
├── services/
│   └── stats_service.py   # new: stats service
└── schemas/
    └── stats.py           # new: stats schemas

frontend/src/
├── api/
│   └── stats.ts           # new: stats API client
├── components/
│   └── SidebarNav.vue     # modified: add the nav item
├── views/
│   └── StatsView.vue      # new: stats page
└── router/
    └── index.ts           # modified: add the /stats route
```

---

## Task 1: Create Stats Schema

**Files:**
- Create: `backend/app/schemas/stats.py`

- [ ] **Step 1: Create stats schemas**

```python
# backend/app/schemas/stats.py
from pydantic import BaseModel


# ===== System Health =====
class SystemHealth(BaseModel):
    uptime_seconds: int
    cpu_percent: float
    memory_used_mb: float
    memory_total_mb: float
    memory_percent: float
    disk_used_gb: float
    disk_total_gb: float
    disk_percent: float
    active_users_24h: int


# ===== Daily Stats Base =====
class DailyStatItem(BaseModel):
    date: str
    count: int


class DailyRateItem(BaseModel):
    # Matches the {"date", "rate"} items built by get_kanban_stats
    date: str
    rate: float


class DailyTokenStatItem(BaseModel):
    # The service fills only one of the two token fields per item,
    # so both default to 0 to keep response validation from failing
    date: str
    input_tokens: int = 0
    output_tokens: int = 0


# ===== Conversation Stats =====
class ConversationStats(BaseModel):
    daily_conversations: list[DailyStatItem]
    daily_messages: list[DailyStatItem]
    daily_input_tokens: list[DailyTokenStatItem]
    daily_output_tokens: list[DailyTokenStatItem]
    totals: dict


# ===== Knowledge Stats =====
class KnowledgeStats(BaseModel):
    daily_new_tags: list[DailyStatItem]
    daily_documents: list[DailyStatItem]
    daily_knowledge_queries: list[DailyStatItem]
    daily_tag_relations: list[DailyStatItem]
    totals: dict


# ===== Kanban Stats =====
class KanbanStats(BaseModel):
    daily_new_tasks: list[DailyStatItem]
    daily_completed_tasks: list[DailyStatItem]
    daily_completion_rate: list[DailyRateItem]
    current_pending_tasks: int
    totals: dict


# ===== Community Stats =====
class CommunityStats(BaseModel):
    daily_posts: list[DailyStatItem]
    daily_replies: list[DailyStatItem]
    daily_ai_executions: list[DailyStatItem]
    daily_agent_calls: list[DailyStatItem]
    totals: dict


# ===== Personal Insights =====
class HourlyActivity(BaseModel):
    hour: int
    count: int


class TagUsage(BaseModel):
    tag_path: str
    usage_count: int


class PersonalInsights(BaseModel):
    hourly_activity: list[HourlyActivity]
    top_tags: list[TagUsage]
    token_trend_percent: float
    this_month_tokens: int
    last_month_tokens: int
```

---

## Task 2: Create Stats Service

**Files:**
- Create: `backend/app/services/stats_service.py`

- [ ] **Step 1: Create stats service**

```python
# backend/app/services/stats_service.py
import time
from datetime import datetime, timedelta

import psutil
from sqlalchemy import func
from sqlalchemy.orm import Session

from app.models.conversation import Conversation, Message
from app.models.knowledge_graph import KGNode, KGEdge
from app.models.task import Task, TaskStatus
from app.models.forum import ForumPost, ForumReply
from app.models.document import Document
from app.models.user import User


class StatsService:
    def __init__(self, db: Session):
        self.db = db

    def get_system_health(self) -> dict:
        """Return system health metrics."""
        # Host uptime: psutil.boot_time() is the system boot time,
        # not the start time of this process
        uptime_seconds = int(time.time() - psutil.boot_time())

        # CPU
        cpu_percent = psutil.cpu_percent(interval=0.1)

        # Memory
        mem = psutil.virtual_memory()
        memory_used_mb = mem.used / (1024 * 1024)
        memory_total_mb = mem.total / (1024 * 1024)
        memory_percent = mem.percent

        # Disk
        disk = psutil.disk_usage('/')
        disk_used_gb = disk.used / (1024 * 1024 * 1024)
        disk_total_gb = disk.total / (1024 * 1024 * 1024)
        disk_percent = disk.percent

        # Active users (24h)
        yesterday = datetime.utcnow() - timedelta(days=1)
        # "Active" is approximated by User.updated_at falling within the last 24 hours
        active_users = self.db.query(func.count(func.distinct(User.id))).filter(
            User.updated_at >= yesterday
        ).scalar() or 0

        return {
            "uptime_seconds": uptime_seconds,
            "cpu_percent": cpu_percent,
            "memory_used_mb": round(memory_used_mb, 1),
            "memory_total_mb": round(memory_total_mb, 1),
            "memory_percent": memory_percent,
            "disk_used_gb": round(disk_used_gb, 1),
            "disk_total_gb": round(disk_total_gb, 1),
            "disk_percent": disk_percent,
            "active_users_24h": active_users,
        }

    def _get_daily_stats(self, model, date_column, user_id=None, days=30) -> list:
        """Generic per-day row count for `model` over the last `days` days.

        When `user_id` is given, `model` must have a `user_id` column.
        """
        cutoff = datetime.utcnow() - timedelta(days=days)
        day = func.date(date_column)

        # The count is labelled `total`: a label named `count` would collide
        # with the result row's tuple-style count() method
        query = self.db.query(
            day.label('date'),
            func.count().label('total')
        ).filter(date_column >= cutoff)

        if user_id:
            query = query.filter(model.user_id == user_id)

        results = query.group_by(day).order_by(day).all()
        return [{"date": str(r.date), "count": r.total} for r in results]

    def get_conversation_stats(self, user_id: str = None, days=30) -> dict:
        """Return conversation statistics."""
        cutoff = datetime.utcnow() - timedelta(days=days)

        # Daily conversations
        daily_conversations = self._get_daily_stats(
            Conversation, Conversation.created_at, user_id, days
        )

        # Daily messages
        daily_messages = self._get_daily_stats(
            Message, Message.created_at, user_id, days
        )

        # Daily tokens (input vs. output, approximated by message role)
        input_query = self.db.query(
            func.date(Message.created_at).label('date'),
            func.coalesce(func.sum(Message.tokens_used), 0).label('tokens')
        ).filter(
            Message.created_at >= cutoff,
            Message.role == 'user'
        )
        if user_id:
            input_query = input_query.join(Conversation).filter(Conversation.user_id == user_id)
        # Order by day so the chart series comes back in chronological order
        input_query = input_query.group_by(
            func.date(Message.created_at)
        ).order_by(func.date(Message.created_at))
        input_results = input_query.all()

        output_query = self.db.query(
            func.date(Message.created_at).label('date'),
            func.coalesce(func.sum(Message.tokens_used), 0).label('tokens')
        ).filter(
            Message.created_at >= cutoff,
            Message.role == 'assistant'
        )
        if user_id:
            output_query = output_query.join(Conversation).filter(Conversation.user_id == user_id)
        # Order by day so the chart series comes back in chronological order
        output_query = output_query.group_by(
            func.date(Message.created_at)
        ).order_by(func.date(Message.created_at))
        output_results = output_query.all()

        # int(): SUM may come back as Decimal depending on the database driver
        daily_input_tokens = [
            {"date": str(r.date), "input_tokens": int(r.tokens)} for r in input_results
        ]
        daily_output_tokens = [
            {"date": str(r.date), "output_tokens": int(r.tokens)} for r in output_results
        ]

        total_conversations = sum(c["count"] for c in daily_conversations)
        total_messages = sum(m["count"] for m in daily_messages)
        total_input = sum(t["input_tokens"] for t in daily_input_tokens)
        total_output = sum(t["output_tokens"] for t in daily_output_tokens)

        return {
            "daily_conversations": daily_conversations,
            "daily_messages": daily_messages,
            "daily_input_tokens": daily_input_tokens,
            "daily_output_tokens": daily_output_tokens,
            "totals": {
                "conversations": total_conversations,
                "messages": total_messages,
                "input_tokens": total_input,
                "output_tokens": total_output,
            }
        }

    def get_knowledge_stats(self, user_id: str = None, days=30) -> dict:
        """Return knowledge base statistics."""
        cutoff = datetime.utcnow() - timedelta(days=days)

        # New tags: only KGNode rows whose entity_type is 'tag'.
        # The count is labelled `total` because a label named `count` would
        # collide with the result row's tuple-style count() method.
        tag_day = func.date(KGNode.created_at)
        tag_query = self.db.query(
            tag_day.label('date'),
            func.count().label('total')
        ).filter(
            KGNode.created_at >= cutoff,
            KGNode.entity_type == 'tag'
        )
        if user_id:
            tag_query = tag_query.filter(KGNode.user_id == user_id)
        tag_query = tag_query.group_by(tag_day).order_by(tag_day)
        daily_new_tags = [{"date": str(r.date), "count": r.total} for r in tag_query.all()]

        # Documents
        daily_documents = self._get_daily_stats(
            Document, Document.created_at, user_id, days
        )

        # Tag relations
        daily_tag_relations = self._get_daily_stats(
            KGEdge, KGEdge.created_at, user_id, days
        )

        return {
            "daily_new_tags": daily_new_tags,
            "daily_documents": daily_documents,
            "daily_knowledge_queries": [],  # requires a Chroma query log
            "daily_tag_relations": daily_tag_relations,
            "totals": {
                "new_tags": sum(t["count"] for t in daily_new_tags),
                "documents": sum(d["count"] for d in daily_documents),
                "tag_relations": sum(r["count"] for r in daily_tag_relations),
            }
        }

    def get_kanban_stats(self, user_id: str = None, days=30) -> dict:
        """Return kanban statistics."""
        cutoff = datetime.utcnow() - timedelta(days=days)

        # New tasks
        daily_new_tasks = self._get_daily_stats(
            Task, Task.created_at, user_id, days
        )

        # Completed tasks, grouped by completion day
        completed_day = func.date(Task.completed_at)
        completed_query = self.db.query(
            completed_day.label('date'),
            func.count().label('total')
        ).filter(
            Task.completed_at >= cutoff,
            Task.status == TaskStatus.DONE
        )
        if user_id:
            completed_query = completed_query.filter(Task.user_id == user_id)
        completed_query = completed_query.group_by(completed_day).order_by(completed_day)
        daily_completed = [{"date": str(r.date), "count": r.total} for r in completed_query.all()]

        # Current pending tasks
        pending_query = self.db.query(func.count(Task.id)).filter(
            Task.status == TaskStatus.TODO
        )
        if user_id:
            pending_query = pending_query.filter(Task.user_id == user_id)
        current_pending = pending_query.scalar() or 0

        # Daily completion rate: tasks completed that day / tasks created that day.
        # It can exceed 100 when older tasks are completed on a quiet day.
        daily_new_dict = {d["date"]: d["count"] for d in daily_new_tasks}
        daily_completed_dict = {d["date"]: d["count"] for d in daily_completed}
        all_dates = set(daily_new_dict.keys()) | set(daily_completed_dict.keys())
        daily_completion_rate = []
        for day in sorted(all_dates):
            new = daily_new_dict.get(day, 0)
            completed = daily_completed_dict.get(day, 0)
            rate = (completed / new * 100) if new > 0 else 0
            daily_completion_rate.append({"date": day, "rate": round(rate, 1)})

        return {
            "daily_new_tasks": daily_new_tasks,
            "daily_completed_tasks": daily_completed,
            "daily_completion_rate": daily_completion_rate,
            "current_pending_tasks": current_pending,
            "totals": {
                "new_tasks": sum(t["count"] for t in daily_new_tasks),
                "completed_tasks": sum(c["count"] for c in daily_completed),
            }
        }

    def get_community_stats(self, user_id: str = None, days=30) -> dict:
        """Return community statistics."""
        cutoff = datetime.utcnow() - timedelta(days=days)

        # Posts
        daily_posts = self._get_daily_stats(
            ForumPost, ForumPost.created_at, user_id, days
        )

        # Replies
        daily_replies = self._get_daily_stats(
            ForumReply, ForumReply.created_at, user_id, days
        )

        # AI executions: executed posts, grouped by the day they were last updated
        executed_day = func.date(ForumPost.updated_at)
        ai_query = self.db.query(
            executed_day.label('date'),
            func.count().label('total')
        ).filter(
            ForumPost.updated_at >= cutoff,
            ForumPost.is_executed.is_(True)
        )
        if user_id:
            ai_query = ai_query.filter(ForumPost.user_id == user_id)
        ai_query = ai_query.group_by(executed_day).order_by(executed_day)
        daily_ai_executions = [{"date": str(r.date), "count": r.total} for r in ai_query.all()]

        return {
            "daily_posts": daily_posts,
            "daily_replies": daily_replies,
            "daily_ai_executions": daily_ai_executions,
            "daily_agent_calls": [],  # requires an AgentMessage table
            "totals": {
                "posts": sum(p["count"] for p in daily_posts),
                "replies": sum(r["count"] for r in daily_replies),
                "ai_executions": sum(a["count"] for a in daily_ai_executions),
            }
        }

    def get_personal_insights(self, user_id: str) -> dict:
        """Return personal insights."""
        # Hourly activity
        hour = func.extract('hour', Conversation.created_at)
        hourly_query = self.db.query(
            hour.label('hour'),
            func.count().label('total')
        ).filter(
            Conversation.user_id == user_id
        ).group_by(hour).order_by(hour)
        hourly_activity = [{"hour": int(r.hour), "count": r.total} for r in hourly_query.all()]

        # Top 5 tags (`.astext` assumes properties_ is a PostgreSQL JSON/JSONB column)
        tag_query = self.db.query(
            KGNode.properties_["tag_path"].astext.label('tag_path'),
            func.count(KGEdge.id).label('usage_count')
        ).join(
            KGEdge, KGEdge.target_id == KGNode.id
        ).filter(
            KGNode.user_id == user_id,
            KGNode.entity_type == 'tag',
            KGEdge.relation_type == 'has_tag'
        ).group_by(
            KGNode.properties_["tag_path"].astext
        ).order_by(func.count(KGEdge.id).desc()).limit(5)
        top_tags = [{"tag_path": r.tag_path, "usage_count": r.usage_count} for r in tag_query.all()]

        # Token trend (this month vs. last month)
        now = datetime.utcnow()
        this_month_start = datetime(now.year, now.month, 1)
        last_month_end = this_month_start - timedelta(days=1)
        last_month_start = datetime(last_month_end.year, last_month_end.month, 1)

        # int(): SUM may come back as Decimal depending on the database driver
        this_month_tokens = int(self.db.query(
            func.coalesce(func.sum(Message.tokens_used), 0)
        ).join(Conversation).filter(
            Conversation.user_id == user_id,
            Message.created_at >= this_month_start,
            Message.role == 'assistant'
        ).scalar() or 0)

        last_month_tokens = int(self.db.query(
            func.coalesce(func.sum(Message.tokens_used), 0)
        ).join(Conversation).filter(
            Conversation.user_id == user_id,
            Message.created_at >= last_month_start,
            Message.created_at < this_month_start,
            Message.role == 'assistant'
        ).scalar() or 0)

        # Percentage change; stays 0.0 when there is no last-month baseline
        token_trend = 0.0
        if last_month_tokens > 0:
            token_trend = round((this_month_tokens - last_month_tokens) / last_month_tokens * 100, 1)

        return {
            "hourly_activity": hourly_activity,
            "top_tags": top_tags,
            "token_trend_percent": token_trend,
            "this_month_tokens": this_month_tokens,
            "last_month_tokens": last_month_tokens,
        }
```

---

## Task 3: Create Stats Router

**Files:**
- Create: `backend/app/routers/stats.py`

- [ ] **Step 1: Create stats router**

```python
# backend/app/routers/stats.py
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session

from app.database import get_db
from app.models.user import User
from app.routers.auth import get_current_user
from app.schemas.stats import (
    SystemHealth,
    ConversationStats,
    KnowledgeStats,
    KanbanStats,
    CommunityStats,
    PersonalInsights,
)
from app.services.stats_service import StatsService

router = APIRouter(prefix="/api/stats", tags=["Stats"])


# Plain `def` endpoints: FastAPI runs them in its threadpool, so the blocking
# psutil sampling and synchronous Session queries do not stall the event loop
@router.get("/system", response_model=SystemHealth)
def get_system_health(db: Session = Depends(get_db)):
    """Return system health metrics."""
    svc = StatsService(db)
    return svc.get_system_health()


@router.get("/conversations", response_model=ConversationStats)
def get_conversation_stats(
    days: int = 30,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return conversation statistics."""
    svc = StatsService(db)
    return svc.get_conversation_stats(user_id=current_user.id, days=days)


@router.get("/knowledge", response_model=KnowledgeStats)
def get_knowledge_stats(
    days: int = 30,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return knowledge base statistics."""
    svc = StatsService(db)
    return svc.get_knowledge_stats(user_id=current_user.id, days=days)


@router.get("/kanban", response_model=KanbanStats)
def get_kanban_stats(
    days: int = 30,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return kanban statistics."""
    svc = StatsService(db)
    return svc.get_kanban_stats(user_id=current_user.id, days=days)


@router.get("/community", response_model=CommunityStats)
def get_community_stats(
    days: int = 30,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return community statistics."""
    svc = StatsService(db)
    return svc.get_community_stats(user_id=current_user.id, days=days)


@router.get("/insights", response_model=PersonalInsights)
def get_personal_insights(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return personal insights."""
    svc = StatsService(db)
    return svc.get_personal_insights(user_id=current_user.id)
```

- [ ] **Step 2: Register router in main app**

Add the following to `backend/app/__init__.py` or `main.py`:

```python
from app.routers import stats

app.include_router(stats.router)
```

---

## Task 4: Create Frontend API

**Files:**
- Create: `frontend/src/api/stats.ts`

- [ ] **Step 1: Create stats API**

```typescript
// frontend/src/api/stats.ts
// Paths are relative to the shared axios instance's baseURL, assumed here to be
// `/api` so that they line up with the backend router prefix `/api/stats`.
import axios from '@/api'

export const getSystemHealth = () => axios.get('/stats/system')

export const getConversationStats = (days = 30) =>
  axios.get('/stats/conversations', { params: { days } })

export const getKnowledgeStats = (days = 30) =>
  axios.get('/stats/knowledge', { params: { days } })

export const getKanbanStats = (days = 30) =>
  axios.get('/stats/kanban', { params: { days } })

export const getCommunityStats = (days = 30) =>
  axios.get('/stats/community', { params: { days } })

export const getPersonalInsights = () =>
  axios.get('/stats/insights')
```

---

## Task 5: Create StatsView Component

**Files:**
- Create: `frontend/src/views/StatsView.vue`

- [ ] **Step 1: Create StatsView with 6 tabs**

```vue
Statistics

{{ formatUptime(systemHealth.uptime_seconds) }} Uptime
{{ systemHealth.cpu_percent }}% CPU usage
{{ systemHealth.memory_percent }}% Memory usage
{{ systemHealth.disk_percent }}% Disk usage
{{ systemHealth.active_users_24h }} Active users (24h)

{{ conversationStats.totals.conversations }} Total conversations
{{ conversationStats.totals.messages }} Total messages
{{ conversationStats.totals.input_tokens }} Input Tokens
{{ conversationStats.totals.output_tokens }} Output Tokens

{{ kanbanStats.current_pending_tasks }} Pending tasks

Active hours
Top 5 tags
{{ tag.tag_path }} ({{ tag.usage_count }})
Token usage trend
{{ personalInsights.token_trend_percent }}% This month vs. last month
```

---

## Task 6: Add Route and Navigation

**Files:**
- Modify: `frontend/src/router/index.ts`
- Modify: `frontend/src/components/SidebarNav.vue`

- [ ] **Step 1: Add route**

```typescript
// frontend/src/router/index.ts
{
  path: 'stats',
  name: 'stats',
  component: () => import('@/views/StatsView.vue'),
},
```

- [ ] **Step 2: Add navigation item**

```typescript
// frontend/src/components/SidebarNav.vue
// Add to navItems array:
{ name: 'Stats', path: '/stats', icon: Activity },
```

---

## Summary

| Task | Description | Files |
|------|-------------|-------|
| 1 | Stats Schema | `schemas/stats.py` |
| 2 | Stats Service | `services/stats_service.py` |
| 3 | Stats Router | `routers/stats.py` |
| 4 | Frontend API | `api/stats.ts` |
| 5 | StatsView Component | `views/StatsView.vue` |
| 6 | Route & Navigation | `router/index.ts`, `SidebarNav.vue` |
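
The per-day queries in Task 2 return a row only for days that have data, so a line chart drawn straight from them would skip quiet days. The sketch below shows one possible way to pad such a series with zero-count days before it is charted. It assumes ISO `YYYY-MM-DD` date strings in the `{"date", "count"}` items; `fill_missing_days` is a hypothetical helper introduced for illustration and is not part of the plan's file list.

```python
from datetime import date, timedelta


def fill_missing_days(rows, end, days=30):
    """Return one {"date", "count"} item per day for the `days` days ending at `end`.

    `rows` is a list such as [{"date": "2024-01-02", "count": 3}, ...].
    Days that are absent from `rows` get a count of 0.
    """
    # Index the sparse rows by their ISO date string for O(1) lookup
    counts = {r["date"]: r["count"] for r in rows}
    start = end - timedelta(days=days - 1)
    return [
        {"date": d.isoformat(), "count": counts.get(d.isoformat(), 0)}
        for d in (start + timedelta(days=i) for i in range(days))
    ]


if __name__ == "__main__":
    sparse = [
        {"date": "2024-01-01", "count": 2},
        {"date": "2024-01-03", "count": 5},
    ]
    for item in fill_missing_days(sparse, end=date(2024, 1, 3), days=3):
        print(item)
```

The padding could equally be done in the frontend before the series is handed to ECharts; doing it in the service keeps every tab's payload the same length for a given `days` value.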