#!/usr/bin/env python3 """One-time migration: normalize R2 object keys to canonical algo:hex format. Three key formats exist in R2 from different code eras: objects/ old — no algorithm prefix, ambiguous objects/sha256_ bad — underscore substitution, non-standard objects/sha256: correct — canonical algo:hex convention This script copies every object under the old formats to the canonical key, verifies the content hash, then deletes the old key. Run inside Docker on the target instance: docker exec musehub-blue python3 /app/deploy/migrate_r2_keys.py [--dry-run] [--concurrency 32] """ from __future__ import annotations import argparse import asyncio import hashlib import re import sys import time import boto3 from musehub.config import settings _HEX64 = re.compile(r'^[0-9a-f]{64}$') _OLD_BARE = re.compile(r'^objects/([0-9a-f]{64})$') _OLD_UNDER = re.compile(r'^objects/sha256_([0-9a-f]{64})$') _CANONICAL = re.compile(r'^objects/sha256:[0-9a-f]{64}$') def _canonical_key(hex64: str) -> str: return f"objects/sha256:{hex64}" def _get_client(): return boto3.client( "s3", endpoint_url=settings.blob_storage_endpoint, aws_access_key_id=settings.blob_storage_access_key_id, aws_secret_access_key=settings.blob_storage_secret_access_key, region_name=settings.blob_storage_region or "auto", ) async def migrate(dry_run: bool, concurrency: int) -> None: client = _get_client() bucket = settings.blob_storage_bucket # Collect all non-canonical keys. to_migrate: list[tuple[str, str]] = [] # (old_key, canonical_key) paginator = client.get_paginator("list_objects_v2") total_scanned = 0 for page in paginator.paginate(Bucket=bucket, Prefix="objects/"): for obj in page.get("Contents", []): key = obj["Key"] total_scanned += 1 m = _OLD_BARE.match(key) or _OLD_UNDER.match(key) if m: to_migrate.append((key, _canonical_key(m.group(1)))) print(f"Scanned {total_scanned:,} objects") print(f"Found {len(to_migrate):,} keys to migrate") if not to_migrate: print("Nothing to do.") return sem = asyncio.Semaphore(concurrency) ok = errors = skipped = 0 start = time.monotonic() async def _migrate_one(old_key: str, new_key: str) -> str: nonlocal ok, errors, skipped async with sem: hex64 = new_key.split("sha256:")[1] def _copy_and_delete() -> str: # Fetch — a 404 means a previous run already migrated and deleted this key try: data = client.get_object(Bucket=bucket, Key=old_key)["Body"].read() except client.exceptions.NoSuchKey: return f"SKIP already migrated {old_key}" except Exception as exc: try: client.head_object(Bucket=bucket, Key=old_key) except Exception: return f"SKIP already migrated {old_key}" return f"ERROR fetch {old_key}: {exc}" # Verify actual = hashlib.sha256(data).hexdigest() if actual != hex64: return f"SKIP hash mismatch {old_key}: declared={hex64[:12]}… actual={actual[:12]}…" if dry_run: return f"[dry] would copy {old_key} → {new_key} ({len(data)} bytes)" # Write canonical key try: client.put_object(Bucket=bucket, Key=new_key, Body=data) except Exception as exc: return f"ERROR put {new_key}: {exc}" # Delete old key try: client.delete_object(Bucket=bucket, Key=old_key) except Exception as exc: return f"ERROR delete {old_key}: {exc}" return f"OK {old_key} → {new_key} ({len(data)} bytes)" result = await asyncio.to_thread(_copy_and_delete) if result.startswith("OK") or result.startswith("[dry]"): ok += 1 elif result.startswith("SKIP"): skipped += 1 else: errors += 1 done = ok + skipped + errors elapsed = time.monotonic() - start rate = done / elapsed if elapsed > 0 else 0 remaining = len(to_migrate) - done eta = f"{remaining / rate:.0f}s" if rate > 0 else "?" print( f"\r [{done:>{len(str(len(to_migrate)))}}/{len(to_migrate)}]" f" ok={ok} skip={skipped} err={errors}" f" {rate:.1f}/s ETA {eta} {result[:80]}", end="", flush=True, ) return result await asyncio.gather(*(_migrate_one(old, new) for old, new in to_migrate)) print() elapsed = time.monotonic() - start prefix = "[dry-run] " if dry_run else "" print( f"\n{prefix}Migration complete ({elapsed:.1f}s):\n" f" {ok:6,} migrated\n" f" {skipped:6,} skipped (hash mismatch)\n" f" {errors:6,} errors" ) if errors: sys.exit(1) def main() -> None: parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument("--dry-run", action="store_true") parser.add_argument("--concurrency", type=int, default=32, metavar="N") args = parser.parse_args() asyncio.run(migrate(dry_run=args.dry_run, concurrency=args.concurrency)) if __name__ == "__main__": main()