"""TDD spec for Phase 2 — /intel/blast-risk route (issue #11). Route: GET /{owner}/{repo_slug}/intel/blast-risk Query params: risk (optional) — filter to one tier: critical | high | medium | low top (optional, default 50) — row limit: 25 | 50 | 100 | 250 Handler queries musehub_intel_blast_risk directly, ordered by risk_score DESC. Template: musehub/templates/musehub/pages/intel_blast_risk.html Layers: 1. Route registered — "intel/blast-risk" in ui_intel router 2. Basic 200 — GET returns 200 for a known repo 3. Not found — unknown repo → 404 4. Empty state — zero rows → 200 with empty-state marker 5. Rows rendered — seeded rows appear in response text 6. Order by score — highest risk_score appears before lower ones 7. Filter ?risk=critical — only critical rows returned 8. Filter ?risk=high — only high rows returned 9. Invalid risk param — unknown tier ignored, all rows returned 10. top=25 limit — response shows at most 25 rows 11. top clamped — invalid top value clamped to default (50) 12. Stats: critical count in page 13. Stats: high count in page 14. Stats: total count in page 15. MIME type — Content-Type: text/html 16. SQL-derived — no subprocess called during handler 17. Score ordering — critical > high > medium > low all render 18. Multi-repo isolation — rows from other repo absent 19. top=100 — accepted without error 20. top=250 — accepted without error """ from __future__ import annotations import secrets from unittest.mock import patch import pytest import pytest_asyncio from httpx import AsyncClient from sqlalchemy import select from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import fake_id, long_id from musehub.db.musehub_intel_models import MusehubIntelBlastRisk from musehub.db.musehub_repo_models import MusehubRepo from tests.factories import create_repo def _uid() -> str: return fake_id(secrets.token_hex(16)) _OWNER = "testuser" _SLUG = "blastriskripo" _REF = long_id("c" * 64) async def _seed_risk( session: AsyncSession, repo_id: str, *, address: str, kind: str = "function", risk: str = "high", risk_score: int = 60, impact_score: float = 0.5, churn_score: float = 0.5, test_gap_score: float = 1.0, coupling_score: float = 0.3, ) -> None: stmt = ( pg_insert(MusehubIntelBlastRisk) .values( repo_id=repo_id, address=address, kind=kind, risk=risk, risk_score=risk_score, impact_score=impact_score, churn_score=churn_score, test_gap_score=test_gap_score, coupling_score=coupling_score, ref=_REF, ) .on_conflict_do_update( index_elements=["repo_id", "address"], set_={ "risk": risk, "risk_score": risk_score, "impact_score": impact_score, "churn_score": churn_score, }, ) ) await session.execute(stmt) await session.flush() # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest_asyncio.fixture async def risk_repo(db_session: AsyncSession) -> MusehubRepo: return await create_repo(db_session, owner=_OWNER, slug=_SLUG) @pytest_asyncio.fixture async def risk_repo_with_rows(db_session: AsyncSession, risk_repo: MusehubRepo) -> MusehubRepo: repo_id = risk_repo.repo_id await db_session.commit() await _seed_risk(db_session, repo_id, address="pkg/a.py::critical_fn", risk="critical", risk_score=90) await _seed_risk(db_session, repo_id, address="pkg/b.py::high_fn", risk="high", risk_score=60) await _seed_risk(db_session, repo_id, address="pkg/c.py::medium_fn", risk="medium", risk_score=40) await _seed_risk(db_session, repo_id, address="pkg/d.py::low_fn", risk="low", risk_score=10) await db_session.commit() return risk_repo # --------------------------------------------------------------------------- # Layer 1 — Route registration # --------------------------------------------------------------------------- class TestBlastRiskRouteRegistration: def test_P2_01_blast_risk_route_registered(self) -> None: from musehub.api.routes.musehub.ui_intel import router paths = [r.path for r in router.routes] assert any("blast-risk" in p for p in paths) # --------------------------------------------------------------------------- # Layer 2 — Basic 200 # --------------------------------------------------------------------------- class TestBlastRiskBasic: @pytest.mark.asyncio async def test_P2_02_get_returns_200( self, client: AsyncClient, risk_repo: MusehubRepo ) -> None: resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") assert resp.status_code == 200 @pytest.mark.asyncio async def test_P2_15_content_type_html( self, client: AsyncClient, risk_repo: MusehubRepo ) -> None: resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") assert "text/html" in resp.headers["content-type"] # --------------------------------------------------------------------------- # Layer 3 — Not found # --------------------------------------------------------------------------- class TestBlastRiskNotFound: @pytest.mark.asyncio async def test_P2_03_unknown_repo_returns_404( self, client: AsyncClient ) -> None: resp = await client.get("/nobody/norepo/intel/blast-risk") assert resp.status_code == 404 # --------------------------------------------------------------------------- # Layer 4 — Empty state # --------------------------------------------------------------------------- class TestBlastRiskEmptyState: @pytest.mark.asyncio async def test_P2_04_empty_repo_shows_empty_state( self, client: AsyncClient, risk_repo: MusehubRepo ) -> None: resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") assert resp.status_code == 200 # Must contain some empty-state marker — "no symbols" or "0" or similar text = resp.text.lower() assert "no " in text or "0 " in text or "empty" in text or "risk" in text # --------------------------------------------------------------------------- # Layer 5 — Rows rendered # --------------------------------------------------------------------------- class TestBlastRiskRowsRendered: @pytest.mark.asyncio async def test_P2_05_seeded_rows_appear_in_response( self, client: AsyncClient, db_session: AsyncSession, risk_repo: MusehubRepo ) -> None: repo_id = risk_repo.repo_id await db_session.commit() await _seed_risk(db_session, repo_id, address="pkg/render.py::visible_fn", risk="high", risk_score=65) await db_session.commit() resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") assert "visible_fn" in resp.text # --------------------------------------------------------------------------- # Layer 6 — Order by score DESC # --------------------------------------------------------------------------- class TestBlastRiskOrdering: @pytest.mark.asyncio async def test_P2_06_highest_score_appears_before_lower( self, client: AsyncClient, risk_repo_with_rows: MusehubRepo ) -> None: resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") text = resp.text pos_critical = text.find("critical_fn") pos_high = text.find("high_fn") assert pos_critical < pos_high, "critical (score=90) must appear before high (score=60)" # --------------------------------------------------------------------------- # Layer 7 — Filter ?risk=critical # --------------------------------------------------------------------------- class TestBlastRiskFilterCritical: @pytest.mark.asyncio async def test_P2_07_filter_risk_critical_only( self, client: AsyncClient, risk_repo_with_rows: MusehubRepo ) -> None: resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?risk=critical") assert "critical_fn" in resp.text assert "high_fn" not in resp.text assert "medium_fn" not in resp.text assert "low_fn" not in resp.text # --------------------------------------------------------------------------- # Layer 8 — Filter ?risk=high # --------------------------------------------------------------------------- class TestBlastRiskFilterHigh: @pytest.mark.asyncio async def test_P2_08_filter_risk_high_only( self, client: AsyncClient, risk_repo_with_rows: MusehubRepo ) -> None: resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?risk=high") assert "high_fn" in resp.text assert "critical_fn" not in resp.text assert "medium_fn" not in resp.text assert "low_fn" not in resp.text # --------------------------------------------------------------------------- # Layer 9 — Invalid risk param ignored # --------------------------------------------------------------------------- class TestBlastRiskInvalidFilter: @pytest.mark.asyncio async def test_P2_09_invalid_risk_param_returns_all_rows( self, client: AsyncClient, risk_repo_with_rows: MusehubRepo ) -> None: resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?risk=bogus") assert resp.status_code == 200 assert "critical_fn" in resp.text assert "high_fn" in resp.text # --------------------------------------------------------------------------- # Layer 10 — top=25 limit # --------------------------------------------------------------------------- class TestBlastRiskTopParam: @pytest.mark.asyncio async def test_P2_10_top_25_limits_rows( self, client: AsyncClient, db_session: AsyncSession, risk_repo: MusehubRepo ) -> None: repo_id = risk_repo.repo_id await db_session.commit() for i in range(30): await _seed_risk( db_session, repo_id, address=f"pkg/top_{i:03d}.py::fn_{i}", risk="high", risk_score=50 + i, ) await db_session.commit() resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=25") # fn_29 (highest score) should appear; fn_00 (lowest score) should not assert "fn_29" in resp.text assert "fn_00" not in resp.text @pytest.mark.asyncio async def test_P2_11_invalid_top_clamped_to_default( self, client: AsyncClient, risk_repo: MusehubRepo ) -> None: resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=9999") assert resp.status_code == 200 @pytest.mark.asyncio async def test_P2_19_top_100_accepted( self, client: AsyncClient, risk_repo: MusehubRepo ) -> None: resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=100") assert resp.status_code == 200 @pytest.mark.asyncio async def test_P2_20_top_250_accepted( self, client: AsyncClient, risk_repo: MusehubRepo ) -> None: resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=250") assert resp.status_code == 200 # --------------------------------------------------------------------------- # Layer 12–14 — Stats in page # --------------------------------------------------------------------------- class TestBlastRiskStats: @pytest.mark.asyncio async def test_P2_12_critical_count_in_page( self, client: AsyncClient, risk_repo_with_rows: MusehubRepo ) -> None: resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") # 1 critical row seeded; "1" must appear somewhere in the stat area assert "1" in resp.text @pytest.mark.asyncio async def test_P2_13_high_count_in_page( self, client: AsyncClient, risk_repo_with_rows: MusehubRepo ) -> None: resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") assert "1" in resp.text # 1 high row @pytest.mark.asyncio async def test_P2_14_total_count_in_page( self, client: AsyncClient, risk_repo_with_rows: MusehubRepo ) -> None: resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") assert "4" in resp.text # 4 total rows # --------------------------------------------------------------------------- # Layer 16 — No subprocess # --------------------------------------------------------------------------- class TestBlastRiskNoSubprocess: @pytest.mark.asyncio async def test_P2_16_no_subprocess_called( self, client: AsyncClient, risk_repo: MusehubRepo ) -> None: with patch("asyncio.create_subprocess_exec") as mock_proc: await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") mock_proc.assert_not_called() # --------------------------------------------------------------------------- # Layer 17 — All risk tiers render # --------------------------------------------------------------------------- class TestBlastRiskAllTiers: @pytest.mark.asyncio async def test_P2_17_all_four_tiers_rendered( self, client: AsyncClient, risk_repo_with_rows: MusehubRepo ) -> None: resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") text = resp.text assert "critical_fn" in text assert "high_fn" in text assert "medium_fn" in text assert "low_fn" in text # --------------------------------------------------------------------------- # Layer 18 — Multi-repo isolation # --------------------------------------------------------------------------- class TestBlastRiskIsolation: @pytest.mark.asyncio async def test_P2_18_other_repo_rows_absent( self, client: AsyncClient, db_session: AsyncSession, risk_repo: MusehubRepo ) -> None: other_repo = await create_repo(db_session, owner="other", slug="otherrepo") other_id = other_repo.repo_id await db_session.commit() await _seed_risk(db_session, other_id, address="pkg/spy.py::spy_fn", risk="critical", risk_score=95) await db_session.commit() resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") assert "spy_fn" not in resp.text