test_load_readiness.py
python
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2
feat: add repair-commit wire endpoint (API parity with repa…
Opus 4.8
minor
⚠ breaking
1 day ago
| 1 | """Section 6.4 — Load-testing readiness tests. |
| 2 | |
| 3 | These are structural/configuration tests that verify MuseHub is *ready* for |
| 4 | the load tests described in the pre-launch checklist. The actual load tests |
| 5 | (Locust scenarios) live in deploy/load-tests/locustfile.py and are run |
| 6 | manually against staging. |
| 7 | |
| 8 | What we verify here: |
| 9 | Baseline readiness : DB pool sized for concurrent users; uvicorn worker |
| 10 | config present; rate limits not accidentally tightened. |
| 11 | Spike readiness : 429 responses carry Retry-After; rate limit fires |
| 12 | before 5xx; BotThrottleMiddleware blocks bad UAs. |
| 13 | Soak readiness : MemoryLogMiddleware is registered in the app stack; |
| 14 | rss_mb() function is callable. |
| 15 | Write readiness : concurrent in-process push requests don't deadlock or |
| 16 | raise unhandled exceptions; wire_push quota guard fires |
| 17 | before any DB write is attempted. |
| 18 | Locustfile present : deploy/load-tests/locustfile.py exists and is valid |
| 19 | Python with the four expected user classes. |
| 20 | """ |
| 21 | from __future__ import annotations |
| 22 | |
| 23 | import ast |
| 24 | import importlib |
| 25 | import inspect |
| 26 | import re |
| 27 | import secrets |
| 28 | import sys |
| 29 | import time |
| 30 | from pathlib import Path |
| 31 | |
| 32 | import msgpack |
| 33 | import pytest |
| 34 | from httpx import AsyncClient |
| 35 | from musehub.types.json_types import JSONObject |
| 36 | |
| 37 | _REPO_ROOT = Path(__file__).resolve().parents[1] |
| 38 | _LOCUSTFILE = _REPO_ROOT / "deploy" / "load-tests" / "locustfile.py" |
| 39 | _ENTRYPOINT = _REPO_ROOT / "entrypoint.sh" |
| 40 | |
| 41 | |
| 42 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 43 | # Locustfile hygiene |
| 44 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 45 | |
class TestLocustfile:
    """Hygiene checks on the Locust scenario file located at ``_LOCUSTFILE``."""

    @staticmethod
    def _locust_source() -> str:
        """Return the locustfile's text, read fresh from disk on every call."""
        return _LOCUSTFILE.read_text()

    def test_locustfile_exists(self) -> None:
        """The locustfile must be present on disk."""
        present = _LOCUSTFILE.exists()
        assert present, "deploy/load-tests/locustfile.py must exist"

    def test_locustfile_is_valid_python(self) -> None:
        """Parsing the locustfile must succeed; ``ast.parse`` raises SyntaxError otherwise."""
        ast.parse(self._locust_source())

    def test_baseline_user_class_present(self) -> None:
        """The source text must contain a ``BaselineUser`` class statement."""
        assert "class BaselineUser" in self._locust_source()

    def test_spike_user_class_present(self) -> None:
        """The source text must contain a class whose name starts with ``SpikeBurst``."""
        assert "class SpikeBurst" in self._locust_source()

    def test_soak_user_class_present(self) -> None:
        """The source text must contain a ``SoakUser`` class statement."""
        assert "class SoakUser" in self._locust_source()

    def test_write_push_user_class_present(self) -> None:
        """The source text must contain a ``WritePushUser`` class statement."""
        assert "class WritePushUser" in self._locust_source()

    def test_p99_threshold_documented(self) -> None:
        """The literal ``500`` (the p99 latency budget in ms) must appear somewhere."""
        text = self._locust_source()
        assert "500" in text, "p99 < 500 ms threshold must be documented"

    def test_soak_duration_documented(self) -> None:
        """Either spelling of the 12-hour soak duration must appear somewhere."""
        text = self._locust_source()
        assert any(marker in text for marker in ("12h", "12 h")), (
            "12-hour soak duration must be documented"
        )
| 77 | |
| 78 | |
| 79 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 80 | # Baseline readiness: concurrency capacity |
| 81 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 82 | |
class TestBaselineReadiness:
    """Baseline readiness: pool size, worker count and rate limits fit ~100 users."""

    def test_db_pool_size_handles_100_users(self) -> None:
        """pool_size + max_overflow in database.py must be ≥ 60 concurrent DB connections."""
        from musehub.db import database as db_module

        src = inspect.getsource(db_module)
        # Both values are scraped from the module source as integer literals;
        # fail with a readable message instead of an AttributeError on a
        # ``None`` match when either keyword is missing or not a literal.
        pool_match = re.search(r"pool_size\s*=\s*(\d+)", src)
        overflow_match = re.search(r"max_overflow\s*=\s*(\d+)", src)
        assert pool_match, "database.py must set pool_size to an integer literal"
        assert overflow_match, "database.py must set max_overflow to an integer literal"

        pool_size = int(pool_match.group(1))
        max_overflow = int(overflow_match.group(1))
        total = pool_size + max_overflow
        assert total >= 60, (
            f"pool_size ({pool_size}) + max_overflow ({max_overflow}) = {total} < 60; "
            "increase to handle 100 concurrent users"
        )

    def test_global_rate_limit_allows_100_users(self) -> None:
        """300/min global limit = 5 req/s/IP; should not fire for normal browsing."""
        from musehub.rate_limits import GLOBAL_LIMIT

        # The limit string is parsed as "<count>/<period>"; only the count is checked.
        count = int(GLOBAL_LIMIT.split("/")[0])
        assert count >= 100, (
            f"GLOBAL_LIMIT {GLOBAL_LIMIT!r} too tight for 100-user baseline"
        )

    def test_uvicorn_workers_env_var_used(self) -> None:
        """entrypoint.sh must use UVICORN_WORKERS env var (default ≥ 4)."""
        src = _ENTRYPOINT.read_text()
        assert "UVICORN_WORKERS" in src, "entrypoint.sh must reference UVICORN_WORKERS"
        # The default is expected in shell ``${UVICORN_WORKERS:-N}`` form.
        m = re.search(r"UVICORN_WORKERS:-(\d+)", src)
        assert m, "UVICORN_WORKERS must have a default value"
        assert int(m.group(1)) >= 4, f"UVICORN_WORKERS default must be >= 4, got {m.group(1)}"

    def test_db_pool_timeout_is_set(self) -> None:
        """settings.db_pool_timeout must be a positive number."""
        from musehub.config import settings

        assert settings.db_pool_timeout > 0

    def test_wire_push_limit_is_reasonable(self) -> None:
        """30/min push limit — tight enough to protect disk, loose enough for CI."""
        from musehub.rate_limits import WIRE_PUSH_LIMIT

        count = int(WIRE_PUSH_LIMIT.split("/")[0])
        assert 10 <= count <= 60, (
            f"WIRE_PUSH_LIMIT {WIRE_PUSH_LIMIT!r} out of expected range 10–60/min"
        )
| 127 | |
| 128 | |
| 129 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 130 | # Spike readiness: rate limiting degrades gracefully (429, not 5xx) |
| 131 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 132 | |
class TestSpikeReadiness:
    """Spike readiness: rate limiting must degrade to 429 responses, not 5xx."""

    def test_rate_limit_exceeded_handler_registered(self) -> None:
        """app.exception_handlers must include RateLimitExceeded."""
        from slowapi.errors import RateLimitExceeded
        from musehub.main import app

        assert RateLimitExceeded in app.exception_handlers, (
            "RateLimitExceeded handler not registered — 429s won't be returned"
        )

    def test_handle_rate_limit_adds_retry_after(self) -> None:
        """_handle_rate_limit must set Retry-After on 429 responses."""
        from musehub.main import _handle_rate_limit

        src = inspect.getsource(_handle_rate_limit)
        assert "Retry-After" in src, "_handle_rate_limit must set Retry-After header"

    def test_429_responses_include_retry_after_on_burst(self, client: AsyncClient) -> None:
        """Static stand-in for the burst check: the 429 handler must mention Retry-After.

        NOTE(review): despite the name, no request is sent through ``client``;
        this performs the same source inspection as
        ``test_handle_rate_limit_adds_retry_after``.

        TODO: drive a real burst. The original author's notes: the endpoint
        must be hit more than the limit allows within one window; in the test
        environment slowapi uses in-memory storage that conftest resets per
        test; the auth endpoint has a tight per-route limit and is the
        suggested target.
        """
        from musehub.main import _handle_rate_limit

        src = inspect.getsource(_handle_rate_limit)
        assert "Retry-After" in src, "_handle_rate_limit must set Retry-After header"

    def test_bot_throttle_middleware_registered(self) -> None:
        """BotThrottleMiddleware must appear in the app's user middleware list."""
        # Imported only to prove the middleware module is importable.
        from musehub.middleware.bot_throttle import BotThrottleMiddleware  # noqa: F401
        from musehub.main import app

        # Starlette stores middleware as Middleware objects; check .cls attribute.
        cls_names = [
            (m.cls.__name__ if hasattr(m, "cls") else type(m).__name__)
            for m in app.user_middleware
        ]
        assert "BotThrottleMiddleware" in cls_names, (
            "BotThrottleMiddleware not in middleware stack"
        )

    def test_rate_limit_handler_calls_slowapi_handler(self) -> None:
        """_handle_rate_limit must delegate to slowapi's handler (which returns 429)."""
        from musehub.main import _handle_rate_limit

        src = inspect.getsource(_handle_rate_limit)
        # Must call the slowapi handler (which sets status_code=429)
        assert "_rate_limit_exceeded_handler" in src, (
            "_handle_rate_limit must call slowapi's _rate_limit_exceeded_handler"
        )
        # Must return (not raise) — no bare `raise` without a guard
        assert "return result" in src, (
            "_handle_rate_limit must return the handler result rather than raise"
        )
| 183 | |
| 184 | |
| 185 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 186 | # Soak readiness: memory leak detection infrastructure |
| 187 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 188 | |
class TestSoakReadiness:
    """Soak readiness: the memory-tracking infrastructure is wired into the app."""

    def test_memory_log_middleware_registered(self) -> None:
        """MemoryLogMiddleware must appear in the app's user middleware list."""
        # Imported only to prove the middleware module is importable.
        from musehub.debug.memory import MemoryLogMiddleware  # noqa: F401
        from musehub.main import app

        cls_names = [
            (m.cls.__name__ if hasattr(m, "cls") else type(m).__name__)
            for m in app.user_middleware
        ]
        assert "MemoryLogMiddleware" in cls_names, (
            "MemoryLogMiddleware not in middleware stack — RSS won't be tracked"
        )

    def test_memory_log_middleware_warn_threshold(self) -> None:
        """MemoryLogMiddleware must be registered with a sane RSS warn threshold."""
        from musehub.main import app

        for m in app.user_middleware:
            cls = getattr(m, "cls", None)
            if cls and cls.__name__ == "MemoryLogMiddleware":
                kwargs = getattr(m, "kwargs", {})
                # When the kwarg is not passed explicitly, 400.0 is checked instead.
                warn_mb = kwargs.get("warn_above_mb", 400.0)
                assert warn_mb >= 200, f"warn_above_mb={warn_mb} is too low — would flood logs"
                assert warn_mb <= 2048, f"warn_above_mb={warn_mb} is too high — leaks won't be caught"
                return
        pytest.fail("MemoryLogMiddleware not found in middleware stack")

    def test_rss_mb_callable(self) -> None:
        """rss_mb() must return a float (or -1.0 if psutil absent)."""
        from musehub.debug.memory import rss_mb

        result = rss_mb()
        assert isinstance(result, float)

    def test_profile_task_context_manager_works(self) -> None:
        """profile_task must enter/exit without error (regression guard)."""
        import asyncio
        from musehub.debug.memory import profile_task

        async def _run() -> None:
            async with profile_task("test-soak"):
                await asyncio.sleep(0)

        asyncio.run(_run())

    def test_main_registers_memory_log_as_outer_wrapper(self) -> None:
        """MemoryLogMiddleware must be the outermost middleware (last add_middleware call).

        In Starlette, the last add_middleware() call becomes the outermost ASGI layer
        (user_middleware[0]). MemoryLogMiddleware needs full request visibility so it
        must wrap BotThrottleMiddleware, not the other way around.
        """
        from musehub.main import app

        names = [
            (m.cls.__name__ if hasattr(m, "cls") else type(m).__name__)
            for m in app.user_middleware
        ]
        # Fail with a readable message rather than a ValueError from .index().
        assert "MemoryLogMiddleware" in names, "MemoryLogMiddleware not in middleware stack"
        assert "BotThrottleMiddleware" in names, "BotThrottleMiddleware not in middleware stack"
        ml_idx = names.index("MemoryLogMiddleware")
        bt_idx = names.index("BotThrottleMiddleware")
        # Lower index = later registration = outermost in ASGI chain.
        assert ml_idx < bt_idx, (
            f"MemoryLogMiddleware (index {ml_idx}) must be registered after "
            f"BotThrottleMiddleware (index {bt_idx}) so it is the outermost layer"
        )
| 250 | |
| 251 | |
| 252 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 253 | # Write-heavy readiness: concurrent push handling |
| 254 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 255 | |
class TestWriteHeavyReadiness:
    """Verify the push endpoint can handle concurrent in-process calls.

    NOTE(review): ``AsyncSession`` and ``StrDict`` are used in annotations below
    but are not imported at the top of this module. This presumably only works
    because ``from __future__ import annotations`` keeps annotations unevaluated;
    confirm, and import them if type checking is expected to pass.
    """

    def _mp(self, data: JSONObject) -> bytes:
        """Serialize *data* to msgpack, encoding ``bytes`` values as the bin type."""
        return msgpack.packb(data, use_bin_type=True)

    def _make_commit(self, repo_id: str) -> JSONObject:
        """Build a WireCommit-compatible dict.

        A fresh random 64-hex-character commit id is generated per call and its
        first 8 characters name the branch, so each commit targets its own branch.
        ``repo_id`` is accepted but not used in the returned dict.
        """
        cid = secrets.token_hex(32)
        return {
            "commit_id": cid,
            "branch": f"load-test-{cid[:8]}",
            "message": "load test commit",
            "author": "load-test",
        }

    async def test_concurrent_push_preflight_does_not_deadlock(
        self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict
    ) -> None:
        """10 concurrent push pre-upload calls must all complete without hanging."""
        import asyncio
        import os
        from muse.core.types import blob_id, split_id
        from tests.factories import create_repo

        repo = await create_repo(db_session, owner="gabriel", visibility="public")
        payloads = []
        for _ in range(10):
            data = os.urandom(512)
            sha = blob_id(data)
            payloads.append((sha, data, f"blob/{split_id(sha)[1][:8]}.bin"))

        async def _pre_upload(obj_id: str, content: bytes, path: str) -> int:
            """POST one object to the pre-upload endpoint and return the HTTP status."""
            body = self._mp({"objects": [{
                "object_id": obj_id,
                "content": content,
                "path": path,
            }]})
            resp = await client.post(
                f"/wire/{repo.repo_id}/push/objects",
                content=body,
                headers={**auth_headers, "Content-Type": "application/x-msgpack"},
            )
            return resp.status_code

        codes = await asyncio.gather(*[
            _pre_upload(oid, content, p) for oid, content, p in payloads
        ])
        # All must succeed or give a known error — never a 500
        for code in codes:
            assert code != 500, f"Concurrent pre-upload returned 500 (all status codes: {codes})"
        # At least one should succeed (200) or be a quota/validation issue (4xx)
        assert any(c < 500 for c in codes), (
            f"Every concurrent pre-upload returned a 5xx status: {codes}"
        )

    async def test_concurrent_push_commits_do_not_corrupt_db(
        self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict
    ) -> None:
        """5 concurrent push commit calls on different branches must not produce a 500."""
        import asyncio
        from tests.factories import create_repo

        repo = await create_repo(db_session, owner="gabriel", visibility="public")

        commits = [self._make_commit(repo.repo_id) for _ in range(5)]

        async def _push(commit: JSONObject) -> tuple[int, str]:
            """Push a single-commit pack and return ``(status_code, commit_id)``."""
            body = self._mp({
                "mpack": {
                    "commits": [commit],
                    "snapshots": [],
                    "objects": [],
                    "branch_heads": {commit["branch"]: commit["commit_id"]},
                },
                "branch": commit["branch"],
                "force": False,
            })
            resp = await client.post(
                f"/wire/{repo.repo_id}/push",
                content=body,
                headers={**auth_headers, "Content-Type": "application/x-msgpack"},
            )
            # commit_id is already a hex string; str() only pins the static type.
            return resp.status_code, str(commit["commit_id"])

        results = await asyncio.gather(*[_push(c) for c in commits])

        for code, cid in results:
            assert code != 500, f"Commit {cid[:8]} caused 500"

    def test_wire_push_limit_is_not_zero(self) -> None:
        """WIRE_PUSH_LIMIT must allow at least 1 push/minute."""
        from musehub.rate_limits import WIRE_PUSH_LIMIT

        count = int(WIRE_PUSH_LIMIT.split("/")[0])
        assert count > 0

    def test_per_repo_quota_config_exists(self) -> None:
        """settings.per_repo_quota_bytes must be set and at least 1 GiB."""
        from musehub.config import settings

        assert settings.per_repo_quota_bytes > 0
        # 5 GiB default
        assert settings.per_repo_quota_bytes >= 1 * 1024 * 1024 * 1024

    def test_object_retention_days_config_exists(self) -> None:
        """settings.object_retention_days must be a positive number."""
        from musehub.config import settings

        assert settings.object_retention_days > 0
| 359 | |
| 360 | |
| 361 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 362 | # Complete: locustfile is a standalone script (no FastAPI import side-effects) |
| 363 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 364 | |
class TestLocustfileIsolation:
    """The locustfile must stay runnable without the musehub package installed."""

    def test_locustfile_does_not_import_musehub(self) -> None:
        """Locustfile must not import musehub — it's a standalone script for staging."""
        tree = ast.parse(_LOCUSTFILE.read_text())

        # Collect every imported module path, at any nesting depth.
        imported: list[str] = []
        for node in ast.walk(tree):
            if isinstance(node, ast.ImportFrom):
                # ``module`` is None for ``from . import x``; record it as "".
                imported.append(node.module or "")
            elif isinstance(node, ast.Import):
                imported.extend(alias.name for alias in node.names)

        # Match the package itself or a dotted submodule only, so an unrelated
        # package whose name merely begins with "musehub" is not flagged.
        offenders = sorted(
            {name for name in imported if name == "musehub" or name.startswith("musehub.")}
        )
        assert not offenders, (
            f"locustfile imports {', '.join(offenders)} — must be standalone"
        )
File History
1 commit
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2
feat: add repair-commit wire endpoint (API parity with repa…
Opus 4.8
minor
⚠
1 day ago