"""Languages intel — full 7-tier test suite (issue #20).

Tests are written TDD-first: all tests must be RED before Phase 4–7
implementation begins, then GREEN after.

Tiers
-----
T01–T04  Layer T1 — DB model       (columns, nullable, kinds_json, cascade)
T05–T09  Layer T2 — Provider       (no subprocess, file counts, kinds, pct, empty)
T10–T17  Layer T3 — Route          (200, empty state, 404, sort, filter, pagination)
T18–T21  Layer T4 — E2E HTML       (stat chips, bar width, kind chips, dashboard link)
T22–T24  Layer T5 — Data integrity (no duplicates, upsert overwrite, cross-repo)
T25–T27  Layer T6 — Performance    (provider speed, route speed, index check)
T28–T30  Layer T7 — Security       (XSS escape, SQL injection, no 500 on bad input)
"""
from __future__ import annotations

import time
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch

import pytest
import pytest_asyncio
import sqlalchemy as sa
from httpx import AsyncClient
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.db.musehub_intel_models import MusehubIntelLanguages
from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef
from musehub.types.json_types import JSONObject
from tests.factories import create_repo
from muse.core.types import long_id

# Ref value stored on seeded language rows and passed to the provider.
_REF = long_id("b" * 64)


# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────

async def _insert_lang_row(
    session: AsyncSession,
    repo_id: str,
    language: str,
    file_count: int = 1,
    symbol_count: int = 0,
    pct: float = 0.0,
    kinds_json: JSONObject | None = None,
    ref: str = _REF,
) -> None:
    """Upsert one row into musehub_intel_languages.

    The row is keyed on ``(repo_id, language)``; on conflict every non-key
    column is overwritten with the values passed here.  The statement is
    executed on *session* but NOT committed — callers commit themselves.
    """
    # Single source of truth for the non-key columns, so the INSERT values
    # and the ON CONFLICT update set cannot drift apart.
    non_key_values = {
        "file_count": file_count,
        "symbol_count": symbol_count,
        "pct": pct,
        "kinds_json": kinds_json,
        "ref": ref,
    }
    await session.execute(
        pg_insert(MusehubIntelLanguages)
        .values(repo_id=repo_id, language=language, **non_key_values)
        .on_conflict_do_update(
            index_elements=["repo_id", "language"],
            set_=non_key_values,
        )
    )


async def _seed_snapshot(
    session: AsyncSession,
    repo_id: str,
    manifest: dict[str, str],
) -> str:
    """Insert a MusehubCommit + MusehubSnapshot, return snapshot_id.

    Also inserts the MusehubSnapshotRef / MusehubCommitRef rows linking both
    to *repo_id*, then commits the session.  The manifest is stored
    msgpack-encoded in ``manifest_blob``.

    NOTE(review): the snapshot and commit IDs are constants and every insert
    uses ON CONFLICT DO NOTHING, so a second call against a database that
    still holds the first snapshot would silently keep the earlier manifest.
    Presumably the ``db_session`` fixture isolates tests — confirm.
    """
    import msgpack

    snap_id = long_id("c" * 64)
    commit_id = long_id("d" * 64)
    seeded_at = datetime(2026, 1, 1, tzinfo=timezone.utc)

    await session.execute(
        pg_insert(MusehubSnapshot)
        .values(
            snapshot_id=snap_id,
            directories=[],
            manifest_blob=msgpack.packb(manifest),
            entry_count=len(manifest),
            created_at=seeded_at,
        )
        .on_conflict_do_nothing()
    )
    await session.execute(
        pg_insert(MusehubSnapshotRef)
        .values(repo_id=repo_id, snapshot_id=snap_id)
        .on_conflict_do_nothing()
    )
    await session.execute(
        pg_insert(MusehubCommit)
        .values(
            commit_id=commit_id,
            branch="dev",
            parent_ids=[],
            message="test",
            author="lnuser",
            timestamp=seeded_at,
            snapshot_id=snap_id,
        )
        .on_conflict_do_nothing()
    )
    await session.execute(
        pg_insert(MusehubCommitRef)
        .values(repo_id=repo_id, commit_id=commit_id)
        .on_conflict_do_nothing()
    )
    await session.commit()
    return snap_id


def _fake_tree(n: int, kinds: list[str] | None = None) -> JSONObject:
    """Return a SymbolTree dict with n public symbols spread across kinds.

    Symbol ``i`` gets kind ``kinds[i % len(kinds)]``; when *kinds* is None or
    empty the default cycle is function / class / method.  All symbols live
    under the fake path ``src/mod.py``.
    """
    _kinds = kinds or ["function", "class", "method"]
    return {
        f"src/mod.py::sym_{i}": {
            "kind": _kinds[i % len(_kinds)],
            "name": f"sym_{i}",
            "qualified_name": f"sym_{i}",
            "content_id": long_id("a" * 64),
            "body_hash": long_id("b" * 64),
            "signature_id": long_id("c" * 64),
            "metadata_id": "",
            # NOTE(review): the key always embeds "function", even for
            # symbols whose "kind" differs — confirm consumers ignore it.
            "canonical_key": f"src/mod.py##function#sym_{i}#1",
            "lineno": i + 1,
            "end_lineno": i + 2,
        }
        for i in range(n)
    }


@pytest_asyncio.fixture
async def ln_repo(db_session: AsyncSession) -> MusehubRepo:
    """Repo seeded with Python, TypeScript, and CSS language rows.

    Created as ``lnuser/ln-e2e``; the CSS row has no symbols and a NULL
    ``kinds_json``.  The session is committed before the repo is returned.
    """
    repo = await create_repo(db_session, owner="lnuser", slug="ln-e2e")
    rid = str(repo.repo_id)
    await _insert_lang_row(
        db_session, rid, "Python",
        file_count=30, symbol_count=1500, pct=75.0,
        kinds_json={"function": 800, "class": 400, "method": 300},
    )
    await _insert_lang_row(
        db_session, rid, "TypeScript",
        file_count=10, symbol_count=400, pct=20.0,
        kinds_json={"function": 300, "class": 100},
    )
    await _insert_lang_row(
        db_session, rid, "CSS",
        file_count=5, symbol_count=0, pct=0.0,
        kinds_json=None,
    )
    await db_session.commit()
    return repo


# ─────────────────────────────────────────────────────────────────────────────
# Layer T1 — DB model
# ─────────────────────────────────────────────────────────────────────────────

class TestDBModel:
    """Schema-level checks on the MusehubIntelLanguages mapping."""

    def test_T01_model_has_all_required_columns(self) -> None:
        """MusehubIntelLanguages must declare all expected mapped columns."""
        cols = {
            c.key
            for c in sa.inspect(MusehubIntelLanguages).mapper.column_attrs
        }
        for required in (
            "repo_id", "language", "file_count",
            "symbol_count", "pct", "kinds_json", "ref",
        ):
            assert required in cols, (
                f"Column '{required}' missing from MusehubIntelLanguages"
            )

    def test_T02_kinds_json_is_nullable(self) -> None:
        """kinds_json must be nullable — non-code languages have no symbol breakdown."""
        col = MusehubIntelLanguages.__table__.c["kinds_json"]
        assert col.nullable, "kinds_json must be nullable"

    def test_T03_composite_pk_is_repo_id_plus_language(self) -> None:
        """Primary key must be (repo_id, language) — no single-column PK."""
        pk_cols = {
            c.name
            for c in MusehubIntelLanguages.__table__.primary_key.columns
        }
        assert pk_cols == {"repo_id", "language"}, (
            f"Expected PK {{repo_id, language}}, got {pk_cols}"
        )

    @pytest.mark.asyncio
    async def test_T04_cascade_delete_removes_lang_rows(
        self, db_session: AsyncSession
    ) -> None:
        """Deleting a repo must cascade-delete all its language rows."""
        repo = await create_repo(db_session, owner="lnuser", slug="t04-cascade")
        rid = str(repo.repo_id)
        await _insert_lang_row(db_session, rid, "Python", file_count=3)
        await db_session.commit()

        # Sanity check: the row must exist before the delete is meaningful.
        row = await db_session.scalar(
            sa.select(MusehubIntelLanguages).where(
                MusehubIntelLanguages.repo_id == rid,
                MusehubIntelLanguages.language == "Python",
            )
        )
        assert row is not None, "Row not found after insert"

        await db_session.delete(repo)
        await db_session.commit()

        remaining = (await db_session.execute(
            sa.select(MusehubIntelLanguages).where(
                MusehubIntelLanguages.repo_id == rid
            )
        )).scalars().all()
        assert not remaining, (
            "Cascade delete failed — languages rows remain after repo delete"
        )


# ─────────────────────────────────────────────────────────────────────────────
# Layer T2 — Provider
# ─────────────────────────────────────────────────────────────────────────────

class TestProvider:
    """Behaviour of the ``intel.code.languages`` provider.

    ``get_backend``, ``parse_symbols`` and ``language_of`` are patched on the
    provider module so no real object storage or parsing is involved.
    """

    @pytest.mark.asyncio
    async def test_T05_provider_does_not_use_subprocess(
        self, db_session: AsyncSession
    ) -> None:
        """LanguagesProvider must never call asyncio.create_subprocess_exec or _run_muse."""
        # Static check on the source text of compute(); ``db_session`` is
        # requested but not used by this test.
        import inspect
        from musehub.services import musehub_intel_providers as _mod

        src = inspect.getsource(_mod.LanguagesProvider.compute)
        assert "create_subprocess" not in src, (
            "LanguagesProvider.compute calls create_subprocess — forbidden"
        )
        assert "_run_muse" not in src, (
            "LanguagesProvider.compute calls _run_muse — forbidden"
        )

    @pytest.mark.asyncio
    async def test_T06_provider_counts_files_per_language(
        self, db_session: AsyncSession
    ) -> None:
        """Provider must count files per language via language_of(), not subprocess."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="lnuser", slug="t06-files")
        rid = str(repo.repo_id)
        manifest = {
            "src/a.py": long_id("e" * 64),
            "src/b.py": long_id("f" * 64),
            "src/app.ts": long_id("1" * 64),
            "static/main.css": long_id("2" * 64),
        }
        await _seed_snapshot(db_session, rid, manifest)

        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(return_value=b"# placeholder")

        with (
            patch("musehub.services.musehub_intel_providers.get_backend",
                  return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  return_value={}),
            patch("musehub.services.musehub_intel_providers.language_of",
                  side_effect=lambda p: (
                      "Python" if p.endswith(".py") else
                      "TypeScript" if p.endswith(".ts") else
                      "CSS"
                  )),
        ):
            result = await _PROVIDER_REGISTRY["intel.code.languages"].compute(
                db_session, rid, _REF, {"owner": repo.owner, "slug": repo.slug},
            )

        assert result == [("intel.code.languages", {"count": 3})], (
            f"Expected 3 language rows, got: {result}"
        )

        rows = (await db_session.execute(
            sa.select(MusehubIntelLanguages).where(
                MusehubIntelLanguages.repo_id == rid
            )
        )).scalars().all()
        by_lang = {r.language: r for r in rows}
        assert by_lang["Python"].file_count == 2
        assert by_lang["TypeScript"].file_count == 1
        assert by_lang["CSS"].file_count == 1

    @pytest.mark.asyncio
    async def test_T07_provider_records_kinds_json(
        self, db_session: AsyncSession
    ) -> None:
        """kinds_json must contain kind → count breakdown, imports excluded."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="lnuser", slug="t07-kinds")
        rid = str(repo.repo_id)
        await _seed_snapshot(db_session, rid, {"src/x.py": long_id("3" * 64)})

        # Six symbols cycling function / class / import.
        tree = _fake_tree(6, kinds=["function", "class", "import"])
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(return_value=b"# placeholder")

        with (
            patch("musehub.services.musehub_intel_providers.get_backend",
                  return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  return_value=tree),
            patch("musehub.services.musehub_intel_providers.language_of",
                  return_value="Python"),
        ):
            await _PROVIDER_REGISTRY["intel.code.languages"].compute(
                db_session, rid, _REF, {"owner": repo.owner, "slug": repo.slug},
            )

        row = await db_session.scalar(
            sa.select(MusehubIntelLanguages).where(
                MusehubIntelLanguages.repo_id == rid,
                MusehubIntelLanguages.language == "Python",
            )
        )
        assert row is not None
        assert row.kinds_json is not None, "kinds_json must not be None when symbols exist"
        assert "import" not in row.kinds_json, (
            "import pseudo-symbols must be excluded from kinds_json"
        )
        assert set(row.kinds_json.keys()) <= {"function", "class", "method", "async_function", "async_method"}, (
            f"Unexpected kinds in kinds_json: {set(row.kinds_json.keys())}"
        )

    @pytest.mark.asyncio
    async def test_T08_provider_pct_sums_correctly(
        self, db_session: AsyncSession
    ) -> None:
        """Sum of pct across all languages must be ≈ 100 when all files have symbols."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="lnuser", slug="t08-pct")
        rid = str(repo.repo_id)
        manifest = {
            "src/a.py": long_id("4" * 64),
            "src/b.ts": long_id("5" * 64),
        }
        await _seed_snapshot(db_session, rid, manifest)

        py_tree = _fake_tree(3, kinds=["function"])
        ts_tree = _fake_tree(1, kinds=["function"])
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(return_value=b"# placeholder")

        def _fake_parse(src: bytes, path: str) -> JSONObject:
            return py_tree if path.endswith(".py") else ts_tree

        with (
            patch("musehub.services.musehub_intel_providers.get_backend",
                  return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  side_effect=_fake_parse),
            patch("musehub.services.musehub_intel_providers.language_of",
                  side_effect=lambda p: "Python" if p.endswith(".py") else "TypeScript"),
        ):
            await _PROVIDER_REGISTRY["intel.code.languages"].compute(
                db_session, rid, _REF, {"owner": repo.owner, "slug": repo.slug},
            )

        rows = (await db_session.execute(
            sa.select(MusehubIntelLanguages).where(
                MusehubIntelLanguages.repo_id == rid
            )
        )).scalars().all()
        total_pct = sum(r.pct for r in rows)
        assert abs(total_pct - 100.0) < 0.01, (
            f"pct values do not sum to 100 (sum={total_pct:.2f})"
        )

    @pytest.mark.asyncio
    async def test_T09_provider_returns_empty_when_no_snapshot(
        self, db_session: AsyncSession
    ) -> None:
        """Provider must return [] without crashing when the repo has no snapshot."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="lnuser", slug="t09-nosnap")
        rid = str(repo.repo_id)
        await db_session.commit()

        result = await _PROVIDER_REGISTRY["intel.code.languages"].compute(
            db_session, rid, _REF, {"owner": repo.owner, "slug": repo.slug},
        )
        assert result == [], f"Expected [] when no snapshot exists, got {result}"


# ─────────────────────────────────────────────────────────────────────────────
# Layer T3 — Route
# ─────────────────────────────────────────────────────────────────────────────

class TestRoute:
    """HTTP status behaviour of ``/{owner}/{slug}/intel/languages``."""

    @pytest.mark.asyncio
    async def test_T10_returns_200_with_language_data(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """Route must return 200 when language rows exist."""
        r = await client.get("/lnuser/ln-e2e/intel/languages")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_T11_returns_200_with_empty_repo(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Route must return 200 even when musehub_intel_languages has no rows."""
        await create_repo(db_session, owner="lnuser", slug="t11-empty")
        await db_session.commit()
        r = await client.get("/lnuser/t11-empty/intel/languages")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_T12_unknown_repo_returns_404(
        self, client: AsyncClient
    ) -> None:
        """Non-existent repo path must return 403 or 404, not 200 or 500."""
        r = await client.get("/nobody/no-such-repo/intel/languages")
        assert r.status_code in (403, 404)

    @pytest.mark.asyncio
    async def test_T13_sort_by_files_param_accepted(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """?sort=files must return 200 and not raise an error."""
        r = await client.get("/lnuser/ln-e2e/intel/languages?sort=files")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_T14_sort_by_symbols_param_accepted(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """?sort=symbols must return 200."""
        r = await client.get("/lnuser/ln-e2e/intel/languages?sort=symbols")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_T15_unknown_sort_coerced_to_default(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """?sort=garbage must return 200, coerced to default sort (pct desc)."""
        r = await client.get("/lnuser/ln-e2e/intel/languages?sort=garbage")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_T16_top_param_limits_rows(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """?top=20 must return at most 20 language rows when 25 exist."""
        repo = await create_repo(db_session, owner="lnuser", slug="t16-top")
        rid = str(repo.repo_id)
        langs = [f"Lang{i:02d}" for i in range(25)]
        for i, lang in enumerate(langs):
            await _insert_lang_row(db_session, rid, lang, file_count=i + 1)
        await db_session.commit()

        r = await client.get("/lnuser/t16-top/intel/languages?top=20")
        assert r.status_code == 200
        # NOTE(review): only an upper bound is checked, so a page that renders
        # no language names at all would also pass — consider a lower bound.
        count = sum(1 for lang in langs if lang in r.text)
        assert count <= 20, f"Expected ≤20 languages for ?top=20, found {count}"

    @pytest.mark.asyncio
    async def test_T17_top_invalid_string_returns_422(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """?top=abc must be rejected with 422 (FastAPI type validation)."""
        r = await client.get("/lnuser/ln-e2e/intel/languages?top=abc")
        assert r.status_code == 422


# ─────────────────────────────────────────────────────────────────────────────
# Layer T4 — E2E HTML
# ─────────────────────────────────────────────────────────────────────────────

class TestE2E:
    """Substring checks on the rendered HTML for the seeded ``ln_repo``."""

    @pytest.mark.asyncio
    async def test_T18_language_names_appear_in_page(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """All seeded language names must appear in the rendered HTML."""
        r = await client.get("/lnuser/ln-e2e/intel/languages")
        assert r.status_code == 200
        for lang in ("Python", "TypeScript", "CSS"):
            assert lang in r.text, f"Language '{lang}' missing from page"

    @pytest.mark.asyncio
    async def test_T19_pct_bar_width_rendered(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """A width style attribute must appear in the HTML (for bar rendering)."""
        r = await client.get("/lnuser/ln-e2e/intel/languages")
        assert r.status_code == 200
        assert "width:" in r.text, "No width style found — pct bars not rendered"

    @pytest.mark.asyncio
    async def test_T20_kind_chips_rendered_for_python(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """Kind chips for Python (function, class, method) must appear in HTML."""
        r = await client.get("/lnuser/ln-e2e/intel/languages")
        assert r.status_code == 200
        body = r.text.lower()
        for kind in ("function", "class", "method"):
            assert kind in body, f"Kind chip '{kind}' missing from languages page"

    @pytest.mark.asyncio
    async def test_T21_dashboard_card_links_to_languages_page(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """Intel dashboard must include a link to /intel/languages."""
        r = await client.get("/lnuser/ln-e2e/intel")
        assert r.status_code == 200
        assert b"/intel/languages" in r.content


# ─────────────────────────────────────────────────────────────────────────────
# Layer T5 — Data integrity
# ─────────────────────────────────────────────────────────────────────────────

class TestDataIntegrity:
    """Upsert and isolation guarantees, exercised via ``_insert_lang_row``."""

    @pytest.mark.asyncio
    async def test_T22_double_upsert_produces_one_row(
        self, db_session: AsyncSession
    ) -> None:
        """Upserting the same (repo_id, language) twice must not create duplicates."""
        repo = await create_repo(db_session, owner="lnuser", slug="t22-dup")
        rid = str(repo.repo_id)
        for _ in range(2):
            await _insert_lang_row(db_session, rid, "Python", file_count=5)
        await db_session.commit()

        rows = (await db_session.execute(
            sa.select(MusehubIntelLanguages).where(
                MusehubIntelLanguages.repo_id == rid
            )
        )).scalars().all()
        assert len(rows) == 1, (
            f"Expected 1 row after double upsert, got {len(rows)}"
        )

    @pytest.mark.asyncio
    async def test_T23_second_upsert_overwrites_file_count(
        self, db_session: AsyncSession
    ) -> None:
        """A second upsert must overwrite file_count with the latest value."""
        repo = await create_repo(db_session, owner="lnuser", slug="t23-overwrite")
        rid = str(repo.repo_id)
        await _insert_lang_row(db_session, rid, "Python", file_count=5)
        await _insert_lang_row(db_session, rid, "Python", file_count=12)
        await db_session.commit()

        row = await db_session.scalar(
            sa.select(MusehubIntelLanguages).where(
                MusehubIntelLanguages.repo_id == rid,
                MusehubIntelLanguages.language == "Python",
            )
        )
        assert row is not None
        assert row.file_count == 12, (
            f"Expected file_count=12 after overwrite upsert, got {row.file_count}"
        )

    @pytest.mark.asyncio
    async def test_T24_cross_repo_isolation(
        self, db_session: AsyncSession
    ) -> None:
        """Languages from repo A must not appear in repo B's DB rows."""
        repo_a = await create_repo(db_session, owner="lnuser", slug="t24-repo-a")
        repo_b = await create_repo(db_session, owner="lnuser", slug="t24-repo-b")
        await _insert_lang_row(
            db_session, str(repo_a.repo_id), "SecretLang", file_count=99
        )
        await db_session.commit()

        rows_b = (await db_session.execute(
            sa.select(MusehubIntelLanguages).where(
                MusehubIntelLanguages.repo_id == str(repo_b.repo_id)
            )
        )).scalars().all()
        assert not rows_b, "Repo B must not see Repo A's language rows"


# ─────────────────────────────────────────────────────────────────────────────
# Layer T6 — Performance
# ─────────────────────────────────────────────────────────────────────────────

class TestPerformance:
    """Wall-clock budgets and a query-plan check.

    Durations are taken with ``time.perf_counter()``, the clock intended for
    measuring short intervals.
    """

    @pytest.mark.asyncio
    async def test_T25_provider_completes_100_files_under_2s(
        self, db_session: AsyncSession
    ) -> None:
        """Provider must process a 100-file manifest in < 2 s wall time."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="lnuser", slug="t25-speed")
        rid = str(repo.repo_id)
        # 100 paths sharing only 10 distinct object ids (last digit = i % 10).
        manifest = {f"src/file_{i}.py": long_id(f"{'0' * 63}{i % 10}") for i in range(100)}
        await _seed_snapshot(db_session, rid, manifest)

        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(return_value=b"# py")

        with (
            patch("musehub.services.musehub_intel_providers.get_backend",
                  return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  return_value=_fake_tree(10)),
            patch("musehub.services.musehub_intel_providers.language_of",
                  return_value="Python"),
        ):
            t0 = time.perf_counter()
            await _PROVIDER_REGISTRY["intel.code.languages"].compute(
                db_session, rid, _REF, {"owner": repo.owner, "slug": repo.slug},
            )
            elapsed = time.perf_counter() - t0

        assert elapsed < 2.0, (
            f"Provider took {elapsed:.2f}s for 100 files (limit: 2s)"
        )

    @pytest.mark.asyncio
    async def test_T26_route_responds_under_200ms_for_50_languages(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Route must respond in < 200 ms when 50 language rows exist."""
        repo = await create_repo(db_session, owner="lnuser", slug="t26-perf")
        rid = str(repo.repo_id)
        rows = [
            {
                "repo_id": rid,
                "language": f"Lang{i:02d}",
                "file_count": i + 1,
                "symbol_count": (i + 1) * 100,
                "pct": 2.0,
                "kinds_json": {"function": (i + 1) * 50},
                "ref": _REF,
            }
            for i in range(50)
        ]
        # Bulk insert in a single statement rather than 50 upserts.
        await db_session.execute(
            pg_insert(MusehubIntelLanguages)
            .values(rows)
            .on_conflict_do_nothing()
        )
        await db_session.commit()

        t0 = time.perf_counter()
        r = await client.get("/lnuser/t26-perf/intel/languages")
        elapsed = time.perf_counter() - t0

        assert r.status_code == 200
        assert elapsed < 0.2, (
            f"Route took {elapsed:.3f}s for 50 language rows (limit: 0.2s)"
        )

    @pytest.mark.asyncio
    async def test_T27_db_query_uses_lang_index(
        self, db_session: AsyncSession
    ) -> None:
        """SELECT on musehub_intel_languages must use ix_intel_languages_repo index."""
        explain = await db_session.execute(
            sa.text(
                "EXPLAIN SELECT * FROM musehub_intel_languages WHERE repo_id = 'x'"
            )
        )
        plan = " ".join(row[0] for row in explain.all())
        # NOTE(review): the ``"Index" in plan`` alternative accepts any plan
        # line mentioning an index, which is looser than the docstring's
        # named-index requirement — confirm this leniency is intended.
        assert "ix_intel_languages_repo" in plan or "Index" in plan, (
            f"Query plan does not use ix_intel_languages_repo:\n{plan}"
        )


# ─────────────────────────────────────────────────────────────────────────────
# Layer T7 — Security
#
───────────────────────────────────────────────────────────────────────────── class TestSecurity: @pytest.mark.asyncio async def test_T28_xss_in_language_name_is_escaped( self, client: AsyncClient, db_session: AsyncSession ) -> None: """XSS payload stored in language name must be HTML-escaped in response.""" repo = await create_repo(db_session, owner="lnuser", slug="t28-xss") rid = str(repo.repo_id) await _insert_lang_row( db_session, rid, language="", file_count=1, ) await db_session.commit() r = await client.get("/lnuser/t28-xss/intel/languages") assert r.status_code == 200 assert "