gabriel / muse public

test_stress_graph.py file-level

at sha256:c · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:4 Merge branch 'dev' into main · gabriel · Jun 17, 2026
1 """Stress tests for the commit DAG and merge-base algorithm.
2
3 Exercises:
4 - Linear chains of 500 commits.
5 - Wide fan-out / fan-in (octopus merge shapes).
6 - Criss-cross merge (ambiguous LCA — should still find *some* ancestor).
7 - Independent histories (no common ancestor → None).
8 - find_merge_base symmetry: find_merge_base(a, b) == find_merge_base(b, a).
9 - Missing commits are handled gracefully (None parent pointers in corrupt graphs).
10 - Diamond topology: four-node diamond always finds the root.
11 - Double diamond: two diamonds chained together.
12 - Long parallel branches that converge at a single point.
13 """
14
15 import datetime
16 import pathlib
17
18 import pytest
19
20 from muse.core.types import fake_id
21 from muse.core.paths import muse_dir
22 from muse.core.merge_engine import find_merge_base
23 from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
24 from muse.core.commits import (
25 CommitRecord,
26 write_commit,
27 )
28
29
30 # ---------------------------------------------------------------------------
31 # Helpers
32 # ---------------------------------------------------------------------------
33
# Snapshot ID computed once from an empty dict; `_write` stamps it on every
# synthetic commit so that snapshots never influence the graph shape under test.
_SNAP_ID: str = compute_snapshot_id({})

# Fixed, timezone-aware (UTC) timestamp shared by every commit `_write` creates.
_BASE_TS: datetime.datetime = datetime.datetime(
    year=2026,
    month=1,
    day=1,
    tzinfo=datetime.timezone.utc,
)
36
37
def _write(
    root: pathlib.Path,
    label: str,
    parent: str | None = None,
    parent2: str | None = None,
) -> str:
    """Persist one synthetic commit under *root* and return its commit ID.

    The ID is produced by ``compute_commit_id`` from the non-``None`` parents,
    the shared ``_SNAP_ID``, *label* (used as the commit message) and the
    fixed ``_BASE_TS`` timestamp.

    Args:
        root: Repository root handed to ``write_commit``.
        label: Commit message; also an input to the ID computation.
        parent: First parent commit ID, or ``None``.
        parent2: Second parent commit ID (for merge commits), or ``None``.

    Returns:
        The commit ID of the record that was written.
    """
    # Only parents that are actually present take part in the ID computation.
    present_parents = [pid for pid in (parent, parent2) if pid is not None]

    commit_id = compute_commit_id(
        parent_ids=present_parents,
        snapshot_id=_SNAP_ID,
        message=label,
        committed_at_iso=_BASE_TS.isoformat(),
    )

    # The record itself keeps both parent slots verbatim, including None.
    record = CommitRecord(
        commit_id=commit_id,
        branch="main",
        snapshot_id=_SNAP_ID,
        message=label,
        committed_at=_BASE_TS,
        parent_commit_id=parent,
        parent2_commit_id=parent2,
    )
    write_commit(root, record)
    return commit_id
62
63
@pytest.fixture
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Provide a temporary repository root with the directory skeleton in place.

    Creates ``commits/`` and ``refs/heads/`` beneath the directory returned by
    ``muse_dir(tmp_path)`` and returns ``tmp_path`` itself.
    """
    store = muse_dir(tmp_path)
    for skeleton_dir in (store / "commits", store / "refs" / "heads"):
        skeleton_dir.mkdir(parents=True)
    return tmp_path
70
71
72 # ---------------------------------------------------------------------------
73 # Linear chain
74 # ---------------------------------------------------------------------------
75
76
class TestLinearChain:
    """Merge-base lookups on straight-line histories."""

    def test_chain_of_500_finds_base(self, repo: pathlib.Path) -> None:
        """LCA of two commits on a 500-long linear chain is the shared ancestor."""
        chain: list[str] = []
        for i in range(500):
            parent = chain[-1] if chain else None
            chain.append(_write(repo, f"c{i:04d}", parent))

        # Fork a side branch off the commit at index 100.
        fork_point = chain[100]
        side_tip = _write(repo, "branch-tip", fork_point)

        assert find_merge_base(repo, chain[499], side_tip) == fork_point

    def test_lca_of_adjacent_commits_is_parent(self, repo: pathlib.Path) -> None:
        """For a parent/child pair the merge base is the parent, in either order."""
        root_id = _write(repo, "root")
        child_id = _write(repo, "child", root_id)

        for first, second in ((root_id, child_id), (child_id, root_id)):
            assert find_merge_base(repo, first, second) == root_id

    def test_long_chain_lca_symmetry(self, repo: pathlib.Path) -> None:
        """find_merge_base(a, b) == find_merge_base(b, a) on a long chain."""
        chain: list[str] = []
        for i in range(100):
            parent = chain[-1] if chain else None
            chain.append(_write(repo, f"n{i:03d}", parent))

        # Two sibling tips that both fork from the commit at index 50.
        fork_point = chain[50]
        left_id = _write(repo, "left", fork_point)
        right_id = _write(repo, "right", fork_point)

        for first, second in ((left_id, right_id), (right_id, left_id)):
            assert find_merge_base(repo, first, second) == fork_point

    def test_same_commit_returns_itself(self, repo: pathlib.Path) -> None:
        """The merge base of a commit with itself is that commit."""
        only_id = _write(repo, "solo")
        assert find_merge_base(repo, only_id, only_id) == only_id

    def test_one_is_ancestor_of_other(self, repo: pathlib.Path) -> None:
        """When A is a direct ancestor of B, LCA is A."""
        chain: list[str] = []
        for i in range(20):
            parent = chain[-1] if chain else None
            chain.append(_write(repo, f"x{i:02d}", parent))

        oldest, newest = chain[0], chain[19]
        for first, second in ((oldest, newest), (newest, oldest)):
            assert find_merge_base(repo, first, second) == oldest
128
129
130 # ---------------------------------------------------------------------------
131 # Diamond topology
132 # ---------------------------------------------------------------------------
133
134
class TestDiamondTopology:
    """Merge-base lookups on diamond, criss-cross and fan-out shapes."""

    def test_simple_diamond(self, repo: pathlib.Path) -> None:
        """Two children of one root share that root as merge base.

              root
              /  \\
             L    R
              \\  /
               M    (merge commit, not relevant here — just find LCA of L and R)
        """
        root_id = _write(repo, "root")
        left_id = _write(repo, "L", root_id)
        right_id = _write(repo, "R", root_id)

        merge_base = find_merge_base(repo, left_id, right_id)
        assert merge_base == root_id

    def test_double_diamond(self, repo: pathlib.Path) -> None:
        """Two chained diamonds: the inner merge commit is the base.

                A
               / \\
              B   C
               \\ /
                D
               / \\
              E   F
               \\ /
                G

        LCA(E, F) should be D.
        """
        top_id = _write(repo, "A")
        upper_left = _write(repo, "B", top_id)
        upper_right = _write(repo, "C", top_id)
        waist_id = _write(repo, "D", upper_left, upper_right)
        lower_left = _write(repo, "E", waist_id)
        lower_right = _write(repo, "F", waist_id)

        merge_base = find_merge_base(repo, lower_left, lower_right)
        assert merge_base == waist_id

    def test_criss_cross_merge(self, repo: pathlib.Path) -> None:
        """Criss-cross: two merges of the same parents in opposite parent order.

            X → L1 → M1(L1, R1)
            X → R1 → M2(R1, L1)

        L1 and R1 are both lowest common ancestors of M1 and M2, and X is a
        more distant common ancestor. The test accepts any of the three; the
        algorithm must not return None or crash.
        """
        x_id = _write(repo, "X")
        l1_id = _write(repo, "L1", x_id)
        r1_id = _write(repo, "R1", x_id)
        m1_id = _write(repo, "M1", l1_id, r1_id)
        m2_id = _write(repo, "M2", r1_id, l1_id)

        # Any of X, L1, R1 is a valid common ancestor; None is not acceptable.
        acceptable = {x_id, l1_id, r1_id}
        merge_base = find_merge_base(repo, m1_id, m2_id)
        assert merge_base is not None
        assert merge_base in acceptable

    def test_octopus_three_branch_fan_in(self, repo: pathlib.Path) -> None:
        """Three branches that all diverged from the same root."""
        root_id = _write(repo, "root")
        tip_a, tip_b, tip_c = (
            _write(repo, name, root_id)
            for name in ("branch-a", "branch-b", "branch-c")
        )

        for first, second in ((tip_a, tip_b), (tip_a, tip_c), (tip_b, tip_c)):
            assert find_merge_base(repo, first, second) == root_id
199
200
201 # ---------------------------------------------------------------------------
202 # Independent histories
203 # ---------------------------------------------------------------------------
204
205
class TestDisjointHistories:
    """Merge-base lookups that are expected to find no common ancestor."""

    def test_no_common_ancestor_returns_none(self, repo: pathlib.Path) -> None:
        """Two parentless commits have no merge base."""
        first_island = _write(repo, "island-a")
        second_island = _write(repo, "island-b")

        merge_base = find_merge_base(repo, first_island, second_island)
        assert merge_base is None

    def test_long_independent_chains_return_none(self, repo: pathlib.Path) -> None:
        """Two unrelated 20-commit chains have no merge base."""
        tip_a = tip_b = ""
        parent_a: str | None = None
        parent_b: str | None = None
        for i in range(20):
            # Each new commit becomes both the current tip and the next parent.
            tip_a = parent_a = _write(repo, f"a{i:02d}", parent_a)
            tip_b = parent_b = _write(repo, f"b{i:02d}", parent_b)

        assert find_merge_base(repo, tip_a, tip_b) is None

    def test_missing_commit_id_graceful(self, repo: pathlib.Path) -> None:
        """Asking for an LCA where one commit doesn't exist should return None, not raise."""
        existing_id = _write(repo, "real")
        ghost_id = fake_id("ghost-commit-that-does-not-exist")

        # The ghost has no ancestors, so no common ancestor found.
        merge_base = find_merge_base(repo, existing_id, ghost_id)
        assert merge_base is None
229
230
231 # ---------------------------------------------------------------------------
232 # Ancestor-set correctness
233 # ---------------------------------------------------------------------------
234
235
class TestAncestorCorrectness:
    """Merge-base results on histories with merges, wide fan-out and deep branches."""

    def test_merge_commit_has_both_parents_as_ancestors(self, repo: pathlib.Path) -> None:
        """A branch forked from one merge parent shares that parent with the merge."""
        root_id = _write(repo, "root")
        a_id = _write(repo, "A", root_id)
        b_id = _write(repo, "B", root_id)
        merge_id = _write(repo, "merge", a_id, b_id)
        feature_id = _write(repo, "feature", a_id)

        # LCA of feature and merge: feature branched from A, merge contains A.
        # So A is the common ancestor.
        merge_base = find_merge_base(repo, feature_id, merge_id)
        assert merge_base == a_id

    def test_wide_history_with_shared_root(self, repo: pathlib.Path) -> None:
        """50 branches diverging from a shared root; sampled pairwise LCA is root."""
        root_id = _write(repo, "root")
        branch_ids = [_write(repo, f"br{i:03d}", root_id) for i in range(50)]

        # Check a sampling of pairs rather than all of them.
        sampled_pairs = [
            (i, j)
            for i in range(0, 50, 10)
            for j in range(i + 1, 50, 10)
        ]
        for i, j in sampled_pairs:
            assert find_merge_base(repo, branch_ids[i], branch_ids[j]) == root_id

    def test_deep_branch_divergence(self, repo: pathlib.Path) -> None:
        """Branches diverge at root, each has 50 commits. LCA is root."""
        root_id = _write(repo, "root")
        tip_a = tip_b = root_id
        for i in range(50):
            # Each branch grows by one commit per iteration, parented on its own tip.
            tip_a = _write(repo, f"da{i:02d}", tip_a)
            tip_b = _write(repo, f"db{i:02d}", tip_b)

        assert find_merge_base(repo, tip_a, tip_b) == root_id

    def test_multiple_merge_bases_chain(self, repo: pathlib.Path) -> None:
        """A → B → C; branch D from B. LCA of C and D is B."""
        a_id = _write(repo, "A")
        b_id = _write(repo, "B", a_id)
        c_id = _write(repo, "C", b_id)
        d_id = _write(repo, "D", b_id)

        for first, second in ((c_id, d_id), (d_id, c_id)):
            assert find_merge_base(repo, first, second) == b_id
280 assert find_merge_base(repo, d_id, c_id) == b_id