refactor: enforce 800 line source limits

This commit is contained in:
caoxiaozhu
2026-06-22 11:58:53 +08:00
parent 08a4fa3577
commit 6d33ba5742
150 changed files with 27413 additions and 23791 deletions

View File

@@ -151,432 +151,7 @@ APPLICATION_DUPLICATE_IGNORED_STATUSES = {
}
class UserAgentApplicationMixin:
@staticmethod
def _is_expense_application_request(payload: UserAgentRequest) -> bool:
context_json = payload.context_json or {}
context_values = {
str(context_json.get("session_type") or "").strip(),
str(context_json.get("entry_source") or "").strip(),
str(context_json.get("document_type") or "").strip(),
str(context_json.get("application_stage") or "").strip(),
}
conversation_state = context_json.get("conversation_state")
if isinstance(conversation_state, dict):
context_values.update(
{
str(conversation_state.get("session_type") or "").strip(),
str(conversation_state.get("entry_source") or "").strip(),
str(conversation_state.get("document_type") or "").strip(),
str(conversation_state.get("application_stage") or "").strip(),
}
)
if context_values & APPLICATION_CONTEXT_VALUES:
return True
history = context_json.get("conversation_history")
if not isinstance(history, list):
return False
compact_message = re.sub(r"\s+", "", str(payload.message or ""))
looks_like_submit = (
any(keyword in compact_message for keyword in APPLICATION_SUBMIT_KEYWORDS)
or compact_message in APPLICATION_SHORT_CONFIRMATIONS
)
if not looks_like_submit:
return False
return any(
isinstance(item, dict)
and str(item.get("role") or "").strip() == "assistant"
and (
"#application-submit" in str(item.get("content") or "")
or ("费用申请" in str(item.get("content") or "") and "确认" in str(item.get("content") or ""))
)
for item in history[-6:]
)
def _build_expense_application_response(
self,
payload: UserAgentRequest,
*,
risk_flags: list[str],
) -> UserAgentResponse:
facts = self._resolve_expense_application_facts(payload)
step = self._resolve_expense_application_step(payload, facts)
application_claim = None
if step in {"draft", "submitted"}:
editable_claim = self._find_editable_expense_application_record(payload)
if editable_claim is not None:
application_claim = self._update_expense_application_record(
payload,
facts,
editable_claim,
submit=step == "submitted",
)
facts["application_edit_mode"] = "true"
elif step == "submitted":
application_claim = self._find_duplicate_expense_application_record(payload, facts)
if application_claim is not None:
step = "duplicate"
facts["duplicate_application_stage"] = str(application_claim.approval_stage or "").strip()
else:
application_claim = self._create_expense_application_record(
payload,
facts,
submit=True,
)
else:
application_claim = self._create_expense_application_record(
payload,
facts,
submit=False,
)
if application_claim is not None:
facts["application_no"] = application_claim.claim_no
facts["application_claim_id"] = application_claim.id
facts["manager_name"] = self._resolve_application_manager_name(payload, application_claim)
return UserAgentResponse(
answer=self._build_expense_application_answer(payload, facts=facts, step=step),
citations=[],
suggested_actions=self._build_expense_application_actions(step, facts),
query_payload=None,
draft_payload=(
self._build_persisted_application_payload(application_claim, facts)
if step in {"draft", "submitted"}
else None
),
review_payload=None,
risk_flags=risk_flags,
requires_confirmation=step == "preview",
)
def _build_expense_application_answer(
self,
payload: UserAgentRequest,
*,
facts: dict[str, str],
step: str,
) -> str:
recognized_table = build_application_summary_table(facts, include_empty=False)
if step == "ask_missing":
missing_fields = self._resolve_application_missing_fields(facts)
missing_text = "".join(
self._display_application_slot_label(item)
for item in missing_fields
)
return "\n\n".join(
[
"我已按「费用申请 / 事前审批」来处理这条内容。",
"已识别信息:\n" + recognized_table,
f"当前还需要补充:{missing_text}",
"请一次性补齐上述字段,我会继续生成申请核对结果并让你确认是否提交。",
]
)
if step == "draft":
application_no = str(facts.get("application_no") or "").strip()
return "\n\n".join(
[
"申请草稿已保存。",
f"草稿单号:{application_no}" if application_no else "草稿单号:待生成",
"当前节点:待提交。",
"后续可进入单据详情继续核对、补充或提交审批。",
]
)
if step == "submitted":
application_no = str(facts.get("application_no") or "").strip() or self._build_application_claim_no(payload, facts)
manager_name = str(facts.get("manager_name") or "").strip() or "直属领导"
submitted_title = (
"申请单据已修改并重新提交,已进入审批流程。"
if str(facts.get("application_edit_mode") or "").strip().lower() == "true"
else "申请单据已生成,并已进入审批流程。"
)
return "\n\n".join(
[
submitted_title,
f"系统已推送给 {manager_name} 审核,当前节点:{manager_name}审核中。",
f"申请单号:{application_no}",
"下方是简要单据信息。需要查看完整详情时,请点击快捷方式进入单据详情。",
]
)
if step == "duplicate":
application_no = str(facts.get("application_no") or "").strip()
stage = str(facts.get("duplicate_application_stage") or "").strip() or "处理中"
time_label = resolve_application_time_label(facts)
return "\n\n".join(
[
f"检测到同一申请人、同一申请类型、同一{time_label}已存在申请单,系统没有重复创建。",
f"已有申请单号:{application_no}",
f"当前节点:{stage}",
"如需继续处理,请在单据中心查看该申请;如果本次业务时间不同,请先调整时间后再提交。",
]
)
return "\n\n".join(
[
"这是费用申请核对结果,请核对:",
build_application_summary_table(facts),
"请核对上述信息无误,确认无误后 [确认](#application-submit) 提交至审批流程。",
]
)
def _resolve_expense_application_facts(self, payload: UserAgentRequest) -> dict[str, str]:
facts = {
"time": "",
"location": "",
"reason": "",
"days": "",
"transport_mode": "",
"amount": "",
"application_type": "",
"applicant": "",
"grade": "",
"department": "",
"position": "",
"manager_name": "",
"lodging_daily_cap": "",
"subsidy_daily_cap": "",
"transport_policy": "",
"policy_estimate": "",
"matched_city": "",
"rule_name": "",
"rule_version": "",
"hotel_amount": "",
"allowance_amount": "",
"transport_estimated_amount": "",
"transport_estimate_source": "",
"transport_estimate_confidence": "",
"policy_total_amount": "",
}
for message, is_current in self._iter_application_user_messages(payload):
partial = {
"time": self._resolve_application_time(payload, message=message) if is_current else self._resolve_application_time_from_text(message),
"location": self._resolve_application_location(payload, message=message, use_entities=is_current),
"reason": self._resolve_application_reason(message),
"days": self._resolve_application_days(message),
"transport_mode": self._resolve_application_transport_mode(message),
"amount": self._resolve_application_amount(payload, message=message) if is_current else self._resolve_application_amount_from_text(message),
"application_type": self._resolve_application_type_from_text(message),
}
for key, value in partial.items():
if value:
facts[key] = value
for key, value in self._resolve_application_preview_facts(payload.context_json or {}).items():
if value:
facts[key] = value
facts["application_type"] = self._normalize_application_type_label(facts.get("application_type", ""))
context_json = payload.context_json or {}
context_time = self._resolve_application_time_from_context(context_json)
if context_time and self._should_prefer_context_application_time(facts.get("time", ""), context_time):
facts["time"] = context_time
current_user = self._build_application_current_user(payload)
employee = ExpenseClaimAccessPolicy(self.db).resolve_current_employee(current_user)
if not facts["applicant"]:
facts["applicant"] = str(
context_json.get("name")
or context_json.get("user_name")
or context_json.get("applicant")
or (employee.name if employee is not None else "")
or current_user.name
or ""
).strip()
if not facts["grade"]:
facts["grade"] = str(
context_json.get("grade")
or context_json.get("employee_grade")
or context_json.get("employeeGrade")
or current_user.grade
or (employee.grade if employee is not None else "")
or ""
).strip()
if not facts["department"]:
facts["department"] = str(
context_json.get("department")
or context_json.get("department_name")
or context_json.get("departmentName")
or current_user.department_name
or (
employee.organization_unit.name
if employee is not None and employee.organization_unit is not None
else ""
)
or ""
).strip()
if not facts["position"]:
facts["position"] = str(
context_json.get("position")
or context_json.get("employee_position")
or context_json.get("employeePosition")
or current_user.position
or (employee.position if employee is not None else "")
or ""
).strip()
if not facts["manager_name"]:
facts["manager_name"] = str(
context_json.get("manager_name")
or context_json.get("managerName")
or context_json.get("direct_manager_name")
or context_json.get("directManagerName")
or current_user.manager_name
or (
employee.manager.name
if employee is not None and employee.manager is not None
else ""
)
or (
employee.organization_unit.manager_name
if employee is not None and employee.organization_unit is not None
else ""
)
or ""
).strip()
if not facts["application_type"]:
facts["application_type"] = self._infer_application_type(facts)
facts["time"] = self._expand_application_time_with_days(
facts.get("time", ""),
facts.get("days", ""),
payload.context_json or {},
)
if self._is_application_missing_value(facts.get("days", "")):
range_days = resolve_application_days_from_time_range(facts.get("time", ""))
if range_days:
facts["days"] = f"{range_days}"
self._apply_rule_center_travel_policy_to_application_facts(payload, facts)
apply_application_system_estimate_to_facts(facts)
return facts
def _apply_rule_center_travel_policy_to_application_facts(
self,
payload: UserAgentRequest,
facts: dict[str, str],
) -> None:
if "差旅" not in str(facts.get("application_type") or "") and "出差" not in str(facts.get("application_type") or ""):
return
location = str(facts.get("location") or "").strip()
grade = str(facts.get("grade") or "").strip()
if not location or not grade:
return
days = self._parse_application_days_count(facts.get("days", "")) or 1
try:
result = TravelReimbursementCalculatorService(self.db).calculate(
TravelReimbursementCalculatorRequest(days=days, location=location, grade=grade),
self._build_application_current_user(payload),
)
except ValueError:
return
hotel_rate = self._format_application_policy_money(result.hotel_rate)
hotel_amount = self._format_application_policy_money(result.hotel_amount)
allowance_rate = self._format_application_policy_money(result.total_allowance_rate)
allowance_amount = self._format_application_policy_money(result.allowance_amount)
if hotel_rate:
facts["lodging_daily_cap"] = f"{hotel_rate}元/天"
if hotel_amount:
facts["hotel_amount"] = f"{hotel_amount}"
if allowance_rate:
facts["subsidy_daily_cap"] = f"{allowance_rate}元/天"
if allowance_amount:
facts["allowance_amount"] = f"{allowance_amount}"
if str(result.matched_city or "").strip():
facts["matched_city"] = str(result.matched_city).strip()
if str(result.rule_name or "").strip():
facts["rule_name"] = str(result.rule_name).strip()
if str(result.rule_version or "").strip():
facts["rule_version"] = str(result.rule_version).strip()
@staticmethod
def _format_application_policy_money(value: object) -> str:
try:
amount = Decimal(str(value or "0")).quantize(Decimal("0.01"))
except (InvalidOperation, ValueError):
return ""
if amount == amount.to_integral():
return f"{int(amount):,}"
return f"{amount:,.2f}".rstrip("0").rstrip(".")
@staticmethod
def _parse_application_days_count(value: object) -> int:
match = re.search(r"\d+", str(value or ""))
if not match:
return 0
try:
return max(0, int(match.group(0)))
except ValueError:
return 0
@staticmethod
def _resolve_application_preview_facts(context_json: dict[str, object]) -> dict[str, str]:
preview = context_json.get("application_preview")
if not isinstance(preview, dict):
return {}
fields = preview.get("fields")
if not isinstance(fields, dict):
return {}
def pick(*keys: str) -> str:
for key in keys:
value = str(fields.get(key) or "").strip()
if value:
return value
return ""
reason = UserAgentApplicationMixin._cleanup_application_reason_candidate(pick("reason"))
return {
"application_type": UserAgentApplicationMixin._normalize_application_type_label(
pick("applicationType", "application_type")
),
"time": pick("time", "timeRange", "time_range"),
"location": pick("location"),
"reason": reason,
"days": pick("days"),
"transport_mode": pick("transportMode", "transport_mode"),
"amount": pick("amount"),
"applicant": pick("applicant", "name", "userName", "user_name"),
"grade": pick("grade"),
"department": pick("department", "departmentName", "department_name"),
"position": pick("position", "employeePosition", "employee_position"),
"manager_name": pick("managerName", "manager_name", "directManagerName", "direct_manager_name"),
"lodging_daily_cap": pick("lodgingDailyCap", "lodging_daily_cap"),
"subsidy_daily_cap": pick("subsidyDailyCap", "subsidy_daily_cap"),
"transport_policy": pick("transportPolicy", "transport_policy"),
"policy_estimate": pick("policyEstimate", "policy_estimate"),
"matched_city": pick("matchedCity", "matched_city"),
"rule_name": pick("ruleName", "rule_name"),
"rule_version": pick("ruleVersion", "rule_version"),
"hotel_amount": pick("hotelAmount", "hotel_amount"),
"allowance_amount": pick("allowanceAmount", "allowance_amount"),
"transport_estimated_amount": pick("transportEstimatedAmount", "transport_estimated_amount"),
"transport_estimate_source": pick("transportEstimateSource", "transport_estimate_source"),
"transport_estimate_confidence": pick("transportEstimateConfidence", "transport_estimate_confidence"),
"policy_total_amount": pick("policyTotalAmount", "policy_total_amount"),
}
@staticmethod
def _is_application_missing_value(value: object) -> bool:
return str(value or "").strip().lower() in APPLICATION_MISSING_VALUES
def _resolve_expense_application_step(
self,
payload: UserAgentRequest,
facts: dict[str, str],
) -> str:
if self._is_application_save_draft_action(payload):
return "draft"
if self._resolve_application_missing_base_fields(facts):
return "ask_missing"
if self._resolve_application_missing_followup_fields(facts):
return "ask_missing"
if self._is_application_submit_confirmation(payload):
return "submitted"
return "preview"
class UserAgentApplicationSlotMixin:
@staticmethod
def _iter_application_user_messages(payload: UserAgentRequest) -> list[tuple[str, bool]]:
messages: list[tuple[str, bool]] = []
@@ -1027,6 +602,8 @@ class UserAgentApplicationMixin:
return "会务费用申请"
return "差旅费用申请"
class UserAgentApplicationPersistenceMixin:
@staticmethod
def _resolve_application_edit_claim_id(context_json: dict[str, object]) -> str:
if not isinstance(context_json, dict):
@@ -1512,3 +1089,431 @@ class UserAgentApplicationMixin:
"application",
timestamp=datetime.now(UTC),
)
class UserAgentApplicationMixin(UserAgentApplicationSlotMixin, UserAgentApplicationPersistenceMixin):
@staticmethod
def _is_expense_application_request(payload: UserAgentRequest) -> bool:
context_json = payload.context_json or {}
context_values = {
str(context_json.get("session_type") or "").strip(),
str(context_json.get("entry_source") or "").strip(),
str(context_json.get("document_type") or "").strip(),
str(context_json.get("application_stage") or "").strip(),
}
conversation_state = context_json.get("conversation_state")
if isinstance(conversation_state, dict):
context_values.update(
{
str(conversation_state.get("session_type") or "").strip(),
str(conversation_state.get("entry_source") or "").strip(),
str(conversation_state.get("document_type") or "").strip(),
str(conversation_state.get("application_stage") or "").strip(),
}
)
if context_values & APPLICATION_CONTEXT_VALUES:
return True
history = context_json.get("conversation_history")
if not isinstance(history, list):
return False
compact_message = re.sub(r"\s+", "", str(payload.message or ""))
looks_like_submit = (
any(keyword in compact_message for keyword in APPLICATION_SUBMIT_KEYWORDS)
or compact_message in APPLICATION_SHORT_CONFIRMATIONS
)
if not looks_like_submit:
return False
return any(
isinstance(item, dict)
and str(item.get("role") or "").strip() == "assistant"
and (
"#application-submit" in str(item.get("content") or "")
or ("费用申请" in str(item.get("content") or "") and "确认" in str(item.get("content") or ""))
)
for item in history[-6:]
)
def _build_expense_application_response(
self,
payload: UserAgentRequest,
*,
risk_flags: list[str],
) -> UserAgentResponse:
facts = self._resolve_expense_application_facts(payload)
step = self._resolve_expense_application_step(payload, facts)
application_claim = None
if step in {"draft", "submitted"}:
editable_claim = self._find_editable_expense_application_record(payload)
if editable_claim is not None:
application_claim = self._update_expense_application_record(
payload,
facts,
editable_claim,
submit=step == "submitted",
)
facts["application_edit_mode"] = "true"
elif step == "submitted":
application_claim = self._find_duplicate_expense_application_record(payload, facts)
if application_claim is not None:
step = "duplicate"
facts["duplicate_application_stage"] = str(application_claim.approval_stage or "").strip()
else:
application_claim = self._create_expense_application_record(
payload,
facts,
submit=True,
)
else:
application_claim = self._create_expense_application_record(
payload,
facts,
submit=False,
)
if application_claim is not None:
facts["application_no"] = application_claim.claim_no
facts["application_claim_id"] = application_claim.id
facts["manager_name"] = self._resolve_application_manager_name(payload, application_claim)
return UserAgentResponse(
answer=self._build_expense_application_answer(payload, facts=facts, step=step),
citations=[],
suggested_actions=self._build_expense_application_actions(step, facts),
query_payload=None,
draft_payload=(
self._build_persisted_application_payload(application_claim, facts)
if step in {"draft", "submitted"}
else None
),
review_payload=None,
risk_flags=risk_flags,
requires_confirmation=step == "preview",
)
def _build_expense_application_answer(
self,
payload: UserAgentRequest,
*,
facts: dict[str, str],
step: str,
) -> str:
recognized_table = build_application_summary_table(facts, include_empty=False)
if step == "ask_missing":
missing_fields = self._resolve_application_missing_fields(facts)
missing_text = "".join(
self._display_application_slot_label(item)
for item in missing_fields
)
return "\n\n".join(
[
"我已按「费用申请 / 事前审批」来处理这条内容。",
"已识别信息:\n" + recognized_table,
f"当前还需要补充:{missing_text}",
"请一次性补齐上述字段,我会继续生成申请核对结果并让你确认是否提交。",
]
)
if step == "draft":
application_no = str(facts.get("application_no") or "").strip()
return "\n\n".join(
[
"申请草稿已保存。",
f"草稿单号:{application_no}" if application_no else "草稿单号:待生成",
"当前节点:待提交。",
"后续可进入单据详情继续核对、补充或提交审批。",
]
)
if step == "submitted":
application_no = str(facts.get("application_no") or "").strip() or self._build_application_claim_no(payload, facts)
manager_name = str(facts.get("manager_name") or "").strip() or "直属领导"
submitted_title = (
"申请单据已修改并重新提交,已进入审批流程。"
if str(facts.get("application_edit_mode") or "").strip().lower() == "true"
else "申请单据已生成,并已进入审批流程。"
)
return "\n\n".join(
[
submitted_title,
f"系统已推送给 {manager_name} 审核,当前节点:{manager_name}审核中。",
f"申请单号:{application_no}",
"下方是简要单据信息。需要查看完整详情时,请点击快捷方式进入单据详情。",
]
)
if step == "duplicate":
application_no = str(facts.get("application_no") or "").strip()
stage = str(facts.get("duplicate_application_stage") or "").strip() or "处理中"
time_label = resolve_application_time_label(facts)
return "\n\n".join(
[
f"检测到同一申请人、同一申请类型、同一{time_label}已存在申请单,系统没有重复创建。",
f"已有申请单号:{application_no}",
f"当前节点:{stage}",
"如需继续处理,请在单据中心查看该申请;如果本次业务时间不同,请先调整时间后再提交。",
]
)
return "\n\n".join(
[
"这是费用申请核对结果,请核对:",
build_application_summary_table(facts),
"请核对上述信息无误,确认无误后 [确认](#application-submit) 提交至审批流程。",
]
)
def _resolve_expense_application_facts(self, payload: UserAgentRequest) -> dict[str, str]:
facts = {
"time": "",
"location": "",
"reason": "",
"days": "",
"transport_mode": "",
"amount": "",
"application_type": "",
"applicant": "",
"grade": "",
"department": "",
"position": "",
"manager_name": "",
"lodging_daily_cap": "",
"subsidy_daily_cap": "",
"transport_policy": "",
"policy_estimate": "",
"matched_city": "",
"rule_name": "",
"rule_version": "",
"hotel_amount": "",
"allowance_amount": "",
"transport_estimated_amount": "",
"transport_estimate_source": "",
"transport_estimate_confidence": "",
"policy_total_amount": "",
}
for message, is_current in self._iter_application_user_messages(payload):
partial = {
"time": self._resolve_application_time(payload, message=message) if is_current else self._resolve_application_time_from_text(message),
"location": self._resolve_application_location(payload, message=message, use_entities=is_current),
"reason": self._resolve_application_reason(message),
"days": self._resolve_application_days(message),
"transport_mode": self._resolve_application_transport_mode(message),
"amount": self._resolve_application_amount(payload, message=message) if is_current else self._resolve_application_amount_from_text(message),
"application_type": self._resolve_application_type_from_text(message),
}
for key, value in partial.items():
if value:
facts[key] = value
for key, value in self._resolve_application_preview_facts(payload.context_json or {}).items():
if value:
facts[key] = value
facts["application_type"] = self._normalize_application_type_label(facts.get("application_type", ""))
context_json = payload.context_json or {}
context_time = self._resolve_application_time_from_context(context_json)
if context_time and self._should_prefer_context_application_time(facts.get("time", ""), context_time):
facts["time"] = context_time
current_user = self._build_application_current_user(payload)
employee = ExpenseClaimAccessPolicy(self.db).resolve_current_employee(current_user)
if not facts["applicant"]:
facts["applicant"] = str(
context_json.get("name")
or context_json.get("user_name")
or context_json.get("applicant")
or (employee.name if employee is not None else "")
or current_user.name
or ""
).strip()
if not facts["grade"]:
facts["grade"] = str(
context_json.get("grade")
or context_json.get("employee_grade")
or context_json.get("employeeGrade")
or current_user.grade
or (employee.grade if employee is not None else "")
or ""
).strip()
if not facts["department"]:
facts["department"] = str(
context_json.get("department")
or context_json.get("department_name")
or context_json.get("departmentName")
or current_user.department_name
or (
employee.organization_unit.name
if employee is not None and employee.organization_unit is not None
else ""
)
or ""
).strip()
if not facts["position"]:
facts["position"] = str(
context_json.get("position")
or context_json.get("employee_position")
or context_json.get("employeePosition")
or current_user.position
or (employee.position if employee is not None else "")
or ""
).strip()
if not facts["manager_name"]:
facts["manager_name"] = str(
context_json.get("manager_name")
or context_json.get("managerName")
or context_json.get("direct_manager_name")
or context_json.get("directManagerName")
or current_user.manager_name
or (
employee.manager.name
if employee is not None and employee.manager is not None
else ""
)
or (
employee.organization_unit.manager_name
if employee is not None and employee.organization_unit is not None
else ""
)
or ""
).strip()
if not facts["application_type"]:
facts["application_type"] = self._infer_application_type(facts)
facts["time"] = self._expand_application_time_with_days(
facts.get("time", ""),
facts.get("days", ""),
payload.context_json or {},
)
if self._is_application_missing_value(facts.get("days", "")):
range_days = resolve_application_days_from_time_range(facts.get("time", ""))
if range_days:
facts["days"] = f"{range_days}"
self._apply_rule_center_travel_policy_to_application_facts(payload, facts)
apply_application_system_estimate_to_facts(facts)
return facts
def _apply_rule_center_travel_policy_to_application_facts(
self,
payload: UserAgentRequest,
facts: dict[str, str],
) -> None:
if "差旅" not in str(facts.get("application_type") or "") and "出差" not in str(facts.get("application_type") or ""):
return
location = str(facts.get("location") or "").strip()
grade = str(facts.get("grade") or "").strip()
if not location or not grade:
return
days = self._parse_application_days_count(facts.get("days", "")) or 1
try:
result = TravelReimbursementCalculatorService(self.db).calculate(
TravelReimbursementCalculatorRequest(days=days, location=location, grade=grade),
self._build_application_current_user(payload),
)
except ValueError:
return
hotel_rate = self._format_application_policy_money(result.hotel_rate)
hotel_amount = self._format_application_policy_money(result.hotel_amount)
allowance_rate = self._format_application_policy_money(result.total_allowance_rate)
allowance_amount = self._format_application_policy_money(result.allowance_amount)
if hotel_rate:
facts["lodging_daily_cap"] = f"{hotel_rate}元/天"
if hotel_amount:
facts["hotel_amount"] = f"{hotel_amount}"
if allowance_rate:
facts["subsidy_daily_cap"] = f"{allowance_rate}元/天"
if allowance_amount:
facts["allowance_amount"] = f"{allowance_amount}"
if str(result.matched_city or "").strip():
facts["matched_city"] = str(result.matched_city).strip()
if str(result.rule_name or "").strip():
facts["rule_name"] = str(result.rule_name).strip()
if str(result.rule_version or "").strip():
facts["rule_version"] = str(result.rule_version).strip()
@staticmethod
def _format_application_policy_money(value: object) -> str:
try:
amount = Decimal(str(value or "0")).quantize(Decimal("0.01"))
except (InvalidOperation, ValueError):
return ""
if amount == amount.to_integral():
return f"{int(amount):,}"
return f"{amount:,.2f}".rstrip("0").rstrip(".")
@staticmethod
def _parse_application_days_count(value: object) -> int:
match = re.search(r"\d+", str(value or ""))
if not match:
return 0
try:
return max(0, int(match.group(0)))
except ValueError:
return 0
@staticmethod
def _resolve_application_preview_facts(context_json: dict[str, object]) -> dict[str, str]:
preview = context_json.get("application_preview")
if not isinstance(preview, dict):
return {}
fields = preview.get("fields")
if not isinstance(fields, dict):
return {}
def pick(*keys: str) -> str:
for key in keys:
value = str(fields.get(key) or "").strip()
if value:
return value
return ""
reason = UserAgentApplicationMixin._cleanup_application_reason_candidate(pick("reason"))
return {
"application_type": UserAgentApplicationMixin._normalize_application_type_label(
pick("applicationType", "application_type")
),
"time": pick("time", "timeRange", "time_range"),
"location": pick("location"),
"reason": reason,
"days": pick("days"),
"transport_mode": pick("transportMode", "transport_mode"),
"amount": pick("amount"),
"applicant": pick("applicant", "name", "userName", "user_name"),
"grade": pick("grade"),
"department": pick("department", "departmentName", "department_name"),
"position": pick("position", "employeePosition", "employee_position"),
"manager_name": pick("managerName", "manager_name", "directManagerName", "direct_manager_name"),
"lodging_daily_cap": pick("lodgingDailyCap", "lodging_daily_cap"),
"subsidy_daily_cap": pick("subsidyDailyCap", "subsidy_daily_cap"),
"transport_policy": pick("transportPolicy", "transport_policy"),
"policy_estimate": pick("policyEstimate", "policy_estimate"),
"matched_city": pick("matchedCity", "matched_city"),
"rule_name": pick("ruleName", "rule_name"),
"rule_version": pick("ruleVersion", "rule_version"),
"hotel_amount": pick("hotelAmount", "hotel_amount"),
"allowance_amount": pick("allowanceAmount", "allowance_amount"),
"transport_estimated_amount": pick("transportEstimatedAmount", "transport_estimated_amount"),
"transport_estimate_source": pick("transportEstimateSource", "transport_estimate_source"),
"transport_estimate_confidence": pick("transportEstimateConfidence", "transport_estimate_confidence"),
"policy_total_amount": pick("policyTotalAmount", "policy_total_amount"),
}
@staticmethod
def _is_application_missing_value(value: object) -> bool:
return str(value or "").strip().lower() in APPLICATION_MISSING_VALUES
def _resolve_expense_application_step(
self,
payload: UserAgentRequest,
facts: dict[str, str],
) -> str:
if self._is_application_save_draft_action(payload):
return "draft"
if self._resolve_application_missing_base_fields(facts):
return "ask_missing"
if self._resolve_application_missing_followup_fields(facts):
return "ask_missing"
if self._is_application_submit_confirmation(payload):
return "submitted"
return "preview"