"""Phase 8 TDD: End-to-end Mist domain smoke test.

Exercises the full path for a mist-domain repo in one integration test:

  seed repo + artifacts
    → job_types_for_push dispatches intel.mist
    → MistProvider.compute runs build_mist_anchor_index
    → symbol anchors persisted to musehub_symbol_history_entries + musehub_symbol_intel
    → persist_intel_results writes mist.anchor_index to musehub_intel_results
    → profile activity canvas includes a "mist" domain grid with total >= 1
    → GET /api/mists/explore returns 200
    → GET /api/{owner}/mists returns 200
    → GET /muse/mists returns 200
    → GET /api/openapi.json lists /api/mists paths

No mocks — all assertions run against the real PostgreSQL test DB and the
live FastAPI app instance (same fixtures as phases 1–7).
"""
from __future__ import annotations

import secrets
from datetime import datetime, timezone

import msgpack
import pytest
from httpx import AsyncClient
from muse.core.types import blob_id
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.core.genesis import compute_identity_id, compute_repo_id
from musehub.db.musehub_intel_models import MusehubIntelResult, MusehubSymbolHistoryEntry, MusehubSymbolIntel
from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubObject, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef
from musehub.types.json_types import StrDict


# ---------------------------------------------------------------------------
# Seed helper (inline — no cross-test import)
# ---------------------------------------------------------------------------

def _now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


def _oid(content: bytes) -> str:
    """Return the object id for *content*, as computed by ``blob_id``."""
    return blob_id(content)


def _manifest_blob(manifest: StrDict) -> bytes:
    """Serialize *manifest* with msgpack (``use_bin_type=True``)."""
    return msgpack.packb(manifest, use_bin_type=True)


def _commit_id() -> str:
    """Return a fresh commit id: ``blob_id`` of 16 random bytes."""
    return blob_id(secrets.token_bytes(16))


def _snap_id(manifest: StrDict) -> str:
    """Return a snapshot id derived from the manifest's entries.

    The items are sorted before packing, so the id does not depend on the
    insertion order of *manifest*.
    """
    return blob_id(msgpack.packb(sorted(manifest.items()), use_bin_type=True))


# NOTE(review): this file's original indentation was lost; the nesting below
# (one flush after the artifact loop, snapshot ref added unconditionally) is
# the most plausible reconstruction — confirm against version control.
async def _seed_mist_repo(
    session: AsyncSession,
    owner: str,
    artifacts: dict[str, bytes],
) -> tuple[MusehubRepo, MusehubCommit]:
    """Create a mist-domain repo with a commit pointing at a snapshot.

    Args:
        session: Async DB session; rows are added and flushed, and this
            helper never commits.
        owner: Owner name; also used as the commit author and, encoded, as
            the input to ``compute_identity_id``.
        artifacts: Mapping of filename to raw file bytes to store as objects.

    Returns:
        The ``(repo, commit)`` pair that was added to *session*.
    """
    owner_id = compute_identity_id(owner.encode())
    # Random slug suffix so every call creates a distinct repo.
    slug = f"smoke-{secrets.token_hex(4)}"
    created_at = _now()
    repo_id = compute_repo_id(owner_id, slug, "mist", created_at.isoformat())

    repo = MusehubRepo(
        repo_id=repo_id,
        name=slug,
        owner=owner,
        slug=slug,
        visibility="public",
        owner_user_id=owner_id,
        domain_id="mist",
        description="smoke-test mist repo",
        tags=[],
        created_at=created_at,
    )
    session.add(repo)
    await session.flush()

    # Build the filename → object-id manifest, storing each object only if a
    # row with that id is not already present.
    manifest: dict[str, str] = {}
    for filename, raw in artifacts.items():
        oid = _oid(raw)
        manifest[filename] = oid
        if await session.get(MusehubObject, oid) is None:
            session.add(MusehubObject(
                object_id=oid,
                path=filename,
                size_bytes=len(raw),
                content_cache=raw,
            ))
    await session.flush()

    # The snapshot row is keyed by an id derived from the manifest, so it is
    # only inserted when missing; the repo → snapshot ref is always added.
    snap_id = _snap_id(manifest)
    if await session.get(MusehubSnapshot, snap_id) is None:
        session.add(MusehubSnapshot(
            snapshot_id=snap_id,
            entry_count=len(manifest),
            manifest_blob=_manifest_blob(manifest),
        ))
    session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=snap_id))
    await session.flush()

    # Single root commit (no parents) on "main" pointing at the snapshot.
    cid = _commit_id()
    commit = MusehubCommit(
        commit_id=cid,
        message="smoke: initial mist",
        author=owner,
        branch="main",
        parent_ids=[],
        snapshot_id=snap_id,
        timestamp=_now(),
    )
    session.add(commit)
    session.add(MusehubCommitRef(repo_id=repo_id, commit_id=cid))
    await session.flush()
    return repo, commit


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------

# Default artifact set seeded by most tests: one Python file with two
# functions, one JSON schema, and one Markdown README.
_ARTIFACTS: dict[str, bytes] = {
    "utils.py": b"def helper_one(): pass\ndef helper_two(): pass\n",
    "schema.json": b'{"type": "object", "properties": {"id": {"type": "string"}}}',
    "README.md": b"# Smoke test mist\nContent-addressed artifact share.\n",
}
# ---------------------------------------------------------------------------
# Phase 8 — smoke test
# ---------------------------------------------------------------------------


class TestMistDomainEndToEnd:
    """End-to-end smoke tests for the mist domain.

    Each test seeds its own repo under a random owner name, so tests do not
    depend on one another. Service modules are imported inside the test
    bodies rather than at module level — presumably so that a missing module
    fails only the affected test instead of breaking collection (TODO confirm).
    """

    @pytest.mark.asyncio
    async def test_intel_mist_dispatched_for_mist_domain(self) -> None:
        """job_types_for_push('mist') must include 'intel.mist'."""
        from musehub.services.musehub_intel_providers import job_types_for_push

        types = job_types_for_push("mist")
        assert "intel.mist" in types, (
            f"'intel.mist' must be dispatched for mist repos; got {types}"
        )

    @pytest.mark.asyncio
    async def test_anchors_persisted_after_indexing(
        self, db_session: AsyncSession
    ) -> None:
        """After build_mist_anchor_index runs, symbol history entries must exist."""
        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        owner = f"smoke_{secrets.token_hex(4)}"
        repo, commit = await _seed_mist_repo(db_session, owner, _ARTIFACTS)

        # Only the persisted rows are asserted on, so the return value of the
        # indexer is intentionally discarded.
        await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id)

        history_count = (await db_session.execute(
            select(func.count()).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id
            )
        )).scalar_one()
        assert history_count >= 1, (
            f"Expected at least 1 symbol history entry after indexing; got {history_count}"
        )

    @pytest.mark.asyncio
    async def test_intel_results_written_by_mist_provider(
        self, db_session: AsyncSession
    ) -> None:
        """MistProvider.compute + persist_intel_results must write mist.anchor_index."""
        from musehub.services.musehub_intel_providers import MistProvider, persist_intel_results

        owner = f"smoke_{secrets.token_hex(4)}"
        repo, commit = await _seed_mist_repo(db_session, owner, _ARTIFACTS)

        provider = MistProvider()
        results = await provider.compute(
            db_session, repo.repo_id, commit.commit_id, {}
        )
        # Must return at least the anchor_index result.
        result_types = [r[0] for r in results]
        assert "mist.anchor_index" in result_types, (
            f"MistProvider.compute must return 'mist.anchor_index'; got {result_types}"
        )

        await persist_intel_results(
            db_session, repo.repo_id, commit.commit_id, results
        )
        await db_session.flush()

        row = (await db_session.execute(
            select(MusehubIntelResult).where(
                MusehubIntelResult.repo_id == repo.repo_id,
                MusehubIntelResult.intel_type == "mist.anchor_index",
            )
        )).scalar_one_or_none()
        assert row is not None, (
            "persist_intel_results must write a 'mist.anchor_index' row to musehub_intel_results"
        )

    @pytest.mark.asyncio
    async def test_symbol_intel_rows_written(
        self, db_session: AsyncSession
    ) -> None:
        """Symbol intel rows must be upserted for each anchor extracted."""
        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        owner = f"smoke_{secrets.token_hex(4)}"
        repo, commit = await _seed_mist_repo(db_session, owner, _ARTIFACTS)
        await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id)

        intel_count = (await db_session.execute(
            select(func.count()).where(
                MusehubSymbolIntel.repo_id == repo.repo_id
            )
        )).scalar_one()
        assert intel_count >= 1, (
            f"Expected at least 1 symbol intel row after indexing; got {intel_count}"
        )

    @pytest.mark.asyncio
    async def test_profile_canvas_has_mist_grid(
        self, db_session: AsyncSession
    ) -> None:
        """After seeding a mist repo with commits, profile canvas includes 'mist' domain."""
        from musehub.services.musehub_profile import build_activity_canvas

        owner = f"smoke_{secrets.token_hex(4)}"
        await _seed_mist_repo(db_session, owner, _ARTIFACTS)

        domains = await build_activity_canvas(db_session, owner)
        domain_names = [d.domain for d in domains]
        assert "mist" in domain_names, (
            f"Profile canvas must include 'mist' domain; got {domain_names}"
        )
        mist = next(d for d in domains if d.domain == "mist")
        # NOTE(review): this bound is deliberately loose and weaker than a
        # "total >= 1" expectation; tighten it once the canvas is confirmed to
        # count the seeded commit.
        assert mist.total >= 0  # zero is fine for snapshot-only push; non-crash matters

    @pytest.mark.asyncio
    async def test_push_validator_rejects_path_traversal(self) -> None:
        """validate_mist_manifest must reject path traversal filenames."""
        from musehub.services.musehub_mist_push_validator import validate_mist_manifest

        result = validate_mist_manifest({"../evil.py": "sha256:abc"})
        assert not result.valid, (
            "validate_mist_manifest accepted a '../' path-traversal filename"
        )
        assert len(result.errors) >= 1, (
            "validate_mist_manifest rejected the manifest but reported no errors"
        )

    @pytest.mark.asyncio
    async def test_explore_endpoint_returns_200(self, client: AsyncClient) -> None:
        """GET /api/mists/explore must return 200."""
        r = await client.get("/api/mists/explore")
        assert r.status_code == 200, (
            f"GET /api/mists/explore returned {r.status_code}"
        )

    @pytest.mark.asyncio
    async def test_owner_mists_endpoint_returns_200(
        self, client: AsyncClient
    ) -> None:
        """GET /api/{owner}/mists must return 200 (empty list for unknown owner is fine)."""
        r = await client.get("/api/gabriel/mists")
        assert r.status_code == 200, (
            f"GET /api/gabriel/mists returned {r.status_code}"
        )

    @pytest.mark.asyncio
    async def test_docs_mists_page_returns_200(self, client: AsyncClient) -> None:
        """GET /muse/mists must return 200 with HTML content."""
        r = await client.get("/muse/mists")
        assert r.status_code == 200, (
            f"GET /muse/mists returned {r.status_code}"
        )
        content_type = r.headers.get("content-type", "")
        assert "text/html" in content_type, (
            f"GET /muse/mists must serve HTML; got content-type {content_type!r}"
        )

    @pytest.mark.asyncio
    async def test_openapi_schema_lists_mists_paths(
        self, client: AsyncClient
    ) -> None:
        """GET /api/openapi.json must list /api/mists paths."""
        r = await client.get("/api/openapi.json")
        assert r.status_code == 200, (
            f"GET /api/openapi.json returned {r.status_code}"
        )
        paths = r.json().get("paths", {})
        mist_paths = [p for p in paths if "/mists" in p]
        assert len(mist_paths) > 0, (
            f"No /mists paths in OpenAPI schema; sample: {list(paths.keys())[:20]}"
        )

    @pytest.mark.asyncio
    async def test_full_pipeline_anchor_count_positive(
        self, db_session: AsyncSession
    ) -> None:
        """Full pipeline: index → intel → confirm anchor_count > 0 in result data."""
        import json

        from musehub.services.musehub_intel_providers import MistProvider, persist_intel_results

        owner = f"smoke_{secrets.token_hex(4)}"
        # utils.py has two functions → at least 2 anchors
        repo, commit = await _seed_mist_repo(
            db_session, owner, {"utils.py": b"def alpha(): pass\ndef beta(): pass\n"}
        )

        provider = MistProvider()
        results = await provider.compute(
            db_session, repo.repo_id, commit.commit_id, {}
        )
        await persist_intel_results(
            db_session, repo.repo_id, commit.commit_id, results
        )
        await db_session.flush()

        row = (await db_session.execute(
            select(MusehubIntelResult).where(
                MusehubIntelResult.repo_id == repo.repo_id,
                MusehubIntelResult.intel_type == "mist.anchor_index",
            )
        )).scalar_one_or_none()
        assert row is not None, (
            "persist_intel_results must write a 'mist.anchor_index' row to musehub_intel_results"
        )

        data = json.loads(row.data_json)
        assert data.get("anchor_count", 0) >= 2, (
            f"Expected anchor_count >= 2 for utils.py with 2 functions; got {data}"
        )