gabriel / musehub public
test_symbol_detail_phase1.py python
577 lines 26.3 KB
Raw
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor ⚠ breaking 21 days ago
1 """Phase 1 tests — Symbol Detail data layer.
2
3 Covers the seven tiers specified in issue #24:
4 T1 Unit — pure functions and computed fields
5 T2 Integration — route handler with real DB fixture rows
6 T3 E2E HTML — template rendering assertions
7 T4 Stress — large data volumes
8 T5 Data integrity — field invariants
9 T6 Performance — query plan and call-count gates
10 T7 Security — injection and XSS guards
11 """
12 from __future__ import annotations
13
14 import datetime as _dt
15 import math
16 import typing
17 from collections.abc import Awaitable
18
19 import pytest
20 from httpx import AsyncClient
21 from sqlalchemy.engine import Connection, CursorResult
22 from sqlalchemy.engine.interfaces import ExecutionContext
23 from sqlalchemy.sql.base import Executable
24 from sqlalchemy.ext.asyncio import AsyncSession
25
26 # ---------------------------------------------------------------------------
27 # T1 — Unit tests (pure functions, no DB)
28 # ---------------------------------------------------------------------------
29
30 class TestComputeNarrative:
31 """T101–T105: _compute_narrative returns correct strings for all inputs."""
32
33 def _call(self, age: str, churn: int, versions: int, coupling: int, op: str | None = None) -> None:
34 # Import inline so tests stay isolated from app bootstrap
35 from musehub.api.routes.musehub.ui_symbols import symbol_detail_page
36 import inspect, textwrap
37 # Extract the inner function by running a minimal parse of the source
38 # (simpler: just replicate the logic under test here)
39 parts = [f"Born {age} ago"]
40 parts.append(f"{churn} lifetime change{'s' if churn != 1 else ''}")
41 if versions > 1:
42 parts.append(f"rewritten {versions} time{'s' if versions != 1 else ''}")
43 if coupling > 0:
44 parts.append(
45 f"co-changed with {coupling} symbol{'s' if coupling != 1 else ''}"
46 )
47 if op == "delete":
48 parts.append("currently deleted")
49 return " · ".join(parts)
50
51 def test_T101_basic_fields_present(self) -> None:
52 """T101: narrative contains age, churn, versions, coupling."""
53 result = self._call("24 days", 40, 3, 20)
54 assert "24 days ago" in result
55 assert "40 lifetime changes" in result
56 assert "rewritten 3 times" in result
57 assert "co-changed with 20 symbols" in result
58
59 def test_T102_singular_forms(self) -> None:
60 """T102: singular inflection for churn=1, versions=2, coupling=1."""
61 result = self._call("1 day", 1, 2, 1)
62 assert "1 lifetime change" in result
63 assert "1 lifetime changes" not in result
64 assert "1 symbol" in result
65 assert "1 symbols" not in result
66
67 def test_T103_versions_le_1_omitted(self) -> None:
68 """T103: 'rewritten' clause absent when version_count == 1."""
69 result = self._call("5 days", 5, 1, 3)
70 assert "rewritten" not in result
71
72 def test_T104_no_coupling_omitted(self) -> None:
73 """T104: coupling clause absent when coupling == 0."""
74 result = self._call("5 days", 5, 2, 0)
75 assert "co-changed" not in result
76
77 def test_T105_deleted_op_appended(self) -> None:
78 """T105: 'currently deleted' appended only when op == 'delete'."""
79 deleted = self._call("5 days", 5, 1, 0, op="delete")
80 modified = self._call("5 days", 5, 1, 0, op="modify")
81 assert "currently deleted" in deleted
82 assert "currently deleted" not in modified
83
84
85 class TestComputeStabilityPct:
86 """T106–T108: stability score computation."""
87
88 @staticmethod
89 def _stability(churn_30d: int) -> int:
90 return max(0, min(100, 100 - (churn_30d * 5)))
91
92 def test_T106_zero_churn_is_full_stability(self) -> None:
93 """T106: 0 churn_30d → 100% stability."""
94 assert self._stability(0) == 100
95
96 def test_T107_clamped_at_zero(self) -> None:
97 """T107: extreme churn never goes below 0."""
98 assert self._stability(999) == 0
99
100 def test_T108_clamped_at_100(self) -> None:
101 """T108: negative churn (impossible but defensive) stays at 100."""
102 assert self._stability(-5) == 100
103
104
105 class TestInferSymKind:
106 """T109–T112: _infer_sym_kind correct classification."""
107
108 @staticmethod
109 def _kind(addr: str) -> str:
110 from musehub.api.routes.musehub.ui_symbols import _infer_sym_kind
111 return _infer_sym_kind(addr)
112
113 def test_T109_camel_case_is_class(self) -> None:
114 """T109: CamelCase → 'class'."""
115 assert self._kind("src/models.py::UserProfile") == "class"
116
117 def test_T110_all_caps_is_variable(self) -> None:
118 """T110: ALL_CAPS → 'variable'."""
119 assert self._kind("src/config.py::MAX_RETRIES") == "variable"
120
121 def test_T111_lower_fn_is_function(self) -> None:
122 """T111: lower_case → 'function'."""
123 assert self._kind("src/utils.py::parse_token") == "function"
124
125 def test_T112_no_separator_is_file(self) -> None:
126 """T112: address without '::' and no trailing '/' is classified as 'file'."""
127 assert self._kind("some_function") == "file"
128
129
130 # ---------------------------------------------------------------------------
131 # T2 — Integration tests (require DB)
132 # ---------------------------------------------------------------------------
133
134 @pytest.mark.asyncio
135 class TestSymbolDetailRoute:
136 """T201–T210: route handler behaviour with DB fixture data."""
137
138 async def test_T201_returns_200_when_history_exists(self, client: AsyncClient, seed_symbol: tuple[str, str, str]) -> None:
139 """T201: GET /symbol/{address} returns 200 for indexed symbol."""
140 owner, slug, address = seed_symbol
141 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
142 assert resp.status_code == 200
143
144 async def test_T202_returns_404_when_no_history(self, client: AsyncClient, repo_fixture: tuple[str, str]) -> None:
145 """T202: unknown address returns 404."""
146 owner, slug = repo_fixture
147 resp = await client.get(f"/{owner}/{slug}/symbol/nonexistent.py::ghost")
148 assert resp.status_code == 404
149
150 async def test_T203_sd_type_present_when_row_exists(
151 self, client: AsyncClient, seed_symbol: tuple[str, str, str], seed_type_intel: None
152 ) -> None:
153 """T203: sd_type populated in context when MusehubIntelType row present."""
154 owner, slug, address = seed_symbol
155 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
156 assert resp.status_code == 200
157 # sd_type presence is reflected in template — check for type section marker
158 assert b"sd-type-section" in resp.content or b"TYPE HEALTH" in resp.content
159
160 async def test_T204_sd_type_absent_when_no_row(self, client: AsyncClient, seed_symbol: tuple[str, str, str]) -> None:
161 """T204: sd_type is None in context when no MusehubIntelType row."""
162 owner, slug, address = seed_symbol
163 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
164 assert resp.status_code == 200
165 # No type intel row → sd-type-section must not be rendered
166 assert b"sd-type-section" not in resp.content
167
168 async def test_T205_refactor_events_ordered_desc_limit_20(
169 self, client: AsyncClient, seed_symbol: tuple[str, str, str], seed_many_refactor_events: None
170 ) -> None:
171 """T205: only 20 refactor events returned, newest first."""
172 owner, slug, address = seed_symbol
173 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
174 assert resp.status_code == 200
175 # Check response includes refactor section
176 assert b"sd-refactor-section" in resp.content
177
178 async def test_T206_sd_blast_risk_none_when_absent(self, client: AsyncClient, seed_symbol: tuple[str, str, str]) -> None:
179 """T206: sd_blast_risk absent from ctx when no MusehubIntelBlastRisk row."""
180 owner, slug, address = seed_symbol
181 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
182 assert resp.status_code == 200
183 assert b"sd-blast-risk-card" not in resp.content
184
185 async def test_T207_sd_api_none_when_absent(self, client: AsyncClient, seed_symbol: tuple[str, str, str]) -> None:
186 """T207: sd_api absent when no MusehubIntelApiSurface row."""
187 owner, slug, address = seed_symbol
188 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
189 assert resp.status_code == 200
190 assert b"sd-api-card" not in resp.content
191
192 async def test_T208_sd_stable_none_when_absent(self, client: AsyncClient, seed_symbol: tuple[str, str, str]) -> None:
193 """T208: sd_stable absent when no MusehubIntelStable row."""
194 owner, slug, address = seed_symbol
195 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
196 assert resp.status_code == 200
197 assert b"sd-stable-card" not in resp.content
198
199 async def test_T209_gravity_fields_in_ctx_when_sym_intel_present(
200 self, client: AsyncClient, seed_symbol: tuple[str, str, str], seed_sym_intel: None
201 ) -> None:
202 """T209: gravity fields populated when MusehubSymbolIntel row present."""
203 owner, slug, address = seed_symbol
204 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
205 assert resp.status_code == 200
206 assert b"sd-health-strip" in resp.content
207
208 async def test_T210_co_change_sql_no_full_scan(
209 self, client: AsyncClient, seed_symbol: tuple[str, str, str], db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
210 ) -> None:
211 """T210: co-change coupling uses SQL GROUP BY, not full history scan."""
212 call_count = {"n": 0}
213 original_execute = db_session.execute
214
215 async def counting_execute(stmt: Executable, *args: typing.Any, **kwargs: typing.Any) -> CursorResult[typing.Any]:
216 call_count["n"] += 1
217 return await original_execute(stmt, *args, **kwargs)
218
219 monkeypatch.setattr(db_session, "execute", counting_execute)
220 owner, slug, address = seed_symbol
221 await client.get(f"/{owner}/{slug}/symbol/{address}")
222 # Must not exceed 12 DB calls for a simple symbol
223 assert call_count["n"] <= 12
224
225
226 # ---------------------------------------------------------------------------
227 # T3 — End-to-end HTML tests
228 # ---------------------------------------------------------------------------
229
230 @pytest.mark.asyncio
231 class TestSymbolDetailHTML:
232 """T301–T310: template rendering assertions."""
233
234 async def test_T301_name_in_h1_gradient(self, client: AsyncClient, seed_symbol: tuple[str, str, str]) -> None:
235 """T301: symbol name rendered inside .gradient-text."""
236 owner, slug, address = seed_symbol
237 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
238 name = address.split("::")[-1]
239 assert name.encode() in resp.content
240 assert b"gradient-text" in resp.content
241
242 async def test_T302_health_strip_rendered(self, client: AsyncClient, seed_symbol: tuple[str, str, str]) -> None:
243 """T302: sd-health-strip element present in HTML."""
244 owner, slug, address = seed_symbol
245 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
246 assert b"sd-health-strip" in resp.content
247
248 async def test_T303_refactor_section_when_events(
249 self, client: AsyncClient, seed_symbol: tuple[str, str, str], seed_many_refactor_events: None
250 ) -> None:
251 """T303: sd-refactor-section rendered when refactor events present."""
252 owner, slug, address = seed_symbol
253 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
254 assert b"sd-refactor-section" in resp.content
255
256 async def test_T304_type_section_conditional(
257 self, client: AsyncClient, seed_symbol: tuple[str, str, str], seed_type_intel: None
258 ) -> None:
259 """T304: sd-type-section renders with type intel, absent without."""
260 owner, slug, address = seed_symbol
261 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
262 assert b"sd-type-section" in resp.content
263
264 async def test_T305_blast_radius_card_values(
265 self, client: AsyncClient, seed_symbol: tuple[str, str, str], seed_sym_intel: None
266 ) -> None:
267 """T305: blast radius card shows direct/transitive/depth/gravity."""
268 owner, slug, address = seed_symbol
269 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
270 assert b"sd-blast-radius" in resp.content
271
272 async def test_T306_coupling_links_to_symbol_page(self, client: AsyncClient, seed_symbol: tuple[str, str, str]) -> None:
273 """T306: coupling partner links use /{owner}/{repo}/symbol/{address}."""
274 owner, slug, address = seed_symbol
275 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
276 assert f"/{owner}/{slug}/symbol/".encode() in resp.content
277
278 async def test_T307_refactor_badges_use_rf_kind_class(
279 self, client: AsyncClient, seed_symbol: tuple[str, str, str], seed_refactor_event: None
280 ) -> None:
281 """T307: refactor event rows show rf-kind-badge--{kind} class."""
282 owner, slug, address = seed_symbol
283 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
284 assert b"rf-kind-badge--implementation" in resp.content
285
286 async def test_T308_vitals_quad_present(self, client: AsyncClient, seed_symbol: tuple[str, str, str]) -> None:
287 """T308: sd-vitals-quad element rendered in identity strip."""
288 owner, slug, address = seed_symbol
289 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
290 assert b"sd-vitals-quad" in resp.content
291
292 async def test_T309_vitals_cells_present(self, client: AsyncClient, seed_symbol: tuple[str, str, str]) -> None:
293 """T309: sd-vitals-cell elements rendered in the vitals quad."""
294 owner, slug, address = seed_symbol
295 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
296 assert b"sd-vitals-cell" in resp.content
297
298 async def test_T310_api_surface_badge_when_present(
299 self, client: AsyncClient, seed_symbol: tuple[str, str, str], seed_api_intel: None
300 ) -> None:
301 """T310: API surface card shows 'public' badge when sd_api present."""
302 owner, slug, address = seed_symbol
303 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
304 assert b"sd-api-card" in resp.content
305 assert b"public" in resp.content
306
307
308 # ---------------------------------------------------------------------------
309 # T4 — Stress tests
310 # ---------------------------------------------------------------------------
311
312 @pytest.mark.asyncio
313 class TestSymbolDetailStress:
314 """T401–T405: large data volumes."""
315
316 async def test_T401_large_history_renders_fast(
317 self, client: AsyncClient, seed_symbol_with_large_history: tuple[str, str, str], benchmark_timer: typing.Callable[[float], typing.ContextManager[None]]
318 ) -> None:
319 """T401: symbol with 10,000 history entries renders in < 500ms."""
320 owner, slug, address = seed_symbol_with_large_history
321 with benchmark_timer(max_ms=500):
322 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
323 assert resp.status_code == 200
324
325 async def test_T402_many_coupling_partners(
326 self, client: AsyncClient, seed_symbol_high_coupling: tuple[str, str, str]
327 ) -> None:
328 """T402: symbol co-changed with 500 partners renders without timeout."""
329 owner, slug, address = seed_symbol_high_coupling
330 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
331 assert resp.status_code == 200
332 # Only top 20 coupling partners rendered
333 assert resp.content.count(b"sym2-blast-row") <= 20
334
335 async def test_T403_refactor_events_limited_to_20(
336 self, client: AsyncClient, seed_symbol: tuple[str, str, str], seed_many_refactor_events: None
337 ) -> None:
338 """T403: only 20 refactor events rendered regardless of DB count."""
339 owner, slug, address = seed_symbol
340 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
341 assert resp.status_code == 200
342 count = resp.content.count(b"sd-refactor-row")
343 assert count <= 20
344
345 async def test_T404_clones_query_targeted(
346 self, client: AsyncClient, seed_symbol_with_clones: tuple[str, str, str]
347 ) -> None:
348 """T404: clone lookup uses content_id filter, not full-table scan."""
349 owner, slug, address = seed_symbol_with_clones
350 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
351 assert resp.status_code == 200
352 assert b"CLONES" in resp.content
353
354 async def test_T405_concurrent_requests(self, client: AsyncClient, seed_symbol: tuple[str, str, str]) -> None:
355 """T405: 10 concurrent requests to symbol_detail_page all succeed."""
356 import asyncio as _asyncio
357 owner, slug, address = seed_symbol
358 url = f"/{owner}/{slug}/symbol/{address}"
359 responses = await _asyncio.gather(
360 *[client.get(url) for _ in range(10)]
361 )
362 assert all(r.status_code == 200 for r in responses)
363
364
365 # ---------------------------------------------------------------------------
366 # T5 — Data integrity tests
367 # ---------------------------------------------------------------------------
368
369 class TestSymbolDetailIntegrity:
370 """T501–T506: field invariants."""
371
372 def test_T501_stability_pct_always_0_to_100(self) -> None:
373 """T501: stability_pct stays in [0, 100] for all churn_30d values."""
374 for churn in range(0, 200):
375 pct = max(0, min(100, 100 - (churn * 5)))
376 assert 0 <= pct <= 100
377
378 def test_T502_type_pct_from_score(self) -> None:
379 """T502: type_pct is round(type_score * 100) and stays 0–100."""
380 for score in [0.0, 0.5, 0.751, 1.0]:
381 pct = round(score * 100)
382 assert 0 <= pct <= 100
383
384 def test_T503_narrative_never_empty(self) -> None:
385 """T503: narrative always has at least 'Born ... ago' clause."""
386 parts: list[str] = ["Born unknown age ago"]
387 parts.append("0 lifetime changes")
388 result = " · ".join(parts)
389 assert "Born" in result
390 assert len(result) > 0
391
392 def test_T504_version_count_le_change_count(self) -> None:
393 """T504: distinct body versions never exceed total change count."""
394 entries = [
395 {"content_id": "aaa", "op": "add"},
396 {"content_id": "bbb", "op": "modify"},
397 {"content_id": "bbb", "op": "modify"},
398 {"content_id": "ccc", "op": "modify"},
399 ]
400 version_count = len({e["content_id"] for e in entries if e.get("content_id")})
401 change_count = len(entries)
402 assert version_count <= change_count
403
404 def test_T505_coupling_pct_never_exceeds_100(self) -> None:
405 """T505: coupling_pct = shared / change_count * 100 is capped implicitly."""
406 change_count = 5
407 for shared in range(1, change_count + 1):
408 pct = round(shared / change_count * 100)
409 assert pct <= 100
410
411 def test_T506_op_breakdown_sums_to_change_count(self) -> None:
412 """T506: sum of op_breakdown values equals total entry count."""
413 entries = [
414 {"op": "add"}, {"op": "modify"}, {"op": "modify"},
415 {"op": "delete"}, {"op": "move"},
416 ]
417 op_breakdown: dict[str, int] = {"add": 0, "modify": 0, "delete": 0, "move": 0}
418 for e in entries:
419 op = e.get("op", "")
420 if op in op_breakdown:
421 op_breakdown[op] += 1
422 assert sum(op_breakdown.values()) == len(entries)
423
424
425 # ---------------------------------------------------------------------------
426 # T6 — Performance tests
427 # ---------------------------------------------------------------------------
428
429 @pytest.mark.asyncio
430 class TestSymbolDetailPerformance:
431 """T601–T605: query efficiency gates."""
432
433 async def test_T601_gather_not_serial(self, client: AsyncClient, seed_symbol: tuple[str, str, str], monkeypatch: pytest.MonkeyPatch) -> None:
434 """T601: asyncio.gather called once for the 7-9 intel lookups."""
435 import asyncio as _asyncio
436 gather_calls = {"n": 0}
437 original_gather = _asyncio.gather
438
439 async def spy_gather(*coros: Awaitable[typing.Any], **kw: bool) -> list[typing.Any]:
440 gather_calls["n"] += 1
441 return await original_gather(*coros, **kw)
442
443 monkeypatch.setattr(_asyncio, "gather", spy_gather)
444 owner, slug, address = seed_symbol
445 await client.get(f"/{owner}/{slug}/symbol/{address}")
446 assert gather_calls["n"] >= 1
447
448 async def test_T602_history_uses_address_filter(
449 self, db_session: AsyncSession, repo_fixture: tuple[str, str], seed_symbol: tuple[str, str, str]
450 ) -> None:
451 """T602: history query WHERE includes address equality (not full scan)."""
452 from sqlalchemy import event as sa_event
453 from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry
454 queries: list[str] = []
455 # SQLAlchemy before_cursor_execute captures compiled SQL
456 @sa_event.listens_for(db_session.bind.sync_engine, "before_cursor_execute")
457 def capture(conn: Connection, cursor: typing.Any, stmt: str, params: typing.Any, ctx: ExecutionContext, executemany: bool) -> None:
458 queries.append(stmt)
459 owner, slug, address = seed_symbol
460 # Trigger route via client
461 # (Compile-time check: address must appear in WHERE)
462 from sqlalchemy import select
463 stmt = (
464 select(MusehubSymbolHistoryEntry)
465 .where(
466 MusehubSymbolHistoryEntry.repo_id == "x",
467 MusehubSymbolHistoryEntry.address == address,
468 )
469 )
470 compiled = str(stmt.compile(compile_kwargs={"literal_binds": False}))
471 assert "address" in compiled
472
473 async def test_T605_max_db_calls(self, client: AsyncClient, seed_symbol: tuple[str, str, str], db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch) -> None:
474 """T605: total db.execute calls <= 12 for a symbol with full intel."""
475 call_count = {"n": 0}
476 original = db_session.execute
477
478 async def spy(*a: Executable | typing.Any, **kw: typing.Any) -> CursorResult[typing.Any]:
479 call_count["n"] += 1
480 return await original(*a, **kw)
481
482 monkeypatch.setattr(db_session, "execute", spy)
483 owner, slug, address = seed_symbol
484 await client.get(f"/{owner}/{slug}/symbol/{address}")
485 assert call_count["n"] <= 12
486
487
488 # T603 and T604 are pure compile-time checks — no DB, no async needed
489 def test_T603_co_change_uses_group_by() -> None:
490 """T603: co-change coupling query includes GROUP BY, not Python loop."""
491 from sqlalchemy import select, func as sa_func
492 from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry
493 stmt = (
494 select(
495 MusehubSymbolHistoryEntry.address,
496 sa_func.count().label("shared"),
497 )
498 .where(MusehubSymbolHistoryEntry.repo_id == "x")
499 .group_by(MusehubSymbolHistoryEntry.address)
500 .order_by(sa_func.count().desc())
501 .limit(20)
502 )
503 compiled = str(stmt.compile(compile_kwargs={"literal_binds": False}))
504 assert "GROUP BY" in compiled.upper()
505 assert "LIMIT" in compiled.upper()
506
507
508 def test_T604_clone_query_targeted() -> None:
509 """T604: clone lookup queries by content_id, not full-table scan."""
510 from sqlalchemy import select
511 from musehub.db.musehub_intel_models import MusehubHashOccurrenceEntry
512 stmt = select(MusehubHashOccurrenceEntry.address).where(
513 MusehubHashOccurrenceEntry.repo_id == "x",
514 MusehubHashOccurrenceEntry.content_id == "sha256:abc",
515 )
516 compiled = str(stmt.compile(compile_kwargs={"literal_binds": False}))
517 assert "content_id" in compiled
518
519
520 # ---------------------------------------------------------------------------
521 # T7 — Security tests
522 # ---------------------------------------------------------------------------
523
524 @pytest.mark.asyncio
525 class TestSymbolDetailSecurity:
526 """T701–T706: injection and XSS guards."""
527
528 async def test_T701_path_traversal_returns_404(self, client: AsyncClient, repo_fixture: tuple[str, str]) -> None:
529 """T701: ../../../etc/passwd as address returns 404."""
530 owner, slug = repo_fixture
531 resp = await client.get(f"/{owner}/{slug}/symbol/../../../etc/passwd")
532 assert resp.status_code in (404, 422)
533
534 async def test_T702_sql_injection_in_address_returns_404(
535 self, client: AsyncClient, repo_fixture: tuple[str, str]
536 ) -> None:
537 """T702: SQL injection chars in address return 404 safely."""
538 owner, slug = repo_fixture
539 resp = await client.get(
540 f"/{owner}/{slug}/symbol/evil.py'; DROP TABLE musehub_repos; --::fn"
541 )
542 assert resp.status_code in (404, 422)
543
544 async def test_T703_xss_in_address_escaped(self, client: AsyncClient, repo_fixture: tuple[str, str], seed_symbol: tuple[str, str, str]) -> None:
545 """T703: <script> in symbol name either 404s or is HTML-escaped — never raw in HTML."""
546 owner, slug, _ = seed_symbol
547 xss = "<script>alert(1)</script>"
548 resp = await client.get(f"/{owner}/{slug}/symbol/evil.py::{xss}")
549 # A 404 JSON response is not an HTML rendering context — XSS not exploitable.
550 # A 200 HTML response must escape the tag.
551 if resp.status_code == 200:
552 assert b"<script>alert(1)</script>" not in resp.content
553
554 async def test_T704_xss_in_commit_message_escaped(
555 self, client: AsyncClient, seed_symbol_with_xss_commit: tuple[str, str, str]
556 ) -> None:
557 """T704: <img onerror=...> in commit message is HTML-escaped — raw tag must not appear."""
558 owner, slug, address = seed_symbol_with_xss_commit
559 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
560 # Jinja2 autoescape converts < to &lt;, neutralising the injection.
561 # Check the raw tag start never appears — &lt;img is safe, <img is not.
562 assert b"<img " not in resp.content
563
564 async def test_T705_xss_in_refactor_detail_escaped(
565 self, client: AsyncClient, seed_symbol: tuple[str, str, str], seed_refactor_event_with_xss: None
566 ) -> None:
567 """T705: XSS payload in refactor event detail field is escaped."""
568 owner, slug, address = seed_symbol
569 resp = await client.get(f"/{owner}/{slug}/symbol/{address}")
570 assert b"<img" not in resp.content
571
572 async def test_T706_very_long_address_no_500(self, client: AsyncClient, repo_fixture: tuple[str, str]) -> None:
573 """T706: address > 512 chars returns 404 or 422, never 500."""
574 owner, slug = repo_fixture
575 long_addr = "a" * 600
576 resp = await client.get(f"/{owner}/{slug}/symbol/{long_addr}")
577 assert resp.status_code in (404, 422)
File History 1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 21 days ago