import psutil import time from datetime import datetime, timedelta from sqlalchemy import select, func, and_ from sqlalchemy.orm import Session from app.models.conversation import Conversation, Message from app.models.knowledge_graph import KGNode, KGEdge from app.models.task import Task, TaskStatus from app.models.forum import ForumPost, ForumReply from app.models.document import Document class StatsService: def __init__(self, db: Session): self.db = db def get_system_health(self) -> dict: """获取系统健康指标""" uptime_seconds = int(time.time() - psutil.boot_time()) cpu_percent = psutil.cpu_percent(interval=0.1) mem = psutil.virtual_memory() disk = psutil.disk_usage('/') return { "uptime_seconds": uptime_seconds, "cpu_percent": cpu_percent, "memory_used_mb": round(mem.used / (1024 * 1024), 1), "memory_total_mb": round(mem.total / (1024 * 1024), 1), "memory_percent": mem.percent, "disk_used_gb": round(disk.used / (1024 * 1024 * 1024), 1), "disk_total_gb": round(disk.total / (1024 * 1024 * 1024), 1), "disk_percent": disk.percent, "active_users_24h": 0, # 需要 User 表的 updated_at } def _get_daily_stats(self, model, date_column, user_id=None, days=30) -> list: """通用每日统计查询""" cutoff = datetime.utcnow() - timedelta(days=days) query = self.db.query( func.date(date_column).label('date'), func.count().label('count') ).filter(date_column >= cutoff) if user_id and hasattr(model, 'user_id'): query = query.filter(model.user_id == user_id) query = query.group_by(func.date(date_column)).order_by(func.date(date_column)) results = query.all() return [{"date": str(r.date), "count": r.count} for r in results] def get_conversation_stats(self, user_id: str = None, days=30) -> dict: """获取对话统计数据""" cutoff = datetime.utcnow() - timedelta(days=days) daily_conversations = self._get_daily_stats( Conversation, Conversation.created_at, user_id, days ) daily_messages = self._get_daily_stats( Message, Message.created_at, user_id, days ) # Daily tokens input_query = self.db.query( func.date(Message.created_at).label('date'), func.coalesce(func.sum(Message.tokens_used), 0).label('tokens') ).filter( Message.created_at >= cutoff, Message.role == 'user' ) if user_id: input_query = input_query.join(Conversation).filter(Conversation.user_id == user_id) input_results = input_query.group_by(func.date(Message.created_at)).all() output_query = self.db.query( func.date(Message.created_at).label('date'), func.coalesce(func.sum(Message.tokens_used), 0).label('tokens') ).filter( Message.created_at >= cutoff, Message.role == 'assistant' ) if user_id: output_query = output_query.join(Conversation).filter(Conversation.user_id == user_id) output_results = output_query.group_by(func.date(Message.created_at)).all() daily_input_tokens = [{"date": str(r.date), "input_tokens": r.tokens} for r in input_results] daily_output_tokens = [{"date": str(r.date), "output_tokens": r.tokens} for r in output_results] return { "daily_conversations": daily_conversations, "daily_messages": daily_messages, "daily_input_tokens": daily_input_tokens, "daily_output_tokens": daily_output_tokens, "totals": { "conversations": sum(c["count"] for c in daily_conversations), "messages": sum(m["count"] for m in daily_messages), "input_tokens": sum(t["input_tokens"] for t in daily_input_tokens), "output_tokens": sum(t["output_tokens"] for t in daily_output_tokens), } } def get_knowledge_stats(self, user_id: str = None, days=30) -> dict: """获取知识库统计数据""" cutoff = datetime.utcnow() - timedelta(days=days) # New tags tag_query = self.db.query( func.date(KGNode.created_at).label('date'), func.count().label('count') ).filter( KGNode.created_at >= cutoff, KGNode.entity_type == 'tag' ) if user_id: tag_query = tag_query.filter(KGNode.user_id == user_id) tag_results = tag_query.group_by(func.date(KGNode.created_at)).all() daily_new_tags = [{"date": str(r.date), "count": r.count} for r in tag_results] daily_documents = self._get_daily_stats( Document, Document.created_at, user_id, days ) daily_tag_relations = self._get_daily_stats( KGEdge, KGEdge.created_at, user_id, days ) return { "daily_new_tags": daily_new_tags, "daily_documents": daily_documents, "daily_knowledge_queries": [], "daily_tag_relations": daily_tag_relations, "totals": { "new_tags": sum(t["count"] for t in daily_new_tags), "documents": sum(d["count"] for d in daily_documents), "tag_relations": sum(r["count"] for r in daily_tag_relations), } } def get_kanban_stats(self, user_id: str = None, days=30) -> dict: """获取看板统计数据""" daily_new_tasks = self._get_daily_stats( Task, Task.created_at, user_id, days ) # Completed tasks completed_query = self.db.query( func.date(Task.completed_at).label('date'), func.count().label('count') ).filter( Task.completed_at >= datetime.utcnow() - timedelta(days=days), Task.status == TaskStatus.DONE ) if user_id: completed_query = completed_query.filter(Task.user_id == user_id) completed_results = completed_query.group_by(func.date(Task.completed_at)).all() daily_completed_tasks = [{"date": str(r.date), "count": r.count} for r in completed_results] # Current pending pending_query = self.db.query(func.count(Task.id)).filter(Task.status == TaskStatus.TODO) if user_id: pending_query = pending_query.filter(Task.user_id == user_id) current_pending_tasks = pending_query.scalar() or 0 # Completion rate daily_new_dict = {d["date"]: d["count"] for d in daily_new_tasks} daily_completed_dict = {d["date"]: d["count"] for d in daily_completed_tasks} all_dates = set(daily_new_dict.keys()) | set(daily_completed_dict.keys()) daily_completion_rate = [] for date in sorted(all_dates): new = daily_new_dict.get(date, 0) completed = daily_completed_dict.get(date, 0) rate = (completed / new * 100) if new > 0 else 0 daily_completion_rate.append({"date": date, "rate": round(rate, 1)}) return { "daily_new_tasks": daily_new_tasks, "daily_completed_tasks": daily_completed_tasks, "daily_completion_rate": daily_completion_rate, "current_pending_tasks": current_pending_tasks, "totals": { "new_tasks": sum(t["count"] for t in daily_new_tasks), "completed_tasks": sum(c["count"] for c in daily_completed_tasks), } } def get_community_stats(self, user_id: str = None, days=30) -> dict: """获取社区统计数据""" daily_posts = self._get_daily_stats( ForumPost, ForumPost.created_at, user_id, days ) daily_replies = self._get_daily_stats( ForumReply, ForumReply.created_at, user_id, days ) # AI executions ai_query = self.db.query( func.date(ForumPost.updated_at).label('date'), func.count().label('count') ).filter( ForumPost.updated_at >= datetime.utcnow() - timedelta(days=days), ForumPost.is_executed == True ) if user_id: ai_query = ai_query.filter(ForumPost.user_id == user_id) ai_results = ai_query.group_by(func.date(ForumPost.updated_at)).all() daily_ai_executions = [{"date": str(r.date), "count": r.count} for r in ai_results] return { "daily_posts": daily_posts, "daily_replies": daily_replies, "daily_ai_executions": daily_ai_executions, "daily_agent_calls": [], "totals": { "posts": sum(p["count"] for p in daily_posts), "replies": sum(r["count"] for r in daily_replies), "ai_executions": sum(a["count"] for a in daily_ai_executions), } } def get_personal_insights(self, user_id: str) -> dict: """获取个人洞察""" # Hourly activity hourly_query = self.db.query( func.extract('hour', Conversation.created_at).label('hour'), func.count().label('count') ).filter(Conversation.user_id == user_id).group_by( func.extract('hour', Conversation.created_at) ) hourly_results = hourly_query.all() hourly_activity = [{"hour": int(r.hour), "count": r.count} for r in hourly_results] # Top tags tag_query = self.db.query( KGNode.properties_["tag_path"].astext.label('tag_path'), func.count(KGEdge.id).label('usage_count') ).join( KGEdge, KGEdge.target_id == KGNode.id ).filter( KGNode.user_id == user_id, KGNode.entity_type == 'tag', KGEdge.relation_type == 'has_tag' ).group_by( KGNode.properties_["tag_path"].astext ).order_by(func.count(KGEdge.id).desc()).limit(5) top_tags = [{"tag_path": r.tag_path, "usage_count": r.usage_count} for r in tag_query.all()] # Token trend now = datetime.utcnow() this_month_start = datetime(now.year, now.month, 1) last_month_end = this_month_start - timedelta(days=1) last_month_start = datetime(last_month_end.year, last_month_end.month, 1) this_month_tokens = self.db.query( func.coalesce(func.sum(Message.tokens_used), 0) ).join(Conversation).filter( Conversation.user_id == user_id, Message.created_at >= this_month_start, Message.role == 'assistant' ).scalar() or 0 last_month_tokens = self.db.query( func.coalesce(func.sum(Message.tokens_used), 0) ).join(Conversation).filter( Conversation.user_id == user_id, Message.created_at >= last_month_start, Message.created_at < this_month_start, Message.role == 'assistant' ).scalar() or 0 token_trend_percent = 0 if last_month_tokens > 0: token_trend_percent = round((this_month_tokens - last_month_tokens) / last_month_tokens * 100, 1) return { "hourly_activity": hourly_activity, "top_tags": top_tags, "token_trend_percent": token_trend_percent, "this_month_tokens": this_month_tokens, "last_month_tokens": last_month_tokens, }