"""``muse shortlog`` — commit summary grouped by author, agent, model, or branch. Groups the commit history by a chosen dimension, counts commits per group, and optionally lists commit messages under each. Useful for changelogs, release notes, and auditing agent contribution. Muse's rich commit metadata — ``author``, ``agent_id``, ``model_id`` — makes shortlog especially expressive: you can see exactly which human or which agent class (and which model) contributed each set of commits. Usage:: muse shortlog # current branch, group by author muse shortlog --all # all branches muse shortlog --numbered # sort by commit count (most active first) muse shortlog --summary # counts only, no message list muse shortlog --group-by agent # group by agent_id instead of author muse shortlog --group-by model # group by model_id muse shortlog --group-by branch # group by originating branch muse shortlog --since 2025-01-01 # commits on or after this date muse shortlog --until 2025-06-30 # commits on or before this date muse shortlog --no-merges # exclude merge commits muse shortlog --json # machine-readable output JSON output schema:: { "repo_id": "", "branch": "", "truncated": false, "duration_ms": 4.2, "exit_code": 0, "groups": [ { "key": "", "count": , "commits": [ { "commit_id": "", "message": "", "committed_at": "", "author": "", "agent_id": "", "model_id": "" } ] } ] } ``truncated`` is ``true`` when ``--limit`` capped the number of commits loaded from history — the groups may be incomplete. It is ``false`` when all matching commits were included. All JSON responses (including the empty-result ``groups: []`` case) carry ``duration_ms`` (wall-clock ms) and ``exit_code`` so agent pipelines can parse timing and success uniformly. Date-parse errors (``--since`` / ``--until``) emit a structured JSON error to stdout when ``--json`` is set. Exit codes:: 0 — output produced (even if empty) 1 — bad date format or branch not found 2 — not a Muse repository Security model:: Branch names discovered via filesystem enumeration are checked for symlinks before being added to the list; symlinks inside ``.muse/refs/heads/`` are silently skipped. Author names, agent IDs, and commit messages are passed through ``sanitize_display`` in text mode; JSON output is left raw so callers receive the original values. """ import argparse import datetime import json import logging import pathlib import sys from collections import defaultdict from collections.abc import Callable from typing import TypedDict from muse.core.envelope import EnvelopeJson, make_envelope from muse.core.paths import heads_dir as _heads_dir from muse.core.errors import ExitCode from muse.core.repo import read_repo_id, require_repo from muse.core.refs import read_current_branch from muse.core.commits import ( CommitRecord, get_commits_for_branch, ) from muse.core.validation import clamp_int, sanitize_display from muse.core.timing import start_timer type _GroupMap = dict[str, list["CommitRecord"]] logger = logging.getLogger(__name__) _GROUP_BY_CHOICES = ("author", "agent", "model", "branch") # --------------------------------------------------------------------------- # JSON wire format # --------------------------------------------------------------------------- class _CommitEntryJson(TypedDict): """Per-commit entry inside a shortlog group.""" commit_id: str message: str committed_at: str author: str | None agent_id: str | None model_id: str | None class _GroupJson(TypedDict): """One group (author / agent / model / branch) in the shortlog JSON.""" key: str count: int commits: list[_CommitEntryJson] class _ShortlogJson(EnvelopeJson): """Top-level JSON output for ``muse shortlog --json``.""" repo_id: str branch: str groups: list[_GroupJson] truncated: bool # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _branch_names(root: pathlib.Path) -> list[str]: """Return all branch names found under ``.muse/refs/heads/``. Symlinks are silently skipped — a symlink inside the refs directory could point to a file outside the repository and must not be followed. """ heads_dir = _heads_dir(root) if not heads_dir.exists(): return [] branches: list[str] = [] for ref_file in sorted(heads_dir.rglob("*")): if ref_file.is_symlink(): logger.warning("⚠️ Skipping symlink ref: %s", ref_file) continue if ref_file.is_file(): branches.append(str(ref_file.relative_to(heads_dir).as_posix())) return branches def _group_key(commit: CommitRecord, group_by: str) -> str: """Return the grouping key for *commit* based on *group_by*.""" if group_by == "author": if commit.author: return commit.author if commit.agent_id: return f"{commit.agent_id} (agent)" return "(unknown)" if group_by == "agent": return commit.agent_id or "(no agent)" if group_by == "model": return commit.model_id or "(no model)" if group_by == "branch": return commit.branch or "(unknown branch)" return commit.author or "(unknown)" def _build_groups( commits: list[CommitRecord], *, group_by: str, by_email: bool, ) -> _GroupMap: """Partition *commits* into groups keyed by *group_by*. When *by_email* is ``True`` and the commit has an ``agent_id`` distinct from the author, the agent ID is appended to the key in angle brackets. """ groups: _GroupMap = defaultdict(list) for c in commits: key = _group_key(c, group_by) if by_email and c.agent_id and c.agent_id != c.author: key = f"{key} <{c.agent_id}>" groups[key].append(c) return groups def _parse_date(value: str, flag: str) -> datetime.datetime: """Parse a YYYY-MM-DD date string into an aware UTC datetime. Pure parser — raises :exc:`ValueError` on bad input so the CLI layer can choose how to surface the error (JSON to stdout or plain text to stderr). No I/O is performed here. Args: value: Date string to parse. flag: Flag name for inclusion in the error message (e.g. ``"--since"``). Returns: Timezone-aware UTC :class:`datetime.datetime`. Raises: ValueError: If *value* is not a ``YYYY-MM-DD`` string. """ try: naive = datetime.datetime.strptime(value, "%Y-%m-%d") except ValueError: raise ValueError( f"{flag} must be YYYY-MM-DD (got '{sanitize_display(value)}')" ) return naive.replace(tzinfo=datetime.timezone.utc) # --------------------------------------------------------------------------- # Command registration # --------------------------------------------------------------------------- def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None: """Register the ``muse shortlog`` subcommand.""" parser = subparsers.add_parser( "shortlog", help="Summarise commit history grouped by author, agent, model, or branch.", description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "branch_opt", nargs="?", default=None, metavar="BRANCH", help="Branch to summarise (default: current branch).", ) parser.add_argument( "--all", dest="all_branches", action="store_true", help="Include all branches.", ) parser.add_argument( "--numbered", action="store_true", help="Sort by commit count (most active first).", ) parser.add_argument( "--summary", "-s", action="store_true", help="Show commit counts only — suppress individual message lines.", ) parser.add_argument( "--email", dest="by_email", action="store_true", help="Append agent_id to the group key when present.", ) parser.add_argument( "--group-by", dest="group_by", default="author", choices=list(_GROUP_BY_CHOICES), metavar="FIELD", help=( f"Dimension to group by: {', '.join(_GROUP_BY_CHOICES)} " "(default: author)." ), ) parser.add_argument( "--no-merges", dest="no_merges", action="store_true", help="Exclude merge commits (commits with more than one parent).", ) parser.add_argument( "--since", default=None, metavar="YYYY-MM-DD", help="Include only commits on or after this date (UTC).", ) parser.add_argument( "--until", default=None, metavar="YYYY-MM-DD", help="Include only commits on or before this date (UTC).", ) parser.add_argument( "--limit", type=int, default=0, metavar="N", help="Cap the number of commits loaded per branch (0 = no limit).", ) parser.add_argument( "--json", "-j", action="store_true", dest="json_out", help="Emit machine-readable JSON on stdout.", ) parser.set_defaults(func=run) # --------------------------------------------------------------------------- # Main handler # --------------------------------------------------------------------------- def run(args: argparse.Namespace) -> None: """Summarise commit history grouped by author, agent, model, or branch. Each group shows the key, commit count, and (unless ``--summary``) each commit message indented beneath. Muse commit metadata — ``author``, ``agent_id``, ``model_id`` — makes ``--group-by agent`` and ``--group-by model`` especially expressive for auditing agent contribution. Agent quickstart:: muse shortlog --json muse shortlog --group-by agent --json muse shortlog --group-by model --numbered --json muse shortlog --since 2026-01-01 --no-merges --json JSON fields:: repo_id str Repository content-id (sha256:...) branch str Branch name or "__all__" when --all is set truncated bool True when --limit capped the commit load groups list Per-group: key, count, commits[] groups[].key str Group identifier (author/agent/model/branch) groups[].count int Number of commits in this group groups[].commits list Per-commit: commit_id, message, committed_at, author, agent_id, model_id Exit codes:: 0 Success (empty result is also success). 1 Bad date format (--since / --until) or branch not found. 2 Not inside a Muse repository. """ branch_opt: str | None = args.branch_opt all_branches: bool = args.all_branches numbered: bool = args.numbered summary: bool = args.summary by_email: bool = args.by_email group_by: str = args.group_by no_merges: bool = args.no_merges limit: int = clamp_int(args.limit, 0, 100_000, "limit") json_out: bool = args.json_out elapsed = start_timer() def _emit_error(msg: str, code: int, error_key: str = "error") -> None: """Emit a structured error to stdout (JSON) or stderr (text) then exit.""" if json_out: print(json.dumps({ **make_envelope(elapsed, exit_code=code), "error": error_key, "message": msg, })) else: print(f"❌ {msg}", file=sys.stderr) raise SystemExit(code) since: datetime.datetime | None = None until: datetime.datetime | None = None if args.since: try: since = _parse_date(args.since, "--since") except ValueError as exc: _emit_error(str(exc), ExitCode.USER_ERROR, "bad_date") if args.until: try: # Treat --until as inclusive: advance to end-of-day. until = _parse_date(args.until, "--until").replace( hour=23, minute=59, second=59, microsecond=999999 ) except ValueError as exc: _emit_error(str(exc), ExitCode.USER_ERROR, "bad_date") root = require_repo() repo_id = read_repo_id(root) branches: list[str] if all_branches: branches = _branch_names(root) if not branches: _emit_empty(repo_id, "__all__", json_out, truncated=False, elapsed=elapsed) return else: branches = [branch_opt or read_current_branch(root)] branch_label = "__all__" if all_branches else branches[0] # Collect all commits across selected branches (deduplicated by commit_id). seen_ids: set[str] = set() all_commits: list[CommitRecord] = [] for br in branches: branch_commits = get_commits_for_branch(root, br) for c in branch_commits: if c.commit_id in seen_ids: continue seen_ids.add(c.commit_id) all_commits.append(c) if limit and len(all_commits) >= limit: break if limit and len(all_commits) >= limit: break # Record truncation before date filtering removes commits — truncated means # the *load* was capped, not that filters reduced the visible set. truncated = bool(limit and len(all_commits) >= limit) if not all_commits: _emit_empty(repo_id, branch_label, json_out, truncated=truncated, elapsed=elapsed) return # Apply date filters. if since is not None: all_commits = [ c for c in all_commits if c.committed_at.replace(tzinfo=datetime.timezone.utc) >= since ] if until is not None: all_commits = [ c for c in all_commits if c.committed_at.replace(tzinfo=datetime.timezone.utc) <= until ] # Exclude merge commits when requested. if no_merges: all_commits = [ c for c in all_commits if not c.parent2_commit_id ] if not all_commits: _emit_empty(repo_id, branch_label, json_out, truncated=truncated, elapsed=elapsed) return groups = _build_groups(all_commits, group_by=group_by, by_email=by_email) # Sort: by count descending (--numbered), then alphabetically. sorted_keys: list[str] if numbered: sorted_keys = sorted(groups, key=lambda k: -len(groups[k])) else: sorted_keys = sorted(groups) if json_out: group_list: list[_GroupJson] = [] for key in sorted_keys: commits_in_group = groups[key] entries: list[_CommitEntryJson] = [ _CommitEntryJson( commit_id=c.commit_id, message=c.message, committed_at=c.committed_at.isoformat(), author=c.author, agent_id=c.agent_id, model_id=c.model_id, ) for c in commits_in_group ] group_list.append( _GroupJson(key=key, count=len(commits_in_group), commits=entries) ) payload = _ShortlogJson( **make_envelope(elapsed), repo_id=repo_id, branch=branch_label, groups=group_list, truncated=truncated, ) print(json.dumps(payload)) else: for key in sorted_keys: commits_in_group = groups[key] print(f"{sanitize_display(key)} ({len(commits_in_group)}):") if not summary: for c in commits_in_group: print(f" {sanitize_display(c.message)}") print("") def _emit_empty( repo_id: str, branch: str, json_out: bool, *, truncated: bool, elapsed: Callable[[], float], ) -> None: """Emit an empty result in the requested format.""" if json_out: payload = _ShortlogJson( **make_envelope(elapsed), # type: ignore[arg-type] repo_id=repo_id, branch=branch, groups=[], truncated=truncated, ) print(json.dumps(payload)) else: print("No commits found.")