"""Comprehensive hardening tests for ``muse fetch``. Coverage -------- Unit - _stale_ref_names: no-dir, all-live, stale detected, nested branches, symlink skip - _prune_stale_refs: dry-run, live delete, empty-parent cleanup, return values Integration (mocked transport) - _fetch_one: up-to-date, fetched, dry-run writes nothing, unknown remote, transport error, branch missing without prune, branch missing with prune, set_remote_head after apply_mpack Security - ANSI injection in remote name stripped in stderr - ANSI injection in branch name stripped in stderr - available-branches list sanitized before output - symlink traversal blocked in _stale_ref_names - all diagnostics go to stderr, not stdout E2E (via CliRunner) - basic fetch exits 0 - already-up-to-date exits 0 - --json output schema correct - --format json equivalent to --json - --dry-run exits 0 - --dry-run --json status = "dry_run" - --branch flag - --branch --json carries correct branch - unknown remote exits non-zero - --prune flag - --prune --json includes pruned list - --all fetches every remote - --all --json has N results - --all + --branch fetches named branch from every remote - --all with no remotes exits non-zero Stress - 8 concurrent prune scans on isolated repos """ from __future__ import annotations import contextlib import json import pathlib import threading from typing import TYPE_CHECKING from unittest.mock import MagicMock, patch import pytest from tests.cli_test_helper import CliRunner, InvokeResult if TYPE_CHECKING: from muse.cli.commands.fetch import _FetchJson, _RemoteResultJson from muse.core.mpack import ApplyResult, MPack from muse.core.transport import MuseTransport cli = None runner = CliRunner() REMOTE_ID = "a" * 64 OLD_REMOTE_ID = "b" * 64 from muse.core.types import Manifest, blob_id from muse.core.paths import muse_dir, remotes_dir type _RemoteInfoMap = dict[str, str | dict[str, str]] # ── typed helpers ───────────────────────────────────────────────────────────── def _make_apply_result( commits_written: int = 3, blobs_written: int = 7, ) -> "ApplyResult": from muse.core.mpack import ApplyResult return ApplyResult( commits_written=commits_written, snapshots_written=commits_written, blobs_written=blobs_written, blobs_skipped=0, tags_written=0, failed_blobs=[], skipped_snapshots=[], ) def _make_bundle() -> "MPack": from muse.core.mpack import MPack return MPack(commits=[], snapshots=[], blobs=[]) def _make_fetch_mpack_result( commits_count: int = 0, objects_count: int = 0, ) -> Mapping[str, object]: """Return a FetchMPackResult-shaped dict for mocking fetch_mpack.""" from muse.core.transport import FetchMPackResult blobs: list[dict] = [] for i in range(objects_count): content = f"fake-blob-{i}".encode() blobs.append({"object_id": blob_id(content), "content": content}) return FetchMPackResult( repo_id="test-repo-id", domain="code", default_branch="main", branch_heads={"main": REMOTE_ID}, commits=[], snapshots=[], blobs=blobs, blobs_received=len(blobs), shallow_commits=[], ) def _make_remote_info( branch_heads: Manifest | None = None, ) -> _RemoteInfoMap: return { "repo_id": "test-repo-id", "domain": "code", "default_branch": "main", "branch_heads": branch_heads or {"main": REMOTE_ID}, } def _make_transport_mock( branch_heads: Manifest | None = None, objects_count: int = 7, ) -> MagicMock: t = MagicMock() t.fetch_remote_info.return_value = _make_remote_info(branch_heads) def _fetch_mpack( url: str, token: str | None, want: list[str], have: list[str], **kwargs: str, ) -> Mapping[str, object]: return _make_fetch_mpack_result(objects_count=objects_count) t.fetch_mpack.side_effect = _fetch_mpack return t def _json_line(result: InvokeResult) -> "_FetchJson": """Extract the JSON object from cli_test_helper's combined output. The test helper mixes stderr into result.output, so we scan for the first line beginning with '{'. """ for line in result.output.splitlines(): stripped = line.strip() if stripped.startswith("{"): parsed: _FetchJson = json.loads(stripped) return parsed raise ValueError(f"No JSON line in output:\n{result.output!r}") def _init_repo(tmp_path: pathlib.Path) -> None: dot_muse = muse_dir(tmp_path) for sub in ("objects", "commits", "snapshots", "remotes", "refs/heads", "branches"): (dot_muse / sub).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") (dot_muse / "refs" / "heads" / "main").write_text("") (dot_muse / "config.toml").write_text( '[remotes.origin]\nurl = "http://localhost:19999"\n' ) (dot_muse / "repo.json").write_text('{"id": "test-repo-id"}') def _write_remote_ref( tmp_path: pathlib.Path, remote: str, branch: str, commit_id: str ) -> None: ref_file = remotes_dir(tmp_path) / remote / branch ref_file.parent.mkdir(parents=True, exist_ok=True) ref_file.write_text(commit_id) # ── Unit: _stale_ref_names ──────────────────────────────────────────────────── class TestStaleRefNames: def test_no_refs_dir_returns_empty(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _stale_ref_names assert _stale_ref_names(tmp_path, "origin", {"main": REMOTE_ID}) == [] def test_all_live_returns_empty(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _stale_ref_names _init_repo(tmp_path) _write_remote_ref(tmp_path, "origin", "main", REMOTE_ID) assert _stale_ref_names(tmp_path, "origin", {"main": REMOTE_ID}) == [] def test_stale_branch_detected(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _stale_ref_names _init_repo(tmp_path) _write_remote_ref(tmp_path, "origin", "main", REMOTE_ID) _write_remote_ref(tmp_path, "origin", "feat/old", OLD_REMOTE_ID) stale = _stale_ref_names(tmp_path, "origin", {"main": REMOTE_ID}) assert stale == ["feat/old"] def test_nested_branch_name_preserved(self, tmp_path: pathlib.Path) -> None: """Slashes in branch names stored as nested files must round-trip correctly.""" from muse.cli.commands.fetch import _stale_ref_names _init_repo(tmp_path) _write_remote_ref(tmp_path, "origin", "feat/ui/redesign", REMOTE_ID) stale = _stale_ref_names(tmp_path, "origin", {}) assert "feat/ui/redesign" in stale def test_symlinks_skipped(self, tmp_path: pathlib.Path) -> None: """Symlinks inside the refs dir must not be followed (path-traversal guard).""" from muse.cli.commands.fetch import _stale_ref_names _init_repo(tmp_path) refs_dir = remotes_dir(tmp_path) / "origin" refs_dir.mkdir(parents=True, exist_ok=True) target = tmp_path / "outside.txt" target.write_text("sensitive") (refs_dir / "malicious-link").symlink_to(target) stale = _stale_ref_names(tmp_path, "origin", {}) assert "malicious-link" not in stale # ── Unit: _prune_stale_refs ─────────────────────────────────────────────────── class TestPruneStaleRefs: def test_dry_run_does_not_delete( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: from muse.cli.commands.fetch import _prune_stale_refs _init_repo(tmp_path) _write_remote_ref(tmp_path, "origin", "dead-branch", OLD_REMOTE_ID) pruned = _prune_stale_refs(tmp_path, "origin", {}, dry_run=True) assert pruned == ["origin/dead-branch"] assert (remotes_dir(tmp_path) / "origin" / "dead-branch").exists() assert "Would prune" in capsys.readouterr().err def test_live_delete_removes_file( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: from muse.cli.commands.fetch import _prune_stale_refs _init_repo(tmp_path) _write_remote_ref(tmp_path, "origin", "dead-branch", OLD_REMOTE_ID) pruned = _prune_stale_refs(tmp_path, "origin", {}, dry_run=False) assert pruned == ["origin/dead-branch"] assert not (remotes_dir(tmp_path) / "origin" / "dead-branch").exists() assert "[deleted]" in capsys.readouterr().err def test_empty_parent_dirs_removed(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _prune_stale_refs _init_repo(tmp_path) _write_remote_ref(tmp_path, "origin", "feat/old-thing", OLD_REMOTE_ID) _prune_stale_refs(tmp_path, "origin", {}, dry_run=False) assert not (remotes_dir(tmp_path) / "origin" / "feat").exists() def test_returns_qualified_remote_branch_names(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _prune_stale_refs _init_repo(tmp_path) _write_remote_ref(tmp_path, "origin", "stale-a", OLD_REMOTE_ID) _write_remote_ref(tmp_path, "origin", "stale-b", OLD_REMOTE_ID) pruned = _prune_stale_refs(tmp_path, "origin", {}, dry_run=False) assert "origin/stale-a" in pruned assert "origin/stale-b" in pruned def test_no_refs_dir_is_noop(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _prune_stale_refs _init_repo(tmp_path) assert _prune_stale_refs(tmp_path, "no-remote", {}, dry_run=False) == [] def test_output_goes_to_stderr_not_stdout( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: from muse.cli.commands.fetch import _prune_stale_refs _init_repo(tmp_path) _write_remote_ref(tmp_path, "origin", "dead", OLD_REMOTE_ID) _prune_stale_refs(tmp_path, "origin", {}, dry_run=False) assert capsys.readouterr().out == "" # ── Integration: _fetch_one ─────────────────────────────────────────────────── class TestFetchOne: def _patches( self, already_known: str | None = None, branch_heads: Manifest | None = None, apply_result: "ApplyResult | None" = None, objects_count: int = 7, ) -> contextlib.ExitStack: stack = contextlib.ExitStack() transport = _make_transport_mock(branch_heads or {"main": REMOTE_ID}, objects_count=objects_count) stack.enter_context(patch("muse.cli.commands.fetch.get_remote", return_value="http://localhost:19999")) stack.enter_context(patch("muse.cli.commands.fetch.get_signing_identity", return_value=None)) stack.enter_context(patch("muse.cli.commands.fetch.make_transport", return_value=transport)) stack.enter_context(patch("muse.cli.commands.fetch.get_remote_head", return_value=already_known)) stack.enter_context(patch("muse.cli.commands.fetch.set_remote_head")) stack.enter_context(patch("muse.cli.commands.fetch.apply_mpack", return_value=apply_result or _make_apply_result())) stack.enter_context(patch("muse.cli.commands.fetch.get_all_commits", return_value=[])) return stack def test_up_to_date_status(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _fetch_one with self._patches(already_known=REMOTE_ID): result = _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=False) assert result["status"] == "up_to_date" assert result["commits_received"] == 0 def test_fetched_status(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _fetch_one with self._patches(): result = _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=False) assert result["status"] == "fetched" assert result["commits_received"] == 3 assert result["blobs_written"] == 7 def test_commits_received_from_apply_result_not_bundle(self, tmp_path: pathlib.Path) -> None: """Regression: use apply_result['commits_written'], not len(mpack['commits']).""" from muse.cli.commands.fetch import _fetch_one with self._patches(apply_result=_make_apply_result(commits_written=5, blobs_written=12), objects_count=12): result = _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=False) assert result["commits_received"] == 5 assert result["blobs_written"] == 12 def test_dry_run_does_not_write(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _fetch_one set_mock = MagicMock() with self._patches() as stack: stack.enter_context(patch("muse.cli.commands.fetch.set_remote_head", set_mock)) result = _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=True) assert result["status"] == "dry_run" def test_unknown_remote_exits_user_error(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _fetch_one from muse.core.errors import ExitCode with patch("muse.cli.commands.fetch.get_remote", return_value=None): with pytest.raises(SystemExit) as exc: _fetch_one(tmp_path, "no-such", "main", prune=False, dry_run=False) assert exc.value.code == ExitCode.USER_ERROR def test_branch_missing_without_prune_exits(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _fetch_one with self._patches(branch_heads={"dev": REMOTE_ID}): with pytest.raises(SystemExit): _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=False) def test_branch_missing_with_prune_returns_branch_missing(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _fetch_one with self._patches(branch_heads={"dev": REMOTE_ID}): result = _fetch_one(tmp_path, "origin", "main", prune=True, dry_run=False) assert result["status"] == "branch_missing" def test_set_remote_head_called_after_apply_mpack(self, tmp_path: pathlib.Path) -> None: """Remote tracking pointer must only advance after apply_mpack succeeds.""" from muse.cli.commands.fetch import _fetch_one call_order: list[str] = [] def _apply(_root: pathlib.Path, _bundle: "MPack") -> "ApplyResult": call_order.append("apply_mpack") return _make_apply_result() def _set_head( remote_name: str, branch: str, commit_id: str, repo_root: pathlib.Path | None = None, ) -> None: call_order.append("set_remote_head") with ( patch("muse.cli.commands.fetch.get_remote", return_value="http://x"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=_make_transport_mock()), patch("muse.cli.commands.fetch.get_remote_head", return_value=None), patch("muse.cli.commands.fetch.apply_mpack", _apply), patch("muse.cli.commands.fetch.set_remote_head", _set_head), patch("muse.cli.commands.fetch.get_all_commits", return_value=[]), ): _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=False) assert call_order.index("apply_mpack") < call_order.index("set_remote_head") # ── Security ────────────────────────────────────────────────────────────────── class TestSecurity: def test_ansi_in_remote_name_stripped( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: from muse.cli.commands.fetch import _fetch_one malicious = "\x1b[31mEVIL\x1b[0m" with patch("muse.cli.commands.fetch.get_remote", return_value=None): with pytest.raises(SystemExit): _fetch_one(tmp_path, malicious, "main", prune=False, dry_run=False) assert "\x1b[" not in capsys.readouterr().err def test_ansi_in_branch_name_stripped( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: from muse.cli.commands.fetch import _fetch_one malicious_branch = "\x1b[31mHACKED\x1b[0m" with ( patch("muse.cli.commands.fetch.get_remote", return_value="http://x"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=_make_transport_mock({"main": REMOTE_ID})), ): with pytest.raises(SystemExit): _fetch_one(tmp_path, "origin", malicious_branch, prune=False, dry_run=False) assert "\x1b[" not in capsys.readouterr().err def test_available_branches_sanitized_in_error( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: """Branch names returned by the remote must be sanitized before printing.""" from muse.cli.commands.fetch import _fetch_one malicious_branch = "\x1b[32mhijacked\x1b[0m" with ( patch("muse.cli.commands.fetch.get_remote", return_value="http://x"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=_make_transport_mock({malicious_branch: REMOTE_ID})), ): with pytest.raises(SystemExit): _fetch_one(tmp_path, "origin", "no-such", prune=False, dry_run=False) assert "\x1b[" not in capsys.readouterr().err def test_symlink_traversal_blocked_in_stale_ref_names( self, tmp_path: pathlib.Path ) -> None: from muse.cli.commands.fetch import _stale_ref_names _init_repo(tmp_path) refs_dir = remotes_dir(tmp_path) / "origin" refs_dir.mkdir(parents=True, exist_ok=True) (tmp_path / "secret.txt").write_text("top-secret") (refs_dir / "malicious").symlink_to(tmp_path / "secret.txt") assert "malicious" not in _stale_ref_names(tmp_path, "origin", {}) def test_all_diagnostics_go_to_stderr_not_stdout( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: from muse.cli.commands.fetch import _fetch_one with ( patch("muse.cli.commands.fetch.get_remote", return_value="http://x"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=_make_transport_mock()), patch("muse.cli.commands.fetch.get_remote_head", return_value=None), patch("muse.cli.commands.fetch.set_remote_head"), patch("muse.cli.commands.fetch.apply_mpack", return_value=_make_apply_result()), patch("muse.cli.commands.fetch.get_all_commits", return_value=[]), ): _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=False) assert capsys.readouterr().out == "" # ── E2E: CLI via CliRunner ──────────────────────────────────────────────────── def _invoke(*args: str, branch_heads: Manifest | None = None) -> InvokeResult: """Invoke ``muse fetch`` with all transport-layer functions mocked.""" transport = _make_transport_mock(branch_heads) with ( patch("muse.cli.commands.fetch.require_repo", return_value=pathlib.Path("/fake")), patch("muse.cli.commands.fetch.read_current_branch", return_value="main"), patch("muse.cli.commands.fetch.get_remote", return_value="http://localhost:19999"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=transport), patch("muse.cli.commands.fetch.get_remote_head", return_value=None), patch("muse.cli.commands.fetch.set_remote_head"), patch("muse.cli.commands.fetch.apply_mpack", return_value=_make_apply_result()), patch("muse.cli.commands.fetch.get_all_commits", return_value=[]), ): return runner.invoke(cli, ["fetch", *args]) class TestCLIFetch: def test_basic_fetch_exits_zero(self) -> None: assert _invoke().exit_code == 0 def test_already_up_to_date_exits_zero(self) -> None: with ( patch("muse.cli.commands.fetch.require_repo", return_value=pathlib.Path("/fake")), patch("muse.cli.commands.fetch.read_current_branch", return_value="main"), patch("muse.cli.commands.fetch.get_remote", return_value="http://localhost:19999"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=_make_transport_mock()), patch("muse.cli.commands.fetch.get_remote_head", return_value=REMOTE_ID), patch("muse.cli.commands.fetch.set_remote_head"), patch("muse.cli.commands.fetch.apply_mpack", return_value=_make_apply_result()), patch("muse.cli.commands.fetch.get_all_commits", return_value=[]), ): result = runner.invoke(cli, ["fetch"]) assert result.exit_code == 0 def test_json_schema_complete(self) -> None: result = _invoke("--json") assert result.exit_code == 0 data = _json_line(result) assert "results" in data assert "dry_run" in data r = data["results"][0] for key in ("remote", "branch", "status", "commits_received", "blobs_written", "head", "pruned", "dry_run"): assert key in r, f"Missing key: {key}" assert r["status"] in {"fetched", "up_to_date", "dry_run", "branch_missing"} def test_json_flag_produces_valid_json(self) -> None: data = _json_line(_invoke("--json")) assert "exit_code" in data def test_dry_run_exits_zero(self) -> None: assert _invoke("--dry-run").exit_code == 0 def test_dry_run_json_status(self) -> None: result = _invoke("--dry-run", "--json") assert result.exit_code == 0 data = _json_line(result) assert data["dry_run"] is True assert data["results"][0]["status"] == "dry_run" def test_branch_flag(self) -> None: assert _invoke("--branch", "dev", branch_heads={"dev": REMOTE_ID}).exit_code == 0 def test_branch_flag_json_carries_branch(self) -> None: result = _invoke("--branch", "dev", "--json", branch_heads={"dev": REMOTE_ID}) assert result.exit_code == 0 assert _json_line(result)["results"][0]["branch"] == "dev" def test_unknown_remote_exits_nonzero(self) -> None: with ( patch("muse.cli.commands.fetch.require_repo", return_value=pathlib.Path("/fake")), patch("muse.cli.commands.fetch.read_current_branch", return_value="main"), patch("muse.cli.commands.fetch.get_remote", return_value=None), ): result = runner.invoke(cli, ["fetch", "no-such-remote"]) assert result.exit_code != 0 def test_prune_flag_succeeds(self) -> None: assert _invoke("--prune").exit_code == 0 def test_prune_json_has_pruned_list(self) -> None: result = _invoke("--prune", "--json") assert result.exit_code == 0 assert isinstance(_json_line(result)["results"][0]["pruned"], list) def test_json_on_stdout_parseable(self) -> None: result = _invoke("--json") assert result.exit_code == 0 data = _json_line(result) assert "results" in data class TestCLIFetchAll: def _invoke_all(self, *extra: str, branch_heads: Manifest | None = None) -> InvokeResult: remotes = [ {"name": "origin", "url": "http://origin"}, {"name": "upstream", "url": "http://upstream"}, ] transport = _make_transport_mock(branch_heads or {"main": REMOTE_ID}) with ( patch("muse.cli.commands.fetch.require_repo", return_value=pathlib.Path("/fake")), patch("muse.cli.commands.fetch.read_current_branch", return_value="main"), patch("muse.cli.commands.fetch.list_remotes", return_value=remotes), patch("muse.cli.commands.fetch.get_remote", return_value="http://localhost:19999"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=transport), patch("muse.cli.commands.fetch.get_remote_head", return_value=None), patch("muse.cli.commands.fetch.set_remote_head"), patch("muse.cli.commands.fetch.apply_mpack", return_value=_make_apply_result()), patch("muse.cli.commands.fetch.get_all_commits", return_value=[]), ): return runner.invoke(cli, ["fetch", "--all", *extra]) def test_all_exits_zero(self) -> None: assert self._invoke_all().exit_code == 0 def test_all_json_has_result_per_remote(self) -> None: result = self._invoke_all("--json") assert result.exit_code == 0 data = _json_line(result) assert len(data["results"]) == 2 remotes_seen = {r["remote"] for r in data["results"]} assert "origin" in remotes_seen assert "upstream" in remotes_seen def test_all_plus_branch_uses_named_branch(self) -> None: """--all --branch dev must fetch 'dev' from every remote.""" result = self._invoke_all("--branch", "dev", "--json", branch_heads={"dev": REMOTE_ID}) assert result.exit_code == 0 data = _json_line(result) for r in data["results"]: assert r["branch"] == "dev" def test_all_no_remotes_exits_nonzero(self) -> None: with ( patch("muse.cli.commands.fetch.require_repo", return_value=pathlib.Path("/fake")), patch("muse.cli.commands.fetch.read_current_branch", return_value="main"), patch("muse.cli.commands.fetch.list_remotes", return_value=[]), ): result = runner.invoke(cli, ["fetch", "--all"]) assert result.exit_code != 0 # ── Stress: concurrent filesystem ──────────────────────────────────────────── class TestStressConcurrent: def test_8_concurrent_prune_scans_isolated_repos(self, tmp_path: pathlib.Path) -> None: """_prune_stale_refs on isolated repos must not interfere across threads.""" from muse.cli.commands.fetch import _prune_stale_refs errors: list[str] = [] def _do(idx: int) -> None: try: repo = tmp_path / f"repo{idx}" repo.mkdir() _init_repo(repo) _write_remote_ref(repo, "origin", "stale-branch", OLD_REMOTE_ID) pruned = _prune_stale_refs(repo, "origin", {}, dry_run=False) assert pruned == ["origin/stale-branch"] assert not (remotes_dir(repo) / "origin" / "stale-branch").exists() except Exception as exc: errors.append(f"Thread {idx}: {exc}") threads = [threading.Thread(target=_do, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Concurrent prune failures: {errors}"