from __future__ import annotations import re import uuid from datetime import date from typing import Any from app.schemas.steward import ( StewardCandidateFlow, StewardConfirmationAction, StewardPendingFlowConfirmation, StewardPlanRequest, StewardPlanResponse, StewardTask, StewardThinkingEvent, ) from app.services.steward_planner_shared import ( APPLICATION_SPLIT_PATTERN, BUSINESS_FIELD_LABELS, PlannedTaskDraft, REIMBURSEMENT_PATTERN, STEWARD_BUSINESS_SIGNAL_KEYWORDS, STEWARD_GREETING_KEYWORDS, STEWARD_OFF_TOPIC_SCENARIO_GREETING, STEWARD_OFF_TOPIC_SCENARIO_MEANINGLESS, STEWARD_OFF_TOPIC_SCENARIO_OFF_BUSINESS, ) class StewardPlannerFallbackMixin: def _should_use_model_intent_recognition( self, message: str, base_date: date, request: StewardPlanRequest, ) -> bool: return bool(message.strip()) @staticmethod def _is_business_irrelevant_input(message: str, request: StewardPlanRequest) -> bool: """判断输入是否与小财管家支持的财务事项完全无关(向后兼容包装)。 判定规则:消息去除所有空白后不含任何业务信号关键词,且没有上传附件。 实际判定逻辑由 _classify_irrelevant_input 负责,命中任何场景即视为业务无关。 """ return StewardPlannerFallbackMixin._classify_irrelevant_input(message, request) is not None @staticmethod def _classify_irrelevant_input(message: str, request: StewardPlanRequest) -> str | None: """把业务无关输入细分为三个场景,便于给出更贴切的引导。 返回值: - "greeting":礼貌问候("你好"等),无业务关键词 - "meaningless":完全无意义内容(纯数字、纯标点、单字符重复、纯字母数字乱码) - "off_business":有意义但与财务无关(问天气、聊生活等) - None:消息与业务相关,无需走 off_topic 路径 """ if request.attachments: return None compact = re.sub(r"\s+", "", message) if not compact: return None if any(keyword in compact for keyword in STEWARD_BUSINESS_SIGNAL_KEYWORDS): return None # 补充注册表里各意图声明的信号词(如查询类"差旅标准"等),避免被判 off_topic from app.services.steward_intent_registry import all_signal_keywords if any(keyword in compact for keyword in all_signal_keywords()): return None if StewardPlannerFallbackMixin._looks_like_greeting(compact): return STEWARD_OFF_TOPIC_SCENARIO_GREETING if StewardPlannerFallbackMixin._looks_like_meaningless(compact): return STEWARD_OFF_TOPIC_SCENARIO_MEANINGLESS return STEWARD_OFF_TOPIC_SCENARIO_OFF_BUSINESS @staticmethod def _looks_like_greeting(compact_message: str) -> bool: """判断消息是否只是礼貌问候(无其他有意义内容)。""" normalized = compact_message.lower() for keyword in STEWARD_GREETING_KEYWORDS: if normalized == keyword.lower() or normalized.startswith(keyword.lower()): # 整句只是问候词(允许少量标点) tail = normalized[len(keyword.lower()):] if not tail or re.fullmatch(r"[!!。.??,,~\s]+", tail): return True return False @staticmethod def _looks_like_meaningless(compact_message: str) -> bool: """判断消息是否完全没有语义价值(纯数字、纯标点、单字符重复等)。""" if re.fullmatch(r"\d+", compact_message): return True # 纯标点 if re.fullmatch(r"[\W_]+", compact_message): return True # 单字符重复(例如 "啊啊啊啊啊") if len(compact_message) >= 2 and len(set(compact_message)) == 1: return True # 短字母数字组合但没有任何业务意义,例如 "abc"、"test123" # 注意:必须排除已经被关键词命中的情况(前面的判定已保证不命中关键词) if re.fullmatch(r"[a-zA-Z0-9]+", compact_message) and len(compact_message) <= 12: return True return False def _build_off_topic_plan( self, request: StewardPlanRequest, *, scenario: str, model_call_traces: list[dict[str, Any]] | None = None, fallback_reason: str = "", ) -> StewardPlanResponse: """业务无关输入的兜底计划:根据场景给出对应引导,off_business 场景可由 LLM 增强。""" base_summary = self._default_off_topic_summary(scenario) thinking_event = self._build_off_topic_thinking_event(scenario) suggested_prompts = self._off_topic_suggested_prompts(scenario) traces = list(model_call_traces or []) # 仅对 off_business 场景尝试让 LLM 生成多样化引导;问候/无意义场景用规则模板即可。 if ( scenario == STEWARD_OFF_TOPIC_SCENARIO_OFF_BUSINESS and self.off_topic_agent is not None ): try: llm_result = self.off_topic_agent.generate(request, scenario=scenario) if llm_result is not None and llm_result.response_text: base_summary = llm_result.response_text traces = llm_result.model_call_traces except Exception: # 失败时静默回退到规则模板 pass thinking_events = [thinking_event] if fallback_reason: thinking_events.insert( 0, StewardThinkingEvent( event_id="intent_agent_rule_fallback", stage="rule_fallback", title="意图识别智能体进入兜底模式", content=fallback_reason, ), ) return StewardPlanResponse( plan_id=f"steward_plan_{uuid.uuid4().hex[:12]}", plan_status="off_topic", planning_source="rule_fallback", next_action="none", summary=base_summary, thinking_events=thinking_events, tasks=[], attachment_groups=[], confirmation_groups=[], candidate_flows=[], suggested_prompts=suggested_prompts, model_call_traces=traces, ) @staticmethod def _default_off_topic_summary(scenario: str) -> str: """off_topic 场景的默认引导文案;LLM 不可用时使用。""" if scenario == STEWARD_OFF_TOPIC_SCENARIO_GREETING: return ( "### 您好主人,很高兴为您服务\n\n" "请问您今天要办理什么业务?目前小财管家能帮您整理" "**费用申请**和**费用报销**这两类事项。\n\n" "要不您换种说法告诉我:" ) if scenario == STEWARD_OFF_TOPIC_SCENARIO_OFF_BUSINESS: return ( "### 抱歉主人,这句话我暂时帮不上忙\n\n" "我看了您刚才说的这句话,里面聊的不是财务事项。" "小财管家目前只能帮您整理**费用申请**和**费用报销**这两类业务。\n\n" "要不您换种说法告诉我:" ) # meaningless return ( "### 这句话我暂时没识别到财务事项\n\n" "很抱歉主人,目前小财管家只能帮您整理**费用申请**和**费用报销**这两类事项。\n\n" "要不您换种说法告诉我:" ) @staticmethod def _build_off_topic_thinking_event(scenario: str) -> StewardThinkingEvent: """off_topic 场景下向用户展示的思考过程摘要。""" if scenario == STEWARD_OFF_TOPIC_SCENARIO_GREETING: return StewardThinkingEvent( event_id="intent_agent_off_topic_greeting", stage="off_topic", title="先回应主人的问候", content="主人向我打了个招呼,我先礼貌回应一下,再引导他/她说出具体想办什么业务。", ) if scenario == STEWARD_OFF_TOPIC_SCENARIO_OFF_BUSINESS: return StewardThinkingEvent( event_id="intent_agent_off_topic_non_business", stage="off_topic", title="这句话不在服务范围内", content="我看了您刚才说的这句话,里面聊的不是财务事项。小财管家目前只能帮您整理费用申请和费用报销。", ) return StewardThinkingEvent( event_id="intent_agent_off_topic_meaningless", stage="off_topic", title="未识别到财务事项", content=( "我仔细看了看您刚才说的这句话,里面好像没有出现" "费用申请、报销、出差、交通、招待这些财务关键词。" ), ) @staticmethod def _off_topic_suggested_prompts(scenario: str) -> list[str]: """off_topic 场景下展示给用户的推荐话术。""" if scenario == STEWARD_OFF_TOPIC_SCENARIO_GREETING: return [ "我想要申请明天去北京出差3天,支撑客户现场实施", "我要报销昨天的交通费", "我上周出差去上海的费用需要报销", ] if scenario == STEWARD_OFF_TOPIC_SCENARIO_OFF_BUSINESS: return [ "我想要申请明天去北京出差3天,支撑客户现场实施", "我要报销昨天的交通费", "我需要整理上周出差的发票", ] # meaningless return [ "我想要申请明天去北京出差3天,支撑客户现场实施", "我要报销昨天的交通费", "我上周出差去上海的费用需要报销", ] def _build_rule_fallback_plan( self, request: StewardPlanRequest, *, base_date: date, model_call_traces: list[dict[str, Any]] | None = None, fallback_reason: str = "", ) -> StewardPlanResponse: message = self._clean_text(request.message) if self._looks_like_ambiguous_travel_flow(message, base_date, request): return self._build_pending_flow_fallback_plan( request, base_date=base_date, model_call_traces=model_call_traces, fallback_reason=fallback_reason, ) task_drafts = self._extract_task_drafts(message) tasks = [self._build_task(draft, base_date, request) for draft in task_drafts] if not tasks: tasks = [self._build_fallback_task(message, base_date, request)] attachment_groups = self._build_attachment_groups(request.attachments, tasks) confirmation_groups = self._build_confirmation_actions(tasks, attachment_groups) thinking_events = self._build_thinking_events(tasks, attachment_groups, request.attachments) if fallback_reason: thinking_events.insert( 0, StewardThinkingEvent( event_id="intent_agent_rule_fallback", stage="rule_fallback", title="意图识别智能体进入兜底模式", content=fallback_reason, ), ) plan_id = f"steward_plan_{uuid.uuid4().hex[:12]}" return StewardPlanResponse( plan_id=plan_id, plan_status="needs_confirmation" if confirmation_groups else "ready_to_delegate", planning_source="rule_fallback", next_action="confirm_task" if confirmation_groups else "delegate_task", summary=self._build_summary(tasks, attachment_groups), thinking_events=thinking_events, tasks=tasks, attachment_groups=attachment_groups, confirmation_groups=confirmation_groups, model_call_traces=model_call_traces or [], ) def _build_pending_flow_fallback_plan( self, request: StewardPlanRequest, *, base_date: date, model_call_traces: list[dict[str, Any]] | None = None, fallback_reason: str = "", planning_source: str = "rule_fallback", ) -> StewardPlanResponse: candidates = self._build_rule_candidate_flows(request, base_date) gate = self._resolve_required_application_gate(request, "travel") pending_reason = self._build_pending_flow_reason(gate) pending = StewardPendingFlowConfirmation( status="pending", source_message=request.message, reason=pending_reason, candidate_flows=candidates, ) thinking_events = [] if fallback_reason: thinking_events.append( StewardThinkingEvent( event_id="intent_agent_rule_fallback", stage="rule_fallback", title="意图识别智能体进入兜底模式", content=fallback_reason, ) ) thinking_events.append( StewardThinkingEvent( event_id="intent_pending_flow_confirmation", stage="flow_confirmation", title="需要确认流程方向", content=pending_reason, ) ) return StewardPlanResponse( plan_id=f"steward_plan_{uuid.uuid4().hex[:12]}", plan_status="needs_flow_confirmation", planning_source=planning_source, # type: ignore[arg-type] next_action="confirm_flow", summary=self._build_pending_flow_summary(gate), thinking_events=thinking_events, pending_flow_confirmation=pending, candidate_flows=candidates, model_call_traces=model_call_traces or [], ) def _build_rule_candidate_flows( self, request: StewardPlanRequest, base_date: date, ) -> list[StewardCandidateFlow]: application_fields = self._extract_ontology_fields( request.message, "expense_application", base_date, request, ) reimbursement_fields = self._extract_ontology_fields( request.message, "reimbursement", base_date, request, ) gate = self._resolve_required_application_gate(request, "travel") if gate.get("checked") and int(gate.get("candidate_count") or 0) <= 0: return [ StewardCandidateFlow( flow_id="travel_application", label="先发起出差申请", confidence=0.86, reason="已先查询您名下可关联的差旅申请单,暂未查到可关联单据,因此应先申请单据。", ontology_fields=application_fields, missing_fields=self._resolve_missing_fields("expense_application", application_fields), ) ] reimbursement_label = "发起费用报销" reimbursement_reason = "用户描述的也可能是已发生出差事项,需要进入报销材料整理。" if gate.get("checked"): candidate_count = int(gate.get("candidate_count") or 0) reimbursement_label = "关联已有申请单并发起报销" reimbursement_reason = f"已先查到 {candidate_count} 个可关联申请单,选择后会先请您关联具体单据。" return [ StewardCandidateFlow( flow_id="travel_application", label="补办出差申请", confidence=0.52, reason="用户描述了出差时间、地点和事由,但没有明确说要报销。", ontology_fields=application_fields, missing_fields=self._resolve_missing_fields("expense_application", application_fields), ), StewardCandidateFlow( flow_id="travel_reimbursement", label=reimbursement_label, confidence=0.48, reason=reimbursement_reason, ontology_fields=reimbursement_fields, missing_fields=self._resolve_missing_fields("reimbursement", reimbursement_fields), ), ] @staticmethod def _resolve_required_application_gate( request: StewardPlanRequest, expense_type: str, ) -> dict[str, Any]: context = request.context_json if isinstance(request.context_json, dict) else {} gates = context.get("required_application_gate") if not isinstance(gates, dict): return {} gate = gates.get(expense_type) if not isinstance(gate, dict) or not gate.get("checked"): return {} try: candidate_count = max(0, int(gate.get("candidate_count") or 0)) except (TypeError, ValueError): candidate_count = 0 return { **gate, "candidate_count": candidate_count, "checked": True, } @staticmethod def _build_pending_flow_reason(gate: dict[str, Any]) -> str: if gate.get("checked") and int(gate.get("candidate_count") or 0) <= 0: return "我已先查询您名下可关联的差旅申请单,未查到可关联单据,所以当前应先申请单据。" if gate.get("checked"): candidate_count = int(gate.get("candidate_count") or 0) return f"我已先查询您名下的差旅申请单,查到 {candidate_count} 个可关联申请单,需要您确认是否关联单据后发起报销。" return "当前话术描述了出差事项,但没有明确说明要补办申请还是发起报销。" @staticmethod def _build_pending_flow_summary(gate: dict[str, Any]) -> str: if gate.get("checked") and int(gate.get("candidate_count") or 0) <= 0: return "我已先查询可关联申请单,暂未查到可关联单据;这次应先申请单据,再进入后续报销。" if gate.get("checked"): candidate_count = int(gate.get("candidate_count") or 0) return ( f"我已先查询可关联申请单,查到 {candidate_count} 个可关联申请单;" "您可以选择关联已有申请单发起报销,也可以改为补办新的出差申请。" ) return ( "我识别到这是一次出差事项,但还不能确定您要做的是" "**补办出差申请**还是**发起费用报销**。请先选择一个方向。" ) def _extract_task_drafts(self, message: str) -> list[PlannedTaskDraft]: drafts: list[PlannedTaskDraft] = [] first_reimbursement = self._find_first_reimbursement_index(message) application_source = message[:first_reimbursement] if first_reimbursement >= 0 else message if self._looks_like_application(application_source) or self._looks_like_future_travel_application(application_source): drafts.append( PlannedTaskDraft( task_type="expense_application", segment=application_source.strip(",,。;; "), index=len(drafts) + 1, ) ) for match in REIMBURSEMENT_PATTERN.finditer(message): segment = f"报销{match.group(1)}" drafts.append( PlannedTaskDraft( task_type="reimbursement", segment=segment.strip(",,。;; "), index=len(drafts) + 1, ) ) return drafts