"""TDD spec for Phase 1 — SQL-derived DeadProvider (issue #10). DeadProvider replaces the muse-CLI subprocess approach with a pure SQL derivation from `musehub_symbol_intel` blast and churn columns. Confidence formula: HIGH → symbol_kind IN tracked_kinds AND blast == 0 AND churn == 1 MEDIUM → symbol_kind IN tracked_kinds AND blast == 0 AND churn > 1 AND churn_30d == 0 LOW → symbol_kind IN tracked_kinds AND blast == 0 AND churn_30d > 0 tracked_kinds = {function, async_function, method, async_method, class} Reason strings: HIGH: "Added once, never modified. Zero blast radius in full history." MEDIUM: "Modified in past but zero blast radius for ≥ 30 days." LOW: "Zero blast radius. Recently active — verify before deleting." Dismiss preservation: On upsert, dismissed=True is NEVER overwritten by a re-run. New rows always start with dismissed=False. Layers: 1. Registry — "intel.code.dead" in _PROVIDER_REGISTRY 2. Protocol — satisfies IntelProvider 3. Dispatch — job_types_for_push("code") includes "intel.code.dead" job_types_for_push("midi") excludes "intel.code.dead" 4. High conf — blast=0, churn=1 → confidence="high" 5. Medium conf — blast=0, churn>1, churn_30d=0 → confidence="medium" 6. Low conf — blast=0, churn_30d>0 → confidence="low" 7. Excluded — blast>0 → not a candidate 8. Kind filter — kind="import" (untracked) → excluded 9. Reasons — reason string correct per tier 10. Dismissed — new rows get dismissed=False; existing dismissed=True preserved 11. Empty — no symbol_intel rows → returns [] 12. Idempotent — run twice, one row per address 13. Return type — returns [("intel.code.dead", {"count": N})] 14. 
No subprocess — compute() never calls asyncio.create_subprocess_exec """ from __future__ import annotations import secrets from unittest.mock import patch import pytest import pytest_asyncio from sqlalchemy import select from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import fake_id from musehub.db.musehub_intel_models import MusehubIntelDead, MusehubSymbolIntel from musehub.types.json_types import JSONObject from tests.factories import create_repo def _uid() -> str: return fake_id(secrets.token_hex(16)) _TRACKED_KINDS = ("function", "async_function", "method", "async_method", "class") # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- async def _seed_symbol( session: AsyncSession, repo_id: str, *, address: str, kind: str = "function", blast: int = 0, blast_direct: int = 0, blast_cross: int = 0, churn: int = 1, churn_30d: int = 0, churn_90d: int = 0, ) -> None: stmt = ( pg_insert(MusehubSymbolIntel) .values( repo_id=repo_id, address=address, symbol_kind=kind, blast=blast, blast_direct=blast_direct, blast_cross=blast_cross, churn=churn, churn_30d=churn_30d, churn_90d=churn_90d, author_count=1, gravity=0.0, weekly=[0] * 12, blast_top=[], ) .on_conflict_do_update( index_elements=["repo_id", "address"], set_={ "symbol_kind": kind, "blast": blast, "blast_direct": blast_direct, "blast_cross": blast_cross, "churn": churn, "churn_30d": churn_30d, "churn_90d": churn_90d, }, ) ) await session.execute(stmt) await session.flush() async def _get_dead( session: AsyncSession, repo_id: str, address: str ) -> MusehubIntelDead | None: result = await session.execute( select(MusehubIntelDead).where( MusehubIntelDead.repo_id == repo_id, MusehubIntelDead.address == address, ) ) return result.scalar_one_or_none() async def _run_provider(session: AsyncSession, repo_id: str) -> list[tuple[str, 
JSONObject]]: from musehub.services.musehub_intel_providers import DeadProvider provider = DeadProvider() return await provider.compute(session, repo_id, "—", {}) # --------------------------------------------------------------------------- # Layer 1 — Registry # --------------------------------------------------------------------------- class TestDeadProviderRegistry: def test_P1_01_dead_in_provider_registry(self) -> None: from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY assert "intel.code.dead" in _PROVIDER_REGISTRY def test_P1_02_dead_satisfies_intel_provider_protocol(self) -> None: from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY, IntelProvider provider = _PROVIDER_REGISTRY["intel.code.dead"] assert isinstance(provider, IntelProvider) # --------------------------------------------------------------------------- # Layer 2 — Dispatch # --------------------------------------------------------------------------- class TestDeadProviderDispatch: def test_P1_03_job_types_for_push_code_includes_dead(self) -> None: from musehub.services.musehub_intel_providers import job_types_for_push assert "intel.code.dead" in job_types_for_push("code") def test_P1_04_job_types_for_push_midi_excludes_dead(self) -> None: from musehub.services.musehub_intel_providers import job_types_for_push assert "intel.code.dead" not in job_types_for_push("midi") # --------------------------------------------------------------------------- # Layer 3 — Confidence tiers # --------------------------------------------------------------------------- class TestDeadProviderConfidence: @pytest.mark.asyncio async def test_P1_05_blast0_churn1_yields_high( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) await _seed_symbol( db_session, repo.repo_id, address="pkg/a.py::only_added", blast=0, churn=1, churn_30d=0, ) await _run_provider(db_session, repo.repo_id) row = await _get_dead(db_session, repo.repo_id, "pkg/a.py::only_added") 
assert row is not None assert row.confidence == "high" @pytest.mark.asyncio async def test_P1_06_blast0_churn_gt1_churn30d0_yields_medium( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) await _seed_symbol( db_session, repo.repo_id, address="pkg/b.py::touched_but_quiet", blast=0, churn=5, churn_30d=0, ) await _run_provider(db_session, repo.repo_id) row = await _get_dead(db_session, repo.repo_id, "pkg/b.py::touched_but_quiet") assert row is not None assert row.confidence == "medium" @pytest.mark.asyncio async def test_P1_07_blast0_churn30d_gt0_yields_low( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) await _seed_symbol( db_session, repo.repo_id, address="pkg/c.py::recently_active", blast=0, churn=3, churn_30d=2, ) await _run_provider(db_session, repo.repo_id) row = await _get_dead(db_session, repo.repo_id, "pkg/c.py::recently_active") assert row is not None assert row.confidence == "low" # --------------------------------------------------------------------------- # Layer 4 — Exclusion rules # --------------------------------------------------------------------------- class TestDeadProviderExclusion: @pytest.mark.asyncio async def test_P1_08_blast_gt0_excluded( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) await _seed_symbol( db_session, repo.repo_id, address="pkg/d.py::has_dependents", blast=3, churn=1, churn_30d=0, ) await _run_provider(db_session, repo.repo_id) row = await _get_dead(db_session, repo.repo_id, "pkg/d.py::has_dependents") assert row is None @pytest.mark.asyncio async def test_P1_09_untracked_kind_excluded( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) await _seed_symbol( db_session, repo.repo_id, address="pkg/e.py::some_import", kind="import", blast=0, churn=1, churn_30d=0, ) await _run_provider(db_session, repo.repo_id) row = await _get_dead(db_session, repo.repo_id, "pkg/e.py::some_import") assert row is None # 
# ---------------------------------------------------------------------------
# Layer 5 — Reason strings
# ---------------------------------------------------------------------------


class TestDeadProviderReasons:
    """Each confidence tier stores its exact human-readable reason string."""

    @pytest.mark.asyncio
    async def test_P1_10_high_reason_string(
        self, db_session: AsyncSession
    ) -> None:
        symbol_address = "pkg/f.py::high_sym"
        expected_reason = "Added once, never modified. Zero blast radius in full history."

        created = await create_repo(db_session)
        await _seed_symbol(
            db_session,
            created.repo_id,
            address=symbol_address,
            blast=0,
            churn=1,
            churn_30d=0,
        )
        await _run_provider(db_session, created.repo_id)

        dead = await _get_dead(db_session, created.repo_id, symbol_address)
        assert dead is not None
        assert dead.reason == expected_reason

    @pytest.mark.asyncio
    async def test_P1_10b_medium_reason_string(
        self, db_session: AsyncSession
    ) -> None:
        symbol_address = "pkg/g.py::medium_sym"
        expected_reason = "Modified in past but zero blast radius for ≥ 30 days."

        created = await create_repo(db_session)
        await _seed_symbol(
            db_session,
            created.repo_id,
            address=symbol_address,
            blast=0,
            churn=3,
            churn_30d=0,
        )
        await _run_provider(db_session, created.repo_id)

        dead = await _get_dead(db_session, created.repo_id, symbol_address)
        assert dead is not None
        assert dead.reason == expected_reason

    @pytest.mark.asyncio
    async def test_P1_10c_low_reason_string(
        self, db_session: AsyncSession
    ) -> None:
        symbol_address = "pkg/h.py::low_sym"
        expected_reason = "Zero blast radius. Recently active — verify before deleting."

        created = await create_repo(db_session)
        await _seed_symbol(
            db_session,
            created.repo_id,
            address=symbol_address,
            blast=0,
            churn=2,
            churn_30d=1,
        )
        await _run_provider(db_session, created.repo_id)

        dead = await _get_dead(db_session, created.repo_id, symbol_address)
        assert dead is not None
        assert dead.reason == expected_reason
# ---------------------------------------------------------------------------
# Layer 6 — Dismiss preservation
# ---------------------------------------------------------------------------


class TestDeadProviderDismiss:
    """New rows start undismissed; a dismissal survives a provider re-run."""

    @pytest.mark.asyncio
    async def test_P1_11_new_rows_start_not_dismissed(
        self, db_session: AsyncSession
    ) -> None:
        repo = await create_repo(db_session)
        await _seed_symbol(
            db_session,
            repo.repo_id,
            address="pkg/i.py::new_sym",
            blast=0,
            churn=1,
            churn_30d=0,
        )
        await _run_provider(db_session, repo.repo_id)

        row = await _get_dead(db_session, repo.repo_id, "pkg/i.py::new_sym")
        assert row is not None
        assert row.dismissed is False

    @pytest.mark.asyncio
    async def test_P1_11b_existing_dismissed_preserved_on_rerun(
        self, db_session: AsyncSession
    ) -> None:
        repo = await create_repo(db_session)
        await _seed_symbol(
            db_session,
            repo.repo_id,
            address="pkg/j.py::dismissed_sym",
            blast=0,
            churn=1,
            churn_30d=0,
        )

        # First run — creates the row
        await _run_provider(db_session, repo.repo_id)

        # Manually dismiss it
        row = await _get_dead(db_session, repo.repo_id, "pkg/j.py::dismissed_sym")
        # Fail with a clear assertion (not an AttributeError on None) if the
        # first run did not create the row.
        assert row is not None
        row.dismissed = True
        await db_session.flush()

        # Second run — must NOT reset dismissed to False
        await _run_provider(db_session, repo.repo_id)

        row = await _get_dead(db_session, repo.repo_id, "pkg/j.py::dismissed_sym")
        assert row is not None
        # Reload the columns from the database. The instance is still cached in
        # the session with dismissed=True from the assignment above, so without
        # a refresh this assertion could pass even if the stored value had been
        # reset by the second run.
        await db_session.refresh(row)
        assert row.dismissed is True


# ---------------------------------------------------------------------------
# Layer 7 — Edge cases
# ---------------------------------------------------------------------------


class TestDeadProviderEdgeCases:
    """Empty input, idempotent re-runs, and the shape of the return value."""

    @pytest.mark.asyncio
    async def test_P1_12_empty_repo_returns_empty_list(
        self, db_session: AsyncSession
    ) -> None:
        repo = await create_repo(db_session)
        result = await _run_provider(db_session, repo.repo_id)
        assert result == []

    @pytest.mark.asyncio
    async def test_P1_13_idempotent_run_twice_one_row(
        self, db_session: AsyncSession
    ) -> None:
        repo = await create_repo(db_session)
        await _seed_symbol(
            db_session,
            repo.repo_id,
            address="pkg/k.py::stable_sym",
            blast=0,
            churn=1,
            churn_30d=0,
        )
        await _run_provider(db_session, repo.repo_id)
        await _run_provider(db_session, repo.repo_id)

        from sqlalchemy import func

        # Count rows in SQL so the check cannot be satisfied by cached objects.
        count_stmt = (
            select(func.count())
            .select_from(MusehubIntelDead)
            .where(
                MusehubIntelDead.repo_id == repo.repo_id,
                MusehubIntelDead.address == "pkg/k.py::stable_sym",
            )
        )
        count = (await db_session.execute(count_stmt)).scalar_one()
        assert count == 1

    @pytest.mark.asyncio
    async def test_P1_14_return_type(
        self, db_session: AsyncSession
    ) -> None:
        repo = await create_repo(db_session)
        await _seed_symbol(
            db_session,
            repo.repo_id,
            address="pkg/l.py::ret_sym",
            blast=0,
            churn=1,
            churn_30d=0,
        )
        result = await _run_provider(db_session, repo.repo_id)

        assert len(result) == 1
        intel_type, data = result[0]
        assert intel_type == "intel.code.dead"
        assert data["count"] == 1


# ---------------------------------------------------------------------------
# Layer 8 — No subprocess
# ---------------------------------------------------------------------------


class TestDeadProviderNoSubprocess:
    """``compute`` must not call ``asyncio.create_subprocess_exec``."""

    @pytest.mark.asyncio
    async def test_P1_15_no_subprocess_spawned(
        self, db_session: AsyncSession
    ) -> None:
        repo = await create_repo(db_session)
        await _seed_symbol(
            db_session,
            repo.repo_id,
            address="pkg/m.py::no_proc_sym",
            blast=0,
            churn=1,
            churn_30d=0,
        )
        with patch("asyncio.create_subprocess_exec") as mock_exec:
            await _run_provider(db_session, repo.repo_id)
            mock_exec.assert_not_called()