gabriel / musehub public
test_wire_step3_upload.py python
292 lines 10.6 KB
Raw
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
1 """Wire protocol step 3 — Presigned mpack upload performance gate.
2
3 Ticket #45, Step 3: client uploads mpack as ONE presigned PUT; server unpacks
4 from storage using inline content_cache — no per-object MinIO PUTs in the
5 request path.
6
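Design targets (the limits this test actually enforces are the looser
_PRESIGN_GATE_MS / _UNPACK_GATE_MS / _TOTAL_GATE_MS constants defined below):

    POST /{owner}/{slug}/push/mpack-presign   < 100ms  (get one PUT URL)
    PUT  <presigned_url> (client → MinIO)              (measured, not gated here)
    POST /{owner}/{slug}/push/unpack-mpack    < 500ms  (unpack 600 objects inline)
    Total (presign + unpack)                  < 600ms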
11
12 Step 2B baseline for comparison: 1,894ms for the same 600 objects
13 (bottleneck was 600 individual server-side MinIO put_object calls).
14
15 The presign and unpack-mpack endpoints do not exist yet — this test
16 drives their implementation.
17
18 If any assertion fails, step 3 is not done. Do not move to step 4.
19 """
from __future__ import annotations

import asyncio
import datetime
import pathlib
import time
import urllib.request
from collections.abc import AsyncIterator

import msgpack
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request
from musehub.db.database import get_db
from musehub.main import app

from muse.core.object_store import write_object
from muse.core.mpack import build_mpack, build_wire_mpack
from muse.core.paths import muse_dir
from muse.core.snapshot import compute_commit_id, compute_snapshot_id
from muse.core.commits import CommitRecord, write_commit
from muse.core.snapshots import SnapshotRecord, write_snapshot
from muse.core.refs import write_branch_ref
from muse.core.types import blob_id
45
46
47 # ---------------------------------------------------------------------------
48 # Gate constants
49 # ---------------------------------------------------------------------------
50
# Fixture size: number of commits in the chain, number of blobs in the shared
# snapshot, and the count of padding bytes appended to each blob's unique prefix.
_N_COMMITS = 100
_N_OBJECTS = 600
_BLOB_SIZE = 4 * 1024

# Latency ceilings, in milliseconds, that the test asserts against.
_PRESIGN_GATE_MS = 1_000  # includes auth + daily-limit DB query + MinIO presign
_UNPACK_GATE_MS = 2_000
_TOTAL_GATE_MS = 3_000
57
58
59 # ---------------------------------------------------------------------------
60 # Auth stub
61 # ---------------------------------------------------------------------------
62
# Fixed signing context that the ``client`` fixture returns from the app's
# request-signing dependencies: a non-agent admin identity for "gabriel" with
# an all-zero sha256 identity id.
_AUTH_CTX = MSignContext(
    handle="gabriel",
    identity_id=f"sha256:{'0' * 64}",
    is_agent=False,
    is_admin=True,
)
69
70
71 # ---------------------------------------------------------------------------
72 # ASGI client fixture
73 # ---------------------------------------------------------------------------
74
@pytest_asyncio.fixture()
async def client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """Yield an HTTP client wired directly to the ASGI ``app``.

    While the fixture is active, the app's ``get_db`` dependency yields
    ``db_session`` and both request-signing dependencies return the fixed
    ``_AUTH_CTX``. Every dependency override on ``app`` is cleared on teardown.
    """

    async def _override_get_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX
    app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX

    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        # Undo the overrides even if closing the client raises, so they
        # cannot leak into tests that run afterwards.
        app.dependency_overrides.clear()
91
92
93 # ---------------------------------------------------------------------------
94 # Repo fixture
95 # ---------------------------------------------------------------------------
96
@pytest_asyncio.fixture()
async def repo(client: AsyncClient) -> AsyncIterator[str]:
    """Create the benchmark repo through the API and yield its slug.

    The repo is created with ``POST /api/repos`` (owner "gabriel", public,
    not initialized). On teardown it is deleted again via
    ``DELETE /api/repos/{repoId}`` using the id returned at creation.
    """
    resp = await client.post(
        "/api/repos",
        json={"owner": "gabriel", "name": "wire-step3-bench", "visibility": "public", "initialize": False},
    )
    assert resp.status_code in (200, 201), f"repo create failed: {resp.text}"
    data = resp.json()
    slug = data["slug"]
    repo_id = data["repoId"]
    yield slug
    await client.delete(f"/api/repos/{repo_id}")
109
110
111 # ---------------------------------------------------------------------------
112 # Local repo builder
113 # ---------------------------------------------------------------------------
114
def _make_repo(tmp: pathlib.Path) -> pathlib.Path:
    """Lay out an empty repo skeleton under *tmp* and return *tmp*.

    Creates *tmp* if needed, then the directory returned by ``muse_dir(tmp)``
    with ``commits/``, ``snapshots/``, ``objects/`` and ``refs/heads/`` inside
    it, plus ``repo.json``, ``HEAD`` (pointing at ``refs/heads/main``) and an
    empty ``config.toml``. Raises ``FileExistsError`` if that directory is
    already present.
    """
    tmp.mkdir(parents=True, exist_ok=True)

    meta_dir = muse_dir(tmp)
    meta_dir.mkdir()
    for subdir in ("commits", "snapshots", "objects"):
        (meta_dir / subdir).mkdir()
    (meta_dir / "refs" / "heads").mkdir(parents=True)

    seed_files = {
        "repo.json": '{"repo_id":"step3","owner":"gabriel"}',
        "HEAD": "ref: refs/heads/main\n",
        "config.toml": "",
    }
    for filename, text in seed_files.items():
        (meta_dir / filename).write_text(text)

    return tmp
126
127
def _populate(repo: pathlib.Path) -> str:
    """Fill the local repo with benchmark data and return the tip commit id.

    Writes ``_N_OBJECTS`` distinct blobs (a unique ``step3-<index>-`` prefix
    followed by ``_BLOB_SIZE`` bytes of ``x``), one snapshot whose manifest
    maps ``file_<index>.py`` to each blob id, and a linear chain of
    ``_N_COMMITS`` commits on ``main`` that all reference that snapshot and
    share one fixed timestamp. Finally points the ``main`` branch ref at the
    last commit.
    """
    padding = b"x" * _BLOB_SIZE
    manifest: dict[str, str] = {}
    for index in range(_N_OBJECTS):
        payload = f"step3-{index:08d}-".encode() + padding
        object_id = blob_id(payload)
        write_object(repo, object_id, payload)
        manifest[f"file_{index:04d}.py"] = object_id

    snapshot_id = compute_snapshot_id(manifest)
    write_snapshot(repo, SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest))

    committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    committed_at_iso = committed_at.isoformat()

    previous: str | None = None
    head = ""
    for index in range(_N_COMMITS):
        message = f"commit-{index:05d}"
        # The first commit has no parent; every later one chains to its predecessor.
        parents = [previous] if previous else []
        commit_id = compute_commit_id(
            parent_ids=parents,
            snapshot_id=snapshot_id,
            message=message,
            committed_at_iso=committed_at_iso,
            author="gabriel",
        )
        write_commit(
            repo,
            CommitRecord(
                commit_id=commit_id,
                branch="main",
                snapshot_id=snapshot_id,
                message=message,
                committed_at=committed_at,
                parent_commit_id=previous,
                parent2_commit_id=None,
                author="gabriel",
                metadata={},
                structured_delta=None,
                sem_ver_bump="none",
                breaking_changes=[],
                agent_id="",
                model_id="",
                toolchain_id="",
                prompt_hash="",
                signature="",
                signer_key_id="",
            ),
        )
        previous = head = commit_id

    write_branch_ref(repo, "main", head)
    return head
177
178
179 # ---------------------------------------------------------------------------
180 # THE test
181 # ---------------------------------------------------------------------------
182
def _put_presigned(url: str, payload: bytes, timeout: float = 60.0) -> int:
    """PUT *payload* to the presigned *url* and return the HTTP status code.

    This is a blocking urllib call, so async callers should run it in an
    executor. *timeout* (seconds) bounds the request so that an unreachable
    object store fails the test instead of hanging it.
    """
    req = urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/x-muse-pack"},
        method="PUT",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:  # noqa: S310
        return resp.status


@pytest.mark.asyncio
async def test_upload_step3_performance_gate(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession
) -> None:
    """Step 3 gate: presign, unpack-mpack and their sum stay under the gates.

    The enforced limits are ``_PRESIGN_GATE_MS``, ``_UNPACK_GATE_MS`` and
    ``_TOTAL_GATE_MS``; the client→MinIO PUT is timed but not gated.

    Flow:
    1. Build the mpack locally and compute ``blob_id`` of its wire bytes,
       which is sent as ``mpack_key``.
    2. POST /push/mpack-presign → server returns one presigned PUT URL.
    3. Client PUTs the wire bytes directly to that URL.
    4. POST /push/unpack-mpack {mpack_key} → server unpacks from storage.
       If the response carries a ``job_id`` and reports no commits yet, the
       index job is run inline here so the written counts can be asserted.
    """
    owner = "gabriel"

    # Build mpack (client side)
    local_repo = _make_repo(tmp_path / "repo")
    head = _populate(local_repo)
    mpack = build_mpack(local_repo, [head], have=[])
    wire_bytes = build_wire_mpack(mpack)

    # blob_id of the wire bytes is what both endpoints receive as mpack_key.
    mpack_id = blob_id(wire_bytes)

    # ── A. Request presigned PUT URL ─────────────────────────────────────────
    t0 = time.perf_counter()
    presign_resp = await client.post(
        f"/{owner}/{repo}/push/mpack-presign",
        content=msgpack.packb({"mpack_key": mpack_id, "size_bytes": len(wire_bytes)}, use_bin_type=True),
        headers={"Content-Type": "application/x-msgpack"},
    )
    presign_ms = (time.perf_counter() - t0) * 1000

    assert presign_resp.status_code == 200, (
        f"POST /push/mpack-presign failed ({presign_resp.status_code}): {presign_resp.text}"
    )
    presign_data = presign_resp.json()
    # Accept either snake_case or camelCase for the URL field.
    upload_url = presign_data.get("upload_url", "") or presign_data.get("uploadUrl", "")
    assert upload_url, f"mpack-presign returned no upload_url: {presign_data}"

    assert presign_ms < _PRESIGN_GATE_MS, (
        f"Step 3A FAIL: mpack-presign took {presign_ms:.1f}ms — gate is {_PRESIGN_GATE_MS}ms"
    )

    # ── B. PUT mpack to presigned URL (client → MinIO directly) ─────────────
    t1 = time.perf_counter()
    # urllib is blocking, so run the upload on the default executor.
    loop = asyncio.get_running_loop()
    put_status = await loop.run_in_executor(None, _put_presigned, upload_url, wire_bytes)
    put_ms = (time.perf_counter() - t1) * 1000
    assert put_status in (200, 204), (
        f"presigned PUT failed ({put_status})"
    )

    # ── C. Trigger server-side unpack from storage ───────────────────────────
    t2 = time.perf_counter()
    unpack_resp = await client.post(
        f"/{owner}/{repo}/push/unpack-mpack",
        content=msgpack.packb({"mpack_key": mpack_id}, use_bin_type=True),
        headers={"Content-Type": "application/x-msgpack"},
    )
    unpack_ms = (time.perf_counter() - t2) * 1000
    # The PUT in step B is deliberately excluded from the gated total.
    total_ms = presign_ms + unpack_ms

    assert unpack_resp.status_code == 200, (
        f"POST /push/unpack-mpack failed ({unpack_resp.status_code}): {unpack_resp.text}"
    )
    result = unpack_resp.json()

    # A job_id with no commits reported yet means indexing was deferred:
    # run the job here and merge its counters into the result.
    if "job_id" in result and result.get("commits_in_mpack", 0) == 0:
        from musehub.services.musehub_wire import process_mpack_index_job
        job_result = await process_mpack_index_job(db_session, result["job_id"])
        await db_session.commit()
        result = {**result, **job_result}

    assert result.get("commits_written", 0) == _N_COMMITS, (
        f"expected {_N_COMMITS} commits_written, got {result}"
    )
    assert result.get("blobs_written", 0) == _N_OBJECTS, (
        f"expected {_N_OBJECTS} blobs_written, got {result}"
    )

    assert unpack_ms < _UNPACK_GATE_MS, (
        f"Step 3C FAIL: unpack-mpack took {unpack_ms:.1f}ms — gate is {_UNPACK_GATE_MS}ms"
    )
    assert total_ms < _TOTAL_GATE_MS, (
        f"Step 3 TOTAL FAIL: {total_ms:.1f}ms — gate is {_TOTAL_GATE_MS}ms "
        f"(presign={presign_ms:.1f}ms unpack={unpack_ms:.1f}ms)"
    )

    # Only reached when every gate passed, hence the unconditional check marks.
    print(
        f"\n Step 3 — Presigned mpack upload\n"
        f" MPack size: {len(wire_bytes) / 1024:.1f} KiB\n"
        f" A. mpack-presign: {presign_ms:.1f}ms (gate {_PRESIGN_GATE_MS}ms) ✅\n"
        f" B. PUT to MinIO: {put_ms:.1f}ms (client→MinIO, not gated)\n"
        f" C. unpack-mpack: {unpack_ms:.1f}ms (gate {_UNPACK_GATE_MS}ms) ✅\n"
        f" Total (A+C): {total_ms:.1f}ms (gate {_TOTAL_GATE_MS}ms) ✅\n"
        f" Commits written: {result.get('commits_written')}\n"
        f" Objects written: {result.get('blobs_written')}"
    )
File History 3 commits
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f fix: use wire_bytes not mpack_bytes_raw in compute_object_b… Sonnet 4.6 patch 10 days ago
sha256:4aed3d8601c8dd3ed37074de35f11f4a9699a0a4b99d43727048fd3f8e6fd13d chore: doc sweep, ignore wrangler build state, misc fixes Sonnet 4.6 minor 12 days ago