"""Comprehensive tests for ``muse code cat``. Coverage -------- Unit _extract_source — basic slice, context lines, unicode, binary-safe _format_line_numbers — numbering, width padding, first-line offset _resolve_symbol — qualified match, bare-name match, ambiguous, missing _get_file_bytes — workdir read, object-store fallback, not-in-manifest Integration cat ADDRESS — found, missing, no "::", JSON output cat --all — all symbols in file, kind filter cat --at REF — historical snapshot cat --context N — surrounding lines appear cat --line-numbers — line numbers prefix cat --json — schema, errors list, unicode cat multiple addresses — batch lookup, partial errors cat untracked file — exits 1, FILE_NOT_TRACKED (never leaks content) cat staged file — staged blob readable from object store Security sanitize_display — control chars in address do not crash missing repo — exits non-zero outside repo Stress file with 200 symbols — --all completes in < 5 s 50 addresses in one call — batch under 3 s """ from __future__ import annotations import json import pathlib import textwrap import time import pytest from typing import Literal from tests.cli_test_helper import CliRunner from muse.cli.commands.cat import ( _FileError, _extract_source, _format_line_numbers, _get_file_bytes, _resolve_symbol, ) from muse.plugins.code.ast_parser import SymbolRecord, SymbolKind from muse.core.types import NULL_LONG_ID, long_id from muse.core.object_store import write_object from muse.core.types import long_id as _long_id, blob_id as _blob_id from muse.plugins.code.stage import make_entry, write_stage cli = None runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_record( qualified_name: str, name: str, lineno: int, end_lineno: int, kind: SymbolKind = "function", ) -> SymbolRecord: return SymbolRecord( name=name, qualified_name=qualified_name, kind=kind, lineno=lineno, end_lineno=end_lineno, content_id="a" * 64, body_hash="b" * 64, signature_id="c" * 64, metadata_id="", canonical_key=f"mod.py###{kind}#{name}#{lineno}", ) # --------------------------------------------------------------------------- # Shared repo fixture # --------------------------------------------------------------------------- @pytest.fixture def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) r = runner.invoke(cli, ["init", "--domain", "code"]) assert r.exit_code == 0, r.output (tmp_path / "billing.py").write_text(textwrap.dedent("""\ class Invoice: def compute_total(self, items: list[int]) -> int: return sum(items) def apply_discount(self, total: float, pct: float) -> float: return total * (1 - pct) def validate_amount(amount: float) -> bool: return amount > 0 def format_receipt(amount: float) -> str: return f"Total: {amount:.2f}" """)) r2 = runner.invoke(cli, ["commit", "-m", "initial"]) assert r2.exit_code == 0, r2.output return tmp_path # --------------------------------------------------------------------------- # Unit — _extract_source # --------------------------------------------------------------------------- class TestExtractSource: _SOURCE = b"line1\nline2\nline3\nline4\nline5\n" def test_basic_slice(self) -> None: result = _extract_source(self._SOURCE, lineno=2, end_lineno=3) assert result == "line2\nline3" def test_single_line(self) -> None: result = _extract_source(self._SOURCE, lineno=1, end_lineno=1) assert result == "line1" def test_context_before(self) -> None: result = _extract_source(self._SOURCE, lineno=3, end_lineno=3, context=1) assert "line2" in result assert "line3" in result def test_context_after(self) -> None: result = _extract_source(self._SOURCE, lineno=3, end_lineno=3, context=1) assert "line4" in result def test_context_clamps_at_start(self) -> None: # Asking for 10 lines of context before line 1 must not go negative. result = _extract_source(self._SOURCE, lineno=1, end_lineno=1, context=10) assert "line1" in result def test_context_clamps_at_end(self) -> None: result = _extract_source(self._SOURCE, lineno=5, end_lineno=5, context=10) assert "line5" in result def test_unicode_round_trips(self) -> None: src = "def café() -> str:\n return 'café'\n".encode() result = _extract_source(src, lineno=1, end_lineno=2) assert "café" in result def test_binary_errors_replaced(self) -> None: src = b"def foo():\n x = \xff\xfe\n" result = _extract_source(src, lineno=1, end_lineno=2) assert "foo" in result # must not raise # --------------------------------------------------------------------------- # Unit — _format_line_numbers # --------------------------------------------------------------------------- class TestFormatLineNumbers: def test_single_line_numbered(self) -> None: result = _format_line_numbers("hello", start_lineno=5) assert result.startswith("5") assert "hello" in result def test_multiline_numbered(self) -> None: source = "a\nb\nc" result = _format_line_numbers(source, start_lineno=1) lines = result.splitlines() assert len(lines) == 3 assert lines[0].startswith("1") assert lines[2].startswith("3") def test_width_pads_for_large_line_numbers(self) -> None: # 100 lines → width=3; the separator " " appears at offset 3 for every line. source = "\n".join(f"line_{i}" for i in range(100)) result = _format_line_numbers(source, start_lineno=1) expected_width = len(str(100)) # 3 for line in result.splitlines(): sep = line[expected_width : expected_width + 2] assert sep == " ", f"separator not at col {expected_width} in {line!r}" def test_offset_start_lineno(self) -> None: result = _format_line_numbers("hello", start_lineno=42) assert result.startswith("42") # --------------------------------------------------------------------------- # Unit — _resolve_symbol # --------------------------------------------------------------------------- class TestResolveSymbol: def test_qualified_name_match(self) -> None: tree = { "mod.py::MyClass.my_method": _make_record("MyClass.my_method", "my_method", 5, 7), } record, err = _resolve_symbol(tree, "MyClass.my_method", "mod.py") assert record is not None assert err == "" def test_bare_name_unambiguous(self) -> None: tree = { "mod.py::my_func": _make_record("my_func", "my_func", 1, 3), } record, err = _resolve_symbol(tree, "my_func", "mod.py") assert record is not None def test_bare_name_ambiguous_returns_error(self) -> None: tree = { "mod.py::A.validate": _make_record("A.validate", "validate", 1, 2), "mod.py::B.validate": _make_record("B.validate", "validate", 5, 6), } record, err = _resolve_symbol(tree, "validate", "mod.py") assert record is None assert "ambiguous" in err.lower() or "qualify" in err.lower() def test_not_found_returns_error_message(self) -> None: tree = { "mod.py::existing": _make_record("existing", "existing", 1, 3), } record, err = _resolve_symbol(tree, "missing_func", "mod.py") assert record is None assert "not found" in err.lower() or "missing_func" in err def test_empty_tree_returns_error(self) -> None: record, err = _resolve_symbol({}, "anything", "mod.py") assert record is None assert len(err) > 0 def test_import_symbols_excluded_from_suggestions(self) -> None: """Import pseudo-symbols must not show up as 'available' options.""" tree = { "mod.py::import::os": _make_record("import::os", "os", 1, 1, kind="import"), } record, err = _resolve_symbol(tree, "missing", "mod.py") assert record is None assert "import::os" not in err # --------------------------------------------------------------------------- # Integration — basic address lookup # --------------------------------------------------------------------------- class TestCatBasic: def test_finds_top_level_function(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py::validate_amount"]) assert result.exit_code == 0, result.output assert "validate_amount" in result.output def test_shows_function_body(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py::validate_amount"]) assert result.exit_code == 0 assert "amount > 0" in result.output def test_finds_method(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py::Invoice.compute_total"]) assert result.exit_code == 0 assert "compute_total" in result.output def test_missing_symbol_exits_one(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py::zzz_nonexistent"]) assert result.exit_code == 1 def test_no_separator_exits_one(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py"]) assert result.exit_code == 1 def test_untracked_file_exits_one(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "nowhere.py::foo"]) assert result.exit_code == 1 # --------------------------------------------------------------------------- # Integration — --all mode # --------------------------------------------------------------------------- class TestCatFileFlag: """--file is a convenience alias for --all.""" def test_file_flag_prints_all_symbols(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "--file", "billing.py"]) assert result.exit_code == 0 assert "validate_amount" in result.output assert "format_receipt" in result.output assert "compute_total" in result.output def test_file_flag_accepts_kind_filter(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "--file", "billing.py", "--kind", "function"]) assert result.exit_code == 0 assert "validate_amount" in result.output def test_file_flag_json_output(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "--file", "billing.py", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "results" in data names = [r["symbol"] for r in data["results"]] assert "validate_amount" in names def test_file_flag_untracked_file_errors(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "--file", "missing.py"]) assert result.exit_code != 0 class TestCatAll: def test_all_prints_every_non_import_symbol(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "--all", "billing.py"]) assert result.exit_code == 0 assert "validate_amount" in result.output assert "format_receipt" in result.output assert "compute_total" in result.output def test_all_kind_filter_functions_only(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "--all", "--kind", "function", "billing.py"]) assert result.exit_code == 0 assert "validate_amount" in result.output # Classes should not appear as their own block (only functions). def test_all_untracked_file_skips_gracefully(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "--all", "missing.py"]) # Should exit with error since file not in manifest. assert result.exit_code != 0 or "not tracked" in result.output.lower() # --------------------------------------------------------------------------- # Integration — --line-numbers and --context # --------------------------------------------------------------------------- class TestCatLineNumbers: def test_line_numbers_flag(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "cat", "--line-numbers", "billing.py::validate_amount", ]) assert result.exit_code == 0 # Some line in the output should start with a digit. output_lines = result.output.splitlines() has_number = any(line.strip() and line.strip()[0].isdigit() for line in output_lines) assert has_number def test_context_includes_surrounding_lines(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "cat", "--context", "2", "billing.py::validate_amount", ]) assert result.exit_code == 0 # With context=2 the preceding lines of the file should appear. assert len(result.output.splitlines()) > 2 # --------------------------------------------------------------------------- # Integration — --json # --------------------------------------------------------------------------- class TestCatJson: def test_json_schema(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "--json", "billing.py::validate_amount"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert "results" in data assert "errors" in data assert len(data["results"]) == 1 def test_json_result_fields(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "--json", "billing.py::validate_amount"]) data = json.loads(result.output) r = data["results"][0] for field in ("address", "source", "kind", "lineno", "end_lineno"): assert field in r, f"missing field {field!r}" def test_json_missing_symbol_in_errors(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "--json", "billing.py::zzz_missing"]) data = json.loads(result.output) assert len(data["errors"]) == 1 assert "zzz_missing" in data["errors"][0].get("error", "") def test_json_multiple_results(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "cat", "--json", "billing.py::validate_amount", "billing.py::format_receipt", ]) data = json.loads(result.output) assert len(data["results"]) == 2 def test_json_partial_error(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "cat", "--json", "billing.py::validate_amount", "billing.py::zzz_missing", ]) data = json.loads(result.output) assert len(data["results"]) == 1 assert len(data["errors"]) == 1 def test_json_has_duration_ms(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "--json", "billing.py::validate_amount"]) assert result.exit_code == 0 data = json.loads(result.output) assert "duration_ms" in data assert isinstance(data["duration_ms"], float) assert data["duration_ms"] >= 0.0 def test_json_has_source_ref(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "--json", "billing.py::validate_amount"]) data = json.loads(result.output) assert "source_ref" in data assert data["source_ref"] == "working tree" def test_format_json_flag(self, repo: pathlib.Path) -> None: """--json flag produces JSON output.""" result = runner.invoke(cli, [ "code", "cat", "--json", "billing.py::validate_amount", ]) assert result.exit_code == 0 data = json.loads(result.output) assert "results" in data def test_format_text_is_default(self, repo: pathlib.Path) -> None: """Default (no flag) produces text output, not JSON.""" result = runner.invoke(cli, ["code", "cat", "billing.py::validate_amount"]) assert result.exit_code == 0 # Text output starts with a '#' header, not a JSON brace. assert result.output.strip().startswith("#") def test_json_error_has_error_code(self, repo: pathlib.Path) -> None: """Every error entry in JSON output must have an error_code field.""" result = runner.invoke(cli, [ "code", "cat", "--json", "billing.py::zzz_missing", "nowhere.py::foo", ]) data = json.loads(result.output) for err in data["errors"]: assert "error_code" in err, f"error_code missing from {err}" # --------------------------------------------------------------------------- # Integration — --at historical snapshot # --------------------------------------------------------------------------- class TestCatAtRef: def test_at_head_works(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "cat", "--at", "HEAD", "billing.py::validate_amount", ]) assert result.exit_code == 0, result.output assert "validate_amount" in result.output def test_at_bad_ref_exits_one(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, [ "code", "cat", "--at", "zzz_bad_ref_xyz", "billing.py::validate_amount", ]) assert result.exit_code == 1 # --------------------------------------------------------------------------- # Integration — --staged flag # --------------------------------------------------------------------------- class TestCatStaged: """muse code cat --staged reads symbols from the staged version of a file.""" def test_staged_reads_staged_blob(self, repo: pathlib.Path) -> None: """--staged returns staged content, not the working-tree disk version.""" staged_content = ( "class Invoice:\n" " def compute_total(self, items: list[int]) -> int:\n" " return sum(items) + 99 # staged addition\n\n" " def apply_discount(self, total: float, pct: float) -> float:\n" " return total * (1 - pct)\n\n" "def validate_amount(amount: float) -> bool:\n" " return amount > 0\n\n" "def format_receipt(amount: float) -> str:\n" " return f'Total: {amount:.2f}'\n" ).encode() obj_id = _blob_id(staged_content) long_oid = _long_id(obj_id) write_object(repo, long_oid, staged_content) write_stage(repo, {"billing.py": make_entry(long_oid, "M")}) result = runner.invoke( cli, ["code", "cat", "--staged", "billing.py::Invoice.compute_total"] ) assert result.exit_code == 0 assert "99" in result.output # staged addition visible def test_staged_json_source_ref_is_staged(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "cat", "--staged", "--json", "billing.py::validate_amount"] ) assert result.exit_code == 0 data = json.loads(result.output) assert data.get("source_ref") == "staged" def test_staged_falls_back_to_committed_when_no_stage_entry( self, repo: pathlib.Path ) -> None: """File in HEAD but not in stage → --staged falls back to HEAD version.""" result = runner.invoke( cli, ["code", "cat", "--staged", "billing.py::validate_amount"] ) assert result.exit_code == 0 assert "validate_amount" in result.output def test_staged_and_at_mutually_exclusive(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "cat", "--staged", "--at", "HEAD", "billing.py::validate_amount"], ) assert result.exit_code != 0 def test_staged_and_at_json_has_error(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, [ "code", "cat", "--staged", "--at", "HEAD", "--json", "billing.py::validate_amount", ], ) assert result.exit_code != 0 data = json.loads(result.output) assert "error" in data assert "mutually exclusive" in data["error"].lower() def test_staged_untracked_file_errors(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "cat", "--staged", "nowhere.py::foo"] ) assert result.exit_code != 0 def test_staged_flag_registered(self) -> None: import argparse as _ap from muse.cli.commands.cat import register as _reg p = _ap.ArgumentParser() subs = p.add_subparsers(dest="cmd") _reg(subs) args = p.parse_args(["cat", "--staged", "f.py::fn"]) assert args.staged is True def test_staged_default_is_false(self) -> None: import argparse as _ap from muse.cli.commands.cat import register as _reg p = _ap.ArgumentParser() subs = p.add_subparsers(dest="cmd") _reg(subs) args = p.parse_args(["cat", "f.py::fn"]) assert args.staged is False # --------------------------------------------------------------------------- # Security # --------------------------------------------------------------------------- class TestCatSecurity: def test_requires_repo( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.chdir(tmp_path) monkeypatch.delenv("MUSE_REPO_ROOT", raising=False) result = runner.invoke(cli, ["code", "cat", "billing.py::foo"]) assert result.exit_code != 0 def test_control_chars_in_address_do_not_crash(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py::foo\x01bar"]) assert result.exit_code in (0, 1) # must not raise unhandled exception def test_symlink_workdir_rejected( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """A tracked file that is actually a symlink must be rejected.""" # Create a real file so the manifest knows about it, then replace with a symlink. real = repo / "billing.py" link = repo / "link.py" link.symlink_to(real) # Inject the symlink path into a fake manifest and call _get_file_bytes directly. fake_manifest = {"link.py": "a" * 64} with pytest.raises(_FileError) as exc_info: _get_file_bytes(repo, "link.py", fake_manifest, source_is_workdir=True) assert exc_info.value.code == "SYMLINK_REJECTED" def test_path_traversal_rejected( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """A manifest entry with '..' that escapes the repo must be rejected.""" fake_manifest = {"../outside.py": "a" * 64} with pytest.raises(_FileError) as exc_info: _get_file_bytes(repo, "../outside.py", fake_manifest, source_is_workdir=True) assert exc_info.value.code in ("PATH_TRAVERSAL", "FILE_NOT_TRACKED") def test_blob_not_found_gives_precise_error_code( self, repo: pathlib.Path ) -> None: """Missing blob raises _FileError with BLOB_NOT_FOUND, not generic exit.""" fake_manifest = {"billing.py": NULL_LONG_ID} # blob that doesn't exist in store with pytest.raises(_FileError) as exc_info: _get_file_bytes(repo, "billing.py", fake_manifest, source_is_workdir=False) assert exc_info.value.code == "BLOB_NOT_FOUND" def test_symlink_in_json_gives_error_code(self, repo: pathlib.Path) -> None: """Symlink rejection surfaces as a JSON error, not a crash.""" link = repo / "symlink_billing.py" link.symlink_to(repo / "billing.py") # Commit so symlink_billing.py appears in the manifest (won't — symlinks # are not tracked by the code plugin, so we test via _all_ on an untracked path). result = runner.invoke(cli, ["code", "cat", "--json", "symlink_billing.py::foo"]) # Either exit 0 with an error in the errors list, or exit 1 — never a crash. assert result.exit_code in (0, 1) try: data = json.loads(result.output) # If JSON, errors list must be non-empty or results non-empty. assert isinstance(data, dict) except json.JSONDecodeError: pass # text-mode output is also acceptable here # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestCatStress: @pytest.fixture def large_repo( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> pathlib.Path: monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) runner.invoke(cli, ["init", "--domain", "code"]) lines: list[str] = [] for i in range(200): lines.append(f"def symbol_{i:04d}(x: int) -> int:") lines.append(f" return x + {i}") lines.append("") (tmp_path / "big.py").write_text("\n".join(lines)) r = runner.invoke(cli, ["commit", "-m", "big module"]) assert r.exit_code == 0, r.output return tmp_path def test_all_200_symbols_under_5s(self, large_repo: pathlib.Path) -> None: start = time.monotonic() result = runner.invoke(cli, ["code", "cat", "--all", "big.py"]) elapsed = time.monotonic() - start assert result.exit_code == 0, result.output assert elapsed < 5.0, f"--all on 200 symbols took {elapsed:.2f}s" assert "symbol_0000" in result.output assert "symbol_0199" in result.output def test_50_addresses_batch_under_5s(self, large_repo: pathlib.Path) -> None: addresses = [f"big.py::symbol_{i:04d}" for i in range(50)] start = time.monotonic() result = runner.invoke(cli, ["code", "cat", "--json"] + addresses) elapsed = time.monotonic() - start assert result.exit_code == 0, result.output assert elapsed < 5.0, f"50-address batch took {elapsed:.2f}s" data = json.loads(result.output) assert len(data["results"]) == 50 def test_all_json_200_symbols_schema_valid(self, large_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "--all", "--json", "big.py"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert len(data["results"]) == 200 for r in data["results"]: assert "source" in r assert "lineno" in r def test_file_cache_batch_same_file_under_2s(self, large_repo: pathlib.Path) -> None: """50 addresses to the same file should benefit from caching: one read, one parse.""" addresses = [f"big.py::symbol_{i:04d}" for i in range(50)] start = time.monotonic() result = runner.invoke(cli, ["code", "cat", "--json"] + addresses) elapsed = time.monotonic() - start assert result.exit_code == 0, result.output # With caching, 50 same-file lookups should be fast. assert elapsed < 2.0, f"50 same-file addresses took {elapsed:.2f}s — cache may not be working" data = json.loads(result.output) assert len(data["results"]) == 50 def test_duration_ms_in_large_batch(self, large_repo: pathlib.Path) -> None: addresses = [f"big.py::symbol_{i:04d}" for i in range(20)] result = runner.invoke(cli, ["code", "cat", "--json"] + addresses) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data["duration_ms"], float) assert data["duration_ms"] >= 0.0 # --------------------------------------------------------------------------- # Flag registration tests # --------------------------------------------------------------------------- import argparse as _argparse from muse.cli.commands.cat import register as _register_cat def _parse_cat(*args: str) -> _argparse.Namespace: """Build an argument parser via register() and parse args.""" root_p = _argparse.ArgumentParser() subs = root_p.add_subparsers(dest="cmd") _register_cat(subs) return root_p.parse_args(["cat", *args]) class TestRegisterFlags: def test_default_json_out_is_false(self) -> None: ns = _parse_cat("src/foo.py::MyFn") assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = _parse_cat("src/foo.py::MyFn", "--json") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = _parse_cat("src/foo.py::MyFn", "-j") assert ns.json_out is True def test_line_numbers_has_no_n_shorthand(self) -> None: import pytest with pytest.raises(SystemExit): _parse_cat("src/foo.py::MyFn", "-n") def test_all_flag(self) -> None: ns = _parse_cat("src/foo.py", "--all") assert ns.all_symbols is True # --------------------------------------------------------------------------- # Phase 3: untracked files error; staged files readable from object store # --------------------------------------------------------------------------- def _stage_py(repo: pathlib.Path, rel_path: str, content: bytes) -> None: """Write blob to object store and record a stage entry — simulates muse code add.""" obj_id = _long_id(_blob_id(content)) write_object(repo, obj_id, content) write_stage(repo, {rel_path: make_entry(obj_id, "A")}) _STAGED_SRC = textwrap.dedent("""\ def staged_fn(x: int) -> int: return x * 2 class StagedClass: def method(self) -> None: pass """).encode() class TestUntrackedFileErrors: """muse code cat must reject files that exist on disk but are not tracked.""" def test_untracked_on_disk_address_exits_nonzero( self, repo: pathlib.Path ) -> None: (repo / "untracked.py").write_text("def ghost(): pass\n") result = runner.invoke(cli, ["code", "cat", "untracked.py::ghost"]) assert result.exit_code != 0 def test_untracked_on_disk_all_mode_exits_nonzero( self, repo: pathlib.Path ) -> None: (repo / "untracked.py").write_text("def ghost(): pass\n") result = runner.invoke(cli, ["code", "cat", "--all", "untracked.py"]) assert result.exit_code != 0 def test_untracked_on_disk_json_error_code(self, repo: pathlib.Path) -> None: (repo / "untracked.py").write_text("def ghost(): pass\n") result = runner.invoke( cli, ["code", "cat", "untracked.py::ghost", "--json"] ) assert result.exit_code != 0 data = json.loads(result.output) assert data["errors"][0]["error_code"] == "FILE_NOT_TRACKED" def test_untracked_content_not_leaked(self, repo: pathlib.Path) -> None: (repo / "untracked.py").write_text("secret = 'xyzzy'\n") result = runner.invoke( cli, ["code", "cat", "untracked.py::secret", "--json"] ) assert "xyzzy" not in result.output def test_tracked_file_still_works(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "cat", "billing.py::validate_amount"]) assert result.exit_code == 0 assert "validate_amount" in result.output class TestStagedFileReads: """muse code cat must read staged-but-uncommitted files from the object store.""" def test_staged_symbol_readable(self, repo: pathlib.Path) -> None: _stage_py(repo, "new_mod.py", _STAGED_SRC) (repo / "new_mod.py").write_bytes(_STAGED_SRC) result = runner.invoke(cli, ["code", "cat", "new_mod.py::staged_fn"]) assert result.exit_code == 0 assert "staged_fn" in result.output def test_staged_all_mode_readable(self, repo: pathlib.Path) -> None: _stage_py(repo, "new_mod.py", _STAGED_SRC) (repo / "new_mod.py").write_bytes(_STAGED_SRC) result = runner.invoke(cli, ["code", "cat", "--all", "new_mod.py"]) assert result.exit_code == 0 assert "staged_fn" in result.output assert "StagedClass" in result.output def test_staged_json_schema(self, repo: pathlib.Path) -> None: _stage_py(repo, "new_mod.py", _STAGED_SRC) (repo / "new_mod.py").write_bytes(_STAGED_SRC) result = runner.invoke( cli, ["code", "cat", "new_mod.py::staged_fn", "--json"] ) assert result.exit_code == 0 data = json.loads(result.output) assert len(data["results"]) == 1 assert data["results"][0]["symbol"] == "staged_fn" def test_staged_blob_deleted_from_disk_still_readable( self, repo: pathlib.Path ) -> None: """Staged blob readable even after the file is removed from disk.""" disk = repo / "new_mod.py" disk.write_bytes(_STAGED_SRC) _stage_py(repo, "new_mod.py", _STAGED_SRC) disk.unlink() result = runner.invoke(cli, ["code", "cat", "new_mod.py::staged_fn"]) assert result.exit_code == 0 assert "staged_fn" in result.output def test_staged_content_matches_blob(self, repo: pathlib.Path) -> None: _stage_py(repo, "new_mod.py", _STAGED_SRC) (repo / "new_mod.py").write_bytes(_STAGED_SRC) result = runner.invoke( cli, ["code", "cat", "new_mod.py::staged_fn", "--json"] ) data = json.loads(result.output) assert "x * 2" in data["results"][0]["source"]