enqueue_mpack_index_backfill.py
python
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa
Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As…
Human
1 day ago
| 1 | #!/usr/bin/env python3 |
| 2 | """Enqueue mpack.index jobs for all mpacks with unindexed byte ranges. |
| 3 | |
| 4 | Processes one mpack at a time, waiting for the worker to complete each job |
| 5 | before moving to the next. This lets you verify each batch is working before |
| 6 | proceeding. |
| 7 | |
| 8 | Usage: |
| 9 | docker exec musehub python3 /app/deploy/enqueue_mpack_index_backfill.py |
| 10 | docker exec musehub python3 /app/deploy/enqueue_mpack_index_backfill.py --dry-run |
| 11 | docker exec musehub python3 /app/deploy/enqueue_mpack_index_backfill.py --poll-interval 10 |
| 12 | |
| 13 | Options: |
| 14 | --dry-run Print what would be enqueued without creating jobs. |
| 15 | --poll-interval Seconds between job completion polls (default: 5). |
| 16 | --limit Max mpacks to process (default: unlimited). |
| 17 | """ |
| 18 | from __future__ import annotations |
| 19 | |
| 20 | import argparse |
| 21 | import asyncio |
| 22 | import logging |
| 23 | import sys |
| 24 | import time |
| 25 | |
# Module-wide logger used by every helper below.
log = logging.getLogger(__name__)

# Configure the root logger: INFO threshold, terse "<LEVEL> <message>" lines.
logging.basicConfig(
    format="%(levelname)s %(message)s",
    level=logging.INFO,
)
| 28 | |
| 29 | |
async def _get_unindexed_mpacks(engine, repo_map: dict) -> list[tuple[str, str]]:
    """List every mpack that still has an object row without a byte offset.

    One ``(mpack_id, repo_id)`` pair is returned per mpack. The repo_id is
    taken from ``musehub_object_refs`` rows that reference one of the mpack's
    unindexed objects; non-null repo_ids sort first, so a real repo wins when
    one exists. Because the join is a LEFT JOIN, mpacks whose unindexed
    objects have no refs are still returned, with ``''`` as the repo_id.

    Args:
        engine: Async SQLAlchemy engine used to open the connection.
        repo_map: Accepted for interface compatibility; not read here.

    Returns:
        A list of ``(mpack_id, repo_id)`` tuples.
    """
    from sqlalchemy import text

    stmt = text("""
        SELECT DISTINCT ON (mi.mpack_id) mi.mpack_id, COALESCE(mor.repo_id, '') as repo_id
        FROM musehub_mpack_index mi
        LEFT JOIN musehub_object_refs mor ON mor.object_id = mi.entity_id
        WHERE mi.entity_type = 'object'
        AND mi.byte_offset IS NULL
        ORDER BY mi.mpack_id, mor.repo_id NULLS LAST
    """)

    async with engine.connect() as conn:
        result = await conn.execute(stmt)
        found = result.fetchall()

    pairs = []
    for record in found:
        pairs.append((record[0], record[1]))
    return pairs
| 47 | |
| 48 | |
async def _enqueue_one(engine, mpack_id: str, repo_id: str) -> str:
    """Create a pending ``mpack.index`` background job for one mpack.

    The job id is derived via ``compute_job_id`` from the repo id, the job
    type and the mpack id concatenated with the current UTC timestamp. The
    row is inserted with status ``pending`` and attempt ``0``; an id
    conflict is ignored by the INSERT.

    Args:
        engine: Async SQLAlchemy engine; the insert runs in its own transaction.
        mpack_id: Identifier stored in the payload under ``mpack_key``.
        repo_id: Repository the job is attributed to.

    Returns:
        The computed job id.
    """
    import json
    from datetime import datetime, timezone

    from sqlalchemy import text
    from musehub.core.genesis import compute_job_id

    now = datetime.now(tz=timezone.utc)
    # The timestamp is folded into the id seed together with the mpack id.
    seed = f"{mpack_id}{now.isoformat()}"
    job_id = compute_job_id(repo_id, "mpack.index", seed)

    bind_params = {
        "job_id": job_id,
        "repo_id": repo_id,
        "payload": json.dumps({"mpack_key": mpack_id}),
        "now": now,
    }
    insert_stmt = text("""
        INSERT INTO musehub_background_jobs
        (job_id, repo_id, job_type, payload, status, created_at, attempt)
        VALUES (:job_id, :repo_id, 'mpack.index',
        cast(:payload as jsonb), 'pending', :now, 0)
        ON CONFLICT DO NOTHING
    """)

    async with engine.begin() as conn:
        await conn.execute(insert_stmt, bind_params)

    return job_id
| 72 | |
| 73 | |
async def _wait_for_job(engine, job_id: str, poll_interval: int) -> str:
    """Block until the given background job reaches a terminal status.

    The job row is re-read every ``poll_interval`` seconds, using a fresh
    connection per poll. There is no timeout: the loop only ends when the
    row disappears or its status becomes terminal.

    Args:
        engine: Async SQLAlchemy engine.
        job_id: Id of the job to watch.
        poll_interval: Seconds to sleep between polls.

    Returns:
        ``"done"``, ``"failed"`` or ``"quarantined"`` as read from the row,
        or ``"missing"`` when no row exists for ``job_id``. A failed job's
        error text (first 120 characters) is logged before returning.
    """
    from sqlalchemy import text

    terminal_states = ("done", "failed", "quarantined")
    query = text("SELECT status, error FROM musehub_background_jobs WHERE job_id = :jid")

    while True:
        async with engine.connect() as conn:
            result = await conn.execute(query, {"jid": job_id})
            record = result.fetchone()

        if record is None:
            return "missing"

        state = record[0]
        if state not in terminal_states:
            # Still queued or running: wait, then look again.
            await asyncio.sleep(poll_interval)
            continue

        if state == "failed":
            log.error(" ❌ job %s failed: %s", job_id[:16], (record[1] or "")[:120])
        return state
| 91 | |
| 92 | |
async def _byte_range_count(engine, mpack_id: str) -> tuple[int, int]:
    """Count an mpack's object rows and how many of them carry a byte offset.

    Args:
        engine: Async SQLAlchemy engine.
        mpack_id: The mpack whose ``entity_type = 'object'`` rows are counted.

    Returns:
        ``(total_objects, objects_with_byte_offset)``; ``(0, 0)`` if the
        query yields no row.
    """
    from sqlalchemy import text

    count_stmt = text("""
        SELECT COUNT(*), COUNT(byte_offset)
        FROM musehub_mpack_index
        WHERE mpack_id = :mid AND entity_type = 'object'
    """)

    async with engine.connect() as conn:
        result = await conn.execute(count_stmt, {"mid": mpack_id})
        counts = result.fetchone()

    if not counts:
        return (0, 0)
    return (counts[0], counts[1])
| 103 | |
| 104 | |
async def _fetch_job_error(engine, job_id: str) -> str | None:
    """Return the ``error`` column stored for *job_id*, or None if there is none."""
    from sqlalchemy import text

    async with engine.connect() as conn:
        result = await conn.execute(
            text("SELECT error FROM musehub_background_jobs WHERE job_id = :jid"),
            {"jid": job_id},
        )
        return result.scalar()


async def main(dry_run: bool, poll_interval: int, limit: int) -> None:
    """Backfill byte ranges by enqueueing one ``mpack.index`` job per mpack.

    Mpacks are handled strictly one at a time: a job is inserted, then this
    coroutine polls until that job reaches a terminal status before moving
    on to the next mpack.

    Args:
        dry_run: When true, only log the (first 20) mpacks that would be
            processed and return without creating any job.
        poll_interval: Seconds between job-status polls.
        limit: Maximum number of mpacks to process; ``0`` or a negative
            value means unlimited. The limit also applies to ``dry_run`` so
            the preview matches what a real run would enqueue.

    Raises:
        SystemExit: With exit code 1 when at least one job did not succeed
            (and was not classified as "mpack gone from storage").
    """
    from sqlalchemy.ext.asyncio import create_async_engine
    from musehub.config import settings

    engine = create_async_engine(settings.database_url, echo=False)
    done = failed = skipped = 0

    # try/finally guarantees the connection pool is released on every exit
    # path (dry-run return, exceptions) before the event loop shuts down.
    try:
        log.info("Scanning for mpacks with unindexed byte ranges...")
        mpacks = await _get_unindexed_mpacks(engine, {})
        log.info("Found %d mpacks to index", len(mpacks))

        # `> 0` rather than truthiness: a negative limit must not turn into
        # a tail-dropping slice such as mpacks[:-3].
        if limit > 0:
            mpacks = mpacks[:limit]
            log.info("Processing %d mpacks (limited)", len(mpacks))

        if dry_run:
            for mpack_id, repo_id in mpacks[:20]:
                total, with_range = await _byte_range_count(engine, mpack_id)
                log.info(" DRY-RUN mpack=%s repo=%s objects=%d indexed=%d",
                         mpack_id[:20], repo_id[:16], total, with_range)
            if len(mpacks) > 20:
                log.info(" ... and %d more", len(mpacks) - 20)
            return

        for i, (mpack_id, repo_id) in enumerate(mpacks, 1):
            total, with_range = await _byte_range_count(engine, mpack_id)
            if with_range == total and total > 0:
                # Indexed since the initial scan; nothing left to do.
                log.info("[%d/%d] SKIP mpack=%s all %d already indexed",
                         i, len(mpacks), mpack_id[:20], total)
                skipped += 1
                continue

            if not repo_id:
                log.warning("[%d/%d] SKIP mpack=%s — no repo_id (orphaned, no object_refs)",
                            i, len(mpacks), mpack_id[:20])
                skipped += 1
                continue

            log.info("[%d/%d] ENQUEUE mpack=%s repo=%s objects=%d/%d indexed",
                     i, len(mpacks), mpack_id[:20], repo_id[:16], with_range, total)
            job_id = await _enqueue_one(engine, mpack_id, repo_id)
            log.info(" job_id=%s — waiting for worker...", job_id[:16])

            status = await _wait_for_job(engine, job_id, poll_interval)
            total2, with_range2 = await _byte_range_count(engine, mpack_id)

            if status == "done":
                log.info(" ✅ done byte_ranges=%d/%d", with_range2, total2)
                done += 1
                continue

            # A failure caused by the mpack being absent from storage is
            # expected for old/GCed mpacks and counted as a skip, not a failure.
            gone_from_storage = False
            if status == "failed":
                err = await _fetch_job_error(engine, job_id)
                gone_from_storage = bool(err) and "not found in storage" in err

            if gone_from_storage:
                log.warning(" ⚠️ mpack gone from storage (GCed?) — skipping byte_ranges=%d/%d",
                            with_range2, total2)
                skipped += 1
            else:
                log.error(" ❌ status=%s byte_ranges=%d/%d", status, with_range2, total2)
                failed += 1
    finally:
        await engine.dispose()

    log.info("=== BACKFILL COMPLETE ===")
    log.info("done=%d failed=%d skipped=%d total=%d", done, failed, skipped, len(mpacks))
    if failed:
        sys.exit(1)
| 176 | |
| 177 | |
if __name__ == "__main__":
    # CLI entry point: parse the three flags and hand them to main().
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--dry-run", action="store_true")
    # Both integer options share the same shape; only name and default differ.
    for flag, fallback in (("--poll-interval", 5), ("--limit", 0)):
        cli.add_argument(flag, type=int, default=fallback, metavar="N")
    opts = cli.parse_args()

    backfill = main(
        dry_run=opts.dry_run,
        poll_interval=opts.poll_interval,
        limit=opts.limit,
    )
    asyncio.run(backfill)
File History
1 commit
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa
Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As…
Human
1 day ago