gabriel / musehub public

locustfile.py file-level

at sha256:3 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
1 """MuseHub load-test scenarios — run with Locust against a live staging instance.
2
3 Install:
4 pip install locust
5
6 Usage (all scenarios share this file; select a scenario by naming its user class(es) after the locustfile, or with --class-picker in the web UI):
7
8 # Baseline: 100 concurrent users, read-heavy, verify p99 < 500 ms
9 locust -f locustfile.py --class-picker \
10 --host https://staging.musehub.ai \
11 --users 100 --spawn-rate 10 \
12 --run-time 5m --headless \
13 --html baseline-report.html
14
15 # Spike: ramp to 1000 users in 30 s, hold 60 s — expect 429s, no crashes
16 locust -f locustfile.py BaselineUser SpikeBurst \
17 --host https://staging.musehub.ai \
18 --users 1000 --spawn-rate 100 \
19 --run-time 90s --headless \
20 --html spike-report.html
21
22 # Soak: sustained moderate load 12 h — watch for memory / connection leaks
23 locust -f locustfile.py SoakUser \
24 --host https://staging.musehub.ai \
25 --users 30 --spawn-rate 2 \
26 --run-time 12h --headless \
27 --html soak-report.html
28
29 # Write-heavy: 50 concurrent push users for 5 min
30 locust -f locustfile.py WritePushUser \
31 --host https://staging.musehub.ai \
32 --users 50 --spawn-rate 5 \
33 --run-time 5m --headless \
34 --html write-report.html
35
36 Environment:
37 MUSEHUB_TOKEN — MSign token for authenticated requests
38 MUSEHUB_OWNER — repo owner (default: gabriel)
39 MUSEHUB_REPO — target repo slug (default: muse)
40 PUSH_OBJECT_KB — size of synthetic push payload in KiB (default: 4)
41
42 Success criteria:
43 Baseline : p99 < 500 ms, error rate < 0.1 %
44 Spike : no 5xx; 429s expected and counted, not failures; Retry-After present
45 Soak : RSS growth < 50 MiB over 12 h (check via /debug/memory if enabled)
46 Write : no object-store corruption; DB row counts consistent post-run
47 """
48 from __future__ import annotations
49
50 import os
51 import time
52 import uuid
53
54 from muse.core.types import blob_id
55
56 import msgpack
57 from locust import HttpUser, between, constant, events, task
58
59 # ── config from environment ───────────────────────────────────────────────────
# Bearer token sent on authenticated requests; empty string means "no auth".
_TOKEN = os.environ.get("MUSEHUB_TOKEN", "")
# Owner and slug of the repository every scenario targets.
_OWNER = os.environ.get("MUSEHUB_OWNER", "gabriel")
_REPO = os.environ.get("MUSEHUB_REPO", "muse")
# Size of the synthetic push payload, in KiB (parsed once at import time).
_PUSH_KB = int(os.environ.get("PUSH_OBJECT_KB", "4"))
64
65
def _auth_headers() -> dict[str, str]:
    """Return the ``Authorization`` header dict, or ``{}`` when no token is set.

    The token is read from the module-level ``_TOKEN`` on every call.
    """
    return {"Authorization": f"Bearer {_TOKEN}"} if _TOKEN else {}
70
71
def _mp(data: object) -> bytes:
    """Serialize ``data`` to MessagePack bytes with ``use_bin_type=True``."""
    packed = msgpack.packb(data, use_bin_type=True)
    return packed
74
75
def _make_object(size_kb: int = _PUSH_KB) -> tuple[str, bytes]:
    """Build a synthetic random blob of ``size_kb`` KiB.

    Returns:
        A ``(object_id, raw_bytes)`` pair, where ``object_id`` is whatever
        ``blob_id`` yields for the bytes (documented in this file as the
        canonical ``sha256:<hex>`` form).
    """
    num_bytes = size_kb * 1024
    blob = os.urandom(num_bytes)
    object_id = blob_id(blob)
    return object_id, blob
83
84
85 # ── baseline: read-heavy, mixed public endpoints ──────────────────────────────
86
class BaselineUser(HttpUser):
    """Read-heavy virtual user browsing repo, commit, issue and API pages.

    Target: p99 < 500 ms at 100 concurrent users.
    """

    wait_time = between(0.5, 2.0)
    weight = 8  # 80 % of users in mixed runs

    def on_start(self) -> None:
        """Capture the target owner/repo for this virtual user."""
        self._owner, self._repo = _OWNER, _REPO

    @task(5)
    def view_repo_home(self) -> None:
        """GET the repository landing page."""
        url = f"/{self._owner}/{self._repo}"
        self.client.get(url, name="/owner/repo")

    @task(4)
    def list_commits(self) -> None:
        """GET the HTML commit list."""
        url = f"/{self._owner}/{self._repo}/commits"
        self.client.get(url, name="/owner/repo/commits")

    @task(3)
    def list_issues(self) -> None:
        """GET the HTML issue list."""
        url = f"/{self._owner}/{self._repo}/issues"
        self.client.get(url, name="/owner/repo/issues")

    @task(2)
    def view_proposals(self) -> None:
        """GET the HTML proposals list."""
        url = f"/{self._owner}/{self._repo}/proposals"
        self.client.get(url, name="/owner/repo/proposals")

    @task(2)
    def browse_tree(self) -> None:
        """GET the file tree at the ``main`` ref."""
        url = f"/{self._owner}/{self._repo}/tree/main"
        self.client.get(url, name="/owner/repo/tree/ref")

    @task(1)
    def api_repo_info(self) -> None:
        """GET repository metadata from the JSON API (authenticated if a token is set)."""
        url = f"/api/v1/repos/{self._owner}/{self._repo}"
        self.client.get(
            url,
            name="/api/v1/repos/owner/repo",
            headers=_auth_headers(),
        )

    @task(1)
    def api_list_commits(self) -> None:
        """GET the commit list from the JSON API (authenticated if a token is set)."""
        url = f"/api/v1/repos/{self._owner}/{self._repo}/commits"
        self.client.get(
            url,
            name="/api/v1/repos/owner/repo/commits",
            headers=_auth_headers(),
        )
147
148
149 # ── spike: intentional rate-limit testing ────────────────────────────────────
150
class SpikeBurst(HttpUser):
    """Hammers a single lightweight endpoint to provoke 429s.

    Expect: 429 responses with a Retry-After header, no 5xx.
    """

    wait_time = constant(0)  # fire as fast as possible
    weight = 2  # 20 % of users in mixed runs

    @task
    def burst_search(self) -> None:
        """Issue one search request and classify the outcome.

        * 429 with ``Retry-After``  -> recorded as success (rate limiting works).
        * 429 without ``Retry-After`` -> recorded as a failure.
        * 5xx                       -> recorded as a failure.
        * anything else             -> left to Locust's default status handling.
        """
        with self.client.get(
            "/api/v1/search",
            params={"q": "test"},
            name="/api/v1/search [burst]",
            catch_response=True,
            headers=_auth_headers(),
        ) as resp:
            if resp.status_code == 429:
                # Report through the response object instead of ``assert``:
                # asserts vanish under ``python -O``, and an AssertionError
                # raised here would escape the context manager so the request
                # would never be counted as a failure in the stats.
                if "retry-after" in resp.headers:
                    resp.success()  # 429 is expected, not a failure
                else:
                    resp.failure("429 missing Retry-After")
            elif resp.status_code >= 500:
                resp.failure(f"Server error {resp.status_code}")
174
175
176 # ── soak: sustained moderate load ─────────────────────────────────────────────
177
class SoakUser(HttpUser):
    """Steady, moderate-rate user intended for 12-hour leak detection runs.

    Watch metrics:
      - RSS growth via /debug/memory (if MUSEHUB_DEBUG_MEMORY is enabled on staging)
      - DB connection count (pg_stat_activity)
      - Error rate must stay < 0.05 %
    """

    wait_time = between(2.0, 5.0)

    def on_start(self) -> None:
        """Capture the target owner/repo for this virtual user."""
        self._owner, self._repo = _OWNER, _REPO

    @task(4)
    def view_repo(self) -> None:
        """GET the repository landing page."""
        url = f"/{self._owner}/{self._repo}"
        self.client.get(url, name="/owner/repo [soak]")

    @task(2)
    def list_commits(self) -> None:
        """GET the HTML commit list."""
        url = f"/{self._owner}/{self._repo}/commits"
        self.client.get(url, name="/owner/repo/commits [soak]")

    @task(1)
    def api_health(self) -> None:
        """GET the OpenAPI document, used here as a connection-leak probe."""
        # Lightweight endpoint to verify no connection leak
        url = "/api/v1/openapi.json"
        self.client.get(url, name="/api/v1/openapi.json [soak]")
211
212
213 # ── write-heavy: 50 concurrent push users ─────────────────────────────────────
214
class WritePushUser(HttpUser):
    """Simulates concurrent muse push clients.

    Each iteration announces one synthetic object on the pre-upload endpoint,
    then pushes a single parentless commit to the ``load-test`` branch.
    Verify: no 5xx, no object corruption, DB row counts consistent.
    """

    wait_time = between(1.0, 3.0)

    def on_start(self) -> None:
        """Prepare request headers and resolve the target ``repo_id`` once.

        ``self._repo_id`` is left as ``""`` when the lookup fails (non-200
        status, unparsable body, or missing key); ``push_one_commit`` then
        becomes a no-op for this virtual user.
        """
        self._owner = _OWNER
        self._repo = _REPO
        self._headers = {
            **_auth_headers(),
            "Content-Type": "application/x-msgpack",
        }
        self._repo_id = ""

        # Resolve repo_id once per virtual user.  Send the auth headers like
        # every other API call in this file so the lookup also works when the
        # endpoint requires authentication.
        resp = self.client.get(
            f"/api/v1/repos/{self._owner}/{self._repo}",
            headers=_auth_headers(),
        )
        if resp.status_code != 200:
            return
        try:
            body = resp.json()
        except ValueError:
            # A 200 with a non-JSON body must not crash on_start.
            return
        if isinstance(body, dict):
            self._repo_id = body.get("repo_id", "") or ""

    @task
    def push_one_commit(self) -> None:
        """Run the two-phase push: object pre-upload, then the commit push."""
        if not self._repo_id:
            return

        obj_id, obj_bytes = _make_object()
        commit_id = blob_id(os.urandom(32))

        # Phase 1 — pre-upload object
        # NOTE(review): only the object's metadata is sent here; ``obj_bytes``
        # itself is never transmitted.  Confirm against the wire protocol
        # whether the payload belongs in this request.
        with self.client.post(
            f"/wire/{self._repo_id}/push/objects",
            data=_mp({"objects": [{
                "object_id": obj_id,
                "size_bytes": len(obj_bytes),
                "path": f"blob/{commit_id}.bin",
                "repo_id": self._repo_id,
            }]}),
            headers=self._headers,
            name="/wire/push/objects [write]",
            catch_response=True,
        ) as pre:
            if pre.status_code == 409:
                # 409 = already exists; acceptable, so keep it out of the
                # failure statistics instead of letting Locust count it.
                pre.success()
            accepted = pre.status_code in (200, 409)
        if not accepted:
            return

        # Phase 2 — push commit
        self.client.post(
            f"/wire/{self._repo_id}/push",
            data=_mp({
                "commits": [{
                    "commit_id": commit_id,
                    "parent_ids": [],
                    "branch": "load-test",
                    "message": f"load-test commit {commit_id}",
                    "author": "load-test",
                    "timestamp": int(time.time()),
                    "snapshot": {},
                    "tags": [],
                }],
                "snapshots": [],
                "refs": {"load-test": commit_id},
            }),
            headers=self._headers,
            name="/wire/push [write]",
        )
282
283
284 # ── event hooks: print summary thresholds after run ──────────────────────────
285
@events.quitting.add_listener
def _check_thresholds(environment, **kwargs):  # type: ignore[no-untyped-def]
    """Print a run summary and fail the process when targets are missed.

    Reads the aggregated stats from ``environment.stats.total``.  Does nothing
    when no requests were made.  Sets ``environment.process_exit_code`` to 1
    if p99 latency exceeds 500 ms or the error rate exceeds 0.1 %.
    """
    p99_target_ms = 500
    error_target_pct = 0.1

    stats = environment.stats.total
    if stats.num_requests == 0:
        return

    p99_ms = stats.get_response_time_percentile(0.99)
    error_pct = 100 * stats.num_failures / stats.num_requests

    print("\n── Load test summary ────────────────────────────────")
    print(f" Requests : {stats.num_requests}")
    print(f" Failures : {stats.num_failures} ({error_pct:.2f} %)")
    print(f" p50 : {stats.get_response_time_percentile(0.50):.0f} ms")
    print(f" p95 : {stats.get_response_time_percentile(0.95):.0f} ms")
    print(f" p99 : {p99_ms:.0f} ms")
    print("─────────────────────────────────────────────────────")

    if p99_ms > p99_target_ms:
        print(f" ⚠️ p99 {p99_ms:.0f} ms EXCEEDS {p99_target_ms} ms target")
        environment.process_exit_code = 1
    else:
        print(f" ✅ p99 {p99_ms:.0f} ms within {p99_target_ms} ms target")

    if error_pct > error_target_pct:
        print(f" ⚠️ error rate {error_pct:.2f} % EXCEEDS {error_target_pct} % target")
        environment.process_exit_code = 1
    else:
        print(f" ✅ error rate {error_pct:.2f} % within {error_target_pct} % target")