Files
X-Financial/server/src/app/services/finance_dashboard.py

885 lines
35 KiB
Python
Raw Normal View History

from __future__ import annotations
import re
from collections import defaultdict
from datetime import UTC, date, datetime, time, timedelta
from decimal import Decimal
from typing import Any
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.models.budget import BudgetAllocation
from app.models.financial_record import ExpenseClaim
from app.schemas.finance_dashboard import FinanceDashboardRead
from app.services.budget_support import BudgetSupportMixin
from app.services.demo_company_simulation_filters import is_finance_reimbursement_claim
from app.services.expense_claim_constants import EXPENSE_TYPE_LABELS
from app.services.finance_dashboard_constants import (
CHART_COLORS,
EMPTY_DONUT,
EXCLUDED_SPEND_STATUSES,
EXPENSE_TYPE_ALIASES,
PENDING_STATUSES,
RISK_SIGNAL_LABELS,
SLA_TARGET_HOURS,
STAGE_LABELS,
SUCCESS_STATUSES,
)
class FinanceDashboardService(BudgetSupportMixin):
def __init__(self, db: Session) -> None:
self.db = db
def build_dashboard(
self,
*,
range_key: str = "近10日",
start_date: date | None = None,
end_date: date | None = None,
trend_range: str = "近12天",
department_range: str = "本月",
) -> FinanceDashboardRead:
now = datetime.now(UTC)
start, end, resolved_key = self._resolve_scope(
range_key=range_key,
start_date=start_date,
end_date=end_date,
now=now,
)
previous_start = start - (end - start)
trend_start, trend_end, trend_labels = self._resolve_trend_scope(
trend_range,
now,
fallback_start=start,
fallback_end=end,
)
ranking_start, ranking_end = self._resolve_ranking_scope(
department_range,
now,
fallback_start=start,
fallback_end=end,
)
claims = [
claim for claim in self._fetch_claims() if is_finance_reimbursement_claim(claim)
]
scope_claims = self._claims_between(claims, start, end)
previous_claims = self._claims_between(claims, previous_start, start)
trend_claims = self._claims_between(claims, trend_start, trend_end)
ranking_claims = self._claims_between(claims, ranking_start, ranking_end)
totals = self._totals(scope_claims)
previous_totals = self._totals(previous_claims)
return FinanceDashboardRead(
range_key=resolved_key,
start_date=start.date().isoformat(),
end_date=(end - timedelta(days=1)).date().isoformat(),
generated_at=now.isoformat(),
has_real_data=bool(claims or self._fetch_budget_allocations(now.year)),
totals=totals,
metric_meta=self._metric_meta(totals, previous_totals),
trend=self._trend(trend_labels, trend_claims, now),
spend_by_category=self._spend_by_category(scope_claims),
exception_mix=self._payment_status_mix(scope_claims),
department_ranking=self._department_ranking(ranking_claims),
department_employee_mix=self._department_employee_mix(ranking_claims),
employee_ranking=self._employee_ranking(ranking_claims),
top_claims=self._top_claims(ranking_claims),
bottlenecks=self._bottlenecks(scope_claims),
budget_summary=self._budget_summary(now.year),
budget_metrics=self._budget_metrics(now.year),
)
def _fetch_claims(self) -> list[ExpenseClaim]:
stmt = select(ExpenseClaim).order_by(ExpenseClaim.created_at.asc())
return list(self.db.scalars(stmt).all())
def _fetch_budget_allocations(self, fiscal_year: int) -> list[BudgetAllocation]:
stmt = (
select(BudgetAllocation)
.where(BudgetAllocation.fiscal_year == fiscal_year)
.order_by(BudgetAllocation.period_key.asc())
)
return list(self.db.scalars(stmt).all())
def _resolve_scope(
self,
*,
range_key: str,
start_date: date | None,
end_date: date | None,
now: datetime,
) -> tuple[datetime, datetime, str]:
today = now.date()
normalized_key = str(range_key or "").strip() or "近10日"
if start_date and end_date:
start_day = min(start_date, end_date)
end_day = max(start_date, end_date)
return self._day_start(start_day), self._day_after(end_day), "自定义"
if normalized_key == "今日":
start_day = today
elif normalized_key == "本周":
start_day = today - timedelta(days=today.weekday())
elif normalized_key == "本月":
start_day = today.replace(day=1)
else:
days = self._days_from_label(normalized_key, default=10)
start_day = today - timedelta(days=days - 1)
return self._day_start(start_day), self._day_after(today), normalized_key
def _resolve_trend_scope(
self,
trend_range: str,
now: datetime,
*,
fallback_start: datetime | None = None,
fallback_end: datetime | None = None,
) -> tuple[datetime, datetime, list[str]]:
today = now.date()
key = str(trend_range or "").strip()
if key in {"custom", "自定义"} and fallback_start and fallback_end:
start_day = fallback_start.date()
end_day = (fallback_end - timedelta(days=1)).date()
elif key == "今日":
start_day = today
end_day = today
elif key == "本周":
start_day = today - timedelta(days=today.weekday())
end_day = today
elif key == "本月":
start_day = today.replace(day=1)
end_day = today
else:
days = self._days_from_label(trend_range, default=12)
end_day = today
start_day = end_day - timedelta(days=days - 1)
if start_day > end_day:
start_day, end_day = end_day, start_day
days = max(1, (end_day - start_day).days + 1)
labels = [self._date_label(start_day + timedelta(days=index)) for index in range(days)]
return self._day_start(start_day), self._day_after(end_day), labels
def _resolve_ranking_scope(
self,
department_range: str,
now: datetime,
*,
fallback_start: datetime | None = None,
fallback_end: datetime | None = None,
) -> tuple[datetime, datetime]:
today = now.date()
key = str(department_range or "").strip()
if key in {"custom", "自定义"} and fallback_start and fallback_end:
return fallback_start, fallback_end
if key == "今日":
return self._day_start(today), self._day_after(today)
if key == "本周":
start_day = today - timedelta(days=today.weekday())
return self._day_start(start_day), self._day_after(today)
if key == "全部":
return datetime(1970, 1, 1, tzinfo=UTC), self._day_after(today)
if key == "本季度":
quarter_month = ((today.month - 1) // 3) * 3 + 1
return self._day_start(today.replace(month=quarter_month, day=1)), self._day_after(today)
if key == "本年":
return self._day_start(today.replace(month=1, day=1)), self._day_after(today)
if key == "本月":
return self._day_start(today.replace(day=1)), self._day_after(today)
if re.search(r"\d+", key):
days = self._days_from_label(key, default=10)
start_day = today - timedelta(days=days - 1)
return self._day_start(start_day), self._day_after(today)
if key == "全部":
return datetime(1970, 1, 1, tzinfo=UTC), self._day_after(today)
if key == "本季度":
quarter_month = ((today.month - 1) // 3) * 3 + 1
start_day = today.replace(month=quarter_month, day=1)
elif key == "本年":
start_day = today.replace(month=1, day=1)
else:
start_day = today.replace(day=1)
return self._day_start(start_day), self._day_after(today)
def _claims_between(
self,
claims: list[ExpenseClaim],
start: datetime,
end: datetime,
) -> list[ExpenseClaim]:
return [claim for claim in claims if start <= self._claim_time(claim) < end]
def _totals(
self,
claims: list[ExpenseClaim],
) -> dict[str, Any]:
active_claims = [
claim for claim in claims if self._status(claim) not in {"draft", "deleted"}
]
spend_claims = [
claim for claim in active_claims if self._status(claim) not in EXCLUDED_SPEND_STATUSES
]
pending_payment_claims = [
claim for claim in spend_claims if self._status(claim) == "pending_payment"
]
paid_claims = [claim for claim in spend_claims if self._status(claim) == "paid"]
total_amount = sum((self._claim_amount(claim) for claim in spend_claims), Decimal("0.00"))
pending_payment_amount = sum(
(self._claim_amount(claim) for claim in pending_payment_claims),
Decimal("0.00"),
)
budget_summary = self._budget_summary(datetime.now(UTC).year)
avg_amount = (
total_amount / Decimal(str(len(spend_claims)))
if spend_claims
else Decimal("0.00")
)
return {
"reimbursementAmount": self._decimal_number(total_amount),
"reimbursementCount": len(spend_claims),
"pendingPaymentAmount": self._decimal_number(pending_payment_amount),
"avgClaimAmount": self._decimal_number(avg_amount),
"budgetUsageRate": float(budget_summary.get("ratio") or 0),
"paymentClearanceRate": self._percent(len(paid_claims), len(spend_claims)),
}
def _metric_meta(self, current: dict[str, Any], previous: dict[str, Any]) -> dict[str, Any]:
unit_by_key = {
"reimbursementAmount": "",
"reimbursementCount": "",
"pendingPaymentAmount": "",
"avgClaimAmount": "",
"budgetUsageRate": "%",
"paymentClearanceRate": "%",
}
meta: dict[str, Any] = {}
for key, current_value in current.items():
previous_value = Decimal(str(previous.get(key, 0) or 0))
value = Decimal(str(current_value or 0))
diff = value - previous_value
change = self._change_percent(value, previous_value)
unit = unit_by_key.get(key, "")
meta[key] = {
"changeText": f"{'+' if change >= 0 else ''}{change:.1f}%",
"delta": f"较上一周期 {'+' if diff >= 0 else ''}{self._format_delta(diff, unit)}",
"trend": "up" if diff >= 0 else "down",
}
return meta
def _trend(
self,
labels: list[str],
claims: list[ExpenseClaim],
now: datetime,
) -> dict[str, Any]:
claim_count = [0 for _ in labels]
claim_amount = [Decimal("0.00") for _ in labels]
success_count = [0 for _ in labels]
category_amounts: dict[str, list[Decimal]] = {}
category_totals: dict[str, Decimal] = defaultdict(Decimal)
hours: list[list[Decimal]] = [[] for _ in labels]
index = {label: idx for idx, label in enumerate(labels)}
for claim in claims:
if self._status(claim) in EXCLUDED_SPEND_STATUSES:
continue
label = self._date_label(self._claim_time(claim).date())
if label not in index:
continue
bucket = index[label]
amount = self._claim_amount(claim)
category = self._expense_type_label(claim.expense_type)
claim_count[bucket] += 1
claim_amount[bucket] += amount
category_amounts.setdefault(category, [Decimal("0.00") for _ in labels])[bucket] += amount
category_totals[category] += amount
if self._status(claim) in SUCCESS_STATUSES:
success_count[bucket] += 1
if claim.submitted_at:
hours[bucket].append(self._claim_sla_hours(claim, now))
return {
"labels": labels,
"claimCount": claim_count,
"claimAmount": [self._decimal_number(value) for value in claim_amount],
"categoryAmountSeries": [
{
"name": name,
"color": CHART_COLORS[index % len(CHART_COLORS)],
"data": [self._decimal_number(value) for value in category_amounts[name]],
"total": self._decimal_number(category_totals[name]),
}
for index, name in enumerate(
sorted(category_amounts, key=lambda item: category_totals[item], reverse=True)[:6]
)
],
"successCount": success_count,
# 兼容旧前端字段;新财务看板不再使用审批趋势语义。
"applications": claim_count,
"approved": success_count,
"avgHours": [self._decimal_number(self._average(row)) for row in hours],
}
def _spend_by_category(self, claims: list[ExpenseClaim]) -> list[dict[str, Any]]:
buckets: dict[str, Decimal] = defaultdict(Decimal)
for claim in claims:
if self._status(claim) in EXCLUDED_SPEND_STATUSES:
continue
buckets[self._expense_type_label(claim.expense_type)] += self._claim_amount(claim)
rows = [
{
"name": name,
"value": self._decimal_number(value),
"color": CHART_COLORS[index % len(CHART_COLORS)],
}
for index, (name, value) in enumerate(
sorted(buckets.items(), key=lambda item: item[1], reverse=True)[:6]
)
]
return rows or EMPTY_DONUT
def _payment_status_mix(self, claims: list[ExpenseClaim]) -> list[dict[str, Any]]:
buckets: dict[str, int] = defaultdict(int)
for claim in claims:
status = self._status(claim)
if status in {"draft", "deleted"}:
continue
buckets[self._finance_status_label(status)] += 1
rows = [
{"name": name, "value": count, "color": CHART_COLORS[index % len(CHART_COLORS)]}
for index, (name, count) in enumerate(
sorted(buckets.items(), key=lambda item: item[1], reverse=True)[:6]
)
]
return rows or EMPTY_DONUT
def _department_ranking(self, claims: list[ExpenseClaim]) -> list[dict[str, Any]]:
buckets: dict[str, Decimal] = defaultdict(Decimal)
counts: dict[str, int] = defaultdict(int)
pending_amounts: dict[str, Decimal] = defaultdict(Decimal)
employees: dict[str, set[str]] = defaultdict(set)
for claim in claims:
status = self._status(claim)
if status in EXCLUDED_SPEND_STATUSES:
continue
department_name = str(claim.department_name or "").strip()
if self._is_missing_finance_dimension(department_name):
continue
amount = self._claim_amount(claim)
buckets[department_name] += amount
counts[department_name] += 1
employee_name = str(claim.employee_name or "").strip()
if not self._is_missing_finance_dimension(employee_name):
employees[department_name].add(employee_name)
if status in PENDING_STATUSES:
pending_amounts[department_name] += amount
rows = [
{
"name": name,
"amount": self._decimal_number(amount),
"value": self._decimal_number(amount),
"count": counts[name],
"employeeCount": len(employees[name]),
"pendingAmount": self._decimal_number(pending_amounts[name]),
"color": CHART_COLORS[index % len(CHART_COLORS)],
}
for index, (name, amount) in enumerate(
sorted(buckets.items(), key=lambda item: item[1], reverse=True)[:6]
)
]
return rows
def _department_employee_mix(self, claims: list[ExpenseClaim]) -> list[dict[str, Any]]:
buckets: dict[tuple[str, str], Decimal] = defaultdict(Decimal)
for claim in claims:
if self._status(claim) in EXCLUDED_SPEND_STATUSES:
continue
department_name = str(claim.department_name or "").strip()
employee_name = str(claim.employee_name or "").strip()
if self._is_missing_finance_dimension(department_name):
continue
if self._is_missing_finance_dimension(employee_name):
continue
buckets[(department_name, employee_name)] += self._claim_amount(claim)
rows = [
{
"name": f"{department_name} · {employee_name}",
"department": department_name,
"employee": employee_name,
"value": self._decimal_number(amount),
"amount": self._decimal_number(amount),
"color": CHART_COLORS[index % len(CHART_COLORS)],
}
for index, ((department_name, employee_name), amount) in enumerate(
sorted(buckets.items(), key=lambda item: item[1], reverse=True)[:6]
)
]
return rows or EMPTY_DONUT
def _employee_ranking(self, claims: list[ExpenseClaim]) -> list[dict[str, Any]]:
buckets: dict[str, Decimal] = defaultdict(Decimal)
counts: dict[str, int] = defaultdict(int)
departments: dict[str, str] = {}
for claim in claims:
if self._status(claim) in EXCLUDED_SPEND_STATUSES:
continue
employee_name = str(claim.employee_name or "").strip()
if self._is_missing_finance_dimension(employee_name):
continue
amount = self._claim_amount(claim)
buckets[employee_name] += amount
counts[employee_name] += 1
departments.setdefault(employee_name, str(claim.department_name or "").strip())
return [
{
"name": name,
"department": departments.get(name, ""),
"amount": self._decimal_number(amount),
"value": self._decimal_number(amount),
"count": counts[name],
"avgAmount": self._decimal_number(amount / Decimal(str(counts[name]))),
"color": CHART_COLORS[index % len(CHART_COLORS)],
}
for index, (name, amount) in enumerate(
sorted(buckets.items(), key=lambda item: item[1], reverse=True)[:6]
)
]
def _top_claims(self, claims: list[ExpenseClaim]) -> list[dict[str, Any]]:
spend_claims = [
claim for claim in claims if self._status(claim) not in EXCLUDED_SPEND_STATUSES
]
return [
{
"claimNo": claim.claim_no,
"employeeName": claim.employee_name,
"departmentName": self._display_finance_dimension(
claim.department_name,
fallback="未归属部门",
),
"expenseTypeLabel": self._expense_type_label(claim.expense_type),
"amount": self._decimal_number(self._claim_amount(claim)),
"amountLabel": self._currency(self._claim_amount(claim)),
"statusLabel": self._finance_status_label(self._status(claim)),
}
for claim in sorted(spend_claims, key=self._claim_amount, reverse=True)[:6]
]
def _bottlenecks(self, claims: list[ExpenseClaim]) -> list[dict[str, Any]]:
active_claims = [
claim for claim in claims if self._status(claim) not in EXCLUDED_SPEND_STATUSES
]
pending_payment_claims = [
claim for claim in active_claims if self._status(claim) == "pending_payment"
]
paid_claims = [claim for claim in active_claims if self._status(claim) == "paid"]
submitted_claims = [
claim for claim in active_claims if self._status(claim) in PENDING_STATUSES
]
budget_rows = self._budget_focus_rows()
pending_payment_amount = sum(
(self._claim_amount(claim) for claim in pending_payment_claims),
Decimal("0.00"),
)
high_claim = max(
(self._claim_amount(claim) for claim in active_claims),
default=Decimal("0.00"),
)
payment_clearance = self._percent(len(paid_claims), len(active_claims))
rows = [
*budget_rows,
self._focus_item(
name="待付款",
role="资金计划",
duration=self._currency(pending_payment_amount),
status=f"{len(pending_payment_claims)}",
tone="warning" if pending_payment_claims else "success",
avatar="",
),
self._focus_item(
name="高额单据",
role="费用集中度",
duration=self._currency(high_claim),
status="本期最高",
tone="warning" if high_claim >= Decimal("10000") else "success",
avatar="",
),
self._focus_item(
name="待入账",
role="月结准备",
duration=f"{len(submitted_claims)}",
status="待流转" if submitted_claims else "已清理",
tone="warning" if submitted_claims else "success",
avatar="",
),
self._focus_item(
name="付款完成率",
role="付款执行",
duration=f"{payment_clearance:.1f}%",
status=f"{len(paid_claims)} 单已付",
tone="success" if payment_clearance >= 80 else "warning",
avatar="",
),
]
priority = {"danger": 0, "warning": 1, "success": 2}
return sorted(rows, key=lambda item: priority.get(str(item.get("tone")), 3))[:6]
def _budget_summary(self, fiscal_year: int) -> dict[str, Any]:
allocations = self._fetch_budget_allocations(fiscal_year)
total = Decimal("0.00")
used = Decimal("0.00")
available = Decimal("0.00")
for allocation in allocations:
balance = self.get_balance(allocation)
total += balance.total_amount
used += balance.reserved_amount + balance.consumed_amount
available += balance.available_amount
ratio = Decimal("0.00")
if total > Decimal("0.00"):
ratio = (used / total) * Decimal("100")
return {
"ratio": self._decimal_number(ratio),
"total": self._currency(total),
"used": self._currency(used),
"left": self._currency(available),
}
def _budget_metrics(self, fiscal_year: int) -> list[dict[str, Any]]:
allocations = self._fetch_budget_allocations(fiscal_year)
total = Decimal("0.00")
consumed = Decimal("0.00")
reserved = Decimal("0.00")
available = Decimal("0.00")
over_count = 0
warning_count = 0
for allocation in allocations:
balance = self.get_balance(allocation)
total += balance.total_amount
consumed += balance.consumed_amount
reserved += balance.reserved_amount
available += balance.available_amount
if balance.available_amount < Decimal("0.00"):
over_count += 1
continue
if balance.usage_rate >= Decimal(str(allocation.warning_threshold or 80)):
warning_count += 1
used = consumed + reserved
usage_rate = Decimal("0.00")
if total > Decimal("0.00"):
usage_rate = (used / total) * Decimal("100")
return [
self._budget_metric(
label="预算池数量",
value=f"{len(allocations)}",
detail="年度有效预算池",
tone="neutral",
icon="mdi mdi-database-outline",
),
self._budget_metric(
label="总预算",
value=self._currency(total),
detail="原始预算 + 调整",
tone="neutral",
icon="mdi mdi-cash-register",
),
self._budget_metric(
label="已用预算",
value=self._currency(used),
detail=f"使用率 {self._decimal_number(usage_rate):.1f}%",
tone="warning" if usage_rate >= Decimal("80") else "success",
icon="mdi mdi-chart-arc",
),
self._budget_metric(
label="预占预算",
value=self._currency(reserved),
detail="待流转单据占用",
tone="warning" if reserved > Decimal("0.00") else "success",
icon="mdi mdi-lock-outline",
),
self._budget_metric(
label="可用预算",
value=self._currency(available),
detail="可继续使用额度",
tone="danger" if available < Decimal("0.00") else "success",
icon="mdi mdi-wallet-outline",
),
self._budget_metric(
label="预警预算池",
value=f"{warning_count}",
detail=f"超支 {over_count}",
tone="danger" if over_count else "warning" if warning_count else "success",
icon="mdi mdi-alert-outline",
),
]
def _budget_metric(
self,
*,
label: str,
value: str,
detail: str,
tone: str,
icon: str,
) -> dict[str, Any]:
return {
"label": label,
"value": value,
"detail": detail,
"tone": tone,
"icon": icon,
}
def _budget_focus_rows(self) -> list[dict[str, Any]]:
allocations = self._fetch_budget_allocations(datetime.now(UTC).year)
over_count = 0
warning_count = 0
over_amount = Decimal("0.00")
warning_used = Decimal("0.00")
for allocation in allocations:
balance = self.get_balance(allocation)
if balance.available_amount < Decimal("0.00"):
over_count += 1
over_amount += abs(balance.available_amount)
continue
if balance.usage_rate >= Decimal(str(allocation.warning_threshold or 80)):
warning_count += 1
warning_used += balance.reserved_amount + balance.consumed_amount
return [
self._focus_item(
name="预算超支",
role="预算控制",
duration=f"{over_count} 个池",
status=self._currency(over_amount),
tone="danger" if over_count else "success",
avatar="",
),
self._focus_item(
name="预算预警",
role="预算控制",
duration=f"{warning_count} 个池",
status=self._currency(warning_used),
tone="warning" if warning_count else "success",
avatar="",
),
]
def _focus_item(
self,
*,
name: str,
role: str,
duration: str,
status: str,
tone: str,
avatar: str,
) -> dict[str, Any]:
return {
"name": name,
"role": role,
"duration": duration,
"status": status,
"tone": tone,
"avatar": avatar,
}
def _claim_time(self, claim: ExpenseClaim) -> datetime:
return self._as_utc(claim.submitted_at or claim.occurred_at or claim.created_at)
def _claim_sla_hours(self, claim: ExpenseClaim, now: datetime) -> Decimal:
start = self._as_utc(claim.submitted_at or claim.created_at or claim.occurred_at)
end = now
if self._status(claim) in SUCCESS_STATUSES | {"rejected", "returned"} and claim.updated_at:
end = self._as_utc(claim.updated_at)
hours = Decimal(str(max((end - start).total_seconds(), 0))) / Decimal("3600")
return hours.quantize(Decimal("0.1"))
def _claim_amount(self, claim: ExpenseClaim) -> Decimal:
return Decimal(str(claim.amount or 0))
def _claim_key(self, claim: ExpenseClaim) -> str:
return str(claim.claim_no or claim.id or "").strip()
def _has_claim_risk(self, claim: ExpenseClaim) -> bool:
return bool(claim.hermes_risk_flag or self._risk_flags(claim))
def _claim_risk_labels(self, claim: ExpenseClaim) -> list[str]:
labels: list[str] = []
if claim.hermes_risk_flag:
labels.append("风险扫描命中")
for flag in self._risk_flags(claim):
if isinstance(flag, dict):
label = str(flag.get("label") or flag.get("message") or "").strip()
if not label:
label = self._risk_signal_label(
flag.get("type") or flag.get("risk_signal") or ""
)
else:
label = self._risk_signal_label(flag)
labels.append(self._display_risk_label(label))
return labels
def _risk_flags(self, claim: ExpenseClaim) -> list[Any]:
flags = claim.risk_flags_json or []
return flags if isinstance(flags, list) else []
def _stage_label(self, claim: ExpenseClaim) -> str:
stage = str(claim.approval_stage or self._status(claim) or "").strip().lower()
return STAGE_LABELS.get(stage, stage.replace("_", " ").strip() or "待审批")
def _finance_status_label(self, status: str) -> str:
labels = {
"submitted": "审批中",
"review": "审批中",
"pending_review": "审批中",
"manager_review": "审批中",
"budget_review": "审批中",
"finance_review": "审批中",
"approving": "审批中",
"approved": "已入账",
"pending_payment": "待付款",
"paid": "已付款",
"returned": "待补充",
"rejected": "已驳回",
}
return labels.get(str(status or "").strip().lower(), "其他")
def _expense_type_label(self, value: str | None) -> str:
raw = str(value or "").strip()
normalized = raw.lower().replace(" ", "_").replace("-", "_")
normalized = EXPENSE_TYPE_ALIASES.get(normalized, normalized)
if normalized.endswith("_application"):
normalized = normalized.removesuffix("_application")
return EXPENSE_TYPE_LABELS.get(normalized, "其他费用")
def _is_missing_finance_dimension(self, value: str | None) -> bool:
normalized = str(value or "").strip()
return not normalized or normalized in {
"待补充",
"待确认",
"未归属部门",
"未归属",
"N/A",
"n/a",
"-",
}
def _display_finance_dimension(self, value: str | None, *, fallback: str) -> str:
text = str(value or "").strip()
return fallback if self._is_missing_finance_dimension(text) else text
def _risk_signal_label(self, value: Any) -> str:
normalized = str(value or "").strip()
if not normalized:
return "风险观察"
key = normalized.lower().replace(" ", "_").replace("-", "_")
if key in RISK_SIGNAL_LABELS:
return RISK_SIGNAL_LABELS[key]
return self._display_risk_label(normalized)
def _display_risk_label(self, value: Any) -> str:
text = str(value or "").strip()
if not text:
return "风险观察"
key = text.lower().replace(" ", "_").replace("-", "_")
if key in RISK_SIGNAL_LABELS:
return RISK_SIGNAL_LABELS[key]
if self._contains_cjk(text):
return text
return "风险观察"
def _contains_cjk(self, value: str) -> bool:
return any("\u4e00" <= char <= "\u9fff" for char in value)
def _status(self, claim: ExpenseClaim) -> str:
return str(claim.status or "").strip().lower()
def _as_utc(self, value: datetime | None) -> datetime:
if value is None:
return datetime.now(UTC)
if value.tzinfo is None:
return value.replace(tzinfo=UTC)
return value.astimezone(UTC)
def _day_start(self, value: date) -> datetime:
return datetime.combine(value, time.min, tzinfo=UTC)
def _day_after(self, value: date) -> datetime:
return datetime.combine(value + timedelta(days=1), time.min, tzinfo=UTC)
def _date_label(self, value: date) -> str:
return value.strftime("%m-%d")
def _days_from_label(self, value: str, *, default: int) -> int:
match = re.search(r"\d+", str(value or ""))
if not match:
return default
return max(1, min(int(match.group(0)), 90))
def _duration_status(self, hours: Decimal) -> str:
if hours >= Decimal("12"):
return "较慢"
if hours >= SLA_TARGET_HOURS:
return "偏慢"
return "正常"
def _duration_tone(self, hours: Decimal) -> str:
if hours >= Decimal("12"):
return "danger"
if hours >= SLA_TARGET_HOURS:
return "warning"
return "success"
def _average(self, values: list[Decimal]) -> Decimal:
if not values:
return Decimal("0.00")
return sum(values, Decimal("0.00")) / Decimal(str(len(values)))
def _percent(self, part: int | Decimal, total: int | Decimal) -> float:
total_decimal = Decimal(str(total or 0))
if total_decimal <= Decimal("0"):
return 0.0
return self._decimal_number((Decimal(str(part or 0)) / total_decimal) * Decimal("100"))
def _change_percent(self, current: Decimal, previous: Decimal) -> float:
if previous == Decimal("0"):
return 0.0 if current == Decimal("0") else 100.0
return self._decimal_number(((current - previous) / previous) * Decimal("100"))
def _decimal_number(self, value: Decimal) -> float:
return float(value.quantize(Decimal("0.1")))
def _format_delta(self, value: Decimal, unit: str) -> str:
if unit == "":
return self._currency(value)
if unit == "h":
return f"{self._decimal_number(value):.1f}h"
if unit == "%":
return f"{self._decimal_number(value):.1f}%"
return f"{int(value)}{unit}"
def _currency(self, value: Decimal) -> str:
prefix = "" if value < Decimal("0") else "¥"
amount = abs(value)
return f"{prefix}{amount:,.0f}"