# Source: X-Financial/server/scripts/audit_ontology_context_fields.py
# Size as reported by the repository viewer: 105 lines, 3.4 KiB (Python).

from __future__ import annotations
import argparse
import re
import sys
from dataclasses import dataclass
from pathlib import Path
# ROOT is the directory one level above the folder that contains this script;
# APP_SRC is its "src" subdirectory.
ROOT = Path(__file__).resolve().parents[1]
APP_SRC = ROOT.joinpath("src")

# Prepend APP_SRC to sys.path, but only when it is not listed already.
if sys.path.count(str(APP_SRC)) == 0:
    sys.path.insert(0, str(APP_SRC))
from app.services.ontology_field_registry import ( # noqa: E402
CANONICAL_ONTOLOGY_FIELDS,
ONTOLOGY_CONTEXT_METADATA_FIELDS,
ONTOLOGY_FIELD_ALIASES,
REGISTERED_ONTOLOGY_CONTEXT_FIELDS,
)
# Directory trees that are audited: "src/app" under ROOT and "web/src" in the
# directory that sits next to ROOT.
SCAN_ROOTS = (
    ROOT.joinpath("src", "app"),
    ROOT.parent.joinpath("web", "src"),
)

# Directory names that exclude a path from the scan (caches, vendored and
# build output).
SKIP_PARTS = {
    "__pycache__",
    ".pytest_cache",
    ".ruff_cache",
    "node_modules",
    "dist",
}
# Regexes that locate string-literal key reads of the form
# `<receiver>.get("key"` (single or double quotes). Group 1 captures the key,
# group 0 is the matched call prefix.
#
# The bare `form_values` pattern carries a negative lookbehind so it does not
# fire a second time inside `review_form_values.get(...)`, which has its own
# pattern. Without it every such read was reported twice.
FIELD_PATTERNS = (
    re.compile(r"""context_json\.get\(["']([^"']+)["']"""),
    re.compile(r"""review_form_values\.get\(["']([^"']+)["']"""),
    re.compile(r"""(?<!review_)form_values\.get\(["']([^"']+)["']"""),
    re.compile(r"""review_values\.get\(["']([^"']+)["']"""),
)
@dataclass(frozen=True)
class Finding:
    """One field read located while scanning a source file.

    Instances are immutable (frozen dataclass) and compare by value.
    """

    # File in which the read was found.
    file: Path
    # 1-based line number of the match inside that file.
    line_no: int
    # Key name captured from the `.get("...")` call.
    field: str
    # Category of the finding; this script uses "alias_read" and "unknown".
    kind: str
    # Exact text matched by the pattern.
    source: str
def iter_source_files(
    *,
    roots: tuple[Path, ...] | None = None,
    skip_parts: set[str] | None = None,
) -> list[Path]:
    """Return the sorted list of source files to audit.

    Each root is walked recursively and regular files with a ``.py``, ``.js``,
    ``.vue``, ``.mjs`` or ``.ts`` suffix are collected. Roots that do not
    exist are skipped.

    Args:
        roots: Directories to walk (any iterable of paths works). Defaults
            to ``SCAN_ROOTS``.
        skip_parts: Directory names that exclude everything beneath them
            (any container supporting ``in`` works). Defaults to
            ``SKIP_PARTS``.

    Returns:
        The matching paths, sorted.
    """
    scan_roots = SCAN_ROOTS if roots is None else roots
    excluded = SKIP_PARTS if skip_parts is None else skip_parts
    wanted_suffixes = {".py", ".js", ".vue", ".mjs", ".ts"}

    files: list[Path] = []
    for root in scan_roots:
        if not root.exists():
            continue
        for path in root.rglob("*"):
            if path.suffix not in wanted_suffixes:
                continue
            # Only components below the scan root are tested. Checking the
            # absolute path would drop every file when the checkout itself
            # lives under a directory named e.g. "dist" or "node_modules".
            if any(part in excluded for part in path.relative_to(root).parts):
                continue
            # rglob also yields directories; one named like "chart.js" must
            # not be handed to callers that read file contents.
            if not path.is_file():
                continue
            files.append(path)
    return sorted(files)
def _iter_field_reads(path: Path):
    """Yield ``(line_no, field, source)`` for every pattern match in *path*.

    The file is decoded as UTF-8 with undecodable bytes dropped; line numbers
    start at 1. ``field`` is capture group 1 and ``source`` the whole match.
    """
    content = path.read_text(encoding="utf-8", errors="ignore")
    for line_no, line in enumerate(content.splitlines(), start=1):
        for pattern in FIELD_PATTERNS:
            for hit in pattern.finditer(line):
                yield line_no, hit.group(1), hit.group(0)


def collect_findings() -> tuple[list[Finding], list[Finding]]:
    """Scan all source files and classify every matched field read.

    Returns:
        A pair ``(unknown, alias_reads)``:

        * ``unknown`` holds reads of fields that are not in
          ``REGISTERED_ONTOLOGY_CONTEXT_FIELDS``.
        * ``alias_reads`` holds reads of fields that appear among the alias
          values of ``ONTOLOGY_FIELD_ALIASES`` and are not in
          ``ONTOLOGY_CONTEXT_METADATA_FIELDS``.

        A single read can land in both lists.
    """
    # Flatten every alias collection in the registry into one lookup set.
    alias_fields = set().union(*ONTOLOGY_FIELD_ALIASES.values())

    unknown: list[Finding] = []
    alias_reads: list[Finding] = []
    for path in iter_source_files():
        # The registry module itself is excluded from the audit.
        if path.name == "ontology_field_registry.py":
            continue
        for line_no, name, snippet in _iter_field_reads(path):
            if name in alias_fields and name not in ONTOLOGY_CONTEXT_METADATA_FIELDS:
                alias_reads.append(Finding(path, line_no, name, "alias_read", snippet))
            if name not in REGISTERED_ONTOLOGY_CONTEXT_FIELDS:
                unknown.append(Finding(path, line_no, name, "unknown", snippet))
    return unknown, alias_reads
def print_section(title: str, findings: list[Finding]) -> None:
    """Print a titled summary of *findings* to standard output.

    The header line shows the title and the total count. At most the first
    200 findings follow, one per line, with the file path shown relative to
    the parent of ``ROOT``. When more exist, a final line states how many
    were left out.
    """
    max_rows = 200
    total = len(findings)
    print(f"\n{title}: {total}")
    for entry in findings[:max_rows]:
        shown_path = entry.file.relative_to(ROOT.parent)
        print(f"- {shown_path}:{entry.line_no} field={entry.field} source={entry.source}")
    omitted = total - max_rows
    if omitted > 0:
        print(f"- ... {omitted} more")
def main() -> int:
    """Run the audit from the command line and return the process exit code.

    Prints the sizes of the canonical and context-metadata field sets, then
    the unknown-field and direct-alias-read sections.

    Returns:
        1 when ``--strict`` was given and at least one finding exists,
        otherwise 0.
    """
    parser = argparse.ArgumentParser(description="Audit ontology context field usage.")
    parser.add_argument(
        "--strict",
        action="store_true",
        help="Exit non-zero when findings exist.",
    )
    options = parser.parse_args()

    unknown_fields, alias_hits = collect_findings()

    print(f"canonical_fields: {len(CANONICAL_ONTOLOGY_FIELDS)}")
    print(f"context_metadata_fields: {len(ONTOLOGY_CONTEXT_METADATA_FIELDS)}")
    print_section("unknown_context_fields", unknown_fields)
    print_section("direct_alias_reads", alias_hits)

    has_findings = bool(unknown_fields or alias_hits)
    return 1 if options.strict and has_findings else 0
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())