from __future__ import annotations from dataclasses import dataclass from sqlalchemy import func, select from sqlalchemy.orm import Session, selectinload from app.core.config import get_settings from app.core.logging import get_logger from app.core.security import verify_password from app.models.employee import Employee from app.schemas.auth import AuthUserRead, LoginRequest, LoginResponse from app.services.employee import EmployeeService from app.services.employee_seed import ROLE_DISPLAY_ORDER from app.services.settings import SettingsService logger = get_logger("app.services.auth") ROLE_LABELS = { "manager": "管理员", "finance": "财务人员", "executive": "高级管理人员", "approver": "审批负责人", "auditor": "审计观察员", "user": "使用者", } @dataclass(slots=True) class AuthenticatedUser: username: str name: str role: str position: str grade: str role_codes: list[str] email: str avatar: str is_admin: bool = False class AuthService: def __init__(self, db: Session) -> None: self.db = db self.settings = get_settings() def login(self, payload: LoginRequest) -> LoginResponse: identifier = payload.username.strip() password = payload.password admin_user = self._authenticate_admin(identifier, password) if admin_user is not None: logger.info("Admin login succeeded identifier=%s", identifier) return LoginResponse(user=self._serialize_user(admin_user)) employee_user = self._authenticate_employee(identifier, password) if employee_user is not None: logger.info( "Employee login succeeded identifier=%s role_codes=%s", identifier, ",".join(employee_user.role_codes), ) return LoginResponse(user=self._serialize_user(employee_user)) logger.warning("Login failed identifier=%s", identifier) raise ValueError("账号或密码错误。") def _authenticate_admin(self, identifier: str, password: str) -> AuthenticatedUser | None: record = SettingsService(self.db).verify_admin_login(identifier, password) if record is None: return None admin_username = record.account.strip() admin_email = record.email.strip() display_name = admin_username or admin_email or "系统管理员" return AuthenticatedUser( username=admin_username or admin_email, name=display_name, role="管理员", position="系统管理员", grade="", role_codes=["manager"], email=admin_email or f"{admin_username}@local", avatar=display_name[:1].upper(), is_admin=True, ) def _authenticate_employee(self, identifier: str, password: str) -> AuthenticatedUser | None: if not self.settings.setup_completed: return None EmployeeService(self.db).ensure_directory_ready() stmt = ( select(Employee) .options(selectinload(Employee.roles)) .where(func.lower(Employee.email) == identifier.lower()) ) employee = self.db.execute(stmt).scalars().first() if employee is None or not employee.password_hash: return None if employee.employment_status == "停用": logger.warning("Disabled employee login blocked identifier=%s", identifier) return None if not verify_password(password, employee.password_hash): return None sorted_roles = sorted( list(employee.roles), key=lambda item: (ROLE_DISPLAY_ORDER.get(item.role_code, 999), item.name), ) role_codes = [role.role_code for role in sorted_roles] primary_role_code = role_codes[0] if role_codes else "user" return AuthenticatedUser( username=employee.email, name=employee.name, role=ROLE_LABELS.get(primary_role_code, "使用者"), position=employee.position, grade=employee.grade, role_codes=role_codes or ["user"], email=employee.email, avatar=(employee.name or "?")[:1].upper(), is_admin=False, ) @staticmethod def _serialize_user(user: AuthenticatedUser) -> AuthUserRead: return AuthUserRead( username=user.username, name=user.name, role=user.role, position=user.position, grade=user.grade, roleCodes=user.role_codes, email=user.email, avatar=user.avatar, isAdmin=user.is_admin, )