test_mist_security.py
python
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2
feat: add repair-commit wire endpoint (API parity with repa…
Opus 4.8
minor
⚠ breaking
1 day ago
"""Section 17 — Mists Security Hardening: adversarial and access-control tests.

Covers every security property stated in the Phase 8 spec:

  Filename attacks      Path traversal, null bytes, control characters,
                        ANSI escape sequences, path separators — all must
                        return 422 Unprocessable Entity.

  Content injection     HTML/JS payloads stored verbatim; no server-side
                        sanitisation that would alter or strip content.
                        The embedding contract lives at the template layer
                        (Jinja2 auto-escapes {{ mist.content | e }}).

  Access control        Secret mists invisible to non-owners in list and
                        explore endpoints; non-owner update/delete blocked;
                        secret mist detail is refused to non-owners with
                        403 or 404 (a 404 also avoids leaking existence).

  Collision resistance  Identical bytes → identical mist_id → POST returns
                        409 Conflict (idempotent, not an error the caller
                        should retry).

  Large content         A body above the 10 MiB cap (the test sends 11 MiB)
                        triggers ContentSizeLimitMiddleware → 413 Request
                        Entity Too Large before any DB write.

  Fork depth            Fork chain capped at 5 levels; attempting to fork
                        a depth-5 mist returns 422.

  Tag limits            More than 10 tags, a tag longer than 64 characters,
                        or a tag containing a null byte returns 422.
"""
| 29 | from __future__ import annotations |
| 30 | |
| 31 | import secrets |
| 32 | from datetime import datetime, timezone |
| 33 | |
| 34 | import pytest |
| 35 | from httpx import AsyncClient |
| 36 | from sqlalchemy.ext.asyncio import AsyncSession |
| 37 | |
| 38 | from musehub.core.genesis import compute_identity_id, compute_repo_id |
| 39 | from musehub.db.musehub_repo_models import MusehubRepo |
| 40 | from musehub.types.json_types import JSONObject, JSONValue, StrDict |
| 41 | |
| 42 | _OWNER = "testuser" # matches conftest._TEST_HANDLE |
| 43 | _OTHER = "otheruser" |
| 44 | |
| 45 | |
async def _make_mist_repo(
    db_session: AsyncSession,
    mid: str,
    owner: str,
    visibility: str = "public",
) -> MusehubRepo:
    """Insert the repo row that backs a mist under test and hand it back.

    The mist id *mid* is reused as the repo's name and slug. The row is added
    to *db_session* and flushed; committing is left to the caller.
    """
    now = datetime.now(tz=timezone.utc)
    identity = compute_identity_id(owner.encode())
    record = MusehubRepo(
        repo_id=compute_repo_id(identity, mid, "code", now.isoformat()),
        name=mid,
        owner=owner,
        slug=mid,
        visibility=visibility,
        owner_user_id=identity,
        created_at=now,
        updated_at=now,
    )
    db_session.add(record)
    await db_session.flush()
    return record
| 65 | |
| 66 | _PY_CONTENT = "def hello():\n return 'hello world'\n" |
| 67 | |
| 68 | |
def _payload(**overrides: JSONValue) -> JSONObject:
    """Build a create-mist request body; keyword arguments replace defaults.

    Defaults are a random ``sec_<hex>.py`` filename, the baseline snippet
    followed by a random hex suffix (so content differs on every call), and
    public visibility.
    """
    defaults: JSONObject = {
        "filename": f"sec_{secrets.token_hex(4)}.py",
        "content": _PY_CONTENT + secrets.token_hex(16),
        "visibility": "public",
    }
    return {**defaults, **overrides}
| 77 | |
| 78 | |
async def _create(client: AsyncClient, headers: StrDict, **overrides: JSONValue) -> JSONObject:
    """POST a mist built from the default payload plus *overrides*.

    Asserts the response is 201 (showing the response text on failure) and
    returns the decoded JSON body as a plain dict.
    """
    request_body = _payload(**overrides)
    response = await client.post("/api/mists", json=request_body, headers=headers)
    assert response.status_code == 201, response.text
    created: JSONObject = dict(response.json())
    return created
| 83 | |
| 84 | |
| 85 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 86 | # Filename attacks |
| 87 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 88 | |
class TestFilenameAttacks:
    """Malicious filenames sent to POST /api/mists must come back as 422."""

    @staticmethod
    async def _status_for(
        client: AsyncClient, headers: StrDict, filename: str
    ) -> int:
        """Submit a default payload carrying *filename*; return the HTTP status."""
        response = await client.post(
            "/api/mists",
            json=_payload(filename=filename),
            headers=headers,
        )
        return response.status_code

    @pytest.mark.anyio
    async def test_path_traversal_dotdot(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        status = await self._status_for(client, auth_headers, "../evil.py")
        assert status == 422

    @pytest.mark.anyio
    async def test_path_traversal_deep(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        status = await self._status_for(client, auth_headers, "../../etc/passwd")
        assert status == 422

    @pytest.mark.anyio
    async def test_null_byte(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        status = await self._status_for(client, auth_headers, "evil\x00.py")
        assert status == 422

    @pytest.mark.anyio
    async def test_forward_slash_separator(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        status = await self._status_for(client, auth_headers, "subdir/evil.py")
        assert status == 422

    @pytest.mark.anyio
    async def test_backslash_separator(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        status = await self._status_for(client, auth_headers, "subdir\\evil.py")
        assert status == 422

    @pytest.mark.anyio
    async def test_control_character_tab(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        status = await self._status_for(client, auth_headers, "evil\t.py")
        assert status == 422

    @pytest.mark.anyio
    async def test_control_character_newline(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        status = await self._status_for(client, auth_headers, "evil\n.py")
        assert status == 422

    @pytest.mark.anyio
    async def test_ansi_escape_sequence(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        status = await self._status_for(
            client, auth_headers, "\x1b[31mevil\x1b[0m.py"
        )
        assert status == 422

    @pytest.mark.anyio
    async def test_overlong_filename(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        # 256 characters of stem plus the ".py" extension.
        status = await self._status_for(client, auth_headers, f"{'a' * 256}.py")
        assert status == 422

    @pytest.mark.anyio
    async def test_empty_filename_rejected(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        status = await self._status_for(client, auth_headers, "")
        assert status == 422

    @pytest.mark.anyio
    async def test_valid_filename_accepted(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """Positive control: an ordinary safe filename passes the gate."""
        status = await self._status_for(client, auth_headers, "valid_name.py")
        assert status == 201
| 213 | |
| 214 | |
| 215 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 216 | # Content injection |
| 217 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 218 | |
class TestContentInjection:
    """Markup and script payloads round-trip unchanged; the server strips nothing."""

    @pytest.mark.anyio
    async def test_xss_script_tag_stored_verbatim(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        xss = '<script>alert("xss")</script>'
        # The random suffix keeps the content distinct between calls.
        request_body = _payload(content=xss + secrets.token_hex(16))
        created = await client.post(
            "/api/mists", json=request_body, headers=auth_headers
        )
        assert created.status_code == 201
        created_id = created.json()["mistId"]

        fetched = await client.get(f"/api/mists/{created_id}")
        assert fetched.status_code == 200
        stored = fetched.json()["content"]
        assert xss in stored, "XSS payload must be stored verbatim"

    @pytest.mark.anyio
    async def test_html_entity_stored_verbatim(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        payload = '<img src=x onerror=alert(1)> <not-encoded>'
        request_body = _payload(content=payload + secrets.token_hex(16))
        created = await client.post(
            "/api/mists", json=request_body, headers=auth_headers
        )
        assert created.status_code == 201
        created_id = created.json()["mistId"]

        fetched = await client.get(f"/api/mists/{created_id}")
        assert fetched.status_code == 200
        # Escaping belongs to the template layer, so the API returns content as-is.
        stored = fetched.json()["content"]
        assert payload in stored

    @pytest.mark.anyio
    async def test_unicode_content_roundtrips(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        unicode_content = f"# 日本語テスト\nprint('こんにちは世界')\n{secrets.token_hex(16)}"
        request_body = _payload(content=unicode_content)
        created = await client.post(
            "/api/mists", json=request_body, headers=auth_headers
        )
        assert created.status_code == 201
        created_id = created.json()["mistId"]

        fetched = await client.get(f"/api/mists/{created_id}")
        assert fetched.status_code == 200
        stored = fetched.json()["content"]
        assert unicode_content in stored
| 273 | |
| 274 | |
| 275 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 276 | # Access control |
| 277 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 278 | |
async def _seed_mist(
    db_session: AsyncSession,
    *,
    owner: str,
    label: str,
    filename: str = "f.py",
    visibility: str | None = None,
) -> str:
    """Create a mist for *owner* through the service layer and return its id.

    Going through the service layer (not the HTTP API) lets a test plant a
    mist owned by someone other than the authenticated caller, or plant one
    without sending any auth headers.

    Args:
        db_session: Session the repo and mist rows are written to; committed
            before returning.
        owner: Handle that owns both the backing repo and the mist.
        label: Prefix for the content; a random hex suffix is appended.
        filename: Filename stored on the mist.
        visibility: Passed to both the repo and the mist when given. When
            ``None`` it is omitted, so each callee applies its own default.

    Returns:
        The mist id computed from the generated content.
    """
    # Function-local imports, matching how the tests originally pulled these in.
    from muse.plugins.mist.plugin import compute_mist_id
    from musehub.services.musehub_mists import create_mist as _svc_create

    content = f"{label} {secrets.token_hex(16)}"
    mist_id = compute_mist_id(content.encode())
    optional: dict[str, str] = {} if visibility is None else {"visibility": visibility}

    repo = await _make_mist_repo(db_session, mist_id, owner, **optional)
    await _svc_create(
        db_session,
        mist_id=mist_id,
        filename=filename,
        content=content,
        owner=owner,
        repo_id=str(repo.repo_id),
        **optional,
    )
    await db_session.commit()
    return mist_id


class TestAccessControl:
    """Secret mists invisible to non-owners; non-owner mutations blocked."""

    @pytest.mark.anyio
    async def test_secret_mist_not_in_other_owners_list(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """A secret mist owned by ``_OTHER`` is absent from the list ``_OWNER`` fetches."""
        mid = await _seed_mist(
            db_session,
            owner=_OTHER,
            label="secret_list",
            filename="secret.py",
            visibility="secret",
        )

        # auth_headers authenticates as _OWNER, a legitimate non-owner here.
        r = await client.get(f"/api/{_OTHER}/mists", headers=auth_headers)
        assert r.status_code == 200
        ids = [m["mistId"] for m in r.json()["mists"]]
        assert mid not in ids, "Secret mist must not appear in non-owner's list view"

    @pytest.mark.anyio
    async def test_secret_mist_not_in_explore(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """A secret mist never shows up in the unauthenticated explore feed."""
        mist = await _create(client, auth_headers, visibility="secret")
        mist_id = mist["mistId"]

        r = await client.get("/api/mists/explore")
        assert r.status_code == 200
        ids = [m["mistId"] for m in r.json()["mists"]]
        assert mist_id not in ids, "Secret mist must not appear in explore feed"

    @pytest.mark.anyio
    async def test_secret_mist_detail_returns_403_for_non_owner(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """An authenticated non-owner cannot read a secret mist's detail.

        Despite the name, either 403 or 404 is accepted.
        """
        mid = await _seed_mist(
            db_session,
            owner=_OTHER,
            label="secret_detail",
            filename="secret.py",
            visibility="secret",
        )

        r = await client.get(f"/api/mists/{mid}", headers=auth_headers)
        assert r.status_code in (403, 404), (
            "Secret mist must not be accessible to non-owner (even authenticated)"
        )

    @pytest.mark.anyio
    async def test_non_owner_update_returns_404(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """PATCH by a non-owner is refused. Despite the name, 403 or 404 is accepted."""
        mid = await _seed_mist(db_session, owner=_OTHER, label="non_owner_upd")

        r = await client.patch(
            f"/api/mists/{mid}",
            json={"title": "Hijacked"},
            headers=auth_headers,  # _OWNER ≠ _OTHER
        )
        assert r.status_code in (403, 404)

    @pytest.mark.anyio
    async def test_non_owner_delete_returns_404(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """DELETE by a non-owner is refused. Despite the name, 403 or 404 is accepted."""
        mid = await _seed_mist(db_session, owner=_OTHER, label="non_owner_del")

        r = await client.delete(
            f"/api/mists/{mid}",
            headers=auth_headers,  # _OWNER ≠ _OTHER
        )
        assert r.status_code in (403, 404)

    @pytest.mark.anyio
    async def test_unauthenticated_create_returns_401(
        self, client: AsyncClient
    ) -> None:
        """POST without auth headers is rejected with 401."""
        r = await client.post("/api/mists", json=_payload())
        assert r.status_code == 401

    @pytest.mark.anyio
    async def test_unauthenticated_update_returns_401(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """PATCH without auth headers is rejected with 401."""
        # Seeded via the service layer so the auth_headers fixture is NOT active.
        mid = await _seed_mist(db_session, owner=_OWNER, label="unauth_upd")

        r = await client.patch(f"/api/mists/{mid}", json={"title": "x"})
        assert r.status_code == 401

    @pytest.mark.anyio
    async def test_unauthenticated_delete_returns_401(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """DELETE without auth headers is rejected with 401."""
        # Seeded via the service layer so the auth_headers fixture is NOT active.
        mid = await _seed_mist(db_session, owner=_OWNER, label="unauth_del")

        r = await client.delete(f"/api/mists/{mid}")
        assert r.status_code == 401

    @pytest.mark.anyio
    async def test_owner_can_see_own_secret_mist(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Positive control: the owner can still read their own secret mist."""
        mist = await _create(client, auth_headers, visibility="secret")
        mist_id = mist["mistId"]

        r = await client.get(f"/api/mists/{mist_id}", headers=auth_headers)
        assert r.status_code == 200
        assert r.json()["mistId"] == mist_id
| 463 | |
| 464 | |
| 465 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 466 | # Collision resistance |
| 467 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 468 | |
class TestCollisionResistance:
    """Identical content bytes → identical mist_id → POST returns 409."""

    @pytest.mark.anyio
    async def test_duplicate_content_returns_409(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Posting the same body twice yields 201 and then 409."""
        # Unique per run, so the first POST cannot collide with a mist that is
        # already stored, yet byte-identical across the two POSTs below.
        content = f"def idempotent(): return 42\n# {secrets.token_hex(16)}\n"
        body = _payload(content=content, filename="idempotent.py")

        r1 = await client.post("/api/mists", json=body, headers=auth_headers)
        assert r1.status_code == 201, r1.text

        r2 = await client.post("/api/mists", json=body, headers=auth_headers)
        assert r2.status_code == 409, (
            "Re-posting identical content must return 409 (content-addressed)"
        )

    @pytest.mark.anyio
    async def test_different_content_different_id(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Two mists with different content receive different ids."""
        r1 = await _create(client, auth_headers, content=f"content_a {secrets.token_hex(16)}")
        r2 = await _create(client, auth_headers, content=f"content_b {secrets.token_hex(16)}")
        assert r1["mistId"] != r2["mistId"]

    @pytest.mark.anyio
    async def test_mist_id_deterministic_from_content(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """mist_id is deterministic — recomputing it offline verifies integrity."""
        from muse.plugins.mist.plugin import compute_mist_id

        content = f"def check(): pass\n# unique: {secrets.token_hex(16)}"
        r = await _create(client, auth_headers, content=content)

        # The id returned by the API must equal the one computed locally from
        # the UTF-8 bytes of the submitted content.
        expected_id = compute_mist_id(content.encode("utf-8"))
        assert r["mistId"] == expected_id
| 517 | |
| 518 | |
| 519 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 520 | # Large content rejection |
| 521 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 522 | |
class TestLargeContentRejection:
    """Requests whose body exceeds 10 MiB must be rejected (413)."""

    @pytest.mark.anyio
    async def test_oversized_content_returns_413(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """An 11 MiB content field is rejected with 413."""
        # 11 MiB of ASCII content — well above the 10 MiB middleware cap.
        oversized = "x" * (11 * 1024 * 1024)
        body: JSONObject = {
            "filename": "large.py",
            "content": oversized,
            "visibility": "public",
        }
        r = await client.post("/api/mists", json=body, headers=auth_headers)
        assert r.status_code == 413, (
            "Content > 10 MiB must be rejected by ContentSizeLimitMiddleware"
        )

    @pytest.mark.anyio
    async def test_near_limit_content_accepted(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """A 512 KiB body, well within the 10 MiB cap, is accepted (sanity check)."""
        # A random prefix keeps the content distinct between runs, so this
        # POST cannot hit an identical mist that is already stored. The filler
        # is shortened so the total stays at exactly 512 KiB.
        size = 512 * 1024
        token = secrets.token_hex(16)
        content = token + "a" * (size - len(token))
        r = await client.post(
            "/api/mists",
            json=_payload(content=content),
            headers=auth_headers,
        )
        assert r.status_code == 201, r.text
| 560 | |
| 561 | |
| 562 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 563 | # Fork depth enforcement |
| 564 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 565 | |
class TestForkDepthEnforcement:
    """Forking is allowed down to depth 5; one level deeper must return 422."""

    @pytest.mark.anyio
    async def test_fork_chain_to_max_depth(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        # Start from a freshly created root and fork the newest mist each time.
        parent_id = (await _create(client, auth_headers))["mistId"]

        depth = 1
        while depth <= 5:
            fork_resp = await client.post(
                f"/api/mists/{parent_id}/fork", headers=auth_headers
            )
            assert fork_resp.status_code == 201, (
                f"Fork at depth {depth} must succeed; got {fork_resp.status_code}: {fork_resp.text}"
            )
            parent_id = fork_resp.json()["mistId"]
            depth += 1

    @pytest.mark.anyio
    async def test_fork_past_max_depth_returns_422(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        # Walk the chain down to depth 5 first.
        parent_id = (await _create(client, auth_headers))["mistId"]
        forks_made = 0
        while forks_made < 5:
            step = await client.post(
                f"/api/mists/{parent_id}/fork", headers=auth_headers
            )
            assert step.status_code == 201
            parent_id = step.json()["mistId"]
            forks_made += 1

        # The sixth fork starts from the depth-5 mist and must be refused.
        rejected = await client.post(
            f"/api/mists/{parent_id}/fork", headers=auth_headers
        )
        assert rejected.status_code == 422, (
            f"Fork past depth 5 must be rejected; got {rejected.status_code}: {rejected.text}"
        )

    @pytest.mark.anyio
    async def test_fork_nonexistent_mist_returns_404(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        missing = await client.post(
            "/api/mists/doesnotexist/fork", headers=auth_headers
        )
        assert missing.status_code == 404
| 626 | |
| 627 | |
| 628 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 629 | # Tag injection |
| 630 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 631 | |
class TestTagSecurity:
    """Tag lists over the count or length limit, or holding a null byte, get 422."""

    @pytest.mark.anyio
    async def test_too_many_tags_rejected(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        # Eleven tags: one more than the maximum of 10.
        request_body = _payload(tags=[f"tag{i}" for i in range(11)])
        response = await client.post(
            "/api/mists", json=request_body, headers=auth_headers
        )
        assert response.status_code == 422

    @pytest.mark.anyio
    async def test_overlong_tag_rejected(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        # A 65-character tag: one more than the maximum of 64.
        request_body = _payload(tags=["a" * 65])
        response = await client.post(
            "/api/mists", json=request_body, headers=auth_headers
        )
        assert response.status_code == 422

    @pytest.mark.anyio
    async def test_null_byte_in_tag_rejected(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        request_body = _payload(tags=["evil\x00tag"])
        response = await client.post(
            "/api/mists", json=request_body, headers=auth_headers
        )
        assert response.status_code == 422
File History
1 commit
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2
feat: add repair-commit wire endpoint (API parity with repa…
Opus 4.8
minor
⚠
1 day ago