Files
X-Financial/tools/agent-change-log/update_change_log.py
2026-06-25 09:35:18 +08:00

385 lines
14 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Write split X-Financial development logs."""
from __future__ import annotations
import argparse
import os
import re
import subprocess
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
# Markdown heading of the section that bug-fix entries are inserted under.
BUG_SECTION = "## 修复记录"
# File name of the daily combined work log, written inside the date directory.
# NOTE(review): the ".med" suffix looks like a typo for ".md" — confirm before
# changing it, since other tooling may already read this name.
WORK_LOG_NAME = "work-logs.med"
@dataclass
class CommandResult:
    """Outcome of one external command invocation."""

    # Argument vector that was executed (program name first).
    args: list[str]
    # Exit status of the process; zero means success.
    returncode: int
    # Captured standard output.
    stdout: str
    # Captured standard error.
    stderr: str

    @property
    def ok(self) -> bool:
        """Whether the command exited with status zero."""
        succeeded = self.returncode == 0
        return succeeded

    @property
    def message(self) -> str:
        """Return stripped stderr, or stripped stdout when stderr is blank."""
        for stream in (self.stderr, self.stdout):
            text = stream.strip()
            if text:
                return text
        return ""
def run(args: list[str], cwd: Path, timeout: int = 60) -> CommandResult:
    """Run an external command and capture its result without raising.

    Args:
        args: Argument vector; executed directly, never through a shell.
        cwd: Working directory for the child process.
        timeout: Seconds to wait before giving up on the child.

    Returns:
        A ``CommandResult`` with the exit status and captured text. A timeout
        is reported as exit status 124 and a command that could not be started
        as 127, so callers only ever need to inspect ``ok`` / ``message``.
    """

    def as_text(value: str | bytes | None) -> str:
        # TimeoutExpired can carry raw bytes even though text=True was requested.
        if value is None:
            return ""
        if isinstance(value, bytes):
            return value.decode("utf-8", errors="replace")
        return value

    try:
        result = subprocess.run(
            args,
            cwd=cwd,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        return CommandResult(
            args,
            124,
            as_text(exc.stdout),
            as_text(exc.stderr) or "command timed out",
        )
    except OSError as exc:
        # The executable is missing or cwd is unusable: report it as a failed
        # command instead of letting a traceback escape.
        return CommandResult(args, 127, "", str(exc))
    return CommandResult(args, result.returncode, result.stdout, result.stderr)
def git(args: list[str], cwd: Path, timeout: int = 60) -> CommandResult:
    """Run ``git`` with the given sub-command arguments through ``run``."""
    command = ["git"]
    command.extend(args)
    return run(command, cwd, timeout=timeout)
def repo_root() -> Path:
    """Return the top-level directory of the git work tree containing the cwd.

    Raises:
        SystemExit: If ``git rev-parse --show-toplevel`` does not succeed.
    """
    toplevel = run(["git", "rev-parse", "--show-toplevel"], Path.cwd())
    if toplevel.ok:
        return Path(toplevel.stdout.strip())
    raise SystemExit(f"Not inside a git repository: {toplevel.message}")
def one_line(value: str) -> str:
    """Collapse every run of whitespace in *value* into a single space."""
    # split() with no separator already drops leading/trailing whitespace.
    words = value.split()
    return " ".join(words)
def compact_lines(text: str, limit: int = 8) -> list[str]:
    """Return the stripped non-blank lines of *text*, capped at *limit*.

    When nothing remains, a single placeholder item is returned. When lines
    were cut off, a trailing item states how many were omitted.
    """
    non_blank: list[str] = []
    for raw in text.splitlines():
        cleaned = raw.strip()
        if cleaned:
            non_blank.append(cleaned)
    if not non_blank:
        return ["未发现"]
    hidden = len(non_blank) - limit
    result = non_blank[:limit]
    if hidden > 0:
        result.append(f"... 另有 {hidden}")
    return result
def slugify(value: str, fallback: str) -> str:
    """Turn *value* into a lowercase, dash-separated slug.

    ASCII letters and digits and characters in U+4E00..U+9FFF are kept; every
    run of other characters becomes a single dash, and dashes at either end
    are removed. *fallback* is returned when nothing is left.
    """
    lowered = value.strip().lower()
    # One substitution replaces each run of disallowed characters with a dash.
    slug = re.sub(r"[^0-9A-Za-z\u4e00-\u9fff]+", "-", lowered).strip("-")
    return slug if slug else fallback
def date_dir(root: Path, date_text: str) -> Path:
    """Return ``<root>/document/development/<date_text>``."""
    return root.joinpath("document", "development", date_text)
def bugs_dir(root: Path, date_text: str) -> Path:
    """Return the ``dev-logs/bugs`` directory inside the given date directory."""
    day_root = date_dir(root, date_text)
    return day_root.joinpath("dev-logs", "bugs")
def feature_dir(root: Path, date_text: str) -> Path:
    """Return the ``feature`` directory inside the given date directory."""
    day_root = date_dir(root, date_text)
    return day_root.joinpath("feature")
def collect_git(root: Path, *, no_fetch: bool, max_commits: int) -> dict[str, str | bool]:
    """Gather a snapshot of the repository's git state.

    Args:
        root: Directory in which every git command is run.
        no_fetch: When true, ``git fetch`` is not run.
        max_commits: Cap passed as ``--max-count`` to each ``git log`` call.

    Returns:
        A dict with the keys ``fetch``, ``status``, ``upstream``, ``behind``,
        ``ahead``, ``head_hash``, ``head_subject`` (all strings) and
        ``fetch_failed`` (bool). Failed reads are stored as error text rather
        than raised.
    """

    def stdout_or_error(result: CommandResult) -> str:
        # Shared formatting for the reads that report their failure inline.
        if result.ok:
            return result.stdout.strip()
        return f"读取失败:{one_line(result.message)}"

    info: dict[str, str | bool] = {}

    fetch_failed = False
    if no_fetch:
        # NOTE(review): this message reads as if punctuation around
        # "--no-fetch" was lost; kept verbatim.
        info["fetch"] = "已跳过 fetch--no-fetch"
    else:
        fetched = git(["fetch", "--all", "--prune"], root, timeout=60)
        fetch_failed = not fetched.ok
        info["fetch"] = f"失败:{one_line(fetched.message)}" if fetch_failed else "成功"

    info["status"] = stdout_or_error(git(["status", "-sb"], root))

    tracking = git(["rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"], root)
    if not tracking.ok:
        info["upstream"] = ""
        info["behind"] = "无 upstream 分支"
        info["ahead"] = "无 upstream 分支"
    else:
        info["upstream"] = tracking.stdout.strip()
        log_args = ["log", "--oneline", "--decorate", f"--max-count={max_commits}"]
        incoming = git([*log_args, "HEAD..@{u}"], root)
        outgoing = git([*log_args, "@{u}..HEAD"], root)
        info["behind"] = stdout_or_error(incoming)
        info["ahead"] = stdout_or_error(outgoing)

    short_hash = git(["rev-parse", "--short", "HEAD"], root)
    subject = git(["show", "-s", "--format=%s", "HEAD"], root)
    # HEAD details fall back to an empty string instead of an error message.
    info["head_hash"] = short_hash.stdout.strip() if short_hash.ok else ""
    info["head_subject"] = subject.stdout.strip() if subject.ok else ""
    info["fetch_failed"] = fetch_failed
    return info
def git_check_text(git_data: dict[str, str | bool]) -> str:
    """Render the collected git state as one human-readable line.

    Args:
        git_data: Mapping read for the ``fetch``, ``upstream``, ``behind`` and
            ``ahead`` keys; missing or empty values get placeholders.

    Returns:
        A single line whose fields are separated by ``;`` and whose commit
        lists are separated by `` | ``, so adjacent values no longer run
        together.
    """
    behind_text = " | ".join(compact_lines(str(git_data.get("behind") or "")))
    ahead_text = " | ".join(compact_lines(str(git_data.get("ahead") or "")))
    upstream = git_data.get("upstream") or "未配置"
    return (
        f"fetch {git_data.get('fetch')};upstream `{upstream}`;"
        f"upstream 新提交:{behind_text};本地 ahead 提交:{ahead_text}"
    )
def looks_like_bug(subject: str) -> bool:
    """Guess from a commit subject whether the commit is a bug fix.

    True when the stripped, lowercased subject starts with one of the fix-style
    prefixes or contains one of the bug-related keywords anywhere.
    """
    prefixes = ("fix", "bugfix", "hotfix", "revert")
    keywords = ("bug", "修复", "缺陷", "故障", "报错", "失败", "异常")
    text = subject.strip().lower()
    return text.startswith(prefixes) or any(word in text for word in keywords)
def insert_under_section(content: str, section: str, entry: str) -> str:
    """Append *entry* at the end of *section* inside *content*.

    Args:
        content: Existing Markdown document text.
        section: Full heading line to insert under, e.g. ``"## Title"``.
        entry: Text to add; trailing whitespace is normalised to one newline.

    Returns:
        The updated document. If the heading is absent it is appended together
        with the entry. Otherwise the entry goes after the section's existing
        content and before the next ``## `` heading, if there is one. Entries
        already in the section are always preserved.
    """
    marker = f"{section}\n"
    start = content.find(marker)
    block = entry.rstrip()
    if start == -1:
        return content.rstrip() + f"\n\n{section}\n\n{block}\n"

    header_end = start + len(marker)
    # Start at the header's own newline so a heading on the very next line
    # is still recognised as the end of this section.
    next_section = content.find("\n## ", header_end - 1)
    if next_section == -1:
        # Last section of the document: keep everything already under it and
        # append, instead of cutting the document off right after the header.
        return content.rstrip() + "\n" + block + "\n"
    return content[:next_section].rstrip() + "\n" + block + "\n" + content[next_section:]
def ensure_bug_document(path: Path, *, title: str, date_text: str) -> str:
    """Return the text of the bug document at *path*, creating a skeleton if absent.

    Nothing is written to disk here; the caller persists the result.

    Args:
        path: Location of the bug log file.
        title: Heading used when a new skeleton is generated.
        date_text: Date string placed in the skeleton header.

    Returns:
        Document text that ends with a newline and contains ``BUG_SECTION``.
    """
    if path.exists():
        content = path.read_text(encoding="utf-8")
    else:
        prefix = "document/development/"
        posix = path.as_posix()
        # Show the location from the development tree downward when the file
        # lives there; a path overridden to somewhere else is shown as given
        # rather than being glued onto a prefix it does not belong to.
        if prefix in posix:
            doc_path = prefix + posix.split(prefix, 1)[1]
        else:
            doc_path = posix
        content = "\n".join(
            [
                f"# {title}",
                "",
                f"日期:{date_text}",
                f"文档路径:{doc_path}",
                "",
                BUG_SECTION,
                "",
            ]
        )
    if not content.endswith("\n"):
        content += "\n"
    if BUG_SECTION not in content:
        content += f"\n{BUG_SECTION}\n"
    return content
def build_bug_entry(now: datetime, event: str, title: str, git_data: dict[str, str | bool]) -> str:
    """Build the Markdown list entry that records one bug fix.

    Args:
        now: Timestamp whose ``HH:MM`` is used as the entry time.
        event: Trigger source, echoed in the entry.
        title: Bug title.
        git_data: Git snapshot; ``head_hash`` and ``head_subject`` are read
            here and the whole mapping is passed to ``git_check_text``.

    Returns:
        A multi-line string with no trailing newline. The first line carries a
        ``bug-log:<hash>`` marker (``bug-log:<HH:MM>`` when no hash is known)
        inside a balanced pair of full-width parentheses.
    """
    time_text = now.strftime("%H:%M")
    head_hash = str(git_data.get("head_hash") or "")
    head_subject = str(git_data.get("head_subject") or "")
    marker = f"bug-log:{head_hash}" if head_hash else f"bug-log:{time_text}"
    return "\n".join(
        [
            # The closing parenthesis was missing, leaving the marker unbalanced.
            f"- {time_text}:记录 bug 修复:{title}。({marker})",
            f" - Git 提交检查:{git_check_text(git_data)}",
            f" - 修改:最近提交为 `{head_hash} {head_subject}`。",
            f" - 操作:{event} 触发 `tools/agent-change-log/update_change_log.py --kind bug`,写入当天 `dev-logs/bugs`。",
            " - 验证:记录修复时应引用本次任务实际运行的测试、构建或手工验证结果;自动脚本不替代业务验证。",
            " - 影响:该 bug 会在 17:00 综合日志中与当天功能点一起汇总。",
        ]
    )
def write_bug_log(args: argparse.Namespace, root: Path, now: datetime, git_data: dict[str, str | bool]) -> int:
    """Append a bug-fix entry to the day's bug log, or preview it on a dry run.

    Args:
        args: Parsed options; ``date``, ``bug_title``, ``bug_slug``,
            ``log_path``, ``event``, ``dry_run`` and ``force`` are read.
        root: Repository root, used for default paths and for display.
        now: Current timestamp.
        git_data: Git snapshot supplying ``head_hash`` and ``head_subject``.

    Returns:
        Always ``0``. Nothing is written on a dry run, nor when the log
        already holds this commit's marker and ``force`` is not set.
    """
    commit = str(git_data.get("head_hash") or "")
    subject = str(git_data.get("head_subject") or "")
    day = args.date or now.strftime("%Y-%m-%d")
    title = args.bug_title or subject or "未命名 bug 修复"

    default_slug = f"bug-{commit or now.strftime('%H%M%S')}"
    slug = slugify(args.bug_slug or title, fallback=default_slug)
    if args.log_path:
        target = Path(args.log_path)
    else:
        target = bugs_dir(root, day) / f"{slug}.md"

    entry = build_bug_entry(now, args.event, title, git_data)

    if args.dry_run:
        print(f"target_log={os.path.relpath(target, root)}")
        print(entry)
        return 0

    document = ensure_bug_document(target, title=title, date_text=day)
    # Without a commit hash there is no stable marker, so nothing is deduplicated.
    dedupe_marker = f"bug-log:{commit}" if commit else ""
    already_logged = bool(dedupe_marker) and dedupe_marker in document
    if already_logged and not args.force:
        print(f"Bug log already contains {dedupe_marker}; skipping.")
        return 0

    updated = insert_under_section(document, BUG_SECTION, entry)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(updated, encoding="utf-8")
    print(f"updated_bug_log={os.path.relpath(target, root)}")
    return 0
def first_heading(path: Path) -> str:
    """Return the text of the first ``# `` heading in the file at *path*.

    Falls back to the name of the file's parent directory when the file has
    no such heading.
    """
    text = path.read_text(encoding="utf-8")
    headings = (line[2:].strip() for line in text.splitlines() if line.startswith("# "))
    return next(headings, path.parent.name)
def section_first_text(content: str, section: str) -> str:
    """Return the first non-heading text line under ``## <section>``.

    The section runs from the first occurrence of the heading text to the next
    ``## `` heading. Leading and trailing dashes and spaces are removed from
    the line. An empty string is returned when the heading is absent or the
    section holds no such line.
    """
    _, found, rest = content.partition(f"## {section}")
    if not found:
        return ""
    body = rest.split("\n## ", 1)[0]
    candidates = (raw.strip() for raw in body.splitlines())
    for text in candidates:
        if not text or text.startswith("#"):
            continue
        return text.strip("- ").strip()
    return ""
def summarize_feature(path: Path) -> str:
    """Summarise one feature directory as a single Markdown bullet.

    Args:
        path: Feature directory, optionally holding ``CONCEPT.md`` and
            ``TODO.md``.

    Returns:
        A bullet of the form ``- <title>:<detail>;<status>;目录:`<dir>```.
        The title comes from the first heading of ``CONCEPT.md`` (the
        directory name when that file is absent), the detail from its
        ``功能一句话`` section, and the status from the checkbox counts in
        ``TODO.md``.
    """
    concept = path / "CONCEPT.md"
    todo = path / "TODO.md"
    title = path.name
    sentence = ""
    if concept.exists():
        concept_text = concept.read_text(encoding="utf-8")
        title = first_heading(concept)
        sentence = section_first_text(concept_text, "功能一句话")
    todo_text = todo.read_text(encoding="utf-8") if todo.exists() else ""
    # Only checkboxes that start a line are counted.
    done_count = len(re.findall(r"^- \[x\]", todo_text, flags=re.MULTILINE | re.IGNORECASE))
    open_count = len(re.findall(r"^- \[ \]", todo_text, flags=re.MULTILINE))
    status = f"TODO 完成 {done_count} 项,未完成 {open_count}" if todo.exists() else "未找到 TODO.md"
    detail = sentence or "未提取到功能一句话"
    # Title, detail and status used to be emitted with nothing between them.
    return f"- {title}:{detail};{status};目录:`{path.name}`"
def summarize_bug(path: Path) -> str:
    """Summarise one bug log file as a single Markdown bullet.

    Args:
        path: Bug log Markdown file.

    Returns:
        A bullet of the form ``- <title>:<detail>(文件:`<name>`)``, where the
        detail is the last list line found in the file, or a placeholder when
        the file has none.
    """
    title = first_heading(path)
    content = path.read_text(encoding="utf-8")
    latest = ""
    for line in content.splitlines():
        stripped = line.strip()
        # NOTE(review): the original also tested `"" in stripped`, which is
        # always true; a more specific substring may have been intended.
        # Dropping it keeps the line selection unchanged.
        if stripped.startswith("- "):
            latest = stripped[2:]
    # Separator and closing parenthesis added; the output was run together
    # and left the parenthesis unbalanced.
    return f"- {title}:{latest or '已记录修复内容'}(文件:`{path.name}`)"
def build_daily_summary(root: Path, date_text: str, now: datetime) -> str:
    """Build the Markdown text of the combined daily work log.

    Args:
        root: Repository root.
        date_text: Date whose feature and bug directories are scanned.
        now: Timestamp printed as the generation time.

    Returns:
        The complete document: a feature section (one bullet per
        sub-directory of the feature directory), a bug section (one bullet
        per ``*.md`` file in the bug directory) and a short analysis line.
        Missing directories yield placeholder bullets.
    """
    features: list[str] = []
    feature_root = feature_dir(root, date_text)
    if feature_root.exists():
        features = [summarize_feature(path) for path in sorted(feature_root.iterdir()) if path.is_dir()]

    bugs: list[str] = []
    bug_root = bugs_dir(root, date_text)
    if bug_root.exists():
        bugs = [summarize_bug(path) for path in sorted(bug_root.glob("*.md"))]

    feature_text = "\n".join(features) if features else "- 今日未发现功能点文档。"
    bug_text = "\n".join(bugs) if bugs else "- 今日未发现 bug 修复记录。"

    analysis_parts = []
    if features:
        analysis_parts.append(f"功能侧沉淀了 {len(features)} 个功能点")
    if bugs:
        analysis_parts.append(f"问题侧记录了 {len(bugs)} 个 bug 修复")
    # Join with a separator: the two clauses used to be fused into one run.
    analysis = ";".join(analysis_parts) if analysis_parts else "今日目录下暂无功能点或 bug 记录"

    return "\n".join(
        [
            f"# {date_text} 综合工作日志",
            "",
            f"生成时间:{now.strftime('%Y-%m-%d %H:%M:%S %Z')}",
            "来源:`feature/` 功能点文档与 `dev-logs/bugs/` bug 记录",
            "",
            "## 今日功能点",
            "",
            feature_text,
            "",
            "## 今日 Bugs",
            "",
            bug_text,
            "",
            "## 综合分析",
            "",
            f"- {analysis}",
            "- 后续复盘优先看本文件,再回到对应功能点或 bug 文件追溯证据。",
            "",
        ]
    )
def write_daily_summary(args: argparse.Namespace, root: Path, now: datetime) -> int:
    """Write the combined daily work log, or print it on a dry run.

    Args:
        args: Parsed options; ``date``, ``log_path`` and ``dry_run`` are read.
        root: Repository root, used for the default path and for display.
        now: Current timestamp.

    Returns:
        Always ``0``. The target file is overwritten with a freshly built
        summary unless this is a dry run.
    """
    day = args.date or now.strftime("%Y-%m-%d")
    if args.log_path:
        target = Path(args.log_path)
    else:
        target = date_dir(root, day) / WORK_LOG_NAME
    summary = build_daily_summary(root, day, now)

    if not args.dry_run:
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(summary, encoding="utf-8")
        print(f"updated_work_log={os.path.relpath(target, root)}")
        return 0

    print(f"target_log={os.path.relpath(target, root)}")
    print(summary)
    return 0
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the change-log tool."""
    parser = argparse.ArgumentParser(description=__doc__)
    add = parser.add_argument

    add("--kind", choices=("auto", "bug", "summary"), default="auto")
    add("--event", default="manual", help="Human-readable trigger source.")

    # Optional free-text overrides; each defaults to None.
    text_options = (
        ("--date", "Override log date YYYY-MM-DD."),
        ("--log-path", "Override output log path."),
        ("--bug-title", "Human-readable bug title."),
        ("--bug-slug", "Stable bug log file slug."),
    )
    for flag, help_text in text_options:
        add(flag, help=help_text)

    # Boolean switches; each defaults to False.
    switches = (
        ("--dry-run", "Print output without writing files."),
        ("--force", "Write even if this commit marker already exists."),
        ("--no-fetch", "Skip git fetch; useful for tests or offline hooks."),
    )
    for flag, help_text in switches:
        add(flag, action="store_true", help=help_text)

    add("--max-commits", type=int, default=20, help="Maximum commits to summarize per direction.")
    return parser.parse_args()
def main() -> int:
    """Dispatch to the daily summary or the bug log according to ``--kind``.

    Returns:
        The process exit status (``0`` on every path that returns).
    """
    options = parse_args()
    top = repo_root()
    timestamp = datetime.now().astimezone()

    if options.kind != "summary":
        snapshot = collect_git(top, no_fetch=options.no_fetch, max_commits=options.max_commits)
        subject = str(snapshot.get("head_subject") or "")
        # In "auto" mode only commits whose subject looks like a fix are logged.
        should_log = options.kind != "auto" or looks_like_bug(subject)
        if should_log:
            return write_bug_log(options, top, timestamp, snapshot)
        print(f"skipped_non_bug_commit={subject}")
        return 0

    return write_daily_summary(options, top, timestamp)


# Script entry point: exit with the status returned by main().
if __name__ == "__main__":
    raise SystemExit(main())