Normalize uploaded documents into structured markdown, add clearer parser errors for missing dependencies, and cover the ingestion flow with backend tests. This also replaces deprecated UTC timestamp helpers in the touched backend paths so the knowledge pipeline stays warning-free. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
292 lines
9.2 KiB
Python
292 lines
9.2 KiB
Python
"""
|
|
定时任务服务 - APScheduler 调度器
|
|
"""
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
from sqlalchemy import select, and_
|
|
from app.database import async_session
|
|
from app.models.task import Task
|
|
from app.models.forum import ForumPost
|
|
from app.models.knowledge_graph import KGNode
|
|
from app.services.agent_service import AgentService
|
|
from app.services.graph_service import GraphService
|
|
from app.config import settings
|
|
import logging
|
|
|
|
# Module-wide logger used by every scheduled job below.
logger = logging.getLogger(__name__)

# Single shared AsyncIOScheduler; its default timezone is Asia/Shanghai.
# Jobs are registered on it by start_scheduler().
scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
|
|
|
|
|
|
# ===================== 定时任务函数 =====================
|
|
|
|
async def daily_task_analysis():
    """Summarise the previous day's tasks, publish a report and queue follow-ups.

    - Loads every ``Task`` whose ``updated_at`` falls on the previous calendar
      day, with the day determined in the scheduler's timezone.
    - Posts a markdown report to the forum (``category="discussion"``).
    - Adds a ``"todo"`` follow-up task for each of the (up to) five
      highest-priority unfinished tasks.
    - Commits the report and the follow-ups in a single commit.
    """
    from datetime import datetime, timedelta

    logger.info("[Scheduler] 开始执行每日任务分析...")

    # This job fires at 00:30 in the scheduler's timezone (Asia/Shanghai). At
    # that moment the UTC calendar date is still the previous day, so taking
    # "yesterday" from a UTC clock pointed at the day *before* yesterday.
    # Derive the dates from the scheduler's own timezone instead.
    today = datetime.now(scheduler.timezone).date()
    yesterday = today - timedelta(days=1)
    report_day = yesterday.strftime('%Y-%m-%d')

    async with async_session() as db:
        # Half-open window [yesterday, today). A lower bound alone would also
        # count anything updated after midnight.
        # NOTE(review): the date bounds are compared against ``updated_at`` as
        # stored; confirm which timezone that column is written in.
        result = await db.execute(
            select(Task).where(
                Task.updated_at >= yesterday,
                Task.updated_at < today,
            )
        )
        tasks = result.scalars().all()

        completed = [t for t in tasks if t.status == "done"]
        pending = [t for t in tasks if t.status != "done"]
        # Sorted once, then reused for both the report's suggestions and the
        # follow-up tasks.
        top_pending = sorted(pending, key=lambda t: t.priority, reverse=True)[:5]

        completed_md = "\n".join(f"- {t.title}" for t in completed) or "无"
        pending_md = "\n".join(
            f"- {t.title} (优先级: {t.priority})" for t in pending
        ) or "无"
        suggestions_md = "\n".join(
            f"{rank}. {t.title}" for rank, t in enumerate(top_pending, start=1)
        ) or "无待处理任务"

        report = f"""## 每日任务报告 - {report_day}

### 完成情况
- 总任务数: {len(tasks)}
- 已完成: {len(completed)}
- 未完成: {len(pending)}

### 已完成任务
{completed_md}

### 未完成任务
{pending_md}

### 建议
根据未完成任务,建议明天优先处理:
{suggestions_md}
"""

        # Publish the report to the forum.
        db.add(ForumPost(
            title=f"每日报告 - {report_day}",
            content=report,
            category="discussion",
        ))

        # Queue follow-up tasks for the highest-priority unfinished work.
        # NOTE(review): the follow-ups are themselves "todo" tasks and presumably
        # get today's ``updated_at``. If they stay unfinished, tomorrow's run
        # will pick them up and create "继续: 继续: ..." chains. Confirm this
        # is intended.
        for task in top_pending:
            db.add(Task(
                title=f"继续: {task.title}",
                description=f"昨日未完成任务,优先级: {task.priority}",
                priority=task.priority,
                status="todo",
            ))

        await db.commit()
        logger.info(f"[Scheduler] 每日任务分析完成,完成 {len(completed)} 个任务")
|
|
|
|
|
|
async def forum_scan_task():
    """Run pending forum instructions through the agent.

    - Loads up to five posts with ``category == "instruction"`` that are not
      yet marked executed.
    - Sends each post to ``AgentService.chat_simple`` on behalf of the post's
      author.
    - On success, stores the agent's reply on the post and marks it executed.
    - On failure, logs the error with its traceback and leaves the post
      unexecuted, so the next scan selects it again.

    All flag updates are committed together after the loop.
    """
    logger.info("[Scheduler] 开始扫描论坛指令...")

    async with async_session() as db:
        result = await db.execute(
            select(ForumPost).where(
                ForumPost.category == "instruction",
                ForumPost.is_executed == False,  # noqa: E712 - builds a SQL comparison
            ).limit(5)
        )
        posts = result.scalars().all()

        if not posts:
            logger.info("[Scheduler] 暂无待执行指令")
            return

        agent_svc = AgentService(db)
        executed_count = 0

        for post in posts:
            try:
                # Let the agent interpret and carry out the instruction. Only the
                # reply is needed; the conversation/message ids are discarded.
                _conv_id, _msg_id, response = await agent_svc.chat_simple(
                    user_id=post.user_id,
                    message=f"请执行以下论坛指令: {post.title}。{post.content}",
                    conversation_id=None,
                )
            except Exception as e:
                logger.exception(f"[Scheduler] 执行指令失败 {post.title}: {e}")
                continue

            post.is_executed = True
            post.executed_response = response
            executed_count += 1
            logger.info(f"[Scheduler] 执行指令: {post.title}")

        # NOTE(review): one commit covers every post. If chat_simple leaves the
        # session needing a rollback (e.g. after a failed flush), this commit
        # raises. The successfully executed posts then stay unmarked and would
        # run again on the next scan. Confirm chat_simple's transaction handling.
        await db.commit()

    logger.info(f"[Scheduler] 论坛扫描完成,执行了 {executed_count} 个指令")
|
|
|
|
|
|
async def graph_rebuild_task():
    """Rebuild the knowledge graph for the ``"default"`` user.

    Calls ``GraphService.build_graph(user_id="default", document_ids=None)``.
    Any exception is logged with its traceback and swallowed, so a failure
    does not propagate to the scheduler.
    """
    logger.info("[Scheduler] 开始重建知识图谱...")

    async with async_session() as db:
        try:
            graph_svc = GraphService(db)
            # NOTE(review): no document filter is applied here
            # (document_ids=None), presumably meaning "all documents". Nothing
            # limits this run to recently changed documents. Confirm against
            # GraphService.build_graph, and whether only the "default" user
            # should be rebuilt.
            await graph_svc.build_graph(user_id="default", document_ids=None)
            logger.info("[Scheduler] 知识图谱重建完成")
        except Exception as e:
            logger.exception(f"[Scheduler] 知识图谱重建失败: {e}")
|
|
|
|
|
|
async def tag_generation_task():
    """Generate tags for the last day's content, for every user that has some.

    Collects each distinct ``user_id`` owning a ``conversation``, ``document``
    or ``chunk`` knowledge-graph node. For each one it calls
    ``TagService.tag_incremental_content(user_id, days=1)`` and sums the
    returned ``"tagged"`` counts.

    A failure for one user is logged, its session changes are rolled back,
    and the remaining users are still processed. Any other failure is logged
    and swallowed.
    """
    from app.services.tag_service import TagService
    from app.core.llm import get_llm_client

    logger.info("[Scheduler] 开始执行每日标签生成...")

    async with async_session() as db:
        try:
            llm_client = get_llm_client()

            rows = await db.execute(
                select(KGNode.user_id).distinct().where(
                    KGNode.entity_type.in_(["conversation", "document", "chunk"])
                )
            )
            user_ids = rows.scalars().all()

            total_tagged = 0
            for user_id in user_ids:
                try:
                    tag_service = TagService(db, llm_client)
                    # NOTE(review): called without ``await``, so this is presumably
                    # a synchronous method. If it makes blocking LLM calls, it
                    # stalls the event loop for the duration. Confirm.
                    stats = tag_service.tag_incremental_content(user_id, days=1)
                    total_tagged += stats["tagged"]
                except Exception as e:
                    logger.exception(f"[Scheduler] 用户 {user_id} 标签生成失败: {e}")
                    # Discard this user's partial changes so the session stays
                    # usable for the next user.
                    await db.rollback()

            logger.info(f"[Scheduler] 每日标签生成完成,共标签化 {total_tagged} 个内容节点")
        except Exception as e:
            logger.exception(f"[Scheduler] 每日标签生成失败: {e}")
|
|
|
|
|
|
async def daily_todo_generation():
    """Generate today's to-do items for every active user.

    Calls ``generate_daily_todos(user_id, db)`` for each user whose
    ``is_active`` flag is true. According to the original description, the
    to-dos come from the previous day's unfinished board tasks and from an
    analysis of the previous day's conversations.

    A failure for one user is logged, that user's session changes are rolled
    back, and the remaining users are still processed. Any other failure is
    logged and swallowed.
    """
    from app.models.user import User
    from app.services.todo_service import generate_daily_todos

    logger.info("[Scheduler] 开始执行每日待办生成...")

    async with async_session() as db:
        try:
            # Select plain ids instead of ORM objects. A rollback (or a commit
            # with expire_on_commit) expires loaded instances, and reading an
            # expired attribute through an AsyncSession triggers implicit I/O.
            result = await db.execute(
                select(User.id).where(User.is_active == True)  # noqa: E712 - SQL comparison
            )
            user_ids = result.scalars().all()

            for user_id in user_ids:
                try:
                    await generate_daily_todos(user_id, db)
                    logger.info(f"[Scheduler] 为用户 {user_id} 生成今日待办完成")
                except Exception as e:
                    logger.exception(f"[Scheduler] 用户 {user_id} 定时生成待办失败: {e}")
                    # Drop the failed user's partial work so the next user starts
                    # from a clean session.
                    await db.rollback()

            logger.info(f"[Scheduler] 每日待办生成完成,共处理 {len(user_ids)} 个用户")
        except Exception as e:
            logger.exception(f"[Scheduler] 每日待办生成失败: {e}")
|
|
|
|
|
|
# ===================== 调度器管理 =====================
|
|
|
|
def start_scheduler():
    """Register all periodic jobs on the shared scheduler and start it.

    If the scheduler is already running, only a warning is logged. Every job
    is added with ``replace_existing=True`` under a fixed id, so repeated
    registration replaces the existing job instead of duplicating it.
    """
    if scheduler.running:
        logger.warning("[Scheduler] 调度器已在运行")
        return

    tz = "Asia/Shanghai"
    # (job function, trigger, job id, display name), in registration order.
    job_specs = [
        # Daily at 00:30: analyse the previous day's tasks.
        (daily_task_analysis, CronTrigger(hour=0, minute=30, timezone=tz),
         "daily_task_analysis", "每日任务分析"),
        # Every hour: scan the forum for instruction posts.
        (forum_scan_task, IntervalTrigger(hours=1),
         "forum_scan", "论坛指令扫描"),
        # Daily at 03:00: rebuild the knowledge graph.
        (graph_rebuild_task, CronTrigger(hour=3, minute=0, timezone=tz),
         "graph_rebuild", "知识图谱重建"),
        # Daily at 00:00: generate tags.
        (tag_generation_task, CronTrigger(hour=0, minute=0, timezone=tz),
         "tag_generation", "每日标签生成"),
        # Daily at 08:00: generate today's to-dos.
        (daily_todo_generation, CronTrigger(hour=8, minute=0, timezone=tz),
         "daily_todo_generation", "每日待办生成"),
    ]

    for job_func, trigger, job_id, job_name in job_specs:
        scheduler.add_job(
            job_func,
            trigger,
            id=job_id,
            name=job_name,
            replace_existing=True,
        )

    scheduler.start()
    logger.info("[Scheduler] 定时任务调度器已启动")
|
|
|
|
|
|
def stop_scheduler():
    """Shut the shared scheduler down if it is running; otherwise do nothing.

    Uses ``wait=False``, so the call does not block waiting for running jobs.
    """
    if not scheduler.running:
        return
    scheduler.shutdown(wait=False)
    logger.info("[Scheduler] 定时任务调度器已停止")
|
|
|
|
|
|
def get_scheduler_status() -> dict:
    """Return a snapshot of the scheduler's state.

    When the scheduler is not running: ``{"status": "stopped", "jobs": []}``.
    Otherwise: ``{"status": "running", "jobs": [...]}``, where each job entry
    has ``id``, ``name`` and ``next_run``. ``next_run`` is the stringified
    next fire time, or ``None`` if the job has none.
    """
    if not scheduler.running:
        return {"status": "stopped", "jobs": []}

    def _describe(job) -> dict:
        upcoming = job.next_run_time
        return {
            "id": job.id,
            "name": job.name,
            "next_run": str(upcoming) if upcoming else None,
        }

    job_list = [_describe(job) for job in scheduler.get_jobs()]
    return {"status": "running", "jobs": job_list}
|