gabriel / muse public

test_callgraph.py file-level

at sha256:d · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:4 Merge branch 'dev' into main · gabriel · Jun 17, 2026
1 """Tests for muse/plugins/code/_callgraph.py.
2
3 Coverage
4 --------
5 build_forward_graph(root, manifest)
6 - Empty manifest → empty graph.
7 - Single Python file with function calls → callee names recorded.
8 - Nested calls (function calling another function).
9 - Non-Python files are skipped.
10 - Syntax error files are skipped gracefully.
11 - Method calls are recorded for class methods.
12
13 build_reverse_graph(root, manifest)
14 - Empty manifest → empty graph.
15 - Single caller → single callee reverse mapping.
16 - Multiple callers of same function → all listed.
17 - Sorted output (deterministic).
18
19 transitive_callers(start_name, reverse, max_depth)
20 - Direct caller only (depth 1).
21 - Transitive callers at depth 2.
22 - No callers → empty dict.
23 - Self-recursive function → terminates (no infinite loop).
24 - Multi-hop chain a→b→c→d.
25 - max_depth limits traversal.
26 - Return type is dict[int, list[str]] — depth-keyed.
27 """
28
29 import hashlib
30 import pathlib
31 import textwrap
32
33 import pytest
34
35 from muse.plugins.code._callgraph import (
36 build_forward_graph,
37 build_reverse_graph,
38 transitive_callers,
39 )
40 from muse.core.object_store import write_object
41 from muse.core.types import Manifest, blob_id
42
43 type _SourceFiles = dict[str, str]
44 type _ReverseGraph = dict[str, list[str]]
45
46
47 # ---------------------------------------------------------------------------
48 # Helpers
49 # ---------------------------------------------------------------------------
50
51
def _write_snapshot(
    tmp_path: pathlib.Path,
    files: _SourceFiles,
) -> Manifest:
    """Store each source text as a blob and build the matching manifest.

    Args:
        tmp_path: Root handed unchanged to ``write_object``.
        files: Mapping of relative path to source text.

    Returns:
        A mapping from every relative path in *files* to the id that
        ``blob_id`` produced for that file's encoded content.
    """

    def _store(text: str):
        # Encode once so the id and the stored bytes come from the same payload.
        payload = text.encode()
        object_id = blob_id(payload)
        write_object(tmp_path, object_id, payload)
        return object_id

    # A dict comprehension evaluates in iteration order, so blobs are written
    # in the order the caller listed them.
    return {path: _store(text) for path, text in files.items()}
64
65
66 # ---------------------------------------------------------------------------
67 # build_forward_graph
68 # ---------------------------------------------------------------------------
69
70
class TestBuildForwardGraph:
    """Exercise ``build_forward_graph`` on small snapshots stored under ``tmp_path``."""

    def test_empty_manifest(self, tmp_path: pathlib.Path) -> None:
        """An empty manifest produces an empty graph."""
        graph = build_forward_graph(tmp_path, {})
        assert graph == {}

    def test_single_function_no_calls(self, tmp_path: pathlib.Path) -> None:
        """A function that calls nothing has no recorded callees."""
        src = textwrap.dedent("""\
            def standalone():
                return 42
            """)
        manifest = _write_snapshot(tmp_path, {"src/a.py": src})
        graph = build_forward_graph(tmp_path, manifest)
        # The key may be absent or map to an empty set; both are accepted.
        # Using .get() keeps that tolerance while always running the assertion.
        assert graph.get("src/a.py::standalone", frozenset()) == frozenset()

    def test_function_calls_another(self, tmp_path: pathlib.Path) -> None:
        """A direct call is recorded under the caller's ``path::name`` key."""
        src = textwrap.dedent("""\
            def helper():
                return 1

            def caller():
                return helper()
            """)
        manifest = _write_snapshot(tmp_path, {"src/a.py": src})
        graph = build_forward_graph(tmp_path, manifest)
        caller_key = "src/a.py::caller"
        assert caller_key in graph
        assert "helper" in graph[caller_key]

    def test_nested_calls(self, tmp_path: pathlib.Path) -> None:
        """Both the outer and the inner call of ``b(c())`` are recorded."""
        src = textwrap.dedent("""\
            def a():
                return b(c())

            def b(x):
                return x

            def c():
                return 1
            """)
        manifest = _write_snapshot(tmp_path, {"src/a.py": src})
        graph = build_forward_graph(tmp_path, manifest)
        a_key = "src/a.py::a"
        assert a_key in graph
        assert "b" in graph[a_key]
        assert "c" in graph[a_key]

    def test_non_python_files_skipped(self, tmp_path: pathlib.Path) -> None:
        """A manifest holding only a ``.go`` file contributes no entries."""
        go_src = "func caller() { helper() }"
        manifest = _write_snapshot(tmp_path, {"src/main.go": go_src})
        graph = build_forward_graph(tmp_path, manifest)
        assert len(graph) == 0

    def test_multiple_files(self, tmp_path: pathlib.Path) -> None:
        """Each file's functions are keyed under that file's own path."""
        src_a = textwrap.dedent("""\
            def alpha():
                return beta()
            """)
        src_b = textwrap.dedent("""\
            def beta():
                return gamma()

            def gamma():
                return 0
            """)
        manifest = _write_snapshot(tmp_path, {"src/a.py": src_a, "src/b.py": src_b})
        graph = build_forward_graph(tmp_path, manifest)
        assert "src/a.py::alpha" in graph
        assert "beta" in graph["src/a.py::alpha"]
        assert "src/b.py::beta" in graph
        assert "gamma" in graph["src/b.py::beta"]

    def test_syntax_error_file_skipped_gracefully(self, tmp_path: pathlib.Path) -> None:
        """An unparsable Python file must not make graph construction raise."""
        bad_src = "def broken(:\n pass\n"
        manifest = _write_snapshot(tmp_path, {"src/broken.py": bad_src})
        graph = build_forward_graph(tmp_path, manifest)
        # Only "did not raise and returned a dict" is pinned here.
        assert isinstance(graph, dict)

    def test_method_calls_recorded(self, tmp_path: pathlib.Path) -> None:
        """A ``self.method()`` call is recorded by its bare method name."""
        src = textwrap.dedent("""\
            class Invoice:
                def compute(self):
                    return self.apply_tax()

                def apply_tax(self):
                    return 0.1
            """)
        manifest = _write_snapshot(tmp_path, {"src/a.py": src})
        graph = build_forward_graph(tmp_path, manifest)
        compute_key = "src/a.py::Invoice.compute"
        # NOTE(review): this passes vacuously when the key is absent. Tighten
        # to an unconditional assert once the address format for methods is
        # confirmed against the implementation.
        if compute_key in graph:
            assert "apply_tax" in graph[compute_key]

    def test_forward_graph_returns_frozensets(self, tmp_path: pathlib.Path) -> None:
        """Every callee collection in the graph is a ``frozenset``."""
        src = textwrap.dedent("""\
            def f():
                return g()
            """)
        manifest = _write_snapshot(tmp_path, {"a.py": src})
        graph = build_forward_graph(tmp_path, manifest)
        # A function containing a call produces an entry (see
        # test_function_calls_another); without this guard the loop below
        # would pass trivially on an empty graph.
        assert graph, "expected at least one caller entry"
        for callee_set in graph.values():
            assert isinstance(callee_set, frozenset)
173
174
175 # ---------------------------------------------------------------------------
176 # build_reverse_graph
177 # ---------------------------------------------------------------------------
178
179
class TestBuildReverseGraph:
    """Exercise ``build_reverse_graph`` (callee name -> list of caller addresses)."""

    def test_empty_manifest(self, tmp_path: pathlib.Path) -> None:
        """An empty manifest produces an empty reverse graph."""
        rev = build_reverse_graph(tmp_path, {})
        assert rev == {}

    def test_single_caller(self, tmp_path: pathlib.Path) -> None:
        """The one caller of ``helper`` appears under the ``helper`` key."""
        src = textwrap.dedent("""\
            def helper():
                return 1

            def caller():
                return helper()
            """)
        manifest = _write_snapshot(tmp_path, {"a.py": src})
        rev = build_reverse_graph(tmp_path, manifest)
        assert "helper" in rev
        assert any("caller" in addr for addr in rev["helper"])

    def test_multiple_callers(self, tmp_path: pathlib.Path) -> None:
        """All three callers of ``core`` are listed."""
        src = textwrap.dedent("""\
            def core():
                return 0

            def a():
                return core()

            def b():
                return core()

            def c():
                return core()
            """)
        manifest = _write_snapshot(tmp_path, {"a.py": src})
        rev = build_reverse_graph(tmp_path, manifest)
        assert "core" in rev
        callers = rev["core"]
        assert len(callers) >= 3

    def test_reverse_graph_callers_sorted(self, tmp_path: pathlib.Path) -> None:
        """Caller lists come back in sorted order."""
        src = textwrap.dedent("""\
            def target():
                return 0

            def a_caller():
                return target()

            def z_caller():
                return target()
            """)
        manifest = _write_snapshot(tmp_path, {"a.py": src})
        rev = build_reverse_graph(tmp_path, manifest)
        # A called function is keyed by its bare name (see test_single_caller).
        # Asserting presence keeps the ordering check from being skipped.
        assert "target" in rev
        callers = rev["target"]
        assert callers == sorted(callers)

    def test_reverse_graph_returns_lists(self, tmp_path: pathlib.Path) -> None:
        """Every value in the reverse graph is a ``list``."""
        src = textwrap.dedent("""\
            def f():
                return g()
            def g():
                return 0
            """)
        manifest = _write_snapshot(tmp_path, {"a.py": src})
        rev = build_reverse_graph(tmp_path, manifest)
        # ``g`` is called by ``f``, so the mapping must not be empty; otherwise
        # the loop below would check nothing.
        assert rev, "expected at least one callee entry"
        for callers in rev.values():
            assert isinstance(callers, list)
246
247
248 # ---------------------------------------------------------------------------
249 # transitive_callers — returns dict[int, list[str]]
250 # ---------------------------------------------------------------------------
251
252
class TestTransitiveCallers:
    """Exercise ``transitive_callers`` on hand-built reverse graphs.

    Reverse graphs here map a bare callee name to a list of caller
    addresses of the form ``"<file>::<name>"``. Results are keyed by depth.
    """

    def test_no_callers_returns_empty(self) -> None:
        """A name with no entry in the reverse graph yields ``{}``."""
        rev: _ReverseGraph = {}
        result = transitive_callers("orphan", rev)
        assert result == {}

    def test_direct_caller_at_depth_1(self) -> None:
        """A direct caller is reported at depth 1."""
        rev: _ReverseGraph = {"target": ["a.py::caller"]}
        result = transitive_callers("target", rev)
        assert 1 in result
        assert "a.py::caller" in result[1]

    def test_two_hop_chain(self) -> None:
        """The caller of a caller is reported at depth 2."""
        rev: _ReverseGraph = {
            "target": ["a.py::caller"],
            "caller": ["a.py::grandcaller"],
        }
        result = transitive_callers("target", rev)
        assert 1 in result
        assert "a.py::caller" in result[1]
        assert 2 in result
        assert "a.py::grandcaller" in result[2]

    def test_multi_hop_chain(self) -> None:
        """A three-link chain is reported at depths 1, 2 and 3."""
        rev: _ReverseGraph = {
            "d": ["a.py::c"],
            "c": ["a.py::b"],
            "b": ["a.py::a"],
        }
        result = transitive_callers("d", rev)
        assert 1 in result
        assert "a.py::c" in result[1]
        assert 2 in result
        assert "a.py::b" in result[2]
        assert 3 in result
        assert "a.py::a" in result[3]

    def test_no_infinite_loop_on_cycle(self) -> None:
        """Mutual recursion (a <-> b) terminates and returns a dict."""
        rev: _ReverseGraph = {
            "a": ["a.py::b"],
            "b": ["a.py::a"],
        }
        result = transitive_callers("a", rev)
        assert isinstance(result, dict)

    def test_self_recursive_not_infinite(self) -> None:
        """A function listed as its own caller terminates and returns a dict."""
        rev: _ReverseGraph = {"recursive": ["a.py::recursive"]}
        result = transitive_callers("recursive", rev)
        assert isinstance(result, dict)

    def test_max_depth_limits_traversal(self) -> None:
        """``max_depth=1`` reports the direct caller and nothing deeper."""
        rev: _ReverseGraph = {
            "d": ["a.py::c"],
            "c": ["a.py::b"],
            "b": ["a.py::a"],
        }
        result = transitive_callers("d", rev, max_depth=1)
        assert 1 in result
        # Limiting depth must not drop the level that is still in range.
        assert "a.py::c" in result[1]
        assert 2 not in result
        assert 3 not in result

    def test_diamond_dependency_no_duplicate_addresses(self) -> None:
        """A caller reachable through two paths is reported exactly once."""
        rev: _ReverseGraph = {
            "a": ["x.py::b", "x.py::c"],
            "b": ["x.py::d"],
            "c": ["x.py::d"],
        }
        result = transitive_callers("a", rev)
        all_addrs = [addr for addrs in result.values() for addr in addrs]
        # x.py::d is reachable via both b and c, so it must be present (the
        # hop semantics are pinned by test_two_hop_chain) and deduplicated.
        # "<= 1" would also accept a result that lost it entirely.
        assert all_addrs.count("x.py::d") == 1

    def test_multiple_direct_callers_all_at_depth_1(self) -> None:
        """Every direct caller lands at depth 1."""
        rev: _ReverseGraph = {
            "target": ["a.py::f1", "a.py::f2", "a.py::f3"],
        }
        result = transitive_callers("target", rev)
        assert 1 in result
        assert set(result[1]) == {"a.py::f1", "a.py::f2", "a.py::f3"}

    def test_return_type_is_depth_to_list(self) -> None:
        """Keys are ``int`` depths and values are ``list`` objects."""
        rev: _ReverseGraph = {"t": ["a.py::f"]}
        result = transitive_callers("t", rev)
        # One direct caller exists, so the result cannot be empty (see
        # test_direct_caller_at_depth_1); this keeps the loop from passing
        # without checking anything.
        assert result, "expected at least one depth level"
        for depth, addrs in result.items():
            assert isinstance(depth, int)
            assert isinstance(addrs, list)