gabriel / muse public
test_core_graph.py python
405 lines 16.3 KB
Raw
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
1 """Tests for muse/core/graph.py — canonical commit DAG traversal primitives.
2
3 Coverage
4 --------
5 iter_ancestors
6 - empty starts list → yields nothing
7 - single root commit (no parents) → yields just that commit
8 - linear chain, yields in BFS order (newest-first)
9 - merge commit (two parents) → both parents visited, each once
10 - first_parent_only=True → only follows parent_commit_id
11 - exclude= set → treats boundary commits as already-visited
12 - max_commits=N → stops after N commits yielded (missing commits don't count)
13 - missing commit in chain → skipped gracefully, walk continues
14 - diamond DAG (shared ancestor) → shared commit visited exactly once
15 - multi-source starts → seeds BFS from multiple tips simultaneously
16 - uses deque internally (structural check, same rationale as blame test)
17
18 ancestor_ids
19 - returns set of commit IDs (sha256:-prefixed)
20 - empty repo → empty set
21 - respects exclude= boundaries
22 - respects max_commits cap (returns IDs of visited commits)
23
24 find_merge_base
25 - same commit → returns itself
26 - linear chain → returns the common ancestor
27 - true merge topology → returns LCA
28 - no common ancestor (two disjoint chains) → returns None
29 - max_ancestors= exceeded → raises ValueError
30 """
31
32 from __future__ import annotations
33
34 import datetime
35 import json
36 import pathlib
37
38 import pytest
39
40 from muse.core.graph import ancestor_ids, find_merge_base, iter_ancestors
41 from muse.core.types import long_id
42 from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
43 from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot
44 from muse.core.paths import muse_dir
45
46 _BASE_DT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
47
48
49 # ---------------------------------------------------------------------------
50 # Helpers
51 # ---------------------------------------------------------------------------
52
53
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Lay out a bare repository skeleton under *tmp_path* and return *tmp_path*.

    Creates the ``objects``, ``commits``, ``snapshots`` and ``refs/heads``
    directories inside ``muse_dir(tmp_path)``, then writes a ``repo.json``
    carrying a fixed repo id and a ``HEAD`` file pointing at ``refs/heads/main``.
    """
    store_root = muse_dir(tmp_path)

    skeleton = ("objects", "commits", "snapshots", "refs/heads")
    for relative in skeleton:
        target = store_root / relative
        target.mkdir(parents=True, exist_ok=True)

    repo_meta = json.dumps({"repo_id": "test-repo"})
    (store_root / "repo.json").write_text(repo_meta)

    head_file = store_root / "HEAD"
    head_file.write_text("ref: refs/heads/main\n")

    # Callers treat the returned path as the repo root, not the store directory.
    return tmp_path
61
62
def _write_commit(
    repo: pathlib.Path,
    message: str = "commit",
    parent: str | None = None,
    parent2: str | None = None,
    author: str = "Author",
    committed_at: datetime.datetime | None = None,
) -> str:
    """Persist one commit (plus its empty-manifest snapshot) and return its ID.

    Args:
        repo: Repository root the records are written into.
        message: Commit message; also hashed into the commit ID.
        parent: First parent commit ID, or ``None`` for a root commit.
        parent2: Second parent commit ID, or ``None`` for a non-merge commit.
        author: Author name stored on the record and hashed into the ID.
        committed_at: Commit timestamp; ``_BASE_DT`` is used when omitted.

    Returns:
        The commit ID computed by ``compute_commit_id``.
    """
    if committed_at is None:
        timestamp = _BASE_DT
    else:
        timestamp = committed_at

    # Every test commit points at the snapshot of an empty manifest.
    snapshot_id = compute_snapshot_id({})
    snapshot = SnapshotRecord(snapshot_id=snapshot_id, manifest={})
    write_snapshot(repo, snapshot)

    # Only parents that were actually supplied are hashed into the ID.
    lineage: list[str] = []
    for candidate in (parent, parent2):
        if candidate is not None:
            lineage.append(candidate)

    new_id = compute_commit_id(
        parent_ids=lineage,
        snapshot_id=snapshot_id,
        message=message,
        committed_at_iso=timestamp.isoformat(),
        author=author,
    )

    record = CommitRecord(
        commit_id=new_id,
        branch="main",
        snapshot_id=snapshot_id,
        message=message,
        committed_at=timestamp,
        parent_commit_id=parent,
        author=author,
        parent2_commit_id=parent2,
    )
    write_commit(repo, record)
    return new_id
97
98
def _ids(commits: list[CommitRecord]) -> list[str]:
    """Return the ``commit_id`` of every record in *commits*, in iteration order."""
    collected: list[str] = []
    for record in commits:
        collected.append(record.commit_id)
    return collected
101
102
103 # ---------------------------------------------------------------------------
104 # iter_ancestors
105 # ---------------------------------------------------------------------------
106
107
class TestIterAncestors:
    """Tests for ``iter_ancestors`` against small commit graphs written to disk."""

    def test_empty_starts_yields_nothing(self, tmp_path: pathlib.Path) -> None:
        """An empty list of start IDs yields no commits."""
        repo = _make_repo(tmp_path)
        assert list(iter_ancestors(repo, [])) == []

    def test_single_root_commit(self, tmp_path: pathlib.Path) -> None:
        """A commit without parents yields exactly itself."""
        repo = _make_repo(tmp_path)
        cid = _write_commit(repo, "root")
        result = list(iter_ancestors(repo, cid))
        assert len(result) == 1
        assert result[0].commit_id == cid

    def test_linear_chain_yields_all_in_bfs_order(self, tmp_path: pathlib.Path) -> None:
        """A→B→C chain starting from A yields A, B, C in that order."""
        repo = _make_repo(tmp_path)
        c = _write_commit(repo, "root")
        b = _write_commit(repo, "second", parent=c)
        a = _write_commit(repo, "third", parent=b)

        result = _ids(iter_ancestors(repo, a))
        assert result == [a, b, c]

    def test_merge_commit_visits_both_parents(self, tmp_path: pathlib.Path) -> None:
        """Merge commit with two parents → both parent branches visited, each once."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        left = _write_commit(repo, "left", parent=root)
        right = _write_commit(repo, "right", parent=root)
        merge = _write_commit(repo, "merge", parent=left, parent2=right)

        order = _ids(iter_ancestors(repo, merge))
        assert set(order) == {merge, left, right, root}
        # A set alone would hide duplicates; the length pins "each once".
        assert len(order) == 4

    def test_diamond_dag_shared_ancestor_visited_once(self, tmp_path: pathlib.Path) -> None:
        """In a diamond (A→B, A→C, B→D, C→D), D is visited exactly once."""
        repo = _make_repo(tmp_path)
        d = _write_commit(repo, "D")
        b = _write_commit(repo, "B", parent=d)
        c = _write_commit(repo, "C", parent=d)
        a = _write_commit(repo, "A", parent=b, parent2=c)

        commit_ids = _ids(iter_ancestors(repo, a))
        # D must appear exactly once despite two paths reaching it.
        assert commit_ids.count(d) == 1
        assert set(commit_ids) == {a, b, c, d}

    def test_first_parent_only_skips_second_parent(self, tmp_path: pathlib.Path) -> None:
        """first_parent_only=True follows parent_commit_id and ignores the second parent."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        left = _write_commit(repo, "left", parent=root)
        right = _write_commit(repo, "right", parent=root)
        merge = _write_commit(repo, "merge", parent=left, parent2=right)

        result = set(_ids(iter_ancestors(repo, merge, first_parent_only=True)))
        assert merge in result
        assert left in result
        assert root in result
        assert right not in result  # second parent branch skipped

    def test_exclude_stops_at_boundary(self, tmp_path: pathlib.Path) -> None:
        """Commits in `exclude` and their ancestors are never yielded."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        mid = _write_commit(repo, "mid", parent=root)
        tip = _write_commit(repo, "tip", parent=mid)

        result = _ids(iter_ancestors(repo, tip, exclude={mid}))
        assert tip in result
        assert mid not in result
        assert root not in result

    def test_max_commits_caps_yield(self, tmp_path: pathlib.Path) -> None:
        """max_commits=2 yields only the first 2 commits of a longer chain."""
        repo = _make_repo(tmp_path)
        c = _write_commit(repo, "root")
        b = _write_commit(repo, "mid", parent=c)
        a = _write_commit(repo, "tip", parent=b)

        result = _ids(iter_ancestors(repo, a, max_commits=2))
        # Checking the IDs (not just the count) proves the cap keeps the
        # newest commits of the walk and drops the tail.
        assert result == [a, b]

    def test_missing_commit_skipped_walk_continues(self, tmp_path: pathlib.Path) -> None:
        """A missing commit ID in the chain is silently skipped."""
        repo = _make_repo(tmp_path)
        # An unrelated commit sits in the store; it is not an ancestor of the
        # orphan below, so it must not show up in the walk.
        _write_commit(repo, "root")
        # Write a commit that references a nonexistent parent.
        snap_id = compute_snapshot_id({})
        write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest={}))
        ghost_parent = long_id("ab" * 32)
        cid = compute_commit_id(
            parent_ids=[ghost_parent],
            snapshot_id=snap_id,
            message="orphan",
            committed_at_iso=_BASE_DT.isoformat(),
            author="A",
        )
        # Write directly to bypass the parent-existence guard — this test
        # intentionally creates a commit whose parent is absent to verify that
        # iter_ancestors skips missing parents gracefully.
        record = CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=snap_id,
            message="orphan",
            committed_at=_BASE_DT,
            parent_commit_id=ghost_parent,
            author="A",
        )
        from muse.core.object_store import object_path as _obj_path

        obj_file = _obj_path(repo, cid)
        obj_file.parent.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(record.to_dict(), separators=(",", ":")).encode()
        obj_file.write_bytes(f"commit {len(payload)}\0".encode() + payload)

        # Should yield the orphan commit without crashing on the ghost parent.
        result = list(iter_ancestors(repo, cid))
        assert len(result) == 1
        assert result[0].commit_id == cid

    def test_multi_source_starts(self, tmp_path: pathlib.Path) -> None:
        """Multi-source BFS from two tips collects ancestors of both."""
        repo = _make_repo(tmp_path)
        shared = _write_commit(repo, "shared")
        branch_a = _write_commit(repo, "branch_a", parent=shared)
        branch_b = _write_commit(repo, "branch_b", parent=shared)

        visited = set(_ids(iter_ancestors(repo, [branch_a, branch_b])))
        assert branch_a in visited
        assert branch_b in visited
        assert shared in visited  # reachable from both tips

    def test_multi_source_shared_ancestor_once(self, tmp_path: pathlib.Path) -> None:
        """Shared ancestor appears exactly once in multi-source BFS."""
        repo = _make_repo(tmp_path)
        shared = _write_commit(repo, "shared")
        a = _write_commit(repo, "a", parent=shared)
        b = _write_commit(repo, "b", parent=shared)

        all_ids = _ids(iter_ancestors(repo, [a, b]))
        assert all_ids.count(shared) == 1

    def test_yields_commit_records_not_ids(self, tmp_path: pathlib.Path) -> None:
        """iter_ancestors yields CommitRecord objects (not strings)."""
        repo = _make_repo(tmp_path)
        cid = _write_commit(repo, "root", author="Alice")
        result = list(iter_ancestors(repo, cid))
        assert len(result) == 1
        assert isinstance(result[0], CommitRecord)
        assert result[0].author == "Alice"

    def test_uses_deque_not_list(self) -> None:
        """walk_dag (the underlying primitive) must use collections.deque for O(1) pops."""
        import inspect

        from muse.core import graph as graph_module

        # Structural check on the source text rather than on runtime behaviour.
        source = inspect.getsource(graph_module.walk_dag)
        assert "deque" in source, "walk_dag must use collections.deque"
        assert "pop(0)" not in source, "must not use list.pop(0)"
        assert "insert(0" not in source, "must not use list.insert(0, ...)"
272
273
274 # ---------------------------------------------------------------------------
275 # ancestor_ids
276 # ---------------------------------------------------------------------------
277
278
class TestAncestorIds:
    """Tests for ``ancestor_ids``, which returns visited commit IDs as a set."""

    def test_returns_set_of_prefixed_ids(self, tmp_path: pathlib.Path) -> None:
        """The result is a set holding every ID on the chain, each sha256:-prefixed."""
        repo = _make_repo(tmp_path)
        c = _write_commit(repo, "root")
        b = _write_commit(repo, "mid", parent=c)
        a = _write_commit(repo, "tip", parent=b)
        result = ancestor_ids(repo, a)
        assert isinstance(result, set)
        assert result == {a, b, c}
        for oid in result:
            assert oid.startswith("sha256:")

    def test_empty_starts_returns_empty_set(self, tmp_path: pathlib.Path) -> None:
        """No start IDs → empty set."""
        repo = _make_repo(tmp_path)
        assert ancestor_ids(repo, []) == set()

    def test_exclude_boundaries_respected(self, tmp_path: pathlib.Path) -> None:
        """An excluded commit and everything behind it stay out of the result."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        mid = _write_commit(repo, "mid", parent=root)
        tip = _write_commit(repo, "tip", parent=mid)
        result = ancestor_ids(repo, tip, exclude={mid})
        assert tip in result
        assert mid not in result
        assert root not in result

    def test_max_commits_respected(self, tmp_path: pathlib.Path) -> None:
        """max_commits=5 on a 10-commit chain returns a capped, non-empty subset."""
        repo = _make_repo(tmp_path)
        chain: list[str] = []
        prev: str | None = None
        for i in range(10):
            prev = _write_commit(repo, f"c{i}", parent=prev)
            chain.append(prev)
        tip = chain[-1]

        result = ancestor_ids(repo, tip, max_commits=5)
        # The cap alone would also be satisfied by an empty set, so pin that
        # the walk actually ran: it starts at the tip and stays on the chain.
        assert result, "max_commits must cap the walk, not suppress it"
        assert len(result) <= 5
        assert tip in result
        assert result <= set(chain)

    def test_range_exclusion_pattern(self, tmp_path: pathlib.Path) -> None:
        """ancestor_ids(base) as exclude= produces A..B semantics."""
        repo = _make_repo(tmp_path)
        shared = _write_commit(repo, "shared")
        base = _write_commit(repo, "base", parent=shared)
        c1 = _write_commit(repo, "c1", parent=base)
        c2 = _write_commit(repo, "c2", parent=c1)

        base_ancestors = ancestor_ids(repo, base)
        tip_range = ancestor_ids(repo, c2, exclude=base_ancestors)
        assert c2 in tip_range
        assert c1 in tip_range
        assert base not in tip_range
        assert shared not in tip_range
331
332
333 # ---------------------------------------------------------------------------
334 # find_merge_base
335 # ---------------------------------------------------------------------------
336
337
class TestFindMergeBase:
    """Tests for ``find_merge_base`` on linear, forked and disjoint histories."""

    def test_same_commit_returns_itself(self, tmp_path: pathlib.Path) -> None:
        """merge_base(X, X) == X."""
        repo = _make_repo(tmp_path)
        cid = _write_commit(repo, "root")
        assert find_merge_base(repo, cid, cid) == cid

    def test_linear_chain_returns_common_ancestor(self, tmp_path: pathlib.Path) -> None:
        """A→B→C: merge_base(A, B) == B."""
        repo = _make_repo(tmp_path)
        c = _write_commit(repo, "root")
        b = _write_commit(repo, "mid", parent=c)
        a = _write_commit(repo, "tip", parent=b)
        assert find_merge_base(repo, a, b) == b

    def test_true_merge_returns_lca(self, tmp_path: pathlib.Path) -> None:
        """left→root, right→root: merge_base(left, right) == root."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        left = _write_commit(repo, "left", parent=root)
        right = _write_commit(repo, "right", parent=root)
        assert find_merge_base(repo, left, right) == root

    def test_deeper_lca(self, tmp_path: pathlib.Path) -> None:
        """B2→B1→X, C2→C1→X: merge_base(B2, C2) == X, two hops below each tip."""
        repo = _make_repo(tmp_path)
        x = _write_commit(repo, "X")
        b1 = _write_commit(repo, "B1", parent=x)
        b2 = _write_commit(repo, "B2", parent=b1)
        c1 = _write_commit(repo, "C1", parent=x)
        c2 = _write_commit(repo, "C2", parent=c1)
        # Unlike the one-hop fork above, the base is not a direct parent of
        # either tip, so the search has to walk past B1 and C1 to find it.
        assert find_merge_base(repo, b2, c2) == x

    def test_no_common_ancestor_returns_none(self, tmp_path: pathlib.Path) -> None:
        """Two root commits with no shared history → None."""
        repo = _make_repo(tmp_path)
        root_a = _write_commit(repo, "rootA")
        root_b = _write_commit(repo, "rootB")
        result = find_merge_base(repo, root_a, root_b)
        assert result is None

    def test_symmetric(self, tmp_path: pathlib.Path) -> None:
        """find_merge_base(a, b) == find_merge_base(b, a), and both are the fork point."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        left = _write_commit(repo, "left", parent=root)
        right = _write_commit(repo, "right", parent=root)

        forward = find_merge_base(repo, left, right)
        backward = find_merge_base(repo, right, left)
        # Comparing only forward == backward would also pass for None == None.
        assert forward == root
        assert backward == root

    def test_max_ancestors_raises_on_deep_graph(self, tmp_path: pathlib.Path) -> None:
        """When max_ancestors is exceeded, raises ValueError."""
        repo = _make_repo(tmp_path)
        # Build a chain of 10 commits.
        prev: str | None = None
        for i in range(10):
            prev = _write_commit(repo, f"c{i}", parent=prev)
        a = _write_commit(repo, "a", parent=prev)
        b_root = _write_commit(repo, "b_root")
        b = _write_commit(repo, "b", parent=b_root)
        # a and b sit on disjoint chains; with max_ancestors=1 the search is
        # expected to hit its limit and raise rather than report "no base".
        with pytest.raises(ValueError, match="max_ancestors"):
            find_merge_base(repo, a, b, max_ancestors=1)

    def test_ancestor_of_itself_via_chain(self, tmp_path: pathlib.Path) -> None:
        """merge_base(tip, root) == root when root is an ancestor of tip."""
        repo = _make_repo(tmp_path)
        root = _write_commit(repo, "root")
        mid = _write_commit(repo, "mid", parent=root)
        tip = _write_commit(repo, "tip", parent=mid)
        result = find_merge_base(repo, tip, root)
        assert result == root
File History 2 commits
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 28 days ago