228 lines
8.7 KiB
Python
228 lines
8.7 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""Append an incremental X-Financial agent work-log entry."""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import argparse
|
|||
|
|
import os
|
|||
|
|
import subprocess
|
|||
|
|
import sys
|
|||
|
|
from dataclasses import dataclass
|
|||
|
|
from datetime import datetime
|
|||
|
|
from pathlib import Path
|
|||
|
|
|
|||
|
|
|
|||
|
|
SECTION_WORK = "## 当日工作内容"
|
|||
|
|
SECTION_ISSUES = "## 遗留问题"
|
|||
|
|
SECTION_TODO = "## TODO"
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class CommandResult:
|
|||
|
|
args: list[str]
|
|||
|
|
returncode: int
|
|||
|
|
stdout: str
|
|||
|
|
stderr: str
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def ok(self) -> bool:
|
|||
|
|
return self.returncode == 0
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def message(self) -> str:
|
|||
|
|
return (self.stderr.strip() or self.stdout.strip()).strip()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def run(args: list[str], cwd: Path, timeout: int = 60) -> CommandResult:
|
|||
|
|
try:
|
|||
|
|
result = subprocess.run(
|
|||
|
|
args,
|
|||
|
|
cwd=cwd,
|
|||
|
|
text=True,
|
|||
|
|
stdout=subprocess.PIPE,
|
|||
|
|
stderr=subprocess.PIPE,
|
|||
|
|
timeout=timeout,
|
|||
|
|
)
|
|||
|
|
except subprocess.TimeoutExpired as exc:
|
|||
|
|
return CommandResult(args, 124, exc.stdout or "", exc.stderr or "command timed out")
|
|||
|
|
return CommandResult(args, result.returncode, result.stdout, result.stderr)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def git(args: list[str], cwd: Path, timeout: int = 60) -> CommandResult:
|
|||
|
|
return run(["git", *args], cwd, timeout=timeout)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def repo_root() -> Path:
|
|||
|
|
result = run(["git", "rev-parse", "--show-toplevel"], Path.cwd())
|
|||
|
|
if not result.ok:
|
|||
|
|
raise SystemExit(f"Not inside a git repository: {result.message}")
|
|||
|
|
return Path(result.stdout.strip())
|
|||
|
|
|
|||
|
|
|
|||
|
|
def one_line(value: str) -> str:
|
|||
|
|
return " ".join(value.strip().split())
|
|||
|
|
|
|||
|
|
|
|||
|
|
def indent_block(text: str, limit: int = 8) -> list[str]:
|
|||
|
|
lines = [line.strip() for line in text.splitlines() if line.strip()]
|
|||
|
|
if not lines:
|
|||
|
|
return ["未发现"]
|
|||
|
|
shown = lines[:limit]
|
|||
|
|
if len(lines) > limit:
|
|||
|
|
shown.append(f"... 另有 {len(lines) - limit} 条")
|
|||
|
|
return shown
|
|||
|
|
|
|||
|
|
|
|||
|
|
def ensure_document(path: Path, date_text: str) -> str:
|
|||
|
|
if path.exists():
|
|||
|
|
content = path.read_text(encoding="utf-8")
|
|||
|
|
else:
|
|||
|
|
content = f"# {date_text} 工作日志\n\n{SECTION_WORK}\n\n{SECTION_ISSUES}\n\n{SECTION_TODO}\n"
|
|||
|
|
|
|||
|
|
if not content.endswith("\n"):
|
|||
|
|
content += "\n"
|
|||
|
|
|
|||
|
|
for heading in (SECTION_WORK, SECTION_ISSUES, SECTION_TODO):
|
|||
|
|
if heading not in content:
|
|||
|
|
content += f"\n{heading}\n"
|
|||
|
|
return content
|
|||
|
|
|
|||
|
|
|
|||
|
|
def insert_under_section(content: str, section: str, entry: str) -> str:
|
|||
|
|
marker = f"{section}\n"
|
|||
|
|
start = content.find(marker)
|
|||
|
|
if start == -1:
|
|||
|
|
return content.rstrip() + f"\n\n{section}\n\n{entry.rstrip()}\n"
|
|||
|
|
|
|||
|
|
insert_at = start + len(marker)
|
|||
|
|
next_section = content.find("\n## ", insert_at)
|
|||
|
|
entry_text = "\n" + entry.rstrip() + "\n"
|
|||
|
|
if next_section == -1:
|
|||
|
|
return content[:insert_at].rstrip() + entry_text
|
|||
|
|
before = content[:next_section].rstrip()
|
|||
|
|
after = content[next_section:]
|
|||
|
|
return before + entry_text + after
|
|||
|
|
|
|||
|
|
|
|||
|
|
def collect_git(root: Path, *, no_fetch: bool, max_commits: int) -> dict[str, str | bool]:
|
|||
|
|
data: dict[str, str | bool] = {}
|
|||
|
|
|
|||
|
|
fetch_result = CommandResult(["git", "fetch"], 0, "", "")
|
|||
|
|
if no_fetch:
|
|||
|
|
data["fetch"] = "已跳过 fetch(--no-fetch)"
|
|||
|
|
else:
|
|||
|
|
fetch_result = git(["fetch", "--all", "--prune"], root, timeout=60)
|
|||
|
|
data["fetch"] = "成功" if fetch_result.ok else f"失败:{one_line(fetch_result.message)}"
|
|||
|
|
|
|||
|
|
status = git(["status", "-sb"], root)
|
|||
|
|
data["status"] = status.stdout.strip() if status.ok else f"读取失败:{one_line(status.message)}"
|
|||
|
|
|
|||
|
|
upstream = git(["rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"], root)
|
|||
|
|
if upstream.ok:
|
|||
|
|
data["upstream"] = upstream.stdout.strip()
|
|||
|
|
behind = git(["log", "--oneline", "--decorate", f"--max-count={max_commits}", "HEAD..@{u}"], root)
|
|||
|
|
ahead = git(["log", "--oneline", "--decorate", f"--max-count={max_commits}", "@{u}..HEAD"], root)
|
|||
|
|
data["behind"] = behind.stdout.strip() if behind.ok else f"读取失败:{one_line(behind.message)}"
|
|||
|
|
data["ahead"] = ahead.stdout.strip() if ahead.ok else f"读取失败:{one_line(ahead.message)}"
|
|||
|
|
else:
|
|||
|
|
data["upstream"] = ""
|
|||
|
|
data["behind"] = "无 upstream 分支"
|
|||
|
|
data["ahead"] = "无 upstream 分支"
|
|||
|
|
|
|||
|
|
head_hash = git(["rev-parse", "--short", "HEAD"], root)
|
|||
|
|
head_subject = git(["show", "-s", "--format=%s", "HEAD"], root)
|
|||
|
|
data["head_hash"] = head_hash.stdout.strip() if head_hash.ok else ""
|
|||
|
|
data["head_subject"] = head_subject.stdout.strip() if head_subject.ok else ""
|
|||
|
|
|
|||
|
|
stat = git(["show", "--stat", "--oneline", "--decorate", "--max-count=1", "HEAD"], root)
|
|||
|
|
data["head_stat"] = stat.stdout.strip() if stat.ok else ""
|
|||
|
|
data["fetch_failed"] = not fetch_result.ok
|
|||
|
|
return data
|
|||
|
|
|
|||
|
|
|
|||
|
|
def build_work_entry(now: datetime, event: str, git_data: dict[str, str | bool]) -> str:
|
|||
|
|
time_text = now.strftime("%H:%M")
|
|||
|
|
head_hash = str(git_data.get("head_hash") or "")
|
|||
|
|
head_subject = str(git_data.get("head_subject") or "")
|
|||
|
|
behind = str(git_data.get("behind") or "")
|
|||
|
|
ahead = str(git_data.get("ahead") or "")
|
|||
|
|
|
|||
|
|
behind_lines = indent_block(behind)
|
|||
|
|
ahead_lines = indent_block(ahead)
|
|||
|
|
behind_text = ";".join(behind_lines)
|
|||
|
|
ahead_text = ";".join(ahead_lines)
|
|||
|
|
|
|||
|
|
marker = f"auto-log:{head_hash}" if head_hash else "auto-log:no-head"
|
|||
|
|
return "\n".join(
|
|||
|
|
[
|
|||
|
|
f"- {time_text}:自动记录 `{head_hash}` 提交后的工作日志。({marker})",
|
|||
|
|
f" - Git 提交检查:fetch {git_data.get('fetch')};upstream `{git_data.get('upstream') or '未配置'}`;upstream 新提交:{behind_text};本地 ahead 提交:{ahead_text}。",
|
|||
|
|
f" - 修改:最近提交为 `{head_hash} {head_subject}`。",
|
|||
|
|
f" - 操作:{event} 触发 `tools/agent-change-log/update_change_log.py`,自动读取 Git 状态并写入当天日志。",
|
|||
|
|
" - 验证:自动脚本只记录提交和 Git 状态,不替代业务测试;业务验证仍以本次任务实际运行结果为准。",
|
|||
|
|
" - 影响:提交后即使执行代理忘记手动写日志,也会留下最低限度的时间、提交和分支状态记录。",
|
|||
|
|
]
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def build_issue_entry(now: datetime, git_data: dict[str, str | bool]) -> str | None:
|
|||
|
|
time_text = now.strftime("%H:%M")
|
|||
|
|
issues: list[str] = []
|
|||
|
|
if git_data.get("fetch_failed"):
|
|||
|
|
issues.append(f"fetch 未成功:{git_data.get('fetch')}")
|
|||
|
|
if not git_data.get("upstream"):
|
|||
|
|
issues.append("当前分支没有 upstream,无法判断其他智能体是否已推送新提交")
|
|||
|
|
if not issues:
|
|||
|
|
return None
|
|||
|
|
return f"- {time_text}:自动日志触发时发现 {';'.join(issues)}。建议后续在有 Git 写权限和网络权限的环境里重新执行拉取检查。"
|
|||
|
|
|
|||
|
|
|
|||
|
|
def parse_args() -> argparse.Namespace:
|
|||
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|||
|
|
parser.add_argument("--event", default="manual", help="Human-readable trigger source.")
|
|||
|
|
parser.add_argument("--date", help="Override log date YYYY-MM-DD.")
|
|||
|
|
parser.add_argument("--log-path", help="Override log file path.")
|
|||
|
|
parser.add_argument("--dry-run", action="store_true", help="Print the entry without writing files.")
|
|||
|
|
parser.add_argument("--force", action="store_true", help="Write even if this commit marker already exists.")
|
|||
|
|
parser.add_argument("--no-fetch", action="store_true", help="Skip git fetch; useful for tests or offline hooks.")
|
|||
|
|
parser.add_argument("--max-commits", type=int, default=20, help="Maximum commits to summarize per direction.")
|
|||
|
|
return parser.parse_args()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main() -> int:
|
|||
|
|
args = parse_args()
|
|||
|
|
root = repo_root()
|
|||
|
|
now = datetime.now().astimezone()
|
|||
|
|
date_text = args.date or now.strftime("%Y-%m-%d")
|
|||
|
|
log_path = Path(args.log_path) if args.log_path else root / "document" / "work-log" / f"{date_text}.md"
|
|||
|
|
|
|||
|
|
git_data = collect_git(root, no_fetch=args.no_fetch, max_commits=args.max_commits)
|
|||
|
|
entry = build_work_entry(now, args.event, git_data)
|
|||
|
|
issue_entry = build_issue_entry(now, git_data)
|
|||
|
|
marker = f"auto-log:{git_data.get('head_hash')}"
|
|||
|
|
|
|||
|
|
if args.dry_run:
|
|||
|
|
print(entry)
|
|||
|
|
if issue_entry:
|
|||
|
|
print()
|
|||
|
|
print(issue_entry)
|
|||
|
|
return 0
|
|||
|
|
|
|||
|
|
content = ensure_document(log_path, date_text)
|
|||
|
|
if marker in content and not args.force:
|
|||
|
|
print(f"Work log already contains {marker}; skipping.")
|
|||
|
|
return 0
|
|||
|
|
|
|||
|
|
content = insert_under_section(content, SECTION_WORK, entry)
|
|||
|
|
if issue_entry:
|
|||
|
|
content = insert_under_section(content, SECTION_ISSUES, issue_entry)
|
|||
|
|
|
|||
|
|
log_path.parent.mkdir(parents=True, exist_ok=True)
|
|||
|
|
log_path.write_text(content, encoding="utf-8")
|
|||
|
|
print(f"updated_log={os.path.relpath(log_path, root)}")
|
|||
|
|
return 0
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
raise SystemExit(main())
|