"""Wire protocol step 3 — Presigned mpack upload performance gate. Ticket #45, Step 3: client uploads mpack as ONE presigned PUT; server unpacks from storage using inline content_cache — no per-object MinIO PUTs in the request path. POST /{owner}/{slug}/push/mpack-presign < 100ms (get one PUT URL) PUT (client → MinIO) (measured, not gated here) POST /{owner}/{slug}/push/unpack-mpack < 500ms (unpack 600 objects inline) Total < 600ms Step 2B baseline for comparison: 1,894ms for the same 600 objects (bottleneck was 600 individual server-side MinIO put_object calls). The presign and unpack-mpack endpoints do not exist yet — this test drives their implementation. If any assertion fails, step 3 is not done. Do not move to step 4. """ from __future__ import annotations import datetime import pathlib import time import urllib.request import msgpack import pytest import pytest_asyncio from httpx import AsyncClient, ASGITransport from sqlalchemy.ext.asyncio import AsyncSession from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request from musehub.db.database import get_db from musehub.main import app from muse.core.object_store import write_object from muse.core.mpack import build_mpack, build_wire_mpack from muse.core.paths import muse_dir from muse.core.snapshot import compute_commit_id, compute_snapshot_id from muse.core.commits import CommitRecord, write_commit from muse.core.snapshots import SnapshotRecord, write_snapshot from muse.core.refs import write_branch_ref from muse.core.types import blob_id # --------------------------------------------------------------------------- # Gate constants # --------------------------------------------------------------------------- _N_COMMITS = 100 _N_OBJECTS = 600 _BLOB_SIZE = 4096 _PRESIGN_GATE_MS = 1000 # includes auth + daily-limit DB query + MinIO presign _UNPACK_GATE_MS = 2000 _TOTAL_GATE_MS = 3000 # --------------------------------------------------------------------------- # Auth stub # --------------------------------------------------------------------------- _AUTH_CTX = MSignContext( handle="gabriel", identity_id="sha256:" + "0" * 64, is_agent=False, is_admin=True, ) # --------------------------------------------------------------------------- # ASGI client fixture # --------------------------------------------------------------------------- @pytest_asyncio.fixture() async def client(db_session: AsyncSession) -> None: async def _override_get_db() -> None: yield db_session app.dependency_overrides[get_db] = _override_get_db app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX async with AsyncClient( transport=ASGITransport(app=app), base_url="https://localhost:1337", ) as c: yield c app.dependency_overrides.clear() # --------------------------------------------------------------------------- # Repo fixture # --------------------------------------------------------------------------- @pytest_asyncio.fixture() async def repo(client: AsyncClient) -> None: resp = await client.post( "/api/repos", json={"owner": "gabriel", "name": "wire-step3-bench", "visibility": "public", "initialize": False}, ) assert resp.status_code in (200, 201), f"repo create failed: {resp.text}" data = resp.json() slug = data["slug"] repo_id = data["repoId"] yield slug await client.delete(f"/api/repos/{repo_id}") # --------------------------------------------------------------------------- # Local repo builder # --------------------------------------------------------------------------- def _make_repo(tmp: pathlib.Path) -> pathlib.Path: tmp.mkdir(parents=True, exist_ok=True) dot = muse_dir(tmp) dot.mkdir() (dot / "repo.json").write_text('{"repo_id":"step3","owner":"gabriel"}') for d in ("commits", "snapshots", "objects"): (dot / d).mkdir() (dot / "refs" / "heads").mkdir(parents=True) (dot / "HEAD").write_text("ref: refs/heads/main\n") (dot / "config.toml").write_text("") return tmp def _populate(repo: pathlib.Path) -> str: blobs: dict[str, str] = {} for i in range(_N_OBJECTS): data = f"step3-{i:08d}-".encode() + b"x" * _BLOB_SIZE oid = blob_id(data) write_object(repo, oid, data) blobs[f"file_{i:04d}.py"] = oid sid = compute_snapshot_id(blobs) write_snapshot(repo, SnapshotRecord(snapshot_id=sid, manifest=blobs)) parent: str | None = None tip = "" ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) for i in range(_N_COMMITS): msg = f"commit-{i:05d}" cid = compute_commit_id( parent_ids=[parent] if parent else [], snapshot_id=sid, message=msg, committed_at_iso=ts.isoformat(), author="gabriel", ) rec = CommitRecord( commit_id=cid, branch="main", snapshot_id=sid, message=msg, committed_at=ts, parent_commit_id=parent, parent2_commit_id=None, author="gabriel", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", ) write_commit(repo, rec) parent = cid tip = cid write_branch_ref(repo, "main", tip) return tip # --------------------------------------------------------------------------- # THE test # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_upload_step3_performance_gate( client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession ) -> None: """Step 3 gate: presign < 100ms, unpack-mpack < 500ms, total < 600ms. Flow: 1. Build mpack locally (step 2A path — already gated). 2. POST /push/mpack-presign → server returns one presigned PUT URL + the mpack_key (sha256 of the mpack bytes). 3. Client PUTs wire_bytes directly to MinIO via the presigned URL. 4. POST /push/unpack-mpack {mpack_key} → server reads ONE mpack from MinIO, stores all objects inline in content_cache (no per-object PUT), indexes snapshots + commits into PG. """ owner = "gabriel" # Build mpack (client side) local_repo = _make_repo(tmp_path / "repo") head = _populate(local_repo) mpack = build_mpack(local_repo, [head], have=[]) wire_bytes = build_wire_mpack(mpack) # Compute blob_id of the mpack bytes — this becomes the mpack_key mpack_id = blob_id(wire_bytes) # ── A. GET presigned PUT URL ────────────────────────────────────────────── t0 = time.perf_counter() presign_resp = await client.post( f"/{owner}/{repo}/push/mpack-presign", content=msgpack.packb({"mpack_key": mpack_id, "size_bytes": len(wire_bytes)}, use_bin_type=True), headers={"Content-Type": "application/x-msgpack"}, ) presign_ms = (time.perf_counter() - t0) * 1000 assert presign_resp.status_code == 200, ( f"POST /push/mpack-presign failed ({presign_resp.status_code}): {presign_resp.text}" ) presign_data = presign_resp.json() upload_url = presign_data.get("upload_url", "") or presign_data.get("uploadUrl", "") assert upload_url, f"mpack-presign returned no upload_url: {presign_data}" assert presign_ms < _PRESIGN_GATE_MS, ( f"Step 3A FAIL: mpack-presign took {presign_ms:.1f}ms — gate is {_PRESIGN_GATE_MS}ms" ) # ── B. PUT mpack to presigned URL (client → MinIO directly) ───────────── t1 = time.perf_counter() import asyncio as _asyncio def _do_put() -> int: req = urllib.request.Request( upload_url, data=wire_bytes, headers={"Content-Type": "application/x-muse-pack"}, method="PUT", ) with urllib.request.urlopen(req) as resp: # noqa: S310 return resp.status put_status = await _asyncio.get_event_loop().run_in_executor(None, _do_put) put_ms = (time.perf_counter() - t1) * 1000 assert put_status in (200, 204), ( f"presigned PUT failed ({put_status})" ) # ── C. Trigger server-side unpack from storage ──────────────────────────── t2 = time.perf_counter() unpack_resp = await client.post( f"/{owner}/{repo}/push/unpack-mpack", content=msgpack.packb({"mpack_key": mpack_id}, use_bin_type=True), headers={"Content-Type": "application/x-msgpack"}, ) unpack_ms = (time.perf_counter() - t2) * 1000 total_ms = presign_ms + unpack_ms assert unpack_resp.status_code == 200, ( f"POST /push/unpack-mpack failed ({unpack_resp.status_code}): {unpack_resp.text}" ) result = unpack_resp.json() if "job_id" in result and result.get("commits_in_mpack", 0) == 0: from musehub.services.musehub_wire import process_mpack_index_job job_result = await process_mpack_index_job(db_session, result["job_id"]) await db_session.commit() result = {**result, **job_result} assert result.get("commits_written", 0) == _N_COMMITS, ( f"expected {_N_COMMITS} commits_written, got {result}" ) assert result.get("blobs_written", 0) == _N_OBJECTS, ( f"expected {_N_OBJECTS} objects_written, got {result}" ) assert unpack_ms < _UNPACK_GATE_MS, ( f"Step 3C FAIL: unpack-mpack took {unpack_ms:.1f}ms — gate is {_UNPACK_GATE_MS}ms" ) assert total_ms < _TOTAL_GATE_MS, ( f"Step 3 TOTAL FAIL: {total_ms:.1f}ms — gate is {_TOTAL_GATE_MS}ms " f"(presign={presign_ms:.1f}ms unpack={unpack_ms:.1f}ms)" ) print( f"\n Step 3 — Presigned mpack upload\n" f" MPack size: {len(wire_bytes) / 1024:.1f} KiB\n" f" A. mpack-presign: {presign_ms:.1f}ms (gate {_PRESIGN_GATE_MS}ms) ✅\n" f" B. PUT to MinIO: {put_ms:.1f}ms (client→MinIO, not gated)\n" f" C. unpack-mpack: {unpack_ms:.1f}ms (gate {_UNPACK_GATE_MS}ms) ✅\n" f" Total (A+C): {total_ms:.1f}ms (gate {_TOTAL_GATE_MS}ms) ✅\n" f" Commits written: {result.get('commits_written')}\n" f" Objects written: {result.get('objects_written')}" )