gabriel / muse public
test_cmd_checkout_symbol.py python
876 lines 33.0 KB
Raw
1 """Tests for ``muse code checkout-symbol``.
2
3 Coverage layers
4 ---------------
5 Unit
6 _extract_lines — normal, out-of-bounds (high, low, swapped), empty source.
7 _find_symbol_in_source — hit, miss, uses repo-relative path (regression
8 guard for the absolute-path bug).
9
10 Integration (live repo, CliRunner)
11 Exits zero for valid restore.
12 JSON schema: all required keys present, correct types.
13 JSON: schema_version, branch, restored_from (8-char hex), changed, appended,
14 verified, verified_preview.
15 --dry-run: file not written, output contains diff markers.
16 --dry-run --json: diff_lines + verified_preview in JSON output.
17 No-op: symbol already matches — changed=false, file unchanged, verified=true.
18 Empty historical lines from corrupted snapshot → exits non-zero before write.
19 ADDRESS without '::' rejected (exit non-zero).
20 Path-traversal ADDRESS rejected (exit non-zero).
21 --commit invalid ref rejected (exit non-zero).
22 File not in historical snapshot exits non-zero.
23 Symbol not in historical snapshot exits non-zero.
24 Missing repo exits non-zero.
25 Text output contains expected lines.
26 Appended path: symbol absent from working tree → appended at EOF.
27
28 E2E (real symbol changes across commits)
29 Restore replaces correct lines — surrounding code unchanged.
30 Restore from HEAD~1 brings back previous implementation.
31 Bug fix: absolute-path lookup — symbol IS found in current working tree
32 (not silently appended every time).
33 No-op detection: re-running restore is idempotent.
34 Dry-run diff is accurate — applying it would yield the historical file.
35 Appended symbol can be found by parse_symbols after write.
36 File content equals expected bytes after restore.
37 verified=True on a clean restore.
38 verified_preview=True in dry-run for a valid restore.
39 verified=False triggers warning when splice is unresolvable (monkeypatched).
40 Post-write verification failure does not suppress the write.
41
42 Stress
43 Restore from commit far back in history: still correct.
44 Large file (1 000-line source): only symbol lines change.
45 Repeated restore is idempotent and fast.
46 """
47
48 from __future__ import annotations
49
50 import json
51 import pathlib
52 import textwrap
53 import time
54 from typing import TypedDict
55
56 import pytest
57 from tests.cli_test_helper import CliRunner
58
59 from muse.cli.commands.checkout_symbol import _extract_lines, _find_symbol_in_source
60 from muse.plugins.code.ast_parser import SymbolRecord, parse_symbols
61
# Application object passed as the first argument to every ``runner.invoke``
# call in this module. It is deliberately ``None`` here.
# NOTE(review): presumably tests.cli_test_helper.CliRunner resolves the real
# entry point itself and ignores this argument — confirm against the helper.
cli = None
# One shared runner instance, reused by all fixtures and tests below.
runner = CliRunner()
64
65
66 # ---------------------------------------------------------------------------
67 # Typed JSON payload
68 # ---------------------------------------------------------------------------
69
70
class _CheckoutPayload(TypedDict, total=False):
    """Typed view of the JSON document decoded from ``checkout-symbol --json``.

    ``total=False`` makes every key optional for the type checker, which is
    needed because ``verified`` and ``verified_preview`` are each present on
    only some code paths (see the per-field notes below).
    """

    # --- identity of the restore -------------------------------------------
    schema_version: str
    address: str
    file: str
    branch: str
    restored_from: str

    # --- outcome flags ------------------------------------------------------
    dry_run: bool
    changed: bool
    appended: bool

    # --- location / size information ---------------------------------------
    current_start: int
    current_end: int
    historical_line_count: int
    diff_lines: list[str]

    # --- verification results ----------------------------------------------
    # Present on the write and no-op paths.
    verified: bool
    # Present on the dry-run path only.
    verified_preview: bool
86
87
88 # ---------------------------------------------------------------------------
89 # Helpers
90 # ---------------------------------------------------------------------------
91
92
def _invoke_json(args: list[str]) -> _CheckoutPayload:
    """Run ``code checkout-symbol <args> --json`` and return the decoded payload.

    Args:
        args: Arguments placed between the sub-command name and the trailing
            ``--json`` flag (address, ``--commit`` ref, optional flags).

    Returns:
        The command's output parsed as JSON.

    Raises:
        AssertionError: If the command exits non-zero; the captured output is
            used as the assertion message.
    """
    argv = ["code", "checkout-symbol", *args, "--json"]
    outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 0, outcome.output
    payload: _CheckoutPayload = json.loads(outcome.output)
    return payload
98
99
100 # ---------------------------------------------------------------------------
101 # Fixtures
102 # ---------------------------------------------------------------------------
103
104
@pytest.fixture
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Initialise an empty ``code``-domain repository in a temp directory.

    The working directory is switched to ``tmp_path`` and ``MUSE_REPO_ROOT``
    is pointed at it before ``init --domain code`` is run. Both changes go
    through ``monkeypatch`` and are therefore undone after the test.

    Returns:
        The repository root (the same object as ``tmp_path``).
    """
    root = tmp_path
    monkeypatch.chdir(root)
    monkeypatch.setenv("MUSE_REPO_ROOT", str(root))

    init_outcome = runner.invoke(cli, ["init", "--domain", "code"])
    assert init_outcome.exit_code == 0, init_outcome.output
    return root
112
113
@pytest.fixture
def two_commit_repo(repo: pathlib.Path) -> tuple[pathlib.Path, str, str]:
    """Build a repo whose ``billing.py`` has two committed versions of compute().

    History produced, oldest first:

    * ``v1`` (HEAD~1): ``compute`` returns ``sum(items)``
    * ``v2`` (HEAD):   ``compute`` returns ``sum(items) * 2``

    ``header()`` and ``footer()`` are identical in both versions.

    Returns:
        ``(repo_root, symbol_address, repo_relative_file_path)``.
    """
    target = repo / "billing.py"
    versions = (
        ("v1", textwrap.dedent("""\
            def header():
                return "billing"

            def compute(items):
                return sum(items)

            def footer():
                return "end"
        """)),
        ("v2", textwrap.dedent("""\
            def header():
                return "billing"

            def compute(items):
                return sum(items) * 2

            def footer():
                return "end"
        """)),
    )

    # Write and commit each version in order so v2 ends up at HEAD.
    for message, source in versions:
        target.write_text(source)
        outcome = runner.invoke(cli, ["commit", "-m", message])
        assert outcome.exit_code == 0, outcome.output

    return repo, "billing.py::compute", "billing.py"
148
149
@pytest.fixture
def single_commit_repo(repo: pathlib.Path) -> pathlib.Path:
    """Return a repo with exactly one commit containing ``utils.py::greet``."""
    source = textwrap.dedent("""\
        def greet(name):
            return f"Hello, {name}"
    """)
    (repo / "utils.py").write_text(source)

    commit_outcome = runner.invoke(cli, ["commit", "-m", "init"])
    assert commit_outcome.exit_code == 0, commit_outcome.output
    return repo
160
161
162 # ---------------------------------------------------------------------------
163 # Unit — _extract_lines
164 # ---------------------------------------------------------------------------
165
166
class TestExtractLines:
    """Unit tests for ``_extract_lines(source, start, end)``.

    As exercised here, line numbers are 1-based and inclusive, returned lines
    keep their line endings, and any invalid range yields an empty list.
    """

    def _src(self, n: int = 5) -> bytes:
        """Return ``n`` lines ``line 1`` .. ``line n`` with no trailing newline."""
        numbered = [f"line {index}" for index in range(1, n + 1)]
        return "\n".join(numbered).encode()

    def test_full_range(self) -> None:
        expected = ["line 1\n", "line 2\n", "line 3"]
        assert _extract_lines(self._src(3), 1, 3) == expected

    def test_single_line(self) -> None:
        picked = _extract_lines(self._src(5), 3, 3)
        assert len(picked) == 1
        assert "line 3" in picked[0]

    def test_middle_range(self) -> None:
        picked = _extract_lines(self._src(5), 2, 4)
        assert len(picked) == 3

    def test_out_of_bounds_end_returns_empty(self) -> None:
        # End line lies past the last line of a 3-line source.
        assert _extract_lines(self._src(3), 1, 10) == []

    def test_out_of_bounds_start_zero_returns_empty(self) -> None:
        # Line numbers start at 1, so 0 is not a valid start.
        assert _extract_lines(self._src(3), 0, 2) == []

    def test_swapped_range_returns_empty(self) -> None:
        # start > end
        assert _extract_lines(self._src(5), 4, 2) == []

    def test_empty_source_returns_empty(self) -> None:
        assert _extract_lines(b"", 1, 1) == []

    def test_last_line_no_trailing_newline(self) -> None:
        assert _extract_lines(b"a\nb\nc", 3, 3) == ["c"]

    def test_keepends_true(self) -> None:
        # Line terminators must be preserved in the returned strings.
        assert _extract_lines(b"a\nb\nc\n", 1, 2) == ["a\n", "b\n"]
214
215
216 # ---------------------------------------------------------------------------
217 # Unit — _find_symbol_in_source
218 # ---------------------------------------------------------------------------
219
220
class TestFindSymbolInSource:
    """Unit tests for ``_find_symbol_in_source(source, file_rel, address)``."""

    def _src(self) -> bytes:
        """Return a two-function module (``alpha`` and ``beta``) as bytes."""
        module_text = textwrap.dedent("""\
            def alpha():
                return 1

            def beta():
                return 2
        """)
        return module_text.encode()

    def test_found_returns_record(self) -> None:
        record = _find_symbol_in_source(self._src(), "a.py", "a.py::alpha")
        assert record is not None
        assert record["name"] == "alpha"

    def test_not_found_returns_none(self) -> None:
        record = _find_symbol_in_source(self._src(), "a.py", "a.py::missing")
        assert record is None

    def test_uses_repo_relative_path_not_absolute(self) -> None:
        """Regression guard: address must use repo-relative file_rel, not /abs/path."""
        source = b"def fn():\n pass\n"
        wanted = "src/mod.py::fn"

        # Correct: the file path and the address prefix are both repo-relative.
        relative_hit = _find_symbol_in_source(source, "src/mod.py", wanted)
        assert relative_hit is not None, "Should find symbol with repo-relative path"

        # Wrong: an absolute file path must NOT match the relative address.
        absolute_hit = _find_symbol_in_source(source, "/abs/src/mod.py", wanted)
        assert absolute_hit is None, "Absolute path prefix must not match relative address"

    def test_second_symbol_found(self) -> None:
        record = _find_symbol_in_source(self._src(), "m.py", "m.py::beta")
        assert record is not None
        assert record["name"] == "beta"

    def test_line_numbers_are_1_indexed(self) -> None:
        record = _find_symbol_in_source(
            b"def fn():\n return 1\n", "f.py", "f.py::fn"
        )
        assert record is not None
        assert record["lineno"] >= 1
        assert record["end_lineno"] >= record["lineno"]
261
262
263 # ---------------------------------------------------------------------------
264 # Integration — basic CLI
265 # ---------------------------------------------------------------------------
266
267
class TestCheckoutSymbolBasic:
    """CLI-level checks: exit codes, text output, and dry-run safety."""

    @staticmethod
    def _checkout(address: str, ref: str, *extra: str):
        """Invoke ``code checkout-symbol ADDRESS --commit REF [extra...]``."""
        argv = ["code", "checkout-symbol", address, "--commit", ref, *extra]
        return runner.invoke(cli, argv)

    def test_restore_exits_zero(self, two_commit_repo: tuple[pathlib.Path, str, str]) -> None:
        address = two_commit_repo[1]
        outcome = self._checkout(address, "HEAD~1")
        assert outcome.exit_code == 0, outcome.output

    def test_no_address_separator_exits_nonzero(
        self, single_commit_repo: pathlib.Path
    ) -> None:
        # The address lacks the '::' separator.
        outcome = self._checkout("billing_no_sep", "HEAD")
        assert outcome.exit_code != 0

    def test_path_traversal_exits_nonzero(
        self, single_commit_repo: pathlib.Path
    ) -> None:
        outcome = self._checkout("../../etc/passwd::fn", "HEAD")
        assert outcome.exit_code != 0

    def test_invalid_commit_ref_exits_nonzero(
        self, single_commit_repo: pathlib.Path
    ) -> None:
        outcome = self._checkout("utils.py::greet", "no_such_ref")
        assert outcome.exit_code != 0

    def test_file_not_in_snapshot_exits_nonzero(
        self, single_commit_repo: pathlib.Path
    ) -> None:
        outcome = self._checkout("nonexistent.py::fn", "HEAD")
        assert outcome.exit_code != 0

    def test_symbol_not_in_snapshot_exits_nonzero(
        self, single_commit_repo: pathlib.Path
    ) -> None:
        outcome = self._checkout("utils.py::no_such_fn", "HEAD")
        assert outcome.exit_code != 0

    def test_missing_repo_exits_nonzero(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        # Bare temp directory: no ``init`` was run and the env override is removed.
        monkeypatch.chdir(tmp_path)
        monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
        outcome = self._checkout("utils.py::fn", "HEAD")
        assert outcome.exit_code != 0

    def test_text_output_contains_restoring(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        address = two_commit_repo[1]
        outcome = self._checkout(address, "HEAD~1")
        assert outcome.exit_code == 0
        accepted = ("Restoring", "already matches")
        assert any(phrase in outcome.output for phrase in accepted)

    def test_dry_run_does_not_write_file(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        root, address, rel_path = two_commit_repo
        target = root / rel_path
        snapshot = target.read_text()
        outcome = self._checkout(address, "HEAD~1", "--dry-run")
        assert outcome.exit_code == 0
        assert snapshot == target.read_text(), "dry-run must not modify the file"

    def test_dry_run_output_contains_diff_markers(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        address = two_commit_repo[1]
        outcome = self._checkout(address, "HEAD~1", "--dry-run")
        assert outcome.exit_code == 0
        accepted = ("---", "+++", "already matches")
        assert any(marker in outcome.output for marker in accepted)
357
358
359 # ---------------------------------------------------------------------------
360 # Integration — JSON schema
361 # ---------------------------------------------------------------------------
362
363
class TestCheckoutSymbolJSONSchema:
    """Schema checks for the ``--json`` output of ``checkout-symbol``.

    Every test restores ``billing.py::compute`` from ``HEAD~1`` in the
    two-commit fixture, where the historical and current bodies differ, so a
    real change is always expected.
    """

    def test_json_has_all_required_keys(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        _, address, _ = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1"])
        required = {
            "schema_version", "address", "file", "branch", "restored_from",
            "dry_run", "changed", "appended", "current_start", "current_end",
            "historical_line_count", "diff_lines",
        }
        assert required <= data.keys()

    def test_json_schema_version_nonempty(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        _, address, _ = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1"])
        assert data["schema_version"]

    def test_json_branch_nonempty(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        _, address, _ = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1"])
        assert isinstance(data["branch"], str) and data["branch"]

    def test_json_restored_from_is_8_hex(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        _, address, _ = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1"])
        assert len(data["restored_from"]) == 8
        assert all(c in "0123456789abcdef" for c in data["restored_from"])

    def test_json_changed_is_bool(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        _, address, _ = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1"])
        assert isinstance(data["changed"], bool)

    def test_json_appended_is_bool(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        _, address, _ = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1"])
        assert isinstance(data["appended"], bool)

    def test_json_historical_line_count_is_int(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        _, address, _ = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1"])
        assert isinstance(data["historical_line_count"], int)
        assert data["historical_line_count"] > 0

    def test_json_dry_run_false_when_not_dry_run(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        _, address, _ = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1"])
        assert data["dry_run"] is False

    def test_json_dry_run_true_and_file_unchanged(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        repo, address, file_path = two_commit_repo
        before = (repo / file_path).read_text()
        data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"])
        assert data["dry_run"] is True
        assert (repo / file_path).read_text() == before

    def test_json_dry_run_includes_diff_lines(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        _, address, _ = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"])
        # The fixture guarantees HEAD~1 and HEAD differ for this symbol, so
        # assert the change unconditionally. Guarding on data["changed"] would
        # let the test pass vacuously if the CLI wrongly reported a no-op.
        assert data["changed"] is True
        assert isinstance(data["diff_lines"], list)
        assert len(data["diff_lines"]) > 0

    def test_json_diff_lines_empty_when_not_dry_run(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        _, address, _ = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1"])
        assert data["diff_lines"] == []
453
454
455 # ---------------------------------------------------------------------------
456 # E2E — real symbol restoration
457 # ---------------------------------------------------------------------------
458
459
class TestCheckoutSymbolE2E:
    """End-to-end restores against real commits, verified by the bytes on disk."""

    @staticmethod
    def _restore(address: str, ref: str = "HEAD~1") -> None:
        """Run a live restore of *address* from *ref* and require exit code 0.

        Checking the exit code here stops the content assertions in the tests
        below from passing vacuously when the command itself failed and left
        the file untouched.
        """
        result = runner.invoke(cli, [
            "code", "checkout-symbol", address, "--commit", ref,
        ])
        assert result.exit_code == 0, result.output

    def test_restore_brings_back_old_implementation(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        repo, address, file_path = two_commit_repo
        self._restore(address)
        content = (repo / file_path).read_text()
        # v1 returned sum(items), v2 returned sum(items) * 2
        assert "sum(items)" in content
        assert "sum(items) * 2" not in content

    def test_surrounding_functions_unchanged(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        """Critical: surgical restore must NOT touch header() or footer()."""
        repo, address, file_path = two_commit_repo
        self._restore(address)
        content = (repo / file_path).read_text()
        assert 'return "billing"' in content
        assert 'return "end"' in content

    def test_restore_is_surgical_correct_line_count(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        repo, address, file_path = two_commit_repo
        original_lines = (repo / file_path).read_text().splitlines()
        self._restore(address)
        restored_lines = (repo / file_path).read_text().splitlines()
        # Both versions have the same number of body lines.
        assert len(restored_lines) == len(original_lines)

    def test_regression_symbol_found_in_place_not_appended(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        """Bug regression: absolute-path lookup caused symbol to never be found,
        so every restore appended instead of replacing in-place."""
        repo, address, file_path = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1"])
        assert data["appended"] is False, (
            "Symbol exists in working tree — must replace in-place, not append"
        )
        content = (repo / file_path).read_text()
        # footer() must appear exactly once (not duplicated by append).
        assert content.count('def footer') == 1

    def test_no_op_detection_changed_false(
        self, single_commit_repo: pathlib.Path
    ) -> None:
        """Restoring HEAD to HEAD is a no-op."""
        data = _invoke_json(["utils.py::greet", "--commit", "HEAD"])
        assert data["changed"] is False

    def test_no_op_file_not_written(
        self, single_commit_repo: pathlib.Path
    ) -> None:
        before = (single_commit_repo / "utils.py").read_text()
        _invoke_json(["utils.py::greet", "--commit", "HEAD"])
        after = (single_commit_repo / "utils.py").read_text()
        assert before == after

    def test_no_op_idempotent(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        repo, address, file_path = two_commit_repo
        self._restore(address)
        content_after_first = (repo / file_path).read_text()
        self._restore(address)
        content_after_second = (repo / file_path).read_text()
        assert content_after_first == content_after_second

    def test_dry_run_diff_matches_actual_change(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        """A dry-run that reports a change is followed by a real restore that
        actually brings back the historical body."""
        repo, address, file_path = two_commit_repo
        dry_data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"])
        # Now actually restore.
        self._restore(address)
        restored = (repo / file_path).read_text()
        assert dry_data["changed"] is True
        assert "sum(items)" in restored
        # "sum(items)" is also a substring of the v2 body, so the check above
        # cannot tell v1 from v2 on its own; rule out the v2 text explicitly.
        assert "sum(items) * 2" not in restored

    def test_appended_when_symbol_absent_from_working_tree(
        self, repo: pathlib.Path
    ) -> None:
        """Symbol exists in history but not in the current working tree → appended."""
        (repo / "mod.py").write_text(textwrap.dedent("""\
            def alpha():
                return 1

            def beta():
                return 2
        """))
        r = runner.invoke(cli, ["commit", "-m", "add both"])
        assert r.exit_code == 0, r.output

        # Remove beta from the working tree and commit.
        (repo / "mod.py").write_text(textwrap.dedent("""\
            def alpha():
                return 1
        """))
        r2 = runner.invoke(cli, ["commit", "-m", "remove beta"])
        assert r2.exit_code == 0, r2.output

        data = _invoke_json(["mod.py::beta", "--commit", "HEAD~1"])
        assert data["appended"] is True
        content = (repo / "mod.py").read_text()
        assert "def beta" in content

    def test_restored_symbol_parseable_after_write(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        """After restore, parse_symbols must still find the symbol."""
        repo, address, file_path = two_commit_repo
        self._restore(address)
        raw = (repo / file_path).read_bytes()
        tree = parse_symbols(raw, file_path)
        assert address in tree, f"Symbol {address} not parseable after restore"

    def test_json_current_start_matches_actual_line(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        repo, address, file_path = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1"])
        if data["changed"] and not data["appended"]:
            content = (repo / file_path).read_text().splitlines()
            start = data["current_start"]
            assert 1 <= start <= len(content), f"current_start {start} out of range"
606
607
608 # ---------------------------------------------------------------------------
609 # Stress
610 # ---------------------------------------------------------------------------
611
612
class TestCheckoutSymbolStress:
    """Larger inputs, repeated invocations, and deeper history."""

    @staticmethod
    def _restore(address: str, ref: str) -> None:
        """Run a live restore and require exit code 0.

        Without this check a failed command leaves the file untouched, and
        several assertions below would still pass.
        """
        result = runner.invoke(cli, [
            "code", "checkout-symbol", address, "--commit", ref,
        ])
        assert result.exit_code == 0, result.output

    def test_restore_in_large_file(self, repo: pathlib.Path) -> None:
        """~600-line file (200 dummy functions + target): only the target changes."""
        # Build a file with 200 dummy functions + our target.
        lines = ["def fn_{}():\n return {}\n\n".format(i, i) for i in range(200)]
        lines.insert(100, "def target():\n return 'v1'\n\n")
        (repo / "big.py").write_text("".join(lines))
        r1 = runner.invoke(cli, ["commit", "-m", "v1"])
        assert r1.exit_code == 0, r1.output

        # Modify just target in v2.
        lines2 = list(lines)
        lines2[100] = "def target():\n return 'v2'\n\n"
        (repo / "big.py").write_text("".join(lines2))
        r2 = runner.invoke(cli, ["commit", "-m", "v2"])
        assert r2.exit_code == 0, r2.output

        before_lines = (repo / "big.py").read_text().splitlines()
        self._restore("big.py::target", "HEAD~1")
        after_lines = (repo / "big.py").read_text().splitlines()

        assert len(before_lines) == len(after_lines), "No lines should be added/removed"
        # Only target changed.
        after_text = "\n".join(after_lines)
        assert "'v1'" in after_text
        assert "'v2'" not in after_text
        # All other functions intact.
        assert sum(1 for line in after_lines if line.startswith("def fn_")) == 200

    def test_repeated_restore_is_idempotent_and_fast(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        repo, address, file_path = two_commit_repo
        # First restore.
        self._restore(address, "HEAD~1")
        content_after_first = (repo / file_path).read_text()

        start = time.monotonic()
        for _ in range(10):
            self._restore(address, "HEAD~1")
        elapsed = time.monotonic() - start

        assert (repo / file_path).read_text() == content_after_first
        assert elapsed < 15.0, f"10 repeated restores took {elapsed:.1f}s — too slow"

    def test_restore_from_far_back_in_history(self, repo: pathlib.Path) -> None:
        """Symbol restored from a commit 10 steps back must match that version."""
        body = (repo / "hist.py")
        for i in range(12):
            body.write_text(f"def fn():\n return {i}\n")
            r = runner.invoke(cli, ["commit", "-m", f"v{i}"])
            assert r.exit_code == 0, r.output

        self._restore("hist.py::fn", "HEAD~10")
        # HEAD is v11 (i=11), HEAD~10 is v1 (i=1).
        # Compare whole stripped lines: a plain substring check for "return 1"
        # also matches the un-restored HEAD body "return 11".
        statements = [line.strip() for line in body.read_text().splitlines()]
        assert "return 1" in statements
        assert "return 11" not in statements
676
677
678 # ---------------------------------------------------------------------------
679 # Verification — post-write and dry-run preview
680 # ---------------------------------------------------------------------------
681
682
class TestCheckoutSymbolVerification:
    """Tests for the post-write verification and dry-run verified_preview."""

    @staticmethod
    def _break_post_write_lookup(monkeypatch: pytest.MonkeyPatch) -> None:
        """Make the third and later ``_find_symbol_in_source`` calls return None.

        The first two calls (historical lookup, then current working-tree
        lookup) are passed through to the real function. The third call is the
        post-write verification, and returning ``None`` there simulates a
        parse failure after the file has been written.
        """
        import muse.cli.commands.checkout_symbol as cs_mod

        original = cs_mod._find_symbol_in_source
        call_count = 0

        def patched(
            source: bytes, file_rel: str, address: str
        ) -> SymbolRecord | None:
            nonlocal call_count
            call_count += 1
            if call_count >= 3:
                return None
            return original(source, file_rel, address)

        monkeypatch.setattr(cs_mod, "_find_symbol_in_source", patched)

    def test_json_verified_true_on_clean_restore(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        _, address, _ = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1"])
        assert data["changed"] is True
        assert data.get("verified") is True

    def test_json_verified_true_on_no_op(
        self, single_commit_repo: pathlib.Path
    ) -> None:
        # Non-dry-run no-op: verified field is present, no write.
        data = _invoke_json(["utils.py::greet", "--commit", "HEAD"])
        assert data["changed"] is False
        assert data.get("verified") is True

    def test_json_dry_run_no_op_has_verified_preview(
        self, single_commit_repo: pathlib.Path
    ) -> None:
        """dry-run on a no-op must still return verified_preview, not short-circuit.

        Agents routinely run --dry-run before every write. If the no-op path
        returns early without verified_preview, those pipelines break on a
        KeyError even though the command is logically correct.
        """
        data = _invoke_json(["utils.py::greet", "--commit", "HEAD", "--dry-run"])
        # changed is False (no-op), but dry-run must still emit verified_preview.
        assert data.get("changed") is False
        assert "verified_preview" in data, (
            "dry-run no-op must include verified_preview — "
            "omitting it breaks agent pipelines that always inspect this field"
        )
        assert data["verified_preview"] is True
        assert data.get("diff_lines") == []

    def test_json_verified_preview_true_in_dry_run(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        _, address, _ = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"])
        assert data["dry_run"] is True
        assert "verified_preview" in data
        assert data.get("verified_preview") is True

    def test_json_verified_present_after_append(
        self, repo: pathlib.Path
    ) -> None:
        """verified must be True even when the symbol is appended to EOF."""
        (repo / "mod.py").write_text(textwrap.dedent("""\
            def alpha():
                return 1

            def beta():
                return 2
        """))
        # Check both setup commits: if either fails, HEAD~1 would not contain
        # beta and the failure would surface later with a misleading message.
        first = runner.invoke(cli, ["commit", "-m", "v1"])
        assert first.exit_code == 0, first.output
        (repo / "mod.py").write_text("def alpha():\n return 1\n")
        second = runner.invoke(cli, ["commit", "-m", "drop beta"])
        assert second.exit_code == 0, second.output

        data = _invoke_json(["mod.py::beta", "--commit", "HEAD~1"])
        assert data.get("appended") is True
        assert data.get("verified") is True

    def test_json_no_verified_preview_on_non_dry_run(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        """verified_preview must not appear on a live write — only dry-run has it."""
        _, address, _ = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1"])
        assert "verified_preview" not in data

    def test_json_no_verified_on_dry_run(
        self, two_commit_repo: tuple[pathlib.Path, str, str]
    ) -> None:
        """verified (write-path field) must not appear on dry-run output."""
        _, address, _ = two_commit_repo
        data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"])
        assert "verified" not in data

    def test_verified_false_triggers_warning(
        self,
        two_commit_repo: tuple[pathlib.Path, str, str],
        monkeypatch: pytest.MonkeyPatch,
        capfd: pytest.CaptureFixture[str],
    ) -> None:
        """When the post-write parse fails, the JSON payload reports verified=false.

        NOTE(review): ``capfd`` is requested but never read, so the warning
        itself is not asserted in JSON mode. The text-mode warning is covered
        by ``test_text_output_warns_when_not_verified``.
        """
        self._break_post_write_lookup(monkeypatch)

        _, address, _ = two_commit_repo
        result = runner.invoke(cli, [
            "code", "checkout-symbol", address, "--commit", "HEAD~1", "--json",
        ])
        assert result.exit_code == 0, result.output
        data: _CheckoutPayload = json.loads(result.output)
        assert data.get("verified") is False

    def test_verified_false_file_still_written(
        self,
        two_commit_repo: tuple[pathlib.Path, str, str],
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Verification failure must not prevent the file from being written."""
        self._break_post_write_lookup(monkeypatch)

        repo, address, file_path = two_commit_repo
        before = (repo / file_path).read_text()
        result = runner.invoke(cli, [
            "code", "checkout-symbol", address, "--commit", "HEAD~1",
        ])
        # A failed verification is a warning, not an error.
        assert result.exit_code == 0, result.output
        after = (repo / file_path).read_text()
        # File must differ — verification failure must not roll back the write.
        assert before != after

    def test_empty_historical_lines_exits_before_write(
        self,
        two_commit_repo: tuple[pathlib.Path, str, str],
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """A corrupted snapshot producing zero lines must abort before writing."""
        import muse.cli.commands.checkout_symbol as cs_mod

        # Force every line extraction to come back empty.
        monkeypatch.setattr(cs_mod, "_extract_lines", lambda *a, **kw: [])

        repo, address, file_path = two_commit_repo
        original_content = (repo / file_path).read_text()

        result = runner.invoke(cli, [
            "code", "checkout-symbol", address, "--commit", "HEAD~1",
        ])
        assert result.exit_code != 0
        # File must be completely untouched.
        assert (repo / file_path).read_text() == original_content

    def test_text_output_warns_when_not_verified(
        self,
        two_commit_repo: tuple[pathlib.Path, str, str],
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Text mode must show a warning instead of ✅ when verification fails."""
        self._break_post_write_lookup(monkeypatch)

        _, address, _ = two_commit_repo
        result = runner.invoke(cli, [
            "code", "checkout-symbol", address, "--commit", "HEAD~1",
        ])
        assert result.exit_code == 0
        assert "verification failed" in result.output.lower() or "⚠️" in result.output
File History 1 commit