Files
X-Financial/server/tests/test_auth_service.py
caoxiaozhu 3f17619e0c fix(auth): 登录目录就绪幂等化与并发控制
- employee/settings/user_session_metrics 的 ensure_*_ready 改为按 bind 缓存 + 锁,
  避免每次登录重复建表与并发场景下的竞态
- auth 登录链路先查员工再降级触发目录就绪,并吞掉查询期 SQLAlchemy 异常
- 默认管理员账号由 superadmin 迁移为 admin,兼容历史账号回填
- 补充登录降级与设置持久化相关测试
2026-06-18 22:11:53 +08:00

146 lines
5.3 KiB
Python

from __future__ import annotations
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool
from app.db.base import Base
from app.schemas.auth import LoginRequest
from app.schemas.settings import SettingsWrite
from app.services.auth import AuthService, AuthenticatedUser
from app.services.employee import EmployeeService
from app.services.settings import SettingsService
def build_session() -> Session:
    """Return a new session bound to a throwaway in-memory SQLite database.

    The engine is created with ``StaticPool`` and ``check_same_thread``
    disabled, and every table registered on ``Base.metadata`` is created
    before the session is handed out.
    """
    memory_engine = create_engine(
        "sqlite+pysqlite:///:memory:",
        poolclass=StaticPool,
        connect_args={"check_same_thread": False},
    )
    # The schema must exist before any service touches the session.
    Base.metadata.create_all(bind=memory_engine)
    return sessionmaker(
        bind=memory_engine,
        autocommit=False,
        autoflush=False,
    )()
def test_employee_can_login_with_seed_default_password() -> None:
    """Log in as the first listed employee using the password ``123456``.

    The returned user must mirror the employee's email, name, position and
    grade, carry at least one role code, and not be flagged as an admin.
    """
    with build_session() as db:
        first_employee = EmployeeService(db).list_employees()[0]

        auth = AuthService(db)
        credentials = LoginRequest(username=first_employee.email, password="123456")
        outcome = auth.login(credentials)

        assert outcome.ok is True

        logged_in = outcome.user
        assert logged_in.username == first_employee.email
        assert logged_in.name == first_employee.name
        assert logged_in.position == first_employee.position
        assert logged_in.grade == first_employee.grade
        assert logged_in.roleCodes
        assert logged_in.isAdmin is False
def test_current_user_snapshot_refreshes_employee_position() -> None:
    """Check that ``get_user_snapshot`` reflects the employee's directory data.

    The snapshot looked up by the first listed employee's email must exist
    and match that employee's email, name, department, position and grade.
    """
    with build_session() as db:
        first_employee = EmployeeService(db).list_employees()[0]
        snapshot = AuthService(db).get_user_snapshot(first_employee.email)

        assert snapshot is not None
        assert snapshot.username == first_employee.email
        assert snapshot.name == first_employee.name
        assert snapshot.department == first_employee.department
        assert snapshot.position == first_employee.position
        assert snapshot.grade == first_employee.grade
def test_admin_can_login_with_database_password() -> None:
    """Save admin credentials through the settings service, then log in.

    The current settings snapshot is dumped to a dict, its ``adminForm``
    section is overwritten with a new account and password, and the result is
    saved back. Logging in with those credentials must yield an admin user.
    """
    account = "superadmin"
    password = "admin123"

    with build_session() as db:
        settings = SettingsService(db)
        snapshot = settings.get_settings_snapshot().model_dump()

        admin_form = snapshot["adminForm"]
        admin_form["adminAccount"] = account
        admin_form["newPassword"] = password
        admin_form["confirmPassword"] = password
        settings.save_settings_snapshot(SettingsWrite(**snapshot))

        auth = AuthService(db)
        outcome = auth.login(LoginRequest(username=account, password=password))

        assert outcome.ok is True

        admin_user = outcome.user
        assert admin_user.username == "superadmin"
        assert admin_user.isAdmin is True
        assert admin_user.position == "系统管理员"
        assert admin_user.roleCodes == ["manager"]
def test_disabled_employee_cannot_login() -> None:
    """A disabled employee's login must raise ``ValueError``.

    The first listed employee is disabled and then tries to log in; the
    raised error message must contain the account-or-password-error text.
    """
    with build_session() as db:
        directory = EmployeeService(db)
        target = directory.list_employees()[0]
        directory.disable_employee(target.id)

        rejection = None
        try:
            AuthService(db).login(
                LoginRequest(username=target.email, password="123456")
            )
        except ValueError as exc:
            rejection = exc

        # A login that returns normally means the disabled flag was ignored.
        if rejection is None:
            raise AssertionError("disabled employee login should be rejected")
        assert "账号或密码错误" in str(rejection)
def test_reenabled_employee_can_login_again() -> None:
    """Disabling and then re-enabling an employee leaves login working."""
    with build_session() as db:
        directory = EmployeeService(db)
        target = directory.list_employees()[0]

        # Toggle the account off and back on before attempting the login.
        directory.disable_employee(target.id)
        directory.enable_employee(target.id)

        auth = AuthService(db)
        credentials = LoginRequest(username=target.email, password="123456")
        outcome = auth.login(credentials)

        assert outcome.ok is True
        assert outcome.user.username == target.email
def test_employee_login_skips_directory_bootstrap_when_employee_exists(monkeypatch) -> None:
    """``_authenticate_employee`` must not bootstrap the directory on a hit.

    The employee lookup, password verification and user construction are all
    stubbed, and ``EmployeeService.ensure_directory_ready`` is replaced with a
    function that records the call and fails. The test passes only when a
    user is returned and that replacement was never invoked.
    """
    with build_session() as db:
        service = AuthService(db)
        bootstrap_calls: list[str] = []

        class StubEmployee:
            # Minimal attribute set supplied to the stubbed login path.
            email = "demo@example.com"
            password_hash = "hash"
            employment_status = "在职"

        def find_stub_employee(self, _):
            return StubEmployee()

        def accept_any_password(password, password_hash):
            return True

        def build_stub_user(self, employee):
            return AuthenticatedUser(
                username=employee.email,
                name="Demo",
                role="使用者",
                department="",
                position="",
                grade="",
                employee_no="",
                manager_name="",
                location="",
                cost_center="",
                finance_owner_name="",
                risk_profile={},
                role_codes=["user"],
                email=employee.email,
                avatar="D",
            )

        def forbid_bootstrap(self) -> None:
            bootstrap_calls.append("ensure_directory_ready")
            raise AssertionError("existing employee login should not run directory bootstrap")

        monkeypatch.setattr(AuthService, "_find_employee_by_email", find_stub_employee)
        monkeypatch.setattr("app.services.auth.verify_password", accept_any_password)
        monkeypatch.setattr(AuthService, "_build_employee_user", build_stub_user)
        monkeypatch.setattr(EmployeeService, "ensure_directory_ready", forbid_bootstrap)

        authenticated = service._authenticate_employee("demo@example.com", "123456")

        assert authenticated is not None
        assert authenticated.username == "demo@example.com"
        assert bootstrap_calls == []