Files
JARVIS/backend/app/services/stats_service.py
DESKTOP-72TV0V4\caoxiaozhu 3ee825aa90 Add MinerU document ingestion support
Normalize uploaded documents into structured markdown, add clearer parser
errors for missing dependencies, and cover the ingestion flow with
backend tests. This also replaces deprecated UTC timestamp helpers in
the touched backend paths so the knowledge pipeline stays warning-free.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 13:42:16 +08:00

296 lines
12 KiB
Python

import time
from datetime import UTC, datetime, timedelta

from sqlalchemy import and_, extract, func, select
from sqlalchemy.orm import Session

try:
    import psutil
except ModuleNotFoundError:  # pragma: no cover - optional runtime dependency fallback
    psutil = None

from app.models.conversation import Conversation, Message
from app.models.document import Document
from app.models.forum import ForumPost, ForumReply
from app.models.knowledge_graph import KGEdge, KGNode
from app.models.task import Task, TaskStatus
class StatsService:
def __init__(self, db: Session):
self.db = db
def get_system_health(self) -> dict:
"""获取系统健康指标"""
if psutil is None:
return {
"uptime_seconds": 0,
"cpu_percent": 0.0,
"memory_used_mb": 0.0,
"memory_total_mb": 0.0,
"memory_percent": 0.0,
"disk_used_gb": 0.0,
"disk_total_gb": 0.0,
"disk_percent": 0.0,
"active_users_24h": 0,
}
uptime_seconds = int(time.time() - psutil.boot_time())
cpu_percent = psutil.cpu_percent(interval=0.1)
mem = psutil.virtual_memory()
disk = psutil.disk_usage('/')
return {
"uptime_seconds": uptime_seconds,
"cpu_percent": cpu_percent,
"memory_used_mb": round(mem.used / (1024 * 1024), 1),
"memory_total_mb": round(mem.total / (1024 * 1024), 1),
"memory_percent": mem.percent,
"disk_used_gb": round(disk.used / (1024 * 1024 * 1024), 1),
"disk_total_gb": round(disk.total / (1024 * 1024 * 1024), 1),
"disk_percent": disk.percent,
"active_users_24h": 0, # 需要 User 表的 updated_at
}
def _get_daily_stats(self, model, date_column, user_id=None, days=30) -> list:
"""通用每日统计查询"""
cutoff = datetime.now(UTC) - timedelta(days=days)
query = self.db.query(
func.date(date_column).label('date'),
func.count().label('count')
).filter(date_column >= cutoff)
if user_id and hasattr(model, 'user_id'):
query = query.filter(model.user_id == user_id)
query = query.group_by(func.date(date_column)).order_by(func.date(date_column))
results = query.all()
return [{"date": str(r.date), "count": r.count} for r in results]
def get_conversation_stats(self, user_id: str = None, days=30) -> dict:
"""获取对话统计数据"""
cutoff = datetime.now(UTC) - timedelta(days=days)
daily_conversations = self._get_daily_stats(
Conversation, Conversation.created_at, user_id, days
)
daily_messages = self._get_daily_stats(
Message, Message.created_at, user_id, days
)
# Daily tokens
input_query = self.db.query(
func.date(Message.created_at).label('date'),
func.coalesce(func.sum(Message.tokens_used), 0).label('tokens')
).filter(
Message.created_at >= cutoff,
Message.role == 'user'
)
if user_id:
input_query = input_query.join(Conversation).filter(Conversation.user_id == user_id)
input_results = input_query.group_by(func.date(Message.created_at)).all()
output_query = self.db.query(
func.date(Message.created_at).label('date'),
func.coalesce(func.sum(Message.tokens_used), 0).label('tokens')
).filter(
Message.created_at >= cutoff,
Message.role == 'assistant'
)
if user_id:
output_query = output_query.join(Conversation).filter(Conversation.user_id == user_id)
output_results = output_query.group_by(func.date(Message.created_at)).all()
daily_input_tokens = [{"date": str(r.date), "input_tokens": r.tokens} for r in input_results]
daily_output_tokens = [{"date": str(r.date), "output_tokens": r.tokens} for r in output_results]
return {
"daily_conversations": daily_conversations,
"daily_messages": daily_messages,
"daily_input_tokens": daily_input_tokens,
"daily_output_tokens": daily_output_tokens,
"totals": {
"conversations": sum(c["count"] for c in daily_conversations),
"messages": sum(m["count"] for m in daily_messages),
"input_tokens": sum(t["input_tokens"] for t in daily_input_tokens),
"output_tokens": sum(t["output_tokens"] for t in daily_output_tokens),
}
}
def get_knowledge_stats(self, user_id: str = None, days=30) -> dict:
"""获取知识库统计数据"""
cutoff = datetime.now(UTC) - timedelta(days=days)
# New tags
tag_query = self.db.query(
func.date(KGNode.created_at).label('date'),
func.count().label('count')
).filter(
KGNode.created_at >= cutoff,
KGNode.entity_type == 'tag'
)
if user_id:
tag_query = tag_query.filter(KGNode.user_id == user_id)
tag_results = tag_query.group_by(func.date(KGNode.created_at)).all()
daily_new_tags = [{"date": str(r.date), "count": r.count} for r in tag_results]
daily_documents = self._get_daily_stats(
Document, Document.created_at, user_id, days
)
daily_tag_relations = self._get_daily_stats(
KGEdge, KGEdge.created_at, user_id, days
)
return {
"daily_new_tags": daily_new_tags,
"daily_documents": daily_documents,
"daily_knowledge_queries": [],
"daily_tag_relations": daily_tag_relations,
"totals": {
"new_tags": sum(t["count"] for t in daily_new_tags),
"documents": sum(d["count"] for d in daily_documents),
"tag_relations": sum(r["count"] for r in daily_tag_relations),
}
}
def get_kanban_stats(self, user_id: str = None, days=30) -> dict:
"""获取看板统计数据"""
daily_new_tasks = self._get_daily_stats(
Task, Task.created_at, user_id, days
)
# Completed tasks
completed_query = self.db.query(
func.date(Task.completed_at).label('date'),
func.count().label('count')
).filter(
Task.completed_at >= datetime.now(UTC) - timedelta(days=days),
Task.status == TaskStatus.DONE
)
if user_id:
completed_query = completed_query.filter(Task.user_id == user_id)
completed_results = completed_query.group_by(func.date(Task.completed_at)).all()
daily_completed_tasks = [{"date": str(r.date), "count": r.count} for r in completed_results]
# Current pending
pending_query = self.db.query(func.count(Task.id)).filter(Task.status == TaskStatus.TODO)
if user_id:
pending_query = pending_query.filter(Task.user_id == user_id)
current_pending_tasks = pending_query.scalar() or 0
# Completion rate
daily_new_dict = {d["date"]: d["count"] for d in daily_new_tasks}
daily_completed_dict = {d["date"]: d["count"] for d in daily_completed_tasks}
all_dates = set(daily_new_dict.keys()) | set(daily_completed_dict.keys())
daily_completion_rate = []
for date in sorted(all_dates):
new = daily_new_dict.get(date, 0)
completed = daily_completed_dict.get(date, 0)
rate = (completed / new * 100) if new > 0 else 0
daily_completion_rate.append({"date": date, "rate": round(rate, 1)})
return {
"daily_new_tasks": daily_new_tasks,
"daily_completed_tasks": daily_completed_tasks,
"daily_completion_rate": daily_completion_rate,
"current_pending_tasks": current_pending_tasks,
"totals": {
"new_tasks": sum(t["count"] for t in daily_new_tasks),
"completed_tasks": sum(c["count"] for c in daily_completed_tasks),
}
}
def get_community_stats(self, user_id: str = None, days=30) -> dict:
"""获取社区统计数据"""
daily_posts = self._get_daily_stats(
ForumPost, ForumPost.created_at, user_id, days
)
daily_replies = self._get_daily_stats(
ForumReply, ForumReply.created_at, user_id, days
)
# AI executions
ai_query = self.db.query(
func.date(ForumPost.updated_at).label('date'),
func.count().label('count')
).filter(
ForumPost.updated_at >= datetime.now(UTC) - timedelta(days=days),
ForumPost.is_executed == True
)
if user_id:
ai_query = ai_query.filter(ForumPost.user_id == user_id)
ai_results = ai_query.group_by(func.date(ForumPost.updated_at)).all()
daily_ai_executions = [{"date": str(r.date), "count": r.count} for r in ai_results]
return {
"daily_posts": daily_posts,
"daily_replies": daily_replies,
"daily_ai_executions": daily_ai_executions,
"daily_agent_calls": [],
"totals": {
"posts": sum(p["count"] for p in daily_posts),
"replies": sum(r["count"] for r in daily_replies),
"ai_executions": sum(a["count"] for a in daily_ai_executions),
}
}
def get_personal_insights(self, user_id: str) -> dict:
"""获取个人洞察"""
# Hourly activity
hourly_query = self.db.query(
func.extract('hour', Conversation.created_at).label('hour'),
func.count().label('count')
).filter(Conversation.user_id == user_id).group_by(
func.extract('hour', Conversation.created_at)
)
hourly_results = hourly_query.all()
hourly_activity = [{"hour": int(r.hour), "count": r.count} for r in hourly_results]
# Top tags
tag_query = self.db.query(
KGNode.properties_["tag_path"].astext.label('tag_path'),
func.count(KGEdge.id).label('usage_count')
).join(
KGEdge, KGEdge.target_id == KGNode.id
).filter(
KGNode.user_id == user_id,
KGNode.entity_type == 'tag',
KGEdge.relation_type == 'has_tag'
).group_by(
KGNode.properties_["tag_path"].astext
).order_by(func.count(KGEdge.id).desc()).limit(5)
top_tags = [{"tag_path": r.tag_path, "usage_count": r.usage_count} for r in tag_query.all()]
# Token trend
now = datetime.now(UTC)
this_month_start = datetime(now.year, now.month, 1)
last_month_end = this_month_start - timedelta(days=1)
last_month_start = datetime(last_month_end.year, last_month_end.month, 1)
this_month_tokens = self.db.query(
func.coalesce(func.sum(Message.tokens_used), 0)
).join(Conversation).filter(
Conversation.user_id == user_id,
Message.created_at >= this_month_start,
Message.role == 'assistant'
).scalar() or 0
last_month_tokens = self.db.query(
func.coalesce(func.sum(Message.tokens_used), 0)
).join(Conversation).filter(
Conversation.user_id == user_id,
Message.created_at >= last_month_start,
Message.created_at < this_month_start,
Message.role == 'assistant'
).scalar() or 0
token_trend_percent = 0
if last_month_tokens > 0:
token_trend_percent = round((this_month_tokens - last_month_tokens) / last_month_tokens * 100, 1)
return {
"hourly_activity": hourly_activity,
"top_tags": top_tags,
"token_trend_percent": token_trend_percent,
"this_month_tokens": this_month_tokens,
"last_month_tokens": last_month_tokens,
}