gabriel / muse public
test_cmd_branch.py python
1,070 lines 46.2 KB
Raw
sha256:e237dc0e8122609f5131d11c9dda9bba480395a5a4355cda0c9fa7e634fddd29 fix(branch): guard -d --dry-run against destructive writes;… Sonnet 4.6 patch 1 day ago
1 """Tests for ``muse branch``.
2
3 Coverage tiers
4 --------------
5 Unit — parser flags, dead-code removal, helpers (_resolve_start_point,
6 _list_local_branches, _list_remotes, _upstream_for,
7 _commit_ancestors, _is_merged, _contains_commit,
8 _cleanup_empty_dirs).
9 Integration — create, delete, force-delete, rename, force-rename, copy,
10 force-copy, listing, filtering, sorting.
11 End-to-end — full CLI invocations: text and JSON output, all operations.
12 Security — ANSI injection in branch names, format flags, messages.
13 Stress — 500 branches, concurrent list, deep ancestry chains.
14 """
15
16 from __future__ import annotations
17
18 import json
19 import os
20 import pathlib
21 import subprocess
22 import threading
23 import time
24 from typing import TYPE_CHECKING
25
26 import pytest
27
28 from tests.cli_test_helper import CliRunner, InvokeResult
29 from muse.core.refs import (
30 get_head_commit_id,
31 read_current_branch,
32 )
33 from muse.core.types import short_id
34 from muse.core.paths import heads_dir, logs_dir
35
36 if TYPE_CHECKING:
37 import argparse
38
39 runner = CliRunner()
40
41 # ──────────────────────────────────────────────────────────────────────────────
42 # Helpers
43 # ──────────────────────────────────────────────────────────────────────────────
44
45
46 def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult:
47 saved = os.getcwd()
48 try:
49 os.chdir(repo)
50 return runner.invoke(None, args)
51 finally:
52 os.chdir(saved)
53
54
55 def _branch(repo: pathlib.Path, *extra: str) -> InvokeResult:
56 return _invoke(repo, ["branch", *extra])
57
58
59 def _commit(repo: pathlib.Path, *extra: str) -> InvokeResult:
60 _invoke(repo, ["code", "add", "."])
61 return _invoke(repo, ["commit", *extra])
62
63
64 @pytest.fixture()
65 def repo(tmp_path: pathlib.Path) -> pathlib.Path:
66 """Initialised repo with one commit on ``main``."""
67 saved = os.getcwd()
68 try:
69 os.chdir(tmp_path)
70 runner.invoke(None, ["init"])
71 finally:
72 os.chdir(saved)
73 (tmp_path / "a.py").write_text("x = 1\n")
74 _commit(tmp_path, "-m", "initial")
75 return tmp_path
76
77
78 @pytest.fixture()
79 def two_commit_repo(repo: pathlib.Path) -> pathlib.Path:
80 """Repo with two commits on ``main``."""
81 (repo / "b.py").write_text("y = 2\n")
82 _commit(repo, "-m", "second")
83 return repo
84
85
86 # ──────────────────────────────────────────────────────────────────────────────
87 # Unit — parser flags
88 # ──────────────────────────────────────────────────────────────────────────────
89
90
91 class TestRegisterFlags:
92 def _parse(self, *args: str) -> "argparse.Namespace":
93 import argparse
94
95 from muse.cli.commands.branch import register
96
97 p = argparse.ArgumentParser()
98 sub = p.add_subparsers()
99 register(sub)
100 return p.parse_args(["branch", *args])
101
102 def test_default_json_out_is_false(self) -> None:
103 ns = self._parse()
104 assert ns.json_out is False
105
106 def test_json_flag_sets_json_out(self) -> None:
107 ns = self._parse("--json")
108 assert ns.json_out is True
109
110 def test_j_shorthand_sets_json_out(self) -> None:
111 ns = self._parse("-j")
112 assert ns.json_out is True
113
114 def test_delete_flag(self) -> None:
115 ns = self._parse("-d", "foo")
116 assert ns.op == "delete"
117
118 def test_force_delete_flag(self) -> None:
119 ns = self._parse("-D", "foo")
120 assert ns.op == "force_delete"
121
122 def test_rename_flag(self) -> None:
123 ns = self._parse("-m", "new")
124 assert ns.op == "rename"
125
126 def test_force_rename_flag(self) -> None:
127 ns = self._parse("-M", "new")
128 assert ns.op == "force_rename"
129
130 def test_copy_flag(self) -> None:
131 ns = self._parse("-c", "copy")
132 assert ns.op == "copy"
133
134 def test_force_copy_flag(self) -> None:
135 ns = self._parse("-C", "copy")
136 assert ns.op == "force_copy"
137
138 def test_verbose_default_0(self) -> None:
139 ns = self._parse()
140 assert ns.verbose == 0
141
142 def test_verbose_v_is_1(self) -> None:
143 ns = self._parse("-v")
144 assert ns.verbose == 1
145
146 def test_verbose_vv_is_2(self) -> None:
147 ns = self._parse("-vv")
148 assert ns.verbose == 2
149
150 def test_remotes_flag(self) -> None:
151 ns = self._parse("-r")
152 assert ns.remotes is True
153
154 def test_all_flag(self) -> None:
155 ns = self._parse("-a")
156 assert ns.all_branches is True
157
158 def test_sort_default_name(self) -> None:
159 ns = self._parse()
160 assert ns.sort == "name"
161
162 def test_sort_committeddate(self) -> None:
163 ns = self._parse("--sort", "committeddate")
164 assert ns.sort == "committeddate"
165
166 def test_sort_invalid_rejected(self) -> None:
167 import argparse
168
169 from muse.cli.commands.branch import register
170
171 p = argparse.ArgumentParser()
172 sub = p.add_subparsers()
173 register(sub)
174 with pytest.raises(SystemExit):
175 p.parse_args(["branch", "--sort", "invalid"])
176
177
178 # ──────────────────────────────────────────────────────────────────────────────
179 # Unit — dead-code removal
180 # ──────────────────────────────────────────────────────────────────────────────
181
182
183 class TestDeadCodeRemoved:
184 def test_op_list_branch_removed(self) -> None:
185 import inspect
186
187 import muse.cli.commands.branch as m
188
189 src = inspect.getsource(m.run)
190 assert 'op == "list"' not in src, (
191 'op == "list" was a dead branch (nothing in register() creates it); must be deleted'
192 )
193
194 def test_inline_tomllib_import_removed(self) -> None:
195 import inspect
196
197 import muse.cli.commands.branch as m
198
199 src = inspect.getsource(m._upstream_for)
200 assert "import tomllib" not in src, (
201 "inline 'import tomllib' inside _upstream_for should be a module-level import"
202 )
203
204 def test_double_sanitize_removed(self) -> None:
205 """The verbose listing previously double-sanitized name_str (stripping ANSI)."""
206 import inspect
207
208 import muse.cli.commands.branch as m
209
210 src = inspect.getsource(m.run)
211 assert "sanitize_display(name_str)" not in src, (
212 "name_str was double-sanitized; the second call stripped ANSI from current branch"
213 )
214
215
216 # ──────────────────────────────────────────────────────────────────────────────
217 # Unit — _resolve_start_point
218 # ──────────────────────────────────────────────────────────────────────────────
219
220
221 class TestResolveStartPoint:
222 def test_resolves_branch_name(self, repo: pathlib.Path) -> None:
223 from muse.cli.commands.branch import _resolve_start_point
224
225 cid = get_head_commit_id(repo, "main")
226 result = _resolve_start_point(repo, "main", "main")
227 assert result == cid
228
229 def test_resolves_full_sha(self, repo: pathlib.Path) -> None:
230 from muse.cli.commands.branch import _resolve_start_point
231
232 cid = get_head_commit_id(repo, "main")
233 assert cid is not None
234 result = _resolve_start_point(repo, "main", cid)
235 assert result == cid
236
237 def test_resolves_partial_sha(self, two_commit_repo: pathlib.Path) -> None:
238 from muse.cli.commands.branch import _resolve_start_point
239 from muse.core.refs import read_current_branch
240 from muse.core.commits import get_commits_for_branch
241
242 repo = two_commit_repo
243 branch = read_current_branch(repo)
244 commits = get_commits_for_branch(repo, branch)
245 first_sha = commits[-1].commit_id # oldest commit
246 # 12-char prefix should resolve
247 result = _resolve_start_point(repo, "main", short_id(first_sha))
248 assert result == first_sha
249
250 def test_returns_input_for_unresolvable(self, repo: pathlib.Path) -> None:
251 from muse.cli.commands.branch import _resolve_start_point
252
253 result = _resolve_start_point(repo, "main", "nonexistent-ref")
254 assert result == "nonexistent-ref"
255
256
257 # ──────────────────────────────────────────────────────────────────────────────
258 # Unit — _list_local_branches
259 # ──────────────────────────────────────────────────────────────────────────────
260
261
262 class TestListLocalBranches:
263 def test_returns_sorted_list(self, repo: pathlib.Path) -> None:
264 from muse.cli.commands.branch import _list_local_branches
265
266 _branch(repo, "z-last")
267 _branch(repo, "a-first")
268 branches = _list_local_branches(repo)
269 assert branches == sorted(branches)
270
271 def test_skips_hidden_files(self, repo: pathlib.Path) -> None:
272 from muse.cli.commands.branch import _list_local_branches
273
274 # Plant a hidden lock file inside refs/heads/
275 lock = heads_dir(repo) / ".lock"
276 lock.write_text("locked")
277 branches = _list_local_branches(repo)
278 assert ".lock" not in branches
279 assert not any(b.startswith(".") for b in branches)
280
281 def test_empty_repo_returns_empty(self, tmp_path: pathlib.Path) -> None:
282 from muse.cli.commands.branch import _list_local_branches
283
284 assert _list_local_branches(tmp_path) == []
285
286 def test_includes_nested_branches(self, repo: pathlib.Path) -> None:
287 from muse.cli.commands.branch import _list_local_branches
288
289 _branch(repo, "feat/sub/task")
290 branches = _list_local_branches(repo)
291 assert "feat/sub/task" in branches
292
293
294 # ──────────────────────────────────────────────────────────────────────────────
295 # Unit — _commit_ancestors, _is_merged, _contains_commit
296 # ──────────────────────────────────────────────────────────────────────────────
297
298
299 class TestCommitGraph:
300 def test_commit_ancestors_includes_self(self, repo: pathlib.Path) -> None:
301 from muse.cli.commands.branch import _commit_ancestors
302
303 cid = get_head_commit_id(repo, "main")
304 assert cid is not None
305 ancestors = _commit_ancestors(repo, cid)
306 assert cid in ancestors
307
308 def test_is_merged_true_for_same_branch(self, repo: pathlib.Path) -> None:
309 from muse.cli.commands.branch import _is_merged
310
311 assert _is_merged(repo, "main", "main")
312
313 def test_is_merged_false_for_unmerged(self, repo: pathlib.Path) -> None:
314 from muse.cli.commands.branch import _is_merged
315
316 _branch(repo, "feat")
317 _invoke(repo, ["checkout", "feat"])
318 (repo / "c.py").write_text("c=1\n")
319 _commit(repo, "-m", "feat commit")
320 _invoke(repo, ["checkout", "main"])
321 assert not _is_merged(repo, "feat", "main")
322
323 def test_contains_commit_true(self, repo: pathlib.Path) -> None:
324 from muse.cli.commands.branch import _contains_commit
325
326 cid = get_head_commit_id(repo, "main")
327 assert cid is not None
328 assert _contains_commit(repo, "main", cid)
329
330 def test_contains_commit_false_for_unknown(self, repo: pathlib.Path) -> None:
331 from muse.cli.commands.branch import _contains_commit
332
333 assert not _contains_commit(repo, "main", "a" * 64)
334
335
336 # ──────────────────────────────────────────────────────────────────────────────
337 # Integration — CREATE
338 # ──────────────────────────────────────────────────────────────────────────────
339
340
341 class TestCreate:
342 def test_create_basic_exits_0(self, repo: pathlib.Path) -> None:
343 result = _branch(repo, "new-branch")
344 assert result.exit_code == 0
345
346 def test_create_text_output(self, repo: pathlib.Path) -> None:
347 result = _branch(repo, "my-branch")
348 assert "my-branch" in result.output
349
350 def test_create_json_schema(self, repo: pathlib.Path) -> None:
351 result = _branch(repo, "json-branch", "--json")
352 data = json.loads(result.output)
353 assert data["action"] == "created"
354 assert data["branch"] == "json-branch"
355 assert "commit_id" in data
356 assert "from" in data
357
358 def test_create_json_from_is_none_at_head(self, repo: pathlib.Path) -> None:
359 result = _branch(repo, "from-head", "--json")
360 data = json.loads(result.output)
361 assert data["from"] is None
362
363 def test_create_at_full_sha(self, two_commit_repo: pathlib.Path) -> None:
364 repo = two_commit_repo
365 from muse.core.refs import read_current_branch
366 from muse.core.commits import get_commits_for_branch
367
368 branch = read_current_branch(repo)
369 commits = get_commits_for_branch(repo, branch)
370 first_sha = commits[-1].commit_id
371
372 result = _branch(repo, "at-sha", first_sha)
373 assert result.exit_code == 0
374 tip = get_head_commit_id(repo, "at-sha")
375 assert tip == first_sha
376
377 def test_create_at_partial_sha(self, two_commit_repo: pathlib.Path) -> None:
378 repo = two_commit_repo
379 from muse.core.refs import read_current_branch
380 from muse.core.commits import get_commits_for_branch
381
382 branch = read_current_branch(repo)
383 commits = get_commits_for_branch(repo, branch)
384 first_sha = commits[-1].commit_id
385
386 result = _branch(repo, "at-partial", short_id(first_sha))
387 assert result.exit_code == 0
388 tip = get_head_commit_id(repo, "at-partial")
389 assert tip == first_sha
390
391 def test_create_at_branch_name(self, repo: pathlib.Path) -> None:
392 head_cid = get_head_commit_id(repo, "main")
393 result = _branch(repo, "copy-of-main", "main")
394 assert result.exit_code == 0
395 tip = get_head_commit_id(repo, "copy-of-main")
396 assert tip == head_cid
397
398 def test_create_json_from_field_populated(self, repo: pathlib.Path) -> None:
399 result = _branch(repo, "with-from", "main", "--json")
400 data = json.loads(result.output)
401 assert data["from"] == "main"
402
403 def test_create_duplicate_exits_1(self, repo: pathlib.Path) -> None:
404 _branch(repo, "dup")
405 result = _branch(repo, "dup")
406 assert result.exit_code == 1
407
408 def test_create_invalid_name_exits_1(self, repo: pathlib.Path) -> None:
409 result = _branch(repo, "bad..name")
410 assert result.exit_code == 1
411
412 def test_create_does_not_checkout(self, repo: pathlib.Path) -> None:
413 _branch(repo, "new-but-no-switch")
414 assert read_current_branch(repo) == "main"
415
416
417 # ──────────────────────────────────────────────────────────────────────────────
418 # Integration — DELETE
419 # ──────────────────────────────────────────────────────────────────────────────
420
421
422 class TestDelete:
423 def test_delete_merged_branch_exits_0(self, repo: pathlib.Path) -> None:
424 _branch(repo, "to-delete")
425 # Branch points to same commit as main → considered merged
426 result = _branch(repo, "-d", "to-delete")
427 assert result.exit_code == 0
428
429 def test_delete_json_schema(self, repo: pathlib.Path) -> None:
430 _branch(repo, "del-json")
431 result = _branch(repo, "-d", "del-json", "--json")
432 data = json.loads(result.output)
433 assert data["action"] == "deleted"
434 assert data["branch"] == "del-json"
435 assert "was" in data
436
437 def test_delete_unmerged_exits_1_without_force(self, repo: pathlib.Path) -> None:
438 _branch(repo, "unmerged")
439 _invoke(repo, ["checkout", "unmerged"])
440 (repo / "z.py").write_text("z=1\n")
441 _commit(repo, "-m", "unmerged work")
442 _invoke(repo, ["checkout", "main"])
443 result = _branch(repo, "-d", "unmerged")
444 assert result.exit_code == 1
445
446 def test_force_delete_unmerged_exits_0(self, repo: pathlib.Path) -> None:
447 _branch(repo, "force-del")
448 _invoke(repo, ["checkout", "force-del"])
449 (repo / "x.py").write_text("x=1\n")
450 _commit(repo, "-m", "exclusive work")
451 _invoke(repo, ["checkout", "main"])
452 result = _branch(repo, "-D", "force-del")
453 assert result.exit_code == 0
454
455 def test_delete_current_branch_exits_1(self, repo: pathlib.Path) -> None:
456 result = _branch(repo, "-d", "main")
457 assert result.exit_code == 1
458
459 def test_delete_nonexistent_exits_1(self, repo: pathlib.Path) -> None:
460 result = _branch(repo, "-d", "ghost")
461 assert result.exit_code == 1
462
463 def test_delete_removes_branch_from_list(self, repo: pathlib.Path) -> None:
464 _branch(repo, "temp")
465 _branch(repo, "-d", "temp")
466 result = _branch(repo, "--json")
467 names = [b["name"] for b in json.loads(result.output)]
468 assert "temp" not in names
469
470 def test_delete_nested_branch_cleans_empty_dirs(self, repo: pathlib.Path) -> None:
471 _branch(repo, "feat/sub/task")
472 _branch(repo, "-D", "feat/sub/task")
473 # The feat/ and feat/sub/ dirs should be gone
474 feat_dir = heads_dir(repo) / "feat"
475 assert not feat_dir.exists()
476
477 def test_delete_removes_reflog_file(self, repo: pathlib.Path) -> None:
478 """Deleting a branch removes its reflog file — git-idiomatic behaviour."""
479 _invoke(repo, ["checkout", "-b", "bye"]) # checkout writes the reflog
480 _invoke(repo, ["checkout", "main"])
481 reflog = logs_dir(repo) / "refs" / "heads" / "bye"
482 assert reflog.exists(), "reflog should exist after checkout -b"
483 _branch(repo, "-d", "bye")
484 assert not reflog.exists(), "reflog must be deleted when branch is deleted"
485
486 def test_delete_nested_branch_removes_reflog_and_empty_dirs(
487 self, repo: pathlib.Path
488 ) -> None:
489 """Nested branch deletion removes reflog file and its empty parent dirs."""
490 _invoke(repo, ["checkout", "-b", "feat/ui/button"])
491 _invoke(repo, ["checkout", "main"])
492 reflog = logs_dir(repo) / "refs" / "heads" / "feat" / "ui" / "button"
493 assert reflog.exists(), "reflog should exist after checkout -b"
494 _branch(repo, "-D", "feat/ui/button")
495 assert not reflog.exists()
496 log_feat_dir = logs_dir(repo) / "refs" / "heads" / "feat"
497 assert not log_feat_dir.exists(), "empty reflog parent dirs must be cleaned up"
498
499 def test_force_delete_also_removes_reflog(self, repo: pathlib.Path) -> None:
500 """-D (force delete) removes the reflog just like -d."""
501 _invoke(repo, ["checkout", "-b", "force-log"])
502 (repo / "tmp.py").write_text("x=1\n")
503 _commit(repo, "-m", "unmerged")
504 _invoke(repo, ["checkout", "main"])
505 reflog = logs_dir(repo) / "refs" / "heads" / "force-log"
506 assert reflog.exists()
507 _branch(repo, "-D", "force-log")
508 assert not reflog.exists()
509
510 def test_delete_dry_run_does_not_delete_branch(self, repo: pathlib.Path) -> None:
511 """-d --dry-run prints what would be deleted but makes no changes."""
512 _branch(repo, "stay")
513 result = _branch(repo, "-d", "stay", "--dry-run")
514 assert result.exit_code == 0
515 # Branch must still exist
516 branches = [b["name"] for b in json.loads(_branch(repo, "--json").output)]
517 assert "stay" in branches
518
519 def test_delete_dry_run_json_schema(self, repo: pathlib.Path) -> None:
520 """-d --dry-run --json includes dry_run: true in envelope."""
521 _branch(repo, "dry-json")
522 result = _branch(repo, "-d", "dry-json", "--dry-run", "--json")
523 assert result.exit_code == 0
524 data = json.loads(result.output)
525 assert data["action"] == "deleted"
526 assert data["branch"] == "dry-json"
527 assert data["dry_run"] is True
528 # Branch still present
529 branches = [b["name"] for b in json.loads(_branch(repo, "--json").output)]
530 assert "dry-json" in branches
531
532 def test_delete_dry_run_output_has_prefix(self, repo: pathlib.Path) -> None:
533 """-d --dry-run text output starts with [dry-run]."""
534 _branch(repo, "prefixed")
535 result = _branch(repo, "-d", "prefixed", "--dry-run")
536 assert result.exit_code == 0
537 assert "[dry-run]" in result.output
538
539 def test_force_delete_dry_run_does_not_delete(self, repo: pathlib.Path) -> None:
540 """-D --dry-run on an unmerged branch makes no changes."""
541 _invoke(repo, ["checkout", "-b", "unmerged-dry"])
542 (repo / "q.py").write_text("q=1\n")
543 _commit(repo, "-m", "unmerged work")
544 _invoke(repo, ["checkout", "main"])
545 result = _branch(repo, "-D", "unmerged-dry", "--dry-run")
546 assert result.exit_code == 0
547 branches = [b["name"] for b in json.loads(_branch(repo, "--json").output)]
548 assert "unmerged-dry" in branches
549
550 def test_delete_dry_run_does_not_remove_reflog(self, repo: pathlib.Path) -> None:
551 """-d --dry-run leaves the reflog file intact."""
552 _invoke(repo, ["checkout", "-b", "keep-log"])
553 _invoke(repo, ["checkout", "main"])
554 reflog = logs_dir(repo) / "refs" / "heads" / "keep-log"
555 assert reflog.exists()
556 _branch(repo, "-d", "keep-log", "--dry-run")
557 assert reflog.exists(), "dry-run must not remove the reflog"
558
559
560 # ──────────────────────────────────────────────────────────────────────────────
561 # Integration — CREATE REFLOG
562 # ──────────────────────────────────────────────────────────────────────────────
563
564
565 class TestCreateReflog:
566 def test_branch_create_writes_reflog(self, repo: pathlib.Path) -> None:
567 """muse branch -b writes a reflog entry — git-idiomatic behaviour."""
568 _branch(repo, "feat/new")
569 reflog = logs_dir(repo) / "refs" / "heads" / "feat" / "new"
570 assert reflog.exists(), "reflog must exist after muse branch -b"
571
572 def test_branch_create_reflog_contains_branch_created(self, repo: pathlib.Path) -> None:
573 """Reflog entry records a 'branch: Created' operation."""
574 _branch(repo, "task/thing")
575 reflog = logs_dir(repo) / "refs" / "heads" / "task" / "thing"
576 content = reflog.read_text(encoding="utf-8")
577 assert "branch: Created" in content
578
579 def test_branch_create_reflog_records_start_point(self, repo: pathlib.Path) -> None:
580 """Reflog entry for a branch created from another branch names that source."""
581 _branch(repo, "task/from-main", "main")
582 reflog = logs_dir(repo) / "refs" / "heads" / "task" / "from-main"
583 content = reflog.read_text(encoding="utf-8")
584 assert "main" in content
585
586
587 # ──────────────────────────────────────────────────────────────────────────────
588 # Integration — RENAME
589 # ──────────────────────────────────────────────────────────────────────────────
590
591
592 class TestRename:
593 def test_rename_basic(self, repo: pathlib.Path) -> None:
594 _branch(repo, "old-name")
595 result = _branch(repo, "-m", "old-name", "new-name")
596 assert result.exit_code == 0
597 names = [b["name"] for b in json.loads(_branch(repo, "--json").output)]
598 assert "new-name" in names
599 assert "old-name" not in names
600
601 def test_rename_omit_old_uses_current(self, repo: pathlib.Path) -> None:
602 _branch(repo, "temp")
603 _invoke(repo, ["checkout", "temp"])
604 result = _branch(repo, "-m", "renamed")
605 assert result.exit_code == 0
606 assert read_current_branch(repo) == "renamed"
607 _invoke(repo, ["checkout", "main"])
608
609 def test_rename_json_schema(self, repo: pathlib.Path) -> None:
610 _branch(repo, "src")
611 result = _branch(repo, "-m", "src", "dst", "--json")
612 data = json.loads(result.output)
613 assert data["action"] == "renamed"
614 assert data["from"] == "src"
615 assert data["to"] == "dst"
616
617 def test_rename_to_existing_exits_1(self, repo: pathlib.Path) -> None:
618 _branch(repo, "a")
619 _branch(repo, "b")
620 result = _branch(repo, "-m", "a", "b")
621 assert result.exit_code == 1
622
623 def test_force_rename_to_existing_exits_0(self, repo: pathlib.Path) -> None:
624 _branch(repo, "a")
625 _branch(repo, "b")
626 result = _branch(repo, "-M", "a", "b")
627 assert result.exit_code == 0
628
629 def test_rename_updates_head_when_current(self, repo: pathlib.Path) -> None:
630 _branch(repo, "temp2")
631 _invoke(repo, ["checkout", "temp2"])
632 _branch(repo, "-m", "temp2", "newname")
633 assert read_current_branch(repo) == "newname"
634 _invoke(repo, ["checkout", "main"])
635
636 def test_rename_nonexistent_exits_1(self, repo: pathlib.Path) -> None:
637 result = _branch(repo, "-m", "ghost", "newname")
638 assert result.exit_code == 1
639
640
641 # ──────────────────────────────────────────────────────────────────────────────
642 # Integration — COPY
643 # ──────────────────────────────────────────────────────────────────────────────
644
645
646 class TestCopy:
647 def test_copy_basic(self, repo: pathlib.Path) -> None:
648 _branch(repo, "orig")
649 result = _branch(repo, "-c", "orig", "clone")
650 assert result.exit_code == 0
651 names = [b["name"] for b in json.loads(_branch(repo, "--json").output)]
652 assert "orig" in names
653 assert "clone" in names
654
655 def test_copy_same_tip(self, repo: pathlib.Path) -> None:
656 _branch(repo, "src")
657 _branch(repo, "-c", "src", "dst")
658 tip_src = get_head_commit_id(repo, "src")
659 tip_dst = get_head_commit_id(repo, "dst")
660 assert tip_src == tip_dst
661
662 def test_copy_json_schema(self, repo: pathlib.Path) -> None:
663 _branch(repo, "original")
664 result = _branch(repo, "-c", "original", "copy1", "--json")
665 data = json.loads(result.output)
666 assert data["action"] == "copied"
667 assert data["from"] == "original"
668 assert data["to"] == "copy1"
669
670 def test_copy_to_existing_exits_1(self, repo: pathlib.Path) -> None:
671 _branch(repo, "x")
672 _branch(repo, "y")
673 result = _branch(repo, "-c", "x", "y")
674 assert result.exit_code == 1
675
676 def test_force_copy_to_existing_exits_0(self, repo: pathlib.Path) -> None:
677 _branch(repo, "p")
678 _branch(repo, "q")
679 result = _branch(repo, "-C", "p", "q")
680 assert result.exit_code == 0
681
682 def test_copy_omit_src_uses_current(self, repo: pathlib.Path) -> None:
683 head = get_head_commit_id(repo, "main")
684 result = _branch(repo, "-c", "main-copy")
685 assert result.exit_code == 0
686 tip = get_head_commit_id(repo, "main-copy")
687 assert tip == head
688
689
690 # ──────────────────────────────────────────────────────────────────────────────
691 # Integration — LIST
692 # ──────────────────────────────────────────────────────────────────────────────
693
694
695 class TestList:
696 def test_list_text_exits_0(self, repo: pathlib.Path) -> None:
697 result = _branch(repo)
698 assert result.exit_code == 0
699
700 def test_list_contains_main(self, repo: pathlib.Path) -> None:
701 result = _branch(repo)
702 assert "main" in result.output
703
704 def test_list_marks_current_branch(self, repo: pathlib.Path) -> None:
705 result = _branch(repo)
706 # Current branch line must start with "* "
707 current_lines = [l for l in result.output.splitlines() if l.startswith("* ")]
708 assert len(current_lines) == 1
709 assert "main" in current_lines[0]
710
711 def test_list_json_schema(self, repo: pathlib.Path) -> None:
712 result = _branch(repo, "--json")
713 data = json.loads(result.output)
714 assert isinstance(data, list)
715 assert len(data) >= 1
716 keys = set(data[0].keys())
717 assert {"name", "current", "commit_id", "last_message", "upstream"} <= keys
718
719 def test_list_json_current_flag(self, repo: pathlib.Path) -> None:
720 result = _branch(repo, "--json")
721 data = json.loads(result.output)
722 current = [b for b in data if b["current"]]
723 assert len(current) == 1
724 assert current[0]["name"] == "main"
725
726 def test_list_json_last_message_populated(self, repo: pathlib.Path) -> None:
727 result = _branch(repo, "--json")
728 data = json.loads(result.output)
729 main_entry = next(b for b in data if b["name"] == "main")
730 assert main_entry["last_message"] is not None
731 assert "initial" in main_entry["last_message"]
732
733 def test_list_json_upstream_null_by_default(self, repo: pathlib.Path) -> None:
734 result = _branch(repo, "--json")
735 data = json.loads(result.output)
736 main_entry = next(b for b in data if b["name"] == "main")
737 assert main_entry["upstream"] is None
738
739 def test_list_verbose_shows_sha(self, repo: pathlib.Path) -> None:
740 result = _branch(repo, "-v")
741 # Short SHA should appear
742 cid = get_head_commit_id(repo, "main")
743 assert cid is not None
744 assert cid[:8] in result.output
745
746 def test_list_verbose_shows_message(self, repo: pathlib.Path) -> None:
747 result = _branch(repo, "-v")
748 assert "initial" in result.output
749
750 def test_list_multiple_branches(self, repo: pathlib.Path) -> None:
751 _branch(repo, "feat/a")
752 _branch(repo, "feat/b")
753 result = _branch(repo, "--json")
754 data = json.loads(result.output)
755 names = [b["name"] for b in data]
756 assert "feat/a" in names
757 assert "feat/b" in names
758
759 def test_list_sorted_by_name(self, repo: pathlib.Path) -> None:
760 _branch(repo, "z-last")
761 _branch(repo, "a-first")
762 result = _branch(repo, "--json")
763 data = json.loads(result.output)
764 names = [b["name"] for b in data]
765 assert names == sorted(names)
766
767 def test_list_sort_committeddate(self, repo: pathlib.Path) -> None:
768 _branch(repo, "feat-x")
769 result = _branch(repo, "--sort", "committeddate", "--json")
770 assert result.exit_code == 0
771 data = json.loads(result.output)
772 assert isinstance(data, list)
773
774
775 # ──────────────────────────────────────────────────────────────────────────────
776 # Integration — FILTERS
777 # ──────────────────────────────────────────────────────────────────────────────
778
779
780 class TestFilters:
781 def test_merged_filter_includes_self(self, repo: pathlib.Path) -> None:
782 result = _branch(repo, "--merged", "--json")
783 data = json.loads(result.output)
784 names = [b["name"] for b in data]
785 assert "main" in names
786
787 def test_merged_filter_excludes_unmerged(self, repo: pathlib.Path) -> None:
788 _branch(repo, "unmerged-feat")
789 _invoke(repo, ["checkout", "unmerged-feat"])
790 (repo / "u.py").write_text("u=1\n")
791 _commit(repo, "-m", "unmerged")
792 _invoke(repo, ["checkout", "main"])
793 result = _branch(repo, "--merged", "--json")
794 data = json.loads(result.output)
795 names = [b["name"] for b in data]
796 assert "unmerged-feat" not in names
797
798 def test_no_merged_filter_includes_unmerged(self, repo: pathlib.Path) -> None:
799 _branch(repo, "exclusive-feat")
800 _invoke(repo, ["checkout", "exclusive-feat"])
801 (repo / "e.py").write_text("e=1\n")
802 _commit(repo, "-m", "exclusive")
803 _invoke(repo, ["checkout", "main"])
804 result = _branch(repo, "--no-merged", "--json")
805 data = json.loads(result.output)
806 names = [b["name"] for b in data]
807 assert "exclusive-feat" in names
808
809 def test_no_merged_filter_excludes_self(self, repo: pathlib.Path) -> None:
810 result = _branch(repo, "--no-merged", "--json")
811 data = json.loads(result.output)
812 names = [b["name"] for b in data]
813 assert "main" not in names
814
815 def test_contains_commit_filter(self, repo: pathlib.Path) -> None:
816 cid = get_head_commit_id(repo, "main")
817 assert cid is not None
818 result = _branch(repo, "--contains", cid, "--json")
819 data = json.loads(result.output)
820 names = [b["name"] for b in data]
821 assert "main" in names
822
823 def test_contains_unknown_commit_empty(self, repo: pathlib.Path) -> None:
824 result = _branch(repo, "--contains", "a" * 64, "--json")
825 data = json.loads(result.output)
826 assert data == []
827
828
829 # ──────────────────────────────────────────────────────────────────────────────
830 # Integration — validation
831 # ──────────────────────────────────────────────────────────────────────────────
832
833
834 class TestValidation:
835 def test_ansi_in_pattern_arg_sanitized(self, repo: pathlib.Path) -> None:
836 result = _branch(repo, "--pattern", "\x1b[31mxml\x1b[0m")
837 assert "\x1b" not in result.output
838
839 def test_delete_without_name_exits_1(self, repo: pathlib.Path) -> None:
840 result = _branch(repo, "-d")
841 assert result.exit_code == 1
842
843 def test_rename_too_many_args_exits_1(self, repo: pathlib.Path) -> None:
844 result = _branch(repo, "-m", "a", "b", "c")
845 assert result.exit_code == 1
846
847 def test_copy_too_many_args_exits_1(self, repo: pathlib.Path) -> None:
848 result = _branch(repo, "-c", "a", "b", "c")
849 assert result.exit_code == 1
850
851
852 # ──────────────────────────────────────────────────────────────────────────────
853 # Security — ANSI injection
854 # ──────────────────────────────────────────────────────────────────────────────
855
856
857 class TestSecurityAnsi:
858 def _has_ansi(self, s: str) -> bool:
859 return "\x1b[" in s
860
861 def test_ansi_in_branch_name_rejected(self, repo: pathlib.Path) -> None:
862 result = _branch(repo, "\x1b[31mmalicious\x1b[0m")
863 assert result.exit_code == 1
864 assert not self._has_ansi(result.output)
865
866 def test_ansi_in_delete_name_rejected(self, repo: pathlib.Path) -> None:
867 result = _branch(repo, "-d", "\x1b[31mmalicious\x1b[0m")
868 assert result.exit_code == 1
869 assert not self._has_ansi(result.output)
870
871 def test_ansi_in_rename_new_name_rejected(self, repo: pathlib.Path) -> None:
872 result = _branch(repo, "-m", "\x1b[31mnew\x1b[0m")
873 assert result.exit_code == 1
874 assert not self._has_ansi(result.output)
875
876 def test_ansi_in_contains_arg_sanitized(self, repo: pathlib.Path) -> None:
877 result = _branch(repo, "--contains", "\x1b[31mxml\x1b[0m")
878 assert not self._has_ansi(result.output)
879
880 def test_ansi_in_contains_commit_id(self, repo: pathlib.Path) -> None:
881 result = _branch(repo, "--contains", "\x1b[31mmalicious\x1b[0m")
882 # Should exit 0 (no match, empty list) or exit 0 with empty list
883 # Either way, ANSI must not appear in output
884 assert not self._has_ansi(result.output)
885
886 def test_errors_go_to_stderr(self, repo: pathlib.Path) -> None:
887 result = _branch(repo, "-d", "nonexistent")
888 assert result.exit_code == 1
889 # Error should NOT appear in stdout
890 assert "not found" not in result.output.lower() or (result.stderr and "not found" in result.stderr.lower())
891
892
893 # ──────────────────────────────────────────────────────────────────────────────
894 # Stress
895 # ──────────────────────────────────────────────────────────────────────────────
896
897
898 @pytest.mark.slow
899 class TestStress:
900 def test_list_500_branches_fast(self, repo: pathlib.Path) -> None:
901 """Listing 500 branches must complete in under 2 seconds."""
902 for i in range(500):
903 _branch(repo, f"feat/task-{i:04d}")
904 t0 = time.perf_counter()
905 result = _branch(repo, "--json")
906 elapsed = (time.perf_counter() - t0) * 1000
907 data = json.loads(result.output)
908 assert len(data) == 501 # main + 500
909 assert elapsed < 2000, f"list 500 branches took {elapsed:.0f}ms (limit 2000ms)"
910
911 def test_merged_filter_100_branches(self, repo: pathlib.Path) -> None:
912 """--merged filter on 100 branches completes in reasonable time."""
913 for i in range(100):
914 _branch(repo, f"task-{i:03d}")
915 t0 = time.perf_counter()
916 result = _branch(repo, "--merged", "--json")
917 elapsed = (time.perf_counter() - t0) * 1000
918 data = json.loads(result.output)
919 # All branches share the same commit as main → all merged
920 assert len(data) == 101
921 assert elapsed < 3000, f"--merged on 100 branches took {elapsed:.0f}ms"
922
923 def test_sort_committeddate_100_branches(self, repo: pathlib.Path) -> None:
924 for i in range(100):
925 _branch(repo, f"sort-{i:03d}")
926 result = _branch(repo, "--sort", "committeddate", "--json")
927 assert result.exit_code == 0
928 data = json.loads(result.output)
929 assert len(data) == 101
930
931 def test_concurrent_branch_list_separate_repos(self, tmp_path: pathlib.Path) -> None:
932 errors: list[str] = []
933
934 def do_branch(idx: int) -> None:
935 repo_dir = tmp_path / f"repo_{idx}"
936 repo_dir.mkdir()
937 subprocess.run(["muse", "init"], cwd=str(repo_dir), capture_output=True)
938 (repo_dir / "x.py").write_text(f"x={idx}\n")
939 subprocess.run(
940 ["muse", "commit", "-m", f"c{idx}"],
941 cwd=str(repo_dir), capture_output=True,
942 )
943 for j in range(5):
944 subprocess.run(
945 ["muse", "branch", f"b{j}"],
946 cwd=str(repo_dir), capture_output=True,
947 )
948 r = subprocess.run(
949 ["muse", "branch", "--json"],
950 cwd=str(repo_dir), capture_output=True, text=True,
951 )
952 if r.returncode != 0:
953 errors.append(f"repo_{idx}: branch --json failed")
954 return
955 data = json.loads(r.stdout)
956 if len(data) != 6: # main + 5
957 errors.append(f"repo_{idx}: expected 6 branches, got {len(data)}")
958
959 threads = [threading.Thread(target=do_branch, args=(i,)) for i in range(6)]
960 for t in threads:
961 t.start()
962 for t in threads:
963 t.join()
964 assert not errors, f"Concurrent branch errors:\n{'\n'.join(errors)}"
965
966 def test_deep_ancestor_chain_is_merged(self, repo: pathlib.Path) -> None:
967 """A branch with 50 ancestors is correctly detected as merged."""
968 _branch(repo, "long-chain")
969 _invoke(repo, ["checkout", "long-chain"])
970 for i in range(50):
971 (repo / f"step_{i:03d}.py").write_text(f"s={i}\n")
972 _commit(repo, "-m", f"step {i}")
973 _invoke(repo, ["checkout", "main"])
974 _invoke(repo, ["merge", "long-chain"])
975 result = _branch(repo, "--merged", "--json")
976 data = json.loads(result.output)
977 names = [b["name"] for b in data]
978 assert "long-chain" in names
979
980 def test_merged_filter_ancestor_set_computed_once(
981 self, repo: pathlib.Path
982 ) -> None:
983 """--merged must compute the 'into' ancestor set once, not once per branch.
984
985 With N branches, the naive implementation calls _commit_ancestors N times
986 for the same 'into' tip. The fix pre-computes it once and checks each
987 branch tip against the cached set.
988 """
989 from unittest.mock import patch
990 import muse.cli.commands.branch as branch_module
991
992 for i in range(20):
993 _branch(repo, f"feat-{i:02d}")
994
995 with patch.object(
996 branch_module, "_commit_ancestors", wraps=branch_module._commit_ancestors
997 ) as mock_ca:
998 result = _branch(repo, "--merged", "--json")
999
1000 assert result.exit_code == 0
1001 data = json.loads(result.output)
1002 assert len(data) == 21 # main + 20
1003
1004 # The ancestor set for 'main' (the into branch) must be computed exactly once,
1005 # not once per branch being checked.
1006 into_calls = [c for c in mock_ca.call_args_list if c.args[1] != ""]
1007 # All calls with the same commit_id (main's tip) should collapse to 1.
1008 unique_commit_ids = {c.args[1] for c in mock_ca.call_args_list}
1009 assert len(mock_ca.call_args_list) <= len(unique_commit_ids) + 1, (
1010 f"_commit_ancestors called {len(mock_ca.call_args_list)}× but only "
1011 f"{len(unique_commit_ids)} unique commit IDs — ancestor set is being "
1012 "recomputed per branch instead of once"
1013 )
1014
1015
1016 # ──────────────────────────────────────────────────────────────────────────────
1017 # Prune-config — remove stale [branch.*] entries from config.toml
1018 # ──────────────────────────────────────────────────────────────────────────────
1019
1020
1021 class TestPruneConfig:
1022 """muse branch --prune-config removes config entries for nonexistent branches."""
1023
1024 def test_prune_config_removes_stale_entries(self, repo: pathlib.Path) -> None:
1025 """Stale [branch.*] sections are deleted; live branches are kept."""
1026 from muse.cli.config import write_branch_meta, read_branch_meta
1027
1028 # Write a stale entry directly — branch ref never created
1029 write_branch_meta(repo, "task/stale-1", intent="stale one", resumable=True)
1030 write_branch_meta(repo, "task/stale-2", intent="stale two", resumable=True)
1031
1032 # Create a real branch so it should be kept
1033 _branch(repo, "feat/keep-me", "--intent", "keep this", "--resumable")
1034
1035 result = _branch(repo, "--prune-config", "--json")
1036 assert result.exit_code == 0, result.output
1037
1038 data = json.loads(result.output)
1039 assert data["pruned"] == 2, f"expected 2 pruned, got {data}"
1040 assert data["kept"] >= 1 # feat/keep-me and main at minimum
1041
1042 # Stale entries are gone from config
1043 assert read_branch_meta(repo, "task/stale-1") == {}
1044 assert read_branch_meta(repo, "task/stale-2") == {}
1045
1046 # Live branch entry still present
1047 assert read_branch_meta(repo, "feat/keep-me").get("intent") == "keep this"
1048
1049 def test_prune_config_noop_when_all_live(self, repo: pathlib.Path) -> None:
1050 """Pruning a repo with no stale entries reports 0 pruned."""
1051 _branch(repo, "feat/real-branch")
1052 result = _branch(repo, "--prune-config", "--json")
1053 assert result.exit_code == 0
1054 data = json.loads(result.output)
1055 assert data["pruned"] == 0
1056
1057 def test_prune_config_dry_run(self, repo: pathlib.Path) -> None:
1058 """--prune-config --dry-run reports stale entries without removing them."""
1059 from muse.cli.config import write_branch_meta, read_branch_meta
1060
1061 write_branch_meta(repo, "task/ghost", intent="ghost", resumable=True)
1062
1063 result = _branch(repo, "--prune-config", "--dry-run", "--json")
1064 assert result.exit_code == 0
1065 data = json.loads(result.output)
1066 assert data["pruned"] == 1
1067 assert data["dry_run"] is True
1068
1069 # Entry still present — dry run made no changes
1070 assert read_branch_meta(repo, "task/ghost").get("intent") == "ghost"
File History 3 commits
sha256:e237dc0e8122609f5131d11c9dda9bba480395a5a4355cda0c9fa7e634fddd29 fix(branch): guard -d --dry-run against destructive writes;… Sonnet 4.6 patch 1 day ago
sha256:99f8eb388d9a9c353e68b9a4e5bebe1b4240a8f511e6f0928e58c0e95153e103 feat: branch --prune-config, fix hub repo delete docstrings… Sonnet 4.6 minor 8 days ago