"""Comprehensive hardening tests for ``muse fetch``. Coverage -------- Unit - _stale_ref_names: no-dir, all-live, stale detected, nested branches, symlink skip - _prune_stale_refs: dry-run, live delete, empty-parent cleanup, return values - negotiate_have (in transport): empty list, single-round ready, fallback, large-stress Integration (mocked transport) - _fetch_one: up-to-date, fetched, dry-run writes nothing, unknown remote, transport error, branch missing without prune, branch missing with prune, negotiate called before fetch_pack, negotiate fallback, set_remote_head after apply_pack Security - ANSI injection in remote name stripped in stderr - ANSI injection in branch name stripped in stderr - available-branches list sanitized before output - symlink traversal blocked in _stale_ref_names - all diagnostics go to stderr, not stdout E2E (via CliRunner) - basic fetch exits 0 - already-up-to-date exits 0 - --json output schema correct - --format json equivalent to --json - --dry-run exits 0 - --dry-run --json status = "dry_run" - --branch flag - --branch --json carries correct branch - unknown remote exits non-zero - --prune flag - --prune --json includes pruned list - --all fetches every remote - --all --json has N results - --all + --branch fetches named branch from every remote - --all with no remotes exits non-zero Performance - negotiate_have result used as have, not raw all_local - 10 000-commit negotiation converges in 3 rounds Stress - 8 concurrent prune scans on isolated repos - 8 concurrent negotiate_have calls """ from __future__ import annotations import contextlib import json import pathlib import threading from typing import TYPE_CHECKING from unittest.mock import MagicMock, patch import pytest from tests.cli_test_helper import CliRunner, InvokeResult if TYPE_CHECKING: from muse.cli.commands.fetch import _FetchJson, _RemoteResultJson from muse.core.pack import ApplyResult, PackBundle from muse.core.transport import MuseTransport, NegotiateResponse 
cli = None runner = CliRunner() REMOTE_ID = "a" * 64 OLD_REMOTE_ID = "b" * 64 from muse.core._types import Manifest type _RemoteInfoMap = dict[str, str | dict[str, str]] # ── typed helpers ───────────────────────────────────────────────────────────── def _make_apply_result( commits_written: int = 3, objects_written: int = 7, ) -> "ApplyResult": from muse.core.pack import ApplyResult return ApplyResult( commits_written=commits_written, snapshots_written=commits_written, objects_written=objects_written, objects_skipped=0, ) def _make_bundle() -> "PackBundle": from muse.core.pack import PackBundle return PackBundle(commits=[], snapshots=[], objects=[]) def _make_remote_info( branch_heads: Manifest | None = None, ) -> _RemoteInfoMap: return { "repo_id": "test-repo-id", "domain": "code", "default_branch": "main", "branch_heads": branch_heads or {"main": REMOTE_ID}, } def _make_negotiate_response( ack: list[str] | None = None, ready: bool = True, ) -> "NegotiateResponse": return {"ack": ack or [], "common_base": None, "ready": ready} def _make_transport_mock(branch_heads: Manifest | None = None) -> MagicMock: t = MagicMock() t.fetch_remote_info.return_value = _make_remote_info(branch_heads) t.fetch_pack.return_value = _make_bundle() t.negotiate.return_value = _make_negotiate_response(ready=True) return t def _json_line(result: InvokeResult) -> "_FetchJson": """Extract the JSON object from cli_test_helper's combined output. The test helper mixes stderr into result.output, so we scan for the first line beginning with '{'. 
""" for line in result.output.splitlines(): stripped = line.strip() if stripped.startswith("{"): parsed: _FetchJson = json.loads(stripped) return parsed raise ValueError(f"No JSON line in output:\n{result.output!r}") def _init_repo(tmp_path: pathlib.Path) -> None: muse_dir = tmp_path / ".muse" for sub in ("objects", "commits", "snapshots", "remotes", "refs/heads", "branches"): (muse_dir / sub).mkdir(parents=True, exist_ok=True) (muse_dir / "HEAD").write_text("ref: refs/heads/main\n") (muse_dir / "refs" / "heads" / "main").write_text("") (muse_dir / "config.toml").write_text( '[remotes.origin]\nurl = "http://localhost:19999"\n' ) (muse_dir / "repo.json").write_text('{"id": "test-repo-id"}') def _write_remote_ref( tmp_path: pathlib.Path, remote: str, branch: str, commit_id: str ) -> None: ref_file = tmp_path / ".muse" / "remotes" / remote / branch ref_file.parent.mkdir(parents=True, exist_ok=True) ref_file.write_text(commit_id) # ── Unit: _stale_ref_names ──────────────────────────────────────────────────── class TestStaleRefNames: def test_no_refs_dir_returns_empty(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _stale_ref_names assert _stale_ref_names(tmp_path, "origin", {"main": REMOTE_ID}) == [] def test_all_live_returns_empty(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _stale_ref_names _init_repo(tmp_path) _write_remote_ref(tmp_path, "origin", "main", REMOTE_ID) assert _stale_ref_names(tmp_path, "origin", {"main": REMOTE_ID}) == [] def test_stale_branch_detected(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _stale_ref_names _init_repo(tmp_path) _write_remote_ref(tmp_path, "origin", "main", REMOTE_ID) _write_remote_ref(tmp_path, "origin", "feat/old", OLD_REMOTE_ID) stale = _stale_ref_names(tmp_path, "origin", {"main": REMOTE_ID}) assert stale == ["feat/old"] def test_nested_branch_name_preserved(self, tmp_path: pathlib.Path) -> None: """Slashes in branch names stored as nested 
files must round-trip correctly.""" from muse.cli.commands.fetch import _stale_ref_names _init_repo(tmp_path) _write_remote_ref(tmp_path, "origin", "feat/ui/redesign", REMOTE_ID) stale = _stale_ref_names(tmp_path, "origin", {}) assert "feat/ui/redesign" in stale def test_symlinks_skipped(self, tmp_path: pathlib.Path) -> None: """Symlinks inside the refs dir must not be followed (path-traversal guard).""" from muse.cli.commands.fetch import _stale_ref_names _init_repo(tmp_path) refs_dir = tmp_path / ".muse" / "remotes" / "origin" refs_dir.mkdir(parents=True, exist_ok=True) target = tmp_path / "outside.txt" target.write_text("sensitive") (refs_dir / "evil-link").symlink_to(target) stale = _stale_ref_names(tmp_path, "origin", {}) assert "evil-link" not in stale # ── Unit: _prune_stale_refs ─────────────────────────────────────────────────── class TestPruneStaleRefs: def test_dry_run_does_not_delete( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: from muse.cli.commands.fetch import _prune_stale_refs _init_repo(tmp_path) _write_remote_ref(tmp_path, "origin", "dead-branch", OLD_REMOTE_ID) pruned = _prune_stale_refs(tmp_path, "origin", {}, dry_run=True) assert pruned == ["origin/dead-branch"] assert (tmp_path / ".muse" / "remotes" / "origin" / "dead-branch").exists() assert "Would prune" in capsys.readouterr().err def test_live_delete_removes_file( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: from muse.cli.commands.fetch import _prune_stale_refs _init_repo(tmp_path) _write_remote_ref(tmp_path, "origin", "dead-branch", OLD_REMOTE_ID) pruned = _prune_stale_refs(tmp_path, "origin", {}, dry_run=False) assert pruned == ["origin/dead-branch"] assert not (tmp_path / ".muse" / "remotes" / "origin" / "dead-branch").exists() assert "[deleted]" in capsys.readouterr().err def test_empty_parent_dirs_removed(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _prune_stale_refs _init_repo(tmp_path) 
_write_remote_ref(tmp_path, "origin", "feat/old-thing", OLD_REMOTE_ID) _prune_stale_refs(tmp_path, "origin", {}, dry_run=False) assert not (tmp_path / ".muse" / "remotes" / "origin" / "feat").exists() def test_returns_qualified_remote_branch_names(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _prune_stale_refs _init_repo(tmp_path) _write_remote_ref(tmp_path, "origin", "stale-a", OLD_REMOTE_ID) _write_remote_ref(tmp_path, "origin", "stale-b", OLD_REMOTE_ID) pruned = _prune_stale_refs(tmp_path, "origin", {}, dry_run=False) assert "origin/stale-a" in pruned assert "origin/stale-b" in pruned def test_no_refs_dir_is_noop(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _prune_stale_refs _init_repo(tmp_path) assert _prune_stale_refs(tmp_path, "no-remote", {}, dry_run=False) == [] def test_output_goes_to_stderr_not_stdout( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: from muse.cli.commands.fetch import _prune_stale_refs _init_repo(tmp_path) _write_remote_ref(tmp_path, "origin", "dead", OLD_REMOTE_ID) _prune_stale_refs(tmp_path, "origin", {}, dry_run=False) assert capsys.readouterr().out == "" # ── Unit: negotiate_have ────────────────────────────────────────────────────── class TestNegotiateHave: def _make_transport( self, ready_after: int = 1, ack_ids: list[str] | None = None, ) -> MagicMock: call_count = 0 def negotiate( url: str, token: str | None, want: list[str], have: list[str] ) -> "NegotiateResponse": nonlocal call_count call_count += 1 return {"ack": ack_ids or have, "common_base": None, "ready": call_count >= ready_after} t = MagicMock() t.negotiate.side_effect = negotiate return t def test_empty_local_returns_empty_no_network(self) -> None: from muse.core.transport import negotiate_have transport = self._make_transport() assert negotiate_have(transport, "http://x", None, ["want"], []) == [] transport.negotiate.assert_not_called() def test_single_round_ready_returns_ack(self) 
-> None: from muse.core.transport import negotiate_have transport = self._make_transport(ready_after=1, ack_ids=["common"]) result = negotiate_have(transport, "http://x", None, ["want"], ["c1", "c2"]) assert result == ["common"] def test_falls_back_to_full_list_when_never_ready(self) -> None: from muse.core.transport import negotiate_have, NEGOTIATE_DEPTH transport = self._make_transport(ready_after=999) all_local = [f"c{i}" for i in range(NEGOTIATE_DEPTH + 5)] result = negotiate_have(transport, "http://x", None, ["want"], all_local) assert result == all_local def test_stress_10k_commits_3_rounds(self) -> None: """10 000-commit history must converge in exactly 3 rounds.""" from muse.core.transport import negotiate_have transport = self._make_transport(ready_after=3) all_local = [f"c{i}" for i in range(10_000)] result = negotiate_have(transport, "http://x", None, ["want"], all_local) assert len(result) > 0 assert transport.negotiate.call_count == 3 # ── Integration: _fetch_one ─────────────────────────────────────────────────── class TestFetchOne: def _patches( self, already_known: str | None = None, branch_heads: Manifest | None = None, apply_result: "ApplyResult | None" = None, ) -> contextlib.ExitStack: stack = contextlib.ExitStack() transport = _make_transport_mock(branch_heads or {"main": REMOTE_ID}) stack.enter_context(patch("muse.cli.commands.fetch.get_remote", return_value="http://localhost:19999")) stack.enter_context(patch("muse.cli.commands.fetch.get_signing_identity", return_value=None)) stack.enter_context(patch("muse.cli.commands.fetch.make_transport", return_value=transport)) stack.enter_context(patch("muse.cli.commands.fetch.get_remote_head", return_value=already_known)) stack.enter_context(patch("muse.cli.commands.fetch.set_remote_head")) stack.enter_context(patch("muse.cli.commands.fetch.apply_pack", return_value=apply_result or _make_apply_result())) stack.enter_context(patch("muse.cli.commands.fetch.get_all_commits", return_value=[])) 
stack.enter_context(patch("muse.cli.commands.fetch.negotiate_have", return_value=[])) return stack def test_up_to_date_status(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _fetch_one with self._patches(already_known=REMOTE_ID): result = _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=False) assert result["status"] == "up_to_date" assert result["commits_received"] == 0 def test_fetched_status(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _fetch_one with self._patches(): result = _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=False) assert result["status"] == "fetched" assert result["commits_received"] == 3 assert result["objects_written"] == 7 def test_commits_received_from_apply_result_not_bundle(self, tmp_path: pathlib.Path) -> None: """Regression: use apply_result['commits_written'], not len(bundle['commits']).""" from muse.cli.commands.fetch import _fetch_one with self._patches(apply_result=_make_apply_result(commits_written=5, objects_written=12)): result = _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=False) assert result["commits_received"] == 5 assert result["objects_written"] == 12 def test_dry_run_does_not_write(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _fetch_one set_mock = MagicMock() with self._patches() as stack: stack.enter_context(patch("muse.cli.commands.fetch.set_remote_head", set_mock)) result = _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=True) assert result["status"] == "dry_run" def test_unknown_remote_exits_user_error(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _fetch_one from muse.core.errors import ExitCode with patch("muse.cli.commands.fetch.get_remote", return_value=None): with pytest.raises(SystemExit) as exc: _fetch_one(tmp_path, "no-such", "main", prune=False, dry_run=False) assert exc.value.code == ExitCode.USER_ERROR def 
test_branch_missing_without_prune_exits(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _fetch_one with self._patches(branch_heads={"dev": REMOTE_ID}): with pytest.raises(SystemExit): _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=False) def test_branch_missing_with_prune_returns_branch_missing(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.fetch import _fetch_one with self._patches(branch_heads={"dev": REMOTE_ID}): result = _fetch_one(tmp_path, "origin", "main", prune=True, dry_run=False) assert result["status"] == "branch_missing" def test_negotiate_called_before_fetch_pack(self, tmp_path: pathlib.Path) -> None: """MWP negotiation must precede fetch_pack to minimise wire transfer.""" from muse.cli.commands.fetch import _fetch_one call_order: list[str] = [] def _neg( _t: "MuseTransport", _url: str, _token: str | None, _want: list[str], _all: list[str], ) -> list[str]: call_order.append("negotiate_have") return ["common"] transport = MagicMock() transport.fetch_remote_info.return_value = _make_remote_info({"main": REMOTE_ID}) def _fp( url: str, token: str | None, want: list[str], have: list[str], ) -> "PackBundle": call_order.append("fetch_pack") return _make_bundle() transport.fetch_pack.side_effect = _fp with ( patch("muse.cli.commands.fetch.get_remote", return_value="http://x"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=transport), patch("muse.cli.commands.fetch.get_remote_head", return_value=None), patch("muse.cli.commands.fetch.set_remote_head"), patch("muse.cli.commands.fetch.apply_pack", return_value=_make_apply_result()), patch("muse.cli.commands.fetch.get_all_commits", return_value=[]), patch("muse.cli.commands.fetch.negotiate_have", _neg), ): _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=False) assert call_order.index("negotiate_have") < call_order.index("fetch_pack") def 
test_negotiate_failure_falls_back_to_full_have(self, tmp_path: pathlib.Path) -> None: """If negotiate_have raises TransportError the full local list is used.""" from muse.cli.commands.fetch import _fetch_one from muse.core.transport import TransportError local_commits = [MagicMock(commit_id=f"c{i}") for i in range(5)] captured_have: list[list[str]] = [] transport = MagicMock() transport.fetch_remote_info.return_value = _make_remote_info({"main": REMOTE_ID}) def _fp(url: str, token: str | None, want: list[str], have: list[str]) -> "PackBundle": captured_have.append(have) return _make_bundle() transport.fetch_pack.side_effect = _fp def _neg_raise( _t: "MuseTransport", _url: str, _token: str | None, _want: list[str], _all: list[str], ) -> list[str]: raise TransportError("negotiate not supported", 501) with ( patch("muse.cli.commands.fetch.get_remote", return_value="http://x"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=transport), patch("muse.cli.commands.fetch.get_remote_head", return_value=None), patch("muse.cli.commands.fetch.set_remote_head"), patch("muse.cli.commands.fetch.apply_pack", return_value=_make_apply_result()), patch("muse.cli.commands.fetch.get_all_commits", return_value=local_commits), patch("muse.cli.commands.fetch.negotiate_have", _neg_raise), ): _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=False) assert captured_have[0] == [f"c{i}" for i in range(5)] def test_set_remote_head_called_after_apply_pack(self, tmp_path: pathlib.Path) -> None: """Remote tracking pointer must only advance after apply_pack succeeds.""" from muse.cli.commands.fetch import _fetch_one call_order: list[str] = [] def _apply(_root: pathlib.Path, _bundle: "PackBundle") -> "ApplyResult": call_order.append("apply_pack") return _make_apply_result() def _set_head( remote_name: str, branch: str, commit_id: str, repo_root: pathlib.Path | None = None, ) -> None: 
call_order.append("set_remote_head") with ( patch("muse.cli.commands.fetch.get_remote", return_value="http://x"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=_make_transport_mock()), patch("muse.cli.commands.fetch.get_remote_head", return_value=None), patch("muse.cli.commands.fetch.apply_pack", _apply), patch("muse.cli.commands.fetch.set_remote_head", _set_head), patch("muse.cli.commands.fetch.get_all_commits", return_value=[]), patch("muse.cli.commands.fetch.negotiate_have", return_value=[]), ): _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=False) assert call_order.index("apply_pack") < call_order.index("set_remote_head") # ── Security ────────────────────────────────────────────────────────────────── class TestSecurity: def test_ansi_in_remote_name_stripped( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: from muse.cli.commands.fetch import _fetch_one evil = "\x1b[31mEVIL\x1b[0m" with patch("muse.cli.commands.fetch.get_remote", return_value=None): with pytest.raises(SystemExit): _fetch_one(tmp_path, evil, "main", prune=False, dry_run=False) assert "\x1b[" not in capsys.readouterr().err def test_ansi_in_branch_name_stripped( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: from muse.cli.commands.fetch import _fetch_one evil_branch = "\x1b[31mHACKED\x1b[0m" with ( patch("muse.cli.commands.fetch.get_remote", return_value="http://x"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=_make_transport_mock({"main": REMOTE_ID})), ): with pytest.raises(SystemExit): _fetch_one(tmp_path, "origin", evil_branch, prune=False, dry_run=False) assert "\x1b[" not in capsys.readouterr().err def test_available_branches_sanitized_in_error( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: """Branch names returned by 
the remote must be sanitized before printing.""" from muse.cli.commands.fetch import _fetch_one evil_branch = "\x1b[32mhijacked\x1b[0m" with ( patch("muse.cli.commands.fetch.get_remote", return_value="http://x"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=_make_transport_mock({evil_branch: REMOTE_ID})), ): with pytest.raises(SystemExit): _fetch_one(tmp_path, "origin", "no-such", prune=False, dry_run=False) assert "\x1b[" not in capsys.readouterr().err def test_symlink_traversal_blocked_in_stale_ref_names( self, tmp_path: pathlib.Path ) -> None: from muse.cli.commands.fetch import _stale_ref_names _init_repo(tmp_path) refs_dir = tmp_path / ".muse" / "remotes" / "origin" refs_dir.mkdir(parents=True, exist_ok=True) (tmp_path / "secret.txt").write_text("top-secret") (refs_dir / "evil").symlink_to(tmp_path / "secret.txt") assert "evil" not in _stale_ref_names(tmp_path, "origin", {}) def test_all_diagnostics_go_to_stderr_not_stdout( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: from muse.cli.commands.fetch import _fetch_one with ( patch("muse.cli.commands.fetch.get_remote", return_value="http://x"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=_make_transport_mock()), patch("muse.cli.commands.fetch.get_remote_head", return_value=None), patch("muse.cli.commands.fetch.set_remote_head"), patch("muse.cli.commands.fetch.apply_pack", return_value=_make_apply_result()), patch("muse.cli.commands.fetch.get_all_commits", return_value=[]), patch("muse.cli.commands.fetch.negotiate_have", return_value=[]), ): _fetch_one(tmp_path, "origin", "main", prune=False, dry_run=False) assert capsys.readouterr().out == "" # ── E2E: CLI via CliRunner ──────────────────────────────────────────────────── def _invoke(*args: str, branch_heads: Manifest | None = None) -> InvokeResult: 
"""Invoke ``muse fetch`` with all transport-layer functions mocked.""" transport = _make_transport_mock(branch_heads) with ( patch("muse.cli.commands.fetch.require_repo", return_value=pathlib.Path("/fake")), patch("muse.cli.commands.fetch.read_current_branch", return_value="main"), patch("muse.cli.commands.fetch.get_remote", return_value="http://localhost:19999"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=transport), patch("muse.cli.commands.fetch.get_remote_head", return_value=None), patch("muse.cli.commands.fetch.set_remote_head"), patch("muse.cli.commands.fetch.apply_pack", return_value=_make_apply_result()), patch("muse.cli.commands.fetch.get_all_commits", return_value=[]), patch("muse.cli.commands.fetch.negotiate_have", return_value=[]), ): return runner.invoke(cli, ["fetch", *args]) class TestCLIFetch: def test_basic_fetch_exits_zero(self) -> None: assert _invoke().exit_code == 0 def test_already_up_to_date_exits_zero(self) -> None: with ( patch("muse.cli.commands.fetch.require_repo", return_value=pathlib.Path("/fake")), patch("muse.cli.commands.fetch.read_current_branch", return_value="main"), patch("muse.cli.commands.fetch.get_remote", return_value="http://localhost:19999"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=_make_transport_mock()), patch("muse.cli.commands.fetch.get_remote_head", return_value=REMOTE_ID), patch("muse.cli.commands.fetch.set_remote_head"), patch("muse.cli.commands.fetch.apply_pack", return_value=_make_apply_result()), patch("muse.cli.commands.fetch.get_all_commits", return_value=[]), patch("muse.cli.commands.fetch.negotiate_have", return_value=[]), ): result = runner.invoke(cli, ["fetch"]) assert result.exit_code == 0 def test_json_schema_complete(self) -> None: result = _invoke("--json") assert result.exit_code == 0 data = _json_line(result) assert 
"results" in data assert "dry_run" in data r = data["results"][0] for key in ("remote", "branch", "status", "commits_received", "objects_written", "head", "pruned", "dry_run"): assert key in r, f"Missing key: {key}" assert r["status"] in {"fetched", "up_to_date", "dry_run", "branch_missing"} def test_format_json_equivalent_to_json_flag(self) -> None: assert _json_line(_invoke("--json")) == _json_line(_invoke("--format", "json")) def test_dry_run_exits_zero(self) -> None: assert _invoke("--dry-run").exit_code == 0 def test_dry_run_json_status(self) -> None: result = _invoke("--dry-run", "--json") assert result.exit_code == 0 data = _json_line(result) assert data["dry_run"] is True assert data["results"][0]["status"] == "dry_run" def test_branch_flag(self) -> None: assert _invoke("--branch", "dev", branch_heads={"dev": REMOTE_ID}).exit_code == 0 def test_branch_flag_json_carries_branch(self) -> None: result = _invoke("--branch", "dev", "--json", branch_heads={"dev": REMOTE_ID}) assert result.exit_code == 0 assert _json_line(result)["results"][0]["branch"] == "dev" def test_unknown_remote_exits_nonzero(self) -> None: with ( patch("muse.cli.commands.fetch.require_repo", return_value=pathlib.Path("/fake")), patch("muse.cli.commands.fetch.read_current_branch", return_value="main"), patch("muse.cli.commands.fetch.get_remote", return_value=None), ): result = runner.invoke(cli, ["fetch", "no-such-remote"]) assert result.exit_code != 0 def test_prune_flag_succeeds(self) -> None: assert _invoke("--prune").exit_code == 0 def test_prune_json_has_pruned_list(self) -> None: result = _invoke("--prune", "--json") assert result.exit_code == 0 assert isinstance(_json_line(result)["results"][0]["pruned"], list) def test_json_on_stdout_parseable(self) -> None: result = _invoke("--json") assert result.exit_code == 0 data = _json_line(result) assert "results" in data class TestCLIFetchAll: def _invoke_all(self, *extra: str, branch_heads: Manifest | None = None) -> InvokeResult: remotes = 
[ {"name": "origin", "url": "http://origin"}, {"name": "upstream", "url": "http://upstream"}, ] transport = _make_transport_mock(branch_heads or {"main": REMOTE_ID}) with ( patch("muse.cli.commands.fetch.require_repo", return_value=pathlib.Path("/fake")), patch("muse.cli.commands.fetch.read_current_branch", return_value="main"), patch("muse.cli.commands.fetch.list_remotes", return_value=remotes), patch("muse.cli.commands.fetch.get_remote", return_value="http://localhost:19999"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=transport), patch("muse.cli.commands.fetch.get_remote_head", return_value=None), patch("muse.cli.commands.fetch.set_remote_head"), patch("muse.cli.commands.fetch.apply_pack", return_value=_make_apply_result()), patch("muse.cli.commands.fetch.get_all_commits", return_value=[]), patch("muse.cli.commands.fetch.negotiate_have", return_value=[]), ): return runner.invoke(cli, ["fetch", "--all", *extra]) def test_all_exits_zero(self) -> None: assert self._invoke_all().exit_code == 0 def test_all_json_has_result_per_remote(self) -> None: result = self._invoke_all("--json") assert result.exit_code == 0 data = _json_line(result) assert len(data["results"]) == 2 remotes_seen = {r["remote"] for r in data["results"]} assert "origin" in remotes_seen assert "upstream" in remotes_seen def test_all_plus_branch_uses_named_branch(self) -> None: """--all --branch dev must fetch 'dev' from every remote.""" result = self._invoke_all("--branch", "dev", "--json", branch_heads={"dev": REMOTE_ID}) assert result.exit_code == 0 data = _json_line(result) for r in data["results"]: assert r["branch"] == "dev" def test_all_no_remotes_exits_nonzero(self) -> None: with ( patch("muse.cli.commands.fetch.require_repo", return_value=pathlib.Path("/fake")), patch("muse.cli.commands.fetch.read_current_branch", return_value="main"), patch("muse.cli.commands.fetch.list_remotes", return_value=[]), ): 
result = runner.invoke(cli, ["fetch", "--all"]) assert result.exit_code != 0 # ── Performance ─────────────────────────────────────────────────────────────── class TestPerformance: def test_negotiate_result_used_as_have_not_all_local(self) -> None: """fetch_pack must receive negotiate_have output, not the raw all_local list.""" minimal = ["common-base-only"] captured_have: list[list[str]] = [] transport = MagicMock() transport.fetch_remote_info.return_value = _make_remote_info() def _fp(url: str, token: str | None, want: list[str], have: list[str]) -> "PackBundle": captured_have.append(have) return _make_bundle() transport.fetch_pack.side_effect = _fp with ( patch("muse.cli.commands.fetch.get_remote", return_value="http://x"), patch("muse.cli.commands.fetch.get_signing_identity", return_value=None), patch("muse.cli.commands.fetch.make_transport", return_value=transport), patch("muse.cli.commands.fetch.get_remote_head", return_value=None), patch("muse.cli.commands.fetch.set_remote_head"), patch("muse.cli.commands.fetch.apply_pack", return_value=_make_apply_result()), patch( "muse.cli.commands.fetch.get_all_commits", return_value=[MagicMock(commit_id=f"c{i}") for i in range(1_000)], ), patch("muse.cli.commands.fetch.negotiate_have", return_value=minimal), ): from muse.cli.commands.fetch import _fetch_one _fetch_one(pathlib.Path("/fake"), "origin", "main", prune=False, dry_run=False) assert captured_have[0] == minimal def test_large_negotiation_converges_in_3_rounds(self) -> None: from muse.core.transport import negotiate_have transport = MagicMock() rounds: list[int] = [0] def _neg(url: str, token: str | None, want: list[str], have: list[str]) -> "NegotiateResponse": rounds[0] += 1 return {"ack": have[:1], "common_base": None, "ready": rounds[0] >= 3} transport.negotiate.side_effect = _neg result = negotiate_have( transport, "http://x", None, ["want"], [f"c{i}" for i in range(10_000)] ) assert len(result) > 0 assert rounds[0] == 3 # ── Stress: concurrent filesystem 
# and negotiation ────────────────────────────


class TestStressConcurrent:
    """Thread-based stress tests: eight workers per scenario, failures gathered per thread."""

    def test_8_concurrent_prune_scans_isolated_repos(self, tmp_path: pathlib.Path) -> None:
        """_prune_stale_refs on isolated repos must not interfere across threads."""
        from muse.cli.commands.fetch import _prune_stale_refs

        failures: list[str] = []

        def _worker(slot: int) -> None:
            # Every worker builds and prunes its own repository under tmp_path.
            try:
                root = tmp_path / f"repo{slot}"
                root.mkdir()
                _init_repo(root)
                _write_remote_ref(root, "origin", "stale-branch", OLD_REMOTE_ID)
                removed = _prune_stale_refs(root, "origin", {}, dry_run=False)
                assert removed == ["origin/stale-branch"]
                ref_path = root / ".muse" / "remotes" / "origin" / "stale-branch"
                assert not ref_path.exists()
            except Exception as err:
                # Record the failure here; it is asserted on after all threads join.
                failures.append(f"Thread {slot}: {err}")

        workers = [threading.Thread(target=_worker, args=(slot,)) for slot in range(8)]
        for th in workers:
            th.start()
        for th in workers:
            th.join()

        assert failures == [], f"Concurrent prune failures: {failures}"

    def test_8_concurrent_negotiate_have_calls(self) -> None:
        """negotiate_have is stateless — 8 concurrent calls must not interfere."""
        from muse.core.transport import negotiate_have

        failures: list[str] = []

        def _worker(slot: int) -> None:
            # Each worker gets its own mock transport with a slot-specific ack.
            try:
                expected_ack = [f"common-{slot}"]
                mock_transport = MagicMock()
                mock_transport.negotiate.return_value = _make_negotiate_response(
                    ack=[f"common-{slot}"], ready=True
                )
                local_ids = [f"c{i}" for i in range(100)]
                outcome = negotiate_have(
                    mock_transport, "http://x", None, [f"want-{slot}"], local_ids
                )
                assert outcome == expected_ack
            except Exception as err:
                failures.append(f"Thread {slot}: {err}")

        workers = [threading.Thread(target=_worker, args=(slot,)) for slot in range(8)]
        for th in workers:
            th.start()
        for th in workers:
            th.join()

        assert failures == [], f"Concurrent negotiate_have failures: {failures}"