from __future__ import annotations

from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Any

from sqlalchemy import func, or_, select
from sqlalchemy.orm import Session, selectinload

from app.core.config import get_settings
from app.core.logging import get_logger
from app.core.security import verify_password
from app.models.employee import Employee
from app.models.financial_record import ExpenseClaim
from app.schemas.auth import AuthUserRead, LoginRequest, LoginResponse
from app.services.employee import EmployeeService
from app.services.employee_seed import ROLE_DISPLAY_ORDER
from app.services.settings import SettingsService

logger = get_logger("app.services.auth")

# Display label for each role code; looked up with the primary role code of
# an authenticated employee.
ROLE_LABELS = {
    "manager": "管理员",
    "finance": "财务人员",
    "executive": "高级管理人员",
    "approver": "审批负责人",
    "auditor": "审计观察员",
    "user": "使用者",
}

# Look-back window (days) applied to ExpenseClaim.occurred_at when building
# the risk profile; also reported as "windowDays".
_RISK_WINDOW_DAYS = 90
# Maximum number of claims loaded for one risk profile.
_RISK_CLAIM_LIMIT = 30
# Maximum number of distinct risk flags reported in "recentRiskFlags".
_RISK_FLAG_LIMIT = 6


@dataclass(slots=True)
class AuthenticatedUser:
    """Internal representation of a successfully authenticated principal.

    Instances are built by ``AuthService`` for both the administrator account
    and directory employees, then converted to ``AuthUserRead`` by
    ``AuthService._serialize_user``.
    """

    username: str
    name: str
    role: str
    department: str
    position: str
    grade: str
    employee_no: str
    manager_name: str
    location: str
    cost_center: str
    finance_owner_name: str
    risk_profile: dict[str, Any]
    role_codes: list[str]
    email: str
    avatar: str
    is_admin: bool = False


class AuthService:
    """Authenticate administrator and employee logins against the database."""

    def __init__(self, db: Session) -> None:
        """Store the session and load application settings.

        Args:
            db: SQLAlchemy session used for every lookup made by the service.
        """
        self.db = db
        self.settings = get_settings()

    def login(self, payload: LoginRequest) -> LoginResponse:
        """Authenticate *payload* as an administrator first, then as an employee.

        Args:
            payload: Login request; ``username`` is stripped of surrounding
                whitespace before use, ``password`` is used as given.

        Returns:
            A ``LoginResponse`` wrapping the serialized authenticated user.

        Raises:
            ValueError: If neither the administrator nor the employee check
                accepts the credentials.
        """
        identifier = payload.username.strip()
        password = payload.password

        admin_user = self._authenticate_admin(identifier, password)
        if admin_user is not None:
            logger.info("Admin login succeeded identifier=%s", identifier)
            return LoginResponse(user=self._serialize_user(admin_user))

        employee_user = self._authenticate_employee(identifier, password)
        if employee_user is not None:
            logger.info(
                "Employee login succeeded identifier=%s role_codes=%s",
                identifier,
                ",".join(employee_user.role_codes),
            )
            return LoginResponse(user=self._serialize_user(employee_user))

        logger.warning("Login failed identifier=%s", identifier)
        raise ValueError("账号或密码错误。")

    def _authenticate_admin(self, identifier: str, password: str) -> AuthenticatedUser | None:
        """Return the administrator user when ``SettingsService`` accepts the login.

        Returns:
            An ``AuthenticatedUser`` flagged ``is_admin=True`` with the single
            role code ``"manager"``, or ``None`` when
            ``SettingsService.verify_admin_login`` returns ``None``.
        """
        record = SettingsService(self.db).verify_admin_login(identifier, password)
        if record is None:
            return None

        # NOTE(review): the record's field types are not visible here; treat a
        # missing account/email as empty instead of failing on ``.strip()``.
        admin_username = (record.account or "").strip()
        admin_email = (record.email or "").strip()
        display_name = admin_username or admin_email or "系统管理员"

        return AuthenticatedUser(
            username=admin_username or admin_email,
            name=display_name,
            role="管理员",
            department="",
            position="系统管理员",
            grade="",
            employee_no="",
            manager_name="",
            location="",
            cost_center="",
            finance_owner_name="",
            risk_profile={},
            role_codes=["manager"],
            email=admin_email or f"{admin_username}@local",
            avatar=display_name[:1].upper(),
            is_admin=True,
        )

    def _authenticate_employee(self, identifier: str, password: str) -> AuthenticatedUser | None:
        """Return the employee whose email matches *identifier*, if the password verifies.

        The email comparison is case-insensitive. ``None`` is returned when
        setup is not completed, no employee matches, the employee has no
        password hash, the employment status is ``"停用"`` (logged as a
        disabled account), or password verification fails.
        """
        if not self.settings.setup_completed:
            return None

        EmployeeService(self.db).ensure_directory_ready()

        stmt = (
            select(Employee)
            .options(
                selectinload(Employee.organization_unit),
                selectinload(Employee.manager),
                selectinload(Employee.roles),
            )
            .where(func.lower(Employee.email) == identifier.lower())
        )
        employee = self.db.execute(stmt).scalars().first()

        if employee is None or not employee.password_hash:
            return None
        if employee.employment_status == "停用":
            logger.warning("Disabled employee login blocked identifier=%s", identifier)
            return None
        if not verify_password(password, employee.password_hash):
            return None

        # Order roles by configured display order (unknown codes last), then
        # by name; the first role becomes the primary one.
        sorted_roles = sorted(
            employee.roles,
            key=lambda item: (ROLE_DISPLAY_ORDER.get(item.role_code, 999), item.name),
        )
        role_codes = [role.role_code for role in sorted_roles]
        primary_role_code = role_codes[0] if role_codes else "user"

        department = (
            employee.organization_unit.name if employee.organization_unit is not None else ""
        )
        manager_name = self._resolve_manager_name(employee)

        return AuthenticatedUser(
            username=employee.email,
            name=employee.name,
            role=ROLE_LABELS.get(primary_role_code, "使用者"),
            department=department,
            position=employee.position,
            grade=employee.grade,
            employee_no=employee.employee_no,
            manager_name=manager_name,
            location=employee.location or "",
            cost_center=employee.cost_center or "",
            finance_owner_name=employee.finance_owner_name or "",
            risk_profile=self._build_risk_profile(employee),
            role_codes=role_codes or ["user"],
            email=employee.email,
            avatar=(employee.name or "?")[:1].upper(),
            is_admin=False,
        )

    @staticmethod
    def _resolve_manager_name(employee: Employee) -> str:
        """Return the employee's manager name, or ``""`` when none is available.

        The direct manager's name is preferred; otherwise the organization
        unit's ``manager_name`` is used. The result is stripped of surrounding
        whitespace.
        """
        if employee.manager is not None and employee.manager.name:
            return str(employee.manager.name).strip()
        if employee.organization_unit is not None and employee.organization_unit.manager_name:
            return str(employee.organization_unit.manager_name).strip()
        return ""

    def _build_risk_profile(self, employee: Employee) -> dict[str, Any]:
        """Summarize the employee's recent expense claims.

        Claims are selected when ``employee_id`` equals the employee's id or
        ``employee_name`` equals the employee's name, email, or employee
        number, and ``occurred_at`` falls within the last
        ``_RISK_WINDOW_DAYS`` days. At most ``_RISK_CLAIM_LIMIT`` claims are
        loaded, newest first.

        Returns:
            A dict with the keys ``windowDays``, ``totalClaimCount``,
            ``riskyClaimCount``, ``draftClaimCount``, ``recentRiskFlags`` and
            ``lastClaimAt`` (ISO timestamp of the newest claim, or ``""``).
        """
        since = datetime.now(UTC) - timedelta(days=_RISK_WINDOW_DAYS)

        identity_values = [
            str(employee.name or "").strip(),
            str(employee.email or "").strip(),
            str(employee.employee_no or "").strip(),
        ]
        # dict.fromkeys removes duplicates while keeping first-seen order.
        name_candidates = [item for item in dict.fromkeys(identity_values) if item]

        conditions = [ExpenseClaim.employee_id == employee.id]
        if name_candidates:
            conditions.append(ExpenseClaim.employee_name.in_(name_candidates))

        stmt = (
            select(ExpenseClaim)
            .where(or_(*conditions), ExpenseClaim.occurred_at >= since)
            .order_by(ExpenseClaim.occurred_at.desc())
            .limit(_RISK_CLAIM_LIMIT)
        )
        claims = list(self.db.scalars(stmt).all())

        return {
            "windowDays": _RISK_WINDOW_DAYS,
            "totalClaimCount": len(claims),
            "riskyClaimCount": sum(1 for claim in claims if claim.risk_flags_json),
            "draftClaimCount": sum(1 for claim in claims if claim.status == "draft"),
            "recentRiskFlags": self._collect_recent_risk_flags(claims, _RISK_FLAG_LIMIT),
            "lastClaimAt": (
                claims[0].occurred_at.isoformat() if claims and claims[0].occurred_at else ""
            ),
        }

    @staticmethod
    def _collect_recent_risk_flags(claims: list[ExpenseClaim], limit: int) -> list[str]:
        """Return up to *limit* distinct, non-blank risk flags in encounter order.

        Flags are read from each claim's ``risk_flags_json`` (treated as empty
        when falsy), converted to ``str`` and stripped before de-duplication.
        """
        collected: list[str] = []
        for claim in claims:
            for flag in claim.risk_flags_json or []:
                normalized = str(flag or "").strip()
                if not normalized or normalized in collected:
                    continue
                collected.append(normalized)
                if len(collected) >= limit:
                    # Limit reached: stop scanning the remaining claims.
                    return collected
        return collected

    @staticmethod
    def _serialize_user(user: AuthenticatedUser) -> AuthUserRead:
        """Map an ``AuthenticatedUser`` onto the ``AuthUserRead`` response schema.

        ``department`` is emitted under both ``department`` and
        ``departmentName``.
        """
        return AuthUserRead(
            username=user.username,
            name=user.name,
            role=user.role,
            department=user.department,
            departmentName=user.department,
            position=user.position,
            grade=user.grade,
            employeeNo=user.employee_no,
            managerName=user.manager_name,
            location=user.location,
            costCenter=user.cost_center,
            financeOwnerName=user.finance_owner_name,
            riskProfile=user.risk_profile,
            roleCodes=user.role_codes,
            email=user.email,
            avatar=user.avatar,
            isAdmin=user.is_admin,
        )