"""Tests for ``muse code checkout-symbol``. Coverage layers --------------- Unit _extract_lines — normal, out-of-bounds (high, low, swapped), empty source. _find_symbol_in_source — hit, miss, uses repo-relative path (regression guard for the absolute-path bug). Integration (live repo, CliRunner) Exits zero for valid restore. JSON schema: all required keys present, correct types. JSON: schema_version, branch, restored_from (8-char hex), changed, appended, verified, verified_preview. --dry-run: file not written, output contains diff markers. --dry-run --json: diff_lines + verified_preview in JSON output. No-op: symbol already matches — changed=false, file unchanged, verified=true. Empty historical lines from corrupted snapshot → exits non-zero before write. ADDRESS without '::' rejected (exit non-zero). Path-traversal ADDRESS rejected (exit non-zero). --commit invalid ref rejected (exit non-zero). File not in historical snapshot exits non-zero. Symbol not in historical snapshot exits non-zero. Missing repo exits non-zero. Text output contains expected lines. Appended path: symbol absent from working tree → appended at EOF. E2E (real symbol changes across commits) Restore replaces correct lines — surrounding code unchanged. Restore from HEAD~1 brings back previous implementation. Bug fix: absolute-path lookup — symbol IS found in current working tree (not silently appended every time). No-op detection: re-running restore is idempotent. Dry-run diff is accurate — applying it would yield the historical file. Appended symbol can be found by parse_symbols after write. File content equals expected bytes after restore. verified=True on a clean restore. verified_preview=True in dry-run for a valid restore. verified=False triggers warning when splice is unresolvable (monkeypatched). Post-write verification failure does not suppress the write. Stress Restore from commit far back in history: still correct. Large file (1 000-line source): only symbol lines change. Repeated restore is idempotent and fast. """ from __future__ import annotations import json import pathlib import textwrap import time from typing import TypedDict import pytest from muse.core.types import split_id from tests.cli_test_helper import CliRunner from muse.cli.commands.checkout_symbol import _extract_lines, _find_symbol_in_source from muse.plugins.code.ast_parser import SymbolRecord, parse_symbols cli = None runner = CliRunner() # --------------------------------------------------------------------------- # Typed JSON payload # --------------------------------------------------------------------------- class _CheckoutPayload(TypedDict, total=False): schema_version: str address: str file: str branch: str restored_from: str dry_run: bool changed: bool appended: bool current_start: int current_end: int historical_line_count: int diff_lines: list[str] verified: bool # present on write and no-op paths verified_preview: bool # present on dry-run path only # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _invoke_json(args: list[str]) -> _CheckoutPayload: result = runner.invoke(cli, ["code", "checkout-symbol"] + args + ["--json"]) assert result.exit_code == 0, result.output raw: _CheckoutPayload = json.loads(result.output) return raw # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) result = runner.invoke(cli, ["init", "--domain", "code"]) assert result.exit_code == 0, result.output return tmp_path @pytest.fixture def two_commit_repo(repo: pathlib.Path) -> tuple[pathlib.Path, str, str]: """Repo with two commits containing different implementations of compute(). commit 1 (HEAD~1): compute returns sum(items) commit 2 (HEAD): compute returns sum(items) * 2 """ (repo / "billing.py").write_text(textwrap.dedent("""\ def header(): return "billing" def compute(items): return sum(items) def footer(): return "end" """)) runner.invoke(cli, ["code", "add", "."]) r1 = runner.invoke(cli, ["commit", "-m", "v1"]) assert r1.exit_code == 0, r1.output (repo / "billing.py").write_text(textwrap.dedent("""\ def header(): return "billing" def compute(items): return sum(items) * 2 def footer(): return "end" """)) runner.invoke(cli, ["code", "add", "."]) r2 = runner.invoke(cli, ["commit", "-m", "v2"]) assert r2.exit_code == 0, r2.output return repo, "billing.py::compute", "billing.py" @pytest.fixture def single_commit_repo(repo: pathlib.Path) -> pathlib.Path: """Minimal repo: one commit, one function.""" (repo / "utils.py").write_text(textwrap.dedent("""\ def greet(name): return f"Hello, {name}" """)) r = runner.invoke(cli, ["commit", "-m", "init"]) assert r.exit_code == 0, r.output return repo # --------------------------------------------------------------------------- # Unit — _extract_lines # --------------------------------------------------------------------------- class TestExtractLines: def _src(self, n: int = 5) -> bytes: return "\n".join(f"line {i}" for i in range(1, n + 1)).encode() def test_full_range(self) -> None: src = self._src(3) assert _extract_lines(src, 1, 3) == ["line 1\n", "line 2\n", "line 3"] def test_single_line(self) -> None: src = self._src(5) result = _extract_lines(src, 3, 3) assert len(result) == 1 assert "line 3" in result[0] def test_middle_range(self) -> None: src = self._src(5) result = _extract_lines(src, 2, 4) assert len(result) == 3 def test_out_of_bounds_end_returns_empty(self) -> None: src = self._src(3) result = _extract_lines(src, 1, 10) assert result == [] def test_out_of_bounds_start_zero_returns_empty(self) -> None: src = self._src(3) result = _extract_lines(src, 0, 2) assert result == [] def test_swapped_range_returns_empty(self) -> None: src = self._src(5) result = _extract_lines(src, 4, 2) assert result == [] def test_empty_source_returns_empty(self) -> None: result = _extract_lines(b"", 1, 1) assert result == [] def test_last_line_no_trailing_newline(self) -> None: src = b"a\nb\nc" result = _extract_lines(src, 3, 3) assert result == ["c"] def test_keepends_true(self) -> None: src = b"a\nb\nc\n" result = _extract_lines(src, 1, 2) assert result == ["a\n", "b\n"] # --------------------------------------------------------------------------- # Unit — _find_symbol_in_source # --------------------------------------------------------------------------- class TestFindSymbolInSource: def _src(self) -> bytes: return textwrap.dedent("""\ def alpha(): return 1 def beta(): return 2 """).encode() def test_found_returns_record(self) -> None: rec = _find_symbol_in_source(self._src(), "a.py", "a.py::alpha") assert rec is not None assert rec["name"] == "alpha" def test_not_found_returns_none(self) -> None: rec = _find_symbol_in_source(self._src(), "a.py", "a.py::missing") assert rec is None def test_uses_repo_relative_path_not_absolute(self) -> None: """Regression guard: address must use repo-relative file_rel, not /abs/path.""" src = b"def fn():\n pass\n" # Correct: repo-relative rec_rel = _find_symbol_in_source(src, "src/mod.py", "src/mod.py::fn") assert rec_rel is not None, "Should find symbol with repo-relative path" # Wrong: absolute path — should NOT find it under the relative address rec_abs = _find_symbol_in_source(src, "/abs/src/mod.py", "src/mod.py::fn") assert rec_abs is None, "Absolute path prefix must not match relative address" def test_second_symbol_found(self) -> None: rec = _find_symbol_in_source(self._src(), "m.py", "m.py::beta") assert rec is not None assert rec["name"] == "beta" def test_line_numbers_are_1_indexed(self) -> None: src = b"def fn():\n return 1\n" rec = _find_symbol_in_source(src, "f.py", "f.py::fn") assert rec is not None assert rec["lineno"] >= 1 assert rec["end_lineno"] >= rec["lineno"] # --------------------------------------------------------------------------- # Integration — basic CLI # --------------------------------------------------------------------------- class TestCheckoutSymbolBasic: def test_restore_exits_zero(self, two_commit_repo: tuple[pathlib.Path, str, str]) -> None: _, address, _ = two_commit_repo result = runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", ]) assert result.exit_code == 0, result.output def test_no_address_separator_exits_nonzero( self, single_commit_repo: pathlib.Path ) -> None: result = runner.invoke(cli, [ "code", "checkout-symbol", "billing_no_sep", "--commit", "HEAD", ]) assert result.exit_code != 0 def test_path_traversal_exits_nonzero( self, single_commit_repo: pathlib.Path ) -> None: result = runner.invoke(cli, [ "code", "checkout-symbol", "../../etc/passwd::fn", "--commit", "HEAD", ]) assert result.exit_code != 0 def test_invalid_commit_ref_exits_nonzero( self, single_commit_repo: pathlib.Path ) -> None: result = runner.invoke(cli, [ "code", "checkout-symbol", "utils.py::greet", "--commit", "no_such_ref", ]) assert result.exit_code != 0 def test_file_not_in_snapshot_exits_nonzero( self, single_commit_repo: pathlib.Path ) -> None: result = runner.invoke(cli, [ "code", "checkout-symbol", "nonexistent.py::fn", "--commit", "HEAD", ]) assert result.exit_code != 0 def test_symbol_not_in_snapshot_exits_nonzero( self, single_commit_repo: pathlib.Path ) -> None: result = runner.invoke(cli, [ "code", "checkout-symbol", "utils.py::no_such_fn", "--commit", "HEAD", ]) assert result.exit_code != 0 def test_missing_repo_exits_nonzero( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.chdir(tmp_path) monkeypatch.delenv("MUSE_REPO_ROOT", raising=False) result = runner.invoke(cli, [ "code", "checkout-symbol", "utils.py::fn", "--commit", "HEAD", ]) assert result.exit_code != 0 def test_text_output_contains_restoring( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: _, address, _ = two_commit_repo result = runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", ]) assert result.exit_code == 0 assert "Restoring" in result.output or "already matches" in result.output def test_dry_run_does_not_write_file( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: repo, address, file_path = two_commit_repo before = (repo / file_path).read_text() result = runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", "--dry-run", ]) assert result.exit_code == 0 after = (repo / file_path).read_text() assert before == after, "dry-run must not modify the file" def test_dry_run_output_contains_diff_markers( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: _, address, _ = two_commit_repo result = runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", "--dry-run", ]) assert result.exit_code == 0 assert "---" in result.output or "+++" in result.output or "already matches" in result.output # --------------------------------------------------------------------------- # Integration — JSON schema # --------------------------------------------------------------------------- class TestCheckoutSymbolJSONSchema: def test_json_has_all_required_keys( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: _, address, _ = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1"]) required = { "address", "file", "branch", "restored_from", "dry_run", "changed", "appended", "current_start", "current_end", "historical_line_count", "diff_lines", } assert required <= data.keys() def test_json_schema_version_nonempty( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: _, address, _ = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1"]) assert data["schema"] def test_json_branch_nonempty( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: _, address, _ = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1"]) assert isinstance(data["branch"], str) and data["branch"] def test_json_restored_from_is_short_id( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: _, address, _ = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1"]) # short_id() returns "sha256:<12 hex chars>" — 19 chars total assert isinstance(data["restored_from"], str) assert data["restored_from"].startswith("sha256:") hex_part = data["restored_from"][len("sha256:"):] assert all(c in "0123456789abcdef" for c in hex_part) def test_json_changed_is_bool( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: _, address, _ = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1"]) assert isinstance(data["changed"], bool) def test_json_appended_is_bool( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: _, address, _ = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1"]) assert isinstance(data["appended"], bool) def test_json_historical_line_count_is_int( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: _, address, _ = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1"]) assert isinstance(data["historical_line_count"], int) assert data["historical_line_count"] > 0 def test_json_dry_run_false_when_not_dry_run( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: _, address, _ = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1"]) assert data["dry_run"] is False def test_json_dry_run_true_and_file_unchanged( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: repo, address, file_path = two_commit_repo before = (repo / file_path).read_text() data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"]) assert data["dry_run"] is True assert (repo / file_path).read_text() == before def test_json_dry_run_includes_diff_lines( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: _, address, _ = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"]) # If there is a real change, diff_lines must be non-empty. if data["changed"]: assert isinstance(data["diff_lines"], list) assert len(data["diff_lines"]) > 0 def test_json_diff_lines_empty_when_not_dry_run( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: _, address, _ = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1"]) assert data["diff_lines"] == [] # --------------------------------------------------------------------------- # E2E — real symbol restoration # --------------------------------------------------------------------------- class TestCheckoutSymbolE2E: def test_restore_brings_back_old_implementation( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: repo, address, file_path = two_commit_repo runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", ]) content = (repo / file_path).read_text() # v1 returned sum(items), v2 returned sum(items) * 2 assert "sum(items)" in content assert "sum(items) * 2" not in content def test_surrounding_functions_unchanged( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: """Critical: surgical restore must NOT touch header() or footer().""" repo, address, file_path = two_commit_repo runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", ]) content = (repo / file_path).read_text() assert 'return "billing"' in content assert 'return "end"' in content def test_restore_is_surgical_correct_line_count( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: repo, address, file_path = two_commit_repo original_lines = (repo / file_path).read_text().splitlines() runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", ]) restored_lines = (repo / file_path).read_text().splitlines() # Both versions have the same number of body lines. assert len(restored_lines) == len(original_lines) def test_regression_symbol_found_in_place_not_appended( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: """Bug regression: absolute-path lookup caused symbol to never be found, so every restore appended instead of replacing in-place.""" repo, address, file_path = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1"]) assert data["appended"] is False, ( "Symbol exists in working tree — must replace in-place, not append" ) # File must not grow: appending adds lines, replacing keeps the count. content = (repo / file_path).read_text() # footer() must appear exactly once (not duplicated by append). assert content.count('def footer') == 1 def test_no_op_detection_changed_false( self, single_commit_repo: pathlib.Path ) -> None: """Restoring HEAD to HEAD is a no-op.""" data = _invoke_json(["utils.py::greet", "--commit", "HEAD"]) assert data["changed"] is False def test_no_op_file_not_written( self, single_commit_repo: pathlib.Path ) -> None: before = (single_commit_repo / "utils.py").read_text() _invoke_json(["utils.py::greet", "--commit", "HEAD"]) after = (single_commit_repo / "utils.py").read_text() assert before == after def test_no_op_idempotent( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: repo, address, file_path = two_commit_repo runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", ]) content_after_first = (repo / file_path).read_text() runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", ]) content_after_second = (repo / file_path).read_text() assert content_after_first == content_after_second def test_dry_run_diff_matches_actual_change( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: """The dry-run diff, when applied to the current file, yields the result that the real restore would produce.""" repo, address, file_path = two_commit_repo dry_data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"]) # Now actually restore. runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", ]) restored = (repo / file_path).read_text() # The dry-run diff_lines are unified diff lines — their `+++` side # represents the post-restore content. We verify the symbol now matches. assert "sum(items)" in restored assert dry_data["changed"] is True def test_appended_when_symbol_absent_from_working_tree( self, repo: pathlib.Path ) -> None: """Symbol exists in history but not in the current working tree → appended.""" (repo / "mod.py").write_text(textwrap.dedent("""\ def alpha(): return 1 def beta(): return 2 """)) runner.invoke(cli, ["code", "add", "."]) r = runner.invoke(cli, ["commit", "-m", "add both"]) assert r.exit_code == 0, r.output # Remove beta from the working tree and commit. (repo / "mod.py").write_text(textwrap.dedent("""\ def alpha(): return 1 """)) runner.invoke(cli, ["code", "add", "."]) r2 = runner.invoke(cli, ["commit", "-m", "remove beta"]) assert r2.exit_code == 0, r2.output data = _invoke_json(["mod.py::beta", "--commit", "HEAD~1"]) assert data["appended"] is True content = (repo / "mod.py").read_text() assert "def beta" in content def test_restored_symbol_parseable_after_write( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: """After restore, parse_symbols must still find the symbol.""" repo, address, file_path = two_commit_repo runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", ]) raw = (repo / file_path).read_bytes() tree = parse_symbols(raw, file_path) assert address in tree, f"Symbol {address} not parseable after restore" def test_json_current_start_matches_actual_line( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: repo, address, file_path = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1"]) if data["changed"] and not data["appended"]: content = (repo / file_path).read_text().splitlines() start = data["current_start"] assert 1 <= start <= len(content), f"current_start {start} out of range" # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestCheckoutSymbolStress: def test_restore_in_large_file(self, repo: pathlib.Path) -> None: """1 000-line file: only the target symbol lines change.""" # Build a file with 200 dummy functions + our target. lines = ["def fn_{}():\n return {}\n\n".format(i, i) for i in range(200)] lines.insert(100, "def target():\n return 'v1'\n\n") (repo / "big.py").write_text("".join(lines)) runner.invoke(cli, ["code", "add", "."]) r1 = runner.invoke(cli, ["commit", "-m", "v1"]) assert r1.exit_code == 0, r1.output # Modify just target in v2. lines2 = list(lines) lines2[100] = "def target():\n return 'v2'\n\n" (repo / "big.py").write_text("".join(lines2)) runner.invoke(cli, ["code", "add", "."]) r2 = runner.invoke(cli, ["commit", "-m", "v2"]) assert r2.exit_code == 0, r2.output before_lines = (repo / "big.py").read_text().splitlines() runner.invoke(cli, [ "code", "checkout-symbol", "big.py::target", "--commit", "HEAD~1", ]) after_lines = (repo / "big.py").read_text().splitlines() assert len(before_lines) == len(after_lines), "No lines should be added/removed" # Only target changed. assert "'v1'" in "\n".join(after_lines) assert "'v2'" not in "\n".join(after_lines) # All other functions intact. assert sum(1 for l in after_lines if l.startswith("def fn_")) == 200 def test_repeated_restore_is_idempotent_and_fast( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: repo, address, file_path = two_commit_repo # First restore. runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", ]) content_after_first = (repo / file_path).read_text() start = time.monotonic() for _ in range(10): runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", ]) elapsed = time.monotonic() - start assert (repo / file_path).read_text() == content_after_first assert elapsed < 15.0, f"10 repeated restores took {elapsed:.1f}s — too slow" def test_restore_from_far_back_in_history(self, repo: pathlib.Path) -> None: """Symbol restored from a commit 10 steps back must match that version.""" body = (repo / "hist.py") for i in range(12): body.write_text(f"def fn():\n return {i}\n") runner.invoke(cli, ["code", "add", "."]) r = runner.invoke(cli, ["commit", "-m", f"v{i}"]) assert r.exit_code == 0, r.output runner.invoke(cli, [ "code", "checkout-symbol", "hist.py::fn", "--commit", "HEAD~10", ]) content = body.read_text() assert "return 1" in content # HEAD is v11 (i=11), HEAD~10 is v1 (i=1) # --------------------------------------------------------------------------- # Verification — post-write and dry-run preview # --------------------------------------------------------------------------- class TestCheckoutSymbolVerification: """Tests for the post-write verification and dry-run verified_preview.""" def test_json_verified_true_on_clean_restore( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: _, address, _ = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1"]) assert data["changed"] is True assert data.get("verified") is True def test_json_verified_true_on_no_op( self, single_commit_repo: pathlib.Path ) -> None: # Non-dry-run no-op: verified field is present, no write. data = _invoke_json(["utils.py::greet", "--commit", "HEAD"]) assert data["changed"] is False assert data.get("verified") is True def test_json_dry_run_no_op_has_verified_preview( self, single_commit_repo: pathlib.Path ) -> None: """dry-run on a no-op must still return verified_preview, not short-circuit. Agents routinely run --dry-run before every write. If the no-op path returns early without verified_preview, those pipelines break on a KeyError even though the command is logically correct. """ data = _invoke_json(["utils.py::greet", "--commit", "HEAD", "--dry-run"]) # changed is False (no-op), but dry-run must still emit verified_preview. assert data.get("changed") is False assert "verified_preview" in data, ( "dry-run no-op must include verified_preview — " "omitting it breaks agent pipelines that always inspect this field" ) assert data["verified_preview"] is True assert data.get("diff_lines") == [] def test_json_verified_preview_true_in_dry_run( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: _, address, _ = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"]) assert data["dry_run"] is True assert "verified_preview" in data assert data.get("verified_preview") is True def test_json_verified_present_after_append( self, repo: pathlib.Path ) -> None: """verified must be True even when the symbol is appended to EOF.""" (repo / "mod.py").write_text(textwrap.dedent("""\ def alpha(): return 1 def beta(): return 2 """)) runner.invoke(cli, ["code", "add", "."]) runner.invoke(cli, ["commit", "-m", "v1"]) (repo / "mod.py").write_text("def alpha():\n return 1\n") runner.invoke(cli, ["code", "add", "."]) runner.invoke(cli, ["commit", "-m", "drop beta"]) data = _invoke_json(["mod.py::beta", "--commit", "HEAD~1"]) assert data.get("appended") is True assert data.get("verified") is True def test_json_no_verified_preview_on_non_dry_run( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: """verified_preview must not appear on a live write — only dry-run has it.""" _, address, _ = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1"]) assert "verified_preview" not in data def test_json_no_verified_on_dry_run( self, two_commit_repo: tuple[pathlib.Path, str, str] ) -> None: """verified (write-path field) must not appear on dry-run output.""" _, address, _ = two_commit_repo data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"]) assert "verified" not in data def test_verified_false_triggers_warning( self, two_commit_repo: tuple[pathlib.Path, str, str], monkeypatch: pytest.MonkeyPatch, capfd: pytest.CaptureFixture[str], ) -> None: """When the post-write parse fails, verified=false and a warning is emitted.""" import muse.cli.commands.checkout_symbol as cs_mod call_count = 0 original = cs_mod._find_symbol_in_source def patched( source: bytes, file_rel: str, address: str ) -> SymbolRecord | None: nonlocal call_count call_count += 1 # First two calls: historical lookup + current lookup — behave normally. # Third call (post-write verification) — simulate parse failure. if call_count >= 3: return None return original(source, file_rel, address) monkeypatch.setattr(cs_mod, "_find_symbol_in_source", patched) _, address, _ = two_commit_repo result = runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", "--json", ]) assert result.exit_code == 0, result.output data: _CheckoutPayload = json.loads(result.output) assert data.get("verified") is False def test_verified_false_file_still_written( self, two_commit_repo: tuple[pathlib.Path, str, str], monkeypatch: pytest.MonkeyPatch, ) -> None: """Verification failure must not prevent the file from being written.""" import muse.cli.commands.checkout_symbol as cs_mod call_count = 0 original = cs_mod._find_symbol_in_source def patched( source: bytes, file_rel: str, address: str ) -> SymbolRecord | None: nonlocal call_count call_count += 1 if call_count >= 3: return None return original(source, file_rel, address) monkeypatch.setattr(cs_mod, "_find_symbol_in_source", patched) repo, address, file_path = two_commit_repo before = (repo / file_path).read_text() runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", ]) after = (repo / file_path).read_text() # File must differ — verification failure must not roll back the write. assert before != after def test_empty_historical_lines_exits_before_write( self, two_commit_repo: tuple[pathlib.Path, str, str], monkeypatch: pytest.MonkeyPatch, ) -> None: """A corrupted snapshot producing zero lines must abort before writing.""" import muse.cli.commands.checkout_symbol as cs_mod monkeypatch.setattr(cs_mod, "_extract_lines", lambda *a, **kw: []) repo, address, file_path = two_commit_repo original_content = (repo / file_path).read_text() result = runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", ]) assert result.exit_code != 0 # File must be completely untouched. assert (repo / file_path).read_text() == original_content def test_text_output_warns_when_not_verified( self, two_commit_repo: tuple[pathlib.Path, str, str], monkeypatch: pytest.MonkeyPatch, ) -> None: """Text mode must show a warning instead of ✅ when verification fails.""" import muse.cli.commands.checkout_symbol as cs_mod call_count = 0 original = cs_mod._find_symbol_in_source def patched( source: bytes, file_rel: str, address: str ) -> SymbolRecord | None: nonlocal call_count call_count += 1 if call_count >= 3: return None return original(source, file_rel, address) monkeypatch.setattr(cs_mod, "_find_symbol_in_source", patched) _, address, _ = two_commit_repo result = runner.invoke(cli, [ "code", "checkout-symbol", address, "--commit", "HEAD~1", ]) assert result.exit_code == 0 assert "verification failed" in result.output.lower() or "⚠️" in result.output # --------------------------------------------------------------------------- # Flag tests # --------------------------------------------------------------------------- import argparse as _argparse class TestRegisterFlags: def _parse(self, *args: str) -> _argparse.Namespace: from muse.cli.commands.checkout_symbol import register p = _argparse.ArgumentParser() sub = p.add_subparsers() register(sub) return p.parse_args(["checkout-symbol", *args]) def test_default_json_out_is_false(self) -> None: ns = self._parse("foo.py::bar", "--commit", "HEAD") assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = self._parse("--json", "foo.py::bar", "--commit", "HEAD") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = self._parse("-j", "foo.py::bar", "--commit", "HEAD") assert ns.json_out is True