"""muse code test — symbol-graph–driven test selection and execution. The most powerful test command ever built for a version control system. Traditional test runners are file-aware at best. You run a test file, it either passes or fails. You change 200 files and hope your CI matrix covers the right subset. You run the full suite and wait ten minutes. ``muse code test`` is different. It knows exactly which symbols changed, which tests call those symbols (via the committed call graph), and which tests have historically been flaky. It runs the minimum set of tests needed to validate your changes — and it prioritises failing tests to surface problems in seconds, not minutes. How it works ------------ 1. **Diff** — compare HEAD snapshot symbols against the working tree to find every modified, added, or deleted symbol. 2. **Graph** — BFS through the call graph from every test function to find which tests transitively call each changed symbol. 3. **Prioritise** — order tests by risk: failure streaks first, flaky tests second, unknown tests third, slow tests to the front of parallel queues. 4. **Execute** — run the selected tests as isolated subprocesses with configurable parallelism and a wall-clock budget. 5. **Record** — persist pass/fail results to ``.muse/cache/test_history.json`` for future prioritisation and flaky-test detection. Usage:: # Run tests for all symbols changed vs HEAD (smart selection, default) muse code test # Run all tests (no selection, equivalent to pytest tests/) muse code test --all # Select tests covering a specific symbol muse code test --symbol "muse/core/store.py::read_commit" # Run a specific file or node ID directly muse code test tests/test_core_store.py muse code test "tests/test_core_store.py::TestReadCommit::test_returns_none_on_missing" # Control execution muse code test --workers 4 --timeout 120 # Show what would be run without running it muse code test --dry-run # Show historical summary (pass rates, flaky tests) muse code test --history muse code test --flaky # Run full CI gate suite (.muse/ci.toml) muse code test --ci # Machine-readable output muse code test --json Flags ----- ``TARGET [TARGET ...]`` Optional pytest node IDs or file paths to run directly (bypasses graph selection). ``--all, -a`` Ignore the working-tree diff; run all discovered tests. ``--symbol ADDR, -s ADDR`` Force-select tests covering the given symbol address (``"path/to/file.py::Name"``). May be specified multiple times. ``--depth N, -d N`` Call-graph BFS depth for test selection (default 3). ``--workers N, -w N`` Number of parallel subprocess partitions (default 1). ``--timeout S`` Wall-clock budget per partition in seconds (default 0 = unlimited). ``--dry-run`` Print selected tests without executing them. ``--no-save`` Do not persist results to ``.muse/cache/test_history.json``. ``--history`` Print a summary of historical pass/fail rates and exit. ``--flaky`` Print only tests with a history of intermittent failures and exit. ``--ci`` Execute the full CI gate suite from ``.muse/ci.toml`` and exit. ``--extra ARGS`` Extra arguments forwarded verbatim to pytest (e.g. ``-x``, ``-v``, ``--timeout=30``). ``--json`` Emit a machine-readable JSON result and exit. """ import argparse import json import logging import pathlib import sys from collections.abc import Callable from typing import NotRequired, TypedDict from muse.core.ci import CiRunResult, GateResult, load_ci_config, run_ci from muse.core.envelope import EnvelopeJson, make_envelope from muse.core.timing import start_timer from muse.core.repo import require_repo from muse.core.symbol_cache import load_symbol_cache from muse.core.types import Manifest from muse.core.refs import ( get_head_commit_id, read_current_branch, ) from muse.core.snapshots import get_commit_snapshot_manifest from muse.core.test_history import ( HistorySummary, RunRecord, CaseRecord, append_run, flaky_tests, iso_now, load_history, make_run_id, prioritize_targets, summarize, ) from muse.core.test_runner import RunConfig, RunResult, CaseResult, run_tests from muse.core.validation import sanitize_display from muse.core.test_selection import ( ChangedSymbol, SelectionResult, SelectionTarget, changed_symbols_from_diff, select_tests, ) type _HistoryMap = dict[str, "HistorySummary"] logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # JSON output types # --------------------------------------------------------------------------- class _SelectionJson(TypedDict): """JSON representation of the test-selection phase.""" changed_addresses: list[str] covered_addresses: list[str] uncovered_addresses: list[str] coverage_fraction: float fallback_used: bool targets: list[str] class _RunJson(TypedDict): """JSON representation of the test execution phase.""" run_id: str exit_code: int duration_ms: float total: int passed: int failed: int errored: int skipped: int timed_out: bool json_report_available: bool class _TestResultJson(TypedDict): """Per-test result in JSON output.""" node_id: str outcome: str duration_ms: float longrepr: NotRequired[str] class _HistoryJson(TypedDict): """JSON representation of a HistorySummary.""" node_id: str total_runs: int pass_count: int fail_count: int skip_count: int flaky: bool avg_duration_ms: float last_outcome: str | None last_run_timestamp: str | None fail_streak: int class _CiGateJson(TypedDict): """JSON representation of a single CI gate result.""" name: str command: list[str] exit_code: int duration_ms: float required: bool passed: bool timed_out: bool stdout: str stderr: str warning: NotRequired[str] class _CiJson(TypedDict): """JSON representation of a full CI run.""" passed: bool timestamp: str duration_ms: float gates: list[_CiGateJson] class _FullJson(EnvelopeJson): """Top-level JSON output for ``muse code test``.""" mode: str selection: NotRequired[_SelectionJson] run: NotRequired[_RunJson] results: NotRequired[list[_TestResultJson]] history: NotRequired[list[_HistoryJson]] ci: NotRequired[_CiJson] error: NotRequired[str] # --------------------------------------------------------------------------- # Registration # --------------------------------------------------------------------------- def register( subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]", ) -> None: """Register the ``test`` subcommand under a code sub-parser. Arguments --------- TARGET Optional pytest node IDs or file paths to run directly (bypasses graph selection). --all, -a Run all tests regardless of working-tree diff. --symbol ADDR, -s ADDR Force-select tests covering this symbol address (repeatable). --depth N, -d N Call-graph BFS depth for test selection (default 3). --workers N, -w N Number of parallel subprocess partitions (default 1). --timeout S Wall-clock budget per partition in seconds (default 0 = unlimited). --dry-run Print selected tests without executing them. --no-save Do not persist results to ``.muse/cache/test_history.json``. --history Print a summary of historical pass/fail rates and exit. --flaky Print only tests with a history of intermittent failures and exit. --ci Execute the full CI gate suite from ``.muse/ci.toml`` and exit. --extra ARGS Extra arguments forwarded verbatim to pytest. --json, -j Emit machine-readable JSON with schema_version, exit_code, and duration_ms in the envelope. """ parser = subparsers.add_parser( "test", help="Symbol-graph–driven test selection and execution.", description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "targets", nargs="*", metavar="TARGET", help="Optional pytest node IDs or file paths (bypasses graph selection).", ) parser.add_argument( "--all", "-a", action="store_true", dest="run_all", help="Run all tests regardless of working-tree diff.", ) parser.add_argument( "--symbol", "-s", action="append", dest="symbols", default=[], metavar="ADDR", help="Force-select tests covering this symbol address (repeatable).", ) parser.add_argument( "--depth", "-d", type=int, default=3, metavar="N", help="Call-graph BFS depth for test selection (default 3).", ) parser.add_argument( "--workers", "-w", type=int, default=1, metavar="N", help="Parallel subprocess partitions (default 1).", ) parser.add_argument( "--timeout", type=float, default=0.0, metavar="S", help="Wall-clock budget per partition in seconds (default 0 = unlimited).", ) parser.add_argument( "--dry-run", action="store_true", help="Print selected tests without executing them.", ) parser.add_argument( "--no-save", action="store_true", help="Do not persist results to test history.", ) parser.add_argument( "--history", action="store_true", help="Print historical pass/fail summary and exit.", ) parser.add_argument( "--flaky", action="store_true", help="Print only flaky tests from history and exit.", ) parser.add_argument( "--ci", action="store_true", help="Run the full CI gate suite from .muse/ci.toml.", ) parser.add_argument( "--extra", nargs=argparse.REMAINDER, default=[], metavar="ARGS", help="Extra arguments forwarded verbatim to pytest.", ) parser.add_argument( "--json", "-j", action="store_true", dest="json_out", help="Emit machine-readable JSON.", ) parser.set_defaults(func=run, json_out=False) # --------------------------------------------------------------------------- # History display # --------------------------------------------------------------------------- def _print_history(summaries: _HistoryMap, *, flaky_only: bool) -> None: """Render the history table to stdout.""" entries = sorted(summaries.values(), key=lambda s: s["fail_count"], reverse=True) if flaky_only: entries = [e for e in entries if e["flaky"]] if not entries: print("No test history recorded." if not flaky_only else "No flaky tests found.") return hdr = f"{'NODE ID':<70} {'RUNS':>5} {'PASS':>5} {'FAIL':>5} {'FLAKY':>6} {'AVG ms':>8} {'STREAK':>7}" print(hdr) print("─" * len(hdr)) for s in entries: flaky_flag = "✓" if s["flaky"] else "" print( f"{s['node_id']:<70} " f"{s['total_runs']:>5} " f"{s['pass_count']:>5} " f"{s['fail_count']:>5} " f"{flaky_flag:>6} " f"{s['avg_duration_ms']:>8.1f} " f"{s['fail_streak']:>7}" ) def _history_to_json(s: HistorySummary) -> _HistoryJson: return _HistoryJson( node_id=s["node_id"], total_runs=s["total_runs"], pass_count=s["pass_count"], fail_count=s["fail_count"], skip_count=s["skip_count"], flaky=s["flaky"], avg_duration_ms=s["avg_duration_ms"], last_outcome=s["last_outcome"], last_run_timestamp=s["last_run_timestamp"], fail_streak=s["fail_streak"], ) # --------------------------------------------------------------------------- # CI display # --------------------------------------------------------------------------- def _print_ci_result(result: CiRunResult) -> None: """Render CI gate results to stdout.""" width = 72 print() print("CI gate results") print("─" * width) for gate in result["gates"]: icon = "✅" if gate["passed"] else ("⚠️ " if not gate["required"] else "❌") ms = gate["duration_ms"] print(f" {icon} {gate['name']:<40} {ms:>8.0f} ms exit={gate['exit_code']}") if not gate["passed"] and gate["stdout"]: for line in gate["stdout"].strip().splitlines()[-5:]: print(f" {line}") if not gate["passed"] and gate["stderr"]: for line in gate["stderr"].strip().splitlines()[-3:]: print(f" {line}") print("─" * width) overall = "✅ PASSED" if result["passed"] else "❌ FAILED" total_s = result["total_duration_ms"] / 1000.0 print(f" {overall} ({total_s:.1f} s total)") print() def _gate_to_json(g: GateResult) -> _CiGateJson: """Serialise a single :class:`GateResult` for JSON output.""" out = _CiGateJson( name=g["name"], command=g["command"], exit_code=g["exit_code"], duration_ms=g["duration_ms"], required=g["required"], passed=g["passed"], timed_out=g["timed_out"], stdout=g["stdout"], stderr=g["stderr"], ) if "warning" in g: out["warning"] = g["warning"] return out def _ci_to_json(result: CiRunResult) -> _CiJson: """Serialise a :class:`CiRunResult` for JSON output.""" return _CiJson( passed=result["passed"], timestamp=result["timestamp"], duration_ms=result["total_duration_ms"], gates=[_gate_to_json(g) for g in result["gates"]], ) # --------------------------------------------------------------------------- # Helpers for run recording # --------------------------------------------------------------------------- def _run_result_to_record( result: RunResult, *, commit_id: str | None, branch: str | None, selection: SelectionResult | None, ) -> RunRecord: """Convert a :class:`RunResult` to a persistable :class:`RunRecord`.""" def _to_case(r: CaseResult) -> CaseRecord: # Determine which symbol addresses this test covers (from selection). symbol_addresses: list[str] = [] if selection is not None: for target in selection["test_targets"]: if target["node_id"] == r["node_id"] or target["file"] in r["node_id"]: symbol_addresses = list(selection["covered_addresses"]) break rec = CaseRecord( node_id=r["node_id"], outcome=r["outcome"], duration_ms=r["duration_ms"], symbol_addresses=symbol_addresses, ) if "longrepr" in r: rec["longrepr"] = r["longrepr"] return rec return RunRecord( run_id=result["run_id"], timestamp=iso_now(), commit_id=commit_id, branch=branch, results=[_to_case(r) for r in result["results"]], total=result["total"], passed=result["passed"], failed=result["failed"], errored=result["errored"], skipped=result["skipped"], ) # --------------------------------------------------------------------------- # Main command handler # --------------------------------------------------------------------------- def run(args: argparse.Namespace) -> None: """Run symbol-graph–driven test selection and execution. Diffs the working tree against HEAD, follows the call graph to select covering tests, prioritises by failure history, and executes with configurable parallelism. Supports history inspection, dry-run, and full CI gate mode. Agent quickstart:: muse code test --json muse code test --all --json muse code test --symbol "billing.py::compute_total" --json muse code test --dry-run --json muse code test --ci --json muse code test --history --json JSON fields:: mode Execution mode: "run", "dry-run", "history", "ci". selection Test-selection metadata (changed + covered addresses). run Execution summary: total, passed, failed, errored, skipped. results Per-test outcome list (node_id, outcome, duration_ms). history Historical summaries per test (pass rate, flaky flag). ci CI gate results (present in --ci mode only). error Config or fatal error message (when non-zero exit). muse_version Muse release that produced this output. schema Envelope schema version (int). exit_code 0 on all tests pass, 1 on any test failure. duration_ms Wall-clock milliseconds for the command. timestamp ISO-8601 UTC timestamp of command completion. warnings List of non-fatal advisory messages. Exit codes:: 0 All selected tests passed (or dry-run / history mode). 1 One or more tests failed or errored. """ elapsed = start_timer() root = require_repo() json_out: bool = args.json_out # ── History / flaky mode (read-only, no tests run) ─────────────────── if args.history or args.flaky: records = load_history(root) sums = summarize(records) if args.flaky: flaky_list = flaky_tests(records) sums = {s["node_id"]: s for s in flaky_list} if json_out: out = _FullJson( **make_envelope(elapsed), mode="history", history=[_history_to_json(s) for s in sums.values()], ) print(json.dumps(out)) else: _print_history(sums, flaky_only=args.flaky) return # ── CI mode ────────────────────────────────────────────────────────── if args.ci: try: ci_config = load_ci_config(root) except ValueError as exc: _fatal(str(exc), json_out, elapsed) return ci_result = run_ci(root, ci_config) if json_out: ci_code = 0 if ci_result["passed"] else 1 out = _FullJson(**make_envelope(elapsed, exit_code=ci_code), mode="ci", ci=_ci_to_json(ci_result)) print(json.dumps(out)) else: _print_ci_result(ci_result) sys.exit(0 if ci_result["passed"] else 1) # ── Determine what to run ───────────────────────────────────────────── explicit_targets: list[str] = list(args.targets or []) force_symbols: list[str] = list(args.symbols or []) run_all: bool = bool(args.run_all) # Try to load the HEAD snapshot for graph-based selection. branch: str | None = None commit_id: str | None = None manifest: Manifest | None = None try: branch = read_current_branch(root) commit_id = get_head_commit_id(root, branch) if commit_id: manifest = get_commit_snapshot_manifest(root, commit_id) except Exception as exc: logger.debug("test_cmd: could not load HEAD manifest: %s", exc) selection: SelectionResult | None = None final_targets: list[str] # Load the symbol cache once and share it across changed_symbols_from_diff # and select_tests — avoids double disk reads and double parse_symbols calls. shared_sym_cache = load_symbol_cache(root) if manifest is not None else None if explicit_targets: # User specified exact targets — run them directly. final_targets = explicit_targets elif run_all or manifest is None: # No snapshot or --all flag — discover all tests. final_targets = [] elif force_symbols: # Force-select tests covering specific symbols. forced_changed: list[ChangedSymbol] = [ ChangedSymbol(address=addr, change_kind="modified") for addr in force_symbols ] selection = select_tests( root, forced_changed, manifest, depth=args.depth, cache=shared_sym_cache, ) final_targets = [t["node_id"] for t in selection["test_targets"]] else: # Default: diff working tree vs HEAD and select covering tests. try: changed = changed_symbols_from_diff(root, manifest, cache=shared_sym_cache) except Exception as exc: logger.warning("⚠️ test_cmd: diff failed, falling back to --all: %s", exc) changed = [] if not changed: # Nothing changed in the working tree — there is nothing to test. # Running the full suite here would silently block for minutes. # Use --all to explicitly run every test file. if json_out: print(json.dumps({**make_envelope(elapsed), "mode": "run", "message": "no changes detected"})) else: print("\n✅ No changes detected — nothing to test.") print(" Use --all to run the full suite explicitly.\n") return else: selection = select_tests( root, changed, manifest, depth=args.depth, cache=shared_sym_cache ) final_targets = [t["node_id"] for t in selection["test_targets"]] # Re-order targets using historical risk priority. if final_targets: records_for_priority = load_history(root) final_targets = prioritize_targets(final_targets, records_for_priority) # ── Dry-run ────────────────────────────────────────────────────────── if args.dry_run: _print_dry_run(selection, final_targets, json_out, elapsed) return # ── Execute ────────────────────────────────────────────────────────── extra: list[str] = list(args.extra or []) config = RunConfig( targets=final_targets, workers=args.workers, timeout_s=args.timeout, extra_args=extra, env_allowlist=[], cwd=root, stream_output=not json_out, ) if not json_out: _print_pre_run(selection, final_targets) # When streaming, pytest writes directly to the terminal so progress_cb # dots would interleave badly. Use progress_cb only in captured (json) mode. result = run_tests( config, progress_cb=_progress_cb if json_out else None, ) # ── Persist history ────────────────────────────────────────────────── if not args.no_save: record = _run_result_to_record( result, commit_id=commit_id, branch=branch, selection=selection, ) try: append_run(root, record) except Exception as exc: logger.warning("⚠️ test_cmd: failed to save history: %s", exc) # ── Output ─────────────────────────────────────────────────────────── if json_out: sel_json: _SelectionJson | None = None if selection is not None: sel_json = _SelectionJson( changed_addresses=selection["changed_addresses"], covered_addresses=selection["covered_addresses"], uncovered_addresses=selection["uncovered_addresses"], coverage_fraction=selection["coverage_fraction"], fallback_used=selection["fallback_used"], targets=[t["node_id"] for t in selection["test_targets"]], ) run_json = _RunJson( run_id=result["run_id"], exit_code=result["exit_code"], duration_ms=result["duration_ms"], total=result["total"], passed=result["passed"], failed=result["failed"], errored=result["errored"], skipped=result["skipped"], timed_out=result["timed_out"], json_report_available=result["json_report_available"], ) test_results: list[_TestResultJson] = [] for r in result["results"]: tr = _TestResultJson( node_id=r["node_id"], outcome=r["outcome"], duration_ms=r["duration_ms"], ) if "longrepr" in r: tr["longrepr"] = r["longrepr"] test_results.append(tr) run_exit = result["exit_code"] if result["exit_code"] in {0, 1} else 1 out = _FullJson(**make_envelope(elapsed, exit_code=run_exit), mode="run", run=run_json, results=test_results) if sel_json is not None: out["selection"] = sel_json print(json.dumps(out)) else: _print_summary(result, selection) sys.exit(result["exit_code"] if result["exit_code"] in {0, 1} else 1) # --------------------------------------------------------------------------- # Display helpers # --------------------------------------------------------------------------- def _fatal(msg: str, json_out: bool, elapsed: Callable[[], float] | None = None) -> None: if json_out: env = make_envelope(elapsed, exit_code=1) if elapsed is not None else {} print(json.dumps({**env, "error": msg})) else: print(f"❌ {msg}", file=sys.stderr) sys.exit(1) def _progress_cb(result: CaseResult) -> None: """Stream a single test result to stderr as it arrives. Progress dots go to stderr so they never contaminate the JSON object emitted to stdout in ``--json`` mode. """ icon = {"passed": ".", "failed": "F", "error": "E", "skipped": "s"}.get( result["outcome"], "?" ) print(icon, end="", flush=True, file=sys.stderr) def _print_pre_run(selection: SelectionResult | None, targets: list[str]) -> None: """Print a pre-run summary before tests execute.""" if selection is not None: n = len(selection["changed_addresses"]) t = len(targets) uncov = len(selection["uncovered_addresses"]) pct = selection["coverage_fraction"] * 100 print( f"\n🔍 Changed symbols: {n} → Selected tests: {t} " f"(coverage {pct:.0f}%)" ) if uncov: print(f" ⚠️ {uncov} symbol(s) have no covering test:") for addr in selection["uncovered_addresses"][:5]: print(f" • {sanitize_display(addr)}") if uncov > 5: print(f" … and {uncov - 5} more") if selection["fallback_used"]: print(" ℹ️ File-name heuristics used for some targets (graph miss)") elif targets: print(f"\n🔍 Running {len(targets)} specified target(s)") else: print("\n🔍 Running full test suite (--all or no HEAD snapshot)") print() def _print_dry_run( selection: SelectionResult | None, targets: list[str], json_out: bool, elapsed: Callable[[], float] | None = None, ) -> None: """Print the selected targets without executing them.""" if json_out: sel_json: _SelectionJson | None = None if selection is not None: sel_json = _SelectionJson( changed_addresses=selection["changed_addresses"], covered_addresses=selection["covered_addresses"], uncovered_addresses=selection["uncovered_addresses"], coverage_fraction=selection["coverage_fraction"], fallback_used=selection["fallback_used"], targets=targets, ) out = _FullJson(**make_envelope(elapsed if callable(elapsed) else lambda: 0.0), mode="dry-run") if sel_json is not None: out["selection"] = sel_json print(json.dumps(out)) return if selection is not None: _print_pre_run(selection, targets) if targets: print("Would run:") for t in targets: print(f" pytest {t}") else: print("Would run: pytest (full discovery)") def _print_summary(result: RunResult, selection: SelectionResult | None) -> None: """Print the post-run summary.""" print() width = 60 print("─" * width) icon = "✅" if result["exit_code"] == 0 else "❌" s = result["duration_ms"] / 1000.0 counts_available = result["json_report_available"] or result["total"] > 0 if counts_available: print( f"{icon} {result['passed']} passed " f"{result['failed']} failed " f"{result['errored']} error " f"{result['skipped']} skipped " f"({s:.2f} s)" ) else: # Stream mode without pytest-json-report: pytest output went straight # to the terminal but was never captured — counts are unavailable. # The exit code is still correct. print(f"{icon} counts unavailable ({s:.2f} s)") print( " ℹ️ Install pytest-json-report for structured counts: " "pip install pytest-json-report" ) if result["timed_out"]: print(" ⚠️ Run was terminated due to timeout") # Show any uncovered symbols as a reminder. if selection is not None and selection["uncovered_addresses"]: uncov = selection["uncovered_addresses"] print(f"\n⚠️ Coverage gaps — {len(uncov)} changed symbol(s) have no tests:") for addr in uncov[:10]: print(f" • {sanitize_display(addr)}") if len(uncov) > 10: print(f" … and {len(uncov) - 10} more") print()