Files
JARVIS/backend/app/services/scheduler_service.py

292 lines
9.2 KiB
Python
Raw Normal View History

2026-03-21 10:13:29 +08:00
"""
定时任务服务 - APScheduler 调度器
"""
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select, and_
from app.database import async_session
from app.models.task import Task
from app.models.forum import ForumPost
from app.models.knowledge_graph import KGNode
from app.services.agent_service import AgentService
from app.services.graph_service import GraphService
from app.config import settings
import logging
logger = logging.getLogger(__name__)
scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
# ===================== 定时任务函数 =====================
async def daily_task_analysis():
"""
每日凌晨任务分析
- 分析前一天完成的任务
- 生成每日报告
- 创建次日计划建议
"""
logger.info("[Scheduler] 开始执行每日任务分析...")
async with async_session() as db:
from datetime import datetime, timedelta
yesterday = datetime.utcnow().date() - timedelta(days=1)
# 统计昨日任务完成情况
result = await db.execute(
select(Task).where(Task.updated_at >= yesterday)
)
tasks = result.scalars().all()
completed = [t for t in tasks if t.status == "done"]
pending = [t for t in tasks if t.status != "done"]
report = f"""## 每日任务报告 - {yesterday.strftime('%Y-%m-%d')}
### 完成情况
- 总任务数: {len(tasks)}
- 已完成: {len(completed)}
- 未完成: {len(pending)}
### 已完成任务
{chr(10).join([f"- {t.title}" for t in completed]) or ""}
### 未完成任务
{chr(10).join([f"- {t.title} (优先级: {t.priority})" for t in pending]) or ""}
### 建议
根据未完成任务建议明天优先处理:
{chr(10).join([f"{i+1}. {t.title}" for i, t in enumerate(sorted(pending, key=lambda x: x.priority, reverse=True)[:5])]) or "无待处理任务"}
"""
# 发布到论坛
from app.models.forum import ForumPost
post = ForumPost(
title=f"每日报告 - {yesterday.strftime('%Y-%m-%d')}",
content=report,
category="discussion",
)
db.add(post)
# 创建明日计划建议任务
for i, task in enumerate(sorted(pending, key=lambda x: x.priority, reverse=True)[:5]):
suggestion = Task(
title=f"继续: {task.title}",
description=f"昨日未完成任务,优先级: {task.priority}",
priority=task.priority,
status="todo",
)
db.add(suggestion)
await db.commit()
logger.info(f"[Scheduler] 每日任务分析完成,完成 {len(completed)} 个任务")
async def forum_scan_task():
"""
论坛扫描任务
- 扫描所有指令类帖子
- 识别可执行指令
- AI自动执行
"""
logger.info("[Scheduler] 开始扫描论坛指令...")
async with async_session() as db:
from sqlalchemy import select
result = await db.execute(
select(ForumPost).where(
ForumPost.category == "instruction",
ForumPost.is_executed == False,
).limit(5)
)
posts = result.scalars().all()
if not posts:
logger.info("[Scheduler] 暂无待执行指令")
return
agent_svc = AgentService(db)
executed_count = 0
for post in posts:
try:
# 让 Agent 分析并执行指令
conv_id, msg_id, response = await agent_svc.chat_simple(
user_id=post.user_id,
message=f"请执行以下论坛指令: {post.title}{post.content}",
conversation_id=None,
)
post.is_executed = True
post.executed_response = response
executed_count += 1
logger.info(f"[Scheduler] 执行指令: {post.title}")
except Exception as e:
logger.error(f"[Scheduler] 执行指令失败 {post.title}: {e}")
await db.commit()
logger.info(f"[Scheduler] 论坛扫描完成,执行了 {executed_count} 个指令")
async def graph_rebuild_task():
"""
知识图谱增量重建任务
- 扫描新增/更新的文档
- 更新图谱节点和边
"""
logger.info("[Scheduler] 开始重建知识图谱...")
async with async_session() as db:
try:
graph_svc = GraphService(db)
# 只处理最近7天有活动的文档
await graph_svc.build_graph(user_id="default", document_ids=None)
logger.info("[Scheduler] 知识图谱重建完成")
except Exception as e:
logger.error(f"[Scheduler] 知识图谱重建失败: {e}")
async def tag_generation_task():
"""
每日凌晨 00:00 增量标签生成任务
"""
from app.services.tag_service import TagService
from app.core.llm import get_llm_client
from sqlalchemy import select
logger.info("[Scheduler] 开始执行每日标签生成...")
async with async_session() as db:
try:
llm_client = get_llm_client()
tag_service = TagService(db, llm_client)
result = await db.execute(
select(KGNode.user_id).distinct().where(
KGNode.entity_type.in_(["conversation", "document", "chunk"])
)
)
user_ids = result.scalars().all()
total_tagged = 0
for user_id in user_ids:
sync_tag_service = TagService(db, llm_client)
result = sync_tag_service.tag_incremental_content(user_id, days=1)
total_tagged += result["tagged"]
logger.info(f"[Scheduler] 每日标签生成完成,共标签化 {total_tagged} 个内容节点")
except Exception as e:
logger.error(f"[Scheduler] 每日标签生成失败: {e}")
async def daily_todo_generation():
"""
每天早上 08:00 为所有活跃用户生成待办
- 来自前一天未完成的看板任务
- 来自前一天对话记录分析
"""
from app.models.user import User
from app.services.todo_service import generate_daily_todos
from sqlalchemy import select
logger.info("[Scheduler] 开始执行每日待办生成...")
async with async_session() as db:
try:
result = await db.execute(select(User).where(User.is_active == True))
users = result.scalars().all()
for user in users:
try:
await generate_daily_todos(user.id, db)
logger.info(f"[Scheduler] 为用户 {user.id} 生成今日待办完成")
except Exception as e:
logger.error(f"[Scheduler] 用户 {user.id} 定时生成待办失败: {e}")
logger.info(f"[Scheduler] 每日待办生成完成,共处理 {len(users)} 个用户")
except Exception as e:
logger.error(f"[Scheduler] 每日待办生成失败: {e}")
# ===================== 调度器管理 =====================
def start_scheduler():
"""启动调度器,注册所有定时任务"""
if scheduler.running:
logger.warning("[Scheduler] 调度器已在运行")
return
# 每日凌晨 00:30 执行任务分析
scheduler.add_job(
daily_task_analysis,
CronTrigger(hour=0, minute=30, timezone="Asia/Shanghai"),
id="daily_task_analysis",
name="每日任务分析",
replace_existing=True,
)
# 每小时扫描论坛指令
scheduler.add_job(
forum_scan_task,
IntervalTrigger(hours=1),
id="forum_scan",
name="论坛指令扫描",
replace_existing=True,
)
# 每天凌晨 3:00 重建图谱
scheduler.add_job(
graph_rebuild_task,
CronTrigger(hour=3, minute=0, timezone="Asia/Shanghai"),
id="graph_rebuild",
name="知识图谱重建",
replace_existing=True,
)
# 每天凌晨 00:00 生成标签
scheduler.add_job(
tag_generation_task,
CronTrigger(hour=0, minute=0, timezone="Asia/Shanghai"),
id="tag_generation",
name="每日标签生成",
replace_existing=True,
)
# 每天早上 08:00 生成今日待办
scheduler.add_job(
daily_todo_generation,
CronTrigger(hour=8, minute=0, timezone="Asia/Shanghai"),
id="daily_todo_generation",
name="每日待办生成",
replace_existing=True,
)
scheduler.start()
logger.info("[Scheduler] 定时任务调度器已启动")
def stop_scheduler():
"""停止调度器"""
if scheduler.running:
scheduler.shutdown(wait=False)
logger.info("[Scheduler] 定时任务调度器已停止")
def get_scheduler_status() -> dict:
"""获取调度器状态"""
if not scheduler.running:
return {"status": "stopped", "jobs": []}
jobs = []
for job in scheduler.get_jobs():
jobs.append({
"id": job.id,
"name": job.name,
"next_run": str(job.next_run_time) if job.next_run_time else None,
})
return {"status": "running", "jobs": jobs}