"""Tests for ``muse code semantic-cherry-pick``. Coverage layers --------------- Unit _verify_symbol — hit (symbol parseable), miss (symbol not in tree), file read error, corrupt bytes. _apply_symbol — no-separator address, path-traversal, file_missing (obj not in manifest, blob missing), parse_error source, parse_error current, already_current, applied (replace), applied (append), new-file creation, dry-run (no write), diff_lines populated, verified field populated. src_cache — same blob fetched only once across multiple calls. Integration (live repo, CliRunner) Exits zero for valid cherry-pick. JSON schema: all required top-level keys present, correct types. JSON: schema_version, branch, from_commit (8-char hex), dry_run, results[], applied, already_current, failed, unverified. Per-result JSON: address, status, detail, old_lines, new_lines, diff_lines, verified. --dry-run: file not written, diff_lines populated in JSON. --dry-run verified is True for valid output. already_current result when symbol body unchanged. ADDRESS without '::' → status not_found. Path-traversal ADDRESS → status not_found. Unknown --from ref → exits non-zero. Symbol not in source commit → status not_found. File not in source snapshot → status file_missing. Multiple addresses in one invocation: all results present. Multiple addresses to same file: blob fetched once (src_cache). Text output contains commit short-hash, applied/failed counts. Missing repo → exits non-zero. unverified list populated when verification fails (monkeypatched). E2E (real symbol changes across commits) Applied symbol replaces only target lines; surrounding code unchanged. Applying from earlier commit restores old implementation. Dry-run leaves file unchanged while returning accurate diff_lines. already_current: re-applying the same symbol is idempotent. Symbol absent from working tree is appended at EOF and verifiable. verified=True after clean write. Multi-symbol single invocation applies all independently. Cross-file cherry-pick applies to the correct file. Stress 50-symbol repo: all cherry-picked in one invocation, all applied. Large file (1 000 lines): only target symbol lines change. Repeated idempotent cherry-pick: outcome stable, no file growth. """ from __future__ import annotations from collections.abc import Mapping type _FileStore = dict[str, bytes] import json import pathlib import textwrap import time from typing import TypedDict from unittest import mock import pytest from tests.cli_test_helper import CliRunner from muse.cli.commands.semantic_cherry_pick import ( ApplyStatus, _PickResult, _apply_symbol, _verify_symbol, ) from muse.plugins.code.ast_parser import parse_symbols from muse.core.types import Manifest, long_id from muse.core.object_store import object_path from muse.core.paths import muse_dir # --------------------------------------------------------------------------- # Shared CLI runner (accepts any first arg for legacy compatibility) # --------------------------------------------------------------------------- runner = CliRunner() cli = None # CliRunner always targets muse.cli.app.main # --------------------------------------------------------------------------- # TypedDicts — strict JSON schema validation # --------------------------------------------------------------------------- class _ResultEntry(TypedDict): address: str status: str detail: str old_lines: int new_lines: int diff_lines: list[str] verified: bool class _CherryPickPayload(TypedDict): schema_version: str branch: str from_commit: str dry_run: bool results: list[_ResultEntry] applied: int already_current: int failed: int unverified: list[str] # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _invoke_json(args: list[str]) -> _CherryPickPayload: result = runner.invoke(cli, ["code", "semantic-cherry-pick"] + args + ["--json"]) assert result.exit_code == 0, result.output raw: _CherryPickPayload = json.loads(result.output) return raw # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) result = runner.invoke(cli, ["init", "--domain", "code"]) assert result.exit_code == 0, result.output return tmp_path @pytest.fixture def two_commit_repo(repo: pathlib.Path) -> tuple[pathlib.Path, str, str, str]: """Repo with two commits with different implementations of compute(). commit 1 (HEAD~1): compute returns sum(items) commit 2 (HEAD): compute returns sum(items) * 2 Returns (root, address, file_rel, HEAD~1_short). """ (repo / "billing.py").write_text(textwrap.dedent("""\ def header(): return "billing" def compute(items): return sum(items) def footer(): return "end" """)) runner.invoke(cli, ["code", "add", "billing.py"]) r1 = runner.invoke(cli, ["commit", "-m", "v1"]) assert r1.exit_code == 0, r1.output (repo / "billing.py").write_text(textwrap.dedent("""\ def header(): return "billing" def compute(items): return sum(items) * 2 def footer(): return "end" """)) runner.invoke(cli, ["code", "add", "billing.py"]) r2 = runner.invoke(cli, ["commit", "-m", "v2"]) assert r2.exit_code == 0, r2.output log_out = runner.invoke(cli, ["log", "--json"]) commits: list[Mapping[str, str]] = json.loads(log_out.output)["commits"] head_minus_1 = commits[1]["commit_id"][:8] return repo, "billing.py::compute", "billing.py", head_minus_1 @pytest.fixture def multi_file_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with two files, each containing two functions. Useful for cross-file and same-file multi-symbol tests. """ (repo / "auth.py").write_text(textwrap.dedent("""\ def validate_token(tok): return tok == "secret" def refresh_token(tok): return tok + "_refreshed" """)) (repo / "billing.py").write_text(textwrap.dedent("""\ def compute(items): return sum(items) def discount(items): return sum(items) * 0.9 """)) runner.invoke(cli, ["code", "add", "."]) r1 = runner.invoke(cli, ["commit", "-m", "v1"]) assert r1.exit_code == 0, r1.output # v2 — both files change (repo / "auth.py").write_text(textwrap.dedent("""\ def validate_nonce(nonce): return len(nonce) == 64 def refresh_nonce(nonce): return nonce + "_v2" """)) (repo / "billing.py").write_text(textwrap.dedent("""\ def compute(items): return sum(items) * 2 def discount(items): return sum(items) * 0.8 """)) runner.invoke(cli, ["code", "add", "."]) r2 = runner.invoke(cli, ["commit", "-m", "v2"]) assert r2.exit_code == 0, r2.output return repo # --------------------------------------------------------------------------- # Unit — _verify_symbol # --------------------------------------------------------------------------- class TestVerifySymbol: def test_valid_symbol_returns_true(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "m.py" f.write_text("def foo():\n return 1\n") assert _verify_symbol(f, "m.py", "m.py::foo") is True def test_missing_symbol_returns_false(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "m.py" f.write_text("def bar():\n return 2\n") assert _verify_symbol(f, "m.py", "m.py::foo") is False def test_syntax_error_returns_false(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "m.py" f.write_bytes(b"def broken(:\n pass\n") assert _verify_symbol(f, "m.py", "m.py::broken") is False def test_missing_file_returns_false(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "nonexistent.py" assert _verify_symbol(f, "nonexistent.py", "nonexistent.py::x") is False def test_empty_file_returns_false(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "empty.py" f.write_text("") assert _verify_symbol(f, "empty.py", "empty.py::anything") is False # --------------------------------------------------------------------------- # Unit — _apply_symbol # --------------------------------------------------------------------------- class TestApplySymbol: def _manifest_with_blob( self, root: pathlib.Path, file_rel: str, content: bytes ) -> tuple[Mapping[str, str], Mapping[str, bytes]]: """Create a synthetic manifest entry by writing a blob to object store.""" from muse.core.types import blob_id from muse.core.object_store import write_object as _wo oid = blob_id(content) _wo(root, oid, content) manifest: Manifest = {file_rel: oid} src_cache: _FileStore = {} return manifest, src_cache def test_no_separator_returns_not_found(self, tmp_path: pathlib.Path) -> None: result = _apply_symbol(tmp_path, "nocolon", {}, False, {}) assert result.status == "not_found" assert "separator" in result.detail def test_path_traversal_returns_not_found(self, tmp_path: pathlib.Path) -> None: muse_dir(tmp_path).mkdir() result = _apply_symbol(tmp_path, "../../etc/shadow::root", {}, False, {}) assert result.status == "not_found" def test_file_not_in_manifest(self, tmp_path: pathlib.Path) -> None: muse_dir(tmp_path).mkdir() result = _apply_symbol(tmp_path, "missing.py::func", {}, False, {}) assert result.status == "file_missing" assert "not in source snapshot" in result.detail def test_blob_missing_from_object_store(self, tmp_path: pathlib.Path) -> None: muse_dir(tmp_path).mkdir() manifest: Manifest = {"src.py": long_id("a" * 64)} result = _apply_symbol(tmp_path, "src.py::func", manifest, False, {}) assert result.status == "file_missing" assert "missing from object store" in result.detail def test_source_parse_error(self, tmp_path: pathlib.Path) -> None: """parse_error is returned when parse_symbols raises (e.g. for non-Python files). parse_symbols silently returns {} for bad Python (catches SyntaxError internally), so parse_error is only reachable when parse_symbols itself raises — e.g. due to an unsupported file type or an internal adapter bug. We test this via mocking to verify the error handling path in _apply_symbol. """ muse_dir(tmp_path).mkdir() content = b"def foo():\n pass\n" manifest, src_cache = self._manifest_with_blob(tmp_path, "src.py", content) with mock.patch( "muse.cli.commands.semantic_cherry_pick.parse_symbols", side_effect=RuntimeError("adapter exploded"), ): result = _apply_symbol(tmp_path, "src.py::foo", manifest, False, src_cache) assert result.status == "parse_error" def test_symbol_not_in_source(self, tmp_path: pathlib.Path) -> None: muse_dir(tmp_path).mkdir() content = b"def other():\n pass\n" manifest, src_cache = self._manifest_with_blob(tmp_path, "src.py", content) result = _apply_symbol(tmp_path, "src.py::missing_sym", manifest, False, src_cache) assert result.status == "not_found" assert "not found in source commit" in result.detail def test_already_current_returns_correct_status(self, tmp_path: pathlib.Path) -> None: muse_dir(tmp_path).mkdir() content = b"def foo():\n return 1\n" manifest, src_cache = self._manifest_with_blob(tmp_path, "src.py", content) (tmp_path / "src.py").write_bytes(content) result = _apply_symbol(tmp_path, "src.py::foo", manifest, False, src_cache) assert result.status == "already_current" assert result.old_lines == 0 assert result.new_lines == 0 def test_applied_replace_writes_new_body(self, tmp_path: pathlib.Path) -> None: muse_dir(tmp_path).mkdir() src_content = b"def foo():\n return 42\n" manifest, src_cache = self._manifest_with_blob(tmp_path, "src.py", src_content) (tmp_path / "src.py").write_text("def foo():\n return 1\n\ndef bar():\n pass\n") result = _apply_symbol(tmp_path, "src.py::foo", manifest, False, src_cache) assert result.status == "applied" text = (tmp_path / "src.py").read_text() assert "return 42" in text assert "bar" in text # surrounding code preserved def test_applied_append_when_symbol_missing_in_current(self, tmp_path: pathlib.Path) -> None: muse_dir(tmp_path).mkdir() src_content = b"def new_func():\n return 99\n" manifest, src_cache = self._manifest_with_blob(tmp_path, "src.py", src_content) (tmp_path / "src.py").write_text("def existing():\n pass\n") result = _apply_symbol(tmp_path, "src.py::new_func", manifest, False, src_cache) assert result.status == "applied" assert "appended" in result.detail text = (tmp_path / "src.py").read_text() assert "new_func" in text assert "existing" in text def test_creates_new_file_when_target_absent(self, tmp_path: pathlib.Path) -> None: muse_dir(tmp_path).mkdir() src_content = b"def fresh():\n return 0\n" manifest, src_cache = self._manifest_with_blob(tmp_path, "new_module.py", src_content) result = _apply_symbol(tmp_path, "new_module.py::fresh", manifest, False, src_cache) assert result.status == "applied" assert "created file" in result.detail assert (tmp_path / "new_module.py").exists() def test_dry_run_does_not_write(self, tmp_path: pathlib.Path) -> None: muse_dir(tmp_path).mkdir() src_content = b"def foo():\n return 42\n" manifest, src_cache = self._manifest_with_blob(tmp_path, "src.py", src_content) (tmp_path / "src.py").write_text("def foo():\n return 1\n") _apply_symbol(tmp_path, "src.py::foo", manifest, True, src_cache) assert "return 1" in (tmp_path / "src.py").read_text() def test_dry_run_new_file_not_created(self, tmp_path: pathlib.Path) -> None: muse_dir(tmp_path).mkdir() src_content = b"def ghost():\n pass\n" manifest, src_cache = self._manifest_with_blob(tmp_path, "ghost.py", src_content) _apply_symbol(tmp_path, "ghost.py::ghost", manifest, True, src_cache) assert not (tmp_path / "ghost.py").exists() def test_diff_lines_populated_on_replace(self, tmp_path: pathlib.Path) -> None: muse_dir(tmp_path).mkdir() src_content = b"def foo():\n return 42\n" manifest, src_cache = self._manifest_with_blob(tmp_path, "src.py", src_content) (tmp_path / "src.py").write_text("def foo():\n return 1\n") result = _apply_symbol(tmp_path, "src.py::foo", manifest, False, src_cache) assert result.status == "applied" assert len(result.diff_lines) > 0 diff_text = "\n".join(result.diff_lines) assert "-" in diff_text assert "+" in diff_text def test_diff_lines_empty_when_already_current(self, tmp_path: pathlib.Path) -> None: muse_dir(tmp_path).mkdir() content = b"def foo():\n return 1\n" manifest, src_cache = self._manifest_with_blob(tmp_path, "src.py", content) (tmp_path / "src.py").write_bytes(content) result = _apply_symbol(tmp_path, "src.py::foo", manifest, False, src_cache) assert result.diff_lines == [] def test_verified_true_on_clean_write(self, tmp_path: pathlib.Path) -> None: muse_dir(tmp_path).mkdir() src_content = b"def foo():\n return 42\n" manifest, src_cache = self._manifest_with_blob(tmp_path, "src.py", src_content) (tmp_path / "src.py").write_text("def foo():\n return 1\n") result = _apply_symbol(tmp_path, "src.py::foo", manifest, False, src_cache) assert result.verified is True def test_src_cache_prevents_double_fetch(self, tmp_path: pathlib.Path) -> None: """Same blob ID requested twice must be fetched only once.""" muse_dir(tmp_path).mkdir() content = b"def foo():\n return 1\ndef bar():\n return 2\n" manifest, src_cache = self._manifest_with_blob(tmp_path, "src.py", content) (tmp_path / "src.py").write_bytes(content) call_count = 0 original_read = __import__( "muse.core.object_store", fromlist=["read_object"] ).read_object def counting_read(root: pathlib.Path, obj_id: str) -> bytes | None: nonlocal call_count call_count += 1 result: bytes | None = original_read(root, obj_id) return result with mock.patch( "muse.cli.commands.semantic_cherry_pick.read_object", side_effect=counting_read ): _apply_symbol(tmp_path, "src.py::foo", manifest, False, src_cache) _apply_symbol(tmp_path, "src.py::bar", manifest, False, src_cache) assert call_count == 1, "Same blob should be fetched only once across calls" # --------------------------------------------------------------------------- # Integration — CLI runner tests # --------------------------------------------------------------------------- class TestCherryPickCLI: def test_exit_zero_on_valid_pick(self, two_commit_repo: tuple[pathlib.Path, str, str, str]) -> None: _, address, _, head_m1 = two_commit_repo result = runner.invoke(cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1"]) assert result.exit_code == 0 def test_json_top_level_keys(self, two_commit_repo: tuple[pathlib.Path, str, str, str]) -> None: _, address, _, head_m1 = two_commit_repo data = _invoke_json([address, "--from", "HEAD~1"]) for key in ("schema", "branch", "from_commit", "dry_run", "results", "applied", "already_current", "failed", "unverified"): assert key in data, f"Missing top-level key: {key}" def test_json_from_commit_is_full_id(self, two_commit_repo: tuple[pathlib.Path, str, str, str]) -> None: _, address, _, _ = two_commit_repo data = _invoke_json([address, "--from", "HEAD~1"]) assert data["from_commit"].startswith("sha256:") assert len(data["from_commit"]) == 71 def test_json_result_keys(self, two_commit_repo: tuple[pathlib.Path, str, str, str]) -> None: _, address, _, _ = two_commit_repo data = _invoke_json([address, "--from", "HEAD~1"]) assert len(data["results"]) == 1 r = data["results"][0] for key in ("address", "status", "detail", "old_lines", "new_lines", "diff_lines", "verified"): assert key in r, f"Missing result key: {key}" def test_json_applied_count(self, two_commit_repo: tuple[pathlib.Path, str, str, str]) -> None: _, address, _, _ = two_commit_repo data = _invoke_json([address, "--from", "HEAD~1"]) assert data["applied"] == 1 assert data["failed"] == 0 def test_json_dry_run_flag(self, two_commit_repo: tuple[pathlib.Path, str, str, str]) -> None: _, address, _, _ = two_commit_repo result = runner.invoke( cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1", "--dry-run", "--json"], ) assert result.exit_code == 0 data: _CherryPickPayload = json.loads(result.output) assert data["dry_run"] is True def test_dry_run_does_not_write(self, two_commit_repo: tuple[pathlib.Path, str, str, str]) -> None: root, address, file_rel, _ = two_commit_repo before = (root / file_rel).read_text() runner.invoke( cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1", "--dry-run"], ) assert (root / file_rel).read_text() == before def test_dry_run_diff_lines_in_json(self, two_commit_repo: tuple[pathlib.Path, str, str, str]) -> None: _, address, _, _ = two_commit_repo result = runner.invoke( cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1", "--dry-run", "--json"], ) data: _CherryPickPayload = json.loads(result.output) r = data["results"][0] assert isinstance(r["diff_lines"], list) assert len(r["diff_lines"]) > 0 def test_dry_run_verified_true_for_valid( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: _, address, _, _ = two_commit_repo result = runner.invoke( cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1", "--dry-run", "--json"], ) data: _CherryPickPayload = json.loads(result.output) assert data["results"][0]["verified"] is True def test_already_current_on_same_commit( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: _, address, _, _ = two_commit_repo data = _invoke_json([address, "--from", "HEAD"]) assert data["results"][0]["status"] == "already_current" assert data["already_current"] == 1 def test_no_separator_address_is_not_found( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: _, _, _, _ = two_commit_repo data = _invoke_json(["noseparator", "--from", "HEAD~1"]) assert data["results"][0]["status"] == "not_found" assert data["failed"] == 1 def test_path_traversal_is_not_found( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: data = _invoke_json(["../../etc/shadow::passwd", "--from", "HEAD~1"]) assert data["results"][0]["status"] == "not_found" assert data["failed"] == 1 def test_unknown_from_ref_exits_nonzero( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: _, address, _, _ = two_commit_repo result = runner.invoke( cli, ["code", "semantic-cherry-pick", address, "--from", "nonexistent-ref"] ) assert result.exit_code != 0 def test_symbol_not_in_source_is_not_found( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: _, _, _, _ = two_commit_repo data = _invoke_json(["billing.py::ghost_func", "--from", "HEAD~1"]) assert data["results"][0]["status"] == "not_found" def test_file_not_in_snapshot_is_file_missing( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: data = _invoke_json(["nonexistent_file.py::func", "--from", "HEAD~1"]) assert data["results"][0]["status"] == "file_missing" def test_multiple_addresses_all_in_results(self, multi_file_repo: pathlib.Path) -> None: data = _invoke_json( ["auth.py::validate_token", "auth.py::refresh_token", "--from", "HEAD~1"] ) assert len(data["results"]) == 2 addrs = {r["address"] for r in data["results"]} assert "auth.py::validate_token" in addrs assert "auth.py::refresh_token" in addrs def test_multiple_addresses_same_file_applied_count(self, multi_file_repo: pathlib.Path) -> None: data = _invoke_json( ["auth.py::validate_token", "auth.py::refresh_token", "--from", "HEAD~1"] ) assert data["applied"] == 2 assert data["failed"] == 0 def test_cross_file_addresses_applied(self, multi_file_repo: pathlib.Path) -> None: data = _invoke_json( ["auth.py::validate_token", "billing.py::compute", "--from", "HEAD~1"] ) assert data["applied"] == 2 assert data["failed"] == 0 def test_text_output_contains_commit_hash( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: _, address, _, head_m1 = two_commit_repo result = runner.invoke(cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1"]) assert head_m1 in result.output def test_text_output_counts(self, two_commit_repo: tuple[pathlib.Path, str, str, str]) -> None: _, address, _, _ = two_commit_repo result = runner.invoke(cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1"]) assert "1 applied" in result.output assert "0 failed" in result.output def test_missing_repo_exits_nonzero(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.chdir(tmp_path) result = runner.invoke( cli, ["code", "semantic-cherry-pick", "a.py::foo", "--from", "HEAD~1"] ) assert result.exit_code != 0 def test_unverified_populated_when_verify_fails( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: _, address, _, _ = two_commit_repo with mock.patch( "muse.cli.commands.semantic_cherry_pick._verify_symbol", return_value=False ): result = runner.invoke( cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1", "--json"], ) data: _CherryPickPayload = json.loads(result.output) assert address in data["unverified"] assert data["results"][0]["verified"] is False # --------------------------------------------------------------------------- # E2E — real symbol diffs across commits # --------------------------------------------------------------------------- class TestCherryPickE2E: def test_applied_replaces_only_target_lines( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: root, address, file_rel, _ = two_commit_repo runner.invoke(cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1"]) text = (root / file_rel).read_text() # Old implementation restored assert "return sum(items)" in text assert "* 2" not in text # Surrounding functions untouched assert "def header" in text assert "def footer" in text def test_applied_from_earlier_commit_restores_old_impl( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: root, address, file_rel, _ = two_commit_repo before_pick = (root / file_rel).read_text() assert "* 2" in before_pick # HEAD has the * 2 version runner.invoke(cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1"]) after_pick = (root / file_rel).read_text() assert "* 2" not in after_pick def test_dry_run_leaves_file_unchanged( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: root, address, file_rel, _ = two_commit_repo original = (root / file_rel).read_bytes() runner.invoke(cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1", "--dry-run"]) assert (root / file_rel).read_bytes() == original def test_dry_run_diff_lines_accurate( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: _, address, _, _ = two_commit_repo result = runner.invoke( cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1", "--dry-run", "--json"], ) data: _CherryPickPayload = json.loads(result.output) diff_text = "\n".join(data["results"][0]["diff_lines"]) # Should remove the * 2 line and restore the plain sum assert "sum(items)" in diff_text def test_already_current_is_idempotent( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: root, address, file_rel, _ = two_commit_repo runner.invoke(cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1"]) text_after_first = (root / file_rel).read_text() # Apply again — must be a no-op runner.invoke(cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1"]) assert (root / file_rel).read_text() == text_after_first def test_second_apply_is_already_current( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: _, address, _, _ = two_commit_repo runner.invoke(cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1"]) data = _invoke_json([address, "--from", "HEAD~1"]) assert data["results"][0]["status"] == "already_current" def test_appended_symbol_parseable(self, two_commit_repo: tuple[pathlib.Path, str, str, str]) -> None: root, _, _, _ = two_commit_repo # billing.py doesn't have 'header2' in source; cherry-pick should append # Use a symbol that exists in source (HEAD~1) but was removed in HEAD. # Add a new symbol in commit 3 to simulate absence in working tree. (root / "utils.py").write_text("def helper():\n return True\n") runner.invoke(cli, ["code", "add", "utils.py"]) runner.invoke(cli, ["commit", "-m", "v3"]) # Working tree no longer has utils.py (overwrite with something else) (root / "utils.py").write_text("def other():\n return False\n") runner.invoke(cli, ["code", "add", "utils.py"]) runner.invoke(cli, ["commit", "-m", "v4"]) # Cherry-pick helper from v3 (HEAD~1 now) runner.invoke(cli, ["code", "semantic-cherry-pick", "utils.py::helper", "--from", "HEAD~1"]) raw = (root / "utils.py").read_bytes() tree = parse_symbols(raw, "utils.py") assert "utils.py::helper" in tree def test_verified_true_after_clean_write( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: _, address, _, _ = two_commit_repo data = _invoke_json([address, "--from", "HEAD~1"]) assert data["results"][0]["verified"] is True def test_multi_symbol_same_file_applies_all(self, multi_file_repo: pathlib.Path) -> None: root = multi_file_repo runner.invoke( cli, ["code", "semantic-cherry-pick", "auth.py::validate_token", "auth.py::refresh_token", "--from", "HEAD~1"], ) text = (root / "auth.py").read_text() # Old implementations restored assert 'tok == "secret"' in text assert '"_refreshed"' in text def test_cross_file_cherry_pick_correct_files(self, multi_file_repo: pathlib.Path) -> None: root = multi_file_repo runner.invoke( cli, ["code", "semantic-cherry-pick", "auth.py::validate_token", "billing.py::compute", "--from", "HEAD~1"], ) auth_text = (root / "auth.py").read_text() bill_text = (root / "billing.py").read_text() assert 'tok == "secret"' in auth_text assert "return sum(items)" in bill_text assert "* 2" not in bill_text # --------------------------------------------------------------------------- # E2E — regression: all results returned even on mixed success/failure # --------------------------------------------------------------------------- class TestCherryPickMixedResults: def test_failure_does_not_stop_remaining_addresses( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: """All symbols processed; failure in one doesn't skip subsequent ones.""" _, _, _, _ = two_commit_repo data = _invoke_json([ "billing.py::ghost_func", # not_found "billing.py::compute", # applied "--from", "HEAD~1", ]) statuses = {r["address"]: r["status"] for r in data["results"]} assert statuses["billing.py::ghost_func"] == "not_found" assert statuses["billing.py::compute"] == "applied" assert data["applied"] == 1 assert data["failed"] == 1 def test_mixed_results_counts_accurate( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: data = _invoke_json([ "billing.py::compute", # applied "billing.py::ghost", # not_found "missing_file.py::func", # file_missing "--from", "HEAD~1", ]) assert data["applied"] == 1 assert data["failed"] == 2 # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestCherryPickStress: def test_many_symbols_all_applied(self, repo: pathlib.Path) -> None: """50 distinct functions, all cherry-picked in one invocation.""" n = 50 funcs = "\n\n".join(f"def func_{i}():\n return {i}" for i in range(n)) (repo / "big.py").write_text(f"{funcs}\n") runner.invoke(cli, ["code", "add", "big.py"]) r1 = runner.invoke(cli, ["commit", "-m", "v1"]) assert r1.exit_code == 0 new_funcs = "\n\n".join(f"def func_{i}():\n return {i * 10}" for i in range(n)) (repo / "big.py").write_text(f"{new_funcs}\n") runner.invoke(cli, ["code", "add", "big.py"]) r2 = runner.invoke(cli, ["commit", "-m", "v2"]) assert r2.exit_code == 0 addresses = [f"big.py::func_{i}" for i in range(n)] result = runner.invoke( cli, ["code", "semantic-cherry-pick"] + addresses + ["--from", "HEAD~1", "--json"], ) assert result.exit_code == 0 data: _CherryPickPayload = json.loads(result.output) # Every symbol should be applied or already_current (no failures) assert data["failed"] == 0 assert data["applied"] + data["already_current"] == n def test_large_file_only_target_lines_change(self, repo: pathlib.Path) -> None: """1 000-line file: cherry-pick changes exactly target symbol, nothing else.""" header = "def noop():\n pass\n\n" target_v1 = "def target():\n return 'v1'\n\n" footer_lines = "".join(f"def pad_{i}():\n pass\n\n" for i in range(100)) (repo / "large.py").write_text(header + target_v1 + footer_lines) runner.invoke(cli, ["code", "add", "large.py"]) runner.invoke(cli, ["commit", "-m", "v1"]) target_v2 = "def target():\n return 'v2'\n\n" (repo / "large.py").write_text(header + target_v2 + footer_lines) runner.invoke(cli, ["code", "add", "large.py"]) runner.invoke(cli, ["commit", "-m", "v2"]) runner.invoke(cli, ["code", "semantic-cherry-pick", "large.py::target", "--from", "HEAD~1"]) text = (repo / "large.py").read_text() assert "'v1'" in text assert "'v2'" not in text for i in range(100): assert f"def pad_{i}" in text def test_repeated_cherry_pick_is_idempotent( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: root, address, file_rel, _ = two_commit_repo runner.invoke(cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1"]) text_1 = (root / file_rel).read_text() for _ in range(5): runner.invoke(cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1"]) assert (root / file_rel).read_text() == text_1 def test_repeated_cherry_pick_is_fast( self, two_commit_repo: tuple[pathlib.Path, str, str, str] ) -> None: """Idempotent cherry-picks should complete well within 2 seconds each.""" _, address, _, _ = two_commit_repo runner.invoke(cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1"]) start = time.monotonic() for _ in range(10): runner.invoke(cli, ["code", "semantic-cherry-pick", address, "--from", "HEAD~1"]) elapsed = time.monotonic() - start assert elapsed < 20.0, f"10 idempotent cherry-picks took {elapsed:.1f}s — too slow" def test_src_cache_scales_with_many_same_file_addresses( self, repo: pathlib.Path ) -> None: """Blob fetch count stays 1 regardless of how many symbols target same file.""" n = 20 content = "\n\n".join(f"def sym_{i}():\n return {i}" for i in range(n)) (repo / "cache_test.py").write_text(f"{content}\n") runner.invoke(cli, ["code", "add", "cache_test.py"]) runner.invoke(cli, ["commit", "-m", "v1"]) # Mutate so all symbols differ content_v2 = "\n\n".join(f"def sym_{i}():\n return {i * 100}" for i in range(n)) (repo / "cache_test.py").write_text(f"{content_v2}\n") runner.invoke(cli, ["code", "add", "cache_test.py"]) runner.invoke(cli, ["commit", "-m", "v2"]) call_count = 0 original_read = __import__( "muse.core.object_store", fromlist=["read_object"] ).read_object def counting_read(r: pathlib.Path, obj_id: str) -> bytes | None: nonlocal call_count call_count += 1 fetched: bytes | None = original_read(r, obj_id) return fetched addresses = [f"cache_test.py::sym_{i}" for i in range(n)] with mock.patch( "muse.cli.commands.semantic_cherry_pick.read_object", side_effect=counting_read ): result = runner.invoke( cli, ["code", "semantic-cherry-pick"] + addresses + ["--from", "HEAD~1", "--json"], ) assert result.exit_code == 0 # The source blob for cache_test.py must be fetched exactly once assert call_count == 1, ( f"Expected 1 blob fetch for {n} symbols in the same file; got {call_count}" )