"""Tests for ``muse checkout``. Coverage tiers -------------- Unit — parser flags, dead-code removal, docstring schema. Integration — switch, create, already_on, detach, --dry-run, conflict resolution. End-to-end — full CLI invocations: text and JSON output, all operations. Security — ANSI injection in target, error routing to stderr. Stress — checkout under high file counts, concurrent checkouts. """ from __future__ import annotations import json import os import pathlib import subprocess import threading import time from typing import TYPE_CHECKING import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.store import get_head_commit_id, read_current_branch if TYPE_CHECKING: import argparse runner = CliRunner() # ────────────────────────────────────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────────────────────────────────────── def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult: saved = os.getcwd() try: os.chdir(repo) return runner.invoke(None, args) finally: os.chdir(saved) def _checkout(repo: pathlib.Path, *extra: str) -> InvokeResult: return _invoke(repo, ["checkout", *extra]) def _commit(repo: pathlib.Path, *extra: str) -> InvokeResult: return _invoke(repo, ["commit", *extra]) def _branch(repo: pathlib.Path, *extra: str) -> InvokeResult: return _invoke(repo, ["branch", *extra]) @pytest.fixture() def repo(tmp_path: pathlib.Path) -> pathlib.Path: """Initialised repo with one commit on ``main``.""" saved = os.getcwd() try: os.chdir(tmp_path) runner.invoke(None, ["init"]) finally: os.chdir(saved) (tmp_path / "a.py").write_text("x = 1\n") _commit(tmp_path, "-m", "initial") return tmp_path @pytest.fixture() def two_branch_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with ``main`` and ``feat`` branches, each with unique content.""" _branch(repo, "feat") _checkout(repo, "feat") (repo / "feat.py").write_text("f = 1\n") _commit(repo, "-m", "feat commit") _checkout(repo, "main") return repo # ────────────────────────────────────────────────────────────────────────────── # Unit — parser flags # ────────────────────────────────────────────────────────────────────────────── class TestRegisterFlags: def _parse(self, *args: str) -> "argparse.Namespace": import argparse from muse.cli.commands.checkout import register p = argparse.ArgumentParser() sub = p.add_subparsers() register(sub) return p.parse_args(["checkout", *args]) def test_default_fmt_is_text(self) -> None: ns = self._parse("main") assert ns.fmt == "text" def test_json_flag_sets_fmt(self) -> None: ns = self._parse("main", "--json") assert ns.fmt == "json" def test_format_json_flag(self) -> None: ns = self._parse("main", "--format", "json") assert ns.fmt == "json" def test_create_flag(self) -> None: ns = self._parse("-b", "new") assert ns.create is True def test_force_flag(self) -> None: ns = self._parse("main", "--force") assert ns.force is True def test_force_short_flag(self) -> None: ns = self._parse("main", "-f") assert ns.force is True def test_dry_run_flag(self) -> None: ns = self._parse("main", "--dry-run") assert ns.dry_run is True def test_dry_run_short_flag(self) -> None: ns = self._parse("main", "-n") assert ns.dry_run is True def test_dry_run_default_false(self) -> None: ns = self._parse("main") assert ns.dry_run is False def test_ours_flag(self) -> None: ns = self._parse("--ours", "file.py") assert ns.resolve_ours is True def test_theirs_flag(self) -> None: ns = self._parse("--theirs", "file.py") assert ns.resolve_theirs is True def test_all_flag(self) -> None: ns = self._parse("--ours", "--all") assert ns.resolve_all is True def test_target_optional(self) -> None: ns = self._parse() assert ns.target is None # ────────────────────────────────────────────────────────────────────────────── # Unit — dead-code removal # ────────────────────────────────────────────────────────────────────────────── class TestDeadCodeRemoved: def test_read_current_branch_wrapper_removed(self) -> None: import muse.cli.commands.checkout as m assert not hasattr(m, "_read_current_branch"), ( "_read_current_branch was a dead one-liner wrapper and must be deleted" ) def test_inline_sanitize_display_import_removed(self) -> None: import inspect import muse.cli.commands.checkout as m src = inspect.getsource(m.run) assert "sanitize_display as _sd" not in src, ( "The inline 'from muse.core.validation import sanitize_display as _sd' " "inside run() was a redundant re-import of the module-level sanitize_display" ) # ────────────────────────────────────────────────────────────────────────────── # Integration — SWITCH (existing branch) # ────────────────────────────────────────────────────────────────────────────── class TestSwitch: def test_switch_exits_0(self, two_branch_repo: pathlib.Path) -> None: result = _checkout(two_branch_repo, "feat") assert result.exit_code == 0 def test_switch_changes_branch(self, two_branch_repo: pathlib.Path) -> None: _checkout(two_branch_repo, "feat") assert read_current_branch(two_branch_repo) == "feat" def test_switch_text_output(self, two_branch_repo: pathlib.Path) -> None: result = _checkout(two_branch_repo, "feat") assert "feat" in result.output assert "Switched" in result.output def test_switch_json_schema(self, two_branch_repo: pathlib.Path) -> None: result = _checkout(two_branch_repo, "feat", "--json") data = json.loads(result.output) assert data["action"] == "switched" assert data["branch"] == "feat" assert data["from_branch"] == "main" assert "commit_id" in data assert data.get("dry_run") is False def test_switch_restores_files(self, two_branch_repo: pathlib.Path) -> None: """Files unique to ``feat`` appear after checkout and disappear on return.""" _checkout(two_branch_repo, "feat") assert (two_branch_repo / "feat.py").exists() _checkout(two_branch_repo, "main") assert not (two_branch_repo / "feat.py").exists() def test_switch_to_nonexistent_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "does-not-exist") assert result.exit_code == 1 def test_switch_error_to_stderr(self, repo: pathlib.Path) -> None: result = _checkout(repo, "ghost") assert result.exit_code == 1 # Error must appear in stderr, not exclusively stdout assert "not a branch" in (result.stderr or "").lower() # ────────────────────────────────────────────────────────────────────────────── # Integration — ALREADY_ON # ────────────────────────────────────────────────────────────────────────────── class TestAlreadyOn: def test_already_on_exits_0(self, repo: pathlib.Path) -> None: result = _checkout(repo, "main") assert result.exit_code == 0 def test_already_on_text(self, repo: pathlib.Path) -> None: result = _checkout(repo, "main") assert "Already on" in result.output def test_already_on_json_schema(self, repo: pathlib.Path) -> None: result = _checkout(repo, "main", "--json") data = json.loads(result.output) assert data["action"] == "already_on" assert data["branch"] == "main" assert data["from_branch"] == "main" assert "commit_id" in data # ────────────────────────────────────────────────────────────────────────────── # Integration — FORCE on current branch (working-tree restore) # ────────────────────────────────────────────────────────────────────────────── class TestForceOnCurrentBranch: """checkout --force must restore the working tree to HEAD. Regression: previously this was a no-op ('Already on main') regardless of --force. Git's behaviour is to restore missing/modified tracked files even when the branch is already current. """ def test_force_current_branch_exits_0(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--force", "main") assert result.exit_code == 0 def test_force_current_branch_restores_deleted_file(self, repo: pathlib.Path) -> None: (repo / "a.py").unlink() assert not (repo / "a.py").exists() _checkout(repo, "--force", "main") assert (repo / "a.py").exists(), "force checkout must restore deleted tracked file" def test_force_current_branch_restores_modified_file(self, repo: pathlib.Path) -> None: original = (repo / "a.py").read_text() (repo / "a.py").write_text("corrupted content\n") _checkout(repo, "--force", "main") assert (repo / "a.py").read_text() == original, ( "force checkout must restore modified tracked file to HEAD content" ) def test_force_current_branch_text_output(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--force", "main") assert "restored" in result.output def test_force_current_branch_json_action(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--force", "main", "--json") data = json.loads(result.output) assert data["action"] == "restored" assert data["branch"] == "main" def test_force_current_branch_dry_run_does_not_restore( self, repo: pathlib.Path ) -> None: (repo / "a.py").unlink() _checkout(repo, "--force", "--dry-run", "main") assert not (repo / "a.py").exists(), ( "--dry-run must not actually restore files" ) def test_force_current_branch_dry_run_json(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--force", "--dry-run", "main", "--json") data = json.loads(result.output) assert data["dry_run"] is True assert data["action"] == "restored" def test_without_force_still_noop_on_current_branch( self, repo: pathlib.Path ) -> None: """Without --force, checkout on current branch is still a no-op.""" result = _checkout(repo, "main") assert "Already on" in result.output # ────────────────────────────────────────────────────────────────────────────── # Integration — CREATE (-b) # ────────────────────────────────────────────────────────────────────────────── class TestCreate: def test_create_exits_0(self, repo: pathlib.Path) -> None: result = _checkout(repo, "-b", "new-branch") assert result.exit_code == 0 def test_create_switches_to_new_branch(self, repo: pathlib.Path) -> None: _checkout(repo, "-b", "new-branch") assert read_current_branch(repo) == "new-branch" def test_create_text_output(self, repo: pathlib.Path) -> None: result = _checkout(repo, "-b", "my-branch") assert "my-branch" in result.output def test_create_json_schema(self, repo: pathlib.Path) -> None: result = _checkout(repo, "-b", "json-branch", "--json") data = json.loads(result.output) assert data["action"] == "created" assert data["branch"] == "json-branch" assert data["from_branch"] == "main" assert "commit_id" in data assert data.get("dry_run") is False def test_create_duplicate_exits_1(self, repo: pathlib.Path) -> None: _checkout(repo, "-b", "dup") _checkout(repo, "main") result = _checkout(repo, "-b", "dup") assert result.exit_code == 1 def test_create_duplicate_error_to_stderr(self, repo: pathlib.Path) -> None: _checkout(repo, "-b", "dup2") _checkout(repo, "main") result = _checkout(repo, "-b", "dup2") # Error must appear in stderr assert "already exists" in (result.stderr or "").lower() def test_create_invalid_name_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "-b", "bad..name") assert result.exit_code == 1 def test_create_invalid_name_error_to_stderr(self, repo: pathlib.Path) -> None: result = _checkout(repo, "-b", "bad..name") assert "Invalid" in (result.stderr or "") # ────────────────────────────────────────────────────────────────────────────── # Integration — DETACH HEAD # ────────────────────────────────────────────────────────────────────────────── class TestDetach: def test_detach_full_sha_exits_0(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") assert sha is not None result = _checkout(repo, sha) assert result.exit_code == 0 def test_detach_full_sha_text_output(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") assert sha is not None result = _checkout(repo, sha) assert sha[:8] in result.output def test_detach_full_sha_json_schema(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") assert sha is not None result = _checkout(repo, sha, "--json") data = json.loads(result.output) assert data["action"] == "detached" assert data["branch"] is None assert data["commit_id"] == sha assert data["from_branch"] == "main" assert data.get("dry_run") is False def test_detach_partial_sha_exits_0(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") assert sha is not None result = _checkout(repo, sha[:12]) assert result.exit_code == 0 def test_detach_partial_sha_points_to_correct_commit(self, repo: pathlib.Path) -> None: """A partial SHA must resolve to the correct commit, not be treated as a branch.""" from muse.core.store import get_commits_for_branch, read_current_branch from muse.core.repo import read_repo_id (repo / "b.py").write_text("b=1\n") _commit(repo, "-m", "second") repo_id = read_repo_id(repo) branch = read_current_branch(repo) commits = get_commits_for_branch(repo, repo_id, branch) first_sha = commits[-1].commit_id # oldest result = _checkout(repo, first_sha[:12]) assert result.exit_code == 0 assert first_sha[:8] in result.output def test_detach_bad_ref_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "deadbeefdeadbeef") assert result.exit_code == 1 def test_detach_error_to_stderr(self, repo: pathlib.Path) -> None: result = _checkout(repo, "deadbeefdeadbeef") assert "not a branch" in (result.stderr or "").lower() # ────────────────────────────────────────────────────────────────────────────── # Integration — DRY-RUN # ────────────────────────────────────────────────────────────────────────────── class TestDryRun: def test_dry_run_switch_exits_0(self, two_branch_repo: pathlib.Path) -> None: result = _checkout(two_branch_repo, "--dry-run", "feat") assert result.exit_code == 0 def test_dry_run_does_not_switch_branch(self, two_branch_repo: pathlib.Path) -> None: _checkout(two_branch_repo, "--dry-run", "feat") assert read_current_branch(two_branch_repo) == "main" def test_dry_run_text_says_would(self, two_branch_repo: pathlib.Path) -> None: result = _checkout(two_branch_repo, "--dry-run", "feat") assert "Would" in result.output assert "feat" in result.output def test_dry_run_json_schema(self, two_branch_repo: pathlib.Path) -> None: result = _checkout(two_branch_repo, "--dry-run", "feat", "--json") data = json.loads(result.output) assert data["dry_run"] is True assert data["action"] == "switched" assert data["branch"] == "feat" assert data["from_branch"] == "main" def test_dry_run_does_not_restore_files(self, two_branch_repo: pathlib.Path) -> None: """feat.py exists only on feat branch; dry-run must not create it on main.""" _checkout(two_branch_repo, "--dry-run", "feat") assert not (two_branch_repo / "feat.py").exists() def test_dry_run_create_exits_0(self, repo: pathlib.Path) -> None: result = _checkout(repo, "-b", "dry-branch", "--dry-run") assert result.exit_code == 0 def test_dry_run_create_does_not_create_branch(self, repo: pathlib.Path) -> None: _checkout(repo, "-b", "dry-branch", "--dry-run") result = _invoke(repo, ["branch", "--json"]) names = [b["name"] for b in json.loads(result.output)] assert "dry-branch" not in names def test_dry_run_create_json_schema(self, repo: pathlib.Path) -> None: result = _checkout(repo, "-b", "dry-new", "--dry-run", "--json") data = json.loads(result.output) assert data["dry_run"] is True assert data["action"] == "created" assert data["from_branch"] == "main" def test_dry_run_detach_exits_0(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") assert sha is not None result = _checkout(repo, "--dry-run", sha) assert result.exit_code == 0 def test_dry_run_detach_does_not_detach(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") assert sha is not None _checkout(repo, "--dry-run", sha) assert read_current_branch(repo) == "main" def test_dry_run_detach_json(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") assert sha is not None result = _checkout(repo, "--dry-run", sha, "--json") data = json.loads(result.output) assert data["dry_run"] is True assert data["action"] == "detached" assert data["branch"] is None def test_dry_run_nonexistent_branch_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--dry-run", "no-such-branch") assert result.exit_code == 1 def test_dry_run_already_on_exits_0(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--dry-run", "main") assert result.exit_code == 0 def test_dry_run_already_on_json(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--dry-run", "main", "--json") data = json.loads(result.output) assert data["dry_run"] is True assert data["action"] == "already_on" # ────────────────────────────────────────────────────────────────────────────── # Integration — JSON schema consistency # ────────────────────────────────────────────────────────────────────────────── class TestJsonSchema: REQUIRED_KEYS = {"action", "branch", "commit_id", "from_branch", "dry_run"} def test_create_has_all_keys(self, repo: pathlib.Path) -> None: result = _checkout(repo, "-b", "k-test", "--json") data = json.loads(result.output) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in 'created' JSON: {missing}" def test_switch_has_all_keys(self, two_branch_repo: pathlib.Path) -> None: result = _checkout(two_branch_repo, "feat", "--json") data = json.loads(result.output) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in 'switched' JSON: {missing}" def test_already_on_has_all_keys(self, repo: pathlib.Path) -> None: result = _checkout(repo, "main", "--json") data = json.loads(result.output) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in 'already_on' JSON: {missing}" def test_detach_has_all_keys(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") assert sha is not None result = _checkout(repo, sha, "--json") data = json.loads(result.output) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in 'detached' JSON: {missing}" def test_detach_branch_is_null(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") assert sha is not None result = _checkout(repo, sha, "--json") data = json.loads(result.output) assert data["branch"] is None def test_from_branch_reflects_previous(self, two_branch_repo: pathlib.Path) -> None: _checkout(two_branch_repo, "feat") result = _checkout(two_branch_repo, "main", "--json") data = json.loads(result.output) assert data["from_branch"] == "feat" # ────────────────────────────────────────────────────────────────────────────── # Integration — validation # ────────────────────────────────────────────────────────────────────────────── class TestValidation: def test_no_target_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo) assert result.exit_code == 1 def test_no_target_error_to_stderr(self, repo: pathlib.Path) -> None: result = _checkout(repo) assert "Specify" in (result.stderr or "") def test_unknown_format_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "main", "--format", "xml") assert result.exit_code == 1 def test_unknown_format_error_to_stderr(self, repo: pathlib.Path) -> None: result = _checkout(repo, "main", "--format", "xml") assert "Unknown" in (result.stderr or "") def test_ours_without_theirs_context_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--ours", "file.py") assert result.exit_code == 1 def test_ours_and_theirs_together_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--ours", "--theirs", "--all") assert result.exit_code == 1 # ────────────────────────────────────────────────────────────────────────────── # Security — ANSI injection # ────────────────────────────────────────────────────────────────────────────── class TestSecurityAnsi: def _has_ansi(self, s: str) -> bool: return "\x1b[" in s def test_ansi_in_target_sanitized(self, repo: pathlib.Path) -> None: result = _checkout(repo, "\x1b[31mevil\x1b[0m") assert not self._has_ansi(result.output) def test_ansi_in_create_name_sanitized(self, repo: pathlib.Path) -> None: result = _checkout(repo, "-b", "\x1b[31mevil\x1b[0m") assert not self._has_ansi(result.output) def test_ansi_in_format_sanitized(self, repo: pathlib.Path) -> None: result = _checkout(repo, "main", "--format", "\x1b[31mxml\x1b[0m") assert not self._has_ansi(result.output) def test_error_not_a_branch_sanitized(self, repo: pathlib.Path) -> None: """The 'not a branch' error message must not echo raw ANSI from target.""" result = _checkout(repo, "\x1b[31mnotabranch\x1b[0m") assert not self._has_ansi(result.output) assert not self._has_ansi(result.stderr or "") def test_all_errors_to_stderr(self, repo: pathlib.Path) -> None: """Every ❌ error must go to stderr; stderr must contain the error.""" error_cases = [ ["ghost"], ["-b", "bad..name"], ["--format", "xml"], ] for case in error_cases: result = _checkout(repo, *case) assert result.exit_code != 0, f"Expected failure for args {case}" assert "❌" in (result.stderr or ""), ( f"Error not in stderr for args {case}: stderr={result.stderr!r}" ) # ────────────────────────────────────────────────────────────────────────────── # Integration — conflict resolution # ────────────────────────────────────────────────────────────────────────────── class TestConflictResolution: def _setup_merge_conflict( self, repo: pathlib.Path ) -> tuple[str, str]: """Create a merge conflict on ``repo``. Returns (ours_commit, theirs_commit).""" # ours: commit on main (repo / "shared.py").write_text("x = 1\n") _commit(repo, "-m", "main: set x=1") ours_cid = get_head_commit_id(repo, "main") or "" # theirs: commit on feature branch _branch(repo, "feat2") _invoke(repo, ["checkout", "feat2"]) (repo / "shared.py").write_text("x = 2\n") _commit(repo, "-m", "feat: set x=2") theirs_cid = get_head_commit_id(repo, "feat2") or "" _invoke(repo, ["checkout", "main"]) # Force a merge conflict via merge_engine plumbing from muse.core.merge_engine import write_merge_state write_merge_state( repo, base_commit="", ours_commit=ours_cid, theirs_commit=theirs_cid, conflict_paths=["shared.py"], other_branch="feat2", ) return ours_cid, theirs_cid def test_ours_no_merge_state_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--ours", "file.py") assert result.exit_code == 1 def test_theirs_no_merge_state_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--theirs", "file.py") assert result.exit_code == 1 def test_ours_and_theirs_both_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--ours", "--theirs", "--all") assert result.exit_code == 1 def test_ours_resolves_conflict(self, repo: pathlib.Path) -> None: self._setup_merge_conflict(repo) result = _checkout(repo, "--ours", "shared.py") assert result.exit_code == 0 def test_theirs_resolves_conflict(self, repo: pathlib.Path) -> None: self._setup_merge_conflict(repo) result = _checkout(repo, "--theirs", "shared.py") assert result.exit_code == 0 def test_resolve_all_ours_json(self, repo: pathlib.Path) -> None: self._setup_merge_conflict(repo) result = _checkout(repo, "--ours", "--all", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["action"] == "conflict_resolved_all" assert data["side"] == "ours" assert "resolved_count" in data assert "remaining_conflicts" in data def test_resolve_all_theirs_json(self, repo: pathlib.Path) -> None: self._setup_merge_conflict(repo) result = _checkout(repo, "--theirs", "--all", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["action"] == "conflict_resolved_all" assert data["side"] == "theirs" def test_resolve_single_file_json(self, repo: pathlib.Path) -> None: self._setup_merge_conflict(repo) result = _checkout(repo, "--ours", "shared.py", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["action"] == "conflict_resolved" assert data["file"] == "shared.py" assert data["side"] == "ours" assert "remaining_conflicts" in data def test_resolve_all_empty_conflicts_exits_0(self, repo: pathlib.Path) -> None: """--ours --all when no conflicts exist still exits 0.""" from muse.core.merge_engine import write_merge_state ours = get_head_commit_id(repo, "main") or "" write_merge_state( repo, base_commit="", ours_commit=ours, theirs_commit=ours, conflict_paths=[], other_branch="feat", ) result = _checkout(repo, "--ours", "--all") assert result.exit_code == 0 def test_resolve_nonexistent_path_exits_0(self, repo: pathlib.Path) -> None: """A path not in the conflict list is informational, not an error.""" self._setup_merge_conflict(repo) result = _checkout(repo, "--ours", "not_conflicted.py") assert result.exit_code == 0 def test_missing_ours_theirs_without_all_exits_1(self, repo: pathlib.Path) -> None: result = _checkout(repo, "--ours") assert result.exit_code == 1 # ────────────────────────────────────────────────────────────────────────────── # Stress # ────────────────────────────────────────────────────────────────────────────── @pytest.mark.slow class TestStress: def test_checkout_100_file_branch_fast(self, repo: pathlib.Path) -> None: """Switching between branches with 100 modified files under 2s.""" for i in range(100): (repo / f"f{i:03d}.py").write_text(f"x={i}\n") _commit(repo, "-m", "big main") _branch(repo, "big-alt") _checkout(repo, "big-alt") for i in range(100): (repo / f"f{i:03d}.py").write_text(f"y={i}\n") _commit(repo, "-m", "big alt") _checkout(repo, "main") t0 = time.perf_counter() result = _checkout(repo, "big-alt") elapsed = (time.perf_counter() - t0) * 1000 assert result.exit_code == 0 assert elapsed < 2000, f"checkout 100-file branch took {elapsed:.0f}ms (limit 2s)" def test_dry_run_100_file_branch_fast(self, repo: pathlib.Path) -> None: """dry-run on 100-file branch should be very fast (no restore).""" for i in range(100): (repo / f"g{i:03d}.py").write_text(f"x={i}\n") _commit(repo, "-m", "big2") _branch(repo, "big2-alt") t0 = time.perf_counter() result = _checkout(repo, "--dry-run", "big2-alt") elapsed = (time.perf_counter() - t0) * 1000 assert result.exit_code == 0 assert elapsed < 500, f"dry-run took {elapsed:.0f}ms (limit 500ms)" def test_concurrent_checkouts_separate_repos(self, tmp_path: pathlib.Path) -> None: """Multiple threads checking out branches in separate repos must not interfere.""" errors: list[str] = [] def do_checkout(idx: int) -> None: repo_dir = tmp_path / f"repo_{idx}" repo_dir.mkdir() subprocess.run(["muse", "init"], cwd=str(repo_dir), capture_output=True) (repo_dir / "x.py").write_text(f"x={idx}\n") subprocess.run( ["muse", "commit", "-m", f"base{idx}"], cwd=str(repo_dir), capture_output=True, ) subprocess.run( ["muse", "branch", "alt"], cwd=str(repo_dir), capture_output=True ) subprocess.run( ["muse", "checkout", "alt"], cwd=str(repo_dir), capture_output=True ) (repo_dir / "y.py").write_text(f"y={idx}\n") subprocess.run( ["muse", "commit", "-m", f"alt{idx}"], cwd=str(repo_dir), capture_output=True, ) r = subprocess.run( ["muse", "checkout", "main", "--json"], cwd=str(repo_dir), capture_output=True, text=True, ) if r.returncode != 0: errors.append(f"repo_{idx}: checkout failed") return data = json.loads(r.stdout) if data.get("action") != "switched": errors.append(f"repo_{idx}: expected switched, got {data.get('action')}") threads = [threading.Thread(target=do_checkout, args=(i,)) for i in range(6)] for t in threads: t.start() for t in threads: t.join() assert not errors, "Concurrent checkout errors:\n" + "\n".join(errors) def test_repeated_back_and_forth_100_times(self, two_branch_repo: pathlib.Path) -> None: """Switching back and forth 100 times must not corrupt the working tree.""" for i in range(50): r1 = _checkout(two_branch_repo, "feat") assert r1.exit_code == 0, f"Iteration {i}: switch to feat failed" assert (two_branch_repo / "feat.py").exists() r2 = _checkout(two_branch_repo, "main") assert r2.exit_code == 0, f"Iteration {i}: switch to main failed" assert not (two_branch_repo / "feat.py").exists()