gabriel / muse public

test_core_graph.py file-level

at sha256:c · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:4 Merge branch 'dev' into main · gabriel · Jun 17, 2026
1 """Tests for muse/core/graph.py — canonical commit DAG traversal primitives.
2
3 Coverage
4 --------
5 iter_ancestors
6 - empty starts list → yields nothing
7 - single root commit (no parents) → yields just that commit
8 - linear chain, yields in BFS order (newest-first)
9 - merge commit (two parents) → both parents visited, each once
10 - first_parent_only=True → only follows parent_commit_id
11 - exclude= set → treats boundary commits as already-visited
12 - max_commits=N → stops after N commits yielded (missing commits don't count)
13 - missing commit in chain → skipped gracefully, walk continues
14 - diamond DAG (shared ancestor) → shared commit visited exactly once
15 - multi-source starts → seeds BFS from multiple tips simultaneously
16 - walk_dag (the underlying primitive) uses deque internally (structural check, same rationale as blame test)
17
18 ancestor_ids
19 - returns set of commit IDs (sha256:-prefixed)
20 - empty repo → empty set
21 - respects exclude= boundaries
22 - respects max_commits cap (returns IDs of visited commits)
23
24 find_merge_base
25 - same commit → returns itself
26 - linear chain → returns the common ancestor
27 - true merge topology → returns LCA
28 - no common ancestor (two disjoint chains) → returns None
29 - max_ancestors= exceeded → raises ValueError
30 """
31
from __future__ import annotations

import datetime
import json
import pathlib
from collections.abc import Iterable

import pytest

from muse.core.commits import (
    CommitRecord,
    write_commit,
)
from muse.core.graph import ancestor_ids, find_merge_base, iter_ancestors
from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
from muse.core.paths import muse_dir
from muse.core.snapshots import (
    SnapshotRecord,
    write_snapshot,
)
from muse.core.types import long_id
52
53 _BASE_DT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
54
55
56 # ---------------------------------------------------------------------------
57 # Helpers
58 # ---------------------------------------------------------------------------
59
60
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create a minimal on-disk repository skeleton rooted at *tmp_path*.

    Inside the directory returned by ``muse_dir(tmp_path)`` this creates the
    ``objects``, ``commits``, ``snapshots`` and ``refs/heads`` directories,
    a ``repo.json`` holding a fixed ``repo_id``, and a ``HEAD`` file that
    points at ``refs/heads/main``.

    Args:
        tmp_path: Directory that becomes the repository root (normally the
            pytest ``tmp_path`` fixture).

    Returns:
        *tmp_path* unchanged, so the call can be used inline as the repo root.
    """
    dot_muse = muse_dir(tmp_path)
    for subdir in ("objects", "commits", "snapshots", "refs/heads"):
        # exist_ok keeps the helper safe to call on an already-prepared path.
        (dot_muse / subdir).mkdir(parents=True, exist_ok=True)

    # Pass the encoding explicitly so the files do not depend on the
    # platform's locale default.
    (dot_muse / "repo.json").write_text(
        json.dumps({"repo_id": "test-repo"}), encoding="utf-8"
    )
    (dot_muse / "HEAD").write_text("ref: refs/heads/main\n", encoding="utf-8")
    return tmp_path
68
69
def _write_commit(
    repo: pathlib.Path,
    message: str = "commit",
    parent: str | None = None,
    parent2: str | None = None,
    author: str = "Author",
    committed_at: datetime.datetime | None = None,
) -> str:
    """Store a commit backed by an empty snapshot and return its commit ID.

    Args:
        repo: Repository root to write into.
        message: Commit message; also an input to the computed commit ID.
        parent: First parent commit ID, or ``None`` for a root commit.
        parent2: Second parent commit ID (merge commits), or ``None``.
        author: Author name recorded on the commit.
        committed_at: Commit timestamp; ``_BASE_DT`` is used when omitted.

    Returns:
        The ID computed for the newly written commit.
    """
    timestamp = _BASE_DT if committed_at is None else committed_at

    # Every helper-made commit points at the same empty-manifest snapshot.
    empty_snapshot_id = compute_snapshot_id({})
    write_snapshot(
        repo,
        SnapshotRecord(snapshot_id=empty_snapshot_id, manifest={}),
    )

    # Only parents that were actually supplied take part in the ID hash.
    present_parents: list[str] = []
    for candidate in (parent, parent2):
        if candidate is not None:
            present_parents.append(candidate)

    new_commit_id = compute_commit_id(
        parent_ids=present_parents,
        snapshot_id=empty_snapshot_id,
        message=message,
        committed_at_iso=timestamp.isoformat(),
        author=author,
    )

    record = CommitRecord(
        commit_id=new_commit_id,
        branch="main",
        snapshot_id=empty_snapshot_id,
        message=message,
        committed_at=timestamp,
        parent_commit_id=parent,
        author=author,
        parent2_commit_id=parent2,
    )
    write_commit(repo, record)
    return new_commit_id
104
105
106 def _ids(commits: list[CommitRecord]) -> list[str]:
107 return [c.commit_id for c in commits]
108
109
110 # ---------------------------------------------------------------------------
111 # iter_ancestors
112 # ---------------------------------------------------------------------------
113
114
class TestIterAncestors:
    """Tests for ``iter_ancestors``: the walk that yields commit records."""

    def test_empty_starts_yields_nothing(self, tmp_path: pathlib.Path) -> None:
        """An empty list of start commits produces no output."""
        repo = _make_repo(tmp_path)
        assert list(iter_ancestors(repo, [])) == []

    def test_single_root_commit(self, tmp_path: pathlib.Path) -> None:
        """A parentless commit yields exactly itself."""
        repo = _make_repo(tmp_path)
        cid = _write_commit(repo, "root")
        result = list(iter_ancestors(repo, cid))
        assert len(result) == 1
        assert result[0].commit_id == cid

    def test_linear_chain_yields_all_in_bfs_order(self, tmp_path: pathlib.Path) -> None:
        """A→B→C chain starting from A yields A, B, C in that order."""
        repo = _make_repo(tmp_path)
        c = _write_commit(repo, "root")
        b = _write_commit(repo, "second", parent=c)
        a = _write_commit(repo, "third", parent=b)

        result = _ids(iter_ancestors(repo, a))
        assert result == [a, b, c]

    def test_merge_commit_visits_both_parents(self, tmp_path: pathlib.Path) -> None:
        """Merge commit with two parents → both parent branches visited."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        left = _write_commit(repo, "left", parent=root)
        right = _write_commit(repo, "right", parent=root)
        merge = _write_commit(repo, "merge", parent=left, parent2=right)

        visited = set(_ids(iter_ancestors(repo, merge)))
        # The repo holds only these four commits, so equality also rules out
        # anything unexpected being yielded.
        assert visited == {merge, left, right, root}

    def test_diamond_dag_shared_ancestor_visited_once(self, tmp_path: pathlib.Path) -> None:
        """In a diamond (A→B, A→C, B→D, C→D), D is visited exactly once."""
        repo = _make_repo(tmp_path)
        d = _write_commit(repo, "D")
        b = _write_commit(repo, "B", parent=d)
        c = _write_commit(repo, "C", parent=d)
        a = _write_commit(repo, "A", parent=b, parent2=c)

        commit_ids = _ids(iter_ancestors(repo, a))
        # D must appear exactly once despite two paths reaching it.
        assert commit_ids.count(d) == 1
        assert set(commit_ids) == {a, b, c, d}

    def test_first_parent_only_skips_second_parent(self, tmp_path: pathlib.Path) -> None:
        """first_parent_only=True follows only the first parent of a merge."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        left = _write_commit(repo, "left", parent=root)
        right = _write_commit(repo, "right", parent=root)
        merge = _write_commit(repo, "merge", parent=left, parent2=right)

        result = set(_ids(iter_ancestors(repo, merge, first_parent_only=True)))
        assert merge in result
        assert left in result
        assert root in result
        assert right not in result  # second parent branch skipped

    def test_exclude_stops_at_boundary(self, tmp_path: pathlib.Path) -> None:
        """Commits in `exclude` and their ancestors are never yielded."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        mid = _write_commit(repo, "mid", parent=root)
        tip = _write_commit(repo, "tip", parent=mid)

        result = _ids(iter_ancestors(repo, tip, exclude={mid}))
        assert tip in result
        assert mid not in result
        assert root not in result

    def test_max_commits_caps_yield(self, tmp_path: pathlib.Path) -> None:
        """max_commits=2 yields at most 2 commits from a longer chain."""
        repo = _make_repo(tmp_path)
        c = _write_commit(repo, "root")
        b = _write_commit(repo, "mid", parent=c)
        a = _write_commit(repo, "tip", parent=b)

        result = _ids(iter_ancestors(repo, a, max_commits=2))
        # Pin which commits survive the cap, not only how many: the walk is
        # newest-first, so the tip and its parent are kept and the root is cut.
        assert result == [a, b]

    def test_missing_commit_skipped_walk_continues(self, tmp_path: pathlib.Path) -> None:
        """A missing commit ID in the chain is silently skipped."""
        from muse.core.object_store import object_path

        repo = _make_repo(tmp_path)
        # An unrelated commit that the orphan below does not reference; the
        # length assertion at the end confirms it is not yielded.
        _write_commit(repo, "root")

        # Build a commit that references a nonexistent parent.
        snap_id = compute_snapshot_id({})
        write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest={}))
        ghost_parent = long_id("ab" * 32)
        cid = compute_commit_id(
            parent_ids=[ghost_parent],
            snapshot_id=snap_id,
            message="orphan",
            committed_at_iso=_BASE_DT.isoformat(),
            author="A",
        )
        record = CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=snap_id,
            message="orphan",
            committed_at=_BASE_DT,
            parent_commit_id=ghost_parent,
            author="A",
        )

        # Write directly to bypass the parent-existence guard — this test
        # intentionally creates a commit whose parent is absent to verify that
        # iter_ancestors skips missing parents gracefully.
        obj_file = object_path(repo, cid)
        obj_file.parent.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(record.to_dict(), separators=(",", ":")).encode()
        obj_file.write_bytes(f"commit {len(payload)}\0".encode() + payload)

        # Should yield the orphan commit without crashing on the ghost parent.
        result = list(iter_ancestors(repo, cid))
        assert len(result) == 1
        assert result[0].commit_id == cid

    def test_multi_source_starts(self, tmp_path: pathlib.Path) -> None:
        """Multi-source BFS from two tips collects ancestors of both."""
        repo = _make_repo(tmp_path)
        shared = _write_commit(repo, "shared")
        branch_a = _write_commit(repo, "branch_a", parent=shared)
        branch_b = _write_commit(repo, "branch_b", parent=shared)

        visited = set(_ids(iter_ancestors(repo, [branch_a, branch_b])))
        assert branch_a in visited
        assert branch_b in visited
        assert shared in visited  # reachable from both tips

    def test_multi_source_shared_ancestor_once(self, tmp_path: pathlib.Path) -> None:
        """Shared ancestor appears exactly once in multi-source BFS."""
        repo = _make_repo(tmp_path)
        shared = _write_commit(repo, "shared")
        a = _write_commit(repo, "a", parent=shared)
        b = _write_commit(repo, "b", parent=shared)

        all_ids = _ids(iter_ancestors(repo, [a, b]))
        assert all_ids.count(shared) == 1

    def test_yields_commit_records_not_ids(self, tmp_path: pathlib.Path) -> None:
        """iter_ancestors yields CommitRecord objects (not strings)."""
        repo = _make_repo(tmp_path)
        cid = _write_commit(repo, "root", author="Alice")
        result = list(iter_ancestors(repo, cid))
        assert len(result) == 1
        assert isinstance(result[0], CommitRecord)
        assert result[0].author == "Alice"

    def test_uses_deque_not_list(self) -> None:
        """walk_dag (the underlying primitive) must use collections.deque for O(1) pops."""
        import inspect

        from muse.core import graph as graph_module

        # Structural check on the source text of walk_dag, not a behavioural one.
        source = inspect.getsource(graph_module.walk_dag)
        assert "deque" in source, "walk_dag must use collections.deque"
        assert "pop(0)" not in source, "must not use list.pop(0)"
        assert "insert(0" not in source, "must not use list.insert(0, ...)"
279
280
281 # ---------------------------------------------------------------------------
282 # ancestor_ids
283 # ---------------------------------------------------------------------------
284
285
class TestAncestorIds:
    """Tests for ``ancestor_ids``: the set-of-IDs view of the ancestor walk."""

    def test_returns_set_of_prefixed_ids(self, tmp_path: pathlib.Path) -> None:
        """The result is a set holding every commit in the chain, sha256:-prefixed."""
        repo = _make_repo(tmp_path)
        c = _write_commit(repo, "root")
        b = _write_commit(repo, "mid", parent=c)
        a = _write_commit(repo, "tip", parent=b)

        result = ancestor_ids(repo, a)
        assert isinstance(result, set)
        assert result == {a, b, c}
        assert all(oid.startswith("sha256:") for oid in result)

    def test_empty_starts_returns_empty_set(self, tmp_path: pathlib.Path) -> None:
        """No start commits → empty set."""
        repo = _make_repo(tmp_path)
        assert ancestor_ids(repo, []) == set()

    def test_exclude_boundaries_respected(self, tmp_path: pathlib.Path) -> None:
        """An excluded commit and everything behind it are left out."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        mid = _write_commit(repo, "mid", parent=root)
        tip = _write_commit(repo, "tip", parent=mid)

        result = ancestor_ids(repo, tip, exclude={mid})
        assert tip in result
        assert mid not in result
        assert root not in result

    def test_max_commits_respected(self, tmp_path: pathlib.Path) -> None:
        """max_commits bounds the result size on a 10-commit chain."""
        repo = _make_repo(tmp_path)
        chain: list[str] = []
        prev: str | None = None
        for i in range(10):
            prev = _write_commit(repo, f"c{i}", parent=prev)
            chain.append(prev)
        tip = chain[-1]

        result = ancestor_ids(repo, tip, max_commits=5)
        assert len(result) <= 5
        # A bare upper bound would also pass for an empty set, so require the
        # start commit to be present and reject IDs that were never written.
        assert tip in result
        assert result <= set(chain)

    def test_range_exclusion_pattern(self, tmp_path: pathlib.Path) -> None:
        """ancestor_ids(base) as exclude= produces A..B semantics."""
        repo = _make_repo(tmp_path)
        shared = _write_commit(repo, "shared")
        base = _write_commit(repo, "base", parent=shared)
        c1 = _write_commit(repo, "c1", parent=base)
        c2 = _write_commit(repo, "c2", parent=c1)

        base_ancestors = ancestor_ids(repo, base)
        tip_range = ancestor_ids(repo, c2, exclude=base_ancestors)
        assert c2 in tip_range
        assert c1 in tip_range
        assert base not in tip_range
        assert shared not in tip_range
338
339
340 # ---------------------------------------------------------------------------
341 # find_merge_base
342 # ---------------------------------------------------------------------------
343
344
class TestFindMergeBase:
    """Tests for ``find_merge_base``: the common-ancestor lookup for two commits."""

    def test_same_commit_returns_itself(self, tmp_path: pathlib.Path) -> None:
        """merge_base(x, x) == x."""
        repo = _make_repo(tmp_path)
        cid = _write_commit(repo, "root")
        assert find_merge_base(repo, cid, cid) == cid

    def test_linear_chain_returns_common_ancestor(self, tmp_path: pathlib.Path) -> None:
        """A→B→C: merge_base(A, B) == B."""
        repo = _make_repo(tmp_path)
        c = _write_commit(repo, "root")
        b = _write_commit(repo, "mid", parent=c)
        a = _write_commit(repo, "tip", parent=b)
        assert find_merge_base(repo, a, b) == b

    def test_true_merge_returns_lca(self, tmp_path: pathlib.Path) -> None:
        """root→left, root→right: merge_base(left, right) == root."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        left = _write_commit(repo, "left", parent=root)
        right = _write_commit(repo, "right", parent=root)
        assert find_merge_base(repo, left, right) == root

    def test_deeper_lca(self, tmp_path: pathlib.Path) -> None:
        """B2→B→X and C→X: merge_base(B, C) and merge_base(B2, C) are both X."""
        repo = _make_repo(tmp_path)
        x = _write_commit(repo, "X")
        b = _write_commit(repo, "B", parent=x)
        c = _write_commit(repo, "C", parent=x)
        # One extra commit on the B side makes the two branches uneven, so the
        # common ancestor sits at different depths from each tip.
        b2 = _write_commit(repo, "B2", parent=b)

        assert find_merge_base(repo, b, c) == x
        assert find_merge_base(repo, b2, c) == x

    def test_no_common_ancestor_returns_none(self, tmp_path: pathlib.Path) -> None:
        """Two root commits with no shared history → None."""
        repo = _make_repo(tmp_path)
        root_a = _write_commit(repo, "rootA")
        root_b = _write_commit(repo, "rootB")
        result = find_merge_base(repo, root_a, root_b)
        assert result is None

    def test_symmetric(self, tmp_path: pathlib.Path) -> None:
        """find_merge_base(a, b) == find_merge_base(b, a)."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        left = _write_commit(repo, "left", parent=root)
        right = _write_commit(repo, "right", parent=root)

        forward = find_merge_base(repo, left, right)
        backward = find_merge_base(repo, right, left)
        # Also pin the value: two calls that agree on a wrong answer (for
        # example both None) must not satisfy this test.
        assert forward == backward == root

    def test_max_ancestors_raises_on_deep_graph(self, tmp_path: pathlib.Path) -> None:
        """When max_ancestors is exceeded, raises ValueError."""
        repo = _make_repo(tmp_path)
        # Build a chain of 10 commits.
        prev: str | None = None
        for i in range(10):
            prev = _write_commit(repo, f"c{i}", parent=prev)
        a = _write_commit(repo, "a", parent=prev)
        # A second, unrelated history so the two tips share no ancestor.
        b_root = _write_commit(repo, "b_root")
        b = _write_commit(repo, "b", parent=b_root)
        # With max_ancestors=1, BFS from a and b won't find common ancestor.
        with pytest.raises(ValueError, match="max_ancestors"):
            find_merge_base(repo, a, b, max_ancestors=1)

    def test_ancestor_of_itself_via_chain(self, tmp_path: pathlib.Path) -> None:
        """merge_base(tip, root) == root when root is an ancestor of tip."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        mid = _write_commit(repo, "mid", parent=root)
        tip = _write_commit(repo, "tip", parent=mid)
        result = find_merge_base(repo, tip, root)
        assert result == root