"""Detect-Refactor intel — full 7-tier test suite (issue #22). Tests are written TDD-first: all tests must be RED before Phase 4–7 implementation begins, then GREEN after. Tiers ----- T01–T05 Layer T1 — DB model (columns, nullable, cascade, index, commit_message) T06–T12 Layer T2 — Provider (no subprocess, impl, sig, move, rename, empty, idempotent) T13–T19 Layer T3 — Route (200, empty state, 404, kind filter, top filter, stat chips, sort) T20–T24 Layer T4 — E2E HTML (kind badges, detail links, stat chips, cycle panel absent, dashboard card) T25–T28 Layer T5 — Data integrity (upsert idempotent, cross-repo isolation, commit_message stored, kind index) T29–T31 Layer T6 — Performance (provider speed, route speed, bulk upsert) T32–T34 Layer T7 — Security (XSS escape in address, SQL injection top param, no 500 on bad kind) """ from __future__ import annotations import time from datetime import datetime, timezone from unittest.mock import AsyncMock, patch import msgpack import pytest import pytest_asyncio import sqlalchemy as sa from httpx import AsyncClient from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from musehub.db.musehub_intel_models import MusehubIntelRefactorEvent from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef from musehub.types.json_types import JSONObject from tests.factories import create_repo from muse.core.types import long_id _REF = long_id("a" * 64) _SNAP_A = long_id("b" * 64) _SNAP_B = long_id("c" * 64) _CID_A = long_id("d" * 64) _CID_B = long_id("e" * 64) _OBJ_1 = long_id("f" * 64) _OBJ_2 = long_id("1" * 64) # ───────────────────────────────────────────────────────────────────────────── # Helpers # ───────────────────────────────────────────────────────────────────────────── async def _insert_event( session: AsyncSession, repo_id: str, event_id: str, kind: str = "implementation", address: str = "src/foo.py::bar", 
detail: str | None = None, commit_id: str = _CID_A, commit_message: str | None = "feat: add bar", committed_at: datetime | None = None, ) -> None: """Upsert one row into musehub_intel_refactor_events.""" if committed_at is None: committed_at = datetime(2026, 1, 1, tzinfo=timezone.utc) await session.execute( pg_insert(MusehubIntelRefactorEvent) .values( event_id=event_id, repo_id=repo_id, kind=kind, address=address, detail=detail, commit_id=commit_id, commit_message=commit_message, committed_at=committed_at, ) .on_conflict_do_update( index_elements=["event_id"], set_={ "kind": kind, "address": address, "detail": detail, "commit_id": commit_id, "commit_message": commit_message, "committed_at": committed_at, }, ) ) async def _seed_two_commits( session: AsyncSession, repo_id: str, head_manifest: dict[str, str], parent_manifest: dict[str, str], owner: str, slug: str, ) -> tuple[str, str]: """Insert parent commit/snapshot then HEAD commit/snapshot. Returns ``(head_commit_id, parent_commit_id)``. 
""" # ── parent ──────────────────────────────────────────────────────────────── await session.execute( pg_insert(MusehubSnapshot) .values( snapshot_id = _SNAP_B, directories = [], manifest_blob = msgpack.packb(parent_manifest), entry_count = len(parent_manifest), created_at = datetime(2026, 1, 1, tzinfo=timezone.utc), ) .on_conflict_do_nothing() ) await session.execute( pg_insert(MusehubSnapshotRef) .values(repo_id=repo_id, snapshot_id=_SNAP_B) .on_conflict_do_nothing() ) await session.execute( pg_insert(MusehubCommit) .values( commit_id = _CID_B, branch = "dev", parent_ids = [], message = "chore: initial", author = owner, timestamp = datetime(2026, 1, 1, tzinfo=timezone.utc), snapshot_id = _SNAP_B, ) .on_conflict_do_nothing() ) await session.execute( pg_insert(MusehubCommitRef) .values(repo_id=repo_id, commit_id=_CID_B) .on_conflict_do_nothing() ) # ── HEAD ────────────────────────────────────────────────────────────────── await session.execute( pg_insert(MusehubSnapshot) .values( snapshot_id = _SNAP_A, directories = [], manifest_blob = msgpack.packb(head_manifest), entry_count = len(head_manifest), created_at = datetime(2026, 1, 2, tzinfo=timezone.utc), ) .on_conflict_do_nothing() ) await session.execute( pg_insert(MusehubSnapshotRef) .values(repo_id=repo_id, snapshot_id=_SNAP_A) .on_conflict_do_nothing() ) await session.execute( pg_insert(MusehubCommit) .values( commit_id = _CID_A, branch = "dev", parent_ids = [_CID_B], message = "feat: refactor things", author = owner, timestamp = datetime(2026, 1, 2, tzinfo=timezone.utc), snapshot_id = _SNAP_A, ) .on_conflict_do_nothing() ) await session.execute( pg_insert(MusehubCommitRef) .values(repo_id=repo_id, commit_id=_CID_A) .on_conflict_do_nothing() ) await session.commit() return _CID_A, _CID_B def _sym( file_path: str, name: str, body_hash: str, signature_id: str, kind: str = "function", ) -> tuple[str, dict]: """Return ``(address, rec)`` suitable for a parse_symbols side_effect dict.""" return 
f"{file_path}::{name}", { "kind": kind, "name": name, "qualified_name": name, "content_id": long_id("9" * 64), "body_hash": body_hash, "signature_id": signature_id, "metadata_id": "", "canonical_key": f"{file_path}##function#{name}#1", "lineno": 1, "end_lineno": 5, } @pytest_asyncio.fixture async def rf_repo(db_session: AsyncSession) -> MusehubRepo: """Repo seeded with 5 detect-refactor event rows.""" repo = await create_repo(db_session, owner="rfuser", slug="rf-e2e") rid = str(repo.repo_id) kinds = [ ("implementation", "src/a.py::foo"), ("implementation", "src/b.py::bar"), ("signature", "src/c.py::baz"), ("move", "src/d.py::old_fn"), ("rename", "src/e.py::qux"), ] for i, (kind, addr) in enumerate(kinds): await _insert_event( db_session, rid, event_id=long_id(f"ev{'0' * 60}{i:02d}"), kind=kind, address=addr, commit_id=_CID_A, ) await db_session.commit() return repo # ───────────────────────────────────────────────────────────────────────────── # Layer T1 — DB model # ───────────────────────────────────────────────────────────────────────────── class TestDBModel: def test_T01_model_has_commit_message_column(self) -> None: """MusehubIntelRefactorEvent must expose a commit_message column.""" cols = {c.name for c in MusehubIntelRefactorEvent.__table__.columns} assert "commit_message" in cols def test_T02_commit_message_is_nullable(self) -> None: """commit_message must be nullable (older rows pre-migration have NULL).""" col = MusehubIntelRefactorEvent.__table__.columns["commit_message"] assert col.nullable is True def test_T03_model_has_required_columns(self) -> None: """event_id, repo_id, kind, address, commit_id, committed_at must exist.""" cols = {c.name for c in MusehubIntelRefactorEvent.__table__.columns} for required in ("event_id", "repo_id", "kind", "address", "commit_id", "committed_at"): assert required in cols, f"Column '{required}' missing" def test_T04_cascade_delete_configured(self) -> None: """repo_id FK must use CASCADE so repo deletion cleans 
events.""" fks = MusehubIntelRefactorEvent.__table__.foreign_keys for fk in fks: if "repo_id" in str(fk.parent): assert fk.ondelete == "CASCADE" return pytest.fail("No CASCADE FK found for repo_id") def test_T05_composite_index_exists(self) -> None: """ix_intel_refactor_events_repo_kind index must cover (repo_id, kind).""" indexes = MusehubIntelRefactorEvent.__table__.indexes names = {idx.name for idx in indexes} assert "ix_intel_refactor_events_repo_kind" in names # ───────────────────────────────────────────────────────────────────────────── # Layer T2 — Provider # ───────────────────────────────────────────────────────────────────────────── class TestProvider: @pytest.mark.asyncio async def test_T06_provider_uses_no_subprocess( self, db_session: AsyncSession ) -> None: """DetectRefactorProvider must never call _run_muse or import subprocess.""" from musehub.services import musehub_intel_providers as svc import inspect, ast, textwrap src = inspect.getsource(svc.DetectRefactorProvider) # Strip docstrings from AST so comment-only mentions don't trip us up tree = ast.parse(textwrap.dedent(src)) non_doc_src = ast.unparse(tree) assert "_run_muse" not in non_doc_src, "DetectRefactorProvider must not call _run_muse" assert "import subprocess" not in non_doc_src, "DetectRefactorProvider must not import subprocess" @pytest.mark.asyncio async def test_T07_provider_detects_implementation_change( self, db_session: AsyncSession ) -> None: """Same address, different body_hash → kind='implementation'.""" from musehub.services.musehub_intel_providers import DetectRefactorProvider repo = await create_repo(db_session, owner="rfp1", slug="rf-impl") rid = str(repo.repo_id) manifest = {"src/foo.py": _OBJ_1} await _seed_two_commits(db_session, rid, manifest, manifest, "rfp1", "rf-impl") # HEAD has different body_hash, same sig head_tree = dict([_sym("src/foo.py", "bar", long_id("a" * 64), long_id("s" * 64))]) parent_tree = dict([_sym("src/foo.py", "bar", long_id("b" * 64), long_id("s" 
* 64))]) mock_backend = AsyncMock() mock_backend.get = AsyncMock(side_effect=[b"head", b"parent"]) with ( patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend), patch("musehub.services.musehub_intel_providers.parse_symbols", side_effect=[head_tree, parent_tree, head_tree, parent_tree]), ): results = await DetectRefactorProvider().compute( db_session, rid, _CID_A, {"owner": "rfp1", "slug": "rf-impl"} ) assert results, "Provider returned no results" count = results[0][1]["count"] assert count == 1, f"Expected 1 implementation event, got {count}" row = (await db_session.execute( sa.select(MusehubIntelRefactorEvent) .where(MusehubIntelRefactorEvent.repo_id == rid) )).scalars().first() assert row is not None assert row.kind == "implementation" @pytest.mark.asyncio async def test_T08_provider_detects_signature_change( self, db_session: AsyncSession ) -> None: """Same body_hash but different signature_id → kind='signature'.""" from musehub.services.musehub_intel_providers import DetectRefactorProvider repo = await create_repo(db_session, owner="rfp2", slug="rf-sig") rid = str(repo.repo_id) manifest = {"src/foo.py": _OBJ_1} await _seed_two_commits(db_session, rid, manifest, manifest, "rfp2", "rf-sig") body_h = long_id("b" * 64) head_tree = dict([_sym("src/foo.py", "bar", body_h, long_id("s1" + "a" * 62))]) parent_tree = dict([_sym("src/foo.py", "bar", body_h, long_id("s2" + "a" * 62))]) mock_backend = AsyncMock() mock_backend.get = AsyncMock(side_effect=[b"head", b"parent"]) with ( patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend), patch("musehub.services.musehub_intel_providers.parse_symbols", side_effect=[head_tree, parent_tree, head_tree, parent_tree]), ): results = await DetectRefactorProvider().compute( db_session, rid, _CID_A, {"owner": "rfp2", "slug": "rf-sig"} ) count = results[0][1]["count"] assert count == 1, f"Expected 1 signature event, got {count}" row = (await db_session.execute( 
sa.select(MusehubIntelRefactorEvent) .where(MusehubIntelRefactorEvent.repo_id == rid) )).scalars().first() assert row is not None assert row.kind == "signature" @pytest.mark.asyncio async def test_T09_provider_detects_move( self, db_session: AsyncSession ) -> None: """Same body_hash at different file path → kind='move'.""" from musehub.services.musehub_intel_providers import DetectRefactorProvider repo = await create_repo(db_session, owner="rfp3", slug="rf-move") rid = str(repo.repo_id) head_manifest = {"src/new.py": _OBJ_1} parent_manifest = {"src/old.py": _OBJ_2} await _seed_two_commits( db_session, rid, head_manifest, parent_manifest, "rfp3", "rf-move" ) body_h = long_id("b" * 64) sig_h = long_id("s" * 64) head_tree = dict([_sym("src/new.py", "fn", body_h, sig_h)]) parent_tree = dict([_sym("src/old.py", "fn", body_h, sig_h)]) mock_backend = AsyncMock() mock_backend.get = AsyncMock(side_effect=[b"head", b"parent"]) with ( patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend), patch("musehub.services.musehub_intel_providers.parse_symbols", side_effect=[head_tree, parent_tree, head_tree, parent_tree]), ): results = await DetectRefactorProvider().compute( db_session, rid, _CID_A, {"owner": "rfp3", "slug": "rf-move"} ) count = results[0][1]["count"] assert count == 1, f"Expected 1 move event, got {count}" row = (await db_session.execute( sa.select(MusehubIntelRefactorEvent) .where(MusehubIntelRefactorEvent.repo_id == rid) )).scalars().first() assert row is not None assert row.kind == "move" assert row.detail == "src/new.py::fn" @pytest.mark.asyncio async def test_T10_provider_detects_rename( self, db_session: AsyncSession ) -> None: """Same body_hash at same file but different name → kind='rename'.""" from musehub.services.musehub_intel_providers import DetectRefactorProvider repo = await create_repo(db_session, owner="rfp4", slug="rf-rename") rid = str(repo.repo_id) manifest = {"src/foo.py": _OBJ_1} await 
_seed_two_commits(db_session, rid, manifest, manifest, "rfp4", "rf-rename") body_h = long_id("b" * 64) sig_h = long_id("s" * 64) head_tree = dict([_sym("src/foo.py", "new_name", body_h, sig_h)]) parent_tree = dict([_sym("src/foo.py", "old_name", body_h, sig_h)]) mock_backend = AsyncMock() mock_backend.get = AsyncMock(side_effect=[b"head", b"parent"]) with ( patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend), patch("musehub.services.musehub_intel_providers.parse_symbols", side_effect=[head_tree, parent_tree, head_tree, parent_tree]), ): results = await DetectRefactorProvider().compute( db_session, rid, _CID_A, {"owner": "rfp4", "slug": "rf-rename"} ) count = results[0][1]["count"] assert count == 1, f"Expected 1 rename event, got {count}" row = (await db_session.execute( sa.select(MusehubIntelRefactorEvent) .where(MusehubIntelRefactorEvent.repo_id == rid) )).scalars().first() assert row is not None assert row.kind == "rename" @pytest.mark.asyncio async def test_T11_provider_returns_empty_when_no_parent( self, db_session: AsyncSession ) -> None: """Initial commit (no parent) must produce no events.""" from musehub.services.musehub_intel_providers import DetectRefactorProvider repo = await create_repo(db_session, owner="rfp5", slug="rf-noparen") rid = str(repo.repo_id) snap_id = long_id("f" * 64) await session_insert_snapshot(db_session, rid, snap_id, {"src/a.py": _OBJ_1}) await db_session.execute( pg_insert(MusehubCommit) .values( commit_id = _CID_A, branch = "dev", parent_ids = [], # ← no parent message = "init", author = "rfp5", timestamp = datetime(2026, 1, 1, tzinfo=timezone.utc), snapshot_id = snap_id, ) .on_conflict_do_nothing() ) await db_session.execute( pg_insert(MusehubCommitRef) .values(repo_id=rid, commit_id=_CID_A) .on_conflict_do_nothing() ) await db_session.commit() results = await DetectRefactorProvider().compute( db_session, rid, _CID_A, {"owner": "rfp5", "slug": "rf-noparen"} ) assert results == [], f"Expected 
[], got {results}" @pytest.mark.asyncio async def test_T12_provider_is_idempotent( self, db_session: AsyncSession ) -> None: """Running the provider twice must not create duplicate rows.""" from musehub.services.musehub_intel_providers import DetectRefactorProvider repo = await create_repo(db_session, owner="rfp6", slug="rf-idem") rid = str(repo.repo_id) manifest = {"src/foo.py": _OBJ_1} await _seed_two_commits(db_session, rid, manifest, manifest, "rfp6", "rf-idem") body_h = long_id("b" * 64) head_tree = dict([_sym("src/foo.py", "bar", body_h, long_id("x" * 64))]) parent_tree = dict([_sym("src/foo.py", "bar", long_id("y" * 64), long_id("x" * 64))]) mock_backend = AsyncMock() mock_backend.get = AsyncMock(return_value=b"src") for _ in range(2): with ( patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend), patch("musehub.services.musehub_intel_providers.parse_symbols", side_effect=[head_tree.copy(), parent_tree.copy(), head_tree.copy(), parent_tree.copy()]), ): await DetectRefactorProvider().compute( db_session, rid, _CID_A, {"owner": "rfp6", "slug": "rf-idem"} ) count = (await db_session.execute( sa.select(sa.func.count()).select_from(MusehubIntelRefactorEvent) .where(MusehubIntelRefactorEvent.repo_id == rid) )).scalar_one() assert count == 1, f"Expected 1 row after 2 runs, got {count}" # ───────────────────────────────────────────────────────────────────────────── # Layer T3 — Route # ───────────────────────────────────────────────────────────────────────────── class TestRoute: @pytest.mark.asyncio async def test_T13_refactor_page_returns_200( self, client: AsyncClient, rf_repo: MusehubRepo ) -> None: """GET /rfuser/rf-e2e/intel/refactoring must return HTTP 200.""" resp = await client.get("/rfuser/rf-e2e/intel/refactoring") assert resp.status_code == 200, resp.text[:500] @pytest.mark.asyncio async def test_T14_refactor_page_empty_state( self, client: AsyncClient, db_session: AsyncSession ) -> None: """Route must render empty state 
when no event rows exist.""" repo = await create_repo(db_session, owner="rfempty", slug="rf-nodata") await db_session.commit() resp = await client.get("/rfempty/rf-nodata/intel/refactoring") assert resp.status_code == 200 assert "Push a commit" in resp.text @pytest.mark.asyncio async def test_T15_refactor_page_404_for_unknown_repo( self, client: AsyncClient ) -> None: """Route must return 404 for an unknown repo slug.""" resp = await client.get("/nobody/nonexistent-repo/intel/refactoring") assert resp.status_code == 404 @pytest.mark.asyncio async def test_T16_kind_filter_limits_results( self, client: AsyncClient, rf_repo: MusehubRepo ) -> None: """?kind=implementation must only show implementation events.""" resp = await client.get("/rfuser/rf-e2e/intel/refactoring?kind=implementation") assert resp.status_code == 200 html = resp.text assert "implementation" in html @pytest.mark.asyncio async def test_T17_top_filter_limits_results( self, client: AsyncClient, rf_repo: MusehubRepo ) -> None: """?top=2 must limit the event list to 2 rows.""" resp = await client.get("/rfuser/rf-e2e/intel/refactoring?top=2") assert resp.status_code == 200 @pytest.mark.asyncio async def test_T18_stat_chips_present_in_html( self, client: AsyncClient, rf_repo: MusehubRepo ) -> None: """Response must include the total, implementation, and signature counts.""" resp = await client.get("/rfuser/rf-e2e/intel/refactoring") html = resp.text assert "rf-stat-val" in html, "Missing stat chip value elements" @pytest.mark.asyncio async def test_T19_invalid_top_does_not_500( self, client: AsyncClient, rf_repo: MusehubRepo ) -> None: """?top=GARBAGE must return 200 and fall back to the default top.""" resp = await client.get("/rfuser/rf-e2e/intel/refactoring?top=GARBAGE") assert resp.status_code == 200, f"Expected 200 on bad top, got {resp.status_code}" # ───────────────────────────────────────────────────────────────────────────── # Layer T4 — E2E HTML # 
# ─────────────────────────────────────────────────────────────────────────────


class TestHTML:
    """Content checks on the rendered refactoring page and intel dashboard."""

    @pytest.mark.asyncio
    async def test_T20_kind_badges_appear_in_html(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """All four kind values must appear in the rendered HTML."""
        resp = await client.get("/rfuser/rf-e2e/intel/refactoring")
        html = resp.text
        for kind in ("implementation", "signature", "move", "rename"):
            assert kind in html, f"Kind '{kind}' not found in HTML"

    @pytest.mark.asyncio
    async def test_T21_address_rendered_in_rows(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """Event addresses must appear in the rendered row list."""
        resp = await client.get("/rfuser/rf-e2e/intel/refactoring")
        assert "src/a.py" in resp.text

    @pytest.mark.asyncio
    async def test_T22_stat_chip_values_match_db(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """Total stat chip must reflect the 5 seeded events."""
        resp = await client.get("/rfuser/rf-e2e/intel/refactoring")
        html = resp.text
        # 5 seeded events → the total count "5" must appear somewhere
        # NOTE(review): any "5" anywhere in the page satisfies this check.
        assert "5" in html

    @pytest.mark.asyncio
    async def test_T23_dashboard_link_present(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """'← Intel Hub' back link must be present on the refactoring page."""
        resp = await client.get("/rfuser/rf-e2e/intel/refactoring")
        assert "Intel Hub" in resp.text

    @pytest.mark.asyncio
    async def test_T24_dashboard_card_present(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """Intel dashboard must show the Detect Refactor card."""
        resp = await client.get("/rfuser/rf-e2e/intel")
        assert resp.status_code == 200
        html = resp.text
        assert "Detect Refactor" in html or "refactoring" in html


# ─────────────────────────────────────────────────────────────────────────────
# Layer T5 — Data integrity
# ─────────────────────────────────────────────────────────────────────────────


async def _count_events(session: AsyncSession, repo_id: str) -> int:
    """Return how many refactor-event rows are stored for ``repo_id``."""
    result = await session.execute(
        sa.select(sa.func.count()).select_from(MusehubIntelRefactorEvent)
        .where(MusehubIntelRefactorEvent.repo_id == repo_id)
    )
    return result.scalar_one()


class TestDataIntegrity:
    """Row-level guarantees of the refactor-events table."""

    @pytest.mark.asyncio
    async def test_T25_upsert_is_idempotent(
        self, db_session: AsyncSession
    ) -> None:
        """Inserting the same event_id twice must result in exactly one row."""
        repo = await create_repo(db_session, owner="rfdi1", slug="rf-upsert")
        rid = str(repo.repo_id)
        for _ in range(2):
            await _insert_event(db_session, rid, event_id=long_id("e" * 64))
        await db_session.commit()

        assert await _count_events(db_session, rid) == 1

    @pytest.mark.asyncio
    async def test_T26_cross_repo_isolation(
        self, db_session: AsyncSession
    ) -> None:
        """Events from different repos must not leak into each other's results."""
        repo_a = await create_repo(db_session, owner="rfdi2a", slug="rf-iso-a")
        repo_b = await create_repo(db_session, owner="rfdi2b", slug="rf-iso-b")
        rid_a, rid_b = str(repo_a.repo_id), str(repo_b.repo_id)

        await _insert_event(db_session, rid_a, event_id=long_id("a" * 64))
        await _insert_event(db_session, rid_b, event_id=long_id("b" * 64))
        await db_session.commit()

        count_a = await _count_events(db_session, rid_a)
        count_b = await _count_events(db_session, rid_b)
        assert count_a == 1
        assert count_b == 1

    @pytest.mark.asyncio
    async def test_T27_commit_message_stored(
        self, db_session: AsyncSession
    ) -> None:
        """commit_message must be persisted and queryable."""
        repo = await create_repo(db_session, owner="rfdi3", slug="rf-msg")
        rid = str(repo.repo_id)
        await _insert_event(
            db_session,
            rid,
            event_id=long_id("m" * 64),
            commit_message="feat: spectacular refactor",
        )
        await db_session.commit()

        row = (await db_session.execute(
            sa.select(MusehubIntelRefactorEvent)
            .where(MusehubIntelRefactorEvent.repo_id == rid)
        )).scalars().first()
        assert row is not None
        assert row.commit_message == "feat: spectacular refactor"

    @pytest.mark.asyncio
    async def test_T28_cascade_delete_removes_events(
        self, db_session: AsyncSession
    ) -> None:
        """Deleting the repo must cascade-delete all its refactoring events."""
        repo = await create_repo(db_session, owner="rfdi4", slug="rf-cascade")
        rid = str(repo.repo_id)
        await _insert_event(db_session, rid, event_id=long_id("z" * 64))
        await db_session.commit()

        await db_session.delete(repo)
        await db_session.commit()

        assert await _count_events(db_session, rid) == 0


# ─────────────────────────────────────────────────────────────────────────────
# Layer T6 — Performance
# ─────────────────────────────────────────────────────────────────────────────


class TestPerformance:
    """Coarse wall-clock budgets; not micro-benchmarks."""

    @pytest.mark.asyncio
    async def test_T29_provider_completes_under_5s(
        self, db_session: AsyncSession
    ) -> None:
        """Provider must finish under 5 seconds for the diff seeded below.

        The seeded trees hold 20 distinct symbols spread over a 10-file
        manifest; every symbol differs in body_hash between HEAD and parent.
        """
        from musehub.services.musehub_intel_providers import DetectRefactorProvider

        repo = await create_repo(db_session, owner="rfperf1", slug="rf-perf")
        rid = str(repo.repo_id)
        manifest = {f"src/file_{i}.py": long_id(f"{'0' * 63}{i}") for i in range(10)}
        await _seed_two_commits(db_session, rid, manifest, manifest, "rfperf1", "rf-perf")

        def _make_tree(prefix: str) -> JSONObject:
            # 20 symbols assigned round-robin to the 10 manifest files; the
            # body hash is derived from ``prefix`` so HEAD and parent differ.
            return dict(
                _sym(
                    f"src/file_{i % 10}.py",
                    f"fn_{i}",
                    long_id(prefix * 64),
                    long_id("s" * 64),
                )
                for i in range(20)
            )

        head_tree = _make_tree("a")
        parent_tree = _make_tree("b")
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(return_value=b"src")

        t0 = time.monotonic()
        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
            patch(
                "musehub.services.musehub_intel_providers.parse_symbols",
                # 40 queued results so the mock is not exhausted however many
                # files the provider parses.
                side_effect=[head_tree.copy(), parent_tree.copy()] * 20,
            ),
        ):
            await DetectRefactorProvider().compute(
                db_session, rid, _CID_A, {"owner": "rfperf1", "slug": "rf-perf"}
            )
        elapsed = time.monotonic() - t0
        assert elapsed < 5.0, f"Provider took {elapsed:.2f}s — exceeds 5s budget"

    @pytest.mark.asyncio
    async def test_T30_route_responds_under_500ms(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """Refactoring page must respond in under 500 ms with 5 seeded rows."""
        t0 = time.monotonic()
        resp = await client.get("/rfuser/rf-e2e/intel/refactoring")
        elapsed = (time.monotonic() - t0) * 1000
        assert resp.status_code == 200
        assert elapsed < 500, f"Route took {elapsed:.0f}ms — exceeds 500ms budget"

    @pytest.mark.asyncio
    async def test_T31_bulk_insert_500_events(
        self, db_session: AsyncSession
    ) -> None:
        """Inserting 500 events must complete in under 10 seconds."""
        repo = await create_repo(db_session, owner="rfperf2", slug="rf-bulk")
        rid = str(repo.repo_id)

        t0 = time.monotonic()
        for i in range(500):
            await _insert_event(
                db_session,
                rid,
                event_id=long_id(f"{'0' * 60}{i:04d}"),
                address=f"src/f{i}.py::fn",
            )
        await db_session.commit()
        elapsed = time.monotonic() - t0
        assert elapsed < 10.0, f"500-row insert took {elapsed:.2f}s"

        assert await _count_events(db_session, rid) == 500


# ─────────────────────────────────────────────────────────────────────────────
# Layer T7 — Security
# ─────────────────────────────────────────────────────────────────────────────


class TestSecurity:
    """Hostile-input checks on the refactoring route."""

    @pytest.mark.asyncio
    async def test_T32_xss_in_address_is_escaped(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """address containing HTML must be escaped — raw tags must not appear."""
        repo = await create_repo(db_session, owner="rfxss", slug="rf-xss")
        rid = str(repo.repo_id)
        payload = "<script>alert(1)</script>"
        await _insert_event(
            db_session,
            rid,
            event_id=long_id("x" * 64),
            address=f"src/{payload}.py::fn",
        )
        await db_session.commit()

        resp = await client.get("/rfxss/rf-xss/intel/refactoring")
        assert resp.status_code == 200
        # The full payload is matched rather than a bare "<script>", because
        # the page may legitimately contain script tags of its own.
        assert payload not in resp.text, "Raw HTML from address leaked into the page"

    @pytest.mark.asyncio
    async def test_T33_sql_injection_in_top_param_returns_200(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """?top=1;DROP TABLE must return 200 and fall back to default top."""
        resp = await client.get(
            "/rfuser/rf-e2e/intel/refactoring",
            params={"top": "1;DROP TABLE musehub_intel_refactor_events;--"},
        )
        assert resp.status_code == 200

    @pytest.mark.asyncio
    async def test_T34_unknown_kind_filter_returns_200(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """?kind=INVALID must return 200 with an empty or full result set."""
        resp = await client.get("/rfuser/rf-e2e/intel/refactoring?kind=INVALID")
        assert resp.status_code == 200


# ─────────────────────────────────────────────────────────────────────────────
# Helpers used by T11 only
# ─────────────────────────────────────────────────────────────────────────────


async def session_insert_snapshot(
    session: AsyncSession,
    repo_id: str,
    snap_id: str,
    manifest: dict[str, str],
) -> None:
    """Insert a snapshot (msgpack-encoded manifest) and link it to ``repo_id``.

    ``created_at`` is fixed at 2026-01-01 UTC.  Both inserts use
    ``ON CONFLICT DO NOTHING`` and nothing is committed here.
    """
    await session.execute(
        pg_insert(MusehubSnapshot)
        .values(
            snapshot_id=snap_id,
            directories=[],
            manifest_blob=msgpack.packb(manifest),
            entry_count=len(manifest),
            created_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
        )
        .on_conflict_do_nothing()
    )
    await session.execute(
        pg_insert(MusehubSnapshotRef)
        .values(repo_id=repo_id, snapshot_id=snap_id)
        .on_conflict_do_nothing()
    )