154 lines
4.4 KiB
Python
154 lines
4.4 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from io import BytesIO
|
||
|
|
|
||
|
|
from openpyxl import Workbook
|
||
|
|
from sqlalchemy import create_engine, select
|
||
|
|
from sqlalchemy.orm import Session, sessionmaker
|
||
|
|
from sqlalchemy.pool import StaticPool
|
||
|
|
|
||
|
|
from app.db.base import Base
|
||
|
|
from app.models.employee import Employee
|
||
|
|
from app.services.employee import EmployeeService
|
||
|
|
from app.services.employee_spreadsheet import EMPLOYEE_HEADERS, EMPLOYEE_SHEET_NAME
|
||
|
|
|
||
|
|
|
||
|
|
def build_session() -> Session:
    """Return a new session bound to a fresh in-memory SQLite database.

    Every table registered on ``Base.metadata`` is created on the engine
    before the session is handed back, so callers can query immediately.
    """
    # StaticPool reuses one connection for the whole engine; without it each
    # checkout of ":memory:" would see a different, empty database.
    memory_engine = create_engine(
        "sqlite+pysqlite:///:memory:",
        poolclass=StaticPool,
        connect_args={"check_same_thread": False},
    )
    Base.metadata.create_all(bind=memory_engine)
    make_session = sessionmaker(
        autocommit=False,
        autoflush=False,
        bind=memory_engine,
    )
    return make_session()
|
||
|
|
|
||
|
|
|
||
|
|
def build_workbook_bytes(rows: list[list[object]]) -> bytes:
    """Serialize *rows* into an in-memory employee import workbook.

    The active sheet is titled ``EMPLOYEE_SHEET_NAME``; its first row is
    ``EMPLOYEE_HEADERS`` and each entry of *rows* follows in order.

    Returns:
        The saved workbook as raw bytes.
    """
    book = Workbook()
    worksheet = book.active
    worksheet.title = EMPLOYEE_SHEET_NAME

    # Header row first, then the caller-supplied data rows.
    worksheet.append(list(EMPLOYEE_HEADERS))
    for values in rows:
        worksheet.append(values)

    with BytesIO() as stream:
        book.save(stream)
        return stream.getvalue()
|
||
|
|
|
||
|
|
|
||
|
|
def test_import_employees_rejects_invalid_row_without_writing() -> None:
    """A row with a blank name is rejected and the stored employee is untouched.

    The import row reuses an existing employee's number and email but leaves
    the name column empty. The import must report failure with at least one
    error mentioning "姓名", and the employee's stored name must not change.
    """
    with build_session() as db:
        service = EmployeeService(db)
        first = service.list_employees()[0]
        # Snapshot the values BEFORE importing. If ``first`` is (or proxies) a
        # session-tracked object, reading ``first.name`` after the import would
        # reflect any mutation and make the "nothing was written" check vacuous.
        employee_id = first.id
        original_name = first.name

        invalid_row = [
            first.employeeNo,
            "",  # name column intentionally blank -> should fail validation
            first.email,
            "",
            "",
            "",
            "",
            "",
            first.position,
            first.grade,
            "",
            "",
            "",
            "",
            "在职",
            "user",
        ]
        content = build_workbook_bytes([invalid_row])

        result = service.import_employees(content)

        assert result.success is False
        assert result.summary.errorCount >= 1
        assert any("姓名" in item.message for item in result.errors)

        refreshed = service.get_employee(employee_id)
        assert refreshed is not None
        assert refreshed.name == original_name
|
||
|
|
|
||
|
|
|
||
|
|
def test_import_employees_updates_existing_employee() -> None:
    """Importing a row keyed by an existing employee number updates that employee.

    The row carries a new name and phone number; the import must succeed,
    report exactly one update, and the changes must be visible on re-read.
    """
    with build_session() as session:
        employee_service = EmployeeService(session)
        target = employee_service.list_employees()[0]
        new_name = f"{target.name}-导入"

        update_row = [
            target.employeeNo,
            new_name,
            target.email,
            "男",
            "",
            "13900000001",  # phone, asserted below
            "",
            "上海",
            target.position,
            target.grade,
            "FIN-SSC",
            "",
            "华东财务组",
            "CC-TEST",
            "在职",
            "user",
        ]
        workbook_bytes = build_workbook_bytes([update_row])

        outcome = employee_service.import_employees(workbook_bytes, actor="测试管理员")

        assert outcome.success is True
        assert outcome.summary.updated == 1

        reloaded = employee_service.get_employee(target.id)
        assert reloaded is not None
        assert reloaded.name == new_name
        assert reloaded.phone == "13900000001"
|
||
|
|
|
||
|
|
|
||
|
|
def test_import_employees_creates_new_employee() -> None:
    """Importing a row with an unknown employee number creates a new employee.

    The import must succeed, report exactly one creation, and the new row
    must be retrievable from the database by its employee number.
    """
    with build_session() as session:
        employee_service = EmployeeService(session)
        # Result intentionally discarded. NOTE(review): presumably called for
        # a side effect such as seeding default employees -- confirm.
        employee_service.list_employees()

        new_row = [
            "E90001",
            "导入新员工",
            "import.new.user@xfinance.com",
            "女",
            "",
            "13811112222",
            "2025-01-01",
            "上海",
            "业务专员",
            "P3",
            "FIN-SSC",
            "E10234",
            "华东财务组",
            "CC-9001",
            "在职",
            "user",
        ]
        workbook_bytes = build_workbook_bytes([new_row])

        outcome = employee_service.import_employees(workbook_bytes)

        assert outcome.success is True
        assert outcome.summary.created == 1

        # Read straight from the ORM model rather than through the service.
        lookup = select(Employee).where(Employee.employee_no == "E90001")
        created = session.execute(lookup).scalar_one()
        assert created.name == "导入新员工"
        assert created.email == "import.new.user@xfinance.com"
|