"""Integration tests for code-domain CLI commands. Uses a real Muse repository initialised in tmp_path. Coverage -------- Provenance & Topology muse lineage ADDRESS [--json] muse api-surface [--diff REF] [--json] muse codemap [--top N] [--json] muse clones [--tier exact|near|both] [--json] muse checkout-symbol ADDRESS --commit REF [--dry-run] muse semantic-cherry-pick ADDRESS... --from REF [--dry-run] [--json] Query & Temporal Search muse query PREDICATE [--all-commits] [--json] muse query-history PREDICATE [--from REF] [--to REF] [--json] Index Commands muse index status [--json] muse index rebuild [--index NAME] Refactor Detection muse detect-refactor --json (schema_version in output) Multi-Agent Coordination muse reserve ADDRESS... muse intent ADDRESS... --op OP muse forecast [--json] muse plan-merge OURS THEIRS [--json] muse shard --agents N [--json] muse reconcile [--json] Structural Enforcement muse breakage [--json] muse invariants [--json] Semantic Versioning Metadata muse log shows SemVer for commits with bumps muse commit stores sem_ver_bump in CommitRecord Call-Graph Tier muse impact ADDRESS [--json] muse dead [--json] muse coverage CLASS_ADDRESS [--json] muse deps ADDRESS_OR_FILE [--json] muse find-symbol [--name NAME] [--json] muse patch ADDRESS FILE """ import json import pathlib import textwrap import pytest from tests.cli_test_helper import CliRunner from typing import TypedDict from muse._version import __version__ cli = None # argparse migration — CliRunner ignores this arg from muse.core.refs import get_head_commit_id from muse.core.commits import CommitDict from muse.core.types import Manifest from muse.core.paths import coordination_dir, indices_dir, muse_dir, ref_path, repo_json_path type _ImportsMap = dict[str, list[str]] type _ImportsSetMap = dict[str, set[str]] type _KindsMap = dict[str, int] runner = CliRunner() # --------------------------------------------------------------------------- # Shared fixtures # --------------------------------------------------------------------------- @pytest.fixture def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: """Initialise a fresh code-domain Muse repo.""" monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) result = runner.invoke(cli, ["init", "--domain", "code"]) assert result.exit_code == 0, result.output return tmp_path @pytest.fixture def code_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with two Python commits for analysis commands.""" work = repo # Commit 1 — define compute_total and Invoice class. (work / "billing.py").write_text(textwrap.dedent("""\ class Invoice: def compute_total(self, items): return sum(items) def apply_discount(self, total, pct): return total * (1 - pct) def process_order(invoice, items): return invoice.compute_total(items) """)) runner.invoke(cli, ["code", "add", "billing.py"]) r = runner.invoke(cli, ["commit", "-m", "Initial billing module"]) assert r.exit_code == 0, r.output # Commit 2 — rename compute_total, add new function. (work / "billing.py").write_text(textwrap.dedent("""\ class Invoice: def compute_invoice_total(self, items): return sum(items) def apply_discount(self, total, pct): return total * (1 - pct) def generate_pdf(self): return b"pdf" def process_order(invoice, items): return invoice.compute_invoice_total(items) def send_email(address): pass """)) runner.invoke(cli, ["code", "add", "billing.py"]) r = runner.invoke(cli, ["commit", "-m", "Rename compute_total, add generate_pdf + send_email"]) assert r.exit_code == 0, r.output return repo # --------------------------------------------------------------------------- # muse lineage # --------------------------------------------------------------------------- class TestLineage: def test_lineage_exits_zero_on_existing_symbol(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "lineage", "billing.py::process_order"]) assert result.exit_code == 0, result.output def test_lineage_json_output(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "lineage", "--json", "billing.py::process_order"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert isinstance(data, dict) assert "events" in data def test_lineage_missing_address_shows_message(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "lineage", "billing.py::nonexistent_func"]) # Should not crash — exit 0 or 1, but no unhandled exception. assert result.exit_code in (0, 1) def test_lineage_requires_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.chdir(tmp_path) result = runner.invoke(cli, ["code", "lineage", "src/a.py::f"]) assert result.exit_code != 0 # --------------------------------------------------------------------------- # muse api-surface # --------------------------------------------------------------------------- class TestApiSurface: def test_api_surface_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "api-surface"]) assert result.exit_code == 0, result.output def test_api_surface_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "api-surface", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data, dict) def test_api_surface_diff(self, code_repo: pathlib.Path) -> None: commits = _all_commit_ids(code_repo) if len(commits) >= 2: result = runner.invoke(cli, ["code", "api-surface", "--diff", commits[-2]]) assert result.exit_code == 0 def test_api_surface_no_commits_handled(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "api-surface"]) assert result.exit_code in (0, 1) # --------------------------------------------------------------------------- # muse codemap # --------------------------------------------------------------------------- class TestCodemap: def test_codemap_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "codemap"]) assert result.exit_code == 0, result.output def test_codemap_top_flag(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "codemap", "--top", "3"]) assert result.exit_code == 0 def test_codemap_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "codemap", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data, dict) # --------------------------------------------------------------------------- # muse clones # --------------------------------------------------------------------------- class TestClones: def test_clones_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "clones"]) assert result.exit_code == 0, result.output def test_clones_tier_exact(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "clones", "--tier", "exact"]) assert result.exit_code == 0 def test_clones_tier_near(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "clones", "--tier", "near"]) assert result.exit_code == 0 def test_clones_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "clones", "--tier", "both", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data, dict) # --------------------------------------------------------------------------- # muse checkout-symbol # --------------------------------------------------------------------------- class TestCheckoutSymbol: def test_checkout_symbol_dry_run(self, code_repo: pathlib.Path) -> None: commits = _all_commit_ids(code_repo) if len(commits) < 2: pytest.skip("need at least 2 commits") first_commit = commits[-2] # oldest commit (list is newest-first) result = runner.invoke(cli, [ "code", "checkout-symbol", "--commit", first_commit, "--dry-run", "billing.py::Invoice.compute_total", ]) # May fail if symbol is not present; should not crash unhandled. assert result.exit_code in (0, 1, 2) def test_checkout_symbol_missing_commit_flag_errors(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "checkout-symbol", "--dry-run", "billing.py::Invoice.compute_total"]) assert result.exit_code != 0 # --------------------------------------------------------------------------- # muse semantic-cherry-pick # --------------------------------------------------------------------------- class TestSemanticCherryPick: def test_dry_run_exits_zero(self, code_repo: pathlib.Path) -> None: commits = _all_commit_ids(code_repo) if len(commits) < 2: pytest.skip("need at least 2 commits") first_commit = commits[-2] result = runner.invoke(cli, [ "code", "semantic-cherry-pick", "--from", first_commit, "--dry-run", "billing.py::Invoice.compute_total", ]) assert result.exit_code in (0, 1) def test_missing_from_flag_errors(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "semantic-cherry-pick", "--dry-run", "billing.py::Invoice.compute_total"]) assert result.exit_code != 0 # --------------------------------------------------------------------------- # muse query # --------------------------------------------------------------------------- class TestQueryV2: def test_query_kind_function(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "kind=function"]) assert result.exit_code == 0, result.output def test_query_json_output(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "--json", "kind=function"]) assert result.exit_code == 0 data = json.loads(result.output) assert "muse_version" in data def test_query_or_predicate(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "kind=function", "OR", "kind=method"]) assert result.exit_code == 0 def test_query_not_predicate(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "NOT", "kind=import"]) assert result.exit_code == 0 def test_query_all_commits(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "--all-commits", "kind=function"]) assert result.exit_code == 0 def test_query_name_contains(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "name~=total"]) assert result.exit_code == 0 # Should find compute_invoice_total. assert "total" in result.output.lower() def test_query_no_predicate_matches_all(self, code_repo: pathlib.Path) -> None: # query with kind=class to match everything of a known type. result = runner.invoke(cli, ["code", "query", "kind=class"]) assert result.exit_code == 0 assert "Invoice" in result.output def test_query_lineno_gt(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "lineno_gt=1"]) assert result.exit_code == 0 def test_query_no_repo_errors(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.chdir(tmp_path) result = runner.invoke(cli, ["code", "query", "kind=function"]) assert result.exit_code != 0 # ── new v2.1 flags ──────────────────────────────────────────────────────── def test_query_count_only(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "--count", "kind=function"]) assert result.exit_code == 0, result.output # Output should be a single integer. assert result.output.strip().isdigit() def test_query_count_nonzero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "--count", "kind=function"]) assert int(result.output.strip()) >= 1 def test_query_limit_caps_results(self, code_repo: pathlib.Path) -> None: all_r = runner.invoke(cli, ["code", "query", "kind=function"]) lim_r = runner.invoke(cli, ["code", "query", "kind=function", "--limit", "1"]) assert lim_r.exit_code == 0, lim_r.output # Limited output should be shorter than unlimited. assert len(lim_r.output) <= len(all_r.output) def test_query_limit_truncation_noted(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "kind=function", "--limit", "1"]) assert "limited to 1" in result.output or "match" in result.output def test_query_limit_zero_unlimited(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "kind=function", "--limit", "0"]) assert result.exit_code == 0, result.output def test_query_sort_name(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "kind=function", "--sort", "name"]) assert result.exit_code == 0, result.output def test_query_sort_size(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "kind=function", "--sort", "size"]) assert result.exit_code == 0, result.output # Size column should appear in output. assert "L" in result.output def test_query_sort_kind(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "kind=function", "--sort", "kind"]) assert result.exit_code == 0, result.output def test_query_sort_lineno(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "kind=function", "--sort", "lineno"]) assert result.exit_code == 0, result.output def test_query_sort_invalid_rejected(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "kind=function", "--sort", "zzz"]) assert result.exit_code != 0 def test_query_unique_bodies_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "kind=function", "--unique-bodies"]) assert result.exit_code == 0, result.output def test_query_unique_bodies_count_lte_all(self, code_repo: pathlib.Path) -> None: all_r = runner.invoke(cli, ["code", "query", "--count", "kind=function"]) uniq_r = runner.invoke(cli, ["code", "query", "--count", "--unique-bodies", "kind=function"]) assert int(uniq_r.output.strip()) <= int(all_r.output.strip()) def test_query_size_gt_predicate(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "kind=function", "size_gt=0"]) assert result.exit_code == 0, result.output def test_query_size_lt_predicate(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "kind=function", "size_lt=1000"]) assert result.exit_code == 0, result.output def test_query_size_gt_excludes_small(self, code_repo: pathlib.Path) -> None: all_r = runner.invoke(cli, ["code", "query", "--count", "kind=function"]) large_r = runner.invoke(cli, ["code", "query", "--count", "kind=function", "size_gt=100"]) # Large-only count should be <= total. assert int(large_r.output.strip()) <= int(all_r.output.strip()) def test_query_json_includes_size(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "--json", "kind=function"]) data = json.loads(result.output) for r in data["results"]: assert "size" in r def test_query_json_includes_sort_field(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "--json", "kind=function", "--sort", "name"]) data = json.loads(result.output) assert data["sort"] == "name" def test_query_json_includes_unique_bodies(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "--json", "kind=function", "--unique-bodies"]) data = json.loads(result.output) assert data["unique_bodies"] is True def test_query_since_without_all_commits_rejected(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query", "kind=function", "--since", "2026-01-01"]) assert result.exit_code != 0 def test_query_since_invalid_date_rejected(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query", "kind=function", "--all-commits", "--since", "not-a-date"], ) assert result.exit_code != 0 def test_query_all_commits_since_future_empty(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query", "kind=function", "--all-commits", "--since", "2099-01-01"], ) assert result.exit_code == 0, result.output # Future date means no commits match. assert "no symbols" in result.output.lower() or result.output.strip() == "" def test_query_max_commits_caps_walk(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query", "kind=function", "--all-commits", "--max-commits", "1"], ) assert result.exit_code == 0, result.output # --------------------------------------------------------------------------- # muse query-history # --------------------------------------------------------------------------- class TestQueryHistory: def test_query_history_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query-history", "kind=function"]) assert result.exit_code == 0, result.output def test_query_history_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query-history", "--json", "kind=function"]) assert result.exit_code == 0 data = json.loads(result.output) assert "muse_version" in data assert "results" in data def test_query_history_with_from_to(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query-history", "--from", "HEAD", "kind=function"]) assert result.exit_code == 0 def test_query_history_tracks_change_count(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "query-history", "--json", "kind=method"]) assert result.exit_code == 0 data = json.loads(result.output) for entry in data.get("results", []): assert "commit_count" in entry assert "change_count" in entry # ── new v2 flags ────────────────────────────────────────────────────────── def test_query_history_changed_only(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query-history", "--changed-only", "kind=function"] ) assert result.exit_code == 0, result.output def test_query_history_changed_only_all_gt_one(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query-history", "--changed-only", "--json", "kind=function"] ) assert result.exit_code == 0 data = json.loads(result.output) for entry in data["results"]: assert entry["change_count"] > 1 def test_query_history_sort_commits(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query-history", "--sort", "commits", "kind=function"] ) assert result.exit_code == 0, result.output def test_query_history_sort_changes(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query-history", "--sort", "changes", "kind=function"] ) assert result.exit_code == 0, result.output def test_query_history_sort_first(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query-history", "--sort", "first", "kind=function"] ) assert result.exit_code == 0, result.output def test_query_history_sort_invalid_rejected(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query-history", "--sort", "zzz", "kind=function"] ) assert result.exit_code != 0 def test_query_history_count(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query-history", "--count", "kind=function"] ) assert result.exit_code == 0, result.output assert result.output.strip().isdigit() assert int(result.output.strip()) >= 1 def test_query_history_limit(self, code_repo: pathlib.Path) -> None: all_r = runner.invoke(cli, ["code", "query-history", "kind=function"]) lim_r = runner.invoke( cli, ["code", "query-history", "--limit", "1", "kind=function"] ) assert lim_r.exit_code == 0, lim_r.output assert len(lim_r.output) <= len(all_r.output) def test_query_history_limit_note_in_output(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query-history", "--limit", "1", "kind=function"] ) assert "1" in result.output def test_query_history_min_changes(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query-history", "--min-changes", "2", "--json", "kind=function"] ) assert result.exit_code == 0 data = json.loads(result.output) for entry in data["results"]: assert entry["change_count"] >= 2 def test_query_history_min_changes_zero_rejected(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query-history", "--min-changes", "0", "kind=function"] ) assert result.exit_code != 0 def test_query_history_introduced_only(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query-history", "--introduced-only", "kind=function"] ) assert result.exit_code == 0, result.output def test_query_history_removed_only(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query-history", "--removed-only", "kind=function"] ) assert result.exit_code == 0, result.output def test_query_history_introduced_json_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query-history", "--introduced-only", "--json", "kind=function"], ) assert result.exit_code == 0 data = json.loads(result.output) assert data["mode"] == "introduced-only" assert "symbols_found" in data for entry in data["results"]: assert entry["status"] == "introduced" def test_query_history_removed_json_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query-history", "--removed-only", "--json", "kind=function"], ) assert result.exit_code == 0 data = json.loads(result.output) assert data["mode"] == "removed-only" assert "symbols_found" in data for entry in data["results"]: assert entry["status"] == "removed" def test_query_history_mode_flags_mutually_exclusive( self, code_repo: pathlib.Path ) -> None: result = runner.invoke( cli, [ "code", "query-history", "--changed-only", "--introduced-only", "kind=function", ], ) assert result.exit_code != 0 def test_query_history_json_has_full_commit_ids( self, code_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "query-history", "--json", "kind=function"] ) assert result.exit_code == 0 data = json.loads(result.output) for entry in data["results"]: # Full commit IDs should be present (not just 8-char short form). assert len(entry["first_commit_id"]) > 8 assert "stable" in entry def test_query_history_max_commits_cap(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "query-history", "--max-commits", "1", "kind=function"], ) assert result.exit_code == 0, result.output def test_query_history_introduced_count_only( self, code_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "query-history", "--introduced-only", "--count", "kind=function"], ) assert result.exit_code == 0 assert result.output.strip().isdigit() # --------------------------------------------------------------------------- # muse index # --------------------------------------------------------------------------- class TestIndexCommands: def test_index_status_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status"]) assert result.exit_code == 0, result.output def test_index_status_reports_absent(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status"]) # Indexes have not been built yet. assert "absent" in result.output.lower() or result.exit_code == 0 def test_index_rebuild_all(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild"]) assert result.exit_code == 0, result.output def test_index_rebuild_creates_index_files(self, code_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) idx_dir = indices_dir(code_repo) assert idx_dir.exists() def test_index_status_after_rebuild_shows_entries(self, code_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) result = runner.invoke(cli, ["code", "index", "status"]) assert result.exit_code == 0 # Output shows ✅ checkmarks and entry counts for rebuilt indexes. assert "entries" in result.output.lower() or "✅" in result.output def test_index_rebuild_symbol_history_only(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "--index", "symbol_history"]) assert result.exit_code == 0 def test_index_rebuild_hash_occurrence_only(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "--index", "hash_occurrence"]) assert result.exit_code == 0 # --------------------------------------------------------------------------- # muse detect-refactor # --------------------------------------------------------------------------- class TestHotspots: """Tests for muse code hotspots.""" # ── basic correctness ──────────────────────────────────────────────────── def test_hotspots_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "hotspots"]) assert result.exit_code == 0, result.output def test_hotspots_finds_changed_symbol(self, code_repo: pathlib.Path) -> None: """compute_invoice_total was modified across two commits — must appear.""" result = runner.invoke(cli, ["code", "hotspots", "--top", "20"]) assert result.exit_code == 0, result.output assert "billing.py" in result.output def test_hotspots_excludes_imports_by_default( self, code_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "hotspots", "--top", "50"]) assert result.exit_code == 0, result.output assert "::import::" not in result.output def test_hotspots_include_imports_flag(self, code_repo: pathlib.Path) -> None: """--include-imports must surface import pseudo-symbols if any exist.""" result = runner.invoke( cli, ["code", "hotspots", "--top", "50", "--include-imports"] ) assert result.exit_code == 0, result.output # Just verify it runs cleanly; the repo may or may not have import ops. # ── --kind filter (was broken before) ──────────────────────────────────── def test_kind_filter_excludes_classes(self, code_repo: pathlib.Path) -> None: """--kind function must not return class symbols.""" result = runner.invoke( cli, ["code", "hotspots", "--kind", "function", "--top", "20"] ) assert result.exit_code == 0, result.output for line in result.output.splitlines(): if "::" in line and "class" in line.lower(): # Make sure any class line is not a function kind result # (Addresses that contain the word "class" in their name are OK) pass # Name may contain "class" as substring def test_kind_filter_function_returns_functions( self, code_repo: pathlib.Path ) -> None: result_all = runner.invoke(cli, ["code", "hotspots", "--top", "50"]) result_fn = runner.invoke( cli, ["code", "hotspots", "--kind", "function", "--top", "50"] ) assert result_fn.exit_code == 0, result_fn.output # filtered result should have <= symbols than unfiltered fn_lines = [l for l in result_fn.output.splitlines() if "::" in l] all_lines = [l for l in result_all.output.splitlines() if "::" in l] assert len(fn_lines) <= len(all_lines) # ── --min filter ────────────────────────────────────────────────────────── def test_min_filter_raises_threshold(self, code_repo: pathlib.Path) -> None: result_all = runner.invoke(cli, ["code", "hotspots", "--top", "50"]) result_min = runner.invoke( cli, ["code", "hotspots", "--min", "2", "--top", "50"] ) assert result_min.exit_code == 0, result_min.output min_lines = [l for l in result_min.output.splitlines() if "::" in l] all_lines = [l for l in result_all.output.splitlines() if "::" in l] assert len(min_lines) <= len(all_lines) def test_min_zero_exits_error(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "hotspots", "--min", "0"]) assert result.exit_code == 1 # ── --language filter ───────────────────────────────────────────────────── def test_language_filter_lowercase(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "hotspots", "--language", "python", "--top", "10"] ) assert result.exit_code == 0, result.output assert "billing.py" in result.output def test_language_filter_uppercase(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "hotspots", "--language", "PYTHON", "--top", "10"] ) assert result.exit_code == 0, result.output # ── --top validation ────────────────────────────────────────────────────── def test_top_zero_exits_error(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "hotspots", "--top", "0"]) assert result.exit_code == 1 # ── JSON schema ─────────────────────────────────────────────────────────── def test_json_top_level_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "hotspots", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) for key in ( "from_ref", "to_ref", "commits_analysed", "truncated", "filters", "hotspots", ): assert key in data, f"missing key: {key}" assert isinstance(data["hotspots"], list) assert isinstance(data["truncated"], bool) assert isinstance(data["commits_analysed"], int) def test_json_filters_field(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "hotspots", "--kind", "function", "--min", "2", "--json"] ) data = json.loads(result.output) assert data["filters"]["kind"] == "function" assert data["filters"]["min_changes"] == 2 assert data["filters"]["include_imports"] is False def test_json_hotspot_entry_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "hotspots", "--json"]) data = json.loads(result.output) if data["hotspots"]: entry = data["hotspots"][0] assert "address" in entry assert "changes" in entry assert isinstance(entry["changes"], int) assert entry["changes"] >= 1 def test_json_no_imports_by_default(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "hotspots", "--json"]) data = json.loads(result.output) addresses = [h["address"] for h in data["hotspots"]] assert not any("::import::" in a for a in addresses) def test_json_ranked_descending(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "hotspots", "--json"]) data = json.loads(result.output) counts = [h["changes"] for h in data["hotspots"]] assert counts == sorted(counts, reverse=True) # ── --max-commits truncation ────────────────────────────────────────────── def test_max_commits_flag(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "hotspots", "--max-commits", "1", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["commits_analysed"] <= 1 def test_max_commits_truncation_flag(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "hotspots", "--max-commits", "1", "--json"] ) data = json.loads(result.output) assert data["truncated"] is True class TestDetectRefactorV2: def test_detect_refactor_json_schema(self, code_repo: pathlib.Path) -> None: """JSON output contains all required top-level fields.""" result = runner.invoke(cli, ["code", "detect-refactor", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) for field in ("commits_scanned", "truncated", "total", "events"): assert field in data, f"missing field '{field}'" assert isinstance(data["commits_scanned"], int) assert isinstance(data["truncated"], bool) assert isinstance(data["total"], int) assert isinstance(data["events"], list) def test_detect_refactor_json_event_schema(self, code_repo: pathlib.Path) -> None: """Each JSON event contains the required fields.""" # Run over the full history; code_repo has at least one rename event. result = runner.invoke(cli, ["code", "detect-refactor", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) for ev in data["events"]: for field in ("kind", "address", "detail", "commit_id", "commit_message", "committed_at"): assert field in ev, f"missing event field '{field}'" assert ev["kind"] in ("rename", "move", "signature", "implementation") def test_detect_refactor_finds_rename(self, code_repo: pathlib.Path) -> None: """A commit that renames a symbol produces a 'rename' event.""" result = runner.invoke(cli, ["code", "detect-refactor", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) kinds = [e["kind"] for e in data["events"]] assert "rename" in kinds, ( f"Expected at least one rename event; got: {sorted(set(kinds))}" ) def test_detect_refactor_classifies_modified_as_implementation( self, code_repo: pathlib.Path ) -> None: """Replace ops with '(modified)' in new_summary are classified as implementation. Previously, only '(implementation changed)' triggered implementation classification; '(modified)' was silently dropped. """ import datetime root = code_repo repo_id = json.loads((repo_json_path(root)).read_text())["repo_id"] from muse.core.refs import ( get_head_commit_id, read_current_branch, ) from muse.core.commits import ( CommitDict, CommitRecord, write_commit, ) from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id branch = read_current_branch(root) head_id = get_head_commit_id(root, branch) now = datetime.datetime(2026, 6, 1, 12, 0, 0, tzinfo=datetime.timezone.utc) message = "perf: optimise batch" snap_manifest: Manifest = {} snap_id = compute_snapshot_id(snap_manifest) parent_ids = [head_id] if head_id else [] commit_id = compute_commit_id( parent_ids=parent_ids, snapshot_id=snap_id, message=message, committed_at_iso=now.isoformat(), author="test", ) from muse.domain import PatchOp, ReplaceOp, StructuredDelta commit = CommitRecord( commit_id=commit_id, branch=branch, snapshot_id=snap_id, message=message, committed_at=now, parent_commit_id=head_id, author="test", structured_delta=StructuredDelta(ops=[PatchOp( op="patch", address="billing.py", child_ops=[ReplaceOp( op="replace", address="billing.py::process_batch", new_summary="function process_batch (modified) L10–30", old_summary="function process_batch", )], )]), ) write_commit(root, commit) (ref_path(root, branch)).write_text(commit_id) result = runner.invoke(cli, ["code", "detect-refactor", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) impl_events = [e for e in data["events"] if e["kind"] == "implementation"] addrs = [e["address"] for e in impl_events] assert "billing.py::process_batch" in addrs, ( f"'(modified)' op not classified as implementation; events: {data['events']}" ) def test_detect_refactor_skips_reformatted(self, code_repo: pathlib.Path) -> None: """Replace ops with 'reformatted' in new_summary are not emitted as events.""" import datetime root = code_repo repo_id = json.loads((repo_json_path(root)).read_text())["repo_id"] from muse.core.refs import ( get_head_commit_id, read_current_branch, ) from muse.core.commits import ( CommitDict, CommitRecord, write_commit, ) from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id branch = read_current_branch(root) head_id = get_head_commit_id(root, branch) now = datetime.datetime(2026, 6, 1, 13, 0, 0, tzinfo=datetime.timezone.utc) message = "style: reformat" snap_manifest: Manifest = {} snap_id = compute_snapshot_id(snap_manifest) parent_ids = [head_id] if head_id else [] commit_id = compute_commit_id( parent_ids=parent_ids, snapshot_id=snap_id, message=message, committed_at_iso=now.isoformat(), author="test", ) from muse.domain import PatchOp, ReplaceOp, StructuredDelta commit = CommitRecord( commit_id=commit_id, branch=branch, snapshot_id=snap_id, message=message, committed_at=now, parent_commit_id=head_id, author="test", structured_delta=StructuredDelta(ops=[PatchOp( op="patch", address="billing.py", child_ops=[ReplaceOp( op="replace", address="billing.py::UniqueReformattedSymbol", new_summary="reformatted — no semantic change", old_summary="", )], )]), ) write_commit(root, commit) (ref_path(root, branch)).write_text(commit_id) result = runner.invoke(cli, ["code", "detect-refactor", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) # The reformatted op must not appear as an event. reformatted_events = [ e for e in data["events"] if e["address"] == "billing.py::UniqueReformattedSymbol" ] assert reformatted_events == [], ( f"Reformatted op should be skipped; got: {reformatted_events}" ) def test_detect_refactor_truncation_warning(self, code_repo: pathlib.Path) -> None: """When --max is hit, a truncation warning appears in human output.""" result = runner.invoke(cli, ["code", "detect-refactor", "--max", "1"]) assert result.exit_code == 0, result.output assert "incomplete" in result.output or "limit" in result.output def test_detect_refactor_truncation_in_json(self, code_repo: pathlib.Path) -> None: """When --max is hit, truncated=true in JSON.""" result = runner.invoke( cli, ["code", "detect-refactor", "--max", "1", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["truncated"] is True assert data["commits_scanned"] == 1 def test_detect_refactor_max_zero_errors(self, code_repo: pathlib.Path) -> None: """--max 0 exits non-zero.""" result = runner.invoke(cli, ["code", "detect-refactor", "--max", "0"]) assert result.exit_code != 0 def test_detect_refactor_kind_filter(self, code_repo: pathlib.Path) -> None: """``--kind rename`` returns only rename events.""" result = runner.invoke( cli, ["code", "detect-refactor", "--kind", "rename", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) for ev in data["events"]: assert ev["kind"] == "rename" def test_detect_refactor_invalid_kind(self, code_repo: pathlib.Path) -> None: """``--kind`` with an invalid value exits non-zero.""" result = runner.invoke(cli, ["code", "detect-refactor", "--kind", "potato"]) assert result.exit_code != 0 def test_detect_refactor_bfs_follows_merge_parent2( self, code_repo: pathlib.Path ) -> None: """BFS walk finds refactoring events on merged feature branches.""" import datetime root = code_repo repo_id = json.loads((repo_json_path(root)).read_text())["repo_id"] from muse.core.refs import ( get_head_commit_id, read_current_branch, ) from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.domain import PatchOp, ReplaceOp, StructuredDelta branch = read_current_branch(root) head_id = get_head_commit_id(root, branch) assert head_id is not None feat_at = datetime.datetime(2026, 7, 1, 10, 0, 0, tzinfo=datetime.timezone.utc) merge_at = datetime.datetime(2026, 7, 1, 11, 0, 0, tzinfo=datetime.timezone.utc) feat_snap_id = compute_snapshot_id({"feat.py": "a" * 64}) feature_id = compute_commit_id( parent_ids=[head_id], snapshot_id=feat_snap_id, message="perf: vectorise", committed_at_iso=feat_at.isoformat(), author="test", ) write_commit(root, CommitRecord( commit_id=feature_id, branch="feat/perf", snapshot_id=feat_snap_id, message="perf: vectorise", committed_at=feat_at, parent_commit_id=head_id, author="test", structured_delta=StructuredDelta(ops=[PatchOp( op="patch", address="billing.py", child_ops=[ReplaceOp( op="replace", address="billing.py::vectorised_fn", new_summary="function vectorised_fn (implementation changed) L1–20", old_summary="function vectorised_fn", )], )]), )) merge_snap_id = compute_snapshot_id({"merge.py": "b" * 64}) merge_id = compute_commit_id( parent_ids=[head_id, feature_id], snapshot_id=merge_snap_id, message="merge feat/perf", committed_at_iso=merge_at.isoformat(), author="test", ) write_commit(root, CommitRecord( commit_id=merge_id, branch=branch, snapshot_id=merge_snap_id, message="merge feat/perf", committed_at=merge_at, parent_commit_id=head_id, parent2_commit_id=feature_id, author="test", )) (ref_path(root, branch)).write_text(merge_id) result = runner.invoke(cli, ["code", "detect-refactor", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) addrs = [e["address"] for e in data["events"]] assert "billing.py::vectorised_fn" in addrs, ( "BFS must find the implementation event on the feature branch" ) # --------------------------------------------------------------------------- # muse reserve # --------------------------------------------------------------------------- class TestReserve: def test_reserve_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "coord", "reserve", "billing.py::process_order", "--run-id", "agent-test" ]) assert result.exit_code == 0, result.output def test_reserve_creates_coordination_file(self, code_repo: pathlib.Path) -> None: runner.invoke(cli, ["coord", "reserve", "billing.py::process_order", "--run-id", "r1"]) coord_dir = coordination_dir(code_repo) / "reservations" assert coord_dir.exists() files = list(coord_dir.glob("*.json")) assert len(files) >= 1 def test_reserve_json_output(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "coord", "reserve", "--run-id", "r2", "--json", "billing.py::process_order", ]) assert result.exit_code == 0 data = json.loads(result.output) assert "reservation_id" in data def test_reserve_multiple_addresses(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "coord", "reserve", "--run-id", "r3", "billing.py::process_order", "billing.py::Invoice.apply_discount", ]) assert result.exit_code == 0 def test_reserve_with_operation(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "coord", "reserve", "--run-id", "r4", "--op", "rename", "billing.py::process_order", ]) assert result.exit_code == 0 def test_reserve_conflict_warning(self, code_repo: pathlib.Path) -> None: runner.invoke(cli, ["coord", "reserve", "--run-id", "a1", "billing.py::process_order"]) result = runner.invoke(cli, ["coord", "reserve", "--run-id", "a2", "billing.py::process_order"]) # Should warn but not fail. assert result.exit_code == 0 assert "conflict" in result.output.lower() or "already" in result.output.lower() or "reserved" in result.output.lower() # --------------------------------------------------------------------------- # muse intent # --------------------------------------------------------------------------- class TestIntent: def test_intent_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "coord", "intent", "--op", "rename", "--detail", "rename to process_invoice", "billing.py::process_order", ]) assert result.exit_code == 0, result.output def test_intent_creates_file(self, code_repo: pathlib.Path) -> None: runner.invoke(cli, ["coord", "intent", "--op", "modify", "billing.py::Invoice"]) idir = coordination_dir(code_repo) / "intents" assert idir.exists() assert len(list(idir.glob("*.json"))) >= 1 def test_intent_json_output(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "coord", "intent", "--op", "modify", "--json", "billing.py::Invoice", ]) assert result.exit_code == 0 data = json.loads(result.output) assert "intent_id" in data or "operation" in data # --------------------------------------------------------------------------- # muse forecast # --------------------------------------------------------------------------- class TestForecast: def test_forecast_exits_zero_no_reservations(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "forecast"]) assert result.exit_code == 0, result.output def test_forecast_json_no_reservations(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "forecast", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "conflicts" in data def test_forecast_detects_address_overlap(self, code_repo: pathlib.Path) -> None: runner.invoke(cli, ["coord", "reserve", "--run-id", "a1", "billing.py::Invoice.apply_discount"]) runner.invoke(cli, ["coord", "reserve", "--run-id", "a2", "billing.py::Invoice.apply_discount"]) result = runner.invoke(cli, ["coord", "forecast", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) types = [c.get("conflict_type") for c in data.get("conflicts", [])] assert "address_overlap" in types # --------------------------------------------------------------------------- # muse plan-merge # --------------------------------------------------------------------------- class TestPlanMerge: def test_plan_merge_same_commit_no_conflicts(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "plan-merge", "HEAD", "HEAD"]) assert result.exit_code == 0, result.output def test_plan_merge_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "plan-merge", "--json", "HEAD", "HEAD"]) assert result.exit_code == 0 data = json.loads(result.output) assert "conflicts" in data or isinstance(data, dict) def test_plan_merge_requires_two_args(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "plan-merge", "--json", "HEAD"]) assert result.exit_code != 0 # --------------------------------------------------------------------------- # muse shard # --------------------------------------------------------------------------- class TestShard: def test_shard_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "shard", "--agents", "2"]) assert result.exit_code == 0, result.output def test_shard_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "shard", "--agents", "2", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "shards" in data def test_shard_n_equals_1(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "shard", "--agents", "1"]) assert result.exit_code == 0 def test_shard_large_n(self, code_repo: pathlib.Path) -> None: # N larger than symbol count still works (produces fewer shards). result = runner.invoke(cli, ["coord", "shard", "--agents", "100"]) assert result.exit_code == 0 # --------------------------------------------------------------------------- # muse reconcile # --------------------------------------------------------------------------- class TestReconcile: def test_reconcile_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "reconcile"]) assert result.exit_code == 0, result.output def test_reconcile_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "reconcile", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data, dict) # --------------------------------------------------------------------------- # muse breakage # --------------------------------------------------------------------------- class TestBreakage: def test_breakage_exits_zero_clean_tree(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "breakage"]) assert result.exit_code == 0, result.output def test_breakage_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "breakage", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) # breakage JSON has "issues" list and error count. assert "issues" in data assert isinstance(data["issues"], list) def test_breakage_language_filter(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "breakage", "--language", "Python"]) assert result.exit_code == 0 def test_breakage_no_repo_errors(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.chdir(tmp_path) result = runner.invoke(cli, ["code", "breakage"]) assert result.exit_code != 0 # --------------------------------------------------------------------------- # muse invariants # --------------------------------------------------------------------------- class TestInvariants: def test_invariants_creates_toml_if_absent(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "invariants"]) toml_path = muse_dir(code_repo) / "invariants.toml" assert result.exit_code == 0 or toml_path.exists() def test_invariants_json_with_empty_rules(self, code_repo: pathlib.Path) -> None: # Create empty invariants.toml (muse_dir(code_repo) / "invariants.toml").write_text("# No rules\n") result = runner.invoke(cli, ["code", "invariants", "--json"]) assert result.exit_code == 0 # Output may be JSON or human-readable depending on rules count. output = result.output.strip() if output and not output.startswith("#"): try: data = json.loads(output) assert isinstance(data, dict) except json.JSONDecodeError: pass # Human-readable output is also acceptable. def test_invariants_no_cycles_rule(self, code_repo: pathlib.Path) -> None: (muse_dir(code_repo) / "invariants.toml").write_text(textwrap.dedent("""\ [[rules]] type = "no_cycles" name = "no import cycles" """)) result = runner.invoke(cli, ["code", "invariants"]) assert result.exit_code == 0 def test_invariants_forbidden_dependency_rule(self, code_repo: pathlib.Path) -> None: (muse_dir(code_repo) / "invariants.toml").write_text(textwrap.dedent("""\ [[rules]] type = "forbidden_dependency" name = "billing must not import utils" source_pattern = "billing.py" forbidden_pattern = "utils.py" """)) result = runner.invoke(cli, ["code", "invariants"]) assert result.exit_code == 0 def test_invariants_required_test_rule(self, code_repo: pathlib.Path) -> None: (muse_dir(code_repo) / "invariants.toml").write_text(textwrap.dedent("""\ [[rules]] type = "required_test" name = "billing must have tests" source_pattern = "billing.py" test_pattern = "test_billing.py" """)) result = runner.invoke(cli, ["code", "invariants"]) # May pass or fail depending on whether test_billing.py exists; should not crash. assert result.exit_code in (0, 1) def test_invariants_commit_flag(self, code_repo: pathlib.Path) -> None: (muse_dir(code_repo) / "invariants.toml").write_text("# empty\n") result = runner.invoke(cli, ["code", "invariants", "--commit", "HEAD"]) assert result.exit_code == 0 # --------------------------------------------------------------------------- # muse commit — semantic versioning # --------------------------------------------------------------------------- class TestSemVerInCommit: def test_commit_record_has_sem_ver_bump(self, code_repo: pathlib.Path) -> None: from muse.core.refs import get_head_commit_id from muse.core.commits import ( CommitDict, read_commit, ) commit_id = get_head_commit_id(code_repo, "main") assert commit_id is not None commit = read_commit(code_repo, commit_id) assert commit is not None assert commit.sem_ver_bump in ("major", "minor", "patch", "none") def test_commit_record_has_breaking_changes(self, code_repo: pathlib.Path) -> None: from muse.core.refs import get_head_commit_id from muse.core.commits import ( CommitDict, read_commit, ) commit_id = get_head_commit_id(code_repo, "main") assert commit_id is not None commit = read_commit(code_repo, commit_id) assert commit is not None assert isinstance(commit.breaking_changes, list) def test_log_shows_semver_for_major_bump(self, code_repo: pathlib.Path) -> None: from muse.core.refs import get_head_commit_id from muse.core.commits import ( CommitDict, read_commit, ) commit_id = get_head_commit_id(code_repo, "main") assert commit_id is not None commit = read_commit(code_repo, commit_id) assert commit is not None if commit.sem_ver_bump == "major": result = runner.invoke(cli, ["log"]) assert "MAJOR" in result.output or "major" in result.output.lower() # --------------------------------------------------------------------------- # Call-graph tier — muse impact # --------------------------------------------------------------------------- class TestImpact: def test_impact_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "impact", "--", "billing.py::Invoice.compute_invoice_total"]) assert result.exit_code == 0, result.output def test_impact_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "impact", "--json", "billing.py::Invoice.apply_discount"]) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data, dict) assert "blast_radius" in data assert "total" in data assert "commit_id" in data assert data["mode"] == "reverse" def test_impact_nonexistent_symbol_handled(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "impact", "--", "billing.py::nonexistent"]) assert result.exit_code in (0, 1) def test_impact_count_only(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "impact", "--count", "--", "billing.py::Invoice.compute_invoice_total"]) assert result.exit_code == 0 assert result.output.strip().isdigit() def test_impact_depth_negative_rejected(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "impact", "--depth", "-1", "--", "billing.py::Invoice.compute_invoice_total"]) assert result.exit_code == 1 def test_impact_forward_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "impact", "--forward", "--", "billing.py::Invoice.compute_invoice_total"]) assert result.exit_code == 0 def test_impact_forward_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "impact", "--forward", "--json", "--", "billing.py::process_order"]) assert result.exit_code == 0 data = json.loads(result.output) assert data["mode"] == "forward" assert "callees" in data assert "total" in data assert "commit_id" in data def test_impact_forward_and_compare_mutually_exclusive(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "impact", "--forward", "--compare", "HEAD", "--", "billing.py::process_order", ]) assert result.exit_code == 1 def test_impact_file_filter(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "impact", "--file", "billing.py", "--", "billing.py::Invoice.compute_invoice_total", ]) assert result.exit_code == 0 def test_impact_file_filter_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "impact", "--file", "billing.py", "--json", "--", "billing.py::Invoice.compute_invoice_total", ]) assert result.exit_code == 0 data = json.loads(result.output) assert data["file_filter"] == "billing.py" for depth_addrs in data["blast_radius"].values(): for addr in depth_addrs: assert addr.startswith("billing.py::") def test_impact_compare_json_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "impact", "--compare", "HEAD", "--json", "--", "billing.py::Invoice.compute_invoice_total", ]) assert result.exit_code == 0 data = json.loads(result.output) assert "compare_commit_id" in data assert "added_callers" in data assert "removed_callers" in data assert "net_change" in data assert isinstance(data["added_callers"], list) assert isinstance(data["removed_callers"], list) def test_impact_forward_count(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "impact", "--forward", "--count", "--", "billing.py::process_order"]) assert result.exit_code == 0 assert result.output.strip().isdigit() # --------------------------------------------------------------------------- # Call-graph tier — muse dead # --------------------------------------------------------------------------- class TestDead: def test_dead_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "dead"]) assert result.exit_code == 0, result.output def test_dead_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "dead", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data, dict) assert "results" in data assert "high_confidence_count" in data assert "total_files_scanned" in data assert "duration_ms" in data def test_dead_kind_filter(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "dead", "--kind", "function"]) assert result.exit_code == 0 def test_dead_include_tests(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "dead", "--include-tests"]) assert result.exit_code == 0 def test_dead_count_only(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "dead", "--count"]) assert result.exit_code == 0 assert result.output.strip().isdigit() def test_dead_compare_json_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "dead", "--compare", "HEAD", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "compare_commit_id" in data assert "new_dead" in data assert "recovered" in data assert "net_change" in data assert isinstance(data["new_dead"], list) assert isinstance(data["recovered"], list) def test_dead_compare_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "dead", "--compare", "HEAD"]) assert result.exit_code == 0 def test_dead_delete_and_compare_mutually_exclusive(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "dead", "--delete", "--compare", "HEAD"]) assert result.exit_code == 1 def test_dead_save_allowlist(self, code_repo: pathlib.Path, tmp_path: pathlib.Path) -> None: out_file = tmp_path / "allowlist.json" result = runner.invoke(cli, ["code", "dead", "--save-allowlist", str(out_file)]) assert result.exit_code == 0 if out_file.exists(): data = json.loads(out_file.read_text()) assert isinstance(data, list) assert all(isinstance(x, str) for x in data) def test_dead_high_confidence_only_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "dead", "--high-confidence-only", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) for c in data["results"]: assert c["confidence"] == "high" def test_dead_workers_cap_enforced(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "dead", "--workers", "999", "--count"]) assert result.exit_code == 0 # --------------------------------------------------------------------------- # muse code cat # --------------------------------------------------------------------------- class TestCat: def test_cat_basic(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py::Invoice"]) assert result.exit_code == 0, result.output assert "class Invoice" in result.output def test_cat_method(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py::Invoice.compute_invoice_total"]) assert result.exit_code == 0, result.output assert "def compute_invoice_total" in result.output def test_cat_bare_name_unambiguous(self, code_repo: pathlib.Path) -> None: # Invoice is unique — short name should resolve. result = runner.invoke(cli, ["code", "cat", "billing.py::Invoice"]) assert result.exit_code == 0 def test_cat_missing_separator_error(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py"]) assert result.exit_code != 0 def test_cat_unknown_symbol_error(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py::NoSuchThing"]) assert result.exit_code != 0 def test_cat_unknown_file_error(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "nope.py::Foo"]) assert result.exit_code != 0 def test_cat_line_numbers(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py::Invoice", "--line-numbers"]) assert result.exit_code == 0 # Line numbers prefix lines with digits. lines = [ln for ln in result.output.splitlines() if not ln.startswith("#")] first_code_line = next((ln for ln in lines if ln.strip()), "") assert first_code_line[:1].isdigit(), f"Expected digit prefix, got: {first_code_line!r}" def test_cat_json_output(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py::Invoice", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "results" in data assert "errors" in data assert "source_ref" in data assert len(data["results"]) == 1 r = data["results"][0] assert r["path"] == "billing.py" assert r["kind"] in ("class", "function", "method") assert isinstance(r["lineno"], int) assert isinstance(r["end_lineno"], int) assert "class Invoice" in r["source"] def test_cat_multi_address(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, [ "code", "cat", "billing.py::Invoice", "billing.py::Invoice.compute_invoice_total", "--json", ], ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert len(data["results"]) == 2 def test_cat_all_mode(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py", "--all"]) assert result.exit_code == 0 assert "Invoice" in result.output def test_cat_all_kind_filter(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py", "--all", "--kind", "function"]) assert result.exit_code == 0 def test_cat_all_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py", "--all", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert len(data["results"]) > 0 # Every result has required fields. for r in data["results"]: assert "address" in r assert "lineno" in r assert "source" in r def test_cat_context_lines(self, code_repo: pathlib.Path) -> None: result_plain = runner.invoke(cli, ["code", "cat", "billing.py::Invoice.compute_invoice_total"]) result_ctx = runner.invoke( cli, ["code", "cat", "billing.py::Invoice.compute_invoice_total", "--context", "2"] ) assert result_ctx.exit_code == 0 # With context we get at least as many lines. plain_lines = result_plain.output.count("\n") ctx_lines = result_ctx.output.count("\n") assert ctx_lines >= plain_lines def test_cat_json_errors_field_on_bad_address(self, code_repo: pathlib.Path) -> None: # In --json mode a missing symbol goes to the errors field, not a crash. result = runner.invoke( cli, ["code", "cat", "billing.py::Invoice", "billing.py::NoSuchThing", "--json"], ) # Output must be valid JSON (no stderr bleed into stdout). data = json.loads(result.output) assert len(data["results"]) == 1 assert len(data["errors"]) == 1 assert data["errors"][0]["address"] == "billing.py::NoSuchThing" def test_cat_header_shows_working_tree(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py::Invoice"]) assert result.exit_code == 0 assert "working tree" in result.output def test_cat_at_head(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py::Invoice", "--at", "HEAD"]) assert result.exit_code == 0 assert "Invoice" in result.output def test_cat_wrong_file_fallback_finds_symbol( self, code_repo: pathlib.Path, tmp_path: pathlib.Path ) -> None: """FILE::SYMBOL where SYMBOL lives in a different file — should fall back to a global snapshot search and cat it from its actual location, exit 0.""" # Add a second file with a unique function the billing module doesn't have. work = pathlib.Path.cwd() (work / "utils.py").write_text( "def format_currency(amount):\n return f'${amount:.2f}'\n" ) runner.invoke(cli, ["code", "add", "utils.py"]) runner.invoke(cli, ["commit", "-m", "Add utils"]) # Ask for utils.format_currency but specify the wrong file (billing.py). result = runner.invoke( cli, ["code", "cat", "billing.py::format_currency"] ) assert result.exit_code == 0, result.output assert "format_currency" in result.output def test_cat_wrong_file_fallback_json(self, code_repo: pathlib.Path) -> None: """Same fallback in --json mode: result is in results[], not errors[].""" work = pathlib.Path.cwd() (work / "utils.py").write_text( "def format_currency(amount):\n return f'${amount:.2f}'\n" ) runner.invoke(cli, ["code", "add", "utils.py"]) runner.invoke(cli, ["commit", "-m", "Add utils"]) result = runner.invoke( cli, ["code", "cat", "billing.py::format_currency", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert len(data["results"]) == 1 assert data["results"][0]["symbol"] == "format_currency" assert data["results"][0]["path"] == "utils.py" def test_cat_wrong_file_fallback_ambiguous_exits_nonzero( self, code_repo: pathlib.Path ) -> None: """If the symbol exists in multiple files, fallback reports ambiguity and exits 1.""" work = pathlib.Path.cwd() (work / "utils.py").write_text("def send_email(to): pass\n") runner.invoke(cli, ["code", "add", "utils.py"]) runner.invoke(cli, ["commit", "-m", "Duplicate send_email in utils"]) # billing.py already has send_email; utils.py now also has it. result = runner.invoke( cli, ["code", "cat", "nope.py::send_email"] ) assert result.exit_code != 0 def test_cat_truly_missing_symbol_still_errors(self, code_repo: pathlib.Path) -> None: """A symbol that doesn't exist anywhere in the snapshot still exits 1.""" result = runner.invoke(cli, ["code", "cat", "billing.py::AbsolutelyNowhere"]) assert result.exit_code != 0 # --------------------------------------------------------------------------- # Call-graph tier — muse coverage # --------------------------------------------------------------------------- class TestCoverage: def test_coverage_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "coverage", "--", "billing.py::Invoice"]) assert result.exit_code == 0, result.output def test_coverage_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "coverage", "--json", "billing.py::Invoice"]) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data, dict) assert "methods" in data assert "total_methods" in data assert "covered" in data assert "percent" in data assert "commit_id" in data assert "filters" in data for m in data["methods"]: assert "address" in m assert "called" in m assert "callers" in m def test_coverage_nonexistent_class_handled(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "coverage", "--", "billing.py::NonExistent"]) assert result.exit_code in (0, 1) def test_coverage_count_only(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "coverage", "--count", "billing.py::Invoice"]) assert result.exit_code == 0 # Output should be "n/total" format assert "/" in result.output.strip() def test_coverage_exclude_dunder(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "coverage", "--exclude-dunder", "--json", "billing.py::Invoice", ]) assert result.exit_code == 0 data = json.loads(result.output) assert data["filters"]["exclude_dunder"] is True for m in data["methods"]: assert not (m["name"].startswith("__") and m["name"].endswith("__")) def test_coverage_exclude_private(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "coverage", "--exclude-private", "--json", "billing.py::Invoice", ]) assert result.exit_code == 0 data = json.loads(result.output) assert data["filters"]["exclude_private"] is True def test_coverage_min_callers(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "coverage", "--min-callers", "2", "--json", "billing.py::Invoice", ]) assert result.exit_code == 0 data = json.loads(result.output) assert data["filters"]["min_callers"] == 2 def test_coverage_exclude_self(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "coverage", "--exclude-self", "--json", "billing.py::Invoice", ]) assert result.exit_code == 0 data = json.loads(result.output) assert data["filters"]["exclude_self"] is True # All reported callers should be from a different file for m in data["methods"]: for caller in m["callers"]: assert not caller.startswith("billing.py::") def test_coverage_compare_json_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "coverage", "--compare", "HEAD", "--json", "billing.py::Invoice", ]) assert result.exit_code == 0 data = json.loads(result.output) assert "compare_commit_id" in data assert "newly_covered" in data assert "newly_uncovered" in data assert "percent_change" in data def test_coverage_compare_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "coverage", "--compare", "HEAD", "billing.py::Invoice", ]) assert result.exit_code == 0 def test_coverage_no_show_callers(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "coverage", "--no-show-callers", "billing.py::Invoice", ]) assert result.exit_code == 0 # --------------------------------------------------------------------------- # Call-graph tier — muse deps # --------------------------------------------------------------------------- class TestDeps: def test_deps_file_mode(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "deps", "--", "billing.py"]) assert result.exit_code == 0, result.output def test_deps_reverse(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "deps", "--reverse", "billing.py"]) assert result.exit_code == 0 def test_deps_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "deps", "--json", "billing.py"]) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data, dict) def test_deps_symbol_mode(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "deps", "--", "billing.py::Invoice.compute_invoice_total"]) assert result.exit_code in (0, 1) # May be empty but shouldn't crash. # ── new flags ────────────────────────────────────────────────────────────── def test_deps_count_file_mode(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "deps", "--count", "billing.py"]) assert result.exit_code == 0, result.output assert result.output.strip().isdigit() def test_deps_count_reverse(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "deps", "--count", "--reverse", "billing.py"]) assert result.exit_code == 0, result.output assert result.output.strip().isdigit() def test_deps_filter_file_mode(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "deps", "--reverse", "--filter", "billing", "billing.py"] ) assert result.exit_code == 0, result.output def test_deps_depth_requires_symbol_mode(self, code_repo: pathlib.Path) -> None: # --depth > 1 in file mode is fine (just filters imports as before). result = runner.invoke(cli, ["code", "deps", "--depth", "2", "billing.py"]) assert result.exit_code == 0, result.output def test_deps_depth_negative_rejected(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "deps", "--depth", "-1", "billing.py::Invoice.compute_invoice_total"], ) assert result.exit_code != 0 def test_deps_depth_symbol_reverse(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "deps", "--reverse", "--depth", "2", "billing.py::Invoice.compute_invoice_total"], ) assert result.exit_code == 0, result.output def test_deps_transitive_symbol(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "deps", "--transitive", "billing.py::Invoice.compute_invoice_total"], ) assert result.exit_code == 0, result.output def test_deps_transitive_count(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "deps", "--transitive", "--count", "billing.py::Invoice.compute_invoice_total"], ) assert result.exit_code == 0 assert result.output.strip().isdigit() def test_deps_transitive_json_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "deps", "--transitive", "--json", "billing.py::Invoice.compute_invoice_total"], ) assert result.exit_code == 0 data = json.loads(result.output) assert "by_depth" in data assert data["transitive"] is True def test_deps_depth_json_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "deps", "--reverse", "--depth", "2", "--json", "billing.py::Invoice.compute_invoice_total"], ) assert result.exit_code == 0 data = json.loads(result.output) assert "by_depth" in data assert data["depth"] == 2 def test_deps_path_traversal_rejected(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "deps", "../../../etc/passwd"]) assert result.exit_code != 0 def test_deps_empty_file_rel_in_symbol_rejected( self, code_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "deps", "--", "::some_func"]) assert result.exit_code != 0 def test_deps_reverse_json_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "deps", "--reverse", "--json", "billing.py"] ) assert result.exit_code == 0 data = json.loads(result.output) assert "imported_by" in data assert isinstance(data["imported_by"], list) # --------------------------------------------------------------------------- # Call-graph tier — muse find-symbol # --------------------------------------------------------------------------- class TestFindSymbol: def test_find_by_name(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "find-symbol", "--name", "process_order"]) assert result.exit_code == 0, result.output def test_find_by_name_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "find-symbol", "--name", "Invoice", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data, dict) assert "results" in data assert "query" in data assert "total" in data def test_find_by_kind(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "find-symbol", "--kind", "class"]) assert result.exit_code == 0 assert result.output is not None def test_find_nonexistent_name_empty(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "find-symbol", "--name", "totally_nonexistent_xyzzy"]) assert result.exit_code == 0 assert "no matching" in result.output def test_find_requires_at_least_one_flag(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "find-symbol"]) assert result.exit_code == 1 def test_find_count_only(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "find-symbol", "--name", "process_order", "--count"]) assert result.exit_code == 0 assert result.output.strip().isdigit() def test_find_first_and_last_mutually_exclusive(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "find-symbol", "--name", "Invoice", "--first", "--last"]) assert result.exit_code == 1 def test_find_hash_too_short_rejected(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "find-symbol", "--hash", "ab"]) assert result.exit_code == 1 def test_find_since_invalid_date(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "find-symbol", "--name", "Invoice", "--since", "not-a-date"]) assert result.exit_code == 1 def test_find_until_invalid_date(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "find-symbol", "--name", "Invoice", "--until", "99/99/99"]) assert result.exit_code == 1 def test_find_since_future_returns_empty(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "find-symbol", "--name", "process_order", "--since", "2099-01-01", ]) assert result.exit_code == 0 assert "no matching" in result.output def test_find_limit(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "find-symbol", "--kind", "function", "--limit", "1"]) assert result.exit_code == 0 def test_find_file_filter(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "find-symbol", "--kind", "function", "--file", "billing.py", ]) assert result.exit_code == 0 def test_find_prefix_name(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "find-symbol", "--name", "process*", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) for ap in data["results"]: assert ap["name"].lower().startswith("process") def test_find_first_deduplicates(self, code_repo: pathlib.Path) -> None: result_all = runner.invoke(cli, ["code", "find-symbol", "--name", "process_order", "--count"]) result_first = runner.invoke(cli, ["code", "find-symbol", "--name", "process_order", "--first", "--count"]) assert result_all.exit_code == 0 assert result_first.exit_code == 0 count_all = int(result_all.output.strip()) count_first = int(result_first.output.strip()) assert count_first <= count_all def test_find_json_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "find-symbol", "--kind", "function", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "query" in data assert "results" in data assert "total" in data assert data["total"] == len(data["results"]) if data["results"]: ap = data["results"][0] for key in ("content_id", "address", "name", "kind", "commit_id", "committed_at"): assert key in ap # --------------------------------------------------------------------------- # Call-graph tier — muse patch # --------------------------------------------------------------------------- class TestPatch: def test_patch_dry_run(self, code_repo: pathlib.Path) -> None: new_impl = textwrap.dedent("""\ def send_email(address): return f"Sending to {address}" """) impl_file = code_repo / "send_email_impl.py" impl_file.write_text(new_impl) # patch takes ADDRESS SOURCE — put options before address. result = runner.invoke(cli, [ "code", "patch", "--dry-run", "--", "billing.py::send_email", str(impl_file), ]) assert result.exit_code in (0, 1, 2) def test_patch_syntax_error_rejected(self, code_repo: pathlib.Path) -> None: bad_impl = "def broken(\n not valid python at all{" bad_file = code_repo / "bad.py" bad_file.write_text(bad_impl) result = runner.invoke(cli, [ "code", "patch", "--", "billing.py::send_email", str(bad_file), ]) # Invalid syntax must be rejected or command handles gracefully. assert result.exit_code in (0, 1, 2) # --------------------------------------------------------------------------- # Security — path traversal guards # --------------------------------------------------------------------------- class TestPatchPathTraversal: """patch must reject addresses whose file component escapes the repo root.""" def test_patch_traversal_address_rejected(self, code_repo: pathlib.Path) -> None: body = code_repo / "body.py" body.write_text("def foo(): pass\n") result = runner.invoke(cli, [ "code", "patch", "--body", str(body), "../../etc/passwd::foo", ]) assert result.exit_code == 1 def test_patch_traversal_nested_address_rejected(self, code_repo: pathlib.Path) -> None: body = code_repo / "body.py" body.write_text("def foo(): pass\n") result = runner.invoke(cli, [ "code", "patch", "--body", str(body), "../../../tmp/malicious::foo", ]) assert result.exit_code == 1 def test_patch_json_valid_address(self, code_repo: pathlib.Path) -> None: """--json flag returns parseable JSON on a dry-run.""" body = code_repo / "body.py" body.write_text("def send_email(address):\n return address\n") result = runner.invoke(cli, [ "code", "patch", "--body", str(body), "--dry-run", "--json", "billing.py::send_email", ]) # Address may or may not exist; if it exits 0 the output must be JSON. if result.exit_code == 0: data = json.loads(result.output) assert data["address"] == "billing.py::send_email" assert data["dry_run"] is True class TestCheckoutSymbolPathTraversal: """checkout-symbol must reject addresses whose file component escapes root.""" def test_checkout_symbol_traversal_rejected(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "checkout-symbol", "--commit", "HEAD", "../../etc/passwd::foo", ]) assert result.exit_code == 1 def test_checkout_symbol_json_flag_valid_address(self, code_repo: pathlib.Path) -> None: """--json with a missing symbol exits non-zero gracefully (no crash).""" result = runner.invoke(cli, [ "code", "checkout-symbol", "--commit", "HEAD", "--json", "billing.py::nonexistent_func_xyz", ]) # Either exits 1 (symbol not found) — but must not crash. assert result.exit_code in (0, 1) class TestSemanticCherryPickPathTraversal: """semantic-cherry-pick must reject addresses that escape the repo root.""" def test_scp_traversal_rejected(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "semantic-cherry-pick", "--from", "HEAD", "../../etc/passwd::foo", ]) # The traversal-rejected symbol is recorded as not_found but the # command exits 0 (failed symbols don't abort the batch). # The key invariant is that no file outside the repo is written. # We assert exit_code is 0 (graceful) and the output does NOT write. assert result.exit_code in (0, 1) # No file was created outside the repo. assert not pathlib.Path("/etc/passwd_copy").exists() def test_scp_traversal_shows_error_in_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "semantic-cherry-pick", "--from", "HEAD", "--json", "../../etc/passwd::foo", ]) assert result.exit_code in (0, 1) if result.exit_code == 0: data = json.loads(result.output) assert data["applied"] == 0 # The traversal-escaped address should be marked as not_found results = data.get("results", []) assert any(r["status"] == "not_found" for r in results) # --------------------------------------------------------------------------- # muse code blame # --------------------------------------------------------------------------- @pytest.fixture def blame_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with four commits: seed → creation → modification → rename. A seed commit is required so that the billing.py creation commit has a parent (and therefore a structured_delta with insert ops). Timeline (oldest → newest): commit 0: README.md only (seed — gives billing.py commit a parent) commit 1: billing.py created — defines compute_total + process_order commit 2: compute_total implementation modified (same name) commit 3: compute_total renamed to compute_invoice_total """ work = repo # Seed commit so billing.py introduction has a parent and structured_delta. (work / "README.md").write_text("# Billing module\n") runner.invoke(cli, ["code", "add", "README.md"]) r = runner.invoke(cli, ["commit", "-m", "Seed commit"]) assert r.exit_code == 0, r.output (work / "billing.py").write_text(textwrap.dedent("""\ def compute_total(items): return sum(items) def process_order(items): return compute_total(items) """)) runner.invoke(cli, ["code", "add", "billing.py"]) r = runner.invoke(cli, ["commit", "-m", "Initial billing module"]) assert r.exit_code == 0, r.output (work / "billing.py").write_text(textwrap.dedent("""\ def compute_total(items): # faster implementation return sum(x for x in items) def process_order(items): return compute_total(items) """)) runner.invoke(cli, ["code", "add", "billing.py"]) r = runner.invoke(cli, ["commit", "-m", "Optimise compute_total"]) assert r.exit_code == 0, r.output (work / "billing.py").write_text(textwrap.dedent("""\ def compute_invoice_total(items): # faster implementation return sum(x for x in items) def process_order(items): return compute_invoice_total(items) """)) runner.invoke(cli, ["code", "add", "billing.py"]) r = runner.invoke(cli, ["commit", "-m", "Rename compute_total -> compute_invoice_total"]) assert r.exit_code == 0, r.output return repo class TestBlame: """Tests for muse code blame.""" # ── address validation ─────────────────────────────────────────────────── def test_invalid_address_no_separator_exits_error( self, blame_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "blame", "billing.py"]) assert result.exit_code == 1 assert "Invalid address" in result.stderr or "::" in result.stderr def test_max_zero_exits_error(self, blame_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "blame", "billing.py::compute_invoice_total", "--max", "0"] ) assert result.exit_code == 1 # ── basic correctness (no rename involved) ─────────────────────────────── def test_blame_existing_stable_symbol(self, blame_repo: pathlib.Path) -> None: """A symbol that was never renamed should have created + modified events.""" result = runner.invoke( cli, ["code", "blame", "billing.py::process_order", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) kinds = [ev["event"] for ev in data["events"]] assert "created" in kinds def test_blame_no_match_exits_zero(self, blame_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "blame", "billing.py::nonexistent_fn"] ) assert result.exit_code == 0 assert "no events found" in result.output # ── rename tracking — new name (the critical regression) ───────────────── def test_blame_new_name_finds_rename_event(self, blame_repo: pathlib.Path) -> None: """Blaming the POST-rename name must find the rename event.""" result = runner.invoke( cli, ["code", "blame", "billing.py::compute_invoice_total", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) kinds = [ev["event"] for ev in data["events"]] assert "renamed" in kinds, f"Expected rename event, got: {kinds}" def test_blame_new_name_follows_into_old_history( self, blame_repo: pathlib.Path ) -> None: """After finding the rename, blame must continue tracking the old name. The symbol was created as compute_total → modified → renamed. Blaming compute_invoice_total should find ALL three events. """ result = runner.invoke( cli, ["code", "blame", "billing.py::compute_invoice_total", "--all", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) kinds = [ev["event"] for ev in data["events"]] assert "created" in kinds, f"Expected created event, got: {kinds}" assert "renamed" in kinds, f"Expected renamed event, got: {kinds}" # ── rename tracking — old name ──────────────────────────────────────────── def test_blame_old_name_finds_creation(self, blame_repo: pathlib.Path) -> None: """Blaming the PRE-rename name must find the creation event.""" result = runner.invoke( cli, ["code", "blame", "billing.py::compute_total", "--all", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) kinds = [ev["event"] for ev in data["events"]] assert "created" in kinds, f"Expected created event, got: {kinds}" def test_blame_old_name_finds_rename_not_lost( self, blame_repo: pathlib.Path ) -> None: """Blaming the old name should also surface the rename event.""" result = runner.invoke( cli, ["code", "blame", "billing.py::compute_total", "--all", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) kinds = [ev["event"] for ev in data["events"]] assert "renamed" in kinds, f"Expected renamed event, got: {kinds}" # ── JSON schema ─────────────────────────────────────────────────────────── def test_blame_json_top_level_schema(self, blame_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "blame", "billing.py::process_order", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) for key in ("address", "start_ref", "total_commits_scanned", "truncated", "events"): assert key in data, f"missing key: {key}" assert isinstance(data["events"], list) assert isinstance(data["truncated"], bool) assert isinstance(data["total_commits_scanned"], int) def test_blame_json_event_schema(self, blame_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "blame", "billing.py::compute_invoice_total", "--all", "--json"], ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["events"], "expected at least one event" ev = data["events"][0] for field in ( "event", "commit_id", "author", "message", "committed_at", "address", "detail", ): assert field in ev, f"missing event field: {field}" def test_blame_json_address_field_matches_input( self, blame_repo: pathlib.Path ) -> None: addr = "billing.py::process_order" result = runner.invoke(cli, ["code", "blame", addr, "--json"]) data = json.loads(result.output) assert data["address"] == addr # ── --max truncation ────────────────────────────────────────────────────── def test_blame_max_limits_scan(self, blame_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "blame", "billing.py::process_order", "--max", "1", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["total_commits_scanned"] <= 1 def test_blame_truncation_flag_set_when_capped( self, blame_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "blame", "billing.py::process_order", "--max", "1", "--json"] ) data = json.loads(result.output) assert data["truncated"] is True def test_blame_truncation_warning_in_human_output( self, blame_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "blame", "billing.py::process_order", "--max", "1"] ) assert result.exit_code == 0, result.output assert "incomplete" in result.output.lower() or "max" in result.output.lower() # ── human output ───────────────────────────────────────────────────────── def test_blame_human_shows_last_touched(self, blame_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "blame", "billing.py::process_order"] ) assert result.exit_code == 0, result.output assert "last touched:" in result.output def test_blame_show_all_flag(self, blame_repo: pathlib.Path) -> None: result_default = runner.invoke( cli, ["code", "blame", "billing.py::compute_invoice_total"] ) result_all = runner.invoke( cli, ["code", "blame", "billing.py::compute_invoice_total", "--all"] ) assert result_all.exit_code == 0, result_all.output # --all shows at least as many lines as default assert len(result_all.output) >= len(result_default.output) # ── BFS follows merge parents ───────────────────────────────────────────── def test_blame_bfs_follows_merge_parent2( self, repo: pathlib.Path ) -> None: """A symbol introduced on a feature branch is visible after merging.""" # Main: empty billing.py (repo / "billing.py").write_text("def main_fn(): pass\n") runner.invoke(cli, ["code", "add", "billing.py"]) runner.invoke(cli, ["commit", "-m", "main commit"]) # Feature branch: add feature_fn runner.invoke(cli, ["branch", "feat/feature"]) runner.invoke(cli, ["checkout", "feat/feature"]) (repo / "billing.py").write_text("def main_fn(): pass\ndef feature_fn(): pass\n") runner.invoke(cli, ["code", "add", "billing.py"]) runner.invoke(cli, ["commit", "-m", "add feature_fn"]) # Merge back to main runner.invoke(cli, ["checkout", "main"]) runner.invoke(cli, ["merge", "feat/feature", "--force"]) # Blame feature_fn — should find 'created' event on the feature branch result = runner.invoke( cli, ["code", "blame", "billing.py::feature_fn", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) kinds = [ev["event"] for ev in data["events"]] assert "created" in kinds, ( f"Expected created event for feature_fn after merge; got: {kinds}" ) # --------------------------------------------------------------------------- # Security — ReDoS guard in grep # --------------------------------------------------------------------------- class TestGrepReDoS: """grep must reject patterns longer than 512 characters.""" def test_long_pattern_rejected(self, code_repo: pathlib.Path) -> None: long_pattern = "a" * 513 result = runner.invoke(cli, ["code", "grep", long_pattern]) assert result.exit_code == 1 assert "too long" in result.stderr.lower() or "512" in result.stderr def test_exactly_512_chars_accepted(self, code_repo: pathlib.Path) -> None: pattern = "a" * 512 result = runner.invoke(cli, ["code", "grep", pattern]) # Should not exit with ReDoS-rejection code (may be 0 or 1 for no matches). assert result.exit_code != 1 or "too long" not in result.output.lower() def test_invalid_regex_rejected(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "grep", "--regex", "[unclosed"]) assert result.exit_code == 1 # --------------------------------------------------------------------------- # JSON output — index status and rebuild # --------------------------------------------------------------------------- class TestIndexJsonOutput: def test_index_status_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status", "--json"]) assert result.exit_code == 0, result.output raw = json.loads(result.output) data = raw["indexes"] if isinstance(raw, dict) else raw assert isinstance(data, list) names = [entry["name"] for entry in data] assert "symbol_history" in names assert "hash_occurrence" in names for entry in data: assert "status" in entry assert "entries" in entry def test_index_rebuild_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert isinstance(data, dict) assert "rebuilt" in data assert isinstance(data["rebuilt"], list) assert "symbol_history" in data["rebuilt"] assert "hash_occurrence" in data["rebuilt"] def test_index_rebuild_single_json(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "index", "rebuild", "--index", "symbol_history", "--json" ]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert "symbol_history" in data.get("rebuilt", []) assert "symbol_history_addresses" in data # --------------------------------------------------------------------------- # Extended — muse code index status # --------------------------------------------------------------------------- class TestIndexStatusExtended: def test_j_alias_works(self, code_repo: pathlib.Path) -> None: """-j is equivalent to --json.""" result = runner.invoke(cli, ["code", "index", "status", "-j"]) assert result.exit_code == 0, result.output _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw assert isinstance(data, list) def test_help_flag(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status", "--help"]) assert result.exit_code == 0 def test_json_compact_single_line(self, code_repo: pathlib.Path) -> None: """JSON output is compact — single line, no indent=2.""" result = runner.invoke(cli, ["code", "index", "status", "-j"]) assert result.exit_code == 0 lines = [l for l in result.output.splitlines() if l.strip()] assert len(lines) == 1, f"Expected compact JSON, got {len(lines)} lines" def test_json_is_list(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status", "-j"]) _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw assert isinstance(data, list) def test_json_contains_symbol_history(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status", "-j"]) _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw names = [e["name"] for e in data] assert "symbol_history" in names def test_json_contains_hash_occurrence(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status", "-j"]) _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw names = [e["name"] for e in data] assert "hash_occurrence" in names def test_json_fields_all_present(self, code_repo: pathlib.Path) -> None: """Every entry has name, status, entries, updated_at.""" result = runner.invoke(cli, ["code", "index", "status", "-j"]) _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw for entry in data: assert "name" in entry assert "status" in entry assert "entries" in entry assert "updated_at" in entry def test_absent_status_before_rebuild(self, code_repo: pathlib.Path) -> None: """Freshly initialised repo: both indexes are absent.""" result = runner.invoke(cli, ["code", "index", "status", "-j"]) _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw statuses = {e["name"]: e["status"] for e in data} assert statuses["symbol_history"] == "absent" assert statuses["hash_occurrence"] == "absent" def test_absent_entries_is_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status", "-j"]) _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw for entry in data: if entry["status"] == "absent": assert entry["entries"] == 0 def test_absent_updated_at_is_null(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status", "-j"]) _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw for entry in data: if entry["status"] == "absent": assert entry["updated_at"] is None def test_present_after_rebuild(self, code_repo: pathlib.Path) -> None: """After rebuild all indexes report present.""" runner.invoke(cli, ["code", "index", "rebuild"]) result = runner.invoke(cli, ["code", "index", "status", "-j"]) _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw for entry in data: assert entry["status"] == "present", f"{entry['name']} not present after rebuild" def test_entries_nonzero_after_rebuild(self, code_repo: pathlib.Path) -> None: """symbol_history should have entries after two commits.""" runner.invoke(cli, ["code", "index", "rebuild"]) result = runner.invoke(cli, ["code", "index", "status", "-j"]) _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw sh = next(e for e in data if e["name"] == "symbol_history") assert sh["entries"] > 0 def test_updated_at_present_after_rebuild(self, code_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) result = runner.invoke(cli, ["code", "index", "status", "-j"]) _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw for entry in data: assert entry["updated_at"] is not None def test_corrupt_status_reported(self, code_repo: pathlib.Path) -> None: """A file with bad content is reported as corrupt, not absent.""" idx_dir = indices_dir(code_repo) idx_dir.mkdir(parents=True, exist_ok=True) (idx_dir / "symbol_history.json").write_bytes(b"\xff\xfe") result = runner.invoke(cli, ["code", "index", "status", "-j"]) assert result.exit_code == 0 _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw sh = next(e for e in data if e["name"] == "symbol_history") assert sh["status"] == "corrupt" def test_corrupt_does_not_crash(self, code_repo: pathlib.Path) -> None: idx_dir = indices_dir(code_repo) idx_dir.mkdir(parents=True, exist_ok=True) (idx_dir / "hash_occurrence.json").write_bytes(b"notmsgpack") result = runner.invoke(cli, ["code", "index", "status"]) assert result.exit_code == 0 def test_text_mode_shows_absent_hint(self, code_repo: pathlib.Path) -> None: """Text mode suggests rebuild command when index is absent.""" result = runner.invoke(cli, ["code", "index", "status"]) assert "rebuild" in result.output.lower() def test_text_mode_shows_present_after_rebuild(self, code_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) result = runner.invoke(cli, ["code", "index", "status"]) assert "✅" in result.output def test_help_shows_agent_quickstart(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status", "--help"]) assert "Agent quickstart" in result.output def test_help_shows_json_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status", "--help"]) assert "JSON output schema" in result.output def test_help_shows_exit_codes(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status", "--help"]) assert "Exit codes" in result.output # --------------------------------------------------------------------------- # Security — muse code index status # --------------------------------------------------------------------------- class TestIndexStatusSecurity: def test_corrupt_index_no_traceback(self, code_repo: pathlib.Path) -> None: """A corrupt index file must not surface a traceback.""" idx_dir = indices_dir(code_repo) idx_dir.mkdir(parents=True, exist_ok=True) (idx_dir / "symbol_history.json").write_bytes(b"\x00" * 16) result = runner.invoke(cli, ["code", "index", "status"]) assert "Traceback" not in result.output def test_json_names_come_from_known_list(self, code_repo: pathlib.Path) -> None: """JSON output names are only from KNOWN_INDEX_NAMES, never user input.""" from muse.core.indices import KNOWN_INDEX_NAMES result = runner.invoke(cli, ["code", "index", "status", "-j"]) _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw for entry in data: assert entry["name"] in KNOWN_INDEX_NAMES def test_no_ansi_in_json_output(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status", "-j"]) assert "\x1b" not in result.output def test_status_valid_values_only(self, code_repo: pathlib.Path) -> None: """status field is always one of the three allowed values.""" result = runner.invoke(cli, ["code", "index", "status", "-j"]) _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw for entry in data: assert entry["status"] in ("present", "absent", "corrupt") def test_no_traceback_outside_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) result = runner.invoke(cli, ["code", "index", "status"]) assert "Traceback" not in result.output assert result.exit_code != 0 def test_entries_is_always_int(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "status", "-j"]) _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw for entry in data: assert isinstance(entry["entries"], int) # --------------------------------------------------------------------------- # Stress — muse code index status # --------------------------------------------------------------------------- class TestIndexStatusStress: def test_50_sequential_status_calls(self, code_repo: pathlib.Path) -> None: """50 sequential status calls all exit 0.""" for i in range(50): result = runner.invoke(cli, ["code", "index", "status", "-j"]) assert result.exit_code == 0, f"Call {i} failed: {result.output}" def test_status_stable_after_100_rebuild_purge_cycles(self, code_repo: pathlib.Path) -> None: """Status correctly reflects present/absent through 100 rebuild-purge cycles.""" for i in range(100): runner.invoke(cli, ["code", "index", "rebuild", "--index", "symbol_history"]) result = runner.invoke(cli, ["code", "index", "status", "-j"]) data = json.loads(result.output.strip()) sh = next(e for e in data["indexes"] if e["name"] == "symbol_history") assert sh["status"] == "present", f"Cycle {i}: expected present, got {sh['status']}" runner.invoke(cli, ["code", "index", "purge", "--index", "symbol_history"]) result = runner.invoke(cli, ["code", "index", "status", "-j"]) data = json.loads(result.output.strip()) sh = next(e for e in data["indexes"] if e["name"] == "symbol_history") assert sh["status"] == "absent", f"Cycle {i}: expected absent after purge, got {sh['status']}" def test_concurrent_status_8_threads(self, code_repo: pathlib.Path) -> None: """8 threads reading index status concurrently — all must succeed.""" import argparse import threading from muse.cli.commands.index_rebuild import run_status errors: list[str] = [] def worker(idx: int) -> None: args = argparse.Namespace(json_out=True) try: run_status(args) except SystemExit as exc: if exc.code != 0: errors.append(f"Thread {idx}: exit {exc.code}") except Exception as exc: errors.append(f"Thread {idx}: {exc}") threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent failures: {errors}" # --------------------------------------------------------------------------- # Extended — muse code index rebuild # --------------------------------------------------------------------------- class TestIndexRebuildExtended: def test_j_alias_works(self, code_repo: pathlib.Path) -> None: """-j is equivalent to --json.""" result = runner.invoke(cli, ["code", "index", "rebuild", "-j"]) assert result.exit_code == 0, result.output data = json.loads(result.output.strip()) assert "rebuilt" in data def test_help_flag(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "--help"]) assert result.exit_code == 0 def test_json_compact_single_line(self, code_repo: pathlib.Path) -> None: """JSON output is a single compact line — no indent=2.""" result = runner.invoke(cli, ["code", "index", "rebuild", "-j"]) assert result.exit_code == 0 lines = [l for l in result.output.splitlines() if l.strip()] assert len(lines) == 1, f"Expected compact JSON, got {len(lines)} lines" def test_json_required_fields(self, code_repo: pathlib.Path) -> None: """JSON output always has dry_run, rebuilt.""" result = runner.invoke(cli, ["code", "index", "rebuild", "-j"]) data = json.loads(result.output.strip()) assert "dry_run" in data assert "rebuilt" in data def test_json_rebuilt_contains_both_by_default(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "-j"]) data = json.loads(result.output.strip()) assert "symbol_history" in data["rebuilt"] assert "hash_occurrence" in data["rebuilt"] def test_json_dry_run_false_by_default(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "-j"]) data = json.loads(result.output.strip()) assert data["dry_run"] is False def test_dry_run_flag_sets_dry_run_true(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "--dry-run", "-j"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert data["dry_run"] is True def test_dry_run_writes_no_files(self, code_repo: pathlib.Path) -> None: """--dry-run must not create index files.""" idx_dir = indices_dir(code_repo) runner.invoke(cli, ["code", "index", "rebuild", "--dry-run"]) assert not (idx_dir / "symbol_history.json").exists() assert not (idx_dir / "hash_occurrence.json").exists() def test_symbol_history_only_flag(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "--index", "symbol_history", "-j"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert data["rebuilt"] == ["symbol_history"] assert "symbol_history_addresses" in data assert "hash_occurrence_clusters" not in data def test_hash_occurrence_only_flag(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "--index", "hash_occurrence", "-j"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert data["rebuilt"] == ["hash_occurrence"] assert "hash_occurrence_clusters" in data assert "symbol_history_addresses" not in data def test_symbol_history_addresses_is_int(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "--index", "symbol_history", "-j"]) data = json.loads(result.output.strip()) assert isinstance(data["symbol_history_addresses"], int) assert isinstance(data["symbol_history_events"], int) def test_hash_occurrence_fields_are_int(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "--index", "hash_occurrence", "-j"]) data = json.loads(result.output.strip()) assert isinstance(data["hash_occurrence_clusters"], int) assert isinstance(data["hash_occurrence_addresses"], int) def test_rebuild_creates_index_files(self, code_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) idx_dir = indices_dir(code_repo) assert (idx_dir / "symbol_history.json").exists() assert (idx_dir / "hash_occurrence.json").exists() def test_rebuild_is_idempotent(self, code_repo: pathlib.Path) -> None: """Two sequential rebuilds both exit 0 and produce consistent counts.""" r1 = runner.invoke(cli, ["code", "index", "rebuild", "-j"]) r2 = runner.invoke(cli, ["code", "index", "rebuild", "-j"]) assert r1.exit_code == 0 and r2.exit_code == 0 d1 = json.loads(r1.output.strip()) d2 = json.loads(r2.output.strip()) assert d1["symbol_history_addresses"] == d2["symbol_history_addresses"] def test_verbose_flag_shows_progress(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "--verbose"]) assert result.exit_code == 0 assert "Building" in result.output def test_text_mode_shows_rebuilt_count(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild"]) assert "Rebuilt" in result.output or "index" in result.output.lower() def test_help_shows_agent_quickstart(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "--help"]) assert "Agent quickstart" in result.output def test_help_shows_json_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "--help"]) assert "JSON output schema" in result.output def test_help_shows_exit_codes(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "--help"]) assert "Exit codes" in result.output # --------------------------------------------------------------------------- # Security — muse code index rebuild # --------------------------------------------------------------------------- class TestIndexRebuildSecurity: def test_invalid_index_name_rejected_by_argparse(self, code_repo: pathlib.Path) -> None: """An unknown --index value must be rejected before run_rebuild is called.""" result = runner.invoke(cli, ["code", "index", "rebuild", "--index", "malicious_index"]) assert result.exit_code != 0 def test_dry_run_never_writes_files(self, code_repo: pathlib.Path) -> None: idx_dir = indices_dir(code_repo) runner.invoke(cli, ["code", "index", "rebuild", "--dry-run", "-j"]) assert not (idx_dir / "symbol_history.json").exists() assert not (idx_dir / "hash_occurrence.json").exists() def test_no_ansi_in_json_output(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "-j"]) assert "\x1b" not in result.output def test_no_traceback_outside_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) result = runner.invoke(cli, ["code", "index", "rebuild"]) assert "Traceback" not in result.output assert result.exit_code != 0 def test_rebuilt_list_only_known_names(self, code_repo: pathlib.Path) -> None: """rebuilt list must only contain names from KNOWN_INDEX_NAMES.""" from muse.core.indices import KNOWN_INDEX_NAMES result = runner.invoke(cli, ["code", "index", "rebuild", "-j"]) data = json.loads(result.output.strip()) for name in data["rebuilt"]: assert name in KNOWN_INDEX_NAMES def test_muse_version_is_string(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "rebuild", "-j"]) data = json.loads(result.output.strip()) assert isinstance(data["muse_version"], str) assert len(data["muse_version"]) > 0 # --------------------------------------------------------------------------- # Stress — muse code index rebuild # --------------------------------------------------------------------------- class TestIndexRebuildStress: def test_50_sequential_rebuild_calls(self, code_repo: pathlib.Path) -> None: """50 sequential rebuilds all exit 0.""" for i in range(50): result = runner.invoke(cli, ["code", "index", "rebuild", "-j"]) assert result.exit_code == 0, f"Call {i} failed: {result.output}" def test_100_alternate_single_index_rebuilds(self, code_repo: pathlib.Path) -> None: """Alternate rebuilding symbol_history and hash_occurrence 100 times.""" indexes = ["symbol_history", "hash_occurrence"] for i in range(100): target = indexes[i % 2] result = runner.invoke(cli, ["code", "index", "rebuild", "--index", target, "-j"]) assert result.exit_code == 0, f"Step {i} ({target}): {result.output}" data = json.loads(result.output.strip()) assert target in data["rebuilt"] def test_concurrent_rebuild_8_threads(self, code_repo: pathlib.Path) -> None: """8 threads rebuilding hash_occurrence concurrently via core function.""" import argparse import threading from muse.cli.commands.index_rebuild import run_rebuild errors: list[str] = [] def worker(idx: int) -> None: args = argparse.Namespace( index_name="hash_occurrence", dry_run=True, # dry_run avoids concurrent write races verbose=False, json_out=True, ) try: run_rebuild(args) except SystemExit as exc: if exc.code != 0: errors.append(f"Thread {idx}: exit {exc.code}") except Exception as exc: errors.append(f"Thread {idx}: {exc}") threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent failures: {errors}" # --------------------------------------------------------------------------- # Extended — muse code index purge # --------------------------------------------------------------------------- class TestIndexPurgeExtended: def test_j_alias_works(self, code_repo: pathlib.Path) -> None: """-j is equivalent to --json.""" result = runner.invoke(cli, ["code", "index", "purge", "-j"]) assert result.exit_code == 0, result.output data = json.loads(result.output.strip()) assert "purged" in data def test_help_flag(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "purge", "--help"]) assert result.exit_code == 0 def test_json_compact_single_line(self, code_repo: pathlib.Path) -> None: """JSON output is compact — single line, no indent=2.""" result = runner.invoke(cli, ["code", "index", "purge", "-j"]) assert result.exit_code == 0 lines = [l for l in result.output.splitlines() if l.strip()] assert len(lines) == 1, f"Expected compact JSON, got {len(lines)} lines" def test_json_required_fields(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "purge", "-j"]) data = json.loads(result.output.strip()) assert "purged" in data assert "skipped" in data def test_absent_indexes_go_to_skipped(self, code_repo: pathlib.Path) -> None: """Purging when indexes are absent — both in skipped, none in purged.""" result = runner.invoke(cli, ["code", "index", "purge", "-j"]) data = json.loads(result.output.strip()) assert data["purged"] == [] assert set(data["skipped"]) == {"symbol_history", "hash_occurrence"} def test_present_indexes_go_to_purged(self, code_repo: pathlib.Path) -> None: """After rebuild, purge reports both as purged.""" runner.invoke(cli, ["code", "index", "rebuild"]) result = runner.invoke(cli, ["code", "index", "purge", "-j"]) data = json.loads(result.output.strip()) assert set(data["purged"]) == {"symbol_history", "hash_occurrence"} assert data["skipped"] == [] def test_files_removed_after_purge(self, code_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) runner.invoke(cli, ["code", "index", "purge"]) idx_dir = indices_dir(code_repo) assert not (idx_dir / "symbol_history.json").exists() assert not (idx_dir / "hash_occurrence.json").exists() def test_purge_symbol_history_only(self, code_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) result = runner.invoke(cli, ["code", "index", "purge", "--index", "symbol_history", "-j"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert data["purged"] == ["symbol_history"] assert data["skipped"] == [] idx_dir = indices_dir(code_repo) assert not (idx_dir / "symbol_history.json").exists() assert (idx_dir / "hash_occurrence.json").exists() def test_purge_hash_occurrence_only(self, code_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) result = runner.invoke(cli, ["code", "index", "purge", "--index", "hash_occurrence", "-j"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert data["purged"] == ["hash_occurrence"] idx_dir = indices_dir(code_repo) assert not (idx_dir / "hash_occurrence.json").exists() assert (idx_dir / "symbol_history.json").exists() def test_purge_already_absent_exits_zero(self, code_repo: pathlib.Path) -> None: """Purging when nothing is present still exits 0.""" result = runner.invoke(cli, ["code", "index", "purge"]) assert result.exit_code == 0 def test_double_purge_exits_zero(self, code_repo: pathlib.Path) -> None: """Purging twice in a row both exit 0.""" runner.invoke(cli, ["code", "index", "rebuild"]) r1 = runner.invoke(cli, ["code", "index", "purge"]) r2 = runner.invoke(cli, ["code", "index", "purge"]) assert r1.exit_code == 0 assert r2.exit_code == 0 def test_muse_version_is_string(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "purge", "-j"]) data = json.loads(result.output.strip()) assert isinstance(data["muse_version"], str) assert len(data["muse_version"]) > 0 def test_purged_and_skipped_are_lists(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "purge", "-j"]) data = json.loads(result.output.strip()) assert isinstance(data["purged"], list) assert isinstance(data["skipped"], list) def test_text_mode_reports_deleted(self, code_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) result = runner.invoke(cli, ["code", "index", "purge"]) assert "deleted" in result.output.lower() or "🗑" in result.output def test_text_mode_reports_nothing_to_delete(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "purge"]) assert "nothing to delete" in result.output.lower() or "not present" in result.output.lower() def test_status_shows_absent_after_purge(self, code_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) runner.invoke(cli, ["code", "index", "purge"]) result = runner.invoke(cli, ["code", "index", "status", "-j"]) _raw = json.loads(result.output.strip()) data = _raw["indexes"] if isinstance(_raw, dict) and "indexes" in _raw else _raw for entry in data: assert entry["status"] == "absent" def test_help_shows_agent_quickstart(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "purge", "--help"]) assert "Agent quickstart" in result.output def test_help_shows_json_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "purge", "--help"]) assert "JSON output schema" in result.output def test_help_shows_exit_codes(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "purge", "--help"]) assert "Exit codes" in result.output # --------------------------------------------------------------------------- # Security — muse code index purge # --------------------------------------------------------------------------- class TestIndexPurgeSecurity: def test_invalid_index_name_rejected(self, code_repo: pathlib.Path) -> None: """Unknown --index value rejected by argparse before run_purge runs.""" result = runner.invoke(cli, ["code", "index", "purge", "--index", "malicious_index"]) assert result.exit_code != 0 def test_no_ansi_in_json_output(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "index", "purge", "-j"]) assert "\x1b" not in result.output def test_purged_list_only_known_names(self, code_repo: pathlib.Path) -> None: """purged and skipped lists only ever contain KNOWN_INDEX_NAMES.""" from muse.core.indices import KNOWN_INDEX_NAMES runner.invoke(cli, ["code", "index", "rebuild"]) result = runner.invoke(cli, ["code", "index", "purge", "-j"]) data = json.loads(result.output.strip()) for name in data["purged"] + data["skipped"]: assert name in KNOWN_INDEX_NAMES def test_no_traceback_outside_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) result = runner.invoke(cli, ["code", "index", "purge"]) assert "Traceback" not in result.output assert result.exit_code != 0 def test_only_index_files_removed(self, code_repo: pathlib.Path) -> None: """Purge must not remove anything outside .muse/indices/.""" runner.invoke(cli, ["code", "index", "rebuild"]) repo_json = repo_json_path(code_repo) assert repo_json.exists() runner.invoke(cli, ["code", "index", "purge"]) assert repo_json.exists(), "repo.json must not be deleted by purge" def test_no_traceback_on_double_purge(self, code_repo: pathlib.Path) -> None: runner.invoke(cli, ["code", "index", "rebuild"]) runner.invoke(cli, ["code", "index", "purge"]) result = runner.invoke(cli, ["code", "index", "purge"]) assert "Traceback" not in result.output # --------------------------------------------------------------------------- # Stress — muse code index purge # --------------------------------------------------------------------------- class TestIndexPurgeStress: def test_50_sequential_purge_calls(self, code_repo: pathlib.Path) -> None: """50 sequential purge calls all exit 0 (idempotent).""" for i in range(50): result = runner.invoke(cli, ["code", "index", "purge", "-j"]) assert result.exit_code == 0, f"Call {i} failed: {result.output}" def test_100_rebuild_purge_cycles(self, code_repo: pathlib.Path) -> None: """100 rebuild-purge cycles leave indexes absent and exit 0 throughout.""" for i in range(100): r1 = runner.invoke(cli, ["code", "index", "rebuild", "--index", "hash_occurrence", "-j"]) assert r1.exit_code == 0, f"Cycle {i} rebuild: {r1.output}" r2 = runner.invoke(cli, ["code", "index", "purge", "--index", "hash_occurrence", "-j"]) assert r2.exit_code == 0, f"Cycle {i} purge: {r2.output}" d = json.loads(r2.output.strip()) assert d["purged"] == ["hash_occurrence"], f"Cycle {i}: unexpected purge result {d}" def test_concurrent_purge_8_threads(self, code_repo: pathlib.Path) -> None: """8 threads purging concurrently via core function — all must exit 0.""" import argparse import threading from muse.cli.commands.index_rebuild import run_purge runner.invoke(cli, ["code", "index", "rebuild"]) errors: list[str] = [] def worker(idx: int) -> None: args = argparse.Namespace(index_name=None, json_out=True) try: run_purge(args) except SystemExit as exc: if exc.code != 0: errors.append(f"Thread {idx}: exit {exc.code}") except Exception as exc: errors.append(f"Thread {idx}: {exc}") threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent failures: {errors}" # --------------------------------------------------------------------------- # Performance — iterative DFS regression (no RecursionError) # --------------------------------------------------------------------------- class TestIterativeDFS: """Verify _find_cycles does not blow the call stack on a deep linear chain.""" def test_codemap_deep_chain_no_recursion_error(self, code_repo: pathlib.Path) -> None: from muse.cli.commands.codemap import _find_cycles as codemap_find_cycles # Build a linear chain A→B→C→…→Z (depth 600, beyond Python's 1000 default). depth = 600 nodes = [f"mod_{i}" for i in range(depth)] imports_out: _ImportsMap = { nodes[i]: [nodes[i + 1]] for i in range(depth - 1) } imports_out[nodes[-1]] = [] # Must not raise RecursionError. cycles = codemap_find_cycles(imports_out) assert isinstance(cycles, list) assert len(cycles) == 0 # linear chain has no cycles def test_codemap_cycle_detected(self, code_repo: pathlib.Path) -> None: from muse.cli.commands.codemap import _find_cycles as codemap_find_cycles # A→B→C→A is a cycle. imports_out: _ImportsMap = { "A": ["B"], "B": ["C"], "C": ["A"], } cycles = codemap_find_cycles(imports_out) assert len(cycles) >= 1 def test_invariants_deep_chain_no_recursion_error(self, code_repo: pathlib.Path) -> None: from muse.plugins.code._invariants import _find_cycles as invariants_find_cycles depth = 600 nodes = [f"file_{i}.py" for i in range(depth)] imports: _ImportsSetMap = { nodes[i]: {nodes[i + 1]} for i in range(depth - 1) } imports[nodes[-1]] = set() cycles = invariants_find_cycles(imports) assert isinstance(cycles, list) assert len(cycles) == 0 def test_invariants_self_loop_detected(self, code_repo: pathlib.Path) -> None: from muse.plugins.code._invariants import _find_cycles as invariants_find_cycles # A module that imports itself. imports: _ImportsSetMap = {"self_import.py": {"self_import.py"}} cycles = invariants_find_cycles(imports) assert len(cycles) >= 1 # --------------------------------------------------------------------------- # muse code symbols # --------------------------------------------------------------------------- class TestSymbols: """Tests for ``muse code symbols``.""" def test_symbols_basic_output(self, code_repo: pathlib.Path) -> None: """Basic invocation lists functions and classes from HEAD snapshot.""" result = runner.invoke(cli, ["code", "symbols"]) assert result.exit_code == 0, result.output # billing.py contains Invoice class and process_order / send_email functions. assert "Invoice" in result.output assert "process_order" in result.output assert "symbols across" in result.output def test_symbols_count_flag(self, code_repo: pathlib.Path) -> None: """``--count`` prints a total count and language breakdown, no symbol table.""" result = runner.invoke(cli, ["code", "symbols", "--count"]) assert result.exit_code == 0, result.output assert "symbols" in result.output assert "Python" in result.output # Should NOT print individual symbol lines. assert "Invoice" not in result.output def test_symbols_json_flag(self, code_repo: pathlib.Path) -> None: """``--json`` emits a structured envelope with a flat 'results' list.""" result = runner.invoke(cli, ["code", "symbols", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert isinstance(data, dict) assert "results" in data assert "files" not in data assert isinstance(data["results"], list) assert any(e.get("address", "").startswith("billing.py") for e in data["results"]) assert any(e["kind"] in ("class", "method", "function") for e in data["results"]) def test_symbols_kind_filter_class(self, code_repo: pathlib.Path) -> None: """``--kind class`` shows only class-kind symbols.""" result = runner.invoke(cli, ["code", "symbols", "--kind", "class"]) assert result.exit_code == 0, result.output assert "Invoice" in result.output assert "process_order" not in result.output def test_symbols_kind_filter_function(self, code_repo: pathlib.Path) -> None: """``--kind function`` shows only top-level functions, not methods.""" result = runner.invoke(cli, ["code", "symbols", "--kind", "function"]) assert result.exit_code == 0, result.output assert "process_order" in result.output assert "send_email" in result.output assert "Invoice" not in result.output def test_symbols_invalid_kind_errors(self, code_repo: pathlib.Path) -> None: """``--kind`` with an invalid value exits with USER_ERROR and helpful message.""" result = runner.invoke(cli, ["code", "symbols", "--kind", "potato"]) assert result.exit_code != 0 assert "Unknown kind" in result.output or "Unknown kind" in (result.stderr or "") def test_symbols_file_filter(self, code_repo: pathlib.Path) -> None: """``--file`` restricts output to a single file.""" result = runner.invoke(cli, ["code", "symbols", "--file", "billing.py"]) assert result.exit_code == 0, result.output assert "symbols across" in result.output def test_symbols_nonexistent_file_filter_returns_empty(self, code_repo: pathlib.Path) -> None: """``--file`` for a file not in the snapshot yields 'no semantic symbols found'.""" result = runner.invoke(cli, ["code", "symbols", "--file", "nonexistent.py"]) assert result.exit_code == 0, result.output assert "no semantic symbols found" in result.output def test_symbols_language_filter(self, code_repo: pathlib.Path) -> None: """``--language Python`` includes Python symbols; other languages excluded.""" result = runner.invoke(cli, ["code", "symbols", "--language", "Python"]) assert result.exit_code == 0, result.output assert "Invoice" in result.output def test_symbols_language_filter_no_match(self, code_repo: pathlib.Path) -> None: """``--language Go`` on a Python-only repo yields 'no semantic symbols found'.""" result = runner.invoke(cli, ["code", "symbols", "--language", "Go"]) assert result.exit_code == 0, result.output assert "no semantic symbols found" in result.output def test_symbols_hashes_flag(self, code_repo: pathlib.Path) -> None: """``--hashes`` appends content hash abbreviations to each symbol row.""" result = runner.invoke(cli, ["code", "symbols", "--hashes"]) assert result.exit_code == 0, result.output # Hash suffix is 8 hex chars followed by ".." assert ".." in result.output def test_symbols_commit_ref(self, code_repo: pathlib.Path) -> None: """``--commit HEAD`` and working-tree mode show the same symbols for a clean repo.""" default = runner.invoke(cli, ["code", "symbols"]) head = runner.invoke(cli, ["code", "symbols", "--commit", "HEAD"]) assert default.exit_code == 0 assert head.exit_code == 0 # Headers differ ("working tree" vs "commit …") but symbol content is identical. assert "Invoice" in default.output assert "Invoice" in head.output assert "symbols across" in default.output assert "symbols across" in head.output def test_symbols_count_and_json_mutually_exclusive(self, code_repo: pathlib.Path) -> None: """``--count`` and ``--json`` cannot be combined.""" result = runner.invoke(cli, ["code", "symbols", "--count", "--json"]) assert result.exit_code != 0 def test_symbols_json_schema(self, code_repo: pathlib.Path) -> None: """JSON output uses the structured envelope with source_ref and results.""" result = runner.invoke(cli, ["code", "symbols", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert "source_ref" in data assert "working_tree" in data assert "total_symbols" in data assert "results" in data assert "files" not in data assert isinstance(data["working_tree"], bool) assert isinstance(data["total_symbols"], int) for entry in data["results"]: for field in ("address", "kind", "name", "qualified_name", "lineno", "content_id", "body_hash", "signature_id"): assert field in entry, f"missing field '{field}' in JSON entry" def test_symbols_json_working_tree_flag(self, code_repo: pathlib.Path) -> None: """``--json`` without ``--commit`` reports working_tree=true.""" result = runner.invoke(cli, ["code", "symbols", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["working_tree"] is True assert data["source_ref"] == "working-tree" def test_symbols_json_commit_flag(self, code_repo: pathlib.Path) -> None: """``--json --commit HEAD`` reports working_tree=false and a short SHA.""" result = runner.invoke(cli, ["code", "symbols", "--json", "--commit", "HEAD"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["working_tree"] is False assert data["source_ref"] != "working-tree" # source_ref is a prefixed short commit id (e.g. "sha256:<12hex>") assert data["source_ref"].startswith("sha256:") def test_symbols_working_tree_reflects_disk_changes(self, code_repo: pathlib.Path) -> None: """Working-tree mode picks up edits made to files after the last commit.""" # Find the billing.py path on disk. billing = code_repo / "billing.py" assert billing.exists() # Append a new function — not yet committed. billing.write_text( f"{billing.read_text()}\ndef newly_added_function():\n pass\n" ) result = runner.invoke(cli, ["code", "symbols"]) assert result.exit_code == 0, result.output assert "newly_added_function" in result.output # Committed snapshot should NOT contain it. committed = runner.invoke(cli, ["code", "symbols", "--commit", "HEAD"]) assert committed.exit_code == 0 assert "newly_added_function" not in committed.output def test_symbols_language_filter_case_insensitive(self, code_repo: pathlib.Path) -> None: """``--language`` is case-insensitive: 'python' == 'Python' == 'PYTHON'.""" for variant in ("python", "Python", "PYTHON"): result = runner.invoke(cli, ["code", "symbols", "--language", variant]) assert result.exit_code == 0, f"failed for --language {variant!r}" assert "Invoice" in result.output def test_symbols_file_filter_partial_path(self, code_repo: pathlib.Path) -> None: """``--file billing.py`` matches a manifest entry stored as ``billing.py``.""" result = runner.invoke(cli, ["code", "symbols", "--file", "billing.py"]) assert result.exit_code == 0, result.output assert "Invoice" in result.output def test_symbols_file_filter_ambiguous_exits_error(self, code_repo: pathlib.Path) -> None: """An ambiguous ``--file`` suffix that matches multiple paths exits non-zero.""" # Write a second file with the same basename in a sub-directory. sub = code_repo / "sub" sub.mkdir(exist_ok=True) (sub / "billing.py").write_text("def sub_func(): pass\n") # Stage and commit both so the manifest has two paths ending in billing.py. import subprocess subprocess.run(["muse", "code", "add", "."], cwd=code_repo, check=True) subprocess.run( ["muse", "commit", "-m", "add sub/billing.py"], cwd=code_repo, check=True, ) result = runner.invoke(cli, ["code", "symbols", "--file", "billing.py"]) assert result.exit_code != 0 assert "ambiguous" in (result.output + (result.stderr or "")).lower() def test_symbols_invalid_ref_errors(self, code_repo: pathlib.Path) -> None: """``--commit`` with a non-existent ref exits non-zero with a clear message.""" result = runner.invoke(cli, ["code", "symbols", "--commit", "deadbeef"]) assert result.exit_code != 0 assert "not found" in result.stderr # --------------------------------------------------------------------------- # TestSymbolLog # --------------------------------------------------------------------------- class TestSymbolLog: """Tests for ``muse code symbol-log``.""" def test_symbol_log_no_events_for_unknown_symbol(self, code_repo: pathlib.Path) -> None: """An address not found in any commit produces 'no events found'.""" result = runner.invoke(cli, ["code", "symbol-log", "billing.py::DoesNotExist"]) assert result.exit_code == 0, result.output assert "no events found" in result.output def test_symbol_log_invalid_address_no_double_colon(self, code_repo: pathlib.Path) -> None: """An address without '::' exits non-zero with a descriptive error.""" result = runner.invoke(cli, ["code", "symbol-log", "billing.py"]) assert result.exit_code != 0 assert "::" in (result.output + (result.stderr or "")) def test_symbol_log_invalid_address_empty(self, code_repo: pathlib.Path) -> None: """An empty string as address exits non-zero.""" result = runner.invoke(cli, ["code", "symbol-log", "::"]) # "::" is technically valid syntax; should at least not crash. assert result.exit_code == 0 def test_symbol_log_json_schema(self, code_repo: pathlib.Path) -> None: """``--json`` emits the structured envelope with all top-level fields.""" result = runner.invoke( cli, ["code", "symbol-log", "billing.py::Invoice", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) for field in ("address", "start_ref", "total_commits_scanned", "truncated", "events"): assert field in data, f"missing top-level field '{field}'" assert data["address"] == "billing.py::Invoice" assert data["start_ref"] == "HEAD" assert isinstance(data["total_commits_scanned"], int) assert isinstance(data["truncated"], bool) assert isinstance(data["events"], list) def test_symbol_log_json_event_schema(self, code_repo: pathlib.Path) -> None: """Each JSON event has the required fields.""" result = runner.invoke( cli, ["code", "symbol-log", "billing.py::Invoice", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) for ev in data["events"]: for field in ("event", "commit_id", "message", "committed_at", "address", "detail", "new_address"): assert field in ev, f"missing event field '{field}'" def test_symbol_log_truncation_warning(self, code_repo: pathlib.Path) -> None: """When --max is hit, a truncation warning appears in human output.""" result = runner.invoke( cli, ["code", "symbol-log", "billing.py::Invoice", "--max", "1"] ) assert result.exit_code == 0, result.output assert "incomplete" in result.output or "limit" in result.output def test_symbol_log_truncation_flag_in_json(self, code_repo: pathlib.Path) -> None: """When --max is hit, truncated=true appears in JSON output.""" result = runner.invoke( cli, ["code", "symbol-log", "billing.py::Invoice", "--max", "1", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["truncated"] is True assert data["total_commits_scanned"] == 1 def test_symbol_log_max_zero_errors(self, code_repo: pathlib.Path) -> None: """--max 0 exits non-zero with a clear error.""" result = runner.invoke( cli, ["code", "symbol-log", "billing.py::Invoice", "--max", "0"] ) assert result.exit_code != 0 def test_symbol_log_invalid_from_ref(self, code_repo: pathlib.Path) -> None: """``--from`` with a non-existent ref exits non-zero.""" result = runner.invoke( cli, ["code", "symbol-log", "billing.py::Invoice", "--from", "deadbeef"] ) assert result.exit_code != 0 assert "not found" in result.stderr def test_symbol_log_bfs_follows_merge_parent2(self, code_repo: pathlib.Path) -> None: """BFS walk finds events on feature branches that were merged in via parent2. Simulates a merge commit (parent1=mainline, parent2=feature branch HEAD). The feature branch commit has a structured_delta inserting a symbol. The linear (parent1-only) walk would miss this; BFS must find it. """ import datetime root = code_repo repo_id = json.loads((repo_json_path(root)).read_text())["repo_id"] from muse.core.refs import ( get_head_commit_id, read_current_branch, ) from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.ids import hash_commit as compute_commit_id from muse.domain import InsertOp, PatchOp, StructuredDelta branch = read_current_branch(root) head_id = get_head_commit_id(root, branch) assert head_id is not None feature_snap = "aa" * 32 feature_at = datetime.datetime(2026, 1, 1, 0, 0, tzinfo=datetime.timezone.utc) feature_id = compute_commit_id( parent_ids=[head_id], snapshot_id=feature_snap, message="feat: add merged_fn", committed_at_iso=feature_at.isoformat(), author="test", ) write_commit(root, CommitRecord( commit_id=feature_id, branch="feat/branch", snapshot_id=feature_snap, message="feat: add merged_fn", committed_at=feature_at, parent_commit_id=head_id, author="test", structured_delta=StructuredDelta(ops=[PatchOp( op="patch", address="billing.py", child_ops=[InsertOp( op="insert", address="billing.py::merged_fn", content_summary="function merged_fn", )], )]), )) merge_snap = "bb" * 32 merge_at = datetime.datetime(2026, 1, 1, 1, 0, tzinfo=datetime.timezone.utc) merge_id = compute_commit_id( parent_ids=[head_id, feature_id], snapshot_id=merge_snap, message="merge feat/branch", committed_at_iso=merge_at.isoformat(), author="test", ) write_commit(root, CommitRecord( commit_id=merge_id, branch=branch, snapshot_id=merge_snap, message="merge feat/branch", committed_at=merge_at, parent_commit_id=head_id, parent2_commit_id=feature_id, author="test", )) branch_ref = ref_path(root, branch) branch_ref.write_text(merge_id) result = runner.invoke( cli, ["code", "symbol-log", "billing.py::merged_fn"] ) assert result.exit_code == 0, result.output # BFS must find the creation event on the feature branch. assert "merged_fn" in result.output assert "created" in result.output def test_symbol_log_linear_walk_misses_parent2(self, code_repo: pathlib.Path) -> None: """Regression guard: verify the BFS result differs from a parent1-only scan. Directly calls _walk_commits_dag and checks it returns commits from both parent chains, not just parent1. """ import datetime root = code_repo repo_id = json.loads((repo_json_path(root)).read_text())["repo_id"] from muse.core.refs import ( get_head_commit_id, read_current_branch, ) from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.ids import hash_commit as compute_commit_id from muse.plugins.code._query import walk_commits_bfs as _walk_commits_dag branch = read_current_branch(root) head_id = get_head_commit_id(root, branch) assert head_id is not None feature_snap = "dd" * 32 feature_at = datetime.datetime(2026, 2, 1, 0, 0, tzinfo=datetime.timezone.utc) feature_id = compute_commit_id( parent_ids=[], snapshot_id=feature_snap, message="feat on second parent", committed_at_iso=feature_at.isoformat(), author="test", ) write_commit(root, CommitRecord( commit_id=feature_id, branch="feat/x", snapshot_id=feature_snap, message="feat on second parent", committed_at=feature_at, author="test", )) merge_snap = "ee" * 32 merge_at = datetime.datetime(2026, 2, 1, 1, 0, tzinfo=datetime.timezone.utc) merge_id = compute_commit_id( parent_ids=[head_id, feature_id], snapshot_id=merge_snap, message="merge", committed_at_iso=merge_at.isoformat(), author="test", ) write_commit(root, CommitRecord( commit_id=merge_id, branch=branch, snapshot_id=merge_snap, message="merge", committed_at=merge_at, parent_commit_id=head_id, parent2_commit_id=feature_id, author="test", )) branch_ref = ref_path(root, branch) branch_ref.write_text(merge_id) commits, _ = _walk_commits_dag(root, merge_id, max_commits=1000) commit_ids = {c.commit_id for c in commits} assert feature_id in commit_ids # --------------------------------------------------------------------------- # muse code coupling # --------------------------------------------------------------------------- @pytest.fixture def coupling_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with 3 commits where billing.py + models.py co-change twice.""" work = repo # Commit 1: seed — only billing.py (work / "billing.py").write_text("def compute(items):\n return sum(items)\n") runner.invoke(cli, ["code", "add", "billing.py"]) r = runner.invoke(cli, ["commit", "-m", "seed billing"]) assert r.exit_code == 0, r.output # Commit 2: billing.py + models.py change together (work / "billing.py").write_text("def compute(items, tax=0.0):\n return sum(items) + tax\n") (work / "models.py").write_text("class Order:\n def total(self):\n return 0\n") runner.invoke(cli, ["code", "add", "."]) r = runner.invoke(cli, ["commit", "-m", "co-change 1: billing + models"]) assert r.exit_code == 0, r.output # Commit 3: billing.py + models.py change together again (work / "billing.py").write_text("def compute(items, tax=0.0, discount=0.0):\n return sum(items) + tax - discount\n") (work / "models.py").write_text("class Order:\n def total(self):\n return 42\n def apply(self): pass\n") runner.invoke(cli, ["code", "add", "."]) r = runner.invoke(cli, ["commit", "-m", "co-change 2: billing + models again"]) assert r.exit_code == 0, r.output return repo class TestCoupling: """Tests for muse code coupling.""" # ── basic correctness ──────────────────────────────────────────────────── def test_coupling_exits_zero(self, coupling_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "coupling"]) assert result.exit_code == 0, result.output def test_coupling_finds_co_changed_pair(self, coupling_repo: pathlib.Path) -> None: """billing.py and models.py co-changed twice — must appear in output.""" result = runner.invoke(cli, ["code", "coupling", "--min", "1"]) assert result.exit_code == 0, result.output assert "billing.py" in result.output assert "models.py" in result.output def test_coupling_shows_header(self, coupling_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "coupling"]) assert "co-change" in result.output.lower() or "coupling" in result.output.lower() assert "Commits analysed" in result.output def test_coupling_min_filter_excludes_low_count( self, coupling_repo: pathlib.Path ) -> None: """--min 3 must exclude our pair that co-changed only twice.""" result = runner.invoke(cli, ["code", "coupling", "--min", "3"]) assert result.exit_code == 0, result.output assert "billing.py" not in result.output or "no file pairs" in result.output def test_coupling_top_limits_output(self, coupling_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "coupling", "--top", "1", "--min", "1", "--json"]) data = json.loads(result.output) assert len(data["pairs"]) <= 1 # ── --file filter ───────────────────────────────────────────────────────── def test_coupling_file_filter_exits_zero(self, coupling_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "coupling", "--file", "billing.py", "--min", "1"]) assert result.exit_code == 0, result.output def test_coupling_file_filter_shows_partner(self, coupling_repo: pathlib.Path) -> None: """--file billing.py must surface models.py as its partner.""" result = runner.invoke(cli, ["code", "coupling", "--file", "billing.py", "--min", "1"]) assert result.exit_code == 0, result.output assert "models.py" in result.output def test_coupling_file_filter_header_names_file( self, coupling_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "coupling", "--file", "billing.py", "--min", "1"]) assert "billing.py" in result.output def test_coupling_file_filter_nonexistent_returns_cleanly( self, coupling_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "coupling", "--file", "nonexistent_xyz.py"]) assert result.exit_code == 0, result.output def test_coupling_file_filter_suffix_match(self, coupling_repo: pathlib.Path) -> None: """Suffix billing.py should match the file even without the full path.""" result = runner.invoke(cli, ["code", "coupling", "--file", "billing.py", "--min", "1"]) assert result.exit_code == 0, result.output assert "models.py" in result.output # ── JSON output ─────────────────────────────────────────────────────────── def test_coupling_json_schema(self, coupling_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "coupling", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert "from_ref" in data assert "to_ref" in data assert "commits_analysed" in data assert "truncated" in data assert "filters" in data assert "pairs" in data assert isinstance(data["pairs"], list) def test_coupling_json_pair_schema(self, coupling_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "coupling", "--min", "1", "--json"]) data = json.loads(result.output) if data["pairs"]: pair = data["pairs"][0] assert "file_a" in pair or "file" in pair assert "co_changes" in pair assert isinstance(pair["co_changes"], int) def test_coupling_json_file_filter_uses_partner_schema( self, coupling_repo: pathlib.Path ) -> None: """--file mode emits {file, partner, co_changes} not {file_a, file_b}.""" result = runner.invoke( cli, ["code", "coupling", "--file", "billing.py", "--min", "1", "--json"] ) data = json.loads(result.output) assert data["filters"]["file"] == "billing.py" if data["pairs"]: pair = data["pairs"][0] assert "file" in pair assert "partner" in pair assert "co_changes" in pair assert "file_a" not in pair # partner schema, not pair schema def test_coupling_json_not_truncated_small_repo( self, coupling_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "coupling", "--json"]) data = json.loads(result.output) assert data["truncated"] is False def test_coupling_json_filters_reflect_args( self, coupling_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "coupling", "--top", "5", "--min", "2", "--json"] ) data = json.loads(result.output) assert data["filters"]["top"] == 5 assert data["filters"]["min_count"] == 2 # ── --max-commits ───────────────────────────────────────────────────────── def test_coupling_max_commits_caps_scan(self, coupling_repo: pathlib.Path) -> None: r_full = runner.invoke(cli, ["code", "coupling", "--json"]) r_cap = runner.invoke(cli, ["code", "coupling", "--max-commits", "1", "--json"]) assert r_full.exit_code == 0 and r_cap.exit_code == 0 d_cap = json.loads(r_cap.output) assert d_cap["commits_analysed"] <= 1 def test_coupling_max_commits_truncated_flag( self, coupling_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "coupling", "--max-commits", "1", "--json"]) data = json.loads(result.output) # With 3 commits and cap=1, truncated must be True. assert data["truncated"] is True def test_coupling_max_commits_one_shows_warning( self, coupling_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "coupling", "--max-commits", "1"]) assert result.exit_code == 0, result.output assert "⚠️" in result.output or "capped" in result.output # ── validation ──────────────────────────────────────────────────────────── def test_coupling_top_zero_exits_error(self, coupling_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "coupling", "--top", "0"]) assert result.exit_code != 0 def test_coupling_min_zero_exits_error(self, coupling_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "coupling", "--min", "0"]) assert result.exit_code != 0 def test_coupling_max_commits_zero_exits_error( self, coupling_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "coupling", "--max-commits", "0"]) assert result.exit_code != 0 def test_coupling_invalid_from_ref_exits_error( self, coupling_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "coupling", "--from", "nonexistent-ref-xyz"] ) assert result.exit_code != 0 def test_coupling_bfs_visits_merge_parents(self, repo: pathlib.Path) -> None: """Coupling must count co-changes on feature-branch commits (parent2).""" import datetime # Genesis commit (repo / "billing.py").write_text("def compute(x):\n return x\n") r = runner.invoke(cli, ["commit", "-m", "seed"]) assert r.exit_code == 0, r.output repo_json = json.loads((repo_json_path(repo)).read_text()) repo_id = repo_json["repo_id"] from muse.core.refs import read_current_branch from muse.core.commits import resolve_commit_ref branch = read_current_branch(repo) head = resolve_commit_ref(repo, branch, None) assert head is not None now = datetime.datetime(2026, 3, 1, 0, 0, tzinfo=datetime.timezone.utc) feature_at = now merge_at = now + datetime.timedelta(hours=1) # Feature commit touching billing.py + models.py together. from muse.domain import PatchOp, ReplaceOp, InsertOp, StructuredDelta from muse.core.ids import hash_commit as compute_commit_id feature_delta = StructuredDelta( domain="code", ops=[ PatchOp( op="patch", address="billing.py", child_ops=[ReplaceOp( op="replace", address="billing.py::compute", old_content_id="a" * 64, new_content_id="b" * 64, old_summary="function compute", new_summary="function compute (modified)", position=None, )], child_domain="code", child_summary="compute modified", ), PatchOp( op="patch", address="models.py", child_ops=[InsertOp( op="insert", address="models.py::Order", content_id="c" * 64, content_summary="class Order", position=None, )], child_domain="code", child_summary="Order added", ), ], summary="co-change", ) feature_id = compute_commit_id( [head.commit_id], head.snapshot_id, "co-change on feature branch", feature_at.isoformat(), author="test", ) merge_id = compute_commit_id( [head.commit_id, feature_id], head.snapshot_id, "Merge feature", merge_at.isoformat(), author="test", ) feature_body: CommitDict = { "commit_id": feature_id, "repo_id": repo_id, "branch": "feat/test", "snapshot_id": head.snapshot_id, "message": "co-change on feature branch", "committed_at": feature_at.isoformat(), "parent_commit_id": head.commit_id, "parent2_commit_id": None, "author": "test", "metadata": {}, "structured_delta": feature_delta, } merge_body: CommitDict = { "commit_id": merge_id, "repo_id": repo_id, "branch": branch, "snapshot_id": head.snapshot_id, "message": "Merge feature", "committed_at": merge_at.isoformat(), "parent_commit_id": head.commit_id, "parent2_commit_id": feature_id, "author": "test", "metadata": {}, "structured_delta": None, } from muse.core.commits import ( CommitRecord, write_commit, ) write_commit(repo, CommitRecord.from_dict(feature_body)) write_commit(repo, CommitRecord.from_dict(merge_body)) (ref_path(repo, branch)).write_text(merge_id) result = runner.invoke(cli, ["code", "coupling", "--min", "1", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) pairs_found = { (p.get("file_a", ""), p.get("file_b", "")) for p in data["pairs"] } billing_models = any( ("billing.py" in a and "models.py" in b) or ("models.py" in a and "billing.py" in b) for a, b in pairs_found ) assert billing_models, "BFS must find the feature-branch co-change commit" # --------------------------------------------------------------------------- # muse code stable # --------------------------------------------------------------------------- class TestStable: """Tests for muse code stable.""" # ── basic correctness ──────────────────────────────────────────────────── def test_stable_exits_zero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "stable"]) assert result.exit_code == 0, result.output def test_stable_shows_header(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "stable"]) assert result.exit_code == 0, result.output assert "Symbol stability" in result.output assert "Commits analysed" in result.output assert "bedrock" in result.output def test_stable_surfaces_never_touched_symbol(self, code_repo: pathlib.Path) -> None: """Invoice.apply_discount was defined in the genesis commit and never modified.""" result = runner.invoke(cli, ["code", "stable", "--top", "10"]) assert result.exit_code == 0, result.output # apply_discount was never touched in any structured_delta → maximally stable. assert "apply_discount" in result.output def test_stable_since_start_of_range_marker(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "stable", "--top", "10"]) assert result.exit_code == 0, result.output assert "since start of range" in result.output def test_stable_excludes_docs_by_default(self, code_repo: pathlib.Path) -> None: """Markdown / TOML / YAML symbols must be absent from default output.""" result = runner.invoke(cli, ["code", "stable", "--top", "50"]) assert result.exit_code == 0, result.output assert ".md::" not in result.output assert ".toml::" not in result.output def test_stable_excludes_imports_by_default(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "stable", "--top", "50"]) assert result.exit_code == 0, result.output assert "::import::" not in result.output def test_stable_include_imports_flag(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "stable", "--top", "50", "--include-imports"]) assert result.exit_code == 0, result.output # ── JSON output ─────────────────────────────────────────────────────────── def test_stable_json_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "stable", "--top", "5", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert "from_ref" in data assert "to_ref" in data assert "commits_analysed" in data assert "truncated" in data assert "filters" in data assert "stable" in data assert isinstance(data["stable"], list) def test_stable_json_entry_schema(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "stable", "--top", "5", "--json"]) data = json.loads(result.output) assert len(data["stable"]) > 0 entry = data["stable"][0] assert "address" in entry assert "unchanged_for" in entry assert "since_start_of_range" in entry assert isinstance(entry["unchanged_for"], int) assert isinstance(entry["since_start_of_range"], bool) def test_stable_json_filters_reflect_args(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "stable", "--top", "3", "--kind", "function", "--json"] ) data = json.loads(result.output) assert data["filters"]["top"] == 3 assert data["filters"]["kind"] == "function" assert data["filters"]["include_imports"] is False assert data["filters"]["include_docs"] is False def test_stable_json_not_truncated_small_repo(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "stable", "--json"]) data = json.loads(result.output) assert data["truncated"] is False # ── --language filter ───────────────────────────────────────────────────── def test_stable_language_filter_case_insensitive(self, code_repo: pathlib.Path) -> None: """--language python and --language Python must behave identically.""" r_lower = runner.invoke(cli, ["code", "stable", "--language", "python", "--json"]) r_upper = runner.invoke(cli, ["code", "stable", "--language", "Python", "--json"]) assert r_lower.exit_code == 0 and r_upper.exit_code == 0 d_lower = json.loads(r_lower.output) d_upper = json.loads(r_upper.output) addrs_lower = {e["address"] for e in d_lower["stable"]} addrs_upper = {e["address"] for e in d_upper["stable"]} assert addrs_lower == addrs_upper def test_stable_language_filter_restricts_results(self, code_repo: pathlib.Path) -> None: r_py = runner.invoke(cli, ["code", "stable", "--language", "python", "--json"]) r_all = runner.invoke(cli, ["code", "stable", "--json"]) d_py = json.loads(r_py.output) d_all = json.loads(r_all.output) # Python-filtered results must be a subset of or equal to unfiltered results. py_addrs = {e["address"] for e in d_py["stable"]} all_addrs = {e["address"] for e in d_all["stable"]} assert py_addrs <= all_addrs # ── --since REF ─────────────────────────────────────────────────────────── def test_stable_since_reduces_commits_analysed(self, code_repo: pathlib.Path) -> None: """--since HEAD restricts the window to 0 commits (stop immediately).""" # Get the HEAD commit id to use as --since boundary import json as _json root = code_repo repo_id = _json.loads((repo_json_path(root)).read_text())["repo_id"] from muse.core.refs import read_current_branch from muse.core.commits import resolve_commit_ref branch = read_current_branch(root) head = resolve_commit_ref(root, branch, None) assert head is not None r_all = runner.invoke(cli, ["code", "stable", "--json"]) r_since = runner.invoke(cli, ["code", "stable", "--since", head.commit_id, "--json"]) assert r_all.exit_code == 0 and r_since.exit_code == 0 d_all = json.loads(r_all.output) d_since = json.loads(r_since.output) # Window stops at HEAD itself → at most 1 commit analysed. assert d_since["commits_analysed"] <= d_all["commits_analysed"] def test_stable_since_invalid_ref_exits_nonzero(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "stable", "--since", "nonexistent-ref-xyz"]) assert result.exit_code != 0 # ── --max-commits ───────────────────────────────────────────────────────── def test_stable_max_commits_caps_scan(self, code_repo: pathlib.Path) -> None: r_full = runner.invoke(cli, ["code", "stable", "--json"]) r_cap = runner.invoke(cli, ["code", "stable", "--max-commits", "1", "--json"]) assert r_full.exit_code == 0 and r_cap.exit_code == 0 d_cap = json.loads(r_cap.output) assert d_cap["commits_analysed"] <= 1 def test_stable_max_commits_one_shows_truncated_warning( self, code_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "stable", "--max-commits", "1"]) assert result.exit_code == 0, result.output # With 2 commits and cap=1, truncated warning should appear. assert "capped" in result.output or "⚠️" in result.output def test_stable_max_commits_zero_exits_error(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "stable", "--max-commits", "0"]) assert result.exit_code != 0 # ── --top validation ────────────────────────────────────────────────────── def test_stable_top_zero_exits_error(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "stable", "--top", "0"]) assert result.exit_code != 0 def test_stable_top_limits_output_count(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "stable", "--top", "2", "--json"]) data = json.loads(result.output) assert len(data["stable"]) <= 2 # ── BFS follows merge parents ───────────────────────────────────────────── def test_stable_bfs_follows_merge_parent2(self, repo: pathlib.Path) -> None: """Symbols touched only on a merged feature branch must be detected as unstable.""" import datetime # Create a symbol in commit 1 (main). (repo / "core.py").write_text("def bedrock():\n return 42\n") r = runner.invoke(cli, ["commit", "-m", "Add bedrock"]) assert r.exit_code == 0, r.output repo_json = json.loads((repo_json_path(repo)).read_text()) repo_id = repo_json["repo_id"] from muse.core.refs import read_current_branch from muse.core.commits import resolve_commit_ref branch = read_current_branch(repo) head_commit = resolve_commit_ref(repo, branch, None) assert head_commit is not None head_id = head_commit.commit_id feature_at = datetime.datetime(2026, 4, 1, 0, 0, tzinfo=datetime.timezone.utc) merge_at = datetime.datetime(2026, 4, 1, 1, 0, tzinfo=datetime.timezone.utc) # Feature-branch commit that touched "bedrock" via a structured_delta. from muse.domain import PatchOp, ReplaceOp, StructuredDelta from muse.core.ids import hash_commit as compute_commit_id bedrock_delta = StructuredDelta( domain="code", ops=[PatchOp( op="patch", address="core.py", child_ops=[ReplaceOp( op="replace", address="core.py::bedrock", old_content_id="a" * 64, new_content_id="b" * 64, old_summary="function bedrock", new_summary="function bedrock (modified)", position=None, )], child_domain="code", child_summary="bedrock modified", )], summary="bedrock modified", ) feature_id = compute_commit_id( [head_id], head_commit.snapshot_id, "Feature: touch bedrock", feature_at.isoformat(), author="test", ) merge_id = compute_commit_id( [head_id, feature_id], head_commit.snapshot_id, "Merge feat/touch-bedrock", merge_at.isoformat(), author="test", ) feature_body: CommitDict = { "commit_id": feature_id, "repo_id": repo_id, "branch": "feat/touch-bedrock", "snapshot_id": head_commit.snapshot_id, "message": "Feature: touch bedrock", "committed_at": feature_at.isoformat(), "parent_commit_id": head_id, "parent2_commit_id": None, "author": "test", "metadata": {}, "structured_delta": bedrock_delta, } # Merge commit whose parent2 is the feature commit. merge_body: CommitDict = { "commit_id": merge_id, "repo_id": repo_id, "branch": branch, "snapshot_id": head_commit.snapshot_id, "message": "Merge feat/touch-bedrock", "committed_at": merge_at.isoformat(), "parent_commit_id": head_id, "parent2_commit_id": feature_id, "author": "test", "metadata": {}, "structured_delta": None, } from muse.core.commits import ( CommitRecord, write_commit, ) write_commit(repo, CommitRecord.from_dict(feature_body)) write_commit(repo, CommitRecord.from_dict(merge_body)) (ref_path(repo, branch)).write_text(merge_id) result = runner.invoke(cli, ["code", "stable", "--top", "10", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) # bedrock was touched in the feature-branch commit; BFS must find it. # It should have unchanged_for < total_commits (not maximally stable). bedrock_entries = [e for e in data["stable"] if "bedrock" in e["address"]] if bedrock_entries: assert not bedrock_entries[0]["since_start_of_range"] # --------------------------------------------------------------------------- # muse code compare # --------------------------------------------------------------------------- @pytest.fixture def compare_repo(repo: pathlib.Path) -> tuple[pathlib.Path, str, str]: """Repo with two commits; returns (path, commit_id_a, commit_id_b). Commit A — billing.py with Invoice.compute_total + process_order. Commit B — compute_total renamed to compute_invoice_total; generate_pdf and send_email added. Multi-line message to test truncation. """ (repo / "billing.py").write_text(textwrap.dedent("""\ class Invoice: def compute_total(self, items): return sum(items) def apply_discount(self, total, pct): return total * (1 - pct) def process_order(invoice, items): return invoice.compute_total(items) """)) runner.invoke(cli, ["code", "add", "billing.py"]) r = runner.invoke(cli, ["commit", "-m", "Add billing module"]) assert r.exit_code == 0, r.output from muse.core.refs import read_current_branch branch = read_current_branch(repo) commit_a = get_head_commit_id(repo, branch) (repo / "billing.py").write_text(textwrap.dedent("""\ class Invoice: def compute_invoice_total(self, items): return sum(items) def apply_discount(self, total, pct): return total * (1 - pct) def generate_pdf(self): return b"pdf" def process_order(invoice, items): return invoice.compute_invoice_total(items) def send_email(address): pass """)) runner.invoke(cli, ["code", "add", "billing.py"]) # Multi-line message to test first-line truncation. r = runner.invoke(cli, [ "commit", "-m", "Rename compute_total, add generate_pdf + send_email\n\nThis is the extended body.", ]) assert r.exit_code == 0, r.output commit_b = get_head_commit_id(repo, branch) assert commit_a is not None assert commit_b is not None return repo, commit_a, commit_b class TestCompare: """Tests for muse code compare.""" # ── basic correctness ──────────────────────────────────────────────────── def test_compare_exits_zero( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_b]) assert result.exit_code == 0, result.output def test_compare_shows_header( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_b]) assert result.exit_code == 0, result.output assert "Semantic comparison" in result.output assert "From:" in result.output assert "To:" in result.output def test_compare_commit_message_first_line_only( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: """Multi-line commit messages must be truncated to their first line.""" _, ref_a, ref_b = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_b]) assert result.exit_code == 0, result.output # The body of the second commit must not appear in the header. assert "This is the extended body" not in result.output def test_compare_same_ref_no_changes( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, _ = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_a]) assert result.exit_code == 0, result.output assert "no semantic changes" in result.output def test_compare_detects_added_symbols( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_b]) assert result.exit_code == 0, result.output # generate_pdf and send_email were added in commit B. assert "generate_pdf" in result.output or "send_email" in result.output def test_compare_invalid_ref_exits_nonzero( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, _ = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, "deadbeefdeadbeef"]) assert result.exit_code != 0 def test_compare_requires_repo( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.chdir(tmp_path) result = runner.invoke(cli, ["code", "compare", "abc", "def"]) assert result.exit_code != 0 # ── JSON schema ────────────────────────────────────────────────────────── def test_compare_json_schema( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_b, "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert set(data.keys()) >= {"from", "to", "filters", "stat", "ops"} def test_compare_json_from_to_schema( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_b, "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert "commit_id" in data["from"] assert "message" in data["from"] assert "commit_id" in data["to"] assert "message" in data["to"] def test_compare_json_message_first_line_only( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_b, "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert "\n" not in data["to"]["message"] assert "This is the extended body" not in data["to"]["message"] def test_compare_json_stat_schema( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_b, "--json"]) assert result.exit_code == 0, result.output stat = json.loads(result.output)["stat"] assert set(stat.keys()) >= { "files_changed", "symbols_added", "symbols_removed", "symbols_modified", "semver_impact", } assert isinstance(stat["files_changed"], int) assert isinstance(stat["symbols_added"], int) assert stat["semver_impact"] in ("MAJOR", "MINOR", "PATCH", "NONE") def test_compare_json_filters_schema( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_b, "--json"]) assert result.exit_code == 0, result.output filters = json.loads(result.output)["filters"] assert set(filters.keys()) >= {"kind", "file", "language"} # No filters applied — all None. assert filters["kind"] is None assert filters["file"] is None assert filters["language"] is None def test_compare_json_ops_schema( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_b, "--json"]) assert result.exit_code == 0, result.output ops = json.loads(result.output)["ops"] assert isinstance(ops, list) assert len(ops) > 0 for op in ops: assert "op" in op assert "address" in op assert "detail" in op def test_compare_same_ref_json_empty_ops( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, _ = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_a, "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["ops"] == [] assert data["stat"]["semver_impact"] == "NONE" # ── --stat flag ────────────────────────────────────────────────────────── def test_compare_stat_shows_counts( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_b, "--stat"]) assert result.exit_code == 0, result.output assert "Files changed:" in result.output assert "Symbols added:" in result.output assert "Symbols removed:" in result.output assert "Symbols modified:" in result.output assert "SemVer impact:" in result.output def test_compare_stat_no_per_symbol_listing( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_b, "--stat"]) assert result.exit_code == 0, result.output # --stat should not include per-symbol listing lines ("added …", "removed …"). assert " added " not in result.output assert " removed " not in result.output assert " modified " not in result.output def test_compare_stat_same_ref_semver_none( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, _ = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_a, "--stat"]) assert result.exit_code == 0, result.output assert "NONE" in result.output # ── --semver flag ──────────────────────────────────────────────────────── def test_compare_semver_appended_to_full_output( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke(cli, ["code", "compare", ref_a, ref_b, "--semver"]) assert result.exit_code == 0, result.output assert "SemVer impact:" in result.output # ── --file filter ──────────────────────────────────────────────────────── def test_compare_file_filter_restricts_output( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke( cli, ["code", "compare", ref_a, ref_b, "--file", "billing.py"] ) assert result.exit_code == 0, result.output def test_compare_file_filter_nonexistent_no_ops( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke( cli, ["code", "compare", ref_a, ref_b, "--file", "nonexistent.py"] ) assert result.exit_code == 0, result.output assert "no semantic changes" in result.output def test_compare_file_filter_in_json( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke( cli, ["code", "compare", ref_a, ref_b, "--file", "billing.py", "--json"], ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["filters"]["file"] == "billing.py" # ── --kind filter ──────────────────────────────────────────────────────── def test_compare_kind_filter_case_insensitive( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo r_lower = runner.invoke( cli, ["code", "compare", ref_a, ref_b, "--kind", "function"] ) r_upper = runner.invoke( cli, ["code", "compare", ref_a, ref_b, "--kind", "Function"] ) assert r_lower.exit_code == 0 assert r_upper.exit_code == 0 # Both produce the same ops list. assert r_lower.output == r_upper.output def test_compare_kind_filter_in_json( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke( cli, ["code", "compare", ref_a, ref_b, "--kind", "function", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["filters"]["kind"] == "function" # ── --language filter ──────────────────────────────────────────────────── def test_compare_language_filter_python( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke( cli, ["code", "compare", ref_a, ref_b, "--language", "Python"] ) assert result.exit_code == 0, result.output def test_compare_language_filter_case_insensitive( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo r_lower = runner.invoke( cli, ["code", "compare", ref_a, ref_b, "--language", "python"] ) r_upper = runner.invoke( cli, ["code", "compare", ref_a, ref_b, "--language", "Python"] ) assert r_lower.exit_code == 0 assert r_upper.exit_code == 0 assert r_lower.output == r_upper.output def test_compare_language_filter_in_json( self, compare_repo: tuple[pathlib.Path, str, str] ) -> None: _, ref_a, ref_b = compare_repo result = runner.invoke( cli, ["code", "compare", ref_a, ref_b, "--language", "python", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["filters"]["language"] == "Python" # --------------------------------------------------------------------------- # muse code languages # --------------------------------------------------------------------------- @pytest.fixture def lang_repo(repo: pathlib.Path) -> tuple[pathlib.Path, str, str]: """Two-commit repo; returns (path, commit_id_a, commit_id_b). Commit A — billing.py (Python) only. Commit B — billing.py extended + README.md added. """ (repo / "billing.py").write_text(textwrap.dedent("""\ import os import json class Invoice: def compute_total(self, items: list[float]) -> float: return sum(items) def process_order(invoice: Invoice, items: list[float]) -> float: return invoice.compute_total(items) """)) runner.invoke(cli, ["code", "add", "billing.py"]) r = runner.invoke(cli, ["commit", "-m", "Add billing module"]) assert r.exit_code == 0, r.output from muse.core.refs import read_current_branch branch = read_current_branch(repo) commit_a = get_head_commit_id(repo, branch) (repo / "billing.py").write_text(textwrap.dedent("""\ import os import json class Invoice: def compute_total(self, items: list[float]) -> float: return sum(items) def generate_pdf(self) -> bytes: return b"pdf" def process_order(invoice: Invoice, items: list[float]) -> float: return invoice.compute_total(items) def send_email(address: str) -> None: pass """)) (repo / "README.md").write_text("# My Project\n\nA billing module.\n") runner.invoke(cli, ["code", "add", "."]) r = runner.invoke(cli, ["commit", "-m", "Add generate_pdf, send_email, README"]) assert r.exit_code == 0, r.output commit_b = get_head_commit_id(repo, branch) assert commit_a is not None assert commit_b is not None return repo, commit_a, commit_b class TestLanguages: """Tests for muse code languages.""" # ── basic correctness ──────────────────────────────────────────────────── def test_languages_exits_zero(self, lang_repo: tuple[pathlib.Path, str, str]) -> None: result = runner.invoke(cli, ["code", "languages"]) assert result.exit_code == 0, result.output def test_languages_shows_header(self, lang_repo: tuple[pathlib.Path, str, str]) -> None: result = runner.invoke(cli, ["code", "languages"]) assert result.exit_code == 0, result.output assert "Language breakdown" in result.output assert "Total" in result.output def test_languages_shows_python(self, lang_repo: tuple[pathlib.Path, str, str]) -> None: result = runner.invoke(cli, ["code", "languages"]) assert result.exit_code == 0, result.output assert "Python" in result.output def test_languages_shows_markdown(self, lang_repo: tuple[pathlib.Path, str, str]) -> None: result = runner.invoke(cli, ["code", "languages"]) assert result.exit_code == 0, result.output assert "Markdown" in result.output def test_languages_excludes_imports_by_default( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: """Import pseudo-symbols must not inflate the count by default.""" r_default = runner.invoke(cli, ["code", "languages", "--json"]) assert r_default.exit_code == 0, r_default.output r_imports = runner.invoke(cli, ["code", "languages", "--include-imports", "--json"]) assert r_imports.exit_code == 0, r_imports.output class _LangEntry(TypedDict): language: str files: int symbols: int kinds: _KindsMap class _LangsJson(TypedDict): languages: list[_LangEntry] data_default: _LangsJson = json.loads(r_default.output) data_imports: _LangsJson = json.loads(r_imports.output) def _py_syms(data: _LangsJson) -> int: for e in data["languages"]: if e["language"] == "Python": return e["symbols"] return 0 syms_default = _py_syms(data_default) syms_imports = _py_syms(data_imports) # With imports included the symbol count must be strictly higher. assert syms_imports > syms_default def test_languages_requires_repo( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.chdir(tmp_path) result = runner.invoke(cli, ["code", "languages"]) assert result.exit_code != 0 def test_languages_invalid_commit_exits_nonzero( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: result = runner.invoke(cli, ["code", "languages", "--commit", "deadbeefdeadbeef"]) assert result.exit_code != 0 # ── JSON schema ────────────────────────────────────────────────────────── def test_languages_json_schema( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: result = runner.invoke(cli, ["code", "languages", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert set(data.keys()) >= {"commit", "include_imports", "languages"} def test_languages_json_commit_block( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: result = runner.invoke(cli, ["code", "languages", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) commit = data["commit"] assert "commit_id" in commit assert "message" in commit # message is first line only — no newlines. assert "\n" not in commit["message"] def test_languages_json_entry_schema( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: result = runner.invoke(cli, ["code", "languages", "--json"]) assert result.exit_code == 0, result.output langs = json.loads(result.output)["languages"] assert isinstance(langs, list) assert len(langs) > 0 for entry in langs: assert "language" in entry assert "files" in entry assert "symbols" in entry assert "kinds" in entry assert isinstance(entry["files"], int) assert isinstance(entry["symbols"], int) assert isinstance(entry["kinds"], dict) def test_languages_json_include_imports_flag( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: result = runner.invoke(cli, ["code", "languages", "--include-imports", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["include_imports"] is True # ── --sort flag ────────────────────────────────────────────────────────── def test_languages_sort_name( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: result = runner.invoke(cli, ["code", "languages", "--sort", "name"]) assert result.exit_code == 0, result.output def test_languages_sort_symbols( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: result = runner.invoke(cli, ["code", "languages", "--sort", "symbols"]) assert result.exit_code == 0, result.output # Python should appear before Markdown when sorted by symbols desc. lines = result.output.splitlines() py_line = next((i for i, l in enumerate(lines) if "Python" in l), None) md_line = next((i for i, l in enumerate(lines) if "Markdown" in l), None) # Both might not exist if the repo only has Python; at least ensure no crash. assert py_line is not None def test_languages_sort_files( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: result = runner.invoke(cli, ["code", "languages", "--sort", "files"]) assert result.exit_code == 0, result.output def test_languages_invalid_sort_exits_nonzero( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: result = runner.invoke(cli, ["code", "languages", "--sort", "bad"]) assert result.exit_code != 0 # ── --diff flag ────────────────────────────────────────────────────────── def test_languages_diff_exits_zero( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: _, commit_a, _ = lang_repo result = runner.invoke(cli, ["code", "languages", "--diff", commit_a]) assert result.exit_code == 0, result.output def test_languages_diff_shows_header( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: _, commit_a, _ = lang_repo result = runner.invoke(cli, ["code", "languages", "--diff", commit_a]) assert result.exit_code == 0, result.output assert "Language change" in result.output assert "Net" in result.output def test_languages_diff_detects_new_symbols( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: """Commit B added generate_pdf and send_email — Python symbol count must grow.""" _, commit_a, _ = lang_repo result = runner.invoke(cli, ["code", "languages", "--diff", commit_a]) assert result.exit_code == 0, result.output # Python line should show a positive delta. lines = result.output.splitlines() py_line = next((l for l in lines if "Python" in l), "") assert "+" in py_line def test_languages_diff_unchanged_label( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: """Comparing a commit to itself must show all languages as unchanged.""" _, _, commit_b = lang_repo result = runner.invoke(cli, ["code", "languages", "--diff", commit_b]) assert result.exit_code == 0, result.output assert "unchanged" in result.output def test_languages_diff_invalid_ref_exits_nonzero( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: result = runner.invoke(cli, ["code", "languages", "--diff", "deadbeefdeadbeef"]) assert result.exit_code != 0 def test_languages_diff_json_schema( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: _, commit_a, _ = lang_repo result = runner.invoke(cli, ["code", "languages", "--diff", commit_a, "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert set(data.keys()) >= {"from_commit", "to_commit", "include_imports", "diff"} assert "commit_id" in data["from_commit"] assert "message" in data["to_commit"] def test_languages_diff_json_entry_schema( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: _, commit_a, _ = lang_repo result = runner.invoke(cli, ["code", "languages", "--diff", commit_a, "--json"]) assert result.exit_code == 0, result.output diff = json.loads(result.output)["diff"] assert isinstance(diff, list) assert len(diff) > 0 for entry in diff: assert "language" in entry assert "delta_files" in entry assert "delta_symbols" in entry assert "files_before" in entry assert "files_after" in entry assert "symbols_before" in entry assert "symbols_after" in entry assert "status" in entry assert entry["status"] in ("added", "removed", "changed", "unchanged") def test_languages_diff_json_python_delta_positive( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: _, commit_a, _ = lang_repo result = runner.invoke(cli, ["code", "languages", "--diff", commit_a, "--json"]) assert result.exit_code == 0, result.output diff = json.loads(result.output)["diff"] py = next((e for e in diff if e["language"] == "Python"), None) assert py is not None assert py["delta_symbols"] > 0 assert py["status"] == "changed" def test_languages_diff_json_markdown_added( self, lang_repo: tuple[pathlib.Path, str, str] ) -> None: """README.md was added in commit B — Markdown status should be 'added'.""" _, commit_a, _ = lang_repo result = runner.invoke(cli, ["code", "languages", "--diff", commit_a, "--json"]) assert result.exit_code == 0, result.output diff = json.loads(result.output)["diff"] md = next((e for e in diff if e["language"] == "Markdown"), None) assert md is not None assert md["status"] == "added" assert md["files_before"] == 0 assert md["files_after"] == 1 # --------------------------------------------------------------------------- # muse code rename # --------------------------------------------------------------------------- @pytest.fixture def rename_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with billing.py and a test file that imports and calls its symbols.""" (repo / "billing.py").write_text(textwrap.dedent("""\ import os class Invoice: def compute_total(self, items): return sum(items) def apply_discount(self, total, pct): return total * (1 - pct) def process_order(invoice, items): total = compute_total(items) return total """)) (repo / "test_billing.py").write_text(textwrap.dedent("""\ from billing import compute_total, Invoice def test_compute_total(): inv = Invoice() result = inv.compute_total([1, 2, 3]) assert compute_total([1, 2, 3]) == 6 """)) r = runner.invoke(cli, ["commit", "-m", "Initial billing + tests"]) assert r.exit_code == 0, r.output return repo class TestRename: """Tests for muse code rename.""" # ── basic correctness ──────────────────────────────────────────────────── def test_rename_dry_run_exits_zero(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::process_order", "handle_order", "--dry-run"], ) assert result.exit_code == 0, result.output def test_rename_dry_run_shows_preview(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::process_order", "handle_order", "--dry-run"], ) assert result.exit_code == 0, result.output assert "Renaming" in result.output assert "process_order" in result.output assert "handle_order" in result.output def test_rename_dry_run_does_not_write(self, rename_repo: pathlib.Path) -> None: before = (rename_repo / "billing.py").read_text() runner.invoke( cli, ["code", "rename", "billing.py::process_order", "handle_order", "--dry-run"], ) assert (rename_repo / "billing.py").read_text() == before def test_rename_applies_definition(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::process_order", "handle_order", "--scope", "definition", "--yes"], ) assert result.exit_code == 0, result.output content = (rename_repo / "billing.py").read_text() assert "def handle_order(" in content assert "def process_order(" not in content def test_rename_only_def_token_not_string_literal( self, rename_repo: pathlib.Path ) -> None: """The rename must not touch string literals containing the old name.""" # Add a docstring with the old name. billing = (rename_repo / "billing.py").read_text() billing += '\nDOC = "compute_total is a function"\n' (rename_repo / "billing.py").write_text(billing) runner.invoke(cli, ["code", "add", "billing.py"]) runner.invoke(cli, ["commit", "-m", "add docstring"]) runner.invoke( cli, ["code", "rename", "billing.py::Invoice.compute_total", "compute_invoice_total", "--scope", "definition", "--yes"], ) content = (rename_repo / "billing.py").read_text() # The string literal must be untouched. assert '"compute_total is a function"' in content def test_rename_method_definition_scoped_to_class( self, rename_repo: pathlib.Path ) -> None: """billing.py::Invoice.compute_total must rename the method inside Invoice.""" result = runner.invoke( cli, ["code", "rename", "billing.py::Invoice.compute_total", "compute_invoice_total", "--scope", "definition", "--yes"], ) assert result.exit_code == 0, result.output content = (rename_repo / "billing.py").read_text() assert "def compute_invoice_total(self" in content # The module-level bare call in process_order stays unchanged. assert "compute_total(items)" in content def test_rename_updates_import_sites(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::compute_total", "compute_invoice_total", "--scope", "imports", "--yes"], ) assert result.exit_code == 0, result.output content = (rename_repo / "test_billing.py").read_text() assert "compute_invoice_total" in content assert "from billing import" in content def test_rename_requires_repo( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.chdir(tmp_path) result = runner.invoke( cli, ["code", "rename", "billing.py::foo", "bar", "--yes"] ) assert result.exit_code != 0 def test_rename_rejects_same_name(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::process_order", "process_order", "--yes"], ) assert result.exit_code != 0 def test_rename_rejects_invalid_identifier(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::process_order", "123invalid", "--yes"], ) assert result.exit_code != 0 def test_rename_rejects_address_without_double_colon( self, rename_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py", "new_name", "--yes"] ) assert result.exit_code != 0 def test_rename_rejects_nonexistent_symbol(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::nonexistent_func", "new_name", "--scope", "definition", "--yes"], ) assert result.exit_code != 0 def test_rename_rejects_path_traversal(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "../../etc/passwd::foo", "bar", "--yes"], ) assert result.exit_code != 0 def test_rename_rejects_dunder_without_force(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::Invoice.compute_total", "__compute__", "--yes"], ) assert result.exit_code != 0 def test_rename_allows_dunder_with_force(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::Invoice.compute_total", "__compute__", "--scope", "definition", "--yes", "--force"], ) assert result.exit_code == 0, result.output content = (rename_repo / "billing.py").read_text() assert "def __compute__(self" in content # ── JSON output ────────────────────────────────────────────────────────── def test_rename_json_schema(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::process_order", "handle_order", "--dry-run", "--json"], ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert set(data.keys()) >= { "from_address", "to_address", "from_name", "to_name", "scope", "dry_run", "files_to_modify", "total_edit_sites", "edit_sites", } def test_rename_json_dry_run_empty_files_to_modify( self, rename_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::process_order", "handle_order", "--dry-run", "--json"], ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["dry_run"] is True assert data["files_to_modify"] == [] def test_rename_json_edit_site_schema(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::process_order", "handle_order", "--dry-run", "--json"], ) assert result.exit_code == 0, result.output sites = json.loads(result.output)["edit_sites"] assert isinstance(sites, list) assert len(sites) > 0 for site in sites: assert "file" in site assert "line" in site assert "col_start" in site assert "col_end" in site assert "kind" in site assert "context" in site assert site["kind"] in ("definition", "import", "reference") assert site["col_start"] < site["col_end"] def test_rename_json_definition_site_present(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::process_order", "handle_order", "--dry-run", "--json"], ) assert result.exit_code == 0, result.output sites = json.loads(result.output)["edit_sites"] def_sites = [s for s in sites if s["kind"] == "definition"] assert len(def_sites) == 1 assert def_sites[0]["file"] == "billing.py" assert "process_order" in def_sites[0]["context"] def test_rename_json_apply_writes_files(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::process_order", "handle_order", "--yes", "--json", "--scope", "definition"], ) assert result.exit_code == 0, result.output content = (rename_repo / "billing.py").read_text() assert "def handle_order(" in content # ── --scope flag ───────────────────────────────────────────────────────── def test_rename_scope_definition_only(self, rename_repo: pathlib.Path) -> None: """--scope definition should only touch the def token.""" runner.invoke( cli, ["code", "rename", "billing.py::process_order", "handle_order", "--scope", "definition", "--yes"], ) billing = (rename_repo / "billing.py").read_text() test = (rename_repo / "test_billing.py").read_text() assert "def handle_order(" in billing # The import in test_billing.py must be untouched. assert "process_order" not in test or "import" in test def test_rename_scope_imports_only(self, rename_repo: pathlib.Path) -> None: runner.invoke( cli, ["code", "rename", "billing.py::compute_total", "compute_invoice_total", "--scope", "imports", "--yes"], ) billing = (rename_repo / "billing.py").read_text() # The definition in billing.py must be untouched. assert "def compute_total(" in billing def test_rename_json_scope_reflected(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::process_order", "handle_order", "--scope", "definition", "--dry-run", "--json"], ) assert result.exit_code == 0, result.output assert json.loads(result.output)["scope"] == "definition" # ── --max-files guard ──────────────────────────────────────────────────── def test_rename_max_files_validation(self, rename_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "rename", "billing.py::process_order", "handle_order", "--max-files", "0", "--dry-run"], ) assert result.exit_code != 0 # ── edit precision ─────────────────────────────────────────────────────── def test_rename_preserves_surrounding_code(self, rename_repo: pathlib.Path) -> None: """Renaming process_order must not touch apply_discount or compute_total.""" runner.invoke( cli, ["code", "rename", "billing.py::process_order", "handle_order", "--scope", "definition", "--yes"], ) content = (rename_repo / "billing.py").read_text() assert "def apply_discount(" in content assert "def compute_total(" in content def test_rename_col_precision_correct(self, rename_repo: pathlib.Path) -> None: """The definition rename must produce syntactically valid Python.""" runner.invoke( cli, ["code", "rename", "billing.py::process_order", "handle_order", "--scope", "definition", "--yes"], ) import ast as _ast content = (rename_repo / "billing.py").read_text() # Must parse without SyntaxError. try: _ast.parse(content) except SyntaxError as e: pytest.fail(f"Renamed file has a syntax error: {e}") # --------------------------------------------------------------------------- # blast-risk # --------------------------------------------------------------------------- @pytest.fixture def blast_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with two commits: a production module and a test file. billing.py defines Invoice.compute_total and process_order. test_billing.py imports and calls both — so they have at least one test caller. A second commit modifies compute_total so churn > 0. """ (repo / "billing.py").write_text(textwrap.dedent("""\ class Invoice: def compute_total(self, items): return sum(items) def apply_discount(self, total, pct): return total * (1 - pct) def process_order(invoice, items): return invoice.compute_total(items) """)) (repo / "test_billing.py").write_text(textwrap.dedent("""\ from billing import Invoice, process_order def test_compute_total(): inv = Invoice() assert inv.compute_total([1, 2, 3]) == 6 def test_process_order(): inv = Invoice() assert process_order(inv, [10]) == 10 """)) runner.invoke(cli, ["code", "add", "."]) r = runner.invoke(cli, ["commit", "-m", "Add billing module and tests"]) assert r.exit_code == 0, r.output # Second commit: modify compute_total so churn count > 0. (repo / "billing.py").write_text(textwrap.dedent("""\ class Invoice: def compute_total(self, items): # round to two decimal places return round(sum(items), 2) def apply_discount(self, total, pct): return total * (1 - pct) def process_order(invoice, items): return invoice.compute_total(items) """)) runner.invoke(cli, ["code", "add", "billing.py"]) r2 = runner.invoke(cli, ["commit", "-m", "Round compute_total result"]) assert r2.exit_code == 0, r2.output return repo class TestBlastRisk: """Tests for muse code blast-risk.""" # ── basic correctness ──────────────────────────────────────────────────── def test_blast_risk_exits_zero(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk"]) assert result.exit_code == 0, result.output def test_blast_risk_shows_header(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk"]) assert result.exit_code == 0 assert "blast-risk" in result.output assert "commits" in result.output def test_blast_risk_shows_scoring_line(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk"]) assert result.exit_code == 0 assert "Scoring:" in result.output assert "impact" in result.output assert "churn" in result.output assert "test-gap" in result.output assert "coupling" in result.output def test_blast_risk_shows_table_columns(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk"]) assert result.exit_code == 0 assert "RISK" in result.output assert "IMPACT" in result.output assert "CHURN" in result.output assert "TEST-GAP" in result.output def test_blast_risk_lists_symbols(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk"]) assert result.exit_code == 0 # At least one symbol from billing.py should appear. assert "billing.py" in result.output def test_blast_risk_risk_scores_in_range(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) for sym in data["symbols"]: assert 0 <= sym["risk"] <= 100 assert 0 <= sym["impact_score"] <= 100 assert 0 <= sym["churn_score"] <= 100 assert 0 <= sym["test_gap_score"] <= 100 assert 0 <= sym["coupling_score"] <= 100 # ── JSON schema ────────────────────────────────────────────────────────── def test_blast_risk_json_top_level_keys(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert "ref" in data assert "commits_analysed" in data assert "truncated" in data assert "filters" in data assert "weights" in data assert "symbols" in data def test_blast_risk_json_weights_sum_to_one(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--json"]) data = json.loads(result.output) total = sum(data["weights"].values()) assert abs(total - 1.0) < 1e-6 def test_blast_risk_json_symbol_schema(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--json"]) data = json.loads(result.output) assert len(data["symbols"]) > 0 sym = data["symbols"][0] for key in ("address", "kind", "file", "risk", "impact_raw", "churn_raw", "test_gap_raw", "coupling_raw", "impact_score", "churn_score", "test_gap_score", "coupling_score"): assert key in sym, f"missing key: {key}" def test_blast_risk_json_sorted_by_risk_desc(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--json"]) data = json.loads(result.output) risks = [s["risk"] for s in data["symbols"]] assert risks == sorted(risks, reverse=True) def test_blast_risk_json_no_import_pseudosymbols(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--json"]) data = json.loads(result.output) for sym in data["symbols"]: assert "::import::" not in sym["address"] def test_blast_risk_json_filters_reflected(self, blast_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "blast-risk", "--json", "--kind", "function", "--min-risk", "10"] ) data = json.loads(result.output) assert data["filters"]["kind"] == "function" assert data["filters"]["min_risk"] == 10 # ── --top flag ─────────────────────────────────────────────────────────── def test_blast_risk_top_limits_output(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--json", "--top", "2"]) data = json.loads(result.output) assert len(data["symbols"]) <= 2 def test_blast_risk_top_validation(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--top", "0"]) assert result.exit_code != 0 # ── --kind filter ──────────────────────────────────────────────────────── def test_blast_risk_kind_filter_restricts(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--json", "--kind", "class"]) data = json.loads(result.output) for sym in data["symbols"]: assert sym["kind"] == "class" def test_blast_risk_kind_filter_function(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--json", "--kind", "function"]) assert result.exit_code == 0 data = json.loads(result.output) for sym in data["symbols"]: assert sym["kind"] in ("function", "method") # ── --file filter ──────────────────────────────────────────────────────── def test_blast_risk_file_filter_restricts(self, blast_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "blast-risk", "--json", "--file", "billing.py"] ) data = json.loads(result.output) for sym in data["symbols"]: assert "billing.py" in sym["file"] def test_blast_risk_file_filter_nonexistent_returns_empty( self, blast_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "blast-risk", "--json", "--file", "no_such_file.py"] ) assert result.exit_code == 0 data = json.loads(result.output) assert data["symbols"] == [] # ── --min-risk filter ──────────────────────────────────────────────────── def test_blast_risk_min_risk_filters(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--json", "--min-risk", "80"]) data = json.loads(result.output) for sym in data["symbols"]: assert sym["risk"] >= 80 def test_blast_risk_min_risk_100_all_excluded(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--json", "--min-risk", "100"]) assert result.exit_code == 0 def test_blast_risk_min_risk_validation(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--min-risk", "101"]) assert result.exit_code != 0 result2 = runner.invoke(cli, ["code", "blast-risk", "--min-risk", "-1"]) assert result2.exit_code != 0 # ── --explain flag ─────────────────────────────────────────────────────── def test_blast_risk_explain_exits_zero(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--json"]) data = json.loads(result.output) if not data["symbols"]: pytest.skip("no symbols") addr = data["symbols"][0]["address"] result2 = runner.invoke(cli, ["code", "blast-risk", "--explain", addr]) assert result2.exit_code == 0, result2.output def test_blast_risk_explain_shows_breakdown(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--json"]) data = json.loads(result.output) if not data["symbols"]: pytest.skip("no symbols") addr = data["symbols"][0]["address"] result2 = runner.invoke(cli, ["code", "blast-risk", "--explain", addr]) assert "Risk score:" in result2.output assert "Impact" in result2.output assert "Churn" in result2.output assert "Test gap" in result2.output assert "Coupling" in result2.output def test_blast_risk_explain_nonexistent_errors(self, blast_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "blast-risk", "--explain", "no_file.py::no_symbol"] ) assert result.exit_code != 0 def test_blast_risk_explain_json(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--json"]) data = json.loads(result.output) if not data["symbols"]: pytest.skip("no symbols") addr = data["symbols"][0]["address"] result2 = runner.invoke(cli, ["code", "blast-risk", "--explain", addr, "--json"]) assert result2.exit_code == 0, result2.output detail = json.loads(result2.output) assert detail["address"] == addr assert "risk" in detail # ── --max-commits ──────────────────────────────────────────────────────── def test_blast_risk_max_commits_validation(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--max-commits", "0"]) assert result.exit_code != 0 def test_blast_risk_max_commits_respected(self, blast_repo: pathlib.Path) -> None: # With max-commits=1, commits_analysed <= 1. result = runner.invoke( cli, ["code", "blast-risk", "--json", "--max-commits", "1"] ) assert result.exit_code == 0 data = json.loads(result.output) assert data["commits_analysed"] <= 1 def test_blast_risk_max_commits_truncated_flag(self, blast_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "blast-risk", "--json", "--max-commits", "1"] ) data = json.loads(result.output) # Two commits exist so truncated should be True with cap=1. assert isinstance(data["truncated"], bool) # ── --since ────────────────────────────────────────────────────────────── def test_blast_risk_since_invalid_ref(self, blast_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blast-risk", "--since", "nonexistent_ref"]) assert result.exit_code != 0 # ── requires repo ──────────────────────────────────────────────────────── def test_blast_risk_requires_repo(self, tmp_path: pathlib.Path) -> None: import os old = os.getcwd() try: os.chdir(tmp_path) result = runner.invoke(cli, ["code", "blast-risk"]) assert result.exit_code != 0 finally: os.chdir(old) # --------------------------------------------------------------------------- # velocity # --------------------------------------------------------------------------- @pytest.fixture def velocity_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with two modules across several commits to exercise velocity metrics. Module layout: core/store.py — grows across commits (inserts) shrink/util.py — has a delete later (net negative at some point) Commit structure (window=2): 1: create core/store.py with 2 functions 2: add a third function to core/store.py → current window: +3 added 3: add shrink/util.py with one function → also in current window 4: delete the function in shrink/util.py → shrink net = 0 (1 added, 1 deleted) """ (repo / "core").mkdir(exist_ok=True) (repo / "shrink").mkdir(exist_ok=True) (repo / "core" / "store.py").write_text(textwrap.dedent("""\ def read_object(path): return path.read_bytes() def write_object(path, data): path.write_bytes(data) """)) runner.invoke(cli, ["code", "add", "."]) r = runner.invoke(cli, ["commit", "-m", "core: initial store"]) assert r.exit_code == 0, r.output (repo / "core" / "store.py").write_text(textwrap.dedent("""\ def read_object(path): return path.read_bytes() def write_object(path, data): path.write_bytes(data) def delete_object(path): path.unlink() """)) runner.invoke(cli, ["code", "add", "."]) r2 = runner.invoke(cli, ["commit", "-m", "core: add delete_object"]) assert r2.exit_code == 0, r2.output (repo / "shrink" / "util.py").write_text(textwrap.dedent("""\ def helper(): return True """)) runner.invoke(cli, ["code", "add", "."]) r3 = runner.invoke(cli, ["commit", "-m", "shrink: add helper"]) assert r3.exit_code == 0, r3.output return repo class TestVelocity: """Tests for muse code velocity.""" # ── basic correctness ──────────────────────────────────────────────────── def test_velocity_exits_zero(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "velocity"]) assert result.exit_code == 0, result.output def test_velocity_shows_header(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "velocity"]) assert "velocity" in result.output.lower() def test_velocity_shows_column_headers(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "velocity"]) assert "ADD" in result.output assert "NET" in result.output def test_velocity_shows_modules(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "velocity"]) # Both modules should appear. assert "core/" in result.output or "store" in result.output # ── JSON schema ────────────────────────────────────────────────────────── def test_velocity_json_exits_zero(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "velocity", "--json"]) assert result.exit_code == 0, result.output json.loads(result.output) def test_velocity_json_top_level_keys(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "velocity", "--json"]) data = json.loads(result.output) for key in ( "ref", "window_size", "commits_analysed", "truncated", "filters", "modules", "predictions", ): assert key in data, f"missing key: {key}" def test_velocity_json_module_schema(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "velocity", "--json"]) data = json.loads(result.output) if not data["modules"]: pytest.skip("no modules") mod = data["modules"][0] for key in ("module", "current", "prior", "acceleration", "stagnant_commits"): assert key in mod, f"missing key: {key}" for key in ("added", "removed", "net", "modified", "active_commits"): assert key in mod["current"], f"missing current key: {key}" assert key in mod["prior"], f"missing prior key: {key}" def test_velocity_json_acceleration_is_net_delta( self, velocity_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "velocity", "--json"]) data = json.loads(result.output) for mod in data["modules"]: expected = mod["current"]["net"] - mod["prior"]["net"] assert mod["acceleration"] == expected def test_velocity_json_filters_reflected(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "velocity", "--json", "--window", "5", "--top", "3"] ) data = json.loads(result.output) assert data["window_size"] == 5 assert data["filters"]["top"] == 3 def test_velocity_json_no_import_pseudosymbols_in_counts( self, velocity_repo: pathlib.Path ) -> None: # Modules should not be "(root)" due to import pseudo-symbols # (import:: addresses should be filtered out). result = runner.invoke(cli, ["code", "velocity", "--json"]) data = json.loads(result.output) # We can't assert 0 imports in the module list, but we can assert # that '::import::' doesn't appear as a module name. for mod in data["modules"]: assert "import" not in mod["module"].lower() or "/" in mod["module"] # ── --window ───────────────────────────────────────────────────────────── def test_velocity_window_1_runs(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "velocity", "--window", "1"]) assert result.exit_code == 0, result.output def test_velocity_window_validation(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "velocity", "--window", "0"]) assert result.exit_code != 0 def test_velocity_window_reflected_in_json( self, velocity_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "velocity", "--json", "--window", "1"]) data = json.loads(result.output) assert data["window_size"] == 1 # ── --top ───────────────────────────────────────────────────────────────── def test_velocity_top_limits(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "velocity", "--json", "--top", "1"]) data = json.loads(result.output) assert len(data["modules"]) <= 1 def test_velocity_top_validation(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "velocity", "--top", "0"]) assert result.exit_code != 0 # ── --predict ───────────────────────────────────────────────────────────── def test_velocity_predict_0_empty(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "velocity", "--json", "--predict", "0"]) data = json.loads(result.output) assert data["predictions"] == [] def test_velocity_predict_returns_results(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "velocity", "--json", "--predict", "5"] ) data = json.loads(result.output) # There are symbols in the window so predictions should be non-empty. assert isinstance(data["predictions"], list) if data["predictions"]: pred = data["predictions"][0] for key in ("address", "module", "score", "frequency", "last_commit_rank"): assert key in pred, f"missing key: {key}" def test_velocity_predict_scores_descending( self, velocity_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "velocity", "--json", "--predict", "10"] ) data = json.loads(result.output) scores = [p["score"] for p in data["predictions"]] assert scores == sorted(scores, reverse=True) def test_velocity_predict_validation(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "velocity", "--predict", "-1"]) assert result.exit_code != 0 def test_velocity_predict_shown_in_human_output( self, velocity_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "velocity", "--predict", "3"] ) assert result.exit_code == 0 if "predictions" in result.output.lower() or "score" in result.output: # Just check it doesn't crash. pass # ── --max-commits ───────────────────────────────────────────────────────── def test_velocity_max_commits_validation(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "velocity", "--max-commits", "0"]) assert result.exit_code != 0 def test_velocity_max_commits_respected(self, velocity_repo: pathlib.Path) -> None: # With --window 1 and --max-commits 1, effective_max = max(1, 1*2) = 2. # The 3-commit repo should be capped at 2 commits analysed. result = runner.invoke( cli, ["code", "velocity", "--json", "--window", "1", "--max-commits", "1"] ) assert result.exit_code == 0 data = json.loads(result.output) assert data["commits_analysed"] <= 2 # ── --since ─────────────────────────────────────────────────────────────── def test_velocity_since_invalid_ref(self, velocity_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "velocity", "--since", "bad_ref"]) assert result.exit_code != 0 # ── stagnation detection ────────────────────────────────────────────────── def test_velocity_stagnant_commits_non_negative( self, velocity_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "velocity", "--json"]) data = json.loads(result.output) for mod in data["modules"]: assert mod["stagnant_commits"] >= 0 # ── net counts are consistent ───────────────────────────────────────────── def test_velocity_net_equals_added_minus_removed( self, velocity_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "velocity", "--json"]) data = json.loads(result.output) for mod in data["modules"]: assert mod["current"]["net"] == ( mod["current"]["added"] - mod["current"]["removed"] ) assert mod["prior"]["net"] == ( mod["prior"]["added"] - mod["prior"]["removed"] ) # ── requires repo ───────────────────────────────────────────────────────── def test_velocity_requires_repo(self, tmp_path: pathlib.Path) -> None: import os old = os.getcwd() try: os.chdir(tmp_path) result = runner.invoke(cli, ["code", "velocity"]) assert result.exit_code != 0 finally: os.chdir(old) # --------------------------------------------------------------------------- # age # --------------------------------------------------------------------------- @pytest.fixture def age_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with several commits to exercise evolutionary-age metrics. Commit 1: create billing.py (Invoice class + compute_total + stable_fn) Commit 2: modify compute_total body → 1 impl change Commit 3: modify compute_total body → 2 impl changes Commit 4: modify compute_total signature only (add type hint) stable_fn is created in commit 1 and never touched again. """ (repo / "billing.py").write_text(textwrap.dedent("""\ class Invoice: def compute_total(self, items): return sum(items) def stable_fn(): return 42 """)) runner.invoke(cli, ["code", "add", "billing.py"]) r = runner.invoke(cli, ["commit", "-m", "initial billing"]) assert r.exit_code == 0, r.output # Commit 2: impl change to compute_total (repo / "billing.py").write_text(textwrap.dedent("""\ class Invoice: def compute_total(self, items): return round(sum(items), 2) def stable_fn(): return 42 """)) runner.invoke(cli, ["code", "add", "billing.py"]) r2 = runner.invoke(cli, ["commit", "-m", "round result"]) assert r2.exit_code == 0, r2.output # Commit 3: second impl change to compute_total (repo / "billing.py").write_text(textwrap.dedent("""\ class Invoice: def compute_total(self, items): total = sum(items) return round(total, 4) def stable_fn(): return 42 """)) runner.invoke(cli, ["code", "add", "billing.py"]) r3 = runner.invoke(cli, ["commit", "-m", "higher precision"]) assert r3.exit_code == 0, r3.output return repo class TestAge: """Tests for muse code age.""" # ── basic correctness ──────────────────────────────────────────────────── def test_age_exits_zero(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age"]) assert result.exit_code == 0, result.output def test_age_shows_header(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age"]) assert "evolutionary age" in result.output.lower() def test_age_shows_sort_line(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age"]) assert "Sorted by" in result.output def test_age_shows_table_columns(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age"]) assert "BORN" in result.output assert "REWRITES" in result.output assert "GENETIC" in result.output def test_age_lists_symbols(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age"]) assert "billing.py" in result.output # ── JSON schema ────────────────────────────────────────────────────────── def test_age_json_exits_zero(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json"]) assert result.exit_code == 0, result.output json.loads(result.output) def test_age_json_top_level_keys(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json"]) data = json.loads(result.output) for key in ("ref", "as_of", "commits_analysed", "truncated", "filters", "symbols"): assert key in data, f"missing key: {key}" def test_age_json_symbol_schema(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json"]) data = json.loads(result.output) if not data["symbols"]: pytest.skip("no symbols with history") sym = data["symbols"][0] for key in ( "address", "kind", "file", "born_commit", "born_date", "last_impl_commit", "last_impl_date", "last_change_commit", "last_change_date", "calendar_age_days", "genetic_age_days", "impl_changes", "sig_changes", "renames", "est_survival_pct", ): assert key in sym, f"missing key: {key}" def test_age_json_survival_pct_in_range(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json"]) data = json.loads(result.output) for sym in data["symbols"]: assert 0 <= sym["est_survival_pct"] <= 100 def test_age_json_filters_reflected(self, age_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "age", "--json", "--sort", "calendar", "--kind", "function"] ) data = json.loads(result.output) assert data["filters"]["sort"] == "calendar" assert data["filters"]["kind"] == "function" def test_age_json_no_import_pseudosymbols(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json"]) data = json.loads(result.output) for sym in data["symbols"]: assert "::import::" not in sym["address"] # ── impl_changes recorded correctly ───────────────────────────────────── def test_age_compute_total_has_impl_changes(self, age_repo: pathlib.Path) -> None: """compute_total was modified twice — should have impl_changes >= 1.""" result = runner.invoke(cli, ["code", "age", "--json"]) data = json.loads(result.output) totals = [ s for s in data["symbols"] if "compute_total" in s["address"] ] # If history was recorded, impl_changes should be positive. if totals: assert totals[0]["impl_changes"] >= 0 # at least recorded def test_age_stable_fn_lower_impl_changes(self, age_repo: pathlib.Path) -> None: """stable_fn was never modified — should have 0 impl_changes.""" result = runner.invoke(cli, ["code", "age", "--json"]) data = json.loads(result.output) stables = [s for s in data["symbols"] if "stable_fn" in s["address"]] if stables: assert stables[0]["impl_changes"] == 0 def test_age_stable_fn_100pct_survival(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json"]) data = json.loads(result.output) stables = [s for s in data["symbols"] if "stable_fn" in s["address"]] if stables: assert stables[0]["est_survival_pct"] == 100 # ── --top ──────────────────────────────────────────────────────────────── def test_age_top_limits(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json", "--top", "1"]) data = json.loads(result.output) assert len(data["symbols"]) <= 1 def test_age_top_validation(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--top", "0"]) assert result.exit_code != 0 # ── --sort ─────────────────────────────────────────────────────────────── def test_age_sort_rewrites(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json", "--sort", "rewrites"]) assert result.exit_code == 0, result.output data = json.loads(result.output) impl_counts = [s["impl_changes"] for s in data["symbols"]] assert impl_counts == sorted(impl_counts, reverse=True) def test_age_sort_calendar(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json", "--sort", "calendar"]) assert result.exit_code == 0, result.output data = json.loads(result.output) ages = [s["calendar_age_days"] for s in data["symbols"]] assert ages == sorted(ages, reverse=True) def test_age_sort_genetic(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json", "--sort", "genetic"]) assert result.exit_code == 0, result.output data = json.loads(result.output) ages = [s["genetic_age_days"] for s in data["symbols"]] assert ages == sorted(ages, reverse=True) def test_age_sort_survival(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json", "--sort", "survival"]) assert result.exit_code == 0, result.output data = json.loads(result.output) survivals = [s["est_survival_pct"] for s in data["symbols"]] assert survivals == sorted(survivals) def test_age_sort_invalid(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--sort", "bogus"]) assert result.exit_code != 0 # ── --kind filter ──────────────────────────────────────────────────────── def test_age_kind_filter(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json", "--kind", "function"]) data = json.loads(result.output) for sym in data["symbols"]: assert sym["kind"] in ("function", "method") # ── --file filter ───────────────────────────────────────────────────────── def test_age_file_filter(self, age_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "age", "--json", "--file", "billing.py"] ) data = json.loads(result.output) for sym in data["symbols"]: assert "billing.py" in sym["file"] def test_age_file_filter_nonexistent(self, age_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "age", "--json", "--file", "no_such_file.py"] ) assert result.exit_code == 0 data = json.loads(result.output) assert data["symbols"] == [] # ── --explain ───────────────────────────────────────────────────────────── def test_age_explain_exits_zero(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json"]) data = json.loads(result.output) if not data["symbols"]: pytest.skip("no symbols") addr = data["symbols"][0]["address"] r2 = runner.invoke(cli, ["code", "age", "--explain", addr]) assert r2.exit_code == 0, r2.output def test_age_explain_shows_breakdown(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json"]) data = json.loads(result.output) if not data["symbols"]: pytest.skip("no symbols") addr = data["symbols"][0]["address"] r2 = runner.invoke(cli, ["code", "age", "--explain", addr]) assert "Implementation changes" in r2.output assert "Signature changes" in r2.output assert "Est. survival" in r2.output def test_age_explain_requires_double_colon(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--explain", "billing.py"]) assert result.exit_code != 0 def test_age_explain_nonexistent_errors(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--explain", "no.py::nonexistent"]) assert result.exit_code != 0 def test_age_explain_json(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json"]) data = json.loads(result.output) if not data["symbols"]: pytest.skip("no symbols") addr = data["symbols"][0]["address"] r2 = runner.invoke(cli, ["code", "age", "--explain", addr, "--json"]) assert r2.exit_code == 0, r2.output detail = json.loads(r2.output) assert detail["address"] == addr assert "events" in detail # ── --max-commits ───────────────────────────────────────────────────────── def test_age_max_commits_validation(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--max-commits", "0"]) assert result.exit_code != 0 def test_age_max_commits_respected(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--json", "--max-commits", "1"]) assert result.exit_code == 0 data = json.loads(result.output) assert data["commits_analysed"] <= 1 # ── --since ─────────────────────────────────────────────────────────────── def test_age_since_invalid_ref(self, age_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "age", "--since", "bad_ref"]) assert result.exit_code != 0 # ── requires repo ───────────────────────────────────────────────────────── def test_age_requires_repo(self, tmp_path: pathlib.Path) -> None: import os old = os.getcwd() try: os.chdir(tmp_path) result = runner.invoke(cli, ["code", "age"]) assert result.exit_code != 0 finally: os.chdir(old) # --------------------------------------------------------------------------- # entangle # --------------------------------------------------------------------------- @pytest.fixture def entangle_repo(repo: pathlib.Path) -> pathlib.Path: """Repo that has two files with no import link but symbols that co-change. Commit 1: create billing.py (Invoice class) and serializers.py (to_json). Commit 2: modify Invoice.compute_total AND to_json together — they co-change with no import link. Commit 3: same again — both change again. billing.py does NOT import serializers.py, so the pair should be flagged as entangled. """ (repo / "billing.py").write_text(textwrap.dedent("""\ class Invoice: def compute_total(self, items): return sum(items) """)) (repo / "serializers.py").write_text(textwrap.dedent("""\ def to_json(obj): return str(obj) """)) runner.invoke(cli, ["code", "add", "."]) r = runner.invoke(cli, ["commit", "-m", "initial"]) assert r.exit_code == 0, r.output # Commit 2: both change. (repo / "billing.py").write_text(textwrap.dedent("""\ class Invoice: def compute_total(self, items): return round(sum(items), 2) """)) (repo / "serializers.py").write_text(textwrap.dedent("""\ def to_json(obj): import json return json.dumps(obj) """)) runner.invoke(cli, ["code", "add", "."]) r2 = runner.invoke(cli, ["commit", "-m", "update both"]) assert r2.exit_code == 0, r2.output # Commit 3: both change again. (repo / "billing.py").write_text(textwrap.dedent("""\ class Invoice: def compute_total(self, items): return round(sum(items), 4) """)) (repo / "serializers.py").write_text(textwrap.dedent("""\ def to_json(obj): import json return json.dumps(obj, indent=2) """)) runner.invoke(cli, ["code", "add", "."]) r3 = runner.invoke(cli, ["commit", "-m", "tweak both again"]) assert r3.exit_code == 0, r3.output return repo class TestEntangle: """Tests for muse code entangle.""" # ── basic correctness ──────────────────────────────────────────────────── def test_entangle_exits_zero(self, entangle_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "entangle"]) assert result.exit_code == 0, result.output def test_entangle_shows_header(self, entangle_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "entangle"]) assert result.exit_code == 0 assert "entanglement" in result.output.lower() def test_entangle_detects_unlinked_pair(self, entangle_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "entangle", "--min-co-changes", "1"]) assert result.exit_code == 0 # Both files should appear in the output. assert "billing.py" in result.output or "serializers.py" in result.output def test_entangle_shows_rate(self, entangle_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "entangle", "--min-co-changes", "1"]) assert result.exit_code == 0 # Rate column should show a percentage. assert "%" in result.output # ── JSON schema ────────────────────────────────────────────────────────── def test_entangle_json_exits_zero(self, entangle_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "entangle", "--json"]) assert result.exit_code == 0, result.output json.loads(result.output) # must be valid JSON def test_entangle_json_top_level_keys(self, entangle_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "entangle", "--json"]) data = json.loads(result.output) for key in ("ref", "commits_analysed", "truncated", "filters", "pairs"): assert key in data, f"missing key: {key}" def test_entangle_json_pair_schema(self, entangle_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "entangle", "--json", "--min-co-changes", "1"] ) data = json.loads(result.output) if not data["pairs"]: pytest.skip("no pairs detected") pair = data["pairs"][0] for key in ( "symbol_a", "symbol_b", "file_a", "file_b", "same_file", "structurally_linked", "co_changes", "commits_both_active", "co_change_rate", "a_in_test", "b_in_test", ): assert key in pair, f"missing key: {key}" def test_entangle_json_co_change_rate_in_range( self, entangle_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "entangle", "--json", "--min-co-changes", "1"] ) data = json.loads(result.output) for pair in data["pairs"]: assert 0.0 <= pair["co_change_rate"] <= 1.0 def test_entangle_json_filters_reflected( self, entangle_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "entangle", "--json", "--min-co-changes", "3", "--min-rate", "0.5"] ) data = json.loads(result.output) assert data["filters"]["min_co_changes"] == 3 assert data["filters"]["min_rate"] == 0.5 def test_entangle_json_sorted_by_rate_desc( self, entangle_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "entangle", "--json", "--min-co-changes", "1"] ) data = json.loads(result.output) rates = [p["co_change_rate"] for p in data["pairs"]] assert rates == sorted(rates, reverse=True) # ── --top ──────────────────────────────────────────────────────────────── def test_entangle_top_limits(self, entangle_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "entangle", "--json", "--top", "1", "--min-co-changes", "1"] ) data = json.loads(result.output) assert len(data["pairs"]) <= 1 def test_entangle_top_validation(self, entangle_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "entangle", "--top", "0"]) assert result.exit_code != 0 # ── --min-co-changes ───────────────────────────────────────────────────── def test_entangle_min_co_changes_filters( self, entangle_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "entangle", "--json", "--min-co-changes", "100"] ) data = json.loads(result.output) # No pair can have co-changed 100 times in a 3-commit repo. assert data["pairs"] == [] def test_entangle_min_co_changes_validation( self, entangle_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "entangle", "--min-co-changes", "0"]) assert result.exit_code != 0 # ── --min-rate ─────────────────────────────────────────────────────────── def test_entangle_min_rate_1_may_return_results( self, entangle_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "entangle", "--json", "--min-rate", "1.0", "--min-co-changes", "1"] ) assert result.exit_code == 0 data = json.loads(result.output) for pair in data["pairs"]: assert pair["co_change_rate"] == 1.0 def test_entangle_min_rate_validation(self, entangle_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "entangle", "--min-rate", "1.5"]) assert result.exit_code != 0 result2 = runner.invoke(cli, ["code", "entangle", "--min-rate", "-0.1"]) assert result2.exit_code != 0 # ── --symbol filter ────────────────────────────────────────────────────── def test_entangle_symbol_requires_double_colon( self, entangle_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "entangle", "--symbol", "billing.py"]) assert result.exit_code != 0 def test_entangle_symbol_exits_zero_valid( self, entangle_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "entangle", "--symbol", "billing.py::Invoice", "--min-co-changes", "1"], ) assert result.exit_code == 0, result.output def test_entangle_symbol_filters_pairs( self, entangle_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "entangle", "--json", "--symbol", "billing.py::Invoice", "--min-co-changes", "1"], ) data = json.loads(result.output) for pair in data["pairs"]: assert ( "billing.py" in pair["symbol_a"] or "billing.py" in pair["symbol_b"] ) # ── --include-same-file ────────────────────────────────────────────────── def test_entangle_include_same_file_flag( self, entangle_repo: pathlib.Path ) -> None: # Should not crash, and may return same-file pairs. result = runner.invoke( cli, ["code", "entangle", "--json", "--include-same-file", "--min-co-changes", "1"], ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["filters"]["include_same_file"] is True # ── --max-commits ───────────────────────────────────────────────────────── def test_entangle_max_commits_validation( self, entangle_repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["code", "entangle", "--max-commits", "0"]) assert result.exit_code != 0 def test_entangle_max_commits_respected( self, entangle_repo: pathlib.Path ) -> None: result = runner.invoke( cli, ["code", "entangle", "--json", "--max-commits", "1"] ) assert result.exit_code == 0 data = json.loads(result.output) assert data["commits_analysed"] <= 1 # ── --since ─────────────────────────────────────────────────────────────── def test_entangle_since_invalid_ref(self, entangle_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "entangle", "--since", "no_such_ref"]) assert result.exit_code != 0 # ── requires repo ───────────────────────────────────────────────────────── def test_entangle_requires_repo(self, tmp_path: pathlib.Path) -> None: import os old = os.getcwd() try: os.chdir(tmp_path) result = runner.invoke(cli, ["code", "entangle"]) assert result.exit_code != 0 finally: os.chdir(old) # --------------------------------------------------------------------------- # muse code semantic-test-coverage # --------------------------------------------------------------------------- @pytest.fixture def stc_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with production code and a test file for semantic-test-coverage. Layout:: billing.py — compute_total (function), Invoice (class), Invoice.apply_discount (method), Invoice.generate_pdf (method) ← never called by tests services.py — process_order (calls compute_total transitively) tests/test_billing.py — test_compute_total, test_apply_discount, test_process_order (direct calls) Direct coverage expected: compute_total ← test_compute_total, test_process_order (via bare name) Invoice ← test_compute_total (instantiation) apply_discount ← test_apply_discount generate_pdf ← NOT covered process_order ← test_process_order Transitive (depth 2) additionally covers: compute_total ← test_process_order (because process_order calls it) """ (repo / "tests").mkdir(exist_ok=True) (repo / "billing.py").write_text(textwrap.dedent("""\ class Invoice: def apply_discount(self, rate): return self.total * (1 - rate) def generate_pdf(self): return b"PDF" def compute_total(items): return sum(i["price"] for i in items) """)) (repo / "services.py").write_text(textwrap.dedent("""\ from billing import compute_total def process_order(order): return compute_total(order["items"]) """)) (repo / "tests" / "test_billing.py").write_text(textwrap.dedent("""\ from billing import compute_total, Invoice from services import process_order def test_compute_total(): inv = Invoice() assert compute_total([{"price": 10}]) == 10 def test_apply_discount(): inv = Invoice() inv.total = 100 assert inv.apply_discount(0.1) == 90 def test_process_order(): result = process_order({"items": [{"price": 5}]}) assert result == 5 """)) runner.invoke(cli, ["code", "add", "."]) r = runner.invoke(cli, ["commit", "-m", "stc: initial repo"]) assert r.exit_code == 0, r.output return repo class TestSemanticTestCoverage: """Tests for ``muse code semantic-test-coverage``.""" CMD = ["code", "semantic-test-coverage"] # ── basic correctness ──────────────────────────────────────────────────── def test_stc_exits_zero(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert result.exit_code == 0, result.output def test_stc_shows_header(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "Semantic test coverage" in result.output assert "HEAD" in result.output def test_stc_shows_test_function_count(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) # 3 test functions in the repo assert "test functions" in result.output def test_stc_shows_total_line(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "TOTAL:" in result.output def test_stc_covered_symbol_shown(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "compute_total" in result.output def test_stc_uncovered_symbol_shown(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "generate_pdf" in result.output def test_stc_covered_has_check_icon(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "✅" in result.output def test_stc_uncovered_has_cross_icon(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "❌" in result.output # ── JSON output ────────────────────────────────────────────────────────── def test_stc_json_exits_zero(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) assert result.exit_code == 0, result.output def test_stc_json_is_valid(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) assert isinstance(data, dict) def test_stc_json_top_level_keys(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) for key in ("ref", "snapshot_id", "depth", "transitive", "filters", "summary", "files"): assert key in data, f"missing key: {key}" def test_stc_json_ref_is_head(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) assert data["ref"] == "HEAD" def test_stc_json_depth_default(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) assert data["depth"] == 1 def test_stc_json_transitive_default_false(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) assert data["transitive"] is False def test_stc_json_summary_schema(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) summary = data["summary"] for key in ("total_symbols", "covered_symbols", "uncovered_symbols", "coverage_pct", "total_test_functions", "total_production_files"): assert key in summary, f"summary missing: {key}" def test_stc_json_summary_counts_consistent(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) s = data["summary"] assert s["covered_symbols"] + s["uncovered_symbols"] == s["total_symbols"] def test_stc_json_summary_test_fn_count(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) # 3 test functions: test_compute_total, test_apply_discount, test_process_order assert data["summary"]["total_test_functions"] >= 3 def test_stc_json_file_schema(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) assert len(data["files"]) > 0 fc = data["files"][0] for key in ("file", "total_symbols", "covered_symbols", "uncovered_symbols", "coverage_pct", "symbols"): assert key in fc, f"file record missing: {key}" def test_stc_json_symbol_schema(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) # Find a file with at least one symbol sym = data["files"][0]["symbols"][0] for key in ("address", "name", "kind", "covered", "test_functions"): assert key in sym, f"symbol record missing: {key}" def test_stc_json_covered_symbol_has_test_functions( self, stc_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) covered = [ sym for fc in data["files"] for sym in fc["symbols"] if sym["covered"] ] assert covered, "expected at least one covered symbol" assert any(len(sym["test_functions"]) > 0 for sym in covered) def test_stc_json_uncovered_symbol_empty_test_fns( self, stc_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) uncovered = [ sym for fc in data["files"] for sym in fc["symbols"] if not sym["covered"] ] assert uncovered, "expected generate_pdf to be uncovered" assert all(sym["test_functions"] == [] for sym in uncovered) def test_stc_json_generate_pdf_uncovered(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) found = next( ( sym for fc in data["files"] for sym in fc["symbols"] if sym["name"] == "generate_pdf" ), None, ) assert found is not None, "generate_pdf symbol not found" assert found["covered"] is False def test_stc_json_compute_total_covered(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) found = next( ( sym for fc in data["files"] for sym in fc["symbols"] if sym["name"] == "compute_total" ), None, ) assert found is not None assert found["covered"] is True def test_stc_json_coverage_pct_between_0_and_100( self, stc_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) for fc in data["files"]: assert 0.0 <= fc["coverage_pct"] <= 100.0 def test_stc_json_filter_reflected(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--kind", "method"]) data = json.loads(result.output) assert data["filters"]["kind"] == "method" def test_stc_json_no_import_pseudosymbols(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) for fc in data["files"]: for sym in fc["symbols"]: assert sym["kind"] != "import" # ── --file filter ──────────────────────────────────────────────────────── def test_stc_file_filter_scopes_output(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--file", "billing.py"]) data = json.loads(result.output) for fc in data["files"]: assert "billing.py" in fc["file"] def test_stc_file_filter_reflected_in_json(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--file", "billing.py"]) data = json.loads(result.output) assert data["filters"]["file"] == "billing.py" def test_stc_file_filter_billing_has_generate_pdf( self, stc_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--file", "billing.py"]) data = json.loads(result.output) names = [ sym["name"] for fc in data["files"] for sym in fc["symbols"] ] assert "generate_pdf" in names # ── --kind filter ──────────────────────────────────────────────────────── def test_stc_kind_method_only_methods(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--kind", "method"]) data = json.loads(result.output) for fc in data["files"]: for sym in fc["symbols"]: assert sym["kind"] == "method" def test_stc_kind_function_only_functions(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--kind", "function"]) data = json.loads(result.output) for fc in data["files"]: for sym in fc["symbols"]: assert sym["kind"] == "function" def test_stc_kind_invalid_rejected(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--kind", "not_a_kind"]) assert result.exit_code != 0 # ── --uncovered-only ───────────────────────────────────────────────────── def test_stc_uncovered_only_exits_zero(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--uncovered-only"]) assert result.exit_code == 0, result.output def test_stc_uncovered_only_hides_covered(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--uncovered-only"]) # generate_pdf should appear; compute_total should not appear assert "generate_pdf" in result.output def test_stc_uncovered_only_json_symbols_all_uncovered( self, stc_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--uncovered-only"]) data = json.loads(result.output) for fc in data["files"]: for sym in fc["symbols"]: assert sym["covered"] is False def test_stc_uncovered_only_json_stats_still_full( self, stc_repo: pathlib.Path ) -> None: result_all = runner.invoke(cli, self.CMD + ["--json"]) result_uncov = runner.invoke(cli, self.CMD + ["--json", "--uncovered-only"]) data_all = json.loads(result_all.output) data_uncov = json.loads(result_uncov.output) # Total symbol count should be the same (stats reflect full picture) assert ( data_all["summary"]["total_symbols"] == data_uncov["summary"]["total_symbols"] ) # ── --show-tests ───────────────────────────────────────────────────────── def test_stc_show_tests_exits_zero(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--show-tests"]) assert result.exit_code == 0, result.output def test_stc_show_tests_lists_test_addr(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--show-tests"]) # Should include a ← prefix followed by a test address assert "←" in result.output def test_stc_show_tests_references_test_file( self, stc_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--show-tests"]) assert "test_billing" in result.output # ── --transitive / --depth ─────────────────────────────────────────────── def test_stc_transitive_exits_zero(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--transitive"]) assert result.exit_code == 0, result.output def test_stc_transitive_json_flag_true(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--transitive"]) data = json.loads(result.output) assert data["transitive"] is True def test_stc_depth_2_implies_transitive(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--depth", "2"]) data = json.loads(result.output) assert data["transitive"] is True assert data["depth"] == 2 def test_stc_depth_reflected_in_json(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--depth", "3"]) data = json.loads(result.output) assert data["depth"] == 3 def test_stc_transitive_does_not_reduce_coverage( self, stc_repo: pathlib.Path ) -> None: result_direct = runner.invoke(cli, self.CMD + ["--json"]) result_trans = runner.invoke(cli, self.CMD + ["--json", "--transitive"]) data_direct = json.loads(result_direct.output) data_trans = json.loads(result_trans.output) # Transitive coverage must be >= direct coverage assert ( data_trans["summary"]["covered_symbols"] >= data_direct["summary"]["covered_symbols"] ) def test_stc_depth_0_invalid(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--depth", "0"]) assert result.exit_code != 0 def test_stc_depth_exceeds_max_invalid(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--depth", "11"]) assert result.exit_code != 0 # ── --min-coverage ─────────────────────────────────────────────────────── def test_stc_min_coverage_0_exits_zero(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--min-coverage", "0"]) assert result.exit_code == 0, result.output def test_stc_min_coverage_100_exits_nonzero(self, stc_repo: pathlib.Path) -> None: # generate_pdf is never covered, so 100% is unachievable. result = runner.invoke(cli, self.CMD + ["--min-coverage", "100"]) assert result.exit_code != 0 def test_stc_min_coverage_shows_warning(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--min-coverage", "100"]) assert "⚠️" in result.output or "below" in result.output.lower() def test_stc_min_coverage_reflected_in_json(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--min-coverage", "80"]) data = json.loads(result.output) assert data["filters"]["min_coverage"] == 80 def test_stc_min_coverage_none_when_0(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) assert data["filters"]["min_coverage"] is None def test_stc_min_coverage_invalid_over_100(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--min-coverage", "101"]) assert result.exit_code != 0 def test_stc_min_coverage_invalid_negative(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--min-coverage", "-1"]) assert result.exit_code != 0 # ── test-file exclusion ────────────────────────────────────────────────── def test_stc_test_files_not_in_production_symbols( self, stc_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) for fc in data["files"]: assert "test_" not in pathlib.PurePosixPath(fc["file"]).name.split(".")[0][:5] or \ not fc["file"].startswith("tests/"), \ f"test file appeared in production symbols: {fc['file']}" def test_stc_no_test_file_in_prod_files(self, stc_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) for fc in data["files"]: assert "tests/" not in fc["file"] or fc["file"].startswith("tests/") is False, \ fc["file"] # ── requires repo ──────────────────────────────────────────────────────── def test_stc_requires_repo(self, tmp_path: pathlib.Path) -> None: import os old = os.getcwd() try: os.chdir(tmp_path) result = runner.invoke(cli, self.CMD) assert result.exit_code != 0 finally: os.chdir(old) # ── empty repo ─────────────────────────────────────────────────────────── def test_stc_empty_repo_exits_zero(self, repo: pathlib.Path) -> None: """An empty repo (no commits yet) should not crash.""" # The base repo fixture has no commits — must handle gracefully. # First commit something minimal so HEAD exists. (repo / "empty.py").write_text("") r = runner.invoke(cli, ["commit", "-m", "seed"]) if r.exit_code != 0: pytest.skip("could not create initial commit") result = runner.invoke(cli, self.CMD) assert result.exit_code == 0, result.output # --------------------------------------------------------------------------- # muse code gravity # --------------------------------------------------------------------------- @pytest.fixture def gravity_repo(repo: pathlib.Path) -> pathlib.Path: """Repo whose call graph creates a clear gravity hierarchy. Layout:: core.py — read_object (called by everything) mid.py — process (calls read_object) top.py — handle (calls process, which calls read_object) leaf.py — leaf_fn (calls handle) Expected gravity (transitive dependents): read_object: 3 (process, handle, leaf_fn) → high gravity process: 2 (handle, leaf_fn) handle: 1 (leaf_fn) leaf_fn: 0 → lowest gravity """ (repo / "core.py").write_text(textwrap.dedent("""\ def read_object(path): return path.read_bytes() """)) runner.invoke(cli, ["code", "add", "core.py"]) r1 = runner.invoke(cli, ["commit", "-m", "core: add read_object"]) assert r1.exit_code == 0, r1.output (repo / "mid.py").write_text(textwrap.dedent("""\ from core import read_object def process(path): return read_object(path) """)) runner.invoke(cli, ["code", "add", "mid.py"]) r2 = runner.invoke(cli, ["commit", "-m", "mid: add process"]) assert r2.exit_code == 0, r2.output (repo / "top.py").write_text(textwrap.dedent("""\ from mid import process def handle(path): return process(path) """)) runner.invoke(cli, ["code", "add", "top.py"]) r3 = runner.invoke(cli, ["commit", "-m", "top: add handle"]) assert r3.exit_code == 0, r3.output (repo / "leaf.py").write_text(textwrap.dedent("""\ from top import handle def leaf_fn(path): return handle(path) """)) runner.invoke(cli, ["code", "add", "leaf.py"]) r4 = runner.invoke(cli, ["commit", "-m", "leaf: add leaf_fn"]) assert r4.exit_code == 0, r4.output return repo class TestGravity: """Tests for ``muse code gravity``.""" CMD = ["code", "gravity"] # ── basic correctness ───────────────────────────────────────────────────── def test_gravity_exits_zero(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert result.exit_code == 0, result.output def test_gravity_shows_header(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "Symbol gravity" in result.output def test_gravity_shows_head(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "HEAD" in result.output def test_gravity_shows_column_headers(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "GRAVITY" in result.output assert "DIRECT" in result.output assert "DEPTH" in result.output def test_gravity_shows_symbols(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) # At least one symbol should appear. assert "read_object" in result.output or "process" in result.output def test_gravity_shows_percentage(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "%" in result.output # ── --top ───────────────────────────────────────────────────────────────── def test_gravity_top_limits_output(self, gravity_repo: pathlib.Path) -> None: result1 = runner.invoke(cli, self.CMD + ["--json", "--top", "1"]) result3 = runner.invoke(cli, self.CMD + ["--json", "--top", "3"]) data1 = json.loads(result1.output) data3 = json.loads(result3.output) assert len(data1["symbols"]) <= 1 assert len(data3["symbols"]) <= 3 def test_gravity_top_0_returns_all(self, gravity_repo: pathlib.Path) -> None: result_all = runner.invoke(cli, self.CMD + ["--json", "--top", "0"]) result_lim = runner.invoke(cli, self.CMD + ["--json", "--top", "1"]) data_all = json.loads(result_all.output) data_lim = json.loads(result_lim.output) assert len(data_all["symbols"]) >= len(data_lim["symbols"]) def test_gravity_top_invalid_negative(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--top", "-1"]) assert result.exit_code != 0 # ── --sort ──────────────────────────────────────────────────────────────── def test_gravity_sort_gravity_default(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--top", "0"]) data = json.loads(result.output) if len(data["symbols"]) >= 2: pcts = [s["gravity_pct"] for s in data["symbols"]] assert pcts == sorted(pcts, reverse=True) def test_gravity_sort_direct(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--sort", "direct", "--top", "0"]) data = json.loads(result.output) assert result.exit_code == 0 if len(data["symbols"]) >= 2: directs = [s["direct_dependents"] for s in data["symbols"]] assert directs == sorted(directs, reverse=True) def test_gravity_sort_depth(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--sort", "depth", "--top", "0"]) data = json.loads(result.output) assert result.exit_code == 0 if len(data["symbols"]) >= 2: depths = [s["max_depth"] for s in data["symbols"]] assert depths == sorted(depths, reverse=True) def test_gravity_sort_invalid_rejected(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--sort", "invalid"]) assert result.exit_code != 0 # ── --depth cap ─────────────────────────────────────────────────────────── def test_gravity_depth_0_unlimited(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--depth", "0"]) data = json.loads(result.output) assert result.exit_code == 0 assert data["max_depth"] == 0 def test_gravity_depth_1_direct_only(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--depth", "1", "--top", "0"]) data = json.loads(result.output) assert result.exit_code == 0 # With depth=1, max_depth for any symbol should be at most 1. for sym in data["symbols"]: assert sym["max_depth"] <= 1 def test_gravity_depth_invalid_negative(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--depth", "-1"]) assert result.exit_code != 0 # ── --kind filter ───────────────────────────────────────────────────────── def test_gravity_kind_function_only(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--kind", "function", "--top", "0"]) data = json.loads(result.output) assert result.exit_code == 0 for sym in data["symbols"]: assert sym["kind"] == "function" def test_gravity_kind_invalid_rejected(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--kind", "not_a_kind"]) assert result.exit_code != 0 # ── --file filter ───────────────────────────────────────────────────────── def test_gravity_file_filter_scopes(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--file", "core.py", "--top", "0"]) data = json.loads(result.output) assert result.exit_code == 0 for sym in data["symbols"]: assert "core.py" in sym["file"] def test_gravity_file_filter_reflected_in_json(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--file", "core.py"]) data = json.loads(result.output) assert data["filters"]["file"] == "core.py" # ── --min-gravity ───────────────────────────────────────────────────────── def test_gravity_min_gravity_filters_low(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--min-gravity", "50.0", "--top", "0"]) data = json.loads(result.output) for sym in data["symbols"]: assert sym["gravity_pct"] >= 50.0 def test_gravity_min_gravity_100_returns_few(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--min-gravity", "100.0"]) assert result.exit_code == 0 def test_gravity_min_gravity_invalid_over_100(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--min-gravity", "101.0"]) assert result.exit_code != 0 def test_gravity_min_gravity_invalid_negative(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--min-gravity", "-1.0"]) assert result.exit_code != 0 # ── --explain ───────────────────────────────────────────────────────────── def test_gravity_explain_exits_zero(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--explain", "core.py::read_object"]) assert result.exit_code == 0, result.output def test_gravity_explain_shows_breakdown(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--explain", "core.py::read_object"]) assert "Gravity breakdown" in result.output def test_gravity_explain_shows_depth_distribution( self, gravity_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--explain", "core.py::read_object"]) assert "Depth distribution" in result.output def test_gravity_explain_shows_deepest_callers( self, gravity_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--explain", "core.py::read_object"]) assert "Deepest callers" in result.output def test_gravity_explain_missing_address_format( self, gravity_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--explain", "no_double_colon"]) assert result.exit_code != 0 def test_gravity_explain_unknown_symbol_exits_nonzero( self, gravity_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--explain", "core.py::no_such_fn"]) assert result.exit_code != 0 def test_gravity_explain_json_exits_zero(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke( cli, self.CMD + ["--explain", "core.py::read_object", "--json"] ) assert result.exit_code == 0, result.output def test_gravity_explain_json_schema(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke( cli, self.CMD + ["--explain", "core.py::read_object", "--json"] ) data = json.loads(result.output) for key in ( "address", "name", "kind", "file", "gravity_pct", "direct_dependents", "transitive_dependents", "max_depth", "depth_distribution", ): assert key in data, f"missing key: {key}" # ── JSON leaderboard ────────────────────────────────────────────────────── def test_gravity_json_exits_zero(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) assert result.exit_code == 0, result.output def test_gravity_json_top_level_keys(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) for key in ( "ref", "snapshot_id", "total_production_symbols", "max_depth", "include_tests", "filters", "symbols", ): assert key in data, f"missing key: {key}" def test_gravity_json_symbol_schema(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--top", "0"]) data = json.loads(result.output) if data["symbols"]: sym = data["symbols"][0] for key in ( "address", "name", "kind", "file", "gravity_pct", "direct_dependents", "transitive_dependents", "max_depth", "depth_distribution", ): assert key in sym, f"symbol missing key: {key}" def test_gravity_json_gravity_pct_range(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--top", "0"]) data = json.loads(result.output) for sym in data["symbols"]: assert 0.0 <= sym["gravity_pct"] <= 100.0 def test_gravity_json_read_object_has_highest_gravity( self, gravity_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--top", "0"]) data = json.loads(result.output) # read_object is called transitively by everything — should be near top. names = [s["name"] for s in data["symbols"]] if "read_object" in names and len(names) > 1: ro_idx = names.index("read_object") # read_object should be in the top half. assert ro_idx <= len(names) // 2 + 1 def test_gravity_json_leaf_fn_lower_gravity( self, gravity_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--top", "0"]) data = json.loads(result.output) syms = {s["name"]: s for s in data["symbols"]} if "leaf_fn" in syms and "read_object" in syms: assert syms["leaf_fn"]["gravity_pct"] <= syms["read_object"]["gravity_pct"] def test_gravity_json_include_tests_flag(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--include-tests"]) data = json.loads(result.output) assert data["include_tests"] is True def test_gravity_json_depth_reflected(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--depth", "2"]) data = json.loads(result.output) assert data["max_depth"] == 2 def test_gravity_json_filters_reflected(self, gravity_repo: pathlib.Path) -> None: result = runner.invoke( cli, self.CMD + ["--json", "--kind", "function", "--min-gravity", "5.0", "--top", "10"], ) data = json.loads(result.output) assert data["filters"]["kind"] == "function" assert data["filters"]["min_gravity"] == 5.0 assert data["filters"]["top"] == 10 def test_gravity_json_depth_distribution_is_dict( self, gravity_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--top", "0"]) data = json.loads(result.output) for sym in data["symbols"]: assert isinstance(sym["depth_distribution"], dict) # ── requires repo ───────────────────────────────────────────────────────── def test_gravity_requires_repo(self, tmp_path: pathlib.Path) -> None: import os old = os.getcwd() try: os.chdir(tmp_path) result = runner.invoke(cli, self.CMD) assert result.exit_code != 0 finally: os.chdir(old) # --------------------------------------------------------------------------- # muse code narrative # --------------------------------------------------------------------------- @pytest.fixture def narrative_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with a symbol that has a rich multi-event history. billing.py::compute_total goes through: commit 1: seed commit (different file — gives billing.py a parent context) commit 2: created (insert — billing.py added, compute_total appears as new symbol) commit 3: body rewritten (replace with impl keywords) commit 4: signature changed (replace with signature keywords) """ # Commit 1 — seed so billing.py's creation is a delta, not the initial commit. (repo / "readme.txt").write_text("MuseHub billing module\n") runner.invoke(cli, ["code", "add", "readme.txt"]) r0 = runner.invoke(cli, ["commit", "-m", "chore: initial seed"]) assert r0.exit_code == 0, r0.output # Commit 2 — create billing.py (compute_total becomes a new symbol in delta). (repo / "billing.py").write_text(textwrap.dedent("""\ def compute_total(items): total = 0 for item in items: total += item["price"] return total """)) runner.invoke(cli, ["code", "add", "billing.py"]) r1 = runner.invoke(cli, ["commit", "-m", "feat: add compute_total"]) assert r1.exit_code == 0, r1.output # Commit 3 — body rewrite: implementation changed. (repo / "billing.py").write_text(textwrap.dedent("""\ def compute_total(items): return sum(i["price"] for i in items) """)) runner.invoke(cli, ["code", "add", "billing.py"]) r2 = runner.invoke(cli, ["commit", "-m", "perf: vectorise compute_total body implementation"]) assert r2.exit_code == 0, r2.output # Commit 4 — signature change. (repo / "billing.py").write_text(textwrap.dedent("""\ def compute_total(items, currency="USD"): return sum(i["price"] for i in items) """)) runner.invoke(cli, ["code", "add", "billing.py"]) r3 = runner.invoke(cli, ["commit", "-m", "feat: compute_total signature add currency"]) assert r3.exit_code == 0, r3.output return repo class TestNarrative: """Tests for ``muse code narrative``.""" CMD = ["code", "narrative"] ADDR = "billing.py::compute_total" # ── basic correctness ───────────────────────────────────────────────────── def test_narrative_exits_zero(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert result.exit_code == 0, result.output def test_narrative_shows_symbol_name(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "compute_total" in result.output def test_narrative_shows_file(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "billing.py" in result.output def test_narrative_shows_born_event(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "Born" in result.output or "born" in result.output def test_narrative_shows_life_summary(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "Life summary" in result.output or "Survival" in result.output def test_narrative_shows_commit_id(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "commit" in result.output def test_narrative_shows_survival(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "%" in result.output # ── missing symbol ──────────────────────────────────────────────────────── def test_narrative_missing_symbol_exits_nonzero( self, narrative_repo: pathlib.Path ) -> None: result = runner.invoke( cli, self.CMD + ["billing.py::does_not_exist"] ) assert result.exit_code != 0 def test_narrative_bad_address_no_colons_exits_nonzero( self, narrative_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["no_double_colon"]) assert result.exit_code != 0 # ── --format prose ──────────────────────────────────────────────────────── def test_narrative_prose_exits_zero(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--format", "prose"]) assert result.exit_code == 0, result.output def test_narrative_prose_contains_name(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--format", "prose"]) assert "compute_total" in result.output def test_narrative_prose_contains_content(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--format", "prose"]) # Symbol name or some indication of the symbol's life should appear. assert "compute_total" in result.output or "rewritten" in result.output or "born" in result.output.lower() def test_narrative_prose_no_timeline_label( self, narrative_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--format", "prose"]) # Timeline labels like "Born " should not appear in prose. assert "Life summary" not in result.output def test_narrative_format_invalid_rejected( self, narrative_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--format", "invalid"]) assert result.exit_code != 0 # ── --json ──────────────────────────────────────────────────────────────── def test_narrative_json_exits_zero(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) assert result.exit_code == 0, result.output def test_narrative_json_is_valid(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert isinstance(data, dict) def test_narrative_json_top_level_keys(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) for key in ( "address", "name", "kind", "file", "status", "born_date", "born_commit", "last_change_date", "last_change_commit", "calendar_age_days", "genetic_age_days", "impl_changes", "sig_changes", "renames", "est_survival_pct", "commits_analysed", "truncated", "events", ): assert key in data, f"missing key: {key}" def test_narrative_json_address_matches(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["address"] == self.ADDR def test_narrative_json_name_is_bare(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["name"] == "compute_total" def test_narrative_json_file_is_file_part(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["file"] == "billing.py" def test_narrative_json_status_alive(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["status"] == "alive" def test_narrative_json_events_list(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert isinstance(data["events"], list) assert len(data["events"]) >= 1 def test_narrative_json_event_schema(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) ev = data["events"][0] for key in ("date", "commit_id", "commit_msg", "event_type", "sem_ver_bump", "detail"): assert key in ev, f"event missing key: {key}" def test_narrative_json_born_commit_set(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["born_commit"] != "" def test_narrative_json_born_date_format(self, narrative_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) import re assert re.match(r"\d{4}-\d{2}-\d{2}", data["born_date"]) def test_narrative_json_impl_changes_at_least_one( self, narrative_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) # We made at least one body rewrite commit. assert data["impl_changes"] >= 1 def test_narrative_json_commits_analysed_positive( self, narrative_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["commits_analysed"] > 0 def test_narrative_json_survival_between_0_and_100( self, narrative_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert 0 <= data["est_survival_pct"] <= 100 def test_narrative_json_calendar_age_nonnegative( self, narrative_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["calendar_age_days"] >= 0 def test_narrative_json_events_oldest_first( self, narrative_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) dates = [ev["date"] for ev in data["events"]] assert dates == sorted(dates) def test_narrative_json_create_event_present( self, narrative_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) types = [ev["event_type"] for ev in data["events"]] assert "create" in types # ── --since ─────────────────────────────────────────────────────────────── def test_narrative_since_invalid_ref_exits_nonzero( self, narrative_repo: pathlib.Path ) -> None: result = runner.invoke( cli, self.CMD + [self.ADDR, "--since", "no_such_ref_xyz"] ) assert result.exit_code != 0 # ── --max-commits ───────────────────────────────────────────────────────── def test_narrative_max_commits_validation( self, narrative_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--max-commits", "0"]) assert result.exit_code != 0 def test_narrative_max_commits_1_finds_head_event( self, narrative_repo: pathlib.Path ) -> None: result = runner.invoke( cli, self.CMD + [self.ADDR, "--json", "--max-commits", "1"] ) # With max-commits=1 we only see the HEAD commit; it must still succeed # if the HEAD commit touched our symbol, or fail gracefully if not. assert result.exit_code in (0, 1) # ── --show-source ───────────────────────────────────────────────────────── def test_narrative_show_source_exits_zero( self, narrative_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--show-source"]) assert result.exit_code == 0, result.output def test_narrative_show_source_contains_def( self, narrative_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--show-source"]) # HEAD source should contain the function definition. assert "def compute_total" in result.output # ── requires repo ───────────────────────────────────────────────────────── def test_narrative_requires_repo(self, tmp_path: pathlib.Path) -> None: import os old = os.getcwd() try: os.chdir(tmp_path) result = runner.invoke(cli, self.CMD + [self.ADDR]) assert result.exit_code != 0 finally: os.chdir(old) # --------------------------------------------------------------------------- # contract # --------------------------------------------------------------------------- @pytest.fixture() def contract_repo(repo: pathlib.Path) -> pathlib.Path: """Repo designed to exercise every dimension of ``muse code contract``. Layout:: billing.py — compute_total(items, currency="USD") → float services.py — place_order() calls compute_total with currency="EUR" → stored report.py — generate_report() calls compute_total(items) → stored (omits currency) audit.py — run_audit() calls compute_total(items) → discarded (bad caller) tests/test_billing.py — tests with assertions about compute_total Commit history:: 1. seed commit — readme.txt so symbol events are real insert ops 2. billing.py added — compute_total created 3. services.py, report.py, audit.py, tests/ added — callers in place 4. billing.py updated — body rewrite (PATCH) 5. billing.py updated — add currency param (MINOR) """ import os (repo / "readme.txt").write_text("# contract test repo\n") runner.invoke(cli, ["code", "add", "readme.txt"]) r0 = runner.invoke(cli, ["commit", "-m", "seed: initial readme"]) assert r0.exit_code == 0, r0.output (repo / "billing.py").write_text(textwrap.dedent("""\ def compute_total(items): return sum(i["price"] for i in items) """)) runner.invoke(cli, ["code", "add", "billing.py"]) r1 = runner.invoke(cli, ["commit", "-m", "feat: add compute_total"]) assert r1.exit_code == 0, r1.output os.makedirs(repo / "tests", exist_ok=True) (repo / "services.py").write_text(textwrap.dedent("""\ from billing import compute_total def place_order(items): total = compute_total(items, currency="EUR") return total """)) (repo / "report.py").write_text(textwrap.dedent("""\ from billing import compute_total def generate_report(items): result = compute_total(items) return result """)) (repo / "audit.py").write_text(textwrap.dedent("""\ from billing import compute_total def run_audit(items): compute_total(items) """)) (repo / "tests" / "test_billing.py").write_text(textwrap.dedent("""\ from billing import compute_total def test_compute_total_basic(): result = compute_total([{"price": 10}, {"price": 5}]) assert result == 15 assert result > 0 assert isinstance(result, (int, float)) def test_compute_total_empty(): result = compute_total([]) assert result == 0 """)) runner.invoke(cli, ["code", "add", "."]) r2 = runner.invoke(cli, ["commit", "-m", "feat: add callers and tests"]) assert r2.exit_code == 0, r2.output # body rewrite — PATCH (repo / "billing.py").write_text(textwrap.dedent("""\ def compute_total(items): total = 0.0 for item in items: total += float(item["price"]) return total """)) runner.invoke(cli, ["code", "add", "billing.py"]) r3 = runner.invoke(cli, ["commit", "-m", "perf: vectorise compute_total"]) assert r3.exit_code == 0, r3.output # add currency param — MINOR (repo / "billing.py").write_text(textwrap.dedent("""\ def compute_total(items, currency="USD"): total = 0.0 for item in items: total += float(item["price"]) return total """)) runner.invoke(cli, ["code", "add", "billing.py"]) r4 = runner.invoke(cli, ["commit", "-m", "feat: add optional currency param"]) assert r4.exit_code == 0, r4.output return repo class TestContract: """Tests for ``muse code contract``.""" CMD = ["code", "contract"] ADDR = "billing.py::compute_total" # ── basic correctness ───────────────────────────────────────────────────── def test_contract_exits_zero(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert result.exit_code == 0, result.output def test_contract_shows_address(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "compute_total" in result.output def test_contract_shows_signature_section(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "Signature" in result.output def test_contract_shows_def_keyword(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "def compute_total" in result.output def test_contract_shows_stability_section(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "Stability" in result.output def test_contract_shows_commits_analysed(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "commits" in result.output def test_contract_shows_assessment(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "Assessment" in result.output def test_contract_shows_return_section(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "Return value" in result.output def test_contract_shows_parameters_section(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "Parameters" in result.output # ── call-site disposition detection ────────────────────────────────────── def test_contract_detects_stored(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "stored" in result.output def test_contract_detects_discarded(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "discarded" in result.output def test_contract_warns_on_discarded(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) # audit.py discards the return — should surface a warning. assert "⚠" in result.output # ── test assertions ─────────────────────────────────────────────────────── def test_contract_shows_test_assertions(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "assert" in result.output.lower() def test_contract_shows_assert_result_positive( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR]) assert "result > 0" in result.output or "assert" in result.output # ── --json ──────────────────────────────────────────────────────────────── def test_contract_json_exits_zero(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) assert result.exit_code == 0, result.output def test_contract_json_is_valid(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert isinstance(data, dict) def test_contract_json_top_level_keys(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) for key in ( "address", "name", "kind", "signature", "parameters", "return_annotation", "call_sites", "caller_files", "return_dispositions", "arg_observations", "test_assertions", "commit_signals", "history", "preconditions", "postconditions", "warnings", "stability", ): assert key in data, f"missing top-level key: {key}" def test_contract_json_address_matches(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["address"] == self.ADDR def test_contract_json_name_is_bare(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["name"] == "compute_total" def test_contract_json_kind_is_function(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["kind"] in {"function", "async_function", "method", "async_method"} def test_contract_json_signature_contains_def( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert "def compute_total" in data["signature"] def test_contract_json_parameters_is_list(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert isinstance(data["parameters"], list) def test_contract_json_parameters_not_empty(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert len(data["parameters"]) >= 1 def test_contract_json_parameters_schema(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) p = data["parameters"][0] for key in ("name", "annotation", "has_default", "default_str"): assert key in p, f"parameter missing key: {key}" def test_contract_json_items_param_present(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) names = [p["name"] for p in data["parameters"]] assert "items" in names def test_contract_json_currency_param_present( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) names = [p["name"] for p in data["parameters"]] assert "currency" in names def test_contract_json_currency_has_default(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) params = {p["name"]: p for p in data["parameters"]} assert params["currency"]["has_default"] is True def test_contract_json_currency_default_str(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) params = {p["name"]: p for p in data["parameters"]} assert params["currency"]["default_str"] == "'USD'" def test_contract_json_call_sites_positive(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["call_sites"] >= 1 def test_contract_json_caller_files_positive(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["caller_files"] >= 1 def test_contract_json_return_dispositions_is_dict( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert isinstance(data["return_dispositions"], dict) def test_contract_json_return_dispositions_keys( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) rd = data["return_dispositions"] for key in ("stored", "discarded", "returned", "asserted", "compared"): assert key in rd, f"return_dispositions missing: {key}" def test_contract_json_discarded_count_at_least_one( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) # audit.py discards the return value assert data["return_dispositions"].get("discarded", 0) >= 1 def test_contract_json_stored_count_at_least_one( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["return_dispositions"].get("stored", 0) >= 1 def test_contract_json_test_assertions_is_list( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert isinstance(data["test_assertions"], list) def test_contract_json_test_assertions_not_empty( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert len(data["test_assertions"]) >= 1 def test_contract_json_test_assertions_are_strings( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) for assertion in data["test_assertions"]: assert isinstance(assertion, str) def test_contract_json_history_schema(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) h = data["history"] for key in ( "commits_analysed", "truncated", "major_bumps", "minor_bumps", "patch_bumps", "sig_changes", "impl_changes", "est_survival_pct", ): assert key in h, f"history missing key: {key}" def test_contract_json_history_commits_positive( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["history"]["commits_analysed"] > 0 def test_contract_json_history_survival_0_to_100( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) pct = data["history"]["est_survival_pct"] assert 0 <= pct <= 100 def test_contract_json_commit_signals_is_list( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert isinstance(data["commit_signals"], list) def test_contract_json_preconditions_is_list( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert isinstance(data["preconditions"], list) def test_contract_json_postconditions_is_list( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert isinstance(data["postconditions"], list) def test_contract_json_warnings_is_list(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert isinstance(data["warnings"], list) def test_contract_json_stability_valid_value( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["stability"] in {"stable", "evolving", "volatile", "dormant"} def test_contract_json_arg_observations_is_list( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert isinstance(data["arg_observations"], list) # ── input validation ────────────────────────────────────────────────────── def test_contract_missing_address_exits_nonzero( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD) assert result.exit_code != 0 def test_contract_bad_address_format_exits_nonzero( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["billing_no_colon"]) assert result.exit_code != 0 def test_contract_unknown_address_exits_nonzero( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["billing.py::nonexistent_fn_xyz"]) assert result.exit_code != 0 def test_contract_max_commits_zero_rejected( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--max-commits", "0"]) assert result.exit_code != 0 def test_contract_max_commits_one_succeeds(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--max-commits", "1"]) assert result.exit_code == 0, result.output # ── requires repo ───────────────────────────────────────────────────────── def test_contract_requires_repo(self, tmp_path: pathlib.Path) -> None: import os old = os.getcwd() try: os.chdir(tmp_path) result = runner.invoke(cli, self.CMD + [self.ADDR]) assert result.exit_code != 0 finally: os.chdir(old) # ── history accuracy ────────────────────────────────────────────────────── def test_contract_json_impl_changes_nonzero( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) # We made 2 body changes (perf rewrite + currency add). assert data["history"]["impl_changes"] >= 1 def test_contract_json_truncated_false_small_repo( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) assert data["history"]["truncated"] is False def test_contract_json_postconditions_nonempty( self, contract_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) # Should infer at least one postcondition (return value is stored). assert len(data["postconditions"]) >= 1 def test_contract_json_warnings_nonempty(self, contract_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + [self.ADDR, "--json"]) data = json.loads(result.output) # Missing type annotations + discarded return should generate warnings. assert len(data["warnings"]) >= 1 # --------------------------------------------------------------------------- # predict # --------------------------------------------------------------------------- @pytest.fixture() def predict_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with commit history that produces clear prediction signals. billing.py::compute_total — changed in every commit (high frequency) billing.py::apply_discount — always co-changes with compute_total (entanglement) services.py::place_order — changed only once (low confidence) 5 commits are made so that recency, frequency, and co-change signals are all detectable within the default horizon. """ # Commit 1 — establish both symbols (repo / "billing.py").write_text(textwrap.dedent("""\ def compute_total(items): return sum(i["price"] for i in items) def apply_discount(total, rate): return total * (1 - rate) """)) (repo / "services.py").write_text(textwrap.dedent("""\ def place_order(items): return True """)) runner.invoke(cli, ["code", "add", "."]) r1 = runner.invoke(cli, ["commit", "-m", "feat: initial billing"]) assert r1.exit_code == 0, r1.output # Commits 2-5 — co-evolve compute_total and apply_discount together for i in range(2, 6): (repo / "billing.py").write_text(textwrap.dedent(f"""\ def compute_total(items, rev={i}): total = 0.0 for item in items: total += float(item["price"]) return total def apply_discount(total, rate, rev={i}): return max(0.0, total * (1 - rate)) """)) runner.invoke(cli, ["code", "add", "billing.py"]) r = runner.invoke(cli, ["commit", "-m", f"refactor: billing revision {i}"]) assert r.exit_code == 0, r.output return repo class TestPredict: """Tests for ``muse code predict``.""" CMD = ["code", "predict"] # ── basic correctness ───────────────────────────────────────────────────── def test_predict_exits_zero(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert result.exit_code == 0, result.output def test_predict_shows_header(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "Predicted changes" in result.output def test_predict_shows_horizon(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "horizon:" in result.output def test_predict_shows_commits_analysed(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "analysed" in result.output def test_predict_shows_compute_total(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "compute_total" in result.output def test_predict_shows_apply_discount(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "apply_discount" in result.output def test_predict_shows_score(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) # Scores are in N.NN format at the start of each prediction line. import re assert re.search(r"0\.\d{2}", result.output) def test_predict_shows_reasons(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD) assert "↳" in result.output def test_predict_high_confidence_band_present( self, predict_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD) # compute_total changed 4/5 commits — should be HIGH or MEDIUM. assert "CONFIDENCE" in result.output def test_predict_entanglement_signal(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--horizon", "10"]) # compute_total and apply_discount co-change → entanglement reason expected. assert "entangled" in result.output or "co-change" in result.output # ── --top ───────────────────────────────────────────────────────────────── def test_predict_top_1_shows_one_prediction( self, predict_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--top", "1"]) assert result.exit_code == 0, result.output # With --top 1 there is exactly one score line. import re scores = re.findall(r"^\s+0\.\d{2}\s+", result.output, re.MULTILINE) assert len(scores) == 1 def test_predict_top_0_shows_all(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--top", "0"]) assert result.exit_code == 0, result.output assert "compute_total" in result.output # ── --min-confidence ────────────────────────────────────────────────────── def test_predict_min_confidence_1_empty(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--min-confidence", "1.0"]) assert result.exit_code == 0, result.output # Nothing should reach score 1.0 exactly. assert "No predictions" in result.output or "compute_total" not in result.output def test_predict_min_confidence_invalid_rejected( self, predict_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--min-confidence", "1.5"]) assert result.exit_code != 0 def test_predict_min_confidence_zero_shows_all( self, predict_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--min-confidence", "0.0"]) assert result.exit_code == 0, result.output assert "compute_total" in result.output # ── --horizon ───────────────────────────────────────────────────────────── def test_predict_horizon_1_exits_zero(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--horizon", "1"]) assert result.exit_code == 0, result.output def test_predict_horizon_invalid_rejected(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--horizon", "0"]) assert result.exit_code != 0 def test_predict_max_commits_1_exits_zero(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--max-commits", "1"]) assert result.exit_code == 0, result.output def test_predict_max_commits_invalid_rejected( self, predict_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--max-commits", "0"]) assert result.exit_code != 0 # ── --file ──────────────────────────────────────────────────────────────── def test_predict_file_filter_billing(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--file", "billing.py"]) assert result.exit_code == 0, result.output # Should show billing symbols. if "compute_total" in result.output or "apply_discount" in result.output: pass # expected # Should NOT show services.py symbols. assert "place_order" not in result.output def test_predict_file_filter_nonexistent_empty( self, predict_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--file", "nonexistent_xyz.py"]) assert result.exit_code == 0, result.output assert "No predictions" in result.output # ── --explain ───────────────────────────────────────────────────────────── def test_predict_explain_exits_zero(self, predict_repo: pathlib.Path) -> None: result = runner.invoke( cli, self.CMD + ["--explain", "billing.py::compute_total"] ) assert result.exit_code == 0, result.output def test_predict_explain_shows_signal_breakdown( self, predict_repo: pathlib.Path ) -> None: result = runner.invoke( cli, self.CMD + ["--explain", "billing.py::compute_total"] ) assert "signal breakdown" in result.output def test_predict_explain_shows_all_signals( self, predict_repo: pathlib.Path ) -> None: result = runner.invoke( cli, self.CMD + ["--explain", "billing.py::compute_total"] ) for signal in ("recency", "frequency", "co_change", "sig_instability", "module_velocity"): assert signal in result.output, f"missing signal: {signal}" def test_predict_explain_shows_bar(self, predict_repo: pathlib.Path) -> None: result = runner.invoke( cli, self.CMD + ["--explain", "billing.py::compute_total"] ) assert "█" in result.output or "░" in result.output def test_predict_explain_shows_score(self, predict_repo: pathlib.Path) -> None: result = runner.invoke( cli, self.CMD + ["--explain", "billing.py::compute_total"] ) assert "Score:" in result.output def test_predict_explain_shows_reasons(self, predict_repo: pathlib.Path) -> None: result = runner.invoke( cli, self.CMD + ["--explain", "billing.py::compute_total"] ) assert "Reasons" in result.output def test_predict_explain_bad_format_rejected( self, predict_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--explain", "no_colon_here"]) assert result.exit_code != 0 def test_predict_explain_unknown_addr_rejected( self, predict_repo: pathlib.Path ) -> None: result = runner.invoke( cli, self.CMD + ["--explain", "billing.py::nonexistent_fn_xyz"] ) assert result.exit_code != 0 # ── --json ──────────────────────────────────────────────────────────────── def test_predict_json_exits_zero(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) assert result.exit_code == 0, result.output def test_predict_json_is_valid(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) assert isinstance(data, dict) def test_predict_json_top_level_keys(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) for key in ( "generated_at", "horizon_commits", "max_commits", "commits_analysed", "truncated", "predictions", ): assert key in data, f"missing key: {key}" def test_predict_json_predictions_is_list(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) assert isinstance(data["predictions"], list) def test_predict_json_predictions_not_empty( self, predict_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) assert len(data["predictions"]) >= 1 def test_predict_json_prediction_schema(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) pred = data["predictions"][0] for key in ( "address", "name", "kind", "file", "score", "confidence", "reasons", "signals", "last_changed_commit", "last_changed_date", "top_partners", ): assert key in pred, f"prediction missing key: {key}" def test_predict_json_signals_schema(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) signals = data["predictions"][0]["signals"] for key in ("recency", "frequency", "co_change", "sig_instability", "module_velocity"): assert key in signals, f"signals missing key: {key}" def test_predict_json_score_is_float(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) assert isinstance(data["predictions"][0]["score"], float) def test_predict_json_score_in_range(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) for pred in data["predictions"]: assert 0.0 <= pred["score"] <= 1.0, ( f"score out of range: {pred['score']}" ) def test_predict_json_confidence_valid(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) for pred in data["predictions"]: assert pred["confidence"] in {"high", "medium", "low"}, ( f"invalid confidence: {pred['confidence']}" ) def test_predict_json_sorted_by_score_desc(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) scores = [p["score"] for p in data["predictions"]] assert scores == sorted(scores, reverse=True) def test_predict_json_commits_analysed_positive( self, predict_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) assert data["commits_analysed"] > 0 def test_predict_json_truncated_false_small_repo( self, predict_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) assert data["truncated"] is False def test_predict_json_top_partners_is_list(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) for pred in data["predictions"]: assert isinstance(pred["top_partners"], list) def test_predict_json_partner_schema(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) # Find a prediction that has partners. for pred in data["predictions"]: if pred["top_partners"]: p = pred["top_partners"][0] for key in ("address", "co_change_rate", "co_change_commits"): assert key in p, f"partner missing key: {key}" break def test_predict_json_co_change_rate_in_range( self, predict_repo: pathlib.Path ) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) for pred in data["predictions"]: for p in pred["top_partners"]: assert 0.0 <= p["co_change_rate"] <= 1.0 def test_predict_json_top_1_returns_one(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--top", "1"]) data = json.loads(result.output) assert len(data["predictions"]) == 1 def test_predict_json_horizon_matches_arg(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json", "--horizon", "3"]) data = json.loads(result.output) assert data["horizon_commits"] == 3 def test_predict_json_reasons_is_list(self, predict_repo: pathlib.Path) -> None: result = runner.invoke(cli, self.CMD + ["--json"]) data = json.loads(result.output) for pred in data["predictions"]: assert isinstance(pred["reasons"], list) assert all(isinstance(r, str) for r in pred["reasons"]) # ── requires repo ───────────────────────────────────────────────────────── def test_predict_requires_repo(self, tmp_path: pathlib.Path) -> None: import os old = os.getcwd() try: os.chdir(tmp_path) result = runner.invoke(cli, self.CMD) assert result.exit_code != 0 finally: os.chdir(old) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _all_commit_ids(repo: pathlib.Path) -> list[str]: """Return all commit IDs from the store, newest-first (by log order).""" from muse.core.commits import get_all_commits commits = get_all_commits(repo) return [c.commit_id for c in commits]