test_core_graph.py
file-level
1
files
1
commits
0
hotspots
0
🧊 dead
0
💥 blast risk
| 1 | """Tests for muse/core/graph.py — canonical commit DAG traversal primitives. |
| 2 | |
| 3 | Coverage |
| 4 | -------- |
| 5 | iter_ancestors |
| 6 | - empty starts list → yields nothing |
| 7 | - single root commit (no parents) → yields just that commit |
| 8 | - linear chain → yields in BFS order (newest-first) |
| 9 | - merge commit (two parents) → both parents visited, each once |
| 10 | - first_parent_only=True → only follows parent_commit_id |
| 11 | - exclude= set → treats boundary commits as already-visited |
| 12 | - max_commits=N → stops after N commits yielded (missing commits don't count) |
| 13 | - missing commit in chain → skipped gracefully, walk continues |
| 14 | - diamond DAG (shared ancestor) → shared commit visited exactly once |
| 15 | - multi-source starts → seeds BFS from multiple tips simultaneously |
| 16 | - uses deque internally (structural check, same rationale as blame test) |
| 17 | |
| 18 | ancestor_ids |
| 19 | - returns set of commit IDs (sha256:-prefixed) |
| 20 | - empty repo → empty set |
| 21 | - respects exclude= boundaries |
| 22 | - respects max_commits cap (returns IDs of visited commits) |
| 23 | |
| 24 | find_merge_base |
| 25 | - same commit → returns itself |
| 26 | - linear chain → returns the common ancestor |
| 27 | - true merge topology → returns LCA |
| 28 | - no common ancestor (two disjoint chains) → returns None |
| 29 | - max_ancestors= exceeded → raises ValueError |
| 30 | """ |
| 31 | |
from __future__ import annotations

import datetime
import json
import pathlib
from collections.abc import Iterable

import pytest

from muse.core.graph import ancestor_ids, find_merge_base, iter_ancestors
from muse.core.types import long_id
from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
from muse.core.commits import (
    CommitRecord,
    write_commit,
)
from muse.core.snapshots import (
    SnapshotRecord,
    write_snapshot,
)
from muse.core.paths import muse_dir
| 52 | |
| 53 | _BASE_DT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) |
| 54 | |
| 55 | |
| 56 | # --------------------------------------------------------------------------- |
| 57 | # Helpers |
| 58 | # --------------------------------------------------------------------------- |
| 59 | |
| 60 | |
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create a bare repository skeleton under *tmp_path* and return *tmp_path*.

    Inside the directory reported by ``muse_dir(tmp_path)`` this creates the
    ``objects``, ``commits``, ``snapshots`` and ``refs/heads`` directories,
    a ``repo.json`` carrying a fixed ``repo_id``, and a ``HEAD`` file that
    points at ``refs/heads/main``.
    """
    store = muse_dir(tmp_path)

    subdirs = ("objects", "commits", "snapshots", "refs/heads")
    for name in subdirs:
        target = store / name
        target.mkdir(parents=True, exist_ok=True)

    repo_meta = json.dumps({"repo_id": "test-repo"})
    (store / "repo.json").write_text(repo_meta)
    (store / "HEAD").write_text("ref: refs/heads/main\n")
    return tmp_path
| 68 | |
| 69 | |
def _write_commit(
    repo: pathlib.Path,
    message: str = "commit",
    parent: str | None = None,
    parent2: str | None = None,
    author: str = "Author",
    committed_at: datetime.datetime | None = None,
) -> str:
    """Store a commit in *repo* through ``write_commit`` and return its ID.

    Args:
        repo: Repository root, as returned by ``_make_repo``.
        message: Commit message; also feeds the commit hash.
        parent: First parent commit ID, or ``None`` for a root commit.
        parent2: Second parent commit ID for merge commits, else ``None``.
        author: Author string recorded on the commit.
        committed_at: Commit timestamp; falls back to ``_BASE_DT``.

    Returns:
        The commit ID computed by ``compute_commit_id``.
    """
    when = _BASE_DT if committed_at is None else committed_at

    # Every helper commit points at a snapshot with an empty manifest.
    snapshot_id = compute_snapshot_id({})
    write_snapshot(repo, SnapshotRecord(snapshot_id=snapshot_id, manifest={}))

    # Only the parents that are actually set take part in the hash.
    parents: list[str] = []
    if parent is not None:
        parents.append(parent)
    if parent2 is not None:
        parents.append(parent2)

    new_id = compute_commit_id(
        parent_ids=parents,
        snapshot_id=snapshot_id,
        message=message,
        committed_at_iso=when.isoformat(),
        author=author,
    )
    record = CommitRecord(
        commit_id=new_id,
        branch="main",
        snapshot_id=snapshot_id,
        message=message,
        committed_at=when,
        parent_commit_id=parent,
        author=author,
        parent2_commit_id=parent2,
    )
    write_commit(repo, record)
    return new_id
| 104 | |
| 105 | |
| 106 | def _ids(commits: list[CommitRecord]) -> list[str]: |
| 107 | return [c.commit_id for c in commits] |
| 108 | |
| 109 | |
| 110 | # --------------------------------------------------------------------------- |
| 111 | # iter_ancestors |
| 112 | # --------------------------------------------------------------------------- |
| 113 | |
| 114 | |
class TestIterAncestors:
    """Tests for ``iter_ancestors`` over small, hand-built commit graphs."""

    def test_empty_starts_yields_nothing(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        assert list(iter_ancestors(repo, [])) == []

    def test_single_root_commit(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        cid = _write_commit(repo, "root")
        result = list(iter_ancestors(repo, cid))
        assert len(result) == 1
        assert result[0].commit_id == cid

    def test_linear_chain_yields_all_in_bfs_order(self, tmp_path: pathlib.Path) -> None:
        """A→B→C chain starting from A yields A, B, C in that order."""
        repo = _make_repo(tmp_path)
        c = _write_commit(repo, "root")
        b = _write_commit(repo, "second", parent=c)
        a = _write_commit(repo, "third", parent=b)

        result = _ids(iter_ancestors(repo, a))
        assert result == [a, b, c]

    def test_merge_commit_visits_both_parents(self, tmp_path: pathlib.Path) -> None:
        """Merge commit with two parents → both parent branches visited."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        left = _write_commit(repo, "left", parent=root)
        right = _write_commit(repo, "right", parent=root)
        merge = _write_commit(repo, "merge", parent=left, parent2=right)

        visited = set(_ids(iter_ancestors(repo, merge)))
        # Exact equality also rules out any unexpected extra commits.
        assert visited == {merge, left, right, root}

    def test_diamond_dag_shared_ancestor_visited_once(self, tmp_path: pathlib.Path) -> None:
        """In a diamond (A→B, A→C, B→D, C→D), D is visited exactly once."""
        repo = _make_repo(tmp_path)
        d = _write_commit(repo, "D")
        b = _write_commit(repo, "B", parent=d)
        c = _write_commit(repo, "C", parent=d)
        a = _write_commit(repo, "A", parent=b, parent2=c)

        commit_ids = _ids(list(iter_ancestors(repo, a)))
        # D must appear exactly once despite two paths reaching it.
        assert commit_ids.count(d) == 1
        assert set(commit_ids) == {a, b, c, d}

    def test_first_parent_only_skips_second_parent(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        left = _write_commit(repo, "left", parent=root)
        right = _write_commit(repo, "right", parent=root)
        merge = _write_commit(repo, "merge", parent=left, parent2=right)

        result = set(_ids(iter_ancestors(repo, merge, first_parent_only=True)))
        assert merge in result
        assert left in result
        assert root in result
        assert right not in result  # second parent branch skipped

    def test_exclude_stops_at_boundary(self, tmp_path: pathlib.Path) -> None:
        """Commits in `exclude` and their ancestors are never yielded."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        mid = _write_commit(repo, "mid", parent=root)
        tip = _write_commit(repo, "tip", parent=mid)

        result = _ids(iter_ancestors(repo, tip, exclude={mid}))
        assert tip in result
        assert mid not in result
        assert root not in result

    def test_max_commits_caps_yield(self, tmp_path: pathlib.Path) -> None:
        """max_commits=2 yields exactly the two newest commits of a longer chain."""
        repo = _make_repo(tmp_path)
        c = _write_commit(repo, "root")
        b = _write_commit(repo, "mid", parent=c)
        a = _write_commit(repo, "tip", parent=b)

        result = _ids(list(iter_ancestors(repo, a, max_commits=2)))
        # Pin *which* commits come back, not only how many: on a linear chain
        # the walk can only reach them newest-first.
        assert result == [a, b]

    def test_missing_commit_skipped_walk_continues(self, tmp_path: pathlib.Path) -> None:
        """A missing parent is skipped and the walk still reaches the other parent."""
        from muse.core.object_store import object_path

        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        # Build a commit whose first parent does not exist and whose second
        # parent is the real ``root`` commit.
        snap_id = compute_snapshot_id({})
        write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest={}))
        ghost_parent = long_id("ab" * 32)
        cid = compute_commit_id(
            parent_ids=[ghost_parent, root],
            snapshot_id=snap_id,
            message="orphan",
            committed_at_iso=_BASE_DT.isoformat(),
            author="A",
        )
        # Write directly to bypass the parent-existence guard — this test
        # intentionally creates a commit whose parent is absent to verify that
        # iter_ancestors skips missing parents gracefully.
        record = CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=snap_id,
            message="orphan",
            committed_at=_BASE_DT,
            parent_commit_id=ghost_parent,
            author="A",
            parent2_commit_id=root,
        )
        obj_file = object_path(repo, cid)
        obj_file.parent.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(record.to_dict(), separators=(",", ":")).encode()
        obj_file.write_bytes(f"commit {len(payload)}\0".encode() + payload)

        # The ghost parent must not crash the walk, and the reachable second
        # parent must still be yielded after it.
        result = _ids(list(iter_ancestors(repo, cid)))
        assert result == [cid, root]

    def test_multi_source_starts(self, tmp_path: pathlib.Path) -> None:
        """Multi-source BFS from two tips collects ancestors of both."""
        repo = _make_repo(tmp_path)
        shared = _write_commit(repo, "shared")
        branch_a = _write_commit(repo, "branch_a", parent=shared)
        branch_b = _write_commit(repo, "branch_b", parent=shared)

        visited = set(_ids(iter_ancestors(repo, [branch_a, branch_b])))
        assert branch_a in visited
        assert branch_b in visited
        # Reachability only; the exactly-once property has its own test below.
        assert shared in visited

    def test_multi_source_shared_ancestor_once(self, tmp_path: pathlib.Path) -> None:
        """Shared ancestor appears exactly once in multi-source BFS."""
        repo = _make_repo(tmp_path)
        shared = _write_commit(repo, "shared")
        a = _write_commit(repo, "a", parent=shared)
        b = _write_commit(repo, "b", parent=shared)

        all_ids = _ids(list(iter_ancestors(repo, [a, b])))
        assert all_ids.count(shared) == 1

    def test_yields_commit_records_not_ids(self, tmp_path: pathlib.Path) -> None:
        """iter_ancestors yields CommitRecord objects (not strings)."""
        repo = _make_repo(tmp_path)
        cid = _write_commit(repo, "root", author="Alice")
        result = list(iter_ancestors(repo, cid))
        assert len(result) == 1
        assert isinstance(result[0], CommitRecord)
        assert result[0].author == "Alice"

    def test_uses_deque_not_list(self) -> None:
        """walk_dag (the underlying primitive) must use collections.deque for O(1) pops."""
        import inspect
        from muse.core import graph as graph_module

        # Structural check on the source text rather than on behaviour.
        source = inspect.getsource(graph_module.walk_dag)
        assert "deque" in source, "walk_dag must use collections.deque"
        assert "pop(0)" not in source, "must not use list.pop(0)"
        assert "insert(0" not in source, "must not use list.insert(0, ...)"
| 279 | |
| 280 | |
| 281 | # --------------------------------------------------------------------------- |
| 282 | # ancestor_ids |
| 283 | # --------------------------------------------------------------------------- |
| 284 | |
| 285 | |
class TestAncestorIds:
    """Tests for ``ancestor_ids``, the set-returning counterpart of ``iter_ancestors``."""

    def test_returns_set_of_prefixed_ids(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        c = _write_commit(repo, "root")
        b = _write_commit(repo, "mid", parent=c)
        a = _write_commit(repo, "tip", parent=b)
        result = ancestor_ids(repo, a)
        assert isinstance(result, set)
        assert result == {a, b, c}
        for oid in result:
            assert oid.startswith("sha256:")

    def test_empty_starts_returns_empty_set(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        assert ancestor_ids(repo, []) == set()

    def test_exclude_boundaries_respected(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        mid = _write_commit(repo, "mid", parent=root)
        tip = _write_commit(repo, "tip", parent=mid)
        result = ancestor_ids(repo, tip, exclude={mid})
        assert tip in result
        assert mid not in result
        assert root not in result

    def test_max_commits_respected(self, tmp_path: pathlib.Path) -> None:
        """max_commits=5 on a 10-commit chain returns exactly the five newest IDs."""
        repo = _make_repo(tmp_path)
        ids: list[str] = []
        prev: str | None = None
        for i in range(10):
            prev = _write_commit(repo, f"c{i}", parent=prev)
            ids.append(prev)
        tip = ids[-1]

        result = ancestor_ids(repo, tip, max_commits=5)
        # A bare ``len(result) <= 5`` would also accept an empty set.  On a
        # linear chain the walk can only reach commits newest-first, so the
        # capped result must be the last five commits written.
        assert result == set(ids[-5:])

    def test_range_exclusion_pattern(self, tmp_path: pathlib.Path) -> None:
        """ancestor_ids(base) as exclude= produces A..B semantics."""
        repo = _make_repo(tmp_path)
        shared = _write_commit(repo, "shared")
        base = _write_commit(repo, "base", parent=shared)
        c1 = _write_commit(repo, "c1", parent=base)
        c2 = _write_commit(repo, "c2", parent=c1)

        base_ancestors = ancestor_ids(repo, base)
        tip_range = ancestor_ids(repo, c2, exclude=base_ancestors)
        assert c2 in tip_range
        assert c1 in tip_range
        assert base not in tip_range
        assert shared not in tip_range
| 338 | |
| 339 | |
| 340 | # --------------------------------------------------------------------------- |
| 341 | # find_merge_base |
| 342 | # --------------------------------------------------------------------------- |
| 343 | |
| 344 | |
class TestFindMergeBase:
    """Tests for ``find_merge_base`` (lowest common ancestor of two commits)."""

    def test_same_commit_returns_itself(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        cid = _write_commit(repo, "root")
        assert find_merge_base(repo, cid, cid) == cid

    def test_linear_chain_returns_common_ancestor(self, tmp_path: pathlib.Path) -> None:
        """A→B→C: merge_base(A, B) == B."""
        repo = _make_repo(tmp_path)
        c = _write_commit(repo, "root")
        b = _write_commit(repo, "mid", parent=c)
        a = _write_commit(repo, "tip", parent=b)
        assert find_merge_base(repo, a, b) == b

    def test_true_merge_returns_lca(self, tmp_path: pathlib.Path) -> None:
        """root→left, root→right: merge_base(left, right) == root."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        left = _write_commit(repo, "left", parent=root)
        right = _write_commit(repo, "right", parent=root)
        assert find_merge_base(repo, left, right) == root

    def test_deeper_lca(self, tmp_path: pathlib.Path) -> None:
        """B2→B1→X and C3→C2→C1→X: merge_base(B2, C3) == X.

        Both tips sit more than one hop from the fork point, and at different
        depths, so the search has to walk past intermediate commits on each
        side before the two histories meet.
        """
        repo = _make_repo(tmp_path)
        x = _write_commit(repo, "X")
        b1 = _write_commit(repo, "B1", parent=x)
        b2 = _write_commit(repo, "B2", parent=b1)
        c1 = _write_commit(repo, "C1", parent=x)
        c2 = _write_commit(repo, "C2", parent=c1)
        c3 = _write_commit(repo, "C3", parent=c2)
        assert find_merge_base(repo, b2, c3) == x

    def test_no_common_ancestor_returns_none(self, tmp_path: pathlib.Path) -> None:
        """Two root commits with no shared history → None."""
        repo = _make_repo(tmp_path)
        root_a = _write_commit(repo, "rootA")
        root_b = _write_commit(repo, "rootB")
        result = find_merge_base(repo, root_a, root_b)
        assert result is None

    def test_symmetric(self, tmp_path: pathlib.Path) -> None:
        """find_merge_base(a, b) == find_merge_base(b, a)."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        left = _write_commit(repo, "left", parent=root)
        right = _write_commit(repo, "right", parent=root)
        forward = find_merge_base(repo, left, right)
        backward = find_merge_base(repo, right, left)
        # Pin the value as well, so two matching ``None`` results cannot pass.
        assert forward == backward == root

    def test_max_ancestors_raises_on_deep_graph(self, tmp_path: pathlib.Path) -> None:
        """When max_ancestors is exceeded, raises ValueError."""
        repo = _make_repo(tmp_path)
        # Build a chain of 10 commits.
        prev: str | None = None
        for i in range(10):
            prev = _write_commit(repo, f"c{i}", parent=prev)
        a = _write_commit(repo, "a", parent=prev)
        b_root = _write_commit(repo, "b_root")
        b = _write_commit(repo, "b", parent=b_root)
        # With max_ancestors=1, BFS from a and b won't find common ancestor.
        with pytest.raises(ValueError, match="max_ancestors"):
            find_merge_base(repo, a, b, max_ancestors=1)

    def test_ancestor_of_itself_via_chain(self, tmp_path: pathlib.Path) -> None:
        """merge_base(tip, root) == root when root is an ancestor of tip."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        mid = _write_commit(repo, "mid", parent=root)
        tip = _write_commit(repo, "tip", parent=mid)
        result = find_merge_base(repo, tip, root)
        assert result == root