test_intel_codemap.py
python
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠ breaking
22 days ago
| 1 | """Code Map intel — full 7-tier test suite (issue #21). |
| 2 | |
| 3 | Tests are written TDD-first: all tests must be RED before Phase 4–7 |
| 4 | implementation begins, then GREEN after. |
| 5 | |
| 6 | Tiers |
| 7 | ----- |
| 8 | T01–T05 Layer T1 — DB model (module columns, meta columns, nullable, composite PK, cascade) |
| 9 | T06–T11 Layer T2 — Provider (no subprocess, fan_in, fan_out, cycles, edges, empty) |
| 10 | T12–T19 Layer T3 — Route (200, empty state, 404, sort, top filter, stat chips, meta) |
| 11 | T20–T23 Layer T4 — E2E HTML (stat chips, fan-in bar, cycle panel, dashboard link) |
| 12 | T24–T26 Layer T5 — Data integrity (upsert idempotent, meta overwrite, cross-repo) |
| 13 | T27–T29 Layer T6 — Performance (provider speed, route speed, index check) |
| 14 | T30–T32 Layer T7 — Security (XSS escape, SQL injection, no 500 on bad params) |
| 15 | """ |
| 16 | from __future__ import annotations |
| 17 | |
| 18 | import time |
| 19 | from datetime import datetime, timezone |
| 20 | from unittest.mock import AsyncMock, patch |
| 21 | |
| 22 | import pytest |
| 23 | import pytest_asyncio |
| 24 | import sqlalchemy as sa |
| 25 | from httpx import AsyncClient |
| 26 | from sqlalchemy.dialects.postgresql import insert as pg_insert |
| 27 | from sqlalchemy.ext.asyncio import AsyncSession |
| 28 | |
| 29 | from muse.core.types import long_id |
| 30 | from musehub.db.musehub_intel_models import MusehubIntelCodemapMeta, MusehubIntelCodemapModule |
| 31 | from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef |
| 32 | from musehub.types.json_types import JSONObject |
| 33 | from tests.factories import create_repo |
| 34 | |
| 35 | _REF = long_id("b" * 64) |
| 36 | |
| 37 | |
| 38 | # ───────────────────────────────────────────────────────────────────────────── |
| 39 | # Helpers |
| 40 | # ───────────────────────────────────────────────────────────────────────────── |
| 41 | |
async def _insert_module_row(
    session: AsyncSession,
    repo_id: str,
    file_path: str,
    symbol_count: int = 0,
    fan_in: int = 0,
    fan_out: int = 0,
    language: str = "Python",
    ref: str = _REF,
) -> None:
    """Insert a codemap module row, or overwrite it when the key already exists.

    The conflict target is ``(repo_id, file_path)``.  On conflict every
    non-key column is replaced with the value passed to this call.  The
    statement is executed but not committed here.
    """
    # Non-key columns: written on insert and rewritten verbatim on conflict.
    payload = {
        "symbol_count": symbol_count,
        "fan_in": fan_in,
        "fan_out": fan_out,
        "language": language,
        "ref": ref,
    }
    upsert = (
        pg_insert(MusehubIntelCodemapModule)
        .values(repo_id=repo_id, file_path=file_path, **payload)
        .on_conflict_do_update(
            index_elements=["repo_id", "file_path"],
            set_=payload,
        )
    )
    await session.execute(upsert)
| 75 | |
| 76 | |
async def _insert_meta_row(
    session: AsyncSession,
    repo_id: str,
    total_modules: int = 0,
    total_edges: int = 0,
    cycle_count: int = 0,
    cycles_json: list[list[str]] | None = None,
    ref: str = _REF,
) -> None:
    """Insert the codemap meta row for a repo, or overwrite the existing one.

    The conflict target is ``repo_id``.  On conflict all other columns are
    replaced with the values passed to this call.  The statement is
    executed but not committed here.
    """
    # Non-key columns: written on insert and rewritten verbatim on conflict.
    payload = {
        "total_modules": total_modules,
        "total_edges": total_edges,
        "cycle_count": cycle_count,
        "cycles_json": cycles_json,
        "ref": ref,
    }
    upsert = (
        pg_insert(MusehubIntelCodemapMeta)
        .values(repo_id=repo_id, **payload)
        .on_conflict_do_update(index_elements=["repo_id"], set_=payload)
    )
    await session.execute(upsert)
| 108 | |
| 109 | |
async def _seed_snapshot(
    session: AsyncSession,
    repo_id: str,
    manifest: dict[str, str],
) -> str:
    """Seed one snapshot and one commit on branch ``dev`` and link both to the repo.

    Four rows are inserted (snapshot, snapshot ref, commit, commit ref), each
    with ``ON CONFLICT DO NOTHING``, and the session is committed.

    Returns:
        The snapshot id that was written.
    """
    import msgpack

    # Snapshot and commit ids are fixed constants, so every call targets the
    # same snapshot/commit rows.
    # NOTE(review): with DO NOTHING, a later call presumably leaves an existing
    # snapshot row (and its manifest) untouched — confirm the test DB is reset
    # between tests.
    snapshot_key = long_id("c" * 64)
    commit_key = long_id("d" * 64)
    stamp = datetime(2026, 1, 1, tzinfo=timezone.utc)

    inserts = [
        pg_insert(MusehubSnapshot).values(
            snapshot_id=snapshot_key,
            directories=[],
            manifest_blob=msgpack.packb(manifest),
            entry_count=len(manifest),
            created_at=stamp,
        ),
        pg_insert(MusehubSnapshotRef).values(
            repo_id=repo_id,
            snapshot_id=snapshot_key,
        ),
        pg_insert(MusehubCommit).values(
            commit_id=commit_key,
            branch="dev",
            parent_ids=[],
            message="test",
            author="cmuser",
            timestamp=stamp,
            snapshot_id=snapshot_key,
        ),
        pg_insert(MusehubCommitRef).values(
            repo_id=repo_id,
            commit_id=commit_key,
        ),
    ]
    # Order matters: the snapshot goes in before the rows that reference it.
    for statement in inserts:
        await session.execute(statement.on_conflict_do_nothing())

    await session.commit()
    return snapshot_key
| 157 | |
| 158 | |
def _fake_import_tree(
    file_path: str,
    imports: list[str],
    n_symbols: int = 3,
) -> JSONObject:
    """Build a fake symbol tree for *file_path*.

    The tree holds ``n_symbols`` function records (``fn_0`` … ``fn_{n-1}``)
    followed by one import record per entry of *imports*.  Keys have the form
    ``"<file_path>::<symbol>"``; line numbers run consecutively, functions
    first and imports after them.
    """
    # Function records, one per index.
    tree: JSONObject = {
        f"{file_path}::fn_{idx}": {
            "kind": "function",
            "name": f"fn_{idx}",
            "qualified_name": f"fn_{idx}",
            "content_id": long_id("a" * 64),
            "body_hash": long_id("b" * 64),
            "signature_id": long_id("c" * 64),
            "metadata_id": "",
            "canonical_key": f"{file_path}##function#fn_{idx}#1",
            "lineno": idx + 1,
            "end_lineno": idx + 2,
        }
        for idx in range(n_symbols)
    }
    # Import records are appended after the functions, so their line numbers
    # start right after the last function.
    tree.update(
        (
            f"{file_path}::_import_{pos}",
            {
                "kind": "import",
                "name": dotted.split(".")[-1],
                "qualified_name": f"import::{dotted}::_sym",
                "content_id": long_id("e" * 64),
                "body_hash": "",
                "signature_id": "",
                "metadata_id": "",
                "canonical_key": f"{file_path}##import#{dotted}#0",
                "lineno": n_symbols + pos + 1,
                "end_lineno": n_symbols + pos + 2,
            },
        )
        for pos, dotted in enumerate(imports)
    )
    return tree
| 194 | |
| 195 | |
@pytest_asyncio.fixture
async def cm_repo(db_session: AsyncSession) -> MusehubRepo:
    """Create repo ``cmuser/cm-e2e`` with five module rows and one meta row.

    Module rows get symbol counts 10–14 in listing order; the meta row
    records 5 modules, 17 edges and 0 cycles.  Everything is committed
    before the repo is returned.
    """
    repo = await create_repo(db_session, owner="cmuser", slug="cm-e2e")
    repo_key = str(repo.repo_id)

    # (file_path, fan_in, fan_out)
    seed_rows = (
        ("musehub/api/routes/ui.py", 8, 3),
        ("musehub/services/svc.py", 5, 4),
        ("musehub/db/models.py", 4, 1),
        ("musehub/core/types.py", 3, 0),
        ("musehub/utils/helpers.py", 1, 2),
    )
    for symbols, (path, inbound, outbound) in enumerate(seed_rows, start=10):
        await _insert_module_row(
            db_session,
            repo_key,
            path,
            symbol_count=symbols,
            fan_in=inbound,
            fan_out=outbound,
            language="Python",
        )

    await _insert_meta_row(
        db_session,
        repo_key,
        total_modules=5,
        total_edges=17,
        cycle_count=0,
    )
    await db_session.commit()
    return repo
| 222 | |
| 223 | |
| 224 | # ───────────────────────────────────────────────────────────────────────────── |
| 225 | # Layer T1 — DB model |
| 226 | # ───────────────────────────────────────────────────────────────────────────── |
| 227 | |
class TestDBModel:
    """Layer T1 — schema-level checks on the codemap ORM models."""

    def test_T01_module_model_has_all_required_columns(self) -> None:
        """Every expected column is mapped on MusehubIntelCodemapModule."""
        mapped = {
            attr.key
            for attr in sa.inspect(MusehubIntelCodemapModule).mapper.column_attrs
        }
        expected = ("repo_id", "file_path", "symbol_count", "fan_in", "fan_out", "language", "ref")
        for name in expected:
            assert name in mapped, f"Column '{name}' missing from MusehubIntelCodemapModule"

    def test_T02_meta_model_has_all_required_columns(self) -> None:
        """Every expected column is mapped on MusehubIntelCodemapMeta."""
        mapped = {
            attr.key
            for attr in sa.inspect(MusehubIntelCodemapMeta).mapper.column_attrs
        }
        expected = ("repo_id", "total_modules", "total_edges", "cycle_count", "cycles_json", "ref")
        for name in expected:
            assert name in mapped, f"Column '{name}' missing from MusehubIntelCodemapMeta"

    def test_T03_cycles_json_is_nullable(self) -> None:
        """The cycles_json column accepts NULL (a repo may have no cycles)."""
        cycles_col = MusehubIntelCodemapMeta.__table__.c["cycles_json"]
        assert cycles_col.nullable, "cycles_json must be nullable"

    def test_T04_composite_pk_modules(self) -> None:
        """The module table's primary key is exactly (repo_id, file_path)."""
        primary_key = MusehubIntelCodemapModule.__table__.primary_key
        pk_cols = {column.name for column in primary_key.columns}
        assert pk_cols == {"repo_id", "file_path"}, f"Unexpected PK: {pk_cols}"

    @pytest.mark.asyncio
    async def test_T05_cascade_delete_removes_module_rows(
        self, db_session: AsyncSession
    ) -> None:
        """Deleting a repo leaves no codemap module rows behind for it."""
        repo = await create_repo(db_session, owner="cmuser2", slug="cm-cascade")
        repo_key = str(repo.repo_id)
        await _insert_module_row(db_session, repo_key, "src/a.py", fan_in=1)
        await db_session.commit()

        await db_session.delete(repo)
        await db_session.commit()

        leftovers = sa.select(MusehubIntelCodemapModule).where(
            MusehubIntelCodemapModule.repo_id == repo_key
        )
        outcome = await db_session.execute(leftovers)
        assert outcome.first() is None, "Cascade delete failed — module rows remain"
| 276 | |
| 277 | |
| 278 | # ───────────────────────────────────────────────────────────────────────────── |
| 279 | # Layer T2 — Provider |
| 280 | # ───────────────────────────────────────────────────────────────────────────── |
| 281 | |
class TestProvider:
    """Layer T2 — CodemapProvider.compute against a seeded snapshot.

    Blob fetching, symbol parsing and language detection are patched out in
    the provider module, so each test controls exactly which symbol tree is
    returned for each file.
    """

    # Module whose collaborators are patched in every compute() test.
    _PROVIDERS = "musehub.services.musehub_intel_providers"

    @pytest.mark.asyncio
    async def test_T06_provider_returns_no_subprocess(
        self, db_session: AsyncSession
    ) -> None:
        """The source of CodemapProvider.compute must not mention subprocesses."""
        import inspect
        from musehub.services.musehub_intel_providers import CodemapProvider

        compute_source = inspect.getsource(CodemapProvider.compute)
        assert "subprocess" not in compute_source, "CodemapProvider.compute spawns a subprocess"
        assert "create_subprocess" not in compute_source, "CodemapProvider.compute uses create_subprocess"

    @pytest.mark.asyncio
    async def test_T07_provider_computes_fan_out(
        self, db_session: AsyncSession
    ) -> None:
        """Only imports that resolve to a manifest file count as edges."""
        from musehub.services.musehub_intel_providers import CodemapProvider

        repo = await create_repo(db_session, owner="cmuser3", slug="cm-fanout")
        repo_key = str(repo.repo_id)

        files = {
            "src/a.py": long_id("a" * 64),
            "src/b.py": long_id("b" * 64),
        }
        await _seed_snapshot(db_session, repo_key, files)

        # a.py imports b.py (in the manifest); b.py imports os.path (not in it).
        parsed = [
            _fake_import_tree("src/a.py", ["src.b"], n_symbols=2),
            _fake_import_tree("src/b.py", ["os.path"], n_symbols=1),
        ]

        backend = AsyncMock()
        backend.get = AsyncMock(side_effect=[b"src_a", b"src_b"])

        with (
            patch(f"{self._PROVIDERS}.get_backend", return_value=backend),
            patch(f"{self._PROVIDERS}.parse_symbols", side_effect=parsed),
            patch(f"{self._PROVIDERS}.language_of", return_value="Python"),
        ):
            provider = CodemapProvider()
            results = await provider.compute(
                db_session, repo_key, "dev", {"owner": "cmuser3", "slug": "cm-fanout"}
            )

        assert results, "Provider returned empty results"
        head = results[0]
        assert head[0] == "intel.code.codemap"
        data = head[1]
        # a.py → src.b is the only resolvable import, hence exactly one edge.
        assert data["edges"] == 1, f"Expected 1 edge, got {data['edges']}"

    @pytest.mark.asyncio
    async def test_T08_provider_computes_fan_in(
        self, db_session: AsyncSession
    ) -> None:
        """b.py, imported by both a.py and c.py, is stored with fan_in == 2."""
        from musehub.services.musehub_intel_providers import CodemapProvider

        repo = await create_repo(db_session, owner="cmuser4", slug="cm-fanin")
        repo_key = str(repo.repo_id)

        files = {
            "src/a.py": long_id("a" * 64),
            "src/b.py": long_id("b" * 64),
            "src/c.py": long_id("c" * 64),
        }
        await _seed_snapshot(db_session, repo_key, files)

        parsed = [
            _fake_import_tree("src/a.py", ["src.b"], n_symbols=1),
            _fake_import_tree("src/b.py", [], n_symbols=1),
            _fake_import_tree("src/c.py", ["src.b"], n_symbols=1),
        ]

        backend = AsyncMock()
        backend.get = AsyncMock(side_effect=[b"src_a", b"src_b", b"src_c"])

        with (
            patch(f"{self._PROVIDERS}.get_backend", return_value=backend),
            patch(f"{self._PROVIDERS}.parse_symbols", side_effect=parsed),
            patch(f"{self._PROVIDERS}.language_of", return_value="Python"),
        ):
            provider = CodemapProvider()
            await provider.compute(
                db_session, repo_key, "dev", {"owner": "cmuser4", "slug": "cm-fanin"}
            )
            await db_session.commit()

        lookup = sa.select(MusehubIntelCodemapModule).where(
            MusehubIntelCodemapModule.repo_id == repo_key,
            MusehubIntelCodemapModule.file_path == "src/b.py",
        )
        outcome = await db_session.execute(lookup)
        row = outcome.scalar_one_or_none()
        assert row is not None, "src/b.py row not found"
        assert row.fan_in == 2, f"Expected fan_in=2, got {row.fan_in}"

    @pytest.mark.asyncio
    async def test_T09_provider_detects_no_cycles_for_dag(
        self, db_session: AsyncSession
    ) -> None:
        """An acyclic import graph (a → b) reports zero cycles."""
        from musehub.services.musehub_intel_providers import CodemapProvider

        repo = await create_repo(db_session, owner="cmuser5", slug="cm-nocycle")
        repo_key = str(repo.repo_id)

        files = {"src/a.py": long_id("a" * 64), "src/b.py": long_id("b" * 64)}
        await _seed_snapshot(db_session, repo_key, files)

        parsed = [
            _fake_import_tree("src/a.py", ["src.b"]),
            _fake_import_tree("src/b.py", []),
        ]

        backend = AsyncMock()
        backend.get = AsyncMock(side_effect=[b"a", b"b"])

        with (
            patch(f"{self._PROVIDERS}.get_backend", return_value=backend),
            patch(f"{self._PROVIDERS}.parse_symbols", side_effect=parsed),
            patch(f"{self._PROVIDERS}.language_of", return_value="Python"),
        ):
            results = await CodemapProvider().compute(
                db_session, repo_key, "dev", {"owner": "cmuser5", "slug": "cm-nocycle"}
            )

        assert results[0][1]["cycles"] == 0

    @pytest.mark.asyncio
    async def test_T10_provider_detects_mutual_import_cycle(
        self, db_session: AsyncSession
    ) -> None:
        """Two files importing each other are reported as exactly one cycle."""
        from musehub.services.musehub_intel_providers import CodemapProvider

        repo = await create_repo(db_session, owner="cmuser6", slug="cm-cycle")
        repo_key = str(repo.repo_id)

        files = {"src/a.py": long_id("a" * 64), "src/b.py": long_id("b" * 64)}
        await _seed_snapshot(db_session, repo_key, files)

        # a imports b and b imports a, which closes the loop.
        parsed = [
            _fake_import_tree("src/a.py", ["src.b"]),
            _fake_import_tree("src/b.py", ["src.a"]),
        ]

        backend = AsyncMock()
        backend.get = AsyncMock(side_effect=[b"a", b"b"])

        with (
            patch(f"{self._PROVIDERS}.get_backend", return_value=backend),
            patch(f"{self._PROVIDERS}.parse_symbols", side_effect=parsed),
            patch(f"{self._PROVIDERS}.language_of", return_value="Python"),
        ):
            results = await CodemapProvider().compute(
                db_session, repo_key, "dev", {"owner": "cmuser6", "slug": "cm-cycle"}
            )

        assert results[0][1]["cycles"] == 1, f"Expected 1 cycle, got {results[0][1]['cycles']}"

    @pytest.mark.asyncio
    async def test_T11_provider_returns_empty_for_missing_manifest(
        self, db_session: AsyncSession
    ) -> None:
        """A repo with no seeded commit or snapshot yields an empty result list."""
        from musehub.services.musehub_intel_providers import CodemapProvider

        repo = await create_repo(db_session, owner="cmuser7", slug="cm-empty")
        repo_key = str(repo.repo_id)
        await db_session.commit()

        results = await CodemapProvider().compute(
            db_session, repo_key, "dev", {"owner": "cmuser7", "slug": "cm-empty"}
        )
        assert results == [], f"Expected [], got {results}"
| 446 | |
| 447 | |
| 448 | # ───────────────────────────────────────────────────────────────────────────── |
| 449 | # Layer T3 — Route |
| 450 | # ───────────────────────────────────────────────────────────────────────────── |
| 451 | |
class TestRoute:
    """Layer T3 — HTTP behaviour of ``GET /<owner>/<slug>/intel/codemap``."""

    @staticmethod
    def _assert_rendered_before(html: str, first: str, second: str, reason: str) -> None:
        """Assert that *first* and *second* both occur in *html*, in that order.

        ``str.find`` returns -1 for a missing needle, so a bare
        ``find(a) < find(b)`` comparison passes when *a* is absent.  Both
        needles are therefore checked for presence before comparing positions.
        """
        first_at = html.find(first)
        second_at = html.find(second)
        assert first_at != -1, f"'{first}' not found in rendered page"
        assert second_at != -1, f"'{second}' not found in rendered page"
        assert first_at < second_at, reason

    @pytest.mark.asyncio
    async def test_T12_codemap_page_returns_200(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """GET /cmuser/cm-e2e/intel/codemap must return HTTP 200."""
        resp = await client.get("/cmuser/cm-e2e/intel/codemap")
        assert resp.status_code == 200, f"Expected 200, got {resp.status_code}"

    @pytest.mark.asyncio
    async def test_T13_codemap_page_empty_state(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Route must render empty state when no codemap rows exist."""
        # Only the repo's existence matters here; the returned object is unused.
        await create_repo(db_session, owner="cmempty", slug="cm-nodata")
        await db_session.commit()
        resp = await client.get("/cmempty/cm-nodata/intel/codemap")
        assert resp.status_code == 200
        assert "Push a commit" in resp.text

    @pytest.mark.asyncio
    async def test_T14_codemap_page_404_for_missing_repo(
        self, client: AsyncClient
    ) -> None:
        """Route must return 404 for a repo that does not exist."""
        resp = await client.get("/ghost/no-such-repo/intel/codemap")
        assert resp.status_code == 404

    @pytest.mark.asyncio
    async def test_T15_sort_by_fan_in(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """sort=fan-in must return modules ordered by fan_in descending."""
        resp = await client.get("/cmuser/cm-e2e/intel/codemap?sort=fan-in")
        assert resp.status_code == 200
        # musehub/api/routes/ui.py has fan_in=8, so it must be listed before
        # musehub/core/types.py (fan_in=3).
        self._assert_rendered_before(
            resp.text,
            "ui.py",
            "types.py",
            "fan-in sort: ui.py (fi=8) should appear before types.py (fi=3)",
        )

    @pytest.mark.asyncio
    async def test_T16_sort_by_fan_out(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """sort=fan-out must return modules ordered by fan_out descending."""
        resp = await client.get("/cmuser/cm-e2e/intel/codemap?sort=fan-out")
        assert resp.status_code == 200
        # svc.py has fan_out=4, so it must be listed before models.py (fan_out=1).
        self._assert_rendered_before(
            resp.text,
            "svc.py",
            "models.py",
            "fan-out sort: svc.py (fo=4) should appear before models.py (fo=1)",
        )

    @pytest.mark.asyncio
    async def test_T17_unknown_sort_coerces_to_symbols(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """An unknown sort param must be coerced to 'symbols' (no 400/500)."""
        resp = await client.get("/cmuser/cm-e2e/intel/codemap?sort=invalid_sort")
        assert resp.status_code == 200

    @pytest.mark.asyncio
    async def test_T18_top_filter_limits_rows(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """top=20 with 25 total modules must show max 20 rows but stat chip shows 25."""
        repo = await create_repo(db_session, owner="cmtop", slug="cm-top")
        rid = str(repo.repo_id)
        for i in range(25):
            await _insert_module_row(db_session, rid, f"src/mod_{i}.py", fan_in=i)
        await _insert_meta_row(db_session, rid, total_modules=25, total_edges=0)
        await db_session.commit()

        resp = await client.get("/cmtop/cm-top/intel/codemap?top=20")
        assert resp.status_code == 200
        # NOTE(review): only the total is checked; the 20-row cap named in the
        # docstring is not asserted, and "25" may match unrelated markup.
        assert "25" in resp.text, "Stat chip must show total module count (25), not page length"

    @pytest.mark.asyncio
    async def test_T19_stat_chips_use_meta_row(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """Stat chips must display meta row values (5 modules, 17 edges, 0 cycles)."""
        resp = await client.get("/cmuser/cm-e2e/intel/codemap")
        assert resp.status_code == 200
        assert "17" in resp.text, "Edge count from meta row (17) not found in response"
| 540 | |
| 541 | |
| 542 | # ───────────────────────────────────────────────────────────────────────────── |
| 543 | # Layer T4 — E2E HTML |
| 544 | # ───────────────────────────────────────────────────────────────────────────── |
| 545 | |
class TestHTML:
    """Layer T4 — markers expected in the rendered codemap page."""

    # Page rendered for the repo seeded by the cm_repo fixture.
    _PAGE = "/cmuser/cm-e2e/intel/codemap"

    @pytest.mark.asyncio
    async def test_T20_stat_chips_present(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """The page contains the stat chip labels Modules, Edges and Cycles."""
        response = await client.get(self._PAGE)
        assert response.status_code == 200
        html = response.text
        for chip in ("Modules", "Edges", "Cycles"):
            assert chip in html, f"Stat chip label '{chip}' missing"

    @pytest.mark.asyncio
    async def test_T21_fan_in_bar_present(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """The page contains the ``cm-fan-bar`` marker used for fan-in bars."""
        response = await client.get(self._PAGE)
        assert response.status_code == 200
        assert "cm-fan-bar" in response.text, "cm-fan-bar class missing from HTML"

    @pytest.mark.asyncio
    async def test_T22_cycle_ok_shown_when_no_cycles(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """With cycle_count == 0 in the meta row, ``cm-cycle-ok`` is rendered."""
        response = await client.get(self._PAGE)
        assert response.status_code == 200
        assert "cm-cycle-ok" in response.text, "cm-cycle-ok class missing (cycle_count=0)"

    @pytest.mark.asyncio
    async def test_T23_dashboard_back_link_present(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """The page mentions an ``/intel`` path (link back to the dashboard)."""
        response = await client.get(self._PAGE)
        assert response.status_code == 200
        assert "/intel" in response.text, "Back link to Intel Hub missing"
| 584 | |
| 585 | |
| 586 | # ───────────────────────────────────────────────────────────────────────────── |
| 587 | # Layer T5 — Data integrity |
| 588 | # ───────────────────────────────────────────────────────────────────────────── |
| 589 | |
class TestDataIntegrity:
    """Layer T5 — upsert semantics and per-repo isolation of codemap rows."""

    @pytest.mark.asyncio
    async def test_T24_upsert_is_idempotent(
        self, db_session: AsyncSession
    ) -> None:
        """Writing the same module row twice leaves exactly one row."""
        repo = await create_repo(db_session, owner="cmdup", slug="cm-dup")
        repo_key = str(repo.repo_id)

        # Same key, same values, two writes.
        await _insert_module_row(db_session, repo_key, "src/a.py", fan_in=3)
        await _insert_module_row(db_session, repo_key, "src/a.py", fan_in=3)
        await db_session.commit()

        counting = (
            sa.select(sa.func.count())
            .select_from(MusehubIntelCodemapModule)
            .where(MusehubIntelCodemapModule.repo_id == repo_key)
        )
        count = (await db_session.execute(counting)).scalar_one()
        assert count == 1, f"Upsert created duplicate: expected 1 row, got {count}"

    @pytest.mark.asyncio
    async def test_T25_meta_upsert_overwrites(
        self, db_session: AsyncSession
    ) -> None:
        """A second meta upsert replaces the stored values, keeping one row."""
        repo = await create_repo(db_session, owner="cmmeta", slug="cm-meta")
        repo_key = str(repo.repo_id)

        await _insert_meta_row(db_session, repo_key, total_modules=5, total_edges=10)
        await _insert_meta_row(db_session, repo_key, total_modules=6, total_edges=20)
        await db_session.commit()

        lookup = sa.select(MusehubIntelCodemapMeta).where(
            MusehubIntelCodemapMeta.repo_id == repo_key
        )
        outcome = await db_session.execute(lookup)
        rows = outcome.scalars().all()
        assert len(rows) == 1, f"Expected 1 meta row, got {len(rows)}"
        assert rows[0].total_edges == 20, f"Expected total_edges=20, got {rows[0].total_edges}"

    @pytest.mark.asyncio
    async def test_T26_cross_repo_isolation(
        self, db_session: AsyncSession
    ) -> None:
        """Rows written for repo A are not returned when filtering by repo B."""
        repo_a = await create_repo(db_session, owner="cmisolate", slug="repo-a")
        repo_b = await create_repo(db_session, owner="cmisolate", slug="repo-b")
        key_a = str(repo_a.repo_id)
        key_b = str(repo_b.repo_id)

        # Only repo A receives a module row.
        await _insert_module_row(db_session, key_a, "src/a.py", fan_in=5)
        await db_session.commit()

        lookup = sa.select(MusehubIntelCodemapModule).where(
            MusehubIntelCodemapModule.repo_id == key_b
        )
        outcome = await db_session.execute(lookup)
        assert outcome.first() is None, "Cross-repo contamination: repo B sees repo A rows"
| 650 | |
| 651 | |
| 652 | # ───────────────────────────────────────────────────────────────────────────── |
| 653 | # Layer T6 — Performance |
| 654 | # ───────────────────────────────────────────────────────────────────────────── |
| 655 | |
class TestPerformance:
    """Layer T6 — wall-clock budgets and index presence."""

    @pytest.mark.asyncio
    async def test_T27_provider_completes_under_threshold(
        self, db_session: AsyncSession
    ) -> None:
        """compute() over a 50-file manifest finishes in under 10 seconds."""
        from musehub.services.musehub_intel_providers import CodemapProvider

        repo = await create_repo(db_session, owner="cmperfp", slug="cm-perf-p")
        repo_key = str(repo.repo_id)
        n = 50
        manifest = {f"src/mod_{i}.py": long_id(f"{i:064x}") for i in range(n)}
        await _seed_snapshot(db_session, repo_key, manifest)

        # One import-free tree of five functions per manifest entry.
        trees = [_fake_import_tree(path, [], n_symbols=5) for path in manifest]
        blobs = [b"src"] * n
        backend = AsyncMock()
        backend.get = AsyncMock(side_effect=blobs)

        target = "musehub.services.musehub_intel_providers"
        with (
            patch(f"{target}.get_backend", return_value=backend),
            patch(f"{target}.parse_symbols", side_effect=trees),
            patch(f"{target}.language_of", return_value="Python"),
        ):
            started = time.monotonic()
            await CodemapProvider().compute(
                db_session, repo_key, "dev", {"owner": "cmperfp", "slug": "cm-perf-p"}
            )
            elapsed = time.monotonic() - started

        assert elapsed < 10.0, f"Provider took {elapsed:.2f}s — exceeds 10s threshold"

    @pytest.mark.asyncio
    async def test_T28_route_responds_under_500ms(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """The codemap page for the seeded repo is served in under 500 ms."""
        started = time.monotonic()
        response = await client.get("/cmuser/cm-e2e/intel/codemap")
        elapsed = time.monotonic() - started
        assert response.status_code == 200
        assert elapsed < 0.5, f"Route took {elapsed:.3f}s — exceeds 500ms threshold"

    def test_T29_module_table_has_repo_index(self) -> None:
        """The module table declares the ix_intel_codemap_modules_repo index."""
        index_names = {
            index.name for index in MusehubIntelCodemapModule.__table__.indexes
        }
        assert "ix_intel_codemap_modules_repo" in index_names, (
            f"Index missing. Found: {index_names}"
        )
| 708 | |
| 709 | |
| 710 | # ───────────────────────────────────────────────────────────────────────────── |
| 711 | # Layer T7 — Security |
| 712 | # ───────────────────────────────────────────────────────────────────────────── |
| 713 | |
class TestSecurity:
    """Layer T7 — hostile input must neither be echoed raw nor break the route."""

    @pytest.mark.asyncio
    async def test_T30_xss_file_path_is_escaped(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A file_path containing a script tag is not emitted verbatim."""
        repo = await create_repo(db_session, owner="cmxss", slug="cm-xss")
        repo_key = str(repo.repo_id)
        hostile_path = "src/<script>alert(1)</script>.py"
        await _insert_module_row(db_session, repo_key, hostile_path, fan_in=1)
        await _insert_meta_row(db_session, repo_key, total_modules=1, total_edges=0)
        await db_session.commit()

        response = await client.get("/cmxss/cm-xss/intel/codemap")
        assert response.status_code == 200
        assert "<script>alert(1)</script>" not in response.text, "Unescaped XSS payload in response"

    @pytest.mark.asyncio
    async def test_T31_sql_injection_in_sort_param_is_safe(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """A sort value carrying a ``;DROP TABLE`` payload still yields 200."""
        url = "/cmuser/cm-e2e/intel/codemap?sort=symbols%3BDROP+TABLE+musehub_intel_codemap_modules"
        response = await client.get(url)
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_T32_invalid_top_param_does_not_500(
        self, client: AsyncClient, cm_repo: MusehubRepo
    ) -> None:
        """A non-integer ``top`` value still yields 200."""
        response = await client.get("/cmuser/cm-e2e/intel/codemap?top=INVALID")
        assert response.status_code == 200
File History
1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠
22 days ago