Files
X-Financial/server/src/app/services/settings.py

501 lines
21 KiB
Python
Raw Normal View History

feat: 完善系统配置、安全增强与知识库功能 - .env.example: API基础路径改为相对路径 /api/v1,支持代理转发 - README.md: 完善项目结构与启动说明文档 - docker-compose.yml: 新增Docker编排配置,支持容器化部署 - docker/: 新增Docker部署相关文档与配置 - server_start.sh: 重构启动脚本,添加容器环境检测、隔离虚拟环境路径、环境变量覆盖机制 - deps.py: 完善API依赖注入,增强权限验证逻辑 - admin_secret.py: 优化管理员密钥加密存储与验证 - config.py: 扩展配置管理,支持多环境变量绑定 - security.py: 增强安全模块,完善加密与认证机制 - db/base.py: 优化数据库基础架构与连接管理 - main.py: 更新应用入口,整合新模块路由 - models/: 完善系统模型配置,支持模型设置持久化 - repositories/settings.py: 优化设置仓储层,增强数据持久化 - services/settings.py: 重构设置服务,精简代码结构 - router.py: 更新API路由配置 - endpoints/knowledge.py: 新增知识库API端点 - schemas/knowledge.py: 新增知识库数据模型 - services/knowledge.py: 新增知识库业务逻辑 - storage/knowledge/.index.json: 知识库索引存储 - api.js: 完善API服务层,增强错误处理 - bootstrap.js: 优化前端初始化与引导流程 - useSetupView.js / useSystemState.js: 重构组合式函数 - TopBar.vue: 优化顶部导航栏组件 - SettingsView.vue: 重构设置页面UI,增强用户体验 - SetupView.vue / SetupRouteView.vue: 完善引导流程页面 - PoliciesView.vue: 优化策略视图组件 - vite.config.js: 更新Vite构建配置 - web_start.sh: 完善前端启动脚本 - views/scripts/: 优化各业务视图JS逻辑 - settings-view.css: 重构设置页面样式 - setup-view.css: 完善引导页样式 - policies-view.css: 优化策略页样式 - test_auth_service.py: 完善认证服务测试 - test_settings_persistence.py: 增强设置持久化测试 - document/: 新增开发文档与工作日志
2026-05-09 03:04:09 +00:00
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from sqlalchemy.orm import Session
from app.core.admin_secret import legacy_admin_secret_to_password_hash, read_admin_secret, verify_admin_secret
from app.core.config import get_settings
from app.core.secret_box import decrypt_secret, encrypt_secret
from app.core.security import hash_password, verify_password
from app.db.base import Base
from app.models.system_model_setting import SystemModelSetting
from app.models.system_setting import SystemSetting
from app.models.system_setting_secret import SystemSettingSecret
from app.repositories.settings import SETTINGS_ROW_ID, SettingsRepository
from app.schemas.settings import SettingsRead, SettingsWrite
@dataclass(frozen=True, slots=True)
class ModelSlotConfig:
provider_attr: str
model_attr: str
endpoint_attr: str
legacy_secret_attr: str
default_provider: str
default_model: str
default_endpoint: str
capability: str
priority: int
MODEL_SLOT_CONFIGS = {
"main": ModelSlotConfig(
provider_attr="main_provider",
model_attr="main_model",
endpoint_attr="main_endpoint",
legacy_secret_attr="main_api_key_encrypted",
default_provider="Codex",
default_model="codex-mini-latest",
default_endpoint="https://api.openai.com/v1",
capability="chat",
priority=10,
),
"backup": ModelSlotConfig(
provider_attr="backup_provider",
model_attr="backup_model",
endpoint_attr="backup_endpoint",
legacy_secret_attr="backup_api_key_encrypted",
default_provider="GLM",
default_model="glm-5.1",
default_endpoint="https://open.bigmodel.cn/api/paas/v4/",
capability="chat",
priority=20,
),
"vlm": ModelSlotConfig(
provider_attr="vlm_provider",
model_attr="vlm_model",
endpoint_attr="vlm_endpoint",
legacy_secret_attr="vlm_api_key_encrypted",
default_provider="Gemini",
default_model="gemini-2.5-flash",
default_endpoint="https://generativelanguage.googleapis.com/v1beta/openai/",
capability="chat",
priority=30,
),
"embedding": ModelSlotConfig(
provider_attr="embedding_provider",
model_attr="embedding_model",
endpoint_attr="embedding_endpoint",
legacy_secret_attr="embedding_api_key_encrypted",
default_provider="GLM",
default_model="Embedding-3",
default_endpoint="https://open.bigmodel.cn/api/paas/v4/",
capability="embedding",
priority=40,
),
}
@dataclass(slots=True)
class AdminCredentialRecord:
account: str
email: str
password_hash: str
class SettingsService:
def __init__(self, db: Session) -> None:
self.db = db
self.repository = SettingsRepository(db)
self.runtime_settings = get_settings()
def ensure_settings_ready(self) -> tuple[SystemSetting, SystemSettingSecret]:
Base.metadata.create_all(bind=self.db.get_bind())
settings_row = self.repository.get_settings()
secrets_row = self.repository.get_secrets()
should_commit = False
legacy_admin = read_admin_secret()
if settings_row is None:
settings_row = self._build_default_settings()
self.db.add(settings_row)
should_commit = True
if secrets_row is None:
secrets_row = SystemSettingSecret(id=SETTINGS_ROW_ID)
self.db.add(secrets_row)
should_commit = True
if legacy_admin is not None and not secrets_row.admin_password_hash:
secrets_row.admin_password_hash = legacy_admin_secret_to_password_hash(legacy_admin)
admin_username = str(legacy_admin.get("username", "")).strip()
if admin_username and str(settings_row.admin_account or "").strip() in {"", "superadmin"}:
settings_row.admin_account = admin_username
should_commit = True
if should_commit:
self.db.commit()
self.db.refresh(settings_row)
self.db.refresh(secrets_row)
return settings_row, secrets_row
def ensure_model_settings_ready(
self,
settings_row: SystemSetting,
secrets_row: SystemSettingSecret,
) -> dict[str, SystemModelSetting]:
model_rows = {row.slot: row for row in self.repository.get_model_settings()}
should_commit = False
for slot, config in MODEL_SLOT_CONFIGS.items():
if slot in model_rows:
continue
model_row = SystemModelSetting(
slot=slot,
provider=str(getattr(settings_row, config.provider_attr, "") or config.default_provider),
model_name=str(getattr(settings_row, config.model_attr, "") or config.default_model),
endpoint=str(getattr(settings_row, config.endpoint_attr, "") or config.default_endpoint),
capability=config.capability,
priority=config.priority,
enabled=True,
api_key_encrypted=str(getattr(secrets_row, config.legacy_secret_attr, "") or ""),
)
self.db.add(model_row)
model_rows[slot] = model_row
should_commit = True
if should_commit:
self.db.commit()
for model_row in model_rows.values():
self.db.refresh(model_row)
return model_rows
def get_settings_snapshot(self) -> SettingsRead:
settings_row, secrets_row = self.ensure_settings_ready()
model_rows = self.ensure_model_settings_ready(settings_row, secrets_row)
return self._serialize(settings_row, secrets_row, model_rows)
def save_settings_snapshot(self, payload: SettingsWrite) -> SettingsRead:
settings_row, secrets_row = self.ensure_settings_ready()
model_rows = self.ensure_model_settings_ready(settings_row, secrets_row)
if payload.adminForm.newPassword:
if len(payload.adminForm.newPassword) < 5:
raise ValueError("管理员密码至少需要 5 位。")
if payload.adminForm.newPassword != payload.adminForm.confirmPassword:
raise ValueError("两次输入的管理员密码不一致。")
secrets_row.admin_password_hash = hash_password(payload.adminForm.newPassword)
settings_row.company_name = payload.companyForm.companyName
settings_row.display_name = payload.companyForm.displayName
settings_row.company_code = payload.companyForm.companyCode
settings_row.record_number = payload.companyForm.recordNumber
settings_row.copyright_text = payload.companyForm.copyright
settings_row.admin_account = payload.adminForm.adminAccount
settings_row.admin_email = payload.adminForm.adminEmail
settings_row.session_timeout = payload.adminForm.sessionTimeout
settings_row.notice_email = payload.adminForm.noticeEmail
settings_row.mfa_enabled = payload.adminForm.mfaEnabled
settings_row.strong_password = payload.adminForm.strongPassword
settings_row.login_alert_enabled = payload.adminForm.loginAlertEnabled
self._apply_model_setting(
model_rows["main"],
payload.llmForm.mainProvider,
payload.llmForm.mainModel,
payload.llmForm.mainEndpoint,
payload.llmForm.mainApiKey,
)
self._apply_model_setting(
model_rows["backup"],
payload.llmForm.backupProvider,
payload.llmForm.backupModel,
payload.llmForm.backupEndpoint,
payload.llmForm.backupApiKey,
)
self._apply_model_setting(
model_rows["vlm"],
payload.llmForm.vlmProvider,
payload.llmForm.vlmModel,
payload.llmForm.vlmEndpoint,
payload.llmForm.vlmApiKey,
)
self._apply_model_setting(
model_rows["embedding"],
payload.llmForm.embeddingProvider,
payload.llmForm.embeddingModel,
payload.llmForm.embeddingEndpoint,
payload.llmForm.embeddingApiKey,
)
settings_row.main_provider = model_rows["main"].provider
settings_row.main_model = model_rows["main"].model_name
settings_row.main_endpoint = model_rows["main"].endpoint
settings_row.backup_provider = model_rows["backup"].provider
settings_row.backup_model = model_rows["backup"].model_name
settings_row.backup_endpoint = model_rows["backup"].endpoint
settings_row.vlm_provider = model_rows["vlm"].provider
settings_row.vlm_model = model_rows["vlm"].model_name
settings_row.vlm_endpoint = model_rows["vlm"].endpoint
settings_row.embedding_provider = model_rows["embedding"].provider
settings_row.embedding_model = model_rows["embedding"].model_name
settings_row.embedding_endpoint = model_rows["embedding"].endpoint
settings_row.log_level = payload.logForm.level
settings_row.retention_days = payload.logForm.retentionDays
settings_row.archive_cycle = payload.logForm.archiveCycle
settings_row.log_path = payload.logForm.logPath
settings_row.alert_email = payload.logForm.alertEmail
settings_row.operation_audit = payload.logForm.operationAudit
settings_row.login_audit = payload.logForm.loginAudit
settings_row.mask_sensitive = payload.logForm.maskSensitive
settings_row.smtp_host = payload.mailForm.smtpHost
settings_row.smtp_port = payload.mailForm.port
settings_row.smtp_encryption = payload.mailForm.encryption
settings_row.sender_name = payload.mailForm.senderName
settings_row.sender_address = payload.mailForm.senderAddress
settings_row.smtp_username = payload.mailForm.username
settings_row.alert_enabled = payload.mailForm.alertEnabled
settings_row.digest_enabled = payload.mailForm.digestEnabled
settings_row.digest_time = payload.mailForm.digestTime
settings_row.default_receiver = payload.mailForm.defaultReceiver
self._replace_secret_if_present(secrets_row, "smtp_password_encrypted", payload.mailForm.password)
self.db.add(settings_row)
self.db.add(secrets_row)
for model_row in model_rows.values():
self.db.add(model_row)
self.db.commit()
self.db.refresh(settings_row)
self.db.refresh(secrets_row)
for model_row in model_rows.values():
self.db.refresh(model_row)
return self._serialize(settings_row, secrets_row, model_rows)
def load_saved_model_api_key(self, slot: str | None) -> str:
if not slot or slot not in MODEL_SLOT_CONFIGS:
return ""
settings_row, secrets_row = self.ensure_settings_ready()
model_rows = self.ensure_model_settings_ready(settings_row, secrets_row)
encrypted_value = model_rows[slot].api_key_encrypted
if not encrypted_value:
return ""
return decrypt_secret(encrypted_value)
def get_admin_credentials(self) -> AdminCredentialRecord | None:
settings_row, secrets_row = self.ensure_settings_ready()
if secrets_row.admin_password_hash:
return AdminCredentialRecord(
account=settings_row.admin_account,
email=settings_row.admin_email,
password_hash=secrets_row.admin_password_hash,
)
legacy_record = read_admin_secret()
if legacy_record is None:
return None
username = str(legacy_record.get("username", "")).strip()
email = str(settings_row.admin_email or self.runtime_settings.admin_email or "").strip()
password_hash = ""
# Legacy admin.json uses scrypt fields rather than the app password format.
# The auth flow handles this file separately when no DB-backed admin password exists.
if username or email:
return AdminCredentialRecord(account=username, email=email, password_hash=password_hash)
return None
def verify_admin_login(self, identifier: str, password: str) -> AdminCredentialRecord | None:
settings_row, secrets_row = self.ensure_settings_ready()
normalized_identifier = identifier.casefold()
if secrets_row.admin_password_hash:
allowed_identifiers = {
value.casefold()
for value in [settings_row.admin_account, settings_row.admin_email]
if value
}
if normalized_identifier not in allowed_identifiers:
return None
if not verify_password(password, secrets_row.admin_password_hash):
return None
return AdminCredentialRecord(
account=settings_row.admin_account,
email=settings_row.admin_email,
password_hash=secrets_row.admin_password_hash,
)
legacy_record = read_admin_secret()
if legacy_record is None:
return None
admin_username = str(legacy_record.get("username", "")).strip()
admin_email = str(settings_row.admin_email or self.runtime_settings.admin_email or "").strip()
allowed_identifiers = {
value.casefold()
for value in [admin_username, admin_email]
if value
}
if normalized_identifier not in allowed_identifiers:
return None
if not verify_admin_secret(password, legacy_record):
return None
return AdminCredentialRecord(account=admin_username, email=admin_email, password_hash="")
def _build_default_settings(self) -> SystemSetting:
current_year = datetime.now().year
company_name = str(self.runtime_settings.company_name or "X-Financial").strip() or "X-Financial"
company_code = str(self.runtime_settings.company_code or "XF-001").strip() or "XF-001"
admin_email = str(self.runtime_settings.admin_email or "").strip()
legacy_admin = read_admin_secret() or {}
admin_account = str(legacy_admin.get("username", "")).strip() or "superadmin"
return SystemSetting(
id=SETTINGS_ROW_ID,
company_name=company_name,
display_name=company_name,
company_code=company_code,
record_number="",
copyright_text=f"Copyright © 2024-{current_year} {company_name}. All Rights Reserved.",
admin_account=admin_account,
admin_email=admin_email,
session_timeout=30,
notice_email=admin_email,
mfa_enabled=True,
strong_password=True,
login_alert_enabled=True,
main_provider="Codex",
main_model="codex-mini-latest",
main_endpoint="https://api.openai.com/v1",
backup_provider="GLM",
backup_model="glm-5.1",
backup_endpoint="https://open.bigmodel.cn/api/paas/v4/",
vlm_provider="Gemini",
vlm_model="gemini-2.5-flash",
vlm_endpoint="https://generativelanguage.googleapis.com/v1beta/openai/",
embedding_provider="GLM",
embedding_model="Embedding-3",
embedding_endpoint="https://open.bigmodel.cn/api/paas/v4/",
log_level="INFO",
retention_days=180,
archive_cycle="weekly",
log_path="server/logs/app.log",
alert_email=admin_email,
operation_audit=True,
login_audit=True,
mask_sensitive=True,
smtp_host="smtp.exmail.qq.com",
smtp_port=465,
smtp_encryption="SSL/TLS",
sender_name=company_name,
sender_address=admin_email,
smtp_username=admin_email,
alert_enabled=True,
digest_enabled=False,
digest_time="09:00",
default_receiver=admin_email,
)
@staticmethod
def _replace_secret_if_present(secret_row: SystemSettingSecret, field_name: str, value: str) -> None:
normalized = value.strip()
if not normalized:
return
setattr(secret_row, field_name, encrypt_secret(normalized))
@staticmethod
def _apply_model_setting(
model_row: SystemModelSetting,
provider: str,
model_name: str,
endpoint: str,
api_key: str,
) -> None:
model_row.provider = provider
model_row.model_name = model_name
model_row.endpoint = endpoint
normalized_api_key = api_key.strip()
if normalized_api_key:
model_row.api_key_encrypted = encrypt_secret(normalized_api_key)
@staticmethod
def _serialize(
settings_row: SystemSetting,
secrets_row: SystemSettingSecret,
model_rows: dict[str, SystemModelSetting],
) -> SettingsRead:
main_model = model_rows["main"]
backup_model = model_rows["backup"]
vlm_model = model_rows["vlm"]
embedding_model = model_rows["embedding"]
return SettingsRead(
companyForm={
"companyName": settings_row.company_name,
"displayName": settings_row.display_name,
"companyCode": settings_row.company_code,
"recordNumber": settings_row.record_number,
"copyright": settings_row.copyright_text,
},
adminForm={
"adminAccount": settings_row.admin_account,
"adminEmail": settings_row.admin_email,
"newPassword": "",
"confirmPassword": "",
"sessionTimeout": settings_row.session_timeout,
"noticeEmail": settings_row.notice_email,
"mfaEnabled": settings_row.mfa_enabled,
"strongPassword": settings_row.strong_password,
"loginAlertEnabled": settings_row.login_alert_enabled,
"adminPasswordConfigured": bool(secrets_row.admin_password_hash),
},
llmForm={
"mainProvider": main_model.provider,
"mainModel": main_model.model_name,
"mainEndpoint": main_model.endpoint,
"mainApiKey": "",
"mainApiKeyConfigured": bool(main_model.api_key_encrypted),
"backupProvider": backup_model.provider,
"backupModel": backup_model.model_name,
"backupEndpoint": backup_model.endpoint,
"backupApiKey": "",
"backupApiKeyConfigured": bool(backup_model.api_key_encrypted),
"vlmProvider": vlm_model.provider,
"vlmModel": vlm_model.model_name,
"vlmEndpoint": vlm_model.endpoint,
"vlmApiKey": "",
"vlmApiKeyConfigured": bool(vlm_model.api_key_encrypted),
"embeddingProvider": embedding_model.provider,
"embeddingModel": embedding_model.model_name,
"embeddingEndpoint": embedding_model.endpoint,
"embeddingApiKey": "",
"embeddingApiKeyConfigured": bool(embedding_model.api_key_encrypted),
},
logForm={
"level": settings_row.log_level,
"retentionDays": settings_row.retention_days,
"archiveCycle": settings_row.archive_cycle,
"logPath": settings_row.log_path,
"alertEmail": settings_row.alert_email,
"operationAudit": settings_row.operation_audit,
"loginAudit": settings_row.login_audit,
"maskSensitive": settings_row.mask_sensitive,
},
mailForm={
"smtpHost": settings_row.smtp_host,
"port": settings_row.smtp_port,
"encryption": settings_row.smtp_encryption,
"senderName": settings_row.sender_name,
"senderAddress": settings_row.sender_address,
"username": settings_row.smtp_username,
"password": "",
"passwordConfigured": bool(secrets_row.smtp_password_encrypted),
"alertEnabled": settings_row.alert_enabled,
"digestEnabled": settings_row.digest_enabled,
"digestTime": settings_row.digest_time,
"defaultReceiver": settings_row.default_receiver,
},
)