from __future__ import annotations from dataclasses import dataclass from datetime import datetime from sqlalchemy.orm import Session from app.core.admin_secret import read_admin_secret, verify_admin_secret from app.core.config import get_settings from app.core.secret_box import decrypt_secret, encrypt_secret from app.core.security import hash_password, verify_password from app.db.base import Base from app.models.system_setting import SystemSetting from app.models.system_setting_secret import SystemSettingSecret from app.repositories.settings import SETTINGS_ROW_ID, SettingsRepository from app.schemas.settings import SettingsRead, SettingsWrite MODEL_SECRET_FIELDS = { "main": "main_api_key_encrypted", "backup": "backup_api_key_encrypted", "vlm": "vlm_api_key_encrypted", "embedding": "embedding_api_key_encrypted", } @dataclass(slots=True) class AdminCredentialRecord: account: str email: str password_hash: str class SettingsService: def __init__(self, db: Session) -> None: self.db = db self.repository = SettingsRepository(db) self.runtime_settings = get_settings() def ensure_settings_ready(self) -> tuple[SystemSetting, SystemSettingSecret]: Base.metadata.create_all(bind=self.db.get_bind()) settings_row = self.repository.get_settings() secrets_row = self.repository.get_secrets() should_commit = False if settings_row is None: settings_row = self._build_default_settings() self.db.add(settings_row) should_commit = True if secrets_row is None: secrets_row = SystemSettingSecret(id=SETTINGS_ROW_ID) self.db.add(secrets_row) should_commit = True if should_commit: self.db.commit() self.db.refresh(settings_row) self.db.refresh(secrets_row) return settings_row, secrets_row def get_settings_snapshot(self) -> SettingsRead: settings_row, secrets_row = self.ensure_settings_ready() return self._serialize(settings_row, secrets_row) def save_settings_snapshot(self, payload: SettingsWrite) -> SettingsRead: settings_row, secrets_row = self.ensure_settings_ready() if payload.adminForm.newPassword: if len(payload.adminForm.newPassword) < 5: raise ValueError("管理员密码至少需要 5 位。") if payload.adminForm.newPassword != payload.adminForm.confirmPassword: raise ValueError("两次输入的管理员密码不一致。") secrets_row.admin_password_hash = hash_password(payload.adminForm.newPassword) settings_row.company_name = payload.companyForm.companyName settings_row.display_name = payload.companyForm.displayName settings_row.company_code = payload.companyForm.companyCode settings_row.record_number = payload.companyForm.recordNumber settings_row.copyright_text = payload.companyForm.copyright settings_row.admin_account = payload.adminForm.adminAccount settings_row.admin_email = payload.adminForm.adminEmail settings_row.session_timeout = payload.adminForm.sessionTimeout settings_row.notice_email = payload.adminForm.noticeEmail settings_row.mfa_enabled = payload.adminForm.mfaEnabled settings_row.strong_password = payload.adminForm.strongPassword settings_row.login_alert_enabled = payload.adminForm.loginAlertEnabled settings_row.main_provider = payload.llmForm.mainProvider settings_row.main_model = payload.llmForm.mainModel settings_row.main_endpoint = payload.llmForm.mainEndpoint settings_row.backup_provider = payload.llmForm.backupProvider settings_row.backup_model = payload.llmForm.backupModel settings_row.backup_endpoint = payload.llmForm.backupEndpoint settings_row.vlm_provider = payload.llmForm.vlmProvider settings_row.vlm_model = payload.llmForm.vlmModel settings_row.vlm_endpoint = payload.llmForm.vlmEndpoint settings_row.embedding_provider = payload.llmForm.embeddingProvider settings_row.embedding_model = payload.llmForm.embeddingModel settings_row.embedding_endpoint = payload.llmForm.embeddingEndpoint self._replace_secret_if_present(secrets_row, "main_api_key_encrypted", payload.llmForm.mainApiKey) self._replace_secret_if_present(secrets_row, "backup_api_key_encrypted", payload.llmForm.backupApiKey) self._replace_secret_if_present(secrets_row, "vlm_api_key_encrypted", payload.llmForm.vlmApiKey) self._replace_secret_if_present( secrets_row, "embedding_api_key_encrypted", payload.llmForm.embeddingApiKey, ) settings_row.log_level = payload.logForm.level settings_row.retention_days = payload.logForm.retentionDays settings_row.archive_cycle = payload.logForm.archiveCycle settings_row.log_path = payload.logForm.logPath settings_row.alert_email = payload.logForm.alertEmail settings_row.operation_audit = payload.logForm.operationAudit settings_row.login_audit = payload.logForm.loginAudit settings_row.mask_sensitive = payload.logForm.maskSensitive settings_row.smtp_host = payload.mailForm.smtpHost settings_row.smtp_port = payload.mailForm.port settings_row.smtp_encryption = payload.mailForm.encryption settings_row.sender_name = payload.mailForm.senderName settings_row.sender_address = payload.mailForm.senderAddress settings_row.smtp_username = payload.mailForm.username settings_row.alert_enabled = payload.mailForm.alertEnabled settings_row.digest_enabled = payload.mailForm.digestEnabled settings_row.digest_time = payload.mailForm.digestTime settings_row.default_receiver = payload.mailForm.defaultReceiver self._replace_secret_if_present(secrets_row, "smtp_password_encrypted", payload.mailForm.password) self.db.add(settings_row) self.db.add(secrets_row) self.db.commit() self.db.refresh(settings_row) self.db.refresh(secrets_row) return self._serialize(settings_row, secrets_row) def load_saved_model_api_key(self, slot: str | None) -> str: if not slot or slot not in MODEL_SECRET_FIELDS: return "" _, secrets_row = self.ensure_settings_ready() encrypted_value = getattr(secrets_row, MODEL_SECRET_FIELDS[slot], "") if not encrypted_value: return "" return decrypt_secret(encrypted_value) def get_admin_credentials(self) -> AdminCredentialRecord | None: settings_row, secrets_row = self.ensure_settings_ready() if secrets_row.admin_password_hash: return AdminCredentialRecord( account=settings_row.admin_account, email=settings_row.admin_email, password_hash=secrets_row.admin_password_hash, ) legacy_record = read_admin_secret() if legacy_record is None: return None username = str(legacy_record.get("username", "")).strip() email = str(settings_row.admin_email or self.runtime_settings.admin_email or "").strip() password_hash = "" # Legacy admin.json uses scrypt fields rather than the app password format. # The auth flow handles this file separately when no DB-backed admin password exists. if username or email: return AdminCredentialRecord(account=username, email=email, password_hash=password_hash) return None def verify_admin_login(self, identifier: str, password: str) -> AdminCredentialRecord | None: settings_row, secrets_row = self.ensure_settings_ready() normalized_identifier = identifier.casefold() if secrets_row.admin_password_hash: allowed_identifiers = { value.casefold() for value in [settings_row.admin_account, settings_row.admin_email] if value } if normalized_identifier not in allowed_identifiers: return None if not verify_password(password, secrets_row.admin_password_hash): return None return AdminCredentialRecord( account=settings_row.admin_account, email=settings_row.admin_email, password_hash=secrets_row.admin_password_hash, ) legacy_record = read_admin_secret() if legacy_record is None: return None admin_username = str(legacy_record.get("username", "")).strip() admin_email = str(settings_row.admin_email or self.runtime_settings.admin_email or "").strip() allowed_identifiers = { value.casefold() for value in [admin_username, admin_email] if value } if normalized_identifier not in allowed_identifiers: return None if not verify_admin_secret(password, legacy_record): return None return AdminCredentialRecord(account=admin_username, email=admin_email, password_hash="") def _build_default_settings(self) -> SystemSetting: current_year = datetime.now().year company_name = str(self.runtime_settings.company_name or "X-Financial").strip() or "X-Financial" company_code = str(self.runtime_settings.company_code or "XF-001").strip() or "XF-001" admin_email = str(self.runtime_settings.admin_email or "").strip() legacy_admin = read_admin_secret() or {} admin_account = str(legacy_admin.get("username", "")).strip() or "superadmin" return SystemSetting( id=SETTINGS_ROW_ID, company_name=company_name, display_name=company_name, company_code=company_code, record_number="", copyright_text=f"Copyright © 2024-{current_year} {company_name}. All Rights Reserved.", admin_account=admin_account, admin_email=admin_email, session_timeout=30, notice_email=admin_email, mfa_enabled=True, strong_password=True, login_alert_enabled=True, main_provider="Codex", main_model="codex-mini-latest", main_endpoint="https://api.openai.com/v1", backup_provider="GLM", backup_model="glm-5.1", backup_endpoint="https://open.bigmodel.cn/api/paas/v4/", vlm_provider="Gemini", vlm_model="gemini-2.5-flash", vlm_endpoint="https://generativelanguage.googleapis.com/v1beta/openai/", embedding_provider="GLM", embedding_model="Embedding-3", embedding_endpoint="https://open.bigmodel.cn/api/paas/v4/", log_level="INFO", retention_days=180, archive_cycle="weekly", log_path="server/logs/app.log", alert_email=admin_email, operation_audit=True, login_audit=True, mask_sensitive=True, smtp_host="smtp.exmail.qq.com", smtp_port=465, smtp_encryption="SSL/TLS", sender_name=company_name, sender_address=admin_email, smtp_username=admin_email, alert_enabled=True, digest_enabled=False, digest_time="09:00", default_receiver=admin_email, ) @staticmethod def _replace_secret_if_present(secret_row: SystemSettingSecret, field_name: str, value: str) -> None: normalized = value.strip() if not normalized: return setattr(secret_row, field_name, encrypt_secret(normalized)) @staticmethod def _serialize(settings_row: SystemSetting, secrets_row: SystemSettingSecret) -> SettingsRead: return SettingsRead( companyForm={ "companyName": settings_row.company_name, "displayName": settings_row.display_name, "companyCode": settings_row.company_code, "recordNumber": settings_row.record_number, "copyright": settings_row.copyright_text, }, adminForm={ "adminAccount": settings_row.admin_account, "adminEmail": settings_row.admin_email, "newPassword": "", "confirmPassword": "", "sessionTimeout": settings_row.session_timeout, "noticeEmail": settings_row.notice_email, "mfaEnabled": settings_row.mfa_enabled, "strongPassword": settings_row.strong_password, "loginAlertEnabled": settings_row.login_alert_enabled, "adminPasswordConfigured": bool(secrets_row.admin_password_hash), }, llmForm={ "mainProvider": settings_row.main_provider, "mainModel": settings_row.main_model, "mainEndpoint": settings_row.main_endpoint, "mainApiKey": "", "mainApiKeyConfigured": bool(secrets_row.main_api_key_encrypted), "backupProvider": settings_row.backup_provider, "backupModel": settings_row.backup_model, "backupEndpoint": settings_row.backup_endpoint, "backupApiKey": "", "backupApiKeyConfigured": bool(secrets_row.backup_api_key_encrypted), "vlmProvider": settings_row.vlm_provider, "vlmModel": settings_row.vlm_model, "vlmEndpoint": settings_row.vlm_endpoint, "vlmApiKey": "", "vlmApiKeyConfigured": bool(secrets_row.vlm_api_key_encrypted), "embeddingProvider": settings_row.embedding_provider, "embeddingModel": settings_row.embedding_model, "embeddingEndpoint": settings_row.embedding_endpoint, "embeddingApiKey": "", "embeddingApiKeyConfigured": bool(secrets_row.embedding_api_key_encrypted), }, logForm={ "level": settings_row.log_level, "retentionDays": settings_row.retention_days, "archiveCycle": settings_row.archive_cycle, "logPath": settings_row.log_path, "alertEmail": settings_row.alert_email, "operationAudit": settings_row.operation_audit, "loginAudit": settings_row.login_audit, "maskSensitive": settings_row.mask_sensitive, }, mailForm={ "smtpHost": settings_row.smtp_host, "port": settings_row.smtp_port, "encryption": settings_row.smtp_encryption, "senderName": settings_row.sender_name, "senderAddress": settings_row.sender_address, "username": settings_row.smtp_username, "password": "", "passwordConfigured": bool(secrets_row.smtp_password_encrypted), "alertEnabled": settings_row.alert_enabled, "digestEnabled": settings_row.digest_enabled, "digestTime": settings_row.digest_time, "defaultReceiver": settings_row.default_receiver, }, )