semantic_test_coverage.py
python
sha256:18b983389ee1b55900fcd799bfbb496552d2e3ecded9d18cefbfef188947a12e
chore: remove blob-debug test marker file
Sonnet 4.6
23 hours ago
| 1 | """muse code semantic-test-coverage — static symbol-level test coverage. |
| 2 | |
| 3 | Traditional coverage tools measure *which lines execute* during a test run. |
| 4 | They require an instrumented test suite, a working interpreter, and real I/O. |
| 5 | |
| 6 | This command answers a different question: |
| 7 | |
| 8 | **Which symbols are exercised by which test functions — without running any |
| 9 | tests?** |
| 10 | |
| 11 | Because Muse tracks the complete symbol graph of every snapshot, we can |
| 12 | perform static call-graph analysis to determine — at the symbol level, across |
| 13 | every production file — which test functions exercise which symbols. No test |
| 14 | runner. No instrumentation. No I/O. |
| 15 | |
| 16 | Coverage tiers |
| 17 | -------------- |
| 18 | ``--depth 1`` (default, "direct") |
| 19 | A production symbol is covered if any test function body directly calls it |
| 20 | by bare name (e.g. ``compute_total(...)`` or ``obj.compute_total(...)``). |
| 21 | This is a conservative but high-precision signal. |
| 22 | |
| 23 | ``--depth N / --transitive`` (N > 1) |
| 24 | Extends direct coverage by following the production call graph up to N−1 |
| 25 | additional hops. If ``test_process_order`` calls ``process_order`` and |
| 26 | ``process_order`` calls ``Invoice.compute_total``, then |
| 27 | ``Invoice.compute_total`` is covered at depth 2. |
| 28 | |
| 29 | Scope |
| 30 | ----- |
| 31 | Production symbols |
| 32 | Every non-import symbol in a non-test file. Classes, functions, methods, |
| 33 | async functions, and async methods are all included. |
| 34 | |
| 35 | Test functions |
| 36 | Every ``test_``-prefixed function in a test file, including methods inside |
| 37 | ``Test*`` classes. All top-level functions in ``conftest.py`` (fixtures) |
| 38 | are included because they often call production code directly. |
| 39 | |
| 40 | Usage:: |
| 41 | |
| 42 | muse code semantic-test-coverage |
| 43 | muse code semantic-test-coverage --file billing.py |
| 44 | muse code semantic-test-coverage --kind function |
| 45 | muse code semantic-test-coverage --uncovered-only |
| 46 | muse code semantic-test-coverage --show-tests |
| 47 | muse code semantic-test-coverage --transitive --depth 2 |
| 48 | muse code semantic-test-coverage --min-coverage 80 |
| 49 | muse code semantic-test-coverage --json |
| 50 | |
| 51 | Output:: |
| 52 | |
| 53 | Semantic test coverage — HEAD (378 symbols · 47 test functions) |
| 54 | |
| 55 | billing.py |
| 56 | [████████████████░░░░] 80.0% (4/5) |
| 57 | ✅ compute_total meth |
| 58 | ✅ apply_discount meth |
| 59 | ✅ process_order func |
| 60 | ✅ Invoice clas |
| 61 | ❌ generate_pdf meth |
| 62 | |
| 63 | ──────────────────────────────────────────────────────────────── |
| 64 | TOTAL: 285/378 symbols covered (75.4%) |
| 65 | |
| 66 | JSON output (``--json``):: |
| 67 | |
| 68 | { |
| 69 | "ref": "HEAD", |
| 70 | "snapshot_id": "a3f2c9e1ab2c", |
| 71 | "depth": 1, |
| 72 | "transitive": false, |
| 73 | "filters": { "file": null, "kind": null, "min_coverage": null, |
| 74 | "uncovered_only": false }, |
| 75 | "summary": { |
| 76 | "total_symbols": 378, |
| 77 | "covered_symbols": 285, |
| 78 | "uncovered_symbols": 93, |
| 79 | "coverage_pct": 75.4, |
| 80 | "total_test_functions": 47, |
| 81 | "total_production_files": 28 |
| 82 | }, |
| 83 | "files": [ |
| 84 | { |
| 85 | "file": "billing.py", |
| 86 | "total_symbols": 5, |
| 87 | "covered_symbols": 4, |
| 88 | "uncovered_symbols": 1, |
| 89 | "coverage_pct": 80.0, |
| 90 | "symbols": [ |
| 91 | { |
| 92 | "address": "billing.py::Invoice.compute_total", |
| 93 | "name": "compute_total", |
| 94 | "kind": "method", |
| 95 | "covered": true, |
| 96 | "test_functions": ["tests/test_billing.py::test_compute_total_basic"] |
| 97 | } |
| 98 | ] |
| 99 | } |
| 100 | ] |
| 101 | } |
| 102 | |
| 103 | ``--min-coverage PCT`` — CI integration:: |
| 104 | |
| 105 | # Exit 1 if any file falls below 80% semantic coverage |
| 106 | muse code semantic-test-coverage --min-coverage 80 |
| 107 | |
| 108 | Performance |
| 109 | ----------- |
| 110 | A single pass loads all Python blobs from the committed object store into |
| 111 | memory. AST parsing and walking are done once per file. For transitive |
| 112 | coverage, the production call graph is built from the already-loaded blobs |
| 113 | without a second store read. Typical runtimes: |
| 114 | |
| 115 | 200 files / 1 000 symbols / 100 test functions → < 2 s |
| 116 | 1 000 files / 10 000 symbols / 500 test functions → < 10 s |
| 117 | |
| 118 | Accuracy note |
| 119 | ------------- |
| 120 | This is a *static* analysis. Dynamic dispatch, conditional imports, and |
| 121 | runtime code generation cannot be captured without execution. The analysis |
| 122 | may report: |
| 123 | |
| 124 | * False negatives: dynamically-constructed call sites (``getattr(obj, name)()``) |
| 125 | are not detected. |
| 126 | * False positives: names like ``save`` match any production symbol named |
| 127 | ``save``, regardless of the actual class/instance. |
| 128 | |
| 129 | Use the results as a coverage *signal*, not a proof. |
| 130 | """ |
| 131 | |
| 132 | import argparse |
| 133 | import ast |
| 134 | import json |
| 135 | import logging |
| 136 | import pathlib |
| 137 | import sys |
| 138 | from collections import defaultdict |
| 139 | from typing import TypedDict |
| 140 | |
| 141 | from muse.core.envelope import EnvelopeJson, make_envelope |
| 142 | from muse.core.errors import ExitCode |
| 143 | from muse.core.repo import require_repo |
| 144 | from muse.core.types import Manifest |
| 145 | from muse.core.refs import read_current_branch |
| 146 | from muse.core.commits import resolve_commit_ref |
| 147 | from muse.core.snapshots import get_commit_snapshot_manifest |
| 148 | from muse.core.timing import start_timer |
| 149 | from muse.plugins.code._callgraph import ForwardGraph, call_name, find_func_node |
| 150 | from muse.plugins.code._query import is_test_file, symbols_for_snapshot |
| 151 | from muse.plugins.code.ast_parser import SymbolRecord, parse_symbols |
| 152 | from muse.core.validation import clamp_int, MAX_AST_BYTES, sanitize_display |
| 153 | |
| 154 | logger = logging.getLogger(__name__) |
| 155 | |
| 156 | type BlobMap = dict[str, bytes] # file_path → raw blob bytes |
| 157 | type TestRefs = dict[str, set[str]] # test_addr → set of callee names |
| 158 | type CoverageMap = dict[str, set[str]] # prod_addr → set of test_addrs |
| 159 | type NameToAddrs = dict[str, list[str]] # bare_name → list of prod_addrs |
| 160 | type _SymbolTree = dict[str, SymbolRecord] |
| 161 | type SymbolTreeMap = dict[str, _SymbolTree] # file_path → symbol tree |
| 162 | type FlatSymbolMap = dict[str, SymbolRecord] # address → symbol record |
| 163 | |
| 164 | # ── Constants ────────────────────────────────────────────────────────────────── |
| 165 | |
| 166 | _DEFAULT_TOP = 0 # 0 = unlimited |
| 167 | _DEFAULT_DEPTH = 1 |
| 168 | _MAX_DEPTH = 10 |
| 169 | |
| 170 | _PY_SUFFIXES: frozenset[str] = frozenset({".py", ".pyi"}) |
| 171 | |
| 172 | _IMPORT_KIND = "import" |
| 173 | |
| 174 | # Symbol kinds surfaced by this command. |
| 175 | _TRACKED_KINDS: frozenset[str] = frozenset( |
| 176 | {"function", "async_function", "method", "async_method", "class"} |
| 177 | ) |
| 178 | |
| 179 | # ── TypedDicts ───────────────────────────────────────────────────────────────── |
| 180 | |
| 181 | class _SymbolCov(TypedDict): |
| 182 | """Coverage record for a single production symbol.""" |
| 183 | |
| 184 | address: str |
| 185 | name: str |
| 186 | kind: str |
| 187 | covered: bool |
| 188 | test_functions: list[str] |
| 189 | |
| 190 | class _FileCov(TypedDict): |
| 191 | """Aggregated coverage record for one production file.""" |
| 192 | |
| 193 | file: str |
| 194 | total_symbols: int |
| 195 | covered_symbols: int |
| 196 | uncovered_symbols: int |
| 197 | coverage_pct: float |
| 198 | symbols: list[_SymbolCov] |
| 199 | |
| 200 | class _FilterSpec(TypedDict): |
| 201 | """Filters applied to this analysis run.""" |
| 202 | |
| 203 | file: str | None |
| 204 | kind: str | None |
| 205 | min_coverage: int | None |
| 206 | uncovered_only: bool |
| 207 | |
| 208 | class _SummarySpec(TypedDict): |
| 209 | """Aggregate statistics across all production files.""" |
| 210 | |
| 211 | total_symbols: int |
| 212 | covered_symbols: int |
| 213 | uncovered_symbols: int |
| 214 | coverage_pct: float |
| 215 | total_test_functions: int |
| 216 | total_production_files: int |
| 217 | |
| 218 | class _JsonOut(EnvelopeJson): |
| 219 | """Top-level JSON output structure for ``muse code semantic-test-coverage --json``.""" |
| 220 | |
| 221 | ref: str |
| 222 | snapshot_id: str |
| 223 | depth: int |
| 224 | transitive: bool |
| 225 | filters: _FilterSpec |
| 226 | summary: _SummarySpec |
| 227 | files: list[_FileCov] |
| 228 | |
| 229 | def _is_conftest(file_path: str) -> bool: |
| 230 | """Return True if *file_path* is a conftest module.""" |
| 231 | return pathlib.PurePosixPath(file_path).name == "conftest.py" |
| 232 | |
| 233 | # ── AST helpers ──────────────────────────────────────────────────────────────── |
| 234 | |
| 235 | def _collect_test_funcs( |
| 236 | stmts: list[ast.stmt], |
| 237 | prefix: str, |
| 238 | is_conftest: bool, |
| 239 | ) -> list[tuple[str, ast.FunctionDef | ast.AsyncFunctionDef]]: |
| 240 | """Recursively collect test function nodes from *stmts*. |
| 241 | |
| 242 | Handles top-level test functions and methods inside ``Test*`` classes. |
| 243 | ``conftest.py`` includes all functions (fixture bodies also call production |
| 244 | code). |
| 245 | |
| 246 | Args: |
| 247 | stmts: Statement list from a module or class body. |
| 248 | prefix: Dotted qualification prefix accumulated from enclosing classes. |
| 249 | is_conftest: True when the file is ``conftest.py``; includes all functions. |
| 250 | |
| 251 | Returns: |
| 252 | List of ``(qualified_name, node)`` pairs for each test entry point found. |
| 253 | """ |
| 254 | results: list[tuple[str, ast.FunctionDef | ast.AsyncFunctionDef]] = [] |
| 255 | for stmt in stmts: |
| 256 | if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef)): |
| 257 | qname = f"{prefix}.{stmt.name}" if prefix else stmt.name |
| 258 | if is_conftest or stmt.name.startswith("test_"): |
| 259 | results.append((qname, stmt)) |
| 260 | elif isinstance(stmt, ast.ClassDef): |
| 261 | new_prefix = f"{prefix}.{stmt.name}" if prefix else stmt.name |
| 262 | results.extend(_collect_test_funcs(stmt.body, new_prefix, is_conftest)) |
| 263 | return results |
| 264 | |
| 265 | def _scan_calls(func_node: ast.FunctionDef | ast.AsyncFunctionDef) -> set[str]: |
| 266 | """Return the set of bare callee names called inside *func_node*. |
| 267 | |
| 268 | Only ``ast.Call`` nodes are examined. Bare name references without a call |
| 269 | are excluded (they do not indicate the symbol is exercised). |
| 270 | |
| 271 | Args: |
| 272 | func_node: The function or async-function AST node to scan. |
| 273 | |
| 274 | Returns: |
| 275 | Set of bare callee names (e.g. ``{"compute_total", "Invoice"}``) |
| 276 | """ |
| 277 | names: set[str] = set() |
| 278 | for node in ast.walk(func_node): |
| 279 | if isinstance(node, ast.Call): |
| 280 | name = call_name(node.func) |
| 281 | if name: |
| 282 | names.add(name) |
| 283 | return names |
| 284 | |
| 285 | # ── Data collection ──────────────────────────────────────────────────────────── |
| 286 | |
| 287 | def _load_py_blobs( |
| 288 | root: pathlib.Path, |
| 289 | manifest: Manifest, |
| 290 | ) -> BlobMap: |
| 291 | """Load every Python/stub blob from *manifest* into memory. |
| 292 | |
| 293 | A single bulk read avoids redundant object-store I/O for subsequent AST |
| 294 | parsing and call-graph construction phases. |
| 295 | |
| 296 | Args: |
| 297 | root: Repository root (object store location). |
| 298 | manifest: Snapshot manifest from ``get_commit_snapshot_manifest``. |
| 299 | |
| 300 | Returns: |
| 301 | Mapping ``{file_path: raw_bytes}`` for every Python file in the snapshot. |
| 302 | """ |
| 303 | from muse.core.object_store import read_object as _read_obj |
| 304 | |
| 305 | blobs: BlobMap = {} |
| 306 | for file_path, obj_id in manifest.items(): |
| 307 | if pathlib.PurePosixPath(file_path).suffix.lower() not in _PY_SUFFIXES: |
| 308 | continue |
| 309 | raw = _read_obj(root, obj_id) |
| 310 | if raw is not None: |
| 311 | blobs[file_path] = raw |
| 312 | return blobs |
| 313 | |
| 314 | def _build_test_refs(blobs: BlobMap) -> TestRefs: |
| 315 | """Scan test files and return ``{test_addr: set[bare_callee_name]}``. |
| 316 | |
| 317 | Args: |
| 318 | blobs: Pre-loaded mapping from ``_load_py_blobs``. |
| 319 | |
| 320 | Returns: |
| 321 | A mapping from each test-function address to the set of bare callee |
| 322 | names called anywhere inside that function body. |
| 323 | """ |
| 324 | refs: TestRefs = {} |
| 325 | for file_path, raw in blobs.items(): |
| 326 | if not is_test_file(file_path): |
| 327 | continue |
| 328 | try: |
| 329 | if len(raw) > MAX_AST_BYTES: |
| 330 | return {} |
| 331 | tree = ast.parse(raw) |
| 332 | except SyntaxError: |
| 333 | logger.warning("SyntaxError in %s — test file skipped", file_path) |
| 334 | continue |
| 335 | is_conf = _is_conftest(file_path) |
| 336 | for qname, func_node in _collect_test_funcs(tree.body, "", is_conf): |
| 337 | addr = f"{file_path}::{qname}" |
| 338 | refs[addr] = _scan_calls(func_node) |
| 339 | return refs |
| 340 | |
| 341 | def _build_prod_forward_graph( |
| 342 | blobs: BlobMap, |
| 343 | prod_files: set[str], |
| 344 | ) -> ForwardGraph: |
| 345 | """Build a forward call graph for production files using pre-loaded blobs. |
| 346 | |
| 347 | Reuses ``blobs`` to avoid a second round-trip through the object store. |
| 348 | Only callable symbols (functions, methods) are included in the graph. |
| 349 | |
| 350 | Args: |
| 351 | blobs: Pre-loaded Python blobs from ``_load_py_blobs``. |
| 352 | prod_files: Set of production file paths to include in the graph. |
| 353 | |
| 354 | Returns: |
| 355 | ``{caller_address: frozenset[callee_bare_name]}`` |
| 356 | """ |
| 357 | graph: ForwardGraph = {} |
| 358 | for file_path, raw in blobs.items(): |
| 359 | if file_path not in prod_files: |
| 360 | continue |
| 361 | try: |
| 362 | if len(raw) > MAX_AST_BYTES: |
| 363 | return {} |
| 364 | tree = ast.parse(raw) |
| 365 | except SyntaxError: |
| 366 | continue |
| 367 | sym_tree = parse_symbols(raw, file_path) |
| 368 | for addr, rec in sym_tree.items(): |
| 369 | if rec["kind"] not in {"function", "async_function", "method", "async_method"}: |
| 370 | continue |
| 371 | func_node = find_func_node(tree.body, rec["qualified_name"].split(".")) |
| 372 | if func_node is None: |
| 373 | continue |
| 374 | callees: set[str] = set() |
| 375 | for node in ast.walk(func_node): |
| 376 | if isinstance(node, ast.Call): |
| 377 | n = call_name(node.func) |
| 378 | if n: |
| 379 | callees.add(n) |
| 380 | graph[addr] = frozenset(callees) |
| 381 | return graph |
| 382 | |
| 383 | def _compute_coverage( |
| 384 | test_refs: TestRefs, |
| 385 | name_to_prod_addrs: NameToAddrs, |
| 386 | forward_graph: ForwardGraph | None, |
| 387 | depth: int, |
| 388 | ) -> CoverageMap: |
| 389 | """Compute ``{prod_addr: {test_addr, ...}}`` coverage mapping. |
| 390 | |
| 391 | Phase 1 — Direct coverage: |
| 392 | For each test function, find every bare callee name that matches a |
| 393 | production symbol address via *name_to_prod_addrs*. |
| 394 | |
| 395 | Phase 2 — Transitive expansion (only when ``forward_graph`` is provided |
| 396 | and ``depth > 1``): |
| 397 | BFS from each directly-covered production symbol, following calls in |
| 398 | *forward_graph*. Each newly-reached production symbol is added to the |
| 399 | test function's coverage set. The BFS is capped at ``depth - 1`` |
| 400 | additional hops. |
| 401 | |
| 402 | Args: |
| 403 | test_refs: ``{test_addr: set[bare_callee_name]}``. |
| 404 | name_to_prod_addrs: Reverse index: ``{bare_name: [prod_addr, ...]}``. |
| 405 | forward_graph: Production call graph; ``None`` for direct-only mode. |
| 406 | depth: Total coverage depth (1 = direct only). |
| 407 | |
| 408 | Returns: |
| 409 | ``{prod_addr: {test_addr, ...}}`` |
| 410 | """ |
| 411 | # Phase 1: direct coverage |
| 412 | coverage: CoverageMap = defaultdict(set) |
| 413 | test_to_direct: CoverageMap = defaultdict(set) |
| 414 | |
| 415 | for test_addr, bare_names in test_refs.items(): |
| 416 | for bare in bare_names: |
| 417 | for prod_addr in name_to_prod_addrs.get(bare, []): |
| 418 | coverage[prod_addr].add(test_addr) |
| 419 | test_to_direct[test_addr].add(prod_addr) |
| 420 | |
| 421 | # Phase 2: transitive expansion |
| 422 | if forward_graph is not None and depth > 1: |
| 423 | for test_addr, direct_prods in test_to_direct.items(): |
| 424 | frontier: set[str] = set(direct_prods) |
| 425 | visited: set[str] = set(direct_prods) |
| 426 | for _ in range(depth - 1): |
| 427 | next_frontier: set[str] = set() |
| 428 | for prod_addr in frontier: |
| 429 | for callee_bare in forward_graph.get(prod_addr, frozenset()): |
| 430 | for callee_addr in name_to_prod_addrs.get(callee_bare, []): |
| 431 | if callee_addr not in visited: |
| 432 | visited.add(callee_addr) |
| 433 | next_frontier.add(callee_addr) |
| 434 | coverage[callee_addr].add(test_addr) |
| 435 | if not next_frontier: |
| 436 | break |
| 437 | frontier = next_frontier |
| 438 | |
| 439 | return dict(coverage) |
| 440 | |
| 441 | # ── Output builders ──────────────────────────────────────────────────────────── |
| 442 | |
| 443 | def _build_file_coverage( |
| 444 | prod_trees: SymbolTreeMap, |
| 445 | coverage: CoverageMap, |
| 446 | file_filter: str | None, |
| 447 | kind_filter: str | None, |
| 448 | ) -> list[_FileCov]: |
| 449 | """Build structured ``_FileCov`` records for every production file. |
| 450 | |
| 451 | Returns all symbols per file (uncovered-only filtering is applied at |
| 452 | display/JSON time so summary statistics always reflect the full picture). |
| 453 | |
| 454 | Args: |
| 455 | prod_trees: ``{file_path: {addr: SymbolRecord}}`` for production files. |
| 456 | coverage: ``{prod_addr: {test_addr, ...}}`` from ``_compute_coverage``. |
| 457 | file_filter: Optional path suffix; only files containing this string are included. |
| 458 | kind_filter: Optional symbol kind; only symbols of this kind are included. |
| 459 | |
| 460 | Returns: |
| 461 | List of ``_FileCov`` records sorted by file path. |
| 462 | """ |
| 463 | result: list[_FileCov] = [] |
| 464 | for file_path in sorted(prod_trees): |
| 465 | if file_filter and file_filter not in file_path: |
| 466 | continue |
| 467 | sym_tree = prod_trees[file_path] |
| 468 | syms: list[_SymbolCov] = [] |
| 469 | for addr in sorted(sym_tree): |
| 470 | rec = sym_tree[addr] |
| 471 | if rec["kind"] == _IMPORT_KIND: |
| 472 | continue |
| 473 | if rec["kind"] not in _TRACKED_KINDS: |
| 474 | continue |
| 475 | if kind_filter and rec["kind"] != kind_filter: |
| 476 | continue |
| 477 | test_fns = sorted(coverage.get(addr, set())) |
| 478 | syms.append( |
| 479 | _SymbolCov( |
| 480 | address=addr, |
| 481 | name=rec["name"], |
| 482 | kind=rec["kind"], |
| 483 | covered=bool(test_fns), |
| 484 | test_functions=test_fns, |
| 485 | ) |
| 486 | ) |
| 487 | if not syms: |
| 488 | continue |
| 489 | covered_count = sum(1 for s in syms if s["covered"]) |
| 490 | result.append( |
| 491 | _FileCov( |
| 492 | file=file_path, |
| 493 | total_symbols=len(syms), |
| 494 | covered_symbols=covered_count, |
| 495 | uncovered_symbols=len(syms) - covered_count, |
| 496 | coverage_pct=round(100.0 * covered_count / len(syms), 1), |
| 497 | symbols=syms, |
| 498 | ) |
| 499 | ) |
| 500 | return result |
| 501 | |
| 502 | # ── Formatting ───────────────────────────────────────────────────────────────── |
| 503 | |
| 504 | _BAR_WIDTH = 20 |
| 505 | _KindAbbrevMap = dict[str, str] |
| 506 | _KIND_ABBREV: _KindAbbrevMap = { |
| 507 | "function": "func", |
| 508 | "async_function": "afn ", |
| 509 | "method": "meth", |
| 510 | "async_method": "amth", |
| 511 | "class": "cls ", |
| 512 | } |
| 513 | |
| 514 | def _bar(pct: float) -> str: |
| 515 | """Return a 20-char Unicode block bar representing *pct* (0–100).""" |
| 516 | filled = round(pct / (100 / _BAR_WIDTH)) |
| 517 | return f"{'█' * filled}{'░' * (_BAR_WIDTH - filled)}" |
| 518 | |
| 519 | def _print_table( |
| 520 | files: list[_FileCov], |
| 521 | uncovered_only: bool, |
| 522 | show_tests: bool, |
| 523 | min_coverage: int, |
| 524 | ref: str, |
| 525 | total_test_fns: int, |
| 526 | ) -> bool: |
| 527 | """Print human-readable coverage table to stdout. |
| 528 | |
| 529 | Args: |
| 530 | files: Structured file coverage from ``_build_file_coverage``. |
| 531 | uncovered_only: When True, only uncovered symbols are printed per file. |
| 532 | show_tests: When True, list covering test functions under each symbol. |
| 533 | min_coverage: Coverage threshold for ⚠️ flag and exit-1 signalling. |
| 534 | ref: Display string for the analysed ref (e.g. ``"HEAD"``). |
| 535 | total_test_fns: Total number of test functions found in the snapshot. |
| 536 | |
| 537 | Returns: |
| 538 | True if any file violates *min_coverage*; False otherwise. |
| 539 | """ |
| 540 | total_syms = sum(f["total_symbols"] for f in files) |
| 541 | total_covered = sum(f["covered_symbols"] for f in files) |
| 542 | total_pct = round(100.0 * total_covered / total_syms, 1) if total_syms else 0.0 |
| 543 | |
| 544 | print( |
| 545 | f"Semantic test coverage — {ref}" |
| 546 | f" ({total_syms} symbols · {total_test_fns} test functions)\n" |
| 547 | ) |
| 548 | |
| 549 | violation = False |
| 550 | for fc in files: |
| 551 | pct = fc["coverage_pct"] |
| 552 | bar = _bar(pct) |
| 553 | below = pct < min_coverage and min_coverage > 0 |
| 554 | if below: |
| 555 | violation = True |
| 556 | flag = " ⚠️" if below else "" |
| 557 | print(f" {fc['file']}") |
| 558 | print( |
| 559 | f" [{bar}] {pct:5.1f}%" |
| 560 | f" ({fc['covered_symbols']}/{fc['total_symbols']}){flag}" |
| 561 | ) |
| 562 | |
| 563 | for sym in fc["symbols"]: |
| 564 | if uncovered_only and sym["covered"]: |
| 565 | continue |
| 566 | icon = "✅" if sym["covered"] else "❌" |
| 567 | kind_short = _KIND_ABBREV.get(sym["kind"], sym["kind"][:4]) |
| 568 | print(f" {icon} {sanitize_display(sym['name']):<42} {kind_short}") |
| 569 | if show_tests and sym["test_functions"]: |
| 570 | for tf in sym["test_functions"]: |
| 571 | print(f" ← {tf}") |
| 572 | print() |
| 573 | |
| 574 | sep = "─" * 64 |
| 575 | print(f" {sep}") |
| 576 | print(f" TOTAL: {total_covered}/{total_syms} symbols covered ({total_pct}%)\n") |
| 577 | if violation: |
| 578 | print( |
| 579 | f" ⚠️ One or more files are below the {min_coverage}%" |
| 580 | " minimum coverage threshold." |
| 581 | ) |
| 582 | return violation |
| 583 | |
| 584 | # ── Entry point ──────────────────────────────────────────────────────────────── |
| 585 | |
| 586 | def run(args: argparse.Namespace) -> None: |
| 587 | """Entry point for ``muse code semantic-test-coverage``. |
| 588 | |
| 589 | Validates arguments, loads the HEAD snapshot, runs the static coverage |
| 590 | analysis, and prints results (or emits JSON). Exits 1 when |
| 591 | ``--min-coverage`` is set and any file falls below the threshold. |
| 592 | |
| 593 | Pass ``--json`` (or ``-j``) for a stable, machine-readable result. |
| 594 | |
| 595 | Agent quickstart:: |
| 596 | |
| 597 | muse code semantic-test-coverage --json |
| 598 | muse code semantic-test-coverage --file billing.py --json |
| 599 | muse code semantic-test-coverage --uncovered-only --json |
| 600 | muse code semantic-test-coverage --min-coverage 80 --json |
| 601 | |
| 602 | JSON fields:: |
| 603 | |
| 604 | ref str Analysed ref (always "HEAD") |
| 605 | snapshot_id str Short commit ID of the analysed snapshot |
| 606 | depth int Call-graph depth used for the analysis |
| 607 | transitive bool True when transitive coverage was enabled |
| 608 | filters dict Applied filters: file, kind, min_coverage, uncovered_only |
| 609 | summary dict total_symbols, covered_symbols, uncovered_symbols, coverage_pct, total_test_functions, total_production_files |
| 610 | files list Per-file coverage records with per-symbol detail |
| 611 | |
| 612 | Exit codes:: |
| 613 | |
| 614 | 0 Success (or min-coverage gate passed). |
| 615 | 1 Min-coverage threshold violated. |
| 616 | 1 User error (bad arguments, no HEAD commit). |
| 617 | 3 Internal error. |
| 618 | """ |
| 619 | elapsed = start_timer() |
| 620 | root = require_repo() |
| 621 | |
| 622 | # ── Argument validation ──────────────────────────────────────────────────── |
| 623 | depth: int = clamp_int(args.depth, 1, 50, 'depth') |
| 624 | if depth < 1 or depth > _MAX_DEPTH: |
| 625 | print(f"❌ --depth must be between 1 and {_MAX_DEPTH}.", file=sys.stderr) |
| 626 | raise SystemExit(ExitCode.USER_ERROR) |
| 627 | |
| 628 | min_coverage: int = clamp_int(args.min_coverage, 0, 100, 'min_coverage') |
| 629 | if not (0 <= min_coverage <= 100): |
| 630 | print("❌ --min-coverage must be between 0 and 100.", file=sys.stderr) |
| 631 | raise SystemExit(ExitCode.USER_ERROR) |
| 632 | |
| 633 | transitive: bool = args.transitive or depth > 1 |
| 634 | effective_depth: int = depth if transitive else 1 |
| 635 | |
| 636 | kind_filter: str | None = args.kind or None |
| 637 | file_filter: str | None = args.file or None |
| 638 | |
| 639 | # ── Resolve HEAD snapshot ────────────────────────────────────────────────── |
| 640 | branch = read_current_branch(root) |
| 641 | |
| 642 | head = resolve_commit_ref(root, branch, None) |
| 643 | if head is None: |
| 644 | print("❌ HEAD commit not found — is this an empty repository?", file=sys.stderr) |
| 645 | raise SystemExit(ExitCode.USER_ERROR) |
| 646 | |
| 647 | manifest: Manifest = get_commit_snapshot_manifest(root, head.commit_id) or {} |
| 648 | |
| 649 | # ── Single bulk read of all Python blobs ─────────────────────────────────── |
| 650 | blobs = _load_py_blobs(root, manifest) |
| 651 | |
| 652 | # ── Symbol extraction ────────────────────────────────────────────────────── |
| 653 | all_trees = symbols_for_snapshot(root, manifest) |
| 654 | |
| 655 | # Partition: test files vs. production files. |
| 656 | prod_trees: SymbolTreeMap = {} |
| 657 | for file_path, sym_tree in all_trees.items(): |
| 658 | if not is_test_file(file_path): |
| 659 | prod_trees[file_path] = sym_tree |
| 660 | |
| 661 | # Flat index of all production symbols, excluding imports. |
| 662 | all_prod: FlatSymbolMap = {} |
| 663 | for sym_tree in prod_trees.values(): |
| 664 | for addr, rec in sym_tree.items(): |
| 665 | if rec["kind"] not in (_IMPORT_KIND,) and rec["kind"] in _TRACKED_KINDS: |
| 666 | all_prod[addr] = rec |
| 667 | |
| 668 | # Reverse index: bare_name → [prod_addr, ...] |
| 669 | name_to_addrs: NameToAddrs = defaultdict(list) |
| 670 | for addr, rec in all_prod.items(): |
| 671 | name_to_addrs[rec["name"]].append(addr) |
| 672 | |
| 673 | # ── Test-function scanning ───────────────────────────────────────────────── |
| 674 | test_refs = _build_test_refs(blobs) |
| 675 | total_test_fns = len(test_refs) |
| 676 | |
| 677 | # ── Optional: build production call graph for transitive coverage ────────── |
| 678 | forward_graph: ForwardGraph | None = None |
| 679 | if transitive: |
| 680 | forward_graph = _build_prod_forward_graph(blobs, set(prod_trees)) |
| 681 | |
| 682 | # ── Coverage computation ─────────────────────────────────────────────────── |
| 683 | coverage = _compute_coverage( |
| 684 | test_refs, dict(name_to_addrs), forward_graph, effective_depth |
| 685 | ) |
| 686 | |
| 687 | # ── Structured output ────────────────────────────────────────────────────── |
| 688 | file_coverage = _build_file_coverage(prod_trees, coverage, file_filter, kind_filter) |
| 689 | |
| 690 | # ── JSON output ──────────────────────────────────────────────────────────── |
| 691 | if args.json_out: |
| 692 | total_syms = sum(f["total_symbols"] for f in file_coverage) |
| 693 | total_covered = sum(f["covered_symbols"] for f in file_coverage) |
| 694 | pct = round(100.0 * total_covered / total_syms, 1) if total_syms else 0.0 |
| 695 | |
| 696 | # Apply uncovered_only filter to symbol lists in JSON. |
| 697 | if args.uncovered_only: |
| 698 | filtered_files: list[_FileCov] = [] |
| 699 | for fc in file_coverage: |
| 700 | uncov = [s for s in fc["symbols"] if not s["covered"]] |
| 701 | if uncov: |
| 702 | filtered_files.append( |
| 703 | _FileCov( |
| 704 | file=fc["file"], |
| 705 | total_symbols=fc["total_symbols"], |
| 706 | covered_symbols=fc["covered_symbols"], |
| 707 | uncovered_symbols=fc["uncovered_symbols"], |
| 708 | coverage_pct=fc["coverage_pct"], |
| 709 | symbols=uncov, |
| 710 | ) |
| 711 | ) |
| 712 | file_coverage = filtered_files |
| 713 | |
| 714 | out = _JsonOut( |
| 715 | **make_envelope(elapsed), |
| 716 | ref="HEAD", |
| 717 | snapshot_id=head.commit_id, |
| 718 | depth=effective_depth, |
| 719 | transitive=transitive, |
| 720 | filters=_FilterSpec( |
| 721 | file=file_filter, |
| 722 | kind=kind_filter, |
| 723 | min_coverage=min_coverage if min_coverage > 0 else None, |
| 724 | uncovered_only=args.uncovered_only, |
| 725 | ), |
| 726 | summary=_SummarySpec( |
| 727 | total_symbols=total_syms, |
| 728 | covered_symbols=total_covered, |
| 729 | uncovered_symbols=total_syms - total_covered, |
| 730 | coverage_pct=pct, |
| 731 | total_test_functions=total_test_fns, |
| 732 | total_production_files=len(file_coverage), |
| 733 | ), |
| 734 | files=file_coverage, |
| 735 | ) |
| 736 | print(json.dumps(out)) |
| 737 | return |
| 738 | |
| 739 | # ── Human-readable output ────────────────────────────────────────────────── |
| 740 | violation = _print_table( |
| 741 | file_coverage, |
| 742 | uncovered_only=args.uncovered_only, |
| 743 | show_tests=args.show_tests, |
| 744 | min_coverage=min_coverage, |
| 745 | ref="HEAD", |
| 746 | total_test_fns=total_test_fns, |
| 747 | ) |
| 748 | if violation: |
| 749 | sys.exit(1) |
| 750 | |
| 751 | # ── CLI registration ─────────────────────────────────────────────────────────── |
| 752 | |
| 753 | def register( |
| 754 | sub: argparse._SubParsersAction[argparse.ArgumentParser], |
| 755 | ) -> None: |
| 756 | """Register ``semantic-test-coverage`` under the ``code`` subcommand group. |
| 757 | |
| 758 | Arguments registered |
| 759 | -------------------- |
| 760 | --file SUFFIX Scope to production files whose path contains SUFFIX. |
| 761 | --kind KIND Filter symbols by kind (function, method, class, …). |
| 762 | --transitive Expand coverage through the production call graph. |
| 763 | --depth D Transitive call-graph depth (default: 1). Values > 1 imply --transitive. |
| 764 | --uncovered-only Show/emit only symbols with no test coverage. |
| 765 | --show-tests Under each covered symbol, list covering test functions. |
| 766 | --min-coverage PCT Exit 1 if any production file is below PCT% semantic coverage. |
| 767 | --json / -j Emit JSON instead of human-readable text. |
| 768 | |
| 769 | Args: |
| 770 | sub: The subparser action from the ``code`` command group. |
| 771 | """ |
| 772 | p = sub.add_parser( |
| 773 | "semantic-test-coverage", |
| 774 | help=( |
| 775 | "Static symbol-level test coverage — which symbols are exercised" |
| 776 | " by which tests, without running the test suite." |
| 777 | ), |
| 778 | description=__doc__, |
| 779 | formatter_class=argparse.RawDescriptionHelpFormatter, |
| 780 | ) |
| 781 | p.add_argument( |
| 782 | "--file", |
| 783 | metavar="SUFFIX", |
| 784 | help=( |
| 785 | "Scope analysis to production files whose path contains SUFFIX" |
| 786 | " (e.g. --file billing.py)." |
| 787 | ), |
| 788 | ) |
| 789 | p.add_argument( |
| 790 | "--kind", |
| 791 | metavar="KIND", |
| 792 | choices=["function", "async_function", "method", "async_method", "class"], |
| 793 | help="Filter symbols by kind.", |
| 794 | ) |
| 795 | p.add_argument( |
| 796 | "--transitive", |
| 797 | action="store_true", |
| 798 | help=( |
| 799 | "Expand coverage through the production call graph." |
| 800 | " Combines with --depth (default: 2 when this flag is set)." |
| 801 | ), |
| 802 | ) |
| 803 | p.add_argument( |
| 804 | "--depth", |
| 805 | type=int, |
| 806 | default=_DEFAULT_DEPTH, |
| 807 | metavar="D", |
| 808 | help=( |
| 809 | f"Transitive call-graph depth (default: {_DEFAULT_DEPTH})." |
| 810 | " Values > 1 imply --transitive." |
| 811 | f" Maximum: {_MAX_DEPTH}." |
| 812 | ), |
| 813 | ) |
| 814 | p.add_argument( |
| 815 | "--uncovered-only", |
| 816 | action="store_true", |
| 817 | help="Show (or emit in JSON) only symbols with no test coverage.", |
| 818 | ) |
| 819 | p.add_argument( |
| 820 | "--show-tests", |
| 821 | action="store_true", |
| 822 | help="Under each covered symbol, list the test functions that exercise it.", |
| 823 | ) |
| 824 | p.add_argument( |
| 825 | "--min-coverage", |
| 826 | type=int, |
| 827 | default=0, |
| 828 | metavar="PCT", |
| 829 | help=( |
| 830 | "Exit 1 if any production file is below PCT%% semantic coverage." |
| 831 | " Useful for CI gates." |
| 832 | ), |
| 833 | ) |
| 834 | p.add_argument( |
| 835 | "--json", "-j", |
| 836 | action="store_true", |
| 837 | dest="json_out", |
| 838 | help="Emit JSON instead of human-readable text.", |
| 839 | ) |
| 840 | p.set_defaults(func=run) |
File History
7 commits
sha256:18b983389ee1b55900fcd799bfbb496552d2e3ecded9d18cefbfef188947a12e
chore: remove blob-debug test marker file
Sonnet 4.6
23 hours ago
sha256:e452ad9a6ace6ccc6d875a35e06caf9da5576a970c1c36133b69a891ce5fefa8
chore: prebuild timing test
Sonnet 4.6
8 days ago
sha256:0008ab6695e3e064b3e236b24fd19e538fef6a588eb0d211622f4466d919c0b1
merge: pull staging/dev — advance to 0.2.0rc12
Sonnet 4.6
patch
10 days ago
sha256:9c33d61749fff814c5226d5386aa2af7064c2c02788594a25fdd709358132eea
fix: _PROPOSAL_PREFIX_RESOLVE_LIMIT 200 → 100 to match hub …
Sonnet 4.6
21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e
fix: rename objects→blobs in push client and all stale test…
Sonnet 4.6
patch
24 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a
fix: repair four test failures from post-migration audit
Sonnet 4.6
patch
30 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
30 days ago