"""Tier 7 — Security tests for the clone browser (issue #17).
Validates that the clone browser routes are hardened against common
web-application attacks: path traversal, SQL injection, XSS, and
oversized / malformed input. All cases expect the route to return
a non-500 status (either 200 for graceful degradation or 422 for
FastAPI type-validation rejection — both are acceptable; only 500
indicates an unhandled exception in application code).
Cases:
SEC01 Path traversal in ``cluster`` param — 200, no FS content leak
SEC02 SQL meta-characters in ``tier`` param — 200, no DB error
SEC03 SQL meta-characters in ``top`` param — 422 (type validation) or 200
SEC04 XSS payload in ``cluster`` param — Jinja2 autoescape prevents raw injection
SEC05 members_json with embedded HTML — tags escaped in output
SEC06 Oversized ``cluster`` param (4 096 chars) — 200, not 500
SEC07 Null bytes in ``cluster`` param — sanitised to 200, not 500
"""
from __future__ import annotations
import json
import pytest
import pytest_asyncio
from httpx import AsyncClient
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from muse.core.types import long_id
from musehub.db.musehub_intel_models import MusehubIntelClones
from tests.factories import create_repo
# Value stored in the ``ref`` column of every clone-cluster row seeded below.
_REF = long_id("a" * 64)
# ``cluster_hash`` of the single cluster seeded by the ``repo`` fixture.
_HASH = long_id("1" * 64)
_XSS_PAYLOADS = [
"",
'">
',
"javascript:alert(1)",
"{{7*7}}",
]
_SQL_PAYLOADS = [
"' OR '1'='1",
"1; DROP TABLE musehub_intel_clones; --",
"UNION SELECT * FROM musehub_repos--",
"' AND SLEEP(5)--",
]
@pytest_asyncio.fixture
async def repo(db_session: AsyncSession) -> MusehubRepo:
    """Create the ``secuser/sec-test`` repo seeded with one clone cluster.

    A single ``exact``-tier cluster row is inserted under ``_HASH`` at ref
    ``_REF`` with one function member. The insert is issued with
    ``on_conflict_do_nothing()``, then the session is committed.

    Returns:
        The repo object produced by ``create_repo``.
    """
    created = await create_repo(db_session, owner="secuser", slug="sec-test")

    member = {
        "address": "src/a.py::fn_0",
        "kind": "function",
        "language": "Python",
        "body_hash": long_id("a" * 64),
        "signature_id": long_id("b" * 64),
        "content_id": long_id("a" * 64),
    }
    members_payload = json.dumps([member])

    seed_stmt = (
        pg_insert(MusehubIntelClones)
        .values(
            repo_id=str(created.repo_id),
            cluster_hash=_HASH,
            tier="exact",
            member_count=1,
            members_json=members_payload,
            ref=_REF,
        )
        .on_conflict_do_nothing()
    )
    await db_session.execute(seed_stmt)
    await db_session.commit()
    return created
@pytest_asyncio.fixture
async def xss_repo(db_session: AsyncSession) -> tuple[MusehubRepo, str]:
    """A repo whose cluster members_json contains raw HTML tags.

    Creates ``secuser/sec-xss`` and inserts one ``exact``-tier cluster whose
    single member carries markup in its ``address`` and ``kind`` fields, so a
    rendering test can check that the markup is escaped.

    Returns:
        A ``(repo, cluster_hash)`` tuple; the hash identifies the seeded row.
    """
    r = await create_repo(db_session, owner="secuser", slug="sec-xss")
    # NOTE(review): the markup in ``address`` and ``kind`` had been stripped
    # (leaving '::fn_0' and "evil"), which contradicted the docstring and made
    # any escaping assertion vacuous. The tags below are a reconstruction —
    # confirm the exact original values against version control.
    members = json.dumps([
        {
            "address": "<script>alert(1)</script>::fn_0",
            "kind": "<b>evil</b>",
            "language": "Python",
            "body_hash": long_id("a" * 64),
            "signature_id": long_id("b" * 64),
            "content_id": long_id("a" * 64),
        }
    ])
    h = long_id("x" * 64)
    await db_session.execute(
        pg_insert(MusehubIntelClones)
        .values(
            repo_id=str(r.repo_id),
            cluster_hash=h,
            tier="exact",
            member_count=1,
            members_json=members,
            ref=_REF,
        )
        # Leave an already-present row untouched instead of failing.
        .on_conflict_do_nothing()
    )
    await db_session.commit()
    return r, h
class TestClonesSecurity:
    """Hostile-input tests for the clone browser list and detail routes.

    Every case sends attacker-style input through a query parameter and checks
    that the route answers with a non-500 status; the XSS cases additionally
    check that raw markup is not present in the response body.
    """

    @pytest.mark.asyncio
    @pytest.mark.parametrize("traversal", [
        "../../../etc/passwd",
        "sha256:../secret",
        "../../../../proc/self/environ",
        "%2e%2e%2f%2e%2e%2fetc%2fpasswd",
    ])
    async def test_SEC01_path_traversal_in_cluster_param(
        self, client: AsyncClient, repo: MusehubRepo, traversal: str
    ) -> None:
        """Path traversal sequences in cluster param return 200 with no FS content."""
        # The value is interpolated into the URL (not passed via ``params``)
        # so the pre-encoded ``%2e%2e%2f`` case is sent as written.
        r = await client.get(
            f"/secuser/sec-test/intel/clones/detail?cluster={traversal}"
        )
        assert r.status_code == 200
        body = r.text
        # Ensure no filesystem content was leaked — not that the path isn't echoed
        assert "root:x:" not in body
        assert "UID=" not in body

    @pytest.mark.asyncio
    @pytest.mark.parametrize("payload", _SQL_PAYLOADS)
    async def test_SEC02_sql_injection_in_tier_param(
        self, client: AsyncClient, repo: MusehubRepo, payload: str
    ) -> None:
        """SQL meta-characters in tier param are sanitised — 200, no DB error."""
        r = await client.get(
            "/secuser/sec-test/intel/clones",
            params={"tier": payload},
        )
        assert r.status_code == 200

    @pytest.mark.asyncio
    @pytest.mark.parametrize("payload", [
        "99999999999999999999",
        "0; DROP TABLE musehub_repos; --",
        "-1",
        "1 UNION SELECT 1--",
    ])
    async def test_SEC03_sql_injection_in_top_param(
        self, client: AsyncClient, repo: MusehubRepo, payload: str
    ) -> None:
        """SQL injection / overflow in top param is rejected by type validation or clamped."""
        r = await client.get(
            "/secuser/sec-test/intel/clones",
            params={"top": payload},
        )
        # 422 = FastAPI type validation rejected the non-integer (good)
        # 200 = value was clamped to a valid top (also good)
        # 500 = unhandled exception (bad — must never happen)
        assert r.status_code in (200, 422), (
            f"Expected 200 or 422, got {r.status_code}"
        )

    @pytest.mark.asyncio
    @pytest.mark.parametrize("xss", _XSS_PAYLOADS)
    async def test_SEC04_xss_in_cluster_param(
        self, client: AsyncClient, repo: MusehubRepo, xss: str
    ) -> None:
        """XSS payloads in cluster param are HTML-escaped by Jinja2 autoescape."""
        r = await client.get(
            "/secuser/sec-test/intel/clones/detail",
            params={"cluster": xss},
        )
        assert r.status_code == 200
        body = r.text
        # Raw payload must not appear unescaped — Jinja2 autoescape converts
        # < and > to entities. Payloads without "<" look the same raw and
        # escaped, so only tag-bearing payloads are checked.
        # NOTE(review): the original assertion was truncated mid-string; this
        # check is a reconstruction — confirm against version control.
        if "<" in xss:
            assert xss not in body

    @pytest.mark.asyncio
    async def test_SEC05_members_json_html_is_escaped(
        self, client: AsyncClient, xss_repo: tuple[MusehubRepo, str]
    ) -> None:
        """HTML embedded in members_json is escaped in the rendered detail page."""
        # NOTE(review): this test's name, signature and request were lost and
        # are reconstructed from the xss_repo fixture (slug "sec-xss", returned
        # cluster hash) — confirm against version control.
        _, cluster_hash = xss_repo
        r = await client.get(
            "/secuser/sec-xss/intel/clones/detail",
            params={"cluster": cluster_hash},
        )
        assert r.status_code == 200
        body = r.text
        # Raw tags from members_json must not appear unescaped
        assert "<script>alert(1)</script>" not in body
        assert "<b>evil</b>" not in body

    @pytest.mark.asyncio
    async def test_SEC06_oversized_cluster_param_returns_200(
        self, client: AsyncClient, repo: MusehubRepo
    ) -> None:
        """A 4 096-character cluster param is handled gracefully — 200, not 500."""
        # Presumably long_id() adds a 7-character prefix, bringing the total
        # to 4 096 characters — TODO confirm against muse.core.types.
        huge = long_id("a" * (4096 - 7))
        r = await client.get(
            f"/secuser/sec-test/intel/clones/detail?cluster={huge}"
        )
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_SEC07_null_bytes_in_cluster_param(
        self, client: AsyncClient, repo: MusehubRepo
    ) -> None:
        """Null bytes in cluster param are stripped — 200, not 500."""
        null_cluster = "sha256:\x001234\x00abcd"
        r = await client.get(
            "/secuser/sec-test/intel/clones/detail",
            params={"cluster": null_cluster},
        )
        assert r.status_code == 200