test_clones_security.py
python
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠ breaking
20 days ago
| 1 | """Tier 7 — Security tests for the clone browser (issue #17). |
| 2 | |
| 3 | Validates that the clone browser routes are hardened against common |
| 4 | web-application attacks: path traversal, SQL injection, XSS, and |
| 5 | oversized / malformed input. All cases expect the route to return |
| 6 | a non-500 status (either 200 for graceful degradation or 422 for |
| 7 | FastAPI type-validation rejection — both are acceptable; only 500 |
| 8 | indicates an unhandled exception in application code). |
| 9 | |
| 10 | Cases: |
| 11 | SEC01 Path traversal in ``cluster`` param — 200, no FS content leak |
| 12 | SEC02 SQL meta-characters in ``tier`` param — 200, no DB error |
| 13 | SEC03 SQL meta-characters in ``top`` param — 422 (type validation) or 200 |
| 14 | SEC04 XSS payload in ``cluster`` param — Jinja2 autoescape prevents raw injection |
| 15 | SEC05 members_json with embedded HTML — tags escaped in output |
| 16 | SEC06 Oversized ``cluster`` param (4 096 chars) — 200, not 500 |
| 17 | SEC07 Null bytes in ``cluster`` param — sanitised to 200, not 500 |
| 18 | """ |
| 19 | from __future__ import annotations |
| 20 | |
| 21 | import json |
| 22 | |
| 23 | import pytest |
| 24 | import pytest_asyncio |
| 25 | from httpx import AsyncClient |
| 26 | from sqlalchemy.dialects.postgresql import insert as pg_insert |
| 27 | from sqlalchemy.ext.asyncio import AsyncSession |
| 28 | |
| 29 | from muse.core.types import long_id |
| 30 | from musehub.db.musehub_intel_models import MusehubIntelClones |
| 31 | from tests.factories import create_repo |
| 32 | |
# Identifiers the fixtures in this module store as ``ref`` and
# ``cluster_hash`` on the seeded clone rows.
_REF = long_id(64 * "a")
_HASH = long_id(64 * "1")

# XSS probes: a script tag, an attribute break-out, a ``javascript:`` URI
# and a template-expression string.
_XSS_PAYLOADS: list[str] = [
    "<script>alert(1)</script>",
    '"><img src=x onerror=alert(1)>',
    "javascript:alert(1)",
    "{{7*7}}",
]

# SQL-injection probes: tautology, stacked statement, UNION and a
# time-delay attempt.
_SQL_PAYLOADS: list[str] = [
    "' OR '1'='1",
    "1; DROP TABLE musehub_intel_clones; --",
    "UNION SELECT * FROM musehub_repos--",
    "' AND SLEEP(5)--",
]
| 49 | |
| 50 | |
@pytest_asyncio.fixture
async def repo(db_session: AsyncSession) -> MusehubRepo:
    """Create the ``secuser/sec-test`` repo seeded with one clone cluster.

    The seeded row uses tier ``"exact"``, a single benign member and the
    module-level ``_HASH`` / ``_REF`` identifiers. The insert is skipped
    when a conflicting row already exists.

    Returns:
        The object returned by ``create_repo``.
    """
    # NOTE(review): ``MusehubRepo`` appears only in annotations and is not
    # imported in the visible part of this file — confirm it resolves for
    # type checkers.
    created = await create_repo(db_session, owner="secuser", slug="sec-test")

    member = {
        "address": "src/a.py::fn_0",
        "kind": "function",
        "language": "Python",
        "body_hash": long_id("a" * 64),
        "signature_id": long_id("b" * 64),
        "content_id": long_id("a" * 64),
    }
    row = {
        "repo_id": str(created.repo_id),
        "cluster_hash": _HASH,
        "tier": "exact",
        "member_count": 1,
        "members_json": json.dumps([member]),
        "ref": _REF,
    }

    seed_stmt = pg_insert(MusehubIntelClones).values(**row).on_conflict_do_nothing()
    await db_session.execute(seed_stmt)
    await db_session.commit()
    return created
| 78 | |
| 79 | |
@pytest_asyncio.fixture
async def xss_repo(db_session: AsyncSession) -> tuple[MusehubRepo, str]:
    """Create the ``secuser/sec-xss`` repo whose cluster members hold raw HTML.

    The stored ``members_json`` carries a ``<script>`` tag in ``address`` and
    a ``<b>`` tag in ``kind`` so that rendering code can be checked for
    output escaping.

    Returns:
        A ``(repo, cluster_hash)`` pair: the object returned by
        ``create_repo`` and the hash of the seeded cluster.
    """
    created = await create_repo(db_session, owner="secuser", slug="sec-xss")

    hostile_member = {
        "address": '<script>alert(1)</script>::fn_0',
        "kind": "<b>evil</b>",
        "language": "Python",
        "body_hash": long_id("a" * 64),
        "signature_id": long_id("b" * 64),
        "content_id": long_id("a" * 64),
    }
    members_payload = json.dumps([hostile_member])
    cluster_hash = long_id("x" * 64)

    row = {
        "repo_id": str(created.repo_id),
        "cluster_hash": cluster_hash,
        "tier": "exact",
        "member_count": 1,
        "members_json": members_payload,
        "ref": _REF,
    }
    seed_stmt = pg_insert(MusehubIntelClones).values(**row).on_conflict_do_nothing()
    await db_session.execute(seed_stmt)
    await db_session.commit()
    return created, cluster_hash
| 109 | |
| 110 | |
class TestClonesSecurity:
    """Hostile-input checks for the clone browser list and detail routes.

    Every test issues a GET through the ``client`` fixture against a repo
    seeded by the ``repo`` / ``xss_repo`` fixtures and asserts on the status
    code and, where relevant, on the rendered body.
    """

    @pytest.mark.asyncio
    @pytest.mark.parametrize("traversal", [
        "../../../etc/passwd",
        "sha256:../secret",
        "../../../../proc/self/environ",
        "%2e%2e%2f%2e%2e%2fetc%2fpasswd",
    ])
    async def test_SEC01_path_traversal_in_cluster_param(
        self, client: AsyncClient, repo: MusehubRepo, traversal: str
    ) -> None:
        """Path traversal sequences in cluster param return 200 with no FS content."""
        # NOTE(review): the payload is interpolated into the raw URL instead of
        # being passed via ``params=`` — presumably so the pre-encoded
        # ``%2e%2e%2f`` case is not percent-encoded a second time; confirm.
        r = await client.get(
            f"/secuser/sec-test/intel/clones/detail?cluster={traversal}"
        )
        assert r.status_code == 200
        body = r.text
        # Ensure no filesystem content was leaked — not that the path isn't echoed
        assert "root:x:" not in body
        assert "UID=" not in body

    @pytest.mark.asyncio
    @pytest.mark.parametrize("payload", _SQL_PAYLOADS)
    async def test_SEC02_sql_injection_in_tier_param(
        self, client: AsyncClient, repo: MusehubRepo, payload: str
    ) -> None:
        """SQL meta-characters in tier param are sanitised — 200, no DB error."""
        r = await client.get(
            "/secuser/sec-test/intel/clones",
            params={"tier": payload},
        )
        assert r.status_code == 200

    @pytest.mark.asyncio
    @pytest.mark.parametrize("payload", [
        "99999999999999999999",
        "0; DROP TABLE musehub_repos; --",
        "-1",
        "1 UNION SELECT 1--",
    ])
    async def test_SEC03_sql_injection_in_top_param(
        self, client: AsyncClient, repo: MusehubRepo, payload: str
    ) -> None:
        """SQL injection / overflow in top param is rejected by type validation or clamped."""
        r = await client.get(
            "/secuser/sec-test/intel/clones",
            params={"top": payload},
        )
        # 422 = FastAPI type validation rejected the non-integer (good)
        # 200 = value was clamped to a valid top (also good)
        # 500 = unhandled exception (bad — must never happen)
        assert r.status_code in (200, 422), (
            f"Expected 200 or 422, got {r.status_code}"
        )

    @pytest.mark.asyncio
    @pytest.mark.parametrize("xss", _XSS_PAYLOADS)
    async def test_SEC04_xss_in_cluster_param(
        self, client: AsyncClient, repo: MusehubRepo, xss: str
    ) -> None:
        """XSS payloads in cluster param are HTML-escaped by Jinja2 autoescape."""
        r = await client.get(
            "/secuser/sec-test/intel/clones/detail",
            params={"cluster": xss},
        )
        assert r.status_code == 200
        body = r.text
        # Raw payload must not appear unescaped — Jinja2 autoescape converts < and > to entities
        assert "<script>alert(1)" not in body
        # onerror attribute is only dangerous when followed by unescaped >
        assert 'onerror=alert(1)>' not in body

    @pytest.mark.asyncio
    async def test_SEC05_members_json_html_escaped_in_output(
        self, client: AsyncClient, xss_repo: tuple[MusehubRepo, str]
    ) -> None:
        """HTML tags stored in members_json are escaped — never rendered raw."""
        # Only the cluster hash is needed; the route is addressed by the
        # owner/slug the fixture created.
        _, cluster_hash = xss_repo
        r = await client.get(
            f"/secuser/sec-xss/intel/clones/detail?cluster={cluster_hash}"
        )
        assert r.status_code == 200
        body = r.text
        # The raw <script>…</script> tag from members_json must not appear unescaped
        assert "<script>alert(1)</script>" not in body
        assert "<b>evil</b>" not in body

    @pytest.mark.asyncio
    async def test_SEC06_oversized_cluster_param_returns_200(
        self, client: AsyncClient, repo: MusehubRepo
    ) -> None:
        """A 4 096-character cluster param is handled gracefully — 200, not 500."""
        # NOTE(review): the ``- 7`` presumably leaves room for a 7-character
        # prefix added by ``long_id`` so the total is 4 096 — confirm.
        huge = long_id("a" * (4096 - 7))
        r = await client.get(
            f"/secuser/sec-test/intel/clones/detail?cluster={huge}"
        )
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_SEC07_null_bytes_in_cluster_param(
        self, client: AsyncClient, repo: MusehubRepo
    ) -> None:
        """Null bytes in cluster param are stripped — 200, not 500."""
        null_cluster = "sha256:\x001234\x00abcd"
        r = await client.get(
            "/secuser/sec-test/intel/clones/detail",
            params={"cluster": null_cluster},
        )
        assert r.status_code == 200
File History
1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠
20 days ago