"""Breakage intel — full 7-tier test suite (issue #23).

Tests are written TDD-first: all tests must be RED before Phase 4–7
implementation begins, then GREEN after.

Tiers
-----
T01–T05  Layer T1 — DB model       (columns, nullable, cascade, indexes, meta)
T06–T12  Layer T2 — Provider       (no subprocess, stale detect, known sym bypass,
                                    module resolve bypass, empty, idempotent, meta)
T13–T19  Layer T3 — Route          (200, empty state, 404, type filter, top filter,
                                    stat chips, bad input no 500)
T20–T24  Layer T4 — E2E HTML       (severity badges, stat chips, file paths,
                                    dashboard card, empty state text)
T25–T28  Layer T5 — Data integrity (upsert idempotent, cross-repo isolation,
                                    severity stored, type index)
T29–T31  Layer T6 — Performance    (provider speed, route speed, bulk upsert)
T32–T34  Layer T7 — Security       (XSS in file_path, SQL injection top param,
                                    no 500 on bad type)
"""
from __future__ import annotations

import time
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch

import msgpack
import pytest
import pytest_asyncio
import sqlalchemy as sa
from httpx import AsyncClient
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession

from muse.core.types import long_id
from musehub.db.musehub_intel_models import MusehubIntelBreakageIssue, MusehubIntelBreakageMeta
from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef
from tests.factories import create_repo

# Fixed identifiers shared by every test that seeds a commit/snapshot.
_REF = long_id("a" * 64)
_SNAP = long_id("b" * 64)
_CID = long_id("c" * 64)
_OBJ_1 = long_id("d" * 64)
_OBJ_2 = long_id("e" * 64)


# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────


async def _insert_issue(
    session: AsyncSession,
    repo_id: str,
    issue_id: str,
    file_path: str = "src/foo.py",
    issue_type: str = "stale_import",
    description: str = "imports 'blob_id' but no symbol or module with that name exists in the HEAD snapshot",
    severity: str = "warning",
    ref: str = "dev",
) -> None:
    """Upsert one breakage issue row, keyed on ``issue_id``.

    On conflict every column except ``issue_id`` and ``repo_id`` is
    overwritten with the supplied values.  The session is NOT committed;
    callers commit when they are done inserting.
    """
    # Columns that are both inserted and refreshed on conflict; built once so
    # the INSERT values and the UPDATE set cannot drift apart.
    updatable = {
        "file_path": file_path,
        "issue_type": issue_type,
        "description": description,
        "severity": severity,
        "ref": ref,
    }
    await session.execute(
        pg_insert(MusehubIntelBreakageIssue)
        .values(issue_id=issue_id, repo_id=repo_id, **updatable)
        .on_conflict_do_update(index_elements=["issue_id"], set_=updatable)
    )


async def _insert_meta(
    session: AsyncSession,
    repo_id: str,
    total_issues: int = 0,
    warning_count: int = 0,
    error_count: int = 0,
    file_count: int = 0,
    ref: str = "dev",
) -> None:
    """Upsert the breakage meta row for ``repo_id``.

    On conflict all counters and ``ref`` are overwritten.  The session is NOT
    committed by this helper.
    """
    updatable = {
        "total_issues": total_issues,
        "warning_count": warning_count,
        "error_count": error_count,
        "file_count": file_count,
        "ref": ref,
    }
    await session.execute(
        pg_insert(MusehubIntelBreakageMeta)
        .values(repo_id=repo_id, **updatable)
        .on_conflict_do_update(index_elements=["repo_id"], set_=updatable)
    )


async def _seed_commit(
    session: AsyncSession,
    repo_id: str,
    manifest: dict[str, str],
    owner: str,
    slug: str,
) -> None:
    """Insert a single commit + snapshot (no parent needed for breakage).

    Writes, in order: the snapshot (manifest packed with msgpack), the
    repo→snapshot ref, a parentless commit on branch ``dev`` authored by
    *owner*, and the repo→commit ref — then commits the session.

    ``slug`` is accepted for call-site symmetry but is not used here.

    NOTE(review): every insert uses ``on_conflict_do_nothing`` with the fixed
    ``_SNAP`` / ``_CID`` ids, so if rows survive between tests a later call
    with a different manifest would keep the earlier snapshot — confirm the
    ``db_session`` fixture isolates tests.
    """
    seeded_at = datetime(2026, 1, 1, tzinfo=timezone.utc)

    await session.execute(
        pg_insert(MusehubSnapshot)
        .values(
            snapshot_id=_SNAP,
            directories=[],
            manifest_blob=msgpack.packb(manifest),
            entry_count=len(manifest),
            created_at=seeded_at,
        )
        .on_conflict_do_nothing()
    )
    await session.execute(
        pg_insert(MusehubSnapshotRef)
        .values(repo_id=repo_id, snapshot_id=_SNAP)
        .on_conflict_do_nothing()
    )
    await session.execute(
        pg_insert(MusehubCommit)
        .values(
            commit_id=_CID,
            branch="dev",
            parent_ids=[],
            message="feat: some change",
            author=owner,
            timestamp=seeded_at,
            snapshot_id=_SNAP,
        )
        .on_conflict_do_nothing()
    )
    await session.execute(
        pg_insert(MusehubCommitRef)
        .values(repo_id=repo_id, commit_id=_CID)
        .on_conflict_do_nothing()
    )
    await session.commit()


def _import_sym(
    file_path: str,
    symbol_name: str,
    module_dotted: str,
) -> tuple[str, dict]:
    """Return ``(address, rec)`` for a parse_symbols import record.

    The address has the form ``<file_path>::import::<symbol_name>``; the
    record's ``qualified_name`` embeds the dotted module path.
    """
    address = f"{file_path}::import::{symbol_name}"
    return address, {
        "kind": "import",
        "name": symbol_name,
        "qualified_name": f"import::{module_dotted}::{symbol_name}",
        "content_id": long_id("1" * 64),
        "body_hash": "",
        "signature_id": "",
        "metadata_id": "",
        "canonical_key": f"{file_path}##import#{symbol_name}#1",
        "lineno": 1,
        "end_lineno": 1,
    }


def _fn_sym(
    file_path: str,
    name: str,
) -> tuple[str, dict]:
    """Return ``(address, rec)`` for a parse_symbols function record.

    The address has the form ``<file_path>::<name>``.
    """
    address = f"{file_path}::{name}"
    return address, {
        "kind": "function",
        "name": name,
        "qualified_name": name,
        "content_id": long_id("2" * 64),
        "body_hash": long_id("3" * 64),
        "signature_id": long_id("4" * 64),
        "metadata_id": "",
        "canonical_key": f"{file_path}##function#{name}#1",
        "lineno": 1,
        "end_lineno": 5,
    }


@pytest_asyncio.fixture
async def bk_repo(db_session: AsyncSession) -> MusehubRepo:
    """Repo seeded with 5 breakage issue rows and a meta row.

    The repo is ``bkuser/bk-e2e``; issues live in ``src/file_0.py`` …
    ``src/file_4.py`` with the default type/severity of ``_insert_issue``.
    """
    repo = await create_repo(db_session, owner="bkuser", slug="bk-e2e")
    rid = str(repo.repo_id)
    for i in range(5):
        await _insert_issue(
            db_session,
            rid,
            issue_id=f"sha256:bk{'0' * 60}{i:02d}",
            file_path=f"src/file_{i}.py",
            description=f"imports 'sym_{i}' but no symbol or module with that name exists in the HEAD snapshot",
        )
    await _insert_meta(db_session, rid, total_issues=5, warning_count=5, file_count=5)
    await db_session.commit()
    return repo


# ─────────────────────────────────────────────────────────────────────────────
# Layer T1 — DB model
# ─────────────────────────────────────────────────────────────────────────────


class TestDBModel:
    """Static checks against the ORM table metadata (no database access)."""

    def test_T01_issue_model_has_required_columns(self) -> None:
        """MusehubIntelBreakageIssue must expose all required columns."""
        cols = {c.name for c in MusehubIntelBreakageIssue.__table__.columns}
        for required in ("issue_id", "repo_id", "file_path", "issue_type", "description", "severity", "ref"):
            assert required in cols, f"Column '{required}' missing"

    def test_T02_meta_model_has_required_columns(self) -> None:
        """MusehubIntelBreakageMeta must expose all required columns."""
        cols = {c.name for c in MusehubIntelBreakageMeta.__table__.columns}
        for required in ("repo_id", "total_issues", "warning_count", "error_count", "file_count", "ref"):
            assert required in cols, f"Column '{required}' missing"

    def test_T03_cascade_delete_on_issue(self) -> None:
        """repo_id FK on issues table must use CASCADE."""
        fks = MusehubIntelBreakageIssue.__table__.foreign_keys
        for fk in fks:
            if "repo_id" in str(fk.parent):
                assert fk.ondelete == "CASCADE"
                return
        pytest.fail("No CASCADE FK found for repo_id on breakage issues")

    def test_T04_cascade_delete_on_meta(self) -> None:
        """repo_id FK on meta table must use CASCADE."""
        fks = MusehubIntelBreakageMeta.__table__.foreign_keys
        for fk in fks:
            if "repo_id" in str(fk.parent):
                assert fk.ondelete == "CASCADE"
                return
        pytest.fail("No CASCADE FK found for repo_id on breakage meta")

    def test_T05_composite_index_exists(self) -> None:
        """ix_intel_breakage_issues_repo_type index must cover (repo_id, issue_type)."""
        index_name = "ix_intel_breakage_issues_repo_type"
        by_name = {idx.name: idx for idx in MusehubIntelBreakageIssue.__table__.indexes}
        assert index_name in by_name
        # Check the columns too, so this test verifies what its docstring
        # promises instead of duplicating the name-only check in T28.
        covered = {col.name for col in by_name[index_name].columns}
        assert {"repo_id", "issue_type"} <= covered, (
            f"{index_name} must cover repo_id and issue_type, got {sorted(covered)}"
        )


# ─────────────────────────────────────────────────────────────────────────────
# Layer T2 — Provider
# ─────────────────────────────────────────────────────────────────────────────


class TestProvider:
    """BreakageProvider behaviour with the object backend and parser mocked."""

    @pytest.mark.asyncio
    async def test_T06_provider_uses_no_subprocess(
        self, db_session: AsyncSession
    ) -> None:
        """BreakageProvider must never call _run_muse or import subprocess."""
        from musehub.services import musehub_intel_providers as svc
        import ast
        import inspect
        import textwrap

        src = inspect.getsource(svc.BreakageProvider)
        # Strip docstrings from AST so comment-only mentions don't trip us up
        tree = ast.parse(textwrap.dedent(src))
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
                if (
                    node.body
                    and isinstance(node.body[0], ast.Expr)
                    and isinstance(node.body[0].value, ast.Constant)
                ):
                    # Keep the body non-empty so the tree still unparses.
                    node.body = node.body[1:] or [ast.Pass()]
        non_doc_src = ast.unparse(tree)
        assert "_run_muse" not in non_doc_src, "BreakageProvider must not call _run_muse"
        assert "subprocess" not in non_doc_src, "BreakageProvider must not use subprocess"
        assert "create_subprocess" not in non_doc_src

    @pytest.mark.asyncio
    async def test_T07_provider_detects_stale_import(
        self, db_session: AsyncSession
    ) -> None:
        """Provider must detect an import whose symbol doesn't exist anywhere."""
        from musehub.services.musehub_intel_providers import BreakageProvider

        repo = await create_repo(db_session, owner="bkp1", slug="bk-stale")
        rid = str(repo.repo_id)
        await _seed_commit(db_session, rid, {"src/a.py": _OBJ_1}, "bkp1", "bk-stale")

        # src/a.py imports 'blob_id' from 'muse.core.types'
        # 'muse/core/types.py' is NOT in the manifest
        # 'blob_id' is NOT defined anywhere in the snapshot
        file_tree = dict([
            _import_sym("src/a.py", "blob_id", "muse.core.types"),
        ])
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(return_value=b"src content")

        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols", return_value=file_tree),
        ):
            results = await BreakageProvider().compute(
                db_session, rid, "dev", {"owner": "bkp1", "slug": "bk-stale"}
            )

        assert results, "Expected at least one result tuple"
        count = results[0][1]["count"]
        assert count == 1, f"Expected 1 stale import, got {count}"

        row = (await db_session.execute(
            sa.select(MusehubIntelBreakageIssue)
            .where(MusehubIntelBreakageIssue.repo_id == rid)
        )).scalars().first()
        assert row is not None
        assert row.issue_type == "stale_import"
        assert "blob_id" in row.description

    @pytest.mark.asyncio
    async def test_T08_known_symbol_bypasses_stale_flag(
        self, db_session: AsyncSession
    ) -> None:
        """Import of a symbol that exists somewhere in the snapshot is NOT stale."""
        from musehub.services.musehub_intel_providers import BreakageProvider

        repo = await create_repo(db_session, owner="bkp2", slug="bk-known")
        rid = str(repo.repo_id)
        await _seed_commit(
            db_session, rid,
            {"src/a.py": _OBJ_1, "src/b.py": _OBJ_2},
            "bkp2", "bk-known",
        )

        # src/a.py imports 'my_fn' from 'src.b'
        # src/b.py defines 'my_fn'
        # → not stale
        tree_a = dict([_import_sym("src/a.py", "my_fn", "src.b")])
        tree_b = dict([_fn_sym("src/b.py", "my_fn")])

        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(side_effect=[b"a", b"b"])

        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols", side_effect=[tree_a, tree_b]),
        ):
            results = await BreakageProvider().compute(
                db_session, rid, "dev", {"owner": "bkp2", "slug": "bk-known"}
            )

        count = results[0][1]["count"]
        assert count == 0, f"Expected 0 issues (symbol exists), got {count}"

    @pytest.mark.asyncio
    async def test_T09_resolved_module_bypasses_stale_flag(
        self, db_session: AsyncSession
    ) -> None:
        """Import whose module path resolves to a tracked file is NOT stale."""
        from musehub.services.musehub_intel_providers import BreakageProvider

        repo = await create_repo(db_session, owner="bkp3", slug="bk-resolve")
        rid = str(repo.repo_id)
        # Manifest includes src/b.py — so 'src.b' resolves
        await _seed_commit(
            db_session, rid,
            {"src/a.py": _OBJ_1, "src/b.py": _OBJ_2},
            "bkp3", "bk-resolve",
        )

        # src/a.py imports 'anything' from 'src.b' — module resolves → not stale
        tree_a = dict([_import_sym("src/a.py", "anything", "src.b")])
        tree_b = dict([_fn_sym("src/b.py", "other_fn")])

        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(side_effect=[b"a", b"b"])

        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols", side_effect=[tree_a, tree_b]),
        ):
            results = await BreakageProvider().compute(
                db_session, rid, "dev", {"owner": "bkp3", "slug": "bk-resolve"}
            )

        count = results[0][1]["count"]
        assert count == 0, f"Expected 0 issues (module resolved), got {count}"

    @pytest.mark.asyncio
    async def test_T10_provider_returns_empty_on_empty_manifest(
        self, db_session: AsyncSession
    ) -> None:
        """Provider must return [(intel_type, {count: 0})] on empty snapshot."""
        from musehub.services.musehub_intel_providers import BreakageProvider

        repo = await create_repo(db_session, owner="bkp4", slug="bk-empty")
        rid = str(repo.repo_id)
        await _seed_commit(db_session, rid, {}, "bkp4", "bk-empty")

        results = await BreakageProvider().compute(
            db_session, rid, "dev", {"owner": "bkp4", "slug": "bk-empty"}
        )
        assert results[0][1]["count"] == 0

    @pytest.mark.asyncio
    async def test_T11_provider_is_idempotent(
        self, db_session: AsyncSession
    ) -> None:
        """Running the provider twice must produce exactly 1 row, not 2."""
        from musehub.services.musehub_intel_providers import BreakageProvider

        repo = await create_repo(db_session, owner="bkp5", slug="bk-idempotent")
        rid = str(repo.repo_id)
        await _seed_commit(db_session, rid, {"src/a.py": _OBJ_1}, "bkp5", "bk-idempotent")

        file_tree = dict([_import_sym("src/a.py", "gone_fn", "gone.module")])
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(return_value=b"content")

        for _ in range(2):
            with (
                patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
                patch("musehub.services.musehub_intel_providers.parse_symbols", return_value=file_tree),
            ):
                await BreakageProvider().compute(
                    db_session, rid, "dev", {"owner": "bkp5", "slug": "bk-idempotent"}
                )

        rows = (await db_session.execute(
            sa.select(MusehubIntelBreakageIssue)
            .where(MusehubIntelBreakageIssue.repo_id == rid)
        )).scalars().all()
        assert len(rows) == 1, f"Expected 1 row after 2 runs, got {len(rows)}"

    @pytest.mark.asyncio
    async def test_T12_provider_writes_meta_row(
        self, db_session: AsyncSession
    ) -> None:
        """Provider must upsert a meta row with accurate counts."""
        from musehub.services.musehub_intel_providers import BreakageProvider

        repo = await create_repo(db_session, owner="bkp6", slug="bk-meta")
        rid = str(repo.repo_id)
        await _seed_commit(
            db_session, rid,
            {"src/a.py": _OBJ_1, "src/b.py": _OBJ_2},
            "bkp6", "bk-meta",
        )

        # Three stale imports spread over two files.
        tree_a = dict([
            _import_sym("src/a.py", "stale1", "gone.one"),
            _import_sym("src/a.py", "stale2", "gone.two"),
        ])
        tree_b = dict([_import_sym("src/b.py", "stale3", "gone.three")])

        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(side_effect=[b"a", b"b"])

        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols", side_effect=[tree_a, tree_b]),
        ):
            await BreakageProvider().compute(
                db_session, rid, "dev", {"owner": "bkp6", "slug": "bk-meta"}
            )

        meta = (await db_session.execute(
            sa.select(MusehubIntelBreakageMeta)
            .where(MusehubIntelBreakageMeta.repo_id == rid)
        )).scalars().first()
        assert meta is not None, "Meta row not written"
        assert meta.total_issues == 3
        assert meta.warning_count == 3
        assert meta.file_count == 2


# ─────────────────────────────────────────────────────────────────────────────
# Layer T3 — Route
# ─────────────────────────────────────────────────────────────────────────────


class TestRoute:
    """HTTP-level checks for GET /<owner>/<slug>/intel/breakage."""

    @pytest.mark.asyncio
    async def test_T13_breakage_page_returns_200(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """GET /bkuser/bk-e2e/intel/breakage must return HTTP 200."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage")
        assert resp.status_code == 200, resp.text[:500]

    @pytest.mark.asyncio
    async def test_T14_breakage_page_empty_state(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Route must render empty state when no issue rows exist."""
        await create_repo(db_session, owner="bkempty", slug="bk-nodata")
        await db_session.commit()
        resp = await client.get("/bkempty/bk-nodata/intel/breakage")
        assert resp.status_code == 200
        assert "Push a commit" in resp.text

    @pytest.mark.asyncio
    async def test_T15_breakage_page_404_for_unknown_repo(
        self, client: AsyncClient
    ) -> None:
        """Route must return 404 for an unknown repo slug."""
        resp = await client.get("/nobody/nonexistent-repo/intel/breakage")
        assert resp.status_code == 404

    @pytest.mark.asyncio
    async def test_T16_type_filter_limits_results(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """?type=stale_import must return 200 and include stale_import rows."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage?type=stale_import")
        assert resp.status_code == 200
        assert "stale_import" in resp.text

    @pytest.mark.asyncio
    async def test_T17_top_filter_limits_results(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """?top=2 must return 200 and limit the issue list."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage?top=2")
        assert resp.status_code == 200

    @pytest.mark.asyncio
    async def test_T18_stat_chips_present_in_html(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """Response must include the stat chip elements."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage")
        # Guard against the marker showing up inside an error page.
        assert resp.status_code == 200, resp.text[:500]
        assert "bk-stat-val" in resp.text

    @pytest.mark.asyncio
    async def test_T19_invalid_top_does_not_500(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """?top=GARBAGE must return 200 and fall back to default top."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage?top=GARBAGE")
        assert resp.status_code == 200


# ─────────────────────────────────────────────────────────────────────────────
# Layer T4 — E2E HTML
# ─────────────────────────────────────────────────────────────────────────────


class TestHTML:
    """Substring checks on the rendered breakage and dashboard pages."""

    @pytest.mark.asyncio
    async def test_T20_severity_badge_in_html(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """Each issue row must include a severity badge element."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage")
        assert resp.status_code == 200, resp.text[:500]
        assert "bk-sev-badge" in resp.text

    @pytest.mark.asyncio
    async def test_T21_stat_chips_rendered(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """Stat chips for Total and Warnings must appear in the page."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage")
        assert resp.status_code == 200, resp.text[:500]
        html = resp.text
        assert "bk-stat-val" in html
        assert "bk-stat-lbl" in html

    @pytest.mark.asyncio
    async def test_T22_file_paths_rendered_in_rows(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """Each issue row must display the file_path."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage")
        assert resp.status_code == 200, resp.text[:500]
        html = resp.text
        assert "src/file_0.py" in html

    @pytest.mark.asyncio
    async def test_T23_empty_state_has_helpful_message(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Empty-state div must contain a push-prompt message."""
        await create_repo(db_session, owner="bkempty2", slug="bk-empty2")
        await db_session.commit()
        resp = await client.get("/bkempty2/bk-empty2/intel/breakage")
        assert resp.status_code == 200, resp.text[:500]
        assert "Push a commit" in resp.text

    @pytest.mark.asyncio
    async def test_T24_dashboard_card_links_to_breakage_page(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """Intel dashboard must include a card linking to the breakage page."""
        resp = await client.get("/bkuser/bk-e2e/intel")
        html = resp.text
        assert "breakage" in html.lower()


# ─────────────────────────────────────────────────────────────────────────────
# Layer T5 — Data integrity
# ─────────────────────────────────────────────────────────────────────────────


class TestDataIntegrity:
    """Row-level guarantees of the breakage issue table."""

    @pytest.mark.asyncio
    async def test_T25_upsert_is_idempotent(
        self, db_session: AsyncSession
    ) -> None:
        """Inserting the same issue_id twice must yield exactly one row."""
        repo = await create_repo(db_session, owner="bkdi1", slug="bk-di1")
        rid = str(repo.repo_id)
        iid = long_id("f" * 64)
        for _ in range(2):
            await _insert_issue(db_session, rid, issue_id=iid)
        await db_session.commit()

        rows = (await db_session.execute(
            sa.select(MusehubIntelBreakageIssue)
            .where(MusehubIntelBreakageIssue.repo_id == rid)
        )).scalars().all()
        assert len(rows) == 1

    @pytest.mark.asyncio
    async def test_T26_cross_repo_isolation(
        self, db_session: AsyncSession
    ) -> None:
        """Issues from repo A must not appear in repo B queries."""
        repo_a = await create_repo(db_session, owner="bkdi2a", slug="bk-a")
        repo_b = await create_repo(db_session, owner="bkdi2b", slug="bk-b")
        rid_a = str(repo_a.repo_id)
        rid_b = str(repo_b.repo_id)
        await _insert_issue(db_session, rid_a, long_id("a" * 64), file_path="src/a.py")
        await db_session.commit()

        rows_b = (await db_session.execute(
            sa.select(MusehubIntelBreakageIssue)
            .where(MusehubIntelBreakageIssue.repo_id == rid_b)
        )).scalars().all()
        assert len(rows_b) == 0

    @pytest.mark.asyncio
    async def test_T27_severity_stored_correctly(
        self, db_session: AsyncSession
    ) -> None:
        """Rows must preserve the severity value written by the provider."""
        repo = await create_repo(db_session, owner="bkdi3", slug="bk-sev")
        rid = str(repo.repo_id)
        await _insert_issue(db_session, rid, long_id("e" * 64), severity="error")
        await db_session.commit()

        row = (await db_session.execute(
            sa.select(MusehubIntelBreakageIssue)
            .where(MusehubIntelBreakageIssue.repo_id == rid)
        )).scalars().first()
        assert row is not None
        assert row.severity == "error"

    @pytest.mark.asyncio
    async def test_T28_type_index_exists(self) -> None:
        """ix_intel_breakage_issues_repo_type index must be present in ORM."""
        indexes = MusehubIntelBreakageIssue.__table__.indexes
        names = {idx.name for idx in indexes}
        assert "ix_intel_breakage_issues_repo_type" in names


# ─────────────────────────────────────────────────────────────────────────────
# Layer T6 — Performance
# ─────────────────────────────────────────────────────────────────────────────


class TestPerformance:
    """Coarse wall-clock budgets for the provider, the route and bulk upserts."""

    @pytest.mark.asyncio
    async def test_T29_provider_completes_under_5s(
        self, db_session: AsyncSession
    ) -> None:
        """BreakageProvider must complete in under 5 seconds for 50 files."""
        from musehub.services.musehub_intel_providers import BreakageProvider

        repo = await create_repo(db_session, owner="bkperf1", slug="bk-perf1")
        rid = str(repo.repo_id)

        manifest = {f"src/file_{i}.py": long_id("a" * 62 + f"{i:02d}") for i in range(50)}
        await _seed_commit(db_session, rid, manifest, "bkperf1", "bk-perf1")

        # One stale import per file; parse_symbols yields them in call order.
        file_trees = [
            dict([_import_sym(f"src/file_{i}.py", "gone_fn", "gone.module")])
            for i in range(50)
        ]
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(return_value=b"content")

        t0 = time.monotonic()
        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols", side_effect=file_trees),
        ):
            await BreakageProvider().compute(
                db_session, rid, "dev", {"owner": "bkperf1", "slug": "bk-perf1"}
            )
        elapsed = time.monotonic() - t0
        assert elapsed < 5.0, f"Provider took {elapsed:.2f}s (> 5s limit)"

    @pytest.mark.asyncio
    async def test_T30_route_responds_under_2s(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """GET /intel/breakage must respond in under 2 seconds."""
        t0 = time.monotonic()
        resp = await client.get("/bkuser/bk-e2e/intel/breakage")
        elapsed = time.monotonic() - t0
        assert resp.status_code == 200
        assert elapsed < 2.0, f"Route took {elapsed:.2f}s (> 2s limit)"

    @pytest.mark.asyncio
    async def test_T31_bulk_upsert_500_issues(
        self, db_session: AsyncSession
    ) -> None:
        """Inserting 500 issues via upsert must not raise."""
        repo = await create_repo(db_session, owner="bkperf3", slug="bk-bulk")
        rid = str(repo.repo_id)
        for i in range(500):
            await _insert_issue(
                db_session,
                rid,
                # Zero-pad to 64 hex digits so the id has the same width as
                # every other id used in this suite.
                issue_id=long_id(format(i, "064x")),
                file_path=f"src/f{i}.py",
            )
        await db_session.commit()

        count = (await db_session.execute(
            sa.select(sa.func.count())
            .select_from(MusehubIntelBreakageIssue)
            .where(MusehubIntelBreakageIssue.repo_id == rid)
        )).scalar_one()
        assert count == 500


#
───────────────────────────────────────────────────────────────────────────── # Layer T7 — Security # ───────────────────────────────────────────────────────────────────────────── class TestSecurity: @pytest.mark.asyncio async def test_T32_xss_in_file_path_is_escaped( self, client: AsyncClient, db_session: AsyncSession ) -> None: """file_path with " await _insert_issue( db_session, rid, issue_id=long_id("x" * 64), file_path=xss, description="imports 'fn' but no symbol or module with that name exists in the HEAD snapshot", ) await db_session.commit() resp = await client.get("/bksec1/bk-xss/intel/breakage") assert resp.status_code == 200 assert "