"""Tests for BotThrottleMiddleware — agent-first principal model. Core invariants: - MSign-authenticated requests bypass UA checks entirely. - GET / HEAD requests bypass UA checks — public reads are always allowed. - UA-based blocking applies only to unauthenticated non-GET/HEAD requests. """ from __future__ import annotations import pytest from httpx import AsyncClient # --------------------------------------------------------------------------- # Authenticated requests — always pass through regardless of UA # --------------------------------------------------------------------------- async def test_msign_with_curl_ua_passes(client: AsyncClient) -> None: """MSign-authenticated curl calls must not be blocked. Developers and agents routinely test with curl. Once they sign the request, the UA is irrelevant — identity is proven cryptographically. """ resp = await client.get( "/healthz", headers={ "User-Agent": "curl/8.4.0", "Authorization": "MSign handle=\"gabriel\" alg=\"ed25519\" ts=1234567890 sig=\"fakesig\"", }, ) # Must not be 429 — may be 401/403 (invalid sig) but never bot-blocked assert resp.status_code != 429 async def test_msign_with_python_requests_ua_passes(client: AsyncClient) -> None: """MSign-authenticated python-requests calls must not be blocked.""" resp = await client.get( "/healthz", headers={ "User-Agent": "python-requests/2.31.0", "Authorization": "MSign handle=\"agentception-abc123\" alg=\"ed25519\" ts=1234567890 sig=\"fakesig\"", }, ) assert resp.status_code != 429 async def test_msign_with_go_ua_passes(client: AsyncClient) -> None: """MSign-authenticated Go HTTP client calls must not be blocked.""" resp = await client.get( "/healthz", headers={ "User-Agent": "Go-http-client/1.1", "Authorization": "MSign handle=\"some-agent\" alg=\"ed25519\" ts=1234567890 sig=\"fakesig\"", }, ) assert resp.status_code != 429 async def test_msign_with_no_ua_passes(client: AsyncClient) -> None: """MSign-authenticated requests with no UA must not be blocked. An agent that omits the UA header entirely is still authenticated. """ resp = await client.get( "/healthz", headers={ "Authorization": "MSign handle=\"agent-42\" alg=\"ed25519\" ts=1234567890 sig=\"fakesig\"", }, ) assert resp.status_code != 429 # --------------------------------------------------------------------------- # GET requests — always pass through regardless of UA (public reads are open) # --------------------------------------------------------------------------- async def test_get_with_curl_ua_passes(client: AsyncClient) -> None: """GET with curl UA must not be blocked. curl is the canonical tool for reading public pages and testing APIs. GET is a safe read-only method — UA checks add no meaningful protection here since slowapi already caps request rates. """ resp = await client.get( "/api/identities", headers={"User-Agent": "curl/8.4.0"}, ) assert resp.status_code != 429 async def test_get_with_no_ua_passes(client: AsyncClient) -> None: """GET with no UA must not be blocked — public reads are unconditionally allowed.""" resp = await client.get("/api/identities", headers={"User-Agent": ""}) assert resp.status_code != 429 async def test_get_with_scanner_ua_passes(client: AsyncClient) -> None: """GET with a scanner UA is not blocked — read-only methods bypass UA checks. Vulnerability scanners doing GETs are irritating but not dangerous; slowapi handles their rate. Blocking GETs would break legitimate tooling. """ resp = await client.get( "/api/identities", headers={"User-Agent": "sqlmap/1.7"}, ) assert resp.status_code != 429 # --------------------------------------------------------------------------- # Non-GET unauthenticated requests with bad UAs — must be blocked # --------------------------------------------------------------------------- async def test_post_unauthenticated_curl_ua_blocked(client: AsyncClient) -> None: """Unauthenticated POST with curl UA is blocked — write paths require known clients.""" resp = await client.post( "/api/v1/repos", headers={"User-Agent": "curl/8.4.0"}, content=b"{}", ) assert resp.status_code == 429 async def test_post_unauthenticated_python_requests_ua_blocked(client: AsyncClient) -> None: """Unauthenticated POST with python-requests UA is blocked.""" resp = await client.post( "/api/v1/repos", headers={"User-Agent": "python-requests/2.31.0"}, content=b"{}", ) assert resp.status_code == 429 async def test_post_unauthenticated_missing_ua_blocked(client: AsyncClient) -> None: """Unauthenticated POST with no UA is blocked — write paths require a UA.""" resp = await client.post( "/api/v1/repos", headers={"User-Agent": ""}, content=b"{}", ) assert resp.status_code == 429 async def test_post_unauthenticated_scanner_ua_blocked(client: AsyncClient) -> None: """Vulnerability scanners are blocked on write paths.""" for ua in ["sqlmap/1.7", "nikto/2.1.6", "nuclei/3.0", "masscan/1.3"]: resp = await client.post( "/api/v1/repos", headers={"User-Agent": ua}, content=b"{}", ) assert resp.status_code == 429, f"Expected 429 for UA: {ua}" # --------------------------------------------------------------------------- # Exempt paths — always pass through # --------------------------------------------------------------------------- async def test_healthz_passes_with_no_ua(client: AsyncClient) -> None: """/healthz must never be blocked — monitoring probes have minimal UAs.""" resp = await client.get("/healthz") assert resp.status_code != 429 async def test_healthz_passes_with_curl_ua(client: AsyncClient) -> None: """/healthz must pass even with a normally-blocked UA.""" resp = await client.get("/healthz", headers={"User-Agent": "curl/8.4.0"}) assert resp.status_code != 429 # --------------------------------------------------------------------------- # Error message — must be informative, not accusatory # --------------------------------------------------------------------------- async def test_blocked_response_body_is_informative(client: AsyncClient) -> None: """Blocked response on a write path must guide the client toward authentication.""" resp = await client.post( "/api/v1/repos", headers={"User-Agent": "curl/8.4.0"}, content=b"{}", ) assert resp.status_code == 429 body = resp.json() assert "MSign" in body["detail"] or "authenticate" in body["detail"].lower() # --------------------------------------------------------------------------- # Muse CLI UA — always passes unauthenticated (it's a known good client) # --------------------------------------------------------------------------- async def test_muse_cli_ua_passes_unauthenticated(client: AsyncClient) -> None: """The muse CLI UA must not be blocked even without auth. muse CLI sends 'muse/' for unauthenticated pre-flight calls like listing remotes before signing in. """ resp = await client.get( "/api/identities", headers={"User-Agent": "muse/1.0.0"}, ) assert resp.status_code != 429 async def test_browser_ua_passes_unauthenticated(client: AsyncClient) -> None: """Standard browser UAs must pass without auth.""" resp = await client.get( "/api/identities", headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"}, ) assert resp.status_code != 429 # --------------------------------------------------------------------------- # Install / uninstall scripts — must be curl-fetchable before muse is installed # --------------------------------------------------------------------------- async def test_install_script_passes_with_curl_ua(client: AsyncClient) -> None: """/install.sh must be fetchable with plain curl — that's its entire purpose. ``curl -fsSL staging.musehub.ai/install.sh | sh`` is the documented install command. Blocking it with 429 breaks the install flow. """ resp = await client.get("/install.sh", headers={"User-Agent": "curl/8.6.0"}) assert resp.status_code != 429 async def test_uninstall_script_passes_with_curl_ua(client: AsyncClient) -> None: """/uninstall.sh must also be fetchable with plain curl.""" resp = await client.get("/uninstall.sh", headers={"User-Agent": "curl/8.6.0"}) assert resp.status_code != 429 async def test_install_script_passes_with_no_ua(client: AsyncClient) -> None: """/install.sh passes even without a UA — some minimal shell environments omit it.""" resp = await client.get("/install.sh", headers={"User-Agent": ""}) assert resp.status_code != 429 # --------------------------------------------------------------------------- # Release binary downloads — fetched by install.sh via curl # --------------------------------------------------------------------------- async def test_release_tarball_passes_with_curl_ua(client: AsyncClient) -> None: """/releases/* must be fetchable with plain curl. install.sh runs ``curl -fsSL .../releases/muse---.tar.gz`` as its second step. If /releases/ is blocked, the install command succeeds on the script fetch but silently fails on the binary download. """ resp = await client.get( "/releases/muse-0.2.0-linux-arm64.tar.gz", headers={"User-Agent": "curl/8.6.0"}, ) assert resp.status_code != 429 async def test_release_tarball_passes_with_no_ua(client: AsyncClient) -> None: """/releases/* passes even without a UA.""" resp = await client.get( "/releases/muse-0.2.0-linux-arm64.tar.gz", headers={"User-Agent": ""}, ) assert resp.status_code != 429