"""Section 34 — Rate Limiting (7-layer test suite). Covers: - musehub/rate_limits.py: limiter, WIRE_PUSH_LIMIT, WIRE_FETCH_LIMIT, MCP_LIMIT, AUTH_LIMIT, SEARCH_LIMIT, MCP_PUSH_LIMIT - 429 response format (JSON body with "error" key) - Per-IP isolation via key_func - Limit reset behaviour - Auth does not bypass rate limits Test environment notes: - AUTH_LIMIT = "10000/minute" in test env — auth routes never 429 in tests - WIRE_PUSH_LIMIT = "30/minute" — trigger by making 31 calls - WIRE_FETCH_LIMIT = "120/minute" - reset_rate_limiter (autouse=True in conftest) resets storage before each test - auth_headers fixture overrides require_signed_request globally - Wire push endpoint used to trigger limits: POST /{owner}/{slug}/tags (wire_push_tags, body: {"tags": []}) """ from __future__ import annotations import secrets import time from collections.abc import AsyncGenerator import pytest import pytest_asyncio from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from datetime import datetime, timezone from musehub.core.genesis import compute_identity_id, compute_repo_id from musehub.db.musehub_repo_models import MusehubRepo from musehub.main import app from musehub.types.json_types import JSONObject, StrDict from musehub.rate_limits import ( AUTH_LIMIT, MCP_LIMIT, MCP_PUSH_LIMIT, SEARCH_LIMIT, WIRE_FETCH_LIMIT, WIRE_PUSH_LIMIT, limiter, ) # ── limits as integers for parametrized loops ───────────────────────────────── _PUSH_N = 30 # WIRE_PUSH_LIMIT _FETCH_N = 120 # WIRE_FETCH_LIMIT # owner matching the testuser identity injected by auth_headers _OWNER = "testuser" def _uid() -> str: return secrets.token_hex(16) async def _make_repo( session: AsyncSession, owner: str = _OWNER, slug: str | None = None, ) -> MusehubRepo: slug = slug or f"rl-repo-{_uid()[:8]}" created_at = datetime.now(tz=timezone.utc) owner_id = compute_identity_id(owner.encode()) repo = MusehubRepo( repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()), name=slug, 
slug=slug, owner=owner, owner_user_id=owner_id, visibility="public", created_at=created_at, updated_at=created_at, ) session.add(repo) await session.commit() return repo def _push_url(repo: MusehubRepo) -> str: return f"/{repo.owner}/{repo.slug}/tags" def _refs_url(repo: MusehubRepo) -> str: return f"/{repo.owner}/{repo.slug}/refs" def _empty_tags_body() -> JSONObject: return {"tags": []} # ══════════════════════════════════════════════════════════════════════════════ # 1. Unit # ══════════════════════════════════════════════════════════════════════════════ class TestRateLimitUnit: """Isolated tests of the constants, limiter config, and env-aware logic.""" def test_wire_push_limit_is_30_per_minute(self) -> None: assert WIRE_PUSH_LIMIT == "30/minute" def test_wire_fetch_limit_is_120_per_minute(self) -> None: assert WIRE_FETCH_LIMIT == "120/minute" def test_search_limit_is_60_per_minute(self) -> None: assert SEARCH_LIMIT == "60/minute" def test_mcp_push_limit_is_30_per_minute(self) -> None: assert MCP_PUSH_LIMIT == "30/minute" def test_mcp_limit_from_settings(self) -> None: from musehub.config import settings assert MCP_LIMIT == settings.mcp_rate_limit_agent def test_auth_limit_is_high_in_test_env(self) -> None: # In test env MUSE_ENV=test so AUTH_LIMIT is raised to avoid tripping # during rapid test runs. 
assert AUTH_LIMIT == "10000/minute" def test_auth_limit_format_valid(self) -> None: parts = AUTH_LIMIT.split("/") assert len(parts) == 2 assert parts[0].isdigit() assert parts[1] in ("second", "minute", "hour", "day") def test_wire_push_limit_format_valid(self) -> None: n, period = WIRE_PUSH_LIMIT.split("/") assert int(n) == 30 assert period == "minute" def test_limiter_uses_get_remote_address(self) -> None: from slowapi.util import get_remote_address assert limiter._key_func is get_remote_address def test_limiter_is_singleton(self) -> None: from musehub.rate_limits import limiter as limiter2 assert limiter is limiter2 def test_limiter_storage_is_memory_storage(self) -> None: from limits.storage.memory import MemoryStorage assert isinstance(limiter._storage, MemoryStorage) def test_all_limits_are_strings(self) -> None: for name, val in [ ("WIRE_PUSH_LIMIT", WIRE_PUSH_LIMIT), ("WIRE_FETCH_LIMIT", WIRE_FETCH_LIMIT), ("MCP_LIMIT", MCP_LIMIT), ("AUTH_LIMIT", AUTH_LIMIT), ("SEARCH_LIMIT", SEARCH_LIMIT), ("MCP_PUSH_LIMIT", MCP_PUSH_LIMIT), ]: assert isinstance(val, str), f"{name} must be a str" def test_push_limit_tighter_than_fetch_limit(self) -> None: push_n = int(WIRE_PUSH_LIMIT.split("/")[0]) fetch_n = int(WIRE_FETCH_LIMIT.split("/")[0]) assert push_n < fetch_n, "Push is expensive; its cap must be tighter than fetch" def test_mcp_push_limit_not_higher_than_mcp_limit(self) -> None: mcp_n = int(MCP_LIMIT.split("/")[0]) mcp_push_n = int(MCP_PUSH_LIMIT.split("/")[0]) assert mcp_push_n <= mcp_n # ══════════════════════════════════════════════════════════════════════════════ # 2. 
# ══════════════════════════════════════════════════════════════════════════════
# 2. Integration
# ══════════════════════════════════════════════════════════════════════════════


class TestRateLimitIntegration:
    """Real app state and service-layer checks, no HTTP needed except where noted."""

    def test_app_state_has_limiter(self) -> None:
        assert app.state.limiter is limiter

    def test_rate_limit_exceeded_handler_registered(self) -> None:
        from slowapi.errors import RateLimitExceeded

        handlers = app.exception_handlers
        assert RateLimitExceeded in handlers or any(
            exc.__name__ == "RateLimitExceeded"
            for exc in handlers
            if isinstance(exc, type)
        )

    def test_reset_clears_storage(self) -> None:
        storage = limiter._storage
        # Seed some data into the underlying MemoryStorage
        getattr(storage, "storage")["fake_key"] = 99
        assert getattr(storage, "storage")
        limiter.reset()
        assert not getattr(storage, "storage")

    async def test_push_route_within_limit_returns_200(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        resp = await client.post(
            _push_url(repo), json=_empty_tags_body(), headers=auth_headers
        )
        assert resp.status_code == 200

    async def test_push_route_429_after_limit_exceeded(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        # Exhaust the budget
        for _ in range(_PUSH_N):
            r = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
            assert r.status_code == 200
        # Next call must be rate-limited
        over = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        assert over.status_code == 429

    async def test_429_response_has_error_key(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        for _ in range(_PUSH_N):
            await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        resp = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        # Confirm we are inspecting a rate-limit response before checking its body.
        assert resp.status_code == 429
        body = resp.json()
        assert "error" in body
        assert "Rate limit exceeded" in body["error"]

    async def test_rate_limit_resets_allow_new_requests(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        for _ in range(_PUSH_N):
            await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        over = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        assert over.status_code == 429

        limiter.reset()

        # After reset, budget is restored
        resp = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        assert resp.status_code == 200


# ══════════════════════════════════════════════════════════════════════════════
# 3. End-to-End
# ══════════════════════════════════════════════════════════════════════════════


class TestRateLimitE2E:
    """Full HTTP stack with real DB — complete request/response cycle."""

    async def test_first_push_returns_200(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        resp = await client.post(
            _push_url(repo), json=_empty_tags_body(), headers=auth_headers
        )
        assert resp.status_code == 200

    async def test_push_429_body_is_json(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        for _ in range(_PUSH_N):
            await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        resp = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        assert resp.status_code == 429
        # Body must be valid JSON with an "error" field
        body = resp.json()
        assert isinstance(body, dict)
        assert "error" in body

    async def test_refs_endpoint_within_fetch_limit(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        # 10 calls is well within the 120/minute fetch limit. Whatever status
        # the refs handler itself returns for this unauthenticated request,
        # none of the calls may be rejected by the rate limiter (429).
        for _ in range(10):
            resp = await client.get(_refs_url(repo))
            assert resp.status_code != 429

    async def test_push_exhausted_does_not_affect_fetch_limit(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        # Exhaust push budget
        for _ in range(_PUSH_N):
            await client.post(
                _push_url(repo), json=_empty_tags_body(), headers=auth_headers
            )
        # Fetch budget is independent — refs still responds (404 or 200, not 429)
        resp = await client.get(_refs_url(repo))
        assert resp.status_code != 429

    async def test_push_429_includes_error_detail(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        for _ in range(_PUSH_N):
            await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        resp = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        assert resp.status_code == 429
        detail = resp.json()["error"]
        assert detail  # non-empty error message

    async def test_200_responses_do_not_include_rate_limit_error(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        resp = await client.post(
            _push_url(repo), json=_empty_tags_body(), headers=auth_headers
        )
        assert resp.status_code == 200
        body = resp.json()
        assert "error" not in body
# ══════════════════════════════════════════════════════════════════════════════
# 4. Stress
# ══════════════════════════════════════════════════════════════════════════════


class TestRateLimitStress:
    """Boundary conditions and sustained-load behaviour."""

    async def test_exactly_30_calls_all_succeed(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        results: list[int] = []
        for _ in range(_PUSH_N):
            r = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
            results.append(r.status_code)
        assert all(s == 200 for s in results), f"Expected all 200, got: {results}"

    async def test_31st_call_rejected(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        for _ in range(_PUSH_N):
            await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        r = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        assert r.status_code == 429

    async def test_multiple_reset_and_refill_cycles(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        for cycle in range(3):
            for _ in range(_PUSH_N):
                r = await client.post(
                    url, json=_empty_tags_body(), headers=auth_headers
                )
                assert r.status_code == 200, f"Cycle {cycle}: expected 200"
            over = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
            assert over.status_code == 429, f"Cycle {cycle}: expected 429"
            limiter.reset()

    async def test_sequential_burst_does_not_skip_limit(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """All requests happen sequentially — the counter must not skip."""
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        statuses: list[int] = []
        for _ in range(_PUSH_N + 5):
            r = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
            statuses.append(r.status_code)

        # Guard first: list.index() would otherwise raise a bare ValueError
        # with no hint of what the limiter actually returned.
        assert 429 in statuses, f"Limit never triggered in {len(statuses)} calls: {statuses}"
        first_429 = statuses.index(429)
        assert first_429 == _PUSH_N, (
            f"Expected first 429 at position {_PUSH_N}, got {first_429}"
        )
        # All calls after the first 429 must also be 429
        assert all(s == 429 for s in statuses[first_429:])


# ══════════════════════════════════════════════════════════════════════════════
# 5. Data Integrity
# ══════════════════════════════════════════════════════════════════════════════


class TestRateLimitDataIntegrity:
    """Counter correctness, reset fidelity, and isolation between routes."""

    async def test_counter_increments_monotonically(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        # The 30th call must still be 200; the 31st must be 429
        for i in range(_PUSH_N + 1):
            r = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
            if i < _PUSH_N:
                assert r.status_code == 200, f"Call {i + 1} expected 200"
            else:
                assert r.status_code == 429, f"Call {i + 1} expected 429"

    async def test_reset_restores_full_budget(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        # Exhaust
        for _ in range(_PUSH_N):
            await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        assert (
            await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        ).status_code == 429

        # Reset and refill
        limiter.reset()
        for _ in range(_PUSH_N):
            r = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
            assert r.status_code == 200

    async def test_push_and_fetch_limits_are_independent(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Exhausting the push budget must not affect the fetch budget."""
        repo = await _make_repo(db_session)
        for _ in range(_PUSH_N):
            await client.post(
                _push_url(repo), json=_empty_tags_body(), headers=auth_headers
            )
        # Fetch route has its own counter (not shared with push)
        resp = await client.get(_refs_url(repo))
        assert resp.status_code != 429

    async def test_different_repos_have_independent_push_counters(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """slowapi uses key_style='url' — different repo slugs produce different
        URL keys and therefore have independent rate limit counters."""
        repo_a = await _make_repo(db_session)
        repo_b = await _make_repo(db_session)

        # Exhaust repo_a's push budget completely
        for _ in range(_PUSH_N):
            await client.post(
                _push_url(repo_a), json=_empty_tags_body(), headers=auth_headers
            )

        # repo_a must now be rate-limited
        r_a = await client.post(
            _push_url(repo_a), json=_empty_tags_body(), headers=auth_headers
        )
        assert r_a.status_code == 429

        # repo_b has its own independent counter — must not be rate-limited
        r_b = await client.post(
            _push_url(repo_b), json=_empty_tags_body(), headers=auth_headers
        )
        assert r_b.status_code == 200, "Different repo slugs have independent URL-keyed counters"

    async def test_reset_affects_all_counters(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _make_repo(db_session)
        # Partial push usage
        for _ in range(10):
            await client.post(
                _push_url(repo), json=_empty_tags_body(), headers=auth_headers
            )
        limiter.reset()
        # After reset, full budget available again
        for _ in range(_PUSH_N):
            r = await client.post(
                _push_url(repo), json=_empty_tags_body(), headers=auth_headers
            )
            assert r.status_code == 200
# ══════════════════════════════════════════════════════════════════════════════
# 6. Security
# ══════════════════════════════════════════════════════════════════════════════


class TestRateLimitSecurity:
    """Auth does not bypass limits; per-IP isolation works."""

    async def test_valid_auth_does_not_bypass_rate_limit(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Even authenticated requests are rate-limited after the budget is gone."""
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        for _ in range(_PUSH_N):
            await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        r = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        assert r.status_code == 429

    async def test_rate_limit_persists_after_400_responses(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Malformed-body requests must not disable or enlarge the push budget.

        Whether a 4xx response consumes budget depends on whether the limiter
        runs before or after body parsing for this route. The test therefore
        accepts either outcome. Once the malformed requests are done, though,
        valid requests must still be cut off with 429 no later than call
        ``_PUSH_N + 1``, and stay cut off.
        """
        repo = await _make_repo(db_session)
        url = _push_url(repo)

        bad_headers = dict(auth_headers)
        bad_headers["Content-Type"] = "application/json"
        for _ in range(_PUSH_N):
            r = await client.post(url, content=b"{invalid json", headers=bad_headers)
            # Malformed body → 400, or 429 if bad requests already used the budget.
            assert r.status_code in (400, 429)

        # Follow up with enough valid requests to exceed a fresh budget.
        statuses: list[int] = []
        for _ in range(_PUSH_N + 1):
            r = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
            statuses.append(r.status_code)

        # The budget is still tracked: at most _PUSH_N valid calls succeed,
        # then every subsequent call is rejected.
        assert 429 in statuses, f"Limit never triggered after malformed requests: {statuses}"
        first_429 = statuses.index(429)
        assert all(s == 200 for s in statuses[:first_429]), statuses
        assert all(s == 429 for s in statuses[first_429:]), statuses

    async def test_per_ip_isolation(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Two different IPs have independent budgets.

        slowapi composes the storage key as [key_func(request), endpoint_scope].
        Patching key_func on the Limit objects (not limiter._key_func, which is
        only used at decoration time) is the correct way to control the IP seen
        by the limiter at request time.
        """
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        call_count = 0

        def _ip_func(request: object) -> str:
            # Stand-in key_func; the request argument is deliberately ignored.
            nonlocal call_count
            call_count += 1
            # First _PUSH_N calls → IP A; everything after → IP B
            return "192.0.2.1" if call_count <= _PUSH_N else "192.0.2.2"

        # Patch key_func directly on the stored Limit objects
        route_key = "musehub.api.routes.wire.wire_push_tags"
        limits = limiter._route_limits[route_key]
        originals = [lim.key_func for lim in limits]
        for lim in limits:
            lim.key_func = _ip_func

        try:
            # Exhaust IP A's budget
            for _ in range(_PUSH_N):
                r = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
                assert r.status_code == 200

            # IP B has a fresh budget — must not be 429
            r_ip_b = await client.post(
                url, json=_empty_tags_body(), headers=auth_headers
            )
            assert r_ip_b.status_code == 200, "IP B must not inherit IP A's budget"
        finally:
            # Always restore the real key functions so later tests are unaffected.
            for lim, orig in zip(limits, originals):
                lim.key_func = orig

    async def test_rate_limit_key_uses_remote_address(self) -> None:
        """The limiter key function is get_remote_address — verifiable without HTTP."""
        from slowapi.util import get_remote_address

        assert limiter._key_func is get_remote_address

    async def test_rate_limit_response_does_not_leak_internal_paths(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """429 body must not contain stack traces or file paths."""
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        for _ in range(_PUSH_N):
            await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        resp = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        assert resp.status_code == 429
        text = resp.text
        assert "Traceback" not in text
        assert "/musehub/" not in text
        assert ".py" not in text
# ══════════════════════════════════════════════════════════════════════════════
# 7. Performance
# ══════════════════════════════════════════════════════════════════════════════


class TestRateLimitPerformance:
    """Overhead bounds for rate-limit checks and reset operations."""

    async def test_30_push_requests_complete_within_time_budget(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """30 sequential push requests must complete in under 5 seconds."""
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        start = time.perf_counter()
        for _ in range(_PUSH_N):
            r = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
            assert r.status_code == 200
        elapsed = time.perf_counter() - start
        assert elapsed < 5.0, f"30 push requests took {elapsed:.2f}s (budget: 5s)"

    def test_limiter_reset_completes_under_threshold(self) -> None:
        """reset() must finish in under 50 ms regardless of storage size."""
        storage = limiter._storage
        # Populate storage with fake entries
        for i in range(1000):
            getattr(storage, "storage")[f"fake_key_{i}"] = i
        start = time.perf_counter()
        limiter.reset()
        elapsed = time.perf_counter() - start
        assert elapsed < 0.05, f"limiter.reset() took {elapsed * 1000:.1f}ms (budget: 50ms)"

    async def test_rate_limit_overhead_per_request_is_negligible(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Average per-request overhead from rate-limit check < 50ms."""
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        n = 10
        start = time.perf_counter()
        for _ in range(n):
            await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        avg_ms = (time.perf_counter() - start) / n * 1000
        assert avg_ms < 50, f"Average request time {avg_ms:.1f}ms exceeds 50ms budget"

    async def test_429_response_is_fast(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Rate-limited responses must be returned quickly (< 100ms)."""
        repo = await _make_repo(db_session)
        url = _push_url(repo)
        for _ in range(_PUSH_N):
            await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        start = time.perf_counter()
        resp = await client.post(url, json=_empty_tags_body(), headers=auth_headers)
        elapsed_ms = (time.perf_counter() - start) * 1000
        assert resp.status_code == 429
        assert elapsed_ms < 100, f"429 response took {elapsed_ms:.1f}ms (budget: 100ms)"


# ══════════════════════════════════════════════════════════════════════════════
# Global limits, abuse prevention, and bot detection
# ══════════════════════════════════════════════════════════════════════════════
# Tests for checklist section 4 — Rate Limiting & Abuse Prevention.
# (Merged from a separate module; its AsyncClient import is kept as-is.)

from httpx import AsyncClient


def _has_route_limit(fn: object) -> bool:
    """Return True if *fn* was registered with ``@limiter.limit``.

    slowapi records each decorated endpoint in ``limiter._route_limits``
    (static limits) or ``limiter._dynamic_route_limits`` (callable limits),
    keyed by ``"<module>.<function name>"``. A bare ``__wrapped__`` check is
    not sufficient because any ``functools.wraps``-based decorator sets it.
    """
    from musehub.rate_limits import limiter

    key = f"{getattr(fn, '__module__', '')}.{getattr(fn, '__name__', '')}"
    return key in getattr(limiter, "_route_limits", {}) or key in getattr(
        limiter, "_dynamic_route_limits", {}
    )


# ── Global default limit exists ────────────────────────────────────────────────


def test_global_rate_limit_configured() -> None:
    """Limiter must have a non-empty _default_limits list (global 300/min baseline)."""
    import re

    from musehub.rate_limits import limiter

    default_limits = getattr(limiter, "_default_limits", [])
    assert default_limits, "Limiter must have _default_limits configured"
    # Each entry is a LimitGroup; iterate it to get individual Limit objects.
    limit_strings = [str(item.limit) for group in default_limits for item in group]
    # Match 300 as a whole leading number with a per-minute period. A bare
    # substring test would also accept e.g. "3000 per 1 minute" or "1300 per 1 hour".
    assert any(
        re.match(r"300\b", s) and "minute" in s for s in limit_strings
    ), f"Expected a 300/minute global limit, got: {limit_strings}"


# ── Auth endpoints have strict limits ──────────────────────────────────────────


def test_auth_limit_is_strict() -> None:
    """AUTH_LIMIT_PROD must be 20/minute or tighter — the production cap against credential stuffing."""
    from musehub.rate_limits import AUTH_LIMIT_PROD

    seconds_per_period = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}
    parts = AUTH_LIMIT_PROD.split("/")
    assert len(parts) == 2
    count = int(parts[0])
    period = parts[1].strip().lower().rstrip("s")
    assert period in seconds_per_period, f"Unknown period in AUTH_LIMIT_PROD {AUTH_LIMIT_PROD!r}"
    # Normalise to requests per minute: "N/hour" is N/60 per minute, not N*60.
    per_minute = count * 60 / seconds_per_period[period]
    assert per_minute <= 20, f"AUTH_LIMIT_PROD {AUTH_LIMIT_PROD!r} is too permissive (> 20/min)"


# ── Search endpoints have rate limits ──────────────────────────────────────────


async def test_api_search_rate_limited_on_429(client: AsyncClient) -> None:
    """Smoke test: GET /api/search is reachable and does not return 500.

    Tripping the search limit here would require hammering the endpoint, so
    the @limiter.limit wiring itself is verified by
    test_search_routes_have_rate_limit_decorator.
    """
    resp = await client.get("/api/search", params={"q": "test"})
    # 200 (results), 404 (no results), or 422 (validation) are all fine — NOT 500
    assert resp.status_code != 500


def test_search_routes_have_rate_limit_decorator() -> None:
    """Search route handlers must be decorated with @limiter.limit."""
    from musehub.api.routes.musehub import search as search_module
    from musehub.api.routes.api import search as api_search_module

    for fn_name, module in [
        ("search_repos", search_module),
        ("global_search", search_module),
        ("search_repo", search_module),
        ("global_search", api_search_module),
    ]:
        fn = getattr(module, fn_name, None)
        assert fn is not None, f"{fn_name} not found in {module.__name__}"
        assert _has_route_limit(fn), (
            f"{module.__name__}.{fn_name} is missing @limiter.limit — "
            "search endpoints must be rate-limited to prevent full-index scraping"
        )


# ── Object download endpoint has rate limit ────────────────────────────────────


def test_object_download_has_rate_limit_decorator() -> None:
    """GET /o/{object_id} must be decorated with @limiter.limit."""
    from musehub.api.routes import wire as wire_module

    fn = getattr(wire_module, "get_object", None)
    assert fn is not None
    assert _has_route_limit(fn), "get_object is missing @limiter.limit"


# ── 429 responses include Retry-After ──────────────────────────────────────────


def test_retry_after_added_to_429() -> None:
    """The rate limit exception handler must add Retry-After to 429 responses."""
    from unittest.mock import MagicMock, patch

    from starlette.responses import JSONResponse
    from slowapi.errors import RateLimitExceeded

    from musehub.main import _handle_rate_limit

    exc = MagicMock(spec=RateLimitExceeded)
    exc.__class__ = RateLimitExceeded  # isinstance check passes

    # Mock the base handler to return a 429 with an X-RateLimit-Reset header
    future_reset = str(int(time.time()) + 30)
    mock_response = JSONResponse({"error": "rate limit exceeded"}, status_code=429)
    mock_response.headers["X-RateLimit-Reset"] = future_reset

    mock_request = MagicMock()

    with patch("musehub.main._rate_limit_exceeded_handler", return_value=mock_response):
        result = _handle_rate_limit(mock_request, exc)

    assert "Retry-After" in result.headers, "429 response is missing Retry-After header"
    retry_after = int(result.headers["Retry-After"])
    assert retry_after >= 1, f"Retry-After must be ≥ 1 second, got {retry_after}"
    assert retry_after <= 60, f"Retry-After seems too large: {retry_after}"


# ── Bot / scraper detection ────────────────────────────────────────────────────


async def test_bot_ua_scrapy_is_blocked_on_write(client: AsyncClient) -> None:
    """Scrapy User-Agent must receive 429 on write (POST) paths.

    GET/HEAD are exempt from bot-UA checks — they are safe read-only methods
    on public data. Bot blocking applies to POST/PUT/PATCH/DELETE.
    """
    resp = await client.post(
        "/api/repos",
        headers={"User-Agent": "Scrapy/2.11.0 (+https://scrapy.org)"},
        json={},
    )
    assert resp.status_code == 429


async def test_bot_ua_wget_is_blocked_on_write(client: AsyncClient) -> None:
    """wget User-Agent must receive 429 on write paths."""
    resp = await client.post(
        "/api/repos",
        headers={"User-Agent": "Wget/1.21.3"},
        json={},
    )
    assert resp.status_code == 429


async def test_bot_ua_sqlmap_is_blocked_on_write(client: AsyncClient) -> None:
    """sqlmap User-Agent must receive 429 on write paths."""
    resp = await client.post(
        "/api/repos",
        headers={"User-Agent": "sqlmap/1.7.8#stable (https://sqlmap.org)"},
        json={},
    )
    assert resp.status_code == 429


async def test_missing_ua_post_non_cdn_path_is_blocked(client: AsyncClient) -> None:
    """Missing User-Agent on POST (non-CDN) path must receive 429.

    GET/HEAD are exempt from bot-UA checks. POST without a UA is blocked.
    """
    resp = await client.post("/api/repos", headers={"User-Agent": ""}, json={})
    assert resp.status_code == 429


async def test_legitimate_browser_ua_passes(client: AsyncClient) -> None:
    """Standard browser User-Agent must not be blocked."""
    resp = await client.get(
        "/",
        headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"},
    )
    assert resp.status_code != 429


async def test_muse_cli_ua_passes(client: AsyncClient) -> None:
    """Muse CLI User-Agent must not be blocked."""
    resp = await client.get(
        "/api/repos",
        headers={"User-Agent": "muse/1.2.3"},
    )
    assert resp.status_code != 429


async def test_healthz_exempt_from_bot_check(client: AsyncClient) -> None:
    """/healthz must be reachable even with a minimal/missing User-Agent."""
    resp = await client.get("/healthz", headers={"User-Agent": ""})
    # 200 or 404 — either is fine; the important thing is it's not 429
    assert resp.status_code != 429


# ── Webhook retry cap ──────────────────────────────────────────────────────────


def test_webhook_max_attempts_capped() -> None:
    """Webhook dispatcher must cap retries at a small fixed number."""
    from musehub.services import musehub_webhook_dispatcher as wd

    assert hasattr(wd, "_MAX_ATTEMPTS"), "_MAX_ATTEMPTS not defined in webhook dispatcher"
    assert wd._MAX_ATTEMPTS <= 5, (
        f"_MAX_ATTEMPTS={wd._MAX_ATTEMPTS} is too high — cap retries to prevent retry storms"
    )
    assert wd._MAX_ATTEMPTS >= 1, "_MAX_ATTEMPTS must be at least 1"


def test_webhook_backoff_configured() -> None:
    """Webhook dispatcher must have exponential backoff configured."""
    from musehub.services import musehub_webhook_dispatcher as wd

    assert hasattr(wd, "_BACKOFF_BASE"), "_BACKOFF_BASE not defined in webhook dispatcher"
    assert wd._BACKOFF_BASE >= 1.0, (
        f"_BACKOFF_BASE={wd._BACKOFF_BASE} is too short — minimum 1 second base backoff"
    )