"""Phase 3 TDD: Mist snapshot indexer — symbol anchor extraction on push. Tests are written RED first. Run before touching musehub_mist_indexer.py and musehub_intel_providers.py to confirm they fail, then implement to green. The indexer reads a mist repo's HEAD commit snapshot manifest, loads each artifact's bytes from the object store, extracts symbol anchors, and writes normalized rows to: musehub_symbol_history_entries — one row per (repo_id, address, commit_id) musehub_symbol_intel — one row per (repo_id, address) This makes mist anchors discoverable via muse code grep / code impact across the entire hub, using the same infrastructure as code-domain symbols. Idempotency: indexing the same commit twice must produce the same row count. """ from __future__ import annotations import secrets from datetime import datetime, timezone import msgpack import pytest from muse.core.types import blob_id from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from musehub.core.genesis import compute_identity_id, compute_repo_id from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry, MusehubSymbolIntel from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubObject, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef from musehub.types.json_types import StrDict # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _now() -> datetime: return datetime.now(tz=timezone.utc) def _oid(content: bytes) -> str: return blob_id(content) def _manifest_blob(manifest: StrDict) -> bytes: return msgpack.packb(manifest, use_bin_type=True) def _commit_id() -> str: return blob_id(secrets.token_bytes(16)) def _snap_id(manifest: StrDict) -> str: return blob_id(msgpack.packb(sorted(manifest.items()), use_bin_type=True)) async def _seed_mist_vcs_repo( session: AsyncSession, *, owner: str = "testuser", artifacts: 
dict[str, bytes], # filename → raw content bytes ) -> tuple[MusehubRepo, MusehubCommit]: """Create a mist repo with a commit pointing at a snapshot of the given artifacts. Each artifact becomes a MusehubObject with content_cache populated so read_object_bytes() can serve it without hitting disk or S3. """ owner_id = compute_identity_id(owner.encode()) slug = f"mist-{secrets.token_hex(4)}" created_at = _now() repo_id = compute_repo_id(owner_id, slug, "mist", created_at.isoformat()) repo = MusehubRepo( repo_id=repo_id, name=slug, owner=owner, slug=slug, visibility="public", owner_user_id=owner_id, domain_id="mist", description="", tags=[], created_at=created_at, ) session.add(repo) await session.flush() # Create MusehubObject rows with content_cache for each artifact. manifest: dict[str, str] = {} for filename, raw in artifacts.items(): oid = _oid(raw) manifest[filename] = oid obj = MusehubObject( object_id=oid, path=filename, size_bytes=len(raw), content_cache=raw, ) # ON CONFLICT DO NOTHING — same bytes may appear in multiple artifacts. existing = await session.get(MusehubObject, oid) if existing is None: session.add(obj) await session.flush() # Create snapshot row. snap_id = _snap_id(manifest) existing_snap = await session.get(MusehubSnapshot, snap_id) if existing_snap is None: session.add(MusehubSnapshot( snapshot_id=snap_id, entry_count=len(manifest), manifest_blob=_manifest_blob(manifest), )) session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=snap_id)) await session.flush() # Create commit row pointing at the snapshot. cid = _commit_id() commit = MusehubCommit( commit_id=cid, message="initial mist", author=owner, branch="main", parent_ids=[], snapshot_id=snap_id, timestamp=_now(), ) session.add(commit) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=cid)) await session.commit() await session.refresh(repo) await session.refresh(commit) return repo, commit # --------------------------------------------------------------------------- # 1. 
# ---------------------------------------------------------------------------
# 1. build_mist_anchor_index exists and is importable
# ---------------------------------------------------------------------------


class TestBuildMistAnchorIndexExists:
    """The indexer entry point exists and has the expected async signature."""

    def test_function_is_importable(self) -> None:
        import inspect

        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        assert inspect.iscoroutinefunction(build_mist_anchor_index)

    def test_function_signature(self) -> None:
        import inspect

        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        params = inspect.signature(build_mist_anchor_index).parameters
        assert "repo_id" in params
        assert "head_commit_id" in params


# ---------------------------------------------------------------------------
# 2. Anchor extraction → musehub_symbol_history_entries
# ---------------------------------------------------------------------------


class TestMistAnchorIndexerHistoryEntries:
    """Indexing a commit writes symbol history entries for extracted anchors."""

    @pytest.mark.asyncio
    async def test_python_artifact_writes_history_entries(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        source = b"def add(a, b):\n return a + b\n\ndef sub(a, b):\n return a - b\n"
        repo, commit = await _seed_mist_vcs_repo(
            db_session,
            owner=test_user.handle,
            artifacts={"utils.py": source},
        )
        await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id)
        await db_session.commit()

        stmt = select(MusehubSymbolHistoryEntry).where(
            MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
        )
        entries = (await db_session.execute(stmt)).scalars().all()
        addresses = {entry.address for entry in entries}

        assert any("add" in a for a in addresses), f"Expected 'add' anchor; got {addresses}"
        assert any("sub" in a for a in addresses), f"Expected 'sub' anchor; got {addresses}"

    @pytest.mark.asyncio
    async def test_history_entry_fields(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        content = b"def process(x):\n return x\n"
        repo, commit = await _seed_mist_vcs_repo(
            db_session,
            owner=test_user.handle,
            artifacts={"module.py": content},
        )
        await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id)
        await db_session.commit()

        stmt = select(MusehubSymbolHistoryEntry).where(
            MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
            MusehubSymbolHistoryEntry.address.like("module.py::%"),
        )
        entry = (await db_session.execute(stmt)).scalars().first()

        assert entry is not None
        assert entry.commit_id == commit.commit_id
        assert entry.author == test_user.handle
        assert entry.op in ("add", "modify")
        assert entry.committed_at is not None

    @pytest.mark.asyncio
    async def test_multiple_artifacts_all_indexed(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        files = {
            "a.py": b"def alpha(): pass\n",
            "b.py": b"def beta(): pass\n",
        }
        repo, commit = await _seed_mist_vcs_repo(
            db_session,
            owner=test_user.handle,
            artifacts=files,
        )
        await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id)
        await db_session.commit()

        stmt = select(MusehubSymbolHistoryEntry).where(
            MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
        )
        entries = (await db_session.execute(stmt)).scalars().all()
        addresses = {entry.address for entry in entries}

        assert any("alpha" in a for a in addresses)
        assert any("beta" in a for a in addresses)

    @pytest.mark.asyncio
    async def test_binary_artifact_produces_no_history_entries(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        png_header = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00"
        repo, commit = await _seed_mist_vcs_repo(
            db_session,
            owner=test_user.handle,
            artifacts={"image.png": png_header},
        )
        await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id)
        await db_session.commit()

        count_stmt = (
            select(func.count())
            .select_from(MusehubSymbolHistoryEntry)
            .where(MusehubSymbolHistoryEntry.repo_id == repo.repo_id)
        )
        assert (await db_session.execute(count_stmt)).scalar_one() == 0
# ---------------------------------------------------------------------------
# 3. Anchor extraction → musehub_symbol_intel
# ---------------------------------------------------------------------------


class TestMistAnchorIndexerSymbolIntel:
    """Indexing a commit writes symbol intel rows (one per repo_id, address)."""

    @pytest.mark.asyncio
    async def test_python_artifact_writes_symbol_intel(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        repo, commit = await _seed_mist_vcs_repo(
            db_session,
            owner=test_user.handle,
            artifacts={"calc.py": b"def mul(a, b):\n return a * b\n"},
        )
        await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id)
        await db_session.commit()

        stmt = select(MusehubSymbolIntel).where(
            MusehubSymbolIntel.repo_id == repo.repo_id,
        )
        intel_rows = (await db_session.execute(stmt)).scalars().all()

        assert len(intel_rows) >= 1
        assert any("mul" in row.address for row in intel_rows)

    @pytest.mark.asyncio
    async def test_symbol_intel_churn_is_at_least_one(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        repo, commit = await _seed_mist_vcs_repo(
            db_session,
            owner=test_user.handle,
            artifacts={"api.py": b"async def fetch(url):\n pass\n"},
        )
        await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id)
        await db_session.commit()

        stmt = select(MusehubSymbolIntel).where(
            MusehubSymbolIntel.repo_id == repo.repo_id,
            MusehubSymbolIntel.address.like("api.py::%"),
        )
        intel = (await db_session.execute(stmt)).scalars().first()

        assert intel is not None
        assert intel.churn >= 1
# ---------------------------------------------------------------------------
# 4. Idempotency
# ---------------------------------------------------------------------------


class TestMistAnchorIndexerIdempotency:
    """Re-indexing the same commit must not add, drop, or duplicate rows."""

    @pytest.mark.asyncio
    async def test_indexing_same_commit_twice_is_idempotent(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        repo, commit = await _seed_mist_vcs_repo(
            db_session,
            owner=test_user.handle,
            artifacts={"ops.py": b"def create(): pass\ndef delete(): pass\n"},
        )

        async def _row_counts() -> tuple[int, int]:
            """Return ``(history_count, intel_count)`` for this test's repo."""
            history = (await db_session.execute(
                select(func.count()).select_from(MusehubSymbolHistoryEntry).where(
                    MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
                )
            )).scalar_one()
            intel = (await db_session.execute(
                select(func.count()).select_from(MusehubSymbolIntel).where(
                    MusehubSymbolIntel.repo_id == repo.repo_id,
                )
            )).scalar_one()
            return history, intel

        await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id)
        await db_session.commit()
        first_counts = await _row_counts()

        await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id)
        await db_session.commit()
        history_count, intel_count = await _row_counts()

        # Core contract: a second run over the same commit leaves both tables
        # exactly as the first run did. Comparing the two tables only against
        # each other would miss an indexer that duplicates rows in both.
        assert (history_count, intel_count) == first_counts, (
            f"Re-indexing changed row counts: {first_counts} -> "
            f"{(history_count, intel_count)}"
        )
        assert history_count == intel_count, (
            "Each anchor should produce exactly one history entry and one intel row"
        )
        # Guard against a vacuous pass where both runs write nothing.
        assert history_count >= 2, f"Expected ≥2 anchors for create+delete; got {history_count}"
# ---------------------------------------------------------------------------
# 5. Edge cases
# ---------------------------------------------------------------------------


class TestMistAnchorIndexerEdgeCases:
    """Degenerate inputs: no snapshot, missing objects, and the return shape."""

    @staticmethod
    async def _add_mist_repo(session: AsyncSession, owner: str, slug_prefix: str) -> str:
        """Add and flush a bare mist repo owned by *owner*; return its repo_id.

        The slug gets a random hex suffix (like the ``mist-<hex>`` slugs used by
        the seed helper). This avoids clashes with rows left by other runs or
        tests if slugs are uniquely constrained. The repo_id is returned rather
        than the ORM object so callers never read attributes from an instance
        that ``commit()`` may have expired.
        """
        owner_id = compute_identity_id(owner.encode())
        slug = f"{slug_prefix}-{secrets.token_hex(4)}"
        created_at = _now()
        repo_id = compute_repo_id(owner_id, slug, "mist", created_at.isoformat())
        session.add(MusehubRepo(
            repo_id=repo_id,
            name=slug,
            owner=owner,
            slug=slug,
            visibility="public",
            owner_user_id=owner_id,
            domain_id="mist",
            description="",
            tags=[],
            created_at=created_at,
        ))
        await session.flush()
        return repo_id

    @pytest.mark.asyncio
    async def test_commit_without_snapshot_returns_empty(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """A commit whose ``snapshot_id`` is None yields an empty result list."""
        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        repo_id = await self._add_mist_repo(db_session, test_user.handle, "no-snap")

        cid = _commit_id()
        db_session.add(MusehubCommit(
            commit_id=cid,
            message="empty",
            author=test_user.handle,
            branch="main",
            parent_ids=[],
            snapshot_id=None,
            timestamp=_now(),
        ))
        db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id=cid))
        await db_session.commit()

        result = await build_mist_anchor_index(db_session, repo_id, cid)
        assert result == []

    @pytest.mark.asyncio
    async def test_object_missing_from_store_is_skipped(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """Object_id in manifest but no MusehubObject row → skip gracefully."""
        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        repo_id = await self._add_mist_repo(db_session, test_user.handle, "ghost-obj")

        # The manifest references an object id that has no MusehubObject row.
        ghost_oid = blob_id(b"ghost content that has no DB row")
        manifest = {"ghost.py": ghost_oid}
        snap_id = _snap_id(manifest)
        # The snapshot id is deterministic for this fixed manifest, so reuse an
        # existing row instead of re-inserting the same primary key.
        if await db_session.get(MusehubSnapshot, snap_id) is None:
            db_session.add(MusehubSnapshot(
                snapshot_id=snap_id,
                entry_count=1,
                manifest_blob=_manifest_blob(manifest),
            ))
        db_session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=snap_id))
        await db_session.flush()

        cid = _commit_id()
        db_session.add(MusehubCommit(
            commit_id=cid,
            message="ghost",
            author=test_user.handle,
            branch="main",
            parent_ids=[],
            snapshot_id=snap_id,
            timestamp=_now(),
        ))
        db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id=cid))
        await db_session.commit()

        # Must not raise — silently skips the missing object.
        result = await build_mist_anchor_index(db_session, repo_id, cid)
        assert isinstance(result, list)

        # With no readable bytes there is nothing to extract anchors from.
        history_count = (await db_session.execute(
            select(func.count()).select_from(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo_id,
            )
        )).scalar_one()
        assert history_count == 0

    @pytest.mark.asyncio
    async def test_returns_intel_result_tuple(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """The indexer returns one ``("mist.anchor_index", data)`` pair."""
        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        repo, commit = await _seed_mist_vcs_repo(
            db_session,
            owner=test_user.handle,
            artifacts={"result.py": b"def answer(): return 42\n"},
        )
        result = await build_mist_anchor_index(db_session, repo.repo_id, commit.commit_id)

        assert len(result) == 1
        intel_type, data = result[0]
        assert intel_type == "mist.anchor_index"
        assert "anchor_count" in data
        assert data["anchor_count"] >= 1
# ---------------------------------------------------------------------------
# 6. MistProvider delegates to build_mist_anchor_index
# ---------------------------------------------------------------------------


class TestMistProviderDelegatesToIndexer:
    """The provider registered as ``intel.mist`` writes normalized anchor rows."""

    @pytest.mark.asyncio
    async def test_mist_provider_writes_normalized_rows(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """MistProvider.compute triggers the normalized indexer for VCS-backed mists."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        mist_provider = _PROVIDER_REGISTRY["intel.mist"]

        service_src = b"class Service:\n def run(self): pass\n"
        repo, commit = await _seed_mist_vcs_repo(
            db_session,
            owner=test_user.handle,
            artifacts={"svc.py": service_src},
        )
        await mist_provider.compute(db_session, repo.repo_id, commit.commit_id, {})
        await db_session.commit()

        stmt = select(MusehubSymbolHistoryEntry).where(
            MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
        )
        history = (await db_session.execute(stmt)).scalars().all()
        assert len(history) >= 1, "MistProvider must write normalized symbol history entries"