gabriel / musehub public

test_mist_security.py file-level

at sha256:3 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 πŸ’₯ blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
1 """Section 17 β€” Mists Security Hardening: adversarial and access-control tests.
2
3 Covers every security property stated in the Phase 8 spec:
4
5 Filename attacks Path traversal, null bytes, control characters,
6 ANSI escape sequences, path separators β€” all must
7 return 422 Unprocessable Entity.
8
9 Content injection HTML/JS payloads stored verbatim; no server-side
10 sanitisation that would alter or strip content.
11 The embedding contract lives at the template layer
12 (Jinja2 auto-escapes {{ mist.content | e }}).
13
14 Access control Secret mists invisible to non-owners in list and
15 explore endpoints; non-owner update/delete blocked;
16 secret mist detail returns 403 to prevent leaking
17 existence.
18
19 Collision resistance Identical bytes β†’ identical mist_id β†’ POST returns
20 409 Conflict (idempotent, not an error the caller
21 should retry).
22
23 Large content Body β‰₯ 11 MiB triggers ContentSizeLimitMiddleware
24 β†’ 413 Request Entity Too Large before any DB write.
25
26 Fork depth Fork chain capped at 5 levels; attempting to fork
27 a depth-5 mist returns 422.
28 """
29 from __future__ import annotations
30
31 import secrets
32 from datetime import datetime, timezone
33
34 import pytest
35 from httpx import AsyncClient
36 from sqlalchemy.ext.asyncio import AsyncSession
37
38 from musehub.core.genesis import compute_identity_id, compute_repo_id
39 from musehub.db.musehub_repo_models import MusehubRepo
40 from musehub.types.json_types import JSONObject, JSONValue, StrDict
41
42 _OWNER = "testuser" # matches conftest._TEST_HANDLE
43 _OTHER = "otheruser"
44
45
46 async def _make_mist_repo(
47 db_session: AsyncSession,
48 mid: str,
49 owner: str,
50 visibility: str = "public",
51 ) -> MusehubRepo:
52 """Create a MusehubRepo for a mist test and return it."""
53 created_at = datetime.now(tz=timezone.utc)
54 owner_id = compute_identity_id(owner.encode())
55 repo_id = compute_repo_id(owner_id, mid, "code", created_at.isoformat())
56 repo = MusehubRepo(
57 repo_id=repo_id,
58 name=mid, owner=owner, slug=mid,
59 visibility=visibility, owner_user_id=owner_id,
60 created_at=created_at, updated_at=created_at,
61 )
62 db_session.add(repo)
63 await db_session.flush()
64 return repo
65
66 _PY_CONTENT = "def hello():\n return 'hello world'\n"
67
68
69 def _payload(**overrides: JSONValue) -> JSONObject:
70 base: JSONObject = {
71 "filename": f"sec_{secrets.token_hex(4)}.py",
72 "content": _PY_CONTENT + secrets.token_hex(16), # unique content per call
73 "visibility": "public",
74 }
75 base.update(overrides)
76 return base
77
78
79 async def _create(client: AsyncClient, headers: StrDict, **overrides: JSONValue) -> JSONObject:
80 r = await client.post("/api/mists", json=_payload(**overrides), headers=headers)
81 assert r.status_code == 201, r.text
82 return dict(r.json())
83
84
85 # ═══════════════════════════════════════════════════════════════════════════════
86 # Filename attacks
87 # ═══════════════════════════════════════════════════════════════════════════════
88
89 class TestFilenameAttacks:
90 """POST /api/mists with malicious filenames must be rejected (422)."""
91
92 @pytest.mark.anyio
93 async def test_path_traversal_dotdot(
94 self, client: AsyncClient, auth_headers: StrDict
95 ) -> None:
96 r = await client.post(
97 "/api/mists",
98 json=_payload(filename="../evil.py"),
99 headers=auth_headers,
100 )
101 assert r.status_code == 422
102
103 @pytest.mark.anyio
104 async def test_path_traversal_deep(
105 self, client: AsyncClient, auth_headers: StrDict
106 ) -> None:
107 r = await client.post(
108 "/api/mists",
109 json=_payload(filename="../../etc/passwd"),
110 headers=auth_headers,
111 )
112 assert r.status_code == 422
113
114 @pytest.mark.anyio
115 async def test_null_byte(
116 self, client: AsyncClient, auth_headers: StrDict
117 ) -> None:
118 r = await client.post(
119 "/api/mists",
120 json=_payload(filename="evil\x00.py"),
121 headers=auth_headers,
122 )
123 assert r.status_code == 422
124
125 @pytest.mark.anyio
126 async def test_forward_slash_separator(
127 self, client: AsyncClient, auth_headers: StrDict
128 ) -> None:
129 r = await client.post(
130 "/api/mists",
131 json=_payload(filename="subdir/evil.py"),
132 headers=auth_headers,
133 )
134 assert r.status_code == 422
135
136 @pytest.mark.anyio
137 async def test_backslash_separator(
138 self, client: AsyncClient, auth_headers: StrDict
139 ) -> None:
140 r = await client.post(
141 "/api/mists",
142 json=_payload(filename="subdir\\evil.py"),
143 headers=auth_headers,
144 )
145 assert r.status_code == 422
146
147 @pytest.mark.anyio
148 async def test_control_character_tab(
149 self, client: AsyncClient, auth_headers: StrDict
150 ) -> None:
151 r = await client.post(
152 "/api/mists",
153 json=_payload(filename="evil\t.py"),
154 headers=auth_headers,
155 )
156 assert r.status_code == 422
157
158 @pytest.mark.anyio
159 async def test_control_character_newline(
160 self, client: AsyncClient, auth_headers: StrDict
161 ) -> None:
162 r = await client.post(
163 "/api/mists",
164 json=_payload(filename="evil\n.py"),
165 headers=auth_headers,
166 )
167 assert r.status_code == 422
168
169 @pytest.mark.anyio
170 async def test_ansi_escape_sequence(
171 self, client: AsyncClient, auth_headers: StrDict
172 ) -> None:
173 r = await client.post(
174 "/api/mists",
175 json=_payload(filename="\x1b[31mevil\x1b[0m.py"),
176 headers=auth_headers,
177 )
178 assert r.status_code == 422
179
180 @pytest.mark.anyio
181 async def test_overlong_filename(
182 self, client: AsyncClient, auth_headers: StrDict
183 ) -> None:
184 r = await client.post(
185 "/api/mists",
186 json=_payload(filename=f"{'a' * 256}.py"),
187 headers=auth_headers,
188 )
189 assert r.status_code == 422
190
191 @pytest.mark.anyio
192 async def test_empty_filename_rejected(
193 self, client: AsyncClient, auth_headers: StrDict
194 ) -> None:
195 r = await client.post(
196 "/api/mists",
197 json=_payload(filename=""),
198 headers=auth_headers,
199 )
200 assert r.status_code == 422
201
202 @pytest.mark.anyio
203 async def test_valid_filename_accepted(
204 self, client: AsyncClient, auth_headers: StrDict
205 ) -> None:
206 """Confirm the gate accepts ordinary safe filenames."""
207 r = await client.post(
208 "/api/mists",
209 json=_payload(filename="valid_name.py"),
210 headers=auth_headers,
211 )
212 assert r.status_code == 201
213
214
215 # ═══════════════════════════════════════════════════════════════════════════════
216 # Content injection
217 # ═══════════════════════════════════════════════════════════════════════════════
218
219 class TestContentInjection:
220 """HTML/JS payloads must be stored verbatim; no server-side stripping."""
221
222 @pytest.mark.anyio
223 async def test_xss_script_tag_stored_verbatim(
224 self,
225 client: AsyncClient,
226 auth_headers: StrDict,
227 db_session: AsyncSession,
228 ) -> None:
229 xss = '<script>alert("xss")</script>'
230 body = _payload(content=xss + secrets.token_hex(16))
231 r = await client.post("/api/mists", json=body, headers=auth_headers)
232 assert r.status_code == 201
233 mist_id = r.json()["mistId"]
234
235 r2 = await client.get(f"/api/mists/{mist_id}")
236 assert r2.status_code == 200
237 assert xss in r2.json()["content"], "XSS payload must be stored verbatim"
238
239 @pytest.mark.anyio
240 async def test_html_entity_stored_verbatim(
241 self,
242 client: AsyncClient,
243 auth_headers: StrDict,
244 db_session: AsyncSession,
245 ) -> None:
246 payload = '<img src=x onerror=alert(1)> &lt;not-encoded&gt;'
247 body = _payload(content=payload + secrets.token_hex(16))
248 r = await client.post("/api/mists", json=body, headers=auth_headers)
249 assert r.status_code == 201
250 mist_id = r.json()["mistId"]
251
252 r2 = await client.get(f"/api/mists/{mist_id}")
253 assert r2.status_code == 200
254 # Content stored as-is β€” sanitisation is the template's responsibility.
255 assert payload in r2.json()["content"]
256
257 @pytest.mark.anyio
258 async def test_unicode_content_roundtrips(
259 self,
260 client: AsyncClient,
261 auth_headers: StrDict,
262 db_session: AsyncSession,
263 ) -> None:
264 unicode_content = f"# ζ—₯本θͺžγƒ†γ‚Ήγƒˆ\nprint('γ“γ‚“γ«γ‘γ―δΈ–η•Œ')\n{secrets.token_hex(16)}"
265 body = _payload(content=unicode_content)
266 r = await client.post("/api/mists", json=body, headers=auth_headers)
267 assert r.status_code == 201
268 mist_id = r.json()["mistId"]
269
270 r2 = await client.get(f"/api/mists/{mist_id}")
271 assert r2.status_code == 200
272 assert unicode_content in r2.json()["content"]
273
274
275 # ═══════════════════════════════════════════════════════════════════════════════
276 # Access control
277 # ═══════════════════════════════════════════════════════════════════════════════
278
279 class TestAccessControl:
280 """Secret mists invisible to non-owners; non-owner mutations blocked."""
281
282 @pytest.mark.anyio
283 async def test_secret_mist_not_in_other_owners_list(
284 self,
285 client: AsyncClient,
286 auth_headers: StrDict,
287 db_session: AsyncSession,
288 ) -> None:
289 # Create a secret mist owned by "otheruser" directly via service layer.
290 # auth_headers authenticates as "testuser" β€” a legitimate non-owner.
291 from muse.plugins.mist.plugin import compute_mist_id
292 from musehub.services.musehub_mists import create_mist as _svc_create
293
294 content = f"secret_list {secrets.token_hex(16)}"
295 mid = compute_mist_id(content.encode())
296 repo = await _make_mist_repo(db_session, mid, "otheruser", "secret")
297 await _svc_create(
298 db_session, mist_id=mid, filename="secret.py", content=content,
299 owner="otheruser", repo_id=str(repo.repo_id), visibility="secret",
300 )
301 await db_session.commit()
302
303 # testuser (non-owner) fetching otheruser's list must not see the secret mist.
304 r = await client.get("/api/otheruser/mists", headers=auth_headers)
305 assert r.status_code == 200
306 ids = [m["mistId"] for m in r.json()["mists"]]
307 assert mid not in ids, "Secret mist must not appear in non-owner's list view"
308
309 @pytest.mark.anyio
310 async def test_secret_mist_not_in_explore(
311 self,
312 client: AsyncClient,
313 auth_headers: StrDict,
314 db_session: AsyncSession,
315 ) -> None:
316 mist = await _create(client, auth_headers, visibility="secret")
317 mist_id = mist["mistId"]
318
319 r = await client.get("/api/mists/explore")
320 assert r.status_code == 200
321 ids = [m["mistId"] for m in r.json()["mists"]]
322 assert mist_id not in ids, "Secret mist must not appear in explore feed"
323
324 @pytest.mark.anyio
325 async def test_secret_mist_detail_returns_403_for_non_owner(
326 self,
327 client: AsyncClient,
328 auth_headers: StrDict,
329 db_session: AsyncSession,
330 ) -> None:
331 # Create a secret mist owned by "otheruser" directly via service layer.
332 # testuser (from auth_headers override) is the authenticated non-owner caller.
333 from muse.plugins.mist.plugin import compute_mist_id
334 from musehub.services.musehub_mists import create_mist as _svc_create
335
336 content = f"secret_detail {secrets.token_hex(16)}"
337 mid = compute_mist_id(content.encode())
338 repo = await _make_mist_repo(db_session, mid, "otheruser", "secret")
339 await _svc_create(
340 db_session, mist_id=mid, filename="secret.py", content=content,
341 owner="otheruser", repo_id=str(repo.repo_id), visibility="secret",
342 )
343 await db_session.commit()
344
345 # testuser is authenticated but is not the owner of this secret mist.
346 r = await client.get(f"/api/mists/{mid}", headers=auth_headers)
347 assert r.status_code in (403, 404), (
348 "Secret mist must not be accessible to non-owner (even authenticated)"
349 )
350
351 @pytest.mark.anyio
352 async def test_non_owner_update_returns_404(
353 self,
354 client: AsyncClient,
355 auth_headers: StrDict,
356 db_session: AsyncSession,
357 ) -> None:
358 # Create a mist directly via service layer owned by "otheruser".
359 # auth_headers authenticates as "testuser" β€” a legitimate non-owner caller.
360 from muse.plugins.mist.plugin import compute_mist_id
361 from musehub.services.musehub_mists import create_mist as _svc_create
362
363 content = f"non_owner_upd {secrets.token_hex(16)}"
364 mid = compute_mist_id(content.encode())
365 repo = await _make_mist_repo(db_session, mid, "otheruser")
366 await _svc_create(
367 db_session, mist_id=mid, filename="f.py", content=content,
368 owner="otheruser", repo_id=str(repo.repo_id),
369 )
370 await db_session.commit()
371
372 r = await client.patch(
373 f"/api/mists/{mid}",
374 json={"title": "Hijacked"},
375 headers=auth_headers, # testuser β‰  otheruser
376 )
377 assert r.status_code in (403, 404)
378
379 @pytest.mark.anyio
380 async def test_non_owner_delete_returns_404(
381 self,
382 client: AsyncClient,
383 auth_headers: StrDict,
384 db_session: AsyncSession,
385 ) -> None:
386 from muse.plugins.mist.plugin import compute_mist_id
387 from musehub.services.musehub_mists import create_mist as _svc_create
388
389 content = f"non_owner_del {secrets.token_hex(16)}"
390 mid = compute_mist_id(content.encode())
391 repo = await _make_mist_repo(db_session, mid, "otheruser")
392 await _svc_create(
393 db_session, mist_id=mid, filename="f.py", content=content,
394 owner="otheruser", repo_id=str(repo.repo_id),
395 )
396 await db_session.commit()
397
398 r = await client.delete(
399 f"/api/mists/{mid}",
400 headers=auth_headers, # testuser β‰  otheruser
401 )
402 assert r.status_code in (403, 404)
403
404 @pytest.mark.anyio
405 async def test_unauthenticated_create_returns_401(
406 self, client: AsyncClient
407 ) -> None:
408 r = await client.post("/api/mists", json=_payload())
409 assert r.status_code == 401
410
411 @pytest.mark.anyio
412 async def test_unauthenticated_update_returns_401(
413 self, client: AsyncClient, db_session: AsyncSession
414 ) -> None:
415 # Create via service layer so auth_headers fixture is NOT active.
416 from muse.plugins.mist.plugin import compute_mist_id
417 from musehub.services.musehub_mists import create_mist as _svc_create
418
419 content = f"unauth_upd {secrets.token_hex(16)}"
420 mid = compute_mist_id(content.encode())
421 repo = await _make_mist_repo(db_session, mid, "testuser")
422 await _svc_create(
423 db_session, mist_id=mid, filename="f.py", content=content,
424 owner="testuser", repo_id=str(repo.repo_id),
425 )
426 await db_session.commit()
427
428 r = await client.patch(f"/api/mists/{mid}", json={"title": "x"})
429 assert r.status_code == 401
430
431 @pytest.mark.anyio
432 async def test_unauthenticated_delete_returns_401(
433 self, client: AsyncClient, db_session: AsyncSession
434 ) -> None:
435 from muse.plugins.mist.plugin import compute_mist_id
436 from musehub.services.musehub_mists import create_mist as _svc_create
437
438 content = f"unauth_del {secrets.token_hex(16)}"
439 mid = compute_mist_id(content.encode())
440 repo = await _make_mist_repo(db_session, mid, "testuser")
441 await _svc_create(
442 db_session, mist_id=mid, filename="f.py", content=content,
443 owner="testuser", repo_id=str(repo.repo_id),
444 )
445 await db_session.commit()
446
447 r = await client.delete(f"/api/mists/{mid}")
448 assert r.status_code == 401
449
450 @pytest.mark.anyio
451 async def test_owner_can_see_own_secret_mist(
452 self,
453 client: AsyncClient,
454 auth_headers: StrDict,
455 db_session: AsyncSession,
456 ) -> None:
457 mist = await _create(client, auth_headers, visibility="secret")
458 mist_id = mist["mistId"]
459
460 r = await client.get(f"/api/mists/{mist_id}", headers=auth_headers)
461 assert r.status_code == 200
462 assert r.json()["mistId"] == mist_id
463
464
465 # ═══════════════════════════════════════════════════════════════════════════════
466 # Collision resistance
467 # ═══════════════════════════════════════════════════════════════════════════════
468
469 class TestCollisionResistance:
470 """Identical content bytes β†’ identical mist_id β†’ POST returns 409."""
471
472 @pytest.mark.anyio
473 async def test_duplicate_content_returns_409(
474 self,
475 client: AsyncClient,
476 auth_headers: StrDict,
477 db_session: AsyncSession,
478 ) -> None:
479 fixed_content = "def idempotent(): return 42\n"
480 body = _payload(content=fixed_content, filename="idempotent.py")
481
482 r1 = await client.post("/api/mists", json=body, headers=auth_headers)
483 assert r1.status_code == 201
484 mist_id = r1.json()["mistId"]
485
486 r2 = await client.post("/api/mists", json=body, headers=auth_headers)
487 assert r2.status_code == 409, (
488 "Re-posting identical content must return 409 (content-addressed)"
489 )
490
491 @pytest.mark.anyio
492 async def test_different_content_different_id(
493 self,
494 client: AsyncClient,
495 auth_headers: StrDict,
496 db_session: AsyncSession,
497 ) -> None:
498 r1 = await _create(client, auth_headers, content=f"content_a {secrets.token_hex(16)}")
499 r2 = await _create(client, auth_headers, content=f"content_b {secrets.token_hex(16)}")
500 assert r1["mistId"] != r2["mistId"]
501
502 @pytest.mark.anyio
503 async def test_mist_id_deterministic_from_content(
504 self,
505 client: AsyncClient,
506 auth_headers: StrDict,
507 db_session: AsyncSession,
508 ) -> None:
509 """mist_id is deterministic β€” recomputing it offline verifies integrity."""
510 from muse.plugins.mist.plugin import compute_mist_id
511
512 content = f"def check(): pass\n# unique: {secrets.token_hex(16)}"
513 r = await _create(client, auth_headers, content=content)
514
515 expected_id = compute_mist_id(content.encode("utf-8"))
516 assert r["mistId"] == expected_id
517
518
519 # ═══════════════════════════════════════════════════════════════════════════════
520 # Large content rejection
521 # ═══════════════════════════════════════════════════════════════════════════════
522
523 class TestLargeContentRejection:
524 """Requests whose body exceeds 10 MiB must be rejected (413)."""
525
526 @pytest.mark.anyio
527 async def test_oversized_content_returns_413(
528 self,
529 client: AsyncClient,
530 auth_headers: StrDict,
531 db_session: AsyncSession,
532 ) -> None:
533 # 11 MiB of ASCII content β€” well above the 10 MiB middleware cap.
534 oversized = "x" * (11 * 1024 * 1024)
535 body: JSONObject = {
536 "filename": "large.py",
537 "content": oversized,
538 "visibility": "public",
539 }
540 r = await client.post("/api/mists", json=body, headers=auth_headers)
541 assert r.status_code == 413, (
542 "Content > 10 MiB must be rejected by ContentSizeLimitMiddleware"
543 )
544
545 @pytest.mark.anyio
546 async def test_near_limit_content_accepted(
547 self,
548 client: AsyncClient,
549 auth_headers: StrDict,
550 db_session: AsyncSession,
551 ) -> None:
552 """Content just under 1 MiB should succeed (sanity check)."""
553 content = "a" * (512 * 1024) # 512 KiB β€” well within limit
554 r = await client.post(
555 "/api/mists",
556 json=_payload(content=content),
557 headers=auth_headers,
558 )
559 assert r.status_code == 201
560
561
562 # ═══════════════════════════════════════════════════════════════════════════════
563 # Fork depth enforcement
564 # ═══════════════════════════════════════════════════════════════════════════════
565
566 class TestForkDepthEnforcement:
567 """Fork chain is capped at depth 5; further forks must return 422."""
568
569 @pytest.mark.anyio
570 async def test_fork_chain_to_max_depth(
571 self,
572 client: AsyncClient,
573 auth_headers: StrDict,
574 db_session: AsyncSession,
575 ) -> None:
576 # Create the root mist.
577 root = await _create(client, auth_headers)
578 current_id = root["mistId"]
579
580 # Fork 5 times β€” all must succeed.
581 for depth in range(1, 6):
582 r = await client.post(
583 f"/api/mists/{current_id}/fork", headers=auth_headers
584 )
585 assert r.status_code == 201, (
586 f"Fork at depth {depth} must succeed; got {r.status_code}: {r.text}"
587 )
588 current_id = r.json()["mistId"]
589
590 @pytest.mark.anyio
591 async def test_fork_past_max_depth_returns_422(
592 self,
593 client: AsyncClient,
594 auth_headers: StrDict,
595 db_session: AsyncSession,
596 ) -> None:
597 # Build a chain of depth 5.
598 root = await _create(client, auth_headers)
599 current_id = root["mistId"]
600 for _ in range(5):
601 r = await client.post(
602 f"/api/mists/{current_id}/fork", headers=auth_headers
603 )
604 assert r.status_code == 201
605 current_id = r.json()["mistId"]
606
607 # Forking the depth-5 mist must fail.
608 r = await client.post(
609 f"/api/mists/{current_id}/fork", headers=auth_headers
610 )
611 assert r.status_code == 422, (
612 f"Fork past depth 5 must be rejected; got {r.status_code}: {r.text}"
613 )
614
615 @pytest.mark.anyio
616 async def test_fork_nonexistent_mist_returns_404(
617 self,
618 client: AsyncClient,
619 auth_headers: StrDict,
620 db_session: AsyncSession,
621 ) -> None:
622 r = await client.post(
623 "/api/mists/doesnotexist/fork", headers=auth_headers
624 )
625 assert r.status_code == 404
626
627
628 # ═══════════════════════════════════════════════════════════════════════════════
629 # Tag injection
630 # ═══════════════════════════════════════════════════════════════════════════════
631
632 class TestTagSecurity:
633 """Tags have count and length limits; HTML-special and null-byte tags rejected."""
634
635 @pytest.mark.anyio
636 async def test_too_many_tags_rejected(
637 self, client: AsyncClient, auth_headers: StrDict
638 ) -> None:
639 r = await client.post(
640 "/api/mists",
641 json=_payload(tags=[f"tag{i}" for i in range(11)]), # max is 10
642 headers=auth_headers,
643 )
644 assert r.status_code == 422
645
646 @pytest.mark.anyio
647 async def test_overlong_tag_rejected(
648 self, client: AsyncClient, auth_headers: StrDict
649 ) -> None:
650 r = await client.post(
651 "/api/mists",
652 json=_payload(tags=["a" * 65]), # max is 64
653 headers=auth_headers,
654 )
655 assert r.status_code == 422
656
657 @pytest.mark.anyio
658 async def test_null_byte_in_tag_rejected(
659 self, client: AsyncClient, auth_headers: StrDict
660 ) -> None:
661 r = await client.post(
662 "/api/mists",
663 json=_payload(tags=["evil\x00tag"]),
664 headers=auth_headers,
665 )
666 assert r.status_code == 422