"""Section 17 — Mists Security Hardening: adversarial and access-control tests.
Covers every security property stated in the Phase 8 spec:

    Filename attacks      Path traversal, null bytes, control characters,
                          ANSI escape sequences, path separators — all must
                          return 422 Unprocessable Entity.

    Content injection     HTML/JS payloads stored verbatim; no server-side
                          sanitisation that would alter or strip content.
                          The embedding contract lives at the template layer
                          (Jinja2 auto-escapes {{ mist.content | e }}).

    Access control        Secret mists invisible to non-owners in list and
                          explore endpoints; non-owner update/delete blocked;
                          secret mist detail is refused for non-owners (403,
                          or 404 to avoid leaking existence — the tests
                          accept either).

    Collision resistance  Identical bytes → identical mist_id → POST returns
                          409 Conflict (idempotent, not an error the caller
                          should retry).

    Large content         Body over the 10 MiB cap (the test sends 11 MiB)
                          triggers ContentSizeLimitMiddleware → 413 Request
                          Entity Too Large before any DB write.

    Fork depth            Fork chain capped at 5 levels; attempting to fork
                          a depth-5 mist returns 422.
"""
from __future__ import annotations
import secrets
from datetime import datetime, timezone
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from musehub.core.genesis import compute_identity_id, compute_repo_id
from musehub.db.musehub_repo_models import MusehubRepo
from musehub.types.json_types import JSONObject, JSONValue, StrDict
# Handle the tests in this file treat as the authenticated caller.
_OWNER = "testuser" # matches conftest._TEST_HANDLE
# Handle of a second user; mists seeded under it are not owned by the caller.
_OTHER = "otheruser"
async def _make_mist_repo(
    db_session: AsyncSession,
    mid: str,
    owner: str,
    visibility: str = "public",
) -> MusehubRepo:
    """Insert the ``MusehubRepo`` row that backs a test mist and return it.

    The repo uses *mid* as both its name and its slug, and is owned by the
    identity derived from *owner*. The row is added to *db_session* and
    flushed but not committed; committing is left to the caller.
    """
    now = datetime.now(tz=timezone.utc)
    identity = compute_identity_id(owner.encode())
    record = MusehubRepo(
        repo_id=compute_repo_id(identity, mid, "code", now.isoformat()),
        name=mid,
        owner=owner,
        slug=mid,
        visibility=visibility,
        owner_user_id=identity,
        created_at=now,
        updated_at=now,
    )
    db_session.add(record)
    await db_session.flush()
    return record
_PY_CONTENT = "def hello():\n return 'hello world'\n"
def _payload(**overrides: JSONValue) -> JSONObject:
base: JSONObject = {
"filename": f"sec_{secrets.token_hex(4)}.py",
"content": _PY_CONTENT + secrets.token_hex(16), # unique content per call
"visibility": "public",
}
base.update(overrides)
return base
async def _create(client: AsyncClient, headers: StrDict, **overrides: JSONValue) -> JSONObject:
    """POST a new mist and return the decoded response body.

    Fails the calling test unless the API answers 201; the response text is
    attached to the assertion for diagnosis.
    """
    request_body = _payload(**overrides)
    response = await client.post("/api/mists", json=request_body, headers=headers)
    assert response.status_code == 201, response.text
    return dict(response.json())
# ═══════════════════════════════════════════════════════════════════════════════
# Filename attacks
# ═══════════════════════════════════════════════════════════════════════════════
class TestFilenameAttacks:
    """Creation requests carrying a hostile filename must be refused with 422."""

    @staticmethod
    async def _status_for(client: AsyncClient, headers: StrDict, filename: str) -> int:
        """POST a mist whose filename is *filename* and return the HTTP status."""
        response = await client.post(
            "/api/mists",
            json=_payload(filename=filename),
            headers=headers,
        )
        return response.status_code

    @pytest.mark.anyio
    async def test_path_traversal_dotdot(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """A single parent-directory prefix is rejected."""
        status = await self._status_for(client, auth_headers, "../evil.py")
        assert status == 422

    @pytest.mark.anyio
    async def test_path_traversal_deep(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """A multi-level traversal towards a system file is rejected."""
        status = await self._status_for(client, auth_headers, "../../etc/passwd")
        assert status == 422

    @pytest.mark.anyio
    async def test_null_byte(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """An embedded NUL byte is rejected."""
        status = await self._status_for(client, auth_headers, "evil\x00.py")
        assert status == 422

    @pytest.mark.anyio
    async def test_forward_slash_separator(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """A POSIX path separator inside the name is rejected."""
        status = await self._status_for(client, auth_headers, "subdir/evil.py")
        assert status == 422

    @pytest.mark.anyio
    async def test_backslash_separator(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """A backslash path separator inside the name is rejected."""
        status = await self._status_for(client, auth_headers, "subdir\\evil.py")
        assert status == 422

    @pytest.mark.anyio
    async def test_control_character_tab(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """A TAB control character is rejected."""
        status = await self._status_for(client, auth_headers, "evil\t.py")
        assert status == 422

    @pytest.mark.anyio
    async def test_control_character_newline(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """A newline control character is rejected."""
        status = await self._status_for(client, auth_headers, "evil\n.py")
        assert status == 422

    @pytest.mark.anyio
    async def test_ansi_escape_sequence(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """ANSI colour escape sequences are rejected."""
        status = await self._status_for(
            client, auth_headers, "\x1b[31mevil\x1b[0m.py"
        )
        assert status == 422

    @pytest.mark.anyio
    async def test_overlong_filename(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """A 256-character stem plus extension is rejected."""
        too_long = "a" * 256 + ".py"
        status = await self._status_for(client, auth_headers, too_long)
        assert status == 422

    @pytest.mark.anyio
    async def test_empty_filename_rejected(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """The empty string is not an acceptable filename."""
        status = await self._status_for(client, auth_headers, "")
        assert status == 422

    @pytest.mark.anyio
    async def test_valid_filename_accepted(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """Control case: an ordinary safe filename passes the gate (201)."""
        status = await self._status_for(client, auth_headers, "valid_name.py")
        assert status == 201
# ═══════════════════════════════════════════════════════════════════════════════
# Content injection
# ═══════════════════════════════════════════════════════════════════════════════
class TestContentInjection:
    """HTML/JS payloads must be stored verbatim; no server-side stripping.

    Each test posts content through ``POST /api/mists``, reads it back through
    ``GET /api/mists/{mistId}`` and checks that the submitted text is still
    present unchanged in the returned ``content`` field.
    """

    @staticmethod
    async def _store_and_fetch(
        client: AsyncClient, headers: StrDict, content: str
    ) -> str:
        """Create a mist holding *content* and return the content read back."""
        created = await client.post(
            "/api/mists", json=_payload(content=content), headers=headers
        )
        assert created.status_code == 201, created.text
        mist_id = created.json()["mistId"]
        fetched = await client.get(f"/api/mists/{mist_id}")
        assert fetched.status_code == 200, fetched.text
        return str(fetched.json()["content"])

    @pytest.mark.anyio
    async def test_xss_script_tag_stored_verbatim(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """A script tag survives the round trip untouched."""
        # The payload must be non-empty: an empty string is contained in every
        # string, which would make the assertion below pass unconditionally.
        xss = '<script>alert("xss")</script>'
        stored = await self._store_and_fetch(
            client, auth_headers, xss + secrets.token_hex(16)
        )
        assert xss in stored, "XSS payload must be stored verbatim"

    @pytest.mark.anyio
    async def test_html_entity_stored_verbatim(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Already-encoded entities and raw angle brackets are both preserved."""
        # Mixes pre-encoded entities with a raw tag so that neither decoding
        # nor encoding on the server side would go unnoticed.
        payload = '&lt;b&gt;bold&lt;/b&gt; &amp; <not-encoded>'
        stored = await self._store_and_fetch(
            client, auth_headers, payload + secrets.token_hex(16)
        )
        # Content stored as-is — sanitisation is the template's responsibility.
        assert payload in stored

    @pytest.mark.anyio
    async def test_unicode_content_roundtrips(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Non-ASCII source text comes back unchanged."""
        unicode_content = f"# 日本語テスト\nprint('こんにちは世界')\n{secrets.token_hex(16)}"
        stored = await self._store_and_fetch(client, auth_headers, unicode_content)
        assert unicode_content in stored
# ═══════════════════════════════════════════════════════════════════════════════
# Access control
# ═══════════════════════════════════════════════════════════════════════════════
class TestAccessControl:
    """Secret mists stay hidden from non-owners; foreign and anonymous writes fail.

    Several test names mention a single status code (403 or 404) while the
    assertion accepts either one. The names are kept so existing test
    selections keep working; the assertions define the actual contract.
    """

    @staticmethod
    async def _seed_mist(
        db_session: AsyncSession,
        label: str,
        owner: str,
        *,
        secret: bool = False,
    ) -> str:
        """Create a mist through the service layer, commit it, return its id.

        Bypassing ``POST /api/mists`` lets a test plant a mist owned by someone
        other than the authenticated caller, or plant one without requesting
        the ``auth_headers`` fixture at all.

        Args:
            db_session: Session the repo and mist rows are written to.
            label: Prefix of the generated content; a random suffix is added.
            owner: Handle that owns both the repo and the mist.
            secret: When true the repo and mist are created with visibility
                ``"secret"``; otherwise the defaults are used.
        """
        # Imported here rather than at module top, so importing this test
        # module does not by itself require these packages.
        from muse.plugins.mist.plugin import compute_mist_id
        from musehub.services.musehub_mists import create_mist as _svc_create

        content = f"{label} {secrets.token_hex(16)}"
        mid = compute_mist_id(content.encode())
        if secret:
            repo = await _make_mist_repo(db_session, mid, owner, "secret")
            await _svc_create(
                db_session, mist_id=mid, filename="secret.py", content=content,
                owner=owner, repo_id=str(repo.repo_id), visibility="secret",
            )
        else:
            repo = await _make_mist_repo(db_session, mid, owner)
            await _svc_create(
                db_session, mist_id=mid, filename="f.py", content=content,
                owner=owner, repo_id=str(repo.repo_id),
            )
        await db_session.commit()
        return mid

    @pytest.mark.anyio
    async def test_secret_mist_not_in_other_owners_list(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """A non-owner listing another user's mists never sees secret ones."""
        # The mist belongs to _OTHER; auth_headers authenticates as _OWNER,
        # a legitimate non-owner.
        mid = await self._seed_mist(db_session, "secret_list", _OTHER, secret=True)

        r = await client.get(f"/api/{_OTHER}/mists", headers=auth_headers)
        assert r.status_code == 200
        listed = {m["mistId"] for m in r.json()["mists"]}
        assert mid not in listed, "Secret mist must not appear in non-owner's list view"

    @pytest.mark.anyio
    async def test_secret_mist_not_in_explore(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Secret mists are excluded from the explore feed."""
        mist = await _create(client, auth_headers, visibility="secret")
        mist_id = mist["mistId"]

        r = await client.get("/api/mists/explore")
        assert r.status_code == 200
        listed = {m["mistId"] for m in r.json()["mists"]}
        assert mist_id not in listed, "Secret mist must not appear in explore feed"

    @pytest.mark.anyio
    async def test_secret_mist_detail_returns_403_for_non_owner(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """An authenticated non-owner is refused the detail view (403 or 404)."""
        mid = await self._seed_mist(db_session, "secret_detail", _OTHER, secret=True)

        # The caller is authenticated but is not the owner of this secret mist.
        r = await client.get(f"/api/mists/{mid}", headers=auth_headers)
        assert r.status_code in (403, 404), (
            "Secret mist must not be accessible to non-owner (even authenticated)"
        )

    @pytest.mark.anyio
    async def test_non_owner_update_returns_404(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """PATCH by a non-owner is refused (403 or 404)."""
        mid = await self._seed_mist(db_session, "non_owner_upd", _OTHER)

        r = await client.patch(
            f"/api/mists/{mid}",
            json={"title": "Hijacked"},
            headers=auth_headers,  # caller is _OWNER, mist belongs to _OTHER
        )
        assert r.status_code in (403, 404)

    @pytest.mark.anyio
    async def test_non_owner_delete_returns_404(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """DELETE by a non-owner is refused (403 or 404)."""
        mid = await self._seed_mist(db_session, "non_owner_del", _OTHER)

        r = await client.delete(
            f"/api/mists/{mid}",
            headers=auth_headers,  # caller is _OWNER, mist belongs to _OTHER
        )
        assert r.status_code in (403, 404)

    @pytest.mark.anyio
    async def test_unauthenticated_create_returns_401(
        self, client: AsyncClient
    ) -> None:
        """POST without credentials is answered with 401."""
        r = await client.post("/api/mists", json=_payload())
        assert r.status_code == 401

    @pytest.mark.anyio
    async def test_unauthenticated_update_returns_401(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """PATCH without credentials is answered with 401."""
        # Seeded via the service layer so the auth_headers fixture is NOT active.
        mid = await self._seed_mist(db_session, "unauth_upd", _OWNER)

        r = await client.patch(f"/api/mists/{mid}", json={"title": "x"})
        assert r.status_code == 401

    @pytest.mark.anyio
    async def test_unauthenticated_delete_returns_401(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """DELETE without credentials is answered with 401."""
        # Seeded via the service layer so the auth_headers fixture is NOT active.
        mid = await self._seed_mist(db_session, "unauth_del", _OWNER)

        r = await client.delete(f"/api/mists/{mid}")
        assert r.status_code == 401

    @pytest.mark.anyio
    async def test_owner_can_see_own_secret_mist(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Control case: the owner can still read their own secret mist."""
        mist = await _create(client, auth_headers, visibility="secret")
        mist_id = mist["mistId"]

        r = await client.get(f"/api/mists/{mist_id}", headers=auth_headers)
        assert r.status_code == 200
        assert r.json()["mistId"] == mist_id
# ═══════════════════════════════════════════════════════════════════════════════
# Collision resistance
# ═══════════════════════════════════════════════════════════════════════════════
class TestCollisionResistance:
    """Identical content bytes → identical mist_id → POST returns 409."""

    @pytest.mark.anyio
    async def test_duplicate_content_returns_409(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Posting the exact same body twice yields 201, then 409."""
        # The content is unique per run, like every other test in this file,
        # so a row left over from an earlier run cannot turn the first POST
        # into a 409. Within the test the same body is sent twice unchanged.
        content = f"def idempotent(): return 42\n# {secrets.token_hex(16)}\n"
        body = _payload(content=content, filename="idempotent.py")

        first = await client.post("/api/mists", json=body, headers=auth_headers)
        assert first.status_code == 201, first.text
        assert "mistId" in first.json()

        second = await client.post("/api/mists", json=body, headers=auth_headers)
        assert second.status_code == 409, (
            "Re-posting identical content must return 409 (content-addressed)"
        )

    @pytest.mark.anyio
    async def test_different_content_different_id(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Two mists with different content receive different ids."""
        mist_a = await _create(
            client, auth_headers, content=f"content_a {secrets.token_hex(16)}"
        )
        mist_b = await _create(
            client, auth_headers, content=f"content_b {secrets.token_hex(16)}"
        )
        assert mist_a["mistId"] != mist_b["mistId"]

    @pytest.mark.anyio
    async def test_mist_id_deterministic_from_content(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """mist_id is deterministic — recomputing it offline verifies integrity."""
        from muse.plugins.mist.plugin import compute_mist_id

        content = f"def check(): pass\n# unique: {secrets.token_hex(16)}"
        created = await _create(client, auth_headers, content=content)
        # The id returned by the API must equal the id computed locally from
        # the UTF-8 bytes of the submitted content.
        assert created["mistId"] == compute_mist_id(content.encode("utf-8"))
# ═══════════════════════════════════════════════════════════════════════════════
# Large content rejection
# ═══════════════════════════════════════════════════════════════════════════════
class TestLargeContentRejection:
    """Request bodies larger than 10 MiB are refused with 413."""

    @pytest.mark.anyio
    async def test_oversized_content_returns_413(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """An 11 MiB content field pushes the body past the 10 MiB cap."""
        mib = 1024 * 1024
        request_body: JSONObject = {
            "filename": "large.py",
            "content": "x" * (11 * mib),  # ASCII, so 11 MiB on the wire
            "visibility": "public",
        }
        response = await client.post(
            "/api/mists", json=request_body, headers=auth_headers
        )
        assert response.status_code == 413, (
            "Content > 10 MiB must be rejected by ContentSizeLimitMiddleware"
        )

    @pytest.mark.anyio
    async def test_near_limit_content_accepted(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Sanity check: a 512 KiB payload, well within the limit, is accepted."""
        half_mib_of_text = "a" * (512 * 1024)
        response = await client.post(
            "/api/mists",
            json=_payload(content=half_mib_of_text),
            headers=auth_headers,
        )
        assert response.status_code == 201
# ═══════════════════════════════════════════════════════════════════════════════
# Fork depth enforcement
# ═══════════════════════════════════════════════════════════════════════════════
class TestForkDepthEnforcement:
    """A fork chain may reach depth 5; forking beyond that must return 422."""

    @pytest.mark.anyio
    async def test_fork_chain_to_max_depth(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Five successive forks, each of the previous fork, all succeed."""
        parent_id = (await _create(client, auth_headers))["mistId"]

        for depth in (1, 2, 3, 4, 5):
            forked = await client.post(
                f"/api/mists/{parent_id}/fork", headers=auth_headers
            )
            assert forked.status_code == 201, (
                f"Fork at depth {depth} must succeed; got {forked.status_code}: {forked.text}"
            )
            # The next fork is taken from the mist that was just created.
            parent_id = forked.json()["mistId"]

    @pytest.mark.anyio
    async def test_fork_past_max_depth_returns_422(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """The sixth fork in a chain is rejected."""
        parent_id = (await _create(client, auth_headers))["mistId"]

        # Walk down to depth 5 first.
        steps_taken = 0
        while steps_taken < 5:
            forked = await client.post(
                f"/api/mists/{parent_id}/fork", headers=auth_headers
            )
            assert forked.status_code == 201
            parent_id = forked.json()["mistId"]
            steps_taken += 1

        # One more fork, taken from the depth-5 mist, must fail.
        rejected = await client.post(
            f"/api/mists/{parent_id}/fork", headers=auth_headers
        )
        assert rejected.status_code == 422, (
            f"Fork past depth 5 must be rejected; got {rejected.status_code}: {rejected.text}"
        )

    @pytest.mark.anyio
    async def test_fork_nonexistent_mist_returns_404(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Forking an id that does not exist yields 404."""
        missing = await client.post(
            "/api/mists/doesnotexist/fork", headers=auth_headers
        )
        assert missing.status_code == 404
# ═══════════════════════════════════════════════════════════════════════════════
# Tag injection
# ═══════════════════════════════════════════════════════════════════════════════
class TestTagSecurity:
    """Tag validation: count and length limits; HTML-special and null-byte tags rejected."""

    @staticmethod
    async def _status_for_tags(
        client: AsyncClient, headers: StrDict, tags: JSONValue
    ) -> int:
        """POST an otherwise valid mist carrying *tags*; return the HTTP status."""
        response = await client.post(
            "/api/mists",
            json=_payload(tags=tags),
            headers=headers,
        )
        return response.status_code

    @pytest.mark.anyio
    async def test_too_many_tags_rejected(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """Eleven tags is one more than the maximum of 10."""
        eleven_tags = [f"tag{i}" for i in range(11)]
        status = await self._status_for_tags(client, auth_headers, eleven_tags)
        assert status == 422

    @pytest.mark.anyio
    async def test_overlong_tag_rejected(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """A 65-character tag is one more than the maximum of 64."""
        status = await self._status_for_tags(client, auth_headers, ["a" * 65])
        assert status == 422

    @pytest.mark.anyio
    async def test_null_byte_in_tag_rejected(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """A tag containing a NUL byte is rejected."""
        status = await self._status_for_tags(client, auth_headers, ["evil\x00tag"])
        assert status == 422