- employee/settings/user_session_metrics 的 ensure_*_ready 改为按 bind 缓存 + 锁, 避免每次登录重复建表与并发场景下的竞态 - auth 登录链路先查员工再降级触发目录就绪,并吞掉查询期 SQLAlchemy 异常 - 默认管理员账号由 superadmin 迁移为 admin,兼容历史账号回填 - 补充登录降级与设置持久化相关测试
146 lines
5.3 KiB
Python
146 lines
5.3 KiB
Python
from __future__ import annotations
|
|
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import Session, sessionmaker
|
|
from sqlalchemy.pool import StaticPool
|
|
|
|
from app.db.base import Base
|
|
from app.schemas.auth import LoginRequest
|
|
from app.schemas.settings import SettingsWrite
|
|
from app.services.auth import AuthService, AuthenticatedUser
|
|
from app.services.employee import EmployeeService
|
|
from app.services.settings import SettingsService
|
|
|
|
|
|
def build_session() -> Session:
    """Return a new session bound to a freshly created in-memory SQLite database.

    The engine is configured with ``StaticPool`` and ``check_same_thread``
    disabled, and every table registered on ``Base.metadata`` is created
    before the session is handed out.
    """
    memory_engine = create_engine(
        "sqlite+pysqlite:///:memory:",
        poolclass=StaticPool,
        connect_args={"check_same_thread": False},
    )
    # Create the schema up front so each test starts from an empty but
    # fully defined database.
    Base.metadata.create_all(bind=memory_engine)
    make_session = sessionmaker(
        autocommit=False,
        autoflush=False,
        bind=memory_engine,
    )
    return make_session()
|
|
|
|
|
|
def test_employee_can_login_with_seed_default_password() -> None:
    """The first listed employee can log in with their email and "123456".

    The returned user must mirror the employee's email, name, position and
    grade, carry at least one role code, and not be flagged as an admin.
    """
    with build_session() as db:
        first_employee = EmployeeService(db).list_employees()[0]
        auth = AuthService(db)
        credentials = LoginRequest(username=first_employee.email, password="123456")
        outcome = auth.login(credentials)

        assert outcome.ok is True
        logged_in = outcome.user
        assert logged_in.username == first_employee.email
        assert logged_in.name == first_employee.name
        assert logged_in.position == first_employee.position
        assert logged_in.grade == first_employee.grade
        assert logged_in.roleCodes
        assert logged_in.isAdmin is False
|
|
|
|
|
|
def test_current_user_snapshot_refreshes_employee_position() -> None:
    """A user snapshot looked up by email matches the employee record.

    Checks the username against the employee email, then the name,
    department, position and grade fields one by one.
    """
    with build_session() as db:
        first_employee = EmployeeService(db).list_employees()[0]
        snapshot = AuthService(db).get_user_snapshot(first_employee.email)

        assert snapshot is not None
        assert snapshot.username == first_employee.email
        # Same-named attributes are compared in the original order.
        for field_name in ("name", "department", "position", "grade"):
            assert getattr(snapshot, field_name) == getattr(first_employee, field_name)
|
|
|
|
|
|
def test_admin_can_login_with_database_password() -> None:
    """An admin account saved through the settings service can log in.

    The current settings snapshot is dumped, its ``adminForm`` section is
    pointed at the "superadmin" account with a new password, and the result
    is saved. Logging in with those credentials must then yield an admin
    user with the expected position and role codes.
    """
    admin_account = "superadmin"
    admin_password = "admin123"

    with build_session() as db:
        settings = SettingsService(db)
        snapshot_data = settings.get_settings_snapshot().model_dump()
        admin_form = snapshot_data["adminForm"]
        admin_form["adminAccount"] = admin_account
        admin_form["newPassword"] = admin_password
        admin_form["confirmPassword"] = admin_password
        settings.save_settings_snapshot(SettingsWrite(**snapshot_data))

        auth = AuthService(db)
        outcome = auth.login(
            LoginRequest(username=admin_account, password=admin_password)
        )

        assert outcome.ok is True
        admin_user = outcome.user
        assert admin_user.username == "superadmin"
        assert admin_user.isAdmin is True
        assert admin_user.position == "系统管理员"
        assert admin_user.roleCodes == ["manager"]
|
|
|
|
|
|
def test_disabled_employee_cannot_login() -> None:
    """Logging in as a disabled employee raises ``ValueError``.

    The error text must contain the generic "wrong account or password"
    message; a login that does not raise fails the test explicitly.
    """
    with build_session() as db:
        employees = EmployeeService(db)
        target = employees.list_employees()[0]
        employees.disable_employee(target.id)

        rejection = None
        try:
            AuthService(db).login(LoginRequest(username=target.email, password="123456"))
        except ValueError as exc:
            # Keep a reference: ``exc`` is unbound once the handler exits.
            rejection = exc

        if rejection is None:
            raise AssertionError("disabled employee login should be rejected")
        assert "账号或密码错误" in str(rejection)
|
|
|
|
|
|
def test_reenabled_employee_can_login_again() -> None:
    """An employee who is disabled and then enabled again can log in."""
    with build_session() as db:
        employees = EmployeeService(db)
        target = employees.list_employees()[0]
        # Toggle the account off and back on before attempting the login.
        employees.disable_employee(target.id)
        employees.enable_employee(target.id)

        auth = AuthService(db)
        credentials = LoginRequest(username=target.email, password="123456")
        outcome = auth.login(credentials)

        assert outcome.ok is True
        assert outcome.user.username == target.email
|
|
|
|
|
|
def test_employee_login_skips_directory_bootstrap_when_employee_exists(monkeypatch) -> None:
    """Authenticating an employee that is found must not bootstrap the directory.

    The employee lookup, password verification and user construction on
    ``AuthService`` are stubbed out, and ``EmployeeService.ensure_directory_ready``
    is replaced with a function that records the call and fails. The test
    passes only if ``_authenticate_employee`` returns the stubbed user
    without that function ever being invoked.
    """
    with build_session() as db:
        service = AuthService(db)
        calls: list[str] = []

        class ExistingEmployee:
            # Minimal stand-in exposing the attributes set for this test.
            email = "demo@example.com"
            password_hash = "hash"
            employment_status = "在职"

        def fail_if_bootstrapped(self) -> None:
            # Record the call before failing so the final assertion on
            # ``calls`` also reports an unwanted bootstrap.
            calls.append("ensure_directory_ready")
            raise AssertionError("existing employee login should not run directory bootstrap")

        def find_existing_employee(self, _):
            return ExistingEmployee()

        def accept_any_password(password, password_hash):
            return True

        def build_stub_user(self, employee):
            user_fields = {
                "username": employee.email,
                "name": "Demo",
                "role": "使用者",
                "department": "",
                "position": "",
                "grade": "",
                "employee_no": "",
                "manager_name": "",
                "location": "",
                "cost_center": "",
                "finance_owner_name": "",
                "risk_profile": {},
                "role_codes": ["user"],
                "email": employee.email,
                "avatar": "D",
            }
            return AuthenticatedUser(**user_fields)

        monkeypatch.setattr(AuthService, "_find_employee_by_email", find_existing_employee)
        monkeypatch.setattr("app.services.auth.verify_password", accept_any_password)
        monkeypatch.setattr(AuthService, "_build_employee_user", build_stub_user)
        monkeypatch.setattr(EmployeeService, "ensure_directory_ready", fail_if_bootstrapped)

        authenticated = service._authenticate_employee("demo@example.com", "123456")

        assert authenticated is not None
        assert authenticated.username == "demo@example.com"
        assert calls == []
|