gabriel / musehub public
test_wire_step4c_scale.py python
289 lines 9.9 KB
Raw
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
"""Wire protocol — scale gate matching real-world musehub repo dimensions.

Step 4 (test_wire_step4_e2e.py) uses 100 commits sharing ONE snapshot.
A real musehub push carries 1028 commits × 1022 DISTINCT snapshots × 5151 objects.
That is the case that hangs, and this test reproduces it.

Each commit gets its own snapshot (one file changed per commit), so the
server must INSERT ~1000 snapshot rows, ~1000 commit rows, and ~6000
object rows (5000 base blobs plus one variant blob per commit).
Gate: full server pipeline < 15s (``_E2E_GATE_S``); the PUT to object
storage is timed but not counted toward the gate.
"""
from __future__ import annotations

import asyncio as _asyncio
import datetime
import hashlib
import pathlib
import time
import urllib.request
from collections.abc import AsyncIterator

import msgpack
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request
from musehub.db.database import get_db
from musehub.main import app

from muse.core.object_store import write_object
from muse.core.mpack import build_mpack, build_wire_mpack
from muse.core.paths import muse_dir
from muse.core.snapshot import compute_commit_id, compute_snapshot_id
from muse.core.commits import CommitRecord, write_commit
from muse.core.refs import write_branch_ref
from muse.core.snapshots import SnapshotRecord, write_snapshot
from muse.core.types import blob_id
38
39
40 # ---------------------------------------------------------------------------
41 # Scale parameters — match real musehub repo dimensions
42 # ---------------------------------------------------------------------------
43
44 _N_COMMITS = 1_000
45 _N_OBJECTS = 5_000 # total distinct blobs
46 _BLOB_SIZE = 4_096
47 _E2E_GATE_S = 15.0 # generous gate; goal is not to hang (raised from 10s — Phase 2/3 add concurrent S3 puts for 1000 commits)
48
49
50 # ---------------------------------------------------------------------------
51 # Auth + fixtures
52 # ---------------------------------------------------------------------------
53
# Placeholder signer (all-zero sha256 identity) returned by the signed-request
# dependency overrides, so test requests skip real signature checks.
_AUTH_CTX = MSignContext(
    handle="gabriel",
    identity_id=f"sha256:{'0' * 64}",
    is_agent=False,
    is_admin=True,
)
60
61
@pytest_asyncio.fixture()
async def client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """Yield an httpx client wired to the in-process ASGI ``app``.

    ``get_db`` is overridden to hand out the test's ``db_session``. Both
    ``require_signed_request`` and ``optional_signed_request`` resolve to
    ``_AUTH_CTX``, so requests need no real signatures. On teardown the
    dependency overrides are restored to whatever they were before this
    fixture ran.
    """

    async def _override_get_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    # ``app`` is a process-wide object. Snapshot its overrides so teardown
    # restores them instead of wiping ones installed by other fixtures.
    saved_overrides = dict(app.dependency_overrides)
    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX
    app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX

    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        app.dependency_overrides.clear()
        app.dependency_overrides.update(saved_overrides)
78
79
@pytest_asyncio.fixture()
async def repo(client: AsyncClient) -> AsyncIterator[str]:
    """Create an empty public ``gabriel/wire-scale`` repo and yield its slug.

    The repo is created with ``initialize=False`` and deleted by its
    ``repoId`` on teardown. The delete is best-effort, so its response is
    not checked.
    """
    resp = await client.post(
        "/api/repos",
        json={"owner": "gabriel", "name": "wire-scale", "visibility": "public", "initialize": False},
    )
    assert resp.status_code in (200, 201), f"repo create failed: {resp.text}"
    data = resp.json()
    repo_id = data["repoId"]
    # Once the repo exists server-side, always clean it up, even if reading
    # the slug or the test itself fails.
    try:
        yield data["slug"]
    finally:
        await client.delete(f"/api/repos/{repo_id}")
92
93
94 # ---------------------------------------------------------------------------
95 # Local repo builder — one distinct snapshot per commit
96 # ---------------------------------------------------------------------------
97
def _make_repo(tmp: pathlib.Path) -> pathlib.Path:
    """Lay out an empty Muse repository skeleton under *tmp* and return *tmp*.

    *tmp* itself may already exist, but its metadata directory
    (``muse_dir(tmp)``) must not. The skeleton contains:

    * ``commits/``, ``snapshots/``, ``objects/`` and ``refs/heads/``
    * ``repo.json`` with a fixed repo id/owner
    * ``HEAD`` pointing at ``refs/heads/main``
    * an empty ``config.toml``
    """
    tmp.mkdir(parents=True, exist_ok=True)
    meta = muse_dir(tmp)
    meta.mkdir()

    seed_files = {
        "repo.json": '{"repo_id":"scale","owner":"gabriel"}',
        "HEAD": "ref: refs/heads/main\n",
        "config.toml": "",
    }
    for filename, text in seed_files.items():
        (meta / filename).write_text(text)

    for subdir in ("commits", "snapshots", "objects"):
        (meta / subdir).mkdir()
    (meta / "refs" / "heads").mkdir(parents=True)

    return tmp
109
110
def _populate(repo: pathlib.Path) -> str:
    """Fill *repo* with the scale fixture and return the tip commit ID.

    First writes ``_N_OBJECTS`` base blobs, tracked as ``file_0000.py`` and
    up. Then builds a linear chain of ``_N_COMMITS`` commits on ``main``.
    Commit ``n`` swaps ``file_{n % _N_OBJECTS}`` for a freshly written variant
    blob, so every commit's manifest, and therefore its snapshot, is unique.
    This mirrors real pushes, where each commit touches at least one file.

    Commits are stamped 60 seconds apart starting 2026-01-01T00:00Z. The
    ``main`` branch ref is pointed at the last commit. The returned value is
    ``""`` if ``_N_COMMITS`` is 0.
    """
    # Base tree: one blob per tracked file, written to the object store up front.
    base_manifest: dict[str, str] = {}
    for idx in range(_N_OBJECTS):
        payload = f"scale-{idx:08d}-".encode() + b"x" * _BLOB_SIZE
        oid = blob_id(payload)
        write_object(repo, oid, payload)
        base_manifest[f"file_{idx:04d}.py"] = oid

    parent_id: str | None = None
    committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    tick = datetime.timedelta(seconds=60)

    for n in range(_N_COMMITS):
        # A per-commit variant blob replaces one file, making this manifest unique.
        variant = f"commit-{n:05d}-variant".encode() + b"y" * _BLOB_SIZE
        variant_oid = blob_id(variant)
        write_object(repo, variant_oid, variant)
        manifest = {**base_manifest, f"file_{n % _N_OBJECTS:04d}.py": variant_oid}

        snapshot_id = compute_snapshot_id(manifest)
        write_snapshot(repo, SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest))

        message = f"commit-{n:05d}"
        commit_id = compute_commit_id(
            parent_ids=[parent_id] if parent_id else [],
            snapshot_id=snapshot_id,
            message=message,
            committed_at_iso=committed_at.isoformat(),
            author="gabriel",
        )
        write_commit(
            repo,
            CommitRecord(
                commit_id=commit_id,
                branch="main",
                snapshot_id=snapshot_id,
                message=message,
                committed_at=committed_at,
                parent_commit_id=parent_id,
                parent2_commit_id=None,
                author="gabriel",
                metadata={},
                structured_delta=None,
                sem_ver_bump="none",
                breaking_changes=[],
                agent_id="",
                model_id="",
                toolchain_id="",
                prompt_hash="",
                signature="",
                signer_key_id="",
            ),
        )
        parent_id = commit_id
        committed_at += tick

    tip = parent_id if parent_id is not None else ""
    write_branch_ref(repo, "main", tip)
    return tip
181
182
183 # ---------------------------------------------------------------------------
184 # THE test
185 # ---------------------------------------------------------------------------
186
@pytest.mark.asyncio
async def test_scale_unpack_mpack(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession
) -> None:
    """Scale gate: push ``_N_COMMITS`` distinct-snapshot commits within ``_E2E_GATE_S``.

    Builds a local repo with ``_populate`` and packs it. It then drives the
    wire protocol end to end: mpack-presign, PUT to object storage,
    unpack-mpack, an optional inline index job, and finally refs.

    The PUT is timed but excluded from the gate, since it measures the storage
    backend rather than the server. Every other phase counts toward the gate.
    """
    owner = "gabriel"
    branch = "main"
    # Upper bound for the storage PUT. A stalled endpoint must fail the test
    # rather than hang it, which is exactly what this gate is meant to catch.
    put_timeout_s = 120.0

    local_repo = _make_repo(tmp_path / "repo")

    t_build = time.perf_counter()
    head = _populate(local_repo)
    mpack = build_mpack(local_repo, [head], have=[])
    wire_bytes = build_wire_mpack(mpack)
    mpack_key = blob_id(wire_bytes)
    print(f"\n  build+pack: {time.perf_counter() - t_build:.2f}s "
          f"({len(wire_bytes) // 1024} KiB)", flush=True)

    t_server = time.perf_counter()

    # 1. mpack-presign
    presign_resp = await client.post(
        f"/{owner}/{repo}/push/mpack-presign",
        content=msgpack.packb({"mpack_key": mpack_key, "size_bytes": len(wire_bytes)}, use_bin_type=True),
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert presign_resp.status_code == 200, presign_resp.text
    presign_body = presign_resp.json()
    # Accept either snake_case or camelCase response keys.
    upload_url = presign_body.get("upload_url") or presign_body.get("uploadUrl")
    assert upload_url, presign_body

    t_presign = time.perf_counter()

    # 2. PUT → MinIO. urllib blocks, so run it off the event loop.
    def _do_put() -> int:
        req = urllib.request.Request(
            upload_url,
            data=wire_bytes,
            headers={"Content-Type": "application/x-muse-pack"},
            method="PUT",
        )
        with urllib.request.urlopen(req, timeout=put_timeout_s) as resp:  # noqa: S310
            return resp.status

    put_status = await _asyncio.get_running_loop().run_in_executor(None, _do_put)
    assert put_status in (200, 204), f"PUT failed {put_status}"

    t_put = time.perf_counter()

    # 3. unpack-mpack
    unpack_resp = await client.post(
        f"/{owner}/{repo}/push/unpack-mpack",
        content=msgpack.packb(
            {"mpack_key": mpack_key, "branch": branch, "head": head},
            use_bin_type=True,
        ),
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert unpack_resp.status_code == 200, unpack_resp.text
    result = unpack_resp.json()

    t_unpack = time.perf_counter()

    # 3b. If the server deferred indexing to a job, run it inline now. Doing it
    # before reading refs means the verification sees the final state. Doing it
    # inside the timed window means the gate covers that work too.
    if "job_id" in result and result.get("commits_in_mpack", 0) == 0:
        from musehub.services.musehub_wire import process_mpack_index_job
        job_result = await process_mpack_index_job(db_session, result["job_id"])
        await db_session.commit()
        result = {**result, **job_result}

    t_index = time.perf_counter()

    # 4. verify branch head
    refs_resp = await client.get(f"/{owner}/{repo}/refs")
    assert refs_resp.status_code == 200, refs_resp.text
    branch_heads = refs_resp.json().get("branch_heads", {})

    t_verify = time.perf_counter()

    presign_ms = (t_presign - t_server) * 1000
    put_ms = (t_put - t_presign) * 1000
    unpack_ms = (t_unpack - t_put) * 1000
    index_ms = (t_index - t_unpack) * 1000
    verify_ms = (t_verify - t_index) * 1000
    server_ms = presign_ms + unpack_ms + index_ms + verify_ms

    assert result.get("commits_written") == _N_COMMITS, result
    assert branch_heads.get(branch) == head, branch_heads

    gate_ms = _E2E_GATE_S * 1000
    assert server_ms < gate_ms, (
        f"Scale gate FAIL: {server_ms:.0f}ms > {gate_ms:.0f}ms\n"
        f"  presign={presign_ms:.0f}ms "
        f"unpack={unpack_ms:.0f}ms "
        f"index={index_ms:.0f}ms "
        f"verify={verify_ms:.0f}ms"
    )

    print(
        f"\n  Scale gate — {_N_COMMITS} commits × {_N_COMMITS} snapshots × "
        f"{_N_OBJECTS} base objects\n"
        f"  mpack:           {len(wire_bytes) // 1024} KiB\n"
        f"  presign:         {presign_ms:.0f}ms\n"
        f"  PUT → MinIO:     {put_ms:.0f}ms (not counted)\n"
        f"  unpack-mpack:    {unpack_ms:.0f}ms\n"
        f"  index job:       {index_ms:.0f}ms\n"
        f"  verify refs:     {verify_ms:.0f}ms\n"
        f"  server total:    {server_ms:.0f}ms (gate {gate_ms:.0f}ms)\n"
        f"  commits written: {result.get('commits_written')}\n"
        f"  objects written: {result.get('objects_written')}"
    )
File History 3 commits
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f fix: use wire_bytes not mpack_bytes_raw in compute_object_b… Sonnet 4.6 patch 10 days ago
sha256:4aed3d8601c8dd3ed37074de35f11f4a9699a0a4b99d43727048fd3f8e6fd13d chore: doc sweep, ignore wrangler build state, misc fixes Sonnet 4.6 minor 12 days ago