""" Tier 4 — Stress tests for enrich_repo_cards() under load. All tests run against the test database (not mocks) to catch real query behaviour: N+1 regressions, batch overflows, and degenerate data patterns that would silently misbehave in production. Test IDs -------- T400 — enriching 50 repos issues exactly 5 SQL queries (no N+1) T401 — enriching 100 repos completes in < 2 s (performance floor) T402 — repos with 1000 commits each produce correct pulse buckets T403 — 100 symbols per repo returns the correct hottest without full-scan T404 — mixed batch: some repos with data, some without — no cross-contamination T405 — passing duplicate repo_ids is idempotent (no doubled rows) """ from __future__ import annotations import time from datetime import datetime, timedelta, timezone import typing import pytest from sqlalchemy import Executable from sqlalchemy.engine import CursorResult from sqlalchemy.ext.asyncio import AsyncSession from musehub.services.repo_card_enrichment import ( _PULSE_DAYS, enrich_repo_cards, ) from tests.factories import create_commit, create_repo def _utc_now() -> datetime: return datetime.now(tz=timezone.utc) def _days_ago(n: int) -> datetime: return _utc_now() - timedelta(days=n) # --------------------------------------------------------------------------- # T400 — no N+1: 5 queries regardless of batch size # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_t400_no_n_plus_one_queries(db_session: AsyncSession) -> None: """T400: enriching 50 repos uses at most 6 queries (5 signal + 1 init).""" repos = [await create_repo(db_session, visibility="public") for _ in range(50)] repo_ids = [r.repo_id for r in repos] query_count = 0 original_execute = db_session.execute async def counting_execute(stmt: Executable, *args: typing.Any, **kwargs: typing.Any) -> CursorResult[typing.Any]: nonlocal query_count query_count += 1 return await original_execute(stmt, *args, **kwargs) db_session.execute 
= counting_execute # type: ignore[method-assign] await enrich_repo_cards(db_session, repo_ids) db_session.execute = original_execute # type: ignore[method-assign] # 5 signal queries (pulse, autonomy, hottest, blast, dead+breakage). # Some implementations may split dead/breakage — allow up to 7. assert query_count <= 7, f"Expected ≤7 queries, got {query_count}" # --------------------------------------------------------------------------- # T401 — 100-repo batch completes in < 2 s # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_t401_hundred_repos_under_two_seconds(db_session: AsyncSession) -> None: """T401: enrich_repo_cards with 100 repos finishes in under 2 seconds.""" repos = [await create_repo(db_session, visibility="public") for _ in range(100)] repo_ids = [r.repo_id for r in repos] t0 = time.monotonic() await enrich_repo_cards(db_session, repo_ids) elapsed = time.monotonic() - t0 assert elapsed < 2.0, f"Enrichment took {elapsed:.2f}s — expected < 2s" # --------------------------------------------------------------------------- # T402 — 1000 commits produce valid 30-day pulse # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_t402_high_volume_commits_correct_pulse(db_session: AsyncSession) -> None: """T402: a repo with 1000 commits in the window yields valid 30-bucket pulse.""" repo = await create_repo(db_session, visibility="public") # Spread 1000 commits across the 30-day window for i in range(1000): day_offset = i % _PULSE_DAYS await create_commit(db_session, repo.repo_id, timestamp=_days_ago(day_offset)) result = await enrich_repo_cards(db_session, [repo.repo_id]) enc = result[repo.repo_id] assert len(enc.pulse_buckets) == _PULSE_DAYS total_counted = sum(b.count for b in enc.pulse_buckets) assert total_counted == 1000 # Busiest bucket is normalised to h=24 max_h = max(b.h for b in enc.pulse_buckets) assert max_h == 24 # 
# ---------------------------------------------------------------------------
# T403 — 100 symbols: hottest is still the correct one
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t403_hundred_symbols_hottest_correct(db_session: AsyncSession) -> None:
    """T403: among 100 symbols, the one with the highest churn_30d is reported.

    Ninety-nine symbols get churn values 0..98 and one symbol gets 9999;
    the enrichment result must name that last symbol as the hottest.
    """
    from musehub.db.musehub_intel_models import MusehubSymbolIntel

    repo = await create_repo(db_session, visibility="public")

    # The 99 low-churn symbols, in ascending churn order.
    also_rans = [
        MusehubSymbolIntel(
            repo_id=repo.repo_id,
            address=f"src/mod_{idx}.py::fn_{idx}",
            churn_30d=idx,
            blast=0,
        )
        for idx in range(99)
    ]
    db_session.add_all(also_rans)

    # The winner: churn_30d = 9999
    winner = MusehubSymbolIntel(
        repo_id=repo.repo_id,
        address="src/winner.py::hottest_fn",
        churn_30d=9999,
        blast=0,
    )
    db_session.add(winner)
    await db_session.commit()

    enriched = await enrich_repo_cards(db_session, [repo.repo_id])
    hottest = enriched[repo.repo_id].hottest_symbol

    assert hottest is not None
    assert hottest.address == "src/winner.py::hottest_fn"
    assert hottest.churn_30d == 9999


# ---------------------------------------------------------------------------
# T404 — mixed batch: data isolation
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t404_mixed_batch_no_cross_contamination(db_session: AsyncSession) -> None:
    """T404: in a 25 + 25 batch, repos without data stay empty.

    Twenty-five repos each receive one symbol row and twenty-five receive
    nothing. All fifty are enriched in one call; the populated ones must
    report a hottest symbol and the bare ones must report none of the
    signals.
    """
    from musehub.db.musehub_intel_models import MusehubSymbolIntel

    populated = [await create_repo(db_session, visibility="public") for _ in range(25)]
    bare = [await create_repo(db_session, visibility="public") for _ in range(25)]

    db_session.add_all(
        [
            MusehubSymbolIntel(
                repo_id=seeded.repo_id,
                address="src/a.py::fn",
                churn_30d=10,
                blast=5,
            )
            for seeded in populated
        ]
    )
    await db_session.commit()

    batch_ids = [member.repo_id for member in (*populated, *bare)]
    enriched = await enrich_repo_cards(db_session, batch_ids)

    for seeded in populated:
        assert enriched[seeded.repo_id].hottest_symbol is not None

    for untouched in bare:
        card = enriched[untouched.repo_id]
        assert card.hottest_symbol is None
        assert card.blast_leader is None
        assert card.dead_count == 0
        assert card.autonomy_pct == 0


# ---------------------------------------------------------------------------
# T405 — duplicate repo_ids are idempotent
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t405_duplicate_repo_ids_idempotent(db_session: AsyncSession) -> None:
    """T405: a repo_id repeated three times produces a single result entry.

    The repo has exactly one commit, so its pulse buckets must also sum to
    one rather than three.
    """
    repo = await create_repo(db_session, visibility="public")
    await create_commit(db_session, repo.repo_id, timestamp=_utc_now())

    repeated_ids = [repo.repo_id] * 3
    enriched = await enrich_repo_cards(db_session, repeated_ids)

    # Only one entry regardless of duplicates in input
    assert len(enriched) == 1
    assert repo.repo_id in enriched

    # Pulse should not double-count due to deduplication
    pulse_total = sum(bucket.count for bucket in enriched[repo.repo_id].pulse_buckets)
    assert pulse_total == 1