"""TDD — mpack push path end-to-end tests. Gap 1: _push_mpack() is the production push path but was never exercised in any test. These tests drive the full mpack protocol (presign → PUT → unpack-mpack) with a fake urllib layer. Gap 6: The merge-commit P2 ancestor optimisation: when the local tip is a merge commit, _push_mpack adds ALL remote branch heads to branch_have so the BFS stops at any already-remote ancestor on the P2 chain rather than walking back to the repo root. Ported from deleted transport tests: V1 Second push sends commits_count=1 (have filter works) V2 First push (empty remote) sends all commits V3 Live GET /refs anchors the BFS even without a tracking ref VII1 Two-push sequence: commits_count in second push equals 1 New tests: E1 presign → PUT → unpack-mpack sequence is called in order E2 mpack_key in presign body == sha256 of the actual wire bytes E6 Non-merge commit: branch_have contains only the direct remote head E7 Merge commit: branch_have includes all remote branch heads (P2 opt) """ from __future__ import annotations import datetime import json import pathlib from collections.abc import Mapping from typing import TYPE_CHECKING from unittest.mock import MagicMock, patch import msgpack import pytest from muse._version import __version__ from muse.core.mpack import PushResult, RemoteInfo from muse.core.transport import SigningIdentity from muse.core.object_store import write_object from muse.core.paths import heads_dir, muse_dir, remotes_dir if TYPE_CHECKING: from muse.core.mpack import _WalkResult _Headers = dict[str, str] # HTTP header map _JsonDict = dict[str, str | int | float | bool | None | list[str]] # JSON object _RemoteHeads = dict[str, str] # branch → commit_id _WalkCall = dict[str, list[str] | None] # {"tips": [...], "have": [...] 
# _WalkCall shape comment, continued from the previous line: ... | None}
from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
from muse.core.commits import (
    CommitRecord,
    write_commit,
)
from muse.core.snapshots import (
    SnapshotRecord,
    write_snapshot,
)
from muse.core.types import Manifest, blob_id
from tests.cli_test_helper import CliRunner

# NOTE(review): ``cli`` is passed straight to ``runner.invoke``; presumably the
# project's CliRunner helper resolves the real entry point itself — confirm.
cli = None
runner = CliRunner()

# Presigned upload URL handed back by the fake presign step.
_UPLOAD_URL = "https://minio.example.com/mpacks/put?sig=x"


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _bare_repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Create a minimal repo layout under *tmp_path* and make it the active repo.

    Writes the store directories, ``HEAD`` (pointing at ``refs/heads/main``),
    ``repo.json`` and a ``config.toml`` declaring an ``origin`` remote, then
    sets ``MUSE_REPO_ROOT`` and the working directory to *tmp_path*.

    Returns:
        *tmp_path*, for convenience.
    """
    muse = muse_dir(tmp_path)
    for d in ("commits", "snapshots", "objects", "refs/heads", "remotes"):
        (muse / d).mkdir(parents=True, exist_ok=True)
    (muse / "HEAD").write_text("ref: refs/heads/main\n")
    (muse / "repo.json").write_text(
        json.dumps({"repo_id": "test-repo", "schema_version": __version__, "domain": "code"})
    )
    (muse / "config.toml").write_text('[remotes.origin]\nurl = "https://hub.example.com/r"\n')
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
    monkeypatch.chdir(tmp_path)
    return tmp_path


def _make_commit(
    root: pathlib.Path,
    label: str,
    parent_id: str | None = None,
    parent2_id: str | None = None,
    content: bytes | None = None,
) -> CommitRecord:
    """Write one blob, its single-file snapshot and a commit on ``main``.

    Args:
        root: Repository root the records are written into.
        label: Used for the commit message and, when *content* is omitted,
            for the blob payload.
        parent_id: First parent commit id, if any.
        parent2_id: Second parent commit id; supplying it makes a merge commit.
        content: Explicit blob payload; defaults to ``b"content-<label>"``.

    Returns:
        The ``CommitRecord`` that was written.
    """
    raw = content if content is not None else f"content-{label}".encode()
    oid = blob_id(raw)
    write_object(root, oid, raw)

    manifest: Manifest = {"file.txt": oid}
    snap_id = compute_snapshot_id(manifest)
    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))

    # Fixed timestamp so the commit id depends only on parents, snapshot and message.
    committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    parent_ids = [p for p in (parent_id, parent2_id) if p]
    cid = compute_commit_id(
        parent_ids=parent_ids,
        snapshot_id=snap_id,
        message=f"commit {label}",
        committed_at_iso=committed_at.isoformat(),
    )
    commit = CommitRecord(
        commit_id=cid,
        branch="main",
        snapshot_id=snap_id,
        message=f"commit {label}",
        committed_at=committed_at,
        parent_commit_id=parent_id,
        parent2_commit_id=parent2_id,
    )
    write_commit(root, commit)
    return commit


class _CapturingClient:
    """Empty placeholder; it defines no behaviour of its own.

    The fake urllib hooks used by these tests are the ``fake_post`` and
    ``fake_put`` methods of ``_CapturingTransport``.
    """


def _fake_resp(body: bytes, status: int = 200) -> MagicMock:
    """Build a mock HTTP response carrying *body* with a msgpack content type."""
    r = MagicMock()
    r.status_code = status
    r.content = body
    r.headers = {"content-type": "application/x-msgpack"}
    r.text = ""
    return r


class _CapturingTransport:
    """Fake transport that records calls to push_mpack_presign/put/unpack.

    Every call is stored in a per-method list (``presign_calls``,
    ``put_calls``, ``unpack_calls``) and a marker is appended to ``events`` so
    tests can assert ordering.  ``fake_post`` / ``fake_put`` are optional
    urllib-level hooks that record into ``posts`` / ``puts``.
    """

    def __init__(self, remote_heads: _RemoteHeads | None = None, result_head: str = "") -> None:
        """Store the canned remote state.

        Args:
            remote_heads: branch → commit id map served by ``fetch_remote_info``.
            result_head: Value reported as ``head`` in unpack responses.
        """
        self._remote_heads = remote_heads or {}
        self._result_head = result_head
        self.presign_calls: list[tuple[str, "SigningIdentity | None", bytes]] = []
        self.put_calls: list[tuple[str, bytes, str]] = []
        self.unpack_calls: list[tuple[str, "SigningIdentity | None", str, dict[str, str | int | bool]]] = []
        # Raw urllib-level captures.  These must exist before fake_post/fake_put
        # run, otherwise the first hooked request fails with AttributeError.
        self.posts: list[tuple[str, bytes]] = []
        self.puts: list[tuple[str, bytes]] = []
        # Transport methods append plain markers ("presign", "put", "unpack");
        # the urllib hooks append (url, verb) tuples.
        self.events: list[str | tuple[str, str]] = []

    def fake_post(self, url: str, data: bytes, headers: Mapping[str, str], *, verify: bool = True) -> tuple[int, bytes]:
        """Record a POST and answer with a canned presign or unpack JSON body."""
        self.posts.append((url, data))
        self.events.append((url, "post"))
        if "mpack-presign" in url:
            return 200, json.dumps({"upload_url": _UPLOAD_URL}).encode()
        return 200, json.dumps(
            {"job_id": "job-e2e", "head": self._result_head, "branch": "main",
             "blobs_in_mpack": 0, "commits_in_mpack": 0}
        ).encode()

    def fetch_remote_info(self, url: str, signing: "SigningIdentity | None") -> RemoteInfo:
        """Return the configured remote branch heads."""
        return RemoteInfo(domain="code", default_branch="main", branch_heads=self._remote_heads)

    def _build_request(self, method: str, url: str, signing: "SigningIdentity | None",
                       body: bytes | None = None, **kw: "str | bytes | None") -> MagicMock:
        """Return a mock request carrying stub auth and content-type headers."""
        req = MagicMock()
        req.headers = {"Authorization": "MSign stub", "Content-Type": "application/x-msgpack"}
        return req

    def push_mpack_presign(self, url: str, signing: "SigningIdentity | None",
                           mpack_bytes: bytes, ttl_seconds: int = 3600) -> "dict[str, str]":
        """Record the presign call; reply with the upload URL and blob_id key."""
        self.presign_calls.append((url, signing, mpack_bytes))
        self.events.append("presign")
        return {"upload_url": _UPLOAD_URL, "mpack_key": blob_id(mpack_bytes)}

    def fake_put(self, url: str, data: bytes, **kw: int | bool) -> tuple[int, bytes]:
        """Record a PUT and answer 200 with an empty body."""
        self.puts.append((url, data))
        self.events.append((url, "put"))
        return 200, b""

    def push_mpack_put(self, upload_url: str, mpack_bytes: bytes, mpack_key: str = "") -> None:
        """Record the upload call."""
        self.put_calls.append((upload_url, mpack_bytes, mpack_key))
        self.events.append("put")

    def push_mpack_unpack(self, url: str, signing: "SigningIdentity | None",
                          mpack_key: str, **kwargs: "str | int | bool") -> "dict[str, str | int]":
        """Record the unpack call and echo ``branch`` / ``commits_count`` back."""
        self.unpack_calls.append((url, signing, mpack_key, kwargs))  # type: ignore[arg-type]
        self.events.append("unpack")
        return {
            "job_id": "job-e2e",
            "head": self._result_head,
            "branch": str(kwargs.get("branch", "main")),
            "blobs_in_mpack": 0,
            "commits_in_mpack": int(kwargs.get("commits_count", 0)),
        }


def _run_push_cli(
    monkeypatch: pytest.MonkeyPatch,
    transport: "_CapturingTransport",
    client: "_CapturingTransport | None" = None,
    args: list[str] | None = None,
) -> "object":
    """Invoke the push command with the transport and urllib layer faked.

    Args:
        monkeypatch: Accepted for call-site compatibility; not used here.
        transport: Object returned by the patched ``make_transport``.
        client: Optional object whose ``fake_post`` / ``fake_put`` back the
            patched ``_urllib_post`` / ``_urllib_put``.  When omitted, both
            answer ``(200, b"")``.
        args: CLI arguments; defaults to ``["push", "origin"]``.

    Returns:
        Whatever ``runner.invoke`` returns (callers read ``exit_code`` and
        ``output`` from it).
    """
    if client:
        post_hook, put_hook = client.fake_post, client.fake_put
    else:
        post_hook = put_hook = lambda *a, **kw: (200, b"")
    with (
        patch("muse.cli.commands.push.make_transport", return_value=transport),
        patch("muse.cli.commands.push._urllib_post", side_effect=post_hook),
        patch("muse.cli.commands.push._urllib_put", side_effect=put_hook),
    ):
        return runner.invoke(cli, args or ["push", "origin"], catch_exceptions=False)


def _unpack_kwargs(transport: "_CapturingTransport") -> _JsonDict:
    """Return the kwargs dict from the first push_mpack_unpack call."""
    assert transport.unpack_calls, "push_mpack_unpack was not called"
    return dict(transport.unpack_calls[0][3])


# ===========================================================================
# E1 — presign → PUT → unpack sequence
# ===========================================================================


def test_e1_three_step_sequence_in_order(
    tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """A push must call presign, then PUT, then unpack, in that order."""
    root = _bare_repo(tmp_path, monkeypatch)
    commit = _make_commit(root, "e1")
    (heads_dir(root) / "main").write_text(commit.commit_id)

    transport = _CapturingTransport(result_head=commit.commit_id)
    result = _run_push_cli(monkeypatch, transport)
    assert result.exit_code == 0, result.output

    assert transport.presign_calls, "push_mpack_presign not called"
    assert transport.put_calls, "push_mpack_put not called"
    assert transport.unpack_calls, "push_mpack_unpack not called"

    presign_idx = transport.events.index("presign")
    put_idx = transport.events.index("put")
    unpack_idx = transport.events.index("unpack")
    assert presign_idx < put_idx < unpack_idx, (
        f"Sequence wrong: presign@{presign_idx} put@{put_idx} unpack@{unpack_idx}"
    )


# ===========================================================================
# E2 — mpack_key in presign body matches sha256 of wire bytes
# ===========================================================================


def test_e2_presign_key_matches_put_bytes(
    tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """The mpack_key in the presign request must equal blob_id(wire_bytes PUT to MinIO)."""
    root = _bare_repo(tmp_path, monkeypatch)
    commit = _make_commit(root, "e2", content=b"key integrity test")
    (heads_dir(root) / "main").write_text(commit.commit_id)

    transport = _CapturingTransport(result_head=commit.commit_id)
    result = _run_push_cli(monkeypatch, transport)
    assert result.exit_code == 0, result.output
    # Fail with a readable message rather than an IndexError below.
    assert transport.presign_calls, "push_mpack_presign not called"
    assert transport.put_calls, "push_mpack_put not called"

    presign_mpack_bytes = transport.presign_calls[0][2]
    put_mpack_bytes = transport.put_calls[0][1]
    assert presign_mpack_bytes == put_mpack_bytes, (
        "mpack bytes sent to presign and put must be identical"
    )
    # NOTE(review): implied by the byte equality above; kept as an explicit
    # statement of the key-equality contract.
    assert blob_id(put_mpack_bytes) == blob_id(presign_mpack_bytes)


# ===========================================================================
# V1-port — second push sends commits_count=1 (have filter
# works)
# ===========================================================================


def test_v1_second_push_commits_count_is_one(
    tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Second push: remote has c1, local has c2 → unpack body has commits_count=1."""
    root = _bare_repo(tmp_path, monkeypatch)
    c1 = _make_commit(root, "v1-first", content=b"v1")
    c2 = _make_commit(root, "v1-second", parent_id=c1.commit_id, content=b"v2")
    (heads_dir(root) / "main").write_text(c2.commit_id)

    # Remote already has c1 (tracking ref + live remote info).  The tracking
    # ref has to be written explicitly; without it this scenario would be
    # indistinguishable from the "live refs only" V3 case.
    origin_dir = remotes_dir(root) / "origin"
    origin_dir.mkdir(parents=True, exist_ok=True)
    (origin_dir / "main").write_text(c1.commit_id)

    transport = _CapturingTransport(remote_heads={"main": c1.commit_id}, result_head=c2.commit_id)
    result = _run_push_cli(monkeypatch, transport)
    assert result.exit_code == 0, result.output

    kwargs = _unpack_kwargs(transport)
    assert kwargs["commits_count"] == 1, (
        f"Second push must send commits_count=1, got {kwargs['commits_count']}. "
        "Bug: have filter dropped the ancestor anchor so BFS walked all commits."
    )


# ===========================================================================
# V2-port — first push (empty remote) sends all commits
# ===========================================================================


def test_v2_first_push_sends_all_commits(
    tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """When the remote is empty, unpack body has commits_count equal to full history."""
    root = _bare_repo(tmp_path, monkeypatch)
    c1 = _make_commit(root, "v2-a", content=b"a")
    c2 = _make_commit(root, "v2-b", parent_id=c1.commit_id, content=b"b")
    (heads_dir(root) / "main").write_text(c2.commit_id)

    # Empty remote: no branch heads to anchor the walk.
    transport = _CapturingTransport(remote_heads={}, result_head=c2.commit_id)
    result = _run_push_cli(monkeypatch, transport)
    assert result.exit_code == 0, result.output

    kwargs = _unpack_kwargs(transport)
    assert kwargs["commits_count"] == 2, (
        f"Full history push must send commits_count=2, got {kwargs['commits_count']}"
    )


# ===========================================================================
# V3-port — live
# GET /refs anchors BFS even without tracking ref
# ===========================================================================


def test_v3_live_refs_anchors_bfs(
    tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Even without a local tracking ref, GET /refs result anchors the BFS."""
    root = _bare_repo(tmp_path, monkeypatch)
    c1 = _make_commit(root, "v3-base", content=b"base")
    c2 = _make_commit(root, "v3-new", parent_id=c1.commit_id, content=b"new")
    (heads_dir(root) / "main").write_text(c2.commit_id)

    # No tracking ref written — only live remote info has c1
    assert not (remotes_dir(root) / "origin" / "main").exists(), (
        "Precondition: this scenario requires the tracking ref to be absent"
    )
    transport = _CapturingTransport(remote_heads={"main": c1.commit_id}, result_head=c2.commit_id)
    result = _run_push_cli(monkeypatch, transport)
    assert result.exit_code == 0, result.output

    kwargs = _unpack_kwargs(transport)
    assert kwargs["commits_count"] == 1, (
        f"Live /refs anchor must limit BFS to 1 new commit, got {kwargs['commits_count']}"
    )


# ===========================================================================
# VII1-port — two sequential pushes; second has commits_count=1
# ===========================================================================


def test_vii1_push_twice_second_sends_one_commit(
    tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Full two-push sequence: first sends N commits; second sends exactly 1."""
    root = _bare_repo(tmp_path, monkeypatch)

    # Build a linear chain of n commits.
    n = 5
    ids: list[str] = []
    parent: str | None = None
    for i in range(n):
        c = _make_commit(root, f"chain-{i}", parent_id=parent, content=f"v{i}".encode())
        ids.append(c.commit_id)
        parent = c.commit_id
    (heads_dir(root) / "main").write_text(ids[-1])

    # First push — remote is empty
    t1 = _CapturingTransport(remote_heads={}, result_head=ids[-1])
    r1 = _run_push_cli(monkeypatch, t1)
    assert r1.exit_code == 0, r1.output
    kwargs1 = _unpack_kwargs(t1)
    assert kwargs1["commits_count"] == n, (
        f"First push must send all {n} commits, got {kwargs1['commits_count']}"
    )

    # Simulate tracking ref updated by set_remote_head
    origin_dir = remotes_dir(root) / "origin"
    origin_dir.mkdir(parents=True, exist_ok=True)
    (origin_dir / "main").write_text(ids[-1])

    # Add one new commit
    c_new = _make_commit(root, "one-more", parent_id=ids[-1], content=b"new")
    (heads_dir(root) / "main").write_text(c_new.commit_id)

    # Second push — remote has ids[-1]
    t2 = _CapturingTransport(remote_heads={"main": ids[-1]}, result_head=c_new.commit_id)
    r2 = _run_push_cli(monkeypatch, t2)
    assert r2.exit_code == 0, r2.output
    kwargs2 = _unpack_kwargs(t2)
    assert kwargs2["commits_count"] == 1, (
        f"Second push must send commits_count=1, got {kwargs2['commits_count']}. "
        "Regression: have filter dropped the ancestor anchor."
    )


# ===========================================================================
# E6 — non-merge commit: branch_have is single remote head
# ===========================================================================


def test_e6_non_merge_branch_have_is_single_head(
    tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """A non-merge commit: branch_have contains only the direct remote branch head."""
    root = _bare_repo(tmp_path, monkeypatch)
    c1 = _make_commit(root, "e6-base", content=b"base")
    c2 = _make_commit(root, "e6-tip", parent_id=c1.commit_id, content=b"tip")
    (heads_dir(root) / "main").write_text(c2.commit_id)

    # Remote has both main and dev; local tip is a simple (non-merge) commit
    transport = _CapturingTransport(
        remote_heads={"main": c1.commit_id, "dev": c1.commit_id},
        result_head=c2.commit_id,
    )

    walk_calls: list[_WalkCall] = []

    from muse.cli.commands import push as _push_mod
    original_walk = _push_mod.walk_commits

    def _capture_walk(
        root_arg: pathlib.Path,
        tips: list[str],
        *,
        have: list[str] | None,
    ) -> "_WalkResult":
        # Copy the arguments so later mutation by the caller cannot change
        # what was recorded.
        walk_calls.append({
            "tips": list(tips),
            "have": list(have) if have is not None else None,
        })
        return original_walk(root_arg, tips, have=have)

    with patch.object(_push_mod, "walk_commits", side_effect=_capture_walk):
        result = _run_push_cli(monkeypatch, transport)
    assert result.exit_code == 0, result.output

    # The BFS walk that counts commits uses _branch_have (from branch_have param in _push_mpack).
    # For a non-merge commit: branch_have = [remote_head of the target branch only]
    branch_walks = [w for w in walk_calls if w["tips"] == [c2.commit_id]]
    assert branch_walks, f"Expected a walk with tip={c2.commit_id[:16]}"

    # The have list for the main BFS should contain c1 (remote main head).
    # NOTE(review): "dev" points at the same commit as "main" in this fixture,
    # so the absence of the dev head (no P2 optimisation for non-merge
    # commits) cannot be asserted here; a distinct dev head would be needed.
    main_have = branch_walks[0]["have"]
    assert main_have is not None, "Walk for the local tip was called without a have list"
    assert c1.commit_id in main_have, "Remote main head must be in have"


# ===========================================================================
# E7 — merge commit: branch_have includes ALL remote branch heads (P2 opt)
# ===========================================================================


def test_e7_merge_commit_branch_have_includes_all_remote_heads(
    tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Gap 6: when the local tip is a merge commit, branch_have must include all
    remote branch heads so the BFS stops at any already-remote ancestor on P2's chain."""
    root = _bare_repo(tmp_path, monkeypatch)

    # Build: main-base → feature-tip; merge commit has both as parents
    base = _make_commit(root, "e7-base", content=b"base")
    feature = _make_commit(root, "e7-feature", parent_id=base.commit_id, content=b"feature")
    merge = _make_commit(
        root, "e7-merge",
        parent_id=base.commit_id,
        parent2_id=feature.commit_id,
        content=b"merge",
    )
    (heads_dir(root) / "main").write_text(merge.commit_id)

    remote_feature_head = feature.commit_id
    remote_main_head = base.commit_id

    transport = _CapturingTransport(
        remote_heads={"main": remote_main_head, "feature": remote_feature_head},
        result_head=merge.commit_id,
    )

    walk_calls: list[_WalkCall] = []

    from muse.cli.commands import push as _push_mod
    original_walk = _push_mod.walk_commits

    def _capture_walk(
        root_arg: pathlib.Path,
        tips: list[str],
        *,
        have: list[str] | None,
    ) -> "_WalkResult":
        walk_calls.append({"tips": list(tips), "have": list(have) if have else have})
        return original_walk(root_arg, tips, have=have)

    with patch.object(_push_mod, "walk_commits", side_effect=_capture_walk):
        result = _run_push_cli(monkeypatch, transport)
    assert result.exit_code == 0, result.output

    # The first walk in _push_mpack uses branch_have = [remote_main_head] + all other heads
    # for merge commits (to stop BFS on P2's chain)
    merge_tip_walks = [w for w in walk_calls if merge.commit_id in w["tips"]]
    assert merge_tip_walks, "Expected a walk with merge commit tip"

    main_walk_have = merge_tip_walks[0]["have"]
    # Guard so a missing have list fails as an assertion, not a TypeError.
    assert main_walk_have is not None, "Walk for the merge tip was called without a have list"
    assert remote_main_head in main_walk_have, (
        "branch_have must include remote main head for merge commit"
    )
    assert remote_feature_head in main_walk_have, (
        "branch_have must include feature branch head (P2 optimisation) for merge commit. "
        "Fix: add all remote_branch_heads.values() to branch_have when local tip is a merge."
    )