"""muse.core.commits — commit layer for the Muse VCS. Everything that reads, writes, or walks commit records lives here. Public API ---------- MissingParentError Raised when a parent commit ID does not exist locally. CommitDict JSON-serialisable TypedDict for CommitRecord wire format. CommitRecord Immutable commit record dataclass with to_dict / from_dict. CommitReadOk / CommitReadNotFound / CommitReadCorrupt / WalkResult Typed result variants for read operations. commit_exists / commit_path Existence check and path helper. write_commit / read_commit / overwrite_commit / update_commit_metadata Core commit I/O. read_commit_result Result-typed read that distinguishes not-found from corrupt. get_head_snapshot_id Resolve a branch tip to its snapshot ID. resolve_commit_ref Resolve HEAD, tilde notation, branch names, or SHA prefixes. find_commits_by_prefix / get_all_commits / get_commits_for_branch Commit enumeration and lookup helpers. walk_commits_between / walk_commits_between_result Bounded history walks. 
""" from __future__ import annotations import datetime import json as _json import logging import os import pathlib import re import tempfile from dataclasses import dataclass, field from typing import Literal, TypedDict, TypeGuard from muse.core.io import write_text_atomic # noqa: F401 — re-used by callers that import from here from muse.core.object_store import objects_dir as _objects_dir from muse.core.paths import ref_path as _ref_path, remotes_dir as _remotes_dir from muse.core.record_helpers import ( _float_val, _int_val, _str_dict, _str_list, _str_or_none, _str_val, ) from muse.core.refs import get_head_commit_id, read_ref, resolve_any_ref from muse.core.snapshot import compute_commit_id from muse.core.types import ( Manifest, Metadata, MsgpackDict, MsgpackValue, SemVerBump, long_id, short_id, ) from muse.core.validation import sanitize_glob_prefix, validate_ref_id from muse.domain import StructuredDelta logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Commit-specific helpers (only used by CommitRecord) # --------------------------------------------------------------------------- def _as_structured_delta( v: MsgpackDict, ) -> TypeGuard[StructuredDelta]: """Narrow a raw storage dict to :class:`StructuredDelta`. ``StructuredDelta`` is ``TypedDict(total=False)`` — every field is optional — so any ``dict`` is structurally compatible. This TypeGuard exists solely to satisfy the type checker without ``# type: ignore``. """ return True def _sem_ver_bump_val(d: MsgpackDict) -> "SemVerBump": """Extract and validate a ``sem_ver_bump`` field from a raw storage dict. Falls back to ``"none"`` if the value is absent or not a recognised Literal — guards against tampered or forward-versioned records. 
""" val = _str_val(d, "sem_ver_bump", "none") if val == "major": return "major" if val == "minor": return "minor" if val == "patch": return "patch" return "none" # --------------------------------------------------------------------------- # Exceptions # --------------------------------------------------------------------------- class MissingParentError(ValueError): """Raised by :func:`write_commit` when a parent commit ID is referenced but does not exist in the local object store. A dangling parent pointer silently truncates history traversal — walks stop at the gap rather than reaching the true root. Reject eagerly so the corruption is surfaced at write time rather than discovered later during a log, rebase, or push. """ # --------------------------------------------------------------------------- # Wire-format TypedDict # --------------------------------------------------------------------------- class CommitDict(TypedDict, total=False): """JSON-serialisable representation of a CommitRecord. ``structured_delta`` is the typed delta produced by the domain plugin's ``diff()`` at commit time. ``None`` on the initial commit (no parent to diff against). ``sem_ver_bump`` and ``breaking_changes`` are semantic versioning metadata. Absent (treated as ``"none"`` / ``[]``) for older records and non-code domains. Agent provenance fields (all optional, default ``""`` for older records): ``agent_id`` Stable identity string for the committing agent or human (e.g. ``"counterpoint-bot"`` or ``"gabriel"``). ``model_id`` Model identifier when the author is an AI agent (e.g. ``"claude-opus-4"``). Empty for human authors. ``toolchain_id`` Toolchain that produced the commit (e.g. ``"cursor-agent-v2"``). ``prompt_hash`` SHA-256 of the instruction/prompt that triggered this commit. Privacy-preserving: the hash identifies the prompt without storing its content. 
``signature`` Base64url-encoded Ed25519 signature (no padding, 86 chars) over the provenance payload (SHA-256 of commit_id + authorship fields). Verifiable with :func:`muse.core.provenance.verify_commit_ed25519` using the embedded ``signer_public_key``. ``signer_public_key`` Base64url-encoded raw Ed25519 public key bytes (32 bytes → 43 chars). Embedded in the commit so that verification is fully offline — no hub lookup required. ``signer_key_id`` ``sha256:<64-hex>`` fingerprint of the raw public key bytes. """ commit_id: str branch: str snapshot_id: str message: str committed_at: str parent_commit_id: str | None parent2_commit_id: str | None author: str metadata: Metadata structured_delta: StructuredDelta | None sem_ver_bump: SemVerBump breaking_changes: list[str] agent_id: str model_id: str toolchain_id: str prompt_hash: str signature: str signer_public_key: str signer_key_id: str reviewed_by: list[str] test_runs: int labels: list[str] status: str notes: list[str] score: float | None # --------------------------------------------------------------------------- # CommitRecord dataclass # --------------------------------------------------------------------------- @dataclass class CommitRecord: """An immutable commit record stored as a JSON file under .muse/objects/. ``sem_ver_bump`` and ``breaking_changes`` are populated by the commit command when a code-domain delta is available. They default to ``"none"`` and ``[]`` for older records and non-code domains. Agent provenance fields default to ``""`` so that existing JSON without them deserialises without error. See :class:`CommitDict` for field semantics. 
""" commit_id: str branch: str snapshot_id: str message: str committed_at: datetime.datetime parent_commit_id: str | None = None parent2_commit_id: str | None = None author: str = "" metadata: Metadata = field(default_factory=dict) structured_delta: StructuredDelta | None = None sem_ver_bump: SemVerBump = "none" breaking_changes: list[str] = field(default_factory=list) agent_id: str = "" model_id: str = "" toolchain_id: str = "" prompt_hash: str = "" signature: str = "" signer_public_key: str = "" signer_key_id: str = "" reviewed_by: list[str] = field(default_factory=list) test_runs: int = 0 labels: list[str] = field(default_factory=list) status: str = "" notes: list[str] = field(default_factory=list) score: float | None = None def to_dict(self) -> CommitDict: return CommitDict( commit_id=self.commit_id, branch=self.branch, snapshot_id=self.snapshot_id, message=self.message, committed_at=self.committed_at.isoformat(), parent_commit_id=self.parent_commit_id, parent2_commit_id=self.parent2_commit_id, author=self.author, metadata=dict(self.metadata), structured_delta=self.structured_delta, sem_ver_bump=self.sem_ver_bump, breaking_changes=list(self.breaking_changes), agent_id=self.agent_id, model_id=self.model_id, toolchain_id=self.toolchain_id, prompt_hash=self.prompt_hash, signature=self.signature, signer_public_key=self.signer_public_key, signer_key_id=self.signer_key_id, reviewed_by=list(self.reviewed_by), test_runs=self.test_runs, labels=list(self.labels), status=self.status, notes=list(self.notes), score=self.score, ) @classmethod def from_dict(cls, d: "MsgpackDict | CommitDict") -> "CommitRecord": """Deserialise a :class:`CommitRecord` from a plain dict. Accepts both the typed :class:`CommitDict` and any raw string-keyed mapping (e.g. the result of ``json.loads`` on a stored object). Uses typed accessor helpers (:func:`_str_val`, :func:`_str_or_none`, etc.) so every field access is type-safe without ``# type: ignore``. 
Runtime guards on the three ID fields fail loud — a corrupt commit_id would propagate into path construction which is a security boundary. """ committed_at_str = _str_val(d, "committed_at") try: committed_at = datetime.datetime.fromisoformat(committed_at_str) except ValueError as exc: raise ValueError( f"Commit record has missing or unparseable committed_at " f"({committed_at_str!r}): {exc}" ) from exc commit_id = _str_val(d, "commit_id") if not commit_id: raise TypeError(f"commit_id must be a non-empty str, got {d.get('commit_id')!r}") snapshot_id = _str_val(d, "snapshot_id") if not snapshot_id: raise TypeError(f"snapshot_id must be a non-empty str, got {d.get('snapshot_id')!r}") branch = _str_val(d, "branch") or _str_val(d, "created_on_branch") if not branch: raise TypeError(f"branch must be a non-empty str, got {d.get('branch')!r}") raw_delta = d.get("structured_delta") structured_delta: StructuredDelta | None = None if isinstance(raw_delta, dict) and _as_structured_delta(raw_delta): structured_delta = raw_delta return cls( commit_id=commit_id, branch=branch, snapshot_id=snapshot_id, message=_str_val(d, "message"), committed_at=committed_at, parent_commit_id=_str_or_none(d, "parent_commit_id"), parent2_commit_id=_str_or_none(d, "parent2_commit_id"), author=_str_val(d, "author"), metadata=_str_dict(d, "metadata"), structured_delta=structured_delta, sem_ver_bump=_sem_ver_bump_val(d), breaking_changes=_str_list(d, "breaking_changes"), agent_id=_str_val(d, "agent_id"), model_id=_str_val(d, "model_id"), toolchain_id=_str_val(d, "toolchain_id"), prompt_hash=_str_val(d, "prompt_hash"), signature=_str_val(d, "signature"), signer_public_key=_str_val(d, "signer_public_key"), signer_key_id=_str_val(d, "signer_key_id"), reviewed_by=_str_list(d, "reviewed_by"), test_runs=_int_val(d, "test_runs"), labels=_str_list(d, "labels"), status=_str_val(d, "status"), notes=_str_list(d, "notes"), score=_float_val(d, "score"), ) # 
# ---------------------------------------------------------------------------
# Result variant types
# ---------------------------------------------------------------------------


class CommitReadOk(TypedDict):
    """Successful read: ``status`` is ``"ok"`` and ``commit`` is the record."""

    status: str
    commit: CommitRecord


class CommitReadNotFound(TypedDict):
    """No commit file exists for the requested ID: ``status`` is ``"not_found"``."""

    status: str


class CommitReadCorrupt(TypedDict):
    """The commit file exists but could not be read, parsed or verified.

    ``status`` is ``"corrupt"``; ``path`` is the file location and ``error``
    the stringified exception.
    """

    status: str
    path: str
    error: str


class WalkResult(TypedDict):
    """Result of a bounded history walk.

    Returned by :func:`walk_commits_between_result` so callers can
    distinguish between a complete walk and one that was truncated by the
    safety cap.
    """

    commits: list[CommitRecord]
    truncated: bool
    count: int


# ---------------------------------------------------------------------------
# Path helper
# ---------------------------------------------------------------------------

# Type tag that opens every stored commit object (``commit <len>\x00<json>``).
_COMMIT_MAGIC = b"commit "
# Longest type tag checked when sniffing an object (``b"snapshot "``).
_SNIFF_LEN = len(b"snapshot ")


def commit_path(repo_root: pathlib.Path, commit_id: str) -> pathlib.Path:
    """Return the on-disk path for a commit record in the unified object store.

    Delegates to ``muse.core.object_store.object_path``.  Judging by the glob
    patterns used elsewhere in this module the layout is
    ``<objects-dir>/sha256/<first 2 hex chars>/<remaining hex chars>``.
    """
    from muse.core.object_store import object_path as _object_path

    return _object_path(repo_root, commit_id)


# ---------------------------------------------------------------------------
# Commit existence
# ---------------------------------------------------------------------------

# Not referenced anywhere in this module; kept because other modules may
# import it.
_known_commits_dirs: set[str] = set()


def commit_exists(repo_root: pathlib.Path, commit_id: str) -> bool:
    """Return ``True`` when the commit file for *commit_id* is present on disk."""
    return commit_path(repo_root, commit_id).exists()


# ---------------------------------------------------------------------------
# Write
# ---------------------------------------------------------------------------


def _verify_commit_id(
    record: CommitRecord, expected_id: str, path: pathlib.Path
) -> None:
    """Re-derive the commit ID from stored fields and assert it matches *expected_id*.

    The hash inputs are the parent IDs (first parent, then second, skipping
    absent ones), ``snapshot_id``, ``message``, the ISO form of
    ``committed_at``, ``author`` and ``signer_public_key``.

    Args:
        record: Record whose fields are re-hashed.
        expected_id: ID the record is supposed to have.
        path: Only ``path.name`` is used, to name the file in the error text.

    Raises:
        OSError: If the recomputed ID does not match *expected_id*.
    """
    parent_ids = [
        pid for pid in (record.parent_commit_id, record.parent2_commit_id) if pid
    ]
    recomputed = compute_commit_id(
        parent_ids=parent_ids,
        snapshot_id=record.snapshot_id,
        message=record.message,
        committed_at_iso=record.committed_at.isoformat(),
        author=record.author or "",
        signer_public_key=record.signer_public_key or "",
    )
    if recomputed == expected_id:
        return

    logger.critical(
        "❌ Commit %s failed content-hash verification — "
        "core fields are corrupt (snapshot_id, message, committed_at, or "
        "parent IDs). Expected %s, recomputed %s. "
        "Run `muse verify-pack` to audit the full store.",
        expected_id,
        expected_id,
        recomputed,
    )
    raise OSError(
        f"Commit {expected_id} failed content-hash verification. "
        f"Core fields (snapshot_id, message, committed_at, parent IDs) "
        f"have been silently corrupted in {path.name}. "
        "Run `muse verify-pack` to audit the full store."
    )


def _encode_commit(commit: CommitRecord) -> bytes:
    """Serialise *commit* as ``commit <json-length>\\x00<json>`` bytes."""
    json_bytes = _json.dumps(commit.to_dict()).encode()
    return f"commit {len(json_bytes)}\x00".encode() + json_bytes


def _write_bytes_atomic(path: pathlib.Path, payload: bytes, *, sync: bool) -> None:
    """Write *payload* to *path* via a temp file in the same directory.

    The temp file is flushed (and fsynced when *sync* is true) before being
    renamed over *path*, so readers never observe a partially written record.
    The temp file is removed if anything goes wrong, whatever the exception.
    """
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=".muse-tmp-")
    tmp = pathlib.Path(tmp_name)
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(payload)
            fh.flush()
            if sync:
                try:
                    os.fsync(fh.fileno())
                except OSError as exc:
                    # Deliberately best-effort: some filesystems reject
                    # fsync, and that must not fail the write.
                    logger.debug("fsync failed for %s: %s", tmp, exc)
        tmp.replace(path)
    except BaseException:
        # BaseException so that an interrupt does not leak the temp file.
        tmp.unlink(missing_ok=True)
        raise


def write_commit(
    repo_root: pathlib.Path,
    commit: CommitRecord,
    *,
    skip_parent_check: bool = False,
    sync: bool = True,
) -> None:
    """Persist a commit record to the unified object store.

    Idempotent: if a file already exists at the commit's path the write is
    skipped.  The existing file is not inspected.

    NOTE(review): an earlier contract described overwriting a *corrupt*
    existing file with the incoming record; the code does not do that —
    confirm which behaviour is intended.

    Args:
        repo_root: Repository root.
        commit: Record to store.  Its ``commit_id`` must match the hash of
            its own core fields.
        skip_parent_check: When ``True``, skip the parent-existence guard.
            Use only for shallow clone boundary commits whose parents are
            intentionally absent.
        sync: When ``True`` (default), fsync the file before it is renamed
            into place.

    Raises:
        MissingParentError: A referenced parent commit is not in the local
            store (only when *skip_parent_check* is ``False``).
        ValueError: The incoming record fails hash verification.
        OSError: The file could not be written.
    """
    path = commit_path(repo_root, commit.commit_id)

    if not skip_parent_check:
        for _field, _parent_id in (
            ("parent_commit_id", commit.parent_commit_id),
            ("parent2_commit_id", commit.parent2_commit_id),
        ):
            if _parent_id and not commit_exists(repo_root, _parent_id):
                raise MissingParentError(
                    f"Refusing to write commit {commit.commit_id!r}: "
                    f"{_field} {_parent_id!r} does not exist in the local store. "
                    "Fetch the missing commits before retrying."
                )

    try:
        # The record is not on disk yet, so there is no file name to report.
        _verify_commit_id(commit, commit.commit_id, pathlib.Path(""))
    except OSError as exc:
        raise ValueError(
            f"Refusing to write commit {commit.commit_id!r}: "
            f"incoming record failed hash verification — {exc}"
        ) from exc

    if path.exists():
        logger.debug("⚠️ Commit %s already exists — skipped", short_id(commit.commit_id))
        return

    # Create the shard directory only once the record is known to be valid,
    # so a refused write leaves nothing behind.
    path.parent.mkdir(parents=True, exist_ok=True)
    _write_bytes_atomic(path, _encode_commit(commit), sync=sync)
    logger.debug("✅ Stored commit %s branch=%r", short_id(commit.commit_id), commit.branch)


# ---------------------------------------------------------------------------
# Read
# ---------------------------------------------------------------------------


def _parse_commit_bytes(raw: bytes) -> CommitRecord:
    """Decode a stored commit object: drop the header up to the first NUL,
    then parse the JSON payload into a :class:`CommitRecord`.

    Raises whatever ``bytes.index``, ``json.loads`` or
    :meth:`CommitRecord.from_dict` raise on malformed input.
    """
    header_end = raw.index(b"\x00")
    return CommitRecord.from_dict(_json.loads(raw[header_end + 1:]))


def commit_read_is_ok(
    r: CommitReadOk | CommitReadNotFound | CommitReadCorrupt,
) -> TypeGuard[CommitReadOk]:
    """``True`` when *r* is a successful :func:`read_commit_result`."""
    return r["status"] == "ok"


def commit_read_is_not_found(
    r: CommitReadOk | CommitReadNotFound | CommitReadCorrupt,
) -> TypeGuard[CommitReadNotFound]:
    """``True`` when *r* represents a missing commit."""
    return r["status"] == "not_found"


def commit_read_is_corrupt(
    r: CommitReadOk | CommitReadNotFound | CommitReadCorrupt,
) -> TypeGuard[CommitReadCorrupt]:
    """``True`` when *r* represents a corrupt commit file."""
    return r["status"] == "corrupt"


def read_commit_result(
    repo_root: pathlib.Path, commit_id: str
) -> CommitReadOk | CommitReadNotFound | CommitReadCorrupt:
    """Load a commit record with a typed result that distinguishes all outcomes.

    Every successful read has had its ID re-verified against the stored
    fields via :func:`_verify_commit_id`.

    Returns one of:

    * ``{"status": "ok", "commit": CommitRecord}``
    * ``{"status": "not_found"}``
    * ``{"status": "corrupt", "path": str, "error": str}``
    """
    path = commit_path(repo_root, commit_id)
    try:
        # Read first and handle absence afterwards: a separate exists() check
        # would misreport a file deleted in between as corrupt.
        raw = path.read_bytes()
        record = _parse_commit_bytes(raw)
        _verify_commit_id(record, commit_id, path)
    except (FileNotFoundError, NotADirectoryError):
        return CommitReadNotFound(status="not_found")
    except Exception as exc:
        # Broad on purpose: any failure to read, parse or verify is reported
        # as corruption through the result type instead of propagating.
        logger.critical("❌ Corrupt commit file %s (%s): %s", path, short_id(commit_id), exc)
        return CommitReadCorrupt(status="corrupt", path=str(path), error=str(exc))
    return CommitReadOk(status="ok", commit=record)


def read_commit(repo_root: pathlib.Path, commit_id: str) -> CommitRecord | None:
    """Load a commit record by ID, or ``None`` if it does not exist or is corrupt.

    Thin wrapper over :func:`read_commit_result`, so every read re-verifies
    the commit ID and corruption is logged at CRITICAL.  Callers that need to
    distinguish "not found" from "corrupt" should use
    :func:`read_commit_result` directly.
    """
    outcome = read_commit_result(repo_root, commit_id)
    return outcome["commit"] if commit_read_is_ok(outcome) else None


# ---------------------------------------------------------------------------
# Mutation
# ---------------------------------------------------------------------------


def overwrite_commit(
    repo_root: pathlib.Path, commit: CommitRecord, *, sync: bool = True
) -> None:
    """Overwrite an existing commit record on disk (e.g. for annotation updates).

    Unlike :func:`write_commit`, this always writes the record, performs no
    parent check and no hash verification.  Use only for annotation fields
    (``reviewed_by``, ``test_runs``, ``labels``, ``status``, ``notes``,
    ``score``) that are semantically additive — never for changing history.

    Args:
        repo_root: Repository root.
        commit: Record to store at the path derived from its ``commit_id``.
        sync: When ``True`` (default), fsync before the rename so a crash
            cannot leave a truncated record where a good one used to be.

    Raises:
        OSError: The file could not be written.
    """
    path = commit_path(repo_root, commit.commit_id)
    path.parent.mkdir(parents=True, exist_ok=True)
    _write_bytes_atomic(path, _encode_commit(commit), sync=sync)
    logger.debug("✅ Updated annotation on commit %s", short_id(commit.commit_id))


def update_commit_metadata(
    repo_root: pathlib.Path,
    commit_id: str,
    key: str,
    value: str,
) -> bool:
    """Set a single string key in a commit's metadata dict.

    Returns ``True`` on success, ``False`` if the commit cannot be read
    (missing or corrupt — :func:`read_commit` does not distinguish the two).
    """
    commit = read_commit(repo_root, commit_id)
    if commit is None:
        logger.warning("⚠️ Commit %s not found — cannot update metadata", commit_id)
        return False
    commit.metadata[key] = value
    overwrite_commit(repo_root, commit)
    logger.debug("✅ Set %s=%r on commit %s", key, value, short_id(commit_id))
    return True


# ---------------------------------------------------------------------------
# Ref resolution helpers that depend on read_commit
# ---------------------------------------------------------------------------


def get_head_snapshot_id(
    repo_root: pathlib.Path,
    branch: str,
) -> str | None:
    """Return the snapshot_id at HEAD of *branch*, or ``None``.

    ``None`` covers both an unresolvable ref and an unreadable commit.
    """
    commit_id = resolve_any_ref(repo_root, branch)
    if commit_id is None:
        return None
    commit = read_commit(repo_root, commit_id)
    return None if commit is None else commit.snapshot_id


def resolve_commit_ref(
    repo_root: pathlib.Path,
    branch: str,
    ref: str | None,
) -> CommitRecord | None:
    """Resolve a commit reference to a ``CommitRecord``.

    *ref* may be, in resolution order:

    - ``None`` / ``"HEAD"`` (any case) — the most recent commit on *branch*.
    - ``"<base>~N"``, ``"HEAD~N"`` or ``"~N"`` — resolve the base (HEAD when
      omitted), then walk *N* first-parent steps back.  ``None`` if the
      chain is shorter than *N*.
    - A ref name for which :func:`read_ref` yields a commit ID.
    - A full commit ID accepted by :func:`validate_ref_id`.
    - An abbreviated commit ID — resolved by prefix scan.

    Glob metacharacters are stripped from *ref* before any lookup so
    user-supplied references cannot glob the entire object store.
    """
    if ref is None or ref.upper() == "HEAD":
        head_id = get_head_commit_id(repo_root, branch)
        return None if head_id is None else read_commit(repo_root, head_id)

    # ``.*?`` (not ``.+?``) so the bare ``~N`` shorthand matches too; the
    # lazy group keeps chained forms such as ``HEAD~1~2`` splitting at the
    # last tilde.
    tilde = re.fullmatch(r"(.*?)~(\d+)", ref)
    if tilde:
        base_ref, steps = tilde.group(1), int(tilde.group(2))
        base_is_head = not base_ref or base_ref.upper() == "HEAD"
        commit = resolve_commit_ref(
            repo_root, branch, None if base_is_head else base_ref
        )
        for _ in range(steps):
            if commit is None or commit.parent_commit_id is None:
                return None
            commit = read_commit(repo_root, commit.parent_commit_id)
        return commit

    safe_ref = sanitize_glob_prefix(ref)

    branch_commit_id = read_ref(_ref_path(repo_root, safe_ref))
    if branch_commit_id:
        return read_commit(repo_root, branch_commit_id)

    try:
        validate_ref_id(safe_ref)
        exact: CommitRecord | None = read_commit(repo_root, safe_ref)
        if exact is not None:
            return exact
    except ValueError:
        # Not a well-formed full ID — fall through to the prefix scan.
        pass

    bare_prefix = long_id(safe_ref, strip=True)
    return _find_commit_by_prefix(repo_root, bare_prefix)


def _matching_object_paths(
    repo_root: pathlib.Path, prefix: str
) -> list[pathlib.Path]:
    """List object files whose hex ID starts with *prefix*, sorted by path.

    A leading ``sha256:`` scheme is dropped and glob metacharacters are
    stripped.  An empty prefix matches nothing.  A one-character prefix scans
    every shard directory that starts with it.  Dot-files (such as leftover
    ``.muse-tmp-*`` temp files) are ignored.
    """
    hex_prefix = sanitize_glob_prefix(prefix.removeprefix("sha256:"))
    if not hex_prefix:
        return []
    objects_dir = _objects_dir(repo_root)
    if not objects_dir.exists():
        return []
    # Shard directories are named after the first two hex characters.
    shard = hex_prefix[:2] if len(hex_prefix) >= 2 else f"{hex_prefix}*"
    return sorted(
        path
        for path in objects_dir.glob(f"sha256/{shard}/*")
        if not path.name.startswith(".")
        and path.is_file()
        and (path.parent.name + path.name).startswith(hex_prefix)
    )


def _commit_from_object_path(
    repo_root: pathlib.Path, path: pathlib.Path
) -> CommitRecord | None:
    """Read the object at *path* as a commit, or ``None`` if it is not one.

    The type tag is sniffed first so that blobs and snapshots sharing the
    shard are skipped cheaply and without a "corrupt commit" log entry.
    """
    try:
        with path.open("rb") as fh:
            if fh.read(len(_COMMIT_MAGIC)) != _COMMIT_MAGIC:
                return None
    except OSError:
        return None
    return read_commit(repo_root, f"sha256:{path.parent.name}{path.name}")


def _find_commit_by_prefix(
    repo_root: pathlib.Path, prefix: str
) -> CommitRecord | None:
    """Find a commit whose ID starts with *prefix*.

    Candidates are tried in sorted order, so an ambiguous prefix resolves to
    the same commit on every call.  Glob metacharacters are stripped from
    *prefix* before use.
    """
    for path in _matching_object_paths(repo_root, prefix):
        record = _commit_from_object_path(repo_root, path)
        if record is not None:
            return record
    return None


def find_commits_by_prefix(
    repo_root: pathlib.Path, prefix: str
) -> list[CommitRecord]:
    """Return all readable commits whose ID starts with *prefix*.

    *prefix* may carry a ``sha256:`` scheme.  Results are ordered by object
    path; an empty prefix returns ``[]``.
    """
    candidates = (
        _commit_from_object_path(repo_root, path)
        for path in _matching_object_paths(repo_root, prefix)
    )
    return [record for record in candidates if record is not None]


def _resolve_branch_commit_id(repo_root: pathlib.Path, branch: str) -> str | None:
    """Resolve *branch* to a commit ID, handling both local and remote tracking refs.

    Resolution order:

    1. Local branch, via :func:`get_head_commit_id`.
    2. Remote tracking ref ``<remotes-dir>/<remote>/<name>`` when *branch*
       has the form ``<remote>/<name>`` (split at the first ``/``).
    3. ``None`` when neither exists.
    """
    local = get_head_commit_id(repo_root, branch)
    if local is not None:
        return local
    remote, _, name = branch.partition("/")
    if name:
        return read_ref(_remotes_dir(repo_root) / remote / name)
    return None


def get_commits_for_branch(
    repo_root: pathlib.Path,
    branch: str,
    max_count: int = 0,
) -> list[CommitRecord]:
    """Return commits on *branch*, newest first, by walking the first-parent chain.

    *branch* may be a local branch name (``"dev"``) or a remote tracking ref
    (``"origin/dev"``).

    Args:
        repo_root: Repository root.
        branch: Branch name or remote tracking ref to walk from HEAD.
        max_count: Stop after this many commits.  ``0`` (or any non-positive
            value) means walk the entire chain.

    Returns:
        The walked commits, or ``[]`` when *branch* cannot be resolved.
    """
    from muse.core.graph import iter_ancestors  # local to avoid circular import

    commit_id = _resolve_branch_commit_id(repo_root, branch)
    if not commit_id:
        return []
    cap: int | None = max_count if max_count > 0 else None
    return list(
        iter_ancestors(repo_root, commit_id, first_parent_only=True, max_commits=cap)
    )


def get_all_commits(repo_root: pathlib.Path) -> list[CommitRecord]:
    """Return all commits in the store (order not guaranteed).

    Objects are classified by their leading type tag: blobs and snapshots are
    skipped silently.  Unreadable, unrecognised or corrupt objects are
    skipped with a CRITICAL log entry.  Commit IDs are *not* re-verified
    here, unlike in :func:`read_commit`.
    """
    objects_dir = _objects_dir(repo_root)
    if not objects_dir.exists():
        return []

    results: list[CommitRecord] = []
    for path in objects_dir.glob("sha256/*/*"):
        # Dot-files are in-flight or leftover temp files, not objects.
        if path.name.startswith(".") or not path.is_file():
            continue
        try:
            with path.open("rb") as fh:
                # Sniff the tag before reading the body, so large blobs are
                # never loaded just to be discarded.
                head = fh.read(_SNIFF_LEN)
                if head.startswith((b"blob ", b"snapshot ")):
                    continue
                if not head.startswith(_COMMIT_MAGIC):
                    logger.critical(
                        "❌ Corrupt or unrecognized object at %s — skipped in commit listing",
                        path,
                    )
                    continue
                raw = head + fh.read()
        except OSError as exc:
            logger.critical(
                "❌ Unreadable object at %s — skipped in commit listing: %s", path, exc
            )
            continue
        try:
            results.append(_parse_commit_bytes(raw))
        except Exception as exc:
            # Broad on purpose: one bad record must not abort the listing.
            logger.critical("❌ Corrupt commit file %s — skipped in listing: %s", path, exc)
    return results


# ---------------------------------------------------------------------------
# Bounded history walks
# ---------------------------------------------------------------------------


def walk_commits_between(
    repo_root: pathlib.Path,
    to_commit_id: str,
    from_commit_id: str | None = None,
    max_commits: int = 10_000,
) -> list[CommitRecord]:
    """Return commits reachable from *to_commit_id*, stopping before *from_commit_id*.

    .. note::
        If the walk reaches *max_commits* before the chain is exhausted the
        result is silently truncated.  Use
        :func:`walk_commits_between_result` when the caller must know whether
        truncation occurred.
    """
    return walk_commits_between_result(
        repo_root, to_commit_id, from_commit_id, max_commits
    )["commits"]


def walk_commits_between_result(
    repo_root: pathlib.Path,
    to_commit_id: str,
    from_commit_id: str | None = None,
    max_commits: int = 10_000,
) -> WalkResult:
    """Bounded first-parent history walk with explicit truncation signalling.

    Args:
        repo_root: Repository root.
        to_commit_id: Commit the walk starts from.
        from_commit_id: Optional boundary commit, passed to ``iter_ancestors``
            as a prune predicate.
        max_commits: Safety cap on the number of commits returned.  Negative
            values are treated as ``0``.

    Returns:
        A :class:`WalkResult` with ``"truncated": True`` when the safety cap
        was reached before the chain was exhausted.  ``count`` is the number
        of commits returned.
    """
    from muse.core.graph import iter_ancestors  # local to avoid circular import

    limit = max(max_commits, 0)

    def _is_boundary(cid: str) -> bool:
        return cid == from_commit_id

    # Ask for one commit more than the cap: receiving it proves the chain
    # continues past the limit.
    gathered = list(
        iter_ancestors(
            repo_root,
            to_commit_id,
            first_parent_only=True,
            prune=_is_boundary if from_commit_id else None,
            max_commits=limit + 1,
        )
    )
    commits = gathered[:limit]
    return WalkResult(
        commits=commits, truncated=len(gathered) > limit, count=len(commits)
    )