from __future__ import annotations from typing import Any from app.core.agent_enums import AgentPermissionLevel from app.schemas.ontology import ( OntologyConstraint, OntologyEntity, OntologyFieldError, OntologyMetric, OntologyPermission, OntologyTimeRange, ) from app.services.ontology_rules import ( EXPENSE_REVIEW_ACTIONS, MISSING_SLOT_LABELS, OPERATE_KEYWORDS, PRIVILEGED_ROLE_CODES, ) class OntologyValidationMixin: def _extract_risk_flags(self, compact_query: str, scenario: str) -> list[str]: risk_flags: list[str] = [] def append(flag: str) -> None: if flag not in risk_flags: risk_flags.append(flag) if "重复" in compact_query: append("duplicate_expense") if any( keyword in compact_query for keyword in ("发票异常", "票据异常", "验真失败", "附件缺失", "补件") ): append("invoice_anomaly") if any(keyword in compact_query for keyword in ("超标", "超预算", "超限")): append("amount_over_limit") if scenario == "budget" and any( keyword in compact_query for keyword in ("预算不足", "超预算", "超支") ): append("budget_over_limit") if scenario == "budget" and any( keyword in compact_query for keyword in ("预算预警", "触发预警", "接近预算") ): append("budget_warning") if scenario == "accounts_receivable" and any( keyword in compact_query for keyword in ("逾期", "账龄", "欠款", "未回款") ): append("ar_overdue") if scenario == "accounts_payable" and any( keyword in compact_query for keyword in ("逾期", "待付", "付款风险", "未付款") ): append("ap_overdue") return risk_flags def _resolve_permission( self, compact_query: str, context_json: dict, intent: str, ) -> OntologyPermission: role_codes = { str(item).strip().lower() for item in context_json.get("role_codes", []) if str(item).strip() } is_admin = bool(context_json.get("is_admin")) privileged = is_admin or bool(role_codes & PRIVILEGED_ROLE_CODES) if intent in {"query", "explain", "compare", "risk_check"}: return OntologyPermission( level=AgentPermissionLevel.READ.value, allowed=True, reason="只读查询。", ) if intent == "draft": return OntologyPermission( level=AgentPermissionLevel.DRAFT_WRITE.value, allowed=True, reason="允许生成草稿,但不会直接提交业务动作。", ) if any(keyword in compact_query for keyword in OPERATE_KEYWORDS) or "付款" in compact_query: if privileged: return OntologyPermission( level=AgentPermissionLevel.APPROVAL_REQUIRED.value, allowed=False, reason="涉及付款、审批或上线动作,必须进入人工审批链。", ) return OntologyPermission( level=AgentPermissionLevel.FORBIDDEN.value, allowed=False, reason="当前账号缺少财务或审批权限,只能查看结果或生成草稿。", ) return OntologyPermission( level=AgentPermissionLevel.APPROVAL_REQUIRED.value, allowed=False, reason="操作类请求需要人工审批确认。", ) def _build_field_errors( self, *, scenario: str, intent: str, entities: list[OntologyEntity], permission: OntologyPermission, missing_slots: list[str], ambiguity: list[str], ) -> list[OntologyFieldError]: errors: list[OntologyFieldError] = [] if scenario == "unknown": errors.append( OntologyFieldError( field="scenario", code="scenario_unknown", message="未识别出明确业务场景,请补充是报销、应收、应付还是制度问题。", ) ) if intent == "compare" and len([item for item in entities if item.type != "amount"]) < 2: errors.append( OntologyFieldError( field="entities", code="compare_target_missing", message="对比类问题请至少给出两个对象,或给出更明确的对比范围。", ) ) if missing_slots: errors.append( OntologyFieldError( field="missing_slots", code="required_slot_missing", message=( "继续处理前还缺少关键信息:" f"{'、'.join(self._display_slot_label(item) for item in missing_slots)}。" ), ) ) if ambiguity: errors.append( OntologyFieldError( field="ambiguity", code="ambiguity_detected", message=f"当前问题存在歧义:{';'.join(ambiguity)}。", ) ) if permission.level == AgentPermissionLevel.FORBIDDEN.value: errors.append( OntologyFieldError( field="permission", code="permission_forbidden", message=permission.reason, ) ) return errors def _build_clarification( self, *, scenario: str, intent: str, entities: list[OntologyEntity], permission: OntologyPermission, missing_slots: list[str], ambiguity: list[str], allow_incomplete_draft: bool, model_clarification_required: bool, model_clarification_question: str | None, ) -> tuple[bool, str | None]: if permission.level == AgentPermissionLevel.FORBIDDEN.value: return True, "当前动作超出权限范围。是否改为生成草稿或建议?" if scenario == "knowledge" and intent in {"query", "explain"}: return False, None if model_clarification_required: question = str(model_clarification_question or "").strip() if question: return True, question if missing_slots: return True, self._build_missing_slot_question(missing_slots) if ambiguity: return True, f"当前问题存在歧义,请进一步说明:{';'.join(ambiguity)}。" if scenario == "unknown": return True, "请说明这是报销、应收、应付,还是制度知识问题?" if intent == "compare" and len([item for item in entities if item.type != "amount"]) < 2: return True, "请补充需要对比的两个对象,例如两个客户、两个供应商或两个员工。" if allow_incomplete_draft and scenario == "expense" and intent == "draft": return False, None if missing_slots: return True, self._build_missing_slot_question(missing_slots) if ambiguity: return True, f"当前问题存在歧义,请进一步说明:{';'.join(ambiguity)}。" return False, None @staticmethod def _allow_incomplete_draft( context_json: dict[str, Any], *, scenario: str, intent: str, ) -> bool: if scenario != "expense" or intent != "draft": return False review_action = str(context_json.get("review_action") or "").strip() return review_action in EXPENSE_REVIEW_ACTIONS @staticmethod def _display_slot_label(slot: str) -> str: return MISSING_SLOT_LABELS.get(slot, slot) def _build_missing_slot_question(self, missing_slots: list[str]) -> str: labels = [self._display_slot_label(item) for item in missing_slots[:4]] if not labels: return "请补充更多上下文后再继续。" return f"请补充{'、'.join(labels)},我再继续帮你解析和处理。" @staticmethod def _compute_confidence( *, scenario: str, scenario_score: float, intent_score: float, entities: list[OntologyEntity], time_range: OntologyTimeRange, metrics: list[OntologyMetric], constraints: list[OntologyConstraint], risk_flags: list[str], clarification_required: bool, permission: OntologyPermission, ) -> float: confidence = 0.18 + scenario_score + intent_score confidence += min(0.16, len(entities) * 0.04) if time_range.start_date: confidence += 0.10 if metrics: confidence += 0.06 if constraints: confidence += 0.06 if risk_flags: confidence += 0.08 if permission.level == AgentPermissionLevel.FORBIDDEN.value: confidence = max(confidence, 0.86) if scenario == "unknown": confidence = min(confidence, 0.45) if clarification_required and permission.level != AgentPermissionLevel.FORBIDDEN.value: confidence = min(confidence, 0.58) return round(min(confidence, 0.98), 2) @staticmethod def _build_result_summary( scenario: str, intent: str, permission_level: str, confidence: float, ) -> str: return ( f"语义解析完成:scenario={scenario}, intent={intent}, " f"permission={permission_level}, confidence={confidence:.2f}" ) @staticmethod def _normalize_operator(value: str) -> str: mapping = { "超过": ">", "大于": ">", "高于": ">", ">": ">", ">=": ">=", "不少于": ">=", "不低于": ">=", "小于": "<", "低于": "<", "少于": "<", "<": "<", "<=": "<=", "至多": "<=", "不超过": "<=", "=": "=", "=": "=", } return mapping.get(value, value) @staticmethod def _normalize_amount(raw_value: str | None, unit: str | None) -> int | float: numeric = float(raw_value or 0) if unit in {"万", "万元"}: numeric *= 10000 return int(numeric) if numeric.is_integer() else round(numeric, 2)