test_phase2_blast_risk_route.py
python
sha256:4992098130166d191cefed0a2821d19cd3cdd3cf50867a4e715c2b30636826c7
fix: repair syntax errors from typing annotation cleanup
Sonnet 4.6
20 days ago
| 1 | """TDD spec for Phase 2 — /intel/blast-risk route (issue #11). |
| 2 | |
| 3 | Route: |
| 4 | GET /{owner}/{repo_slug}/intel/blast-risk |
| 5 | |
| 6 | Query params: |
| 7 | risk (optional) — filter to one tier: critical | high | medium | low |
| 8 | top (optional, default 50) — row limit: 25 | 50 | 100 | 250 |
| 9 | |
| 10 | Handler queries musehub_intel_blast_risk directly, ordered by risk_score DESC. |
| 11 | |
| 12 | Template: musehub/templates/musehub/pages/intel_blast_risk.html |
| 13 | |
| 14 | Layers: |
| 15 | 1. Route registered — "intel/blast-risk" in ui_intel router |
| 16 | 2. Basic 200 — GET returns 200 for a known repo |
| 17 | 3. Not found — unknown repo → 404 |
| 18 | 4. Empty state — zero rows → 200 with empty-state marker |
| 19 | 5. Rows rendered — seeded rows appear in response text |
| 20 | 6. Order by score — highest risk_score appears before lower ones |
| 21 | 7. Filter ?risk=critical — only critical rows returned |
| 22 | 8. Filter ?risk=high — only high rows returned |
| 23 | 9. Invalid risk param — unknown tier ignored, all rows returned |
| 24 | 10. top=25 limit — response shows at most 25 rows |
| 25 | 11. top clamped — invalid top value clamped to default (50) |
| 26 | 12. Stats: critical count in page |
| 27 | 13. Stats: high count in page |
| 28 | 14. Stats: total count in page |
| 29 | 15. MIME type — Content-Type: text/html |
| 30 | 16. SQL-derived — no subprocess called during handler |
| 31 | 17. Score ordering — critical > high > medium > low all render |
| 32 | 18. Multi-repo isolation — rows from other repo absent |
| 33 | 19. top=100 — accepted without error |
| 34 | 20. top=250 — accepted without error |
| 35 | """ |
| 36 | from __future__ import annotations |
| 37 | |
| 38 | import secrets |
| 39 | from unittest.mock import patch |
| 40 | |
| 41 | import pytest |
| 42 | import pytest_asyncio |
| 43 | from httpx import AsyncClient |
| 44 | from sqlalchemy import select |
| 45 | from sqlalchemy.dialects.postgresql import insert as pg_insert |
| 46 | from sqlalchemy.ext.asyncio import AsyncSession |
| 47 | |
| 48 | from muse.core.types import fake_id, long_id |
| 49 | from musehub.db.musehub_intel_models import MusehubIntelBlastRisk |
| 50 | from musehub.db.musehub_repo_models import MusehubRepo |
| 51 | from tests.factories import create_repo |
| 52 | |
| 53 | |
| 54 | def _uid() -> str: |
| 55 | return fake_id(secrets.token_hex(16)) |
| 56 | |
| 57 | |
| 58 | _OWNER = "testuser" |
| 59 | _SLUG = "blastriskripo" |
| 60 | _REF = long_id("c" * 64) |
| 61 | |
| 62 | |
| 63 | async def _seed_risk( |
| 64 | session: AsyncSession, |
| 65 | repo_id: str, |
| 66 | *, |
| 67 | address: str, |
| 68 | kind: str = "function", |
| 69 | risk: str = "high", |
| 70 | risk_score: int = 60, |
| 71 | impact_score: float = 0.5, |
| 72 | churn_score: float = 0.5, |
| 73 | test_gap_score: float = 1.0, |
| 74 | coupling_score: float = 0.3, |
| 75 | ) -> None: |
| 76 | stmt = ( |
| 77 | pg_insert(MusehubIntelBlastRisk) |
| 78 | .values( |
| 79 | repo_id=repo_id, |
| 80 | address=address, |
| 81 | kind=kind, |
| 82 | risk=risk, |
| 83 | risk_score=risk_score, |
| 84 | impact_score=impact_score, |
| 85 | churn_score=churn_score, |
| 86 | test_gap_score=test_gap_score, |
| 87 | coupling_score=coupling_score, |
| 88 | ref=_REF, |
| 89 | ) |
| 90 | .on_conflict_do_update( |
| 91 | index_elements=["repo_id", "address"], |
| 92 | set_={ |
| 93 | "risk": risk, |
| 94 | "risk_score": risk_score, |
| 95 | "impact_score": impact_score, |
| 96 | "churn_score": churn_score, |
| 97 | }, |
| 98 | ) |
| 99 | ) |
| 100 | await session.execute(stmt) |
| 101 | await session.flush() |
| 102 | |
| 103 | |
| 104 | # --------------------------------------------------------------------------- |
| 105 | # Fixtures |
| 106 | # --------------------------------------------------------------------------- |
| 107 | |
| 108 | @pytest_asyncio.fixture |
| 109 | async def risk_repo(db_session: AsyncSession) -> MusehubRepo: |
| 110 | return await create_repo(db_session, owner=_OWNER, slug=_SLUG) |
| 111 | |
| 112 | |
| 113 | @pytest_asyncio.fixture |
| 114 | async def risk_repo_with_rows(db_session: AsyncSession, risk_repo: MusehubRepo) -> MusehubRepo: |
| 115 | repo_id = risk_repo.repo_id |
| 116 | await db_session.commit() |
| 117 | await _seed_risk(db_session, repo_id, address="pkg/a.py::critical_fn", |
| 118 | risk="critical", risk_score=90) |
| 119 | await _seed_risk(db_session, repo_id, address="pkg/b.py::high_fn", |
| 120 | risk="high", risk_score=60) |
| 121 | await _seed_risk(db_session, repo_id, address="pkg/c.py::medium_fn", |
| 122 | risk="medium", risk_score=40) |
| 123 | await _seed_risk(db_session, repo_id, address="pkg/d.py::low_fn", |
| 124 | risk="low", risk_score=10) |
| 125 | await db_session.commit() |
| 126 | return risk_repo |
| 127 | |
| 128 | |
| 129 | # --------------------------------------------------------------------------- |
| 130 | # Layer 1 — Route registration |
| 131 | # --------------------------------------------------------------------------- |
| 132 | |
| 133 | class TestBlastRiskRouteRegistration: |
| 134 | |
| 135 | def test_P2_01_blast_risk_route_registered(self) -> None: |
| 136 | from musehub.api.routes.musehub.ui_intel import router |
| 137 | paths = [r.path for r in router.routes] |
| 138 | assert any("blast-risk" in p for p in paths) |
| 139 | |
| 140 | |
| 141 | # --------------------------------------------------------------------------- |
| 142 | # Layer 2 — Basic 200 |
| 143 | # --------------------------------------------------------------------------- |
| 144 | |
| 145 | class TestBlastRiskBasic: |
| 146 | |
| 147 | @pytest.mark.asyncio |
| 148 | async def test_P2_02_get_returns_200( |
| 149 | self, client: AsyncClient, risk_repo: MusehubRepo |
| 150 | ) -> None: |
| 151 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") |
| 152 | assert resp.status_code == 200 |
| 153 | |
| 154 | @pytest.mark.asyncio |
| 155 | async def test_P2_15_content_type_html( |
| 156 | self, client: AsyncClient, risk_repo: MusehubRepo |
| 157 | ) -> None: |
| 158 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") |
| 159 | assert "text/html" in resp.headers["content-type"] |
| 160 | |
| 161 | |
| 162 | # --------------------------------------------------------------------------- |
| 163 | # Layer 3 — Not found |
| 164 | # --------------------------------------------------------------------------- |
| 165 | |
| 166 | class TestBlastRiskNotFound: |
| 167 | |
| 168 | @pytest.mark.asyncio |
| 169 | async def test_P2_03_unknown_repo_returns_404( |
| 170 | self, client: AsyncClient |
| 171 | ) -> None: |
| 172 | resp = await client.get("/nobody/norepo/intel/blast-risk") |
| 173 | assert resp.status_code == 404 |
| 174 | |
| 175 | |
| 176 | # --------------------------------------------------------------------------- |
| 177 | # Layer 4 — Empty state |
| 178 | # --------------------------------------------------------------------------- |
| 179 | |
| 180 | class TestBlastRiskEmptyState: |
| 181 | |
| 182 | @pytest.mark.asyncio |
| 183 | async def test_P2_04_empty_repo_shows_empty_state( |
| 184 | self, client: AsyncClient, risk_repo: MusehubRepo |
| 185 | ) -> None: |
| 186 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") |
| 187 | assert resp.status_code == 200 |
| 188 | # Must contain some empty-state marker — "no symbols" or "0" or similar |
| 189 | text = resp.text.lower() |
| 190 | assert "no " in text or "0 " in text or "empty" in text or "risk" in text |
| 191 | |
| 192 | |
| 193 | # --------------------------------------------------------------------------- |
| 194 | # Layer 5 — Rows rendered |
| 195 | # --------------------------------------------------------------------------- |
| 196 | |
| 197 | class TestBlastRiskRowsRendered: |
| 198 | |
| 199 | @pytest.mark.asyncio |
| 200 | async def test_P2_05_seeded_rows_appear_in_response( |
| 201 | self, client: AsyncClient, db_session: AsyncSession, risk_repo: MusehubRepo |
| 202 | ) -> None: |
| 203 | repo_id = risk_repo.repo_id |
| 204 | await db_session.commit() |
| 205 | await _seed_risk(db_session, repo_id, address="pkg/render.py::visible_fn", |
| 206 | risk="high", risk_score=65) |
| 207 | await db_session.commit() |
| 208 | |
| 209 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") |
| 210 | assert "visible_fn" in resp.text |
| 211 | |
| 212 | |
| 213 | # --------------------------------------------------------------------------- |
| 214 | # Layer 6 — Order by score DESC |
| 215 | # --------------------------------------------------------------------------- |
| 216 | |
| 217 | class TestBlastRiskOrdering: |
| 218 | |
| 219 | @pytest.mark.asyncio |
| 220 | async def test_P2_06_highest_score_appears_before_lower( |
| 221 | self, client: AsyncClient, risk_repo_with_rows: MusehubRepo |
| 222 | ) -> None: |
| 223 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") |
| 224 | text = resp.text |
| 225 | pos_critical = text.find("critical_fn") |
| 226 | pos_high = text.find("high_fn") |
| 227 | assert pos_critical < pos_high, "critical (score=90) must appear before high (score=60)" |
| 228 | |
| 229 | |
| 230 | # --------------------------------------------------------------------------- |
| 231 | # Layer 7 — Filter ?risk=critical |
| 232 | # --------------------------------------------------------------------------- |
| 233 | |
| 234 | class TestBlastRiskFilterCritical: |
| 235 | |
| 236 | @pytest.mark.asyncio |
| 237 | async def test_P2_07_filter_risk_critical_only( |
| 238 | self, client: AsyncClient, risk_repo_with_rows: MusehubRepo |
| 239 | ) -> None: |
| 240 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?risk=critical") |
| 241 | assert "critical_fn" in resp.text |
| 242 | assert "high_fn" not in resp.text |
| 243 | assert "medium_fn" not in resp.text |
| 244 | assert "low_fn" not in resp.text |
| 245 | |
| 246 | |
| 247 | # --------------------------------------------------------------------------- |
| 248 | # Layer 8 — Filter ?risk=high |
| 249 | # --------------------------------------------------------------------------- |
| 250 | |
| 251 | class TestBlastRiskFilterHigh: |
| 252 | |
| 253 | @pytest.mark.asyncio |
| 254 | async def test_P2_08_filter_risk_high_only( |
| 255 | self, client: AsyncClient, risk_repo_with_rows: MusehubRepo |
| 256 | ) -> None: |
| 257 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?risk=high") |
| 258 | assert "high_fn" in resp.text |
| 259 | assert "critical_fn" not in resp.text |
| 260 | assert "medium_fn" not in resp.text |
| 261 | assert "low_fn" not in resp.text |
| 262 | |
| 263 | |
| 264 | # --------------------------------------------------------------------------- |
| 265 | # Layer 9 — Invalid risk param ignored |
| 266 | # --------------------------------------------------------------------------- |
| 267 | |
| 268 | class TestBlastRiskInvalidFilter: |
| 269 | |
| 270 | @pytest.mark.asyncio |
| 271 | async def test_P2_09_invalid_risk_param_returns_all_rows( |
| 272 | self, client: AsyncClient, risk_repo_with_rows: MusehubRepo |
| 273 | ) -> None: |
| 274 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?risk=bogus") |
| 275 | assert resp.status_code == 200 |
| 276 | assert "critical_fn" in resp.text |
| 277 | assert "high_fn" in resp.text |
| 278 | |
| 279 | |
| 280 | # --------------------------------------------------------------------------- |
| 281 | # Layer 10 — top=25 limit |
| 282 | # --------------------------------------------------------------------------- |
| 283 | |
| 284 | class TestBlastRiskTopParam: |
| 285 | |
| 286 | @pytest.mark.asyncio |
| 287 | async def test_P2_10_top_25_limits_rows( |
| 288 | self, client: AsyncClient, db_session: AsyncSession, risk_repo: MusehubRepo |
| 289 | ) -> None: |
| 290 | repo_id = risk_repo.repo_id |
| 291 | await db_session.commit() |
| 292 | for i in range(30): |
| 293 | await _seed_risk( |
| 294 | db_session, repo_id, |
| 295 | address=f"pkg/top_{i:03d}.py::fn_{i}", |
| 296 | risk="high", risk_score=50 + i, |
| 297 | ) |
| 298 | await db_session.commit() |
| 299 | |
| 300 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=25") |
| 301 | # fn_29 (highest score) should appear; fn_00 (lowest score) should not |
| 302 | assert "fn_29" in resp.text |
| 303 | assert "fn_00" not in resp.text |
| 304 | |
| 305 | @pytest.mark.asyncio |
| 306 | async def test_P2_11_invalid_top_clamped_to_default( |
| 307 | self, client: AsyncClient, risk_repo: MusehubRepo |
| 308 | ) -> None: |
| 309 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=9999") |
| 310 | assert resp.status_code == 200 |
| 311 | |
| 312 | @pytest.mark.asyncio |
| 313 | async def test_P2_19_top_100_accepted( |
| 314 | self, client: AsyncClient, risk_repo: MusehubRepo |
| 315 | ) -> None: |
| 316 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=100") |
| 317 | assert resp.status_code == 200 |
| 318 | |
| 319 | @pytest.mark.asyncio |
| 320 | async def test_P2_20_top_250_accepted( |
| 321 | self, client: AsyncClient, risk_repo: MusehubRepo |
| 322 | ) -> None: |
| 323 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=250") |
| 324 | assert resp.status_code == 200 |
| 325 | |
| 326 | |
| 327 | # --------------------------------------------------------------------------- |
| 328 | # Layer 12–14 — Stats in page |
| 329 | # --------------------------------------------------------------------------- |
| 330 | |
| 331 | class TestBlastRiskStats: |
| 332 | |
| 333 | @pytest.mark.asyncio |
| 334 | async def test_P2_12_critical_count_in_page( |
| 335 | self, client: AsyncClient, risk_repo_with_rows: MusehubRepo |
| 336 | ) -> None: |
| 337 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") |
| 338 | # 1 critical row seeded; "1" must appear somewhere in the stat area |
| 339 | assert "1" in resp.text |
| 340 | |
| 341 | @pytest.mark.asyncio |
| 342 | async def test_P2_13_high_count_in_page( |
| 343 | self, client: AsyncClient, risk_repo_with_rows: MusehubRepo |
| 344 | ) -> None: |
| 345 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") |
| 346 | assert "1" in resp.text # 1 high row |
| 347 | |
| 348 | @pytest.mark.asyncio |
| 349 | async def test_P2_14_total_count_in_page( |
| 350 | self, client: AsyncClient, risk_repo_with_rows: MusehubRepo |
| 351 | ) -> None: |
| 352 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") |
| 353 | assert "4" in resp.text # 4 total rows |
| 354 | |
| 355 | |
| 356 | # --------------------------------------------------------------------------- |
| 357 | # Layer 16 — No subprocess |
| 358 | # --------------------------------------------------------------------------- |
| 359 | |
| 360 | class TestBlastRiskNoSubprocess: |
| 361 | |
| 362 | @pytest.mark.asyncio |
| 363 | async def test_P2_16_no_subprocess_called( |
| 364 | self, client: AsyncClient, risk_repo: MusehubRepo |
| 365 | ) -> None: |
| 366 | with patch("asyncio.create_subprocess_exec") as mock_proc: |
| 367 | await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") |
| 368 | mock_proc.assert_not_called() |
| 369 | |
| 370 | |
| 371 | # --------------------------------------------------------------------------- |
| 372 | # Layer 17 — All risk tiers render |
| 373 | # --------------------------------------------------------------------------- |
| 374 | |
| 375 | class TestBlastRiskAllTiers: |
| 376 | |
| 377 | @pytest.mark.asyncio |
| 378 | async def test_P2_17_all_four_tiers_rendered( |
| 379 | self, client: AsyncClient, risk_repo_with_rows: MusehubRepo |
| 380 | ) -> None: |
| 381 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") |
| 382 | text = resp.text |
| 383 | assert "critical_fn" in text |
| 384 | assert "high_fn" in text |
| 385 | assert "medium_fn" in text |
| 386 | assert "low_fn" in text |
| 387 | |
| 388 | |
| 389 | # --------------------------------------------------------------------------- |
| 390 | # Layer 18 — Multi-repo isolation |
| 391 | # --------------------------------------------------------------------------- |
| 392 | |
| 393 | class TestBlastRiskIsolation: |
| 394 | |
| 395 | @pytest.mark.asyncio |
| 396 | async def test_P2_18_other_repo_rows_absent( |
| 397 | self, client: AsyncClient, db_session: AsyncSession, risk_repo: MusehubRepo |
| 398 | ) -> None: |
| 399 | other_repo = await create_repo(db_session, owner="other", slug="otherrepo") |
| 400 | other_id = other_repo.repo_id |
| 401 | await db_session.commit() |
| 402 | await _seed_risk(db_session, other_id, address="pkg/spy.py::spy_fn", |
| 403 | risk="critical", risk_score=95) |
| 404 | await db_session.commit() |
| 405 | |
| 406 | resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk") |
| 407 | assert "spy_fn" not in resp.text |
File History
2 commits
sha256:4992098130166d191cefed0a2821d19cd3cdd3cf50867a4e715c2b30636826c7
fix: repair syntax errors from typing annotation cleanup
Sonnet 4.6
20 days ago
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠
20 days ago