test_cmd_branch.py
python
sha256:144495ae90498f9d51aeac4bf7f9ee9e9502d14474745a856b96921f7349c0ce
next round of fixing tests
Human
3 days ago
| 1 | """Tests for ``muse branch``. |
| 2 | |
| 3 | Coverage tiers |
| 4 | -------------- |
| 5 | Unit — parser flags, dead-code removal, helpers (_resolve_start_point, |
| 6 | _list_local_branches, _list_remotes, _upstream_for, |
| 7 | _commit_ancestors, _is_merged, _contains_commit, |
| 8 | _cleanup_empty_dirs). |
| 9 | Integration — create, delete, force-delete, rename, force-rename, copy, |
| 10 | force-copy, listing, filtering, sorting. |
| 11 | End-to-end — full CLI invocations: text and JSON output, all operations. |
| 12 | Security — ANSI injection in branch names, format flags, messages. |
| 13 | Stress — 500 branches, concurrent list, deep ancestry chains. |
| 14 | """ |
| 15 | |
| 16 | from __future__ import annotations |
| 17 | |
| 18 | import json |
| 19 | import os |
| 20 | import pathlib |
| 21 | import subprocess |
| 22 | import threading |
| 23 | import time |
| 24 | from typing import TYPE_CHECKING |
| 25 | |
| 26 | import pytest |
| 27 | |
| 28 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 29 | from muse.core.refs import ( |
| 30 | get_head_commit_id, |
| 31 | read_current_branch, |
| 32 | ) |
| 33 | from muse.core.types import short_id |
| 34 | from muse.core.paths import heads_dir, logs_dir |
| 35 | |
| 36 | if TYPE_CHECKING: |
| 37 | import argparse |
| 38 | |
| 39 | runner = CliRunner() |
| 40 | |
| 41 | # ────────────────────────────────────────────────────────────────────────────── |
| 42 | # Helpers |
| 43 | # ────────────────────────────────────────────────────────────────────────────── |
| 44 | |
| 45 | |
def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult:
    """Invoke the shared ``runner`` with *args* while the cwd is *repo*.

    The working directory that was active on entry is restored before
    returning, including when the invocation raises.
    """
    original_cwd = os.getcwd()
    try:
        os.chdir(repo)
        outcome = runner.invoke(None, args)
    finally:
        os.chdir(original_cwd)
    return outcome
| 53 | |
| 54 | |
def _branch(repo: pathlib.Path, *extra: str) -> InvokeResult:
    """Invoke the ``branch`` subcommand inside *repo*, followed by *extra*."""
    argv = ["branch"]
    argv.extend(extra)
    return _invoke(repo, argv)
| 57 | |
| 58 | |
def _commit(repo: pathlib.Path, *extra: str) -> InvokeResult:
    """Invoke ``code add .`` and then ``commit`` with *extra* inside *repo*.

    Only the result of the ``commit`` invocation is returned; the result of
    the add step is discarded.
    """
    _invoke(repo, ["code", "add", "."])
    commit_argv = ["commit", *extra]
    return _invoke(repo, commit_argv)
| 62 | |
| 63 | |
@pytest.fixture()
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Repository initialised in ``tmp_path`` holding a single commit.

    Runs ``init``, writes ``a.py`` and commits it with the message
    ``initial``; the tests in this module expect the branch to be ``main``.
    """
    # _invoke performs the same chdir / invoke / restore-cwd sequence.
    _invoke(tmp_path, ["init"])
    seed_file = tmp_path / "a.py"
    seed_file.write_text("x = 1\n")
    _commit(tmp_path, "-m", "initial")
    return tmp_path
| 76 | |
| 77 | |
@pytest.fixture()
def two_commit_repo(repo: pathlib.Path) -> pathlib.Path:
    """The ``repo`` fixture extended with a second commit (``b.py``)."""
    extra_file = repo / "b.py"
    extra_file.write_text("y = 2\n")
    _commit(repo, "-m", "second")
    return repo
| 84 | |
| 85 | |
| 86 | # ────────────────────────────────────────────────────────────────────────────── |
| 87 | # Unit — parser flags |
| 88 | # ────────────────────────────────────────────────────────────────────────────── |
| 89 | |
| 90 | |
class TestRegisterFlags:
    """Parser-level checks for the flags that ``register`` installs."""

    def _parse(self, *args: str) -> "argparse.Namespace":
        """Parse ``branch *args`` with a fresh parser built via ``register``."""
        import argparse

        from muse.cli.commands.branch import register

        p = argparse.ArgumentParser()
        sub = p.add_subparsers()
        register(sub)
        return p.parse_args(["branch", *args])

    def test_default_json_out_is_false(self) -> None:
        ns = self._parse()
        assert ns.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        ns = self._parse("--json")
        assert ns.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        ns = self._parse("-j")
        assert ns.json_out is True

    def test_delete_flag(self) -> None:
        ns = self._parse("-d", "foo")
        assert ns.op == "delete"

    def test_force_delete_flag(self) -> None:
        ns = self._parse("-D", "foo")
        assert ns.op == "force_delete"

    def test_rename_flag(self) -> None:
        ns = self._parse("-m", "new")
        assert ns.op == "rename"

    def test_force_rename_flag(self) -> None:
        ns = self._parse("-M", "new")
        assert ns.op == "force_rename"

    def test_copy_flag(self) -> None:
        ns = self._parse("-c", "copy")
        assert ns.op == "copy"

    def test_force_copy_flag(self) -> None:
        ns = self._parse("-C", "copy")
        assert ns.op == "force_copy"

    def test_verbose_default_0(self) -> None:
        ns = self._parse()
        assert ns.verbose == 0

    def test_verbose_v_is_1(self) -> None:
        ns = self._parse("-v")
        assert ns.verbose == 1

    def test_verbose_vv_is_2(self) -> None:
        ns = self._parse("-vv")
        assert ns.verbose == 2

    def test_remotes_flag(self) -> None:
        ns = self._parse("-r")
        assert ns.remotes is True

    def test_all_flag(self) -> None:
        ns = self._parse("-a")
        assert ns.all_branches is True

    def test_sort_default_name(self) -> None:
        ns = self._parse()
        assert ns.sort == "name"

    def test_sort_committeddate(self) -> None:
        ns = self._parse("--sort", "committeddate")
        assert ns.sort == "committeddate"

    def test_sort_invalid_rejected(self) -> None:
        # Reuse _parse rather than rebuilding the same parser inline; the
        # SystemExit is expected from parse_args rejecting the sort value.
        with pytest.raises(SystemExit):
            self._parse("--sort", "invalid")
| 176 | |
| 177 | |
| 178 | # ────────────────────────────────────────────────────────────────────────────── |
| 179 | # Unit — dead-code removal |
| 180 | # ────────────────────────────────────────────────────────────────────────────── |
| 181 | |
| 182 | |
class TestDeadCodeRemoved:
    """Source-text checks that specific constructs are absent from the command."""

    def test_op_list_branch_removed(self) -> None:
        import inspect

        import muse.cli.commands.branch as branch_mod

        run_source = inspect.getsource(branch_mod.run)
        assert 'op == "list"' not in run_source, (
            'op == "list" was a dead branch (nothing in register() creates it); must be deleted'
        )

    def test_inline_tomllib_import_removed(self) -> None:
        import inspect

        import muse.cli.commands.branch as branch_mod

        upstream_source = inspect.getsource(branch_mod._upstream_for)
        assert "import tomllib" not in upstream_source, (
            "inline 'import tomllib' inside _upstream_for should be a module-level import"
        )

    def test_double_sanitize_removed(self) -> None:
        """``run`` must not contain a ``sanitize_display(name_str)`` call."""
        import inspect

        import muse.cli.commands.branch as branch_mod

        run_source = inspect.getsource(branch_mod.run)
        assert "sanitize_display(name_str)" not in run_source, (
            "name_str was double-sanitized; the second call stripped ANSI from current branch"
        )
| 214 | |
| 215 | |
| 216 | # ────────────────────────────────────────────────────────────────────────────── |
| 217 | # Unit — _resolve_start_point |
| 218 | # ────────────────────────────────────────────────────────────────────────────── |
| 219 | |
| 220 | |
class TestResolveStartPoint:
    """Unit tests for ``_resolve_start_point``."""

    def test_resolves_branch_name(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _resolve_start_point

        cid = get_head_commit_id(repo, "main")
        # Guard against a vacuous pass where both sides are None.
        assert cid is not None
        result = _resolve_start_point(repo, "main", "main")
        assert result == cid

    def test_resolves_full_sha(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _resolve_start_point

        cid = get_head_commit_id(repo, "main")
        assert cid is not None
        result = _resolve_start_point(repo, "main", cid)
        assert result == cid

    def test_resolves_partial_sha(self, two_commit_repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _resolve_start_point
        from muse.core.commits import get_commits_for_branch

        repo = two_commit_repo
        # read_current_branch is already imported at module level.
        branch = read_current_branch(repo)
        commits = get_commits_for_branch(repo, branch)
        first_sha = commits[-1].commit_id  # oldest commit
        # The abbreviated id produced by short_id() should resolve to the
        # full id (prefix length is whatever short_id yields).
        result = _resolve_start_point(repo, "main", short_id(first_sha))
        assert result == first_sha

    def test_returns_input_for_unresolvable(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _resolve_start_point

        result = _resolve_start_point(repo, "main", "nonexistent-ref")
        assert result == "nonexistent-ref"
| 255 | |
| 256 | |
| 257 | # ────────────────────────────────────────────────────────────────────────────── |
| 258 | # Unit — _list_local_branches |
| 259 | # ────────────────────────────────────────────────────────────────────────────── |
| 260 | |
| 261 | |
class TestListLocalBranches:
    """Unit tests for ``_list_local_branches``."""

    def test_returns_sorted_list(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _list_local_branches

        _branch(repo, "z-last")
        _branch(repo, "a-first")
        branches = _list_local_branches(repo)
        # Without these membership checks the ordering assertion below would
        # also pass on an empty list or if branch creation silently failed.
        assert "z-last" in branches
        assert "a-first" in branches
        assert branches == sorted(branches)

    def test_skips_hidden_files(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _list_local_branches

        # Plant a hidden lock file inside refs/heads/
        lock = heads_dir(repo) / ".lock"
        lock.write_text("locked")
        branches = _list_local_branches(repo)
        assert ".lock" not in branches
        assert not any(b.startswith(".") for b in branches)

    def test_empty_repo_returns_empty(self, tmp_path: pathlib.Path) -> None:
        from muse.cli.commands.branch import _list_local_branches

        # tmp_path has never been initialised as a repository.
        assert _list_local_branches(tmp_path) == []

    def test_includes_nested_branches(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _list_local_branches

        _branch(repo, "feat/sub/task")
        branches = _list_local_branches(repo)
        assert "feat/sub/task" in branches
| 292 | |
| 293 | |
| 294 | # ────────────────────────────────────────────────────────────────────────────── |
| 295 | # Unit — _commit_ancestors, _is_merged, _contains_commit |
| 296 | # ────────────────────────────────────────────────────────────────────────────── |
| 297 | |
| 298 | |
class TestCommitGraph:
    """Unit tests for ``_commit_ancestors``, ``_is_merged`` and ``_contains_commit``."""

    def test_commit_ancestors_includes_self(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _commit_ancestors

        tip = get_head_commit_id(repo, "main")
        assert tip is not None
        assert tip in _commit_ancestors(repo, tip)

    def test_is_merged_true_for_same_branch(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _is_merged

        assert _is_merged(repo, "main", "main")

    def test_is_merged_false_for_unmerged(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _is_merged

        # Give "feat" a commit of its own, then switch back to main.
        _branch(repo, "feat")
        _invoke(repo, ["checkout", "feat"])
        feature_file = repo / "c.py"
        feature_file.write_text("c=1\n")
        _commit(repo, "-m", "feat commit")
        _invoke(repo, ["checkout", "main"])
        assert not _is_merged(repo, "feat", "main")

    def test_contains_commit_true(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _contains_commit

        tip = get_head_commit_id(repo, "main")
        assert tip is not None
        assert _contains_commit(repo, "main", tip)

    def test_contains_commit_false_for_unknown(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _contains_commit

        unknown_id = "a" * 64
        assert not _contains_commit(repo, "main", unknown_id)
| 334 | |
| 335 | |
| 336 | # ────────────────────────────────────────────────────────────────────────────── |
| 337 | # Integration — CREATE |
| 338 | # ────────────────────────────────────────────────────────────────────────────── |
| 339 | |
| 340 | |
class TestCreate:
    """Integration tests for creating branches with ``branch <name> [start]``."""

    @staticmethod
    def _oldest_commit_id(repo: pathlib.Path) -> str:
        """Return the id of the last entry in the current branch's commit list.

        The tests below treat that last entry as the oldest commit.
        """
        from muse.core.commits import get_commits_for_branch

        commits = get_commits_for_branch(repo, read_current_branch(repo))
        return commits[-1].commit_id

    def test_create_basic_exits_0(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "new-branch")
        assert result.exit_code == 0

    def test_create_text_output(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "my-branch")
        assert "my-branch" in result.output

    def test_create_json_schema(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "json-branch", "--json")
        data = json.loads(result.output)
        assert data["action"] == "created"
        assert data["branch"] == "json-branch"
        assert "commit_id" in data
        assert "from" in data

    def test_create_json_from_is_none_at_head(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "from-head", "--json")
        data = json.loads(result.output)
        assert data["from"] is None

    def test_create_at_full_sha(self, two_commit_repo: pathlib.Path) -> None:
        repo = two_commit_repo
        first_sha = self._oldest_commit_id(repo)

        result = _branch(repo, "at-sha", first_sha)
        assert result.exit_code == 0
        tip = get_head_commit_id(repo, "at-sha")
        assert tip == first_sha

    def test_create_at_partial_sha(self, two_commit_repo: pathlib.Path) -> None:
        repo = two_commit_repo
        first_sha = self._oldest_commit_id(repo)

        result = _branch(repo, "at-partial", short_id(first_sha))
        assert result.exit_code == 0
        tip = get_head_commit_id(repo, "at-partial")
        assert tip == first_sha

    def test_create_at_branch_name(self, repo: pathlib.Path) -> None:
        head_cid = get_head_commit_id(repo, "main")
        # Guard against a vacuous pass where both tips are None.
        assert head_cid is not None
        result = _branch(repo, "copy-of-main", "main")
        assert result.exit_code == 0
        tip = get_head_commit_id(repo, "copy-of-main")
        assert tip == head_cid

    def test_create_json_from_field_populated(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "with-from", "main", "--json")
        data = json.loads(result.output)
        assert data["from"] == "main"

    def test_create_duplicate_exits_1(self, repo: pathlib.Path) -> None:
        _branch(repo, "dup")
        result = _branch(repo, "dup")
        assert result.exit_code == 1

    def test_create_invalid_name_exits_1(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "bad..name")
        assert result.exit_code == 1

    def test_create_does_not_checkout(self, repo: pathlib.Path) -> None:
        _branch(repo, "new-but-no-switch")
        assert read_current_branch(repo) == "main"
| 415 | |
| 416 | |
| 417 | # ────────────────────────────────────────────────────────────────────────────── |
| 418 | # Integration — DELETE |
| 419 | # ────────────────────────────────────────────────────────────────────────────── |
| 420 | |
| 421 | |
class TestDelete:
    """Integration tests for ``branch -d`` and ``branch -D``."""

    def test_delete_merged_branch_exits_0(self, repo: pathlib.Path) -> None:
        # The new branch shares its tip with main, so it counts as merged.
        _branch(repo, "to-delete")
        outcome = _branch(repo, "-d", "to-delete")
        assert outcome.exit_code == 0

    def test_delete_json_schema(self, repo: pathlib.Path) -> None:
        _branch(repo, "del-json")
        outcome = _branch(repo, "-d", "del-json", "--json")
        payload = json.loads(outcome.output)
        assert payload["action"] == "deleted"
        assert payload["branch"] == "del-json"
        assert "was" in payload

    def test_delete_unmerged_exits_1_without_force(self, repo: pathlib.Path) -> None:
        # Put a commit on "unmerged" that main does not have.
        _branch(repo, "unmerged")
        _invoke(repo, ["checkout", "unmerged"])
        work_file = repo / "z.py"
        work_file.write_text("z=1\n")
        _commit(repo, "-m", "unmerged work")
        _invoke(repo, ["checkout", "main"])
        outcome = _branch(repo, "-d", "unmerged")
        assert outcome.exit_code == 1

    def test_force_delete_unmerged_exits_0(self, repo: pathlib.Path) -> None:
        _branch(repo, "force-del")
        _invoke(repo, ["checkout", "force-del"])
        work_file = repo / "x.py"
        work_file.write_text("x=1\n")
        _commit(repo, "-m", "exclusive work")
        _invoke(repo, ["checkout", "main"])
        outcome = _branch(repo, "-D", "force-del")
        assert outcome.exit_code == 0

    def test_delete_current_branch_exits_1(self, repo: pathlib.Path) -> None:
        outcome = _branch(repo, "-d", "main")
        assert outcome.exit_code == 1

    def test_delete_nonexistent_exits_1(self, repo: pathlib.Path) -> None:
        outcome = _branch(repo, "-d", "ghost")
        assert outcome.exit_code == 1

    def test_delete_removes_branch_from_list(self, repo: pathlib.Path) -> None:
        _branch(repo, "temp")
        _branch(repo, "-d", "temp")
        listing = json.loads(_branch(repo, "--json").output)
        remaining = [entry["name"] for entry in listing]
        assert "temp" not in remaining

    def test_delete_nested_branch_cleans_empty_dirs(self, repo: pathlib.Path) -> None:
        _branch(repo, "feat/sub/task")
        _branch(repo, "-D", "feat/sub/task")
        # With feat/ gone, feat/sub/ beneath it is gone as well.
        top_level_dir = heads_dir(repo) / "feat"
        assert not top_level_dir.exists()

    def test_delete_removes_reflog_file(self, repo: pathlib.Path) -> None:
        """The reflog file of a branch disappears when the branch is deleted."""
        _invoke(repo, ["checkout", "-b", "bye"])  # checkout writes the reflog
        _invoke(repo, ["checkout", "main"])
        heads_log = logs_dir(repo) / "refs" / "heads"
        reflog = heads_log / "bye"
        assert reflog.exists(), "reflog should exist after checkout -b"
        _branch(repo, "-d", "bye")
        assert not reflog.exists(), "reflog must be deleted when branch is deleted"

    def test_delete_nested_branch_removes_reflog_and_empty_dirs(
        self, repo: pathlib.Path
    ) -> None:
        """Deleting a nested branch drops its reflog and the emptied parent dirs."""
        _invoke(repo, ["checkout", "-b", "feat/ui/button"])
        _invoke(repo, ["checkout", "main"])
        reflog = logs_dir(repo) / "refs" / "heads" / "feat" / "ui" / "button"
        assert reflog.exists(), "reflog should exist after checkout -b"
        _branch(repo, "-D", "feat/ui/button")
        assert not reflog.exists()
        feat_log_dir = logs_dir(repo) / "refs" / "heads" / "feat"
        assert not feat_log_dir.exists(), "empty reflog parent dirs must be cleaned up"

    def test_force_delete_also_removes_reflog(self, repo: pathlib.Path) -> None:
        """``-D`` removes the reflog file just as ``-d`` does."""
        _invoke(repo, ["checkout", "-b", "force-log"])
        scratch = repo / "tmp.py"
        scratch.write_text("x=1\n")
        _commit(repo, "-m", "unmerged")
        _invoke(repo, ["checkout", "main"])
        reflog = logs_dir(repo) / "refs" / "heads" / "force-log"
        assert reflog.exists()
        _branch(repo, "-D", "force-log")
        assert not reflog.exists()
| 509 | |
| 510 | |
| 511 | # ────────────────────────────────────────────────────────────────────────────── |
| 512 | # Integration — CREATE REFLOG |
| 513 | # ────────────────────────────────────────────────────────────────────────────── |
| 514 | |
| 515 | |
class TestCreateReflog:
    """Integration tests for the reflog written when ``branch <name>`` creates a branch."""

    def test_branch_create_writes_reflog(self, repo: pathlib.Path) -> None:
        """``muse branch <name>`` writes a reflog entry — git-idiomatic behaviour."""
        # The command under test is plain `branch <name>`; it takes no -b flag here.
        _branch(repo, "feat/new")
        reflog = logs_dir(repo) / "refs" / "heads" / "feat" / "new"
        assert reflog.exists(), "reflog must exist after muse branch <name>"

    def test_branch_create_reflog_contains_branch_created(self, repo: pathlib.Path) -> None:
        """Reflog entry records a 'branch: Created' operation."""
        _branch(repo, "task/thing")
        reflog = logs_dir(repo) / "refs" / "heads" / "task" / "thing"
        content = reflog.read_text(encoding="utf-8")
        assert "branch: Created" in content

    def test_branch_create_reflog_records_start_point(self, repo: pathlib.Path) -> None:
        """Reflog entry for a branch created from another branch names that source."""
        _branch(repo, "task/from-main", "main")
        reflog = logs_dir(repo) / "refs" / "heads" / "task" / "from-main"
        content = reflog.read_text(encoding="utf-8")
        assert "main" in content
| 536 | |
| 537 | |
| 538 | # ────────────────────────────────────────────────────────────────────────────── |
| 539 | # Integration — RENAME |
| 540 | # ────────────────────────────────────────────────────────────────────────────── |
| 541 | |
| 542 | |
class TestRename:
    """Integration tests for ``branch -m`` and ``branch -M``."""

    def test_rename_basic(self, repo: pathlib.Path) -> None:
        _branch(repo, "old-name")
        outcome = _branch(repo, "-m", "old-name", "new-name")
        assert outcome.exit_code == 0
        listing = json.loads(_branch(repo, "--json").output)
        names = [entry["name"] for entry in listing]
        assert "new-name" in names
        assert "old-name" not in names

    def test_rename_omit_old_uses_current(self, repo: pathlib.Path) -> None:
        _branch(repo, "temp")
        _invoke(repo, ["checkout", "temp"])
        # Only the new name is given, so the checked-out branch is renamed.
        outcome = _branch(repo, "-m", "renamed")
        assert outcome.exit_code == 0
        assert read_current_branch(repo) == "renamed"
        _invoke(repo, ["checkout", "main"])

    def test_rename_json_schema(self, repo: pathlib.Path) -> None:
        _branch(repo, "src")
        outcome = _branch(repo, "-m", "src", "dst", "--json")
        payload = json.loads(outcome.output)
        assert payload["action"] == "renamed"
        assert payload["from"] == "src"
        assert payload["to"] == "dst"

    def test_rename_to_existing_exits_1(self, repo: pathlib.Path) -> None:
        for name in ("a", "b"):
            _branch(repo, name)
        outcome = _branch(repo, "-m", "a", "b")
        assert outcome.exit_code == 1

    def test_force_rename_to_existing_exits_0(self, repo: pathlib.Path) -> None:
        for name in ("a", "b"):
            _branch(repo, name)
        outcome = _branch(repo, "-M", "a", "b")
        assert outcome.exit_code == 0

    def test_rename_updates_head_when_current(self, repo: pathlib.Path) -> None:
        _branch(repo, "temp2")
        _invoke(repo, ["checkout", "temp2"])
        _branch(repo, "-m", "temp2", "newname")
        assert read_current_branch(repo) == "newname"
        _invoke(repo, ["checkout", "main"])

    def test_rename_nonexistent_exits_1(self, repo: pathlib.Path) -> None:
        outcome = _branch(repo, "-m", "ghost", "newname")
        assert outcome.exit_code == 1
| 590 | |
| 591 | |
| 592 | # ────────────────────────────────────────────────────────────────────────────── |
| 593 | # Integration — COPY |
| 594 | # ────────────────────────────────────────────────────────────────────────────── |
| 595 | |
| 596 | |
class TestCopy:
    """Integration tests for ``branch -c`` and ``branch -C``."""

    def test_copy_basic(self, repo: pathlib.Path) -> None:
        _branch(repo, "orig")
        result = _branch(repo, "-c", "orig", "clone")
        assert result.exit_code == 0
        names = [b["name"] for b in json.loads(_branch(repo, "--json").output)]
        assert "orig" in names
        assert "clone" in names

    def test_copy_same_tip(self, repo: pathlib.Path) -> None:
        _branch(repo, "src")
        _branch(repo, "-c", "src", "dst")
        tip_src = get_head_commit_id(repo, "src")
        tip_dst = get_head_commit_id(repo, "dst")
        # Guard against a vacuous pass where neither branch has a tip.
        assert tip_src is not None
        assert tip_src == tip_dst

    def test_copy_json_schema(self, repo: pathlib.Path) -> None:
        _branch(repo, "original")
        result = _branch(repo, "-c", "original", "copy1", "--json")
        data = json.loads(result.output)
        assert data["action"] == "copied"
        assert data["from"] == "original"
        assert data["to"] == "copy1"

    def test_copy_to_existing_exits_1(self, repo: pathlib.Path) -> None:
        _branch(repo, "x")
        _branch(repo, "y")
        result = _branch(repo, "-c", "x", "y")
        assert result.exit_code == 1

    def test_force_copy_to_existing_exits_0(self, repo: pathlib.Path) -> None:
        _branch(repo, "p")
        _branch(repo, "q")
        result = _branch(repo, "-C", "p", "q")
        assert result.exit_code == 0

    def test_copy_omit_src_uses_current(self, repo: pathlib.Path) -> None:
        head = get_head_commit_id(repo, "main")
        # Guard against a vacuous pass where both tips are None.
        assert head is not None
        # Only the destination is given, so the checked-out branch is the source.
        result = _branch(repo, "-c", "main-copy")
        assert result.exit_code == 0
        tip = get_head_commit_id(repo, "main-copy")
        assert tip == head
| 639 | |
| 640 | |
| 641 | # ────────────────────────────────────────────────────────────────────────────── |
| 642 | # Integration — LIST |
| 643 | # ────────────────────────────────────────────────────────────────────────────── |
| 644 | |
| 645 | |
class TestList:
    """Integration tests for listing branches in text and JSON form."""

    def test_list_text_exits_0(self, repo: pathlib.Path) -> None:
        listing = _branch(repo)
        assert listing.exit_code == 0

    def test_list_contains_main(self, repo: pathlib.Path) -> None:
        listing = _branch(repo)
        assert "main" in listing.output

    def test_list_marks_current_branch(self, repo: pathlib.Path) -> None:
        listing = _branch(repo)
        # Exactly one line carries the "* " current-branch marker.
        starred = [row for row in listing.output.splitlines() if row.startswith("* ")]
        assert len(starred) == 1
        assert "main" in starred[0]

    def test_list_json_schema(self, repo: pathlib.Path) -> None:
        entries = json.loads(_branch(repo, "--json").output)
        assert isinstance(entries, list)
        assert len(entries) >= 1
        required = {"name", "current", "commit_id", "last_message", "upstream"}
        assert required <= set(entries[0].keys())

    def test_list_json_current_flag(self, repo: pathlib.Path) -> None:
        entries = json.loads(_branch(repo, "--json").output)
        flagged = [entry for entry in entries if entry["current"]]
        assert len(flagged) == 1
        assert flagged[0]["name"] == "main"

    def test_list_json_last_message_populated(self, repo: pathlib.Path) -> None:
        entries = json.loads(_branch(repo, "--json").output)
        main_entry = next(entry for entry in entries if entry["name"] == "main")
        assert main_entry["last_message"] is not None
        assert "initial" in main_entry["last_message"]

    def test_list_json_upstream_null_by_default(self, repo: pathlib.Path) -> None:
        entries = json.loads(_branch(repo, "--json").output)
        main_entry = next(entry for entry in entries if entry["name"] == "main")
        assert main_entry["upstream"] is None

    def test_list_verbose_shows_sha(self, repo: pathlib.Path) -> None:
        listing = _branch(repo, "-v")
        # The first 8 characters of the tip id must show up in verbose output.
        tip = get_head_commit_id(repo, "main")
        assert tip is not None
        assert tip[:8] in listing.output

    def test_list_verbose_shows_message(self, repo: pathlib.Path) -> None:
        listing = _branch(repo, "-v")
        assert "initial" in listing.output

    def test_list_multiple_branches(self, repo: pathlib.Path) -> None:
        _branch(repo, "feat/a")
        _branch(repo, "feat/b")
        entries = json.loads(_branch(repo, "--json").output)
        names = [entry["name"] for entry in entries]
        assert "feat/a" in names
        assert "feat/b" in names

    def test_list_sorted_by_name(self, repo: pathlib.Path) -> None:
        _branch(repo, "z-last")
        _branch(repo, "a-first")
        entries = json.loads(_branch(repo, "--json").output)
        names = [entry["name"] for entry in entries]
        assert names == sorted(names)

    def test_list_sort_committeddate(self, repo: pathlib.Path) -> None:
        _branch(repo, "feat-x")
        listing = _branch(repo, "--sort", "committeddate", "--json")
        assert listing.exit_code == 0
        entries = json.loads(listing.output)
        assert isinstance(entries, list)
| 724 | |
| 725 | |
| 726 | # ────────────────────────────────────────────────────────────────────────────── |
| 727 | # Integration — FILTERS |
| 728 | # ────────────────────────────────────────────────────────────────────────────── |
| 729 | |
| 730 | |
class TestFilters:
    """Integration tests for ``--merged``, ``--no-merged`` and ``--contains``."""

    @staticmethod
    def _names(result: InvokeResult) -> list[str]:
        """Extract the ``name`` field of every entry in a JSON listing."""
        return [entry["name"] for entry in json.loads(result.output)]

    def test_merged_filter_includes_self(self, repo: pathlib.Path) -> None:
        listing = _branch(repo, "--merged", "--json")
        assert "main" in self._names(listing)

    def test_merged_filter_excludes_unmerged(self, repo: pathlib.Path) -> None:
        # Give "unmerged-feat" a commit that main does not have.
        _branch(repo, "unmerged-feat")
        _invoke(repo, ["checkout", "unmerged-feat"])
        work_file = repo / "u.py"
        work_file.write_text("u=1\n")
        _commit(repo, "-m", "unmerged")
        _invoke(repo, ["checkout", "main"])
        listing = _branch(repo, "--merged", "--json")
        assert "unmerged-feat" not in self._names(listing)

    def test_no_merged_filter_includes_unmerged(self, repo: pathlib.Path) -> None:
        _branch(repo, "exclusive-feat")
        _invoke(repo, ["checkout", "exclusive-feat"])
        work_file = repo / "e.py"
        work_file.write_text("e=1\n")
        _commit(repo, "-m", "exclusive")
        _invoke(repo, ["checkout", "main"])
        listing = _branch(repo, "--no-merged", "--json")
        assert "exclusive-feat" in self._names(listing)

    def test_no_merged_filter_excludes_self(self, repo: pathlib.Path) -> None:
        listing = _branch(repo, "--no-merged", "--json")
        assert "main" not in self._names(listing)

    def test_contains_commit_filter(self, repo: pathlib.Path) -> None:
        tip = get_head_commit_id(repo, "main")
        assert tip is not None
        listing = _branch(repo, "--contains", tip, "--json")
        assert "main" in self._names(listing)

    def test_contains_unknown_commit_empty(self, repo: pathlib.Path) -> None:
        unknown_id = "a" * 64
        listing = _branch(repo, "--contains", unknown_id, "--json")
        assert json.loads(listing.output) == []
| 778 | |
| 779 | |
| 780 | # ────────────────────────────────────────────────────────────────────────────── |
| 781 | # Integration — validation |
| 782 | # ────────────────────────────────────────────────────────────────────────────── |
| 783 | |
| 784 | |
class TestValidation:
    """Argument-validation behaviour of ``muse branch``."""

    def test_ansi_in_pattern_arg_sanitized(self, repo: pathlib.Path) -> None:
        """An escape character passed via ``--pattern`` never reaches the output."""
        hostile_pattern = "\x1b[31mxml\x1b[0m"
        outcome = _branch(repo, "--pattern", hostile_pattern)
        assert "\x1b" not in outcome.output

    def test_delete_without_name_exits_1(self, repo: pathlib.Path) -> None:
        """``-d`` with no branch name exits with status 1."""
        assert _branch(repo, "-d").exit_code == 1

    def test_rename_too_many_args_exits_1(self, repo: pathlib.Path) -> None:
        """``-m`` followed by three names exits with status 1."""
        assert _branch(repo, "-m", "a", "b", "c").exit_code == 1

    def test_copy_too_many_args_exits_1(self, repo: pathlib.Path) -> None:
        """``-c`` followed by three names exits with status 1."""
        assert _branch(repo, "-c", "a", "b", "c").exit_code == 1
| 801 | |
| 802 | |
| 803 | # ────────────────────────────────────────────────────────────────────────────── |
| 804 | # Security — ANSI injection |
| 805 | # ────────────────────────────────────────────────────────────────────────────── |
| 806 | |
| 807 | |
class TestSecurityAnsi:
    """ANSI escape sequences supplied on the command line must not be echoed back."""

    def _has_ansi(self, s: str) -> bool:
        """Return True when *s* contains the two-character prefix ``ESC [``."""
        return "\x1b[" in s

    def test_ansi_in_branch_name_rejected(self, repo: pathlib.Path) -> None:
        """Creating a branch whose name embeds escapes exits 1 with clean output."""
        outcome = _branch(repo, "\x1b[31mmalicious\x1b[0m")
        assert outcome.exit_code == 1
        assert not self._has_ansi(outcome.output)

    def test_ansi_in_delete_name_rejected(self, repo: pathlib.Path) -> None:
        """``-d`` with an escape-laden name exits 1 with clean output."""
        outcome = _branch(repo, "-d", "\x1b[31mmalicious\x1b[0m")
        assert outcome.exit_code == 1
        assert not self._has_ansi(outcome.output)

    def test_ansi_in_rename_new_name_rejected(self, repo: pathlib.Path) -> None:
        """``-m`` with an escape-laden name exits 1 with clean output."""
        outcome = _branch(repo, "-m", "\x1b[31mnew\x1b[0m")
        assert outcome.exit_code == 1
        assert not self._has_ansi(outcome.output)

    def test_ansi_in_contains_arg_sanitized(self, repo: pathlib.Path) -> None:
        """Escapes passed to ``--contains`` do not appear in the output."""
        outcome = _branch(repo, "--contains", "\x1b[31mxml\x1b[0m")
        assert not self._has_ansi(outcome.output)

    def test_ansi_in_contains_commit_id(self, repo: pathlib.Path) -> None:
        """An escape-laden commit ID given to ``--contains`` is not echoed.

        The exit status is deliberately not asserted; only the absence of
        ANSI sequences in the output is checked.
        """
        outcome = _branch(repo, "--contains", "\x1b[31mmalicious\x1b[0m")
        assert not self._has_ansi(outcome.output)

    def test_errors_go_to_stderr(self, repo: pathlib.Path) -> None:
        """Deleting a missing branch exits 1 and reports "not found" on stderr.

        The check passes when "not found" is absent from ``output``; when it
        is present there, it must also be present in ``stderr``.
        """
        outcome = _branch(repo, "-d", "nonexistent")
        assert outcome.exit_code == 1
        # Only look at stderr when the message showed up in ``output``.
        if "not found" in outcome.output.lower():
            assert outcome.stderr and "not found" in outcome.stderr.lower()
| 842 | |
| 843 | |
| 844 | # ────────────────────────────────────────────────────────────────────────────── |
| 845 | # Stress |
| 846 | # ────────────────────────────────────────────────────────────────────────────── |
| 847 | |
| 848 | |
@pytest.mark.slow
class TestStress:
    """Scale and concurrency checks for ``muse branch`` (marked ``slow``)."""

    def test_list_500_branches_fast(self, repo: pathlib.Path) -> None:
        """Listing 500 branches must complete in under 2 seconds."""
        for i in range(500):
            _branch(repo, f"feat/task-{i:04d}")
        # Only the listing call is timed, not the 500 creations above.
        t0 = time.perf_counter()
        result = _branch(repo, "--json")
        elapsed = (time.perf_counter() - t0) * 1000
        data = json.loads(result.output)
        assert len(data) == 501  # main + 500
        assert elapsed < 2000, f"list 500 branches took {elapsed:.0f}ms (limit 2000ms)"

    def test_merged_filter_100_branches(self, repo: pathlib.Path) -> None:
        """--merged filter on 100 branches completes in under 3 seconds."""
        for i in range(100):
            _branch(repo, f"task-{i:03d}")
        t0 = time.perf_counter()
        result = _branch(repo, "--merged", "--json")
        elapsed = (time.perf_counter() - t0) * 1000
        data = json.loads(result.output)
        # All branches share the same commit as main → all merged
        assert len(data) == 101
        assert elapsed < 3000, f"--merged on 100 branches took {elapsed:.0f}ms"

    def test_sort_committeddate_100_branches(self, repo: pathlib.Path) -> None:
        """``--sort committeddate`` over 101 branches exits 0 and lists them all."""
        for i in range(100):
            _branch(repo, f"sort-{i:03d}")
        result = _branch(repo, "--sort", "committeddate", "--json")
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert len(data) == 101

    def test_concurrent_branch_list_separate_repos(self, tmp_path: pathlib.Path) -> None:
        """Six threads each build their own repo via the ``muse`` executable and list it.

        Every worker creates a repo with one commit and five extra branches,
        then expects ``muse branch --json`` to report six entries. Problems
        are collected in ``errors`` and asserted on after all threads join.
        """
        errors: list[str] = []

        def do_branch(idx: int) -> None:
            """Build ``repo_<idx>`` and record any failure in ``errors``."""
            try:
                repo_dir = tmp_path / f"repo_{idx}"
                repo_dir.mkdir()
                subprocess.run(["muse", "init"], cwd=str(repo_dir), capture_output=True)
                (repo_dir / "x.py").write_text(f"x={idx}\n")
                subprocess.run(
                    ["muse", "commit", "-m", f"c{idx}"],
                    cwd=str(repo_dir), capture_output=True,
                )
                for j in range(5):
                    subprocess.run(
                        ["muse", "branch", f"b{j}"],
                        cwd=str(repo_dir), capture_output=True,
                    )
                r = subprocess.run(
                    ["muse", "branch", "--json"],
                    cwd=str(repo_dir), capture_output=True, text=True,
                )
                if r.returncode != 0:
                    errors.append(f"repo_{idx}: branch --json failed")
                    return
                data = json.loads(r.stdout)
                if len(data) != 6:  # main + 5
                    errors.append(f"repo_{idx}: expected 6 branches, got {len(data)}")
            except Exception as exc:
                # An exception raised in a worker thread does not propagate to
                # the thread that join()s it; without recording it here the
                # final assertion would see an empty list and pass.
                errors.append(f"repo_{idx}: {type(exc).__name__}: {exc}")

        threads = [threading.Thread(target=do_branch, args=(i,)) for i in range(6)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        # Joined outside the f-string: a backslash inside a replacement field
        # is a SyntaxError on interpreters older than Python 3.12.
        joined_errors = "\n".join(errors)
        assert not errors, f"Concurrent branch errors:\n{joined_errors}"

    def test_deep_ancestor_chain_is_merged(self, repo: pathlib.Path) -> None:
        """A branch with 50 ancestors is correctly detected as merged."""
        _branch(repo, "long-chain")
        _invoke(repo, ["checkout", "long-chain"])
        for i in range(50):
            (repo / f"step_{i:03d}.py").write_text(f"s={i}\n")
            _commit(repo, "-m", f"step {i}")
        _invoke(repo, ["checkout", "main"])
        _invoke(repo, ["merge", "long-chain"])
        result = _branch(repo, "--merged", "--json")
        data = json.loads(result.output)
        names = [b["name"] for b in data]
        assert "long-chain" in names

    def test_merged_filter_ancestor_set_computed_once(
        self, repo: pathlib.Path
    ) -> None:
        """--merged must compute the 'into' ancestor set once, not once per branch.

        With N branches, the naive implementation calls _commit_ancestors N times
        for the same 'into' tip. The fix pre-computes it once and checks each
        branch tip against the cached set.
        """
        from unittest.mock import patch
        import muse.cli.commands.branch as branch_module

        for i in range(20):
            _branch(repo, f"feat-{i:02d}")

        # ``wraps`` keeps the real behaviour while recording every call.
        with patch.object(
            branch_module, "_commit_ancestors", wraps=branch_module._commit_ancestors
        ) as mock_ca:
            result = _branch(repo, "--merged", "--json")

        assert result.exit_code == 0
        data = json.loads(result.output)
        assert len(data) == 21  # main + 20

        # Group the recorded calls by their second positional argument
        # (presumably the commit ID — confirm against _commit_ancestors).
        # The total call count may exceed the number of distinct values by at
        # most one, so repeated walks from the same tip fail the test.
        unique_commit_ids = {c.args[1] for c in mock_ca.call_args_list}
        assert len(mock_ca.call_args_list) <= len(unique_commit_ids) + 1, (
            f"_commit_ancestors called {len(mock_ca.call_args_list)}× but only "
            f"{len(unique_commit_ids)} unique commit IDs — ancestor set is being "
            "recomputed per branch instead of once"
        )
| 965 | |
| 966 | |
| 967 | # ────────────────────────────────────────────────────────────────────────────── |
| 968 | # Prune-config — remove stale [branch.*] entries from config.toml |
| 969 | # ────────────────────────────────────────────────────────────────────────────── |
| 970 | |
| 971 | |
class TestPruneConfig:
    """``muse branch --prune-config`` drops config entries whose branch is gone."""

    def test_prune_config_removes_stale_entries(self, repo: pathlib.Path) -> None:
        """Two entries without a branch ref are pruned; a real branch's entry stays."""
        from muse.cli.config import write_branch_meta, read_branch_meta

        # Metadata written directly, with no matching branch ref ever created.
        stale_intents = {"task/stale-1": "stale one", "task/stale-2": "stale two"}
        for stale_name, stale_intent in stale_intents.items():
            write_branch_meta(repo, stale_name, intent=stale_intent, resumable=True)

        # A branch that really exists and carries metadata of its own.
        _branch(repo, "feat/keep-me", "--intent", "keep this", "--resumable")

        outcome = _branch(repo, "--prune-config", "--json")
        assert outcome.exit_code == 0, outcome.output

        data = json.loads(outcome.output)
        assert data["pruned"] == 2, f"expected 2 pruned, got {data}"
        assert data["kept"] >= 1  # at least one entry must survive

        # Both stale sections now read back empty.
        for stale_name in stale_intents:
            assert read_branch_meta(repo, stale_name) == {}

        # The live branch still has its intent recorded.
        kept_meta = read_branch_meta(repo, "feat/keep-me")
        assert kept_meta.get("intent") == "keep this"

    def test_prune_config_noop_when_all_live(self, repo: pathlib.Path) -> None:
        """With no stale entries the command exits 0 and reports ``pruned == 0``."""
        _branch(repo, "feat/real-branch")
        outcome = _branch(repo, "--prune-config", "--json")
        assert outcome.exit_code == 0
        assert json.loads(outcome.output)["pruned"] == 0

    def test_prune_config_dry_run(self, repo: pathlib.Path) -> None:
        """``--dry-run`` counts the stale entry but leaves it in the config."""
        from muse.cli.config import write_branch_meta, read_branch_meta

        write_branch_meta(repo, "task/ghost", intent="ghost", resumable=True)

        outcome = _branch(repo, "--prune-config", "--dry-run", "--json")
        assert outcome.exit_code == 0
        summary = json.loads(outcome.output)
        assert summary["pruned"] == 1
        assert summary["dry_run"] is True

        # The entry is still readable afterwards: nothing was removed.
        ghost_meta = read_branch_meta(repo, "task/ghost")
        assert ghost_meta.get("intent") == "ghost"
File History
2 commits
sha256:144495ae90498f9d51aeac4bf7f9ee9e9502d14474745a856b96921f7349c0ce
next round of fixing tests
Human
3 days ago
sha256:99f8eb388d9a9c353e68b9a4e5bebe1b4240a8f511e6f0928e58c0e95153e103
feat: branch --prune-config, fix hub repo delete docstrings…
Sonnet 4.6
minor
⚠
4 days ago