repair_objects.py
python
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32
fix: fall back to DB ancestry check when mpack-only fast-fo…
Sonnet 4.6
patch
6 days ago
| 1 | #!/usr/bin/env python3 |
| 2 | """Repair orphaned musehub_objects DB records whose bytes are absent from storage. |
| 3 | |
| 4 | Run this when DB and storage have diverged (e.g. after a volume wipe or a failed |
| 5 | migration). The script: |
| 6 | |
| 7 | 1. Reads every non-deleted object_id from musehub_objects. |
| 8 | 2. Checks each one against the configured storage backend in parallel. |
| 9 | 3. Deletes DB rows whose bytes are missing from storage. |
| 10 | |
| 11 | After a successful run, a normal ``muse push`` from any client that holds the |
| 12 | missing objects will re-upload them — the filter endpoint will correctly report |
| 13 | them as missing because their DB records are gone. |
| 14 | |
| 15 | Run inside Docker on the target environment: |
| 16 | |
| 17 | docker exec musehub-green python3 /app/deploy/repair_objects.py [--dry-run] [--batch 500] |
| 18 | |
| 19 | Options: |
| 20 | --dry-run Print what would be deleted without touching the DB. |
| 21 | --batch N Process N objects per DB page (default 500). |
| 22 | --concurrency N Parallel storage checks per batch (default 64). |
| 23 | """ |
| 24 | from __future__ import annotations |
| 25 | |
| 26 | import argparse |
| 27 | import asyncio |
| 28 | import sys |
| 29 | |
| 30 | import sqlalchemy as sa |
| 31 | from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession |
| 32 | from sqlalchemy.orm import sessionmaker |
| 33 | |
| 34 | from musehub.config import settings |
| 35 | from musehub.db.musehub_repo_models import MusehubObject, MusehubObjectRef, MusehubRepo |
| 36 | from musehub.storage import get_backend |
| 37 | |
| 38 | |
async def repair(dry_run: bool, batch_size: int, concurrency: int) -> int:
    """Delete musehub_objects rows whose bytes are absent from storage.

    Live (``deleted_at IS NULL``) object ids are read in ``object_id`` order,
    ``batch_size`` at a time. Each id is checked with the storage backend's
    ``exists``, with at most ``concurrency`` checks in flight. Unless
    ``dry_run`` is set, the ids reported missing are deleted and committed
    batch by batch.

    Pagination is keyset-based (``object_id > last id seen``), not
    OFFSET-based. Deleting orphaned rows shifts every later row down, so an
    OFFSET advanced by the page length would silently skip as many rows as
    were just deleted.

    Args:
        dry_run: Report orphaned rows without deleting them.
        batch_size: Object ids fetched per DB page; must be >= 1.
        concurrency: Maximum concurrent storage checks; must be >= 1.

    Returns:
        The error count. This is the number of objects whose storage check
        raised (these are treated as present and never deleted), plus the
        number of objects in batches whose delete failed. Zero means a clean
        run.

    Raises:
        ValueError: If ``batch_size`` or ``concurrency`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    if concurrency < 1:
        # asyncio.Semaphore(0) would make every storage check wait forever.
        raise ValueError(f"concurrency must be >= 1, got {concurrency}")

    engine = create_async_engine(settings.database_url, echo=False)
    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    backend = get_backend()
    sem = asyncio.Semaphore(concurrency)

    total_checked = 0
    total_missing = 0
    total_deleted = 0
    errors = 0

    async def _check(oid: str) -> bool | None:
        """Return whether *oid* exists in storage, or None if the check raised."""
        async with sem:
            try:
                return await backend.exists(oid)
            except Exception as exc:
                print(f" ERROR checking {oid}: {exc}", file=sys.stderr)
                return None

    try:
        async with async_session() as session:
            last_oid: str | None = None
            while True:
                conditions = [MusehubObject.deleted_at.is_(None)]
                if last_oid is not None:
                    conditions.append(MusehubObject.object_id > last_oid)
                # Only object_id is needed because the storage check is keyed
                # on it alone. Plain DISTINCT guards against duplicate ids
                # without the Postgres-only DISTINCT ON.
                query = (
                    sa.select(MusehubObject.object_id)
                    .where(*conditions)
                    .distinct()
                    .order_by(MusehubObject.object_id)
                    .limit(batch_size)
                )
                oids = list((await session.execute(query)).scalars().all())
                if not oids:
                    break

                batch_start = total_checked
                last_oid = oids[-1]
                total_checked += len(oids)

                found = await asyncio.gather(*(_check(oid) for oid in oids))
                # A failed check (None) counts as an error but is never
                # treated as missing, so it cannot cause a false deletion.
                errors += sum(1 for exists in found if exists is None)
                orphaned = [
                    oid
                    for oid, exists in zip(oids, found)
                    if exists is not None and not exists
                ]
                total_missing += len(orphaned)

                print(f" batch offset={batch_start}: {len(oids)} checked, {len(orphaned)} orphaned")
                if not orphaned:
                    continue
                for oid in orphaned:
                    print(f" orphaned: {oid}")

                if dry_run:
                    continue

                try:
                    await session.execute(
                        sa.delete(MusehubObject).where(MusehubObject.object_id.in_(orphaned))
                    )
                    await session.commit()
                except Exception as exc:
                    print(f" ERROR deleting batch: {exc}", file=sys.stderr)
                    await session.rollback()
                    errors += len(orphaned)
                else:
                    total_deleted += len(orphaned)
    finally:
        # Release pooled DB connections while the event loop is still running.
        await engine.dispose()

    prefix = "[dry-run] " if dry_run else ""
    print(
        f"\n{prefix}Repair complete: "
        f"{total_checked} checked, "
        f"{total_missing} orphaned, "
        f"{total_deleted} deleted, "
        f"{errors} errors"
    )
    if not dry_run and total_deleted > 0:
        print(
            f"\nNext step: muse push <remote> <branch> from any client that holds "
            f"the missing objects. The filter endpoint will now report all "
            f"{total_deleted} deleted objects as missing and the client will re-upload them."
        )
    return errors
| 137 | |
| 138 | |
| 139 | def main() -> None: |
| 140 | parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) |
| 141 | parser.add_argument("--dry-run", action="store_true", help="Print orphaned records without deleting") |
| 142 | parser.add_argument("--batch", type=int, default=500, metavar="N", help="DB page size (default 500)") |
| 143 | parser.add_argument("--concurrency", type=int, default=64, metavar="N", help="Parallel storage checks (default 64)") |
| 144 | args = parser.parse_args() |
| 145 | |
| 146 | errors = asyncio.run(repair(dry_run=args.dry_run, batch_size=args.batch, concurrency=args.concurrency)) |
| 147 | sys.exit(1 if errors else 0) |
| 148 | |
| 149 | |
| 150 | if __name__ == "__main__": |
| 151 | main() |
File History
1 commit
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32
fix: fall back to DB ancestry check when mpack-only fast-fo…
Sonnet 4.6
patch
6 days ago