gabriel / musehub public

migrate_r2_keys.py file-level

at sha256:3 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
1 #!/usr/bin/env python3
2 """One-time migration: normalize R2 object keys to canonical algo:hex format.
3
4 Three key formats exist in R2 from different code eras:
5
6 objects/<bare-hex> old — no algorithm prefix, ambiguous
7 objects/sha256_<hex> bad — underscore substitution, non-standard
8 objects/sha256:<hex> correct — canonical algo:hex convention
9
10 This script copies every object under the old formats to the canonical key,
11 verifies the content hash, then deletes the old key.
12
13 Run inside Docker on the target instance:
14
15 docker exec musehub-blue python3 /app/deploy/migrate_r2_keys.py [--dry-run] [--concurrency 32]
16 """
17 from __future__ import annotations
18
19 import argparse
20 import asyncio
21 import hashlib
22 import re
23 import sys
24 import time
25
26 import boto3
27
28 from musehub.config import settings
29
# A bare SHA-256 digest: exactly 64 lowercase hexadecimal characters.
_HEX64 = re.compile(r"^[0-9a-f]{64}$")

# Legacy key layouts. In both patterns, group 1 captures the 64-char digest.
_OLD_BARE = re.compile(r"^objects/([0-9a-f]{64})$")
_OLD_UNDER = re.compile(r"^objects/sha256_([0-9a-f]{64})$")
# Target layout: "objects/sha256:<hex>".
_CANONICAL = re.compile(r"^objects/sha256:[0-9a-f]{64}$")
35
36
def _canonical_key(hex64: str) -> str:
    """Return the canonical ``objects/sha256:<hex>`` key for digest *hex64*."""
    return "objects/sha256:" + hex64
39
40
def _get_client():
    """Build a boto3 S3 client from the blob-storage values in ``settings``.

    The region falls back to ``"auto"`` when ``settings.blob_storage_region``
    is empty or unset.
    """
    connection = {
        "endpoint_url": settings.blob_storage_endpoint,
        "aws_access_key_id": settings.blob_storage_access_key_id,
        "aws_secret_access_key": settings.blob_storage_secret_access_key,
        "region_name": settings.blob_storage_region or "auto",
    }
    return boto3.client("s3", **connection)
49
50
def _is_missing_object_error(exc: BaseException) -> bool:
    """Return True when *exc* carries an S3 "object not found" response.

    botocore client errors expose the parsed service response as a
    ``response`` dict. Both the error code and the HTTP status are checked
    because a not-found can surface as ``NoSuchKey``, ``NotFound`` or a bare
    ``404`` depending on the operation. Exceptions without such a dict
    (timeouts, connection failures, ...) are never treated as "missing".
    """
    response = getattr(exc, "response", None)
    if not isinstance(response, dict):
        return False
    error = response.get("Error") or {}
    metadata = response.get("ResponseMetadata") or {}
    code = str(error.get("Code", ""))
    return code in {"404", "NoSuchKey", "NotFound"} or metadata.get("HTTPStatusCode") == 404


async def migrate(dry_run: bool, concurrency: int) -> None:
    """Copy every legacy-format object to its canonical key and drop the old key.

    Lists everything under ``objects/``, selects keys matching the bare-hex or
    ``sha256_`` layouts, and for each one: downloads the body, checks that its
    SHA-256 equals the digest embedded in the key, writes it under
    ``objects/sha256:<hex>``, then deletes the old key. Progress is printed on
    a single ``\\r``-rewritten line, followed by a summary and the full text of
    every failure or hash mismatch.

    Args:
        dry_run: When true, download and verify only; nothing is written or
            deleted.
        concurrency: Maximum number of objects processed at once (>= 1).

    Raises:
        ValueError: If ``concurrency`` is less than 1.
        SystemExit: With status 1 when at least one object failed.
    """
    if concurrency < 1:
        # Semaphore(0) would park every worker forever; fail loudly instead.
        raise ValueError(f"concurrency must be >= 1, got {concurrency}")

    client = _get_client()
    bucket = settings.blob_storage_bucket

    # Collect all non-canonical keys.
    to_migrate: list[tuple[str, str]] = []  # (old_key, canonical_key)
    paginator = client.get_paginator("list_objects_v2")
    total_scanned = 0
    for page in paginator.paginate(Bucket=bucket, Prefix="objects/"):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            total_scanned += 1
            m = _OLD_BARE.match(key) or _OLD_UNDER.match(key)
            if m:
                to_migrate.append((key, _canonical_key(m.group(1))))

    total = len(to_migrate)
    print(f"Scanned {total_scanned:,} objects")
    print(f"Found {total:,} keys to migrate")
    if not to_migrate:
        print("Nothing to do.")
        return

    # NOTE: asyncio.to_thread runs on the loop's default executor, so real
    # parallelism is additionally capped by that thread pool's size.
    sem = asyncio.Semaphore(concurrency)
    ok = errors = skipped = 0
    width = len(str(total))
    start = time.monotonic()

    async def _migrate_one(old_key: str, new_key: str) -> str:
        """Migrate one object and return its "OK"/"SKIP"/"ERROR"/"[dry]" status line."""
        nonlocal ok, errors, skipped
        async with sem:
            hex64 = new_key.split("sha256:")[1]

            def _copy_and_delete() -> str:
                already_migrated = f"SKIP already migrated {old_key}"

                # Fetch — a 404 means a previous run already migrated and deleted this key.
                try:
                    data = client.get_object(Bucket=bucket, Key=old_key)["Body"].read()
                except client.exceptions.NoSuchKey:
                    return already_migrated
                except Exception as exc:
                    if _is_missing_object_error(exc):
                        return already_migrated
                    # Ambiguous failure: probe with HEAD, but only a genuine
                    # not-found counts as "already migrated". Anything else
                    # (network, auth, throttling) must surface as an error
                    # rather than be silently skipped.
                    try:
                        client.head_object(Bucket=bucket, Key=old_key)
                    except Exception as head_exc:
                        if _is_missing_object_error(head_exc):
                            return already_migrated
                    return f"ERROR fetch {old_key}: {exc}"

                # Verify the body against the digest declared in the key.
                actual = hashlib.sha256(data).hexdigest()
                if actual != hex64:
                    return f"SKIP hash mismatch {old_key}: declared={hex64[:12]}… actual={actual[:12]}…"

                if dry_run:
                    return f"[dry] would copy {old_key} → {new_key} ({len(data)} bytes)"

                # Write canonical key.
                try:
                    client.put_object(Bucket=bucket, Key=new_key, Body=data)
                except Exception as exc:
                    return f"ERROR put {new_key}: {exc}"

                # Delete old key only after the canonical copy was written.
                try:
                    client.delete_object(Bucket=bucket, Key=old_key)
                except Exception as exc:
                    return f"ERROR delete {old_key}: {exc}"

                return f"OK {old_key} → {new_key} ({len(data)} bytes)"

            result = await asyncio.to_thread(_copy_and_delete)

            if result.startswith(("OK", "[dry]")):
                ok += 1
            elif result.startswith("SKIP"):
                skipped += 1
            else:
                errors += 1

            done = ok + skipped + errors
            elapsed = time.monotonic() - start
            rate = done / elapsed if elapsed > 0 else 0
            remaining = total - done
            eta = f"{remaining / rate:.0f}s" if rate > 0 else "?"
            print(
                f"\r [{done:>{width}}/{total}]"
                f" ok={ok} skip={skipped} err={errors}"
                f" {rate:.1f}/s ETA {eta} {result[:80]}",
                end="",
                flush=True,
            )
            return result

    results = await asyncio.gather(*(_migrate_one(old, new) for old, new in to_migrate))
    print()

    elapsed = time.monotonic() - start
    prefix = "[dry-run] " if dry_run else ""
    print(
        f"\n{prefix}Migration complete ({elapsed:.1f}s):\n"
        f" {ok:6,} migrated\n"
        f" {skipped:6,} skipped (already migrated or hash mismatch)\n"
        f" {errors:6,} errors"
    )

    # The progress line truncates and overwrites each status, so repeat every
    # failure and hash mismatch in full for the operator.
    problems = [r for r in results if r.startswith(("ERROR", "SKIP hash mismatch"))]
    if problems:
        print("\nDetails:")
        for line in problems:
            print(f" {line}")

    if errors:
        sys.exit(1)
153
154
def main() -> None:
    """Parse ``--dry-run`` / ``--concurrency`` and run the migration to completion."""
    cli = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    cli.add_argument("--dry-run", action="store_true")
    cli.add_argument("--concurrency", type=int, default=32, metavar="N")
    opts = cli.parse_args()

    job = migrate(dry_run=opts.dry_run, concurrency=opts.concurrency)
    asyncio.run(job)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()