Initialize admin bootstrap settings during startup, persist username support in auth flows, and align frontend auth requests with local API behavior.
61 lines
1.9 KiB
Python
61 lines
1.9 KiB
Python
from sqlalchemy import or_, select
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models.user import User
|
|
from app.services.auth_service import get_password_hash
|
|
|
|
|
|
def _is_bootstrap_enabled(settings) -> bool:
    """Return True when every admin bootstrap credential is configured.

    Bootstrap is enabled only if ``settings.ADMIN``, ``settings.ADMIN_EMAIL``
    and ``settings.ADMIN_PASSWORD`` are all non-blank once surrounding
    whitespace is stripped.

    Args:
        settings: Object exposing the ``ADMIN``, ``ADMIN_EMAIL`` and
            ``ADMIN_PASSWORD`` attributes.

    Returns:
        ``True`` if all three values are non-blank, otherwise ``False``.
    """
    required = ("ADMIN", "ADMIN_EMAIL", "ADMIN_PASSWORD")
    # ``or ""`` makes an unset (None) value count as blank instead of raising
    # AttributeError on ``.strip()``. The generator keeps evaluation lazy, so
    # attributes are read left to right and only until the first blank one.
    return all((getattr(settings, name) or "").strip() for name in required)
|
|
|
|
|
|
async def ensure_admin_user(db: AsyncSession, settings) -> None:
    """Create the bootstrap superuser described by ``settings`` if it is missing.

    Does nothing when bootstrap is disabled (see ``_is_bootstrap_enabled``) or
    when exactly one user already holds both the configured username and
    e-mail and is a superuser.

    Args:
        db: Async SQLAlchemy session used for the lookup and the insert.
        settings: Object exposing ``ADMIN``, ``ADMIN_EMAIL``,
            ``ADMIN_PASSWORD`` and ``ADMIN_FULL_NAME``.

    Raises:
        RuntimeError: The configured username and/or e-mail already belong to
            an account that is not the expected superuser, or are split
            across several accounts.
        IntegrityError: The insert failed and the expected superuser still
            does not exist afterwards.
    """
    if not _is_bootstrap_enabled(settings):
        return

    # Username and e-mail are compared and stored in stripped form.
    username = settings.ADMIN.strip()
    email = settings.ADMIN_EMAIL.strip()

    claimants = await _find_identity_claimants(db, username, email)
    if claimants:
        if _is_expected_admin(claimants, username, email):
            return
        raise RuntimeError('admin bootstrap identity conflict')

    admin_user = User(
        username=username,
        email=email,
        # The password is hashed exactly as configured (not stripped).
        hashed_password=get_password_hash(settings.ADMIN_PASSWORD),
        full_name=settings.ADMIN_FULL_NAME or None,
        is_active=True,
        is_superuser=True,
    )
    db.add(admin_user)
    try:
        await db.commit()
    except IntegrityError:
        await db.rollback()
        # The insert lost to an existing row (presumably a concurrent
        # bootstrap). If that row is the expected admin the goal is already
        # met; otherwise surface the original IntegrityError.
        claimants = await _find_identity_claimants(db, username, email)
        if _is_expected_admin(claimants, username, email):
            return
        raise
    await db.refresh(admin_user)


async def _find_identity_claimants(db: AsyncSession, username: str, email: str) -> list:
    """Return every user whose username or e-mail equals the given values."""
    result = await db.execute(
        select(User).where(or_(User.username == username, User.email == email))
    )
    # The OR filter may match two different rows (one by username, one by
    # e-mail), so collect all of them instead of using scalar_one_or_none(),
    # which raises MultipleResultsFound in that situation.
    return list(result.scalars().all())


def _is_expected_admin(claimants, username: str, email: str) -> bool:
    """Return True when the sole claimant is the configured superuser."""
    if len(claimants) != 1:
        return False
    user = claimants[0]
    return bool(
        user.username == username
        and user.email == email
        and user.is_superuser
    )
|