gabriel / musehub public
worker.py python
235 lines 9.5 KB
Raw
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618 feat(#92): phase 5 — GC expired fetch mpack cache entries (… Sonnet 4.6 patch 7 days ago
1 """MuseHub background worker — processes jobs from musehub_background_jobs.
2
3 Run as a separate process (separate Docker service) so that memory-intensive
4 jobs (intel indexing, GC) cannot OOM-crash the web server.
5
6 Usage:
7 python -m musehub.worker
8
9 Environment:
10 DATABASE_URL — async SQLAlchemy URL (postgresql+asyncpg://...)
11 WORKER_POLL_INTERVAL — seconds between poll cycles (default: 1.0)
12 WORKER_MAX_JOBS — max jobs to process before exiting (default: 0 = unlimited)
13
14 Architecture
15 ------------
16 Job handlers are registered in ``musehub.services.musehub_intel_providers``.
17 Each ``IntelProvider`` is keyed by job_type string and returns a list of
18 ``(intel_type, data_dict)`` tuples. The worker persists those results via
19 ``persist_intel_results`` and then marks the job done.
20
21 Adding a new domain:
22 1. Implement ``IntelProvider`` in musehub_intel_providers.py
23 2. Add it to ``_PROVIDER_REGISTRY`` there
24 3. Update ``job_types_for_push`` to enqueue it for the right domain
25
26 No changes to this file are needed for new domains.
27
28 Stale claim recovery
29 --------------------
30 On every poll cycle the worker resets any ``running`` jobs whose
31 ``claimed_at`` is older than ``_STALE_CLAIM_MINUTES``. This handles the
32 case where a previous worker process crashed mid-job and left a row stuck
33 in the ``running`` state.
34 """
35
36 import asyncio
37 import logging
38 import os
39 import traceback
40
41
# Force DEBUG=false before any musehub imports so SQLAlchemy engine is
# created with echo=False regardless of the container's DEBUG env var.
# This is an unconditional assignment on purpose: a DEBUG value inherited
# from the environment must be overridden, so ``setdefault`` is not enough
# (and would be redundant next to the assignment).
os.environ["DEBUG"] = "false"

logger = logging.getLogger(__name__)

# Seconds between poll cycles; read once at import from WORKER_POLL_INTERVAL.
_POLL_INTERVAL: float = float(os.environ.get("WORKER_POLL_INTERVAL", "1.0"))
# Number of jobs to process before the worker exits; read once at import
# from WORKER_MAX_JOBS.
_MAX_JOBS: int = int(os.environ.get("WORKER_MAX_JOBS", "0"))  # 0 = unlimited
51
52 # ---------------------------------------------------------------------------
53 # GC handler (not an IntelProvider — no intel results)
54 # ---------------------------------------------------------------------------
55
async def _handle_gc(session: "AsyncSession", repo_id: str) -> None:
    """Run garbage collection for one repository and commit the session.

    Two passes run on the same session, in order: ``run_gc`` and then
    ``gc_fetch_mpack_cache``.  Each pass logs a summary line only when it
    reports a non-zero count.  A single commit is issued at the end.

    Args:
        session: Database session both GC passes operate on.
        repo_id: Identifier of the repository to collect.
    """
    from musehub.services.musehub_gc import gc_fetch_mpack_cache, run_gc

    gc_outcome = await run_gc(session, repo_id)
    if gc_outcome.commits_deleted:
        logger.info("✅ gc done: repo=%s pruned %d commits, %d snapshots", repo_id, gc_outcome.commits_deleted, gc_outcome.snapshots_deleted)

    # FMC_19 — GC expired fetch mpack cache entries and their R2 objects
    expired_rows = await gc_fetch_mpack_cache(session, repo_id)
    if expired_rows:
        logger.info("✅ gc: cleaned %d expired fetch mpack cache rows repo=%s", expired_rows, repo_id)

    await session.commit()
69
70 # ---------------------------------------------------------------------------
71 # Poll loop
72 # ---------------------------------------------------------------------------
73
async def _process_one() -> bool:
    """Claim and execute one job. Returns True if a job was found.

    Steps, each in its own short-lived session:

    1. Reset stale ``running`` jobs via ``reclaim_stale_jobs``.
    2. Claim the next job via ``claim_next_job``; return ``False`` if the
       queue yields nothing.
    3. Dispatch on ``job_type``.  Built-in types are handled inline; any
       other type is looked up with ``get_provider`` and its results are
       stored with ``persist_intel_results``.  An unknown type is marked
       failed immediately.
    4. Mark the job complete with ``complete_job``.

    Any exception raised during steps 3-4 is logged and recorded on the job
    through ``fail_job``; the function still returns ``True`` because a job
    was claimed.  Exceptions raised while reclaiming, claiming, or recording
    the failure itself propagate to the caller.

    Returns:
        ``True`` when a job was claimed (whether it succeeded or failed),
        ``False`` when there was nothing to claim.
    """
    from musehub.db.database import AsyncSessionLocal
    from musehub.services.musehub_jobs import (
        claim_next_job,
        complete_job,
        fail_job,
        reclaim_stale_jobs,
    )
    from musehub.services.musehub_intel_providers import (
        get_provider,
        persist_intel_results,
    )

    # Reset stale running jobs before claiming a new one.
    async with AsyncSessionLocal() as session:
        n_reclaimed = await reclaim_stale_jobs(session)
        if n_reclaimed:
            await session.commit()

    async with AsyncSessionLocal() as session:
        job = await claim_next_job(session)
        if job is None:
            return False

        # Copy the fields we need BEFORE committing.
        # NOTE(review): assumes ``job`` is an ORM-mapped row.  With
        # SQLAlchemy's default ``expire_on_commit=True`` its attributes are
        # expired by the commit, and touching them afterwards would trigger
        # a lazy load that an AsyncSession cannot perform.  Reading first is
        # correct regardless of how AsyncSessionLocal is configured.
        job_id = job.job_id
        job_type = job.job_type
        repo_id = job.repo_id
        payload = job.payload or {}
        await session.commit()

    try:
        if job_type == "gc":
            async with AsyncSessionLocal() as session:
                # _handle_gc commits the session itself.
                await _handle_gc(session, repo_id)

        elif job_type == "push.file_last_commits":
            async with AsyncSessionLocal() as session:
                from musehub.services.file_last_commits import compute_and_store_file_last_commits
                branch = str(payload.get("branch", ""))
                head_commit_id = str(payload.get("head", ""))
                # Both values are required; a payload missing either is a no-op.
                if branch and head_commit_id:
                    await compute_and_store_file_last_commits(session, repo_id, branch, head_commit_id)
                    await session.commit()
                    logger.info("✅ push.file_last_commits done: repo=%s branch=%s", repo_id, branch)

        elif job_type == "content.scan":
            # Stub — always passes clean until a real CSAM API is configured.
            logger.debug("content.scan: stub pass for job=%s repo=%s", job_id, repo_id)

        elif job_type == "mpack.index":
            async with AsyncSessionLocal() as session:
                from musehub.services.musehub_wire import process_mpack_index_job
                result = await process_mpack_index_job(session, job_id)
                await session.commit()
                logger.info(
                    "✅ mpack.index done: job=%s indexed=%d byte_ranges=%d elapsed=%.1fms",
                    job_id[:16], result.get("mpack_index_written", 0),
                    result.get("byte_ranges_computed", 0), result.get("elapsed_ms", 0),
                )

        elif job_type == "mpack.gc":
            async with AsyncSessionLocal() as session:
                from musehub.services.musehub_wire import process_mpack_gc_job
                result = await process_mpack_gc_job(session, repo_id)
                await session.commit()
                if result["skipped"]:
                    logger.info("✅ mpack.gc done (skipped): repo=%s mpacks=%d", repo_id, result["packs_before"])
                else:
                    logger.info(
                        "✅ mpack.gc done: repo=%s mpacks %d→%d key=%s",
                        repo_id, result["packs_before"], result["packs_after"], result["consolidated_key"],
                    )

        elif job_type == "fetch.mpack.prebuild":
            async with AsyncSessionLocal() as session:
                from musehub.services.musehub_wire_fetch import process_fetch_mpack_prebuild_job
                result = await process_fetch_mpack_prebuild_job(session, job_id)
                await session.commit()
                logger.info(
                    "✅ fetch.mpack.prebuild done: job=%s repo=%s tips=%d built=%d skipped=%d elapsed=%.1fms",
                    job_id[:16], repo_id,
                    result["tips_requested"], result["tips_built"],
                    result["tips_skipped"], result["elapsed_ms"],
                )

        elif job_type == "profile.snapshot":
            # Profile snapshot jobs write directly to musehub_profile_snapshots
            # (not through persist_intel_results) and need their own commit.
            async with AsyncSessionLocal() as session:
                from musehub.services.musehub_intel_providers import _compute_and_persist_profile_snapshot
                handle = str(payload.get("handle", ""))
                # A payload without a handle is a no-op.
                if handle:
                    await _compute_and_persist_profile_snapshot(session, handle)
                    await session.commit()
                    logger.info("✅ profile.snapshot done: handle=%s", handle)

        else:
            provider = get_provider(job_type)
            if provider is None:
                # No built-in handler and no registered provider: fail the
                # job right away instead of leaving it in the running state.
                async with AsyncSessionLocal() as session:
                    await fail_job(session, job_id, f"unknown job_type: {job_type}")
                    await session.commit()
                return True

            ref = str(payload.get("head", ""))
            async with AsyncSessionLocal() as session:
                results = await provider.compute(session, repo_id, ref, payload)
                if results:
                    await persist_intel_results(session, repo_id, ref, results)
                await session.commit()
            logger.info(
                "✅ %s done: repo=%s ref=%s types=[%s]",
                job_type,
                repo_id,
                ref if ref else "—",
                # ``results or ()`` keeps the log line safe if a provider
                # returns None rather than an empty list.
                ", ".join(t for t, _ in results or ()),
            )

        async with AsyncSessionLocal() as session:
            await complete_job(session, job_id)
            await session.commit()

    except Exception:
        # Top-level job boundary: record the full traceback on the job row so
        # the failure is visible in the queue, and keep the worker alive.
        err = traceback.format_exc()
        logger.error("Job failed: type=%s repo=%s\n%s", job_type, repo_id, err)
        async with AsyncSessionLocal() as session:
            await fail_job(session, job_id, err)
            await session.commit()

    return True
206
async def run() -> None:
    """Poll the job queue until interrupted or the job limit is reached.

    Configures logging, initialises the database via ``init_db``, then loops
    calling ``_process_one``:

    * When a job was processed, the next poll happens immediately.
    * When no job was found, or the poll itself raised, the loop sleeps for
      ``_POLL_INTERVAL`` seconds before trying again.
    * When ``_MAX_JOBS`` is non-zero, the loop exits as soon as that many
      jobs have been processed.
    """
    logging.basicConfig(
        level=logging.INFO,
        format='{"time": "%(asctime)s", "level": "%(levelname)s", "msg": "%(message)s"}',
    )
    from musehub.db.database import init_db
    await init_db()
    logger.info("✅ MuseHub worker started (poll_interval=%.1fs)", _POLL_INTERVAL)

    jobs_processed = 0
    while True:
        try:
            found = await _process_one()
        except Exception:
            # Keep the worker alive on unexpected poll errors; treat the
            # cycle as "no job" so the loop backs off before retrying.
            logger.error("Worker poll error:\n%s", traceback.format_exc())
            found = False

        if not found:
            await asyncio.sleep(_POLL_INTERVAL)
            continue

        # Count the job and check the limit right away.  Checking before the
        # increment would let one extra _process_one() call run (and possibly
        # execute another job) after the limit had already been reached.
        jobs_processed += 1
        if _MAX_JOBS and jobs_processed >= _MAX_JOBS:
            logger.info("Worker reached MAX_JOBS=%d, exiting.", _MAX_JOBS)
            break
233
if __name__ == "__main__":
    # Script entry point: drive the poll loop on a fresh asyncio event loop.
    asyncio.run(run())
File History 3 commits
sha256:f58d788df3ccdda8f8987b428418db655a38582309239b99d7b9715ea6dff618 feat(#92): phase 5 — GC expired fetch mpack cache entries (… Sonnet 4.6 patch 7 days ago
sha256:0e447fc3f6b7887d5d9e86b557c659ef7d0b05e2e09ddb0cb551ada240e48a51 feat(phase2): fetch.mpack.prebuild job handler + worker dis… Sonnet 4.6 patch 7 days ago
sha256:8a7ff43f27504c1f6abba59ffbab3dc89bd1d8bcfa4c57f6e280286d1a58195a feat: resurrect process_mpack_index_job with byte-range ind… Sonnet 4.6 patch 18 days ago