fix(server): 兼容模型 tasks 输出为 JSON 字符串与 flow_id 误填

- StewardModelPlanBuilder 解析 tasks 时兼容模型把数组序列化为字符串的情况,先反序列化;JSON 截断/语法不完整时用正则抢救 task_type/requested_action/ontology_fields 等关键字段
- task_type 未命中意图时尝试 flow_id→task_type 映射还原,避免模型把 flow_id(如 travel_application)误填为 task_type 导致正确意图被丢弃
This commit is contained in:
caoxiaozhu
2026-06-25 12:25:18 +08:00
parent 606a88c805
commit 2ebc2756bf

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import json
import re
import uuid
from datetime import date
@@ -110,6 +111,19 @@ class StewardModelPlanBuilder:
base_date: date,
) -> list[StewardTask]:
raw_tasks = payload.get("tasks")
# 兼容模型把 tasks 输出成 JSON 字符串(而非数组)的情况:
# 某些供应商在流式输出时会把整个数组序列化为字符串,需要先反序列化。
if isinstance(raw_tasks, str):
raw_tasks = raw_tasks.strip()
if not raw_tasks:
return []
try:
parsed_tasks = json.loads(raw_tasks)
except (TypeError, ValueError):
# JSON 可能被截断或语法不完整,尝试从残缺字符串里抢救 task_type 等关键字段,
# 避免模型偶发的格式抖动导致正确意图被整体丢弃。
parsed_tasks = self._salvage_tasks_from_fragment(raw_tasks)
raw_tasks = parsed_tasks if isinstance(parsed_tasks, list) else []
if not isinstance(raw_tasks, list):
return []
@@ -119,6 +133,13 @@ class StewardModelPlanBuilder:
continue
task_type = str(raw_task.get("task_type") or "").strip()
intent_descriptor = get_intent(task_type)
if intent_descriptor is None:
# 兼容模型把 flow_id(如 travel_application)误填为 task_type 的情况:
# 通过 flow_id → task_type 映射还原,避免正确意图被丢弃。
mapped_task_type = resolve_task_type_for_flow(task_type)
if mapped_task_type is not None:
task_type = mapped_task_type
intent_descriptor = get_intent(task_type)
if intent_descriptor is None:
continue
@@ -188,6 +209,50 @@ class StewardModelPlanBuilder:
return tasks
@staticmethod
def _salvage_tasks_from_fragment(fragment: str) -> list[dict[str, Any]]:
    """Salvage a minimal task list from a truncated or malformed JSON string.

    The model occasionally emits ``tasks`` as a JSON string that is cut off or
    syntactically incomplete, so ``json.loads`` fails. Instead of dropping the
    whole intent, the key fields are extracted with regular expressions and a
    single minimal task dict is rebuilt from them.

    Args:
        fragment: The raw (possibly truncated) JSON text of the tasks value.

    Returns:
        A one-element list holding the salvaged task dict, or an empty list
        when ``fragment`` is empty or contains no non-empty ``task_type``
        string. The salvaged dict always carries ``confidence`` 0.8, an empty
        ``missing_fields`` list and ``requested_action`` defaulting to
        ``"preview"``. Only string-valued ``ontology_fields`` entries are kept.
    """
    if not fragment:
        return []

    # One character of a JSON string body: either a plain character or a
    # backslash escape. Matching escapes explicitly keeps an escaped quote
    # (\") from being mistaken for the end of the string.
    string_char = r'(?:[^"\\]|\\.)'

    def decode(raw: str) -> str:
        """Decode JSON escapes (\\uXXXX, \\n, \\" ...) in a captured string body."""
        try:
            # strict=False tolerates raw control characters inside the value.
            return json.loads('"' + raw + '"', strict=False)
        except ValueError:
            # Invalid escape sequence: keep the text exactly as captured.
            return raw

    def find_string(key: str):
        """Return the first non-empty string value for *key*, or None if absent."""
        found = re.search(
            r'"' + key + r'"\s*:\s*"(' + string_char + r'+)"',
            fragment,
        )
        return decode(found.group(1)) if found else None

    task_type = find_string("task_type")
    if task_type is None:
        return []

    requested_action = find_string("requested_action")

    # Capture the body of the ontology_fields object. Complete strings are
    # consumed as a unit so a "}" inside a value does not end the block, and
    # no closing brace is required so a truncated object still yields every
    # key/value pair that was fully emitted before the cut-off.
    ontology_fields: dict[str, str] = {}
    ontology_block = re.search(
        r'"ontology_fields"\s*:\s*\{((?:"' + string_char + r'*"|[^}"])*)',
        fragment,
    )
    if ontology_block:
        pair_pattern = (
            r'"(' + string_char + r'+)"\s*:\s*"(' + string_char + r'*)"'
        )
        for pair in re.finditer(pair_pattern, ontology_block.group(1)):
            ontology_fields[decode(pair.group(1))] = decode(pair.group(2))

    return [{
        "task_type": task_type.strip(),
        "title": find_string("title") or "",
        "summary": find_string("summary") or "",
        "confidence": 0.8,
        "requested_action": (
            "preview" if requested_action is None else requested_action.strip()
        ),
        "ontology_fields": ontology_fields,
        "missing_fields": [],
    }]
def _build_pending_flow_confirmation(
self,
payload: dict[str, Any],