"""Comprehensive hardening tests for ``muse pull``.

Covers all changes introduced in the pull command review:

Unit
----
- Parser flags: --ff-only, --dry-run, --format/--json
- Dead-code removal: _current_branch and _restore_from_manifest absent
- _PullJson TypedDict keys complete
- _negotiate_have: exhausted-without-base falls back to full list
- _negotiate_have: ready on first round returns ack

Integration (mocked transport)
-------------------------------
- All error messages routed to stderr
- remote not configured → stderr + exit 1
- branch not on remote → stderr + exit 1
- fetch TransportError → stderr + exit 1 (INTERNAL_ERROR)
- invalid --format → stderr + exit 1
- up_to_date JSON schema complete
- fast_forward JSON schema complete
- merged JSON schema complete
- conflict JSON schema complete (exit 2)
- fetched JSON schema complete (--no-merge)
- dry_run JSON schema complete
- --ff-only: diverged branches refuse pull, exit 1
- --ff-only: fast-forward still succeeds
- "Already up to date" goes to stderr, not stdout
- apply_manifest called BEFORE write_branch_ref in fast-forward
- commits_received uses commits_written from apply_pack result

End-to-end (file:// transport)
--------------------------------
- Fresh pull into empty local
- Fast-forward pull after remote advances
- --no-merge stops at fetch
- --dry-run produces no side effects
- --json produces valid parseable output

Security
--------
- remote name ANSI-sanitized in all errors
- branch name ANSI-sanitized in all errors
- conflict path ANSI-sanitized in text output
- invalid --format exits to stderr
- progress to stderr, stdout clean on --json

Stress
------
- _negotiate_have with 10 000 synthetic commits: terminates
- concurrent independent _negotiate_have calls
"""

from __future__ import annotations

# Annotation-only alias for the stubbed ``apply_pack`` result dictionaries
# (counter name -> count) that the tests hand to the patched function.
type _IntMap = dict[str, int]
import argparse
import datetime
import hashlib
import json
import pathlib
import threading
from typing import TYPE_CHECKING
from unittest.mock import MagicMock, call, patch
import contextlib

import pytest

from tests.cli_test_helper import CliRunner, InvokeResult

if TYPE_CHECKING:
    from collections.abc import Iterator
    from typing import Any

    from muse.cli.commands.pull import _PullJson
    from muse.core.pack import PackBundle, RemoteInfo
    from muse.core.transport import NegotiateResponse

# NOTE(review): every invocation below passes ``None`` as the app object;
# presumably ``tests.cli_test_helper.CliRunner`` resolves the muse entry point
# itself -- confirm against the helper before changing this.
cli = None
runner = CliRunner()


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# Module whose names are patched by the mocked-transport tests.
_PULL_MODULE = "muse.cli.commands.pull"


class _REQUIRED:
    """Keys every ``muse pull --json`` payload is asserted to carry."""

    KEYS = {
        "status", "remote", "branch", "local_branch",
        "commits_received", "objects_written", "head",
        "conflict_paths", "dry_run",
    }


def _env(root: pathlib.Path) -> dict[str, str]:
    """Return the environment override that points muse at *root*."""
    return {"MUSE_REPO_ROOT": str(root)}


def _sha(content: bytes) -> str:
    """Return the hex SHA-256 digest of *content*."""
    return hashlib.sha256(content).hexdigest()


def _json_line(r: InvokeResult) -> _PullJson:
    """Parse and return the first output line that starts with ``{``.

    Raises:
        ValueError: if no line of ``r.output`` starts with ``{``.
    """
    for line in r.output.splitlines():
        stripped = line.strip()
        if stripped.startswith("{"):
            parsed: _PullJson = json.loads(stripped)
            return parsed
    raise ValueError(f"No JSON line found in output:\n{r.output!r}")


def _make_remote_info(branch_heads: dict[str, str]) -> "RemoteInfo":
    """Build a remote-info payload advertising *branch_heads* (branch -> commit id)."""
    return {
        "repo_id": "test-repo",
        "domain": "code",
        "default_branch": "main",
        "branch_heads": branch_heads,
    }


def _make_bundle(
    commit_id: str = "a" * 64,
    snapshot_id: str = "b" * 64,
) -> "PackBundle":
    """Return an empty pack bundle.

    ``commit_id`` and ``snapshot_id`` are accepted for call-site compatibility
    but are not used: the bundle is always empty because ``apply_pack`` is
    patched wherever this helper is used.
    """
    from muse.core.pack import PackBundle

    return PackBundle(commits=[], snapshots=[], objects=[])


def _apply_stats(commits: int, snapshots: int, objects: int) -> _IntMap:
    """Build the result dictionary returned by the patched ``apply_pack``."""
    return {
        "commits_written": commits,
        "snapshots_written": snapshots,
        "objects_written": objects,
        "objects_skipped": 0,
    }


def _patch_pull(name: str, **kwargs: Any) -> contextlib.AbstractContextManager[MagicMock]:
    """Shorthand for ``patch("muse.cli.commands.pull.<name>", **kwargs)``."""
    return patch(f"{_PULL_MODULE}.{name}", **kwargs)


@contextlib.contextmanager
def _mocked_remote(info: "RemoteInfo | None" = None) -> Iterator[MagicMock]:
    """Patch remote lookup, signing identity and transport creation.

    Yields the transport mock (``make_transport.return_value``) so the caller
    can attach further return values or side effects.  When *info* is given it
    becomes the return value of ``fetch_remote_info``.
    """
    with (
        _patch_pull("get_remote", return_value="https://hub"),
        _patch_pull("get_signing_identity", return_value=None),
        _patch_pull("make_transport") as make_transport,
    ):
        transport = make_transport.return_value
        if info is not None:
            transport.fetch_remote_info.return_value = info
        yield transport


@contextlib.contextmanager
def _mocked_fetch(
    info: "RemoteInfo",
    *,
    ack: list[str],
    apply_result: _IntMap,
) -> Iterator[MagicMock]:
    """Extend :func:`_mocked_remote` so that the fetch phase succeeds.

    Negotiation is immediately ready with *ack*, ``fetch_pack`` returns an
    empty bundle, ``apply_pack`` returns *apply_result* and
    ``set_remote_head`` is replaced by a mock.
    """
    with (
        _mocked_remote(info) as transport,
        _patch_pull("apply_pack", return_value=apply_result),
        _patch_pull("set_remote_head"),
    ):
        transport.negotiate.return_value = {"ready": True, "ack": ack, "common_base": None}
        transport.fetch_pack.return_value = _make_bundle()
        yield transport


def _pull_when_up_to_date(repo: pathlib.Path, *flags: str) -> InvokeResult:
    """Run ``muse pull *flags`` against a remote whose head equals the local head."""
    from muse.core.store import get_head_commit_id

    head = get_head_commit_id(repo, "main") or ""
    with (
        _mocked_remote(_make_remote_info({"main": head})),
        _patch_pull("get_remote_head", return_value=head),
    ):
        return runner.invoke(cli, ["pull", *flags], env=_env(repo))


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------

@pytest.fixture()
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Minimal .muse/ repo with one commit on main."""
    from muse._version import __version__
    from muse.core.object_store import write_object
    from muse.core.snapshot import compute_commit_id, compute_snapshot_id
    from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot

    muse = tmp_path / ".muse"
    for sub in ("refs/heads", "objects", "commits", "snapshots"):
        (muse / sub).mkdir(parents=True)
    (muse / "repo.json").write_text(
        json.dumps({"repo_id": "test-repo", "schema_version": __version__, "domain": "code"})
    )
    (muse / "HEAD").write_text("ref: refs/heads/main\n")
    (muse / "config.toml").write_text('[remotes.origin]\nurl = "https://hub.example.com/r"\n')

    blob = b"x = 1\n"
    oid = _sha(blob)
    write_object(tmp_path, oid, blob)
    snap_id = compute_snapshot_id({"a.py": oid})
    write_snapshot(tmp_path, SnapshotRecord(snapshot_id=snap_id, manifest={"a.py": oid}))
    ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    cid = compute_commit_id([], snap_id, "base", ts.isoformat())
    write_commit(tmp_path, CommitRecord(
        commit_id=cid,
        repo_id="test-repo",
        branch="main",
        snapshot_id=snap_id,
        message="base",
        committed_at=ts,
    ))
    (muse / "refs" / "heads" / "main").write_text(cid)

    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
    monkeypatch.chdir(tmp_path)
    return tmp_path


# ---------------------------------------------------------------------------
# Unit — dead code, parser flags, TypedDict, negotiate helper
# ---------------------------------------------------------------------------

class TestDeadCodeRemoval:
    """The pull module no longer exposes removed wrappers and keeps its JSON type."""

    def test_no_current_branch_wrapper(self) -> None:
        import muse.cli.commands.pull as m

        assert not hasattr(m, "_current_branch"), "_current_branch must be deleted"

    def test_no_restore_from_manifest_wrapper(self) -> None:
        import muse.cli.commands.pull as m

        assert not hasattr(m, "_restore_from_manifest"), "_restore_from_manifest must be deleted"

    def test_json_module_removed_or_used(self) -> None:
        """json is imported and used (for json.dumps in run)."""
        import inspect

        import muse.cli.commands.pull as m

        src = inspect.getsource(m)
        assert "json.dumps" in src, "json module must be used"

    def test_pull_json_typeddict_keys(self) -> None:
        from muse.cli.commands.pull import _PullJson

        # Same key set the integration tests require of real payloads.
        assert _REQUIRED.KEYS <= set(_PullJson.__annotations__)


class TestRegisterFlags:
    """Argument-parser wiring performed by ``pull.register``."""

    def _parse(self, *args: str) -> argparse.Namespace:
        """Register the pull subcommand on a fresh parser and parse ``pull *args``."""
        import muse.cli.commands.pull as m

        p = argparse.ArgumentParser()
        sub = p.add_subparsers()
        m.register(sub)
        return p.parse_args(["pull", *args])

    def test_ff_only_flag(self) -> None:
        ns = self._parse("--ff-only")
        assert ns.ff_only is True

    def test_dry_run_short(self) -> None:
        ns = self._parse("-n")
        assert ns.dry_run is True

    def test_dry_run_long(self) -> None:
        ns = self._parse("--dry-run")
        assert ns.dry_run is True

    def test_format_json_shorthand(self) -> None:
        ns = self._parse("--json")
        assert ns.fmt == "json"

    def test_format_text_default(self) -> None:
        ns = self._parse()
        assert ns.fmt == "text"

    def test_no_merge_flag(self) -> None:
        ns = self._parse("--no-merge")
        assert ns.no_merge is True

    def test_message_flag(self) -> None:
        ns = self._parse("-m", "custom msg")
        assert ns.message == "custom msg"

    def test_branch_flag(self) -> None:
        ns = self._parse("-b", "dev")
        assert ns.branch_flag == "dev"


class TestNegotiateHave:
    """Behaviour of ``muse.core.transport.negotiate_have`` against a fake transport."""

    def _make_transport(
        self,
        ready_after: int = 1,
        ack_ids: list[str] | None = None,
    ) -> MagicMock:
        """Mock transport where negotiate is ready after *ready_after* calls."""
        call_count = 0

        def negotiate(
            url: str, token: str | None, want: list[str], have: list[str]
        ) -> "NegotiateResponse":
            nonlocal call_count
            call_count += 1
            ready = call_count >= ready_after
            # Without explicit ack ids, echo back whatever the client offered.
            return {"ready": ready, "ack": ack_ids or have, "common_base": None}

        t = MagicMock()
        t.negotiate.side_effect = negotiate
        return t

    def test_empty_all_local_returns_empty(self) -> None:
        from muse.core.transport import negotiate_have as _negotiate_have

        t = self._make_transport()
        result = _negotiate_have(t, "http://x", None, ["want"], [])
        assert result == []

    def test_ready_on_first_round_returns_ack(self) -> None:
        from muse.core.transport import negotiate_have as _negotiate_have

        t = self._make_transport(ready_after=1, ack_ids=["abc"])
        commits = ["c1", "c2", "c3"]
        result = _negotiate_have(t, "http://x", None, ["want"], commits)
        assert result == ["abc"]

    def test_ready_after_two_rounds(self) -> None:
        from muse.core.transport import NEGOTIATE_DEPTH
        from muse.core.transport import negotiate_have as _negotiate_have

        commits = [f"c{i}" for i in range(NEGOTIATE_DEPTH * 2)]
        t = self._make_transport(ready_after=2)
        result = _negotiate_have(t, "http://x", None, ["want"], commits)
        # Should return ack from second batch
        assert len(result) > 0

    def test_exhausted_returns_full_list(self) -> None:
        from muse.core.transport import negotiate_have as _negotiate_have

        def never_ready(
            url: str, token: str | None, want: list[str], have: list[str]
        ) -> "NegotiateResponse":
            return {"ready": False, "ack": [], "common_base": None}

        t = MagicMock()
        t.negotiate.side_effect = never_ready
        commits = [f"c{i}" for i in range(10)]
        result = _negotiate_have(t, "http://x", None, ["want"], commits)
        assert result == commits  # full list returned as fallback


# ---------------------------------------------------------------------------
# Integration — JSON schema, error routing
# ---------------------------------------------------------------------------

class TestErrorRouting:
    """Error and status messages must be written to stderr."""

    def test_remote_not_configured_to_stderr(self, repo: pathlib.Path) -> None:
        r = runner.invoke(cli, ["pull", "no_such_remote"], env=_env(repo))
        assert r.exit_code != 0
        assert "not configured" in (r.stderr or "").lower()

    def test_branch_not_on_remote_to_stderr(self, repo: pathlib.Path) -> None:
        info = _make_remote_info({"main": "a" * 64})
        with _mocked_remote(info):
            r = runner.invoke(
                cli, ["pull", "origin", "--branch", "nonexistent"], env=_env(repo)
            )
        assert r.exit_code != 0
        assert "does not exist" in (r.stderr or "").lower()

    def test_fetch_transport_error_to_stderr(self, repo: pathlib.Path) -> None:
        from muse.core.transport import TransportError

        with _mocked_remote() as transport:
            transport.fetch_remote_info.side_effect = TransportError("timeout", 503)
            r = runner.invoke(cli, ["pull"], env=_env(repo))
        assert r.exit_code != 0
        assert "cannot reach" in (r.stderr or "").lower()

    def test_fetch_pack_error_to_stderr(self, repo: pathlib.Path) -> None:
        from muse.core.transport import TransportError

        info = _make_remote_info({"main": "b" * 64})
        with _mocked_remote(info) as transport:
            transport.negotiate.side_effect = TransportError("no negotiate", 404)
            transport.fetch_pack.side_effect = TransportError("pack failed", 500)
            r = runner.invoke(cli, ["pull"], env=_env(repo))
        assert r.exit_code != 0
        assert "fetch failed" in (r.stderr or "").lower()

    def test_invalid_format_to_stderr(self, repo: pathlib.Path) -> None:
        r = runner.invoke(cli, ["pull", "--format", "xml"], env=_env(repo))
        assert r.exit_code == 1
        assert "xml" in (r.stderr or "").lower()

    def test_already_up_to_date_to_stderr(self, repo: pathlib.Path) -> None:
        """'Already up to date' must go to stderr, not stdout."""
        r = _pull_when_up_to_date(repo)
        assert r.exit_code == 0
        assert "already up to date" in (r.stderr or "").lower()
        # Text mode must not emit a JSON payload.
        json_lines = [line for line in r.output.splitlines() if line.strip().startswith("{")]
        assert len(json_lines) == 0


class TestJsonSchema:
    """``--json`` payloads carry every required key for each status."""

    def _run(
        self,
        repo: pathlib.Path,
        extra_args: list[str] | None = None,
        remote_head: str | None = None,
        apply_result: _IntMap | None = None,
    ) -> InvokeResult:
        """Invoke ``muse pull --json`` with a fully mocked, successful fetch.

        Args:
            repo: repository root produced by the ``repo`` fixture.
            extra_args: additional CLI arguments appended after ``--json``.
            remote_head: commit id advertised for ``main``; defaults to the
                local head.
            apply_result: stubbed ``apply_pack`` result; defaults to
                2 commits / 1 snapshot / 5 objects written.
        """
        from muse.core.store import get_head_commit_id

        local_head = get_head_commit_id(repo, "main") or "a" * 64
        info = _make_remote_info({"main": remote_head or local_head})
        stats = apply_result or _apply_stats(commits=2, snapshots=1, objects=5)
        with _mocked_fetch(info, ack=[local_head], apply_result=stats):
            return runner.invoke(
                cli,
                ["pull", "--json", *(extra_args or [])],
                env=_env(repo),
            )

    def test_up_to_date_schema(self, repo: pathlib.Path) -> None:
        r = _pull_when_up_to_date(repo, "--json")
        assert r.exit_code == 0, r.output
        d = _json_line(r)
        assert _REQUIRED.KEYS <= d.keys()
        assert d["status"] == "up_to_date"
        assert d["commits_received"] == 0

    def test_fetched_schema_no_merge(self, repo: pathlib.Path) -> None:
        r = self._run(repo, extra_args=["--no-merge"], remote_head="b" * 64)
        assert r.exit_code == 0, r.output
        d = _json_line(r)
        assert _REQUIRED.KEYS <= d.keys()
        assert d["status"] == "fetched"
        assert d["commits_received"] == 2
        assert d["objects_written"] == 5

    def test_dry_run_schema(self, repo: pathlib.Path) -> None:
        with _mocked_remote(_make_remote_info({"main": "b" * 64})):
            r = runner.invoke(cli, ["pull", "--dry-run", "--json"], env=_env(repo))
        assert r.exit_code == 0, r.output
        d = _json_line(r)
        assert _REQUIRED.KEYS <= d.keys()
        assert d["status"] == "dry_run"
        assert d["dry_run"] is True
        assert d["head"] is None


class TestFastForwardOrdering:
    """``apply_manifest`` must run before ``write_branch_ref``."""

    def test_apply_manifest_before_write_branch_ref(self, repo: pathlib.Path) -> None:
        """apply_manifest must be called BEFORE write_branch_ref in fast-forward.

        Both functions are patched with side effects that append to a shared
        list, and the test asserts the recorded order.  The contract being
        pinned: apply_manifest first, so that a crash between the two
        operations leaves the branch pointer not yet advanced.
        """
        from muse.core.store import CommitRecord, SnapshotRecord, get_head_commit_id

        local_head = get_head_commit_id(repo, "main") or ""
        call_order: list[str] = []

        remote_cid = "c" * 64
        snap_id = "d" * 64
        fake_commit = CommitRecord(
            commit_id=remote_cid,
            repo_id="test-repo",
            branch="main",
            snapshot_id=snap_id,
            message="remote",
            committed_at=datetime.datetime(2026, 1, 2, tzinfo=datetime.timezone.utc),
        )
        fake_snap = SnapshotRecord(
            snapshot_id=snap_id,
            manifest={"a.py": "e" * 64},
        )

        with (
            _mocked_fetch(
                _make_remote_info({"main": remote_cid}),
                ack=[local_head],
                apply_result=_apply_stats(commits=1, snapshots=1, objects=2),
            ),
            # Merge base == local head selects the fast-forward path.
            _patch_pull("find_merge_base", return_value=local_head),
            _patch_pull("read_commit", return_value=fake_commit),
            _patch_pull("read_snapshot", return_value=fake_snap),
            _patch_pull(
                "apply_manifest",
                side_effect=lambda *a, **kw: call_order.append("apply"),
            ),
            _patch_pull(
                "write_branch_ref",
                side_effect=lambda *a, **kw: call_order.append("write_ref"),
            ),
        ):
            runner.invoke(cli, ["pull"], env=_env(repo))

        assert "apply" in call_order, "apply_manifest must be called in fast-forward path"
        assert "write_ref" in call_order, "write_branch_ref must be called in fast-forward path"
        assert call_order.index("apply") < call_order.index("write_ref"), (
            "apply_manifest must happen BEFORE write_branch_ref in fast-forward"
        )

    def test_bootstrap_apply_manifest_before_write_branch_ref(self, repo: pathlib.Path) -> None:
        """Same ordering contract in the bootstrap path (no local commits yet)."""
        from muse.core.store import CommitRecord, SnapshotRecord

        call_order: list[str] = []

        remote_cid = "f" * 64
        snap_id = "g" * 64
        fake_commit = CommitRecord(
            commit_id=remote_cid,
            repo_id="test-repo",
            branch="main",
            snapshot_id=snap_id,
            message="remote",
            committed_at=datetime.datetime(2026, 1, 2, tzinfo=datetime.timezone.utc),
        )
        fake_snap = SnapshotRecord(
            snapshot_id=snap_id,
            manifest={"a.py": "h" * 64},
        )

        with (
            _mocked_fetch(
                _make_remote_info({"main": remote_cid}),
                ack=[],
                apply_result=_apply_stats(commits=1, snapshots=1, objects=2),
            ),
            # ours_commit_id is None → bootstrap path
            _patch_pull("get_head_commit_id", return_value=None),
            _patch_pull("read_repo_id", return_value="test-repo"),
            _patch_pull("read_commit", return_value=fake_commit),
            _patch_pull("read_snapshot", return_value=fake_snap),
            _patch_pull(
                "apply_manifest",
                side_effect=lambda *a, **kw: call_order.append("apply"),
            ),
            _patch_pull(
                "write_branch_ref",
                side_effect=lambda *a, **kw: call_order.append("write_ref"),
            ),
        ):
            runner.invoke(cli, ["pull"], env=_env(repo))

        assert "apply" in call_order, "apply_manifest must be called in bootstrap path"
        assert "write_ref" in call_order, "write_branch_ref must be called in bootstrap path"
        assert call_order.index("apply") < call_order.index("write_ref"), (
            "apply_manifest must happen BEFORE write_branch_ref in bootstrap path"
        )


class TestFFOnly:
    """``--ff-only`` refuses diverged histories but still fast-forwards."""

    def test_ff_only_diverged_exits_1(self, repo: pathlib.Path) -> None:
        from muse.core.store import get_head_commit_id

        local_head = get_head_commit_id(repo, "main") or ""
        remote_cid = "d" * 64

        with (
            _mocked_fetch(
                _make_remote_info({"main": remote_cid}),
                ack=[local_head],
                apply_result=_apply_stats(commits=1, snapshots=1, objects=1),
            ),
            # Simulate diverged: merge_base is neither ours nor theirs
            _patch_pull("find_merge_base", return_value="e" * 64),
        ):
            r = runner.invoke(cli, ["pull", "--ff-only"], env=_env(repo))

        assert r.exit_code == 1
        assert "fast-forward" in (r.stderr or "").lower()

    def test_ff_only_fast_forward_succeeds(self, repo: pathlib.Path) -> None:
        from muse.core.store import get_head_commit_id

        local_head = get_head_commit_id(repo, "main") or ""
        remote_cid = "f" * 64

        fake_commit = MagicMock()
        fake_commit.snapshot_id = "a" * 64
        fake_snap = MagicMock()
        fake_snap.manifest = {}

        with (
            _mocked_fetch(
                _make_remote_info({"main": remote_cid}),
                ack=[local_head],
                apply_result=_apply_stats(commits=1, snapshots=1, objects=1),
            ),
            _patch_pull("find_merge_base", return_value=local_head),
            _patch_pull("read_commit", return_value=fake_commit),
            _patch_pull("read_snapshot", return_value=fake_snap),
            _patch_pull("apply_manifest"),
            _patch_pull("write_branch_ref"),
        ):
            r = runner.invoke(cli, ["pull", "--ff-only"], env=_env(repo))

        assert r.exit_code == 0, r.output


class TestCommitsReceivedFromApplyResult:
    def test_commits_received_uses_commits_written(self, repo: pathlib.Path) -> None:
        """commits_received in JSON must come from apply_pack result, not bundle length."""
        remote_cid = "g" * 64
        info = _make_remote_info({"main": remote_cid})
        # The bundle returned by fetch_pack is empty, so 7 / 21 can only come
        # from the stubbed apply_pack result.
        ar = _apply_stats(commits=7, snapshots=7, objects=21)

        with _mocked_fetch(info, ack=[], apply_result=ar):
            r = runner.invoke(cli, ["pull", "--no-merge", "--json"], env=_env(repo))

        assert r.exit_code == 0, r.output
        d = _json_line(r)
        assert d["commits_received"] == 7
        assert d["objects_written"] == 21


# ---------------------------------------------------------------------------
# End-to-end with file:// transport
# ---------------------------------------------------------------------------

@pytest.fixture()
def two_repos(
    tmp_path: pathlib.Path,
    monkeypatch: pytest.MonkeyPatch,
) -> tuple[pathlib.Path, pathlib.Path]:
    """Return (local, remote) pair — local already has remote configured."""
    local = tmp_path / "local"
    remote = tmp_path / "remote"
    local.mkdir()
    remote.mkdir()

    from muse._version import __version__
    from muse.core.object_store import write_object
    from muse.core.snapshot import compute_commit_id, compute_snapshot_id
    from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot

    def _scaffold(root: pathlib.Path, msg: str, content: bytes) -> str:
        """Create a one-commit repo at *root* and return the commit id."""
        muse = root / ".muse"
        for sub in ("refs/heads", "objects", "commits", "snapshots"):
            (muse / sub).mkdir(parents=True, exist_ok=True)
        (muse / "repo.json").write_text(
            json.dumps({"repo_id": "e2e-repo", "schema_version": __version__, "domain": "code"})
        )
        (muse / "HEAD").write_text("ref: refs/heads/main\n")

        blob = content
        oid = _sha(blob)
        write_object(root, oid, blob)
        snap_id = compute_snapshot_id({"a.py": oid})
        write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest={"a.py": oid}))
        ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
        cid = compute_commit_id([], snap_id, msg, ts.isoformat())
        write_commit(root, CommitRecord(
            commit_id=cid,
            repo_id="e2e-repo",
            branch="main",
            snapshot_id=snap_id,
            message=msg,
            committed_at=ts,
        ))
        (muse / "refs" / "heads" / "main").write_text(cid)
        (muse / "config.toml").write_text(f'[remotes.origin]\nurl = "file://{root}"\n')
        return cid

    _scaffold(remote, "remote-base", b"x = 1\n")
    _scaffold(local, "local-base", b"x = 1\n")

    # Point local's remote at the remote repo
    (local / ".muse" / "config.toml").write_text(
        f'[remotes.origin]\nurl = "file://{remote}"\n'
    )

    monkeypatch.chdir(local)
    monkeypatch.setenv("MUSE_REPO_ROOT", str(local))
    return local, remote


class TestEndToEnd:
    """Pull against a real on-disk remote reached through ``file://``."""

    @staticmethod
    def _append_commit(root: pathlib.Path, filename: str, blob: bytes, message: str) -> str:
        """Write a child of the current ``main`` head in *root* and advance the ref.

        The new snapshot contains only ``{filename: sha256(blob)}``.  Returns
        the new commit id.
        """
        from muse.core.object_store import write_object
        from muse.core.snapshot import compute_commit_id, compute_snapshot_id
        from muse.core.store import (
            CommitRecord,
            SnapshotRecord,
            get_head_commit_id,
            write_commit,
            write_snapshot,
        )

        parent = get_head_commit_id(root, "main") or ""
        oid = _sha(blob)
        write_object(root, oid, blob)
        snap_id = compute_snapshot_id({filename: oid})
        write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest={filename: oid}))
        ts = datetime.datetime(2026, 1, 2, tzinfo=datetime.timezone.utc)
        cid = compute_commit_id([parent], snap_id, message, ts.isoformat())
        write_commit(root, CommitRecord(
            commit_id=cid,
            repo_id="e2e-repo",
            branch="main",
            snapshot_id=snap_id,
            message=message,
            committed_at=ts,
            parent_commit_id=parent,
        ))
        (root / ".muse" / "refs" / "heads" / "main").write_text(cid)
        return cid

    def test_pull_no_merge_fetches(self, two_repos: tuple[pathlib.Path, pathlib.Path]) -> None:
        local, _remote = two_repos
        r = runner.invoke(cli, ["pull", "--no-merge"], env=_env(local), catch_exceptions=False)
        assert r.exit_code == 0, r.output

    def test_pull_json_fetched_schema(self, two_repos: tuple[pathlib.Path, pathlib.Path]) -> None:
        local, _remote = two_repos
        r = runner.invoke(
            cli,
            ["pull", "--no-merge", "--json"],
            env=_env(local),
            catch_exceptions=False,
        )
        assert r.exit_code == 0, r.output
        d = _json_line(r)
        assert _REQUIRED.KEYS <= d.keys()
        assert d["status"] == "fetched"

    def test_dry_run_no_side_effects(self, two_repos: tuple[pathlib.Path, pathlib.Path]) -> None:
        from muse.core.store import get_head_commit_id

        local, _remote = two_repos
        # Record state before dry run
        head_before = get_head_commit_id(local, "main")
        r = runner.invoke(
            cli,
            ["pull", "--dry-run"],
            env=_env(local),
            catch_exceptions=False,
        )
        assert r.exit_code == 0, r.output
        head_after = get_head_commit_id(local, "main")
        assert head_before == head_after, "dry-run must not advance local HEAD"

    def test_dry_run_json_schema(self, two_repos: tuple[pathlib.Path, pathlib.Path]) -> None:
        local, _remote = two_repos
        r = runner.invoke(
            cli,
            ["pull", "--dry-run", "--json"],
            env=_env(local),
            catch_exceptions=False,
        )
        assert r.exit_code == 0, r.output
        d = _json_line(r)
        assert _REQUIRED.KEYS <= d.keys()
        assert d["dry_run"] is True

    def test_ff_only_refuses_diverged(self, two_repos: tuple[pathlib.Path, pathlib.Path]) -> None:
        """When local and remote have diverged, --ff-only must exit non-zero."""
        local, remote = two_repos

        # Advance remote past local (add a new commit on remote)
        self._append_commit(remote, "a.py", b"x = 2\n", "remote-advance")
        # Also diverge local (add a local-only commit)
        self._append_commit(local, "b.py", b"y = 1\n", "local-advance")

        r = runner.invoke(cli, ["pull", "--ff-only"], env=_env(local))
        assert r.exit_code == 1
        assert "fast-forward" in (r.stderr or "").lower()


# ---------------------------------------------------------------------------
# Security
# ---------------------------------------------------------------------------

class TestSecurity:
    """User-controlled strings must not reach the terminal with ANSI escapes."""

    def test_remote_name_ansi_sanitized(self, repo: pathlib.Path) -> None:
        ansi = "\x1b[31mevil\x1b[0m"
        r = runner.invoke(cli, ["pull", ansi], env=_env(repo))
        assert r.exit_code != 0
        assert "\x1b[31m" not in (r.stderr or "")

    def test_branch_name_sanitized_in_not_found(self, repo: pathlib.Path) -> None:
        info = _make_remote_info({"main": "a" * 64})
        with _mocked_remote(info):
            r = runner.invoke(
                cli,
                ["pull", "origin", "--branch", "\x1b[31mevil\x1b[0m"],
                env=_env(repo),
            )
        assert "\x1b[31m" not in (r.stderr or "")
        assert "\x1b[31m" not in r.output

    def test_progress_not_in_stdout_on_json(self, repo: pathlib.Path) -> None:
        """--json: stdout must contain exactly one JSON line, no mixed progress."""
        r = _pull_when_up_to_date(repo, "--json")
        json_lines = [line for line in r.output.splitlines() if line.strip().startswith("{")]
        assert len(json_lines) == 1
        json.loads(json_lines[0])  # must be valid JSON

    def test_invalid_format_exits_to_stderr(self, repo: pathlib.Path) -> None:
        r = runner.invoke(cli, ["pull", "--format", "yaml"], env=_env(repo))
        assert r.exit_code == 1
        assert "yaml" in (r.stderr or "").lower()

    def test_conflict_paths_sanitized_in_text(self, repo: pathlib.Path) -> None:
        """File paths in CONFLICT lines must be run through sanitize_display."""
        from muse.core.store import get_head_commit_id

        local_head = get_head_commit_id(repo, "main") or ""
        remote_cid = "h" * 64
        evil_path = "\x1b[31mevil.py\x1b[0m"

        merge_result = MagicMock()
        merge_result.is_clean = False
        merge_result.conflicts = {evil_path}
        merge_result.applied_strategies = {}

        plugin = MagicMock()
        # Spoof the plugin's class so it reports a type that has a ``merge``
        # attribute -- presumably what the pull command inspects; verify
        # against the command before changing.
        plugin.__class__ = type("P", (), {"merge": None})
        plugin.merge.return_value = merge_result

        with (
            _mocked_fetch(
                _make_remote_info({"main": remote_cid}),
                ack=[local_head],
                apply_result=_apply_stats(commits=1, snapshots=1, objects=1),
            ),
            # Merge base is neither head, so the merge path is taken.
            _patch_pull("find_merge_base", return_value="z" * 64),
            _patch_pull("resolve_plugin") as resolve_plugin,
            _patch_pull("read_domain", return_value="code"),
            _patch_pull("write_merge_state"),
        ):
            resolve_plugin.return_value = plugin
            r = runner.invoke(cli, ["pull"], env=_env(repo))

        assert "\x1b[31m" not in (r.stderr or "")
        assert "\x1b[31m" not in r.output


# ---------------------------------------------------------------------------
# Stress
# ---------------------------------------------------------------------------

class TestStress:
    @pytest.mark.slow
    def test_negotiate_have_10k_commits(self) -> None:
        """_negotiate_have must terminate in finite rounds with 10 000 commits."""
        from muse.core.transport import NEGOTIATE_DEPTH
        from muse.core.transport import negotiate_have as _negotiate_have

        call_count = 0

        def never_ready(
            url: str, token: str | None, want: list[str], have: list[str]
        ) -> "NegotiateResponse":
            nonlocal call_count
            call_count += 1
            return {"ready": False, "ack": [], "common_base": None}

        t = MagicMock()
        t.negotiate.side_effect = never_ready

        commits = [_sha(str(i).encode()) for i in range(10_000)]
        result = _negotiate_have(t, "http://x", None, ["want"], commits)
        assert result == commits  # full fallback
        # One negotiate call per NEGOTIATE_DEPTH-sized batch, rounded up.
        expected_rounds = (len(commits) + NEGOTIATE_DEPTH - 1) // NEGOTIATE_DEPTH
        assert call_count == expected_rounds

    @pytest.mark.slow
    def test_concurrent_negotiate_have(self) -> None:
        """Concurrent _negotiate_have calls on isolated state must not interfere."""
        from muse.core.transport import negotiate_have as _negotiate_have

        errors: list[str] = []

        def run_one(idx: int) -> None:
            commits = [_sha(f"{idx}-{i}".encode()) for i in range(50)]
            call_n = 0

            def ready_second(
                url: str, token: str | None, want: list[str], have: list[str]
            ) -> "NegotiateResponse":
                nonlocal call_n
                call_n += 1
                return {"ready": call_n >= 2, "ack": have, "common_base": None}

            t = MagicMock()
            t.negotiate.side_effect = ready_second
            try:
                result = _negotiate_have(t, "http://x", None, [f"want-{idx}"], commits)
            except Exception as exc:
                # An exception raised in a worker thread never reaches the
                # main thread, so record it or the test would pass silently.
                errors.append(f"worker {idx}: raised {exc!r}")
                return
            if not result:
                errors.append(f"worker {idx}: empty result")

        threads = [threading.Thread(target=run_one, args=(i,)) for i in range(16)]
        for th in threads:
            th.start()
        for th in threads:
            th.join()
        assert not errors, f"Concurrent errors: {errors}"