"""Wire protocol — scale gate matching real-world musehub repo dimensions. Step 4 (test_wire_step4_e2e.py) uses 100 commits sharing ONE snapshot. Real musehub push: 1028 commits × 1022 DISTINCT snapshots × 5151 objects. That's the case that hangs — this test reproduces it. Each commit gets its own snapshot (one file changed per commit), so the server must INSERT ~1000 snapshot rows, ~1000 commit rows, and ~5000 object rows. Gate: full server pipeline < 10s. """ from __future__ import annotations import datetime import hashlib import pathlib import time import asyncio as _asyncio import urllib.request import msgpack import pytest import pytest_asyncio from httpx import AsyncClient, ASGITransport from sqlalchemy.ext.asyncio import AsyncSession from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request from musehub.db.database import get_db from musehub.main import app from muse.core.object_store import write_object from muse.core.mpack import build_mpack, build_wire_mpack from muse.core.paths import muse_dir from muse.core.snapshot import compute_commit_id, compute_snapshot_id from muse.core.commits import CommitRecord, write_commit from muse.core.refs import write_branch_ref from muse.core.snapshots import SnapshotRecord, write_snapshot from muse.core.types import blob_id # --------------------------------------------------------------------------- # Scale parameters — match real musehub repo dimensions # --------------------------------------------------------------------------- _N_COMMITS = 1_000 _N_OBJECTS = 5_000 # total distinct blobs _BLOB_SIZE = 4_096 _E2E_GATE_S = 15.0 # generous gate; goal is not to hang (raised from 10s — Phase 2/3 add concurrent S3 puts for 1000 commits) # --------------------------------------------------------------------------- # Auth + fixtures # --------------------------------------------------------------------------- _AUTH_CTX = MSignContext( handle="gabriel", identity_id="sha256:" + "0" * 64, is_agent=False, is_admin=True, ) @pytest_asyncio.fixture() async def client(db_session: AsyncSession) -> None: async def _override_get_db() -> None: yield db_session app.dependency_overrides[get_db] = _override_get_db app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX async with AsyncClient( transport=ASGITransport(app=app), base_url="https://localhost:1337", ) as c: yield c app.dependency_overrides.clear() @pytest_asyncio.fixture() async def repo(client: AsyncClient) -> None: resp = await client.post( "/api/repos", json={"owner": "gabriel", "name": "wire-scale", "visibility": "public", "initialize": False}, ) assert resp.status_code in (200, 201), f"repo create failed: {resp.text}" data = resp.json() slug = data["slug"] repo_id = data["repoId"] yield slug await client.delete(f"/api/repos/{repo_id}") # --------------------------------------------------------------------------- # Local repo builder — one distinct snapshot per commit # --------------------------------------------------------------------------- def _make_repo(tmp: pathlib.Path) -> pathlib.Path: tmp.mkdir(parents=True, exist_ok=True) dot = muse_dir(tmp) dot.mkdir() (dot / "repo.json").write_text('{"repo_id":"scale","owner":"gabriel"}') for d in ("commits", "snapshots", "objects"): (dot / d).mkdir() (dot / "refs" / "heads").mkdir(parents=True) (dot / "HEAD").write_text("ref: refs/heads/main\n") (dot / "config.toml").write_text("") return tmp def _populate(repo: pathlib.Path) -> str: """Create _N_OBJECTS blobs, then _N_COMMITS commits each with a distinct snapshot. Each commit swaps out one file so every snapshot is unique — this matches the real-world case where every commit touches at least one file. """ # Create all blobs up front. blob_ids: list[str] = [] for i in range(_N_OBJECTS): data = f"scale-{i:08d}-".encode() + b"x" * _BLOB_SIZE oid = blob_id(data) write_object(repo, oid, data) blob_ids.append(oid) # Base manifest: files 0…N_OBJECTS-1 base_manifest: dict[str, str] = { f"file_{i:04d}.py": blob_ids[i] for i in range(_N_OBJECTS) } parent: str | None = None tip = "" ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) for i in range(_N_COMMITS): # Each commit replaces one file with a re-hashed variant, making the # snapshot unique (different manifest → different snapshot_id). manifest = dict(base_manifest) variant = f"commit-{i:05d}-variant".encode() + b"y" * _BLOB_SIZE variant_oid = blob_id(variant) write_object(repo, variant_oid, variant) manifest[f"file_{i % _N_OBJECTS:04d}.py"] = variant_oid sid = compute_snapshot_id(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=sid, manifest=manifest)) msg = f"commit-{i:05d}" cid = compute_commit_id( parent_ids=[parent] if parent else [], snapshot_id=sid, message=msg, committed_at_iso=ts.isoformat(), author="gabriel", ) rec = CommitRecord( commit_id=cid, branch="main", snapshot_id=sid, message=msg, committed_at=ts, parent_commit_id=parent, parent2_commit_id=None, author="gabriel", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", ) write_commit(repo, rec) parent = cid tip = cid ts += datetime.timedelta(seconds=60) write_branch_ref(repo, "main", tip) return tip # --------------------------------------------------------------------------- # THE test # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_scale_unpack_mpack( client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession ) -> None: """Scale gate: 1000 commits × 1000 distinct snapshots × 5000 objects < 10s.""" owner = "gabriel" branch = "main" local_repo = _make_repo(tmp_path / "repo") t_build = time.perf_counter() head = _populate(local_repo) mpack = build_mpack(local_repo, [head], have=[]) wire_bytes = build_wire_mpack(mpack) mpack_key = blob_id(wire_bytes) print(f"\n build+pack: {time.perf_counter()-t_build:.2f}s " f"({len(wire_bytes)//1024} KiB)", flush=True) t_server = time.perf_counter() # 1. mpack-presign presign_resp = await client.post( f"/{owner}/{repo}/push/mpack-presign", content=msgpack.packb({"mpack_key": mpack_key, "size_bytes": len(wire_bytes)}, use_bin_type=True), headers={"Content-Type": "application/x-msgpack"}, ) assert presign_resp.status_code == 200, presign_resp.text upload_url = presign_resp.json().get("upload_url") or presign_resp.json().get("uploadUrl") assert upload_url t_presign = time.perf_counter() # 2. PUT → MinIO def _do_put() -> int: req = urllib.request.Request( upload_url, data=wire_bytes, headers={"Content-Type": "application/x-muse-pack"}, method="PUT", ) with urllib.request.urlopen(req) as resp: # noqa: S310 return resp.status put_status = await _asyncio.get_event_loop().run_in_executor(None, _do_put) assert put_status in (200, 204), f"PUT failed {put_status}" t_put = time.perf_counter() # 3. unpack-mpack unpack_resp = await client.post( f"/{owner}/{repo}/push/unpack-mpack", content=msgpack.packb( {"mpack_key": mpack_key, "branch": branch, "head": head}, use_bin_type=True, ), headers={"Content-Type": "application/x-msgpack"}, ) assert unpack_resp.status_code == 200, unpack_resp.text result = unpack_resp.json() t_unpack = time.perf_counter() # 4. verify branch head refs_resp = await client.get(f"/{owner}/{repo}/refs") assert refs_resp.status_code == 200 branch_heads = refs_resp.json().get("branch_heads", {}) t_verify = time.perf_counter() server_ms = ( (t_presign - t_server) + (t_unpack - t_put) + (t_verify - t_unpack) ) * 1000 put_ms = (t_put - t_presign) * 1000 if "job_id" in result and result.get("commits_in_mpack", 0) == 0: from musehub.services.musehub_wire import process_mpack_index_job job_result = await process_mpack_index_job(db_session, result["job_id"]) await db_session.commit() result = {**result, **job_result} assert result.get("commits_written") == _N_COMMITS, result assert branch_heads.get(branch) == head, branch_heads gate_ms = _E2E_GATE_S * 1000 assert server_ms < gate_ms, ( f"Scale gate FAIL: {server_ms:.0f}ms > {gate_ms:.0f}ms\n" f" presign={( t_presign-t_server)*1000:.0f}ms " f"unpack={(t_unpack-t_put)*1000:.0f}ms " f"verify={(t_verify-t_unpack)*1000:.0f}ms" ) print( f"\n Scale gate — 1000 commits × 1000 snapshots × 5000 objects\n" f" mpack: {len(wire_bytes)//1024} KiB\n" f" presign: {(t_presign-t_server)*1000:.0f}ms\n" f" PUT → MinIO: {put_ms:.0f}ms (not counted)\n" f" unpack-mpack: {(t_unpack-t_put)*1000:.0f}ms\n" f" verify refs: {(t_verify-t_unpack)*1000:.0f}ms\n" f" server total: {server_ms:.0f}ms (gate {gate_ms:.0f}ms)\n" f" commits written: {result.get('commits_written')}\n" f" objects written: {result.get('objects_written')}" )