Files
JARVIS/backend/app/services/scheduler_service.py
WIN-JHFT4D3SIVT\caoxiaozhu 11160ec4d2 feat(memory): complete M.2-M.5 memory upgrade phases with tests
- M.2: ForgettingCurve, MemoryDecay, MemoryReinforcement (selective forgetting)
- M.3: DailyDigestGenerator, ReminderScheduler, ProactiveInformer (proactive reminders)
- M.4: MemoryExtractor with LLM-based memory extraction from conversations
- M.5: MemoryRecallInjector with token budget control for prompt injection
- All phases include comprehensive unit tests (109 tests passing)
- Updated checklist.md to mark all tasks complete
2026-04-05 14:09:51 +08:00

541 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
定时任务服务 - APScheduler 调度器
"""
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select, and_
from app.database import async_session
from app.models.task import Task
from app.models.forum import ForumPost
from app.models.knowledge_graph import KGNode
from app.services.agent_service import AgentService
from app.services.graph_service import GraphService
from app.config import settings
import logging
# Module logger shared by every scheduled job defined in this file.
logger: logging.Logger = logging.getLogger(__name__)
# The single process-wide scheduler; its default timezone is Asia/Shanghai.
scheduler: AsyncIOScheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
# ===================== M.2: forgetting-curve jobs =====================
async def daily_forgetting_check():
    """Run the memory-decay pass for every active user (intended for 03:00 daily).

    For each active user, calls ``process_memory_decay(db, user_id)`` and sums
    the ``"archived"`` and ``"deprioritized"`` counts it returns. Per the
    original design notes, memories with decay < 0.2 are archived and those
    with decay < 0.5 are deprioritized; NOTE(review): those thresholds live in
    memory_service and are not visible here.

    If one user fails, the error is logged, the session is rolled back, and
    the remaining users are still processed.
    """
    from app.models.user import User
    from app.services.memory_service import process_memory_decay

    logger.info("[Scheduler] 开始执行每日遗忘检查...")
    async with async_session() as db:
        # Load plain ids instead of User objects. A rollback expires every
        # loaded instance, and an AsyncSession cannot lazily reload an expired
        # attribute such as ``user.id``.
        result = await db.execute(select(User.id).where(User.is_active == True))  # noqa: E712
        user_ids = result.scalars().all()
        total_archived = 0
        total_deprioritized = 0
        for user_id in user_ids:
            try:
                decay_result = await process_memory_decay(db, user_id)
                total_archived += decay_result["archived"]
                total_deprioritized += decay_result["deprioritized"]
            except Exception as e:
                # Clear the failed transaction so the next user starts from a usable session.
                await db.rollback()
                logger.error(f"[Scheduler] 用户 {user_id} 遗忘检查失败: {e}")
        logger.info(
            f"[Scheduler] 每日遗忘检查完成,归档 {total_archived} 条,降权 {total_deprioritized}"
        )
async def weekly_reinforcement_task():
    """Lightly reinforce high-importance memories for every active user (intended for Monday 04:00).

    Calls ``process_weekly_reinforcement(db, user_id)`` for each active user
    and sums the counts it returns. If one user fails, the error is logged,
    the session is rolled back, and the remaining users are still processed.
    """
    from app.models.user import User
    from app.services.memory_service import process_weekly_reinforcement

    logger.info("[Scheduler] 开始执行每周强化任务...")
    async with async_session() as db:
        # Plain ids: a rollback expires ORM instances, and an AsyncSession
        # cannot lazily reload ``user.id`` afterwards.
        result = await db.execute(select(User.id).where(User.is_active == True))  # noqa: E712
        user_ids = result.scalars().all()
        total_reinforced = 0
        for user_id in user_ids:
            try:
                count = await process_weekly_reinforcement(db, user_id)
                total_reinforced += count
            except Exception as e:
                # Reset the failed transaction so later users are not affected.
                await db.rollback()
                logger.error(f"[Scheduler] 用户 {user_id} 强化任务失败: {e}")
        logger.info(f"[Scheduler] 每周强化完成,共强化 {total_reinforced} 条记忆")
# ===================== M.3: proactive reminder jobs =====================
async def daily_digest_generation():
    """Generate the daily digest for every active user (intended for 22:00).

    Calls ``DailyDigestGenerator.generate(db, user_id, target_date=...)`` for
    each active user and counts successful generations. The digest is not
    persisted here yet. If one user fails, the error is logged, the session is
    rolled back, and the remaining users are still processed.
    """
    from datetime import datetime, timedelta, timezone

    from app.models.user import User
    from app.services.memory.daily_digest import DailyDigestGenerator

    logger.info("[Scheduler] 开始执行每日摘要生成...")
    # The job is scheduled in Asia/Shanghai (fixed UTC+8, no DST). Use that
    # zone's calendar date instead of the host-local date.today(): at 22:00
    # Shanghai time, hosts at UTC+10 or further east are already on the next day.
    target_date = datetime.now(timezone(timedelta(hours=8))).date()
    async with async_session() as db:
        # Plain ids survive a rollback; expired ORM objects could not be
        # lazily reloaded on an AsyncSession.
        result = await db.execute(select(User.id).where(User.is_active == True))  # noqa: E712
        user_ids = result.scalars().all()
        generated = 0
        generator = DailyDigestGenerator()
        for user_id in user_ids:
            try:
                # TODO: persist the returned digest; for now only generation is counted.
                await generator.generate(db, user_id, target_date=target_date)
                generated += 1
            except Exception as e:
                await db.rollback()
                logger.error(f"[Scheduler] 用户 {user_id} 摘要生成失败: {e}")
        logger.info(f"[Scheduler] 每日摘要生成完成,共生成 {generated}")
async def reminder_check_task():
    """Mark due reminders as sent (intended to run every 15 minutes).

    Finds the distinct users that have reminders with status ``"pending"``.
    For each user, asks ``ReminderScheduler.get_due_reminders`` for the due
    reminders, marks each one with ``ReminderScheduler.mark_sent``, then
    commits that user's changes. If one user fails, the error is logged, the
    session is rolled back, and the remaining users are still processed.
    """
    from app.models.reminder import Reminder
    from app.services.memory.reminder_scheduler import ReminderScheduler

    logger.info("[Scheduler] 开始检查到期提醒...")
    async with async_session() as db:
        # Named to avoid confusion with the module-level APScheduler instance.
        reminder_scheduler = ReminderScheduler()
        # Query once per user, not once per pending reminder: the old loop
        # re-queried the same user for every pending reminder that user had.
        result = await db.execute(
            select(Reminder.user_id).where(Reminder.status == "pending").distinct()
        )
        user_ids = result.scalars().all()
        sent_count = 0
        for user_id in user_ids:
            try:
                due = await reminder_scheduler.get_due_reminders(db, user_id)
                # Snapshot ids first so later commits cannot expire objects we still need to read.
                due_ids = [due_reminder.id for due_reminder in due]
                for reminder_id in due_ids:
                    await reminder_scheduler.mark_sent(db, reminder_id)
                # Persist this user's updates; this is a no-op if mark_sent already commits.
                await db.commit()
            except Exception as e:
                await db.rollback()
                logger.error(f"[Scheduler] 用户 {user_id} 提醒检查失败: {e}")
                continue
            sent_count += len(due_ids)
        if sent_count > 0:
            logger.info(f"[Scheduler] 提醒检查完成,发送 {sent_count} 条提醒")
async def daily_task_analysis():
    """Report on yesterday's tasks and queue follow-ups (intended for 00:30 Asia/Shanghai).

    - Loads the tasks whose ``updated_at`` falls within the previous
      Asia/Shanghai calendar day.
    - Splits them into completed (``status == "done"``) and pending tasks.
    - Posts a markdown report to the forum with category "discussion".
    - Creates up to five "继续: ..." todo tasks for the pending tasks with the
      highest priority.
    """
    from datetime import UTC, datetime, time, timedelta, timezone

    logger.info("[Scheduler] 开始执行每日任务分析...")
    # Compute "yesterday" in the scheduler's zone (Asia/Shanghai = fixed
    # UTC+8, no DST). At 00:30 local time it is still the previous day in UTC,
    # so the old UTC-date logic labelled the report two local days back. It
    # also had no upper bound, which made the window about 40 hours long.
    local_tz = timezone(timedelta(hours=8))
    yesterday = datetime.now(local_tz).date() - timedelta(days=1)
    # NOTE(review): assumes updated_at is stored in UTC, as the original UTC-based query did — confirm.
    window_start = datetime.combine(yesterday, time.min, tzinfo=local_tz).astimezone(UTC)
    window_end = window_start + timedelta(days=1)
    day_label = yesterday.strftime("%Y-%m-%d")

    async with async_session() as db:
        result = await db.execute(
            select(Task).where(
                Task.updated_at >= window_start,
                Task.updated_at < window_end,
            )
        )
        tasks = result.scalars().all()
        completed = [t for t in tasks if t.status == "done"]
        pending = [t for t in tasks if t.status != "done"]
        # Sorted once and reused for both the report and the follow-up tasks.
        # NOTE(review): assumes priority values are mutually comparable and that larger means more urgent.
        top_pending = sorted(pending, key=lambda t: t.priority, reverse=True)[:5]

        suggestions = "\n".join(
            f"{rank}. {t.title}" for rank, t in enumerate(top_pending, start=1)
        ) or "无待处理任务"
        report = "\n".join(
            [
                f"## 每日任务报告 - {day_label}",
                "### 完成情况",
                f"- 总任务数: {len(tasks)}",
                f"- 已完成: {len(completed)}",
                f"- 未完成: {len(pending)}",
                "### 已完成任务",
                "\n".join(f"- {t.title}" for t in completed),
                "### 未完成任务",
                "\n".join(f"- {t.title} (优先级: {t.priority})" for t in pending),
                "### 建议",
                "根据未完成任务,建议明天优先处理:",
                suggestions,
            ]
        ) + "\n"

        # Publish the report to the forum.
        db.add(
            ForumPost(
                title=f"每日报告 - {day_label}",
                content=report,
                category="discussion",
            )
        )
        # Queue follow-up tasks for tomorrow.
        # NOTE(review): these follow-ups are themselves pending tasks, so a
        # follow-up left undone can be re-queued as "继续: 继续: ..." — confirm intended.
        for task in top_pending:
            db.add(
                Task(
                    title=f"继续: {task.title}",
                    description=f"昨日未完成任务,优先级: {task.priority}",
                    priority=task.priority,
                    status="todo",
                )
            )
        await db.commit()
        logger.info(f"[Scheduler] 每日任务分析完成,完成 {len(completed)} 个任务")
async def forum_scan_task():
    """Execute pending forum instruction posts through the agent (intended to run hourly).

    Loads up to 5 posts with category ``"instruction"`` that are not yet
    executed and sends each one to ``AgentService.chat_simple`` as a new
    conversation. Each successful post is then marked executed with the
    agent's response and committed on its own. A failure on one post
    therefore cannot undo the executed marks of earlier posts, which would
    make them run again on the next scan.
    """
    from sqlalchemy import update

    logger.info("[Scheduler] 开始扫描论坛指令...")
    async with async_session() as db:
        result = await db.execute(
            select(ForumPost)
            .where(
                ForumPost.category == "instruction",
                ForumPost.is_executed == False,  # noqa: E712
            )
            .limit(5)
        )
        # Snapshot the fields we need up front. Commits and rollbacks below
        # (and possibly inside chat_simple) expire ORM instances, and an
        # AsyncSession cannot lazily reload an expired attribute.
        pending = [
            (post.id, post.user_id, post.title, post.content)
            for post in result.scalars().all()
        ]
        if not pending:
            logger.info("[Scheduler] 暂无待执行指令")
            return
        agent_svc = AgentService(db)
        executed_count = 0
        for post_id, user_id, title, content in pending:
            try:
                # Ask the agent to interpret and carry out the instruction.
                _, _, response = await agent_svc.chat_simple(
                    user_id=user_id,
                    # Newline keeps the title from running into the body text.
                    message=f"请执行以下论坛指令: {title}\n{content}",
                    conversation_id=None,
                )
                await db.execute(
                    update(ForumPost)
                    .where(ForumPost.id == post_id)
                    .values(is_executed=True, executed_response=response)
                    .execution_options(synchronize_session=False)
                )
                await db.commit()
            except Exception as e:
                await db.rollback()
                logger.error(f"[Scheduler] 执行指令失败 {title}: {e}")
                continue
            executed_count += 1
            logger.info(f"[Scheduler] 执行指令: {title}")
        logger.info(f"[Scheduler] 论坛扫描完成,执行了 {executed_count} 个指令")
async def graph_rebuild_task():
    """Rebuild the knowledge graph for the ``"default"`` user (intended for 03:00 daily).

    Calls ``GraphService.build_graph(user_id="default", document_ids=None)``.
    Any exception is logged and swallowed so the scheduler keeps running.
    """
    logger.info("[Scheduler] 开始重建知识图谱...")
    async with async_session() as db:
        try:
            graph_svc = GraphService(db)
            # NOTE(review): no recency or document filter is applied here;
            # presumably document_ids=None means "all documents" — confirm in
            # GraphService.build_graph. Only the "default" user's graph is rebuilt.
            await graph_svc.build_graph(user_id="default", document_ids=None)
            logger.info("[Scheduler] 知识图谱重建完成")
        except Exception as e:
            logger.error(f"[Scheduler] 知识图谱重建失败: {e}")
async def tag_generation_task():
    """Incrementally tag recent content for every user that owns it (intended for 00:00 daily).

    Finds the distinct users that own knowledge-graph nodes of type
    conversation, document, or chunk. For each one, calls
    ``TagService.tag_incremental_content(user_id, days=1)`` and sums the
    ``"tagged"`` counts. If one user fails, the error is logged, the session
    is rolled back, and the remaining users are still processed.
    """
    import inspect

    from app.core.llm import get_llm_client
    from app.services.tag_service import TagService

    logger.info("[Scheduler] 开始执行每日标签生成...")
    async with async_session() as db:
        try:
            llm_client = get_llm_client()
            owners = await db.execute(
                select(KGNode.user_id)
                .distinct()
                .where(KGNode.entity_type.in_(["conversation", "document", "chunk"]))
            )
            user_ids = owners.scalars().all()
        except Exception as e:
            logger.error(f"[Scheduler] 每日标签生成失败: {e}")
            return

        total_tagged = 0
        for user_id in user_ids:
            try:
                tag_service = TagService(db, llm_client)
                outcome = tag_service.tag_incremental_content(user_id, days=1)
                # Await the result if the method is a coroutine; the original
                # call had no await and only worked for a synchronous method.
                # NOTE(review): if it is synchronous and calls the LLM, it blocks the event loop.
                if inspect.isawaitable(outcome):
                    outcome = await outcome
                total_tagged += outcome["tagged"]
            except Exception as e:
                # One user's failure no longer aborts tagging for everyone else.
                await db.rollback()
                logger.error(f"[Scheduler] 用户 {user_id} 每日标签生成失败: {e}")
        logger.info(f"[Scheduler] 每日标签生成完成,共标签化 {total_tagged} 个内容节点")
async def daily_todo_generation():
    """Generate today's todos for every active user (intended for 08:00 daily).

    Calls ``generate_daily_todos(user_id, db)`` for each active user. Per the
    original description, the todos come from yesterday's unfinished board
    tasks and yesterday's conversations (logic lives in todo_service). If one
    user fails, the error is logged, the session is rolled back, and the
    remaining users are still processed.
    """
    from app.models.user import User
    from app.services.todo_service import generate_daily_todos

    logger.info("[Scheduler] 开始执行每日待办生成...")
    async with async_session() as db:
        try:
            # Plain ids survive a rollback; expired User objects could not be
            # lazily reloaded on an AsyncSession.
            result = await db.execute(select(User.id).where(User.is_active == True))  # noqa: E712
            user_ids = result.scalars().all()
            for user_id in user_ids:
                try:
                    await generate_daily_todos(user_id, db)
                    logger.info(f"[Scheduler] 为用户 {user_id} 生成今日待办完成")
                except Exception as e:
                    await db.rollback()
                    logger.error(f"[Scheduler] 用户 {user_id} 定时生成待办失败: {e}")
            logger.info(f"[Scheduler] 每日待办生成完成,共处理 {len(user_ids)} 个用户")
        except Exception as e:
            logger.error(f"[Scheduler] 每日待办生成失败: {e}")
# ———— M.4: proactive memory extraction ————
async def check_idle_conversations():
    """Extract memories from conversations that have gone idle (M.4; intended every 30 minutes).

    - Selects up to 10 conversations updated within the last 24 hours whose
      newest message is more than 30 minutes old.
    - Loads each conversation's 10 most recent messages.
    - When there are at least two messages, runs ``MemoryExtractor`` on them
      and saves whatever it returns.

    A failure on one conversation is logged and rolled back, and that
    conversation is skipped.
    """
    from datetime import UTC, datetime, timedelta

    from sqlalchemy import func

    from app.models.conversation import Conversation, Message
    from app.services.memory.memory_extractor import MemoryExtractor

    logger.info("[Scheduler] 开始检查空闲对话...")
    async with async_session() as db:
        try:
            now = datetime.now(UTC)
            cutoff = now - timedelta(minutes=30)
            # Last message time per conversation, keeping only conversations
            # whose newest message is older than the idle cutoff.
            last_activity = (
                select(
                    Message.conversation_id,
                    func.max(Message.created_at).label("last_message"),
                )
                .group_by(Message.conversation_id)
                .having(func.max(Message.created_at) < cutoff)
            ).subquery()
            # NOTE(review): a conversation stays eligible for up to 24 hours,
            # so it may be re-extracted on every run. LIMIT without ORDER BY
            # also picks an arbitrary 10. Confirm that save_memories de-duplicates.
            result = await db.execute(
                select(Conversation.id, Conversation.user_id)
                .join(last_activity, Conversation.id == last_activity.c.conversation_id)
                .where(Conversation.updated_at >= now - timedelta(hours=24))
                .limit(10)
            )
            # Plain (id, user_id) rows instead of ORM objects: a rollback below
            # expires loaded instances, and an AsyncSession cannot lazily reload them.
            idle_conversations = result.all()
            extractor = MemoryExtractor()
            total_extracted = 0
            for conv_id, user_id in idle_conversations:
                try:
                    msg_result = await db.execute(
                        select(Message)
                        .where(Message.conversation_id == conv_id)
                        .order_by(Message.created_at.desc())
                        .limit(10)
                    )
                    # Fetched newest-first to get the last 10 messages, then
                    # reversed to chronological order.
                    # NOTE(review): assumes the extractor expects oldest-first — confirm.
                    messages = list(msg_result.scalars().all())[::-1]
                    if len(messages) < 2:
                        continue
                    new_memories = await extractor.extract_from_conversation(
                        db, user_id, conv_id, messages
                    )
                    if new_memories:
                        await extractor.save_memories(db, user_id, conv_id, new_memories)
                        total_extracted += len(new_memories)
                except Exception as e:
                    await db.rollback()
                    logger.warning(
                        f"[MemoryExtractor] Failed to process conversation {conv_id}: {e}"
                    )
            if total_extracted > 0:
                logger.info(f"[Scheduler] 空闲对话记忆提取完成,共提取 {total_extracted} 条记忆")
        except Exception as e:
            logger.error(f"[Scheduler] 空闲对话检查失败: {e}")
# ===================== Scheduler management =====================
def start_scheduler():
    """Register every periodic job and start the scheduler.

    If the scheduler is already running, logs a warning and returns without
    changes. Each job is added with ``replace_existing=True`` under a fixed
    id. Cron-based jobs are pinned to Asia/Shanghai.
    """
    if scheduler.running:
        logger.warning("[Scheduler] 调度器已在运行")
        return

    tz = "Asia/Shanghai"
    # (job callable, trigger, job id, display name), registered in this order.
    job_table = [
        # Daily task analysis at 00:30.
        (daily_task_analysis, CronTrigger(hour=0, minute=30, timezone=tz), "daily_task_analysis", "每日任务分析"),
        # Forum instruction scan every hour.
        (forum_scan_task, IntervalTrigger(hours=1), "forum_scan", "论坛指令扫描"),
        # Knowledge-graph rebuild at 03:00.
        (graph_rebuild_task, CronTrigger(hour=3, minute=0, timezone=tz), "graph_rebuild", "知识图谱重建"),
        # Tag generation at 00:00.
        (tag_generation_task, CronTrigger(hour=0, minute=0, timezone=tz), "tag_generation", "每日标签生成"),
        # Today's todos at 08:00.
        (daily_todo_generation, CronTrigger(hour=8, minute=0, timezone=tz), "daily_todo_generation", "每日待办生成"),
        # M.2 forgetting curve: decay check at 03:00.
        (daily_forgetting_check, CronTrigger(hour=3, minute=0, timezone=tz), "daily_forgetting_check", "每日遗忘检查"),
        # M.2 forgetting curve: reinforcement on Mondays at 04:00.
        (weekly_reinforcement_task, CronTrigger(day_of_week="mon", hour=4, minute=0, timezone=tz), "weekly_reinforcement", "每周记忆强化"),
        # M.3 proactive reminders: daily digest at 22:00.
        (daily_digest_generation, CronTrigger(hour=22, minute=0, timezone=tz), "daily_digest_generation", "每日摘要生成"),
        # M.3 proactive reminders: due-reminder check every 15 minutes.
        (reminder_check_task, IntervalTrigger(minutes=15), "reminder_check", "提醒检查"),
        # M.4 memory extraction from idle conversations every 30 minutes.
        (check_idle_conversations, IntervalTrigger(minutes=30), "check_idle_conversations", "空闲对话记忆提取"),
    ]
    for job_callable, job_trigger, job_id, job_name in job_table:
        scheduler.add_job(
            job_callable,
            job_trigger,
            id=job_id,
            name=job_name,
            replace_existing=True,
        )

    scheduler.start()
    logger.info("[Scheduler] 定时任务调度器已启动")
def stop_scheduler():
    """Shut the scheduler down without waiting for running jobs; does nothing if it is not running."""
    if not scheduler.running:
        return
    scheduler.shutdown(wait=False)
    logger.info("[Scheduler] 定时任务调度器已停止")
def get_scheduler_status() -> dict:
    """Describe the scheduler state and its jobs.

    Returns ``{"status": "stopped", "jobs": []}`` when the scheduler is not
    running. Otherwise returns ``"running"`` plus one dict per job with its
    id, name, and next run time as a string (``None`` when no next run is set).
    """
    if scheduler.running:
        return {
            "status": "running",
            "jobs": [
                {
                    "id": job.id,
                    "name": job.name,
                    "next_run": None if not job.next_run_time else str(job.next_run_time),
                }
                for job in scheduler.get_jobs()
            ],
        }
    return {"status": "stopped", "jobs": []}