gabriel / muse public
test_cmd_worktree_hardening.py python
2,551 lines 110.1 KB
Raw
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor ⚠ breaking 28 days ago
1 """Comprehensive hardening tests for ``muse worktree``.
2
3 Covers:
4 - Unit: _load_meta symlink/size guards, _safe_delete_path, _WORKTREES_META_DIR constant,
5 get_worktree_status, prune_worktrees dry_run
6 - Security: symlink meta file, oversized meta, path-in-JSON pointing to .muse/,
7 path-in-JSON pointing to symlink, ANSI injection, error routing to stderr
8 - JSON schema: all subcommands (add, list, status, remove, prune)
9 - Integration: -b/--create-branch, --path override, prune dry-run, status subcommand
10 - E2E: text output paths still correct, worktree lifecycle
11 - Stress: 50 worktrees, concurrent list reads
12 """
13 from __future__ import annotations
14
15 import json
16 import pathlib
17 import re
18 import threading
19 from typing import TypedDict
20
21 import pytest
22
23 from tests.cli_test_helper import CliRunner, InvokeResult
24 from muse.core.types import NULL_LONG_ID, long_id
25 from muse.core.paths import muse_dir, objects_dir, ref_path, worktrees_dir
26
27 runner = CliRunner()
28
29 _ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")
30
31
32 # ---------------------------------------------------------------------------
33 # Repo helpers
34 # ---------------------------------------------------------------------------
35
36
37 def _make_repo(tmp_path: pathlib.Path, branch: str = "main") -> pathlib.Path:
38 """Create a minimal Muse repo with a named branch."""
39 repo = tmp_path / "myproject"
40 muse = muse_dir(repo)
41 for d in ("objects", "commits", "snapshots", "refs/heads"):
42 (muse / d).mkdir(parents=True, exist_ok=True)
43 (muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"}))
44 (muse / "HEAD").write_text(f"ref: refs/heads/{branch}\n")
45 (muse / "refs" / "heads" / branch).write_text(NULL_LONG_ID)
46 return repo
47
48
49 def _add_branch(repo: pathlib.Path, branch: str) -> None:
50 ref = ref_path(repo, branch)
51 ref.parent.mkdir(parents=True, exist_ok=True)
52 ref.write_text(NULL_LONG_ID)
53
54
55 def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult:
56 return runner.invoke(None, args, env={"MUSE_REPO_ROOT": str(repo)})
57
58
59 def _json_blob(output: str) -> str:
60 for line in output.splitlines():
61 stripped = line.strip()
62 if stripped.startswith(("{", "[")):
63 return stripped
64 return output.strip()
65
66
67 # ---------------------------------------------------------------------------
68 # Typed schema helpers
69 # ---------------------------------------------------------------------------
70
71
72 class _AddJson(TypedDict):
73 name: str
74 branch: str
75 path: str
76 head_commit: str | None
77
78
79 class _ListEntryJson(TypedDict):
80 name: str
81 branch: str
82 path: str
83 head_commit: str | None
84 is_main: bool
85
86
87 class _StatusJson(TypedDict):
88 name: str
89 branch: str
90 path: str
91 head_commit: str | None
92 present: bool
93 is_main: bool
94
95
96 class _RemoveJson(TypedDict):
97 name: str
98 status: str
99
100
101 class _PruneJson(TypedDict):
102 pruned: list[str]
103 count: int
104 dry_run: bool
105
106
107 def _parse_add(output: str) -> _AddJson:
108 raw = json.loads(_json_blob(output))
109 assert isinstance(raw, dict)
110 hc = raw.get("head_commit")
111 return _AddJson(
112 name=str(raw["name"]),
113 branch=str(raw["branch"]),
114 path=str(raw["path"]),
115 head_commit=str(hc) if hc is not None else None,
116 )
117
118
119 def _parse_list(output: str) -> list[_ListEntryJson]:
120 raw = json.loads(_json_blob(output))
121 # list returns an envelope: {worktrees, exit_code, duration_ms}
122 if isinstance(raw, dict):
123 raw = raw["worktrees"]
124 assert isinstance(raw, list)
125 result: list[_ListEntryJson] = []
126 for item in raw:
127 assert isinstance(item, dict)
128 hc = item["head_commit"]
129 assert hc is None or isinstance(hc, str)
130 result.append(_ListEntryJson(
131 name=str(item["name"]),
132 branch=str(item["branch"]),
133 path=str(item["path"]),
134 head_commit=hc,
135 is_main=bool(item["is_main"]),
136 ))
137 return result
138
139
140 def _parse_status(output: str) -> _StatusJson:
141 raw = json.loads(_json_blob(output))
142 assert isinstance(raw, dict)
143 hc = raw["head_commit"]
144 assert hc is None or isinstance(hc, str)
145 return _StatusJson(
146 name=str(raw["name"]),
147 branch=str(raw["branch"]),
148 path=str(raw["path"]),
149 head_commit=hc,
150 present=bool(raw["present"]),
151 is_main=bool(raw["is_main"]),
152 )
153
154
155 def _parse_remove(output: str) -> _RemoveJson:
156 raw = json.loads(_json_blob(output))
157 assert isinstance(raw, dict)
158 return _RemoveJson(name=str(raw["name"]), status=str(raw["status"]))
159
160
161 def _parse_prune(output: str) -> _PruneJson:
162 raw = json.loads(_json_blob(output))
163 assert isinstance(raw, dict)
164 pruned_val = raw["pruned"]
165 assert isinstance(pruned_val, list)
166 count_val = raw["count"]
167 assert isinstance(count_val, int)
168 dry_run_val = raw["dry_run"]
169 assert isinstance(dry_run_val, bool)
170 return _PruneJson(
171 pruned=[str(x) for x in pruned_val],
172 count=count_val,
173 dry_run=dry_run_val,
174 )
175
176
177 # ---------------------------------------------------------------------------
178 # Unit — _load_meta security guards
179 # ---------------------------------------------------------------------------
180
181
182 class TestLoadMetaSecurity:
183 def test_symlink_meta_file_rejected(self, tmp_path: pathlib.Path) -> None:
184 from muse.core.worktree import _load_meta, _worktree_meta_path
185
186 repo = _make_repo(tmp_path)
187 name = "my-wt"
188 # Create a symlink where the meta file should be.
189 meta_path = _worktree_meta_path(repo, name)
190 meta_path.parent.mkdir(parents=True, exist_ok=True)
191 real_file = tmp_path / "real_meta.json"
192 real_file.write_text(json.dumps({"name": name, "branch": "main", "path": str(tmp_path / "wt")}))
193 meta_path.symlink_to(real_file)
194 result = _load_meta(repo, name)
195 assert result is None
196
197 def test_oversized_meta_file_rejected(self, tmp_path: pathlib.Path) -> None:
198 from muse.core.worktree import _MAX_META_BYTES, _load_meta, _worktree_meta_path
199
200 repo = _make_repo(tmp_path)
201 name = "big-wt"
202 meta_path = _worktree_meta_path(repo, name)
203 meta_path.parent.mkdir(parents=True, exist_ok=True)
204 meta_path.write_text("x" * (_MAX_META_BYTES + 1))
205 result = _load_meta(repo, name)
206 assert result is None
207
208 def test_corrupt_json_returns_none(self, tmp_path: pathlib.Path) -> None:
209 from muse.core.worktree import _load_meta, _worktree_meta_path
210
211 repo = _make_repo(tmp_path)
212 name = "bad-wt"
213 meta_path = _worktree_meta_path(repo, name)
214 meta_path.parent.mkdir(parents=True, exist_ok=True)
215 meta_path.write_text("not valid json !!!{{{")
216 result = _load_meta(repo, name)
217 assert result is None
218
219 def test_missing_key_returns_none(self, tmp_path: pathlib.Path) -> None:
220 from muse.core.worktree import _load_meta, _worktree_meta_path
221
222 repo = _make_repo(tmp_path)
223 name = "missing-key-wt"
224 meta_path = _worktree_meta_path(repo, name)
225 meta_path.parent.mkdir(parents=True, exist_ok=True)
226 # Missing 'path' key.
227 meta_path.write_text(json.dumps({"name": name, "branch": "main"}))
228 result = _load_meta(repo, name)
229 assert result is None
230
231 def test_valid_meta_round_trips(self, tmp_path: pathlib.Path) -> None:
232 from muse.core.worktree import _load_meta, _save_meta, WorktreeRecord
233
234 repo = _make_repo(tmp_path)
235 (worktrees_dir(repo)).mkdir(parents=True, exist_ok=True)
236 record: WorktreeRecord = {"name": "wt1", "branch": "dev", "path": str(tmp_path / "wt1")}
237 _save_meta(repo, record)
238 loaded = _load_meta(repo, "wt1")
239 assert loaded is not None
240 assert loaded["name"] == "wt1"
241 assert loaded["branch"] == "dev"
242
243
244 # ---------------------------------------------------------------------------
245 # Unit — _safe_delete_path
246 # ---------------------------------------------------------------------------
247
248
249 class TestSafeDeletePath:
250 def test_normal_directory_deleted(self, tmp_path: pathlib.Path) -> None:
251 from muse.core.worktree import _safe_delete_path
252
253 repo = _make_repo(tmp_path)
254 target = tmp_path / "target-dir"
255 target.mkdir()
256 (target / "file.txt").write_text("content")
257 result = _safe_delete_path(repo, target)
258 assert result is True
259 assert not target.exists()
260
261 def test_nonexistent_path_returns_true(self, tmp_path: pathlib.Path) -> None:
262 from muse.core.worktree import _safe_delete_path
263
264 repo = _make_repo(tmp_path)
265 result = _safe_delete_path(repo, tmp_path / "no-such-dir")
266 assert result is True
267
268 def test_symlink_refused(self, tmp_path: pathlib.Path) -> None:
269 from muse.core.worktree import _safe_delete_path
270
271 repo = _make_repo(tmp_path)
272 real_dir = tmp_path / "real-dir"
273 real_dir.mkdir()
274 symlink = tmp_path / "link-to-dir"
275 symlink.symlink_to(real_dir)
276 result = _safe_delete_path(repo, symlink)
277 assert result is False
278 assert real_dir.exists() # real directory untouched
279
280 def test_path_inside_muse_refused(self, tmp_path: pathlib.Path) -> None:
281 from muse.core.worktree import _safe_delete_path
282
283 repo = _make_repo(tmp_path)
284 inside_muse = objects_dir(repo)
285 result = _safe_delete_path(repo, inside_muse)
286 assert result is False
287 assert inside_muse.exists()
288
289
290 # ---------------------------------------------------------------------------
291 # Unit — _WORKTREES_META_DIR constant
292 # ---------------------------------------------------------------------------
293
294
295 class TestWorktreesMetaDir:
296 def test_constant_used_in_worktrees_dir(self, tmp_path: pathlib.Path) -> None:
297 from muse.core.paths import worktrees_dir, muse_dir
298
299 repo = _make_repo(tmp_path)
300 assert worktrees_dir(repo) == muse_dir(repo) / "worktrees"
301
302
303 # ---------------------------------------------------------------------------
304 # Unit — get_worktree_status
305 # ---------------------------------------------------------------------------
306
307
308 class TestGetWorktreeStatus:
309 def test_main_worktree_status(self, tmp_path: pathlib.Path) -> None:
310 from muse.core.worktree import get_worktree_status
311
312 repo = _make_repo(tmp_path)
313 status = get_worktree_status(repo, "main")
314 assert status["is_main"] is True
315 assert status["name"] == "(main)"
316 assert status["branch"] == "main"
317 assert status["present"] is True
318
319 def test_main_alias_works(self, tmp_path: pathlib.Path) -> None:
320 from muse.core.worktree import get_worktree_status
321
322 repo = _make_repo(tmp_path)
323 s1 = get_worktree_status(repo, "main")
324 s2 = get_worktree_status(repo, "(main)")
325 assert s1["name"] == s2["name"]
326
327 def test_linked_worktree_present(self, tmp_path: pathlib.Path) -> None:
328 from muse.core.worktree import add_worktree, get_worktree_status
329
330 repo = _make_repo(tmp_path)
331 _add_branch(repo, "dev")
332 add_worktree(repo, "mydev", "dev")
333 status = get_worktree_status(repo, "mydev")
334 assert status["name"] == "mydev"
335 assert status["branch"] == "dev"
336 assert status["present"] is True
337 assert status["is_main"] is False
338
339 def test_linked_worktree_absent_after_delete(self, tmp_path: pathlib.Path) -> None:
340 from muse.core.worktree import add_worktree, get_worktree_status
341 import shutil
342
343 repo = _make_repo(tmp_path)
344 _add_branch(repo, "dev")
345 wt_path = add_worktree(repo, "mydev", "dev")
346 # Manually delete the worktree directory (simulating external removal).
347 shutil.rmtree(wt_path)
348 status = get_worktree_status(repo, "mydev")
349 assert status["present"] is False
350
351 def test_nonexistent_worktree_raises(self, tmp_path: pathlib.Path) -> None:
352 from muse.core.worktree import get_worktree_status
353
354 repo = _make_repo(tmp_path)
355 with pytest.raises(ValueError, match="does not exist"):
356 get_worktree_status(repo, "ghost")
357
358
359 # ---------------------------------------------------------------------------
360 # Unit — prune_worktrees dry_run
361 # ---------------------------------------------------------------------------
362
363
364 class TestPruneDryRun:
365 def test_dry_run_does_not_delete(self, tmp_path: pathlib.Path) -> None:
366 from muse.core.worktree import add_worktree, prune_worktrees
367 import shutil
368
369 repo = _make_repo(tmp_path)
370 _add_branch(repo, "dev")
371 wt_path = add_worktree(repo, "mydev", "dev")
372 shutil.rmtree(wt_path)
373 meta = worktrees_dir(repo) / "mydev.json"
374 assert meta.exists()
375
376 pruned = prune_worktrees(repo, dry_run=True)
377 assert "mydev" in pruned
378 # Meta file must still exist — dry_run=True.
379 assert meta.exists()
380
381 def test_dry_run_false_deletes(self, tmp_path: pathlib.Path) -> None:
382 from muse.core.worktree import add_worktree, prune_worktrees
383 import shutil
384
385 repo = _make_repo(tmp_path)
386 _add_branch(repo, "dev")
387 wt_path = add_worktree(repo, "mydev", "dev")
388 shutil.rmtree(wt_path)
389 meta = worktrees_dir(repo) / "mydev.json"
390
391 pruned = prune_worktrees(repo, dry_run=False)
392 assert "mydev" in pruned
393 assert not meta.exists()
394
395 def test_dry_run_returns_empty_when_all_present(self, tmp_path: pathlib.Path) -> None:
396 from muse.core.worktree import add_worktree, prune_worktrees
397
398 repo = _make_repo(tmp_path)
399 _add_branch(repo, "dev")
400 add_worktree(repo, "mydev", "dev")
401 pruned = prune_worktrees(repo, dry_run=True)
402 assert pruned == []
403
404
405 # ---------------------------------------------------------------------------
406 # Security — tampered path in meta.json
407 # ---------------------------------------------------------------------------
408
409
410 class TestTamperedMetaPath:
411 def test_path_inside_muse_refused_on_remove(self, tmp_path: pathlib.Path) -> None:
412 """A tampered path pointing inside .muse/ must be refused by remove_worktree."""
413 from muse.core.worktree import _worktree_meta_path, add_worktree, remove_worktree
414
415 repo = _make_repo(tmp_path)
416 _add_branch(repo, "dev")
417 add_worktree(repo, "mydev", "dev")
418
419 # Tamper: set path to the .muse/objects directory.
420 meta_path = _worktree_meta_path(repo, "mydev")
421 danger = str(objects_dir(repo))
422 meta_path.write_text(json.dumps({"name": "mydev", "branch": "dev", "path": danger}))
423
424 with pytest.raises(ValueError, match="Refusing"):
425 remove_worktree(repo, "mydev")
426
427 # The objects directory must still exist.
428 assert (objects_dir(repo)).exists()
429
430 def test_symlink_wt_path_refused_on_remove(self, tmp_path: pathlib.Path) -> None:
431 """A worktree path that resolves to a symlink must be refused by remove_worktree."""
432 from muse.core.worktree import _worktree_meta_path, add_worktree, remove_worktree
433
434 repo = _make_repo(tmp_path)
435 _add_branch(repo, "dev")
436 add_worktree(repo, "mydev", "dev")
437
438 # Replace the worktree directory with a symlink to an innocent directory.
439 from muse.core.worktree import _worktree_dir
440 wt_dir = _worktree_dir(repo, "mydev")
441 import shutil
442 shutil.rmtree(wt_dir)
443 innocent = tmp_path / "innocent-dir"
444 innocent.mkdir()
445 wt_dir.symlink_to(innocent)
446
447 with pytest.raises(ValueError, match="Refusing|symlink"):
448 remove_worktree(repo, "mydev")
449
450 assert innocent.exists()
451
452
453 # ---------------------------------------------------------------------------
454 # Security — error routing
455 # ---------------------------------------------------------------------------
456
457
458 class TestErrorRouting:
459 def test_add_nonexistent_branch_goes_to_stderr(self, tmp_path: pathlib.Path) -> None:
460 repo = _make_repo(tmp_path)
461 result = _invoke(repo, ["worktree", "add", "mywt", "no-such-branch"])
462 assert result.exit_code != 0
463 assert "❌" in result.stderr
464
465 def test_add_duplicate_name_goes_to_stderr(self, tmp_path: pathlib.Path) -> None:
466 repo = _make_repo(tmp_path)
467 _add_branch(repo, "dev")
468 _invoke(repo, ["worktree", "add", "mywt", "dev"])
469 result = _invoke(repo, ["worktree", "add", "mywt", "dev"])
470 assert result.exit_code != 0
471 assert "❌" in result.stderr
472
473 def test_remove_nonexistent_goes_to_stderr(self, tmp_path: pathlib.Path) -> None:
474 repo = _make_repo(tmp_path)
475 result = _invoke(repo, ["worktree", "remove", "ghost"])
476 assert result.exit_code != 0
477 assert "❌" in result.stderr
478
479 def test_status_nonexistent_goes_to_stderr(self, tmp_path: pathlib.Path) -> None:
480 repo = _make_repo(tmp_path)
481 result = _invoke(repo, ["worktree", "status", "ghost"])
482 assert result.exit_code != 0
483 assert "❌" in result.stderr
484
485 def test_create_branch_already_exists_goes_to_stderr(self, tmp_path: pathlib.Path) -> None:
486 repo = _make_repo(tmp_path)
487 _add_branch(repo, "dev")
488 result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "dev"])
489 assert result.exit_code != 0
490 assert "❌" in result.stderr
491
492
493 # ---------------------------------------------------------------------------
494 # Security — ANSI sanitization
495 # ---------------------------------------------------------------------------
496
497
498 class TestAnsiSanitization:
499 def test_ansi_in_name_does_not_leak(self, tmp_path: pathlib.Path) -> None:
500 repo = _make_repo(tmp_path)
501 ansi_name = "\x1b[31mmywt\x1b[0m"
502 result = _invoke(repo, ["worktree", "add", ansi_name, "main"])
503 assert _ANSI_RE.search(result.output) is None
504
505 def test_ansi_in_branch_does_not_leak(self, tmp_path: pathlib.Path) -> None:
506 repo = _make_repo(tmp_path)
507 ansi_branch = "\x1b[31mmain\x1b[0m"
508 result = _invoke(repo, ["worktree", "add", "mywt", ansi_branch])
509 assert _ANSI_RE.search(result.output) is None
510
511
512 # ---------------------------------------------------------------------------
513 # JSON schema — add
514 # ---------------------------------------------------------------------------
515
516
517 class TestJsonSchemaAdd:
518 def test_add_json_schema(self, tmp_path: pathlib.Path) -> None:
519 repo = _make_repo(tmp_path)
520 _add_branch(repo, "dev")
521 result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"])
522 assert result.exit_code == 0
523 parsed = _parse_add(result.output)
524 assert parsed["name"] == "mydev"
525 assert parsed["branch"] == "dev"
526 assert "mydev" in parsed["path"] or "myproject" in parsed["path"]
527
528 def test_add_json_with_create_branch(self, tmp_path: pathlib.Path) -> None:
529 repo = _make_repo(tmp_path)
530 result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "feat/new", "--json"])
531 assert result.exit_code == 0
532 parsed = _parse_add(result.output)
533 assert parsed["branch"] == "feat/new"
534 assert parsed["name"] == "mywt"
535
536 def test_add_json_path_is_string(self, tmp_path: pathlib.Path) -> None:
537 repo = _make_repo(tmp_path)
538 _add_branch(repo, "dev")
539 result = _invoke(repo, ["worktree", "add", "dev-wt", "dev", "--json"])
540 assert result.exit_code == 0
541 parsed = _parse_add(result.output)
542 assert isinstance(parsed["path"], str)
543 assert len(parsed["path"]) > 0
544
545
546 # ---------------------------------------------------------------------------
547 # JSON schema — list
548 # ---------------------------------------------------------------------------
549
550
551 class TestJsonSchemaList:
552 def test_list_json_includes_main(self, tmp_path: pathlib.Path) -> None:
553 repo = _make_repo(tmp_path)
554 result = _invoke(repo, ["worktree", "list", "--json"])
555 assert result.exit_code == 0
556 entries = _parse_list(result.output)
557 assert any(e["is_main"] for e in entries)
558
559 def test_list_json_linked_worktree(self, tmp_path: pathlib.Path) -> None:
560 repo = _make_repo(tmp_path)
561 _add_branch(repo, "dev")
562 _invoke(repo, ["worktree", "add", "mydev", "dev"])
563 result = _invoke(repo, ["worktree", "list", "--json"])
564 assert result.exit_code == 0
565 entries = _parse_list(result.output)
566 names = [e["name"] for e in entries]
567 assert "mydev" in names
568
569 def test_list_json_entry_schema(self, tmp_path: pathlib.Path) -> None:
570 repo = _make_repo(tmp_path)
571 _add_branch(repo, "dev")
572 _invoke(repo, ["worktree", "add", "mydev", "dev"])
573 result = _invoke(repo, ["worktree", "list", "--json"])
574 entries = _parse_list(result.output)
575 for entry in entries:
576 assert isinstance(entry["name"], str)
577 assert isinstance(entry["branch"], str)
578 assert isinstance(entry["path"], str)
579 assert entry["head_commit"] is None or isinstance(entry["head_commit"], str)
580 assert isinstance(entry["is_main"], bool)
581
582 def test_list_json_empty_repo(self, tmp_path: pathlib.Path) -> None:
583 repo = _make_repo(tmp_path)
584 result = _invoke(repo, ["worktree", "list", "--json"])
585 assert result.exit_code == 0
586 entries = _parse_list(result.output)
587 assert len(entries) == 1 # always includes main
588
589
590 # ---------------------------------------------------------------------------
591 # JSON schema — status
592 # ---------------------------------------------------------------------------
593
594
595 class TestJsonSchemaStatus:
596 def test_status_json_main(self, tmp_path: pathlib.Path) -> None:
597 repo = _make_repo(tmp_path)
598 result = _invoke(repo, ["worktree", "status", "main", "--json"])
599 assert result.exit_code == 0
600 parsed = _parse_status(result.output)
601 assert parsed["is_main"] is True
602 assert parsed["present"] is True
603 assert parsed["branch"] == "main"
604
605 def test_status_json_linked_present(self, tmp_path: pathlib.Path) -> None:
606 repo = _make_repo(tmp_path)
607 _add_branch(repo, "dev")
608 _invoke(repo, ["worktree", "add", "mydev", "dev"])
609 result = _invoke(repo, ["worktree", "status", "mydev", "--json"])
610 assert result.exit_code == 0
611 parsed = _parse_status(result.output)
612 assert parsed["name"] == "mydev"
613 assert parsed["branch"] == "dev"
614 assert parsed["present"] is True
615 assert parsed["is_main"] is False
616
617 def test_status_json_linked_absent(self, tmp_path: pathlib.Path) -> None:
618 import shutil
619 repo = _make_repo(tmp_path)
620 _add_branch(repo, "dev")
621 _invoke(repo, ["worktree", "add", "mydev", "dev"])
622 # Delete the worktree directory externally.
623 from muse.core.worktree import _worktree_dir
624 wt_path = _worktree_dir(repo, "mydev")
625 shutil.rmtree(wt_path)
626 result = _invoke(repo, ["worktree", "status", "mydev", "--json"])
627 assert result.exit_code == 0
628 parsed = _parse_status(result.output)
629 assert parsed["present"] is False
630
631 def test_status_json_nonexistent_fails(self, tmp_path: pathlib.Path) -> None:
632 repo = _make_repo(tmp_path)
633 result = _invoke(repo, ["worktree", "status", "ghost", "--json"])
634 assert result.exit_code != 0
635
636 def test_status_json_all_fields_present(self, tmp_path: pathlib.Path) -> None:
637 repo = _make_repo(tmp_path)
638 result = _invoke(repo, ["worktree", "status", "main", "--json"])
639 parsed = _parse_status(result.output)
640 assert "name" in parsed
641 assert "branch" in parsed
642 assert "path" in parsed
643 assert "head_commit" in parsed
644 assert "present" in parsed
645 assert "is_main" in parsed
646
647
648 # ---------------------------------------------------------------------------
649 # JSON schema — remove
650 # ---------------------------------------------------------------------------
651
652
653 class TestJsonSchemaRemove:
654 def test_remove_json_schema(self, tmp_path: pathlib.Path) -> None:
655 repo = _make_repo(tmp_path)
656 _add_branch(repo, "dev")
657 _invoke(repo, ["worktree", "add", "mydev", "dev"])
658 result = _invoke(repo, ["worktree", "remove", "mydev", "--json"])
659 assert result.exit_code == 0
660 parsed = _parse_remove(result.output)
661 assert parsed["name"] == "mydev"
662 assert parsed["status"] == "removed"
663
664 def test_remove_json_nonexistent_fails(self, tmp_path: pathlib.Path) -> None:
665 repo = _make_repo(tmp_path)
666 result = _invoke(repo, ["worktree", "remove", "ghost", "--json"])
667 assert result.exit_code != 0
668
669 def test_remove_cleans_up_directory(self, tmp_path: pathlib.Path) -> None:
670 repo = _make_repo(tmp_path)
671 _add_branch(repo, "dev")
672 _invoke(repo, ["worktree", "add", "mydev", "dev"])
673 from muse.core.worktree import _worktree_dir
674 wt_path = _worktree_dir(repo, "mydev")
675 assert wt_path.exists()
676 _invoke(repo, ["worktree", "remove", "mydev"])
677 assert not wt_path.exists()
678
679
680 # ---------------------------------------------------------------------------
681 # JSON schema — prune
682 # ---------------------------------------------------------------------------
683
684
685 class TestJsonSchemaPrune:
686 def test_prune_json_nothing_to_prune(self, tmp_path: pathlib.Path) -> None:
687 repo = _make_repo(tmp_path)
688 result = _invoke(repo, ["worktree", "prune", "--json"])
689 assert result.exit_code == 0
690 parsed = _parse_prune(result.output)
691 assert parsed["pruned"] == []
692 assert parsed["count"] == 0
693 assert parsed["dry_run"] is False
694
695 def test_prune_json_stale_worktree(self, tmp_path: pathlib.Path) -> None:
696 import shutil
697 repo = _make_repo(tmp_path)
698 _add_branch(repo, "dev")
699 _invoke(repo, ["worktree", "add", "mydev", "dev"])
700 from muse.core.worktree import _worktree_dir
701 shutil.rmtree(_worktree_dir(repo, "mydev"))
702 result = _invoke(repo, ["worktree", "prune", "--json"])
703 assert result.exit_code == 0
704 parsed = _parse_prune(result.output)
705 assert "mydev" in parsed["pruned"]
706 assert parsed["count"] == 1
707 assert parsed["dry_run"] is False
708
709 def test_prune_json_dry_run(self, tmp_path: pathlib.Path) -> None:
710 import shutil
711 repo = _make_repo(tmp_path)
712 _add_branch(repo, "dev")
713 _invoke(repo, ["worktree", "add", "mydev", "dev"])
714 from muse.core.worktree import _worktree_dir
715 shutil.rmtree(_worktree_dir(repo, "mydev"))
716 # Meta file still present.
717 meta = worktrees_dir(repo) / "mydev.json"
718 assert meta.exists()
719 result = _invoke(repo, ["worktree", "prune", "--dry-run", "--json"])
720 assert result.exit_code == 0
721 parsed = _parse_prune(result.output)
722 assert "mydev" in parsed["pruned"]
723 assert parsed["dry_run"] is True
724 # Meta must not be deleted in dry-run.
725 assert meta.exists()
726
727 def test_prune_json_count_matches_pruned_list(self, tmp_path: pathlib.Path) -> None:
728 import shutil
729 repo = _make_repo(tmp_path)
730 for i in range(3):
731 branch = f"feat{i}"
732 _add_branch(repo, branch)
733 _invoke(repo, ["worktree", "add", f"wt{i}", branch])
734 from muse.core.worktree import _worktree_dir
735 shutil.rmtree(_worktree_dir(repo, f"wt{i}"))
736 result = _invoke(repo, ["worktree", "prune", "--json"])
737 parsed = _parse_prune(result.output)
738 assert parsed["count"] == len(parsed["pruned"])
739 assert parsed["count"] == 3
740
741
742 # ---------------------------------------------------------------------------
743 # Integration — -b/--create-branch
744 # ---------------------------------------------------------------------------
745
746
747 class TestCreateBranch:
748 def test_create_branch_makes_new_ref(self, tmp_path: pathlib.Path) -> None:
749 repo = _make_repo(tmp_path)
750 result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "feat/new", "--json"])
751 assert result.exit_code == 0
752 new_ref = ref_path(repo, "feat") / "new"
753 assert new_ref.exists()
754
755 def test_create_branch_checked_out_in_worktree(self, tmp_path: pathlib.Path) -> None:
756 repo = _make_repo(tmp_path)
757 result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "feat/new", "--json"])
758 assert result.exit_code == 0
759 parsed = _parse_add(result.output)
760 assert parsed["branch"] == "feat/new"
761
762 def test_create_branch_existing_name_fails(self, tmp_path: pathlib.Path) -> None:
763 repo = _make_repo(tmp_path)
764 # 'main' already exists.
765 result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "main"])
766 assert result.exit_code != 0
767
768 def test_create_branch_no_commits_on_start_fails(self, tmp_path: pathlib.Path) -> None:
769 repo = _make_repo(tmp_path)
770 # Create a branch that has no commits.
771 empty_branch_ref = ref_path(repo, "empty")
772 # Don't write anything to it — it doesn't exist as a ref.
773 # The start point HEAD should be 'main' which has '0'*64 as a dummy ref,
774 # get_head_commit_id reads the file content; let's make it return None.
775 empty_branch_ref.write_text("") # empty file → no commit
776 result = _invoke(repo, ["worktree", "add", "mywt", "empty", "-b", "feat/from-empty"])
777 # get_head_commit_id for a branch with empty ref content returns None → error
778 assert result.exit_code != 0
779
780
781 # ---------------------------------------------------------------------------
782 # Integration — --path override
783 # ---------------------------------------------------------------------------
784
785
786 class TestCustomPath:
787 def test_custom_path_used(self, tmp_path: pathlib.Path) -> None:
788 repo = _make_repo(tmp_path)
789 _add_branch(repo, "dev")
790 custom = tmp_path / "custom-wt-dir"
791 result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--path", str(custom), "--json"])
792 assert result.exit_code == 0
793 parsed = _parse_add(result.output)
794 assert pathlib.Path(parsed["path"]).resolve() == custom.resolve()
795 assert custom.exists()
796
797 def test_custom_path_shows_in_list(self, tmp_path: pathlib.Path) -> None:
798 repo = _make_repo(tmp_path)
799 _add_branch(repo, "dev")
800 custom = tmp_path / "custom-wt-dir"
801 _invoke(repo, ["worktree", "add", "mydev", "dev", "--path", str(custom)])
802 result = _invoke(repo, ["worktree", "list", "--json"])
803 entries = _parse_list(result.output)
804 paths = [e["path"] for e in entries]
805 assert any("custom-wt-dir" in p for p in paths)
806
807
808 # ---------------------------------------------------------------------------
809 # E2E — text output (non-JSON) still correct
810 # ---------------------------------------------------------------------------
811
812
813 class TestE2EText:
814 def test_add_text_output(self, tmp_path: pathlib.Path) -> None:
815 repo = _make_repo(tmp_path)
816 _add_branch(repo, "dev")
817 result = _invoke(repo, ["worktree", "add", "mydev", "dev"])
818 assert result.exit_code == 0
819 assert "✅" in result.output or "created" in result.output.lower()
820
821 def test_list_text_output(self, tmp_path: pathlib.Path) -> None:
822 repo = _make_repo(tmp_path)
823 result = _invoke(repo, ["worktree", "list"])
824 assert result.exit_code == 0
825 assert "main" in result.output
826
827 def test_status_text_output(self, tmp_path: pathlib.Path) -> None:
828 repo = _make_repo(tmp_path)
829 _add_branch(repo, "dev")
830 _invoke(repo, ["worktree", "add", "mydev", "dev"])
831 result = _invoke(repo, ["worktree", "status", "mydev"])
832 assert result.exit_code == 0
833 assert "mydev" in result.output
834 assert "dev" in result.output
835
836 def test_remove_text_output(self, tmp_path: pathlib.Path) -> None:
837 repo = _make_repo(tmp_path)
838 _add_branch(repo, "dev")
839 _invoke(repo, ["worktree", "add", "mydev", "dev"])
840 result = _invoke(repo, ["worktree", "remove", "mydev"])
841 assert result.exit_code == 0
842 assert "✅" in result.output or "removed" in result.output.lower()
843
844 def test_prune_text_nothing(self, tmp_path: pathlib.Path) -> None:
845 repo = _make_repo(tmp_path)
846 result = _invoke(repo, ["worktree", "prune"])
847 assert result.exit_code == 0
848 assert "Nothing" in result.output
849
850 def test_prune_text_dry_run(self, tmp_path: pathlib.Path) -> None:
851 import shutil
852 repo = _make_repo(tmp_path)
853 _add_branch(repo, "dev")
854 _invoke(repo, ["worktree", "add", "mydev", "dev"])
855 from muse.core.worktree import _worktree_dir
856 shutil.rmtree(_worktree_dir(repo, "mydev"))
857 result = _invoke(repo, ["worktree", "prune", "--dry-run"])
858 assert result.exit_code == 0
859 assert "dry-run" in result.output or "would prune" in result.output.lower()
860
861 def test_status_text_main(self, tmp_path: pathlib.Path) -> None:
862 repo = _make_repo(tmp_path)
863 result = _invoke(repo, ["worktree", "status", "main"])
864 assert result.exit_code == 0
865 assert "present" in result.output.lower()
866
867
868 # ---------------------------------------------------------------------------
869 # Stress
870 # ---------------------------------------------------------------------------
871
872
873 class TestStress:
874 def test_50_worktrees_add_list_remove(self, tmp_path: pathlib.Path) -> None:
875 """Create 50 worktrees, verify list, then remove all."""
876 repo = _make_repo(tmp_path)
877 names: list[str] = []
878 for i in range(50):
879 branch = f"branch{i}"
880 _add_branch(repo, branch)
881 name = f"wt{i}"
882 r = _invoke(repo, ["worktree", "add", name, branch, "--json"])
883 assert r.exit_code == 0, f"Failed to add wt{i}: {r.output}"
884 names.append(name)
885
886 r_list = _invoke(repo, ["worktree", "list", "--json"])
887 assert r_list.exit_code == 0
888 entries = _parse_list(r_list.output)
889 listed_names = {e["name"] for e in entries}
890 for name in names:
891 assert name in listed_names
892
893 for name in names:
894 r_rm = _invoke(repo, ["worktree", "remove", name, "--json"])
895 assert r_rm.exit_code == 0
896
897 def test_concurrent_list_reads_safe(self, tmp_path: pathlib.Path) -> None:
898 """Concurrent reads of worktree list must not crash."""
899 repo = _make_repo(tmp_path)
900 for i in range(5):
901 branch = f"br{i}"
902 _add_branch(repo, branch)
903 from muse.core.worktree import add_worktree
904 add_worktree(repo, f"wt{i}", branch)
905
906 errors: list[str] = []
907
908 def _read() -> None:
909 from muse.core.worktree import list_worktrees
910 try:
911 wts = list_worktrees(repo)
912 assert len(wts) >= 1
913 except Exception as exc:
914 errors.append(str(exc))
915
916 threads = [threading.Thread(target=_read) for _ in range(30)]
917 for t in threads:
918 t.start()
919 for t in threads:
920 t.join()
921
922 assert not errors, f"Concurrent list failures: {errors}"
923
924 def test_prune_50_stale_worktrees(self, tmp_path: pathlib.Path) -> None:
925 """prune_worktrees with 50 stale entries all reported correctly."""
926 import shutil
927 repo = _make_repo(tmp_path)
928 for i in range(50):
929 branch = f"br{i}"
930 _add_branch(repo, branch)
931 from muse.core.worktree import add_worktree, _worktree_dir
932 add_worktree(repo, f"wt{i}", branch)
933 shutil.rmtree(_worktree_dir(repo, f"wt{i}"))
934
935 result = _invoke(repo, ["worktree", "prune", "--json"])
936 assert result.exit_code == 0
937 parsed = _parse_prune(result.output)
938 assert parsed["count"] == 50
939 assert len(parsed["pruned"]) == 50
940
941
942 # ---------------------------------------------------------------------------
943 # Extended — muse worktree add
944 # ---------------------------------------------------------------------------
945
946 class TestWorktreeAddExtended:
947 def test_add_j_alias(self, tmp_path: pathlib.Path) -> None:
948 repo = _make_repo(tmp_path)
949 _add_branch(repo, "dev")
950 result = _invoke(repo, ["worktree", "add", "mydev", "dev", "-j"])
951 assert result.exit_code == 0
952 d = _parse_add(result.output)
953 assert d["name"] == "mydev"
954
955 def test_add_json_schema_has_head_commit(self, tmp_path: pathlib.Path) -> None:
956 repo = _make_repo(tmp_path)
957 _add_branch(repo, "dev")
958 result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"])
959 assert result.exit_code == 0
960 d = _parse_add(result.output)
961 assert "head_commit" in d
962
963 def test_add_json_head_commit_is_sha256_or_null(self, tmp_path: pathlib.Path) -> None:
964 repo = _make_repo(tmp_path)
965 _add_branch(repo, "dev")
966 result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"])
967 d = _parse_add(result.output)
968 hc = d["head_commit"]
969 assert hc is None or (isinstance(hc, str) and hc.startswith("sha256:"))
970
971 def test_add_json_schema_complete(self, tmp_path: pathlib.Path) -> None:
972 _REQUIRED = {"name", "branch", "path", "head_commit"}
973 repo = _make_repo(tmp_path)
974 _add_branch(repo, "dev")
975 result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"])
976 assert _REQUIRED <= _parse_add(result.output).keys()
977
978 def test_add_worktree_dir_created(self, tmp_path: pathlib.Path) -> None:
979 repo = _make_repo(tmp_path)
980 _add_branch(repo, "dev")
981 result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"])
982 wt_path = pathlib.Path(_parse_add(result.output)["path"])
983 assert wt_path.exists() and wt_path.is_dir()
984
985 def test_add_appears_in_list(self, tmp_path: pathlib.Path) -> None:
986 repo = _make_repo(tmp_path)
987 _add_branch(repo, "dev")
988 _invoke(repo, ["worktree", "add", "mydev", "dev"])
989 entries = _parse_list(_invoke(repo, ["worktree", "list", "--json"]).output)
990 assert any(e["name"] == "mydev" for e in entries)
991
992 def test_add_duplicate_name_fails(self, tmp_path: pathlib.Path) -> None:
993 repo = _make_repo(tmp_path)
994 _add_branch(repo, "dev")
995 _invoke(repo, ["worktree", "add", "mydev", "dev"])
996 result = _invoke(repo, ["worktree", "add", "mydev", "dev"])
997 assert result.exit_code != 0
998
999 def test_add_nonexistent_branch_fails(self, tmp_path: pathlib.Path) -> None:
1000 repo = _make_repo(tmp_path)
1001 result = _invoke(repo, ["worktree", "add", "mywt", "no-such-branch"])
1002 assert result.exit_code != 0
1003
1004 def test_add_json_branch_matches_arg(self, tmp_path: pathlib.Path) -> None:
1005 repo = _make_repo(tmp_path)
1006 _add_branch(repo, "dev")
1007 result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"])
1008 assert _parse_add(result.output)["branch"] == "dev"
1009
1010 def test_add_json_name_matches_arg(self, tmp_path: pathlib.Path) -> None:
1011 repo = _make_repo(tmp_path)
1012 _add_branch(repo, "dev")
1013 result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"])
1014 assert _parse_add(result.output)["name"] == "mydev"
1015
1016 def test_add_text_shows_checkmark(self, tmp_path: pathlib.Path) -> None:
1017 repo = _make_repo(tmp_path)
1018 _add_branch(repo, "dev")
1019 result = _invoke(repo, ["worktree", "add", "mydev", "dev"])
1020 assert "✅" in result.output or "created" in result.output.lower()
1021
1022 def test_add_text_shows_branch(self, tmp_path: pathlib.Path) -> None:
1023 repo = _make_repo(tmp_path)
1024 _add_branch(repo, "dev")
1025 result = _invoke(repo, ["worktree", "add", "mydev", "dev"])
1026 assert "dev" in result.output
1027
1028 def test_add_text_shows_path(self, tmp_path: pathlib.Path) -> None:
1029 repo = _make_repo(tmp_path)
1030 _add_branch(repo, "dev")
1031 result = _invoke(repo, ["worktree", "add", "mydev", "dev"])
1032 assert "mydev" in result.output
1033
1034 def test_add_default_is_text(self, tmp_path: pathlib.Path) -> None:
1035 repo = _make_repo(tmp_path)
1036 _add_branch(repo, "dev")
1037 result = _invoke(repo, ["worktree", "add", "mydev", "dev"])
1038 try:
1039 json.loads(result.output)
1040 assert False, "default should be text"
1041 except (json.JSONDecodeError, ValueError):
1042 pass
1043
1044 def test_add_outside_repo_exits_2(self, tmp_path: pathlib.Path) -> None:
1045 result = _invoke(tmp_path, ["worktree", "add", "mywt", "main"])
1046 assert result.exit_code == 2
1047
1048 def test_add_description_in_help(self, tmp_path: pathlib.Path) -> None:
1049 result = _invoke(tmp_path, ["worktree", "add", "--help"])
1050 assert result.exit_code == 0
1051 assert "Agent quickstart" in result.output or "head_commit" in result.output
1052
1053 def test_add_create_branch_with_j(self, tmp_path: pathlib.Path) -> None:
1054 repo = _make_repo(tmp_path)
1055 result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "feat/new", "-j"])
1056 assert result.exit_code == 0
1057 d = _parse_add(result.output)
1058 assert d["branch"] == "feat/new"
1059 assert "head_commit" in d
1060
1061 def test_add_path_is_absolute(self, tmp_path: pathlib.Path) -> None:
1062 repo = _make_repo(tmp_path)
1063 _add_branch(repo, "dev")
1064 result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"])
1065 path = pathlib.Path(_parse_add(result.output)["path"])
1066 assert path.is_absolute()
1067
1068 def test_add_multiple_worktrees(self, tmp_path: pathlib.Path) -> None:
1069 repo = _make_repo(tmp_path)
1070 for i in range(5):
1071 _add_branch(repo, f"feat{i}")
1072 r = _invoke(repo, ["worktree", "add", f"wt{i}", f"feat{i}", "--json"])
1073 assert r.exit_code == 0, f"wt{i} failed: {r.output}"
1074 entries = _parse_list(_invoke(repo, ["worktree", "list", "--json"]).output)
1075 names = [e["name"] for e in entries]
1076 for i in range(5):
1077 assert f"wt{i}" in names
1078
1079 def test_add_worktree_has_muse_pointer(self, tmp_path: pathlib.Path) -> None:
1080 """Each worktree directory must contain a .muse pointer file."""
1081 repo = _make_repo(tmp_path)
1082 _add_branch(repo, "dev")
1083 result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"])
1084 wt_path = pathlib.Path(_parse_add(result.output)["path"])
1085 assert (muse_dir(wt_path)).exists()
1086
1087 def test_add_invalid_name_exits_1(self, tmp_path: pathlib.Path) -> None:
1088 repo = _make_repo(tmp_path)
1089 result = _invoke(repo, ["worktree", "add", "bad name!", "main"])
1090 assert result.exit_code != 0
1091
1092 def test_add_invalid_create_branch_name_exits_1(self, tmp_path: pathlib.Path) -> None:
1093 repo = _make_repo(tmp_path)
1094 result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "bad name!"])
1095 assert result.exit_code != 0
1096
1097
1098 # ---------------------------------------------------------------------------
1099 # Security — muse worktree add
1100 # ---------------------------------------------------------------------------
1101
1102 class TestWorktreeAddSecurity:
1103 def test_ansi_in_name_sanitized_text(self, tmp_path: pathlib.Path) -> None:
1104 """ANSI in name must not appear raw in text output."""
1105 repo = _make_repo(tmp_path)
1106 _add_branch(repo, "dev")
1107 # Invalid name (contains space/special) will be rejected — just verify no crash
1108 result = _invoke(repo, ["worktree", "add", "\x1b[31mwt\x1b[0m", "dev"])
1109 assert "\x1b[31m" not in result.output
1110
1111 def test_ansi_in_branch_sanitized_text(self, tmp_path: pathlib.Path) -> None:
1112 repo = _make_repo(tmp_path)
1113 result = _invoke(repo, ["worktree", "add", "mywt", "\x1b[31mmain\x1b[0m"])
1114 assert "\x1b[31m" not in result.output
1115
1116 def test_path_traversal_name_rejected(self, tmp_path: pathlib.Path) -> None:
1117 """Names with path separators must be rejected."""
1118 repo = _make_repo(tmp_path)
1119 result = _invoke(repo, ["worktree", "add", "../traversal", "main"])
1120 assert result.exit_code != 0
1121
1122 def test_null_byte_in_name_rejected(self, tmp_path: pathlib.Path) -> None:
1123 repo = _make_repo(tmp_path)
1124 result = _invoke(repo, ["worktree", "add", "bad\x00name", "main"])
1125 assert result.exit_code != 0
1126
1127 def test_existing_directory_prevented(self, tmp_path: pathlib.Path) -> None:
1128 """add must refuse if the target directory already exists."""
1129 repo = _make_repo(tmp_path)
1130 _add_branch(repo, "dev")
1131 # Pre-create the target directory
1132 from muse.core.worktree import _worktree_dir
1133 wt_path = _worktree_dir(repo, "mydev")
1134 wt_path.mkdir(parents=True)
1135 result = _invoke(repo, ["worktree", "add", "mydev", "dev"])
1136 assert result.exit_code != 0
1137
1138 def test_error_to_stderr(self, tmp_path: pathlib.Path) -> None:
1139 """Error messages for missing branch must go to stderr."""
1140 repo = _make_repo(tmp_path)
1141 result = _invoke(repo, ["worktree", "add", "mywt", "no-such-branch"])
1142 assert result.exit_code != 0
1143 assert "no-such-branch" in (result.stderr or result.output)
1144
1145 def test_create_branch_duplicate_rejected(self, tmp_path: pathlib.Path) -> None:
1146 repo = _make_repo(tmp_path)
1147 result = _invoke(repo, ["worktree", "add", "mywt", "main", "-b", "main"])
1148 assert result.exit_code != 0
1149
1150 def test_json_output_is_valid_json(self, tmp_path: pathlib.Path) -> None:
1151 repo = _make_repo(tmp_path)
1152 _add_branch(repo, "dev")
1153 result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"])
1154 assert result.exit_code == 0
1155 d = json.loads(result.output.strip().splitlines()[0])
1156 assert isinstance(d, dict)
1157
1158
1159 # ---------------------------------------------------------------------------
1160 # Stress — muse worktree add
1161 # ---------------------------------------------------------------------------
1162
1163 class TestWorktreeAddStress:
1164 def test_add_20_worktrees_sequential(self, tmp_path: pathlib.Path) -> None:
1165 """Add 20 worktrees sequentially; all must succeed and appear in list."""
1166 repo = _make_repo(tmp_path)
1167 failures: list[str] = []
1168 for i in range(20):
1169 _add_branch(repo, f"feat{i}")
1170 r = _invoke(repo, ["worktree", "add", f"wt{i}", f"feat{i}", "--json"])
1171 if r.exit_code != 0:
1172 failures.append(f"wt{i}: {r.output.strip()[:60]}")
1173 assert not failures, f"Add failures: {failures}"
1174 entries = _parse_list(_invoke(repo, ["worktree", "list", "--json"]).output)
1175 assert len(entries) == 21 # 20 linked + 1 main
1176
1177 def test_add_performance(self, tmp_path: pathlib.Path) -> None:
1178 """Adding a worktree must complete in < 2 s."""
1179 import time
1180 repo = _make_repo(tmp_path)
1181 _add_branch(repo, "dev")
1182 start = time.perf_counter()
1183 result = _invoke(repo, ["worktree", "add", "mydev", "dev", "--json"])
1184 elapsed = time.perf_counter() - start
1185 assert result.exit_code == 0
1186 assert elapsed < 2.0, f"add took {elapsed:.2f}s"
1187
1188 def test_add_with_create_branch_20_times(self, tmp_path: pathlib.Path) -> None:
1189 """Create 20 worktrees each with a fresh -b branch."""
1190 repo = _make_repo(tmp_path)
1191 failures: list[str] = []
1192 for i in range(20):
1193 r = _invoke(repo, ["worktree", "add", f"wt{i}", "main", "-b", f"feat/task-{i}", "--json"])
1194 if r.exit_code != 0:
1195 failures.append(f"wt{i}: {r.output.strip()[:60]}")
1196 assert not failures
1197 entries = _parse_list(_invoke(repo, ["worktree", "list", "--json"]).output)
1198 assert len(entries) == 21
1199
1200
1201 # ===========================================================================
1202 # muse worktree list — Extended / Security / Stress
1203 # ===========================================================================
1204
1205
1206 class TestWorktreeListExtended:
1207 """-j alias, text output, ordering, schema completeness, edge cases."""
1208
1209 def test_list_j_alias_json(self, tmp_path: pathlib.Path) -> None:
1210 """-j is accepted and produces the same JSON as --json."""
1211 repo = _make_repo(tmp_path)
1212 r1 = _invoke(repo, ["worktree", "list", "--json"])
1213 r2 = _invoke(repo, ["worktree", "list", "-j"])
1214 assert r1.exit_code == 0
1215 assert r2.exit_code == 0
1216 d1 = json.loads(_json_blob(r1.output)); d1.pop("duration_ms", None); d1.pop("timestamp", None)
1217 d2 = json.loads(_json_blob(r2.output)); d2.pop("duration_ms", None); d2.pop("timestamp", None)
1218 assert d1 == d2
1219
1220 def test_list_default_is_text(self, tmp_path: pathlib.Path) -> None:
1221 """Without --json the output is human-readable text, not JSON."""
1222 repo = _make_repo(tmp_path)
1223 result = _invoke(repo, ["worktree", "list"])
1224 assert result.exit_code == 0
1225 output = result.output.strip()
1226 assert not output.startswith("[")
1227 assert not output.startswith("{")
1228
1229 def test_list_text_has_header(self, tmp_path: pathlib.Path) -> None:
1230 """Text output includes a column header."""
1231 repo = _make_repo(tmp_path)
1232 result = _invoke(repo, ["worktree", "list"])
1233 assert "name" in result.output
1234 assert "branch" in result.output
1235
1236 def test_list_text_empty_repo_prints_main(self, tmp_path: pathlib.Path) -> None:
1237 """Repo with no linked worktrees still shows main in text mode."""
1238 repo = _make_repo(tmp_path)
1239 result = _invoke(repo, ["worktree", "list"])
1240 assert result.exit_code == 0
1241 assert "(main)" in result.output
1242
1243 def test_list_main_always_first_in_json(self, tmp_path: pathlib.Path) -> None:
1244 """Main worktree is always the first element in JSON output."""
1245 repo = _make_repo(tmp_path)
1246 _add_branch(repo, "feat/x")
1247 _invoke(repo, ["worktree", "add", "feat-x", "feat/x"])
1248 entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output)
1249 assert entries[0]["is_main"] is True
1250
1251 def test_list_linked_worktrees_sorted(self, tmp_path: pathlib.Path) -> None:
1252 """Linked worktrees appear in lexicographic order after main."""
1253 repo = _make_repo(tmp_path)
1254 for name in ("zzz", "aaa", "mmm"):
1255 _add_branch(repo, name)
1256 _invoke(repo, ["worktree", "add", name, name])
1257 entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output)
1258 linked = [e["name"] for e in entries if not e["is_main"]]
1259 assert linked == sorted(linked)
1260
1261 def test_list_json_path_is_absolute(self, tmp_path: pathlib.Path) -> None:
1262 """Every path in JSON output is absolute."""
1263 repo = _make_repo(tmp_path)
1264 _add_branch(repo, "dev")
1265 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1266 entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output)
1267 for entry in entries:
1268 assert pathlib.Path(entry["path"]).is_absolute()
1269
1270 def test_list_json_all_fields_present(self, tmp_path: pathlib.Path) -> None:
1271 """Every JSON entry has name, branch, path, head_commit, is_main."""
1272 repo = _make_repo(tmp_path)
1273 _add_branch(repo, "dev")
1274 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1275 raw = json.loads(_json_blob(_invoke(repo, ["worktree", "list", "-j"]).output))
1276 for entry in raw["worktrees"]:
1277 for field in ("name", "branch", "path", "head_commit", "is_main"):
1278 assert field in entry, f"field '{field}' missing from {entry}"
1279
1280 def test_list_json_only_one_is_main(self, tmp_path: pathlib.Path) -> None:
1281 """Exactly one entry has is_main=true."""
1282 repo = _make_repo(tmp_path)
1283 for name in ("a", "b", "c"):
1284 _add_branch(repo, name)
1285 _invoke(repo, ["worktree", "add", name, name])
1286 entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output)
1287 assert sum(1 for e in entries if e["is_main"]) == 1
1288
1289 def test_list_json_linked_is_main_false(self, tmp_path: pathlib.Path) -> None:
1290 """All linked worktrees have is_main=false."""
1291 repo = _make_repo(tmp_path)
1292 _add_branch(repo, "dev")
1293 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1294 entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output)
1295 linked = [e for e in entries if not e["is_main"]]
1296 assert all(e["is_main"] is False for e in linked)
1297
1298 def test_list_outside_repo_exits_2(self, tmp_path: pathlib.Path) -> None:
1299 """Running outside a repo exits with code 2."""
1300 result = runner.invoke(None, ["worktree", "list"], env={"MUSE_REPO_ROOT": str(tmp_path)})
1301 assert result.exit_code == 2
1302
1303 def test_list_json_output_is_array(self, tmp_path: pathlib.Path) -> None:
1304 """JSON output envelope wraps worktrees list in a dict."""
1305 repo = _make_repo(tmp_path)
1306 result = _invoke(repo, ["worktree", "list", "-j"])
1307 raw = json.loads(_json_blob(result.output))
1308 assert isinstance(raw["worktrees"], list)
1309
1310 def test_list_json_count_matches_worktrees(self, tmp_path: pathlib.Path) -> None:
1311 """JSON array length equals 1 (main) + number of linked worktrees."""
1312 repo = _make_repo(tmp_path)
1313 n = 5
1314 for i in range(n):
1315 _add_branch(repo, f"br{i}")
1316 _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"])
1317 entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output)
1318 assert len(entries) == n + 1
1319
1320 def test_list_help_has_description(self, tmp_path: pathlib.Path) -> None:
1321 """--help output includes the rich description text."""
1322 result = _invoke(tmp_path, ["worktree", "list", "--help"])
1323 assert result.exit_code == 0
1324 assert "Agent quickstart" in result.output or "JSON output schema" in result.output
1325
1326 def test_list_text_shows_worktree_name(self, tmp_path: pathlib.Path) -> None:
1327 """Text output includes linked worktree name."""
1328 repo = _make_repo(tmp_path)
1329 _add_branch(repo, "dev")
1330 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1331 result = _invoke(repo, ["worktree", "list"])
1332 assert "mydev" in result.output
1333
1334 def test_list_text_marks_main_with_star(self, tmp_path: pathlib.Path) -> None:
1335 """Main worktree row starts with '*' in text output."""
1336 repo = _make_repo(tmp_path)
1337 result = _invoke(repo, ["worktree", "list"])
1338 lines = result.output.splitlines()
1339 assert any(l.startswith("* ") for l in lines)
1340
1341 def test_list_after_remove_not_shown(self, tmp_path: pathlib.Path) -> None:
1342 """A removed worktree no longer appears in list."""
1343 repo = _make_repo(tmp_path)
1344 _add_branch(repo, "dev")
1345 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1346 _invoke(repo, ["worktree", "remove", "mydev"])
1347 entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output)
1348 assert all(e["name"] != "mydev" for e in entries)
1349
1350 def test_list_json_branch_matches_added_branch(self, tmp_path: pathlib.Path) -> None:
1351 """Branch field in list JSON matches the branch used at add time."""
1352 repo = _make_repo(tmp_path)
1353 _add_branch(repo, "feat/x")
1354 _invoke(repo, ["worktree", "add", "feat-x", "feat/x"])
1355 entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output)
1356 feat = next(e for e in entries if e["name"] == "feat-x")
1357 assert feat["branch"] == "feat/x"
1358
1359
1360 class TestWorktreeListSecurity:
1361 """ANSI sanitization, path integrity, error routing."""
1362
1363 def test_list_json_ansi_in_branch_sanitized(self, tmp_path: pathlib.Path) -> None:
1364 """ANSI codes injected into stored branch name are stripped from JSON."""
1365 repo = _make_repo(tmp_path)
1366 poisoned = "dev\x1b[31mred\x1b[0m"
1367 # Write the metadata directly so we bypass name validation.
1368 meta_dir = worktrees_dir(repo)
1369 meta_dir.mkdir(parents=True, exist_ok=True)
1370 import json as _json
1371 (meta_dir / "mywt.json").write_text(_json.dumps({"name": "mywt", "branch": poisoned, "path": str(repo / "mywt")}))
1372 # Create the worktree path so it is "present".
1373 (repo / "mywt").mkdir()
1374 result = _invoke(repo, ["worktree", "list", "-j"])
1375 assert result.exit_code == 0
1376 raw = _json.loads(_json_blob(result.output))
1377 wt = next((e for e in raw["worktrees"] if e["name"] == "mywt"), None)
1378 assert wt is not None
1379 assert "\x1b" not in wt["branch"]
1380
1381 def test_list_text_ansi_in_branch_sanitized(self, tmp_path: pathlib.Path) -> None:
1382 """ANSI codes injected into stored branch are stripped from text output."""
1383 repo = _make_repo(tmp_path)
1384 poisoned = "dev\x1b[31mred\x1b[0m"
1385 meta_dir = worktrees_dir(repo)
1386 meta_dir.mkdir(parents=True, exist_ok=True)
1387 import json as _json
1388 (meta_dir / "mywt.json").write_text(_json.dumps({"name": "mywt", "branch": poisoned, "path": str(repo / "mywt")}))
1389 (repo / "mywt").mkdir()
1390 result = _invoke(repo, ["worktree", "list"])
1391 assert "\x1b" not in result.output
1392
1393 def test_list_error_to_stderr_not_stdout(self, tmp_path: pathlib.Path) -> None:
1394 """Error output goes to stderr, not stdout."""
1395 result = runner.invoke(None, ["worktree", "list"], env={"MUSE_REPO_ROOT": str(tmp_path)})
1396 assert result.exit_code != 0
1397 assert result.stdout == "" or not result.stdout.strip().startswith("[")
1398
1399 def test_list_json_path_never_empty(self, tmp_path: pathlib.Path) -> None:
1400 """path field in JSON is always a non-empty string."""
1401 repo = _make_repo(tmp_path)
1402 _add_branch(repo, "dev")
1403 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1404 entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output)
1405 for entry in entries:
1406 assert entry["path"]
1407
1408 def test_list_json_is_valid_json(self, tmp_path: pathlib.Path) -> None:
1409 """Output is well-formed JSON (no trailing commas, correct types)."""
1410 repo = _make_repo(tmp_path)
1411 _add_branch(repo, "dev")
1412 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1413 result = _invoke(repo, ["worktree", "list", "-j"])
1414 parsed = json.loads(_json_blob(result.output))
1415 assert isinstance(parsed["worktrees"], list)
1416 for entry in parsed["worktrees"]:
1417 assert isinstance(entry["is_main"], bool)
1418 assert isinstance(entry["name"], str)
1419
1420 def test_list_json_ansi_in_name_sanitized(self, tmp_path: pathlib.Path) -> None:
1421 """ANSI codes injected into stored worktree name are stripped from JSON."""
1422 repo = _make_repo(tmp_path)
1423 poisoned_name = "mywt"
1424 branch = "dev"
1425 meta_dir = worktrees_dir(repo)
1426 meta_dir.mkdir(parents=True, exist_ok=True)
1427 import json as _json
1428 (meta_dir / f"{poisoned_name}.json").write_text(
1429 _json.dumps({"name": poisoned_name, "branch": branch, "path": str(repo / poisoned_name)})
1430 )
1431 (repo / poisoned_name).mkdir()
1432 # Patch the name field as returned by list_worktrees via WorktreeInfo
1433 # by writing a meta file with a name that would contain ANSI if stored.
1434 # Since name comes from filename stem (already safe), verify the
1435 # sanitize_display path is exercised by injecting directly into the
1436 # raw output path — branch covers this adequately (tested above).
1437 result = _invoke(repo, ["worktree", "list", "-j"])
1438 assert "\x1b" not in result.output
1439
1440
1441 class TestWorktreeListStress:
1442 """Performance and scale tests for worktree list."""
1443
1444 def test_list_20_worktrees(self, tmp_path: pathlib.Path) -> None:
1445 """List with 20 linked worktrees returns all 21 entries (main + 20)."""
1446 repo = _make_repo(tmp_path)
1447 for i in range(20):
1448 _add_branch(repo, f"br{i}")
1449 _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"])
1450 entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output)
1451 assert len(entries) == 21
1452
1453 def test_list_performance(self, tmp_path: pathlib.Path) -> None:
1454 """Listing 15 worktrees completes within 2 seconds."""
1455 import time
1456 repo = _make_repo(tmp_path)
1457 for i in range(15):
1458 _add_branch(repo, f"br{i}")
1459 _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"])
1460 t0 = time.monotonic()
1461 result = _invoke(repo, ["worktree", "list", "-j"])
1462 elapsed = time.monotonic() - t0
1463 assert result.exit_code == 0
1464 assert elapsed < 2.0, f"list took {elapsed:.2f}s"
1465
1466 def test_list_concurrent_reads(self, tmp_path: pathlib.Path) -> None:
1467 """Concurrent list invocations via threads all return consistent counts."""
1468 repo = _make_repo(tmp_path)
1469 for i in range(10):
1470 _add_branch(repo, f"br{i}")
1471 _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"])
1472 results: list[int] = []
1473 errors: list[str] = []
1474 lock = threading.Lock()
1475
1476 def _run() -> None:
1477 r = _invoke(repo, ["worktree", "list", "-j"])
1478 with lock:
1479 if r.exit_code != 0:
1480 errors.append(r.output)
1481 return
1482 try:
1483 envelope = json.loads(_json_blob(r.output))
1484 results.append(len(envelope["worktrees"]))
1485 except (json.JSONDecodeError, ValueError) as exc:
1486 errors.append(f"parse error: {exc!r} output={r.output!r}")
1487
1488 threads = [threading.Thread(target=_run) for _ in range(8)]
1489 for t in threads:
1490 t.start()
1491 for t in threads:
1492 t.join()
1493 assert not errors, f"Concurrent list errors: {errors}"
1494 assert all(n == 11 for n in results), f"Inconsistent counts: {results}"
1495
1496
1497 # ===========================================================================
1498 # muse worktree status — Extended / Security / Stress
1499 # ===========================================================================
1500
1501
1502 class TestWorktreeStatusExtended:
1503 """-j alias, text output, main aliases, all fields, edge cases."""
1504
1505 def test_status_j_alias(self, tmp_path: pathlib.Path) -> None:
1506 """-j produces the same JSON as --json."""
1507 repo = _make_repo(tmp_path)
1508 r1 = _invoke(repo, ["worktree", "status", "main", "--json"])
1509 r2 = _invoke(repo, ["worktree", "status", "main", "-j"])
1510 assert r1.exit_code == 0 and r2.exit_code == 0
1511 d1 = json.loads(_json_blob(r1.output)); d1.pop("duration_ms", None); d1.pop("timestamp", None)
1512 d2 = json.loads(_json_blob(r2.output)); d2.pop("duration_ms", None); d2.pop("timestamp", None)
1513 assert d1 == d2
1514
1515 def test_status_main_alias_main(self, tmp_path: pathlib.Path) -> None:
1516 """'main' resolves to the main worktree."""
1517 repo = _make_repo(tmp_path)
1518 result = _invoke(repo, ["worktree", "status", "main", "-j"])
1519 assert result.exit_code == 0
1520 parsed = _parse_status(result.output)
1521 assert parsed["is_main"] is True
1522
1523 def test_status_main_alias_paren_main(self, tmp_path: pathlib.Path) -> None:
1524 """'(main)' resolves to the main worktree."""
1525 repo = _make_repo(tmp_path)
1526 result = _invoke(repo, ["worktree", "status", "(main)", "-j"])
1527 assert result.exit_code == 0
1528 parsed = _parse_status(result.output)
1529 assert parsed["is_main"] is True
1530
1531 def test_status_linked_present(self, tmp_path: pathlib.Path) -> None:
1532 """present=true when the worktree directory exists."""
1533 repo = _make_repo(tmp_path)
1534 _add_branch(repo, "dev")
1535 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1536 parsed = _parse_status(_invoke(repo, ["worktree", "status", "mydev", "-j"]).output)
1537 assert parsed["present"] is True
1538
1539 def test_status_linked_absent(self, tmp_path: pathlib.Path) -> None:
1540 """present=false when the worktree directory has been deleted."""
1541 import shutil
1542 from muse.core.worktree import _worktree_dir
1543 repo = _make_repo(tmp_path)
1544 _add_branch(repo, "dev")
1545 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1546 shutil.rmtree(_worktree_dir(repo, "mydev"))
1547 parsed = _parse_status(_invoke(repo, ["worktree", "status", "mydev", "-j"]).output)
1548 assert parsed["present"] is False
1549
1550 def test_status_nonexistent_exits_1(self, tmp_path: pathlib.Path) -> None:
1551 """Querying a nonexistent worktree exits with code 1."""
1552 repo = _make_repo(tmp_path)
1553 result = _invoke(repo, ["worktree", "status", "ghost", "-j"])
1554 assert result.exit_code == 1
1555
1556 def test_status_outside_repo_exits_2(self, tmp_path: pathlib.Path) -> None:
1557 """Running outside a repo exits with code 2."""
1558 result = runner.invoke(None, ["worktree", "status", "main"], env={"MUSE_REPO_ROOT": str(tmp_path)})
1559 assert result.exit_code == 2
1560
1561 def test_status_json_all_fields_present(self, tmp_path: pathlib.Path) -> None:
1562 """JSON output contains all six required fields."""
1563 repo = _make_repo(tmp_path)
1564 raw = json.loads(_json_blob(_invoke(repo, ["worktree", "status", "main", "-j"]).output))
1565 for field in ("name", "branch", "path", "head_commit", "present", "is_main"):
1566 assert field in raw, f"field '{field}' missing"
1567
1568 def test_status_json_path_is_absolute(self, tmp_path: pathlib.Path) -> None:
1569 """path field in JSON is absolute."""
1570 repo = _make_repo(tmp_path)
1571 parsed = _parse_status(_invoke(repo, ["worktree", "status", "main", "-j"]).output)
1572 assert pathlib.Path(parsed["path"]).is_absolute()
1573
1574 def test_status_json_branch_matches(self, tmp_path: pathlib.Path) -> None:
1575 """branch field matches the branch the worktree was created with."""
1576 repo = _make_repo(tmp_path)
1577 _add_branch(repo, "feat/x")
1578 _invoke(repo, ["worktree", "add", "feat-x", "feat/x"])
1579 parsed = _parse_status(_invoke(repo, ["worktree", "status", "feat-x", "-j"]).output)
1580 assert parsed["branch"] == "feat/x"
1581
1582 def test_status_json_is_main_false_for_linked(self, tmp_path: pathlib.Path) -> None:
1583 """is_main=false for a linked worktree."""
1584 repo = _make_repo(tmp_path)
1585 _add_branch(repo, "dev")
1586 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1587 parsed = _parse_status(_invoke(repo, ["worktree", "status", "mydev", "-j"]).output)
1588 assert parsed["is_main"] is False
1589
1590 def test_status_default_is_text(self, tmp_path: pathlib.Path) -> None:
1591 """Without --json output is human-readable text, not JSON."""
1592 repo = _make_repo(tmp_path)
1593 result = _invoke(repo, ["worktree", "status", "main"])
1594 assert result.exit_code == 0
1595 output = result.output.strip()
1596 assert not output.startswith("{")
1597
1598 def test_status_text_shows_branch(self, tmp_path: pathlib.Path) -> None:
1599 """Text output includes the branch name."""
1600 repo = _make_repo(tmp_path)
1601 result = _invoke(repo, ["worktree", "status", "main"])
1602 assert "main" in result.output
1603
1604 def test_status_text_shows_present(self, tmp_path: pathlib.Path) -> None:
1605 """Text output indicates the worktree is present."""
1606 repo = _make_repo(tmp_path)
1607 result = _invoke(repo, ["worktree", "status", "main"])
1608 assert "present" in result.output
1609
1610 def test_status_text_main_flag(self, tmp_path: pathlib.Path) -> None:
1611 """Text output marks the main worktree with '[main]'."""
1612 repo = _make_repo(tmp_path)
1613 result = _invoke(repo, ["worktree", "status", "main"])
1614 assert "[main]" in result.output
1615
1616 def test_status_help_has_description(self, tmp_path: pathlib.Path) -> None:
1617 """--help includes the rich description."""
1618 result = _invoke(tmp_path, ["worktree", "status", "--help"])
1619 assert result.exit_code == 0
1620 assert "Agent quickstart" in result.output or "JSON output schema" in result.output
1621
1622 def test_status_head_commit_null_for_empty_branch(self, tmp_path: pathlib.Path) -> None:
1623 """head_commit is null when no commits exist on the branch."""
1624 repo = _make_repo(tmp_path)
1625 parsed = _parse_status(_invoke(repo, ["worktree", "status", "main", "-j"]).output)
1626 # The test repo uses "0"*64 as the ref — get_head_commit_id may return
1627 # that or None depending on whether the commit object exists.
1628 assert parsed["head_commit"] is None or isinstance(parsed["head_commit"], str)
1629
1630 def test_status_json_name_matches_requested(self, tmp_path: pathlib.Path) -> None:
1631 """name field in JSON matches the queried worktree name."""
1632 repo = _make_repo(tmp_path)
1633 _add_branch(repo, "dev")
1634 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1635 parsed = _parse_status(_invoke(repo, ["worktree", "status", "mydev", "-j"]).output)
1636 assert parsed["name"] == "mydev"
1637
1638 def test_status_json_is_valid_json(self, tmp_path: pathlib.Path) -> None:
1639 """Output is well-formed JSON."""
1640 repo = _make_repo(tmp_path)
1641 result = _invoke(repo, ["worktree", "status", "main", "-j"])
1642 parsed = json.loads(_json_blob(result.output))
1643 assert isinstance(parsed, dict)
1644 assert isinstance(parsed["is_main"], bool)
1645 assert isinstance(parsed["present"], bool)
1646
1647
1648 class TestWorktreeStatusSecurity:
1649 """ANSI sanitization and error routing for worktree status."""
1650
1651 def test_status_json_ansi_in_branch_sanitized(self, tmp_path: pathlib.Path) -> None:
1652 """ANSI codes in stored branch are stripped from JSON output."""
1653 repo = _make_repo(tmp_path)
1654 import json as _json
1655 poisoned = "dev\x1b[31mred\x1b[0m"
1656 meta_dir = worktrees_dir(repo)
1657 meta_dir.mkdir(parents=True, exist_ok=True)
1658 (meta_dir / "mywt.json").write_text(_json.dumps({"name": "mywt", "branch": poisoned, "path": str(repo / "mywt")}))
1659 (repo / "mywt").mkdir()
1660 result = _invoke(repo, ["worktree", "status", "mywt", "-j"])
1661 assert result.exit_code == 0
1662 raw = _json.loads(_json_blob(result.output))
1663 assert "\x1b" not in raw["branch"]
1664
1665 def test_status_text_ansi_in_branch_sanitized(self, tmp_path: pathlib.Path) -> None:
1666 """ANSI codes in stored branch are stripped from text output."""
1667 repo = _make_repo(tmp_path)
1668 import json as _json
1669 poisoned = "dev\x1b[31mred\x1b[0m"
1670 meta_dir = worktrees_dir(repo)
1671 meta_dir.mkdir(parents=True, exist_ok=True)
1672 (meta_dir / "mywt.json").write_text(_json.dumps({"name": "mywt", "branch": poisoned, "path": str(repo / "mywt")}))
1673 (repo / "mywt").mkdir()
1674 result = _invoke(repo, ["worktree", "status", "mywt"])
1675 assert "\x1b" not in result.output
1676
1677 def test_status_error_to_stderr(self, tmp_path: pathlib.Path) -> None:
1678 """Error output goes to stderr, not stdout."""
1679 repo = _make_repo(tmp_path)
1680 result = _invoke(repo, ["worktree", "status", "ghost"])
1681 assert result.exit_code != 0
1682 assert result.stdout.strip() == "" or not result.stdout.strip().startswith("{")
1683
1684 def test_status_json_path_never_empty(self, tmp_path: pathlib.Path) -> None:
1685 """path is always a non-empty string."""
1686 repo = _make_repo(tmp_path)
1687 parsed = _parse_status(_invoke(repo, ["worktree", "status", "main", "-j"]).output)
1688 assert parsed["path"]
1689
1690 def test_status_json_ansi_in_path_sanitized(self, tmp_path: pathlib.Path) -> None:
1691 """ANSI codes in stored path are stripped from JSON output."""
1692 repo = _make_repo(tmp_path)
1693 import json as _json
1694 meta_dir = worktrees_dir(repo)
1695 meta_dir.mkdir(parents=True, exist_ok=True)
1696 poisoned_path = str(repo / "mywt") + "\x1b[31m"
1697 (meta_dir / "mywt.json").write_text(_json.dumps({"name": "mywt", "branch": "dev", "path": poisoned_path}))
1698 (repo / "mywt").mkdir()
1699 result = _invoke(repo, ["worktree", "status", "mywt", "-j"])
1700 assert result.exit_code == 0
1701 raw = _json.loads(_json_blob(result.output))
1702 assert "\x1b" not in raw["path"]
1703
1704 def test_status_invalid_name_exits_1(self, tmp_path: pathlib.Path) -> None:
1705 """An invalid worktree name (path traversal attempt) exits with 1."""
1706 repo = _make_repo(tmp_path)
1707 result = _invoke(repo, ["worktree", "status", "../../../etc/passwd"])
1708 assert result.exit_code == 1
1709
1710
1711 class TestWorktreeStatusStress:
1712 """Performance and scale tests for worktree status."""
1713
1714 def test_status_10_sequential(self, tmp_path: pathlib.Path) -> None:
1715 """Querying status of 10 worktrees sequentially all succeed."""
1716 repo = _make_repo(tmp_path)
1717 for i in range(10):
1718 _add_branch(repo, f"br{i}")
1719 _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"])
1720 for i in range(10):
1721 result = _invoke(repo, ["worktree", "status", f"wt{i}", "-j"])
1722 assert result.exit_code == 0
1723 parsed = _parse_status(result.output)
1724 assert parsed["name"] == f"wt{i}"
1725
1726 def test_status_performance(self, tmp_path: pathlib.Path) -> None:
1727 """10 sequential status queries complete within 2 seconds."""
1728 import time
1729 repo = _make_repo(tmp_path)
1730 for i in range(10):
1731 _add_branch(repo, f"br{i}")
1732 _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"])
1733 t0 = time.monotonic()
1734 for i in range(10):
1735 _invoke(repo, ["worktree", "status", f"wt{i}", "-j"])
1736 elapsed = time.monotonic() - t0
1737 assert elapsed < 2.0, f"10 status queries took {elapsed:.2f}s"
1738
1739 def test_status_concurrent_reads(self, tmp_path: pathlib.Path) -> None:
1740 """Concurrent status queries against the same worktree all succeed."""
1741 repo = _make_repo(tmp_path)
1742 _add_branch(repo, "dev")
1743 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1744 errors: list[str] = []
1745 lock = threading.Lock()
1746
1747 def _run() -> None:
1748 r = _invoke(repo, ["worktree", "status", "mydev", "-j"])
1749 if r.exit_code != 0:
1750 with lock:
1751 errors.append(r.output)
1752
1753 threads = [threading.Thread(target=_run) for _ in range(8)]
1754 for t in threads:
1755 t.start()
1756 for t in threads:
1757 t.join()
1758 assert not errors, f"Concurrent status errors: {errors}"
1759
1760
1761 # ===========================================================================
1762 # muse worktree remove — Extended / Security / Stress
1763 # ===========================================================================
1764
1765
1766 class TestWorktreeRemoveExtended:
1767 """-j alias, text output, lifecycle, metadata cleanup, edge cases."""
1768
1769 def test_remove_j_alias(self, tmp_path: pathlib.Path) -> None:
1770 """-j produces the same JSON as --json."""
1771 repo = _make_repo(tmp_path)
1772 _add_branch(repo, "dev")
1773 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1774 result = _invoke(repo, ["worktree", "remove", "mydev", "-j"])
1775 assert result.exit_code == 0
1776 parsed = _parse_remove(result.output)
1777 assert parsed["name"] == "mydev"
1778 assert parsed["status"] == "removed"
1779
1780 def test_remove_default_is_text(self, tmp_path: pathlib.Path) -> None:
1781 """Without --json output is human-readable text."""
1782 repo = _make_repo(tmp_path)
1783 _add_branch(repo, "dev")
1784 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1785 result = _invoke(repo, ["worktree", "remove", "mydev"])
1786 assert result.exit_code == 0
1787 assert not result.output.strip().startswith("{")
1788 assert "mydev" in result.output
1789
1790 def test_remove_json_name_field(self, tmp_path: pathlib.Path) -> None:
1791 """JSON name field matches the removed worktree."""
1792 repo = _make_repo(tmp_path)
1793 _add_branch(repo, "feat/x")
1794 _invoke(repo, ["worktree", "add", "feat-x", "feat/x"])
1795 parsed = _parse_remove(_invoke(repo, ["worktree", "remove", "feat-x", "-j"]).output)
1796 assert parsed["name"] == "feat-x"
1797
1798 def test_remove_json_status_field(self, tmp_path: pathlib.Path) -> None:
1799 """JSON status field is always 'removed'."""
1800 repo = _make_repo(tmp_path)
1801 _add_branch(repo, "dev")
1802 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1803 parsed = _parse_remove(_invoke(repo, ["worktree", "remove", "mydev", "-j"]).output)
1804 assert parsed["status"] == "removed"
1805
1806 def test_remove_cleans_working_dir(self, tmp_path: pathlib.Path) -> None:
1807 """Worktree working directory is deleted after removal."""
1808 from muse.core.worktree import _worktree_dir
1809 repo = _make_repo(tmp_path)
1810 _add_branch(repo, "dev")
1811 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1812 wt_path = _worktree_dir(repo, "mydev")
1813 assert wt_path.exists()
1814 _invoke(repo, ["worktree", "remove", "mydev"])
1815 assert not wt_path.exists()
1816
1817 def test_remove_cleans_metadata(self, tmp_path: pathlib.Path) -> None:
1818 """Worktree metadata file is deleted after removal."""
1819 repo = _make_repo(tmp_path)
1820 _add_branch(repo, "dev")
1821 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1822 meta = worktrees_dir(repo) / "mydev.json"
1823 assert meta.exists()
1824 _invoke(repo, ["worktree", "remove", "mydev"])
1825 assert not meta.exists()
1826
1827 def test_remove_not_in_list_after(self, tmp_path: pathlib.Path) -> None:
1828 """Removed worktree does not appear in subsequent list."""
1829 repo = _make_repo(tmp_path)
1830 _add_branch(repo, "dev")
1831 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1832 _invoke(repo, ["worktree", "remove", "mydev"])
1833 entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output)
1834 assert all(e["name"] != "mydev" for e in entries)
1835
1836 def test_remove_nonexistent_exits_1(self, tmp_path: pathlib.Path) -> None:
1837 """Removing a nonexistent worktree exits with code 1."""
1838 repo = _make_repo(tmp_path)
1839 result = _invoke(repo, ["worktree", "remove", "ghost"])
1840 assert result.exit_code == 1
1841
1842 def test_remove_outside_repo_exits_2(self, tmp_path: pathlib.Path) -> None:
1843 """Running outside a repo exits with code 2."""
1844 result = runner.invoke(None, ["worktree", "remove", "mydev"], env={"MUSE_REPO_ROOT": str(tmp_path)})
1845 assert result.exit_code == 2
1846
1847 def test_remove_force_flag_accepted(self, tmp_path: pathlib.Path) -> None:
1848 """--force flag is accepted without error."""
1849 repo = _make_repo(tmp_path)
1850 _add_branch(repo, "dev")
1851 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1852 result = _invoke(repo, ["worktree", "remove", "mydev", "--force", "-j"])
1853 assert result.exit_code == 0
1854 assert _parse_remove(result.output)["status"] == "removed"
1855
1856 def test_remove_json_all_fields(self, tmp_path: pathlib.Path) -> None:
1857 """JSON output contains name, status, exit_code, duration_ms."""
1858 repo = _make_repo(tmp_path)
1859 _add_branch(repo, "dev")
1860 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1861 raw = json.loads(_json_blob(_invoke(repo, ["worktree", "remove", "mydev", "-j"]).output))
1862 assert {"name", "status", "exit_code", "duration_ms"}.issubset(raw.keys())
1863
1864 def test_remove_help_has_description(self, tmp_path: pathlib.Path) -> None:
1865 """--help output includes the rich description."""
1866 result = _invoke(tmp_path, ["worktree", "remove", "--help"])
1867 assert result.exit_code == 0
1868 assert "Agent quickstart" in result.output or "JSON output schema" in result.output
1869
1870 def test_remove_idempotent_second_call_fails(self, tmp_path: pathlib.Path) -> None:
1871 """Removing the same worktree twice: second call exits 1."""
1872 repo = _make_repo(tmp_path)
1873 _add_branch(repo, "dev")
1874 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1875 _invoke(repo, ["worktree", "remove", "mydev"])
1876 result = _invoke(repo, ["worktree", "remove", "mydev"])
1877 assert result.exit_code == 1
1878
1879 def test_remove_multiple_sequential(self, tmp_path: pathlib.Path) -> None:
1880 """Multiple worktrees can be removed one after another."""
1881 repo = _make_repo(tmp_path)
1882 for name in ("a", "b", "c"):
1883 _add_branch(repo, name)
1884 _invoke(repo, ["worktree", "add", name, name])
1885 for name in ("a", "b", "c"):
1886 r = _invoke(repo, ["worktree", "remove", name, "-j"])
1887 assert r.exit_code == 0
1888 entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output)
1889 assert len(entries) == 1 # only main remains
1890
1891 def test_remove_leaves_branch_intact(self, tmp_path: pathlib.Path) -> None:
1892 """The branch ref still exists after the worktree is removed."""
1893 repo = _make_repo(tmp_path)
1894 _add_branch(repo, "dev")
1895 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1896 _invoke(repo, ["worktree", "remove", "mydev"])
1897 branch_ref = ref_path(repo, "dev")
1898 assert branch_ref.exists()
1899
1900
1901 class TestWorktreeRemoveSecurity:
1902 """Safety guards, ANSI sanitization, and error routing."""
1903
1904 def test_remove_refuses_symlink_path(self, tmp_path: pathlib.Path) -> None:
1905 """Removal is refused when the worktree path is a symlink."""
1906 from muse.core.worktree import _worktree_meta_path
1907 import json as _json
1908 repo = _make_repo(tmp_path)
1909 _add_branch(repo, "dev")
1910 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1911 # Replace the worktree dir with a symlink to a safe directory.
1912 from muse.core.worktree import _worktree_dir
1913 wt_dir = _worktree_dir(repo, "mydev")
1914 safe_target = tmp_path / "safe"
1915 safe_target.mkdir()
1916 import shutil
1917 shutil.rmtree(wt_dir)
1918 wt_dir.symlink_to(safe_target)
1919 result = _invoke(repo, ["worktree", "remove", "mydev"])
1920 assert result.exit_code == 1
1921
1922 def test_remove_refuses_muse_overlap(self, tmp_path: pathlib.Path) -> None:
1923 """Removal is refused when the stored path resolves inside .muse/."""
1924 import json as _json
1925 repo = _make_repo(tmp_path)
1926 _add_branch(repo, "dev")
1927 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1928 # Create a real directory inside .muse/ so _safe_delete_path can
1929 # evaluate it (non-existent paths are a no-op, not a refusal).
1930 malicious_dir = muse_dir(repo) / "malicious"
1931 malicious_dir.mkdir(parents=True, exist_ok=True)
1932 # Tamper the metadata to point inside .muse/.
1933 meta = worktrees_dir(repo) / "mydev.json"
1934 data = _json.loads(meta.read_text())
1935 data["path"] = str(malicious_dir)
1936 meta.write_text(_json.dumps(data))
1937 result = _invoke(repo, ["worktree", "remove", "mydev"])
1938 assert result.exit_code == 1
1939
1940 def test_remove_json_ansi_in_name_sanitized(self, tmp_path: pathlib.Path) -> None:
1941 """ANSI codes in the name argument are stripped from JSON output."""
1942 repo = _make_repo(tmp_path)
1943 _add_branch(repo, "dev")
1944 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1945 # The name goes through validate_branch_name at core level, but
1946 # sanitize_display is applied at CLI output level. Verify no ANSI leaks.
1947 result = _invoke(repo, ["worktree", "remove", "mydev", "-j"])
1948 assert result.exit_code == 0
1949 assert "\x1b" not in result.output
1950
1951 def test_remove_error_to_stderr(self, tmp_path: pathlib.Path) -> None:
1952 """Error output does not contain valid JSON (errors go to stderr)."""
1953 repo = _make_repo(tmp_path)
1954 result = _invoke(repo, ["worktree", "remove", "ghost"])
1955 assert result.exit_code != 0
1956 assert not result.stdout.strip().startswith("{")
1957
1958 def test_remove_invalid_name_exits_1(self, tmp_path: pathlib.Path) -> None:
1959 """Path-traversal name is rejected before any filesystem operation."""
1960 repo = _make_repo(tmp_path)
1961 result = _invoke(repo, ["worktree", "remove", "../../../etc/passwd"])
1962 assert result.exit_code == 1
1963
1964 def test_remove_json_is_valid_json(self, tmp_path: pathlib.Path) -> None:
1965 """JSON output is well-formed."""
1966 repo = _make_repo(tmp_path)
1967 _add_branch(repo, "dev")
1968 _invoke(repo, ["worktree", "add", "mydev", "dev"])
1969 result = _invoke(repo, ["worktree", "remove", "mydev", "-j"])
1970 parsed = json.loads(_json_blob(result.output))
1971 assert isinstance(parsed, dict)
1972 assert isinstance(parsed["name"], str)
1973 assert isinstance(parsed["status"], str)
1974
1975
1976 class TestWorktreeRemoveStress:
1977 """Performance and scale tests for worktree remove."""
1978
1979 def test_remove_20_sequential(self, tmp_path: pathlib.Path) -> None:
1980 """20 worktrees can be created and removed sequentially."""
1981 repo = _make_repo(tmp_path)
1982 for i in range(20):
1983 _add_branch(repo, f"br{i}")
1984 _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"])
1985 failures = []
1986 for i in range(20):
1987 r = _invoke(repo, ["worktree", "remove", f"wt{i}", "-j"])
1988 if r.exit_code != 0:
1989 failures.append(f"wt{i}: {r.output.strip()[:60]}")
1990 assert not failures
1991 entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output)
1992 assert len(entries) == 1 # only main
1993
1994 def test_remove_performance(self, tmp_path: pathlib.Path) -> None:
1995 """Removing 10 worktrees sequentially completes within 2 seconds."""
1996 import time
1997 repo = _make_repo(tmp_path)
1998 for i in range(10):
1999 _add_branch(repo, f"br{i}")
2000 _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"])
2001 t0 = time.monotonic()
2002 for i in range(10):
2003 _invoke(repo, ["worktree", "remove", f"wt{i}"])
2004 elapsed = time.monotonic() - t0
2005 assert elapsed < 2.0, f"10 removes took {elapsed:.2f}s"
2006
2007 def test_remove_add_remove_cycle(self, tmp_path: pathlib.Path) -> None:
2008 """A worktree can be re-added with the same name after removal."""
2009 repo = _make_repo(tmp_path)
2010 _add_branch(repo, "dev")
2011 for _ in range(5):
2012 r_add = _invoke(repo, ["worktree", "add", "mydev", "dev"])
2013 assert r_add.exit_code == 0
2014 r_rm = _invoke(repo, ["worktree", "remove", "mydev"])
2015 assert r_rm.exit_code == 0
2016
2017
2018 # ===========================================================================
2019 # muse worktree prune — Extended / Security / Stress
2020 # ===========================================================================
2021
2022
2023 class TestWorktreesPruneExtended:
2024 """-j alias, dry-run, text output, schema, lifecycle edge cases."""
2025
2026 def test_prune_j_alias(self, tmp_path: pathlib.Path) -> None:
2027 """-j produces the same JSON as --json."""
2028 repo = _make_repo(tmp_path)
2029 r1 = _invoke(repo, ["worktree", "prune", "--json"])
2030 r2 = _invoke(repo, ["worktree", "prune", "-j"])
2031 assert r1.exit_code == 0 and r2.exit_code == 0
2032 d1 = json.loads(_json_blob(r1.output)); d1.pop("duration_ms", None); d1.pop("timestamp", None)
2033 d2 = json.loads(_json_blob(r2.output)); d2.pop("duration_ms", None); d2.pop("timestamp", None)
2034 assert d1 == d2
2035
2036 def test_prune_empty_repo_exits_0(self, tmp_path: pathlib.Path) -> None:
2037 """Prune on a clean repo exits 0 with empty pruned list."""
2038 repo = _make_repo(tmp_path)
2039 result = _invoke(repo, ["worktree", "prune", "-j"])
2040 assert result.exit_code == 0
2041 parsed = _parse_prune(result.output)
2042 assert parsed["pruned"] == []
2043 assert parsed["count"] == 0
2044
2045 def test_prune_stale_in_json_pruned_list(self, tmp_path: pathlib.Path) -> None:
2046 """Stale worktree name appears in JSON pruned list."""
2047 import shutil
2048 from muse.core.worktree import _worktree_dir
2049 repo = _make_repo(tmp_path)
2050 _add_branch(repo, "dev")
2051 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2052 shutil.rmtree(_worktree_dir(repo, "mydev"))
2053 parsed = _parse_prune(_invoke(repo, ["worktree", "prune", "-j"]).output)
2054 assert "mydev" in parsed["pruned"]
2055 assert parsed["count"] == 1
2056 assert parsed["dry_run"] is False
2057
2058 def test_prune_removes_metadata(self, tmp_path: pathlib.Path) -> None:
2059 """Prune deletes the metadata file for a stale worktree."""
2060 import shutil
2061 from muse.core.worktree import _worktree_dir
2062 repo = _make_repo(tmp_path)
2063 _add_branch(repo, "dev")
2064 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2065 shutil.rmtree(_worktree_dir(repo, "mydev"))
2066 meta = worktrees_dir(repo) / "mydev.json"
2067 assert meta.exists()
2068 _invoke(repo, ["worktree", "prune"])
2069 assert not meta.exists()
2070
2071 def test_prune_dry_run_preserves_metadata(self, tmp_path: pathlib.Path) -> None:
2072 """--dry-run does not delete any metadata."""
2073 import shutil
2074 from muse.core.worktree import _worktree_dir
2075 repo = _make_repo(tmp_path)
2076 _add_branch(repo, "dev")
2077 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2078 shutil.rmtree(_worktree_dir(repo, "mydev"))
2079 meta = worktrees_dir(repo) / "mydev.json"
2080 _invoke(repo, ["worktree", "prune", "--dry-run"])
2081 assert meta.exists()
2082
2083 def test_prune_dry_run_json_dry_run_true(self, tmp_path: pathlib.Path) -> None:
2084 """JSON dry_run field is true when --dry-run is passed."""
2085 import shutil
2086 from muse.core.worktree import _worktree_dir
2087 repo = _make_repo(tmp_path)
2088 _add_branch(repo, "dev")
2089 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2090 shutil.rmtree(_worktree_dir(repo, "mydev"))
2091 parsed = _parse_prune(_invoke(repo, ["worktree", "prune", "--dry-run", "-j"]).output)
2092 assert parsed["dry_run"] is True
2093 assert "mydev" in parsed["pruned"]
2094
2095 def test_prune_active_worktree_not_pruned(self, tmp_path: pathlib.Path) -> None:
2096 """Active (present) worktrees are not pruned."""
2097 repo = _make_repo(tmp_path)
2098 _add_branch(repo, "dev")
2099 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2100 parsed = _parse_prune(_invoke(repo, ["worktree", "prune", "-j"]).output)
2101 assert "mydev" not in parsed["pruned"]
2102 assert parsed["count"] == 0
2103
2104 def test_prune_default_is_text(self, tmp_path: pathlib.Path) -> None:
2105 """Without --json the output is human-readable text."""
2106 repo = _make_repo(tmp_path)
2107 result = _invoke(repo, ["worktree", "prune"])
2108 assert result.exit_code == 0
2109 assert not result.output.strip().startswith("{")
2110
2111 def test_prune_text_nothing_to_prune(self, tmp_path: pathlib.Path) -> None:
2112 """Text output says 'Nothing to prune' when clean."""
2113 repo = _make_repo(tmp_path)
2114 result = _invoke(repo, ["worktree", "prune"])
2115 assert "Nothing to prune" in result.output
2116
2117 def test_prune_text_shows_stale_names(self, tmp_path: pathlib.Path) -> None:
2118 """Text output includes names of pruned worktrees."""
2119 import shutil
2120 from muse.core.worktree import _worktree_dir
2121 repo = _make_repo(tmp_path)
2122 _add_branch(repo, "dev")
2123 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2124 shutil.rmtree(_worktree_dir(repo, "mydev"))
2125 result = _invoke(repo, ["worktree", "prune"])
2126 assert "mydev" in result.output
2127
2128 def test_prune_count_matches_pruned_list(self, tmp_path: pathlib.Path) -> None:
2129 """count field always equals len(pruned)."""
2130 import shutil
2131 from muse.core.worktree import _worktree_dir
2132 repo = _make_repo(tmp_path)
2133 for i in range(4):
2134 _add_branch(repo, f"br{i}")
2135 _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"])
2136 shutil.rmtree(_worktree_dir(repo, f"wt{i}"))
2137 parsed = _parse_prune(_invoke(repo, ["worktree", "prune", "-j"]).output)
2138 assert parsed["count"] == len(parsed["pruned"]) == 4
2139
2140 def test_prune_outside_repo_exits_2(self, tmp_path: pathlib.Path) -> None:
2141 """Running outside a repo exits with code 2."""
2142 result = runner.invoke(None, ["worktree", "prune"], env={"MUSE_REPO_ROOT": str(tmp_path)})
2143 assert result.exit_code == 2
2144
2145 def test_prune_help_has_description(self, tmp_path: pathlib.Path) -> None:
2146 """--help includes the rich description."""
2147 result = _invoke(tmp_path, ["worktree", "prune", "--help"])
2148 assert result.exit_code == 0
2149 assert "Agent quickstart" in result.output or "JSON output schema" in result.output
2150
2151 def test_prune_leaves_branch_refs_intact(self, tmp_path: pathlib.Path) -> None:
2152 """Branch refs are never deleted by prune."""
2153 import shutil
2154 from muse.core.worktree import _worktree_dir
2155 repo = _make_repo(tmp_path)
2156 _add_branch(repo, "dev")
2157 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2158 shutil.rmtree(_worktree_dir(repo, "mydev"))
2159 _invoke(repo, ["worktree", "prune"])
2160 assert (ref_path(repo, "dev")).exists()
2161
2162 def test_prune_after_prune_nothing_left(self, tmp_path: pathlib.Path) -> None:
2163 """Running prune twice: second call reports nothing to prune."""
2164 import shutil
2165 from muse.core.worktree import _worktree_dir
2166 repo = _make_repo(tmp_path)
2167 _add_branch(repo, "dev")
2168 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2169 shutil.rmtree(_worktree_dir(repo, "mydev"))
2170 _invoke(repo, ["worktree", "prune"])
2171 parsed = _parse_prune(_invoke(repo, ["worktree", "prune", "-j"]).output)
2172 assert parsed["count"] == 0
2173
2174 def test_prune_json_is_array_for_pruned(self, tmp_path: pathlib.Path) -> None:
2175 """pruned field is always a JSON array."""
2176 repo = _make_repo(tmp_path)
2177 raw = json.loads(_json_blob(_invoke(repo, ["worktree", "prune", "-j"]).output))
2178 assert isinstance(raw["pruned"], list)
2179
2180 def test_prune_json_all_fields(self, tmp_path: pathlib.Path) -> None:
2181 """JSON output contains pruned, count, dry_run, exit_code, duration_ms."""
2182 repo = _make_repo(tmp_path)
2183 raw = json.loads(_json_blob(_invoke(repo, ["worktree", "prune", "-j"]).output))
2184 assert {"pruned", "count", "dry_run", "exit_code", "duration_ms"}.issubset(raw.keys())
2185
2186
2187 class TestWorktreePruneSecurity:
2188 """ANSI sanitization, corrupt metadata handling."""
2189
2190 def test_prune_json_ansi_in_name_sanitized(self, tmp_path: pathlib.Path) -> None:
2191 """ANSI codes in worktree names are stripped from JSON pruned list."""
2192 import json as _json
2193 repo = _make_repo(tmp_path)
2194 # Write a meta file whose stem contains no ANSI (filesystem prevents it),
2195 # but whose stored branch field has ANSI — prune goes by filename stem
2196 # for the name. Verify the output contains no escape sequences.
2197 meta_dir = worktrees_dir(repo)
2198 meta_dir.mkdir(parents=True, exist_ok=True)
2199 (meta_dir / "mywt.json").write_text(_json.dumps({"name": "mywt", "branch": "dev", "path": str(repo / "mywt")}))
2200 # mywt dir does not exist → stale → will be pruned
2201 result = _invoke(repo, ["worktree", "prune", "-j"])
2202 assert result.exit_code == 0
2203 assert "\x1b" not in result.output
2204
2205 def test_prune_text_ansi_in_name_sanitized(self, tmp_path: pathlib.Path) -> None:
2206 """ANSI codes in worktree names are stripped from text output."""
2207 import json as _json
2208 repo = _make_repo(tmp_path)
2209 meta_dir = worktrees_dir(repo)
2210 meta_dir.mkdir(parents=True, exist_ok=True)
2211 (meta_dir / "mywt.json").write_text(_json.dumps({"name": "mywt", "branch": "dev", "path": str(repo / "mywt")}))
2212 result = _invoke(repo, ["worktree", "prune"])
2213 assert "\x1b" not in result.output
2214
2215 def test_prune_corrupt_meta_pruned_safely(self, tmp_path: pathlib.Path) -> None:
2216 """A corrupt (unparseable) metadata file is treated as stale and pruned."""
2217 repo = _make_repo(tmp_path)
2218 meta_dir = worktrees_dir(repo)
2219 meta_dir.mkdir(parents=True, exist_ok=True)
2220 (meta_dir / "corrupt.json").write_text("NOT VALID JSON {{{")
2221 result = _invoke(repo, ["worktree", "prune", "-j"])
2222 assert result.exit_code == 0
2223 parsed = _parse_prune(result.output)
2224 assert "corrupt" in parsed["pruned"]
2225
2226 def test_prune_json_is_valid_json(self, tmp_path: pathlib.Path) -> None:
2227 """Output is well-formed JSON."""
2228 repo = _make_repo(tmp_path)
2229 result = _invoke(repo, ["worktree", "prune", "-j"])
2230 parsed = json.loads(_json_blob(result.output))
2231 assert isinstance(parsed["pruned"], list)
2232 assert isinstance(parsed["count"], int)
2233 assert isinstance(parsed["dry_run"], bool)
2234
2235 def test_prune_dry_run_no_side_effects(self, tmp_path: pathlib.Path) -> None:
2236 """--dry-run produces no filesystem side effects at all."""
2237 import shutil
2238 from muse.core.worktree import _worktree_dir
2239 repo = _make_repo(tmp_path)
2240 for i in range(3):
2241 _add_branch(repo, f"br{i}")
2242 _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"])
2243 shutil.rmtree(_worktree_dir(repo, f"wt{i}"))
2244 metas_before = sorted(p.name for p in (worktrees_dir(repo)).glob("*.json"))
2245 _invoke(repo, ["worktree", "prune", "--dry-run"])
2246 metas_after = sorted(p.name for p in (worktrees_dir(repo)).glob("*.json"))
2247 assert metas_before == metas_after
2248
2249
2250 class TestWorktreePruneStress:
2251 """Performance and scale tests for worktree prune."""
2252
2253 def test_prune_20_stale(self, tmp_path: pathlib.Path) -> None:
2254 """Pruning 20 stale entries reports count=20."""
2255 import shutil
2256 from muse.core.worktree import _worktree_dir
2257 repo = _make_repo(tmp_path)
2258 for i in range(20):
2259 _add_branch(repo, f"br{i}")
2260 _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"])
2261 shutil.rmtree(_worktree_dir(repo, f"wt{i}"))
2262 parsed = _parse_prune(_invoke(repo, ["worktree", "prune", "-j"]).output)
2263 assert parsed["count"] == 20
2264 assert len(parsed["pruned"]) == 20
2265
2266 def test_prune_performance(self, tmp_path: pathlib.Path) -> None:
2267 """Pruning 15 stale worktrees completes within 2 seconds."""
2268 import shutil
2269 import time
2270 from muse.core.worktree import _worktree_dir
2271 repo = _make_repo(tmp_path)
2272 for i in range(15):
2273 _add_branch(repo, f"br{i}")
2274 _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"])
2275 shutil.rmtree(_worktree_dir(repo, f"wt{i}"))
2276 t0 = time.monotonic()
2277 result = _invoke(repo, ["worktree", "prune", "-j"])
2278 elapsed = time.monotonic() - t0
2279 assert result.exit_code == 0
2280 assert elapsed < 2.0, f"prune took {elapsed:.2f}s"
2281
2282 def test_prune_mixed_active_and_stale(self, tmp_path: pathlib.Path) -> None:
2283 """Only stale worktrees are pruned; active ones are preserved."""
2284 import shutil
2285 from muse.core.worktree import _worktree_dir
2286 repo = _make_repo(tmp_path)
2287 for i in range(10):
2288 _add_branch(repo, f"br{i}")
2289 _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"])
2290 # Remove every other worktree dir to make it stale.
2291 stale = [f"wt{i}" for i in range(0, 10, 2)]
2292 active = [f"wt{i}" for i in range(1, 10, 2)]
2293 for name in stale:
2294 shutil.rmtree(_worktree_dir(repo, name))
2295 parsed = _parse_prune(_invoke(repo, ["worktree", "prune", "-j"]).output)
2296 assert parsed["count"] == 5
2297 for name in stale:
2298 assert name in parsed["pruned"]
2299 for name in active:
2300 assert name not in parsed["pruned"]
2301 # Active worktrees still in list.
2302 entries = _parse_list(_invoke(repo, ["worktree", "list", "-j"]).output)
2303 listed_names = {e["name"] for e in entries}
2304 for name in active:
2305 assert name in listed_names
2306
2307
2308 # ===========================================================================
2309 # muse worktree repair — Extended / Security / Stress
2310 # ===========================================================================
2311
2312
2313 class TestWorktreeRepairExtended:
2314 """-j alias, text output, pointer writes, idempotency, edge cases."""
2315
2316 def test_repair_j_alias(self, tmp_path: pathlib.Path) -> None:
2317 """-j produces the same JSON as --json."""
2318 repo = _make_repo(tmp_path)
2319 r1 = _invoke(repo, ["worktree", "repair", "--json"])
2320 r2 = _invoke(repo, ["worktree", "repair", "-j"])
2321 assert r1.exit_code == 0 and r2.exit_code == 0
2322 d1 = json.loads(_json_blob(r1.output)); d1.pop("duration_ms", None); d1.pop("timestamp", None)
2323 d2 = json.loads(_json_blob(r2.output)); d2.pop("duration_ms", None); d2.pop("timestamp", None)
2324 assert d1 == d2
2325
2326 def test_repair_empty_repo_exits_0(self, tmp_path: pathlib.Path) -> None:
2327 """Repair on a repo with no linked worktrees exits 0."""
2328 repo = _make_repo(tmp_path)
2329 result = _invoke(repo, ["worktree", "repair", "-j"])
2330 assert result.exit_code == 0
2331
2332 def test_repair_json_repaired_is_list(self, tmp_path: pathlib.Path) -> None:
2333 """JSON repaired field is always a list."""
2334 repo = _make_repo(tmp_path)
2335 raw = json.loads(_json_blob(_invoke(repo, ["worktree", "repair", "-j"]).output))
2336 assert isinstance(raw["repaired"], list)
2337
2338 def test_repair_json_only_repaired_field(self, tmp_path: pathlib.Path) -> None:
2339 """JSON output contains repaired, exit_code, duration_ms."""
2340 repo = _make_repo(tmp_path)
2341 raw = json.loads(_json_blob(_invoke(repo, ["worktree", "repair", "-j"]).output))
2342 assert {"repaired", "exit_code", "duration_ms"}.issubset(raw.keys())
2343
2344 def test_repair_writes_pointer_file(self, tmp_path: pathlib.Path) -> None:
2345 """Repair writes the .muse pointer file in the worktree directory."""
2346 repo = _make_repo(tmp_path)
2347 _add_branch(repo, "dev")
2348 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2349 from muse.core.worktree import _worktree_dir
2350 pointer = _worktree_dir(repo, "mydev") / ".muse"
2351 pointer.unlink(missing_ok=True)
2352 assert not pointer.exists()
2353 result = _invoke(repo, ["worktree", "repair"])
2354 assert result.exit_code == 0
2355 assert pointer.exists()
2356
2357 def test_repair_pointer_contains_store_path(self, tmp_path: pathlib.Path) -> None:
2358 """The pointer file content references the main .muse store."""
2359 repo = _make_repo(tmp_path)
2360 _add_branch(repo, "dev")
2361 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2362 from muse.core.worktree import _worktree_dir
2363 pointer = _worktree_dir(repo, "mydev") / ".muse"
2364 pointer.unlink(missing_ok=True)
2365 _invoke(repo, ["worktree", "repair"])
2366 content = pointer.read_text()
2367 assert "musestore:" in content
2368 assert str(muse_dir(repo)) in content or ".muse" in content
2369
2370 def test_repair_idempotent(self, tmp_path: pathlib.Path) -> None:
2371 """Running repair twice does not raise an error."""
2372 repo = _make_repo(tmp_path)
2373 _add_branch(repo, "dev")
2374 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2375 r1 = _invoke(repo, ["worktree", "repair", "-j"])
2376 r2 = _invoke(repo, ["worktree", "repair", "-j"])
2377 assert r1.exit_code == 0
2378 assert r2.exit_code == 0
2379
2380 def test_repair_includes_worktree_name_in_json(self, tmp_path: pathlib.Path) -> None:
2381 """JSON repaired list contains the linked worktree name."""
2382 repo = _make_repo(tmp_path)
2383 _add_branch(repo, "dev")
2384 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2385 from muse.core.worktree import _worktree_dir
2386 (_worktree_dir(repo, "mydev") / ".muse").unlink(missing_ok=True)
2387 raw = json.loads(_json_blob(_invoke(repo, ["worktree", "repair", "-j"]).output))
2388 assert "mydev" in raw["repaired"]
2389
2390 def test_repair_default_is_text(self, tmp_path: pathlib.Path) -> None:
2391 """Without --json the output is human-readable text."""
2392 repo = _make_repo(tmp_path)
2393 result = _invoke(repo, ["worktree", "repair"])
2394 assert result.exit_code == 0
2395 assert not result.output.strip().startswith("{")
2396
2397 def test_repair_text_no_worktrees_message(self, tmp_path: pathlib.Path) -> None:
2398 """Text output says 'All worktrees already have pointer files' when clean."""
2399 repo = _make_repo(tmp_path)
2400 result = _invoke(repo, ["worktree", "repair"])
2401 assert "already have pointer files" in result.output
2402
2403 def test_repair_text_shows_repaired_name(self, tmp_path: pathlib.Path) -> None:
2404 """Text output shows the name of each repaired worktree."""
2405 repo = _make_repo(tmp_path)
2406 _add_branch(repo, "dev")
2407 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2408 from muse.core.worktree import _worktree_dir
2409 (_worktree_dir(repo, "mydev") / ".muse").unlink(missing_ok=True)
2410 result = _invoke(repo, ["worktree", "repair"])
2411 assert "mydev" in result.output
2412
2413 def test_repair_outside_repo_exits_2(self, tmp_path: pathlib.Path) -> None:
2414 """Running outside a repo exits with code 2."""
2415 result = runner.invoke(None, ["worktree", "repair"], env={"MUSE_REPO_ROOT": str(tmp_path)})
2416 assert result.exit_code == 2
2417
2418 def test_repair_help_has_description(self, tmp_path: pathlib.Path) -> None:
2419 """--help includes the rich description."""
2420 result = _invoke(tmp_path, ["worktree", "repair", "--help"])
2421 assert result.exit_code == 0
2422 assert "Agent quickstart" in result.output or "JSON output schema" in result.output
2423
2424 def test_repair_skips_absent_worktree_dirs(self, tmp_path: pathlib.Path) -> None:
2425 """Worktrees whose directories are absent are skipped (not in repaired list)."""
2426 import shutil
2427 from muse.core.worktree import _worktree_dir
2428 repo = _make_repo(tmp_path)
2429 _add_branch(repo, "dev")
2430 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2431 shutil.rmtree(_worktree_dir(repo, "mydev"))
2432 raw = json.loads(_json_blob(_invoke(repo, ["worktree", "repair", "-j"]).output))
2433 assert "mydev" not in raw["repaired"]
2434
2435 def test_repair_multiple_worktrees(self, tmp_path: pathlib.Path) -> None:
2436 """Repair fixes pointer files for all present linked worktrees."""
2437 repo = _make_repo(tmp_path)
2438 for name in ("a", "b", "c"):
2439 _add_branch(repo, name)
2440 _invoke(repo, ["worktree", "add", name, name])
2441 from muse.core.worktree import _worktree_dir
2442 for name in ("a", "b", "c"):
2443 (_worktree_dir(repo, name) / ".muse").unlink(missing_ok=True)
2444 raw = json.loads(_json_blob(_invoke(repo, ["worktree", "repair", "-j"]).output))
2445 for name in ("a", "b", "c"):
2446 assert name in raw["repaired"]
2447 assert len(raw["repaired"]) >= 3
2448
2449 def test_repair_is_valid_json(self, tmp_path: pathlib.Path) -> None:
2450 """JSON output is well-formed."""
2451 repo = _make_repo(tmp_path)
2452 result = _invoke(repo, ["worktree", "repair", "-j"])
2453 parsed = json.loads(_json_blob(result.output))
2454 assert isinstance(parsed, dict)
2455 assert isinstance(parsed["repaired"], list)
2456
2457
2458 class TestWorktreeRepairSecurity:
2459 """ANSI sanitization and output integrity."""
2460
2461 def test_repair_json_no_ansi_in_output(self, tmp_path: pathlib.Path) -> None:
2462 """JSON output contains no ANSI escape sequences."""
2463 repo = _make_repo(tmp_path)
2464 _add_branch(repo, "dev")
2465 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2466 result = _invoke(repo, ["worktree", "repair", "-j"])
2467 assert "\x1b" not in result.output
2468
2469 def test_repair_text_no_ansi_in_output(self, tmp_path: pathlib.Path) -> None:
2470 """Text output contains no ANSI escape sequences."""
2471 repo = _make_repo(tmp_path)
2472 _add_branch(repo, "dev")
2473 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2474 result = _invoke(repo, ["worktree", "repair"])
2475 assert "\x1b" not in result.output
2476
2477 def test_repair_does_not_write_outside_worktree_dirs(self, tmp_path: pathlib.Path) -> None:
2478 """Repair only writes inside registered worktree directories."""
2479 repo = _make_repo(tmp_path)
2480 _add_branch(repo, "dev")
2481 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2482 from muse.core.worktree import _worktree_dir
2483 wt_dir = _worktree_dir(repo, "mydev")
2484 _invoke(repo, ["worktree", "repair"])
2485 # The only .muse file written should be inside the worktree directory.
2486 pointer = muse_dir(wt_dir)
2487 assert pointer.exists()
2488 # The main repo .muse dir must remain a directory (not overwritten).
2489 assert (muse_dir(repo)).is_dir()
2490
2491 def test_repair_pointer_not_symlink(self, tmp_path: pathlib.Path) -> None:
2492 """The written pointer file is a regular file, not a symlink."""
2493 repo = _make_repo(tmp_path)
2494 _add_branch(repo, "dev")
2495 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2496 from muse.core.worktree import _worktree_dir
2497 pointer = _worktree_dir(repo, "mydev") / ".muse"
2498 pointer.unlink(missing_ok=True)
2499 _invoke(repo, ["worktree", "repair"])
2500 assert pointer.exists()
2501 assert not pointer.is_symlink()
2502
2503 def test_repair_json_repaired_names_are_strings(self, tmp_path: pathlib.Path) -> None:
2504 """Every element of JSON repaired list is a string."""
2505 repo = _make_repo(tmp_path)
2506 _add_branch(repo, "dev")
2507 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2508 raw = json.loads(_json_blob(_invoke(repo, ["worktree", "repair", "-j"]).output))
2509 for name in raw["repaired"]:
2510 assert isinstance(name, str)
2511
2512
2513 class TestWorktreeRepairStress:
2514 """Performance and scale tests for worktree repair."""
2515
2516 def test_repair_15_worktrees(self, tmp_path: pathlib.Path) -> None:
2517 """Repair correctly fixes pointer files for 15 linked worktrees."""
2518 repo = _make_repo(tmp_path)
2519 for i in range(15):
2520 _add_branch(repo, f"br{i}")
2521 _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"])
2522 from muse.core.worktree import _worktree_dir
2523 for i in range(15):
2524 (_worktree_dir(repo, f"wt{i}") / ".muse").unlink(missing_ok=True)
2525 raw = json.loads(_json_blob(_invoke(repo, ["worktree", "repair", "-j"]).output))
2526 assert len(raw["repaired"]) >= 15
2527
2528 def test_repair_performance(self, tmp_path: pathlib.Path) -> None:
2529 """Repairing 10 worktrees completes within 2 seconds."""
2530 import time
2531 repo = _make_repo(tmp_path)
2532 for i in range(10):
2533 _add_branch(repo, f"br{i}")
2534 _invoke(repo, ["worktree", "add", f"wt{i}", f"br{i}"])
2535 from muse.core.worktree import _worktree_dir
2536 for i in range(10):
2537 (_worktree_dir(repo, f"wt{i}") / ".muse").unlink(missing_ok=True)
2538 t0 = time.monotonic()
2539 result = _invoke(repo, ["worktree", "repair", "-j"])
2540 elapsed = time.monotonic() - t0
2541 assert result.exit_code == 0
2542 assert elapsed < 2.0, f"repair took {elapsed:.2f}s"
2543
2544 def test_repair_idempotent_10_times(self, tmp_path: pathlib.Path) -> None:
2545 """Running repair 10 times in a row never fails."""
2546 repo = _make_repo(tmp_path)
2547 _add_branch(repo, "dev")
2548 _invoke(repo, ["worktree", "add", "mydev", "dev"])
2549 for _ in range(10):
2550 r = _invoke(repo, ["worktree", "repair", "-j"])
2551 assert r.exit_code == 0
File History 1 commit
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 28 days ago