- 修复员工创建时组织架构关联与邮箱校验逻辑 - 修复报销单API端点参数及预审流程调用 - 优化审批中心、差旅详情等前端页面交互 - 更新侧边栏导航与请求视图模型 - 补充员工服务与报销单相关测试用例
261 lines
9.4 KiB
Python
261 lines
9.4 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import UTC, datetime
|
|
|
|
import pytest
|
|
from sqlalchemy import create_engine, func, select
|
|
from sqlalchemy.orm import Session, sessionmaker
|
|
from sqlalchemy.pool import StaticPool
|
|
|
|
from app.core.security import verify_password
|
|
from app.db.base import Base
|
|
from app.models.employee import Employee
|
|
from app.models.employee_change_log import EmployeeChangeLog
|
|
from app.models.organization import OrganizationUnit
|
|
from app.models.role import Role
|
|
from app.schemas.employee import EmployeeUpdate
|
|
from app.services.employee import EmployeeService
|
|
|
|
|
|
def build_session() -> Session:
|
|
engine = create_engine(
|
|
"sqlite+pysqlite:///:memory:",
|
|
connect_args={"check_same_thread": False},
|
|
poolclass=StaticPool,
|
|
)
|
|
Base.metadata.create_all(bind=engine)
|
|
session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False)
|
|
return session_factory()
|
|
|
|
|
|
def test_employee_directory_seeds_rich_employee_data() -> None:
|
|
with build_session() as db:
|
|
service = EmployeeService(db)
|
|
|
|
employees = service.list_employees()
|
|
meta = service.get_employee_meta()
|
|
|
|
assert len(employees) == 30
|
|
assert meta.totalEmployees == 30
|
|
assert any(item.status == "试用中" for item in employees)
|
|
assert any(item.status == "停用" for item in employees)
|
|
assert any("审批负责人" in item.roles for item in employees)
|
|
assert any(item.permissions for item in employees)
|
|
assert any(item.history for item in employees)
|
|
|
|
role_count = db.scalar(select(func.count()).select_from(Role))
|
|
org_count = db.scalar(select(func.count()).select_from(OrganizationUnit))
|
|
employee_count = db.scalar(select(func.count()).select_from(Employee))
|
|
history_count = db.scalar(select(func.count()).select_from(EmployeeChangeLog))
|
|
|
|
assert role_count == 6
|
|
assert org_count == 10
|
|
assert employee_count == 30
|
|
assert history_count and history_count >= 30
|
|
|
|
|
|
def test_employee_detail_contains_department_and_roles() -> None:
|
|
with build_session() as db:
|
|
service = EmployeeService(db)
|
|
employee = service.list_employees()[0]
|
|
detail = service.get_employee(employee.id)
|
|
|
|
assert detail is not None
|
|
assert detail.department
|
|
assert detail.manager
|
|
assert detail.organization is not None
|
|
assert detail.roles
|
|
|
|
|
|
def test_update_employee_persists_changes_and_hashes_password() -> None:
|
|
with build_session() as db:
|
|
service = EmployeeService(db)
|
|
employee = service.list_employees()[0]
|
|
|
|
updated = service.update_employee(
|
|
employee.id,
|
|
EmployeeUpdate(
|
|
name="测试员工A",
|
|
phone="13900001111",
|
|
location="深圳南山",
|
|
position="高级财务分析师",
|
|
grade="P6",
|
|
finance_owner_name="共享财务中心",
|
|
cost_center="CC-TEST-01",
|
|
role_codes=["finance", "user"],
|
|
password="12345",
|
|
),
|
|
)
|
|
|
|
persisted = db.get(Employee, employee.id)
|
|
|
|
assert updated.name == "测试员工A"
|
|
assert updated.phone == "13900001111"
|
|
assert updated.location == "深圳南山"
|
|
assert updated.position == "高级财务分析师"
|
|
assert updated.grade == "P6"
|
|
assert updated.financeOwner == "共享财务中心"
|
|
assert updated.costCenter == "CC-TEST-01"
|
|
assert updated.roleCodes == ["finance", "user"]
|
|
assert persisted is not None
|
|
assert persisted.password_hash is not None
|
|
assert verify_password("12345", persisted.password_hash)
|
|
assert any("更新员工信息" in item.action for item in updated.history)
|
|
assert any("重置员工登录密码" == item.action for item in updated.history)
|
|
|
|
|
|
def test_disable_employee_marks_status_and_logs_change() -> None:
|
|
with build_session() as db:
|
|
service = EmployeeService(db)
|
|
employee = next(item for item in service.list_employees() if item.status != "停用")
|
|
|
|
updated = service.disable_employee(employee.id)
|
|
|
|
persisted = db.get(Employee, employee.id)
|
|
|
|
assert updated.status == "停用"
|
|
assert updated.statusTone == "neutral"
|
|
assert persisted is not None
|
|
assert persisted.employment_status == "停用"
|
|
assert any(item.action == "停用员工账号" for item in updated.history)
|
|
|
|
|
|
def test_enable_employee_restores_status_and_logs_change() -> None:
|
|
with build_session() as db:
|
|
service = EmployeeService(db)
|
|
employee = next(item for item in service.list_employees() if item.status != "停用")
|
|
service.disable_employee(employee.id)
|
|
|
|
updated = service.enable_employee(employee.id)
|
|
|
|
persisted = db.get(Employee, employee.id)
|
|
|
|
assert updated.status == "在职"
|
|
assert updated.statusTone == "success"
|
|
assert persisted is not None
|
|
assert persisted.employment_status == "在职"
|
|
assert any(item.action == "启用员工账号" for item in updated.history)
|
|
|
|
|
|
def test_profile_repairs_do_not_run_on_every_list() -> None:
|
|
with build_session() as db:
|
|
service = EmployeeService(db)
|
|
employee = service.list_employees()[0]
|
|
|
|
updated = service.update_employee(
|
|
employee.id,
|
|
EmployeeUpdate(position="测试岗位-不会被回滚"),
|
|
)
|
|
|
|
listed = next(item for item in service.list_employees() if item.id == employee.id)
|
|
assert updated.position == "测试岗位-不会被回滚"
|
|
assert listed.position == "测试岗位-不会被回滚"
|
|
|
|
|
|
def test_role_update_appends_recent_history() -> None:
|
|
with build_session() as db:
|
|
service = EmployeeService(db)
|
|
employee = service.list_employees()[0]
|
|
current_codes = list(employee.roleCodes)
|
|
next_codes = ["finance", "user"] if "finance" not in current_codes else ["user"]
|
|
|
|
updated = service.update_employee(employee.id, EmployeeUpdate(role_codes=next_codes))
|
|
|
|
assert any("更新系统角色" in item.action for item in updated.history)
|
|
|
|
|
|
def test_employee_change_logs_keep_only_latest_five() -> None:
|
|
with build_session() as db:
|
|
service = EmployeeService(db)
|
|
employee = service.list_employees()[0]
|
|
persisted = db.get(Employee, employee.id)
|
|
assert persisted is not None
|
|
|
|
for index in range(7):
|
|
service._append_change_log(
|
|
persisted,
|
|
action=f"测试变更-{index}",
|
|
owner="单元测试",
|
|
)
|
|
|
|
db.commit()
|
|
service._trim_employee_change_logs(persisted.id)
|
|
db.commit()
|
|
hydrated = db.get(Employee, employee.id)
|
|
assert hydrated is not None
|
|
assert len(hydrated.change_logs) == 5
|
|
assert hydrated.change_logs[0].action == "测试变更-6"
|
|
|
|
|
|
def test_employee_meta_includes_organization_options() -> None:
|
|
with build_session() as db:
|
|
service = EmployeeService(db)
|
|
meta = service.get_employee_meta()
|
|
|
|
assert meta.organizationOptions
|
|
assert all(item.code and item.name for item in meta.organizationOptions)
|
|
|
|
|
|
def test_update_employee_changes_organization() -> None:
|
|
with build_session() as db:
|
|
service = EmployeeService(db)
|
|
employee = service.list_employees()[0]
|
|
organizations = service.repository.list_organization_units()
|
|
current_code = employee.organization.code if employee.organization else None
|
|
target = next(unit for unit in organizations if unit.unit_code != current_code)
|
|
|
|
updated = service.update_employee(
|
|
employee.id,
|
|
EmployeeUpdate(organization_unit_code=target.unit_code),
|
|
)
|
|
|
|
assert updated.organization is not None
|
|
assert updated.organization.code == target.unit_code
|
|
assert updated.department == target.name
|
|
assert any("更新员工信息" in item.action for item in updated.history)
|
|
|
|
|
|
def test_update_employee_rejects_unknown_organization() -> None:
|
|
with build_session() as db:
|
|
service = EmployeeService(db)
|
|
employee = service.list_employees()[0]
|
|
|
|
with pytest.raises(ValueError, match="部门编码"):
|
|
service.update_employee(
|
|
employee.id,
|
|
EmployeeUpdate(organization_unit_code="ORG-NOT-EXISTS"),
|
|
)
|
|
|
|
|
|
def test_update_employee_changes_manager() -> None:
|
|
with build_session() as db:
|
|
service = EmployeeService(db)
|
|
employees = service.list_employees()
|
|
employee = employees[0]
|
|
manager = next(item for item in employees if item.id != employee.id)
|
|
|
|
updated = service.update_employee(
|
|
employee.id,
|
|
EmployeeUpdate(manager_employee_no=manager.employeeNo),
|
|
)
|
|
|
|
assert updated.managerEmployeeNo == manager.employeeNo
|
|
assert updated.manager == manager.name
|
|
|
|
|
|
def test_format_history_datetime_uses_local_timezone_without_seconds() -> None:
|
|
value = datetime(2026, 5, 20, 6, 30, 45, tzinfo=UTC)
|
|
formatted = EmployeeService._format_history_datetime(value)
|
|
|
|
assert formatted == "2026年5月20日14时30分"
|
|
assert "秒" not in formatted
|
|
|
|
|
|
def test_update_employee_rejects_invalid_date_format() -> None:
|
|
with build_session() as db:
|
|
service = EmployeeService(db)
|
|
employee = service.list_employees()[0]
|
|
|
|
with pytest.raises(ValueError, match="出生日期格式必须为 YYYY-MM-DD。"):
|
|
service.update_employee(employee.id, EmployeeUpdate(birth_date="2024/01/01"))
|