"""Section 18 — Mists Stress Suite (Tier 4).

Covers the three stress scenarios from the Phase 8 spec:

Concurrent creates
    Mists created concurrently via asyncio.gather — all must succeed with
    unique mist_ids; no DB-level serialisation errors or integrity
    violations.  Each test here issues ``_CONCURRENT_CREATES`` (15) requests.
    NOTE(review): this header previously quoted 50 concurrent creates —
    confirm the intended count against the spec.

Large explore
    200 mists seeded directly into the DB; the explore endpoint must return
    all of them across cursor-paginated requests with correct total, no
    duplicates, and no dropped rows.

Fork chains
    Deep fork chain built to the platform maximum (depth 5) via repeated
    POST /api/mists/{id}/fork calls; the chain must complete without error
    and the depth-5 node must carry fork_depth=5 and a populated
    fork_parent_id.

All tests target the HTTP layer (AsyncClient) to exercise the full stack:
auth, validation, service, and ORM.
"""
from __future__ import annotations

import asyncio
import secrets
import time
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from typing import Any

import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.core.genesis import compute_identity_id, compute_repo_id
from musehub.db.musehub_repo_models import MusehubRepo
from musehub.types.json_types import JSONObject, JSONValue, StrDict

_OWNER = "testuser"  # matches conftest._TEST_HANDLE

# Number of simultaneous POST /api/mists requests issued per concurrency test.
_CONCURRENT_CREATES = 15

# Depth of the fork chains built below (the platform maximum per the module
# docstring).
_MAX_FORK_DEPTH = 5


def _payload(**overrides: JSONValue) -> JSONObject:
    """Return a create-mist request body with random filename and content.

    Any keyword argument replaces (or adds) the corresponding key of the
    default body.
    """
    base: JSONObject = {
        "filename": f"stress_{secrets.token_hex(4)}.py",
        "content": f"# stress test\nvalue = {secrets.token_hex(16)!r}\n",
        "visibility": "public",
    }
    base.update(overrides)
    return base


async def _create(client: AsyncClient, headers: StrDict, **overrides: JSONValue) -> JSONObject:
    """POST a new mist, assert a 201 response, and return the JSON body."""
    r = await client.post("/api/mists", json=_payload(**overrides), headers=headers)
    assert r.status_code == 201, r.text
    return dict(r.json())


async def _iter_pages(
    client: AsyncClient, url: str, params: JSONObject
) -> AsyncIterator[dict[str, Any]]:
    """Yield each page body of a cursor-paginated GET endpoint.

    Follows ``nextCursor`` until the server stops returning one.  Every page
    must respond with 200.  A cursor that is handed out twice fails the test
    immediately: re-requesting it would return the same page again and the
    loop would never terminate.
    """
    seen_cursors: set[str] = set()
    cursor: str | None = None
    while True:
        query: JSONObject = dict(params)
        if cursor:
            query["cursor"] = cursor
        r = await client.get(url, params=query)
        assert r.status_code == 200, r.text
        body = r.json()
        yield body
        cursor = body.get("nextCursor")
        if not cursor:
            return
        assert cursor not in seen_cursors, (
            f"Pagination loop: cursor {cursor!r} returned twice by {url}"
        )
        seen_cursors.add(cursor)


async def _seed_mists(
    db_session: AsyncSession,
    *,
    count: int,
    owner: str,
    artifact_type: str,
    slug_prefix: str,
    name_prefix: str,
    filename_template: str,
    content_template: str,
) -> None:
    """Insert *count* public repos, each with one mist, then commit.

    ``filename_template`` and ``content_template`` are ``str.format``
    templates; both receive ``i`` (the row index) and ``token`` (a fresh
    random hex string, which keeps every content — and therefore every
    mist id — distinct).
    """
    # Imported lazily, as the original tests did.
    from muse.plugins.mist.plugin import compute_mist_id
    from musehub.services.musehub_mists import create_mist as _svc_create

    owner_id = compute_identity_id(owner.encode())
    for i in range(count):
        content = content_template.format(i=i, token=secrets.token_hex(16))
        mid = compute_mist_id(content.encode())
        slug = f"{slug_prefix}{mid}"
        created_at = datetime.now(tz=timezone.utc)
        repo = MusehubRepo(
            repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()),
            name=f"{name_prefix}{mid}",
            owner=owner,
            slug=slug,
            visibility="public",
            owner_user_id=owner_id,
            created_at=created_at,
            updated_at=created_at,
        )
        db_session.add(repo)
        # Flush before creating the mist so the repo row is written first.
        await db_session.flush()
        await _svc_create(
            db_session,
            mist_id=mid,
            filename=filename_template.format(i=i),
            content=content,
            owner=owner,
            repo_id=str(repo.repo_id),
            artifact_type=artifact_type,
        )
    await db_session.commit()


# ═══════════════════════════════════════════════════════════════════════════════
# Concurrent creates
# ═══════════════════════════════════════════════════════════════════════════════


class TestConcurrentCreates:
    """15 mists created concurrently — all must succeed, all IDs unique."""

    # NOTE(review): asyncio.gather presumably relies on the anyio backend
    # being asyncio — confirm against the conftest backend fixture.

    @pytest.mark.anyio
    async def test_concurrent_creates_all_succeed(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """Every concurrent create returns 201 and a distinct mistId."""
        n = _CONCURRENT_CREATES

        results = await asyncio.gather(
            *(_create(client, auth_headers) for _ in range(n))
        )

        assert len(results) == n, f"Expected {n} results, got {len(results)}"
        ids = [r["mistId"] for r in results]
        assert len(set(ids)) == n, (
            f"Expected {n} unique mist IDs, got {len(set(ids))} — "
            "duplicate content across concurrent requests"
        )

    @pytest.mark.anyio
    async def test_concurrent_creates_all_visible_in_list(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """Created mists must all appear in the owner list."""
        unique_tag = secrets.token_hex(6)
        n = _CONCURRENT_CREATES

        created = await asyncio.gather(
            *(_create(client, auth_headers, tags=[unique_tag]) for _ in range(n))
        )
        created_ids = {m["mistId"] for m in created}

        # Paginate the owner list and collect all IDs with our tag.
        found: set[str] = set()
        async for body in _iter_pages(client, f"/api/{_OWNER}/mists", {"limit": 100}):
            for m in body["mists"]:
                if unique_tag in (m.get("tags") or []):
                    found.add(m["mistId"])

        assert created_ids == found, (
            f"Created {len(created_ids)} mists but found {len(found)} with tag"
        )


# ═══════════════════════════════════════════════════════════════════════════════
# Large explore — 200 mist seed + full pagination
# ═══════════════════════════════════════════════════════════════════════════════


class TestLargeExplore:
    """Seed 200 mists directly into the DB then paginate explore to collect all."""

    @pytest.mark.anyio
    async def test_200_mist_explore_no_duplicates(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Paginating explore returns all 200 seeded mists exactly once."""
        # Use a unique artifact_type so this test's rows are isolated.
        unique_type = f"stress_{secrets.token_hex(4)}"
        n = 200

        await _seed_mists(
            db_session,
            count=n,
            owner="stressuser",
            artifact_type=unique_type,
            slug_prefix="mist_",
            name_prefix="repo_",
            filename_template="stress_{i:04d}.py",
            content_template="# stress explore {i}\nvalue = {token!r}\n",
        )

        # Paginate through all pages and collect IDs.
        collected: list[str] = []
        pages = 0
        async for body in _iter_pages(
            client,
            "/api/mists/explore",
            {"artifact_type": unique_type, "limit": 20},
        ):
            collected.extend(m["mistId"] for m in body["mists"])
            pages += 1

        assert len(collected) == n, (
            f"Expected {n} mists, collected {len(collected)} across {pages} page(s)"
        )
        assert len(set(collected)) == n, (
            f"Duplicate IDs in paginated explore output ({len(collected) - len(set(collected))} dups)"
        )

    @pytest.mark.anyio
    async def test_large_explore_total_count_accurate(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """The total field on the first page must reflect the full seed count."""
        unique_type = f"cnt_{secrets.token_hex(4)}"
        n = 50

        await _seed_mists(
            db_session,
            count=n,
            owner="countuser",
            artifact_type=unique_type,
            slug_prefix="rcm_",
            name_prefix="rc_",
            filename_template="count_{i}.py",
            content_template="# count check {i} {token}",
        )

        r = await client.get(
            "/api/mists/explore",
            params={"artifact_type": unique_type, "limit": 10},
        )
        assert r.status_code == 200
        body = r.json()
        assert body["total"] == n, (
            f"total={body['total']} but seeded {n} mists with type={unique_type!r}"
        )

    @pytest.mark.anyio
    async def test_large_explore_under_2s(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """First-page explore over 100 seeded mists must respond in under 2 seconds."""
        unique_type = f"perf_{secrets.token_hex(4)}"
        n = 100

        await _seed_mists(
            db_session,
            count=n,
            owner="perfuser",
            artifact_type=unique_type,
            slug_prefix="pm_",
            name_prefix="p_",
            filename_template="perf_{i}.py",
            content_template="# perf {i} {token}",
        )

        # Only the request itself is timed; seeding is excluded.
        start = time.monotonic()
        r = await client.get(
            "/api/mists/explore",
            params={"artifact_type": unique_type, "limit": 20},
        )
        elapsed = time.monotonic() - start

        assert r.status_code == 200
        assert elapsed < 2.0, f"Explore took {elapsed:.3f}s — expected < 2s"


# ═══════════════════════════════════════════════════════════════════════════════
# Fork chains
# ═══════════════════════════════════════════════════════════════════════════════


class TestForkChains:
    """Full-depth fork chain built via HTTP; structural invariants verified."""

    @pytest.mark.anyio
    async def test_fork_chain_depth_and_parent_ids(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """Each fork has the correct fork_parent_id and fork_depth."""
        root = await _create(client, auth_headers)
        root_id = root["mistId"]

        # chain[d] is the mist at fork depth d; chain[0] is the root.
        chain = [root_id]
        for depth in range(1, _MAX_FORK_DEPTH + 1):
            r = await client.post(
                f"/api/mists/{chain[-1]}/fork", headers=auth_headers
            )
            assert r.status_code == 201, (
                f"Fork at depth {depth} failed: {r.status_code} {r.text}"
            )
            fork_id = r.json()["mistId"]
            chain.append(fork_id)

        # Verify each fork's metadata.
        for depth in range(1, _MAX_FORK_DEPTH + 1):
            r = await client.get(f"/api/mists/{chain[depth]}")
            assert r.status_code == 200
            body = r.json()
            assert body["forkParentId"] == chain[depth - 1], (
                f"Depth {depth}: forkParentId={body['forkParentId']!r}, "
                f"expected {chain[depth - 1]!r}"
            )
            assert body["forkDepth"] == depth, (
                f"Depth {depth}: forkDepth={body['forkDepth']!r}, expected {depth}"
            )

    @pytest.mark.anyio
    async def test_fork_inherits_content_and_filename(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """A fork reports the same content and filename as its parent."""
        root = await _create(
            client,
            auth_headers,
            content=f"def root_fn(): return 'root'\n{secrets.token_hex(16)}",
            filename="root.py",
        )
        r = await client.post(
            f"/api/mists/{root['mistId']}/fork", headers=auth_headers
        )
        assert r.status_code == 201
        fork_id = r.json()["mistId"]

        r2 = await client.get(f"/api/mists/{fork_id}")
        assert r2.status_code == 200
        body = r2.json()
        assert body["content"] == root["content"]
        assert body["filename"] == root["filename"]

    @pytest.mark.anyio
    async def test_fork_count_increments_on_parent(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """Forking a mist three times raises its forkCount by three."""
        root = await _create(client, auth_headers)
        # The create response may omit forkCount; treat that as zero.
        initial_forks = root.get("forkCount", 0)

        for _ in range(3):
            r = await client.post(
                f"/api/mists/{root['mistId']}/fork", headers=auth_headers
            )
            assert r.status_code == 201

        r = await client.get(f"/api/mists/{root['mistId']}")
        assert r.status_code == 200
        assert r.json()["forkCount"] == initial_forks + 3

    @pytest.mark.anyio
    async def test_fork_chain_completes_under_5s(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """Building a full 5-fork chain must complete in under 5 seconds."""
        root = await _create(client, auth_headers)
        current_id = root["mistId"]

        # Root creation is excluded from the timing; only the forks count.
        start = time.monotonic()
        for _ in range(_MAX_FORK_DEPTH):
            r = await client.post(
                f"/api/mists/{current_id}/fork", headers=auth_headers
            )
            assert r.status_code == 201
            current_id = r.json()["mistId"]
        elapsed = time.monotonic() - start

        assert elapsed < 5.0, f"Fork chain took {elapsed:.3f}s — expected < 5s"