feat(services): enhance services with rollback and observability

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
2026-04-08 00:12:08 +08:00
parent 36c93a764f
commit 74fdfc2652
5 changed files with 675 additions and 14 deletions

View File

@@ -7,12 +7,13 @@ import json
import uuid
import logging
from datetime import UTC, datetime
from time import perf_counter
from typing import Any, AsyncGenerator
import asyncio
from openai import BadRequestError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from app.database import async_session
from app.logging_utils import summarize_llm_config
@@ -21,10 +22,24 @@ from app.models.conversation import Conversation, Message
from app.models.user import User
from app.agents.graph import get_agent_graph
from app.agents.context import set_current_user, clear_current_user
from app.agents.learning.jobs import schedule_retrospective_job
from app.agents.learning.retrospector import build_session_retrospective
from app.agents.learning.session_search import SessionRetrospectiveSearch, summarize_retrospective
from app.agents.orchestration.task_graph import build_bounded_task_graph
from app.agents.learning.store import append_retrospective_attachment
from app.agents.schemas.orchestration import (
RuntimeRequestContext,
assess_parallel_worthiness,
render_runtime_request_context_summary,
)
from app.agents.schemas.skills import SkillActivationRecord
from app.agents.skills.registry import get_skill_registry
from app.agents.skills.retriever import shortlist_skills_for_request
from app.services import memory_service
from app.services.brain_service import BrainService
from app.services.llm_service import create_llm_from_config, resolve_provider_capabilities
from app.services.rollback_controller import RollbackController
from app.services.runtime_observability import build_runtime_observability_report
from app.agents.tools.time_reasoning import extract_reference_datetime
from app.agents.state import initial_state
@@ -36,6 +51,7 @@ MEMORY_SECTION_HEADERS = (
"【之前对话摘要】",
"【知识大脑】",
)
MEMORY_INLINE_HEADERS = {"[关于你的记忆]"}
def _split_memory_context_sections(memory_context: str | None) -> dict[str, str]:
@@ -81,6 +97,41 @@ def _derive_role_memory_contexts(memory_context: str | None) -> dict[str, str |
}
def _extract_memory_highlights(memory_context: str | None, *, limit: int = 5) -> list[str]:
text = (memory_context or "").strip()
if not text:
return []
highlights: list[str] = []
for raw_line in text.splitlines():
line = raw_line.strip()
if not line or line in MEMORY_SECTION_HEADERS or line in MEMORY_INLINE_HEADERS:
continue
if line.startswith("-"):
normalized = line[1:].strip()
else:
normalized = line
if normalized:
highlights.append(normalized)
if len(highlights) >= limit:
break
return highlights
def _summarize_retrospective(retrospective: Any) -> str:
summary = str(getattr(retrospective, "summary", "") or "").strip()
task_type = str(getattr(retrospective, "task_type", "") or "").strip()
execution_mode = str(getattr(retrospective, "execution_mode", "") or "").strip()
outcome = str(getattr(retrospective, "outcome", "") or "").strip()
parts = [summary[:80] or task_type or "历史复盘"]
if execution_mode:
parts.append(f"mode={execution_mode}")
if outcome:
parts.append(f"outcome={outcome}")
return "".join(parts)
def _is_streaming_rejection_error(error: Exception, user_llm_config: dict | None) -> bool:
capabilities = resolve_provider_capabilities(user_llm_config)
error_text = str(error).lower()
@@ -461,18 +512,27 @@ class AgentService:
async def _build_agent_state(
self,
*,
request_id: str,
user_id: str,
conversation: Conversation,
raw_user_query: str,
full_message: str,
memory_context: str | None,
current_datetime_context: str,
current_datetime_reference: dict[str, str],
user_llm_config: dict | None,
runtime_request_context: RuntimeRequestContext,
recalled_retrospectives: list[dict[str, Any]],
skill_shortlist: list[dict[str, Any]],
) -> dict[str, Any]:
state = initial_state(user_id, conversation.id)
runtime_summary = render_runtime_request_context_summary(runtime_request_context)
state.update(
{
"messages": [HumanMessage(content=full_message)],
"messages": [
SystemMessage(content=runtime_summary),
HumanMessage(content=full_message),
],
"memory_context": memory_context,
"current_datetime_context": current_datetime_context,
"current_datetime_reference": current_datetime_reference,
@@ -482,9 +542,119 @@ class AgentService:
previous_snapshot = await self._load_continuity_snapshot(conversation)
if previous_snapshot:
state.update(previous_snapshot)
state["messages"] = [HumanMessage(content=full_message)]
state["messages"] = [
SystemMessage(content=runtime_summary),
HumanMessage(content=full_message),
]
state.update(
{
"runtime_request_context": runtime_request_context.model_dump(mode="json"),
"task_graph": (
runtime_request_context.task_graph.model_dump(mode="json")
if runtime_request_context.task_graph is not None
else None
),
"feature_flags": RollbackController().snapshot_flags(),
"recalled_retrospectives": recalled_retrospectives,
"retrospective_shortlist": recalled_retrospectives,
"skill_shortlist": skill_shortlist,
"skill_activation_records": [
SkillActivationRecord(
skill_name=item.get("skill_name"),
source=item.get("source", "runtime"),
source_id=item.get("source_id"),
status=item.get("status", "active"),
injection_mode=item.get("injection_mode", "metadata_only"),
matched_terms=item.get("matched_terms", []),
rationale=item.get("rationale"),
).model_dump(mode="json")
for item in skill_shortlist
if item.get("skill_name")
],
"parallel_worthiness": runtime_request_context.parallel_worthiness.model_dump(
mode="json"
),
}
)
return state
async def _build_runtime_request_context(
self,
*,
request_id: str,
user_id: str,
conversation: Conversation,
user_query: str,
memory_context: str | None,
) -> tuple[RuntimeRequestContext, list[dict[str, Any]], list[dict[str, Any]]]:
started_at = perf_counter()
retrospectives_started = perf_counter()
recent_retrospectives = await SessionRetrospectiveSearch(self.db).shortlist(
user_id=user_id,
query_text=user_query,
conversation_id=conversation.id,
limit=3,
)
retrospective_ms = (perf_counter() - retrospectives_started) * 1000
feature_flags = RollbackController().snapshot_flags()
shortlist_started = perf_counter()
skill_shortlist = await shortlist_skills_for_request(
self.db,
user_id=user_id,
user_query=user_query,
memory_context=memory_context,
retrospectives=[item.model_dump(mode="json") for item in recent_retrospectives],
include_learned=feature_flags["ENABLE_LEARNED_SKILL_LOADING"],
limit=4,
)
skill_shortlist_ms = (perf_counter() - shortlist_started) * 1000
parallel_worthiness = assess_parallel_worthiness(
user_query,
retrospective_count=len(recent_retrospectives),
skill_count=len(skill_shortlist),
)
recommended_runtime_mode = (
"collaboration" if parallel_worthiness.preferred_mode != "direct" else "direct"
)
task_graph = (
build_bounded_task_graph(
query_text=user_query,
parallel_worthiness=parallel_worthiness,
)
if feature_flags["ENABLE_PARALLEL_TASK_GRAPH"]
else None
)
runtime_request_context = RuntimeRequestContext(
request_id=request_id,
session_id=conversation.id,
conversation_id=conversation.id,
user_id=user_id,
query_text=user_query,
raw_user_query=user_query,
recalled_memories=_extract_memory_highlights(memory_context),
recalled_retrospectives=[
summarize_retrospective(retrospective) for retrospective in recent_retrospectives
],
shortlisted_skills=[entry.skill_name for entry in skill_shortlist],
skill_shortlist=skill_shortlist,
current_agent_role="master",
execution_mode=recommended_runtime_mode,
conversation_state_ref=conversation.id,
parallel_worthiness=parallel_worthiness,
task_graph=task_graph,
recommended_runtime_mode=recommended_runtime_mode,
assembly_metrics={
"retrospective_ms": round(retrospective_ms, 3),
"skill_shortlist_ms": round(skill_shortlist_ms, 3),
"total_ms": round((perf_counter() - started_at) * 1000, 3),
},
)
return (
runtime_request_context,
[item.model_dump(mode="json") for item in recent_retrospectives],
[item.model_dump(mode="json") for item in skill_shortlist],
)
async def chat(
self,
user_id: str,
@@ -610,21 +780,38 @@ class AgentService:
async def run_agent():
collected = ""
state: dict[str, Any] | None = None
runtime_request_context: RuntimeRequestContext | None = None
set_current_user(user_id)
try:
graph = get_agent_graph()
current_datetime_context, current_datetime_reference = (
self._build_current_datetime_context()
)
state = await self._build_agent_state(
(
runtime_request_context,
recalled_retrospectives,
skill_shortlist,
) = await self._build_runtime_request_context(
request_id=assistant_msg.id,
user_id=user_id,
conversation=conv,
user_query=message,
memory_context=memory_ctx,
)
state = await self._build_agent_state(
request_id=assistant_msg.id,
user_id=user_id,
conversation=conv,
raw_user_query=message,
full_message=full_message,
memory_context=memory_ctx,
current_datetime_context=current_datetime_context,
current_datetime_reference=current_datetime_reference,
user_llm_config=user_llm_config,
runtime_request_context=runtime_request_context,
recalled_retrospectives=recalled_retrospectives,
skill_shortlist=skill_shortlist,
)
state.update(_derive_role_memory_contexts(memory_ctx))
@@ -749,7 +936,7 @@ class AgentService:
if collected:
assistant_msg.content = collected
continuity_snapshot = _build_continuity_snapshot(state or {})
assistant_msg.attachments = (
attachments = (
[
{
"kind": "agent_continuity_state",
@@ -757,8 +944,26 @@ class AgentService:
}
]
if continuity_snapshot
else None
else []
)
if state is not None and runtime_request_context is not None:
retrospective = build_session_retrospective(
request_id=assistant_msg.id,
session_id=conversation_id,
user_query=message,
state=state,
runtime_context=runtime_request_context,
)
attachments = append_retrospective_attachment(attachments, retrospective)
attachments.append(
{
"kind": "runtime_observability",
"payload": build_runtime_observability_report(
state=state,
feature_flags=state.get("feature_flags") or {},
),
}
)
conv.agent_state = (
{
"kind": "agent_continuity_state",
@@ -771,8 +976,18 @@ class AgentService:
user_id,
**_build_assistant_event_payload(collected),
)
assistant_msg.attachments = attachments or None
await self.db.commit()
await self.db.refresh(assistant_msg)
schedule_retrospective_job(
user_id=user_id,
conversation_id=conversation_id,
request_message_id=user_msg.id,
response_message_id=assistant_msg.id,
query_text=message,
final_response=collected,
state=state,
)
except Exception:
logger.exception("save_assistant_message_failed")
asyncio.create_task(self._try_auto_summarize_background(user_id, conversation_id))
@@ -863,14 +1078,30 @@ class AgentService:
current_datetime_context, current_datetime_reference = (
self._build_current_datetime_context()
)
state = await self._build_agent_state(
(
runtime_request_context,
recalled_retrospectives,
skill_shortlist,
) = await self._build_runtime_request_context(
request_id=assistant_msg.id,
user_id=user_id,
conversation=conv,
user_query=message,
memory_context=memory_ctx,
)
state = await self._build_agent_state(
request_id=assistant_msg.id,
user_id=user_id,
conversation=conv,
raw_user_query=message,
full_message=message,
memory_context=memory_ctx,
current_datetime_context=current_datetime_context,
current_datetime_reference=current_datetime_reference,
user_llm_config=user_llm_config,
runtime_request_context=runtime_request_context,
recalled_retrospectives=recalled_retrospectives,
skill_shortlist=skill_shortlist,
)
state.update(_derive_role_memory_contexts(memory_ctx))
result_state = await graph.ainvoke(state)
@@ -900,7 +1131,7 @@ class AgentService:
continuity_snapshot = (
_build_continuity_snapshot(result_state) if "result_state" in locals() else None
)
assistant_msg.attachments = (
attachments = (
[
{
"kind": "agent_continuity_state",
@@ -908,8 +1139,26 @@ class AgentService:
}
]
if continuity_snapshot
else None
else []
)
if "result_state" in locals() and "runtime_request_context" in locals():
retrospective = build_session_retrospective(
request_id=assistant_msg.id,
session_id=conversation_id,
user_query=message,
state=result_state,
runtime_context=runtime_request_context,
)
attachments = append_retrospective_attachment(attachments, retrospective)
attachments.append(
{
"kind": "runtime_observability",
"payload": build_runtime_observability_report(
state=result_state,
feature_flags=result_state.get("feature_flags") or {},
),
}
)
conv.agent_state = (
{
"kind": "agent_continuity_state",
@@ -918,7 +1167,17 @@ class AgentService:
if continuity_snapshot
else None
)
assistant_msg.attachments = attachments or None
await self.db.commit()
await self.db.refresh(assistant_msg)
schedule_retrospective_job(
user_id=user_id,
conversation_id=conversation_id,
request_message_id=user_msg.id,
response_message_id=assistant_msg.id,
query_text=message,
final_response=response_content,
state=result_state if "result_state" in locals() else None,
)
return conversation_id, assistant_msg.id, response_content, model_name_used