gabriel / muse public
test_cmd_checkout_symbol.py python
909 lines 34.2 KB
Raw
1 """Tests for ``muse code checkout-symbol``.
2
3 Coverage layers
4 ---------------
5 Unit
6 _extract_lines — normal, out-of-bounds (high, low, swapped), empty source.
7 _find_symbol_in_source — hit, miss, uses repo-relative path (regression
8 guard for the absolute-path bug).
9
10 Integration (live repo, CliRunner)
11 Exits zero for valid restore.
12 JSON schema: all required keys present, correct types.
13 JSON: schema_version, branch, restored_from (8-char hex), changed, appended,
14 verified, verified_preview.
15 --dry-run: file not written, output contains diff markers.
16 --dry-run --json: diff_lines + verified_preview in JSON output.
17 No-op: symbol already matches — changed=false, file unchanged, verified=true.
18 Empty historical lines from corrupted snapshot → exits non-zero before write.
19 ADDRESS without '::' rejected (exit non-zero).
20 Path-traversal ADDRESS rejected (exit non-zero).
21 --commit invalid ref rejected (exit non-zero).
22 File not in historical snapshot exits non-zero.
23 Symbol not in historical snapshot exits non-zero.
24 Missing repo exits non-zero.
25 Text output contains expected lines.
26 Appended path: symbol absent from working tree → appended at EOF.
27
28 E2E (real symbol changes across commits)
29 Restore replaces correct lines — surrounding code unchanged.
30 Restore from HEAD~1 brings back previous implementation.
31 Bug fix: absolute-path lookup — symbol IS found in current working tree
32 (not silently appended every time).
33 No-op detection: re-running restore is idempotent.
34 Dry-run diff is accurate — applying it would yield the historical file.
35 Appended symbol can be found by parse_symbols after write.
36 File content equals expected bytes after restore.
37 verified=True on a clean restore.
38 verified_preview=True in dry-run for a valid restore.
39 verified=False triggers warning when splice is unresolvable (monkeypatched).
40 Post-write verification failure does not suppress the write.
41
42 Stress
43 Restore from commit far back in history: still correct.
44 Large file (1 000-line source): only symbol lines change.
45 Repeated restore is idempotent and fast.
46 """
47
48 from __future__ import annotations
49
50 import json
51 import pathlib
52 import textwrap
53 import time
54 from typing import TypedDict
55
56 import pytest
57 from muse.core.types import split_id
58 from tests.cli_test_helper import CliRunner
59
60 from muse.cli.commands.checkout_symbol import _extract_lines, _find_symbol_in_source
61 from muse.plugins.code.ast_parser import SymbolRecord, parse_symbols
62
63 cli = None
64 runner = CliRunner()
65
66
67 # ---------------------------------------------------------------------------
68 # Typed JSON payload
69 # ---------------------------------------------------------------------------
70
71
72 class _CheckoutPayload(TypedDict, total=False):
73 schema_version: str
74 address: str
75 file: str
76 branch: str
77 restored_from: str
78 dry_run: bool
79 changed: bool
80 appended: bool
81 current_start: int
82 current_end: int
83 historical_line_count: int
84 diff_lines: list[str]
85 verified: bool # present on write and no-op paths
86 verified_preview: bool # present on dry-run path only
87
88
89 # ---------------------------------------------------------------------------
90 # Helpers
91 # ---------------------------------------------------------------------------
92
93
94 def _invoke_json(args: list[str]) -> _CheckoutPayload:
95 result = runner.invoke(cli, ["code", "checkout-symbol"] + args + ["--json"])
96 assert result.exit_code == 0, result.output
97 raw: _CheckoutPayload = json.loads(result.output)
98 return raw
99
100
101 # ---------------------------------------------------------------------------
102 # Fixtures
103 # ---------------------------------------------------------------------------
104
105
106 @pytest.fixture
107 def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
108 monkeypatch.chdir(tmp_path)
109 monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
110 result = runner.invoke(cli, ["init", "--domain", "code"])
111 assert result.exit_code == 0, result.output
112 return tmp_path
113
114
115 @pytest.fixture
116 def two_commit_repo(repo: pathlib.Path) -> tuple[pathlib.Path, str, str]:
117 """Repo with two commits containing different implementations of compute().
118
119 commit 1 (HEAD~1): compute returns sum(items)
120 commit 2 (HEAD): compute returns sum(items) * 2
121 """
122 (repo / "billing.py").write_text(textwrap.dedent("""\
123 def header():
124 return "billing"
125
126 def compute(items):
127 return sum(items)
128
129 def footer():
130 return "end"
131 """))
132 r1 = runner.invoke(cli, ["commit", "-m", "v1"])
133 assert r1.exit_code == 0, r1.output
134
135 (repo / "billing.py").write_text(textwrap.dedent("""\
136 def header():
137 return "billing"
138
139 def compute(items):
140 return sum(items) * 2
141
142 def footer():
143 return "end"
144 """))
145 r2 = runner.invoke(cli, ["commit", "-m", "v2"])
146 assert r2.exit_code == 0, r2.output
147
148 return repo, "billing.py::compute", "billing.py"
149
150
151 @pytest.fixture
152 def single_commit_repo(repo: pathlib.Path) -> pathlib.Path:
153 """Minimal repo: one commit, one function."""
154 (repo / "utils.py").write_text(textwrap.dedent("""\
155 def greet(name):
156 return f"Hello, {name}"
157 """))
158 r = runner.invoke(cli, ["commit", "-m", "init"])
159 assert r.exit_code == 0, r.output
160 return repo
161
162
163 # ---------------------------------------------------------------------------
164 # Unit — _extract_lines
165 # ---------------------------------------------------------------------------
166
167
168 class TestExtractLines:
169 def _src(self, n: int = 5) -> bytes:
170 return "\n".join(f"line {i}" for i in range(1, n + 1)).encode()
171
172 def test_full_range(self) -> None:
173 src = self._src(3)
174 assert _extract_lines(src, 1, 3) == ["line 1\n", "line 2\n", "line 3"]
175
176 def test_single_line(self) -> None:
177 src = self._src(5)
178 result = _extract_lines(src, 3, 3)
179 assert len(result) == 1
180 assert "line 3" in result[0]
181
182 def test_middle_range(self) -> None:
183 src = self._src(5)
184 result = _extract_lines(src, 2, 4)
185 assert len(result) == 3
186
187 def test_out_of_bounds_end_returns_empty(self) -> None:
188 src = self._src(3)
189 result = _extract_lines(src, 1, 10)
190 assert result == []
191
192 def test_out_of_bounds_start_zero_returns_empty(self) -> None:
193 src = self._src(3)
194 result = _extract_lines(src, 0, 2)
195 assert result == []
196
197 def test_swapped_range_returns_empty(self) -> None:
198 src = self._src(5)
199 result = _extract_lines(src, 4, 2)
200 assert result == []
201
202 def test_empty_source_returns_empty(self) -> None:
203 result = _extract_lines(b"", 1, 1)
204 assert result == []
205
206 def test_last_line_no_trailing_newline(self) -> None:
207 src = b"a\nb\nc"
208 result = _extract_lines(src, 3, 3)
209 assert result == ["c"]
210
211 def test_keepends_true(self) -> None:
212 src = b"a\nb\nc\n"
213 result = _extract_lines(src, 1, 2)
214 assert result == ["a\n", "b\n"]
215
216
217 # ---------------------------------------------------------------------------
218 # Unit — _find_symbol_in_source
219 # ---------------------------------------------------------------------------
220
221
222 class TestFindSymbolInSource:
223 def _src(self) -> bytes:
224 return textwrap.dedent("""\
225 def alpha():
226 return 1
227
228 def beta():
229 return 2
230 """).encode()
231
232 def test_found_returns_record(self) -> None:
233 rec = _find_symbol_in_source(self._src(), "a.py", "a.py::alpha")
234 assert rec is not None
235 assert rec["name"] == "alpha"
236
237 def test_not_found_returns_none(self) -> None:
238 rec = _find_symbol_in_source(self._src(), "a.py", "a.py::missing")
239 assert rec is None
240
241 def test_uses_repo_relative_path_not_absolute(self) -> None:
242 """Regression guard: address must use repo-relative file_rel, not /abs/path."""
243 src = b"def fn():\n pass\n"
244 # Correct: repo-relative
245 rec_rel = _find_symbol_in_source(src, "src/mod.py", "src/mod.py::fn")
246 assert rec_rel is not None, "Should find symbol with repo-relative path"
247 # Wrong: absolute path — should NOT find it under the relative address
248 rec_abs = _find_symbol_in_source(src, "/abs/src/mod.py", "src/mod.py::fn")
249 assert rec_abs is None, "Absolute path prefix must not match relative address"
250
251 def test_second_symbol_found(self) -> None:
252 rec = _find_symbol_in_source(self._src(), "m.py", "m.py::beta")
253 assert rec is not None
254 assert rec["name"] == "beta"
255
256 def test_line_numbers_are_1_indexed(self) -> None:
257 src = b"def fn():\n return 1\n"
258 rec = _find_symbol_in_source(src, "f.py", "f.py::fn")
259 assert rec is not None
260 assert rec["lineno"] >= 1
261 assert rec["end_lineno"] >= rec["lineno"]
262
263
264 # ---------------------------------------------------------------------------
265 # Integration — basic CLI
266 # ---------------------------------------------------------------------------
267
268
269 class TestCheckoutSymbolBasic:
270 def test_restore_exits_zero(self, two_commit_repo: tuple[pathlib.Path, str, str]) -> None:
271 _, address, _ = two_commit_repo
272 result = runner.invoke(cli, [
273 "code", "checkout-symbol", address, "--commit", "HEAD~1",
274 ])
275 assert result.exit_code == 0, result.output
276
277 def test_no_address_separator_exits_nonzero(
278 self, single_commit_repo: pathlib.Path
279 ) -> None:
280 result = runner.invoke(cli, [
281 "code", "checkout-symbol", "billing_no_sep", "--commit", "HEAD",
282 ])
283 assert result.exit_code != 0
284
285 def test_path_traversal_exits_nonzero(
286 self, single_commit_repo: pathlib.Path
287 ) -> None:
288 result = runner.invoke(cli, [
289 "code", "checkout-symbol", "../../etc/passwd::fn", "--commit", "HEAD",
290 ])
291 assert result.exit_code != 0
292
293 def test_invalid_commit_ref_exits_nonzero(
294 self, single_commit_repo: pathlib.Path
295 ) -> None:
296 result = runner.invoke(cli, [
297 "code", "checkout-symbol", "utils.py::greet", "--commit", "no_such_ref",
298 ])
299 assert result.exit_code != 0
300
301 def test_file_not_in_snapshot_exits_nonzero(
302 self, single_commit_repo: pathlib.Path
303 ) -> None:
304 result = runner.invoke(cli, [
305 "code", "checkout-symbol", "nonexistent.py::fn", "--commit", "HEAD",
306 ])
307 assert result.exit_code != 0
308
309 def test_symbol_not_in_snapshot_exits_nonzero(
310 self, single_commit_repo: pathlib.Path
311 ) -> None:
312 result = runner.invoke(cli, [
313 "code", "checkout-symbol", "utils.py::no_such_fn", "--commit", "HEAD",
314 ])
315 assert result.exit_code != 0
316
317 def test_missing_repo_exits_nonzero(
318 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
319 ) -> None:
320 monkeypatch.chdir(tmp_path)
321 monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
322 result = runner.invoke(cli, [
323 "code", "checkout-symbol", "utils.py::fn", "--commit", "HEAD",
324 ])
325 assert result.exit_code != 0
326
327 def test_text_output_contains_restoring(
328 self, two_commit_repo: tuple[pathlib.Path, str, str]
329 ) -> None:
330 _, address, _ = two_commit_repo
331 result = runner.invoke(cli, [
332 "code", "checkout-symbol", address, "--commit", "HEAD~1",
333 ])
334 assert result.exit_code == 0
335 assert "Restoring" in result.output or "already matches" in result.output
336
337 def test_dry_run_does_not_write_file(
338 self, two_commit_repo: tuple[pathlib.Path, str, str]
339 ) -> None:
340 repo, address, file_path = two_commit_repo
341 before = (repo / file_path).read_text()
342 result = runner.invoke(cli, [
343 "code", "checkout-symbol", address, "--commit", "HEAD~1", "--dry-run",
344 ])
345 assert result.exit_code == 0
346 after = (repo / file_path).read_text()
347 assert before == after, "dry-run must not modify the file"
348
349 def test_dry_run_output_contains_diff_markers(
350 self, two_commit_repo: tuple[pathlib.Path, str, str]
351 ) -> None:
352 _, address, _ = two_commit_repo
353 result = runner.invoke(cli, [
354 "code", "checkout-symbol", address, "--commit", "HEAD~1", "--dry-run",
355 ])
356 assert result.exit_code == 0
357 assert "---" in result.output or "+++" in result.output or "already matches" in result.output
358
359
360 # ---------------------------------------------------------------------------
361 # Integration — JSON schema
362 # ---------------------------------------------------------------------------
363
364
365 class TestCheckoutSymbolJSONSchema:
366 def test_json_has_all_required_keys(
367 self, two_commit_repo: tuple[pathlib.Path, str, str]
368 ) -> None:
369 _, address, _ = two_commit_repo
370 data = _invoke_json([address, "--commit", "HEAD~1"])
371 required = {
372 "address", "file", "branch", "restored_from",
373 "dry_run", "changed", "appended", "current_start", "current_end",
374 "historical_line_count", "diff_lines",
375 }
376 assert required <= data.keys()
377
378 def test_json_schema_version_nonempty(
379 self, two_commit_repo: tuple[pathlib.Path, str, str]
380 ) -> None:
381 _, address, _ = two_commit_repo
382 data = _invoke_json([address, "--commit", "HEAD~1"])
383 assert data["schema"]
384
385 def test_json_branch_nonempty(
386 self, two_commit_repo: tuple[pathlib.Path, str, str]
387 ) -> None:
388 _, address, _ = two_commit_repo
389 data = _invoke_json([address, "--commit", "HEAD~1"])
390 assert isinstance(data["branch"], str) and data["branch"]
391
392 def test_json_restored_from_is_short_id(
393 self, two_commit_repo: tuple[pathlib.Path, str, str]
394 ) -> None:
395 _, address, _ = two_commit_repo
396 data = _invoke_json([address, "--commit", "HEAD~1"])
397 # short_id() returns "sha256:<12 hex chars>" — 19 chars total
398 assert isinstance(data["restored_from"], str)
399 assert data["restored_from"].startswith("sha256:")
400 hex_part = data["restored_from"][len("sha256:"):]
401 assert all(c in "0123456789abcdef" for c in hex_part)
402
403 def test_json_changed_is_bool(
404 self, two_commit_repo: tuple[pathlib.Path, str, str]
405 ) -> None:
406 _, address, _ = two_commit_repo
407 data = _invoke_json([address, "--commit", "HEAD~1"])
408 assert isinstance(data["changed"], bool)
409
410 def test_json_appended_is_bool(
411 self, two_commit_repo: tuple[pathlib.Path, str, str]
412 ) -> None:
413 _, address, _ = two_commit_repo
414 data = _invoke_json([address, "--commit", "HEAD~1"])
415 assert isinstance(data["appended"], bool)
416
417 def test_json_historical_line_count_is_int(
418 self, two_commit_repo: tuple[pathlib.Path, str, str]
419 ) -> None:
420 _, address, _ = two_commit_repo
421 data = _invoke_json([address, "--commit", "HEAD~1"])
422 assert isinstance(data["historical_line_count"], int)
423 assert data["historical_line_count"] > 0
424
425 def test_json_dry_run_false_when_not_dry_run(
426 self, two_commit_repo: tuple[pathlib.Path, str, str]
427 ) -> None:
428 _, address, _ = two_commit_repo
429 data = _invoke_json([address, "--commit", "HEAD~1"])
430 assert data["dry_run"] is False
431
432 def test_json_dry_run_true_and_file_unchanged(
433 self, two_commit_repo: tuple[pathlib.Path, str, str]
434 ) -> None:
435 repo, address, file_path = two_commit_repo
436 before = (repo / file_path).read_text()
437 data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"])
438 assert data["dry_run"] is True
439 assert (repo / file_path).read_text() == before
440
441 def test_json_dry_run_includes_diff_lines(
442 self, two_commit_repo: tuple[pathlib.Path, str, str]
443 ) -> None:
444 _, address, _ = two_commit_repo
445 data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"])
446 # If there is a real change, diff_lines must be non-empty.
447 if data["changed"]:
448 assert isinstance(data["diff_lines"], list)
449 assert len(data["diff_lines"]) > 0
450
451 def test_json_diff_lines_empty_when_not_dry_run(
452 self, two_commit_repo: tuple[pathlib.Path, str, str]
453 ) -> None:
454 _, address, _ = two_commit_repo
455 data = _invoke_json([address, "--commit", "HEAD~1"])
456 assert data["diff_lines"] == []
457
458
459 # ---------------------------------------------------------------------------
460 # E2E — real symbol restoration
461 # ---------------------------------------------------------------------------
462
463
464 class TestCheckoutSymbolE2E:
465 def test_restore_brings_back_old_implementation(
466 self, two_commit_repo: tuple[pathlib.Path, str, str]
467 ) -> None:
468 repo, address, file_path = two_commit_repo
469 runner.invoke(cli, [
470 "code", "checkout-symbol", address, "--commit", "HEAD~1",
471 ])
472 content = (repo / file_path).read_text()
473 # v1 returned sum(items), v2 returned sum(items) * 2
474 assert "sum(items)" in content
475 assert "sum(items) * 2" not in content
476
477 def test_surrounding_functions_unchanged(
478 self, two_commit_repo: tuple[pathlib.Path, str, str]
479 ) -> None:
480 """Critical: surgical restore must NOT touch header() or footer()."""
481 repo, address, file_path = two_commit_repo
482 runner.invoke(cli, [
483 "code", "checkout-symbol", address, "--commit", "HEAD~1",
484 ])
485 content = (repo / file_path).read_text()
486 assert 'return "billing"' in content
487 assert 'return "end"' in content
488
489 def test_restore_is_surgical_correct_line_count(
490 self, two_commit_repo: tuple[pathlib.Path, str, str]
491 ) -> None:
492 repo, address, file_path = two_commit_repo
493 original_lines = (repo / file_path).read_text().splitlines()
494 runner.invoke(cli, [
495 "code", "checkout-symbol", address, "--commit", "HEAD~1",
496 ])
497 restored_lines = (repo / file_path).read_text().splitlines()
498 # Both versions have the same number of body lines.
499 assert len(restored_lines) == len(original_lines)
500
501 def test_regression_symbol_found_in_place_not_appended(
502 self, two_commit_repo: tuple[pathlib.Path, str, str]
503 ) -> None:
504 """Bug regression: absolute-path lookup caused symbol to never be found,
505 so every restore appended instead of replacing in-place."""
506 repo, address, file_path = two_commit_repo
507 data = _invoke_json([address, "--commit", "HEAD~1"])
508 assert data["appended"] is False, (
509 "Symbol exists in working tree — must replace in-place, not append"
510 )
511 # File must not grow: appending adds lines, replacing keeps the count.
512 content = (repo / file_path).read_text()
513 # footer() must appear exactly once (not duplicated by append).
514 assert content.count('def footer') == 1
515
516 def test_no_op_detection_changed_false(
517 self, single_commit_repo: pathlib.Path
518 ) -> None:
519 """Restoring HEAD to HEAD is a no-op."""
520 data = _invoke_json(["utils.py::greet", "--commit", "HEAD"])
521 assert data["changed"] is False
522
523 def test_no_op_file_not_written(
524 self, single_commit_repo: pathlib.Path
525 ) -> None:
526 before = (single_commit_repo / "utils.py").read_text()
527 _invoke_json(["utils.py::greet", "--commit", "HEAD"])
528 after = (single_commit_repo / "utils.py").read_text()
529 assert before == after
530
531 def test_no_op_idempotent(
532 self, two_commit_repo: tuple[pathlib.Path, str, str]
533 ) -> None:
534 repo, address, file_path = two_commit_repo
535 runner.invoke(cli, [
536 "code", "checkout-symbol", address, "--commit", "HEAD~1",
537 ])
538 content_after_first = (repo / file_path).read_text()
539 runner.invoke(cli, [
540 "code", "checkout-symbol", address, "--commit", "HEAD~1",
541 ])
542 content_after_second = (repo / file_path).read_text()
543 assert content_after_first == content_after_second
544
545 def test_dry_run_diff_matches_actual_change(
546 self, two_commit_repo: tuple[pathlib.Path, str, str]
547 ) -> None:
548 """The dry-run diff, when applied to the current file, yields the
549 result that the real restore would produce."""
550 repo, address, file_path = two_commit_repo
551 dry_data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"])
552 # Now actually restore.
553 runner.invoke(cli, [
554 "code", "checkout-symbol", address, "--commit", "HEAD~1",
555 ])
556 restored = (repo / file_path).read_text()
557 # The dry-run diff_lines are unified diff lines — their `+++` side
558 # represents the post-restore content. We verify the symbol now matches.
559 assert "sum(items)" in restored
560 assert dry_data["changed"] is True
561
562 def test_appended_when_symbol_absent_from_working_tree(
563 self, repo: pathlib.Path
564 ) -> None:
565 """Symbol exists in history but not in the current working tree → appended."""
566 (repo / "mod.py").write_text(textwrap.dedent("""\
567 def alpha():
568 return 1
569
570 def beta():
571 return 2
572 """))
573 r = runner.invoke(cli, ["commit", "-m", "add both"])
574 assert r.exit_code == 0, r.output
575
576 # Remove beta from the working tree and commit.
577 (repo / "mod.py").write_text(textwrap.dedent("""\
578 def alpha():
579 return 1
580 """))
581 r2 = runner.invoke(cli, ["commit", "-m", "remove beta"])
582 assert r2.exit_code == 0, r2.output
583
584 data = _invoke_json(["mod.py::beta", "--commit", "HEAD~1"])
585 assert data["appended"] is True
586 content = (repo / "mod.py").read_text()
587 assert "def beta" in content
588
589 def test_restored_symbol_parseable_after_write(
590 self, two_commit_repo: tuple[pathlib.Path, str, str]
591 ) -> None:
592 """After restore, parse_symbols must still find the symbol."""
593 repo, address, file_path = two_commit_repo
594 runner.invoke(cli, [
595 "code", "checkout-symbol", address, "--commit", "HEAD~1",
596 ])
597 raw = (repo / file_path).read_bytes()
598 tree = parse_symbols(raw, file_path)
599 assert address in tree, f"Symbol {address} not parseable after restore"
600
601 def test_json_current_start_matches_actual_line(
602 self, two_commit_repo: tuple[pathlib.Path, str, str]
603 ) -> None:
604 repo, address, file_path = two_commit_repo
605 data = _invoke_json([address, "--commit", "HEAD~1"])
606 if data["changed"] and not data["appended"]:
607 content = (repo / file_path).read_text().splitlines()
608 start = data["current_start"]
609 assert 1 <= start <= len(content), f"current_start {start} out of range"
610
611
612 # ---------------------------------------------------------------------------
613 # Stress
614 # ---------------------------------------------------------------------------
615
616
617 class TestCheckoutSymbolStress:
618 def test_restore_in_large_file(self, repo: pathlib.Path) -> None:
619 """1 000-line file: only the target symbol lines change."""
620 # Build a file with 200 dummy functions + our target.
621 lines = ["def fn_{}():\n return {}\n\n".format(i, i) for i in range(200)]
622 lines.insert(100, "def target():\n return 'v1'\n\n")
623 (repo / "big.py").write_text("".join(lines))
624 r1 = runner.invoke(cli, ["commit", "-m", "v1"])
625 assert r1.exit_code == 0, r1.output
626
627 # Modify just target in v2.
628 lines2 = list(lines)
629 lines2[100] = "def target():\n return 'v2'\n\n"
630 (repo / "big.py").write_text("".join(lines2))
631 r2 = runner.invoke(cli, ["commit", "-m", "v2"])
632 assert r2.exit_code == 0, r2.output
633
634 before_lines = (repo / "big.py").read_text().splitlines()
635 runner.invoke(cli, [
636 "code", "checkout-symbol", "big.py::target", "--commit", "HEAD~1",
637 ])
638 after_lines = (repo / "big.py").read_text().splitlines()
639
640 assert len(before_lines) == len(after_lines), "No lines should be added/removed"
641 # Only target changed.
642 assert "'v1'" in "\n".join(after_lines)
643 assert "'v2'" not in "\n".join(after_lines)
644 # All other functions intact.
645 assert sum(1 for l in after_lines if l.startswith("def fn_")) == 200
646
647 def test_repeated_restore_is_idempotent_and_fast(
648 self, two_commit_repo: tuple[pathlib.Path, str, str]
649 ) -> None:
650 repo, address, file_path = two_commit_repo
651 # First restore.
652 runner.invoke(cli, [
653 "code", "checkout-symbol", address, "--commit", "HEAD~1",
654 ])
655 content_after_first = (repo / file_path).read_text()
656
657 start = time.monotonic()
658 for _ in range(10):
659 runner.invoke(cli, [
660 "code", "checkout-symbol", address, "--commit", "HEAD~1",
661 ])
662 elapsed = time.monotonic() - start
663
664 assert (repo / file_path).read_text() == content_after_first
665 assert elapsed < 15.0, f"10 repeated restores took {elapsed:.1f}s — too slow"
666
667 def test_restore_from_far_back_in_history(self, repo: pathlib.Path) -> None:
668 """Symbol restored from a commit 10 steps back must match that version."""
669 body = (repo / "hist.py")
670 for i in range(12):
671 body.write_text(f"def fn():\n return {i}\n")
672 r = runner.invoke(cli, ["commit", "-m", f"v{i}"])
673 assert r.exit_code == 0, r.output
674
675 runner.invoke(cli, [
676 "code", "checkout-symbol", "hist.py::fn", "--commit", "HEAD~10",
677 ])
678 content = body.read_text()
679 assert "return 1" in content # HEAD is v11 (i=11), HEAD~10 is v1 (i=1)
680
681
682 # ---------------------------------------------------------------------------
683 # Verification — post-write and dry-run preview
684 # ---------------------------------------------------------------------------
685
686
687 class TestCheckoutSymbolVerification:
688 """Tests for the post-write verification and dry-run verified_preview."""
689
690 def test_json_verified_true_on_clean_restore(
691 self, two_commit_repo: tuple[pathlib.Path, str, str]
692 ) -> None:
693 _, address, _ = two_commit_repo
694 data = _invoke_json([address, "--commit", "HEAD~1"])
695 assert data["changed"] is True
696 assert data.get("verified") is True
697
698 def test_json_verified_true_on_no_op(
699 self, single_commit_repo: pathlib.Path
700 ) -> None:
701 # Non-dry-run no-op: verified field is present, no write.
702 data = _invoke_json(["utils.py::greet", "--commit", "HEAD"])
703 assert data["changed"] is False
704 assert data.get("verified") is True
705
706 def test_json_dry_run_no_op_has_verified_preview(
707 self, single_commit_repo: pathlib.Path
708 ) -> None:
709 """dry-run on a no-op must still return verified_preview, not short-circuit.
710
711 Agents routinely run --dry-run before every write. If the no-op path
712 returns early without verified_preview, those pipelines break on a
713 KeyError even though the command is logically correct.
714 """
715 data = _invoke_json(["utils.py::greet", "--commit", "HEAD", "--dry-run"])
716 # changed is False (no-op), but dry-run must still emit verified_preview.
717 assert data.get("changed") is False
718 assert "verified_preview" in data, (
719 "dry-run no-op must include verified_preview — "
720 "omitting it breaks agent pipelines that always inspect this field"
721 )
722 assert data["verified_preview"] is True
723 assert data.get("diff_lines") == []
724
725 def test_json_verified_preview_true_in_dry_run(
726 self, two_commit_repo: tuple[pathlib.Path, str, str]
727 ) -> None:
728 _, address, _ = two_commit_repo
729 data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"])
730 assert data["dry_run"] is True
731 assert "verified_preview" in data
732 assert data.get("verified_preview") is True
733
734 def test_json_verified_present_after_append(
735 self, repo: pathlib.Path
736 ) -> None:
737 """verified must be True even when the symbol is appended to EOF."""
738 (repo / "mod.py").write_text(textwrap.dedent("""\
739 def alpha():
740 return 1
741
742 def beta():
743 return 2
744 """))
745 runner.invoke(cli, ["commit", "-m", "v1"])
746 (repo / "mod.py").write_text("def alpha():\n return 1\n")
747 runner.invoke(cli, ["commit", "-m", "drop beta"])
748
749 data = _invoke_json(["mod.py::beta", "--commit", "HEAD~1"])
750 assert data.get("appended") is True
751 assert data.get("verified") is True
752
753 def test_json_no_verified_preview_on_non_dry_run(
754 self, two_commit_repo: tuple[pathlib.Path, str, str]
755 ) -> None:
756 """verified_preview must not appear on a live write — only dry-run has it."""
757 _, address, _ = two_commit_repo
758 data = _invoke_json([address, "--commit", "HEAD~1"])
759 assert "verified_preview" not in data
760
761 def test_json_no_verified_on_dry_run(
762 self, two_commit_repo: tuple[pathlib.Path, str, str]
763 ) -> None:
764 """verified (write-path field) must not appear on dry-run output."""
765 _, address, _ = two_commit_repo
766 data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"])
767 assert "verified" not in data
768
769 def test_verified_false_triggers_warning(
770 self,
771 two_commit_repo: tuple[pathlib.Path, str, str],
772 monkeypatch: pytest.MonkeyPatch,
773 capfd: pytest.CaptureFixture[str],
774 ) -> None:
775 """When the post-write parse fails, verified=false and a warning is emitted."""
776 import muse.cli.commands.checkout_symbol as cs_mod
777
778 call_count = 0
779 original = cs_mod._find_symbol_in_source
780
781 def patched(
782 source: bytes, file_rel: str, address: str
783 ) -> SymbolRecord | None:
784 nonlocal call_count
785 call_count += 1
786 # First two calls: historical lookup + current lookup — behave normally.
787 # Third call (post-write verification) — simulate parse failure.
788 if call_count >= 3:
789 return None
790 return original(source, file_rel, address)
791
792 monkeypatch.setattr(cs_mod, "_find_symbol_in_source", patched)
793
794 _, address, _ = two_commit_repo
795 result = runner.invoke(cli, [
796 "code", "checkout-symbol", address, "--commit", "HEAD~1", "--json",
797 ])
798 assert result.exit_code == 0, result.output
799 data: _CheckoutPayload = json.loads(result.output)
800 assert data.get("verified") is False
801
802 def test_verified_false_file_still_written(
803 self,
804 two_commit_repo: tuple[pathlib.Path, str, str],
805 monkeypatch: pytest.MonkeyPatch,
806 ) -> None:
807 """Verification failure must not prevent the file from being written."""
808 import muse.cli.commands.checkout_symbol as cs_mod
809
810 call_count = 0
811 original = cs_mod._find_symbol_in_source
812
813 def patched(
814 source: bytes, file_rel: str, address: str
815 ) -> SymbolRecord | None:
816 nonlocal call_count
817 call_count += 1
818 if call_count >= 3:
819 return None
820 return original(source, file_rel, address)
821
822 monkeypatch.setattr(cs_mod, "_find_symbol_in_source", patched)
823
824 repo, address, file_path = two_commit_repo
825 before = (repo / file_path).read_text()
826 runner.invoke(cli, [
827 "code", "checkout-symbol", address, "--commit", "HEAD~1",
828 ])
829 after = (repo / file_path).read_text()
830 # File must differ — verification failure must not roll back the write.
831 assert before != after
832
833 def test_empty_historical_lines_exits_before_write(
834 self,
835 two_commit_repo: tuple[pathlib.Path, str, str],
836 monkeypatch: pytest.MonkeyPatch,
837 ) -> None:
838 """A corrupted snapshot producing zero lines must abort before writing."""
839 import muse.cli.commands.checkout_symbol as cs_mod
840
841 monkeypatch.setattr(cs_mod, "_extract_lines", lambda *a, **kw: [])
842
843 repo, address, file_path = two_commit_repo
844 original_content = (repo / file_path).read_text()
845
846 result = runner.invoke(cli, [
847 "code", "checkout-symbol", address, "--commit", "HEAD~1",
848 ])
849 assert result.exit_code != 0
850 # File must be completely untouched.
851 assert (repo / file_path).read_text() == original_content
852
853 def test_text_output_warns_when_not_verified(
854 self,
855 two_commit_repo: tuple[pathlib.Path, str, str],
856 monkeypatch: pytest.MonkeyPatch,
857 ) -> None:
858 """Text mode must show a warning instead of ✅ when verification fails."""
859 import muse.cli.commands.checkout_symbol as cs_mod
860
861 call_count = 0
862 original = cs_mod._find_symbol_in_source
863
864 def patched(
865 source: bytes, file_rel: str, address: str
866 ) -> SymbolRecord | None:
867 nonlocal call_count
868 call_count += 1
869 if call_count >= 3:
870 return None
871 return original(source, file_rel, address)
872
873 monkeypatch.setattr(cs_mod, "_find_symbol_in_source", patched)
874
875 _, address, _ = two_commit_repo
876 result = runner.invoke(cli, [
877 "code", "checkout-symbol", address, "--commit", "HEAD~1",
878 ])
879 assert result.exit_code == 0
880 assert "verification failed" in result.output.lower() or "⚠️" in result.output
881
882
883 # ---------------------------------------------------------------------------
884 # Flag tests
885 # ---------------------------------------------------------------------------
886
887
888 import argparse as _argparse
889
890
891 class TestRegisterFlags:
892 def _parse(self, *args: str) -> _argparse.Namespace:
893 from muse.cli.commands.checkout_symbol import register
894 p = _argparse.ArgumentParser()
895 sub = p.add_subparsers()
896 register(sub)
897 return p.parse_args(["checkout-symbol", *args])
898
899 def test_default_json_out_is_false(self) -> None:
900 ns = self._parse("foo.py::bar", "--commit", "HEAD")
901 assert ns.json_out is False
902
903 def test_json_flag_sets_json_out(self) -> None:
904 ns = self._parse("--json", "foo.py::bar", "--commit", "HEAD")
905 assert ns.json_out is True
906
907 def test_j_shorthand_sets_json_out(self) -> None:
908 ns = self._parse("-j", "foo.py::bar", "--commit", "HEAD")
909 assert ns.json_out is True
File History 1 commit