test_intel_languages.py
python
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠ breaking
21 days ago
| 1 | """Languages intel — full 7-tier test suite (issue #20). |
| 2 | |
| 3 | Tests are written TDD-first: all tests must be RED before Phase 4–7 |
| 4 | implementation begins, then GREEN after. |
| 5 | |
| 6 | Tiers |
| 7 | ----- |
| 8 | T01–T04 Layer T1 — DB model (columns, nullable, kinds_json, cascade) |
| 9 | T05–T09 Layer T2 — Provider (no subprocess, file counts, kinds, pct, empty) |
| 10 | T10–T17 Layer T3 — Route (200, empty state, 404, sort, top limit, invalid top → 422) |
| 11 | T18–T21 Layer T4 — E2E HTML (language names, bar width, kind chips, dashboard link) |
| 12 | T22–T24 Layer T5 — Data integrity (no duplicates, upsert overwrite, cross-repo) |
| 13 | T25–T27 Layer T6 — Performance (provider speed, route speed, index check) |
| 14 | T28–T30 Layer T7 — Security (XSS escape, SQL injection, no 500 on bad input) |
| 15 | """ |
| 16 | from __future__ import annotations |
| 17 | |
| 18 | import time |
| 19 | from datetime import datetime, timezone |
| 20 | from unittest.mock import AsyncMock, patch |
| 21 | |
| 22 | import pytest |
| 23 | import pytest_asyncio |
| 24 | import sqlalchemy as sa |
| 25 | from httpx import AsyncClient |
| 26 | from sqlalchemy.dialects.postgresql import insert as pg_insert |
| 27 | from sqlalchemy.ext.asyncio import AsyncSession |
| 28 | |
| 29 | from musehub.db.musehub_intel_models import MusehubIntelLanguages |
| 30 | from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef |
| 31 | from musehub.types.json_types import JSONObject |
| 32 | from tests.factories import create_repo |
| 33 | from muse.core.types import long_id |
| 34 | |
# Placeholder ref shared by this module: it is the default ``ref`` for seeded
# language rows and the ref argument passed to every provider ``compute`` call.
_REF = long_id("b" * 64)
| 36 | |
| 37 | |
| 38 | # ───────────────────────────────────────────────────────────────────────────── |
| 39 | # Helpers |
| 40 | # ───────────────────────────────────────────────────────────────────────────── |
| 41 | |
async def _insert_lang_row(
    session: AsyncSession,
    repo_id: str,
    language: str,
    file_count: int = 1,
    symbol_count: int = 0,
    pct: float = 0.0,
    kinds_json: JSONObject | None = None,
    ref: str = _REF,
) -> None:
    """Insert or overwrite the language row keyed by ``(repo_id, language)``.

    Issues a PostgreSQL ``INSERT ... ON CONFLICT (repo_id, language) DO UPDATE``
    against ``MusehubIntelLanguages``.  When the key already exists, the five
    non-key columns (``file_count``, ``symbol_count``, ``pct``, ``kinds_json``,
    ``ref``) are overwritten with the values given here.

    The statement is executed on ``session`` but not committed; the caller
    decides when to commit.
    """
    # Non-key columns: written on first insert and overwritten on conflict.
    mutable_columns = {
        "file_count": file_count,
        "symbol_count": symbol_count,
        "pct": pct,
        "kinds_json": kinds_json,
        "ref": ref,
    }
    insert_stmt = pg_insert(MusehubIntelLanguages).values(
        repo_id=repo_id,
        language=language,
        **mutable_columns,
    )
    upsert_stmt = insert_stmt.on_conflict_do_update(
        index_elements=["repo_id", "language"],
        set_=mutable_columns,
    )
    await session.execute(upsert_stmt)
| 75 | |
| 76 | |
async def _seed_snapshot(
    session: AsyncSession,
    repo_id: str,
    manifest: dict[str, str],
) -> str:
    """Seed one snapshot and one commit for ``repo_id`` and return the snapshot id.

    Four rows are written, in this order: the snapshot (with ``manifest``
    serialised via ``msgpack.packb``), the repo→snapshot ref, a commit on
    branch ``"dev"`` pointing at that snapshot, and the repo→commit ref.
    Each insert uses ``ON CONFLICT DO NOTHING`` and the session is committed
    before returning.

    NOTE(review): the snapshot and commit ids are fixed constants, so a second
    call against the same database presumably keeps the first call's snapshot
    row (and manifest) — confirm that each test runs against isolated data.
    """
    import msgpack

    snapshot_key = long_id("c" * 64)
    commit_key = long_id("d" * 64)
    seeded_at = datetime(2026, 1, 1, tzinfo=timezone.utc)

    # Build every statement up front; execution order below matches this order.
    pending_inserts = (
        pg_insert(MusehubSnapshot).values(
            snapshot_id=snapshot_key,
            directories=[],
            manifest_blob=msgpack.packb(manifest),
            entry_count=len(manifest),
            created_at=seeded_at,
        ),
        pg_insert(MusehubSnapshotRef).values(
            repo_id=repo_id,
            snapshot_id=snapshot_key,
        ),
        pg_insert(MusehubCommit).values(
            commit_id=commit_key,
            branch="dev",
            parent_ids=[],
            message="test",
            author="lnuser",
            timestamp=seeded_at,
            snapshot_id=snapshot_key,
        ),
        pg_insert(MusehubCommitRef).values(
            repo_id=repo_id,
            commit_id=commit_key,
        ),
    )
    for insert_stmt in pending_inserts:
        await session.execute(insert_stmt.on_conflict_do_nothing())

    await session.commit()
    return snapshot_key
| 124 | |
| 125 | |
def _fake_tree(n: int, kinds: list[str] | None = None) -> JSONObject:
    """Build a fake symbol-tree mapping containing ``n`` entries.

    Entries are keyed ``"src/mod.py::sym_<i>"`` for ``i`` in ``range(n)``.
    The ``kind`` of entry ``i`` is taken round-robin from ``kinds``; when
    ``kinds`` is ``None`` or empty, ``["function", "class", "method"]`` is used.

    Note that ``canonical_key`` always embeds the literal ``function``,
    whatever ``kind`` the entry was given.
    """
    kind_cycle = kinds if kinds else ["function", "class", "method"]
    tree: JSONObject = {}
    for idx in range(n):
        symbol = f"sym_{idx}"
        tree[f"src/mod.py::{symbol}"] = {
            "kind": kind_cycle[idx % len(kind_cycle)],
            "name": symbol,
            "qualified_name": symbol,
            "content_id": long_id("a" * 64),
            "body_hash": long_id("b" * 64),
            "signature_id": long_id("c" * 64),
            "metadata_id": "",
            "canonical_key": f"src/mod.py##function#{symbol}#1",
            "lineno": idx + 1,
            "end_lineno": idx + 2,
        }
    return tree
| 144 | |
| 145 | |
@pytest_asyncio.fixture
async def ln_repo(db_session: AsyncSession) -> MusehubRepo:
    """Create repo ``lnuser/ln-e2e`` with three committed language rows.

    Seeded rows (language, files, symbols, pct, kinds):
      * Python      — 30 files, 1500 symbols, 75.0 %, function/class/method
      * TypeScript  — 10 files,  400 symbols, 20.0 %, function/class
      * CSS         —  5 files,    0 symbols,  0.0 %, no kinds breakdown
    """
    seeded_repo = await create_repo(db_session, owner="lnuser", slug="ln-e2e")
    repo_key = str(seeded_repo.repo_id)

    seed_rows = (
        ("Python", 30, 1500, 75.0, {"function": 800, "class": 400, "method": 300}),
        ("TypeScript", 10, 400, 20.0, {"function": 300, "class": 100}),
        ("CSS", 5, 0, 0.0, None),
    )
    for language, files, symbols, share, kinds in seed_rows:
        await _insert_lang_row(
            db_session,
            repo_key,
            language,
            file_count=files,
            symbol_count=symbols,
            pct=share,
            kinds_json=kinds,
        )

    await db_session.commit()
    return seeded_repo
| 170 | |
| 171 | |
| 172 | # ───────────────────────────────────────────────────────────────────────────── |
| 173 | # Layer T1 — DB model |
| 174 | # ───────────────────────────────────────────────────────────────────────────── |
| 175 | |
class TestDBModel:
    """Layer T1 — schema-level checks on ``MusehubIntelLanguages``."""

    def test_T01_model_has_all_required_columns(self) -> None:
        """Every expected column is declared as a mapped attribute on the model."""
        mapped_attrs = sa.inspect(MusehubIntelLanguages).mapper.column_attrs
        declared = {attr.key for attr in mapped_attrs}
        expected = (
            "repo_id", "language", "file_count", "symbol_count", "pct",
            "kinds_json", "ref",
        )
        for name in expected:
            assert name in declared, (
                f"Column '{name}' missing from MusehubIntelLanguages"
            )

    def test_T02_kinds_json_is_nullable(self) -> None:
        """``kinds_json`` accepts NULL, since some languages carry no symbol breakdown."""
        kinds_column = MusehubIntelLanguages.__table__.c["kinds_json"]
        assert kinds_column.nullable, "kinds_json must be nullable"

    def test_T03_composite_pk_is_repo_id_plus_language(self) -> None:
        """The primary key is exactly the pair ``(repo_id, language)``."""
        primary_key = MusehubIntelLanguages.__table__.primary_key
        key_names = {column.name for column in primary_key.columns}
        assert key_names == {"repo_id", "language"}, (
            f"Expected PK {{repo_id, language}}, got {key_names}"
        )

    @pytest.mark.asyncio
    async def test_T04_cascade_delete_removes_lang_rows(
        self, db_session: AsyncSession
    ) -> None:
        """Language rows disappear once their owning repo is deleted."""
        repo = await create_repo(db_session, owner="lnuser", slug="t04-cascade")
        repo_key = str(repo.repo_id)
        await _insert_lang_row(db_session, repo_key, "Python", file_count=3)
        await db_session.commit()

        # Sanity check: the row really exists before the delete.
        python_row_query = sa.select(MusehubIntelLanguages).where(
            MusehubIntelLanguages.repo_id == repo_key,
            MusehubIntelLanguages.language == "Python",
        )
        inserted = await db_session.scalar(python_row_query)
        assert inserted is not None, "Row not found after insert"

        await db_session.delete(repo)
        await db_session.commit()

        repo_rows_query = sa.select(MusehubIntelLanguages).where(
            MusehubIntelLanguages.repo_id == repo_key
        )
        leftovers = (await db_session.execute(repo_rows_query)).scalars().all()
        assert not leftovers, (
            "Cascade delete failed — languages rows remain after repo delete"
        )
| 236 | |
| 237 | |
| 238 | # ───────────────────────────────────────────────────────────────────────────── |
| 239 | # Layer T2 — Provider |
| 240 | # ───────────────────────────────────────────────────────────────────────────── |
| 241 | |
class TestProvider:
    """Layer T2 — behaviour of the ``intel.code.languages`` provider.

    Tests T06–T08 patch ``get_backend``, ``parse_symbols`` and ``language_of``
    inside ``musehub.services.musehub_intel_providers`` so the provider runs
    against canned data.
    """

    @pytest.mark.asyncio
    async def test_T05_provider_does_not_use_subprocess(
        self, db_session: AsyncSession
    ) -> None:
        """The source of ``LanguagesProvider.compute`` mentions no subprocess helper."""
        import inspect
        from musehub.services import musehub_intel_providers as _mod

        compute_source = inspect.getsource(_mod.LanguagesProvider.compute)
        for banned in ("create_subprocess", "_run_muse"):
            assert banned not in compute_source, (
                f"LanguagesProvider.compute calls {banned} — forbidden"
            )

    @pytest.mark.asyncio
    async def test_T06_provider_counts_files_per_language(
        self, db_session: AsyncSession
    ) -> None:
        """Two ``.py``, one ``.ts`` and one ``.css`` file yield per-language file counts 2/1/1."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="lnuser", slug="t06-files")
        repo_key = str(repo.repo_id)
        await _seed_snapshot(
            db_session,
            repo_key,
            {
                "src/a.py": long_id("e" * 64),
                "src/b.py": long_id("f" * 64),
                "src/app.ts": long_id("1" * 64),
                "static/main.css": long_id("2" * 64),
            },
        )

        def _language_for(p: str) -> str:
            # Stand-in for language_of: classify purely by file extension.
            if p.endswith(".py"):
                return "Python"
            if p.endswith(".ts"):
                return "TypeScript"
            return "CSS"

        backend = AsyncMock()
        backend.get = AsyncMock(return_value=b"# placeholder")
        provider_module = "musehub.services.musehub_intel_providers"

        with (
            patch(f"{provider_module}.get_backend", return_value=backend),
            patch(f"{provider_module}.parse_symbols", return_value={}),
            patch(f"{provider_module}.language_of", side_effect=_language_for),
        ):
            provider = _PROVIDER_REGISTRY["intel.code.languages"]
            result = await provider.compute(
                db_session, repo_key, _REF,
                {"owner": repo.owner, "slug": repo.slug},
            )

        assert result == [("intel.code.languages", {"count": 3})], (
            f"Expected 3 language rows, got: {result}"
        )

        repo_rows_query = sa.select(MusehubIntelLanguages).where(
            MusehubIntelLanguages.repo_id == repo_key
        )
        stored = (await db_session.execute(repo_rows_query)).scalars().all()
        files_by_language = {entry.language: entry.file_count for entry in stored}
        assert files_by_language["Python"] == 2
        assert files_by_language["TypeScript"] == 1
        assert files_by_language["CSS"] == 1

    @pytest.mark.asyncio
    async def test_T07_provider_records_kinds_json(
        self, db_session: AsyncSession
    ) -> None:
        """``kinds_json`` is populated, omits ``import`` and holds only known symbol kinds."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="lnuser", slug="t07-kinds")
        repo_key = str(repo.repo_id)
        await _seed_snapshot(db_session, repo_key, {"src/x.py": long_id("3" * 64)})

        # Six fake symbols cycling through function / class / import.
        symbol_tree = _fake_tree(6, kinds=["function", "class", "import"])

        backend = AsyncMock()
        backend.get = AsyncMock(return_value=b"# placeholder")
        provider_module = "musehub.services.musehub_intel_providers"

        with (
            patch(f"{provider_module}.get_backend", return_value=backend),
            patch(f"{provider_module}.parse_symbols", return_value=symbol_tree),
            patch(f"{provider_module}.language_of", return_value="Python"),
        ):
            provider = _PROVIDER_REGISTRY["intel.code.languages"]
            await provider.compute(
                db_session, repo_key, _REF,
                {"owner": repo.owner, "slug": repo.slug},
            )

        python_row_query = sa.select(MusehubIntelLanguages).where(
            MusehubIntelLanguages.repo_id == repo_key,
            MusehubIntelLanguages.language == "Python",
        )
        python_row = await db_session.scalar(python_row_query)
        assert python_row is not None
        assert python_row.kinds_json is not None, "kinds_json must not be None when symbols exist"
        assert "import" not in python_row.kinds_json, (
            "import pseudo-symbols must be excluded from kinds_json"
        )
        allowed_kinds = {
            "function", "class", "method", "async_function", "async_method",
        }
        recorded_kinds = set(python_row.kinds_json.keys())
        assert recorded_kinds.issubset(allowed_kinds), (
            f"Unexpected kinds in kinds_json: {recorded_kinds}"
        )

    @pytest.mark.asyncio
    async def test_T08_provider_pct_sums_correctly(
        self, db_session: AsyncSession
    ) -> None:
        """With one Python and one TypeScript file, stored ``pct`` values total 100 ± 0.01."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="lnuser", slug="t08-pct")
        repo_key = str(repo.repo_id)
        await _seed_snapshot(
            db_session,
            repo_key,
            {
                "src/a.py": long_id("4" * 64),
                "src/b.ts": long_id("5" * 64),
            },
        )

        python_symbols = _fake_tree(3, kinds=["function"])
        typescript_symbols = _fake_tree(1, kinds=["function"])

        backend = AsyncMock()
        backend.get = AsyncMock(return_value=b"# placeholder")

        def _fake_parse(src: bytes, path: str) -> JSONObject:
            # Three symbols for the .py file, one for everything else.
            if path.endswith(".py"):
                return python_symbols
            return typescript_symbols

        def _language_for(p: str) -> str:
            return "Python" if p.endswith(".py") else "TypeScript"

        provider_module = "musehub.services.musehub_intel_providers"

        with (
            patch(f"{provider_module}.get_backend", return_value=backend),
            patch(f"{provider_module}.parse_symbols", side_effect=_fake_parse),
            patch(f"{provider_module}.language_of", side_effect=_language_for),
        ):
            provider = _PROVIDER_REGISTRY["intel.code.languages"]
            await provider.compute(
                db_session, repo_key, _REF,
                {"owner": repo.owner, "slug": repo.slug},
            )

        repo_rows_query = sa.select(MusehubIntelLanguages).where(
            MusehubIntelLanguages.repo_id == repo_key
        )
        stored = (await db_session.execute(repo_rows_query)).scalars().all()
        total_pct = sum(entry.pct for entry in stored)
        assert abs(total_pct - 100.0) < 0.01, (
            f"pct values do not sum to 100 (sum={total_pct:.2f})"
        )

    @pytest.mark.asyncio
    async def test_T09_provider_returns_empty_when_no_snapshot(
        self, db_session: AsyncSession
    ) -> None:
        """A repo with nothing seeded makes ``compute`` return an empty list."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="lnuser", slug="t09-nosnap")
        repo_key = str(repo.repo_id)
        await db_session.commit()

        provider = _PROVIDER_REGISTRY["intel.code.languages"]
        result = await provider.compute(
            db_session, repo_key, _REF,
            {"owner": repo.owner, "slug": repo.slug},
        )
        assert result == [], f"Expected [] when no snapshot exists, got {result}"
| 419 | |
| 420 | |
| 421 | # ───────────────────────────────────────────────────────────────────────────── |
| 422 | # Layer T3 — Route |
| 423 | # ───────────────────────────────────────────────────────────────────────────── |
| 424 | |
class TestRoute:
    """Layer T3 — HTTP status behaviour of ``/<owner>/<slug>/intel/languages``."""

    @pytest.mark.asyncio
    async def test_T10_returns_200_with_language_data(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """The page answers 200 for a repo that has language rows."""
        response = await client.get("/lnuser/ln-e2e/intel/languages")
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_T11_returns_200_with_empty_repo(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """The page still answers 200 for a repo with no language rows."""
        await create_repo(db_session, owner="lnuser", slug="t11-empty")
        await db_session.commit()

        response = await client.get("/lnuser/t11-empty/intel/languages")
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_T12_unknown_repo_returns_404(
        self, client: AsyncClient
    ) -> None:
        """A repo that does not exist answers 403 or 404 (either is accepted)."""
        response = await client.get("/nobody/no-such-repo/intel/languages")
        assert response.status_code in (403, 404)

    @pytest.mark.asyncio
    async def test_T13_sort_by_files_param_accepted(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """``?sort=files`` is accepted and answers 200."""
        response = await client.get("/lnuser/ln-e2e/intel/languages?sort=files")
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_T14_sort_by_symbols_param_accepted(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """``?sort=symbols`` is accepted and answers 200."""
        response = await client.get("/lnuser/ln-e2e/intel/languages?sort=symbols")
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_T15_unknown_sort_coerced_to_default(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """An unrecognised ``?sort=`` value still answers 200 rather than an error."""
        response = await client.get("/lnuser/ln-e2e/intel/languages?sort=garbage")
        assert response.status_code == 200

    @pytest.mark.asyncio
    async def test_T16_top_param_limits_rows(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """With 25 languages stored, ``?top=20`` renders no more than 20 of their names."""
        repo = await create_repo(db_session, owner="lnuser", slug="t16-top")
        repo_key = str(repo.repo_id)
        names = [f"Lang{i:02d}" for i in range(25)]
        for files, name in enumerate(names, start=1):
            await _insert_lang_row(db_session, repo_key, name, file_count=files)
        await db_session.commit()

        response = await client.get("/lnuser/t16-top/intel/languages?top=20")
        assert response.status_code == 200

        # Count how many of the seeded names occur anywhere in the page text.
        page = response.text
        count = len([name for name in names if name in page])
        assert count <= 20, f"Expected ≤20 languages for ?top=20, found {count}"

    @pytest.mark.asyncio
    async def test_T17_top_invalid_string_returns_422(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """A non-integer ``?top=`` value is rejected with 422."""
        response = await client.get("/lnuser/ln-e2e/intel/languages?top=abc")
        assert response.status_code == 422
| 501 | |
| 502 | |
| 503 | # ───────────────────────────────────────────────────────────────────────────── |
| 504 | # Layer T4 — E2E HTML |
| 505 | # ───────────────────────────────────────────────────────────────────────────── |
| 506 | |
class TestE2E:
    """Layer T4 — substring checks on the rendered pages for the ``ln_repo`` fixture."""

    @pytest.mark.asyncio
    async def test_T18_language_names_appear_in_page(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """Python, TypeScript and CSS all occur in the languages page."""
        response = await client.get("/lnuser/ln-e2e/intel/languages")
        assert response.status_code == 200

        page = response.text
        for lang in ("Python", "TypeScript", "CSS"):
            assert lang in page, f"Language '{lang}' missing from page"

    @pytest.mark.asyncio
    async def test_T19_pct_bar_width_rendered(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """The page contains a ``width:`` style fragment (taken as evidence of pct bars)."""
        response = await client.get("/lnuser/ln-e2e/intel/languages")
        assert response.status_code == 200
        assert "width:" in response.text, "No width style found — pct bars not rendered"

    @pytest.mark.asyncio
    async def test_T20_kind_chips_rendered_for_python(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """The words function, class and method occur in the lower-cased page text.

        NOTE(review): this is a plain substring search over the whole page, so
        markup such as ``class="..."`` would presumably satisfy the ``class``
        check on its own — confirm whether a tighter selector is wanted.
        """
        response = await client.get("/lnuser/ln-e2e/intel/languages")
        assert response.status_code == 200

        lowered_page = response.text.lower()
        for kind in ("function", "class", "method"):
            assert kind in lowered_page, f"Kind chip '{kind}' missing from languages page"

    @pytest.mark.asyncio
    async def test_T21_dashboard_card_links_to_languages_page(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """The intel dashboard response body contains ``/intel/languages``."""
        response = await client.get("/lnuser/ln-e2e/intel")
        assert response.status_code == 200
        assert b"/intel/languages" in response.content
| 547 | |
| 548 | |
| 549 | # ───────────────────────────────────────────────────────────────────────────── |
| 550 | # Layer T5 — Data integrity |
| 551 | # ───────────────────────────────────────────────────────────────────────────── |
| 552 | |
class TestDataIntegrity:
    """Layer T5 — upsert and isolation guarantees of the languages table."""

    @pytest.mark.asyncio
    async def test_T22_double_upsert_produces_one_row(
        self, db_session: AsyncSession
    ) -> None:
        """Writing the same ``(repo_id, language)`` twice leaves exactly one row."""
        repo = await create_repo(db_session, owner="lnuser", slug="t22-dup")
        repo_key = str(repo.repo_id)

        # Same key, same payload, written twice.
        await _insert_lang_row(db_session, repo_key, "Python", file_count=5)
        await _insert_lang_row(db_session, repo_key, "Python", file_count=5)
        await db_session.commit()

        repo_rows_query = sa.select(MusehubIntelLanguages).where(
            MusehubIntelLanguages.repo_id == repo_key
        )
        stored = (await db_session.execute(repo_rows_query)).scalars().all()
        assert len(stored) == 1, (
            f"Expected 1 row after double upsert, got {len(stored)}"
        )

    @pytest.mark.asyncio
    async def test_T23_second_upsert_overwrites_file_count(
        self, db_session: AsyncSession
    ) -> None:
        """The later of two upserts wins: ``file_count`` ends up as 12, not 5."""
        repo = await create_repo(db_session, owner="lnuser", slug="t23-overwrite")
        repo_key = str(repo.repo_id)

        for files in (5, 12):
            await _insert_lang_row(db_session, repo_key, "Python", file_count=files)
        await db_session.commit()

        python_row_query = sa.select(MusehubIntelLanguages).where(
            MusehubIntelLanguages.repo_id == repo_key,
            MusehubIntelLanguages.language == "Python",
        )
        python_row = await db_session.scalar(python_row_query)
        assert python_row is not None
        assert python_row.file_count == 12, (
            f"Expected file_count=12 after overwrite upsert, got {python_row.file_count}"
        )

    @pytest.mark.asyncio
    async def test_T24_cross_repo_isolation(
        self, db_session: AsyncSession
    ) -> None:
        """A row written for repo A is not returned when selecting by repo B's id."""
        repo_a = await create_repo(db_session, owner="lnuser", slug="t24-repo-a")
        repo_b = await create_repo(db_session, owner="lnuser", slug="t24-repo-b")

        await _insert_lang_row(
            db_session, str(repo_a.repo_id), "SecretLang", file_count=99
        )
        await db_session.commit()

        repo_b_query = sa.select(MusehubIntelLanguages).where(
            MusehubIntelLanguages.repo_id == str(repo_b.repo_id)
        )
        rows_for_b = (await db_session.execute(repo_b_query)).scalars().all()
        assert not rows_for_b, "Repo B must not see Repo A's language rows"
| 618 | |
| 619 | |
| 620 | # ───────────────────────────────────────────────────────────────────────────── |
| 621 | # Layer T6 — Performance |
| 622 | # ───────────────────────────────────────────────────────────────────────────── |
| 623 | |
class TestPerformance:
    """Layer T6 — wall-clock budgets and a query-plan check."""

    @pytest.mark.asyncio
    async def test_T25_provider_completes_100_files_under_2s(
        self, db_session: AsyncSession
    ) -> None:
        """``compute`` over a 100-entry manifest finishes in under 2 seconds.

        Backend reads, symbol parsing and language detection are all patched,
        so only the provider's own work and its database writes are timed.
        """
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="lnuser", slug="t25-speed")
        repo_key = str(repo.repo_id)
        # 100 distinct paths; object ids cycle through ten values.
        manifest = {
            f"src/file_{i}.py": long_id("0" * 63 + str(i % 10))
            for i in range(100)
        }
        await _seed_snapshot(db_session, repo_key, manifest)

        backend = AsyncMock()
        backend.get = AsyncMock(return_value=b"# py")
        provider_module = "musehub.services.musehub_intel_providers"

        with (
            patch(f"{provider_module}.get_backend", return_value=backend),
            patch(f"{provider_module}.parse_symbols", return_value=_fake_tree(10)),
            patch(f"{provider_module}.language_of", return_value="Python"),
        ):
            started = time.monotonic()
            await _PROVIDER_REGISTRY["intel.code.languages"].compute(
                db_session, repo_key, _REF,
                {"owner": repo.owner, "slug": repo.slug},
            )
            elapsed = time.monotonic() - started

        assert elapsed < 2.0, (
            f"Provider took {elapsed:.2f}s for 100 files (limit: 2s)"
        )

    @pytest.mark.asyncio
    async def test_T26_route_responds_under_200ms_for_50_languages(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A single GET of the languages page with 50 rows returns 200 within 0.2 seconds."""
        repo = await create_repo(db_session, owner="lnuser", slug="t26-perf")
        repo_key = str(repo.repo_id)

        bulk_rows = []
        for i in range(50):
            rank = i + 1
            bulk_rows.append(
                {
                    "repo_id": repo_key,
                    "language": f"Lang{i:02d}",
                    "file_count": rank,
                    "symbol_count": rank * 100,
                    "pct": 2.0,
                    "kinds_json": {"function": rank * 50},
                    "ref": _REF,
                }
            )
        bulk_insert = pg_insert(MusehubIntelLanguages).values(bulk_rows)
        await db_session.execute(bulk_insert.on_conflict_do_nothing())
        await db_session.commit()

        started = time.monotonic()
        response = await client.get("/lnuser/t26-perf/intel/languages")
        elapsed = time.monotonic() - started

        assert response.status_code == 200
        assert elapsed < 0.2, (
            f"Route took {elapsed:.3f}s for 50 language rows (limit: 0.2s)"
        )

    @pytest.mark.asyncio
    async def test_T27_db_query_uses_lang_index(
        self, db_session: AsyncSession
    ) -> None:
        """``EXPLAIN`` for a ``repo_id`` lookup mentions an index.

        NOTE(review): the assertion passes when the plan text contains either
        ``ix_intel_languages_repo`` or the word ``Index``, so any index scan
        satisfies it — confirm whether the named index should be required.
        """
        explain_result = await db_session.execute(
            sa.text(
                "EXPLAIN SELECT * FROM musehub_intel_languages WHERE repo_id = 'x'"
            )
        )
        plan = " ".join(line[0] for line in explain_result.all())
        accepted_markers = ("ix_intel_languages_repo", "Index")
        assert any(marker in plan for marker in accepted_markers), (
            f"Query plan does not use ix_intel_languages_repo:\n{plan}"
        )
| 709 | |
| 710 | |
| 711 | # ───────────────────────────────────────────────────────────────────────────── |
| 712 | # Layer T7 — Security |
| 713 | # ───────────────────────────────────────────────────────────────────────────── |
| 714 | |
class TestSecurity:
    """Layer T7 — hostile input must not be echoed raw nor crash the route."""

    @pytest.mark.asyncio
    async def test_T28_xss_in_language_name_is_escaped(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A ``<script>`` payload stored as a language name is not emitted verbatim."""
        repo = await create_repo(db_session, owner="lnuser", slug="t28-xss")
        repo_key = str(repo.repo_id)
        await _insert_lang_row(
            db_session,
            repo_key,
            language="<script>alert(1)</script>",
            file_count=1,
        )
        await db_session.commit()

        response = await client.get("/lnuser/t28-xss/intel/languages")
        assert response.status_code == 200
        # The raw opening tag must be absent from the response text.
        assert "<script>alert" not in response.text, (
            "XSS in language name not escaped by Jinja2"
        )

    @pytest.mark.asyncio
    async def test_T29_sql_injection_in_sort_param_safe(
        self, client: AsyncClient, ln_repo: MusehubRepo
    ) -> None:
        """A URL-encoded ``; DROP TABLE`` payload in ``?sort=`` answers 200 or 422."""
        hostile_url = (
            "/lnuser/ln-e2e/intel/languages"
            "?sort=pct%3B%20DROP%20TABLE%20musehub_intel_languages%3B--"
        )
        response = await client.get(hostile_url)
        assert response.status_code in (200, 422), (
            f"SQL injection in ?sort= caused unexpected status {response.status_code}"
        )

    @pytest.mark.asyncio
    async def test_T30_unauthenticated_nonexistent_repo_no_500(
        self, client: AsyncClient
    ) -> None:
        """Requesting a repo that does not exist never answers 500."""
        response = await client.get("/attacker/does-not-exist/intel/languages")
        assert response.status_code != 500, (
            "Non-existent repo path returned 500 — should be 403 or 404"
        )
File History
1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠
21 days ago