#!/usr/bin/env python3 """Append an incremental X-Financial agent work-log entry.""" from __future__ import annotations import argparse import os import subprocess import sys from dataclasses import dataclass from datetime import datetime from pathlib import Path SECTION_WORK = "## 当日工作内容" SECTION_ISSUES = "## 遗留问题" SECTION_TODO = "## TODO" @dataclass class CommandResult: args: list[str] returncode: int stdout: str stderr: str @property def ok(self) -> bool: return self.returncode == 0 @property def message(self) -> str: return (self.stderr.strip() or self.stdout.strip()).strip() def run(args: list[str], cwd: Path, timeout: int = 60) -> CommandResult: try: result = subprocess.run( args, cwd=cwd, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout, ) except subprocess.TimeoutExpired as exc: return CommandResult(args, 124, exc.stdout or "", exc.stderr or "command timed out") return CommandResult(args, result.returncode, result.stdout, result.stderr) def git(args: list[str], cwd: Path, timeout: int = 60) -> CommandResult: return run(["git", *args], cwd, timeout=timeout) def repo_root() -> Path: result = run(["git", "rev-parse", "--show-toplevel"], Path.cwd()) if not result.ok: raise SystemExit(f"Not inside a git repository: {result.message}") return Path(result.stdout.strip()) def one_line(value: str) -> str: return " ".join(value.strip().split()) def indent_block(text: str, limit: int = 8) -> list[str]: lines = [line.strip() for line in text.splitlines() if line.strip()] if not lines: return ["未发现"] shown = lines[:limit] if len(lines) > limit: shown.append(f"... 另有 {len(lines) - limit} 条") return shown def ensure_document(path: Path, date_text: str) -> str: if path.exists(): content = path.read_text(encoding="utf-8") else: content = f"# {date_text} 工作日志\n\n{SECTION_WORK}\n\n{SECTION_ISSUES}\n\n{SECTION_TODO}\n" if not content.endswith("\n"): content += "\n" for heading in (SECTION_WORK, SECTION_ISSUES, SECTION_TODO): if heading not in content: content += f"\n{heading}\n" return content def insert_under_section(content: str, section: str, entry: str) -> str: marker = f"{section}\n" start = content.find(marker) if start == -1: return content.rstrip() + f"\n\n{section}\n\n{entry.rstrip()}\n" insert_at = start + len(marker) next_section = content.find("\n## ", insert_at) entry_text = "\n" + entry.rstrip() + "\n" if next_section == -1: return content[:insert_at].rstrip() + entry_text before = content[:next_section].rstrip() after = content[next_section:] return before + entry_text + after def collect_git(root: Path, *, no_fetch: bool, max_commits: int) -> dict[str, str | bool]: data: dict[str, str | bool] = {} fetch_result = CommandResult(["git", "fetch"], 0, "", "") if no_fetch: data["fetch"] = "已跳过 fetch(--no-fetch)" else: fetch_result = git(["fetch", "--all", "--prune"], root, timeout=60) data["fetch"] = "成功" if fetch_result.ok else f"失败:{one_line(fetch_result.message)}" status = git(["status", "-sb"], root) data["status"] = status.stdout.strip() if status.ok else f"读取失败:{one_line(status.message)}" upstream = git(["rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"], root) if upstream.ok: data["upstream"] = upstream.stdout.strip() behind = git(["log", "--oneline", "--decorate", f"--max-count={max_commits}", "HEAD..@{u}"], root) ahead = git(["log", "--oneline", "--decorate", f"--max-count={max_commits}", "@{u}..HEAD"], root) data["behind"] = behind.stdout.strip() if behind.ok else f"读取失败:{one_line(behind.message)}" data["ahead"] = ahead.stdout.strip() if ahead.ok else f"读取失败:{one_line(ahead.message)}" else: data["upstream"] = "" data["behind"] = "无 upstream 分支" data["ahead"] = "无 upstream 分支" head_hash = git(["rev-parse", "--short", "HEAD"], root) head_subject = git(["show", "-s", "--format=%s", "HEAD"], root) data["head_hash"] = head_hash.stdout.strip() if head_hash.ok else "" data["head_subject"] = head_subject.stdout.strip() if head_subject.ok else "" stat = git(["show", "--stat", "--oneline", "--decorate", "--max-count=1", "HEAD"], root) data["head_stat"] = stat.stdout.strip() if stat.ok else "" data["fetch_failed"] = not fetch_result.ok return data def build_work_entry(now: datetime, event: str, git_data: dict[str, str | bool]) -> str: time_text = now.strftime("%H:%M") head_hash = str(git_data.get("head_hash") or "") head_subject = str(git_data.get("head_subject") or "") behind = str(git_data.get("behind") or "") ahead = str(git_data.get("ahead") or "") behind_lines = indent_block(behind) ahead_lines = indent_block(ahead) behind_text = ";".join(behind_lines) ahead_text = ";".join(ahead_lines) marker = f"auto-log:{head_hash}" if head_hash else "auto-log:no-head" return "\n".join( [ f"- {time_text}:自动记录 `{head_hash}` 提交后的工作日志。({marker})", f" - Git 提交检查:fetch {git_data.get('fetch')};upstream `{git_data.get('upstream') or '未配置'}`;upstream 新提交:{behind_text};本地 ahead 提交:{ahead_text}。", f" - 修改:最近提交为 `{head_hash} {head_subject}`。", f" - 操作:{event} 触发 `tools/agent-change-log/update_change_log.py`,自动读取 Git 状态并写入当天日志。", " - 验证:自动脚本只记录提交和 Git 状态,不替代业务测试;业务验证仍以本次任务实际运行结果为准。", " - 影响:提交后即使执行代理忘记手动写日志,也会留下最低限度的时间、提交和分支状态记录。", ] ) def build_issue_entry(now: datetime, git_data: dict[str, str | bool]) -> str | None: time_text = now.strftime("%H:%M") issues: list[str] = [] if git_data.get("fetch_failed"): issues.append(f"fetch 未成功:{git_data.get('fetch')}") if not git_data.get("upstream"): issues.append("当前分支没有 upstream,无法判断其他智能体是否已推送新提交") if not issues: return None return f"- {time_text}:自动日志触发时发现 {';'.join(issues)}。建议后续在有 Git 写权限和网络权限的环境里重新执行拉取检查。" def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--event", default="manual", help="Human-readable trigger source.") parser.add_argument("--date", help="Override log date YYYY-MM-DD.") parser.add_argument("--log-path", help="Override log file path.") parser.add_argument("--dry-run", action="store_true", help="Print the entry without writing files.") parser.add_argument("--force", action="store_true", help="Write even if this commit marker already exists.") parser.add_argument("--no-fetch", action="store_true", help="Skip git fetch; useful for tests or offline hooks.") parser.add_argument("--max-commits", type=int, default=20, help="Maximum commits to summarize per direction.") return parser.parse_args() def main() -> int: args = parse_args() root = repo_root() now = datetime.now().astimezone() date_text = args.date or now.strftime("%Y-%m-%d") log_path = Path(args.log_path) if args.log_path else root / "document" / "work-log" / f"{date_text}.md" git_data = collect_git(root, no_fetch=args.no_fetch, max_commits=args.max_commits) entry = build_work_entry(now, args.event, git_data) issue_entry = build_issue_entry(now, git_data) marker = f"auto-log:{git_data.get('head_hash')}" if args.dry_run: print(entry) if issue_entry: print() print(issue_entry) return 0 content = ensure_document(log_path, date_text) if marker in content and not args.force: print(f"Work log already contains {marker}; skipping.") return 0 content = insert_under_section(content, SECTION_WORK, entry) if issue_entry: content = insert_under_section(content, SECTION_ISSUES, issue_entry) log_path.parent.mkdir(parents=True, exist_ok=True) log_path.write_text(content, encoding="utf-8") print(f"updated_log={os.path.relpath(log_path, root)}") return 0 if __name__ == "__main__": raise SystemExit(main())