gabriel / musehub public
test_phase2_blast_risk_route.py python
407 lines 14.5 KB
Raw
sha256:4992098130166d191cefed0a2821d19cd3cdd3cf50867a4e715c2b30636826c7 fix: repair syntax errors from typing annotation cleanup Sonnet 4.6 20 days ago
1 """TDD spec for Phase 2 — /intel/blast-risk route (issue #11).
2
3 Route:
4 GET /{owner}/{repo_slug}/intel/blast-risk
5
6 Query params:
7 risk (optional) — filter to one tier: critical | high | medium | low
8 top (optional, default 50) — row limit: 25 | 50 | 100 | 250
9
10 Handler queries musehub_intel_blast_risk directly, ordered by risk_score DESC.
11
12 Template: musehub/templates/musehub/pages/intel_blast_risk.html
13
14 Layers:
15 1. Route registered — "intel/blast-risk" in ui_intel router
16 2. Basic 200 — GET returns 200 for a known repo
17 3. Not found — unknown repo → 404
18 4. Empty state — zero rows → 200 with empty-state marker
19 5. Rows rendered — seeded rows appear in response text
20 6. Order by score — highest risk_score appears before lower ones
21 7. Filter ?risk=critical — only critical rows returned
22 8. Filter ?risk=high — only high rows returned
23 9. Invalid risk param — unknown tier ignored, all rows returned
24 10. top=25 limit — response shows at most 25 rows
25 11. top clamped — invalid top value clamped to default (50)
26 12. Stats: critical count in page
27 13. Stats: high count in page
28 14. Stats: total count in page
29 15. MIME type — Content-Type: text/html
30 16. SQL-derived — no subprocess called during handler
31 17. Score ordering — critical > high > medium > low all render
32 18. Multi-repo isolation — rows from other repo absent
33 19. top=100 — accepted without error
34 20. top=250 — accepted without error
35 """
36 from __future__ import annotations
37
38 import secrets
39 from unittest.mock import patch
40
41 import pytest
42 import pytest_asyncio
43 from httpx import AsyncClient
44 from sqlalchemy import select
45 from sqlalchemy.dialects.postgresql import insert as pg_insert
46 from sqlalchemy.ext.asyncio import AsyncSession
47
48 from muse.core.types import fake_id, long_id
49 from musehub.db.musehub_intel_models import MusehubIntelBlastRisk
50 from musehub.db.musehub_repo_models import MusehubRepo
51 from tests.factories import create_repo
52
53
54 def _uid() -> str:
55 return fake_id(secrets.token_hex(16))
56
57
58 _OWNER = "testuser"
59 _SLUG = "blastriskripo"
60 _REF = long_id("c" * 64)
61
62
63 async def _seed_risk(
64 session: AsyncSession,
65 repo_id: str,
66 *,
67 address: str,
68 kind: str = "function",
69 risk: str = "high",
70 risk_score: int = 60,
71 impact_score: float = 0.5,
72 churn_score: float = 0.5,
73 test_gap_score: float = 1.0,
74 coupling_score: float = 0.3,
75 ) -> None:
76 stmt = (
77 pg_insert(MusehubIntelBlastRisk)
78 .values(
79 repo_id=repo_id,
80 address=address,
81 kind=kind,
82 risk=risk,
83 risk_score=risk_score,
84 impact_score=impact_score,
85 churn_score=churn_score,
86 test_gap_score=test_gap_score,
87 coupling_score=coupling_score,
88 ref=_REF,
89 )
90 .on_conflict_do_update(
91 index_elements=["repo_id", "address"],
92 set_={
93 "risk": risk,
94 "risk_score": risk_score,
95 "impact_score": impact_score,
96 "churn_score": churn_score,
97 },
98 )
99 )
100 await session.execute(stmt)
101 await session.flush()
102
103
104 # ---------------------------------------------------------------------------
105 # Fixtures
106 # ---------------------------------------------------------------------------
107
108 @pytest_asyncio.fixture
109 async def risk_repo(db_session: AsyncSession) -> MusehubRepo:
110 return await create_repo(db_session, owner=_OWNER, slug=_SLUG)
111
112
113 @pytest_asyncio.fixture
114 async def risk_repo_with_rows(db_session: AsyncSession, risk_repo: MusehubRepo) -> MusehubRepo:
115 repo_id = risk_repo.repo_id
116 await db_session.commit()
117 await _seed_risk(db_session, repo_id, address="pkg/a.py::critical_fn",
118 risk="critical", risk_score=90)
119 await _seed_risk(db_session, repo_id, address="pkg/b.py::high_fn",
120 risk="high", risk_score=60)
121 await _seed_risk(db_session, repo_id, address="pkg/c.py::medium_fn",
122 risk="medium", risk_score=40)
123 await _seed_risk(db_session, repo_id, address="pkg/d.py::low_fn",
124 risk="low", risk_score=10)
125 await db_session.commit()
126 return risk_repo
127
128
129 # ---------------------------------------------------------------------------
130 # Layer 1 — Route registration
131 # ---------------------------------------------------------------------------
132
133 class TestBlastRiskRouteRegistration:
134
135 def test_P2_01_blast_risk_route_registered(self) -> None:
136 from musehub.api.routes.musehub.ui_intel import router
137 paths = [r.path for r in router.routes]
138 assert any("blast-risk" in p for p in paths)
139
140
141 # ---------------------------------------------------------------------------
142 # Layer 2 — Basic 200
143 # ---------------------------------------------------------------------------
144
145 class TestBlastRiskBasic:
146
147 @pytest.mark.asyncio
148 async def test_P2_02_get_returns_200(
149 self, client: AsyncClient, risk_repo: MusehubRepo
150 ) -> None:
151 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
152 assert resp.status_code == 200
153
154 @pytest.mark.asyncio
155 async def test_P2_15_content_type_html(
156 self, client: AsyncClient, risk_repo: MusehubRepo
157 ) -> None:
158 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
159 assert "text/html" in resp.headers["content-type"]
160
161
162 # ---------------------------------------------------------------------------
163 # Layer 3 — Not found
164 # ---------------------------------------------------------------------------
165
166 class TestBlastRiskNotFound:
167
168 @pytest.mark.asyncio
169 async def test_P2_03_unknown_repo_returns_404(
170 self, client: AsyncClient
171 ) -> None:
172 resp = await client.get("/nobody/norepo/intel/blast-risk")
173 assert resp.status_code == 404
174
175
176 # ---------------------------------------------------------------------------
177 # Layer 4 — Empty state
178 # ---------------------------------------------------------------------------
179
180 class TestBlastRiskEmptyState:
181
182 @pytest.mark.asyncio
183 async def test_P2_04_empty_repo_shows_empty_state(
184 self, client: AsyncClient, risk_repo: MusehubRepo
185 ) -> None:
186 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
187 assert resp.status_code == 200
188 # Must contain some empty-state marker — "no symbols" or "0" or similar
189 text = resp.text.lower()
190 assert "no " in text or "0 " in text or "empty" in text or "risk" in text
191
192
193 # ---------------------------------------------------------------------------
194 # Layer 5 — Rows rendered
195 # ---------------------------------------------------------------------------
196
197 class TestBlastRiskRowsRendered:
198
199 @pytest.mark.asyncio
200 async def test_P2_05_seeded_rows_appear_in_response(
201 self, client: AsyncClient, db_session: AsyncSession, risk_repo: MusehubRepo
202 ) -> None:
203 repo_id = risk_repo.repo_id
204 await db_session.commit()
205 await _seed_risk(db_session, repo_id, address="pkg/render.py::visible_fn",
206 risk="high", risk_score=65)
207 await db_session.commit()
208
209 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
210 assert "visible_fn" in resp.text
211
212
213 # ---------------------------------------------------------------------------
214 # Layer 6 — Order by score DESC
215 # ---------------------------------------------------------------------------
216
217 class TestBlastRiskOrdering:
218
219 @pytest.mark.asyncio
220 async def test_P2_06_highest_score_appears_before_lower(
221 self, client: AsyncClient, risk_repo_with_rows: MusehubRepo
222 ) -> None:
223 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
224 text = resp.text
225 pos_critical = text.find("critical_fn")
226 pos_high = text.find("high_fn")
227 assert pos_critical < pos_high, "critical (score=90) must appear before high (score=60)"
228
229
230 # ---------------------------------------------------------------------------
231 # Layer 7 — Filter ?risk=critical
232 # ---------------------------------------------------------------------------
233
234 class TestBlastRiskFilterCritical:
235
236 @pytest.mark.asyncio
237 async def test_P2_07_filter_risk_critical_only(
238 self, client: AsyncClient, risk_repo_with_rows: MusehubRepo
239 ) -> None:
240 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?risk=critical")
241 assert "critical_fn" in resp.text
242 assert "high_fn" not in resp.text
243 assert "medium_fn" not in resp.text
244 assert "low_fn" not in resp.text
245
246
247 # ---------------------------------------------------------------------------
248 # Layer 8 — Filter ?risk=high
249 # ---------------------------------------------------------------------------
250
251 class TestBlastRiskFilterHigh:
252
253 @pytest.mark.asyncio
254 async def test_P2_08_filter_risk_high_only(
255 self, client: AsyncClient, risk_repo_with_rows: MusehubRepo
256 ) -> None:
257 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?risk=high")
258 assert "high_fn" in resp.text
259 assert "critical_fn" not in resp.text
260 assert "medium_fn" not in resp.text
261 assert "low_fn" not in resp.text
262
263
264 # ---------------------------------------------------------------------------
265 # Layer 9 — Invalid risk param ignored
266 # ---------------------------------------------------------------------------
267
268 class TestBlastRiskInvalidFilter:
269
270 @pytest.mark.asyncio
271 async def test_P2_09_invalid_risk_param_returns_all_rows(
272 self, client: AsyncClient, risk_repo_with_rows: MusehubRepo
273 ) -> None:
274 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?risk=bogus")
275 assert resp.status_code == 200
276 assert "critical_fn" in resp.text
277 assert "high_fn" in resp.text
278
279
280 # ---------------------------------------------------------------------------
281 # Layer 10 — top=25 limit
282 # ---------------------------------------------------------------------------
283
284 class TestBlastRiskTopParam:
285
286 @pytest.mark.asyncio
287 async def test_P2_10_top_25_limits_rows(
288 self, client: AsyncClient, db_session: AsyncSession, risk_repo: MusehubRepo
289 ) -> None:
290 repo_id = risk_repo.repo_id
291 await db_session.commit()
292 for i in range(30):
293 await _seed_risk(
294 db_session, repo_id,
295 address=f"pkg/top_{i:03d}.py::fn_{i}",
296 risk="high", risk_score=50 + i,
297 )
298 await db_session.commit()
299
300 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=25")
301 # fn_29 (highest score) should appear; fn_00 (lowest score) should not
302 assert "fn_29" in resp.text
303 assert "fn_00" not in resp.text
304
305 @pytest.mark.asyncio
306 async def test_P2_11_invalid_top_clamped_to_default(
307 self, client: AsyncClient, risk_repo: MusehubRepo
308 ) -> None:
309 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=9999")
310 assert resp.status_code == 200
311
312 @pytest.mark.asyncio
313 async def test_P2_19_top_100_accepted(
314 self, client: AsyncClient, risk_repo: MusehubRepo
315 ) -> None:
316 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=100")
317 assert resp.status_code == 200
318
319 @pytest.mark.asyncio
320 async def test_P2_20_top_250_accepted(
321 self, client: AsyncClient, risk_repo: MusehubRepo
322 ) -> None:
323 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=250")
324 assert resp.status_code == 200
325
326
327 # ---------------------------------------------------------------------------
328 # Layer 12–14 — Stats in page
329 # ---------------------------------------------------------------------------
330
331 class TestBlastRiskStats:
332
333 @pytest.mark.asyncio
334 async def test_P2_12_critical_count_in_page(
335 self, client: AsyncClient, risk_repo_with_rows: MusehubRepo
336 ) -> None:
337 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
338 # 1 critical row seeded; "1" must appear somewhere in the stat area
339 assert "1" in resp.text
340
341 @pytest.mark.asyncio
342 async def test_P2_13_high_count_in_page(
343 self, client: AsyncClient, risk_repo_with_rows: MusehubRepo
344 ) -> None:
345 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
346 assert "1" in resp.text # 1 high row
347
348 @pytest.mark.asyncio
349 async def test_P2_14_total_count_in_page(
350 self, client: AsyncClient, risk_repo_with_rows: MusehubRepo
351 ) -> None:
352 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
353 assert "4" in resp.text # 4 total rows
354
355
356 # ---------------------------------------------------------------------------
357 # Layer 16 — No subprocess
358 # ---------------------------------------------------------------------------
359
360 class TestBlastRiskNoSubprocess:
361
362 @pytest.mark.asyncio
363 async def test_P2_16_no_subprocess_called(
364 self, client: AsyncClient, risk_repo: MusehubRepo
365 ) -> None:
366 with patch("asyncio.create_subprocess_exec") as mock_proc:
367 await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
368 mock_proc.assert_not_called()
369
370
371 # ---------------------------------------------------------------------------
372 # Layer 17 — All risk tiers render
373 # ---------------------------------------------------------------------------
374
375 class TestBlastRiskAllTiers:
376
377 @pytest.mark.asyncio
378 async def test_P2_17_all_four_tiers_rendered(
379 self, client: AsyncClient, risk_repo_with_rows: MusehubRepo
380 ) -> None:
381 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
382 text = resp.text
383 assert "critical_fn" in text
384 assert "high_fn" in text
385 assert "medium_fn" in text
386 assert "low_fn" in text
387
388
389 # ---------------------------------------------------------------------------
390 # Layer 18 — Multi-repo isolation
391 # ---------------------------------------------------------------------------
392
393 class TestBlastRiskIsolation:
394
395 @pytest.mark.asyncio
396 async def test_P2_18_other_repo_rows_absent(
397 self, client: AsyncClient, db_session: AsyncSession, risk_repo: MusehubRepo
398 ) -> None:
399 other_repo = await create_repo(db_session, owner="other", slug="otherrepo")
400 other_id = other_repo.repo_id
401 await db_session.commit()
402 await _seed_risk(db_session, other_id, address="pkg/spy.py::spy_fn",
403 risk="critical", risk_score=95)
404 await db_session.commit()
405
406 resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
407 assert "spy_fn" not in resp.text
File History 2 commits
sha256:4992098130166d191cefed0a2821d19cd3cdd3cf50867a4e715c2b30636826c7 fix: repair syntax errors from typing annotation cleanup Sonnet 4.6 20 days ago
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 20 days ago