"""Tests for ``muse cat`` — file-level, domain-agnostic content reader. ``muse cat`` is the core-VCS primitive: give me the raw bytes of a tracked file at HEAD or any ref. It does not parse symbols — that is ``muse code cat``. Mirrors the relationship between ``muse blame`` (line-level) and ``muse code blame`` (symbol-level). 7-tier coverage --------------- Unit argument parsing, address validation (:: rejected) Integration single file, multi-file, --at ref, --json schema E2E historical ref shows old content; working-tree shows new Security symlink rejected, path traversal rejected, ANSI in path Stress large file (1 MiB) completes fast Data integrity JSON content == disk bytes; --at content != HEAD content Performance single file < 0.3s; multi-file 10 files < 1s """ from __future__ import annotations from collections.abc import Mapping import json import pathlib import textwrap import time import hashlib import datetime import pytest from tests.cli_test_helper import CliRunner from muse.core.object_store import write_object from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import long_id, blob_id from muse.core.paths import muse_dir, ref_path from muse.plugins.code.stage import make_entry, write_stage cli = None runner = CliRunner() _REPO_ID = "core-cat-test" _counter = 0 # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _init_repo(path: pathlib.Path, repo_id: str = _REPO_ID) -> pathlib.Path: dot_muse = muse_dir(path) for d in ("commits", "snapshots", "objects", "refs/heads"): (dot_muse / d).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": repo_id, "domain": "code"}), 
encoding="utf-8" ) return path def _env(repo: pathlib.Path) -> Mapping[str, str]: return {"MUSE_REPO_ROOT": str(repo)} def _add_file(repo: pathlib.Path, rel_path: str, content: bytes) -> str: obj_id = long_id(blob_id(content)) write_object(repo, obj_id, content) full = repo / rel_path full.parent.mkdir(parents=True, exist_ok=True) full.write_bytes(content) return obj_id def _commit( repo: pathlib.Path, files: Mapping[str, bytes], message: str = "c", parent_id: str | None = None, branch: str = "main", ) -> str: global _counter _counter += 1 manifest = {p: _add_file(repo, p, c) for p, c in files.items()} snap_id = hash_snapshot(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) now = datetime.datetime.now(datetime.timezone.utc) cid = hash_commit( parent_ids=[parent_id] if parent_id else [], snapshot_id=snap_id, message=message, committed_at_iso=now.isoformat(), ) write_commit(repo, CommitRecord( commit_id=cid, branch=branch, snapshot_id=snap_id, message=message, committed_at=now, parent_commit_id=parent_id, )) (ref_path(repo, branch)).write_text(cid, encoding="utf-8") return cid # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- _CONTENT_V1 = b"# version 1\nHELLO = 'world'\n" _CONTENT_V2 = b"# version 2\nHELLO = 'updated'\n" @pytest.fixture def repo(tmp_path: pathlib.Path) -> pathlib.Path: _init_repo(tmp_path) _commit(tmp_path, {"readme.md": b"# readme\n", "src/main.py": _CONTENT_V1}) return tmp_path @pytest.fixture def two_commit_repo(tmp_path: pathlib.Path) -> pathlib.Path: _init_repo(tmp_path) c1 = _commit(tmp_path, {"src/main.py": _CONTENT_V1}, message="v1") _commit(tmp_path, {"src/main.py": _CONTENT_V2}, message="v2", parent_id=c1) return tmp_path # --------------------------------------------------------------------------- # Unit: argument validation # 
# ---------------------------------------------------------------------------


class TestArgumentValidation:
    """Invalid invocations must fail, with a JSON error body under ``--json``."""

    def test_no_args_exits_nonzero(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["cat"], env=_env(repo))
        assert result.exit_code != 0

    def test_no_args_json_is_valid_json_with_error(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["cat", "--json"], env=_env(repo))
        assert result.exit_code != 0
        data = json.loads(result.output)
        assert "error" in data

    def test_symbol_address_rejected(self, repo: pathlib.Path) -> None:
        """muse cat does not accept file.py::Symbol — that is muse code cat."""
        result = runner.invoke(cli, ["cat", "src/main.py::HELLO"], env=_env(repo))
        assert result.exit_code != 0

    def test_symbol_address_json_has_error_code(self, repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["cat", "src/main.py::HELLO", "--json"], env=_env(repo)
        )
        assert result.exit_code != 0
        data = json.loads(result.output)
        assert "error" in data

    def test_untracked_file_exits_nonzero(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["cat", "nothere.txt"], env=_env(repo))
        assert result.exit_code != 0

    def test_untracked_file_json_has_error_code(self, repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["cat", "nothere.txt", "--json"], env=_env(repo)
        )
        assert result.exit_code != 0
        data = json.loads(result.output)
        # Single file error: multi-file schema with errors list
        assert "errors" in data
        assert len(data["errors"]) == 1
        assert "error_code" in data["errors"][0]


# ---------------------------------------------------------------------------
# Integration: single file
# ---------------------------------------------------------------------------


class TestSingleFile:
    """Reading one tracked file, in text mode and with ``--json``."""

    def test_prints_file_content(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["cat", "readme.md"], env=_env(repo))
        assert result.exit_code == 0
        assert "# readme" in result.output

    def test_json_schema_single_file(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["cat", "readme.md", "--json"], env=_env(repo))
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert "path" in data
        assert "content" in data
        assert "size_bytes" in data
        assert "source_ref" in data
        assert "duration_ms" in data

    def test_json_file_path_correct(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["cat", "readme.md", "--json"], env=_env(repo))
        data = json.loads(result.output)
        assert data["path"] == "readme.md"

    def test_json_source_ref_working_tree(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["cat", "readme.md", "--json"], env=_env(repo))
        data = json.loads(result.output)
        assert data["source_ref"] == "working tree"

    def test_json_size_bytes_accurate(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["cat", "readme.md", "--json"], env=_env(repo))
        data = json.loads(result.output)
        assert data["size_bytes"] == len(b"# readme\n")

    def test_json_shorthand_flag(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["cat", "readme.md", "-j"], env=_env(repo))
        assert result.exit_code == 0
        json.loads(result.output)  # valid JSON

    def test_subdirectory_file(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["cat", "src/main.py"], env=_env(repo))
        assert result.exit_code == 0
        assert "HELLO" in result.output


# ---------------------------------------------------------------------------
# Integration: multi-file
# ---------------------------------------------------------------------------


class TestMultiFile:
    """Several paths in one invocation, including a partially failing one."""

    def test_multi_file_json_has_files_key(self, repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["cat", "readme.md", "src/main.py", "--json"], env=_env(repo)
        )
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert "files" in data
        assert len(data["files"]) == 2

    def test_multi_file_json_each_has_schema(self, repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["cat", "readme.md", "src/main.py", "--json"], env=_env(repo)
        )
        data = json.loads(result.output)
        for entry in data["files"]:
            assert "path" in entry
            assert "content" in entry
            assert "size_bytes" in entry

    def test_multi_file_text_prints_all(self, repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["cat", "readme.md", "src/main.py"], env=_env(repo)
        )
        assert result.exit_code == 0
        assert "# readme" in result.output
        assert "HELLO" in result.output

    def test_multi_file_one_missing_exits_nonzero(self, repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["cat", "readme.md", "missing.txt", "--json"], env=_env(repo)
        )
        assert result.exit_code != 0
        data = json.loads(result.output)
        assert len(data["errors"]) == 1
        assert len(data["files"]) == 1  # the valid file still returned


# ---------------------------------------------------------------------------
# Integration: --at ref
# ---------------------------------------------------------------------------


def _oldest_commit_id(repo: pathlib.Path) -> str:
    """Return the ``commit_id`` of the last entry printed by ``muse log --json``.

    The tests below use that entry as "the old commit".
    NOTE(review): this relies on the log listing newest commits first —
    confirm against the ``log`` command.
    """
    log = runner.invoke(cli, ["log", "--json"], env=_env(repo))
    # Fail with the command output instead of an opaque JSON decode error.
    assert log.exit_code == 0, log.output
    return json.loads(log.output)["commits"][-1]["commit_id"]


class TestAtRef:
    """``--at`` selects the commit or branch the content is read from."""

    def test_at_old_commit_shows_old_content(
        self, two_commit_repo: pathlib.Path
    ) -> None:
        old_cid = _oldest_commit_id(two_commit_repo)
        result = runner.invoke(
            cli, ["cat", "src/main.py", "--at", old_cid], env=_env(two_commit_repo)
        )
        assert result.exit_code == 0
        assert "version 1" in result.output
        assert "updated" not in result.output

    def test_at_json_source_ref_contains_commit(
        self, two_commit_repo: pathlib.Path
    ) -> None:
        old_cid = _oldest_commit_id(two_commit_repo)
        result = runner.invoke(
            cli,
            ["cat", "src/main.py", "--at", old_cid, "--json"],
            env=_env(two_commit_repo),
        )
        data = json.loads(result.output)
        assert "commit" in data["source_ref"]

    def test_at_branch_name(self, two_commit_repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["cat", "src/main.py", "--at", "main"], env=_env(two_commit_repo)
        )
        assert result.exit_code == 0
        assert "version 2" in result.output

    def test_at_bad_ref_exits_nonzero(self, repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["cat", "readme.md", "--at", "deadbeef00"], env=_env(repo)
        )
        assert result.exit_code != 0

    def test_at_bad_ref_json_has_error(self, repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["cat", "readme.md", "--at", "deadbeef00", "--json"], env=_env(repo)
        )
        assert result.exit_code != 0
        data = json.loads(result.output)
        assert "error" in data


# ---------------------------------------------------------------------------
# E2E: working tree vs historical
# ---------------------------------------------------------------------------


class TestE2E:
    """Working-tree reads versus historical reads, end to end."""

    def test_head_shows_latest_content(self, two_commit_repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["cat", "src/main.py"], env=_env(two_commit_repo)
        )
        assert result.exit_code == 0
        assert "version 2" in result.output

    def test_old_ref_shows_old_content(self, two_commit_repo: pathlib.Path) -> None:
        old_cid = _oldest_commit_id(two_commit_repo)
        result = runner.invoke(
            cli, ["cat", "src/main.py", "--at", old_cid], env=_env(two_commit_repo)
        )
        assert result.exit_code == 0
        assert "version 1" in result.output

    def test_working_tree_edit_visible_without_at(
        self, repo: pathlib.Path
    ) -> None:
        """Uncommitted edit on disk should appear when no --at is given."""
        (repo / "readme.md").write_text("# modified\n", encoding="utf-8")
        result = runner.invoke(cli, ["cat", "readme.md"], env=_env(repo))
        assert result.exit_code == 0
        assert "modified" in result.output

    def test_requires_repo(self, tmp_path: pathlib.Path) -> None:
        no_repo = tmp_path / "no_repo"
        no_repo.mkdir()
        result = runner.invoke(cli, ["cat", "file.py"], env=_env(no_repo))
        assert result.exit_code != 0


# ---------------------------------------------------------------------------
# Security
# ---------------------------------------------------------------------------


class TestSecurity:
    """Hostile paths must be refused and must not be echoed back raw."""

    def test_symlink_rejected(self, repo: pathlib.Path) -> None:
        link = repo / "link.md"
        link.symlink_to("/etc/passwd")
        result = runner.invoke(cli, ["cat", "link.md"], env=_env(repo))
        assert result.exit_code != 0

    def test_path_traversal_rejected(self, repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["cat", "../../../etc/passwd"], env=_env(repo)
        )
        assert result.exit_code != 0

    def test_ansi_in_path_not_in_output(self, repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["cat", "\x1b[31mreadme.md\x1b[0m"], env=_env(repo)
        )
        assert result.exit_code != 0
        assert "\x1b[31m" not in result.output

    def test_newline_in_path_rejected(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["cat", "read\nme.md"], env=_env(repo))
        assert result.exit_code != 0

    def test_null_byte_in_path_rejected(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["cat", "read\x00me.md"], env=_env(repo))
        assert result.exit_code != 0


# ---------------------------------------------------------------------------
# Data integrity
# ---------------------------------------------------------------------------


class TestDataIntegrity:
    """The JSON payload must agree with the bytes it describes."""

    def test_json_content_equals_disk_bytes(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["cat", "readme.md", "--json"], env=_env(repo))
        data = json.loads(result.output)
        disk = (repo / "readme.md").read_bytes().decode("utf-8", errors="replace")
        assert data["content"] == disk

    def test_json_size_bytes_equals_len_of_content_utf8(
        self, repo: pathlib.Path
    ) -> None:
        result = runner.invoke(cli, ["cat", "src/main.py", "--json"], env=_env(repo))
        data = json.loads(result.output)
        assert data["size_bytes"] == len(data["content"].encode("utf-8"))

    def test_at_content_differs_from_head(
        self, two_commit_repo: pathlib.Path
    ) -> None:
        old_cid = _oldest_commit_id(two_commit_repo)
        head = json.loads(
            runner.invoke(
                cli, ["cat", "src/main.py", "--json"], env=_env(two_commit_repo)
            ).output
        )["content"]
        old = json.loads(
            runner.invoke(
                cli,
                ["cat", "src/main.py", "--at", old_cid, "--json"],
                env=_env(two_commit_repo),
            ).output
        )["content"]
        assert head != old

    def test_multi_file_sizes_sum_correctly(self, repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["cat", "readme.md", "src/main.py", "--json"], env=_env(repo)
        )
        data = json.loads(result.output)
        for entry in data["files"]:
            assert entry["size_bytes"] == len(
                entry["content"].encode("utf-8", errors="replace")
            )

    def test_empty_file_handled(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _commit(tmp_path, {"empty.txt": b""})
        result = runner.invoke(
            cli, ["cat", "empty.txt", "--json"], env=_env(tmp_path)
        )
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert data["content"] == ""
        assert data["size_bytes"] == 0


# ---------------------------------------------------------------------------
# Stress
# ---------------------------------------------------------------------------


class TestStress:
    """Larger inputs: one 1 MiB file and ten small files."""

    def test_large_file_1mib(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        content = b"x" * (1024 * 1024)
        _commit(tmp_path, {"large.bin": content})
        result = runner.invoke(cli, ["cat", "large.bin"], env=_env(tmp_path))
        assert result.exit_code == 0
        assert len(result.output.encode()) >= 1024 * 1024

    def test_10_files_json(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        files = {f"file_{i}.txt": f"content {i}\n".encode() for i in range(10)}
        _commit(tmp_path, files)
        args = ["cat"] + list(files.keys()) + ["--json"]
        result = runner.invoke(cli, args, env=_env(tmp_path))
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert len(data["files"]) == 10


# ---------------------------------------------------------------------------
# Performance
# ---------------------------------------------------------------------------


class TestPerformance:
    """Wall-clock budgets measured around a single ``runner.invoke`` call."""

    def test_single_file_under_300ms(self, repo: pathlib.Path) -> None:
        t0 = time.monotonic()
        result = runner.invoke(cli, ["cat", "readme.md"], env=_env(repo))
        elapsed = time.monotonic() - t0
        assert result.exit_code == 0
        assert elapsed < 0.3

    def test_10_files_under_1s(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        files = {f"f{i}.txt": f"line {i}\n".encode() for i in range(10)}
        _commit(tmp_path, files)
        args = ["cat"] + list(files.keys()) + ["--json"]
        t0 = time.monotonic()
        result = runner.invoke(cli, args, env=_env(tmp_path))
        elapsed = time.monotonic() - t0
        assert result.exit_code == 0
        assert elapsed < 1.0

    def test_large_file_json_under_1s(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        content = b"a" * (512 * 1024)
        _commit(tmp_path, {"half_mib.txt": content})
        t0 = time.monotonic()
        result = runner.invoke(
            cli, ["cat", "half_mib.txt", "--json"], env=_env(tmp_path)
        )
        elapsed = time.monotonic() - t0
        assert result.exit_code == 0
        assert elapsed < 1.0


# ---------------------------------------------------------------------------
# Phase 2: untracked files error; staged files read from object store
# ---------------------------------------------------------------------------


def _stage_file(repo: pathlib.Path, rel_path: str, content: bytes) -> str:
    """Write blob to object store and add a stage entry — simulates muse code add.

    The stage is written with this single entry (mode ``"A"``); the working
    tree is not touched.  Returns the object id of the blob.
    """
    obj_id = long_id(blob_id(content))
    write_object(repo, obj_id, content)
    entries = {rel_path: make_entry(obj_id, "A")}
    write_stage(repo, entries)
    return obj_id


class TestUntrackedFileErrors:
    """muse cat must error on files that exist on disk but are not tracked."""

    def test_untracked_on_disk_exits_nonzero(self, repo: pathlib.Path) -> None:
        (repo / "ghost.py").write_text("x = 1\n", encoding="utf-8")
        result = runner.invoke(cli, ["cat", "ghost.py"], env=_env(repo))
        assert result.exit_code != 0

    def test_untracked_on_disk_json_error_code_is_file_not_tracked(
        self, repo: pathlib.Path
    ) -> None:
        (repo / "ghost.py").write_text("x = 1\n", encoding="utf-8")
        result = runner.invoke(cli, ["cat", "ghost.py", "--json"], env=_env(repo))
        assert result.exit_code != 0
        data = json.loads(result.output)
        assert data["errors"][0]["error_code"] == "FILE_NOT_TRACKED"

    def test_untracked_on_disk_does_not_leak_content(
        self, repo: pathlib.Path
    ) -> None:
        """Content of an untracked file must never appear in the output."""
        (repo / "secret.py").write_text("password = 'hunter2'\n", encoding="utf-8")
        result = runner.invoke(cli, ["cat", "secret.py"], env=_env(repo))
        assert "hunter2" not in result.output

    def test_untracked_on_disk_json_does_not_leak_content(
        self, repo: pathlib.Path
    ) -> None:
        (repo / "secret.py").write_text("password = 'hunter2'\n", encoding="utf-8")
        result = runner.invoke(cli, ["cat", "secret.py", "--json"], env=_env(repo))
        assert "hunter2" not in result.output

    def test_tracked_file_still_readable(self, repo: pathlib.Path) -> None:
        """Tracked files must not be broken by the untracked-check change."""
        result = runner.invoke(cli, ["cat", "readme.md"], env=_env(repo))
        assert result.exit_code == 0
        assert "# readme" in result.output


class TestStagedFileReads:
    """muse cat must read staged files (not yet committed) from the object store."""

    def test_staged_added_file_readable(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _commit(tmp_path, {"existing.py": b"old = 1\n"})
        content = b"new_fn = lambda: 42\n"
        (tmp_path / "new_file.py").write_bytes(content)
        _stage_file(tmp_path, "new_file.py", content)
        result = runner.invoke(cli, ["cat", "new_file.py"], env=_env(tmp_path))
        assert result.exit_code == 0
        assert "new_fn" in result.output

    def test_staged_added_file_json_schema(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _commit(tmp_path, {"base.py": b"x = 1\n"})
        content = b"staged = True\n"
        (tmp_path / "staged.py").write_bytes(content)
        _stage_file(tmp_path, "staged.py", content)
        result = runner.invoke(cli, ["cat", "staged.py", "--json"], env=_env(tmp_path))
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert "path" in data
        assert "content" in data
        assert "staged" in data["content"]

    def test_staged_but_not_on_disk_reads_from_store(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Staged blob readable even after the file is deleted from disk."""
        _init_repo(tmp_path)
        _commit(tmp_path, {"base.py": b"x = 1\n"})
        content = b"will_be_deleted = True\n"
        disk_path = tmp_path / "staged_only.py"
        disk_path.write_bytes(content)
        _stage_file(tmp_path, "staged_only.py", content)
        disk_path.unlink()  # simulate file removed from disk after staging
        result = runner.invoke(cli, ["cat", "staged_only.py"], env=_env(tmp_path))
        assert result.exit_code == 0
        assert "will_be_deleted" in result.output

    def test_staged_file_json_content_matches_blob(
        self, tmp_path: pathlib.Path
    ) -> None:
        _init_repo(tmp_path)
        _commit(tmp_path, {"base.py": b"x = 1\n"})
        content = b"blob_content = 'exact'\n"
        (tmp_path / "check.py").write_bytes(content)
        _stage_file(tmp_path, "check.py", content)
        result = runner.invoke(cli, ["cat", "check.py", "--json"], env=_env(tmp_path))
        data = json.loads(result.output)
        assert data["content"] == content.decode("utf-8")
        assert data["size_bytes"] == len(content)


class TestRegisterFlags:
    """``register`` must expose ``--json`` / ``-j`` as ``args.json_out``."""

    def test_default_json_out_is_false(self) -> None:
        import argparse
        from muse.cli.commands.core_cat import register

        p = argparse.ArgumentParser()
        subs = p.add_subparsers()
        register(subs)
        args = p.parse_args(["cat"])
        assert args.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        import argparse
        from muse.cli.commands.core_cat import register

        p = argparse.ArgumentParser()
        subs = p.add_subparsers()
        register(subs)
        args = p.parse_args(["cat", "--json"])
        assert args.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        import argparse
        from muse.cli.commands.core_cat import register

        p = argparse.ArgumentParser()
        subs = p.add_subparsers()
        register(subs)
        args = p.parse_args(["cat", "-j"])
        assert args.json_out is True


# ---------------------------------------------------------------------------
# Phase 4: --staged flag
# ---------------------------------------------------------------------------


def _stage_blob(repo: pathlib.Path, rel_path: str, content: bytes, mode: str) -> str:
    """Store *content* as a blob and write a one-entry stage for *rel_path*.

    *mode* is forwarded to ``make_entry``.  The working tree is left alone.
    Returns the object id of the stored blob.
    """
    blob = long_id(blob_id(content))
    write_object(repo, blob, content)
    write_stage(repo, {rel_path: make_entry(blob, mode)})
    return blob


def _stage_file_core(repo: pathlib.Path, rel_path: str, content: bytes) -> str:
    """Stage *content* as an added file and also write it to the working tree.

    Simulates ``muse code add``: the blob goes to the object store, the stage
    gets an ``"A"`` entry for *rel_path*, and the same bytes are written to
    ``repo / rel_path``.  Returns the object id.
    """
    blob = _stage_blob(repo, rel_path, content, "A")
    target = repo / rel_path
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(content)
    return blob


class TestCatStaged:
    """muse cat --staged reads the staged index rather than disk or a commit."""

    def test_staged_reads_staged_blob_not_disk(self, tmp_path: pathlib.Path) -> None:
        """--staged returns staged content even when disk has a different version."""
        _init_repo(tmp_path)
        _commit(tmp_path, {"f.py": b"committed = 1\n"})
        # Only the stage changes here: disk keeps the committed bytes.
        _stage_blob(tmp_path, "f.py", b"staged = 2\n", "M")

        outcome = runner.invoke(cli, ["cat", "f.py", "--staged"], env=_env(tmp_path))

        assert outcome.exit_code == 0
        assert "staged = 2" in outcome.output
        assert "committed = 1" not in outcome.output

    def test_staged_json_source_ref_is_staged(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _commit(tmp_path, {"f.py": b"x = 1\n"})
        _stage_file_core(tmp_path, "f.py", b"y = 2\n")

        argv = ["cat", "f.py", "--staged", "--json"]
        outcome = runner.invoke(cli, argv, env=_env(tmp_path))

        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert payload["source_ref"] == "staged"

    def test_staged_on_committed_file_without_restaing_shows_committed_content(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A committed file with no stage entry returns the HEAD version."""
        _init_repo(tmp_path)
        _commit(tmp_path, {"only.py": b"committed_only = True\n"})

        argv = ["cat", "only.py", "--staged"]
        outcome = runner.invoke(cli, argv, env=_env(tmp_path))

        assert outcome.exit_code == 0
        assert "committed_only" in outcome.output

    def test_staged_deletion_gives_file_not_tracked(self, tmp_path: pathlib.Path) -> None:
        """A file staged for deletion (mode=D) is not readable via --staged."""
        _init_repo(tmp_path)
        _commit(tmp_path, {"del.py": b"to_delete = 1\n"})
        deletion = make_entry("a" * 64, "D")
        write_stage(tmp_path, {"del.py": deletion})

        argv = ["cat", "del.py", "--staged"]
        outcome = runner.invoke(cli, argv, env=_env(tmp_path))

        assert outcome.exit_code != 0

    def test_staged_deletion_json_error_code(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _commit(tmp_path, {"del.py": b"to_delete = 1\n"})
        deletion = make_entry("a" * 64, "D")
        write_stage(tmp_path, {"del.py": deletion})

        argv = ["cat", "del.py", "--staged", "--json"]
        outcome = runner.invoke(cli, argv, env=_env(tmp_path))

        assert outcome.exit_code != 0
        first_error = json.loads(outcome.output)["errors"][0]
        assert first_error["error_code"] == "FILE_NOT_TRACKED"

    def test_staged_and_at_mutually_exclusive(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _commit(tmp_path, {"f.py": b"x = 1\n"})

        argv = ["cat", "f.py", "--staged", "--at", "HEAD"]
        outcome = runner.invoke(cli, argv, env=_env(tmp_path))

        assert outcome.exit_code != 0

    def test_staged_and_at_json_error_code(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _commit(tmp_path, {"f.py": b"x = 1\n"})

        argv = ["cat", "f.py", "--staged", "--at", "HEAD", "--json"]
        outcome = runner.invoke(cli, argv, env=_env(tmp_path))

        assert outcome.exit_code != 0
        payload = json.loads(outcome.output)
        assert "error_code" in payload
        assert payload["error_code"] == "MUTUALLY_EXCLUSIVE"

    def test_staged_untracked_file_not_readable(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _commit(tmp_path, {"f.py": b"x = 1\n"})

        argv = ["cat", "untracked.py", "--staged"]
        outcome = runner.invoke(cli, argv, env=_env(tmp_path))

        assert outcome.exit_code != 0

    def test_staged_ignores_working_tree_edits(self, tmp_path: pathlib.Path) -> None:
        """--staged must not be influenced by on-disk edits after staging."""
        _init_repo(tmp_path)
        _commit(tmp_path, {"f.py": b"v1 = 1\n"})
        _stage_blob(tmp_path, "f.py", b"v2 = 2\n", "M")
        # After staging, modify the file again on disk (not re-staged)
        (tmp_path / "f.py").write_text("v3 = 3\n", encoding="utf-8")

        argv = ["cat", "f.py", "--staged"]
        outcome = runner.invoke(cli, argv, env=_env(tmp_path))

        assert outcome.exit_code == 0
        assert "v2 = 2" in outcome.output
        assert "v3 = 3" not in outcome.output

    def test_staged_flag_registered(self) -> None:
        import argparse
        from muse.cli.commands.core_cat import register

        parser = argparse.ArgumentParser()
        register(parser.add_subparsers())
        parsed = parser.parse_args(["cat", "--staged", "f.py"])
        assert parsed.staged is True

    def test_staged_default_is_false(self) -> None:
        import argparse
        from muse.cli.commands.core_cat import register

        parser = argparse.ArgumentParser()
        register(parser.add_subparsers())
        parsed = parser.parse_args(["cat", "f.py"])
        assert parsed.staged is False