"""Supercharged tests for ``muse code cat`` (symbol-level). New features under TDD: --limit N cap --all results; truncated + total_symbols in JSON total_symbols always present in --all --json output redirected_from JSON field when global-fallback fires fmt bug fix no NameError when --json + no addresses 7-tier coverage --------------- Unit _resolve_symbol edge cases Integration --limit, total_symbols, redirected_from, fmt-bug E2E --at historical ref; --limit round-trip Security (file-level security covered by test_cmd_core_cat.py) Stress --limit 10 of 200 symbols fast Data integrity source matches actual bytes; --at gives different content than HEAD Performance --limit faster than full --all """ from __future__ import annotations from collections.abc import Mapping import json import pathlib import textwrap import time import pytest from tests.cli_test_helper import CliRunner from muse.core.object_store import write_object from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) import datetime from muse.core.types import blob_id, long_id from muse.core.paths import muse_dir, ref_path cli = None runner = CliRunner() _REPO_ID = "cat-sc-test" _counter = 0 # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _init_repo(path: pathlib.Path, repo_id: str = _REPO_ID) -> pathlib.Path: dot_muse = muse_dir(path) for d in ("commits", "snapshots", "objects", "refs/heads"): (dot_muse / d).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": repo_id, "domain": "code"}), encoding="utf-8" ) return path def _env(repo: pathlib.Path) -> Mapping[str, str]: return {"MUSE_REPO_ROOT": str(repo)} def _add_file(repo: pathlib.Path, rel_path: str, content: bytes) -> str: """Write a file to disk and return its object_id.""" obj_id = blob_id(content) write_object(repo, obj_id, content) full_path = repo / rel_path full_path.parent.mkdir(parents=True, exist_ok=True) full_path.write_bytes(content) return obj_id def _make_commit( repo: pathlib.Path, files: dict[str, bytes], message: str = "commit", parent_id: str | None = None, branch: str = "main", ) -> str: global _counter _counter += 1 manifest: dict[str, str] = {} for rel_path, content in files.items(): obj_id = _add_file(repo, rel_path, content) manifest[rel_path] = obj_id snap_id = hash_snapshot(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime.now(datetime.timezone.utc) parent_ids = [parent_id] if parent_id else [] commit_id = hash_commit( parent_ids=parent_ids, snapshot_id=snap_id, message=message, committed_at_iso=committed_at.isoformat(), ) write_commit(repo, CommitRecord( commit_id=commit_id, branch=branch, snapshot_id=snap_id, message=message, committed_at=committed_at, parent_commit_id=parent_id, )) (ref_path(repo, branch)).write_text(commit_id, encoding="utf-8") return commit_id _SIMPLE_PY = textwrap.dedent("""\ def hello(): return "hello" def world(): return "world" class Greeter: def greet(self): return "hi" """) _UPDATED_PY = textwrap.dedent("""\ def hello(): return "hello updated" def world(): return "world" class Greeter: def greet(self): return "hi" """) # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def repo(tmp_path: pathlib.Path) -> pathlib.Path: _init_repo(tmp_path) _make_commit(tmp_path, {"mod.py": _SIMPLE_PY.encode()}, message="initial") return tmp_path @pytest.fixture def two_commit_repo(tmp_path: pathlib.Path) -> pathlib.Path: """Repo with two commits — mod.py changes between them.""" _init_repo(tmp_path) cid1 = _make_commit(tmp_path, {"mod.py": _SIMPLE_PY.encode()}, message="v1") _make_commit( tmp_path, {"mod.py": _UPDATED_PY.encode()}, message="v2", parent_id=cid1 ) return tmp_path # --------------------------------------------------------------------------- # Bug fix: fmt NameError when --json and no addresses # --------------------------------------------------------------------------- class TestFmtBugFix: """fmt NameError fix — targets muse code cat (symbol-level).""" def test_no_address_json_flag_no_unbound_error(self, repo: pathlib.Path) -> None: """Passing --json with no address must not raise UnboundLocalError.""" result = runner.invoke(cli, ["code", "cat", "--json"], env=_env(repo)) assert result.exit_code != 0 assert "UnboundLocalError" not in result.output assert "Traceback" not in result.output def test_no_address_text_mode_no_crash(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat"], env=_env(repo)) assert result.exit_code != 0 assert "UnboundLocalError" not in result.output def test_no_address_json_contains_error_key(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "--json"], env=_env(repo)) assert result.exit_code != 0 data = json.loads(result.output) assert "error" in data # --------------------------------------------------------------------------- # Integration: total_symbols in --all --json # --------------------------------------------------------------------------- class TestAllJsonTotalSymbols: def test_all_json_has_total_symbols(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "cat", "mod.py", "--all", "--json"], env=_env(repo) ) assert result.exit_code == 0 data = json.loads(result.output) assert "total_symbols" in data def test_all_json_total_symbols_count(self, repo: pathlib.Path) -> None: """total_symbols == len(results) when no kind filter.""" result = runner.invoke( cli, ["code", "cat", "mod.py", "--all", "--json"], env=_env(repo) ) data = json.loads(result.output) assert data["total_symbols"] == len(data["results"]) def test_all_json_kind_filter_shows_unfiltered_total( self, repo: pathlib.Path ) -> None: """When --kind filters, total_symbols reflects pre-filter count.""" all_result = runner.invoke( cli, ["code", "cat", "mod.py", "--all", "--json"], env=_env(repo) ) total = json.loads(all_result.output)["total_symbols"] func_result = runner.invoke( cli, ["code", "cat", "mod.py", "--all", "--kind", "function", "--json"], env=_env(repo), ) func_data = json.loads(func_result.output) # total_symbols should be the pre-filter total, not just functions assert func_data["total_symbols"] == total # But results should only have functions assert all(r["kind"] == "function" for r in func_data["results"]) def test_all_json_total_symbols_stable_across_filters( self, repo: pathlib.Path ) -> None: """total_symbols is the same regardless of --kind filter.""" base = json.loads( runner.invoke(cli, ["code", "cat", "mod.py", "--all", "--json"], env=_env(repo)).output )["total_symbols"] for kind in ("function", "method", "class"): data = json.loads( runner.invoke( cli, ["code", "cat", "mod.py", "--all", "--kind", kind, "--json"], env=_env(repo), ).output ) assert data["total_symbols"] == base # --------------------------------------------------------------------------- # Integration: --limit N for --all mode # --------------------------------------------------------------------------- class TestAllLimit: def _big_repo(self, tmp_path: pathlib.Path) -> pathlib.Path: _init_repo(tmp_path) funcs = "\n\n".join(f"def func_{i}():\n pass" for i in range(30)) _make_commit(tmp_path, {"big.py": funcs.encode()}, message="big") return tmp_path def test_limit_caps_results(self, tmp_path: pathlib.Path) -> None: repo = self._big_repo(tmp_path) result = runner.invoke( cli, ["code", "cat", "big.py", "--all", "--limit", "5", "--json"], env=_env(repo) ) assert result.exit_code == 0 data = json.loads(result.output) assert len(data["results"]) == 5 def test_limit_sets_truncated_true(self, tmp_path: pathlib.Path) -> None: repo = self._big_repo(tmp_path) result = runner.invoke( cli, ["code", "cat", "big.py", "--all", "--limit", "5", "--json"], env=_env(repo) ) data = json.loads(result.output) assert data["truncated"] is True def test_no_limit_truncated_false(self, tmp_path: pathlib.Path) -> None: repo = self._big_repo(tmp_path) result = runner.invoke( cli, ["code", "cat", "big.py", "--all", "--json"], env=_env(repo) ) data = json.loads(result.output) assert data.get("truncated") is False def test_limit_larger_than_results_not_truncated( self, tmp_path: pathlib.Path ) -> None: repo = self._big_repo(tmp_path) result = runner.invoke( cli, ["code", "cat", "big.py", "--all", "--limit", "999", "--json"], env=_env(repo) ) data = json.loads(result.output) assert data.get("truncated") is False assert len(data["results"]) == 30 def test_limit_zero_shows_zero_results(self, tmp_path: pathlib.Path) -> None: repo = self._big_repo(tmp_path) result = runner.invoke( cli, ["code", "cat", "big.py", "--all", "--limit", "0", "--json"], env=_env(repo) ) assert result.exit_code == 0 data = json.loads(result.output) assert len(data["results"]) == 0 assert data["truncated"] is True def test_limit_text_mode_respects_cap(self, tmp_path: pathlib.Path) -> None: repo = self._big_repo(tmp_path) result = runner.invoke( cli, ["code", "cat", "big.py", "--all", "--limit", "3"], env=_env(repo) ) assert result.exit_code == 0 # Only 3 symbols printed — count '# big.py::' headers headers = [line for line in result.output.splitlines() if line.startswith("# big.py::")] assert len(headers) == 3 def test_limit_without_all_is_ignored(self, repo: pathlib.Path) -> None: """--limit without --all should be silently accepted (operates on results list).""" result = runner.invoke( cli, ["code", "cat", "mod.py::hello", "--limit", "5", "--json"], env=_env(repo) ) # Should work normally (limit doesn't apply in address mode) assert result.exit_code == 0 # --------------------------------------------------------------------------- # Integration: redirected_from in JSON for global fallback # --------------------------------------------------------------------------- class TestRedirectedFrom: def test_json_global_fallback_has_redirected_from( self, tmp_path: pathlib.Path ) -> None: """When symbol is found in a different file via fallback, JSON has redirected_from.""" _init_repo(tmp_path) _make_commit( tmp_path, { # wrong.py has some symbols but NOT my_func — triggers global fallback "wrong.py": b"def other_func():\n pass\n", "right.py": b"def my_func():\n pass\n", }, message="two files", ) result = runner.invoke( cli, ["code", "cat", "wrong.py::my_func", "--json"], env=_env(tmp_path), ) assert result.exit_code == 0 data = json.loads(result.output) assert len(data["results"]) == 1 r = data["results"][0] assert "redirected_from" in r assert "wrong.py" in r["redirected_from"] def test_text_fallback_still_prints_note(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _make_commit( tmp_path, { # wrong.py has some symbols but NOT my_func "wrong.py": b"def other_func():\n pass\n", "right.py": b"def my_func():\n pass\n", }, message="two files", ) result = runner.invoke( cli, ["code", "cat", "wrong.py::my_func"], env=_env(tmp_path) ) assert result.exit_code == 0 assert "note" in result.output.lower() or "found in" in result.output.lower() # --------------------------------------------------------------------------- # Data integrity # --------------------------------------------------------------------------- class TestDataIntegrity: def test_symbol_source_matches_file_bytes(self, repo: pathlib.Path) -> None: """Source extracted by cat must appear verbatim in the actual file.""" result = runner.invoke( cli, ["code", "cat", "mod.py::hello", "--json"], env=_env(repo) ) data = json.loads(result.output) source = data["results"][0]["source"] disk_content = (repo / "mod.py").read_text() assert source in disk_content def test_at_ref_gives_different_content_than_head( self, two_commit_repo: pathlib.Path ) -> None: log = runner.invoke(cli, ["log", "--json"], env=_env(two_commit_repo)) old_cid = json.loads(log.output)["commits"][-1]["commit_id"] head = runner.invoke( cli, ["code", "cat", "mod.py::hello", "--json"], env=_env(two_commit_repo) ) old = runner.invoke( cli, ["code", "cat", "mod.py::hello", "--at", old_cid, "--json"], env=_env(two_commit_repo), ) head_src = json.loads(head.output)["results"][0]["source"] old_src = json.loads(old.output)["results"][0]["source"] assert head_src != old_src assert "updated" in head_src assert "updated" not in old_src def test_all_symbols_cover_all_defs(self, repo: pathlib.Path) -> None: """--all must return entries for every def/class in the file.""" result = runner.invoke( cli, ["code", "cat", "mod.py", "--all", "--json"], env=_env(repo) ) data = json.loads(result.output) names = {r["symbol"] for r in data["results"]} assert "hello" in names assert "world" in names # Greeter class or Greeter.greet method assert any("Greeter" in n or "greet" in n for n in names) def test_limit_preserves_lineno_order(self, tmp_path: pathlib.Path) -> None: """With --limit, returned symbols should be the first N in line order.""" _init_repo(tmp_path) funcs = "\n\n".join(f"def func_{i}():\n pass" for i in range(10)) _make_commit(tmp_path, {"ordered.py": funcs.encode()}, message="ordered") result = runner.invoke( cli, ["code", "cat", "ordered.py", "--all", "--limit", "3", "--json"], env=_env(tmp_path), ) data = json.loads(result.output) linenos = [r["lineno"] for r in data["results"]] assert linenos == sorted(linenos) # First 3 should be func_0, func_1, func_2 symbols = [r["symbol"] for r in data["results"]] assert symbols == ["func_0", "func_1", "func_2"] # --------------------------------------------------------------------------- # Performance # --------------------------------------------------------------------------- class TestPerformance: @pytest.fixture def large_repo(self, tmp_path: pathlib.Path) -> pathlib.Path: _init_repo(tmp_path) funcs = "\n\n".join(f"def func_{i}():\n return {i}" for i in range(200)) _make_commit(tmp_path, {"large.py": funcs.encode()}, message="large") return tmp_path def test_limit_10_faster_than_all(self, large_repo: pathlib.Path) -> None: """--limit 10 should complete in under 3s on 200-symbol file.""" t0 = time.monotonic() result = runner.invoke( cli, ["code", "cat", "large.py", "--all", "--limit", "10", "--json"], env=_env(large_repo), ) elapsed = time.monotonic() - t0 assert result.exit_code == 0 data = json.loads(result.output) assert len(data["results"]) == 10 assert elapsed < 3.0 # --------------------------------------------------------------------------- # TestJsonAlias — -j works identically to --json # --------------------------------------------------------------------------- class TestJsonAlias: """-j shorthand must behave identically to --json.""" def test_j_alias_exits_zero(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["code", "cat", "mod.py::hello", "-j"], env=_env(repo)) assert r.exit_code == 0, r.output def test_j_alias_valid_json(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["code", "cat", "mod.py::hello", "-j"], env=_env(repo)) json.loads(r.output) # must not raise def test_j_alias_has_results_key(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["code", "cat", "mod.py::hello", "-j"], env=_env(repo)) data = json.loads(r.output) assert "results" in data def test_j_alias_has_errors_key(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["code", "cat", "mod.py::hello", "-j"], env=_env(repo)) data = json.loads(r.output) assert "errors" in data def test_j_alias_same_top_level_keys_as_json_flag(self, repo: pathlib.Path) -> None: r1 = runner.invoke(cli, ["code", "cat", "mod.py::hello", "--json"], env=_env(repo)) r2 = runner.invoke(cli, ["code", "cat", "mod.py::hello", "-j"], env=_env(repo)) d1 = json.loads(r1.output) d2 = json.loads(r2.output) d1.pop("duration_ms", None) d2.pop("duration_ms", None) assert set(d1.keys()) == set(d2.keys()) def test_j_alias_result_address_matches(self, repo: pathlib.Path) -> None: r1 = runner.invoke(cli, ["code", "cat", "mod.py::hello", "--json"], env=_env(repo)) r2 = runner.invoke(cli, ["code", "cat", "mod.py::hello", "-j"], env=_env(repo)) assert json.loads(r1.output)["results"][0]["address"] == \ json.loads(r2.output)["results"][0]["address"] # --------------------------------------------------------------------------- # TestExitCode — JSON output must include exit_code # --------------------------------------------------------------------------- class TestExitCode: """JSON envelope must carry exit_code mirroring the process exit.""" def test_json_has_exit_code(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["code", "cat", "mod.py::hello", "--json"], env=_env(repo)) data = json.loads(r.output) assert "exit_code" in data def test_json_exit_code_zero_on_success(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["code", "cat", "mod.py::hello", "--json"], env=_env(repo)) assert r.exit_code == 0 data = json.loads(r.output) assert data["exit_code"] == 0 def test_json_exit_code_is_int(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["code", "cat", "mod.py::hello", "--json"], env=_env(repo)) data = json.loads(r.output) assert isinstance(data["exit_code"], int) def test_j_alias_exit_code_present(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["code", "cat", "mod.py::hello", "-j"], env=_env(repo)) data = json.loads(r.output) assert "exit_code" in data def test_exit_code_mirrors_process_exit_on_success(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["code", "cat", "mod.py::hello", "--json"], env=_env(repo)) data = json.loads(r.output) assert data["exit_code"] == r.exit_code def test_exit_code_nonzero_on_symbol_not_found(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["code", "cat", "mod.py::nonexistent_fn", "--json"], env=_env(repo)) assert r.exit_code != 0 data = json.loads(r.output) assert data["exit_code"] != 0 def test_exit_code_mirrors_process_exit_on_error(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["code", "cat", "mod.py::nonexistent_fn", "--json"], env=_env(repo)) data = json.loads(r.output) assert data["exit_code"] == r.exit_code def test_exit_code_zero_with_all_flag(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["code", "cat", "mod.py", "--all", "--json"], env=_env(repo)) assert r.exit_code == 0 data = json.loads(r.output) assert data["exit_code"] == 0 # --------------------------------------------------------------------------- # TestTypedDicts — _CatOutputJson carries the envelope fields # --------------------------------------------------------------------------- class TestTypedDicts: """_CatOutputJson must carry source_ref, results, errors, exit_code, duration_ms.""" def test_cat_output_json_exists(self) -> None: from muse.cli.commands.cat import _CatOutputJson # noqa: F401 def test_cat_output_json_has_exit_code_annotation(self) -> None: from muse.cli.commands.cat import _CatOutputJson assert "exit_code" in _CatOutputJson.__annotations__ def test_cat_output_json_has_duration_ms_annotation(self) -> None: from muse.cli.commands.cat import _CatOutputJson assert "duration_ms" in _CatOutputJson.__annotations__ def test_cat_output_json_has_results_annotation(self) -> None: from muse.cli.commands.cat import _CatOutputJson assert "results" in _CatOutputJson.__annotations__ def test_cat_output_json_has_errors_annotation(self) -> None: from muse.cli.commands.cat import _CatOutputJson assert "errors" in _CatOutputJson.__annotations__ def test_cat_output_json_has_source_ref_annotation(self) -> None: from muse.cli.commands.cat import _CatOutputJson assert "source_ref" in _CatOutputJson.__annotations__ def test_cat_result_exists(self) -> None: from muse.cli.commands.cat import CatResult # noqa: F401 def test_cat_error_exists(self) -> None: from muse.cli.commands.cat import CatError # noqa: F401 # --------------------------------------------------------------------------- # TestDocstrings — run() docstring documents new fields # --------------------------------------------------------------------------- class TestDocstrings: """run() must document exit_code in the JSON output section.""" def test_run_docstring_documents_fields(self) -> None: from muse.cli.commands.cat import run assert "exit_code" in run.__doc__ # --------------------------------------------------------------------------- # TestAnsiSanitization — no escape codes in JSON output # --------------------------------------------------------------------------- class TestAnsiSanitization: """No ANSI escape sequences anywhere in the JSON output.""" def test_json_output_no_ansi(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["code", "cat", "mod.py::hello", "--json"], env=_env(repo)) assert "\x1b" not in r.output def test_j_alias_output_no_ansi(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["code", "cat", "mod.py::hello", "-j"], env=_env(repo)) assert "\x1b" not in r.output def test_error_path_json_no_ansi(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["code", "cat", "mod.py::no_such_fn", "--json"], env=_env(repo)) assert "\x1b" not in r.output