"""Detect-Refactor intel — full 7-tier test suite (issue #22).
Tests are written TDD-first: all tests must be RED before Phase 4–7
implementation begins, then GREEN after.
Tiers
-----
T01–T05 Layer T1 — DB model (columns, nullable, cascade, index, commit_message)
T06–T12 Layer T2 — Provider (no subprocess, impl, sig, move, rename, empty, idempotent)
T13–T19 Layer T3 — Route (200, empty state, 404, kind filter, top filter, stat chips, invalid top)
T20–T24 Layer T4 — E2E HTML (kind badges, addresses, stat chip values, Intel Hub back link, dashboard card)
T25–T28 Layer T5 — Data integrity (upsert idempotent, cross-repo isolation, commit_message stored, cascade delete)
T29–T31 Layer T6 — Performance (provider speed, route speed, bulk upsert)
T32–T34 Layer T7 — Security (XSS escape in address, SQL injection top param, no 500 on bad kind)
"""
from __future__ import annotations
import time
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch
import msgpack
import pytest
import pytest_asyncio
import sqlalchemy as sa
from httpx import AsyncClient
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from musehub.db.musehub_intel_models import MusehubIntelRefactorEvent
from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef
from musehub.types.json_types import JSONObject
from tests.factories import create_repo
from muse.core.types import long_id
# Fixed identifiers shared by the helpers and tests below. Each is built by
# passing a 64-character string through ``long_id``.
# ``_REF`` is not referenced anywhere else in the visible part of this file.
_REF = long_id("a" * 64)
# Snapshot IDs: ``_SNAP_A`` is used for the HEAD snapshot and ``_SNAP_B`` for
# the parent snapshot in ``_seed_two_commits``.
_SNAP_A = long_id("b" * 64)
_SNAP_B = long_id("c" * 64)
# Commit IDs: ``_CID_A`` is the HEAD commit (and the default ``commit_id`` of
# ``_insert_event``); ``_CID_B`` is its parent.
_CID_A = long_id("d" * 64)
_CID_B = long_id("e" * 64)
# Object IDs used as manifest values (file path -> object id).
_OBJ_1 = long_id("f" * 64)
_OBJ_2 = long_id("1" * 64)
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
async def _insert_event(
    session: AsyncSession,
    repo_id: str,
    event_id: str,
    kind: str = "implementation",
    address: str = "src/foo.py::bar",
    detail: str | None = None,
    commit_id: str = _CID_A,
    commit_message: str | None = "feat: add bar",
    committed_at: datetime | None = None,
) -> None:
    """Insert or update a single ``MusehubIntelRefactorEvent`` row.

    The row is keyed on ``event_id``. When a row with that ID already exists,
    every field except ``event_id`` and ``repo_id`` is overwritten with the
    values passed here. The statement is executed on *session* but not
    committed; the caller is responsible for committing.

    Args:
        session: Session used to execute the statement.
        repo_id: Repository the event belongs to (only written on insert).
        event_id: Conflict key of the row.
        kind: Event kind stored in the ``kind`` column.
        address: Symbol address stored in the ``address`` column.
        detail: Optional detail text.
        commit_id: Commit the event is attributed to.
        commit_message: Optional commit message stored with the event.
        committed_at: Commit timestamp; defaults to 2026-01-01 UTC when omitted.
    """
    when = (
        committed_at
        if committed_at is not None
        else datetime(2026, 1, 1, tzinfo=timezone.utc)
    )
    # These fields are both inserted and refreshed on conflict.
    refreshed = {
        "kind": kind,
        "address": address,
        "detail": detail,
        "commit_id": commit_id,
        "commit_message": commit_message,
        "committed_at": when,
    }
    upsert = (
        pg_insert(MusehubIntelRefactorEvent)
        .values(event_id=event_id, repo_id=repo_id, **refreshed)
        .on_conflict_do_update(index_elements=["event_id"], set_=refreshed)
    )
    await session.execute(upsert)
async def _seed_commit(
    session: AsyncSession,
    repo_id: str,
    *,
    commit_id: str,
    snapshot_id: str,
    manifest: dict[str, str],
    parent_ids: list[str],
    message: str,
    author: str,
    when: datetime,
) -> None:
    """Insert one snapshot, its repo ref, one commit on ``dev``, and its repo ref.

    Every insert uses ``ON CONFLICT DO NOTHING``, so rows that already exist
    are left untouched. Nothing is committed here.
    """
    await session.execute(
        pg_insert(MusehubSnapshot)
        .values(
            snapshot_id=snapshot_id,
            directories=[],
            manifest_blob=msgpack.packb(manifest),
            entry_count=len(manifest),
            created_at=when,
        )
        .on_conflict_do_nothing()
    )
    await session.execute(
        pg_insert(MusehubSnapshotRef)
        .values(repo_id=repo_id, snapshot_id=snapshot_id)
        .on_conflict_do_nothing()
    )
    await session.execute(
        pg_insert(MusehubCommit)
        .values(
            commit_id=commit_id,
            branch="dev",
            parent_ids=parent_ids,
            message=message,
            author=author,
            timestamp=when,
            snapshot_id=snapshot_id,
        )
        .on_conflict_do_nothing()
    )
    await session.execute(
        pg_insert(MusehubCommitRef)
        .values(repo_id=repo_id, commit_id=commit_id)
        .on_conflict_do_nothing()
    )


async def _seed_two_commits(
    session: AsyncSession,
    repo_id: str,
    head_manifest: dict[str, str],
    parent_manifest: dict[str, str],
    owner: str,
    slug: str,
) -> tuple[str, str]:
    """Insert a parent commit/snapshot, then a HEAD commit/snapshot, and commit.

    The parent is ``_CID_B`` / ``_SNAP_B`` (2026-01-01 UTC, no parents) and the
    HEAD is ``_CID_A`` / ``_SNAP_A`` (2026-01-02 UTC, parent ``_CID_B``). Both
    are linked to *repo_id* and authored by *owner*.

    NOTE(review): the IDs are fixed module constants and conflicting rows are
    ignored, so if snapshot/commit rows from an earlier call are still in the
    database the manifests passed here are not written. This presumably relies
    on each test starting from a clean database — confirm.

    Args:
        session: Session used for the inserts; committed before returning.
        repo_id: Repository the snapshot and commit refs are attached to.
        head_manifest: Path -> object-id mapping packed into the HEAD snapshot.
        parent_manifest: Path -> object-id mapping packed into the parent snapshot.
        owner: Stored as the ``author`` of both commits.
        slug: Accepted for call-site symmetry; not used by this helper.

    Returns:
        ``(head_commit_id, parent_commit_id)``.
    """
    # Parent first, then HEAD — same insert order as before the refactor.
    await _seed_commit(
        session,
        repo_id,
        commit_id=_CID_B,
        snapshot_id=_SNAP_B,
        manifest=parent_manifest,
        parent_ids=[],
        message="chore: initial",
        author=owner,
        when=datetime(2026, 1, 1, tzinfo=timezone.utc),
    )
    await _seed_commit(
        session,
        repo_id,
        commit_id=_CID_A,
        snapshot_id=_SNAP_A,
        manifest=head_manifest,
        parent_ids=[_CID_B],
        message="feat: refactor things",
        author=owner,
        when=datetime(2026, 1, 2, tzinfo=timezone.utc),
    )
    await session.commit()
    return _CID_A, _CID_B
def _sym(
    file_path: str,
    name: str,
    body_hash: str,
    signature_id: str,
    kind: str = "function",
) -> tuple[str, dict]:
    """Build one fake symbol entry for a mocked ``parse_symbols`` result.

    Args:
        file_path: File the symbol is reported in.
        name: Symbol name; also used as ``qualified_name``.
        body_hash: Value stored under ``body_hash``.
        signature_id: Value stored under ``signature_id``.
        kind: Symbol kind; stored under ``kind`` and embedded in
            ``canonical_key``.

    Returns:
        ``(address, rec)`` where ``address`` is ``"<file_path>::<name>"`` and
        ``rec`` is the symbol record dict. ``dict([_sym(...)])`` therefore
        yields a one-entry symbol tree.
    """
    address = f"{file_path}::{name}"
    record = {
        "kind": kind,
        "name": name,
        "qualified_name": name,
        "content_id": long_id("9" * 64),
        "body_hash": body_hash,
        "signature_id": signature_id,
        "metadata_id": "",
        # Keep the key in step with ``kind`` instead of hard-coding "function";
        # the default kind produces the same key as before.
        "canonical_key": f"{file_path}##{kind}#{name}#1",
        "lineno": 1,
        "end_lineno": 5,
    }
    return address, record
@pytest_asyncio.fixture
async def rf_repo(db_session: AsyncSession) -> MusehubRepo:
    """Create repo ``rfuser/rf-e2e`` with five committed refactor events.

    The events are two ``implementation``, one ``signature``, one ``move`` and
    one ``rename``, all attributed to ``_CID_A``.
    """
    repo = await create_repo(db_session, owner="rfuser", slug="rf-e2e")
    repo_key = str(repo.repo_id)
    seeded = (
        ("implementation", "src/a.py::foo"),
        ("implementation", "src/b.py::bar"),
        ("signature", "src/c.py::baz"),
        ("move", "src/d.py::old_fn"),
        ("rename", "src/e.py::qux"),
    )
    for index, (event_kind, event_address) in enumerate(seeded):
        # The two-digit suffix makes each event ID unique.
        await _insert_event(
            db_session,
            repo_key,
            event_id=long_id(f"ev{'0' * 60}{index:02d}"),
            kind=event_kind,
            address=event_address,
            commit_id=_CID_A,
        )
    await db_session.commit()
    return repo
# ─────────────────────────────────────────────────────────────────────────────
# Layer T1 — DB model
# ─────────────────────────────────────────────────────────────────────────────
class TestDBModel:
    """Layer T1: schema checks on ``MusehubIntelRefactorEvent`` (no database needed)."""

    def test_T01_model_has_commit_message_column(self) -> None:
        """MusehubIntelRefactorEvent must expose a commit_message column."""
        cols = {c.name for c in MusehubIntelRefactorEvent.__table__.columns}
        assert "commit_message" in cols

    def test_T02_commit_message_is_nullable(self) -> None:
        """commit_message must be nullable (older rows pre-migration have NULL)."""
        col = MusehubIntelRefactorEvent.__table__.columns["commit_message"]
        assert col.nullable is True

    def test_T03_model_has_required_columns(self) -> None:
        """event_id, repo_id, kind, address, commit_id, committed_at must exist."""
        cols = {c.name for c in MusehubIntelRefactorEvent.__table__.columns}
        for required in ("event_id", "repo_id", "kind", "address", "commit_id", "committed_at"):
            assert required in cols, f"Column '{required}' missing"

    def test_T04_cascade_delete_configured(self) -> None:
        """repo_id FK must use CASCADE so repo deletion cleans events."""
        for fk in MusehubIntelRefactorEvent.__table__.foreign_keys:
            # Match the owning column by exact name so a column whose name
            # merely contains "repo_id" cannot satisfy the check.
            if fk.parent.name == "repo_id":
                assert fk.ondelete == "CASCADE"
                return
        pytest.fail("No CASCADE FK found for repo_id")

    def test_T05_composite_index_exists(self) -> None:
        """ix_intel_refactor_events_repo_kind index must cover (repo_id, kind)."""
        index_name = "ix_intel_refactor_events_repo_kind"
        by_name = {idx.name: idx for idx in MusehubIntelRefactorEvent.__table__.indexes}
        assert index_name in by_name
        # Check the columns too; the name alone does not prove coverage.
        covered = {col.name for col in by_name[index_name].columns}
        assert {"repo_id", "kind"} <= covered, (
            f"{index_name} covers {sorted(covered)}, expected repo_id and kind"
        )
# ─────────────────────────────────────────────────────────────────────────────
# Layer T2 — Provider
# ─────────────────────────────────────────────────────────────────────────────
class TestProvider:
    """Layer T2: ``DetectRefactorProvider.compute`` against seeded commits.

    ``get_backend`` and ``parse_symbols`` are patched in the provider module so
    each test controls the symbol trees the provider sees for HEAD and parent.
    """

    @pytest.mark.asyncio
    async def test_T06_provider_uses_no_subprocess(
        self, db_session: AsyncSession
    ) -> None:
        """DetectRefactorProvider must never call _run_muse or import subprocess."""
        from musehub.services import musehub_intel_providers as svc
        import ast
        import inspect
        import textwrap

        src = inspect.getsource(svc.DetectRefactorProvider)
        tree = ast.parse(textwrap.dedent(src))
        # Parsing already discards ``#`` comments, but ``ast.unparse`` keeps
        # docstrings. Drop the leading string statement of every documented
        # node so a docstring that merely mentions the names cannot fail us.
        documented = (ast.Module, ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)
        for node in ast.walk(tree):
            if not isinstance(node, documented) or not node.body:
                continue
            first = node.body[0]
            if (
                isinstance(first, ast.Expr)
                and isinstance(first.value, ast.Constant)
                and isinstance(first.value.value, str)
            ):
                # Keep the body non-empty so the unparsed source stays well-formed.
                node.body = node.body[1:] or [ast.Pass()]
        non_doc_src = ast.unparse(tree)
        assert "_run_muse" not in non_doc_src, "DetectRefactorProvider must not call _run_muse"
        assert "import subprocess" not in non_doc_src, "DetectRefactorProvider must not import subprocess"

    @pytest.mark.asyncio
    async def test_T07_provider_detects_implementation_change(
        self, db_session: AsyncSession
    ) -> None:
        """Same address, different body_hash → kind='implementation'."""
        from musehub.services.musehub_intel_providers import DetectRefactorProvider

        repo = await create_repo(db_session, owner="rfp1", slug="rf-impl")
        rid = str(repo.repo_id)
        manifest = {"src/foo.py": _OBJ_1}
        await _seed_two_commits(db_session, rid, manifest, manifest, "rfp1", "rf-impl")
        # HEAD has different body_hash, same sig
        head_tree = dict([_sym("src/foo.py", "bar", long_id("a" * 64), long_id("s" * 64))])
        parent_tree = dict([_sym("src/foo.py", "bar", long_id("b" * 64), long_id("s" * 64))])
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(side_effect=[b"head", b"parent"])
        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  side_effect=[head_tree, parent_tree, head_tree, parent_tree]),
        ):
            results = await DetectRefactorProvider().compute(
                db_session, rid, _CID_A, {"owner": "rfp1", "slug": "rf-impl"}
            )
        assert results, "Provider returned no results"
        count = results[0][1]["count"]
        assert count == 1, f"Expected 1 implementation event, got {count}"
        row = (await db_session.execute(
            sa.select(MusehubIntelRefactorEvent)
            .where(MusehubIntelRefactorEvent.repo_id == rid)
        )).scalars().first()
        assert row is not None
        assert row.kind == "implementation"

    @pytest.mark.asyncio
    async def test_T08_provider_detects_signature_change(
        self, db_session: AsyncSession
    ) -> None:
        """Same body_hash but different signature_id → kind='signature'."""
        from musehub.services.musehub_intel_providers import DetectRefactorProvider

        repo = await create_repo(db_session, owner="rfp2", slug="rf-sig")
        rid = str(repo.repo_id)
        manifest = {"src/foo.py": _OBJ_1}
        await _seed_two_commits(db_session, rid, manifest, manifest, "rfp2", "rf-sig")
        body_h = long_id("b" * 64)
        head_tree = dict([_sym("src/foo.py", "bar", body_h, long_id("s1" + "a" * 62))])
        parent_tree = dict([_sym("src/foo.py", "bar", body_h, long_id("s2" + "a" * 62))])
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(side_effect=[b"head", b"parent"])
        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  side_effect=[head_tree, parent_tree, head_tree, parent_tree]),
        ):
            results = await DetectRefactorProvider().compute(
                db_session, rid, _CID_A, {"owner": "rfp2", "slug": "rf-sig"}
            )
        # Guard before indexing so an empty result fails with a clear message.
        assert results, "Provider returned no results"
        count = results[0][1]["count"]
        assert count == 1, f"Expected 1 signature event, got {count}"
        row = (await db_session.execute(
            sa.select(MusehubIntelRefactorEvent)
            .where(MusehubIntelRefactorEvent.repo_id == rid)
        )).scalars().first()
        assert row is not None
        assert row.kind == "signature"

    @pytest.mark.asyncio
    async def test_T09_provider_detects_move(
        self, db_session: AsyncSession
    ) -> None:
        """Same body_hash at different file path → kind='move'."""
        from musehub.services.musehub_intel_providers import DetectRefactorProvider

        repo = await create_repo(db_session, owner="rfp3", slug="rf-move")
        rid = str(repo.repo_id)
        head_manifest = {"src/new.py": _OBJ_1}
        parent_manifest = {"src/old.py": _OBJ_2}
        await _seed_two_commits(
            db_session, rid, head_manifest, parent_manifest, "rfp3", "rf-move"
        )
        body_h = long_id("b" * 64)
        sig_h = long_id("s" * 64)
        head_tree = dict([_sym("src/new.py", "fn", body_h, sig_h)])
        parent_tree = dict([_sym("src/old.py", "fn", body_h, sig_h)])
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(side_effect=[b"head", b"parent"])
        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  side_effect=[head_tree, parent_tree, head_tree, parent_tree]),
        ):
            results = await DetectRefactorProvider().compute(
                db_session, rid, _CID_A, {"owner": "rfp3", "slug": "rf-move"}
            )
        # Guard before indexing so an empty result fails with a clear message.
        assert results, "Provider returned no results"
        count = results[0][1]["count"]
        assert count == 1, f"Expected 1 move event, got {count}"
        row = (await db_session.execute(
            sa.select(MusehubIntelRefactorEvent)
            .where(MusehubIntelRefactorEvent.repo_id == rid)
        )).scalars().first()
        assert row is not None
        assert row.kind == "move"
        # For a move, ``detail`` is expected to hold the new (HEAD) address.
        assert row.detail == "src/new.py::fn"

    @pytest.mark.asyncio
    async def test_T10_provider_detects_rename(
        self, db_session: AsyncSession
    ) -> None:
        """Same body_hash at same file but different name → kind='rename'."""
        from musehub.services.musehub_intel_providers import DetectRefactorProvider

        repo = await create_repo(db_session, owner="rfp4", slug="rf-rename")
        rid = str(repo.repo_id)
        manifest = {"src/foo.py": _OBJ_1}
        await _seed_two_commits(db_session, rid, manifest, manifest, "rfp4", "rf-rename")
        body_h = long_id("b" * 64)
        sig_h = long_id("s" * 64)
        head_tree = dict([_sym("src/foo.py", "new_name", body_h, sig_h)])
        parent_tree = dict([_sym("src/foo.py", "old_name", body_h, sig_h)])
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(side_effect=[b"head", b"parent"])
        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  side_effect=[head_tree, parent_tree, head_tree, parent_tree]),
        ):
            results = await DetectRefactorProvider().compute(
                db_session, rid, _CID_A, {"owner": "rfp4", "slug": "rf-rename"}
            )
        # Guard before indexing so an empty result fails with a clear message.
        assert results, "Provider returned no results"
        count = results[0][1]["count"]
        assert count == 1, f"Expected 1 rename event, got {count}"
        row = (await db_session.execute(
            sa.select(MusehubIntelRefactorEvent)
            .where(MusehubIntelRefactorEvent.repo_id == rid)
        )).scalars().first()
        assert row is not None
        assert row.kind == "rename"

    @pytest.mark.asyncio
    async def test_T11_provider_returns_empty_when_no_parent(
        self, db_session: AsyncSession
    ) -> None:
        """Initial commit (no parent) must produce no events."""
        from musehub.services.musehub_intel_providers import DetectRefactorProvider

        repo = await create_repo(db_session, owner="rfp5", slug="rf-noparen")
        rid = str(repo.repo_id)
        snap_id = long_id("f" * 64)
        await session_insert_snapshot(db_session, rid, snap_id, {"src/a.py": _OBJ_1})
        await db_session.execute(
            pg_insert(MusehubCommit)
            .values(
                commit_id=_CID_A,
                branch="dev",
                parent_ids=[],  # ← no parent
                message="init",
                author="rfp5",
                timestamp=datetime(2026, 1, 1, tzinfo=timezone.utc),
                snapshot_id=snap_id,
            )
            .on_conflict_do_nothing()
        )
        await db_session.execute(
            pg_insert(MusehubCommitRef)
            .values(repo_id=rid, commit_id=_CID_A)
            .on_conflict_do_nothing()
        )
        await db_session.commit()
        results = await DetectRefactorProvider().compute(
            db_session, rid, _CID_A, {"owner": "rfp5", "slug": "rf-noparen"}
        )
        assert results == [], f"Expected [], got {results}"

    @pytest.mark.asyncio
    async def test_T12_provider_is_idempotent(
        self, db_session: AsyncSession
    ) -> None:
        """Running the provider twice must not create duplicate rows."""
        from musehub.services.musehub_intel_providers import DetectRefactorProvider

        repo = await create_repo(db_session, owner="rfp6", slug="rf-idem")
        rid = str(repo.repo_id)
        manifest = {"src/foo.py": _OBJ_1}
        await _seed_two_commits(db_session, rid, manifest, manifest, "rfp6", "rf-idem")
        body_h = long_id("b" * 64)
        head_tree = dict([_sym("src/foo.py", "bar", body_h, long_id("x" * 64))])
        parent_tree = dict([_sym("src/foo.py", "bar", long_id("y" * 64), long_id("x" * 64))])
        mock_backend = AsyncMock()
        mock_backend.get = AsyncMock(return_value=b"src")
        for _ in range(2):
            # Fresh patches and fresh tree copies on every run.
            with (
                patch("musehub.services.musehub_intel_providers.get_backend", return_value=mock_backend),
                patch("musehub.services.musehub_intel_providers.parse_symbols",
                      side_effect=[head_tree.copy(), parent_tree.copy(),
                                   head_tree.copy(), parent_tree.copy()]),
            ):
                await DetectRefactorProvider().compute(
                    db_session, rid, _CID_A, {"owner": "rfp6", "slug": "rf-idem"}
                )
        count = (await db_session.execute(
            sa.select(sa.func.count()).select_from(MusehubIntelRefactorEvent)
            .where(MusehubIntelRefactorEvent.repo_id == rid)
        )).scalar_one()
        assert count == 1, f"Expected 1 row after 2 runs, got {count}"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T3 — Route
# ─────────────────────────────────────────────────────────────────────────────
class TestRoute:
    """Layer T3: HTTP-level checks on ``/<owner>/<slug>/intel/refactoring``."""

    @pytest.mark.asyncio
    async def test_T13_refactor_page_returns_200(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """GET /rfuser/rf-e2e/intel/refactoring must return HTTP 200."""
        resp = await client.get("/rfuser/rf-e2e/intel/refactoring")
        assert resp.status_code == 200, resp.text[:500]

    @pytest.mark.asyncio
    async def test_T14_refactor_page_empty_state(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Route must render empty state when no event rows exist."""
        # Only the repo's existence matters here; the returned object is not needed.
        await create_repo(db_session, owner="rfempty", slug="rf-nodata")
        await db_session.commit()
        resp = await client.get("/rfempty/rf-nodata/intel/refactoring")
        assert resp.status_code == 200
        assert "Push a commit" in resp.text

    @pytest.mark.asyncio
    async def test_T15_refactor_page_404_for_unknown_repo(
        self, client: AsyncClient
    ) -> None:
        """Route must return 404 for an unknown repo slug."""
        resp = await client.get("/nobody/nonexistent-repo/intel/refactoring")
        assert resp.status_code == 404

    @pytest.mark.asyncio
    async def test_T16_kind_filter_limits_results(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """?kind=implementation must only show implementation events."""
        resp = await client.get("/rfuser/rf-e2e/intel/refactoring?kind=implementation")
        assert resp.status_code == 200
        html = resp.text
        # NOTE(review): this only proves implementation rows are present; it
        # does not prove that rows of other kinds are excluded.
        assert "implementation" in html

    @pytest.mark.asyncio
    async def test_T17_top_filter_limits_results(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """?top=2 must limit the event list to 2 rows."""
        resp = await client.get("/rfuser/rf-e2e/intel/refactoring?top=2")
        # NOTE(review): only the status code is checked; the row count is not.
        assert resp.status_code == 200

    @pytest.mark.asyncio
    async def test_T18_stat_chips_present_in_html(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """Response must include the total, implementation, and signature counts."""
        resp = await client.get("/rfuser/rf-e2e/intel/refactoring")
        html = resp.text
        assert "rf-stat-val" in html, "Missing stat chip value elements"

    @pytest.mark.asyncio
    async def test_T19_invalid_top_does_not_500(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """?top=GARBAGE must return 200 and fall back to the default top."""
        resp = await client.get("/rfuser/rf-e2e/intel/refactoring?top=GARBAGE")
        assert resp.status_code == 200, f"Expected 200 on bad top, got {resp.status_code}"
# ─────────────────────────────────────────────────────────────────────────────
# Layer T4 — E2E HTML
# ─────────────────────────────────────────────────────────────────────────────
class TestHTML:
    """Layer T4: content checks on the rendered refactoring and dashboard pages."""

    @pytest.mark.asyncio
    async def test_T20_kind_badges_appear_in_html(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """Every one of the four event kinds must show up in the page markup."""
        page = (await client.get("/rfuser/rf-e2e/intel/refactoring")).text
        expected_kinds = ("implementation", "signature", "move", "rename")
        for kind in expected_kinds:
            assert kind in page, f"Kind '{kind}' not found in HTML"

    @pytest.mark.asyncio
    async def test_T21_address_rendered_in_rows(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """A seeded event address must be present in the rendered page."""
        response = await client.get("/rfuser/rf-e2e/intel/refactoring")
        body = response.text
        assert "src/a.py" in body

    @pytest.mark.asyncio
    async def test_T22_stat_chip_values_match_db(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """The total of the five seeded events must be visible on the page."""
        response = await client.get("/rfuser/rf-e2e/intel/refactoring")
        body = response.text
        # NOTE(review): any "5" anywhere in the markup satisfies this check,
        # so it is a very loose proxy for the total-count chip.
        assert "5" in body

    @pytest.mark.asyncio
    async def test_T23_dashboard_link_present(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """The refactoring page must contain the 'Intel Hub' back-link text."""
        response = await client.get("/rfuser/rf-e2e/intel/refactoring")
        body = response.text
        assert "Intel Hub" in body

    @pytest.mark.asyncio
    async def test_T24_dashboard_card_present(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """The intel dashboard must mention the Detect Refactor card."""
        response = await client.get("/rfuser/rf-e2e/intel")
        assert response.status_code == 200
        body = response.text
        assert "Detect Refactor" in body or "refactoring" in body
# ─────────────────────────────────────────────────────────────────────────────
# Layer T5 — Data integrity
# ─────────────────────────────────────────────────────────────────────────────
class TestDataIntegrity:
    """Layer T5: row-level guarantees of the refactor-events table."""

    @staticmethod
    async def _event_count(session: AsyncSession, repo_id: str) -> int:
        """Return the number of refactor-event rows stored for *repo_id*."""
        query = (
            sa.select(sa.func.count())
            .select_from(MusehubIntelRefactorEvent)
            .where(MusehubIntelRefactorEvent.repo_id == repo_id)
        )
        outcome = await session.execute(query)
        return outcome.scalar_one()

    @pytest.mark.asyncio
    async def test_T25_upsert_is_idempotent(
        self, db_session: AsyncSession
    ) -> None:
        """Upserting one event_id twice must leave a single row."""
        repo = await create_repo(db_session, owner="rfdi1", slug="rf-upsert")
        repo_key = str(repo.repo_id)
        duplicate_id = long_id("e" * 64)
        await _insert_event(db_session, repo_key, event_id=duplicate_id)
        await _insert_event(db_session, repo_key, event_id=duplicate_id)
        await db_session.commit()
        total = await self._event_count(db_session, repo_key)
        assert total == 1

    @pytest.mark.asyncio
    async def test_T26_cross_repo_isolation(
        self, db_session: AsyncSession
    ) -> None:
        """Each repo must see only its own events."""
        first = await create_repo(db_session, owner="rfdi2a", slug="rf-iso-a")
        second = await create_repo(db_session, owner="rfdi2b", slug="rf-iso-b")
        first_key = str(first.repo_id)
        second_key = str(second.repo_id)
        await _insert_event(db_session, first_key, event_id=long_id("a" * 64))
        await _insert_event(db_session, second_key, event_id=long_id("b" * 64))
        await db_session.commit()
        first_total = await self._event_count(db_session, first_key)
        second_total = await self._event_count(db_session, second_key)
        assert first_total == 1
        assert second_total == 1

    @pytest.mark.asyncio
    async def test_T27_commit_message_stored(
        self, db_session: AsyncSession
    ) -> None:
        """A commit_message written with an event must be read back unchanged."""
        repo = await create_repo(db_session, owner="rfdi3", slug="rf-msg")
        repo_key = str(repo.repo_id)
        await _insert_event(
            db_session,
            repo_key,
            event_id=long_id("m" * 64),
            commit_message="feat: spectacular refactor",
        )
        await db_session.commit()
        lookup = sa.select(MusehubIntelRefactorEvent).where(
            MusehubIntelRefactorEvent.repo_id == repo_key
        )
        stored = (await db_session.execute(lookup)).scalars().first()
        assert stored is not None
        assert stored.commit_message == "feat: spectacular refactor"

    @pytest.mark.asyncio
    async def test_T28_cascade_delete_removes_events(
        self, db_session: AsyncSession
    ) -> None:
        """Removing a repo must remove every refactor event that belongs to it."""
        repo = await create_repo(db_session, owner="rfdi4", slug="rf-cascade")
        repo_key = str(repo.repo_id)
        await _insert_event(db_session, repo_key, event_id=long_id("z" * 64))
        await db_session.commit()
        await db_session.delete(repo)
        await db_session.commit()
        remaining = await self._event_count(db_session, repo_key)
        assert remaining == 0
# ─────────────────────────────────────────────────────────────────────────────
# Layer T6 — Performance
# ─────────────────────────────────────────────────────────────────────────────
class TestPerformance:
    """Layer T6: coarse wall-clock budgets for the provider, route, and inserts."""

    @pytest.mark.asyncio
    async def test_T29_provider_completes_under_5s(
        self, db_session: AsyncSession
    ) -> None:
        """Provider must finish within 5 seconds on 20-symbol trees over 10 files."""
        from musehub.services.musehub_intel_providers import DetectRefactorProvider

        repo = await create_repo(db_session, owner="rfperf1", slug="rf-perf")
        repo_key = str(repo.repo_id)
        manifest = {
            f"src/file_{i}.py": long_id(f"{'0' * 63}{i}") for i in range(10)
        }
        await _seed_two_commits(
            db_session, repo_key, manifest, manifest, "rfperf1", "rf-perf"
        )

        def _make_tree(prefix: str) -> JSONObject:
            # 20 symbols spread over the 10 files; only the body hash depends
            # on *prefix*, so HEAD and parent differ in every symbol's body.
            return dict(
                _sym(
                    f"src/file_{i % 10}.py",
                    f"fn_{i}",
                    long_id(prefix * 64),
                    long_id("s" * 64),
                )
                for i in range(20)
            )

        head_tree = _make_tree("a")
        parent_tree = _make_tree("b")
        backend = AsyncMock()
        backend.get = AsyncMock(return_value=b"src")
        started = time.monotonic()
        with (
            patch("musehub.services.musehub_intel_providers.get_backend", return_value=backend),
            patch("musehub.services.musehub_intel_providers.parse_symbols",
                  side_effect=[head_tree.copy(), parent_tree.copy()] * 20),
        ):
            await DetectRefactorProvider().compute(
                db_session, repo_key, _CID_A, {"owner": "rfperf1", "slug": "rf-perf"}
            )
        elapsed = time.monotonic() - started
        assert elapsed < 5.0, f"Provider took {elapsed:.2f}s — exceeds 5s budget"

    @pytest.mark.asyncio
    async def test_T30_route_responds_under_500ms(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """The refactoring page must answer within 500 ms for the 5 seeded rows."""
        started = time.monotonic()
        response = await client.get("/rfuser/rf-e2e/intel/refactoring")
        elapsed = (time.monotonic() - started) * 1000
        assert response.status_code == 200
        assert elapsed < 500, f"Route took {elapsed:.0f}ms — exceeds 500ms budget"

    @pytest.mark.asyncio
    async def test_T31_bulk_insert_500_events(
        self, db_session: AsyncSession
    ) -> None:
        """500 single-row upserts plus the commit must finish within 10 seconds."""
        repo = await create_repo(db_session, owner="rfperf2", slug="rf-bulk")
        repo_key = str(repo.repo_id)
        started = time.monotonic()
        for n in range(500):
            await _insert_event(
                db_session,
                repo_key,
                event_id=long_id(f"{'0' * 60}{n:04d}"),
                address=f"src/f{n}.py::fn",
            )
        await db_session.commit()
        elapsed = time.monotonic() - started
        assert elapsed < 10.0, f"500-row insert took {elapsed:.2f}s"
        tally = sa.select(sa.func.count()).select_from(MusehubIntelRefactorEvent).where(
            MusehubIntelRefactorEvent.repo_id == repo_key
        )
        stored = (await db_session.execute(tally)).scalar_one()
        assert stored == 500
# ─────────────────────────────────────────────────────────────────────────────
# Layer T7 — Security
# ─────────────────────────────────────────────────────────────────────────────
class TestSecurity:
    """Layer T7: hostile input must neither be rendered raw nor cause a 500."""

    @pytest.mark.asyncio
    async def test_T32_xss_in_address_is_escaped(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """address containing HTML must be escaped — raw tags must not appear."""
        repo = await create_repo(db_session, owner="rfxss", slug="rf-xss")
        rid = str(repo.repo_id)
        # The payload must be a non-empty tag: asserting that the empty string
        # is absent from the response can never pass.
        payload = "<script>alert(1)</script>"
        await _insert_event(
            db_session, rid,
            event_id=long_id("x" * 64),
            address=f"src/{payload}.py::fn",
        )
        await db_session.commit()
        resp = await client.get("/rfxss/rf-xss/intel/refactoring")
        assert resp.status_code == 200
        assert payload not in resp.text

    @pytest.mark.asyncio
    async def test_T33_sql_injection_in_top_param_returns_200(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """?top=1;DROP TABLE must return 200 and fall back to default top."""
        resp = await client.get(
            "/rfuser/rf-e2e/intel/refactoring",
            params={"top": "1;DROP TABLE musehub_intel_refactor_events;--"},
        )
        assert resp.status_code == 200

    @pytest.mark.asyncio
    async def test_T34_unknown_kind_filter_returns_200(
        self, client: AsyncClient, rf_repo: MusehubRepo
    ) -> None:
        """?kind=INVALID must return 200 with an empty or full result set."""
        resp = await client.get("/rfuser/rf-e2e/intel/refactoring?kind=INVALID")
        assert resp.status_code == 200
# ─────────────────────────────────────────────────────────────────────────────
# Helpers used by T11 only
# ─────────────────────────────────────────────────────────────────────────────
async def session_insert_snapshot(
    session: AsyncSession,
    repo_id: str,
    snap_id: str,
    manifest: dict[str, str],
) -> None:
    """Insert a snapshot row and link it to a repo, ignoring existing rows.

    The manifest is msgpack-encoded into ``manifest_blob`` and the snapshot is
    stamped 2026-01-01 UTC. Both inserts use ``ON CONFLICT DO NOTHING``.
    Nothing is committed; the caller commits.

    Args:
        session: Session used to execute the two inserts.
        repo_id: Repository the snapshot ref is attached to.
        snap_id: ID of the snapshot row.
        manifest: Path -> object-id mapping stored in the snapshot.
    """
    snapshot_stmt = (
        pg_insert(MusehubSnapshot)
        .values(
            snapshot_id=snap_id,
            directories=[],
            manifest_blob=msgpack.packb(manifest),
            entry_count=len(manifest),
            created_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
        )
        .on_conflict_do_nothing()
    )
    link_stmt = (
        pg_insert(MusehubSnapshotRef)
        .values(repo_id=repo_id, snapshot_id=snap_id)
        .on_conflict_do_nothing()
    )
    # Snapshot first, then the repo link — same order as before.
    await session.execute(snapshot_stmt)
    await session.execute(link_stmt)