"""Tests for muse/core/graph.py — canonical commit DAG traversal primitives. Coverage -------- iter_ancestors - empty starts list → yields nothing - single root commit (no parents) → yields just that commit - linear chain, yields in BFS order (newest-first) - merge commit (two parents) → both parents visited, each once - first_parent_only=True → only follows parent_commit_id - exclude= set → treats boundary commits as already-visited - max_commits=N → stops after N commits yielded (missing commits don't count) - missing commit in chain → skipped gracefully, walk continues - diamond DAG (shared ancestor) → shared commit visited exactly once - multi-source starts → seeds BFS from multiple tips simultaneously - uses deque internally (structural check, same rationale as blame test) ancestor_ids - returns set of commit IDs (sha256:-prefixed) - empty repo → empty set - respects exclude= boundaries - respects max_commits cap (returns IDs of visited commits) find_merge_base - same commit → returns itself - linear chain → returns the common ancestor - true merge topology → returns LCA - no common ancestor (two disjoint chains) → returns None - max_ancestors= exceeded → raises ValueError """ from __future__ import annotations import datetime import json import pathlib import pytest from muse.core.graph import ancestor_ids, find_merge_base, iter_ancestors from muse.core.types import long_id from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.paths import muse_dir _BASE_DT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(tmp_path) for d in ("objects", "commits", "snapshots", "refs/heads"): (dot_muse / d).mkdir(parents=True, exist_ok=True) (dot_muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"})) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") return tmp_path def _write_commit( repo: pathlib.Path, message: str = "commit", parent: str | None = None, parent2: str | None = None, author: str = "Author", committed_at: datetime.datetime | None = None, ) -> str: """Write a commit and return its commit_id.""" dt = committed_at if committed_at is not None else _BASE_DT snap_id = compute_snapshot_id({}) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest={})) parent_ids = [p for p in (parent, parent2) if p is not None] commit_id = compute_commit_id( parent_ids=parent_ids, snapshot_id=snap_id, message=message, committed_at_iso=dt.isoformat(), author=author, ) write_commit( repo, CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=message, committed_at=dt, parent_commit_id=parent, author=author, parent2_commit_id=parent2, ), ) return commit_id def _ids(commits: list[CommitRecord]) -> list[str]: return [c.commit_id for c in commits] # --------------------------------------------------------------------------- # iter_ancestors # --------------------------------------------------------------------------- class TestIterAncestors: def test_empty_starts_yields_nothing(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) assert list(iter_ancestors(repo, [])) == [] def test_single_root_commit(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) cid = _write_commit(repo, "root") result = list(iter_ancestors(repo, cid)) assert len(result) == 1 assert result[0].commit_id == cid def test_linear_chain_yields_all_in_bfs_order(self, tmp_path: pathlib.Path) -> None: """A→B→C chain starting from A yields A, B, C in that order.""" repo = _make_repo(tmp_path) c = _write_commit(repo, "root") b = _write_commit(repo, "second", parent=c) a = _write_commit(repo, "third", parent=b) result = _ids(iter_ancestors(repo, a)) assert result == [a, b, c] def test_merge_commit_visits_both_parents(self, tmp_path: pathlib.Path) -> None: """Merge commit with two parents → both parent branches visited.""" repo = _make_repo(tmp_path) root = _write_commit(repo, "root") left = _write_commit(repo, "left", parent=root) right = _write_commit(repo, "right", parent=root) merge = _write_commit(repo, "merge", parent=left, parent2=right) visited = set(_ids(iter_ancestors(repo, merge))) assert merge in visited assert left in visited assert right in visited assert root in visited def test_diamond_dag_shared_ancestor_visited_once(self, tmp_path: pathlib.Path) -> None: """In a diamond (A→B, A→C, B→D, C→D), D is visited exactly once.""" repo = _make_repo(tmp_path) d = _write_commit(repo, "D") b = _write_commit(repo, "B", parent=d) c = _write_commit(repo, "C", parent=d) a = _write_commit(repo, "A", parent=b, parent2=c) result = list(iter_ancestors(repo, a)) commit_ids = _ids(result) # D must appear exactly once despite two paths reaching it. assert commit_ids.count(d) == 1 assert set(commit_ids) == {a, b, c, d} def test_first_parent_only_skips_second_parent(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) root = _write_commit(repo, "root") left = _write_commit(repo, "left", parent=root) right = _write_commit(repo, "right", parent=root) merge = _write_commit(repo, "merge", parent=left, parent2=right) result = set(_ids(iter_ancestors(repo, merge, first_parent_only=True))) assert merge in result assert left in result assert root in result assert right not in result # second parent branch skipped def test_exclude_stops_at_boundary(self, tmp_path: pathlib.Path) -> None: """Commits in `exclude` and their ancestors are never yielded.""" repo = _make_repo(tmp_path) root = _write_commit(repo, "root") mid = _write_commit(repo, "mid", parent=root) tip = _write_commit(repo, "tip", parent=mid) result = _ids(iter_ancestors(repo, tip, exclude={mid})) assert tip in result assert mid not in result assert root not in result def test_max_commits_caps_yield(self, tmp_path: pathlib.Path) -> None: """max_commits=2 yields at most 2 commits from a longer chain.""" repo = _make_repo(tmp_path) c = _write_commit(repo, "root") b = _write_commit(repo, "mid", parent=c) a = _write_commit(repo, "tip", parent=b) result = list(iter_ancestors(repo, a, max_commits=2)) assert len(result) == 2 def test_missing_commit_skipped_walk_continues(self, tmp_path: pathlib.Path) -> None: """A missing commit ID in the chain is silently skipped.""" repo = _make_repo(tmp_path) root = _write_commit(repo, "root") # Write a commit that references a nonexistent parent. snap_id = compute_snapshot_id({}) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest={})) ghost_parent = long_id("ab" * 32) cid = compute_commit_id( parent_ids=[ghost_parent], snapshot_id=snap_id, message="orphan", committed_at_iso=_BASE_DT.isoformat(), author="A", ) # Write directly to bypass the parent-existence guard — this test # intentionally creates a commit whose parent is absent to verify that # iter_ancestors skips missing parents gracefully. record = CommitRecord( commit_id=cid, branch="main", snapshot_id=snap_id, message="orphan", committed_at=_BASE_DT, parent_commit_id=ghost_parent, author="A", ) import json as _json from muse.core.object_store import object_path as _obj_path obj_file = _obj_path(repo, cid) obj_file.parent.mkdir(parents=True, exist_ok=True) payload = _json.dumps(record.to_dict(), separators=(",", ":")).encode() obj_file.write_bytes(f"commit {len(payload)}\0".encode() + payload) # Should yield the orphan commit without crashing on the ghost parent. result = list(iter_ancestors(repo, cid)) assert len(result) == 1 assert result[0].commit_id == cid def test_multi_source_starts(self, tmp_path: pathlib.Path) -> None: """Multi-source BFS from two tips collects ancestors of both.""" repo = _make_repo(tmp_path) shared = _write_commit(repo, "shared") branch_a = _write_commit(repo, "branch_a", parent=shared) branch_b = _write_commit(repo, "branch_b", parent=shared) visited = set(_ids(iter_ancestors(repo, [branch_a, branch_b]))) assert branch_a in visited assert branch_b in visited assert shared in visited # shared ancestor visited once def test_multi_source_shared_ancestor_once(self, tmp_path: pathlib.Path) -> None: """Shared ancestor appears exactly once in multi-source BFS.""" repo = _make_repo(tmp_path) shared = _write_commit(repo, "shared") a = _write_commit(repo, "a", parent=shared) b = _write_commit(repo, "b", parent=shared) all_ids = _ids(iter_ancestors(repo, [a, b])) assert all_ids.count(shared) == 1 def test_yields_commit_records_not_ids(self, tmp_path: pathlib.Path) -> None: """iter_ancestors yields CommitRecord objects (not strings).""" from muse.core.commits import CommitRecord as CR repo = _make_repo(tmp_path) cid = _write_commit(repo, "root", author="Alice") result = list(iter_ancestors(repo, cid)) assert len(result) == 1 assert isinstance(result[0], CR) assert result[0].author == "Alice" def test_uses_deque_not_list(self) -> None: """walk_dag (the underlying primitive) must use collections.deque for O(1) pops.""" import inspect from muse.core import graph as graph_module source = inspect.getsource(graph_module.walk_dag) assert "deque" in source, "walk_dag must use collections.deque" assert "pop(0)" not in source, "must not use list.pop(0)" assert "insert(0" not in source, "must not use list.insert(0, ...)" # --------------------------------------------------------------------------- # ancestor_ids # --------------------------------------------------------------------------- class TestAncestorIds: def test_returns_set_of_prefixed_ids(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) c = _write_commit(repo, "root") b = _write_commit(repo, "mid", parent=c) a = _write_commit(repo, "tip", parent=b) result = ancestor_ids(repo, a) assert isinstance(result, set) assert result == {a, b, c} for oid in result: assert oid.startswith("sha256:") def test_empty_starts_returns_empty_set(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) assert ancestor_ids(repo, []) == set() def test_exclude_boundaries_respected(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) root = _write_commit(repo, "root") mid = _write_commit(repo, "mid", parent=root) tip = _write_commit(repo, "tip", parent=mid) result = ancestor_ids(repo, tip, exclude={mid}) assert tip in result assert mid not in result assert root not in result def test_max_commits_respected(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ids = [] prev = None for i in range(10): cid = _write_commit(repo, f"c{i}", parent=prev) ids.append(cid) prev = cid tip = ids[-1] result = ancestor_ids(repo, tip, max_commits=5) assert len(result) <= 5 def test_range_exclusion_pattern(self, tmp_path: pathlib.Path) -> None: """ancestor_ids(base) as exclude= produces A..B semantics.""" repo = _make_repo(tmp_path) shared = _write_commit(repo, "shared") base = _write_commit(repo, "base", parent=shared) c1 = _write_commit(repo, "c1", parent=base) c2 = _write_commit(repo, "c2", parent=c1) base_ancestors = ancestor_ids(repo, base) tip_range = ancestor_ids(repo, c2, exclude=base_ancestors) assert c2 in tip_range assert c1 in tip_range assert base not in tip_range assert shared not in tip_range # --------------------------------------------------------------------------- # find_merge_base # --------------------------------------------------------------------------- class TestFindMergeBase: def test_same_commit_returns_itself(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) cid = _write_commit(repo, "root") assert find_merge_base(repo, cid, cid) == cid def test_linear_chain_returns_common_ancestor(self, tmp_path: pathlib.Path) -> None: """A→B→C: merge_base(A, B) == B.""" repo = _make_repo(tmp_path) c = _write_commit(repo, "root") b = _write_commit(repo, "mid", parent=c) a = _write_commit(repo, "tip", parent=b) assert find_merge_base(repo, a, b) == b def test_true_merge_returns_lca(self, tmp_path: pathlib.Path) -> None: """root→left, root→right: merge_base(left, right) == root.""" repo = _make_repo(tmp_path) root = _write_commit(repo, "root") left = _write_commit(repo, "left", parent=root) right = _write_commit(repo, "right", parent=root) assert find_merge_base(repo, left, right) == root def test_deeper_lca(self, tmp_path: pathlib.Path) -> None: """A→B→X, A→C→X: merge_base(B, C) == X.""" repo = _make_repo(tmp_path) x = _write_commit(repo, "X") b = _write_commit(repo, "B", parent=x) c = _write_commit(repo, "C", parent=x) assert find_merge_base(repo, b, c) == x def test_no_common_ancestor_returns_none(self, tmp_path: pathlib.Path) -> None: """Two root commits with no shared history → None.""" repo = _make_repo(tmp_path) root_a = _write_commit(repo, "rootA") root_b = _write_commit(repo, "rootB") result = find_merge_base(repo, root_a, root_b) assert result is None def test_symmetric(self, tmp_path: pathlib.Path) -> None: """find_merge_base(a, b) == find_merge_base(b, a).""" repo = _make_repo(tmp_path) root = _write_commit(repo, "root") left = _write_commit(repo, "left", parent=root) right = _write_commit(repo, "right", parent=root) assert find_merge_base(repo, left, right) == find_merge_base(repo, right, left) def test_max_ancestors_raises_on_deep_graph(self, tmp_path: pathlib.Path) -> None: """When max_ancestors is exceeded, raises ValueError.""" repo = _make_repo(tmp_path) # Build a chain of 10 commits. prev = None for i in range(10): prev = _write_commit(repo, f"c{i}", parent=prev) a = _write_commit(repo, "a", parent=prev) b_root = _write_commit(repo, "b_root") b = _write_commit(repo, "b", parent=b_root) # With max_ancestors=1, BFS from a and b won't find common ancestor. with pytest.raises(ValueError, match="max_ancestors"): find_merge_base(repo, a, b, max_ancestors=1) def test_ancestor_of_itself_via_chain(self, tmp_path: pathlib.Path) -> None: """merge_base(tip, root) == root when root is an ancestor of tip.""" repo = _make_repo(tmp_path) root = _write_commit(repo, "root") mid = _write_commit(repo, "mid", parent=root) tip = _write_commit(repo, "tip", parent=mid) result = find_merge_base(repo, tip, root) assert result == root