"""Breakage intel — full 7-tier test suite (issue #23).
Tests are written TDD-first: all tests must be RED before Phase 4–7
implementation begins, then GREEN after.
Tiers
-----
T01–T05 Layer T1 — DB model (columns, nullable, cascade, indexes, meta)
T06–T12 Layer T2 — Provider (no subprocess, stale detect, known sym bypass, module resolve bypass, empty, idempotent, meta)
T13–T19 Layer T3 — Route (200, empty state, 404, type filter, top filter, stat chips, bad input no 500)
T20–T24 Layer T4 — E2E HTML (severity badges, stat chips, file paths, dashboard card, empty state text)
T25–T28 Layer T5 — Data integrity (upsert idempotent, cross-repo isolation, severity stored, type index)
T29–T31 Layer T6 — Performance (provider speed, route speed, bulk upsert)
T32–T34 Layer T7 — Security (XSS in file_path, SQL injection top param, no 500 on bad type)
"""
from __future__ import annotations
import time
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch
import msgpack
import pytest
import pytest_asyncio
import sqlalchemy as sa
from httpx import AsyncClient
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from muse.core.types import long_id
from musehub.db.musehub_intel_models import MusehubIntelBreakageIssue, MusehubIntelBreakageMeta
from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef
from tests.factories import create_repo
_REF = long_id("a" * 64)
_SNAP = long_id("b" * 64)
_CID = long_id("c" * 64)
_OBJ_1 = long_id("d" * 64)
_OBJ_2 = long_id("e" * 64)
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
async def _insert_issue(
    session: AsyncSession,
    repo_id: str,
    issue_id: str,
    file_path: str = "src/foo.py",
    issue_type: str = "stale_import",
    description: str = "imports 'blob_id' but no symbol or module with that name exists in the HEAD snapshot",
    severity: str = "warning",
    ref: str = "dev",
) -> None:
    """Upsert a single breakage-issue row, keyed on ``issue_id``.

    When a row with the same ``issue_id`` already exists, ``file_path``,
    ``issue_type``, ``description``, ``severity`` and ``ref`` are overwritten;
    ``repo_id`` is only written on the initial insert.  The statement is
    executed on *session* but not committed — the caller commits.
    """
    # Columns that are refreshed when the issue_id already exists.
    refreshed = {
        "file_path": file_path,
        "issue_type": issue_type,
        "description": description,
        "severity": severity,
        "ref": ref,
    }
    upsert = (
        pg_insert(MusehubIntelBreakageIssue)
        .values(issue_id=issue_id, repo_id=repo_id, **refreshed)
        .on_conflict_do_update(index_elements=["issue_id"], set_=refreshed)
    )
    await session.execute(upsert)
async def _insert_meta(
    session: AsyncSession,
    repo_id: str,
    total_issues: int = 0,
    warning_count: int = 0,
    error_count: int = 0,
    file_count: int = 0,
    ref: str = "dev",
) -> None:
    """Upsert the breakage meta row for *repo_id*.

    The row is keyed on ``repo_id``; on conflict every counter and ``ref`` is
    replaced with the supplied value.  The statement is executed on *session*
    but not committed — the caller commits.
    """
    # Counters (plus ref) that are replaced when the repo already has a row.
    counters = {
        "total_issues": total_issues,
        "warning_count": warning_count,
        "error_count": error_count,
        "file_count": file_count,
        "ref": ref,
    }
    upsert = (
        pg_insert(MusehubIntelBreakageMeta)
        .values(repo_id=repo_id, **counters)
        .on_conflict_do_update(index_elements=["repo_id"], set_=counters)
    )
    await session.execute(upsert)
async def _seed_commit(
    session: AsyncSession,
    repo_id: str,
    manifest: dict[str, str],
    owner: str,
    slug: str,
) -> None:
    """Seed one snapshot and one parentless ``dev`` commit, then commit the session.

    The snapshot and commit always use the module-level ids ``_SNAP`` and
    ``_CID``; *manifest* is msgpack-encoded into the snapshot row and *owner*
    is stored as the commit author.  Every insert uses ``ON CONFLICT DO
    NOTHING``, so rows that already exist are left untouched.  *slug* is
    accepted but not used by this helper.
    """
    seeded_at = datetime(2026, 1, 1, tzinfo=timezone.utc)
    # Snapshot first, then its repo link, then the commit and its repo link.
    inserts = (
        pg_insert(MusehubSnapshot).values(
            snapshot_id=_SNAP,
            directories=[],
            manifest_blob=msgpack.packb(manifest),
            entry_count=len(manifest),
            created_at=seeded_at,
        ),
        pg_insert(MusehubSnapshotRef).values(repo_id=repo_id, snapshot_id=_SNAP),
        pg_insert(MusehubCommit).values(
            commit_id=_CID,
            branch="dev",
            parent_ids=[],
            message="feat: some change",
            author=owner,
            timestamp=seeded_at,
            snapshot_id=_SNAP,
        ),
        pg_insert(MusehubCommitRef).values(repo_id=repo_id, commit_id=_CID),
    )
    for stmt in inserts:
        await session.execute(stmt.on_conflict_do_nothing())
    await session.commit()
def _import_sym(
    file_path: str,
    symbol_name: str,
    module_dotted: str,
) -> tuple[str, dict]:
    """Build the ``(address, rec)`` pair for a parse_symbols import record.

    The address is ``<file_path>::import::<symbol_name>``; the record is a
    single-line import (``lineno == end_lineno == 1``) with empty body,
    signature and metadata hashes.
    """
    record = {
        "kind": "import",
        "name": symbol_name,
        "qualified_name": f"import::{module_dotted}::{symbol_name}",
        "content_id": long_id("1" * 64),
        "body_hash": "",
        "signature_id": "",
        "metadata_id": "",
        "canonical_key": f"{file_path}##import#{symbol_name}#1",
        "lineno": 1,
        "end_lineno": 1,
    }
    return f"{file_path}::import::{symbol_name}", record
def _fn_sym(
    file_path: str,
    name: str,
) -> tuple[str, dict]:
    """Build the ``(address, rec)`` pair for a parse_symbols function record.

    The address is ``<file_path>::<name>``; the record describes a function
    spanning lines 1–5 with fixed placeholder content/body/signature ids.
    """
    record = {
        "kind": "function",
        "name": name,
        "qualified_name": name,
        "content_id": long_id("2" * 64),
        "body_hash": long_id("3" * 64),
        "signature_id": long_id("4" * 64),
        "metadata_id": "",
        "canonical_key": f"{file_path}##function#{name}#1",
        "lineno": 1,
        "end_lineno": 5,
    }
    return f"{file_path}::{name}", record
@pytest_asyncio.fixture
async def bk_repo(db_session: AsyncSession) -> MusehubRepo:
    """Provide the ``bkuser/bk-e2e`` repo with 5 breakage issues and a meta row.

    Issues live in ``src/file_0.py`` … ``src/file_4.py`` with the default
    ``stale_import`` type and ``warning`` severity; the meta row records
    5 issues, 5 warnings and 5 files.  Everything is committed before the
    repo is returned.
    """
    seeded = await create_repo(db_session, owner="bkuser", slug="bk-e2e")
    repo_key = str(seeded.repo_id)
    issue_total = 5
    for idx in range(issue_total):
        await _insert_issue(
            db_session,
            repo_key,
            issue_id=f"sha256:bk{'0' * 60}{idx:02d}",
            file_path=f"src/file_{idx}.py",
            description=f"imports 'sym_{idx}' but no symbol or module with that name exists in the HEAD snapshot",
        )
    await _insert_meta(
        db_session,
        repo_key,
        total_issues=issue_total,
        warning_count=issue_total,
        file_count=issue_total,
    )
    await db_session.commit()
    return seeded
# ─────────────────────────────────────────────────────────────────────────────
# Layer T1 — DB model
# ─────────────────────────────────────────────────────────────────────────────
class TestDBModel:
    """Layer T1 — static checks on the breakage ORM table metadata (no DB access)."""

    def test_T01_issue_model_has_required_columns(self) -> None:
        """MusehubIntelBreakageIssue must expose all required columns."""
        cols = {c.name for c in MusehubIntelBreakageIssue.__table__.columns}
        for required in ("issue_id", "repo_id", "file_path", "issue_type", "description", "severity", "ref"):
            assert required in cols, f"Column '{required}' missing"

    def test_T02_meta_model_has_required_columns(self) -> None:
        """MusehubIntelBreakageMeta must expose all required columns."""
        cols = {c.name for c in MusehubIntelBreakageMeta.__table__.columns}
        for required in ("repo_id", "total_issues", "warning_count", "error_count", "file_count", "ref"):
            assert required in cols, f"Column '{required}' missing"

    def test_T03_cascade_delete_on_issue(self) -> None:
        """repo_id FK on issues table must use CASCADE."""
        # Match on the exact column name: a substring test on str(fk.parent)
        # would also match the table name or a column like "parent_repo_id".
        repo_fks = [
            fk
            for fk in MusehubIntelBreakageIssue.__table__.foreign_keys
            if fk.parent.name == "repo_id"
        ]
        if not repo_fks:
            pytest.fail("No CASCADE FK found for repo_id on breakage issues")
        for fk in repo_fks:
            assert fk.ondelete == "CASCADE"

    def test_T04_cascade_delete_on_meta(self) -> None:
        """repo_id FK on meta table must use CASCADE."""
        repo_fks = [
            fk
            for fk in MusehubIntelBreakageMeta.__table__.foreign_keys
            if fk.parent.name == "repo_id"
        ]
        if not repo_fks:
            pytest.fail("No CASCADE FK found for repo_id on breakage meta")
        for fk in repo_fks:
            assert fk.ondelete == "CASCADE"

    def test_T05_composite_index_exists(self) -> None:
        """ix_intel_breakage_issues_repo_type index must cover (repo_id, issue_type)."""
        by_name = {idx.name: idx for idx in MusehubIntelBreakageIssue.__table__.indexes}
        assert "ix_intel_breakage_issues_repo_type" in by_name
        # The name alone does not prove coverage — check the indexed columns too.
        covered = {col.name for col in by_name["ix_intel_breakage_issues_repo_type"].columns}
        assert {"repo_id", "issue_type"} <= covered, (
            f"Index covers {sorted(covered)}, expected repo_id and issue_type"
        )
# ─────────────────────────────────────────────────────────────────────────────
# Layer T2 — Provider
# ─────────────────────────────────────────────────────────────────────────────
class TestProvider:
    """Layer T2 — BreakageProvider.compute with the backend and symbol parser mocked.

    Each test seeds a snapshot manifest via ``_seed_commit`` and patches
    ``get_backend`` / ``parse_symbols`` in ``musehub_intel_providers`` so no
    real object storage or parsing is involved.
    """

    @pytest.mark.asyncio
    async def test_T06_provider_uses_no_subprocess(
        self, db_session: AsyncSession
    ) -> None:
        """BreakageProvider must never call _run_muse or import subprocess."""
        import ast
        import inspect
        import textwrap

        from musehub.services import musehub_intel_providers as svc

        src = inspect.getsource(svc.BreakageProvider)
        tree = ast.parse(textwrap.dedent(src))
        # Comments never reach the AST; docstrings do, so drop them explicitly
        # to keep prose mentions from tripping the substring checks below.
        for node in ast.walk(tree):
            if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
                continue
            first = node.body[0] if node.body else None
            if (
                isinstance(first, ast.Expr)
                and isinstance(first.value, ast.Constant)
                # Only string constants are docstrings; leave e.g. a bare `...` alone.
                and isinstance(first.value.value, str)
            ):
                node.body = node.body[1:] or [ast.Pass()]
        non_doc_src = ast.unparse(tree)
        assert "_run_muse" not in non_doc_src, "BreakageProvider must not call _run_muse"
        assert "subprocess" not in non_doc_src, "BreakageProvider must not use subprocess"
        # Implied by the "subprocess" check above; kept as an explicit statement of intent.
        assert "create_subprocess" not in non_doc_src

    @pytest.mark.asyncio
    async def test_T07_provider_detects_stale_import(
        self, db_session: AsyncSession
    ) -> None:
        """Provider must detect an import whose symbol doesn't exist anywhere."""
        from musehub.services.musehub_intel_providers import BreakageProvider

        repo = await create_repo(db_session, owner="bkp1", slug="bk-stale")
        rid = str(repo.repo_id)
        await _seed_commit(db_session, rid, {"src/a.py": _OBJ_1}, "bkp1", "bk-stale")
        # src/a.py imports 'blob_id' from 'muse.core.types'
        # 'muse/core/types.py' is NOT in the manifest
        # 'blob_id' is NOT defined anywhere in the snapshot
        file_tree = dict([
            _import_sym("src/a.py", "blob_id", "muse.core.types"),
        ])
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(return_value=b"src content")
        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols", return_value=file_tree),
        ):
            results = await BreakageProvider().compute(
                db_session, rid, "dev", {"owner": "bkp1", "slug": "bk-stale"}
            )
        assert results, "Expected at least one result tuple"
        count = results[0][1]["count"]
        assert count == 1, f"Expected 1 stale import, got {count}"
        row = (await db_session.execute(
            sa.select(MusehubIntelBreakageIssue)
            .where(MusehubIntelBreakageIssue.repo_id == rid)
        )).scalars().first()
        assert row is not None
        assert row.issue_type == "stale_import"
        assert "blob_id" in row.description

    @pytest.mark.asyncio
    async def test_T08_known_symbol_bypasses_stale_flag(
        self, db_session: AsyncSession
    ) -> None:
        """Import of a symbol that exists somewhere in the snapshot is NOT stale."""
        from musehub.services.musehub_intel_providers import BreakageProvider

        repo = await create_repo(db_session, owner="bkp2", slug="bk-known")
        rid = str(repo.repo_id)
        await _seed_commit(
            db_session, rid,
            {"src/a.py": _OBJ_1, "src/b.py": _OBJ_2},
            "bkp2", "bk-known",
        )
        # src/a.py imports 'my_fn' from 'src.b'
        # src/b.py defines 'my_fn'
        # → not stale
        tree_a = dict([_import_sym("src/a.py", "my_fn", "src.b")])
        tree_b = dict([_fn_sym("src/b.py", "my_fn")])
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(side_effect=[b"a", b"b"])
        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  side_effect=[tree_a, tree_b]),
        ):
            results = await BreakageProvider().compute(
                db_session, rid, "dev", {"owner": "bkp2", "slug": "bk-known"}
            )
        # Guard the index below so an empty result fails with a clear message.
        assert results, "Expected at least one result tuple"
        count = results[0][1]["count"]
        assert count == 0, f"Expected 0 issues (symbol exists), got {count}"

    @pytest.mark.asyncio
    async def test_T09_resolved_module_bypasses_stale_flag(
        self, db_session: AsyncSession
    ) -> None:
        """Import whose module path resolves to a tracked file is NOT stale."""
        from musehub.services.musehub_intel_providers import BreakageProvider

        repo = await create_repo(db_session, owner="bkp3", slug="bk-resolve")
        rid = str(repo.repo_id)
        # Manifest includes src/b.py — so 'src.b' resolves
        await _seed_commit(
            db_session, rid,
            {"src/a.py": _OBJ_1, "src/b.py": _OBJ_2},
            "bkp3", "bk-resolve",
        )
        # src/a.py imports 'anything' from 'src.b' — module resolves → not stale
        tree_a = dict([_import_sym("src/a.py", "anything", "src.b")])
        tree_b = dict([_fn_sym("src/b.py", "other_fn")])
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(side_effect=[b"a", b"b"])
        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  side_effect=[tree_a, tree_b]),
        ):
            results = await BreakageProvider().compute(
                db_session, rid, "dev", {"owner": "bkp3", "slug": "bk-resolve"}
            )
        assert results, "Expected at least one result tuple"
        count = results[0][1]["count"]
        assert count == 0, f"Expected 0 issues (module resolved), got {count}"

    @pytest.mark.asyncio
    async def test_T10_provider_returns_empty_on_empty_manifest(
        self, db_session: AsyncSession
    ) -> None:
        """Provider must return [(intel_type, {count: 0})] on empty snapshot."""
        from musehub.services.musehub_intel_providers import BreakageProvider

        repo = await create_repo(db_session, owner="bkp4", slug="bk-empty")
        rid = str(repo.repo_id)
        await _seed_commit(db_session, rid, {}, "bkp4", "bk-empty")
        results = await BreakageProvider().compute(
            db_session, rid, "dev", {"owner": "bkp4", "slug": "bk-empty"}
        )
        assert results, "Expected at least one result tuple"
        assert results[0][1]["count"] == 0

    @pytest.mark.asyncio
    async def test_T11_provider_is_idempotent(
        self, db_session: AsyncSession
    ) -> None:
        """Running the provider twice must produce exactly 1 row, not 2."""
        from musehub.services.musehub_intel_providers import BreakageProvider

        repo = await create_repo(db_session, owner="bkp5", slug="bk-idempotent")
        rid = str(repo.repo_id)
        await _seed_commit(db_session, rid, {"src/a.py": _OBJ_1}, "bkp5", "bk-idempotent")
        file_tree = dict([_import_sym("src/a.py", "gone_fn", "gone.module")])
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(return_value=b"content")
        for _ in range(2):
            with (
                patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
                patch("musehub.services.musehub_intel_providers.parse_symbols", return_value=file_tree),
            ):
                await BreakageProvider().compute(
                    db_session, rid, "dev", {"owner": "bkp5", "slug": "bk-idempotent"}
                )
        rows = (await db_session.execute(
            sa.select(MusehubIntelBreakageIssue)
            .where(MusehubIntelBreakageIssue.repo_id == rid)
        )).scalars().all()
        assert len(rows) == 1, f"Expected 1 row after 2 runs, got {len(rows)}"

    @pytest.mark.asyncio
    async def test_T12_provider_writes_meta_row(
        self, db_session: AsyncSession
    ) -> None:
        """Provider must upsert a meta row with accurate counts."""
        from musehub.services.musehub_intel_providers import BreakageProvider

        repo = await create_repo(db_session, owner="bkp6", slug="bk-meta")
        rid = str(repo.repo_id)
        await _seed_commit(
            db_session, rid,
            {"src/a.py": _OBJ_1, "src/b.py": _OBJ_2},
            "bkp6", "bk-meta",
        )
        # Two stale imports in src/a.py and one in src/b.py → 3 issues across 2 files.
        tree_a = dict([
            _import_sym("src/a.py", "stale1", "gone.one"),
            _import_sym("src/a.py", "stale2", "gone.two"),
        ])
        tree_b = dict([_import_sym("src/b.py", "stale3", "gone.three")])
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(side_effect=[b"a", b"b"])
        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  side_effect=[tree_a, tree_b]),
        ):
            await BreakageProvider().compute(
                db_session, rid, "dev", {"owner": "bkp6", "slug": "bk-meta"}
            )
        meta = (await db_session.execute(
            sa.select(MusehubIntelBreakageMeta)
            .where(MusehubIntelBreakageMeta.repo_id == rid)
        )).scalars().first()
        assert meta is not None, "Meta row not written"
        assert meta.total_issues == 3
        assert meta.warning_count == 3
        assert meta.file_count == 2
# ─────────────────────────────────────────────────────────────────────────────
# Layer T3 — Route
# ─────────────────────────────────────────────────────────────────────────────
class TestRoute:
    """Layer T3 — HTTP behaviour of ``GET /<owner>/<slug>/intel/breakage``."""

    @pytest.mark.asyncio
    async def test_T13_breakage_page_returns_200(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """GET /bkuser/bk-e2e/intel/breakage must return HTTP 200."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage")
        assert resp.status_code == 200, resp.text[:500]

    @pytest.mark.asyncio
    async def test_T14_breakage_page_empty_state(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Route must render empty state when no issue rows exist."""
        # Only the repo's existence matters here; the returned object is not needed.
        await create_repo(db_session, owner="bkempty", slug="bk-nodata")
        await db_session.commit()
        resp = await client.get("/bkempty/bk-nodata/intel/breakage")
        assert resp.status_code == 200
        assert "Push a commit" in resp.text

    @pytest.mark.asyncio
    async def test_T15_breakage_page_404_for_unknown_repo(
        self, client: AsyncClient
    ) -> None:
        """Route must return 404 for an unknown repo slug."""
        resp = await client.get("/nobody/nonexistent-repo/intel/breakage")
        assert resp.status_code == 404

    @pytest.mark.asyncio
    async def test_T16_type_filter_limits_results(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """?type=stale_import must return 200 and mention stale_import in the page."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage?type=stale_import")
        assert resp.status_code == 200
        assert "stale_import" in resp.text

    @pytest.mark.asyncio
    async def test_T17_top_filter_limits_results(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """?top=2 must return 200 (the row limit itself is not asserted here)."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage?top=2")
        assert resp.status_code == 200

    @pytest.mark.asyncio
    async def test_T18_stat_chips_present_in_html(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """Response must include the stat chip elements."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage")
        # Check the status first so an error page cannot satisfy the text check.
        assert resp.status_code == 200, resp.text[:500]
        assert "bk-stat-val" in resp.text

    @pytest.mark.asyncio
    async def test_T19_invalid_top_does_not_500(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """?top=GARBAGE must return 200 rather than an error response."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage?top=GARBAGE")
        assert resp.status_code == 200
# ─────────────────────────────────────────────────────────────────────────────
# Layer T4 — E2E HTML
# ─────────────────────────────────────────────────────────────────────────────
class TestHTML:
    """Layer T4 — markers expected in the rendered breakage and intel pages.

    Tests against the breakage page assert HTTP 200 before inspecting the
    body, matching the route-layer tests that hit the same URLs.
    """

    @pytest.mark.asyncio
    async def test_T20_severity_badge_in_html(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """The page must include a severity badge element."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage")
        assert resp.status_code == 200, resp.text[:500]
        assert "bk-sev-badge" in resp.text

    @pytest.mark.asyncio
    async def test_T21_stat_chips_rendered(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """Stat chip value and label elements must appear in the page."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage")
        assert resp.status_code == 200, resp.text[:500]
        html = resp.text
        assert "bk-stat-val" in html
        assert "bk-stat-lbl" in html

    @pytest.mark.asyncio
    async def test_T22_file_paths_rendered_in_rows(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """The page must display the file_path of a seeded issue."""
        resp = await client.get("/bkuser/bk-e2e/intel/breakage")
        assert resp.status_code == 200, resp.text[:500]
        html = resp.text
        assert "src/file_0.py" in html

    @pytest.mark.asyncio
    async def test_T23_empty_state_has_helpful_message(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Empty-state page must contain a push-prompt message."""
        # Only the repo's existence matters here; the returned object is not needed.
        await create_repo(db_session, owner="bkempty2", slug="bk-empty2")
        await db_session.commit()
        resp = await client.get("/bkempty2/bk-empty2/intel/breakage")
        assert resp.status_code == 200, resp.text[:500]
        assert "Push a commit" in resp.text

    @pytest.mark.asyncio
    async def test_T24_dashboard_card_links_to_breakage_page(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """Intel dashboard must mention breakage (case-insensitive) in its HTML."""
        resp = await client.get("/bkuser/bk-e2e/intel")
        html = resp.text
        assert "breakage" in html.lower()
# ─────────────────────────────────────────────────────────────────────────────
# Layer T5 — Data integrity
# ─────────────────────────────────────────────────────────────────────────────
class TestDataIntegrity:
    """Layer T5 — row-level guarantees of the breakage issue table."""

    @pytest.mark.asyncio
    async def test_T25_upsert_is_idempotent(
        self, db_session: AsyncSession
    ) -> None:
        """Inserting the same issue_id twice must yield exactly one row."""
        repo = await create_repo(db_session, owner="bkdi1", slug="bk-di1")
        rid = str(repo.repo_id)
        iid = long_id("f" * 64)
        for _ in range(2):
            await _insert_issue(db_session, rid, issue_id=iid)
        await db_session.commit()
        rows = (await db_session.execute(
            sa.select(MusehubIntelBreakageIssue)
            .where(MusehubIntelBreakageIssue.repo_id == rid)
        )).scalars().all()
        assert len(rows) == 1

    @pytest.mark.asyncio
    async def test_T26_cross_repo_isolation(
        self, db_session: AsyncSession
    ) -> None:
        """Issues from repo A must not appear in repo B queries."""
        repo_a = await create_repo(db_session, owner="bkdi2a", slug="bk-a")
        repo_b = await create_repo(db_session, owner="bkdi2b", slug="bk-b")
        rid_a = str(repo_a.repo_id)
        rid_b = str(repo_b.repo_id)
        await _insert_issue(db_session, rid_a, long_id("a" * 64), file_path="src/a.py")
        await db_session.commit()
        # Confirm the row really landed in repo A; otherwise the emptiness
        # check on repo B below would pass vacuously.
        rows_a = (await db_session.execute(
            sa.select(MusehubIntelBreakageIssue)
            .where(MusehubIntelBreakageIssue.repo_id == rid_a)
        )).scalars().all()
        assert len(rows_a) == 1
        rows_b = (await db_session.execute(
            sa.select(MusehubIntelBreakageIssue)
            .where(MusehubIntelBreakageIssue.repo_id == rid_b)
        )).scalars().all()
        assert len(rows_b) == 0

    @pytest.mark.asyncio
    async def test_T27_severity_stored_correctly(
        self, db_session: AsyncSession
    ) -> None:
        """Rows must preserve the severity value they were written with."""
        repo = await create_repo(db_session, owner="bkdi3", slug="bk-sev")
        rid = str(repo.repo_id)
        await _insert_issue(db_session, rid, long_id("e" * 64), severity="error")
        await db_session.commit()
        row = (await db_session.execute(
            sa.select(MusehubIntelBreakageIssue)
            .where(MusehubIntelBreakageIssue.repo_id == rid)
        )).scalars().first()
        assert row is not None
        assert row.severity == "error"

    def test_T28_type_index_exists(self) -> None:
        """ix_intel_breakage_issues_repo_type index must be present in ORM."""
        # Pure metadata inspection: nothing is awaited, so this is a plain sync test.
        indexes = MusehubIntelBreakageIssue.__table__.indexes
        names = {idx.name for idx in indexes}
        assert "ix_intel_breakage_issues_repo_type" in names
# ─────────────────────────────────────────────────────────────────────────────
# Layer T6 — Performance
# ─────────────────────────────────────────────────────────────────────────────
class TestPerformance:
    """Layer T6 — coarse wall-clock and volume checks for provider, route and upsert."""

    @pytest.mark.asyncio
    async def test_T29_provider_completes_under_5s(
        self, db_session: AsyncSession
    ) -> None:
        """BreakageProvider must complete in under 5 seconds for 50 files."""
        from musehub.services.musehub_intel_providers import BreakageProvider

        repo = await create_repo(db_session, owner="bkperf1", slug="bk-perf1")
        rid = str(repo.repo_id)
        manifest = {f"src/file_{i}.py": long_id("a" * 62 + f"{i:02d}") for i in range(50)}
        await _seed_commit(db_session, rid, manifest, "bkperf1", "bk-perf1")
        # One parsed tree per manifest file, each holding a single import record.
        file_trees = [
            dict([_import_sym(f"src/file_{i}.py", "gone_fn", "gone.module")])
            for i in range(50)
        ]
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(return_value=b"content")
        # Only the provider call is timed; seeding and mock setup are excluded.
        t0 = time.monotonic()
        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  side_effect=file_trees),
        ):
            await BreakageProvider().compute(
                db_session, rid, "dev", {"owner": "bkperf1", "slug": "bk-perf1"}
            )
        elapsed = time.monotonic() - t0
        assert elapsed < 5.0, f"Provider took {elapsed:.2f}s (> 5s limit)"

    @pytest.mark.asyncio
    async def test_T30_route_responds_under_2s(
        self, client: AsyncClient, bk_repo: MusehubRepo
    ) -> None:
        """GET /intel/breakage must respond in under 2 seconds."""
        t0 = time.monotonic()
        resp = await client.get("/bkuser/bk-e2e/intel/breakage")
        elapsed = time.monotonic() - t0
        assert resp.status_code == 200
        assert elapsed < 2.0, f"Route took {elapsed:.2f}s (> 2s limit)"

    @pytest.mark.asyncio
    async def test_T31_bulk_upsert_500_issues(
        self, db_session: AsyncSession
    ) -> None:
        """Inserting 500 distinct issues via upsert must store all 500 rows."""
        repo = await create_repo(db_session, owner="bkperf3", slug="bk-bulk")
        rid = str(repo.repo_id)
        for i in range(500):
            await _insert_issue(
                db_session, rid,
                # Zero-pad to the same 64-character width used for every other
                # id in this file (the old 60-wide format made the [:64] slice a no-op).
                issue_id=long_id(format(i, "064x")),
                file_path=f"src/f{i}.py",
            )
        await db_session.commit()
        count = (await db_session.execute(
            sa.select(sa.func.count())
            .select_from(MusehubIntelBreakageIssue)
            .where(MusehubIntelBreakageIssue.repo_id == rid)
        )).scalar_one()
        assert count == 500
# ─────────────────────────────────────────────────────────────────────────────
# Layer T7 — Security
# ─────────────────────────────────────────────────────────────────────────────
class TestSecurity:
@pytest.mark.asyncio
async def test_T32_xss_in_file_path_is_escaped(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""file_path with "
await _insert_issue(
db_session, rid,
issue_id=long_id("x" * 64),
file_path=xss,
description="imports 'fn' but no symbol or module with that name exists in the HEAD snapshot",
)
await db_session.commit()
resp = await client.get("/bksec1/bk-xss/intel/breakage")
assert resp.status_code == 200
assert "