gabriel / musehub public
repair_objects.py python
151 lines 5.7 KB
Raw
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32 fix: fall back to DB ancestry check when mpack-only fast-fo… Sonnet 4.6 patch 6 days ago
1 #!/usr/bin/env python3
2 """Repair orphaned musehub_objects DB records whose bytes are absent from storage.
3
4 Run this when DB and storage have diverged (e.g. after a volume wipe or a failed
5 migration). The script:
6
7 1. Reads all object_ids from musehub_objects.
8 2. Checks each one against the configured storage backend in parallel.
9 3. Deletes DB rows whose bytes are missing from storage.
10
11 After a successful run, a normal ``muse push`` from any client that holds the
12 missing objects will re-upload them — the filter endpoint will correctly report
13 them as missing because their DB records are gone.
14
15 Run inside Docker on the target environment:
16
17 docker exec musehub-green python3 /app/deploy/repair_objects.py [--dry-run] [--batch 500] [--concurrency 64]
18
19 Options:
20 --dry-run Print what would be deleted without touching the DB.
21 --batch N Process N objects per DB page (default 500).
22 --concurrency N Parallel storage checks per batch (default 64).
23 """
24 from __future__ import annotations
25
26 import argparse
27 import asyncio
28 import sys
29
30 import sqlalchemy as sa
31 from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
32 from sqlalchemy.orm import sessionmaker
33
34 from musehub.config import settings
35 from musehub.db.musehub_repo_models import MusehubObject, MusehubObjectRef, MusehubRepo
36 from musehub.storage import get_backend
37
38
async def repair(dry_run: bool, batch_size: int, concurrency: int) -> int:
    """Delete ``musehub_objects`` rows whose bytes are absent from storage.

    Pages through every object id whose ``deleted_at`` is NULL, asks the
    storage backend whether each id exists, and hard-deletes the DB rows for
    ids the backend reports as missing.  A per-batch line and a final summary
    are printed to stdout; failures are printed to stderr.

    Args:
        dry_run: When true, report orphaned ids but never touch the DB.
        batch_size: Number of object ids fetched per DB page; must be >= 1.
        concurrency: Maximum simultaneous storage checks; must be >= 1.

    Returns:
        The number of objects that could not be processed: storage checks
        that raised, plus ids belonging to delete batches that failed.
        ``0`` means the run was clean.

    Raises:
        ValueError: If ``batch_size`` or ``concurrency`` is less than 1.
    """
    # Validate before acquiring any resource.  A zero-permit semaphore would
    # make every storage check wait forever.
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    if concurrency < 1:
        raise ValueError(f"concurrency must be >= 1, got {concurrency}")

    engine = create_async_engine(settings.database_url, echo=False)
    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    total_checked = 0
    total_missing = 0
    total_deleted = 0
    errors = 0

    try:
        # Loop-invariant: one backend handle and one semaphore serve all batches.
        backend = get_backend()
        sem = asyncio.Semaphore(concurrency)

        async def _check(oid: str) -> tuple[str, bool]:
            """Return ``(oid, exists)``; a failed check counts as an error."""
            nonlocal errors
            async with sem:
                try:
                    exists = await backend.exists(oid)
                except Exception as exc:
                    print(f" ERROR checking {oid}: {exc}", file=sys.stderr)
                    errors += 1
                    return oid, True  # assume present to avoid false deletion
                return oid, exists

        async with async_session() as session:
            # Keyset pagination: resume strictly after the last id seen.
            # OFFSET paging is wrong here because rows deleted from one page
            # shift the rest of the result set down, so the next OFFSET
            # silently skips as many unchecked objects as were just deleted.
            last_oid = None
            while True:
                page = sa.select(MusehubObject.object_id).where(
                    MusehubObject.deleted_at.is_(None)
                )
                if last_oid is not None:
                    page = page.where(MusehubObject.object_id > last_oid)
                page = (
                    page.distinct()
                    .order_by(MusehubObject.object_id)
                    .limit(batch_size)
                )
                oids = (await session.execute(page)).scalars().all()

                if not oids:
                    break

                # Number of objects scanned before this batch; shown as the
                # "offset" in the progress line.
                batch_start = total_checked
                total_checked += len(oids)
                last_oid = oids[-1]

                # Check storage existence in parallel, bounded by concurrency.
                results = await asyncio.gather(*(_check(oid) for oid in oids))
                orphaned = [oid for oid, exists in results if not exists]
                total_missing += len(orphaned)

                print(f" batch offset={batch_start}: {len(oids)} checked, {len(orphaned)} orphaned")
                for oid in orphaned:
                    print(f" orphaned: {oid}")

                if dry_run or not orphaned:
                    continue

                try:
                    await session.execute(
                        sa.delete(MusehubObject).where(
                            MusehubObject.object_id.in_(orphaned)
                        )
                    )
                    await session.commit()
                    total_deleted += len(orphaned)
                except Exception as exc:
                    # Best effort: report, roll back, and move on to the next page.
                    print(f" ERROR deleting batch: {exc}", file=sys.stderr)
                    await session.rollback()
                    errors += len(orphaned)
    finally:
        # Release pooled connections even when the scan aborts part-way.
        await engine.dispose()

    prefix = "[dry-run] " if dry_run else ""
    print(
        f"\n{prefix}Repair complete: "
        f"{total_checked} checked, "
        f"{total_missing} orphaned, "
        f"{total_deleted} deleted, "
        f"{errors} errors"
    )
    if not dry_run and total_deleted > 0:
        print(
            f"\nNext step: muse push <remote> <branch> from any client that holds "
            f"the missing objects. The filter endpoint will now report all "
            f"{total_deleted} deleted objects as missing and the client will re-upload them."
        )
    return errors
137
138
def _positive_int(raw: str) -> int:
    """argparse ``type=`` callable that accepts only integers >= 1."""
    try:
        value = int(raw)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {raw!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def main() -> None:
    """Parse command-line options, run the repair, and exit with its status.

    Exits with status 1 when the repair reports any errors and 0 otherwise.
    Invalid option values make argparse exit with status 2.
    """
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--dry-run", action="store_true", help="Print orphaned records without deleting")
    # Zero or negative values are rejected at parse time: a concurrency of 0
    # would leave every storage check waiting on the semaphore forever.
    parser.add_argument("--batch", type=_positive_int, default=500, metavar="N", help="DB page size (default 500)")
    parser.add_argument("--concurrency", type=_positive_int, default=64, metavar="N", help="Parallel storage checks (default 64)")
    args = parser.parse_args()

    errors = asyncio.run(repair(dry_run=args.dry_run, batch_size=args.batch, concurrency=args.concurrency))
    sys.exit(1 if errors else 0)


if __name__ == "__main__":
    main()
File History 1 commit
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32 fix: fall back to DB ancestry check when mpack-only fast-fo… Sonnet 4.6 patch 6 days ago