diff --git a/backend/app/agents/learning/__init__.py b/backend/app/agents/learning/__init__.py new file mode 100644 index 0000000..6b65fc4 --- /dev/null +++ b/backend/app/agents/learning/__init__.py @@ -0,0 +1,19 @@ +from app.agents.learning.jobs import persist_retrospective, schedule_retrospective_job +from app.agents.learning.pattern_miner import LearningPatternMiner +from app.agents.learning.retrospector import build_session_retrospective +from app.agents.learning.session_search import SessionRetrospectiveSearch +from app.agents.learning.signal_extractor import RetrospectiveSignalExtractor +from app.agents.learning.skill_candidate_builder import SkillCandidateBuilder +from app.agents.learning.store import LearningArtifactStore, SessionRetrospectiveStore + +__all__ = [ + "build_session_retrospective", + "LearningArtifactStore", + "LearningPatternMiner", + "persist_retrospective", + "RetrospectiveSignalExtractor", + "schedule_retrospective_job", + "SessionRetrospectiveSearch", + "SessionRetrospectiveStore", + "SkillCandidateBuilder", +] diff --git a/backend/app/agents/learning/audit.py b/backend/app/agents/learning/audit.py new file mode 100644 index 0000000..4f71e10 --- /dev/null +++ b/backend/app/agents/learning/audit.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from app.agents.schemas.learning import LearningDecision, SessionRetrospective + + +def build_learning_audit_entry(retrospective: SessionRetrospective) -> dict[str, object]: + decision = retrospective.learning_decision + return { + "retrospective_id": retrospective.retrospective_id, + "decision": decision.decision if isinstance(decision, LearningDecision) else None, + "explanation": decision.explanation if isinstance(decision, LearningDecision) else None, + "signal_count": len(retrospective.learning_signals), + "pattern_count": len(retrospective.pattern_candidates), + "skill_candidate_count": len(retrospective.skill_candidates), + "outcome": retrospective.outcome, + } diff --git a/backend/app/agents/learning/bridge.py b/backend/app/agents/learning/bridge.py new file mode 100644 index 0000000..a5e6a25 --- /dev/null +++ b/backend/app/agents/learning/bridge.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +from app.agents.schemas.learning import LearningDecision, LearningSignal + + +def route_learning_signal(signal: LearningSignal) -> str: + if signal.signal_type == "preference": + return "memory" + if signal.signal_type in {"workflow", "decomposition", "tool_success"}: + return "skill" + if signal.signal_type == "correction": + return "audit" + return "memory" + + +def build_learning_bridge_summary(signals: list[LearningSignal]) -> dict[str, object]: + memory_count = 0 + skill_count = 0 + audit_count = 0 + + for signal in signals: + route = route_learning_signal(signal) + if route == "memory": + memory_count += 1 + elif route == "skill": + skill_count += 1 + else: + audit_count += 1 + + return { + "memory_signal_count": memory_count, + "skill_signal_count": skill_count, + "audit_signal_count": audit_count, + } + + +def update_learning_decision_with_bridge( + decision: LearningDecision, + signals: list[LearningSignal], +) -> LearningDecision: + bridge_summary = build_learning_bridge_summary(signals) + metadata = dict(decision.metadata or {}) + metadata["bridge"] = bridge_summary + decision.metadata = metadata + return decision diff --git a/backend/app/agents/learning/jobs.py b/backend/app/agents/learning/jobs.py new file mode 100644 index 0000000..b1a6476 --- /dev/null +++ b/backend/app/agents/learning/jobs.py @@ -0,0 +1,222 @@ +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +from app.config import settings +from app.database import async_session +from app.agents.learning.bridge import update_learning_decision_with_bridge +from app.agents.learning.pattern_miner import LearningPatternMiner +from app.agents.learning.audit import build_learning_audit_entry +from app.agents.learning.retrospector import build_session_retrospective +from app.agents.learning.signal_extractor import RetrospectiveSignalExtractor +from app.agents.learning.skill_candidate_builder import SkillCandidateBuilder +from app.agents.learning.store import LearningArtifactStore, SessionRetrospectiveStore +from app.agents.schemas.learning import LearningDecision, SessionRetrospective +from app.agents.skills.evaluator import SkillPromotionEvaluator + +logger = logging.getLogger(__name__) + + +def _enrich_retrospective(retrospective: SessionRetrospective) -> SessionRetrospective: + signals = RetrospectiveSignalExtractor().extract(retrospective) + patterns = LearningPatternMiner().mine(signals) + skill_candidates = SkillCandidateBuilder().build(patterns) + + decision = LearningDecision( + decision="create_candidate" if skill_candidates else ("reinforce_memory" if signals else "defer"), + explanation=( + "Retrospective produced reusable candidate skills." + if skill_candidates + else "Retrospective only reinforces memory-like evidence." + if signals + else "No stable signal was extracted from this retrospective." + ), + evidence_refs=(skill_candidates[0].evidence_refs if skill_candidates else retrospective.evidence_refs[:3]), + metadata={ + "signal_count": len(signals), + "pattern_count": len(patterns), + "skill_candidate_count": len(skill_candidates), + }, + ) + + retrospective.learning_signals = signals + retrospective.pattern_candidates = patterns + retrospective.skill_candidates = skill_candidates + retrospective.learning_decision = update_learning_decision_with_bridge(decision, signals) + return retrospective + + +def _build_learning_artifacts(retrospective: SessionRetrospective) -> list[dict[str, object]]: + artifacts: list[dict[str, object]] = [] + for signal in retrospective.learning_signals: + artifacts.append( + { + "artifact_type": "signal", + "artifact_key": signal.signal_type, + "summary_text": signal.explanation or signal.signal_type, + "payload": signal.model_dump(mode="json"), + } + ) + for pattern in retrospective.pattern_candidates: + artifacts.append( + { + "artifact_type": "pattern_candidate", + "artifact_key": pattern.pattern_type, + "summary_text": pattern.description, + "payload": pattern.model_dump(mode="json"), + } + ) + for candidate in retrospective.skill_candidates: + artifacts.append( + { + "artifact_type": "skill_candidate", + "artifact_key": candidate.name, + "summary_text": candidate.summary, + "payload": candidate.model_dump(mode="json"), + } + ) + if retrospective.learning_decision is not None: + artifacts.append( + { + "artifact_type": "learning_decision", + "artifact_key": retrospective.learning_decision.decision, + "summary_text": retrospective.learning_decision.explanation, + "payload": retrospective.learning_decision.model_dump(mode="json"), + } + ) + artifacts.append( + { + "artifact_type": "learning_audit", + "artifact_key": retrospective.retrospective_id or "retrospective", + "summary_text": retrospective.learning_decision.explanation, + "payload": build_learning_audit_entry(retrospective), + } + ) + return artifacts + + +def _build_lifecycle_artifacts(decisions: list) -> list[dict[str, object]]: + artifacts: list[dict[str, object]] = [] + for decision in decisions: + artifacts.append( + { + "artifact_type": "skill_lifecycle_decision", + "artifact_key": getattr(decision, "skill_name", None) or "skill", + "summary_text": getattr(decision, "reason", ""), + "payload": decision.model_dump(mode="json"), + } + ) + return artifacts + + +async def persist_retrospective( + *, + user_id: str, + conversation_id: str, + request_message_id: str | None, + response_message_id: str | None, + query_text: str, + final_response: str | None, + state: dict[str, Any] | None, +) -> None: + retrospective = build_session_retrospective( + request_id=response_message_id or request_message_id or conversation_id, + session_id=conversation_id, + user_query=query_text, + state=state, + runtime_context={"user_id": user_id}, + ) + retrospective = _enrich_retrospective(retrospective) + + async with async_session() as session: + saved = await SessionRetrospectiveStore(session).save(retrospective) + lifecycle_decisions = [] + if settings.ENABLE_SKILL_PROMOTION: + lifecycle_decisions = await SkillPromotionEvaluator(session).sync_retrospective( + user_id=user_id, + retrospective=retrospective, + ) + if settings.ENABLE_LEARNING_SIGNALS: + await LearningArtifactStore(session).save_batch( + user_id=user_id, + conversation_id=conversation_id, + retrospective_id=saved.id, + artifacts=[ + *_build_learning_artifacts(retrospective), + *_build_lifecycle_artifacts(lifecycle_decisions), + ], + ) + + +def schedule_retrospective_job(**kwargs) -> asyncio.Task[None] | None: + if not settings.ENABLE_RETROSPECTIVE: + return None + try: + task = asyncio.create_task(persist_retrospective(**kwargs)) + except RuntimeError: + return None + + def _handle_completion(done_task: asyncio.Task[None]) -> None: + try: + done_task.result() + except Exception: + logger.exception("retrospective_job_failed") + + task.add_done_callback(_handle_completion) + return task + + +def schedule_retrospective_learning_event( + *, + user_id: str, + conversation_id: str, + retrospective: SessionRetrospective, + session_factory=async_session, +) -> asyncio.Task[None] | None: + if not settings.ENABLE_RETROSPECTIVE: + return None + + async def _persist_existing() -> None: + async with session_factory() as session: + enriched = _enrich_retrospective(retrospective) + saved = await SessionRetrospectiveStore(session).save(enriched) + lifecycle_decisions = [] + if settings.ENABLE_SKILL_PROMOTION: + lifecycle_decisions = await SkillPromotionEvaluator(session).sync_retrospective( + user_id=user_id, + retrospective=enriched, + ) + if settings.ENABLE_LEARNING_SIGNALS: + await LearningArtifactStore(session).save_batch( + user_id=user_id, + conversation_id=conversation_id, + retrospective_id=saved.id, + artifacts=[ + *_build_learning_artifacts(enriched), + *_build_lifecycle_artifacts(lifecycle_decisions), + ], + ) + + try: + task = asyncio.create_task(_persist_existing()) + except RuntimeError: + return None + + def _handle_completion(done_task: asyncio.Task[None]) -> None: + try: + done_task.result() + except Exception: + logger.exception( + "retrospective_learning_event_failed", + extra={ + "details": { + "user_id": user_id, + "conversation_id": conversation_id, + } + }, + ) + + task.add_done_callback(_handle_completion) + return task diff --git a/backend/app/agents/learning/pattern_miner.py b/backend/app/agents/learning/pattern_miner.py new file mode 100644 index 0000000..4bc1dd0 --- /dev/null +++ b/backend/app/agents/learning/pattern_miner.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from uuid import uuid4 + +from app.agents.schemas.learning import LearningSignal, PatternCandidate + + +class LearningPatternMiner: + def mine(self, signals: list[LearningSignal]) -> list[PatternCandidate]: + patterns: list[PatternCandidate] = [] + + for signal in signals: + if signal.signal_type not in {"workflow", "decomposition", "preference"}: + continue + + description = self._build_description(signal) + patterns.append( + PatternCandidate( + pattern_id=f"pattern-{uuid4().hex[:10]}", + pattern_type=signal.signal_type, + description=description, + confidence=signal.confidence, + evidence_refs=signal.evidence_refs[:4], + ) + ) + + return patterns + + @staticmethod + def _build_description(signal: LearningSignal) -> str: + payload = signal.payload or {} + if signal.signal_type == "workflow": + task_type = payload.get("task_type") or "general" + execution_mode = payload.get("execution_mode") or "direct" + return f"Completed {task_type} requests worked under {execution_mode} execution." + if signal.signal_type == "decomposition": + task_count = payload.get("task_count") or 0 + return f"Requests with {task_count} concrete task refs benefit from structured decomposition." + if signal.signal_type == "preference": + preference = payload.get("preference") or "structured response" + return f"User preference repeatedly points to {preference}." + return signal.explanation or signal.signal_type diff --git a/backend/app/agents/learning/retrospector.py b/backend/app/agents/learning/retrospector.py new file mode 100644 index 0000000..71230c9 --- /dev/null +++ b/backend/app/agents/learning/retrospector.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +from typing import Any + +from app.agents.schemas.learning import SessionRetrospective + + +def _classify_task_type(query_text: str) -> str: + normalized = (query_text or "").lower() + if any(token in normalized for token in ("总结", "分析", "对比", "report", "analyze")): + return "analysis" + if any(token in normalized for token in ("安排", "提醒", "日程", "todo", "task")): + return "planning_or_execution" + if any(token in normalized for token in ("文档", "资料", "年报", "search", "查")): + return "retrieval" + return "general" + + +def build_session_retrospective( + *, + request_id: str, + session_id: str, + user_query: str, + state: dict[str, Any] | None, + runtime_context: dict[str, Any] | None = None, +) -> SessionRetrospective: + state = state or {} + if hasattr(runtime_context, "model_dump"): + runtime_context = runtime_context.model_dump(mode="json") + runtime_context = runtime_context or {} + skill_shortlist = state.get("skill_shortlist") or [] + used_skill_names = [ + item.get("skill_name") + for item in skill_shortlist + if isinstance(item, dict) and item.get("skill_name") + ] + + task_refs = [] + for task in (state.get("completed_tasks") or [])[:4]: + if isinstance(task, dict): + task_refs.append( + { + "task_id": task.get("task_id"), + "title": task.get("title"), + "status": task.get("status"), + } + ) + + event_refs = [] + for event in (state.get("event_trace") or [])[:8]: + if isinstance(event, dict): + event_refs.append( + { + "event_type": event.get("event_type"), + "task_id": event.get("task_id"), + "agent_id": event.get("agent_id"), + } + ) + + verification_evidence = [] + for evidence in (state.get("verification_evidence") or [])[:6]: + if isinstance(evidence, dict): + verification_evidence.append(evidence) + + verification_status = state.get("verification_status") + execution_mode = state.get("execution_mode") + primary_agent = state.get("current_agent") or "master" + retrospective_shortlist = state.get("retrospective_shortlist") or [] + + summary_parts = [ + f"本轮请求按 {execution_mode or 'unknown'} 模式处理", + f"主要负责 agent 为 {primary_agent}", + ] + if verification_status: + summary_parts.append(f"验证结果为 {verification_status}") + if used_skill_names: + summary_parts.append(f"命中技能候选 {', '.join(used_skill_names[:3])}") + if retrospective_shortlist: + summary_parts.append(f"参考了 {len(retrospective_shortlist)} 条历史复盘") + + final_response = state.get("final_response") + outcome = "completed" if final_response else "failed" + if not final_response and verification_status == "passed": + outcome = "completed" + if final_response and verification_status == "skipped": + outcome = "partial" + + return SessionRetrospective( + retrospective_id=request_id, + user_id=str(runtime_context.get("user_id") or ""), + conversation_id=session_id, + response_message_id=request_id, + query_text=user_query, + final_response=final_response, + summary=";".join(summary_parts) + "。", + task_type=_classify_task_type(user_query), + execution_mode=execution_mode, + primary_agent=primary_agent, + verification_status=verification_status, + verification_summary=state.get("verification_summary"), + used_skill_names=used_skill_names, + evidence_refs=verification_evidence, + task_refs=task_refs, + event_refs=event_refs, + context_snapshot={ + "runtime_request_context": runtime_context, + "recommended_runtime_mode": runtime_context.get("recommended_runtime_mode"), + "parallel_worthiness": state.get("parallel_worthiness"), + "retrospective_shortlist_count": len(retrospective_shortlist), + "scheduled_subtask_count": len(state.get("scheduled_subtasks") or []), + "merge_report": dict(state.get("merge_report") or {}), + "verification_report": dict(state.get("verification_report") or {}), + }, + outcome=outcome, + ) diff --git a/backend/app/agents/learning/session_search.py b/backend/app/agents/learning/session_search.py new file mode 100644 index 0000000..ee3902e --- /dev/null +++ b/backend/app/agents/learning/session_search.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +from app.agents.schemas.learning import SessionRetrospective +from app.agents.skills.matcher import score_text_match +from app.agents.learning.store import SessionRetrospectiveStore +from app.config import settings + + +class SessionRetrospectiveSearch: + def __init__(self, db): + self.db = db + + async def shortlist( + self, + *, + user_id: str, + query_text: str, + conversation_id: str | None = None, + task_type: str | None = None, + skill_name: str | None = None, + limit: int = 3, + ) -> list[SessionRetrospective]: + records = await SessionRetrospectiveStore(self.db).list_recent(user_id=user_id, limit=25) + scored: list[tuple[float, SessionRetrospective]] = [] + + for record in records: + if task_type and record.task_type != task_type: + continue + if skill_name and skill_name not in (record.skill_names or []): + continue + score, _matched_terms = score_text_match( + query_text, + record.query_text, + record.summary_text, + " ".join(record.skill_names or []), + ) + if conversation_id and record.conversation_id == conversation_id: + score = min(1.0, score + 0.1) + if score <= 0: + continue + + payload = dict(record.payload or {}) + payload["retrospective_id"] = record.id + retrospective = SessionRetrospective.model_validate(payload) + scored.append((score, retrospective)) + + scored.sort(key=lambda item: item[0], reverse=True) + return [item for _score, item in scored[:limit]] + + +async def search_recent_retrospectives( + db, + *, + user_id: str, + query: str, + conversation_id: str | None = None, + task_type: str | None = None, + skill_name: str | None = None, + limit: int = 3, +) -> list[SessionRetrospective]: + if not settings.ENABLE_SESSION_RETROSPECTIVE_SEARCH: + return [] + return await SessionRetrospectiveSearch(db).shortlist( + user_id=user_id, + query_text=query, + conversation_id=conversation_id, + task_type=task_type, + skill_name=skill_name, + limit=limit, + ) + + +def summarize_retrospective(retrospective: SessionRetrospective) -> dict[str, object]: + verification_status = retrospective.verification_status or retrospective.outcome + success_score = 1.0 if verification_status == "passed" else 0.6 if verification_status == "skipped" else 0.2 + reusable_patterns = [] + if retrospective.used_skill_names: + reusable_patterns.append("skill_shortlist_hit") + if retrospective.execution_mode: + reusable_patterns.append(f"mode:{retrospective.execution_mode}") + + avoid_patterns = [] + if retrospective.outcome == "failed": + avoid_patterns.append("failed_outcome") + + return { + "retrospective_id": retrospective.retrospective_id, + "task_type": retrospective.task_type, + "request_summary": retrospective.query_text[:120], + "summary": retrospective.summary, + "execution_mode": retrospective.execution_mode, + "success_score": round(success_score, 2), + "reusable_patterns": reusable_patterns, + "avoid_patterns": avoid_patterns, + } diff --git a/backend/app/agents/learning/signal_extractor.py b/backend/app/agents/learning/signal_extractor.py new file mode 100644 index 0000000..252ae93 --- /dev/null +++ b/backend/app/agents/learning/signal_extractor.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +from app.agents.schemas.learning import LearningSignal, SessionRetrospective + + +class RetrospectiveSignalExtractor: + def extract(self, retrospective: SessionRetrospective) -> list[LearningSignal]: + signals: list[LearningSignal] = [] + + if retrospective.outcome == "completed": + signals.append( + LearningSignal( + signal_type="workflow", + confidence=0.8, + evidence_refs=retrospective.evidence_refs[:3], + explanation="Completed runs can be mined as workflow hints later.", + payload={ + "task_type": retrospective.task_type, + "execution_mode": retrospective.execution_mode, + }, + ) + ) + + if len(retrospective.task_refs) > 1: + context_snapshot = retrospective.context_snapshot or {} + merge_report = dict(context_snapshot.get("merge_report") or {}) + verification_report = dict(context_snapshot.get("verification_report") or {}) + effectiveness_score = 1.0 + if merge_report.get("status") == "conflicted": + effectiveness_score = 0.45 + elif merge_report.get("status") == "fallback": + effectiveness_score = 0.25 + elif verification_report.get("status") == "failed": + effectiveness_score = 0.3 + signals.append( + LearningSignal( + signal_type="decomposition", + confidence=0.7, + evidence_refs=retrospective.task_refs[:3], + explanation="Multiple completed task refs indicate a decomposition pattern.", + payload={ + "task_count": len(retrospective.task_refs), + "scheduled_subtask_count": context_snapshot.get("scheduled_subtask_count", 0), + "effectiveness_score": effectiveness_score, + "merge_status": merge_report.get("status"), + }, + ) + ) + + if retrospective.used_skill_names: + signals.append( + LearningSignal( + signal_type="tool_success", + confidence=0.65 if retrospective.outcome == "completed" else 0.35, + evidence_refs=retrospective.evidence_refs[:2], + explanation="Task-scoped skill shortlist was available during this run.", + payload={"skills": retrospective.used_skill_names[:3]}, + ) + ) + + if retrospective.outcome == "failed": + signals.append( + LearningSignal( + signal_type="correction", + confidence=0.75, + evidence_refs=retrospective.evidence_refs[:2], + explanation="Failed retrospectives should remain auditable before any promotion.", + payload={"verification_status": retrospective.verification_status}, + ) + ) + + return signals diff --git a/backend/app/agents/learning/skill_candidate_builder.py b/backend/app/agents/learning/skill_candidate_builder.py new file mode 100644 index 0000000..66c4b8f --- /dev/null +++ b/backend/app/agents/learning/skill_candidate_builder.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +import hashlib + +from app.agents.schemas.learning import PatternCandidate, SkillCandidate + + +class SkillCandidateBuilder: + def build(self, patterns: list[PatternCandidate]) -> list[SkillCandidate]: + candidates: list[SkillCandidate] = [] + + for pattern in patterns: + if pattern.confidence < 0.55: + continue + + name = self._build_name(pattern) + candidates.append( + SkillCandidate( + candidate_id=f"candidate-{self._stable_suffix(pattern)}", + name=name, + summary=pattern.description, + candidate_type=self._map_candidate_type(pattern.pattern_type), + source_pattern_ids=[pattern.pattern_id], + confidence=pattern.confidence, + evidence_refs=pattern.evidence_refs[:4], + recommended_status="candidate", + ) + ) + + return candidates + + @staticmethod + def _build_name(pattern: PatternCandidate) -> str: + prefix = { + "workflow": "workflow", + "decomposition": "decomposition", + "preference": "preference", + }.get(pattern.pattern_type, "learned") + stable_suffix = SkillCandidateBuilder._stable_suffix(pattern) + return f"{prefix}-{stable_suffix}" + + @staticmethod + def _map_candidate_type(pattern_type: str) -> str: + mapping = { + "workflow": "workflow_skill", + "decomposition": "decomposition_skill", + "preference": "preference_skill", + } + return mapping.get(pattern_type, "workflow_skill") + + @staticmethod + def _stable_suffix(pattern: PatternCandidate) -> str: + raw = f"{pattern.pattern_type}:{pattern.description}".encode("utf-8") + return hashlib.sha1(raw).hexdigest()[:10] diff --git a/backend/app/agents/learning/store.py b/backend/app/agents/learning/store.py new file mode 100644 index 0000000..5c0de4c --- /dev/null +++ b/backend/app/agents/learning/store.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +from sqlalchemy import desc, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.agents.schemas.learning import SessionRetrospective +from app.models.learning import LearningArtifactRecord, SessionRetrospectiveRecord + + +class SessionRetrospectiveStore: + def __init__(self, db: AsyncSession): + self.db = db + + async def save(self, retrospective: SessionRetrospective) -> SessionRetrospectiveRecord: + payload = retrospective.model_dump(mode="json") + record = SessionRetrospectiveRecord( + user_id=retrospective.user_id, + conversation_id=retrospective.conversation_id, + request_message_id=retrospective.request_message_id, + response_message_id=retrospective.response_message_id, + query_text=retrospective.query_text, + final_response=retrospective.final_response, + summary_text=retrospective.summary, + task_type=retrospective.task_type, + execution_mode=retrospective.execution_mode, + primary_agent=retrospective.primary_agent, + verification_status=retrospective.verification_status, + verification_summary=retrospective.verification_summary, + skill_names=retrospective.used_skill_names, + evidence=retrospective.evidence_refs, + task_refs=retrospective.task_refs, + payload=payload, + ) + self.db.add(record) + await self.db.commit() + await self.db.refresh(record) + return record + + async def list_recent( + self, + *, + user_id: str, + limit: int = 20, + ) -> list[SessionRetrospectiveRecord]: + result = await self.db.execute( + select(SessionRetrospectiveRecord) + .where(SessionRetrospectiveRecord.user_id == user_id) + .order_by(desc(SessionRetrospectiveRecord.recorded_at), desc(SessionRetrospectiveRecord.created_at)) + .limit(limit) + ) + return list(result.scalars().all()) + + +class LearningArtifactStore: + def __init__(self, db: AsyncSession): + self.db = db + + async def save_batch( + self, + *, + user_id: str, + conversation_id: str, + retrospective_id: str | None, + artifacts: list[dict[str, object]], + ) -> list[LearningArtifactRecord]: + records: list[LearningArtifactRecord] = [] + for artifact in artifacts: + record = LearningArtifactRecord( + user_id=user_id, + conversation_id=conversation_id, + retrospective_id=retrospective_id, + artifact_type=str(artifact.get("artifact_type") or "unknown"), + artifact_key=str(artifact.get("artifact_key") or "") or None, + summary_text=str(artifact.get("summary_text") or ""), + payload=dict(artifact.get("payload") or {}), + ) + self.db.add(record) + records.append(record) + + await self.db.commit() + for record in records: + await self.db.refresh(record) + return records + + async def list_recent( + self, + *, + user_id: str, + artifact_type: str | None = None, + limit: int = 50, + ) -> list[LearningArtifactRecord]: + query = select(LearningArtifactRecord).where(LearningArtifactRecord.user_id == user_id) + if artifact_type: + query = query.where(LearningArtifactRecord.artifact_type == artifact_type) + result = await self.db.execute( + query.order_by( + desc(LearningArtifactRecord.recorded_at), + desc(LearningArtifactRecord.created_at), + ).limit(limit) + ) + return list(result.scalars().all()) + + async def aggregate_counts_by_key( + self, + *, + user_id: str, + artifact_type: str, + limit: int = 100, + ) -> dict[str, int]: + records = await self.list_recent(user_id=user_id, artifact_type=artifact_type, limit=limit) + counts: dict[str, int] = {} + for record in records: + key = record.artifact_key or "unknown" + counts[key] = counts.get(key, 0) + 1 + return counts + + +def append_retrospective_attachment( + attachments: list[dict] | None, + retrospective: SessionRetrospective, +) -> list[dict]: + next_attachments = list(attachments or []) + next_attachments.append( + { + "kind": "session_retrospective", + "payload": retrospective.model_dump(mode="json"), + } + ) + return next_attachments diff --git a/backend/app/agents/schemas/learning.py b/backend/app/agents/schemas/learning.py new file mode 100644 index 0000000..ee67421 --- /dev/null +++ b/backend/app/agents/schemas/learning.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any, Literal + +from pydantic import BaseModel, Field + + +LearningSignalType = Literal[ + "preference", + "workflow", + "decomposition", + "tool_success", + "correction", +] + + +class SessionRetrospective(BaseModel): + retrospective_id: str | None = None + user_id: str + conversation_id: str + request_message_id: str | None = None + response_message_id: str | None = None + query_text: str + final_response: str | None = None + summary: str + task_type: str | None = None + execution_mode: str | None = None + primary_agent: str | None = None + verification_status: str | None = None + verification_summary: str | None = None + used_skill_names: list[str] = Field(default_factory=list) + evidence_refs: list[dict[str, Any]] = Field(default_factory=list) + task_refs: list[dict[str, Any]] = Field(default_factory=list) + event_refs: list[dict[str, Any]] = Field(default_factory=list) + context_snapshot: dict[str, Any] = Field(default_factory=dict) + learning_signals: list["LearningSignal"] = Field(default_factory=list) + pattern_candidates: list["PatternCandidate"] = Field(default_factory=list) + skill_candidates: list["SkillCandidate"] = Field(default_factory=list) + learning_decision: "LearningDecision | None" = None + outcome: Literal["completed", "partial", "failed"] = "completed" + captured_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +class LearningSignal(BaseModel): + signal_type: LearningSignalType + confidence: float = 0.0 + evidence_refs: list[dict[str, Any]] = Field(default_factory=list) + explanation: str | None = None + payload: dict[str, Any] = Field(default_factory=dict) + + +class PatternCandidate(BaseModel): + pattern_id: str + pattern_type: str + description: str + confidence: float = 0.0 + evidence_refs: list[dict[str, Any]] = Field(default_factory=list) + + +class SkillCandidate(BaseModel): + candidate_id: str + name: str + summary: str + candidate_type: Literal["workflow_skill", "preference_skill", "decomposition_skill"] = "workflow_skill" + source_pattern_ids: list[str] = Field(default_factory=list) + confidence: float = 0.0 + evidence_refs: list[dict[str, Any]] = Field(default_factory=list) + recommended_status: Literal["candidate", "shadow"] = "candidate" + + +class LearningDecision(BaseModel): + decision: Literal["reinforce_memory", "create_candidate", "promote_skill", "defer", "reject"] + explanation: str + evidence_refs: list[dict[str, Any]] = Field(default_factory=list) + metadata: dict[str, Any] = Field(default_factory=dict) diff --git a/backend/app/models/learning.py b/backend/app/models/learning.py new file mode 100644 index 0000000..93fe1c1 --- /dev/null +++ b/backend/app/models/learning.py @@ -0,0 +1,38 @@ +from sqlalchemy import Column, DateTime, ForeignKey, JSON, String, Text + +from app.models.base import BaseModel, utc_now + + +class SessionRetrospectiveRecord(BaseModel): + __tablename__ = "session_retrospectives" + + user_id = Column(String(36), ForeignKey("users.id"), nullable=False, index=True) + conversation_id = Column(String(36), ForeignKey("conversations.id"), nullable=False, index=True) + request_message_id = Column(String(36), ForeignKey("messages.id"), nullable=True, index=True) + response_message_id = Column(String(36), ForeignKey("messages.id"), nullable=True, index=True) + query_text = Column(Text, nullable=False) + final_response = Column(Text, nullable=True) + summary_text = Column(Text, nullable=False) + task_type = Column(String(64), nullable=True, index=True) + execution_mode = Column(String(32), nullable=True, index=True) + primary_agent = Column(String(64), nullable=True) + verification_status = Column(String(32), nullable=True) + verification_summary = Column(Text, nullable=True) + skill_names = Column(JSON, default=list, nullable=False) + evidence = Column(JSON, default=list, nullable=False) + task_refs = Column(JSON, default=list, nullable=False) + payload = Column(JSON, default=dict, nullable=False) + recorded_at = Column(DateTime, default=utc_now, nullable=False) + + +class LearningArtifactRecord(BaseModel): + __tablename__ = "learning_artifacts" + + user_id = Column(String(36), ForeignKey("users.id"), nullable=False, index=True) + conversation_id = Column(String(36), ForeignKey("conversations.id"), nullable=False, index=True) + retrospective_id = Column(String(36), ForeignKey("session_retrospectives.id"), nullable=True, index=True) + artifact_type = Column(String(32), nullable=False, index=True) + artifact_key = Column(String(128), nullable=True, index=True) + summary_text = Column(Text, nullable=False) + payload = Column(JSON, default=dict, nullable=False) + recorded_at = Column(DateTime, default=utc_now, nullable=False) diff --git a/backend/tests/backend/app/agents/test_learning_runtime.py b/backend/tests/backend/app/agents/test_learning_runtime.py new file mode 100644 index 0000000..c5ace9f --- /dev/null +++ b/backend/tests/backend/app/agents/test_learning_runtime.py @@ -0,0 +1,1013 @@ +import asyncio +from datetime import datetime +from types import SimpleNamespace + +import pytest +from sqlalchemy import text +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + +import app.models # noqa: F401 +from app.database import Base +from app.models.learning import LearningArtifactRecord, SessionRetrospectiveRecord +from app.models.skill import Skill +from app.models.user import User +from app.services.runtime_observability import build_runtime_observability_report +from app.services.agent_service import AgentService +from app.services.skill_service import SkillService +from app.services.auth_service import get_password_hash +from app.agents.learning.jobs import schedule_retrospective_job, schedule_retrospective_learning_event +from app.agents.learning.bridge import build_learning_bridge_summary, route_learning_signal +from app.agents.learning.retrospector import build_session_retrospective +from app.agents.learning.store import LearningArtifactStore, SessionRetrospectiveStore +from app.agents.learning.pattern_miner import LearningPatternMiner +from app.agents.learning.session_search import SessionRetrospectiveSearch, summarize_retrospective +from app.agents.learning.signal_extractor import RetrospectiveSignalExtractor +from app.agents.learning.skill_candidate_builder import SkillCandidateBuilder +from app.agents.skills.policy import render_skill_shortlist_context +from app.agents.schemas.orchestration import ( + ExecutionDecision, + RuntimeRequestContext, + assess_parallel_worthiness, +) +from app.agents.schemas.skills import SkillShortlistEntry +from app.agents.skills.retriever import build_shortlisted_skill_context, shortlist_skills_for_request + + +def test_runtime_request_context_accepts_request_time_fields(): + context = RuntimeRequestContext( + request_id="req-1", + session_id="conv-1", + user_id="user-1", + raw_user_query="帮我安排下周计划", + recalled_memories=["prefers concise updates"], + recalled_retrospectives=[{"summary": "mode=collaboration", "task_type": "analysis"}], + shortlisted_skills=["weekly-planning"], + current_agent_role="master", + execution_mode="direct", + conversation_state_ref="conv-1", + ) + + assert context.session_id == "conv-1" + assert context.raw_user_query == "帮我安排下周计划" + assert context.shortlisted_skills == ["weekly-planning"] + + +def test_execution_decision_captures_reason_and_selected_roles(): + decision = ExecutionDecision( + request_id="req-2", + mode="collaboration", + reason="complex_multi_role_request", + complexity_score=0.78, + parallel_worthiness_score=0.42, + selected_roles=["librarian", "analyst"], + ) + + assert decision.mode == "collaboration" + assert decision.reason == "complex_multi_role_request" + assert decision.selected_roles == ["librarian", "analyst"] + + +def test_retrospective_builder_and_summary_support_runtime_context_compatibility(): + retrospective = build_session_retrospective( + request_id="assistant-msg-1", + session_id="conv-1", + user_query="帮我安排下周计划,并把重点列出来", + runtime_context={"user_id": "user-1", "recalled_retrospectives": [{"summary": "older run"}]}, + state={ + "user_id": "user-1", + "execution_mode": "collaboration", + "current_agent": "schedule_planner", + "verification_status": "passed", + "verification_summary": "all checks passed", + "skill_shortlist": [{"skill_name": "weekly-planning"}], + "completed_tasks": [{"task_id": "task-1", "title": "draft schedule", "status": "completed"}], + }, + ) + + summary = summarize_retrospective(retrospective) + + assert retrospective.conversation_id == "conv-1" + assert retrospective.response_message_id == "assistant-msg-1" + assert retrospective.used_skill_names == ["weekly-planning"] + assert retrospective.outcome == "completed" + assert summary["execution_mode"] == "collaboration" + assert "skill_shortlist_hit" in summary["reusable_patterns"] + + +def test_build_shortlisted_skill_context_filters_entries_by_agent_scope(): + shortlist = [ + SkillShortlistEntry( + skill_name="weekly-planning", + scope=["schedule_planner"], + score=0.9, + summary="Turn weekly intent into concrete schedule blocks.", + injection_mode="summary", + ), + SkillShortlistEntry( + skill_name="forum-posting", + scope=["executor"], + score=0.7, + summary="Post updates to forum threads.", + injection_mode="metadata_only", + ), + ] + + rendered = build_shortlisted_skill_context(shortlist, agent_type="schedule_planner") + + assert "weekly-planning" in rendered + assert "forum-posting" not in rendered + + +def test_render_skill_shortlist_context_truncates_summary_and_avoids_full_injection(): + rendered = render_skill_shortlist_context( + [ + SkillShortlistEntry( + skill_name="long-skill", + score=0.9, + summary="x" * 300, + injection_mode="summary", + ) + ] + ) + + assert "mode=summary" in rendered + assert "x" * 150 not in rendered + + +def test_assess_parallel_worthiness_prefers_collaboration_for_multi_source_request(): + worthiness = assess_parallel_worthiness( + "帮我对比论坛、知识库和代码里的信息,然后给我一个分析结论", + retrospective_count=2, + skill_count=1, + ) + + assert worthiness.score > 0 + assert worthiness.preferred_mode in {"collaboration", "parallel"} + assert worthiness.estimated_subtasks >= 2 + + +def test_signal_pattern_and_skill_candidate_pipeline_builds_candidates(): + retrospective = build_session_retrospective( + request_id="assistant-msg-2", + session_id="conv-2", + user_query="先收集资料,再总结结论,再给我计划", + runtime_context={"user_id": "user-2"}, + state={ + "execution_mode": "collaboration", + "current_agent": "analyst", + "verification_status": "passed", + "final_response": "done", + "skill_shortlist": [{"skill_name": "research-synthesis"}], + "completed_tasks": [ + {"task_id": "task-1", "title": "collect", "status": "completed"}, + {"task_id": "task-2", "title": "summarize", "status": "completed"}, + ], + "verification_evidence": [{"type": "verification"}], + }, + ) + + signals = RetrospectiveSignalExtractor().extract(retrospective) + patterns = LearningPatternMiner().mine(signals) + candidates = SkillCandidateBuilder().build(patterns) + + assert signals + assert any(signal.signal_type == "workflow" for signal in signals) + assert patterns + assert candidates + assert candidates[0].recommended_status == "candidate" + + +def test_decomposition_signal_carries_effectiveness_from_merge_context(): + retrospective = build_session_retrospective( + request_id="assistant-msg-merge", + session_id="conv-merge", + user_query="先收集资料,再总结结论", + runtime_context={"user_id": "user-2"}, + state={ + "execution_mode": "collaboration", + "current_agent": "analyst", + "verification_status": "failed", + "final_response": "done", + "completed_tasks": [ + {"task_id": "task-1", "title": "collect", "status": "completed"}, + {"task_id": "task-2", "title": "summarize", "status": "completed"}, + ], + "scheduled_subtasks": [{"subtask_id": "task-1"}, {"subtask_id": "task-2"}], + "merge_report": {"status": "fallback", "conflict_flags": ["failed_or_blocked_tasks:task-2"]}, + "verification_report": {"status": "failed"}, + }, + ) + + signals = RetrospectiveSignalExtractor().extract(retrospective) + decomposition_signal = next(signal for signal in signals if signal.signal_type == "decomposition") + + assert decomposition_signal.payload["merge_status"] == "fallback" + assert decomposition_signal.payload["effectiveness_score"] == 0.25 + + +def test_learning_bridge_routes_workflow_to_skill_and_preference_to_memory(): + signals = [ + RetrospectiveSignalExtractor().extract( + build_session_retrospective( + request_id="bridge-r1", + session_id="bridge-c1", + user_query="先查资料再总结", + runtime_context={"user_id": "u1"}, + state={ + "execution_mode": "collaboration", + "current_agent": "analyst", + "verification_status": "passed", + "final_response": "done", + "completed_tasks": [ + {"task_id": "t1", "title": "collect", "status": "completed"}, + {"task_id": "t2", "title": "summarize", "status": "completed"}, + ], + }, + ) + )[0] + ] + + summary = build_learning_bridge_summary(signals) + + assert route_learning_signal(signals[0]) == "skill" + assert summary["skill_signal_count"] == 1 + + +def test_learning_bridge_summary_is_written_into_learning_decision_metadata(): + retrospective = build_session_retrospective( + request_id="bridge-meta-r1", + session_id="bridge-meta-c1", + user_query="先查资料再总结", + runtime_context={"user_id": "u1"}, + state={ + "execution_mode": "collaboration", + "current_agent": "analyst", + "verification_status": "passed", + "final_response": "done", + "completed_tasks": [ + {"task_id": "t1", "title": "collect", "status": "completed"}, + {"task_id": "t2", "title": "summarize", "status": "completed"}, + ], + }, + ) + from app.agents.learning.jobs import _enrich_retrospective + + enriched = _enrich_retrospective(retrospective) + + assert enriched.learning_decision is not None + bridge = enriched.learning_decision.metadata["bridge"] + assert bridge["skill_signal_count"] >= 1 + + +@pytest.mark.asyncio +async def test_schedule_retrospective_job_returns_without_waiting_for_persistence(monkeypatch): + started = asyncio.Event() + release = asyncio.Event() + + async def fake_persist_retrospective(**_kwargs): + started.set() + await release.wait() + + monkeypatch.setattr("app.agents.learning.jobs.persist_retrospective", fake_persist_retrospective) + + task = schedule_retrospective_job( + user_id="u1", + conversation_id="c1", + request_message_id="m1", + response_message_id="m2", + query_text="hello", + final_response="world", + state={}, + ) + + assert task is not None + await asyncio.wait_for(started.wait(), timeout=1) + assert task.done() is False + release.set() + await asyncio.wait_for(task, timeout=1) + + +@pytest.mark.asyncio +async def test_runtime_skill_shortlist_keeps_shadow_skills_metadata_only_and_skips_candidates(tmp_path): + db_path = tmp_path / "skill_lifecycle.db" + engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True) + session_factory = async_sessionmaker(engine, expire_on_commit=False) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + async with session_factory() as session: + user = User( + username="skill-user", + email="skill@example.com", + hashed_password=get_password_hash("secret123"), + full_name="Skill User", + ) + session.add(user) + await session.flush() + + session.add( + Skill( + owner_id=user.id, + name="weekly-planning-shadow", + description="用于安排下周计划的 shadow skill", + instructions="把下周计划拆成具体时间块。", + agent_type="schedule_planner", + status="shadow", + visibility="private", + scope=["schedule_planner"], + ) + ) + session.add( + Skill( + owner_id=user.id, + name="weekly-planning-candidate", + description="Candidate skill should not enter runtime shortlist", + instructions="Candidate", + agent_type="schedule_planner", + status="candidate", + visibility="private", + scope=["schedule_planner"], + ) + ) + await session.commit() + + shortlist = await shortlist_skills_for_request( + session, + user_id=user.id, + user_query="帮我安排下周计划并拆成时间块", + limit=5, + ) + + await engine.dispose() + + names = [item.skill_name for item in shortlist] + assert "weekly-planning-shadow" in names + assert "weekly-planning-candidate" not in names + shadow_entry = next(item for item in shortlist if item.skill_name == "weekly-planning-shadow") + assert shadow_entry.status == "shadow" + assert shadow_entry.injection_mode == "metadata_only" + + +@pytest.mark.asyncio +async def test_schedule_retrospective_learning_event_persists_learning_artifacts(tmp_path, monkeypatch): + db_path = tmp_path / "learning_artifacts.db" + engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True) + session_factory = async_sessionmaker(engine, expire_on_commit=False) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + import app.agents.learning.jobs as jobs_module + + monkeypatch.setattr(jobs_module, "async_session", session_factory) + + async with session_factory() as session: + user = User( + username="learning-user", + email="learning@example.com", + hashed_password=get_password_hash("secret123"), + full_name="Learning User", + ) + session.add(user) + await session.flush() + await session.commit() + + retrospective = build_session_retrospective( + request_id="assistant-msg-3", + session_id="conv-3", + user_query="先查资料,再总结,再给计划", + runtime_context={"user_id": user.id}, + state={ + "execution_mode": "collaboration", + "current_agent": "analyst", + "verification_status": "passed", + "final_response": "done", + "skill_shortlist": [{"skill_name": "research-synthesis"}], + "completed_tasks": [ + {"task_id": "task-1", "title": "collect", "status": "completed"}, + {"task_id": "task-2", "title": "summarize", "status": "completed"}, + ], + "verification_evidence": [{"type": "verification"}], + }, + ) + + task = schedule_retrospective_learning_event( + user_id=user.id, + conversation_id="conv-3", + retrospective=retrospective, + session_factory=session_factory, + ) + assert task is not None + await task + + async with session_factory() as session: + retrospective_count = await session.scalar( + text("SELECT COUNT(*) FROM session_retrospectives") + ) + artifact_count = await session.scalar( + text("SELECT COUNT(*) FROM learning_artifacts") + ) + result = await session.execute(text("SELECT artifact_type FROM learning_artifacts")) + artifact_types = {row[0] for row in result.fetchall()} + + await engine.dispose() + + assert retrospective_count == 1 + assert artifact_count >= 2 + assert "signal" in artifact_types + assert "learning_decision" in artifact_types + assert "skill_lifecycle_decision" in artifact_types + + +@pytest.mark.asyncio +async def test_learned_skill_candidate_promotes_to_shadow_after_repeated_candidates(tmp_path): + db_path = tmp_path / "skill_candidate_promotion.db" + engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True) + session_factory = async_sessionmaker(engine, expire_on_commit=False) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + async with session_factory() as session: + user = User( + username="candidate-user", + email="candidate@example.com", + hashed_password=get_password_hash("secret123"), + full_name="Candidate User", + ) + session.add(user) + await session.flush() + + service = SkillService(session) + retrospective = build_session_retrospective( + request_id="r1", + session_id="c1", + user_query="先查资料再总结", + runtime_context={"user_id": user.id}, + state={ + "execution_mode": "collaboration", + "current_agent": "analyst", + "verification_status": "passed", + "final_response": "done", + "completed_tasks": [ + {"task_id": "t1", "title": "collect", "status": "completed"}, + {"task_id": "t2", "title": "summarize", "status": "completed"}, + ], + }, + ) + signals = RetrospectiveSignalExtractor().extract(retrospective) + patterns = LearningPatternMiner().mine(signals) + candidate = SkillCandidateBuilder().build(patterns)[0] + + candidate_2 = candidate.model_copy( + update={ + "candidate_id": "candidate-alt", + "source_pattern_ids": ["pattern-alt-1"], + "evidence_refs": [{"type": "analysis", "detail": "second run"}], + } + ) + + first = await service.upsert_learned_candidate( + user_id=user.id, + candidate=candidate, + primary_agent="analyst", + ) + second = await service.upsert_learned_candidate( + user_id=user.id, + candidate=candidate_2, + primary_agent="analyst", + ) + skill = await service.get_by_name_for_user(user.id, candidate.name) + + await engine.dispose() + + assert first.new_status == "candidate" + assert second.new_status == "shadow" + assert skill is not None + assert skill.status == "shadow" + assert skill.candidate_count == 2 + + +@pytest.mark.asyncio +async def test_shadow_skill_promotes_to_active_after_positive_feedback(tmp_path): + db_path = tmp_path / "shadow_to_active.db" + engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True) + session_factory = async_sessionmaker(engine, expire_on_commit=False) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + async with session_factory() as session: + user = User( + username="shadow-user", + email="shadow@example.com", + hashed_password=get_password_hash("secret123"), + full_name="Shadow User", + ) + session.add(user) + await session.flush() + + skill = Skill( + owner_id=user.id, + name="workflow-shadow-skill", + description="shadow skill", + instructions="shadow skill", + agent_type="analyst", + visibility="private", + is_active=True, + status="shadow", + scope=["analyst", "learned"], + effectiveness=0.72, + activation_count=1, + ) + session.add(skill) + await session.commit() + + decision = await SkillService(session).record_activation_feedback( + user_id=user.id, + skill_name="workflow-shadow-skill", + outcome_score=0.9, + ) + refreshed = await session.get(Skill, skill.id) + + await engine.dispose() + + assert decision is not None + assert decision.action == "promoted_to_active" + assert refreshed is not None + assert refreshed.status == "active" + assert refreshed.activation_count == 2 + + +@pytest.mark.asyncio +async def test_runtime_skill_shortlist_excludes_retired_skills(tmp_path): + db_path = tmp_path / "retired_skill_filter.db" + engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True) + session_factory = async_sessionmaker(engine, expire_on_commit=False) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + async with session_factory() as session: + user = User( + username="retired-user", + email="retired@example.com", + hashed_password=get_password_hash("secret123"), + full_name="Retired User", + ) + session.add(user) + await session.flush() + + session.add( + Skill( + owner_id=user.id, + name="retired-skill", + description="retired skill", + instructions="retired skill", + agent_type="analyst", + visibility="private", + is_active=True, + status="retired", + scope=["analyst", "learned"], + ) + ) + await session.commit() + + shortlist = await shortlist_skills_for_request( + session, + user_id=user.id, + user_query="帮我分析一下资料", + limit=5, + ) + + await engine.dispose() + + assert all(item.skill_name != "retired-skill" for item in shortlist) + + +@pytest.mark.asyncio +async def test_activation_feedback_updates_effectiveness_score(tmp_path): + db_path = tmp_path / "skill_effectiveness.db" + engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True) + session_factory = async_sessionmaker(engine, expire_on_commit=False) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + async with session_factory() as session: + user = User( + username="effect-user", + email="effect@example.com", + hashed_password=get_password_hash("secret123"), + full_name="Effect User", + ) + session.add(user) + await session.flush() + + skill = Skill( + owner_id=user.id, + name="active-skill", + description="active skill", + instructions="active skill", + agent_type="analyst", + visibility="private", + is_active=True, + status="active", + scope=["analyst", "learned"], + effectiveness=0.4, + activation_count=1, + ) + session.add(skill) + await session.commit() + + await SkillService(session).record_activation_feedback( + user_id=user.id, + skill_name="active-skill", + outcome_score=0.8, + ) + refreshed = await session.get(Skill, skill.id) + + await engine.dispose() + + assert refreshed is not None + assert refreshed.effectiveness > 0.4 + + +@pytest.mark.asyncio +async def test_duplicate_candidate_does_not_increment_promotion_count_twice(tmp_path): + db_path = tmp_path / "duplicate_candidate.db" + engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True) + session_factory = async_sessionmaker(engine, expire_on_commit=False) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + async with session_factory() as session: + user = User( + username="dup-user", + email="dup@example.com", + hashed_password=get_password_hash("secret123"), + full_name="Dup User", + ) + session.add(user) + await session.flush() + + retrospective = build_session_retrospective( + request_id="dup-r1", + session_id="dup-c1", + user_query="先查资料再总结", + runtime_context={"user_id": user.id}, + state={ + "execution_mode": "collaboration", + "current_agent": "analyst", + "verification_status": "passed", + "final_response": "done", + "completed_tasks": [ + {"task_id": "t1", "title": "collect", "status": "completed"}, + {"task_id": "t2", "title": "summarize", "status": "completed"}, + ], + }, + ) + candidate = SkillCandidateBuilder().build( + LearningPatternMiner().mine(RetrospectiveSignalExtractor().extract(retrospective)) + )[0] + + service = SkillService(session) + await service.upsert_learned_candidate( + user_id=user.id, + candidate=candidate, + primary_agent="analyst", + ) + await service.upsert_learned_candidate( + user_id=user.id, + candidate=candidate, + primary_agent="analyst", + ) + skill = await service.get_by_name_for_user(user.id, candidate.name) + + await engine.dispose() + + assert skill is not None + assert skill.candidate_count == 1 + assert skill.status == "candidate" + + +@pytest.mark.asyncio +async def test_low_effect_active_skill_degrades_to_deprecated(tmp_path): + db_path = tmp_path / "degrade_skill.db" + engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True) + session_factory = async_sessionmaker(engine, expire_on_commit=False) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + async with session_factory() as session: + user = User( + username="degrade-user", + email="degrade@example.com", + hashed_password=get_password_hash("secret123"), + full_name="Degrade User", + ) + session.add(user) + await session.flush() + + skill = Skill( + owner_id=user.id, + name="active-low-skill", + description="active low skill", + instructions="active low skill", + agent_type="analyst", + visibility="private", + is_active=True, + status="active", + scope=["analyst"], + effectiveness=0.2, + activation_count=2, + ) + session.add(skill) + await session.commit() + + decision = await SkillService(session).record_activation_feedback( + user_id=user.id, + skill_name="active-low-skill", + outcome_score=0.1, + ) + refreshed = await session.get(Skill, skill.id) + + await engine.dispose() + + assert decision is not None + assert decision.action == "degraded_to_deprecated" + assert refreshed is not None + assert refreshed.status == "deprecated" + + +@pytest.mark.asyncio +async def test_runtime_skill_shortlist_can_disable_learned_skill_loading(tmp_path): + db_path = tmp_path / "disable_learned_loading.db" + engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True) + session_factory = async_sessionmaker(engine, expire_on_commit=False) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + async with session_factory() as session: + user = User( + username="builtin-user", + email="builtin@example.com", + hashed_password=get_password_hash("secret123"), + full_name="Builtin User", + ) + session.add(user) + await session.flush() + + session.add( + Skill( + owner_id=user.id, + name="builtin-active-skill", + description="用于分析资料的 builtin active skill", + instructions="帮我分析资料并总结重点", + agent_type="analyst", + visibility="market", + is_builtin=True, + is_active=True, + status="active", + scope=["analyst"], + ) + ) + session.add( + Skill( + owner_id=user.id, + name="learned-active-skill", + description="用于分析资料的 learned active skill", + instructions="帮我分析资料并总结重点", + agent_type="analyst", + visibility="private", + is_active=True, + status="active", + scope=["analyst", "learned"], + ) + ) + await session.commit() + + shortlist = await shortlist_skills_for_request( + session, + user_id=user.id, + user_query="帮我分析一下资料", + include_learned=False, + limit=5, + ) + + await engine.dispose() + + names = [item.skill_name for item in shortlist] + assert "builtin-active-skill" in names + assert "learned-active-skill" not in names + + +@pytest.mark.asyncio +async def test_runtime_request_context_collects_assembly_metrics_with_small_latency(monkeypatch): + service = AgentService(None) + + async def fake_shortlist_retrospectives(self, *, user_id, query_text, conversation_id=None, limit=3): + await asyncio.sleep(0.01) + return [] + + async def fake_shortlist_skills(db, *, user_id, user_query, memory_context=None, retrospectives=None, include_learned=True, limit=4): + await asyncio.sleep(0.01) + return [] + + monkeypatch.setattr( + "app.services.agent_service.SessionRetrospectiveSearch.shortlist", + fake_shortlist_retrospectives, + ) + monkeypatch.setattr( + "app.services.agent_service.shortlist_skills_for_request", + fake_shortlist_skills, + ) + + runtime_request_context, _retros, _skills = await service._build_runtime_request_context( + request_id="req-latency", + user_id="u1", + conversation=SimpleNamespace(id="c1"), + user_query="帮我分析一下资料", + memory_context="【用户记忆】\n- 喜欢结构化输出", + ) + + assert runtime_request_context.assembly_metrics["retrospective_ms"] >= 0 + assert runtime_request_context.assembly_metrics["skill_shortlist_ms"] >= 0 + assert runtime_request_context.assembly_metrics["total_ms"] < 500 + + +def test_runtime_observability_report_includes_parallel_metrics_and_flags(): + report = build_runtime_observability_report( + state={ + "execution_mode": "collaboration", + "verification_status": "passed", + "skill_shortlist": [{"skill_name": "weekly-planning"}], + "retrospective_shortlist": [{"summary": "older"}], + "task_graph": {"nodes": [{"node_id": "t1"}]}, + "scheduled_subtasks": [{"subtask_id": "t1"}], + "task_results": [{"task_id": "t1", "status": "completed"}], + "merge_report": {"status": "merged", "conflict_flags": [], "fallback_used": False}, + }, + feature_flags={"ENABLE_PARALLEL_TASK_GRAPH": True}, + ) + + assert report["execution_mode"] == "collaboration" + assert report["parallel_metrics"]["task_graph_node_count"] == 1 + assert report["parallel_metrics"]["completed_subtask_count"] == 1 + assert report["feature_flags"]["ENABLE_PARALLEL_TASK_GRAPH"] is True + + +@pytest.mark.asyncio +async def test_session_retrospective_search_supports_task_type_and_skill_filters(tmp_path): + db_path = tmp_path / "retrospective_filters.db" + engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True) + session_factory = async_sessionmaker(engine, expire_on_commit=False) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + async with session_factory() as session: + store = SessionRetrospectiveStore(session) + await store.save( + build_session_retrospective( + request_id="r1", + session_id="c1", + user_query="帮我安排下周计划", + runtime_context={"user_id": "u1"}, + state={ + "execution_mode": "collaboration", + "current_agent": "schedule_planner", + "verification_status": "passed", + "final_response": "done", + "skill_shortlist": [{"skill_name": "weekly-planning"}], + "completed_tasks": [{"task_id": "t1", "title": "plan", "status": "completed"}], + }, + ) + ) + await store.save( + build_session_retrospective( + request_id="r2", + session_id="c2", + user_query="帮我分析风险", + runtime_context={"user_id": "u1"}, + state={ + "execution_mode": "collaboration", + "current_agent": "analyst", + "verification_status": "passed", + "final_response": "done", + "skill_shortlist": [{"skill_name": "risk-analysis"}], + "completed_tasks": [{"task_id": "t2", "title": "analysis", "status": "completed"}], + }, + ) + ) + + results = await SessionRetrospectiveSearch(session).shortlist( + user_id="u1", + query_text="帮我安排计划", + task_type="planning_or_execution", + skill_name="weekly-planning", + limit=5, + ) + + await engine.dispose() + + assert len(results) == 1 + assert results[0].used_skill_names == ["weekly-planning"] + + +@pytest.mark.asyncio +async def test_learning_artifact_store_aggregates_skill_candidate_counts(tmp_path): + db_path = tmp_path / "artifact_aggregate.db" + engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True) + session_factory = async_sessionmaker(engine, expire_on_commit=False) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + async with session_factory() as session: + store = LearningArtifactStore(session) + await store.save_batch( + user_id="u1", + conversation_id="c1", + retrospective_id="r1", + artifacts=[ + {"artifact_type": "skill_candidate", "artifact_key": "workflow-a", "summary_text": "a", "payload": {}}, + {"artifact_type": "skill_candidate", "artifact_key": "workflow-a", "summary_text": "a2", "payload": {}}, + {"artifact_type": "skill_candidate", "artifact_key": "workflow-b", "summary_text": "b", "payload": {}}, + ], + ) + counts = await store.aggregate_counts_by_key( + user_id="u1", + artifact_type="skill_candidate", + ) + + await engine.dispose() + + assert counts["workflow-a"] == 2 + assert counts["workflow-b"] == 1 + + +@pytest.mark.asyncio +async def test_run_decay_review_deprecates_shadow_and_retires_deprecated_skills(tmp_path): + db_path = tmp_path / "decay_review.db" + engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", future=True) + session_factory = async_sessionmaker(engine, expire_on_commit=False) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + async with session_factory() as session: + user = User( + username="review-user", + email="review@example.com", + hashed_password=get_password_hash("secret123"), + full_name="Review User", + ) + session.add(user) + await session.flush() + + session.add( + Skill( + owner_id=user.id, + name="shadow-low-skill", + description="shadow low skill", + instructions="shadow low skill", + agent_type="analyst", + visibility="private", + is_active=True, + status="shadow", + effectiveness=0.3, + review_after=datetime.utcnow(), + ) + ) + session.add( + Skill( + owner_id=user.id, + name="deprecated-low-skill", + description="deprecated low skill", + instructions="deprecated low skill", + agent_type="analyst", + visibility="private", + is_active=True, + status="deprecated", + effectiveness=0.1, + review_after=datetime.utcnow(), + ) + ) + await session.commit() + + decisions = await SkillService(session).run_decay_review(user_id=user.id) + shadow_skill = await SkillService(session).get_by_name_for_user(user.id, "shadow-low-skill") + deprecated_skill = await SkillService(session).get_by_name_for_user(user.id, "deprecated-low-skill") + + await engine.dispose() + + actions = {decision.skill_name: decision.action for decision in decisions} + assert actions["shadow-low-skill"] == "degraded_to_deprecated" + assert actions["deprecated-low-skill"] == "retired" + assert shadow_skill is not None and shadow_skill.status == "deprecated" + assert deprecated_skill is not None and deprecated_skill.status == "retired"