test_callgraph.py
python
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
5 days ago
| 1 | """Tests for muse/plugins/code/_callgraph.py. |
| 2 | |
| 3 | Coverage |
| 4 | -------- |
| 5 | build_forward_graph(root, manifest) |
| 6 | - Empty manifest → empty graph. |
| 7 | - Single Python file with function calls → callee names recorded. |
| 8 | - Nested calls (function calling another function). |
| 9 | - Non-Python files are skipped. |
| 10 | - Syntax error files are skipped gracefully. |
| 11 | - Method calls are recorded for class methods. |
| 12 | |
| 13 | build_reverse_graph(root, manifest) |
| 14 | - Empty manifest → empty graph. |
| 15 | - Single caller → single callee reverse mapping. |
| 16 | - Multiple callers of same function → all listed. |
| 17 | - Sorted output (deterministic). |
| 18 | |
| 19 | transitive_callers(start_name, reverse, max_depth) |
| 20 | - Direct caller only (depth 1). |
| 21 | - Transitive callers at depth 2. |
| 22 | - No callers → empty dict. |
| 23 | - Self-recursive function → terminates (no infinite loop). |
| 24 | - Multi-hop chain a→b→c→d. |
| 25 | - max_depth limits traversal. |
| 26 | - Return type is dict[int, list[str]] — depth-keyed. |
| 27 | """ |
| 28 | |
| 29 | import hashlib |
| 30 | import pathlib |
| 31 | import textwrap |
| 32 | |
| 33 | import pytest |
| 34 | |
| 35 | from muse.plugins.code._callgraph import ( |
| 36 | build_forward_graph, |
| 37 | build_reverse_graph, |
| 38 | transitive_callers, |
| 39 | ) |
| 40 | from muse.core.object_store import write_object |
| 41 | from muse.core.types import Manifest, blob_id |
| 42 | |
| 43 | type _SourceFiles = dict[str, str] |
| 44 | type _ReverseGraph = dict[str, list[str]] |
| 45 | |
| 46 | |
| 47 | # --------------------------------------------------------------------------- |
| 48 | # Helpers |
| 49 | # --------------------------------------------------------------------------- |
| 50 | |
| 51 | |
def _write_snapshot(
    tmp_path: pathlib.Path,
    files: _SourceFiles,
) -> Manifest:
    """Store each source text as an object and build the matching manifest.

    Args:
        tmp_path: Root directory passed straight through to ``write_object``.
        files: Mapping of relative path to source text.

    Returns:
        A manifest mapping every relative path in *files* to the id that
        ``blob_id`` computed for the encoded source bytes.
    """
    snapshot: Manifest = {}
    for path, text in files.items():
        payload = text.encode()
        object_id = blob_id(payload)
        # Persist the bytes first, then record the path -> id association.
        write_object(tmp_path, object_id, payload)
        snapshot[path] = object_id
    return snapshot
| 64 | |
| 65 | |
| 66 | # --------------------------------------------------------------------------- |
| 67 | # build_forward_graph |
| 68 | # --------------------------------------------------------------------------- |
| 69 | |
| 70 | |
class TestBuildForwardGraph:
    """Tests for ``build_forward_graph(root, manifest)``.

    Every test stores its sources with ``_write_snapshot`` and inspects the
    returned mapping.  The assertions below expect keys of the form
    ``"<path>::<name>"`` and values that contain bare callee names.
    """

    def test_empty_manifest(self, tmp_path: pathlib.Path) -> None:
        """An empty manifest yields an empty graph."""
        graph = build_forward_graph(tmp_path, {})
        assert graph == {}

    def test_single_function_no_calls(self, tmp_path: pathlib.Path) -> None:
        """A call-free function, if listed at all, has an empty callee set."""
        src = textwrap.dedent("""\
            def standalone():
                return 42
        """)
        manifest = _write_snapshot(tmp_path, {"src/a.py": src})
        graph = build_forward_graph(tmp_path, manifest)
        # NOTE(review): this test does not pin whether a function without
        # calls gets a key; the assertion only applies when the key exists.
        if "src/a.py::standalone" in graph:
            assert graph["src/a.py::standalone"] == frozenset()

    def test_function_calls_another(self, tmp_path: pathlib.Path) -> None:
        """A direct call is recorded under the calling function's key."""
        src = textwrap.dedent("""\
            def helper():
                return 1

            def caller():
                return helper()
        """)
        manifest = _write_snapshot(tmp_path, {"src/a.py": src})
        graph = build_forward_graph(tmp_path, manifest)
        caller_key = "src/a.py::caller"
        assert caller_key in graph
        assert "helper" in graph[caller_key]

    def test_nested_calls(self, tmp_path: pathlib.Path) -> None:
        """Both the outer and the inner call of ``b(c())`` are recorded."""
        src = textwrap.dedent("""\
            def a():
                return b(c())

            def b(x):
                return x

            def c():
                return 1
        """)
        manifest = _write_snapshot(tmp_path, {"src/a.py": src})
        graph = build_forward_graph(tmp_path, manifest)
        a_key = "src/a.py::a"
        assert a_key in graph
        assert "b" in graph[a_key]
        assert "c" in graph[a_key]

    def test_non_python_files_skipped(self, tmp_path: pathlib.Path) -> None:
        """A manifest containing only a ``.go`` file produces no entries."""
        go_src = "func caller() { helper() }"
        manifest = _write_snapshot(tmp_path, {"src/main.go": go_src})
        graph = build_forward_graph(tmp_path, manifest)
        assert not graph

    def test_multiple_files(self, tmp_path: pathlib.Path) -> None:
        """Functions from different files each get their own keyed entry."""
        src_a = textwrap.dedent("""\
            def alpha():
                return beta()
        """)
        src_b = textwrap.dedent("""\
            def beta():
                return gamma()

            def gamma():
                return 0
        """)
        manifest = _write_snapshot(tmp_path, {"src/a.py": src_a, "src/b.py": src_b})
        graph = build_forward_graph(tmp_path, manifest)
        assert "src/a.py::alpha" in graph
        assert "beta" in graph["src/a.py::alpha"]
        assert "src/b.py::beta" in graph
        assert "gamma" in graph["src/b.py::beta"]

    def test_syntax_error_file_skipped_gracefully(self, tmp_path: pathlib.Path) -> None:
        """A file that does not parse must not make the build raise."""
        bad_src = "def broken(:\n pass\n"
        manifest = _write_snapshot(tmp_path, {"src/broken.py": bad_src})
        graph = build_forward_graph(tmp_path, manifest)
        # Reaching this line already shows that no exception escaped.
        assert isinstance(graph, dict)

    def test_method_calls_recorded(self, tmp_path: pathlib.Path) -> None:
        """A ``self.method()`` call is recorded under the bare method name."""
        src = textwrap.dedent("""\
            class Invoice:
                def compute(self):
                    return self.apply_tax()

                def apply_tax(self):
                    return 0.1
        """)
        manifest = _write_snapshot(tmp_path, {"src/a.py": src})
        graph = build_forward_graph(tmp_path, manifest)
        compute_key = "src/a.py::Invoice.compute"
        # NOTE(review): the "Class.method" key format is assumed, not pinned;
        # if the key is absent this test checks nothing -- confirm the format
        # against the implementation and make the assertion unconditional.
        if compute_key in graph:
            assert "apply_tax" in graph[compute_key]

    def test_forward_graph_returns_frozensets(self, tmp_path: pathlib.Path) -> None:
        """Every value of the forward graph is a ``frozenset``."""
        src = textwrap.dedent("""\
            def f():
                return g()
        """)
        manifest = _write_snapshot(tmp_path, {"a.py": src})
        graph = build_forward_graph(tmp_path, manifest)
        # Without this check an empty graph would let the loop below pass
        # vacuously; `f` calls `g`, so at least one entry is expected.
        assert graph
        for callee_set in graph.values():
            assert isinstance(callee_set, frozenset)
| 173 | |
| 174 | |
| 175 | # --------------------------------------------------------------------------- |
| 176 | # build_reverse_graph |
| 177 | # --------------------------------------------------------------------------- |
| 178 | |
| 179 | |
class TestBuildReverseGraph:
    """Tests for ``build_reverse_graph(root, manifest)``.

    The assertions below expect a mapping from a bare callee name to a list
    of caller addresses.
    """

    def test_empty_manifest(self, tmp_path: pathlib.Path) -> None:
        """An empty manifest yields an empty reverse graph."""
        rev = build_reverse_graph(tmp_path, {})
        assert rev == {}

    def test_single_caller(self, tmp_path: pathlib.Path) -> None:
        """The one caller of ``helper`` is listed under the ``helper`` key."""
        src = textwrap.dedent("""\
            def helper():
                return 1

            def caller():
                return helper()
        """)
        manifest = _write_snapshot(tmp_path, {"a.py": src})
        rev = build_reverse_graph(tmp_path, manifest)
        assert "helper" in rev
        assert any("caller" in addr for addr in rev["helper"])

    def test_multiple_callers(self, tmp_path: pathlib.Path) -> None:
        """All three callers of ``core`` are listed."""
        src = textwrap.dedent("""\
            def core():
                return 0

            def a():
                return core()

            def b():
                return core()

            def c():
                return core()
        """)
        manifest = _write_snapshot(tmp_path, {"a.py": src})
        rev = build_reverse_graph(tmp_path, manifest)
        assert "core" in rev
        callers = rev["core"]
        assert len(callers) >= 3

    def test_reverse_graph_callers_sorted(self, tmp_path: pathlib.Path) -> None:
        """The caller list of a callee is in sorted order."""
        src = textwrap.dedent("""\
            def target():
                return 0

            def a_caller():
                return target()

            def z_caller():
                return target()
        """)
        manifest = _write_snapshot(tmp_path, {"a.py": src})
        rev = build_reverse_graph(tmp_path, manifest)
        # Assert presence instead of guarding with `if`: a missing key would
        # otherwise make this test pass without checking anything.
        assert "target" in rev
        callers = rev["target"]
        # Two callers are needed for the ordering check to be meaningful.
        assert len(callers) >= 2
        assert callers == sorted(callers)

    def test_reverse_graph_returns_lists(self, tmp_path: pathlib.Path) -> None:
        """Every value of the reverse graph is a ``list``."""
        src = textwrap.dedent("""\
            def f():
                return g()
            def g():
                return 0
        """)
        manifest = _write_snapshot(tmp_path, {"a.py": src})
        rev = build_reverse_graph(tmp_path, manifest)
        # `f` calls `g`, so the loop below must have an entry to inspect.
        assert "g" in rev
        for callers in rev.values():
            assert isinstance(callers, list)
| 246 | |
| 247 | |
| 248 | # --------------------------------------------------------------------------- |
| 249 | # transitive_callers — returns dict[int, list[str]] |
| 250 | # --------------------------------------------------------------------------- |
| 251 | |
| 252 | |
class TestTransitiveCallers:
    """Tests for ``transitive_callers(start_name, reverse, max_depth)``.

    The reverse graphs are built by hand: keys are bare callee names and
    values are caller addresses such as ``"a.py::caller"``.  The result is
    expected to map a depth (1 = direct caller) to a list of addresses.
    """

    def test_no_callers_returns_empty(self) -> None:
        """A name that is absent from the reverse graph has no callers."""
        rev: _ReverseGraph = {}
        result = transitive_callers("orphan", rev)
        assert result == {}

    def test_direct_caller_at_depth_1(self) -> None:
        """A direct caller is reported at depth 1."""
        rev: _ReverseGraph = {"target": ["a.py::caller"]}
        result = transitive_callers("target", rev)
        assert 1 in result
        assert "a.py::caller" in result[1]

    def test_two_hop_chain(self) -> None:
        """The caller of a caller is reported at depth 2."""
        rev: _ReverseGraph = {
            "target": ["a.py::caller"],
            "caller": ["a.py::grandcaller"],
        }
        result = transitive_callers("target", rev)
        assert 1 in result
        assert "a.py::caller" in result[1]
        assert 2 in result
        assert "a.py::grandcaller" in result[2]

    def test_multi_hop_chain(self) -> None:
        """For the call chain a -> b -> c -> d, callers of ``d`` span depths 1-3."""
        rev: _ReverseGraph = {
            "d": ["a.py::c"],
            "c": ["a.py::b"],
            "b": ["a.py::a"],
        }
        result = transitive_callers("d", rev)
        assert 1 in result
        assert "a.py::c" in result[1]
        assert 2 in result
        assert "a.py::b" in result[2]
        assert 3 in result
        assert "a.py::a" in result[3]

    def test_no_infinite_loop_on_cycle(self) -> None:
        """A two-node cycle must not prevent the traversal from returning."""
        rev: _ReverseGraph = {
            "a": ["a.py::b"],
            "b": ["a.py::a"],
        }
        result = transitive_callers("a", rev)
        # Reaching this line is the real check: the call returned.
        assert isinstance(result, dict)

    def test_self_recursive_not_infinite(self) -> None:
        """A function listed as its own caller must not loop forever."""
        rev: _ReverseGraph = {"recursive": ["a.py::recursive"]}
        result = transitive_callers("recursive", rev)
        # Reaching this line is the real check: the call returned.
        assert isinstance(result, dict)

    def test_max_depth_limits_traversal(self) -> None:
        """With ``max_depth=1`` only depth 1 is reported."""
        rev: _ReverseGraph = {
            "d": ["a.py::c"],
            "c": ["a.py::b"],
            "b": ["a.py::a"],
        }
        result = transitive_callers("d", rev, max_depth=1)
        assert 1 in result
        assert 2 not in result
        assert 3 not in result

    def test_diamond_dependency_no_duplicate_addresses(self) -> None:
        """An address reachable through two paths is not reported twice."""
        rev: _ReverseGraph = {
            "a": ["x.py::b", "x.py::c"],
            "b": ["x.py::d"],
            "c": ["x.py::d"],
        }
        result = transitive_callers("a", rev)
        all_addrs: list[str] = [
            addr for addrs in result.values() for addr in addrs
        ]
        # x.py::d should appear at most once (visited set prevents duplicates).
        assert all_addrs.count("x.py::d") <= 1

    def test_multiple_direct_callers_all_at_depth_1(self) -> None:
        """All direct callers are reported together at depth 1."""
        rev: _ReverseGraph = {
            "target": ["a.py::f1", "a.py::f2", "a.py::f3"],
        }
        result = transitive_callers("target", rev)
        assert 1 in result
        assert set(result[1]) == {"a.py::f1", "a.py::f2", "a.py::f3"}

    def test_return_type_is_depth_to_list(self) -> None:
        """Keys of the result are ``int`` depths and values are ``list``s."""
        rev: _ReverseGraph = {"t": ["a.py::f"]}
        result = transitive_callers("t", rev)
        # An empty result would let the loop below pass without checking
        # anything; `t` has a direct caller, so an entry is expected.
        assert result
        for depth, addrs in result.items():
            assert isinstance(depth, int)
            assert isinstance(addrs, list)
File History
1 commit
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
5 days ago