Files
JARVIS/backend/app/services/stats_service.py

279 lines
11 KiB
Python

import time
from datetime import datetime, timedelta, timezone
from typing import Optional

import psutil
from sqlalchemy import select, func, and_
from sqlalchemy.orm import Session

from app.models.conversation import Conversation, Message
from app.models.knowledge_graph import KGNode, KGEdge
from app.models.task import Task, TaskStatus
from app.models.forum import ForumPost, ForumReply
from app.models.document import Document
class StatsService:
    """Compute dashboard statistics from the application database.

    Each ``get_*_stats`` method returns per-day series (lists of dicts whose
    ``date`` is ``str()`` of the database's ``DATE(...)`` value) together with
    a ``totals`` dict summed over the requested window.

    NOTE(review): time windows are compared against naive UTC datetimes, so
    the timestamp columns are presumably stored as naive UTC — confirm.
    """

    def __init__(self, db: Session):
        """Bind the service to the SQLAlchemy session used for all queries."""
        self.db = db

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Produces the same value as ``datetime.utcnow()`` without calling that
        function, which is deprecated since Python 3.12.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _cutoff(self, days: int) -> datetime:
        """Return the naive UTC instant ``days`` days before now."""
        return self._utc_now() - timedelta(days=days)

    def _daily_query(self, date_column, value_expr, value_label, cutoff, *criteria):
        """Build a two-column ``(date, value)`` query bucketed by calendar day.

        Args:
            date_column: Timestamp column that is truncated to a date and
                compared against ``cutoff``.
            value_expr: Aggregate expression selected as the second column.
            value_label: SQL label given to ``value_expr``.
            cutoff: Only rows with ``date_column >= cutoff`` are included.
            *criteria: Extra filter expressions, ANDed with the cutoff.

        Returns:
            An un-grouped query; finish it with :meth:`_daily_rows`.
        """
        return self.db.query(
            func.date(date_column).label('date'),
            value_expr.label(value_label),
        ).filter(date_column >= cutoff, *criteria)

    @staticmethod
    def _daily_rows(query, date_column, value_key: str = "count") -> list:
        """Group ``query`` by day, order it by day and return plain dicts.

        ``query`` must select exactly two columns (date, value). Rows are
        unpacked positionally instead of being read through ``row.count``:
        on SQLAlchemy 1.4+ ``Row`` is a sequence, so ``row.count`` resolves to
        the ``Row.count()`` method rather than the column labelled ``count``.

        Returns:
            ``[{"date": "<date>", value_key: <value>}, ...]`` in ascending
            date order.
        """
        day = func.date(date_column)
        rows = query.group_by(day).order_by(day).all()
        return [{"date": str(day_value), value_key: value} for day_value, value in rows]

    def _get_daily_stats(self, model, date_column, user_id=None, days=30) -> list:
        """Count rows of ``model`` per calendar day over the last ``days`` days.

        Args:
            model: ORM class being counted; only inspected for a ``user_id``
                attribute.
            date_column: Timestamp column used for the window and the buckets.
            user_id: When truthy and ``model`` has a ``user_id`` attribute,
                restricts the count to that user. If the model has no such
                attribute the filter is skipped and all rows are counted.
            days: Size of the look-back window in days.

        Returns:
            ``[{"date": "<date>", "count": <int>}, ...]`` ordered by date.
        """
        query = self._daily_query(date_column, func.count(), 'count', self._cutoff(days))
        if user_id and hasattr(model, 'user_id'):
            query = query.filter(model.user_id == user_id)
        return self._daily_rows(query, date_column)

    def _daily_message_counts(self, cutoff, user_id=None) -> list:
        """Count messages per day, scoped to ``user_id`` when one is given.

        If ``Message`` exposes ``user_id`` it is filtered directly; otherwise
        the query joins ``Conversation`` and filters on its ``user_id``, the
        same way the token queries scope messages to a user. Without this
        fallback a per-user request would count every user's messages.
        """
        query = self._daily_query(Message.created_at, func.count(), 'count', cutoff)
        if user_id:
            if hasattr(Message, 'user_id'):
                query = query.filter(Message.user_id == user_id)
            else:
                query = query.join(Conversation).filter(Conversation.user_id == user_id)
        return self._daily_rows(query, Message.created_at)

    def _daily_token_usage(self, role, value_key, cutoff, user_id=None) -> list:
        """Sum ``Message.tokens_used`` per day for messages with the given role.

        Returns:
            ``[{"date": "<date>", value_key: <sum>}, ...]`` ordered by date.
        """
        query = self._daily_query(
            Message.created_at,
            func.coalesce(func.sum(Message.tokens_used), 0),
            'tokens',
            cutoff,
            Message.role == role,
        )
        if user_id:
            query = query.join(Conversation).filter(Conversation.user_id == user_id)
        return self._daily_rows(query, Message.created_at, value_key)

    @staticmethod
    def _completion_rates(new_tasks: list, completed_tasks: list) -> list:
        """Derive a per-day completion percentage from two daily count series.

        The rate for a day is ``completed / new * 100`` rounded to one
        decimal. Days with completions but no newly created tasks report 0
        instead of dividing by zero.
        """
        new_by_day = {entry["date"]: entry["count"] for entry in new_tasks}
        done_by_day = {entry["date"]: entry["count"] for entry in completed_tasks}
        rates = []
        for day in sorted(new_by_day.keys() | done_by_day.keys()):
            created = new_by_day.get(day, 0)
            finished = done_by_day.get(day, 0)
            rate = (finished / created * 100) if created > 0 else 0
            rates.append({"date": day, "rate": round(rate, 1)})
        return rates

    def _assistant_tokens(self, user_id, start, end=None):
        """Sum assistant-message tokens for ``user_id`` in ``[start, end)``.

        ``end`` is exclusive; when it is ``None`` the range is open-ended.
        Returns 0 when there are no matching messages.
        """
        query = self.db.query(
            func.coalesce(func.sum(Message.tokens_used), 0)
        ).join(Conversation).filter(
            Conversation.user_id == user_id,
            Message.created_at >= start,
            Message.role == 'assistant',
        )
        if end is not None:
            query = query.filter(Message.created_at < end)
        return query.scalar() or 0

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_system_health(self) -> dict:
        """Return host health metrics (uptime, CPU, memory, disk).

        ``uptime_seconds`` is measured from host boot time, not from process
        start. The CPU reading samples over a 0.1 s interval, so this call
        blocks for roughly that long. Disk usage is read for the ``/`` path.
        """
        mib = 1024 * 1024
        gib = mib * 1024
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        return {
            "uptime_seconds": int(time.time() - psutil.boot_time()),
            "cpu_percent": psutil.cpu_percent(interval=0.1),
            "memory_used_mb": round(memory.used / mib, 1),
            "memory_total_mb": round(memory.total / mib, 1),
            "memory_percent": memory.percent,
            "disk_used_gb": round(disk.used / gib, 1),
            "disk_total_gb": round(disk.total / gib, 1),
            "disk_percent": disk.percent,
            # Placeholder: needs an ``updated_at`` column on the User table.
            "active_users_24h": 0,
        }

    def get_conversation_stats(self, user_id: Optional[str] = None, days: int = 30) -> dict:
        """Return daily conversation, message and token series plus totals.

        Args:
            user_id: Restrict the statistics to this user when truthy.
            days: Size of the look-back window in days.

        Returns:
            Dict with ``daily_conversations``, ``daily_messages``,
            ``daily_input_tokens`` (tokens of ``user`` messages),
            ``daily_output_tokens`` (tokens of ``assistant`` messages) and
            ``totals``.
        """
        cutoff = self._cutoff(days)
        daily_conversations = self._get_daily_stats(
            Conversation, Conversation.created_at, user_id, days
        )
        daily_messages = self._daily_message_counts(cutoff, user_id)
        daily_input_tokens = self._daily_token_usage('user', 'input_tokens', cutoff, user_id)
        daily_output_tokens = self._daily_token_usage(
            'assistant', 'output_tokens', cutoff, user_id
        )
        return {
            "daily_conversations": daily_conversations,
            "daily_messages": daily_messages,
            "daily_input_tokens": daily_input_tokens,
            "daily_output_tokens": daily_output_tokens,
            "totals": {
                "conversations": sum(c["count"] for c in daily_conversations),
                "messages": sum(m["count"] for m in daily_messages),
                "input_tokens": sum(t["input_tokens"] for t in daily_input_tokens),
                "output_tokens": sum(t["output_tokens"] for t in daily_output_tokens),
            },
        }

    def get_knowledge_stats(self, user_id: Optional[str] = None, days: int = 30) -> dict:
        """Return daily knowledge-base series (tags, documents, edges) plus totals.

        ``daily_knowledge_queries`` is always an empty list.

        NOTE(review): documents and edges go through ``_get_daily_stats``,
        which only applies the user filter when the model has a ``user_id``
        attribute — verify that ``Document`` and ``KGEdge`` do.
        """
        tag_query = self._daily_query(
            KGNode.created_at,
            func.count(),
            'count',
            self._cutoff(days),
            KGNode.entity_type == 'tag',
        )
        if user_id:
            tag_query = tag_query.filter(KGNode.user_id == user_id)
        daily_new_tags = self._daily_rows(tag_query, KGNode.created_at)

        daily_documents = self._get_daily_stats(
            Document, Document.created_at, user_id, days
        )
        daily_tag_relations = self._get_daily_stats(
            KGEdge, KGEdge.created_at, user_id, days
        )
        return {
            "daily_new_tags": daily_new_tags,
            "daily_documents": daily_documents,
            "daily_knowledge_queries": [],
            "daily_tag_relations": daily_tag_relations,
            "totals": {
                "new_tags": sum(t["count"] for t in daily_new_tags),
                "documents": sum(d["count"] for d in daily_documents),
                "tag_relations": sum(r["count"] for r in daily_tag_relations),
            },
        }

    def get_kanban_stats(self, user_id: Optional[str] = None, days: int = 30) -> dict:
        """Return daily task creation/completion series, rates and pending count.

        Completed tasks are bucketed by ``completed_at`` and must have status
        ``DONE``; ``current_pending_tasks`` counts tasks whose status is
        ``TODO`` regardless of the window.
        """
        daily_new_tasks = self._get_daily_stats(
            Task, Task.created_at, user_id, days
        )

        completed_query = self._daily_query(
            Task.completed_at,
            func.count(),
            'count',
            self._cutoff(days),
            Task.status == TaskStatus.DONE,
        )
        if user_id:
            completed_query = completed_query.filter(Task.user_id == user_id)
        daily_completed_tasks = self._daily_rows(completed_query, Task.completed_at)

        pending_query = self.db.query(func.count(Task.id)).filter(
            Task.status == TaskStatus.TODO
        )
        if user_id:
            pending_query = pending_query.filter(Task.user_id == user_id)
        current_pending_tasks = pending_query.scalar() or 0

        return {
            "daily_new_tasks": daily_new_tasks,
            "daily_completed_tasks": daily_completed_tasks,
            "daily_completion_rate": self._completion_rates(
                daily_new_tasks, daily_completed_tasks
            ),
            "current_pending_tasks": current_pending_tasks,
            "totals": {
                "new_tasks": sum(t["count"] for t in daily_new_tasks),
                "completed_tasks": sum(c["count"] for c in daily_completed_tasks),
            },
        }

    def get_community_stats(self, user_id: Optional[str] = None, days: int = 30) -> dict:
        """Return daily forum post, reply and AI-execution series plus totals.

        AI executions are posts with ``is_executed`` set, bucketed by their
        ``updated_at`` date. ``daily_agent_calls`` is always an empty list.
        """
        daily_posts = self._get_daily_stats(
            ForumPost, ForumPost.created_at, user_id, days
        )
        daily_replies = self._get_daily_stats(
            ForumReply, ForumReply.created_at, user_id, days
        )

        ai_query = self._daily_query(
            ForumPost.updated_at,
            func.count(),
            'count',
            self._cutoff(days),
            # ``== True`` builds a SQL comparison here; it is not a Python truth test.
            ForumPost.is_executed == True,  # noqa: E712
        )
        if user_id:
            ai_query = ai_query.filter(ForumPost.user_id == user_id)
        daily_ai_executions = self._daily_rows(ai_query, ForumPost.updated_at)

        return {
            "daily_posts": daily_posts,
            "daily_replies": daily_replies,
            "daily_ai_executions": daily_ai_executions,
            "daily_agent_calls": [],
            "totals": {
                "posts": sum(p["count"] for p in daily_posts),
                "replies": sum(r["count"] for r in daily_replies),
                "ai_executions": sum(a["count"] for a in daily_ai_executions),
            },
        }

    def get_personal_insights(self, user_id: str) -> dict:
        """Return hourly activity, top tags and the monthly token trend for a user.

        Returns:
            Dict with:
            * ``hourly_activity`` — conversation counts per hour of day,
              ordered by hour, over all of the user's conversations;
            * ``top_tags`` — up to five tag nodes ranked by the number of
              ``has_tag`` edges pointing at them;
            * ``this_month_tokens`` / ``last_month_tokens`` — assistant token
              sums for the current calendar month so far and for the whole
              previous month;
            * ``token_trend_percent`` — percentage change between the two,
              or 0 when last month had no tokens.
        """
        # Hourly activity
        hour = func.extract('hour', Conversation.created_at)
        hourly_rows = (
            self.db.query(hour.label('hour'), func.count().label('count'))
            .filter(Conversation.user_id == user_id)
            .group_by(hour)
            .order_by(hour)
            .all()
        )
        hourly_activity = [
            {"hour": int(hour_value), "count": total} for hour_value, total in hourly_rows
        ]

        # Top tags
        tag_path = KGNode.properties_["tag_path"].astext
        usage = func.count(KGEdge.id)
        tag_rows = (
            self.db.query(tag_path.label('tag_path'), usage.label('usage_count'))
            .join(KGEdge, KGEdge.target_id == KGNode.id)
            .filter(
                KGNode.user_id == user_id,
                KGNode.entity_type == 'tag',
                KGEdge.relation_type == 'has_tag',
            )
            .group_by(tag_path)
            .order_by(usage.desc())
            .limit(5)
            .all()
        )
        top_tags = [
            {"tag_path": path, "usage_count": used} for path, used in tag_rows
        ]

        # Token trend: current month to date versus the whole previous month.
        now = self._utc_now()
        this_month_start = datetime(now.year, now.month, 1)
        last_month_end = this_month_start - timedelta(days=1)
        last_month_start = datetime(last_month_end.year, last_month_end.month, 1)

        this_month_tokens = self._assistant_tokens(user_id, this_month_start)
        last_month_tokens = self._assistant_tokens(
            user_id, last_month_start, this_month_start
        )

        token_trend_percent = 0
        if last_month_tokens > 0:
            token_trend_percent = round(
                (this_month_tokens - last_month_tokens) / last_month_tokens * 100, 1
            )

        return {
            "hourly_activity": hourly_activity,
            "top_tags": top_tags,
            "token_trend_percent": token_trend_percent,
            "this_month_tokens": this_month_tokens,
            "last_month_tokens": last_month_tokens,
        }