From 74fdfc26522e1d323cea75dd33fb6944e97a7066 Mon Sep 17 00:00:00 2001 From: "WIN-JHFT4D3SIVT\\caoxiaozhu" Date: Wed, 8 Apr 2026 00:12:08 +0800 Subject: [PATCH] feat(services): enhance services with rollback and observability Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus --- backend/app/services/agent_service.py | 279 +++++++++++++++++- backend/app/services/rollback_controller.py | 25 ++ backend/app/services/runtime_observability.py | 32 ++ backend/app/services/skill_service.py | 255 +++++++++++++++- backend/app/services/system_service.py | 98 +++++- 5 files changed, 675 insertions(+), 14 deletions(-) create mode 100644 backend/app/services/rollback_controller.py create mode 100644 backend/app/services/runtime_observability.py diff --git a/backend/app/services/agent_service.py b/backend/app/services/agent_service.py index 81e4840..8ecd75e 100644 --- a/backend/app/services/agent_service.py +++ b/backend/app/services/agent_service.py @@ -7,12 +7,13 @@ import json import uuid import logging from datetime import UTC, datetime +from time import perf_counter from typing import Any, AsyncGenerator import asyncio from openai import BadRequestError from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select -from langchain_core.messages import HumanMessage, AIMessage +from langchain_core.messages import HumanMessage, AIMessage, SystemMessage from app.database import async_session from app.logging_utils import summarize_llm_config @@ -21,10 +22,24 @@ from app.models.conversation import Conversation, Message from app.models.user import User from app.agents.graph import get_agent_graph from app.agents.context import set_current_user, clear_current_user +from app.agents.learning.jobs import schedule_retrospective_job +from app.agents.learning.retrospector import build_session_retrospective +from app.agents.learning.session_search import SessionRetrospectiveSearch, summarize_retrospective +from app.agents.orchestration.task_graph import build_bounded_task_graph +from app.agents.learning.store import append_retrospective_attachment +from app.agents.schemas.orchestration import ( + RuntimeRequestContext, + assess_parallel_worthiness, + render_runtime_request_context_summary, +) +from app.agents.schemas.skills import SkillActivationRecord from app.agents.skills.registry import get_skill_registry +from app.agents.skills.retriever import shortlist_skills_for_request from app.services import memory_service from app.services.brain_service import BrainService from app.services.llm_service import create_llm_from_config, resolve_provider_capabilities +from app.services.rollback_controller import RollbackController +from app.services.runtime_observability import build_runtime_observability_report from app.agents.tools.time_reasoning import extract_reference_datetime from app.agents.state import initial_state @@ -36,6 +51,7 @@ MEMORY_SECTION_HEADERS = ( "【之前对话摘要】", "【知识大脑】", ) +MEMORY_INLINE_HEADERS = {"[关于你的记忆]"} def _split_memory_context_sections(memory_context: str | None) -> dict[str, str]: @@ -81,6 +97,41 @@ def _derive_role_memory_contexts(memory_context: str | None) -> dict[str, str | } +def _extract_memory_highlights(memory_context: str | None, *, limit: int = 5) -> list[str]: + text = (memory_context or "").strip() + if not text: + return [] + + highlights: list[str] = [] + for raw_line in text.splitlines(): + line = raw_line.strip() + if not line or line in MEMORY_SECTION_HEADERS or line in MEMORY_INLINE_HEADERS: + continue + if line.startswith("-"): + normalized = line[1:].strip() + else: + normalized = line + if normalized: + highlights.append(normalized) + if len(highlights) >= limit: + break + return highlights + + +def _summarize_retrospective(retrospective: Any) -> str: + summary = str(getattr(retrospective, "summary", "") or "").strip() + task_type = str(getattr(retrospective, "task_type", "") or "").strip() + execution_mode = str(getattr(retrospective, "execution_mode", "") or "").strip() + outcome = str(getattr(retrospective, "outcome", "") or "").strip() + + parts = [summary[:80] or task_type or "历史复盘"] + if execution_mode: + parts.append(f"mode={execution_mode}") + if outcome: + parts.append(f"outcome={outcome}") + return ";".join(parts) + + def _is_streaming_rejection_error(error: Exception, user_llm_config: dict | None) -> bool: capabilities = resolve_provider_capabilities(user_llm_config) error_text = str(error).lower() @@ -461,18 +512,27 @@ class AgentService: async def _build_agent_state( self, *, + request_id: str, user_id: str, conversation: Conversation, + raw_user_query: str, full_message: str, memory_context: str | None, current_datetime_context: str, current_datetime_reference: dict[str, str], user_llm_config: dict | None, + runtime_request_context: RuntimeRequestContext, + recalled_retrospectives: list[dict[str, Any]], + skill_shortlist: list[dict[str, Any]], ) -> dict[str, Any]: state = initial_state(user_id, conversation.id) + runtime_summary = render_runtime_request_context_summary(runtime_request_context) state.update( { - "messages": [HumanMessage(content=full_message)], + "messages": [ + SystemMessage(content=runtime_summary), + HumanMessage(content=full_message), + ], "memory_context": memory_context, "current_datetime_context": current_datetime_context, "current_datetime_reference": current_datetime_reference, @@ -482,9 +542,119 @@ class AgentService: previous_snapshot = await self._load_continuity_snapshot(conversation) if previous_snapshot: state.update(previous_snapshot) - state["messages"] = [HumanMessage(content=full_message)] + state["messages"] = [ + SystemMessage(content=runtime_summary), + HumanMessage(content=full_message), + ] + state.update( + { + "runtime_request_context": runtime_request_context.model_dump(mode="json"), + "task_graph": ( + runtime_request_context.task_graph.model_dump(mode="json") + if runtime_request_context.task_graph is not None + else None + ), + "feature_flags": RollbackController().snapshot_flags(), + "recalled_retrospectives": recalled_retrospectives, + "retrospective_shortlist": recalled_retrospectives, + "skill_shortlist": skill_shortlist, + "skill_activation_records": [ + SkillActivationRecord( + skill_name=item.get("skill_name"), + source=item.get("source", "runtime"), + source_id=item.get("source_id"), + status=item.get("status", "active"), + injection_mode=item.get("injection_mode", "metadata_only"), + matched_terms=item.get("matched_terms", []), + rationale=item.get("rationale"), + ).model_dump(mode="json") + for item in skill_shortlist + if item.get("skill_name") + ], + "parallel_worthiness": runtime_request_context.parallel_worthiness.model_dump( + mode="json" + ), + } + ) return state + async def _build_runtime_request_context( + self, + *, + request_id: str, + user_id: str, + conversation: Conversation, + user_query: str, + memory_context: str | None, + ) -> tuple[RuntimeRequestContext, list[dict[str, Any]], list[dict[str, Any]]]: + started_at = perf_counter() + retrospectives_started = perf_counter() + recent_retrospectives = await SessionRetrospectiveSearch(self.db).shortlist( + user_id=user_id, + query_text=user_query, + conversation_id=conversation.id, + limit=3, + ) + retrospective_ms = (perf_counter() - retrospectives_started) * 1000 + feature_flags = RollbackController().snapshot_flags() + shortlist_started = perf_counter() + skill_shortlist = await shortlist_skills_for_request( + self.db, + user_id=user_id, + user_query=user_query, + memory_context=memory_context, + retrospectives=[item.model_dump(mode="json") for item in recent_retrospectives], + include_learned=feature_flags["ENABLE_LEARNED_SKILL_LOADING"], + limit=4, + ) + skill_shortlist_ms = (perf_counter() - shortlist_started) * 1000 + parallel_worthiness = assess_parallel_worthiness( + user_query, + retrospective_count=len(recent_retrospectives), + skill_count=len(skill_shortlist), + ) + recommended_runtime_mode = ( + "collaboration" if parallel_worthiness.preferred_mode != "direct" else "direct" + ) + task_graph = ( + build_bounded_task_graph( + query_text=user_query, + parallel_worthiness=parallel_worthiness, + ) + if feature_flags["ENABLE_PARALLEL_TASK_GRAPH"] + else None + ) + runtime_request_context = RuntimeRequestContext( + request_id=request_id, + session_id=conversation.id, + conversation_id=conversation.id, + user_id=user_id, + query_text=user_query, + raw_user_query=user_query, + recalled_memories=_extract_memory_highlights(memory_context), + recalled_retrospectives=[ + summarize_retrospective(retrospective) for retrospective in recent_retrospectives + ], + shortlisted_skills=[entry.skill_name for entry in skill_shortlist], + skill_shortlist=skill_shortlist, + current_agent_role="master", + execution_mode=recommended_runtime_mode, + conversation_state_ref=conversation.id, + parallel_worthiness=parallel_worthiness, + task_graph=task_graph, + recommended_runtime_mode=recommended_runtime_mode, + assembly_metrics={ + "retrospective_ms": round(retrospective_ms, 3), + "skill_shortlist_ms": round(skill_shortlist_ms, 3), + "total_ms": round((perf_counter() - started_at) * 1000, 3), + }, + ) + return ( + runtime_request_context, + [item.model_dump(mode="json") for item in recent_retrospectives], + [item.model_dump(mode="json") for item in skill_shortlist], + ) + async def chat( self, user_id: str, @@ -610,21 +780,38 @@ class AgentService: async def run_agent(): collected = "" state: dict[str, Any] | None = None + runtime_request_context: RuntimeRequestContext | None = None set_current_user(user_id) try: graph = get_agent_graph() current_datetime_context, current_datetime_reference = ( self._build_current_datetime_context() ) - - state = await self._build_agent_state( + ( + runtime_request_context, + recalled_retrospectives, + skill_shortlist, + ) = await self._build_runtime_request_context( + request_id=assistant_msg.id, user_id=user_id, conversation=conv, + user_query=message, + memory_context=memory_ctx, + ) + + state = await self._build_agent_state( + request_id=assistant_msg.id, + user_id=user_id, + conversation=conv, + raw_user_query=message, full_message=full_message, memory_context=memory_ctx, current_datetime_context=current_datetime_context, current_datetime_reference=current_datetime_reference, user_llm_config=user_llm_config, + runtime_request_context=runtime_request_context, + recalled_retrospectives=recalled_retrospectives, + skill_shortlist=skill_shortlist, ) state.update(_derive_role_memory_contexts(memory_ctx)) @@ -749,7 +936,7 @@ class AgentService: if collected: assistant_msg.content = collected continuity_snapshot = _build_continuity_snapshot(state or {}) - assistant_msg.attachments = ( + attachments = ( [ { "kind": "agent_continuity_state", @@ -757,8 +944,26 @@ class AgentService: } ] if continuity_snapshot - else None + else [] ) + if state is not None and runtime_request_context is not None: + retrospective = build_session_retrospective( + request_id=assistant_msg.id, + session_id=conversation_id, + user_query=message, + state=state, + runtime_context=runtime_request_context, + ) + attachments = append_retrospective_attachment(attachments, retrospective) + attachments.append( + { + "kind": "runtime_observability", + "payload": build_runtime_observability_report( + state=state, + feature_flags=state.get("feature_flags") or {}, + ), + } + ) conv.agent_state = ( { "kind": "agent_continuity_state", @@ -771,8 +976,18 @@ class AgentService: user_id, **_build_assistant_event_payload(collected), ) + assistant_msg.attachments = attachments or None await self.db.commit() await self.db.refresh(assistant_msg) + schedule_retrospective_job( + user_id=user_id, + conversation_id=conversation_id, + request_message_id=user_msg.id, + response_message_id=assistant_msg.id, + query_text=message, + final_response=collected, + state=state, + ) except Exception: logger.exception("save_assistant_message_failed") asyncio.create_task(self._try_auto_summarize_background(user_id, conversation_id)) @@ -863,14 +1078,30 @@ class AgentService: current_datetime_context, current_datetime_reference = ( self._build_current_datetime_context() ) - state = await self._build_agent_state( + ( + runtime_request_context, + recalled_retrospectives, + skill_shortlist, + ) = await self._build_runtime_request_context( + request_id=assistant_msg.id, user_id=user_id, conversation=conv, + user_query=message, + memory_context=memory_ctx, + ) + state = await self._build_agent_state( + request_id=assistant_msg.id, + user_id=user_id, + conversation=conv, + raw_user_query=message, full_message=message, memory_context=memory_ctx, current_datetime_context=current_datetime_context, current_datetime_reference=current_datetime_reference, user_llm_config=user_llm_config, + runtime_request_context=runtime_request_context, + recalled_retrospectives=recalled_retrospectives, + skill_shortlist=skill_shortlist, ) state.update(_derive_role_memory_contexts(memory_ctx)) result_state = await graph.ainvoke(state) @@ -900,7 +1131,7 @@ class AgentService: continuity_snapshot = ( _build_continuity_snapshot(result_state) if "result_state" in locals() else None ) - assistant_msg.attachments = ( + attachments = ( [ { "kind": "agent_continuity_state", @@ -908,8 +1139,26 @@ class AgentService: } ] if continuity_snapshot - else None + else [] ) + if "result_state" in locals() and "runtime_request_context" in locals(): + retrospective = build_session_retrospective( + request_id=assistant_msg.id, + session_id=conversation_id, + user_query=message, + state=result_state, + runtime_context=runtime_request_context, + ) + attachments = append_retrospective_attachment(attachments, retrospective) + attachments.append( + { + "kind": "runtime_observability", + "payload": build_runtime_observability_report( + state=result_state, + feature_flags=result_state.get("feature_flags") or {}, + ), + } + ) conv.agent_state = ( { "kind": "agent_continuity_state", @@ -918,7 +1167,17 @@ class AgentService: if continuity_snapshot else None ) + assistant_msg.attachments = attachments or None await self.db.commit() await self.db.refresh(assistant_msg) + schedule_retrospective_job( + user_id=user_id, + conversation_id=conversation_id, + request_message_id=user_msg.id, + response_message_id=assistant_msg.id, + query_text=message, + final_response=response_content, + state=result_state if "result_state" in locals() else None, + ) return conversation_id, assistant_msg.id, response_content, model_name_used diff --git a/backend/app/services/rollback_controller.py b/backend/app/services/rollback_controller.py new file mode 100644 index 0000000..293cd82 --- /dev/null +++ b/backend/app/services/rollback_controller.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from app.config import settings + + +FEATURE_FLAG_NAMES = ( + "ENABLE_RETROSPECTIVE", + "ENABLE_SESSION_RETROSPECTIVE_SEARCH", + "ENABLE_RUNTIME_SKILL_SHORTLIST", + "ENABLE_LEARNING_SIGNALS", + "ENABLE_SKILL_PROMOTION", + "ENABLE_LEARNED_SKILL_LOADING", + "ENABLE_PARALLEL_TASK_GRAPH", +) + + +class RollbackController: + def snapshot_flags(self) -> dict[str, bool]: + return { + flag_name: bool(getattr(settings, flag_name, False)) + for flag_name in FEATURE_FLAG_NAMES + } + + def is_enabled(self, flag_name: str) -> bool: + return bool(getattr(settings, flag_name, False)) diff --git a/backend/app/services/runtime_observability.py b/backend/app/services/runtime_observability.py new file mode 100644 index 0000000..4ec2deb --- /dev/null +++ b/backend/app/services/runtime_observability.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from typing import Any + +from app.agents.orchestration.monitor import build_parallel_runtime_metrics + + +def build_runtime_observability_report( + *, + state: dict[str, Any], + feature_flags: dict[str, bool] | None = None, +) -> dict[str, Any]: + task_graph = state.get("task_graph") if isinstance(state.get("task_graph"), dict) else None + scheduled_subtasks = ( + state.get("scheduled_subtasks") if isinstance(state.get("scheduled_subtasks"), list) else [] + ) + task_results = state.get("task_results") if isinstance(state.get("task_results"), list) else [] + merge_report = state.get("merge_report") if isinstance(state.get("merge_report"), dict) else None + + return { + "execution_mode": state.get("execution_mode"), + "verification_status": state.get("verification_status"), + "skill_shortlist_count": len(state.get("skill_shortlist") or []), + "retrospective_shortlist_count": len(state.get("retrospective_shortlist") or []), + "feature_flags": feature_flags or {}, + "parallel_metrics": build_parallel_runtime_metrics( + task_graph=task_graph, + scheduled_subtasks=scheduled_subtasks, + task_results=task_results, + merge_report=merge_report, + ), + } diff --git a/backend/app/services/skill_service.py b/backend/app/services/skill_service.py index 716556d..c928d5d 100644 --- a/backend/app/services/skill_service.py +++ b/backend/app/services/skill_service.py @@ -3,9 +3,13 @@ Skill Service - 技能管理服务层 负责技能的创建、查询、更新、删除等操作 """ +import hashlib +from datetime import UTC, datetime, timedelta from typing import Optional from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, and_, or_ +from app.agents.schemas.learning import SkillCandidate +from app.agents.skills.models import SkillLifecycleDecision from app.models.skill import Skill from app.models.user import User @@ -28,6 +32,10 @@ class SkillService: visibility=data.get("visibility", "private"), team_id=data.get("team_id"), is_active=data.get("is_active", True), + status=data.get("status", "active"), + scope=data.get("scope", []), + effectiveness=data.get("effectiveness", 0.0), + review_after=data.get("review_after"), ) self.db.add(skill) await self.db.commit() @@ -41,6 +49,17 @@ class SkillService: ) return result.scalar_one_or_none() + async def get_by_name_for_user(self, user_id: str, name: str) -> Optional[Skill]: + access_scope = or_( + Skill.owner_id == user_id, + Skill.visibility == "market", + Skill.team_id == user_id, + ) + result = await self.db.execute( + select(Skill).where(and_(Skill.name == name, access_scope)) + ) + return result.scalar_one_or_none() + async def list_for_user( self, user_id: str, @@ -56,7 +75,7 @@ class SkillService: Skill.team_id == user_id, ) - filters = [access_scope, Skill.is_active == True] + filters = [access_scope, Skill.is_active == True, Skill.status != "retired"] if agent_type: filters.append(Skill.agent_type == agent_type) @@ -83,7 +102,7 @@ class SkillService: update_fields = [ "name", "description", "instructions", "agent_type", "tools", "required_context", "output_format", "visibility", - "team_id", "is_active" + "team_id", "is_active", "status", "scope", "effectiveness", "review_after" ] for field in update_fields: @@ -117,6 +136,7 @@ class SkillService: and_( Skill.agent_type == agent_type, Skill.is_active == True, + Skill.status == "active", or_( Skill.visibility == "market", Skill.visibility == "private" @@ -125,3 +145,234 @@ class SkillService: ) ) return list(result.scalars().all()) + + async def list_runtime_candidates( + self, + user_id: str, + *, + agent_type: Optional[str] = None, + include_shadow: bool = True, + include_learned: bool = True, + ) -> list[Skill]: + allowed_statuses = ["active", "shadow"] if include_shadow else ["active"] + access_scope = or_( + Skill.owner_id == user_id, + Skill.visibility == "market", + Skill.team_id == user_id, + ) + + filters = [ + access_scope, + Skill.is_active == True, + Skill.status.in_(allowed_statuses), + ] + if not include_learned: + filters.append(Skill.is_builtin == True) + if agent_type: + filters.append(Skill.agent_type == agent_type) + + result = await self.db.execute(select(Skill).where(and_(*filters))) + return list(result.scalars().all()) + + async def upsert_learned_candidate( + self, + *, + user_id: str, + candidate: SkillCandidate, + primary_agent: str | None, + evidence_refs: list[dict] | None = None, + ) -> SkillLifecycleDecision: + source_hash = self._build_candidate_source_hash(candidate) + skill = await self.get_by_name_for_user(user_id, candidate.name) + if skill is None: + review_after = datetime.now(UTC) + timedelta(days=7) + skill = Skill( + owner_id=user_id, + name=candidate.name, + description=candidate.summary, + instructions=candidate.summary, + agent_type=primary_agent or "master", + tools=[], + required_context=[], + output_format=None, + visibility="private", + is_active=True, + status="candidate", + scope=[primary_agent or "master", "learned", candidate.candidate_type], + effectiveness=candidate.confidence, + review_after=review_after, + candidate_count=1, + candidate_source_hashes=[source_hash], + ) + self.db.add(skill) + await self.db.commit() + await self.db.refresh(skill) + return SkillLifecycleDecision( + skill_name=skill.name, + action="created_candidate", + previous_status=None, + new_status="candidate", + reason="First learned candidate created from retrospective evidence.", + evidence_refs=evidence_refs or [], + confidence=candidate.confidence, + review_after=review_after, + ) + + previous_status = skill.status + known_hashes = list(skill.candidate_source_hashes or []) + is_duplicate_candidate = source_hash in known_hashes + if not is_duplicate_candidate: + skill.candidate_count = int(skill.candidate_count or 0) + 1 + known_hashes.append(source_hash) + skill.candidate_source_hashes = known_hashes + current_effectiveness = float(skill.effectiveness or 0.0) + skill.effectiveness = round(max(current_effectiveness, float(candidate.confidence or 0.0)), 3) + skill.review_after = datetime.now(UTC) + timedelta(days=7) + if primary_agent and primary_agent not in (skill.scope or []): + skill.scope = [*(skill.scope or []), primary_agent] + + action = "no_change" + reason = "Candidate evidence refreshed." + if is_duplicate_candidate: + reason = "Duplicate candidate evidence ignored for promotion counting." + if ( + not is_duplicate_candidate + and skill.status == "candidate" + and skill.candidate_count >= 2 + and skill.effectiveness >= 0.6 + ): + skill.status = "shadow" + action = "promoted_to_shadow" + reason = "Repeated candidate evidence promoted the learned skill to shadow." + + await self.db.commit() + await self.db.refresh(skill) + return SkillLifecycleDecision( + skill_name=skill.name, + action=action, + previous_status=previous_status, + new_status=skill.status, + reason=reason, + evidence_refs=evidence_refs or [], + confidence=skill.effectiveness, + review_after=skill.review_after, + ) + + async def record_activation_feedback( + self, + *, + user_id: str, + skill_name: str, + outcome_score: float, + evidence_refs: list[dict] | None = None, + ) -> SkillLifecycleDecision | None: + skill = await self.get_by_name_for_user(user_id, skill_name) + if skill is None or skill.status not in {"shadow", "active", "deprecated"}: + return None + + previous_status = skill.status + previous_activation_count = int(skill.activation_count or 0) + skill.activation_count = previous_activation_count + 1 + skill.last_activated_at = datetime.now(UTC) + + previous_effectiveness = float(skill.effectiveness or 0.0) + if previous_activation_count <= 0: + skill.effectiveness = round(outcome_score, 3) + else: + skill.effectiveness = round( + ((previous_effectiveness * previous_activation_count) + outcome_score) + / skill.activation_count, + 3, + ) + + action = "feedback_recorded" + reason = "Activation outcome recorded." + if skill.status == "shadow" and skill.activation_count >= 2 and skill.effectiveness >= 0.7: + skill.status = "active" + action = "promoted_to_active" + reason = "Shadow skill proved effective enough to become active." + elif skill.status == "active" and skill.activation_count >= 3 and skill.effectiveness < 0.35: + skill.status = "deprecated" + action = "degraded_to_deprecated" + reason = "Active skill underperformed repeatedly and was deprecated." + elif skill.status == "deprecated" and skill.activation_count >= 4 and skill.effectiveness < 0.2: + skill.status = "retired" + action = "retired" + reason = "Deprecated skill stayed ineffective and was retired." + elif skill.status == "deprecated" and skill.effectiveness >= 0.65 and outcome_score >= 0.8: + skill.status = "active" + action = "reactivated" + reason = "Deprecated skill recovered with strong positive feedback." + + skill.review_after = datetime.now(UTC) + timedelta(days=7) + await self.db.commit() + await self.db.refresh(skill) + return SkillLifecycleDecision( + skill_name=skill.name, + action=action, + previous_status=previous_status, + new_status=skill.status, + reason=reason, + evidence_refs=evidence_refs or [], + confidence=skill.effectiveness, + review_after=skill.review_after, + ) + + async def run_decay_review( + self, + *, + user_id: str, + as_of: datetime | None = None, + ) -> list[SkillLifecycleDecision]: + review_time = as_of or datetime.now(UTC) + result = await self.db.execute( + select(Skill).where( + and_( + Skill.owner_id == user_id, + Skill.is_active == True, + Skill.status.in_(["shadow", "active", "deprecated"]), + Skill.review_after.is_not(None), + Skill.review_after <= review_time, + ) + ) + ) + skills = list(result.scalars().all()) + decisions: list[SkillLifecycleDecision] = [] + for skill in skills: + previous_status = skill.status + action = "no_change" + reason = "Review completed without status change." + + if skill.status == "shadow" and float(skill.effectiveness or 0.0) < 0.45: + skill.status = "deprecated" + action = "degraded_to_deprecated" + reason = "Shadow skill review found low effectiveness." + elif skill.status == "deprecated" and float(skill.effectiveness or 0.0) < 0.2: + skill.status = "retired" + action = "retired" + reason = "Deprecated skill remained weak through review." + + skill.review_after = review_time + timedelta(days=7) + decisions.append( + SkillLifecycleDecision( + skill_name=skill.name, + action=action, + previous_status=previous_status, + new_status=skill.status, + reason=reason, + confidence=skill.effectiveness, + review_after=skill.review_after, + ) + ) + + await self.db.commit() + return decisions + + @staticmethod + def _build_candidate_source_hash(candidate: SkillCandidate) -> str: + raw = ( + f"{candidate.name}|{candidate.summary}|" + f"{','.join(candidate.source_pattern_ids)}|" + f"{len(candidate.evidence_refs)}" + ).encode("utf-8") + return hashlib.sha1(raw).hexdigest() diff --git a/backend/app/services/system_service.py b/backend/app/services/system_service.py index 7669a95..6a21925 100644 --- a/backend/app/services/system_service.py +++ b/backend/app/services/system_service.py @@ -4,6 +4,9 @@ import os import platform import socket import subprocess +import time + +import httpx try: import psutil @@ -15,6 +18,10 @@ class SystemService: _last_net_bytes_sent: int | None = None _last_net_bytes_recv: int | None = None _last_net_sample_at: float | None = None + _weather_cache: dict | None = None + _weather_cached_at: float | None = None + _weather_cached_location: str | None = None + _weather_cache_ttl_seconds: float = 10 * 60 # 10 minutes def __init__(self): # Import settings here to avoid circular imports @@ -134,8 +141,95 @@ class SystemService: 'timestamp': datetime.now(UTC).isoformat(), } - def get_config(self) -> dict: + async def _fetch_weather(self, location: str) -> dict: + try: + timeout = httpx.Timeout(10.0, connect=5.0) + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.get(f'https://wttr.in/{location}', params={'format': 'j1'}) + response.raise_for_status() + payload = response.json() + current = (payload.get('current_condition') or [{}])[0] + weather_code = current.get('weatherCode') + temp = current.get('temp_C') + parsed_code = int(weather_code) if weather_code is not None and str(weather_code).isdigit() else None + if parsed_code is None or temp in (None, ''): + return {'weather_code': None, 'weather_summary': 'Weather unavailable'} + + label = self._weather_code_label(parsed_code) + return { + 'weather_code': parsed_code, + 'weather_summary': f'{label} {temp}°C', + } + except (httpx.HTTPError, ValueError, TypeError): + return {'weather_code': None, 'weather_summary': 'Weather unavailable'} + + @staticmethod + def _weather_code_label(code: int | None) -> str: + if code == 0: + return 'Clear' + if code in {1, 2}: + return 'Partly Cloudy' + if code == 3: + return 'Overcast' + if code in {45, 48}: + return 'Fog' + if code in {51, 53, 55, 56, 57}: + return 'Drizzle' + if code in {61, 63, 65, 66, 67, 80, 81, 82}: + return 'Rain' + if code in {71, 73, 75, 77, 85, 86}: + return 'Snow' + if code in {95, 96, 99}: + return 'Thunderstorm' + return 'Weather' + + async def get_config(self) -> dict: """Get public system configuration.""" + location = self._settings.LOCATION + now = time.time() + cached_weather = self.__class__._weather_cache + cached_at = self.__class__._weather_cached_at + cached_location = self.__class__._weather_cached_location + + cache_is_valid = ( + cached_weather is not None + and cached_at is not None + and cached_location == location + and (now - cached_at) < self.__class__._weather_cache_ttl_seconds + ) + + if cache_is_valid: + return { + 'location': location, + **cached_weather, + 'weather_cached': True, + 'weather_cached_at': cached_at, + } + + weather = await self._fetch_weather(location) + + # If fetch failed but we have *any* last known weather for same location, return it to avoid UI flicker. + if ( + (weather.get('weather_code') is None) + and cached_weather is not None + and cached_location == location + ): + return { + 'location': location, + **cached_weather, + 'weather_cached': True, + 'weather_cached_at': cached_at, + 'weather_stale': True, + } + + # Update cache on successful/meaningful payload (or keep "unavailable" if never succeeded). + self.__class__._weather_cache = weather + self.__class__._weather_cached_at = now + self.__class__._weather_cached_location = location + return { - 'location': self._settings.LOCATION, + 'location': location, + **weather, + 'weather_cached': False, + 'weather_cached_at': now, }