from __future__ import annotations import re from copy import copy from io import BytesIO from pathlib import Path from typing import Any from openpyxl import Workbook, load_workbook from openpyxl.styles import Alignment, Border, Font, PatternFill, Side from app.services.travel_policy_grades import TRAVEL_GRADE_KEYS LODGING_SHEET_NAME = "差旅住宿费标准" ALLOWANCE_SHEET_NAME = "出差补助标准" TRANSPORT_CLASS_SHEET_NAME = "交通工具等级标准" TRANSPORT_ESTIMATE_SHEET_NAME = "交通费用预估表" TRAVEL_GRADE_LABELS = { "P0": "实习/见习", "P1": "基础员工", "P2": "初级员工", "P3": "普通员工", "P4": "资深员工/主管", "P5": "基层经理", "P6": "中层经理", "P7": "高层经理", "P8": "董事会", } def build_travel_lodging_workbook_from_source( source_path: Path, fallback_rows: list[list[object]], ) -> bytes: rows: list[list[object]] = [] if source_path.exists(): workbook = load_workbook(source_path, read_only=True, data_only=True) try: if LODGING_SHEET_NAME in workbook.sheetnames: rows = _extract_lodging_rows( list(workbook[LODGING_SHEET_NAME].iter_rows(values_only=True)) ) finally: workbook.close() if not rows: rows = _fallback_lodging_rows(fallback_rows) return build_styled_workbook( LODGING_SHEET_NAME, ["序号", "地区", "地区(城市)", *TRAVEL_GRADE_KEYS, "常规超标限额"], [ [ row[0], row[1], row[2], *_expand_lodging_grade_amounts(row), row[7], ] for row in rows ], column_widths=[8, 14, 28, *([12] * len(TRAVEL_GRADE_KEYS)), 16], ) def build_travel_grade_mapping_workbook() -> bytes: return build_styled_workbook( "差旅职级映射表", ["序号", "职级", "职级名称", "住宿标准列", "交通标准行", "适用说明", "备注"], [ [index, grade, TRAVEL_GRADE_LABELS[grade], grade, grade, _grade_usage_note(grade), ""] for index, grade in enumerate(TRAVEL_GRADE_KEYS, start=1) ], column_widths=[8, 14, 28, 14, 14, 32, 32], ) def build_travel_allowance_workbook() -> bytes: return build_styled_workbook( ALLOWANCE_SHEET_NAME, ["序号", "补助区域", "伙食补助/天", "基本补助/天", "补助合计/天", "适用说明", "备注"], [ [1, "直辖市/特区", 65, 35, 100, "北京、上海、天津、重庆、深圳等地区", "按出差自然日计算"], [2, "其他地区", 55, 35, 90, "未单列的境内城市和地区", "申请阶段用于预算占用"], [3, "新疆-乌鲁木齐", 75, 45, 120, "乌鲁木齐市", "按高原/远途地区补助口径执行"], [4, "新疆-其他", 65, 40, 105, "新疆除乌鲁木齐外地区", "按远途地区补助口径执行"], [5, "西藏", 80, 50, 130, "西藏自治区", "按高原地区补助口径执行"], [6, "港澳台", 120, 80, 200, "香港、澳门、台湾地区", "需按出入境及外币票据要求补充材料"], [7, "国外", 180, 120, 300, "境外国家和地区", "外币折算按财务汇率口径执行"], ], column_widths=[8, 18, 16, 16, 16, 34, 34], ) def build_travel_transport_class_workbook() -> bytes: return build_styled_workbook( TRANSPORT_CLASS_SHEET_NAME, [ "序号", "职级", "职级说明", "飞机标准", "火车标准", "轮船标准", "适用说明", "超标处理", "备注", ], [ [ index, grade, TRAVEL_GRADE_LABELS[grade], "经济舱", "二等座/硬卧/硬座" if grade != "P8" else "二等座/软卧/硬卧", "二等舱", "按已审批出差申请执行" if grade in {"P6", "P7", "P8"} else "优先选择火车或高铁;确需飞机时按经济舱执行", "超出标准需说明原因并走审批" if grade != "P8" else "超出标准需董事会或授权审批确认", "申请阶段按交通费用预估表占用预算" if grade != "P8" else "P8 为董事会级别", ] for index, grade in enumerate(TRAVEL_GRADE_KEYS, start=1) ], column_widths=[8, 18, 34, 14, 22, 14, 42, 34, 34], ) def build_travel_season_mapping_workbook(source_path: Path) -> bytes: rows: list[list[object]] = [] if source_path.exists(): workbook = load_workbook(source_path, read_only=True, data_only=True) try: if LODGING_SHEET_NAME in workbook.sheetnames: lodging_rows = _extract_lodging_rows( list(workbook[LODGING_SHEET_NAME].iter_rows(values_only=True)) ) rows = [ [row[0], row[1], row[2], row[3], row[7], row[8]] for row in lodging_rows ] finally: workbook.close() if not rows: rows = [[1, "北京", "北京", "", 500, ""]] return build_styled_workbook( "地区淡旺季映射表", ["序号", "地区", "地区(城市)", "旺季期间(月)", "常规超标限额", "旺季超标限额"], rows, column_widths=[8, 14, 28, 18, 16, 16], ) def build_travel_transport_estimate_workbook() -> bytes: return build_styled_workbook( TRANSPORT_ESTIMATE_SHEET_NAME, [ "序号", "出发城市", "目的地", "目的地范围", "交通方式", "单程预估金额", "往返预估金额", "置信度", "预算占用口径", "来源说明", ], [ [1, "武汉", "北京", "高频城市", "火车", 520, 1040, "基础规则", "往返二等座/硬卧预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"], [2, "武汉", "北京", "高频城市", "飞机", 700, 1400, "基础规则", "往返经济舱预估", "参考民航经济舱市场均价和高频航线公开价格"], [3, "武汉", "上海", "高频城市", "火车", 360, 720, "基础规则", "往返二等座预估", "参考历史票据样例与 12306 公布票价查询口径"], [4, "武汉", "上海", "高频城市", "飞机", 600, 1200, "基础规则", "往返经济舱预估", "参考高频航线公开往返价格,按申请预算保守占用"], [5, "武汉", "广州", "高频城市", "火车", 470, 940, "基础规则", "往返二等座预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"], [6, "武汉", "广州", "高频城市", "飞机", 650, 1300, "基础规则", "往返经济舱预估", "参考民航经济舱市场均价和高频航线公开价格"], [7, "武汉", "深圳", "高频城市", "火车", 540, 1080, "基础规则", "往返二等座预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"], [8, "武汉", "深圳", "高频城市", "飞机", 700, 1400, "基础规则", "往返经济舱预估", "参考民航经济舱市场均价和高频航线公开价格"], [9, "武汉", "杭州", "高频城市", "火车", 330, 660, "基础规则", "往返二等座预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"], [10, "武汉", "南京", "高频城市", "火车", 260, 520, "基础规则", "往返二等座预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"], [11, "武汉", "成都", "普通城市", "火车", 350, 700, "基础规则", "往返二等座预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"], [12, "武汉", "成都", "普通城市", "飞机", 600, 1200, "基础规则", "往返经济舱预估", "参考民航经济舱市场均价和高频航线公开价格"], [13, "武汉", "西安", "普通城市", "火车", 300, 600, "基础规则", "往返二等座预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"], [14, "武汉", "厦门", "沿海城市", "火车", 450, 900, "基础规则", "往返二等座预估", "参考 12306 公布票价查询口径,申请阶段占用预算用"], [15, "武汉", "厦门", "沿海城市", "飞机", 650, 1300, "基础规则", "往返经济舱预估", "参考民航经济舱市场均价和沿海航线公开价格"], [16, "武汉", "三亚", "远途地区", "飞机", 900, 1800, "基础规则", "往返经济舱预估", "参考旅游/远途航线公开价格,申请阶段占用预算用"], [17, "武汉", "乌鲁木齐", "远途地区", "飞机", 1600, 3200, "基础规则", "往返经济舱预估", "远途航线按预算保守占用"], [18, "武汉", "拉萨", "远途地区", "飞机", 1800, 3600, "基础规则", "往返经济舱预估", "远途航线按预算保守占用"], [19, "*", "", "高频城市", "火车", 520, 1040, "兜底", "往返二等座/硬卧预估", "未命中精确城市对时使用"], [20, "*", "", "高频城市", "飞机", 650, 1300, "兜底", "往返经济舱预估", "未命中精确城市对时使用"], [21, "*", "", "沿海城市", "火车", 520, 1040, "兜底", "往返二等座/硬卧预估", "未命中精确城市对时使用"], [22, "*", "", "沿海城市", "飞机", 700, 1400, "兜底", "往返经济舱预估", "未命中精确城市对时使用"], [23, "*", "", "远途地区", "火车", 900, 1800, "兜底", "往返二等座/硬卧预估", "未命中精确城市对时使用"], [24, "*", "", "远途地区", "飞机", 1600, 3200, "兜底", "往返经济舱预估", "未命中精确城市对时使用"], [25, "*", "", "普通城市", "火车", 360, 720, "兜底", "往返二等座/硬卧预估", "未命中精确城市对时使用"], [26, "*", "", "普通城市", "飞机", 600, 1200, "兜底", "往返经济舱预估", "未命中精确城市对时使用"], [27, "*", "", "普通城市", "轮船", 320, 640, "兜底", "往返二等舱预估", "水路交通暂无实时接口时使用"], ], column_widths=[8, 14, 18, 16, 12, 16, 16, 12, 24, 42], ) def build_xlsx_bytes_from_source_sheet(source_path: Path, sheet_name: str) -> bytes: source_workbook = load_workbook(source_path, read_only=False, data_only=False) try: if sheet_name not in source_workbook.sheetnames: raise ValueError("原始规则表中没有对应工作表。") source_sheet = source_workbook[sheet_name] target_workbook = Workbook() target_sheet = target_workbook.active target_sheet.title = sheet_name _copy_worksheet(source_sheet, target_sheet) _clarify_travel_source_sheet_headers(sheet_name, target_sheet) _remove_redundant_title_row(target_sheet, sheet_name) target_sheet.sheet_view.zoomScale = 120 target_sheet.sheet_view.zoomScaleNormal = 120 workbook_buffer = BytesIO() target_workbook.save(workbook_buffer) target_workbook.close() return workbook_buffer.getvalue() finally: source_workbook.close() def build_styled_workbook( sheet_name: str, headers: list[str], rows: list[list[object]], *, column_widths: list[int], ) -> bytes: workbook = Workbook() worksheet = workbook.active worksheet.title = sheet_name header_fill = PatternFill(fill_type="solid", fgColor="FFD9EAF7") thin_side = Side(style="thin", color="FF7F9DB9") table_border = Border(left=thin_side, right=thin_side, top=thin_side, bottom=thin_side) for column_index, header in enumerate(headers, start=1): cell = worksheet.cell(row=1, column=column_index, value=header) cell.font = Font(name="Microsoft YaHei", size=12, bold=True, color="FF0F172A") cell.fill = header_fill cell.border = table_border cell.alignment = Alignment(horizontal="center", vertical="center", wrap_text=True) worksheet.row_dimensions[1].height = 30 for row_index, row in enumerate(rows, start=2): for column_index, value in enumerate(row, start=1): cell = worksheet.cell(row=row_index, column=column_index, value=value) cell.font = Font(name="Microsoft YaHei", size=11, color="FF0F172A") cell.border = table_border cell.alignment = Alignment(vertical="center", wrap_text=True) worksheet.row_dimensions[row_index].height = 30 for column_index, width in enumerate(column_widths, start=1): worksheet.column_dimensions[_column_letter(column_index)].width = width worksheet.freeze_panes = "A2" worksheet.sheet_view.zoomScale = 120 worksheet.sheet_view.zoomScaleNormal = 120 workbook_buffer = BytesIO() workbook.save(workbook_buffer) workbook.close() return workbook_buffer.getvalue() def _extract_lodging_rows(source_rows: list[tuple[Any, ...]]) -> list[list[object]]: header_index = -1 indexes: dict[str, int] = {} expected_headers = { "seq": "序号", "region": "地区", "city": "地区(城市)", "peak_period": "旺季期间", "p7": "公司级管理人员、高层经理(P7及以上)", "p4": "中层经理、基层经理(P4-P6、外聘专家)", "p1": "其他员工", "regular_limit": "超标限额", "peak_limit": "旺季超标限额", } for row_index, row in enumerate(source_rows[:10]): values = [str(value or "").strip() for value in row] if "地区(城市)" not in values: continue for key, label in expected_headers.items(): if label in values: indexes[key] = values.index(label) header_index = row_index break if header_index < 0 or "city" not in indexes: return [] rows: list[list[object]] = [] for row in source_rows[header_index + 1 :]: region = _row_value(row, indexes.get("region", -1)) raw_city = _row_value(row, indexes.get("city", -1)) cities = _split_location_names(raw_city) if not cities: continue period_by_city, shared_period = _parse_peak_periods( _row_value(row, indexes.get("peak_period", -1)) ) for city in cities: period = period_by_city.get(_normalize_period_key(city), shared_period) rows.append( [ _row_value(row, indexes.get("seq", -1)), region, city, period, _row_value(row, indexes.get("p7", -1)), _row_value(row, indexes.get("p4", -1)), _row_value(row, indexes.get("p1", -1)), _row_value(row, indexes.get("regular_limit", -1)), _row_value(row, indexes.get("peak_limit", -1)) if period else "", ] ) return rows def _fallback_lodging_rows(fallback_rows: list[list[object]]) -> list[list[object]]: rows: list[list[object]] = [] for index, row in enumerate(fallback_rows[1:], start=1): if len(row) >= 11: junior_amount = row[5] manager_amount = row[8] executive_amount = row[10] else: junior_amount = row[2] if len(row) > 2 else "" manager_amount = row[3] if len(row) > 3 else "" executive_amount = row[4] if len(row) > 4 else "" rows.append( [ index, "", row[0] if len(row) > 0 else "", "", executive_amount, manager_amount, junior_amount, executive_amount, "", ] ) return rows def _expand_lodging_grade_amounts(row: list[object]) -> list[object]: executive_amount = row[4] if len(row) > 4 else "" manager_amount = row[5] if len(row) > 5 else "" junior_amount = row[6] if len(row) > 6 else "" return [ junior_amount, junior_amount, junior_amount, junior_amount, manager_amount, manager_amount, manager_amount, executive_amount, executive_amount, ] def _grade_usage_note(grade: str) -> str: if grade == "P8": return "最高职级,适用于董事会" if grade in {"P6", "P7"}: return "适用于中高层管理人员" if grade in {"P4", "P5"}: return "适用于主管及基层管理人员" return "适用于员工序列" def _split_location_names(value: object) -> list[str]: text = str(value or "").strip() if not text: return [] text = re.sub(r"[((].*?[))]", "", text) text = re.sub(r"^\s*\d+\s*个中心城区[、,,]?", "", text) text = re.sub(r"[;;,,/]+", "、", text) names: list[str] = [] for part in text.split("、"): cleaned = _normalize_location_name(part) if not cleaned or cleaned == "中心城区": continue names.append(cleaned) return list(dict.fromkeys(names)) def _parse_peak_periods(value: object) -> tuple[dict[str, str], str]: text = str(value or "").strip() if not text: return ({}, "") period_by_city: dict[str, str] = {} for part in re.split(r"[;;]", text): if ":" not in part and ":" not in part: continue city, period = re.split(r"[::]", part, maxsplit=1) normalized_city = _normalize_period_key(city) normalized_period = _normalize_peak_period(period) if normalized_city and normalized_period: period_by_city[normalized_city] = normalized_period if period_by_city: return (period_by_city, "") return ({}, _normalize_peak_period(text)) def _normalize_peak_period(value: object) -> str: text = str(value or "").strip() text = re.sub(r"\s+", "", text) text = re.sub(r"(月|上旬|中旬|下旬)", "", text) text = re.sub(r"[、,;;]+", ",", text) text = re.sub(r"[^0-9,\-]", "", text) text = re.sub(r",{2,}", ",", text).strip(",") return text def _normalize_period_key(value: object) -> str: return _normalize_location_name(value).removesuffix("市") def _normalize_location_name(value: object) -> str: text = str(value or "").strip() text = re.sub(r"\s+", "", text) text = text.removesuffix("市") if text != "其他地区": text = text.removesuffix("地区") return text def _row_value(row: tuple[Any, ...], index: int) -> object: if index < 0 or index >= len(row): return "" return "" if row[index] is None else row[index] def _copy_worksheet(source_sheet, target_sheet) -> None: target_sheet.freeze_panes = source_sheet.freeze_panes target_sheet.sheet_format = copy(source_sheet.sheet_format) target_sheet.sheet_properties = copy(source_sheet.sheet_properties) target_sheet.page_margins = copy(source_sheet.page_margins) target_sheet.page_setup = copy(source_sheet.page_setup) target_sheet.print_options = copy(source_sheet.print_options) for row in source_sheet.iter_rows(): for source_cell in row: target_cell = target_sheet[source_cell.coordinate] target_cell.value = source_cell.value if source_cell.has_style: target_cell.font = copy(source_cell.font) target_cell.fill = copy(source_cell.fill) target_cell.border = copy(source_cell.border) target_cell.alignment = copy(source_cell.alignment) target_cell.protection = copy(source_cell.protection) target_cell.number_format = source_cell.number_format if source_cell.hyperlink: target_cell._hyperlink = copy(source_cell.hyperlink) if source_cell.comment: target_cell.comment = copy(source_cell.comment) for merged_range in source_sheet.merged_cells.ranges: target_sheet.merge_cells(str(merged_range)) for key, source_dimension in source_sheet.column_dimensions.items(): target_dimension = target_sheet.column_dimensions[key] target_dimension.width = source_dimension.width target_dimension.hidden = source_dimension.hidden target_dimension.bestFit = source_dimension.bestFit target_dimension.outlineLevel = source_dimension.outlineLevel target_dimension.collapsed = source_dimension.collapsed for index, source_dimension in source_sheet.row_dimensions.items(): target_dimension = target_sheet.row_dimensions[index] target_dimension.height = source_dimension.height target_dimension.hidden = source_dimension.hidden target_dimension.outlineLevel = source_dimension.outlineLevel target_dimension.collapsed = source_dimension.collapsed def _clarify_travel_source_sheet_headers(sheet_name: str, worksheet) -> None: if sheet_name == "交通工具等级标准": worksheet["A4"] = "P5+" worksheet["A5"] = "P1-P4" worksheet.row_dimensions[4].height = max(worksheet.row_dimensions[4].height or 0, 42) worksheet.row_dimensions[5].height = max(worksheet.row_dimensions[5].height or 0, 42) worksheet.column_dimensions["A"].width = max(worksheet.column_dimensions["A"].width or 0, 18) def _remove_redundant_title_row(worksheet, title: str) -> None: first_cell_value = str(worksheet["A1"].value or "").strip() if first_cell_value != str(title or "").strip(): return has_other_first_row_values = any( str(worksheet.cell(row=1, column=column_index).value or "").strip() for column_index in range(2, worksheet.max_column + 1) ) if has_other_first_row_values: return shifted_merged_ranges: list[tuple[int, int, int, int]] = [] for merged_range in list(worksheet.merged_cells.ranges): range_text = str(merged_range) min_col = merged_range.min_col min_row = merged_range.min_row max_col = merged_range.max_col max_row = merged_range.max_row worksheet.unmerge_cells(range_text) if min_row <= 1: continue shifted_merged_ranges.append((min_col, min_row - 1, max_col, max_row - 1)) old_freeze_panes = worksheet.freeze_panes worksheet.delete_rows(1, 1) for min_col, min_row, max_col, max_row in shifted_merged_ranges: worksheet.merge_cells( start_row=min_row, start_column=min_col, end_row=max_row, end_column=max_col, ) worksheet.freeze_panes = _shift_freeze_panes_after_deleted_first_row(old_freeze_panes) def _shift_freeze_panes_after_deleted_first_row(freeze_panes: object) -> str | None: if not freeze_panes: return None coordinate = str(freeze_panes) match = re.fullmatch(r"([A-Z]+)([0-9]+)", coordinate) if not match: return coordinate column, row_text = match.groups() row_index = int(row_text) if row_index <= 1: return None return f"{column}{row_index - 1}" def _column_letter(index: int) -> str: value = max(1, int(index)) result = "" while value > 0: value, remainder = divmod(value - 1, 26) result = f"{chr(65 + remainder)}{result}" return result