279 lines
11 KiB
Python
279 lines
11 KiB
Python
import time
from datetime import datetime, timedelta

import psutil
from sqlalchemy import select, func, and_, extract
from sqlalchemy.orm import Session

from app.models.conversation import Conversation, Message
from app.models.knowledge_graph import KGNode, KGEdge
from app.models.task import Task, TaskStatus
from app.models.forum import ForumPost, ForumReply
from app.models.document import Document
|
|
|
|
|
|
class StatsService:
    """Read-only reporting queries behind the statistics dashboards.

    Each ``get_*_stats`` method returns per-day series covering the last
    ``days`` days (window computed from naive ``datetime.utcnow()``) plus a
    ``totals`` dict that sums every series. A truthy ``user_id`` scopes the
    figures to that user; a falsy one reports across all users. Days without
    any matching row are absent from a series rather than reported as zero.
    """

    _BYTES_PER_MIB = 1024 * 1024
    _BYTES_PER_GIB = 1024 * 1024 * 1024

    def __init__(self, db: "Session"):
        """Keep the SQLAlchemy session that every query below runs on."""
        self.db = db

    # ------------------------------------------------------------------
    # Host metrics
    # ------------------------------------------------------------------

    def get_system_health(self) -> dict:
        """Return host-level health metrics read through psutil.

        Returns:
            A dict with uptime since host boot (seconds), CPU percentage,
            memory usage in MiB, usage of the ``/`` filesystem in GiB, and
            ``active_users_24h``, which is a hard-coded placeholder of 0.
        """
        uptime_seconds = int(time.time() - psutil.boot_time())
        # A non-zero interval makes this call block briefly while sampling.
        cpu_percent = psutil.cpu_percent(interval=0.1)
        mem = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        return {
            "uptime_seconds": uptime_seconds,
            "cpu_percent": cpu_percent,
            "memory_used_mb": round(mem.used / self._BYTES_PER_MIB, 1),
            "memory_total_mb": round(mem.total / self._BYTES_PER_MIB, 1),
            "memory_percent": mem.percent,
            "disk_used_gb": round(disk.used / self._BYTES_PER_GIB, 1),
            "disk_total_gb": round(disk.total / self._BYTES_PER_GIB, 1),
            "disk_percent": disk.percent,
            # Placeholder: needs an updated_at column on the User table.
            "active_users_24h": 0,
        }

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _window_start(days):
        """Return the naive UTC timestamp ``days`` days before now."""
        return datetime.utcnow() - timedelta(days=days)

    @staticmethod
    def _month_bounds(now):
        """Return ``(last_month_start, this_month_start)`` for ``now``."""
        this_month_start = datetime(now.year, now.month, 1)
        # Stepping back one day from the 1st always lands in the previous
        # month, whatever its length and across a year boundary.
        last_month_end = this_month_start - timedelta(days=1)
        last_month_start = datetime(last_month_end.year, last_month_end.month, 1)
        return last_month_start, this_month_start

    @staticmethod
    def _completion_rates(new_series, completed_series) -> list:
        """Build the per-day completed/new percentage series.

        Args:
            new_series: ``[{"date", "count"}]`` of tasks created per day.
            completed_series: ``[{"date", "count"}]`` of tasks completed per day.

        Returns:
            ``[{"date", "rate"}]`` sorted by date over the union of both
            series. A day with no new tasks gets a rate of 0, even if tasks
            were completed that day.
        """
        created = {point["date"]: point["count"] for point in new_series}
        finished = {point["date"]: point["count"] for point in completed_series}

        rates = []
        for day in sorted(created.keys() | finished.keys()):
            opened = created.get(day, 0)
            rate = finished.get(day, 0) / opened * 100 if opened > 0 else 0
            rates.append({"date": day, "rate": round(rate, 1)})
        return rates

    def _get_daily_stats(self, model, date_column, user_id=None, days=30,
                         extra_filters=()) -> list:
        """Count rows per calendar day of ``date_column`` in the look-back window.

        Args:
            model: Mapped class that owns ``date_column``.
            date_column: Timestamp column used for both the window filter and
                the daily bucket.
            user_id: When truthy and ``model`` exposes ``user_id``, only that
                user's rows are counted.
            days: Size of the look-back window in days.
            extra_filters: Extra SQLAlchemy criteria ANDed into the query.

        Returns:
            ``[{"date": "<day>", "count": <n>}, ...]`` ordered by day.
        """
        day = func.date(date_column)
        query = self.db.query(
            day.label('date'),
            func.count().label('total'),
        ).filter(date_column >= self._window_start(days), *extra_filters)

        # NOTE(review): a model without ``user_id`` is reported unscoped even
        # when ``user_id`` is given. Callers must scope such models
        # themselves, as _get_daily_message_series does for messages.
        if user_id and hasattr(model, 'user_id'):
            query = query.filter(model.user_id == user_id)

        rows = query.group_by(day).order_by(day).all()
        # Unpack positionally: on SQLAlchemy 1.4+ ``row.count`` is the
        # tuple-style Row.count() method, not a column labelled "count".
        return [{"date": str(bucket), "count": total} for bucket, total in rows]

    def _get_daily_message_series(self, value_key, user_id=None, days=30,
                                  role=None, sum_tokens=False) -> list:
        """Aggregate messages per day, scoped through their conversation.

        Args:
            value_key: Key under which each day's value is stored.
            user_id: When truthy, only messages whose conversation belongs to
                this user are included (via a join on ``Conversation``).
            days: Size of the look-back window in days.
            role: When given, only messages with this ``Message.role``.
            sum_tokens: Sum ``Message.tokens_used`` (NULL-safe) instead of
                counting rows.

        Returns:
            ``[{"date": "<day>", value_key: <value>}, ...]`` ordered by day.
        """
        day = func.date(Message.created_at)
        if sum_tokens:
            value = func.coalesce(func.sum(Message.tokens_used), 0)
        else:
            value = func.count()

        query = self.db.query(
            day.label('date'),
            value.label('value'),
        ).filter(Message.created_at >= self._window_start(days))
        if role is not None:
            query = query.filter(Message.role == role)
        if user_id:
            query = query.join(Conversation).filter(Conversation.user_id == user_id)

        rows = query.group_by(day).order_by(day).all()
        return [{"date": str(bucket), value_key: amount} for bucket, amount in rows]

    def _sum_assistant_tokens(self, user_id, start, end=None):
        """Sum ``tokens_used`` of a user's assistant messages in ``[start, end)``.

        ``end=None`` leaves the range open-ended. Returns 0 when nothing matches.
        """
        query = self.db.query(
            func.coalesce(func.sum(Message.tokens_used), 0)
        ).join(Conversation).filter(
            Conversation.user_id == user_id,
            Message.created_at >= start,
            Message.role == 'assistant',
        )
        if end is not None:
            query = query.filter(Message.created_at < end)
        return query.scalar() or 0

    # ------------------------------------------------------------------
    # Dashboards
    # ------------------------------------------------------------------

    def get_conversation_stats(self, user_id: str = None, days=30) -> dict:
        """Return conversation statistics.

        Covers daily conversation and message counts, daily token usage split
        into input (``role == 'user'``) and output (``role == 'assistant'``),
        and the totals of each series.
        """
        daily_conversations = self._get_daily_stats(
            Conversation, Conversation.created_at, user_id, days
        )
        # Messages are scoped through their conversation, the same way the
        # token series are, so a per-user request never reports global counts.
        daily_messages = self._get_daily_message_series("count", user_id, days)
        daily_input_tokens = self._get_daily_message_series(
            "input_tokens", user_id, days, role='user', sum_tokens=True
        )
        daily_output_tokens = self._get_daily_message_series(
            "output_tokens", user_id, days, role='assistant', sum_tokens=True
        )

        return {
            "daily_conversations": daily_conversations,
            "daily_messages": daily_messages,
            "daily_input_tokens": daily_input_tokens,
            "daily_output_tokens": daily_output_tokens,
            "totals": {
                "conversations": sum(c["count"] for c in daily_conversations),
                "messages": sum(m["count"] for m in daily_messages),
                "input_tokens": sum(t["input_tokens"] for t in daily_input_tokens),
                "output_tokens": sum(t["output_tokens"] for t in daily_output_tokens),
            },
        }

    def get_knowledge_stats(self, user_id: str = None, days=30) -> dict:
        """Return knowledge-base statistics.

        Covers daily new tag nodes (``entity_type == 'tag'``), documents and
        graph edges, plus totals. ``daily_knowledge_queries`` is an empty
        placeholder.
        """
        daily_new_tags = self._get_daily_stats(
            KGNode, KGNode.created_at, user_id, days,
            extra_filters=(KGNode.entity_type == 'tag',),
        )
        daily_documents = self._get_daily_stats(
            Document, Document.created_at, user_id, days
        )
        daily_tag_relations = self._get_daily_stats(
            KGEdge, KGEdge.created_at, user_id, days
        )

        return {
            "daily_new_tags": daily_new_tags,
            "daily_documents": daily_documents,
            "daily_knowledge_queries": [],
            "daily_tag_relations": daily_tag_relations,
            "totals": {
                "new_tags": sum(t["count"] for t in daily_new_tags),
                "documents": sum(d["count"] for d in daily_documents),
                "tag_relations": sum(r["count"] for r in daily_tag_relations),
            },
        }

    def get_kanban_stats(self, user_id: str = None, days=30) -> dict:
        """Return kanban statistics.

        Covers daily created tasks, daily completed tasks (status ``DONE``,
        bucketed by ``completed_at``), the per-day completion rate, the
        current number of ``TODO`` tasks, and totals.
        """
        daily_new_tasks = self._get_daily_stats(
            Task, Task.created_at, user_id, days
        )
        daily_completed_tasks = self._get_daily_stats(
            Task, Task.completed_at, user_id, days,
            extra_filters=(Task.status == TaskStatus.DONE,),
        )

        # Pending tasks are a point-in-time figure, not limited to the window.
        pending_query = self.db.query(func.count(Task.id)).filter(
            Task.status == TaskStatus.TODO
        )
        if user_id:
            pending_query = pending_query.filter(Task.user_id == user_id)
        current_pending_tasks = pending_query.scalar() or 0

        return {
            "daily_new_tasks": daily_new_tasks,
            "daily_completed_tasks": daily_completed_tasks,
            "daily_completion_rate": self._completion_rates(
                daily_new_tasks, daily_completed_tasks
            ),
            "current_pending_tasks": current_pending_tasks,
            "totals": {
                "new_tasks": sum(t["count"] for t in daily_new_tasks),
                "completed_tasks": sum(c["count"] for c in daily_completed_tasks),
            },
        }

    def get_community_stats(self, user_id: str = None, days=30) -> dict:
        """Return community (forum) statistics.

        Covers daily posts, daily replies and daily AI executions (posts with
        ``is_executed`` set, bucketed by ``updated_at``), plus totals.
        ``daily_agent_calls`` is an empty placeholder.
        """
        daily_posts = self._get_daily_stats(
            ForumPost, ForumPost.created_at, user_id, days
        )
        daily_replies = self._get_daily_stats(
            ForumReply, ForumReply.created_at, user_id, days
        )
        # NOTE(review): bucketing by updated_at presumably moves a post to a
        # newer day whenever it is modified after execution; confirm intent.
        daily_ai_executions = self._get_daily_stats(
            ForumPost, ForumPost.updated_at, user_id, days,
            extra_filters=(ForumPost.is_executed == True,),  # noqa: E712
        )

        return {
            "daily_posts": daily_posts,
            "daily_replies": daily_replies,
            "daily_ai_executions": daily_ai_executions,
            "daily_agent_calls": [],
            "totals": {
                "posts": sum(p["count"] for p in daily_posts),
                "replies": sum(r["count"] for r in daily_replies),
                "ai_executions": sum(a["count"] for a in daily_ai_executions),
            },
        }

    def get_personal_insights(self, user_id: str) -> dict:
        """Return personal insights for one user.

        Returns:
            A dict with:
            ``hourly_activity``: conversations started per hour of day, over
            all time, ordered by hour.
            ``top_tags``: the five tag paths with the most incoming
            ``has_tag`` edges.
            ``token_trend_percent``: change in assistant tokens this month
            versus last month, or 0 when last month had none.
            ``this_month_tokens`` / ``last_month_tokens``: the raw sums.
        """
        # Hourly activity. extract() renders EXTRACT(hour FROM ...);
        # func.extract() would emit a plain two-argument function call.
        hour_of_day = extract('hour', Conversation.created_at)
        hourly_rows = self.db.query(
            hour_of_day.label('hour'),
            func.count().label('total'),
        ).filter(
            Conversation.user_id == user_id
        ).group_by(hour_of_day).order_by(hour_of_day).all()
        hourly_activity = [
            {"hour": int(hour), "count": total} for hour, total in hourly_rows
        ]

        # Top tags, ranked by number of incoming has_tag edges.
        tag_path = KGNode.properties_["tag_path"].astext
        usage_count = func.count(KGEdge.id)
        tag_rows = self.db.query(
            tag_path.label('tag_path'),
            usage_count.label('usage_count'),
        ).join(
            KGEdge, KGEdge.target_id == KGNode.id
        ).filter(
            KGNode.user_id == user_id,
            KGNode.entity_type == 'tag',
            KGEdge.relation_type == 'has_tag',
        ).group_by(tag_path).order_by(usage_count.desc()).limit(5).all()
        top_tags = [
            {"tag_path": path, "usage_count": uses} for path, uses in tag_rows
        ]

        # Token trend: this calendar month so far versus all of last month.
        last_month_start, this_month_start = self._month_bounds(datetime.utcnow())
        this_month_tokens = self._sum_assistant_tokens(user_id, this_month_start)
        last_month_tokens = self._sum_assistant_tokens(
            user_id, last_month_start, this_month_start
        )

        token_trend_percent = 0
        if last_month_tokens > 0:
            token_trend_percent = round(
                (this_month_tokens - last_month_tokens) / last_month_tokens * 100, 1
            )

        return {
            "hourly_activity": hourly_activity,
            "top_tags": top_tags,
            "token_trend_percent": token_trend_percent,
            "this_month_tokens": this_month_tokens,
            "last_month_tokens": last_month_tokens,
        }
|