worker.py
python
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa
Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As…
Human
1 day ago
| 1 | """MuseHub background worker — processes jobs from musehub_background_jobs. |
| 2 | |
| 3 | Run as a separate process (separate Docker service) so that memory-intensive |
| 4 | jobs (intel indexing, GC) cannot OOM-crash the web server. |
| 5 | |
| 6 | Usage: |
| 7 | python -m musehub.worker |
| 8 | |
| 9 | Environment: |
| 10 | DATABASE_URL — async SQLAlchemy URL (postgresql+asyncpg://...) |
| 11 | WORKER_POLL_INTERVAL — seconds between poll cycles (default: 1.0) |
| 12 | WORKER_MAX_JOBS — max jobs to process before exiting (default: unlimited, 0) |
| 13 | |
| 14 | Architecture |
| 15 | ------------ |
| 16 | Intel job handlers are registered in ``musehub.services.musehub_intel_providers``. |
| 17 | Each ``IntelProvider`` is keyed by job_type string and returns a list of |
| 18 | ``(intel_type, data_dict)`` tuples. The worker persists those results via |
| 19 | ``persist_intel_results`` and then marks the job done. |
| 20 | |
| 21 | Adding a new domain: |
| 22 | 1. Implement ``IntelProvider`` in musehub_intel_providers.py |
| 23 | 2. Add it to ``_PROVIDER_REGISTRY`` there |
| 24 | 3. Update ``job_types_for_push`` to enqueue it for the right domain |
| 25 | |
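| 26 | No changes to this file are needed for new intel domains (non-intel job types such as ``gc`` and ``mpack.*`` are dispatched explicitly in ``_process_one``). |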
| 27 | |
| 28 | Stale claim recovery |
| 29 | -------------------- |
| 30 | On every poll cycle the worker resets any ``running`` jobs whose |
| 31 | ``claimed_at`` is older than ``_STALE_CLAIM_MINUTES``. This handles the |
| 32 | case where a previous worker process crashed mid-job and left a row stuck |
| 33 | in the ``running`` state. |
| 34 | """ |
| 35 | |
| 36 | import asyncio |
| 37 | import logging |
| 38 | import os |
| 39 | import traceback |
| 40 | |
| 41 | |
# Force DEBUG=false before any musehub imports so the SQLAlchemy engine is
# created with echo=False regardless of the container's DEBUG env var.
# A plain assignment is enough: it overrides any value already present.
os.environ["DEBUG"] = "false"

logger = logging.getLogger(__name__)

# Seconds run() sleeps after a poll that found no job (WORKER_POLL_INTERVAL).
_POLL_INTERVAL: float = float(os.environ.get("WORKER_POLL_INTERVAL", "1.0"))
# Number of jobs after which run() exits (WORKER_MAX_JOBS); 0 = unlimited.
_MAX_JOBS: int = int(os.environ.get("WORKER_MAX_JOBS", "0"))
| 51 | |
| 52 | # --------------------------------------------------------------------------- |
| 53 | # GC handler (not an IntelProvider — no intel results) |
| 54 | # --------------------------------------------------------------------------- |
| 55 | |
async def _handle_gc(session: "AsyncSession", repo_id: str) -> None:
    """Run garbage collection for one repository, then commit the session.

    GC is not an IntelProvider: it yields no intel results, so it is invoked
    directly instead of going through the provider registry. A summary line
    is logged only when the run reports deleted commits.

    Args:
        session: Open async DB session handed to ``run_gc``; committed
            before this coroutine returns.
        repo_id: Repository to collect.
    """
    from musehub.services.musehub_gc import run_gc

    outcome = await run_gc(session, repo_id)
    pruned_commits = outcome.commits_deleted
    if pruned_commits:
        logger.info(
            "✅ gc done: repo=%s pruned %d commits, %d snapshots",
            repo_id,
            pruned_commits,
            outcome.snapshots_deleted,
        )
    await session.commit()
| 65 | |
| 66 | # --------------------------------------------------------------------------- |
| 67 | # Poll loop |
| 68 | # --------------------------------------------------------------------------- |
| 69 | |
async def _process_one() -> bool:
    """Claim and execute one job from the queue.

    Each phase runs in its own short-lived session:

    1. Reset stale ``running`` claims via ``reclaim_stale_jobs``.
    2. Claim the next job via ``claim_next_job`` and commit the claim.
    3. Dispatch on ``job_type``. The built-in types (``gc``,
       ``push.file_last_commits``, ``content.scan``, ``mpack.index``,
       ``mpack.gc``, ``profile.snapshot``) are handled inline. Anything else
       is looked up with ``get_provider`` and its results are stored with
       ``persist_intel_results``. An unknown type is failed via ``fail_job``.
    4. Mark the job done with ``complete_job``. If any step of execution
       raised, record the formatted traceback with ``fail_job`` instead.

    Returns:
        ``True`` if a job was claimed (whether it then succeeded or failed),
        ``False`` if there was nothing to claim.
    """
    from musehub.db.database import AsyncSessionLocal
    from musehub.services.musehub_jobs import (
        claim_next_job,
        complete_job,
        fail_job,
        reclaim_stale_jobs,
    )
    from musehub.services.musehub_intel_providers import (
        get_provider,
        persist_intel_results,
    )

    # Reset stale running jobs before claiming a new one.
    async with AsyncSessionLocal() as session:
        n_reclaimed = await reclaim_stale_jobs(session)
        if n_reclaimed:
            await session.commit()

    async with AsyncSessionLocal() as session:
        job = await claim_next_job(session)
        if job is None:
            return False
        # Copy the fields out *before* committing. If the session factory
        # expires ORM attributes on commit (SQLAlchemy's default), reading
        # them after this session has closed would raise.
        job_id = job.job_id
        job_type = job.job_type
        repo_id = job.repo_id
        payload = job.payload or {}
        await session.commit()

    try:
        if job_type == "gc":
            async with AsyncSessionLocal() as session:
                await _handle_gc(session, repo_id)

        elif job_type == "push.file_last_commits":
            async with AsyncSessionLocal() as session:
                from musehub.services.file_last_commits import compute_and_store_file_last_commits
                branch = str(payload.get("branch", ""))
                head_commit_id = str(payload.get("head", ""))
                # Nothing to compute unless both a branch and a head are given.
                if branch and head_commit_id:
                    await compute_and_store_file_last_commits(session, repo_id, branch, head_commit_id)
                    await session.commit()
            logger.info("✅ push.file_last_commits done: repo=%s branch=%s", repo_id, branch)

        elif job_type == "content.scan":
            # Stub — always passes clean until a real CSAM API is configured.
            logger.debug("content.scan: stub pass for job=%s repo=%s", job_id, repo_id)

        elif job_type == "mpack.index":
            async with AsyncSessionLocal() as session:
                from musehub.services.musehub_wire import process_mpack_index_job
                result = await process_mpack_index_job(session, job_id)
                await session.commit()
            # str() so a non-str job id cannot make the (already committed)
            # job fail at the logging step.
            logger.info(
                "✅ mpack.index done: job=%s indexed=%d byte_ranges=%d elapsed=%.1fms",
                str(job_id)[:16], result.get("mpack_index_written", 0),
                result.get("byte_ranges_computed", 0), result.get("elapsed_ms", 0),
            )

        elif job_type == "mpack.gc":
            async with AsyncSessionLocal() as session:
                from musehub.services.musehub_wire import process_mpack_gc_job
                result = await process_mpack_gc_job(session, repo_id)
                await session.commit()
            if result["skipped"]:
                logger.info("✅ mpack.gc done (skipped): repo=%s mpacks=%d", repo_id, result["packs_before"])
            else:
                logger.info(
                    "✅ mpack.gc done: repo=%s mpacks %d→%d key=%s",
                    repo_id, result["packs_before"], result["packs_after"], result["consolidated_key"],
                )

        elif job_type == "profile.snapshot":
            # Profile snapshot jobs write directly to musehub_profile_snapshots
            # (not through persist_intel_results) and need their own commit.
            async with AsyncSessionLocal() as session:
                from musehub.services.musehub_intel_providers import _compute_and_persist_profile_snapshot
                handle = str(payload.get("handle", ""))
                if handle:
                    await _compute_and_persist_profile_snapshot(session, handle)
                    await session.commit()
            logger.info("✅ profile.snapshot done: handle=%s", handle)

        else:
            provider = get_provider(job_type)
            if provider is None:
                async with AsyncSessionLocal() as session:
                    await fail_job(session, job_id, f"unknown job_type: {job_type}")
                    await session.commit()
                return True

            ref = str(payload.get("head", ""))
            async with AsyncSessionLocal() as session:
                # Treat a provider returning None like one returning []; the
                # log line below iterates the results unconditionally.
                results = await provider.compute(session, repo_id, ref, payload) or []
                if results:
                    await persist_intel_results(session, repo_id, ref, results)
                    await session.commit()
            logger.info(
                "✅ %s done: repo=%s ref=%s types=[%s]",
                job_type,
                repo_id,
                ref if ref else "—",
                ", ".join(t for t, _ in results),
            )

        async with AsyncSessionLocal() as session:
            await complete_job(session, job_id)
            await session.commit()

    except Exception:
        err = traceback.format_exc()
        logger.error("Job failed: type=%s repo=%s\n%s", job_type, repo_id, err)
        async with AsyncSessionLocal() as session:
            await fail_job(session, job_id, err)
            await session.commit()

    return True
| 190 | |
async def run() -> None:
    """Poll the job queue until interrupted or ``_MAX_JOBS`` jobs have run.

    Configures INFO-level logging, awaits ``init_db()``, then loops over
    ``_process_one()``. After a poll that finds no job, or that raises, the
    loop sleeps ``_POLL_INTERVAL`` seconds. After a processed job it polls
    again immediately, so a backlog drains without delay. When ``_MAX_JOBS``
    is non-zero, the loop exits right after the ``_MAX_JOBS``-th job.
    """
    logging.basicConfig(
        level=logging.INFO,
        # NOTE(review): %(message)s is interpolated verbatim, so messages with
        # newlines or quotes (e.g. the poll-error traceback) are not valid JSON.
        format='{"time": "%(asctime)s", "level": "%(levelname)s", "msg": "%(message)s"}',
    )
    from musehub.db.database import init_db

    await init_db()
    logger.info("✅ MuseHub worker started (poll_interval=%.1fs)", _POLL_INTERVAL)

    jobs_processed = 0
    while True:
        try:
            found = await _process_one()
        except Exception:
            # Keep the worker alive; back off below instead of spinning.
            logger.error("Worker poll error:\n%s", traceback.format_exc())
            found = False

        if not found:
            await asyncio.sleep(_POLL_INTERVAL)
            continue

        # Count the job just processed *before* comparing, so exactly
        # _MAX_JOBS jobs run (not _MAX_JOBS + 1).
        jobs_processed += 1
        if _MAX_JOBS and jobs_processed >= _MAX_JOBS:
            logger.info("Worker reached MAX_JOBS=%d, exiting.", _MAX_JOBS)
            break


if __name__ == "__main__":
    # Entry point for ``python -m musehub.worker``.
    asyncio.run(run())
File History
3 commits
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa
Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As…
Human
1 day ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f
fix: use wire_bytes not mpack_bytes_raw in compute_object_b…
Sonnet 4.6
patch
9 days ago
sha256:4aed3d8601c8dd3ed37074de35f11f4a9699a0a4b99d43727048fd3f8e6fd13d
chore: doc sweep, ignore wrangler build state, misc fixes
Sonnet 4.6
minor
⚠
12 days ago