gabriel / muse public
test_walk_dag.py python
375 lines 13.4 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 22 days ago
1 """TDD — walk_dag generic DAG walker (Phase 1).
2
3 Tests are written against the planned public API before implementation.
4 All tests in this file are expected to FAIL until walk_dag is implemented
5 in muse/core/graph.py.
6
7 Coverage
8 --------
9 walk_dag
10 - empty starts → yields nothing
11 - single node, no neighbours → yields just that node
12 - linear chain BFS order (root last)
13 - linear chain DFS order (root last — same for a chain)
14 - diamond DAG: shared node visited exactly once (BFS)
15 - diamond DAG: shared node visited exactly once (DFS)
16 - BFS yields level-order (breadth before depth)
17 - DFS yields depth-first (depth before breadth)
18 - prune predicate stops subtree at matched node (node itself NOT yielded)
19 - prune at root → yields nothing
20 - exclude pre-seeds visited set (excluded nodes and subtrees skipped)
21 - max_nodes cap stops iteration after N nodes
22 - max_nodes=0 → yields nothing
23 - generic type: works with ints, not just strings
24 - multi-source starts: seeds from multiple start nodes
25 - multi-source: shared ancestor visited once
26 - adjacency returning empty → walk terminates at leaves
27 - invalid order value → raises ValueError
28
29 iter_ancestors rewired
30 - iter_ancestors routes through walk_dag (structural check)
31 """
32
33 from __future__ import annotations
34
35 from collections.abc import Iterable, Iterator
36 from typing import Callable
37
38 import pytest
39
40 from muse.core.graph import walk_dag
41
42
43 # ---------------------------------------------------------------------------
44 # Shared test graph topologies (pure in-memory dicts)
45 # ---------------------------------------------------------------------------
46
47 # Linear: A → B → C → D
48 _LINEAR: dict[str, list[str]] = {
49 "A": ["B"],
50 "B": ["C"],
51 "C": ["D"],
52 "D": [],
53 }
54
55 # Diamond: A → B → D
56 # A → C → D
57 _DIAMOND: dict[str, list[str]] = {
58 "A": ["B", "C"],
59 "B": ["D"],
60 "C": ["D"],
61 "D": [],
62 }
63
64 # Wide tree: root → [L, R]
65 # L → [LL, LR]
66 # R → [RL, RR]
67 _TREE: dict[str, list[str]] = {
68 "root": ["L", "R"],
69 "L": ["LL", "LR"],
70 "R": ["RL", "RR"],
71 "LL": [],
72 "LR": [],
73 "RL": [],
74 "RR": [],
75 }
76
77
78 type _AdjGraph = dict[str, list[str]]
79
80
81 def _adj(graph: _AdjGraph) -> Callable[[str], Iterable[str]]:
82 """Return an adjacency function for a dict-based graph."""
83 return lambda node: graph.get(node, [])
84
85
86 # ---------------------------------------------------------------------------
87 # Basic traversal
88 # ---------------------------------------------------------------------------
89
90
class TestWalkDagBasic:
    """Trivial inputs, full coverage of a linear chain, and argument validation."""

    def test_empty_starts_yields_nothing(self) -> None:
        """An empty list of start nodes produces an empty walk."""
        assert list(walk_dag([], _adj(_LINEAR))) == []

    def test_single_node_no_neighbours(self) -> None:
        """A lone start node passed directly (not wrapped) yields only itself."""
        lonely = {"X": []}
        walked = list(walk_dag("X", _adj(lonely)))
        assert walked == ["X"]

    def test_single_node_iterable_start(self) -> None:
        """The same lone node passed inside a list yields only itself."""
        lonely = {"X": []}
        walked = list(walk_dag(["X"], _adj(lonely)))
        assert walked == ["X"]

    def test_linear_chain_bfs_all_visited(self) -> None:
        """BFS from the head of the chain reaches every node."""
        walked = list(walk_dag("A", _adj(_LINEAR), order="bfs"))
        assert {"A", "B", "C", "D"} == set(walked)

    def test_linear_chain_dfs_all_visited(self) -> None:
        """DFS from the head of the chain reaches every node."""
        walked = list(walk_dag("A", _adj(_LINEAR), order="dfs"))
        assert {"A", "B", "C", "D"} == set(walked)

    def test_linear_chain_bfs_start_first(self) -> None:
        """BFS yields the start node before anything else."""
        first, *_rest = walk_dag("A", _adj(_LINEAR), order="bfs")
        assert first == "A"

    def test_linear_chain_dfs_start_first(self) -> None:
        """DFS yields the start node before anything else."""
        first, *_rest = walk_dag("A", _adj(_LINEAR), order="dfs")
        assert first == "A"

    def test_leaf_node_as_start(self) -> None:
        """Starting from a leaf yields only that leaf."""
        walked = list(walk_dag("D", _adj(_LINEAR)))
        assert walked == ["D"]

    def test_invalid_order_raises(self) -> None:
        """An unrecognised ``order`` raises ValueError whose message mentions 'order'."""
        with pytest.raises(ValueError, match="order"):
            # list() drives the walk in case walk_dag is a lazy generator.
            list(walk_dag("A", _adj(_LINEAR), order="zigzag"))  # type: ignore[arg-type]
131
132 # ---------------------------------------------------------------------------
133 # BFS vs DFS order
134 # ---------------------------------------------------------------------------
135
136
class TestWalkDagOrder:
    """Ordering guarantees that tell BFS and DFS apart on the wide tree."""

    def test_bfs_is_level_order(self) -> None:
        """BFS on wide tree: root before L/R, and L/R before *every* leaf."""
        result = list(walk_dag("root", _adj(_TREE), order="bfs"))
        root_idx = result.index("root")
        child_idxs = [result.index(node) for node in ("L", "R")]
        grandchild_idxs = [result.index(node) for node in ("LL", "LR", "RL", "RR")]

        # root before both children
        assert root_idx < min(child_idxs)
        # Both children before any grandchild.  All four leaves are checked:
        # a depth-first order such as root, R, RL, RR, L, LL, LR satisfies
        # "L before LL/LR" and "R before LL" yet is clearly not level-order.
        assert max(child_idxs) < min(grandchild_idxs)

    def test_dfs_is_depth_first(self) -> None:
        """DFS on wide tree: root first, then one branch fully explored before other."""
        result = list(walk_dag("root", _adj(_TREE), order="dfs"))

        # root must be first
        assert result.index("root") == 0

        # Which child is explored first depends on how the implementation
        # orders its stack, so accept either; the invariant is that the L and
        # R subtrees are never interleaved.
        l_idx = result.index("L")
        r_idx = result.index("R")
        if l_idx < r_idx:
            first_branch_leaves, other_branch_root = ("LL", "LR"), r_idx
        else:
            first_branch_leaves, other_branch_root = ("RL", "RR"), l_idx

        # *Every* leaf of the first branch must precede the other branch's
        # root; requiring only one of them would let a partially explored
        # branch pass.
        assert all(
            result.index(leaf) < other_branch_root for leaf in first_branch_leaves
        )

    def test_bfs_vs_dfs_different_on_tree(self) -> None:
        """BFS and DFS produce different orderings on a non-trivial tree."""
        bfs = list(walk_dag("root", _adj(_TREE), order="bfs"))
        dfs = list(walk_dag("root", _adj(_TREE), order="dfs"))
        assert set(bfs) == set(dfs)  # same nodes
        assert bfs != dfs  # different order
189
190
191 # ---------------------------------------------------------------------------
192 # Diamond (deduplication)
193 # ---------------------------------------------------------------------------
194
195
class TestWalkDagDiamond:
    """A node reachable via two paths (D in the diamond) is yielded once."""

    def test_diamond_bfs_visits_shared_once(self) -> None:
        """BFS covers the whole diamond and emits D a single time."""
        walked = list(walk_dag("A", _adj(_DIAMOND), order="bfs"))
        assert set(walked) == {"A", "B", "C", "D"}
        assert walked.count("D") == 1

    def test_diamond_dfs_visits_shared_once(self) -> None:
        """DFS covers the whole diamond and emits D a single time."""
        walked = list(walk_dag("A", _adj(_DIAMOND), order="dfs"))
        assert set(walked) == {"A", "B", "C", "D"}
        assert walked.count("D") == 1
206
207
208 # ---------------------------------------------------------------------------
209 # prune predicate
210 # ---------------------------------------------------------------------------
211
212
class TestWalkDagPrune:
    """Behaviour of the ``prune`` predicate."""

    def test_prune_stops_at_matched_node(self) -> None:
        """prune("B") → B and its subtree [C, D] are never yielded."""

        def is_b(node: str) -> bool:
            return node == "B"

        walked = list(walk_dag("A", _adj(_LINEAR), prune=is_b))
        assert "A" in walked
        assert set(walked).isdisjoint({"B", "C", "D"})

    def test_prune_at_root_yields_nothing(self) -> None:
        """Pruning the start node itself → nothing yielded."""

        def is_a(node: str) -> bool:
            return node == "A"

        assert list(walk_dag("A", _adj(_LINEAR), prune=is_a)) == []

    def test_prune_at_leaf_no_effect(self) -> None:
        """Pruning a leaf doesn't affect earlier nodes."""

        def is_d(node: str) -> bool:
            return node == "D"

        walked = set(walk_dag("A", _adj(_LINEAR), prune=is_d))
        assert walked == {"A", "B", "C"}

    def test_prune_one_branch_of_diamond(self) -> None:
        """Pruning B in diamond: C path still reaches D; B subtree skipped."""

        def is_b(node: str) -> bool:
            return node == "B"

        # A → B (pruned), A → C → D
        walked = list(walk_dag("A", _adj(_DIAMOND), prune=is_b))
        assert "B" not in walked
        # D stays reachable through C.
        assert {"A", "C", "D"} <= set(walked)

    def test_prune_both_branches_of_diamond(self) -> None:
        """Pruning both B and C → D never reached."""
        blocked = {"B", "C"}
        walked = list(walk_dag("A", _adj(_DIAMOND), prune=lambda n: n in blocked))
        assert walked == ["A"]
245
246
247 # ---------------------------------------------------------------------------
248 # exclude
249 # ---------------------------------------------------------------------------
250
251
class TestWalkDagExclude:
    """Behaviour of the ``exclude`` argument."""

    def test_exclude_pre_seeds_visited(self) -> None:
        """Excluded nodes are treated as already-visited; not yielded."""
        result = list(walk_dag("A", _adj(_LINEAR), exclude={"B"}))
        assert "A" in result
        assert "B" not in result
        assert "C" not in result  # C only reachable via B
        assert "D" not in result

    def test_exclude_start_node_yields_nothing(self) -> None:
        """Excluding the start node → nothing yielded (start treated as visited)."""
        result = list(walk_dag("A", _adj(_LINEAR), exclude={"A"}))
        assert result == []

    def test_exclude_leaf(self) -> None:
        """Excluding a leaf: other nodes still visited."""
        result = set(walk_dag("A", _adj(_LINEAR), exclude={"D"}))
        assert result == {"A", "B", "C"}

    def test_exclude_does_not_mutate_input(self) -> None:
        """walk_dag must not mutate the caller's exclude set (empty or not)."""
        empty: set[str] = set()
        list(walk_dag("A", _adj(_LINEAR), exclude=empty))
        assert empty == set()

        # An empty set is falsy, so an implementation that seeds its visited
        # set with ``exclude or set()`` would pass the check above while still
        # adding visited nodes to any non-empty set the caller hands in.
        non_empty: set[str] = {"D"}
        list(walk_dag("A", _adj(_LINEAR), exclude=non_empty))
        assert non_empty == {"D"}
276
277
278 # ---------------------------------------------------------------------------
279 # max_nodes cap
280 # ---------------------------------------------------------------------------
281
282
class TestWalkDagMaxNodes:
    """Behaviour of the ``max_nodes`` cap on the linear chain."""

    def test_max_nodes_zero_yields_nothing(self) -> None:
        """A cap of zero yields no nodes at all."""
        assert list(walk_dag("A", _adj(_LINEAR), max_nodes=0)) == []

    def test_max_nodes_one_yields_start(self) -> None:
        """A cap of one yields just the start node."""
        assert list(walk_dag("A", _adj(_LINEAR), max_nodes=1)) == ["A"]

    def test_max_nodes_caps_count(self) -> None:
        """A cap of two on a four-node chain yields exactly two nodes."""
        walked = list(walk_dag("A", _adj(_LINEAR), max_nodes=2))
        assert len(walked) == 2

    def test_max_nodes_larger_than_graph(self) -> None:
        """max_nodes larger than the graph → all nodes yielded normally."""
        walked = list(walk_dag("A", _adj(_LINEAR), max_nodes=999))
        assert {"A", "B", "C", "D"} == set(walked)
300
301
302 # ---------------------------------------------------------------------------
303 # Generic type (ints)
304 # ---------------------------------------------------------------------------
305
306
class TestWalkDagGenericType:
    """walk_dag with node types other than ``str``."""

    def test_works_with_integers(self) -> None:
        """walk_dag is generic — must work with int nodes, not just strings."""
        int_diamond: dict[int, list[int]] = {1: [2, 3], 2: [4], 3: [4], 4: []}

        def neighbours(node: int) -> list[int]:
            return int_diamond.get(node, [])

        walked = list(walk_dag(1, neighbours))
        assert walked.count(4) == 1  # shared node visited once
        assert set(walked) == {1, 2, 3, 4}

    def test_works_with_tuples(self) -> None:
        """Nodes can be any hashable — tuples work fine (wrap in list to start)."""
        origin = (0, 0)
        grid: dict[tuple[int, int], list[tuple[int, int]]] = {
            origin: [(1, 0), (0, 1)],
            (1, 0): [],
            (0, 1): [],
        }

        def neighbours(node: tuple[int, int]) -> list[tuple[int, int]]:
            return grid.get(node, [])

        walked = list(walk_dag([origin], neighbours))
        assert set(walked) == {(0, 0), (1, 0), (0, 1)}
324
325
326 # ---------------------------------------------------------------------------
327 # Multi-source starts
328 # ---------------------------------------------------------------------------
329
330
class TestWalkDagMultiSource:
    """Walks seeded from several start nodes at once."""

    @staticmethod
    def _fan_in() -> dict[str, list[str]]:
        """Return a fresh graph where both ``a`` and ``b`` point at ``shared``."""
        return {
            "shared": [],
            "a": ["shared"],
            "b": ["shared"],
        }

    def test_multi_source_visits_all_reachable(self) -> None:
        """Every node reachable from either start is yielded."""
        walked = list(walk_dag(["a", "b"], _adj(self._fan_in())))
        assert set(walked) == {"a", "b", "shared"}

    def test_multi_source_shared_ancestor_once(self) -> None:
        """A node reachable from both starts is yielded a single time."""
        walked = list(walk_dag(["a", "b"], _adj(self._fan_in())))
        assert walked.count("shared") == 1

    def test_multi_source_exclude_applies_to_all(self) -> None:
        """An excluded node is skipped regardless of which start reaches it."""
        walked = list(walk_dag(["a", "b"], _adj(self._fan_in()), exclude={"shared"}))
        assert "shared" not in walked
        assert set(walked) == {"a", "b"}
359
360
361 # ---------------------------------------------------------------------------
362 # Structural: iter_ancestors routes through walk_dag
363 # ---------------------------------------------------------------------------
364
365
class TestIterAncestorsRoutesThroughWalkDag:
    """Structural check that iter_ancestors is built on walk_dag."""

    def test_iter_ancestors_calls_walk_dag(self) -> None:
        """iter_ancestors must delegate to walk_dag (implementation check).

        This is a textual check: it passes when the name ``walk_dag`` appears
        anywhere in the source text of ``iter_ancestors``.
        """
        import inspect
        from muse.core import graph as graph_module

        ancestors_src = inspect.getsource(graph_module.iter_ancestors)
        mentions_walker = "walk_dag" in ancestors_src
        assert mentions_walker, "iter_ancestors must call walk_dag after Phase 1 refactor"
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 22 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 23 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 29 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 30 days ago