"""TDD — enqueue_push_intel must use O(1) DB queries, not O(N). Root cause of the phase-10 slowness observed 2026-05-10: enqueue_job called once per job type, each call doing a sequential SELECT (idempotency check) + INSERT. With 17 job types + profile.snapshot = 18 round-trips × ~80ms each = ~1.4s added to every push finalization. Fix: batch the idempotency check into a single SELECT job_type FROM musehub_background_jobs WHERE repo_id=X AND job_type IN (...) AND status='pending' then bulk INSERT the missing ones in one statement. Tests: B1 structural: enqueue_push_intel source must not call enqueue_job in a loop B2 idempotency: second call inserts 0 new rows B3 wall-clock for enqueue_push_intel + enqueue_profile_snapshot < 50ms B4 all job types from job_types_for_push actually land in the DB """ from __future__ import annotations import inspect import time import pytest from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from musehub.db.musehub_jobs_models import MusehubBackgroundJob from musehub.services.musehub_jobs import enqueue_profile_snapshot, enqueue_push_intel from musehub.services.musehub_intel_providers import job_types_for_push from tests.factories import create_repo # --------------------------------------------------------------------------- # B1 — structural: enqueue_push_intel must not call enqueue_job in a loop # --------------------------------------------------------------------------- def test_B1_no_enqueue_job_loop() -> None: """enqueue_push_intel source must not contain a for-loop that calls enqueue_job.""" import musehub.services.musehub_jobs as _jobs src = inspect.getsource(_jobs.enqueue_push_intel) lines = src.splitlines() for_indent: int | None = None for line in lines: stripped = line.strip() if not stripped or stripped.startswith("#"): continue indent = len(line) - len(line.lstrip()) if stripped.startswith("for ") and "job_type" in stripped: for_indent = indent continue if for_indent is not None: if indent <= for_indent: for_indent = None elif "enqueue_job" in stripped and "await" in stripped: raise AssertionError( "enqueue_push_intel still calls enqueue_job inside a for-loop — " "O(N) DB round-trips. Replace with a single batch " "SELECT … IN + bulk INSERT … ON CONFLICT DO NOTHING." ) # --------------------------------------------------------------------------- # B2 — idempotency: second call inserts 0 rows # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_B2_idempotent(db_session: AsyncSession) -> None: """Second call for the same repo must insert 0 new rows.""" repo = await create_repo(db_session, owner="gabriel") repo_id = str(repo.repo_id) await enqueue_push_intel(db_session, repo_id, head="sha256:abc", domain_id=None) await db_session.commit() count_before = (await db_session.execute( select(func.count()).select_from(MusehubBackgroundJob).where( MusehubBackgroundJob.repo_id == repo_id, MusehubBackgroundJob.status == "pending", ) )).scalar() await enqueue_push_intel(db_session, repo_id, head="sha256:def", domain_id=None) await db_session.commit() count_after = (await db_session.execute( select(func.count()).select_from(MusehubBackgroundJob).where( MusehubBackgroundJob.repo_id == repo_id, MusehubBackgroundJob.status == "pending", ) )).scalar() assert count_after == count_before, ( f"Second call inserted {count_after - count_before} extra rows — " "idempotency broken." ) # --------------------------------------------------------------------------- # B3 — wall-clock < 50ms with real DB # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_B3_fast(db_session: AsyncSession) -> None: """enqueue_push_intel + enqueue_profile_snapshot must complete in <50ms.""" repo = await create_repo(db_session, owner="gabriel") repo_id = str(repo.repo_id) t0 = time.monotonic() await enqueue_push_intel(db_session, repo_id, head="sha256:abc", domain_id=None) await enqueue_profile_snapshot(db_session, repo_id, handle="gabriel") elapsed = time.monotonic() - t0 assert elapsed < 0.05, ( f"enqueue_push_intel + enqueue_profile_snapshot took {elapsed*1000:.1f}ms " f"— expected <50ms. Sequential per-job DB round-trips are the cause." ) # --------------------------------------------------------------------------- # B4 — all job types actually get inserted # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_B4_all_job_types_inserted(db_session: AsyncSession) -> None: """Every type from job_types_for_push must appear in the DB after enqueueing.""" repo = await create_repo(db_session, owner="gabriel") repo_id = str(repo.repo_id) expected = set(job_types_for_push(None)) await enqueue_push_intel(db_session, repo_id, head="sha256:abc", domain_id=None) await db_session.commit() rows = (await db_session.execute( select(MusehubBackgroundJob.job_type).where( MusehubBackgroundJob.repo_id == repo_id, MusehubBackgroundJob.status == "pending", ) )).scalars().all() actual = set(rows) missing = expected - actual assert not missing, f"These job types were not inserted: {missing}"