gabriel / musehub public
enqueue_mpack_index_backfill.py python
184 lines 7.5 KB
Raw
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
1 #!/usr/bin/env python3
2 """Enqueue mpack.index jobs for all mpacks with unindexed byte ranges.
3
4 Processes one mpack at a time, waiting for the worker to complete each job
5 before moving to the next. This lets you verify each batch is working before
6 proceeding.
7
8 Usage:
9 docker exec musehub python3 /app/deploy/enqueue_mpack_index_backfill.py
10 docker exec musehub python3 /app/deploy/enqueue_mpack_index_backfill.py --dry-run
11 docker exec musehub python3 /app/deploy/enqueue_mpack_index_backfill.py --poll-interval 10
12
13 Options:
14 --dry-run Print what would be enqueued without creating jobs.
15 --poll-interval Seconds between job completion polls (default: 5).
16 --limit Max mpacks to process (default: unlimited).
17 """
18 from __future__ import annotations
19
20 import argparse
21 import asyncio
22 import logging
23 import sys
24 import time
25
# Plain "LEVEL message" lines: this script is run interactively via `docker exec`.
_LOG_FORMAT = "%(levelname)s %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
log = logging.getLogger(__name__)
28
29
async def _get_unindexed_mpacks(engine, repo_map: dict) -> list[tuple[str, str]]:
    """Return ``[(mpack_id, repo_id)]`` for every mpack with an unindexed object.

    An mpack qualifies when at least one of its ``entity_type = 'object'`` rows
    in ``musehub_mpack_index`` has ``byte_offset IS NULL``.

    ``repo_id`` is resolved through ``musehub_object_refs``:

    1. a repo referencing one of the mpack's *unindexed* objects is preferred;
    2. otherwise, any repo referencing *any* object in the mpack is used;
    3. if no object in the mpack has a (non-null) ref, ``repo_id`` is ``''``.

    Ties are broken by the smallest ``repo_id`` so the result is deterministic.
    Rows are ordered by ``mpack_id``.

    Args:
        engine: SQLAlchemy async engine.
        repo_map: Currently unused; kept so existing call sites keep working.

    Returns:
        A list of ``(mpack_id, repo_id)`` tuples; ``repo_id`` is ``''`` when
        no repo could be resolved.
    """
    from sqlalchemy import text

    # The original query joined refs only for the *unindexed* rows, so an mpack
    # whose unindexed objects lacked refs came back with repo_id='' even when
    # its other objects were referenced. The correlated subquery below looks at
    # every object of the mpack, ranking refs of unindexed objects first.
    query = text("""
        SELECT pending.mpack_id,
               COALESCE((
                   SELECT mor.repo_id
                   FROM musehub_mpack_index mi
                   JOIN musehub_object_refs mor ON mor.object_id = mi.entity_id
                   WHERE mi.mpack_id = pending.mpack_id
                     AND mi.entity_type = 'object'
                     AND mor.repo_id IS NOT NULL
                   ORDER BY (mi.byte_offset IS NULL) DESC, mor.repo_id
                   LIMIT 1
               ), '') AS repo_id
        FROM (
            SELECT DISTINCT mpack_id
            FROM musehub_mpack_index
            WHERE entity_type = 'object'
              AND byte_offset IS NULL
        ) AS pending
        ORDER BY pending.mpack_id
    """)
    async with engine.connect() as conn:
        rows = (await conn.execute(query)).fetchall()
    return [(r[0], r[1]) for r in rows]
47
48
async def _enqueue_one(engine, mpack_id: str, repo_id: str) -> str:
    """Insert one ``pending`` ``mpack.index`` row into ``musehub_background_jobs``.

    The job id comes from ``compute_job_id(repo_id, "mpack.index", seed)``.
    Here ``seed`` is the mpack id immediately followed by the current UTC
    timestamp in ISO format; presumably this makes ids distinct per call
    (verify against ``compute_job_id``). The payload is
    ``{"mpack_key": mpack_id}``. An id collision is silently ignored
    (``ON CONFLICT DO NOTHING``). The computed id is returned in either case.
    """
    import json
    from datetime import datetime, timezone

    from sqlalchemy import text

    from musehub.core.genesis import compute_job_id

    created_at = datetime.now(tz=timezone.utc)
    seed = f"{mpack_id}{created_at.isoformat()}"
    new_job_id = compute_job_id(repo_id, "mpack.index", seed)

    insert_job = text("""
        INSERT INTO musehub_background_jobs
            (job_id, repo_id, job_type, payload, status, created_at, attempt)
        VALUES (:job_id, :repo_id, 'mpack.index',
                cast(:payload as jsonb), 'pending', :now, 0)
        ON CONFLICT DO NOTHING
    """)
    params = {
        "job_id": new_job_id,
        "repo_id": repo_id,
        "payload": json.dumps({"mpack_key": mpack_id}),
        "now": created_at,
    }

    # engine.begin() commits the insert when the block exits without error.
    async with engine.begin() as conn:
        await conn.execute(insert_job, params)
    return new_job_id
72
73
async def _wait_for_job(
    engine,
    job_id: str,
    poll_interval: int,
    timeout: float | None = None,
) -> str:
    """Poll ``musehub_background_jobs`` until *job_id* reaches a terminal status.

    Args:
        engine: SQLAlchemy async engine used for each poll.
        job_id: Id of the job to watch.
        poll_interval: Seconds to sleep between polls.
        timeout: Optional upper bound, in seconds, on the total wait. ``None``
            (the default) waits indefinitely, matching the previous behavior.

    Returns:
        ``"done"``, ``"failed"`` or ``"quarantined"`` once the job finishes;
        ``"missing"`` if no row exists for *job_id*; ``"timeout"`` if
        *timeout* elapsed while the job was still in a non-terminal state.
    """
    from sqlalchemy import text

    query = text("SELECT status, error FROM musehub_background_jobs WHERE job_id = :jid")
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        # Check the connection back into the pool before sleeping.
        async with engine.connect() as conn:
            row = (await conn.execute(query, {"jid": job_id})).fetchone()
        if row is None:
            return "missing"

        status = row[0]
        if status in ("done", "failed", "quarantined"):
            if status == "failed":
                log.error("  ❌ job %s failed: %s", job_id[:16], (row[1] or "")[:120])
            return status

        if deadline is None:
            await asyncio.sleep(poll_interval)
            continue

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            # Without a bound, a dead worker or a job stuck in a non-terminal
            # state would block the backfill forever.
            log.error("  ⏱ job %s still %s after %ss — giving up", job_id[:16], status, timeout)
            return "timeout"
        await asyncio.sleep(min(poll_interval, remaining))
91
92
async def _byte_range_count(engine, mpack_id: str) -> tuple[int, int]:
    """Count an mpack's object entries and how many already carry a byte offset.

    Returns ``(total_objects, objects_with_byte_range)``. ``COUNT(byte_offset)``
    counts only rows whose offset is non-NULL. If no row comes back, returns
    ``(0, 0)``.
    """
    from sqlalchemy import text

    counts_sql = text("""
        SELECT COUNT(*), COUNT(byte_offset)
        FROM musehub_mpack_index
        WHERE mpack_id = :mid AND entity_type = 'object'
    """)
    async with engine.connect() as conn:
        result = await conn.execute(counts_sql, {"mid": mpack_id})
        counts = result.fetchone()

    if not counts:
        return (0, 0)
    total_objects, with_offset = counts[0], counts[1]
    return (total_objects, with_offset)
103
104
async def _job_error(engine, job_id: str) -> str | None:
    """Return the ``error`` column recorded for *job_id* (``None`` if unset or no row)."""
    from sqlalchemy import text

    async with engine.connect() as conn:
        result = await conn.execute(
            text("SELECT error FROM musehub_background_jobs WHERE job_id = :jid"),
            {"jid": job_id},
        )
        return result.scalar()


async def _backfill(engine, dry_run: bool, poll_interval: int, limit: int) -> int:
    """Scan for unindexed mpacks, then enqueue and await one job at a time.

    Returns the number of mpacks whose job ended in a non-``done`` state.
    Storage-missing mpacks and already-indexed mpacks count as skipped, not
    failed.
    """
    log.info("Scanning for mpacks with unindexed byte ranges...")
    mpacks = await _get_unindexed_mpacks(engine, {})
    log.info("Found %d mpacks to index", len(mpacks))

    # Apply --limit before the dry-run preview so the preview matches what a
    # real run would process. 0 (the CLI default) means "unlimited"; negative
    # values are ignored rather than slicing items off the end.
    if limit > 0:
        mpacks = mpacks[:limit]
        log.info("Processing %d mpacks (limited)", len(mpacks))

    if dry_run:
        preview = 20
        for mpack_id, repo_id in mpacks[:preview]:
            total, with_range = await _byte_range_count(engine, mpack_id)
            log.info(" DRY-RUN mpack=%s repo=%s objects=%d indexed=%d",
                     mpack_id[:20], repo_id[:16], total, with_range)
        if len(mpacks) > preview:
            log.info(" ... and %d more", len(mpacks) - preview)
        return 0

    done = failed = skipped = 0
    count = len(mpacks)
    for i, (mpack_id, repo_id) in enumerate(mpacks, 1):
        # Re-check: the mpack may have been indexed since the initial scan.
        total, with_range = await _byte_range_count(engine, mpack_id)
        if with_range == total and total > 0:
            log.info("[%d/%d] SKIP mpack=%s all %d already indexed",
                     i, count, mpack_id[:20], total)
            skipped += 1
            continue

        if not repo_id:
            log.warning("[%d/%d] SKIP mpack=%s — no repo_id (orphaned, no object_refs)",
                        i, count, mpack_id[:20])
            skipped += 1
            continue

        log.info("[%d/%d] ENQUEUE mpack=%s repo=%s objects=%d/%d indexed",
                 i, count, mpack_id[:20], repo_id[:16], with_range, total)
        job_id = await _enqueue_one(engine, mpack_id, repo_id)
        log.info(" job_id=%s — waiting for worker...", job_id[:16])

        status = await _wait_for_job(engine, job_id, poll_interval)
        total2, with_range2 = await _byte_range_count(engine, mpack_id)

        if status == "done":
            log.info(" ✅ done byte_ranges=%d/%d", with_range2, total2)
            done += 1
            continue

        # A failure because the mpack is gone from storage is expected for
        # old/GCed mpacks, so it is reported as a skip rather than a failure.
        error = await _job_error(engine, job_id) if status == "failed" else None
        if error and "not found in storage" in error:
            log.warning(" ⚠️ mpack gone from storage (GCed?) — skipping byte_ranges=%d/%d",
                        with_range2, total2)
            skipped += 1
        else:
            log.error(" ❌ status=%s byte_ranges=%d/%d", status, with_range2, total2)
            failed += 1

    log.info("=== BACKFILL COMPLETE ===")
    log.info("done=%d failed=%d skipped=%d total=%d", done, failed, skipped, count)
    return failed


async def main(dry_run: bool, poll_interval: int, limit: int) -> None:
    """Run the mpack.index backfill against ``settings.database_url``.

    Args:
        dry_run: Only log what would be enqueued; create no jobs.
        poll_interval: Seconds between job-status polls.
        limit: Maximum number of mpacks to process; ``0`` means unlimited.

    Raises:
        SystemExit: with status 1 if any job failed.
    """
    from sqlalchemy.ext.asyncio import create_async_engine

    from musehub.config import settings

    engine = create_async_engine(settings.database_url, echo=False)
    try:
        failed = await _backfill(engine, dry_run, poll_interval, limit)
    finally:
        # Release pooled connections on every path (normal, dry-run, error).
        await engine.dispose()
    if failed:
        sys.exit(1)
171
172 log.info("=== BACKFILL COMPLETE ===")
173 log.info("done=%d failed=%d skipped=%d total=%d", done, failed, skipped, len(mpacks))
174 if failed:
175 sys.exit(1)
176
177
178 if __name__ == "__main__":
179 parser = argparse.ArgumentParser(description=__doc__)
180 parser.add_argument("--dry-run", action="store_true")
181 parser.add_argument("--poll-interval", type=int, default=5, metavar="N")
182 parser.add_argument("--limit", type=int, default=0, metavar="N")
183 args = parser.parse_args()
184 asyncio.run(main(dry_run=args.dry_run, poll_interval=args.poll_interval, limit=args.limit))
File History 1 commit
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago