"""Tests for ``muse branch``.

Coverage tiers
--------------
Unit        — parser flags, dead-code removal, helpers (_resolve_start_point,
              _list_local_branches, _list_remotes, _upstream_for,
              _commit_ancestors, _is_merged, _contains_commit,
              _cleanup_empty_dirs).
Integration — create, delete, force-delete, rename, force-rename, copy,
              force-copy, listing, filtering, sorting.
End-to-end  — full CLI invocations: text and JSON output, all operations.
Security    — ANSI injection in branch names, format flags, messages.
Stress      — 500 branches, concurrent list, deep ancestry chains.
"""

from __future__ import annotations

import json
import os
import pathlib
import subprocess
import threading
import time
from typing import TYPE_CHECKING

import pytest

from tests.cli_test_helper import CliRunner, InvokeResult
from muse.core.refs import (
    get_head_commit_id,
    read_current_branch,
)
from muse.core.types import short_id
from muse.core.paths import heads_dir, logs_dir

if TYPE_CHECKING:
    import argparse

runner = CliRunner()


# ──────────────────────────────────────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────────────────────────────────────


def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult:
    """Run the CLI with *args* while the working directory is *repo*.

    The previous working directory is restored even if the invocation raises,
    so one test cannot leak its cwd into the next.
    """
    saved = os.getcwd()
    try:
        os.chdir(repo)
        return runner.invoke(None, args)
    finally:
        os.chdir(saved)


def _branch(repo: pathlib.Path, *extra: str) -> InvokeResult:
    """Invoke ``branch`` with the *extra* arguments inside *repo*."""
    return _invoke(repo, ["branch", *extra])


def _commit(repo: pathlib.Path, *extra: str) -> InvokeResult:
    """Run ``code add .`` then ``commit *extra`` in *repo*.

    Only the result of the ``commit`` invocation is returned; the result of
    the ``code add`` step is discarded.
    """
    _invoke(repo, ["code", "add", "."])
    return _invoke(repo, ["commit", *extra])


def _first_commit_id(repo: pathlib.Path) -> str:
    """Return the id of the last entry in the current branch's commit list.

    NOTE(review): callers treat ``commits[-1]`` as the *oldest* commit, i.e.
    ``get_commits_for_branch`` is presumed to return newest-first — confirm
    against ``muse.core.commits``.
    """
    from muse.core.commits import get_commits_for_branch

    commits = get_commits_for_branch(repo, read_current_branch(repo))
    return commits[-1].commit_id


@pytest.fixture()
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Initialised repo with one commit on ``main``."""
    # Reuse _invoke so the chdir/restore logic lives in exactly one place.
    _invoke(tmp_path, ["init"])
    (tmp_path / "a.py").write_text("x = 1\n")
    _commit(tmp_path, "-m", "initial")
    return tmp_path


@pytest.fixture()
def two_commit_repo(repo: pathlib.Path) -> pathlib.Path:
    """Repo with two commits on ``main``."""
    (repo / "b.py").write_text("y = 2\n")
    _commit(repo, "-m", "second")
    return repo


# ──────────────────────────────────────────────────────────────────────────────
# Unit — parser flags
# ──────────────────────────────────────────────────────────────────────────────


class TestRegisterFlags:
    """Checks the argparse namespace produced by ``branch.register``."""

    @staticmethod
    def _build_parser() -> "argparse.ArgumentParser":
        """Return a fresh parser with only the ``branch`` subcommand registered."""
        import argparse

        from muse.cli.commands.branch import register

        parser = argparse.ArgumentParser()
        register(parser.add_subparsers())
        return parser

    def _parse(self, *args: str) -> "argparse.Namespace":
        """Parse ``branch *args`` with a fresh parser and return the namespace."""
        return self._build_parser().parse_args(["branch", *args])

    def test_default_json_out_is_false(self) -> None:
        ns = self._parse()
        assert ns.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        ns = self._parse("--json")
        assert ns.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        ns = self._parse("-j")
        assert ns.json_out is True

    def test_delete_flag(self) -> None:
        ns = self._parse("-d", "foo")
        assert ns.op == "delete"

    def test_force_delete_flag(self) -> None:
        ns = self._parse("-D", "foo")
        assert ns.op == "force_delete"

    def test_rename_flag(self) -> None:
        ns = self._parse("-m", "new")
        assert ns.op == "rename"

    def test_force_rename_flag(self) -> None:
        ns = self._parse("-M", "new")
        assert ns.op == "force_rename"

    def test_copy_flag(self) -> None:
        ns = self._parse("-c", "copy")
        assert ns.op == "copy"

    def test_force_copy_flag(self) -> None:
        ns = self._parse("-C", "copy")
        assert ns.op == "force_copy"

    def test_verbose_default_0(self) -> None:
        ns = self._parse()
        assert ns.verbose == 0

    def test_verbose_v_is_1(self) -> None:
        ns = self._parse("-v")
        assert ns.verbose == 1

    def test_verbose_vv_is_2(self) -> None:
        ns = self._parse("-vv")
        assert ns.verbose == 2

    def test_remotes_flag(self) -> None:
        ns = self._parse("-r")
        assert ns.remotes is True

    def test_all_flag(self) -> None:
        ns = self._parse("-a")
        assert ns.all_branches is True

    def test_sort_default_name(self) -> None:
        ns = self._parse()
        assert ns.sort == "name"

    def test_sort_committeddate(self) -> None:
        ns = self._parse("--sort", "committeddate")
        assert ns.sort == "committeddate"

    def test_sort_invalid_rejected(self) -> None:
        # Build the parser outside the ``raises`` block so that only the
        # parse step itself is allowed to exit.
        parser = self._build_parser()
        with pytest.raises(SystemExit):
            parser.parse_args(["branch", "--sort", "invalid"])


# ──────────────────────────────────────────────────────────────────────────────
# Unit — dead-code removal
# ──────────────────────────────────────────────────────────────────────────────


class TestDeadCodeRemoved:
    """Source-level regression guards: inspect the module text for removed code."""

    def test_op_list_branch_removed(self) -> None:
        import inspect

        import muse.cli.commands.branch as m

        src = inspect.getsource(m.run)
        assert 'op == "list"' not in src, (
            'op == "list" was a dead branch (nothing in register() creates it); must be deleted'
        )

    def test_inline_tomllib_import_removed(self) -> None:
        import inspect

        import muse.cli.commands.branch as m

        src = inspect.getsource(m._upstream_for)
        assert "import tomllib" not in src, (
            "inline 'import tomllib' inside _upstream_for should be a module-level import"
        )

    def test_double_sanitize_removed(self) -> None:
        """The verbose listing previously double-sanitized name_str (stripping ANSI)."""
        import inspect

        import muse.cli.commands.branch as m

        src = inspect.getsource(m.run)
        assert "sanitize_display(name_str)" not in src, (
            "name_str was double-sanitized; the second call stripped ANSI from current branch"
        )


# ──────────────────────────────────────────────────────────────────────────────
# Unit — _resolve_start_point
# ──────────────────────────────────────────────────────────────────────────────


class TestResolveStartPoint:
    """``_resolve_start_point`` accepts branch names, full ids and abbreviated ids."""

    def test_resolves_branch_name(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _resolve_start_point

        cid = get_head_commit_id(repo, "main")
        # Guard against a vacuous ``None == None`` pass.
        assert cid is not None
        result = _resolve_start_point(repo, "main", "main")
        assert result == cid

    def test_resolves_full_sha(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _resolve_start_point

        cid = get_head_commit_id(repo, "main")
        assert cid is not None
        result = _resolve_start_point(repo, "main", cid)
        assert result == cid

    def test_resolves_partial_sha(self, two_commit_repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _resolve_start_point

        repo = two_commit_repo
        first_sha = _first_commit_id(repo)
        # An abbreviated id (whatever short_id() yields) must resolve to the full id.
        result = _resolve_start_point(repo, "main", short_id(first_sha))
        assert result == first_sha

    def test_returns_input_for_unresolvable(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _resolve_start_point

        result = _resolve_start_point(repo, "main", "nonexistent-ref")
        assert result == "nonexistent-ref"


# ──────────────────────────────────────────────────────────────────────────────
# Unit — _list_local_branches
# ──────────────────────────────────────────────────────────────────────────────


class TestListLocalBranches:
    """``_list_local_branches`` ordering, hidden-file handling and nesting."""

    def test_returns_sorted_list(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _list_local_branches

        _branch(repo, "z-last")
        _branch(repo, "a-first")
        branches = _list_local_branches(repo)
        assert branches == sorted(branches)

    def test_skips_hidden_files(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _list_local_branches

        # Plant a hidden lock file inside refs/heads/
        lock = heads_dir(repo) / ".lock"
        lock.write_text("locked")
        branches = _list_local_branches(repo)
        assert ".lock" not in branches
        assert not any(b.startswith(".") for b in branches)

    def test_empty_repo_returns_empty(self, tmp_path: pathlib.Path) -> None:
        from muse.cli.commands.branch import _list_local_branches

        # tmp_path was never initialised as a repo.
        assert _list_local_branches(tmp_path) == []

    def test_includes_nested_branches(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _list_local_branches

        _branch(repo, "feat/sub/task")
        branches = _list_local_branches(repo)
        assert "feat/sub/task" in branches


# ──────────────────────────────────────────────────────────────────────────────
# Unit — _commit_ancestors, _is_merged, _contains_commit
# ──────────────────────────────────────────────────────────────────────────────


class TestCommitGraph:
    """Ancestry helpers: ``_commit_ancestors``, ``_is_merged``, ``_contains_commit``."""

    def test_commit_ancestors_includes_self(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _commit_ancestors

        cid = get_head_commit_id(repo, "main")
        assert cid is not None
        ancestors = _commit_ancestors(repo, cid)
        assert cid in ancestors

    def test_is_merged_true_for_same_branch(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _is_merged

        assert _is_merged(repo, "main", "main")

    def test_is_merged_false_for_unmerged(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _is_merged

        # Give ``feat`` a commit that ``main`` does not have.
        _branch(repo, "feat")
        _invoke(repo, ["checkout", "feat"])
        (repo / "c.py").write_text("c=1\n")
        _commit(repo, "-m", "feat commit")
        _invoke(repo, ["checkout", "main"])
        assert not _is_merged(repo, "feat", "main")

    def test_contains_commit_true(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _contains_commit

        cid = get_head_commit_id(repo, "main")
        assert cid is not None
        assert _contains_commit(repo, "main", cid)

    def test_contains_commit_false_for_unknown(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.branch import _contains_commit

        assert not _contains_commit(repo, "main", "a" * 64)


# ──────────────────────────────────────────────────────────────────────────────
# Integration — CREATE
# ──────────────────────────────────────────────────────────────────────────────


class TestCreate:
    """``muse branch <name> [<start-point>]`` — creation paths and JSON schema."""

    def test_create_basic_exits_0(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "new-branch")
        assert result.exit_code == 0

    def test_create_text_output(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "my-branch")
        assert "my-branch" in result.output

    def test_create_json_schema(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "json-branch", "--json")
        data = json.loads(result.output)
        assert data["action"] == "created"
        assert data["branch"] == "json-branch"
        assert "commit_id" in data
        assert "from" in data

    def test_create_json_from_is_none_at_head(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "from-head", "--json")
        data = json.loads(result.output)
        assert data["from"] is None

    def test_create_at_full_sha(self, two_commit_repo: pathlib.Path) -> None:
        repo = two_commit_repo
        first_sha = _first_commit_id(repo)
        result = _branch(repo, "at-sha", first_sha)
        assert result.exit_code == 0
        tip = get_head_commit_id(repo, "at-sha")
        assert tip == first_sha

    def test_create_at_partial_sha(self, two_commit_repo: pathlib.Path) -> None:
        repo = two_commit_repo
        first_sha = _first_commit_id(repo)
        result = _branch(repo, "at-partial", short_id(first_sha))
        assert result.exit_code == 0
        tip = get_head_commit_id(repo, "at-partial")
        assert tip == first_sha

    def test_create_at_branch_name(self, repo: pathlib.Path) -> None:
        head_cid = get_head_commit_id(repo, "main")
        # Guard against a vacuous ``None == None`` pass.
        assert head_cid is not None
        result = _branch(repo, "copy-of-main", "main")
        assert result.exit_code == 0
        tip = get_head_commit_id(repo, "copy-of-main")
        assert tip == head_cid

    def test_create_json_from_field_populated(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "with-from", "main", "--json")
        data = json.loads(result.output)
        assert data["from"] == "main"

    def test_create_duplicate_exits_1(self, repo: pathlib.Path) -> None:
        _branch(repo, "dup")
        result = _branch(repo, "dup")
        assert result.exit_code == 1

    def test_create_invalid_name_exits_1(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "bad..name")
        assert result.exit_code == 1

    def test_create_does_not_checkout(self, repo: pathlib.Path) -> None:
        _branch(repo, "new-but-no-switch")
        assert read_current_branch(repo) == "main"


# ──────────────────────────────────────────────────────────────────────────────
# Integration — DELETE
# ──────────────────────────────────────────────────────────────────────────────


class TestDelete:
    """``muse branch -d/-D`` — merge checks, ref removal and reflog cleanup."""

    def test_delete_merged_branch_exits_0(self, repo: pathlib.Path) -> None:
        _branch(repo, "to-delete")
        # Branch points to same commit as main → considered merged
        result = _branch(repo, "-d", "to-delete")
        assert result.exit_code == 0

    def test_delete_json_schema(self, repo: pathlib.Path) -> None:
        _branch(repo, "del-json")
        result = _branch(repo, "-d", "del-json", "--json")
        data = json.loads(result.output)
        assert data["action"] == "deleted"
        assert data["branch"] == "del-json"
        assert "was" in data

    def test_delete_unmerged_exits_1_without_force(self, repo: pathlib.Path) -> None:
        _branch(repo, "unmerged")
        _invoke(repo, ["checkout", "unmerged"])
        (repo / "z.py").write_text("z=1\n")
        _commit(repo, "-m", "unmerged work")
        _invoke(repo, ["checkout", "main"])
        result = _branch(repo, "-d", "unmerged")
        assert result.exit_code == 1

    def test_force_delete_unmerged_exits_0(self, repo: pathlib.Path) -> None:
        _branch(repo, "force-del")
        _invoke(repo, ["checkout", "force-del"])
        (repo / "x.py").write_text("x=1\n")
        _commit(repo, "-m", "exclusive work")
        _invoke(repo, ["checkout", "main"])
        result = _branch(repo, "-D", "force-del")
        assert result.exit_code == 0

    def test_delete_current_branch_exits_1(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "-d", "main")
        assert result.exit_code == 1

    def test_delete_nonexistent_exits_1(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "-d", "ghost")
        assert result.exit_code == 1

    def test_delete_removes_branch_from_list(self, repo: pathlib.Path) -> None:
        _branch(repo, "temp")
        _branch(repo, "-d", "temp")
        result = _branch(repo, "--json")
        names = [b["name"] for b in json.loads(result.output)]
        assert "temp" not in names

    def test_delete_nested_branch_cleans_empty_dirs(self, repo: pathlib.Path) -> None:
        _branch(repo, "feat/sub/task")
        _branch(repo, "-D", "feat/sub/task")
        # The feat/ and feat/sub/ dirs should be gone
        feat_dir = heads_dir(repo) / "feat"
        assert not feat_dir.exists()

    def test_delete_removes_reflog_file(self, repo: pathlib.Path) -> None:
        """Deleting a branch removes its reflog file — git-idiomatic behaviour."""
        _invoke(repo, ["checkout", "-b", "bye"])  # checkout writes the reflog
        _invoke(repo, ["checkout", "main"])
        reflog = logs_dir(repo) / "refs" / "heads" / "bye"
        assert reflog.exists(), "reflog should exist after checkout -b"
        _branch(repo, "-d", "bye")
        assert not reflog.exists(), "reflog must be deleted when branch is deleted"

    def test_delete_nested_branch_removes_reflog_and_empty_dirs(
        self, repo: pathlib.Path
    ) -> None:
        """Nested branch deletion removes reflog file and its empty parent dirs."""
        _invoke(repo, ["checkout", "-b", "feat/ui/button"])
        _invoke(repo, ["checkout", "main"])
        reflog = logs_dir(repo) / "refs" / "heads" / "feat" / "ui" / "button"
        assert reflog.exists(), "reflog should exist after checkout -b"
        _branch(repo, "-D", "feat/ui/button")
        assert not reflog.exists()
        log_feat_dir = logs_dir(repo) / "refs" / "heads" / "feat"
        assert not log_feat_dir.exists(), "empty reflog parent dirs must be cleaned up"

    def test_force_delete_also_removes_reflog(self, repo: pathlib.Path) -> None:
        """-D (force delete) removes the reflog just like -d."""
        _invoke(repo, ["checkout", "-b", "force-log"])
        (repo / "tmp.py").write_text("x=1\n")
        _commit(repo, "-m", "unmerged")
        _invoke(repo, ["checkout", "main"])
        reflog = logs_dir(repo) / "refs" / "heads" / "force-log"
        assert reflog.exists()
        _branch(repo, "-D", "force-log")
        assert not reflog.exists()


# ──────────────────────────────────────────────────────────────────────────────
# Integration — CREATE REFLOG
# ──────────────────────────────────────────────────────────────────────────────


class TestCreateReflog:
    """Creating a branch with ``muse branch <name>`` writes a reflog entry."""

    def test_branch_create_writes_reflog(self, repo: pathlib.Path) -> None:
        """muse branch <name> writes a reflog entry — git-idiomatic behaviour."""
        _branch(repo, "feat/new")
        reflog = logs_dir(repo) / "refs" / "heads" / "feat" / "new"
        assert reflog.exists(), "reflog must exist after muse branch <name>"

    def test_branch_create_reflog_contains_branch_created(self, repo: pathlib.Path) -> None:
        """Reflog entry records a 'branch: Created' operation."""
        _branch(repo, "task/thing")
        reflog = logs_dir(repo) / "refs" / "heads" / "task" / "thing"
        content = reflog.read_text(encoding="utf-8")
        assert "branch: Created" in content

    def test_branch_create_reflog_records_start_point(self, repo: pathlib.Path) -> None:
        """Reflog entry for a branch created from another branch names that source."""
        _branch(repo, "task/from-main", "main")
        reflog = logs_dir(repo) / "refs" / "heads" / "task" / "from-main"
        content = reflog.read_text(encoding="utf-8")
        assert "main" in content


# ──────────────────────────────────────────────────────────────────────────────
# Integration — RENAME
# ──────────────────────────────────────────────────────────────────────────────


class TestRename:
    """``muse branch -m/-M`` — rename and force-rename."""

    def test_rename_basic(self, repo: pathlib.Path) -> None:
        _branch(repo, "old-name")
        result = _branch(repo, "-m", "old-name", "new-name")
        assert result.exit_code == 0
        names = [b["name"] for b in json.loads(_branch(repo, "--json").output)]
        assert "new-name" in names
        assert "old-name" not in names

    def test_rename_omit_old_uses_current(self, repo: pathlib.Path) -> None:
        _branch(repo, "temp")
        _invoke(repo, ["checkout", "temp"])
        result = _branch(repo, "-m", "renamed")
        assert result.exit_code == 0
        assert read_current_branch(repo) == "renamed"
        _invoke(repo, ["checkout", "main"])

    def test_rename_json_schema(self, repo: pathlib.Path) -> None:
        _branch(repo, "src")
        result = _branch(repo, "-m", "src", "dst", "--json")
        data = json.loads(result.output)
        assert data["action"] == "renamed"
        assert data["from"] == "src"
        assert data["to"] == "dst"

    def test_rename_to_existing_exits_1(self, repo: pathlib.Path) -> None:
        _branch(repo, "a")
        _branch(repo, "b")
        result = _branch(repo, "-m", "a", "b")
        assert result.exit_code == 1

    def test_force_rename_to_existing_exits_0(self, repo: pathlib.Path) -> None:
        _branch(repo, "a")
        _branch(repo, "b")
        result = _branch(repo, "-M", "a", "b")
        assert result.exit_code == 0

    def test_rename_updates_head_when_current(self, repo: pathlib.Path) -> None:
        _branch(repo, "temp2")
        _invoke(repo, ["checkout", "temp2"])
        _branch(repo, "-m", "temp2", "newname")
        assert read_current_branch(repo) == "newname"
        _invoke(repo, ["checkout", "main"])

    def test_rename_nonexistent_exits_1(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "-m", "ghost", "newname")
        assert result.exit_code == 1


# ──────────────────────────────────────────────────────────────────────────────
# Integration — COPY
# ──────────────────────────────────────────────────────────────────────────────


class TestCopy:
    """``muse branch -c/-C`` — copy and force-copy."""

    def test_copy_basic(self, repo: pathlib.Path) -> None:
        _branch(repo, "orig")
        result = _branch(repo, "-c", "orig", "clone")
        assert result.exit_code == 0
        names = [b["name"] for b in json.loads(_branch(repo, "--json").output)]
        assert "orig" in names
        assert "clone" in names

    def test_copy_same_tip(self, repo: pathlib.Path) -> None:
        _branch(repo, "src")
        _branch(repo, "-c", "src", "dst")
        tip_src = get_head_commit_id(repo, "src")
        tip_dst = get_head_commit_id(repo, "dst")
        # Guard against a vacuous ``None == None`` pass.
        assert tip_src is not None
        assert tip_src == tip_dst

    def test_copy_json_schema(self, repo: pathlib.Path) -> None:
        _branch(repo, "original")
        result = _branch(repo, "-c", "original", "copy1", "--json")
        data = json.loads(result.output)
        assert data["action"] == "copied"
        assert data["from"] == "original"
        assert data["to"] == "copy1"

    def test_copy_to_existing_exits_1(self, repo: pathlib.Path) -> None:
        _branch(repo, "x")
        _branch(repo, "y")
        result = _branch(repo, "-c", "x", "y")
        assert result.exit_code == 1

    def test_force_copy_to_existing_exits_0(self, repo: pathlib.Path) -> None:
        _branch(repo, "p")
        _branch(repo, "q")
        result = _branch(repo, "-C", "p", "q")
        assert result.exit_code == 0

    def test_copy_omit_src_uses_current(self, repo: pathlib.Path) -> None:
        head = get_head_commit_id(repo, "main")
        # Guard against a vacuous ``None == None`` pass.
        assert head is not None
        result = _branch(repo, "-c", "main-copy")
        assert result.exit_code == 0
        tip = get_head_commit_id(repo, "main-copy")
        assert tip == head


# ──────────────────────────────────────────────────────────────────────────────
# Integration — LIST
# ──────────────────────────────────────────────────────────────────────────────


class TestList:
    """Listing in text, verbose and JSON form, plus sort options."""

    def test_list_text_exits_0(self, repo: pathlib.Path) -> None:
        result = _branch(repo)
        assert result.exit_code == 0

    def test_list_contains_main(self, repo: pathlib.Path) -> None:
        result = _branch(repo)
        assert "main" in result.output

    def test_list_marks_current_branch(self, repo: pathlib.Path) -> None:
        result = _branch(repo)
        # Current branch line must start with "* "
        current_lines = [
            line for line in result.output.splitlines() if line.startswith("* ")
        ]
        assert len(current_lines) == 1
        assert "main" in current_lines[0]

    def test_list_json_schema(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "--json")
        data = json.loads(result.output)
        assert isinstance(data, list)
        assert len(data) >= 1
        keys = set(data[0].keys())
        assert {"name", "current", "commit_id", "last_message", "upstream"} <= keys

    def test_list_json_current_flag(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "--json")
        data = json.loads(result.output)
        current = [b for b in data if b["current"]]
        assert len(current) == 1
        assert current[0]["name"] == "main"

    def test_list_json_last_message_populated(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "--json")
        data = json.loads(result.output)
        main_entry = next(b for b in data if b["name"] == "main")
        assert main_entry["last_message"] is not None
        assert "initial" in main_entry["last_message"]

    def test_list_json_upstream_null_by_default(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "--json")
        data = json.loads(result.output)
        main_entry = next(b for b in data if b["name"] == "main")
        assert main_entry["upstream"] is None

    def test_list_verbose_shows_sha(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "-v")
        # Short SHA should appear
        cid = get_head_commit_id(repo, "main")
        assert cid is not None
        assert cid[:8] in result.output

    def test_list_verbose_shows_message(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "-v")
        assert "initial" in result.output

    def test_list_multiple_branches(self, repo: pathlib.Path) -> None:
        _branch(repo, "feat/a")
        _branch(repo, "feat/b")
        result = _branch(repo, "--json")
        data = json.loads(result.output)
        names = [b["name"] for b in data]
        assert "feat/a" in names
        assert "feat/b" in names

    def test_list_sorted_by_name(self, repo: pathlib.Path) -> None:
        _branch(repo, "z-last")
        _branch(repo, "a-first")
        result = _branch(repo, "--json")
        data = json.loads(result.output)
        names = [b["name"] for b in data]
        assert names == sorted(names)

    def test_list_sort_committeddate(self, repo: pathlib.Path) -> None:
        _branch(repo, "feat-x")
        result = _branch(repo, "--sort", "committeddate", "--json")
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert isinstance(data, list)


# ──────────────────────────────────────────────────────────────────────────────
# Integration — FILTERS
# ──────────────────────────────────────────────────────────────────────────────


class TestFilters:
    """``--merged``, ``--no-merged`` and ``--contains`` list filters."""

    def test_merged_filter_includes_self(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "--merged", "--json")
        data = json.loads(result.output)
        names = [b["name"] for b in data]
        assert "main" in names

    def test_merged_filter_excludes_unmerged(self, repo: pathlib.Path) -> None:
        _branch(repo, "unmerged-feat")
        _invoke(repo, ["checkout", "unmerged-feat"])
        (repo / "u.py").write_text("u=1\n")
        _commit(repo, "-m", "unmerged")
        _invoke(repo, ["checkout", "main"])
        result = _branch(repo, "--merged", "--json")
        data = json.loads(result.output)
        names = [b["name"] for b in data]
        assert "unmerged-feat" not in names

    def test_no_merged_filter_includes_unmerged(self, repo: pathlib.Path) -> None:
        _branch(repo, "exclusive-feat")
        _invoke(repo, ["checkout", "exclusive-feat"])
        (repo / "e.py").write_text("e=1\n")
        _commit(repo, "-m", "exclusive")
        _invoke(repo, ["checkout", "main"])
        result = _branch(repo, "--no-merged", "--json")
        data = json.loads(result.output)
        names = [b["name"] for b in data]
        assert "exclusive-feat" in names

    def test_no_merged_filter_excludes_self(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "--no-merged", "--json")
        data = json.loads(result.output)
        names = [b["name"] for b in data]
        assert "main" not in names

    def test_contains_commit_filter(self, repo: pathlib.Path) -> None:
        cid = get_head_commit_id(repo, "main")
        assert cid is not None
        result = _branch(repo, "--contains", cid, "--json")
        data = json.loads(result.output)
        names = [b["name"] for b in data]
        assert "main" in names

    def test_contains_unknown_commit_empty(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "--contains", "a" * 64, "--json")
        data = json.loads(result.output)
        assert data == []


# ──────────────────────────────────────────────────────────────────────────────
# Integration — validation
# ──────────────────────────────────────────────────────────────────────────────


class TestValidation:
    """Argument validation: malformed invocations must exit 1 / stay ANSI-free."""

    def test_ansi_in_pattern_arg_sanitized(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "--pattern", "\x1b[31mxml\x1b[0m")
        assert "\x1b" not in result.output

    def test_delete_without_name_exits_1(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "-d")
        assert result.exit_code == 1

    def test_rename_too_many_args_exits_1(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "-m", "a", "b", "c")
        assert result.exit_code == 1

    def test_copy_too_many_args_exits_1(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "-c", "a", "b", "c")
        assert result.exit_code == 1


# ──────────────────────────────────────────────────────────────────────────────
# Security — ANSI injection
# ──────────────────────────────────────────────────────────────────────────────


class TestSecurityAnsi:
    """User-supplied ANSI escape sequences must never be echoed to the output."""

    def _has_ansi(self, s: str) -> bool:
        """Return True when *s* contains an ANSI CSI introducer (``ESC [``)."""
        return "\x1b[" in s

    def test_ansi_in_branch_name_rejected(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "\x1b[31mmalicious\x1b[0m")
        assert result.exit_code == 1
        assert not self._has_ansi(result.output)

    def test_ansi_in_delete_name_rejected(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "-d", "\x1b[31mmalicious\x1b[0m")
        assert result.exit_code == 1
        assert not self._has_ansi(result.output)

    def test_ansi_in_rename_new_name_rejected(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "-m", "\x1b[31mnew\x1b[0m")
        assert result.exit_code == 1
        assert not self._has_ansi(result.output)

    def test_ansi_in_contains_arg_sanitized(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "--contains", "\x1b[31mxml\x1b[0m")
        assert not self._has_ansi(result.output)

    def test_ansi_in_contains_commit_id(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "--contains", "\x1b[31mmalicious\x1b[0m")
        # The exit code is deliberately not asserted here;
        # either way, ANSI must not appear in output
        assert not self._has_ansi(result.output)

    def test_errors_go_to_stderr(self, repo: pathlib.Path) -> None:
        result = _branch(repo, "-d", "nonexistent")
        assert result.exit_code == 1
        # Passes when "not found" is absent from stdout, or when it is
        # (also) present on stderr.
        assert "not found" not in result.output.lower() or (
            result.stderr and "not found" in result.stderr.lower()
        )


# ──────────────────────────────────────────────────────────────────────────────
# Stress
# ──────────────────────────────────────────────────────────────────────────────


@pytest.mark.slow
class TestStress:
    """Large branch counts, concurrency across repos and deep histories."""

    def test_list_500_branches_fast(self, repo: pathlib.Path) -> None:
        """Listing 500 branches must complete in under 2 seconds."""
        for i in range(500):
            _branch(repo, f"feat/task-{i:04d}")
        # Only the listing itself is timed, not the 500 creations above.
        t0 = time.perf_counter()
        result = _branch(repo, "--json")
        elapsed = (time.perf_counter() - t0) * 1000
        data = json.loads(result.output)
        assert len(data) == 501  # main + 500
        assert elapsed < 2000, f"list 500 branches took {elapsed:.0f}ms (limit 2000ms)"

    def test_merged_filter_100_branches(self, repo: pathlib.Path) -> None:
        """--merged filter on 100 branches completes in reasonable time."""
        for i in range(100):
            _branch(repo, f"task-{i:03d}")
        t0 = time.perf_counter()
        result = _branch(repo, "--merged", "--json")
        elapsed = (time.perf_counter() - t0) * 1000
        data = json.loads(result.output)
        # All branches share the same commit as main → all merged
        assert len(data) == 101
        assert elapsed < 3000, f"--merged on 100 branches took {elapsed:.0f}ms"

    def test_sort_committeddate_100_branches(self, repo: pathlib.Path) -> None:
        for i in range(100):
            _branch(repo, f"sort-{i:03d}")
        result = _branch(repo, "--sort", "committeddate", "--json")
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert len(data) == 101

    def test_concurrent_branch_list_separate_repos(self, tmp_path: pathlib.Path) -> None:
        """Six threads each drive the ``muse`` executable in their own repo."""
        errors: list[str] = []

        def _populate_and_list(idx: int) -> None:
            """Create repo_<idx> with 5 branches and check ``branch --json``."""
            repo_dir = tmp_path / f"repo_{idx}"
            repo_dir.mkdir()
            subprocess.run(["muse", "init"], cwd=str(repo_dir), capture_output=True)
            (repo_dir / "x.py").write_text(f"x={idx}\n")
            # NOTE(review): unlike the in-process _commit helper, no
            # ``muse code add .`` is run before this commit — confirm that
            # is intentional.
            subprocess.run(
                ["muse", "commit", "-m", f"c{idx}"],
                cwd=str(repo_dir),
                capture_output=True,
            )
            for j in range(5):
                subprocess.run(
                    ["muse", "branch", f"b{j}"],
                    cwd=str(repo_dir),
                    capture_output=True,
                )
            r = subprocess.run(
                ["muse", "branch", "--json"],
                cwd=str(repo_dir),
                capture_output=True,
                text=True,
            )
            if r.returncode != 0:
                errors.append(f"repo_{idx}: branch --json failed")
                return
            data = json.loads(r.stdout)
            if len(data) != 6:  # main + 5
                errors.append(f"repo_{idx}: expected 6 branches, got {len(data)}")

        def do_branch(idx: int) -> None:
            # An exception raised inside a Thread never reaches the test
            # body; without this boundary a crashing worker (e.g. ``muse``
            # not on PATH, or invalid JSON on stdout) would leave ``errors``
            # empty and the test would pass.
            try:
                _populate_and_list(idx)
            except Exception as exc:
                errors.append(f"repo_{idx}: {type(exc).__name__}: {exc}")

        threads = [threading.Thread(target=do_branch, args=(i,)) for i in range(6)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        # Joined outside the f-string: a backslash inside a replacement field
        # is a SyntaxError on Python versions before 3.12.
        joined = "\n".join(errors)
        assert not errors, f"Concurrent branch errors:\n{joined}"

    def test_deep_ancestor_chain_is_merged(self, repo: pathlib.Path) -> None:
        """A branch with 50 ancestors is correctly detected as merged."""
        _branch(repo, "long-chain")
        _invoke(repo, ["checkout", "long-chain"])
        for i in range(50):
            (repo / f"step_{i:03d}.py").write_text(f"s={i}\n")
            _commit(repo, "-m", f"step {i}")
        _invoke(repo, ["checkout", "main"])
        _invoke(repo, ["merge", "long-chain"])
        result = _branch(repo, "--merged", "--json")
        data = json.loads(result.output)
        names = [b["name"] for b in data]
        assert "long-chain" in names

    def test_merged_filter_ancestor_set_computed_once(
        self, repo: pathlib.Path
    ) -> None:
        """--merged must compute the 'into' ancestor set once, not once per branch.

        With N branches, the naive implementation calls _commit_ancestors N times
        for the same 'into' tip.  The fix pre-computes it once and checks each
        branch tip against the cached set.
        """
        from unittest.mock import patch

        import muse.cli.commands.branch as branch_module

        for i in range(20):
            _branch(repo, f"feat-{i:02d}")

        # ``wraps`` keeps the real behaviour while recording every call.
        with patch.object(
            branch_module, "_commit_ancestors", wraps=branch_module._commit_ancestors
        ) as mock_ca:
            result = _branch(repo, "--merged", "--json")

        assert result.exit_code == 0
        data = json.loads(result.output)
        assert len(data) == 21  # main + 20

        # The ancestor set for 'main' (the into branch) must be computed exactly
        # once, not once per branch being checked.  All calls with the same
        # commit_id (main's tip) should collapse to 1.
        # ``args[1]`` is the second positional argument of each recorded call
        # (the commit id, given how _commit_ancestors(repo, cid) is invoked).
        unique_commit_ids = {c.args[1] for c in mock_ca.call_args_list}
        assert len(mock_ca.call_args_list) <= len(unique_commit_ids) + 1, (
            f"_commit_ancestors called {len(mock_ca.call_args_list)}× but only "
            f"{len(unique_commit_ids)} unique commit IDs — ancestor set is being "
            "recomputed per branch instead of once"
        )


# ──────────────────────────────────────────────────────────────────────────────
# Prune-config — remove stale [branch.*] entries from config.toml
# ──────────────────────────────────────────────────────────────────────────────


class TestPruneConfig:
    """muse branch --prune-config removes config entries for nonexistent branches."""

    def test_prune_config_removes_stale_entries(self, repo: pathlib.Path) -> None:
        """Stale [branch.*] sections are deleted; live branches are kept."""
        from muse.cli.config import write_branch_meta, read_branch_meta

        # Write a stale entry directly — branch ref never created
        write_branch_meta(repo, "task/stale-1", intent="stale one", resumable=True)
        write_branch_meta(repo, "task/stale-2", intent="stale two", resumable=True)

        # Create a real branch so it should be kept
        _branch(repo, "feat/keep-me", "--intent", "keep this", "--resumable")

        result = _branch(repo, "--prune-config", "--json")
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        assert data["pruned"] == 2, f"expected 2 pruned, got {data}"
        assert data["kept"] >= 1  # feat/keep-me and main at minimum

        # Stale entries are gone from config
        assert read_branch_meta(repo, "task/stale-1") == {}
        assert read_branch_meta(repo, "task/stale-2") == {}
        # Live branch entry still present
        assert read_branch_meta(repo, "feat/keep-me").get("intent") == "keep this"

    def test_prune_config_noop_when_all_live(self, repo: pathlib.Path) -> None:
        """Pruning a repo with no stale entries reports 0 pruned."""
        _branch(repo, "feat/real-branch")
        result = _branch(repo, "--prune-config", "--json")
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert data["pruned"] == 0

    def test_prune_config_dry_run(self, repo: pathlib.Path) -> None:
        """--prune-config --dry-run reports stale entries without removing them."""
        from muse.cli.config import write_branch_meta, read_branch_meta

        write_branch_meta(repo, "task/ghost", intent="ghost", resumable=True)

        result = _branch(repo, "--prune-config", "--dry-run", "--json")
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert data["pruned"] == 1
        assert data["dry_run"] is True
        # Entry still present — dry run made no changes
        assert read_branch_meta(repo, "task/ghost").get("intent") == "ghost"