feat(server): 新增费用规则运行时服务和系统赫尔墨斯服务,增强报销规则执行和系统监控能力

This commit is contained in:
caoxiaozhu
2026-05-15 06:58:03 +00:00
parent ea339d883a
commit 45abd36430
2 changed files with 737 additions and 0 deletions

View File

@@ -0,0 +1,77 @@
from __future__ import annotations
import os
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True, slots=True)
class HermesCliResult:
response_text: str
session_id: str = ""
command: tuple[str, ...] = ()
class SystemHermesService:
def __init__(self) -> None:
configured_bin = str(os.getenv("HERMES_BIN", "")).strip()
self.hermes_bin = configured_bin or shutil.which("hermes") or "/usr/local/bin/hermes"
def is_available(self) -> bool:
return Path(self.hermes_bin).exists()
def run_query(
self,
query: str,
*,
source: str = "tool",
max_turns: int = 1,
timeout_seconds: int = 180,
) -> HermesCliResult:
if not self.is_available():
raise RuntimeError(f"未找到系统 Hermes CLI{self.hermes_bin}")
command = (
self.hermes_bin,
"chat",
"-Q",
"--source",
source,
"--max-turns",
str(max_turns),
"-q",
query,
)
completed = subprocess.run(
command,
capture_output=True,
text=True,
timeout=timeout_seconds,
check=False,
)
if completed.returncode != 0:
detail = (completed.stderr or completed.stdout or "").strip()
raise RuntimeError(detail or "Hermes CLI 返回非 0 状态码。")
return self._parse_output(completed.stdout, command=command)
@staticmethod
def _parse_output(stdout: str, *, command: tuple[str, ...]) -> HermesCliResult:
lines = [line.rstrip() for line in str(stdout or "").splitlines()]
session_id = ""
response_lines: list[str] = []
for line in lines:
if line.startswith("session_id:"):
session_id = line.split(":", 1)[1].strip()
continue
response_lines.append(line)
response_text = "\n".join(line for line in response_lines if line.strip()).strip()
return HermesCliResult(
response_text=response_text,
session_id=session_id,
command=command,
)