gabriel / muse public
test_cmd_branch.py python
1,021 lines 43.9 KB
Raw
1 """Tests for ``muse branch``.
2
3 Coverage tiers
4 --------------
5 Unit — parser flags, dead-code removal, helpers (_resolve_start_point,
6 _list_local_branches, _list_remotes, _upstream_for,
7 _commit_ancestors, _is_merged, _contains_commit,
8 _cleanup_empty_dirs).
9 Integration — create, delete, force-delete, rename, force-rename, copy,
10 force-copy, listing, filtering, sorting.
11 End-to-end — full CLI invocations: text and JSON output, all operations.
12 Security — ANSI injection in branch names, format flags, messages.
13 Stress — 500 branches, concurrent list, deep ancestry chains.
14 """
15
16 from __future__ import annotations
17
18 import json
19 import os
20 import pathlib
21 import subprocess
22 import threading
23 import time
24 from typing import TYPE_CHECKING
25
26 import pytest
27
28 from tests.cli_test_helper import CliRunner, InvokeResult
29 from muse.core.refs import (
30 get_head_commit_id,
31 read_current_branch,
32 )
33 from muse.core.types import short_id
34 from muse.core.paths import heads_dir, logs_dir
35
36 if TYPE_CHECKING:
37 import argparse
38
39 runner = CliRunner()
40
41 # ──────────────────────────────────────────────────────────────────────────────
42 # Helpers
43 # ──────────────────────────────────────────────────────────────────────────────
44
45
46 def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult:
47 saved = os.getcwd()
48 try:
49 os.chdir(repo)
50 return runner.invoke(None, args)
51 finally:
52 os.chdir(saved)
53
54
55 def _branch(repo: pathlib.Path, *extra: str) -> InvokeResult:
56 return _invoke(repo, ["branch", *extra])
57
58
59 def _commit(repo: pathlib.Path, *extra: str) -> InvokeResult:
60 _invoke(repo, ["code", "add", "."])
61 return _invoke(repo, ["commit", *extra])
62
63
64 @pytest.fixture()
65 def repo(tmp_path: pathlib.Path) -> pathlib.Path:
66 """Initialised repo with one commit on ``main``."""
67 saved = os.getcwd()
68 try:
69 os.chdir(tmp_path)
70 runner.invoke(None, ["init"])
71 finally:
72 os.chdir(saved)
73 (tmp_path / "a.py").write_text("x = 1\n")
74 _commit(tmp_path, "-m", "initial")
75 return tmp_path
76
77
78 @pytest.fixture()
79 def two_commit_repo(repo: pathlib.Path) -> pathlib.Path:
80 """Repo with two commits on ``main``."""
81 (repo / "b.py").write_text("y = 2\n")
82 _commit(repo, "-m", "second")
83 return repo
84
85
86 # ──────────────────────────────────────────────────────────────────────────────
87 # Unit — parser flags
88 # ──────────────────────────────────────────────────────────────────────────────
89
90
91 class TestRegisterFlags:
92 def _parse(self, *args: str) -> "argparse.Namespace":
93 import argparse
94
95 from muse.cli.commands.branch import register
96
97 p = argparse.ArgumentParser()
98 sub = p.add_subparsers()
99 register(sub)
100 return p.parse_args(["branch", *args])
101
102 def test_default_json_out_is_false(self) -> None:
103 ns = self._parse()
104 assert ns.json_out is False
105
106 def test_json_flag_sets_json_out(self) -> None:
107 ns = self._parse("--json")
108 assert ns.json_out is True
109
110 def test_j_shorthand_sets_json_out(self) -> None:
111 ns = self._parse("-j")
112 assert ns.json_out is True
113
114 def test_delete_flag(self) -> None:
115 ns = self._parse("-d", "foo")
116 assert ns.op == "delete"
117
118 def test_force_delete_flag(self) -> None:
119 ns = self._parse("-D", "foo")
120 assert ns.op == "force_delete"
121
122 def test_rename_flag(self) -> None:
123 ns = self._parse("-m", "new")
124 assert ns.op == "rename"
125
126 def test_force_rename_flag(self) -> None:
127 ns = self._parse("-M", "new")
128 assert ns.op == "force_rename"
129
130 def test_copy_flag(self) -> None:
131 ns = self._parse("-c", "copy")
132 assert ns.op == "copy"
133
134 def test_force_copy_flag(self) -> None:
135 ns = self._parse("-C", "copy")
136 assert ns.op == "force_copy"
137
138 def test_verbose_default_0(self) -> None:
139 ns = self._parse()
140 assert ns.verbose == 0
141
142 def test_verbose_v_is_1(self) -> None:
143 ns = self._parse("-v")
144 assert ns.verbose == 1
145
146 def test_verbose_vv_is_2(self) -> None:
147 ns = self._parse("-vv")
148 assert ns.verbose == 2
149
150 def test_remotes_flag(self) -> None:
151 ns = self._parse("-r")
152 assert ns.remotes is True
153
154 def test_all_flag(self) -> None:
155 ns = self._parse("-a")
156 assert ns.all_branches is True
157
158 def test_sort_default_name(self) -> None:
159 ns = self._parse()
160 assert ns.sort == "name"
161
162 def test_sort_committeddate(self) -> None:
163 ns = self._parse("--sort", "committeddate")
164 assert ns.sort == "committeddate"
165
166 def test_sort_invalid_rejected(self) -> None:
167 import argparse
168
169 from muse.cli.commands.branch import register
170
171 p = argparse.ArgumentParser()
172 sub = p.add_subparsers()
173 register(sub)
174 with pytest.raises(SystemExit):
175 p.parse_args(["branch", "--sort", "invalid"])
176
177
178 # ──────────────────────────────────────────────────────────────────────────────
179 # Unit — dead-code removal
180 # ──────────────────────────────────────────────────────────────────────────────
181
182
183 class TestDeadCodeRemoved:
184 def test_op_list_branch_removed(self) -> None:
185 import inspect
186
187 import muse.cli.commands.branch as m
188
189 src = inspect.getsource(m.run)
190 assert 'op == "list"' not in src, (
191 'op == "list" was a dead branch (nothing in register() creates it); must be deleted'
192 )
193
194 def test_inline_tomllib_import_removed(self) -> None:
195 import inspect
196
197 import muse.cli.commands.branch as m
198
199 src = inspect.getsource(m._upstream_for)
200 assert "import tomllib" not in src, (
201 "inline 'import tomllib' inside _upstream_for should be a module-level import"
202 )
203
204 def test_double_sanitize_removed(self) -> None:
205 """The verbose listing previously double-sanitized name_str (stripping ANSI)."""
206 import inspect
207
208 import muse.cli.commands.branch as m
209
210 src = inspect.getsource(m.run)
211 assert "sanitize_display(name_str)" not in src, (
212 "name_str was double-sanitized; the second call stripped ANSI from current branch"
213 )
214
215
216 # ──────────────────────────────────────────────────────────────────────────────
217 # Unit — _resolve_start_point
218 # ──────────────────────────────────────────────────────────────────────────────
219
220
221 class TestResolveStartPoint:
222 def test_resolves_branch_name(self, repo: pathlib.Path) -> None:
223 from muse.cli.commands.branch import _resolve_start_point
224
225 cid = get_head_commit_id(repo, "main")
226 result = _resolve_start_point(repo, "main", "main")
227 assert result == cid
228
229 def test_resolves_full_sha(self, repo: pathlib.Path) -> None:
230 from muse.cli.commands.branch import _resolve_start_point
231
232 cid = get_head_commit_id(repo, "main")
233 assert cid is not None
234 result = _resolve_start_point(repo, "main", cid)
235 assert result == cid
236
237 def test_resolves_partial_sha(self, two_commit_repo: pathlib.Path) -> None:
238 from muse.cli.commands.branch import _resolve_start_point
239 from muse.core.refs import read_current_branch
240 from muse.core.commits import get_commits_for_branch
241
242 repo = two_commit_repo
243 branch = read_current_branch(repo)
244 commits = get_commits_for_branch(repo, branch)
245 first_sha = commits[-1].commit_id # oldest commit
246 # 12-char prefix should resolve
247 result = _resolve_start_point(repo, "main", short_id(first_sha))
248 assert result == first_sha
249
250 def test_returns_input_for_unresolvable(self, repo: pathlib.Path) -> None:
251 from muse.cli.commands.branch import _resolve_start_point
252
253 result = _resolve_start_point(repo, "main", "nonexistent-ref")
254 assert result == "nonexistent-ref"
255
256
257 # ──────────────────────────────────────────────────────────────────────────────
258 # Unit — _list_local_branches
259 # ──────────────────────────────────────────────────────────────────────────────
260
261
262 class TestListLocalBranches:
263 def test_returns_sorted_list(self, repo: pathlib.Path) -> None:
264 from muse.cli.commands.branch import _list_local_branches
265
266 _branch(repo, "z-last")
267 _branch(repo, "a-first")
268 branches = _list_local_branches(repo)
269 assert branches == sorted(branches)
270
271 def test_skips_hidden_files(self, repo: pathlib.Path) -> None:
272 from muse.cli.commands.branch import _list_local_branches
273
274 # Plant a hidden lock file inside refs/heads/
275 lock = heads_dir(repo) / ".lock"
276 lock.write_text("locked")
277 branches = _list_local_branches(repo)
278 assert ".lock" not in branches
279 assert not any(b.startswith(".") for b in branches)
280
281 def test_empty_repo_returns_empty(self, tmp_path: pathlib.Path) -> None:
282 from muse.cli.commands.branch import _list_local_branches
283
284 assert _list_local_branches(tmp_path) == []
285
286 def test_includes_nested_branches(self, repo: pathlib.Path) -> None:
287 from muse.cli.commands.branch import _list_local_branches
288
289 _branch(repo, "feat/sub/task")
290 branches = _list_local_branches(repo)
291 assert "feat/sub/task" in branches
292
293
294 # ──────────────────────────────────────────────────────────────────────────────
295 # Unit — _commit_ancestors, _is_merged, _contains_commit
296 # ──────────────────────────────────────────────────────────────────────────────
297
298
299 class TestCommitGraph:
300 def test_commit_ancestors_includes_self(self, repo: pathlib.Path) -> None:
301 from muse.cli.commands.branch import _commit_ancestors
302
303 cid = get_head_commit_id(repo, "main")
304 assert cid is not None
305 ancestors = _commit_ancestors(repo, cid)
306 assert cid in ancestors
307
308 def test_is_merged_true_for_same_branch(self, repo: pathlib.Path) -> None:
309 from muse.cli.commands.branch import _is_merged
310
311 assert _is_merged(repo, "main", "main")
312
313 def test_is_merged_false_for_unmerged(self, repo: pathlib.Path) -> None:
314 from muse.cli.commands.branch import _is_merged
315
316 _branch(repo, "feat")
317 _invoke(repo, ["checkout", "feat"])
318 (repo / "c.py").write_text("c=1\n")
319 _commit(repo, "-m", "feat commit")
320 _invoke(repo, ["checkout", "main"])
321 assert not _is_merged(repo, "feat", "main")
322
323 def test_contains_commit_true(self, repo: pathlib.Path) -> None:
324 from muse.cli.commands.branch import _contains_commit
325
326 cid = get_head_commit_id(repo, "main")
327 assert cid is not None
328 assert _contains_commit(repo, "main", cid)
329
330 def test_contains_commit_false_for_unknown(self, repo: pathlib.Path) -> None:
331 from muse.cli.commands.branch import _contains_commit
332
333 assert not _contains_commit(repo, "main", "a" * 64)
334
335
336 # ──────────────────────────────────────────────────────────────────────────────
337 # Integration — CREATE
338 # ──────────────────────────────────────────────────────────────────────────────
339
340
341 class TestCreate:
342 def test_create_basic_exits_0(self, repo: pathlib.Path) -> None:
343 result = _branch(repo, "new-branch")
344 assert result.exit_code == 0
345
346 def test_create_text_output(self, repo: pathlib.Path) -> None:
347 result = _branch(repo, "my-branch")
348 assert "my-branch" in result.output
349
350 def test_create_json_schema(self, repo: pathlib.Path) -> None:
351 result = _branch(repo, "json-branch", "--json")
352 data = json.loads(result.output)
353 assert data["action"] == "created"
354 assert data["branch"] == "json-branch"
355 assert "commit_id" in data
356 assert "from" in data
357
358 def test_create_json_from_is_none_at_head(self, repo: pathlib.Path) -> None:
359 result = _branch(repo, "from-head", "--json")
360 data = json.loads(result.output)
361 assert data["from"] is None
362
363 def test_create_at_full_sha(self, two_commit_repo: pathlib.Path) -> None:
364 repo = two_commit_repo
365 from muse.core.refs import read_current_branch
366 from muse.core.commits import get_commits_for_branch
367
368 branch = read_current_branch(repo)
369 commits = get_commits_for_branch(repo, branch)
370 first_sha = commits[-1].commit_id
371
372 result = _branch(repo, "at-sha", first_sha)
373 assert result.exit_code == 0
374 tip = get_head_commit_id(repo, "at-sha")
375 assert tip == first_sha
376
377 def test_create_at_partial_sha(self, two_commit_repo: pathlib.Path) -> None:
378 repo = two_commit_repo
379 from muse.core.refs import read_current_branch
380 from muse.core.commits import get_commits_for_branch
381
382 branch = read_current_branch(repo)
383 commits = get_commits_for_branch(repo, branch)
384 first_sha = commits[-1].commit_id
385
386 result = _branch(repo, "at-partial", short_id(first_sha))
387 assert result.exit_code == 0
388 tip = get_head_commit_id(repo, "at-partial")
389 assert tip == first_sha
390
391 def test_create_at_branch_name(self, repo: pathlib.Path) -> None:
392 head_cid = get_head_commit_id(repo, "main")
393 result = _branch(repo, "copy-of-main", "main")
394 assert result.exit_code == 0
395 tip = get_head_commit_id(repo, "copy-of-main")
396 assert tip == head_cid
397
398 def test_create_json_from_field_populated(self, repo: pathlib.Path) -> None:
399 result = _branch(repo, "with-from", "main", "--json")
400 data = json.loads(result.output)
401 assert data["from"] == "main"
402
403 def test_create_duplicate_exits_1(self, repo: pathlib.Path) -> None:
404 _branch(repo, "dup")
405 result = _branch(repo, "dup")
406 assert result.exit_code == 1
407
408 def test_create_invalid_name_exits_1(self, repo: pathlib.Path) -> None:
409 result = _branch(repo, "bad..name")
410 assert result.exit_code == 1
411
412 def test_create_does_not_checkout(self, repo: pathlib.Path) -> None:
413 _branch(repo, "new-but-no-switch")
414 assert read_current_branch(repo) == "main"
415
416
417 # ──────────────────────────────────────────────────────────────────────────────
418 # Integration — DELETE
419 # ──────────────────────────────────────────────────────────────────────────────
420
421
422 class TestDelete:
423 def test_delete_merged_branch_exits_0(self, repo: pathlib.Path) -> None:
424 _branch(repo, "to-delete")
425 # Branch points to same commit as main → considered merged
426 result = _branch(repo, "-d", "to-delete")
427 assert result.exit_code == 0
428
429 def test_delete_json_schema(self, repo: pathlib.Path) -> None:
430 _branch(repo, "del-json")
431 result = _branch(repo, "-d", "del-json", "--json")
432 data = json.loads(result.output)
433 assert data["action"] == "deleted"
434 assert data["branch"] == "del-json"
435 assert "was" in data
436
437 def test_delete_unmerged_exits_1_without_force(self, repo: pathlib.Path) -> None:
438 _branch(repo, "unmerged")
439 _invoke(repo, ["checkout", "unmerged"])
440 (repo / "z.py").write_text("z=1\n")
441 _commit(repo, "-m", "unmerged work")
442 _invoke(repo, ["checkout", "main"])
443 result = _branch(repo, "-d", "unmerged")
444 assert result.exit_code == 1
445
446 def test_force_delete_unmerged_exits_0(self, repo: pathlib.Path) -> None:
447 _branch(repo, "force-del")
448 _invoke(repo, ["checkout", "force-del"])
449 (repo / "x.py").write_text("x=1\n")
450 _commit(repo, "-m", "exclusive work")
451 _invoke(repo, ["checkout", "main"])
452 result = _branch(repo, "-D", "force-del")
453 assert result.exit_code == 0
454
455 def test_delete_current_branch_exits_1(self, repo: pathlib.Path) -> None:
456 result = _branch(repo, "-d", "main")
457 assert result.exit_code == 1
458
459 def test_delete_nonexistent_exits_1(self, repo: pathlib.Path) -> None:
460 result = _branch(repo, "-d", "ghost")
461 assert result.exit_code == 1
462
463 def test_delete_removes_branch_from_list(self, repo: pathlib.Path) -> None:
464 _branch(repo, "temp")
465 _branch(repo, "-d", "temp")
466 result = _branch(repo, "--json")
467 names = [b["name"] for b in json.loads(result.output)]
468 assert "temp" not in names
469
470 def test_delete_nested_branch_cleans_empty_dirs(self, repo: pathlib.Path) -> None:
471 _branch(repo, "feat/sub/task")
472 _branch(repo, "-D", "feat/sub/task")
473 # The feat/ and feat/sub/ dirs should be gone
474 feat_dir = heads_dir(repo) / "feat"
475 assert not feat_dir.exists()
476
477 def test_delete_removes_reflog_file(self, repo: pathlib.Path) -> None:
478 """Deleting a branch removes its reflog file — git-idiomatic behaviour."""
479 _invoke(repo, ["checkout", "-b", "bye"]) # checkout writes the reflog
480 _invoke(repo, ["checkout", "main"])
481 reflog = logs_dir(repo) / "refs" / "heads" / "bye"
482 assert reflog.exists(), "reflog should exist after checkout -b"
483 _branch(repo, "-d", "bye")
484 assert not reflog.exists(), "reflog must be deleted when branch is deleted"
485
486 def test_delete_nested_branch_removes_reflog_and_empty_dirs(
487 self, repo: pathlib.Path
488 ) -> None:
489 """Nested branch deletion removes reflog file and its empty parent dirs."""
490 _invoke(repo, ["checkout", "-b", "feat/ui/button"])
491 _invoke(repo, ["checkout", "main"])
492 reflog = logs_dir(repo) / "refs" / "heads" / "feat" / "ui" / "button"
493 assert reflog.exists(), "reflog should exist after checkout -b"
494 _branch(repo, "-D", "feat/ui/button")
495 assert not reflog.exists()
496 log_feat_dir = logs_dir(repo) / "refs" / "heads" / "feat"
497 assert not log_feat_dir.exists(), "empty reflog parent dirs must be cleaned up"
498
499 def test_force_delete_also_removes_reflog(self, repo: pathlib.Path) -> None:
500 """-D (force delete) removes the reflog just like -d."""
501 _invoke(repo, ["checkout", "-b", "force-log"])
502 (repo / "tmp.py").write_text("x=1\n")
503 _commit(repo, "-m", "unmerged")
504 _invoke(repo, ["checkout", "main"])
505 reflog = logs_dir(repo) / "refs" / "heads" / "force-log"
506 assert reflog.exists()
507 _branch(repo, "-D", "force-log")
508 assert not reflog.exists()
509
510
511 # ──────────────────────────────────────────────────────────────────────────────
512 # Integration — CREATE REFLOG
513 # ──────────────────────────────────────────────────────────────────────────────
514
515
516 class TestCreateReflog:
517 def test_branch_create_writes_reflog(self, repo: pathlib.Path) -> None:
518 """muse branch -b writes a reflog entry — git-idiomatic behaviour."""
519 _branch(repo, "feat/new")
520 reflog = logs_dir(repo) / "refs" / "heads" / "feat" / "new"
521 assert reflog.exists(), "reflog must exist after muse branch -b"
522
523 def test_branch_create_reflog_contains_branch_created(self, repo: pathlib.Path) -> None:
524 """Reflog entry records a 'branch: Created' operation."""
525 _branch(repo, "task/thing")
526 reflog = logs_dir(repo) / "refs" / "heads" / "task" / "thing"
527 content = reflog.read_text(encoding="utf-8")
528 assert "branch: Created" in content
529
530 def test_branch_create_reflog_records_start_point(self, repo: pathlib.Path) -> None:
531 """Reflog entry for a branch created from another branch names that source."""
532 _branch(repo, "task/from-main", "main")
533 reflog = logs_dir(repo) / "refs" / "heads" / "task" / "from-main"
534 content = reflog.read_text(encoding="utf-8")
535 assert "main" in content
536
537
538 # ──────────────────────────────────────────────────────────────────────────────
539 # Integration — RENAME
540 # ──────────────────────────────────────────────────────────────────────────────
541
542
543 class TestRename:
544 def test_rename_basic(self, repo: pathlib.Path) -> None:
545 _branch(repo, "old-name")
546 result = _branch(repo, "-m", "old-name", "new-name")
547 assert result.exit_code == 0
548 names = [b["name"] for b in json.loads(_branch(repo, "--json").output)]
549 assert "new-name" in names
550 assert "old-name" not in names
551
552 def test_rename_omit_old_uses_current(self, repo: pathlib.Path) -> None:
553 _branch(repo, "temp")
554 _invoke(repo, ["checkout", "temp"])
555 result = _branch(repo, "-m", "renamed")
556 assert result.exit_code == 0
557 assert read_current_branch(repo) == "renamed"
558 _invoke(repo, ["checkout", "main"])
559
560 def test_rename_json_schema(self, repo: pathlib.Path) -> None:
561 _branch(repo, "src")
562 result = _branch(repo, "-m", "src", "dst", "--json")
563 data = json.loads(result.output)
564 assert data["action"] == "renamed"
565 assert data["from"] == "src"
566 assert data["to"] == "dst"
567
568 def test_rename_to_existing_exits_1(self, repo: pathlib.Path) -> None:
569 _branch(repo, "a")
570 _branch(repo, "b")
571 result = _branch(repo, "-m", "a", "b")
572 assert result.exit_code == 1
573
574 def test_force_rename_to_existing_exits_0(self, repo: pathlib.Path) -> None:
575 _branch(repo, "a")
576 _branch(repo, "b")
577 result = _branch(repo, "-M", "a", "b")
578 assert result.exit_code == 0
579
580 def test_rename_updates_head_when_current(self, repo: pathlib.Path) -> None:
581 _branch(repo, "temp2")
582 _invoke(repo, ["checkout", "temp2"])
583 _branch(repo, "-m", "temp2", "newname")
584 assert read_current_branch(repo) == "newname"
585 _invoke(repo, ["checkout", "main"])
586
587 def test_rename_nonexistent_exits_1(self, repo: pathlib.Path) -> None:
588 result = _branch(repo, "-m", "ghost", "newname")
589 assert result.exit_code == 1
590
591
592 # ──────────────────────────────────────────────────────────────────────────────
593 # Integration — COPY
594 # ──────────────────────────────────────────────────────────────────────────────
595
596
597 class TestCopy:
598 def test_copy_basic(self, repo: pathlib.Path) -> None:
599 _branch(repo, "orig")
600 result = _branch(repo, "-c", "orig", "clone")
601 assert result.exit_code == 0
602 names = [b["name"] for b in json.loads(_branch(repo, "--json").output)]
603 assert "orig" in names
604 assert "clone" in names
605
606 def test_copy_same_tip(self, repo: pathlib.Path) -> None:
607 _branch(repo, "src")
608 _branch(repo, "-c", "src", "dst")
609 tip_src = get_head_commit_id(repo, "src")
610 tip_dst = get_head_commit_id(repo, "dst")
611 assert tip_src == tip_dst
612
613 def test_copy_json_schema(self, repo: pathlib.Path) -> None:
614 _branch(repo, "original")
615 result = _branch(repo, "-c", "original", "copy1", "--json")
616 data = json.loads(result.output)
617 assert data["action"] == "copied"
618 assert data["from"] == "original"
619 assert data["to"] == "copy1"
620
621 def test_copy_to_existing_exits_1(self, repo: pathlib.Path) -> None:
622 _branch(repo, "x")
623 _branch(repo, "y")
624 result = _branch(repo, "-c", "x", "y")
625 assert result.exit_code == 1
626
627 def test_force_copy_to_existing_exits_0(self, repo: pathlib.Path) -> None:
628 _branch(repo, "p")
629 _branch(repo, "q")
630 result = _branch(repo, "-C", "p", "q")
631 assert result.exit_code == 0
632
633 def test_copy_omit_src_uses_current(self, repo: pathlib.Path) -> None:
634 head = get_head_commit_id(repo, "main")
635 result = _branch(repo, "-c", "main-copy")
636 assert result.exit_code == 0
637 tip = get_head_commit_id(repo, "main-copy")
638 assert tip == head
639
640
641 # ──────────────────────────────────────────────────────────────────────────────
642 # Integration — LIST
643 # ──────────────────────────────────────────────────────────────────────────────
644
645
646 class TestList:
647 def test_list_text_exits_0(self, repo: pathlib.Path) -> None:
648 result = _branch(repo)
649 assert result.exit_code == 0
650
651 def test_list_contains_main(self, repo: pathlib.Path) -> None:
652 result = _branch(repo)
653 assert "main" in result.output
654
655 def test_list_marks_current_branch(self, repo: pathlib.Path) -> None:
656 result = _branch(repo)
657 # Current branch line must start with "* "
658 current_lines = [l for l in result.output.splitlines() if l.startswith("* ")]
659 assert len(current_lines) == 1
660 assert "main" in current_lines[0]
661
662 def test_list_json_schema(self, repo: pathlib.Path) -> None:
663 result = _branch(repo, "--json")
664 data = json.loads(result.output)
665 assert isinstance(data, list)
666 assert len(data) >= 1
667 keys = set(data[0].keys())
668 assert {"name", "current", "commit_id", "last_message", "upstream"} <= keys
669
670 def test_list_json_current_flag(self, repo: pathlib.Path) -> None:
671 result = _branch(repo, "--json")
672 data = json.loads(result.output)
673 current = [b for b in data if b["current"]]
674 assert len(current) == 1
675 assert current[0]["name"] == "main"
676
677 def test_list_json_last_message_populated(self, repo: pathlib.Path) -> None:
678 result = _branch(repo, "--json")
679 data = json.loads(result.output)
680 main_entry = next(b for b in data if b["name"] == "main")
681 assert main_entry["last_message"] is not None
682 assert "initial" in main_entry["last_message"]
683
684 def test_list_json_upstream_null_by_default(self, repo: pathlib.Path) -> None:
685 result = _branch(repo, "--json")
686 data = json.loads(result.output)
687 main_entry = next(b for b in data if b["name"] == "main")
688 assert main_entry["upstream"] is None
689
690 def test_list_verbose_shows_sha(self, repo: pathlib.Path) -> None:
691 result = _branch(repo, "-v")
692 # Short SHA should appear
693 cid = get_head_commit_id(repo, "main")
694 assert cid is not None
695 assert cid[:8] in result.output
696
697 def test_list_verbose_shows_message(self, repo: pathlib.Path) -> None:
698 result = _branch(repo, "-v")
699 assert "initial" in result.output
700
701 def test_list_multiple_branches(self, repo: pathlib.Path) -> None:
702 _branch(repo, "feat/a")
703 _branch(repo, "feat/b")
704 result = _branch(repo, "--json")
705 data = json.loads(result.output)
706 names = [b["name"] for b in data]
707 assert "feat/a" in names
708 assert "feat/b" in names
709
710 def test_list_sorted_by_name(self, repo: pathlib.Path) -> None:
711 _branch(repo, "z-last")
712 _branch(repo, "a-first")
713 result = _branch(repo, "--json")
714 data = json.loads(result.output)
715 names = [b["name"] for b in data]
716 assert names == sorted(names)
717
718 def test_list_sort_committeddate(self, repo: pathlib.Path) -> None:
719 _branch(repo, "feat-x")
720 result = _branch(repo, "--sort", "committeddate", "--json")
721 assert result.exit_code == 0
722 data = json.loads(result.output)
723 assert isinstance(data, list)
724
725
726 # ──────────────────────────────────────────────────────────────────────────────
727 # Integration — FILTERS
728 # ──────────────────────────────────────────────────────────────────────────────
729
730
731 class TestFilters:
732 def test_merged_filter_includes_self(self, repo: pathlib.Path) -> None:
733 result = _branch(repo, "--merged", "--json")
734 data = json.loads(result.output)
735 names = [b["name"] for b in data]
736 assert "main" in names
737
738 def test_merged_filter_excludes_unmerged(self, repo: pathlib.Path) -> None:
739 _branch(repo, "unmerged-feat")
740 _invoke(repo, ["checkout", "unmerged-feat"])
741 (repo / "u.py").write_text("u=1\n")
742 _commit(repo, "-m", "unmerged")
743 _invoke(repo, ["checkout", "main"])
744 result = _branch(repo, "--merged", "--json")
745 data = json.loads(result.output)
746 names = [b["name"] for b in data]
747 assert "unmerged-feat" not in names
748
749 def test_no_merged_filter_includes_unmerged(self, repo: pathlib.Path) -> None:
750 _branch(repo, "exclusive-feat")
751 _invoke(repo, ["checkout", "exclusive-feat"])
752 (repo / "e.py").write_text("e=1\n")
753 _commit(repo, "-m", "exclusive")
754 _invoke(repo, ["checkout", "main"])
755 result = _branch(repo, "--no-merged", "--json")
756 data = json.loads(result.output)
757 names = [b["name"] for b in data]
758 assert "exclusive-feat" in names
759
760 def test_no_merged_filter_excludes_self(self, repo: pathlib.Path) -> None:
761 result = _branch(repo, "--no-merged", "--json")
762 data = json.loads(result.output)
763 names = [b["name"] for b in data]
764 assert "main" not in names
765
766 def test_contains_commit_filter(self, repo: pathlib.Path) -> None:
767 cid = get_head_commit_id(repo, "main")
768 assert cid is not None
769 result = _branch(repo, "--contains", cid, "--json")
770 data = json.loads(result.output)
771 names = [b["name"] for b in data]
772 assert "main" in names
773
774 def test_contains_unknown_commit_empty(self, repo: pathlib.Path) -> None:
775 result = _branch(repo, "--contains", "a" * 64, "--json")
776 data = json.loads(result.output)
777 assert data == []
778
779
780 # ──────────────────────────────────────────────────────────────────────────────
781 # Integration — validation
782 # ──────────────────────────────────────────────────────────────────────────────
783
784
785 class TestValidation:
786 def test_ansi_in_pattern_arg_sanitized(self, repo: pathlib.Path) -> None:
787 result = _branch(repo, "--pattern", "\x1b[31mxml\x1b[0m")
788 assert "\x1b" not in result.output
789
790 def test_delete_without_name_exits_1(self, repo: pathlib.Path) -> None:
791 result = _branch(repo, "-d")
792 assert result.exit_code == 1
793
794 def test_rename_too_many_args_exits_1(self, repo: pathlib.Path) -> None:
795 result = _branch(repo, "-m", "a", "b", "c")
796 assert result.exit_code == 1
797
798 def test_copy_too_many_args_exits_1(self, repo: pathlib.Path) -> None:
799 result = _branch(repo, "-c", "a", "b", "c")
800 assert result.exit_code == 1
801
802
803 # ──────────────────────────────────────────────────────────────────────────────
804 # Security — ANSI injection
805 # ──────────────────────────────────────────────────────────────────────────────
806
807
808 class TestSecurityAnsi:
809 def _has_ansi(self, s: str) -> bool:
810 return "\x1b[" in s
811
812 def test_ansi_in_branch_name_rejected(self, repo: pathlib.Path) -> None:
813 result = _branch(repo, "\x1b[31mmalicious\x1b[0m")
814 assert result.exit_code == 1
815 assert not self._has_ansi(result.output)
816
817 def test_ansi_in_delete_name_rejected(self, repo: pathlib.Path) -> None:
818 result = _branch(repo, "-d", "\x1b[31mmalicious\x1b[0m")
819 assert result.exit_code == 1
820 assert not self._has_ansi(result.output)
821
822 def test_ansi_in_rename_new_name_rejected(self, repo: pathlib.Path) -> None:
823 result = _branch(repo, "-m", "\x1b[31mnew\x1b[0m")
824 assert result.exit_code == 1
825 assert not self._has_ansi(result.output)
826
827 def test_ansi_in_contains_arg_sanitized(self, repo: pathlib.Path) -> None:
828 result = _branch(repo, "--contains", "\x1b[31mxml\x1b[0m")
829 assert not self._has_ansi(result.output)
830
831 def test_ansi_in_contains_commit_id(self, repo: pathlib.Path) -> None:
832 result = _branch(repo, "--contains", "\x1b[31mmalicious\x1b[0m")
833 # Should exit 0 (no match, empty list) or exit 0 with empty list
834 # Either way, ANSI must not appear in output
835 assert not self._has_ansi(result.output)
836
837 def test_errors_go_to_stderr(self, repo: pathlib.Path) -> None:
838 result = _branch(repo, "-d", "nonexistent")
839 assert result.exit_code == 1
840 # Error should NOT appear in stdout
841 assert "not found" not in result.output.lower() or (result.stderr and "not found" in result.stderr.lower())
842
843
844 # ──────────────────────────────────────────────────────────────────────────────
845 # Stress
846 # ──────────────────────────────────────────────────────────────────────────────
847
848
849 @pytest.mark.slow
850 class TestStress:
851 def test_list_500_branches_fast(self, repo: pathlib.Path) -> None:
852 """Listing 500 branches must complete in under 2 seconds."""
853 for i in range(500):
854 _branch(repo, f"feat/task-{i:04d}")
855 t0 = time.perf_counter()
856 result = _branch(repo, "--json")
857 elapsed = (time.perf_counter() - t0) * 1000
858 data = json.loads(result.output)
859 assert len(data) == 501 # main + 500
860 assert elapsed < 2000, f"list 500 branches took {elapsed:.0f}ms (limit 2000ms)"
861
862 def test_merged_filter_100_branches(self, repo: pathlib.Path) -> None:
863 """--merged filter on 100 branches completes in reasonable time."""
864 for i in range(100):
865 _branch(repo, f"task-{i:03d}")
866 t0 = time.perf_counter()
867 result = _branch(repo, "--merged", "--json")
868 elapsed = (time.perf_counter() - t0) * 1000
869 data = json.loads(result.output)
870 # All branches share the same commit as main → all merged
871 assert len(data) == 101
872 assert elapsed < 3000, f"--merged on 100 branches took {elapsed:.0f}ms"
873
874 def test_sort_committeddate_100_branches(self, repo: pathlib.Path) -> None:
875 for i in range(100):
876 _branch(repo, f"sort-{i:03d}")
877 result = _branch(repo, "--sort", "committeddate", "--json")
878 assert result.exit_code == 0
879 data = json.loads(result.output)
880 assert len(data) == 101
881
882 def test_concurrent_branch_list_separate_repos(self, tmp_path: pathlib.Path) -> None:
883 errors: list[str] = []
884
885 def do_branch(idx: int) -> None:
886 repo_dir = tmp_path / f"repo_{idx}"
887 repo_dir.mkdir()
888 subprocess.run(["muse", "init"], cwd=str(repo_dir), capture_output=True)
889 (repo_dir / "x.py").write_text(f"x={idx}\n")
890 subprocess.run(
891 ["muse", "commit", "-m", f"c{idx}"],
892 cwd=str(repo_dir), capture_output=True,
893 )
894 for j in range(5):
895 subprocess.run(
896 ["muse", "branch", f"b{j}"],
897 cwd=str(repo_dir), capture_output=True,
898 )
899 r = subprocess.run(
900 ["muse", "branch", "--json"],
901 cwd=str(repo_dir), capture_output=True, text=True,
902 )
903 if r.returncode != 0:
904 errors.append(f"repo_{idx}: branch --json failed")
905 return
906 data = json.loads(r.stdout)
907 if len(data) != 6: # main + 5
908 errors.append(f"repo_{idx}: expected 6 branches, got {len(data)}")
909
910 threads = [threading.Thread(target=do_branch, args=(i,)) for i in range(6)]
911 for t in threads:
912 t.start()
913 for t in threads:
914 t.join()
915 assert not errors, f"Concurrent branch errors:\n{'\n'.join(errors)}"
916
917 def test_deep_ancestor_chain_is_merged(self, repo: pathlib.Path) -> None:
918 """A branch with 50 ancestors is correctly detected as merged."""
919 _branch(repo, "long-chain")
920 _invoke(repo, ["checkout", "long-chain"])
921 for i in range(50):
922 (repo / f"step_{i:03d}.py").write_text(f"s={i}\n")
923 _commit(repo, "-m", f"step {i}")
924 _invoke(repo, ["checkout", "main"])
925 _invoke(repo, ["merge", "long-chain"])
926 result = _branch(repo, "--merged", "--json")
927 data = json.loads(result.output)
928 names = [b["name"] for b in data]
929 assert "long-chain" in names
930
931 def test_merged_filter_ancestor_set_computed_once(
932 self, repo: pathlib.Path
933 ) -> None:
934 """--merged must compute the 'into' ancestor set once, not once per branch.
935
936 With N branches, the naive implementation calls _commit_ancestors N times
937 for the same 'into' tip. The fix pre-computes it once and checks each
938 branch tip against the cached set.
939 """
940 from unittest.mock import patch
941 import muse.cli.commands.branch as branch_module
942
943 for i in range(20):
944 _branch(repo, f"feat-{i:02d}")
945
946 with patch.object(
947 branch_module, "_commit_ancestors", wraps=branch_module._commit_ancestors
948 ) as mock_ca:
949 result = _branch(repo, "--merged", "--json")
950
951 assert result.exit_code == 0
952 data = json.loads(result.output)
953 assert len(data) == 21 # main + 20
954
955 # The ancestor set for 'main' (the into branch) must be computed exactly once,
956 # not once per branch being checked.
957 into_calls = [c for c in mock_ca.call_args_list if c.args[1] != ""]
958 # All calls with the same commit_id (main's tip) should collapse to 1.
959 unique_commit_ids = {c.args[1] for c in mock_ca.call_args_list}
960 assert len(mock_ca.call_args_list) <= len(unique_commit_ids) + 1, (
961 f"_commit_ancestors called {len(mock_ca.call_args_list)}× but only "
962 f"{len(unique_commit_ids)} unique commit IDs — ancestor set is being "
963 "recomputed per branch instead of once"
964 )
965
966
967 # ──────────────────────────────────────────────────────────────────────────────
968 # Prune-config — remove stale [branch.*] entries from config.toml
969 # ──────────────────────────────────────────────────────────────────────────────
970
971
972 class TestPruneConfig:
973 """muse branch --prune-config removes config entries for nonexistent branches."""
974
975 def test_prune_config_removes_stale_entries(self, repo: pathlib.Path) -> None:
976 """Stale [branch.*] sections are deleted; live branches are kept."""
977 from muse.cli.config import write_branch_meta, read_branch_meta
978
979 # Write a stale entry directly — branch ref never created
980 write_branch_meta(repo, "task/stale-1", intent="stale one", resumable=True)
981 write_branch_meta(repo, "task/stale-2", intent="stale two", resumable=True)
982
983 # Create a real branch so it should be kept
984 _branch(repo, "feat/keep-me", "--intent", "keep this", "--resumable")
985
986 result = _branch(repo, "--prune-config", "--json")
987 assert result.exit_code == 0, result.output
988
989 data = json.loads(result.output)
990 assert data["pruned"] == 2, f"expected 2 pruned, got {data}"
991 assert data["kept"] >= 1 # feat/keep-me and main at minimum
992
993 # Stale entries are gone from config
994 assert read_branch_meta(repo, "task/stale-1") == {}
995 assert read_branch_meta(repo, "task/stale-2") == {}
996
997 # Live branch entry still present
998 assert read_branch_meta(repo, "feat/keep-me").get("intent") == "keep this"
999
1000 def test_prune_config_noop_when_all_live(self, repo: pathlib.Path) -> None:
1001 """Pruning a repo with no stale entries reports 0 pruned."""
1002 _branch(repo, "feat/real-branch")
1003 result = _branch(repo, "--prune-config", "--json")
1004 assert result.exit_code == 0
1005 data = json.loads(result.output)
1006 assert data["pruned"] == 0
1007
1008 def test_prune_config_dry_run(self, repo: pathlib.Path) -> None:
1009 """--prune-config --dry-run reports stale entries without removing them."""
1010 from muse.cli.config import write_branch_meta, read_branch_meta
1011
1012 write_branch_meta(repo, "task/ghost", intent="ghost", resumable=True)
1013
1014 result = _branch(repo, "--prune-config", "--dry-run", "--json")
1015 assert result.exit_code == 0
1016 data = json.loads(result.output)
1017 assert data["pruned"] == 1
1018 assert data["dry_run"] is True
1019
1020 # Entry still present — dry run made no changes
1021 assert read_branch_meta(repo, "task/ghost").get("intent") == "ghost"
File History 2 commits
sha256:99f8eb388d9a9c353e68b9a4e5bebe1b4240a8f511e6f0928e58c0e95153e103 feat: branch --prune-config, fix hub repo delete docstrings… Sonnet 4.6 minor 4 days ago