from __future__ import annotations
import html
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from app.core.config import get_settings
@dataclass(frozen=True, slots=True)
class RenderedFinanceReport:
html_path: Path
pdf_path: Path
storage_key: str
title: str
page_count: int
class FinanceReportRenderer:
def render(self, context: dict[str, Any]) -> RenderedFinanceReport:
report_dir = self._report_dir(context)
report_dir.mkdir(parents=True, exist_ok=True)
title = str((context.get("period") or {}).get("title") or "财务经营报告")
html_text = self.render_html(context)
html_path = report_dir / "report.html"
pdf_path = report_dir / "report.pdf"
html_path.write_text(html_text, encoding="utf-8")
page_count = SimpleFinancePdfWriter().write(pdf_path, context)
return RenderedFinanceReport(
html_path=html_path,
pdf_path=pdf_path,
storage_key=self._storage_key(pdf_path),
title=title,
page_count=page_count,
)
def render_html(self, context: dict[str, Any]) -> str:
period = context.get("period") or {}
dashboard = context.get("dashboard") or {}
totals = dashboard.get("totals") if isinstance(dashboard.get("totals"), dict) else {}
trend = dashboard.get("trend") if isinstance(dashboard.get("trend"), dict) else {}
departments = list(dashboard.get("department_ranking") or [])
top_claims = list(dashboard.get("top_claims") or [])
actions = list(context.get("action_items") or [])
insights = list(context.get("insights") or [])
return f"""
{_e(period.get("title"))}
{_e(period.get("title"))}
周期:{_e(period.get("label"))} 生成时间:{_e(context.get("generated_at"))}
管理摘要
{''.join(f'{_e(item)}
' for item in insights)}
关键指标
{_metric_html("报销金额", _money(totals.get("reimbursementAmount")))}
{_metric_html("报销单数", f'{int(totals.get("reimbursementCount") or 0)} 单')}
{_metric_html("待付款", _money(totals.get("pendingPaymentAmount")))}
{_metric_html("预算使用率", f'{float(totals.get("budgetUsageRate") or 0):.1f}%')}
每日报销趋势
{_trend_html(trend)}
部门费用排行
{_ranking_html(departments, "amount")}
高额单据
{_top_claims_html(top_claims)}
行动清单
{_actions_html(actions)}
"""
def _report_dir(self, context: dict[str, Any]) -> Path:
settings = get_settings()
period = context.get("period") or {}
report_type = str(context.get("report_type") or "weekly")
label = re.sub(r"[^0-9A-Za-z\u4e00-\u9fff_-]+", "_", str(period.get("label") or "latest"))
return settings.resolved_storage_root_dir / "finance_reports" / report_type / label
@staticmethod
def _storage_key(pdf_path: Path) -> str:
root = get_settings().resolved_storage_root_dir.resolve()
return pdf_path.resolve().relative_to(root).as_posix()
class SimpleFinancePdfWriter:
width = 595
height = 842
margin = 48
def write(self, path: Path, context: dict[str, Any]) -> int:
pages = self._build_pages(context)
objects: list[bytes] = []
page_ids: list[int] = []
font_id = 3
for page in pages:
content = self._content_stream(page)
content_id = len(objects) + 4
page_id = len(objects) + 5
objects.append(
f"<< /Length {len(content)} >>\nstream\n".encode("latin-1")
+ content
+ b"\nendstream"
)
objects.append(
(
f"<< /Type /Page /Parent 2 0 R /MediaBox [0 0 {self.width} {self.height}] "
f"/Resources << /Font << /F1 {font_id} 0 R >> >> /Contents {content_id} 0 R >>"
).encode("latin-1")
)
page_ids.append(page_id)
catalog = b"<< /Type /Catalog /Pages 2 0 R >>"
kids = " ".join(f"{page_id} 0 R" for page_id in page_ids)
pages_obj = f"<< /Type /Pages /Kids [{kids}] /Count {len(page_ids)} >>".encode("latin-1")
font_obj = (
b"<< /Type /Font /Subtype /Type0 /BaseFont /STSong-Light "
b"/Encoding /UniGB-UCS2-H /DescendantFonts ["
b"<< /Type /Font /Subtype /CIDFontType0 /BaseFont /STSong-Light "
b"/CIDSystemInfo << /Registry (Adobe) /Ordering (GB1) /Supplement 5 >> >>] >>"
)
all_objects = [catalog, pages_obj, font_obj, *objects]
self._write_pdf(path, all_objects)
return len(pages)
def _build_pages(self, context: dict[str, Any]) -> list[list[dict[str, Any]]]:
period = context.get("period") or {}
dashboard = context.get("dashboard") or {}
totals = dashboard.get("totals") if isinstance(dashboard.get("totals"), dict) else {}
trend = dashboard.get("trend") if isinstance(dashboard.get("trend"), dict) else {}
departments = list(dashboard.get("department_ranking") or [])
actions = list(context.get("action_items") or [])
insights = list(context.get("insights") or [])
pages: list[list[dict[str, Any]]] = []
pages.append(
[
{"type": "title", "text": str(period.get("title") or "财务经营报告")},
{"type": "text", "text": f"报告周期:{period.get('label') or ''}"},
{"type": "text", "text": f"生成时间:{context.get('generated_at') or ''}"},
{"type": "heading", "text": "管理摘要"},
*[{"type": "bullet", "text": str(item)} for item in insights],
{"type": "heading", "text": "关键指标"},
{
"type": "metrics",
"items": [
("报销金额", _money(totals.get("reimbursementAmount"))),
("报销单数", f"{int(totals.get('reimbursementCount') or 0)} 单"),
("待付款", _money(totals.get("pendingPaymentAmount"))),
("预算使用率", f"{float(totals.get('budgetUsageRate') or 0):.1f}%"),
],
},
]
)
pages.append(
[
{"type": "heading", "text": "每日报销趋势"},
{
"type": "bars",
"labels": trend.get("labels") or [],
"values": trend.get("claimAmount") or [],
},
{"type": "heading", "text": "部门费用排行"},
{
"type": "bars",
"labels": [str(item.get("name") or "") for item in departments[:8]],
"values": [float(item.get("amount") or 0) for item in departments[:8]],
},
]
)
pages.append(
[
{"type": "heading", "text": "行动清单"},
*[
{
"type": "bullet",
"text": (
f"{item.get('title')} / {item.get('owner')}:"
f"{item.get('suggestion')}"
),
}
for item in actions
],
]
)
return pages
def _content_stream(self, blocks: list[dict[str, Any]]) -> bytes:
commands: list[str] = ["q", "1 1 1 rg 0 0 595 842 re f"]
y = self.height - self.margin
for block in blocks:
block_type = block["type"]
if block_type == "title":
commands.extend(self._text(block["text"], self.margin, y, 24, "0.05 0.15 0.35"))
y -= 42
elif block_type == "heading":
y -= 8
commands.extend(self._text(block["text"], self.margin, y, 15, "0.10 0.25 0.55"))
y -= 26
elif block_type == "text":
commands.extend(self._text(block["text"], self.margin, y, 10, "0.25 0.30 0.38"))
y -= 18
elif block_type == "bullet":
lines = self._wrap(str(block["text"]), 34)
for line in lines:
commands.extend(self._text(f"• {line}", self.margin, y, 10, "0.12 0.16 0.22"))
y -= 17
elif block_type == "metrics":
y = self._metrics(commands, block["items"], y)
elif block_type == "bars":
y = self._bars(commands, block.get("labels") or [], block.get("values") or [], y)
commands.append("Q")
return "\n".join(commands).encode("latin-1")
def _metrics(self, commands: list[str], items: list[tuple[str, str]], y: int) -> int:
box_w = 122
for index, (label, value) in enumerate(items):
x = self.margin + index * (box_w + 8)
commands.append("0.95 0.97 1.00 rg")
commands.append(f"{x} {y - 48} {box_w} 46 re f")
commands.extend(self._text(label, x + 8, y - 18, 8, "0.35 0.42 0.50"))
commands.extend(self._text(value, x + 8, y - 36, 13, "0.05 0.15 0.35"))
return y - 68
def _bars(self, commands: list[str], labels: list[Any], values: list[Any], y: int) -> int:
pairs = [
(str(label), float(value or 0))
for label, value in zip(labels, values, strict=False)
]
max_value = max([value for _label, value in pairs] or [1])
for label, value in pairs[:10]:
width = 310 * (value / max_value) if max_value else 0
commands.extend(self._text(_trim(label, 14), self.margin, y, 9, "0.25 0.30 0.38"))
commands.append("0.88 0.92 0.96 rg")
commands.append(f"{self.margin + 90} {y - 4} 320 8 re f")
commands.append("0.18 0.44 0.93 rg")
commands.append(f"{self.margin + 90} {y - 4} {width:.1f} 8 re f")
commands.extend(self._text(_money(value), self.margin + 420, y, 8, "0.25 0.30 0.38"))
y -= 22
return y - 8
@staticmethod
def _text(text: Any, x: int | float, y: int | float, size: int, color: str) -> list[str]:
return [
f"{color} rg",
"BT",
f"/F1 {size} Tf",
f"{x:.1f} {y:.1f} Td",
f"<{_pdf_hex(str(text))}> Tj",
"ET",
]
@staticmethod
def _wrap(text: str, length: int) -> list[str]:
value = str(text or "").strip()
return [value[index : index + length] for index in range(0, len(value), length)] or [""]
@staticmethod
def _write_pdf(path: Path, objects: list[bytes]) -> None:
offsets: list[int] = []
payload = bytearray(b"%PDF-1.4\n%\xe2\xe3\xcf\xd3\n")
for index, obj in enumerate(objects, start=1):
offsets.append(len(payload))
payload.extend(f"{index} 0 obj\n".encode("latin-1"))
payload.extend(obj)
payload.extend(b"\nendobj\n")
xref_at = len(payload)
payload.extend(f"xref\n0 {len(objects) + 1}\n0000000000 65535 f \n".encode("latin-1"))
for offset in offsets:
payload.extend(f"{offset:010d} 00000 n \n".encode("latin-1"))
payload.extend(
(
f"trailer\n<< /Size {len(objects) + 1} /Root 1 0 R >>\n"
f"startxref\n{xref_at}\n%%EOF"
).encode("latin-1")
)
path.write_bytes(bytes(payload))
def _metric_html(label: str, value: str) -> str:
return (
f'{_e(label)}
'
f'
{_e(value)}
'
)
def _trend_html(trend: dict[str, Any]) -> str:
labels = list(trend.get("labels") or [])
values = [float(value or 0) for value in list(trend.get("claimAmount") or [])]
return _bar_html(labels, values)
def _ranking_html(rows: list[dict[str, Any]], value_key: str) -> str:
labels = [str(item.get("name") or "") for item in rows[:8]]
values = [float(item.get(value_key) or 0) for item in rows[:8]]
return _bar_html(labels, values)
def _bar_html(labels: list[Any], values: list[float]) -> str:
max_value = max(values or [1])
rows = []
for label, value in zip(labels, values, strict=False):
width = 100 * value / max_value if max_value else 0
rows.append(
''
f'
{_e(label)}
'
f'
'
f'
{_e(_money(value))}
'
"
"
)
return "".join(rows) or '暂无数据
'
def _top_claims_html(rows: list[dict[str, Any]]) -> str:
body = "".join(
""
f"| {_e(item.get('claimNo'))} | "
f"{_e(item.get('employeeName'))} | "
f"{_e(item.get('departmentName'))} | "
f"{_e(item.get('amountLabel') or _money(item.get('amount')))} | "
"
"
for item in rows[:6]
)
return (
""
)
def _actions_html(rows: list[dict[str, Any]]) -> str:
if not rows:
return '暂无需要升级的行动项。
'
return "".join(
(
f'{_e(item.get("title"))}'
f'|{_e(item.get("owner"))}
{_e(item.get("suggestion"))}
'
)
for item in rows
)
def _e(value: Any) -> str:
return html.escape(str(value or ""))
def _money(value: Any) -> str:
try:
return f"¥{float(value or 0):,.0f}"
except (TypeError, ValueError):
return "¥0"
def _trim(value: str, max_len: int) -> str:
return value if len(value) <= max_len else value[: max_len - 1] + "…"
def _pdf_hex(value: str) -> str:
return value.encode("utf-16-be").hex().upper()