Files
X-Financial/server/tests/test_employee_service.py

303 lines
11 KiB
Python
Raw Normal View History

from __future__ import annotations
from datetime import UTC, datetime
import pytest
from sqlalchemy import create_engine, func, select
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool
from app.core.security import verify_password
from app.db.base import Base
from app.models.employee import Employee
from app.models.employee_change_log import EmployeeChangeLog
from app.models.organization import OrganizationUnit
from app.models.role import Role
from app.schemas.employee import EmployeeUpdate
from app.services.employee import EmployeeService
from app.services.employee_seed import CANONICAL_DEPARTMENT_CODES
from app.services.employee_time import format_history_datetime
def build_session() -> Session:
engine = create_engine(
"sqlite+pysqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(bind=engine)
session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False)
return session_factory()
def test_employee_directory_seeds_rich_employee_data() -> None:
with build_session() as db:
service = EmployeeService(db)
employees = service.list_employees()
meta = service.get_employee_meta()
assert len(employees) == 30
assert meta.totalEmployees == 30
assert any(item.status == "试用中" for item in employees)
assert any(item.status == "停用" for item in employees)
assert any("审批负责人" in item.roles for item in employees)
assert any(item.permissions for item in employees)
assert any(item.history for item in employees)
role_count = db.scalar(select(func.count()).select_from(Role))
org_count = db.scalar(select(func.count()).select_from(OrganizationUnit))
employee_count = db.scalar(select(func.count()).select_from(Employee))
history_count = db.scalar(select(func.count()).select_from(EmployeeChangeLog))
assert role_count == 6
assert org_count == 7
assert employee_count == 30
assert history_count and history_count >= 30
def test_employee_detail_contains_department_and_roles() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
detail = service.get_employee(employee.id)
assert detail is not None
assert detail.department
assert detail.manager
assert detail.organization is not None
assert detail.roles
def test_update_employee_persists_changes_and_hashes_password() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
updated = service.update_employee(
employee.id,
EmployeeUpdate(
name="测试员工A",
phone="13900001111",
location="深圳南山",
position="高级财务分析师",
grade="P6",
finance_owner_name="共享财务中心",
cost_center="CC-TEST-01",
role_codes=["finance", "user"],
password="12345",
),
)
persisted = db.get(Employee, employee.id)
assert updated.name == "测试员工A"
assert updated.phone == "13900001111"
assert updated.location == "深圳南山"
assert updated.position == "高级财务分析师"
assert updated.grade == "P6"
assert updated.financeOwner == "共享财务中心"
assert updated.costCenter == "CC-TEST-01"
assert updated.roleCodes == ["finance", "user"]
assert persisted is not None
assert persisted.password_hash is not None
assert verify_password("12345", persisted.password_hash)
assert any("更新员工信息" in item.action for item in updated.history)
assert any("重置员工登录密码" == item.action for item in updated.history)
def test_disable_employee_marks_status_and_logs_change() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = next(item for item in service.list_employees() if item.status != "停用")
updated = service.disable_employee(employee.id)
persisted = db.get(Employee, employee.id)
assert updated.status == "停用"
assert updated.statusTone == "neutral"
assert persisted is not None
assert persisted.employment_status == "停用"
assert any(item.action == "停用员工账号" for item in updated.history)
def test_enable_employee_restores_status_and_logs_change() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = next(item for item in service.list_employees() if item.status != "停用")
service.disable_employee(employee.id)
updated = service.enable_employee(employee.id)
persisted = db.get(Employee, employee.id)
assert updated.status == "在职"
assert updated.statusTone == "success"
assert persisted is not None
assert persisted.employment_status == "在职"
assert any(item.action == "启用员工账号" for item in updated.history)
def test_profile_repairs_do_not_run_on_every_list() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
updated = service.update_employee(
employee.id,
EmployeeUpdate(position="测试岗位-不会被回滚"),
)
listed = next(item for item in service.list_employees() if item.id == employee.id)
assert updated.position == "测试岗位-不会被回滚"
assert listed.position == "测试岗位-不会被回滚"
def test_role_update_appends_recent_history() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
current_codes = list(employee.roleCodes)
next_codes = ["finance", "user"] if "finance" not in current_codes else ["user"]
updated = service.update_employee(employee.id, EmployeeUpdate(role_codes=next_codes))
assert any("更新系统角色" in item.action for item in updated.history)
def test_employee_change_logs_keep_only_latest_five() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
persisted = db.get(Employee, employee.id)
assert persisted is not None
for index in range(7):
service._append_change_log(
persisted,
action=f"测试变更-{index}",
owner="单元测试",
)
db.commit()
service._trim_employee_change_logs(persisted.id)
db.commit()
hydrated = db.get(Employee, employee.id)
assert hydrated is not None
assert len(hydrated.change_logs) == 5
assert hydrated.change_logs[0].action == "测试变更-6"
def test_employee_meta_includes_organization_options() -> None:
with build_session() as db:
service = EmployeeService(db)
meta = service.get_employee_meta()
assert meta.organizationOptions
assert all(item.code and item.name for item in meta.organizationOptions)
assert [item.name for item in meta.organizationOptions] == [
"人力资源部",
"市场部",
"总裁办",
"技术部",
"生产部",
"财务部",
]
def test_employee_directory_normalizes_legacy_departments() -> None:
with build_session() as db:
service = EmployeeService(db)
service.list_employees()
legacy_department = OrganizationUnit(
unit_code="RND-CENTER",
name="产品研发中心",
unit_type="department",
)
employee = db.execute(
select(Employee).where(Employee.employee_no == "E11745")
).scalar_one()
employee.organization_unit = legacy_department
db.add(legacy_department)
db.commit()
refreshed = next(
item for item in service.list_employees() if item.employeeNo == "E11745"
)
meta = service.get_employee_meta()
assert refreshed.department == "技术部"
assert refreshed.organization is not None
assert refreshed.organization.code == "TECH-DEPT"
assert "RND-CENTER" not in {item.code for item in meta.organizationOptions}
def test_update_employee_changes_organization() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
organizations = service.repository.list_organization_units()
current_code = employee.organization.code if employee.organization else None
target = next(
unit
for unit in organizations
if unit.unit_code in CANONICAL_DEPARTMENT_CODES and unit.unit_code != current_code
)
updated = service.update_employee(
employee.id,
EmployeeUpdate(organization_unit_code=target.unit_code),
)
assert updated.organization is not None
assert updated.organization.code == target.unit_code
assert updated.department == target.name
assert any("更新员工信息" in item.action for item in updated.history)
def test_update_employee_rejects_unknown_organization() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
with pytest.raises(ValueError, match="部门编码"):
service.update_employee(
employee.id,
EmployeeUpdate(organization_unit_code="ORG-NOT-EXISTS"),
)
def test_update_employee_changes_manager() -> None:
with build_session() as db:
service = EmployeeService(db)
employees = service.list_employees()
employee = employees[0]
manager = next(item for item in employees if item.id != employee.id)
updated = service.update_employee(
employee.id,
EmployeeUpdate(manager_employee_no=manager.employeeNo),
)
assert updated.managerEmployeeNo == manager.employeeNo
assert updated.manager == manager.name
def test_format_history_datetime_uses_local_timezone_without_seconds() -> None:
value = datetime(2026, 5, 20, 6, 30, 45, tzinfo=UTC)
formatted = format_history_datetime(value)
assert formatted == "2026年5月20日14时30分"
assert "" not in formatted
def test_update_employee_rejects_invalid_date_format() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
with pytest.raises(ValueError, match="出生日期格式必须为 YYYY-MM-DD。"):
service.update_employee(employee.id, EmployeeUpdate(birth_date="2024/01/01"))