"""``muse read`` — inspect a commit: metadata, delta, and provenance. Display the full details of any commit: author, timestamp, semantic-version impact, agent provenance, and a file/symbol change summary. Usage ----- Inspect HEAD:: muse read Inspect a specific commit or branch tip:: muse read Omit the file-change summary:: muse read --no-stat Omit the stored ``structured_delta`` blob from JSON output (smaller payload):: muse read --json --no-delta Include the full snapshot manifest (path → object_id) in JSON output:: muse read --json --manifest The ``manifest`` key maps every tracked path to its content hash at this commit. Use it when you need to inspect or verify the complete working-tree state recorded by a commit, rather than just the files that changed. JSON output schema (``--json``):: { "commit_id": "", "branch": "main", "message": "...", "author": "gabriel", "agent_id": null, "model_id": null, "committed_at": "2026-01-01T00:00:00+00:00", "snapshot_id": "", "parent_commit_id": " | null", "parent2_commit_id": null, "sem_ver_bump": "none", "breaking_changes": [], "metadata": {}, "files_added": [], "files_removed": [], "files_modified": [], "total_changes": 0, "structured_delta": null, "duration_ms": 1.2, "exit_code": 0 } Error output (``--json``, always to stdout so agents can parse failures):: { "error": "commit_not_found", "ref": "", "message": "commit '' not found", "duration_ms": 0.3, "exit_code": 1 } Exit codes:: 0 — commit found and displayed 1 — commit ref not found or other user error 3 — I/O error """ import argparse import json import logging import pathlib import re import sys import textwrap from muse.core.types import Manifest, Metadata, long_id from muse.core.envelope import EnvelopeJson, make_envelope from muse.core.errors import ExitCode from muse.core.repo import require_repo from muse.core.refs import ( get_head_commit_id, read_current_branch, ) from muse.core.commits import ( find_commits_by_prefix, read_commit, resolve_commit_ref, ) from muse.core.snapshots import ( get_commit_snapshot_manifest, read_snapshot, ) from muse.core.timing import start_timer from muse.core.validation import sanitize_display from typing import TypedDict _SHA256_FULL_RE = re.compile(r"^sha256:[0-9a-f]{64}$") _SHA256_PREFIX_RE = re.compile(r"^sha256:[0-9a-f]{1,63}$") from muse.domain import DomainOp, StructuredDelta type _StrMap = dict[str, str] logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Wire-format TypedDicts # --------------------------------------------------------------------------- class _ReadErrorJson(EnvelopeJson, total=False): """JSON error envelope for ``muse read --json`` error output.""" error: str message: str ref: str class _ReadJson(EnvelopeJson, total=False): """JSON output envelope for ``muse read --json``. All commit fields are present; ``manifest`` is only included when ``--manifest`` is passed; ``structured_delta`` only when ``--no-delta`` is not passed. ``total=False`` reflects the dynamic build via ``commit.to_dict()``. """ commit_id: str branch: str message: str author: str agent_id: str | None model_id: str | None committed_at: str snapshot_id: str parent_commit_id: str | None parent2_commit_id: str | None sem_ver_bump: str breaking_changes: list[str] metadata: Metadata structured_delta: StructuredDelta | None files_added: list[str] files_removed: list[str] files_modified: list[str] dirs_added: list[str] dirs_removed: list[str] total_changes: int manifest: Manifest def _format_op(op: DomainOp) -> list[str]: """Return one or more display lines for a single domain op. Each branch checks ``op["op"]`` directly so mypy can narrow the TypedDict union to the specific subtype before accessing its fields. """ if op["op"] == "insert": return [f" A {op['address']}"] if op["op"] == "delete": return [f" D {op['address']}"] if op["op"] == "replace": return [f" M {op['address']}"] if op["op"] == "move": return [f" R {op['address']} ({op['from_position']} → {op['to_position']})"] if op["op"] == "mutate": fields = ", ".join( f"{k}: {v['old']}→{v['new']}" for k, v in op.get("fields", {}).items() ) return [f" ~ {op['address']} ({fields or op.get('old_summary', '')}→{op.get('new_summary', '')})"] # op["op"] == "patch" — the only remaining variant. lines = [f" M {op['address']}"] if op["child_summary"]: lines.append(f" └─ {op['child_summary']}") return lines def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None: """Register the ``muse read`` subcommand and its flags.""" parser = subparsers.add_parser( "read", help="Inspect a commit: metadata, delta, and provenance.", description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "ref", nargs="?", default=None, help="Commit ID or branch name (default: HEAD).", ) parser.add_argument( "--no-stat", dest="stat", action="store_false", default=True, help="Omit the file/symbol change summary from output.", ) parser.add_argument( "--no-delta", dest="include_delta", action="store_false", default=True, help=( "Exclude the ``structured_delta`` blob from JSON output. " "Produces a smaller payload for agents that only need commit metadata." ), ) parser.add_argument( "--manifest", dest="include_manifest", action="store_true", default=False, help=( "Include the full snapshot manifest (path → object_id) in JSON output " "under the ``manifest`` key. Lets agents inspect the complete " "working-tree state recorded by this commit without a separate command. " "Ignored in text mode." ), ) parser.add_argument( "--no-manifest", dest="include_manifest", action="store_false", help="Exclude the snapshot manifest from JSON output (default).", ) parser.add_argument( "--json", "-j", action="store_true", dest="json_out", help="Emit machine-readable JSON instead of human text.", ) parser.set_defaults(func=run) def run(args: argparse.Namespace) -> None: """Inspect a commit: metadata, delta, and provenance. Agents should pass ``--json`` to receive a machine-readable result:: { "commit_id": "", "branch": "main", "message": "Add verse melody", "author": "gabriel", "agent_id": "", "model_id": "", "toolchain_id": "", "committed_at": "2026-03-21T12:00:00+00:00", "snapshot_id": "", "parent_commit_id": " | null", "parent2_commit_id": null, "sem_ver_bump": "minor", "breaking_changes": [], "metadata": {}, "files_added": ["new_track.mid"], "files_removed": [], "files_modified": ["tracks/bass.mid"], "total_changes": 1, "structured_delta": { ... } } Pass ``--no-stat`` to omit ``files_added/removed/modified``. Pass ``--no-delta`` to omit ``structured_delta`` (smaller payload). Pass ``--manifest`` to include the full snapshot manifest:: { ... "manifest": { "src/melody.py": "", "src/harmony.py": "" } } The ``manifest`` key maps every tracked path to its content hash at this commit. Useful when you need to verify the full working-tree state without a separate ``muse diff`` or file-read cycle. """ ref: str | None = args.ref stat: bool = args.stat include_delta: bool = args.include_delta include_manifest: bool = args.include_manifest json_out: bool = args.json_out # Bare hex is rejected at the CLI boundary — sha256: prefix is required. # HEAD and branch names contain non-hex characters and are never caught here. _HEX_CHARS = frozenset("0123456789abcdef") if ref is not None and not ref.startswith("sha256:") and all(c in _HEX_CHARS for c in ref): safe = sanitize_display(ref) print( f"❌ Bare hex IDs are not accepted — use 'sha256:{safe}' instead.\n" f" Even a short prefix works: 'sha256:{safe[:12]}'", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) elapsed = start_timer() def _emit_error(msg: str, code: int, error_key: str = "error", **extra: str) -> None: """Emit a structured error to stdout (JSON) or stderr (text) then exit.""" if json_out: print(json.dumps(_ReadErrorJson( **make_envelope(elapsed, exit_code=int(code)), error=error_key, message=msg, **extra, ))) else: print(f"❌ {msg}", file=sys.stderr) raise SystemExit(code) root = require_repo() branch = read_current_branch(root) # Canonical content-addressed IDs — must be detected before branch-name # resolution because ':' in the ref would raise ValueError in # validate_branch_name. if ref is not None and _SHA256_FULL_RE.match(ref): commit = read_commit(root, ref) elif ref is not None and _SHA256_PREFIX_RE.match(ref): bare_prefix = long_id(ref, strip=True) results = find_commits_by_prefix(root, bare_prefix) commit = results[0] if len(results) == 1 else None elif ref is not None and ref.upper() not in ("HEAD",): # Branch name or tilde notation — guard against forbidden characters. try: branch_head_id = get_head_commit_id(root, ref) except ValueError: branch_head_id = None if branch_head_id is not None: commit = read_commit(root, branch_head_id) else: commit = resolve_commit_ref(root, branch, ref) else: commit = resolve_commit_ref(root, branch, ref) if commit is None: _emit_error( f"commit '{ref}' not found", ExitCode.USER_ERROR, "commit_not_found", ref=str(ref), ) if json_out: commit_data = commit.to_dict() if not include_delta: commit_data.pop("structured_delta", None) elif commit.parent_commit_id is None: # Genesis commit — structured_delta was computed for indexers but # has no meaningful diff to surface (there is no parent to compare). commit_data["structured_delta"] = None # Read the snapshot once; reuse for both --stat and --manifest. cur_snap = read_snapshot(root, commit.snapshot_id) if (stat or include_manifest) else None cur: _StrMap = cur_snap.manifest if cur_snap is not None else {} if stat: par_snap = None if commit.parent_commit_id: par_commit = read_commit(root, commit.parent_commit_id) if par_commit is not None: par_snap = read_snapshot(root, par_commit.snapshot_id) par: _StrMap = par_snap.manifest if par_snap is not None else {} par_dirs: list[str] = par_snap.directories if par_snap is not None else [] cur_dirs: list[str] = cur_snap.directories if cur_snap is not None else [] files_added = sorted(set(cur) - set(par)) files_removed = sorted(set(par) - set(cur)) files_modified = sorted( p for p in set(cur) & set(par) if cur[p] != par[p] ) dirs_added = sorted(p + "/" for p in set(cur_dirs) - set(par_dirs)) dirs_removed = sorted(p + "/" for p in set(par_dirs) - set(cur_dirs)) commit_data.update({ "files_added": files_added, "files_removed": files_removed, "files_modified": files_modified, "dirs_added": dirs_added, "dirs_removed": dirs_removed, "total_changes": ( len(files_added) + len(files_modified) + len(files_removed) + len(dirs_added) + len(dirs_removed) ), }) if include_manifest: # Emit path → object_id for every file tracked at this commit. # Sorted for determinism; object_ids are content hashes (strings). commit_data["manifest"] = dict(sorted(cur.items())) # Idiomatic JSON cleanup: # - Optional crypto strings with no meaningful empty value: "" → null # - agent_id/model_id/toolchain_id stay as "" for human commits (never null) # - Always-present schema fields: reviewed_by, test_runs, metadata, # parent2_commit_id are kept even when empty/zero/null _OPTIONAL_CRYPTO = ( "prompt_hash", "signature", "signer_public_key", "signer_key_id", "status", ) for _k in _OPTIONAL_CRYPTO: if _k in commit_data and commit_data[_k] == "": commit_data[_k] = None # Ensure always-present fields carry their zero values rather than being absent. commit_data.setdefault("reviewed_by", []) commit_data.setdefault("test_runs", 0) commit_data.setdefault("metadata", {}) # parent2_commit_id is always present (null for non-merge commits). commit_data.setdefault("parent2_commit_id", None) # Noise-only fields that agents don't need when empty. for _k in ("labels", "notes"): if not commit_data.get(_k): commit_data.pop(_k, None) if commit_data.get("score") is None: commit_data.pop("score", None) # Strip position:null from structured_delta ops — position is only # meaningful for ordered-sequence domains (MIDI). Stored deltas from # before the AddressedInsertOp/AddressedDeleteOp refactor have it set # to null; omitting it makes the output schema-correct for all commits. _delta = commit_data.get("structured_delta") if isinstance(_delta, dict): def _strip_position(ops: list) -> None: for _op in ops: if isinstance(_op, dict): if _op.get("position") is None: _op.pop("position", None) _strip_position(_op.get("child_ops") or []) _strip_position(_delta.get("ops") or []) print(json.dumps(_ReadJson(**make_envelope(elapsed), **commit_data), default=str)) return # ── Text output ──────────────────────────────────────────────────────────── print(f"commit {commit.commit_id}") if commit.parent_commit_id: print(f"Parent: {commit.parent_commit_id}") if commit.parent2_commit_id: print(f"Parent: {commit.parent2_commit_id} (merge)") if commit.author: print(f"Author: {sanitize_display(commit.author)}") # Use ISO 8601 format (with T separator) for consistency with --json output. print(f"Date: {commit.committed_at.isoformat()}") if commit.sem_ver_bump and commit.sem_ver_bump != "none": print(f"SemVer: {commit.sem_ver_bump}") if commit.agent_id: print(f"Agent: {sanitize_display(commit.agent_id)}") if commit.metadata: for k, v in sorted(commit.metadata.items()): print(f" {sanitize_display(k)}: {sanitize_display(str(v))}") # Render the commit message with consistent 4-space indentation for every # line. Previously only the first line was indented; subsequent lines in # multiline messages started at column 0, breaking readability. raw_message = sanitize_display(commit.message) if commit.message else "" indented_message = textwrap.indent(raw_message, " ") if raw_message else "" print(f"\n{indented_message}\n") if not stat: return # Prefer the structured delta stored on the commit. # It carries rich note-level detail and is faster (no blob reloading). if commit.structured_delta is not None: delta = commit.structured_delta if not delta["ops"]: print(" (no changes)") return lines: list[str] = [] for op in delta["ops"]: lines.extend(_format_op(op)) for line in lines: print(line) # Re-derive counts from ops for a clean two-line summary. _f_added = sum(1 for o in delta["ops"] if o.get("op") == "insert" and not o.get("child_ops")) _f_removed = sum(1 for o in delta["ops"] if o.get("op") == "delete" and not o.get("child_ops")) _f_modified = sum(1 for o in delta["ops"] if o.get("op") in ("patch", "modify")) _sym_added = sum(sum(1 for c in o.get("child_ops", []) if c.get("op") == "insert") for o in delta["ops"]) _sym_removed = sum(sum(1 for c in o.get("child_ops", []) if c.get("op") == "delete") for o in delta["ops"]) _file_parts = [] if _f_removed: _file_parts.append(f"{_f_removed} removed") if _f_modified: _file_parts.append(f"{_f_modified} modified") if _f_added: _file_parts.append(f"{_f_added} added") _sym_parts = [] if _sym_added: _sym_parts.append(f"{_sym_added} added") if _sym_removed: _sym_parts.append(f"{_sym_removed} removed") print() if _file_parts: print(f" Files: {', '.join(_file_parts)}") if _sym_parts: print(f" Symbols: {', '.join(_sym_parts)}") return # Fallback for initial commits or pre-structured-delta commits: compute # file-level diff from snapshot manifests directly. current = get_commit_snapshot_manifest(root, commit.commit_id) or {} parent: _StrMap = {} if commit.parent_commit_id: parent = get_commit_snapshot_manifest(root, commit.parent_commit_id) or {} added = sorted(set(current) - set(parent)) removed = sorted(set(parent) - set(current)) modified = sorted(p for p in set(current) & set(parent) if current[p] != parent[p]) for p in added: print(f" A {p}") for p in removed: print(f" D {p}") for p in modified: print(f" M {p}") total = len(added) + len(removed) + len(modified) if total: print(f"\n {total} file(s) changed")