"""TDD — mpack push path end-to-end tests. Gap 1: _push_mpack() is the production push path but was never exercised in any test. These tests drive the full mpack protocol (presign → PUT → unpack-mpack) with a fake urllib layer. Gap 6: The merge-commit P2 ancestor optimisation: when the local tip is a merge commit, _push_mpack adds ALL remote branch heads to branch_have so the BFS stops at any already-remote ancestor on the P2 chain rather than walking back to the repo root. Ported from deleted transport tests: V1 Second push sends commits_count=1 (have filter works) V2 First push (empty remote) sends all commits V3 Live GET /refs anchors the BFS even without a tracking ref VII1 Two-push sequence: commits_count in second push equals 1 New tests: E1 presign → PUT → unpack-mpack sequence is called in order E2 mpack_key in presign body == sha256 of the actual wire bytes E6 Non-merge commit: branch_have contains only the direct remote head E7 Merge commit: branch_have includes all remote branch heads (P2 opt) """ from __future__ import annotations import datetime import json import pathlib from collections.abc import Mapping from typing import TYPE_CHECKING from unittest.mock import MagicMock, patch import msgpack import pytest from muse._version import __version__ from muse.core.mpack import PushResult, RemoteInfo from muse.core.object_store import write_object from muse.core.paths import heads_dir, muse_dir, remotes_dir if TYPE_CHECKING: from muse.core.mpack import _WalkResult _Headers = dict[str, str] # HTTP header map _JsonDict = dict[str, str | int | float | bool | None | list[str]] # JSON object _RemoteHeads = dict[str, str] # branch → commit_id _WalkCall = dict[str, list[str] | None] # {"tips": [...], "have": [...] | None} from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, blob_id from tests.cli_test_helper import CliRunner cli = None runner = CliRunner() _UPLOAD_URL = "https://minio.example.com/mpacks/put?sig=x" # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _bare_repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: muse = muse_dir(tmp_path) for d in ("commits", "snapshots", "objects", "refs/heads", "remotes"): (muse / d).mkdir(parents=True, exist_ok=True) (muse / "HEAD").write_text("ref: refs/heads/main\n") (muse / "repo.json").write_text( json.dumps({"repo_id": "test-repo", "schema_version": __version__, "domain": "code"}) ) (muse / "config.toml").write_text('[remotes.origin]\nurl = "https://hub.example.com/r"\n') monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) monkeypatch.chdir(tmp_path) return tmp_path def _make_commit( root: pathlib.Path, label: str, parent_id: str | None = None, parent2_id: str | None = None, content: bytes | None = None, ) -> CommitRecord: raw = content if content is not None else f"content-{label}".encode() oid = blob_id(raw) write_object(root, oid, raw) manifest: Manifest = {"file.txt": oid} snap_id = compute_snapshot_id(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) parent_ids = [p for p in (parent_id, parent2_id) if p] cid = compute_commit_id( parent_ids=parent_ids, snapshot_id=snap_id, message=f"commit {label}", committed_at_iso=committed_at.isoformat(), ) commit = CommitRecord( commit_id=cid, branch="main", snapshot_id=snap_id, message=f"commit {label}", committed_at=committed_at, parent_commit_id=parent_id, parent2_commit_id=parent2_id, ) write_commit(root, commit) return commit class _CapturingClient: """Fake urllib layer that records all requests and returns success responses.""" def __init__(self, result_head: str = "") -> None: self.posts: list[tuple[str, bytes]] = [] self.puts: list[tuple[str, bytes]] = [] # events records (url, kind) in true call order for sequence assertions self.events: list[tuple[str, str]] = [] self._result_head = result_head def fake_post(self, url: str, data: bytes, headers: Mapping[str, str], *, verify: bool = True) -> tuple[int, bytes]: self.posts.append((url, data)) self.events.append((url, "post")) if "mpack-presign" in url: return 200, json.dumps({"upload_url": _UPLOAD_URL}).encode() return 200, json.dumps( {"job_id": "job-e2e", "head": self._result_head, "branch": "main", "blobs_in_mpack": 0, "commits_in_mpack": 0} ).encode() def fake_put(self, url: str, data: bytes, **kw: int | bool) -> tuple[int, bytes]: self.puts.append((url, data)) self.events.append((url, "put")) return 200, b"" def _push_transport(remote_heads: _RemoteHeads | None = None) -> MagicMock: transport = MagicMock() transport.fetch_remote_info.return_value = RemoteInfo( domain="code", default_branch="main", branch_heads=remote_heads or {}, ) mock_req = MagicMock() mock_req.headers = {"Authorization": "MSign stub", "Content-Type": "application/x-msgpack"} transport._build_request.return_value = mock_req return transport def _run_push_cli( monkeypatch: pytest.MonkeyPatch, transport: MagicMock, client: "_CapturingClient", args: list[str] | None = None, ) -> "object": with ( patch("muse.cli.commands.push.make_transport", return_value=transport), patch("muse.cli.commands.push._urllib_post", side_effect=client.fake_post), patch("muse.cli.commands.push._urllib_put", side_effect=client.fake_put), ): return runner.invoke(cli, args or ["push", "origin"], catch_exceptions=False) def _unpack_body_of(posts: list[tuple[str, bytes]], path_fragment: str) -> _JsonDict: import json matching = [body for url, body in posts if path_fragment in url] assert matching, f"No POST to URL containing {path_fragment!r}" return json.loads(matching[0]) # =========================================================================== # E1 — presign → PUT → unpack sequence # =========================================================================== def test_e1_three_step_sequence_in_order( tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """_run_mpack_path must call presign, then PUT, then unpack in that order.""" root = _bare_repo(tmp_path, monkeypatch) commit = _make_commit(root, "e1") (heads_dir(root) / "main").write_text(commit.commit_id) transport = _push_transport() client = _CapturingClient(result_head=commit.commit_id) result = _run_push_cli(monkeypatch, transport, client) assert result.exit_code == 0, result.output # Verify all three steps happened presign_posts = [url for url, _ in client.posts if "mpack-presign" in url] unpack_posts = [url for url, _ in client.posts if "unpack-mpack" in url] puts = [url for url, _ in client.puts] assert presign_posts, "POST to /push/mpack-presign missing" assert puts, "PUT to MinIO missing" assert unpack_posts, "POST to /push/unpack-mpack missing" # Order: presign first, then PUT, then unpack (use events for true call order) presign_idx = next(i for i, (u, _) in enumerate(client.events) if "mpack-presign" in u) put_idx = next(i for i, (_, t) in enumerate(client.events) if t == "put") unpack_idx = next(i for i, (u, _) in enumerate(client.events) if "unpack-mpack" in u) assert presign_idx < put_idx < unpack_idx, ( f"Sequence wrong: presign@{presign_idx} put@{put_idx} unpack@{unpack_idx}" ) # =========================================================================== # E2 — mpack_key in presign body matches sha256 of wire bytes # =========================================================================== def test_e2_presign_key_matches_put_bytes( tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """The mpack_key in the presign request must equal blob_id(wire_bytes PUT to MinIO).""" from muse.core.types import blob_id root = _bare_repo(tmp_path, monkeypatch) commit = _make_commit(root, "e2", content=b"key integrity test") (heads_dir(root) / "main").write_text(commit.commit_id) transport = _push_transport() client = _CapturingClient(result_head=commit.commit_id) result = _run_push_cli(monkeypatch, transport, client) assert result.exit_code == 0 presign_body = _unpack_body_of(client.posts, "mpack-presign") put_bytes = client.puts[0][1] expected_key = blob_id(put_bytes) assert presign_body["mpack_key"] == expected_key, ( f"presign mpack_key={presign_body['mpack_key'][:30]} " f"does not match blob_id of PUT bytes={expected_key[:30]}" ) # =========================================================================== # V1-port — second push sends commits_count=1 (have filter works) # =========================================================================== def test_v1_second_push_commits_count_is_one( tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """Second push: remote has c1, local has c2 → unpack body has commits_count=1.""" root = _bare_repo(tmp_path, monkeypatch) c1 = _make_commit(root, "v1-first", content=b"v1") c2 = _make_commit(root, "v1-second", parent_id=c1.commit_id, content=b"v2") (heads_dir(root) / "main").write_text(c2.commit_id) # Remote already has c1 (tracking ref + live remote info) transport = _push_transport(remote_heads={"main": c1.commit_id}) client = _CapturingClient(result_head=c2.commit_id) result = _run_push_cli(monkeypatch, transport, client) assert result.exit_code == 0, result.output unpack_body = _unpack_body_of(client.posts, "unpack-mpack") assert unpack_body["commits_count"] == 1, ( f"Second push must send commits_count=1, got {unpack_body['commits_count']}. " "Bug: have filter dropped the ancestor anchor so BFS walked all commits." ) # =========================================================================== # V2-port — first push (empty remote) sends all commits # =========================================================================== def test_v2_first_push_sends_all_commits( tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """When the remote is empty, unpack body has commits_count equal to full history.""" root = _bare_repo(tmp_path, monkeypatch) c1 = _make_commit(root, "v2-a", content=b"a") c2 = _make_commit(root, "v2-b", parent_id=c1.commit_id, content=b"b") (heads_dir(root) / "main").write_text(c2.commit_id) transport = _push_transport(remote_heads={}) # remote has nothing client = _CapturingClient(result_head=c2.commit_id) result = _run_push_cli(monkeypatch, transport, client) assert result.exit_code == 0, result.output unpack_body = _unpack_body_of(client.posts, "unpack-mpack") assert unpack_body["commits_count"] == 2, ( f"Full history push must send commits_count=2, got {unpack_body['commits_count']}" ) # =========================================================================== # V3-port — live GET /refs anchors BFS even without tracking ref # =========================================================================== def test_v3_live_refs_anchors_bfs( tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """Even without a local tracking ref, GET /refs result anchors the BFS.""" root = _bare_repo(tmp_path, monkeypatch) c1 = _make_commit(root, "v3-base", content=b"base") c2 = _make_commit(root, "v3-new", parent_id=c1.commit_id, content=b"new") (heads_dir(root) / "main").write_text(c2.commit_id) # No tracking ref written — only live remote info has c1 transport = _push_transport(remote_heads={"main": c1.commit_id}) client = _CapturingClient(result_head=c2.commit_id) result = _run_push_cli(monkeypatch, transport, client) assert result.exit_code == 0, result.output unpack_body = _unpack_body_of(client.posts, "unpack-mpack") assert unpack_body["commits_count"] == 1, ( f"Live /refs anchor must limit BFS to 1 new commit, got {unpack_body['commits_count']}" ) # =========================================================================== # VII1-port — two sequential pushes; second has commits_count=1 # =========================================================================== def test_vii1_push_twice_second_sends_one_commit( tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """Full two-push sequence: first sends N commits; second sends exactly 1.""" root = _bare_repo(tmp_path, monkeypatch) n = 5 ids: list[str] = [] parent: str | None = None for i in range(n): c = _make_commit(root, f"chain-{i}", parent_id=parent, content=f"v{i}".encode()) ids.append(c.commit_id) parent = c.commit_id (heads_dir(root) / "main").write_text(ids[-1]) # First push — remote is empty t1 = _push_transport(remote_heads={}) c1_client = _CapturingClient(result_head=ids[-1]) r1 = _run_push_cli(monkeypatch, t1, c1_client) assert r1.exit_code == 0, r1.output unpack1 = _unpack_body_of(c1_client.posts, "unpack-mpack") assert unpack1["commits_count"] == n, ( f"First push must send all {n} commits, got {unpack1['commits_count']}" ) # Simulate tracking ref updated by set_remote_head origin_dir = remotes_dir(root) / "origin" origin_dir.mkdir(parents=True, exist_ok=True) (origin_dir / "main").write_text(ids[-1]) # Add one new commit c_new = _make_commit(root, "one-more", parent_id=ids[-1], content=b"new") (heads_dir(root) / "main").write_text(c_new.commit_id) # Second push — remote has ids[-1] t2 = _push_transport(remote_heads={"main": ids[-1]}) c2_client = _CapturingClient(result_head=c_new.commit_id) r2 = _run_push_cli(monkeypatch, t2, c2_client) assert r2.exit_code == 0, r2.output unpack2 = _unpack_body_of(c2_client.posts, "unpack-mpack") assert unpack2["commits_count"] == 1, ( f"Second push must send commits_count=1, got {unpack2['commits_count']}. " "Regression: have filter dropped the ancestor anchor." ) # =========================================================================== # E6 — non-merge commit: branch_have is single remote head # =========================================================================== def test_e6_non_merge_branch_have_is_single_head( tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """A non-merge commit: branch_have contains only the direct remote branch head.""" from unittest.mock import call root = _bare_repo(tmp_path, monkeypatch) c1 = _make_commit(root, "e6-base", content=b"base") c2 = _make_commit(root, "e6-tip", parent_id=c1.commit_id, content=b"tip") (heads_dir(root) / "main").write_text(c2.commit_id) # Remote has both main and dev; local tip is a simple (non-merge) commit transport = _push_transport(remote_heads={ "main": c1.commit_id, "dev": c1.commit_id, }) client = _CapturingClient(result_head=c2.commit_id) walk_calls: list[_WalkCall] = [] from muse.cli.commands import push as _push_mod original_walk = _push_mod.walk_commits def _capture_walk( root_arg: pathlib.Path, tips: list[str], *, have: list[str] | None, ) -> "_WalkResult": walk_calls.append({"tips": tips, "have": have}) return original_walk(root_arg, tips, have=have) with patch.object(_push_mod, "walk_commits", side_effect=_capture_walk): _run_push_cli(monkeypatch, transport, client) # The BFS walk that counts commits uses _branch_have (from branch_have param in _push_mpack). # For a non-merge commit: branch_have = [remote_head of the target branch only] branch_walks = [w for w in walk_calls if w["tips"] == [c2.commit_id]] assert branch_walks, f"Expected a walk with tip={c2.commit_id[:16]}" # The have list for the main BFS should contain c1 (remote main head) # and NOT the dev head (no P2 optimisation for non-merge commits) main_walk = branch_walks[0] assert c1.commit_id in main_walk["have"], "Remote main head must be in have" # =========================================================================== # E7 — merge commit: branch_have includes ALL remote branch heads (P2 opt) # =========================================================================== def test_e7_merge_commit_branch_have_includes_all_remote_heads( tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """Gap 6: when the local tip is a merge commit, branch_have must include all remote branch heads so the BFS stops at any already-remote ancestor on P2's chain.""" root = _bare_repo(tmp_path, monkeypatch) # Build: main-base → feature-tip; merge commit has both as parents base = _make_commit(root, "e7-base", content=b"base") feature = _make_commit(root, "e7-feature", parent_id=base.commit_id, content=b"feature") merge = _make_commit( root, "e7-merge", parent_id=base.commit_id, parent2_id=feature.commit_id, content=b"merge", ) (heads_dir(root) / "main").write_text(merge.commit_id) remote_feature_head = feature.commit_id remote_main_head = base.commit_id transport = _push_transport(remote_heads={ "main": remote_main_head, "feature": remote_feature_head, }) client = _CapturingClient(result_head=merge.commit_id) walk_calls: list[_WalkCall] = [] from muse.cli.commands import push as _push_mod original_walk = _push_mod.walk_commits def _capture_walk( root_arg: pathlib.Path, tips: list[str], *, have: list[str] | None, ) -> "_WalkResult": walk_calls.append({"tips": list(tips), "have": list(have) if have else have}) return original_walk(root_arg, tips, have=have) with patch.object(_push_mod, "walk_commits", side_effect=_capture_walk): _run_push_cli(monkeypatch, transport, client) # The first walk in _push_mpack uses branch_have = [remote_main_head] + all other heads # for merge commits (to stop BFS on P2's chain) merge_tip_walks = [w for w in walk_calls if merge.commit_id in w["tips"]] assert merge_tip_walks, "Expected a walk with merge commit tip" main_walk_have = merge_tip_walks[0]["have"] assert remote_main_head in main_walk_have, ( "branch_have must include remote main head for merge commit" ) assert remote_feature_head in main_walk_have, ( "branch_have must include feature branch head (P2 optimisation) for merge commit. " "Fix: add all remote_branch_heads.values() to branch_have when local tip is a merge." )