"""muse rev-list — emit a filtered stream of commit IDs. ``rev-list`` is the raw commit-ID primitive that backs agent scripting, counting, ancestry queries, and pipeline composition. Where ``muse log`` is the human-facing history viewer, ``rev-list`` is the machine-facing stream — one commit ID per line (or a single integer for ``--count``). Usage examples:: muse rev-list HEAD # all commits from HEAD to root muse rev-list dev..feat/new # commits on feat/new not on dev muse rev-list -n 10 HEAD # last 10 commit IDs muse rev-list --count HEAD # single integer count muse rev-list --first-parent HEAD # linear chain only (skip merge parents) muse rev-list --no-merges HEAD # skip merge commits muse rev-list --merges HEAD # only merge commits muse rev-list --author alice HEAD # commits by alice (substring match) muse rev-list --after 2026-01-01 HEAD # commits after a date (UTC) muse rev-list --before 2026-12-31 HEAD # commits before a date (UTC) muse rev-list --touches src/ HEAD # commits touching anything under src/ muse rev-list --reverse HEAD # oldest-first muse rev-list --json HEAD # JSON array: {"commit_ids": [...]} Range syntax (``A..B``) Emits commits reachable from *B* but not reachable from *A*. Equivalent to ``git rev-list A..B``. Useful for counting divergence between branches:: muse rev-list --count main..feat # how many commits ahead of main ``--count`` mode Increments a counter rather than accumulating a list — memory is O(1) with respect to history depth. ``--touches `` Path is matched as a prefix: ``src/`` matches any file whose POSIX path starts with ``src/``, while ``src/foo.py`` matches only that exact file. Uses a per-invocation manifest cache so each snapshot is read at most once. Exit codes: 0 — success (including empty result for ``--count 0``). 1 — no commits matched (non-empty history but all filtered out). 2 — usage error (bad date, invalid ref, traversal attempt in ``--touches``). 3 — internal error (repository not found). """ import argparse import datetime import json import logging import collections import pathlib import re import sys from typing import Callable, TypedDict from muse.cli.config import get_limit from muse.core.envelope import EnvelopeJson, make_envelope from muse.core.errors import ExitCode from muse.core.graph import ancestor_ids from muse.core.repo import require_repo from muse.core.refs import ( get_head_commit_id, read_current_branch, ) from muse.core.commits import ( CommitRecord, read_commit, resolve_commit_ref, ) from muse.core.snapshots import get_commit_snapshot_manifest from muse.core.validation import sanitize_display from muse.core.timing import start_timer logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Types # --------------------------------------------------------------------------- type _FileManifest = dict[str, str] type _ManifestCache = dict[str, _FileManifest] type _Predicate = Callable[[CommitRecord], bool] class _RevListJson(EnvelopeJson): """JSON output for ID-list mode.""" commit_ids: list[str] class _RevListCountJson(EnvelopeJson): """JSON output for --count mode.""" count: int # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _parse_range(ref: str) -> tuple[str | None, str]: """Parse a ref string into ``(exclude_ref, include_ref)``. ``"A..B"`` → ``("A", "B")``. A plain ref with no ``..`` → ``(None, ref)``. Args: ref: Raw ref string from the CLI argument. Returns: A two-tuple ``(exclude, include)`` where ``exclude`` is ``None`` for single-ref inputs. """ if ".." in ref: parts = ref.split("..", 1) return parts[0].strip(), parts[1].strip() return None, ref def _parse_date(value: str) -> datetime.datetime: """Parse *value* as a UTC date (``YYYY-MM-DD``) or ISO datetime. Args: value: Date string supplied via ``--after`` or ``--before``. Returns: A timezone-aware :class:`datetime.datetime` in UTC. Raises: ValueError: If *value* cannot be parsed as a recognised date format. """ for fmt in ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%SZ"): try: dt = datetime.datetime.strptime(value, fmt) return dt.replace(tzinfo=datetime.timezone.utc) except ValueError: continue raise ValueError( f"Unrecognised date format: {value!r}. " "Use YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS." ) def _get_manifest( root: pathlib.Path, commit_id: str | None, cache: _ManifestCache, ) -> _FileManifest: """Return the snapshot manifest for *commit_id*, using *cache* to avoid re-reads. Each commit_id is fetched from disk at most once per *cache* lifetime. An empty dict is returned (and cached) for ``None`` commit_ids so callers can treat initial commits uniformly. Args: root: Repository root. commit_id: Commit whose snapshot manifest to fetch, or ``None``. cache: Shared manifest cache dict; mutated in place. Returns: The snapshot manifest mapping POSIX path → object_id. """ if commit_id is None: return {} if commit_id not in cache: cache[commit_id] = get_commit_snapshot_manifest(root, commit_id) or {} return cache[commit_id] def _commit_touches_path( root: pathlib.Path, commit: CommitRecord, path: str, cache: _ManifestCache, ) -> bool: """Return ``True`` if *commit* added, modified, or removed *path*. *path* is matched as a prefix: ``"src/"`` matches any file under ``src/``, while ``"src/foo.py"`` matches only that exact file. Trailing slashes are normalised away before comparison. Args: root: Repository root. commit: The commit to inspect. path: POSIX path or prefix to match. cache: Shared manifest cache; passed through to :func:`_get_manifest`. Returns: ``True`` when at least one file matching *path* changed between this commit and its first parent. """ norm = path.rstrip("/") current = _get_manifest(root, commit.commit_id, cache) parent = _get_manifest(root, commit.parent_commit_id, cache) for p in set(current) | set(parent): if p == norm or p.startswith(f"{norm}/"): if current.get(p) != parent.get(p): return True return False def _exclude_set(root: pathlib.Path, start_id: str | None) -> set[str]: """Return the set of all commit IDs reachable from *start_id*. Used to implement ``A..B`` range exclusion: compute ancestors of *A*, then walk from *B* stopping at any commit in this set. """ if start_id is None: return set() return ancestor_ids(root, start_id) def _walk_from( root: pathlib.Path, start_id: str, *, exclude: set[str], first_parent: bool, max_count: int, predicates: list[_Predicate], count_mode: bool, ) -> tuple[list[str], int]: """BFS walk from *start_id*, applying filters, returning matching commit IDs. In ``count_mode`` the function increments a counter rather than accumulating IDs, keeping memory O(1) with respect to history depth. Args: root: Repository root. start_id: Commit ID to start walking from. exclude: Set of commit IDs to treat as a boundary — commits in this set and their ancestors are not emitted. first_parent: When ``True`` only follow ``parent_commit_id`` (the first/linear parent), skipping ``parent2_commit_id``. max_count: Stop after emitting this many matching commits. ``0`` means no limit. predicates: List of filter callables; a commit is emitted only when all predicates return ``True``. count_mode: When ``True`` accumulate a counter instead of a list. Returns: A two-tuple ``(commit_ids, count)`` where ``commit_ids`` is the list of matching commit IDs (empty in count_mode) and ``count`` is the number of matching commits. """ ids: list[str] = [] count = 0 visited: set[str] = set(exclude) queue: collections.deque[str] = collections.deque([start_id]) while queue: cid = queue.popleft() if cid in visited: continue visited.add(cid) try: c = read_commit(root, cid) except Exception: continue if all(pred(c) for pred in predicates): count += 1 if not count_mode: ids.append(c.commit_id) if max_count > 0 and count >= max_count: break if c.parent_commit_id: queue.append(c.parent_commit_id) if not first_parent and c.parent2_commit_id: queue.append(c.parent2_commit_id) return ids, count # --------------------------------------------------------------------------- # CLI registration # --------------------------------------------------------------------------- def register( subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]", ) -> None: """Register the ``muse rev-list`` subcommand and its flags.""" parser = subparsers.add_parser( "rev-list", help="Emit a filtered stream of commit IDs.", description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "ref", nargs="?", default="HEAD", help=( "Ref to walk from, or 'A..B' range (commits on B not on A). " "Defaults to HEAD." ), ) parser.add_argument( "--max-count", type=int, default=0, dest="max_count", metavar="N", help="Stop after emitting N commits. 0 = no limit (default).", ) parser.add_argument( "--count", action="store_true", help="Print a single integer — number of matching commits — instead of IDs.", ) parser.add_argument( "--first-parent", action="store_true", dest="first_parent", help="Follow only the first-parent chain (skip merge parents).", ) parser.add_argument( "--no-merges", action="store_true", dest="no_merges", help="Exclude merge commits (commits with two parents).", ) parser.add_argument( "--merges", action="store_true", help="Include only merge commits.", ) parser.add_argument( "--author", default=None, metavar="PATTERN", help="Only commits whose author matches PATTERN (substring or regex).", ) parser.add_argument( "--after", default=None, metavar="DATE", help="Only commits after DATE (YYYY-MM-DD, UTC).", ) parser.add_argument( "--before", default=None, metavar="DATE", help="Only commits before DATE (YYYY-MM-DD, UTC).", ) parser.add_argument( "--touches", default=None, metavar="PATH", help=( "Only commits that changed PATH. Matched as a prefix: 'src/' " "matches any file under src/." ), ) parser.add_argument( "--reverse", action="store_true", help="Output commits oldest-first instead of newest-first.", ) parser.add_argument( "--json", "-j", action="store_true", dest="json_out", help='Emit {"commit_ids": [...]} instead of newline-delimited IDs.', ) parser.set_defaults(func=run) # --------------------------------------------------------------------------- # Command implementation # --------------------------------------------------------------------------- def run(args: argparse.Namespace) -> None: """Emit a filtered stream of commit IDs. Default output is one commit ID per line, newest-first. Pass ``--count`` to receive a single integer. Pass ``--json`` for a stable JSON envelope suitable for agent pipeline consumption. Range syntax (``A..B``) emits commits reachable from *B* but not from *A*. Agent quickstart:: muse rev-list HEAD --json muse rev-list -n 10 HEAD --json muse rev-list --count main..feat --json muse rev-list --author alice --after 2026-01-01 HEAD --json JSON fields (ID-list mode):: commit_ids list[str] Commit IDs matching the filters, newest-first JSON fields (--count mode):: count int Number of matching commits Exit codes:: 0 Success (including empty result in JSON mode). 1 No commits matched (non-empty history, all filtered; plain-text only). 2 Usage error (bad date, invalid ref, traversal in --touches). 3 Repository not found. """ ref_arg: str = args.ref or "HEAD" max_count: int = args.max_count count_mode: bool = args.count first_parent: bool = args.first_parent no_merges: bool = args.no_merges merges_only: bool = args.merges author_pat: str | None = args.author after_str: str | None = args.after before_str: str | None = args.before touches_path: str | None = args.touches reverse: bool = args.reverse json_out: bool = args.json_out elapsed = start_timer() def _emit_error(msg: str, code: int, error_key: str = "error") -> None: """Emit a structured error (JSON when --json, stderr otherwise) and exit.""" if json_out: print(json.dumps({ "error": error_key, "message": msg, "commit_ids": [], "count": 0, "duration_ms": elapsed(), "exit_code": code, })) else: print(f"❌ {msg}", file=sys.stderr) raise SystemExit(code) # --merges and --no-merges are mutually exclusive. if no_merges and merges_only: _emit_error( "--no-merges and --merges are mutually exclusive.", ExitCode.USER_ERROR, "mutually_exclusive_flags", ) # Validate --touches path: reject traversal sequences. if touches_path is not None: norm = touches_path.replace("\\", "/") if norm.startswith("/") or ".." in norm.split("/"): _emit_error( f"--touches path {sanitize_display(touches_path)!r} must be a " "relative path with no traversal sequences.", ExitCode.USER_ERROR, "path_traversal", ) # Parse date filters up-front so errors are reported before any I/O. after_dt: datetime.datetime | None = None before_dt: datetime.datetime | None = None if after_str is not None: try: after_dt = _parse_date(after_str) except ValueError as exc: _emit_error(f"--after: {exc}", ExitCode.USER_ERROR, "bad_date") if before_str is not None: try: before_dt = _parse_date(before_str) except ValueError as exc: _emit_error(f"--before: {exc}", ExitCode.USER_ERROR, "bad_date") # Compile --author pattern. Treat compilation errors as literal substring # fallback rather than crashing — this matches user expectation when they # pass a name like "O'Brien" which contains no real regex metacharacters # but might be malformed in edge cases. author_re: re.Pattern[str] | None = None if author_pat is not None: try: author_re = re.compile(author_pat, re.IGNORECASE) except re.error: # Literal substring fallback. escaped = re.escape(author_pat) author_re = re.compile(escaped, re.IGNORECASE) root = require_repo() # Resolve start and optional exclude ref. exclude_ref_str, include_ref_str = _parse_range(ref_arg) # Resolve include ref → commit ID. try: branch = read_current_branch(root) except ValueError as exc: _emit_error(str(exc), ExitCode.INTERNAL_ERROR, "internal_error") include_commit = resolve_commit_ref(root, branch, include_ref_str) if include_commit is None: # Try treating the ref as a branch name directly. tip = get_head_commit_id(root, include_ref_str) if tip is not None: include_commit = read_commit(root, tip) if include_commit is None: _emit_error( f"Cannot resolve ref {sanitize_display(include_ref_str)!r}.", ExitCode.USER_ERROR, "bad_ref", ) # Compute the exclusion set for A..B ranges. excl: set[str] = set() if exclude_ref_str: exclude_commit = resolve_commit_ref(root, branch, exclude_ref_str) if exclude_commit is None: tip = get_head_commit_id(root, exclude_ref_str) if tip is not None: exclude_commit = read_commit(root, tip) if exclude_commit is None: _emit_error( f"Cannot resolve exclude ref {sanitize_display(exclude_ref_str)!r}.", ExitCode.USER_ERROR, "bad_ref", ) excl = _exclude_set(root, exclude_commit.commit_id) # Build filter predicates. manifest_cache: _ManifestCache = {} predicates: list[_Predicate] = [] if no_merges: predicates.append(lambda c: c.parent2_commit_id is None) if merges_only: predicates.append(lambda c: c.parent2_commit_id is not None) if author_re is not None: _pat = author_re # capture for closure predicates.append(lambda c: bool(_pat.search(c.author or ""))) if after_dt is not None: _after = after_dt # capture for closure # Parens required: `A if B else C > D` parses as `A if B else (C > D)`, # not `(A if B else C) > D`. --before already uses parens; match it here. predicates.append( lambda c: ( c.committed_at.replace(tzinfo=datetime.timezone.utc) if c.committed_at.tzinfo is None else c.committed_at ) > _after ) if before_dt is not None: _before = before_dt # capture for closure predicates.append( lambda c: ( c.committed_at.replace(tzinfo=datetime.timezone.utc) if c.committed_at.tzinfo is None else c.committed_at ) < _before ) if touches_path is not None: _path = touches_path.rstrip("/") _cache = manifest_cache _root = root predicates.append( lambda c: _commit_touches_path(_root, c, _path, _cache) ) # Walk the commit graph. walk_cap = get_limit("max_walk_commits", root) effective_max = max_count if max_count > 0 else walk_cap ids, count = _walk_from( root, include_commit.commit_id, exclude=excl, first_parent=first_parent, max_count=effective_max, predicates=predicates, count_mode=count_mode, ) # Emit output. if count_mode: if json_out: print(json.dumps(_RevListCountJson(**make_envelope(elapsed), count=count))) else: print(count) return if reverse: ids = list(reversed(ids)) exit_code_val = 0 if ids else 1 if json_out: print(json.dumps(_RevListJson( **make_envelope(elapsed, exit_code=exit_code_val), commit_ids=ids, ))) return if ids: print("\n".join(ids)) else: # Empty result — exit 1 so scripts can detect "no matches" vs "no repo". raise SystemExit(1)