"""Phase 1 TDD: Mist domain intel pipeline — job dispatch + MistProvider.

Tests are written RED first.  Run them before touching
musehub_intel_providers.py to confirm they fail for the right reason, then
implement to make them green.

Coverage:
  1. job_types_for_push("mist") includes "intel.mist"
  2. MistProvider.compute extracts anchors and persists intel results
  3. MistProvider.compute handles binary / anchor-free artifacts gracefully
  4. Regression: code and midi dispatch are unaffected by the mist branch
  5. MistProvider is registered in _PROVIDER_REGISTRY under "intel.mist"
"""
from __future__ import annotations

import secrets
from datetime import datetime, timezone

import pytest
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.db.musehub_intel_models import MusehubIntelResult
from musehub.db.musehub_repo_models import MusehubMist, MusehubRepo
from musehub.core.genesis import compute_identity_id, compute_repo_id


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _now() -> datetime:
    """Return the current moment as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


def _uid() -> str:
    """Return 16 random bytes rendered as a hexadecimal string."""
    return secrets.token_hex(16)


def _repo_id(owner: str, slug: str) -> str:
    """Derive a "mist"-domain repo id for *owner*/*slug*, stamped with the current time."""
    identity = compute_identity_id(owner.encode())
    stamp = _now().isoformat()
    return compute_repo_id(identity, slug, "mist", stamp)


async def _seed_mist(
    session: AsyncSession,
    *,
    owner: str = "testuser",
    filename: str = "snippet.py",
    content: str = "def hello():\n return 'world'\n",
    artifact_type: str = "code",
    symbol_anchors: list[str] | None = None,
    mist_id: str | None = None,
) -> tuple[MusehubRepo, MusehubMist]:
    """Insert a MusehubRepo with domain_id='mist' and one MusehubMist row linked to it.

    The repo row is flushed before the mist row is added; both are then
    committed and refreshed.  A truthy ``mist_id`` is reused as the repo slug
    and as the mist id; otherwise a random value is generated for each.

    Returns:
        The refreshed ``(repo, mist)`` pair.
    """
    if mist_id:
        repo_slug = mist_id
    else:
        repo_slug = f"mist-{secrets.token_hex(4)}"
    identity = compute_identity_id(owner.encode())
    born = _now()
    repo_key = compute_repo_id(identity, repo_slug, "mist", born.isoformat())

    repo_row = MusehubRepo(
        repo_id=repo_key,
        name=repo_slug,
        owner=owner,
        slug=repo_slug,
        visibility="public",
        owner_user_id=identity,
        domain_id="mist",
        description="test mist repo",
        tags=[],
        created_at=born,
    )
    session.add(repo_row)
    # Flush the repo row before the mist row that references its repo_id is added.
    await session.flush()

    if mist_id:
        row_id = mist_id
    else:
        row_id = f"Abc{secrets.token_hex(5)[:9]}"
    detected_language = "python" if filename.endswith(".py") else ""
    encoded_size = len(content.encode())

    mist_row = MusehubMist(
        mist_id=row_id,
        repo_id=repo_key,
        owner=owner,
        filename=filename,
        content=content,
        artifact_type=artifact_type,
        language=detected_language,
        size_bytes=encoded_size,
        symbol_anchors=symbol_anchors or [],
    )
    session.add(mist_row)
    await session.commit()

    await session.refresh(repo_row)
    await session.refresh(mist_row)
    return repo_row, mist_row


# ---------------------------------------------------------------------------
# 1. job_types_for_push dispatch
# ---------------------------------------------------------------------------


class TestJobTypesForPush:
    """Dispatch checks for ``job_types_for_push`` across domains.

    Each test imports the function locally, so a missing symbol fails that
    individual test rather than module collection.
    """

    def test_mist_domain_dispatches_intel_mist(self) -> None:
        """The mist domain schedules the "intel.mist" job."""
        from musehub.services.musehub_intel_providers import job_types_for_push

        dispatched = job_types_for_push("mist")
        assert "intel.mist" in dispatched, (
            f"job_types_for_push('mist') must include 'intel.mist'; got {dispatched}"
        )

    def test_mist_domain_always_includes_structural(self) -> None:
        """The mist domain schedules "intel.structural"."""
        from musehub.services.musehub_intel_providers import job_types_for_push

        dispatched = job_types_for_push("mist")
        assert "intel.structural" in dispatched

    def test_mist_domain_always_includes_gc(self) -> None:
        """The mist domain schedules "gc"."""
        from musehub.services.musehub_intel_providers import job_types_for_push

        dispatched = job_types_for_push("mist")
        assert "gc" in dispatched

    def test_mist_domain_does_not_include_intel_code(self) -> None:
        """The mist domain does not schedule the code intel job."""
        from musehub.services.musehub_intel_providers import job_types_for_push

        dispatched = job_types_for_push("mist")
        reason = "mist domain must not trigger code intel job"
        assert "intel.code" not in dispatched, reason

    def test_code_domain_unaffected(self) -> None:
        """Regression: the code domain gets "intel.code" and no "intel.mist"."""
        from musehub.services.musehub_intel_providers import job_types_for_push

        dispatched = job_types_for_push("code")
        assert "intel.code" in dispatched
        assert "intel.mist" not in dispatched

    def test_midi_domain_unaffected(self) -> None:
        """Regression: the midi domain gets "intel.midi" and no "intel.mist"."""
        from musehub.services.musehub_intel_providers import job_types_for_push

        dispatched = job_types_for_push("midi")
        assert "intel.midi" in dispatched
        assert "intel.mist" not in dispatched

    def test_none_domain_defaults_to_code(self) -> None:
        """A ``None`` domain yields "intel.code" and no "intel.mist"."""
        from musehub.services.musehub_intel_providers import job_types_for_push

        dispatched = job_types_for_push(None)
        assert "intel.code" in dispatched
        assert "intel.mist" not in dispatched


# ---------------------------------------------------------------------------
# 2. _PROVIDER_REGISTRY contains "intel.mist"
# ---------------------------------------------------------------------------


class TestProviderRegistry:
    """Registration checks for the "intel.mist" entry of ``_PROVIDER_REGISTRY``."""

    def test_intel_mist_is_registered(self) -> None:
        """The registry has an "intel.mist" key."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        reason = "'intel.mist' must be in _PROVIDER_REGISTRY"
        assert "intel.mist" in _PROVIDER_REGISTRY, reason

    def test_intel_mist_satisfies_protocol(self) -> None:
        """The registered object passes an ``isinstance`` check against ``IntelProvider``."""
        from musehub.services.musehub_intel_providers import (
            _PROVIDER_REGISTRY,
            IntelProvider,
        )

        registered = _PROVIDER_REGISTRY["intel.mist"]
        reason = "MistProvider must satisfy the IntelProvider protocol"
        assert isinstance(registered, IntelProvider), reason


# ---------------------------------------------------------------------------
# 3.
# MistProvider.compute — anchor extraction
# ---------------------------------------------------------------------------


class TestMistProviderCompute:
    """End-to-end checks for the provider registered under "intel.mist".

    Every test looks the provider up in ``_PROVIDER_REGISTRY``, seeds rows
    through ``db_session``, and calls ``compute(session, repo_id, "HEAD", {})``.

    NOTE(review): the ``test_user`` annotations name ``db.MusehubIdentity``, but
    no ``db`` import is visible in this file.  Presumably this is harmless only
    because the module's ``from __future__ import annotations`` leaves
    annotations unevaluated — confirm the intended import.
    """

    @pytest.mark.asyncio
    async def test_extracts_anchors_for_python_artifact(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """Provider returns mist.anchors result with correct symbol addresses."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        mist_provider = _PROVIDER_REGISTRY["intel.mist"]
        source_text = (
            "def add(a, b):\n return a + b\n\ndef subtract(a, b):\n return a - b\n"
        )
        repo_row, mist_row = await _seed_mist(
            db_session,
            owner=test_user.handle,
            filename="utils.py",
            content=source_text,
            artifact_type="code",
        )

        outcome = await mist_provider.compute(
            db_session, repo_row.repo_id, "HEAD", {}
        )

        assert len(outcome) == 1
        kind, payload = outcome[0]
        assert kind == "mist.anchors"
        assert payload["mist_id"] == mist_row.mist_id
        assert payload["filename"] == "utils.py"
        assert payload["artifact_type"] == "code"

        anchors: list[str] = payload["symbol_anchors"]
        # Both top-level functions must show up somewhere in the anchors.
        for symbol in ("add", "subtract"):
            assert any(symbol in anchor for anchor in anchors), (
                f"Expected '{symbol}' anchor; got {anchors}"
            )
        assert payload["anchor_count"] == len(anchors)

    @pytest.mark.asyncio
    async def test_anchor_count_matches_symbol_anchors_length(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """``anchor_count`` equals the length of ``symbol_anchors`` in the first result."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        mist_provider = _PROVIDER_REGISTRY["intel.mist"]
        class_source = (
            "class Calc:\n"
            " def mul(self, a, b): return a * b\n"
            " def div(self, a, b): return a / b\n"
        )
        repo_row, _mist_row = await _seed_mist(
            db_session,
            owner=test_user.handle,
            filename="calc.py",
            content=class_source,
        )

        outcome = await mist_provider.compute(
            db_session, repo_row.repo_id, "HEAD", {}
        )

        _kind, payload = outcome[0]
        assert payload["anchor_count"] == len(payload["symbol_anchors"])

    @pytest.mark.asyncio
    async def test_binary_artifact_produces_zero_anchors(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """Binary content (e.g. base64) with no parsable symbols → zero anchors, no crash."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        mist_provider = _PROVIDER_REGISTRY["intel.mist"]
        # Base64-encoded PNG header — not parseable as Python/JS/TS
        png_blob = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk"
        repo_row, _mist_row = await _seed_mist(
            db_session,
            owner=test_user.handle,
            filename="image.png",
            content=png_blob,
            artifact_type="image",
        )

        outcome = await mist_provider.compute(
            db_session, repo_row.repo_id, "HEAD", {}
        )

        assert len(outcome) == 1
        _kind, payload = outcome[0]
        assert payload["symbol_anchors"] == []
        assert payload["anchor_count"] == 0

    @pytest.mark.asyncio
    async def test_no_mist_for_repo_returns_empty(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """A repo with no mist row (edge case) → empty results, no crash."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        mist_provider = _PROVIDER_REGISTRY["intel.mist"]

        # Build a mist-domain repo directly, without any MusehubMist row.
        identity = compute_identity_id(test_user.handle.encode())
        born = _now()
        orphan_id = compute_repo_id(
            identity, "orphan-repo", "mist", born.isoformat()
        )
        orphan = MusehubRepo(
            repo_id=orphan_id,
            name="orphan-repo",
            owner=test_user.handle,
            slug="orphan-repo",
            visibility="public",
            owner_user_id=identity,
            domain_id="mist",
            description="",
            tags=[],
            created_at=born,
        )
        db_session.add(orphan)
        await db_session.commit()

        outcome = await mist_provider.compute(db_session, orphan_id, "HEAD", {})

        assert outcome == []

    @pytest.mark.asyncio
    async def test_updates_symbol_anchors_on_mist_row(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """Provider refreshes mist.symbol_anchors in the DB if they were stale."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        mist_provider = _PROVIDER_REGISTRY["intel.mist"]
        # Seed with deliberately empty symbol_anchors
        repo_row, mist_row = await _seed_mist(
            db_session,
            owner=test_user.handle,
            filename="module.py",
            content="def process(data):\n return data\n",
            symbol_anchors=[],  # stale — will be refreshed by provider
        )

        await mist_provider.compute(db_session, repo_row.repo_id, "HEAD", {})
        await db_session.commit()
        await db_session.refresh(mist_row)

        assert any("process" in anchor for anchor in mist_row.symbol_anchors), (
            f"mist.symbol_anchors should be refreshed; got {mist_row.symbol_anchors}"
        )

    @pytest.mark.asyncio
    async def test_results_persisted_via_persist_intel_results(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """Full pipeline: compute → persist_intel_results → row in musehub_intel_results."""
        import json

        from musehub.services.musehub_intel_providers import (
            _PROVIDER_REGISTRY,
            persist_intel_results,
        )

        mist_provider = _PROVIDER_REGISTRY["intel.mist"]
        repo_row, _mist_row = await _seed_mist(
            db_session,
            owner=test_user.handle,
            filename="api.py",
            content="async def handle(request):\n pass\n",
        )

        outcome = await mist_provider.compute(
            db_session, repo_row.repo_id, "HEAD", {}
        )
        await persist_intel_results(db_session, repo_row.repo_id, "HEAD", outcome)
        await db_session.commit()

        lookup = select(MusehubIntelResult).where(
            MusehubIntelResult.repo_id == repo_row.repo_id,
            MusehubIntelResult.intel_type == "mist.anchors",
        )
        fetched = await db_session.execute(lookup)
        stored = fetched.scalar_one_or_none()
        assert stored is not None, (
            "intel result row must exist after persist_intel_results"
        )

        payload = json.loads(stored.data_json)
        assert payload["mist_id"] is not None
        assert "symbol_anchors" in payload

    @pytest.mark.asyncio
    async def test_persist_is_idempotent(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """Running compute + persist twice for the same repo produces exactly one row."""
        from musehub.services.musehub_intel_providers import (
            _PROVIDER_REGISTRY,
            persist_intel_results,
        )
        from sqlalchemy import func

        mist_provider = _PROVIDER_REGISTRY["intel.mist"]
        repo_row, _mist_row = await _seed_mist(
            db_session,
            owner=test_user.handle,
            filename="idempotent.py",
            content="def noop(): pass\n",
        )

        # Two full compute → persist → commit rounds against the same repo.
        for _round in range(2):
            outcome = await mist_provider.compute(
                db_session, repo_row.repo_id, "HEAD", {}
            )
            await persist_intel_results(
                db_session, repo_row.repo_id, "HEAD", outcome
            )
            await db_session.commit()

        tally = (
            select(func.count())
            .select_from(MusehubIntelResult)
            .where(
                MusehubIntelResult.repo_id == repo_row.repo_id,
                MusehubIntelResult.intel_type == "mist.anchors",
            )
        )
        count = (await db_session.execute(tally)).scalar_one()
        assert count == 1, f"Idempotent upsert must produce exactly 1 row; got {count}"