#!/usr/bin/env python3
"""Enqueue mpack.index jobs for all mpacks with unindexed byte ranges.

Processes one mpack at a time, waiting for the worker to complete each job
before moving to the next. This lets you verify each batch is working before
proceeding.

Usage:
    docker exec musehub python3 /app/deploy/enqueue_mpack_index_backfill.py
    docker exec musehub python3 /app/deploy/enqueue_mpack_index_backfill.py --dry-run
    docker exec musehub python3 /app/deploy/enqueue_mpack_index_backfill.py --poll-interval 10

Options:
    --dry-run        Print what would be enqueued without creating jobs
                     (honours --limit).
    --poll-interval  Seconds between job completion polls (default: 5).
    --limit          Max mpacks to process (default: 0 = unlimited).
    --timeout        Max seconds to wait for each job before counting it as
                     failed and moving on (default: 0 = wait forever).
"""
from __future__ import annotations

import argparse
import asyncio
import logging
import sys
import time
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # annotation-only import; sqlalchemy is loaded lazily at runtime
    from sqlalchemy.ext.asyncio import AsyncEngine

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
log = logging.getLogger(__name__)

# Job statuses that _wait_for_job treats as final.
_TERMINAL_STATUSES = ("done", "failed", "quarantined")

# Substring of a failed job's error meaning the mpack blob is gone from
# storage (expected for old / GCed mpacks); such failures are counted as skips.
_STORAGE_MISSING_MARKER = "not found in storage"

# Number of mpacks --dry-run lists individually before summarising the rest.
_DRY_RUN_PREVIEW = 20


async def _get_unindexed_mpacks(
    engine: AsyncEngine, repo_map: dict | None = None
) -> list[tuple[str, str]]:
    """Return ``[(mpack_id, repo_id)]`` for mpacks with any unindexed object.

    An mpack qualifies when at least one of its ``entity_type = 'object'`` rows
    in ``musehub_mpack_index`` has a NULL ``byte_offset``.

    ``repo_id`` is resolved via ``musehub_object_refs`` (LEFT JOIN, so mpacks
    without refs still appear):

    1. a repo referencing one of the *unindexed* objects is preferred;
    2. otherwise any repo referencing *any* object in the mpack is used;
    3. if no object in the mpack is referenced at all, ``repo_id`` is ``''``.

    Ties are broken by the smallest ``repo_id`` so the result is deterministic.

    Args:
        engine: SQLAlchemy async engine.
        repo_map: Unused; accepted only for backward compatibility.
    """
    from sqlalchemy import text

    # The IN-subquery selects qualifying mpacks, but the join runs over ALL of
    # their object rows so the fallback in step 2 can find a repo.  ORDER BY
    # after mpack_id: non-NULL repo first (false < true), then refs of
    # unindexed objects first, then smallest repo_id.
    query = text("""
        SELECT DISTINCT ON (mi.mpack_id)
               mi.mpack_id,
               COALESCE(mor.repo_id, '') AS repo_id
        FROM musehub_mpack_index mi
        LEFT JOIN musehub_object_refs mor ON mor.object_id = mi.entity_id
        WHERE mi.entity_type = 'object'
          AND mi.mpack_id IN (
              SELECT mpack_id
              FROM musehub_mpack_index
              WHERE entity_type = 'object' AND byte_offset IS NULL
          )
        ORDER BY mi.mpack_id,
                 (mor.repo_id IS NULL),
                 (mi.byte_offset IS NOT NULL),
                 mor.repo_id
    """)
    async with engine.connect() as conn:
        rows = (await conn.execute(query)).fetchall()
    return [(r[0], r[1]) for r in rows]


async def _enqueue_one(engine: AsyncEngine, mpack_id: str, repo_id: str) -> str:
    """Insert a pending ``mpack.index`` job for *mpack_id* and return its job_id.

    The job id is derived from ``compute_job_id`` over the repo, job type and
    ``mpack_id`` + current UTC timestamp; presumably that is a deterministic
    hash, so the timestamp keeps re-runs from reusing an old id (confirm).
    The payload is ``{"mpack_key": mpack_id}``.

    NOTE(review): the insert uses ``ON CONFLICT DO NOTHING``; if it conflicts,
    no row is inserted and the returned job_id may not match a new row.
    """
    import json
    from datetime import datetime, timezone

    from sqlalchemy import text

    from musehub.core.genesis import compute_job_id

    now = datetime.now(tz=timezone.utc)
    job_id = compute_job_id(repo_id, "mpack.index", f"{mpack_id}{now.isoformat()}")
    payload_json = json.dumps({"mpack_key": mpack_id})
    async with engine.begin() as conn:
        await conn.execute(
            text("""
                INSERT INTO musehub_background_jobs
                    (job_id, repo_id, job_type, payload, status, created_at, attempt)
                VALUES
                    (:job_id, :repo_id, 'mpack.index', cast(:payload as jsonb),
                     'pending', :now, 0)
                ON CONFLICT DO NOTHING
            """),
            {"job_id": job_id, "repo_id": repo_id, "payload": payload_json, "now": now},
        )
    return job_id


async def _wait_for_job(
    engine: AsyncEngine, job_id: str, poll_interval: int, timeout: float | None = None
) -> str:
    """Poll until the job reaches a terminal status and return that status.

    Args:
        engine: SQLAlchemy async engine.
        job_id: Job to watch in ``musehub_background_jobs``.
        poll_interval: Seconds to sleep between polls.
        timeout: Optional max seconds to wait; ``None`` or ``0`` waits forever.

    Returns:
        ``"done"``, ``"failed"`` or ``"quarantined"`` once terminal;
        ``"missing"`` if the job row does not exist; ``"timeout"`` if
        *timeout* elapsed first (the job row is left untouched, so a worker
        may still process it later).
    """
    from sqlalchemy import text

    query = text("SELECT status, error FROM musehub_background_jobs WHERE job_id = :jid")
    deadline = time.monotonic() + timeout if timeout else None
    while True:
        async with engine.connect() as conn:
            row = (await conn.execute(query, {"jid": job_id})).fetchone()
        if row is None:
            return "missing"
        status, error = row[0], row[1]
        if status in _TERMINAL_STATUSES:
            if status == "failed":
                log.error(" ❌ job %s failed: %s", job_id[:16], (error or "")[:120])
            return status
        if deadline is not None and time.monotonic() >= deadline:
            return "timeout"
        await asyncio.sleep(poll_interval)


async def _job_error(engine: AsyncEngine, job_id: str) -> str | None:
    """Return the stored ``error`` text of a job, or None if absent."""
    from sqlalchemy import text

    async with engine.connect() as conn:
        return (
            await conn.execute(
                text("SELECT error FROM musehub_background_jobs WHERE job_id = :jid"),
                {"jid": job_id},
            )
        ).scalar()


async def _byte_range_count(engine: AsyncEngine, mpack_id: str) -> tuple[int, int]:
    """Return (total_objects, objects_with_byte_range) for a mpack."""
    from sqlalchemy import text

    async with engine.connect() as conn:
        row = (
            await conn.execute(
                text("""
                    SELECT COUNT(*), COUNT(byte_offset)
                    FROM musehub_mpack_index
                    WHERE mpack_id = :mid AND entity_type = 'object'
                """),
                {"mid": mpack_id},
            )
        ).fetchone()
    return (row[0], row[1]) if row else (0, 0)


async def _preview(engine: AsyncEngine, mpacks: list[tuple[str, str]]) -> None:
    """Log the first few mpacks that would be processed (dry-run output)."""
    for mpack_id, repo_id in mpacks[:_DRY_RUN_PREVIEW]:
        total, with_range = await _byte_range_count(engine, mpack_id)
        log.info(" DRY-RUN mpack=%s repo=%s objects=%d indexed=%d",
                 mpack_id[:20], repo_id[:16], total, with_range)
    if len(mpacks) > _DRY_RUN_PREVIEW:
        log.info(" ... and %d more", len(mpacks) - _DRY_RUN_PREVIEW)


async def _process_one(
    engine: AsyncEngine,
    i: int,
    n: int,
    mpack_id: str,
    repo_id: str,
    poll_interval: int,
    timeout: float,
) -> str:
    """Enqueue and await one mpack's index job; return "done", "failed" or "skipped"."""
    total, with_range = await _byte_range_count(engine, mpack_id)
    if with_range == total and total > 0:
        log.info("[%d/%d] SKIP mpack=%s all %d already indexed", i, n, mpack_id[:20], total)
        return "skipped"
    if not repo_id:
        log.warning("[%d/%d] SKIP mpack=%s — no repo_id (orphaned, no object_refs)",
                    i, n, mpack_id[:20])
        return "skipped"

    log.info("[%d/%d] ENQUEUE mpack=%s repo=%s objects=%d/%d indexed",
             i, n, mpack_id[:20], repo_id[:16], with_range, total)
    job_id = await _enqueue_one(engine, mpack_id, repo_id)
    log.info(" job_id=%s — waiting for worker...", job_id[:16])
    status = await _wait_for_job(engine, job_id, poll_interval, timeout)
    total2, with_range2 = await _byte_range_count(engine, mpack_id)

    if status == "done":
        log.info(" ✅ done byte_ranges=%d/%d", with_range2, total2)
        return "done"
    if status == "failed":
        # A mpack missing from storage is expected for old/GCed mpacks.
        error = await _job_error(engine, job_id)
        if error and _STORAGE_MISSING_MARKER in error:
            log.warning(" ⚠️ mpack gone from storage (GCed?) — skipping byte_ranges=%d/%d",
                        with_range2, total2)
            return "skipped"
    log.error(" ❌ status=%s byte_ranges=%d/%d", status, with_range2, total2)
    return "failed"


async def _run(
    engine: AsyncEngine, *, dry_run: bool, poll_interval: int, limit: int, timeout: float
) -> int:
    """Scan, optionally preview, then process mpacks; return the failure count."""
    log.info("Scanning for mpacks with unindexed byte ranges...")
    mpacks = await _get_unindexed_mpacks(engine)
    log.info("Found %d mpacks to index", len(mpacks))

    # Apply the limit before the dry-run so the preview matches a real run.
    if limit:
        mpacks = mpacks[:limit]
        log.info("Processing %d mpacks (limited)", len(mpacks))

    if dry_run:
        await _preview(engine, mpacks)
        return 0

    counts = {"done": 0, "failed": 0, "skipped": 0}
    for i, (mpack_id, repo_id) in enumerate(mpacks, 1):
        outcome = await _process_one(
            engine, i, len(mpacks), mpack_id, repo_id, poll_interval, timeout
        )
        counts[outcome] += 1

    log.info("=== BACKFILL COMPLETE ===")
    log.info("done=%d failed=%d skipped=%d total=%d",
             counts["done"], counts["failed"], counts["skipped"], len(mpacks))
    return counts["failed"]


async def main(dry_run: bool, poll_interval: int, limit: int, timeout: float = 0) -> None:
    """Run the backfill; exits with status 1 if any job failed.

    Args:
        dry_run: Only log what would be enqueued.
        poll_interval: Seconds between job-status polls.
        limit: Max mpacks to process; 0 means unlimited.
        timeout: Max seconds to wait per job; 0 means wait forever.
    """
    from sqlalchemy.ext.asyncio import create_async_engine

    from musehub.config import settings

    engine = create_async_engine(settings.database_url, echo=False)
    try:
        failed = await _run(
            engine, dry_run=dry_run, poll_interval=poll_interval, limit=limit, timeout=timeout
        )
    finally:
        # Close pooled connections while the event loop is still alive.
        await engine.dispose()
    if failed:
        sys.exit(1)


def _int_at_least(minimum: int):
    """Build an argparse ``type=`` callable accepting integers >= *minimum*."""
    def parse(value: str) -> int:
        try:
            number = int(value)
        except ValueError:
            raise argparse.ArgumentTypeError(f"expected an integer, got {value!r}") from None
        if number < minimum:
            raise argparse.ArgumentTypeError(f"must be >= {minimum}, got {number}")
        return number
    return parse


def build_parser() -> argparse.ArgumentParser:
    """Return the command-line parser for this script."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's line breaks in --help output.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--dry-run", action="store_true",
                        help="print what would be enqueued without creating jobs")
    parser.add_argument("--poll-interval", type=_int_at_least(1), default=5, metavar="N",
                        help="seconds between job completion polls (default: 5)")
    parser.add_argument("--limit", type=_int_at_least(0), default=0, metavar="N",
                        help="max mpacks to process (default: 0 = unlimited)")
    parser.add_argument("--timeout", type=_int_at_least(0), default=0, metavar="N",
                        help="max seconds to wait per job (default: 0 = forever)")
    return parser


if __name__ == "__main__":
    args = build_parser().parse_args()
    asyncio.run(main(dry_run=args.dry_run, poll_interval=args.poll_interval,
                     limit=args.limit, timeout=args.timeout))