"""TDD spec for Phase 2 — worker intel providers (issue #8). 11 new ``IntelProvider`` subclasses, one per normalized intel table, each wrapping a ``muse code --json`` subprocess call and upserting rows into the corresponding DB table. New job types (all in the ``intel.code.*`` namespace): intel.code.coupling → MusehubIntelCoupling intel.code.entangle → MusehubIntelEntangle intel.code.dead → MusehubIntelDead intel.code.blast_risk → MusehubIntelBlastRisk intel.code.stable → MusehubIntelStable intel.code.velocity → MusehubIntelVelocity intel.code.clones → MusehubIntelClones intel.code.type → MusehubIntelType intel.code.api_surface → MusehubIntelApiSurface intel.code.languages → MusehubIntelLanguages intel.code.detect_refactor → MusehubIntelRefactorEvent Contract each provider must satisfy: 1. Registered under its job-type key in ``_PROVIDER_REGISTRY``. 2. ``compute()`` calls ``muse -C code --json`` (or the equivalent runner) and upserts result rows. 3. ``compute()`` returns a non-empty ``IntelResults`` list on success. 4. ``compute()`` returns ``[]`` gracefully when the muse command yields no results (empty repo, no symbols, etc.). 5. ``compute()`` returns ``[]`` gracefully when the subprocess exits non-zero. Layers: 1. Registry — job types present in _PROVIDER_REGISTRY 2. Dispatch — job_types_for_push("code") includes all 11 new types 3. Coupling — provider upserts MusehubIntelCoupling rows 4. Entangle — provider upserts MusehubIntelEntangle rows 5. Dead — provider upserts MusehubIntelDead rows 6. BlastRisk — provider upserts MusehubIntelBlastRisk rows 7. Stable — provider upserts MusehubIntelStable rows 8. Velocity — provider upserts MusehubIntelVelocity rows 9. Clones — provider upserts MusehubIntelClones rows 10. Type — provider upserts MusehubIntelType rows 11. ApiSurface — provider upserts MusehubIntelApiSurface rows 12. Languages — provider upserts MusehubIntelLanguages rows 13. Refactor — provider upserts MusehubIntelRefactorEvent rows 14. 
Empty — all providers handle empty muse output gracefully 15. Error — all providers handle non-zero exit gracefully """ from __future__ import annotations import json import secrets from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock, patch import msgpack import pytest from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import fake_id from tests.factories import create_repo type _ContentMap = dict[str, bytes] _ALL_PHASE2_JOB_TYPES = [ "intel.code.coupling", "intel.code.entangle", "intel.code.dead", "intel.code.blast_risk", "intel.code.stable", "intel.code.velocity", "intel.code.clones", "intel.code.type", "intel.code.api_surface", "intel.code.languages", "intel.code.detect_refactor", ] def _uid() -> str: return fake_id(secrets.token_hex(16)) def _now() -> datetime: return datetime.now(tz=timezone.utc) def _mock_process(stdout: str, returncode: int = 0) -> AsyncMock: """Return an asyncio.subprocess.Process mock.""" proc = AsyncMock() proc.returncode = returncode proc.communicate = AsyncMock(return_value=(stdout.encode(), b"")) return proc async def _make_commit_and_snapshot( session: AsyncSession, repo_id: str, manifest: dict[str, str], parent_ids: list[str] | None = None, ) -> tuple[str, str]: """Insert MusehubSnapshot + MusehubCommit + MusehubObject rows; return (commit_id, snapshot_id).""" from musehub.db.musehub_repo_models import ( MusehubCommit, MusehubCommitRef, MusehubObject, MusehubSnapshot, MusehubSnapshotRef, ) from sqlalchemy.dialects.postgresql import insert as pg_insert snap_id = _uid() commit_id = _uid() session.add(MusehubSnapshot( snapshot_id=snap_id, manifest_blob=msgpack.packb(manifest, use_bin_type=True), )) session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=snap_id)) session.add(MusehubCommit( commit_id=commit_id, branch="main", message="test", author="tester", timestamp=datetime.now(tz=timezone.utc), snapshot_id=snap_id, parent_ids=parent_ids or [], )) 
session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id)) # Seed MusehubObject rows so providers can find objects via session.get(MusehubObject, oid) for path, oid in manifest.items(): await session.execute( pg_insert(MusehubObject) .values(object_id=oid, path=path, size_bytes=32, storage_uri=f"mem://{oid}") .on_conflict_do_nothing(index_elements=["object_id"]) ) await session.flush() return commit_id, snap_id def _mock_backend(content_map: _ContentMap) -> AsyncMock: backend = AsyncMock() backend.get = AsyncMock(side_effect=lambda oid, **_: content_map.get(oid)) return backend # ───────────────────────────────────────────────────────────────────────────── # Layer 1 — Registry: all 11 providers registered # ───────────────────────────────────────────────────────────────────────────── class TestPhase2Registry: def test_P2_01_all_phase2_job_types_in_registry(self) -> None: from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY missing = [jt for jt in _ALL_PHASE2_JOB_TYPES if jt not in _PROVIDER_REGISTRY] assert not missing, f"Missing from _PROVIDER_REGISTRY: {missing}" def test_P2_02_registry_providers_satisfy_protocol(self) -> None: from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY, IntelProvider for jt in _ALL_PHASE2_JOB_TYPES: provider = _PROVIDER_REGISTRY[jt] assert isinstance(provider, IntelProvider), ( f"{jt} provider does not satisfy IntelProvider protocol" ) # ───────────────────────────────────────────────────────────────────────────── # Layer 2 — Dispatch: job_types_for_push includes all 11 new types # ───────────────────────────────────────────────────────────────────────────── class TestPhase2Dispatch: def test_P2_03_job_types_for_push_code_includes_all_phase2_types(self) -> None: from musehub.services.musehub_intel_providers import job_types_for_push types = job_types_for_push("code") missing = [jt for jt in _ALL_PHASE2_JOB_TYPES if jt not in types] assert not missing, f"Missing from 
job_types_for_push('code'): {missing}" def test_P2_04_job_types_for_push_code_still_includes_legacy_types(self) -> None: from musehub.services.musehub_intel_providers import job_types_for_push types = job_types_for_push("code") assert "intel.structural" in types assert "intel.code" in types assert "gc" in types def test_P2_05_job_types_for_push_midi_excludes_phase2_types(self) -> None: from musehub.services.musehub_intel_providers import job_types_for_push types = job_types_for_push("midi") for jt in _ALL_PHASE2_JOB_TYPES: assert jt not in types, f"{jt} should not run for midi repos" # ───────────────────────────────────────────────────────────────────────────── # Layer 10 — TypeProvider # ───────────────────────────────────────────────────────────────────────────── class TestPhase2TypeProvider: @pytest.mark.asyncio async def test_P2_14_type_upserts_rows(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY repo = await create_repo(db_session) py_src = b"def fn(x: int, y: str) -> bool:\n pass\n" obj_id = "obj-type-test" backend = _mock_backend({obj_id: py_src}) commit_id, _ = await _make_commit_and_snapshot( db_session, repo.repo_id, {"a.py": obj_id} ) with patch("musehub.storage.backends.get_backend", return_value=backend), \ patch("musehub.services.musehub_intel_providers.get_backend", return_value=backend), \ patch("musehub.storage.backends.read_object_bytes", new=AsyncMock(return_value=py_src)): results = await _PROVIDER_REGISTRY["intel.code.type"].compute( db_session, repo.repo_id, commit_id, {"head": commit_id, "owner": repo.owner, "slug": repo.slug}, ) assert results rows = (await db_session.execute( select(db.MusehubIntelType).where(db.MusehubIntelType.repo_id == repo.repo_id) )).scalars().all() assert len(rows) == 1 assert rows[0].type_score == pytest.approx(1.0) assert rows[0].return_is_any is False # 
# ─────────────────────────────────────────────────────────────────────────────
# Layer 11 — ApiSurfaceProvider
# ─────────────────────────────────────────────────────────────────────────────


class TestPhase2ApiSurfaceProvider:
    """Layer 11: the ``intel.code.api_surface`` provider writes MusehubIntelApiSurface rows."""

    @pytest.mark.asyncio
    async def test_P2_15_api_surface_upserts_rows(self, db_session: AsyncSession) -> None:
        """One top-level function is expected to produce one ``public`` row."""
        from musehub.db import musehub_intel_models as db
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session)
        obj_id = "obj-api-test"
        py_src = b"def get_repo(repo_id: str) -> dict:\n pass\n"
        backend = _mock_backend({obj_id: py_src})
        commit_id, _ = await _make_commit_and_snapshot(
            db_session, repo.repo_id, {"api/routes.py": obj_id}
        )
        job_payload = {"head": commit_id, "owner": repo.owner, "slug": repo.slug}

        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=backend),
            patch("musehub.storage.backends.read_object_bytes", new=AsyncMock(return_value=py_src)),
        ):
            results = await _PROVIDER_REGISTRY["intel.code.api_surface"].compute(
                db_session, repo.repo_id, commit_id, job_payload
            )

        assert results

        query = select(db.MusehubIntelApiSurface).where(
            db.MusehubIntelApiSurface.repo_id == repo.repo_id
        )
        rows = (await db_session.execute(query)).scalars().all()
        assert len(rows) == 1
        row = rows[0]
        assert row.signature_id is not None
        assert row.visibility == "public"


# ─────────────────────────────────────────────────────────────────────────────
# Layer 12 — LanguagesProvider
# ─────────────────────────────────────────────────────────────────────────────


class TestPhase2LanguagesProvider:
    """Layer 12: the ``intel.code.languages`` provider writes MusehubIntelLanguages rows."""

    @pytest.mark.asyncio
    async def test_P2_16_languages_upserts_rows(self, db_session: AsyncSession) -> None:
        """A snapshot with one .py and one .toml file is expected to yield two rows."""
        from musehub.db import musehub_intel_models as db
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session)
        py_oid = "obj-lang-py"
        py_src = b"def fn(x: int) -> bool:\n pass\n"
        toml_oid = "obj-lang-toml"
        toml_src = b"[workspace]\nversion = 1\n"
        backend = _mock_backend({py_oid: py_src, toml_oid: toml_src})
        commit_id, _ = await _make_commit_and_snapshot(
            db_session,
            repo.repo_id,
            {"src/main.py": py_oid, "pyproject.toml": toml_oid},
        )
        job_payload = {"head": commit_id, "owner": repo.owner, "slug": repo.slug}

        with patch("musehub.services.musehub_intel_providers.get_backend", return_value=backend):
            results = await _PROVIDER_REGISTRY["intel.code.languages"].compute(
                db_session, repo.repo_id, commit_id, job_payload
            )

        assert results

        query = select(db.MusehubIntelLanguages).where(
            db.MusehubIntelLanguages.repo_id == repo.repo_id
        )
        rows = (await db_session.execute(query)).scalars().all()
        assert len(rows) == 2
        python_row = next(row for row in rows if row.language == "Python")
        assert python_row.file_count == 1
        assert python_row.symbol_count == 1


# ─────────────────────────────────────────────────────────────────────────────
# Layer 13 — DetectRefactorProvider
# ─────────────────────────────────────────────────────────────────────────────


class TestPhase2DetectRefactorProvider:
    """Layer 13: the ``intel.code.detect_refactor`` provider writes MusehubIntelRefactorEvent rows."""

    @pytest.mark.asyncio
    async def test_P2_17_detect_refactor_upserts_rows(self, db_session: AsyncSession) -> None:
        """A function renamed between parent and head is expected to yield one ``rename`` event."""
        from musehub.db import musehub_intel_models as db
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session)

        # parent snapshot: a.py has old_name
        parent_oid = "obj-refactor-parent"
        parent_src = b"def old_name():\n pass\n"
        # head snapshot: a.py has new_name (same body → rename)
        head_oid = "obj-refactor-head"
        head_src = b"def new_name():\n pass\n"

        backend = _mock_backend({parent_oid: parent_src, head_oid: head_src})
        parent_commit_id, _ = await _make_commit_and_snapshot(
            db_session, repo.repo_id, {"a.py": parent_oid}
        )
        head_commit_id, _ = await _make_commit_and_snapshot(
            db_session,
            repo.repo_id,
            {"a.py": head_oid},
            parent_ids=[parent_commit_id],
        )
        job_payload = {"head": head_commit_id, "owner": repo.owner, "slug": repo.slug}

        with patch("musehub.services.musehub_intel_providers.get_backend", return_value=backend):
            results = await _PROVIDER_REGISTRY["intel.code.detect_refactor"].compute(
                db_session, repo.repo_id, head_commit_id, job_payload
            )

        assert results

        query = select(db.MusehubIntelRefactorEvent).where(
            db.MusehubIntelRefactorEvent.repo_id == repo.repo_id
        )
        rows = (await db_session.execute(query)).scalars().all()
        assert len(rows) == 1
        event = rows[0]
        assert event.kind == "rename"
        assert event.address == "a.py::old_name"

    @pytest.mark.asyncio
    async def test_P2_18_detect_refactor_deduplicates_by_event_id(self, db_session: AsyncSession) -> None:
        """Running the provider twice on the same head must leave a single event row."""
        from musehub.db import musehub_intel_models as db
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session)

        parent_oid = "obj-dedup-parent"
        parent_src = b"def old_name():\n pass\n"
        head_oid = "obj-dedup-head"
        head_src = b"def new_name():\n pass\n"

        backend = _mock_backend({parent_oid: parent_src, head_oid: head_src})
        parent_commit_id, _ = await _make_commit_and_snapshot(
            db_session, repo.repo_id, {"a.py": parent_oid}
        )
        head_commit_id, _ = await _make_commit_and_snapshot(
            db_session,
            repo.repo_id,
            {"a.py": head_oid},
            parent_ids=[parent_commit_id],
        )

        # Two identical runs, each under its own patch context; the second one
        # is the run that would create a duplicate if the upsert were broken.
        for _run in range(2):
            with patch("musehub.services.musehub_intel_providers.get_backend", return_value=backend):
                await _PROVIDER_REGISTRY["intel.code.detect_refactor"].compute(
                    db_session,
                    repo.repo_id,
                    head_commit_id,
                    {"head": head_commit_id, "owner": repo.owner, "slug": repo.slug},
                )

        query = select(db.MusehubIntelRefactorEvent).where(
            db.MusehubIntelRefactorEvent.repo_id == repo.repo_id
        )
        rows = (await db_session.execute(query)).scalars().all()
        assert len(rows) == 1, "duplicate event inserted — event_id upsert is broken"


# ─────────────────────────────────────────────────────────────────────────────
# Layer 14 — Empty output: all providers return [] gracefully
# ─────────────────────────────────────────────────────────────────────────────
class TestPhase2EmptyOutput:
    """Layer 14: providers return ``[]`` when the muse command reports no results."""

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        ("job_type", "empty_key"),
        [
            ("intel.code.coupling", '{"pairs": []}'),
            ("intel.code.entangle", '{"pairs": []}'),
            ("intel.code.dead", '{"candidates": []}'),
            ("intel.code.blast_risk", '{"symbols": []}'),
            ("intel.code.stable", '{"symbols": []}'),
            ("intel.code.velocity", '{"modules": []}'),
            ("intel.code.clones", '{"clusters": []}'),
            ("intel.code.type", '{"symbols": []}'),
            ("intel.code.api_surface", '{"symbols": []}'),
            ("intel.code.languages", '{"languages": []}'),
            ("intel.code.detect_refactor", '{"events": []}'),
        ],
    )
    async def test_P2_19_empty_muse_output_returns_empty_list(
        self, job_type: str, empty_key: str, db_session: AsyncSession
    ) -> None:
        """``compute()`` yields ``[]`` when the subprocess prints an empty result document.

        ``empty_key`` is the JSON text the mocked subprocess writes to stdout.
        """
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session)
        ref = _uid()
        job_payload = {"head": ref, "owner": repo.owner, "slug": repo.slug}
        fake_process = _mock_process(empty_key)

        with patch("asyncio.create_subprocess_exec", return_value=fake_process):
            results = await _PROVIDER_REGISTRY[job_type].compute(
                db_session, repo.repo_id, ref, job_payload
            )

        assert results == []


# ─────────────────────────────────────────────────────────────────────────────
# Layer 15 — Non-zero exit: all providers return [] gracefully
# ─────────────────────────────────────────────────────────────────────────────


class TestPhase2ErrorHandling:
    """Layer 15: providers return ``[]`` when the subprocess exits non-zero."""

    @pytest.mark.asyncio
    @pytest.mark.parametrize("job_type", _ALL_PHASE2_JOB_TYPES)
    async def test_P2_20_nonzero_exit_returns_empty_list(
        self, job_type: str, db_session: AsyncSession
    ) -> None:
        """``compute()`` yields ``[]`` when the mocked subprocess exits with status 1."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session)
        ref = _uid()
        job_payload = {"head": ref, "owner": repo.owner, "slug": repo.slug}
        failed_process = _mock_process("", returncode=1)

        with patch("asyncio.create_subprocess_exec", return_value=failed_process):
            results = await _PROVIDER_REGISTRY[job_type].compute(
                db_session, repo.repo_id, ref, job_payload
            )

        assert results == []