from __future__ import annotations import re from datetime import date from decimal import Decimal, InvalidOperation, ROUND_HALF_UP LOCATION_BANDS = { "premium": ("北京", "上海", "广州", "深圳", "杭州", "南京", "苏州", "成都", "重庆", "天津"), "remote": ("新疆", "西藏", "青海", "甘肃", "宁夏", "内蒙古", "海南", "香港", "澳门", "台湾", "海外", "国外"), "coastal": ("上海", "广州", "深圳", "厦门", "福州", "青岛", "大连", "宁波", "舟山", "海口", "三亚", "天津"), } TRANSPORT_PRICE_BASE = { "火车": {"default": Decimal("360"), "premium": Decimal("520"), "remote": Decimal("900"), "coastal": Decimal("520")}, "飞机": {"default": Decimal("850"), "premium": Decimal("1100"), "remote": Decimal("1800"), "coastal": Decimal("1050")}, "轮船": {"default": Decimal("320"), "premium": Decimal("480"), "remote": Decimal("680"), "coastal": Decimal("520")}, } LODGING_DAILY_BASE = { "default": Decimal("420"), "premium": Decimal("600"), "remote": Decimal("520"), "coastal": Decimal("500"), } ALLOWANCE_DAILY_BASE = { "default": Decimal("100"), "premium": Decimal("120"), "remote": Decimal("120"), "coastal": Decimal("110"), } def parse_application_days(days_text: str) -> int: match = re.search(r"\d+", str(days_text or "")) if not match: return 1 return max(1, int(match.group(0))) def parse_application_money(value: object) -> Decimal: normalized = re.sub(r"[^\d.\-]", "", str(value or "").replace(",", "")) if not normalized: return Decimal("0") try: return Decimal(normalized) except (InvalidOperation, ValueError): return Decimal("0") def format_application_money(value: Decimal | int | float | str) -> str: amount = parse_application_money(value) if not isinstance(value, Decimal) else value quantized = amount.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP) if quantized == quantized.to_integral(): return f"{int(quantized):,}" return f"{quantized:,.2f}".rstrip("0").rstrip(".") def normalize_application_transport_mode(value: str) -> str: text = str(value or "").strip() if re.search(r"飞机|机票|航班|乘机|坐飞机", text): return "飞机" if re.search(r"轮船|船票|客轮|渡轮|邮轮|坐船", text): return "轮船" if re.search(r"火车|高铁|动车|铁路|列车", text): return "火车" return text if text in TRANSPORT_PRICE_BASE else "" def resolve_application_location_band(location: str) -> str: text = str(location or "").strip() if any(keyword in text for keyword in LOCATION_BANDS["remote"]): return "remote" if any(keyword in text for keyword in LOCATION_BANDS["premium"]): return "premium" if any(keyword in text for keyword in LOCATION_BANDS["coastal"]): return "coastal" return "default" def _round_to_ten(value: Decimal) -> Decimal: return (value / Decimal("10")).quantize(Decimal("1"), rounding=ROUND_HALF_UP) * Decimal("10") def parse_application_start_date(time_text: object) -> str: match = re.search(r"(20\d{2})[年\-/.](\d{1,2})[月\-/.](\d{1,2})", str(time_text or "")) if not match: return "" try: return date(int(match.group(1)), int(match.group(2)), int(match.group(3))).isoformat() except ValueError: return "" def _resolve_ticket_price_factor(query_date: str) -> Decimal: if not query_date: return Decimal("1.00") try: parsed = date.fromisoformat(query_date) except ValueError: return Decimal("1.00") factor = Decimal("1.00") if parsed.weekday() == 0: factor += Decimal("0.04") if parsed.weekday() in (4, 6): factor += Decimal("0.08") if parsed.month in {1, 2, 7, 8, 10}: factor += Decimal("0.06") jitter = (parsed.year + parsed.month * 13 + parsed.day * 7) % 7 - 3 factor += Decimal(jitter) / Decimal("100") if factor < Decimal("0.88"): return Decimal("0.88") if factor > Decimal("1.22"): return Decimal("1.22") return factor def _resolve_mock_query_latency_ms(query_date: str, mode: str, location_band: str) -> int: try: parsed = date.fromisoformat(query_date) if query_date else None except ValueError: parsed = None seed = len(mode) * 43 + len(location_band) * 29 if parsed: seed += parsed.year + parsed.month * 17 + parsed.day * 31 return 360 + seed % 420 def build_application_system_estimate( *, transport_mode: str, location: str, days_text: str, time_text: object = "", lodging_amount: object = None, allowance_amount: object = None, ) -> dict[str, str]: mode = normalize_application_transport_mode(transport_mode) if not mode: return {} days = parse_application_days(days_text) location_band = resolve_application_location_band(location) query_date = parse_application_start_date(time_text) price_factor = _resolve_ticket_price_factor(query_date) simulated_latency_ms = _resolve_mock_query_latency_ms(query_date, mode, location_band) transport_one_way = TRANSPORT_PRICE_BASE[mode].get(location_band, TRANSPORT_PRICE_BASE[mode]["default"]) transport_amount = _round_to_ten(transport_one_way * Decimal("2") * price_factor) lodging = parse_application_money(lodging_amount) allowance = parse_application_money(allowance_amount) lodging_daily = LODGING_DAILY_BASE.get(location_band, LODGING_DAILY_BASE["default"]) allowance_daily = ALLOWANCE_DAILY_BASE.get(location_band, ALLOWANCE_DAILY_BASE["default"]) if lodging <= 0: lodging = lodging_daily * days if allowance <= 0: allowance = allowance_daily * days total_amount = transport_amount + lodging + allowance transport_display = format_application_money(transport_amount) lodging_display = format_application_money(lodging) allowance_display = format_application_money(allowance) total_display = format_application_money(total_amount) return { "amount": f"{total_display}元", "lodging_daily_cap": f"{format_application_money(lodging_daily)}元/天", "subsidy_daily_cap": f"{format_application_money(allowance_daily)}元/天", "transport_policy": f"预估交通费用 {transport_display}元", "policy_estimate": ( f"交通 {transport_display}元 + 住宿 {lodging_display}元" f" + 补贴 {allowance_display}元 = {total_display}元({days}天)" ), "matched_city": str(location or "").strip(), "transport_estimated_amount": f"{transport_display}元", "transport_estimate_date": query_date, "transport_query_latency_ms": str(simulated_latency_ms), "policy_total_amount": f"{total_display}元", "estimate_source": "mock_ticket_price_query_v1", "estimate_confidence": "mock", } def _is_pending_application_amount(value: str) -> bool: normalized = str(value or "").strip() return not normalized or normalized in {"待测算", "待补充", "未知"} def apply_application_system_estimate_to_facts(facts: dict[str, str]) -> None: estimate = build_application_system_estimate( transport_mode=str(facts.get("transport_mode") or ""), location=str(facts.get("matched_city") or facts.get("location") or ""), days_text=str(facts.get("days") or ""), time_text=facts.get("time") or "", lodging_amount=facts.get("hotel_amount") or None, allowance_amount=facts.get("allowance_amount") or None, ) if not estimate: return if _is_pending_application_amount(facts.get("amount", "")): facts["amount"] = estimate["amount"] field_map = { "lodging_daily_cap": "lodging_daily_cap", "subsidy_daily_cap": "subsidy_daily_cap", "transport_policy": "transport_policy", "policy_estimate": "policy_estimate", "matched_city": "matched_city", "transport_estimated_amount": "transport_estimated_amount", "transport_estimate_date": "transport_estimate_date", "transport_query_latency_ms": "transport_query_latency_ms", "transport_estimate_source": "estimate_source", "transport_estimate_confidence": "estimate_confidence", "policy_total_amount": "policy_total_amount", } for target, source in field_map.items(): if not str(facts.get(target) or "").strip() and estimate.get(source): facts[target] = estimate[source]