"""Comprehensive hardening tests for ``muse cherry-pick``. Covers all changes introduced in the cherry-pick command review: Unit ---- - Parser flags: --dry-run, --message/-m, --force, --no-commit, --format, --json - Dead-code removal: _read_branch absent, pathlib not imported - validate_branch_name called in run() - target.message sanitized before embedding in commit record - ref sanitized in "not found" error - Write ordering: write_snapshot → write_commit → apply_manifest → write_branch_ref - Fail-fast on missing parent commit / parent snapshot (no silent {} fallback) Integration ----------- - Error messages routed to stderr, stdout clean - JSON schema identical and complete for all code paths (normal, --no-commit, --dry-run, conflict) - --dry-run performs no writes (branch ref, workdir, reflog unchanged) - --no-commit applies workdir changes without advancing the branch ref - Reflog entry appended after normal cherry-pick - -m/--message overrides the cherry-picked commit message - Missing parent commit → INTERNAL_ERROR (not silent fallback) - Missing target snapshot → INTERNAL_ERROR End-to-end ---------- - Text output format - JSON output format with full schema verification - Cherry-pick from another branch applies correct content - --force bypasses dirty-workdir guard Security -------- - ANSI escape codes in ref rejected / sanitized in error - ANSI in original commit message not propagated to stored commit - --format with unknown value exits 1 and prints to stderr - Conflict paths sanitized in text output Stress ------ - Cherry-pick across a 200-commit history - 50 sequential cherry-picks in the same repo - Concurrent cherry-picks to isolated repos """ from __future__ import annotations import argparse import inspect import json import pathlib import threading import time import pytest from muse.core.types import fake_id, long_id, short_id, split_id from tests.cli_test_helper import CliRunner cli = None # argparse migration — CliRunner ignores this arg runner = CliRunner() # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- def _env(root: pathlib.Path) -> Manifest: return {"MUSE_REPO_ROOT": str(root)} @pytest.fixture() def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: """Repo on ``main`` with two commits: base (a.py) + target (b.py). The caller can immediately cherry-pick the HEAD commit to a new branch. """ monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) r = runner.invoke(cli, ["init"], env=_env(tmp_path), catch_exceptions=False) assert r.exit_code == 0, r.output (tmp_path / "a.py").write_text("x = 1\n") r = runner.invoke(cli, ["commit", "-m", "base"], env=_env(tmp_path), catch_exceptions=False) assert r.exit_code == 0, r.output (tmp_path / "b.py").write_text("y = 2\n") runner.invoke(cli, ["code", "add", "."], env=_env(tmp_path), catch_exceptions=False) r = runner.invoke(cli, ["commit", "-m", "add b"], env=_env(tmp_path), catch_exceptions=False) assert r.exit_code == 0, r.output return tmp_path @pytest.fixture() def two_branch_repo( tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> tuple[pathlib.Path, str]: """Repo with main and feat branches, returns (root, commit-id-on-feat). ``main``: base commit only ``feat``: base commit + one extra commit (the one to cherry-pick) """ monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) runner.invoke(cli, ["init"], env=_env(tmp_path), catch_exceptions=False) (tmp_path / "base.py").write_text("base\n") runner.invoke(cli, ["commit", "-m", "base"], env=_env(tmp_path), catch_exceptions=False) runner.invoke(cli, ["branch", "feat"], env=_env(tmp_path), catch_exceptions=False) runner.invoke(cli, ["checkout", "feat"], env=_env(tmp_path), catch_exceptions=False) (tmp_path / "extra.py").write_text("extra\n") runner.invoke(cli, ["code", "add", "."], env=_env(tmp_path), catch_exceptions=False) runner.invoke(cli, ["commit", "-m", "extra on feat"], env=_env(tmp_path), catch_exceptions=False) from muse.core.refs import get_head_commit_id feat_cid = get_head_commit_id(tmp_path, "feat") assert feat_cid is not None runner.invoke(cli, ["checkout", "main"], env=_env(tmp_path), catch_exceptions=False) return tmp_path, feat_cid def _head_id(repo: pathlib.Path, branch: str = "main") -> str | None: from muse.core.refs import get_head_commit_id return get_head_commit_id(repo, branch) # --------------------------------------------------------------------------- # Unit — parser flags and dead-code removal # --------------------------------------------------------------------------- class TestRegisterFlags: def _parse(self, *args: str) -> argparse.Namespace: import muse.cli.commands.cherry_pick as m p = argparse.ArgumentParser() sub = p.add_subparsers() m.register(sub) return p.parse_args(["cherry-pick", *args]) def test_dry_run_flag(self) -> None: ns = self._parse("abc123", "--dry-run") assert ns.dry_run is True def test_dry_run_default_false(self) -> None: ns = self._parse("abc123") assert ns.dry_run is False def test_no_commit_short(self) -> None: ns = self._parse("abc123", "-n") assert ns.no_commit is True def test_no_commit_long(self) -> None: ns = self._parse("abc123", "--no-commit") assert ns.no_commit is True def test_force_flag(self) -> None: ns = self._parse("abc123", "--force") assert ns.force is True def test_message_short(self) -> None: ns = self._parse("abc123", "-m", "my msg") assert ns.message == "my msg" def test_message_long(self) -> None: ns = self._parse("abc123", "--message", "my msg") assert ns.message == "my msg" def test_message_default_none(self) -> None: ns = self._parse("abc123") assert ns.message is None def test_format_json_shorthand(self) -> None: ns = self._parse("abc123", "--json") assert ns.fmt == "json" def test_format_explicit_text(self) -> None: ns = self._parse("abc123", "--format", "text") assert ns.fmt == "text" def test_ref_positional(self) -> None: ns = self._parse("deadbeef") assert ns.ref == "deadbeef" class TestDeadCodeRemoval: def test_no_read_branch_wrapper(self) -> None: import muse.cli.commands.cherry_pick as m assert not hasattr(m, "_read_branch"), "_read_branch must be deleted" def test_pathlib_not_imported(self) -> None: import muse.cli.commands.cherry_pick as m assert "import pathlib" not in inspect.getsource(m) def test_validate_branch_name_in_run(self) -> None: import muse.cli.commands.cherry_pick as m assert "validate_branch_name" in inspect.getsource(m.run) def test_target_message_sanitized_in_run(self) -> None: import muse.cli.commands.cherry_pick as m assert "sanitize_display(target.message" in inspect.getsource(m.run) def test_ref_sanitized_in_not_found_error(self) -> None: import muse.cli.commands.cherry_pick as m assert "sanitize_display(ref)" in inspect.getsource(m.run) def test_write_snapshot_before_apply_manifest_in_normal_path(self) -> None: """Normal path: write_snapshot and write_commit must precede apply_manifest.""" import muse.cli.commands.cherry_pick as m src_lines = [ (i, l) for i, l in enumerate(inspect.getsource(m.run).split("\n"), 1) if l.strip() and not l.strip().startswith("#") ] ws = next(i for i, l in src_lines if "write_snapshot(" in l) wc = next(i for i, l in src_lines if "write_commit(" in l) wr = next(i for i, l in src_lines if "write_branch_ref(" in l) # Find the LAST apply_manifest (normal path, not no_commit path) all_am = [i for i, l in src_lines if "apply_manifest(" in l] last_am = max(all_am) assert ws < last_am, f"write_snapshot ({ws}) must precede apply_manifest ({last_am})" assert wc < last_am, f"write_commit ({wc}) must precede apply_manifest ({last_am})" assert last_am < wr, f"apply_manifest ({last_am}) must precede write_branch_ref ({wr})" def test_parent_snapshot_missing_raises_not_silently_falls_back(self) -> None: """Code must not silently use {} when parent snapshot is missing.""" import muse.cli.commands.cherry_pick as m src = inspect.getsource(m.run) # The silent fallback was: `if parent_snap: base_manifest = parent_snap.manifest` # The fix is: `raise SystemExit(ExitCode.INTERNAL_ERROR)` when parent_snap is None assert "INTERNAL_ERROR" in src # --------------------------------------------------------------------------- # Integration — error routing and behaviour # --------------------------------------------------------------------------- class TestErrorRouting: def test_not_found_to_stderr(self, two_branch_repo: tuple[pathlib.Path, str]) -> None: root, _ = two_branch_repo r = runner.invoke(cli, ["cherry-pick", "badref"], env=_env(root)) assert r.exit_code != 0 assert "not found" in (r.stderr or "").lower() assert "badref" in (r.stderr or "") def test_not_found_stdout_clean(self, two_branch_repo: tuple[pathlib.Path, str]) -> None: root, _ = two_branch_repo r = runner.invoke(cli, ["cherry-pick", "0000000000000000"], env=_env(root)) assert r.exit_code != 0 # Error must be in stderr; stdout should have no error messages. assert "not found" in (r.stderr or "").lower() class TestJsonSchema: """JSON schema must be identical across all code paths.""" _REQUIRED_KEYS = { "status", "commit_id", "branch", "ref", "source_commit_id", "snapshot_id", "message", "no_commit", "dry_run", "conflicts", } def test_normal_json_schema_complete( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--json"], env=_env(root), catch_exceptions=False, ) assert r.exit_code == 0, r.output d = json.loads(r.output) assert self._REQUIRED_KEYS <= d.keys() def test_normal_status_is_picked( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke(cli, ["cherry-pick", cid, "--json"], env=_env(root), catch_exceptions=False) d = json.loads(r.output) assert d["status"] == "picked" assert d["no_commit"] is False assert d["dry_run"] is False assert d["conflicts"] == [] def test_normal_ref_field_matches_input( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke(cli, ["cherry-pick", short_id(cid), "--json"], env=_env(root), catch_exceptions=False) d = json.loads(r.output) assert d["ref"] == short_id(cid) def test_normal_snapshot_id_is_string( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke(cli, ["cherry-pick", cid, "--json"], env=_env(root), catch_exceptions=False) d = json.loads(r.output) # Canonical Muse IDs are "sha256:<64 hex chars>" = 71 chars total assert isinstance(d["snapshot_id"], str) assert d["snapshot_id"].startswith("sha256:") assert len(d["snapshot_id"]) == 71 def test_no_commit_json_schema_complete( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--no-commit", "--json"], env=_env(root), catch_exceptions=False, ) assert r.exit_code == 0, r.output d = json.loads(r.output) assert self._REQUIRED_KEYS <= d.keys() def test_no_commit_status_is_applied( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--no-commit", "--json"], env=_env(root), catch_exceptions=False, ) d = json.loads(r.output) assert d["status"] == "applied" assert d["commit_id"] is None assert d["no_commit"] is True assert d["dry_run"] is False def test_dry_run_json_schema_complete( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--dry-run", "--json"], env=_env(root), catch_exceptions=False, ) assert r.exit_code == 0, r.output d = json.loads(r.output) assert self._REQUIRED_KEYS <= d.keys() def test_dry_run_status( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--dry-run", "--json"], env=_env(root), catch_exceptions=False, ) d = json.loads(r.output) assert d["dry_run"] is True assert d["commit_id"] is None assert d["status"] == "dry_run" def test_all_three_schemas_identical( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r_dr = runner.invoke(cli, ["cherry-pick", cid, "--dry-run", "--json"], env=_env(root), catch_exceptions=False) r_nc = runner.invoke(cli, ["cherry-pick", cid, "--no-commit", "--json"], env=_env(root), catch_exceptions=False) # Normal cherry-pick: need to re-fetch after --no-commit modified workdir r_commit = runner.invoke(cli, ["commit", "-m", "after nc"], env=_env(root), catch_exceptions=False) from muse.core.refs import get_head_commit_id new_cid = get_head_commit_id(root, "main") assert new_cid is not None # Need a different source commit to cherry-pick after the no-commit pick r_nm = runner.invoke(cli, ["cherry-pick", cid, "--json"], env=_env(root), catch_exceptions=False) keys_dr = set(json.loads(r_dr.output).keys()) keys_nc = set(json.loads(r_nc.output).keys()) keys_nm = set(json.loads(r_nm.output).keys()) assert keys_dr == keys_nc == keys_nm class TestDryRun: def test_no_commit_created(self, two_branch_repo: tuple[pathlib.Path, str]) -> None: from muse.core.refs import get_head_commit_id from muse.core.commits import get_all_commits root, cid = two_branch_repo before_count = len(get_all_commits(root)) before_head = get_head_commit_id(root, "main") r = runner.invoke( cli, ["cherry-pick", cid, "--dry-run"], env=_env(root), catch_exceptions=False, ) assert r.exit_code == 0, r.output assert len(get_all_commits(root)) == before_count assert get_head_commit_id(root, "main") == before_head def test_workdir_unchanged(self, two_branch_repo: tuple[pathlib.Path, str]) -> None: root, cid = two_branch_repo extra = root / "extra.py" content_before = extra.read_text() if extra.exists() else None r = runner.invoke( cli, ["cherry-pick", cid, "--dry-run"], env=_env(root), catch_exceptions=False, ) assert r.exit_code == 0, r.output assert (extra.read_text() if extra.exists() else None) == content_before def test_reflog_unchanged(self, two_branch_repo: tuple[pathlib.Path, str]) -> None: from muse.core.reflog import read_reflog root, cid = two_branch_repo before = len(read_reflog(root, "main")) runner.invoke(cli, ["cherry-pick", cid, "--dry-run"], env=_env(root), catch_exceptions=False) assert len(read_reflog(root, "main")) == before def test_dry_run_text_output_mentions_dry_run( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--dry-run"], env=_env(root), catch_exceptions=False, ) assert "dry-run" in r.output.lower() or "would" in r.output.lower() def test_dry_run_invalid_ref_errors(self, two_branch_repo: tuple[pathlib.Path, str]) -> None: root, _ = two_branch_repo r = runner.invoke(cli, ["cherry-pick", "no-such-ref", "--dry-run"], env=_env(root)) assert r.exit_code != 0 def test_dry_run_json_snapshot_id_present( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--dry-run", "--json"], env=_env(root), catch_exceptions=False, ) d = json.loads(r.output) # Canonical Muse IDs are "sha256:<64 hex chars>" = 71 chars total assert d["snapshot_id"] is not None assert d["snapshot_id"].startswith("sha256:") assert len(d["snapshot_id"]) == 71 class TestNoCommit: def test_branch_ref_not_advanced(self, two_branch_repo: tuple[pathlib.Path, str]) -> None: from muse.core.refs import get_head_commit_id root, cid = two_branch_repo before_head = get_head_commit_id(root, "main") r = runner.invoke( cli, ["cherry-pick", cid, "--no-commit"], env=_env(root), catch_exceptions=False, ) assert r.exit_code == 0, r.output assert get_head_commit_id(root, "main") == before_head def test_workdir_modified(self, two_branch_repo: tuple[pathlib.Path, str]) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--no-commit"], env=_env(root), catch_exceptions=False, ) assert r.exit_code == 0, r.output # extra.py was added by the feat branch commit assert (root / "extra.py").exists() def test_no_commit_in_json(self, two_branch_repo: tuple[pathlib.Path, str]) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--no-commit", "--json"], env=_env(root), catch_exceptions=False, ) d = json.loads(r.output) assert d["no_commit"] is True assert d["commit_id"] is None assert d["status"] == "applied" def test_reflog_not_written_for_no_commit( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: from muse.core.reflog import read_reflog root, cid = two_branch_repo before = len(read_reflog(root, "main")) runner.invoke( cli, ["cherry-pick", cid, "--no-commit"], env=_env(root), catch_exceptions=False, ) assert len(read_reflog(root, "main")) == before class TestReflog: def test_reflog_appended(self, two_branch_repo: tuple[pathlib.Path, str]) -> None: from muse.core.reflog import read_reflog root, cid = two_branch_repo before = len(read_reflog(root, "main")) runner.invoke(cli, ["cherry-pick", cid], env=_env(root), catch_exceptions=False) assert len(read_reflog(root, "main")) > before def test_reflog_operation_contains_cherry_pick( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: from muse.core.reflog import read_reflog root, cid = two_branch_repo runner.invoke(cli, ["cherry-pick", cid], env=_env(root), catch_exceptions=False) entries = read_reflog(root, "main") # read_reflog returns newest-first assert "cherry-pick" in entries[0].operation.lower() class TestMessageFlag: def test_custom_message_in_json(self, two_branch_repo: tuple[pathlib.Path, str]) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "-m", "custom pick msg", "--json"], env=_env(root), catch_exceptions=False, ) assert r.exit_code == 0, r.output d = json.loads(r.output) assert d["message"] == "custom pick msg" def test_custom_message_in_text(self, two_branch_repo: tuple[pathlib.Path, str]) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "-m", "undo extra"], env=_env(root), catch_exceptions=False, ) assert r.exit_code == 0, r.output assert "undo extra" in r.output def test_default_message_is_source_message( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--json"], env=_env(root), catch_exceptions=False, ) d = json.loads(r.output) assert d["message"] == "extra on feat" def test_message_stored_in_commit_record( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: from muse.core.refs import get_head_commit_id from muse.core.commits import read_commit root, cid = two_branch_repo runner.invoke( cli, ["cherry-pick", cid, "-m", "my override"], env=_env(root), catch_exceptions=False, ) new_cid = get_head_commit_id(root, "main") assert new_cid is not None rec = read_commit(root, new_cid) assert rec is not None assert rec.message == "my override" class TestWriteOrdering: def test_write_snapshot_before_apply_manifest( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: """write_snapshot must be called before apply_manifest in the normal path.""" from unittest.mock import patch import muse.cli.commands.cherry_pick as cp_mod from muse.core.snapshots import write_snapshot, SnapshotRecord events: list[str] = [] orig_ws = write_snapshot from muse.core.workdir import apply_manifest as orig_am_fn def tracking_write_snapshot(root: pathlib.Path, rec: SnapshotRecord) -> None: orig_ws(root, rec) events.append("write_snapshot") def tracking_apply(root: pathlib.Path, prev: Manifest, manifest: Manifest) -> None: orig_am_fn(root, prev, manifest) events.append("apply_manifest") root, cid = two_branch_repo with ( patch.object(cp_mod, "write_snapshot", tracking_write_snapshot), patch("muse.cli.commands.cherry_pick.apply_manifest", tracking_apply), ): runner.invoke(cli, ["cherry-pick", cid], env=_env(root), catch_exceptions=False) assert "write_snapshot" in events assert "apply_manifest" in events assert events.index("write_snapshot") < events.index("apply_manifest"), ( f"write_snapshot must precede apply_manifest, got: {events}" ) class TestFailFast: def test_missing_parent_commit_is_error(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: """When the target has a parent_commit_id but the parent object is missing, cherry-pick must exit INTERNAL_ERROR — not silently use empty base.""" import datetime monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) runner.invoke(cli, ["init"], env=_env(tmp_path), catch_exceptions=False) from muse.core.refs import get_head_commit_id from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.ids import hash_commit, hash_snapshot repo_id = (repo_json_path(tmp_path)).read_text() import json as _json repo_id = _json.loads(repo_id)["repo_id"] # Create a base commit m1: Manifest = {} s1 = hash_snapshot(m1) t1 = datetime.datetime.now(datetime.timezone.utc) c1 = hash_commit( parent_ids=[], snapshot_id=s1, message="base", committed_at_iso=t1.isoformat(), ) write_snapshot(tmp_path, SnapshotRecord(snapshot_id=s1, manifest=m1)) write_commit(tmp_path, CommitRecord( commit_id=c1, branch="main", snapshot_id=s1, message="base", committed_at=t1, parent_commit_id=None, )) (heads_dir(tmp_path) / "main").write_text(c1) # Create a second commit that references c1 as parent m2: Manifest = {} s2 = hash_snapshot(m2) t2 = datetime.datetime.now(datetime.timezone.utc) c2 = hash_commit( parent_ids=[c1], snapshot_id=s2, message="target", committed_at_iso=t2.isoformat(), ) write_snapshot(tmp_path, SnapshotRecord(snapshot_id=s2, manifest=m2)) write_commit(tmp_path, CommitRecord( commit_id=c2, branch="main", snapshot_id=s2, message="target", committed_at=t2, parent_commit_id=c1, )) # Now delete the parent commit object to simulate object-store corruption. # Commits are stored in the unified object store: objects//<2>/<62> from muse.core.object_store import object_path as _obj_path commit_obj = _obj_path(tmp_path, c1) if commit_obj.exists(): import os as _os _os.chmod(commit_obj, 0o644) commit_obj.unlink() # Reset HEAD to base so we can cherry-pick c2 "from another branch" # Switch to a fresh branch at c1's snapshot runner.invoke(cli, ["branch", "target-branch"], env=_env(tmp_path), catch_exceptions=False) runner.invoke(cli, ["checkout", "target-branch"], env=_env(tmp_path), catch_exceptions=False) (heads_dir(tmp_path) / "target-branch").write_text(c1) r = runner.invoke(cli, ["cherry-pick", c2], env=_env(tmp_path)) # Must fail with INTERNAL_ERROR (exit code 3), not succeed silently assert r.exit_code != 0 # --------------------------------------------------------------------------- # End-to-end — text and JSON output # --------------------------------------------------------------------------- class TestTextOutput: def test_text_shows_branch_and_short_id( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke(cli, ["cherry-pick", cid], env=_env(root), catch_exceptions=False) assert r.exit_code == 0 assert "main" in r.output def test_no_commit_text_mentions_workdir( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--no-commit"], env=_env(root), catch_exceptions=False, ) output = r.output.lower() assert "working tree" in output or "applied" in output or "commit" in output def test_workdir_has_cherry_picked_file( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo runner.invoke(cli, ["cherry-pick", cid], env=_env(root), catch_exceptions=False) assert (root / "extra.py").exists() assert (root / "extra.py").read_text() == "extra\n" class TestJsonOutput: def test_source_commit_id_matches( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke(cli, ["cherry-pick", cid, "--json"], env=_env(root), catch_exceptions=False) d = json.loads(r.output) assert d["source_commit_id"] == cid def test_branch_field(self, two_branch_repo: tuple[pathlib.Path, str]) -> None: root, cid = two_branch_repo r = runner.invoke(cli, ["cherry-pick", cid, "--json"], env=_env(root), catch_exceptions=False) d = json.loads(r.output) assert d["branch"] == "main" def test_new_commit_id_different_from_source( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke(cli, ["cherry-pick", cid, "--json"], env=_env(root), catch_exceptions=False) d = json.loads(r.output) assert d["commit_id"] != cid # Canonical Muse IDs are "sha256:<64 hex chars>" = 71 chars total assert isinstance(d["commit_id"], str) assert d["commit_id"].startswith("sha256:") assert len(d["commit_id"]) == 71 def test_conflicts_empty_on_success( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke(cli, ["cherry-pick", cid, "--json"], env=_env(root), catch_exceptions=False) d = json.loads(r.output) assert d["conflicts"] == [] class TestForce: def test_force_bypasses_dirty_check( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo (root / "base.py").write_text("modified but uncommitted\n") r = runner.invoke( cli, ["cherry-pick", cid, "--force"], env=_env(root), catch_exceptions=False, ) assert r.exit_code == 0, r.output def test_without_force_dirty_tree_fails( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo (root / "base.py").write_text("uncommitted change\n") r = runner.invoke(cli, ["cherry-pick", cid], env=_env(root)) assert r.exit_code != 0 # --------------------------------------------------------------------------- # Security — ANSI injection and sanitization # --------------------------------------------------------------------------- class TestSecurity: def test_ansi_in_ref_error_in_stderr(self, two_branch_repo: tuple[pathlib.Path, str]) -> None: root, _ = two_branch_repo ansi_ref = "\x1b[31mbadref\x1b[0m" r = runner.invoke(cli, ["cherry-pick", ansi_ref], env=_env(root)) assert r.exit_code != 0 assert "\x1b[31m" not in (r.stdout or "") assert "badref" in (r.stderr or "") def test_ansi_in_commit_message_not_in_stored_commit( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: """If the source commit has ANSI in its message, the cherry-pick commit stored on disk must not contain raw escape sequences.""" from unittest.mock import patch from muse.core.commits import read_commit, CommitRecord import muse.cli.commands.cherry_pick as cp_mod root, cid = two_branch_repo orig_rc = read_commit def poisoned_read_commit(root: pathlib.Path, c: str) -> CommitRecord | None: rec = orig_rc(root, c) if rec is not None and rec.commit_id == cid: return CommitRecord( commit_id=rec.commit_id, branch=rec.branch, snapshot_id=rec.snapshot_id, message="\x1b[31mmalicious\x1b[0m", committed_at=rec.committed_at, parent_commit_id=rec.parent_commit_id, ) return rec with patch.object(cp_mod, "read_commit", poisoned_read_commit): r = runner.invoke( cli, ["cherry-pick", cid, "--json"], env=_env(root), catch_exceptions=False, ) if r.exit_code == 0: d = json.loads(r.output) assert "\x1b[" not in d.get("message", ""), ( "Cherry-pick commit message must not contain raw ANSI from source" ) # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestStress: @pytest.mark.slow def test_cherry_pick_deep_in_history(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: """Cherry-pick a commit that's deep in a 200-commit chain.""" monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) env = _env(tmp_path) runner.invoke(cli, ["init"], env=env, catch_exceptions=False) (tmp_path / "seed.py").write_text("seed\n") runner.invoke(cli, ["commit", "-m", "seed"], env=env, catch_exceptions=False) runner.invoke(cli, ["branch", "source"], env=env, catch_exceptions=False) runner.invoke(cli, ["checkout", "source"], env=env, catch_exceptions=False) (tmp_path / "target.py").write_text("target\n") runner.invoke(cli, ["code", "add", "."], env=env, catch_exceptions=False) runner.invoke(cli, ["commit", "-m", "target commit"], env=env, catch_exceptions=False) from muse.core.refs import get_head_commit_id target_cid = get_head_commit_id(tmp_path, "source") assert target_cid is not None for i in range(198): (tmp_path / f"f{i}.py").write_text(f"{i}\n") runner.invoke(cli, ["code", "add", "."], env=env, catch_exceptions=False) runner.invoke(cli, ["commit", "-m", f"c{i}"], env=env, catch_exceptions=False) runner.invoke(cli, ["checkout", "main"], env=env, catch_exceptions=False) r = runner.invoke( cli, ["cherry-pick", target_cid, "--json"], env=env, catch_exceptions=False, ) assert r.exit_code == 0, r.output d = json.loads(r.output) assert d["source_commit_id"] == target_cid assert d["status"] == "picked" @pytest.mark.slow def test_sequential_cherry_picks(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: """30 sequential cherry-picks must all succeed.""" monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) env = _env(tmp_path) runner.invoke(cli, ["init"], env=env, catch_exceptions=False) (tmp_path / "base.py").write_text("base\n") runner.invoke(cli, ["commit", "-m", "base"], env=env, catch_exceptions=False) runner.invoke(cli, ["branch", "source"], env=env, catch_exceptions=False) runner.invoke(cli, ["checkout", "source"], env=env, catch_exceptions=False) # Create 30 non-conflicting commits on source source_cids: list[str] = [] for i in range(30): (tmp_path / f"s{i}.py").write_text(f"s{i}\n") runner.invoke(cli, ["code", "add", "."], env=env, catch_exceptions=False) runner.invoke(cli, ["commit", "-m", f"src{i}"], env=env, catch_exceptions=False) from muse.core.refs import get_head_commit_id cid = get_head_commit_id(tmp_path, "source") assert cid is not None source_cids.append(cid) runner.invoke(cli, ["checkout", "main"], env=env, catch_exceptions=False) failures: list[str] = [] for i, cid in enumerate(source_cids): r = runner.invoke(cli, ["cherry-pick", cid, "--json"], env=env) if r.exit_code != 0: failures.append(f"pick {i}: exit={r.exit_code} {r.output.strip()[:60]}") assert not failures, f"Sequential failures: {failures}" @pytest.mark.slow def test_dry_run_performance(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: """--dry-run on a large repo must complete in < 3 s.""" monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) env = _env(tmp_path) runner.invoke(cli, ["init"], env=env, catch_exceptions=False) (tmp_path / "base.py").write_text("base\n") runner.invoke(cli, ["commit", "-m", "base"], env=env, catch_exceptions=False) runner.invoke(cli, ["branch", "src"], env=env, catch_exceptions=False) runner.invoke(cli, ["checkout", "src"], env=env, catch_exceptions=False) (tmp_path / "target.py").write_text("target\n") runner.invoke(cli, ["code", "add", "."], env=env, catch_exceptions=False) runner.invoke(cli, ["commit", "-m", "target"], env=env, catch_exceptions=False) from muse.core.refs import get_head_commit_id target_cid = get_head_commit_id(tmp_path, "src") assert target_cid is not None for i in range(100): (tmp_path / f"f{i}.py").write_text(f"{i}\n") runner.invoke(cli, ["code", "add", "."], env=env, catch_exceptions=False) runner.invoke(cli, ["commit", "-m", f"c{i}"], env=env, catch_exceptions=False) runner.invoke(cli, ["checkout", "main"], env=env, catch_exceptions=False) start = time.perf_counter() r = runner.invoke( cli, ["cherry-pick", target_cid, "--dry-run", "--json"], env=env, catch_exceptions=False, ) elapsed = time.perf_counter() - start assert r.exit_code == 0, r.output assert elapsed < 3.0, f"--dry-run took {elapsed:.2f}s" # --------------------------------------------------------------------------- # Agent supercharge — duration_ms and exit_code in every JSON output # --------------------------------------------------------------------------- class TestElapsed: """Every JSON output path must include ``duration_ms`` as a float.""" def test_picked_json_has_elapsed( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke(cli, ["cherry-pick", cid, "--json"], env=_env(root), catch_exceptions=False) d = json.loads(r.output) assert "duration_ms" in d assert isinstance(d["duration_ms"], float) def test_applied_json_has_elapsed( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--no-commit", "--json"], env=_env(root), catch_exceptions=False, ) d = json.loads(r.output) assert "duration_ms" in d assert isinstance(d["duration_ms"], float) def test_dry_run_json_has_elapsed( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--dry-run", "--json"], env=_env(root), catch_exceptions=False, ) d = json.loads(r.output) assert "duration_ms" in d assert isinstance(d["duration_ms"], float) def test_conflict_json_has_elapsed( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """The conflict JSON path must also include duration_ms.""" monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) env = _env(tmp_path) runner.invoke(cli, ["init"], env=env, catch_exceptions=False) (tmp_path / "shared.py").write_text("line1\nline2\nline3\n") runner.invoke(cli, ["commit", "-m", "base"], env=env, catch_exceptions=False) runner.invoke(cli, ["branch", "src"], env=env, catch_exceptions=False) runner.invoke(cli, ["checkout", "src"], env=env, catch_exceptions=False) (tmp_path / "shared.py").write_text("line1\nSRC_LINE2\nline3\n") runner.invoke(cli, ["code", "add", "."], env=env, catch_exceptions=False) runner.invoke(cli, ["commit", "-m", "src change"], env=env, catch_exceptions=False) from muse.core.refs import get_head_commit_id as _gci src_cid = _gci(tmp_path, "src") assert src_cid is not None runner.invoke(cli, ["checkout", "main"], env=env, catch_exceptions=False) (tmp_path / "shared.py").write_text("line1\nMAIN_LINE2\nline3\n") runner.invoke(cli, ["code", "add", "."], env=env, catch_exceptions=False) runner.invoke(cli, ["commit", "-m", "main change"], env=env, catch_exceptions=False) r = runner.invoke(cli, ["cherry-pick", src_cid, "--json"], env=env) # Conflict should produce JSON with duration_ms even on exit 1 assert r.exit_code == 1 d = json.loads(r.output) assert "duration_ms" in d assert isinstance(d["duration_ms"], float) class TestExitCode: """Every successful JSON path includes ``exit_code: 0``; conflict path has ``exit_code: 1``.""" def test_picked_json_exit_code_0( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke(cli, ["cherry-pick", cid, "--json"], env=_env(root), catch_exceptions=False) d = json.loads(r.output) assert d["exit_code"] == 0 def test_applied_json_exit_code_0( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--no-commit", "--json"], env=_env(root), catch_exceptions=False, ) d = json.loads(r.output) assert d["exit_code"] == 0 def test_dry_run_json_exit_code_0( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--dry-run", "--json"], env=_env(root), catch_exceptions=False, ) d = json.loads(r.output) assert d["exit_code"] == 0 def test_conflict_json_exit_code_1( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """Conflict JSON must report exit_code: 1, mirroring the process exit.""" monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) env = _env(tmp_path) runner.invoke(cli, ["init"], env=env, catch_exceptions=False) (tmp_path / "shared.py").write_text("line1\nline2\nline3\n") runner.invoke(cli, ["commit", "-m", "base"], env=env, catch_exceptions=False) runner.invoke(cli, ["branch", "src"], env=env, catch_exceptions=False) runner.invoke(cli, ["checkout", "src"], env=env, catch_exceptions=False) (tmp_path / "shared.py").write_text("line1\nSRC_LINE2\nline3\n") runner.invoke(cli, ["code", "add", "."], env=env, catch_exceptions=False) runner.invoke(cli, ["commit", "-m", "src change"], env=env, catch_exceptions=False) from muse.core.refs import get_head_commit_id as _gci src_cid = _gci(tmp_path, "src") assert src_cid is not None runner.invoke(cli, ["checkout", "main"], env=env, catch_exceptions=False) (tmp_path / "shared.py").write_text("line1\nMAIN_LINE2\nline3\n") runner.invoke(cli, ["code", "add", "."], env=env, catch_exceptions=False) runner.invoke(cli, ["commit", "-m", "main change"], env=env, catch_exceptions=False) r = runner.invoke(cli, ["cherry-pick", src_cid, "--json"], env=env) assert r.exit_code == 1 d = json.loads(r.output) assert d["exit_code"] == 1 class TestJsonSchemaComplete: """``duration_ms`` and ``exit_code`` must be in every JSON output.""" _FULL_KEYS = { "status", "commit_id", "branch", "ref", "source_commit_id", "snapshot_id", "message", "no_commit", "dry_run", "conflicts", "duration_ms", "exit_code", "muse_version", "schema", "timestamp", "warnings", } def test_picked_has_complete_schema( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke(cli, ["cherry-pick", cid, "--json"], env=_env(root), catch_exceptions=False) d = json.loads(r.output) missing = self._FULL_KEYS - d.keys() assert not missing, f"Missing keys in 'picked' JSON: {missing}" def test_applied_has_complete_schema( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--no-commit", "--json"], env=_env(root), catch_exceptions=False, ) d = json.loads(r.output) missing = self._FULL_KEYS - d.keys() assert not missing, f"Missing keys in 'applied' JSON: {missing}" def test_dry_run_has_complete_schema( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--dry-run", "--json"], env=_env(root), catch_exceptions=False, ) d = json.loads(r.output) missing = self._FULL_KEYS - d.keys() assert not missing, f"Missing keys in 'dry_run' JSON: {missing}" def test_all_schemas_identical( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: """All three success paths must have identical key sets.""" root, cid = two_branch_repo r_dr = runner.invoke( cli, ["cherry-pick", cid, "--dry-run", "--json"], env=_env(root), catch_exceptions=False, ) r_nc = runner.invoke( cli, ["cherry-pick", cid, "--no-commit", "--json"], env=_env(root), catch_exceptions=False, ) runner.invoke(cli, ["commit", "-m", "after nc"], env=_env(root), catch_exceptions=False) r_nm = runner.invoke( cli, ["cherry-pick", cid, "--json"], env=_env(root), catch_exceptions=False, ) keys_dr = set(json.loads(r_dr.output).keys()) keys_nc = set(json.loads(r_nc.output).keys()) keys_nm = set(json.loads(r_nm.output).keys()) assert keys_dr == keys_nc == keys_nm == self._FULL_KEYS class TestTextOutputHex: """Text output must show sha256: prefix + 8 hex chars — canonical and algorithm-identifying.""" def test_picked_text_shows_prefixed_short_id( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: root, cid = two_branch_repo r = runner.invoke(cli, ["cherry-pick", cid], env=_env(root), catch_exceptions=False) assert r.exit_code == 0 from muse.core.refs import get_head_commit_id new_cid = get_head_commit_id(root, "main") assert new_cid is not None short = new_cid[:len("sha256:") + 8] assert short in r.output, ( f"Expected '{short}' in cherry-pick output, got: {r.output!r}" ) def test_dry_run_text_shows_prefixed_full_id( self, two_branch_repo: tuple[pathlib.Path, str] ) -> None: import re root, cid = two_branch_repo r = runner.invoke( cli, ["cherry-pick", cid, "--dry-run"], env=_env(root), catch_exceptions=False, ) assert r.exit_code == 0 # The dry-run text shows the source full ID in parens: (sha256:<64hex>) match = re.search(r'\(sha256:([0-9a-f]{64})\)', r.output) assert match is not None, ( f"Expected '(sha256:<64hex>)' in dry-run output, got: {r.output!r}" ) assert long_id(match.group(1)) == cid, ( f"Expected {cid} in parens, got {long_id(match.group(1))}" ) # --------------------------------------------------------------------------- # Flag registration tests # --------------------------------------------------------------------------- import argparse as _argparse from muse.cli.commands.cherry_pick import register as _register_cherry_pick from muse.core.paths import heads_dir, repo_json_path def _parse_cp(*args: str) -> _argparse.Namespace: """Build an argument parser via register() and parse args.""" root_p = _argparse.ArgumentParser() subs = root_p.add_subparsers(dest="cmd") _register_cherry_pick(subs) return root_p.parse_args(["cherry-pick", *args]) class TestRegisterFlags: def test_default_json_out_is_false(self) -> None: ns = _parse_cp(fake_id("a")) assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = _parse_cp(fake_id("a"), "--json") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = _parse_cp(fake_id("a"), "-j") assert ns.json_out is True def test_no_commit_flag(self) -> None: ns = _parse_cp(fake_id("a"), "--no-commit") assert ns.no_commit is True def test_no_commit_has_no_n_shorthand(self) -> None: import pytest with pytest.raises(SystemExit): _parse_cp(fake_id("a"), "-n")