gabriel / musehub public
decompress_objects.py python
338 lines 12.7 KB
Raw
sha256:5601f81903b6c70ddd11bd88a5a257ee6dfd38aa3b85b19746c100c030657f1e chore: update smoke_muse.sh comment to reference rc9 Sonnet 4.6 minor ⚠ breaking 21 days ago
1 #!/usr/bin/env python3
2 """One-time backfill: decompress zlib-stored objects in R2 and replace with plain bytes.
3
4 Objects pushed via the old wire path were stored zlib-compressed in R2 under the
5 SHA-256 of their *plain* content. This violates content-addressing: the declared
6 identity (SHA-256 of plain bytes) does not match the stored bytes (compressed).
7
8 This script corrects all such objects:
9
10 1. Pages through musehub_objects rows where storage_uri starts with "s3://".
11 2. Probes the first 2 bytes of each object in R2 with a Range GET.
12 3. Skips objects without a zlib header (already plain bytes); only those are fetched in full.
13 4. For zlib-compressed objects:
14 a. Decompresses.
15 b. Verifies SHA-256(decompressed) == object_id. Skips on mismatch.
16 c. Re-uploads plain bytes to R2 (same key — idempotent).
17 d. Updates size_bytes in DB (content_cache stays NULL).
18 5. Reports totals and any errors.
19
20 After a successful run, decompress_if_needed() is no longer needed on the read
21 path — all objects in R2 are guaranteed to be plain bytes.
22
23 Run inside Docker on the target instance:
24
25 docker exec musehub-blue python3 /app/deploy/decompress_objects.py [--dry-run] [--batch 200] [--concurrency 16]
26
27 Options:
28 --dry-run Print what would be changed without touching R2 or the DB.
29 --batch N DB page size (default 200).
30 --concurrency N Parallel R2 fetches per batch (default 16).
31 --repo-id UUID Limit to a single repo (targeted fix).
32 """
33 from __future__ import annotations
34
35 import argparse
36 import asyncio
37 import sys
38 import time
39 import zlib
40
41 import sqlalchemy as sa
42 from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
43 from sqlalchemy.orm import sessionmaker
44
45 from muse.core.types import blob_id, split_id
46 from musehub.config import settings
47 from musehub.db.musehub_repo_models import MusehubObject, MusehubObjectRef
48 from musehub.storage import get_backend
49
50 _ZLIB_MAGIC = (b"\x78\x01", b"\x78\x9c", b"\x78\xda", b"\x78\x5e")
51
52
53 def _is_zlib(data: bytes) -> bool:
54 return len(data) >= 2 and data[:2] in _ZLIB_MAGIC
55
56
57 def _decompress(data: bytes) -> bytes | None:
58 try:
59 return zlib.decompress(data)
60 except zlib.error:
61 return None
62
63
64 def _fmt_eta(seconds: float) -> str:
65 if seconds < 60:
66 return f"{seconds:.0f}s"
67 if seconds < 3600:
68 return f"{seconds / 60:.1f}m"
69 return f"{seconds / 3600:.1f}h"
70
71
72 async def _get_header(backend: object, object_id: str) -> bytes | None:
73 """Return the first 2 bytes of an object using a Range GET."""
74 client = backend._get_client() # type: ignore[attr-defined]
75 key = backend._key(object_id) # type: ignore[attr-defined]
76
77 def _range_get() -> bytes | None:
78 try:
79 resp = client.get_object(
80 Bucket=backend._bucket, Key=key, Range="bytes=0-1" # type: ignore[attr-defined]
81 )
82 return resp["Body"].read(2)
83 except Exception:
84 return None
85
86 return await asyncio.to_thread(_range_get)
87
88
89 async def backfill(dry_run: bool, quiet: bool, batch_size: int, concurrency: int, repo_id: str | None) -> int:
90 engine = create_async_engine(settings.database_url, echo=False)
91 async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
92 backend = get_backend()
93
94 # ── count total objects up front so we know the denominator ──────────────
95 async with async_session() as session:
96 count_stmt = (
97 sa.select(sa.func.count()).select_from(MusehubObject)
98 .where(
99 MusehubObject.storage_uri.like("s3://%"),
100 MusehubObject.deleted_at.is_(None),
101 )
102 )
103 if repo_id:
104 count_stmt = (
105 sa.select(sa.func.count()).select_from(MusehubObject)
106 .join(MusehubObjectRef, MusehubObject.object_id == MusehubObjectRef.object_id)
107 .where(
108 MusehubObjectRef.repo_id == repo_id,
109 MusehubObject.storage_uri.like("s3://%"),
110 MusehubObject.deleted_at.is_(None),
111 )
112 )
113 total_objects: int = (await session.execute(count_stmt)).scalar_one()
114
115 scope = f"repo_id={repo_id}" if repo_id else "all repos"
116 print(f"Backfill scope: {scope}")
117 print(f"Total objects to scan: {total_objects:,}")
118 if total_objects == 0:
119 print("Nothing to do.")
120 return 0
121 print()
122
123 # ── shared progress state (updated inside asyncio tasks) ─────────────────
124 done_count = 0
125 plain_count = 0
126 decompressed_count = 0
127 hash_mismatch_count = 0
128 error_count = 0
129 start_time = time.monotonic()
130 progress_lock = asyncio.Lock()
131
132 def _progress_line(extra: str = "") -> None:
133 if quiet:
134 return
135 elapsed = time.monotonic() - start_time
136 rate = done_count / elapsed if elapsed > 0 else 0
137 remaining = total_objects - done_count
138 eta_str = _fmt_eta(remaining / rate) if rate > 0 else "?"
139 pct = 100 * done_count / total_objects if total_objects else 100
140 print(
141 f"\r [{done_count:>{len(str(total_objects))}}/{total_objects}]"
142 f" {pct:5.1f}%"
143 f" {remaining:,} remaining"
144 f" {rate:.1f} obj/s"
145 f" ETA {eta_str}"
146 + (f" {extra}" if extra else ""),
147 end="",
148 flush=True,
149 )
150
151 total_checked = 0
152 total_errors = 0
153
154 async with async_session() as session:
155 offset = 0
156 while True:
157 obj_stmt = (
158 sa.select(MusehubObject.object_id)
159 .where(
160 MusehubObject.storage_uri.like("s3://%"),
161 MusehubObject.deleted_at.is_(None),
162 )
163 )
164 if repo_id:
165 obj_stmt = (
166 sa.select(MusehubObject.object_id)
167 .join(MusehubObjectRef, MusehubObject.object_id == MusehubObjectRef.object_id)
168 .where(
169 MusehubObjectRef.repo_id == repo_id,
170 MusehubObject.storage_uri.like("s3://%"),
171 MusehubObject.deleted_at.is_(None),
172 )
173 )
174
175 rows = (await session.execute(
176 obj_stmt
177 .order_by(MusehubObject.object_id)
178 .offset(offset)
179 .limit(batch_size)
180 )).scalars().all()
181
182 if not rows:
183 break
184
185 offset += len(rows)
186 total_checked += len(rows)
187 sem = asyncio.Semaphore(concurrency)
188
189 async def _process(oid: str) -> tuple[str, str, int]:
190 nonlocal done_count, plain_count, decompressed_count
191 nonlocal hash_mismatch_count, error_count
192
193 async with sem:
194 status = "plain"
195 new_size = 0
196 detail = ""
197
198 try:
199 header = await _get_header(backend, oid)
200 except Exception as exc:
201 print(f"\n ERROR fetching header {oid}: {exc}", file=sys.stderr)
202 async with progress_lock:
203 done_count += 1
204 error_count += 1
205 _progress_line()
206 return oid, "error", 0
207
208 if header is None or not _is_zlib(header):
209 async with progress_lock:
210 done_count += 1
211 plain_count += 1
212 _progress_line()
213 return oid, "plain", 0
214
215 # Has zlib header — fetch full object.
216 try:
217 data = await backend.get(oid)
218 except Exception as exc:
219 print(f"\n ERROR fetching {oid}: {exc}", file=sys.stderr)
220 async with progress_lock:
221 done_count += 1
222 error_count += 1
223 _progress_line()
224 return oid, "error", 0
225
226 if data is None:
227 async with progress_lock:
228 done_count += 1
229 plain_count += 1
230 _progress_line()
231 return oid, "plain", 0
232
233 decompressed = _decompress(data)
234 if decompressed is None:
235 detail = f"zlib header but decompress failed — skipping"
236 async with progress_lock:
237 done_count += 1
238 error_count += 1
239 _progress_line(f"WARN {oid} {detail}")
240 return oid, "error", 0
241
242 _, bare_hex = split_id(oid)
243
244 if blob_id(decompressed) != oid:
245 _, actual = split_id(blob_id(decompressed))
246 detail = f"hash mismatch (declared={bare_hex[:12]}… actual={actual[:12]}…)"
247 async with progress_lock:
248 done_count += 1
249 hash_mismatch_count += 1
250 _progress_line(f"WARN {oid} {detail}")
251 return oid, "hash_mismatch", 0
252
253 new_size = len(decompressed)
254 verb = "[dry] decompress" if dry_run else "decompress"
255 async with progress_lock:
256 done_count += 1
257 decompressed_count += 1
258 _progress_line(f"{verb} {oid} ({len(data)} → {new_size} bytes)")
259
260 if dry_run:
261 return oid, "decompressed", new_size
262
263 try:
264 await backend.put(oid, decompressed)
265 except Exception as exc:
266 print(f"\n ERROR re-uploading {oid}: {exc}", file=sys.stderr)
267 async with progress_lock:
268 error_count += 1
269 return oid, "error", 0
270
271 return oid, "decompressed", new_size
272
273 r2_results = await asyncio.gather(*(_process(oid) for oid in rows))
274
275 # DB updates for successfully decompressed objects.
276 if not dry_run:
277 for oid, status, new_size in r2_results:
278 if status != "decompressed":
279 continue
280 try:
281 await session.execute(
282 sa.update(MusehubObject)
283 .where(MusehubObject.object_id == oid)
284 .values(size_bytes=new_size)
285 )
286 await session.commit()
287 except Exception as exc:
288 print(f"\n ERROR updating DB for {oid}: {exc}", file=sys.stderr)
289 await session.rollback()
290 async with progress_lock:
291 error_count += 1
292
293 # Final newline after the inline progress line.
294 if not quiet:
295 print()
296
297 elapsed = time.monotonic() - start_time
298 prefix = "[dry-run] " if dry_run else ""
299 print(
300 f"\n{prefix}Backfill complete ({elapsed:.1f}s):\n"
301 f" {total_checked:6,} objects checked\n"
302 f" {plain_count:6,} already plain (skipped)\n"
303 f" {decompressed_count:6,} decompressed and re-uploaded\n"
304 f" {hash_mismatch_count:6,} skipped (hash mismatch after decompress)\n"
305 f" {error_count:6,} errors"
306 )
307 return error_count
308
309
310 def main() -> None:
311 parser = argparse.ArgumentParser(
312 description=__doc__,
313 formatter_class=argparse.RawDescriptionHelpFormatter,
314 )
315 parser.add_argument("--dry-run", action="store_true",
316 help="Print what would change without touching R2 or the DB")
317 parser.add_argument("--quiet", action="store_true",
318 help="Suppress per-object progress; only print final summary")
319 parser.add_argument("--batch", type=int, default=200, metavar="N",
320 help="DB page size (default 200)")
321 parser.add_argument("--concurrency", type=int, default=16, metavar="N",
322 help="Parallel R2 fetches per batch (default 16)")
323 parser.add_argument("--repo-id", default=None, metavar="UUID",
324 help="Limit to a single repo_id (for targeted testing)")
325 args = parser.parse_args()
326
327 errors = asyncio.run(backfill(
328 dry_run=args.dry_run,
329 quiet=args.quiet,
330 batch_size=args.batch,
331 concurrency=args.concurrency,
332 repo_id=args.repo_id,
333 ))
334 sys.exit(1 if errors else 0)
335
336
337 if __name__ == "__main__":
338 main()
File History 2 commits
sha256:5601f81903b6c70ddd11bd88a5a257ee6dfd38aa3b85b19746c100c030657f1e chore: update smoke_muse.sh comment to reference rc9 Sonnet 4.6 minor 21 days ago
sha256:39e9c4e6f2134da0732e6983268a218178973936f8d7ca03c91f2b5ad42133c8 fix: use read_object_bytes in blob viewer; add zstd magic d… Sonnet 4.6 patch 21 days ago