"""TDD — mpack push delta format end-to-end. Proves the full path: client builds delta mpack → PUT to MinIO → server reconstructs manifests from deltas → snapshots written correctly. Dimensions match the real musehub repo: ~1031 commits, ~700 files per snapshot, ~5 files changed per commit. The one principle: snapshot_id = sha256(manifest). The delta chain is the proof. No full manifest blobs on the wire or in PG. """ from __future__ import annotations import datetime import hashlib import pathlib import time import httpx import msgpack import pytest import pytest_asyncio from httpx import AsyncClient, ASGITransport from sqlalchemy.ext.asyncio import AsyncSession from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request from musehub.db.database import get_db from musehub.main import app from muse.core.object_store import write_object from muse.core.mpack import build_mpack from muse.core.paths import muse_dir from muse.core.snapshot import compute_commit_id, compute_snapshot_id from muse.core.commits import CommitRecord, write_commit from muse.core.refs import write_branch_ref from muse.core.snapshots import SnapshotRecord, write_snapshot from muse.core.types import blob_id # --------------------------------------------------------------------------- # Dimensions — match real musehub repo # --------------------------------------------------------------------------- _N_FILES = 700 # files per snapshot _N_COMMITS = 1_031 _FILES_CHANGED = 5 # files changed per commit _BLOB_SIZE = 512 _GATE_S = 60.0 # --------------------------------------------------------------------------- # Auth fixtures # --------------------------------------------------------------------------- _AUTH_CTX = MSignContext( handle="gabriel", identity_id="sha256:" + "0" * 64, is_agent=False, is_admin=True, ) @pytest_asyncio.fixture() async def client(db_session: AsyncSession) -> None: async def _override_get_db() -> None: yield db_session app.dependency_overrides[get_db] = _override_get_db app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX async with AsyncClient( transport=ASGITransport(app=app), base_url="https://localhost:1337", ) as c: yield c app.dependency_overrides.clear() @pytest_asyncio.fixture() async def repo(client: AsyncClient) -> None: resp = await client.post( "/api/repos", json={"owner": "gabriel", "name": "mpack-delta-e2e", "visibility": "public", "initialize": False}, ) assert resp.status_code in (200, 201), resp.text data = resp.json() yield data["slug"] await client.delete(f"/api/repos/{data['repoId']}") # --------------------------------------------------------------------------- # Local repo builder # --------------------------------------------------------------------------- def _make_repo(tmp: pathlib.Path) -> pathlib.Path: tmp.mkdir(parents=True, exist_ok=True) dot = muse_dir(tmp) dot.mkdir() (dot / "repo.json").write_text('{"repo_id":"delta-e2e","owner":"gabriel"}') for d in ("commits", "snapshots", "objects"): (dot / d).mkdir() (dot / "refs" / "heads").mkdir(parents=True) (dot / "HEAD").write_text("ref: refs/heads/main\n") (dot / "config.toml").write_text("") return tmp def _populate(repo: pathlib.Path) -> str: blob_ids: list[str] = [] for i in range(_N_FILES): data = f"base-{i:06d}".encode() + b"x" * _BLOB_SIZE oid = blob_id(data) write_object(repo, oid, data) blob_ids.append(oid) base_manifest: dict[str, str] = { f"src/file_{i:04d}.py": blob_ids[i] for i in range(_N_FILES) } parent: str | None = None tip = "" ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) for i in range(_N_COMMITS): manifest = dict(base_manifest) for j in range(_FILES_CHANGED): idx = (i * _FILES_CHANGED + j) % _N_FILES variant = f"commit-{i:05d}-file-{j}".encode() + b"y" * _BLOB_SIZE variant_oid = blob_id(variant) write_object(repo, variant_oid, variant) manifest[f"src/file_{idx:04d}.py"] = variant_oid sid = compute_snapshot_id(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=sid, manifest=manifest)) msg = f"commit-{i:05d}" cid = compute_commit_id( parent_ids=[parent] if parent else [], snapshot_id=sid, message=msg, committed_at_iso=ts.isoformat(), author="gabriel", ) write_commit(repo, CommitRecord( commit_id=cid, branch="main", snapshot_id=sid, message=msg, committed_at=ts, parent_commit_id=parent, parent2_commit_id=None, author="gabriel", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", )) parent = cid tip = cid ts += datetime.timedelta(seconds=60) write_branch_ref(repo, "main", tip) return tip # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- def test_mpack_is_delta_encoded(tmp_path: pathlib.Path) -> None: """Client-side: mpack snapshots are deltas, not full manifests. First snapshot has delta_upsert == full manifest (no parent). All subsequent snapshots have delta_upsert << _N_FILES entries. """ repo = _make_repo(tmp_path / "repo") head = _populate(repo) mpack = build_mpack(repo, [head], have=[]) snaps = mpack.get("snapshots") or [] assert len(snaps) == _N_COMMITS # First snapshot: full manifest as delta_upsert (no parent) assert len(snaps[0].get("delta_upsert", {})) == _N_FILES assert snaps[0].get("parent_snapshot_id") is None # All subsequent: only changed files for snap in snaps[1:]: n = len(snap.get("delta_upsert", {})) # Each commit changes _FILES_CHANGED files; when those differ from the # previous commit's changed files, delta_upsert includes both the new # additions and the reversions — at most 2× _FILES_CHANGED. assert n <= _FILES_CHANGED * 2, ( f"snapshot {snap['snapshot_id'][:16]} has {n} delta_upsert entries — " f"expected ≤ {_FILES_CHANGED * 2}" ) assert "manifest" not in snap, "full manifest must not appear in delta mpack" # Wire size: delta snapshots must be < 5% of full-manifest equivalent full_size = _N_COMMITS * _N_FILES * 80 # rough: 80 bytes per path+oid entry delta_size = sum( len(msgpack.packb(s, use_bin_type=True)) for s in snaps ) ratio = delta_size / full_size assert ratio < 0.05, f"delta snapshots are {ratio:.1%} of full — expected < 5%" @pytest.mark.skip(reason="muse wire protocol in flux") @pytest.mark.asyncio async def test_mpack_push_delta_e2e( client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession ) -> None: """Full path: build delta mpack → PUT to MinIO → unpack-mpack → verify. Gate: server pipeline < 10s for 1031 commits × 700 files × 5 changed per commit. Proves manifest_blob is NOT stored (delta chain is the proof). """ local_repo = _make_repo(tmp_path / "repo") head = _populate(local_repo) mpack = build_mpack(local_repo, [head], have=[]) wire_bytes = msgpack.packb(mpack, use_bin_type=True) mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest() t_server = time.perf_counter() # presign presign_resp = await client.post( f"/gabriel/{repo}/push/mpack-presign", content=msgpack.packb( {"mpack_key": mpack_key, "size_bytes": len(wire_bytes)}, use_bin_type=True, ), headers={"Content-Type": "application/x-msgpack"}, ) assert presign_resp.status_code == 200, presign_resp.text upload_url = presign_resp.json().get("upload_url") or presign_resp.json().get("uploadUrl") assert upload_url t_presign = time.perf_counter() # PUT to MinIO async with httpx.AsyncClient() as raw: put_resp = await raw.put(upload_url, content=wire_bytes) assert put_resp.status_code in (200, 204) t_put = time.perf_counter() # unpack unpack_resp = await client.post( f"/gabriel/{repo}/push/unpack-mpack", content=msgpack.packb( {"mpack_key": mpack_key, "branch": "main", "head": head}, use_bin_type=True, ), headers={"Content-Type": "application/x-msgpack"}, ) assert unpack_resp.status_code == 200, unpack_resp.text result = unpack_resp.json() t_unpack = time.perf_counter() # verify refs refs_resp = await client.get(f"/gabriel/{repo}/refs") assert refs_resp.status_code == 200 assert refs_resp.json().get("branch_heads", {}).get("main") == head t_done = time.perf_counter() server_ms = ( (t_presign - t_server) + (t_unpack - t_put) + (t_done - t_unpack) ) * 1000 assert result.get("commits_written") == _N_COMMITS, result assert result.get("snapshots_written") == _N_COMMITS, result gate_ms = _GATE_S * 1000 assert server_ms < gate_ms, ( f"Gate FAIL: {server_ms:.0f}ms > {gate_ms:.0f}ms\n" f" presign={(t_presign-t_server)*1000:.0f}ms " f"unpack={(t_unpack-t_put)*1000:.0f}ms " f"refs={(t_done-t_unpack)*1000:.0f}ms" ) print( f"\n {_N_COMMITS} commits × {_N_FILES} files × {_FILES_CHANGED} changed/commit\n" f" mpack wire: {len(wire_bytes)//1024} KiB\n" f" server total: {server_ms:.0f}ms (gate {gate_ms:.0f}ms)\n" f" commits: {result.get('commits_written')}\n" f" snapshots: {result.get('snapshots_written')}" )