"""End-to-end CLI tests for ``muse code type``. Coverage: - Default health report: text and JSON on a minimal repo. - --any-blast-radius: finds callers up to depth 2. - --drift: across 5 commits shows coverage trend. - --migration-targets: top-5 ranked correctly. - --diff HEAD~1: detects widened signature. - --file: filter restricts output. - --json: output is valid JSON with all required keys. - Missing repo exits non-zero. - Depth cap respected (no infinite BFS). - Stress: --drift over 100 commits completes in < 10 s. """ from __future__ import annotations import datetime import json import pathlib import time import pytest from muse.core.types import blob_id from muse.core.object_store import write_object as _write_object_store from tests.cli_test_helper import CliRunner from muse.core.paths import muse_dir runner = CliRunner() cli = None # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _env(root: pathlib.Path) -> Manifest: return {"MUSE_REPO_ROOT": str(root)} def _write_object(root: pathlib.Path, content: bytes) -> str: oid = blob_id(content) _write_object_store(root, oid, content) return oid def _make_repo( tmp: pathlib.Path, *, src: bytes | None = None, branch: str = "main", repo_id: str = "test-type-repo", ) -> pathlib.Path: """Minimal one-commit repo with a single Python source file.""" from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) if src is None: src = ( b"def add(x: int, y: int) -> int:\n" b" return x + y\n" b"\n" b"def untyped(a, b):\n" b" return a + b\n" ) dot_muse = muse_dir(tmp) dot_muse.mkdir(exist_ok=True) (dot_muse / "repo.json").write_text(f'{{"repo_id": "{repo_id}", "name": "test"}}') oid = _write_object(tmp, src) (tmp / "sample.py").write_bytes(src) manifest: Manifest = {"sample.py": oid} snap_id = hash_snapshot(manifest) write_snapshot(tmp, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime(2026, 3, 26, tzinfo=datetime.timezone.utc) commit_id = hash_commit( parent_ids=[], snapshot_id=snap_id, message="initial", committed_at_iso=committed_at.isoformat(), author="test", ) commit = CommitRecord( commit_id=commit_id, branch=branch, snapshot_id=snap_id, message="initial", committed_at=committed_at, author="test", ) write_commit(tmp, commit) refs = dot_muse / "refs" / "heads" refs.mkdir(parents=True, exist_ok=True) (refs / branch).write_text(commit_id) (dot_muse / "HEAD").write_text(f"ref: refs/heads/{branch}\n") return tmp def _make_multi_commit_repo( tmp: pathlib.Path, srcs: list[bytes], branch: str = "main", repo_id: str = "drift-repo", ) -> pathlib.Path: """Repo with multiple sequential commits, each replacing sample.py.""" from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) dot_muse = muse_dir(tmp) dot_muse.mkdir(exist_ok=True) (dot_muse / "repo.json").write_text(f'{{"repo_id": "{repo_id}", "name": "test"}}') refs = dot_muse / "refs" / "heads" refs.mkdir(parents=True, exist_ok=True) parent_ids: list[str] = [] base_time = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) for i, src in enumerate(srcs): oid = _write_object(tmp, src) manifest: Manifest = {"sample.py": oid} snap_id = hash_snapshot(manifest) write_snapshot(tmp, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = base_time + datetime.timedelta(days=i) msg = f"commit {i}" commit_id = hash_commit( parent_ids=parent_ids, snapshot_id=snap_id, message=msg, committed_at_iso=committed_at.isoformat(), author="test", ) commit = CommitRecord( commit_id=commit_id, branch=branch, snapshot_id=snap_id, message=msg, committed_at=committed_at, author="test", parent_commit_id=parent_ids[0] if parent_ids else None, ) write_commit(tmp, commit) parent_ids = [commit_id] (refs / branch).write_text(parent_ids[-1]) (dot_muse / "HEAD").write_text(f"ref: refs/heads/{branch}\n") (tmp / "sample.py").write_bytes(srcs[-1]) return tmp # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture() def repo(tmp_path: pathlib.Path) -> pathlib.Path: return _make_repo(tmp_path) @pytest.fixture() def empty_repo(tmp_path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "repo.json").write_text('{"repo_id": "empty", "name": "empty"}') refs = dot_muse / "refs" / "heads" refs.mkdir(parents=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") return tmp_path @pytest.fixture() def no_repo(tmp_path: pathlib.Path) -> pathlib.Path: return tmp_path # --------------------------------------------------------------------------- # Tests: default health report # --------------------------------------------------------------------------- class TestHealthReport: def test_exits_zero(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "type"], env=_env(repo)) assert result.exit_code == 0, result.output def test_output_contains_health_header(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "type"], env=_env(repo)) assert "Type Health" in result.output def test_output_shows_coverage(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "type"], env=_env(repo)) assert "%" in result.output def test_json_output_valid(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "type", "--json"], env=_env(repo)) assert result.exit_code == 0, result.output data = json.loads(result.output) required_keys = { "total_symbols", "fully_typed", "partially_typed", "untyped", "any_count", "coverage_fraction", "symbols", } assert required_keys.issubset(data.keys()) def test_json_symbols_list(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "type", "--json"], env=_env(repo)) data = json.loads(result.output) assert isinstance(data["symbols"], list) # Our fixture has 2 functions: add (typed) and untyped (not typed) assert data["total_symbols"] == 2 def test_json_coverage_between_0_and_1(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "type", "--json"], env=_env(repo)) data = json.loads(result.output) assert 0.0 <= data["coverage_fraction"] <= 1.0 def test_file_filter_restricts_output(self, tmp_path: pathlib.Path) -> None: src = b"def fn(x: int) -> int:\n return x\n" _make_repo(tmp_path, src=src) # Filter to "other/" which doesn't match "sample.py" result = runner.invoke( cli, ["code", "type", "--file", "other/", "--json"], env=_env(tmp_path) ) assert result.exit_code == 0 data = json.loads(result.output) assert data["total_symbols"] == 0 def test_no_repo_exits_nonzero(self, no_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "type"], env=_env(no_repo)) assert result.exit_code != 0 def test_empty_repo_no_head_snapshot(self, empty_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "type"], env=_env(empty_repo)) # Should exit non-zero with an informative message assert result.exit_code != 0 assert "No snapshot" in result.stderr or "snapshot" in result.stderr.lower() # --------------------------------------------------------------------------- # Tests: --any-blast-radius # --------------------------------------------------------------------------- class TestAnyBlastRadius: def test_any_return_exits_nonzero_when_callers_found( self, tmp_path: pathlib.Path ) -> None: src = b"""\ import typing from muse.core.paths import muse_dir def load() -> typing.Any: return {} def process(): return load() """ _make_repo(tmp_path, src=src) result = runner.invoke( cli, ["code", "type", "--any-blast-radius", "sample.py::load"], env=_env(tmp_path), ) # load() has Any return AND has a caller (process), so exit non-zero assert result.exit_code != 0 def test_no_any_exits_zero(self, repo: pathlib.Path) -> None: # "add" is fully typed with no Any — blast radius is empty → exit 0 result = runner.invoke( cli, ["code", "type", "--any-blast-radius", "sample.py::add"], env=_env(repo), ) assert result.exit_code == 0 def test_json_output_has_nodes_key(self, tmp_path: pathlib.Path) -> None: src = b"import typing\ndef f() -> typing.Any:\n return {}\n" _make_repo(tmp_path, src=src) result = runner.invoke( cli, ["code", "type", "--any-blast-radius", "sample.py::f", "--json"], env=_env(tmp_path), ) data = json.loads(result.output) assert "nodes" in data assert "address" in data def test_depth_flag_accepted(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "type", "--any-blast-radius", "sample.py::add", "--depth", "3"], env=_env(repo), ) # Should not raise — depth=3 is valid assert result.exit_code == 0 def test_depth_over_max_capped(self, tmp_path: pathlib.Path) -> None: src = b"from typing import Any\ndef f(x: Any) -> None:\n pass\n" _make_repo(tmp_path, src=src) result = runner.invoke( cli, ["code", "type", "--any-blast-radius", "sample.py::f", "--depth", "999"], env=_env(tmp_path), ) # Should not hang or error due to infinite BFS assert result.exit_code in (0, 1) # 0 if no callers, 1 if callers found def test_missing_address_exits_zero(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "type", "--any-blast-radius", "sample.py::nonexistent"], env=_env(repo), ) assert result.exit_code == 0 def test_text_output_shows_address(self, tmp_path: pathlib.Path) -> None: src = b"from typing import Any\ndef f(x: Any) -> None:\n pass\n" _make_repo(tmp_path, src=src) result = runner.invoke( cli, ["code", "type", "--any-blast-radius", "sample.py::f"], env=_env(tmp_path), ) assert "sample.py::f" in result.output # --------------------------------------------------------------------------- # Tests: --drift # --------------------------------------------------------------------------- class TestDrift: def test_drift_on_single_commit(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "type", "--drift"], env=_env(repo)) assert result.exit_code == 0 def test_drift_json_has_branch_and_drift(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "type", "--drift", "--json"], env=_env(repo) ) assert result.exit_code == 0 data = json.loads(result.output) assert "branch" in data assert "drift" in data assert isinstance(data["drift"], list) def test_drift_json_point_keys(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "type", "--drift", "--json"], env=_env(repo) ) data = json.loads(result.output) assert len(data["drift"]) >= 1 point = data["drift"][0] required = { "commit_id", "committed_at", "message", "coverage_fraction", "any_count", "delta_coverage", } assert required.issubset(point.keys()) def test_drift_five_commits_coverage_trend(self, tmp_path: pathlib.Path) -> None: """Coverage improves across 5 commits (none typed → fully typed).""" srcs: list[bytes] = [ b"def a(x, y): return x\n", b"def a(x: int, y): return x\n", b"def a(x: int, y: int): return x\n", b"def a(x: int, y: int) -> int: return x\n", b"def a(x: int, y: int) -> int:\n return x\n", ] _make_multi_commit_repo(tmp_path, srcs) result = runner.invoke( cli, ["code", "type", "--drift", "--json"], env=_env(tmp_path) ) assert result.exit_code == 0 data = json.loads(result.output) coverages = [p["coverage_fraction"] for p in data["drift"]] assert len(coverages) == 5 # Coverage should be non-decreasing (or at worst plateau) for i in range(1, len(coverages)): assert coverages[i] >= coverages[0] - 0.01 # allow tiny float noise def test_drift_since_flag_accepted(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "type", "--drift", "--since", "2020-01-01"], env=_env(repo), ) assert result.exit_code == 0 def test_drift_since_invalid_date_exits_nonzero(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "type", "--drift", "--since", "not-a-date"], env=_env(repo), ) assert result.exit_code != 0 def test_drift_max_commits_flag(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "type", "--drift", "--max-commits", "10"], env=_env(repo), ) assert result.exit_code == 0 def test_drift_text_shows_trend(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "type", "--drift"], env=_env(repo)) assert "Type Coverage Drift" in result.output or "Coverage" in result.output # --------------------------------------------------------------------------- # Tests: --migration-targets # --------------------------------------------------------------------------- class TestMigrationTargets: def test_exits_zero(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "type", "--migration-targets"], env=_env(repo) ) assert result.exit_code == 0 def test_json_has_targets_key(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "type", "--migration-targets", "--json"], env=_env(repo) ) assert result.exit_code == 0 data = json.loads(result.output) assert "targets" in data assert isinstance(data["targets"], list) def test_untyped_fn_appears_in_targets(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "type", "--migration-targets", "--json"], env=_env(repo) ) data = json.loads(result.output) addresses = [t["address"] for t in data["targets"]] # "sample.py::untyped" should appear since it has no annotations assert any("untyped" in addr for addr in addresses) def test_fully_typed_repo_shows_no_targets(self, tmp_path: pathlib.Path) -> None: src = b"def fn(x: int, y: str) -> float:\n return float(x)\n" _make_repo(tmp_path, src=src) result = runner.invoke( cli, ["code", "type", "--migration-targets", "--json"], env=_env(tmp_path) ) data = json.loads(result.output) assert data["targets"] == [] def test_top_n_limits_output(self, tmp_path: pathlib.Path) -> None: fns = b"\n".join([f"def fn{i}(x, y): pass".encode() for i in range(20)]) _make_repo(tmp_path, src=fns) result = runner.invoke( cli, ["code", "type", "--migration-targets", "--top", "5", "--json"], env=_env(tmp_path), ) data = json.loads(result.output) assert len(data["targets"]) <= 5 def test_targets_sorted_by_priority(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "type", "--migration-targets", "--json"], env=_env(repo) ) data = json.loads(result.output) scores = [t["priority_score"] for t in data["targets"]] assert scores == sorted(scores, reverse=True) def test_target_has_required_keys(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "type", "--migration-targets", "--json"], env=_env(repo) ) data = json.loads(result.output) if data["targets"]: t = data["targets"][0] required = {"address", "caller_count", "type_score", "priority_score"} assert required.issubset(t.keys()) # --------------------------------------------------------------------------- # Tests: --diff REF # --------------------------------------------------------------------------- class TestDiff: def _make_two_commit_repo( self, tmp: pathlib.Path, src_a: bytes, src_b: bytes, ) -> pathlib.Path: return _make_multi_commit_repo(tmp, [src_a, src_b]) def test_identical_snapshots_exit_zero(self, tmp_path: pathlib.Path) -> None: src = b"def fn(x: int) -> int:\n return x\n" _make_multi_commit_repo(tmp_path, [src, src]) result = runner.invoke( cli, ["code", "type", "--diff", "HEAD~1"], env=_env(tmp_path) ) assert result.exit_code == 0 def test_widened_signature_exits_nonzero(self, tmp_path: pathlib.Path) -> None: src_a = b"def fn(x: str) -> str:\n return x\n" src_b = b"from typing import Any\ndef fn(x: Any) -> str:\n return str(x)\n" self._make_two_commit_repo(tmp_path, src_a, src_b) result = runner.invoke( cli, ["code", "type", "--diff", "HEAD~1"], env=_env(tmp_path) ) # Widened → exit non-zero assert result.exit_code != 0 def test_narrowed_signature_exits_zero(self, tmp_path: pathlib.Path) -> None: src_a = b"from typing import Any\ndef fn(x: Any) -> str:\n return str(x)\n" src_b = b"def fn(x: str) -> str:\n return x\n" self._make_two_commit_repo(tmp_path, src_a, src_b) result = runner.invoke( cli, ["code", "type", "--diff", "HEAD~1"], env=_env(tmp_path) ) # Narrowed only → exit zero assert result.exit_code == 0 def test_json_has_conflicts_key(self, tmp_path: pathlib.Path) -> None: src = b"def fn(x: int) -> int:\n return x\n" _make_multi_commit_repo(tmp_path, [src, src]) result = runner.invoke( cli, ["code", "type", "--diff", "HEAD~1", "--json"], env=_env(tmp_path) ) assert result.exit_code == 0 data = json.loads(result.output) assert "conflicts" in data assert "diff_ref" in data def test_conflict_has_required_keys(self, tmp_path: pathlib.Path) -> None: src_a = b"def fn(x: str) -> str:\n return x\n" src_b = b"from typing import Any\ndef fn(x: Any) -> str:\n return str(x)\n" self._make_two_commit_repo(tmp_path, src_a, src_b) result = runner.invoke( cli, ["code", "type", "--diff", "HEAD~1", "--json"], env=_env(tmp_path) ) data = json.loads(result.output) if data["conflicts"]: c = data["conflicts"][0] required = {"address", "signature_a", "signature_b", "change_kind"} assert required.issubset(c.keys()) def test_invalid_ref_exits_nonzero(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "type", "--diff", "HEAD~999"], env=_env(repo) ) assert result.exit_code != 0 def test_text_output_shows_type_diff_header(self, tmp_path: pathlib.Path) -> None: src = b"def fn(x: int) -> int:\n return x\n" _make_multi_commit_repo(tmp_path, [src, src]) result = runner.invoke( cli, ["code", "type", "--diff", "HEAD~1"], env=_env(tmp_path) ) assert "Type Diff" in result.output or "HEAD" in result.output # --------------------------------------------------------------------------- # Stress test # --------------------------------------------------------------------------- class TestStress: def test_drift_100_commits_under_10_seconds(self, tmp_path: pathlib.Path) -> None: """--drift over 100 commits must complete in under 10 seconds.""" from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) dot_muse = muse_dir(tmp_path) dot_muse.mkdir() repo_id = "stress-drift" (dot_muse / "repo.json").write_text( f'{{"repo_id": "{repo_id}", "name": "stress"}}' ) refs = dot_muse / "refs" / "heads" refs.mkdir(parents=True) parent_ids: list[str] = [] base_time = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc) # Write a single typed Python file once — reuse its object ID across commits. src = b"def fn(x: int, y: int) -> int:\n return x + y\n" oid = _write_object(tmp_path, src) for i in range(100): manifest: Manifest = {"sample.py": oid} snap_id = hash_snapshot(manifest) write_snapshot(tmp_path, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = base_time + datetime.timedelta(days=i) msg = f"commit {i}" commit_id = hash_commit( parent_ids=parent_ids, snapshot_id=snap_id, message=msg, committed_at_iso=committed_at.isoformat(), author="test", ) commit = CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=msg, committed_at=committed_at, author="test", parent_commit_id=parent_ids[0] if parent_ids else None, ) write_commit(tmp_path, commit) parent_ids = [commit_id] (refs / "main").write_text(parent_ids[-1]) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") (tmp_path / "sample.py").write_bytes(src) start = time.monotonic() result = runner.invoke( cli, ["code", "type", "--drift", "--json"], env=_env(tmp_path), ) elapsed = time.monotonic() - start assert result.exit_code == 0, result.output data = json.loads(result.output) assert len(data["drift"]) == 100 assert elapsed < 10.0, f"--drift 100 commits took {elapsed:.2f}s — too slow"