"""Pytest configuration and fixtures."""
from __future__ import annotations
from pathlib import Path
import logging
import os
import typing
from collections.abc import AsyncGenerator, Generator
# Fall back to the "test" environment when MUSE_ENV is unset or empty.
# NOTE(review): this sits ahead of the musehub imports, presumably so that
# settings read at import time see the value — confirm before moving it.
os.environ["MUSE_ENV"] = os.environ.get("MUSE_ENV") or "test"
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.pool import NullPool
from musehub.core.genesis import compute_identity_id
from musehub.db import database
from musehub.db.database import Base, get_db
from musehub.db.musehub_identity_models import MusehubIdentity
from musehub.types.json_types import JSONValue
# Force all ORM models into Base.metadata before any create_all/drop_all.
# muse_cli_models is only imported inside init_db() in production; without
# this explicit import, Base.metadata is non-deterministic in tests (depends
# on import order), causing drop_all to miss tables that create_all later
# tries to create — resulting in duplicate-key errors on pg_type.
import musehub.db.muse_cli_models as _muse_cli_models # noqa: F401
from musehub.auth.request_signing import MSignContext, optional_signed_request, require_signed_request
from musehub.main import app
from musehub.rate_limits import limiter
# Flat payload mapping: string keys to scalar (str / int / bool / None) values.
# NOTE(review): not referenced in the visible part of this file — confirm it is
# still needed.
type _JobPayload = dict[str, str | int | bool | None]
import musehub.auth.failure_limiter as _failure_limiter
@pytest.fixture()
def _stub_push_background_tasks(monkeypatch: pytest.MonkeyPatch) -> None:
    """Swap ``enqueue_push_intel`` for a recording spy (opt-in only).

    Request this fixture explicitly from tests that only need to assert that
    ``enqueue_push_intel`` was invoked. Tests that exercise the enqueue logic
    itself should NOT use it; they should run the real implementation and
    verify DB state.

    The spy appends ``(repo_id, "enqueue_push_intel", {...})`` tuples to
    ``musehub_jobs._test_enqueued_calls``, which is emptied on setup.
    """
    import musehub.services.musehub_jobs as jobs_mod
    import musehub.services.musehub_wire as wire_mod

    # Start from an empty call log so earlier tests cannot leak into this one.
    jobs_mod._test_enqueued_calls.clear()

    async def _record_enqueue(
        session: AsyncSession,
        repo_id: str,
        head: str,
        domain_id: str | None = None,
        branch: str = "",
        owner: str | None = None,
    ) -> None:
        # ``session`` and ``owner`` are accepted for signature compatibility
        # but deliberately not recorded.
        details = {"head": head, "domain_id": domain_id, "branch": branch}
        jobs_mod._test_enqueued_calls.append((repo_id, "enqueue_push_intel", details))

    # Patch both the defining module and the wire module's own binding.
    for target in (jobs_mod, wire_mod):
        monkeypatch.setattr(target, "enqueue_push_intel", _record_enqueue)
@pytest.fixture(autouse=True)
def _tmp_objects_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """Redirect object storage and on-disk paths to per-test throwaway locations.

    For every test this fixture:

    * creates one fresh ``MemoryBackend`` and patches ``get_backend`` in each
      module listed below so they all return that same instance (the modules
      are patched individually, presumably because each binds ``get_backend``
      by name at import time);
    * patches ``deploy.repair_objects.get_backend`` too when that module is
      importable, and silently skips it otherwise;
    * points ``settings.musehub_repos_dir`` at ``tmp_path``;
    * re-targets the ``releases`` static-files mount at ``tmp_path/releases``.

    All ``monkeypatch`` changes are undone automatically after the test.
    """
    import importlib

    import musehub.storage.backends as _backends_mod
    import musehub.services.musehub_wire as _wire_svc
    import musehub.services.musehub_wire_fetch as _wire_fetch_svc
    import musehub.services.musehub_wire_push as _wire_push_svc
    import musehub.services.musehub_wire_shared as _wire_shared_svc
    import musehub.services.musehub_gc as _gc_svc
    import musehub.api.routes.wire as _wire_route
    from musehub.config import settings
    from musehub.storage.backends import MemoryBackend

    test_backend = MemoryBackend()

    def _backend_fn(*_args: object, **_kwargs: object) -> MemoryBackend:
        # Ignore whatever the caller asked for; always hand out the test backend.
        return test_backend

    patched_modules = (
        _backends_mod,
        _wire_svc,
        _wire_fetch_svc,
        _wire_push_svc,
        _wire_shared_svc,
        _gc_svc,
        _wire_route,
    )
    for module in patched_modules:
        monkeypatch.setattr(module, "get_backend", _backend_fn)

    # The deploy package is optional; patch it only when it can be imported.
    try:
        _repair_mod = importlib.import_module("deploy.repair_objects")
        monkeypatch.setattr(_repair_mod, "get_backend", _backend_fn)
    except ModuleNotFoundError:
        pass

    monkeypatch.setattr(settings, "musehub_repos_dir", str(tmp_path))

    # Redirect the /releases StaticFiles mount to a temp dir so tests that
    # hit /releases/* don't fail because /data/releases doesn't exist locally.
    releases_dir = f"{tmp_path}/releases"
    os.makedirs(releases_dir, exist_ok=True)
    for _route in app.routes:
        if getattr(_route, "name", None) == "releases":
            _static = _route.app  # type: ignore[attr-defined]
            _static.directory = releases_dir
            _static.config_checked = False  # force re-check with new dir
            break
def pytest_configure(config: pytest.Config) -> None:
    """Default ``asyncio_mode`` to ``"auto"`` and quieten chatty library loggers.

    The mode is only filled in when the option exists and is still ``None``
    (e.g. in Docker when pyproject is not in cwd); an explicit setting is
    left untouched.
    """
    options = config.option
    # A missing attribute yields False here, so only a real ``None`` matches.
    if getattr(options, "asyncio_mode", False) is None:
        options.asyncio_mode = "auto"

    # Suppress verbose library loggers that flood the test output with DEBUG lines.
    noisy_loggers = ("httpcore", "httpx", "sqlalchemy", "asyncio", "faker")
    for logger_name in noisy_loggers:
        logging.getLogger(logger_name).setLevel(logging.WARNING)
@pytest.fixture(autouse=True)
def reset_rate_limiter() -> Generator[None, None, None]:
    """Give every test a clean rate-limit slate.

    The limiter's in-memory storage is shared for the whole session, so hits
    would otherwise pile up across tests. Auth endpoints cap at 20/minute;
    running 30+ auth tests back-to-back exhausts that budget and produces
    429s for legitimate calls. The failure limiter's record is emptied too.
    """
    # Setup only — nothing needs undoing after the test.
    limiter.reset()
    _failure_limiter._failures.clear()
    yield
@pytest.fixture
def anyio_backend() -> str:
    """Name the async backend for anyio-driven tests: always ``"asyncio"``."""
    return "asyncio"
# Fake signed-request context handed out by the ``wire_headers`` fixture:
# a fixed, non-agent, non-admin identity.
_WIRE_CONTEXT = MSignContext(
    handle="test-user-wire",
    identity_id="wire-test-user-id",
    is_agent=False,
    is_admin=False,
)
@pytest.fixture
def wire_headers() -> Generator[dict[str, str], None, None]:
    """Authenticate wire-protocol tests as ``_WIRE_CONTEXT`` and yield msgpack headers.

    Both signed-request dependencies are overridden on the app for the
    duration of the test and removed again afterwards.
    """
    overrides = {
        require_signed_request: lambda: _WIRE_CONTEXT,
        optional_signed_request: lambda: _WIRE_CONTEXT,
    }
    app.dependency_overrides.update(overrides)

    msgpack = "application/x-msgpack"
    yield {"Content-Type": msgpack, "Accept": msgpack}

    # Tolerate the entries already being gone (e.g. cleared by another fixture).
    for dependency in overrides:
        app.dependency_overrides.pop(dependency, None)
@pytest.fixture(autouse=True)
def _reset_variation_store() -> Generator[None, None, None]:
    """Reset the singleton VariationStore after each test to stop cross-test pollution.

    Does nothing when the variation module is not installed (it may have been
    removed as part of the MuseHub extraction).
    """
    yield
    # Teardown: a missing module is an expected, silent no-op.
    with _contextlib.suppress(ModuleNotFoundError):
        from musehub.variation.storage.variation_store import reset_variation_store

        reset_variation_store()
# Async (asyncpg) URL of the test database; override via TEST_DATABASE_URL.
_TEST_DATABASE_URL = os.getenv(
    "TEST_DATABASE_URL",
    "postgresql+asyncpg://musehub:musehub@localhost:5434/musehub_test",
)

# Same URL without the async driver suffix, for the sync (psycopg2) engine
# used by the session-scoped schema fixture.
_TEST_DATABASE_URL_SYNC = _TEST_DATABASE_URL.replace("+asyncpg", "")

# One async engine shared by the whole test session. NullPool means
# connections are never reused between checkouts, so sharing the engine
# object itself is safe and saves rebuilding it for every test.
_TEST_ENGINE = create_async_engine(_TEST_DATABASE_URL, poolclass=NullPool)
_TEST_SESSION_FACTORY = async_sessionmaker(
    _TEST_ENGINE,
    class_=AsyncSession,
    expire_on_commit=False,
)

# TRUNCATE statement covering every mapped table, built once at import time
# instead of per test. Tables are listed in reverse of sorted_tables, which
# respects FK dependency order.
_TRUNCATE_SQL = (
    f"TRUNCATE {', '.join(t.name for t in reversed(Base.metadata.sorted_tables))} "
    "RESTART IDENTITY CASCADE"
)
@pytest.fixture(scope="session", autouse=True)
def _db_schema() -> Generator[None, None, None]:
    """Create the test schema once per test session using a sync psycopg2 engine.

    This replaces per-test drop_all/create_all (which took ~3 s per test on
    PostgreSQL) with a single DDL pass at session start and end. Individual
    tests get a clean slate via TRUNCATE in the db_session fixture instead.

    Setup: terminate other backends, drop_all, create_all, seed claim types.
    Teardown: terminate other backends, drop_all.

    Every step runs on its own short-lived engine that is disposed even when
    the step raises, so a failing DDL pass does not leave connections behind.
    """
    import contextlib

    from sqlalchemy import create_engine as _create_engine
    from sqlalchemy import text as _text
    from sqlalchemy.engine import Engine as _Engine

    terminate_others_sql = _text(
        "SELECT pg_terminate_backend(pid) FROM pg_stat_activity "
        "WHERE datname = current_database() AND pid != pg_backend_pid()"
    )

    @contextlib.contextmanager
    def _throwaway_engine() -> Generator[_Engine, None, None]:
        """Yield a fresh sync engine and always dispose it afterwards."""
        # connect_timeout=10: if postgres is unreachable or still starting (e.g.
        # Docker container not ready), fail fast instead of blocking in C forever.
        # Without this, Ctrl+C cannot kill the process because psycopg2's socket
        # read is a non-interruptible C-level call.
        engine = _create_engine(
            _TEST_DATABASE_URL_SYNC,
            connect_args={"connect_timeout": 10},
        )
        try:
            yield engine
        finally:
            engine.dispose()

    def _terminate_other_backends() -> None:
        """Kill every other connection to the test database."""
        with _throwaway_engine() as engine, engine.connect() as conn:
            conn.execute(terminate_others_sql)
            conn.commit()

    def _drop_schema() -> None:
        """Drop every table known to ``Base.metadata``."""
        with _throwaway_engine() as engine:
            Base.metadata.drop_all(engine)

    # Terminate any leftover connections from interrupted test runs before
    # running drop_all. If a previous pytest session was killed with SIGQUIT
    # (Ctrl+\) it leaves postgres backends idle-in-transaction holding locks on
    # the test tables. drop_all then waits forever for those locks, which
    # makes the next test run freeze with Ctrl+C unresponsive.
    _terminate_other_backends()

    # Each following step gets a brand-new engine, so it never picks up a
    # connection that the termination above may have killed.
    _drop_schema()

    with _throwaway_engine() as engine:
        Base.metadata.create_all(engine)

        # Seed claim types (mirrors alembic/versions/0043 seed logic).
        # Imported only here, after create_all, exactly as before.
        from musehub.services.musehub_attestations import _CLAIM_TYPES

        seed_sql = _text(
            "INSERT INTO musehub_attestation_claim_types "
            "(type_key, category, label, description, valid_scopes, introduced_at) "
            "VALUES (:key, :cat, :label, :desc, :scopes, NOW()) "
            "ON CONFLICT (type_key) DO NOTHING"
        )
        with engine.connect() as conn:
            for ct in _CLAIM_TYPES.values():
                conn.execute(seed_sql, {
                    "key": ct["type_key"],
                    "cat": ct["category"],
                    "label": ct["label"],
                    "desc": ct["description"],
                    "scopes": ct["valid_scopes"],
                })
            conn.commit()

    yield

    # Session teardown: clear out stragglers, then remove the schema.
    _terminate_other_backends()
    _drop_schema()
@pytest_asyncio.fixture
async def db_session(_db_schema: None) -> AsyncGenerator[AsyncSession, None]:
    """Yield a session on a freshly emptied database.

    Tables are truncated (not dropped/recreated) between tests — a single
    TRUNCATE … CASCADE is ~100× faster than drop_all + create_all on
    PostgreSQL, cutting per-test overhead from ~3 s to ~30 ms.

    While the fixture is active, ``musehub.db.database`` is pointed at the
    test engine and session factory, and the app's ``get_db`` dependency is
    overridden. On teardown all dependency overrides are cleared and the
    previous engine and factory are put back.
    """
    from sqlalchemy import text as _text
    from musehub.services.musehub_attestations import _CLAIM_TYPES

    terminate_others = _text(
        "SELECT pg_terminate_backend(pid) FROM pg_stat_activity "
        "WHERE datname = current_database() AND pid != pg_backend_pid()"
    )
    seed_claim_type = _text(
        "INSERT INTO musehub_attestation_claim_types "
        "(type_key, category, label, description, valid_scopes, introduced_at) "
        "VALUES (:key, :cat, :label, :desc, :scopes, NOW()) "
        "ON CONFLICT (type_key) DO NOTHING"
    )
    # SQL bind name -> key in each claim-type record.
    bind_to_field = {
        "key": "type_key",
        "cat": "category",
        "label": "label",
        "desc": "description",
        "scopes": "valid_scopes",
    }

    async with _TEST_ENGINE.begin() as conn:
        # Terminate ALL other backends before TRUNCATE. A failed test can
        # leave a connection in any state (idle in transaction, idle in
        # transaction (aborted), active) — filtering by state misses some
        # cases and causes deadlocks when TRUNCATE races the stale transaction.
        await conn.execute(terminate_others)
        await conn.execute(_text(_TRUNCATE_SQL))
        # Re-seed reference tables that are wiped by TRUNCATE CASCADE.
        for claim in _CLAIM_TYPES.values():
            params = {bind: claim[field] for bind, field in bind_to_field.items()}
            await conn.execute(seed_claim_type, params)

    async def override_get_db() -> AsyncGenerator[AsyncSession, None]:
        # Each request gets its own session so concurrent requests
        # (e.g. stress tests) don't share a single connection and
        # raise "concurrent operations are not permitted".
        # All test setup data is committed, so independent sessions
        # see it without needing to share the test session.
        async with _TEST_SESSION_FACTORY() as req_session:
            yield req_session

    # Remember the module-level engine and factory so they can be restored.
    saved_engine, saved_factory = database._engine, database._async_session_factory
    database._engine, database._async_session_factory = _TEST_ENGINE, _TEST_SESSION_FACTORY
    try:
        async with _TEST_SESSION_FACTORY() as session:
            app.dependency_overrides[get_db] = override_get_db
            yield session
            app.dependency_overrides.clear()
    finally:
        database._engine = saved_engine
        database._async_session_factory = saved_factory
@pytest_asyncio.fixture
async def session_factory(_db_schema: None) -> async_sessionmaker:
    """Hand out the shared test session factory.

    For tests that need to open several sessions concurrently; the schema
    fixture is requested so the tables exist first.
    """
    factory = _TEST_SESSION_FACTORY
    return factory
class _Asgi24Wrapper:
    """ASGI middleware that stamps ``spec_version='2.4'`` on every HTTP scope.

    ASGI 2.4 tells Starlette to skip listen_for_disconnect on streaming responses,
    which is required for correct behaviour with async body reads. Non-HTTP
    scopes are forwarded untouched.
    """

    def __init__(self, app: typing.Any) -> None:
        # The wrapped ASGI application that every call is forwarded to.
        self._app = app

    async def __call__(
        self,
        scope: typing.MutableMapping[str, typing.Any],
        receive: typing.Any,
        send: typing.Any,
    ) -> None:
        is_http = scope.get("type") == "http"
        if is_http:
            # Reuse an existing "asgi" mapping (keeping its other keys) or
            # create one, then force the spec version.
            asgi_info = scope.setdefault("asgi", {})
            asgi_info["spec_version"] = "2.4"
        await self._app(scope, receive, send)
@pytest_asyncio.fixture
async def client(db_session: AsyncSession) -> AsyncGenerator[AsyncClient, None]:
    """Yield an in-process HTTP client bound to the app.

    Depends on ``db_session`` so that the auth revocation check uses the test
    DB. The app is wrapped in ``_Asgi24Wrapper`` before being handed to the
    ASGI transport.
    """
    wrapped_app = _Asgi24Wrapper(app)
    asgi_transport = ASGITransport(app=wrapped_app)
    async with AsyncClient(transport=asgi_transport, base_url="http://test") as http_client:
        yield http_client
# -----------------------------------------------------------------------------
# Auth fixtures for API contract and integration tests
# Uses dependency_overrides to inject a fake MSignContext so tests don't need
# real Ed25519 key pairs. Only active for tests that request auth_headers.
# -----------------------------------------------------------------------------
# Identity id computed from the handle's bytes — the same call that
# ``_make_repo_row`` makes for a repo owner.
_TEST_IDENTITY_ID = compute_identity_id(b"testuser")
_TEST_HANDLE = "testuser"
# Fake signed-request context injected by the ``auth_headers`` fixture:
# a non-agent, non-admin caller matching the ``test_user`` row.
_TEST_CONTEXT = MSignContext(
    handle=_TEST_HANDLE,
    identity_id=_TEST_IDENTITY_ID,
    is_agent=False,
    is_admin=False,
)
@pytest_asyncio.fixture
async def test_user(db_session: AsyncSession) -> MusehubIdentity:
    """Insert and return the human identity that ``_TEST_CONTEXT`` refers to.

    Used by authenticated route tests; the row is committed before the
    fixture returns.
    """
    user_row = MusehubIdentity(
        identity_id=_TEST_IDENTITY_ID,
        handle=_TEST_HANDLE,
        display_name="Test User",
        identity_type="human",
    )
    db_session.add(user_row)
    await db_session.commit()

    await db_session.refresh(user_row)
    # Close the autobegin transaction started by refresh() so subsequent
    # test-body commits don't hit "another operation is in progress".
    await db_session.commit()
    return user_row
@pytest.fixture
def auth_headers(test_user: MusehubIdentity) -> Generator[dict[str, str], None, None]:
    """Authenticate every request as ``_TEST_CONTEXT`` and yield JSON headers.

    The overrides are installed on the app itself, so while this fixture is
    active ANY request made in the same test is treated as authenticated —
    whether or not the yielded headers are passed. Tests that need to verify
    401 behaviour for unauthenticated requests, or to mix authed and unauthed
    flows in one function, should set ``app.dependency_overrides`` directly
    or be split into two test functions.
    """
    overrides = {
        require_signed_request: lambda: _TEST_CONTEXT,
        optional_signed_request: lambda: _TEST_CONTEXT,
    }
    app.dependency_overrides.update(overrides)

    yield {"Content-Type": "application/json"}

    # Tolerate the entries already being gone (e.g. cleared by another fixture).
    for dependency in overrides:
        app.dependency_overrides.pop(dependency, None)
# ---------------------------------------------------------------------------
# Symbol-detail fixtures
# Used by test_symbol_detail_phase1.py (T2–T7 tiers).
# ---------------------------------------------------------------------------
import datetime as _dt
import contextlib as _contextlib
import time as _time
from muse.core.types import blob_id
def _utc_now() -> _dt.datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    utc = _dt.timezone.utc
    return _dt.datetime.now(tz=utc)
async def _make_repo_row(session: AsyncSession, owner: str, slug: str) -> "MusehubRepo":
    """Insert and commit a public repo row for ``owner``/``slug``; return it.

    The owner's identity id is computed from the encoded owner string. The
    repo id is computed from that id, the slug, the literal ``"code"`` and
    the creation timestamp, so it differs from call to call.
    """
    from musehub.db.musehub_repo_models import MusehubRepo
    from musehub.core.genesis import compute_identity_id, compute_repo_id

    created_at = _utc_now()
    owner_user_id = compute_identity_id(owner.encode())

    repo = MusehubRepo(
        repo_id=compute_repo_id(owner_user_id, slug, "code", created_at.isoformat()),
        name=slug,
        owner=owner,
        slug=slug,
        visibility="public",
        owner_user_id=owner_user_id,
        description="",
        tags=[],
        created_at=created_at,
    )
    session.add(repo)
    await session.commit()
    return repo
async def _make_commit_row(session: AsyncSession, repo_id: str, commit_id: str, **kwargs: JSONValue) -> None:
    """Insert a commit plus the ref row linking it to ``repo_id``, then commit.

    Any keyword argument overrides the matching default column value below.
    """
    from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef

    # Defaults first; caller-supplied kwargs win on key clashes.
    columns = {
        "commit_id": commit_id,
        "branch": "dev",
        "parent_ids": [],
        "message": "feat: test",
        "author": "gabriel",
        "timestamp": _utc_now(),
        "snapshot_id": blob_id(f"snap-{commit_id}".encode()),
        "agent_id": "claude-code",
        "model_id": "claude-sonnet-4-6",
        "commit_branch": "task/test",
        "signature": "",
        **kwargs,
    }
    session.add(MusehubCommit(**columns))
    # Flush the commit row on its own before the ref that points at it is added.
    await session.flush()

    session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id))
    await session.commit()
async def _make_history_entry(
    session: AsyncSession, repo_id: str, address: str, commit_id: str,
    op: str = "add", content_id: str | None = None,
    committed_at: _dt.datetime | None = None,
    message: str | None = None,
) -> None:
    """Insert and commit one symbol-history row for ``address`` at ``commit_id``.

    A falsy ``committed_at`` becomes the current UTC time; a falsy
    ``content_id`` is derived from the address and commit id.
    """
    from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry

    when = committed_at or _utc_now()
    body_id = content_id or blob_id(f"body-{address}-{commit_id}".encode())

    entry = MusehubSymbolHistoryEntry(
        repo_id=repo_id,
        address=address,
        commit_id=commit_id,
        committed_at=when,
        author="gabriel",
        op=op,
        content_id=body_id,
        message=message,
    )
    session.add(entry)
    await session.commit()
@pytest_asyncio.fixture
async def repo_fixture(db_session: AsyncSession) -> tuple[str, str]:
    """Create an empty repo (no symbol history) and return ``(owner, slug)``."""
    owner = "gabriel"
    repo = await _make_repo_row(db_session, owner, "test-repo")
    return (owner, repo.slug)
@pytest_asyncio.fixture
async def seed_symbol(db_session: AsyncSession) -> tuple[str, str, str]:
    """Seed a repo with one commit and one history entry for a single symbol.

    Returns ``(owner, slug, address)``.
    """
    owner = "gabriel"
    slug = "seed-repo"
    address = "src/core.py::compute"

    repo = await _make_repo_row(db_session, owner, slug)
    repo_id = repo.repo_id
    commit_id = blob_id(f"commit-seed-{slug}".encode())

    await _make_commit_row(db_session, repo_id, commit_id)
    await _make_history_entry(db_session, repo_id, address, commit_id)
    return (owner, slug, address)
@pytest_asyncio.fixture
async def seed_type_intel(db_session: AsyncSession, seed_symbol: tuple[str, str, str]) -> None:
    """Attach one committed ``MusehubIntelType`` row to the seeded symbol."""
    from musehub.db.musehub_intel_models import MusehubIntelType
    from musehub.db.musehub_repo_models import MusehubRepo
    from sqlalchemy import select

    owner, slug, address = seed_symbol

    # Look the repo up again: seed_symbol only hands back (owner, slug, address).
    repo_query = select(MusehubRepo).where(MusehubRepo.owner == owner, MusehubRepo.slug == slug)
    repo = (await db_session.execute(repo_query)).scalar_one()

    type_row = MusehubIntelType(
        repo_id=repo.repo_id,
        address=address,
        kind="function",
        return_is_any=False,
        params_total=2,
        params_annotated=2,
        params_with_any=0,
        type_score=0.95,
        ref="dev",
    )
    db_session.add(type_row)
    await db_session.commit()
@pytest_asyncio.fixture
async def seed_sym_intel(db_session: AsyncSession, seed_symbol: tuple[str, str, str]) -> None:
    """Attach one committed ``MusehubSymbolIntel`` row to the seeded symbol."""
    from musehub.db.musehub_intel_models import MusehubSymbolIntel
    from musehub.db.musehub_repo_models import MusehubRepo
    from sqlalchemy import select

    owner, slug, address = seed_symbol

    # Look the repo up again: seed_symbol only hands back (owner, slug, address).
    repo_query = select(MusehubRepo).where(MusehubRepo.owner == owner, MusehubRepo.slug == slug)
    repo = (await db_session.execute(repo_query)).scalar_one()

    intel_row = MusehubSymbolIntel(
        repo_id=repo.repo_id,
        address=address,
        churn=5,
        churn_30d=2,
        churn_90d=4,
        blast=3,
        blast_direct=2,
        blast_cross=1,
        blast_top=[],
        author_count=1,
        gravity=0.1,
        weekly=[],
        gravity_pct=10.0,
        gravity_direct_dependents=2,
        gravity_transitive_dependents=3,
        gravity_max_depth=2,
    )
    db_session.add(intel_row)
    await db_session.commit()
@pytest_asyncio.fixture
async def seed_api_intel(db_session: AsyncSession, seed_symbol: tuple[str, str, str]) -> None:
    """Attach one committed ``MusehubIntelApiSurface`` row to the seeded symbol."""
    from musehub.db.musehub_intel_models import MusehubIntelApiSurface
    from musehub.db.musehub_repo_models import MusehubRepo
    from sqlalchemy import select

    owner, slug, address = seed_symbol

    # Look the repo up again: seed_symbol only hands back (owner, slug, address).
    repo_query = select(MusehubRepo).where(MusehubRepo.owner == owner, MusehubRepo.slug == slug)
    repo = (await db_session.execute(repo_query)).scalar_one()

    surface_row = MusehubIntelApiSurface(
        repo_id=repo.repo_id,
        address=address,
        kind="function",
        visibility="public",
        ref="dev",
    )
    db_session.add(surface_row)
    await db_session.commit()
@pytest_asyncio.fixture
async def seed_many_refactor_events(db_session: AsyncSession, seed_symbol: tuple[str, str, str]) -> None:
    """Attach 25 ``MusehubIntelRefactorEvent`` rows to the seeded symbol.

    All rows are added first and committed together.
    """
    from musehub.db.musehub_intel_models import MusehubIntelRefactorEvent
    from musehub.db.musehub_repo_models import MusehubRepo
    from sqlalchemy import select

    owner, slug, address = seed_symbol

    # Look the repo up again: seed_symbol only hands back (owner, slug, address).
    repo_query = select(MusehubRepo).where(MusehubRepo.owner == owner, MusehubRepo.slug == slug)
    repo = (await db_session.execute(repo_query)).scalar_one()

    events = [
        MusehubIntelRefactorEvent(
            event_id=blob_id(f"refactor-{n}-{slug}".encode()),
            repo_id=repo.repo_id,
            kind="implementation",
            address=address,
            detail=f"refactor event {n}",
            commit_id=blob_id(f"rc-{n}".encode()),
            committed_at=_utc_now(),
        )
        for n in range(25)
    ]
    db_session.add_all(events)
    await db_session.commit()
@pytest_asyncio.fixture
async def seed_refactor_event(db_session: AsyncSession, seed_symbol: tuple[str, str, str]) -> None:
    """Attach a single ``kind="implementation"`` refactor event to the seeded symbol."""
    from musehub.db.musehub_intel_models import MusehubIntelRefactorEvent
    from musehub.db.musehub_repo_models import MusehubRepo
    from sqlalchemy import select

    owner, slug, address = seed_symbol

    # Look the repo up again: seed_symbol only hands back (owner, slug, address).
    repo_query = select(MusehubRepo).where(MusehubRepo.owner == owner, MusehubRepo.slug == slug)
    repo = (await db_session.execute(repo_query)).scalar_one()

    event = MusehubIntelRefactorEvent(
        event_id=blob_id(f"refactor-single-{slug}".encode()),
        repo_id=repo.repo_id,
        kind="implementation",
        address=address,
        detail="extracted helper",
        commit_id=blob_id(f"rc-single-{slug}".encode()),
        committed_at=_utc_now(),
    )
    db_session.add(event)
    await db_session.commit()
@pytest_asyncio.fixture
async def seed_refactor_event_with_xss(db_session: AsyncSession, seed_symbol: tuple[str, str, str]) -> None:
    """Add a refactor event whose detail field contains an XSS payload.

    Lets rendering tests check that the stored ``detail`` text is escaped.
    """
    from musehub.db.musehub_intel_models import MusehubIntelRefactorEvent
    from musehub.db.musehub_repo_models import MusehubRepo
    from sqlalchemy import select

    owner, slug, address = seed_symbol
    result = await db_session.execute(
        select(MusehubRepo).where(MusehubRepo.owner == owner, MusehubRepo.slug == slug)
    )
    repo = result.scalar_one()

    # NOTE(review): the original literal was split across two lines with its
    # contents missing (a syntax error). The payload below is a canonical
    # script-tag reconstruction — confirm it matches what the tests assert on.
    xss_detail = "<script>alert(1)</script>"

    db_session.add(MusehubIntelRefactorEvent(
        event_id=blob_id(f"refactor-xss-{slug}".encode()),
        repo_id=repo.repo_id,
        kind="implementation",
        address=address,
        detail=xss_detail,
        commit_id=blob_id(f"rc-xss-{slug}".encode()),
        committed_at=_utc_now(),
    ))
    await db_session.commit()
@pytest_asyncio.fixture
async def seed_symbol_with_xss_commit(db_session: AsyncSession) -> tuple[str, str, str]:
    """Create a symbol whose commit message contains an XSS payload.

    Returns ``(owner, slug, address)``.
    """
    owner, slug = "gabriel", "xss-repo"
    address = "src/evil.py::fn"
    repo = await _make_repo_row(db_session, owner, slug)
    commit_id = blob_id(f"commit-xss-{slug}".encode())

    # NOTE(review): the original literal was split across two lines with the
    # leading payload missing (a syntax error). The script tag below is a
    # canonical reconstruction — confirm it matches what the tests assert on.
    xss_message = "<script>alert(1)</script>feat: xss test"

    await _make_commit_row(
        db_session, repo.repo_id, commit_id,
        message=xss_message,
    )
    await _make_history_entry(db_session, repo.repo_id, address, commit_id)
    return (owner, slug, address)
@pytest_asyncio.fixture
async def seed_symbol_with_large_history(db_session: AsyncSession) -> tuple[str, str, str]:
    """Seed one symbol with 200 ``modify`` history entries, one commit each.

    Deliberately 200 rather than 10k so the stress test stays fast.
    Returns ``(owner, slug, address)``.
    """
    owner = "gabriel"
    slug = "large-history-repo"
    address = "src/big.py::process"

    repo = await _make_repo_row(db_session, owner, slug)
    repo_id = repo.repo_id

    for n in range(200):
        commit_id = blob_id(f"commit-large-{n}-{slug}".encode())
        await _make_commit_row(db_session, repo_id, commit_id)
        await _make_history_entry(db_session, repo_id, address, commit_id, op="modify")
    return (owner, slug, address)
@pytest_asyncio.fixture
async def seed_symbol_high_coupling(db_session: AsyncSession) -> tuple[str, str, str]:
    """Seed a symbol that shares its single commit with 25 partner symbols.

    Returns ``(owner, slug, address)``.
    """
    from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry

    owner = "gabriel"
    slug = "coupling-repo"
    address = "src/hub.py::dispatch"

    repo = await _make_repo_row(db_session, owner, slug)
    commit_id = blob_id(f"commit-coupling-{slug}".encode())
    await _make_commit_row(db_session, repo.repo_id, commit_id)
    await _make_history_entry(db_session, repo.repo_id, address, commit_id)

    # 25 co-changing partners in the same commit
    partner_rows = [
        MusehubSymbolHistoryEntry(
            repo_id=repo.repo_id,
            address=f"src/partner_{n}.py::fn",
            commit_id=commit_id,
            committed_at=_utc_now(),
            author="gabriel",
            op="modify",
            content_id=blob_id(f"body-partner-{n}".encode()),
        )
        for n in range(25)
    ]
    db_session.add_all(partner_rows)
    await db_session.commit()
    return (owner, slug, address)
@pytest_asyncio.fixture
async def seed_symbol_with_clones(db_session: AsyncSession) -> tuple[str, str, str]:
    """Seed a symbol whose body hash also occurs at a second address.

    Two hash-occurrence rows share one ``content_id``: the clone at
    ``src/copy.py::fn`` and the original symbol itself.
    Returns ``(owner, slug, address)``.
    """
    from musehub.db.musehub_intel_models import MusehubHashOccurrenceEntry

    owner = "gabriel"
    slug = "clones-repo"
    address = "src/original.py::fn"

    repo = await _make_repo_row(db_session, owner, slug)
    commit_id = blob_id(f"commit-clone-{slug}".encode())
    content_id = blob_id(f"shared-body-{slug}".encode())
    await _make_commit_row(db_session, repo.repo_id, commit_id)
    await _make_history_entry(db_session, repo.repo_id, address, commit_id, content_id=content_id)

    # Clone first, then the original — same content_id, different address.
    for occurrence_address in ("src/copy.py::fn", address):
        db_session.add(MusehubHashOccurrenceEntry(
            repo_id=repo.repo_id,
            content_id=content_id,
            address=occurrence_address,
        ))
    await db_session.commit()
    return (owner, slug, address)
@pytest.fixture
def benchmark_timer() -> typing.Callable[[float], typing.ContextManager[None]]:
    """Return a factory for context managers that enforce a time budget.

    ``with benchmark_timer(max_ms): ...`` asserts, once the body has finished
    normally, that it took fewer than ``max_ms`` milliseconds on the
    monotonic clock.
    """

    @_contextlib.contextmanager
    def _budget(max_ms: float) -> Generator[None, None, None]:
        started = _time.monotonic()
        yield
        finished = _time.monotonic()
        elapsed_ms = (finished - started) * 1000
        assert elapsed_ms < max_ms, f"took {elapsed_ms:.0f}ms, limit {max_ms}ms"

    return _budget
# ---------------------------------------------------------------------------
# Pagination fixtures
# Used by test_symbol_detail_pagination.py.
# ---------------------------------------------------------------------------
async def _seed_history_entries(
    db_session: AsyncSession,
    owner: str,
    slug: str,
    count: int,
) -> tuple[str, str, str]:
    """Create a repo + symbol with *count* history entries spaced 1 hour apart.

    Commit messages are ``entry-{i}`` for i in 0..count-1.
    entry-0 is the oldest, entry-(count-1) is the newest. Timestamps start at
    2026-01-01T00:00:00Z; every entry gets its own commit and uses
    ``op="modify"``.

    Returns (owner, slug, address).
    """
    address = "src/core.py::paginate_fn"
    repo = await _make_repo_row(db_session, owner, slug)
    base_ts = _dt.datetime(2026, 1, 1, 0, 0, 0, tzinfo=_dt.timezone.utc)

    for i in range(count):
        committed_at = base_ts + _dt.timedelta(hours=i)
        commit_id = blob_id(f"commit-hist-{i}-{slug}".encode())
        message = f"entry-{i}"
        await _make_commit_row(
            db_session, repo.repo_id, commit_id,
            message=message,
            timestamp=committed_at,
        )
        await _make_history_entry(
            db_session, repo.repo_id, address, commit_id,
            op="modify", committed_at=committed_at,
            message=message,
        )
    return (owner, slug, address)
async def _seed_coupling_partners(
    db_session: AsyncSession,
    owner: str,
    slug: str,
    partner_count: int,
) -> tuple[str, str, str]:
    """Seed a repo whose target symbol has *partner_count* coupling partners.

    One commit is created per partner, an hour apart from 2026-02-01T00:00:00Z.
    The target symbol is in every commit. Partner i is in commits
    i..(partner_count-1), so it shares ``partner_count - i`` commits with the
    target (partner_0 shares the most). Matching pre-computed
    ``MusehubSymbolCoupling`` rows are inserted at the end.

    Returns (owner, slug, address).
    """
    from musehub.db.musehub_intel_models import MusehubSymbolCoupling, MusehubSymbolHistoryEntry

    address = "src/hub.py::dispatch"
    repo = await _make_repo_row(db_session, owner, slug)
    repo_id = repo.repo_id
    first_commit_at = _dt.datetime(2026, 2, 1, 0, 0, 0, tzinfo=_dt.timezone.utc)

    for commit_no in range(partner_count):
        committed_at = first_commit_at + _dt.timedelta(hours=commit_no)
        commit_id = blob_id(f"commit-coup-{commit_no}-{slug}".encode())
        await _make_commit_row(
            db_session, repo_id, commit_id,
            message=f"coupling-commit-{commit_no}",
            timestamp=committed_at,
        )

        # Target symbol in every commit
        rows = [MusehubSymbolHistoryEntry(
            repo_id=repo_id,
            address=address,
            commit_id=commit_id,
            committed_at=committed_at,
            author="gabriel",
            op="modify",
            content_id=blob_id(f"body-target-{commit_no}-{slug}".encode()),
        )]
        # A partner appears in this commit only when its index <= commit_no
        rows.extend(
            MusehubSymbolHistoryEntry(
                repo_id=repo_id,
                address=f"src/partner_{partner_no}.py::fn_{partner_no}",
                commit_id=commit_id,
                committed_at=committed_at,
                author="gabriel",
                op="modify",
                content_id=blob_id(f"body-partner-{partner_no}-{commit_no}-{slug}".encode()),
            )
            for partner_no in range(commit_no + 1)
        )
        db_session.add_all(rows)
        await db_session.commit()

    # Insert pre-computed coupling rows: partner_i appears in partner_count-i commits.
    db_session.add_all([
        MusehubSymbolCoupling(
            repo_id=repo_id,
            address=address,
            co_address=f"src/partner_{partner_no}.py::fn_{partner_no}",
            shared_commits=partner_count - partner_no,
        )
        for partner_no in range(partner_count)
    ])
    await db_session.commit()
    return (owner, slug, address)
@pytest_asyncio.fixture
async def seed_symbol_with_26_history(db_session: AsyncSession) -> tuple[str, str, str]:
    """Seed a symbol with 26 history entries; returns ``(owner, slug, address)``."""
    return await _seed_history_entries(
        db_session, owner="gabriel", slug="hist26-repo", count=26,
    )
@pytest_asyncio.fixture
async def seed_symbol_with_exactly_10_history(db_session: AsyncSession) -> tuple[str, str, str]:
    """Seed a symbol with exactly 10 history entries; returns ``(owner, slug, address)``."""
    return await _seed_history_entries(
        db_session, owner="gabriel", slug="hist10-repo", count=10,
    )
@pytest_asyncio.fixture
async def seed_symbol_with_11_history(db_session: AsyncSession) -> tuple[str, str, str]:
    """Seed a symbol with 11 history entries; returns ``(owner, slug, address)``."""
    return await _seed_history_entries(
        db_session, owner="gabriel", slug="hist11-repo", count=11,
    )
@pytest_asyncio.fixture
async def seed_symbol_high_coupling_40(db_session: AsyncSession) -> tuple[str, str, str]:
    """Seed a symbol with 40 coupling partners; returns ``(owner, slug, address)``."""
    return await _seed_coupling_partners(
        db_session, owner="gabriel", slug="coup40-repo", partner_count=40,
    )
@pytest_asyncio.fixture
async def seed_symbol_with_exactly_15_coupling(db_session: AsyncSession) -> tuple[str, str, str]:
    """Seed a symbol with exactly 15 coupling partners; returns ``(owner, slug, address)``."""
    return await _seed_coupling_partners(
        db_session, owner="gabriel", slug="coup15-repo", partner_count=15,
    )
@pytest_asyncio.fixture
async def seed_symbol_with_16_coupling(db_session: AsyncSession) -> tuple[str, str, str]:
    """Seed a symbol with 16 coupling partners; returns ``(owner, slug, address)``."""
    return await _seed_coupling_partners(
        db_session, owner="gabriel", slug="coup16-repo", partner_count=16,
    )
@pytest_asyncio.fixture
async def seed_symbol_with_26_history_and_40_coupling(
    db_session: AsyncSession,
) -> tuple[str, str, str]:
    """Seed 26 history entries and 26 coupling partners from the same commits.

    Despite the "40" in the name, 26 partners are created. The target is in
    all 26 commits and partner_j is in commits j..25, so it shares 26 - j
    commits with the target (descending). That gives 2 coupling pages
    (15 + 11) and 3 history pages (10 + 10 + 6). Putting the partners in the
    history commits avoids inflating the target's change_count with extra
    coupling-only commits.

    Returns ``(owner, slug, address)``.
    """
    from musehub.db.musehub_intel_models import MusehubSymbolCoupling, MusehubSymbolHistoryEntry

    owner = "gabriel"
    slug = "hist26-coup26-repo"
    address = "src/core.py::paginate_fn"
    total_commits = 26

    repo = await _make_repo_row(db_session, owner, slug)
    repo_id = repo.repo_id
    first_commit_at = _dt.datetime(2026, 1, 1, 0, 0, 0, tzinfo=_dt.timezone.utc)

    # 26 commits — target appears in all; partner_j appears in commits j..25.
    for commit_no in range(total_commits):
        committed_at = first_commit_at + _dt.timedelta(hours=commit_no)
        commit_id = blob_id(f"commit-combo-{commit_no}-{slug}".encode())
        message = f"entry-{commit_no}"
        await _make_commit_row(
            db_session, repo_id, commit_id,
            message=message, timestamp=committed_at,
        )
        await _make_history_entry(
            db_session, repo_id, address, commit_id,
            op="modify", committed_at=committed_at,
            message=message,
        )
        # Every partner whose index <= commit_no is added to this commit.
        db_session.add_all([
            MusehubSymbolHistoryEntry(
                repo_id=repo_id,
                address=f"src/partner_{partner_no}.py::fn_{partner_no}",
                commit_id=commit_id,
                committed_at=committed_at,
                author="gabriel",
                op="modify",
                content_id=blob_id(f"body-combo-partner-{partner_no}-{commit_no}-{slug}".encode()),
            )
            for partner_no in range(commit_no + 1)
        ])
        await db_session.commit()

    # Insert pre-computed coupling rows: partner_j appears in 26-j commits.
    db_session.add_all([
        MusehubSymbolCoupling(
            repo_id=repo_id,
            address=address,
            co_address=f"src/partner_{partner_no}.py::fn_{partner_no}",
            shared_commits=26 - partner_no,
        )
        for partner_no in range(total_commits)
    ])
    await db_session.commit()
    return (owner, slug, address)