gabriel / musehub public

test_enqueue_batch.py file-level

at sha256:7 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
1 """TDD — enqueue_push_intel must use O(1) DB queries, not O(N).
2
3 Root cause of the phase-10 slowness observed 2026-05-10:
4 enqueue_job called once per job type, each call doing a sequential SELECT
5 (idempotency check) + INSERT. With 17 job types + profile.snapshot = 18
6 round-trips × ~80ms each = ~1.4s added to every push finalization.
7
8 Fix: batch the idempotency check into a single
9 SELECT job_type FROM musehub_background_jobs
10 WHERE repo_id=X AND job_type IN (...) AND status='pending'
11 then bulk INSERT the missing ones in one statement.
12
13 Tests:
14 B1 structural: enqueue_push_intel source must not call enqueue_job in a loop
15 B2 idempotency: second call inserts 0 new rows
16 B3 wall-clock for enqueue_push_intel + enqueue_profile_snapshot < 50ms
17 B4 all job types from job_types_for_push actually land in the DB
18 """
19 from __future__ import annotations
20
21 import inspect
22 import time
23
24 import pytest
25 from sqlalchemy import func, select
26 from sqlalchemy.ext.asyncio import AsyncSession
27
28 from musehub.db.musehub_jobs_models import MusehubBackgroundJob
29 from musehub.services.musehub_jobs import enqueue_profile_snapshot, enqueue_push_intel
30 from musehub.services.musehub_intel_providers import job_types_for_push
31 from tests.factories import create_repo
32
33
34 # ---------------------------------------------------------------------------
35 # B1 — structural: enqueue_push_intel must not call enqueue_job in a loop
36 # ---------------------------------------------------------------------------
37
def _enqueue_job_calls_in_loops(func_source: str, callee: str = "enqueue_job") -> list[int]:
    """Return line numbers (relative to *func_source*) of ``callee(...)`` calls inside loops.

    The check is AST-based rather than text-based, so it:

    * recognises ``for``, ``async for`` and ``while`` loops regardless of the
      loop variable's name, plus list/set/dict comprehensions and generator
      expressions (e.g. ``asyncio.gather(*(enqueue_job(t) for t in types))``);
    * matches bare calls (``enqueue_job(...)``) and attribute calls
      (``jobs.enqueue_job(...)``), awaited or not, even when split over lines;
    * ignores mentions of the name inside comments and string literals.

    Anything lexically inside a loop node counts, including the loop header.

    Args:
        func_source: Source text of a function, as returned by
            ``inspect.getsource`` (may be indented).
        callee: Function name to look for.

    Returns:
        Sorted, de-duplicated 1-based line numbers of offending calls; empty
        when no such call exists.
    """
    import ast
    import textwrap

    loop_nodes = (
        ast.For,
        ast.AsyncFor,
        ast.While,
        ast.ListComp,
        ast.SetComp,
        ast.DictComp,
        ast.GeneratorExp,
    )
    # getsource() keeps the def's original indentation (e.g. for methods);
    # ast.parse rejects leading indentation, so normalise it first.
    tree = ast.parse(textwrap.dedent(func_source))

    hits: set[int] = set()
    for loop in ast.walk(tree):
        if not isinstance(loop, loop_nodes):
            continue
        # Nested loops visit the same call more than once; the set de-dupes.
        for node in ast.walk(loop):
            if not isinstance(node, ast.Call):
                continue
            target = node.func
            if isinstance(target, ast.Name):
                name = target.id
            elif isinstance(target, ast.Attribute):
                name = target.attr
            else:
                continue
            if name == callee:
                hits.add(node.lineno)
    return sorted(hits)


def test_B1_no_enqueue_job_loop() -> None:
    """enqueue_push_intel source must not contain a loop that calls enqueue_job.

    Calling ``enqueue_job`` once per job type means one idempotency SELECT +
    INSERT per type, i.e. O(N) DB round-trips on every push finalization.
    """
    import musehub.services.musehub_jobs as _jobs

    src = inspect.getsource(_jobs.enqueue_push_intel)
    offending = _enqueue_job_calls_in_loops(src)
    assert not offending, (
        "enqueue_push_intel still calls enqueue_job inside a loop "
        f"(function source lines {offending}) — "
        "O(N) DB round-trips. Replace with a single batch "
        "SELECT … IN + bulk INSERT … ON CONFLICT DO NOTHING."
    )
63
64
65 # ---------------------------------------------------------------------------
66 # B2 — idempotency: second call inserts 0 rows
67 # ---------------------------------------------------------------------------
68
async def _count_pending_jobs(session: AsyncSession, repo_id: str) -> int:
    """Return the number of ``status == "pending"`` background jobs for *repo_id*."""
    result = await session.execute(
        select(func.count()).select_from(MusehubBackgroundJob).where(
            MusehubBackgroundJob.repo_id == repo_id,
            MusehubBackgroundJob.status == "pending",
        )
    )
    return result.scalar_one()


@pytest.mark.asyncio
async def test_B2_idempotent(db_session: AsyncSession) -> None:
    """Second call for the same repo must insert 0 new rows.

    The second call passes a different ``head``; the assertion requires that
    it still adds no pending jobs on top of the ones the first call created.
    """
    repo = await create_repo(db_session, owner="gabriel")
    repo_id = str(repo.repo_id)

    await enqueue_push_intel(db_session, repo_id, head="sha256:abc", domain_id=None)
    await db_session.commit()
    count_before = await _count_pending_jobs(db_session, repo_id)

    # Guard against a vacuous pass: if the first call enqueued nothing, the
    # 0 == 0 comparison below would "prove" idempotency without exercising it.
    assert count_before > 0, (
        "First enqueue_push_intel call inserted no pending jobs — "
        "idempotency cannot be checked."
    )

    await enqueue_push_intel(db_session, repo_id, head="sha256:def", domain_id=None)
    await db_session.commit()
    count_after = await _count_pending_jobs(db_session, repo_id)

    assert count_after == count_before, (
        f"Second call inserted {count_after - count_before} extra rows — "
        "idempotency broken."
    )
99
100
101 # ---------------------------------------------------------------------------
102 # B3 — wall-clock < 50ms with real DB
103 # ---------------------------------------------------------------------------
104
# Wall-clock budget (seconds) for the two enqueue calls timed in test_B3_fast.
# Wall-clock assertions are sensitive to machine and DB load.
_B3_BUDGET_SECONDS = 0.05


@pytest.mark.asyncio
async def test_B3_fast(db_session: AsyncSession) -> None:
    """enqueue_push_intel + enqueue_profile_snapshot must complete in <50ms.

    Timed with ``time.perf_counter``, the clock intended for measuring short
    durations; ``time.monotonic`` may have coarse resolution on some platforms,
    which matters against a 50ms budget. Repo creation happens before the
    clock starts, so only the two enqueue calls are measured.
    """
    repo = await create_repo(db_session, owner="gabriel")
    repo_id = str(repo.repo_id)

    started = time.perf_counter()
    await enqueue_push_intel(db_session, repo_id, head="sha256:abc", domain_id=None)
    await enqueue_profile_snapshot(db_session, repo_id, handle="gabriel")
    elapsed = time.perf_counter() - started

    assert elapsed < _B3_BUDGET_SECONDS, (
        f"enqueue_push_intel + enqueue_profile_snapshot took {elapsed * 1000:.1f}ms "
        f"— expected <{_B3_BUDGET_SECONDS * 1000:.0f}ms. "
        "Sequential per-job DB round-trips are the cause."
    )
120
121
122 # ---------------------------------------------------------------------------
123 # B4 — all job types actually get inserted
124 # ---------------------------------------------------------------------------
125
@pytest.mark.asyncio
async def test_B4_all_job_types_inserted(db_session: AsyncSession) -> None:
    """Every type from job_types_for_push must appear in the DB after enqueueing.

    Compares the set of job types returned by ``job_types_for_push(None)``
    (matching the ``domain_id=None`` used below) with the job types of the
    repo's ``pending`` rows after one ``enqueue_push_intel`` call.
    """
    repo = await create_repo(db_session, owner="gabriel")
    repo_id = str(repo.repo_id)
    expected = set(job_types_for_push(None))
    # Without this guard an empty registry makes ``expected - actual`` empty
    # and the test passes without checking anything.
    assert expected, "job_types_for_push(None) returned no job types — nothing to verify."

    await enqueue_push_intel(db_session, repo_id, head="sha256:abc", domain_id=None)
    await db_session.commit()

    result = await db_session.execute(
        select(MusehubBackgroundJob.job_type).where(
            MusehubBackgroundJob.repo_id == repo_id,
            MusehubBackgroundJob.status == "pending",
        )
    )
    actual = set(result.scalars())

    missing = expected - actual
    assert not missing, f"These job types were not inserted: {missing}"