- AGENTS.md 新增变更日志 Skill 规范:变更后增量更新 document/work-log,更新前先做 Git 拉取检查 - 新增 .codex/skills 下 agent-change-log、git-checkpoint-commit 两个 skill - 新增 .githooks/post-commit 与 tools/agent-change-log 自动化脚本,提供提交后最低限度日志追加
228 lines
8.7 KiB
Python
Executable File
228 lines
8.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""Append an incremental X-Financial agent work-log entry."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import os
|
||
import subprocess
|
||
import sys
|
||
from dataclasses import dataclass
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
|
||
# Markdown headings that every daily work-log document is expected to contain.
# ensure_document() appends any that are missing, and main() inserts new
# entries under the first two.
SECTION_WORK = "## 当日工作内容"  # heading for the day's work entries
SECTION_ISSUES = "## 遗留问题"  # heading for outstanding issues
SECTION_TODO = "## TODO"
|
||
|
||
|
||
@dataclass
class CommandResult:
    """Captured outcome of one external command invocation."""

    args: list[str]  # argv that was executed
    returncode: int  # process exit status
    stdout: str  # captured standard output
    stderr: str  # captured standard error

    @property
    def ok(self) -> bool:
        """Whether the command exited with status 0."""
        return 0 == self.returncode

    @property
    def message(self) -> str:
        """Return stripped stderr, or stripped stdout when stderr is blank."""
        error_text = self.stderr.strip()
        if error_text:
            return error_text
        return self.stdout.strip()
|
||
|
||
|
||
def _as_text(value: str | bytes | None) -> str:
    """Coerce captured process output to ``str`` (``None`` becomes ``""``)."""
    if value is None:
        return ""
    if isinstance(value, bytes):
        # Undecodable bytes are replaced rather than raising, so a timeout
        # report never fails on odd output.
        return value.decode("utf-8", errors="replace")
    return value


def run(args: list[str], cwd: Path, timeout: int = 60) -> CommandResult:
    """Run *args* as a subprocess in *cwd* and capture its output as text.

    Args:
        args: Command and arguments, passed to ``subprocess.run`` as a list
            (no shell is involved).
        cwd: Working directory for the child process.
        timeout: Seconds to wait before the command is abandoned.

    Returns:
        A ``CommandResult`` holding the exit status and captured output. When
        the command times out, the result has return code 124, whatever
        output had been captured so far, and ``"command timed out"`` as
        stderr if nothing was captured on stderr.
    """
    try:
        result = subprocess.run(
            args,
            cwd=cwd,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        # The partial output attached to TimeoutExpired is bytes even when
        # text=True was requested. Decode it so CommandResult keeps its
        # str-typed fields and callers such as one_line() do not fail.
        return CommandResult(
            args,
            124,
            _as_text(exc.stdout),
            _as_text(exc.stderr) or "command timed out",
        )
    return CommandResult(args, result.returncode, result.stdout, result.stderr)
|
||
|
||
|
||
def git(args: list[str], cwd: Path, timeout: int = 60) -> CommandResult:
    """Run ``git`` with the given sub-command arguments in *cwd*.

    Thin wrapper that prefixes *args* with ``"git"`` and delegates to ``run``.
    """
    command = ["git"] + list(args)
    return run(command, cwd, timeout=timeout)
|
||
|
||
|
||
def repo_root() -> Path:
    """Return the top-level directory of the git repository containing the cwd.

    Raises:
        SystemExit: If ``git rev-parse --show-toplevel`` does not succeed.
    """
    toplevel = run(["git", "rev-parse", "--show-toplevel"], Path.cwd())
    if toplevel.ok:
        return Path(toplevel.stdout.strip())
    raise SystemExit(f"Not inside a git repository: {toplevel.message}")
|
||
|
||
|
||
def one_line(value: str) -> str:
    """Collapse every run of whitespace in *value* into one space.

    Leading and trailing whitespace is dropped, so multi-line text becomes a
    single line.
    """
    words = value.split()
    return " ".join(words)
|
||
|
||
|
||
def indent_block(text: str, limit: int = 8) -> list[str]:
    """Split *text* into stripped, non-blank lines, keeping at most *limit*.

    Returns:
        ``["未发现"]`` when *text* has no non-blank line. Otherwise the first
        *limit* lines, followed by a ``"... 另有 N 条"`` summary item when
        N further lines were left out.
    """
    stripped = (raw.strip() for raw in text.splitlines())
    entries = [item for item in stripped if item]
    if not entries:
        return ["未发现"]

    hidden = len(entries) - limit
    visible = entries[:limit]
    if hidden > 0:
        visible.append(f"... 另有 {hidden} 条")
    return visible
|
||
|
||
|
||
def ensure_document(path: Path, date_text: str) -> str:
    """Return the work-log text for *path*, with all required headings present.

    An existing file is read as UTF-8. Otherwise a fresh skeleton titled with
    *date_text* is generated. The text is made to end with a newline, and any
    of the three section headings not found in it is appended at the end.
    Nothing is written to disk here.
    """
    if not path.exists():
        document = f"# {date_text} 工作日志\n\n{SECTION_WORK}\n\n{SECTION_ISSUES}\n\n{SECTION_TODO}\n"
    else:
        document = path.read_text(encoding="utf-8")

    if not document.endswith("\n"):
        document = document + "\n"

    required = (SECTION_WORK, SECTION_ISSUES, SECTION_TODO)
    for heading in required:
        if heading in document:
            continue
        document += f"\n{heading}\n"
    return document
|
||
|
||
|
||
def insert_under_section(content: str, section: str, entry: str) -> str:
    """Append *entry* at the end of the *section* block inside *content*.

    Args:
        content: Full Markdown document text.
        section: Heading line to insert under, e.g. ``"## TODO"``.
        entry: Text to add. Trailing whitespace is trimmed and one newline
            is appended.

    Returns:
        The updated document. A section ends at the next ``## `` heading or at
        the end of the document. Existing lines of the section are kept and
        the entry is placed after them. When the heading is absent, a new
        section holding the entry is appended to the document.
    """
    marker = f"{section}\n"
    entry_text = "\n" + entry.rstrip() + "\n"

    # Accept the heading only at the start of a line, so "## X" is not found
    # inside a deeper heading such as "### X".
    start = content.find(marker)
    while start > 0 and content[start - 1] != "\n":
        start = content.find(marker, start + 1)
    if start == -1:
        return content.rstrip() + f"\n\n{section}\n\n{entry.rstrip()}\n"

    insert_at = start + len(marker)
    # Search from the heading's own newline so that a heading placed directly
    # on the following line (no blank line in between) is still recognised as
    # the end of this section.
    next_section = content.find("\n## ", insert_at - 1)
    if next_section == -1:
        # Last section: keep everything already written under the heading
        # and append after it.
        return content.rstrip() + entry_text

    before = content[:next_section].rstrip()
    after = content[next_section:]
    return before + entry_text + after
|
||
|
||
|
||
def collect_git(root: Path, *, no_fetch: bool, max_commits: int) -> dict[str, str | bool]:
    """Gather the git facts recorded in a work-log entry.

    Args:
        root: Directory in which every git command is run.
        no_fetch: Skip ``git fetch --all --prune`` when true.
        max_commits: Value passed as ``--max-count`` to each ``git log``.

    Returns:
        A dict with the keys ``fetch``, ``status``, ``upstream``, ``behind``,
        ``ahead``, ``head_hash``, ``head_subject``, ``head_stat`` (all str)
        and ``fetch_failed`` (bool). Failed commands are reported in the
        values as text. This function does not raise for them.
    """

    def output_or_error(result: CommandResult) -> str:
        # Stripped stdout on success, a one-line failure note otherwise.
        if result.ok:
            return result.stdout.strip()
        return f"读取失败:{one_line(result.message)}"

    info: dict[str, str | bool] = {}

    if no_fetch:
        fetch_failed = False
        info["fetch"] = "已跳过 fetch(--no-fetch)"
    else:
        fetched = git(["fetch", "--all", "--prune"], root, timeout=60)
        fetch_failed = not fetched.ok
        info["fetch"] = f"失败:{one_line(fetched.message)}" if fetch_failed else "成功"

    info["status"] = output_or_error(git(["status", "-sb"], root))

    tracking = git(["rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"], root)
    if not tracking.ok:
        info["upstream"] = ""
        info["behind"] = "无 upstream 分支"
        info["ahead"] = "无 upstream 分支"
    else:
        info["upstream"] = tracking.stdout.strip()
        log_args = ["log", "--oneline", "--decorate", f"--max-count={max_commits}"]
        # Commits reachable from upstream but not HEAD, then the reverse.
        incoming = git([*log_args, "HEAD..@{u}"], root)
        outgoing = git([*log_args, "@{u}..HEAD"], root)
        info["behind"] = output_or_error(incoming)
        info["ahead"] = output_or_error(outgoing)

    short_hash = git(["rev-parse", "--short", "HEAD"], root)
    subject = git(["show", "-s", "--format=%s", "HEAD"], root)
    info["head_hash"] = short_hash.stdout.strip() if short_hash.ok else ""
    info["head_subject"] = subject.stdout.strip() if subject.ok else ""

    summary = git(["show", "--stat", "--oneline", "--decorate", "--max-count=1", "HEAD"], root)
    info["head_stat"] = summary.stdout.strip() if summary.ok else ""
    info["fetch_failed"] = fetch_failed
    return info
|
||
|
||
|
||
def build_work_entry(now: datetime, event: str, git_data: dict[str, str | bool]) -> str:
    """Render the Markdown bullet block for the work section.

    Args:
        now: Timestamp whose ``HH:MM`` opens the entry.
        event: Name of the trigger, embedded in the "操作" line.
        git_data: Mapping as produced by ``collect_git``.

    Returns:
        A multi-line string without a trailing newline. Its first line ends
        with an ``auto-log:<hash>`` marker (``auto-log:no-head`` when no hash
        is available).
    """

    def text_of(key: str) -> str:
        # Missing or falsy values are rendered as an empty string.
        return str(git_data.get(key) or "")

    stamp = now.strftime("%H:%M")
    commit = text_of("head_hash")
    subject = text_of("head_subject")
    incoming = ";".join(indent_block(text_of("behind")))
    outgoing = ";".join(indent_block(text_of("ahead")))
    tag = "auto-log:no-head" if not commit else f"auto-log:{commit}"
    fetch_state = git_data.get("fetch")
    upstream = git_data.get("upstream") or "未配置"

    bullets = (
        f"- {stamp}:自动记录 `{commit}` 提交后的工作日志。({tag})",
        f" - Git 提交检查:fetch {fetch_state};upstream `{upstream}`;upstream 新提交:{incoming};本地 ahead 提交:{outgoing}。",
        f" - 修改:最近提交为 `{commit} {subject}`。",
        f" - 操作:{event} 触发 `tools/agent-change-log/update_change_log.py`,自动读取 Git 状态并写入当天日志。",
        " - 验证:自动脚本只记录提交和 Git 状态,不替代业务测试;业务验证仍以本次任务实际运行结果为准。",
        " - 影响:提交后即使执行代理忘记手动写日志,也会留下最低限度的时间、提交和分支状态记录。",
    )
    return "\n".join(bullets)
|
||
|
||
|
||
def build_issue_entry(now: datetime, git_data: dict[str, str | bool]) -> str | None:
    """Render a one-line issue bullet when the git check was incomplete.

    Two conditions are reported: a failed fetch (``fetch_failed`` truthy) and
    a missing upstream (``upstream`` falsy).

    Returns:
        The bullet text prefixed with the ``HH:MM`` of *now*, or ``None`` when
        neither condition applies.
    """
    checks = (
        (bool(git_data.get("fetch_failed")), f"fetch 未成功:{git_data.get('fetch')}"),
        (not git_data.get("upstream"), "当前分支没有 upstream,无法判断其他智能体是否已推送新提交"),
    )
    problems = [text for failed, text in checks if failed]
    if not problems:
        return None

    stamp = now.strftime("%H:%M")
    joined = ";".join(problems)
    return f"- {stamp}:自动日志触发时发现 {joined}。建议后续在有 Git 写权限和网络权限的环境里重新执行拉取检查。"
|
||
|
||
|
||
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of this script from ``sys.argv``.

    Returns:
        A namespace with ``event``, ``date``, ``log_path``, ``dry_run``,
        ``force``, ``no_fetch`` and ``max_commits``.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--event", default="manual", help="Human-readable trigger source.")
    cli.add_argument("--date", help="Override log date YYYY-MM-DD.")
    cli.add_argument("--log-path", help="Override log file path.")

    # Plain on/off switches share the same action.
    switches = (
        ("--dry-run", "Print the entry without writing files."),
        ("--force", "Write even if this commit marker already exists."),
        ("--no-fetch", "Skip git fetch; useful for tests or offline hooks."),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, action="store_true", help=help_text)

    cli.add_argument("--max-commits", type=int, default=20, help="Maximum commits to summarize per direction.")
    return cli.parse_args()
|
||
|
||
|
||
def main() -> int:
    """Collect git state and append an auto-log entry to the day's work log.

    With ``--dry-run`` the rendered entries are printed and nothing is
    written. Otherwise the log document is loaded or created, the write is
    skipped when the commit marker is already present (unless ``--force``),
    and the work entry plus any issue entry are inserted and saved.

    Returns:
        Process exit status. Always 0 on the paths that return.
    """
    args = parse_args()
    root = repo_root()
    now = datetime.now().astimezone()
    date_text = args.date or now.strftime("%Y-%m-%d")
    log_path = Path(args.log_path) if args.log_path else root / "document" / "work-log" / f"{date_text}.md"

    git_data = collect_git(root, no_fetch=args.no_fetch, max_commits=args.max_commits)
    entry = build_work_entry(now, args.event, git_data)
    issue_entry = build_issue_entry(now, git_data)

    if args.dry_run:
        print(entry)
        if issue_entry:
            print()
            print(issue_entry)
        return 0

    # Build the marker exactly as build_work_entry() writes it. With an empty
    # hash the entry carries "auto-log:no-head". The bare prefix "auto-log:"
    # would match every earlier auto-log entry and wrongly skip the write.
    head_hash = str(git_data.get("head_hash") or "")
    marker = f"auto-log:{head_hash}" if head_hash else "auto-log:no-head"

    content = ensure_document(log_path, date_text)
    if marker in content and not args.force:
        print(f"Work log already contains {marker}; skipping.")
        return 0

    content = insert_under_section(content, SECTION_WORK, entry)
    if issue_entry:
        content = insert_under_section(content, SECTION_ISSUES, issue_entry)

    log_path.parent.mkdir(parents=True, exist_ok=True)
    log_path.write_text(content, encoding="utf-8")
    print(f"updated_log={os.path.relpath(log_path, root)}")
    return 0
|
||
|
||
|
||
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
|