gabriel / musehub public
test_phase2_blast_risk_route.py python
406 lines 14.1 KB
Raw
sha256:4992098130166d191cefed0a2821d19cd3cdd3cf50867a4e715c2b30636826c7 fix: repair syntax errors from typing annotation cleanup Sonnet 4.6 21 days ago
1 """TDD spec for Phase 2 — /intel/blast-risk route (issue #11).
2
3 Route:
4 GET /{owner}/{repo_slug}/intel/blast-risk
5
6 Query params:
7 risk (optional) — filter to one tier: critical | high | medium | low
8 top (optional, default 50) — row limit: 25 | 50 | 100 | 250
9
10 Handler queries musehub_intel_blast_risk directly, ordered by risk_score DESC.
11
12 Template: musehub/templates/musehub/pages/intel_blast_risk.html
13
14 Layers:
15 1. Route registered — "intel/blast-risk" in ui_intel router
16 2. Basic 200 — GET returns 200 for a known repo
17 3. Not found — unknown repo → 404
18 4. Empty state — zero rows → 200 with empty-state marker
19 5. Rows rendered — seeded rows appear in response text
20 6. Order by score — highest risk_score appears before lower ones
21 7. Filter ?risk=critical — only critical rows returned
22 8. Filter ?risk=high — only high rows returned
23 9. Invalid risk param — unknown tier ignored, all rows returned
24 10. top=25 limit — response shows at most 25 rows
25 11. top clamped — invalid top value clamped to default (50)
26 12. Stats: critical count in page
27 13. Stats: high count in page
28 14. Stats: total count in page
29 15. MIME type — Content-Type: text/html
30 16. SQL-derived — no subprocess called during handler
31 17. Score ordering — critical > high > medium > low all render
32 18. Multi-repo isolation — rows from other repo absent
33 19. top=100 — accepted without error
34 20. top=250 — accepted without error
35 """
36 from __future__ import annotations
37
38 import secrets
39 from unittest.mock import patch
40
41 import pytest
42 import pytest_asyncio
43 from httpx import AsyncClient
44 from sqlalchemy import select
45 from sqlalchemy.dialects.postgresql import insert as pg_insert
46 from sqlalchemy.ext.asyncio import AsyncSession
47
48 from muse.core.types import fake_id, long_id
49 from musehub.db.musehub_intel_models import MusehubIntelBlastRisk
50 from tests.factories import create_repo
51
52
def _uid() -> str:
    """Return ``fake_id`` applied to a fresh 32-character random hex token."""
    token = secrets.token_hex(16)
    return fake_id(token)
55
56
# Owner and slug of the repo created by the ``risk_repo`` fixture and used in
# the request URLs below.
_OWNER: str = "testuser"
# NOTE(review): "ripo" looks like a typo for "repo"; the value appears to be
# used only as an opaque slug, so it is left unchanged.
_SLUG: str = "blastriskripo"
# Ref value written to every row seeded by ``_seed_risk``.
_REF = long_id("c" * 64)
60
61
async def _seed_risk(
    session: AsyncSession,
    repo_id: str,
    *,
    address: str,
    kind: str = "function",
    risk: str = "high",
    risk_score: int = 60,
    impact_score: float = 0.5,
    churn_score: float = 0.5,
    test_gap_score: float = 1.0,
    coupling_score: float = 0.3,
) -> None:
    """Upsert one ``MusehubIntelBlastRisk`` row and flush it to the database.

    The row is keyed on ``(repo_id, address)``.  When a row with that key
    already exists, every non-key column is overwritten with the values passed
    to this call, so re-seeding the same address never leaves stale data
    behind.  The session is flushed but not committed; callers commit.

    Args:
        session: Async session used to execute the statement.
        repo_id: Repo the row belongs to.
        address: Symbol address, e.g. ``"pkg/a.py::critical_fn"``.
        kind: Symbol kind stored on the row.
        risk: Risk tier label stored on the row.
        risk_score: Integer score stored on the row.
        impact_score: Impact component stored on the row.
        churn_score: Churn component stored on the row.
        test_gap_score: Test-gap component stored on the row.
        coupling_score: Coupling component stored on the row.
    """
    # Single source of truth for the non-key columns: used both for the INSERT
    # values and for the ON CONFLICT update, so the two can never drift apart.
    payload = {
        "kind": kind,
        "risk": risk,
        "risk_score": risk_score,
        "impact_score": impact_score,
        "churn_score": churn_score,
        "test_gap_score": test_gap_score,
        "coupling_score": coupling_score,
        "ref": _REF,
    }
    stmt = (
        pg_insert(MusehubIntelBlastRisk)
        .values(repo_id=repo_id, address=address, **payload)
        .on_conflict_do_update(
            index_elements=["repo_id", "address"],
            set_=payload,
        )
    )
    await session.execute(stmt)
    await session.flush()
101
102
103 # ---------------------------------------------------------------------------
104 # Fixtures
105 # ---------------------------------------------------------------------------
106
@pytest_asyncio.fixture
async def risk_repo(db_session: AsyncSession):
    """Create and return the repo addressed as ``/{_OWNER}/{_SLUG}``."""
    repo = await create_repo(db_session, owner=_OWNER, slug=_SLUG)
    return repo
110
111
@pytest_asyncio.fixture
async def risk_repo_with_rows(db_session: AsyncSession, risk_repo):
    """Return ``risk_repo`` after seeding one committed row per risk tier.

    Seeded rows (address, tier, score):
    critical_fn/critical/90, high_fn/high/60, medium_fn/medium/40,
    low_fn/low/10.
    """
    # Read the id before the first commit, as the other tests in this module do
    # (presumably to avoid touching an expired ORM attribute afterwards).
    repo_id = risk_repo.repo_id
    await db_session.commit()

    seed_rows = (
        ("pkg/a.py::critical_fn", "critical", 90),
        ("pkg/b.py::high_fn", "high", 60),
        ("pkg/c.py::medium_fn", "medium", 40),
        ("pkg/d.py::low_fn", "low", 10),
    )
    for address, tier, score in seed_rows:
        await _seed_risk(
            db_session,
            repo_id,
            address=address,
            risk=tier,
            risk_score=score,
        )

    await db_session.commit()
    return risk_repo
126
127
128 # ---------------------------------------------------------------------------
129 # Layer 1 — Route registration
130 # ---------------------------------------------------------------------------
131
class TestBlastRiskRouteRegistration:
    """Layer 1 — the blast-risk route exists on the ``ui_intel`` router."""

    def test_P2_01_blast_risk_route_registered(self) -> None:
        """At least one registered route path contains ``blast-risk``."""
        from musehub.api.routes.musehub.ui_intel import router

        matching = [
            route.path for route in router.routes if "blast-risk" in route.path
        ]
        assert matching
138
139
140 # ---------------------------------------------------------------------------
141 # Layer 2 — Basic 200
142 # ---------------------------------------------------------------------------
143
class TestBlastRiskBasic:
    """Layer 2 / 15 — a known repo answers 200 with an HTML content type."""

    @pytest.mark.asyncio
    async def test_P2_02_get_returns_200(
        self, client: AsyncClient, risk_repo
    ) -> None:
        """GET on the blast-risk page of an existing repo returns 200."""
        response = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
        status = response.status_code
        assert status == 200

    @pytest.mark.asyncio
    async def test_P2_15_content_type_html(
        self, client: AsyncClient, risk_repo
    ) -> None:
        """The response declares a ``text/html`` content type."""
        response = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
        content_type = response.headers["content-type"]
        assert "text/html" in content_type
159
160
161 # ---------------------------------------------------------------------------
162 # Layer 3 — Not found
163 # ---------------------------------------------------------------------------
164
class TestBlastRiskNotFound:
    """Layer 3 — a repo that was never created yields 404."""

    @pytest.mark.asyncio
    async def test_P2_03_unknown_repo_returns_404(
        self, client: AsyncClient
    ) -> None:
        """No repo fixture is requested, so ``nobody/norepo`` does not exist."""
        response = await client.get("/nobody/norepo/intel/blast-risk")
        status = response.status_code
        assert status == 404
173
174
175 # ---------------------------------------------------------------------------
176 # Layer 4 — Empty state
177 # ---------------------------------------------------------------------------
178
class TestBlastRiskEmptyState:
    """Layer 4 — a repo with no blast-risk rows still renders a page."""

    @pytest.mark.asyncio
    async def test_P2_04_empty_repo_shows_empty_state(
        self, client: AsyncClient, risk_repo
    ) -> None:
        """The page returns 200 and contains one of the accepted markers."""
        response = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
        assert response.status_code == 200

        # Must contain some empty-state marker — "no symbols" or "0" or similar.
        # NOTE(review): the "risk" alternative matches any page that merely
        # mentions the word, so this check is likely to pass even without an
        # empty-state marker; tighten once the template's marker is settled.
        lowered = response.text.lower()
        accepted_markers = ("no ", "0 ", "empty", "risk")
        assert any(marker in lowered for marker in accepted_markers)
190
191
192 # ---------------------------------------------------------------------------
193 # Layer 5 — Rows rendered
194 # ---------------------------------------------------------------------------
195
class TestBlastRiskRowsRendered:
    """Layer 5 — a seeded row shows up in the rendered page."""

    @pytest.mark.asyncio
    async def test_P2_05_seeded_rows_appear_in_response(
        self, client: AsyncClient, db_session: AsyncSession, risk_repo
    ) -> None:
        """Seed one row, commit it, and expect its symbol name in the body."""
        repo_id = risk_repo.repo_id
        await db_session.commit()
        await _seed_risk(
            db_session,
            repo_id,
            address="pkg/render.py::visible_fn",
            risk="high",
            risk_score=65,
        )
        await db_session.commit()

        response = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
        body = response.text
        assert "visible_fn" in body
210
211
212 # ---------------------------------------------------------------------------
213 # Layer 6 — Order by score DESC
214 # ---------------------------------------------------------------------------
215
class TestBlastRiskOrdering:
    """Layer 6 — rows are rendered in descending ``risk_score`` order."""

    @pytest.mark.asyncio
    async def test_P2_06_highest_score_appears_before_lower(
        self, client: AsyncClient, risk_repo_with_rows
    ) -> None:
        """The score-90 row is rendered before the score-60 row.

        Both rows must be present first: ``str.find`` returns -1 for a missing
        substring, which would make a bare ``<`` comparison pass even when the
        critical row is not rendered at all.
        """
        resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
        text = resp.text

        assert "critical_fn" in text, "critical row must be rendered"
        assert "high_fn" in text, "high row must be rendered"

        # Compare the first occurrence of each symbol name in the page.
        pos_critical = text.index("critical_fn")
        pos_high = text.index("high_fn")
        assert pos_critical < pos_high, "critical (score=90) must appear before high (score=60)"
227
228
229 # ---------------------------------------------------------------------------
230 # Layer 7 — Filter ?risk=critical
231 # ---------------------------------------------------------------------------
232
class TestBlastRiskFilterCritical:
    """Layer 7 — ``?risk=critical`` keeps only the critical tier."""

    @pytest.mark.asyncio
    async def test_P2_07_filter_risk_critical_only(
        self, client: AsyncClient, risk_repo_with_rows
    ) -> None:
        """Only the critical row is rendered; the other three tiers are absent."""
        response = await client.get(
            f"/{_OWNER}/{_SLUG}/intel/blast-risk?risk=critical"
        )
        body = response.text

        assert "critical_fn" in body
        for excluded in ("high_fn", "medium_fn", "low_fn"):
            assert excluded not in body
244
245
246 # ---------------------------------------------------------------------------
247 # Layer 8 — Filter ?risk=high
248 # ---------------------------------------------------------------------------
249
class TestBlastRiskFilterHigh:
    """Layer 8 — ``?risk=high`` keeps only the high tier."""

    @pytest.mark.asyncio
    async def test_P2_08_filter_risk_high_only(
        self, client: AsyncClient, risk_repo_with_rows
    ) -> None:
        """Only the high row is rendered; the other three tiers are absent."""
        response = await client.get(
            f"/{_OWNER}/{_SLUG}/intel/blast-risk?risk=high"
        )
        body = response.text

        assert "high_fn" in body
        for excluded in ("critical_fn", "medium_fn", "low_fn"):
            assert excluded not in body
261
262
263 # ---------------------------------------------------------------------------
264 # Layer 9 — Invalid risk param ignored
265 # ---------------------------------------------------------------------------
266
class TestBlastRiskInvalidFilter:
    """Layer 9 — an unknown ``risk`` value is ignored rather than rejected."""

    @pytest.mark.asyncio
    async def test_P2_09_invalid_risk_param_returns_all_rows(
        self, client: AsyncClient, risk_repo_with_rows
    ) -> None:
        """``?risk=bogus`` still returns 200 and renders unfiltered rows."""
        response = await client.get(
            f"/{_OWNER}/{_SLUG}/intel/blast-risk?risk=bogus"
        )
        assert response.status_code == 200

        body = response.text
        for expected in ("critical_fn", "high_fn"):
            assert expected in body
277
278
279 # ---------------------------------------------------------------------------
280 # Layer 10 — top=25 limit
281 # ---------------------------------------------------------------------------
282
class TestBlastRiskTopParam:
    """Layers 10, 11, 19, 20 — behaviour of the ``top`` row-limit parameter."""

    @pytest.mark.asyncio
    async def test_P2_10_top_25_limits_rows(
        self, client: AsyncClient, db_session: AsyncSession, risk_repo
    ) -> None:
        """With 30 rows seeded, ``?top=25`` renders only the 25 highest scores.

        Symbol names are zero-padded (``fn_000`` … ``fn_029``) so each name is
        unique and none is a substring of another; un-padded names would make
        checks such as ``"fn_0" not in text`` ambiguous or vacuous.
        """
        repo_id = risk_repo.repo_id
        await db_session.commit()
        for i in range(30):
            await _seed_risk(
                db_session, repo_id,
                address=f"pkg/top_{i:03d}.py::fn_{i:03d}",
                risk="high", risk_score=50 + i,
            )
        await db_session.commit()

        resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=25")
        assert resp.status_code == 200

        # Scores run 50..79, so the top 25 are fn_005..fn_029 and the five
        # lowest (fn_000..fn_004) must be cut off.
        assert "fn_029" in resp.text
        assert "fn_005" in resp.text
        assert "fn_004" not in resp.text
        assert "fn_000" not in resp.text

    @pytest.mark.asyncio
    async def test_P2_11_invalid_top_clamped_to_default(
        self, client: AsyncClient, db_session: AsyncSession, risk_repo
    ) -> None:
        """An unsupported ``top`` value falls back to the default limit of 50.

        55 rows are seeded so the default limit is observable: the 50 highest
        scores are rendered and the 5 lowest are not.
        """
        repo_id = risk_repo.repo_id
        await db_session.commit()
        for i in range(55):
            await _seed_risk(
                db_session, repo_id,
                address=f"pkg/clamp_{i:03d}.py::clamp_{i:03d}",
                risk_score=10 + i,
            )
        await db_session.commit()

        resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=9999")
        assert resp.status_code == 200

        # Scores run 10..64; a limit of 50 keeps clamp_005..clamp_054.
        assert "clamp_054" in resp.text
        assert "clamp_005" in resp.text
        assert "clamp_004" not in resp.text

    @pytest.mark.asyncio
    async def test_P2_19_top_100_accepted(
        self, client: AsyncClient, risk_repo
    ) -> None:
        """``?top=100`` is accepted and returns 200."""
        resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=100")
        assert resp.status_code == 200

    @pytest.mark.asyncio
    async def test_P2_20_top_250_accepted(
        self, client: AsyncClient, risk_repo
    ) -> None:
        """``?top=250`` is accepted and returns 200."""
        resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk?top=250")
        assert resp.status_code == 200
324
325
326 # ---------------------------------------------------------------------------
327 # Layer 12–14 — Stats in page
328 # ---------------------------------------------------------------------------
329
class TestBlastRiskStats:
    """Layers 12–14 — tier and total counts appear somewhere in the page.

    NOTE(review): each check is a bare-digit substring match against the whole
    response, so any "1" or "4" anywhere in the body satisfies it.  Consider
    asserting against the stat markup once the template is fixed.
    """

    @pytest.mark.asyncio
    async def test_P2_12_critical_count_in_page(
        self, client: AsyncClient, risk_repo_with_rows
    ) -> None:
        """One critical row is seeded, so "1" is expected in the page."""
        response = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
        page = response.text
        # 1 critical row seeded; "1" must appear somewhere in the stat area
        assert "1" in page

    @pytest.mark.asyncio
    async def test_P2_13_high_count_in_page(
        self, client: AsyncClient, risk_repo_with_rows
    ) -> None:
        """One high row is seeded, so "1" is expected in the page."""
        response = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
        page = response.text
        assert "1" in page  # 1 high row

    @pytest.mark.asyncio
    async def test_P2_14_total_count_in_page(
        self, client: AsyncClient, risk_repo_with_rows
    ) -> None:
        """Four rows are seeded in total, so "4" is expected in the page."""
        response = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
        page = response.text
        assert "4" in page  # 4 total rows
353
354
355 # ---------------------------------------------------------------------------
356 # Layer 16 — No subprocess
357 # ---------------------------------------------------------------------------
358
class TestBlastRiskNoSubprocess:
    """Layer 16 — the page is served without spawning a subprocess."""

    @pytest.mark.asyncio
    async def test_P2_16_no_subprocess_called(
        self, client: AsyncClient, risk_repo
    ) -> None:
        """A successful request never calls ``asyncio.create_subprocess_exec``.

        The status code is asserted as well: without it, a request that failed
        before reaching the handler would satisfy the "not called" check
        trivially.  Only ``asyncio.create_subprocess_exec`` is patched; other
        process-spawning APIs are not covered by this test.
        """
        with patch("asyncio.create_subprocess_exec") as mock_proc:
            resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
        assert resp.status_code == 200
        mock_proc.assert_not_called()
368
369
370 # ---------------------------------------------------------------------------
371 # Layer 17 — All risk tiers render
372 # ---------------------------------------------------------------------------
373
class TestBlastRiskAllTiers:
    """Layer 17 — with no filter, a row from every tier is rendered."""

    @pytest.mark.asyncio
    async def test_P2_17_all_four_tiers_rendered(
        self, client: AsyncClient, risk_repo_with_rows
    ) -> None:
        """Each of the four seeded symbol names appears in the response body."""
        response = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
        body = response.text
        for symbol in ("critical_fn", "high_fn", "medium_fn", "low_fn"):
            assert symbol in body
386
387
388 # ---------------------------------------------------------------------------
389 # Layer 18 — Multi-repo isolation
390 # ---------------------------------------------------------------------------
391
class TestBlastRiskIsolation:
    """Layer 18 — rows belonging to another repo never leak into the page."""

    @pytest.mark.asyncio
    async def test_P2_18_other_repo_rows_absent(
        self, client: AsyncClient, db_session: AsyncSession, risk_repo
    ) -> None:
        """The requested repo's row is rendered; the other repo's row is not.

        A positive control row (``own_fn``) and a status check are included so
        the "absent" assertion cannot pass merely because the request failed
        or no rows were rendered at all.
        """
        # Read the id first, before any further session activity, mirroring
        # how the other tests in this module access ``risk_repo.repo_id``.
        own_id = risk_repo.repo_id
        other_repo = await create_repo(db_session, owner="other", slug="otherrepo")
        other_id = other_repo.repo_id
        await db_session.commit()
        await _seed_risk(db_session, own_id, address="pkg/own.py::own_fn",
                         risk="high", risk_score=60)
        await _seed_risk(db_session, other_id, address="pkg/spy.py::spy_fn",
                         risk="critical", risk_score=95)
        await db_session.commit()

        resp = await client.get(f"/{_OWNER}/{_SLUG}/intel/blast-risk")
        assert resp.status_code == 200
        assert "own_fn" in resp.text
        assert "spy_fn" not in resp.text
File History 2 commits
sha256:4992098130166d191cefed0a2821d19cd3cdd3cf50867a4e715c2b30636826c7 fix: repair syntax errors from typing annotation cleanup Sonnet 4.6 21 days ago
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 21 days ago