Files
X-Financial/tools/agent-change-log/update_change_log.py
caoxiaozhu d4ff79f326 chore(agent): 新增变更日志/checkpoint skill 与 post-commit hook
- AGENTS.md 新增变更日志 Skill 规范:变更后增量更新 document/work-log,更新前先做 Git 拉取检查
- 新增 .codex/skills 下 agent-change-log、git-checkpoint-commit 两个 skill
- 新增 .githooks/post-commit 与 tools/agent-change-log 自动化脚本,提供提交后最低限度日志追加
2026-06-24 10:41:56 +08:00

228 lines
8.7 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Append an incremental X-Financial agent work-log entry."""
from __future__ import annotations
import argparse
import os
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
SECTION_WORK = "## 当日工作内容"
SECTION_ISSUES = "## 遗留问题"
SECTION_TODO = "## TODO"
@dataclass
class CommandResult:
args: list[str]
returncode: int
stdout: str
stderr: str
@property
def ok(self) -> bool:
return self.returncode == 0
@property
def message(self) -> str:
return (self.stderr.strip() or self.stdout.strip()).strip()
def run(args: list[str], cwd: Path, timeout: int = 60) -> CommandResult:
try:
result = subprocess.run(
args,
cwd=cwd,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout,
)
except subprocess.TimeoutExpired as exc:
return CommandResult(args, 124, exc.stdout or "", exc.stderr or "command timed out")
return CommandResult(args, result.returncode, result.stdout, result.stderr)
def git(args: list[str], cwd: Path, timeout: int = 60) -> CommandResult:
return run(["git", *args], cwd, timeout=timeout)
def repo_root() -> Path:
result = run(["git", "rev-parse", "--show-toplevel"], Path.cwd())
if not result.ok:
raise SystemExit(f"Not inside a git repository: {result.message}")
return Path(result.stdout.strip())
def one_line(value: str) -> str:
return " ".join(value.strip().split())
def indent_block(text: str, limit: int = 8) -> list[str]:
lines = [line.strip() for line in text.splitlines() if line.strip()]
if not lines:
return ["未发现"]
shown = lines[:limit]
if len(lines) > limit:
shown.append(f"... 另有 {len(lines) - limit}")
return shown
def ensure_document(path: Path, date_text: str) -> str:
if path.exists():
content = path.read_text(encoding="utf-8")
else:
content = f"# {date_text} 工作日志\n\n{SECTION_WORK}\n\n{SECTION_ISSUES}\n\n{SECTION_TODO}\n"
if not content.endswith("\n"):
content += "\n"
for heading in (SECTION_WORK, SECTION_ISSUES, SECTION_TODO):
if heading not in content:
content += f"\n{heading}\n"
return content
def insert_under_section(content: str, section: str, entry: str) -> str:
marker = f"{section}\n"
start = content.find(marker)
if start == -1:
return content.rstrip() + f"\n\n{section}\n\n{entry.rstrip()}\n"
insert_at = start + len(marker)
next_section = content.find("\n## ", insert_at)
entry_text = "\n" + entry.rstrip() + "\n"
if next_section == -1:
return content[:insert_at].rstrip() + entry_text
before = content[:next_section].rstrip()
after = content[next_section:]
return before + entry_text + after
def collect_git(root: Path, *, no_fetch: bool, max_commits: int) -> dict[str, str | bool]:
data: dict[str, str | bool] = {}
fetch_result = CommandResult(["git", "fetch"], 0, "", "")
if no_fetch:
data["fetch"] = "已跳过 fetch--no-fetch"
else:
fetch_result = git(["fetch", "--all", "--prune"], root, timeout=60)
data["fetch"] = "成功" if fetch_result.ok else f"失败:{one_line(fetch_result.message)}"
status = git(["status", "-sb"], root)
data["status"] = status.stdout.strip() if status.ok else f"读取失败:{one_line(status.message)}"
upstream = git(["rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"], root)
if upstream.ok:
data["upstream"] = upstream.stdout.strip()
behind = git(["log", "--oneline", "--decorate", f"--max-count={max_commits}", "HEAD..@{u}"], root)
ahead = git(["log", "--oneline", "--decorate", f"--max-count={max_commits}", "@{u}..HEAD"], root)
data["behind"] = behind.stdout.strip() if behind.ok else f"读取失败:{one_line(behind.message)}"
data["ahead"] = ahead.stdout.strip() if ahead.ok else f"读取失败:{one_line(ahead.message)}"
else:
data["upstream"] = ""
data["behind"] = "无 upstream 分支"
data["ahead"] = "无 upstream 分支"
head_hash = git(["rev-parse", "--short", "HEAD"], root)
head_subject = git(["show", "-s", "--format=%s", "HEAD"], root)
data["head_hash"] = head_hash.stdout.strip() if head_hash.ok else ""
data["head_subject"] = head_subject.stdout.strip() if head_subject.ok else ""
stat = git(["show", "--stat", "--oneline", "--decorate", "--max-count=1", "HEAD"], root)
data["head_stat"] = stat.stdout.strip() if stat.ok else ""
data["fetch_failed"] = not fetch_result.ok
return data
def build_work_entry(now: datetime, event: str, git_data: dict[str, str | bool]) -> str:
time_text = now.strftime("%H:%M")
head_hash = str(git_data.get("head_hash") or "")
head_subject = str(git_data.get("head_subject") or "")
behind = str(git_data.get("behind") or "")
ahead = str(git_data.get("ahead") or "")
behind_lines = indent_block(behind)
ahead_lines = indent_block(ahead)
behind_text = "".join(behind_lines)
ahead_text = "".join(ahead_lines)
marker = f"auto-log:{head_hash}" if head_hash else "auto-log:no-head"
return "\n".join(
[
f"- {time_text}:自动记录 `{head_hash}` 提交后的工作日志。({marker}",
f" - Git 提交检查fetch {git_data.get('fetch')}upstream `{git_data.get('upstream') or '未配置'}`upstream 新提交:{behind_text};本地 ahead 提交:{ahead_text}",
f" - 修改:最近提交为 `{head_hash} {head_subject}`。",
f" - 操作:{event} 触发 `tools/agent-change-log/update_change_log.py`,自动读取 Git 状态并写入当天日志。",
" - 验证:自动脚本只记录提交和 Git 状态,不替代业务测试;业务验证仍以本次任务实际运行结果为准。",
" - 影响:提交后即使执行代理忘记手动写日志,也会留下最低限度的时间、提交和分支状态记录。",
]
)
def build_issue_entry(now: datetime, git_data: dict[str, str | bool]) -> str | None:
time_text = now.strftime("%H:%M")
issues: list[str] = []
if git_data.get("fetch_failed"):
issues.append(f"fetch 未成功:{git_data.get('fetch')}")
if not git_data.get("upstream"):
issues.append("当前分支没有 upstream无法判断其他智能体是否已推送新提交")
if not issues:
return None
return f"- {time_text}:自动日志触发时发现 {''.join(issues)}。建议后续在有 Git 写权限和网络权限的环境里重新执行拉取检查。"
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--event", default="manual", help="Human-readable trigger source.")
parser.add_argument("--date", help="Override log date YYYY-MM-DD.")
parser.add_argument("--log-path", help="Override log file path.")
parser.add_argument("--dry-run", action="store_true", help="Print the entry without writing files.")
parser.add_argument("--force", action="store_true", help="Write even if this commit marker already exists.")
parser.add_argument("--no-fetch", action="store_true", help="Skip git fetch; useful for tests or offline hooks.")
parser.add_argument("--max-commits", type=int, default=20, help="Maximum commits to summarize per direction.")
return parser.parse_args()
def main() -> int:
args = parse_args()
root = repo_root()
now = datetime.now().astimezone()
date_text = args.date or now.strftime("%Y-%m-%d")
log_path = Path(args.log_path) if args.log_path else root / "document" / "work-log" / f"{date_text}.md"
git_data = collect_git(root, no_fetch=args.no_fetch, max_commits=args.max_commits)
entry = build_work_entry(now, args.event, git_data)
issue_entry = build_issue_entry(now, git_data)
marker = f"auto-log:{git_data.get('head_hash')}"
if args.dry_run:
print(entry)
if issue_entry:
print()
print(issue_entry)
return 0
content = ensure_document(log_path, date_text)
if marker in content and not args.force:
print(f"Work log already contains {marker}; skipping.")
return 0
content = insert_under_section(content, SECTION_WORK, entry)
if issue_entry:
content = insert_under_section(content, SECTION_ISSUES, issue_entry)
log_path.parent.mkdir(parents=True, exist_ok=True)
log_path.write_text(content, encoding="utf-8")
print(f"updated_log={os.path.relpath(log_path, root)}")
return 0
if __name__ == "__main__":
raise SystemExit(main())