#!/usr/bin/env python3 """Repair orphaned musehub_objects DB records whose bytes are absent from storage. Run this when DB and storage have diverged (e.g. after a volume wipe or a failed migration). The script: 1. Reads all object_ids from musehub_objects. 2. Checks each one against the configured storage backend in parallel. 3. Deletes DB rows whose bytes are missing from storage. After a successful run, a normal ``muse push`` from any client that holds the missing objects will re-upload them — the filter endpoint will correctly report them as missing because their DB records are gone. Run inside Docker on the target environment: docker exec musehub-green python3 /app/deploy/repair_objects.py [--dry-run] [--batch 500] Options: --dry-run Print what would be deleted without touching the DB. --batch N Process N objects per DB page (default 500). --concurrency N Parallel storage checks per batch (default 64). """ from __future__ import annotations import argparse import asyncio import sys import sqlalchemy as sa from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker from musehub.config import settings from musehub.db.musehub_repo_models import MusehubObject, MusehubObjectRef, MusehubRepo from musehub.storage import get_backend async def repair(dry_run: bool, batch_size: int, concurrency: int) -> int: engine = create_async_engine(settings.database_url, echo=False) async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) total_checked = 0 total_missing = 0 total_deleted = 0 errors = 0 async with async_session() as session: # Page through all objects to avoid loading millions of rows at once. # DISTINCT ON (object_id) picks one row per object. offset = 0 while True: rows = (await session.execute( sa.select( MusehubObject.object_id, MusehubRepo.owner, MusehubRepo.slug, ) .select_from(MusehubObject) .outerjoin( MusehubObjectRef, MusehubObjectRef.object_id == MusehubObject.object_id, ) .outerjoin( MusehubRepo, MusehubRepo.repo_id == MusehubObjectRef.repo_id, ) .where(MusehubObject.deleted_at.is_(None)) .distinct(MusehubObject.object_id) .order_by(MusehubObject.object_id) .offset(offset) .limit(batch_size) )).all() if not rows: break offset += len(rows) total_checked += len(rows) # Check storage existence in parallel, bounded by concurrency. sem = asyncio.Semaphore(concurrency) async def _check(oid: str, owner: str | None, slug: str | None) -> tuple[str, bool]: async with sem: backend = get_backend() try: exists = await backend.exists(oid) except Exception as exc: print(f" ERROR checking {oid}: {exc}", file=sys.stderr) return oid, True # assume present to avoid false deletion return oid, exists results = await asyncio.gather(*(_check(oid, owner, slug) for oid, owner, slug in rows)) orphaned = [oid for oid, exists in results if not exists] total_missing += len(orphaned) if not orphaned: print(f" batch offset={offset - len(rows)}: {len(rows)} checked, 0 orphaned") continue print(f" batch offset={offset - len(rows)}: {len(rows)} checked, {len(orphaned)} orphaned") for oid in orphaned: print(f" orphaned: {oid}") if dry_run: continue try: await session.execute( sa.delete(MusehubObject).where( MusehubObject.object_id.in_(orphaned) ) ) await session.commit() total_deleted += len(orphaned) except Exception as exc: print(f" ERROR deleting batch: {exc}", file=sys.stderr) await session.rollback() errors += len(orphaned) prefix = "[dry-run] " if dry_run else "" print( f"\n{prefix}Repair complete: " f"{total_checked} checked, " f"{total_missing} orphaned, " f"{total_deleted} deleted, " f"{errors} errors" ) if not dry_run and total_deleted > 0: print( f"\nNext step: muse push from any client that holds " f"the missing objects. The filter endpoint will now report all " f"{total_deleted} deleted objects as missing and the client will re-upload them." ) return errors def main() -> None: parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument("--dry-run", action="store_true", help="Print orphaned records without deleting") parser.add_argument("--batch", type=int, default=500, metavar="N", help="DB page size (default 500)") parser.add_argument("--concurrency", type=int, default=64, metavar="N", help="Parallel storage checks (default 64)") args = parser.parse_args() errors = asyncio.run(repair(dry_run=args.dry_run, batch_size=args.batch, concurrency=args.concurrency)) sys.exit(1 if errors else 0) if __name__ == "__main__": main()