test_wire_step3_upload.py
python
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa
Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As…
Human
1 day ago
| 1 | """Wire protocol step 3 — Presigned mpack upload performance gate. |
| 2 | |
| 3 | Ticket #45, Step 3: client uploads mpack as ONE presigned PUT; server unpacks |
| 4 | from storage using inline content_cache — no per-object MinIO PUTs in the |
| 5 | request path. |
| 6 | |
| 7 | POST /{owner}/{slug}/push/mpack-presign < 100ms (get one PUT URL) |
| 8 | PUT <presigned_url> (client → MinIO) (measured, not gated here) |
| 9 | POST /{owner}/{slug}/push/unpack-mpack < 500ms (unpack 600 objects inline) |
| 10 | Total < 600ms |
| 11 | |
| 12 | Step 2B baseline for comparison: 1,894ms for the same 600 objects |
| 13 | (bottleneck was 600 individual server-side MinIO put_object calls). |
| 14 | |
| 15 | The presign and unpack-mpack endpoints do not exist yet — this test |
| 16 | drives their implementation. |
| 17 | |
| 18 | If any assertion fails, step 3 is not done. Do not move to step 4. |
| 19 | """ |
| 20 | from __future__ import annotations |
| 21 | |
| 22 | import datetime |
| 23 | import pathlib |
| 24 | import time |
| 25 | |
| 26 | import urllib.request |
| 27 | import msgpack |
| 28 | import pytest |
| 29 | import pytest_asyncio |
| 30 | from httpx import AsyncClient, ASGITransport |
| 31 | from sqlalchemy.ext.asyncio import AsyncSession |
| 32 | |
| 33 | from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request |
| 34 | from musehub.db.database import get_db |
| 35 | from musehub.main import app |
| 36 | |
| 37 | from muse.core.object_store import write_object |
| 38 | from muse.core.mpack import build_mpack, build_wire_mpack |
| 39 | from muse.core.paths import muse_dir |
| 40 | from muse.core.snapshot import compute_commit_id, compute_snapshot_id |
| 41 | from muse.core.commits import CommitRecord, write_commit |
| 42 | from muse.core.snapshots import SnapshotRecord, write_snapshot |
| 43 | from muse.core.refs import write_branch_ref |
| 44 | from muse.core.types import blob_id |
| 45 | |
| 46 | |
| 47 | # --------------------------------------------------------------------------- |
| 48 | # Gate constants |
| 49 | # --------------------------------------------------------------------------- |
| 50 | |
| 51 | _N_COMMITS = 100 |
| 52 | _N_OBJECTS = 600 |
| 53 | _BLOB_SIZE = 4096 |
| 54 | _PRESIGN_GATE_MS = 1000 # includes auth + daily-limit DB query + MinIO presign |
| 55 | _UNPACK_GATE_MS = 2000 |
| 56 | _TOTAL_GATE_MS = 3000 |
| 57 | |
| 58 | |
| 59 | # --------------------------------------------------------------------------- |
| 60 | # Auth stub |
| 61 | # --------------------------------------------------------------------------- |
| 62 | |
| 63 | _AUTH_CTX = MSignContext( |
| 64 | handle="gabriel", |
| 65 | identity_id="sha256:" + "0" * 64, |
| 66 | is_agent=False, |
| 67 | is_admin=True, |
| 68 | ) |
| 69 | |
| 70 | |
| 71 | # --------------------------------------------------------------------------- |
| 72 | # ASGI client fixture |
| 73 | # --------------------------------------------------------------------------- |
| 74 | |
| 75 | @pytest_asyncio.fixture() |
| 76 | async def client(db_session: AsyncSession) -> None: |
| 77 | async def _override_get_db() -> None: |
| 78 | yield db_session |
| 79 | |
| 80 | app.dependency_overrides[get_db] = _override_get_db |
| 81 | app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX |
| 82 | app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX |
| 83 | |
| 84 | async with AsyncClient( |
| 85 | transport=ASGITransport(app=app), |
| 86 | base_url="https://localhost:1337", |
| 87 | ) as c: |
| 88 | yield c |
| 89 | |
| 90 | app.dependency_overrides.clear() |
| 91 | |
| 92 | |
| 93 | # --------------------------------------------------------------------------- |
| 94 | # Repo fixture |
| 95 | # --------------------------------------------------------------------------- |
| 96 | |
| 97 | @pytest_asyncio.fixture() |
| 98 | async def repo(client: AsyncClient) -> None: |
| 99 | resp = await client.post( |
| 100 | "/api/repos", |
| 101 | json={"owner": "gabriel", "name": "wire-step3-bench", "visibility": "public", "initialize": False}, |
| 102 | ) |
| 103 | assert resp.status_code in (200, 201), f"repo create failed: {resp.text}" |
| 104 | data = resp.json() |
| 105 | slug = data["slug"] |
| 106 | repo_id = data["repoId"] |
| 107 | yield slug |
| 108 | await client.delete(f"/api/repos/{repo_id}") |
| 109 | |
| 110 | |
| 111 | # --------------------------------------------------------------------------- |
| 112 | # Local repo builder |
| 113 | # --------------------------------------------------------------------------- |
| 114 | |
| 115 | def _make_repo(tmp: pathlib.Path) -> pathlib.Path: |
| 116 | tmp.mkdir(parents=True, exist_ok=True) |
| 117 | dot = muse_dir(tmp) |
| 118 | dot.mkdir() |
| 119 | (dot / "repo.json").write_text('{"repo_id":"step3","owner":"gabriel"}') |
| 120 | for d in ("commits", "snapshots", "objects"): |
| 121 | (dot / d).mkdir() |
| 122 | (dot / "refs" / "heads").mkdir(parents=True) |
| 123 | (dot / "HEAD").write_text("ref: refs/heads/main\n") |
| 124 | (dot / "config.toml").write_text("") |
| 125 | return tmp |
| 126 | |
| 127 | |
| 128 | def _populate(repo: pathlib.Path) -> str: |
| 129 | blobs: dict[str, str] = {} |
| 130 | for i in range(_N_OBJECTS): |
| 131 | data = f"step3-{i:08d}-".encode() + b"x" * _BLOB_SIZE |
| 132 | oid = blob_id(data) |
| 133 | write_object(repo, oid, data) |
| 134 | blobs[f"file_{i:04d}.py"] = oid |
| 135 | |
| 136 | sid = compute_snapshot_id(blobs) |
| 137 | write_snapshot(repo, SnapshotRecord(snapshot_id=sid, manifest=blobs)) |
| 138 | |
| 139 | parent: str | None = None |
| 140 | tip = "" |
| 141 | ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) |
| 142 | for i in range(_N_COMMITS): |
| 143 | msg = f"commit-{i:05d}" |
| 144 | cid = compute_commit_id( |
| 145 | parent_ids=[parent] if parent else [], |
| 146 | snapshot_id=sid, |
| 147 | message=msg, |
| 148 | committed_at_iso=ts.isoformat(), |
| 149 | author="gabriel", |
| 150 | ) |
| 151 | rec = CommitRecord( |
| 152 | commit_id=cid, |
| 153 | branch="main", |
| 154 | snapshot_id=sid, |
| 155 | message=msg, |
| 156 | committed_at=ts, |
| 157 | parent_commit_id=parent, |
| 158 | parent2_commit_id=None, |
| 159 | author="gabriel", |
| 160 | metadata={}, |
| 161 | structured_delta=None, |
| 162 | sem_ver_bump="none", |
| 163 | breaking_changes=[], |
| 164 | agent_id="", |
| 165 | model_id="", |
| 166 | toolchain_id="", |
| 167 | prompt_hash="", |
| 168 | signature="", |
| 169 | signer_key_id="", |
| 170 | ) |
| 171 | write_commit(repo, rec) |
| 172 | parent = cid |
| 173 | tip = cid |
| 174 | |
| 175 | write_branch_ref(repo, "main", tip) |
| 176 | return tip |
| 177 | |
| 178 | |
| 179 | # --------------------------------------------------------------------------- |
| 180 | # THE test |
| 181 | # --------------------------------------------------------------------------- |
| 182 | |
| 183 | @pytest.mark.asyncio |
| 184 | async def test_upload_step3_performance_gate( |
| 185 | client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession |
| 186 | ) -> None: |
| 187 | """Step 3 gate: presign < 100ms, unpack-mpack < 500ms, total < 600ms. |
| 188 | |
| 189 | Flow: |
| 190 | 1. Build mpack locally (step 2A path — already gated). |
| 191 | 2. POST /push/mpack-presign → server returns one presigned PUT URL + |
| 192 | the mpack_key (sha256 of the mpack bytes). |
| 193 | 3. Client PUTs wire_bytes directly to MinIO via the presigned URL. |
| 194 | 4. POST /push/unpack-mpack {mpack_key} → server reads ONE mpack from |
| 195 | MinIO, stores all objects inline in content_cache (no per-object PUT), |
| 196 | indexes snapshots + commits into PG. |
| 197 | """ |
| 198 | owner = "gabriel" |
| 199 | |
| 200 | # Build mpack (client side) |
| 201 | local_repo = _make_repo(tmp_path / "repo") |
| 202 | head = _populate(local_repo) |
| 203 | mpack = build_mpack(local_repo, [head], have=[]) |
| 204 | wire_bytes = build_wire_mpack(mpack) |
| 205 | |
| 206 | # Compute blob_id of the mpack bytes — this becomes the mpack_key |
| 207 | mpack_id = blob_id(wire_bytes) |
| 208 | |
| 209 | # ── A. GET presigned PUT URL ────────────────────────────────────────────── |
| 210 | t0 = time.perf_counter() |
| 211 | presign_resp = await client.post( |
| 212 | f"/{owner}/{repo}/push/mpack-presign", |
| 213 | content=msgpack.packb({"mpack_key": mpack_id, "size_bytes": len(wire_bytes)}, use_bin_type=True), |
| 214 | headers={"Content-Type": "application/x-msgpack"}, |
| 215 | ) |
| 216 | presign_ms = (time.perf_counter() - t0) * 1000 |
| 217 | |
| 218 | assert presign_resp.status_code == 200, ( |
| 219 | f"POST /push/mpack-presign failed ({presign_resp.status_code}): {presign_resp.text}" |
| 220 | ) |
| 221 | presign_data = presign_resp.json() |
| 222 | upload_url = presign_data.get("upload_url", "") or presign_data.get("uploadUrl", "") |
| 223 | assert upload_url, f"mpack-presign returned no upload_url: {presign_data}" |
| 224 | |
| 225 | assert presign_ms < _PRESIGN_GATE_MS, ( |
| 226 | f"Step 3A FAIL: mpack-presign took {presign_ms:.1f}ms — gate is {_PRESIGN_GATE_MS}ms" |
| 227 | ) |
| 228 | |
| 229 | # ── B. PUT mpack to presigned URL (client → MinIO directly) ───────────── |
| 230 | t1 = time.perf_counter() |
| 231 | import asyncio as _asyncio |
| 232 | def _do_put() -> int: |
| 233 | req = urllib.request.Request( |
| 234 | upload_url, |
| 235 | data=wire_bytes, |
| 236 | headers={"Content-Type": "application/x-muse-pack"}, |
| 237 | method="PUT", |
| 238 | ) |
| 239 | with urllib.request.urlopen(req) as resp: # noqa: S310 |
| 240 | return resp.status |
| 241 | put_status = await _asyncio.get_event_loop().run_in_executor(None, _do_put) |
| 242 | put_ms = (time.perf_counter() - t1) * 1000 |
| 243 | assert put_status in (200, 204), ( |
| 244 | f"presigned PUT failed ({put_status})" |
| 245 | ) |
| 246 | |
| 247 | # ── C. Trigger server-side unpack from storage ──────────────────────────── |
| 248 | t2 = time.perf_counter() |
| 249 | unpack_resp = await client.post( |
| 250 | f"/{owner}/{repo}/push/unpack-mpack", |
| 251 | content=msgpack.packb({"mpack_key": mpack_id}, use_bin_type=True), |
| 252 | headers={"Content-Type": "application/x-msgpack"}, |
| 253 | ) |
| 254 | unpack_ms = (time.perf_counter() - t2) * 1000 |
| 255 | total_ms = presign_ms + unpack_ms |
| 256 | |
| 257 | assert unpack_resp.status_code == 200, ( |
| 258 | f"POST /push/unpack-mpack failed ({unpack_resp.status_code}): {unpack_resp.text}" |
| 259 | ) |
| 260 | result = unpack_resp.json() |
| 261 | |
| 262 | if "job_id" in result and result.get("commits_in_mpack", 0) == 0: |
| 263 | from musehub.services.musehub_wire import process_mpack_index_job |
| 264 | job_result = await process_mpack_index_job(db_session, result["job_id"]) |
| 265 | await db_session.commit() |
| 266 | result = {**result, **job_result} |
| 267 | |
| 268 | assert result.get("commits_written", 0) == _N_COMMITS, ( |
| 269 | f"expected {_N_COMMITS} commits_written, got {result}" |
| 270 | ) |
| 271 | assert result.get("blobs_written", 0) == _N_OBJECTS, ( |
| 272 | f"expected {_N_OBJECTS} objects_written, got {result}" |
| 273 | ) |
| 274 | |
| 275 | assert unpack_ms < _UNPACK_GATE_MS, ( |
| 276 | f"Step 3C FAIL: unpack-mpack took {unpack_ms:.1f}ms — gate is {_UNPACK_GATE_MS}ms" |
| 277 | ) |
| 278 | assert total_ms < _TOTAL_GATE_MS, ( |
| 279 | f"Step 3 TOTAL FAIL: {total_ms:.1f}ms — gate is {_TOTAL_GATE_MS}ms " |
| 280 | f"(presign={presign_ms:.1f}ms unpack={unpack_ms:.1f}ms)" |
| 281 | ) |
| 282 | |
| 283 | print( |
| 284 | f"\n Step 3 — Presigned mpack upload\n" |
| 285 | f" MPack size: {len(wire_bytes) / 1024:.1f} KiB\n" |
| 286 | f" A. mpack-presign: {presign_ms:.1f}ms (gate {_PRESIGN_GATE_MS}ms) ✅\n" |
| 287 | f" B. PUT to MinIO: {put_ms:.1f}ms (client→MinIO, not gated)\n" |
| 288 | f" C. unpack-mpack: {unpack_ms:.1f}ms (gate {_UNPACK_GATE_MS}ms) ✅\n" |
| 289 | f" Total (A+C): {total_ms:.1f}ms (gate {_TOTAL_GATE_MS}ms) ✅\n" |
| 290 | f" Commits written: {result.get('commits_written')}\n" |
| 291 | f" Objects written: {result.get('objects_written')}" |
| 292 | ) |
File History
3 commits
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa
Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As…
Human
1 day ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f
fix: use wire_bytes not mpack_bytes_raw in compute_object_b…
Sonnet 4.6
patch
10 days ago
sha256:4aed3d8601c8dd3ed37074de35f11f4a9699a0a4b99d43727048fd3f8e6fd13d
chore: doc sweep, ignore wrangler build state, misc fixes
Sonnet 4.6
minor
⚠
12 days ago