#!/usr/bin/env python3
"""Write split X-Financial development logs."""

from __future__ import annotations

import argparse
import os
import re
import subprocess
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

# Heading under which bug-fix entries are collected inside a bug document.
BUG_SECTION = "## 修复记录"
# NOTE(review): the ".med" extension looks like a typo for ".md" (the file
# holds Markdown). Left unchanged because other tooling may already look for
# this exact name -- confirm before renaming.
WORK_LOG_NAME = "work-logs.med"


@dataclass
class CommandResult:
    """Captured outcome of one subprocess invocation."""

    args: list[str]
    returncode: int
    stdout: str
    stderr: str

    @property
    def ok(self) -> bool:
        """Return True when the command exited with status 0."""
        return self.returncode == 0

    @property
    def message(self) -> str:
        """Return stripped stderr, or stripped stdout when stderr is empty."""
        return (self.stderr.strip() or self.stdout.strip()).strip()


def _as_text(value: str | bytes | None) -> str:
    """Coerce captured subprocess output to ``str``.

    ``subprocess.TimeoutExpired`` may carry ``bytes`` (or ``None``) even when
    the process was started with ``text=True``.
    """
    if value is None:
        return ""
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return value


def run(args: list[str], cwd: Path, timeout: int = 60) -> CommandResult:
    """Run *args* in *cwd* and capture its text output.

    Never raises for a timeout or an unlaunchable command: a timeout yields
    return code 124, a launch failure (e.g. executable not found) yields 127.
    """
    try:
        result = subprocess.run(
            args,
            cwd=cwd,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        # Partial output may be bytes here; normalise so callers can treat
        # stdout/stderr uniformly as str.
        return CommandResult(
            args,
            124,
            _as_text(exc.stdout),
            _as_text(exc.stderr) or "command timed out",
        )
    except OSError as exc:
        return CommandResult(args, 127, "", str(exc))
    return CommandResult(args, result.returncode, result.stdout, result.stderr)


def git(args: list[str], cwd: Path, timeout: int = 60) -> CommandResult:
    """Run ``git`` with *args* in *cwd*."""
    return run(["git", *args], cwd, timeout=timeout)


def repo_root() -> Path:
    """Return the top-level directory of the git repository containing cwd.

    Raises:
        SystemExit: if ``git rev-parse --show-toplevel`` fails.
    """
    result = git(["rev-parse", "--show-toplevel"], Path.cwd())
    if not result.ok:
        raise SystemExit(f"Not inside a git repository: {result.message}")
    return Path(result.stdout.strip())


def one_line(value: str) -> str:
    """Collapse all whitespace runs in *value* to single spaces."""
    return " ".join(value.strip().split())


def compact_lines(text: str, limit: int = 8) -> list[str]:
    """Return up to *limit* non-blank stripped lines of *text*.

    A trailing note reports how many lines were omitted; a placeholder entry
    is returned when *text* has no non-blank lines.
    """
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    if not lines:
        return ["未发现"]
    shown = lines[:limit]
    if len(lines) > limit:
        shown.append(f"... 另有 {len(lines) - limit} 条")
    return shown


def slugify(value: str, fallback: str) -> str:
    """Turn *value* into a lowercase, dash-separated file slug.

    ASCII alphanumerics and characters in U+4E00..U+9FFF are kept; every run
    of other characters becomes one dash. Returns *fallback* when nothing
    usable remains.
    """
    normalized = value.strip().lower()
    chars: list[str] = []
    previous_dash = False
    for char in normalized:
        keep = char.isascii() and char.isalnum()
        keep = keep or "\u4e00" <= char <= "\u9fff"
        if keep:
            chars.append(char)
            previous_dash = False
        elif not previous_dash:
            chars.append("-")
            previous_dash = True
    slug = "".join(chars).strip("-")
    return slug or fallback


def date_dir(root: Path, date_text: str) -> Path:
    """Return ``<root>/document/development/<date_text>``."""
    return root / "document" / "development" / date_text


def bugs_dir(root: Path, date_text: str) -> Path:
    """Return the ``dev-logs/bugs`` directory for *date_text*."""
    return date_dir(root, date_text) / "dev-logs" / "bugs"


def feature_dir(root: Path, date_text: str) -> Path:
    """Return the ``feature`` directory for *date_text*."""
    return date_dir(root, date_text) / "feature"


def collect_git(root: Path, *, no_fetch: bool, max_commits: int) -> dict[str, str | bool]:
    """Gather fetch/status/upstream/HEAD information for the repo at *root*.

    Returned keys: ``fetch``, ``status``, ``upstream``, ``behind``, ``ahead``,
    ``head_hash``, ``head_subject`` (all ``str``) and ``fetch_failed``
    (``bool``). Failures of individual git commands are recorded as text in
    the corresponding value rather than raised.
    """
    data: dict[str, str | bool] = {}

    # Placeholder "successful" result so a skipped fetch is not reported as
    # a failed one.
    fetch_result = CommandResult(["git", "fetch"], 0, "", "")
    if no_fetch:
        data["fetch"] = "已跳过 fetch（--no-fetch）"
    else:
        fetch_result = git(["fetch", "--all", "--prune"], root, timeout=60)
        data["fetch"] = "成功" if fetch_result.ok else f"失败：{one_line(fetch_result.message)}"

    status = git(["status", "-sb"], root)
    data["status"] = status.stdout.strip() if status.ok else f"读取失败：{one_line(status.message)}"

    upstream = git(["rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"], root)
    if upstream.ok:
        data["upstream"] = upstream.stdout.strip()
        behind = git(
            ["log", "--oneline", "--decorate", f"--max-count={max_commits}", "HEAD..@{u}"],
            root,
        )
        ahead = git(
            ["log", "--oneline", "--decorate", f"--max-count={max_commits}", "@{u}..HEAD"],
            root,
        )
        data["behind"] = behind.stdout.strip() if behind.ok else f"读取失败：{one_line(behind.message)}"
        data["ahead"] = ahead.stdout.strip() if ahead.ok else f"读取失败：{one_line(ahead.message)}"
    else:
        data["upstream"] = ""
        data["behind"] = "无 upstream 分支"
        data["ahead"] = "无 upstream 分支"

    head_hash = git(["rev-parse", "--short", "HEAD"], root)
    head_subject = git(["show", "-s", "--format=%s", "HEAD"], root)
    data["head_hash"] = head_hash.stdout.strip() if head_hash.ok else ""
    data["head_subject"] = head_subject.stdout.strip() if head_subject.ok else ""
    data["fetch_failed"] = not fetch_result.ok
    return data


def git_check_text(git_data: dict[str, str | bool]) -> str:
    """Render the fetch/upstream/behind/ahead data as one summary sentence."""
    behind_text = "；".join(compact_lines(str(git_data.get("behind") or "")))
    ahead_text = "；".join(compact_lines(str(git_data.get("ahead") or "")))
    upstream = git_data.get("upstream") or "未配置"
    return (
        f"fetch {git_data.get('fetch')}；upstream `{upstream}`；"
        f"upstream 新提交：{behind_text}；本地 ahead 提交：{ahead_text}"
    )


def looks_like_bug(subject: str) -> bool:
    """Return True when a commit subject reads like a bug fix."""
    lower = subject.strip().lower()
    if lower.startswith(("fix", "bugfix", "hotfix", "revert")):
        return True
    return any(
        keyword in lower
        for keyword in ("bug", "修复", "缺陷", "故障", "报错", "失败", "异常")
    )


def insert_under_section(content: str, section: str, entry: str) -> str:
    """Append *entry* at the end of *section* inside *content*.

    *section* is the full heading line (e.g. ``"## 修复记录"``). If the heading
    is absent it is appended to the document together with the entry. Entries
    already present under the section are always preserved.
    """
    marker = f"{section}\n"
    start = content.find(marker)
    # Only accept a match that begins a line, so "## X" is not found inside
    # "### X".
    while start > 0 and content[start - 1] != "\n":
        start = content.find(marker, start + 1)
    if start == -1:
        return content.rstrip() + f"\n\n{section}\n\n{entry.rstrip()}\n"

    insert_at = start + len(marker)
    # Begin at the heading's own newline so a "## " heading on the very next
    # line is still recognised as the end of this section.
    next_section = content.find("\n## ", insert_at - 1)
    entry_text = "\n" + entry.rstrip() + "\n"
    if next_section == -1:
        # Section runs to end of file: append after everything that exists.
        # Truncating at the heading would drop earlier entries.
        return content.rstrip() + entry_text
    before = content[:next_section].rstrip()
    after = content[next_section:]
    return before + entry_text + after


def ensure_bug_document(path: Path, *, title: str, date_text: str) -> str:
    """Return the bug document text for *path*, creating a skeleton if needed.

    Nothing is written to disk. The returned text ends with a newline and
    contains ``BUG_SECTION``.
    """
    if path.exists():
        content = path.read_text(encoding="utf-8")
    else:
        rel_path = path.as_posix().split("document/development/", 1)[-1]
        content = "\n".join(
            [
                f"# {title}",
                "",
                f"日期：{date_text}",
                f"文档路径：document/development/{rel_path}",
                "",
                BUG_SECTION,
                "",
            ]
        )
    if not content.endswith("\n"):
        content += "\n"
    if BUG_SECTION not in content:
        content += f"\n{BUG_SECTION}\n"
    return content


def build_bug_entry(now: datetime, event: str, title: str, git_data: dict[str, str | bool]) -> str:
    """Build the Markdown bullet block describing one bug fix.

    The first line carries a ``bug-log:<hash>`` marker (or ``bug-log:<HH:MM>``
    when no HEAD hash is available).
    """
    time_text = now.strftime("%H:%M")
    head_hash = str(git_data.get("head_hash") or "")
    head_subject = str(git_data.get("head_subject") or "")
    marker = f"bug-log:{head_hash}" if head_hash else f"bug-log:{time_text}"
    return "\n".join(
        [
            f"- {time_text}：记录 bug 修复：{title}。({marker})",
            f" - Git 提交检查：{git_check_text(git_data)}。",
            f" - 修改：最近提交为 `{head_hash} {head_subject}`。",
            f" - 操作：{event} 触发 `tools/agent-change-log/update_change_log.py --kind bug`，写入当天 `dev-logs/bugs`。",
            " - 验证：记录修复时应引用本次任务实际运行的测试、构建或手工验证结果；自动脚本不替代业务验证。",
            " - 影响：该 bug 会在 17:00 综合日志中与当天功能点一起汇总。",
        ]
    )


def write_bug_log(args: argparse.Namespace, root: Path, now: datetime, git_data: dict[str, str | bool]) -> int:
    """Append a bug entry to the day's bug document and return exit code 0.

    Honours ``--dry-run`` (print only) and skips the write when the commit
    marker is already present unless ``--force`` is given.
    """
    date_text = args.date or now.strftime("%Y-%m-%d")
    head_hash = str(git_data.get("head_hash") or "")
    head_subject = str(git_data.get("head_subject") or "")
    title = args.bug_title or head_subject or "未命名 bug 修复"
    slug = slugify(args.bug_slug or title, fallback=f"bug-{head_hash or now.strftime('%H%M%S')}")
    path = Path(args.log_path) if args.log_path else bugs_dir(root, date_text) / f"{slug}.md"
    entry = build_bug_entry(now, args.event, title, git_data)
    # Deduplicate only on a real commit hash; time-based markers are not
    # stable identifiers.
    marker = f"bug-log:{head_hash}" if head_hash else ""

    if args.dry_run:
        print(f"target_log={os.path.relpath(path, root)}")
        print(entry)
        return 0

    content = ensure_bug_document(path, title=title, date_text=date_text)
    if marker and marker in content and not args.force:
        print(f"Bug log already contains {marker}; skipping.")
        return 0

    content = insert_under_section(content, BUG_SECTION, entry)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")
    print(f"updated_bug_log={os.path.relpath(path, root)}")
    return 0


def _heading_from_text(text: str) -> str | None:
    """Return the text of the first ``# `` heading in *text*, or None."""
    for line in text.splitlines():
        if line.startswith("# "):
            return line[2:].strip()
    return None


def first_heading(path: Path) -> str:
    """Return the first ``# `` heading in *path*, else its parent dir name."""
    heading = _heading_from_text(path.read_text(encoding="utf-8"))
    return heading if heading is not None else path.parent.name


def section_first_text(content: str, section: str) -> str:
    """Return the first non-heading text line under ``## <section>``.

    Leading/trailing dashes and spaces are stripped from the line. Returns an
    empty string when the section or any text under it is missing.
    """
    marker = f"## {section}"
    start = content.find(marker)
    if start == -1:
        return ""
    tail = content[start + len(marker) :].split("\n## ", 1)[0]
    for line in tail.splitlines():
        stripped = line.strip()
        if stripped and not stripped.startswith("#"):
            return stripped.strip("- ").strip()
    return ""


def summarize_feature(path: Path) -> str:
    """Build one summary bullet for the feature directory *path*.

    Reads ``CONCEPT.md`` (title and one-line description) and ``TODO.md``
    (checked/unchecked counts) when they exist.
    """
    concept = path / "CONCEPT.md"
    todo = path / "TODO.md"
    title = path.name
    sentence = ""
    if concept.exists():
        concept_text = concept.read_text(encoding="utf-8")
        heading = _heading_from_text(concept_text)
        # Same fallback as first_heading(concept): the feature directory name.
        title = heading if heading is not None else path.name
        sentence = section_first_text(concept_text, "功能一句话")
    todo_text = todo.read_text(encoding="utf-8") if todo.exists() else ""
    done_count = len(re.findall(r"^- \[x\]", todo_text, flags=re.MULTILINE | re.IGNORECASE))
    open_count = len(re.findall(r"^- \[ \]", todo_text, flags=re.MULTILINE))
    status = f"TODO 完成 {done_count} 项，未完成 {open_count} 项" if todo.exists() else "未找到 TODO.md"
    detail = sentence or "未提取到功能一句话"
    return f"- {title}：{detail}（{status}；目录：`{path.name}`）"


def summarize_bug(path: Path) -> str:
    """Build one summary bullet for the bug document *path*.

    Uses the document's first heading (falling back to the file stem) and the
    last top-level ``- `` entry that contains a full-width colon.
    """
    content = path.read_text(encoding="utf-8")
    title = _heading_from_text(content) or path.stem
    latest = ""
    for line in content.splitlines():
        # Only column-0 bullets are entries; indented sub-bullets (Git check,
        # impact, ...) must not replace the entry headline.
        if line.startswith("- ") and "：" in line:
            latest = line.strip()[2:]
    return f"- {title}：{latest or '已记录修复内容'}（文件：`{path.name}`）"


def build_daily_summary(root: Path, date_text: str, now: datetime) -> str:
    """Compose the daily work log from that day's feature and bug documents."""
    features = []
    feature_root = feature_dir(root, date_text)
    if feature_root.exists():
        features = [summarize_feature(path) for path in sorted(feature_root.iterdir()) if path.is_dir()]

    bugs = []
    bug_root = bugs_dir(root, date_text)
    if bug_root.exists():
        bugs = [summarize_bug(path) for path in sorted(bug_root.glob("*.md"))]

    feature_text = "\n".join(features) if features else "- 今日未发现功能点文档。"
    bug_text = "\n".join(bugs) if bugs else "- 今日未发现 bug 修复记录。"

    analysis_parts = []
    if features:
        analysis_parts.append(f"功能侧沉淀了 {len(features)} 个功能点")
    if bugs:
        analysis_parts.append(f"问题侧记录了 {len(bugs)} 个 bug 修复")
    analysis = "，".join(analysis_parts) if analysis_parts else "今日目录下暂无功能点或 bug 记录"

    return "\n".join(
        [
            f"# {date_text} 综合工作日志",
            "",
            f"生成时间：{now.strftime('%Y-%m-%d %H:%M:%S %Z')}",
            "来源：`feature/` 功能点文档与 `dev-logs/bugs/` bug 记录",
            "",
            "## 今日功能点",
            "",
            feature_text,
            "",
            "## 今日 Bugs",
            "",
            bug_text,
            "",
            "## 综合分析",
            "",
            f"- {analysis}。",
            "- 后续复盘优先看本文件，再回到对应功能点或 bug 文件追溯证据。",
            "",
        ]
    )


def write_daily_summary(args: argparse.Namespace, root: Path, now: datetime) -> int:
    """Write (or, with ``--dry-run``, print) the daily summary; return 0."""
    date_text = args.date or now.strftime("%Y-%m-%d")
    path = Path(args.log_path) if args.log_path else date_dir(root, date_text) / WORK_LOG_NAME
    content = build_daily_summary(root, date_text, now)

    if args.dry_run:
        print(f"target_log={os.path.relpath(path, root)}")
        print(content)
        return 0

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")
    print(f"updated_work_log={os.path.relpath(path, root)}")
    return 0


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of this script."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--kind", choices=("auto", "bug", "summary"), default="auto")
    parser.add_argument("--event", default="manual", help="Human-readable trigger source.")
    parser.add_argument("--date", help="Override log date YYYY-MM-DD.")
    parser.add_argument("--log-path", help="Override output log path.")
    parser.add_argument("--bug-title", help="Human-readable bug title.")
    parser.add_argument("--bug-slug", help="Stable bug log file slug.")
    parser.add_argument("--dry-run", action="store_true", help="Print output without writing files.")
    parser.add_argument("--force", action="store_true", help="Write even if this commit marker already exists.")
    parser.add_argument("--no-fetch", action="store_true", help="Skip git fetch; useful for tests or offline hooks.")
    parser.add_argument("--max-commits", type=int, default=20, help="Maximum commits to summarize per direction.")
    return parser.parse_args()


def main() -> int:
    """Entry point: write the daily summary or a bug log, per ``--kind``.

    With ``--kind auto`` the bug log is only written when the HEAD commit
    subject looks like a bug fix.
    """
    args = parse_args()
    root = repo_root()
    now = datetime.now().astimezone()

    if args.kind == "summary":
        return write_daily_summary(args, root, now)

    git_data = collect_git(root, no_fetch=args.no_fetch, max_commits=args.max_commits)
    subject = str(git_data.get("head_subject") or "")
    if args.kind == "auto" and not looks_like_bug(subject):
        print(f"skipped_non_bug_commit={subject}")
        return 0
    return write_bug_log(args, root, now, git_data)


if __name__ == "__main__":
    raise SystemExit(main())