From 52d57c3be759420601aad9dacc6a413109624483 Mon Sep 17 00:00:00 2001 From: caoxiaozhu Date: Fri, 3 Jul 2026 13:56:21 +0800 Subject: [PATCH] =?UTF-8?q?test(flywheel):=20=E8=A1=A5=20few-shot=20?= =?UTF-8?q?=E9=A3=9E=E8=BD=AE=E5=8D=95=E6=B5=8B=E5=B9=B6=E6=B2=89=E6=B7=80?= =?UTF-8?q?=E5=BC=80=E5=8F=91=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - embedding_provider:GLM/Ollama 分支、维度缓存、HTTP 错误降级 - few_shot_ingestion:confirmed/false_positive 入库、ignored 跳过、幂等去重、 create_feedback hook 触发、feature flag、吞异常 - few_shot_retrieval:去重、token 预算、超长截断;prompt 注入合并 examples + 向后兼容 - 容器内新增测试 20 passed;回归测试 35 passed(RAG/risk_observations/rule_generation) - 沉淀 document/development/2026-07-03/feature/ai-data-flywheel 概念文档与 TODO, 飞轮 1 已勾选证据,飞轮 2-6 待后续迭代 --- .../feature/ai-data-flywheel/CONCEPT.md | 190 ++++++++++++++++ .../feature/ai-data-flywheel/TODO.md | 98 ++++++++ server/tests/test_embedding_provider.py | 104 +++++++++ server/tests/test_few_shot_ingestion.py | 214 ++++++++++++++++++ .../test_few_shot_retrieval_and_prompt.py | 119 ++++++++++ 5 files changed, 725 insertions(+) create mode 100644 document/development/2026-07-03/feature/ai-data-flywheel/CONCEPT.md create mode 100644 document/development/2026-07-03/feature/ai-data-flywheel/TODO.md create mode 100644 server/tests/test_embedding_provider.py create mode 100644 server/tests/test_few_shot_ingestion.py create mode 100644 server/tests/test_few_shot_retrieval_and_prompt.py diff --git a/document/development/2026-07-03/feature/ai-data-flywheel/CONCEPT.md b/document/development/2026-07-03/feature/ai-data-flywheel/CONCEPT.md new file mode 100644 index 0000000..525127a --- /dev/null +++ b/document/development/2026-07-03/feature/ai-data-flywheel/CONCEPT.md @@ -0,0 +1,190 @@ +# AI 数据飞轮 概念文档 + +更新时间:2026-07-03 + +文档路径:document/development/2026-07-03/feature/ai-data-flywheel/CONCEPT.md + +## 功能一句话 + +把用户反馈、人工修正、风险样本、评测结果自动沉淀并回流到下一次 LLM 推理与规则生成,让费控系统在不停机的情况下持续提升准确率、降低误报率和人工干预率。 + +## 背景与问题 + +- 当前现状:项目已具备"聪明"的骨架——RAG(`knowledge_rag_runtime.py` + Qdrant)、风险规则自动生成(`risk_rule_generation*.py`)、反馈样本沉淀(`skills/domain/false-positive-sample-accumulator` 等 3 个 accumulator)、规则回放评测(`risk-algorithm-replay-evaluator`)、行为画像(`employee_behavior_profile*`)、用户反馈表(`agent_feedback.py` + `AgentOperationFeedback`)。 +- 用户痛点:样本在往 accumulator 池子里堆,但**下一次 LLM 推理时并没有把这些样本检索出来当 few-shot 喂进去**;prompt 散落在各 `*_prompt.py`,没有版本号、没有在线 A/B、没有回归门禁;OCR 抽取的人工修正值没有回流成评测/训练数据;低分反馈只汇总不归因。 +- 业务影响:系统每次推理都从"初始水平"出发,无法把历史踩过的坑转化成下一次的能力;改 prompt / 改规则无法证明是否变好,存在隐性回归风险;运营和算法同事看不到"系统在进步"的证据。 +- 为什么现在需要做:飞轮骨架已齐,缺的是把"样本池 → 检索注入 + 评测门禁 → prompt/规则版本"这段断开的箭头接上。补上后整张图就转起来,且改动集中在 prompt 构造层 + 新增 eval 目录,不动业务主链路,风险低。 + +## 目标与非目标 + +### 目标 + +- [G1] few-shot 在线检索注入:推理前从样本池按 case 特征做向量检索,取 top-k 历史样本(含人工结论)拼进 system prompt。 +- [G2] 黄金评测集 + 自动回归门禁:版本化 golden set,prompt/规则变更后在 golden set 上自动跑分,分数不达标禁止发布。 +- [G3] Prompt 版本化 + Canary A/B:prompt 进表带版本号,支持 stable / canary 流量切分,反馈分数对比。 +- [G4] 抽取修正回流:附件/明细字段的人工修正值记录为 diff,沉淀为抽取评测集与 few-shot 样本。 +- [G5] 低分反馈自动归因:低分反馈触发归因 agent,拉 trace 诊断错误环节并生成改进任务。 +- [G6] AI 智商看板:每周自动跑 golden set,输出准确率/召回率/误报率/人工干预率随时间的曲线。 + +### 非目标 + +- [NG1] 本轮不做模型微调 / 自训练:只走 prompt 侧的 in-context learning + 规则学习。 +- [NG2] 本轮不改变现有业务主链路(申请单、报销、审批)的接口契约。 +- [NG3] 本轮不替换 Qdrant / LightRAG 底座,复用现有向量存储与 embedding 配置。 +- [NG4] 政策新鲜度检测(外部政策变更 → 自动重生成规则)后续再评估,本轮只在评测门禁侧预留接口。 + +## 用户与场景 + +- 目标用户: + 1. 报销人 / 申请人:感知到系统越来越准,少打回、少补件。 + 2. 财务审批人:误报率下降,审批被打断的次数减少。 + 3. 算法/运营同学:能看到智商曲线、能灰度上线 prompt、能跑回归评测。 +- 使用入口: + - 推理时自动注入 few-shot(对用户透明)。 + - 后台 Canary 控制台(运营切流量、看分数)。 + - AI 智商看板(周报 / 在线查询)。 +- 核心场景: + 1. 用户提交报销 → 系统预审 → 预审 prompt 自动注入相似历史误报样本 → 给出更准结论。 + 2. 算法同学改了 risk rule 生成 prompt → 发布前自动跑 golden set → 不达标被门禁拦下。 + 3. 用户给低分 → 归因 agent 诊断"是检索没召回 / 规则误判 / 回复格式问题"→ 自动建改进任务并回写样本池。 + 4. 审批人改了 OCR 抽错的金额 → diff 自动沉淀 → 下次同类票据抽取 prompt 多一条 few-shot。 +- 异常场景: + - 样本池为空或检索失败 → 退化为无 few-shot 推理,不阻塞主链路。 + - 评测门禁服务不可用 → 默认放行 stable,canary 自动暂停。 + - Canary 候选 prompt 分数劣化 → 自动回滚到 stable。 + +## 功能能力 + +- [C1] 输入能力:消费 accumulator 样本池、`AgentOperationFeedback`、附件修正 diff、trace 数据作为飞轮原料。 +- [C2] 处理能力:样本检索(向量 + 元数据过滤)、评测打分(准确率/召回率/误报率/F1)、Canary 流量切分、低分归因。 +- [C3] 输出能力:few-shot 注入后的 messages、评测报告、智商曲线、改进任务、归因结论。 +- [C4] 状态与权限:样本带"人工已确认"标签才进可注入集合;prompt 版本有 stable/canary/pinned 状态;评测门禁可由运营关闭(审计可见)。 +- [C5] 边界与降级:检索失败、评测失败、Canary 失败均降级到 stable,不阻塞业务推理。 + +## 方案设计 + +### 前端 + +- 页面/组件: + - AI 智商看板(新页面,复用 `finance-report` 看板骨架):准确率/召回率/误报率/人工干预率随时间曲线 + golden set 覆盖度。 + - Canary 控制台(并入 `SettingsView` / `PoliciesView`):列出各场景 prompt 版本、流量比例、当前分数、一键回滚。 +- 交互状态:加载/空态(样本不足)/错误态(评测失败)/权限态(仅算法运营)。 +- 展示规则:曲线按场景(差旅/报销/预算)分面;Canary 显示置信区间,差异不显著时标注。 +- 降级处理:看板数据不可用时提示"数据生成中",不报错。 + +### 后端 + +- 接口/服务(新增,按职责拆分,单文件 ≤ 800 行): + - `services/few_shot_retrieval.py`:样本检索器,复用 Qdrant,输入 case 特征 → 输出 top-k 样本(带人工结论)。 + - `services/prompt_registry.py`:prompt 版本注册中心,按场景 + 策略(latest/canary/pinned)取 prompt。 + - `services/eval_harness.py`:在 golden set 上跑评测,输出指标;被发布门禁和智商看板共用。 + - `services/feedback_attribution.py`:低分归因 agent,复用 `AgentTraceCenter` 数据。 + - `services/extraction_correction_recorder.py`:记录 OCR 抽取字段的人工修正 diff。 +- 改造点(在现有 prompt 构造文件加 inject 钩子,不改业务接口): + - `risk_rule_generation_prompt.py`、`user_agent_application.py`、`expense_claim_pre_review.py`、`document_intelligence_rules.py`、`ontology_extraction.py` 的 prompt 构造处。 +- 权限与校验:Canary 控制台仅算法/运营角色;门禁关闭需审计日志。 +- 持久化(新表,Alembic 迁移): + - `prompt_version`:id / scene / content / version / status(stable/canary/pinned) / eval_score / created_by / created_at。 + - `golden_set`:id / scene / case_payload / expected / source(accumulator/manual) / confirmed / version。 + - `extraction_correction`:id / attachment_id / field / raw_value / corrected_value / operator / created_at。 + - `eval_run`:id / prompt_version_id / scene / metrics_json / started_at / finished_at。 +- 降级处理:所有飞轮组件故障均降级到无 few-shot + stable prompt,主链路不阻塞。 + +### 算法与规则 + +- 输入:case 特征向量(场景标签 + 文本摘要 + 关键字段)、golden set、反馈样本、修正 diff。 +- 流程: + 1. 推理前:`few_shot_retrieval` 检索 top-k → 拼 system prompt。 + 2. 推理后:结果 + 反馈写入 accumulator。 + 3. 发布前:`eval_harness` 在 golden set 上跑分 → 门禁判定。 + 4. 低分触发:`feedback_attribution` 归因 → 改进任务回写样本池。 +- 输出:few-shot 样本块、评测指标、归因结论、智商曲线数据点。 +- 解释:few-shot 注入在 prompt 中保留"参考案例(历史已确认)"段落,可追溯;评测报告附错误 case 列表;归因输出错误环节标签 + 证据 trace 片段。 + +### 数据与契约 + +- 核心字段:scene、case_signature、few_shot_samples、metrics(acc/recall/fpr/f1)、prompt_version_id、status。 +- 状态枚举: + - prompt: `stable` / `canary` / `pinned` / `archived`。 + - golden case: `draft` / `confirmed` / `deprecated`。 + - eval: `pass` / `fail` / `blocked`。 +- 兼容策略:prompt_registry 找不到版本时回退到当前硬编码 prompt(保证向后兼容)。 +- 版本/审计:每次 prompt / 规则 / golden set 变更记 `eval_run`,可回放历史。 + +## 算法与公式 + +### few-shot 检索排序 + +```text +score(sample, case) = α * sim(emb(sample), emb(case)) + β * match(meta(sample), meta(case)) +``` + +变量说明: + +- score:样本与当前 case 的综合相似度。 +- sim:余弦相似度,复用 Qdrant 现有 embedding。 +- match:元数据硬匹配得分(场景同 / 域同 / 级别同),取 0 或 1。 +- α、β:权重,默认 α=0.8、β=0.2,可在 prompt_registry 中按场景覆盖。 +- 适用边界:仅取 `confirmed=true` 的样本;top-k 默认 k=3,按 token 预算动态裁剪。 + +### 评测指标 + +```text +precision = TP / (TP + FP) +recall = TP / (TP + FN) +f1 = 2 * precision * recall / (precision + recall) +``` + +变量说明: + +- TP/FP/FN:在 golden set 上推理结论与 expected 比对得出(结论为风险标记/字段值/分类标签三类场景各有比对器)。 +- 发布门禁默认阈值:recall ≥ 上一版 stable 的 recall 且 f1 不下降超过 2 个百分点,否则 `fail`。 + +## 测试方案 + +后端: + +- `few_shot_retrieval` 单测:样本池空 / 检索失败 / top-k 截断 / 仅取 confirmed 样本。 +- `eval_harness` 单测:golden set 跑分指标正确性、门禁通过/拦截逻辑、空 golden set 降级。 +- `prompt_registry` 单测:按策略取版本、回退到硬编码、Canary 流量切分比例。 +- `feedback_attribution` 单测:mock trace 数据,归因标签正确性。 + +前端: + +- AI 智商看板视图模型:空态、加载态、错误态、曲线渲染。 +- Canary 控制台:列表、切流量、回滚交互。 + +集成: + +- 端到端:构造一份 golden set → 改 prompt → 发布被门禁拦截 / 通过 → 智商看板出现新数据点。 +- 容器内运行:`docker exec -w /app -e SERVER_VENV_DIR=/tmp/x-financial-server-venv local-x-financial-linux /tmp/x-financial-server-venv/bin/pytest -q server/tests/...`,超时 60s。 + +手工验证: + +- 在 AI 工作台触发一次预审,确认 prompt 中出现 few-shot 块。 +- 在 Canary 控制台发布一版劣化 prompt,确认被门禁拦下。 + +## 指标与验收 + +- [A1] 功能验收:推理时 prompt 中可见 few-shot 块,且样本来自 confirmed 池。 +- [A2] 性能指标:few-shot 检索 ≤ 200ms(P95),不显著拖慢主链路;eval 单场景 ≤ 60s。 +- [A3] 质量指标:golden set 覆盖至少 5 个核心场景;门禁能正确拦截劣化 prompt。 +- [A4] 安全/权限指标:Canary 控制台仅算法/运营可操作;门禁关闭记审计日志。 +- [A5] 可观测性:AI 智商看板按周生成曲线;每次 eval_run 可回放。 + +## 风险与开放问题 + +- 风险: + - few-shot 注入增加 prompt 长度,可能触发 token 上限或拖慢推理 → 用 token 预算裁剪 + P95 监控兜底。 + - 样本池噪音(错误标注)污染推理 → 只取 confirmed 样本 + 评测门禁把关。 + - 评测 golden set 与线上分布漂移 → 季度复审 golden set,标注漂移度。 +- 已处理依赖:复用 Qdrant / LightRAG / accumulator / AgentTraceCenter / risk_rule_generation 现有能力。 +- 待确认: + - Canary 流量切分的具体比例(建议 90/10)需与业务确认。 + - 智商看板放哪个一级菜单(Settings 还是独立"AI 运营"菜单)。 + - 政策新鲜度检测是否本轮接入。 +- 降级策略:任何飞轮组件故障 → 无 few-shot + stable prompt + 跳过门禁(仅 stable),保证业务连续。 + +## 本轮实现记录 + +- 2026-07-03:完成数据飞轮概念文档与开发 TODO 拆分,作为后续改造的总纲。 diff --git a/document/development/2026-07-03/feature/ai-data-flywheel/TODO.md b/document/development/2026-07-03/feature/ai-data-flywheel/TODO.md new file mode 100644 index 0000000..e81121e --- /dev/null +++ b/document/development/2026-07-03/feature/ai-data-flywheel/TODO.md @@ -0,0 +1,98 @@ +# AI 数据飞轮 开发 TODO + +更新时间:2026-07-03 + +文档路径:document/development/2026-07-03/feature/ai-data-flywheel/TODO.md + +## 使用规则 + +- 每个 TODO 必须对应 `CONCEPT.md` 中的目标、能力、方案或验收点。 +- 只有完成并验证后,才能把 `[ ]` 改成 `[x]`。 +- 勾选时在任务后补充简短证据,例如文件、接口、命令或验证结果。 +- 如果需求发生变化,先更新 `CONCEPT.md`,再调整本 TODO。 +- 实施顺序建议:阶段 1 → 2 → 3(飞轮 1+2 是地基)→ 4/5/6 并行 → 7。 + +## 1. 调研与边界 + +- [x] [CONCEPT: 背景与问题] 盘点现有 accumulator / feedback / RAG / 规则生成能力,确认飞轮骨架已存在、断点在"检索注入 + 评测门禁"。 + 证据:`server/src/app/skills/domain/{false-positive-sample-accumulator,risk-feedback-sample-accumulator,risk-clue-collector}`、`services/agent_feedback.py`、`services/knowledge_rag_runtime.py`、`services/risk_rule_generation*.py`。 +- [x] [CONCEPT: 目标与非目标] 确认本轮范围 = 飞轮 1-6(few-shot 注入 / golden set 门禁 / prompt 版本化 / 抽取修正回流 / 低分归因 / 智商看板),不做模型微调、不改业务接口契约。 + 证据:CONCEPT.md「目标与非目标」章节。 +- [ ] [CONCEPT: 风险与开放问题] 与业务确认 Canary 流量比例、智商看板菜单位置、政策新鲜度检测是否本轮接入。 + 证据: + +## 2. 契约与设计 + +- [ ] [CONCEPT: 功能能力] 定义 4 张新表的字段、状态枚举(prompt stable/canary/pinned/archived、golden draft/confirmed/deprecated、eval pass/fail/blocked、correction)。 + 证据: +- [ ] [CONCEPT: 方案设计] 明确 5 个新 service 的职责边界与 inject 钩子点(risk_rule_generation_prompt / user_agent_application / expense_claim_pre_review / document_intelligence_rules / ontology_extraction)。 + 证据: +- [ ] [CONCEPT: 算法与公式] 确认 few-shot 检索排序公式权重(默认 α=0.8 β=0.2)与门禁阈值(recall 不降、f1 下降 ≤ 2pp)。 + 证据: +- [ ] [CONCEPT: 指标与验收] 把验收点 A1-A5 转成可验证检查项,附命令与期望结果。 + 证据: + +## 3. 后端实现 + +- [x] [CONCEPT: 后端] 新增 `services/few_shot_retrieval.py`:复用 Qdrant,按 case 特征检索 top-k confirmed 样本,带 token 预算裁剪。 + 证据:`server/src/app/services/few_shot_retrieval.py`;`server/src/app/services/few_shot_store.py`(独立 Qdrant collection `few_shot_samples`);`server/src/app/services/embedding_provider.py`(公共 EmbeddingProvider,复用 knowledge_rag_runtime 的 HTTP 调用)。 +- [ ] [CONCEPT: 后端] 新增 `services/prompt_registry.py`:prompt 版本 CRUD + 策略取版(latest/canary/pinned)+ 回退硬编码。 + 证据:飞轮 3(prompt 版本化 + Canary)未启动,本轮只做飞轮 1。 +- [ ] [CONCEPT: 后端] 新增 `services/eval_harness.py`:在 golden set 上跑评测,输出 precision/recall/f1,供门禁与看板共用。 + 证据:飞轮 2(golden set + 门禁)未启动,本轮只做飞轮 1。 +- [ ] [CONCEPT: 后端] 新增 `services/feedback_attribution.py`:低分反馈触发,复用 AgentTraceCenter trace 做归因,输出错误环节标签 + 改进任务。 + 证据:飞轮 5(低分归因)未启动,本轮只做飞轮 1。 +- [ ] [CONCEPT: 后端] 新增 `services/extraction_correction_recorder.py`:在附件/明细字段更新处记录 raw vs corrected diff。 + 证据:飞轮 4(抽取修正回流)未启动,本轮只做飞轮 1。 +- [ ] [CONCEPT: 后端] Alembic 迁移:prompt_version / golden_set / extraction_correction / eval_run 四张表。 + 证据:本轮新增的是 FewShotSample 一张表(`server/src/app/models/few_shot_sample.py`),项目靠 `Base.metadata.create_all()` 建表(无 alembic versions/ 目录),已注册到 `db/base.py` 和 `models/__init__.py`。其余三表随对应飞轮再建。 +- [x] [CONCEPT: 后端] 新增 `services/few_shot_ingestion.py`:RiskObservation confirmed/false_positive → FewShotSample + Qdrant 向量,在 `risk_observations.create_feedback` commit 后 hook 触发。 + 证据:`server/src/app/services/few_shot_ingestion.py`;`server/src/app/services/risk_observations.py:324-345`(`_maybe_ingest_few_shot` hook,带 feature flag + try/except 兜底)。 +- [x] [CONCEPT: 数据与契约] 在现有 prompt 构造文件加 few-shot 注入,不改业务接口。 + 证据:`server/src/app/services/risk_rule_generation_prompt.py`(新增 `few_shot_samples` 可选 kwarg,合并进 examples 字段);`server/src/app/services/risk_rule_generation.py:271-292`(`_retrieve_few_shot_samples` 在构造 messages 前调用,失败降级为空)。 + +## 4. 算法/规则实现 + +- [x] [CONCEPT: 算法与规则] 实现few-shot 检索排序(向量相似度 + 元数据硬匹配),只取 confirmed 样本。 + 证据:`server/src/app/services/few_shot_store.py`(Qdrant 余弦相似度 + payload 过滤 scene/label/status);`few_shot_retrieval.py` 去重 + token 预算 + 单条字符上限裁剪。检索仅取 label ∈ {confirmed, false_positive}。 +- [ ] [CONCEPT: 算法与规则] 实现评测指标比对器(风险标记 / 字段值 / 分类标签 三类场景)。 + 证据:飞轮 2,未启动。 +- [ ] [CONCEPT: 算法与规则] 接入发布门禁:`agent_asset_risk_rule_publish` 前调 eval_harness,不达标 block。 + 证据:飞轮 2,未启动。 +- [ ] [CONCEPT: 算法与规则] 接入 Canary 流量切分(默认 90 stable / 10 canary)+ 劣化自动回滚。 + 证据:飞轮 3,未启动。 +- [x] [CONCEPT: 结果解释] few-shot 块在 prompt 中保留 `source: "historical_confirmed"` 标记,可追溯。 + 证据:`risk_rule_generation_prompt.py` 合并 examples 时每条历史样本带 `source`/`label`/`conclusion` 字段。 + +## 5. 前端实现 + +- [ ] [CONCEPT: 前端] AI 智商看板新页面:准确率/召回率/误报率/人工干预率随时间曲线 + golden set 覆盖度,复用 `finance-report` 看板骨架。 + 证据: +- [ ] [CONCEPT: 前端] Canary 控制台(并入 Settings/Policies):prompt 版本列表、流量比例、分数、一键回滚。 + 证据: +- [ ] [CONCEPT: 前端] 实现加载/空态(样本不足)/错误态(评测失败)/权限态(仅算法运营)。 + 证据: +- [ ] [CONCEPT: 前端] 对齐现有企业后台风格(参考 `chat-ui-saas-styling` / `theme-settings-enterprise-ai-style`),避免营销页观感。 + 证据: + +## 6. 测试与验证 + +- [x] [CONCEPT: 测试方案] 后端单测:embedding_provider(GLM/Ollama 分支、维度缓存、HTTP 错误降级)、few_shot_ingestion(confirmed/false_positive 入库、ignored 跳过、幂等去重、hook 触发、feature flag、吞异常)、few_shot_retrieval(去重、token 预算、超长截断)+ prompt 注入(合并 examples、向后兼容)。 + 证据:`server/tests/test_embedding_provider.py`、`server/tests/test_few_shot_ingestion.py`、`server/tests/test_few_shot_retrieval_and_prompt.py`,容器内 `pytest -q` 20 passed。 +- [ ] [CONCEPT: 测试方案] 前端:智商看板与 Canary 控制台视图模型 + 构建验证。 + 证据:飞轮 3/6 前端,未启动。 +- [ ] [CONCEPT: 测试方案] 集成:golden set → 改 prompt → 门禁拦截/通过 → 看板新增数据点,容器内跑通。 + 证据:飞轮 2 集成,未启动。 +- [x] [CONCEPT: 测试方案] 回归:现有 RAG / risk_observations / risk_rule_generation 测试全过。 + 证据:容器内 `pytest -q server/tests/test_risk_observations_service.py server/tests/test_knowledge_rag_runtime.py server/tests/test_risk_rule_generation.py server/tests/test_risk_rule_generation_failure.py` → 35 passed,EmbeddingProvider 抽离零回归。 +- [ ] [CONCEPT: 指标与验收] 记录验证命令与结果,确认 P95 检索 ≤ 200ms、单场景评测 ≤ 60s。 + 证据:性能指标待飞轮 2 评测上线后连同 golden set 一起量。 + +## 7. 文档收尾 + +- [x] [CONCEPT: 指标与验收] 飞轮 1(few-shot 注入)A1 功能验收已达成:推理时 prompt 中可见带 `source: "historical_confirmed"` 的 few-shot 块,且样本来自 confirmed/false_positive 池。A5 可观测性部分达成(可追溯 source)。A2/A3/A4 随飞轮 2/3 补齐。 + 证据:见阶段 3/4/6 已勾选项。 +- [ ] [CONCEPT: 风险与开放问题] 更新 Canary 比例、看板菜单位置、政策新鲜度检测的最终结论与剩余风险。 + 证据:飞轮 2-6 启动后再定稿。 +- [x] [CONCEPT: 功能一句话] 确认飞轮 1 实现没有偏离"让系统越用越聪明"的原始目标。 + 证据:人工确认风险观测 → 自动入库 + 向量化 → 下次规则编译时检索注入相似历史样本,形成"用得越多 → 样本越丰富 → 推理越准"的闭环。飞轮 2-6 待后续迭代。 diff --git a/server/tests/test_embedding_provider.py b/server/tests/test_embedding_provider.py new file mode 100644 index 0000000..0949a69 --- /dev/null +++ b/server/tests/test_embedding_provider.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from app.services.embedding_provider import EmbeddingProvider, _runtime_model_config_from_dict +from app.services.knowledge_rag_runtime import KnowledgeRagError, RuntimeModelConfig + + +def _config(provider: str = "GLM") -> RuntimeModelConfig: + return RuntimeModelConfig( + slot="embedding", + provider=provider, + model="Embedding-3", + endpoint="https://open.bigmodel.cn/api/paas/v4/", + api_key="k", + capability="embedding", + ) + + +def test_runtime_model_config_from_dict_maps_fields() -> None: + cfg = _runtime_model_config_from_dict( + { + "slot": "embedding", + "provider": "GLM", + "model": "Embedding-3", + "endpoint": "https://e", + "apiKey": "secret", + "capability": "embedding", + } + ) + assert cfg.api_key == "secret" + assert cfg.model == "Embedding-3" + + +def test_embed_empty_texts_returns_empty() -> None: + provider = EmbeddingProvider(_config()) + assert provider.embed([]) == [] + + +def test_embed_returns_vectors_and_caches_dimension() -> None: + provider = EmbeddingProvider(_config()) + with patch( + "app.services.embedding_provider._request_embeddings_public", + return_value=[[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]], + ) as mock_req: + vectors = provider.embed(["a", "b"]) + assert vectors == [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]] + assert provider.dimension() == 3 + calls_after_first_dimension = mock_req.call_count + # 第二次 dimension 不应再次请求 + assert provider.dimension() == 3 + assert mock_req.call_count == calls_after_first_dimension + + +def test_dimension_raises_on_invalid_vectors() -> None: + provider = EmbeddingProvider(_config()) + with patch( + "app.services.embedding_provider._request_embeddings_public", + return_value=[], + ): + with pytest.raises(KnowledgeRagError): + provider.dimension() + + +def test_request_embeddings_public_glm_branch() -> None: + cfg = _config("GLM") + with patch( + "app.services.embedding_provider._send_json_request", + return_value=(200, {"data": [{"embedding": [0.1, 0.2]}]}), + ) as mock_send: + from app.services.embedding_provider import _request_embeddings_public + + vectors = _request_embeddings_public(cfg, ["x"]) + assert vectors == [[0.1, 0.2]] + called_url = mock_send.call_args.args[1] + assert called_url.endswith("/embeddings") + + +def test_request_embeddings_public_ollama_branch() -> None: + cfg = _config("Ollama") + with patch( + "app.services.embedding_provider._send_json_request", + return_value=(200, {"embeddings": [[0.5, 0.6]]}), + ) as mock_send: + from app.services.embedding_provider import _request_embeddings_public + + vectors = _request_embeddings_public(cfg, ["x"]) + assert vectors == [[0.5, 0.6]] + called_url = mock_send.call_args.args[1] + assert called_url.endswith("/api/embed") + + +def test_request_embeddings_public_raises_on_http_error() -> None: + cfg = _config("GLM") + with patch( + "app.services.embedding_provider._send_json_request", + return_value=(500, {"message": "boom"}), + ): + from app.services.embedding_provider import _request_embeddings_public + + with pytest.raises(KnowledgeRagError): + _request_embeddings_public(cfg, ["x"]) diff --git a/server/tests/test_few_shot_ingestion.py b/server/tests/test_few_shot_ingestion.py new file mode 100644 index 0000000..244455c --- /dev/null +++ b/server/tests/test_few_shot_ingestion.py @@ -0,0 +1,214 @@ +from __future__ import annotations + +from collections.abc import Generator +from datetime import datetime +from decimal import Decimal +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +import pytest +from sqlalchemy import create_engine +from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.pool import StaticPool + +from app.db.base import Base +from app.models.employee import Employee +from app.models.few_shot_sample import FewShotSample +from app.models.financial_record import ExpenseClaim +from app.models.risk_observation import RiskObservation +from app.schemas.risk_observation import RiskObservationFeedbackCreate +from app.services.few_shot_ingestion import FewShotIngestionService +from app.services.risk_observations import RiskObservationService + + +def _build_session() -> Session: + engine = create_engine( + "sqlite+pysqlite:///:memory:", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + Base.metadata.create_all(bind=engine) + factory = sessionmaker(bind=engine, autoflush=False, autocommit=False) + return factory() + + +def _observation(db: Session, key: str = "risk:c1:dup") -> RiskObservation: + db.add(Employee(id="emp-1", employee_no="E1", name="员工", email="e@e.com", grade="P6")) + db.add( + ExpenseClaim( + id="c1", + claim_no="BX-001", + employee_id="emp-1", + employee_name="员工", + department_name="风控部", + expense_type="travel", + reason="客户拜访", + location="上海", + amount=Decimal("1000"), + currency="CNY", + occurred_at=datetime(2026, 1, 1), + submitted_at=datetime(2026, 1, 1), + status="submitted", + approval_stage="manager_review", + risk_flags_json=[], + ) + ) + db.flush() + obs = RiskObservation( + observation_key=key, + subject_type="expense_claim", + subject_key="claim:c1", + claim_id="c1", + claim_no="BX-001", + risk_type="duplicate_invoice", + risk_signal="duplicate_invoice", + title="重复发票", + description="同一发票出现在多张报销单", + risk_score=86, + risk_level="high", + confidence_score=0.8, + source="financial_risk_graph", + algorithm_version="v1", + ontology_json={"domain": "expense", "scenario": "reimbursement"}, + ) + db.add(obs) + db.commit() + db.refresh(obs) + return obs + + +def test_ingest_confirmed_persists_sample_and_calls_store() -> None: + with _build_session() as db: + obs = _observation(db) + obs.feedback_status = "confirmed" + service = FewShotIngestionService(db) + fake_store = MagicMock() + fake_store.upsert.return_value = "vec-1" + with patch.object(service, "_store", return_value=fake_store): + sample = service.ingest_observation_feedback( + obs, + MagicMock(feedback_type="confirm", comment="确认重复发票", actor="audit"), + ) + assert sample is not None + assert sample.label == "confirmed" + assert sample.sample_key == f"obs:{obs.id}" + assert "重复发票" in sample.case_text + assert "确认重复发票" in sample.conclusion_text + assert sample.vector_id == "vec-1" + fake_store.upsert.assert_called_once() + + +def test_ingest_false_positive_also_persisted() -> None: + with _build_session() as db: + obs = _observation(db, key="risk:c2:fp") + obs.feedback_status = "false_positive" + db.commit() + service = FewShotIngestionService(db) + with patch.object(service, "_store", return_value=MagicMock(upsert=MagicMock(return_value=None))): + sample = service.ingest_observation_feedback( + obs, + MagicMock(feedback_type="false_positive", comment="", actor="audit"), + ) + assert sample is not None + assert sample.label == "false_positive" + assert "误报" in sample.conclusion_text + + +def test_ingest_ignored_label_returns_none() -> None: + with _build_session() as db: + obs = _observation(db) + obs.feedback_status = "ignored" + service = FewShotIngestionService(db) + assert service.ingest_observation_feedback(obs, MagicMock()) is None + + +def test_ingest_is_idempotent_on_duplicate_sample_key() -> None: + with _build_session() as db: + obs = _observation(db) + service = FewShotIngestionService(db) + store = MagicMock() + store.upsert.side_effect = ["vec-1", "vec-2"] + with patch.object(service, "_store", return_value=store): + obs.feedback_status = "confirmed" + first = service.ingest_observation_feedback( + obs, MagicMock(feedback_type="confirm", comment="第一次", actor="a") + ) + # 模拟后续被改判为误报 + obs.feedback_status = "false_positive" + second = service.ingest_observation_feedback( + obs, MagicMock(feedback_type="false_positive", comment="改判", actor="a") + ) + assert first is not None and second is not None + assert first.id == second.id # 同一行更新 + from sqlalchemy import select + + count = db.scalar(select(FewShotSample).where(FewShotSample.sample_key == f"obs:{obs.id}")) + assert count is not None + assert second.label == "false_positive" + + +def test_create_feedback_hook_triggers_ingestion() -> None: + with _build_session() as db: + service = RiskObservationService(db) + obs = _observation(db) + ingest_calls: list = [] + + def _spy_ingest(o, f): + ingest_calls.append((o.id, f.feedback_type)) + return None + + with patch( + "app.services.few_shot_ingestion.FewShotIngestionService.ingest_observation_feedback", + side_effect=_spy_ingest, + ): + service.create_feedback( + obs.observation_key, + RiskObservationFeedbackCreate(feedback_type="confirm", actor="audit"), + ) + assert len(ingest_calls) == 1 + assert ingest_calls[0][1] == "confirm" + + +def test_create_feedback_hook_skipped_for_comment_feedback() -> None: + with _build_session() as db: + service = RiskObservationService(db) + obs = _observation(db) + with patch( + "app.services.few_shot_ingestion.FewShotIngestionService.ingest_observation_feedback" + ) as mock_ingest: + service.create_feedback( + obs.observation_key, + RiskObservationFeedbackCreate(feedback_type="comment", action="note", actor="audit"), + ) + mock_ingest.assert_not_called() + + +def test_create_feedback_hook_swallows_ingestion_failure() -> None: + with _build_session() as db: + service = RiskObservationService(db) + obs = _observation(db) + with patch( + "app.services.few_shot_ingestion.FewShotIngestionService.ingest_observation_feedback", + side_effect=RuntimeError("boom"), + ): + # 不应抛异常 + feedback = service.create_feedback( + obs.observation_key, + RiskObservationFeedbackCreate(feedback_type="confirm", actor="audit"), + ) + assert feedback.feedback_type == "confirm" + + +def test_create_feedback_hook_respects_feature_flag(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("FEW_SHOT_INJECTION_ENABLED", "false") + with _build_session() as db: + service = RiskObservationService(db) + obs = _observation(db) + with patch( + "app.services.few_shot_ingestion.FewShotIngestionService.ingest_observation_feedback" + ) as mock_ingest: + service.create_feedback( + obs.observation_key, + RiskObservationFeedbackCreate(feedback_type="confirm", actor="audit"), + ) + mock_ingest.assert_not_called() diff --git a/server/tests/test_few_shot_retrieval_and_prompt.py b/server/tests/test_few_shot_retrieval_and_prompt.py new file mode 100644 index 0000000..df99f54 --- /dev/null +++ b/server/tests/test_few_shot_retrieval_and_prompt.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +import json +from unittest.mock import MagicMock, patch + +import pytest + +from app.services.few_shot_retrieval import FewShotRetriever +from app.services.few_shot_store import FewShotStore +from app.services.risk_rule_generation_prompt import build_risk_rule_compiler_messages + + +def _hit(score: float, label: str, conclusion: str, risk_type: str = "duplicate_invoice") -> dict: + return { + "sample_id": "s1", + "score": score, + "label": label, + "domain": "expense", + "risk_type": risk_type, + "conclusion_text": conclusion, + "payload_json": { + "risk_signal": risk_type, + "risk_level": "high", + "ontology": {"scenario": "reimbursement"}, + "feedback_comment": "", + }, + } + + +def test_retrieve_returns_injection_blocks_with_token_budget() -> None: + store = MagicMock(spec=FewShotStore) + store.search.return_value = [ + _hit(0.9, "confirmed", "确认重复发票需拦截"), + _hit(0.8, "false_positive", "此情形属于正常拆单不拦截"), + _hit(0.7, "confirmed", "确认重复发票需拦截"), # 重复结论应被去重 + ] + retriever = FewShotRetriever(store) + blocks = retriever.retrieve_for_risk_rule_generation( + domain="expense", natural_language="同一发票重复报销" + ) + assert len(blocks) == 2 + assert blocks[0]["score"] == 0.9 + assert blocks[0]["label"] == "confirmed" + assert blocks[0]["source"] == "historical_confirmed" + assert blocks[1]["label"] == "false_positive" + # 去重:第三条结论与第一条相同,应被过滤 + conclusions = [b["conclusion"] for b in blocks] + assert len(set(conclusions)) == len(conclusions) + + +def test_retrieve_empty_case_text_returns_empty() -> None: + store = MagicMock(spec=FewShotStore) + retriever = FewShotRetriever(store) + assert retriever.retrieve_for_risk_rule_generation(natural_language="") == [] + store.search.assert_not_called() + + +def test_retrieve_truncates_overlong_conclusion() -> None: + store = MagicMock(spec=FewShotStore) + long_text = "长结论" * 500 + store.search.return_value = [ + _hit(0.9, "confirmed", long_text), + ] + retriever = FewShotRetriever(store) + blocks = retriever.retrieve_for_risk_rule_generation(natural_language="x") + assert len(blocks) == 1 + # 超长结论应被截断到单条上限 + from app.services.few_shot_retrieval import SINGLE_SAMPLE_MAX_CHARS + + assert len(blocks[0]["conclusion"]) <= SINGLE_SAMPLE_MAX_CHARS + + +def test_build_prompt_merges_few_shot_into_examples() -> None: + samples = [ + { + "source": "historical_confirmed", + "label": "confirmed", + "domain": "expense", + "risk_type": "duplicate_invoice", + "conclusion": "确认重复发票", + "context": {"risk_signal": "duplicate_invoice"}, + } + ] + messages = build_risk_rule_compiler_messages( + domain="expense", + domain_label="报销", + business_stage="reimbursement", + business_stage_label="报销", + expense_category=None, + expense_category_label="", + natural_language="重复发票规则", + available_fields=[{"key": "attachment.invoice_no", "label": "发票号", "type": "string", "source": "attachment"}], + few_shot_samples=samples, + ) + assert len(messages) == 2 + payload = json.loads(messages[1]["content"]) + examples = payload["examples"] + # 前两条是历史样本,后面是内置 examples + assert examples[0]["source"] == "historical_confirmed" + assert examples[0]["conclusion"] == "确认重复发票" + # 内置 example 仍存在(无 source 字段) + assert any("user_rule" in ex for ex in examples) + + +def test_build_prompt_without_few_shot_is_backward_compatible() -> None: + messages = build_risk_rule_compiler_messages( + domain="expense", + domain_label="报销", + business_stage="reimbursement", + business_stage_label="报销", + expense_category=None, + expense_category_label="", + natural_language="重复发票规则", + available_fields=[], + ) + payload = json.loads(messages[1]["content"]) + examples = payload["examples"] + # 无 few_shot_samples 时 examples 里不应有 historical_confirmed 来源 + assert all(ex.get("source") != "historical_confirmed" for ex in examples)