from __future__ import annotations import logging from dataclasses import dataclass from datetime import datetime from sqlalchemy import inspect, text from sqlalchemy.orm import Session from app.core.admin_secret import legacy_admin_secret_to_password_hash, read_admin_secret, verify_admin_secret from app.core.config import get_settings from app.core.secret_box import decrypt_secret, encrypt_secret from app.core.security import hash_password, verify_password from app.db.base import Base from app.db.session import get_session_factory from app.models.system_model_setting import SystemModelSetting from app.models.system_setting import SystemSetting from app.models.system_setting_secret import SystemSettingSecret from app.repositories.settings import SETTINGS_ROW_ID, SettingsRepository from app.schemas.settings import SettingsRead, SettingsWrite from app.services.hermes_sync import ( HermesModelRoute, capture_hermes_config_snapshot, restore_hermes_config_snapshot, sync_hermes_model_settings, ) logger = logging.getLogger(__name__) @dataclass(frozen=True, slots=True) class ModelSlotConfig: provider_attr: str model_attr: str endpoint_attr: str legacy_secret_attr: str default_provider: str default_model: str default_endpoint: str capability: str priority: int MODEL_SLOT_CONFIGS = { "main": ModelSlotConfig( provider_attr="main_provider", model_attr="main_model", endpoint_attr="main_endpoint", legacy_secret_attr="main_api_key_encrypted", default_provider="Codex", default_model="codex-mini-latest", default_endpoint="https://api.openai.com/v1", capability="chat", priority=10, ), "backup": ModelSlotConfig( provider_attr="backup_provider", model_attr="backup_model", endpoint_attr="backup_endpoint", legacy_secret_attr="backup_api_key_encrypted", default_provider="GLM", default_model="glm-5.1", default_endpoint="https://open.bigmodel.cn/api/paas/v4/", capability="chat", priority=20, ), "vlm": ModelSlotConfig( provider_attr="vlm_provider", model_attr="vlm_model", endpoint_attr="vlm_endpoint", legacy_secret_attr="vlm_api_key_encrypted", default_provider="Gemini", default_model="gemini-2.5-flash", default_endpoint="https://generativelanguage.googleapis.com/v1beta/openai/", capability="chat", priority=30, ), "embedding": ModelSlotConfig( provider_attr="embedding_provider", model_attr="embedding_model", endpoint_attr="embedding_endpoint", legacy_secret_attr="embedding_api_key_encrypted", default_provider="GLM", default_model="Embedding-3", default_endpoint="https://open.bigmodel.cn/api/paas/v4/", capability="embedding", priority=40, ), } @dataclass(slots=True) class AdminCredentialRecord: account: str email: str password_hash: str @dataclass(frozen=True, slots=True) class OnlyOfficeRuntimeConfig: enabled: bool public_url: str backend_url: str jwt_secret: str class SettingsService: def __init__(self, db: Session) -> None: self.db = db self.repository = SettingsRepository(db) self.runtime_settings = get_settings() def ensure_settings_ready(self) -> tuple[SystemSetting, SystemSettingSecret]: Base.metadata.create_all(bind=self.db.get_bind()) self._ensure_settings_schema() settings_row = self.repository.get_settings() secrets_row = self.repository.get_secrets() should_commit = False legacy_admin = read_admin_secret() if settings_row is None: settings_row = self._build_default_settings() self.db.add(settings_row) should_commit = True if secrets_row is None: secrets_row = SystemSettingSecret(id=SETTINGS_ROW_ID) self.db.add(secrets_row) should_commit = True if legacy_admin is not None and not secrets_row.admin_password_hash: secrets_row.admin_password_hash = legacy_admin_secret_to_password_hash(legacy_admin) admin_username = str(legacy_admin.get("username", "")).strip() if admin_username and str(settings_row.admin_account or "").strip() in {"", "superadmin"}: settings_row.admin_account = admin_username should_commit = True if self._sync_onlyoffice_defaults(settings_row, secrets_row): should_commit = True if should_commit: self.db.commit() self.db.refresh(settings_row) self.db.refresh(secrets_row) return settings_row, secrets_row def ensure_model_settings_ready( self, settings_row: SystemSetting, secrets_row: SystemSettingSecret, ) -> dict[str, SystemModelSetting]: model_rows = {row.slot: row for row in self.repository.get_model_settings()} should_commit = False for slot, config in MODEL_SLOT_CONFIGS.items(): if slot in model_rows: continue model_row = SystemModelSetting( slot=slot, provider=str(getattr(settings_row, config.provider_attr, "") or config.default_provider), model_name=str(getattr(settings_row, config.model_attr, "") or config.default_model), endpoint=str(getattr(settings_row, config.endpoint_attr, "") or config.default_endpoint), capability=config.capability, priority=config.priority, enabled=True, api_key_encrypted=str(getattr(secrets_row, config.legacy_secret_attr, "") or ""), ) self.db.add(model_row) model_rows[slot] = model_row should_commit = True if should_commit: self.db.commit() for model_row in model_rows.values(): self.db.refresh(model_row) return model_rows def get_settings_snapshot(self) -> SettingsRead: settings_row, secrets_row = self.ensure_settings_ready() model_rows = self.ensure_model_settings_ready(settings_row, secrets_row) return self._serialize(settings_row, secrets_row, model_rows) def save_settings_snapshot(self, payload: SettingsWrite) -> SettingsRead: settings_row, secrets_row = self.ensure_settings_ready() model_rows = self.ensure_model_settings_ready(settings_row, secrets_row) if payload.adminForm.newPassword: if len(payload.adminForm.newPassword) < 5: raise ValueError("管理员密码至少需要 5 位。") if payload.adminForm.newPassword != payload.adminForm.confirmPassword: raise ValueError("两次输入的管理员密码不一致。") secrets_row.admin_password_hash = hash_password(payload.adminForm.newPassword) settings_row.company_name = payload.companyForm.companyName settings_row.display_name = payload.companyForm.displayName settings_row.company_code = payload.companyForm.companyCode settings_row.record_number = payload.companyForm.recordNumber settings_row.copyright_text = payload.companyForm.copyright settings_row.admin_account = payload.adminForm.adminAccount settings_row.admin_email = payload.adminForm.adminEmail settings_row.session_timeout = payload.adminForm.sessionTimeout settings_row.notice_email = payload.adminForm.noticeEmail settings_row.mfa_enabled = payload.adminForm.mfaEnabled settings_row.strong_password = payload.adminForm.strongPassword settings_row.login_alert_enabled = payload.adminForm.loginAlertEnabled self._apply_model_setting( model_rows["main"], payload.llmForm.mainProvider, payload.llmForm.mainModel, payload.llmForm.mainEndpoint, payload.llmForm.mainApiKey, ) self._apply_model_setting( model_rows["backup"], payload.llmForm.backupProvider, payload.llmForm.backupModel, payload.llmForm.backupEndpoint, payload.llmForm.backupApiKey, ) self._apply_model_setting( model_rows["vlm"], payload.llmForm.vlmProvider, payload.llmForm.vlmModel, payload.llmForm.vlmEndpoint, payload.llmForm.vlmApiKey, ) self._apply_model_setting( model_rows["embedding"], payload.llmForm.embeddingProvider, payload.llmForm.embeddingModel, payload.llmForm.embeddingEndpoint, payload.llmForm.embeddingApiKey, ) if payload.renderForm.enabled and not payload.renderForm.publicUrl: raise ValueError("启用 ONLYOFFICE 时必须配置服务地址。") if ( payload.renderForm.enabled and not payload.renderForm.jwtSecret and not secrets_row.onlyoffice_jwt_secret_encrypted ): raise ValueError("启用 ONLYOFFICE 时必须配置 JWT 密钥。") settings_row.main_provider = model_rows["main"].provider settings_row.main_model = model_rows["main"].model_name settings_row.main_endpoint = model_rows["main"].endpoint settings_row.backup_provider = model_rows["backup"].provider settings_row.backup_model = model_rows["backup"].model_name settings_row.backup_endpoint = model_rows["backup"].endpoint settings_row.vlm_provider = model_rows["vlm"].provider settings_row.vlm_model = model_rows["vlm"].model_name settings_row.vlm_endpoint = model_rows["vlm"].endpoint settings_row.embedding_provider = model_rows["embedding"].provider settings_row.embedding_model = model_rows["embedding"].model_name settings_row.embedding_endpoint = model_rows["embedding"].endpoint settings_row.onlyoffice_enabled = payload.renderForm.enabled settings_row.onlyoffice_public_url = payload.renderForm.publicUrl settings_row.log_level = payload.logForm.level settings_row.retention_days = payload.logForm.retentionDays settings_row.archive_cycle = payload.logForm.archiveCycle settings_row.log_path = payload.logForm.logPath settings_row.alert_email = payload.logForm.alertEmail settings_row.operation_audit = payload.logForm.operationAudit settings_row.login_audit = payload.logForm.loginAudit settings_row.mask_sensitive = payload.logForm.maskSensitive settings_row.smtp_host = payload.mailForm.smtpHost settings_row.smtp_port = payload.mailForm.port settings_row.smtp_encryption = payload.mailForm.encryption settings_row.sender_name = payload.mailForm.senderName settings_row.sender_address = payload.mailForm.senderAddress settings_row.smtp_username = payload.mailForm.username settings_row.alert_enabled = payload.mailForm.alertEnabled settings_row.digest_enabled = payload.mailForm.digestEnabled settings_row.digest_time = payload.mailForm.digestTime settings_row.default_receiver = payload.mailForm.defaultReceiver self._replace_secret_if_present( secrets_row, "onlyoffice_jwt_secret_encrypted", payload.renderForm.jwtSecret, ) self._replace_secret_if_present(secrets_row, "smtp_password_encrypted", payload.mailForm.password) hermes_snapshot = capture_hermes_config_snapshot() try: sync_hermes_model_settings( primary_route=self._build_hermes_model_route(model_rows["main"]), fallback_route=self._build_hermes_model_route(model_rows["backup"]), ) self.db.add(settings_row) self.db.add(secrets_row) for model_row in model_rows.values(): self.db.add(model_row) self.db.commit() except Exception: self.db.rollback() restore_hermes_config_snapshot(hermes_snapshot) raise self.db.refresh(settings_row) self.db.refresh(secrets_row) for model_row in model_rows.values(): self.db.refresh(model_row) return self._serialize(settings_row, secrets_row, model_rows) def load_saved_model_api_key(self, slot: str | None) -> str: if not slot or slot not in MODEL_SLOT_CONFIGS: return "" settings_row, secrets_row = self.ensure_settings_ready() model_rows = self.ensure_model_settings_ready(settings_row, secrets_row) encrypted_value = model_rows[slot].api_key_encrypted if not encrypted_value: return "" return self._decrypt_model_api_key(encrypted_value, slot=slot) def get_runtime_model_config(self, slot: str) -> dict[str, str]: if slot not in MODEL_SLOT_CONFIGS: raise ValueError("未知模型槽位。") settings_row, secrets_row = self.ensure_settings_ready() model_rows = self.ensure_model_settings_ready(settings_row, secrets_row) model_row = model_rows[slot] return { "slot": slot, "provider": model_row.provider, "model": model_row.model_name, "endpoint": model_row.endpoint, "apiKey": self.load_saved_model_api_key(slot), "capability": model_row.capability, } def get_admin_credentials(self) -> AdminCredentialRecord | None: settings_row, secrets_row = self.ensure_settings_ready() if secrets_row.admin_password_hash: return AdminCredentialRecord( account=settings_row.admin_account, email=settings_row.admin_email, password_hash=secrets_row.admin_password_hash, ) legacy_record = read_admin_secret() if legacy_record is None: return None username = str(legacy_record.get("username", "")).strip() email = str(settings_row.admin_email or self.runtime_settings.admin_email or "").strip() password_hash = "" # Legacy admin.json uses scrypt fields rather than the app password format. # The auth flow handles this file separately when no DB-backed admin password exists. if username or email: return AdminCredentialRecord(account=username, email=email, password_hash=password_hash) return None def verify_admin_login(self, identifier: str, password: str) -> AdminCredentialRecord | None: settings_row, secrets_row = self.ensure_settings_ready() normalized_identifier = identifier.casefold() if secrets_row.admin_password_hash: allowed_identifiers = { value.casefold() for value in [settings_row.admin_account, settings_row.admin_email] if value } if normalized_identifier not in allowed_identifiers: return None if not verify_password(password, secrets_row.admin_password_hash): return None return AdminCredentialRecord( account=settings_row.admin_account, email=settings_row.admin_email, password_hash=secrets_row.admin_password_hash, ) legacy_record = read_admin_secret() if legacy_record is None: return None admin_username = str(legacy_record.get("username", "")).strip() admin_email = str(settings_row.admin_email or self.runtime_settings.admin_email or "").strip() allowed_identifiers = { value.casefold() for value in [admin_username, admin_email] if value } if normalized_identifier not in allowed_identifiers: return None if not verify_admin_secret(password, legacy_record): return None return AdminCredentialRecord(account=admin_username, email=admin_email, password_hash="") def _build_default_settings(self) -> SystemSetting: current_year = datetime.now().year company_name = str(self.runtime_settings.company_name or "X-Financial").strip() or "X-Financial" company_code = str(self.runtime_settings.company_code or "XF-001").strip() or "XF-001" admin_email = str(self.runtime_settings.admin_email or "").strip() legacy_admin = read_admin_secret() or {} admin_account = str(legacy_admin.get("username", "")).strip() or "superadmin" return SystemSetting( id=SETTINGS_ROW_ID, company_name=company_name, display_name=company_name, company_code=company_code, record_number="", copyright_text=f"Copyright © 2024-{current_year} {company_name}. All Rights Reserved.", admin_account=admin_account, admin_email=admin_email, session_timeout=30, notice_email=admin_email, mfa_enabled=True, strong_password=True, login_alert_enabled=True, main_provider="Codex", main_model="codex-mini-latest", main_endpoint="https://api.openai.com/v1", backup_provider="GLM", backup_model="glm-5.1", backup_endpoint="https://open.bigmodel.cn/api/paas/v4/", vlm_provider="Gemini", vlm_model="gemini-2.5-flash", vlm_endpoint="https://generativelanguage.googleapis.com/v1beta/openai/", embedding_provider="GLM", embedding_model="Embedding-3", embedding_endpoint="https://open.bigmodel.cn/api/paas/v4/", onlyoffice_enabled=bool(self.runtime_settings.onlyoffice_enabled), onlyoffice_public_url=str(self.runtime_settings.onlyoffice_public_url or "").strip(), log_level="INFO", retention_days=180, archive_cycle="weekly", log_path="server/logs/app.log", alert_email=admin_email, operation_audit=True, login_audit=True, mask_sensitive=True, smtp_host="smtp.exmail.qq.com", smtp_port=465, smtp_encryption="SSL/TLS", sender_name=company_name, sender_address=admin_email, smtp_username=admin_email, alert_enabled=True, digest_enabled=False, digest_time="09:00", default_receiver=admin_email, ) @staticmethod def _replace_secret_if_present(secret_row: SystemSettingSecret, field_name: str, value: str) -> None: normalized = value.strip() if not normalized: return setattr(secret_row, field_name, encrypt_secret(normalized)) @staticmethod def _apply_model_setting( model_row: SystemModelSetting, provider: str, model_name: str, endpoint: str, api_key: str, ) -> None: model_row.provider = provider model_row.model_name = model_name model_row.endpoint = endpoint normalized_api_key = api_key.strip() if normalized_api_key: model_row.api_key_encrypted = encrypt_secret(normalized_api_key) def _build_hermes_model_route(self, model_row: SystemModelSetting) -> HermesModelRoute: api_key = self._decrypt_model_api_key(model_row.api_key_encrypted, slot=model_row.slot) return HermesModelRoute( provider_label=str(model_row.provider or "").strip(), model=str(model_row.model_name or "").strip(), endpoint=str(model_row.endpoint or "").strip(), api_key=api_key, ) def _decrypt_model_api_key(self, encrypted_value: str, *, slot: str) -> str: normalized_value = str(encrypted_value or "").strip() if not normalized_value: return "" try: return decrypt_secret(normalized_value) except ValueError: logger.warning("Skipping undecryptable model API key for slot=%s", slot) return "" def _ensure_settings_schema(self) -> None: bind = self.db.get_bind() inspector = inspect(bind) table_names = set(inspector.get_table_names()) migration_statements: list[str] = [] if "system_settings" in table_names: settings_columns = {column["name"] for column in inspector.get_columns("system_settings")} if "onlyoffice_enabled" not in settings_columns: migration_statements.append( "ALTER TABLE system_settings ADD COLUMN onlyoffice_enabled BOOLEAN DEFAULT FALSE" ) if "onlyoffice_public_url" not in settings_columns: migration_statements.append( "ALTER TABLE system_settings ADD COLUMN onlyoffice_public_url VARCHAR(512) DEFAULT ''" ) if "system_setting_secrets" in table_names: secret_columns = {column["name"] for column in inspector.get_columns("system_setting_secrets")} if "onlyoffice_jwt_secret_encrypted" not in secret_columns: migration_statements.append( "ALTER TABLE system_setting_secrets ADD COLUMN onlyoffice_jwt_secret_encrypted TEXT DEFAULT ''" ) for statement in migration_statements: self.db.execute(text(statement)) if migration_statements: self.db.commit() def _sync_onlyoffice_defaults( self, settings_row: SystemSetting, secrets_row: SystemSettingSecret, ) -> bool: should_commit = False runtime_public_url = str(self.runtime_settings.onlyoffice_public_url or "").strip() runtime_jwt_secret = str(self.runtime_settings.onlyoffice_jwt_secret or "").strip() if not str(settings_row.onlyoffice_public_url or "").strip() and runtime_public_url: settings_row.onlyoffice_public_url = runtime_public_url should_commit = True if not secrets_row.onlyoffice_jwt_secret_encrypted and runtime_jwt_secret: secrets_row.onlyoffice_jwt_secret_encrypted = encrypt_secret(runtime_jwt_secret) should_commit = True if ( not settings_row.onlyoffice_enabled and self.runtime_settings.onlyoffice_enabled and (runtime_public_url or runtime_jwt_secret) ): settings_row.onlyoffice_enabled = True should_commit = True return should_commit @staticmethod def _serialize( settings_row: SystemSetting, secrets_row: SystemSettingSecret, model_rows: dict[str, SystemModelSetting], ) -> SettingsRead: main_model = model_rows["main"] backup_model = model_rows["backup"] vlm_model = model_rows["vlm"] embedding_model = model_rows["embedding"] return SettingsRead( companyForm={ "companyName": settings_row.company_name, "displayName": settings_row.display_name, "companyCode": settings_row.company_code, "recordNumber": settings_row.record_number, "copyright": settings_row.copyright_text, }, adminForm={ "adminAccount": settings_row.admin_account, "adminEmail": settings_row.admin_email, "newPassword": "", "confirmPassword": "", "sessionTimeout": settings_row.session_timeout, "noticeEmail": settings_row.notice_email, "mfaEnabled": settings_row.mfa_enabled, "strongPassword": settings_row.strong_password, "loginAlertEnabled": settings_row.login_alert_enabled, "adminPasswordConfigured": bool(secrets_row.admin_password_hash), }, llmForm={ "mainProvider": main_model.provider, "mainModel": main_model.model_name, "mainEndpoint": main_model.endpoint, "mainApiKey": "", "mainApiKeyConfigured": bool(main_model.api_key_encrypted), "backupProvider": backup_model.provider, "backupModel": backup_model.model_name, "backupEndpoint": backup_model.endpoint, "backupApiKey": "", "backupApiKeyConfigured": bool(backup_model.api_key_encrypted), "vlmProvider": vlm_model.provider, "vlmModel": vlm_model.model_name, "vlmEndpoint": vlm_model.endpoint, "vlmApiKey": "", "vlmApiKeyConfigured": bool(vlm_model.api_key_encrypted), "embeddingProvider": embedding_model.provider, "embeddingModel": embedding_model.model_name, "embeddingEndpoint": embedding_model.endpoint, "embeddingApiKey": "", "embeddingApiKeyConfigured": bool(embedding_model.api_key_encrypted), }, renderForm={ "enabled": settings_row.onlyoffice_enabled, "publicUrl": settings_row.onlyoffice_public_url, "jwtSecret": "", "jwtSecretConfigured": bool(secrets_row.onlyoffice_jwt_secret_encrypted), }, logForm={ "level": settings_row.log_level, "retentionDays": settings_row.retention_days, "archiveCycle": settings_row.archive_cycle, "logPath": settings_row.log_path, "alertEmail": settings_row.alert_email, "operationAudit": settings_row.operation_audit, "loginAudit": settings_row.login_audit, "maskSensitive": settings_row.mask_sensitive, }, mailForm={ "smtpHost": settings_row.smtp_host, "port": settings_row.smtp_port, "encryption": settings_row.smtp_encryption, "senderName": settings_row.sender_name, "senderAddress": settings_row.sender_address, "username": settings_row.smtp_username, "password": "", "passwordConfigured": bool(secrets_row.smtp_password_encrypted), "alertEnabled": settings_row.alert_enabled, "digestEnabled": settings_row.digest_enabled, "digestTime": settings_row.digest_time, "defaultReceiver": settings_row.default_receiver, }, ) def resolve_onlyoffice_settings(db: Session | None = None) -> OnlyOfficeRuntimeConfig: runtime_settings = get_settings() owned_session = False session = db try: if session is None: session_factory = get_session_factory() session = session_factory() owned_session = True service = SettingsService(session) settings_row, secrets_row = service.ensure_settings_ready() jwt_secret = "" if secrets_row.onlyoffice_jwt_secret_encrypted: jwt_secret = decrypt_secret(secrets_row.onlyoffice_jwt_secret_encrypted) return OnlyOfficeRuntimeConfig( enabled=settings_row.onlyoffice_enabled, public_url=str(settings_row.onlyoffice_public_url or "").strip(), backend_url=str(runtime_settings.onlyoffice_backend_url or "").strip(), jwt_secret=jwt_secret, ) except Exception: return OnlyOfficeRuntimeConfig( enabled=bool(runtime_settings.onlyoffice_enabled), public_url=str(runtime_settings.onlyoffice_public_url or "").strip(), backend_url=str(runtime_settings.onlyoffice_backend_url or "").strip(), jwt_secret=str(runtime_settings.onlyoffice_jwt_secret or "").strip(), ) finally: if owned_session and session is not None: session.close()