"""Wire Protocol — Stress and Performance tests.

Stress (layer 4): high request volume, large payloads, sustained load.
Performance (layer 7): latency budgets, query efficiency, memory/connection bounds.

All requests in this module are dispatched sequentially through the shared
test client; "volume" here means many requests, not parallel requests.

Rate-limit budget per test (reset by conftest.reset_rate_limiter):
    WIRE_PUSH_LIMIT  = 30/minute
    WIRE_FETCH_LIMIT = 120/minute

Object storage is isolated to a per-test temp directory by conftest._tmp_objects_dir.
"""

from __future__ import annotations

import time
import uuid
from collections.abc import Sequence
from datetime import datetime, timezone

import msgpack
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.db import musehub_models as db
from musehub.models.wire import MAX_OBJECTS_PER_PUSH
from musehub.muse_contracts.json_types import JSONObject, StrDict
from tests.factories import create_repo as factory_create_repo


# ── helpers ────────────────────────────────────────────────────────────────────


def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


def _mp(data: JSONObject) -> bytes:
    """Encode *data* as msgpack, packing ``bytes`` values as the msgpack bin type."""
    return msgpack.packb(data, use_bin_type=True)


def _make_commit(
    repo_id: str,
    commit_id: str | None = None,
    parent: str | None = None,
) -> JSONObject:
    """Build a wire-format commit dict on branch ``main``.

    Args:
        repo_id: Repository the commit belongs to.
        commit_id: Explicit commit ID; a random hex UUID is used when falsy.
        parent: Parent commit ID, or ``None`` for a root commit.

    Returns:
        A commit dict with no snapshot attached (``snapshot_id`` is ``None``).
    """
    return {
        "commit_id": commit_id or uuid.uuid4().hex,
        "repo_id": repo_id,
        "branch": "main",
        "snapshot_id": None,
        "message": "stress test commit",
        "committed_at": _utc_now().isoformat(),
        "parent_commit_id": parent,
        "author": "Stress ",
        "sem_ver_bump": "patch",
    }


def _make_object(content: bytes | None = None, path: str = "file.bin") -> JSONObject:
    """Build a wire-format object dict with a random ``object_id``.

    Args:
        content: Raw object bytes. When ``None``, a 64-byte filler is generated
            (16 random bytes from a UUID4, repeated 4 times). An explicit
            ``b""`` is kept as-is rather than replaced.
        path: Path recorded on the object.
    """
    if content is None:
        content = uuid.uuid4().bytes * 4
    return {
        "object_id": uuid.uuid4().hex,
        "content": content,
        "path": path,
    }


def _make_snapshot(snap_id: str, obj_map: StrDict) -> JSONObject:
    """Build a wire-format snapshot whose manifest maps path -> object_id."""
    return {
        "snapshot_id": snap_id,
        "manifest": obj_map,
        "created_at": _utc_now().isoformat(),
    }


def _make_chain(repo_id: str, length: int) -> list[JSONObject]:
    """Build *length* linear commits for *repo_id*, oldest (root) first.

    Each commit's ``parent_commit_id`` is the previous commit's ID; the first
    commit has no parent.
    """
    commits: list[JSONObject] = []
    parent: str | None = None
    for _ in range(length):
        cid = uuid.uuid4().hex
        commits.append(_make_commit(repo_id, commit_id=cid, parent=parent))
        parent = cid
    return commits


def _push_payload(
    repo_id: str,
    commits: Sequence[JSONObject],
    snapshots: Sequence[JSONObject] = (),
    objects: Sequence[JSONObject] = (),
    branch: str = "main",
    force: bool = False,
) -> bytes:
    """Encode a msgpack push request body.

    *repo_id* is accepted but not used: each commit dict already carries its
    own ``repo_id``. The sequences are copied into lists so tuples (the
    defaults) and lists serialize the same way.
    """
    return _mp({
        "bundle": {
            "commits": list(commits),
            "snapshots": list(snapshots),
            "objects": list(objects),
        },
        "branch": branch,
        "force": force,
    })


_MP_HEADERS = {"Content-Type": "application/x-msgpack", "Accept": "application/x-msgpack"}


# ── Stress 1: high-volume pushes to independent repos ─────────────────────────


@pytest.mark.asyncio
async def test_high_volume_pushes_to_separate_repos_all_succeed(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: StrDict,
) -> None:
    """20 sequential pushes to 20 different repos must all succeed within budget.

    Validates the push path handles high repo-fan-out volume correctly.
    Uses sequential dispatch to validate correctness at volume, exercising
    the same code paths as concurrent real-world pushes.
    """
    N = 20
    repos = [
        await factory_create_repo(db_session, slug=f"vol-push-{i}", owner="test-user-wire")
        for i in range(N)
    ]

    t0 = time.perf_counter()
    for repo in repos:
        payload = _push_payload(repo.repo_id, [_make_commit(repo.repo_id)])
        resp = await client.post(
            f"/{repo.owner}/{repo.slug}/push",
            content=payload,
            headers=wire_headers,
        )
        assert resp.status_code == 200, f"Push to {repo.slug} failed: {resp.text}"
    elapsed = time.perf_counter() - t0

    assert elapsed < 10.0, f"20 repo pushes took {elapsed:.2f}s"


# ── Stress 2: high-volume fetches against the same repo ───────────────────────


@pytest.mark.asyncio
async def test_high_volume_fetches_same_repo(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: StrDict,
) -> None:
    """30 sequential fetch requests against the same repo must all return 200.

    Validates the BFS read path handles sustained fetch load without errors.
    fetch has a 120/min rate limit so 30 requests stays well within budget.
    """
    repo = await factory_create_repo(db_session, slug="volume-fetch-repo", owner="test-user-wire")

    # Seed a 5-commit linear history in a single push.
    commits = _make_chain(repo.repo_id, 5)
    push_r = await client.post(
        f"/{repo.owner}/{repo.slug}/push",
        content=_push_payload(repo.repo_id, commits),
        headers=wire_headers,
    )
    assert push_r.status_code == 200

    tip_id = commits[-1]["commit_id"]
    t0 = time.perf_counter()
    for i in range(30):
        resp = await client.post(
            f"/{repo.owner}/{repo.slug}/fetch",
            content=_mp({"want": [tip_id], "have": []}),
            headers=_MP_HEADERS,
        )
        assert resp.status_code == 200, f"Fetch {i} failed"
    elapsed = time.perf_counter() - t0

    assert elapsed < 10.0, f"30 fetches took {elapsed:.2f}s"


# ── Stress 3: large bundle — many objects ─────────────────────────────────────


@pytest.mark.asyncio
async def test_large_object_bundle_push_and_fetch(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: StrDict,
) -> None:
    """Push a bundle with 100 objects; fetching the commit returns all 100 objects.

    Tests that the pack assembly and BFS object collection handles large
    manifests without truncation. Only the returned object count is asserted.
    """
    OBJECT_COUNT = 100
    repo = await factory_create_repo(db_session, slug="large-bundle-test", owner="test-user-wire")

    objects = [
        _make_object(f"content-{i}".encode(), f"file_{i}.bin")
        for i in range(OBJECT_COUNT)
    ]
    manifest = {obj["path"]: obj["object_id"] for obj in objects}
    snap_id = f"snap_{uuid.uuid4().hex[:8]}"
    snap = _make_snapshot(snap_id, manifest)

    commit_id = uuid.uuid4().hex
    commit = _make_commit(repo.repo_id, commit_id=commit_id)
    commit["snapshot_id"] = snap_id

    push_resp = await client.post(
        f"/{repo.owner}/{repo.slug}/push",
        content=_push_payload(repo.repo_id, [commit], [snap], objects),
        headers=wire_headers,
    )
    assert push_resp.status_code == 200, push_resp.text

    fetch_resp = await client.post(
        f"/{repo.owner}/{repo.slug}/fetch",
        content=_mp({"want": [commit_id], "have": []}),
        headers=_MP_HEADERS,
    )
    assert fetch_resp.status_code == 200
    data = msgpack.unpackb(fetch_resp.content, raw=False)
    assert len(data["objects"]) == OBJECT_COUNT


# ── Stress 4: deep commit chain BFS fetch ────────────────────────────────────


@pytest.mark.asyncio
async def test_deep_commit_chain_bfs_fetch(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: StrDict,
) -> None:
    """Push a 25-commit chain; fetch from tip with have=[root] returns exactly 24 commits.

    Validates BFS traversal stops correctly at the have boundary and doesn't
    over-fetch or under-fetch when the chain has known depth: the fetched set
    must be exactly every commit after the root.
    """
    DEPTH = 25
    repo = await factory_create_repo(db_session, slug="deep-bfs-test", owner="test-user-wire")
    commits = _make_chain(repo.repo_id, DEPTH)

    # Push in batches of at most 25 commits so the number of push requests
    # stays well under the push rate limit (30/min). With DEPTH == 25 this is
    # a single request.
    batch_size = 25
    for start in range(0, len(commits), batch_size):
        batch = commits[start:start + batch_size]
        r = await client.post(
            f"/{repo.owner}/{repo.slug}/push",
            content=_push_payload(repo.repo_id, batch, force=True),
            headers=wire_headers,
        )
        assert r.status_code == 200, f"Batch push failed: {r.text}"

    root_id = commits[0]["commit_id"]
    tip_id = commits[-1]["commit_id"]

    fetch_resp = await client.post(
        f"/{repo.owner}/{repo.slug}/fetch",
        content=_mp({"want": [tip_id], "have": [root_id]}),
        headers=_MP_HEADERS,
    )
    assert fetch_resp.status_code == 200
    data = msgpack.unpackb(fetch_resp.content, raw=False)
    fetched_ids = {c["commit_id"] for c in data["commits"]}
    expected_ids = {c["commit_id"] for c in commits[1:]}

    # Must include all commits between root and tip (exclusive of root),
    # with no duplicates and nothing extra.
    assert tip_id in fetched_ids
    assert root_id not in fetched_ids
    assert fetched_ids == expected_ids
    assert len(data["commits"]) == DEPTH - 1


# ── Stress 5: filter-objects with large ID set ───────────────────────────────


@pytest.mark.asyncio
async def test_filter_objects_large_set(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: StrDict,
) -> None:
    """filter-objects with 500 unknown IDs returns all 500 as missing.

    Tests the single-IN-clause query handles a large ID list without
    truncation or query plan degradation.
    """
    repo = await factory_create_repo(db_session, slug="filter-large-test", owner="test-user-wire")
    oids = [uuid.uuid4().hex for _ in range(500)]

    resp = await client.post(
        f"/{repo.owner}/{repo.slug}/filter-objects",
        content=_mp({"object_ids": oids}),
        headers=wire_headers,
    )
    assert resp.status_code == 200
    data = msgpack.unpackb(resp.content, raw=False)
    assert len(data["missing"]) == 500


# ── Stress 6: push/objects chunked pre-upload — many objects ──────────────────


@pytest.mark.asyncio
async def test_push_objects_large_batch(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: StrDict,
) -> None:
    """push/objects with 50 objects stores all 50 and reports correct stored count."""
    repo = await factory_create_repo(db_session, slug="push-objects-large-test", owner="test-user-wire")
    objects = [_make_object(f"blob-{i}".encode()) for i in range(50)]

    resp = await client.post(
        f"/{repo.owner}/{repo.slug}/push/objects",
        content=_mp({"objects": objects}),
        headers=wire_headers,
    )
    assert resp.status_code == 200
    data = msgpack.unpackb(resp.content, raw=False)
    assert data["stored"] == 50
    assert data["skipped"] == 0


# ── Performance 1: single push round-trip latency ────────────────────────────


@pytest.mark.asyncio
async def test_single_push_latency_under_budget(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: StrDict,
) -> None:
    """A minimal push (1 commit, 0 objects) must complete in under 500ms.

    Tests the fast path for incremental pushes where objects are already
    on the server and only the commit+branch pointer needs to be written.
    """
    repo = await factory_create_repo(db_session, slug="push-latency-test", owner="test-user-wire")
    payload = _push_payload(repo.repo_id, [_make_commit(repo.repo_id)])

    t0 = time.perf_counter()
    resp = await client.post(
        f"/{repo.owner}/{repo.slug}/push",
        content=payload,
        headers=wire_headers,
    )
    elapsed_ms = (time.perf_counter() - t0) * 1000

    assert resp.status_code == 200
    assert elapsed_ms < 500, f"Push took {elapsed_ms:.0f}ms — exceeds 500ms budget"


# ── Performance 2: fetch latency with moderate history ───────────────────────


@pytest.mark.asyncio
async def test_fetch_latency_10_commits_under_budget(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: StrDict,
) -> None:
    """Fetching a 10-commit chain must complete in under 500ms.

    Tests that BFS traversal + commit/snapshot assembly stays fast for
    typical incremental pull sizes.
    """
    repo = await factory_create_repo(db_session, slug="fetch-latency-test", owner="test-user-wire")
    commits = _make_chain(repo.repo_id, 10)

    push_resp = await client.post(
        f"/{repo.owner}/{repo.slug}/push",
        content=_push_payload(repo.repo_id, commits),
        headers=wire_headers,
    )
    assert push_resp.status_code == 200

    tip_id = commits[-1]["commit_id"]
    t0 = time.perf_counter()
    fetch_resp = await client.post(
        f"/{repo.owner}/{repo.slug}/fetch",
        content=_mp({"want": [tip_id], "have": []}),
        headers=_MP_HEADERS,
    )
    elapsed_ms = (time.perf_counter() - t0) * 1000

    assert fetch_resp.status_code == 200
    assert elapsed_ms < 500, f"Fetch took {elapsed_ms:.0f}ms — exceeds 500ms budget"


# ── Performance 3: filter-objects latency ─────────────────────────────────────


@pytest.mark.asyncio
async def test_filter_objects_100_ids_under_budget(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: StrDict,
) -> None:
    """filter-objects with 100 IDs must respond in under 200ms.

    The single IN-clause query must be fast enough that incremental pushes
    using MWP object deduplication do not add noticeable latency.
    """
    repo = await factory_create_repo(db_session, slug="filter-latency-test", owner="test-user-wire")
    oids = [uuid.uuid4().hex for _ in range(100)]

    t0 = time.perf_counter()
    resp = await client.post(
        f"/{repo.owner}/{repo.slug}/filter-objects",
        content=_mp({"object_ids": oids}),
        headers=wire_headers,
    )
    elapsed_ms = (time.perf_counter() - t0) * 1000

    assert resp.status_code == 200
    assert elapsed_ms < 200, f"filter-objects took {elapsed_ms:.0f}ms — exceeds 200ms budget"


# ── Performance 4: BFS stays within budget for shallow and deep windows ───────


@pytest.mark.asyncio
async def test_bfs_fetch_shallow_faster_than_deep(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: StrDict,
) -> None:
    """Fetching a 5-commit window and a 20-commit window must each stay under 1s.

    Also checks each fetch returns exactly the expected number of commits
    (5 and 20). Only the absolute 1s budget is asserted; the shallow and deep
    timings are not compared against each other.
    """
    repo = await factory_create_repo(db_session, slug="bfs-depth-perf-test", owner="test-user-wire")
    commits = _make_chain(repo.repo_id, 20)

    push_resp = await client.post(
        f"/{repo.owner}/{repo.slug}/push",
        content=_push_payload(repo.repo_id, commits),
        headers=wire_headers,
    )
    assert push_resp.status_code == 200

    # Shallow: fetch only the last 5 commits (have = commits[14]).
    shallow_have = commits[14]["commit_id"]
    tip_id = commits[-1]["commit_id"]

    t0 = time.perf_counter()
    resp_shallow = await client.post(
        f"/{repo.owner}/{repo.slug}/fetch",
        content=_mp({"want": [tip_id], "have": [shallow_have]}),
        headers=_MP_HEADERS,
    )
    shallow_ms = (time.perf_counter() - t0) * 1000
    assert resp_shallow.status_code == 200
    assert len(msgpack.unpackb(resp_shallow.content, raw=False)["commits"]) == 5

    # Deep: fetch all 20 commits (have = []).
    t0 = time.perf_counter()
    resp_deep = await client.post(
        f"/{repo.owner}/{repo.slug}/fetch",
        content=_mp({"want": [tip_id], "have": []}),
        headers=_MP_HEADERS,
    )
    deep_ms = (time.perf_counter() - t0) * 1000
    assert resp_deep.status_code == 200
    assert len(msgpack.unpackb(resp_deep.content, raw=False)["commits"]) == 20

    # Both within absolute budget.
    assert shallow_ms < 1000, f"Shallow fetch took {shallow_ms:.0f}ms"
    assert deep_ms < 1000, f"Deep fetch took {deep_ms:.0f}ms"


# ── Performance 5: refs endpoint is cheap ────────────────────────────────────


@pytest.mark.asyncio
async def test_refs_latency_under_budget(
    client: AsyncClient,
    db_session: AsyncSession,
) -> None:
    """GET /refs must respond in under 100ms — it is the preflight for every push/pull."""
    repo = await factory_create_repo(db_session, slug="refs-latency-test")

    # Warm-up request so the measured call excludes first-request overhead.
    await client.get(f"/{repo.owner}/{repo.slug}/refs")

    t0 = time.perf_counter()
    resp = await client.get(f"/{repo.owner}/{repo.slug}/refs")
    elapsed_ms = (time.perf_counter() - t0) * 1000

    assert resp.status_code == 200
    assert elapsed_ms < 100, f"GET /refs took {elapsed_ms:.0f}ms — exceeds 100ms budget"


# ── Performance 6: negotiate is cheap ────────────────────────────────────────


@pytest.mark.asyncio
async def test_negotiate_latency_under_budget(
    client: AsyncClient,
    db_session: AsyncSession,
) -> None:
    """negotiate with 10 have IDs must complete in under 200ms.

    It is called once per push/pull negotiation round — high latency here
    adds directly to perceived CLI responsiveness.
    """
    repo = await factory_create_repo(db_session, slug="negotiate-latency-test")

    # Seed a commit the server knows about directly through the ORM.
    commit_id = uuid.uuid4().hex
    db_session.add(db.MusehubCommit(
        commit_id=commit_id,
        repo_id=repo.repo_id,
        branch="main",
        parent_ids=[],
        message="base",
        author="T",
        timestamp=_utc_now(),
        snapshot_id=None,
        commit_meta={},
    ))
    await db_session.commit()

    # 9 unknown IDs plus the one known commit.
    have = [uuid.uuid4().hex for _ in range(9)] + [commit_id]

    t0 = time.perf_counter()
    resp = await client.post(
        f"/{repo.owner}/{repo.slug}/negotiate",
        content=_mp({"have": have, "want": [commit_id]}),
        headers=_MP_HEADERS,
    )
    elapsed_ms = (time.perf_counter() - t0) * 1000

    assert resp.status_code == 200
    assert elapsed_ms < 200, f"negotiate took {elapsed_ms:.0f}ms — exceeds 200ms budget"


# ── Performance 7: sequential push throughput ─────────────────────────────────


@pytest.mark.asyncio
async def test_sequential_push_throughput(
    client: AsyncClient,
    db_session: AsyncSession,
    wire_headers: StrDict,
) -> None:
    """20 sequential pushes (each with 1 commit) complete in under 5 seconds.

    This is the hot path for ``muse push`` on a developer machine.
    20 requests × 250ms/req = 5s worst-case budget. Each push extends the
    chain built by the previous one.
    """
    repo = await factory_create_repo(db_session, slug="push-throughput-test", owner="test-user-wire")
    commits = _make_chain(repo.repo_id, 20)

    t0 = time.perf_counter()
    for commit in commits:
        resp = await client.post(
            f"/{repo.owner}/{repo.slug}/push",
            content=_push_payload(repo.repo_id, [commit]),
            headers=wire_headers,
        )
        assert resp.status_code == 200
    elapsed = time.perf_counter() - t0

    assert elapsed < 5.0, f"20 sequential pushes took {elapsed:.2f}s — exceeds 5s budget"