gabriel / muse public
test_cmd_branch.py python
963 lines 41.1 KB
Raw
sha256:e237dc0e8122609f5131d11c9dda9bba480395a5a4355cda0c9fa7e634fddd29 fix(branch): guard -d --dry-run against destructive writes;… Sonnet 4.6 patch 8 days ago
1 """Tests for ``muse branch``.
2
3 Coverage tiers
4 --------------
5 Unit — parser flags, dead-code removal, helpers (_resolve_start_point,
6 _list_local_branches, _list_remotes, _upstream_for,
7 _commit_ancestors, _is_merged, _contains_commit,
8 _cleanup_empty_dirs).
9 Integration — create, delete, force-delete, rename, force-rename, copy,
10 force-copy, listing, filtering, sorting.
11 End-to-end — full CLI invocations: text and JSON output, all operations.
12 Security — ANSI injection in branch names, format flags, messages.
13 Stress — 500 branches, concurrent list, deep ancestry chains.
14 """
15
16 from __future__ import annotations
17
18 import json
19 import os
20 import pathlib
21 import subprocess
22 import threading
23 import time
24 from typing import TYPE_CHECKING
25
26 import pytest
27
28 from tests.cli_test_helper import CliRunner, InvokeResult
29 from muse.core.refs import (
30 get_head_commit_id,
31 read_current_branch,
32 )
33 from muse.core.types import short_id
34 from muse.core.paths import heads_dir, logs_dir
35
36 if TYPE_CHECKING:
37 import argparse
38
39 runner = CliRunner()
40
41 # ──────────────────────────────────────────────────────────────────────────────
42 # Helpers
43 # ──────────────────────────────────────────────────────────────────────────────
44
45
46 def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult:
47 saved = os.getcwd()
48 try:
49 os.chdir(repo)
50 return runner.invoke(None, args)
51 finally:
52 os.chdir(saved)
53
54
55 def _branch(repo: pathlib.Path, *extra: str) -> InvokeResult:
56 return _invoke(repo, ["branch", *extra])
57
58
59 def _commit(repo: pathlib.Path, *extra: str) -> InvokeResult:
60 return _invoke(repo, ["commit", *extra])
61
62
63 @pytest.fixture()
64 def repo(tmp_path: pathlib.Path) -> pathlib.Path:
65 """Initialised repo with one commit on ``main``."""
66 saved = os.getcwd()
67 try:
68 os.chdir(tmp_path)
69 runner.invoke(None, ["init"])
70 finally:
71 os.chdir(saved)
72 (tmp_path / "a.py").write_text("x = 1\n")
73 _commit(tmp_path, "-m", "initial")
74 return tmp_path
75
76
77 @pytest.fixture()
78 def two_commit_repo(repo: pathlib.Path) -> pathlib.Path:
79 """Repo with two commits on ``main``."""
80 (repo / "b.py").write_text("y = 2\n")
81 _commit(repo, "-m", "second")
82 return repo
83
84
85 # ──────────────────────────────────────────────────────────────────────────────
86 # Unit — parser flags
87 # ──────────────────────────────────────────────────────────────────────────────
88
89
90 class TestRegisterFlags:
91 def _parse(self, *args: str) -> "argparse.Namespace":
92 import argparse
93
94 from muse.cli.commands.branch import register
95
96 p = argparse.ArgumentParser()
97 sub = p.add_subparsers()
98 register(sub)
99 return p.parse_args(["branch", *args])
100
101 def test_default_json_out_is_false(self) -> None:
102 ns = self._parse()
103 assert ns.json_out is False
104
105 def test_json_flag_sets_json_out(self) -> None:
106 ns = self._parse("--json")
107 assert ns.json_out is True
108
109 def test_j_shorthand_sets_json_out(self) -> None:
110 ns = self._parse("-j")
111 assert ns.json_out is True
112
113 def test_delete_flag(self) -> None:
114 ns = self._parse("-d", "foo")
115 assert ns.op == "delete"
116
117 def test_force_delete_flag(self) -> None:
118 ns = self._parse("-D", "foo")
119 assert ns.op == "force_delete"
120
121 def test_rename_flag(self) -> None:
122 ns = self._parse("-m", "new")
123 assert ns.op == "rename"
124
125 def test_force_rename_flag(self) -> None:
126 ns = self._parse("-M", "new")
127 assert ns.op == "force_rename"
128
129 def test_copy_flag(self) -> None:
130 ns = self._parse("-c", "copy")
131 assert ns.op == "copy"
132
133 def test_force_copy_flag(self) -> None:
134 ns = self._parse("-C", "copy")
135 assert ns.op == "force_copy"
136
137 def test_verbose_default_0(self) -> None:
138 ns = self._parse()
139 assert ns.verbose == 0
140
141 def test_verbose_v_is_1(self) -> None:
142 ns = self._parse("-v")
143 assert ns.verbose == 1
144
145 def test_verbose_vv_is_2(self) -> None:
146 ns = self._parse("-vv")
147 assert ns.verbose == 2
148
149 def test_remotes_flag(self) -> None:
150 ns = self._parse("-r")
151 assert ns.remotes is True
152
153 def test_all_flag(self) -> None:
154 ns = self._parse("-a")
155 assert ns.all_branches is True
156
157 def test_sort_default_name(self) -> None:
158 ns = self._parse()
159 assert ns.sort == "name"
160
161 def test_sort_committeddate(self) -> None:
162 ns = self._parse("--sort", "committeddate")
163 assert ns.sort == "committeddate"
164
165 def test_sort_invalid_rejected(self) -> None:
166 import argparse
167
168 from muse.cli.commands.branch import register
169
170 p = argparse.ArgumentParser()
171 sub = p.add_subparsers()
172 register(sub)
173 with pytest.raises(SystemExit):
174 p.parse_args(["branch", "--sort", "invalid"])
175
176
177 # ──────────────────────────────────────────────────────────────────────────────
178 # Unit — dead-code removal
179 # ──────────────────────────────────────────────────────────────────────────────
180
181
182 class TestDeadCodeRemoved:
183 def test_op_list_branch_removed(self) -> None:
184 import inspect
185
186 import muse.cli.commands.branch as m
187
188 src = inspect.getsource(m.run)
189 assert 'op == "list"' not in src, (
190 'op == "list" was a dead branch (nothing in register() creates it); must be deleted'
191 )
192
193 def test_inline_tomllib_import_removed(self) -> None:
194 import inspect
195
196 import muse.cli.commands.branch as m
197
198 src = inspect.getsource(m._upstream_for)
199 assert "import tomllib" not in src, (
200 "inline 'import tomllib' inside _upstream_for should be a module-level import"
201 )
202
203 def test_double_sanitize_removed(self) -> None:
204 """The verbose listing previously double-sanitized name_str (stripping ANSI)."""
205 import inspect
206
207 import muse.cli.commands.branch as m
208
209 src = inspect.getsource(m.run)
210 assert "sanitize_display(name_str)" not in src, (
211 "name_str was double-sanitized; the second call stripped ANSI from current branch"
212 )
213
214
215 # ──────────────────────────────────────────────────────────────────────────────
216 # Unit — _resolve_start_point
217 # ──────────────────────────────────────────────────────────────────────────────
218
219
220 class TestResolveStartPoint:
221 def test_resolves_branch_name(self, repo: pathlib.Path) -> None:
222 from muse.cli.commands.branch import _resolve_start_point
223
224 cid = get_head_commit_id(repo, "main")
225 result = _resolve_start_point(repo, "main", "main")
226 assert result == cid
227
228 def test_resolves_full_sha(self, repo: pathlib.Path) -> None:
229 from muse.cli.commands.branch import _resolve_start_point
230
231 cid = get_head_commit_id(repo, "main")
232 assert cid is not None
233 result = _resolve_start_point(repo, "main", cid)
234 assert result == cid
235
236 def test_resolves_partial_sha(self, two_commit_repo: pathlib.Path) -> None:
237 from muse.cli.commands.branch import _resolve_start_point
238 from muse.core.refs import read_current_branch
239 from muse.core.commits import get_commits_for_branch
240
241 repo = two_commit_repo
242 branch = read_current_branch(repo)
243 commits = get_commits_for_branch(repo, branch)
244 first_sha = commits[-1].commit_id # oldest commit
245 # 12-char prefix should resolve
246 result = _resolve_start_point(repo, "main", short_id(first_sha))
247 assert result == first_sha
248
249 def test_returns_input_for_unresolvable(self, repo: pathlib.Path) -> None:
250 from muse.cli.commands.branch import _resolve_start_point
251
252 result = _resolve_start_point(repo, "main", "nonexistent-ref")
253 assert result == "nonexistent-ref"
254
255
256 # ──────────────────────────────────────────────────────────────────────────────
257 # Unit — _list_local_branches
258 # ──────────────────────────────────────────────────────────────────────────────
259
260
261 class TestListLocalBranches:
262 def test_returns_sorted_list(self, repo: pathlib.Path) -> None:
263 from muse.cli.commands.branch import _list_local_branches
264
265 _branch(repo, "z-last")
266 _branch(repo, "a-first")
267 branches = _list_local_branches(repo)
268 assert branches == sorted(branches)
269
270 def test_skips_hidden_files(self, repo: pathlib.Path) -> None:
271 from muse.cli.commands.branch import _list_local_branches
272
273 # Plant a hidden lock file inside refs/heads/
274 lock = heads_dir(repo) / ".lock"
275 lock.write_text("locked")
276 branches = _list_local_branches(repo)
277 assert ".lock" not in branches
278 assert not any(b.startswith(".") for b in branches)
279
280 def test_empty_repo_returns_empty(self, tmp_path: pathlib.Path) -> None:
281 from muse.cli.commands.branch import _list_local_branches
282
283 assert _list_local_branches(tmp_path) == []
284
285 def test_includes_nested_branches(self, repo: pathlib.Path) -> None:
286 from muse.cli.commands.branch import _list_local_branches
287
288 _branch(repo, "feat/sub/task")
289 branches = _list_local_branches(repo)
290 assert "feat/sub/task" in branches
291
292
293 # ──────────────────────────────────────────────────────────────────────────────
294 # Unit — _commit_ancestors, _is_merged, _contains_commit
295 # ──────────────────────────────────────────────────────────────────────────────
296
297
298 class TestCommitGraph:
299 def test_commit_ancestors_includes_self(self, repo: pathlib.Path) -> None:
300 from muse.cli.commands.branch import _commit_ancestors
301
302 cid = get_head_commit_id(repo, "main")
303 assert cid is not None
304 ancestors = _commit_ancestors(repo, cid)
305 assert cid in ancestors
306
307 def test_is_merged_true_for_same_branch(self, repo: pathlib.Path) -> None:
308 from muse.cli.commands.branch import _is_merged
309
310 assert _is_merged(repo, "main", "main")
311
312 def test_is_merged_false_for_unmerged(self, repo: pathlib.Path) -> None:
313 from muse.cli.commands.branch import _is_merged
314
315 _branch(repo, "feat")
316 _invoke(repo, ["checkout", "feat"])
317 (repo / "c.py").write_text("c=1\n")
318 _commit(repo, "-m", "feat commit")
319 _invoke(repo, ["checkout", "main"])
320 assert not _is_merged(repo, "feat", "main")
321
322 def test_contains_commit_true(self, repo: pathlib.Path) -> None:
323 from muse.cli.commands.branch import _contains_commit
324
325 cid = get_head_commit_id(repo, "main")
326 assert cid is not None
327 assert _contains_commit(repo, "main", cid)
328
329 def test_contains_commit_false_for_unknown(self, repo: pathlib.Path) -> None:
330 from muse.cli.commands.branch import _contains_commit
331
332 assert not _contains_commit(repo, "main", "a" * 64)
333
334
335 # ──────────────────────────────────────────────────────────────────────────────
336 # Integration — CREATE
337 # ──────────────────────────────────────────────────────────────────────────────
338
339
340 class TestCreate:
341 def test_create_basic_exits_0(self, repo: pathlib.Path) -> None:
342 result = _branch(repo, "new-branch")
343 assert result.exit_code == 0
344
345 def test_create_text_output(self, repo: pathlib.Path) -> None:
346 result = _branch(repo, "my-branch")
347 assert "my-branch" in result.output
348
349 def test_create_json_schema(self, repo: pathlib.Path) -> None:
350 result = _branch(repo, "json-branch", "--json")
351 data = json.loads(result.output)
352 assert data["action"] == "created"
353 assert data["branch"] == "json-branch"
354 assert "commit_id" in data
355 assert "from" in data
356
357 def test_create_json_from_is_none_at_head(self, repo: pathlib.Path) -> None:
358 result = _branch(repo, "from-head", "--json")
359 data = json.loads(result.output)
360 assert data["from"] is None
361
362 def test_create_at_full_sha(self, two_commit_repo: pathlib.Path) -> None:
363 repo = two_commit_repo
364 from muse.core.refs import read_current_branch
365 from muse.core.commits import get_commits_for_branch
366
367 branch = read_current_branch(repo)
368 commits = get_commits_for_branch(repo, branch)
369 first_sha = commits[-1].commit_id
370
371 result = _branch(repo, "at-sha", first_sha)
372 assert result.exit_code == 0
373 tip = get_head_commit_id(repo, "at-sha")
374 assert tip == first_sha
375
376 def test_create_at_partial_sha(self, two_commit_repo: pathlib.Path) -> None:
377 repo = two_commit_repo
378 from muse.core.refs import read_current_branch
379 from muse.core.commits import get_commits_for_branch
380
381 branch = read_current_branch(repo)
382 commits = get_commits_for_branch(repo, branch)
383 first_sha = commits[-1].commit_id
384
385 result = _branch(repo, "at-partial", short_id(first_sha))
386 assert result.exit_code == 0
387 tip = get_head_commit_id(repo, "at-partial")
388 assert tip == first_sha
389
390 def test_create_at_branch_name(self, repo: pathlib.Path) -> None:
391 head_cid = get_head_commit_id(repo, "main")
392 result = _branch(repo, "copy-of-main", "main")
393 assert result.exit_code == 0
394 tip = get_head_commit_id(repo, "copy-of-main")
395 assert tip == head_cid
396
397 def test_create_json_from_field_populated(self, repo: pathlib.Path) -> None:
398 result = _branch(repo, "with-from", "main", "--json")
399 data = json.loads(result.output)
400 assert data["from"] == "main"
401
402 def test_create_duplicate_exits_1(self, repo: pathlib.Path) -> None:
403 _branch(repo, "dup")
404 result = _branch(repo, "dup")
405 assert result.exit_code == 1
406
407 def test_create_invalid_name_exits_1(self, repo: pathlib.Path) -> None:
408 result = _branch(repo, "bad..name")
409 assert result.exit_code == 1
410
411 def test_create_does_not_checkout(self, repo: pathlib.Path) -> None:
412 _branch(repo, "new-but-no-switch")
413 assert read_current_branch(repo) == "main"
414
415
416 # ──────────────────────────────────────────────────────────────────────────────
417 # Integration — DELETE
418 # ──────────────────────────────────────────────────────────────────────────────
419
420
421 class TestDelete:
422 def test_delete_merged_branch_exits_0(self, repo: pathlib.Path) -> None:
423 _branch(repo, "to-delete")
424 # Branch points to same commit as main → considered merged
425 result = _branch(repo, "-d", "to-delete")
426 assert result.exit_code == 0
427
428 def test_delete_json_schema(self, repo: pathlib.Path) -> None:
429 _branch(repo, "del-json")
430 result = _branch(repo, "-d", "del-json", "--json")
431 data = json.loads(result.output)
432 assert data["action"] == "deleted"
433 assert data["branch"] == "del-json"
434 assert "was" in data
435
436 def test_delete_unmerged_exits_1_without_force(self, repo: pathlib.Path) -> None:
437 _branch(repo, "unmerged")
438 _invoke(repo, ["checkout", "unmerged"])
439 (repo / "z.py").write_text("z=1\n")
440 _commit(repo, "-m", "unmerged work")
441 _invoke(repo, ["checkout", "main"])
442 result = _branch(repo, "-d", "unmerged")
443 assert result.exit_code == 1
444
445 def test_force_delete_unmerged_exits_0(self, repo: pathlib.Path) -> None:
446 _branch(repo, "force-del")
447 _invoke(repo, ["checkout", "force-del"])
448 (repo / "x.py").write_text("x=1\n")
449 _commit(repo, "-m", "exclusive work")
450 _invoke(repo, ["checkout", "main"])
451 result = _branch(repo, "-D", "force-del")
452 assert result.exit_code == 0
453
454 def test_delete_current_branch_exits_1(self, repo: pathlib.Path) -> None:
455 result = _branch(repo, "-d", "main")
456 assert result.exit_code == 1
457
458 def test_delete_nonexistent_exits_1(self, repo: pathlib.Path) -> None:
459 result = _branch(repo, "-d", "ghost")
460 assert result.exit_code == 1
461
462 def test_delete_removes_branch_from_list(self, repo: pathlib.Path) -> None:
463 _branch(repo, "temp")
464 _branch(repo, "-d", "temp")
465 result = _branch(repo, "--json")
466 names = [b["name"] for b in json.loads(result.output)]
467 assert "temp" not in names
468
469 def test_delete_nested_branch_cleans_empty_dirs(self, repo: pathlib.Path) -> None:
470 _branch(repo, "feat/sub/task")
471 _branch(repo, "-D", "feat/sub/task")
472 # The feat/ and feat/sub/ dirs should be gone
473 feat_dir = heads_dir(repo) / "feat"
474 assert not feat_dir.exists()
475
476 def test_delete_removes_reflog_file(self, repo: pathlib.Path) -> None:
477 """Deleting a branch removes its reflog file — git-idiomatic behaviour."""
478 _invoke(repo, ["checkout", "-b", "bye"]) # checkout writes the reflog
479 _invoke(repo, ["checkout", "main"])
480 reflog = logs_dir(repo) / "refs" / "heads" / "bye"
481 assert reflog.exists(), "reflog should exist after checkout -b"
482 _branch(repo, "-d", "bye")
483 assert not reflog.exists(), "reflog must be deleted when branch is deleted"
484
485 def test_delete_nested_branch_removes_reflog_and_empty_dirs(
486 self, repo: pathlib.Path
487 ) -> None:
488 """Nested branch deletion removes reflog file and its empty parent dirs."""
489 _invoke(repo, ["checkout", "-b", "feat/ui/button"])
490 _invoke(repo, ["checkout", "main"])
491 reflog = logs_dir(repo) / "refs" / "heads" / "feat" / "ui" / "button"
492 assert reflog.exists(), "reflog should exist after checkout -b"
493 _branch(repo, "-D", "feat/ui/button")
494 assert not reflog.exists()
495 log_feat_dir = logs_dir(repo) / "refs" / "heads" / "feat"
496 assert not log_feat_dir.exists(), "empty reflog parent dirs must be cleaned up"
497
498 def test_force_delete_also_removes_reflog(self, repo: pathlib.Path) -> None:
499 """-D (force delete) removes the reflog just like -d."""
500 _invoke(repo, ["checkout", "-b", "force-log"])
501 (repo / "tmp.py").write_text("x=1\n")
502 _commit(repo, "-m", "unmerged")
503 _invoke(repo, ["checkout", "main"])
504 reflog = logs_dir(repo) / "refs" / "heads" / "force-log"
505 assert reflog.exists()
506 _branch(repo, "-D", "force-log")
507 assert not reflog.exists()
508
509
510 # ──────────────────────────────────────────────────────────────────────────────
511 # Integration — CREATE REFLOG
512 # ──────────────────────────────────────────────────────────────────────────────
513
514
515 class TestCreateReflog:
516 def test_branch_create_writes_reflog(self, repo: pathlib.Path) -> None:
517 """muse branch -b writes a reflog entry — git-idiomatic behaviour."""
518 _branch(repo, "feat/new")
519 reflog = logs_dir(repo) / "refs" / "heads" / "feat" / "new"
520 assert reflog.exists(), "reflog must exist after muse branch -b"
521
522 def test_branch_create_reflog_contains_branch_created(self, repo: pathlib.Path) -> None:
523 """Reflog entry records a 'branch: Created' operation."""
524 _branch(repo, "task/thing")
525 reflog = logs_dir(repo) / "refs" / "heads" / "task" / "thing"
526 content = reflog.read_text(encoding="utf-8")
527 assert "branch: Created" in content
528
529 def test_branch_create_reflog_records_start_point(self, repo: pathlib.Path) -> None:
530 """Reflog entry for a branch created from another branch names that source."""
531 _branch(repo, "task/from-main", "main")
532 reflog = logs_dir(repo) / "refs" / "heads" / "task" / "from-main"
533 content = reflog.read_text(encoding="utf-8")
534 assert "main" in content
535
536
537 # ──────────────────────────────────────────────────────────────────────────────
538 # Integration — RENAME
539 # ──────────────────────────────────────────────────────────────────────────────
540
541
542 class TestRename:
543 def test_rename_basic(self, repo: pathlib.Path) -> None:
544 _branch(repo, "old-name")
545 result = _branch(repo, "-m", "old-name", "new-name")
546 assert result.exit_code == 0
547 names = [b["name"] for b in json.loads(_branch(repo, "--json").output)]
548 assert "new-name" in names
549 assert "old-name" not in names
550
551 def test_rename_omit_old_uses_current(self, repo: pathlib.Path) -> None:
552 _branch(repo, "temp")
553 _invoke(repo, ["checkout", "temp"])
554 result = _branch(repo, "-m", "renamed")
555 assert result.exit_code == 0
556 assert read_current_branch(repo) == "renamed"
557 _invoke(repo, ["checkout", "main"])
558
559 def test_rename_json_schema(self, repo: pathlib.Path) -> None:
560 _branch(repo, "src")
561 result = _branch(repo, "-m", "src", "dst", "--json")
562 data = json.loads(result.output)
563 assert data["action"] == "renamed"
564 assert data["from"] == "src"
565 assert data["to"] == "dst"
566
567 def test_rename_to_existing_exits_1(self, repo: pathlib.Path) -> None:
568 _branch(repo, "a")
569 _branch(repo, "b")
570 result = _branch(repo, "-m", "a", "b")
571 assert result.exit_code == 1
572
573 def test_force_rename_to_existing_exits_0(self, repo: pathlib.Path) -> None:
574 _branch(repo, "a")
575 _branch(repo, "b")
576 result = _branch(repo, "-M", "a", "b")
577 assert result.exit_code == 0
578
579 def test_rename_updates_head_when_current(self, repo: pathlib.Path) -> None:
580 _branch(repo, "temp2")
581 _invoke(repo, ["checkout", "temp2"])
582 _branch(repo, "-m", "temp2", "newname")
583 assert read_current_branch(repo) == "newname"
584 _invoke(repo, ["checkout", "main"])
585
586 def test_rename_nonexistent_exits_1(self, repo: pathlib.Path) -> None:
587 result = _branch(repo, "-m", "ghost", "newname")
588 assert result.exit_code == 1
589
590
591 # ──────────────────────────────────────────────────────────────────────────────
592 # Integration — COPY
593 # ──────────────────────────────────────────────────────────────────────────────
594
595
596 class TestCopy:
597 def test_copy_basic(self, repo: pathlib.Path) -> None:
598 _branch(repo, "orig")
599 result = _branch(repo, "-c", "orig", "clone")
600 assert result.exit_code == 0
601 names = [b["name"] for b in json.loads(_branch(repo, "--json").output)]
602 assert "orig" in names
603 assert "clone" in names
604
605 def test_copy_same_tip(self, repo: pathlib.Path) -> None:
606 _branch(repo, "src")
607 _branch(repo, "-c", "src", "dst")
608 tip_src = get_head_commit_id(repo, "src")
609 tip_dst = get_head_commit_id(repo, "dst")
610 assert tip_src == tip_dst
611
612 def test_copy_json_schema(self, repo: pathlib.Path) -> None:
613 _branch(repo, "original")
614 result = _branch(repo, "-c", "original", "copy1", "--json")
615 data = json.loads(result.output)
616 assert data["action"] == "copied"
617 assert data["from"] == "original"
618 assert data["to"] == "copy1"
619
620 def test_copy_to_existing_exits_1(self, repo: pathlib.Path) -> None:
621 _branch(repo, "x")
622 _branch(repo, "y")
623 result = _branch(repo, "-c", "x", "y")
624 assert result.exit_code == 1
625
626 def test_force_copy_to_existing_exits_0(self, repo: pathlib.Path) -> None:
627 _branch(repo, "p")
628 _branch(repo, "q")
629 result = _branch(repo, "-C", "p", "q")
630 assert result.exit_code == 0
631
632 def test_copy_omit_src_uses_current(self, repo: pathlib.Path) -> None:
633 head = get_head_commit_id(repo, "main")
634 result = _branch(repo, "-c", "main-copy")
635 assert result.exit_code == 0
636 tip = get_head_commit_id(repo, "main-copy")
637 assert tip == head
638
639
640 # ──────────────────────────────────────────────────────────────────────────────
641 # Integration — LIST
642 # ──────────────────────────────────────────────────────────────────────────────
643
644
645 class TestList:
646 def test_list_text_exits_0(self, repo: pathlib.Path) -> None:
647 result = _branch(repo)
648 assert result.exit_code == 0
649
650 def test_list_contains_main(self, repo: pathlib.Path) -> None:
651 result = _branch(repo)
652 assert "main" in result.output
653
654 def test_list_marks_current_branch(self, repo: pathlib.Path) -> None:
655 result = _branch(repo)
656 # Current branch line must start with "* "
657 current_lines = [l for l in result.output.splitlines() if l.startswith("* ")]
658 assert len(current_lines) == 1
659 assert "main" in current_lines[0]
660
661 def test_list_json_schema(self, repo: pathlib.Path) -> None:
662 result = _branch(repo, "--json")
663 data = json.loads(result.output)
664 assert isinstance(data, list)
665 assert len(data) >= 1
666 keys = set(data[0].keys())
667 assert {"name", "current", "commit_id", "last_message", "upstream"} <= keys
668
669 def test_list_json_current_flag(self, repo: pathlib.Path) -> None:
670 result = _branch(repo, "--json")
671 data = json.loads(result.output)
672 current = [b for b in data if b["current"]]
673 assert len(current) == 1
674 assert current[0]["name"] == "main"
675
676 def test_list_json_last_message_populated(self, repo: pathlib.Path) -> None:
677 result = _branch(repo, "--json")
678 data = json.loads(result.output)
679 main_entry = next(b for b in data if b["name"] == "main")
680 assert main_entry["last_message"] is not None
681 assert "initial" in main_entry["last_message"]
682
683 def test_list_json_upstream_null_by_default(self, repo: pathlib.Path) -> None:
684 result = _branch(repo, "--json")
685 data = json.loads(result.output)
686 main_entry = next(b for b in data if b["name"] == "main")
687 assert main_entry["upstream"] is None
688
689 def test_list_verbose_shows_sha(self, repo: pathlib.Path) -> None:
690 result = _branch(repo, "-v")
691 # Short SHA should appear
692 cid = get_head_commit_id(repo, "main")
693 assert cid is not None
694 assert cid[:8] in result.output
695
696 def test_list_verbose_shows_message(self, repo: pathlib.Path) -> None:
697 result = _branch(repo, "-v")
698 assert "initial" in result.output
699
700 def test_list_multiple_branches(self, repo: pathlib.Path) -> None:
701 _branch(repo, "feat/a")
702 _branch(repo, "feat/b")
703 result = _branch(repo, "--json")
704 data = json.loads(result.output)
705 names = [b["name"] for b in data]
706 assert "feat/a" in names
707 assert "feat/b" in names
708
709 def test_list_sorted_by_name(self, repo: pathlib.Path) -> None:
710 _branch(repo, "z-last")
711 _branch(repo, "a-first")
712 result = _branch(repo, "--json")
713 data = json.loads(result.output)
714 names = [b["name"] for b in data]
715 assert names == sorted(names)
716
717 def test_list_sort_committeddate(self, repo: pathlib.Path) -> None:
718 _branch(repo, "feat-x")
719 result = _branch(repo, "--sort", "committeddate", "--json")
720 assert result.exit_code == 0
721 data = json.loads(result.output)
722 assert isinstance(data, list)
723
724
725 # ──────────────────────────────────────────────────────────────────────────────
726 # Integration — FILTERS
727 # ──────────────────────────────────────────────────────────────────────────────
728
729
730 class TestFilters:
731 def test_merged_filter_includes_self(self, repo: pathlib.Path) -> None:
732 result = _branch(repo, "--merged", "--json")
733 data = json.loads(result.output)
734 names = [b["name"] for b in data]
735 assert "main" in names
736
737 def test_merged_filter_excludes_unmerged(self, repo: pathlib.Path) -> None:
738 _branch(repo, "unmerged-feat")
739 _invoke(repo, ["checkout", "unmerged-feat"])
740 (repo / "u.py").write_text("u=1\n")
741 _commit(repo, "-m", "unmerged")
742 _invoke(repo, ["checkout", "main"])
743 result = _branch(repo, "--merged", "--json")
744 data = json.loads(result.output)
745 names = [b["name"] for b in data]
746 assert "unmerged-feat" not in names
747
748 def test_no_merged_filter_includes_unmerged(self, repo: pathlib.Path) -> None:
749 _branch(repo, "exclusive-feat")
750 _invoke(repo, ["checkout", "exclusive-feat"])
751 (repo / "e.py").write_text("e=1\n")
752 _commit(repo, "-m", "exclusive")
753 _invoke(repo, ["checkout", "main"])
754 result = _branch(repo, "--no-merged", "--json")
755 data = json.loads(result.output)
756 names = [b["name"] for b in data]
757 assert "exclusive-feat" in names
758
759 def test_no_merged_filter_excludes_self(self, repo: pathlib.Path) -> None:
760 result = _branch(repo, "--no-merged", "--json")
761 data = json.loads(result.output)
762 names = [b["name"] for b in data]
763 assert "main" not in names
764
765 def test_contains_commit_filter(self, repo: pathlib.Path) -> None:
766 cid = get_head_commit_id(repo, "main")
767 assert cid is not None
768 result = _branch(repo, "--contains", cid, "--json")
769 data = json.loads(result.output)
770 names = [b["name"] for b in data]
771 assert "main" in names
772
773 def test_contains_unknown_commit_empty(self, repo: pathlib.Path) -> None:
774 result = _branch(repo, "--contains", "a" * 64, "--json")
775 data = json.loads(result.output)
776 assert data == []
777
778
779 # ──────────────────────────────────────────────────────────────────────────────
780 # Integration — validation
781 # ──────────────────────────────────────────────────────────────────────────────
782
783
784 class TestValidation:
785 def test_ansi_in_pattern_arg_sanitized(self, repo: pathlib.Path) -> None:
786 result = _branch(repo, "--pattern", "\x1b[31mxml\x1b[0m")
787 assert "\x1b" not in result.output
788
789 def test_delete_without_name_exits_1(self, repo: pathlib.Path) -> None:
790 result = _branch(repo, "-d")
791 assert result.exit_code == 1
792
793 def test_rename_too_many_args_exits_1(self, repo: pathlib.Path) -> None:
794 result = _branch(repo, "-m", "a", "b", "c")
795 assert result.exit_code == 1
796
797 def test_copy_too_many_args_exits_1(self, repo: pathlib.Path) -> None:
798 result = _branch(repo, "-c", "a", "b", "c")
799 assert result.exit_code == 1
800
801
802 # ──────────────────────────────────────────────────────────────────────────────
803 # Security — ANSI injection
804 # ──────────────────────────────────────────────────────────────────────────────
805
806
807 class TestSecurityAnsi:
808 def _has_ansi(self, s: str) -> bool:
809 return "\x1b[" in s
810
811 def test_ansi_in_branch_name_rejected(self, repo: pathlib.Path) -> None:
812 result = _branch(repo, "\x1b[31mmalicious\x1b[0m")
813 assert result.exit_code == 1
814 assert not self._has_ansi(result.output)
815
816 def test_ansi_in_delete_name_rejected(self, repo: pathlib.Path) -> None:
817 result = _branch(repo, "-d", "\x1b[31mmalicious\x1b[0m")
818 assert result.exit_code == 1
819 assert not self._has_ansi(result.output)
820
821 def test_ansi_in_rename_new_name_rejected(self, repo: pathlib.Path) -> None:
822 result = _branch(repo, "-m", "\x1b[31mnew\x1b[0m")
823 assert result.exit_code == 1
824 assert not self._has_ansi(result.output)
825
826 def test_ansi_in_contains_arg_sanitized(self, repo: pathlib.Path) -> None:
827 result = _branch(repo, "--contains", "\x1b[31mxml\x1b[0m")
828 assert not self._has_ansi(result.output)
829
830 def test_ansi_in_contains_commit_id(self, repo: pathlib.Path) -> None:
831 result = _branch(repo, "--contains", "\x1b[31mmalicious\x1b[0m")
832 # Should exit 0 (no match, empty list) or exit 0 with empty list
833 # Either way, ANSI must not appear in output
834 assert not self._has_ansi(result.output)
835
836 def test_errors_go_to_stderr(self, repo: pathlib.Path) -> None:
837 result = _branch(repo, "-d", "nonexistent")
838 assert result.exit_code == 1
839 # Error should NOT appear in stdout
840 assert "not found" not in result.output.lower() or (result.stderr and "not found" in result.stderr.lower())
841
842
843 # ──────────────────────────────────────────────────────────────────────────────
844 # Stress
845 # ──────────────────────────────────────────────────────────────────────────────
846
847
848 @pytest.mark.slow
849 class TestStress:
850 def test_list_500_branches_fast(self, repo: pathlib.Path) -> None:
851 """Listing 500 branches must complete in under 2 seconds."""
852 for i in range(500):
853 _branch(repo, f"feat/task-{i:04d}")
854 t0 = time.perf_counter()
855 result = _branch(repo, "--json")
856 elapsed = (time.perf_counter() - t0) * 1000
857 data = json.loads(result.output)
858 assert len(data) == 501 # main + 500
859 assert elapsed < 2000, f"list 500 branches took {elapsed:.0f}ms (limit 2000ms)"
860
861 def test_merged_filter_100_branches(self, repo: pathlib.Path) -> None:
862 """--merged filter on 100 branches completes in reasonable time."""
863 for i in range(100):
864 _branch(repo, f"task-{i:03d}")
865 t0 = time.perf_counter()
866 result = _branch(repo, "--merged", "--json")
867 elapsed = (time.perf_counter() - t0) * 1000
868 data = json.loads(result.output)
869 # All branches share the same commit as main → all merged
870 assert len(data) == 101
871 assert elapsed < 3000, f"--merged on 100 branches took {elapsed:.0f}ms"
872
873 def test_sort_committeddate_100_branches(self, repo: pathlib.Path) -> None:
874 for i in range(100):
875 _branch(repo, f"sort-{i:03d}")
876 result = _branch(repo, "--sort", "committeddate", "--json")
877 assert result.exit_code == 0
878 data = json.loads(result.output)
879 assert len(data) == 101
880
881 def test_concurrent_branch_list_separate_repos(self, tmp_path: pathlib.Path) -> None:
882 errors: list[str] = []
883
884 def do_branch(idx: int) -> None:
885 repo_dir = tmp_path / f"repo_{idx}"
886 repo_dir.mkdir()
887 subprocess.run(["muse", "init"], cwd=str(repo_dir), capture_output=True)
888 (repo_dir / "x.py").write_text(f"x={idx}\n")
889 subprocess.run(
890 ["muse", "commit", "-m", f"c{idx}"],
891 cwd=str(repo_dir), capture_output=True,
892 )
893 for j in range(5):
894 subprocess.run(
895 ["muse", "branch", f"b{j}"],
896 cwd=str(repo_dir), capture_output=True,
897 )
898 r = subprocess.run(
899 ["muse", "branch", "--json"],
900 cwd=str(repo_dir), capture_output=True, text=True,
901 )
902 if r.returncode != 0:
903 errors.append(f"repo_{idx}: branch --json failed")
904 return
905 data = json.loads(r.stdout)
906 if len(data) != 6: # main + 5
907 errors.append(f"repo_{idx}: expected 6 branches, got {len(data)}")
908
909 threads = [threading.Thread(target=do_branch, args=(i,)) for i in range(6)]
910 for t in threads:
911 t.start()
912 for t in threads:
913 t.join()
914 assert not errors, f"Concurrent branch errors:\n{'\n'.join(errors)}"
915
916 def test_deep_ancestor_chain_is_merged(self, repo: pathlib.Path) -> None:
917 """A branch with 50 ancestors is correctly detected as merged."""
918 _branch(repo, "long-chain")
919 _invoke(repo, ["checkout", "long-chain"])
920 for i in range(50):
921 (repo / f"step_{i:03d}.py").write_text(f"s={i}\n")
922 _commit(repo, "-m", f"step {i}")
923 _invoke(repo, ["checkout", "main"])
924 _invoke(repo, ["merge", "long-chain"])
925 result = _branch(repo, "--merged", "--json")
926 data = json.loads(result.output)
927 names = [b["name"] for b in data]
928 assert "long-chain" in names
929
930 def test_merged_filter_ancestor_set_computed_once(
931 self, repo: pathlib.Path
932 ) -> None:
933 """--merged must compute the 'into' ancestor set once, not once per branch.
934
935 With N branches, the naive implementation calls _commit_ancestors N times
936 for the same 'into' tip. The fix pre-computes it once and checks each
937 branch tip against the cached set.
938 """
939 from unittest.mock import patch
940 import muse.cli.commands.branch as branch_module
941
942 for i in range(20):
943 _branch(repo, f"feat-{i:02d}")
944
945 with patch.object(
946 branch_module, "_commit_ancestors", wraps=branch_module._commit_ancestors
947 ) as mock_ca:
948 result = _branch(repo, "--merged", "--json")
949
950 assert result.exit_code == 0
951 data = json.loads(result.output)
952 assert len(data) == 21 # main + 20
953
954 # The ancestor set for 'main' (the into branch) must be computed exactly once,
955 # not once per branch being checked.
956 into_calls = [c for c in mock_ca.call_args_list if c.args[1] != ""]
957 # All calls with the same commit_id (main's tip) should collapse to 1.
958 unique_commit_ids = {c.args[1] for c in mock_ca.call_args_list}
959 assert len(mock_ca.call_args_list) <= len(unique_commit_ids) + 1, (
960 f"_commit_ancestors called {len(mock_ca.call_args_list)}× but only "
961 f"{len(unique_commit_ids)} unique commit IDs — ancestor set is being "
962 "recomputed per branch instead of once"
963 )
File History 3 commits
sha256:e237dc0e8122609f5131d11c9dda9bba480395a5a4355cda0c9fa7e634fddd29 fix(branch): guard -d --dry-run against destructive writes;… Sonnet 4.6 patch 8 days ago
sha256:99f8eb388d9a9c353e68b9a4e5bebe1b4240a8f511e6f0928e58c0e95153e103 feat: branch --prune-config, fix hub repo delete docstrings… Sonnet 4.6 minor 15 days ago