"""Tier 7 — Security tests for the clone browser (issue #17). Validates that the clone browser routes are hardened against common web-application attacks: path traversal, SQL injection, XSS, and oversized / malformed input. All cases expect the route to return a non-500 status (either 200 for graceful degradation or 422 for FastAPI type-validation rejection — both are acceptable; only 500 indicates an unhandled exception in application code). Cases: SEC01 Path traversal in ``cluster`` param — 200, no FS content leak SEC02 SQL meta-characters in ``tier`` param — 200, no DB error SEC03 SQL meta-characters in ``top`` param — 422 (type validation) or 200 SEC04 XSS payload in ``cluster`` param — Jinja2 autoescape prevents raw injection SEC05 members_json with embedded HTML — tags escaped in output SEC06 Oversized ``cluster`` param (4 096 chars) — 200, not 500 SEC07 Null bytes in ``cluster`` param — sanitised to 200, not 500 """ from __future__ import annotations import json import pytest import pytest_asyncio from httpx import AsyncClient from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import long_id from musehub.db.musehub_intel_models import MusehubIntelClones from tests.factories import create_repo _REF = long_id("a" * 64) _HASH = long_id("1" * 64) _XSS_PAYLOADS = [ "", '">', "javascript:alert(1)", "{{7*7}}", ] _SQL_PAYLOADS = [ "' OR '1'='1", "1; DROP TABLE musehub_intel_clones; --", "UNION SELECT * FROM musehub_repos--", "' AND SLEEP(5)--", ] @pytest_asyncio.fixture async def repo(db_session: AsyncSession) -> MusehubRepo: r = await create_repo(db_session, owner="secuser", slug="sec-test") members = json.dumps([ { "address": "src/a.py::fn_0", "kind": "function", "language": "Python", "body_hash": long_id("a" * 64), "signature_id": long_id("b" * 64), "content_id": long_id("a" * 64), } ]) await db_session.execute( pg_insert(MusehubIntelClones) .values( repo_id=str(r.repo_id), cluster_hash=_HASH, tier="exact", member_count=1, members_json=members, ref=_REF, ) .on_conflict_do_nothing() ) await db_session.commit() return r @pytest_asyncio.fixture async def xss_repo(db_session: AsyncSession) -> tuple[MusehubRepo, str]: """A repo whose cluster members_json contains raw HTML tags.""" r = await create_repo(db_session, owner="secuser", slug="sec-xss") members = json.dumps([ { "address": '::fn_0', "kind": "evil", "language": "Python", "body_hash": long_id("a" * 64), "signature_id": long_id("b" * 64), "content_id": long_id("a" * 64), } ]) h = long_id("x" * 64) await db_session.execute( pg_insert(MusehubIntelClones) .values( repo_id=str(r.repo_id), cluster_hash=h, tier="exact", member_count=1, members_json=members, ref=_REF, ) .on_conflict_do_nothing() ) await db_session.commit() return r, h class TestClonesSecurity: @pytest.mark.asyncio @pytest.mark.parametrize("traversal", [ "../../../etc/passwd", "sha256:../secret", "../../../../proc/self/environ", "%2e%2e%2f%2e%2e%2fetc%2fpasswd", ]) async def test_SEC01_path_traversal_in_cluster_param( self, client: AsyncClient, repo: MusehubRepo, traversal: str ) -> None: """Path traversal sequences in cluster param return 200 with no FS content.""" r = await client.get( f"/secuser/sec-test/intel/clones/detail?cluster={traversal}" ) assert r.status_code == 200 body = r.text # Ensure no filesystem content was leaked — not that the path isn't echoed assert "root:x:" not in body assert "UID=" not in body @pytest.mark.asyncio @pytest.mark.parametrize("payload", _SQL_PAYLOADS) async def test_SEC02_sql_injection_in_tier_param( self, client: AsyncClient, repo: MusehubRepo, payload: str ) -> None: """SQL meta-characters in tier param are sanitised — 200, no DB error.""" r = await client.get( f"/secuser/sec-test/intel/clones", params={"tier": payload}, ) assert r.status_code == 200 @pytest.mark.asyncio @pytest.mark.parametrize("payload", [ "99999999999999999999", "0; DROP TABLE musehub_repos; --", "-1", "1 UNION SELECT 1--", ]) async def test_SEC03_sql_injection_in_top_param( self, client: AsyncClient, repo: MusehubRepo, payload: str ) -> None: """SQL injection / overflow in top param is rejected by type validation or clamped.""" r = await client.get( f"/secuser/sec-test/intel/clones", params={"top": payload}, ) # 422 = FastAPI type validation rejected the non-integer (good) # 200 = value was clamped to a valid top (also good) # 500 = unhandled exception (bad — must never happen) assert r.status_code in (200, 422), ( f"Expected 200 or 422, got {r.status_code}" ) @pytest.mark.asyncio @pytest.mark.parametrize("xss", _XSS_PAYLOADS) async def test_SEC04_xss_in_cluster_param( self, client: AsyncClient, repo: MusehubRepo, xss: str ) -> None: """XSS payloads in cluster param are HTML-escaped by Jinja2 autoescape.""" r = await client.get( f"/secuser/sec-test/intel/clones/detail", params={"cluster": xss}, ) assert r.status_code == 200 body = r.text # Raw payload must not appear unescaped — Jinja2 autoescape converts < and > to entities assert " tag from members_json must not appear unescaped assert "" not in body assert "evil" not in body @pytest.mark.asyncio async def test_SEC06_oversized_cluster_param_returns_200( self, client: AsyncClient, repo: MusehubRepo ) -> None: """A 4 096-character cluster param is handled gracefully — 200, not 500.""" huge = long_id("a" * (4096 - 7)) r = await client.get( f"/secuser/sec-test/intel/clones/detail?cluster={huge}" ) assert r.status_code == 200 @pytest.mark.asyncio async def test_SEC07_null_bytes_in_cluster_param( self, client: AsyncClient, repo: MusehubRepo ) -> None: """Null bytes in cluster param are stripped — 200, not 500.""" null_cluster = "sha256:\x001234\x00abcd" r = await client.get( f"/secuser/sec-test/intel/clones/detail", params={"cluster": null_cluster}, ) assert r.status_code == 200