"""TDD Phase 3 — migrate non-commit graph BFS/DFS to walk_dag.

Target sites
------------
shard._connected_components
    DFS connected-components over file coupling graph
identity/dag._DAG.would_cycle
    DFS cycle detection for identity relationship DAG

Intentionally NOT migrated
--------------------------
callgraph.transitive_callers / transitive_callees: output format is
{depth: [name, ...]}, which requires tracking BFS depth alongside nodes.
walk_dag yields a flat node stream — encoding depth into the node type
(e.g. (name, depth) tuples) would make the code harder to read. These
functions have a genuinely different shape and are a sanctioned exception.
"""

from __future__ import annotations

import inspect
import types


# ---------------------------------------------------------------------------
# shard._connected_components
# ---------------------------------------------------------------------------


class TestShardConnectedComponents:
    """Structural and behavioural checks for ``shard._connected_components``.

    Every test calls the function as ``fn(files, edges)`` where ``files`` is a
    list of node names and ``edges`` is a list of 2-tuples, and inspects the
    returned sequence of components.
    """

    def _fn(self) -> types.FunctionType:
        """Return the function under test.

        The import is deferred to call time so that importing this test
        module does not itself import the shard command module.
        """
        from muse.cli.commands import shard as shard_module

        return shard_module._connected_components

    def test_structural_no_inline_dfs(self) -> None:
        """_connected_components must not use an inline stack-based DFS after migration."""
        # Textual check on the function's source: both tell-tale fragments of
        # a hand-rolled stack loop must be absent.
        src = inspect.getsource(self._fn())
        assert "while stack:" not in src and "stack.pop()" not in src, (
            "shard._connected_components still uses an inline DFS stack — "
            "migrate to walk_dag(order='dfs')"
        )

    def test_single_node_single_component(self) -> None:
        """A lone node with no edges forms exactly one singleton component."""
        result = self._fn()(["a"], [])
        assert len(result) == 1
        assert result[0] == frozenset({"a"})

    def test_two_connected_nodes_one_component(self) -> None:
        """Two nodes joined by one edge collapse into a single component."""
        result = self._fn()(["a", "b"], [("a", "b")])
        assert len(result) == 1
        assert result[0] == frozenset({"a", "b"})

    def test_two_disconnected_nodes_two_components(self) -> None:
        """Two nodes without edges yield two singleton components."""
        result = self._fn()(["a", "b"], [])
        assert len(result) == 2
        # Compare as a set of frozensets so component order is irrelevant.
        components = {frozenset(c) for c in result}
        assert frozenset({"a"}) in components
        assert frozenset({"b"}) in components

    def test_triangle_one_component(self) -> None:
        """A fully connected triangle is one component."""
        result = self._fn()(
            ["a", "b", "c"],
            [("a", "b"), ("b", "c"), ("a", "c")],
        )
        assert len(result) == 1
        assert result[0] == frozenset({"a", "b", "c"})

    def test_two_disconnected_pairs(self) -> None:
        """Two separate edges produce two two-node components."""
        result = self._fn()(["a", "b", "c", "d"], [("a", "b"), ("c", "d")])
        assert len(result) == 2
        # Compare as a set of frozensets so component order is irrelevant.
        components = {frozenset(c) for c in result}
        assert frozenset({"a", "b"}) in components
        assert frozenset({"c", "d"}) in components

    def test_edges_are_undirected(self) -> None:
        """Edge (a, b) means both a→b and b→a for connectivity."""
        result = self._fn()(["a", "b"], [("a", "b")])
        assert len(result) == 1
        assert "a" in result[0] and "b" in result[0]

        # Same graph with the edge written the other way round.  A traversal
        # that only followed edges forwards could pass the case above simply
        # by walking a→b, so the reversed orientation is what actually
        # exercises the undirected contract stated in the docstring.
        reversed_result = self._fn()(["a", "b"], [("b", "a")])
        assert len(reversed_result) == 1
        assert "a" in reversed_result[0] and "b" in reversed_result[0]

    def test_empty_input(self) -> None:
        """No files and no edges give an empty list of components."""
        assert self._fn()([], []) == []

    def test_no_node_duplicated_across_components(self) -> None:
        """Every node appears in exactly one component."""
        files = ["a", "b", "c", "d", "e"]
        edges = [("a", "b"), ("c", "d")]
        result = self._fn()(files, edges)
        # Flatten all components; duplicates would make the list longer than
        # its set, and a missing node would make the set differ from `files`.
        all_nodes = [n for comp in result for n in comp]
        assert len(all_nodes) == len(set(all_nodes))
        assert set(all_nodes) == set(files)


# ---------------------------------------------------------------------------
# identity/dag._DAG.would_cycle
# ---------------------------------------------------------------------------


class TestDagWouldCycle:
    """Structural and behavioural checks for ``identity/dag._DAG.would_cycle``.

    Each behavioural test builds a fresh ``_DAG()``, optionally populates it
    with ``add_edge(src, dst)``, and asserts the boolean returned by
    ``would_cycle(src, dst)`` for a candidate edge.
    """

    def _cls(self) -> type:
        """Return the ``_DAG`` class under test.

        The import is deferred to call time so that importing this test
        module does not itself import the identity plugin.
        """
        from muse.plugins.identity import dag as dag_module

        return dag_module._DAG

    def test_structural_no_inline_dfs(self) -> None:
        """_DAG.would_cycle must not use an inline DFS stack after migration."""
        # Textual check on the method's source: both tell-tale fragments of
        # a hand-rolled stack loop must be absent.
        src = inspect.getsource(self._cls().would_cycle)
        assert "while stack:" not in src and "stack.pop()" not in src, (
            "identity/_DAG.would_cycle still uses an inline DFS stack — "
            "migrate to walk_dag(order='dfs')"
        )

    def test_self_loop_is_cycle(self) -> None:
        """An edge from a node to itself is reported as a cycle."""
        dag = self._cls()()
        assert dag.would_cycle("a", "a") is True

    def test_no_edges_no_cycle(self) -> None:
        """On an empty DAG, an edge between distinct nodes is not a cycle."""
        dag = self._cls()()
        assert dag.would_cycle("a", "b") is False

    def test_direct_forward_edge_no_cycle(self) -> None:
        """a→b exists; adding a→b again is fine — no back-edge to a."""
        dag = self._cls()()
        dag.add_edge("a", "b")
        assert dag.would_cycle("a", "b") is False

    def test_back_edge_is_cycle(self) -> None:
        """a→b exists; b→a would create a cycle."""
        dag = self._cls()()
        dag.add_edge("a", "b")
        assert dag.would_cycle("b", "a") is True

    def test_transitive_cycle(self) -> None:
        """a→b→c exists; c→a would create a transitive cycle."""
        dag = self._cls()()
        dag.add_edge("a", "b")
        dag.add_edge("b", "c")
        assert dag.would_cycle("c", "a") is True

    def test_no_transitive_cycle(self) -> None:
        """a→b→c exists; d→a is fine (d is not reachable from a)."""
        dag = self._cls()()
        dag.add_edge("a", "b")
        dag.add_edge("b", "c")
        assert dag.would_cycle("d", "a") is False

    def test_diamond_cycle_detection(self) -> None:
        """a→b, a→c, b→d, c→d; d→a would cycle."""
        dag = self._cls()()
        dag.add_edge("a", "b")
        dag.add_edge("a", "c")
        dag.add_edge("b", "d")
        dag.add_edge("c", "d")
        assert dag.would_cycle("d", "a") is True

    def test_unrelated_nodes_no_cycle(self) -> None:
        """An existing x→y edge does not affect a candidate edge a→b."""
        dag = self._cls()()
        dag.add_edge("x", "y")
        assert dag.would_cycle("a", "b") is False