test_enqueue_batch.py
file-level
1
files
1
commits
0
hotspots
0
🧊 dead
0
💥 blast risk
| 1 | """TDD — enqueue_push_intel must use O(1) DB queries, not O(N). |
| 2 | |
| 3 | Root cause of the phase-10 slowness observed 2026-05-10: |
| 4 | enqueue_job called once per job type, each call doing a sequential SELECT |
| 5 | (idempotency check) + INSERT. With 17 job types + profile.snapshot = 18 |
| 6 | round-trips × ~80ms each = ~1.4s added to every push finalization. |
| 7 | |
| 8 | Fix: batch the idempotency check into a single |
| 9 | SELECT job_type FROM musehub_background_jobs |
| 10 | WHERE repo_id=X AND job_type IN (...) AND status='pending' |
| 11 | then bulk INSERT the missing ones in one statement. |
| 12 | |
| 13 | Tests: |
| 14 | B1 structural: enqueue_push_intel source must not call enqueue_job in a loop |
| 15 | B2 idempotency: second call inserts 0 new rows |
| 16 | B3 wall-clock for enqueue_push_intel + enqueue_profile_snapshot < 50ms |
| 17 | B4 all job types from job_types_for_push actually land in the DB |
| 18 | """ |
| 19 | from __future__ import annotations |
| 20 | |
| 21 | import inspect |
| 22 | import time |
| 23 | |
| 24 | import pytest |
| 25 | from sqlalchemy import func, select |
| 26 | from sqlalchemy.ext.asyncio import AsyncSession |
| 27 | |
| 28 | from musehub.db.musehub_jobs_models import MusehubBackgroundJob |
| 29 | from musehub.services.musehub_jobs import enqueue_profile_snapshot, enqueue_push_intel |
| 30 | from musehub.services.musehub_intel_providers import job_types_for_push |
| 31 | from tests.factories import create_repo |
| 32 | |
| 33 | |
| 34 | # --------------------------------------------------------------------------- |
| 35 | # B1 — structural: enqueue_push_intel must not call enqueue_job in a loop |
| 36 | # --------------------------------------------------------------------------- |
| 37 | |
def test_B1_no_enqueue_job_loop() -> None:
    """enqueue_push_intel source must not call enqueue_job from inside a loop.

    The source of ``enqueue_push_intel`` is parsed with ``ast`` instead of
    being scanned line by line, so the check does not depend on how the loop
    variable is named, on the call fitting on one physical line, or on the
    loop being a plain ``for`` statement.  Any call whose callee is exactly
    ``enqueue_job`` (bare name or attribute) located in the repeated part of
    a ``for`` / ``async for`` / ``while`` loop or inside a comprehension is
    reported.

    Raises:
        AssertionError: if such a call is found.
    """
    # Function-scope imports: only this test needs them.
    import ast
    import textwrap

    import musehub.services.musehub_jobs as _jobs

    # getsource() keeps the function's original indentation; dedent so the
    # snippet parses on its own even if the function is defined in a nested
    # scope.
    src = textwrap.dedent(inspect.getsource(_jobs.enqueue_push_intel))
    tree = ast.parse(src)

    comprehension_nodes = (
        ast.ListComp,
        ast.SetComp,
        ast.DictComp,
        ast.GeneratorExp,
    )

    def repeated_parts(node: ast.AST) -> list[ast.AST]:
        """Return the sub-trees of *node* that may execute more than once."""
        if isinstance(node, (ast.For, ast.AsyncFor)):
            # The iterable expression is evaluated once; only the body repeats.
            return list(node.body)
        if isinstance(node, ast.While):
            return [node.test, *node.body]
        if isinstance(node, comprehension_nodes):
            return [node]
        return []

    def is_enqueue_job_call(node: ast.AST) -> bool:
        """True for ``enqueue_job(...)`` or ``<anything>.enqueue_job(...)``."""
        if not isinstance(node, ast.Call):
            return False
        target = node.func
        if isinstance(target, ast.Name):
            return target.id == "enqueue_job"
        return isinstance(target, ast.Attribute) and target.attr == "enqueue_job"

    called_in_loop = any(
        is_enqueue_job_call(inner)
        for outer in ast.walk(tree)
        for part in repeated_parts(outer)
        for inner in ast.walk(part)
    )
    if called_in_loop:
        raise AssertionError(
            "enqueue_push_intel still calls enqueue_job inside a for-loop — "
            "O(N) DB round-trips. Replace with a single batch "
            "SELECT … IN + bulk INSERT … ON CONFLICT DO NOTHING."
        )
| 63 | |
| 64 | |
| 65 | # --------------------------------------------------------------------------- |
| 66 | # B2 — idempotency: second call inserts 0 rows |
| 67 | # --------------------------------------------------------------------------- |
| 68 | |
@pytest.mark.asyncio
async def test_B2_idempotent(db_session: AsyncSession) -> None:
    """Second call for the same repo must insert 0 new rows.

    Enqueues push-intel jobs twice for one freshly created repo (with
    different ``head`` values), committing after each call, and compares the
    number of ``pending`` rows for that repo before and after the second call.
    """
    repo = await create_repo(db_session, owner="gabriel")
    repo_id = str(repo.repo_id)

    async def pending_count() -> int:
        """Count the pending background-job rows that belong to ``repo_id``."""
        stmt = (
            select(func.count())
            .select_from(MusehubBackgroundJob)
            .where(
                MusehubBackgroundJob.repo_id == repo_id,
                MusehubBackgroundJob.status == "pending",
            )
        )
        return (await db_session.execute(stmt)).scalar()

    await enqueue_push_intel(db_session, repo_id, head="sha256:abc", domain_id=None)
    await db_session.commit()
    count_before = await pending_count()

    # Without this guard the test would pass trivially (0 == 0) if the first
    # call enqueued nothing at all.
    assert count_before > 0, (
        "First call enqueued no pending jobs — the idempotency check would "
        "pass vacuously."
    )

    await enqueue_push_intel(db_session, repo_id, head="sha256:def", domain_id=None)
    await db_session.commit()
    count_after = await pending_count()

    assert count_after == count_before, (
        f"Second call inserted {count_after - count_before} extra rows — "
        "idempotency broken."
    )
| 99 | |
| 100 | |
| 101 | # --------------------------------------------------------------------------- |
| 102 | # B3 — wall-clock < 50ms with real DB |
| 103 | # --------------------------------------------------------------------------- |
| 104 | |
@pytest.mark.asyncio
async def test_B3_fast(db_session: AsyncSession) -> None:
    """enqueue_push_intel + enqueue_profile_snapshot must complete in <50ms.

    Only the two enqueue calls are timed: the repo is created before the
    clock starts and nothing is committed inside the timed window.
    """
    repo = await create_repo(db_session, owner="gabriel")
    repo_id = str(repo.repo_id)

    # perf_counter() is the clock documented for measuring short durations;
    # monotonic() may tick too coarsely on some platforms to judge a 50 ms
    # budget reliably.
    t0 = time.perf_counter()
    await enqueue_push_intel(db_session, repo_id, head="sha256:abc", domain_id=None)
    await enqueue_profile_snapshot(db_session, repo_id, handle="gabriel")
    elapsed = time.perf_counter() - t0

    assert elapsed < 0.05, (
        f"enqueue_push_intel + enqueue_profile_snapshot took {elapsed*1000:.1f}ms "
        f"— expected <50ms. Sequential per-job DB round-trips are the cause."
    )
| 120 | |
| 121 | |
| 122 | # --------------------------------------------------------------------------- |
| 123 | # B4 — all job types actually get inserted |
| 124 | # --------------------------------------------------------------------------- |
| 125 | |
@pytest.mark.asyncio
async def test_B4_all_job_types_inserted(db_session: AsyncSession) -> None:
    """Each job type returned by job_types_for_push(None) must be pending in the DB.

    After one enqueue_push_intel call and a commit, the pending job types
    stored for the new repo are read back and compared with the expected set;
    any expected type that is absent fails the test.
    """
    new_repo = await create_repo(db_session, owner="gabriel")
    rid = str(new_repo.repo_id)
    wanted = set(job_types_for_push(None))

    await enqueue_push_intel(db_session, rid, head="sha256:abc", domain_id=None)
    await db_session.commit()

    # Read back only this repo's rows that are still pending.
    pending_types_stmt = select(MusehubBackgroundJob.job_type).where(
        MusehubBackgroundJob.repo_id == rid,
        MusehubBackgroundJob.status == "pending",
    )
    result = await db_session.execute(pending_types_stmt)
    found = set(result.scalars().all())

    missing = wanted.difference(found)
    assert not missing, f"These job types were not inserted: {missing}"