from __future__ import annotations from datetime import UTC, datetime import pytest from sqlalchemy import create_engine, func, select from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.pool import StaticPool from app.core.security import verify_password from app.db.base import Base from app.models.employee import Employee from app.models.employee_change_log import EmployeeChangeLog from app.models.organization import OrganizationUnit from app.models.role import Role from app.schemas.employee import EmployeeUpdate from app.services.employee import EmployeeService from app.services.employee_seed import CANONICAL_DEPARTMENT_CODES from app.services.employee_time import format_history_datetime def build_session() -> Session: engine = create_engine( "sqlite+pysqlite:///:memory:", connect_args={"check_same_thread": False}, poolclass=StaticPool, ) Base.metadata.create_all(bind=engine) session_factory = sessionmaker(bind=engine, autoflush=False, autocommit=False) return session_factory() def test_employee_directory_seeds_rich_employee_data() -> None: with build_session() as db: service = EmployeeService(db) employees = service.list_employees() meta = service.get_employee_meta() assert len(employees) == 30 assert meta.totalEmployees == 30 assert any(item.status == "试用中" for item in employees) assert any(item.status == "停用" for item in employees) assert any("审批负责人" in item.roles for item in employees) assert any(item.permissions for item in employees) assert any(item.history for item in employees) assert all(item.bankName and item.bankAccountNo and item.bankAccountName for item in employees) role_count = db.scalar(select(func.count()).select_from(Role)) org_count = db.scalar(select(func.count()).select_from(OrganizationUnit)) employee_count = db.scalar(select(func.count()).select_from(Employee)) history_count = db.scalar(select(func.count()).select_from(EmployeeChangeLog)) assert role_count == 6 assert org_count == 7 assert employee_count == 30 assert history_count and history_count >= 30 def test_employee_detail_contains_department_and_roles() -> None: with build_session() as db: service = EmployeeService(db) employee = service.list_employees()[0] detail = service.get_employee(employee.id) assert detail is not None assert detail.department assert detail.manager assert detail.organization is not None assert detail.roles def test_update_employee_persists_changes_and_hashes_password() -> None: with build_session() as db: service = EmployeeService(db) employee = service.list_employees()[0] updated = service.update_employee( employee.id, EmployeeUpdate( name="测试员工A", phone="13900001111", location="深圳南山", position="高级财务分析师", grade="P6", finance_owner_name="共享财务中心", cost_center="CC-TEST-01", bank_account_name="测试员工A", bank_name="招商银行上海分行", bank_account_no="622588000000000001", role_codes=["finance", "user"], password="12345", ), ) persisted = db.get(Employee, employee.id) assert updated.name == "测试员工A" assert updated.phone == "13900001111" assert updated.location == "深圳南山" assert updated.position == "高级财务分析师" assert updated.grade == "P6" assert updated.financeOwner == "共享财务中心" assert updated.costCenter == "CC-TEST-01" assert updated.bankAccountName == "测试员工A" assert updated.bankName == "招商银行上海分行" assert updated.bankAccountNo == "622588000000000001" assert updated.roleCodes == ["finance", "user"] assert persisted is not None assert persisted.password_hash is not None assert verify_password("12345", persisted.password_hash) assert any("更新员工信息" in item.action for item in updated.history) assert any("重置员工登录密码" == item.action for item in updated.history) def test_disable_employee_marks_status_and_logs_change() -> None: with build_session() as db: service = EmployeeService(db) employee = next(item for item in service.list_employees() if item.status != "停用") updated = service.disable_employee(employee.id) persisted = db.get(Employee, employee.id) assert updated.status == "停用" assert updated.statusTone == "neutral" assert persisted is not None assert persisted.employment_status == "停用" assert any(item.action == "停用员工账号" for item in updated.history) def test_enable_employee_restores_status_and_logs_change() -> None: with build_session() as db: service = EmployeeService(db) employee = next(item for item in service.list_employees() if item.status != "停用") service.disable_employee(employee.id) updated = service.enable_employee(employee.id) persisted = db.get(Employee, employee.id) assert updated.status == "在职" assert updated.statusTone == "success" assert persisted is not None assert persisted.employment_status == "在职" assert any(item.action == "启用员工账号" for item in updated.history) def test_profile_repairs_do_not_run_on_every_list() -> None: with build_session() as db: service = EmployeeService(db) employee = service.list_employees()[0] updated = service.update_employee( employee.id, EmployeeUpdate(position="测试岗位-不会被回滚"), ) listed = next(item for item in service.list_employees() if item.id == employee.id) assert updated.position == "测试岗位-不会被回滚" assert listed.position == "测试岗位-不会被回滚" def test_role_update_appends_recent_history() -> None: with build_session() as db: service = EmployeeService(db) employee = service.list_employees()[0] current_codes = list(employee.roleCodes) next_codes = ["finance", "user"] if "finance" not in current_codes else ["user"] updated = service.update_employee(employee.id, EmployeeUpdate(role_codes=next_codes)) assert any("更新系统角色" in item.action for item in updated.history) def test_employee_change_logs_keep_only_latest_five() -> None: with build_session() as db: service = EmployeeService(db) employee = service.list_employees()[0] persisted = db.get(Employee, employee.id) assert persisted is not None for index in range(7): service._append_change_log( persisted, action=f"测试变更-{index}", owner="单元测试", ) db.commit() service._trim_employee_change_logs(persisted.id) db.commit() hydrated = db.get(Employee, employee.id) assert hydrated is not None assert len(hydrated.change_logs) == 5 assert hydrated.change_logs[0].action == "测试变更-6" def test_employee_meta_includes_organization_options() -> None: with build_session() as db: service = EmployeeService(db) meta = service.get_employee_meta() assert meta.organizationOptions assert all(item.code and item.name for item in meta.organizationOptions) assert [item.name for item in meta.organizationOptions] == [ "人力资源部", "市场部", "总裁办", "技术部", "生产部", "财务部", ] def test_employee_directory_normalizes_legacy_departments() -> None: with build_session() as db: service = EmployeeService(db) service.list_employees() legacy_department = OrganizationUnit( unit_code="RND-CENTER", name="产品研发中心", unit_type="department", ) employee = db.execute( select(Employee).where(Employee.employee_no == "E11745") ).scalar_one() employee.organization_unit = legacy_department db.add(legacy_department) db.commit() refreshed = next( item for item in service.list_employees() if item.employeeNo == "E11745" ) meta = service.get_employee_meta() assert refreshed.department == "技术部" assert refreshed.organization is not None assert refreshed.organization.code == "TECH-DEPT" assert "RND-CENTER" not in {item.code for item in meta.organizationOptions} def test_update_employee_changes_organization() -> None: with build_session() as db: service = EmployeeService(db) employee = service.list_employees()[0] organizations = service.repository.list_organization_units() current_code = employee.organization.code if employee.organization else None target = next( unit for unit in organizations if unit.unit_code in CANONICAL_DEPARTMENT_CODES and unit.unit_code != current_code ) updated = service.update_employee( employee.id, EmployeeUpdate(organization_unit_code=target.unit_code), ) assert updated.organization is not None assert updated.organization.code == target.unit_code assert updated.department == target.name assert any("更新员工信息" in item.action for item in updated.history) def test_update_employee_rejects_unknown_organization() -> None: with build_session() as db: service = EmployeeService(db) employee = service.list_employees()[0] with pytest.raises(ValueError, match="部门编码"): service.update_employee( employee.id, EmployeeUpdate(organization_unit_code="ORG-NOT-EXISTS"), ) def test_update_employee_changes_manager() -> None: with build_session() as db: service = EmployeeService(db) employees = service.list_employees() employee = employees[0] manager = next(item for item in employees if item.id != employee.id) updated = service.update_employee( employee.id, EmployeeUpdate(manager_employee_no=manager.employeeNo), ) assert updated.managerEmployeeNo == manager.employeeNo assert updated.manager == manager.name def test_format_history_datetime_uses_local_timezone_without_seconds() -> None: value = datetime(2026, 5, 20, 6, 30, 45, tzinfo=UTC) formatted = format_history_datetime(value) assert formatted == "2026年5月20日14时30分" assert "秒" not in formatted def test_update_employee_rejects_invalid_date_format() -> None: with build_session() as db: service = EmployeeService(db) employee = service.list_employees()[0] with pytest.raises(ValueError, match="出生日期格式必须为 YYYY-MM-DD。"): service.update_employee(employee.id, EmployeeUpdate(birth_date="2024/01/01"))