Files
X-Financial/server/src/app/services/employee_spreadsheet.py

369 lines
11 KiB
Python
Raw Normal View History

from __future__ import annotations
from dataclasses import dataclass
from datetime import date, datetime
from email.utils import parseaddr
from io import BytesIO
from typing import Any
from openpyxl import Workbook, load_workbook
EMPLOYEE_SHEET_NAME = "员工目录"
INSTRUCTION_SHEET_NAME = "填表说明"
EMPLOYEE_HEADERS: tuple[str, ...] = (
"员工编号*",
"姓名*",
"邮箱*",
"性别",
"出生日期",
"手机号",
"入职日期",
"办公地点",
"岗位*",
"职级*",
"部门编码",
"直属上级工号",
"财务归口",
"成本中心",
"在职状态*",
"角色编码",
)
HEADER_TO_FIELD: dict[str, str] = {
"员工编号*": "employee_no",
"姓名*": "name",
"邮箱*": "email",
"性别": "gender",
"出生日期": "birth_date",
"手机号": "phone",
"入职日期": "join_date",
"办公地点": "location",
"岗位*": "position",
"职级*": "grade",
"部门编码": "organization_unit_code",
"直属上级工号": "manager_employee_no",
"财务归口": "finance_owner_name",
"成本中心": "cost_center",
"在职状态*": "employment_status",
"角色编码": "role_codes",
}
VALID_EMPLOYMENT_STATUSES = {"在职", "试用中", "停用"}
DEFAULT_ROLE_CODES = ("user",)
MAX_IMPORT_ROWS = 2000
MAX_IMPORT_BYTES = 5 * 1024 * 1024
@dataclass(frozen=True)
class EmployeeImportRow:
row_number: int
employee_no: str
name: str
email: str
gender: str | None
birth_date: date | None
phone: str | None
join_date: date | None
location: str | None
position: str
grade: str
organization_unit_code: str | None
manager_employee_no: str | None
finance_owner_name: str | None
cost_center: str | None
employment_status: str
role_codes: list[str]
@dataclass(frozen=True)
class EmployeeSpreadsheetError:
row: int
column: str
employee_no: str
message: str
def build_import_template_bytes() -> bytes:
workbook = Workbook()
sheet = workbook.active
sheet.title = EMPLOYEE_SHEET_NAME
sheet.append(list(EMPLOYEE_HEADERS))
instructions = workbook.create_sheet(INSTRUCTION_SHEET_NAME)
instructions.append(["字段", "说明"])
instruction_rows = [
("员工编号*", "必填,全局唯一,导入时用于判断新建或覆盖。"),
("姓名*", "必填。"),
("邮箱*", "必填,全局唯一。"),
("性别", "可选:男、女,留空表示不填写。"),
("出生日期", "可选,格式 YYYY-MM-DD。"),
("手机号", "可选。"),
("入职日期", "可选,格式 YYYY-MM-DD。"),
("办公地点", "可选。"),
("岗位*", "必填。"),
("职级*", "必填,例如 P3、P5。"),
("部门编码", "可选,须与系统组织编码一致,例如 FINANCE-DEPT。"),
("直属上级工号", "可选,须为系统中已有员工编号,或出现在本次导入表中。"),
("财务归口", "可选。"),
("成本中心", "可选。"),
("在职状态*", "必填:在职、试用中、停用。"),
("角色编码", "可选,多个角色用英文逗号分隔,例如 user,finance留空默认为 user。"),
("导入规则", "全部校验通过后才写入数据库;任一行有错则整批不导入,原有数据保持不变。"),
]
for row in instruction_rows:
instructions.append(list(row))
buffer = BytesIO()
workbook.save(buffer)
return buffer.getvalue()
def build_export_workbook_bytes(rows: list[list[Any]]) -> bytes:
workbook = Workbook()
sheet = workbook.active
sheet.title = EMPLOYEE_SHEET_NAME
sheet.append(list(EMPLOYEE_HEADERS))
for row in rows:
sheet.append(row)
buffer = BytesIO()
workbook.save(buffer)
return buffer.getvalue()
def parse_employee_workbook(content: bytes) -> tuple[list[EmployeeImportRow], list[EmployeeSpreadsheetError]]:
errors: list[EmployeeSpreadsheetError] = []
if not content:
return [], [
EmployeeSpreadsheetError(
row=0,
column="文件",
employee_no="",
message="上传文件不能为空。",
)
]
if len(content) > MAX_IMPORT_BYTES:
return [], [
EmployeeSpreadsheetError(
row=0,
column="文件",
employee_no="",
message=f"文件大小不能超过 {MAX_IMPORT_BYTES // (1024 * 1024)}MB。",
)
]
try:
workbook = load_workbook(filename=BytesIO(content), read_only=True, data_only=True)
except Exception:
return [], [
EmployeeSpreadsheetError(
row=0,
column="文件",
employee_no="",
message="无法解析 Excel 文件,请使用系统提供的 .xlsx 模板。",
)
]
if EMPLOYEE_SHEET_NAME not in workbook.sheetnames:
return [], [
EmployeeSpreadsheetError(
row=0,
column="工作表",
employee_no="",
message=f"缺少工作表“{EMPLOYEE_SHEET_NAME}”。",
)
]
worksheet = workbook[EMPLOYEE_SHEET_NAME]
raw_rows = list(worksheet.iter_rows(values_only=True))
if not raw_rows:
return [], [
EmployeeSpreadsheetError(
row=0,
column="文件",
employee_no="",
message="Excel 中没有可导入的数据行。",
)
]
header_row = [_normalize_cell(value) for value in raw_rows[0]]
if list(header_row) != list(EMPLOYEE_HEADERS):
return [], [
EmployeeSpreadsheetError(
row=1,
column="表头",
employee_no="",
message="表头与员工导入模板不一致,请下载最新模板后重试。",
)
]
parsed_rows: list[EmployeeImportRow] = []
for index, raw_row in enumerate(raw_rows[1:], start=2):
if index - 1 > MAX_IMPORT_ROWS:
errors.append(
EmployeeSpreadsheetError(
row=index,
column="文件",
employee_no="",
message=f"单次最多导入 {MAX_IMPORT_ROWS} 行数据。",
)
)
break
if _is_empty_data_row(raw_row):
continue
row_errors, parsed = _parse_data_row(index, raw_row)
errors.extend(row_errors)
if parsed is not None:
parsed_rows.append(parsed)
if not parsed_rows and not errors:
errors.append(
EmployeeSpreadsheetError(
row=0,
column="文件",
employee_no="",
message="Excel 中没有可导入的数据行。",
)
)
return parsed_rows, errors
def _parse_data_row(
row_number: int,
raw_row: tuple[Any, ...],
) -> tuple[list[EmployeeSpreadsheetError], EmployeeImportRow | None]:
errors: list[EmployeeSpreadsheetError] = []
values = {
HEADER_TO_FIELD[header]: _normalize_cell(raw_row[index] if index < len(raw_row) else "")
for index, header in enumerate(EMPLOYEE_HEADERS)
}
employee_no = values["employee_no"]
def add_error(column: str, message: str) -> None:
errors.append(
EmployeeSpreadsheetError(
row=row_number,
column=column,
employee_no=employee_no,
message=message,
)
)
if not employee_no:
add_error("员工编号*", "员工编号不能为空。")
name = values["name"]
if not name:
add_error("姓名*", "姓名不能为空。")
email = values["email"].lower() if values["email"] else ""
if not email:
add_error("邮箱*", "邮箱不能为空。")
elif not _is_valid_email(email):
add_error("邮箱*", "邮箱格式不正确。")
position = values["position"]
if not position:
add_error("岗位*", "岗位不能为空。")
grade = values["grade"]
if not grade:
add_error("职级*", "职级不能为空。")
employment_status = values["employment_status"]
if not employment_status:
add_error("在职状态*", "在职状态不能为空。")
elif employment_status not in VALID_EMPLOYMENT_STATUSES:
add_error("在职状态*", "在职状态必须为:在职、试用中、停用。")
gender = values["gender"] or None
if gender and gender not in {"", ""}:
add_error("性别", "性别只能填写:男、女,或留空。")
birth_date, birth_error = _parse_optional_date(values["birth_date"], "出生日期")
if birth_error:
add_error("出生日期", birth_error)
join_date, join_error = _parse_optional_date(values["join_date"], "入职日期")
if join_error:
add_error("入职日期", join_error)
role_codes = _parse_role_codes(values["role_codes"])
if values["role_codes"] and not role_codes:
add_error("角色编码", "角色编码不能为空片段,多个角色请用英文逗号分隔。")
if errors:
return errors, None
return (
[],
EmployeeImportRow(
row_number=row_number,
employee_no=employee_no,
name=name,
email=email,
gender=gender,
birth_date=birth_date,
phone=values["phone"] or None,
join_date=join_date,
location=values["location"] or None,
position=position,
grade=grade,
organization_unit_code=values["organization_unit_code"] or None,
manager_employee_no=values["manager_employee_no"] or None,
finance_owner_name=values["finance_owner_name"] or None,
cost_center=values["cost_center"] or None,
employment_status=employment_status,
role_codes=role_codes or list(DEFAULT_ROLE_CODES),
),
)
def _parse_role_codes(value: str) -> list[str]:
if not value:
return []
codes = [item.strip() for item in value.replace("", ",").split(",")]
return list(dict.fromkeys(code for code in codes if code))
def _parse_optional_date(value: str, label: str) -> tuple[date | None, str | None]:
if not value:
return None, None
if isinstance(value, datetime):
return value.date(), None
if isinstance(value, date):
return value, None
text = str(value).strip()
try:
return datetime.strptime(text, "%Y-%m-%d").date(), None
except ValueError:
return None, f"{label}格式必须为 YYYY-MM-DD。"
def _is_valid_email(value: str) -> bool:
_, address = parseaddr(value)
return bool(address) and "@" in address
def _normalize_cell(value: Any) -> str:
if value is None:
return ""
if isinstance(value, datetime):
return value.strftime("%Y-%m-%d")
if isinstance(value, date):
return value.strftime("%Y-%m-%d")
return str(value).strip()
def _is_empty_data_row(raw_row: tuple[Any, ...]) -> bool:
return not any(_normalize_cell(value) for value in raw_row)