"""Advanced mist-domain tests — state integrity, performance, and security gaps. Fills the gaps not covered by the eight TDD phase files or the existing test_mists / test_mist_routes / test_mist_security suites: State integrity - Indexer idempotency: re-indexing the same commit produces no duplicate history entries (ON CONFLICT DO NOTHING guarantees). - ``persist_intel_results`` upsert: a second call for the same (repo_id, intel_type) replaces the row, not appends. - Version monotonicity: each content update increments version; a metadata-only update (title) leaves version unchanged. - Counter independence: view_count and embed_count are per-mist and do not bleed across rows. - History accumulation: two distinct commits for the same repo produce additive history entries (not replaced). - Manifest with no extractable anchors (binary/markdown only) returns [] and writes no history entries. Performance - ``build_mist_anchor_index`` on a 5-function file: under 500 ms. - ``MistProvider.compute`` for a repo with a 5-function file: under 1 s. - ``list_mists`` service call across 100 rows: under 500 ms. - ``persist_intel_results`` for 50 result tuples: under 1 s. Security (additional scenarios not in test_mist_security.py) - Unauthenticated fork attempt returns 401. - Non-owner fork of a *secret* mist returns 403 or 404. - Non-owner fork of a *public* mist succeeds (fork is public by design). - Corrupted / garbage cursor in list query is silently ignored (no 500). - ``validate_mist_manifest`` with an empty manifest is always valid. - ``validate_mist_manifest`` accumulates errors across multiple bad files. 
""" from __future__ import annotations import secrets import time from datetime import datetime, timezone import msgpack import pytest from httpx import AsyncClient from muse.core.types import blob_id from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from musehub.core.genesis import compute_identity_id, compute_repo_id from musehub.db.musehub_intel_models import MusehubIntelResult, MusehubSymbolHistoryEntry, MusehubSymbolIntel from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubObject, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef from musehub.types.json_types import JSONObject, JSONValue, StrDict # --------------------------------------------------------------------------- # Seed helpers # --------------------------------------------------------------------------- def _now() -> datetime: return datetime.now(tz=timezone.utc) def _oid(raw: bytes) -> str: return blob_id(raw) def _commit_id() -> str: return blob_id(secrets.token_bytes(16)) def _snap_id(manifest: StrDict) -> str: return blob_id(msgpack.packb(sorted(manifest.items()), use_bin_type=True)) def _manifest_blob(manifest: StrDict) -> bytes: return msgpack.packb(manifest, use_bin_type=True) async def _seed_repo( session: AsyncSession, owner: str, artifacts: dict[str, bytes], *, visibility: str = "public", ) -> tuple[MusehubRepo, MusehubCommit]: owner_id = compute_identity_id(owner.encode()) slug = f"adv-{secrets.token_hex(4)}" created_at = _now() repo_id = compute_repo_id(owner_id, slug, "mist", created_at.isoformat()) repo = MusehubRepo( repo_id=repo_id, name=slug, owner=owner, slug=slug, visibility=visibility, owner_user_id=owner_id, domain_id="mist", description="advanced test repo", tags=[], created_at=created_at, ) session.add(repo) await session.flush() manifest: dict[str, str] = {} for filename, raw in artifacts.items(): oid = _oid(raw) manifest[filename] = oid if await session.get(MusehubObject, oid) is None: session.add(MusehubObject( 
object_id=oid, path=filename, size_bytes=len(raw), content_cache=raw, )) await session.flush() snap_id = _snap_id(manifest) if await session.get(MusehubSnapshot, snap_id) is None: session.add(MusehubSnapshot( snapshot_id=snap_id, entry_count=len(manifest), manifest_blob=_manifest_blob(manifest), )) session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=snap_id)) await session.flush() commit_id = _commit_id() commit = MusehubCommit( commit_id=commit_id, message="advanced: initial", author=owner, branch="main", parent_ids=[], snapshot_id=snap_id, timestamp=_now(), ) session.add(commit) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id)) await session.flush() return repo, commit _FIVE_FN_PY = b"def a(): pass\ndef b(): pass\ndef c(): pass\ndef d(): pass\ndef e(): pass\n" _OWNER = "testuser" def _mist_payload(**overrides: JSONValue) -> JSONObject: base: JSONObject = { "filename": f"adv_{secrets.token_hex(4)}.py", "content": f"def fn(): pass\n# {secrets.token_hex(16)}", "visibility": "public", } base.update(overrides) return base async def _create(client: AsyncClient, headers: StrDict, **overrides: JSONValue) -> JSONObject: r = await client.post("/api/mists", json=_mist_payload(**overrides), headers=headers) assert r.status_code == 201, r.text return dict(r.json()) # ═══════════════════════════════════════════════════════════════════════════ # State integrity — indexer idempotency # ═══════════════════════════════════════════════════════════════════════════ class TestIndexerIdempotency: """Re-indexing the same commit must not create duplicate DB rows.""" @pytest.mark.asyncio async def test_reindex_same_commit_no_duplicate_history_entries( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_mist_indexer import build_mist_anchor_index owner = f"idem_{secrets.token_hex(4)}" repo, commit = await _seed_repo(db_session, owner, {"utils.py": _FIVE_FN_PY}) await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id) 
count_after_first = (await db_session.execute( select(func.count()).where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id ) )).scalar_one() # Second call — ON CONFLICT DO NOTHING must prevent duplicates. await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id) count_after_second = (await db_session.execute( select(func.count()).where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id ) )).scalar_one() assert count_after_first == count_after_second, ( "Re-indexing the same commit must not produce duplicate history entries; " f"got {count_after_first} then {count_after_second}" ) @pytest.mark.asyncio async def test_reindex_same_commit_no_duplicate_intel_rows( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_mist_indexer import build_mist_anchor_index owner = f"idem2_{secrets.token_hex(4)}" repo, commit = await _seed_repo(db_session, owner, {"mod.py": _FIVE_FN_PY}) await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id) await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id) intel_count = (await db_session.execute( select(func.count()).where( MusehubSymbolIntel.repo_id == repo.repo_id ) )).scalar_one() # 5 functions → 5 addresses; each should appear exactly once. assert intel_count == 5, ( f"Expected 5 unique symbol intel rows; got {intel_count}" ) @pytest.mark.asyncio async def test_empty_manifest_returns_empty_list( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_mist_indexer import build_mist_anchor_index owner = f"empty_{secrets.token_hex(4)}" # Seed a repo whose commit has an empty snapshot. 
owner_id = compute_identity_id(owner.encode()) slug = f"empty-{secrets.token_hex(3)}" created_at = _now() repo_id = compute_repo_id(owner_id, slug, "mist", created_at.isoformat()) repo = MusehubRepo( repo_id=repo_id, name=slug, owner=owner, slug=slug, visibility="public", owner_user_id=owner_id, domain_id="mist", tags=[], created_at=created_at, ) db_session.add(repo) await db_session.flush() empty_manifest: dict[str, str] = {} snap_id = _snap_id(empty_manifest) db_session.add(MusehubSnapshot( snapshot_id=snap_id, entry_count=0, manifest_blob=_manifest_blob(empty_manifest), )) db_session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=snap_id)) await db_session.flush() _cid = _commit_id() commit = MusehubCommit( commit_id=_cid, message="empty", author=owner, branch="main", parent_ids=[], snapshot_id=snap_id, timestamp=_now(), ) db_session.add(commit) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id=_cid)) await db_session.flush() result = await build_mist_anchor_index( db_session, repo_id, commit.commit_id ) assert result == [], "Empty snapshot manifest must return []" @pytest.mark.asyncio async def test_no_anchor_artifacts_returns_empty_list( self, db_session: AsyncSession ) -> None: """Manifest containing only files that yield no anchors returns [].""" from musehub.services.musehub_mist_indexer import build_mist_anchor_index owner = f"noanchor_{secrets.token_hex(4)}" # JSON and YAML yield no symbol anchors. 
repo, commit = await _seed_repo( db_session, owner, { "config.yaml": b"key: value\nother: 123\n", "schema.json": b'{"type": "object"}', } ) result = await build_mist_anchor_index( db_session, repo.repo_id, commit.commit_id ) assert result == [], ( "Manifest with only non-code artifacts (JSON/YAML) must return []" ) @pytest.mark.asyncio async def test_history_entries_accumulate_across_commits( self, db_session: AsyncSession ) -> None: """A second commit with new anchors adds to history — does not replace.""" from musehub.services.musehub_mist_indexer import build_mist_anchor_index owner = f"accum_{secrets.token_hex(4)}" repo, commit1 = await _seed_repo( db_session, owner, {"v1.py": b"def first(): pass\n"} ) await build_mist_anchor_index(db_session, repo.repo_id, commit1.commit_id) count_after_first = (await db_session.execute( select(func.count()).where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id ) )).scalar_one() # Second commit with a different file. raw2 = b"def second(): pass\ndef third(): pass\n" oid2 = _oid(raw2) if await db_session.get(MusehubObject, oid2) is None: db_session.add(MusehubObject( object_id=oid2, path="v2.py", size_bytes=len(raw2), content_cache=raw2, )) await db_session.flush() manifest2 = {"v2.py": oid2} snap2_id = _snap_id(manifest2) if await db_session.get(MusehubSnapshot, snap2_id) is None: db_session.add(MusehubSnapshot( snapshot_id=snap2_id, entry_count=1, manifest_blob=_manifest_blob(manifest2), )) db_session.add(MusehubSnapshotRef(repo_id=repo.repo_id, snapshot_id=snap2_id)) await db_session.flush() _cid2 = _commit_id() commit2 = MusehubCommit( commit_id=_cid2, message="second commit", author=owner, branch="main", parent_ids=[commit1.commit_id], snapshot_id=snap2_id, timestamp=_now(), ) db_session.add(commit2) db_session.add(MusehubCommitRef(repo_id=repo.repo_id, commit_id=_cid2)) await db_session.flush() await build_mist_anchor_index(db_session, repo.repo_id, commit2.commit_id) count_after_second = (await db_session.execute( 
select(func.count()).where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id ) )).scalar_one() assert count_after_second > count_after_first, ( "A second commit with new anchors must add history entries; " f"count stayed at {count_after_second}" ) # ═══════════════════════════════════════════════════════════════════════════ # State integrity — intel results upsert # ═══════════════════════════════════════════════════════════════════════════ class TestIntelResultsUpsert: """persist_intel_results must overwrite, not duplicate, on repeated calls.""" @pytest.mark.asyncio async def test_second_persist_call_overwrites_not_duplicates( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_intel_providers import persist_intel_results owner = f"upsert_{secrets.token_hex(4)}" repo, commit = await _seed_repo(db_session, owner, {"x.py": _FIVE_FN_PY}) results1 = [("mist.anchor_index", {"anchor_count": 3, "filename_count": 1})] await persist_intel_results(db_session, repo.repo_id, commit.commit_id, results1) await db_session.flush() results2 = [("mist.anchor_index", {"anchor_count": 5, "filename_count": 1})] await persist_intel_results(db_session, repo.repo_id, commit.commit_id, results2) await db_session.flush() rows = (await db_session.execute( select(MusehubIntelResult).where( MusehubIntelResult.repo_id == repo.repo_id, MusehubIntelResult.intel_type == "mist.anchor_index", ) )).scalars().all() assert len(rows) == 1, ( f"Expected exactly 1 intel result row after two upserts; got {len(rows)}" ) import json data = json.loads(rows[0].data_json) assert data["anchor_count"] == 5, ( "Second persist call must overwrite the first; expected anchor_count=5" ) # ═══════════════════════════════════════════════════════════════════════════ # State integrity — CRUD version monotonicity and counter isolation # ═══════════════════════════════════════════════════════════════════════════ class TestCRUDStateIntegrity: """Version, view_count, and embed_count integrity across 
mutations.""" @pytest.mark.asyncio async def test_version_increments_on_each_content_update( self, client: AsyncClient, auth_headers: StrDict ) -> None: mist = await _create(client, auth_headers) mist_id = mist["mistId"] assert mist["version"] == 1 for expected in range(2, 5): r = await client.patch( f"/api/mists/{mist_id}", json={"content": f"def fn(): return {expected}\n# {secrets.token_hex(16)}"}, headers=auth_headers, ) assert r.status_code == 200 assert r.json()["version"] == expected, ( f"Expected version={expected} after update #{expected - 1}; " f"got {r.json()['version']}" ) @pytest.mark.asyncio async def test_metadata_only_update_does_not_increment_version( self, client: AsyncClient, auth_headers: StrDict ) -> None: mist = await _create(client, auth_headers) mist_id = mist["mistId"] initial_version = mist["version"] r = await client.patch( f"/api/mists/{mist_id}", json={"title": "New title", "description": "New description"}, headers=auth_headers, ) assert r.status_code == 200 assert r.json()["version"] == initial_version, ( "Metadata-only update must not increment version" ) @pytest.mark.asyncio async def test_view_count_per_mist_independent( self, client: AsyncClient, auth_headers: StrDict ) -> None: a = await _create(client, auth_headers) b = await _create(client, auth_headers) # Hit mist A three times, mist B once. for _ in range(3): await client.get(f"/api/mists/{a['mistId']}") await client.get(f"/api/mists/{b['mistId']}") ra = (await client.get(f"/api/mists/{a['mistId']}")).json() rb = (await client.get(f"/api/mists/{b['mistId']}")).json() assert ra["viewCount"] >= 3 assert ra["viewCount"] != rb["viewCount"], ( "view_count must be independent per mist" ) @pytest.mark.asyncio async def test_embed_count_per_mist_independent( self, client: AsyncClient, auth_headers: StrDict ) -> None: a = await _create(client, auth_headers) b = await _create(client, auth_headers) # Embed mist A twice, leave B at zero. 
for _ in range(2): await client.get(f"/api/{_OWNER}/mists/{a['mistId']}/embed") ra = (await client.get(f"/api/mists/{a['mistId']}")).json() rb = (await client.get(f"/api/mists/{b['mistId']}")).json() assert ra["embedCount"] >= 2 assert rb["embedCount"] == 0 or ra["embedCount"] != rb["embedCount"], ( "embed_count must be independent per mist" ) @pytest.mark.asyncio async def test_deleted_mist_absent_from_list( self, client: AsyncClient, auth_headers: StrDict ) -> None: mist = await _create(client, auth_headers) mist_id = mist["mistId"] r_del = await client.delete(f"/api/mists/{mist_id}", headers=auth_headers) assert r_del.status_code == 204 r_list = await client.get(f"/api/{_OWNER}/mists") assert r_list.status_code == 200 ids = [m["mistId"] for m in r_list.json()["mists"]] assert mist_id not in ids, "Deleted mist must not appear in owner list" @pytest.mark.asyncio async def test_fork_count_matches_number_of_direct_forks( self, client: AsyncClient, auth_headers: StrDict ) -> None: root = await _create(client, auth_headers) root_id = root["mistId"] for _ in range(4): r = await client.post(f"/api/mists/{root_id}/fork", headers=auth_headers) assert r.status_code == 201 r_root = await client.get(f"/api/mists/{root_id}") assert r_root.json()["forkCount"] == 4 # ═══════════════════════════════════════════════════════════════════════════ # Performance # ═══════════════════════════════════════════════════════════════════════════ class TestPerformance: """Latency assertions for the indexer, provider, and service layer.""" @pytest.mark.asyncio async def test_build_mist_anchor_index_under_500ms( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_mist_indexer import build_mist_anchor_index owner = f"perf1_{secrets.token_hex(4)}" repo, commit = await _seed_repo(db_session, owner, {"perf.py": _FIVE_FN_PY}) start = time.monotonic() await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id) elapsed = time.monotonic() - start assert elapsed < 0.5, 
( f"build_mist_anchor_index took {elapsed:.3f}s — expected < 500 ms" ) @pytest.mark.asyncio async def test_mist_provider_compute_under_1s( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_intel_providers import MistProvider owner = f"perf2_{secrets.token_hex(4)}" repo, commit = await _seed_repo(db_session, owner, {"perf.py": _FIVE_FN_PY}) provider = MistProvider() start = time.monotonic() await provider.compute(db_session, repo.repo_id, commit.commit_id, {}) elapsed = time.monotonic() - start assert elapsed < 1.0, ( f"MistProvider.compute took {elapsed:.3f}s — expected < 1 s" ) @pytest.mark.asyncio async def test_list_mists_100_rows_under_500ms( self, db_session: AsyncSession ) -> None: from muse.plugins.mist.plugin import compute_mist_id from musehub.services.musehub_mists import create_mist as _svc_create, list_mists perf_owner = f"listperf_{secrets.token_hex(4)}" owner_id = compute_identity_id(perf_owner.encode()) unique_type = f"lp_{secrets.token_hex(4)}" for i in range(100): content = f"# list perf {i} {secrets.token_hex(16)}" mid = compute_mist_id(content.encode()) slug = f"lp_{mid}" created_at = _now() repo = MusehubRepo( repo_id=compute_repo_id(owner_id, slug, "mist", created_at.isoformat()), name=slug, owner=perf_owner, slug=slug, visibility="public", owner_user_id=owner_id, created_at=created_at, updated_at=created_at, ) db_session.add(repo) await db_session.flush() await _svc_create( db_session, mist_id=mid, filename=f"lp_{i}.py", content=content, owner=perf_owner, repo_id=str(repo.repo_id), artifact_type=unique_type, ) await db_session.commit() start = time.monotonic() result = await list_mists( db_session, owner=perf_owner, limit=100, ) elapsed = time.monotonic() - start assert elapsed < 0.5, ( f"list_mists(100 rows) took {elapsed:.3f}s — expected < 500 ms" ) assert result.total >= 100 @pytest.mark.asyncio async def test_persist_intel_results_50_tuples_under_1s( self, db_session: AsyncSession ) -> None: from 
musehub.services.musehub_intel_providers import persist_intel_results owner = f"perf3_{secrets.token_hex(4)}" repo, commit = await _seed_repo(db_session, owner, {"x.py": _FIVE_FN_PY}) results = [ (f"mist.perf_type_{i}", {"value": i, "anchor_count": i}) for i in range(50) ] start = time.monotonic() await persist_intel_results(db_session, repo.repo_id, commit.commit_id, results) await db_session.flush() elapsed = time.monotonic() - start assert elapsed < 1.0, ( f"persist_intel_results(50 tuples) took {elapsed:.3f}s — expected < 1 s" ) # ═══════════════════════════════════════════════════════════════════════════ # Security — additional scenarios # ═══════════════════════════════════════════════════════════════════════════ class TestAdditionalSecurity: """Scenarios not covered by test_mist_security.py.""" @pytest.mark.asyncio async def test_unauthenticated_fork_returns_401( self, client: AsyncClient, db_session: AsyncSession ) -> None: """Fork without auth headers must be rejected 401. Mist is created directly via service layer so the auth_headers fixture (which injects global dependency overrides) is NOT active. """ from muse.plugins.mist.plugin import compute_mist_id from musehub.services.musehub_mists import create_mist as _svc_create content = f"unauth_fork {secrets.token_hex(16)}" mid = compute_mist_id(content.encode()) owner_id = compute_identity_id(b"testuser") created_at = _now() repo_id = compute_repo_id(owner_id, mid, "mist", created_at.isoformat()) repo = MusehubRepo( repo_id=repo_id, name=mid, owner="testuser", slug=mid, visibility="public", owner_user_id=owner_id, created_at=created_at, updated_at=created_at, ) db_session.add(repo) await db_session.flush() await _svc_create( db_session, mist_id=mid, filename="f.py", content=content, owner="testuser", repo_id=str(repo_id), ) await db_session.commit() # No auth_headers fixture active → require_signed_request not overridden. 
r = await client.post(f"/api/mists/{mid}/fork") assert r.status_code == 401, ( f"Unauthenticated fork must return 401; got {r.status_code}" ) @pytest.mark.asyncio async def test_non_owner_fork_of_secret_mist_blocked( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Non-owner forking a secret mist must be blocked (403 or 404).""" from muse.plugins.mist.plugin import compute_mist_id from musehub.services.musehub_mists import create_mist as _svc_create content = f"secret_fork_test {secrets.token_hex(16)}" mid = compute_mist_id(content.encode()) other_owner_id = compute_identity_id(b"otheruser") created_at = _now() repo_id = compute_repo_id(other_owner_id, mid, "mist", created_at.isoformat()) repo = MusehubRepo( repo_id=repo_id, name=mid, owner="otheruser", slug=mid, visibility="secret", owner_user_id=other_owner_id, created_at=created_at, updated_at=created_at, ) db_session.add(repo) await db_session.flush() await _svc_create( db_session, mist_id=mid, filename="secret.py", content=content, owner="otheruser", repo_id=str(repo_id), visibility="secret", ) await db_session.commit() # auth_headers authenticates as "testuser" — not "otheruser". 
r = await client.post(f"/api/mists/{mid}/fork", headers=auth_headers) assert r.status_code in (403, 404), ( f"Non-owner fork of secret mist must be blocked; got {r.status_code}" ) @pytest.mark.asyncio async def test_non_owner_fork_of_public_mist_succeeds( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Any authenticated user may fork a public mist.""" from muse.plugins.mist.plugin import compute_mist_id from musehub.services.musehub_mists import create_mist as _svc_create content = f"public_fork_test {secrets.token_hex(16)}" mid = compute_mist_id(content.encode()) other_owner_id = compute_identity_id(b"publicowner") created_at = _now() repo_id = compute_repo_id(other_owner_id, mid, "mist", created_at.isoformat()) repo = MusehubRepo( repo_id=repo_id, name=mid, owner="publicowner", slug=mid, visibility="public", owner_user_id=other_owner_id, created_at=created_at, updated_at=created_at, ) db_session.add(repo) await db_session.flush() await _svc_create( db_session, mist_id=mid, filename="public.py", content=content, owner="publicowner", repo_id=str(repo_id), visibility="public", ) await db_session.commit() # auth_headers authenticates as "testuser" — not "publicowner". 
r = await client.post(f"/api/mists/{mid}/fork", headers=auth_headers) assert r.status_code == 201, ( f"Authenticated user must be able to fork a public mist; got {r.status_code}" ) @pytest.mark.asyncio async def test_garbage_cursor_in_list_does_not_crash( self, client: AsyncClient ) -> None: """A corrupted cursor value must be silently ignored (no 500).""" r = await client.get( "/api/mists/explore", params={"cursor": "not-a-valid-iso8601-cursor!!@@##"}, ) assert r.status_code == 200, ( f"Garbage cursor must not cause a 500; got {r.status_code}" ) @pytest.mark.asyncio async def test_empty_cursor_in_list_treated_as_first_page( self, client: AsyncClient ) -> None: r = await client.get("/api/mists/explore", params={"cursor": ""}) assert r.status_code == 200 def test_validate_mist_manifest_empty_manifest_is_valid(self) -> None: from musehub.services.musehub_mist_push_validator import validate_mist_manifest result = validate_mist_manifest({}) assert result.valid, "Empty manifest must be valid (nothing to reject)" assert result.errors == [] assert result.warnings == [] def test_validate_mist_manifest_accumulates_all_errors(self) -> None: from musehub.services.musehub_mist_push_validator import validate_mist_manifest result = validate_mist_manifest({ "../traversal.py": "sha256:aaa", "valid.py": "sha256:bbb", "subdir/nested.py": "sha256:ccc", "null\x00byte.py": "sha256:ddd", }) assert not result.valid, "Manifest with multiple bad filenames must be invalid" assert len(result.errors) >= 3, ( f"Expected at least 3 errors (one per bad filename); got {result.errors}" ) def test_validate_mist_manifest_warnings_do_not_block(self) -> None: from musehub.services.musehub_mist_push_validator import validate_mist_manifest result = validate_mist_manifest({ "data.unknown_ext": "sha256:abc", "noextension": "sha256:def", }) assert result.valid, "Unrecognised extensions are warnings, not errors" assert len(result.warnings) >= 1 # 
# ═══════════════════════════════════════════════════════════════════════════
# Docstrings — source coverage check
# ═══════════════════════════════════════════════════════════════════════════

class TestDocstrings:
    """Every public symbol in the mist stack has a docstring."""

    def test_mist_provider_class_has_docstring(self) -> None:
        """``MistProvider`` carries a non-empty class docstring."""
        from musehub.services.musehub_intel_providers import MistProvider

        class_doc = MistProvider.__doc__
        assert class_doc, "MistProvider must have a class docstring"

    def test_mist_provider_compute_has_no_stale_phase_labels(self) -> None:
        """The source of ``MistProvider.compute`` contains no old phase labels."""
        import inspect

        from musehub.services.musehub_intel_providers import MistProvider

        compute_source = inspect.getsource(MistProvider.compute)
        for label in ("Phase 1:", "Phase 3:"):
            assert label not in compute_source, f"Stale '{label}' label must be removed"

    def test_profile_snapshot_provider_docstring_says_six_domains(self) -> None:
        """``ProfileSnapshotProvider``'s docstring mentions '6-domain'."""
        from musehub.services.musehub_intel_providers import ProfileSnapshotProvider

        # A missing docstring is treated as empty so the assertion fails
        # with its own message rather than a TypeError.
        provider_doc = ProfileSnapshotProvider.__doc__ or ""
        assert "6-domain" in provider_doc, (
            "ProfileSnapshotProvider docstring must say '6-domain' (canvas was updated)"
        )

    def test_build_mist_anchor_index_has_docstring(self) -> None:
        """``build_mist_anchor_index`` has a non-empty docstring."""
        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        indexer_doc = build_mist_anchor_index.__doc__
        assert indexer_doc

    def test_history_weeks_constant_exported(self) -> None:
        """``_HISTORY_WEEKS`` is importable, is an int, and equals 12."""
        from musehub.services.musehub_mist_indexer import _HISTORY_WEEKS

        weeks = _HISTORY_WEEKS
        assert isinstance(weeks, int)
        assert weeks == 12

    def test_validate_mist_manifest_has_docstring(self) -> None:
        """``validate_mist_manifest`` has a non-empty docstring."""
        from musehub.services.musehub_mist_push_validator import validate_mist_manifest

        validator_doc = validate_mist_manifest.__doc__
        assert validator_doc

    def test_mist_validation_result_has_docstring(self) -> None:
        """``MistValidationResult`` has a non-empty docstring."""
        from musehub.services.musehub_mist_push_validator import MistValidationResult

        result_doc = MistValidationResult.__doc__
        assert result_doc

    def test_max_content_bytes_constant_documented(self) -> None:
        """The ``musehub.models.mists`` source mentions ``ContentSizeLimitMiddleware``."""
        import inspect

        import musehub.models.mists as mists_module

        module_source = inspect.getsource(mists_module)
        assert "ContentSizeLimitMiddleware" in module_source, (
            "_MAX_CONTENT_BYTES must document that enforcement is via middleware"
        )