from __future__ import annotations import json import re from datetime import UTC, datetime, timedelta from decimal import Decimal, InvalidOperation from typing import Any from sqlalchemy import or_, select from sqlalchemy.orm import selectinload from app.api.deps import CurrentUserContext from app.core.agent_enums import AgentAssetStatus, AgentAssetType from app.models.employee import Employee from app.models.financial_record import ExpenseClaim from app.schemas.agent_asset import AgentAssetListItem from app.schemas.reimbursement import TravelReimbursementCalculatorRequest from app.schemas.user_agent import ( UserAgentCitation, UserAgentDraftPayload, UserAgentExpenseQueryRecord, UserAgentQueryPayload, UserAgentQueryStatusGroup, UserAgentReviewAction, UserAgentReviewClaimGroup, UserAgentReviewDocumentCard, UserAgentReviewDocumentField, UserAgentReviewEditField, UserAgentReviewPayload, UserAgentReviewRiskBrief, UserAgentReviewSlotCard, UserAgentRequest, UserAgentSuggestedAction, ) from app.services.agent_assets import AgentAssetService from app.services.expense_claims import ExpenseClaimService from app.services.expense_rule_runtime import ExpenseRuleRuntimeService, RuntimeTravelPolicy, resolve_document_type_label from app.services.risk_ontology_bridge import resolve_rule_codes_for_risk_check from app.services.travel_reimbursement_calculator import TravelReimbursementCalculatorService from app.services.user_agent_constants import * class UserAgentReviewProfileMixin: def _build_review_edit_fields( self, payload: UserAgentRequest, *, draft_payload: UserAgentDraftPayload | None, slot_cards: list[UserAgentReviewSlotCard], ) -> list[UserAgentReviewEditField]: slot_map = {item.key: item for item in slot_cards} employee = self._resolve_employee_profile(payload) reporter_name = ( slot_map.get("reporter_name").value if slot_map.get("reporter_name") else str(payload.context_json.get("name") or "").strip() ) manager_name = self._resolve_manager_name(employee) reason = slot_map.get("reason").value if slot_map.get("reason") else "" attachments = "、".join(self._resolve_attachment_names(payload)) fields = [ UserAgentReviewEditField( key="claim_no", label="报销单据编号", value=str(draft_payload.claim_no if draft_payload is not None and draft_payload.claim_no else "待生成"), placeholder="保存草稿后自动生成", required=False, group="basic", ), UserAgentReviewEditField( key="expense_type", label="报销类型", value=slot_map.get("expense_type").value if slot_map.get("expense_type") else "", placeholder="例如:业务招待费 / 差旅费", group="basic", ), UserAgentReviewEditField( key="occurred_date", label="业务发生时间", value=slot_map.get("time_range").normalized_value if slot_map.get("time_range") and slot_map.get("time_range").normalized_value else slot_map.get("time_range").value if slot_map.get("time_range") else "", placeholder="例如:2026-05-11", group="basic", ), UserAgentReviewEditField( key="reporter_name", label="报销人", value=reporter_name, placeholder="请输入报销人姓名", group="basic", ), UserAgentReviewEditField( key="manager_name", label="直属上司姓名", value=manager_name, placeholder="请输入直属上司姓名", required=False, group="basic", ), UserAgentReviewEditField( key="customer_name", label="客户名称", value=slot_map.get("customer_name").value if slot_map.get("customer_name") else "", placeholder="请输入客户名称", group="business", ), UserAgentReviewEditField( key="business_location", label="业务地点", value=slot_map.get("location").normalized_value if slot_map.get("location") and slot_map.get("location").normalized_value else slot_map.get("location").value if slot_map.get("location") else "", placeholder="例如:北京 / 客户现场", required=False, group="business", ), UserAgentReviewEditField( key="merchant_name", label="酒店/商户", value=slot_map.get("merchant_name").value if slot_map.get("merchant_name") else "", placeholder="请输入酒店或商户名称", required=False, group="business", ), UserAgentReviewEditField( key="amount", label="金额", value=slot_map.get("amount").normalized_value if slot_map.get("amount") and slot_map.get("amount").normalized_value else slot_map.get("amount").value if slot_map.get("amount") else "", placeholder="例如:200.00元", group="business", ), UserAgentReviewEditField( key="participants", label="参与人员", value=slot_map.get("participants").value if slot_map.get("participants") else "", placeholder="例如:客户 2 人,我方 1 人", group="business", ), UserAgentReviewEditField( key="reason", label="事由", value=reason, placeholder="请输入报销事由", field_type="textarea", group="business", ), UserAgentReviewEditField( key="attachment_names", label="附件清单", value=attachments, placeholder="例如:发票.jpg、行程单.png", required=False, field_type="textarea", group="attachments", ), ] return fields def _resolve_employee_profile(self, payload: UserAgentRequest) -> Employee | None: candidates = [ str(payload.context_json.get("name") or "").strip(), str(payload.user_id or "").strip(), self._collect_entity_values(payload).get("employee_name", ""), ] normalized = [item for item in dict.fromkeys(candidates) if item] if not normalized: return None stmt = ( select(Employee) .options(selectinload(Employee.organization_unit), selectinload(Employee.manager)) .where( or_( Employee.name.in_(normalized), Employee.employee_no.in_(normalized), Employee.email.in_(normalized), ) ) .limit(1) ) return self.db.scalar(stmt) @staticmethod def _resolve_manager_name(employee: Employee | None) -> str: if employee is None: return "" if employee.manager is not None and employee.manager.name: return employee.manager.name if employee.organization_unit is not None and employee.organization_unit.manager_name: return employee.organization_unit.manager_name return "" @staticmethod def _extract_message_reason(message: str) -> str: for line in str(message or "").splitlines(): cleaned = line.strip() if not cleaned: continue if cleaned.startswith(("附件名称:", "OCR摘要:", "关联单号:")): continue return cleaned[:300] return "" @staticmethod def _looks_like_system_generated_reason_message(message: str) -> bool: cleaned = str(message or "").strip() if not cleaned: return False compact = re.sub(r"\s+", "", cleaned) return compact.startswith(SYSTEM_GENERATED_REASON_PREFIXES) def _resolve_reason_source_text(self, payload: UserAgentRequest) -> str: explicit_text = payload.context_json.get("user_input_text") if isinstance(explicit_text, str): return explicit_text.strip() if self._looks_like_system_generated_reason_message(payload.message): return "" return str(payload.message or "").strip() @classmethod def _resolve_reason_text(cls, message: str) -> str: reason = cls._strip_leading_time_from_reason(cls._extract_message_reason(message)) if not reason: return "" compact = re.sub(r"\s+", "", reason) if compact in GENERIC_EXPENSE_PROMPTS: return "" instruction_prefixes = ( "帮我生成", "请帮我生成", "生成", "起草", "创建", "发起", "准备", "帮我报销", "我要报销", "我想报销", ) if compact.startswith(instruction_prefixes): for separator in (",", ",", "。", ";", ";", ":", ":"): if separator in reason: trailing = reason.split(separator, 1)[1].strip() if trailing: return trailing[:300] return "" return reason @staticmethod def _strip_leading_time_from_reason(value: str) -> str: reason = str(value or "").strip() for pattern in LEADING_REASON_TIME_PATTERNS: next_reason = pattern.sub("", reason).strip() if next_reason != reason: return next_reason return reason @staticmethod def _should_skip_model_answer( payload: UserAgentRequest, review_payload: UserAgentReviewPayload | None, ) -> bool: if payload.ontology.scenario == "expense" and payload.ontology.intent in {"query", "compare"}: return True if review_payload is None: return False return payload.ontology.scenario == "expense" and ( payload.ontology.intent == "draft" or int(payload.context_json.get("attachment_count") or 0) > 0 ) def _build_citations(self, payload: UserAgentRequest) -> list[UserAgentCitation]: knowledge_citations = self._build_knowledge_citations(payload) if payload.ontology.scenario == "knowledge": return knowledge_citations[:3] rule_citations = self._build_rule_asset_citations(payload) if knowledge_citations: return (knowledge_citations + rule_citations)[:3] return rule_citations @staticmethod def _build_knowledge_citations(payload: UserAgentRequest) -> list[UserAgentCitation]: citations: list[UserAgentCitation] = [] for item in list(payload.tool_payload.get("hits") or [])[:3]: if not isinstance(item, dict): continue title = str(item.get("title") or item.get("document_name") or "").strip() code = str(item.get("code") or item.get("candidate_id") or "").strip() if not title or not code: continue citations.append( UserAgentCitation( source_type="knowledge", code=code, title=title, version=str(item.get("version") or "").strip() or None, updated_at=str(item.get("updated_at") or "").strip() or None, excerpt=( str(item.get("excerpt") or "").strip() or str(item.get("content") or "").strip() or None ), ) ) return citations def _build_rule_asset_citations(self, payload: UserAgentRequest) -> list[UserAgentCitation]: domain = self._resolve_domain(payload.ontology.scenario) items = self.asset_service.list_assets( asset_type=AgentAssetType.RULE.value, status=AgentAssetStatus.ACTIVE.value, domain=domain, ) ranked = self._rank_rule_assets(items, payload) citations: list[UserAgentCitation] = [] for item in ranked[:2]: detail = self.asset_service.get_asset(item.id) if detail is None: continue excerpt = self._extract_excerpt(str(detail.current_version_content or "")) citations.append( UserAgentCitation( source_type="rule", code=detail.code, title=detail.name, version=detail.current_version, updated_at=detail.updated_at.date().isoformat(), excerpt=excerpt, ) ) return citations @staticmethod def _resolve_risk_flags(payload: UserAgentRequest) -> list[str]: tool_flags = payload.tool_payload.get("risk_flags") if isinstance(tool_flags, list) and tool_flags: return [str(item) for item in tool_flags] return [str(item) for item in payload.ontology.risk_flags] @staticmethod def _resolve_subject(payload: UserAgentRequest) -> str: named_entities = [ item.value for item in payload.ontology.entities if item.type in {"employee", "customer", "vendor", "project"} ] if named_entities: return f"{'、'.join(named_entities)} 相关数据" return f"{SCENARIO_LABELS.get(payload.ontology.scenario, '当前')}场景数据" @staticmethod def _is_generic_expense_prompt(payload: UserAgentRequest) -> bool: if payload.ontology.scenario != "expense": return False normalized_message = re.sub(r"\s+", "", payload.message) return normalized_message in GENERIC_EXPENSE_PROMPTS @staticmethod def _is_implicit_expense_draft_request(payload: UserAgentRequest) -> bool: if payload.ontology.scenario != "expense" or payload.ontology.intent != "draft": return False compact_message = re.sub(r"\s+", "", payload.message) if any(keyword in compact_message for keyword in EXPLICIT_DRAFT_KEYWORDS): return False return True @staticmethod def _resolve_attachment_names(payload: UserAgentRequest) -> list[str]: names = payload.context_json.get("attachment_names") if not isinstance(names, list): return [] return [str(name) for name in names if str(name).strip()] @staticmethod def _resolve_attachment_count(payload: UserAgentRequest) -> int: names = UserAgentReviewProfileMixin._resolve_attachment_names(payload) if names: return len(names) try: return max(0, int(payload.context_json.get("attachment_count") or 0)) except (TypeError, ValueError): return 0 @staticmethod def _resolve_ocr_documents(payload: UserAgentRequest) -> list[dict[str, object]]: documents = payload.context_json.get("ocr_documents") if not isinstance(documents, list): return [] overrides = payload.context_json.get("review_document_form_values") override_map: dict[tuple[int, str], dict[str, object]] = {} if isinstance(overrides, list): for item in overrides: if not isinstance(item, dict): continue filename = str(item.get("filename") or "").strip() index = int(item.get("index") or 0) if not filename and index <= 0: continue override_map[(index, filename)] = item normalized: list[dict[str, object]] = [] for index, item in enumerate(documents[:8], start=1): if not isinstance(item, dict): continue normalized_item = dict(item) override = override_map.get((index, str(normalized_item.get("filename") or "").strip())) if override is None: override = override_map.get((index, "")) if override is not None: summary = str(override.get("summary") or "").strip() scene_label = str(override.get("scene_label") or "").strip() fields = override.get("fields") if summary: normalized_item["summary"] = summary if scene_label: normalized_item["scene_label"] = scene_label if isinstance(fields, list): normalized_item["document_fields"] = [ { "key": str(field.get("key") or field.get("label") or "").strip(), "label": str(field.get("label") or "").strip(), "value": str(field.get("value") or "").strip(), } for field in fields if isinstance(field, dict) and str(field.get("label") or "").strip() and str(field.get("value") or "").strip() ] normalized.append(normalized_item) return normalized @staticmethod def _is_review_association_choice_pending(payload: UserAgentRequest) -> bool: return bool(payload.tool_payload.get("pending_association_decision")) def _resolve_review_document_count(self, payload: UserAgentRequest) -> int: return max( len(self._resolve_ocr_documents(payload)), self._resolve_attachment_count(payload), )