"""MuseHub load-test scenarios — run with Locust against a live staging instance. Install: pip install locust Usage (all scenarios share this file; select via --tags or --class): # Baseline: 100 concurrent users, read-heavy, verify p99 < 500 ms locust -f locustfile.py --class-picker \ --host https://staging.musehub.ai \ --users 100 --spawn-rate 10 \ --run-time 5m --headless \ --html baseline-report.html # Spike: ramp to 1000 users in 30 s, hold 60 s — expect 429s, no crashes locust -f locustfile.py BaselineUser SpikeBurst \ --host https://staging.musehub.ai \ --users 1000 --spawn-rate 100 \ --run-time 90s --headless \ --html spike-report.html # Soak: sustained moderate load 12 h — watch for memory / connection leaks locust -f locustfile.py SoakUser \ --host https://staging.musehub.ai \ --users 30 --spawn-rate 2 \ --run-time 12h --headless \ --html soak-report.html # Write-heavy: 50 concurrent push users for 5 min locust -f locustfile.py WritePushUser \ --host https://staging.musehub.ai \ --users 50 --spawn-rate 5 \ --run-time 5m --headless \ --html write-report.html Environment: MUSEHUB_TOKEN — MSign token for authenticated requests MUSEHUB_OWNER — repo owner (default: gabriel) MUSEHUB_REPO — target repo slug (default: muse) PUSH_OBJECT_KB — size of synthetic push payload in KiB (default: 4) Success criteria: Baseline : p99 < 500 ms, error rate < 0.1 % Spike : no 5xx; 429s expected and counted, not failures; Retry-After present Soak : RSS growth < 50 MiB over 12 h (check via /debug/memory if enabled) Write : no object-store corruption; DB row counts consistent post-run """ from __future__ import annotations import os import time import uuid from muse.core.types import blob_id import msgpack from locust import HttpUser, between, constant, events, task # ── config from environment ─────────────────────────────────────────────────── _TOKEN = os.getenv("MUSEHUB_TOKEN", "") _OWNER = os.getenv("MUSEHUB_OWNER", "gabriel") _REPO = os.getenv("MUSEHUB_REPO", "muse") _PUSH_KB = int(os.getenv("PUSH_OBJECT_KB", "4")) def _auth_headers() -> dict[str, str]: if _TOKEN: return {"Authorization": f"Bearer {_TOKEN}"} return {} def _mp(data: object) -> bytes: return msgpack.packb(data, use_bin_type=True) def _make_object(size_kb: int = _PUSH_KB) -> tuple[str, bytes]: """Return (object_id, raw_bytes) for a synthetic blob of ``size_kb`` KiB. ``object_id`` uses the canonical ``sha256:`` form. """ payload = os.urandom(size_kb * 1024) return blob_id(payload), payload # ── baseline: read-heavy, mixed public endpoints ────────────────────────────── class BaselineUser(HttpUser): """Simulates a normal human browsing repos, commits, and issues. Target: p99 < 500 ms at 100 concurrent users. """ wait_time = between(0.5, 2.0) weight = 8 # 80 % of users in mixed runs def on_start(self) -> None: self._owner = _OWNER self._repo = _REPO @task(5) def view_repo_home(self) -> None: self.client.get(f"/{self._owner}/{self._repo}", name="/owner/repo") @task(4) def list_commits(self) -> None: self.client.get( f"/{self._owner}/{self._repo}/commits", name="/owner/repo/commits", ) @task(3) def list_issues(self) -> None: self.client.get( f"/{self._owner}/{self._repo}/issues", name="/owner/repo/issues", ) @task(2) def view_proposals(self) -> None: self.client.get( f"/{self._owner}/{self._repo}/proposals", name="/owner/repo/proposals", ) @task(2) def browse_tree(self) -> None: self.client.get( f"/{self._owner}/{self._repo}/tree/main", name="/owner/repo/tree/ref", ) @task(1) def api_repo_info(self) -> None: self.client.get( f"/api/v1/repos/{self._owner}/{self._repo}", name="/api/v1/repos/owner/repo", headers=_auth_headers(), ) @task(1) def api_list_commits(self) -> None: self.client.get( f"/api/v1/repos/{self._owner}/{self._repo}/commits", name="/api/v1/repos/owner/repo/commits", headers=_auth_headers(), ) # ── spike: intentional rate-limit testing ──────────────────────────────────── class SpikeBurst(HttpUser): """Hammers a single lightweight endpoint to provoke 429s. Expect: 429 responses with Retry-After header, no 5xx. """ wait_time = constant(0) # fire as fast as possible weight = 2 # 20 % of users in mixed runs @task def burst_search(self) -> None: with self.client.get( "/api/v1/search", params={"q": "test"}, name="/api/v1/search [burst]", catch_response=True, headers=_auth_headers(), ) as resp: if resp.status_code == 429: assert "retry-after" in resp.headers, "429 missing Retry-After" resp.success() # 429 is expected, not a failure elif resp.status_code >= 500: resp.failure(f"Server error {resp.status_code}") # ── soak: sustained moderate load ───────────────────────────────────────────── class SoakUser(HttpUser): """Steady-state moderate load for 12-hour leak detection. Watch metrics: - RSS growth via /debug/memory (if MUSEHUB_DEBUG_MEMORY is enabled on staging) - DB connection count (pg_stat_activity) - Error rate must stay < 0.05 % """ wait_time = between(2.0, 5.0) def on_start(self) -> None: self._owner = _OWNER self._repo = _REPO @task(4) def view_repo(self) -> None: self.client.get(f"/{self._owner}/{self._repo}", name="/owner/repo [soak]") @task(2) def list_commits(self) -> None: self.client.get( f"/{self._owner}/{self._repo}/commits", name="/owner/repo/commits [soak]", ) @task(1) def api_health(self) -> None: # Lightweight endpoint to verify no connection leak self.client.get( "/api/v1/openapi.json", name="/api/v1/openapi.json [soak]", ) # ── write-heavy: 50 concurrent push users ───────────────────────────────────── class WritePushUser(HttpUser): """Simulates concurrent muse push clients. Each user: pre-upload one object, then push a commit referencing it. Verify: no 5xx, no object corruption, DB row counts consistent. """ wait_time = between(1.0, 3.0) def on_start(self) -> None: self._owner = _OWNER self._repo = _REPO self._headers = { **_auth_headers(), "Content-Type": "application/x-msgpack", } # Resolve repo_id once per virtual user resp = self.client.get(f"/api/v1/repos/{self._owner}/{self._repo}") if resp.status_code == 200: self._repo_id = resp.json().get("repo_id", "") else: self._repo_id = "" @task def push_one_commit(self) -> None: if not self._repo_id: return obj_id, obj_bytes = _make_object() commit_id = blob_id(os.urandom(32)) # Phase 1 — pre-upload object pre = self.client.post( f"/wire/{self._repo_id}/push/objects", data=_mp({"objects": [{ "object_id": obj_id, "size_bytes": len(obj_bytes), "path": f"blob/{commit_id}.bin", "repo_id": self._repo_id, }]}), headers=self._headers, name="/wire/push/objects [write]", ) if pre.status_code not in (200, 409): # 409 = already exists return # Phase 2 — push commit self.client.post( f"/wire/{self._repo_id}/push", data=_mp({ "commits": [{ "commit_id": commit_id, "parent_ids": [], "branch": "load-test", "message": f"load-test commit {commit_id}", "author": "load-test", "timestamp": int(time.time()), "snapshot": {}, "tags": [], }], "snapshots": [], "refs": {"load-test": commit_id}, }), headers=self._headers, name="/wire/push [write]", ) # ── event hooks: print summary thresholds after run ────────────────────────── @events.quitting.add_listener def _check_thresholds(environment, **kwargs): # type: ignore[no-untyped-def] stats = environment.stats.total if stats.num_requests == 0: return p99_ms = stats.get_response_time_percentile(0.99) error_pct = 100 * stats.num_failures / stats.num_requests print("\n── Load test summary ────────────────────────────────") print(f" Requests : {stats.num_requests}") print(f" Failures : {stats.num_failures} ({error_pct:.2f} %)") print(f" p50 : {stats.get_response_time_percentile(0.50):.0f} ms") print(f" p95 : {stats.get_response_time_percentile(0.95):.0f} ms") print(f" p99 : {p99_ms:.0f} ms") print("─────────────────────────────────────────────────────") if p99_ms > 500: print(f" ⚠️ p99 {p99_ms:.0f} ms EXCEEDS 500 ms target") environment.process_exit_code = 1 else: print(f" ✅ p99 {p99_ms:.0f} ms within 500 ms target") if error_pct > 0.1: print(f" ⚠️ error rate {error_pct:.2f} % EXCEEDS 0.1 % target") environment.process_exit_code = 1 else: print(f" ✅ error rate {error_pct:.2f} % within 0.1 % target")