"""Tests for ``muse checkout``. Coverage tiers -------------- Unit — parser flags, dead-code removal, docstring schema. Integration — switch, create, already_on, detach, --dry-run, conflict resolution. End-to-end — full CLI invocations: text and JSON output, all operations. Security — ANSI injection in target, error routing to stderr. Stress — checkout under high file counts, concurrent checkouts. """ from __future__ import annotations import json import os import pathlib import subprocess import threading import time from typing import TYPE_CHECKING import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.types import short_id from muse.core.refs import ( get_head_commit_id, read_current_branch, ) from muse.cli.config import read_branch_meta if TYPE_CHECKING: import argparse runner = CliRunner() # ────────────────────────────────────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────────────────────────────────────── def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult: saved = os.getcwd() try: os.chdir(repo) return runner.invoke(None, args) finally: os.chdir(saved) def _checkout(repo: pathlib.Path, *extra: str) -> InvokeResult: return _invoke(repo, ["checkout", *extra]) def _commit(repo: pathlib.Path, *extra: str) -> InvokeResult: _invoke(repo, ["code", "add", "."]) return _invoke(repo, ["commit", *extra]) def _branch(repo: pathlib.Path, *extra: str) -> InvokeResult: return _invoke(repo, ["branch", *extra]) @pytest.fixture() def repo(tmp_path: pathlib.Path) -> pathlib.Path: """Initialised repo with one commit on ``main``.""" saved = os.getcwd() try: os.chdir(tmp_path) runner.invoke(None, ["init"]) finally: os.chdir(saved) (tmp_path / "a.py").write_text("x = 1\n") _commit(tmp_path, "-m", "initial") return tmp_path @pytest.fixture() def two_branch_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with ``main`` and ``feat`` branches, each with unique content.""" _branch(repo, 
"feat") _checkout(repo, "feat") (repo / "feat.py").write_text("f = 1\n") _commit(repo, "-m", "feat commit") _checkout(repo, "main") return repo # ────────────────────────────────────────────────────────────────────────────── # Unit — parser flags # ────────────────────────────────────────────────────────────────────────────── class TestRegisterFlags: def _parse(self, *args: str) -> "argparse.Namespace": import argparse from muse.cli.commands.checkout import register p = argparse.ArgumentParser() sub = p.add_subparsers() register(sub) return p.parse_args(["checkout", *args]) def test_default_json_out_is_false(self) -> None: ns = self._parse("main") assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = self._parse("main", "--json") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = self._parse("main", "-j") assert ns.json_out is True def test_create_flag(self) -> None: ns = self._parse("-b", "new") assert ns.create is True def test_force_flag(self) -> None: ns = self._parse("main", "--force") assert ns.force is True def test_force_short_flag(self) -> None: ns = self._parse("main", "-f") assert ns.force is True def test_dry_run_flag(self) -> None: ns = self._parse("main", "--dry-run") assert ns.dry_run is True def test_dry_run_short_flag(self) -> None: ns = self._parse("main", "-n") assert ns.dry_run is True def test_dry_run_default_false(self) -> None: ns = self._parse("main") assert ns.dry_run is False def test_ours_flag(self) -> None: ns = self._parse("--ours", "file.py") assert ns.resolve_ours is True def test_theirs_flag(self) -> None: ns = self._parse("--theirs", "file.py") assert ns.resolve_theirs is True def test_all_flag(self) -> None: ns = self._parse("--ours", "--all") assert ns.resolve_all is True def test_target_optional(self) -> None: ns = self._parse() assert ns.target is None # ────────────────────────────────────────────────────────────────────────────── # Unit — dead-code removal # 
# ──────────────────────────────────────────────────────────────────────────────


class TestDeadCodeRemoved:
    """Guards that inspect the checkout command module for removed leftovers."""

    def test_read_current_branch_wrapper_removed(self) -> None:
        import muse.cli.commands.checkout as checkout_mod

        wrapper_present = hasattr(checkout_mod, "_read_current_branch")
        assert not wrapper_present, (
            "_read_current_branch was a dead one-liner wrapper and must be deleted"
        )

    def test_inline_sanitize_display_import_removed(self) -> None:
        import inspect
        import muse.cli.commands.checkout as checkout_mod

        run_source = inspect.getsource(checkout_mod.run)
        assert "sanitize_display as _sd" not in run_source, (
            "The inline 'from muse.core.validation import sanitize_display as _sd' "
            "inside run() was a redundant re-import of the module-level sanitize_display"
        )


# ──────────────────────────────────────────────────────────────────────────────
# Integration — SWITCH (existing branch)
# ──────────────────────────────────────────────────────────────────────────────


class TestSwitch:
    """Switching from ``main`` to an existing branch, and the failure paths."""

    def test_switch_exits_0(self, two_branch_repo: pathlib.Path) -> None:
        res = _checkout(two_branch_repo, "feat")
        assert res.exit_code == 0

    def test_switch_changes_branch(self, two_branch_repo: pathlib.Path) -> None:
        _checkout(two_branch_repo, "feat")
        assert read_current_branch(two_branch_repo) == "feat"

    def test_switch_text_output(self, two_branch_repo: pathlib.Path) -> None:
        text = _checkout(two_branch_repo, "feat").output
        assert "feat" in text
        assert "Switched" in text

    def test_switch_json_schema(self, two_branch_repo: pathlib.Path) -> None:
        res = _checkout(two_branch_repo, "feat", "--json")
        payload = json.loads(res.output)
        assert payload["action"] == "switched"
        assert payload["branch"] == "feat"
        assert payload["from_branch"] == "main"
        assert "commit_id" in payload
        assert payload.get("dry_run") is False

    def test_switch_restores_files(self, two_branch_repo: pathlib.Path) -> None:
        """Files unique to ``feat`` appear after checkout and disappear on return."""
        feat_only = two_branch_repo / "feat.py"
        _checkout(two_branch_repo, "feat")
        assert feat_only.exists()
        _checkout(two_branch_repo, "main")
        assert not feat_only.exists()

    def test_switch_to_nonexistent_exits_1(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "does-not-exist")
        assert res.exit_code == 1

    def test_switch_error_to_stderr(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "ghost")
        assert res.exit_code == 1
        # Error must appear in stderr, not exclusively stdout
        stderr_text = (res.stderr or "").lower()
        assert "not a branch" in stderr_text


# ──────────────────────────────────────────────────────────────────────────────
# Integration — ALREADY_ON
# ──────────────────────────────────────────────────────────────────────────────


class TestAlreadyOn:
    """Checking out the branch that is already current."""

    def test_already_on_exits_0(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "main")
        assert res.exit_code == 0

    def test_already_on_text(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "main")
        assert "Already on" in res.output

    def test_already_on_json_schema(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "main", "--json")
        payload = json.loads(res.output)
        assert payload["action"] == "already_on"
        assert payload["branch"] == "main"
        assert payload["from_branch"] == "main"
        assert "commit_id" in payload


# ──────────────────────────────────────────────────────────────────────────────
# Integration — FORCE on current branch (working-tree restore)
# ──────────────────────────────────────────────────────────────────────────────


class TestForceOnCurrentBranch:
    """checkout --force must restore the working tree to HEAD.

    Regression: previously this was a no-op ('Already on main') regardless of
    --force.  Git's behaviour is to restore missing/modified tracked files
    even when the branch is already current.
    """

    def test_force_current_branch_exits_0(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "--force", "main")
        assert res.exit_code == 0

    def test_force_current_branch_restores_deleted_file(self, repo: pathlib.Path) -> None:
        tracked = repo / "a.py"
        tracked.unlink()
        assert not tracked.exists()
        _checkout(repo, "--force", "main")
        assert tracked.exists(), "force checkout must restore deleted tracked file"

    def test_force_current_branch_restores_modified_file(self, repo: pathlib.Path) -> None:
        tracked = repo / "a.py"
        original = tracked.read_text()
        tracked.write_text("corrupted content\n")
        _checkout(repo, "--force", "main")
        assert tracked.read_text() == original, (
            "force checkout must restore modified tracked file to HEAD content"
        )

    def test_force_current_branch_text_output(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "--force", "main")
        assert "restored" in res.output

    def test_force_current_branch_json_action(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "--force", "main", "--json")
        payload = json.loads(res.output)
        assert payload["action"] == "restored"
        assert payload["branch"] == "main"

    def test_force_current_branch_dry_run_does_not_restore(
        self, repo: pathlib.Path
    ) -> None:
        tracked = repo / "a.py"
        tracked.unlink()
        _checkout(repo, "--force", "--dry-run", "main")
        assert not tracked.exists(), (
            "--dry-run must not actually restore files"
        )

    def test_force_current_branch_dry_run_json(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "--force", "--dry-run", "main", "--json")
        payload = json.loads(res.output)
        assert payload["dry_run"] is True
        assert payload["action"] == "restored"

    def test_without_force_still_noop_on_current_branch(
        self, repo: pathlib.Path
    ) -> None:
        """Without --force, checkout on current branch is still a no-op."""
        res = _checkout(repo, "main")
        assert "Already on" in res.output


# ──────────────────────────────────────────────────────────────────────────────
# Integration — CREATE (-b)
# ──────────────────────────────────────────────────────────────────────────────


class TestCreate:
    """``checkout -b <name>``: create a branch and switch to it."""

    def test_create_exits_0(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "-b", "new-branch")
        assert res.exit_code == 0

    def test_create_switches_to_new_branch(self, repo: pathlib.Path) -> None:
        _checkout(repo, "-b", "new-branch")
        assert read_current_branch(repo) == "new-branch"

    def test_create_text_output(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "-b", "my-branch")
        assert "my-branch" in res.output

    def test_create_json_schema(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "-b", "json-branch", "--json")
        payload = json.loads(res.output)
        assert payload["action"] == "created"
        assert payload["branch"] == "json-branch"
        assert payload["from_branch"] == "main"
        assert "commit_id" in payload
        assert payload.get("dry_run") is False

    def test_create_duplicate_exits_1(self, repo: pathlib.Path) -> None:
        _checkout(repo, "-b", "dup")
        _checkout(repo, "main")
        res = _checkout(repo, "-b", "dup")
        assert res.exit_code == 1

    def test_create_duplicate_error_to_stderr(self, repo: pathlib.Path) -> None:
        _checkout(repo, "-b", "dup2")
        _checkout(repo, "main")
        res = _checkout(repo, "-b", "dup2")
        # Error must appear in stderr
        stderr_text = (res.stderr or "").lower()
        assert "already exists" in stderr_text

    def test_create_invalid_name_exits_1(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "-b", "bad..name")
        assert res.exit_code == 1

    def test_create_invalid_name_error_to_stderr(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "-b", "bad..name")
        assert "Invalid" in (res.stderr or "")

    def test_create_with_dirty_workdir_succeeds(self, repo: pathlib.Path) -> None:
        """checkout -b must succeed even with uncommitted changes.

        Creating a new branch starts at the current HEAD — no file content
        changes, so dirty tracked files cannot be overwritten.  Blocking here
        forces an unnecessary shelf/pop dance.
        """
        tracked = repo / "a.py"
        # Dirty the tracked file without committing
        tracked.write_text("x = 2\n")
        res = _checkout(repo, "-b", "task/dirty-ok")
        assert res.exit_code == 0
        assert read_current_branch(repo) == "task/dirty-ok"
        # The dirty file must still be present (not lost)
        assert tracked.read_text() == "x = 2\n"


# ──────────────────────────────────────────────────────────────────────────────
# Integration — DIRTY WORKDIR BLEED-THROUGH (regression)
# ──────────────────────────────────────────────────────────────────────────────


@pytest.fixture()
def shared_file_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Repo where ``main`` and ``feat`` share the same file at the same content.

    Both branches commit ``shared.py`` with identical content.  This is the
    scenario that triggers the bleed-through bug: a dirty ``shared.py`` would
    not appear in the delta between the two snapshots, so the old code let it
    through silently instead of refusing the checkout.
    """
    # ``_invoke`` performs the same chdir / restore dance the fixture needs.
    _invoke(tmp_path, ["init"])
    (tmp_path / "shared.py").write_text("x = 1\n")
    _commit(tmp_path, "-m", "initial")

    # Create feat branch — shared.py is the same on both branches
    _checkout(tmp_path, "-b", "feat")
    (tmp_path / "feat_only.py").write_text("f = 1\n")
    _commit(tmp_path, "-m", "feat commit")

    _checkout(tmp_path, "main")
    return tmp_path


class TestDirtyWorkdirBleedThrough:
    """checkout must refuse when the working tree is dirty.

    Regression: the old ``require_clean_workdir`` only blocked files that the
    target branch would *overwrite*.  Files modified locally but identical on
    both branches were silently carried through, causing dirty working-tree
    state to appear on branches the user never touched — exactly the bug that
    left 80+ modified files on ``main``.

    The fix: always refuse checkout on any dirty tracked file.  Users must
    explicitly commit, shelf, use ``--autoshelf``, or ``--force``.
    """

    def test_dirty_shared_file_blocks_checkout(
        self, shared_file_repo: pathlib.Path
    ) -> None:
        """A file modified locally but identical on both branches must block checkout."""
        (shared_file_repo / "shared.py").write_text("x = 999\n")
        res = _checkout(shared_file_repo, "feat")
        assert res.exit_code == 1, (
            "checkout must refuse when shared.py is dirty, "
            "even though main and feat have the same committed version"
        )

    def test_dirty_shared_file_error_names_file(
        self, shared_file_repo: pathlib.Path
    ) -> None:
        """The refusal message must name the dirty file."""
        (shared_file_repo / "shared.py").write_text("x = 999\n")
        res = _checkout(shared_file_repo, "feat")
        assert "shared.py" in (res.stderr or ""), (
            "error message must name the dirty file"
        )

    def test_dirty_shared_file_json_error(
        self, shared_file_repo: pathlib.Path
    ) -> None:
        """JSON mode must emit a machine-readable error, not bleed through."""
        (shared_file_repo / "shared.py").write_text("x = 999\n")
        res = _checkout(shared_file_repo, "feat", "--json")
        assert res.exit_code == 1
        payload = json.loads(res.output)
        assert payload["error"] == "dirty_workdir"
        assert "shared.py" in payload["files"]

    def test_dirty_shared_file_not_silently_moved(
        self, shared_file_repo: pathlib.Path
    ) -> None:
        """After a refused checkout, we must still be on the original branch."""
        (shared_file_repo / "shared.py").write_text("x = 999\n")
        _checkout(shared_file_repo, "feat")
        assert read_current_branch(shared_file_repo) == "main", (
            "failed checkout must not switch the branch"
        )

    def test_deleted_shared_file_blocks_checkout(
        self, shared_file_repo: pathlib.Path
    ) -> None:
        """A tracked file deleted locally but present on both branches must block checkout."""
        (shared_file_repo / "shared.py").unlink()
        res = _checkout(shared_file_repo, "feat")
        assert res.exit_code == 1

    def test_force_bypasses_dirty_check(
        self, shared_file_repo: pathlib.Path
    ) -> None:
        """``--force`` must still bypass the dirty check (discards local changes)."""
        shared = shared_file_repo / "shared.py"
        shared.write_text("x = 999\n")
        res = _checkout(shared_file_repo, "--force", "feat")
        assert res.exit_code == 0
        assert read_current_branch(shared_file_repo) == "feat"
        # --force restores the target branch's version, discarding local edit
        assert shared.read_text() == "x = 1\n"

    def test_autoshelf_bypasses_dirty_check(
        self, shared_file_repo: pathlib.Path
    ) -> None:
        """``--autoshelf`` must shelve the dirty file and switch cleanly."""
        (shared_file_repo / "shared.py").write_text("x = 999\n")
        res = _checkout(shared_file_repo, "--autoshelf", "feat")
        assert res.exit_code == 0
        assert read_current_branch(shared_file_repo) == "feat"

    def test_untracked_file_does_not_block_checkout(
        self, shared_file_repo: pathlib.Path
    ) -> None:
        """A brand-new untracked file must never block checkout."""
        (shared_file_repo / "new_untracked.py").write_text("new = 1\n")
        res = _checkout(shared_file_repo, "feat")
        assert res.exit_code == 0, (
            "untracked files are never in any snapshot so checkout must allow them"
        )

    def test_create_branch_allows_dirty_workdir(
        self, shared_file_repo: pathlib.Path
    ) -> None:
        """``-b`` (create) must still succeed with dirty files — no snapshot change."""
        (shared_file_repo / "shared.py").write_text("x = 999\n")
        res = _checkout(shared_file_repo, "-b", "task/new")
        assert res.exit_code == 0, (
            "creating a branch does not change any file content; dirty tree is fine"
        )


# ──────────────────────────────────────────────────────────────────────────────
# Integration — DETACH HEAD
# ──────────────────────────────────────────────────────────────────────────────


class TestDetach:
    """Checking out a commit id (full or abbreviated) instead of a branch name."""

    def test_detach_full_sha_exits_0(self, repo: pathlib.Path) -> None:
        head_sha = get_head_commit_id(repo, "main")
        assert head_sha is not None
        res = _checkout(repo, head_sha)
        assert res.exit_code == 0

    def test_detach_full_sha_text_output(self, repo: pathlib.Path) -> None:
        head_sha = get_head_commit_id(repo, "main")
        assert head_sha is not None
        res = _checkout(repo, head_sha)
        # Output shows sha256: prefix + 8 hex chars — canonical and algorithm-identifying.
        shown_len = len("sha256:") + 8
        assert head_sha[:shown_len] in res.output

    def test_detach_full_sha_json_schema(self, repo: pathlib.Path) -> None:
        head_sha = get_head_commit_id(repo, "main")
        assert head_sha is not None
        res = _checkout(repo, head_sha, "--json")
        payload = json.loads(res.output)
        assert payload["action"] == "detached"
        assert payload["branch"] is None
        assert payload["commit_id"] == head_sha
        assert payload["from_branch"] == "main"
        assert payload.get("dry_run") is False

    def test_detach_partial_sha_exits_0(self, repo: pathlib.Path) -> None:
        head_sha = get_head_commit_id(repo, "main")
        assert head_sha is not None
        # Pass bare hex prefix to checkout — the command resolves it
        abbreviated = short_id(head_sha, strip=True)
        res = _checkout(repo, abbreviated)
        assert res.exit_code == 0

    def test_detach_partial_sha_points_to_correct_commit(self, repo: pathlib.Path) -> None:
        """A partial SHA must resolve to the correct commit, not be treated as a branch."""
        # ``read_current_branch`` comes from the module-level import.
        from muse.core.commits import get_commits_for_branch

        (repo / "b.py").write_text("b=1\n")
        _commit(repo, "-m", "second")

        current = read_current_branch(repo)
        history = get_commits_for_branch(repo, current)
        first_sha = history[-1].commit_id  # oldest

        # Pass bare hex prefix to checkout — the command resolves it
        abbreviated = short_id(first_sha, strip=True)
        res = _checkout(repo, abbreviated)
        assert res.exit_code == 0
        shown_len = len("sha256:") + 8
        assert first_sha[:shown_len] in res.output

    def test_detach_bad_ref_exits_1(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "deadbeefdeadbeef")
        assert res.exit_code == 1

    def test_detach_error_to_stderr(self, repo: pathlib.Path) -> None:
        res = _checkout(repo, "deadbeefdeadbeef")
        stderr_text = (res.stderr or "").lower()
        assert "not a branch" in stderr_text


# ──────────────────────────────────────────────────────────────────────────────
# Integration — DETACHED HEAD RECOVERY (checkout branch from detached HEAD)
────────────────────────────────────────────────────────────────────────────── class TestDetachedHeadRecovery: """Recovering from detached HEAD state via 'muse checkout '.""" @pytest.fixture() def detached_repo(self, repo: pathlib.Path) -> pathlib.Path: """Repo with HEAD detached at the main commit.""" sha = get_head_commit_id(repo, "main") assert sha is not None _checkout(repo, sha) return repo def test_recover_to_branch_exits_0(self, detached_repo: pathlib.Path) -> None: result = _checkout(detached_repo, "main") assert result.exit_code == 0 def test_recover_to_branch_restores_symbolic_head(self, detached_repo: pathlib.Path) -> None: _checkout(detached_repo, "main") assert read_current_branch(detached_repo) == "main" def test_recover_to_branch_text_output(self, detached_repo: pathlib.Path) -> None: result = _checkout(detached_repo, "main") assert "main" in result.output def test_recover_to_branch_json_action(self, detached_repo: pathlib.Path) -> None: result = _checkout(detached_repo, "main", "--json") data = json.loads(result.output) assert data["action"] in ("switched", "already_on") assert data["branch"] == "main" def test_recover_to_branch_from_branch_is_null(self, detached_repo: pathlib.Path) -> None: """from_branch is None when recovering from detached HEAD.""" result = _checkout(detached_repo, "main", "--json") data = json.loads(result.output) assert data["from_branch"] is None def test_create_branch_from_detached_exits_0(self, detached_repo: pathlib.Path) -> None: result = _checkout(detached_repo, "-b", "rescue") assert result.exit_code == 0 def test_create_branch_from_detached_switches_head(self, detached_repo: pathlib.Path) -> None: _checkout(detached_repo, "-b", "rescue") assert read_current_branch(detached_repo) == "rescue" def test_create_branch_from_detached_inherits_commit(self, detached_repo: pathlib.Path) -> None: sha = get_head_commit_id(detached_repo, "main") _checkout(detached_repo, "-b", "rescue") assert get_head_commit_id(detached_repo, 
"rescue") == sha def test_dry_run_recover_exits_0(self, detached_repo: pathlib.Path) -> None: result = _checkout(detached_repo, "--dry-run", "main") assert result.exit_code == 0 def test_dry_run_recover_does_not_change_head(self, detached_repo: pathlib.Path) -> None: _checkout(detached_repo, "--dry-run", "main") from muse.core.refs import read_head state = read_head(detached_repo) assert state["kind"] == "commit", "HEAD must still be detached after dry-run" def test_merge_flag_from_detached_exits_1(self, detached_repo: pathlib.Path) -> None: result = _checkout(detached_repo, "--merge", "main") assert result.exit_code == 1 def test_merge_flag_from_detached_stderr_message(self, detached_repo: pathlib.Path) -> None: result = _checkout(detached_repo, "--merge", "main") assert "detach" in (result.stderr or "").lower() or "branch" in (result.stderr or "").lower() # ────────────────────────────────────────────────────────────────────────────── # Integration — DRY-RUN # ────────────────────────────────────────────────────────────────────────────── class TestDryRun: def test_dry_run_switch_exits_0(self, two_branch_repo: pathlib.Path) -> None: result = _checkout(two_branch_repo, "--dry-run", "feat") assert result.exit_code == 0 def test_dry_run_does_not_switch_branch(self, two_branch_repo: pathlib.Path) -> None: _checkout(two_branch_repo, "--dry-run", "feat") assert read_current_branch(two_branch_repo) == "main" def test_dry_run_text_says_would(self, two_branch_repo: pathlib.Path) -> None: result = _checkout(two_branch_repo, "--dry-run", "feat") assert "Would" in result.output assert "feat" in result.output def test_dry_run_json_schema(self, two_branch_repo: pathlib.Path) -> None: result = _checkout(two_branch_repo, "--dry-run", "feat", "--json") data = json.loads(result.output) assert data["dry_run"] is True assert data["action"] == "switched" assert data["branch"] == "feat" assert data["from_branch"] == "main" def test_dry_run_does_not_restore_files(self, two_branch_repo: 
pathlib.Path) -> None: """feat.py exists only on feat branch; dry-run must not create it on main.""" _checkout(two_branch_repo, "--dry-run", "feat") assert not (two_branch_repo / "feat.py").exists() def test_dry_run_create_exits_0(self, repo: pathlib.Path) -> None: result = _checkout(repo, "-b", "dry-branch", "--dry-run") assert result.exit_code == 0 def test_dry_run_create_does_not_create_branch(self, repo: pathlib.Path) -> None: _checkout(repo, "-b", "dry-branch", "--dry-run") result = _invoke(repo, ["branch", "--json"]) names = [b["name"] for b in json.loads(result.output)] assert "dry-branch" not in names def test_dry_run_create_json_schema(self, repo: pathlib.Path) -> None: result = _checkout(repo, "-b", "dry-new", "--dry-run", "--json") data = json.loads(result.output) assert data["dry_run"] is True assert data["action"] == "created" assert data["from_branch"] == "main" def test_dry_run_detach_exits_0(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") assert sha is not None result = _checkout(repo, "--dry-run", sha) assert result.exit_code == 0 def test_dry_run_detach_does_not_detach(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") assert sha is not None _checkout(repo, "--dry-run", sha) assert read_current_branch(repo) == "main" def test_dry_run_detach_json(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") assert sha is not None result = _checkout(repo, "--dry-run", sha, "--json") data = json.loads(result.output) assert data["dry_run"] is True assert data["action"] == "detached" assert data["branch"] is None def test_dry_run_nonexistent_branch_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--dry-run", "no-such-branch") assert result.exit_code == 1 def test_dry_run_already_on_exits_0(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--dry-run", "main") assert result.exit_code == 0 def test_dry_run_already_on_json(self, repo: pathlib.Path) -> None: result = 
_checkout(repo, "--dry-run", "main", "--json") data = json.loads(result.output) assert data["dry_run"] is True assert data["action"] == "already_on" # ────────────────────────────────────────────────────────────────────────────── # Integration — JSON schema consistency # ────────────────────────────────────────────────────────────────────────────── class TestJsonSchema: REQUIRED_KEYS = {"action", "branch", "commit_id", "from_branch", "dry_run"} def test_create_has_all_keys(self, repo: pathlib.Path) -> None: result = _checkout(repo, "-b", "k-test", "--json") data = json.loads(result.output) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in 'created' JSON: {missing}" def test_switch_has_all_keys(self, two_branch_repo: pathlib.Path) -> None: result = _checkout(two_branch_repo, "feat", "--json") data = json.loads(result.output) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in 'switched' JSON: {missing}" def test_already_on_has_all_keys(self, repo: pathlib.Path) -> None: result = _checkout(repo, "main", "--json") data = json.loads(result.output) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in 'already_on' JSON: {missing}" def test_detach_has_all_keys(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") assert sha is not None result = _checkout(repo, sha, "--json") data = json.loads(result.output) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in 'detached' JSON: {missing}" def test_detach_branch_is_null(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") assert sha is not None result = _checkout(repo, sha, "--json") data = json.loads(result.output) assert data["branch"] is None def test_from_branch_reflects_previous(self, two_branch_repo: pathlib.Path) -> None: _checkout(two_branch_repo, "feat") result = _checkout(two_branch_repo, "main", "--json") data = json.loads(result.output) assert data["from_branch"] == 
"feat" # ────────────────────────────────────────────────────────────────────────────── # Integration — validation # ────────────────────────────────────────────────────────────────────────────── class TestValidation: def test_no_target_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo) assert result.exit_code == 1 def test_no_target_error_to_stderr(self, repo: pathlib.Path) -> None: result = _checkout(repo) assert "Specify" in (result.stderr or "") def test_unknown_flag_exits_nonzero(self, repo: pathlib.Path) -> None: result = _checkout(repo, "main", "--no-such-flag") assert result.exit_code != 0 def test_ours_without_theirs_context_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--ours", "file.py") assert result.exit_code == 1 def test_ours_and_theirs_together_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--ours", "--theirs", "--all") assert result.exit_code == 1 # ────────────────────────────────────────────────────────────────────────────── # Security — ANSI injection # ────────────────────────────────────────────────────────────────────────────── class TestSecurityAnsi: def _has_ansi(self, s: str) -> bool: return "\x1b[" in s def test_ansi_in_target_sanitized(self, repo: pathlib.Path) -> None: result = _checkout(repo, "\x1b[31mmalicious\x1b[0m") assert not self._has_ansi(result.output) def test_ansi_in_create_name_sanitized(self, repo: pathlib.Path) -> None: result = _checkout(repo, "-b", "\x1b[31mmalicious\x1b[0m") assert not self._has_ansi(result.output) def test_error_not_a_branch_sanitized(self, repo: pathlib.Path) -> None: """The 'not a branch' error message must not echo raw ANSI from target.""" result = _checkout(repo, "\x1b[31mnotabranch\x1b[0m") assert not self._has_ansi(result.output) assert not self._has_ansi(result.stderr or "") def test_all_errors_to_stderr(self, repo: pathlib.Path) -> None: """Every ❌ error must go to stderr; stderr must contain the error.""" error_cases = [ ["ghost"], 
["-b", "bad..name"], ] for case in error_cases: result = _checkout(repo, *case) assert result.exit_code != 0, f"Expected failure for args {case}" assert "❌" in (result.stderr or ""), ( f"Error not in stderr for args {case}: stderr={result.stderr!r}" ) # ────────────────────────────────────────────────────────────────────────────── # Integration — conflict resolution # ────────────────────────────────────────────────────────────────────────────── class TestConflictResolution: def _setup_merge_conflict( self, repo: pathlib.Path ) -> tuple[str, str]: """Create a merge conflict on ``repo``. Returns (ours_commit, theirs_commit).""" # ours: commit on main (repo / "shared.py").write_text("x = 1\n") _commit(repo, "-m", "main: set x=1") ours_cid = get_head_commit_id(repo, "main") or "" # theirs: commit on feature branch _branch(repo, "feat2") _invoke(repo, ["checkout", "feat2"]) (repo / "shared.py").write_text("x = 2\n") _commit(repo, "-m", "feat: set x=2") theirs_cid = get_head_commit_id(repo, "feat2") or "" _invoke(repo, ["checkout", "main"]) # Force a merge conflict via merge_engine internals from muse.core.merge_engine import write_merge_state write_merge_state( repo, base_commit="", ours_commit=ours_cid, theirs_commit=theirs_cid, conflict_paths=["shared.py"], other_branch="feat2", ) return ours_cid, theirs_cid def test_ours_no_merge_state_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--ours", "file.py") assert result.exit_code == 1 def test_theirs_no_merge_state_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--theirs", "file.py") assert result.exit_code == 1 def test_ours_and_theirs_both_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--ours", "--theirs", "--all") assert result.exit_code == 1 def test_ours_resolves_conflict(self, repo: pathlib.Path) -> None: self._setup_merge_conflict(repo) result = _checkout(repo, "--ours", "shared.py") assert result.exit_code == 0 def 
test_theirs_resolves_conflict(self, repo: pathlib.Path) -> None: self._setup_merge_conflict(repo) result = _checkout(repo, "--theirs", "shared.py") assert result.exit_code == 0 def test_resolve_all_ours_json(self, repo: pathlib.Path) -> None: self._setup_merge_conflict(repo) result = _checkout(repo, "--ours", "--all", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["action"] == "conflict_resolved_all" assert data["side"] == "ours" assert "resolved_count" in data assert "remaining_conflicts" in data def test_resolve_all_theirs_json(self, repo: pathlib.Path) -> None: self._setup_merge_conflict(repo) result = _checkout(repo, "--theirs", "--all", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["action"] == "conflict_resolved_all" assert data["side"] == "theirs" def test_resolve_single_file_json(self, repo: pathlib.Path) -> None: self._setup_merge_conflict(repo) result = _checkout(repo, "--ours", "shared.py", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["action"] == "conflict_resolved" assert data["file"] == "shared.py" assert data["side"] == "ours" assert "remaining_conflicts" in data def test_resolve_all_empty_conflicts_exits_0(self, repo: pathlib.Path) -> None: """--ours --all when no conflicts exist still exits 0.""" from muse.core.merge_engine import write_merge_state ours = get_head_commit_id(repo, "main") or "" write_merge_state( repo, base_commit="", ours_commit=ours, theirs_commit=ours, conflict_paths=[], other_branch="feat", ) result = _checkout(repo, "--ours", "--all") assert result.exit_code == 0 def test_resolve_nonexistent_path_exits_0(self, repo: pathlib.Path) -> None: """A path not in the conflict list is informational, not an error.""" self._setup_merge_conflict(repo) result = _checkout(repo, "--ours", "not_conflicted.py") assert result.exit_code == 0 def test_missing_ours_theirs_without_all_exits_1(self, repo: pathlib.Path) -> None: result = 
_checkout(repo, "--ours") assert result.exit_code == 1 # ────────────────────────────────────────────────────────────────────────────── # Stress # ────────────────────────────────────────────────────────────────────────────── @pytest.mark.slow class TestStress: def test_checkout_100_file_branch_fast(self, repo: pathlib.Path) -> None: """Switching between branches with 100 modified files under 2s.""" for i in range(100): (repo / f"f{i:03d}.py").write_text(f"x={i}\n") _commit(repo, "-m", "big main") _branch(repo, "big-alt") _checkout(repo, "big-alt") for i in range(100): (repo / f"f{i:03d}.py").write_text(f"y={i}\n") _commit(repo, "-m", "big alt") _checkout(repo, "main") t0 = time.perf_counter() result = _checkout(repo, "big-alt") elapsed = (time.perf_counter() - t0) * 1000 assert result.exit_code == 0 assert elapsed < 2000, f"checkout 100-file branch took {elapsed:.0f}ms (limit 2s)" def test_dry_run_100_file_branch_fast(self, repo: pathlib.Path) -> None: """dry-run on 100-file branch should be very fast (no restore).""" for i in range(100): (repo / f"g{i:03d}.py").write_text(f"x={i}\n") _commit(repo, "-m", "big2") _branch(repo, "big2-alt") t0 = time.perf_counter() result = _checkout(repo, "--dry-run", "big2-alt") elapsed = (time.perf_counter() - t0) * 1000 assert result.exit_code == 0 assert elapsed < 500, f"dry-run took {elapsed:.0f}ms (limit 500ms)" def test_concurrent_checkouts_separate_repos(self, tmp_path: pathlib.Path) -> None: """Multiple threads checking out branches in separate repos must not interfere.""" errors: list[str] = [] def do_checkout(idx: int) -> None: repo_dir = tmp_path / f"repo_{idx}" repo_dir.mkdir() subprocess.run(["muse", "init"], cwd=str(repo_dir), capture_output=True) (repo_dir / "x.py").write_text(f"x={idx}\n") subprocess.run( ["muse", "commit", "-m", f"base{idx}"], cwd=str(repo_dir), capture_output=True, ) subprocess.run( ["muse", "branch", "alt"], cwd=str(repo_dir), capture_output=True ) subprocess.run( ["muse", "checkout", "alt"], 
cwd=str(repo_dir), capture_output=True ) (repo_dir / "y.py").write_text(f"y={idx}\n") subprocess.run( ["muse", "commit", "-m", f"alt{idx}"], cwd=str(repo_dir), capture_output=True, ) r = subprocess.run( ["muse", "checkout", "main", "--json"], cwd=str(repo_dir), capture_output=True, text=True, ) if r.returncode != 0: errors.append(f"repo_{idx}: checkout failed") return data = json.loads(r.stdout) if data.get("action") != "switched": errors.append(f"repo_{idx}: expected switched, got {data.get('action')}") threads = [threading.Thread(target=do_checkout, args=(i,)) for i in range(6)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent checkout errors:\n{'\n'.join(errors)}" def test_repeated_back_and_forth_100_times(self, two_branch_repo: pathlib.Path) -> None: """Switching back and forth 100 times must not corrupt the working tree.""" for i in range(50): r1 = _checkout(two_branch_repo, "feat") assert r1.exit_code == 0, f"Iteration {i}: switch to feat failed" assert (two_branch_repo / "feat.py").exists() r2 = _checkout(two_branch_repo, "main") assert r2.exit_code == 0, f"Iteration {i}: switch to main failed" # ────────────────────────────────────────────────────────────────────────────── # TestCheckoutMerge — muse checkout -m (Cohen Transform carry-forward) # ────────────────────────────────────────────────────────────────────────────── def _make_diverged_repo(tmp_path: pathlib.Path) -> pathlib.Path: """Repo with main and *other* branches that have diverged file content. Layout after setup:: main: shared.py = "line1\\nline2\\nline3\\n" (committed) other: shared.py = "line1\\nLINE2\\nline3\\n" (committed — different line 2) The caller is left on *main* with a dirty working tree. 
""" saved = os.getcwd() try: os.chdir(tmp_path) runner.invoke(None, ["init"]) finally: os.chdir(saved) (tmp_path / "shared.py").write_text("line1\nline2\nline3\n") _commit(tmp_path, "-m", "initial") # Create other branch with a different version of shared.py _branch(tmp_path, "other") _checkout(tmp_path, "other") (tmp_path / "shared.py").write_text("line1\nLINE2\nline3\n") _commit(tmp_path, "-m", "other changes line2") # Back on main _checkout(tmp_path, "main") return tmp_path class TestCheckoutMergeParser: """Parser-level tests for the ``-m`` / ``--merge`` flag.""" def _parse(self, *args: str) -> "argparse.Namespace": import argparse from muse.cli.commands.checkout import register parser = argparse.ArgumentParser() sub = parser.add_subparsers() register(sub) return parser.parse_args(["checkout", *args]) def test_merge_short_flag_parsed(self) -> None: ns = self._parse("-m", "feat") assert ns.merge is True def test_merge_long_flag_parsed(self) -> None: ns = self._parse("--merge", "feat") assert ns.merge is True def test_merge_false_by_default(self) -> None: ns = self._parse("feat") assert ns.merge is False def test_merge_and_dry_run_coexist(self) -> None: ns = self._parse("-m", "--dry-run", "feat") assert ns.merge is True assert ns.dry_run is True def test_merge_and_json_coexist(self) -> None: ns = self._parse("-m", "--json", "feat") assert ns.merge is True assert ns.json_out is True class TestCheckoutMergeClean: """Clean-merge scenarios — no conflict markers should appear.""" def test_untracked_file_survives_checkout(self, repo: pathlib.Path) -> None: """Untracked files must not be disturbed by -m checkout.""" _branch(repo, "feat") (repo / "untracked.txt").write_text("I am untracked\n") r = _checkout(repo, "-m", "feat") assert r.exit_code == 0 assert (repo / "untracked.txt").read_text() == "I am untracked\n" def test_ours_only_change_carried_cleanly(self, tmp_path: pathlib.Path) -> None: """When we modify a file that target branch left untouched, the change 
carries. 'clean' branch diverged by adding a brand-new file, leaving shared.py identical to main's HEAD. Our uncommitted change to shared.py has no competition from the target and must merge cleanly. """ saved = os.getcwd() try: os.chdir(tmp_path) runner.invoke(None, ["init"]) finally: os.chdir(saved) (tmp_path / "shared.py").write_text("line1\nline2\nline3\n") _commit(tmp_path, "-m", "initial") # Create 'clean' branch — only adds a new file, does NOT touch shared.py _branch(tmp_path, "clean") _checkout(tmp_path, "clean") (tmp_path / "extra.py").write_text("# extra\n") _commit(tmp_path, "-m", "add extra.py") _checkout(tmp_path, "main") # Dirty workdir on main: modify shared.py (tmp_path / "shared.py").write_text("line1\nline2\nLINE3\n") r = _checkout(tmp_path, "-m", "clean") assert r.exit_code == 0 content = (tmp_path / "shared.py").read_text() assert "LINE3" in content, "Our uncommitted change must be in merged result" def test_clean_merge_json_output(self, tmp_path: pathlib.Path) -> None: """JSON output reports clean_merges and empty conflicts on success.""" saved = os.getcwd() try: os.chdir(tmp_path) runner.invoke(None, ["init"]) finally: os.chdir(saved) (tmp_path / "shared.py").write_text("line1\nline2\nline3\n") _commit(tmp_path, "-m", "initial") _branch(tmp_path, "clean") _checkout(tmp_path, "clean") (tmp_path / "extra.py").write_text("# extra\n") _commit(tmp_path, "-m", "add extra.py") _checkout(tmp_path, "main") (tmp_path / "shared.py").write_text("line1\nline2\nLINE3\n") r = _checkout(tmp_path, "-m", "--json", "clean") assert r.exit_code == 0 data = json.loads(r.output) assert data["action"] == "switched" assert data["branch"] == "clean" assert isinstance(data["clean_merges"], list) assert isinstance(data["conflicts"], list) assert len(data["conflicts"]) == 0 def test_switched_branch_recorded(self, tmp_path: pathlib.Path) -> None: """After a clean -m checkout the current branch is the target.""" repo = _make_diverged_repo(tmp_path) (repo / 
"shared.py").write_text("line1\nline2\nLINE3\n") _checkout(repo, "-m", "other") assert read_current_branch(repo) == "other" def test_no_dirty_files_succeeds_silently(self, repo: pathlib.Path) -> None: """If the working tree is clean, -m behaves like a normal checkout.""" _branch(repo, "feat") r = _checkout(repo, "-m", "feat") assert r.exit_code == 0 assert read_current_branch(repo) == "feat" def test_dry_run_does_not_switch_branch(self, tmp_path: pathlib.Path) -> None: """-m --dry-run must not actually switch branches.""" repo = _make_diverged_repo(tmp_path) (repo / "shared.py").write_text("line1\nline2\nLINE3\n") r = _checkout(repo, "-m", "--dry-run", "other") assert r.exit_code == 0 assert read_current_branch(repo) == "main" def test_dry_run_json_reports_dry_run_true(self, tmp_path: pathlib.Path) -> None: repo = _make_diverged_repo(tmp_path) (repo / "shared.py").write_text("line1\nline2\nLINE3\n") r = _checkout(repo, "-m", "--dry-run", "--json", "other") assert r.exit_code == 0 data = json.loads(r.output) assert data["dry_run"] is True assert data["branch"] == "other" class TestCheckoutMergeConflict: """Conflict scenarios — conflict markers and MERGE_STATE must appear.""" def test_conflicting_change_exits_1(self, tmp_path: pathlib.Path) -> None: """Same line changed on both sides → conflict → exit code 1.""" repo = _make_diverged_repo(tmp_path) # Also change line2 on main (uncommitted) — same line other branch changed (repo / "shared.py").write_text("line1\nOURS_LINE2\nline3\n") r = _checkout(repo, "-m", "other") assert r.exit_code == 1 def test_conflict_markers_written_to_file(self, tmp_path: pathlib.Path) -> None: """Conflicting file must contain diff3-style conflict markers after -m.""" repo = _make_diverged_repo(tmp_path) (repo / "shared.py").write_text("line1\nOURS_LINE2\nline3\n") _checkout(repo, "-m", "other") content = (repo / "shared.py").read_text() assert "<<<<<<<" in content assert "=======" in content assert ">>>>>>>" in content def 
test_conflict_markers_contain_cohen_action_labels(self, tmp_path: pathlib.Path) -> None: """Cohen-style action labels ([modified], [inserted], [deleted]) must appear.""" repo = _make_diverged_repo(tmp_path) (repo / "shared.py").write_text("line1\nOURS_LINE2\nline3\n") _checkout(repo, "-m", "other") content = (repo / "shared.py").read_text() # At least one action label must be present assert any(label in content for label in ("[modified]", "[inserted]", "[deleted]")) def test_merge_state_written_on_conflict(self, tmp_path: pathlib.Path) -> None: """MERGE_STATE.json must exist after a conflicting -m checkout.""" repo = _make_diverged_repo(tmp_path) (repo / "shared.py").write_text("line1\nOURS_LINE2\nline3\n") _checkout(repo, "-m", "other") merge_state_file = merge_state_path(repo) assert merge_state_file.exists() def test_merge_state_lists_conflict_path(self, tmp_path: pathlib.Path) -> None: """MERGE_STATE.json must name the conflicting path.""" repo = _make_diverged_repo(tmp_path) (repo / "shared.py").write_text("line1\nOURS_LINE2\nline3\n") _checkout(repo, "-m", "other") data = json.loads((merge_state_path(repo)).read_text()) assert "shared.py" in data.get("conflict_paths", []) def test_conflict_json_output_contains_conflicts_list(self, tmp_path: pathlib.Path) -> None: """JSON output must list the conflict paths even on exit code 1.""" repo = _make_diverged_repo(tmp_path) (repo / "shared.py").write_text("line1\nOURS_LINE2\nline3\n") r = _checkout(repo, "-m", "--json", "other") data = json.loads(r.output) assert "conflicts" in data assert len(data["conflicts"]) >= 1 def test_branch_is_switched_despite_conflict(self, tmp_path: pathlib.Path) -> None: """Even when conflicts exist, we are on the target branch after -m.""" repo = _make_diverged_repo(tmp_path) (repo / "shared.py").write_text("line1\nOURS_LINE2\nline3\n") _checkout(repo, "-m", "other") assert read_current_branch(repo) == "other" class TestCheckoutMergeErrors: """Error cases for -m flag.""" def 
test_merge_with_create_flag_ignored(self, repo: pathlib.Path) -> None: """``-m -b new-branch`` must NOT invoke _checkout_with_merge (target doesn't exist).""" # -m with -b falls through to regular create logic; no conflict-carry r = _checkout(repo, "-m", "-b", "new-branch") # Should succeed as a regular branch creation (no carry logic for new branches) assert r.exit_code == 0 def test_merge_missing_branch_errors(self, repo: pathlib.Path) -> None: """``-m nonexistent`` must exit 1 with an error message.""" r = _checkout(repo, "-m", "nonexistent") assert r.exit_code == 1 def test_merge_already_on_branch_is_noop(self, repo: pathlib.Path) -> None: """``-m main`` when already on main must print 'Already on' and exit 0.""" r = _checkout(repo, "-m", "main") assert r.exit_code == 0 assert "Already on" in r.output or "already" in r.output.lower() def test_merge_with_invalid_branch_name_errors(self, repo: pathlib.Path) -> None: """``-m ..invalid`` must exit 1 before attempting any merge.""" r = _checkout(repo, "-m", "../bad/name") assert r.exit_code == 1 # ────────────────────────────────────────────────────────────────────────────── # Non-conflicting carry-forward: checkout with dirty files that the # target branch does not touch should succeed without --merge or --autoshelf. # # This is the core ergonomics fix: an agent editing app.py on dev, then doing # checkout always refuses a dirty tracked file regardless of whether the target # branch shares the same committed version — see TestDirtyWorkdirBleedThrough. # Untracked files are never blocked (they are not in any snapshot). 
# ──────────────────────────────────────────────────────────────────────────────


class TestSwitchWithNonConflictingChanges:
    """Plain ``checkout`` with a dirty or untracked working tree."""

    @pytest.fixture()
    def two_branch_clean(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Repo with main and feat; feat adds a NEW file, leaves shared.py alone."""
        _invoke(tmp_path, ["init"])

        (tmp_path / "shared.py").write_text("x = 1\n")
        _commit(tmp_path, "-m", "initial")

        # Create feat branch with an extra file — shared.py is untouched
        _checkout(tmp_path, "-b", "feat")
        (tmp_path / "feat_only.py").write_text("f = 1\n")
        _commit(tmp_path, "-m", "feat commit")
        _checkout(tmp_path, "main")
        return tmp_path

    def test_switch_still_blocks_on_true_conflict(
        self, tmp_path: pathlib.Path
    ) -> None:
        """When the target branch has a different version of a modified file, block."""
        _invoke(tmp_path, ["init"])

        (tmp_path / "shared.py").write_text("x = 1\n")
        _commit(tmp_path, "-m", "initial")

        # feat branch changes shared.py
        _checkout(tmp_path, "-b", "feat")
        (tmp_path / "shared.py").write_text("x = feat\n")
        _commit(tmp_path, "-m", "feat changes shared")
        _checkout(tmp_path, "main")

        # Now dirty shared.py locally — feat has a different version → must block
        (tmp_path / "shared.py").write_text("x = local\n")
        result = _checkout(tmp_path, "feat")
        assert result.exit_code != 0

    def test_switch_new_file_carries_through(
        self, two_branch_clean: pathlib.Path
    ) -> None:
        """Brand-new untracked files always carry through (unchanged behaviour)."""
        repo = two_branch_clean
        (repo / "new_file.py").write_text("new = True\n")
        result = _checkout(repo, "feat")
        assert result.exit_code == 0
        assert (repo / "new_file.py").exists()


# ──────────────────────────────────────────────────────────────────────────────
# Agent supercharge — duration_ms and exit_code in every JSON output
# ──────────────────────────────────────────────────────────────────────────────


class TestElapsed:
    """Every JSON output path must include a ``duration_ms`` float."""

    def test_switch_json_has_elapsed(self, two_branch_repo: pathlib.Path) -> None:
        result = _checkout(two_branch_repo, "feat", "--json")
        data = json.loads(result.output)
        assert "duration_ms" in data
        assert isinstance(data["duration_ms"], float)

    def test_create_json_has_elapsed(self, repo: pathlib.Path) -> None:
        result = _checkout(repo, "-b", "elapsed-branch", "--json")
        data = json.loads(result.output)
        assert "duration_ms" in data
        assert isinstance(data["duration_ms"], float)

    def test_already_on_json_has_elapsed(self, repo: pathlib.Path) -> None:
        result = _checkout(repo, "main", "--json")
        data = json.loads(result.output)
        assert "duration_ms" in data
        assert isinstance(data["duration_ms"], float)

    def test_detach_json_has_elapsed(self, repo: pathlib.Path) -> None:
        sha = get_head_commit_id(repo, "main")
        assert sha is not None
        result = _checkout(repo, sha, "--json")
        data = json.loads(result.output)
        assert "duration_ms" in data
        assert isinstance(data["duration_ms"], float)

    def test_dry_run_switch_json_has_elapsed(self, two_branch_repo: pathlib.Path) -> None:
        result = _checkout(two_branch_repo, "--dry-run", "feat", "--json")
        data = json.loads(result.output)
        assert "duration_ms" in data

    def test_dry_run_create_json_has_elapsed(self, repo: pathlib.Path) -> None:
        result = _checkout(repo, "-b", "dry-elapsed", "--dry-run", "--json")
        data = json.loads(result.output)
        assert "duration_ms" in data

    def test_dry_run_detach_json_has_elapsed(self, repo: pathlib.Path) -> None:
        sha = get_head_commit_id(repo, "main")
        assert sha is not None
        result = _checkout(repo, "--dry-run", sha, "--json")
        data = json.loads(result.output)
        assert "duration_ms" in data

    def test_restored_json_has_elapsed(self, repo: pathlib.Path) -> None:
        result = _checkout(repo, "--force", "main", "--json")
        data = json.loads(result.output)
        assert "duration_ms" in data

    def test_conflict_resolved_all_json_has_elapsed(self, repo: pathlib.Path) -> None:
        from muse.core.merge_engine import write_merge_state

        ours = get_head_commit_id(repo, "main") or ""
        write_merge_state(
            repo,
            base_commit="",
            ours_commit=ours,
            theirs_commit=ours,
            conflict_paths=["a.py"],
            other_branch="feat",
        )
        result = _checkout(repo, "--ours", "--all", "--json")
        data = json.loads(result.output)
        assert "duration_ms" in data

    def test_conflict_resolved_single_json_has_elapsed(self, repo: pathlib.Path) -> None:
        from muse.core.merge_engine import write_merge_state

        # get_head_commit_id is already imported at module level.
        ours = get_head_commit_id(repo, "main") or ""
        write_merge_state(
            repo,
            base_commit="",
            ours_commit=ours,
            theirs_commit=ours,
            conflict_paths=["a.py"],
            other_branch="feat",
        )
        result = _checkout(repo, "--ours", "a.py", "--json")
        data = json.loads(result.output)
        assert "duration_ms" in data


class TestExitCode:
    """Every successful JSON output path must include ``exit_code: 0``."""

    def test_switch_json_exit_code_0(self, two_branch_repo: pathlib.Path) -> None:
        result = _checkout(two_branch_repo, "feat", "--json")
        data = json.loads(result.output)
        assert data["exit_code"] == 0

    def test_create_json_exit_code_0(self, repo: pathlib.Path) -> None:
        result = _checkout(repo, "-b", "ec-branch", "--json")
        data = json.loads(result.output)
        assert data["exit_code"] == 0

    def test_already_on_json_exit_code_0(self, repo: pathlib.Path) -> None:
        result = _checkout(repo, "main", "--json")
        data = json.loads(result.output)
        assert data["exit_code"] == 0

    def test_detach_json_exit_code_0(self, repo: pathlib.Path) -> None:
        sha = get_head_commit_id(repo, "main")
        assert sha is not None
        result = _checkout(repo, sha, "--json")
        data = json.loads(result.output)
        assert data["exit_code"] == 0

    def test_dry_run_switch_json_exit_code_0(self, two_branch_repo: pathlib.Path) -> None:
        result = _checkout(two_branch_repo, "--dry-run", "feat", "--json")
        data = json.loads(result.output)
        assert data["exit_code"] == 0

    def test_dry_run_create_json_exit_code_0(self, repo: pathlib.Path) -> None:
        result = _checkout(repo, "-b", "dry-ec", "--dry-run", "--json")
        data = json.loads(result.output)
        assert data["exit_code"] == 0

    def test_dry_run_detach_json_exit_code_0(self, repo: pathlib.Path) -> None:
        sha = get_head_commit_id(repo, "main")
        assert sha is not None
        result = _checkout(repo, "--dry-run", sha, "--json")
        data = json.loads(result.output)
        assert data["exit_code"] == 0

    def test_restored_json_exit_code_0(self, repo: pathlib.Path) -> None:
        result = _checkout(repo, "--force", "main", "--json")
        data = json.loads(result.output)
        assert data["exit_code"] == 0

    def test_conflict_resolved_all_json_exit_code_0(self, repo: pathlib.Path) -> None:
        from muse.core.merge_engine import write_merge_state

        ours = get_head_commit_id(repo, "main") or ""
        write_merge_state(
            repo,
            base_commit="",
            ours_commit=ours,
            theirs_commit=ours,
            conflict_paths=["a.py"],
            other_branch="feat",
        )
        result = _checkout(repo, "--ours", "--all", "--json")
        data = json.loads(result.output)
        assert data["exit_code"] == 0

    def test_conflict_resolved_single_json_exit_code_0(self, repo: pathlib.Path) -> None:
        from muse.core.merge_engine import write_merge_state

        # get_head_commit_id is already imported at module level.
        ours = get_head_commit_id(repo, "main") or ""
        write_merge_state(
            repo,
            base_commit="",
            ours_commit=ours,
            theirs_commit=ours,
            conflict_paths=["a.py"],
            other_branch="feat",
        )
        result = _checkout(repo, "--ours", "a.py", "--json")
        data = json.loads(result.output)
        assert data["exit_code"] == 0


class TestJsonSchemaComplete:
    """``duration_ms`` and ``exit_code`` must be in ``REQUIRED_KEYS``."""

    # Keys every branch-level JSON payload asserted below must carry.
    REQUIRED_KEYS = {
        "action",
        "branch",
        "commit_id",
        "from_branch",
        "dry_run",
        "duration_ms",
        "exit_code",
    }

    def test_switch_has_complete_schema(self, two_branch_repo: pathlib.Path) -> None:
        result = _checkout(two_branch_repo, "feat", "--json")
        data = json.loads(result.output)
        missing = self.REQUIRED_KEYS - set(data)
        assert not missing, f"Missing keys in 'switched' JSON: {missing}"

    def test_create_has_complete_schema(self, repo: pathlib.Path) -> None:
        result = _checkout(repo, "-b", "schema-branch", "--json")
        data = json.loads(result.output)
        missing = self.REQUIRED_KEYS - set(data)
        assert not missing, f"Missing keys in 'created' JSON: {missing}"

    def test_detach_has_complete_schema(self, repo: pathlib.Path) -> None:
        sha = get_head_commit_id(repo, "main")
        assert sha is not None
        result = _checkout(repo, sha, "--json")
        data = json.loads(result.output)
        missing = self.REQUIRED_KEYS - set(data)
        assert not missing, f"Missing keys in 'detached' JSON: {missing}"

    def test_already_on_has_complete_schema(self, repo: pathlib.Path) -> None:
        result = _checkout(repo, "main", "--json")
        data = json.loads(result.output)
        missing = self.REQUIRED_KEYS - set(data)
        assert not missing, f"Missing keys in 'already_on' JSON: {missing}"


# ──────────────────────────────────────────────────────────────────────────────
# checkout -b --intent / --resumable
# ──────────────────────────────────────────────────────────────────────────────


class TestCheckoutCreateWithMeta:
    """``muse checkout -b --intent --resumable`` stores metadata."""

    # ── Parser ────────────────────────────────────────────────────────────────

    def _parse(self, *args: str) -> "argparse.Namespace":
        """Parse ``checkout <args>`` with a fresh parser built via ``register``."""
        import argparse
        from muse.cli.commands.checkout import register

        p = argparse.ArgumentParser()
        sub = p.add_subparsers()
        register(sub)
        return p.parse_args(["checkout", *args])

    def test_intent_flag_parsed(self) -> None:
        ns = self._parse("-b", "task/x", "--intent", "do the thing")
        assert ns.intent == "do the thing"

    def test_resumable_flag_parsed(self) -> None:
        ns = self._parse("-b", "task/x", "--resumable")
        assert ns.resumable is True

    def test_intent_default_none(self) -> None:
        ns = self._parse("main")
        assert ns.intent is None

    def test_resumable_default_false(self) -> None:
        ns = self._parse("main")
        assert ns.resumable is False

    def test_intent_without_create_is_error(self, repo: pathlib.Path) -> None:
        """--intent without -b should be rejected."""
        result = _checkout(repo, "main", "--intent", "oops")
        assert result.exit_code != 0

    def test_resumable_without_create_is_error(self, repo: pathlib.Path) -> None:
        """--resumable without -b should be rejected."""
        result = _checkout(repo, "main", "--resumable")
        assert result.exit_code != 0

    # ── Integration: intent stored ─────────────────────────────────────────

    def test_intent_stored_in_branch_meta(self, repo: pathlib.Path) -> None:
        result = _checkout(repo, "-b", "task/work", "--intent", "implement auth")
        assert result.exit_code == 0
        meta = read_branch_meta(repo, "task/work")
        assert meta.get("intent") == "implement auth"

    def test_resumable_stored_in_branch_meta(self, repo: pathlib.Path) -> None:
        result = _checkout(repo, "-b", "task/work", "--resumable")
        assert result.exit_code == 0
        meta = read_branch_meta(repo, "task/work")
        assert meta.get("resumable") is True

    def test_intent_and_resumable_together(self, repo: pathlib.Path) -> None:
        result = _checkout(
            repo, "-b", "task/work", "--intent", "add feature", "--resumable"
        )
        assert result.exit_code == 0
        meta = read_branch_meta(repo, "task/work")
        assert meta.get("intent") == "add feature"
        assert meta.get("resumable") is True

    def test_branch_switched_after_create_with_meta(self, repo: pathlib.Path) -> None:
        _checkout(repo, "-b", "task/work", "--intent", "x", "--resumable")
        assert read_current_branch(repo) == "task/work"

    def test_no_metadata_when_flags_absent(self, repo: pathlib.Path) -> None:
        _checkout(repo, "-b", "task/plain")
        meta = read_branch_meta(repo, "task/plain")
        assert meta.get("intent") is None
        assert not meta.get("resumable")

    def test_intent_only_no_resumable_set(self, repo: pathlib.Path) -> None:
        _checkout(repo, "-b", "task/work", "--intent", "just intent")
        meta = read_branch_meta(repo, "task/work")
        assert meta.get("intent") == "just intent"
        assert not meta.get("resumable")

    def test_resumable_only_no_intent_set(self, repo: pathlib.Path) -> None:
        _checkout(repo, "-b", "task/work", "--resumable")
        meta = read_branch_meta(repo, "task/work")
        assert meta.get("resumable") is True
        assert meta.get("intent") is None

    # ── JSON output still correct ──────────────────────────────────────────

    def test_json_action_is_created(self, repo: pathlib.Path) -> None:
        result = _checkout(
            repo, "-b", "task/work", "--intent", "x", "--resumable", "--json"
        )
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert data["action"] == "created"

    def test_json_branch_name_correct(self, repo: pathlib.Path) -> None:
        result = _checkout(
            repo, "-b", "task/work", "--intent", "x", "--json"
        )
        data = json.loads(result.output)
        assert data["branch"] == "task/work"

    # ── branch --json listing reflects metadata ────────────────────────────

    def test_branch_list_json_shows_intent(self, repo: pathlib.Path) -> None:
        _checkout(repo, "-b", "task/work", "--intent", "my intent")
        result = _invoke(repo, ["branch", "--json"])
        branches = json.loads(result.output)
        entry = next(b for b in branches if b["name"] == "task/work")
        assert entry.get("intent") == "my intent"

    def test_branch_list_json_shows_resumable(self, repo: pathlib.Path) -> None:
        _checkout(repo, "-b", "task/work", "--resumable")
        result = _invoke(repo, ["branch", "--json"])
        branches = json.loads(result.output)
        entry = next(b for b in branches if b["name"] == "task/work")
        assert entry.get("resumable") is True

    def test_resumable_filter_finds_branch(self, repo: pathlib.Path) -> None:
        _checkout(repo, "-b", "task/work", "--resumable")
        _checkout(repo, "main")
        _checkout(repo, "-b", "task/plain")
        result = _invoke(repo, ["branch", "--resumable", "--json"])
        names = [b["name"] for b in json.loads(result.output)]
        assert "task/work" in names
        assert "task/plain" not in names

    # ── Security: ANSI in intent ───────────────────────────────────────────

    def test_ansi_in_intent_sanitized_in_text_output(self, repo: pathlib.Path) -> None:
        malicious = "\x1b[31mmalicious\x1b[0m"
        result = _checkout(repo, "-b", "task/work", "--intent", malicious)
        assert result.exit_code == 0
        assert "\x1b" not in result.output


# ---------------------------------------------------------------------------
# Flag registration tests
# ---------------------------------------------------------------------------

import argparse as _argparse
from muse.cli.commands.checkout import register as _register_checkout
from muse.core.paths import merge_state_path


def _parse_checkout(*args: str) -> _argparse.Namespace:
    """Build an argument parser via register() and parse args."""
    root_p = _argparse.ArgumentParser()
    subs = root_p.add_subparsers(dest="cmd")
    _register_checkout(subs)
    return root_p.parse_args(["checkout", *args])


# Named differently from the ``TestRegisterFlags`` class defined near the top
# of this module: a second class with the same name rebinds the module
# attribute, so pytest would collect only this one and silently drop the
# earlier class's tests.
class TestRegisterFlagsSharedParser:
    """Flag registration checks driven through ``_parse_checkout``."""

    def test_default_json_out_is_false(self) -> None:
        ns = _parse_checkout("dev")
        assert ns.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        ns = _parse_checkout("dev", "--json")
        assert ns.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        ns = _parse_checkout("dev", "-j")
        assert ns.json_out is True

    def test_create_branch_flag(self) -> None:
        ns = _parse_checkout("-b", "task/foo")
        assert ns.create is True

    def test_force_flag(self) -> None:
        ns = _parse_checkout("dev", "--force")
        assert ns.force is True

    def test_f_shorthand_for_force(self) -> None:
        ns = _parse_checkout("dev", "-f")
        assert ns.force is True

    def test_dry_run_flag(self) -> None:
        ns = _parse_checkout("dev", "--dry-run")
        assert ns.dry_run is True

    def test_n_shorthand_for_dry_run(self) -> None:
        ns = _parse_checkout("dev", "-n")
        assert ns.dry_run is True


# ──────────────────────────────────────────────────────────────────────────────
# Regression — autoshelf must NOT bleed committed task-branch changes back
# ──────────────────────────────────────────────────────────────────────────────


@pytest.fixture()
def task_branch_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Repo that reproduces the autoshelf phantom-modification bug.

    Layout
    ------
    main
        committed.py  = "base\\n"
        dirty.py      = "base\\n"

    task  (branched from main)
        committed.py  = "task-committed\\n"   ← committed on task branch
        dirty.py      = "dirty-edit\\n"       ← NOT committed (working-tree only)

    This is the exact shape that triggered the bug: after
    ``muse checkout main --autoshelf``, committed.py was appearing as
    "modified" on main even though no one edited it there.

    The fixture returns with ``task`` still checked out.
    """
    _invoke(tmp_path, ["init"])

    (tmp_path / "committed.py").write_text("base\n")
    (tmp_path / "dirty.py").write_text("base\n")
    _commit(tmp_path, "-m", "initial")

    _checkout(tmp_path, "-b", "task")
    (tmp_path / "committed.py").write_text("task-committed\n")
    _commit(tmp_path, "-m", "task: update committed.py")

    # Leave dirty.py modified but NOT committed — this is the legitimate dirt.
    (tmp_path / "dirty.py").write_text("dirty-edit\n")
    return tmp_path


class TestAutoshelfDirtyIsolation:
    """autoshelf must only restore files that were actually dirty (unstaged/uncommitted).

    Regression: the old implementation saved the full working-tree snapshot and
    applied it verbatim on the target branch. Committed-on-source changes bled
    into the target working tree as phantom modifications, causing merge to
    refuse with "your local changes would be overwritten".

    The invariant: after ``checkout --autoshelf``, only files that were
    genuinely uncommitted on the source branch should appear as modified on the
    target. Files that were committed on the source (but not yet on the target)
    are the merge's job — not the shelf's.
    """

    def test_committed_source_file_not_in_working_tree_after_autoshelf(
        self,
        task_branch_repo: pathlib.Path,
    ) -> None:
        """committed.py was committed on task — it must NOT appear modified on main."""
        result = _checkout(task_branch_repo, "main", "--autoshelf")
        assert result.exit_code == 0, result.stderr
        assert read_current_branch(task_branch_repo) == "main"

        content = (task_branch_repo / "committed.py").read_text()
        assert content == "base\n", (
            "committed.py must reflect main HEAD after autoshelf — "
            f"got {content!r}, expected 'base\\n'. "
            "The task-branch committed version must NOT be restored by the shelf."
        )

    def test_dirty_file_is_restored_after_autoshelf(
        self,
        task_branch_repo: pathlib.Path,
    ) -> None:
        """dirty.py was uncommitted on task — it MUST be restored on main."""
        _checkout(task_branch_repo, "main", "--autoshelf")

        content = (task_branch_repo / "dirty.py").read_text()
        assert content == "dirty-edit\n", (
            "dirty.py (uncommitted working-tree change) must survive the autoshelf "
            f"round-trip — got {content!r}, expected 'dirty-edit\\n'."
        )

    def test_working_tree_clean_except_dirty_file_after_autoshelf(
        self,
        task_branch_repo: pathlib.Path,
    ) -> None:
        """After autoshelf to main, only dirty.py should be modified — nothing else."""
        _checkout(task_branch_repo, "main", "--autoshelf")

        result = _invoke(task_branch_repo, ["status", "--json"])
        assert result.exit_code == 0, result.stderr
        data = json.loads(result.output)

        modified = set(data.get("modified", []))
        assert modified == {"dirty.py"}, (
            f"Only dirty.py should be modified after autoshelf — got {modified}. "
            "committed.py is a phantom: it was committed on task, belongs to the "
            "merge, and must not bleed into the working tree via the shelf."
        )