"""Type Health dashboard — full 7-tier test suite (issue #18).
Tests are written TDD-first: all tests in this file must be RED before
Phase 3–5 implementation begins, then GREEN after.
Tiers:
T01–T03 Layer T1 — DB extension (return_annotation column)
T04–T05 Layer T2 — Provider batch performance
T06–T14 Layer T3 — Route (unit / integration)
T15–T19 Layer T4 — E2E (HTML body assertions)
T20–T22 Layer T5 — State integrity
T23–T25 Layer T6 — Performance
T26–T30 Layer T7 — Security
"""
from __future__ import annotations
import time
from unittest.mock import AsyncMock, patch
import typing
import pytest
import pytest_asyncio
import sqlalchemy as sa
from httpx import AsyncClient
from sqlalchemy.engine import CursorResult
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from musehub.db.musehub_intel_models import MusehubIntelType
from musehub.db.musehub_repo_models import MusehubRepo
from tests.factories import create_repo
from muse.core.types import long_id
# Shared ref value written into the ``ref`` column of every row inserted by this
# module and passed as the ref/head to the provider under test.
# NOTE(review): 64 x "a" presumably stands in for a commit hash; confirm against
# muse.core.types.long_id.
_REF = long_id("a" * 64)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _insert_type_row(
    session: AsyncSession,
    repo_id: str,
    address: str,
    kind: str = "function",
    type_score: float = 1.0,
    return_is_any: bool = False,
    params_total: int = 2,
    params_annotated: int = 2,
    params_with_any: int = 0,
    return_annotation: str | None = "str",
) -> None:
    """Upsert a single ``MusehubIntelType`` row for ``(repo_id, address)``.

    The row is inserted with the given column values and the module-level
    ``_REF`` as its ``ref``.  When a row with the same ``repo_id``/``address``
    already exists, only ``type_score`` and ``return_annotation`` are
    overwritten; every other column keeps its stored value.

    The statement is executed on ``session`` but not committed — callers
    commit themselves.
    """
    column_values = {
        "repo_id": repo_id,
        "address": address,
        "kind": kind,
        "type_score": type_score,
        "return_is_any": return_is_any,
        "params_total": params_total,
        "params_annotated": params_annotated,
        "params_with_any": params_with_any,
        "return_annotation": return_annotation,
        "ref": _REF,
    }
    insert_stmt = pg_insert(MusehubIntelType).values(**column_values)
    # On a key collision refresh just the score and the return annotation.
    upsert_stmt = insert_stmt.on_conflict_do_update(
        index_elements=["repo_id", "address"],
        set_={
            "type_score": type_score,
            "return_annotation": return_annotation,
        },
    )
    await session.execute(upsert_stmt)
@pytest_asyncio.fixture
async def type_repo(db_session: AsyncSession) -> MusehubRepo:
    """Create repo ``typeuser/type-e2e`` seeded with four type rows.

    The rows cover one fully-typed, one partially-typed, one untyped and one
    any-polluted symbol.  Everything is committed before the repo is returned.
    """
    repo = await create_repo(db_session, owner="typeuser", slug="type-e2e")
    rid = str(repo.repo_id)

    # (address, keyword overrides for _insert_type_row), inserted in this order.
    seed_rows: tuple[tuple[str, dict[str, typing.Any]], ...] = (
        # fully typed (score=1.0)
        (
            "src/a.py::fn_full",
            {"type_score": 1.0, "return_annotation": "str"},
        ),
        # partial (score=0.75)
        (
            "src/b.py::fn_partial",
            {
                "kind": "method",
                "type_score": 0.75,
                "params_total": 4,
                "params_annotated": 3,
                "return_annotation": "None",
            },
        ),
        # untyped (score=0.0)
        (
            "src/c.py::fn_untyped",
            {"type_score": 0.0, "params_annotated": 0, "return_annotation": None},
        ),
        # any-polluted (has params_with_any)
        (
            "src/d.py::fn_any",
            {
                "type_score": 0.75,
                "return_is_any": False,
                "params_with_any": 1,
                "return_annotation": "Any",
            },
        ),
    )
    for address, overrides in seed_rows:
        await _insert_type_row(db_session, rid, address, **overrides)

    await db_session.commit()
    return repo
# ─────────────────────────────────────────────────────────────────────────────
# Layer T1 — DB extension
# ─────────────────────────────────────────────────────────────────────────────
class TestDBExtension:
    """Layer T1 — the ``return_annotation`` column on ``MusehubIntelType``."""

    def test_T01_return_annotation_column_exists_on_model(self) -> None:
        """MusehubIntelType must have a return_annotation mapped column."""
        cols = {c.key for c in sa.inspect(MusehubIntelType).mapper.column_attrs}
        assert "return_annotation" in cols, (
            "return_annotation column missing from MusehubIntelType"
        )

    def test_T02_return_annotation_is_nullable(self) -> None:
        """return_annotation must be nullable (existing rows have no value)."""
        col = MusehubIntelType.__table__.c["return_annotation"]
        assert col.nullable, "return_annotation must be nullable"

    @pytest.mark.asyncio
    async def test_T03_return_annotation_stored_and_retrieved(
        self, db_session: AsyncSession
    ) -> None:
        """Inserting a row with return_annotation persists and round-trips."""
        repo = await create_repo(db_session, owner="typeuser", slug="t03")
        # Read the id once, before commit(): if the session expires objects on
        # commit, touching repo.repo_id afterwards would need an implicit
        # refresh, which an AsyncSession cannot perform from attribute access.
        rid = str(repo.repo_id)
        address = "src/x.py::fn"

        await _insert_type_row(
            db_session, rid, address, return_annotation="list[str]"
        )
        await db_session.commit()

        row = await db_session.scalar(
            sa.select(MusehubIntelType).where(
                MusehubIntelType.repo_id == rid,
                MusehubIntelType.address == address,
            )
        )
        assert row is not None
        assert row.return_annotation == "list[str]"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T2 — Provider batch performance
# ─────────────────────────────────────────────────────────────────────────────
class TestProviderBatch:
    """Layer T2 — the ``intel.code.type`` provider must persist symbols in batches.

    Both tests replace ``asyncio.create_subprocess_exec`` with a mock process
    whose output is a JSON document of the form ``{"symbols": [...]}`` and then
    call the provider's ``compute`` against the real ``db_session``.
    """

    @pytest.mark.asyncio
    async def test_T04_type_provider_issues_one_sql_per_chunk(
        self, db_session: AsyncSession
    ) -> None:
        """TypeProvider must use batch upsert, not one execute per symbol."""
        import json

        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="typeuser", slug="t04")
        ref = _REF
        symbols = [
            {
                "address": f"src/m{i}.py::fn",
                "kind": "function",
                "return_annotation": "str",
                "return_is_any": False,
                "params_total": 1,
                "params_annotated": 1,
                "params_with_any": 0,
                "type_score": 1.0,
            }
            for i in range(50)
        ]
        muse_out = json.dumps({"symbols": symbols})

        execute_calls: list[sa.Executable] = []
        original_execute = db_session.execute

        async def counting_execute(
            stmt: sa.Executable, *args: typing.Any, **kwargs: typing.Any
        ) -> CursorResult[typing.Any]:
            # Record the statement, then delegate to the real execute.
            execute_calls.append(stmt)
            return await original_execute(stmt, *args, **kwargs)

        with patch("asyncio.create_subprocess_exec",
                   return_value=_mock_process(muse_out)):
            db_session.execute = counting_execute  # type: ignore[method-assign]
            try:
                await _PROVIDER_REGISTRY["intel.code.type"].compute(
                    db_session, repo.repo_id, ref,
                    {"head": ref, "owner": repo.owner, "slug": repo.slug},
                )
            finally:
                # Always undo the monkeypatch, even when compute() raises, so
                # the session is not left instrumented for later use.
                db_session.execute = original_execute  # type: ignore[method-assign]

        # 50 symbols fit in one chunk of 1000 — expect exactly 1 upsert execute.
        # Only INSERT statements count: ``is_insert`` is set on INSERT
        # constructs, whereas ``is_dml`` exists on every statement and would
        # match SELECTs too.  Textual statements are matched on their SQL.
        upsert_calls = [
            stmt for stmt in execute_calls
            if getattr(stmt, "is_insert", False)
            or (getattr(stmt, "is_text", False) and "insert" in str(stmt).lower())
        ]
        assert len(upsert_calls) == 1, (
            f"Expected 1 batch upsert execute for 50 symbols, got {len(upsert_calls)}"
        )

    @pytest.mark.asyncio
    async def test_T05_upsert_500_symbols_under_500ms(
        self, db_session: AsyncSession
    ) -> None:
        """Batch-upserting 500 symbols must complete in under 500ms."""
        import json

        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="typeuser", slug="t05")
        symbols = [
            {
                "address": f"src/file{i}.py::fn_{i}",
                "kind": "function",
                "return_annotation": "int",
                "return_is_any": False,
                "params_total": 2,
                "params_annotated": 2,
                "params_with_any": 0,
                "type_score": 1.0,
            }
            for i in range(500)
        ]
        muse_out = json.dumps({"symbols": symbols})

        # Only the provider call is timed; fixture setup above is excluded.
        t0 = time.monotonic()
        with patch("asyncio.create_subprocess_exec",
                   return_value=_mock_process(muse_out)):
            await _PROVIDER_REGISTRY["intel.code.type"].compute(
                db_session, repo.repo_id, _REF,
                {"head": _REF, "owner": repo.owner, "slug": repo.slug},
            )
        elapsed = time.monotonic() - t0
        assert elapsed < 0.5, f"500-symbol batch took {elapsed:.3f}s (limit: 0.5s)"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T3 — Route (unit / integration)
# ─────────────────────────────────────────────────────────────────────────────
class TestRoute:
    """Layer T3 — ``GET /{owner}/{slug}/intel/type`` status, filters, sort and limit.

    Tests taking the ``type_repo`` fixture run against repo
    ``typeuser/type-e2e`` with its four seeded symbols (``fn_full``,
    ``fn_partial``, ``fn_untyped``, ``fn_any``).
    """

    @pytest.mark.asyncio
    async def test_T06_returns_200_with_empty_repo(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Route must return 200 even when musehub_intel_type has no rows."""
        await create_repo(db_session, owner="typeuser", slug="t06-empty")
        await db_session.commit()
        r = await client.get("/typeuser/t06-empty/intel/type")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_T07_returns_200_with_data(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """Route returns 200 when rows exist."""
        r = await client.get("/typeuser/type-e2e/intel/type")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_T08_summary_stats_match_db_counts(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """Coverage fraction and tier counts must be derived from DB, not hardcoded."""
        r = await client.get("/typeuser/type-e2e/intel/type")
        assert r.status_code == 200
        body = r.text
        # 1 fully typed out of 4 total → 25.0%
        assert "25" in body, "coverage_pct (25%) not found in response"
        # The single untyped symbol must be listed.  (Checking for a bare "1"
        # in the page would pass on practically any HTML document.)
        assert "fn_untyped" in body, "untyped symbol not found in response"

    @pytest.mark.asyncio
    async def test_T09_filter_tier_untyped(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """?tier=untyped returns only symbols with type_score < 0.5."""
        r = await client.get("/typeuser/type-e2e/intel/type?tier=untyped")
        assert r.status_code == 200
        assert "fn_untyped" in r.text
        assert "fn_full" not in r.text

    @pytest.mark.asyncio
    async def test_T10_filter_tier_partial(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """?tier=partial returns only symbols with 0.5 ≤ type_score < 1.0."""
        r = await client.get("/typeuser/type-e2e/intel/type?tier=partial")
        assert r.status_code == 200
        assert "fn_partial" in r.text
        assert "fn_untyped" not in r.text
        assert "fn_full" not in r.text

    @pytest.mark.asyncio
    async def test_T11_filter_tier_any(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """?tier=any returns only symbols with return_is_any or params_with_any > 0."""
        r = await client.get("/typeuser/type-e2e/intel/type?tier=any")
        assert r.status_code == 200
        assert "fn_any" in r.text
        assert "fn_full" not in r.text

    @pytest.mark.asyncio
    async def test_T12_filter_kind_function(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """?kind=function returns only function-kind symbols."""
        r = await client.get("/typeuser/type-e2e/intel/type?kind=function")
        assert r.status_code == 200
        # fn_full is kind=function; fn_partial is kind=method
        assert "fn_full" in r.text
        assert "fn_partial" not in r.text

    @pytest.mark.asyncio
    async def test_T13_default_sort_score_ascending(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """Default sort is type_score ASC (worst-typed first)."""
        r = await client.get("/typeuser/type-e2e/intel/type")
        assert r.status_code == 200
        body = r.text
        # Compare the first occurrence of each symbol name in the page.
        pos_untyped = body.find("fn_untyped")
        pos_full = body.find("fn_full")
        assert pos_untyped != -1 and pos_full != -1
        assert pos_untyped < pos_full, "Untyped symbol must appear before fully-typed"

    @pytest.mark.asyncio
    async def test_T14_top_param_limits_results(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """?top=20 returns at most 20 symbols even when 25 exist."""
        repo = await create_repo(db_session, owner="typeuser", slug="t14-top")
        rid = str(repo.repo_id)
        for i in range(25):
            await _insert_type_row(db_session, rid,
                                   f"src/f{i}.py::fn_{i}", type_score=float(i) / 24)
        await db_session.commit()
        r = await client.get("/typeuser/t14-top/intel/type?top=20")
        assert r.status_code == 200
        # Count how many of the 25 seeded addresses appear in the page.
        count = sum(1 for i in range(25) if f"src/f{i}.py::fn_{i}" in r.text)
        # Guard against a vacuous pass: a page listing no symbols at all would
        # otherwise satisfy the upper bound.
        assert count > 0, "No seeded symbols rendered for ?top=20"
        assert count <= 20, f"Expected ≤20 results for ?top=20, got {count}"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T4 — E2E (HTML body assertions)
# ─────────────────────────────────────────────────────────────────────────────
class TestE2E:
    """Layer T4 — assertions on the rendered HTML for the seeded ``type_repo``."""

    @pytest.mark.asyncio
    async def test_T15_coverage_fraction_rendered_as_pct(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """Coverage fraction (0.25) must be rendered as a percentage string."""
        response = await client.get("/typeuser/type-e2e/intel/type")
        assert response.status_code == 200
        html = response.text
        # One of the four seeded symbols is fully typed → 25%.
        assert "25" in html

    @pytest.mark.asyncio
    async def test_T16_symbol_address_in_html(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """Symbol addresses must appear verbatim in the HTML body."""
        response = await client.get("/typeuser/type-e2e/intel/type")
        assert response.status_code == 200
        expected_address = "src/c.py::fn_untyped"
        assert expected_address in response.text

    @pytest.mark.asyncio
    async def test_T17_return_annotation_in_html(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """Non-null return_annotation must appear in the rendered HTML."""
        response = await client.get("/typeuser/type-e2e/intel/type")
        assert response.status_code == 200
        html = response.text
        # Either accepted annotation spelling satisfies the check.
        assert any(annotation in html for annotation in ("list[str]", "str"))

    @pytest.mark.asyncio
    async def test_T18_any_badge_rendered_for_any_polluted_symbol(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """Any-pollution indicator must appear for symbols with params_with_any > 0."""
        response = await client.get("/typeuser/type-e2e/intel/type")
        assert response.status_code == 200
        html = response.text
        # Accept either the warning glyph or the word "any" (case-insensitive).
        has_warning_glyph = "⚠" in html
        mentions_any = "any" in html.lower()
        assert mentions_any or has_warning_glyph

    @pytest.mark.asyncio
    async def test_T19_dashboard_card_links_to_type_page(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """Intel dashboard card must include a link to /intel/type."""
        response = await client.get("/typeuser/type-e2e/intel")
        assert response.status_code == 200
        raw_body = response.content
        assert b"/intel/type" in raw_body
# ─────────────────────────────────────────────────────────────────────────────
# Layer T5 — State integrity
# ─────────────────────────────────────────────────────────────────────────────
class TestStateIntegrity:
    """Layer T5 — upsert idempotence, overwrite semantics and delete cascade."""

    @pytest.mark.asyncio
    async def test_T20_push_twice_produces_one_row_per_symbol(
        self, db_session: AsyncSession
    ) -> None:
        """Upserting the same address twice must not create duplicate rows."""
        repo = await create_repo(db_session, owner="typeuser", slug="t20-dup")
        rid = str(repo.repo_id)
        addr = "src/a.py::fn"
        for _ in range(2):
            await _insert_type_row(db_session, rid, addr, type_score=1.0)
        await db_session.commit()
        # The repo was created just for this test, so every row under its id
        # comes from the two upserts above.
        rows = (await db_session.execute(
            sa.select(MusehubIntelType).where(
                MusehubIntelType.repo_id == rid
            )
        )).scalars().all()
        assert len(rows) == 1, f"Expected 1 row, got {len(rows)} — upsert broken"

    @pytest.mark.asyncio
    async def test_T21_second_push_overwrites_type_score(
        self, db_session: AsyncSession
    ) -> None:
        """A second push with different type_score must overwrite the first."""
        repo = await create_repo(db_session, owner="typeuser", slug="t21-overwrite")
        rid = str(repo.repo_id)
        addr = "src/a.py::fn"
        await _insert_type_row(db_session, rid, addr, type_score=0.5)
        await _insert_type_row(db_session, rid, addr, type_score=1.0)
        await db_session.commit()
        row = await db_session.scalar(
            sa.select(MusehubIntelType).where(
                MusehubIntelType.repo_id == rid,
                MusehubIntelType.address == addr,
            )
        )
        assert row is not None
        assert row.type_score == pytest.approx(1.0), (
            f"Expected score 1.0 after second push, got {row.type_score}"
        )

    @pytest.mark.asyncio
    async def test_T22_repo_delete_cascades_type_rows(
        self, db_session: AsyncSession
    ) -> None:
        """Deleting the repo must cascade-delete all musehub_intel_type rows."""
        repo = await create_repo(db_session, owner="typeuser", slug="t22-cascade")
        # Keep the id as a plain string: the repo object is deleted below.
        rid = str(repo.repo_id)
        await _insert_type_row(db_session, rid, "src/a.py::fn")
        await db_session.commit()

        await db_session.delete(repo)
        await db_session.commit()

        remaining = (await db_session.execute(
            sa.select(MusehubIntelType).where(
                MusehubIntelType.repo_id == rid
            )
        )).scalars().all()
        assert not remaining, "Cascade delete failed — type rows remain after repo delete"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T6 — Performance
# ─────────────────────────────────────────────────────────────────────────────
class TestPerformance:
    """Layer T6 — wall-clock budgets and query-plan checks for the type table."""

    @pytest.mark.asyncio
    async def test_T23_route_responds_under_200ms_for_10k_symbols(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Route must respond in < 200ms for a repo with 10,000 symbol rows."""
        repo = await create_repo(db_session, owner="typeuser", slug="t23-perf")
        rid = str(repo.repo_id)

        # Seed 10k rows in batches of 1000; every third row gets score 0.5.
        batch_size = 1000
        for offset in range(0, 10_000, batch_size):
            batch = [
                {
                    "repo_id": rid,
                    "address": f"src/file{idx}.py::fn_{idx}",
                    "kind": "function",
                    "type_score": 0.5 if idx % 3 == 0 else 1.0,
                    "return_is_any": False,
                    "params_total": 2,
                    "params_annotated": 2,
                    "params_with_any": 0,
                    "return_annotation": "str",
                    "ref": _REF,
                }
                for idx in range(offset, offset + batch_size)
            ]
            seed_stmt = pg_insert(MusehubIntelType).values(batch)
            await db_session.execute(seed_stmt.on_conflict_do_nothing())
        await db_session.commit()

        # Time the HTTP round trip only; seeding is excluded.
        started = time.monotonic()
        response = await client.get("/typeuser/t23-perf/intel/type")
        elapsed = time.monotonic() - started

        assert response.status_code == 200
        assert elapsed < 0.2, f"Route took {elapsed:.3f}s for 10k symbols (limit: 0.2s)"

    @pytest.mark.asyncio
    async def test_T24_db_query_uses_repo_index(
        self, db_session: AsyncSession
    ) -> None:
        """SELECT on musehub_intel_type must use ix_intel_type_repo index."""
        explain_sql = sa.text(
            "EXPLAIN SELECT * FROM musehub_intel_type WHERE repo_id = 'x'"
        )
        explain = await db_session.execute(explain_sql)
        # Flatten the plan (first column of each returned row) into one string.
        plan_lines = [row[0] for row in explain.all()]
        plan = " ".join(plan_lines)
        uses_index = "ix_intel_type_repo" in plan or "Index" in plan
        assert uses_index, (
            f"Query plan does not use ix_intel_type_repo:\n{plan}"
        )

    @pytest.mark.asyncio
    async def test_T25_batch_upsert_500_symbols_under_500ms(
        self, db_session: AsyncSession
    ) -> None:
        """Direct batch upsert of 500 rows must complete in < 500ms wall time."""
        repo = await create_repo(db_session, owner="typeuser", slug="t25-batch")
        rid = str(repo.repo_id)
        payload = [
            {
                "repo_id": rid,
                "address": f"src/f{idx}.py::fn",
                "kind": "function",
                "type_score": 0.9,
                "return_is_any": False,
                "params_total": 1,
                "params_annotated": 1,
                "params_with_any": 0,
                "return_annotation": None,
                "ref": _REF,
            }
            for idx in range(500)
        ]

        # The timed window covers both the execute and the commit.
        started = time.monotonic()
        batch_stmt = pg_insert(MusehubIntelType).values(payload)
        await db_session.execute(batch_stmt.on_conflict_do_nothing())
        await db_session.commit()
        elapsed = time.monotonic() - started

        assert elapsed < 0.5, f"500-row batch took {elapsed:.3f}s (limit: 0.5s)"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T7 — Security
# ─────────────────────────────────────────────────────────────────────────────
class TestSecurity:
@pytest.mark.asyncio
async def test_T26_xss_in_address_is_escaped(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""XSS payload in address must be HTML-escaped in the response."""
repo = await create_repo(db_session, owner="typeuser", slug="t26-xss")
rid = str(repo.repo_id)
xss = ''
# Truncate to fit VARCHAR(512) and make it a valid-ish address
await _insert_type_row(db_session, rid,
f"src/x.py::{xss[:40]}", type_score=0.0)
await db_session.commit()
r = await client.get("/typeuser/t26-xss/intel/type")
assert r.status_code == 200
# Jinja2 autoescape must convert