Files
X-Financial/server/tests/test_employee_service.py
caoxiaozhu 8a4a777be7 feat: 新增员工行为画像算法与费用风险标签体系
后端新增员工行为画像算法模块,支持标签规则引擎和评分计算,
完善员工模型、银行信息、序列化和导入逻辑,优化报销审批流
和工作流常量,增强 Hermes 同步和知识同步能力,前端新增费
用画像详情弹窗、雷达图和风险卡片组件,完善登录页和工作台
样式,优化文档中心和归档中心交互,补充单元测试。
2026-05-28 12:09:49 +08:00

310 lines
11 KiB
Python

from __future__ import annotations
from datetime import UTC, datetime
import pytest
from sqlalchemy import create_engine, func, select
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool
from app.core.security import verify_password
from app.db.base import Base
from app.models.employee import Employee
from app.models.employee_change_log import EmployeeChangeLog
from app.models.organization import OrganizationUnit
from app.models.role import Role
from app.schemas.employee import EmployeeUpdate
from app.services.employee import EmployeeService
from app.services.employee_seed import CANONICAL_DEPARTMENT_CODES
from app.services.employee_time import format_history_datetime
def build_session() -> Session:
engine = create_engine(
"sqlite+pysqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(bind=engine)
session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False)
return session_factory()
def test_employee_directory_seeds_rich_employee_data() -> None:
with build_session() as db:
service = EmployeeService(db)
employees = service.list_employees()
meta = service.get_employee_meta()
assert len(employees) == 30
assert meta.totalEmployees == 30
assert any(item.status == "试用中" for item in employees)
assert any(item.status == "停用" for item in employees)
assert any("审批负责人" in item.roles for item in employees)
assert any(item.permissions for item in employees)
assert any(item.history for item in employees)
assert all(item.bankName and item.bankAccountNo and item.bankAccountName for item in employees)
role_count = db.scalar(select(func.count()).select_from(Role))
org_count = db.scalar(select(func.count()).select_from(OrganizationUnit))
employee_count = db.scalar(select(func.count()).select_from(Employee))
history_count = db.scalar(select(func.count()).select_from(EmployeeChangeLog))
assert role_count == 6
assert org_count == 7
assert employee_count == 30
assert history_count and history_count >= 30
def test_employee_detail_contains_department_and_roles() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
detail = service.get_employee(employee.id)
assert detail is not None
assert detail.department
assert detail.manager
assert detail.organization is not None
assert detail.roles
def test_update_employee_persists_changes_and_hashes_password() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
updated = service.update_employee(
employee.id,
EmployeeUpdate(
name="测试员工A",
phone="13900001111",
location="深圳南山",
position="高级财务分析师",
grade="P6",
finance_owner_name="共享财务中心",
cost_center="CC-TEST-01",
bank_account_name="测试员工A",
bank_name="招商银行上海分行",
bank_account_no="622588000000000001",
role_codes=["finance", "user"],
password="12345",
),
)
persisted = db.get(Employee, employee.id)
assert updated.name == "测试员工A"
assert updated.phone == "13900001111"
assert updated.location == "深圳南山"
assert updated.position == "高级财务分析师"
assert updated.grade == "P6"
assert updated.financeOwner == "共享财务中心"
assert updated.costCenter == "CC-TEST-01"
assert updated.bankAccountName == "测试员工A"
assert updated.bankName == "招商银行上海分行"
assert updated.bankAccountNo == "622588000000000001"
assert updated.roleCodes == ["finance", "user"]
assert persisted is not None
assert persisted.password_hash is not None
assert verify_password("12345", persisted.password_hash)
assert any("更新员工信息" in item.action for item in updated.history)
assert any("重置员工登录密码" == item.action for item in updated.history)
def test_disable_employee_marks_status_and_logs_change() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = next(item for item in service.list_employees() if item.status != "停用")
updated = service.disable_employee(employee.id)
persisted = db.get(Employee, employee.id)
assert updated.status == "停用"
assert updated.statusTone == "neutral"
assert persisted is not None
assert persisted.employment_status == "停用"
assert any(item.action == "停用员工账号" for item in updated.history)
def test_enable_employee_restores_status_and_logs_change() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = next(item for item in service.list_employees() if item.status != "停用")
service.disable_employee(employee.id)
updated = service.enable_employee(employee.id)
persisted = db.get(Employee, employee.id)
assert updated.status == "在职"
assert updated.statusTone == "success"
assert persisted is not None
assert persisted.employment_status == "在职"
assert any(item.action == "启用员工账号" for item in updated.history)
def test_profile_repairs_do_not_run_on_every_list() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
updated = service.update_employee(
employee.id,
EmployeeUpdate(position="测试岗位-不会被回滚"),
)
listed = next(item for item in service.list_employees() if item.id == employee.id)
assert updated.position == "测试岗位-不会被回滚"
assert listed.position == "测试岗位-不会被回滚"
def test_role_update_appends_recent_history() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
current_codes = list(employee.roleCodes)
next_codes = ["finance", "user"] if "finance" not in current_codes else ["user"]
updated = service.update_employee(employee.id, EmployeeUpdate(role_codes=next_codes))
assert any("更新系统角色" in item.action for item in updated.history)
def test_employee_change_logs_keep_only_latest_five() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
persisted = db.get(Employee, employee.id)
assert persisted is not None
for index in range(7):
service._append_change_log(
persisted,
action=f"测试变更-{index}",
owner="单元测试",
)
db.commit()
service._trim_employee_change_logs(persisted.id)
db.commit()
hydrated = db.get(Employee, employee.id)
assert hydrated is not None
assert len(hydrated.change_logs) == 5
assert hydrated.change_logs[0].action == "测试变更-6"
def test_employee_meta_includes_organization_options() -> None:
with build_session() as db:
service = EmployeeService(db)
meta = service.get_employee_meta()
assert meta.organizationOptions
assert all(item.code and item.name for item in meta.organizationOptions)
assert [item.name for item in meta.organizationOptions] == [
"人力资源部",
"市场部",
"总裁办",
"技术部",
"生产部",
"财务部",
]
def test_employee_directory_normalizes_legacy_departments() -> None:
with build_session() as db:
service = EmployeeService(db)
service.list_employees()
legacy_department = OrganizationUnit(
unit_code="RND-CENTER",
name="产品研发中心",
unit_type="department",
)
employee = db.execute(
select(Employee).where(Employee.employee_no == "E11745")
).scalar_one()
employee.organization_unit = legacy_department
db.add(legacy_department)
db.commit()
refreshed = next(
item for item in service.list_employees() if item.employeeNo == "E11745"
)
meta = service.get_employee_meta()
assert refreshed.department == "技术部"
assert refreshed.organization is not None
assert refreshed.organization.code == "TECH-DEPT"
assert "RND-CENTER" not in {item.code for item in meta.organizationOptions}
def test_update_employee_changes_organization() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
organizations = service.repository.list_organization_units()
current_code = employee.organization.code if employee.organization else None
target = next(
unit
for unit in organizations
if unit.unit_code in CANONICAL_DEPARTMENT_CODES and unit.unit_code != current_code
)
updated = service.update_employee(
employee.id,
EmployeeUpdate(organization_unit_code=target.unit_code),
)
assert updated.organization is not None
assert updated.organization.code == target.unit_code
assert updated.department == target.name
assert any("更新员工信息" in item.action for item in updated.history)
def test_update_employee_rejects_unknown_organization() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
with pytest.raises(ValueError, match="部门编码"):
service.update_employee(
employee.id,
EmployeeUpdate(organization_unit_code="ORG-NOT-EXISTS"),
)
def test_update_employee_changes_manager() -> None:
with build_session() as db:
service = EmployeeService(db)
employees = service.list_employees()
employee = employees[0]
manager = next(item for item in employees if item.id != employee.id)
updated = service.update_employee(
employee.id,
EmployeeUpdate(manager_employee_no=manager.employeeNo),
)
assert updated.managerEmployeeNo == manager.employeeNo
assert updated.manager == manager.name
def test_format_history_datetime_uses_local_timezone_without_seconds() -> None:
value = datetime(2026, 5, 20, 6, 30, 45, tzinfo=UTC)
formatted = format_history_datetime(value)
assert formatted == "2026年5月20日14时30分"
assert "" not in formatted
def test_update_employee_rejects_invalid_date_format() -> None:
with build_session() as db:
service = EmployeeService(db)
employee = service.list_employees()[0]
with pytest.raises(ValueError, match="出生日期格式必须为 YYYY-MM-DD。"):
service.update_employee(employee.id, EmployeeUpdate(birth_date="2024/01/01"))