"""Languages intel — full 7-tier test suite (issue #20).
Tests are written TDD-first: all tests must be RED before Phase 4–7
implementation begins, then GREEN after.
Tiers
-----
T01–T04 Layer T1 — DB model (columns, nullable, kinds_json, cascade)
T05–T09 Layer T2 — Provider (no subprocess, file counts, kinds, pct, empty)
T10–T17 Layer T3 — Route (200, empty state, 404, sort, filter, pagination)
T18–T21 Layer T4 — E2E HTML (language names, bar width, kind chips, dashboard link)
T22–T24 Layer T5 — Data integrity (no duplicates, upsert overwrite, cross-repo)
T25–T27 Layer T6 — Performance (provider speed, route speed, index check)
T28–T30 Layer T7 — Security (XSS escape, SQL injection, no 500 on bad input)
"""
from __future__ import annotations
import time
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch
import pytest
import pytest_asyncio
import sqlalchemy as sa
from httpx import AsyncClient
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from musehub.db.musehub_intel_models import MusehubIntelLanguages
from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef
from musehub.types.json_types import JSONObject
from tests.factories import create_repo
from muse.core.types import long_id
_REF = long_id("b" * 64)
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
async def _insert_lang_row(
    session: AsyncSession,
    repo_id: str,
    language: str,
    file_count: int = 1,
    symbol_count: int = 0,
    pct: float = 0.0,
    kinds_json: JSONObject | None = None,
    ref: str = _REF,
) -> None:
    """Insert or overwrite one ``(repo_id, language)`` row of MusehubIntelLanguages.

    The statement is a PostgreSQL ``INSERT ... ON CONFLICT (repo_id, language)
    DO UPDATE``: when the key already exists, every non-key column is replaced
    with the value passed here.  The statement is executed on *session* but
    not committed — callers commit themselves.
    """
    # Non-key columns: used both as the inserted values and as the SET clause
    # applied when the (repo_id, language) key already exists.
    overwritable = {
        "file_count": file_count,
        "symbol_count": symbol_count,
        "pct": pct,
        "kinds_json": kinds_json,
        "ref": ref,
    }
    stmt = pg_insert(MusehubIntelLanguages).values(
        repo_id=repo_id,
        language=language,
        **overwritable,
    )
    stmt = stmt.on_conflict_do_update(
        index_elements=["repo_id", "language"],
        set_=overwritable,
    )
    await session.execute(stmt)
async def _seed_snapshot(
    session: AsyncSession,
    repo_id: str,
    manifest: dict[str, str],
) -> str:
    """Seed one snapshot and one commit for *repo_id*, then commit the session.

    Four rows are inserted in order — MusehubSnapshot, MusehubSnapshotRef,
    MusehubCommit, MusehubCommitRef — each with ``ON CONFLICT DO NOTHING``.
    The manifest is stored msgpack-encoded in ``manifest_blob``.

    Returns the snapshot id.

    NOTE(review): the snapshot and commit ids are fixed constants, so if rows
    with those ids already exist (presumably they are unique keys — confirm)
    the inserts are skipped and a previously stored manifest stays in place.
    """
    import msgpack

    snapshot_key = long_id("c" * 64)
    commit_key = long_id("d" * 64)
    stamp = datetime(2026, 1, 1, tzinfo=timezone.utc)

    # (model, column values) pairs, executed in exactly this order.
    seed_plan = [
        (
            MusehubSnapshot,
            {
                "snapshot_id": snapshot_key,
                "directories": [],
                "manifest_blob": msgpack.packb(manifest),
                "entry_count": len(manifest),
                "created_at": stamp,
            },
        ),
        (
            MusehubSnapshotRef,
            {"repo_id": repo_id, "snapshot_id": snapshot_key},
        ),
        (
            MusehubCommit,
            {
                "commit_id": commit_key,
                "branch": "dev",
                "parent_ids": [],
                "message": "test",
                "author": "lnuser",
                "timestamp": stamp,
                "snapshot_id": snapshot_key,
            },
        ),
        (
            MusehubCommitRef,
            {"repo_id": repo_id, "commit_id": commit_key},
        ),
    ]
    for model, columns in seed_plan:
        await session.execute(
            pg_insert(model).values(**columns).on_conflict_do_nothing()
        )

    await session.commit()
    return snapshot_key
def _fake_tree(n: int, kinds: list[str] | None = None) -> JSONObject:
    """Build a synthetic symbol tree holding *n* entries.

    Entries are keyed ``src/mod.py::sym_<i>`` and their ``kind`` cycles through
    *kinds* (default: function, class, method).  An empty or ``None`` *kinds*
    falls back to the default list.  The ``canonical_key`` always embeds the
    literal ``function`` regardless of the entry's kind.
    """
    kind_cycle = kinds or ["function", "class", "method"]
    tree: JSONObject = {}
    for idx in range(n):
        tree[f"src/mod.py::sym_{idx}"] = {
            "kind": kind_cycle[idx % len(kind_cycle)],
            "name": f"sym_{idx}",
            "qualified_name": f"sym_{idx}",
            "content_id": long_id("a" * 64),
            "body_hash": long_id("b" * 64),
            "signature_id": long_id("c" * 64),
            "metadata_id": "",
            "canonical_key": f"src/mod.py##function#sym_{idx}#1",
            # Each fake symbol occupies two consecutive 1-based lines.
            "lineno": idx + 1,
            "end_lineno": idx + 2,
        }
    return tree
@pytest_asyncio.fixture
async def ln_repo(db_session: AsyncSession) -> MusehubRepo:
    """Create repo ``lnuser/ln-e2e`` with Python, TypeScript and CSS rows.

    The CSS row has zero symbols and a null ``kinds_json``; the other two
    carry a per-kind breakdown.  The session is committed before returning.
    """
    repo = await create_repo(db_session, owner="lnuser", slug="ln-e2e")
    repo_key = str(repo.repo_id)

    # (language, file_count, symbol_count, pct, kinds_json)
    seed_rows = (
        ("Python", 30, 1500, 75.0, {"function": 800, "class": 400, "method": 300}),
        ("TypeScript", 10, 400, 20.0, {"function": 300, "class": 100}),
        ("CSS", 5, 0, 0.0, None),
    )
    for language, files, symbols, share, kinds in seed_rows:
        await _insert_lang_row(
            db_session,
            repo_key,
            language,
            file_count=files,
            symbol_count=symbols,
            pct=share,
            kinds_json=kinds,
        )

    await db_session.commit()
    return repo
# ─────────────────────────────────────────────────────────────────────────────
# Layer T1 — DB model
# ─────────────────────────────────────────────────────────────────────────────
class TestDBModel:
    """Layer T1 — schema-level checks on MusehubIntelLanguages."""

    def test_T01_model_has_all_required_columns(self) -> None:
        """Every expected column must be a mapped attribute of the model."""
        mapper = sa.inspect(MusehubIntelLanguages).mapper
        cols = {attr.key for attr in mapper.column_attrs}
        expected = (
            "repo_id",
            "language",
            "file_count",
            "symbol_count",
            "pct",
            "kinds_json",
            "ref",
        )
        for required in expected:
            assert required in cols, (
                f"Column '{required}' missing from MusehubIntelLanguages"
            )

    def test_T02_kinds_json_is_nullable(self) -> None:
        """The kinds_json column must accept NULL."""
        kinds_column = MusehubIntelLanguages.__table__.c["kinds_json"]
        assert kinds_column.nullable, "kinds_json must be nullable"

    def test_T03_composite_pk_is_repo_id_plus_language(self) -> None:
        """The primary key must consist of exactly repo_id and language."""
        primary_key = MusehubIntelLanguages.__table__.primary_key
        pk_cols = {column.name for column in primary_key.columns}
        assert pk_cols == {"repo_id", "language"}, (
            f"Expected PK {{repo_id, language}}, got {pk_cols}"
        )

    @pytest.mark.asyncio
    async def test_T04_cascade_delete_removes_lang_rows(
        self, db_session: AsyncSession
    ) -> None:
        """Deleting the repo must leave no language rows for its repo_id."""
        repo = await create_repo(db_session, owner="lnuser", slug="t04-cascade")
        repo_key = str(repo.repo_id)
        await _insert_lang_row(db_session, repo_key, "Python", file_count=3)
        await db_session.commit()

        # Confirm the row exists first so the later emptiness check is meaningful.
        lookup = sa.select(MusehubIntelLanguages).where(
            MusehubIntelLanguages.repo_id == repo_key,
            MusehubIntelLanguages.language == "Python",
        )
        row = await db_session.scalar(lookup)
        assert row is not None, "Row not found after insert"

        await db_session.delete(repo)
        await db_session.commit()

        leftovers_query = sa.select(MusehubIntelLanguages).where(
            MusehubIntelLanguages.repo_id == repo_key
        )
        outcome = await db_session.execute(leftovers_query)
        remaining = outcome.scalars().all()
        assert not remaining, (
            "Cascade delete failed — languages rows remain after repo delete"
        )
# ─────────────────────────────────────────────────────────────────────────────
# Layer T2 — Provider
# ─────────────────────────────────────────────────────────────────────────────
class TestProvider:
    """Layer T2 — behaviour of the ``intel.code.languages`` provider.

    The tests that run the provider patch ``get_backend``, ``parse_symbols``
    and ``language_of`` in ``musehub.services.musehub_intel_providers`` so
    blob fetching, parsing and language detection are all canned.
    """

    @pytest.mark.asyncio
    async def test_T05_provider_does_not_use_subprocess(
        self, db_session: AsyncSession
    ) -> None:
        """The source of LanguagesProvider.compute must not mention subprocess helpers."""
        import inspect
        from musehub.services import musehub_intel_providers as _mod

        src = inspect.getsource(_mod.LanguagesProvider.compute)
        for forbidden in ("create_subprocess", "_run_muse"):
            assert forbidden not in src, (
                f"LanguagesProvider.compute calls {forbidden} — forbidden"
            )

    @pytest.mark.asyncio
    async def test_T06_provider_counts_files_per_language(
        self, db_session: AsyncSession
    ) -> None:
        """Four files across three languages must yield per-language file counts."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="lnuser", slug="t06-files")
        repo_key = str(repo.repo_id)
        manifest = {
            "src/a.py": long_id("e" * 64),
            "src/b.py": long_id("f" * 64),
            "src/app.ts": long_id("1" * 64),
            "static/main.css": long_id("2" * 64),
        }
        await _seed_snapshot(db_session, repo_key, manifest)

        def _language_for(p: str) -> str:
            # Extension-based stand-in for language_of(); anything else is CSS.
            if p.endswith(".py"):
                return "Python"
            if p.endswith(".ts"):
                return "TypeScript"
            return "CSS"

        backend = AsyncMock()
        backend.get = AsyncMock(return_value=b"# placeholder")
        with (
            patch(
                "musehub.services.musehub_intel_providers.get_backend",
                return_value=backend,
            ),
            patch(
                "musehub.services.musehub_intel_providers.parse_symbols",
                return_value={},
            ),
            patch(
                "musehub.services.musehub_intel_providers.language_of",
                side_effect=_language_for,
            ),
        ):
            provider = _PROVIDER_REGISTRY["intel.code.languages"]
            result = await provider.compute(
                db_session,
                repo_key,
                _REF,
                {"owner": repo.owner, "slug": repo.slug},
            )

        assert result == [("intel.code.languages", {"count": 3})], (
            f"Expected 3 language rows, got: {result}"
        )

        outcome = await db_session.execute(
            sa.select(MusehubIntelLanguages).where(
                MusehubIntelLanguages.repo_id == repo_key
            )
        )
        files_by_language = {
            row.language: row.file_count for row in outcome.scalars().all()
        }
        assert files_by_language["Python"] == 2
        assert files_by_language["TypeScript"] == 1
        assert files_by_language["CSS"] == 1

    @pytest.mark.asyncio
    async def test_T07_provider_records_kinds_json(
        self, db_session: AsyncSession
    ) -> None:
        """kinds_json must be populated and must not contain the ``import`` kind."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="lnuser", slug="t07-kinds")
        repo_key = str(repo.repo_id)
        await _seed_snapshot(db_session, repo_key, {"src/x.py": long_id("3" * 64)})

        # Six symbols cycling function/class/import, so "import" is present in
        # the parser output and has to be filtered by the provider.
        parsed = _fake_tree(6, kinds=["function", "class", "import"])

        backend = AsyncMock()
        backend.get = AsyncMock(return_value=b"# placeholder")
        with (
            patch(
                "musehub.services.musehub_intel_providers.get_backend",
                return_value=backend,
            ),
            patch(
                "musehub.services.musehub_intel_providers.parse_symbols",
                return_value=parsed,
            ),
            patch(
                "musehub.services.musehub_intel_providers.language_of",
                return_value="Python",
            ),
        ):
            provider = _PROVIDER_REGISTRY["intel.code.languages"]
            await provider.compute(
                db_session,
                repo_key,
                _REF,
                {"owner": repo.owner, "slug": repo.slug},
            )

        row = await db_session.scalar(
            sa.select(MusehubIntelLanguages).where(
                MusehubIntelLanguages.repo_id == repo_key,
                MusehubIntelLanguages.language == "Python",
            )
        )
        assert row is not None
        assert row.kinds_json is not None, "kinds_json must not be None when symbols exist"
        assert "import" not in row.kinds_json, (
            "import pseudo-symbols must be excluded from kinds_json"
        )
        allowed_kinds = {
            "function",
            "class",
            "method",
            "async_function",
            "async_method",
        }
        assert set(row.kinds_json.keys()) <= allowed_kinds, (
            f"Unexpected kinds in kinds_json: {set(row.kinds_json.keys())}"
        )

    @pytest.mark.asyncio
    async def test_T08_provider_pct_sums_correctly(
        self, db_session: AsyncSession
    ) -> None:
        """pct over all stored rows must add up to 100 within 0.01."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="lnuser", slug="t08-pct")
        repo_key = str(repo.repo_id)
        await _seed_snapshot(
            db_session,
            repo_key,
            {
                "src/a.py": long_id("4" * 64),
                "src/b.ts": long_id("5" * 64),
            },
        )

        # Three Python symbols versus one TypeScript symbol.
        py_tree = _fake_tree(3, kinds=["function"])
        ts_tree = _fake_tree(1, kinds=["function"])

        def _fake_parse(src: bytes, path: str) -> JSONObject:
            if path.endswith(".py"):
                return py_tree
            return ts_tree

        def _language_for(p: str) -> str:
            if p.endswith(".py"):
                return "Python"
            return "TypeScript"

        backend = AsyncMock()
        backend.get = AsyncMock(return_value=b"# placeholder")
        with (
            patch(
                "musehub.services.musehub_intel_providers.get_backend",
                return_value=backend,
            ),
            patch(
                "musehub.services.musehub_intel_providers.parse_symbols",
                side_effect=_fake_parse,
            ),
            patch(
                "musehub.services.musehub_intel_providers.language_of",
                side_effect=_language_for,
            ),
        ):
            provider = _PROVIDER_REGISTRY["intel.code.languages"]
            await provider.compute(
                db_session,
                repo_key,
                _REF,
                {"owner": repo.owner, "slug": repo.slug},
            )

        outcome = await db_session.execute(
            sa.select(MusehubIntelLanguages).where(
                MusehubIntelLanguages.repo_id == repo_key
            )
        )
        stored = outcome.scalars().all()
        total_pct = sum(entry.pct for entry in stored)
        assert abs(total_pct - 100.0) < 0.01, (
            f"pct values do not sum to 100 (sum={total_pct:.2f})"
        )

    @pytest.mark.asyncio
    async def test_T09_provider_returns_empty_when_no_snapshot(
        self, db_session: AsyncSession
    ) -> None:
        """With no snapshot seeded, compute() must return an empty list."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="lnuser", slug="t09-nosnap")
        repo_key = str(repo.repo_id)
        await db_session.commit()

        provider = _PROVIDER_REGISTRY["intel.code.languages"]
        result = await provider.compute(
            db_session,
            repo_key,
            _REF,
            {"owner": repo.owner, "slug": repo.slug},
        )
        assert result == [], f"Expected [] when no snapshot exists, got {result}"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T3 — Route
# ─────────────────────────────────────────────────────────────────────────────
class TestRoute:
    """Layer T3 — HTTP status behaviour of ``/<owner>/<slug>/intel/languages``."""

    @pytest.mark.asyncio
    async def test_T10_returns_200_with_language_data(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """A repo with seeded language rows must answer 200."""
        response = await client.get("/lnuser/ln-e2e/intel/languages")
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_T11_returns_200_with_empty_repo(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A repo without any language rows must still answer 200."""
        await create_repo(db_session, owner="lnuser", slug="t11-empty")
        await db_session.commit()

        response = await client.get("/lnuser/t11-empty/intel/languages")
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_T12_unknown_repo_returns_404(
        self, client: AsyncClient
    ) -> None:
        """An unknown owner/slug must answer 403 or 404."""
        response = await client.get("/nobody/no-such-repo/intel/languages")
        assert response.status_code in (403, 404)

    @pytest.mark.asyncio
    async def test_T13_sort_by_files_param_accepted(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """``?sort=files`` must be accepted with a 200."""
        response = await client.get("/lnuser/ln-e2e/intel/languages?sort=files")
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_T14_sort_by_symbols_param_accepted(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """``?sort=symbols`` must be accepted with a 200."""
        response = await client.get("/lnuser/ln-e2e/intel/languages?sort=symbols")
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_T15_unknown_sort_coerced_to_default(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """An unrecognised ``sort`` value must not be rejected — 200 expected."""
        response = await client.get("/lnuser/ln-e2e/intel/languages?sort=garbage")
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_T16_top_param_limits_rows(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """With 25 rows stored, ``?top=20`` must show at most 20 of their names."""
        repo = await create_repo(db_session, owner="lnuser", slug="t16-top")
        repo_key = str(repo.repo_id)

        langs = [f"Lang{i:02d}" for i in range(25)]
        # file_count runs 1..25 so every row has a distinct count.
        for files, lang in enumerate(langs, start=1):
            await _insert_lang_row(db_session, repo_key, lang, file_count=files)
        await db_session.commit()

        response = await client.get("/lnuser/t16-top/intel/languages?top=20")
        assert response.status_code == 200

        # Count how many seeded names occur anywhere in the response body.
        body = response.text
        count = sum(lang in body for lang in langs)
        assert count <= 20, f"Expected ≤20 languages for ?top=20, found {count}"

    @pytest.mark.asyncio
    async def test_T17_top_invalid_string_returns_422(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """A non-integer ``top`` value must be rejected with 422."""
        response = await client.get("/lnuser/ln-e2e/intel/languages?top=abc")
        assert response.status_code == 422
# ─────────────────────────────────────────────────────────────────────────────
# Layer T4 — E2E HTML
# ─────────────────────────────────────────────────────────────────────────────
class TestE2E:
    """Layer T4 — content checks on the rendered pages for the seeded repo."""

    @pytest.mark.asyncio
    async def test_T18_language_names_appear_in_page(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """Python, TypeScript and CSS must each occur in the response text."""
        response = await client.get("/lnuser/ln-e2e/intel/languages")
        assert response.status_code == 200

        page = response.text
        for lang in ("Python", "TypeScript", "CSS"):
            assert lang in page, f"Language '{lang}' missing from page"

    @pytest.mark.asyncio
    async def test_T19_pct_bar_width_rendered(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """The response text must contain a ``width:`` style fragment."""
        response = await client.get("/lnuser/ln-e2e/intel/languages")
        assert response.status_code == 200
        assert "width:" in response.text, "No width style found — pct bars not rendered"

    @pytest.mark.asyncio
    async def test_T20_kind_chips_rendered_for_python(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """function, class and method must occur in the page (case-insensitive)."""
        response = await client.get("/lnuser/ln-e2e/intel/languages")
        assert response.status_code == 200

        # Lower-case once so the check ignores how the template capitalises kinds.
        body = response.text.lower()
        for kind in ("function", "class", "method"):
            assert kind in body, f"Kind chip '{kind}' missing from languages page"

    @pytest.mark.asyncio
    async def test_T21_dashboard_card_links_to_languages_page(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """The intel dashboard response must contain ``/intel/languages``."""
        response = await client.get("/lnuser/ln-e2e/intel")
        assert response.status_code == 200
        assert b"/intel/languages" in response.content
# ─────────────────────────────────────────────────────────────────────────────
# Layer T5 — Data integrity
# ─────────────────────────────────────────────────────────────────────────────
class TestDataIntegrity:
    """Layer T5 — upsert semantics and per-repo isolation of language rows."""

    @pytest.mark.asyncio
    async def test_T22_double_upsert_produces_one_row(
        self, db_session: AsyncSession
    ) -> None:
        """Two identical upserts must leave exactly one row for the repo."""
        repo = await create_repo(db_session, owner="lnuser", slug="t22-dup")
        repo_key = str(repo.repo_id)

        await _insert_lang_row(db_session, repo_key, "Python", file_count=5)
        await _insert_lang_row(db_session, repo_key, "Python", file_count=5)
        await db_session.commit()

        outcome = await db_session.execute(
            sa.select(MusehubIntelLanguages).where(
                MusehubIntelLanguages.repo_id == repo_key
            )
        )
        rows = outcome.scalars().all()
        assert len(rows) == 1, (
            f"Expected 1 row after double upsert, got {len(rows)}"
        )

    @pytest.mark.asyncio
    async def test_T23_second_upsert_overwrites_file_count(
        self, db_session: AsyncSession
    ) -> None:
        """The later of two upserts must win for file_count."""
        repo = await create_repo(db_session, owner="lnuser", slug="t23-overwrite")
        repo_key = str(repo.repo_id)

        for files in (5, 12):
            await _insert_lang_row(db_session, repo_key, "Python", file_count=files)
        await db_session.commit()

        lookup = sa.select(MusehubIntelLanguages).where(
            MusehubIntelLanguages.repo_id == repo_key,
            MusehubIntelLanguages.language == "Python",
        )
        row = await db_session.scalar(lookup)
        assert row is not None
        assert row.file_count == 12, (
            f"Expected file_count=12 after overwrite upsert, got {row.file_count}"
        )

    @pytest.mark.asyncio
    async def test_T24_cross_repo_isolation(
        self, db_session: AsyncSession
    ) -> None:
        """A row written for repo A must not be returned when querying repo B."""
        repo_a = await create_repo(db_session, owner="lnuser", slug="t24-repo-a")
        repo_b = await create_repo(db_session, owner="lnuser", slug="t24-repo-b")

        await _insert_lang_row(
            db_session,
            str(repo_a.repo_id),
            "SecretLang",
            file_count=99,
        )
        await db_session.commit()

        outcome = await db_session.execute(
            sa.select(MusehubIntelLanguages).where(
                MusehubIntelLanguages.repo_id == str(repo_b.repo_id)
            )
        )
        rows_b = outcome.scalars().all()
        assert not rows_b, "Repo B must not see Repo A's language rows"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T6 — Performance
# ─────────────────────────────────────────────────────────────────────────────
class TestPerformance:
    """Layer T6 — wall-clock budgets and a query-plan check."""

    @pytest.mark.asyncio
    async def test_T25_provider_completes_100_files_under_2s(
        self, db_session: AsyncSession
    ) -> None:
        """compute() over a 100-entry manifest must finish in under 2 seconds."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="lnuser", slug="t25-speed")
        repo_key = str(repo.repo_id)

        # 100 paths; the object ids cycle through ten distinct values.
        manifest = {}
        for i in range(100):
            manifest[f"src/file_{i}.py"] = long_id(f"{'0' * 63}{i % 10}")
        await _seed_snapshot(db_session, repo_key, manifest)

        backend = AsyncMock()
        backend.get = AsyncMock(return_value=b"# py")
        with (
            patch(
                "musehub.services.musehub_intel_providers.get_backend",
                return_value=backend,
            ),
            patch(
                "musehub.services.musehub_intel_providers.parse_symbols",
                return_value=_fake_tree(10),
            ),
            patch(
                "musehub.services.musehub_intel_providers.language_of",
                return_value="Python",
            ),
        ):
            started = time.monotonic()
            await _PROVIDER_REGISTRY["intel.code.languages"].compute(
                db_session,
                repo_key,
                _REF,
                {"owner": repo.owner, "slug": repo.slug},
            )
            elapsed = time.monotonic() - started

        assert elapsed < 2.0, (
            f"Provider took {elapsed:.2f}s for 100 files (limit: 2s)"
        )

    @pytest.mark.asyncio
    async def test_T26_route_responds_under_200ms_for_50_languages(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """The languages page must answer within 0.2 s with 50 rows stored."""
        repo = await create_repo(db_session, owner="lnuser", slug="t26-perf")
        repo_key = str(repo.repo_id)

        rows = []
        for i in range(50):
            files = i + 1
            rows.append(
                {
                    "repo_id": repo_key,
                    "language": f"Lang{i:02d}",
                    "file_count": files,
                    "symbol_count": files * 100,
                    "pct": 2.0,
                    "kinds_json": {"function": files * 50},
                    "ref": _REF,
                }
            )

        # One multi-row INSERT rather than 50 separate upserts.
        bulk_insert = (
            pg_insert(MusehubIntelLanguages)
            .values(rows)
            .on_conflict_do_nothing()
        )
        await db_session.execute(bulk_insert)
        await db_session.commit()

        started = time.monotonic()
        response = await client.get("/lnuser/t26-perf/intel/languages")
        elapsed = time.monotonic() - started

        assert response.status_code == 200
        assert elapsed < 0.2, (
            f"Route took {elapsed:.3f}s for 50 language rows (limit: 0.2s)"
        )

    @pytest.mark.asyncio
    async def test_T27_db_query_uses_lang_index(
        self, db_session: AsyncSession
    ) -> None:
        """EXPLAIN for a repo_id lookup must mention an index.

        The assertion passes when the plan names ``ix_intel_languages_repo``
        or merely contains the word ``Index``, so any index scan satisfies it.
        """
        explain = await db_session.execute(
            sa.text(
                "EXPLAIN SELECT * FROM musehub_intel_languages WHERE repo_id = 'x'"
            )
        )
        # Each result row carries one line of plan text in its first column.
        plan_lines = [record[0] for record in explain.all()]
        plan = " ".join(plan_lines)
        assert "ix_intel_languages_repo" in plan or "Index" in plan, (
            f"Query plan does not use ix_intel_languages_repo:\n{plan}"
        )
# ─────────────────────────────────────────────────────────────────────────────
# Layer T7 — Security
# ─────────────────────────────────────────────────────────────────────────────
class TestSecurity:
@pytest.mark.asyncio
async def test_T28_xss_in_language_name_is_escaped(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""XSS payload stored in language name must be HTML-escaped in response."""
repo = await create_repo(db_session, owner="lnuser", slug="t28-xss")
rid = str(repo.repo_id)
await _insert_lang_row(
db_session, rid,
language="",
file_count=1,
)
await db_session.commit()
r = await client.get("/lnuser/t28-xss/intel/languages")
assert r.status_code == 200
assert "