Files
X-Financial/server/src/app/services/agent_asset_travel_spreadsheets.py
caoxiaozhu 9f7b8b46a3 Refine travel reimbursement steward flow
Align planner, runtime rules, and policy assets so travel guidance
matches the updated reimbursement workflow.
2026-06-15 22:55:18 +08:00

555 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import re
from copy import copy
from io import BytesIO
from pathlib import Path
from typing import Any
from openpyxl import Workbook, load_workbook
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from app.services.travel_policy_grades import TRAVEL_GRADE_KEYS
LODGING_SHEET_NAME = "差旅住宿费标准"
ALLOWANCE_SHEET_NAME = "出差补助标准"
TRANSPORT_CLASS_SHEET_NAME = "交通工具等级标准"
TRANSPORT_ESTIMATE_SHEET_NAME = "交通费用预估表"
TRAVEL_GRADE_LABELS = {
"P0": "实习/见习",
"P1": "基础员工",
"P2": "初级员工",
"P3": "普通员工",
"P4": "资深员工/主管",
"P5": "基层经理",
"P6": "中层经理",
"P7": "高层经理",
"P8": "董事会",
}
def build_travel_lodging_workbook_from_source(
source_path: Path,
fallback_rows: list[list[object]],
) -> bytes:
rows: list[list[object]] = []
if source_path.exists():
workbook = load_workbook(source_path, read_only=True, data_only=True)
try:
if LODGING_SHEET_NAME in workbook.sheetnames:
rows = _extract_lodging_rows(
list(workbook[LODGING_SHEET_NAME].iter_rows(values_only=True))
)
finally:
workbook.close()
if not rows:
rows = _fallback_lodging_rows(fallback_rows)
return build_styled_workbook(
LODGING_SHEET_NAME,
["序号", "地区", "地区(城市)", *TRAVEL_GRADE_KEYS, "常规超标限额"],
[
[
row[0],
row[1],
row[2],
*_expand_lodging_grade_amounts(row),
row[7],
]
for row in rows
],
column_widths=[8, 14, 28, *([12] * len(TRAVEL_GRADE_KEYS)), 16],
)
def build_travel_grade_mapping_workbook() -> bytes:
return build_styled_workbook(
"差旅职级映射表",
["序号", "职级", "职级名称", "住宿标准列", "交通标准行", "适用说明", "备注"],
[
[index, grade, TRAVEL_GRADE_LABELS[grade], grade, grade, _grade_usage_note(grade), ""]
for index, grade in enumerate(TRAVEL_GRADE_KEYS, start=1)
],
column_widths=[8, 14, 28, 14, 14, 32, 32],
)
def build_travel_allowance_workbook() -> bytes:
return build_styled_workbook(
ALLOWANCE_SHEET_NAME,
["序号", "补助区域", "伙食补助/天", "基本补助/天", "补助合计/天", "适用说明", "备注"],
[
[1, "直辖市/特区", 65, 35, 100, "北京、上海、天津、重庆、深圳等地区", "按出差自然日计算"],
[2, "其他地区", 55, 35, 90, "未单列的境内城市和地区", "申请阶段用于预算占用"],
[3, "新疆-乌鲁木齐", 75, 45, 120, "乌鲁木齐市", "按高原/远途地区补助口径执行"],
[4, "新疆-其他", 65, 40, 105, "新疆除乌鲁木齐外地区", "按远途地区补助口径执行"],
[5, "西藏", 80, 50, 130, "西藏自治区", "按高原地区补助口径执行"],
[6, "港澳台", 120, 80, 200, "香港、澳门、台湾地区", "需按出入境及外币票据要求补充材料"],
[7, "国外", 180, 120, 300, "境外国家和地区", "外币折算按财务汇率口径执行"],
],
column_widths=[8, 18, 16, 16, 16, 34, 34],
)
def build_travel_transport_class_workbook() -> bytes:
return build_styled_workbook(
TRANSPORT_CLASS_SHEET_NAME,
[
"序号",
"职级",
"职级说明",
"飞机标准",
"火车标准",
"轮船标准",
"适用说明",
"超标处理",
"备注",
],
[
[
index,
grade,
TRAVEL_GRADE_LABELS[grade],
"经济舱",
"二等座/硬卧/硬座" if grade != "P8" else "二等座/软卧/硬卧",
"二等舱",
"按已审批出差申请执行" if grade in {"P6", "P7", "P8"} else "优先选择火车或高铁;确需飞机时按经济舱执行",
"超出标准需说明原因并走审批" if grade != "P8" else "超出标准需董事会或授权审批确认",
"申请阶段按交通费用预估表占用预算" if grade != "P8" else "P8 为董事会级别",
]
for index, grade in enumerate(TRAVEL_GRADE_KEYS, start=1)
],
column_widths=[8, 18, 34, 14, 22, 14, 42, 34, 34],
)
def build_travel_season_mapping_workbook(source_path: Path) -> bytes:
rows: list[list[object]] = []
if source_path.exists():
workbook = load_workbook(source_path, read_only=True, data_only=True)
try:
if LODGING_SHEET_NAME in workbook.sheetnames:
lodging_rows = _extract_lodging_rows(
list(workbook[LODGING_SHEET_NAME].iter_rows(values_only=True))
)
rows = [
[row[0], row[1], row[2], row[3], row[7], row[8]]
for row in lodging_rows
]
finally:
workbook.close()
if not rows:
rows = [[1, "北京", "北京", "", 500, ""]]
return build_styled_workbook(
"地区淡旺季映射表",
["序号", "地区", "地区(城市)", "旺季期间(月)", "常规超标限额", "旺季超标限额"],
rows,
column_widths=[8, 14, 28, 18, 16, 16],
)
def build_travel_transport_estimate_workbook() -> bytes:
return build_styled_workbook(
TRANSPORT_ESTIMATE_SHEET_NAME,
[
"序号",
"出发城市",
"目的地",
"目的地范围",
"交通方式",
"单程预估金额",
"往返预估金额",
"置信度",
"预算占用口径",
"来源说明",
],
[
[1, "武汉", "北京", "高频城市", "火车", 520, 1040, "基础规则", "往返二等座/硬卧预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"],
[2, "武汉", "北京", "高频城市", "飞机", 700, 1400, "基础规则", "往返经济舱预估", "参考民航经济舱市场均价和高频航线公开价格"],
[3, "武汉", "上海", "高频城市", "火车", 360, 720, "基础规则", "往返二等座预估", "参考历史票据样例与 12306 公布票价查询口径"],
[4, "武汉", "上海", "高频城市", "飞机", 600, 1200, "基础规则", "往返经济舱预估", "参考高频航线公开往返价格,按申请预算保守占用"],
[5, "武汉", "广州", "高频城市", "火车", 470, 940, "基础规则", "往返二等座预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"],
[6, "武汉", "广州", "高频城市", "飞机", 650, 1300, "基础规则", "往返经济舱预估", "参考民航经济舱市场均价和高频航线公开价格"],
[7, "武汉", "深圳", "高频城市", "火车", 540, 1080, "基础规则", "往返二等座预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"],
[8, "武汉", "深圳", "高频城市", "飞机", 700, 1400, "基础规则", "往返经济舱预估", "参考民航经济舱市场均价和高频航线公开价格"],
[9, "武汉", "杭州", "高频城市", "火车", 330, 660, "基础规则", "往返二等座预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"],
[10, "武汉", "南京", "高频城市", "火车", 260, 520, "基础规则", "往返二等座预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"],
[11, "武汉", "成都", "普通城市", "火车", 350, 700, "基础规则", "往返二等座预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"],
[12, "武汉", "成都", "普通城市", "飞机", 600, 1200, "基础规则", "往返经济舱预估", "参考民航经济舱市场均价和高频航线公开价格"],
[13, "武汉", "西安", "普通城市", "火车", 300, 600, "基础规则", "往返二等座预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"],
[14, "武汉", "厦门", "沿海城市", "火车", 450, 900, "基础规则", "往返二等座预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"],
[15, "武汉", "厦门", "沿海城市", "飞机", 650, 1300, "基础规则", "往返经济舱预估", "参考民航经济舱市场均价和沿海航线公开价格"],
[16, "武汉", "三亚", "远途地区", "飞机", 900, 1800, "基础规则", "往返经济舱预估", "参考旅游/远途航线公开价格,申请阶段占用预算用"],
[17, "武汉", "乌鲁木齐", "远途地区", "飞机", 1600, 3200, "基础规则", "往返经济舱预估", "远途航线按预算保守占用"],
[18, "武汉", "拉萨", "远途地区", "飞机", 1800, 3600, "基础规则", "往返经济舱预估", "远途航线按预算保守占用"],
[19, "*", "", "高频城市", "火车", 520, 1040, "兜底", "往返二等座/硬卧预估", "未命中精确城市对时使用"],
[20, "*", "", "高频城市", "飞机", 650, 1300, "兜底", "往返经济舱预估", "未命中精确城市对时使用"],
[21, "*", "", "沿海城市", "火车", 520, 1040, "兜底", "往返二等座/硬卧预估", "未命中精确城市对时使用"],
[22, "*", "", "沿海城市", "飞机", 700, 1400, "兜底", "往返经济舱预估", "未命中精确城市对时使用"],
[23, "*", "", "远途地区", "火车", 900, 1800, "兜底", "往返二等座/硬卧预估", "未命中精确城市对时使用"],
[24, "*", "", "远途地区", "飞机", 1600, 3200, "兜底", "往返经济舱预估", "未命中精确城市对时使用"],
[25, "*", "", "普通城市", "火车", 360, 720, "兜底", "往返二等座/硬卧预估", "未命中精确城市对时使用"],
[26, "*", "", "普通城市", "飞机", 600, 1200, "兜底", "往返经济舱预估", "未命中精确城市对时使用"],
[27, "*", "", "普通城市", "轮船", 320, 640, "兜底", "往返二等舱预估", "水路交通暂无实时接口时使用"],
],
column_widths=[8, 14, 18, 16, 12, 16, 16, 12, 24, 42],
)
def build_xlsx_bytes_from_source_sheet(source_path: Path, sheet_name: str) -> bytes:
source_workbook = load_workbook(source_path, read_only=False, data_only=False)
try:
if sheet_name not in source_workbook.sheetnames:
raise ValueError("原始规则表中没有对应工作表。")
source_sheet = source_workbook[sheet_name]
target_workbook = Workbook()
target_sheet = target_workbook.active
target_sheet.title = sheet_name
_copy_worksheet(source_sheet, target_sheet)
_clarify_travel_source_sheet_headers(sheet_name, target_sheet)
_remove_redundant_title_row(target_sheet, sheet_name)
target_sheet.sheet_view.zoomScale = 120
target_sheet.sheet_view.zoomScaleNormal = 120
workbook_buffer = BytesIO()
target_workbook.save(workbook_buffer)
target_workbook.close()
return workbook_buffer.getvalue()
finally:
source_workbook.close()
def build_styled_workbook(
sheet_name: str,
headers: list[str],
rows: list[list[object]],
*,
column_widths: list[int],
) -> bytes:
workbook = Workbook()
worksheet = workbook.active
worksheet.title = sheet_name
header_fill = PatternFill(fill_type="solid", fgColor="FFD9EAF7")
thin_side = Side(style="thin", color="FF7F9DB9")
table_border = Border(left=thin_side, right=thin_side, top=thin_side, bottom=thin_side)
for column_index, header in enumerate(headers, start=1):
cell = worksheet.cell(row=1, column=column_index, value=header)
cell.font = Font(name="Microsoft YaHei", size=12, bold=True, color="FF0F172A")
cell.fill = header_fill
cell.border = table_border
cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
worksheet.row_dimensions[1].height = 30
for row_index, row in enumerate(rows, start=2):
for column_index, value in enumerate(row, start=1):
cell = worksheet.cell(row=row_index, column=column_index, value=value)
cell.font = Font(name="Microsoft YaHei", size=11, color="FF0F172A")
cell.border = table_border
cell.alignment = Alignment(vertical="center", wrap_text=True)
worksheet.row_dimensions[row_index].height = 30
for column_index, width in enumerate(column_widths, start=1):
worksheet.column_dimensions[_column_letter(column_index)].width = width
worksheet.freeze_panes = "A2"
worksheet.sheet_view.zoomScale = 120
worksheet.sheet_view.zoomScaleNormal = 120
workbook_buffer = BytesIO()
workbook.save(workbook_buffer)
workbook.close()
return workbook_buffer.getvalue()
def _extract_lodging_rows(source_rows: list[tuple[Any, ...]]) -> list[list[object]]:
header_index = -1
indexes: dict[str, int] = {}
expected_headers = {
"seq": "序号",
"region": "地区",
"city": "地区(城市)",
"peak_period": "旺季期间",
"p7": "公司级管理人员、高层经理P7及以上",
"p4": "中层经理、基层经理P4-P6、外聘专家",
"p1": "其他员工",
"regular_limit": "超标限额",
"peak_limit": "旺季超标限额",
}
for row_index, row in enumerate(source_rows[:10]):
values = [str(value or "").strip() for value in row]
if "地区(城市)" not in values:
continue
for key, label in expected_headers.items():
if label in values:
indexes[key] = values.index(label)
header_index = row_index
break
if header_index < 0 or "city" not in indexes:
return []
rows: list[list[object]] = []
for row in source_rows[header_index + 1 :]:
region = _row_value(row, indexes.get("region", -1))
raw_city = _row_value(row, indexes.get("city", -1))
cities = _split_location_names(raw_city)
if not cities:
continue
period_by_city, shared_period = _parse_peak_periods(
_row_value(row, indexes.get("peak_period", -1))
)
for city in cities:
period = period_by_city.get(_normalize_period_key(city), shared_period)
rows.append(
[
_row_value(row, indexes.get("seq", -1)),
region,
city,
period,
_row_value(row, indexes.get("p7", -1)),
_row_value(row, indexes.get("p4", -1)),
_row_value(row, indexes.get("p1", -1)),
_row_value(row, indexes.get("regular_limit", -1)),
_row_value(row, indexes.get("peak_limit", -1)) if period else "",
]
)
return rows
def _fallback_lodging_rows(fallback_rows: list[list[object]]) -> list[list[object]]:
rows: list[list[object]] = []
for index, row in enumerate(fallback_rows[1:], start=1):
if len(row) >= 11:
junior_amount = row[5]
manager_amount = row[8]
executive_amount = row[10]
else:
junior_amount = row[2] if len(row) > 2 else ""
manager_amount = row[3] if len(row) > 3 else ""
executive_amount = row[4] if len(row) > 4 else ""
rows.append(
[
index,
"",
row[0] if len(row) > 0 else "",
"",
executive_amount,
manager_amount,
junior_amount,
executive_amount,
"",
]
)
return rows
def _expand_lodging_grade_amounts(row: list[object]) -> list[object]:
executive_amount = row[4] if len(row) > 4 else ""
manager_amount = row[5] if len(row) > 5 else ""
junior_amount = row[6] if len(row) > 6 else ""
return [
junior_amount,
junior_amount,
junior_amount,
junior_amount,
manager_amount,
manager_amount,
manager_amount,
executive_amount,
executive_amount,
]
def _grade_usage_note(grade: str) -> str:
if grade == "P8":
return "最高职级,适用于董事会"
if grade in {"P6", "P7"}:
return "适用于中高层管理人员"
if grade in {"P4", "P5"}:
return "适用于主管及基层管理人员"
return "适用于员工序列"
def _split_location_names(value: object) -> list[str]:
text = str(value or "").strip()
if not text:
return []
text = re.sub(r"[(].*?[)]", "", text)
text = re.sub(r"^\s*\d+\s*个中心城区[、,]?", "", text)
text = re.sub(r"[;,/]+", "", text)
names: list[str] = []
for part in text.split(""):
cleaned = _normalize_location_name(part)
if not cleaned or cleaned == "中心城区":
continue
names.append(cleaned)
return list(dict.fromkeys(names))
def _parse_peak_periods(value: object) -> tuple[dict[str, str], str]:
text = str(value or "").strip()
if not text:
return ({}, "")
period_by_city: dict[str, str] = {}
for part in re.split(r"[;]", text):
if "" not in part and ":" not in part:
continue
city, period = re.split(r"[:]", part, maxsplit=1)
normalized_city = _normalize_period_key(city)
normalized_period = _normalize_peak_period(period)
if normalized_city and normalized_period:
period_by_city[normalized_city] = normalized_period
if period_by_city:
return (period_by_city, "")
return ({}, _normalize_peak_period(text))
def _normalize_peak_period(value: object) -> str:
text = str(value or "").strip()
text = re.sub(r"\s+", "", text)
text = re.sub(r"(月|上旬|中旬|下旬)", "", text)
text = re.sub(r"[、,;;]+", ",", text)
text = re.sub(r"[^0-9,\-]", "", text)
text = re.sub(r",{2,}", ",", text).strip(",")
return text
def _normalize_period_key(value: object) -> str:
return _normalize_location_name(value).removesuffix("")
def _normalize_location_name(value: object) -> str:
text = str(value or "").strip()
text = re.sub(r"\s+", "", text)
text = text.removesuffix("")
if text != "其他地区":
text = text.removesuffix("地区")
return text
def _row_value(row: tuple[Any, ...], index: int) -> object:
if index < 0 or index >= len(row):
return ""
return "" if row[index] is None else row[index]
def _copy_worksheet(source_sheet, target_sheet) -> None:
target_sheet.freeze_panes = source_sheet.freeze_panes
target_sheet.sheet_format = copy(source_sheet.sheet_format)
target_sheet.sheet_properties = copy(source_sheet.sheet_properties)
target_sheet.page_margins = copy(source_sheet.page_margins)
target_sheet.page_setup = copy(source_sheet.page_setup)
target_sheet.print_options = copy(source_sheet.print_options)
for row in source_sheet.iter_rows():
for source_cell in row:
target_cell = target_sheet[source_cell.coordinate]
target_cell.value = source_cell.value
if source_cell.has_style:
target_cell.font = copy(source_cell.font)
target_cell.fill = copy(source_cell.fill)
target_cell.border = copy(source_cell.border)
target_cell.alignment = copy(source_cell.alignment)
target_cell.protection = copy(source_cell.protection)
target_cell.number_format = source_cell.number_format
if source_cell.hyperlink:
target_cell._hyperlink = copy(source_cell.hyperlink)
if source_cell.comment:
target_cell.comment = copy(source_cell.comment)
for merged_range in source_sheet.merged_cells.ranges:
target_sheet.merge_cells(str(merged_range))
for key, source_dimension in source_sheet.column_dimensions.items():
target_dimension = target_sheet.column_dimensions[key]
target_dimension.width = source_dimension.width
target_dimension.hidden = source_dimension.hidden
target_dimension.bestFit = source_dimension.bestFit
target_dimension.outlineLevel = source_dimension.outlineLevel
target_dimension.collapsed = source_dimension.collapsed
for index, source_dimension in source_sheet.row_dimensions.items():
target_dimension = target_sheet.row_dimensions[index]
target_dimension.height = source_dimension.height
target_dimension.hidden = source_dimension.hidden
target_dimension.outlineLevel = source_dimension.outlineLevel
target_dimension.collapsed = source_dimension.collapsed
def _clarify_travel_source_sheet_headers(sheet_name: str, worksheet) -> None:
if sheet_name == "交通工具等级标准":
worksheet["A4"] = "P5+"
worksheet["A5"] = "P1-P4"
worksheet.row_dimensions[4].height = max(worksheet.row_dimensions[4].height or 0, 42)
worksheet.row_dimensions[5].height = max(worksheet.row_dimensions[5].height or 0, 42)
worksheet.column_dimensions["A"].width = max(worksheet.column_dimensions["A"].width or 0, 18)
def _remove_redundant_title_row(worksheet, title: str) -> None:
first_cell_value = str(worksheet["A1"].value or "").strip()
if first_cell_value != str(title or "").strip():
return
has_other_first_row_values = any(
str(worksheet.cell(row=1, column=column_index).value or "").strip()
for column_index in range(2, worksheet.max_column + 1)
)
if has_other_first_row_values:
return
shifted_merged_ranges: list[tuple[int, int, int, int]] = []
for merged_range in list(worksheet.merged_cells.ranges):
range_text = str(merged_range)
min_col = merged_range.min_col
min_row = merged_range.min_row
max_col = merged_range.max_col
max_row = merged_range.max_row
worksheet.unmerge_cells(range_text)
if min_row <= 1:
continue
shifted_merged_ranges.append((min_col, min_row - 1, max_col, max_row - 1))
old_freeze_panes = worksheet.freeze_panes
worksheet.delete_rows(1, 1)
for min_col, min_row, max_col, max_row in shifted_merged_ranges:
worksheet.merge_cells(
start_row=min_row,
start_column=min_col,
end_row=max_row,
end_column=max_col,
)
worksheet.freeze_panes = _shift_freeze_panes_after_deleted_first_row(old_freeze_panes)
def _shift_freeze_panes_after_deleted_first_row(freeze_panes: object) -> str | None:
if not freeze_panes:
return None
coordinate = str(freeze_panes)
match = re.fullmatch(r"([A-Z]+)([0-9]+)", coordinate)
if not match:
return coordinate
column, row_text = match.groups()
row_index = int(row_text)
if row_index <= 1:
return None
return f"{column}{row_index - 1}"
def _column_letter(index: int) -> str:
value = max(1, int(index))
result = ""
while value > 0:
value, remainder = divmod(value - 1, 26)
result = f"{chr(65 + remainder)}{result}"
return result