"""Type Health dashboard — full 7-tier test suite (issue #18). Tests are written TDD-first: all tests in this file must be RED before Phase 3–5 implementation begins, then GREEN after. Tiers: T01–T03 Layer T1 — DB extension (return_annotation column) T04–T05 Layer T2 — Provider batch performance T06–T14 Layer T3 — Route (unit / integration) T15–T19 Layer T4 — E2E (HTML body assertions) T20–T22 Layer T5 — State integrity T23–T25 Layer T6 — Performance T26–T30 Layer T7 — Security """ from __future__ import annotations import time from unittest.mock import AsyncMock, patch import typing import pytest import pytest_asyncio import sqlalchemy as sa from httpx import AsyncClient from sqlalchemy.engine import CursorResult from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from musehub.db.musehub_intel_models import MusehubIntelType from musehub.db.musehub_repo_models import MusehubRepo from tests.factories import create_repo from muse.core.types import long_id _REF = long_id("a" * 64) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- async def _insert_type_row( session: AsyncSession, repo_id: str, address: str, kind: str = "function", type_score: float = 1.0, return_is_any: bool = False, params_total: int = 2, params_annotated: int = 2, params_with_any: int = 0, return_annotation: str | None = "str", ) -> None: await session.execute( pg_insert(MusehubIntelType) .values( repo_id=repo_id, address=address, kind=kind, type_score=type_score, return_is_any=return_is_any, params_total=params_total, params_annotated=params_annotated, params_with_any=params_with_any, return_annotation=return_annotation, ref=_REF, ) .on_conflict_do_update( index_elements=["repo_id", "address"], set_={ "type_score": type_score, "return_annotation": return_annotation, }, ) ) @pytest_asyncio.fixture async def type_repo(db_session: AsyncSession) -> MusehubRepo: """Repo with a mix of fully-typed, partial, untyped, and any-polluted symbols.""" repo = await create_repo(db_session, owner="typeuser", slug="type-e2e") rid = str(repo.repo_id) # fully typed (score=1.0) await _insert_type_row(db_session, rid, "src/a.py::fn_full", type_score=1.0, return_annotation="str") # partial (score=0.75) await _insert_type_row(db_session, rid, "src/b.py::fn_partial", kind="method", type_score=0.75, params_total=4, params_annotated=3, return_annotation="None") # untyped (score=0.0) await _insert_type_row(db_session, rid, "src/c.py::fn_untyped", type_score=0.0, params_annotated=0, return_annotation=None) # any-polluted (has params_with_any) await _insert_type_row(db_session, rid, "src/d.py::fn_any", type_score=0.75, return_is_any=False, params_with_any=1, return_annotation="Any") await db_session.commit() return repo # ───────────────────────────────────────────────────────────────────────────── # Layer T1 — DB extension # ───────────────────────────────────────────────────────────────────────────── class TestDBExtension: def test_T01_return_annotation_column_exists_on_model(self) -> None: """MusehubIntelType must have a return_annotation mapped column.""" cols = {c.key for c in sa.inspect(MusehubIntelType).mapper.column_attrs} assert "return_annotation" in cols, ( "return_annotation column missing from MusehubIntelType" ) def test_T02_return_annotation_is_nullable(self) -> None: """return_annotation must be nullable (existing rows have no value).""" col = MusehubIntelType.__table__.c["return_annotation"] assert col.nullable, "return_annotation must be nullable" @pytest.mark.asyncio async def test_T03_return_annotation_stored_and_retrieved( self, db_session: AsyncSession ) -> None: """Inserting a row with return_annotation persists and round-trips.""" repo = await create_repo(db_session, owner="typeuser", slug="t03") await _insert_type_row(db_session, str(repo.repo_id), "src/x.py::fn", return_annotation="list[str]") await db_session.commit() row = await db_session.scalar( sa.select(MusehubIntelType).where( MusehubIntelType.repo_id == str(repo.repo_id), MusehubIntelType.address == "src/x.py::fn", ) ) assert row is not None assert row.return_annotation == "list[str]" # ───────────────────────────────────────────────────────────────────────────── # Layer T2 — Provider batch performance # ───────────────────────────────────────────────────────────────────────────── class TestProviderBatch: @pytest.mark.asyncio async def test_T04_type_provider_issues_one_sql_per_chunk( self, db_session: AsyncSession ) -> None: """TypeProvider must use batch upsert, not one execute per symbol.""" from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY repo = await create_repo(db_session, owner="typeuser", slug="t04") ref = _REF symbols = [ { "address": f"src/m{i}.py::fn", "kind": "function", "return_annotation": "str", "return_is_any": False, "params_total": 1, "params_annotated": 1, "params_with_any": 0, "type_score": 1.0, } for i in range(50) ] muse_out = __import__("json").dumps({"symbols": symbols}) execute_calls: list[sa.Executable] = [] original_execute = db_session.execute async def counting_execute(stmt: sa.Executable, *args: typing.Any, **kwargs: typing.Any) -> CursorResult[typing.Any]: execute_calls.append(stmt) return await original_execute(stmt, *args, **kwargs) with patch("asyncio.create_subprocess_exec", return_value=_mock_process(muse_out)): db_session.execute = counting_execute # type: ignore[method-assign] await _PROVIDER_REGISTRY["intel.code.type"].compute( db_session, repo.repo_id, ref, {"head": ref, "owner": repo.owner, "slug": repo.slug}, ) db_session.execute = original_execute # type: ignore[method-assign] # 50 symbols fit in one chunk of 1000 — expect exactly 1 upsert execute upsert_calls = [ c for c in execute_calls if hasattr(c, "is_dml") or "INSERT" in str(type(c).__name__).upper() or "insert" in str(c).lower() ] assert len(upsert_calls) == 1, ( f"Expected 1 batch upsert execute for 50 symbols, got {len(upsert_calls)}" ) @pytest.mark.asyncio async def test_T05_upsert_500_symbols_under_500ms( self, db_session: AsyncSession ) -> None: """Batch-upserting 500 symbols must complete in under 500ms.""" from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY repo = await create_repo(db_session, owner="typeuser", slug="t05") symbols = [ { "address": f"src/file{i}.py::fn_{i}", "kind": "function", "return_annotation": "int", "return_is_any": False, "params_total": 2, "params_annotated": 2, "params_with_any": 0, "type_score": 1.0, } for i in range(500) ] muse_out = __import__("json").dumps({"symbols": symbols}) t0 = time.monotonic() with patch("asyncio.create_subprocess_exec", return_value=_mock_process(muse_out)): await _PROVIDER_REGISTRY["intel.code.type"].compute( db_session, repo.repo_id, _REF, {"head": _REF, "owner": repo.owner, "slug": repo.slug}, ) elapsed = time.monotonic() - t0 assert elapsed < 0.5, f"500-symbol batch took {elapsed:.3f}s (limit: 0.5s)" # ───────────────────────────────────────────────────────────────────────────── # Layer T3 — Route (unit / integration) # ───────────────────────────────────────────────────────────────────────────── class TestRoute: @pytest.mark.asyncio async def test_T06_returns_200_with_empty_repo( self, client: AsyncClient, db_session: AsyncSession ) -> None: """Route must return 200 even when musehub_intel_type has no rows.""" await create_repo(db_session, owner="typeuser", slug="t06-empty") await db_session.commit() r = await client.get("/typeuser/t06-empty/intel/type") assert r.status_code == 200 @pytest.mark.asyncio async def test_T07_returns_200_with_data( self, client: AsyncClient, type_repo: MusehubRepo ) -> None: """Route returns 200 when rows exist.""" r = await client.get(f"/typeuser/type-e2e/intel/type") assert r.status_code == 200 @pytest.mark.asyncio async def test_T08_summary_stats_match_db_counts( self, client: AsyncClient, type_repo: MusehubRepo ) -> None: """Coverage fraction and tier counts must be derived from DB, not hardcoded.""" r = await client.get("/typeuser/type-e2e/intel/type") assert r.status_code == 200 body = r.text # 1 fully typed out of 4 total → 25.0% assert "25" in body, "coverage_pct (25%) not found in response" # 1 untyped symbol assert "fn_untyped" in body or "1" in body @pytest.mark.asyncio async def test_T09_filter_tier_untyped( self, client: AsyncClient, type_repo: MusehubRepo ) -> None: """?tier=untyped returns only symbols with type_score < 0.5.""" r = await client.get("/typeuser/type-e2e/intel/type?tier=untyped") assert r.status_code == 200 assert "fn_untyped" in r.text assert "fn_full" not in r.text @pytest.mark.asyncio async def test_T10_filter_tier_partial( self, client: AsyncClient, type_repo: MusehubRepo ) -> None: """?tier=partial returns only symbols with 0.5 ≤ type_score < 1.0.""" r = await client.get("/typeuser/type-e2e/intel/type?tier=partial") assert r.status_code == 200 assert "fn_partial" in r.text assert "fn_untyped" not in r.text assert "fn_full" not in r.text @pytest.mark.asyncio async def test_T11_filter_tier_any( self, client: AsyncClient, type_repo: MusehubRepo ) -> None: """?tier=any returns only symbols with return_is_any or params_with_any > 0.""" r = await client.get("/typeuser/type-e2e/intel/type?tier=any") assert r.status_code == 200 assert "fn_any" in r.text assert "fn_full" not in r.text @pytest.mark.asyncio async def test_T12_filter_kind_function( self, client: AsyncClient, type_repo: MusehubRepo ) -> None: """?kind=function returns only function-kind symbols.""" r = await client.get("/typeuser/type-e2e/intel/type?kind=function") assert r.status_code == 200 # fn_full is kind=function; fn_partial is kind=method assert "fn_full" in r.text assert "fn_partial" not in r.text @pytest.mark.asyncio async def test_T13_default_sort_score_ascending( self, client: AsyncClient, type_repo: MusehubRepo ) -> None: """Default sort is type_score ASC (worst-typed first).""" r = await client.get("/typeuser/type-e2e/intel/type") assert r.status_code == 200 body = r.text pos_untyped = body.find("fn_untyped") pos_full = body.find("fn_full") assert pos_untyped != -1 and pos_full != -1 assert pos_untyped < pos_full, "Untyped symbol must appear before fully-typed" @pytest.mark.asyncio async def test_T14_top_param_limits_results( self, client: AsyncClient, db_session: AsyncSession ) -> None: """?top=20 returns at most 20 symbols even when 25 exist.""" repo = await create_repo(db_session, owner="typeuser", slug="t14-top") rid = str(repo.repo_id) for i in range(25): await _insert_type_row(db_session, rid, f"src/f{i}.py::fn_{i}", type_score=float(i) / 24) await db_session.commit() r = await client.get("/typeuser/t14-top/intel/type?top=20") assert r.status_code == 200 count = sum(1 for i in range(25) if f"src/f{i}.py::fn_{i}" in r.text) assert count <= 20, f"Expected ≤20 results for ?top=20, got {count}" # ───────────────────────────────────────────────────────────────────────────── # Layer T4 — E2E (HTML body assertions) # ───────────────────────────────────────────────────────────────────────────── class TestE2E: @pytest.mark.asyncio async def test_T15_coverage_fraction_rendered_as_pct( self, client: AsyncClient, type_repo: MusehubRepo ) -> None: """Coverage fraction (0.25) must be rendered as a percentage string.""" r = await client.get("/typeuser/type-e2e/intel/type") assert r.status_code == 200 # 1/4 fully typed = 25% assert "25" in r.text @pytest.mark.asyncio async def test_T16_symbol_address_in_html( self, client: AsyncClient, type_repo: MusehubRepo ) -> None: """Symbol addresses must appear verbatim in the HTML body.""" r = await client.get("/typeuser/type-e2e/intel/type") assert r.status_code == 200 assert "src/c.py::fn_untyped" in r.text @pytest.mark.asyncio async def test_T17_return_annotation_in_html( self, client: AsyncClient, type_repo: MusehubRepo ) -> None: """Non-null return_annotation must appear in the rendered HTML.""" r = await client.get("/typeuser/type-e2e/intel/type") assert r.status_code == 200 assert "list[str]" in r.text or "str" in r.text @pytest.mark.asyncio async def test_T18_any_badge_rendered_for_any_polluted_symbol( self, client: AsyncClient, type_repo: MusehubRepo ) -> None: """Any-pollution indicator must appear for symbols with params_with_any > 0.""" r = await client.get("/typeuser/type-e2e/intel/type") assert r.status_code == 200 # The any-badge or warning marker must be in the HTML assert "any" in r.text.lower() or "⚠" in r.text @pytest.mark.asyncio async def test_T19_dashboard_card_links_to_type_page( self, client: AsyncClient, type_repo: MusehubRepo ) -> None: """Intel dashboard card must include a link to /intel/type.""" r = await client.get("/typeuser/type-e2e/intel") assert r.status_code == 200 assert b"/intel/type" in r.content # ───────────────────────────────────────────────────────────────────────────── # Layer T5 — State integrity # ───────────────────────────────────────────────────────────────────────────── class TestStateIntegrity: @pytest.mark.asyncio async def test_T20_push_twice_produces_one_row_per_symbol( self, db_session: AsyncSession ) -> None: """Upserting the same address twice must not create duplicate rows.""" repo = await create_repo(db_session, owner="typeuser", slug="t20-dup") rid = str(repo.repo_id) addr = "src/a.py::fn" for _ in range(2): await _insert_type_row(db_session, rid, addr, type_score=1.0) await db_session.commit() rows = (await db_session.execute( sa.select(MusehubIntelType).where( MusehubIntelType.repo_id == rid ) )).scalars().all() assert len(rows) == 1, f"Expected 1 row, got {len(rows)} — upsert broken" @pytest.mark.asyncio async def test_T21_second_push_overwrites_type_score( self, db_session: AsyncSession ) -> None: """A second push with different type_score must overwrite the first.""" repo = await create_repo(db_session, owner="typeuser", slug="t21-overwrite") rid = str(repo.repo_id) addr = "src/a.py::fn" await _insert_type_row(db_session, rid, addr, type_score=0.5) await _insert_type_row(db_session, rid, addr, type_score=1.0) await db_session.commit() row = await db_session.scalar( sa.select(MusehubIntelType).where( MusehubIntelType.repo_id == rid, MusehubIntelType.address == addr, ) ) assert row is not None assert row.type_score == pytest.approx(1.0), ( f"Expected score 1.0 after second push, got {row.type_score}" ) @pytest.mark.asyncio async def test_T22_repo_delete_cascades_type_rows( self, db_session: AsyncSession ) -> None: """Deleting the repo must cascade-delete all musehub_intel_type rows.""" from musehub.db.musehub_repo_models import MusehubRepo repo = await create_repo(db_session, owner="typeuser", slug="t22-cascade") rid = str(repo.repo_id) await _insert_type_row(db_session, rid, "src/a.py::fn") await db_session.commit() await db_session.delete(repo) await db_session.commit() remaining = (await db_session.execute( sa.select(MusehubIntelType).where( MusehubIntelType.repo_id == rid ) )).scalars().all() assert not remaining, "Cascade delete failed — type rows remain after repo delete" # ───────────────────────────────────────────────────────────────────────────── # Layer T6 — Performance # ───────────────────────────────────────────────────────────────────────────── class TestPerformance: @pytest.mark.asyncio async def test_T23_route_responds_under_200ms_for_10k_symbols( self, client: AsyncClient, db_session: AsyncSession ) -> None: """Route must respond in < 200ms for a repo with 10,000 symbol rows.""" repo = await create_repo(db_session, owner="typeuser", slug="t23-perf") rid = str(repo.repo_id) # Insert 10k rows via direct batch insert chunk = 1000 for start in range(0, 10_000, chunk): rows = [ { "repo_id": rid, "address": f"src/file{i}.py::fn_{i}", "kind": "function", "type_score": 1.0 if i % 3 != 0 else 0.5, "return_is_any": False, "params_total": 2, "params_annotated": 2, "params_with_any": 0, "return_annotation": "str", "ref": _REF, } for i in range(start, start + chunk) ] await db_session.execute( pg_insert(MusehubIntelType) .values(rows) .on_conflict_do_nothing() ) await db_session.commit() t0 = time.monotonic() r = await client.get(f"/typeuser/t23-perf/intel/type") elapsed = time.monotonic() - t0 assert r.status_code == 200 assert elapsed < 0.2, f"Route took {elapsed:.3f}s for 10k symbols (limit: 0.2s)" @pytest.mark.asyncio async def test_T24_db_query_uses_repo_index( self, db_session: AsyncSession ) -> None: """SELECT on musehub_intel_type must use ix_intel_type_repo index.""" explain = await db_session.execute( sa.text( "EXPLAIN SELECT * FROM musehub_intel_type WHERE repo_id = 'x'" ) ) plan = " ".join(row[0] for row in explain.all()) assert "ix_intel_type_repo" in plan or "Index" in plan, ( f"Query plan does not use ix_intel_type_repo:\n{plan}" ) @pytest.mark.asyncio async def test_T25_batch_upsert_500_symbols_under_500ms( self, db_session: AsyncSession ) -> None: """Direct batch upsert of 500 rows must complete in < 500ms wall time.""" repo = await create_repo(db_session, owner="typeuser", slug="t25-batch") rid = str(repo.repo_id) rows = [ { "repo_id": rid, "address": f"src/f{i}.py::fn", "kind": "function", "type_score": 0.9, "return_is_any": False, "params_total": 1, "params_annotated": 1, "params_with_any": 0, "return_annotation": None, "ref": _REF, } for i in range(500) ] t0 = time.monotonic() await db_session.execute( pg_insert(MusehubIntelType) .values(rows) .on_conflict_do_nothing() ) await db_session.commit() elapsed = time.monotonic() - t0 assert elapsed < 0.5, f"500-row batch took {elapsed:.3f}s (limit: 0.5s)" # ───────────────────────────────────────────────────────────────────────────── # Layer T7 — Security # ───────────────────────────────────────────────────────────────────────────── class TestSecurity: @pytest.mark.asyncio async def test_T26_xss_in_address_is_escaped( self, client: AsyncClient, db_session: AsyncSession ) -> None: """XSS payload in address must be HTML-escaped in the response.""" repo = await create_repo(db_session, owner="typeuser", slug="t26-xss") rid = str(repo.repo_id) xss = '' # Truncate to fit VARCHAR(512) and make it a valid-ish address await _insert_type_row(db_session, rid, f"src/x.py::{xss[:40]}", type_score=0.0) await db_session.commit() r = await client.get("/typeuser/t26-xss/intel/type") assert r.status_code == 200 # Jinja2 autoescape must convert