feat: 更新 user_agent 服务,增强用户代理功能

This commit is contained in:
caoxiaozhu
2026-05-18 02:50:32 +00:00
parent 4414ffb34c
commit 35a3783481
2 changed files with 394 additions and 55 deletions

View File

@@ -135,11 +135,14 @@ KNOWLEDGE_QUERY_STOPWORDS = {
}
MAX_KNOWLEDGE_QUERY_TERMS = 12
MAX_KNOWLEDGE_DIRECT_EVIDENCE = 4
MAX_KNOWLEDGE_MODEL_HITS = 5
KNOWLEDGE_SECTION_HEADING_PATTERN = re.compile(
r"^(#\s*.+|##\s*.+|###\s*.+|第[一二三四五六七八九十百零0-9]+[章节条]\s*.*|[一二三四五六七八九十]+、.*|[一二三四五六七八九十]+.*|\([一二三四五六七八九十]+\).*)$"
)
KNOWLEDGE_LIST_ITEM_PATTERN = re.compile(r"^[-*•]\s+.+$")
KNOWLEDGE_NUMBERED_ITEM_PATTERN = re.compile(r"^(?:\d+[.)、]|[①②③④⑤⑥⑦⑧⑨⑩])\s*.+$")
KNOWLEDGE_NUMBERED_ITEM_PATTERN = re.compile(
r"^(?:(?:\d+[.)、])|(?:[(][一二三四五六七八九十百零0-9]+[)])|[①②③④⑤⑥⑦⑧⑨⑩])\s*.+$"
)
KNOWLEDGE_ARTICLE_PATTERN = re.compile(r"^(第[一二三四五六七八九十百零0-9]+条)\s*.*$")
EXPENSE_STATUS_LABELS = {
@@ -496,10 +499,15 @@ class UserAgentService:
draft_payload: UserAgentDraftPayload | None,
fallback_answer: str,
) -> list[dict[str, str]]:
facts = {
"run_id": payload.run_id,
"user_message": payload.message,
"ontology": payload.ontology.model_dump(mode="json"),
knowledge_question = (
self._resolve_knowledge_question(payload)
if payload.ontology.scenario == "knowledge"
else ""
)
facts = {
"run_id": payload.run_id,
"user_message": payload.message,
"ontology": payload.ontology.model_dump(mode="json"),
"context": {
"entry_source": payload.context_json.get("entry_source"),
"user_name": payload.context_json.get("name"),
@@ -519,7 +527,10 @@ class UserAgentService:
"draft_claim_id": payload.context_json.get("draft_claim_id"),
"conversation_history": self._resolve_conversation_history(payload),
},
"tool_payload": self._build_model_tool_payload(payload.tool_payload),
"tool_payload": self._build_model_tool_payload(
payload.tool_payload,
question=knowledge_question,
),
"citations": [item.model_dump(mode="json") for item in citations],
"suggested_actions": [item.model_dump(mode="json") for item in suggested_actions],
"risk_flags": risk_flags,
@@ -530,7 +541,8 @@ class UserAgentService:
}
if payload.ontology.scenario == "knowledge":
facts["knowledge_evidence_blocks"] = self._build_knowledge_evidence_blocks(
payload.tool_payload
payload.tool_payload,
question=knowledge_question,
)
facts["knowledge_answer_evidence"] = [
{
@@ -598,13 +610,20 @@ class UserAgentService:
]
@staticmethod
def _build_model_tool_payload(tool_payload: dict[str, Any]) -> dict[str, Any]:
def _build_model_tool_payload(
tool_payload: dict[str, Any],
*,
question: str | None = None,
) -> dict[str, Any]:
normalized = dict(tool_payload or {})
hits = []
for item in UserAgentService._select_knowledge_model_hits(tool_payload):
for item in UserAgentService._select_knowledge_model_hits(
tool_payload,
question=question,
):
if not isinstance(item, dict):
continue
hits.append(
hits.append(
{
"title": str(item.get("title") or "").strip(),
"document_name": str(item.get("document_name") or "").strip(),
@@ -619,10 +638,17 @@ class UserAgentService:
return normalized
@staticmethod
def _build_knowledge_evidence_blocks(tool_payload: dict[str, Any]) -> str:
def _build_knowledge_evidence_blocks(
tool_payload: dict[str, Any],
*,
question: str | None = None,
) -> str:
blocks: list[str] = []
for index, item in enumerate(
UserAgentService._select_knowledge_model_hits(tool_payload)[:3],
UserAgentService._select_knowledge_model_hits(
tool_payload,
question=question,
)[:3],
start=1,
):
if not isinstance(item, dict):
@@ -645,24 +671,86 @@ class UserAgentService:
return "\n\n".join(blocks)
@staticmethod
def _select_knowledge_model_hits(tool_payload: dict[str, Any]) -> list[dict[str, Any]]:
raw_hits = [item for item in list(tool_payload.get("hits") or []) if isinstance(item, dict)]
structured_hits = [
def _select_knowledge_model_hits(
tool_payload: dict[str, Any],
*,
question: str | None = None,
) -> list[dict[str, Any]]:
raw_hits = [
item
for item in raw_hits
if any(
marker in str(item.get("content") or "")
for marker in ("问答线索补充", "结构化表格补充", "重点章节摘录")
)
]
selected = structured_hits[:2]
for item in raw_hits:
if item in selected:
continue
selected.append(item)
if len(selected) >= 3:
break
return selected[:3]
for item in list(tool_payload.get("hits") or [])
if isinstance(item, dict)
][: max(MAX_KNOWLEDGE_MODEL_HITS + 1, 6)]
if not raw_hits:
return []
query_terms = UserAgentService._extract_knowledge_query_terms(question or "")
if not query_terms:
return raw_hits[:MAX_KNOWLEDGE_MODEL_HITS]
ranked_hits = sorted(
enumerate(raw_hits),
key=lambda value: (
UserAgentService._score_knowledge_model_hit(
value[1],
query_terms=query_terms,
rank_index=value[0],
),
-value[0],
),
reverse=True,
)
return [item for _, item in ranked_hits[:MAX_KNOWLEDGE_MODEL_HITS]]
@staticmethod
def _score_knowledge_model_hit(
    item: dict[str, Any],
    *,
    query_terms: list[str],
    rank_index: int,
) -> int:
    """Return an integer relevance score for one knowledge hit.

    The score starts from a base that decreases with the hit's original
    position, grows with the number of query terms found in the hit, and is
    then adjusted by the appendix marker found near the start of the content
    and by a few content-shape bonuses and penalties.

    Args:
        item: Hit dict; ``title`` (falling back to ``document_name``),
            ``excerpt`` and ``content`` are read.
        query_terms: Terms looked up by substring in the lowercased hit text.
            Presumably already lowercased by the caller -- TODO confirm.
        rank_index: Zero-based position of the hit in the incoming hit list.

    Returns:
        The accumulated score; higher means more relevant.
    """
    title = str(item.get("title") or item.get("document_name") or "").lower()
    excerpt = str(item.get("excerpt") or "").lower()
    content = str(item.get("content") or "").lower()
    # Only the first 1400 characters of the content take part in term matching.
    haystack = "\n".join([title, excerpt, content[:1400]])
    matched_terms = [term for term in query_terms if term in haystack]
    # Base score: 48 for the first hit, 4 less per position, never below 1.
    score = max(1, 48 - rank_index * 4)
    score += len(matched_terms) * 10
    # Terms that also occur in the title earn an extra bonus each.
    score += sum(1 for term in matched_terms if term in title) * 8
    leading_marker = UserAgentService._leading_knowledge_appendix_marker(content)
    if leading_marker == "# 章节导航":
        # Navigation appendix is always penalised, matched or not.
        score -= 22
    elif leading_marker == "# 问答线索补充":
        score += 6 if matched_terms else -8
    elif leading_marker == "# 重点章节摘录":
        score += 4 if matched_terms else -4
    elif leading_marker == "# 结构化表格补充":
        score += 8 if matched_terms else -3
    # A pipe character is treated as a sign of table-shaped content.
    if matched_terms and "|" in content:
        score += 8
    # NOTE(review): the empty-string marker below is contained in every
    # string, so this bonus applies whenever matched_terms is non-empty.
    # It looks like a character was lost from the literal -- confirm the
    # intended marker list.
    if matched_terms and any(marker in content for marker in ("", ":")):
        score += 10
    if matched_terms and "\n" in content:
        score += 4
    # NOTE(review): same concern -- the empty-string markers always match.
    if matched_terms and any(marker in content for marker in ("附表", "", "")):
        score += 4
    # NOTE(review): same concern -- the empty-string markers always match.
    if matched_terms and any(marker in content for marker in ("", "", "", "-", "")):
        score += 4
    # Content that itself says it has no information/rule/explanation/basis is demoted.
    if re.search(r"没有.{0,8}(信息|规定|说明|依据)", content):
        score -= 12
    return score
@staticmethod
def _leading_knowledge_appendix_marker(content: str) -> str:
normalized = str(content or "").lstrip()
for marker in ("# 章节导航", "# 重点章节摘录", "# 问答线索补充", "# 结构化表格补充"):
index = normalized.find(marker)
if 0 <= index <= 220:
return marker
return ""
def _build_query_answer(self, payload: UserAgentRequest) -> str:
scenario = payload.ontology.scenario
@@ -860,25 +948,33 @@ class UserAgentService:
question = self._resolve_knowledge_question(payload)
query_terms = self._extract_knowledge_query_terms(question)
ordered_evidence_items = self._prioritize_knowledge_evidence_items(question, evidence_items)
lead = self._summarize_knowledge_evidence_content(ordered_evidence_items[0], query_terms)
primary_item = ordered_evidence_items[0]
primary_heading = self._format_knowledge_heading_label(
str(primary_item.get("heading") or "").strip()
)
primary_lines = self._collect_direct_knowledge_answer_lines(ordered_evidence_items)
lines: list[str] = []
if user_name:
lines.append(f"{user_name},您好。")
lines.append(f"根据《{title},当前能直接确认的是:{lead}")
lines.append("")
lines.append("## 依据")
source_prefix = f"根据《{title}"
if primary_heading:
source_prefix = f"{source_prefix}{primary_heading}"
for item in ordered_evidence_items:
heading = str(item.get("heading") or "").strip()
heading_text = f" > {heading}" if heading else ""
content = str(item.get("content") or "").strip()
if str(item.get("kind") or "") == "table":
lines.append(f"{item.get('title') or title}{heading_text}")
lines.append(self._extract_relevant_table_preview(content, query_terms))
lines.append("")
continue
lines.append(f"- 《{item.get('title') or title}{heading_text}{self._clean_knowledge_segment_text(content)}")
if str(primary_item.get("kind") or "") == "table":
lines.append(f"{source_prefix},当前能直接确认的是:")
lines.append(self._extract_relevant_table_preview(str(primary_item.get("content") or ""), query_terms))
else:
if not primary_lines:
lines.append(
f"{source_prefix},当前能直接确认的是:"
f"{self._summarize_knowledge_evidence_content(primary_item, query_terms)}"
)
elif len(primary_lines) == 1:
lines.append(f"{source_prefix},当前能直接确认的是:{primary_lines[0].strip()}")
else:
lines.append(f"{source_prefix},当前能直接确认的是:")
lines.extend(primary_lines)
notes: list[str] = []
location_note = self._build_missing_location_grounding_note(question, evidence_items)
@@ -889,7 +985,7 @@ class UserAgentService:
if notes:
lines.append("")
lines.append("## 说明")
lines.append("说明")
lines.extend(f"- {note}" for note in notes)
return "\n".join(line for line in lines if line is not None).strip()
@@ -944,7 +1040,10 @@ class UserAgentService:
query_terms = self._extract_knowledge_query_terms(question)
candidates: list[dict[str, Any]] = []
for hit in self._select_knowledge_model_hits(payload.tool_payload):
for hit in self._select_knowledge_model_hits(
payload.tool_payload,
question=question,
):
if not isinstance(hit, dict):
continue
candidates.extend(self._extract_knowledge_evidence_candidates(hit, query_terms))
@@ -988,7 +1087,9 @@ class UserAgentService:
if not content:
return []
raw_candidates = self._split_knowledge_hit_into_segments(content)
raw_candidates = self._merge_knowledge_lead_in_segments(
self._split_knowledge_hit_into_segments(content)
)
candidates: list[dict[str, Any]] = []
for item in raw_candidates:
score = self._score_knowledge_evidence_candidate(item, query_terms)
@@ -1015,6 +1116,95 @@ class UserAgentService:
}
]
@staticmethod
def _is_knowledge_lead_in_segment(item: dict[str, str]) -> bool:
kind = str(item.get("kind") or "").strip()
content = str(item.get("content") or "").strip()
return kind in {"kv", "list", "clause"} and content.endswith(("", ":"))
@staticmethod
def _extract_knowledge_marker_family(content: str) -> str:
normalized = str(content or "").strip()
if not normalized:
return ""
if KNOWLEDGE_ARTICLE_PATTERN.match(normalized):
return "article"
if re.match(r"^\d+[.)、]\s*", normalized):
return "arabic"
if re.match(r"^[(][一二三四五六七八九十百零0-9]+[)]\s*", normalized):
return "paren"
if re.match(r"^[①②③④⑤⑥⑦⑧⑨⑩]\s*", normalized):
return "circled"
if KNOWLEDGE_LIST_ITEM_PATTERN.match(normalized):
return "bullet"
return ""
@staticmethod
def _format_knowledge_heading_label(heading: str) -> str:
parts = [item.strip() for item in str(heading or "").split(">") if item.strip()]
return " / ".join(parts)
def _merge_knowledge_lead_in_segments(
    self,
    segments: list[dict[str, str]],
) -> list[dict[str, str]]:
    """Fold the segments that follow a lead-in line into the lead-in segment.

    Segments are walked in order. A segment that is not a lead-in is copied
    through unchanged. For a lead-in segment, the immediately following
    segments under the same heading are appended to its content (one per
    line) until a stop condition is hit.

    Args:
        segments: Ordered segment dicts; ``heading``, ``kind`` and
            ``content`` are read.

    Returns:
        A new list of shallow-copied segment dicts; absorbed followers do not
        appear as separate entries.
    """
    if not segments:
        return []
    merged: list[dict[str, str]] = []
    index = 0
    while index < len(segments):
        # Work on a copy so rewriting content/kind does not touch the input dict.
        current = dict(segments[index])
        if not self._is_knowledge_lead_in_segment(current):
            merged.append(current)
            index += 1
            continue
        base_heading = str(current.get("heading") or "").strip()
        current_marker = self._extract_knowledge_marker_family(str(current.get("content") or ""))
        follow_segments: list[dict[str, str]] = []
        next_index = index + 1
        while next_index < len(segments):
            candidate = segments[next_index]
            # Followers must share the lead-in's heading.
            if str(candidate.get("heading") or "").strip() != base_heading:
                break
            candidate_kind = str(candidate.get("kind") or "").strip()
            candidate_content = str(candidate.get("content") or "").strip()
            candidate_marker = self._extract_knowledge_marker_family(candidate_content)
            # Empty segments and tables are never absorbed.
            if not candidate_content or candidate_kind == "table":
                break
            # A follower using the same marker family as the lead-in is a sibling
            # item, not a child, so the merge stops there.
            if current_marker and candidate_marker == current_marker:
                break
            # A second lead-in ends the run once at least one follower was taken.
            if self._is_knowledge_lead_in_segment(candidate) and follow_segments:
                break
            if candidate_kind not in {"list", "paragraph", "kv", "clause"}:
                break
            follow_segments.append(candidate)
            next_index += 1
            # At most four followers are absorbed.
            if len(follow_segments) >= 4:
                break
            # A paragraph of 200+ characters is absorbed but ends the run.
            if candidate_kind == "paragraph" and len(candidate_content) >= 200:
                break
        if follow_segments:
            current["content"] = "\n".join(
                [str(current.get("content") or "").strip()]
                + [str(item.get("content") or "").strip() for item in follow_segments]
            )
            # The merged segment is relabelled as a list when any follower was one.
            if any(str(item.get("kind") or "").strip() == "list" for item in follow_segments):
                current["kind"] = "list"
            merged.append(current)
            # Resume after the last absorbed follower.
            index = next_index
            continue
        merged.append(current)
        index += 1
    return merged
def _split_knowledge_hit_into_segments(self, content: str) -> list[dict[str, str]]:
segments: list[dict[str, str]] = []
markdown_headings: list[str] = []
@@ -1218,11 +1408,85 @@ class UserAgentService:
normalized = str(content or "").strip()
normalized = re.sub(r"^[-*•]\s*", "", normalized)
normalized = re.sub(r"^(?:\d+[.)、]|[①②③④⑤⑥⑦⑧⑨⑩])\s*", "", normalized)
normalized = re.sub(r"^[(][一二三四五六七八九十百零0-9]+[)]\s*", "", normalized)
normalized = re.sub(r"\s+", " ", normalized)
if len(normalized) <= 180:
return normalized
return f"{normalized[:177].rstrip()}..."
@staticmethod
def _normalize_knowledge_line(content: str, *, preserve_marker: bool) -> str:
normalized = str(content or "").strip()
normalized = re.sub(r"^[-*•]\s*", "", normalized)
if not preserve_marker:
normalized = re.sub(r"^(?:\d+[.)、]|[①②③④⑤⑥⑦⑧⑨⑩])\s*", "", normalized)
normalized = re.sub(r"^[(][一二三四五六七八九十百零0-9]+[)]\s*", "", normalized)
normalized = re.sub(r"\s+", " ", normalized)
return normalized
def _split_clean_knowledge_lines(
self,
content: str,
*,
preserve_marker: bool,
) -> list[str]:
return [
line
for line in (
self._normalize_knowledge_line(item, preserve_marker=preserve_marker)
for item in str(content or "").splitlines()
)
if line
]
def _render_knowledge_evidence_text(self, item: dict[str, Any]) -> str:
    """Render an evidence item's content as display text.

    The content is split into cleaned lines with numbering markers kept.
    No lines yields an empty string, a single line is returned through
    ``_clean_knowledge_segment_text``, and several lines are joined with
    newlines, each prefixed by one space.
    """
    cleaned = self._split_clean_knowledge_lines(
        str(item.get("content") or ""),
        preserve_marker=True,
    )
    if len(cleaned) > 1:
        return "\n".join(" " + entry for entry in cleaned)
    if cleaned:
        return self._clean_knowledge_segment_text(cleaned[0])
    return ""
def _collect_direct_knowledge_answer_lines(
self,
ordered_evidence_items: list[dict[str, Any]],
) -> list[str]:
if not ordered_evidence_items:
return []
primary_item = ordered_evidence_items[0]
primary_title = str(primary_item.get("title") or "").strip()
primary_heading = str(primary_item.get("heading") or "").strip()
primary_kind = str(primary_item.get("kind") or "").strip()
related_items = [primary_item]
if primary_kind != "table":
for item in ordered_evidence_items[1:]:
if len(related_items) >= 3:
break
if str(item.get("kind") or "").strip() != primary_kind:
continue
if str(item.get("title") or "").strip() != primary_title:
continue
if str(item.get("heading") or "").strip() != primary_heading:
continue
related_items.append(item)
lines: list[str] = []
seen: set[str] = set()
for item in related_items:
rendered = self._render_knowledge_evidence_text(item)
for line in rendered.splitlines():
normalized = str(line or "").strip()
if not normalized or normalized in seen:
continue
seen.add(normalized)
lines.append(line)
return lines
def _summarize_knowledge_evidence_content(
self,
item: dict[str, Any],
@@ -1236,6 +1500,9 @@ class UserAgentService:
if len(preview_rows) >= 3:
return "当前命中的直接依据是一张与问题强相关的标准表,已摘出最相关的表头和行。"
return "当前命中的直接依据是一张与问题强相关的标准表。"
lines = self._split_clean_knowledge_lines(content, preserve_marker=True)
if len(lines) >= 2:
return self._clean_knowledge_segment_text(f"{lines[0]} {' '.join(lines[1:4])}")
return self._clean_knowledge_segment_text(content)
@staticmethod
@@ -1354,9 +1621,12 @@ class UserAgentService:
)
evidence_lines.append(f"- 《{item.get('title') or title}{heading_text}\n{preview}")
continue
content = self._clean_knowledge_segment_text(str(item.get("content") or ""))
if content:
evidence_lines.append(f"- 《{item.get('title') or title}{heading_text}{content}")
rendered = self._render_knowledge_evidence_text(item)
if rendered:
if "\n" in rendered:
evidence_lines.append(f"- 《{item.get('title') or title}{heading_text}\n{rendered}")
else:
evidence_lines.append(f"- 《{item.get('title') or title}{heading_text}{rendered}")
if not evidence_lines:
for item in hits[:2]:

View File

@@ -202,21 +202,37 @@ def test_user_agent_knowledge_answer_generation_uses_fast_timeouts(monkeypatch)
assert captured["max_attempts"] == 1
def test_user_agent_prefers_structured_knowledge_hit_for_answer_generation() -> None:
def test_user_agent_prefers_structured_table_hit_for_standard_query() -> None:
selected = UserAgentService._select_knowledge_model_hits(
{
"hits": [
{"content": "raw hit 1"},
{"content": "raw hit 2"},
{"content": "# 问答线索补充\n\n- 第二章 报销时限:费用发生后 30 日内提交申请。"},
{"content": "# 结构化表格补充\n\n| 项目 | 金额 |"},
{"content": "# 结构化表格补充\n\n| 项目 | 餐补 |\n| 其他地区 | 55 |"},
]
}
},
question="餐补标准是多少?",
)
assert selected[0]["content"].startswith("# 问答线索补充")
assert selected[1]["content"].startswith("# 结构化表格补充")
assert selected[2]["content"] == "raw hit 1"
assert selected[0]["content"].startswith("# 结构化表格补充")
assert any(item["content"].startswith("# 结构化表格补充") for item in selected[:2])
def test_user_agent_prefers_relevant_raw_hit_over_generic_appendix() -> None:
    """A raw hit that answers the question must be ranked first.

    The hit list puts a navigation appendix, a Q&A appendix and an
    irrelevant paragraph ahead of the raw attachment text; the selection is
    expected to return the attachment text in first position.
    """
    selected = UserAgentService._select_knowledge_model_hits(
        {
            "hits": [
                {"content": "# 章节导航\n\n- 第一章 总则\n- 第二章 职责分工"},
                {"content": "# 问答线索补充\n\n- 第二章 职责分工:计划财务部负责财务审核。"},
                {"content": "一般性说明文字,没有探亲差旅归口信息。"},
                {"content": "附表3支出归口管理部门与归口业务范围\n组织人事部:探亲差旅、条件艰苦及安全风险较高区域补助等支出。"},
            ]
        },
        question="探亲差旅归哪个部门管理?",
    )
    # Only the last hit names the responsible department.
    assert "组织人事部" in selected[0]["content"]
def test_user_agent_uses_fast_knowledge_answer_without_model(monkeypatch) -> None:
@@ -266,6 +282,7 @@ def test_user_agent_uses_fast_knowledge_answer_without_model(monkeypatch) -> Non
assert response.answer.startswith("张三,您好。")
assert "当前能直接确认的是" in response.answer
assert "30 日内提交报销申请" in response.answer
assert "## 依据" not in response.answer
assert "答案整理阶段本轮没有及时返回" not in response.answer
@@ -314,6 +331,7 @@ def test_user_agent_fast_knowledge_answer_renders_relevant_table_preview() -> No
assert answer is not None
assert "| 项目 | 港澳台 | 其他地区 | 国外 |" in answer
assert "| 餐补 | 75 | 55 | 140 |" in answer
assert "## 依据" not in answer
def test_user_agent_fast_knowledge_answer_notes_missing_location_grounding() -> None:
@@ -360,6 +378,57 @@ def test_user_agent_fast_knowledge_answer_notes_missing_location_grounding() ->
assert answer is not None
assert "没有直接写出“北京”对应的地区档位或映射关系" in answer
assert "## 依据" not in answer
def test_user_agent_fast_knowledge_answer_expands_lead_in_list_items() -> None:
    """The fast knowledge answer must include the items listed under a lead-in line.

    The single hit contains a lead-in sentence followed by three circled
    sub-items; the answer is expected to quote all three and to omit the
    "## 依据" section.
    """
    session_factory = build_session_factory()
    with session_factory() as db:
        ontology = SemanticOntologyService(db).parse(
            OntologyParseRequest(
                query="出差记录链条中断时,要提供哪些业务佐证材料?",
                user_id="pytest",
                context_json={"session_type": "knowledge"},
            )
        )
        service = UserAgentService(db)
        answer = service._build_fast_knowledge_answer(
            UserAgentRequest(
                run_id=ontology.run_id,
                user_id="pytest",
                message="出差记录链条中断时,要提供哪些业务佐证材料?",
                ontology=ontology,
                context_json={
                    "session_type": "knowledge",
                    "user_input_text": "出差记录链条中断时,要提供哪些业务佐证材料?",
                },
                tool_payload={
                    "result_type": "knowledge_search",
                    "hits": [
                        {
                            "title": "费用报销制度",
                            "content": (
                                "第十三条 差旅费\n\n"
                                "2出差记录链条中断时应提供业务佐证材料\n"
                                "① 登机牌、高速道路通行记录、其他道路通行记录、租车记录等。\n"
                                "② 支付记录。\n"
                                "③ 出差审批邮件、短信、微信等。"
                            ),
                        }
                    ],
                },
            ),
            citations=[],
        )
    assert answer is not None
    assert "当前能直接确认的是" in answer
    # Each circled sub-item of the hit must surface in the answer.
    assert "登机牌、高速道路通行记录" in answer
    assert "支付记录" in answer
    assert "出差审批邮件、短信、微信等" in answer
    assert "3" not in answer
    assert "## 依据" not in answer
def test_user_agent_model_prompt_supports_contextual_personalization() -> None: