fix(auth): 登录目录就绪幂等化与并发控制
- employee/settings/user_session_metrics 的 ensure_*_ready 改为按 bind 缓存 + 锁, 避免每次登录重复建表与并发场景下的竞态 - auth 登录链路先查员工再降级触发目录就绪,并吞掉查询期 SQLAlchemy 异常 - 默认管理员账号由 superadmin 迁移为 admin,兼容历史账号回填 - 补充登录降级与设置持久化相关测试
This commit is contained in:
@@ -7,7 +7,7 @@ from sqlalchemy.pool import StaticPool
|
||||
from app.db.base import Base
|
||||
from app.schemas.auth import LoginRequest
|
||||
from app.schemas.settings import SettingsWrite
|
||||
from app.services.auth import AuthService
|
||||
from app.services.auth import AuthService, AuthenticatedUser
|
||||
from app.services.employee import EmployeeService
|
||||
from app.services.settings import SettingsService
|
||||
|
||||
@@ -97,3 +97,49 @@ def test_reenabled_employee_can_login_again() -> None:
|
||||
|
||||
assert result.ok is True
|
||||
assert result.user.username == employee.email
|
||||
|
||||
|
||||
def test_employee_login_skips_directory_bootstrap_when_employee_exists(monkeypatch) -> None:
|
||||
with build_session() as db:
|
||||
service = AuthService(db)
|
||||
calls: list[str] = []
|
||||
|
||||
class ExistingEmployee:
|
||||
email = "demo@example.com"
|
||||
password_hash = "hash"
|
||||
employment_status = "在职"
|
||||
|
||||
def fail_if_bootstrapped(self) -> None:
|
||||
calls.append("ensure_directory_ready")
|
||||
raise AssertionError("existing employee login should not run directory bootstrap")
|
||||
|
||||
monkeypatch.setattr(AuthService, "_find_employee_by_email", lambda self, _: ExistingEmployee())
|
||||
monkeypatch.setattr("app.services.auth.verify_password", lambda password, password_hash: True)
|
||||
monkeypatch.setattr(
|
||||
AuthService,
|
||||
"_build_employee_user",
|
||||
lambda self, employee: AuthenticatedUser(
|
||||
username=employee.email,
|
||||
name="Demo",
|
||||
role="使用者",
|
||||
department="",
|
||||
position="",
|
||||
grade="",
|
||||
employee_no="",
|
||||
manager_name="",
|
||||
location="",
|
||||
cost_center="",
|
||||
finance_owner_name="",
|
||||
risk_profile={},
|
||||
role_codes=["user"],
|
||||
email=employee.email,
|
||||
avatar="D",
|
||||
),
|
||||
)
|
||||
monkeypatch.setattr(EmployeeService, "ensure_directory_ready", fail_if_bootstrapped)
|
||||
|
||||
user = service._authenticate_employee("demo@example.com", "123456")
|
||||
|
||||
assert user is not None
|
||||
assert user.username == "demo@example.com"
|
||||
assert calls == []
|
||||
|
||||
@@ -186,6 +186,23 @@ def test_legacy_setup_admin_password_is_migrated_to_database(monkeypatch) -> Non
|
||||
assert service.verify_admin_login("setup-admin", password) is not None
|
||||
|
||||
|
||||
def test_default_admin_credentials_are_written_to_database(monkeypatch) -> None:
|
||||
temp_dir = build_temp_secret_dir()
|
||||
monkeypatch.setattr(admin_secret, "ADMIN_SECRET_FILE", temp_dir / "missing-admin.json")
|
||||
monkeypatch.setattr(secret_box, "SECRET_KEY_FILE", temp_dir / "settings.key")
|
||||
monkeypatch.setattr(Base.metadata, "create_all", lambda *args, **kwargs: None)
|
||||
monkeypatch.setenv("HERMES_HOME", str(temp_dir / ".hermes"))
|
||||
|
||||
with build_session(temp_dir / "settings.db") as db:
|
||||
service = SettingsService(db)
|
||||
settings_row, secrets_row = service.ensure_settings_ready()
|
||||
|
||||
assert settings_row.admin_account == "admin"
|
||||
assert secrets_row.admin_password_hash
|
||||
assert service.verify_admin_login("admin", "admin") is not None
|
||||
assert service.verify_admin_login("superadmin", "admin") is None
|
||||
|
||||
|
||||
def test_settings_service_syncs_models_to_hermes_config(monkeypatch) -> None:
|
||||
temp_dir = build_temp_secret_dir()
|
||||
hermes_home = temp_dir / ".hermes"
|
||||
|
||||
Reference in New Issue
Block a user