migrate_r2_keys.py
file-level
1
files
1
commits
0
hotspots
0
🧊 dead
0
💥 blast risk
| 1 | #!/usr/bin/env python3 |
| 2 | """One-time migration: normalize R2 object keys to canonical algo:hex format. |
| 3 | |
| 4 | Three key formats exist in R2 from different code eras: |
| 5 | |
| 6 | objects/<bare-hex> old — no algorithm prefix, ambiguous |
| 7 | objects/sha256_<hex> bad — underscore substitution, non-standard |
| 8 | objects/sha256:<hex> correct — canonical algo:hex convention |
| 9 | |
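| 10 | For every object under an old format, this script downloads it, verifies that its SHA-256 matches the hex in the key, |
| 11 | writes it to the canonical key, then deletes the old key. Objects whose content does not match are skipped and left in place. |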
| 12 | |
| 13 | Run inside Docker on the target instance: |
| 14 | |
| 15 | docker exec musehub-blue python3 /app/deploy/migrate_r2_keys.py [--dry-run] [--concurrency 32] |
| 16 | """ |
| 17 | from __future__ import annotations |
| 18 | |
| 19 | import argparse |
| 20 | import asyncio |
| 21 | import hashlib |
| 22 | import re |
| 23 | import sys |
| 24 | import time |
| 25 | |
| 26 | import boto3 |
| 27 | |
| 28 | from musehub.config import settings |
| 29 | |
# A bare lowercase SHA-256 hex digest (exactly 64 chars of [0-9a-f]).
# NOTE(review): not referenced anywhere else in this script.
_HEX64: re.Pattern[str] = re.compile(r'^[0-9a-f]{64}$')

# Legacy key layouts that need migrating; group(1) captures the hex digest.
# Only lowercase hex matches, so keys with uppercase hex are never migrated.
_OLD_BARE: re.Pattern[str] = re.compile(r'^objects/([0-9a-f]{64})$')  # objects/<hex>
_OLD_UNDER: re.Pattern[str] = re.compile(r'^objects/sha256_([0-9a-f]{64})$')  # objects/sha256_<hex>
# Target layout: objects/sha256:<hex>.
# NOTE(review): not referenced anywhere else in this script.
_CANONICAL: re.Pattern[str] = re.compile(r'^objects/sha256:[0-9a-f]{64}$')
| 35 | |
| 36 | |
| 37 | def _canonical_key(hex64: str) -> str: |
| 38 | return f"objects/sha256:{hex64}" |
| 39 | |
| 40 | |
def _get_client():
    """Return a boto3 S3 client for the configured blob store.

    Endpoint, credentials and region all come from ``musehub.config.settings``;
    a falsy ``blob_storage_region`` falls back to ``"auto"``.

    NOTE(review): botocore's default ``max_pool_connections`` is 10. With
    ``--concurrency`` above that, urllib3 discards surplus connections. Consider
    passing ``config=botocore.config.Config(max_pool_connections=...)``.
    """
    connection = dict(
        endpoint_url=settings.blob_storage_endpoint,
        aws_access_key_id=settings.blob_storage_access_key_id,
        aws_secret_access_key=settings.blob_storage_secret_access_key,
        region_name=settings.blob_storage_region or "auto",
    )
    return boto3.client("s3", **connection)
| 49 | |
| 50 | |
def _is_not_found(exc: BaseException) -> bool:
    """Return True if *exc* is an S3 error response meaning "no such object".

    botocore ``ClientError`` exposes the parsed error as
    ``exc.response["Error"]["Code"]``. GET typically reports ``NoSuchKey``,
    while HEAD (no response body) reports the bare status ``404``. Anything
    else, such as auth failures, throttling or network errors, is NOT treated
    as missing.
    """
    response = getattr(exc, "response", None)
    if not isinstance(response, dict):
        return False
    code = str(response.get("Error", {}).get("Code", ""))
    if code in {"404", "NoSuchKey", "NotFound"}:
        return True
    return response.get("ResponseMetadata", {}).get("HTTPStatusCode") == 404


def _find_legacy_keys(client, bucket: str) -> tuple[list[tuple[str, str, str]], int]:
    """List ``objects/`` and collect every key in a legacy format.

    Returns:
        ``(items, scanned)`` where each item is ``(old_key, canonical_key, hex64)``
        and *scanned* is the total number of keys listed.
    """
    found: list[tuple[str, str, str]] = []
    scanned = 0
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix="objects/"):
        for obj in page.get("Contents", []):
            scanned += 1
            key = obj["Key"]
            m = _OLD_BARE.match(key) or _OLD_UNDER.match(key)
            if m:
                hex64 = m.group(1)
                found.append((key, _canonical_key(hex64), hex64))
    return found, scanned


def _copy_one(client, bucket: str, old_key: str, new_key: str, hex64: str,
              dry_run: bool) -> tuple[str, str]:
    """Move one object from *old_key* to *new_key* after verifying its SHA-256.

    Blocking (boto3 calls); run it in a worker thread.

    Returns:
        ``(status, message)``. *status* is one of:
        ``"ok"`` (copied, or would copy in a dry run);
        ``"missing"`` (old key is gone, e.g. a previous run already migrated it);
        ``"mismatch"`` (content does not hash to *hex64*, object left in place);
        ``"error"``.
    """
    try:
        data = client.get_object(Bucket=bucket, Key=old_key)["Body"].read()
    except Exception as exc:  # worker boundary: every failure is classified below
        if _is_not_found(exc):
            return "missing", f"SKIP already migrated {old_key}"
        # GET failed for some other reason; probe with HEAD. Only a genuine 404
        # counts as "already migrated". Any other HEAD failure, or a HEAD that
        # succeeds, means the fetch really failed.
        try:
            client.head_object(Bucket=bucket, Key=old_key)
        except Exception as head_exc:
            if _is_not_found(head_exc):
                return "missing", f"SKIP already migrated {old_key}"
        return "error", f"ERROR fetch {old_key}: {exc}"

    actual = hashlib.sha256(data).hexdigest()
    if actual != hex64:
        return "mismatch", (
            f"SKIP hash mismatch {old_key}: declared={hex64[:12]}… actual={actual[:12]}…"
        )

    if dry_run:
        return "ok", f"[dry] would copy {old_key} → {new_key} ({len(data)} bytes)"

    try:
        client.put_object(Bucket=bucket, Key=new_key, Body=data)
    except Exception as exc:
        return "error", f"ERROR put {new_key}: {exc}"

    # Delete only after the canonical write succeeded. If the delete fails, the
    # old key survives and the next run simply re-copies the same verified bytes.
    try:
        client.delete_object(Bucket=bucket, Key=old_key)
    except Exception as exc:
        return "error", f"ERROR delete {old_key}: {exc}"

    return "ok", f"OK {old_key} → {new_key} ({len(data)} bytes)"


async def migrate(dry_run: bool, concurrency: int) -> None:
    """Rewrite every legacy-format object under ``objects/`` to its canonical key.

    Each key matching ``_OLD_BARE`` or ``_OLD_UNDER`` is downloaded and its
    SHA-256 checked against the digest embedded in the key. It is then written
    to ``objects/sha256:<hex>`` and the old key is deleted. Objects whose
    content does not match are left untouched.

    Args:
        dry_run: Fetch and verify only; never write or delete anything.
        concurrency: Maximum number of objects processed at once (>= 1).

    Raises:
        ValueError: If *concurrency* is less than 1.

    Calls ``sys.exit(1)`` if any object ended in an error.
    """
    from concurrent.futures import ThreadPoolExecutor

    if concurrency < 1:
        raise ValueError(f"concurrency must be >= 1, got {concurrency}")

    client = _get_client()
    bucket = settings.blob_storage_bucket

    to_migrate, total_scanned = _find_legacy_keys(client, bucket)
    print(f"Scanned {total_scanned:,} objects")
    print(f"Found {len(to_migrate):,} keys to migrate")
    if not to_migrate:
        print("Nothing to do.")
        return

    total = len(to_migrate)
    width = len(str(total))
    counts = {"ok": 0, "missing": 0, "mismatch": 0, "error": 0}
    sem = asyncio.Semaphore(concurrency)
    loop = asyncio.get_running_loop()
    start = time.monotonic()

    # asyncio.to_thread() uses the loop's default executor, which is capped at
    # min(32, cpu_count + 4) workers, so a larger --concurrency was silently
    # ignored. A dedicated pool honours the requested parallelism.
    with ThreadPoolExecutor(max_workers=concurrency) as pool:

        async def _migrate_one(old_key: str, new_key: str, hex64: str) -> str:
            async with sem:
                status, message = await loop.run_in_executor(
                    pool, _copy_one, client, bucket, old_key, new_key, hex64, dry_run
                )
            counts[status] += 1

            done = sum(counts.values())
            elapsed = time.monotonic() - start
            rate = done / elapsed if elapsed > 0 else 0
            eta = f"{(total - done) / rate:.0f}s" if rate > 0 else "?"
            print(
                f"\r  [{done:>{width}}/{total}]"
                f" ok={counts['ok']}"
                f" skip={counts['missing'] + counts['mismatch']}"
                f" err={counts['error']}"
                f"  {rate:.1f}/s  ETA {eta}  {message[:80]}",
                end="",
                flush=True,
            )
            return message

        await asyncio.gather(*(_migrate_one(*item) for item in to_migrate))
    print()

    elapsed = time.monotonic() - start
    prefix = "[dry-run] " if dry_run else ""
    verb = "would migrate" if dry_run else "migrated"
    print(
        f"\n{prefix}Migration complete ({elapsed:.1f}s):\n"
        f"  {counts['ok']:6,} {verb}\n"
        f"  {counts['missing']:6,} skipped (already migrated)\n"
        f"  {counts['mismatch']:6,} skipped (hash mismatch)\n"
        f"  {counts['error']:6,} errors"
    )
    if counts["error"]:
        sys.exit(1)
| 153 | |
| 154 | |
def main() -> None:
    """Parse command-line flags and run :func:`migrate` to completion."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="fetch and verify objects but do not write or delete anything",
    )
    parser.add_argument(
        "--concurrency",
        type=int,
        default=32,
        metavar="N",
        help="number of objects processed in parallel (default: %(default)s)",
    )
    args = parser.parse_args()
    # A concurrency of 0 would create asyncio.Semaphore(0) and block every worker
    # forever, so reject it (and negatives) before doing any work.
    if args.concurrency < 1:
        parser.error("--concurrency must be at least 1")
    asyncio.run(migrate(dry_run=args.dry_run, concurrency=args.concurrency))
| 162 | |
| 163 | |
# Script entry point; the module docstring shows the `docker exec` invocation.
if __name__ == "__main__":
    main()