from __future__ import annotations
import hashlib
import json
import mimetypes
import re
from dataclasses import asdict, dataclass
from datetime import UTC, datetime
from io import BytesIO
from pathlib import Path
from xml.sax.saxutils import escape
from zipfile import ZIP_DEFLATED, ZipFile
from openpyxl import load_workbook
from app.core.config import SERVER_DIR, get_settings
from app.services.agent_asset_finance_spreadsheets import build_communication_expense_workbook
from app.services.agent_asset_travel_spreadsheets import (
build_travel_allowance_workbook,
build_travel_grade_mapping_workbook,
build_travel_lodging_workbook_from_source,
build_travel_season_mapping_workbook,
build_travel_transport_class_workbook,
build_travel_transport_estimate_workbook,
build_xlsx_bytes_from_source_sheet,
)
# Locates the fenced ``rule-spreadsheet`` block inside a version's markdown and
# captures the JSON object it contains (group 1).
# The capture is non-greedy: it stops at the first ``}`` that is followed, after
# optional whitespace, by a closing fence. ``re.DOTALL`` lets the JSON span
# several lines.
RULE_SPREADSHEET_BLOCK_PATTERN = re.compile(
r"```rule-spreadsheet\s*(\{.*?\})\s*```",
re.DOTALL,
)
# Rule identifiers and the workbook file names declared alongside them.
# Each ``*_RULE_CODE`` is paired with a ``*_RULE_FILENAME`` (an ``.xlsx`` name).

# Travel lodging standard.
COMPANY_TRAVEL_EXPENSE_RULE_CODE = "rule.expense.company_travel_expense_reimbursement"
COMPANY_TRAVEL_EXPENSE_RULE_FILENAME = "差旅住宿费标准.xlsx"
# Source workbook looked up under rules/finance-rules by the lodging and
# season-mapping template builders and by ``_build_travel_source_sheet``.
COMPANY_TRAVEL_SOURCE_RULE_FILENAME = "公司差旅费报销规则.xlsx"
# Travel allowance standard.
COMPANY_TRAVEL_ALLOWANCE_RULE_CODE = "rule.expense.company_travel_allowance_reimbursement"
COMPANY_TRAVEL_ALLOWANCE_RULE_FILENAME = "出差补助标准.xlsx"
# Transport class standard.
COMPANY_TRAVEL_TRANSPORT_RULE_CODE = "rule.expense.company_travel_transport_class"
COMPANY_TRAVEL_TRANSPORT_RULE_FILENAME = "交通工具等级标准.xlsx"
# Transport cost estimate table.
COMPANY_TRAVEL_TRANSPORT_ESTIMATE_RULE_CODE = "rule.expense.company_travel_transport_estimate"
COMPANY_TRAVEL_TRANSPORT_ESTIMATE_RULE_FILENAME = "交通费用预估表.xlsx"
# Travel grade mapping table.
COMPANY_TRAVEL_GRADE_MAPPING_RULE_CODE = "rule.expense.company_travel_grade_mapping"
COMPANY_TRAVEL_GRADE_MAPPING_RULE_FILENAME = "差旅职级映射表.xlsx"
# Region low/peak season mapping table.
COMPANY_TRAVEL_SEASON_MAPPING_RULE_CODE = "rule.expense.company_travel_season_mapping"
COMPANY_TRAVEL_SEASON_MAPPING_RULE_FILENAME = "地区淡旺季映射表.xlsx"
# Communication expense reimbursement rule.
COMPANY_COMMUNICATION_EXPENSE_RULE_CODE = "rule.expense.company_communication_expense_reimbursement"
COMPANY_COMMUNICATION_EXPENSE_RULE_FILENAME = "公司通信费报销规则.xlsx"
# Expense pre-approval rule.
COMPANY_PREAPPROVAL_RULE_CODE = "rule.expense.company_preapproval_requirement"
COMPANY_PREAPPROVAL_RULE_FILENAME = "公司费用申请审批规则.xlsx"
# Set of travel-related rule codes.
# NOTE(review): COMPANY_TRAVEL_GRADE_MAPPING_RULE_CODE is not a member of this
# set although the other travel codes are — confirm whether that is intended.
TRAVEL_SPREADSHEET_RULE_CODES = {
COMPANY_TRAVEL_EXPENSE_RULE_CODE,
COMPANY_TRAVEL_ALLOWANCE_RULE_CODE,
COMPANY_TRAVEL_TRANSPORT_RULE_CODE,
COMPANY_TRAVEL_TRANSPORT_ESTIMATE_RULE_CODE,
COMPANY_TRAVEL_SEASON_MAPPING_RULE_CODE,
}
# Directory names of the rule libraries.
FINANCE_RULES_LIBRARY = "finance-rules"
RISK_RULES_LIBRARY = "risk-rules"
# The only library names accepted by the store methods; also the directories
# created by ``ensure_rule_library_dirs``.
RULE_LIBRARY_NAMES = {FINANCE_RULES_LIBRARY, RISK_RULES_LIBRARY}
# MIME type used when ``mimetypes`` cannot guess one from the file name, and
# when parsed metadata carries no ``mime_type``.
SPREADSHEET_MIME_TYPE = (
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
@dataclass(slots=True)
class RuleSpreadsheetMeta:
file_name: str
storage_key: str
mime_type: str
size_bytes: int
checksum: str
updated_at: str
updated_by: str
source: str = "upload"
class AgentAssetSpreadsheetManager:
def __init__(
self,
storage_root: Path | None = None,
rule_root: Path | None = None,
) -> None:
settings = get_settings()
self.storage_root = Path(storage_root or settings.resolved_storage_root_dir).resolve()
self.asset_root = (self.storage_root / "agent_assets").resolve()
self.rule_root = Path(rule_root or (SERVER_DIR / "rules")).resolve()
def ensure_rule_library_dirs(self) -> None:
for library in sorted(RULE_LIBRARY_NAMES):
(self.rule_root / library).mkdir(parents=True, exist_ok=True)
def store_spreadsheet(
self,
*,
asset_id: str,
version: str,
file_name: str,
content: bytes,
actor_name: str,
source: str = "upload",
) -> RuleSpreadsheetMeta:
return self.store_rule_library_spreadsheet_snapshot(
library=FINANCE_RULES_LIBRARY,
asset_id=asset_id,
version=version,
file_name=file_name,
content=content,
actor_name=actor_name,
source=source,
)
def store_rule_library_spreadsheet(
self,
*,
library: str,
file_name: str,
content: bytes,
actor_name: str,
source: str = "rule-library",
) -> RuleSpreadsheetMeta:
normalized_library = str(library or "").strip()
if normalized_library not in RULE_LIBRARY_NAMES:
raise ValueError("规则库目录不合法。")
normalized_name = Path(str(file_name or "").strip()).name.strip()
if not normalized_name:
raise ValueError("规则表文件名不能为空。")
if not content:
raise ValueError("规则表文件内容不能为空。")
self.ensure_rule_library_dirs()
relative_path = Path("rules") / normalized_library / normalized_name
target_path = (SERVER_DIR / relative_path).resolve()
try:
target_path.relative_to(self.rule_root)
except ValueError:
raise ValueError("规则库文件路径不合法。") from None
target_path.parent.mkdir(parents=True, exist_ok=True)
target_path.write_bytes(content)
mime_type = mimetypes.guess_type(normalized_name)[0] or SPREADSHEET_MIME_TYPE
return RuleSpreadsheetMeta(
file_name=normalized_name,
storage_key=relative_path.as_posix(),
mime_type=mime_type,
size_bytes=len(content),
checksum=hashlib.sha256(content).hexdigest(),
updated_at=datetime.now(UTC).isoformat(),
updated_by=str(actor_name or "system").strip() or "system",
source=source,
)
def store_rule_library_spreadsheet_snapshot(
self,
*,
library: str,
asset_id: str,
version: str,
file_name: str,
content: bytes,
actor_name: str,
source: str = "rule-library-version",
) -> RuleSpreadsheetMeta:
normalized_library = str(library or "").strip()
if normalized_library not in RULE_LIBRARY_NAMES:
raise ValueError("规则库目录不合法。")
raw_asset_id = str(asset_id or "").strip()
raw_version = str(version or "").strip()
normalized_asset_id = Path(raw_asset_id).name.strip()
normalized_version = Path(raw_version).name.strip()
normalized_name = Path(str(file_name or "").strip()).name.strip()
if (
not normalized_asset_id
or normalized_asset_id in {".", ".."}
or normalized_asset_id != raw_asset_id
):
raise ValueError("规则资产 ID 不合法。")
if (
not normalized_version
or normalized_version in {".", ".."}
or normalized_version != raw_version
):
raise ValueError("规则表版本号不合法。")
if not normalized_name:
raise ValueError("规则表文件名不能为空。")
if not content:
raise ValueError("规则表文件内容不能为空。")
self.ensure_rule_library_dirs()
relative_path = (
Path("rules")
/ normalized_library
/ ".versions"
/ normalized_asset_id
/ normalized_version
/ normalized_name
)
target_path = (SERVER_DIR / relative_path).resolve()
try:
target_path.relative_to(self.rule_root)
except ValueError:
raise ValueError("规则库版本文件路径不合法。") from None
target_path.parent.mkdir(parents=True, exist_ok=True)
target_path.write_bytes(content)
mime_type = mimetypes.guess_type(normalized_name)[0] or SPREADSHEET_MIME_TYPE
return RuleSpreadsheetMeta(
file_name=normalized_name,
storage_key=relative_path.as_posix(),
mime_type=mime_type,
size_bytes=len(content),
checksum=hashlib.sha256(content).hexdigest(),
updated_at=datetime.now(UTC).isoformat(),
updated_by=str(actor_name or "system").strip() or "system",
source=source,
)
def resolve_storage_path(self, storage_key: str) -> Path:
normalized = Path(str(storage_key or "").strip())
if not normalized.parts:
raise FileNotFoundError("规则表文件不存在。")
if normalized.parts[0] == "rules":
resolved = (SERVER_DIR / normalized).resolve()
allowed_root = self.rule_root
else:
resolved = (self.storage_root / normalized).resolve()
allowed_root = self.storage_root
try:
resolved.relative_to(allowed_root)
except ValueError:
raise FileNotFoundError("规则表文件不存在。") from None
return resolved
@staticmethod
def parse_version_markdown(markdown: str) -> RuleSpreadsheetMeta | None:
match = RULE_SPREADSHEET_BLOCK_PATTERN.search(str(markdown or ""))
if match is None:
return None
try:
payload = json.loads(match.group(1))
except json.JSONDecodeError:
return None
if not isinstance(payload, dict):
return None
return RuleSpreadsheetMeta(
file_name=str(payload.get("file_name") or "").strip(),
storage_key=str(payload.get("storage_key") or "").strip(),
mime_type=str(payload.get("mime_type") or SPREADSHEET_MIME_TYPE).strip()
or SPREADSHEET_MIME_TYPE,
size_bytes=int(payload.get("size_bytes") or 0),
checksum=str(payload.get("checksum") or "").strip(),
updated_at=str(payload.get("updated_at") or "").strip(),
updated_by=str(payload.get("updated_by") or "system").strip() or "system",
source=str(payload.get("source") or "upload").strip() or "upload",
)
@staticmethod
def build_version_markdown(
*,
rule_name: str,
version: str,
metadata: RuleSpreadsheetMeta,
) -> str:
sections = [
f"# {rule_name}",
"",
"## 规则载体",
"",
"- 详情类型:Excel 表格",
f"- 当前规则版本:`{version}`",
f"- 表格文件:`{metadata.file_name}`",
f"- 最近更新人:{metadata.updated_by}",
f"- 最近更新时间:{metadata.updated_at}",
"",
"## 使用说明",
"",
"- 管理员可直接在规则中心内联编辑 Excel 表格,并通过 ONLYOFFICE 回写新版本。",
"- 上传新的 Excel 文件后,会自动生成新的规则版本快照。",
"- 切换到历史版本时仅提供预览,不允许直接覆盖历史快照。",
"",
"```rule-spreadsheet",
json.dumps(asdict(metadata), ensure_ascii=False, indent=2),
"```",
]
return "\n".join(sections)
@staticmethod
def build_rule_document_config(
metadata: RuleSpreadsheetMeta,
*,
asset_version: str,
) -> dict[str, object]:
return {
"kind": "spreadsheet",
"file_name": metadata.file_name,
"mime_type": metadata.mime_type,
"size_bytes": metadata.size_bytes,
"checksum": metadata.checksum,
"updated_at": metadata.updated_at,
"updated_by": metadata.updated_by,
"source": metadata.source,
"asset_version": asset_version,
}
@staticmethod
def build_company_travel_rule_template() -> bytes:
return AgentAssetSpreadsheetManager.build_travel_lodging_rule_template()
@staticmethod
def build_travel_lodging_rule_template() -> bytes:
lodging_rows = [
["地区(城市)", "城市级别", "P0", "P1", "P2", "P3", "P4", "P5", "P6", "P7", "P8", "备注"],
["北京", "一线城市", 450, 450, 450, 450, 450, 450, 450, 500, 500, "中心城区按本标准执行"],
["上海", "一线城市", 450, 450, 450, 450, 450, 450, 450, 500, 500, "中心城区按本标准执行"],
["广州", "一线城市", 430, 430, 430, 430, 450, 450, 450, 500, 500, "广交会期间可按例外流程说明"],
["深圳", "一线城市", 430, 430, 430, 430, 450, 450, 450, 500, 500, "旺季需补充超标说明"],
["杭州", "二线城市", 380, 380, 380, 380, 430, 430, 430, 480, 480, ""],
["南京", "二线城市", 380, 380, 380, 380, 430, 430, 430, 480, 480, ""],
["成都", "二线城市", 380, 380, 380, 380, 430, 430, 430, 480, 480, ""],
["武汉", "二线城市", 380, 380, 380, 380, 430, 430, 430, 480, 480, ""],
["其他地区", "其他地区", 320, 320, 320, 320, 380, 380, 380, 450, 450, "未单列城市按其他地区执行"],
]
source_path = (
SERVER_DIR
/ "rules"
/ FINANCE_RULES_LIBRARY
/ COMPANY_TRAVEL_SOURCE_RULE_FILENAME
)
return build_travel_lodging_workbook_from_source(source_path, lodging_rows)
@staticmethod
def build_travel_allowance_rule_template() -> bytes:
return build_travel_allowance_workbook()
@staticmethod
def build_travel_transport_rule_template() -> bytes:
return build_travel_transport_class_workbook()
@staticmethod
def build_travel_grade_mapping_template() -> bytes:
return build_travel_grade_mapping_workbook()
@staticmethod
def build_travel_season_mapping_template() -> bytes:
source_path = (
SERVER_DIR
/ "rules"
/ FINANCE_RULES_LIBRARY
/ COMPANY_TRAVEL_SOURCE_RULE_FILENAME
)
return build_travel_season_mapping_workbook(source_path)
@staticmethod
def build_travel_transport_estimate_rule_template() -> bytes:
return build_travel_transport_estimate_workbook()
@staticmethod
def build_company_communication_rule_template() -> bytes:
return build_communication_expense_workbook()
@staticmethod
def _build_travel_source_sheet(
sheet_name: str,
*,
fallback_rows: list[list[object]],
) -> bytes:
source_path = (
SERVER_DIR
/ "rules"
/ FINANCE_RULES_LIBRARY
/ COMPANY_TRAVEL_SOURCE_RULE_FILENAME
)
if source_path.exists():
try:
return build_xlsx_bytes_from_source_sheet(source_path, sheet_name)
except (OSError, ValueError):
pass
return _build_xlsx_bytes([(sheet_name, fallback_rows)])
@staticmethod
def build_rule_workbook(sheets: list[tuple[str, list[list[object]]]]) -> bytes:
return _build_xlsx_bytes(sheets)
@staticmethod
def build_blank_rule_workbook(sheet_name: str = "规则配置") -> bytes:
return _build_xlsx_bytes(
[
(
sheet_name,
[
["规则项", "适用条件", "标准/阈值", "所需材料", "审批要求", "备注"],
["", "", "", "", "", ""],
],
)
]
)
@staticmethod
def rebuild_from_uploaded_content(content: bytes) -> bytes:
if not content:
raise ValueError("待导入的表格内容不能为空。")
try:
workbook = load_workbook(
filename=BytesIO(content),
read_only=False,
data_only=False,
)
except Exception as exc: # noqa: BLE001
raise ValueError("无法解析上传的 Excel 表格。") from exc
try:
if not workbook.worksheets:
raise ValueError("上传的 Excel 表格中没有可导入的工作表。")
rebuilt_buffer = BytesIO()
workbook.save(rebuilt_buffer)
return rebuilt_buffer.getvalue()
finally:
workbook.close()
def _build_xlsx_bytes(sheets: list[tuple[str, list[list[object]]]]) -> bytes:
    """Assemble an XLSX package in memory and return its raw bytes.

    Args:
        sheets: ``(sheet_name, rows)`` pairs; each pair becomes
            ``xl/worksheets/sheet<N>.xml`` with ``N`` counted from 1.

    Returns:
        The bytes of a deflate-compressed zip archive containing the content
        types, relationships, document properties, workbook, styles and one
        worksheet part per entry in ``sheets``.
    """
    # Second-resolution UTC timestamp with a trailing "Z" for docProps/core.xml.
    timestamp = datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")

    # Fixed package parts first, in the order they are written to the archive.
    package_parts = [
        ("[Content_Types].xml", _build_content_types_xml(sheets)),
        ("_rels/.rels", _build_root_rels_xml()),
        ("docProps/app.xml", _build_app_xml(sheets)),
        ("docProps/core.xml", _build_core_xml(timestamp)),
        ("xl/workbook.xml", _build_workbook_xml(sheets)),
        ("xl/_rels/workbook.xml.rels", _build_workbook_rels_xml(sheets)),
        ("xl/styles.xml", _build_styles_xml()),
    ]
    # Then one worksheet part per sheet; the sheet name is not needed here.
    package_parts.extend(
        (f"xl/worksheets/sheet{position}.xml", _build_sheet_xml(sheet_rows))
        for position, (_sheet_name, sheet_rows) in enumerate(sheets, start=1)
    )

    buffer = BytesIO()
    with ZipFile(buffer, mode="w", compression=ZIP_DEFLATED) as package:
        for member_name, member_xml in package_parts:
            package.writestr(member_name, member_xml)
    return buffer.getvalue()
def _build_content_types_xml(sheets: list[tuple[str, list[list[object]]]]) -> str:
overrides = [
(
'