from __future__ import annotations import json import re from sqlalchemy.orm import Session from app.core.agent_enums import AgentAssetStatus, AgentAssetType from app.schemas.agent_asset import AgentAssetListItem from app.schemas.user_agent import ( UserAgentCitation, UserAgentDraftPayload, UserAgentRequest, UserAgentResponse, UserAgentSuggestedAction, ) from app.services.agent_assets import AgentAssetService from app.services.agent_foundation import AgentFoundationService from app.services.runtime_chat import RuntimeChatService SCENARIO_LABELS = { "expense": "报销", "accounts_receivable": "应收", "accounts_payable": "应付", "knowledge": "知识", "unknown": "通用", } RISK_REASON_MAP = { "duplicate_expense": "检测到同员工、同金额或近似单据存在重复提交迹象。", "amount_over_limit": "金额超过当前制度或预算阈值,需要补充例外说明。", "invoice_anomaly": "票据或附件完整性不满足当前规则要求,需要补件或人工复核。", "ar_overdue": "应收账款已出现逾期,存在回款延迟风险。", "ap_overdue": "应付付款已出现逾期,可能影响供应商履约或合作关系。", } GENERIC_EXPENSE_PROMPTS = { "报销", "我要报销", "我想报销", "帮我报销", "我要申请报销", "发起报销", "提交报销", } EXPLICIT_DRAFT_KEYWORDS = ("生成", "草稿", "起草", "创建", "发起", "准备") EXPENSE_TYPE_LABELS = { "travel": "差旅", "hotel": "住宿", "transport": "交通", "meal": "餐费", "meeting": "会务", "entertainment": "招待", } class UserAgentService: def __init__(self, db: Session) -> None: self.db = db self.asset_service = AgentAssetService(db) self.runtime_chat_service = RuntimeChatService(db) def respond(self, payload: UserAgentRequest) -> UserAgentResponse: AgentFoundationService(self.db).ensure_foundation_ready() citations = self._build_rule_citations(payload) suggested_actions = self._build_suggested_actions(payload) risk_flags = self._resolve_risk_flags(payload) draft_payload = ( self._build_draft_payload(payload) if payload.ontology.intent == "draft" else None ) if payload.degraded and payload.tool_payload.get("message"): return UserAgentResponse( answer=str(payload.tool_payload["message"]), citations=citations, suggested_actions=suggested_actions, risk_flags=risk_flags, requires_confirmation=payload.requires_confirmation, ) guided_answer = self._build_guided_answer(payload) if guided_answer: return UserAgentResponse( answer=guided_answer, citations=citations, suggested_actions=suggested_actions, draft_payload=draft_payload, risk_flags=risk_flags, requires_confirmation=payload.requires_confirmation, ) fallback_answer = self._build_fallback_answer( payload, citations=citations, draft_payload=draft_payload, ) answer = self._generate_answer_with_model( payload, citations=citations, suggested_actions=suggested_actions, risk_flags=risk_flags, draft_payload=draft_payload, fallback_answer=fallback_answer, ) return UserAgentResponse( answer=answer or fallback_answer, citations=citations, suggested_actions=suggested_actions, draft_payload=draft_payload, risk_flags=risk_flags, requires_confirmation=payload.requires_confirmation, ) def _build_fallback_answer( self, payload: UserAgentRequest, *, citations: list[UserAgentCitation], draft_payload: UserAgentDraftPayload | None, ) -> str: if payload.ontology.intent in {"query", "compare"}: return self._build_query_answer(payload) if payload.ontology.intent == "risk_check": return self._build_risk_answer(payload, citations) if payload.ontology.intent == "draft" and draft_payload is not None: return ( f"已生成 {draft_payload.title},当前仅返回待人工确认的草稿内容," "仍需人工确认后再进入正式流程。" ) return self._build_explain_answer(payload, citations) def _build_guided_answer(self, payload: UserAgentRequest) -> str | None: if not self._is_generic_expense_prompt(payload): return self._build_implicit_expense_draft_guidance(payload) attachment_names = self._resolve_attachment_names(payload) attachment_hint = "" if attachment_names: attachment_hint = ( f" 我已带入 {len(attachment_names)} 份附件名称,但目前还不能直接读取附件内容," "仍需要你补充关键信息。" ) return ( "可以帮你发起报销。请补充费用类型、发生时间、金额、事由和相关对象," "或者直接上传票据附件,我再继续帮你判断能否报、缺什么材料以及生成报销草稿。" f"{attachment_hint}" ) def _build_implicit_expense_draft_guidance( self, payload: UserAgentRequest, ) -> str | None: if not self._is_implicit_expense_draft_request(payload): return None amount_text = next( (item.value for item in payload.ontology.entities if item.type == "amount"), "", ) expense_type = next( ( EXPENSE_TYPE_LABELS.get(item.normalized_value, item.value) for item in payload.ontology.entities if item.type == "expense_type" ), "报销", ) time_text = payload.ontology.time_range.raw or "本次" amount_hint = f",金额 {amount_text}" if amount_text else "" return ( f"已识别到一笔{time_text}的{expense_type}支出{amount_hint}。" "如果要继续生成报销草稿,还需要补充客户单位、参与人员、费用明细和票据附件。" "你也可以继续上传发票或图片,我会把这些信息带入后续对话。" ) def _generate_answer_with_model( self, payload: UserAgentRequest, *, citations: list[UserAgentCitation], suggested_actions: list[UserAgentSuggestedAction], risk_flags: list[str], draft_payload: UserAgentDraftPayload | None, fallback_answer: str, ) -> str | None: messages = self._build_model_messages( payload, citations=citations, suggested_actions=suggested_actions, risk_flags=risk_flags, draft_payload=draft_payload, fallback_answer=fallback_answer, ) return self._sanitize_model_answer( self.runtime_chat_service.complete( messages, max_tokens=420, temperature=0.2, ) ) def _sanitize_model_answer(self, answer: str | None) -> str | None: if not answer: return None cleaned = re.sub(r".*?", "", answer, flags=re.DOTALL | re.IGNORECASE) cleaned = cleaned.strip() return cleaned or None def _build_model_messages( self, payload: UserAgentRequest, *, citations: list[UserAgentCitation], suggested_actions: list[UserAgentSuggestedAction], risk_flags: list[str], draft_payload: UserAgentDraftPayload | None, fallback_answer: str, ) -> list[dict[str, str]]: facts = { "run_id": payload.run_id, "user_message": payload.message, "ontology": payload.ontology.model_dump(mode="json"), "context": { "entry_source": payload.context_json.get("entry_source"), "user_name": payload.context_json.get("name"), "user_role": payload.context_json.get("role"), "request_context": payload.context_json.get("request_context"), "attachment_count": payload.context_json.get("attachment_count"), "attachment_names": self._resolve_attachment_names(payload), }, "tool_payload": payload.tool_payload, "citations": [item.model_dump(mode="json") for item in citations], "suggested_actions": [ item.model_dump(mode="json") for item in suggested_actions ], "risk_flags": risk_flags, "draft_payload": ( draft_payload.model_dump(mode="json") if draft_payload is not None else None ), "selected_capability_codes": payload.selected_capability_codes, "requires_confirmation": payload.requires_confirmation, "fallback_answer": fallback_answer, } system_prompt = ( "你是企业财务共享场景中的中文智能助手,负责和最终用户直接对话。" "你只能基于提供的事实回答,不能编造制度、流程结果或附件内容。" "如果用户问题很笼统,例如“我要报销”,优先告诉用户你可以协助什么," "并明确要求补充费用类型、金额、时间、事由、参与对象或上传票据。" "如果上下文里只有附件名称,必须明确说明你只拿到了附件名称," "不能假装已看过图片、PDF 或发票内容。" "不要声称已经提交、审批、付款、入账或真正执行了任何动作;如果只是建议、草稿或待确认,要明确说清楚。" "若给出了风险标签、制度引用或建议动作,可以简洁吸收进回答,但不要新增未提供的事实。" "只输出最终给用户看的自然语言,不要输出 JSON、Markdown、标题、" " 标签或任何中间推理。" "使用简体中文,控制在 2 到 4 句。" ) user_prompt = ( "请根据以下事实生成最终答复,优先保持准确、具体、可执行:\n" f"{json.dumps(facts, ensure_ascii=False, indent=2)}" ) return [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ] def _build_query_answer(self, payload: UserAgentRequest) -> str: scenario = payload.ontology.scenario data = payload.tool_payload subject = self._resolve_subject(payload) if scenario == "expense": record_count = int(data.get("record_count") or 0) total_amount = float(data.get("total_amount") or 0) return ( f"{subject}共命中 {record_count} 笔报销,金额合计 {total_amount:.2f} 元。" "如需继续处理,可以查看明细或生成处理意见草稿。" ) if scenario == "accounts_receivable": record_count = int(data.get("record_count") or 0) outstanding_amount = float(data.get("outstanding_amount") or 0) return ( f"{subject}共命中 {record_count} 条应收,未回款金额 {outstanding_amount:.2f} 元。" "建议结合账龄和客户分布继续排查逾期风险。" ) if scenario == "accounts_payable": record_count = int(data.get("record_count") or 0) outstanding_amount = float(data.get("outstanding_amount") or 0) return ( f"{subject}共命中 {record_count} 条应付,待付金额 {outstanding_amount:.2f} 元。" "如需推进动作,建议先生成付款建议草稿并发起人工确认。" ) return "已完成当前查询,但暂时没有更多结构化结果可展示。" def _build_explain_answer( self, payload: UserAgentRequest, citations: list[UserAgentCitation], ) -> str: if citations: titles = "、".join(item.title for item in citations[:2]) summary = citations[0].excerpt or "请结合制度全文进一步确认。" return f"已检索到相关依据:{titles}。核心说明:{summary}" return ( f"当前还没有与“{SCENARIO_LABELS.get(payload.ontology.scenario, '当前问题')}”" "强匹配的已上线规则引用,建议先人工复核或补充更具体的单据上下文。" ) def _build_risk_answer( self, payload: UserAgentRequest, citations: list[UserAgentCitation], ) -> str: risk_flags = self._resolve_risk_flags(payload) if not risk_flags: return "当前未识别到明确风险标签,建议继续查看原始明细或补充更多上下文。" reasons = [RISK_REASON_MAP.get(flag, f"{flag} 需要人工进一步确认。") for flag in risk_flags] citation_text = ( f" 参考规则:{'、'.join(item.title for item in citations[:2])}。" if citations else "" ) return ( f"本次识别到 {len(risk_flags)} 类风险:{'、'.join(risk_flags)}。" f"触发原因:{';'.join(reasons)}。" "建议先复核明细、附件和审批链,再决定是否继续处理。" f"{citation_text}" ) def _build_draft_payload(self, payload: UserAgentRequest) -> UserAgentDraftPayload: scenario_label = SCENARIO_LABELS.get(payload.ontology.scenario, "业务") subject = self._resolve_subject(payload) title = f"{scenario_label}处理意见草稿" body = ( f"主题:{subject}\n" "结论:已根据当前语义解析结果生成草稿,尚未自动执行。\n" "建议:请先核对明细、规则命中和所需附件,再由人工确认是否提交正式流程。\n" f"原始问题:{payload.message}" ) return UserAgentDraftPayload( draft_type=payload.ontology.scenario, title=title, body=body, confirmation_required=True, ) def _build_suggested_actions( self, payload: UserAgentRequest, ) -> list[UserAgentSuggestedAction]: if self._is_generic_expense_prompt(payload): return [ UserAgentSuggestedAction( label="上传票据", action_type="ask_clarification", description="上传发票、行程单或付款截图,继续识别报销内容。", ), UserAgentSuggestedAction( label="补充报销信息", action_type="ask_clarification", description="补充费用类型、金额、时间和事由后继续处理。", ), ] if payload.ontology.intent in {"query", "compare"}: return [ UserAgentSuggestedAction( label="查看明细", action_type="open_detail", description="继续查看命中记录和过滤条件。", ), UserAgentSuggestedAction( label="生成处理意见", action_type="create_draft", description="把当前查询结果整理成可确认草稿。", ), ] if payload.ontology.intent == "risk_check": return [ UserAgentSuggestedAction( label="人工复核风险", action_type="manual_review", description="优先检查明细、附件和规则命中原因。", ), UserAgentSuggestedAction( label="生成整改建议", action_type="create_draft", description="把风险说明整理成处理意见草稿。", ), ] if payload.ontology.intent == "draft": return [ UserAgentSuggestedAction( label="复制草稿", action_type="copy_draft", description="复制当前草稿后交由人工确认。", ), UserAgentSuggestedAction( label="补充上下文", action_type="ask_clarification", description="补充单据编号、客户或供应商信息以完善草稿。", ), ] return [ UserAgentSuggestedAction( label="查看规则全文", action_type="open_rule", description="继续查看引用规则或知识内容。", ), UserAgentSuggestedAction( label="补充问题上下文", action_type="ask_clarification", description="补充业务对象、时间或单据范围,提升回答准确度。", ), ] def _build_rule_citations(self, payload: UserAgentRequest) -> list[UserAgentCitation]: domain = self._resolve_domain(payload.ontology.scenario) items = self.asset_service.list_assets( asset_type=AgentAssetType.RULE.value, status=AgentAssetStatus.ACTIVE.value, domain=domain, ) ranked = self._rank_rule_assets(items, payload) citations: list[UserAgentCitation] = [] for item in ranked[:2]: detail = self.asset_service.get_asset(item.id) if detail is None: continue excerpt = self._extract_excerpt(str(detail.current_version_content or "")) citations.append( UserAgentCitation( source_type="rule", code=detail.code, title=detail.name, version=detail.current_version, updated_at=detail.updated_at.date().isoformat(), excerpt=excerpt, ) ) return citations @staticmethod def _resolve_risk_flags(payload: UserAgentRequest) -> list[str]: tool_flags = payload.tool_payload.get("risk_flags") if isinstance(tool_flags, list) and tool_flags: return [str(item) for item in tool_flags] return [str(item) for item in payload.ontology.risk_flags] @staticmethod def _resolve_subject(payload: UserAgentRequest) -> str: named_entities = [ item.value for item in payload.ontology.entities if item.type in {"employee", "customer", "vendor", "project"} ] if named_entities: return f"{'、'.join(named_entities)} 相关数据" return f"{SCENARIO_LABELS.get(payload.ontology.scenario, '当前')}场景数据" @staticmethod def _is_generic_expense_prompt(payload: UserAgentRequest) -> bool: if payload.ontology.scenario != "expense": return False normalized_message = re.sub(r"\s+", "", payload.message) return normalized_message in GENERIC_EXPENSE_PROMPTS @staticmethod def _is_implicit_expense_draft_request(payload: UserAgentRequest) -> bool: if payload.ontology.scenario != "expense" or payload.ontology.intent != "draft": return False compact_message = re.sub(r"\s+", "", payload.message) if any(keyword in compact_message for keyword in EXPLICIT_DRAFT_KEYWORDS): return False return True @staticmethod def _resolve_attachment_names(payload: UserAgentRequest) -> list[str]: names = payload.context_json.get("attachment_names") if not isinstance(names, list): return [] return [str(name) for name in names if str(name).strip()] @staticmethod def _resolve_domain(scenario: str) -> str | None: if scenario == "expense": return "expense" if scenario == "accounts_receivable": return "ar" if scenario == "accounts_payable": return "ap" return None @staticmethod def _rank_rule_assets( items: list[AgentAssetListItem], payload: UserAgentRequest, ) -> list[AgentAssetListItem]: def score(item: AgentAssetListItem) -> tuple[int, str]: tags = {str(value) for value in item.scenario_json or []} weight = 0 if payload.ontology.scenario in tags: weight += 3 if payload.ontology.intent in tags: weight += 2 for risk_flag in payload.ontology.risk_flags: if risk_flag in tags: weight += 4 return weight, item.code ranked = sorted(items, key=score, reverse=True) return [item for item in ranked if score(item)[0] > 0] @staticmethod def _extract_excerpt(content: str) -> str: lines = [line.strip() for line in str(content).splitlines() if line.strip()] cleaned: list[str] = [] for line in lines: normalized = re.sub(r"^[#>\-\*\d\.\s`]+", "", line).strip() if normalized: cleaned.append(normalized) if len(cleaned) >= 2: break return ";".join(cleaned[:2])