""" 定时任务服务 - APScheduler 调度器 """ from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.interval import IntervalTrigger from sqlalchemy import select, and_ from app.database import async_session from app.models.task import Task from app.models.forum import ForumPost from app.models.knowledge_graph import KGNode from app.services.agent_service import AgentService from app.services.graph_service import GraphService from app.config import settings import logging logger = logging.getLogger(__name__) scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") # ===================== M.2: 遗忘曲线任务 ===================== async def daily_forgetting_check(): """ 每日遗忘检查 (03:00) - 计算所有记忆的 decay_score - 归档 decay < 0.2 的记忆 - 降权 decay < 0.5 的记忆 """ from app.services.memory_service import process_memory_decay from sqlalchemy import select logger.info("[Scheduler] 开始执行每日遗忘检查...") async with async_session() as db: from app.models.user import User result = await db.execute(select(User).where(User.is_active == True)) users = result.scalars().all() total_archived = 0 total_deprioritized = 0 for user in users: try: decay_result = await process_memory_decay(db, user.id) total_archived += decay_result["archived"] total_deprioritized += decay_result["deprioritized"] except Exception as e: logger.error(f"[Scheduler] 用户 {user.id} 遗忘检查失败: {e}") logger.info( f"[Scheduler] 每日遗忘检查完成,归档 {total_archived} 条,降权 {total_deprioritized} 条" ) async def weekly_reinforcement_task(): """ 每周自动强化 (周一 04:00) 对 high 重要性记忆做轻量强化 """ from app.services.memory_service import process_weekly_reinforcement from sqlalchemy import select logger.info("[Scheduler] 开始执行每周强化任务...") async with async_session() as db: from app.models.user import User result = await db.execute(select(User).where(User.is_active == True)) users = result.scalars().all() total_reinforced = 0 for user in users: try: count = await process_weekly_reinforcement(db, user.id) total_reinforced += count except Exception as e: logger.error(f"[Scheduler] 用户 {user.id} 强化任务失败: {e}") logger.info(f"[Scheduler] 每周强化完成,共强化 {total_reinforced} 条记忆") # ===================== M.3: 主动提醒任务 ===================== async def daily_digest_generation(): """ 每日摘要生成 (22:00) 为所有活跃用户生成每日摘要 """ from app.services.memory.daily_digest import DailyDigestGenerator from sqlalchemy import select logger.info("[Scheduler] 开始执行每日摘要生成...") async with async_session() as db: from app.models.user import User result = await db.execute(select(User).where(User.is_active == True)) users = result.scalars().all() generated = 0 generator = DailyDigestGenerator() for user in users: try: from datetime import date digest = await generator.generate(db, user.id, target_date=date.today()) # In production, would save digest to database generated += 1 except Exception as e: logger.error(f"[Scheduler] 用户 {user.id} 摘要生成失败: {e}") logger.info(f"[Scheduler] 每日摘要生成完成,共生成 {generated} 条") async def reminder_check_task(): """ 提醒检查 (每15分钟) 检查到期的提醒并标记为 sent """ from sqlalchemy import select logger.info("[Scheduler] 开始检查到期提醒...") async with async_session() as db: from app.models.reminder import Reminder from app.services.memory.reminder_scheduler import ReminderScheduler scheduler = ReminderScheduler() result = await db.execute( select(Reminder).where( Reminder.status == "pending", ) ) reminders = result.scalars().all() sent_count = 0 for reminder in reminders: try: due = await scheduler.get_due_reminders(db, reminder.user_id) for due_reminder in due: await scheduler.mark_sent(db, due_reminder.id) sent_count += 1 except Exception as e: logger.error(f"[Scheduler] 提醒检查失败: {e}") if sent_count > 0: logger.info(f"[Scheduler] 提醒检查完成,发送 {sent_count} 条提醒") async def daily_task_analysis(): """ 每日凌晨任务分析 - 分析前一天完成的任务 - 生成每日报告 - 创建次日计划建议 """ logger.info("[Scheduler] 开始执行每日任务分析...") async with async_session() as db: from datetime import UTC, datetime, timedelta yesterday = datetime.now(UTC).date() - timedelta(days=1) # 统计昨日任务完成情况 result = await db.execute(select(Task).where(Task.updated_at >= yesterday)) tasks = result.scalars().all() completed = [t for t in tasks if t.status == "done"] pending = [t for t in tasks if t.status != "done"] report = f"""## 每日任务报告 - {yesterday.strftime("%Y-%m-%d")} ### 完成情况 - 总任务数: {len(tasks)} - 已完成: {len(completed)} - 未完成: {len(pending)} ### 已完成任务 {chr(10).join([f"- {t.title}" for t in completed]) or "无"} ### 未完成任务 {chr(10).join([f"- {t.title} (优先级: {t.priority})" for t in pending]) or "无"} ### 建议 根据未完成任务,建议明天优先处理: {chr(10).join([f"{i + 1}. {t.title}" for i, t in enumerate(sorted(pending, key=lambda x: x.priority, reverse=True)[:5])]) or "无待处理任务"} """ # 发布到论坛 from app.models.forum import ForumPost post = ForumPost( title=f"每日报告 - {yesterday.strftime('%Y-%m-%d')}", content=report, category="discussion", ) db.add(post) # 创建明日计划建议任务 for i, task in enumerate(sorted(pending, key=lambda x: x.priority, reverse=True)[:5]): suggestion = Task( title=f"继续: {task.title}", description=f"昨日未完成任务,优先级: {task.priority}", priority=task.priority, status="todo", ) db.add(suggestion) await db.commit() logger.info(f"[Scheduler] 每日任务分析完成,完成 {len(completed)} 个任务") async def forum_scan_task(): """ 论坛扫描任务 - 扫描所有指令类帖子 - 识别可执行指令 - AI自动执行 """ logger.info("[Scheduler] 开始扫描论坛指令...") async with async_session() as db: from sqlalchemy import select result = await db.execute( select(ForumPost) .where( ForumPost.category == "instruction", ForumPost.is_executed == False, ) .limit(5) ) posts = result.scalars().all() if not posts: logger.info("[Scheduler] 暂无待执行指令") return agent_svc = AgentService(db) executed_count = 0 for post in posts: try: # 让 Agent 分析并执行指令 conv_id, msg_id, response = await agent_svc.chat_simple( user_id=post.user_id, message=f"请执行以下论坛指令: {post.title}。{post.content}", conversation_id=None, ) post.is_executed = True post.executed_response = response executed_count += 1 logger.info(f"[Scheduler] 执行指令: {post.title}") except Exception as e: logger.error(f"[Scheduler] 执行指令失败 {post.title}: {e}") await db.commit() logger.info(f"[Scheduler] 论坛扫描完成,执行了 {executed_count} 个指令") async def graph_rebuild_task(): """ 知识图谱增量重建任务 - 扫描新增/更新的文档 - 更新图谱节点和边 """ logger.info("[Scheduler] 开始重建知识图谱...") async with async_session() as db: try: graph_svc = GraphService(db) # 只处理最近7天有活动的文档 await graph_svc.build_graph(user_id="default", document_ids=None) logger.info("[Scheduler] 知识图谱重建完成") except Exception as e: logger.error(f"[Scheduler] 知识图谱重建失败: {e}") async def tag_generation_task(): """ 每日凌晨 00:00 增量标签生成任务 """ from app.services.tag_service import TagService from app.core.llm import get_llm_client from sqlalchemy import select logger.info("[Scheduler] 开始执行每日标签生成...") async with async_session() as db: try: llm_client = get_llm_client() tag_service = TagService(db, llm_client) result = await db.execute( select(KGNode.user_id) .distinct() .where(KGNode.entity_type.in_(["conversation", "document", "chunk"])) ) user_ids = result.scalars().all() total_tagged = 0 for user_id in user_ids: sync_tag_service = TagService(db, llm_client) result = sync_tag_service.tag_incremental_content(user_id, days=1) total_tagged += result["tagged"] logger.info(f"[Scheduler] 每日标签生成完成,共标签化 {total_tagged} 个内容节点") except Exception as e: logger.error(f"[Scheduler] 每日标签生成失败: {e}") async def daily_todo_generation(): """ 每天早上 08:00 为所有活跃用户生成待办 - 来自前一天未完成的看板任务 - 来自前一天对话记录分析 """ from app.models.user import User from app.services.todo_service import generate_daily_todos from sqlalchemy import select logger.info("[Scheduler] 开始执行每日待办生成...") async with async_session() as db: try: result = await db.execute(select(User).where(User.is_active == True)) users = result.scalars().all() for user in users: try: await generate_daily_todos(user.id, db) logger.info(f"[Scheduler] 为用户 {user.id} 生成今日待办完成") except Exception as e: logger.error(f"[Scheduler] 用户 {user.id} 定时生成待办失败: {e}") logger.info(f"[Scheduler] 每日待办生成完成,共处理 {len(users)} 个用户") except Exception as e: logger.error(f"[Scheduler] 每日待办生成失败: {e}") # ———— M.4: 主动记忆提取 ———— async def check_idle_conversations(): """ 每30分钟检查空闲超过30分钟的对话,提取记忆 M.4: 主动记忆提取 """ from datetime import timedelta, datetime, UTC from app.models.conversation import Conversation, Message from app.services.memory.memory_extractor import MemoryExtractor logger.info("[Scheduler] 开始检查空闲对话...") async with async_session() as db: try: # Find conversations idle > 30 minutes (no recent messages) cutoff = datetime.now(UTC) - timedelta(minutes=30) # Subquery to find last message time per conversation from sqlalchemy import func subq = ( select(Message.conversation_id, func.max(Message.created_at).label("last_message")) .group_by(Message.conversation_id) .having(func.max(Message.created_at) < cutoff) ).subquery() result = await db.execute( select(Conversation) .join(subq, Conversation.id == subq.c.conversation_id) .where(Conversation.updated_at >= datetime.now(UTC) - timedelta(hours=24)) .limit(10) ) idle_conversations = list(result.scalars().all()) extractor = MemoryExtractor() total_extracted = 0 for conv in idle_conversations: try: # Get conversation messages msg_result = await db.execute( select(Message) .where(Message.conversation_id == conv.id) .order_by(Message.created_at.desc()) .limit(10) ) messages = list(msg_result.scalars().all()) if len(messages) >= 2: new_memories = await extractor.extract_from_conversation( db, conv.user_id, conv.id, messages ) if new_memories: await extractor.save_memories(db, conv.user_id, conv.id, new_memories) total_extracted += len(new_memories) except Exception as e: logger.warning( f"[MemoryExtractor] Failed to process conversation {conv.id}: {e}" ) if total_extracted > 0: logger.info(f"[Scheduler] 空闲对话记忆提取完成,共提取 {total_extracted} 条记忆") except Exception as e: logger.error(f"[Scheduler] 空闲对话检查失败: {e}") # ===================== 调度器管理 ===================== def start_scheduler(): """启动调度器,注册所有定时任务""" if scheduler.running: logger.warning("[Scheduler] 调度器已在运行") return # 每日凌晨 00:30 执行任务分析 scheduler.add_job( daily_task_analysis, CronTrigger(hour=0, minute=30, timezone="Asia/Shanghai"), id="daily_task_analysis", name="每日任务分析", replace_existing=True, ) # 每小时扫描论坛指令 scheduler.add_job( forum_scan_task, IntervalTrigger(hours=1), id="forum_scan", name="论坛指令扫描", replace_existing=True, ) # 每天凌晨 3:00 重建图谱 scheduler.add_job( graph_rebuild_task, CronTrigger(hour=3, minute=0, timezone="Asia/Shanghai"), id="graph_rebuild", name="知识图谱重建", replace_existing=True, ) # 每天凌晨 00:00 生成标签 scheduler.add_job( tag_generation_task, CronTrigger(hour=0, minute=0, timezone="Asia/Shanghai"), id="tag_generation", name="每日标签生成", replace_existing=True, ) # 每天早上 08:00 生成今日待办 scheduler.add_job( daily_todo_generation, CronTrigger(hour=8, minute=0, timezone="Asia/Shanghai"), id="daily_todo_generation", name="每日待办生成", replace_existing=True, ) # ———— M.2: 遗忘曲线系统 ———— # 每天凌晨 03:00 执行遗忘检查 scheduler.add_job( daily_forgetting_check, CronTrigger(hour=3, minute=0, timezone="Asia/Shanghai"), id="daily_forgetting_check", name="每日遗忘检查", replace_existing=True, ) # 每周一 04:00 执行自动强化 scheduler.add_job( weekly_reinforcement_task, CronTrigger(day_of_week="mon", hour=4, minute=0, timezone="Asia/Shanghai"), id="weekly_reinforcement", name="每周记忆强化", replace_existing=True, ) # ———— M.3: 主动提醒系统 ———— # 每天 22:00 生成每日摘要 scheduler.add_job( daily_digest_generation, CronTrigger(hour=22, minute=0, timezone="Asia/Shanghai"), id="daily_digest_generation", name="每日摘要生成", replace_existing=True, ) # 每15分钟检查到期提醒 scheduler.add_job( reminder_check_task, IntervalTrigger(minutes=15), id="reminder_check", name="提醒检查", replace_existing=True, ) # ———— M.4: 主动记忆提取 ———— # 每30分钟检查空闲对话并提取记忆 scheduler.add_job( check_idle_conversations, IntervalTrigger(minutes=30), id="check_idle_conversations", name="空闲对话记忆提取", replace_existing=True, ) scheduler.start() logger.info("[Scheduler] 定时任务调度器已启动") def stop_scheduler(): """停止调度器""" if scheduler.running: scheduler.shutdown(wait=False) logger.info("[Scheduler] 定时任务调度器已停止") def get_scheduler_status() -> dict: """获取调度器状态""" if not scheduler.running: return {"status": "stopped", "jobs": []} jobs = [] for job in scheduler.get_jobs(): jobs.append( { "id": job.id, "name": job.name, "next_run": str(job.next_run_time) if job.next_run_time else None, } ) return {"status": "running", "jobs": jobs}