test_cmd_branch.py
python
sha256:e237dc0e8122609f5131d11c9dda9bba480395a5a4355cda0c9fa7e634fddd29
fix(branch): guard -d --dry-run against destructive writes;…
Sonnet 4.6
patch
9 days ago
| 1 | """Tests for ``muse branch``. |
| 2 | |
| 3 | Coverage tiers |
| 4 | -------------- |
| 5 | Unit — parser flags, dead-code removal, helpers (_resolve_start_point, |
| 6 | _list_local_branches, _list_remotes, _upstream_for, |
| 7 | _commit_ancestors, _is_merged, _contains_commit, |
| 8 | _cleanup_empty_dirs). |
| 9 | Integration — create, delete, force-delete, rename, force-rename, copy, |
| 10 | force-copy, listing, filtering, sorting. |
| 11 | End-to-end — full CLI invocations: text and JSON output, all operations. |
| 12 | Security — ANSI injection in branch names, format flags, messages. |
| 13 | Stress — 500 branches, concurrent list, deep ancestry chains. |
| 14 | """ |
| 15 | |
| 16 | from __future__ import annotations |
| 17 | |
| 18 | import json |
| 19 | import os |
| 20 | import pathlib |
| 21 | import subprocess |
| 22 | import threading |
| 23 | import time |
| 24 | from typing import TYPE_CHECKING |
| 25 | |
| 26 | import pytest |
| 27 | |
| 28 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 29 | from muse.core.store import get_head_commit_id, read_current_branch |
| 30 | |
| 31 | if TYPE_CHECKING: |
| 32 | import argparse |
| 33 | |
| 34 | runner = CliRunner() |
| 35 | |
| 36 | # ────────────────────────────────────────────────────────────────────────────── |
| 37 | # Helpers |
| 38 | # ────────────────────────────────────────────────────────────────────────────── |
| 39 | |
| 40 | |
| 41 | def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult: |
| 42 | saved = os.getcwd() |
| 43 | try: |
| 44 | os.chdir(repo) |
| 45 | return runner.invoke(None, args) |
| 46 | finally: |
| 47 | os.chdir(saved) |
| 48 | |
| 49 | |
| 50 | def _branch(repo: pathlib.Path, *extra: str) -> InvokeResult: |
| 51 | return _invoke(repo, ["branch", *extra]) |
| 52 | |
| 53 | |
| 54 | def _commit(repo: pathlib.Path, *extra: str) -> InvokeResult: |
| 55 | return _invoke(repo, ["commit", *extra]) |
| 56 | |
| 57 | |
| 58 | @pytest.fixture() |
| 59 | def repo(tmp_path: pathlib.Path) -> pathlib.Path: |
| 60 | """Initialised repo with one commit on ``main``.""" |
| 61 | saved = os.getcwd() |
| 62 | try: |
| 63 | os.chdir(tmp_path) |
| 64 | runner.invoke(None, ["init"]) |
| 65 | finally: |
| 66 | os.chdir(saved) |
| 67 | (tmp_path / "a.py").write_text("x = 1\n") |
| 68 | _commit(tmp_path, "-m", "initial") |
| 69 | return tmp_path |
| 70 | |
| 71 | |
| 72 | @pytest.fixture() |
| 73 | def two_commit_repo(repo: pathlib.Path) -> pathlib.Path: |
| 74 | """Repo with two commits on ``main``.""" |
| 75 | (repo / "b.py").write_text("y = 2\n") |
| 76 | _commit(repo, "-m", "second") |
| 77 | return repo |
| 78 | |
| 79 | |
| 80 | # ────────────────────────────────────────────────────────────────────────────── |
| 81 | # Unit — parser flags |
| 82 | # ────────────────────────────────────────────────────────────────────────────── |
| 83 | |
| 84 | |
| 85 | class TestRegisterFlags: |
| 86 | def _parse(self, *args: str) -> "argparse.Namespace": |
| 87 | import argparse |
| 88 | |
| 89 | from muse.cli.commands.branch import register |
| 90 | |
| 91 | p = argparse.ArgumentParser() |
| 92 | sub = p.add_subparsers() |
| 93 | register(sub) |
| 94 | return p.parse_args(["branch", *args]) |
| 95 | |
| 96 | def test_default_fmt_is_text(self) -> None: |
| 97 | ns = self._parse() |
| 98 | assert ns.fmt == "text" |
| 99 | |
| 100 | def test_json_flag_sets_fmt(self) -> None: |
| 101 | ns = self._parse("--json") |
| 102 | assert ns.fmt == "json" |
| 103 | |
| 104 | def test_format_json_flag(self) -> None: |
| 105 | ns = self._parse("--format", "json") |
| 106 | assert ns.fmt == "json" |
| 107 | |
| 108 | def test_delete_flag(self) -> None: |
| 109 | ns = self._parse("-d", "foo") |
| 110 | assert ns.op == "delete" |
| 111 | |
| 112 | def test_force_delete_flag(self) -> None: |
| 113 | ns = self._parse("-D", "foo") |
| 114 | assert ns.op == "force_delete" |
| 115 | |
| 116 | def test_rename_flag(self) -> None: |
| 117 | ns = self._parse("-m", "new") |
| 118 | assert ns.op == "rename" |
| 119 | |
| 120 | def test_force_rename_flag(self) -> None: |
| 121 | ns = self._parse("-M", "new") |
| 122 | assert ns.op == "force_rename" |
| 123 | |
| 124 | def test_copy_flag(self) -> None: |
| 125 | ns = self._parse("-c", "copy") |
| 126 | assert ns.op == "copy" |
| 127 | |
| 128 | def test_force_copy_flag(self) -> None: |
| 129 | ns = self._parse("-C", "copy") |
| 130 | assert ns.op == "force_copy" |
| 131 | |
| 132 | def test_verbose_default_0(self) -> None: |
| 133 | ns = self._parse() |
| 134 | assert ns.verbose == 0 |
| 135 | |
| 136 | def test_verbose_v_is_1(self) -> None: |
| 137 | ns = self._parse("-v") |
| 138 | assert ns.verbose == 1 |
| 139 | |
| 140 | def test_verbose_vv_is_2(self) -> None: |
| 141 | ns = self._parse("-vv") |
| 142 | assert ns.verbose == 2 |
| 143 | |
| 144 | def test_remotes_flag(self) -> None: |
| 145 | ns = self._parse("-r") |
| 146 | assert ns.remotes is True |
| 147 | |
| 148 | def test_all_flag(self) -> None: |
| 149 | ns = self._parse("-a") |
| 150 | assert ns.all_branches is True |
| 151 | |
| 152 | def test_sort_default_name(self) -> None: |
| 153 | ns = self._parse() |
| 154 | assert ns.sort == "name" |
| 155 | |
| 156 | def test_sort_committeddate(self) -> None: |
| 157 | ns = self._parse("--sort", "committeddate") |
| 158 | assert ns.sort == "committeddate" |
| 159 | |
| 160 | def test_sort_invalid_rejected(self) -> None: |
| 161 | import argparse |
| 162 | |
| 163 | from muse.cli.commands.branch import register |
| 164 | |
| 165 | p = argparse.ArgumentParser() |
| 166 | sub = p.add_subparsers() |
| 167 | register(sub) |
| 168 | with pytest.raises(SystemExit): |
| 169 | p.parse_args(["branch", "--sort", "invalid"]) |
| 170 | |
| 171 | |
| 172 | # ────────────────────────────────────────────────────────────────────────────── |
| 173 | # Unit — dead-code removal |
| 174 | # ────────────────────────────────────────────────────────────────────────────── |
| 175 | |
| 176 | |
| 177 | class TestDeadCodeRemoved: |
| 178 | def test_op_list_branch_removed(self) -> None: |
| 179 | import inspect |
| 180 | |
| 181 | import muse.cli.commands.branch as m |
| 182 | |
| 183 | src = inspect.getsource(m.run) |
| 184 | assert 'op == "list"' not in src, ( |
| 185 | 'op == "list" was a dead branch (nothing in register() creates it); must be deleted' |
| 186 | ) |
| 187 | |
| 188 | def test_inline_tomllib_import_removed(self) -> None: |
| 189 | import inspect |
| 190 | |
| 191 | import muse.cli.commands.branch as m |
| 192 | |
| 193 | src = inspect.getsource(m._upstream_for) |
| 194 | assert "import tomllib" not in src, ( |
| 195 | "inline 'import tomllib' inside _upstream_for should be a module-level import" |
| 196 | ) |
| 197 | |
| 198 | def test_double_sanitize_removed(self) -> None: |
| 199 | """The verbose listing previously double-sanitized name_str (stripping ANSI).""" |
| 200 | import inspect |
| 201 | |
| 202 | import muse.cli.commands.branch as m |
| 203 | |
| 204 | src = inspect.getsource(m.run) |
| 205 | assert "sanitize_display(name_str)" not in src, ( |
| 206 | "name_str was double-sanitized; the second call stripped ANSI from current branch" |
| 207 | ) |
| 208 | |
| 209 | |
| 210 | # ────────────────────────────────────────────────────────────────────────────── |
| 211 | # Unit — _resolve_start_point |
| 212 | # ────────────────────────────────────────────────────────────────────────────── |
| 213 | |
| 214 | |
| 215 | class TestResolveStartPoint: |
| 216 | def test_resolves_branch_name(self, repo: pathlib.Path) -> None: |
| 217 | from muse.cli.commands.branch import _resolve_start_point |
| 218 | from muse.core.repo import read_repo_id |
| 219 | |
| 220 | repo_id = read_repo_id(repo) |
| 221 | cid = get_head_commit_id(repo, "main") |
| 222 | result = _resolve_start_point(repo, repo_id, "main", "main") |
| 223 | assert result == cid |
| 224 | |
| 225 | def test_resolves_full_sha(self, repo: pathlib.Path) -> None: |
| 226 | from muse.cli.commands.branch import _resolve_start_point |
| 227 | from muse.core.repo import read_repo_id |
| 228 | |
| 229 | repo_id = read_repo_id(repo) |
| 230 | cid = get_head_commit_id(repo, "main") |
| 231 | assert cid is not None |
| 232 | result = _resolve_start_point(repo, repo_id, "main", cid) |
| 233 | assert result == cid |
| 234 | |
| 235 | def test_resolves_partial_sha(self, two_commit_repo: pathlib.Path) -> None: |
| 236 | from muse.cli.commands.branch import _resolve_start_point |
| 237 | from muse.core.repo import read_repo_id |
| 238 | from muse.core.store import get_commits_for_branch, read_current_branch |
| 239 | |
| 240 | repo = two_commit_repo |
| 241 | repo_id = read_repo_id(repo) |
| 242 | branch = read_current_branch(repo) |
| 243 | commits = get_commits_for_branch(repo, repo_id, branch) |
| 244 | first_sha = commits[-1].commit_id # oldest commit |
| 245 | # 12-char prefix should resolve |
| 246 | result = _resolve_start_point(repo, repo_id, "main", first_sha[:12]) |
| 247 | assert result == first_sha |
| 248 | |
| 249 | def test_returns_input_for_unresolvable(self, repo: pathlib.Path) -> None: |
| 250 | from muse.cli.commands.branch import _resolve_start_point |
| 251 | from muse.core.repo import read_repo_id |
| 252 | |
| 253 | repo_id = read_repo_id(repo) |
| 254 | result = _resolve_start_point(repo, repo_id, "main", "nonexistent-ref") |
| 255 | assert result == "nonexistent-ref" |
| 256 | |
| 257 | |
| 258 | # ────────────────────────────────────────────────────────────────────────────── |
| 259 | # Unit — _list_local_branches |
| 260 | # ────────────────────────────────────────────────────────────────────────────── |
| 261 | |
| 262 | |
| 263 | class TestListLocalBranches: |
| 264 | def test_returns_sorted_list(self, repo: pathlib.Path) -> None: |
| 265 | from muse.cli.commands.branch import _list_local_branches |
| 266 | |
| 267 | _branch(repo, "z-last") |
| 268 | _branch(repo, "a-first") |
| 269 | branches = _list_local_branches(repo) |
| 270 | assert branches == sorted(branches) |
| 271 | |
| 272 | def test_skips_hidden_files(self, repo: pathlib.Path) -> None: |
| 273 | from muse.cli.commands.branch import _list_local_branches |
| 274 | |
| 275 | # Plant a hidden lock file inside refs/heads/ |
| 276 | lock = repo / ".muse" / "refs" / "heads" / ".lock" |
| 277 | lock.write_text("locked") |
| 278 | branches = _list_local_branches(repo) |
| 279 | assert ".lock" not in branches |
| 280 | assert not any(b.startswith(".") for b in branches) |
| 281 | |
| 282 | def test_empty_repo_returns_empty(self, tmp_path: pathlib.Path) -> None: |
| 283 | from muse.cli.commands.branch import _list_local_branches |
| 284 | |
| 285 | assert _list_local_branches(tmp_path) == [] |
| 286 | |
| 287 | def test_includes_nested_branches(self, repo: pathlib.Path) -> None: |
| 288 | from muse.cli.commands.branch import _list_local_branches |
| 289 | |
| 290 | _branch(repo, "feat/sub/task") |
| 291 | branches = _list_local_branches(repo) |
| 292 | assert "feat/sub/task" in branches |
| 293 | |
| 294 | |
| 295 | # ────────────────────────────────────────────────────────────────────────────── |
| 296 | # Unit — _commit_ancestors, _is_merged, _contains_commit |
| 297 | # ────────────────────────────────────────────────────────────────────────────── |
| 298 | |
| 299 | |
| 300 | class TestCommitGraph: |
| 301 | def test_commit_ancestors_includes_self(self, repo: pathlib.Path) -> None: |
| 302 | from muse.cli.commands.branch import _commit_ancestors |
| 303 | |
| 304 | cid = get_head_commit_id(repo, "main") |
| 305 | assert cid is not None |
| 306 | ancestors = _commit_ancestors(repo, cid) |
| 307 | assert cid in ancestors |
| 308 | |
| 309 | def test_is_merged_true_for_same_branch(self, repo: pathlib.Path) -> None: |
| 310 | from muse.cli.commands.branch import _is_merged |
| 311 | |
| 312 | assert _is_merged(repo, "main", "main") |
| 313 | |
| 314 | def test_is_merged_false_for_unmerged(self, repo: pathlib.Path) -> None: |
| 315 | from muse.cli.commands.branch import _is_merged |
| 316 | |
| 317 | _branch(repo, "feat") |
| 318 | _invoke(repo, ["checkout", "feat"]) |
| 319 | (repo / "c.py").write_text("c=1\n") |
| 320 | _commit(repo, "-m", "feat commit") |
| 321 | _invoke(repo, ["checkout", "main"]) |
| 322 | assert not _is_merged(repo, "feat", "main") |
| 323 | |
| 324 | def test_contains_commit_true(self, repo: pathlib.Path) -> None: |
| 325 | from muse.cli.commands.branch import _contains_commit |
| 326 | |
| 327 | cid = get_head_commit_id(repo, "main") |
| 328 | assert cid is not None |
| 329 | assert _contains_commit(repo, "main", cid) |
| 330 | |
| 331 | def test_contains_commit_false_for_unknown(self, repo: pathlib.Path) -> None: |
| 332 | from muse.cli.commands.branch import _contains_commit |
| 333 | |
| 334 | assert not _contains_commit(repo, "main", "a" * 64) |
| 335 | |
| 336 | |
| 337 | # ────────────────────────────────────────────────────────────────────────────── |
| 338 | # Integration — CREATE |
| 339 | # ────────────────────────────────────────────────────────────────────────────── |
| 340 | |
| 341 | |
| 342 | class TestCreate: |
| 343 | def test_create_basic_exits_0(self, repo: pathlib.Path) -> None: |
| 344 | result = _branch(repo, "new-branch") |
| 345 | assert result.exit_code == 0 |
| 346 | |
| 347 | def test_create_text_output(self, repo: pathlib.Path) -> None: |
| 348 | result = _branch(repo, "my-branch") |
| 349 | assert "my-branch" in result.output |
| 350 | |
| 351 | def test_create_json_schema(self, repo: pathlib.Path) -> None: |
| 352 | result = _branch(repo, "json-branch", "--json") |
| 353 | data = json.loads(result.output) |
| 354 | assert data["action"] == "created" |
| 355 | assert data["branch"] == "json-branch" |
| 356 | assert "commit_id" in data |
| 357 | assert "from" in data |
| 358 | |
| 359 | def test_create_json_from_is_none_at_head(self, repo: pathlib.Path) -> None: |
| 360 | result = _branch(repo, "from-head", "--json") |
| 361 | data = json.loads(result.output) |
| 362 | assert data["from"] is None |
| 363 | |
| 364 | def test_create_at_full_sha(self, two_commit_repo: pathlib.Path) -> None: |
| 365 | repo = two_commit_repo |
| 366 | from muse.core.store import get_commits_for_branch, read_current_branch |
| 367 | from muse.core.repo import read_repo_id |
| 368 | |
| 369 | repo_id = read_repo_id(repo) |
| 370 | branch = read_current_branch(repo) |
| 371 | commits = get_commits_for_branch(repo, repo_id, branch) |
| 372 | first_sha = commits[-1].commit_id |
| 373 | |
| 374 | result = _branch(repo, "at-sha", first_sha) |
| 375 | assert result.exit_code == 0 |
| 376 | tip = get_head_commit_id(repo, "at-sha") |
| 377 | assert tip == first_sha |
| 378 | |
| 379 | def test_create_at_partial_sha(self, two_commit_repo: pathlib.Path) -> None: |
| 380 | repo = two_commit_repo |
| 381 | from muse.core.store import get_commits_for_branch, read_current_branch |
| 382 | from muse.core.repo import read_repo_id |
| 383 | |
| 384 | repo_id = read_repo_id(repo) |
| 385 | branch = read_current_branch(repo) |
| 386 | commits = get_commits_for_branch(repo, repo_id, branch) |
| 387 | first_sha = commits[-1].commit_id |
| 388 | |
| 389 | result = _branch(repo, "at-partial", first_sha[:12]) |
| 390 | assert result.exit_code == 0 |
| 391 | tip = get_head_commit_id(repo, "at-partial") |
| 392 | assert tip == first_sha |
| 393 | |
| 394 | def test_create_at_branch_name(self, repo: pathlib.Path) -> None: |
| 395 | head_cid = get_head_commit_id(repo, "main") |
| 396 | result = _branch(repo, "copy-of-main", "main") |
| 397 | assert result.exit_code == 0 |
| 398 | tip = get_head_commit_id(repo, "copy-of-main") |
| 399 | assert tip == head_cid |
| 400 | |
| 401 | def test_create_json_from_field_populated(self, repo: pathlib.Path) -> None: |
| 402 | result = _branch(repo, "with-from", "main", "--json") |
| 403 | data = json.loads(result.output) |
| 404 | assert data["from"] == "main" |
| 405 | |
| 406 | def test_create_duplicate_exits_1(self, repo: pathlib.Path) -> None: |
| 407 | _branch(repo, "dup") |
| 408 | result = _branch(repo, "dup") |
| 409 | assert result.exit_code == 1 |
| 410 | |
| 411 | def test_create_invalid_name_exits_1(self, repo: pathlib.Path) -> None: |
| 412 | result = _branch(repo, "bad..name") |
| 413 | assert result.exit_code == 1 |
| 414 | |
| 415 | def test_create_does_not_checkout(self, repo: pathlib.Path) -> None: |
| 416 | _branch(repo, "new-but-no-switch") |
| 417 | assert read_current_branch(repo) == "main" |
| 418 | |
| 419 | |
| 420 | # ────────────────────────────────────────────────────────────────────────────── |
| 421 | # Integration — DELETE |
| 422 | # ────────────────────────────────────────────────────────────────────────────── |
| 423 | |
| 424 | |
| 425 | class TestDelete: |
| 426 | def test_delete_merged_branch_exits_0(self, repo: pathlib.Path) -> None: |
| 427 | _branch(repo, "to-delete") |
| 428 | # Branch points to same commit as main → considered merged |
| 429 | result = _branch(repo, "-d", "to-delete") |
| 430 | assert result.exit_code == 0 |
| 431 | |
| 432 | def test_delete_json_schema(self, repo: pathlib.Path) -> None: |
| 433 | _branch(repo, "del-json") |
| 434 | result = _branch(repo, "-d", "del-json", "--json") |
| 435 | data = json.loads(result.output) |
| 436 | assert data["action"] == "deleted" |
| 437 | assert data["branch"] == "del-json" |
| 438 | assert "was" in data |
| 439 | |
| 440 | def test_delete_unmerged_exits_1_without_force(self, repo: pathlib.Path) -> None: |
| 441 | _branch(repo, "unmerged") |
| 442 | _invoke(repo, ["checkout", "unmerged"]) |
| 443 | (repo / "z.py").write_text("z=1\n") |
| 444 | _commit(repo, "-m", "unmerged work") |
| 445 | _invoke(repo, ["checkout", "main"]) |
| 446 | result = _branch(repo, "-d", "unmerged") |
| 447 | assert result.exit_code == 1 |
| 448 | |
| 449 | def test_force_delete_unmerged_exits_0(self, repo: pathlib.Path) -> None: |
| 450 | _branch(repo, "force-del") |
| 451 | _invoke(repo, ["checkout", "force-del"]) |
| 452 | (repo / "x.py").write_text("x=1\n") |
| 453 | _commit(repo, "-m", "exclusive work") |
| 454 | _invoke(repo, ["checkout", "main"]) |
| 455 | result = _branch(repo, "-D", "force-del") |
| 456 | assert result.exit_code == 0 |
| 457 | |
| 458 | def test_delete_current_branch_exits_1(self, repo: pathlib.Path) -> None: |
| 459 | result = _branch(repo, "-d", "main") |
| 460 | assert result.exit_code == 1 |
| 461 | |
| 462 | def test_delete_nonexistent_exits_1(self, repo: pathlib.Path) -> None: |
| 463 | result = _branch(repo, "-d", "ghost") |
| 464 | assert result.exit_code == 1 |
| 465 | |
| 466 | def test_delete_removes_branch_from_list(self, repo: pathlib.Path) -> None: |
| 467 | _branch(repo, "temp") |
| 468 | _branch(repo, "-d", "temp") |
| 469 | result = _branch(repo, "--json") |
| 470 | names = [b["name"] for b in json.loads(result.output)] |
| 471 | assert "temp" not in names |
| 472 | |
| 473 | def test_delete_nested_branch_cleans_empty_dirs(self, repo: pathlib.Path) -> None: |
| 474 | _branch(repo, "feat/sub/task") |
| 475 | _branch(repo, "-D", "feat/sub/task") |
| 476 | # The feat/ and feat/sub/ dirs should be gone |
| 477 | feat_dir = repo / ".muse" / "refs" / "heads" / "feat" |
| 478 | assert not feat_dir.exists() |
| 479 | |
| 480 | |
| 481 | # ────────────────────────────────────────────────────────────────────────────── |
| 482 | # Integration — RENAME |
| 483 | # ────────────────────────────────────────────────────────────────────────────── |
| 484 | |
| 485 | |
| 486 | class TestRename: |
| 487 | def test_rename_basic(self, repo: pathlib.Path) -> None: |
| 488 | _branch(repo, "old-name") |
| 489 | result = _branch(repo, "-m", "old-name", "new-name") |
| 490 | assert result.exit_code == 0 |
| 491 | names = [b["name"] for b in json.loads(_branch(repo, "--json").output)] |
| 492 | assert "new-name" in names |
| 493 | assert "old-name" not in names |
| 494 | |
| 495 | def test_rename_omit_old_uses_current(self, repo: pathlib.Path) -> None: |
| 496 | _branch(repo, "temp") |
| 497 | _invoke(repo, ["checkout", "temp"]) |
| 498 | result = _branch(repo, "-m", "renamed") |
| 499 | assert result.exit_code == 0 |
| 500 | assert read_current_branch(repo) == "renamed" |
| 501 | _invoke(repo, ["checkout", "main"]) |
| 502 | |
| 503 | def test_rename_json_schema(self, repo: pathlib.Path) -> None: |
| 504 | _branch(repo, "src") |
| 505 | result = _branch(repo, "-m", "src", "dst", "--json") |
| 506 | data = json.loads(result.output) |
| 507 | assert data["action"] == "renamed" |
| 508 | assert data["from"] == "src" |
| 509 | assert data["to"] == "dst" |
| 510 | |
| 511 | def test_rename_to_existing_exits_1(self, repo: pathlib.Path) -> None: |
| 512 | _branch(repo, "a") |
| 513 | _branch(repo, "b") |
| 514 | result = _branch(repo, "-m", "a", "b") |
| 515 | assert result.exit_code == 1 |
| 516 | |
| 517 | def test_force_rename_to_existing_exits_0(self, repo: pathlib.Path) -> None: |
| 518 | _branch(repo, "a") |
| 519 | _branch(repo, "b") |
| 520 | result = _branch(repo, "-M", "a", "b") |
| 521 | assert result.exit_code == 0 |
| 522 | |
| 523 | def test_rename_updates_head_when_current(self, repo: pathlib.Path) -> None: |
| 524 | _branch(repo, "temp2") |
| 525 | _invoke(repo, ["checkout", "temp2"]) |
| 526 | _branch(repo, "-m", "temp2", "newname") |
| 527 | assert read_current_branch(repo) == "newname" |
| 528 | _invoke(repo, ["checkout", "main"]) |
| 529 | |
| 530 | def test_rename_nonexistent_exits_1(self, repo: pathlib.Path) -> None: |
| 531 | result = _branch(repo, "-m", "ghost", "newname") |
| 532 | assert result.exit_code == 1 |
| 533 | |
| 534 | |
| 535 | # ────────────────────────────────────────────────────────────────────────────── |
| 536 | # Integration — COPY |
| 537 | # ────────────────────────────────────────────────────────────────────────────── |
| 538 | |
| 539 | |
| 540 | class TestCopy: |
| 541 | def test_copy_basic(self, repo: pathlib.Path) -> None: |
| 542 | _branch(repo, "orig") |
| 543 | result = _branch(repo, "-c", "orig", "clone") |
| 544 | assert result.exit_code == 0 |
| 545 | names = [b["name"] for b in json.loads(_branch(repo, "--json").output)] |
| 546 | assert "orig" in names |
| 547 | assert "clone" in names |
| 548 | |
| 549 | def test_copy_same_tip(self, repo: pathlib.Path) -> None: |
| 550 | _branch(repo, "src") |
| 551 | _branch(repo, "-c", "src", "dst") |
| 552 | tip_src = get_head_commit_id(repo, "src") |
| 553 | tip_dst = get_head_commit_id(repo, "dst") |
| 554 | assert tip_src == tip_dst |
| 555 | |
| 556 | def test_copy_json_schema(self, repo: pathlib.Path) -> None: |
| 557 | _branch(repo, "original") |
| 558 | result = _branch(repo, "-c", "original", "copy1", "--json") |
| 559 | data = json.loads(result.output) |
| 560 | assert data["action"] == "copied" |
| 561 | assert data["from"] == "original" |
| 562 | assert data["to"] == "copy1" |
| 563 | |
| 564 | def test_copy_to_existing_exits_1(self, repo: pathlib.Path) -> None: |
| 565 | _branch(repo, "x") |
| 566 | _branch(repo, "y") |
| 567 | result = _branch(repo, "-c", "x", "y") |
| 568 | assert result.exit_code == 1 |
| 569 | |
| 570 | def test_force_copy_to_existing_exits_0(self, repo: pathlib.Path) -> None: |
| 571 | _branch(repo, "p") |
| 572 | _branch(repo, "q") |
| 573 | result = _branch(repo, "-C", "p", "q") |
| 574 | assert result.exit_code == 0 |
| 575 | |
| 576 | def test_copy_omit_src_uses_current(self, repo: pathlib.Path) -> None: |
| 577 | head = get_head_commit_id(repo, "main") |
| 578 | result = _branch(repo, "-c", "main-copy") |
| 579 | assert result.exit_code == 0 |
| 580 | tip = get_head_commit_id(repo, "main-copy") |
| 581 | assert tip == head |
| 582 | |
| 583 | |
| 584 | # ────────────────────────────────────────────────────────────────────────────── |
| 585 | # Integration — LIST |
| 586 | # ────────────────────────────────────────────────────────────────────────────── |
| 587 | |
| 588 | |
| 589 | class TestList: |
| 590 | def test_list_text_exits_0(self, repo: pathlib.Path) -> None: |
| 591 | result = _branch(repo) |
| 592 | assert result.exit_code == 0 |
| 593 | |
| 594 | def test_list_contains_main(self, repo: pathlib.Path) -> None: |
| 595 | result = _branch(repo) |
| 596 | assert "main" in result.output |
| 597 | |
| 598 | def test_list_marks_current_branch(self, repo: pathlib.Path) -> None: |
| 599 | result = _branch(repo) |
| 600 | # Current branch line must start with "* " |
| 601 | current_lines = [l for l in result.output.splitlines() if l.startswith("* ")] |
| 602 | assert len(current_lines) == 1 |
| 603 | assert "main" in current_lines[0] |
| 604 | |
| 605 | def test_list_json_schema(self, repo: pathlib.Path) -> None: |
| 606 | result = _branch(repo, "--json") |
| 607 | data = json.loads(result.output) |
| 608 | assert isinstance(data, list) |
| 609 | assert len(data) >= 1 |
| 610 | keys = set(data[0].keys()) |
| 611 | assert {"name", "current", "commit_id", "last_message", "upstream"} <= keys |
| 612 | |
| 613 | def test_list_json_current_flag(self, repo: pathlib.Path) -> None: |
| 614 | result = _branch(repo, "--json") |
| 615 | data = json.loads(result.output) |
| 616 | current = [b for b in data if b["current"]] |
| 617 | assert len(current) == 1 |
| 618 | assert current[0]["name"] == "main" |
| 619 | |
| 620 | def test_list_json_last_message_populated(self, repo: pathlib.Path) -> None: |
| 621 | result = _branch(repo, "--json") |
| 622 | data = json.loads(result.output) |
| 623 | main_entry = next(b for b in data if b["name"] == "main") |
| 624 | assert main_entry["last_message"] is not None |
| 625 | assert "initial" in main_entry["last_message"] |
| 626 | |
| 627 | def test_list_json_upstream_null_by_default(self, repo: pathlib.Path) -> None: |
| 628 | result = _branch(repo, "--json") |
| 629 | data = json.loads(result.output) |
| 630 | main_entry = next(b for b in data if b["name"] == "main") |
| 631 | assert main_entry["upstream"] is None |
| 632 | |
| 633 | def test_list_verbose_shows_sha(self, repo: pathlib.Path) -> None: |
| 634 | result = _branch(repo, "-v") |
| 635 | # Short SHA should appear |
| 636 | cid = get_head_commit_id(repo, "main") |
| 637 | assert cid is not None |
| 638 | assert cid[:8] in result.output |
| 639 | |
| 640 | def test_list_verbose_shows_message(self, repo: pathlib.Path) -> None: |
| 641 | result = _branch(repo, "-v") |
| 642 | assert "initial" in result.output |
| 643 | |
| 644 | def test_list_multiple_branches(self, repo: pathlib.Path) -> None: |
| 645 | _branch(repo, "feat/a") |
| 646 | _branch(repo, "feat/b") |
| 647 | result = _branch(repo, "--json") |
| 648 | data = json.loads(result.output) |
| 649 | names = [b["name"] for b in data] |
| 650 | assert "feat/a" in names |
| 651 | assert "feat/b" in names |
| 652 | |
| 653 | def test_list_sorted_by_name(self, repo: pathlib.Path) -> None: |
| 654 | _branch(repo, "z-last") |
| 655 | _branch(repo, "a-first") |
| 656 | result = _branch(repo, "--json") |
| 657 | data = json.loads(result.output) |
| 658 | names = [b["name"] for b in data] |
| 659 | assert names == sorted(names) |
| 660 | |
| 661 | def test_list_sort_committeddate(self, repo: pathlib.Path) -> None: |
| 662 | _branch(repo, "feat-x") |
| 663 | result = _branch(repo, "--sort", "committeddate", "--json") |
| 664 | assert result.exit_code == 0 |
| 665 | data = json.loads(result.output) |
| 666 | assert isinstance(data, list) |
| 667 | |
| 668 | |
| 669 | # ────────────────────────────────────────────────────────────────────────────── |
| 670 | # Integration — FILTERS |
| 671 | # ────────────────────────────────────────────────────────────────────────────── |
| 672 | |
| 673 | |
| 674 | class TestFilters: |
| 675 | def test_merged_filter_includes_self(self, repo: pathlib.Path) -> None: |
| 676 | result = _branch(repo, "--merged", "--json") |
| 677 | data = json.loads(result.output) |
| 678 | names = [b["name"] for b in data] |
| 679 | assert "main" in names |
| 680 | |
| 681 | def test_merged_filter_excludes_unmerged(self, repo: pathlib.Path) -> None: |
| 682 | _branch(repo, "unmerged-feat") |
| 683 | _invoke(repo, ["checkout", "unmerged-feat"]) |
| 684 | (repo / "u.py").write_text("u=1\n") |
| 685 | _commit(repo, "-m", "unmerged") |
| 686 | _invoke(repo, ["checkout", "main"]) |
| 687 | result = _branch(repo, "--merged", "--json") |
| 688 | data = json.loads(result.output) |
| 689 | names = [b["name"] for b in data] |
| 690 | assert "unmerged-feat" not in names |
| 691 | |
| 692 | def test_no_merged_filter_includes_unmerged(self, repo: pathlib.Path) -> None: |
| 693 | _branch(repo, "exclusive-feat") |
| 694 | _invoke(repo, ["checkout", "exclusive-feat"]) |
| 695 | (repo / "e.py").write_text("e=1\n") |
| 696 | _commit(repo, "-m", "exclusive") |
| 697 | _invoke(repo, ["checkout", "main"]) |
| 698 | result = _branch(repo, "--no-merged", "--json") |
| 699 | data = json.loads(result.output) |
| 700 | names = [b["name"] for b in data] |
| 701 | assert "exclusive-feat" in names |
| 702 | |
| 703 | def test_no_merged_filter_excludes_self(self, repo: pathlib.Path) -> None: |
| 704 | result = _branch(repo, "--no-merged", "--json") |
| 705 | data = json.loads(result.output) |
| 706 | names = [b["name"] for b in data] |
| 707 | assert "main" not in names |
| 708 | |
| 709 | def test_contains_commit_filter(self, repo: pathlib.Path) -> None: |
| 710 | cid = get_head_commit_id(repo, "main") |
| 711 | assert cid is not None |
| 712 | result = _branch(repo, "--contains", cid, "--json") |
| 713 | data = json.loads(result.output) |
| 714 | names = [b["name"] for b in data] |
| 715 | assert "main" in names |
| 716 | |
| 717 | def test_contains_unknown_commit_empty(self, repo: pathlib.Path) -> None: |
| 718 | result = _branch(repo, "--contains", "a" * 64, "--json") |
| 719 | data = json.loads(result.output) |
| 720 | assert data == [] |
| 721 | |
| 722 | |
| 723 | # ────────────────────────────────────────────────────────────────────────────── |
| 724 | # Integration — validation |
| 725 | # ────────────────────────────────────────────────────────────────────────────── |
| 726 | |
| 727 | |
| 728 | class TestValidation: |
| 729 | def test_unknown_format_exits_1(self, repo: pathlib.Path) -> None: |
| 730 | result = _branch(repo, "--format", "xml") |
| 731 | assert result.exit_code == 1 |
| 732 | |
| 733 | def test_unknown_format_sanitized_error(self, repo: pathlib.Path) -> None: |
| 734 | result = _branch(repo, "--format", "\x1b[31mxml\x1b[0m") |
| 735 | assert "\x1b" not in result.output |
| 736 | |
| 737 | def test_delete_without_name_exits_1(self, repo: pathlib.Path) -> None: |
| 738 | result = _branch(repo, "-d") |
| 739 | assert result.exit_code == 1 |
| 740 | |
| 741 | def test_rename_too_many_args_exits_1(self, repo: pathlib.Path) -> None: |
| 742 | result = _branch(repo, "-m", "a", "b", "c") |
| 743 | assert result.exit_code == 1 |
| 744 | |
| 745 | def test_copy_too_many_args_exits_1(self, repo: pathlib.Path) -> None: |
| 746 | result = _branch(repo, "-c", "a", "b", "c") |
| 747 | assert result.exit_code == 1 |
| 748 | |
| 749 | |
| 750 | # ────────────────────────────────────────────────────────────────────────────── |
| 751 | # Security — ANSI injection |
| 752 | # ────────────────────────────────────────────────────────────────────────────── |
| 753 | |
| 754 | |
| 755 | class TestSecurityAnsi: |
| 756 | def _has_ansi(self, s: str) -> bool: |
| 757 | return "\x1b[" in s |
| 758 | |
| 759 | def test_ansi_in_branch_name_rejected(self, repo: pathlib.Path) -> None: |
| 760 | result = _branch(repo, "\x1b[31mevil\x1b[0m") |
| 761 | assert result.exit_code == 1 |
| 762 | assert not self._has_ansi(result.output) |
| 763 | |
| 764 | def test_ansi_in_delete_name_rejected(self, repo: pathlib.Path) -> None: |
| 765 | result = _branch(repo, "-d", "\x1b[31mevil\x1b[0m") |
| 766 | assert result.exit_code == 1 |
| 767 | assert not self._has_ansi(result.output) |
| 768 | |
| 769 | def test_ansi_in_rename_new_name_rejected(self, repo: pathlib.Path) -> None: |
| 770 | result = _branch(repo, "-m", "\x1b[31mnew\x1b[0m") |
| 771 | assert result.exit_code == 1 |
| 772 | assert not self._has_ansi(result.output) |
| 773 | |
| 774 | def test_ansi_in_format_flag_sanitized(self, repo: pathlib.Path) -> None: |
| 775 | result = _branch(repo, "--format", "\x1b[31mxml\x1b[0m") |
| 776 | assert not self._has_ansi(result.output) |
| 777 | |
| 778 | def test_ansi_in_contains_commit_id(self, repo: pathlib.Path) -> None: |
| 779 | result = _branch(repo, "--contains", "\x1b[31mevil\x1b[0m") |
| 780 | # Should exit 0 (no match, empty list) or exit 0 with empty list |
| 781 | # Either way, ANSI must not appear in output |
| 782 | assert not self._has_ansi(result.output) |
| 783 | |
| 784 | def test_errors_go_to_stderr(self, repo: pathlib.Path) -> None: |
| 785 | result = _branch(repo, "-d", "nonexistent") |
| 786 | assert result.exit_code == 1 |
| 787 | # Error should NOT appear in stdout |
| 788 | assert "not found" not in result.output.lower() or (result.stderr and "not found" in result.stderr.lower()) |
| 789 | |
| 790 | |
| 791 | # ────────────────────────────────────────────────────────────────────────────── |
| 792 | # Stress |
| 793 | # ────────────────────────────────────────────────────────────────────────────── |
| 794 | |
| 795 | |
| 796 | @pytest.mark.slow |
| 797 | class TestStress: |
| 798 | def test_list_500_branches_fast(self, repo: pathlib.Path) -> None: |
| 799 | """Listing 500 branches must complete in under 2 seconds.""" |
| 800 | for i in range(500): |
| 801 | _branch(repo, f"feat/task-{i:04d}") |
| 802 | t0 = time.perf_counter() |
| 803 | result = _branch(repo, "--json") |
| 804 | elapsed = (time.perf_counter() - t0) * 1000 |
| 805 | data = json.loads(result.output) |
| 806 | assert len(data) == 501 # main + 500 |
| 807 | assert elapsed < 2000, f"list 500 branches took {elapsed:.0f}ms (limit 2000ms)" |
| 808 | |
| 809 | def test_merged_filter_100_branches(self, repo: pathlib.Path) -> None: |
| 810 | """--merged filter on 100 branches completes in reasonable time.""" |
| 811 | for i in range(100): |
| 812 | _branch(repo, f"task-{i:03d}") |
| 813 | t0 = time.perf_counter() |
| 814 | result = _branch(repo, "--merged", "--json") |
| 815 | elapsed = (time.perf_counter() - t0) * 1000 |
| 816 | data = json.loads(result.output) |
| 817 | # All branches share the same commit as main → all merged |
| 818 | assert len(data) == 101 |
| 819 | assert elapsed < 3000, f"--merged on 100 branches took {elapsed:.0f}ms" |
| 820 | |
| 821 | def test_sort_committeddate_100_branches(self, repo: pathlib.Path) -> None: |
| 822 | for i in range(100): |
| 823 | _branch(repo, f"sort-{i:03d}") |
| 824 | result = _branch(repo, "--sort", "committeddate", "--json") |
| 825 | assert result.exit_code == 0 |
| 826 | data = json.loads(result.output) |
| 827 | assert len(data) == 101 |
| 828 | |
| 829 | def test_concurrent_branch_list_separate_repos(self, tmp_path: pathlib.Path) -> None: |
| 830 | errors: list[str] = [] |
| 831 | |
| 832 | def do_branch(idx: int) -> None: |
| 833 | repo_dir = tmp_path / f"repo_{idx}" |
| 834 | repo_dir.mkdir() |
| 835 | subprocess.run(["muse", "init"], cwd=str(repo_dir), capture_output=True) |
| 836 | (repo_dir / "x.py").write_text(f"x={idx}\n") |
| 837 | subprocess.run( |
| 838 | ["muse", "commit", "-m", f"c{idx}"], |
| 839 | cwd=str(repo_dir), capture_output=True, |
| 840 | ) |
| 841 | for j in range(5): |
| 842 | subprocess.run( |
| 843 | ["muse", "branch", f"b{j}"], |
| 844 | cwd=str(repo_dir), capture_output=True, |
| 845 | ) |
| 846 | r = subprocess.run( |
| 847 | ["muse", "branch", "--json"], |
| 848 | cwd=str(repo_dir), capture_output=True, text=True, |
| 849 | ) |
| 850 | if r.returncode != 0: |
| 851 | errors.append(f"repo_{idx}: branch --json failed") |
| 852 | return |
| 853 | data = json.loads(r.stdout) |
| 854 | if len(data) != 6: # main + 5 |
| 855 | errors.append(f"repo_{idx}: expected 6 branches, got {len(data)}") |
| 856 | |
| 857 | threads = [threading.Thread(target=do_branch, args=(i,)) for i in range(6)] |
| 858 | for t in threads: |
| 859 | t.start() |
| 860 | for t in threads: |
| 861 | t.join() |
| 862 | assert not errors, "Concurrent branch errors:\n" + "\n".join(errors) |
| 863 | |
| 864 | def test_deep_ancestor_chain_is_merged(self, repo: pathlib.Path) -> None: |
| 865 | """A branch with 50 ancestors is correctly detected as merged.""" |
| 866 | _branch(repo, "long-chain") |
| 867 | _invoke(repo, ["checkout", "long-chain"]) |
| 868 | for i in range(50): |
| 869 | (repo / f"step_{i:03d}.py").write_text(f"s={i}\n") |
| 870 | _commit(repo, "-m", f"step {i}") |
| 871 | _invoke(repo, ["checkout", "main"]) |
| 872 | _invoke(repo, ["merge", "long-chain"]) |
| 873 | result = _branch(repo, "--merged", "--json") |
| 874 | data = json.loads(result.output) |
| 875 | names = [b["name"] for b in data] |
| 876 | assert "long-chain" in names |
File History
3 commits
sha256:e237dc0e8122609f5131d11c9dda9bba480395a5a4355cda0c9fa7e634fddd29
fix(branch): guard -d --dry-run against destructive writes;…
Sonnet 4.6
patch
9 days ago
sha256:144495ae90498f9d51aeac4bf7f9ee9e9502d14474745a856b96921f7349c0ce
next round of fixing tests
Human
15 days ago
sha256:99f8eb388d9a9c353e68b9a4e5bebe1b4240a8f511e6f0928e58c0e95153e103
feat: branch --prune-config, fix hub repo delete docstrings…
Sonnet 4.6
minor
⚠
16 days ago