"""TDD spec for Phase 1 — SQL-derived BlastRiskProvider (issue #11). BlastRiskProvider replaces the muse-CLI subprocess approach with a pure SQL derivation from `musehub_symbol_intel` blast and churn columns. Risk score formula: impact_score = min(blast / 50.0, 1.0) — normalized blast radius churn_score = min(churn_30d / 20.0, 1.0) — normalized 30-day churn test_gap_score = 1.0 — no coverage data → worst case coupling_score = min(blast_cross / 10.0, 1.0) — normalized cross-domain blast risk_score = round( impact_score * 40 + churn_score * 25 + test_gap_score * 20 + coupling_score * 15 ) Risk tiers: critical → risk_score >= 75 high → risk_score >= 50 medium → risk_score >= 25 low → risk_score < 25 tracked_kinds = {function, async_function, method, async_method, class} Only symbols with blast > 0 are candidates. Layers: Unit (no DB): 1. Registry — "intel.code.blast_risk" in _PROVIDER_REGISTRY 2. Protocol — satisfies IntelProvider 3. Dispatch — job_types_for_push("code") includes "intel.code.blast_risk" job_types_for_push("midi") excludes "intel.code.blast_risk" 4. Tier thresholds — _risk_tier boundaries at 75/50/25 5. Score formula — weights sum correctly, all-max → 100, all-zero → 0 Integration (DB): 6. High blast+churn → critical tier 7. Zero blast → excluded 8. Untracked kind → excluded 9. risk_score capped at 100 10. Idempotent — run twice, one row per address 11. Return type — [("intel.code.blast_risk", {"count": N})] 12. Empty repo → [] State integrity: 13. Re-run updates risk_score in-place (upsert, not duplicate) 14. Upsert does not touch symbol_intel blast/churn columns 15. ref column updated to latest ref on each run Performance: 16. 1000 symbol rows processed in < 5 seconds No subprocess: 17. compute() never calls asyncio.create_subprocess_exec """ from __future__ import annotations import secrets import time from unittest.mock import patch import pytest import pytest_asyncio from sqlalchemy import select from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import fake_id, long_id from musehub.db.musehub_intel_models import MusehubIntelBlastRisk, MusehubSymbolIntel from musehub.types.json_types import JSONObject from tests.factories import create_repo def _uid() -> str: return fake_id(secrets.token_hex(16)) _TRACKED_KINDS = ("function", "async_function", "method", "async_method", "class") _REF_A = long_id("a" * 64) _REF_B = long_id("b" * 64) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- async def _seed_symbol( session: AsyncSession, repo_id: str, *, address: str, kind: str = "function", blast: int = 10, blast_direct: int = 5, blast_cross: int = 2, churn: int = 5, churn_30d: int = 3, churn_90d: int = 4, ) -> None: stmt = ( pg_insert(MusehubSymbolIntel) .values( repo_id=repo_id, address=address, symbol_kind=kind, blast=blast, blast_direct=blast_direct, blast_cross=blast_cross, churn=churn, churn_30d=churn_30d, churn_90d=churn_90d, author_count=1, gravity=0.0, weekly=[0] * 12, blast_top=[], ) .on_conflict_do_update( index_elements=["repo_id", "address"], set_={ "symbol_kind": kind, "blast": blast, "blast_direct": blast_direct, "blast_cross": blast_cross, "churn": churn, "churn_30d": churn_30d, "churn_90d": churn_90d, }, ) ) await session.execute(stmt) await session.flush() async def _get_risk_row( session: AsyncSession, repo_id: str, address: str ) -> MusehubIntelBlastRisk | None: result = await session.execute( select(MusehubIntelBlastRisk).where( MusehubIntelBlastRisk.repo_id == repo_id, MusehubIntelBlastRisk.address == address, ) ) return result.scalar_one_or_none() async def _run_provider(session: AsyncSession, repo_id: str, ref: str = _REF_A) -> list[tuple[str, JSONObject]]: from musehub.services.musehub_intel_providers import BlastRiskProvider provider = BlastRiskProvider() return await provider.compute(session, repo_id, ref, {}) # --------------------------------------------------------------------------- # Layer 1 — Registry # --------------------------------------------------------------------------- class TestBlastRiskRegistry: def test_P1_01_blast_risk_in_provider_registry(self) -> None: from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY assert "intel.code.blast_risk" in _PROVIDER_REGISTRY def test_P1_02_blast_risk_satisfies_intel_provider_protocol(self) -> None: from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY, IntelProvider provider = _PROVIDER_REGISTRY["intel.code.blast_risk"] assert isinstance(provider, IntelProvider) # --------------------------------------------------------------------------- # Layer 2 — Dispatch # --------------------------------------------------------------------------- class TestBlastRiskDispatch: def test_P1_03_job_types_for_push_code_includes_blast_risk(self) -> None: from musehub.services.musehub_intel_providers import job_types_for_push assert "intel.code.blast_risk" in job_types_for_push("code") def test_P1_04_job_types_for_push_midi_excludes_blast_risk(self) -> None: from musehub.services.musehub_intel_providers import job_types_for_push assert "intel.code.blast_risk" not in job_types_for_push("midi") # --------------------------------------------------------------------------- # Layer 3 — Tier thresholds (unit, no DB) # --------------------------------------------------------------------------- class TestRiskTierThresholds: def test_P1_05_score_75_is_critical(self) -> None: from musehub.services.musehub_intel_providers import _risk_tier assert _risk_tier(75) == "critical" def test_P1_06_score_100_is_critical(self) -> None: from musehub.services.musehub_intel_providers import _risk_tier assert _risk_tier(100) == "critical" def test_P1_07_score_50_is_high(self) -> None: from musehub.services.musehub_intel_providers import _risk_tier assert _risk_tier(50) == "high" def test_P1_08_score_74_is_high(self) -> None: from musehub.services.musehub_intel_providers import _risk_tier assert _risk_tier(74) == "high" def test_P1_09_score_25_is_medium(self) -> None: from musehub.services.musehub_intel_providers import _risk_tier assert _risk_tier(25) == "medium" def test_P1_10_score_49_is_medium(self) -> None: from musehub.services.musehub_intel_providers import _risk_tier assert _risk_tier(49) == "medium" def test_P1_11_score_24_is_low(self) -> None: from musehub.services.musehub_intel_providers import _risk_tier assert _risk_tier(24) == "low" def test_P1_11b_score_0_is_low(self) -> None: from musehub.services.musehub_intel_providers import _risk_tier assert _risk_tier(0) == "low" # --------------------------------------------------------------------------- # Layer 4 — Score formula (unit, no DB) # --------------------------------------------------------------------------- class TestRiskScoreFormula: def test_P1_12_all_max_inputs_yield_100(self) -> None: from musehub.services.musehub_intel_providers import _compute_risk_score score = _compute_risk_score( impact_score=1.0, churn_score=1.0, test_gap_score=1.0, coupling_score=1.0, ) assert score == 100 def test_P1_13_all_zero_inputs_yield_0(self) -> None: from musehub.services.musehub_intel_providers import _compute_risk_score score = _compute_risk_score( impact_score=0.0, churn_score=0.0, test_gap_score=0.0, coupling_score=0.0, ) assert score == 0 def test_P1_14_impact_weight_is_40(self) -> None: from musehub.services.musehub_intel_providers import _compute_risk_score score = _compute_risk_score( impact_score=1.0, churn_score=0.0, test_gap_score=0.0, coupling_score=0.0, ) assert score == 40 def test_P1_15_churn_weight_is_25(self) -> None: from musehub.services.musehub_intel_providers import _compute_risk_score score = _compute_risk_score( impact_score=0.0, churn_score=1.0, test_gap_score=0.0, coupling_score=0.0, ) assert score == 25 def test_P1_16_test_gap_weight_is_20(self) -> None: from musehub.services.musehub_intel_providers import _compute_risk_score score = _compute_risk_score( impact_score=0.0, churn_score=0.0, test_gap_score=1.0, coupling_score=0.0, ) assert score == 20 def test_P1_17_coupling_weight_is_15(self) -> None: from musehub.services.musehub_intel_providers import _compute_risk_score score = _compute_risk_score( impact_score=0.0, churn_score=0.0, test_gap_score=0.0, coupling_score=1.0, ) assert score == 15 # --------------------------------------------------------------------------- # Layer 5 — Integration: confidence tiers via DB # --------------------------------------------------------------------------- class TestBlastRiskIntegration: @pytest.mark.asyncio async def test_P1_18_high_blast_high_churn_yields_critical( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) # blast=50 → impact=1.0, churn_30d=20 → churn=1.0 → score=100 → critical await _seed_symbol( db_session, repo.repo_id, address="pkg/a.py::risky_fn", blast=50, blast_cross=10, churn_30d=20, ) await _run_provider(db_session, repo.repo_id) row = await _get_risk_row(db_session, repo.repo_id, "pkg/a.py::risky_fn") assert row is not None assert row.risk == "critical" @pytest.mark.asyncio async def test_P1_19_zero_blast_excluded( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) await _seed_symbol( db_session, repo.repo_id, address="pkg/b.py::no_blast", blast=0, churn_30d=10, ) await _run_provider(db_session, repo.repo_id) row = await _get_risk_row(db_session, repo.repo_id, "pkg/b.py::no_blast") assert row is None @pytest.mark.asyncio async def test_P1_20_untracked_kind_excluded( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) await _seed_symbol( db_session, repo.repo_id, address="pkg/c.py::some_import", kind="import", blast=20, churn_30d=5, ) await _run_provider(db_session, repo.repo_id) row = await _get_risk_row(db_session, repo.repo_id, "pkg/c.py::some_import") assert row is None @pytest.mark.asyncio async def test_P1_21_risk_score_capped_at_100( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) # Extreme inputs — all scores at 1.0 → should be exactly 100, never over await _seed_symbol( db_session, repo.repo_id, address="pkg/d.py::overflow_fn", blast=9999, blast_cross=9999, churn_30d=9999, ) await _run_provider(db_session, repo.repo_id) row = await _get_risk_row(db_session, repo.repo_id, "pkg/d.py::overflow_fn") assert row is not None assert row.risk_score <= 100 @pytest.mark.asyncio async def test_P1_22_idempotent_run_twice_one_row( self, db_session: AsyncSession ) -> None: from sqlalchemy import func repo = await create_repo(db_session) await _seed_symbol( db_session, repo.repo_id, address="pkg/e.py::idem_fn", blast=10, churn_30d=5, ) await _run_provider(db_session, repo.repo_id) await _run_provider(db_session, repo.repo_id) count = (await db_session.execute( select(func.count()).select_from(MusehubIntelBlastRisk).where( MusehubIntelBlastRisk.repo_id == repo.repo_id, MusehubIntelBlastRisk.address == "pkg/e.py::idem_fn", ) )).scalar_one() assert count == 1 @pytest.mark.asyncio async def test_P1_23_return_type( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) await _seed_symbol( db_session, repo.repo_id, address="pkg/f.py::ret_fn", blast=10, churn_30d=3, ) result = await _run_provider(db_session, repo.repo_id) assert len(result) == 1 intel_type, data = result[0] assert intel_type == "intel.code.blast_risk" assert data["count"] == 1 @pytest.mark.asyncio async def test_P1_24_empty_repo_returns_empty_list( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) result = await _run_provider(db_session, repo.repo_id) assert result == [] # --------------------------------------------------------------------------- # Layer 6 — State integrity # --------------------------------------------------------------------------- class TestBlastRiskStateIntegrity: @pytest.mark.asyncio async def test_P1_25_rerun_updates_risk_score_in_place( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) repo_id = repo.repo_id # capture before any expire_all invalidates the ORM object # First run: low churn → lower score await _seed_symbol(db_session, repo_id, address="pkg/g.py::update_fn", blast=10, blast_cross=0, churn_30d=0) await _run_provider(db_session, repo_id) row_first = await _get_risk_row(db_session, repo_id, "pkg/g.py::update_fn") score_first = row_first.risk_score # Update symbol to high churn — raw SQL upsert, bypasses ORM identity map await _seed_symbol(db_session, repo_id, address="pkg/g.py::update_fn", blast=10, blast_cross=0, churn_30d=20) await _run_provider(db_session, repo_id) db_session.expire_all() # invalidate cached blast_risk row so _get_risk_row re-fetches row_second = await _get_risk_row(db_session, repo_id, "pkg/g.py::update_fn") assert row_second.risk_score > score_first # Still one row from sqlalchemy import func count = (await db_session.execute( select(func.count()).select_from(MusehubIntelBlastRisk).where( MusehubIntelBlastRisk.repo_id == repo_id, MusehubIntelBlastRisk.address == "pkg/g.py::update_fn", ) )).scalar_one() assert count == 1 @pytest.mark.asyncio async def test_P1_26_upsert_does_not_touch_symbol_intel_blast_churn( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) await _seed_symbol( db_session, repo.repo_id, address="pkg/h.py::no_touch_fn", blast=7, churn=3, churn_30d=2, ) await _run_provider(db_session, repo.repo_id) intel_row = (await db_session.execute( select(MusehubSymbolIntel).where( MusehubSymbolIntel.repo_id == repo.repo_id, MusehubSymbolIntel.address == "pkg/h.py::no_touch_fn", ) )).scalar_one() assert intel_row.blast == 7 assert intel_row.churn == 3 assert intel_row.churn_30d == 2 @pytest.mark.asyncio async def test_P1_27_ref_column_updated_on_rerun( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) repo_id = repo.repo_id # capture before any expire_all invalidates the ORM object await _seed_symbol(db_session, repo_id, address="pkg/i.py::ref_fn", blast=10, churn_30d=3) await _run_provider(db_session, repo_id, ref=_REF_A) row = await _get_risk_row(db_session, repo_id, "pkg/i.py::ref_fn") assert row.ref == _REF_A await _run_provider(db_session, repo_id, ref=_REF_B) db_session.expire_all() # invalidate cached blast_risk row so _get_risk_row re-fetches row = await _get_risk_row(db_session, repo_id, "pkg/i.py::ref_fn") assert row.ref == _REF_B # --------------------------------------------------------------------------- # Layer 7 — Performance # --------------------------------------------------------------------------- class TestBlastRiskPerformance: @pytest.mark.asyncio async def test_P1_28_1000_symbols_processed_under_5_seconds( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) # Bulk insert via executemany-style from sqlalchemy import text rows = [] for i in range(1000): rows.append({ "repo_id": repo.repo_id, "address": f"pkg/perf_{i}.py::fn_{i}", "symbol_kind": _TRACKED_KINDS[i % len(_TRACKED_KINDS)], "blast": (i % 50) + 1, "blast_direct": i % 10, "blast_cross": i % 10, "churn": (i % 20) + 1, "churn_30d": i % 20, "churn_90d": i % 20, "author_count": 1, "gravity": 0.0, "weekly": [0] * 12, "blast_top": [], }) await db_session.execute( pg_insert(MusehubSymbolIntel) .values(rows) .on_conflict_do_nothing() ) await db_session.flush() start = time.monotonic() await _run_provider(db_session, repo.repo_id) elapsed = time.monotonic() - start assert elapsed < 5.0, f"1000 symbols took {elapsed:.2f}s (limit: 5s)" # --------------------------------------------------------------------------- # Layer 8 — No subprocess # --------------------------------------------------------------------------- class TestBlastRiskNoSubprocess: @pytest.mark.asyncio async def test_P1_29_no_subprocess_spawned( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session) await _seed_symbol( db_session, repo.repo_id, address="pkg/j.py::no_proc_fn", blast=10, churn_30d=3, ) with patch("asyncio.create_subprocess_exec") as mock_exec: await _run_provider(db_session, repo.repo_id) mock_exec.assert_not_called()