"""API Surface dashboard — full 7-tier test suite (issue #19).
Tests are written TDD-first: all tests in this file must be RED before
Phase 3–5 implementation begins, then GREEN after.
Tiers:
T01–T03 Layer T1 — DB model (composite PK, nullable fields, cascade)
T04–T06 Layer T2 — Provider batch performance
T07–T15 Layer T3 — Route (unit / integration)
T16–T19 Layer T4 — E2E (HTML body assertions)
T20–T22 Layer T5 — State integrity
T23–T25 Layer T6 — Performance
T26–T30 Layer T7 — Security
"""
from __future__ import annotations
import time
from unittest.mock import AsyncMock, patch
import typing
import pytest
import sqlalchemy as sa
from httpx import AsyncClient
from sqlalchemy.engine import CursorResult
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from musehub.db.musehub_intel_models import MusehubIntelApiSurface
from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubObject, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef
from musehub.types.json_types import JSONObject
from tests.factories import create_repo
from muse.core.types import long_id
_REF = long_id("b" * 64)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _insert_as_row(
    session: AsyncSession,
    repo_id: str,
    address: str,
    kind: str = "function",
    signature_id: str | None = None,
    visibility: str = "public",
    ref: str = _REF,
) -> None:
    """Insert or update a single ``musehub_intel_api_surface`` row.

    The conflict target is ``(repo_id, address)``.  When a row with that key
    already exists, its ``kind``, ``signature_id``, ``visibility`` and ``ref``
    columns are overwritten with the values passed here.  This helper does
    not commit; the caller decides when to do so.
    """
    # Columns rewritten when the (repo_id, address) key collides.
    mutable_columns = {
        "kind": kind,
        "signature_id": signature_id,
        "visibility": visibility,
        "ref": ref,
    }
    insert_stmt = pg_insert(MusehubIntelApiSurface).values(
        repo_id=repo_id,
        address=address,
        **mutable_columns,
    )
    upsert_stmt = insert_stmt.on_conflict_do_update(
        index_elements=["repo_id", "address"],
        set_=mutable_columns,
    )
    await session.execute(upsert_stmt)
import pytest_asyncio
@pytest_asyncio.fixture
async def as_repo(db_session: AsyncSession) -> MusehubRepo:
    """Create the ``asuser/as-e2e`` repo and seed it with five symbols.

    One row is inserted for each of the kinds ``function``,
    ``async_function``, ``class``, ``method`` and ``async_method``; the
    session is committed before the repo is returned.
    """
    repo = await create_repo(db_session, owner="asuser", slug="as-e2e")
    repo_key = str(repo.repo_id)
    # (address, kind) pairs, inserted in this order.
    seeded_symbols = (
        ("src/billing.py::compute_total", "function"),
        ("src/billing.py::async_fetch", "async_function"),
        ("src/models.py::UserRecord", "class"),
        ("src/models.py::UserRecord.save", "method"),
        ("src/models.py::UserRecord.async_load", "async_method"),
    )
    for symbol_address, symbol_kind in seeded_symbols:
        await _insert_as_row(db_session, repo_key, symbol_address, kind=symbol_kind)
    await db_session.commit()
    return repo
# ─────────────────────────────────────────────────────────────────────────────
# Layer T1 — DB model
# ─────────────────────────────────────────────────────────────────────────────
class TestDBModel:
    """Layer T1 — checks on the ``MusehubIntelApiSurface`` model itself."""

    def test_T01_model_has_required_columns(self) -> None:
        """Every expected column name must be a mapped column attribute."""
        mapper = sa.inspect(MusehubIntelApiSurface).mapper
        mapped_keys = {attr.key for attr in mapper.column_attrs}
        expected = ("repo_id", "address", "kind", "signature_id", "visibility", "ref")
        for required in expected:
            assert required in mapped_keys, f"Column '{required}' missing from MusehubIntelApiSurface"

    def test_T02_signature_id_is_nullable(self) -> None:
        """The ``signature_id`` table column must be declared nullable."""
        is_nullable = MusehubIntelApiSurface.__table__.c["signature_id"].nullable
        assert is_nullable, "signature_id must be nullable"

    @pytest.mark.asyncio
    async def test_T03_row_insert_and_cascade_delete(
        self, db_session: AsyncSession
    ) -> None:
        """A row can be inserted, and deleting its repo removes the row too."""
        repo = await create_repo(db_session, owner="asuser", slug="t03-cascade")
        rid = str(repo.repo_id)
        await _insert_as_row(db_session, rid, "src/x.py::fn")
        await db_session.commit()

        # Step 1: the freshly inserted row is retrievable by its full key.
        by_key = sa.select(MusehubIntelApiSurface).where(
            MusehubIntelApiSurface.repo_id == rid,
            MusehubIntelApiSurface.address == "src/x.py::fn",
        )
        row = await db_session.scalar(by_key)
        assert row is not None, "Row not found after insert"

        # Step 2: delete the owning repo and confirm nothing is left behind.
        await db_session.delete(repo)
        await db_session.commit()
        by_repo = sa.select(MusehubIntelApiSurface).where(
            MusehubIntelApiSurface.repo_id == rid
        )
        leftover_result = await db_session.execute(by_repo)
        remaining = leftover_result.scalars().all()
        assert not remaining, "Cascade delete failed — api_surface rows remain after repo delete"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T2 — Provider batch performance
# ─────────────────────────────────────────────────────────────────────────────
async def _seed_snapshot(
    session: AsyncSession,
    repo_id: str,
    manifest: dict[str, str],
) -> str:
    """Seed a snapshot, a commit pointing at it, and their repo refs.

    *manifest* maps file paths to object ids.  It is msgpack-encoded into the
    snapshot row, and one ``MusehubObject`` row is inserted per entry.  Every
    insert is ``ON CONFLICT DO NOTHING``.  The session is committed and the
    (fixed) snapshot id is returned.
    """
    import msgpack
    from datetime import datetime, timezone

    snap_id = long_id("c" * 64)
    commit_id = long_id("d" * 64)

    snapshot_stmt = pg_insert(MusehubSnapshot).values(
        snapshot_id=snap_id,
        directories=[],
        manifest_blob=msgpack.packb(manifest),
        entry_count=len(manifest),
        created_at=datetime.now(timezone.utc),
    )
    snapshot_ref_stmt = pg_insert(MusehubSnapshotRef).values(
        repo_id=repo_id,
        snapshot_id=snap_id,
    )
    commit_stmt = pg_insert(MusehubCommit).values(
        commit_id=commit_id,
        branch="dev",
        parent_ids=[],
        message="test",
        author="asuser",
        timestamp=datetime(2026, 1, 1, tzinfo=timezone.utc),
        snapshot_id=snap_id,
    )
    commit_ref_stmt = pg_insert(MusehubCommitRef).values(
        repo_id=repo_id,
        commit_id=commit_id,
    )
    # Executed in this order: snapshot, snapshot ref, commit, commit ref.
    for pending in (snapshot_stmt, snapshot_ref_stmt, commit_stmt, commit_ref_stmt):
        await session.execute(pending.on_conflict_do_nothing())

    # Seed MusehubObject rows so session.get(MusehubObject, oid) returns a row,
    # allowing the provider to proceed to _read_obj_bytes → parse_symbols.
    for path, oid in manifest.items():
        object_stmt = pg_insert(MusehubObject).values(
            object_id=oid,
            path=path,
            size_bytes=16,
            storage_uri=f"mem://{oid}",
        )
        await session.execute(
            object_stmt.on_conflict_do_nothing(index_elements=["object_id"])
        )

    await session.commit()
    return snap_id
def _fake_tree(n: int, prefix: str = "fn") -> JSONObject:
    """Build a SymbolTree-shaped dict holding *n* function entries.

    Keys are ``src/file.py::<prefix>_<i>`` for ``i`` in ``range(n)``; each
    value describes a ``function`` symbol whose ``lineno`` is ``i + 1`` and
    whose ``end_lineno`` is ``i + 2``.
    """
    tree: JSONObject = {}
    for index in range(n):
        symbol = f"{prefix}_{index}"
        tree[f"src/file.py::{symbol}"] = {
            "kind": "function",
            "name": symbol,
            "qualified_name": symbol,
            "content_id": long_id("a" * 64),
            "body_hash": long_id("b" * 64),
            "signature_id": long_id("c" * 64),
            "metadata_id": "",
            "canonical_key": f"src/file.py##function#{symbol}#1",
            "lineno": index + 1,
            "end_lineno": index + 2,
        }
    return tree
class TestProviderBatch:
    """Layer T2 — the api_surface provider must write symbols in batched INSERTs.

    Each test wraps ``db_session.execute`` with a recording spy while the
    provider runs, then counts how many of the recorded statements were
    INSERTs.  The spy is installed with ``patch.object`` so the session's real
    ``execute`` is restored even when the provider raises.
    """

    @staticmethod
    def _execute_spy(
        session: AsyncSession,
    ) -> tuple[
        list[sa.Executable],
        typing.Callable[..., typing.Awaitable[CursorResult[typing.Any]]],
    ]:
        """Return ``(calls, spy)`` for *session*.

        *spy* appends every statement it receives to *calls* and then
        delegates to the session's real ``execute``, which is captured here,
        before any patching takes place.
        """
        calls: list[sa.Executable] = []
        real_execute = session.execute

        async def spy(
            stmt: sa.Executable, *args: typing.Any, **kwargs: typing.Any
        ) -> CursorResult[typing.Any]:
            calls.append(stmt)
            return await real_execute(stmt, *args, **kwargs)

        return calls, spy

    @staticmethod
    def _insert_statements(calls: list[sa.Executable]) -> list[sa.Executable]:
        """Return the recorded statements that look like INSERTs.

        A statement counts when "insert" appears (case-insensitively) in its
        type name or in its string form.
        """
        return [
            c for c in calls
            if "insert" in type(c).__name__.lower() or "insert" in str(c).lower()
        ]

    @pytest.mark.asyncio
    async def test_T04_provider_issues_one_sql_per_chunk(
        self, db_session: AsyncSession
    ) -> None:
        """ApiSurfaceProvider must batch-upsert, not execute one statement per symbol."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="asuser", slug="t04-batch")
        rid = str(repo.repo_id)
        await _seed_snapshot(db_session, rid, {"src/file.py": long_id("e" * 64)})

        execute_calls, counting_execute = self._execute_spy(db_session)
        with (
            patch("musehub.storage.backends.read_object_bytes",
                  new=AsyncMock(return_value=b"# placeholder")),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  return_value=_fake_tree(50)),
            # Restored automatically on exit, including when compute() raises.
            patch.object(db_session, "execute", new=counting_execute),
        ):
            await _PROVIDER_REGISTRY["intel.code.api_surface"].compute(
                db_session, rid, _REF,
                {"owner": repo.owner, "slug": repo.slug},
            )

        # 50 symbols fit in one chunk — expect exactly 1 INSERT execute
        insert_calls = self._insert_statements(execute_calls)
        assert len(insert_calls) == 1, (
            f"Expected 1 batch upsert for 50 symbols, got {len(insert_calls)}"
        )

    @pytest.mark.asyncio
    async def test_T05_provider_uses_ceil_n_over_1000_sql_calls_for_2500_symbols(
        self, db_session: AsyncSession
    ) -> None:
        """2,500 symbols → exactly 3 INSERT statements (ceil(2500/1000) = 3)."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="asuser", slug="t05-chunks")
        rid = str(repo.repo_id)
        await _seed_snapshot(db_session, rid, {"src/big.py": long_id("f" * 64)})

        execute_calls, counting_execute = self._execute_spy(db_session)
        with (
            patch("musehub.storage.backends.read_object_bytes",
                  new=AsyncMock(return_value=b"# placeholder")),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  return_value=_fake_tree(2500)),
            # Restored automatically on exit, including when compute() raises.
            patch.object(db_session, "execute", new=counting_execute),
        ):
            result = await _PROVIDER_REGISTRY["intel.code.api_surface"].compute(
                db_session, rid, _REF,
                {"owner": repo.owner, "slug": repo.slug},
            )

        insert_calls = self._insert_statements(execute_calls)
        assert len(insert_calls) == 3, (
            f"2500 symbols should produce 3 INSERT chunks, got {len(insert_calls)}"
        )
        assert result == [("intel.code.api_surface", {"count": 2500})]

    @pytest.mark.asyncio
    async def test_T06_empty_symbols_returns_empty_list(
        self, db_session: AsyncSession
    ) -> None:
        """Provider must return [] and issue no INSERTs when parse_symbols yields nothing."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="asuser", slug="t06-empty")
        rid = str(repo.repo_id)
        await _seed_snapshot(db_session, rid, {"src/empty.py": long_id("a" * 64)})

        execute_calls, counting_execute = self._execute_spy(db_session)
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(return_value=b"# no public symbols")
        with (
            patch("musehub.services.musehub_intel_providers.get_backend",
                  return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  return_value={}),
            # Restored automatically on exit, including when compute() raises.
            patch.object(db_session, "execute", new=counting_execute),
        ):
            result = await _PROVIDER_REGISTRY["intel.code.api_surface"].compute(
                db_session, rid, _REF,
                {"owner": repo.owner, "slug": repo.slug},
            )

        assert result == [], "Empty symbols list must return []"
        # Same INSERT detection as T04/T05 so the three tests agree.
        insert_calls = self._insert_statements(execute_calls)
        assert len(insert_calls) == 0, "No DB writes expected for empty symbol list"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T3 — Route (unit / integration)
# ─────────────────────────────────────────────────────────────────────────────
class TestRoute:
    """Layer T3 — status codes and filtering of ``/{owner}/{slug}/intel/api-surface``."""

    @pytest.mark.asyncio
    async def test_T07_returns_200_with_empty_repo(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Route must return 200 even when musehub_intel_api_surface has no rows."""
        await create_repo(db_session, owner="asuser", slug="t07-empty")
        await db_session.commit()
        r = await client.get("/asuser/t07-empty/intel/api-surface")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_T08_returns_200_with_data(
        self, client: AsyncClient, as_repo: MusehubRepo
    ) -> None:
        """Route returns 200 when rows exist."""
        r = await client.get("/asuser/as-e2e/intel/api-surface")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_T09_kind_filter_function_only(
        self, client: AsyncClient, as_repo: MusehubRepo
    ) -> None:
        """?kind=function returns only function symbols, not class or method."""
        r = await client.get("/asuser/as-e2e/intel/api-surface?kind=function")
        assert r.status_code == 200
        assert "compute_total" in r.text
        assert "UserRecord.save" not in r.text
        # The class row must be filtered out as well.  (The previous form,
        # `"UserRecord" not in r.text or "compute_total" in r.text`, could
        # never fail because the first assert already guarantees its right
        # operand.)
        assert "UserRecord" not in r.text

    @pytest.mark.asyncio
    async def test_T10_kind_filter_class_only(
        self, client: AsyncClient, as_repo: MusehubRepo
    ) -> None:
        """?kind=class returns only class symbols."""
        r = await client.get("/asuser/as-e2e/intel/api-surface?kind=class")
        assert r.status_code == 200
        assert "UserRecord" in r.text
        assert "compute_total" not in r.text

    @pytest.mark.asyncio
    async def test_T11_kind_filter_async_function(
        self, client: AsyncClient, as_repo: MusehubRepo
    ) -> None:
        """?kind=async_function returns only async_function symbols."""
        r = await client.get("/asuser/as-e2e/intel/api-surface?kind=async_function")
        assert r.status_code == 200
        assert "async_fetch" in r.text
        assert "compute_total" not in r.text

    @pytest.mark.asyncio
    async def test_T12_unknown_kind_coerced_to_all(
        self, client: AsyncClient, as_repo: MusehubRepo
    ) -> None:
        """?kind=garbage must return 200 (treated as no filter), not 400/500."""
        r = await client.get("/asuser/as-e2e/intel/api-surface?kind=garbage")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_T13_top_param_limits_results(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """?top=20 returns at most 20 symbols even when 25 exist."""
        repo = await create_repo(db_session, owner="asuser", slug="t13-top")
        rid = str(repo.repo_id)
        for i in range(25):
            await _insert_as_row(db_session, rid,
                                 f"src/f{i}.py::fn_{i}", kind="function")
        await db_session.commit()
        r = await client.get("/asuser/t13-top/intel/api-surface?top=20")
        assert r.status_code == 200
        # NOTE(review): this counts occurrences of the *full* address string.
        # If the page renders the file and name parts separately, the count is
        # 0 and the check passes vacuously — confirm against the template.
        count = sum(1 for i in range(25) if f"src/f{i}.py::fn_{i}" in r.text)
        assert count <= 20, f"Expected ≤20 results for ?top=20, got {count}"

    @pytest.mark.asyncio
    async def test_T14_top_invalid_string_returns_422(
        self, client: AsyncClient, as_repo: MusehubRepo
    ) -> None:
        """?top=notanumber must be rejected with 422 (FastAPI type validation)."""
        r = await client.get("/asuser/as-e2e/intel/api-surface?top=notanumber")
        assert r.status_code == 422

    @pytest.mark.asyncio
    async def test_T15_unknown_repo_returns_404(
        self, client: AsyncClient
    ) -> None:
        """Non-existent repo path must be rejected with 404 (403 is accepted too), not 200 or 500."""
        r = await client.get("/nobody/no-such-repo/intel/api-surface")
        assert r.status_code in (403, 404)
# ─────────────────────────────────────────────────────────────────────────────
# Layer T4 — E2E (HTML body assertions)
# ─────────────────────────────────────────────────────────────────────────────
class TestE2E:
    """Layer T4 — substring assertions on the rendered pages for the seeded repo."""

    @pytest.mark.asyncio
    async def test_T16_total_count_chip_shows_correct_value(
        self, client: AsyncClient, as_repo: MusehubRepo
    ) -> None:
        """With 5 symbols seeded, the api-surface page must contain the digit "5"."""
        response = await client.get("/asuser/as-e2e/intel/api-surface")
        assert response.status_code == 200
        # The fixture seeds five rows, so the total is expected to render as "5".
        page_text = response.text
        assert "5" in page_text

    @pytest.mark.asyncio
    async def test_T17_kind_breakdown_chips_present(
        self, client: AsyncClient, as_repo: MusehubRepo
    ) -> None:
        """The page must mention the "function", "class" and "method" labels.

        The comparison is a case-insensitive substring check on the whole body.
        """
        response = await client.get("/asuser/as-e2e/intel/api-surface")
        assert response.status_code == 200
        lowered = response.text.lower()
        expected_labels = ("function", "class", "method")
        for kind_label in expected_labels:
            assert kind_label in lowered, f"Kind label '{kind_label}' missing from page"

    @pytest.mark.asyncio
    async def test_T18_symbol_address_split_rendered(
        self, client: AsyncClient, as_repo: MusehubRepo
    ) -> None:
        """Both halves of a seeded address must show up in the HTML."""
        response = await client.get("/asuser/as-e2e/intel/api-surface")
        assert response.status_code == 200
        page_text = response.text
        # Left of "::" — the file path.
        assert "src/billing.py" in page_text
        # Right of "::" — the symbol name.
        assert "compute_total" in page_text

    @pytest.mark.asyncio
    async def test_T19_dashboard_card_links_to_api_surface_page(
        self, client: AsyncClient, as_repo: MusehubRepo
    ) -> None:
        """The ``/intel`` dashboard body must contain ``/intel/api-surface``."""
        response = await client.get("/asuser/as-e2e/intel")
        assert response.status_code == 200
        raw_body = response.content
        assert b"/intel/api-surface" in raw_body
# ─────────────────────────────────────────────────────────────────────────────
# Layer T5 — State integrity
# ─────────────────────────────────────────────────────────────────────────────
class TestStateIntegrity:
    """Layer T5 — upsert and isolation guarantees, checked directly in the DB."""

    @pytest.mark.asyncio
    async def test_T20_double_upsert_produces_one_row(
        self, db_session: AsyncSession
    ) -> None:
        """Two upserts of the same address leave exactly one row for the repo."""
        repo = await create_repo(db_session, owner="asuser", slug="t20-dup")
        rid = str(repo.repo_id)
        addr = "src/a.py::fn"
        # Same key written twice before a single commit.
        await _insert_as_row(db_session, rid, addr, kind="function")
        await _insert_as_row(db_session, rid, addr, kind="function")
        await db_session.commit()

        repo_rows_query = sa.select(MusehubIntelApiSurface).where(
            MusehubIntelApiSurface.repo_id == rid
        )
        query_result = await db_session.execute(repo_rows_query)
        rows = query_result.scalars().all()
        assert len(rows) == 1, f"Expected 1 row, got {len(rows)} — upsert created duplicates"

    @pytest.mark.asyncio
    async def test_T21_second_upsert_overwrites_kind(
        self, db_session: AsyncSession
    ) -> None:
        """When the same address is upserted with a new kind, the later kind wins."""
        repo = await create_repo(db_session, owner="asuser", slug="t21-overwrite")
        rid = str(repo.repo_id)
        addr = "src/a.py::Foo"
        for written_kind in ("class", "function"):
            await _insert_as_row(db_session, rid, addr, kind=written_kind)
        await db_session.commit()

        by_key = sa.select(MusehubIntelApiSurface).where(
            MusehubIntelApiSurface.repo_id == rid,
            MusehubIntelApiSurface.address == addr,
        )
        row = await db_session.scalar(by_key)
        assert row is not None
        assert row.kind == "function", (
            f"Expected kind='function' after second upsert, got '{row.kind}'"
        )

    @pytest.mark.asyncio
    async def test_T22_cross_repo_isolation(
        self, db_session: AsyncSession
    ) -> None:
        """A symbol inserted for repo A must not be returned when querying by repo B's id.

        This is a table-level check; no page is requested.
        """
        repo_a = await create_repo(db_session, owner="asuser", slug="t22-repo-a")
        repo_b = await create_repo(db_session, owner="asuser", slug="t22-repo-b")
        await _insert_as_row(
            db_session,
            str(repo_a.repo_id),
            "src/secret.py::private_fn",
            kind="function",
        )
        await db_session.commit()

        repo_b_query = sa.select(MusehubIntelApiSurface).where(
            MusehubIntelApiSurface.repo_id == str(repo_b.repo_id)
        )
        repo_b_result = await db_session.execute(repo_b_query)
        rows_b = repo_b_result.scalars().all()
        assert not rows_b, "Repo B must not see Repo A's api_surface symbols"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T6 — Performance
# ─────────────────────────────────────────────────────────────────────────────
class TestPerformance:
    """Layer T6 — wall-clock budgets and a query-plan check."""

    @pytest.mark.asyncio
    async def test_T23_route_responds_under_200ms_for_5k_symbols(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """The page for a repo holding 5,000 symbol rows must come back in under 0.2 s."""
        repo = await create_repo(db_session, owner="asuser", slug="t23-perf")
        rid = str(repo.repo_id)
        total_rows = 5_000
        chunk_size = 1000
        kinds = ["function", "async_function", "class", "method", "async_method"]

        # Seed in five 1,000-row INSERTs, cycling through the kinds by index.
        chunk_starts = range(0, total_rows, chunk_size)
        for first_index in chunk_starts:
            chunk_rows = []
            for i in range(first_index, first_index + chunk_size):
                chunk_rows.append(
                    {
                        "repo_id": rid,
                        "address": f"src/file{i}.py::sym_{i}",
                        "kind": kinds[i % len(kinds)],
                        "signature_id": None,
                        "visibility": "public",
                        "ref": _REF,
                    }
                )
            seed_stmt = pg_insert(MusehubIntelApiSurface).values(chunk_rows)
            await db_session.execute(seed_stmt.on_conflict_do_nothing())
        await db_session.commit()

        # Only the request itself is timed.
        started = time.monotonic()
        r = await client.get("/asuser/t23-perf/intel/api-surface")
        elapsed = time.monotonic() - started
        assert r.status_code == 200
        assert elapsed < 0.2, f"Route took {elapsed:.3f}s for 5k symbols (limit: 0.2s)"

    @pytest.mark.asyncio
    async def test_T24_db_query_uses_repo_index(
        self, db_session: AsyncSession
    ) -> None:
        """EXPLAIN of a ``repo_id`` lookup must mention the repo index or "Index"."""
        explain_sql = sa.text(
            "EXPLAIN SELECT * FROM musehub_intel_api_surface WHERE repo_id = 'x'"
        )
        explain = await db_session.execute(explain_sql)
        plan_lines = [plan_row[0] for plan_row in explain.all()]
        plan = " ".join(plan_lines)
        accepted_markers = ("ix_intel_api_surface_repo", "Index")
        plan_uses_index = any(marker in plan for marker in accepted_markers)
        assert plan_uses_index, (
            f"Query plan does not use ix_intel_api_surface_repo:\n{plan}"
        )

    @pytest.mark.asyncio
    async def test_T25_batch_upsert_1000_rows_under_500ms(
        self, db_session: AsyncSession
    ) -> None:
        """One 1,000-row INSERT plus its commit must finish in under 0.5 s."""
        repo = await create_repo(db_session, owner="asuser", slug="t25-batch")
        rid = str(repo.repo_id)
        rows = []
        for i in range(1000):
            rows.append(
                {
                    "repo_id": rid,
                    "address": f"src/f{i}.py::fn",
                    "kind": "function",
                    "signature_id": None,
                    "visibility": "public",
                    "ref": _REF,
                }
            )
        batch_stmt = pg_insert(MusehubIntelApiSurface).values(rows).on_conflict_do_nothing()

        # The timed region covers the execute and the commit.
        started = time.monotonic()
        await db_session.execute(batch_stmt)
        await db_session.commit()
        elapsed = time.monotonic() - started
        assert elapsed < 0.5, f"1000-row batch took {elapsed:.3f}s (limit: 0.5s)"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T7 — Security
# ─────────────────────────────────────────────────────────────────────────────
class TestSecurity:
@pytest.mark.asyncio
async def test_T26_xss_in_address_is_escaped(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""XSS payload in address must be HTML-escaped in the response."""
repo = await create_repo(db_session, owner="asuser", slug="t26-xss")
rid = str(repo.repo_id)
xss = ""
await _insert_as_row(db_session, rid, f"src/x.py::{xss[:40]}")
await db_session.commit()
r = await client.get("/asuser/t26-xss/intel/api-surface")
assert r.status_code == 200
assert "