from __future__ import annotations from io import BytesIO from openpyxl import Workbook from sqlalchemy import create_engine, select from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.pool import StaticPool from app.db.base import Base from app.models.employee import Employee from app.services.employee import EmployeeService from app.services.employee_spreadsheet import EMPLOYEE_HEADERS, EMPLOYEE_SHEET_NAME def build_session() -> Session: engine = create_engine( "sqlite+pysqlite:///:memory:", connect_args={"check_same_thread": False}, poolclass=StaticPool, ) Base.metadata.create_all(bind=engine) session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False) return session_factory() def build_workbook_bytes(rows: list[list[object]]) -> bytes: workbook = Workbook() sheet = workbook.active sheet.title = EMPLOYEE_SHEET_NAME sheet.append(list(EMPLOYEE_HEADERS)) for row in rows: sheet.append(row) buffer = BytesIO() workbook.save(buffer) return buffer.getvalue() def test_import_employees_rejects_invalid_row_without_writing() -> None: with build_session() as db: service = EmployeeService(db) first = service.list_employees()[0] content = build_workbook_bytes( [ [ first.employeeNo, "", first.email, "", "", "", "", "", first.position, first.grade, "", "", "", "", "在职", "user", ] ] ) result = service.import_employees(content) assert result.success is False assert result.summary.errorCount >= 1 assert any("姓名" in item.message for item in result.errors) refreshed = service.get_employee(first.id) assert refreshed is not None assert refreshed.name == first.name def test_import_employees_updates_existing_employee() -> None: with build_session() as db: service = EmployeeService(db) employee = service.list_employees()[0] new_name = f"{employee.name}-导入" content = build_workbook_bytes( [ [ employee.employeeNo, new_name, employee.email, "男", "", "13900000001", "", "上海", employee.position, employee.grade, "FIN-SSC", "", "华东财务组", "CC-TEST", "在职", "user", ] ] ) result = service.import_employees(content, actor="测试管理员") assert result.success is True assert result.summary.updated == 1 updated = service.get_employee(employee.id) assert updated is not None assert updated.name == new_name assert updated.phone == "13900000001" def test_import_employees_creates_new_employee() -> None: with build_session() as db: service = EmployeeService(db) service.list_employees() content = build_workbook_bytes( [ [ "E90001", "导入新员工", "import.new.user@xfinance.com", "女", "", "13811112222", "2025-01-01", "上海", "业务专员", "P3", "FIN-SSC", "E10234", "华东财务组", "CC-9001", "在职", "user", ] ] ) result = service.import_employees(content) assert result.success is True assert result.summary.created == 1 imported = db.execute( select(Employee).where(Employee.employee_no == "E90001") ).scalar_one() assert imported.name == "导入新员工" assert imported.email == "import.new.user@xfinance.com"