from __future__ import annotations from collections import Counter from datetime import date, datetime from typing import Any from sqlalchemy.orm import Session from app.core.config import get_settings from app.core.logging import get_logger from app.db.base import Base from app.db.session import get_session_factory from app.models.employee import Employee from app.models.employee_change_log import EmployeeChangeLog from app.models.organization import OrganizationUnit from app.models.role import Role from app.repositories.employee import EmployeeRepository from app.schemas.employee import ( EmployeeCreate, EmployeeHistoryRead, EmployeeMetaRead, EmployeeOrganizationRead, EmployeeRead, EmployeeRoleOptionRead, EmployeeStatusSummaryRead, ) from app.services.employee_seed import ( EMPLOYEE_DEFINITIONS, ORGANIZATION_DEFINITIONS, ROLE_DEFINITIONS, ROLE_DISPLAY_ORDER, ROLE_PERMISSION_MAP, ) logger = get_logger("app.services.employee") STATUS_TONE_MAP = { "在职": "success", "试用中": "warning", "停用": "neutral", } STATUS_ORDER = ["全部员工", "在职", "试用中", "停用"] SEEDED_EMPLOYEE_DEFINITIONS = EMPLOYEE_DEFINITIONS[:30] EXTRA_SEED_EMPLOYEE_NOS = {item["employee_no"] for item in EMPLOYEE_DEFINITIONS[30:]} def prepare_employee_directory() -> None: settings = get_settings() if not settings.setup_completed: logger.info("Employee directory bootstrap skipped because setup is incomplete") return session_factory = get_session_factory() with session_factory() as db: EmployeeService(db).ensure_directory_ready() class EmployeeService: def __init__(self, db: Session) -> None: self.db = db self.repository = EmployeeRepository(db) def ensure_directory_ready(self) -> None: try: Base.metadata.create_all(bind=self.db.get_bind()) self._prune_extra_seed_employees() self._seed_roles() self._seed_organization_units() self._seed_employees() self.db.commit() except Exception: self.db.rollback() logger.exception("Failed to prepare employee directory") raise def list_employees(self, status: str | None = None, keyword: str | None = None) -> list[EmployeeRead]: self.ensure_directory_ready() employees = self.repository.list(status=status, keyword=keyword) logger.info("Listed employees (count=%d, status=%s, keyword=%s)", len(employees), status, keyword) return [self._serialize_employee(item) for item in employees] def get_employee(self, employee_id: str) -> EmployeeRead | None: self.ensure_directory_ready() employee = self.repository.get(employee_id) if employee is None: logger.warning("Employee not found id=%s", employee_id) return None logger.info("Fetched employee id=%s name=%s", employee_id, employee.name) return self._serialize_employee(employee) def get_employee_meta(self) -> EmployeeMetaRead: self.ensure_directory_ready() employees = self.repository.list() status_counter = Counter(item.employment_status for item in employees) status_summary = [ EmployeeStatusSummaryRead( id=status, label=status, count=len(employees) if status == "全部员工" else status_counter.get(status, 0), ) for status in STATUS_ORDER ] role_options = [ EmployeeRoleOptionRead( id=role.role_code, code=role.role_code, label=role.name, desc=role.description, permissions=list(ROLE_PERMISSION_MAP.get(role.role_code, [])), ) for role in self._sorted_roles(self.repository.list_roles()) ] return EmployeeMetaRead( totalEmployees=len(employees), statusSummary=status_summary, roleOptions=role_options, ) def create_employee(self, payload: EmployeeCreate) -> EmployeeRead: self.ensure_directory_ready() if self.repository.get_by_employee_no(payload.employee_no): raise ValueError(f"员工编号 {payload.employee_no} 已存在") if self.repository.get_by_email(str(payload.email)): raise ValueError(f"邮箱 {payload.email} 已存在") employee = Employee( employee_no=payload.employee_no, name=payload.name, email=str(payload.email), gender=payload.gender, birth_date=payload.parsed_birth_date(), phone=payload.phone, join_date=payload.parsed_join_date(), location=payload.location, position=payload.position, grade=payload.grade, cost_center=payload.cost_center, finance_owner_name=payload.finance_owner_name, employment_status=payload.employment_status, sync_state=payload.sync_state, spotlight=payload.spotlight, last_sync_at=datetime.now(), ) if payload.organization_unit_code: employee.organization_unit = self.repository.get_organization_by_code(payload.organization_unit_code) if payload.manager_employee_no: employee.manager = self.repository.get_by_employee_no(payload.manager_employee_no) roles = [ role for code in payload.role_codes if (role := self.repository.get_role_by_code(code)) is not None ] employee.roles = self._sorted_roles(roles) created = self.repository.create(employee) logger.info( "Created employee id=%s no=%s name=%s", created.id, created.employee_no, created.name ) hydrated = self.repository.get(created.id) return self._serialize_employee(hydrated or created) def _seed_roles(self) -> None: existing_by_code = {role.role_code: role for role in self.repository.list_roles()} for definition in ROLE_DEFINITIONS: role = existing_by_code.get(definition["role_code"]) if role is None: role = Role( role_code=definition["role_code"], name=definition["name"], description=definition["description"], ) self.db.add(role) existing_by_code[role.role_code] = role self.db.flush() def _seed_organization_units(self) -> None: existing_by_code = { unit.unit_code: unit for unit in self.repository.list_organization_units() } for definition in ORGANIZATION_DEFINITIONS: organization = existing_by_code.get(definition["unit_code"]) if organization is None: organization = OrganizationUnit( unit_code=definition["unit_code"], name=definition["name"], unit_type=definition["unit_type"], cost_center=definition.get("cost_center"), location=definition.get("location"), manager_name=definition.get("manager_name"), ) self.db.add(organization) existing_by_code[organization.unit_code] = organization self.db.flush() for definition in ORGANIZATION_DEFINITIONS: parent_code = definition.get("parent_code") if not parent_code: continue organization = existing_by_code[definition["unit_code"]] if organization.parent_id: continue parent = existing_by_code.get(parent_code) if parent is not None: organization.parent = parent self.db.flush() def _seed_employees(self) -> None: employees_by_no = { employee.employee_no: employee for employee in self.repository.list() } roles_by_code = {role.role_code: role for role in self.repository.list_roles()} organizations_by_code = { unit.unit_code: unit for unit in self.repository.list_organization_units() } for definition in SEEDED_EMPLOYEE_DEFINITIONS: employee_no = definition["employee_no"] if employee_no in employees_by_no: continue employee = Employee( employee_no=employee_no, name=definition["name"], email=definition["email"], gender=definition.get("gender"), birth_date=self._parse_date(definition.get("birth_date")), phone=definition.get("phone"), join_date=self._parse_date(definition.get("join_date")), location=definition.get("location"), position=definition.get("position", "员工"), grade=definition.get("grade", "P3"), cost_center=definition.get("cost_center"), finance_owner_name=definition.get("finance_owner_name"), employment_status=definition.get("employment_status", "在职"), sync_state=definition.get("sync_state", "已同步"), spotlight=bool(definition.get("spotlight")), last_sync_at=self._parse_datetime(definition.get("last_sync_at")), updated_at=self._parse_datetime(definition.get("updated_at")), ) self.db.add(employee) employees_by_no[employee_no] = employee self.db.flush() for definition in SEEDED_EMPLOYEE_DEFINITIONS: employee = employees_by_no[definition["employee_no"]] organization_code = definition.get("organization_unit_code") manager_employee_no = definition.get("manager_employee_no") if employee.organization_unit_id is None and organization_code: employee.organization_unit = organizations_by_code.get(organization_code) if employee.manager_id is None and manager_employee_no: employee.manager = employees_by_no.get(manager_employee_no) if not employee.roles: employee.roles = self._sorted_roles( [ roles_by_code[role_code] for role_code in definition.get("role_codes", []) if role_code in roles_by_code ] ) self._seed_employee_history(employee, definition) self.db.flush() def _prune_extra_seed_employees(self) -> None: if not EXTRA_SEED_EMPLOYEE_NOS: return for employee_no in EXTRA_SEED_EMPLOYEE_NOS: employee = self.repository.get_by_employee_no(employee_no) if employee is not None: self.db.delete(employee) def _seed_employee_history(self, employee: Employee, definition: dict[str, Any]) -> None: existing_keys = { (item.action, item.owner, self._format_datetime(item.occurred_at)) for item in employee.change_logs } history_items = list(definition.get("history", [])) if not history_items: history_items = [ { "action": "初始化员工档案", "owner": "系统初始化任务", "occurred_at": definition.get("updated_at") or definition.get("last_sync_at"), } ] for history in history_items: occurred_at = self._parse_datetime(history.get("occurred_at")) if occurred_at is None: continue identity = ( history["action"], history["owner"], self._format_datetime(occurred_at), ) if identity in existing_keys: continue self.db.add( EmployeeChangeLog( employee=employee, action=history["action"], owner=history["owner"], occurred_at=occurred_at, ) ) existing_keys.add(identity) def _serialize_employee(self, employee: Employee) -> EmployeeRead: organization = employee.organization_unit roles = self._sorted_roles(list(employee.roles)) role_labels = [role.name for role in roles] role_codes = [role.role_code for role in roles] history = [ EmployeeHistoryRead( action=item.action, owner=item.owner, time=self._format_datetime(item.occurred_at) or "", occurredAt=self._format_datetime(item.occurred_at) or "", ) for item in employee.change_logs ] return EmployeeRead( id=employee.id, avatar=(employee.name or "?")[:1], name=employee.name, employeeNo=employee.employee_no, department=organization.name if organization else "", position=employee.position, grade=employee.grade, manager=employee.manager.name if employee.manager else "CEO", financeOwner=employee.finance_owner_name or "", roles=role_labels, roleCodes=role_codes, status=employee.employment_status, statusTone=STATUS_TONE_MAP.get(employee.employment_status, "neutral"), gender=employee.gender, age=self._calculate_age(employee.birth_date), birthDate=self._format_date(employee.birth_date), email=employee.email, phone=employee.phone, joinDate=self._format_date(employee.join_date), location=employee.location, costCenter=employee.cost_center, updatedAt=self._format_datetime(employee.updated_at or employee.created_at), lastSync=self._format_datetime(employee.last_sync_at), syncState=employee.sync_state, spotlight=employee.spotlight, permissions=self._collect_permissions(role_codes), history=history, organization=( EmployeeOrganizationRead( id=organization.id, code=organization.unit_code, name=organization.name, unitType=organization.unit_type, costCenter=organization.cost_center, location=organization.location, managerName=organization.manager_name, ) if organization else None ), ) def _collect_permissions(self, role_codes: list[str]) -> list[str]: permissions: list[str] = [] seen: set[str] = set() for role_code in role_codes: for permission in ROLE_PERMISSION_MAP.get(role_code, []): if permission in seen: continue permissions.append(permission) seen.add(permission) return permissions def _sorted_roles(self, roles: list[Role]) -> list[Role]: return sorted(roles, key=lambda item: (ROLE_DISPLAY_ORDER.get(item.role_code, 999), item.name)) @staticmethod def _parse_date(value: str | None) -> date | None: if not value: return None return datetime.strptime(value, "%Y-%m-%d").date() @staticmethod def _parse_datetime(value: str | datetime | None) -> datetime | None: if value is None: return None if isinstance(value, datetime): return value return datetime.strptime(value, "%Y-%m-%d %H:%M") @staticmethod def _format_date(value: date | None) -> str | None: if value is None: return None return value.strftime("%Y-%m-%d") @staticmethod def _format_datetime(value: datetime | None) -> str | None: if value is None: return None return value.strftime("%Y-%m-%d %H:%M") @staticmethod def _calculate_age(birth_date: date | None) -> int | None: if birth_date is None: return None today = date.today() age = today.year - birth_date.year if (today.month, today.day) < (birth_date.month, birth_date.day): age -= 1 return age