chore(logs): split agent change log outputs

This commit is contained in:
caoxiaozhu
2026-06-25 09:35:18 +08:00
parent 4d8a606cd6
commit 6b0756a55f
9 changed files with 653 additions and 154 deletions

View File

@@ -1,20 +1,19 @@
#!/usr/bin/env python3
"""Append an incremental X-Financial agent work-log entry."""
"""Write split X-Financial development logs."""
from __future__ import annotations
import argparse
import os
import re
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
SECTION_WORK = "## 当日工作内容"
SECTION_ISSUES = "## 遗留问题"
SECTION_TODO = "## TODO"
BUG_SECTION = "## 修复记录"
WORK_LOG_NAME = "work-logs.med"
@dataclass
@@ -63,7 +62,7 @@ def one_line(value: str) -> str:
return " ".join(value.strip().split())
def indent_block(text: str, limit: int = 8) -> list[str]:
def compact_lines(text: str, limit: int = 8) -> list[str]:
lines = [line.strip() for line in text.splitlines() if line.strip()]
if not lines:
return ["未发现"]
@@ -73,35 +72,33 @@ def indent_block(text: str, limit: int = 8) -> list[str]:
return shown
def ensure_document(path: Path, date_text: str) -> str:
if path.exists():
content = path.read_text(encoding="utf-8")
else:
content = f"# {date_text} 工作日志\n\n{SECTION_WORK}\n\n{SECTION_ISSUES}\n\n{SECTION_TODO}\n"
if not content.endswith("\n"):
content += "\n"
for heading in (SECTION_WORK, SECTION_ISSUES, SECTION_TODO):
if heading not in content:
content += f"\n{heading}\n"
return content
def slugify(value: str, fallback: str) -> str:
normalized = value.strip().lower()
chars: list[str] = []
previous_dash = False
for char in normalized:
keep = char.isascii() and char.isalnum()
keep = keep or "\u4e00" <= char <= "\u9fff"
if keep:
chars.append(char)
previous_dash = False
elif not previous_dash:
chars.append("-")
previous_dash = True
slug = "".join(chars).strip("-")
return slug or fallback
def insert_under_section(content: str, section: str, entry: str) -> str:
marker = f"{section}\n"
start = content.find(marker)
if start == -1:
return content.rstrip() + f"\n\n{section}\n\n{entry.rstrip()}\n"
def date_dir(root: Path, date_text: str) -> Path:
return root / "document" / "development" / date_text
insert_at = start + len(marker)
next_section = content.find("\n## ", insert_at)
entry_text = "\n" + entry.rstrip() + "\n"
if next_section == -1:
return content[:insert_at].rstrip() + entry_text
before = content[:next_section].rstrip()
after = content[next_section:]
return before + entry_text + after
def bugs_dir(root: Path, date_text: str) -> Path:
return date_dir(root, date_text) / "dev-logs" / "bugs"
def feature_dir(root: Path, date_text: str) -> Path:
return date_dir(root, date_text) / "feature"
def collect_git(root: Path, *, no_fetch: bool, max_commits: int) -> dict[str, str | bool]:
@@ -133,56 +130,232 @@ def collect_git(root: Path, *, no_fetch: bool, max_commits: int) -> dict[str, st
head_subject = git(["show", "-s", "--format=%s", "HEAD"], root)
data["head_hash"] = head_hash.stdout.strip() if head_hash.ok else ""
data["head_subject"] = head_subject.stdout.strip() if head_subject.ok else ""
stat = git(["show", "--stat", "--oneline", "--decorate", "--max-count=1", "HEAD"], root)
data["head_stat"] = stat.stdout.strip() if stat.ok else ""
data["fetch_failed"] = not fetch_result.ok
return data
def build_work_entry(now: datetime, event: str, git_data: dict[str, str | bool]) -> str:
def git_check_text(git_data: dict[str, str | bool]) -> str:
behind_text = "".join(compact_lines(str(git_data.get("behind") or "")))
ahead_text = "".join(compact_lines(str(git_data.get("ahead") or "")))
upstream = git_data.get("upstream") or "未配置"
return (
f"fetch {git_data.get('fetch')}upstream `{upstream}`"
f"upstream 新提交:{behind_text};本地 ahead 提交:{ahead_text}"
)
def looks_like_bug(subject: str) -> bool:
lower = subject.strip().lower()
if lower.startswith(("fix", "bugfix", "hotfix", "revert")):
return True
return any(keyword in lower for keyword in ("bug", "修复", "缺陷", "故障", "报错", "失败", "异常"))
def insert_under_section(content: str, section: str, entry: str) -> str:
marker = f"{section}\n"
start = content.find(marker)
if start == -1:
return content.rstrip() + f"\n\n{section}\n\n{entry.rstrip()}\n"
insert_at = start + len(marker)
next_section = content.find("\n## ", insert_at)
entry_text = "\n" + entry.rstrip() + "\n"
if next_section == -1:
return content[:insert_at].rstrip() + entry_text
before = content[:next_section].rstrip()
after = content[next_section:]
return before + entry_text + after
def ensure_bug_document(path: Path, *, title: str, date_text: str) -> str:
if path.exists():
content = path.read_text(encoding="utf-8")
else:
rel_path = path.as_posix().split("document/development/", 1)[-1]
content = "\n".join(
[
f"# {title}",
"",
f"日期:{date_text}",
f"文档路径document/development/{rel_path}",
"",
BUG_SECTION,
"",
]
)
if not content.endswith("\n"):
content += "\n"
if BUG_SECTION not in content:
content += f"\n{BUG_SECTION}\n"
return content
def build_bug_entry(now: datetime, event: str, title: str, git_data: dict[str, str | bool]) -> str:
time_text = now.strftime("%H:%M")
head_hash = str(git_data.get("head_hash") or "")
head_subject = str(git_data.get("head_subject") or "")
behind = str(git_data.get("behind") or "")
ahead = str(git_data.get("ahead") or "")
behind_lines = indent_block(behind)
ahead_lines = indent_block(ahead)
behind_text = "".join(behind_lines)
ahead_text = "".join(ahead_lines)
marker = f"auto-log:{head_hash}" if head_hash else "auto-log:no-head"
marker = f"bug-log:{head_hash}" if head_hash else f"bug-log:{time_text}"
return "\n".join(
[
f"- {time_text}自动记录 `{head_hash}` 提交后的工作日志。({marker}",
f" - Git 提交检查:fetch {git_data.get('fetch')}upstream `{git_data.get('upstream') or '未配置'}`upstream 新提交:{behind_text};本地 ahead 提交:{ahead_text}",
f"- {time_text}:记录 bug 修复:{title}。({marker}",
f" - Git 提交检查:{git_check_text(git_data)}",
f" - 修改:最近提交为 `{head_hash} {head_subject}`。",
f" - 操作:{event} 触发 `tools/agent-change-log/update_change_log.py`,自动读取 Git 状态并写入当天日志",
" - 验证:自动脚本只记录提交和 Git 状态,不替代业务测试;业务验证仍以本次任务实际运行结果为准",
" - 影响:提交后即使执行代理忘记手动写日志,也会留下最低限度的时间、提交和分支状态记录",
f" - 操作:{event} 触发 `tools/agent-change-log/update_change_log.py --kind bug`,写入当天 `dev-logs/bugs`",
" - 验证:记录修复时应引用本次任务实际运行的测试、构建或手工验证结果;自动脚本不替代业务验证",
" - 影响:该 bug 会在 17:00 综合日志中与当天功能点一起汇总",
]
)
def build_issue_entry(now: datetime, git_data: dict[str, str | bool]) -> str | None:
time_text = now.strftime("%H:%M")
issues: list[str] = []
if git_data.get("fetch_failed"):
issues.append(f"fetch 未成功:{git_data.get('fetch')}")
if not git_data.get("upstream"):
issues.append("当前分支没有 upstream无法判断其他智能体是否已推送新提交")
if not issues:
return None
return f"- {time_text}:自动日志触发时发现 {''.join(issues)}。建议后续在有 Git 写权限和网络权限的环境里重新执行拉取检查。"
def write_bug_log(args: argparse.Namespace, root: Path, now: datetime, git_data: dict[str, str | bool]) -> int:
date_text = args.date or now.strftime("%Y-%m-%d")
head_hash = str(git_data.get("head_hash") or "")
head_subject = str(git_data.get("head_subject") or "")
title = args.bug_title or head_subject or "未命名 bug 修复"
slug = slugify(args.bug_slug or title, fallback=f"bug-{head_hash or now.strftime('%H%M%S')}")
path = Path(args.log_path) if args.log_path else bugs_dir(root, date_text) / f"{slug}.md"
entry = build_bug_entry(now, args.event, title, git_data)
marker = f"bug-log:{head_hash}" if head_hash else ""
if args.dry_run:
print(f"target_log={os.path.relpath(path, root)}")
print(entry)
return 0
content = ensure_bug_document(path, title=title, date_text=date_text)
if marker and marker in content and not args.force:
print(f"Bug log already contains {marker}; skipping.")
return 0
content = insert_under_section(content, BUG_SECTION, entry)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
print(f"updated_bug_log={os.path.relpath(path, root)}")
return 0
def first_heading(path: Path) -> str:
for line in path.read_text(encoding="utf-8").splitlines():
if line.startswith("# "):
return line[2:].strip()
return path.parent.name
def section_first_text(content: str, section: str) -> str:
marker = f"## {section}"
start = content.find(marker)
if start == -1:
return ""
tail = content[start + len(marker) :].split("\n## ", 1)[0]
for line in tail.splitlines():
stripped = line.strip()
if stripped and not stripped.startswith("#"):
return stripped.strip("- ").strip()
return ""
def summarize_feature(path: Path) -> str:
concept = path / "CONCEPT.md"
todo = path / "TODO.md"
title = path.name
sentence = ""
if concept.exists():
concept_text = concept.read_text(encoding="utf-8")
title = first_heading(concept)
sentence = section_first_text(concept_text, "功能一句话")
todo_text = todo.read_text(encoding="utf-8") if todo.exists() else ""
done_count = len(re.findall(r"^- \[x\]", todo_text, flags=re.MULTILINE | re.IGNORECASE))
open_count = len(re.findall(r"^- \[ \]", todo_text, flags=re.MULTILINE))
status = f"TODO 完成 {done_count} 项,未完成 {open_count}" if todo.exists() else "未找到 TODO.md"
detail = sentence or "未提取到功能一句话"
return f"- {title}{detail}{status};目录:`{path.name}`"
def summarize_bug(path: Path) -> str:
title = first_heading(path)
content = path.read_text(encoding="utf-8")
latest = ""
for line in content.splitlines():
stripped = line.strip()
if stripped.startswith("- ") and "" in stripped:
latest = stripped[2:]
return f"- {title}{latest or '已记录修复内容'}(文件:`{path.name}`"
def build_daily_summary(root: Path, date_text: str, now: datetime) -> str:
features = []
feature_root = feature_dir(root, date_text)
if feature_root.exists():
for path in sorted(feature_root.iterdir()):
if path.is_dir():
features.append(summarize_feature(path))
bugs = []
bug_root = bugs_dir(root, date_text)
if bug_root.exists():
for path in sorted(bug_root.glob("*.md")):
bugs.append(summarize_bug(path))
feature_text = "\n".join(features) if features else "- 今日未发现功能点文档。"
bug_text = "\n".join(bugs) if bugs else "- 今日未发现 bug 修复记录。"
analysis_parts = []
if features:
analysis_parts.append(f"功能侧沉淀了 {len(features)} 个功能点")
if bugs:
analysis_parts.append(f"问题侧记录了 {len(bugs)} 个 bug 修复")
analysis = "".join(analysis_parts) if analysis_parts else "今日目录下暂无功能点或 bug 记录"
return "\n".join(
[
f"# {date_text} 综合工作日志",
"",
f"生成时间:{now.strftime('%Y-%m-%d %H:%M:%S %Z')}",
"来源:`feature/` 功能点文档与 `dev-logs/bugs/` bug 记录",
"",
"## 今日功能点",
"",
feature_text,
"",
"## 今日 Bugs",
"",
bug_text,
"",
"## 综合分析",
"",
f"- {analysis}",
"- 后续复盘优先看本文件,再回到对应功能点或 bug 文件追溯证据。",
"",
]
)
def write_daily_summary(args: argparse.Namespace, root: Path, now: datetime) -> int:
date_text = args.date or now.strftime("%Y-%m-%d")
path = Path(args.log_path) if args.log_path else date_dir(root, date_text) / WORK_LOG_NAME
content = build_daily_summary(root, date_text, now)
if args.dry_run:
print(f"target_log={os.path.relpath(path, root)}")
print(content)
return 0
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
print(f"updated_work_log={os.path.relpath(path, root)}")
return 0
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--kind", choices=("auto", "bug", "summary"), default="auto")
parser.add_argument("--event", default="manual", help="Human-readable trigger source.")
parser.add_argument("--date", help="Override log date YYYY-MM-DD.")
parser.add_argument("--log-path", help="Override log file path.")
parser.add_argument("--dry-run", action="store_true", help="Print the entry without writing files.")
parser.add_argument("--log-path", help="Override output log path.")
parser.add_argument("--bug-title", help="Human-readable bug title.")
parser.add_argument("--bug-slug", help="Stable bug log file slug.")
parser.add_argument("--dry-run", action="store_true", help="Print output without writing files.")
parser.add_argument("--force", action="store_true", help="Write even if this commit marker already exists.")
parser.add_argument("--no-fetch", action="store_true", help="Skip git fetch; useful for tests or offline hooks.")
parser.add_argument("--max-commits", type=int, default=20, help="Maximum commits to summarize per direction.")
@@ -193,34 +366,18 @@ def main() -> int:
args = parse_args()
root = repo_root()
now = datetime.now().astimezone()
date_text = args.date or now.strftime("%Y-%m-%d")
log_path = Path(args.log_path) if args.log_path else root / "document" / "work-log" / f"{date_text}.md"
if args.kind == "summary":
return write_daily_summary(args, root, now)
git_data = collect_git(root, no_fetch=args.no_fetch, max_commits=args.max_commits)
entry = build_work_entry(now, args.event, git_data)
issue_entry = build_issue_entry(now, git_data)
marker = f"auto-log:{git_data.get('head_hash')}"
subject = str(git_data.get("head_subject") or "")
if args.dry_run:
print(entry)
if issue_entry:
print()
print(issue_entry)
if args.kind == "auto" and not looks_like_bug(subject):
print(f"skipped_non_bug_commit={subject}")
return 0
content = ensure_document(log_path, date_text)
if marker in content and not args.force:
print(f"Work log already contains {marker}; skipping.")
return 0
content = insert_under_section(content, SECTION_WORK, entry)
if issue_entry:
content = insert_under_section(content, SECTION_ISSUES, issue_entry)
log_path.parent.mkdir(parents=True, exist_ok=True)
log_path.write_text(content, encoding="utf-8")
print(f"updated_log={os.path.relpath(log_path, root)}")
return 0
return write_bug_log(args, root, now, git_data)
if __name__ == "__main__":