"""Code Map intel — full 7-tier test suite (issue #21).
Tests are written TDD-first: all tests must be RED before Phase 4–7
implementation begins, then GREEN after.
Tiers
-----
T01–T05 Layer T1 — DB model (module columns, meta columns, nullable, composite PK, cascade)
T06–T11 Layer T2 — Provider (no subprocess, fan_out/edges, fan_in, no cycles, cycle, empty)
T12–T19 Layer T3 — Route (200, empty state, 404, sort, top filter, stat chips, meta)
T20–T23 Layer T4 — E2E HTML (stat chips, fan-in bar, cycle panel, dashboard link)
T24–T26 Layer T5 — Data integrity (upsert idempotent, meta overwrite, cross-repo)
T27–T29 Layer T6 — Performance (provider speed, route speed, index check)
T30–T32 Layer T7 — Security (XSS escape, SQL injection, no 500 on bad params)
"""
from __future__ import annotations
import time
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch
import pytest
import pytest_asyncio
import sqlalchemy as sa
from httpx import AsyncClient
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from muse.core.types import long_id
from musehub.db.musehub_intel_models import MusehubIntelCodemapMeta, MusehubIntelCodemapModule
from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef
from musehub.types.json_types import JSONObject
from tests.factories import create_repo
_REF = long_id("b" * 64)
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
async def _insert_module_row(
    session: AsyncSession,
    repo_id: str,
    file_path: str,
    symbol_count: int = 0,
    fan_in: int = 0,
    fan_out: int = 0,
    language: str = "Python",
    ref: str = _REF,
) -> None:
    """Insert a codemap module row, or overwrite it if one already exists.

    Builds a PostgreSQL ``INSERT ... ON CONFLICT DO UPDATE`` for
    ``MusehubIntelCodemapModule`` keyed on ``(repo_id, file_path)`` and
    executes it on *session*.  Nothing is committed here; the caller owns
    the transaction.
    """
    # Non-key columns: written on insert and rewritten on conflict.
    payload = {
        "symbol_count": symbol_count,
        "fan_in": fan_in,
        "fan_out": fan_out,
        "language": language,
        "ref": ref,
    }
    upsert = (
        pg_insert(MusehubIntelCodemapModule)
        .values(repo_id=repo_id, file_path=file_path, **payload)
        .on_conflict_do_update(
            index_elements=["repo_id", "file_path"],
            set_=payload,
        )
    )
    await session.execute(upsert)
async def _insert_meta_row(
    session: AsyncSession,
    repo_id: str,
    total_modules: int = 0,
    total_edges: int = 0,
    cycle_count: int = 0,
    cycles_json: list[list[str]] | None = None,
    ref: str = _REF,
) -> None:
    """Insert the codemap meta row for a repo, or overwrite the existing one.

    Builds a PostgreSQL ``INSERT ... ON CONFLICT DO UPDATE`` for
    ``MusehubIntelCodemapMeta`` keyed on ``repo_id`` and executes it on
    *session*.  Nothing is committed here; the caller owns the transaction.
    """
    # Non-key columns: written on insert and rewritten on conflict.
    payload = {
        "total_modules": total_modules,
        "total_edges": total_edges,
        "cycle_count": cycle_count,
        "cycles_json": cycles_json,
        "ref": ref,
    }
    upsert = (
        pg_insert(MusehubIntelCodemapMeta)
        .values(repo_id=repo_id, **payload)
        .on_conflict_do_update(
            index_elements=["repo_id"],
            set_=payload,
        )
    )
    await session.execute(upsert)
async def _seed_snapshot(
    session: AsyncSession,
    repo_id: str,
    manifest: dict[str, str],
) -> str:
    """Seed one snapshot plus one commit on branch "dev"; return the snapshot id.

    Four rows are written in order — snapshot, snapshot ref, commit, commit
    ref — each with ``ON CONFLICT DO NOTHING``.  The session is committed
    before returning.

    NOTE(review): the snapshot and commit ids are fixed constants.  If rows
    survive between calls, a later call with a different *manifest* would
    presumably keep the earlier snapshot — confirm the test DB is reset per
    test.
    """
    import msgpack

    snap_id = long_id("c" * 64)
    commit_id = long_id("d" * 64)
    seeded_at = datetime(2026, 1, 1, tzinfo=timezone.utc)

    inserts = (
        pg_insert(MusehubSnapshot).values(
            snapshot_id=snap_id,
            directories=[],
            manifest_blob=msgpack.packb(manifest),
            entry_count=len(manifest),
            created_at=seeded_at,
        ),
        pg_insert(MusehubSnapshotRef).values(repo_id=repo_id, snapshot_id=snap_id),
        pg_insert(MusehubCommit).values(
            commit_id=commit_id,
            branch="dev",
            parent_ids=[],
            message="test",
            author="cmuser",
            timestamp=seeded_at,
            snapshot_id=snap_id,
        ),
        pg_insert(MusehubCommitRef).values(repo_id=repo_id, commit_id=commit_id),
    )
    for stmt in inserts:
        await session.execute(stmt.on_conflict_do_nothing())

    await session.commit()
    return snap_id
def _fake_import_tree(
    file_path: str,
    imports: list[str],
    n_symbols: int = 3,
) -> JSONObject:
    """Build a fake SymbolTree for *file_path*.

    The result maps ``"<file_path>::fn_<i>"`` to a function record for each
    ``i`` in ``range(n_symbols)``, followed by ``"<file_path>::_import_<j>"``
    mapped to an import record for the j-th dotted name in *imports*.  Line
    numbers are synthetic: functions start at line 1 and imports follow them.
    """
    function_records: JSONObject = {
        f"{file_path}::fn_{idx}": {
            "kind": "function",
            "name": f"fn_{idx}",
            "qualified_name": f"fn_{idx}",
            "content_id": long_id("a" * 64),
            "body_hash": long_id("b" * 64),
            "signature_id": long_id("c" * 64),
            "metadata_id": "",
            "canonical_key": f"{file_path}##function#fn_{idx}#1",
            "lineno": idx + 1,
            "end_lineno": idx + 2,
        }
        for idx in range(n_symbols)
    }
    import_records: JSONObject = {
        f"{file_path}::_import_{pos}": {
            "kind": "import",
            # Last dotted component, e.g. "path" for "os.path".
            "name": dotted.split(".")[-1],
            "qualified_name": f"import::{dotted}::_sym",
            "content_id": long_id("e" * 64),
            "body_hash": "",
            "signature_id": "",
            "metadata_id": "",
            "canonical_key": f"{file_path}##import#{dotted}#0",
            "lineno": n_symbols + pos + 1,
            "end_lineno": n_symbols + pos + 2,
        }
        for pos, dotted in enumerate(imports)
    }
    # Functions first, then imports — same insertion order as building in two loops.
    return {**function_records, **import_records}
@pytest_asyncio.fixture
async def cm_repo(db_session: AsyncSession) -> MusehubRepo:
    """Create repo cmuser/cm-e2e with five module rows and one meta row, committed."""
    repo = await create_repo(db_session, owner="cmuser", slug="cm-e2e")
    rid = str(repo.repo_id)

    # (file_path, fan_in, fan_out); symbol_count is 10 + position in this tuple.
    seed_modules = (
        ("musehub/api/routes/ui.py", 8, 3),
        ("musehub/services/svc.py", 5, 4),
        ("musehub/db/models.py", 4, 1),
        ("musehub/core/types.py", 3, 0),
        ("musehub/utils/helpers.py", 1, 2),
    )
    for n_syms, (path, inbound, outbound) in enumerate(seed_modules, start=10):
        await _insert_module_row(
            db_session,
            rid,
            path,
            symbol_count=n_syms,
            fan_in=inbound,
            fan_out=outbound,
            language="Python",
        )

    await _insert_meta_row(
        db_session,
        rid,
        total_modules=5,
        total_edges=17,
        cycle_count=0,
    )
    await db_session.commit()
    return repo
# ─────────────────────────────────────────────────────────────────────────────
# Layer T1 — DB model
# ─────────────────────────────────────────────────────────────────────────────
class TestDBModel:
    """Layer T1 — ORM-level checks on the two codemap models."""

    def test_T01_module_model_has_all_required_columns(self) -> None:
        """Every expected column is mapped on MusehubIntelCodemapModule."""
        expected = ("repo_id", "file_path", "symbol_count", "fan_in", "fan_out", "language", "ref")
        mapper = sa.inspect(MusehubIntelCodemapModule).mapper
        cols = {attr.key for attr in mapper.column_attrs}
        for required in expected:
            assert required in cols, f"Column '{required}' missing from MusehubIntelCodemapModule"

    def test_T02_meta_model_has_all_required_columns(self) -> None:
        """Every expected column is mapped on MusehubIntelCodemapMeta."""
        expected = ("repo_id", "total_modules", "total_edges", "cycle_count", "cycles_json", "ref")
        mapper = sa.inspect(MusehubIntelCodemapMeta).mapper
        cols = {attr.key for attr in mapper.column_attrs}
        for required in expected:
            assert required in cols, f"Column '{required}' missing from MusehubIntelCodemapMeta"

    def test_T03_cycles_json_is_nullable(self) -> None:
        """The cycles_json column accepts NULL (a repo may have no cycles)."""
        cycles_col = MusehubIntelCodemapMeta.__table__.c["cycles_json"]
        assert cycles_col.nullable, "cycles_json must be nullable"

    def test_T04_composite_pk_modules(self) -> None:
        """The module table's primary key is exactly (repo_id, file_path)."""
        pk = MusehubIntelCodemapModule.__table__.primary_key
        pk_cols = {column.name for column in pk.columns}
        assert pk_cols == {"repo_id", "file_path"}, f"Unexpected PK: {pk_cols}"

    @pytest.mark.asyncio
    async def test_T05_cascade_delete_removes_module_rows(
        self, db_session: AsyncSession
    ) -> None:
        """After the repo is deleted, no module rows remain for its repo_id."""
        repo = await create_repo(db_session, owner="cmuser2", slug="cm-cascade")
        rid = str(repo.repo_id)
        await _insert_module_row(db_session, rid, "src/a.py", fan_in=1)
        await db_session.commit()

        await db_session.delete(repo)
        await db_session.commit()

        leftovers = sa.select(MusehubIntelCodemapModule).where(
            MusehubIntelCodemapModule.repo_id == rid
        )
        result = await db_session.execute(leftovers)
        assert result.first() is None, "Cascade delete failed — module rows remain"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T2 — Provider
# ─────────────────────────────────────────────────────────────────────────────
class TestProvider:
    """Layer T2 — CodemapProvider.compute with backend and parser patched out."""

    # Module whose get_backend / parse_symbols / language_of names are patched.
    _PROVIDERS = "musehub.services.musehub_intel_providers"

    @pytest.mark.asyncio
    async def test_T06_provider_returns_no_subprocess(
        self, db_session: AsyncSession
    ) -> None:
        """The source text of CodemapProvider.compute must not mention subprocess."""
        import inspect
        from musehub.services.musehub_intel_providers import CodemapProvider

        src = inspect.getsource(CodemapProvider.compute)
        assert "subprocess" not in src, "CodemapProvider.compute spawns a subprocess"
        assert "create_subprocess" not in src, "CodemapProvider.compute uses create_subprocess"

    @pytest.mark.asyncio
    async def test_T07_provider_computes_fan_out(
        self, db_session: AsyncSession
    ) -> None:
        """Only imports that resolve to a manifest file are expected to count as edges."""
        from musehub.services.musehub_intel_providers import CodemapProvider

        repo = await create_repo(db_session, owner="cmuser3", slug="cm-fanout")
        rid = str(repo.repo_id)
        # a.py imports b.py (in the manifest); b.py imports os.path (not in it).
        manifest = {
            "src/a.py": long_id("a" * 64),
            "src/b.py": long_id("b" * 64),
        }
        await _seed_snapshot(db_session, rid, manifest)

        trees = [
            _fake_import_tree("src/a.py", ["src.b"], n_symbols=2),
            _fake_import_tree("src/b.py", ["os.path"], n_symbols=1),
        ]
        backend = AsyncMock()
        backend.get = AsyncMock(side_effect=[b"src_a", b"src_b"])
        with (
            patch(f"{self._PROVIDERS}.get_backend", return_value=backend),
            patch(f"{self._PROVIDERS}.parse_symbols", side_effect=trees),
            patch(f"{self._PROVIDERS}.language_of", return_value="Python"),
        ):
            provider = CodemapProvider()
            results = await provider.compute(
                db_session, rid, "dev", {"owner": "cmuser3", "slug": "cm-fanout"}
            )

        assert results, "Provider returned empty results"
        assert results[0][0] == "intel.code.codemap"
        data = results[0][1]
        # src.b resolves to src/b.py → 1 edge; os.path resolves to nothing → 0.
        assert data["edges"] == 1, f"Expected 1 edge, got {data['edges']}"

    @pytest.mark.asyncio
    async def test_T08_provider_computes_fan_in(
        self, db_session: AsyncSession
    ) -> None:
        """The stored fan_in of src/b.py equals the number of files importing it (2)."""
        from musehub.services.musehub_intel_providers import CodemapProvider

        repo = await create_repo(db_session, owner="cmuser4", slug="cm-fanin")
        rid = str(repo.repo_id)
        manifest = {
            "src/a.py": long_id("a" * 64),
            "src/b.py": long_id("b" * 64),
            "src/c.py": long_id("c" * 64),
        }
        await _seed_snapshot(db_session, rid, manifest)

        # Both a.py and c.py import b.py; b.py imports nothing.
        trees = [
            _fake_import_tree("src/a.py", ["src.b"], n_symbols=1),
            _fake_import_tree("src/b.py", [], n_symbols=1),
            _fake_import_tree("src/c.py", ["src.b"], n_symbols=1),
        ]
        backend = AsyncMock()
        backend.get = AsyncMock(side_effect=[b"src_a", b"src_b", b"src_c"])
        with (
            patch(f"{self._PROVIDERS}.get_backend", return_value=backend),
            patch(f"{self._PROVIDERS}.parse_symbols", side_effect=trees),
            patch(f"{self._PROVIDERS}.language_of", return_value="Python"),
        ):
            provider = CodemapProvider()
            await provider.compute(
                db_session, rid, "dev", {"owner": "cmuser4", "slug": "cm-fanin"}
            )
        await db_session.commit()

        b_row_query = sa.select(MusehubIntelCodemapModule).where(
            MusehubIntelCodemapModule.repo_id == rid,
            MusehubIntelCodemapModule.file_path == "src/b.py",
        )
        result = await db_session.execute(b_row_query)
        row = result.scalar_one_or_none()
        assert row is not None, "src/b.py row not found"
        assert row.fan_in == 2, f"Expected fan_in=2, got {row.fan_in}"

    @pytest.mark.asyncio
    async def test_T09_provider_detects_no_cycles_for_dag(
        self, db_session: AsyncSession
    ) -> None:
        """A one-way import (a → b) must be reported with cycles == 0."""
        from musehub.services.musehub_intel_providers import CodemapProvider

        repo = await create_repo(db_session, owner="cmuser5", slug="cm-nocycle")
        rid = str(repo.repo_id)
        manifest = {"src/a.py": long_id("a" * 64), "src/b.py": long_id("b" * 64)}
        await _seed_snapshot(db_session, rid, manifest)

        trees = [
            _fake_import_tree("src/a.py", ["src.b"]),
            _fake_import_tree("src/b.py", []),
        ]
        backend = AsyncMock()
        backend.get = AsyncMock(side_effect=[b"a", b"b"])
        with (
            patch(f"{self._PROVIDERS}.get_backend", return_value=backend),
            patch(f"{self._PROVIDERS}.parse_symbols", side_effect=trees),
            patch(f"{self._PROVIDERS}.language_of", return_value="Python"),
        ):
            results = await CodemapProvider().compute(
                db_session, rid, "dev", {"owner": "cmuser5", "slug": "cm-nocycle"}
            )
        assert results[0][1]["cycles"] == 0

    @pytest.mark.asyncio
    async def test_T10_provider_detects_mutual_import_cycle(
        self, db_session: AsyncSession
    ) -> None:
        """Two files importing each other must be reported as exactly one cycle."""
        from musehub.services.musehub_intel_providers import CodemapProvider

        repo = await create_repo(db_session, owner="cmuser6", slug="cm-cycle")
        rid = str(repo.repo_id)
        manifest = {"src/a.py": long_id("a" * 64), "src/b.py": long_id("b" * 64)}
        await _seed_snapshot(db_session, rid, manifest)

        # a imports b and b imports a, closing the loop.
        trees = [
            _fake_import_tree("src/a.py", ["src.b"]),
            _fake_import_tree("src/b.py", ["src.a"]),
        ]
        backend = AsyncMock()
        backend.get = AsyncMock(side_effect=[b"a", b"b"])
        with (
            patch(f"{self._PROVIDERS}.get_backend", return_value=backend),
            patch(f"{self._PROVIDERS}.parse_symbols", side_effect=trees),
            patch(f"{self._PROVIDERS}.language_of", return_value="Python"),
        ):
            results = await CodemapProvider().compute(
                db_session, rid, "dev", {"owner": "cmuser6", "slug": "cm-cycle"}
            )
        assert results[0][1]["cycles"] == 1, f"Expected 1 cycle, got {results[0][1]['cycles']}"

    @pytest.mark.asyncio
    async def test_T11_provider_returns_empty_for_missing_manifest(
        self, db_session: AsyncSession
    ) -> None:
        """With no snapshot or commit seeded for the repo, compute must return []."""
        from musehub.services.musehub_intel_providers import CodemapProvider

        repo = await create_repo(db_session, owner="cmuser7", slug="cm-empty")
        rid = str(repo.repo_id)
        await db_session.commit()

        results = await CodemapProvider().compute(
            db_session, rid, "dev", {"owner": "cmuser7", "slug": "cm-empty"}
        )
        assert results == [], f"Expected [], got {results}"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T3 — Route
# ─────────────────────────────────────────────────────────────────────────────
class TestRoute:
    """Layer T3 — HTTP-level behaviour of GET /<owner>/<slug>/intel/codemap."""

    @pytest.mark.asyncio
    async def test_T12_codemap_page_returns_200(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """GET /cmuser/cm-e2e/intel/codemap must return HTTP 200."""
        resp = await client.get("/cmuser/cm-e2e/intel/codemap")
        assert resp.status_code == 200, f"Expected 200, got {resp.status_code}"

    @pytest.mark.asyncio
    async def test_T13_codemap_page_empty_state(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Route must render empty state when no codemap rows exist."""
        # Only the repo's existence matters here; its handle is not needed.
        await create_repo(db_session, owner="cmempty", slug="cm-nodata")
        await db_session.commit()
        resp = await client.get("/cmempty/cm-nodata/intel/codemap")
        assert resp.status_code == 200
        assert "Push a commit" in resp.text

    @pytest.mark.asyncio
    async def test_T14_codemap_page_404_for_missing_repo(
        self, client: AsyncClient
    ) -> None:
        """Route must return 404 for a repo that does not exist."""
        resp = await client.get("/ghost/no-such-repo/intel/codemap")
        assert resp.status_code == 404

    @pytest.mark.asyncio
    async def test_T15_sort_by_fan_in(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """sort=fan-in must return modules ordered by fan_in descending."""
        resp = await client.get("/cmuser/cm-e2e/intel/codemap?sort=fan-in")
        assert resp.status_code == 200
        # musehub/api/routes/ui.py has fan_in=8, must appear before others
        text = resp.text
        pos_api = text.find("ui.py")
        pos_types = text.find("types.py")
        # str.find returns -1 when absent; without these guards a missing
        # ui.py (-1) would satisfy the ordering check below vacuously.
        assert pos_api != -1, "fan-in sort: ui.py not found in response"
        assert pos_types != -1, "fan-in sort: types.py not found in response"
        assert pos_api < pos_types, "fan-in sort: ui.py (fi=8) should appear before types.py (fi=3)"

    @pytest.mark.asyncio
    async def test_T16_sort_by_fan_out(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """sort=fan-out must return modules ordered by fan_out descending."""
        resp = await client.get("/cmuser/cm-e2e/intel/codemap?sort=fan-out")
        assert resp.status_code == 200
        text = resp.text
        # svc.py has fan_out=4, must appear before models.py (fo=1)
        pos_svc = text.find("svc.py")
        pos_models = text.find("models.py")
        # Guard against -1 (not found) so the ordering check cannot pass vacuously.
        assert pos_svc != -1, "fan-out sort: svc.py not found in response"
        assert pos_models != -1, "fan-out sort: models.py not found in response"
        assert pos_svc < pos_models, "fan-out sort: svc.py (fo=4) should appear before models.py (fo=1)"

    @pytest.mark.asyncio
    async def test_T17_unknown_sort_coerces_to_symbols(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """An unknown sort param must be coerced to 'symbols' (no 400/500)."""
        resp = await client.get("/cmuser/cm-e2e/intel/codemap?sort=invalid_sort")
        assert resp.status_code == 200

    @pytest.mark.asyncio
    async def test_T18_top_filter_limits_rows(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """With top=20 and 25 modules, the page must still show the total (25).

        NOTE(review): only the presence of "25" is asserted; the number of
        rendered rows is not checked here.
        """
        repo = await create_repo(db_session, owner="cmtop", slug="cm-top")
        rid = str(repo.repo_id)
        for i in range(25):
            await _insert_module_row(db_session, rid, f"src/mod_{i}.py", fan_in=i)
        await _insert_meta_row(db_session, rid, total_modules=25, total_edges=0)
        await db_session.commit()
        resp = await client.get("/cmtop/cm-top/intel/codemap?top=20")
        assert resp.status_code == 200
        # stat chip must show 25, not 20
        assert "25" in resp.text, "Stat chip must show total module count (25), not page length"

    @pytest.mark.asyncio
    async def test_T19_stat_chips_use_meta_row(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """Stat chips must display meta row values (5 modules, 17 edges, 0 cycles).

        Only the edge count (17) is asserted.
        """
        resp = await client.get("/cmuser/cm-e2e/intel/codemap")
        assert resp.status_code == 200
        assert "17" in resp.text, "Edge count from meta row (17) not found in response"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T4 — E2E HTML
# ─────────────────────────────────────────────────────────────────────────────
class TestHTML:
    """Layer T4 — markers expected in the rendered codemap page for the seeded repo."""

    # Every test in this class fetches the same seeded page.
    _PAGE = "/cmuser/cm-e2e/intel/codemap"

    @pytest.mark.asyncio
    async def test_T20_stat_chips_present(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """The page text includes each stat chip label: Modules, Edges, Cycles."""
        resp = await client.get(self._PAGE)
        assert resp.status_code == 200
        html = resp.text
        for label in ("Modules", "Edges", "Cycles"):
            assert label in html, f"Stat chip label '{label}' missing"

    @pytest.mark.asyncio
    async def test_T21_fan_in_bar_present(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """The page text includes the cm-fan-bar marker."""
        resp = await client.get(self._PAGE)
        assert resp.status_code == 200
        assert "cm-fan-bar" in resp.text, "cm-fan-bar class missing from HTML"

    @pytest.mark.asyncio
    async def test_T22_cycle_ok_shown_when_no_cycles(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """With cycle_count == 0 seeded, the page text includes cm-cycle-ok."""
        resp = await client.get(self._PAGE)
        assert resp.status_code == 200
        assert "cm-cycle-ok" in resp.text, "cm-cycle-ok class missing (cycle_count=0)"

    @pytest.mark.asyncio
    async def test_T23_dashboard_back_link_present(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """The page text includes "/intel", taken as the link back to the dashboard."""
        resp = await client.get(self._PAGE)
        assert resp.status_code == 200
        assert "/intel" in resp.text, "Back link to Intel Hub missing"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T5 — Data integrity
# ─────────────────────────────────────────────────────────────────────────────
class TestDataIntegrity:
    """Layer T5 — upsert and isolation guarantees of the codemap tables."""

    @pytest.mark.asyncio
    async def test_T24_upsert_is_idempotent(
        self, db_session: AsyncSession
    ) -> None:
        """Writing the same (repo_id, file_path) twice leaves exactly one row."""
        repo = await create_repo(db_session, owner="cmdup", slug="cm-dup")
        rid = str(repo.repo_id)
        await _insert_module_row(db_session, rid, "src/a.py", fan_in=3)
        await _insert_module_row(db_session, rid, "src/a.py", fan_in=3)
        await db_session.commit()

        row_count = (
            sa.select(sa.func.count())
            .select_from(MusehubIntelCodemapModule)
            .where(MusehubIntelCodemapModule.repo_id == rid)
        )
        count = (await db_session.execute(row_count)).scalar_one()
        assert count == 1, f"Upsert created duplicate: expected 1 row, got {count}"

    @pytest.mark.asyncio
    async def test_T25_meta_upsert_overwrites(
        self, db_session: AsyncSession
    ) -> None:
        """A second meta upsert replaces total_edges and still leaves a single row."""
        repo = await create_repo(db_session, owner="cmmeta", slug="cm-meta")
        rid = str(repo.repo_id)
        await _insert_meta_row(db_session, rid, total_modules=5, total_edges=10)
        await _insert_meta_row(db_session, rid, total_modules=6, total_edges=20)
        await db_session.commit()

        meta_query = sa.select(MusehubIntelCodemapMeta).where(
            MusehubIntelCodemapMeta.repo_id == rid
        )
        rows = (await db_session.execute(meta_query)).scalars().all()
        assert len(rows) == 1, f"Expected 1 meta row, got {len(rows)}"
        assert rows[0].total_edges == 20, f"Expected total_edges=20, got {rows[0].total_edges}"

    @pytest.mark.asyncio
    async def test_T26_cross_repo_isolation(
        self, db_session: AsyncSession
    ) -> None:
        """A row written for repo A is not returned when filtering by repo B's id."""
        repo_a = await create_repo(db_session, owner="cmisolate", slug="repo-a")
        repo_b = await create_repo(db_session, owner="cmisolate", slug="repo-b")
        rid_a = str(repo_a.repo_id)
        rid_b = str(repo_b.repo_id)
        await _insert_module_row(db_session, rid_a, "src/a.py", fan_in=5)
        await db_session.commit()

        repo_b_rows = sa.select(MusehubIntelCodemapModule).where(
            MusehubIntelCodemapModule.repo_id == rid_b
        )
        result = await db_session.execute(repo_b_rows)
        assert result.first() is None, "Cross-repo contamination: repo B sees repo A rows"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T6 — Performance
# ─────────────────────────────────────────────────────────────────────────────
class TestPerformance:
    """Layer T6 — coarse wall-clock budgets and the repo index check."""

    @pytest.mark.asyncio
    async def test_T27_provider_completes_under_threshold(
        self, db_session: AsyncSession
    ) -> None:
        """compute() over a 50-file manifest must finish in under 10 seconds."""
        from musehub.services.musehub_intel_providers import CodemapProvider

        repo = await create_repo(db_session, owner="cmperfp", slug="cm-perf-p")
        rid = str(repo.repo_id)
        n = 50
        manifest = {f"src/mod_{i}.py": long_id(f"{i:064x}") for i in range(n)}
        await _seed_snapshot(db_session, rid, manifest)

        # One import-free tree with five functions per manifest path.
        trees = [_fake_import_tree(path, [], n_symbols=5) for path in manifest]
        backend = AsyncMock()
        backend.get = AsyncMock(side_effect=[b"src"] * n)
        providers = "musehub.services.musehub_intel_providers"
        with (
            patch(f"{providers}.get_backend", return_value=backend),
            patch(f"{providers}.parse_symbols", side_effect=trees),
            patch(f"{providers}.language_of", return_value="Python"),
        ):
            t0 = time.monotonic()
            await CodemapProvider().compute(
                db_session, rid, "dev", {"owner": "cmperfp", "slug": "cm-perf-p"}
            )
            elapsed = time.monotonic() - t0
        assert elapsed < 10.0, f"Provider took {elapsed:.2f}s — exceeds 10s threshold"

    @pytest.mark.asyncio
    async def test_T28_route_responds_under_500ms(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """A single GET of the seeded codemap page must take under 0.5 seconds."""
        started = time.monotonic()
        resp = await client.get("/cmuser/cm-e2e/intel/codemap")
        elapsed = time.monotonic() - started
        assert resp.status_code == 200
        assert elapsed < 0.5, f"Route took {elapsed:.3f}s — exceeds 500ms threshold"

    def test_T29_module_table_has_repo_index(self) -> None:
        """The module table declares an index named ix_intel_codemap_modules_repo."""
        declared = MusehubIntelCodemapModule.__table__.indexes
        index_names = {idx.name for idx in declared}
        assert "ix_intel_codemap_modules_repo" in index_names, (
            f"Index missing. Found: {index_names}"
        )
# ─────────────────────────────────────────────────────────────────────────────
# Layer T7 — Security
# ─────────────────────────────────────────────────────────────────────────────
class TestSecurity:
    """Layer T7 — hostile input must not produce raw HTML or unexpected statuses."""

    @pytest.mark.asyncio
    async def test_T30_xss_file_path_is_escaped(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """file_path containing HTML must be escaped in the rendered page."""
        repo = await create_repo(db_session, owner="cmxss", slug="cm-xss")
        rid = str(repo.repo_id)
        # The payload must be a non-empty marker: `"" not in text` is always
        # False, so an empty payload would make this test fail unconditionally.
        payload = "<script>alert(1)</script>"
        xss_path = f"src/{payload}.py"
        await _insert_module_row(db_session, rid, xss_path, fan_in=1)
        await _insert_meta_row(db_session, rid, total_modules=1, total_edges=0)
        await db_session.commit()
        resp = await client.get("/cmxss/cm-xss/intel/codemap")
        assert resp.status_code == 200
        # The raw tag must not survive into the response body verbatim.
        assert payload not in resp.text, "Unescaped XSS payload in response"

    @pytest.mark.asyncio
    async def test_T31_sql_injection_in_sort_param_is_safe(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """SQL injection attempt in sort param must return 200, not 500."""
        resp = await client.get("/cmuser/cm-e2e/intel/codemap?sort=symbols%3BDROP+TABLE+musehub_intel_codemap_modules")
        assert resp.status_code == 200

    @pytest.mark.asyncio
    async def test_T32_invalid_top_param_does_not_500(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """A non-integer top param must still yield 200 (never a 500)."""
        resp = await client.get("/cmuser/cm-e2e/intel/codemap?top=INVALID")
        assert resp.status_code == 200