decompress_objects.py
python
sha256:5601f81903b6c70ddd11bd88a5a257ee6dfd38aa3b85b19746c100c030657f1e
chore: update smoke_muse.sh comment to reference rc9
Sonnet 4.6
minor
⚠ breaking
21 days ago
| 1 | #!/usr/bin/env python3 |
| 2 | """One-time backfill: decompress zlib-stored objects in R2 and replace with plain bytes. |
| 3 | |
| 4 | Objects pushed via the old wire path were stored zlib-compressed in R2 under the |
| 5 | SHA-256 of their *plain* content. This violates content-addressing: the declared |
| 6 | identity (SHA-256 of plain bytes) does not match the stored bytes (compressed). |
| 7 | |
| 8 | This script corrects all such objects: |
| 9 | |
| 10 | 1. Pages through musehub_objects rows where storage_uri starts with "s3://". |
| 11 | 2. Fetches each object from R2. |
| 12 | 3. Skips objects that are already plain bytes. |
| 13 | 4. For zlib-compressed objects: |
| 14 | a. Decompresses. |
| 15 | b. Verifies SHA-256(decompressed) == object_id. Skips on mismatch. |
| 16 | c. Re-uploads plain bytes to R2 (same key — idempotent). |
| 17 | d. Updates size_bytes in DB (content_cache stays NULL). |
| 18 | 5. Reports totals and any errors. |
| 19 | |
| 20 | After a successful run, decompress_if_needed() is no longer needed on the read |
| 21 | path — all objects in R2 are guaranteed to be plain bytes. |
| 22 | |
| 23 | Run inside Docker on the target instance: |
| 24 | |
| 25 | docker exec musehub-blue python3 /app/deploy/decompress_objects.py [--dry-run] [--batch 200] [--concurrency 16] |
| 26 | |
| 27 | Options: |
| 28 | --dry-run Print what would be changed without touching R2 or the DB. |
| 29 | --batch N DB page size (default 200). |
| 30 | --concurrency N Parallel R2 fetches per batch (default 16). |
| 31 | --repo-id UUID Limit to a single repo (targeted fix). |
| 32 | """ |
| 33 | from __future__ import annotations |
| 34 | |
| 35 | import argparse |
| 36 | import asyncio |
| 37 | import sys |
| 38 | import time |
| 39 | import zlib |
| 40 | |
| 41 | import sqlalchemy as sa |
| 42 | from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine |
| 43 | from sqlalchemy.orm import sessionmaker |
| 44 | |
| 45 | from muse.core.types import blob_id, split_id |
| 46 | from musehub.config import settings |
| 47 | from musehub.db.musehub_repo_models import MusehubObject, MusehubObjectRef |
| 48 | from musehub.storage import get_backend |
| 49 | |
# Common two-byte zlib stream headers (RFC 1950): CMF byte 0x78 (deflate,
# 32 KiB window) followed by each of the usual FLG bytes.
_ZLIB_MAGIC = (
    b"\x78\x01",
    b"\x78\x9c",
    b"\x78\xda",
    b"\x78\x5e",
)


def _is_zlib(data: bytes) -> bool:
    """Report whether *data* starts with one of the recognised zlib headers.

    Only the first two bytes are inspected; anything shorter than two bytes
    is never treated as zlib.
    """
    if len(data) < 2:
        return False
    return data[:2] in _ZLIB_MAGIC
| 55 | |
| 56 | |
| 57 | def _decompress(data: bytes) -> bytes | None: |
| 58 | try: |
| 59 | return zlib.decompress(data) |
| 60 | except zlib.error: |
| 61 | return None |
| 62 | |
| 63 | |
def _fmt_eta(seconds: float) -> str:
    """Format a duration compactly for the progress line.

    Under a minute it is shown as whole seconds ("42s"); under an hour as
    minutes with one decimal ("1.5m"); otherwise as hours ("2.0h").
    """
    minute, hour = 60, 3600
    if seconds < minute:
        return f"{seconds:.0f}s"
    scale, suffix = (minute, "m") if seconds < hour else (hour, "h")
    return f"{seconds / scale:.1f}{suffix}"
| 70 | |
| 71 | |
| 72 | async def _get_header(backend: object, object_id: str) -> bytes | None: |
| 73 | """Return the first 2 bytes of an object using a Range GET.""" |
| 74 | client = backend._get_client() # type: ignore[attr-defined] |
| 75 | key = backend._key(object_id) # type: ignore[attr-defined] |
| 76 | |
| 77 | def _range_get() -> bytes | None: |
| 78 | try: |
| 79 | resp = client.get_object( |
| 80 | Bucket=backend._bucket, Key=key, Range="bytes=0-1" # type: ignore[attr-defined] |
| 81 | ) |
| 82 | return resp["Body"].read(2) |
| 83 | except Exception: |
| 84 | return None |
| 85 | |
| 86 | return await asyncio.to_thread(_range_get) |
| 87 | |
| 88 | |
def _object_filters(repo_id: str | None) -> list:
    """Return WHERE clauses selecting live, R2-backed objects, optionally one repo's.

    The repo restriction is a semi-join (``object_id IN (SELECT ...)``)
    rather than a JOIN, so an object is returned once even if the repo
    references it more than once.
    """
    filters = [
        MusehubObject.storage_uri.like("s3://%"),
        MusehubObject.deleted_at.is_(None),
    ]
    if repo_id:
        repo_objects = sa.select(MusehubObjectRef.object_id).where(
            MusehubObjectRef.repo_id == repo_id
        )
        filters.append(MusehubObject.object_id.in_(repo_objects))
    return filters


async def backfill(dry_run: bool, quiet: bool, batch_size: int, concurrency: int, repo_id: str | None) -> int:
    """Rewrite zlib-compressed objects in R2 as plain bytes and fix size_bytes.

    Args:
        dry_run: Report what would change without writing to R2 or the DB.
        quiet: Suppress the inline progress line. Errors and warnings are
            still printed to stderr, and the final summary is always printed.
        batch_size: Number of object ids read from the DB per page.
        concurrency: Maximum number of objects processed at once.
        repo_id: When given, only objects referenced by this repo are scanned.

    Returns:
        The number of failures: fetch, decompress, re-upload or DB-update
        errors. Hash mismatches are reported separately and do not count.
    """
    engine = create_async_engine(settings.database_url, echo=False)
    try:
        async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
        backend = get_backend()
        filters = _object_filters(repo_id)

        # ── count total objects up front so we know the denominator ──────────
        async with async_session() as session:
            count_stmt = sa.select(sa.func.count()).select_from(MusehubObject).where(*filters)
            total_objects: int = (await session.execute(count_stmt)).scalar_one()

        scope = f"repo_id={repo_id}" if repo_id else "all repos"
        print(f"Backfill scope: {scope}")
        print(f"Total objects to scan: {total_objects:,}")
        if total_objects == 0:
            print("Nothing to do.")
            return 0
        print()

        # Counters are only mutated by coroutines on this event loop (never by
        # the Range-GET worker threads) and never across an await, so no lock
        # is needed.
        counts = {"plain": 0, "decompressed": 0, "hash_mismatch": 0, "error": 0}
        done_count = 0
        start_time = time.monotonic()
        width = len(str(total_objects))

        def _progress_line(extra: str = "") -> None:
            if quiet:
                return
            elapsed = time.monotonic() - start_time
            rate = done_count / elapsed if elapsed > 0 else 0
            remaining = max(total_objects - done_count, 0)
            eta_str = _fmt_eta(remaining / rate) if rate > 0 else "?"
            pct = 100 * done_count / total_objects
            print(
                f"\r [{done_count:>{width}}/{total_objects}]"
                f" {pct:5.1f}%"
                f" {remaining:,} remaining"
                f" {rate:.1f} obj/s"
                f" ETA {eta_str}"
                + (f" {extra}" if extra else ""),
                end="",
                flush=True,
            )

        def _warn(message: str) -> None:
            # Own line on stderr: a message inside the "\r" progress line
            # would be overwritten by the very next update.
            print(f"\n {message}", file=sys.stderr)

        def _finish(status: str, extra: str = "") -> None:
            # Record the final outcome of one object (exactly once per object).
            nonlocal done_count
            done_count += 1
            counts[status] += 1
            _progress_line(extra)

        sem = asyncio.Semaphore(concurrency)

        async def _process(oid: str) -> tuple[str, str, int]:
            """Inspect one object; unless dry-run, re-upload it as plain bytes.

            Returns (oid, status, new_size); status is one of "plain",
            "decompressed", "hash_mismatch" or "error".
            """
            async with sem:
                try:
                    header = await _get_header(backend, oid)
                except Exception as exc:
                    _warn(f"ERROR fetching header {oid}: {exc}")
                    _finish("error")
                    return oid, "error", 0

                if header is None or not _is_zlib(header):
                    _finish("plain")
                    return oid, "plain", 0

                # Has zlib header — fetch full object.
                try:
                    data = await backend.get(oid)
                except Exception as exc:
                    _warn(f"ERROR fetching {oid}: {exc}")
                    _finish("error")
                    return oid, "error", 0

                if data is None:
                    _finish("plain")
                    return oid, "plain", 0

                decompressed = _decompress(data)
                if decompressed is None:
                    _warn(f"WARN {oid} zlib header but decompress failed — skipping")
                    _finish("error")
                    return oid, "error", 0

                actual_id = blob_id(decompressed)
                if actual_id != oid:
                    _, declared_hex = split_id(oid)
                    _, actual_hex = split_id(actual_id)
                    _warn(
                        f"WARN {oid} hash mismatch "
                        f"(declared={declared_hex[:12]}… actual={actual_hex[:12]}…)"
                    )
                    _finish("hash_mismatch")
                    return oid, "hash_mismatch", 0

                new_size = len(decompressed)
                if not dry_run:
                    try:
                        await backend.put(oid, decompressed)
                    except Exception as exc:
                        # Count only as an error — nothing was re-uploaded.
                        _warn(f"ERROR re-uploading {oid}: {exc}")
                        _finish("error")
                        return oid, "error", 0

                verb = "[dry] decompress" if dry_run else "decompress"
                _finish("decompressed", f"{verb} {oid} ({len(data)} → {new_size} bytes)")
                return oid, "decompressed", new_size

        total_checked = 0
        last_oid = None
        async with async_session() as session:
            while True:
                page_stmt = sa.select(MusehubObject.object_id).where(*filters)
                if last_oid is not None:
                    # Keyset pagination: each page costs O(batch) instead of
                    # OFFSET's O(rows already scanned).
                    page_stmt = page_stmt.where(MusehubObject.object_id > last_oid)
                rows = (await session.execute(
                    page_stmt.order_by(MusehubObject.object_id).limit(batch_size)
                )).scalars().all()

                if not rows:
                    break

                last_oid = rows[-1]
                total_checked += len(rows)

                r2_results = await asyncio.gather(*(_process(oid) for oid in rows))

                if dry_run:
                    continue

                # DB updates for successfully re-uploaded objects.
                for oid, status, new_size in r2_results:
                    if status != "decompressed":
                        continue
                    try:
                        await session.execute(
                            sa.update(MusehubObject)
                            .where(MusehubObject.object_id == oid)
                            .values(size_bytes=new_size)
                        )
                        await session.commit()
                    except Exception as exc:
                        # R2 already holds the plain bytes; only size_bytes is
                        # stale, so this is an extra error on top of the
                        # "decompressed" count.
                        _warn(f"ERROR updating DB for {oid}: {exc}")
                        await session.rollback()
                        counts["error"] += 1

        # Final newline after the inline progress line.
        if not quiet:
            print()

        elapsed = time.monotonic() - start_time
        prefix = "[dry-run] " if dry_run else ""
        decompressed_label = "would be decompressed" if dry_run else "decompressed and re-uploaded"
        print(
            f"\n{prefix}Backfill complete ({elapsed:.1f}s):\n"
            f" {total_checked:6,} objects checked\n"
            f" {counts['plain']:6,} already plain (skipped)\n"
            f" {counts['decompressed']:6,} {decompressed_label}\n"
            f" {counts['hash_mismatch']:6,} skipped (hash mismatch after decompress)\n"
            f" {counts['error']:6,} errors"
        )
        return counts["error"]
    finally:
        # Close pooled DB connections before asyncio.run() tears down the loop.
        await engine.dispose()
| 308 | |
| 309 | |
def _positive_int(text: str) -> int:
    """Convert an argparse option value to an int, accepting only values >= 1.

    ``--concurrency 0`` would otherwise build ``asyncio.Semaphore(0)`` and hang
    forever. ``--batch 0`` would fetch an empty first page and report a
    "complete" run without scanning anything.
    """
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def main() -> None:
    """Parse command-line options, run the backfill, and exit 1 if any errors occurred."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--dry-run", action="store_true",
                        help="Print what would change without touching R2 or the DB")
    parser.add_argument("--quiet", action="store_true",
                        help="Suppress per-object progress; only print final summary")
    parser.add_argument("--batch", type=_positive_int, default=200, metavar="N",
                        help="DB page size (default 200)")
    parser.add_argument("--concurrency", type=_positive_int, default=16, metavar="N",
                        help="Parallel R2 fetches per batch (default 16)")
    parser.add_argument("--repo-id", default=None, metavar="UUID",
                        help="Limit to a single repo_id (for targeted testing)")
    args = parser.parse_args()

    errors = asyncio.run(backfill(
        dry_run=args.dry_run,
        quiet=args.quiet,
        batch_size=args.batch,
        concurrency=args.concurrency,
        repo_id=args.repo_id,
    ))
    # Non-zero exit so a partially failed run is visible to whatever invoked it.
    sys.exit(1 if errors else 0)
| 335 | |
| 336 | |
# Run the backfill only when executed as a script, never on import.
if __name__ == "__main__":
    main()
File History
2 commits
sha256:5601f81903b6c70ddd11bd88a5a257ee6dfd38aa3b85b19746c100c030657f1e
chore: update smoke_muse.sh comment to reference rc9
Sonnet 4.6
minor
⚠
21 days ago
sha256:39e9c4e6f2134da0732e6983268a218178973936f8d7ca03c91f2b5ad42133c8
fix: use read_object_bytes in blob viewer; add zstd magic d…
Sonnet 4.6
patch
21 days ago