gabriel / muse public
test_cmd_checkout_symbol.py python
918 lines 34.6 KB
Raw
1 """Tests for ``muse code checkout-symbol``.
2
3 Coverage layers
4 ---------------
5 Unit
6 _extract_lines — normal, out-of-bounds (high, low, swapped), empty source.
7 _find_symbol_in_source — hit, miss, uses repo-relative path (regression
8 guard for the absolute-path bug).
9
10 Integration (live repo, CliRunner)
11 Exits zero for valid restore.
12 JSON schema: all required keys present, correct types.
13 JSON: schema_version, branch, restored_from (8-char hex), changed, appended,
14 verified, verified_preview.
15 --dry-run: file not written, output contains diff markers.
16 --dry-run --json: diff_lines + verified_preview in JSON output.
17 No-op: symbol already matches — changed=false, file unchanged, verified=true.
18 Empty historical lines from corrupted snapshot → exits non-zero before write.
19 ADDRESS without '::' rejected (exit non-zero).
20 Path-traversal ADDRESS rejected (exit non-zero).
21 --commit invalid ref rejected (exit non-zero).
22 File not in historical snapshot exits non-zero.
23 Symbol not in historical snapshot exits non-zero.
24 Missing repo exits non-zero.
25 Text output contains expected lines.
26 Appended path: symbol absent from working tree → appended at EOF.
27
28 E2E (real symbol changes across commits)
29 Restore replaces correct lines — surrounding code unchanged.
30 Restore from HEAD~1 brings back previous implementation.
31 Bug fix: absolute-path lookup — symbol IS found in current working tree
32 (not silently appended every time).
33 No-op detection: re-running restore is idempotent.
34 Dry-run diff is accurate — applying it would yield the historical file.
35 Appended symbol can be found by parse_symbols after write.
36 File content equals expected bytes after restore.
37 verified=True on a clean restore.
38 verified_preview=True in dry-run for a valid restore.
39 verified=False triggers warning when splice is unresolvable (monkeypatched).
40 Post-write verification failure does not suppress the write.
41
42 Stress
43 Restore from commit far back in history: still correct.
44 Large file (1 000-line source): only symbol lines change.
45 Repeated restore is idempotent and fast.
46 """
47
48 from __future__ import annotations
49
50 import json
51 import pathlib
52 import textwrap
53 import time
54 from typing import TypedDict
55
56 import pytest
57 from muse.core.types import split_id
58 from tests.cli_test_helper import CliRunner
59
60 from muse.cli.commands.checkout_symbol import _extract_lines, _find_symbol_in_source
61 from muse.plugins.code.ast_parser import SymbolRecord, parse_symbols
62
63 cli = None
64 runner = CliRunner()
65
66
67 # ---------------------------------------------------------------------------
68 # Typed JSON payload
69 # ---------------------------------------------------------------------------
70
71
72 class _CheckoutPayload(TypedDict, total=False):
73 schema_version: str
74 address: str
75 file: str
76 branch: str
77 restored_from: str
78 dry_run: bool
79 changed: bool
80 appended: bool
81 current_start: int
82 current_end: int
83 historical_line_count: int
84 diff_lines: list[str]
85 verified: bool # present on write and no-op paths
86 verified_preview: bool # present on dry-run path only
87
88
89 # ---------------------------------------------------------------------------
90 # Helpers
91 # ---------------------------------------------------------------------------
92
93
94 def _invoke_json(args: list[str]) -> _CheckoutPayload:
95 result = runner.invoke(cli, ["code", "checkout-symbol"] + args + ["--json"])
96 assert result.exit_code == 0, result.output
97 raw: _CheckoutPayload = json.loads(result.output)
98 return raw
99
100
101 # ---------------------------------------------------------------------------
102 # Fixtures
103 # ---------------------------------------------------------------------------
104
105
106 @pytest.fixture
107 def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
108 monkeypatch.chdir(tmp_path)
109 monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
110 result = runner.invoke(cli, ["init", "--domain", "code"])
111 assert result.exit_code == 0, result.output
112 return tmp_path
113
114
115 @pytest.fixture
116 def two_commit_repo(repo: pathlib.Path) -> tuple[pathlib.Path, str, str]:
117 """Repo with two commits containing different implementations of compute().
118
119 commit 1 (HEAD~1): compute returns sum(items)
120 commit 2 (HEAD): compute returns sum(items) * 2
121 """
122 (repo / "billing.py").write_text(textwrap.dedent("""\
123 def header():
124 return "billing"
125
126 def compute(items):
127 return sum(items)
128
129 def footer():
130 return "end"
131 """))
132 runner.invoke(cli, ["code", "add", "."])
133 r1 = runner.invoke(cli, ["commit", "-m", "v1"])
134 assert r1.exit_code == 0, r1.output
135
136 (repo / "billing.py").write_text(textwrap.dedent("""\
137 def header():
138 return "billing"
139
140 def compute(items):
141 return sum(items) * 2
142
143 def footer():
144 return "end"
145 """))
146 runner.invoke(cli, ["code", "add", "."])
147 r2 = runner.invoke(cli, ["commit", "-m", "v2"])
148 assert r2.exit_code == 0, r2.output
149
150 return repo, "billing.py::compute", "billing.py"
151
152
153 @pytest.fixture
154 def single_commit_repo(repo: pathlib.Path) -> pathlib.Path:
155 """Minimal repo: one commit, one function."""
156 (repo / "utils.py").write_text(textwrap.dedent("""\
157 def greet(name):
158 return f"Hello, {name}"
159 """))
160 r = runner.invoke(cli, ["commit", "-m", "init"])
161 assert r.exit_code == 0, r.output
162 return repo
163
164
165 # ---------------------------------------------------------------------------
166 # Unit — _extract_lines
167 # ---------------------------------------------------------------------------
168
169
170 class TestExtractLines:
171 def _src(self, n: int = 5) -> bytes:
172 return "\n".join(f"line {i}" for i in range(1, n + 1)).encode()
173
174 def test_full_range(self) -> None:
175 src = self._src(3)
176 assert _extract_lines(src, 1, 3) == ["line 1\n", "line 2\n", "line 3"]
177
178 def test_single_line(self) -> None:
179 src = self._src(5)
180 result = _extract_lines(src, 3, 3)
181 assert len(result) == 1
182 assert "line 3" in result[0]
183
184 def test_middle_range(self) -> None:
185 src = self._src(5)
186 result = _extract_lines(src, 2, 4)
187 assert len(result) == 3
188
189 def test_out_of_bounds_end_returns_empty(self) -> None:
190 src = self._src(3)
191 result = _extract_lines(src, 1, 10)
192 assert result == []
193
194 def test_out_of_bounds_start_zero_returns_empty(self) -> None:
195 src = self._src(3)
196 result = _extract_lines(src, 0, 2)
197 assert result == []
198
199 def test_swapped_range_returns_empty(self) -> None:
200 src = self._src(5)
201 result = _extract_lines(src, 4, 2)
202 assert result == []
203
204 def test_empty_source_returns_empty(self) -> None:
205 result = _extract_lines(b"", 1, 1)
206 assert result == []
207
208 def test_last_line_no_trailing_newline(self) -> None:
209 src = b"a\nb\nc"
210 result = _extract_lines(src, 3, 3)
211 assert result == ["c"]
212
213 def test_keepends_true(self) -> None:
214 src = b"a\nb\nc\n"
215 result = _extract_lines(src, 1, 2)
216 assert result == ["a\n", "b\n"]
217
218
219 # ---------------------------------------------------------------------------
220 # Unit — _find_symbol_in_source
221 # ---------------------------------------------------------------------------
222
223
224 class TestFindSymbolInSource:
225 def _src(self) -> bytes:
226 return textwrap.dedent("""\
227 def alpha():
228 return 1
229
230 def beta():
231 return 2
232 """).encode()
233
234 def test_found_returns_record(self) -> None:
235 rec = _find_symbol_in_source(self._src(), "a.py", "a.py::alpha")
236 assert rec is not None
237 assert rec["name"] == "alpha"
238
239 def test_not_found_returns_none(self) -> None:
240 rec = _find_symbol_in_source(self._src(), "a.py", "a.py::missing")
241 assert rec is None
242
243 def test_uses_repo_relative_path_not_absolute(self) -> None:
244 """Regression guard: address must use repo-relative file_rel, not /abs/path."""
245 src = b"def fn():\n pass\n"
246 # Correct: repo-relative
247 rec_rel = _find_symbol_in_source(src, "src/mod.py", "src/mod.py::fn")
248 assert rec_rel is not None, "Should find symbol with repo-relative path"
249 # Wrong: absolute path — should NOT find it under the relative address
250 rec_abs = _find_symbol_in_source(src, "/abs/src/mod.py", "src/mod.py::fn")
251 assert rec_abs is None, "Absolute path prefix must not match relative address"
252
253 def test_second_symbol_found(self) -> None:
254 rec = _find_symbol_in_source(self._src(), "m.py", "m.py::beta")
255 assert rec is not None
256 assert rec["name"] == "beta"
257
258 def test_line_numbers_are_1_indexed(self) -> None:
259 src = b"def fn():\n return 1\n"
260 rec = _find_symbol_in_source(src, "f.py", "f.py::fn")
261 assert rec is not None
262 assert rec["lineno"] >= 1
263 assert rec["end_lineno"] >= rec["lineno"]
264
265
266 # ---------------------------------------------------------------------------
267 # Integration — basic CLI
268 # ---------------------------------------------------------------------------
269
270
271 class TestCheckoutSymbolBasic:
272 def test_restore_exits_zero(self, two_commit_repo: tuple[pathlib.Path, str, str]) -> None:
273 _, address, _ = two_commit_repo
274 result = runner.invoke(cli, [
275 "code", "checkout-symbol", address, "--commit", "HEAD~1",
276 ])
277 assert result.exit_code == 0, result.output
278
279 def test_no_address_separator_exits_nonzero(
280 self, single_commit_repo: pathlib.Path
281 ) -> None:
282 result = runner.invoke(cli, [
283 "code", "checkout-symbol", "billing_no_sep", "--commit", "HEAD",
284 ])
285 assert result.exit_code != 0
286
287 def test_path_traversal_exits_nonzero(
288 self, single_commit_repo: pathlib.Path
289 ) -> None:
290 result = runner.invoke(cli, [
291 "code", "checkout-symbol", "../../etc/passwd::fn", "--commit", "HEAD",
292 ])
293 assert result.exit_code != 0
294
295 def test_invalid_commit_ref_exits_nonzero(
296 self, single_commit_repo: pathlib.Path
297 ) -> None:
298 result = runner.invoke(cli, [
299 "code", "checkout-symbol", "utils.py::greet", "--commit", "no_such_ref",
300 ])
301 assert result.exit_code != 0
302
303 def test_file_not_in_snapshot_exits_nonzero(
304 self, single_commit_repo: pathlib.Path
305 ) -> None:
306 result = runner.invoke(cli, [
307 "code", "checkout-symbol", "nonexistent.py::fn", "--commit", "HEAD",
308 ])
309 assert result.exit_code != 0
310
311 def test_symbol_not_in_snapshot_exits_nonzero(
312 self, single_commit_repo: pathlib.Path
313 ) -> None:
314 result = runner.invoke(cli, [
315 "code", "checkout-symbol", "utils.py::no_such_fn", "--commit", "HEAD",
316 ])
317 assert result.exit_code != 0
318
319 def test_missing_repo_exits_nonzero(
320 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
321 ) -> None:
322 monkeypatch.chdir(tmp_path)
323 monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
324 result = runner.invoke(cli, [
325 "code", "checkout-symbol", "utils.py::fn", "--commit", "HEAD",
326 ])
327 assert result.exit_code != 0
328
329 def test_text_output_contains_restoring(
330 self, two_commit_repo: tuple[pathlib.Path, str, str]
331 ) -> None:
332 _, address, _ = two_commit_repo
333 result = runner.invoke(cli, [
334 "code", "checkout-symbol", address, "--commit", "HEAD~1",
335 ])
336 assert result.exit_code == 0
337 assert "Restoring" in result.output or "already matches" in result.output
338
339 def test_dry_run_does_not_write_file(
340 self, two_commit_repo: tuple[pathlib.Path, str, str]
341 ) -> None:
342 repo, address, file_path = two_commit_repo
343 before = (repo / file_path).read_text()
344 result = runner.invoke(cli, [
345 "code", "checkout-symbol", address, "--commit", "HEAD~1", "--dry-run",
346 ])
347 assert result.exit_code == 0
348 after = (repo / file_path).read_text()
349 assert before == after, "dry-run must not modify the file"
350
351 def test_dry_run_output_contains_diff_markers(
352 self, two_commit_repo: tuple[pathlib.Path, str, str]
353 ) -> None:
354 _, address, _ = two_commit_repo
355 result = runner.invoke(cli, [
356 "code", "checkout-symbol", address, "--commit", "HEAD~1", "--dry-run",
357 ])
358 assert result.exit_code == 0
359 assert "---" in result.output or "+++" in result.output or "already matches" in result.output
360
361
362 # ---------------------------------------------------------------------------
363 # Integration — JSON schema
364 # ---------------------------------------------------------------------------
365
366
367 class TestCheckoutSymbolJSONSchema:
368 def test_json_has_all_required_keys(
369 self, two_commit_repo: tuple[pathlib.Path, str, str]
370 ) -> None:
371 _, address, _ = two_commit_repo
372 data = _invoke_json([address, "--commit", "HEAD~1"])
373 required = {
374 "address", "file", "branch", "restored_from",
375 "dry_run", "changed", "appended", "current_start", "current_end",
376 "historical_line_count", "diff_lines",
377 }
378 assert required <= data.keys()
379
380 def test_json_schema_version_nonempty(
381 self, two_commit_repo: tuple[pathlib.Path, str, str]
382 ) -> None:
383 _, address, _ = two_commit_repo
384 data = _invoke_json([address, "--commit", "HEAD~1"])
385 assert data["schema"]
386
387 def test_json_branch_nonempty(
388 self, two_commit_repo: tuple[pathlib.Path, str, str]
389 ) -> None:
390 _, address, _ = two_commit_repo
391 data = _invoke_json([address, "--commit", "HEAD~1"])
392 assert isinstance(data["branch"], str) and data["branch"]
393
394 def test_json_restored_from_is_short_id(
395 self, two_commit_repo: tuple[pathlib.Path, str, str]
396 ) -> None:
397 _, address, _ = two_commit_repo
398 data = _invoke_json([address, "--commit", "HEAD~1"])
399 # short_id() returns "sha256:<12 hex chars>" — 19 chars total
400 assert isinstance(data["restored_from"], str)
401 assert data["restored_from"].startswith("sha256:")
402 hex_part = data["restored_from"][len("sha256:"):]
403 assert all(c in "0123456789abcdef" for c in hex_part)
404
405 def test_json_changed_is_bool(
406 self, two_commit_repo: tuple[pathlib.Path, str, str]
407 ) -> None:
408 _, address, _ = two_commit_repo
409 data = _invoke_json([address, "--commit", "HEAD~1"])
410 assert isinstance(data["changed"], bool)
411
412 def test_json_appended_is_bool(
413 self, two_commit_repo: tuple[pathlib.Path, str, str]
414 ) -> None:
415 _, address, _ = two_commit_repo
416 data = _invoke_json([address, "--commit", "HEAD~1"])
417 assert isinstance(data["appended"], bool)
418
419 def test_json_historical_line_count_is_int(
420 self, two_commit_repo: tuple[pathlib.Path, str, str]
421 ) -> None:
422 _, address, _ = two_commit_repo
423 data = _invoke_json([address, "--commit", "HEAD~1"])
424 assert isinstance(data["historical_line_count"], int)
425 assert data["historical_line_count"] > 0
426
427 def test_json_dry_run_false_when_not_dry_run(
428 self, two_commit_repo: tuple[pathlib.Path, str, str]
429 ) -> None:
430 _, address, _ = two_commit_repo
431 data = _invoke_json([address, "--commit", "HEAD~1"])
432 assert data["dry_run"] is False
433
434 def test_json_dry_run_true_and_file_unchanged(
435 self, two_commit_repo: tuple[pathlib.Path, str, str]
436 ) -> None:
437 repo, address, file_path = two_commit_repo
438 before = (repo / file_path).read_text()
439 data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"])
440 assert data["dry_run"] is True
441 assert (repo / file_path).read_text() == before
442
443 def test_json_dry_run_includes_diff_lines(
444 self, two_commit_repo: tuple[pathlib.Path, str, str]
445 ) -> None:
446 _, address, _ = two_commit_repo
447 data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"])
448 # If there is a real change, diff_lines must be non-empty.
449 if data["changed"]:
450 assert isinstance(data["diff_lines"], list)
451 assert len(data["diff_lines"]) > 0
452
453 def test_json_diff_lines_empty_when_not_dry_run(
454 self, two_commit_repo: tuple[pathlib.Path, str, str]
455 ) -> None:
456 _, address, _ = two_commit_repo
457 data = _invoke_json([address, "--commit", "HEAD~1"])
458 assert data["diff_lines"] == []
459
460
461 # ---------------------------------------------------------------------------
462 # E2E — real symbol restoration
463 # ---------------------------------------------------------------------------
464
465
466 class TestCheckoutSymbolE2E:
467 def test_restore_brings_back_old_implementation(
468 self, two_commit_repo: tuple[pathlib.Path, str, str]
469 ) -> None:
470 repo, address, file_path = two_commit_repo
471 runner.invoke(cli, [
472 "code", "checkout-symbol", address, "--commit", "HEAD~1",
473 ])
474 content = (repo / file_path).read_text()
475 # v1 returned sum(items), v2 returned sum(items) * 2
476 assert "sum(items)" in content
477 assert "sum(items) * 2" not in content
478
479 def test_surrounding_functions_unchanged(
480 self, two_commit_repo: tuple[pathlib.Path, str, str]
481 ) -> None:
482 """Critical: surgical restore must NOT touch header() or footer()."""
483 repo, address, file_path = two_commit_repo
484 runner.invoke(cli, [
485 "code", "checkout-symbol", address, "--commit", "HEAD~1",
486 ])
487 content = (repo / file_path).read_text()
488 assert 'return "billing"' in content
489 assert 'return "end"' in content
490
491 def test_restore_is_surgical_correct_line_count(
492 self, two_commit_repo: tuple[pathlib.Path, str, str]
493 ) -> None:
494 repo, address, file_path = two_commit_repo
495 original_lines = (repo / file_path).read_text().splitlines()
496 runner.invoke(cli, [
497 "code", "checkout-symbol", address, "--commit", "HEAD~1",
498 ])
499 restored_lines = (repo / file_path).read_text().splitlines()
500 # Both versions have the same number of body lines.
501 assert len(restored_lines) == len(original_lines)
502
503 def test_regression_symbol_found_in_place_not_appended(
504 self, two_commit_repo: tuple[pathlib.Path, str, str]
505 ) -> None:
506 """Bug regression: absolute-path lookup caused symbol to never be found,
507 so every restore appended instead of replacing in-place."""
508 repo, address, file_path = two_commit_repo
509 data = _invoke_json([address, "--commit", "HEAD~1"])
510 assert data["appended"] is False, (
511 "Symbol exists in working tree — must replace in-place, not append"
512 )
513 # File must not grow: appending adds lines, replacing keeps the count.
514 content = (repo / file_path).read_text()
515 # footer() must appear exactly once (not duplicated by append).
516 assert content.count('def footer') == 1
517
518 def test_no_op_detection_changed_false(
519 self, single_commit_repo: pathlib.Path
520 ) -> None:
521 """Restoring HEAD to HEAD is a no-op."""
522 data = _invoke_json(["utils.py::greet", "--commit", "HEAD"])
523 assert data["changed"] is False
524
525 def test_no_op_file_not_written(
526 self, single_commit_repo: pathlib.Path
527 ) -> None:
528 before = (single_commit_repo / "utils.py").read_text()
529 _invoke_json(["utils.py::greet", "--commit", "HEAD"])
530 after = (single_commit_repo / "utils.py").read_text()
531 assert before == after
532
533 def test_no_op_idempotent(
534 self, two_commit_repo: tuple[pathlib.Path, str, str]
535 ) -> None:
536 repo, address, file_path = two_commit_repo
537 runner.invoke(cli, [
538 "code", "checkout-symbol", address, "--commit", "HEAD~1",
539 ])
540 content_after_first = (repo / file_path).read_text()
541 runner.invoke(cli, [
542 "code", "checkout-symbol", address, "--commit", "HEAD~1",
543 ])
544 content_after_second = (repo / file_path).read_text()
545 assert content_after_first == content_after_second
546
547 def test_dry_run_diff_matches_actual_change(
548 self, two_commit_repo: tuple[pathlib.Path, str, str]
549 ) -> None:
550 """The dry-run diff, when applied to the current file, yields the
551 result that the real restore would produce."""
552 repo, address, file_path = two_commit_repo
553 dry_data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"])
554 # Now actually restore.
555 runner.invoke(cli, [
556 "code", "checkout-symbol", address, "--commit", "HEAD~1",
557 ])
558 restored = (repo / file_path).read_text()
559 # The dry-run diff_lines are unified diff lines — their `+++` side
560 # represents the post-restore content. We verify the symbol now matches.
561 assert "sum(items)" in restored
562 assert dry_data["changed"] is True
563
564 def test_appended_when_symbol_absent_from_working_tree(
565 self, repo: pathlib.Path
566 ) -> None:
567 """Symbol exists in history but not in the current working tree → appended."""
568 (repo / "mod.py").write_text(textwrap.dedent("""\
569 def alpha():
570 return 1
571
572 def beta():
573 return 2
574 """))
575 runner.invoke(cli, ["code", "add", "."])
576 r = runner.invoke(cli, ["commit", "-m", "add both"])
577 assert r.exit_code == 0, r.output
578
579 # Remove beta from the working tree and commit.
580 (repo / "mod.py").write_text(textwrap.dedent("""\
581 def alpha():
582 return 1
583 """))
584 runner.invoke(cli, ["code", "add", "."])
585 r2 = runner.invoke(cli, ["commit", "-m", "remove beta"])
586 assert r2.exit_code == 0, r2.output
587
588 data = _invoke_json(["mod.py::beta", "--commit", "HEAD~1"])
589 assert data["appended"] is True
590 content = (repo / "mod.py").read_text()
591 assert "def beta" in content
592
593 def test_restored_symbol_parseable_after_write(
594 self, two_commit_repo: tuple[pathlib.Path, str, str]
595 ) -> None:
596 """After restore, parse_symbols must still find the symbol."""
597 repo, address, file_path = two_commit_repo
598 runner.invoke(cli, [
599 "code", "checkout-symbol", address, "--commit", "HEAD~1",
600 ])
601 raw = (repo / file_path).read_bytes()
602 tree = parse_symbols(raw, file_path)
603 assert address in tree, f"Symbol {address} not parseable after restore"
604
605 def test_json_current_start_matches_actual_line(
606 self, two_commit_repo: tuple[pathlib.Path, str, str]
607 ) -> None:
608 repo, address, file_path = two_commit_repo
609 data = _invoke_json([address, "--commit", "HEAD~1"])
610 if data["changed"] and not data["appended"]:
611 content = (repo / file_path).read_text().splitlines()
612 start = data["current_start"]
613 assert 1 <= start <= len(content), f"current_start {start} out of range"
614
615
616 # ---------------------------------------------------------------------------
617 # Stress
618 # ---------------------------------------------------------------------------
619
620
621 class TestCheckoutSymbolStress:
622 def test_restore_in_large_file(self, repo: pathlib.Path) -> None:
623 """1 000-line file: only the target symbol lines change."""
624 # Build a file with 200 dummy functions + our target.
625 lines = ["def fn_{}():\n return {}\n\n".format(i, i) for i in range(200)]
626 lines.insert(100, "def target():\n return 'v1'\n\n")
627 (repo / "big.py").write_text("".join(lines))
628 runner.invoke(cli, ["code", "add", "."])
629 r1 = runner.invoke(cli, ["commit", "-m", "v1"])
630 assert r1.exit_code == 0, r1.output
631
632 # Modify just target in v2.
633 lines2 = list(lines)
634 lines2[100] = "def target():\n return 'v2'\n\n"
635 (repo / "big.py").write_text("".join(lines2))
636 runner.invoke(cli, ["code", "add", "."])
637 r2 = runner.invoke(cli, ["commit", "-m", "v2"])
638 assert r2.exit_code == 0, r2.output
639
640 before_lines = (repo / "big.py").read_text().splitlines()
641 runner.invoke(cli, [
642 "code", "checkout-symbol", "big.py::target", "--commit", "HEAD~1",
643 ])
644 after_lines = (repo / "big.py").read_text().splitlines()
645
646 assert len(before_lines) == len(after_lines), "No lines should be added/removed"
647 # Only target changed.
648 assert "'v1'" in "\n".join(after_lines)
649 assert "'v2'" not in "\n".join(after_lines)
650 # All other functions intact.
651 assert sum(1 for l in after_lines if l.startswith("def fn_")) == 200
652
653 def test_repeated_restore_is_idempotent_and_fast(
654 self, two_commit_repo: tuple[pathlib.Path, str, str]
655 ) -> None:
656 repo, address, file_path = two_commit_repo
657 # First restore.
658 runner.invoke(cli, [
659 "code", "checkout-symbol", address, "--commit", "HEAD~1",
660 ])
661 content_after_first = (repo / file_path).read_text()
662
663 start = time.monotonic()
664 for _ in range(10):
665 runner.invoke(cli, [
666 "code", "checkout-symbol", address, "--commit", "HEAD~1",
667 ])
668 elapsed = time.monotonic() - start
669
670 assert (repo / file_path).read_text() == content_after_first
671 assert elapsed < 15.0, f"10 repeated restores took {elapsed:.1f}s — too slow"
672
673 def test_restore_from_far_back_in_history(self, repo: pathlib.Path) -> None:
674 """Symbol restored from a commit 10 steps back must match that version."""
675 body = (repo / "hist.py")
676 for i in range(12):
677 body.write_text(f"def fn():\n return {i}\n")
678 runner.invoke(cli, ["code", "add", "."])
679 r = runner.invoke(cli, ["commit", "-m", f"v{i}"])
680 assert r.exit_code == 0, r.output
681
682 runner.invoke(cli, [
683 "code", "checkout-symbol", "hist.py::fn", "--commit", "HEAD~10",
684 ])
685 content = body.read_text()
686 assert "return 1" in content # HEAD is v11 (i=11), HEAD~10 is v1 (i=1)
687
688
689 # ---------------------------------------------------------------------------
690 # Verification — post-write and dry-run preview
691 # ---------------------------------------------------------------------------
692
693
694 class TestCheckoutSymbolVerification:
695 """Tests for the post-write verification and dry-run verified_preview."""
696
697 def test_json_verified_true_on_clean_restore(
698 self, two_commit_repo: tuple[pathlib.Path, str, str]
699 ) -> None:
700 _, address, _ = two_commit_repo
701 data = _invoke_json([address, "--commit", "HEAD~1"])
702 assert data["changed"] is True
703 assert data.get("verified") is True
704
705 def test_json_verified_true_on_no_op(
706 self, single_commit_repo: pathlib.Path
707 ) -> None:
708 # Non-dry-run no-op: verified field is present, no write.
709 data = _invoke_json(["utils.py::greet", "--commit", "HEAD"])
710 assert data["changed"] is False
711 assert data.get("verified") is True
712
713 def test_json_dry_run_no_op_has_verified_preview(
714 self, single_commit_repo: pathlib.Path
715 ) -> None:
716 """dry-run on a no-op must still return verified_preview, not short-circuit.
717
718 Agents routinely run --dry-run before every write. If the no-op path
719 returns early without verified_preview, those pipelines break on a
720 KeyError even though the command is logically correct.
721 """
722 data = _invoke_json(["utils.py::greet", "--commit", "HEAD", "--dry-run"])
723 # changed is False (no-op), but dry-run must still emit verified_preview.
724 assert data.get("changed") is False
725 assert "verified_preview" in data, (
726 "dry-run no-op must include verified_preview — "
727 "omitting it breaks agent pipelines that always inspect this field"
728 )
729 assert data["verified_preview"] is True
730 assert data.get("diff_lines") == []
731
732 def test_json_verified_preview_true_in_dry_run(
733 self, two_commit_repo: tuple[pathlib.Path, str, str]
734 ) -> None:
735 _, address, _ = two_commit_repo
736 data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"])
737 assert data["dry_run"] is True
738 assert "verified_preview" in data
739 assert data.get("verified_preview") is True
740
741 def test_json_verified_present_after_append(
742 self, repo: pathlib.Path
743 ) -> None:
744 """verified must be True even when the symbol is appended to EOF."""
745 (repo / "mod.py").write_text(textwrap.dedent("""\
746 def alpha():
747 return 1
748
749 def beta():
750 return 2
751 """))
752 runner.invoke(cli, ["code", "add", "."])
753 runner.invoke(cli, ["commit", "-m", "v1"])
754 (repo / "mod.py").write_text("def alpha():\n return 1\n")
755 runner.invoke(cli, ["code", "add", "."])
756 runner.invoke(cli, ["commit", "-m", "drop beta"])
757
758 data = _invoke_json(["mod.py::beta", "--commit", "HEAD~1"])
759 assert data.get("appended") is True
760 assert data.get("verified") is True
761
762 def test_json_no_verified_preview_on_non_dry_run(
763 self, two_commit_repo: tuple[pathlib.Path, str, str]
764 ) -> None:
765 """verified_preview must not appear on a live write — only dry-run has it."""
766 _, address, _ = two_commit_repo
767 data = _invoke_json([address, "--commit", "HEAD~1"])
768 assert "verified_preview" not in data
769
770 def test_json_no_verified_on_dry_run(
771 self, two_commit_repo: tuple[pathlib.Path, str, str]
772 ) -> None:
773 """verified (write-path field) must not appear on dry-run output."""
774 _, address, _ = two_commit_repo
775 data = _invoke_json([address, "--commit", "HEAD~1", "--dry-run"])
776 assert "verified" not in data
777
778 def test_verified_false_triggers_warning(
779 self,
780 two_commit_repo: tuple[pathlib.Path, str, str],
781 monkeypatch: pytest.MonkeyPatch,
782 capfd: pytest.CaptureFixture[str],
783 ) -> None:
784 """When the post-write parse fails, verified=false and a warning is emitted."""
785 import muse.cli.commands.checkout_symbol as cs_mod
786
787 call_count = 0
788 original = cs_mod._find_symbol_in_source
789
790 def patched(
791 source: bytes, file_rel: str, address: str
792 ) -> SymbolRecord | None:
793 nonlocal call_count
794 call_count += 1
795 # First two calls: historical lookup + current lookup — behave normally.
796 # Third call (post-write verification) — simulate parse failure.
797 if call_count >= 3:
798 return None
799 return original(source, file_rel, address)
800
801 monkeypatch.setattr(cs_mod, "_find_symbol_in_source", patched)
802
803 _, address, _ = two_commit_repo
804 result = runner.invoke(cli, [
805 "code", "checkout-symbol", address, "--commit", "HEAD~1", "--json",
806 ])
807 assert result.exit_code == 0, result.output
808 data: _CheckoutPayload = json.loads(result.output)
809 assert data.get("verified") is False
810
811 def test_verified_false_file_still_written(
812 self,
813 two_commit_repo: tuple[pathlib.Path, str, str],
814 monkeypatch: pytest.MonkeyPatch,
815 ) -> None:
816 """Verification failure must not prevent the file from being written."""
817 import muse.cli.commands.checkout_symbol as cs_mod
818
819 call_count = 0
820 original = cs_mod._find_symbol_in_source
821
822 def patched(
823 source: bytes, file_rel: str, address: str
824 ) -> SymbolRecord | None:
825 nonlocal call_count
826 call_count += 1
827 if call_count >= 3:
828 return None
829 return original(source, file_rel, address)
830
831 monkeypatch.setattr(cs_mod, "_find_symbol_in_source", patched)
832
833 repo, address, file_path = two_commit_repo
834 before = (repo / file_path).read_text()
835 runner.invoke(cli, [
836 "code", "checkout-symbol", address, "--commit", "HEAD~1",
837 ])
838 after = (repo / file_path).read_text()
839 # File must differ — verification failure must not roll back the write.
840 assert before != after
841
842 def test_empty_historical_lines_exits_before_write(
843 self,
844 two_commit_repo: tuple[pathlib.Path, str, str],
845 monkeypatch: pytest.MonkeyPatch,
846 ) -> None:
847 """A corrupted snapshot producing zero lines must abort before writing."""
848 import muse.cli.commands.checkout_symbol as cs_mod
849
850 monkeypatch.setattr(cs_mod, "_extract_lines", lambda *a, **kw: [])
851
852 repo, address, file_path = two_commit_repo
853 original_content = (repo / file_path).read_text()
854
855 result = runner.invoke(cli, [
856 "code", "checkout-symbol", address, "--commit", "HEAD~1",
857 ])
858 assert result.exit_code != 0
859 # File must be completely untouched.
860 assert (repo / file_path).read_text() == original_content
861
862 def test_text_output_warns_when_not_verified(
863 self,
864 two_commit_repo: tuple[pathlib.Path, str, str],
865 monkeypatch: pytest.MonkeyPatch,
866 ) -> None:
867 """Text mode must show a warning instead of ✅ when verification fails."""
868 import muse.cli.commands.checkout_symbol as cs_mod
869
870 call_count = 0
871 original = cs_mod._find_symbol_in_source
872
873 def patched(
874 source: bytes, file_rel: str, address: str
875 ) -> SymbolRecord | None:
876 nonlocal call_count
877 call_count += 1
878 if call_count >= 3:
879 return None
880 return original(source, file_rel, address)
881
882 monkeypatch.setattr(cs_mod, "_find_symbol_in_source", patched)
883
884 _, address, _ = two_commit_repo
885 result = runner.invoke(cli, [
886 "code", "checkout-symbol", address, "--commit", "HEAD~1",
887 ])
888 assert result.exit_code == 0
889 assert "verification failed" in result.output.lower() or "⚠️" in result.output
890
891
892 # ---------------------------------------------------------------------------
893 # Flag tests
894 # ---------------------------------------------------------------------------
895
896
897 import argparse as _argparse
898
899
900 class TestRegisterFlags:
901 def _parse(self, *args: str) -> _argparse.Namespace:
902 from muse.cli.commands.checkout_symbol import register
903 p = _argparse.ArgumentParser()
904 sub = p.add_subparsers()
905 register(sub)
906 return p.parse_args(["checkout-symbol", *args])
907
908 def test_default_json_out_is_false(self) -> None:
909 ns = self._parse("foo.py::bar", "--commit", "HEAD")
910 assert ns.json_out is False
911
912 def test_json_flag_sets_json_out(self) -> None:
913 ns = self._parse("--json", "foo.py::bar", "--commit", "HEAD")
914 assert ns.json_out is True
915
916 def test_j_shorthand_sets_json_out(self) -> None:
917 ns = self._parse("-j", "foo.py::bar", "--commit", "HEAD")
918 assert ns.json_out is True
File History 1 commit