gabriel / muse public
test_framework_plugins.py python
873 lines 31.2 KB
Raw
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 7 days ago
1 """Tests for muse/plugins/code/_framework.py.
2
3 Coverage
4 --------
5 ImplicitEntryEdge
6 - Immutable dataclass (frozen=True).
7 - Equality comparison uses all fields.
8
9 FrameworkPlugin protocol
10 - All three built-in plugins satisfy the Protocol at runtime.
11 - _CustomPatternPlugin satisfies the Protocol at runtime.
12
13 _FastAPIPlugin.detect_entry_points
14 - @router.get / .post / .put / .delete / .patch recognised.
15 - @router.head / .options / .trace recognised.
16 - Path extracted from first positional argument.
17 - Method stored in UPPER CASE.
18 - No-path decorator still produces an edge (path="").
19 - Non-HTTP attribute (@router.dependency) ignored.
20 - Non-Python source → empty list.
21 - Syntax error → empty list.
22 - Symbol not in sym_tree → no edge emitted.
23 - Class method decorated with @router.get → edge with qualified address.
24
25 _FlaskPlugin.detect_entry_points
26 - @app.route → kind "http-route", methods from kwarg.
27 - @app.route with no methods kwarg → defaults to ["GET"].
28 - @app.before_request (bare) → kind "lifecycle-hook".
29 - @app.before_request() (called) → kind "lifecycle-hook".
30 - @app.errorhandler(404) → kind "error-handler".
31 - Non-Flask attribute ignored.
32
33 _CeleryPlugin.detect_entry_points
34 - @shared_task (bare Name) → kind "task".
35 - @shared_task() (called Name) → kind "task".
36 - @app.task (bare Attribute) → kind "task".
37 - @app.task() (called Attribute) → kind "task".
38 - @app.periodic_task() → kind "periodic-task".
39 - Unrelated decorator ignored.
40
41 _CustomPatternPlugin.detect_entry_points
42 - Custom decorator name → edge with provided kind.
43 - Non-matching decorator → no edge.
44 - Only Python rules applied.
45
46 build_implicit_edge_graph
47 - Empty manifest → empty graph.
48 - Single file with FastAPI handler → address in graph.
49 - Multiple files, multiple plugins → all edges collected.
50 - Non-Python files ignored.
51 - Syntax error in file → skipped gracefully.
52 - auto_detect=False → empty graph.
53 - disabled_plugins excludes matching plugin.
54 - Custom rule via FrameworkConfig applied.
55
56 load_framework_config
57 - Missing file → FrameworkConfig defaults.
58 - Valid TOML section → fields parsed.
59 - Malformed TOML → defaults returned (no crash).
60 - Missing [framework_detection] section → defaults.
61 - disabled_plugins stored as frozenset.
62 - custom_entry_points parsed into _CustomEntryPointRule.
63
64 Integration
65 - FastAPI handler excluded from dead-code analysis.
66 - Celery task excluded from dead-code analysis.
67 """
68
69 from __future__ import annotations
70
71 import dataclasses
72 import hashlib
73 import pathlib
74 import textwrap
75
76 import pytest
77
78 from muse.core.object_store import write_object
79 from muse.core.types import Manifest, blob_id
80 from muse.plugins.code.ast_parser import SymbolTree, parse_symbols
81 from muse.core.paths import muse_dir
82 from muse.plugins.code._framework import (
83 BUILTIN_PLUGINS,
84 FrameworkConfig,
85 FrameworkPlugin,
86 ImplicitEntryEdge,
87 _CeleryPlugin,
88 _CustomEntryPointRule,
89 _CustomPatternPlugin,
90 _FastAPIPlugin,
91 _FlaskPlugin,
92 build_implicit_edge_graph,
93 load_framework_config,
94 )
95
96
97 # ---------------------------------------------------------------------------
98 # Helpers
99 # ---------------------------------------------------------------------------
100
101
102 type _SourceMap = dict[str, str]
103
104
105 def _write_snapshot(tmp_path: pathlib.Path, files: _SourceMap) -> Manifest:
106 """Write source files into the object store and return a manifest."""
107 manifest: Manifest = {}
108 for rel_path, source in files.items():
109 blob = source.encode()
110 oid = blob_id(blob)
111 write_object(tmp_path, oid, blob)
112 manifest[rel_path] = oid
113 return manifest
114
115
116 def _sym_tree(source: str, file_path: str) -> SymbolTree:
117 """Parse a source string into a SymbolTree."""
118 return parse_symbols(source.encode(), file_path)
119
120
121 # ---------------------------------------------------------------------------
122 # ImplicitEntryEdge
123 # ---------------------------------------------------------------------------
124
125
126 class TestImplicitEntryEdge:
127 def test_is_frozen(self) -> None:
128 edge = ImplicitEntryEdge(
129 framework_id="fastapi",
130 symbol_address="app/routes.py::create_item",
131 kind="http-route",
132 metadata={"method": "POST", "path": "/items"},
133 )
134 with pytest.raises((AttributeError, TypeError, dataclasses.FrozenInstanceError)):
135 setattr(edge, "framework_id", "other")
136
137 def test_equality_uses_all_fields(self) -> None:
138 e1 = ImplicitEntryEdge("fastapi", "f.py::fn", "http-route", {"method": "GET"})
139 e2 = ImplicitEntryEdge("fastapi", "f.py::fn", "http-route", {"method": "GET"})
140 e3 = ImplicitEntryEdge("fastapi", "f.py::fn", "http-route", {"method": "POST"})
141 assert e1 == e2
142 assert e1 != e3
143
144 def test_default_metadata_is_empty_dict(self) -> None:
145 edge = ImplicitEntryEdge(
146 framework_id="celery",
147 symbol_address="tasks.py::send_email",
148 kind="task",
149 )
150 assert edge.metadata == {}
151
152
153 # ---------------------------------------------------------------------------
154 # FrameworkPlugin protocol — runtime-checkable
155 # ---------------------------------------------------------------------------
156
157
158 class TestFrameworkPluginProtocol:
159 def test_builtin_plugins_satisfy_protocol(self) -> None:
160 for plugin in BUILTIN_PLUGINS:
161 assert isinstance(plugin, FrameworkPlugin), (
162 f"{type(plugin).__name__} does not satisfy FrameworkPlugin protocol"
163 )
164
165 def test_custom_pattern_plugin_satisfies_protocol(self) -> None:
166 plugin = _CustomPatternPlugin([])
167 assert isinstance(plugin, FrameworkPlugin)
168
169
170 # ---------------------------------------------------------------------------
171 # _FastAPIPlugin
172 # ---------------------------------------------------------------------------
173
174
175 class TestFastAPIPlugin:
176 plugin = _FastAPIPlugin()
177
178 def _edges(self, source: str, file_path: str = "app/routes.py") -> list[ImplicitEntryEdge]:
179 sym_tree = _sym_tree(source, file_path)
180 return self.plugin.detect_entry_points(file_path, sym_tree, source.encode())
181
182 def test_get_route(self) -> None:
183 src = textwrap.dedent("""\
184 @router.get("/items")
185 async def list_items():
186 return []
187 """)
188 edges = self._edges(src)
189 assert len(edges) == 1
190 assert edges[0].kind == "http-route"
191 assert edges[0].metadata["method"] == "GET"
192 assert edges[0].metadata["path"] == "/items"
193 assert edges[0].framework_id == "fastapi"
194
195 def test_post_route(self) -> None:
196 src = textwrap.dedent("""\
197 @router.post("/items")
198 def create_item():
199 pass
200 """)
201 edges = self._edges(src)
202 assert len(edges) == 1
203 assert edges[0].metadata["method"] == "POST"
204
205 def test_put_delete_patch_routes(self) -> None:
206 src = textwrap.dedent("""\
207 @router.put("/items/{id}")
208 def update_item(): pass
209
210 @router.delete("/items/{id}")
211 def delete_item(): pass
212
213 @router.patch("/items/{id}")
214 def patch_item(): pass
215 """)
216 edges = self._edges(src)
217 methods = {e.metadata["method"] for e in edges}
218 assert methods == {"PUT", "DELETE", "PATCH"}
219
220 def test_head_options_trace(self) -> None:
221 src = textwrap.dedent("""\
222 @router.head("/ping")
223 def ping_head(): pass
224
225 @router.options("/ping")
226 def ping_options(): pass
227
228 @router.trace("/ping")
229 def ping_trace(): pass
230 """)
231 edges = self._edges(src)
232 methods = {e.metadata["method"] for e in edges}
233 assert "HEAD" in methods
234 assert "OPTIONS" in methods
235 assert "TRACE" in methods
236
237 def test_path_extracted_from_first_arg(self) -> None:
238 src = textwrap.dedent("""\
239 @api_router.get("/api/v1/users", tags=["users"])
240 def get_users(): pass
241 """)
242 edges = self._edges(src)
243 assert edges[0].metadata["path"] == "/api/v1/users"
244
245 def test_no_path_arg_gives_empty_string(self) -> None:
246 src = textwrap.dedent("""\
247 @router.get()
248 def no_path(): pass
249 """)
250 edges = self._edges(src)
251 assert len(edges) == 1
252 assert edges[0].metadata["path"] == ""
253
254 def test_non_http_attribute_ignored(self) -> None:
255 src = textwrap.dedent("""\
256 @router.dependency
257 def get_db(): pass
258 """)
259 edges = self._edges(src)
260 assert edges == []
261
262 def test_bare_decorator_not_a_call_ignored(self) -> None:
263 # @router.get without parens — not a Call node in AST
264 src = textwrap.dedent("""\
265 @router.get
266 def index(): pass
267 """)
268 edges = self._edges(src)
269 assert edges == []
270
271 def test_syntax_error_returns_empty(self) -> None:
272 src = "def broken(:\n pass\n"
273 edges = self.plugin.detect_entry_points("f.py", {}, src.encode())
274 assert edges == []
275
276 def test_symbol_not_in_tree_returns_no_edge(self) -> None:
277 # Provide an empty sym_tree — _resolve_address will return None.
278 src = textwrap.dedent("""\
279 @router.get("/items")
280 def list_items(): pass
281 """)
282 edges = self.plugin.detect_entry_points("app/routes.py", {}, src.encode())
283 assert edges == []
284
285 def test_multiple_routes_in_one_file(self) -> None:
286 src = textwrap.dedent("""\
287 @router.get("/a")
288 def get_a(): pass
289
290 @router.post("/b")
291 def post_b(): pass
292 """)
293 edges = self._edges(src)
294 assert len(edges) == 2
295
296 def test_method_stored_uppercase(self) -> None:
297 src = textwrap.dedent("""\
298 @router.get("/x")
299 def get_x(): pass
300 """)
301 edges = self._edges(src)
302 assert edges[0].metadata["method"] == "GET"
303
304 def test_address_uses_file_path(self) -> None:
305 src = textwrap.dedent("""\
306 @router.get("/runs")
307 async def list_runs(): pass
308 """)
309 edges = self._edges(src, file_path="server/app/routers/runs.py")
310 assert edges[0].symbol_address.startswith("server/app/routers/runs.py::")
311
312
313 # ---------------------------------------------------------------------------
314 # _FlaskPlugin
315 # ---------------------------------------------------------------------------
316
317
318 class TestFlaskPlugin:
319 plugin = _FlaskPlugin()
320
321 def _edges(self, source: str, file_path: str = "app/views.py") -> list[ImplicitEntryEdge]:
322 sym_tree = _sym_tree(source, file_path)
323 return self.plugin.detect_entry_points(file_path, sym_tree, source.encode())
324
325 def test_route_with_methods_kwarg(self) -> None:
326 src = textwrap.dedent("""\
327 @app.route("/users", methods=["GET", "POST"])
328 def users(): pass
329 """)
330 edges = self._edges(src)
331 assert len(edges) == 1
332 assert edges[0].kind == "http-route"
333 assert "GET" in edges[0].metadata["method"]
334 assert "POST" in edges[0].metadata["method"]
335
336 def test_route_no_methods_defaults_to_get(self) -> None:
337 src = textwrap.dedent("""\
338 @app.route("/index")
339 def index(): pass
340 """)
341 edges = self._edges(src)
342 assert len(edges) == 1
343 assert "GET" in edges[0].metadata["method"]
344
345 def test_route_path_extracted(self) -> None:
346 src = textwrap.dedent("""\
347 @bp.route("/dashboard")
348 def dashboard(): pass
349 """)
350 edges = self._edges(src)
351 assert edges[0].metadata["path"] == "/dashboard"
352
353 def test_before_request_bare(self) -> None:
354 src = textwrap.dedent("""\
355 @app.before_request
356 def setup(): pass
357 """)
358 edges = self._edges(src)
359 assert len(edges) == 1
360 assert edges[0].kind == "lifecycle-hook"
361 assert edges[0].metadata.get("hook") == "before_request"
362
363 def test_before_request_called(self) -> None:
364 src = textwrap.dedent("""\
365 @app.before_request()
366 def setup(): pass
367 """)
368 edges = self._edges(src)
369 assert len(edges) == 1
370 assert edges[0].kind == "lifecycle-hook"
371
372 def test_after_request(self) -> None:
373 src = textwrap.dedent("""\
374 @app.after_request
375 def add_headers(response): pass
376 """)
377 edges = self._edges(src)
378 assert any(e.kind == "lifecycle-hook" for e in edges)
379
380 def test_errorhandler(self) -> None:
381 src = textwrap.dedent("""\
382 @app.errorhandler(404)
383 def not_found(e): pass
384 """)
385 edges = self._edges(src)
386 assert len(edges) == 1
387 assert edges[0].kind == "error-handler"
388
389 def test_non_flask_attribute_ignored(self) -> None:
390 src = textwrap.dedent("""\
391 @app.middleware
392 def mw(): pass
393 """)
394 edges = self._edges(src)
395 assert edges == []
396
397 def test_syntax_error_returns_empty(self) -> None:
398 src = "def bad(:\n pass\n"
399 edges = self.plugin.detect_entry_points("v.py", {}, src.encode())
400 assert edges == []
401
402 def test_framework_id_is_flask(self) -> None:
403 src = textwrap.dedent("""\
404 @app.route("/")
405 def root(): pass
406 """)
407 edges = self._edges(src)
408 assert edges[0].framework_id == "flask"
409
410
411 # ---------------------------------------------------------------------------
412 # _CeleryPlugin
413 # ---------------------------------------------------------------------------
414
415
416 class TestCeleryPlugin:
417 plugin = _CeleryPlugin()
418
419 def _edges(self, source: str, file_path: str = "tasks/email.py") -> list[ImplicitEntryEdge]:
420 sym_tree = _sym_tree(source, file_path)
421 return self.plugin.detect_entry_points(file_path, sym_tree, source.encode())
422
423 def test_shared_task_bare_name(self) -> None:
424 src = textwrap.dedent("""\
425 @shared_task
426 def send_email(): pass
427 """)
428 edges = self._edges(src)
429 assert len(edges) == 1
430 assert edges[0].kind == "task"
431 assert edges[0].framework_id == "celery"
432
433 def test_shared_task_called(self) -> None:
434 src = textwrap.dedent("""\
435 @shared_task(bind=True)
436 def send_email(self): pass
437 """)
438 edges = self._edges(src)
439 assert len(edges) == 1
440 assert edges[0].kind == "task"
441
442 def test_app_task_bare_attribute(self) -> None:
443 src = textwrap.dedent("""\
444 @celery.task
445 def process(): pass
446 """)
447 edges = self._edges(src)
448 assert len(edges) == 1
449 assert edges[0].kind == "task"
450
451 def test_app_task_called_attribute(self) -> None:
452 src = textwrap.dedent("""\
453 @app.task(queue="high")
454 def urgent(): pass
455 """)
456 edges = self._edges(src)
457 assert len(edges) == 1
458 assert edges[0].kind == "task"
459
460 def test_periodic_task_called(self) -> None:
461 src = textwrap.dedent("""\
462 @app.periodic_task(run_every=60)
463 def heartbeat(): pass
464 """)
465 edges = self._edges(src)
466 assert len(edges) == 1
467 assert edges[0].kind == "periodic-task"
468
469 def test_periodic_task_bare(self) -> None:
470 src = textwrap.dedent("""\
471 @app.periodic_task
472 def tick(): pass
473 """)
474 edges = self._edges(src)
475 assert len(edges) == 1
476 assert edges[0].kind == "periodic-task"
477
478 def test_unrelated_decorator_ignored(self) -> None:
479 src = textwrap.dedent("""\
480 @login_required
481 def secure(): pass
482 """)
483 edges = self._edges(src)
484 assert edges == []
485
486 def test_syntax_error_returns_empty(self) -> None:
487 edges = self.plugin.detect_entry_points("t.py", {}, b"def broken(:\n pass\n")
488 assert edges == []
489
490 def test_framework_id_is_celery(self) -> None:
491 src = textwrap.dedent("""\
492 @shared_task
493 def work(): pass
494 """)
495 edges = self._edges(src)
496 assert edges[0].framework_id == "celery"
497
498
499 # ---------------------------------------------------------------------------
500 # _CustomPatternPlugin
501 # ---------------------------------------------------------------------------
502
503
504 class TestCustomPatternPlugin:
505 def _make_plugin(self, rules: list[_CustomEntryPointRule]) -> _CustomPatternPlugin:
506 return _CustomPatternPlugin(rules)
507
508 def test_custom_decorator_produces_edge(self) -> None:
509 rule = _CustomEntryPointRule(
510 language="Python",
511 kind="rpc-handler",
512 decorator_names=["rpc_handler"],
513 )
514 plugin = self._make_plugin([rule])
515 src = textwrap.dedent("""\
516 @rpc_handler
517 def handle_request(): pass
518 """)
519 sym_tree = _sym_tree(src, "svc/rpc.py")
520 edges = plugin.detect_entry_points("svc/rpc.py", sym_tree, src.encode())
521 assert len(edges) == 1
522 assert edges[0].kind == "rpc-handler"
523 assert edges[0].framework_id == "custom"
524
525 def test_called_form_also_recognised(self) -> None:
526 rule = _CustomEntryPointRule(
527 language="Python",
528 kind="webhook",
529 decorator_names=["webhook_handler"],
530 )
531 plugin = self._make_plugin([rule])
532 src = textwrap.dedent("""\
533 @webhook_handler(event="push")
534 def on_push(): pass
535 """)
536 sym_tree = _sym_tree(src, "hooks.py")
537 edges = plugin.detect_entry_points("hooks.py", sym_tree, src.encode())
538 assert len(edges) == 1
539 assert edges[0].kind == "webhook"
540
541 def test_non_matching_decorator_ignored(self) -> None:
542 rule = _CustomEntryPointRule(
543 language="Python",
544 kind="grpc",
545 decorator_names=["grpc_handler"],
546 )
547 plugin = self._make_plugin([rule])
548 src = textwrap.dedent("""\
549 @login_required
550 def view(): pass
551 """)
552 sym_tree = _sym_tree(src, "views.py")
553 edges = plugin.detect_entry_points("views.py", sym_tree, src.encode())
554 assert edges == []
555
556 def test_non_python_rules_excluded(self) -> None:
557 rule = _CustomEntryPointRule(
558 language="TypeScript",
559 kind="ts-handler",
560 decorator_names=["Handler"],
561 )
562 plugin = self._make_plugin([rule])
563 # Plugin should have no Python rules, so no edges.
564 src = textwrap.dedent("""\
565 @Handler
566 def fn(): pass
567 """)
568 sym_tree = _sym_tree(src, "f.py")
569 edges = plugin.detect_entry_points("f.py", sym_tree, src.encode())
570 assert edges == []
571
572 def test_empty_rules_returns_empty(self) -> None:
573 plugin = self._make_plugin([])
574 src = textwrap.dedent("""\
575 @anything
576 def fn(): pass
577 """)
578 sym_tree = _sym_tree(src, "f.py")
579 edges = plugin.detect_entry_points("f.py", sym_tree, src.encode())
580 assert edges == []
581
582 def test_metadata_includes_decorator_name(self) -> None:
583 rule = _CustomEntryPointRule(
584 language="Python",
585 kind="job",
586 decorator_names=["schedule_job"],
587 )
588 plugin = self._make_plugin([rule])
589 src = textwrap.dedent("""\
590 @schedule_job
591 def nightly(): pass
592 """)
593 sym_tree = _sym_tree(src, "jobs.py")
594 edges = plugin.detect_entry_points("jobs.py", sym_tree, src.encode())
595 assert edges[0].metadata.get("decorator") == "schedule_job"
596
597
598 # ---------------------------------------------------------------------------
599 # build_implicit_edge_graph
600 # ---------------------------------------------------------------------------
601
602
603 class TestBuildImplicitEdgeGraph:
604 def test_empty_manifest_returns_empty(self, tmp_path: pathlib.Path) -> None:
605 config = FrameworkConfig()
606 result = build_implicit_edge_graph(tmp_path, {}, config=config)
607 assert result == {}
608
609 def test_fastapi_handler_detected(self, tmp_path: pathlib.Path) -> None:
610 src = textwrap.dedent("""\
611 @router.get("/runs")
612 async def list_runs(): pass
613 """)
614 manifest = _write_snapshot(tmp_path, {"server/routers/runs.py": src})
615 config = FrameworkConfig()
616 result = build_implicit_edge_graph(tmp_path, manifest, config=config)
617 assert any("list_runs" in addr for addr in result)
618
619 def test_celery_task_detected(self, tmp_path: pathlib.Path) -> None:
620 src = textwrap.dedent("""\
621 @shared_task
622 def send_notification(): pass
623 """)
624 manifest = _write_snapshot(tmp_path, {"tasks/notify.py": src})
625 config = FrameworkConfig()
626 result = build_implicit_edge_graph(tmp_path, manifest, config=config)
627 assert any("send_notification" in addr for addr in result)
628
629 def test_flask_handler_detected(self, tmp_path: pathlib.Path) -> None:
630 src = textwrap.dedent("""\
631 @app.route("/dashboard")
632 def dashboard(): pass
633 """)
634 manifest = _write_snapshot(tmp_path, {"web/views.py": src})
635 config = FrameworkConfig()
636 result = build_implicit_edge_graph(tmp_path, manifest, config=config)
637 assert any("dashboard" in addr for addr in result)
638
639 def test_non_python_file_ignored(self, tmp_path: pathlib.Path) -> None:
640 go_src = "func handler() {}\n"
641 manifest = _write_snapshot(tmp_path, {"main.go": go_src})
642 config = FrameworkConfig()
643 result = build_implicit_edge_graph(tmp_path, manifest, config=config)
644 assert result == {}
645
646 def test_syntax_error_file_skipped_gracefully(self, tmp_path: pathlib.Path) -> None:
647 bad_src = "def broken(:\n pass\n"
648 manifest = _write_snapshot(tmp_path, {"bad.py": bad_src})
649 config = FrameworkConfig()
650 # Must not raise.
651 result = build_implicit_edge_graph(tmp_path, manifest, config=config)
652 assert isinstance(result, dict)
653
654 def test_auto_detect_false_returns_empty(self, tmp_path: pathlib.Path) -> None:
655 src = textwrap.dedent("""\
656 @router.get("/x")
657 def get_x(): pass
658 """)
659 manifest = _write_snapshot(tmp_path, {"routes.py": src})
660 config = FrameworkConfig(auto_detect=False)
661 result = build_implicit_edge_graph(tmp_path, manifest, config=config)
662 assert result == {}
663
664 def test_disabled_plugins_excluded(self, tmp_path: pathlib.Path) -> None:
665 src = textwrap.dedent("""\
666 @router.get("/items")
667 async def list_items(): pass
668 """)
669 manifest = _write_snapshot(tmp_path, {"routes.py": src})
670 # Disable FastAPI; FlaskPlugin and CeleryPlugin won't match this source.
671 config = FrameworkConfig(disabled_plugins=frozenset({"fastapi"}))
672 result = build_implicit_edge_graph(tmp_path, manifest, config=config)
673 # No edge should appear since fastapi is the only matching plugin.
674 assert not any("list_items" in addr for addr in result)
675
676 def test_multiple_files_multiple_plugins(self, tmp_path: pathlib.Path) -> None:
677 fastapi_src = textwrap.dedent("""\
678 @router.post("/orders")
679 def create_order(): pass
680 """)
681 celery_src = textwrap.dedent("""\
682 @shared_task
683 def process_order(): pass
684 """)
685 manifest = _write_snapshot(tmp_path, {
686 "api/routes.py": fastapi_src,
687 "workers/tasks.py": celery_src,
688 })
689 config = FrameworkConfig()
690 result = build_implicit_edge_graph(tmp_path, manifest, config=config)
691 addrs = set(result.keys())
692 assert any("create_order" in a for a in addrs)
693 assert any("process_order" in a for a in addrs)
694
695 def test_custom_rule_in_config(self, tmp_path: pathlib.Path) -> None:
696 src = textwrap.dedent("""\
697 @grpc_handler
698 def serve_request(): pass
699 """)
700 manifest = _write_snapshot(tmp_path, {"svc/grpc.py": src})
701 rule = _CustomEntryPointRule(
702 language="Python", kind="grpc", decorator_names=["grpc_handler"]
703 )
704 config = FrameworkConfig(custom_entry_points=[rule])
705 result = build_implicit_edge_graph(tmp_path, manifest, config=config)
706 assert any("serve_request" in addr for addr in result)
707 # Kind should be grpc
708 edges = [e for edges in result.values() for e in edges]
709 assert any(e.kind == "grpc" for e in edges)
710
711 def test_all_disabled_returns_empty(self, tmp_path: pathlib.Path) -> None:
712 src = textwrap.dedent("""\
713 @router.get("/x")
714 async def get_x(): pass
715 """)
716 manifest = _write_snapshot(tmp_path, {"r.py": src})
717 config = FrameworkConfig(disabled_plugins=frozenset({"fastapi", "flask", "celery"}))
718 result = build_implicit_edge_graph(tmp_path, manifest, config=config)
719 assert result == {}
720
721 def test_graph_maps_address_to_edges(self, tmp_path: pathlib.Path) -> None:
722 src = textwrap.dedent("""\
723 @router.get("/x")
724 async def get_x(): pass
725 """)
726 manifest = _write_snapshot(tmp_path, {"routes.py": src})
727 config = FrameworkConfig()
728 result = build_implicit_edge_graph(tmp_path, manifest, config=config)
729 for addr, edges in result.items():
730 assert isinstance(addr, str)
731 assert isinstance(edges, list)
732 for e in edges:
733 assert isinstance(e, ImplicitEntryEdge)
734
735
736 # ---------------------------------------------------------------------------
737 # load_framework_config
738 # ---------------------------------------------------------------------------
739
740
741 class TestLoadFrameworkConfig:
742 def test_missing_file_returns_defaults(self, tmp_path: pathlib.Path) -> None:
743 config = load_framework_config(tmp_path)
744 assert config.auto_detect is True
745 assert config.disabled_plugins == frozenset()
746 assert config.custom_entry_points == []
747
748 def test_valid_toml_parsed(self, tmp_path: pathlib.Path) -> None:
749 dot_muse = muse_dir(tmp_path)
750 dot_muse.mkdir()
751 toml_content = textwrap.dedent("""\
752 [framework_detection]
753 auto_detect = true
754 disabled_plugins = ["celery"]
755
756 [[framework_detection.custom_entry_points]]
757 language = "Python"
758 kind = "grpc"
759 decorator_names = ["grpc_method"]
760 """)
761 (dot_muse / "code_config.toml").write_text(toml_content)
762 config = load_framework_config(tmp_path)
763 assert config.auto_detect is True
764 assert "celery" in config.disabled_plugins
765 assert len(config.custom_entry_points) == 1
766 assert config.custom_entry_points[0].kind == "grpc"
767 assert "grpc_method" in config.custom_entry_points[0].decorator_names
768
769 def test_auto_detect_false_parsed(self, tmp_path: pathlib.Path) -> None:
770 dot_muse = muse_dir(tmp_path)
771 dot_muse.mkdir()
772 (dot_muse / "code_config.toml").write_text(
773 "[framework_detection]\nauto_detect = false\n"
774 )
775 config = load_framework_config(tmp_path)
776 assert config.auto_detect is False
777
778 def test_missing_section_returns_defaults(self, tmp_path: pathlib.Path) -> None:
779 dot_muse = muse_dir(tmp_path)
780 dot_muse.mkdir()
781 (dot_muse / "code_config.toml").write_text("[other_section]\nkey = 1\n")
782 config = load_framework_config(tmp_path)
783 assert config.auto_detect is True
784 assert config.disabled_plugins == frozenset()
785
786 def test_malformed_toml_returns_defaults(self, tmp_path: pathlib.Path) -> None:
787 dot_muse = muse_dir(tmp_path)
788 dot_muse.mkdir()
789 (dot_muse / "code_config.toml").write_bytes(b"\xff\xfe invalid toml !!!")
790 # Must not raise — returns defaults.
791 config = load_framework_config(tmp_path)
792 assert isinstance(config, FrameworkConfig)
793
794 def test_disabled_plugins_stored_as_frozenset(self, tmp_path: pathlib.Path) -> None:
795 dot_muse = muse_dir(tmp_path)
796 dot_muse.mkdir()
797 (dot_muse / "code_config.toml").write_text(
798 '[framework_detection]\ndisabled_plugins = ["flask", "celery"]\n'
799 )
800 config = load_framework_config(tmp_path)
801 assert isinstance(config.disabled_plugins, frozenset)
802 assert "flask" in config.disabled_plugins
803 assert "celery" in config.disabled_plugins
804
805 def test_multiple_custom_rules(self, tmp_path: pathlib.Path) -> None:
806 dot_muse = muse_dir(tmp_path)
807 dot_muse.mkdir()
808 toml = textwrap.dedent("""\
809 [framework_detection]
810
811 [[framework_detection.custom_entry_points]]
812 language = "Python"
813 kind = "grpc"
814 decorator_names = ["grpc_method"]
815
816 [[framework_detection.custom_entry_points]]
817 language = "Python"
818 kind = "webhook"
819 decorator_names = ["webhook_handler", "on_event"]
820 """)
821 (dot_muse / "code_config.toml").write_text(toml)
822 config = load_framework_config(tmp_path)
823 assert len(config.custom_entry_points) == 2
824 kinds = {r.kind for r in config.custom_entry_points}
825 assert kinds == {"grpc", "webhook"}
826
827
828 # ---------------------------------------------------------------------------
829 # Integration — dead-code analysis excludes entry points
830 # ---------------------------------------------------------------------------
831
832
833 class TestDeadCodeIntegration:
834 """Verify that framework-wired symbols are not reported as dead code."""
835
836 def test_fastapi_handler_excluded_from_dead(self, tmp_path: pathlib.Path) -> None:
837 """A FastAPI route handler with no explicit callers is not dead code."""
838 from muse.plugins.code._framework import build_implicit_edge_graph
839
840 src = textwrap.dedent("""\
841 @router.get("/items")
842 async def list_items(): pass
843 """)
844 manifest = _write_snapshot(tmp_path, {"api/routes.py": src})
845 implicit = build_implicit_edge_graph(tmp_path, manifest)
846 entry_point_addresses = frozenset(implicit.keys())
847 # The handler must be recognised as an entry point.
848 assert any("list_items" in addr for addr in entry_point_addresses)
849
850 def test_celery_task_excluded_from_dead(self, tmp_path: pathlib.Path) -> None:
851 from muse.plugins.code._framework import build_implicit_edge_graph
852
853 src = textwrap.dedent("""\
854 @shared_task
855 def send_report(): pass
856 """)
857 manifest = _write_snapshot(tmp_path, {"tasks/report.py": src})
858 implicit = build_implicit_edge_graph(tmp_path, manifest)
859 entry_point_addresses = frozenset(implicit.keys())
860 assert any("send_report" in addr for addr in entry_point_addresses)
861
862 def test_plain_function_not_mistaken_for_entry_point(
863 self, tmp_path: pathlib.Path
864 ) -> None:
865 from muse.plugins.code._framework import build_implicit_edge_graph
866
867 src = textwrap.dedent("""\
868 def compute_total(items): pass
869 """)
870 manifest = _write_snapshot(tmp_path, {"billing.py": src})
871 implicit = build_implicit_edge_graph(tmp_path, manifest)
872 entry_point_addresses = frozenset(implicit.keys())
873 assert not any("compute_total" in addr for addr in entry_point_addresses)
File History 1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 7 days ago