feat: add system settings with model connectivity and encrypted storage
This commit is contained in:
91
server/src/app/core/secret_box.py
Normal file
91
server/src/app/core/secret_box.py
Normal file
@@ -0,0 +1,91 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import secrets
|
||||
from pathlib import Path
|
||||
|
||||
from app.core.config import SERVER_DIR
|
||||
|
||||
SECRET_KEY_FILE = SERVER_DIR / ".secrets" / "settings.key"
|
||||
SECRET_BOX_VERSION = "v1"
|
||||
KEY_BYTES = 32
|
||||
NONCE_BYTES = 16
|
||||
MAC_BYTES = 32
|
||||
BLOCK_BYTES = 32
|
||||
|
||||
|
||||
def _ensure_secret_dir(path: Path) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
def get_or_create_secret_key() -> bytes:
|
||||
_ensure_secret_dir(SECRET_KEY_FILE)
|
||||
|
||||
if SECRET_KEY_FILE.exists():
|
||||
encoded = SECRET_KEY_FILE.read_text(encoding="utf-8").strip()
|
||||
if encoded:
|
||||
return base64.urlsafe_b64decode(encoded.encode("ascii"))
|
||||
|
||||
secret_key = secrets.token_bytes(KEY_BYTES)
|
||||
encoded = base64.urlsafe_b64encode(secret_key).decode("ascii")
|
||||
SECRET_KEY_FILE.write_text(encoded, encoding="utf-8")
|
||||
return secret_key
|
||||
|
||||
|
||||
def _keystream(secret_key: bytes, nonce: bytes, length: int) -> bytes:
|
||||
chunks: list[bytes] = []
|
||||
counter = 0
|
||||
|
||||
while sum(len(chunk) for chunk in chunks) < length:
|
||||
block = hmac.new(
|
||||
secret_key,
|
||||
b"stream:" + nonce + counter.to_bytes(4, "big"),
|
||||
hashlib.sha256,
|
||||
).digest()
|
||||
chunks.append(block)
|
||||
counter += 1
|
||||
|
||||
return b"".join(chunks)[:length]
|
||||
|
||||
|
||||
def encrypt_secret(value: str) -> str:
|
||||
if not value:
|
||||
return ""
|
||||
|
||||
secret_key = get_or_create_secret_key()
|
||||
nonce = secrets.token_bytes(NONCE_BYTES)
|
||||
plaintext = value.encode("utf-8")
|
||||
ciphertext = bytes(a ^ b for a, b in zip(plaintext, _keystream(secret_key, nonce, len(plaintext)), strict=False))
|
||||
mac = hmac.new(secret_key, b"mac:" + nonce + ciphertext, hashlib.sha256).digest()
|
||||
|
||||
encoded_nonce = base64.urlsafe_b64encode(nonce).decode("ascii")
|
||||
encoded_ciphertext = base64.urlsafe_b64encode(ciphertext).decode("ascii")
|
||||
encoded_mac = base64.urlsafe_b64encode(mac).decode("ascii")
|
||||
return f"{SECRET_BOX_VERSION}${encoded_nonce}${encoded_ciphertext}${encoded_mac}"
|
||||
|
||||
|
||||
def decrypt_secret(value: str) -> str:
|
||||
if not value:
|
||||
return ""
|
||||
|
||||
try:
|
||||
version, encoded_nonce, encoded_ciphertext, encoded_mac = value.split("$", 3)
|
||||
except ValueError as exc:
|
||||
raise ValueError("Invalid secret payload format") from exc
|
||||
|
||||
if version != SECRET_BOX_VERSION:
|
||||
raise ValueError("Unsupported secret payload version")
|
||||
|
||||
secret_key = get_or_create_secret_key()
|
||||
nonce = base64.urlsafe_b64decode(encoded_nonce.encode("ascii"))
|
||||
ciphertext = base64.urlsafe_b64decode(encoded_ciphertext.encode("ascii"))
|
||||
expected_mac = base64.urlsafe_b64decode(encoded_mac.encode("ascii"))
|
||||
actual_mac = hmac.new(secret_key, b"mac:" + nonce + ciphertext, hashlib.sha256).digest()
|
||||
|
||||
if not hmac.compare_digest(actual_mac, expected_mac):
|
||||
raise ValueError("Secret payload integrity check failed")
|
||||
|
||||
plaintext = bytes(a ^ b for a, b in zip(ciphertext, _keystream(secret_key, nonce, len(ciphertext)), strict=False))
|
||||
return plaintext.decode("utf-8")
|
||||
Reference in New Issue
Block a user