test_cmd_branch.py
python
sha256:e237dc0e8122609f5131d11c9dda9bba480395a5a4355cda0c9fa7e634fddd29
fix(branch): guard -d --dry-run against destructive writes;…
Sonnet 4.6
patch
8 days ago
| 1 | """Tests for ``muse branch``. |
| 2 | |
| 3 | Coverage tiers |
| 4 | -------------- |
| 5 | Unit — parser flags, dead-code removal, helpers (_resolve_start_point, |
| 6 | _list_local_branches, _list_remotes, _upstream_for, |
| 7 | _commit_ancestors, _is_merged, _contains_commit, |
| 8 | _cleanup_empty_dirs). |
| 9 | Integration — create, delete, force-delete, rename, force-rename, copy, |
| 10 | force-copy, listing, filtering, sorting. |
| 11 | End-to-end — full CLI invocations: text and JSON output, all operations. |
| 12 | Security — ANSI injection in branch names, format flags, messages. |
| 13 | Stress — 500 branches, concurrent list, deep ancestry chains. |
| 14 | """ |
| 15 | |
| 16 | from __future__ import annotations |
| 17 | |
| 18 | import json |
| 19 | import os |
| 20 | import pathlib |
| 21 | import subprocess |
| 22 | import threading |
| 23 | import time |
| 24 | from typing import TYPE_CHECKING |
| 25 | |
| 26 | import pytest |
| 27 | |
| 28 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 29 | from muse.core.refs import ( |
| 30 | get_head_commit_id, |
| 31 | read_current_branch, |
| 32 | ) |
| 33 | from muse.core.types import short_id |
| 34 | from muse.core.paths import heads_dir, logs_dir |
| 35 | |
| 36 | if TYPE_CHECKING: |
| 37 | import argparse |
| 38 | |
| 39 | runner = CliRunner() |
| 40 | |
| 41 | # ────────────────────────────────────────────────────────────────────────────── |
| 42 | # Helpers |
| 43 | # ────────────────────────────────────────────────────────────────────────────── |
| 44 | |
| 45 | |
| 46 | def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult: |
| 47 | saved = os.getcwd() |
| 48 | try: |
| 49 | os.chdir(repo) |
| 50 | return runner.invoke(None, args) |
| 51 | finally: |
| 52 | os.chdir(saved) |
| 53 | |
| 54 | |
| 55 | def _branch(repo: pathlib.Path, *extra: str) -> InvokeResult: |
| 56 | return _invoke(repo, ["branch", *extra]) |
| 57 | |
| 58 | |
| 59 | def _commit(repo: pathlib.Path, *extra: str) -> InvokeResult: |
| 60 | return _invoke(repo, ["commit", *extra]) |
| 61 | |
| 62 | |
| 63 | @pytest.fixture() |
| 64 | def repo(tmp_path: pathlib.Path) -> pathlib.Path: |
| 65 | """Initialised repo with one commit on ``main``.""" |
| 66 | saved = os.getcwd() |
| 67 | try: |
| 68 | os.chdir(tmp_path) |
| 69 | runner.invoke(None, ["init"]) |
| 70 | finally: |
| 71 | os.chdir(saved) |
| 72 | (tmp_path / "a.py").write_text("x = 1\n") |
| 73 | _commit(tmp_path, "-m", "initial") |
| 74 | return tmp_path |
| 75 | |
| 76 | |
| 77 | @pytest.fixture() |
| 78 | def two_commit_repo(repo: pathlib.Path) -> pathlib.Path: |
| 79 | """Repo with two commits on ``main``.""" |
| 80 | (repo / "b.py").write_text("y = 2\n") |
| 81 | _commit(repo, "-m", "second") |
| 82 | return repo |
| 83 | |
| 84 | |
| 85 | # ────────────────────────────────────────────────────────────────────────────── |
| 86 | # Unit — parser flags |
| 87 | # ────────────────────────────────────────────────────────────────────────────── |
| 88 | |
| 89 | |
| 90 | class TestRegisterFlags: |
| 91 | def _parse(self, *args: str) -> "argparse.Namespace": |
| 92 | import argparse |
| 93 | |
| 94 | from muse.cli.commands.branch import register |
| 95 | |
| 96 | p = argparse.ArgumentParser() |
| 97 | sub = p.add_subparsers() |
| 98 | register(sub) |
| 99 | return p.parse_args(["branch", *args]) |
| 100 | |
| 101 | def test_default_json_out_is_false(self) -> None: |
| 102 | ns = self._parse() |
| 103 | assert ns.json_out is False |
| 104 | |
| 105 | def test_json_flag_sets_json_out(self) -> None: |
| 106 | ns = self._parse("--json") |
| 107 | assert ns.json_out is True |
| 108 | |
| 109 | def test_j_shorthand_sets_json_out(self) -> None: |
| 110 | ns = self._parse("-j") |
| 111 | assert ns.json_out is True |
| 112 | |
| 113 | def test_delete_flag(self) -> None: |
| 114 | ns = self._parse("-d", "foo") |
| 115 | assert ns.op == "delete" |
| 116 | |
| 117 | def test_force_delete_flag(self) -> None: |
| 118 | ns = self._parse("-D", "foo") |
| 119 | assert ns.op == "force_delete" |
| 120 | |
| 121 | def test_rename_flag(self) -> None: |
| 122 | ns = self._parse("-m", "new") |
| 123 | assert ns.op == "rename" |
| 124 | |
| 125 | def test_force_rename_flag(self) -> None: |
| 126 | ns = self._parse("-M", "new") |
| 127 | assert ns.op == "force_rename" |
| 128 | |
| 129 | def test_copy_flag(self) -> None: |
| 130 | ns = self._parse("-c", "copy") |
| 131 | assert ns.op == "copy" |
| 132 | |
| 133 | def test_force_copy_flag(self) -> None: |
| 134 | ns = self._parse("-C", "copy") |
| 135 | assert ns.op == "force_copy" |
| 136 | |
| 137 | def test_verbose_default_0(self) -> None: |
| 138 | ns = self._parse() |
| 139 | assert ns.verbose == 0 |
| 140 | |
| 141 | def test_verbose_v_is_1(self) -> None: |
| 142 | ns = self._parse("-v") |
| 143 | assert ns.verbose == 1 |
| 144 | |
| 145 | def test_verbose_vv_is_2(self) -> None: |
| 146 | ns = self._parse("-vv") |
| 147 | assert ns.verbose == 2 |
| 148 | |
| 149 | def test_remotes_flag(self) -> None: |
| 150 | ns = self._parse("-r") |
| 151 | assert ns.remotes is True |
| 152 | |
| 153 | def test_all_flag(self) -> None: |
| 154 | ns = self._parse("-a") |
| 155 | assert ns.all_branches is True |
| 156 | |
| 157 | def test_sort_default_name(self) -> None: |
| 158 | ns = self._parse() |
| 159 | assert ns.sort == "name" |
| 160 | |
| 161 | def test_sort_committeddate(self) -> None: |
| 162 | ns = self._parse("--sort", "committeddate") |
| 163 | assert ns.sort == "committeddate" |
| 164 | |
| 165 | def test_sort_invalid_rejected(self) -> None: |
| 166 | import argparse |
| 167 | |
| 168 | from muse.cli.commands.branch import register |
| 169 | |
| 170 | p = argparse.ArgumentParser() |
| 171 | sub = p.add_subparsers() |
| 172 | register(sub) |
| 173 | with pytest.raises(SystemExit): |
| 174 | p.parse_args(["branch", "--sort", "invalid"]) |
| 175 | |
| 176 | |
| 177 | # ────────────────────────────────────────────────────────────────────────────── |
| 178 | # Unit — dead-code removal |
| 179 | # ────────────────────────────────────────────────────────────────────────────── |
| 180 | |
| 181 | |
| 182 | class TestDeadCodeRemoved: |
| 183 | def test_op_list_branch_removed(self) -> None: |
| 184 | import inspect |
| 185 | |
| 186 | import muse.cli.commands.branch as m |
| 187 | |
| 188 | src = inspect.getsource(m.run) |
| 189 | assert 'op == "list"' not in src, ( |
| 190 | 'op == "list" was a dead branch (nothing in register() creates it); must be deleted' |
| 191 | ) |
| 192 | |
| 193 | def test_inline_tomllib_import_removed(self) -> None: |
| 194 | import inspect |
| 195 | |
| 196 | import muse.cli.commands.branch as m |
| 197 | |
| 198 | src = inspect.getsource(m._upstream_for) |
| 199 | assert "import tomllib" not in src, ( |
| 200 | "inline 'import tomllib' inside _upstream_for should be a module-level import" |
| 201 | ) |
| 202 | |
| 203 | def test_double_sanitize_removed(self) -> None: |
| 204 | """The verbose listing previously double-sanitized name_str (stripping ANSI).""" |
| 205 | import inspect |
| 206 | |
| 207 | import muse.cli.commands.branch as m |
| 208 | |
| 209 | src = inspect.getsource(m.run) |
| 210 | assert "sanitize_display(name_str)" not in src, ( |
| 211 | "name_str was double-sanitized; the second call stripped ANSI from current branch" |
| 212 | ) |
| 213 | |
| 214 | |
| 215 | # ────────────────────────────────────────────────────────────────────────────── |
| 216 | # Unit — _resolve_start_point |
| 217 | # ────────────────────────────────────────────────────────────────────────────── |
| 218 | |
| 219 | |
| 220 | class TestResolveStartPoint: |
| 221 | def test_resolves_branch_name(self, repo: pathlib.Path) -> None: |
| 222 | from muse.cli.commands.branch import _resolve_start_point |
| 223 | |
| 224 | cid = get_head_commit_id(repo, "main") |
| 225 | result = _resolve_start_point(repo, "main", "main") |
| 226 | assert result == cid |
| 227 | |
| 228 | def test_resolves_full_sha(self, repo: pathlib.Path) -> None: |
| 229 | from muse.cli.commands.branch import _resolve_start_point |
| 230 | |
| 231 | cid = get_head_commit_id(repo, "main") |
| 232 | assert cid is not None |
| 233 | result = _resolve_start_point(repo, "main", cid) |
| 234 | assert result == cid |
| 235 | |
| 236 | def test_resolves_partial_sha(self, two_commit_repo: pathlib.Path) -> None: |
| 237 | from muse.cli.commands.branch import _resolve_start_point |
| 238 | from muse.core.refs import read_current_branch |
| 239 | from muse.core.commits import get_commits_for_branch |
| 240 | |
| 241 | repo = two_commit_repo |
| 242 | branch = read_current_branch(repo) |
| 243 | commits = get_commits_for_branch(repo, branch) |
| 244 | first_sha = commits[-1].commit_id # oldest commit |
| 245 | # 12-char prefix should resolve |
| 246 | result = _resolve_start_point(repo, "main", short_id(first_sha)) |
| 247 | assert result == first_sha |
| 248 | |
| 249 | def test_returns_input_for_unresolvable(self, repo: pathlib.Path) -> None: |
| 250 | from muse.cli.commands.branch import _resolve_start_point |
| 251 | |
| 252 | result = _resolve_start_point(repo, "main", "nonexistent-ref") |
| 253 | assert result == "nonexistent-ref" |
| 254 | |
| 255 | |
| 256 | # ────────────────────────────────────────────────────────────────────────────── |
| 257 | # Unit — _list_local_branches |
| 258 | # ────────────────────────────────────────────────────────────────────────────── |
| 259 | |
| 260 | |
| 261 | class TestListLocalBranches: |
| 262 | def test_returns_sorted_list(self, repo: pathlib.Path) -> None: |
| 263 | from muse.cli.commands.branch import _list_local_branches |
| 264 | |
| 265 | _branch(repo, "z-last") |
| 266 | _branch(repo, "a-first") |
| 267 | branches = _list_local_branches(repo) |
| 268 | assert branches == sorted(branches) |
| 269 | |
| 270 | def test_skips_hidden_files(self, repo: pathlib.Path) -> None: |
| 271 | from muse.cli.commands.branch import _list_local_branches |
| 272 | |
| 273 | # Plant a hidden lock file inside refs/heads/ |
| 274 | lock = heads_dir(repo) / ".lock" |
| 275 | lock.write_text("locked") |
| 276 | branches = _list_local_branches(repo) |
| 277 | assert ".lock" not in branches |
| 278 | assert not any(b.startswith(".") for b in branches) |
| 279 | |
| 280 | def test_empty_repo_returns_empty(self, tmp_path: pathlib.Path) -> None: |
| 281 | from muse.cli.commands.branch import _list_local_branches |
| 282 | |
| 283 | assert _list_local_branches(tmp_path) == [] |
| 284 | |
| 285 | def test_includes_nested_branches(self, repo: pathlib.Path) -> None: |
| 286 | from muse.cli.commands.branch import _list_local_branches |
| 287 | |
| 288 | _branch(repo, "feat/sub/task") |
| 289 | branches = _list_local_branches(repo) |
| 290 | assert "feat/sub/task" in branches |
| 291 | |
| 292 | |
| 293 | # ────────────────────────────────────────────────────────────────────────────── |
| 294 | # Unit — _commit_ancestors, _is_merged, _contains_commit |
| 295 | # ────────────────────────────────────────────────────────────────────────────── |
| 296 | |
| 297 | |
| 298 | class TestCommitGraph: |
| 299 | def test_commit_ancestors_includes_self(self, repo: pathlib.Path) -> None: |
| 300 | from muse.cli.commands.branch import _commit_ancestors |
| 301 | |
| 302 | cid = get_head_commit_id(repo, "main") |
| 303 | assert cid is not None |
| 304 | ancestors = _commit_ancestors(repo, cid) |
| 305 | assert cid in ancestors |
| 306 | |
| 307 | def test_is_merged_true_for_same_branch(self, repo: pathlib.Path) -> None: |
| 308 | from muse.cli.commands.branch import _is_merged |
| 309 | |
| 310 | assert _is_merged(repo, "main", "main") |
| 311 | |
| 312 | def test_is_merged_false_for_unmerged(self, repo: pathlib.Path) -> None: |
| 313 | from muse.cli.commands.branch import _is_merged |
| 314 | |
| 315 | _branch(repo, "feat") |
| 316 | _invoke(repo, ["checkout", "feat"]) |
| 317 | (repo / "c.py").write_text("c=1\n") |
| 318 | _commit(repo, "-m", "feat commit") |
| 319 | _invoke(repo, ["checkout", "main"]) |
| 320 | assert not _is_merged(repo, "feat", "main") |
| 321 | |
| 322 | def test_contains_commit_true(self, repo: pathlib.Path) -> None: |
| 323 | from muse.cli.commands.branch import _contains_commit |
| 324 | |
| 325 | cid = get_head_commit_id(repo, "main") |
| 326 | assert cid is not None |
| 327 | assert _contains_commit(repo, "main", cid) |
| 328 | |
| 329 | def test_contains_commit_false_for_unknown(self, repo: pathlib.Path) -> None: |
| 330 | from muse.cli.commands.branch import _contains_commit |
| 331 | |
| 332 | assert not _contains_commit(repo, "main", "a" * 64) |
| 333 | |
| 334 | |
| 335 | # ────────────────────────────────────────────────────────────────────────────── |
| 336 | # Integration — CREATE |
| 337 | # ────────────────────────────────────────────────────────────────────────────── |
| 338 | |
| 339 | |
| 340 | class TestCreate: |
| 341 | def test_create_basic_exits_0(self, repo: pathlib.Path) -> None: |
| 342 | result = _branch(repo, "new-branch") |
| 343 | assert result.exit_code == 0 |
| 344 | |
| 345 | def test_create_text_output(self, repo: pathlib.Path) -> None: |
| 346 | result = _branch(repo, "my-branch") |
| 347 | assert "my-branch" in result.output |
| 348 | |
| 349 | def test_create_json_schema(self, repo: pathlib.Path) -> None: |
| 350 | result = _branch(repo, "json-branch", "--json") |
| 351 | data = json.loads(result.output) |
| 352 | assert data["action"] == "created" |
| 353 | assert data["branch"] == "json-branch" |
| 354 | assert "commit_id" in data |
| 355 | assert "from" in data |
| 356 | |
| 357 | def test_create_json_from_is_none_at_head(self, repo: pathlib.Path) -> None: |
| 358 | result = _branch(repo, "from-head", "--json") |
| 359 | data = json.loads(result.output) |
| 360 | assert data["from"] is None |
| 361 | |
| 362 | def test_create_at_full_sha(self, two_commit_repo: pathlib.Path) -> None: |
| 363 | repo = two_commit_repo |
| 364 | from muse.core.refs import read_current_branch |
| 365 | from muse.core.commits import get_commits_for_branch |
| 366 | |
| 367 | branch = read_current_branch(repo) |
| 368 | commits = get_commits_for_branch(repo, branch) |
| 369 | first_sha = commits[-1].commit_id |
| 370 | |
| 371 | result = _branch(repo, "at-sha", first_sha) |
| 372 | assert result.exit_code == 0 |
| 373 | tip = get_head_commit_id(repo, "at-sha") |
| 374 | assert tip == first_sha |
| 375 | |
| 376 | def test_create_at_partial_sha(self, two_commit_repo: pathlib.Path) -> None: |
| 377 | repo = two_commit_repo |
| 378 | from muse.core.refs import read_current_branch |
| 379 | from muse.core.commits import get_commits_for_branch |
| 380 | |
| 381 | branch = read_current_branch(repo) |
| 382 | commits = get_commits_for_branch(repo, branch) |
| 383 | first_sha = commits[-1].commit_id |
| 384 | |
| 385 | result = _branch(repo, "at-partial", short_id(first_sha)) |
| 386 | assert result.exit_code == 0 |
| 387 | tip = get_head_commit_id(repo, "at-partial") |
| 388 | assert tip == first_sha |
| 389 | |
| 390 | def test_create_at_branch_name(self, repo: pathlib.Path) -> None: |
| 391 | head_cid = get_head_commit_id(repo, "main") |
| 392 | result = _branch(repo, "copy-of-main", "main") |
| 393 | assert result.exit_code == 0 |
| 394 | tip = get_head_commit_id(repo, "copy-of-main") |
| 395 | assert tip == head_cid |
| 396 | |
| 397 | def test_create_json_from_field_populated(self, repo: pathlib.Path) -> None: |
| 398 | result = _branch(repo, "with-from", "main", "--json") |
| 399 | data = json.loads(result.output) |
| 400 | assert data["from"] == "main" |
| 401 | |
| 402 | def test_create_duplicate_exits_1(self, repo: pathlib.Path) -> None: |
| 403 | _branch(repo, "dup") |
| 404 | result = _branch(repo, "dup") |
| 405 | assert result.exit_code == 1 |
| 406 | |
| 407 | def test_create_invalid_name_exits_1(self, repo: pathlib.Path) -> None: |
| 408 | result = _branch(repo, "bad..name") |
| 409 | assert result.exit_code == 1 |
| 410 | |
| 411 | def test_create_does_not_checkout(self, repo: pathlib.Path) -> None: |
| 412 | _branch(repo, "new-but-no-switch") |
| 413 | assert read_current_branch(repo) == "main" |
| 414 | |
| 415 | |
| 416 | # ────────────────────────────────────────────────────────────────────────────── |
| 417 | # Integration — DELETE |
| 418 | # ────────────────────────────────────────────────────────────────────────────── |
| 419 | |
| 420 | |
| 421 | class TestDelete: |
| 422 | def test_delete_merged_branch_exits_0(self, repo: pathlib.Path) -> None: |
| 423 | _branch(repo, "to-delete") |
| 424 | # Branch points to same commit as main → considered merged |
| 425 | result = _branch(repo, "-d", "to-delete") |
| 426 | assert result.exit_code == 0 |
| 427 | |
| 428 | def test_delete_json_schema(self, repo: pathlib.Path) -> None: |
| 429 | _branch(repo, "del-json") |
| 430 | result = _branch(repo, "-d", "del-json", "--json") |
| 431 | data = json.loads(result.output) |
| 432 | assert data["action"] == "deleted" |
| 433 | assert data["branch"] == "del-json" |
| 434 | assert "was" in data |
| 435 | |
| 436 | def test_delete_unmerged_exits_1_without_force(self, repo: pathlib.Path) -> None: |
| 437 | _branch(repo, "unmerged") |
| 438 | _invoke(repo, ["checkout", "unmerged"]) |
| 439 | (repo / "z.py").write_text("z=1\n") |
| 440 | _commit(repo, "-m", "unmerged work") |
| 441 | _invoke(repo, ["checkout", "main"]) |
| 442 | result = _branch(repo, "-d", "unmerged") |
| 443 | assert result.exit_code == 1 |
| 444 | |
| 445 | def test_force_delete_unmerged_exits_0(self, repo: pathlib.Path) -> None: |
| 446 | _branch(repo, "force-del") |
| 447 | _invoke(repo, ["checkout", "force-del"]) |
| 448 | (repo / "x.py").write_text("x=1\n") |
| 449 | _commit(repo, "-m", "exclusive work") |
| 450 | _invoke(repo, ["checkout", "main"]) |
| 451 | result = _branch(repo, "-D", "force-del") |
| 452 | assert result.exit_code == 0 |
| 453 | |
| 454 | def test_delete_current_branch_exits_1(self, repo: pathlib.Path) -> None: |
| 455 | result = _branch(repo, "-d", "main") |
| 456 | assert result.exit_code == 1 |
| 457 | |
| 458 | def test_delete_nonexistent_exits_1(self, repo: pathlib.Path) -> None: |
| 459 | result = _branch(repo, "-d", "ghost") |
| 460 | assert result.exit_code == 1 |
| 461 | |
| 462 | def test_delete_removes_branch_from_list(self, repo: pathlib.Path) -> None: |
| 463 | _branch(repo, "temp") |
| 464 | _branch(repo, "-d", "temp") |
| 465 | result = _branch(repo, "--json") |
| 466 | names = [b["name"] for b in json.loads(result.output)] |
| 467 | assert "temp" not in names |
| 468 | |
| 469 | def test_delete_nested_branch_cleans_empty_dirs(self, repo: pathlib.Path) -> None: |
| 470 | _branch(repo, "feat/sub/task") |
| 471 | _branch(repo, "-D", "feat/sub/task") |
| 472 | # The feat/ and feat/sub/ dirs should be gone |
| 473 | feat_dir = heads_dir(repo) / "feat" |
| 474 | assert not feat_dir.exists() |
| 475 | |
| 476 | def test_delete_removes_reflog_file(self, repo: pathlib.Path) -> None: |
| 477 | """Deleting a branch removes its reflog file — git-idiomatic behaviour.""" |
| 478 | _invoke(repo, ["checkout", "-b", "bye"]) # checkout writes the reflog |
| 479 | _invoke(repo, ["checkout", "main"]) |
| 480 | reflog = logs_dir(repo) / "refs" / "heads" / "bye" |
| 481 | assert reflog.exists(), "reflog should exist after checkout -b" |
| 482 | _branch(repo, "-d", "bye") |
| 483 | assert not reflog.exists(), "reflog must be deleted when branch is deleted" |
| 484 | |
| 485 | def test_delete_nested_branch_removes_reflog_and_empty_dirs( |
| 486 | self, repo: pathlib.Path |
| 487 | ) -> None: |
| 488 | """Nested branch deletion removes reflog file and its empty parent dirs.""" |
| 489 | _invoke(repo, ["checkout", "-b", "feat/ui/button"]) |
| 490 | _invoke(repo, ["checkout", "main"]) |
| 491 | reflog = logs_dir(repo) / "refs" / "heads" / "feat" / "ui" / "button" |
| 492 | assert reflog.exists(), "reflog should exist after checkout -b" |
| 493 | _branch(repo, "-D", "feat/ui/button") |
| 494 | assert not reflog.exists() |
| 495 | log_feat_dir = logs_dir(repo) / "refs" / "heads" / "feat" |
| 496 | assert not log_feat_dir.exists(), "empty reflog parent dirs must be cleaned up" |
| 497 | |
| 498 | def test_force_delete_also_removes_reflog(self, repo: pathlib.Path) -> None: |
| 499 | """-D (force delete) removes the reflog just like -d.""" |
| 500 | _invoke(repo, ["checkout", "-b", "force-log"]) |
| 501 | (repo / "tmp.py").write_text("x=1\n") |
| 502 | _commit(repo, "-m", "unmerged") |
| 503 | _invoke(repo, ["checkout", "main"]) |
| 504 | reflog = logs_dir(repo) / "refs" / "heads" / "force-log" |
| 505 | assert reflog.exists() |
| 506 | _branch(repo, "-D", "force-log") |
| 507 | assert not reflog.exists() |
| 508 | |
| 509 | |
| 510 | # ────────────────────────────────────────────────────────────────────────────── |
| 511 | # Integration — CREATE REFLOG |
| 512 | # ────────────────────────────────────────────────────────────────────────────── |
| 513 | |
| 514 | |
| 515 | class TestCreateReflog: |
| 516 | def test_branch_create_writes_reflog(self, repo: pathlib.Path) -> None: |
| 517 | """muse branch -b writes a reflog entry — git-idiomatic behaviour.""" |
| 518 | _branch(repo, "feat/new") |
| 519 | reflog = logs_dir(repo) / "refs" / "heads" / "feat" / "new" |
| 520 | assert reflog.exists(), "reflog must exist after muse branch -b" |
| 521 | |
| 522 | def test_branch_create_reflog_contains_branch_created(self, repo: pathlib.Path) -> None: |
| 523 | """Reflog entry records a 'branch: Created' operation.""" |
| 524 | _branch(repo, "task/thing") |
| 525 | reflog = logs_dir(repo) / "refs" / "heads" / "task" / "thing" |
| 526 | content = reflog.read_text(encoding="utf-8") |
| 527 | assert "branch: Created" in content |
| 528 | |
| 529 | def test_branch_create_reflog_records_start_point(self, repo: pathlib.Path) -> None: |
| 530 | """Reflog entry for a branch created from another branch names that source.""" |
| 531 | _branch(repo, "task/from-main", "main") |
| 532 | reflog = logs_dir(repo) / "refs" / "heads" / "task" / "from-main" |
| 533 | content = reflog.read_text(encoding="utf-8") |
| 534 | assert "main" in content |
| 535 | |
| 536 | |
| 537 | # ────────────────────────────────────────────────────────────────────────────── |
| 538 | # Integration — RENAME |
| 539 | # ────────────────────────────────────────────────────────────────────────────── |
| 540 | |
| 541 | |
| 542 | class TestRename: |
| 543 | def test_rename_basic(self, repo: pathlib.Path) -> None: |
| 544 | _branch(repo, "old-name") |
| 545 | result = _branch(repo, "-m", "old-name", "new-name") |
| 546 | assert result.exit_code == 0 |
| 547 | names = [b["name"] for b in json.loads(_branch(repo, "--json").output)] |
| 548 | assert "new-name" in names |
| 549 | assert "old-name" not in names |
| 550 | |
| 551 | def test_rename_omit_old_uses_current(self, repo: pathlib.Path) -> None: |
| 552 | _branch(repo, "temp") |
| 553 | _invoke(repo, ["checkout", "temp"]) |
| 554 | result = _branch(repo, "-m", "renamed") |
| 555 | assert result.exit_code == 0 |
| 556 | assert read_current_branch(repo) == "renamed" |
| 557 | _invoke(repo, ["checkout", "main"]) |
| 558 | |
| 559 | def test_rename_json_schema(self, repo: pathlib.Path) -> None: |
| 560 | _branch(repo, "src") |
| 561 | result = _branch(repo, "-m", "src", "dst", "--json") |
| 562 | data = json.loads(result.output) |
| 563 | assert data["action"] == "renamed" |
| 564 | assert data["from"] == "src" |
| 565 | assert data["to"] == "dst" |
| 566 | |
| 567 | def test_rename_to_existing_exits_1(self, repo: pathlib.Path) -> None: |
| 568 | _branch(repo, "a") |
| 569 | _branch(repo, "b") |
| 570 | result = _branch(repo, "-m", "a", "b") |
| 571 | assert result.exit_code == 1 |
| 572 | |
| 573 | def test_force_rename_to_existing_exits_0(self, repo: pathlib.Path) -> None: |
| 574 | _branch(repo, "a") |
| 575 | _branch(repo, "b") |
| 576 | result = _branch(repo, "-M", "a", "b") |
| 577 | assert result.exit_code == 0 |
| 578 | |
| 579 | def test_rename_updates_head_when_current(self, repo: pathlib.Path) -> None: |
| 580 | _branch(repo, "temp2") |
| 581 | _invoke(repo, ["checkout", "temp2"]) |
| 582 | _branch(repo, "-m", "temp2", "newname") |
| 583 | assert read_current_branch(repo) == "newname" |
| 584 | _invoke(repo, ["checkout", "main"]) |
| 585 | |
| 586 | def test_rename_nonexistent_exits_1(self, repo: pathlib.Path) -> None: |
| 587 | result = _branch(repo, "-m", "ghost", "newname") |
| 588 | assert result.exit_code == 1 |
| 589 | |
| 590 | |
| 591 | # ────────────────────────────────────────────────────────────────────────────── |
| 592 | # Integration — COPY |
| 593 | # ────────────────────────────────────────────────────────────────────────────── |
| 594 | |
| 595 | |
| 596 | class TestCopy: |
| 597 | def test_copy_basic(self, repo: pathlib.Path) -> None: |
| 598 | _branch(repo, "orig") |
| 599 | result = _branch(repo, "-c", "orig", "clone") |
| 600 | assert result.exit_code == 0 |
| 601 | names = [b["name"] for b in json.loads(_branch(repo, "--json").output)] |
| 602 | assert "orig" in names |
| 603 | assert "clone" in names |
| 604 | |
| 605 | def test_copy_same_tip(self, repo: pathlib.Path) -> None: |
| 606 | _branch(repo, "src") |
| 607 | _branch(repo, "-c", "src", "dst") |
| 608 | tip_src = get_head_commit_id(repo, "src") |
| 609 | tip_dst = get_head_commit_id(repo, "dst") |
| 610 | assert tip_src == tip_dst |
| 611 | |
| 612 | def test_copy_json_schema(self, repo: pathlib.Path) -> None: |
| 613 | _branch(repo, "original") |
| 614 | result = _branch(repo, "-c", "original", "copy1", "--json") |
| 615 | data = json.loads(result.output) |
| 616 | assert data["action"] == "copied" |
| 617 | assert data["from"] == "original" |
| 618 | assert data["to"] == "copy1" |
| 619 | |
| 620 | def test_copy_to_existing_exits_1(self, repo: pathlib.Path) -> None: |
| 621 | _branch(repo, "x") |
| 622 | _branch(repo, "y") |
| 623 | result = _branch(repo, "-c", "x", "y") |
| 624 | assert result.exit_code == 1 |
| 625 | |
| 626 | def test_force_copy_to_existing_exits_0(self, repo: pathlib.Path) -> None: |
| 627 | _branch(repo, "p") |
| 628 | _branch(repo, "q") |
| 629 | result = _branch(repo, "-C", "p", "q") |
| 630 | assert result.exit_code == 0 |
| 631 | |
| 632 | def test_copy_omit_src_uses_current(self, repo: pathlib.Path) -> None: |
| 633 | head = get_head_commit_id(repo, "main") |
| 634 | result = _branch(repo, "-c", "main-copy") |
| 635 | assert result.exit_code == 0 |
| 636 | tip = get_head_commit_id(repo, "main-copy") |
| 637 | assert tip == head |
| 638 | |
| 639 | |
| 640 | # ────────────────────────────────────────────────────────────────────────────── |
| 641 | # Integration — LIST |
| 642 | # ────────────────────────────────────────────────────────────────────────────── |
| 643 | |
| 644 | |
| 645 | class TestList: |
| 646 | def test_list_text_exits_0(self, repo: pathlib.Path) -> None: |
| 647 | result = _branch(repo) |
| 648 | assert result.exit_code == 0 |
| 649 | |
| 650 | def test_list_contains_main(self, repo: pathlib.Path) -> None: |
| 651 | result = _branch(repo) |
| 652 | assert "main" in result.output |
| 653 | |
| 654 | def test_list_marks_current_branch(self, repo: pathlib.Path) -> None: |
| 655 | result = _branch(repo) |
| 656 | # Current branch line must start with "* " |
| 657 | current_lines = [l for l in result.output.splitlines() if l.startswith("* ")] |
| 658 | assert len(current_lines) == 1 |
| 659 | assert "main" in current_lines[0] |
| 660 | |
| 661 | def test_list_json_schema(self, repo: pathlib.Path) -> None: |
| 662 | result = _branch(repo, "--json") |
| 663 | data = json.loads(result.output) |
| 664 | assert isinstance(data, list) |
| 665 | assert len(data) >= 1 |
| 666 | keys = set(data[0].keys()) |
| 667 | assert {"name", "current", "commit_id", "last_message", "upstream"} <= keys |
| 668 | |
| 669 | def test_list_json_current_flag(self, repo: pathlib.Path) -> None: |
| 670 | result = _branch(repo, "--json") |
| 671 | data = json.loads(result.output) |
| 672 | current = [b for b in data if b["current"]] |
| 673 | assert len(current) == 1 |
| 674 | assert current[0]["name"] == "main" |
| 675 | |
| 676 | def test_list_json_last_message_populated(self, repo: pathlib.Path) -> None: |
| 677 | result = _branch(repo, "--json") |
| 678 | data = json.loads(result.output) |
| 679 | main_entry = next(b for b in data if b["name"] == "main") |
| 680 | assert main_entry["last_message"] is not None |
| 681 | assert "initial" in main_entry["last_message"] |
| 682 | |
| 683 | def test_list_json_upstream_null_by_default(self, repo: pathlib.Path) -> None: |
| 684 | result = _branch(repo, "--json") |
| 685 | data = json.loads(result.output) |
| 686 | main_entry = next(b for b in data if b["name"] == "main") |
| 687 | assert main_entry["upstream"] is None |
| 688 | |
| 689 | def test_list_verbose_shows_sha(self, repo: pathlib.Path) -> None: |
| 690 | result = _branch(repo, "-v") |
| 691 | # Short SHA should appear |
| 692 | cid = get_head_commit_id(repo, "main") |
| 693 | assert cid is not None |
| 694 | assert cid[:8] in result.output |
| 695 | |
| 696 | def test_list_verbose_shows_message(self, repo: pathlib.Path) -> None: |
| 697 | result = _branch(repo, "-v") |
| 698 | assert "initial" in result.output |
| 699 | |
| 700 | def test_list_multiple_branches(self, repo: pathlib.Path) -> None: |
| 701 | _branch(repo, "feat/a") |
| 702 | _branch(repo, "feat/b") |
| 703 | result = _branch(repo, "--json") |
| 704 | data = json.loads(result.output) |
| 705 | names = [b["name"] for b in data] |
| 706 | assert "feat/a" in names |
| 707 | assert "feat/b" in names |
| 708 | |
| 709 | def test_list_sorted_by_name(self, repo: pathlib.Path) -> None: |
| 710 | _branch(repo, "z-last") |
| 711 | _branch(repo, "a-first") |
| 712 | result = _branch(repo, "--json") |
| 713 | data = json.loads(result.output) |
| 714 | names = [b["name"] for b in data] |
| 715 | assert names == sorted(names) |
| 716 | |
| 717 | def test_list_sort_committeddate(self, repo: pathlib.Path) -> None: |
| 718 | _branch(repo, "feat-x") |
| 719 | result = _branch(repo, "--sort", "committeddate", "--json") |
| 720 | assert result.exit_code == 0 |
| 721 | data = json.loads(result.output) |
| 722 | assert isinstance(data, list) |
| 723 | |
| 724 | |
| 725 | # ────────────────────────────────────────────────────────────────────────────── |
| 726 | # Integration — FILTERS |
| 727 | # ────────────────────────────────────────────────────────────────────────────── |
| 728 | |
| 729 | |
| 730 | class TestFilters: |
| 731 | def test_merged_filter_includes_self(self, repo: pathlib.Path) -> None: |
| 732 | result = _branch(repo, "--merged", "--json") |
| 733 | data = json.loads(result.output) |
| 734 | names = [b["name"] for b in data] |
| 735 | assert "main" in names |
| 736 | |
| 737 | def test_merged_filter_excludes_unmerged(self, repo: pathlib.Path) -> None: |
| 738 | _branch(repo, "unmerged-feat") |
| 739 | _invoke(repo, ["checkout", "unmerged-feat"]) |
| 740 | (repo / "u.py").write_text("u=1\n") |
| 741 | _commit(repo, "-m", "unmerged") |
| 742 | _invoke(repo, ["checkout", "main"]) |
| 743 | result = _branch(repo, "--merged", "--json") |
| 744 | data = json.loads(result.output) |
| 745 | names = [b["name"] for b in data] |
| 746 | assert "unmerged-feat" not in names |
| 747 | |
| 748 | def test_no_merged_filter_includes_unmerged(self, repo: pathlib.Path) -> None: |
| 749 | _branch(repo, "exclusive-feat") |
| 750 | _invoke(repo, ["checkout", "exclusive-feat"]) |
| 751 | (repo / "e.py").write_text("e=1\n") |
| 752 | _commit(repo, "-m", "exclusive") |
| 753 | _invoke(repo, ["checkout", "main"]) |
| 754 | result = _branch(repo, "--no-merged", "--json") |
| 755 | data = json.loads(result.output) |
| 756 | names = [b["name"] for b in data] |
| 757 | assert "exclusive-feat" in names |
| 758 | |
| 759 | def test_no_merged_filter_excludes_self(self, repo: pathlib.Path) -> None: |
| 760 | result = _branch(repo, "--no-merged", "--json") |
| 761 | data = json.loads(result.output) |
| 762 | names = [b["name"] for b in data] |
| 763 | assert "main" not in names |
| 764 | |
| 765 | def test_contains_commit_filter(self, repo: pathlib.Path) -> None: |
| 766 | cid = get_head_commit_id(repo, "main") |
| 767 | assert cid is not None |
| 768 | result = _branch(repo, "--contains", cid, "--json") |
| 769 | data = json.loads(result.output) |
| 770 | names = [b["name"] for b in data] |
| 771 | assert "main" in names |
| 772 | |
| 773 | def test_contains_unknown_commit_empty(self, repo: pathlib.Path) -> None: |
| 774 | result = _branch(repo, "--contains", "a" * 64, "--json") |
| 775 | data = json.loads(result.output) |
| 776 | assert data == [] |
| 777 | |
| 778 | |
| 779 | # ────────────────────────────────────────────────────────────────────────────── |
| 780 | # Integration — validation |
| 781 | # ────────────────────────────────────────────────────────────────────────────── |
| 782 | |
| 783 | |
| 784 | class TestValidation: |
| 785 | def test_ansi_in_pattern_arg_sanitized(self, repo: pathlib.Path) -> None: |
| 786 | result = _branch(repo, "--pattern", "\x1b[31mxml\x1b[0m") |
| 787 | assert "\x1b" not in result.output |
| 788 | |
| 789 | def test_delete_without_name_exits_1(self, repo: pathlib.Path) -> None: |
| 790 | result = _branch(repo, "-d") |
| 791 | assert result.exit_code == 1 |
| 792 | |
| 793 | def test_rename_too_many_args_exits_1(self, repo: pathlib.Path) -> None: |
| 794 | result = _branch(repo, "-m", "a", "b", "c") |
| 795 | assert result.exit_code == 1 |
| 796 | |
| 797 | def test_copy_too_many_args_exits_1(self, repo: pathlib.Path) -> None: |
| 798 | result = _branch(repo, "-c", "a", "b", "c") |
| 799 | assert result.exit_code == 1 |
| 800 | |
| 801 | |
| 802 | # ────────────────────────────────────────────────────────────────────────────── |
| 803 | # Security — ANSI injection |
| 804 | # ────────────────────────────────────────────────────────────────────────────── |
| 805 | |
| 806 | |
| 807 | class TestSecurityAnsi: |
| 808 | def _has_ansi(self, s: str) -> bool: |
| 809 | return "\x1b[" in s |
| 810 | |
| 811 | def test_ansi_in_branch_name_rejected(self, repo: pathlib.Path) -> None: |
| 812 | result = _branch(repo, "\x1b[31mmalicious\x1b[0m") |
| 813 | assert result.exit_code == 1 |
| 814 | assert not self._has_ansi(result.output) |
| 815 | |
| 816 | def test_ansi_in_delete_name_rejected(self, repo: pathlib.Path) -> None: |
| 817 | result = _branch(repo, "-d", "\x1b[31mmalicious\x1b[0m") |
| 818 | assert result.exit_code == 1 |
| 819 | assert not self._has_ansi(result.output) |
| 820 | |
| 821 | def test_ansi_in_rename_new_name_rejected(self, repo: pathlib.Path) -> None: |
| 822 | result = _branch(repo, "-m", "\x1b[31mnew\x1b[0m") |
| 823 | assert result.exit_code == 1 |
| 824 | assert not self._has_ansi(result.output) |
| 825 | |
| 826 | def test_ansi_in_contains_arg_sanitized(self, repo: pathlib.Path) -> None: |
| 827 | result = _branch(repo, "--contains", "\x1b[31mxml\x1b[0m") |
| 828 | assert not self._has_ansi(result.output) |
| 829 | |
| 830 | def test_ansi_in_contains_commit_id(self, repo: pathlib.Path) -> None: |
| 831 | result = _branch(repo, "--contains", "\x1b[31mmalicious\x1b[0m") |
| 832 | # Should exit 0 (no match, empty list) or exit 0 with empty list |
| 833 | # Either way, ANSI must not appear in output |
| 834 | assert not self._has_ansi(result.output) |
| 835 | |
| 836 | def test_errors_go_to_stderr(self, repo: pathlib.Path) -> None: |
| 837 | result = _branch(repo, "-d", "nonexistent") |
| 838 | assert result.exit_code == 1 |
| 839 | # Error should NOT appear in stdout |
| 840 | assert "not found" not in result.output.lower() or (result.stderr and "not found" in result.stderr.lower()) |
| 841 | |
| 842 | |
| 843 | # ────────────────────────────────────────────────────────────────────────────── |
| 844 | # Stress |
| 845 | # ────────────────────────────────────────────────────────────────────────────── |
| 846 | |
| 847 | |
| 848 | @pytest.mark.slow |
| 849 | class TestStress: |
| 850 | def test_list_500_branches_fast(self, repo: pathlib.Path) -> None: |
| 851 | """Listing 500 branches must complete in under 2 seconds.""" |
| 852 | for i in range(500): |
| 853 | _branch(repo, f"feat/task-{i:04d}") |
| 854 | t0 = time.perf_counter() |
| 855 | result = _branch(repo, "--json") |
| 856 | elapsed = (time.perf_counter() - t0) * 1000 |
| 857 | data = json.loads(result.output) |
| 858 | assert len(data) == 501 # main + 500 |
| 859 | assert elapsed < 2000, f"list 500 branches took {elapsed:.0f}ms (limit 2000ms)" |
| 860 | |
| 861 | def test_merged_filter_100_branches(self, repo: pathlib.Path) -> None: |
| 862 | """--merged filter on 100 branches completes in reasonable time.""" |
| 863 | for i in range(100): |
| 864 | _branch(repo, f"task-{i:03d}") |
| 865 | t0 = time.perf_counter() |
| 866 | result = _branch(repo, "--merged", "--json") |
| 867 | elapsed = (time.perf_counter() - t0) * 1000 |
| 868 | data = json.loads(result.output) |
| 869 | # All branches share the same commit as main → all merged |
| 870 | assert len(data) == 101 |
| 871 | assert elapsed < 3000, f"--merged on 100 branches took {elapsed:.0f}ms" |
| 872 | |
| 873 | def test_sort_committeddate_100_branches(self, repo: pathlib.Path) -> None: |
| 874 | for i in range(100): |
| 875 | _branch(repo, f"sort-{i:03d}") |
| 876 | result = _branch(repo, "--sort", "committeddate", "--json") |
| 877 | assert result.exit_code == 0 |
| 878 | data = json.loads(result.output) |
| 879 | assert len(data) == 101 |
| 880 | |
| 881 | def test_concurrent_branch_list_separate_repos(self, tmp_path: pathlib.Path) -> None: |
| 882 | errors: list[str] = [] |
| 883 | |
| 884 | def do_branch(idx: int) -> None: |
| 885 | repo_dir = tmp_path / f"repo_{idx}" |
| 886 | repo_dir.mkdir() |
| 887 | subprocess.run(["muse", "init"], cwd=str(repo_dir), capture_output=True) |
| 888 | (repo_dir / "x.py").write_text(f"x={idx}\n") |
| 889 | subprocess.run( |
| 890 | ["muse", "commit", "-m", f"c{idx}"], |
| 891 | cwd=str(repo_dir), capture_output=True, |
| 892 | ) |
| 893 | for j in range(5): |
| 894 | subprocess.run( |
| 895 | ["muse", "branch", f"b{j}"], |
| 896 | cwd=str(repo_dir), capture_output=True, |
| 897 | ) |
| 898 | r = subprocess.run( |
| 899 | ["muse", "branch", "--json"], |
| 900 | cwd=str(repo_dir), capture_output=True, text=True, |
| 901 | ) |
| 902 | if r.returncode != 0: |
| 903 | errors.append(f"repo_{idx}: branch --json failed") |
| 904 | return |
| 905 | data = json.loads(r.stdout) |
| 906 | if len(data) != 6: # main + 5 |
| 907 | errors.append(f"repo_{idx}: expected 6 branches, got {len(data)}") |
| 908 | |
| 909 | threads = [threading.Thread(target=do_branch, args=(i,)) for i in range(6)] |
| 910 | for t in threads: |
| 911 | t.start() |
| 912 | for t in threads: |
| 913 | t.join() |
| 914 | assert not errors, f"Concurrent branch errors:\n{'\n'.join(errors)}" |
| 915 | |
| 916 | def test_deep_ancestor_chain_is_merged(self, repo: pathlib.Path) -> None: |
| 917 | """A branch with 50 ancestors is correctly detected as merged.""" |
| 918 | _branch(repo, "long-chain") |
| 919 | _invoke(repo, ["checkout", "long-chain"]) |
| 920 | for i in range(50): |
| 921 | (repo / f"step_{i:03d}.py").write_text(f"s={i}\n") |
| 922 | _commit(repo, "-m", f"step {i}") |
| 923 | _invoke(repo, ["checkout", "main"]) |
| 924 | _invoke(repo, ["merge", "long-chain"]) |
| 925 | result = _branch(repo, "--merged", "--json") |
| 926 | data = json.loads(result.output) |
| 927 | names = [b["name"] for b in data] |
| 928 | assert "long-chain" in names |
| 929 | |
| 930 | def test_merged_filter_ancestor_set_computed_once( |
| 931 | self, repo: pathlib.Path |
| 932 | ) -> None: |
| 933 | """--merged must compute the 'into' ancestor set once, not once per branch. |
| 934 | |
| 935 | With N branches, the naive implementation calls _commit_ancestors N times |
| 936 | for the same 'into' tip. The fix pre-computes it once and checks each |
| 937 | branch tip against the cached set. |
| 938 | """ |
| 939 | from unittest.mock import patch |
| 940 | import muse.cli.commands.branch as branch_module |
| 941 | |
| 942 | for i in range(20): |
| 943 | _branch(repo, f"feat-{i:02d}") |
| 944 | |
| 945 | with patch.object( |
| 946 | branch_module, "_commit_ancestors", wraps=branch_module._commit_ancestors |
| 947 | ) as mock_ca: |
| 948 | result = _branch(repo, "--merged", "--json") |
| 949 | |
| 950 | assert result.exit_code == 0 |
| 951 | data = json.loads(result.output) |
| 952 | assert len(data) == 21 # main + 20 |
| 953 | |
| 954 | # The ancestor set for 'main' (the into branch) must be computed exactly once, |
| 955 | # not once per branch being checked. |
| 956 | into_calls = [c for c in mock_ca.call_args_list if c.args[1] != ""] |
| 957 | # All calls with the same commit_id (main's tip) should collapse to 1. |
| 958 | unique_commit_ids = {c.args[1] for c in mock_ca.call_args_list} |
| 959 | assert len(mock_ca.call_args_list) <= len(unique_commit_ids) + 1, ( |
| 960 | f"_commit_ancestors called {len(mock_ca.call_args_list)}× but only " |
| 961 | f"{len(unique_commit_ids)} unique commit IDs — ancestor set is being " |
| 962 | "recomputed per branch instead of once" |
| 963 | ) |
File History
3 commits
sha256:e237dc0e8122609f5131d11c9dda9bba480395a5a4355cda0c9fa7e634fddd29
fix(branch): guard -d --dry-run against destructive writes;…
Sonnet 4.6
patch
8 days ago
sha256:144495ae90498f9d51aeac4bf7f9ee9e9502d14474745a856b96921f7349c0ce
next round of fixing tests
Human
14 days ago
sha256:99f8eb388d9a9c353e68b9a4e5bebe1b4240a8f511e6f0928e58c0e95153e103
feat: branch --prune-config, fix hub repo delete docstrings…
Sonnet 4.6
minor
⚠
15 days ago