test_wire_step4c_scale.py
python
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa
Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As…
Human
1 day ago
| 1 | """Wire protocol — scale gate matching real-world musehub repo dimensions. |
| 2 | |
| 3 | Step 4 (test_wire_step4_e2e.py) uses 100 commits sharing ONE snapshot. |
| 4 | Real musehub push: 1028 commits × 1022 DISTINCT snapshots × 5151 objects. |
| 5 | That's the case that hangs — this test reproduces it. |
| 6 | |
| 7 | Each commit gets its own snapshot (one file changed per commit), so the |
| 8 | server must INSERT ~1000 snapshot rows, ~1000 commit rows, and ~5000 |
| 9 | object rows. Gate: full server pipeline < 15s (see _E2E_GATE_S; raised from 10s). |
| 10 | """ |
from __future__ import annotations

import asyncio as _asyncio
import datetime
import hashlib
import pathlib
import time
import urllib.request
from collections.abc import AsyncIterator

import msgpack
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request
from musehub.db.database import get_db
from musehub.main import app

from muse.core.object_store import write_object
from muse.core.mpack import build_mpack, build_wire_mpack
from muse.core.paths import muse_dir
from muse.core.snapshot import compute_commit_id, compute_snapshot_id
from muse.core.commits import CommitRecord, write_commit
from muse.core.refs import write_branch_ref
from muse.core.snapshots import SnapshotRecord, write_snapshot
from muse.core.types import blob_id
| 38 | |
| 39 | |
| 40 | # --------------------------------------------------------------------------- |
| 41 | # Scale parameters — match real musehub repo dimensions |
| 42 | # --------------------------------------------------------------------------- |
| 43 | |
# Number of commits pushed; _populate() gives every commit its own snapshot.
_N_COMMITS = 1_000
# Number of base blobs (one per file in the base manifest).  _populate()
# additionally writes one variant blob per commit on top of these.
_N_OBJECTS = 5_000
# Count of filler bytes appended to each blob's unique prefix.
_BLOB_SIZE = 4_096
# Generous gate; the goal is not to hang.  Raised from 10s — Phase 2/3 add
# concurrent S3 puts for 1000 commits.
_E2E_GATE_S = 15.0
| 48 | |
| 49 | |
| 50 | # --------------------------------------------------------------------------- |
| 51 | # Auth + fixtures |
| 52 | # --------------------------------------------------------------------------- |
| 53 | |
# Fixed signing context handed back by the overridden
# require_signed_request / optional_signed_request dependencies, so every
# request in this module is treated as coming from this identity.
_AUTH_CTX = MSignContext(
    handle="gabriel",
    identity_id="sha256:" + "0" * 64,
    is_agent=False,
    is_admin=True,
)
| 60 | |
| 61 | |
@pytest_asyncio.fixture()
async def client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """Yield an in-process HTTP client bound to the app with test overrides.

    The app's ``get_db`` dependency is replaced so requests share the test's
    ``db_session``, and both request-signing dependencies are replaced so
    every request is authenticated as ``_AUTH_CTX``.

    Yields:
        An ``AsyncClient`` that talks to ``app`` through ``ASGITransport``.
    """

    async def _override_get_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX
    app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX

    # try/finally guarantees the overrides are removed even when the client
    # context fails to open/close or the fixture generator is closed early;
    # otherwise they would leak into later tests via the global ``app``.
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        app.dependency_overrides.clear()
| 78 | |
| 79 | |
@pytest_asyncio.fixture()
async def repo(client: AsyncClient) -> AsyncIterator[str]:
    """Create the ``gabriel/wire-scale`` repo and yield its slug.

    The repo is created through ``POST /api/repos`` without initial content
    and deleted again (by ``repoId``) once the test has finished.

    Yields:
        The ``slug`` field of the create response.
    """
    resp = await client.post(
        "/api/repos",
        json={"owner": "gabriel", "name": "wire-scale", "visibility": "public", "initialize": False},
    )
    assert resp.status_code in (200, 201), f"repo create failed: {resp.text}"
    data = resp.json()
    slug = data["slug"]
    repo_id = data["repoId"]
    yield slug
    await client.delete(f"/api/repos/{repo_id}")
| 92 | |
| 93 | |
| 94 | # --------------------------------------------------------------------------- |
| 95 | # Local repo builder — one distinct snapshot per commit |
| 96 | # --------------------------------------------------------------------------- |
| 97 | |
def _make_repo(tmp: pathlib.Path) -> pathlib.Path:
    """Lay out an empty repo skeleton under *tmp* and return *tmp*.

    Creates the directory returned by ``muse_dir(tmp)`` and, inside it,
    ``repo.json``, the ``commits`` / ``snapshots`` / ``objects`` stores,
    ``refs/heads``, a ``HEAD`` pointing at ``main`` and an empty
    ``config.toml``.
    """
    tmp.mkdir(parents=True, exist_ok=True)
    store = muse_dir(tmp)
    store.mkdir()

    (store / "repo.json").write_text('{"repo_id":"scale","owner":"gabriel"}')
    for name in ("commits", "snapshots", "objects"):
        store.joinpath(name).mkdir()
    store.joinpath("refs", "heads").mkdir(parents=True)

    for filename, body in (("HEAD", "ref: refs/heads/main\n"), ("config.toml", "")):
        (store / filename).write_text(body)
    return tmp
| 109 | |
| 110 | |
def _populate(repo: pathlib.Path) -> str:
    """Fill *repo* with _N_OBJECTS base blobs and a linear chain of _N_COMMITS commits.

    Every commit's manifest is the base manifest with exactly one file
    pointed at a freshly written variant blob, so each commit carries its
    own snapshot.  The ``main`` branch ref is written to the last commit.

    Returns:
        The id of the last commit written ("" when _N_COMMITS is 0).
    """

    def _store_blob(payload: bytes) -> str:
        # Write *payload* into the object store and hand back its id.
        oid = blob_id(payload)
        write_object(repo, oid, payload)
        return oid

    # Base blobs: a unique prefix followed by fixed-size filler.
    filler = b"x" * _BLOB_SIZE
    base_ids: list[str] = [
        _store_blob(f"scale-{n:08d}-".encode() + filler) for n in range(_N_OBJECTS)
    ]
    base_manifest: dict[str, str] = {
        f"file_{n:04d}.py": oid for n, oid in enumerate(base_ids)
    }

    author = "gabriel"
    one_minute = datetime.timedelta(seconds=60)
    committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    previous: str | None = None

    for n in range(_N_COMMITS):
        # Swap a single file for a per-commit variant so the manifest — and
        # with it the snapshot id — differs from every other commit's.
        variant_oid = _store_blob(f"commit-{n:05d}-variant".encode() + b"y" * _BLOB_SIZE)
        manifest = {**base_manifest, f"file_{n % _N_OBJECTS:04d}.py": variant_oid}

        snapshot_id = compute_snapshot_id(manifest)
        write_snapshot(repo, SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest))

        message = f"commit-{n:05d}"
        commit_id = compute_commit_id(
            parent_ids=[previous] if previous else [],
            snapshot_id=snapshot_id,
            message=message,
            committed_at_iso=committed_at.isoformat(),
            author=author,
        )
        write_commit(
            repo,
            CommitRecord(
                commit_id=commit_id,
                branch="main",
                snapshot_id=snapshot_id,
                message=message,
                committed_at=committed_at,
                parent_commit_id=previous,
                parent2_commit_id=None,
                author=author,
                metadata={},
                structured_delta=None,
                sem_ver_bump="none",
                breaking_changes=[],
                agent_id="",
                model_id="",
                toolchain_id="",
                prompt_hash="",
                signature="",
                signer_key_id="",
            ),
        )
        previous = commit_id
        committed_at += one_minute

    tip = previous or ""
    write_branch_ref(repo, "main", tip)
    return tip
| 181 | |
| 182 | |
| 183 | # --------------------------------------------------------------------------- |
| 184 | # THE test |
| 185 | # --------------------------------------------------------------------------- |
| 186 | |
@pytest.mark.asyncio
async def test_scale_unpack_mpack(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession
) -> None:
    """Scale gate: push _N_COMMITS commits with distinct snapshots within _E2E_GATE_S.

    Flow: build a local repo and its wire mpack, request a presigned upload
    URL, PUT the pack to storage, ask the server to unpack it, run the
    index job in-process when the server only queued one, then confirm the
    branch head.

    Only server-side work (presign, unpack, index job, ref lookup) counts
    towards the gate; the local build and the storage PUT are reported but
    excluded.
    """
    owner = "gabriel"
    branch = "main"
    msgpack_headers = {"Content-Type": "application/x-msgpack"}
    # Upper bound for the storage PUT so a stalled upload fails the test
    # instead of blocking the run forever.
    put_timeout_s = 60.0

    local_repo = _make_repo(tmp_path / "repo")

    t_build = time.perf_counter()
    head = _populate(local_repo)
    mpack = build_mpack(local_repo, [head], have=[])
    wire_bytes = build_wire_mpack(mpack)
    mpack_key = blob_id(wire_bytes)
    print(
        f"\n  build+pack: {time.perf_counter() - t_build:.2f}s "
        f"({len(wire_bytes) // 1024} KiB)",
        flush=True,
    )

    # 1. mpack-presign
    t_server = time.perf_counter()
    presign_resp = await client.post(
        f"/{owner}/{repo}/push/mpack-presign",
        content=msgpack.packb(
            {"mpack_key": mpack_key, "size_bytes": len(wire_bytes)},
            use_bin_type=True,
        ),
        headers=msgpack_headers,
    )
    assert presign_resp.status_code == 200, presign_resp.text
    presign_body = presign_resp.json()
    # Accept either key spelling for the presigned URL.
    upload_url = presign_body.get("upload_url") or presign_body.get("uploadUrl")
    assert upload_url, presign_body
    t_presign = time.perf_counter()

    # 2. PUT → MinIO (blocking urllib call, so run it off the event loop)
    def _do_put() -> int:
        req = urllib.request.Request(
            upload_url,
            data=wire_bytes,
            headers={"Content-Type": "application/x-muse-pack"},
            method="PUT",
        )
        with urllib.request.urlopen(req, timeout=put_timeout_s) as resp:  # noqa: S310
            return resp.status

    put_status = await _asyncio.get_running_loop().run_in_executor(None, _do_put)
    assert put_status in (200, 204), f"PUT failed {put_status}"
    t_put = time.perf_counter()

    # 3. unpack-mpack
    unpack_resp = await client.post(
        f"/{owner}/{repo}/push/unpack-mpack",
        content=msgpack.packb(
            {"mpack_key": mpack_key, "branch": branch, "head": head},
            use_bin_type=True,
        ),
        headers=msgpack_headers,
    )
    assert unpack_resp.status_code == 200, unpack_resp.text
    result = unpack_resp.json()
    t_unpack = time.perf_counter()

    # 3b. If the server only queued an index job, run it here and time it.
    # It is part of the server pipeline, so leaving it out of the total
    # would let the gate pass without measuring the real work.
    index_ms = 0.0
    if "job_id" in result and result.get("commits_in_mpack", 0) == 0:
        from musehub.services.musehub_wire import process_mpack_index_job

        t_index = time.perf_counter()
        job_result = await process_mpack_index_job(db_session, result["job_id"])
        await db_session.commit()
        index_ms = (time.perf_counter() - t_index) * 1000
        result = {**result, **job_result}

    # 4. verify branch head (after all server-side work has finished)
    t_refs = time.perf_counter()
    refs_resp = await client.get(f"/{owner}/{repo}/refs")
    assert refs_resp.status_code == 200, refs_resp.text
    branch_heads = refs_resp.json().get("branch_heads", {})
    verify_ms = (time.perf_counter() - t_refs) * 1000

    presign_ms = (t_presign - t_server) * 1000
    put_ms = (t_put - t_presign) * 1000
    unpack_ms = (t_unpack - t_put) * 1000
    server_ms = presign_ms + unpack_ms + index_ms + verify_ms

    assert result.get("commits_written") == _N_COMMITS, result
    assert branch_heads.get(branch) == head, branch_heads

    gate_ms = _E2E_GATE_S * 1000
    assert server_ms < gate_ms, (
        f"Scale gate FAIL: {server_ms:.0f}ms > {gate_ms:.0f}ms\n"
        f"  presign={presign_ms:.0f}ms  "
        f"unpack={unpack_ms:.0f}ms  "
        f"index={index_ms:.0f}ms  "
        f"verify={verify_ms:.0f}ms"
    )

    print(
        f"\n  Scale gate — {_N_COMMITS} commits × {_N_COMMITS} snapshots × {_N_OBJECTS} objects\n"
        f"  mpack:           {len(wire_bytes) // 1024} KiB\n"
        f"  presign:         {presign_ms:.0f}ms\n"
        f"  PUT → MinIO:     {put_ms:.0f}ms  (not counted)\n"
        f"  unpack-mpack:    {unpack_ms:.0f}ms\n"
        f"  index job:       {index_ms:.0f}ms\n"
        f"  verify refs:     {verify_ms:.0f}ms\n"
        f"  server total:    {server_ms:.0f}ms  (gate {gate_ms:.0f}ms)\n"
        f"  commits written: {result.get('commits_written')}\n"
        f"  objects written: {result.get('objects_written')}"
    )
File History
3 commits
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa
Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As…
Human
1 day ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f
fix: use wire_bytes not mpack_bytes_raw in compute_object_b…
Sonnet 4.6
patch
10 days ago
sha256:4aed3d8601c8dd3ed37074de35f11f4a9699a0a4b99d43727048fd3f8e6fd13d
chore: doc sweep, ignore wrangler build state, misc fixes
Sonnet 4.6
minor
⚠
12 days ago