gabriel / musehub public
worker.py python
219 lines 8.5 KB
Raw
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
1 """MuseHub background worker — processes jobs from musehub_background_jobs.
2
3 Run as a separate process (separate Docker service) so that memory-intensive
4 jobs (intel indexing, GC) cannot OOM-crash the web server.
5
6 Usage:
7 python -m musehub.worker
8
9 Environment:
10 DATABASE_URL — async SQLAlchemy URL (postgresql+asyncpg://...)
11 WORKER_POLL_INTERVAL — seconds between poll cycles (default: 1.0)
12 WORKER_MAX_JOBS — max jobs to process before exiting (default: unlimited, 0)
13
14 Architecture
15 ------------
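Intel job handlers are registered in ``musehub.services.musehub_intel_providers``.
Each ``IntelProvider`` is keyed by job_type string and returns a list of
``(intel_type, data_dict)`` tuples. The worker persists those results via
``persist_intel_results`` and then marks the job done.

A handful of non-intel job types (``gc``, ``push.file_last_commits``,
``content.scan``, ``mpack.index``, ``mpack.gc`` and ``profile.snapshot``) are
dispatched directly in ``_process_one`` instead of through a provider.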
20
21 Adding a new domain:
22 1. Implement ``IntelProvider`` in musehub_intel_providers.py
23 2. Add it to ``_PROVIDER_REGISTRY`` there
24 3. Update ``job_types_for_push`` to enqueue it for the right domain
25
26 No changes to this file are needed for new domains.
27
28 Stale claim recovery
29 --------------------
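On every poll cycle the worker first calls ``reclaim_stale_jobs`` (from
``musehub.services.musehub_jobs``) to reset any ``running`` jobs whose
``claimed_at`` is older than the stale-claim threshold (``_STALE_CLAIM_MINUTES``,
which is not defined in this file; presumably it lives alongside that helper).
This handles the case where a previous worker process crashed mid-job and left
a row stuck in the ``running`` state.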
34 """
35
36 import asyncio
37 import logging
38 import os
39 import traceback
40
41
# Force DEBUG=false before any musehub imports so the SQLAlchemy engine is
# created with echo=False regardless of the container's DEBUG env var.
# Every musehub import in this module is deferred into a function body, so
# this assignment always runs before any of them. (An unconditional
# assignment is used; a preceding ``setdefault`` would be redundant.)
os.environ["DEBUG"] = "false"

logger = logging.getLogger(__name__)

# Worker tuning, read once at import time from the environment.
# Seconds to sleep between polls when the queue is empty or a poll fails.
_POLL_INTERVAL: float = float(os.environ.get("WORKER_POLL_INTERVAL", "1.0"))
# Number of jobs to process before exiting; 0 = unlimited.
_MAX_JOBS: int = int(os.environ.get("WORKER_MAX_JOBS", "0"))
51
52 # ---------------------------------------------------------------------------
53 # GC handler (not an IntelProvider — no intel results)
54 # ---------------------------------------------------------------------------
55
async def _handle_gc(session: "AsyncSession", repo_id: str) -> None:
    """Garbage-collect one repository and commit the session.

    GC yields no intel results, so it is not an ``IntelProvider``; the worker
    invokes it directly.

    Args:
        session: Open database session passed to ``run_gc`` and committed
            once it returns.
        repo_id: Identifier of the repository to collect.
    """
    from musehub.services.musehub_gc import run_gc

    gc_result = await run_gc(session, repo_id)
    pruned_commits = gc_result.commits_deleted
    # Logged only when commits were pruned; a run that removed snapshots but
    # no commits produces no log line.
    if pruned_commits:
        logger.info(
            "✅ gc done: repo=%s pruned %d commits, %d snapshots",
            repo_id,
            pruned_commits,
            gc_result.snapshots_deleted,
        )
    await session.commit()
65
66 # ---------------------------------------------------------------------------
67 # Poll loop
68 # ---------------------------------------------------------------------------
69
async def _process_one() -> bool:
    """Reclaim stale jobs, then claim and execute at most one job.

    Reclaiming, claiming, running the handler and recording the outcome each
    use their own ``AsyncSessionLocal()`` session and commit independently, so
    a handler failure does not roll back the claim.

    Dispatch: ``gc``, ``push.file_last_commits``, ``content.scan``,
    ``mpack.index``, ``mpack.gc`` and ``profile.snapshot`` are handled inline;
    any other ``job_type`` is looked up with ``get_provider``. A ``job_type``
    with no provider is marked failed via ``fail_job``.

    Returns:
        True if a job was claimed (whether it then succeeded or failed),
        False if ``claim_next_job`` returned nothing.

    Raises:
        Exceptions from reclaiming or claiming, or from ``fail_job`` while
        recording a failure, propagate to the caller. Exceptions raised by a
        handler (or by ``complete_job``) are caught, logged and recorded via
        ``fail_job``.
    """
    from musehub.db.database import AsyncSessionLocal
    from musehub.services.musehub_intel_providers import (
        get_provider,
        persist_intel_results,
    )
    from musehub.services.musehub_jobs import (
        claim_next_job,
        complete_job,
        fail_job,
        reclaim_stale_jobs,
    )

    # Reset jobs left in ``running`` (e.g. by a crashed worker) before
    # claiming a new one.
    async with AsyncSessionLocal() as session:
        n_reclaimed = await reclaim_stale_jobs(session)
        if n_reclaimed:
            await session.commit()
            logger.warning("Reclaimed %s stale running job(s)", n_reclaimed)

    async with AsyncSessionLocal() as session:
        job = await claim_next_job(session)
        if job is None:
            return False
        # Copy the fields out before committing so later access does not
        # depend on the session's expire_on_commit setting.
        job_id = job.job_id
        job_type = job.job_type
        repo_id = job.repo_id
        payload = job.payload or {}
        await session.commit()

    try:
        if job_type == "gc":
            async with AsyncSessionLocal() as session:
                await _handle_gc(session, repo_id)

        elif job_type == "push.file_last_commits":
            from musehub.services.file_last_commits import (
                compute_and_store_file_last_commits,
            )

            branch = str(payload.get("branch", ""))
            head_commit_id = str(payload.get("head", ""))
            if branch and head_commit_id:
                async with AsyncSessionLocal() as session:
                    await compute_and_store_file_last_commits(
                        session, repo_id, branch, head_commit_id
                    )
                    await session.commit()
                logger.info(
                    "✅ push.file_last_commits done: repo=%s branch=%s",
                    repo_id,
                    branch,
                )
            else:
                # Nothing to compute; the job is still marked complete below.
                logger.warning(
                    "push.file_last_commits skipped: job=%s repo=%s "
                    "(payload missing branch/head)",
                    job_id,
                    repo_id,
                )

        elif job_type == "content.scan":
            # Stub — always passes clean until a real CSAM API is configured.
            logger.debug("content.scan: stub pass for job=%s repo=%s", job_id, repo_id)

        elif job_type == "mpack.index":
            from musehub.services.musehub_wire import process_mpack_index_job

            async with AsyncSessionLocal() as session:
                result = await process_mpack_index_job(session, job_id)
                await session.commit()
            logger.info(
                "✅ mpack.index done: job=%s indexed=%d byte_ranges=%d elapsed=%.1fms",
                # str() so a non-str id cannot make this log line raise after
                # the index work has already been committed.
                str(job_id)[:16],
                result.get("mpack_index_written", 0),
                result.get("byte_ranges_computed", 0),
                result.get("elapsed_ms", 0),
            )

        elif job_type == "mpack.gc":
            from musehub.services.musehub_wire import process_mpack_gc_job

            async with AsyncSessionLocal() as session:
                result = await process_mpack_gc_job(session, repo_id)
                await session.commit()
            if result["skipped"]:
                logger.info(
                    "✅ mpack.gc done (skipped): repo=%s mpacks=%d",
                    repo_id,
                    result["packs_before"],
                )
            else:
                logger.info(
                    "✅ mpack.gc done: repo=%s mpacks %d→%d key=%s",
                    repo_id,
                    result["packs_before"],
                    result["packs_after"],
                    result["consolidated_key"],
                )

        elif job_type == "profile.snapshot":
            # Profile snapshot jobs write directly to musehub_profile_snapshots
            # (not through persist_intel_results) and need their own commit.
            from musehub.services.musehub_intel_providers import (
                _compute_and_persist_profile_snapshot,
            )

            handle = str(payload.get("handle", ""))
            if handle:
                async with AsyncSessionLocal() as session:
                    await _compute_and_persist_profile_snapshot(session, handle)
                    await session.commit()
                logger.info("✅ profile.snapshot done: handle=%s", handle)
            else:
                # Nothing to compute; the job is still marked complete below.
                logger.warning(
                    "profile.snapshot skipped: job=%s (payload missing handle)",
                    job_id,
                )

        else:
            provider = get_provider(job_type)
            if provider is None:
                logger.error(
                    "Unknown job_type=%s: job=%s repo=%s", job_type, job_id, repo_id
                )
                async with AsyncSessionLocal() as session:
                    await fail_job(session, job_id, f"unknown job_type: {job_type}")
                    await session.commit()
                return True

            ref = str(payload.get("head", ""))
            async with AsyncSessionLocal() as session:
                results = await provider.compute(session, repo_id, ref, payload)
                if results:
                    await persist_intel_results(session, repo_id, ref, results)
                await session.commit()
            logger.info(
                "✅ %s done: repo=%s ref=%s types=[%s]",
                job_type,
                repo_id,
                ref if ref else "—",
                # ``or ()`` keeps a falsy/None result from breaking the log
                # line after the results were already committed.
                ", ".join(intel_type for intel_type, _ in results or ()),
            )

        async with AsyncSessionLocal() as session:
            await complete_job(session, job_id)
            await session.commit()

    except Exception:
        err = traceback.format_exc()
        logger.error(
            "Job failed: job=%s type=%s repo=%s\n%s", job_id, job_type, repo_id, err
        )
        async with AsyncSessionLocal() as session:
            await fail_job(session, job_id, err)
            await session.commit()

    return True
190
async def run() -> None:
    """Poll the job queue until interrupted or the job budget is used up.

    Configures logging, initialises the database via ``init_db``, then calls
    ``_process_one`` in a loop. After a job is processed the loop polls again
    immediately. When no job was found, or the poll raised, it first sleeps
    ``_POLL_INTERVAL`` seconds. Poll exceptions are logged and do not stop the
    worker.

    When ``_MAX_JOBS`` (``WORKER_MAX_JOBS``) is positive, the worker exits
    after exactly that many jobs have been claimed. 0, or a negative value,
    means unlimited.
    """
    logging.basicConfig(
        level=logging.INFO,
        format='{"time": "%(asctime)s", "level": "%(levelname)s", "msg": "%(message)s"}',
    )
    from musehub.db.database import init_db

    await init_db()
    logger.info("✅ MuseHub worker started (poll_interval=%.1fs)", _POLL_INTERVAL)

    jobs_processed = 0
    while True:
        # Enforce the budget *before* polling. _process_one claims and runs a
        # job immediately, so checking afterwards would run one job too many.
        if _MAX_JOBS > 0 and jobs_processed >= _MAX_JOBS:
            logger.info("Worker reached MAX_JOBS=%d, exiting.", _MAX_JOBS)
            break

        try:
            found = await _process_one()
        except Exception:
            logger.error("Worker poll error:\n%s", traceback.format_exc())
            found = False

        if found:
            jobs_processed += 1
        else:
            # Back off when the queue is empty or the poll failed.
            await asyncio.sleep(_POLL_INTERVAL)
217
if __name__ == "__main__":
    # Entry point for ``python -m musehub.worker``. Drive the polling loop on
    # a fresh event loop until ``run`` returns or the process is interrupted.
    asyncio.run(run())
File History 3 commits
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f fix: use wire_bytes not mpack_bytes_raw in compute_object_b… Sonnet 4.6 patch 9 days ago
sha256:4aed3d8601c8dd3ed37074de35f11f4a9699a0a4b99d43727048fd3f8e6fd13d chore: doc sweep, ignore wrangler build state, misc fixes Sonnet 4.6 minor 12 days ago