test_decompress_objects.py
python
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠ breaking
22 days ago
| 1 | """TDD — deploy/decompress_objects.py correctness. |
| 2 | |
| 3 | Root cause investigation: ghost objects on staging (2026-05-08). |
| 4 | |
| 5 | The decompress backfill script has two bugs on the success path: |
| 6 | |
| 7 | Bug 1 — NameError: `bare_oid` is only assigned inside the hash-mismatch |
| 8 | branch but referenced after it, so every object that *should* |
| 9 | be decompressed crashes with NameError, is caught silently as |
| 10 | "error", and nothing gets fixed. |
| 11 | |
| 12 | Bug 2 — Wrong argument: even if bare_oid were defined, passing bare hex |
| 13 | to backend.put() stores the object at the wrong R2 key |
| 14 | ("objects/<hex>" instead of "objects/sha256:<hex>"), making it |
| 15 | invisible to backend.exists(). |
| 16 | |
| 17 | Layer 1 tests — pure Python, no DB, no R2 connection needed. |
| 18 | |
| 19 | D1 _process returns ("error", 0) for a valid zlib object when bare_oid |
| 20 | is undefined → confirms the NameError bug exists. |
| 21 | |
| 22 | D2 After fix: _process returns ("decompressed", N) for a valid zlib object. |
| 23 | |
| 24 | D3 After fix: backend.put is called with the canonical "sha256:<hex>" oid, |
| 25 | not bare hex. |
| 26 | |
| 27 | D4 Hash-mismatch objects are skipped cleanly — no put call, status |
| 28 | "hash_mismatch". |
| 29 | |
| 30 | D5 Already-plain objects are skipped — no put call, status "plain". |
| 31 | |
D6  Objects missing from R2 (header fetch returns None) are treated as plain
    (already migrated / gone) — no error.

D7  dry_run=True reports "decompressed" with the decompressed size but never
    calls backend.put.
| 34 | """ |
| 35 | from __future__ import annotations |
| 36 | |
| 37 | import asyncio |
| 38 | import zlib |
| 39 | from unittest.mock import AsyncMock, MagicMock |
| 40 | |
| 41 | import pytest |
| 42 | |
| 43 | from muse.core.types import blob_id, split_id |
| 44 | |
| 45 | |
| 46 | # --------------------------------------------------------------------------- |
| 47 | # Helpers — build test objects |
| 48 | # --------------------------------------------------------------------------- |
| 49 | |
def _zlib_compress(data: bytes) -> bytes:
    """Return *data* as a zlib stream compressed at the default level."""
    compressed = zlib.compress(data)
    return compressed
| 52 | |
| 53 | |
def _oid(raw: bytes) -> str:
    """Return the object id that ``blob_id`` computes for *raw*."""
    object_id = blob_id(raw)
    return object_id
| 56 | |
| 57 | |
def _bare(raw: bytes) -> str:
    """Return the second component of ``split_id(blob_id(raw))``.

    Presumably this is the bare hex digest, i.e. the object id without its
    algorithm prefix — confirm against ``muse.core.types.split_id``.
    """
    full_id = blob_id(raw)
    _prefix, digest = split_id(full_id)
    return digest
| 61 | |
| 62 | |
| 63 | # --------------------------------------------------------------------------- |
| 64 | # Inline re-implementation of the BUGGY _process for D1 assertion |
| 65 | # (mirrors deploy/decompress_objects.py verbatim as of investigation date) |
| 66 | # --------------------------------------------------------------------------- |
| 67 | |
async def _buggy_process(oid: str, backend: MagicMock, sem: asyncio.Semaphore, progress_lock: asyncio.Lock, dry_run: bool) -> tuple[str, str, int]:
    """Verbatim copy of the buggy _process from decompress_objects.py.

    Kept here as a regression anchor — this must match the pre-fix code.
    D1 passes only while the bug is present, so if D1 starts failing this
    copy has drifted from the pre-fix code.

    Args:
        oid: Object id passed to ``backend.get_header`` / ``backend.get`` and
            compared against ``blob_id`` of the decompressed payload.
        backend: Object exposing awaitable ``get_header``, ``get`` and ``put``.
        sem: Semaphore held for the duration of the processing.
        progress_lock: Accepted for signature parity; not used in this copy.
        dry_run: When true, return before attempting ``backend.put``.

    Returns:
        ``(oid, status, size)`` where ``status`` is one of ``"error"``,
        ``"plain"``, ``"hash_mismatch"`` or ``"decompressed"``; ``size`` is the
        decompressed length for ``"decompressed"`` and ``0`` otherwise.
    """
    import zlib as _zlib

    _blob_id = blob_id
    _split_id = split_id
    # Two-byte prefixes treated as the start of a zlib stream.
    _ZLIB_MAGIC = (b"\x78\x01", b"\x78\x9c", b"\x78\xda", b"\x78\x5e")

    def _is_zlib(data: bytes) -> bool:
        # True when the first two bytes match one of the known zlib headers.
        return len(data) >= 2 and data[:2] in _ZLIB_MAGIC

    def _decompress(data: bytes) -> bytes | None:
        # Return the inflated payload, or None when zlib rejects the stream.
        try:
            return _zlib.decompress(data)
        except _zlib.error:
            return None

    # Counters are never read in this copy; presumably carried over from the
    # original script's progress reporting — kept so the copy stays verbatim.
    done_count = 0
    error_count = 0
    plain_count = 0
    decompressed_count = 0
    hash_mismatch_count = 0

    async with sem:
        try:
            header = await backend.get_header(oid)
        except Exception as exc:
            return oid, "error", 0

        # A missing header or a non-zlib header both short-circuit as "plain"
        # without fetching the full object.
        if header is None or not _is_zlib(header):
            return oid, "plain", 0

        try:
            data = await backend.get(oid)
        except Exception:
            return oid, "error", 0

        if data is None:
            return oid, "plain", 0

        decompressed = _decompress(data)
        if decompressed is None:
            return oid, "error", 0

        if _blob_id(decompressed) != oid:
            _, bare_oid = _split_id(oid)  # ← only assigned in this branch
            _, actual = _split_id(_blob_id(decompressed))
            return oid, "hash_mismatch", 0

        new_size = len(decompressed)

        if dry_run:
            return oid, "decompressed", new_size

        try:
            # BUG: bare_oid is not defined here — NameError on success path
            # (concretely UnboundLocalError, a NameError subclass, which the
            # broad ``except Exception`` below swallows and reports as "error").
            await backend.put(bare_oid, decompressed)  # type: ignore[name-defined]  # noqa: F821
        except Exception as exc:
            return oid, "error", 0

        return oid, "decompressed", new_size
| 133 | |
| 134 | |
| 135 | # --------------------------------------------------------------------------- |
| 136 | # D1 — NameError on success path (confirms the bug) |
| 137 | # --------------------------------------------------------------------------- |
| 138 | |
@pytest.mark.asyncio
async def test_D1_bare_oid_name_error_on_success_path() -> None:
    """Bug: the pre-fix _process reports 'error' for a valid zlib object.

    ``bare_oid`` is bound only inside the hash-mismatch branch, so when the
    hash matches the later ``backend.put(bare_oid, ...)`` raises a NameError
    that the surrounding ``except Exception`` turns into status 'error';
    put() is therefore never invoked.
    """
    payload = b"hello world a plain object"
    zipped = _zlib_compress(payload)
    object_id = _oid(payload)

    backend = MagicMock(
        get_header=AsyncMock(return_value=zipped[:2]),
        get=AsyncMock(return_value=zipped),
        put=AsyncMock(),
    )

    outcome = await _buggy_process(
        object_id,
        backend,
        asyncio.Semaphore(1),
        asyncio.Lock(),
        dry_run=False,
    )
    _returned_oid, status, _size = outcome

    # With the bug present the status is "error" rather than "decompressed".
    assert status == "error", (
        f"Expected 'error' (NameError bug), got {status!r}. "
        "The bug may have been fixed — remove this test after D2 passes."
    )
    # The NameError fires while evaluating put()'s arguments, so no call happens.
    backend.put.assert_not_called()
| 168 | |
| 169 | |
| 170 | # --------------------------------------------------------------------------- |
| 171 | # Fixed version of _process |
| 172 | # --------------------------------------------------------------------------- |
| 173 | |
async def _fixed_process(oid: str, backend: MagicMock, sem: asyncio.Semaphore, progress_lock: asyncio.Lock, dry_run: bool) -> tuple[str, str, int]:
    """Fixed _process: decompress one object and re-store it under its canonical oid.

    Differences from the buggy version: ``backend.put`` receives ``oid``
    itself, so no ``bare_oid`` local is needed and the NameError on the
    success path cannot occur. The unused ``split_id`` results were dropped.

    Args:
        oid: Object id passed to the backend and compared against ``blob_id``
            of the decompressed payload.
        backend: Object exposing awaitable ``get_header``, ``get`` and ``put``.
        sem: Semaphore held for the duration of the processing.
        progress_lock: Accepted for signature parity; not used here.
        dry_run: When true, skip ``backend.put`` but still report the result.

    Returns:
        ``(oid, status, size)``. ``status`` is ``"plain"`` when the header or
        the object is missing or the header is not zlib, ``"error"`` when a
        backend call raises or the payload does not inflate,
        ``"hash_mismatch"`` when the inflated payload hashes to a different
        id, and ``"decompressed"`` otherwise. ``size`` is the decompressed
        length for ``"decompressed"`` and ``0`` for every other status.
    """
    import zlib as _zlib

    # Two-byte prefixes treated as the start of a zlib stream.
    zlib_magic = (b"\x78\x01", b"\x78\x9c", b"\x78\xda", b"\x78\x5e")

    async with sem:
        try:
            header = await backend.get_header(oid)
        except Exception:
            return oid, "error", 0

        # Missing or non-zlib header: nothing to do, and the full object is
        # never fetched.
        if header is None or len(header) < 2 or header[:2] not in zlib_magic:
            return oid, "plain", 0

        try:
            data = await backend.get(oid)
        except Exception:
            return oid, "error", 0

        if data is None:
            return oid, "plain", 0

        try:
            decompressed = _zlib.decompress(data)
        except _zlib.error:
            # A zlib-looking header on a payload that does not inflate.
            return oid, "error", 0

        # Never overwrite an object whose content does not hash to its id.
        if blob_id(decompressed) != oid:
            return oid, "hash_mismatch", 0

        new_size = len(decompressed)

        if dry_run:
            return oid, "decompressed", new_size

        try:
            # FIX: pass the canonical oid (sha256:<hex>), not the bare hex.
            await backend.put(oid, decompressed)
        except Exception:
            return oid, "error", 0

        return oid, "decompressed", new_size
| 231 | |
| 232 | |
| 233 | # --------------------------------------------------------------------------- |
| 234 | # D2 — fixed _process returns "decompressed" for valid zlib object |
| 235 | # --------------------------------------------------------------------------- |
| 236 | |
@pytest.mark.asyncio
async def test_D2_fixed_process_decompresses_valid_zlib() -> None:
    """After fix: valid zlib object → status 'decompressed', correct size.

    Also checks that the returned oid is the one passed in and that
    ``backend.put`` was actually awaited exactly once (a merely created,
    never-awaited coroutine would not count).
    """
    raw = b"hello world a plain object"
    compressed = _zlib_compress(raw)
    oid = _oid(raw)

    backend = MagicMock()
    backend.get_header = AsyncMock(return_value=compressed[:2])
    backend.get = AsyncMock(return_value=compressed)
    backend.put = AsyncMock()

    sem = asyncio.Semaphore(1)
    lock = asyncio.Lock()

    status_oid, status, size = await _fixed_process(oid, backend, sem, lock, dry_run=False)

    assert status_oid == oid
    assert status == "decompressed"
    assert size == len(raw)
    # Stricter than assert_called_once(): requires the coroutine to be awaited.
    backend.put.assert_awaited_once()
| 257 | |
| 258 | |
| 259 | # --------------------------------------------------------------------------- |
| 260 | # D3 — fixed _process passes canonical oid to backend.put, not bare hex |
| 261 | # --------------------------------------------------------------------------- |
| 262 | |
@pytest.mark.asyncio
async def test_D3_fixed_process_puts_with_canonical_oid() -> None:
    """After fix: backend.put is given the 'sha256:<hex>' oid and the raw bytes."""
    payload = b"canonical key test"
    zipped = _zlib_compress(payload)
    object_id = _oid(payload)

    backend = MagicMock(
        get_header=AsyncMock(return_value=zipped[:2]),
        get=AsyncMock(return_value=zipped),
        put=AsyncMock(),
    )

    await _fixed_process(
        object_id,
        backend,
        asyncio.Semaphore(1),
        asyncio.Lock(),
        dry_run=False,
    )

    # Positional arguments of the (last) put() call: (oid, data).
    positional = backend.put.call_args[0]
    put_oid = positional[0]
    assert put_oid.startswith("sha256:"), (
        f"backend.put received {put_oid!r} — should be canonical 'sha256:<hex>'"
    )
    assert put_oid == object_id
    assert positional[1] == payload
| 287 | |
| 288 | |
| 289 | # --------------------------------------------------------------------------- |
| 290 | # D4 — hash mismatch → skip, no put |
| 291 | # --------------------------------------------------------------------------- |
| 292 | |
@pytest.mark.asyncio
async def test_D4_hash_mismatch_skipped() -> None:
    """A payload that inflates to content not matching the declared oid is skipped."""
    stored_content = b"real content"
    declared_content = b"different content"
    # The stored bytes inflate to stored_content, but the oid claims declared_content.
    zipped = _zlib_compress(stored_content)
    object_id = _oid(declared_content)

    backend = MagicMock(
        get_header=AsyncMock(return_value=zipped[:2]),
        get=AsyncMock(return_value=zipped),
        put=AsyncMock(),
    )

    outcome = await _fixed_process(
        object_id,
        backend,
        asyncio.Semaphore(1),
        asyncio.Lock(),
        dry_run=False,
    )
    _, status, _ = outcome

    assert status == "hash_mismatch"
    backend.put.assert_not_called()
| 313 | |
| 314 | |
| 315 | # --------------------------------------------------------------------------- |
| 316 | # D5 — already-plain objects (no zlib header) are skipped |
| 317 | # --------------------------------------------------------------------------- |
| 318 | |
@pytest.mark.asyncio
async def test_D5_plain_object_skipped() -> None:
    """A header without zlib magic means 'plain': neither get nor put is called."""
    payload = b"plain bytes no compression"
    object_id = _oid(payload)

    backend = MagicMock(
        get_header=AsyncMock(return_value=payload[:2]),  # b"pl" is not a zlib header
        get=AsyncMock(),  # must stay untouched
        put=AsyncMock(),
    )

    outcome = await _fixed_process(
        object_id,
        backend,
        asyncio.Semaphore(1),
        asyncio.Lock(),
        dry_run=False,
    )
    _, status, _ = outcome

    assert status == "plain"
    backend.get.assert_not_called()
    backend.put.assert_not_called()
| 338 | |
| 339 | |
| 340 | # --------------------------------------------------------------------------- |
| 341 | # D6 — missing from R2 (header returns None) treated as plain |
| 342 | # --------------------------------------------------------------------------- |
| 343 | |
@pytest.mark.asyncio
async def test_D6_missing_from_r2_treated_as_plain() -> None:
    """A header fetch returning None (key gone) is reported as 'plain', not an error."""
    object_id = _oid(b"gone object")

    backend = MagicMock(
        get_header=AsyncMock(return_value=None),
        get=AsyncMock(),
        put=AsyncMock(),
    )

    outcome = await _fixed_process(
        object_id,
        backend,
        asyncio.Semaphore(1),
        asyncio.Lock(),
        dry_run=False,
    )
    _, status, _ = outcome

    assert status == "plain"
    backend.get.assert_not_called()
    backend.put.assert_not_called()
| 362 | |
| 363 | |
| 364 | # --------------------------------------------------------------------------- |
| 365 | # D7 — dry_run skips the put |
| 366 | # --------------------------------------------------------------------------- |
| 367 | |
@pytest.mark.asyncio
async def test_D7_dry_run_does_not_put() -> None:
    """With dry_run=True the result is 'decompressed' plus size, and put is never called."""
    payload = b"dry run test content"
    zipped = _zlib_compress(payload)
    object_id = _oid(payload)

    backend = MagicMock(
        get_header=AsyncMock(return_value=zipped[:2]),
        get=AsyncMock(return_value=zipped),
        put=AsyncMock(),
    )

    outcome = await _fixed_process(
        object_id,
        backend,
        asyncio.Semaphore(1),
        asyncio.Lock(),
        dry_run=True,
    )
    _, status, size = outcome

    assert status == "decompressed"
    assert size == len(payload)
    backend.put.assert_not_called()
File History
1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠
22 days ago