2026-05-07 11:50:10 +08:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
2026-05-14 02:23:33 +00:00
|
|
|
import pytest
|
2026-05-07 11:50:10 +08:00
|
|
|
from sqlalchemy import create_engine, func, select
|
|
|
|
|
from sqlalchemy.orm import Session, sessionmaker
|
|
|
|
|
from sqlalchemy.pool import StaticPool
|
|
|
|
|
|
2026-05-07 13:48:00 +08:00
|
|
|
from app.core.security import verify_password
|
2026-05-07 11:50:10 +08:00
|
|
|
from app.db.base import Base
|
|
|
|
|
from app.models.employee import Employee
|
|
|
|
|
from app.models.employee_change_log import EmployeeChangeLog
|
|
|
|
|
from app.models.organization import OrganizationUnit
|
|
|
|
|
from app.models.role import Role
|
2026-05-07 13:48:00 +08:00
|
|
|
from app.schemas.employee import EmployeeUpdate
|
2026-05-07 11:50:10 +08:00
|
|
|
from app.services.employee import EmployeeService
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_session() -> Session:
|
|
|
|
|
engine = create_engine(
|
|
|
|
|
"sqlite+pysqlite:///:memory:",
|
|
|
|
|
connect_args={"check_same_thread": False},
|
|
|
|
|
poolclass=StaticPool,
|
|
|
|
|
)
|
|
|
|
|
Base.metadata.create_all(bind=engine)
|
|
|
|
|
session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False)
|
|
|
|
|
return session_factory()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_employee_directory_seeds_rich_employee_data() -> None:
|
|
|
|
|
with build_session() as db:
|
|
|
|
|
service = EmployeeService(db)
|
|
|
|
|
|
|
|
|
|
employees = service.list_employees()
|
|
|
|
|
meta = service.get_employee_meta()
|
|
|
|
|
|
|
|
|
|
assert len(employees) == 30
|
|
|
|
|
assert meta.totalEmployees == 30
|
|
|
|
|
assert any(item.status == "试用中" for item in employees)
|
|
|
|
|
assert any(item.status == "停用" for item in employees)
|
|
|
|
|
assert any("审批负责人" in item.roles for item in employees)
|
|
|
|
|
assert any(item.permissions for item in employees)
|
|
|
|
|
assert any(item.history for item in employees)
|
|
|
|
|
|
|
|
|
|
role_count = db.scalar(select(func.count()).select_from(Role))
|
|
|
|
|
org_count = db.scalar(select(func.count()).select_from(OrganizationUnit))
|
|
|
|
|
employee_count = db.scalar(select(func.count()).select_from(Employee))
|
|
|
|
|
history_count = db.scalar(select(func.count()).select_from(EmployeeChangeLog))
|
|
|
|
|
|
|
|
|
|
assert role_count == 6
|
|
|
|
|
assert org_count == 10
|
|
|
|
|
assert employee_count == 30
|
|
|
|
|
assert history_count and history_count >= 30
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_employee_detail_contains_department_and_roles() -> None:
|
|
|
|
|
with build_session() as db:
|
|
|
|
|
service = EmployeeService(db)
|
|
|
|
|
employee = service.list_employees()[0]
|
|
|
|
|
detail = service.get_employee(employee.id)
|
|
|
|
|
|
|
|
|
|
assert detail is not None
|
|
|
|
|
assert detail.department
|
|
|
|
|
assert detail.manager
|
|
|
|
|
assert detail.organization is not None
|
|
|
|
|
assert detail.roles
|
2026-05-07 13:48:00 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_update_employee_persists_changes_and_hashes_password() -> None:
|
|
|
|
|
with build_session() as db:
|
|
|
|
|
service = EmployeeService(db)
|
|
|
|
|
employee = service.list_employees()[0]
|
|
|
|
|
|
|
|
|
|
updated = service.update_employee(
|
|
|
|
|
employee.id,
|
|
|
|
|
EmployeeUpdate(
|
|
|
|
|
name="测试员工A",
|
|
|
|
|
phone="13900001111",
|
|
|
|
|
location="深圳南山",
|
|
|
|
|
position="高级财务分析师",
|
|
|
|
|
grade="P6",
|
|
|
|
|
finance_owner_name="共享财务中心",
|
|
|
|
|
cost_center="CC-TEST-01",
|
|
|
|
|
role_codes=["finance", "user"],
|
|
|
|
|
password="12345",
|
|
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
persisted = db.get(Employee, employee.id)
|
|
|
|
|
|
|
|
|
|
assert updated.name == "测试员工A"
|
|
|
|
|
assert updated.phone == "13900001111"
|
|
|
|
|
assert updated.location == "深圳南山"
|
|
|
|
|
assert updated.position == "高级财务分析师"
|
|
|
|
|
assert updated.grade == "P6"
|
|
|
|
|
assert updated.financeOwner == "共享财务中心"
|
|
|
|
|
assert updated.costCenter == "CC-TEST-01"
|
|
|
|
|
assert updated.roleCodes == ["finance", "user"]
|
|
|
|
|
assert persisted is not None
|
|
|
|
|
assert persisted.password_hash is not None
|
|
|
|
|
assert verify_password("12345", persisted.password_hash)
|
|
|
|
|
assert any("更新员工信息" in item.action for item in updated.history)
|
|
|
|
|
assert any("重置员工登录密码" == item.action for item in updated.history)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_disable_employee_marks_status_and_logs_change() -> None:
|
|
|
|
|
with build_session() as db:
|
|
|
|
|
service = EmployeeService(db)
|
|
|
|
|
employee = next(item for item in service.list_employees() if item.status != "停用")
|
|
|
|
|
|
|
|
|
|
updated = service.disable_employee(employee.id)
|
|
|
|
|
|
|
|
|
|
persisted = db.get(Employee, employee.id)
|
|
|
|
|
|
|
|
|
|
assert updated.status == "停用"
|
|
|
|
|
assert updated.statusTone == "neutral"
|
|
|
|
|
assert persisted is not None
|
|
|
|
|
assert persisted.employment_status == "停用"
|
|
|
|
|
assert any(item.action == "停用员工账号" for item in updated.history)
|
2026-05-14 02:23:33 +00:00
|
|
|
|
|
|
|
|
|
2026-05-14 02:57:00 +00:00
|
|
|
def test_enable_employee_restores_status_and_logs_change() -> None:
|
|
|
|
|
with build_session() as db:
|
|
|
|
|
service = EmployeeService(db)
|
|
|
|
|
employee = next(item for item in service.list_employees() if item.status != "停用")
|
|
|
|
|
service.disable_employee(employee.id)
|
|
|
|
|
|
|
|
|
|
updated = service.enable_employee(employee.id)
|
|
|
|
|
|
|
|
|
|
persisted = db.get(Employee, employee.id)
|
|
|
|
|
|
|
|
|
|
assert updated.status == "在职"
|
|
|
|
|
assert updated.statusTone == "success"
|
|
|
|
|
assert persisted is not None
|
|
|
|
|
assert persisted.employment_status == "在职"
|
|
|
|
|
assert any(item.action == "启用员工账号" for item in updated.history)
|
|
|
|
|
|
|
|
|
|
|
2026-05-14 02:23:33 +00:00
|
|
|
def test_update_employee_rejects_invalid_date_format() -> None:
|
|
|
|
|
with build_session() as db:
|
|
|
|
|
service = EmployeeService(db)
|
|
|
|
|
employee = service.list_employees()[0]
|
|
|
|
|
|
|
|
|
|
with pytest.raises(ValueError, match="出生日期格式必须为 YYYY-MM-DD。"):
|
|
|
|
|
service.update_employee(employee.id, EmployeeUpdate(birth_date="2024/01/01"))
|