"""Code Map intel — full 7-tier test suite (issue #21). Tests are written TDD-first: all tests must be RED before Phase 4–7 implementation begins, then GREEN after. Tiers ----- T01–T05 Layer T1 — DB model (columns, nullable, cascade, meta, index) T06–T11 Layer T2 — Provider (no subprocess, fan_in, fan_out, cycles, edges, empty) T12–T19 Layer T3 — Route (200, empty state, 404, sort, top filter, stat chips, meta) T20–T23 Layer T4 — E2E HTML (stat chips, fan-in bar, cycle panel, dashboard link) T24–T26 Layer T5 — Data integrity (upsert idempotent, meta overwrite, cross-repo) T27–T29 Layer T6 — Performance (provider speed, route speed, index check) T30–T32 Layer T7 — Security (XSS escape, SQL injection, no 500 on bad params) """ from __future__ import annotations import time from datetime import datetime, timezone from unittest.mock import AsyncMock, patch import pytest import pytest_asyncio import sqlalchemy as sa from httpx import AsyncClient from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import long_id from musehub.db.musehub_intel_models import MusehubIntelCodemapMeta, MusehubIntelCodemapModule from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef from musehub.types.json_types import JSONObject from tests.factories import create_repo _REF = long_id("b" * 64) # ───────────────────────────────────────────────────────────────────────────── # Helpers # ───────────────────────────────────────────────────────────────────────────── async def _insert_module_row( session: AsyncSession, repo_id: str, file_path: str, symbol_count: int = 0, fan_in: int = 0, fan_out: int = 0, language: str = "Python", ref: str = _REF, ) -> None: """Upsert one row into musehub_intel_codemap_modules.""" await session.execute( pg_insert(MusehubIntelCodemapModule) .values( repo_id=repo_id, file_path=file_path, symbol_count=symbol_count, fan_in=fan_in, fan_out=fan_out, language=language, ref=ref, ) .on_conflict_do_update( index_elements=["repo_id", "file_path"], set_={ "symbol_count": symbol_count, "fan_in": fan_in, "fan_out": fan_out, "language": language, "ref": ref, }, ) ) async def _insert_meta_row( session: AsyncSession, repo_id: str, total_modules: int = 0, total_edges: int = 0, cycle_count: int = 0, cycles_json: list[list[str]] | None = None, ref: str = _REF, ) -> None: """Upsert one row into musehub_intel_codemap_meta.""" await session.execute( pg_insert(MusehubIntelCodemapMeta) .values( repo_id=repo_id, total_modules=total_modules, total_edges=total_edges, cycle_count=cycle_count, cycles_json=cycles_json, ref=ref, ) .on_conflict_do_update( index_elements=["repo_id"], set_={ "total_modules": total_modules, "total_edges": total_edges, "cycle_count": cycle_count, "cycles_json": cycles_json, "ref": ref, }, ) ) async def _seed_snapshot( session: AsyncSession, repo_id: str, manifest: dict[str, str], ) -> str: """Insert a MusehubCommit + MusehubSnapshot, return snapshot_id.""" import msgpack snap_id = long_id("c" * 64) commit_id = long_id("d" * 64) await session.execute( pg_insert(MusehubSnapshot) .values( snapshot_id = snap_id, directories = [], manifest_blob= msgpack.packb(manifest), entry_count = len(manifest), created_at = datetime(2026, 1, 1, tzinfo=timezone.utc), ) .on_conflict_do_nothing() ) await session.execute( pg_insert(MusehubSnapshotRef) .values(repo_id=repo_id, snapshot_id=snap_id) .on_conflict_do_nothing() ) await session.execute( pg_insert(MusehubCommit) .values( commit_id = commit_id, branch = "dev", parent_ids = [], message = "test", author = "cmuser", timestamp = datetime(2026, 1, 1, tzinfo=timezone.utc), snapshot_id = snap_id, ) .on_conflict_do_nothing() ) await session.execute( pg_insert(MusehubCommitRef) .values(repo_id=repo_id, commit_id=commit_id) .on_conflict_do_nothing() ) await session.commit() return snap_id def _fake_import_tree( file_path: str, imports: list[str], n_symbols: int = 3, ) -> JSONObject: """Return a SymbolTree with n_symbols functions plus import records.""" tree: JSONObject = {} for i in range(n_symbols): tree[f"{file_path}::fn_{i}"] = { "kind": "function", "name": f"fn_{i}", "qualified_name": f"fn_{i}", "content_id": long_id("a" * 64), "body_hash": long_id("b" * 64), "signature_id": long_id("c" * 64), "metadata_id": "", "canonical_key": f"{file_path}##function#fn_{i}#1", "lineno": i + 1, "end_lineno": i + 2, } for j, dotted in enumerate(imports): key = f"{file_path}::_import_{j}" tree[key] = { "kind": "import", "name": dotted.split(".")[-1], "qualified_name": f"import::{dotted}::_sym", "content_id": long_id("e" * 64), "body_hash": "", "signature_id": "", "metadata_id": "", "canonical_key": f"{file_path}##import#{dotted}#0", "lineno": n_symbols + j + 1, "end_lineno": n_symbols + j + 2, } return tree @pytest_asyncio.fixture async def cm_repo(db_session: AsyncSession) -> MusehubRepo: """Repo seeded with 5 module rows and a meta row.""" repo = await create_repo(db_session, owner="cmuser", slug="cm-e2e") rid = str(repo.repo_id) for i, (fp, fi, fo) in enumerate([ ("musehub/api/routes/ui.py", 8, 3), ("musehub/services/svc.py", 5, 4), ("musehub/db/models.py", 4, 1), ("musehub/core/types.py", 3, 0), ("musehub/utils/helpers.py", 1, 2), ]): await _insert_module_row( db_session, rid, fp, symbol_count=10 + i, fan_in=fi, fan_out=fo, language="Python", ) await _insert_meta_row( db_session, rid, total_modules=5, total_edges=17, cycle_count=0, ) await db_session.commit() return repo # ───────────────────────────────────────────────────────────────────────────── # Layer T1 — DB model # ───────────────────────────────────────────────────────────────────────────── class TestDBModel: def test_T01_module_model_has_all_required_columns(self) -> None: """MusehubIntelCodemapModule must declare all expected mapped columns.""" cols = { c.key for c in sa.inspect(MusehubIntelCodemapModule).mapper.column_attrs } for required in ("repo_id", "file_path", "symbol_count", "fan_in", "fan_out", "language", "ref"): assert required in cols, f"Column '{required}' missing from MusehubIntelCodemapModule" def test_T02_meta_model_has_all_required_columns(self) -> None: """MusehubIntelCodemapMeta must declare all expected mapped columns.""" cols = { c.key for c in sa.inspect(MusehubIntelCodemapMeta).mapper.column_attrs } for required in ("repo_id", "total_modules", "total_edges", "cycle_count", "cycles_json", "ref"): assert required in cols, f"Column '{required}' missing from MusehubIntelCodemapMeta" def test_T03_cycles_json_is_nullable(self) -> None: """cycles_json must be nullable — most repos have no cycles.""" col = MusehubIntelCodemapMeta.__table__.c["cycles_json"] assert col.nullable, "cycles_json must be nullable" def test_T04_composite_pk_modules(self) -> None: """Primary key of codemap_modules must be (repo_id, file_path).""" pk_cols = {c.name for c in MusehubIntelCodemapModule.__table__.primary_key.columns} assert pk_cols == {"repo_id", "file_path"}, f"Unexpected PK: {pk_cols}" @pytest.mark.asyncio async def test_T05_cascade_delete_removes_module_rows( self, db_session: AsyncSession ) -> None: """Deleting a repo must cascade-delete all codemap module rows.""" repo = await create_repo(db_session, owner="cmuser2", slug="cm-cascade") rid = str(repo.repo_id) await _insert_module_row(db_session, rid, "src/a.py", fan_in=1) await db_session.commit() await db_session.delete(repo) await db_session.commit() result = await db_session.execute( sa.select(MusehubIntelCodemapModule) .where(MusehubIntelCodemapModule.repo_id == rid) ) assert result.first() is None, "Cascade delete failed — module rows remain" # ───────────────────────────────────────────────────────────────────────────── # Layer T2 — Provider # ───────────────────────────────────────────────────────────────────────────── class TestProvider: @pytest.mark.asyncio async def test_T06_provider_returns_no_subprocess( self, db_session: AsyncSession ) -> None: """CodemapProvider.compute must not import subprocess or asyncio.create_subprocess_exec.""" import inspect from musehub.services.musehub_intel_providers import CodemapProvider src = inspect.getsource(CodemapProvider.compute) assert "subprocess" not in src, "CodemapProvider.compute spawns a subprocess" assert "create_subprocess" not in src, "CodemapProvider.compute uses create_subprocess" @pytest.mark.asyncio async def test_T07_provider_computes_fan_out( self, db_session: AsyncSession ) -> None: """fan_out counts resolved imports from the manifest, not stdlib.""" from musehub.services.musehub_intel_providers import CodemapProvider repo = await create_repo(db_session, owner="cmuser3", slug="cm-fanout") rid = str(repo.repo_id) # a.py imports b.py; c.py is stdlib (unresolved) manifest = { "src/a.py": long_id("a" * 64), "src/b.py": long_id("b" * 64), } await _seed_snapshot(db_session, rid, manifest) a_tree = _fake_import_tree("src/a.py", ["src.b"], n_symbols=2) b_tree = _fake_import_tree("src/b.py", ["os.path"], n_symbols=1) mock_backend = AsyncMock() mock_backend.get = AsyncMock(side_effect=[b"src_a", b"src_b"]) with ( patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend), patch("musehub.services.musehub_intel_providers.parse_symbols", side_effect=[a_tree, b_tree]), patch("musehub.services.musehub_intel_providers.language_of", return_value="Python"), ): provider = CodemapProvider() results = await provider.compute(db_session, rid, "dev", {"owner": "cmuser3", "slug": "cm-fanout"}) assert results, "Provider returned empty results" assert results[0][0] == "intel.code.codemap" data = results[0][1] # a.py resolves src.b → 1 edge; b.py resolves os.path → 0 (stdlib) assert data["edges"] == 1, f"Expected 1 edge, got {data['edges']}" @pytest.mark.asyncio async def test_T08_provider_computes_fan_in( self, db_session: AsyncSession ) -> None: """fan_in of b.py must equal number of files that import b.py.""" from musehub.services.musehub_intel_providers import CodemapProvider repo = await create_repo(db_session, owner="cmuser4", slug="cm-fanin") rid = str(repo.repo_id) manifest = { "src/a.py": long_id("a" * 64), "src/b.py": long_id("b" * 64), "src/c.py": long_id("c" * 64), } await _seed_snapshot(db_session, rid, manifest) a_tree = _fake_import_tree("src/a.py", ["src.b"], n_symbols=1) b_tree = _fake_import_tree("src/b.py", [], n_symbols=1) c_tree = _fake_import_tree("src/c.py", ["src.b"], n_symbols=1) mock_backend = AsyncMock() mock_backend.get = AsyncMock(side_effect=[b"src_a", b"src_b", b"src_c"]) with ( patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend), patch("musehub.services.musehub_intel_providers.parse_symbols", side_effect=[a_tree, b_tree, c_tree]), patch("musehub.services.musehub_intel_providers.language_of", return_value="Python"), ): provider = CodemapProvider() await provider.compute(db_session, rid, "dev", {"owner": "cmuser4", "slug": "cm-fanin"}) await db_session.commit() result = await db_session.execute( sa.select(MusehubIntelCodemapModule) .where( MusehubIntelCodemapModule.repo_id == rid, MusehubIntelCodemapModule.file_path == "src/b.py", ) ) row = result.scalar_one_or_none() assert row is not None, "src/b.py row not found" assert row.fan_in == 2, f"Expected fan_in=2, got {row.fan_in}" @pytest.mark.asyncio async def test_T09_provider_detects_no_cycles_for_dag( self, db_session: AsyncSession ) -> None: """A pure DAG import graph must produce cycle_count=0.""" from musehub.services.musehub_intel_providers import CodemapProvider repo = await create_repo(db_session, owner="cmuser5", slug="cm-nocycle") rid = str(repo.repo_id) manifest = {"src/a.py": long_id("a" * 64), "src/b.py": long_id("b" * 64)} await _seed_snapshot(db_session, rid, manifest) a_tree = _fake_import_tree("src/a.py", ["src.b"]) b_tree = _fake_import_tree("src/b.py", []) mock_backend = AsyncMock() mock_backend.get = AsyncMock(side_effect=[b"a", b"b"]) with ( patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend), patch("musehub.services.musehub_intel_providers.parse_symbols", side_effect=[a_tree, b_tree]), patch("musehub.services.musehub_intel_providers.language_of", return_value="Python"), ): results = await CodemapProvider().compute(db_session, rid, "dev", {"owner": "cmuser5", "slug": "cm-nocycle"}) assert results[0][1]["cycles"] == 0 @pytest.mark.asyncio async def test_T10_provider_detects_mutual_import_cycle( self, db_session: AsyncSession ) -> None: """A ↔ B mutual import must be detected as one cycle.""" from musehub.services.musehub_intel_providers import CodemapProvider repo = await create_repo(db_session, owner="cmuser6", slug="cm-cycle") rid = str(repo.repo_id) manifest = {"src/a.py": long_id("a" * 64), "src/b.py": long_id("b" * 64)} await _seed_snapshot(db_session, rid, manifest) # a imports b AND b imports a → cycle a_tree = _fake_import_tree("src/a.py", ["src.b"]) b_tree = _fake_import_tree("src/b.py", ["src.a"]) mock_backend = AsyncMock() mock_backend.get = AsyncMock(side_effect=[b"a", b"b"]) with ( patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend), patch("musehub.services.musehub_intel_providers.parse_symbols", side_effect=[a_tree, b_tree]), patch("musehub.services.musehub_intel_providers.language_of", return_value="Python"), ): results = await CodemapProvider().compute(db_session, rid, "dev", {"owner": "cmuser6", "slug": "cm-cycle"}) assert results[0][1]["cycles"] == 1, f"Expected 1 cycle, got {results[0][1]['cycles']}" @pytest.mark.asyncio async def test_T11_provider_returns_empty_for_missing_manifest( self, db_session: AsyncSession ) -> None: """Provider must return [] when no commits exist for the repo.""" from musehub.services.musehub_intel_providers import CodemapProvider repo = await create_repo(db_session, owner="cmuser7", slug="cm-empty") rid = str(repo.repo_id) await db_session.commit() results = await CodemapProvider().compute(db_session, rid, "dev", {"owner": "cmuser7", "slug": "cm-empty"}) assert results == [], f"Expected [], got {results}" # ───────────────────────────────────────────────────────────────────────────── # Layer T3 — Route # ───────────────────────────────────────────────────────────────────────────── class TestRoute: @pytest.mark.asyncio async def test_T12_codemap_page_returns_200( self, client: AsyncClient, cm_repo: MusehubRepo ) -> None: """GET /cmuser/cm-e2e/intel/codemap must return HTTP 200.""" resp = await client.get("/cmuser/cm-e2e/intel/codemap") assert resp.status_code == 200, f"Expected 200, got {resp.status_code}" @pytest.mark.asyncio async def test_T13_codemap_page_empty_state( self, client: AsyncClient, db_session: AsyncSession ) -> None: """Route must render empty state when no codemap rows exist.""" repo = await create_repo(db_session, owner="cmempty", slug="cm-nodata") await db_session.commit() resp = await client.get("/cmempty/cm-nodata/intel/codemap") assert resp.status_code == 200 assert "Push a commit" in resp.text @pytest.mark.asyncio async def test_T14_codemap_page_404_for_missing_repo( self, client: AsyncClient ) -> None: """Route must return 404 for a repo that does not exist.""" resp = await client.get("/ghost/no-such-repo/intel/codemap") assert resp.status_code == 404 @pytest.mark.asyncio async def test_T15_sort_by_fan_in( self, client: AsyncClient, cm_repo: MusehubRepo ) -> None: """sort=fan-in must return modules ordered by fan_in descending.""" resp = await client.get("/cmuser/cm-e2e/intel/codemap?sort=fan-in") assert resp.status_code == 200 # musehub/api/routes/ui.py has fan_in=8, must appear before others text = resp.text pos_api = text.find("ui.py") pos_types = text.find("types.py") assert pos_api < pos_types, "fan-in sort: ui.py (fi=8) should appear before types.py (fi=3)" @pytest.mark.asyncio async def test_T16_sort_by_fan_out( self, client: AsyncClient, cm_repo: MusehubRepo ) -> None: """sort=fan-out must return modules ordered by fan_out descending.""" resp = await client.get("/cmuser/cm-e2e/intel/codemap?sort=fan-out") assert resp.status_code == 200 text = resp.text # svc.py has fan_out=4, must appear before models.py (fo=1) pos_svc = text.find("svc.py") pos_models = text.find("models.py") assert pos_svc < pos_models, "fan-out sort: svc.py (fo=4) should appear before models.py (fo=1)" @pytest.mark.asyncio async def test_T17_unknown_sort_coerces_to_symbols( self, client: AsyncClient, cm_repo: MusehubRepo ) -> None: """An unknown sort param must be coerced to 'symbols' (no 400/500).""" resp = await client.get("/cmuser/cm-e2e/intel/codemap?sort=invalid_sort") assert resp.status_code == 200 @pytest.mark.asyncio async def test_T18_top_filter_limits_rows( self, client: AsyncClient, db_session: AsyncSession ) -> None: """top=20 with 25 total modules must show max 20 rows but stat chip shows 25.""" repo = await create_repo(db_session, owner="cmtop", slug="cm-top") rid = str(repo.repo_id) for i in range(25): await _insert_module_row(db_session, rid, f"src/mod_{i}.py", fan_in=i) await _insert_meta_row(db_session, rid, total_modules=25, total_edges=0) await db_session.commit() resp = await client.get("/cmtop/cm-top/intel/codemap?top=20") assert resp.status_code == 200 # stat chip must show 25, not 20 assert "25" in resp.text, "Stat chip must show total module count (25), not page length" @pytest.mark.asyncio async def test_T19_stat_chips_use_meta_row( self, client: AsyncClient, cm_repo: MusehubRepo ) -> None: """Stat chips must display meta row values (5 modules, 17 edges, 0 cycles).""" resp = await client.get("/cmuser/cm-e2e/intel/codemap") assert resp.status_code == 200 assert "17" in resp.text, "Edge count from meta row (17) not found in response" # ───────────────────────────────────────────────────────────────────────────── # Layer T4 — E2E HTML # ───────────────────────────────────────────────────────────────────────────── class TestHTML: @pytest.mark.asyncio async def test_T20_stat_chips_present( self, client: AsyncClient, cm_repo: MusehubRepo ) -> None: """HTML must contain stat chip labels: Modules, Edges, Cycles.""" resp = await client.get("/cmuser/cm-e2e/intel/codemap") assert resp.status_code == 200 for label in ("Modules", "Edges", "Cycles"): assert label in resp.text, f"Stat chip label '{label}' missing" @pytest.mark.asyncio async def test_T21_fan_in_bar_present( self, client: AsyncClient, cm_repo: MusehubRepo ) -> None: """HTML must contain cm-fan-bar element for fan-in visualization.""" resp = await client.get("/cmuser/cm-e2e/intel/codemap") assert resp.status_code == 200 assert "cm-fan-bar" in resp.text, "cm-fan-bar class missing from HTML" @pytest.mark.asyncio async def test_T22_cycle_ok_shown_when_no_cycles( self, client: AsyncClient, cm_repo: MusehubRepo ) -> None: """cm-cycle-ok element must be present when cycle_count == 0.""" resp = await client.get("/cmuser/cm-e2e/intel/codemap") assert resp.status_code == 200 assert "cm-cycle-ok" in resp.text, "cm-cycle-ok class missing (cycle_count=0)" @pytest.mark.asyncio async def test_T23_dashboard_back_link_present( self, client: AsyncClient, cm_repo: MusehubRepo ) -> None: """Page must contain a back link to the intel dashboard.""" resp = await client.get("/cmuser/cm-e2e/intel/codemap") assert resp.status_code == 200 assert "/intel" in resp.text, "Back link to Intel Hub missing" # ───────────────────────────────────────────────────────────────────────────── # Layer T5 — Data integrity # ───────────────────────────────────────────────────────────────────────────── class TestDataIntegrity: @pytest.mark.asyncio async def test_T24_upsert_is_idempotent( self, db_session: AsyncSession ) -> None: """Upserting the same module row twice must not create duplicates.""" repo = await create_repo(db_session, owner="cmdup", slug="cm-dup") rid = str(repo.repo_id) for _ in range(2): await _insert_module_row(db_session, rid, "src/a.py", fan_in=3) await db_session.commit() result = await db_session.execute( sa.select(sa.func.count()) .select_from(MusehubIntelCodemapModule) .where(MusehubIntelCodemapModule.repo_id == rid) ) count = result.scalar_one() assert count == 1, f"Upsert created duplicate: expected 1 row, got {count}" @pytest.mark.asyncio async def test_T25_meta_upsert_overwrites( self, db_session: AsyncSession ) -> None: """Second meta upsert must overwrite total_edges, not create a second row.""" repo = await create_repo(db_session, owner="cmmeta", slug="cm-meta") rid = str(repo.repo_id) await _insert_meta_row(db_session, rid, total_modules=5, total_edges=10) await _insert_meta_row(db_session, rid, total_modules=6, total_edges=20) await db_session.commit() result = await db_session.execute( sa.select(MusehubIntelCodemapMeta) .where(MusehubIntelCodemapMeta.repo_id == rid) ) rows = result.scalars().all() assert len(rows) == 1, f"Expected 1 meta row, got {len(rows)}" assert rows[0].total_edges == 20, f"Expected total_edges=20, got {rows[0].total_edges}" @pytest.mark.asyncio async def test_T26_cross_repo_isolation( self, db_session: AsyncSession ) -> None: """Module rows from repo A must not appear in repo B queries.""" repo_a = await create_repo(db_session, owner="cmisolate", slug="repo-a") repo_b = await create_repo(db_session, owner="cmisolate", slug="repo-b") rid_a = str(repo_a.repo_id) rid_b = str(repo_b.repo_id) await _insert_module_row(db_session, rid_a, "src/a.py", fan_in=5) await db_session.commit() result = await db_session.execute( sa.select(MusehubIntelCodemapModule) .where(MusehubIntelCodemapModule.repo_id == rid_b) ) assert result.first() is None, "Cross-repo contamination: repo B sees repo A rows" # ───────────────────────────────────────────────────────────────────────────── # Layer T6 — Performance # ───────────────────────────────────────────────────────────────────────────── class TestPerformance: @pytest.mark.asyncio async def test_T27_provider_completes_under_threshold( self, db_session: AsyncSession ) -> None: """CodemapProvider must complete within 10 s for a 50-file manifest.""" from musehub.services.musehub_intel_providers import CodemapProvider repo = await create_repo(db_session, owner="cmperfp", slug="cm-perf-p") rid = str(repo.repo_id) n = 50 manifest = {f"src/mod_{i}.py": long_id(f"{i:064x}") for i in range(n)} await _seed_snapshot(db_session, rid, manifest) def _tree_for(path: str) -> JSONObject: return _fake_import_tree(path, [], n_symbols=5) trees = [_tree_for(fp) for fp in manifest] src_bytes = [b"src"] * n mock_backend = AsyncMock() mock_backend.get = AsyncMock(side_effect=src_bytes) with ( patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend), patch("musehub.services.musehub_intel_providers.parse_symbols", side_effect=trees), patch("musehub.services.musehub_intel_providers.language_of", return_value="Python"), ): t0 = time.monotonic() await CodemapProvider().compute(db_session, rid, "dev", {"owner": "cmperfp", "slug": "cm-perf-p"}) elapsed = time.monotonic() - t0 assert elapsed < 10.0, f"Provider took {elapsed:.2f}s — exceeds 10s threshold" @pytest.mark.asyncio async def test_T28_route_responds_under_500ms( self, client: AsyncClient, cm_repo: MusehubRepo ) -> None: """GET /intel/codemap must respond in under 500 ms for a seeded repo.""" t0 = time.monotonic() resp = await client.get("/cmuser/cm-e2e/intel/codemap") elapsed = time.monotonic() - t0 assert resp.status_code == 200 assert elapsed < 0.5, f"Route took {elapsed:.3f}s — exceeds 500ms threshold" def test_T29_module_table_has_repo_index(self) -> None: """ix_intel_codemap_modules_repo index must exist on the ORM model.""" table = MusehubIntelCodemapModule.__table__ index_names = {idx.name for idx in table.indexes} assert "ix_intel_codemap_modules_repo" in index_names, ( f"Index missing. Found: {index_names}" ) # ───────────────────────────────────────────────────────────────────────────── # Layer T7 — Security # ───────────────────────────────────────────────────────────────────────────── class TestSecurity: @pytest.mark.asyncio async def test_T30_xss_file_path_is_escaped( self, client: AsyncClient, db_session: AsyncSession ) -> None: """file_path containing HTML must be escaped in the rendered page.""" repo = await create_repo(db_session, owner="cmxss", slug="cm-xss") rid = str(repo.repo_id) xss_path = "src/.py" await _insert_module_row(db_session, rid, xss_path, fan_in=1) await _insert_meta_row(db_session, rid, total_modules=1, total_edges=0) await db_session.commit() resp = await client.get("/cmxss/cm-xss/intel/codemap") assert resp.status_code == 200 assert "" not in resp.text, "Unescaped XSS payload in response" @pytest.mark.asyncio async def test_T31_sql_injection_in_sort_param_is_safe( self, client: AsyncClient, cm_repo: MusehubRepo ) -> None: """SQL injection attempt in sort param must return 200, not 500.""" resp = await client.get("/cmuser/cm-e2e/intel/codemap?sort=symbols%3BDROP+TABLE+musehub_intel_codemap_modules") assert resp.status_code == 200 @pytest.mark.asyncio async def test_T32_invalid_top_param_does_not_500( self, client: AsyncClient, cm_repo: MusehubRepo ) -> None: """Invalid top param (non-integer string) must not return 500.""" resp = await client.get("/cmuser/cm-e2e/intel/codemap?top=INVALID") assert resp.status_code == 200