"""Section 17 — Mists Security Hardening: adversarial and access-control tests.

Covers every security property stated in the Phase 8 spec:

  Filename attacks
    Path traversal, null bytes, control characters, ANSI escape sequences,
    path separators, empty and overlong names — all must return
    422 Unprocessable Entity.

  Content injection
    HTML/JS payloads stored verbatim; no server-side sanitisation that would
    alter or strip content.  The embedding contract lives at the template
    layer (Jinja2 auto-escapes {{ mist.content | e }}).

  Access control
    Secret mists invisible to non-owners in list and explore endpoints;
    non-owner update/delete blocked; secret mist detail is refused to
    non-owners (the tests accept 403 or 404).

  Collision resistance
    Identical bytes → identical mist_id → POST returns 409 Conflict
    (idempotent, not an error the caller should retry).

  Large content
    Body > 10 MiB triggers ContentSizeLimitMiddleware → 413 Request Entity
    Too Large before any DB write.

  Fork depth
    Fork chain capped at 5 levels; attempting to fork a depth-5 mist
    returns 422.

  Tag limits
    More than 10 tags, a tag longer than 64 characters, or a tag containing
    a null byte must return 422.
"""

from __future__ import annotations

import secrets
from datetime import datetime, timezone

import pytest
from httpx import AsyncClient, Response
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.core.genesis import compute_identity_id, compute_repo_id
from musehub.db.musehub_repo_models import MusehubRepo
from musehub.types.json_types import JSONObject, JSONValue, StrDict

_OWNER = "testuser"  # matches conftest._TEST_HANDLE
_OTHER = "otheruser"

# Fork chains are capped at this many levels (see TestForkDepthEnforcement).
_MAX_FORK_DEPTH = 5

_PY_CONTENT = "def hello():\n return 'hello world'\n"


async def _make_mist_repo(
    db_session: AsyncSession,
    mid: str,
    owner: str,
    visibility: str = "public",
) -> MusehubRepo:
    """Create a MusehubRepo for a mist test and return it.

    The repo is named and slugged after *mid*, owned by *owner*, and flushed
    (not committed) to *db_session*; committing is left to the caller.
    """
    created_at = datetime.now(tz=timezone.utc)
    owner_id = compute_identity_id(owner.encode())
    repo_id = compute_repo_id(owner_id, mid, "code", created_at.isoformat())
    repo = MusehubRepo(
        repo_id=repo_id,
        name=mid,
        owner=owner,
        slug=mid,
        visibility=visibility,
        owner_user_id=owner_id,
        created_at=created_at,
        updated_at=created_at,
    )
    db_session.add(repo)
    await db_session.flush()
    return repo


def _payload(**overrides: JSONValue) -> JSONObject:
    """Return a valid create-mist request body, with *overrides* applied.

    Filename and content are randomised on every call so that two default
    payloads never collide on the content-addressed mist id.
    """
    base: JSONObject = {
        "filename": f"sec_{secrets.token_hex(4)}.py",
        "content": _PY_CONTENT + secrets.token_hex(16),  # unique content per call
        "visibility": "public",
    }
    base.update(overrides)
    return base


async def _post_mist(
    client: AsyncClient, headers: StrDict, **overrides: JSONValue
) -> Response:
    """POST a create-mist request built by ``_payload`` and return the raw response."""
    return await client.post(
        "/api/mists", json=_payload(**overrides), headers=headers
    )


async def _create(
    client: AsyncClient, headers: StrDict, **overrides: JSONValue
) -> JSONObject:
    """Create a mist through the API, assert 201, and return the JSON body."""
    r = await _post_mist(client, headers, **overrides)
    assert r.status_code == 201, r.text
    return dict(r.json())


async def _seed_mist(
    db_session: AsyncSession,
    *,
    owner: str,
    label: str,
    filename: str = "f.py",
    visibility: str | None = None,
) -> str:
    """Create a mist for *owner* directly via the service layer; return its id.

    Going through the service layer (instead of the HTTP API) lets a test
    create data owned by a user other than the one ``auth_headers``
    authenticates as, or create data without any authentication at all.

    *label* prefixes the randomised content.  When *visibility* is ``None``
    the repo is created with ``_make_mist_repo``'s default and no visibility
    is passed to the service, so the service's own default applies.
    The session is committed before returning.
    """
    # Imported lazily, as the individual tests did before this helper existed.
    from muse.plugins.mist.plugin import compute_mist_id
    from musehub.services.musehub_mists import create_mist as _svc_create

    content = f"{label} {secrets.token_hex(16)}"
    mid = compute_mist_id(content.encode())
    if visibility is None:
        repo = await _make_mist_repo(db_session, mid, owner)
        await _svc_create(
            db_session,
            mist_id=mid,
            filename=filename,
            content=content,
            owner=owner,
            repo_id=str(repo.repo_id),
        )
    else:
        repo = await _make_mist_repo(db_session, mid, owner, visibility)
        await _svc_create(
            db_session,
            mist_id=mid,
            filename=filename,
            content=content,
            owner=owner,
            repo_id=str(repo.repo_id),
            visibility=visibility,
        )
    await db_session.commit()
    return mid


async def _fork_chain(client: AsyncClient, headers: StrDict, depth: int) -> str:
    """Create a root mist, fork it *depth* times in a chain, return the last id.

    Every fork in the chain is asserted to succeed with 201.
    """
    root = await _create(client, headers)
    current_id = str(root["mistId"])
    for level in range(1, depth + 1):
        r = await client.post(f"/api/mists/{current_id}/fork", headers=headers)
        assert r.status_code == 201, (
            f"Fork at depth {level} must succeed; got {r.status_code}: {r.text}"
        )
        current_id = str(r.json()["mistId"])
    return current_id


# ═══════════════════════════════════════════════════════════════════════════════
# Filename attacks
# ═══════════════════════════════════════════════════════════════════════════════


class TestFilenameAttacks:
    """POST /api/mists with malicious filenames must be rejected (422)."""

    @pytest.mark.anyio
    async def test_path_traversal_dotdot(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        r = await _post_mist(client, auth_headers, filename="../evil.py")
        assert r.status_code == 422

    @pytest.mark.anyio
    async def test_path_traversal_deep(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        r = await _post_mist(client, auth_headers, filename="../../etc/passwd")
        assert r.status_code == 422

    @pytest.mark.anyio
    async def test_null_byte(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        r = await _post_mist(client, auth_headers, filename="evil\x00.py")
        assert r.status_code == 422

    @pytest.mark.anyio
    async def test_forward_slash_separator(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        r = await _post_mist(client, auth_headers, filename="subdir/evil.py")
        assert r.status_code == 422

    @pytest.mark.anyio
    async def test_backslash_separator(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        r = await _post_mist(client, auth_headers, filename="subdir\\evil.py")
        assert r.status_code == 422

    @pytest.mark.anyio
    async def test_control_character_tab(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        r = await _post_mist(client, auth_headers, filename="evil\t.py")
        assert r.status_code == 422

    @pytest.mark.anyio
    async def test_control_character_newline(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        r = await _post_mist(client, auth_headers, filename="evil\n.py")
        assert r.status_code == 422

    @pytest.mark.anyio
    async def test_ansi_escape_sequence(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        r = await _post_mist(
            client, auth_headers, filename="\x1b[31mevil\x1b[0m.py"
        )
        assert r.status_code == 422

    @pytest.mark.anyio
    async def test_overlong_filename(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        # 256 'a' characters plus the ".py" suffix.
        r = await _post_mist(client, auth_headers, filename=f"{'a' * 256}.py")
        assert r.status_code == 422

    @pytest.mark.anyio
    async def test_empty_filename_rejected(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        r = await _post_mist(client, auth_headers, filename="")
        assert r.status_code == 422

    @pytest.mark.anyio
    async def test_valid_filename_accepted(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """Confirm the gate accepts ordinary safe filenames."""
        r = await _post_mist(client, auth_headers, filename="valid_name.py")
        assert r.status_code == 201


# ═══════════════════════════════════════════════════════════════════════════════
# Content injection
# ═══════════════════════════════════════════════════════════════════════════════


class TestContentInjection:
    """HTML/JS payloads must be stored verbatim; no server-side stripping."""

    @pytest.mark.anyio
    async def test_xss_script_tag_stored_verbatim(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        # The payload must be non-empty: ``"" in s`` is always true, which
        # would make the verbatim-storage assertion below vacuous.
        xss = '<script>alert("xss")</script>'
        r = await _post_mist(
            client, auth_headers, content=xss + secrets.token_hex(16)
        )
        assert r.status_code == 201
        mist_id = r.json()["mistId"]

        r2 = await client.get(f"/api/mists/{mist_id}")
        assert r2.status_code == 200
        assert xss in r2.json()["content"], "XSS payload must be stored verbatim"

    @pytest.mark.anyio
    async def test_html_entity_stored_verbatim(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        # Mixes already-encoded entities with a raw tag: neither may be
        # decoded, re-encoded, or stripped on the way through.
        payload = "&lt;b&gt;bold&lt;/b&gt; &amp; <not-encoded>"
        r = await _post_mist(
            client, auth_headers, content=payload + secrets.token_hex(16)
        )
        assert r.status_code == 201
        mist_id = r.json()["mistId"]

        r2 = await client.get(f"/api/mists/{mist_id}")
        assert r2.status_code == 200
        # Content stored as-is — sanitisation is the template's responsibility.
        assert payload in r2.json()["content"]

    @pytest.mark.anyio
    async def test_unicode_content_roundtrips(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        unicode_content = f"# 日本語テスト\nprint('こんにちは世界')\n{secrets.token_hex(16)}"
        r = await _post_mist(client, auth_headers, content=unicode_content)
        assert r.status_code == 201
        mist_id = r.json()["mistId"]

        r2 = await client.get(f"/api/mists/{mist_id}")
        assert r2.status_code == 200
        assert unicode_content in r2.json()["content"]


# ═══════════════════════════════════════════════════════════════════════════════
# Access control
# ═══════════════════════════════════════════════════════════════════════════════


class TestAccessControl:
    """Secret mists invisible to non-owners; non-owner mutations blocked."""

    @pytest.mark.anyio
    async def test_secret_mist_not_in_other_owners_list(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        # Create a secret mist owned by _OTHER directly via service layer.
        # auth_headers authenticates as _OWNER — a legitimate non-owner.
        mid = await _seed_mist(
            db_session,
            owner=_OTHER,
            label="secret_list",
            filename="secret.py",
            visibility="secret",
        )

        # _OWNER (non-owner) fetching _OTHER's list must not see the secret mist.
        r = await client.get(f"/api/{_OTHER}/mists", headers=auth_headers)
        assert r.status_code == 200
        ids = [m["mistId"] for m in r.json()["mists"]]
        assert mid not in ids, "Secret mist must not appear in non-owner's list view"

    @pytest.mark.anyio
    async def test_secret_mist_not_in_explore(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        mist = await _create(client, auth_headers, visibility="secret")
        mist_id = mist["mistId"]

        # The explore feed is fetched without credentials.
        r = await client.get("/api/mists/explore")
        assert r.status_code == 200
        ids = [m["mistId"] for m in r.json()["mists"]]
        assert mist_id not in ids, "Secret mist must not appear in explore feed"

    @pytest.mark.anyio
    async def test_secret_mist_detail_returns_403_for_non_owner(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Non-owner detail access is refused; either 403 or 404 is accepted."""
        # Create a secret mist owned by _OTHER directly via service layer.
        # _OWNER (from auth_headers) is the authenticated non-owner caller.
        mid = await _seed_mist(
            db_session,
            owner=_OTHER,
            label="secret_detail",
            filename="secret.py",
            visibility="secret",
        )

        r = await client.get(f"/api/mists/{mid}", headers=auth_headers)
        assert r.status_code in (403, 404), (
            "Secret mist must not be accessible to non-owner (even authenticated)"
        )

    @pytest.mark.anyio
    async def test_non_owner_update_returns_404(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Non-owner PATCH is refused; either 403 or 404 is accepted."""
        # Mist owned by _OTHER; auth_headers authenticates as _OWNER.
        mid = await _seed_mist(db_session, owner=_OTHER, label="non_owner_upd")

        r = await client.patch(
            f"/api/mists/{mid}",
            json={"title": "Hijacked"},
            headers=auth_headers,  # _OWNER ≠ _OTHER
        )
        assert r.status_code in (403, 404)

    @pytest.mark.anyio
    async def test_non_owner_delete_returns_404(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Non-owner DELETE is refused; either 403 or 404 is accepted."""
        mid = await _seed_mist(db_session, owner=_OTHER, label="non_owner_del")

        r = await client.delete(
            f"/api/mists/{mid}",
            headers=auth_headers,  # _OWNER ≠ _OTHER
        )
        assert r.status_code in (403, 404)

    @pytest.mark.anyio
    async def test_unauthenticated_create_returns_401(
        self, client: AsyncClient
    ) -> None:
        r = await client.post("/api/mists", json=_payload())
        assert r.status_code == 401

    @pytest.mark.anyio
    async def test_unauthenticated_update_returns_401(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        # Create via service layer so auth_headers fixture is NOT active.
        mid = await _seed_mist(db_session, owner=_OWNER, label="unauth_upd")

        r = await client.patch(f"/api/mists/{mid}", json={"title": "x"})
        assert r.status_code == 401

    @pytest.mark.anyio
    async def test_unauthenticated_delete_returns_401(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        # Create via service layer so auth_headers fixture is NOT active.
        mid = await _seed_mist(db_session, owner=_OWNER, label="unauth_del")

        r = await client.delete(f"/api/mists/{mid}")
        assert r.status_code == 401

    @pytest.mark.anyio
    async def test_owner_can_see_own_secret_mist(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        mist = await _create(client, auth_headers, visibility="secret")
        mist_id = mist["mistId"]

        r = await client.get(f"/api/mists/{mist_id}", headers=auth_headers)
        assert r.status_code == 200
        assert r.json()["mistId"] == mist_id


# ═══════════════════════════════════════════════════════════════════════════════
# Collision resistance
# ═══════════════════════════════════════════════════════════════════════════════


class TestCollisionResistance:
    """Identical content bytes → identical mist_id → POST returns 409."""

    @pytest.mark.anyio
    async def test_duplicate_content_returns_409(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        # Unique per run, so the *first* POST cannot itself collide with a
        # row left behind by an earlier test or run; the same body is then
        # re-posted byte-for-byte.
        fixed_content = f"def idempotent(): return 42\n# {secrets.token_hex(16)}\n"
        body = _payload(content=fixed_content, filename="idempotent.py")

        r1 = await client.post("/api/mists", json=body, headers=auth_headers)
        assert r1.status_code == 201, r1.text

        r2 = await client.post("/api/mists", json=body, headers=auth_headers)
        assert r2.status_code == 409, (
            "Re-posting identical content must return 409 (content-addressed)"
        )

    @pytest.mark.anyio
    async def test_different_content_different_id(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        r1 = await _create(
            client, auth_headers, content=f"content_a {secrets.token_hex(16)}"
        )
        r2 = await _create(
            client, auth_headers, content=f"content_b {secrets.token_hex(16)}"
        )
        assert r1["mistId"] != r2["mistId"]

    @pytest.mark.anyio
    async def test_mist_id_deterministic_from_content(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """mist_id is deterministic — recomputing it offline verifies integrity."""
        from muse.plugins.mist.plugin import compute_mist_id

        content = f"def check(): pass\n# unique: {secrets.token_hex(16)}"
        r = await _create(client, auth_headers, content=content)
        expected_id = compute_mist_id(content.encode("utf-8"))
        assert r["mistId"] == expected_id


# ═══════════════════════════════════════════════════════════════════════════════
# Large content rejection
# ═══════════════════════════════════════════════════════════════════════════════


class TestLargeContentRejection:
    """Requests whose body exceeds 10 MiB must be rejected (413)."""

    @pytest.mark.anyio
    async def test_oversized_content_returns_413(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        # 11 MiB of ASCII content — well above the 10 MiB middleware cap.
        oversized = "x" * (11 * 1024 * 1024)
        r = await _post_mist(
            client, auth_headers, filename="large.py", content=oversized
        )
        assert r.status_code == 413, (
            "Content > 10 MiB must be rejected by ContentSizeLimitMiddleware"
        )

    @pytest.mark.anyio
    async def test_near_limit_content_accepted(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """A 512 KiB body, well within the 10 MiB cap, should succeed (sanity check)."""
        content = "a" * (512 * 1024)  # 512 KiB — well within limit
        r = await _post_mist(client, auth_headers, content=content)
        assert r.status_code == 201


# ═══════════════════════════════════════════════════════════════════════════════
# Fork depth enforcement
# ═══════════════════════════════════════════════════════════════════════════════


class TestForkDepthEnforcement:
    """Fork chain is capped at depth 5; further forks must return 422."""

    @pytest.mark.anyio
    async def test_fork_chain_to_max_depth(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        # Root mist plus 5 chained forks — every fork must succeed
        # (asserted inside the helper).
        await _fork_chain(client, auth_headers, _MAX_FORK_DEPTH)

    @pytest.mark.anyio
    async def test_fork_past_max_depth_returns_422(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        # Build a chain of depth 5.
        deepest_id = await _fork_chain(client, auth_headers, _MAX_FORK_DEPTH)

        # Forking the depth-5 mist must fail.
        r = await client.post(
            f"/api/mists/{deepest_id}/fork", headers=auth_headers
        )
        assert r.status_code == 422, (
            f"Fork past depth 5 must be rejected; got {r.status_code}: {r.text}"
        )

    @pytest.mark.anyio
    async def test_fork_nonexistent_mist_returns_404(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        r = await client.post(
            "/api/mists/doesnotexist/fork", headers=auth_headers
        )
        assert r.status_code == 404


# ═══════════════════════════════════════════════════════════════════════════════
# Tag injection
# ═══════════════════════════════════════════════════════════════════════════════


class TestTagSecurity:
    """Tags have count and length limits; null-byte tags are rejected.

    NOTE(review): the earlier summary also listed HTML-special tags as
    rejected, but no test in this class exercises that — TODO confirm the
    intended behaviour and add coverage.
    """

    @pytest.mark.anyio
    async def test_too_many_tags_rejected(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        r = await _post_mist(
            client,
            auth_headers,
            tags=[f"tag{i}" for i in range(11)],  # max is 10
        )
        assert r.status_code == 422

    @pytest.mark.anyio
    async def test_overlong_tag_rejected(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        r = await _post_mist(client, auth_headers, tags=["a" * 65])  # max is 64
        assert r.status_code == 422

    @pytest.mark.anyio
    async def test_null_byte_in_tag_rejected(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        r = await _post_mist(client, auth_headers, tags=["evil\x00tag"])
        assert r.status_code == 422