"""Comprehensive hardening tests for ``muse worktree``. Covers: - Unit: _load_meta symlink/size guards, _safe_delete_path, _WORKTREES_META_DIR constant, get_worktree_status, prune_worktrees dry_run - Security: symlink meta file, oversized meta, path-in-JSON pointing to .muse/, path-in-JSON pointing to symlink, ANSI injection, error routing to stderr - JSON schema: all subcommands (add, list, status, remove, prune) - Integration: -b/--create-branch, --path override, prune dry-run, status subcommand - E2E: text output paths still correct, worktree lifecycle - Stress: 50 worktrees, concurrent list reads """ from __future__ import annotations import json import pathlib import re import threading from typing import TypedDict import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.types import NULL_LONG_ID, long_id from muse.core.paths import muse_dir, objects_dir, ref_path, worktrees_dir runner = CliRunner() _ANSI_RE = re.compile(r"\x1b\[[0-9;]*m") # --------------------------------------------------------------------------- # Repo helpers # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path, branch: str = "main") -> pathlib.Path: """Create a minimal Muse repo with a named branch.""" repo = tmp_path / "myproject" muse = muse_dir(repo) for d in ("objects", "commits", "snapshots", "refs/heads"): (muse / d).mkdir(parents=True, exist_ok=True) (muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"})) (muse / "HEAD").write_text(f"ref: refs/heads/{branch}\n") (muse / "refs" / "heads" / branch).write_text(NULL_LONG_ID) return repo def _add_branch(repo: pathlib.Path, branch: str) -> None: ref = ref_path(repo, branch) ref.parent.mkdir(parents=True, exist_ok=True) ref.write_text(NULL_LONG_ID) def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult: return runner.invoke(None, args, env={"MUSE_REPO_ROOT": str(repo)}) def _json_blob(output: str) -> str: for line in output.splitlines(): stripped = line.strip() if stripped.startswith(("{", "[")): return stripped return output.strip() # --------------------------------------------------------------------------- # Typed schema helpers # --------------------------------------------------------------------------- class _AddJson(TypedDict): name: str branch: str path: str head_commit: str | None class _ListEntryJson(TypedDict): name: str branch: str path: str head_commit: str | None is_main: bool class _StatusJson(TypedDict): name: str branch: str path: str head_commit: str | None present: bool is_main: bool class _RemoveJson(TypedDict): name: str status: str class _PruneJson(TypedDict): pruned: list[str] count: int dry_run: bool def _parse_add(output: str) -> _AddJson: raw = json.loads(_json_blob(output)) assert isinstance(raw, dict) hc = raw.get("head_commit") return _AddJson( name=str(raw["name"]), branch=str(raw["branch"]), path=str(raw["path"]), head_commit=str(hc) if hc is not None else None, ) def _parse_list(output: str) -> list[_ListEntryJson]: raw = json.loads(_json_blob(output)) # list returns an envelope: {worktrees, exit_code, duration_ms} if isinstance(raw, dict): raw = raw["worktrees"] assert isinstance(raw, list) result: list[_ListEntryJson] = [] for item in raw: assert isinstance(item, dict) hc = item["head_commit"] assert hc is None or isinstance(hc, str) result.append(_ListEntryJson( name=str(item["name"]), branch=str(item["branch"]), path=str(item["path"]), head_commit=hc, is_main=bool(item["is_main"]), )) return result def _parse_status(output: str) -> _StatusJson: raw = json.loads(_json_blob(output)) assert isinstance(raw, dict) hc = raw["head_commit"] assert hc is None or isinstance(hc, str) return _StatusJson( name=str(raw["name"]), branch=str(raw["branch"]), path=str(raw["path"]), head_commit=hc, present=bool(raw["present"]), is_main=bool(raw["is_main"]), ) def _parse_remove(output: str) -> _RemoveJson: raw = json.loads(_json_blob(output)) assert isinstance(raw, dict) return _RemoveJson(name=str(raw["name"]), status=str(raw["status"])) def _parse_prune(output: str) -> _PruneJson: raw = json.loads(_json_blob(output)) assert isinstance(raw, dict) pruned_val = raw["pruned"] assert isinstance(pruned_val, list) count_val = raw["count"] assert isinstance(count_val, int) dry_run_val = raw["dry_run"] assert isinstance(dry_run_val, bool) return _PruneJson( pruned=[str(x) for x in pruned_val], count=count_val, dry_run=dry_run_val, ) # --------------------------------------------------------------------------- # Unit — _load_meta security guards # --------------------------------------------------------------------------- class TestLoadMetaSecurity: def test_symlink_meta_file_rejected(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import _load_meta, _worktree_meta_path repo = _make_repo(tmp_path) name = "my-wt" # Create a symlink where the meta file should be. meta_path = _worktree_meta_path(repo, name) meta_path.parent.mkdir(parents=True, exist_ok=True) real_file = tmp_path / "real_meta.json" real_file.write_text(json.dumps({"name": name, "branch": "main", "path": str(tmp_path / "wt")})) meta_path.symlink_to(real_file) result = _load_meta(repo, name) assert result is None def test_oversized_meta_file_rejected(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import _MAX_META_BYTES, _load_meta, _worktree_meta_path repo = _make_repo(tmp_path) name = "big-wt" meta_path = _worktree_meta_path(repo, name) meta_path.parent.mkdir(parents=True, exist_ok=True) meta_path.write_text("x" * (_MAX_META_BYTES + 1)) result = _load_meta(repo, name) assert result is None def test_corrupt_json_returns_none(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import _load_meta, _worktree_meta_path repo = _make_repo(tmp_path) name = "bad-wt" meta_path = _worktree_meta_path(repo, name) meta_path.parent.mkdir(parents=True, exist_ok=True) meta_path.write_text("not valid json !!!{{{") result = _load_meta(repo, name) assert result is None def test_missing_key_returns_none(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import _load_meta, _worktree_meta_path repo = _make_repo(tmp_path) name = "missing-key-wt" meta_path = _worktree_meta_path(repo, name) meta_path.parent.mkdir(parents=True, exist_ok=True) # Missing 'path' key. meta_path.write_text(json.dumps({"name": name, "branch": "main"})) result = _load_meta(repo, name) assert result is None def test_valid_meta_round_trips(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import _load_meta, _save_meta, WorktreeRecord repo = _make_repo(tmp_path) (worktrees_dir(repo)).mkdir(parents=True, exist_ok=True) record: WorktreeRecord = {"name": "wt1", "branch": "dev", "path": str(tmp_path / "wt1")} _save_meta(repo, record) loaded = _load_meta(repo, "wt1") assert loaded is not None assert loaded["name"] == "wt1" assert loaded["branch"] == "dev" # --------------------------------------------------------------------------- # Unit — _safe_delete_path # --------------------------------------------------------------------------- class TestSafeDeletePath: def test_normal_directory_deleted(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import _safe_delete_path repo = _make_repo(tmp_path) target = tmp_path / "target-dir" target.mkdir() (target / "file.txt").write_text("content") result = _safe_delete_path(repo, target) assert result is True assert not target.exists() def test_nonexistent_path_returns_true(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import _safe_delete_path repo = _make_repo(tmp_path) result = _safe_delete_path(repo, tmp_path / "no-such-dir") assert result is True def test_symlink_refused(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import _safe_delete_path repo = _make_repo(tmp_path) real_dir = tmp_path / "real-dir" real_dir.mkdir() symlink = tmp_path / "link-to-dir" symlink.symlink_to(real_dir) result = _safe_delete_path(repo, symlink) assert result is False assert real_dir.exists() # real directory untouched def test_path_inside_muse_refused(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import _safe_delete_path repo = _make_repo(tmp_path) inside_muse = objects_dir(repo) result = _safe_delete_path(repo, inside_muse) assert result is False assert inside_muse.exists() # --------------------------------------------------------------------------- # Unit — _WORKTREES_META_DIR constant # --------------------------------------------------------------------------- class TestWorktreesMetaDir: def test_constant_used_in_worktrees_dir(self, tmp_path: pathlib.Path) -> None: from muse.core.paths import worktrees_dir, muse_dir repo = _make_repo(tmp_path) assert worktrees_dir(repo) == muse_dir(repo) / "worktrees" # --------------------------------------------------------------------------- # Unit — get_worktree_status # --------------------------------------------------------------------------- class TestGetWorktreeStatus: def test_main_worktree_status(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import get_worktree_status repo = _make_repo(tmp_path) status = get_worktree_status(repo, "main") assert status["is_main"] is True assert status["name"] == "(main)" assert status["branch"] == "main" assert status["present"] is True def test_main_alias_works(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import get_worktree_status repo = _make_repo(tmp_path) s1 = get_worktree_status(repo, "main") s2 = get_worktree_status(repo, "(main)") assert s1["name"] == s2["name"] def test_linked_worktree_present(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import add_worktree, get_worktree_status repo = _make_repo(tmp_path) _add_branch(repo, "dev") add_worktree(repo, "mydev", "dev") status = get_worktree_status(repo, "mydev") assert status["name"] == "mydev" assert status["branch"] == "dev" assert status["present"] is True assert status["is_main"] is False def test_linked_worktree_absent_after_delete(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import add_worktree, get_worktree_status import shutil repo = _make_repo(tmp_path) _add_branch(repo, "dev") wt_path = add_worktree(repo, "mydev", "dev") # Manually delete the worktree directory (simulating external removal). shutil.rmtree(wt_path) status = get_worktree_status(repo, "mydev") assert status["present"] is False def test_nonexistent_worktree_raises(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import get_worktree_status repo = _make_repo(tmp_path) with pytest.raises(ValueError, match="does not exist"): get_worktree_status(repo, "ghost") # --------------------------------------------------------------------------- # Unit — prune_worktrees dry_run # --------------------------------------------------------------------------- class TestPruneDryRun: def test_dry_run_does_not_delete(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import add_worktree, prune_worktrees import shutil repo = _make_repo(tmp_path) _add_branch(repo, "dev") wt_path = add_worktree(repo, "mydev", "dev") shutil.rmtree(wt_path) meta = worktrees_dir(repo) / "mydev.json" assert meta.exists() pruned = prune_worktrees(repo, dry_run=True) assert "mydev" in pruned # Meta file must still exist — dry_run=True. assert meta.exists() def test_dry_run_false_deletes(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import add_worktree, prune_worktrees import shutil repo = _make_repo(tmp_path) _add_branch(repo, "dev") wt_path = add_worktree(repo, "mydev", "dev") shutil.rmtree(wt_path) meta = worktrees_dir(repo) / "mydev.json" pruned = prune_worktrees(repo, dry_run=False) assert "mydev" in pruned assert not meta.exists() def test_dry_run_returns_empty_when_all_present(self, tmp_path: pathlib.Path) -> None: from muse.core.worktree import add_worktree, prune_worktrees repo = _make_repo(tmp_path) _add_branch(repo, "dev") add_worktree(repo, "mydev", "dev") pruned = prune_worktrees(repo, dry_run=True) assert pruned == [] # --------------------------------------------------------------------------- # Security — tampered path in meta.json # --------------------------------------------------------------------------- class TestTamperedMetaPath: def test_path_inside_muse_refused_on_remove(self, tmp_path: pathlib.Path) -> None: """A tampered path pointing inside .muse/ must be refused by remove_worktree.""" from muse.core.worktree import _worktree_meta_path, add_worktree, remove_worktree repo = _make_repo(tmp_path) _add_branch(repo, "dev") add_worktree(repo, "mydev", "dev") # Tamper: set path to the .muse/objects directory. meta_path = _worktree_meta_path(repo, "mydev") danger = str(objects_dir(repo)) meta_path.write_text(json.dumps({"name": "mydev", "branch": "dev", "path": danger})) with pytest.raises(ValueError, match="Refusing"): remove_worktree(repo, "mydev") # The objects directory must still exist. assert (objects_dir(repo)).exists() def test_symlink_wt_path_refused_on_remove(self, tmp_path: pathlib.Path) -> None: """A worktree path that resolves to a symlink must be refused by remove_worktree.""" from muse.core.worktree import _worktree_meta_path, add_worktree, remove_worktree repo = _make_repo(tmp_path) _add_branch(repo, "dev") add_worktree(repo, "mydev", "dev") # Replace the worktree directory with a symlink to an innocent directory. from muse.core.worktree import _worktree_dir wt_dir = _worktree_dir(repo, "mydev") import shutil shutil.rmtree(wt_dir) innocent = tmp_path / "innocent-dir" innocent.mkdir() wt_dir.symlink_to(innocent) with pytest.raises(ValueError, match="Refusing|symlink"): remove_worktree(repo, "mydev") assert innocent.exists() # --------------------------------------------------------------------------- # Security — error routing # --------------------------------------------------------------------------- class TestErrorRouting: def test_add_nonexistent_branch_goes_to_stderr(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "add", "mywt", "no-such-branch"]) assert result.exit_code != 0 assert "❌" in result.stderr def test_add_duplicate_name_goes_to_stderr(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mywt", "dev"]) result = _invoke(repo, ["worktree", "add", "mywt", "dev"]) assert result.exit_code != 0 assert "❌" in result.stderr def test_remove_nonexistent_goes_to_stderr(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "remove", "ghost"]) assert result.exit_code != 0 assert "❌" in result.stderr def test_status_nonexistent_goes_to_stderr(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "status", "ghost"]) assert result.exit_code != 0 assert "❌" in result.stderr def test_create_branch_already_exists_goes_to_stderr(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "dev"]) assert result.exit_code != 0 assert "❌" in result.stderr # --------------------------------------------------------------------------- # Security — ANSI sanitization # --------------------------------------------------------------------------- class TestAnsiSanitization: def test_ansi_in_name_does_not_leak(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ansi_name = "\x1b[31mmywt\x1b[0m" result = _invoke(repo, ["worktree", "add", ansi_name, "main"]) assert _ANSI_RE.search(result.output) is None def test_ansi_in_branch_does_not_leak(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ansi_branch = "\x1b[31mmain\x1b[0m" result = _invoke(repo, ["worktree", "add", "mywt", ansi_branch]) assert _ANSI_RE.search(result.output) is None # --------------------------------------------------------------------------- # JSON schema — add # --------------------------------------------------------------------------- class TestJsonSchemaAdd: def test_add_json_schema(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"]) assert result.exit_code == 0 parsed = _parse_add(result.output) assert parsed["name"] == "mydev" assert parsed["branch"] == "dev" assert "mydev" in parsed["path"] or "myproject" in parsed["path"] def test_add_json_with_create_branch(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "feat/new", "--json"]) assert result.exit_code == 0 parsed = _parse_add(result.output) assert parsed["branch"] == "feat/new" assert parsed["name"] == "mywt" def test_add_json_path_is_string(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "dev-wt", "dev", "--json"]) assert result.exit_code == 0 parsed = _parse_add(result.output) assert isinstance(parsed["path"], str) assert len(parsed["path"]) > 0 # --------------------------------------------------------------------------- # JSON schema — list # --------------------------------------------------------------------------- class TestJsonSchemaList: def test_list_json_includes_main(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "list", "--json"]) assert result.exit_code == 0 entries = _parse_list(result.output) assert any(e["is_main"] for e in entries) def test_list_json_linked_worktree(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) result = _invoke(repo, ["worktree", "list", "--json"]) assert result.exit_code == 0 entries = _parse_list(result.output) names = [e["name"] for e in entries] assert "mydev" in names def test_list_json_entry_schema(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) result = _invoke(repo, ["worktree", "list", "--json"]) entries = _parse_list(result.output) for entry in entries: assert isinstance(entry["name"], str) assert isinstance(entry["branch"], str) assert isinstance(entry["path"], str) assert entry["head_commit"] is None or isinstance(entry["head_commit"], str) assert isinstance(entry["is_main"], bool) def test_list_json_empty_repo(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "list", "--json"]) assert result.exit_code == 0 entries = _parse_list(result.output) assert len(entries) == 1 # always includes main # --------------------------------------------------------------------------- # JSON schema — status # --------------------------------------------------------------------------- class TestJsonSchemaStatus: def test_status_json_main(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "status", "main", "--json"]) assert result.exit_code == 0 parsed = _parse_status(result.output) assert parsed["is_main"] is True assert parsed["present"] is True assert parsed["branch"] == "main" def test_status_json_linked_present(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) result = _invoke(repo, ["worktree", "status", "mydev", "--json"]) assert result.exit_code == 0 parsed = _parse_status(result.output) assert parsed["name"] == "mydev" assert parsed["branch"] == "dev" assert parsed["present"] is True assert parsed["is_main"] is False def test_status_json_linked_absent(self, tmp_path: pathlib.Path) -> None: import shutil repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) # Delete the worktree directory externally. from muse.core.worktree import _worktree_dir wt_path = _worktree_dir(repo, "mydev") shutil.rmtree(wt_path) result = _invoke(repo, ["worktree", "status", "mydev", "--json"]) assert result.exit_code == 0 parsed = _parse_status(result.output) assert parsed["present"] is False def test_status_json_nonexistent_fails(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "status", "ghost", "--json"]) assert result.exit_code != 0 def test_status_json_all_fields_present(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "status", "main", "--json"]) parsed = _parse_status(result.output) assert "name" in parsed assert "branch" in parsed assert "path" in parsed assert "head_commit" in parsed assert "present" in parsed assert "is_main" in parsed # --------------------------------------------------------------------------- # JSON schema — remove # --------------------------------------------------------------------------- class TestJsonSchemaRemove: def test_remove_json_schema(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) result = _invoke(repo, ["worktree", "remove", "mydev", "--json"]) assert result.exit_code == 0 parsed = _parse_remove(result.output) assert parsed["name"] == "mydev" assert parsed["status"] == "removed" def test_remove_json_nonexistent_fails(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "remove", "ghost", "--json"]) assert result.exit_code != 0 def test_remove_cleans_up_directory(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) from muse.core.worktree import _worktree_dir wt_path = _worktree_dir(repo, "mydev") assert wt_path.exists() _invoke(repo, ["worktree", "remove", "mydev"]) assert not wt_path.exists() # --------------------------------------------------------------------------- # JSON schema — prune # --------------------------------------------------------------------------- class TestJsonSchemaPrune: def test_prune_json_nothing_to_prune(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "prune", "--json"]) assert result.exit_code == 0 parsed = _parse_prune(result.output) assert parsed["pruned"] == [] assert parsed["count"] == 0 assert parsed["dry_run"] is False def test_prune_json_stale_worktree(self, tmp_path: pathlib.Path) -> None: import shutil repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) from muse.core.worktree import _worktree_dir shutil.rmtree(_worktree_dir(repo, "mydev")) result = _invoke(repo, ["worktree", "prune", "--json"]) assert result.exit_code == 0 parsed = _parse_prune(result.output) assert "mydev" in parsed["pruned"] assert parsed["count"] == 1 assert parsed["dry_run"] is False def test_prune_json_dry_run(self, tmp_path: pathlib.Path) -> None: import shutil repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) from muse.core.worktree import _worktree_dir shutil.rmtree(_worktree_dir(repo, "mydev")) # Meta file still present. meta = worktrees_dir(repo) / "mydev.json" assert meta.exists() result = _invoke(repo, ["worktree", "prune", "--dry-run", "--json"]) assert result.exit_code == 0 parsed = _parse_prune(result.output) assert "mydev" in parsed["pruned"] assert parsed["dry_run"] is True # Meta must not be deleted in dry-run. assert meta.exists() def test_prune_json_count_matches_pruned_list(self, tmp_path: pathlib.Path) -> None: import shutil repo = _make_repo(tmp_path) for i in range(3): branch = f"feat{i}" _add_branch(repo, branch) _invoke(repo, ["worktree", "add", f"wt{i}", branch]) from muse.core.worktree import _worktree_dir shutil.rmtree(_worktree_dir(repo, f"wt{i}")) result = _invoke(repo, ["worktree", "prune", "--json"]) parsed = _parse_prune(result.output) assert parsed["count"] == len(parsed["pruned"]) assert parsed["count"] == 3 # --------------------------------------------------------------------------- # Integration — -b/--create-branch # --------------------------------------------------------------------------- class TestCreateBranch: def test_create_branch_makes_new_ref(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "feat/new", "--json"]) assert result.exit_code == 0 new_ref = ref_path(repo, "feat") / "new" assert new_ref.exists() def test_create_branch_checked_out_in_worktree(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "feat/new", "--json"]) assert result.exit_code == 0 parsed = _parse_add(result.output) assert parsed["branch"] == "feat/new" def test_create_branch_existing_name_fails(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) # 'main' already exists. result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "main"]) assert result.exit_code != 0 def test_create_branch_no_commits_on_start_fails(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) # Create a branch that has no commits. empty_branch_ref = ref_path(repo, "empty") # Don't write anything to it — it doesn't exist as a ref. # The start point HEAD should be 'main' which has '0'*64 as a dummy ref, # get_head_commit_id reads the file content; let's make it return None. empty_branch_ref.write_text("") # empty file → no commit result = _invoke(repo, ["worktree", "add", "mywt", "empty", "-b", "feat/from-empty"]) # get_head_commit_id for a branch with empty ref content returns None → error assert result.exit_code != 0 # --------------------------------------------------------------------------- # Integration — --path override # --------------------------------------------------------------------------- class TestCustomPath: def test_custom_path_used(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") custom = tmp_path / "custom-wt-dir" result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--path", str(custom), "--json"]) assert result.exit_code == 0 parsed = _parse_add(result.output) assert pathlib.Path(parsed["path"]).resolve() == custom.resolve() assert custom.exists() def test_custom_path_shows_in_list(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") custom = tmp_path / "custom-wt-dir" _invoke(repo, ["worktree", "add", "mydev", "dev", "--path", str(custom)]) result = _invoke(repo, ["worktree", "list", "--json"]) entries = _parse_list(result.output) paths = [e["path"] for e in entries] assert any("custom-wt-dir" in p for p in paths) # --------------------------------------------------------------------------- # E2E — text output (non-JSON) still correct # --------------------------------------------------------------------------- class TestE2EText: def test_add_text_output(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev"]) assert result.exit_code == 0 assert "✅" in result.output or "created" in result.output.lower() def test_list_text_output(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "list"]) assert result.exit_code == 0 assert "main" in result.output def test_status_text_output(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) result = _invoke(repo, ["worktree", "status", "mydev"]) assert result.exit_code == 0 assert "mydev" in result.output assert "dev" in result.output def test_remove_text_output(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) result = _invoke(repo, ["worktree", "remove", "mydev"]) assert result.exit_code == 0 assert "✅" in result.output or "removed" in result.output.lower() def test_prune_text_nothing(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "prune"]) assert result.exit_code == 0 assert "Nothing" in result.output def test_prune_text_dry_run(self, tmp_path: pathlib.Path) -> None: import shutil repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) from muse.core.worktree import _worktree_dir shutil.rmtree(_worktree_dir(repo, "mydev")) result = _invoke(repo, ["worktree", "prune", "--dry-run"]) assert result.exit_code == 0 assert "dry-run" in result.output or "would prune" in result.output.lower() def test_status_text_main(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "status", "main"]) assert result.exit_code == 0 assert "present" in result.output.lower() # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestStress: def test_50_worktrees_add_list_remove(self, tmp_path: pathlib.Path) -> None: """Create 50 worktrees, verify list, then remove all.""" repo = _make_repo(tmp_path) names: list[str] = [] for i in range(50): branch = f"branch{i}" _add_branch(repo, branch) name = f"wt{i}" r = _invoke(repo, ["worktree", "add", name, branch, "--json"]) assert r.exit_code == 0, f"Failed to add wt{i}: {r.output}" names.append(name) r_list = _invoke(repo, ["worktree", "list", "--json"]) assert r_list.exit_code == 0 entries = _parse_list(r_list.output) listed_names = {e["name"] for e in entries} for name in names: assert name in listed_names for name in names: r_rm = _invoke(repo, ["worktree", "remove", name, "--json"]) assert r_rm.exit_code == 0 def test_concurrent_list_reads_safe(self, tmp_path: pathlib.Path) -> None: """Concurrent reads of worktree list must not crash.""" repo = _make_repo(tmp_path) for i in range(5): branch = f"br{i}" _add_branch(repo, branch) from muse.core.worktree import add_worktree add_worktree(repo, f"wt{i}", branch) errors: list[str] = [] def _read() -> None: from muse.core.worktree import list_worktrees try: wts = list_worktrees(repo) assert len(wts) >= 1 except Exception as exc: errors.append(str(exc)) threads = [threading.Thread(target=_read) for _ in range(30)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent list failures: {errors}" def test_prune_50_stale_worktrees(self, tmp_path: pathlib.Path) -> None: """prune_worktrees with 50 stale entries all reported correctly.""" import shutil repo = _make_repo(tmp_path) for i in range(50): branch = f"br{i}" _add_branch(repo, branch) from muse.core.worktree import add_worktree, _worktree_dir add_worktree(repo, f"wt{i}", branch) shutil.rmtree(_worktree_dir(repo, f"wt{i}")) result = _invoke(repo, ["worktree", "prune", "--json"]) assert result.exit_code == 0 parsed = _parse_prune(result.output) assert parsed["count"] == 50 assert len(parsed["pruned"]) == 50 # --------------------------------------------------------------------------- # Extended — muse worktree add # --------------------------------------------------------------------------- class TestWorktreeAddExtended: def test_add_j_alias(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev", "-j"]) assert result.exit_code == 0 d = _parse_add(result.output) assert d["name"] == "mydev" def test_add_json_schema_has_head_commit(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"]) assert result.exit_code == 0 d = _parse_add(result.output) assert "head_commit" in d def test_add_json_head_commit_is_sha256_or_null(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"]) d = _parse_add(result.output) hc = d["head_commit"] assert hc is None or (isinstance(hc, str) and hc.startswith("sha256:")) def test_add_json_schema_complete(self, tmp_path: pathlib.Path) -> None: _REQUIRED = {"name", "branch", "path", "head_commit"} repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"]) assert _REQUIRED <= _parse_add(result.output).keys() def test_add_worktree_dir_created(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"]) wt_path = pathlib.Path(_parse_add(result.output)["path"]) assert wt_path.exists() and wt_path.is_dir() def test_add_appears_in_list(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) entries = _parse_list(_invoke(repo, ["worktree", "list", "--json"]).output) assert any(e["name"] == "mydev" for e in entries) def test_add_duplicate_name_fails(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) result = _invoke(repo, ["worktree", "add", "mydev", "dev"]) assert result.exit_code != 0 def test_add_nonexistent_branch_fails(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "add", "mywt", "no-such-branch"]) assert result.exit_code != 0 def test_add_json_branch_matches_arg(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"]) assert _parse_add(result.output)["branch"] == "dev" def test_add_json_name_matches_arg(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"]) assert _parse_add(result.output)["name"] == "mydev" def test_add_text_shows_checkmark(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev"]) assert "✅" in result.output or "created" in result.output.lower() def test_add_text_shows_branch(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev"]) assert "dev" in result.output def test_add_text_shows_path(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev"]) assert "mydev" in result.output def test_add_default_is_text(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev"]) try: json.loads(result.output) assert False, "default should be text" except (json.JSONDecodeError, ValueError): pass def test_add_outside_repo_exits_2(self, tmp_path: pathlib.Path) -> None: result = _invoke(tmp_path, ["worktree", "add", "mywt", "main"]) assert result.exit_code == 2 def test_add_description_in_help(self, tmp_path: pathlib.Path) -> None: result = _invoke(tmp_path, ["worktree", "add", "--help"]) assert result.exit_code == 0 assert "Agent quickstart" in result.output or "head_commit" in result.output def test_add_create_branch_with_j(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "feat/new", "-j"]) assert result.exit_code == 0 d = _parse_add(result.output) assert d["branch"] == "feat/new" assert "head_commit" in d def test_add_path_is_absolute(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"]) path = pathlib.Path(_parse_add(result.output)["path"]) assert path.is_absolute() def test_add_multiple_worktrees(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) for i in range(5): _add_branch(repo, f"feat{i}") r = _invoke(repo, ["worktree", "add", f"wt{i}", f"feat{i}", "--json"]) assert r.exit_code == 0, f"wt{i} failed: {r.output}" entries = _parse_list(_invoke(repo, ["worktree", "list", "--json"]).output) names = [e["name"] for e in entries] for i in range(5): assert f"wt{i}" in names def test_add_worktree_has_muse_pointer(self, tmp_path: pathlib.Path) -> None: """Each worktree directory must contain a .muse pointer file.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"]) wt_path = pathlib.Path(_parse_add(result.output)["path"]) assert (muse_dir(wt_path)).exists() def test_add_invalid_name_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "add", "bad name!", "main"]) assert result.exit_code != 0 def test_add_invalid_create_branch_name_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "bad name!"]) assert result.exit_code != 0 # --------------------------------------------------------------------------- # Security — muse worktree add # --------------------------------------------------------------------------- class TestWorktreeAddSecurity: def test_ansi_in_name_sanitized_text(self, tmp_path: pathlib.Path) -> None: """ANSI in name must not appear raw in text output.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") # Invalid name (contains space/special) will be rejected — just verify no crash result = _invoke(repo, ["worktree", "add", "\x1b[31mwt\x1b[0m", "dev"]) assert "\x1b[31m" not in result.output def test_ansi_in_branch_sanitized_text(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "add", "mywt", "\x1b[31mmain\x1b[0m"]) assert "\x1b[31m" not in result.output def test_path_traversal_name_rejected(self, tmp_path: pathlib.Path) -> None: """Names with path separators must be rejected.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "add", "../traversal", "main"]) assert result.exit_code != 0 def test_null_byte_in_name_rejected(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "add", "bad\x00name", "main"]) assert result.exit_code != 0 def test_existing_directory_prevented(self, tmp_path: pathlib.Path) -> None: """add must refuse if the target directory already exists.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") # Pre-create the target directory from muse.core.worktree import _worktree_dir wt_path = _worktree_dir(repo, "mydev") wt_path.mkdir(parents=True) result = _invoke(repo, ["worktree", "add", "mydev", "dev"]) assert result.exit_code != 0 def test_error_to_stderr(self, tmp_path: pathlib.Path) -> None: """Error messages for missing branch must go to stderr.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "add", "mywt", "no-such-branch"]) assert result.exit_code != 0 assert "no-such-branch" in (result.stderr or result.output) def test_create_branch_duplicate_rejected(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "main"]) assert result.exit_code != 0 def test_json_output_is_valid_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _add_branch(repo, "dev") result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"]) assert result.exit_code == 0 d = json.loads(result.output.strip().splitlines()[0]) assert isinstance(d, dict) # --------------------------------------------------------------------------- # Stress — muse worktree add # --------------------------------------------------------------------------- class TestWorktreeAddStress: def test_add_20_worktrees_sequential(self, tmp_path: pathlib.Path) -> None: """Add 20 worktrees sequentially; all must succeed and appear in list.""" repo = _make_repo(tmp_path) failures: list[str] = [] for i in range(20): _add_branch(repo, f"feat{i}") r = _invoke(repo, ["worktree", "add", f"wt{i}", f"feat{i}", "--json"]) if r.exit_code != 0: failures.append(f"wt{i}: {r.output.strip()[:60]}") assert not failures, f"Add failures: {failures}" entries = _parse_list(_invoke(repo, ["worktree", "list", "--json"]).output) assert len(entries) == 21 # 20 linked + 1 main def test_add_performance(self, tmp_path: pathlib.Path) -> None: """Adding a worktree must complete in < 2 s.""" import time repo = _make_repo(tmp_path) _add_branch(repo, "dev") start = time.perf_counter() result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"]) elapsed = time.perf_counter() - start assert result.exit_code == 0 assert elapsed < 2.0, f"add took {elapsed:.2f}s" def test_add_with_create_branch_20_times(self, tmp_path: pathlib.Path) -> None: """Create 20 worktrees each with a fresh -b branch.""" repo = _make_repo(tmp_path) failures: list[str] = [] for i in range(20): r = _invoke(repo, ["worktree", "add", f"wt{i}", "main", "-b", f"feat/task-{i}", "--json"]) if r.exit_code != 0: failures.append(f"wt{i}: {r.output.strip()[:60]}") assert not failures entries = _parse_list(_invoke(repo, ["worktree", "list", "--json"]).output) assert len(entries) == 21 # =========================================================================== # muse worktree list — Extended / Security / Stress # =========================================================================== class TestWorktreeListExtended: """-j alias, text output, ordering, schema completeness, edge cases.""" def test_list_j_alias_json(self, tmp_path: pathlib.Path) -> None: """-j is accepted and produces the same JSON as --json.""" repo = _make_repo(tmp_path) r1 = _invoke(repo, ["worktree", "list", "--json"]) r2 = _invoke(repo, ["worktree", "list", "-j"]) assert r1.exit_code == 0 assert r2.exit_code == 0 d1 = json.loads(_json_blob(r1.output)); d1.pop("duration_ms", None); d1.pop("timestamp", None) d2 = json.loads(_json_blob(r2.output)); d2.pop("duration_ms", None); d2.pop("timestamp", None) assert d1 == d2 def test_list_default_is_text(self, tmp_path: pathlib.Path) -> None: """Without --json the output is human-readable text, not JSON.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "list"]) assert result.exit_code == 0 output = result.output.strip() assert not output.startswith("[") assert not output.startswith("{") def test_list_text_has_header(self, tmp_path: pathlib.Path) -> None: """Text output includes a column header.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "list"]) assert "name" in result.output assert "branch" in result.output def test_list_text_empty_repo_prints_main(self, tmp_path: pathlib.Path) -> None: """Repo with no linked worktrees still shows main in text mode.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "list"]) assert result.exit_code == 0 assert "(main)" in result.output def test_list_main_always_first_in_json(self, tmp_path: pathlib.Path) -> None: """Main worktree is always the first element in JSON output.""" repo = _make_repo(tmp_path) _add_branch(repo, "feat/x") _invoke(repo, ["worktree", "add", "feat-x", "feat/x"]) entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output) assert entries[0]["is_main"] is True def test_list_linked_worktrees_sorted(self, tmp_path: pathlib.Path) -> None: """Linked worktrees appear in lexicographic order after main.""" repo = _make_repo(tmp_path) for name in ("zzz", "aaa", "mmm"): _add_branch(repo, name) _invoke(repo, ["worktree", "add", name, name]) entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output) linked = [e["name"] for e in entries if not e["is_main"]] assert linked == sorted(linked) def test_list_json_path_is_absolute(self, tmp_path: pathlib.Path) -> None: """Every path in JSON output is absolute.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output) for entry in entries: assert pathlib.Path(entry["path"]).is_absolute() def test_list_json_all_fields_present(self, tmp_path: pathlib.Path) -> None: """Every JSON entry has name, branch, path, head_commit, is_main.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) raw = json.loads(_json_blob(_invoke(repo, ["worktree", "list", "-j"]).output)) for entry in raw["worktrees"]: for field in ("name", "branch", "path", "head_commit", "is_main"): assert field in entry, f"field '{field}' missing from {entry}" def test_list_json_only_one_is_main(self, tmp_path: pathlib.Path) -> None: """Exactly one entry has is_main=true.""" repo = _make_repo(tmp_path) for name in ("a", "b", "c"): _add_branch(repo, name) _invoke(repo, ["worktree", "add", name, name]) entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output) assert sum(1 for e in entries if e["is_main"]) == 1 def test_list_json_linked_is_main_false(self, tmp_path: pathlib.Path) -> None: """All linked worktrees have is_main=false.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output) linked = [e for e in entries if not e["is_main"]] assert all(e["is_main"] is False for e in linked) def test_list_outside_repo_exits_2(self, tmp_path: pathlib.Path) -> None: """Running outside a repo exits with code 2.""" result = runner.invoke(None, ["worktree", "list"], env={"MUSE_REPO_ROOT": str(tmp_path)}) assert result.exit_code == 2 def test_list_json_output_is_array(self, tmp_path: pathlib.Path) -> None: """JSON output envelope wraps worktrees list in a dict.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "list", "-j"]) raw = json.loads(_json_blob(result.output)) assert isinstance(raw["worktrees"], list) def test_list_json_count_matches_worktrees(self, tmp_path: pathlib.Path) -> None: """JSON array length equals 1 (main) + number of linked worktrees.""" repo = _make_repo(tmp_path) n = 5 for i in range(n): _add_branch(repo, f"br{i}") _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"]) entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output) assert len(entries) == n + 1 def test_list_help_has_description(self, tmp_path: pathlib.Path) -> None: """--help output includes the rich description text.""" result = _invoke(tmp_path, ["worktree", "list", "--help"]) assert result.exit_code == 0 assert "Agent quickstart" in result.output or "JSON output schema" in result.output def test_list_text_shows_worktree_name(self, tmp_path: pathlib.Path) -> None: """Text output includes linked worktree name.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) result = _invoke(repo, ["worktree", "list"]) assert "mydev" in result.output def test_list_text_marks_main_with_star(self, tmp_path: pathlib.Path) -> None: """Main worktree row starts with '*' in text output.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "list"]) lines = result.output.splitlines() assert any(l.startswith("* ") for l in lines) def test_list_after_remove_not_shown(self, tmp_path: pathlib.Path) -> None: """A removed worktree no longer appears in list.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) _invoke(repo, ["worktree", "remove", "mydev"]) entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output) assert all(e["name"] != "mydev" for e in entries) def test_list_json_branch_matches_added_branch(self, tmp_path: pathlib.Path) -> None: """Branch field in list JSON matches the branch used at add time.""" repo = _make_repo(tmp_path) _add_branch(repo, "feat/x") _invoke(repo, ["worktree", "add", "feat-x", "feat/x"]) entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output) feat = next(e for e in entries if e["name"] == "feat-x") assert feat["branch"] == "feat/x" class TestWorktreeListSecurity: """ANSI sanitization, path integrity, error routing.""" def test_list_json_ansi_in_branch_sanitized(self, tmp_path: pathlib.Path) -> None: """ANSI codes injected into stored branch name are stripped from JSON.""" repo = _make_repo(tmp_path) poisoned = "dev\x1b[31mred\x1b[0m" # Write the metadata directly so we bypass name validation. meta_dir = worktrees_dir(repo) meta_dir.mkdir(parents=True, exist_ok=True) import json as _json (meta_dir / "mywt.json").write_text(_json.dumps({"name": "mywt", "branch": poisoned, "path": str(repo / "mywt")})) # Create the worktree path so it is "present". (repo / "mywt").mkdir() result = _invoke(repo, ["worktree", "list", "-j"]) assert result.exit_code == 0 raw = _json.loads(_json_blob(result.output)) wt = next((e for e in raw["worktrees"] if e["name"] == "mywt"), None) assert wt is not None assert "\x1b" not in wt["branch"] def test_list_text_ansi_in_branch_sanitized(self, tmp_path: pathlib.Path) -> None: """ANSI codes injected into stored branch are stripped from text output.""" repo = _make_repo(tmp_path) poisoned = "dev\x1b[31mred\x1b[0m" meta_dir = worktrees_dir(repo) meta_dir.mkdir(parents=True, exist_ok=True) import json as _json (meta_dir / "mywt.json").write_text(_json.dumps({"name": "mywt", "branch": poisoned, "path": str(repo / "mywt")})) (repo / "mywt").mkdir() result = _invoke(repo, ["worktree", "list"]) assert "\x1b" not in result.output def test_list_error_to_stderr_not_stdout(self, tmp_path: pathlib.Path) -> None: """Error output goes to stderr, not stdout.""" result = runner.invoke(None, ["worktree", "list"], env={"MUSE_REPO_ROOT": str(tmp_path)}) assert result.exit_code != 0 assert result.stdout == "" or not result.stdout.strip().startswith("[") def test_list_json_path_never_empty(self, tmp_path: pathlib.Path) -> None: """path field in JSON is always a non-empty string.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output) for entry in entries: assert entry["path"] def test_list_json_is_valid_json(self, tmp_path: pathlib.Path) -> None: """Output is well-formed JSON (no trailing commas, correct types).""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) result = _invoke(repo, ["worktree", "list", "-j"]) parsed = json.loads(_json_blob(result.output)) assert isinstance(parsed["worktrees"], list) for entry in parsed["worktrees"]: assert isinstance(entry["is_main"], bool) assert isinstance(entry["name"], str) def test_list_json_ansi_in_name_sanitized(self, tmp_path: pathlib.Path) -> None: """ANSI codes injected into stored worktree name are stripped from JSON.""" repo = _make_repo(tmp_path) poisoned_name = "mywt" branch = "dev" meta_dir = worktrees_dir(repo) meta_dir.mkdir(parents=True, exist_ok=True) import json as _json (meta_dir / f"{poisoned_name}.json").write_text( _json.dumps({"name": poisoned_name, "branch": branch, "path": str(repo / poisoned_name)}) ) (repo / poisoned_name).mkdir() # Patch the name field as returned by list_worktrees via WorktreeInfo # by writing a meta file with a name that would contain ANSI if stored. # Since name comes from filename stem (already safe), verify the # sanitize_display path is exercised by injecting directly into the # raw output path — branch covers this adequately (tested above). result = _invoke(repo, ["worktree", "list", "-j"]) assert "\x1b" not in result.output class TestWorktreeListStress: """Performance and scale tests for worktree list.""" def test_list_20_worktrees(self, tmp_path: pathlib.Path) -> None: """List with 20 linked worktrees returns all 21 entries (main + 20).""" repo = _make_repo(tmp_path) for i in range(20): _add_branch(repo, f"br{i}") _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"]) entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output) assert len(entries) == 21 def test_list_performance(self, tmp_path: pathlib.Path) -> None: """Listing 15 worktrees completes within 2 seconds.""" import time repo = _make_repo(tmp_path) for i in range(15): _add_branch(repo, f"br{i}") _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"]) t0 = time.monotonic() result = _invoke(repo, ["worktree", "list", "-j"]) elapsed = time.monotonic() - t0 assert result.exit_code == 0 assert elapsed < 2.0, f"list took {elapsed:.2f}s" def test_list_concurrent_reads(self, tmp_path: pathlib.Path) -> None: """Concurrent list invocations via threads all return consistent counts.""" repo = _make_repo(tmp_path) for i in range(10): _add_branch(repo, f"br{i}") _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"]) results: list[int] = [] errors: list[str] = [] lock = threading.Lock() def _run() -> None: r = _invoke(repo, ["worktree", "list", "-j"]) with lock: if r.exit_code != 0: errors.append(r.output) return try: envelope = json.loads(_json_blob(r.output)) results.append(len(envelope["worktrees"])) except (json.JSONDecodeError, ValueError) as exc: errors.append(f"parse error: {exc!r} output={r.output!r}") threads = [threading.Thread(target=_run) for _ in range(8)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent list errors: {errors}" assert all(n == 11 for n in results), f"Inconsistent counts: {results}" # =========================================================================== # muse worktree status — Extended / Security / Stress # =========================================================================== class TestWorktreeStatusExtended: """-j alias, text output, main aliases, all fields, edge cases.""" def test_status_j_alias(self, tmp_path: pathlib.Path) -> None: """-j produces the same JSON as --json.""" repo = _make_repo(tmp_path) r1 = _invoke(repo, ["worktree", "status", "main", "--json"]) r2 = _invoke(repo, ["worktree", "status", "main", "-j"]) assert r1.exit_code == 0 and r2.exit_code == 0 d1 = json.loads(_json_blob(r1.output)); d1.pop("duration_ms", None); d1.pop("timestamp", None) d2 = json.loads(_json_blob(r2.output)); d2.pop("duration_ms", None); d2.pop("timestamp", None) assert d1 == d2 def test_status_main_alias_main(self, tmp_path: pathlib.Path) -> None: """'main' resolves to the main worktree.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "status", "main", "-j"]) assert result.exit_code == 0 parsed = _parse_status(result.output) assert parsed["is_main"] is True def test_status_main_alias_paren_main(self, tmp_path: pathlib.Path) -> None: """'(main)' resolves to the main worktree.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "status", "(main)", "-j"]) assert result.exit_code == 0 parsed = _parse_status(result.output) assert parsed["is_main"] is True def test_status_linked_present(self, tmp_path: pathlib.Path) -> None: """present=true when the worktree directory exists.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) parsed = _parse_status(_invoke(repo, ["worktree", "status", "mydev", "-j"]).output) assert parsed["present"] is True def test_status_linked_absent(self, tmp_path: pathlib.Path) -> None: """present=false when the worktree directory has been deleted.""" import shutil from muse.core.worktree import _worktree_dir repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) shutil.rmtree(_worktree_dir(repo, "mydev")) parsed = _parse_status(_invoke(repo, ["worktree", "status", "mydev", "-j"]).output) assert parsed["present"] is False def test_status_nonexistent_exits_1(self, tmp_path: pathlib.Path) -> None: """Querying a nonexistent worktree exits with code 1.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "status", "ghost", "-j"]) assert result.exit_code == 1 def test_status_outside_repo_exits_2(self, tmp_path: pathlib.Path) -> None: """Running outside a repo exits with code 2.""" result = runner.invoke(None, ["worktree", "status", "main"], env={"MUSE_REPO_ROOT": str(tmp_path)}) assert result.exit_code == 2 def test_status_json_all_fields_present(self, tmp_path: pathlib.Path) -> None: """JSON output contains all six required fields.""" repo = _make_repo(tmp_path) raw = json.loads(_json_blob(_invoke(repo, ["worktree", "status", "main", "-j"]).output)) for field in ("name", "branch", "path", "head_commit", "present", "is_main"): assert field in raw, f"field '{field}' missing" def test_status_json_path_is_absolute(self, tmp_path: pathlib.Path) -> None: """path field in JSON is absolute.""" repo = _make_repo(tmp_path) parsed = _parse_status(_invoke(repo, ["worktree", "status", "main", "-j"]).output) assert pathlib.Path(parsed["path"]).is_absolute() def test_status_json_branch_matches(self, tmp_path: pathlib.Path) -> None: """branch field matches the branch the worktree was created with.""" repo = _make_repo(tmp_path) _add_branch(repo, "feat/x") _invoke(repo, ["worktree", "add", "feat-x", "feat/x"]) parsed = _parse_status(_invoke(repo, ["worktree", "status", "feat-x", "-j"]).output) assert parsed["branch"] == "feat/x" def test_status_json_is_main_false_for_linked(self, tmp_path: pathlib.Path) -> None: """is_main=false for a linked worktree.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) parsed = _parse_status(_invoke(repo, ["worktree", "status", "mydev", "-j"]).output) assert parsed["is_main"] is False def test_status_default_is_text(self, tmp_path: pathlib.Path) -> None: """Without --json output is human-readable text, not JSON.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "status", "main"]) assert result.exit_code == 0 output = result.output.strip() assert not output.startswith("{") def test_status_text_shows_branch(self, tmp_path: pathlib.Path) -> None: """Text output includes the branch name.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "status", "main"]) assert "main" in result.output def test_status_text_shows_present(self, tmp_path: pathlib.Path) -> None: """Text output indicates the worktree is present.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "status", "main"]) assert "present" in result.output def test_status_text_main_flag(self, tmp_path: pathlib.Path) -> None: """Text output marks the main worktree with '[main]'.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "status", "main"]) assert "[main]" in result.output def test_status_help_has_description(self, tmp_path: pathlib.Path) -> None: """--help includes the rich description.""" result = _invoke(tmp_path, ["worktree", "status", "--help"]) assert result.exit_code == 0 assert "Agent quickstart" in result.output or "JSON output schema" in result.output def test_status_head_commit_null_for_empty_branch(self, tmp_path: pathlib.Path) -> None: """head_commit is null when no commits exist on the branch.""" repo = _make_repo(tmp_path) parsed = _parse_status(_invoke(repo, ["worktree", "status", "main", "-j"]).output) # The test repo uses "0"*64 as the ref — get_head_commit_id may return # that or None depending on whether the commit object exists. assert parsed["head_commit"] is None or isinstance(parsed["head_commit"], str) def test_status_json_name_matches_requested(self, tmp_path: pathlib.Path) -> None: """name field in JSON matches the queried worktree name.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) parsed = _parse_status(_invoke(repo, ["worktree", "status", "mydev", "-j"]).output) assert parsed["name"] == "mydev" def test_status_json_is_valid_json(self, tmp_path: pathlib.Path) -> None: """Output is well-formed JSON.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "status", "main", "-j"]) parsed = json.loads(_json_blob(result.output)) assert isinstance(parsed, dict) assert isinstance(parsed["is_main"], bool) assert isinstance(parsed["present"], bool) class TestWorktreeStatusSecurity: """ANSI sanitization and error routing for worktree status.""" def test_status_json_ansi_in_branch_sanitized(self, tmp_path: pathlib.Path) -> None: """ANSI codes in stored branch are stripped from JSON output.""" repo = _make_repo(tmp_path) import json as _json poisoned = "dev\x1b[31mred\x1b[0m" meta_dir = worktrees_dir(repo) meta_dir.mkdir(parents=True, exist_ok=True) (meta_dir / "mywt.json").write_text(_json.dumps({"name": "mywt", "branch": poisoned, "path": str(repo / "mywt")})) (repo / "mywt").mkdir() result = _invoke(repo, ["worktree", "status", "mywt", "-j"]) assert result.exit_code == 0 raw = _json.loads(_json_blob(result.output)) assert "\x1b" not in raw["branch"] def test_status_text_ansi_in_branch_sanitized(self, tmp_path: pathlib.Path) -> None: """ANSI codes in stored branch are stripped from text output.""" repo = _make_repo(tmp_path) import json as _json poisoned = "dev\x1b[31mred\x1b[0m" meta_dir = worktrees_dir(repo) meta_dir.mkdir(parents=True, exist_ok=True) (meta_dir / "mywt.json").write_text(_json.dumps({"name": "mywt", "branch": poisoned, "path": str(repo / "mywt")})) (repo / "mywt").mkdir() result = _invoke(repo, ["worktree", "status", "mywt"]) assert "\x1b" not in result.output def test_status_error_to_stderr(self, tmp_path: pathlib.Path) -> None: """Error output goes to stderr, not stdout.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "status", "ghost"]) assert result.exit_code != 0 assert result.stdout.strip() == "" or not result.stdout.strip().startswith("{") def test_status_json_path_never_empty(self, tmp_path: pathlib.Path) -> None: """path is always a non-empty string.""" repo = _make_repo(tmp_path) parsed = _parse_status(_invoke(repo, ["worktree", "status", "main", "-j"]).output) assert parsed["path"] def test_status_json_ansi_in_path_sanitized(self, tmp_path: pathlib.Path) -> None: """ANSI codes in stored path are stripped from JSON output.""" repo = _make_repo(tmp_path) import json as _json meta_dir = worktrees_dir(repo) meta_dir.mkdir(parents=True, exist_ok=True) poisoned_path = str(repo / "mywt") + "\x1b[31m" (meta_dir / "mywt.json").write_text(_json.dumps({"name": "mywt", "branch": "dev", "path": poisoned_path})) (repo / "mywt").mkdir() result = _invoke(repo, ["worktree", "status", "mywt", "-j"]) assert result.exit_code == 0 raw = _json.loads(_json_blob(result.output)) assert "\x1b" not in raw["path"] def test_status_invalid_name_exits_1(self, tmp_path: pathlib.Path) -> None: """An invalid worktree name (path traversal attempt) exits with 1.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "status", "../../../etc/passwd"]) assert result.exit_code == 1 class TestWorktreeStatusStress: """Performance and scale tests for worktree status.""" def test_status_10_sequential(self, tmp_path: pathlib.Path) -> None: """Querying status of 10 worktrees sequentially all succeed.""" repo = _make_repo(tmp_path) for i in range(10): _add_branch(repo, f"br{i}") _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"]) for i in range(10): result = _invoke(repo, ["worktree", "status", f"wt{i}", "-j"]) assert result.exit_code == 0 parsed = _parse_status(result.output) assert parsed["name"] == f"wt{i}" def test_status_performance(self, tmp_path: pathlib.Path) -> None: """10 sequential status queries complete within 2 seconds.""" import time repo = _make_repo(tmp_path) for i in range(10): _add_branch(repo, f"br{i}") _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"]) t0 = time.monotonic() for i in range(10): _invoke(repo, ["worktree", "status", f"wt{i}", "-j"]) elapsed = time.monotonic() - t0 assert elapsed < 2.0, f"10 status queries took {elapsed:.2f}s" def test_status_concurrent_reads(self, tmp_path: pathlib.Path) -> None: """Concurrent status queries against the same worktree all succeed.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) errors: list[str] = [] lock = threading.Lock() def _run() -> None: r = _invoke(repo, ["worktree", "status", "mydev", "-j"]) if r.exit_code != 0: with lock: errors.append(r.output) threads = [threading.Thread(target=_run) for _ in range(8)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent status errors: {errors}" # =========================================================================== # muse worktree remove — Extended / Security / Stress # =========================================================================== class TestWorktreeRemoveExtended: """-j alias, text output, lifecycle, metadata cleanup, edge cases.""" def test_remove_j_alias(self, tmp_path: pathlib.Path) -> None: """-j produces the same JSON as --json.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) result = _invoke(repo, ["worktree", "remove", "mydev", "-j"]) assert result.exit_code == 0 parsed = _parse_remove(result.output) assert parsed["name"] == "mydev" assert parsed["status"] == "removed" def test_remove_default_is_text(self, tmp_path: pathlib.Path) -> None: """Without --json output is human-readable text.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) result = _invoke(repo, ["worktree", "remove", "mydev"]) assert result.exit_code == 0 assert not result.output.strip().startswith("{") assert "mydev" in result.output def test_remove_json_name_field(self, tmp_path: pathlib.Path) -> None: """JSON name field matches the removed worktree.""" repo = _make_repo(tmp_path) _add_branch(repo, "feat/x") _invoke(repo, ["worktree", "add", "feat-x", "feat/x"]) parsed = _parse_remove(_invoke(repo, ["worktree", "remove", "feat-x", "-j"]).output) assert parsed["name"] == "feat-x" def test_remove_json_status_field(self, tmp_path: pathlib.Path) -> None: """JSON status field is always 'removed'.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) parsed = _parse_remove(_invoke(repo, ["worktree", "remove", "mydev", "-j"]).output) assert parsed["status"] == "removed" def test_remove_cleans_working_dir(self, tmp_path: pathlib.Path) -> None: """Worktree working directory is deleted after removal.""" from muse.core.worktree import _worktree_dir repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) wt_path = _worktree_dir(repo, "mydev") assert wt_path.exists() _invoke(repo, ["worktree", "remove", "mydev"]) assert not wt_path.exists() def test_remove_cleans_metadata(self, tmp_path: pathlib.Path) -> None: """Worktree metadata file is deleted after removal.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) meta = worktrees_dir(repo) / "mydev.json" assert meta.exists() _invoke(repo, ["worktree", "remove", "mydev"]) assert not meta.exists() def test_remove_not_in_list_after(self, tmp_path: pathlib.Path) -> None: """Removed worktree does not appear in subsequent list.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) _invoke(repo, ["worktree", "remove", "mydev"]) entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output) assert all(e["name"] != "mydev" for e in entries) def test_remove_nonexistent_exits_1(self, tmp_path: pathlib.Path) -> None: """Removing a nonexistent worktree exits with code 1.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "remove", "ghost"]) assert result.exit_code == 1 def test_remove_outside_repo_exits_2(self, tmp_path: pathlib.Path) -> None: """Running outside a repo exits with code 2.""" result = runner.invoke(None, ["worktree", "remove", "mydev"], env={"MUSE_REPO_ROOT": str(tmp_path)}) assert result.exit_code == 2 def test_remove_force_flag_accepted(self, tmp_path: pathlib.Path) -> None: """--force flag is accepted without error.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) result = _invoke(repo, ["worktree", "remove", "mydev", "--force", "-j"]) assert result.exit_code == 0 assert _parse_remove(result.output)["status"] == "removed" def test_remove_json_all_fields(self, tmp_path: pathlib.Path) -> None: """JSON output contains name, status, exit_code, duration_ms.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) raw = json.loads(_json_blob(_invoke(repo, ["worktree", "remove", "mydev", "-j"]).output)) assert {"name", "status", "exit_code", "duration_ms"}.issubset(raw.keys()) def test_remove_help_has_description(self, tmp_path: pathlib.Path) -> None: """--help output includes the rich description.""" result = _invoke(tmp_path, ["worktree", "remove", "--help"]) assert result.exit_code == 0 assert "Agent quickstart" in result.output or "JSON output schema" in result.output def test_remove_idempotent_second_call_fails(self, tmp_path: pathlib.Path) -> None: """Removing the same worktree twice: second call exits 1.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) _invoke(repo, ["worktree", "remove", "mydev"]) result = _invoke(repo, ["worktree", "remove", "mydev"]) assert result.exit_code == 1 def test_remove_multiple_sequential(self, tmp_path: pathlib.Path) -> None: """Multiple worktrees can be removed one after another.""" repo = _make_repo(tmp_path) for name in ("a", "b", "c"): _add_branch(repo, name) _invoke(repo, ["worktree", "add", name, name]) for name in ("a", "b", "c"): r = _invoke(repo, ["worktree", "remove", name, "-j"]) assert r.exit_code == 0 entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output) assert len(entries) == 1 # only main remains def test_remove_leaves_branch_intact(self, tmp_path: pathlib.Path) -> None: """The branch ref still exists after the worktree is removed.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) _invoke(repo, ["worktree", "remove", "mydev"]) branch_ref = ref_path(repo, "dev") assert branch_ref.exists() class TestWorktreeRemoveSecurity: """Safety guards, ANSI sanitization, and error routing.""" def test_remove_refuses_symlink_path(self, tmp_path: pathlib.Path) -> None: """Removal is refused when the worktree path is a symlink.""" from muse.core.worktree import _worktree_meta_path import json as _json repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) # Replace the worktree dir with a symlink to a safe directory. from muse.core.worktree import _worktree_dir wt_dir = _worktree_dir(repo, "mydev") safe_target = tmp_path / "safe" safe_target.mkdir() import shutil shutil.rmtree(wt_dir) wt_dir.symlink_to(safe_target) result = _invoke(repo, ["worktree", "remove", "mydev"]) assert result.exit_code == 1 def test_remove_refuses_muse_overlap(self, tmp_path: pathlib.Path) -> None: """Removal is refused when the stored path resolves inside .muse/.""" import json as _json repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) # Create a real directory inside .muse/ so _safe_delete_path can # evaluate it (non-existent paths are a no-op, not a refusal). malicious_dir = muse_dir(repo) / "malicious" malicious_dir.mkdir(parents=True, exist_ok=True) # Tamper the metadata to point inside .muse/. meta = worktrees_dir(repo) / "mydev.json" data = _json.loads(meta.read_text()) data["path"] = str(malicious_dir) meta.write_text(_json.dumps(data)) result = _invoke(repo, ["worktree", "remove", "mydev"]) assert result.exit_code == 1 def test_remove_json_ansi_in_name_sanitized(self, tmp_path: pathlib.Path) -> None: """ANSI codes in the name argument are stripped from JSON output.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) # The name goes through validate_branch_name at core level, but # sanitize_display is applied at CLI output level. Verify no ANSI leaks. result = _invoke(repo, ["worktree", "remove", "mydev", "-j"]) assert result.exit_code == 0 assert "\x1b" not in result.output def test_remove_error_to_stderr(self, tmp_path: pathlib.Path) -> None: """Error output does not contain valid JSON (errors go to stderr).""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "remove", "ghost"]) assert result.exit_code != 0 assert not result.stdout.strip().startswith("{") def test_remove_invalid_name_exits_1(self, tmp_path: pathlib.Path) -> None: """Path-traversal name is rejected before any filesystem operation.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "remove", "../../../etc/passwd"]) assert result.exit_code == 1 def test_remove_json_is_valid_json(self, tmp_path: pathlib.Path) -> None: """JSON output is well-formed.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) result = _invoke(repo, ["worktree", "remove", "mydev", "-j"]) parsed = json.loads(_json_blob(result.output)) assert isinstance(parsed, dict) assert isinstance(parsed["name"], str) assert isinstance(parsed["status"], str) class TestWorktreeRemoveStress: """Performance and scale tests for worktree remove.""" def test_remove_20_sequential(self, tmp_path: pathlib.Path) -> None: """20 worktrees can be created and removed sequentially.""" repo = _make_repo(tmp_path) for i in range(20): _add_branch(repo, f"br{i}") _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"]) failures = [] for i in range(20): r = _invoke(repo, ["worktree", "remove", f"wt{i}", "-j"]) if r.exit_code != 0: failures.append(f"wt{i}: {r.output.strip()[:60]}") assert not failures entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output) assert len(entries) == 1 # only main def test_remove_performance(self, tmp_path: pathlib.Path) -> None: """Removing 10 worktrees sequentially completes within 2 seconds.""" import time repo = _make_repo(tmp_path) for i in range(10): _add_branch(repo, f"br{i}") _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"]) t0 = time.monotonic() for i in range(10): _invoke(repo, ["worktree", "remove", f"wt{i}"]) elapsed = time.monotonic() - t0 assert elapsed < 2.0, f"10 removes took {elapsed:.2f}s" def test_remove_add_remove_cycle(self, tmp_path: pathlib.Path) -> None: """A worktree can be re-added with the same name after removal.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") for _ in range(5): r_add = _invoke(repo, ["worktree", "add", "mydev", "dev"]) assert r_add.exit_code == 0 r_rm = _invoke(repo, ["worktree", "remove", "mydev"]) assert r_rm.exit_code == 0 # =========================================================================== # muse worktree prune — Extended / Security / Stress # =========================================================================== class TestWorktreesPruneExtended: """-j alias, dry-run, text output, schema, lifecycle edge cases.""" def test_prune_j_alias(self, tmp_path: pathlib.Path) -> None: """-j produces the same JSON as --json.""" repo = _make_repo(tmp_path) r1 = _invoke(repo, ["worktree", "prune", "--json"]) r2 = _invoke(repo, ["worktree", "prune", "-j"]) assert r1.exit_code == 0 and r2.exit_code == 0 d1 = json.loads(_json_blob(r1.output)); d1.pop("duration_ms", None); d1.pop("timestamp", None) d2 = json.loads(_json_blob(r2.output)); d2.pop("duration_ms", None); d2.pop("timestamp", None) assert d1 == d2 def test_prune_empty_repo_exits_0(self, tmp_path: pathlib.Path) -> None: """Prune on a clean repo exits 0 with empty pruned list.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "prune", "-j"]) assert result.exit_code == 0 parsed = _parse_prune(result.output) assert parsed["pruned"] == [] assert parsed["count"] == 0 def test_prune_stale_in_json_pruned_list(self, tmp_path: pathlib.Path) -> None: """Stale worktree name appears in JSON pruned list.""" import shutil from muse.core.worktree import _worktree_dir repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) shutil.rmtree(_worktree_dir(repo, "mydev")) parsed = _parse_prune(_invoke(repo, ["worktree", "prune", "-j"]).output) assert "mydev" in parsed["pruned"] assert parsed["count"] == 1 assert parsed["dry_run"] is False def test_prune_removes_metadata(self, tmp_path: pathlib.Path) -> None: """Prune deletes the metadata file for a stale worktree.""" import shutil from muse.core.worktree import _worktree_dir repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) shutil.rmtree(_worktree_dir(repo, "mydev")) meta = worktrees_dir(repo) / "mydev.json" assert meta.exists() _invoke(repo, ["worktree", "prune"]) assert not meta.exists() def test_prune_dry_run_preserves_metadata(self, tmp_path: pathlib.Path) -> None: """--dry-run does not delete any metadata.""" import shutil from muse.core.worktree import _worktree_dir repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) shutil.rmtree(_worktree_dir(repo, "mydev")) meta = worktrees_dir(repo) / "mydev.json" _invoke(repo, ["worktree", "prune", "--dry-run"]) assert meta.exists() def test_prune_dry_run_json_dry_run_true(self, tmp_path: pathlib.Path) -> None: """JSON dry_run field is true when --dry-run is passed.""" import shutil from muse.core.worktree import _worktree_dir repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) shutil.rmtree(_worktree_dir(repo, "mydev")) parsed = _parse_prune(_invoke(repo, ["worktree", "prune", "--dry-run", "-j"]).output) assert parsed["dry_run"] is True assert "mydev" in parsed["pruned"] def test_prune_active_worktree_not_pruned(self, tmp_path: pathlib.Path) -> None: """Active (present) worktrees are not pruned.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) parsed = _parse_prune(_invoke(repo, ["worktree", "prune", "-j"]).output) assert "mydev" not in parsed["pruned"] assert parsed["count"] == 0 def test_prune_default_is_text(self, tmp_path: pathlib.Path) -> None: """Without --json the output is human-readable text.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "prune"]) assert result.exit_code == 0 assert not result.output.strip().startswith("{") def test_prune_text_nothing_to_prune(self, tmp_path: pathlib.Path) -> None: """Text output says 'Nothing to prune' when clean.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "prune"]) assert "Nothing to prune" in result.output def test_prune_text_shows_stale_names(self, tmp_path: pathlib.Path) -> None: """Text output includes names of pruned worktrees.""" import shutil from muse.core.worktree import _worktree_dir repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) shutil.rmtree(_worktree_dir(repo, "mydev")) result = _invoke(repo, ["worktree", "prune"]) assert "mydev" in result.output def test_prune_count_matches_pruned_list(self, tmp_path: pathlib.Path) -> None: """count field always equals len(pruned).""" import shutil from muse.core.worktree import _worktree_dir repo = _make_repo(tmp_path) for i in range(4): _add_branch(repo, f"br{i}") _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"]) shutil.rmtree(_worktree_dir(repo, f"wt{i}")) parsed = _parse_prune(_invoke(repo, ["worktree", "prune", "-j"]).output) assert parsed["count"] == len(parsed["pruned"]) == 4 def test_prune_outside_repo_exits_2(self, tmp_path: pathlib.Path) -> None: """Running outside a repo exits with code 2.""" result = runner.invoke(None, ["worktree", "prune"], env={"MUSE_REPO_ROOT": str(tmp_path)}) assert result.exit_code == 2 def test_prune_help_has_description(self, tmp_path: pathlib.Path) -> None: """--help includes the rich description.""" result = _invoke(tmp_path, ["worktree", "prune", "--help"]) assert result.exit_code == 0 assert "Agent quickstart" in result.output or "JSON output schema" in result.output def test_prune_leaves_branch_refs_intact(self, tmp_path: pathlib.Path) -> None: """Branch refs are never deleted by prune.""" import shutil from muse.core.worktree import _worktree_dir repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) shutil.rmtree(_worktree_dir(repo, "mydev")) _invoke(repo, ["worktree", "prune"]) assert (ref_path(repo, "dev")).exists() def test_prune_after_prune_nothing_left(self, tmp_path: pathlib.Path) -> None: """Running prune twice: second call reports nothing to prune.""" import shutil from muse.core.worktree import _worktree_dir repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) shutil.rmtree(_worktree_dir(repo, "mydev")) _invoke(repo, ["worktree", "prune"]) parsed = _parse_prune(_invoke(repo, ["worktree", "prune", "-j"]).output) assert parsed["count"] == 0 def test_prune_json_is_array_for_pruned(self, tmp_path: pathlib.Path) -> None: """pruned field is always a JSON array.""" repo = _make_repo(tmp_path) raw = json.loads(_json_blob(_invoke(repo, ["worktree", "prune", "-j"]).output)) assert isinstance(raw["pruned"], list) def test_prune_json_all_fields(self, tmp_path: pathlib.Path) -> None: """JSON output contains pruned, count, dry_run, exit_code, duration_ms.""" repo = _make_repo(tmp_path) raw = json.loads(_json_blob(_invoke(repo, ["worktree", "prune", "-j"]).output)) assert {"pruned", "count", "dry_run", "exit_code", "duration_ms"}.issubset(raw.keys()) class TestWorktreePruneSecurity: """ANSI sanitization, corrupt metadata handling.""" def test_prune_json_ansi_in_name_sanitized(self, tmp_path: pathlib.Path) -> None: """ANSI codes in worktree names are stripped from JSON pruned list.""" import json as _json repo = _make_repo(tmp_path) # Write a meta file whose stem contains no ANSI (filesystem prevents it), # but whose stored branch field has ANSI — prune goes by filename stem # for the name. Verify the output contains no escape sequences. meta_dir = worktrees_dir(repo) meta_dir.mkdir(parents=True, exist_ok=True) (meta_dir / "mywt.json").write_text(_json.dumps({"name": "mywt", "branch": "dev", "path": str(repo / "mywt")})) # mywt dir does not exist → stale → will be pruned result = _invoke(repo, ["worktree", "prune", "-j"]) assert result.exit_code == 0 assert "\x1b" not in result.output def test_prune_text_ansi_in_name_sanitized(self, tmp_path: pathlib.Path) -> None: """ANSI codes in worktree names are stripped from text output.""" import json as _json repo = _make_repo(tmp_path) meta_dir = worktrees_dir(repo) meta_dir.mkdir(parents=True, exist_ok=True) (meta_dir / "mywt.json").write_text(_json.dumps({"name": "mywt", "branch": "dev", "path": str(repo / "mywt")})) result = _invoke(repo, ["worktree", "prune"]) assert "\x1b" not in result.output def test_prune_corrupt_meta_pruned_safely(self, tmp_path: pathlib.Path) -> None: """A corrupt (unparseable) metadata file is treated as stale and pruned.""" repo = _make_repo(tmp_path) meta_dir = worktrees_dir(repo) meta_dir.mkdir(parents=True, exist_ok=True) (meta_dir / "corrupt.json").write_text("NOT VALID JSON {{{") result = _invoke(repo, ["worktree", "prune", "-j"]) assert result.exit_code == 0 parsed = _parse_prune(result.output) assert "corrupt" in parsed["pruned"] def test_prune_json_is_valid_json(self, tmp_path: pathlib.Path) -> None: """Output is well-formed JSON.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "prune", "-j"]) parsed = json.loads(_json_blob(result.output)) assert isinstance(parsed["pruned"], list) assert isinstance(parsed["count"], int) assert isinstance(parsed["dry_run"], bool) def test_prune_dry_run_no_side_effects(self, tmp_path: pathlib.Path) -> None: """--dry-run produces no filesystem side effects at all.""" import shutil from muse.core.worktree import _worktree_dir repo = _make_repo(tmp_path) for i in range(3): _add_branch(repo, f"br{i}") _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"]) shutil.rmtree(_worktree_dir(repo, f"wt{i}")) metas_before = sorted(p.name for p in (worktrees_dir(repo)).glob("*.json")) _invoke(repo, ["worktree", "prune", "--dry-run"]) metas_after = sorted(p.name for p in (worktrees_dir(repo)).glob("*.json")) assert metas_before == metas_after class TestWorktreePruneStress: """Performance and scale tests for worktree prune.""" def test_prune_20_stale(self, tmp_path: pathlib.Path) -> None: """Pruning 20 stale entries reports count=20.""" import shutil from muse.core.worktree import _worktree_dir repo = _make_repo(tmp_path) for i in range(20): _add_branch(repo, f"br{i}") _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"]) shutil.rmtree(_worktree_dir(repo, f"wt{i}")) parsed = _parse_prune(_invoke(repo, ["worktree", "prune", "-j"]).output) assert parsed["count"] == 20 assert len(parsed["pruned"]) == 20 def test_prune_performance(self, tmp_path: pathlib.Path) -> None: """Pruning 15 stale worktrees completes within 2 seconds.""" import shutil import time from muse.core.worktree import _worktree_dir repo = _make_repo(tmp_path) for i in range(15): _add_branch(repo, f"br{i}") _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"]) shutil.rmtree(_worktree_dir(repo, f"wt{i}")) t0 = time.monotonic() result = _invoke(repo, ["worktree", "prune", "-j"]) elapsed = time.monotonic() - t0 assert result.exit_code == 0 assert elapsed < 2.0, f"prune took {elapsed:.2f}s" def test_prune_mixed_active_and_stale(self, tmp_path: pathlib.Path) -> None: """Only stale worktrees are pruned; active ones are preserved.""" import shutil from muse.core.worktree import _worktree_dir repo = _make_repo(tmp_path) for i in range(10): _add_branch(repo, f"br{i}") _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"]) # Remove every other worktree dir to make it stale. stale = [f"wt{i}" for i in range(0, 10, 2)] active = [f"wt{i}" for i in range(1, 10, 2)] for name in stale: shutil.rmtree(_worktree_dir(repo, name)) parsed = _parse_prune(_invoke(repo, ["worktree", "prune", "-j"]).output) assert parsed["count"] == 5 for name in stale: assert name in parsed["pruned"] for name in active: assert name not in parsed["pruned"] # Active worktrees still in list. entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output) listed_names = {e["name"] for e in entries} for name in active: assert name in listed_names # =========================================================================== # muse worktree repair — Extended / Security / Stress # =========================================================================== class TestWorktreeRepairExtended: """-j alias, text output, pointer writes, idempotency, edge cases.""" def test_repair_j_alias(self, tmp_path: pathlib.Path) -> None: """-j produces the same JSON as --json.""" repo = _make_repo(tmp_path) r1 = _invoke(repo, ["worktree", "repair", "--json"]) r2 = _invoke(repo, ["worktree", "repair", "-j"]) assert r1.exit_code == 0 and r2.exit_code == 0 d1 = json.loads(_json_blob(r1.output)); d1.pop("duration_ms", None); d1.pop("timestamp", None) d2 = json.loads(_json_blob(r2.output)); d2.pop("duration_ms", None); d2.pop("timestamp", None) assert d1 == d2 def test_repair_empty_repo_exits_0(self, tmp_path: pathlib.Path) -> None: """Repair on a repo with no linked worktrees exits 0.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "repair", "-j"]) assert result.exit_code == 0 def test_repair_json_repaired_is_list(self, tmp_path: pathlib.Path) -> None: """JSON repaired field is always a list.""" repo = _make_repo(tmp_path) raw = json.loads(_json_blob(_invoke(repo, ["worktree", "repair", "-j"]).output)) assert isinstance(raw["repaired"], list) def test_repair_json_only_repaired_field(self, tmp_path: pathlib.Path) -> None: """JSON output contains repaired, exit_code, duration_ms.""" repo = _make_repo(tmp_path) raw = json.loads(_json_blob(_invoke(repo, ["worktree", "repair", "-j"]).output)) assert {"repaired", "exit_code", "duration_ms"}.issubset(raw.keys()) def test_repair_writes_pointer_file(self, tmp_path: pathlib.Path) -> None: """Repair writes the .muse pointer file in the worktree directory.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) from muse.core.worktree import _worktree_dir pointer = _worktree_dir(repo, "mydev") / ".muse" pointer.unlink(missing_ok=True) assert not pointer.exists() result = _invoke(repo, ["worktree", "repair"]) assert result.exit_code == 0 assert pointer.exists() def test_repair_pointer_contains_store_path(self, tmp_path: pathlib.Path) -> None: """The pointer file content references the main .muse store.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) from muse.core.worktree import _worktree_dir pointer = _worktree_dir(repo, "mydev") / ".muse" pointer.unlink(missing_ok=True) _invoke(repo, ["worktree", "repair"]) content = pointer.read_text() assert "musestore:" in content assert str(muse_dir(repo)) in content or ".muse" in content def test_repair_idempotent(self, tmp_path: pathlib.Path) -> None: """Running repair twice does not raise an error.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) r1 = _invoke(repo, ["worktree", "repair", "-j"]) r2 = _invoke(repo, ["worktree", "repair", "-j"]) assert r1.exit_code == 0 assert r2.exit_code == 0 def test_repair_includes_worktree_name_in_json(self, tmp_path: pathlib.Path) -> None: """JSON repaired list contains the linked worktree name.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) from muse.core.worktree import _worktree_dir (_worktree_dir(repo, "mydev") / ".muse").unlink(missing_ok=True) raw = json.loads(_json_blob(_invoke(repo, ["worktree", "repair", "-j"]).output)) assert "mydev" in raw["repaired"] def test_repair_default_is_text(self, tmp_path: pathlib.Path) -> None: """Without --json the output is human-readable text.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "repair"]) assert result.exit_code == 0 assert not result.output.strip().startswith("{") def test_repair_text_no_worktrees_message(self, tmp_path: pathlib.Path) -> None: """Text output says 'All worktrees already have pointer files' when clean.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "repair"]) assert "already have pointer files" in result.output def test_repair_text_shows_repaired_name(self, tmp_path: pathlib.Path) -> None: """Text output shows the name of each repaired worktree.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) from muse.core.worktree import _worktree_dir (_worktree_dir(repo, "mydev") / ".muse").unlink(missing_ok=True) result = _invoke(repo, ["worktree", "repair"]) assert "mydev" in result.output def test_repair_outside_repo_exits_2(self, tmp_path: pathlib.Path) -> None: """Running outside a repo exits with code 2.""" result = runner.invoke(None, ["worktree", "repair"], env={"MUSE_REPO_ROOT": str(tmp_path)}) assert result.exit_code == 2 def test_repair_help_has_description(self, tmp_path: pathlib.Path) -> None: """--help includes the rich description.""" result = _invoke(tmp_path, ["worktree", "repair", "--help"]) assert result.exit_code == 0 assert "Agent quickstart" in result.output or "JSON output schema" in result.output def test_repair_skips_absent_worktree_dirs(self, tmp_path: pathlib.Path) -> None: """Worktrees whose directories are absent are skipped (not in repaired list).""" import shutil from muse.core.worktree import _worktree_dir repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) shutil.rmtree(_worktree_dir(repo, "mydev")) raw = json.loads(_json_blob(_invoke(repo, ["worktree", "repair", "-j"]).output)) assert "mydev" not in raw["repaired"] def test_repair_multiple_worktrees(self, tmp_path: pathlib.Path) -> None: """Repair fixes pointer files for all present linked worktrees.""" repo = _make_repo(tmp_path) for name in ("a", "b", "c"): _add_branch(repo, name) _invoke(repo, ["worktree", "add", name, name]) from muse.core.worktree import _worktree_dir for name in ("a", "b", "c"): (_worktree_dir(repo, name) / ".muse").unlink(missing_ok=True) raw = json.loads(_json_blob(_invoke(repo, ["worktree", "repair", "-j"]).output)) for name in ("a", "b", "c"): assert name in raw["repaired"] assert len(raw["repaired"]) >= 3 def test_repair_is_valid_json(self, tmp_path: pathlib.Path) -> None: """JSON output is well-formed.""" repo = _make_repo(tmp_path) result = _invoke(repo, ["worktree", "repair", "-j"]) parsed = json.loads(_json_blob(result.output)) assert isinstance(parsed, dict) assert isinstance(parsed["repaired"], list) class TestWorktreeRepairSecurity: """ANSI sanitization and output integrity.""" def test_repair_json_no_ansi_in_output(self, tmp_path: pathlib.Path) -> None: """JSON output contains no ANSI escape sequences.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) result = _invoke(repo, ["worktree", "repair", "-j"]) assert "\x1b" not in result.output def test_repair_text_no_ansi_in_output(self, tmp_path: pathlib.Path) -> None: """Text output contains no ANSI escape sequences.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) result = _invoke(repo, ["worktree", "repair"]) assert "\x1b" not in result.output def test_repair_does_not_write_outside_worktree_dirs(self, tmp_path: pathlib.Path) -> None: """Repair only writes inside registered worktree directories.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) from muse.core.worktree import _worktree_dir wt_dir = _worktree_dir(repo, "mydev") _invoke(repo, ["worktree", "repair"]) # The only .muse file written should be inside the worktree directory. pointer = muse_dir(wt_dir) assert pointer.exists() # The main repo .muse dir must remain a directory (not overwritten). assert (muse_dir(repo)).is_dir() def test_repair_pointer_not_symlink(self, tmp_path: pathlib.Path) -> None: """The written pointer file is a regular file, not a symlink.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) from muse.core.worktree import _worktree_dir pointer = _worktree_dir(repo, "mydev") / ".muse" pointer.unlink(missing_ok=True) _invoke(repo, ["worktree", "repair"]) assert pointer.exists() assert not pointer.is_symlink() def test_repair_json_repaired_names_are_strings(self, tmp_path: pathlib.Path) -> None: """Every element of JSON repaired list is a string.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) raw = json.loads(_json_blob(_invoke(repo, ["worktree", "repair", "-j"]).output)) for name in raw["repaired"]: assert isinstance(name, str) class TestWorktreeRepairStress: """Performance and scale tests for worktree repair.""" def test_repair_15_worktrees(self, tmp_path: pathlib.Path) -> None: """Repair correctly fixes pointer files for 15 linked worktrees.""" repo = _make_repo(tmp_path) for i in range(15): _add_branch(repo, f"br{i}") _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"]) from muse.core.worktree import _worktree_dir for i in range(15): (_worktree_dir(repo, f"wt{i}") / ".muse").unlink(missing_ok=True) raw = json.loads(_json_blob(_invoke(repo, ["worktree", "repair", "-j"]).output)) assert len(raw["repaired"]) >= 15 def test_repair_performance(self, tmp_path: pathlib.Path) -> None: """Repairing 10 worktrees completes within 2 seconds.""" import time repo = _make_repo(tmp_path) for i in range(10): _add_branch(repo, f"br{i}") _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"]) from muse.core.worktree import _worktree_dir for i in range(10): (_worktree_dir(repo, f"wt{i}") / ".muse").unlink(missing_ok=True) t0 = time.monotonic() result = _invoke(repo, ["worktree", "repair", "-j"]) elapsed = time.monotonic() - t0 assert result.exit_code == 0 assert elapsed < 2.0, f"repair took {elapsed:.2f}s" def test_repair_idempotent_10_times(self, tmp_path: pathlib.Path) -> None: """Running repair 10 times in a row never fails.""" repo = _make_repo(tmp_path) _add_branch(repo, "dev") _invoke(repo, ["worktree", "add", "mydev", "dev"]) for _ in range(10): r = _invoke(repo, ["worktree", "repair", "-j"]) assert r.exit_code == 0