test_intel_type.py
python
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠ breaking
21 days ago
| 1 | """Type Health dashboard — full 7-tier test suite (issue #18). |
| 2 | |
| 3 | Tests are written TDD-first: all tests in this file must be RED before |
| 4 | Phase 3–5 implementation begins, then GREEN after. |
| 5 | |
| 6 | Tiers: |
| 7 | T01–T03 Layer T1 — DB extension (return_annotation column) |
| 8 | T04–T05 Layer T2 — Provider batch performance |
| 9 | T06–T14 Layer T3 — Route (unit / integration) |
| 10 | T15–T19 Layer T4 — E2E (HTML body assertions) |
| 11 | T20–T22 Layer T5 — State integrity |
| 12 | T23–T25 Layer T6 — Performance |
| 13 | T26–T30 Layer T7 — Security |
| 14 | """ |
| 15 | from __future__ import annotations |
| 16 | |
| 17 | import time |
| 18 | from unittest.mock import AsyncMock, patch |
| 19 | |
| 20 | import typing |
| 21 | |
| 22 | import pytest |
| 23 | import pytest_asyncio |
| 24 | import sqlalchemy as sa |
| 25 | from httpx import AsyncClient |
| 26 | from sqlalchemy.engine import CursorResult |
| 27 | from sqlalchemy.dialects.postgresql import insert as pg_insert |
| 28 | from sqlalchemy.ext.asyncio import AsyncSession |
| 29 | |
| 30 | from musehub.db.musehub_intel_models import MusehubIntelType |
| 31 | from musehub.db.musehub_repo_models import MusehubRepo |
| 32 | from tests.factories import create_repo |
| 33 | from muse.core.types import long_id |
| 34 | |
# Single ref value shared by every row this module inserts and by the provider
# calls in the batch tests.  Built from 64 "a" characters via ``long_id``
# (presumably a well-formed placeholder commit/object id — confirm against
# ``muse.core.types``).
_REF = long_id("a" * 64)
| 36 | |
| 37 | |
| 38 | # --------------------------------------------------------------------------- |
| 39 | # Helpers |
| 40 | # --------------------------------------------------------------------------- |
| 41 | |
async def _insert_type_row(
    session: AsyncSession,
    repo_id: str,
    address: str,
    kind: str = "function",
    type_score: float = 1.0,
    return_is_any: bool = False,
    params_total: int = 2,
    params_annotated: int = 2,
    params_with_any: int = 0,
    return_annotation: str | None = "str",
) -> None:
    """Upsert one ``MusehubIntelType`` row keyed on ``(repo_id, address)``.

    A new row is written with every supplied field plus the module-level
    ``_REF``.  When a row with the same ``repo_id`` and ``address`` already
    exists, only ``type_score`` and ``return_annotation`` are overwritten;
    every other column keeps its stored value.

    The statement is executed on ``session`` but not committed — the caller
    commits.
    """
    row_values = {
        "repo_id": repo_id,
        "address": address,
        "kind": kind,
        "type_score": type_score,
        "return_is_any": return_is_any,
        "params_total": params_total,
        "params_annotated": params_annotated,
        "params_with_any": params_with_any,
        "return_annotation": return_annotation,
        "ref": _REF,
    }
    # Columns refreshed when the (repo_id, address) pair is already present.
    refreshed_on_conflict = {
        "type_score": type_score,
        "return_annotation": return_annotation,
    }

    insert_stmt = pg_insert(MusehubIntelType).values(**row_values)
    upsert_stmt = insert_stmt.on_conflict_do_update(
        index_elements=["repo_id", "address"],
        set_=refreshed_on_conflict,
    )
    await session.execute(upsert_stmt)
| 76 | |
| 77 | |
@pytest_asyncio.fixture
async def type_repo(db_session: AsyncSession) -> MusehubRepo:
    """Create repo ``typeuser/type-e2e`` seeded with four type rows.

    The rows cover one fully-typed, one partial, one untyped and one
    any-polluted symbol.  They are inserted in the order listed below and
    committed before the repo is returned.
    """
    repo = await create_repo(db_session, owner="typeuser", slug="type-e2e")
    repo_key = str(repo.repo_id)

    # (address, keyword overrides for _insert_type_row)
    seed_rows: list[tuple[str, dict[str, typing.Any]]] = [
        # fully typed (score=1.0)
        (
            "src/a.py::fn_full",
            {"type_score": 1.0, "return_annotation": "str"},
        ),
        # partial (score=0.75)
        (
            "src/b.py::fn_partial",
            {
                "kind": "method",
                "type_score": 0.75,
                "params_total": 4,
                "params_annotated": 3,
                "return_annotation": "None",
            },
        ),
        # untyped (score=0.0)
        (
            "src/c.py::fn_untyped",
            {
                "type_score": 0.0,
                "params_annotated": 0,
                "return_annotation": None,
            },
        ),
        # any-polluted (has params_with_any)
        (
            "src/d.py::fn_any",
            {
                "type_score": 0.75,
                "return_is_any": False,
                "params_with_any": 1,
                "return_annotation": "Any",
            },
        ),
    ]
    for address, overrides in seed_rows:
        await _insert_type_row(db_session, repo_key, address, **overrides)

    await db_session.commit()
    return repo
| 103 | |
| 104 | |
| 105 | # ───────────────────────────────────────────────────────────────────────────── |
| 106 | # Layer T1 — DB extension |
| 107 | # ───────────────────────────────────────────────────────────────────────────── |
| 108 | |
class TestDBExtension:
    """Layer T1 — the ``return_annotation`` column on ``MusehubIntelType``."""

    def test_T01_return_annotation_column_exists_on_model(self) -> None:
        """The ORM mapper must expose ``return_annotation`` as a column attribute."""
        mapper = sa.inspect(MusehubIntelType).mapper
        mapped_keys = {attr.key for attr in mapper.column_attrs}
        assert "return_annotation" in mapped_keys, (
            "return_annotation column missing from MusehubIntelType"
        )

    def test_T02_return_annotation_is_nullable(self) -> None:
        """The table column must accept NULL (existing rows have no value)."""
        column = MusehubIntelType.__table__.c["return_annotation"]
        assert column.nullable, "return_annotation must be nullable"

    @pytest.mark.asyncio
    async def test_T03_return_annotation_stored_and_retrieved(
        self, db_session: AsyncSession
    ) -> None:
        """A written ``return_annotation`` value must read back unchanged."""
        repo = await create_repo(db_session, owner="typeuser", slug="t03")
        repo_key = str(repo.repo_id)
        address = "src/x.py::fn"

        await _insert_type_row(
            db_session, repo_key, address, return_annotation="list[str]"
        )
        await db_session.commit()

        lookup = sa.select(MusehubIntelType).where(
            MusehubIntelType.repo_id == repo_key,
            MusehubIntelType.address == address,
        )
        stored = await db_session.scalar(lookup)

        assert stored is not None
        assert stored.return_annotation == "list[str]"
| 141 | |
| 142 | |
| 143 | # ───────────────────────────────────────────────────────────────────────────── |
| 144 | # Layer T2 — Provider batch performance |
| 145 | # ───────────────────────────────────────────────────────────────────────────── |
| 146 | |
class TestProviderBatch:
    """Layer T2 — the ``intel.code.type`` provider must write in batches.

    Both tests patch ``asyncio.create_subprocess_exec`` so the provider reads
    a canned JSON payload instead of spawning a real process.
    """

    @pytest.mark.asyncio
    async def test_T04_type_provider_issues_one_sql_per_chunk(
        self, db_session: AsyncSession
    ) -> None:
        """TypeProvider must use batch upsert, not one execute per symbol."""
        import json

        from sqlalchemy.sql.expression import Insert

        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="typeuser", slug="t04")
        ref = _REF

        symbols = [
            {
                "address": f"src/m{i}.py::fn",
                "kind": "function",
                "return_annotation": "str",
                "return_is_any": False,
                "params_total": 1,
                "params_annotated": 1,
                "params_with_any": 0,
                "type_score": 1.0,
            }
            for i in range(50)
        ]
        muse_out = json.dumps({"symbols": symbols})

        execute_calls: list[sa.Executable] = []
        original_execute = db_session.execute

        async def counting_execute(
            stmt: sa.Executable, *args: typing.Any, **kwargs: typing.Any
        ) -> CursorResult[typing.Any]:
            # Record the statement, then delegate to the real execute.
            execute_calls.append(stmt)
            return await original_execute(stmt, *args, **kwargs)

        db_session.execute = counting_execute  # type: ignore[method-assign]
        try:
            with patch("asyncio.create_subprocess_exec",
                       return_value=_mock_process(muse_out)):
                await _PROVIDER_REGISTRY["intel.code.type"].compute(
                    db_session, repo.repo_id, ref,
                    {"head": ref, "owner": repo.owner, "slug": repo.slug},
                )
        finally:
            # Always undo the monkeypatch so a provider failure cannot leave
            # the shared session fixture wrapped for teardown.
            db_session.execute = original_execute  # type: ignore[method-assign]

        def is_insert(stmt: sa.Executable) -> bool:
            """Return True only for INSERT statements.

            ``hasattr(stmt, "is_dml")`` is true for every SQLAlchemy
            executable (SELECTs included), so it cannot be used to pick out
            the upserts.
            """
            if isinstance(stmt, Insert):
                return True
            # text() constructs carry no DML type; fall back to their SQL.
            return bool(getattr(stmt, "is_text", False)) and (
                str(stmt).lstrip().upper().startswith("INSERT")
            )

        # 50 symbols fit in one chunk of 1000 — expect exactly 1 upsert execute
        upsert_calls = [stmt for stmt in execute_calls if is_insert(stmt)]
        assert len(upsert_calls) == 1, (
            f"Expected 1 batch upsert execute for 50 symbols, got {len(upsert_calls)}"
        )

    @pytest.mark.asyncio
    async def test_T05_upsert_500_symbols_under_500ms(
        self, db_session: AsyncSession
    ) -> None:
        """Batch-upserting 500 symbols must complete in under 500ms."""
        import json

        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="typeuser", slug="t05")
        symbols = [
            {
                "address": f"src/file{i}.py::fn_{i}",
                "kind": "function",
                "return_annotation": "int",
                "return_is_any": False,
                "params_total": 2,
                "params_annotated": 2,
                "params_with_any": 0,
                "type_score": 1.0,
            }
            for i in range(500)
        ]
        muse_out = json.dumps({"symbols": symbols})

        # Time only the provider call, including the patched subprocess read.
        t0 = time.monotonic()
        with patch("asyncio.create_subprocess_exec",
                   return_value=_mock_process(muse_out)):
            await _PROVIDER_REGISTRY["intel.code.type"].compute(
                db_session, repo.repo_id, _REF,
                {"head": _REF, "owner": repo.owner, "slug": repo.slug},
            )
        elapsed = time.monotonic() - t0
        assert elapsed < 0.5, f"500-symbol batch took {elapsed:.3f}s (limit: 0.5s)"
| 232 | |
| 233 | |
| 234 | # ───────────────────────────────────────────────────────────────────────────── |
| 235 | # Layer T3 — Route (unit / integration) |
| 236 | # ───────────────────────────────────────────────────────────────────────────── |
| 237 | |
class TestRoute:
    """Layer T3 — ``GET /{owner}/{slug}/intel/type`` status, filters and sorting.

    Tests taking ``type_repo`` run against the four seeded symbols
    (``fn_full``, ``fn_partial``, ``fn_untyped``, ``fn_any``).
    """

    @pytest.mark.asyncio
    async def test_T06_returns_200_with_empty_repo(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Route must return 200 even when musehub_intel_type has no rows."""
        await create_repo(db_session, owner="typeuser", slug="t06-empty")
        await db_session.commit()
        r = await client.get("/typeuser/t06-empty/intel/type")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_T07_returns_200_with_data(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """Route returns 200 when rows exist."""
        r = await client.get("/typeuser/type-e2e/intel/type")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_T08_summary_stats_match_db_counts(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """Coverage fraction and tier counts must be derived from DB, not hardcoded."""
        r = await client.get("/typeuser/type-e2e/intel/type")
        assert r.status_code == 200
        body = r.text
        # 1 fully typed out of 4 total → 25.0%
        assert "25" in body, "coverage_pct (25%) not found in response"
        # The single untyped symbol must be listed.  (The former
        # `or "1" in body` fallback matched virtually any page.)
        assert "fn_untyped" in body, "untyped symbol not found in response"

    @pytest.mark.asyncio
    async def test_T09_filter_tier_untyped(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """?tier=untyped returns only symbols with type_score < 0.5."""
        r = await client.get("/typeuser/type-e2e/intel/type?tier=untyped")
        assert r.status_code == 200
        assert "fn_untyped" in r.text
        assert "fn_full" not in r.text

    @pytest.mark.asyncio
    async def test_T10_filter_tier_partial(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """?tier=partial returns only symbols with 0.5 ≤ type_score < 1.0."""
        r = await client.get("/typeuser/type-e2e/intel/type?tier=partial")
        assert r.status_code == 200
        assert "fn_partial" in r.text
        assert "fn_untyped" not in r.text
        assert "fn_full" not in r.text

    @pytest.mark.asyncio
    async def test_T11_filter_tier_any(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """?tier=any returns only symbols with return_is_any or params_with_any > 0."""
        r = await client.get("/typeuser/type-e2e/intel/type?tier=any")
        assert r.status_code == 200
        assert "fn_any" in r.text
        assert "fn_full" not in r.text

    @pytest.mark.asyncio
    async def test_T12_filter_kind_function(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """?kind=function returns only function-kind symbols."""
        r = await client.get("/typeuser/type-e2e/intel/type?kind=function")
        assert r.status_code == 200
        # fn_full is kind=function; fn_partial is kind=method
        assert "fn_full" in r.text
        assert "fn_partial" not in r.text

    @pytest.mark.asyncio
    async def test_T13_default_sort_score_ascending(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """Default sort is type_score ASC (worst-typed first)."""
        r = await client.get("/typeuser/type-e2e/intel/type")
        assert r.status_code == 200
        body = r.text
        # Compare first-occurrence offsets of the two symbol names.
        pos_untyped = body.find("fn_untyped")
        pos_full = body.find("fn_full")
        assert pos_untyped != -1 and pos_full != -1
        assert pos_untyped < pos_full, "Untyped symbol must appear before fully-typed"

    @pytest.mark.asyncio
    async def test_T14_top_param_limits_results(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """?top=20 returns at most 20 symbols even when 25 exist."""
        repo = await create_repo(db_session, owner="typeuser", slug="t14-top")
        rid = str(repo.repo_id)
        for i in range(25):
            await _insert_type_row(db_session, rid,
                                   f"src/f{i}.py::fn_{i}", type_score=float(i) / 24)
        await db_session.commit()

        r = await client.get("/typeuser/t14-top/intel/type?top=20")
        assert r.status_code == 200
        # The ".py" after the index keeps f1 from matching inside f10..f19,
        # so each address is counted at most once.
        count = sum(1 for i in range(25) if f"src/f{i}.py::fn_{i}" in r.text)
        # A page that renders no rows at all must not satisfy the limit check.
        assert count > 0, "No symbols rendered for ?top=20"
        assert count <= 20, f"Expected ≤20 results for ?top=20, got {count}"
| 342 | |
| 343 | |
| 344 | # ───────────────────────────────────────────────────────────────────────────── |
| 345 | # Layer T4 — E2E (HTML body assertions) |
| 346 | # ───────────────────────────────────────────────────────────────────────────── |
| 347 | |
class TestE2E:
    """Layer T4 — substring assertions on the rendered HTML for the seeded repo."""

    @pytest.mark.asyncio
    async def test_T15_coverage_fraction_rendered_as_pct(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """Coverage of 1/4 fully-typed symbols must show up as ``25`` in the page."""
        response = await client.get("/typeuser/type-e2e/intel/type")
        assert response.status_code == 200
        page = response.text
        # 1/4 fully typed = 25%
        assert "25" in page

    @pytest.mark.asyncio
    async def test_T16_symbol_address_in_html(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """A seeded symbol address must appear verbatim in the HTML body."""
        response = await client.get("/typeuser/type-e2e/intel/type")
        assert response.status_code == 200
        page = response.text
        assert "src/c.py::fn_untyped" in page

    @pytest.mark.asyncio
    async def test_T17_return_annotation_in_html(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """A non-null return_annotation must appear in the rendered HTML."""
        response = await client.get("/typeuser/type-e2e/intel/type")
        assert response.status_code == 200
        page = response.text
        # NOTE(review): "str" is a very common substring, so this check is
        # weak — consider asserting on the annotation's surrounding markup.
        assert "list[str]" in page or "str" in page

    @pytest.mark.asyncio
    async def test_T18_any_badge_rendered_for_any_polluted_symbol(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """An any-pollution marker must appear when params_with_any > 0."""
        response = await client.get("/typeuser/type-e2e/intel/type")
        assert response.status_code == 200
        page = response.text
        # The any-badge or warning marker must be in the HTML.
        # NOTE(review): the seeded address "fn_any" itself contains "any",
        # so this passes whenever that row is rendered — confirm intent.
        assert "any" in page.lower() or "⚠" in page

    @pytest.mark.asyncio
    async def test_T19_dashboard_card_links_to_type_page(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """The intel dashboard must contain a link to ``/intel/type``."""
        response = await client.get("/typeuser/type-e2e/intel")
        assert response.status_code == 200
        raw_body = response.content
        assert b"/intel/type" in raw_body
| 396 | |
| 397 | |
| 398 | # ───────────────────────────────────────────────────────────────────────────── |
| 399 | # Layer T5 — State integrity |
| 400 | # ───────────────────────────────────────────────────────────────────────────── |
| 401 | |
class TestStateIntegrity:
    """Layer T5 — upsert idempotence, overwrite semantics and cascade delete."""

    @pytest.mark.asyncio
    async def test_T20_push_twice_produces_one_row_per_symbol(
        self, db_session: AsyncSession
    ) -> None:
        """Upserting the same address twice must not create duplicate rows."""
        repo = await create_repo(db_session, owner="typeuser", slug="t20-dup")
        rid = str(repo.repo_id)
        addr = "src/a.py::fn"

        for _ in range(2):
            await _insert_type_row(db_session, rid, addr, type_score=1.0)
        await db_session.commit()

        rows = (await db_session.execute(
            sa.select(MusehubIntelType).where(
                MusehubIntelType.repo_id == rid
            )
        )).scalars().all()
        assert len(rows) == 1, f"Expected 1 row, got {len(rows)} — upsert broken"

    @pytest.mark.asyncio
    async def test_T21_second_push_overwrites_type_score(
        self, db_session: AsyncSession
    ) -> None:
        """A second push with different type_score must overwrite the first."""
        repo = await create_repo(db_session, owner="typeuser", slug="t21-overwrite")
        rid = str(repo.repo_id)
        addr = "src/a.py::fn"

        await _insert_type_row(db_session, rid, addr, type_score=0.5)
        await _insert_type_row(db_session, rid, addr, type_score=1.0)
        await db_session.commit()

        row = await db_session.scalar(
            sa.select(MusehubIntelType).where(
                MusehubIntelType.repo_id == rid,
                MusehubIntelType.address == addr,
            )
        )
        assert row is not None
        # Float column: compare with a tolerance rather than exact equality.
        assert row.type_score == pytest.approx(1.0), (
            f"Expected score 1.0 after second push, got {row.type_score}"
        )

    @pytest.mark.asyncio
    async def test_T22_repo_delete_cascades_type_rows(
        self, db_session: AsyncSession
    ) -> None:
        """Deleting the repo must cascade-delete all musehub_intel_type rows."""
        repo = await create_repo(db_session, owner="typeuser", slug="t22-cascade")
        rid = str(repo.repo_id)
        await _insert_type_row(db_session, rid, "src/a.py::fn")
        await db_session.commit()

        await db_session.delete(repo)
        await db_session.commit()

        # rid was captured before the delete, so the lookup does not touch
        # the deleted ORM object.
        remaining = (await db_session.execute(
            sa.select(MusehubIntelType).where(
                MusehubIntelType.repo_id == rid
            )
        )).scalars().all()
        assert not remaining, "Cascade delete failed — type rows remain after repo delete"
| 469 | |
| 470 | |
| 471 | # ───────────────────────────────────────────────────────────────────────────── |
| 472 | # Layer T6 — Performance |
| 473 | # ───────────────────────────────────────────────────────────────────────────── |
| 474 | |
class TestPerformance:
    """Layer T6 — wall-clock budgets and index usage for musehub_intel_type."""

    @pytest.mark.asyncio
    async def test_T23_route_responds_under_200ms_for_10k_symbols(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Route must respond in < 200ms for a repo with 10,000 symbol rows."""
        repo = await create_repo(db_session, owner="typeuser", slug="t23-perf")
        rid = str(repo.repo_id)

        # Insert 10k rows via direct batch insert
        chunk = 1000
        for start in range(0, 10_000, chunk):
            rows = [
                {
                    "repo_id": rid,
                    "address": f"src/file{i}.py::fn_{i}",
                    "kind": "function",
                    # every third row is partially typed, the rest fully typed
                    "type_score": 1.0 if i % 3 != 0 else 0.5,
                    "return_is_any": False,
                    "params_total": 2,
                    "params_annotated": 2,
                    "params_with_any": 0,
                    "return_annotation": "str",
                    "ref": _REF,
                }
                for i in range(start, start + chunk)
            ]
            await db_session.execute(
                pg_insert(MusehubIntelType)
                .values(rows)
                .on_conflict_do_nothing()
            )
        await db_session.commit()

        # Only the HTTP round-trip is timed; seeding above is excluded.
        t0 = time.monotonic()
        r = await client.get("/typeuser/t23-perf/intel/type")
        elapsed = time.monotonic() - t0

        assert r.status_code == 200
        assert elapsed < 0.2, f"Route took {elapsed:.3f}s for 10k symbols (limit: 0.2s)"

    @pytest.mark.asyncio
    async def test_T24_db_query_uses_repo_index(
        self, db_session: AsyncSession
    ) -> None:
        """SELECT on musehub_intel_type must use ix_intel_type_repo index."""
        explain = await db_session.execute(
            sa.text(
                "EXPLAIN SELECT * FROM musehub_intel_type WHERE repo_id = 'x'"
            )
        )
        # Each result row holds one line of the plan in its first column.
        plan = " ".join(row[0] for row in explain.all())
        # NOTE(review): the `"Index" in plan` fallback accepts any index
        # scan, not specifically ix_intel_type_repo — confirm this is intended.
        assert "ix_intel_type_repo" in plan or "Index" in plan, (
            f"Query plan does not use ix_intel_type_repo:\n{plan}"
        )

    @pytest.mark.asyncio
    async def test_T25_batch_upsert_500_symbols_under_500ms(
        self, db_session: AsyncSession
    ) -> None:
        """Direct batch upsert of 500 rows must complete in < 500ms wall time."""
        repo = await create_repo(db_session, owner="typeuser", slug="t25-batch")
        rid = str(repo.repo_id)
        rows = [
            {
                "repo_id": rid,
                "address": f"src/f{i}.py::fn",
                "kind": "function",
                "type_score": 0.9,
                "return_is_any": False,
                "params_total": 1,
                "params_annotated": 1,
                "params_with_any": 0,
                "return_annotation": None,
                "ref": _REF,
            }
            for i in range(500)
        ]
        # The timed window covers both the multi-row insert and the commit.
        t0 = time.monotonic()
        await db_session.execute(
            pg_insert(MusehubIntelType)
            .values(rows)
            .on_conflict_do_nothing()
        )
        await db_session.commit()
        elapsed = time.monotonic() - t0
        assert elapsed < 0.5, f"500-row batch took {elapsed:.3f}s (limit: 0.5s)"
| 563 | |
| 564 | |
| 565 | # ───────────────────────────────────────────────────────────────────────────── |
| 566 | # Layer T7 — Security |
| 567 | # ───────────────────────────────────────────────────────────────────────────── |
| 568 | |
class TestSecurity:
    """Layer T7 — output escaping, query-parameter validation, unknown repos."""

    @pytest.mark.asyncio
    async def test_T26_xss_in_address_is_escaped(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """XSS payload in address must be HTML-escaped in the response."""
        repo = await create_repo(db_session, owner="typeuser", slug="t26-xss")
        rid = str(repo.repo_id)
        xss = '<script>alert(1)</script>'
        # Prefix with a file path so the payload sits where a symbol name
        # normally would in an address.
        await _insert_type_row(db_session, rid,
                               f"src/x.py::{xss}", type_score=0.0)
        await db_session.commit()

        r = await client.get("/typeuser/t26-xss/intel/type")
        assert r.status_code == 200
        # Jinja2 autoescape must convert <script> to &lt;script&gt;.
        # The raw executable tag must not appear unescaped.
        assert "<script>alert" not in r.text, "XSS in address not escaped"

    @pytest.mark.asyncio
    async def test_T27_xss_in_return_annotation_is_escaped(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """XSS payload in return_annotation must be HTML-escaped."""
        repo = await create_repo(db_session, owner="typeuser", slug="t27-xss-ret")
        rid = str(repo.repo_id)
        await _insert_type_row(db_session, rid, "src/x.py::fn",
                               return_annotation='<img src=x onerror=alert(1)>')
        await db_session.commit()

        r = await client.get("/typeuser/t27-xss-ret/intel/type")
        assert r.status_code == 200
        # Raw unescaped tag must not appear; the escaped form &lt;img ... is safe.
        assert "<img src=x onerror" not in r.text, "XSS in return_annotation not escaped"

    @pytest.mark.asyncio
    async def test_T28_unknown_tier_param_treated_as_all(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """?tier=unknown must return 200 (treated as 'all'), not 400/500."""
        r = await client.get("/typeuser/type-e2e/intel/type?tier=garbage")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_T29_non_integer_top_param_returns_422(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """?top=notanumber must be rejected with 422 (FastAPI type validation)."""
        r = await client.get("/typeuser/type-e2e/intel/type?top=notanumber")
        assert r.status_code == 422

    @pytest.mark.asyncio
    async def test_T30_private_repo_returns_403_or_404_unauthenticated(
        self, client: AsyncClient
    ) -> None:
        """A non-existent repo path must not return 200 or 500.

        Despite the test name, no private repo is created here: the request
        targets an owner/slug pair that this test never inserts.
        """
        r = await client.get("/nobody/no-such-repo/intel/type")
        assert r.status_code in (403, 404)
| 629 | |
| 630 | |
| 631 | # --------------------------------------------------------------------------- |
| 632 | # Internal helpers |
| 633 | # --------------------------------------------------------------------------- |
| 634 | |
| 635 | def _mock_process(stdout: str, returncode: int = 0) -> AsyncMock: |
| 636 | proc = AsyncMock() |
| 637 | proc.returncode = returncode |
| 638 | proc.communicate = AsyncMock(return_value=(stdout.encode(), b"")) |
| 639 | return proc |
| 640 | |
| 641 | |
| 642 | async def _make_repo_via_client(client: AsyncClient, owner: str, slug: str) -> str: |
| 643 | """Return slug — the caller constructs the path externally.""" |
| 644 | return slug |
File History
1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠
21 days ago