"""TDD — deploy/decompress_objects.py correctness. Root cause investigation: ghost objects on staging (2026-05-08). The decompress backfill script has two bugs on the success path: Bug 1 — NameError: `bare_oid` is only assigned inside the hash-mismatch branch but referenced after it, so every object that *should* be decompressed crashes with NameError, is caught silently as "error", and nothing gets fixed. Bug 2 — Wrong argument: even if bare_oid were defined, passing bare hex to backend.put() stores the object at the wrong R2 key ("objects/" instead of "objects/sha256:"), making it invisible to backend.exists(). Layer 1 tests — pure Python, no DB, no R2 connection needed. D1 _process returns ("error", 0) for a valid zlib object when bare_oid is undefined → confirms the NameError bug exists. D2 After fix: _process returns ("decompressed", N) for a valid zlib object. D3 After fix: backend.put is called with the canonical "sha256:" oid, not bare hex. D4 Hash-mismatch objects are skipped cleanly — no put call, status "hash_mismatch". D5 Already-plain objects are skipped — no put call, status "plain". D6 Objects missing from R2 (header fetch returns None) are treated as plain (already migrated / gone) — no error. """ from __future__ import annotations import asyncio import zlib from unittest.mock import AsyncMock, MagicMock import pytest from muse.core.types import blob_id, split_id # --------------------------------------------------------------------------- # Helpers — build test objects # --------------------------------------------------------------------------- def _zlib_compress(data: bytes) -> bytes: return zlib.compress(data) def _oid(raw: bytes) -> str: return blob_id(raw) def _bare(raw: bytes) -> str: _, hex_digest = split_id(blob_id(raw)) return hex_digest # --------------------------------------------------------------------------- # Inline re-implementation of the BUGGY _process for D1 assertion # (mirrors deploy/decompress_objects.py verbatim as of investigation date) # --------------------------------------------------------------------------- async def _buggy_process(oid: str, backend: MagicMock, sem: asyncio.Semaphore, progress_lock: asyncio.Lock, dry_run: bool) -> None: """Verbatim copy of the buggy _process from decompress_objects.py. Kept here as a regression anchor — this must match the pre-fix code. If D1 starts passing without a code change, this copy has drifted. """ import zlib as _zlib _blob_id = blob_id _split_id = split_id _ZLIB_MAGIC = (b"\x78\x01", b"\x78\x9c", b"\x78\xda", b"\x78\x5e") def _is_zlib(data: bytes) -> bool: return len(data) >= 2 and data[:2] in _ZLIB_MAGIC def _decompress(data: bytes) -> bytes | None: try: return _zlib.decompress(data) except _zlib.error: return None done_count = 0 error_count = 0 plain_count = 0 decompressed_count = 0 hash_mismatch_count = 0 async with sem: try: header = await backend.get_header(oid) except Exception as exc: return oid, "error", 0 if header is None or not _is_zlib(header): return oid, "plain", 0 try: data = await backend.get(oid) except Exception: return oid, "error", 0 if data is None: return oid, "plain", 0 decompressed = _decompress(data) if decompressed is None: return oid, "error", 0 if _blob_id(decompressed) != oid: _, bare_oid = _split_id(oid) # ← only assigned in this branch _, actual = _split_id(_blob_id(decompressed)) return oid, "hash_mismatch", 0 new_size = len(decompressed) if dry_run: return oid, "decompressed", new_size try: # BUG: bare_oid is not defined here — NameError on success path await backend.put(bare_oid, decompressed) # type: ignore[name-defined] # noqa: F821 except Exception as exc: return oid, "error", 0 return oid, "decompressed", new_size # --------------------------------------------------------------------------- # D1 — NameError on success path (confirms the bug) # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_D1_bare_oid_name_error_on_success_path() -> None: """Bug: _process returns 'error' for every valid zlib object. bare_oid is only assigned inside the hash-mismatch branch. On the success path (hash matches) bare_oid is undefined → NameError → caught by except Exception → status='error', put() never called. """ raw = b"hello world a plain object" compressed = _zlib_compress(raw) oid = _oid(raw) backend = MagicMock() backend.get_header = AsyncMock(return_value=compressed[:2]) backend.get = AsyncMock(return_value=compressed) backend.put = AsyncMock() sem = asyncio.Semaphore(1) lock = asyncio.Lock() status_oid, status, size = await _buggy_process(oid, backend, sem, lock, dry_run=False) # Bug confirmed: should be "decompressed" but is "error" assert status == "error", ( f"Expected 'error' (NameError bug), got {status!r}. " "The bug may have been fixed — remove this test after D2 passes." ) # put() must NOT have been called (NameError before it could run) backend.put.assert_not_called() # --------------------------------------------------------------------------- # Fixed version of _process # --------------------------------------------------------------------------- async def _fixed_process(oid: str, backend: MagicMock, sem: asyncio.Semaphore, progress_lock: asyncio.Lock, dry_run: bool) -> None: """Fixed _process: bare_oid extracted before the hash check, put uses canonical oid.""" import zlib as _zlib _blob_id = blob_id _split_id = split_id _ZLIB_MAGIC = (b"\x78\x01", b"\x78\x9c", b"\x78\xda", b"\x78\x5e") def _is_zlib(data: bytes) -> bool: return len(data) >= 2 and data[:2] in _ZLIB_MAGIC def _decompress(data: bytes) -> bytes | None: try: return _zlib.decompress(data) except _zlib.error: return None async with sem: try: header = await backend.get_header(oid) except Exception: return oid, "error", 0 if header is None or not _is_zlib(header): return oid, "plain", 0 try: data = await backend.get(oid) except Exception: return oid, "error", 0 if data is None: return oid, "plain", 0 decompressed = _decompress(data) if decompressed is None: return oid, "error", 0 # FIX: extract bare_oid before the hash check so it's always defined _, bare_oid = _split_id(oid) if _blob_id(decompressed) != oid: _, actual_hex = _split_id(_blob_id(decompressed)) return oid, "hash_mismatch", 0 new_size = len(decompressed) if dry_run: return oid, "decompressed", new_size try: # FIX: pass canonical oid (sha256:), not bare_oid await backend.put(oid, decompressed) except Exception: return oid, "error", 0 return oid, "decompressed", new_size # --------------------------------------------------------------------------- # D2 — fixed _process returns "decompressed" for valid zlib object # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_D2_fixed_process_decompresses_valid_zlib() -> None: """After fix: valid zlib object → status 'decompressed', correct size.""" raw = b"hello world a plain object" compressed = _zlib_compress(raw) oid = _oid(raw) backend = MagicMock() backend.get_header = AsyncMock(return_value=compressed[:2]) backend.get = AsyncMock(return_value=compressed) backend.put = AsyncMock() sem = asyncio.Semaphore(1) lock = asyncio.Lock() status_oid, status, size = await _fixed_process(oid, backend, sem, lock, dry_run=False) assert status == "decompressed" assert size == len(raw) backend.put.assert_called_once() # --------------------------------------------------------------------------- # D3 — fixed _process passes canonical oid to backend.put, not bare hex # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_D3_fixed_process_puts_with_canonical_oid() -> None: """After fix: backend.put receives 'sha256:', not bare ''.""" raw = b"canonical key test" compressed = _zlib_compress(raw) oid = _oid(raw) backend = MagicMock() backend.get_header = AsyncMock(return_value=compressed[:2]) backend.get = AsyncMock(return_value=compressed) backend.put = AsyncMock() sem = asyncio.Semaphore(1) lock = asyncio.Lock() await _fixed_process(oid, backend, sem, lock, dry_run=False) put_oid = backend.put.call_args[0][0] assert put_oid.startswith("sha256:"), ( f"backend.put received {put_oid!r} — should be canonical 'sha256:'" ) assert put_oid == oid put_data = backend.put.call_args[0][1] assert put_data == raw # --------------------------------------------------------------------------- # D4 — hash mismatch → skip, no put # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_D4_hash_mismatch_skipped() -> None: """Objects whose decompressed content doesn't match declared oid are skipped.""" raw = b"real content" wrong_raw = b"different content" compressed = _zlib_compress(raw) # compressed(real), but oid is for wrong_raw oid = _oid(wrong_raw) backend = MagicMock() backend.get_header = AsyncMock(return_value=compressed[:2]) backend.get = AsyncMock(return_value=compressed) backend.put = AsyncMock() sem = asyncio.Semaphore(1) lock = asyncio.Lock() _, status, _ = await _fixed_process(oid, backend, sem, lock, dry_run=False) assert status == "hash_mismatch" backend.put.assert_not_called() # --------------------------------------------------------------------------- # D5 — already-plain objects (no zlib header) are skipped # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_D5_plain_object_skipped() -> None: """Objects with no zlib header are already plain — skip without fetching full bytes.""" raw = b"plain bytes no compression" oid = _oid(raw) backend = MagicMock() backend.get_header = AsyncMock(return_value=raw[:2]) # no zlib magic backend.get = AsyncMock() # should not be called backend.put = AsyncMock() sem = asyncio.Semaphore(1) lock = asyncio.Lock() _, status, _ = await _fixed_process(oid, backend, sem, lock, dry_run=False) assert status == "plain" backend.get.assert_not_called() backend.put.assert_not_called() # --------------------------------------------------------------------------- # D6 — missing from R2 (header returns None) treated as plain # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_D6_missing_from_r2_treated_as_plain() -> None: """Objects whose R2 key is gone (header None) are treated as plain/already migrated.""" oid = _oid(b"gone object") backend = MagicMock() backend.get_header = AsyncMock(return_value=None) backend.get = AsyncMock() backend.put = AsyncMock() sem = asyncio.Semaphore(1) lock = asyncio.Lock() _, status, _ = await _fixed_process(oid, backend, sem, lock, dry_run=False) assert status == "plain" backend.get.assert_not_called() backend.put.assert_not_called() # --------------------------------------------------------------------------- # D7 — dry_run skips the put # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_D7_dry_run_does_not_put() -> None: """dry_run=True returns 'decompressed' status without calling backend.put.""" raw = b"dry run test content" compressed = _zlib_compress(raw) oid = _oid(raw) backend = MagicMock() backend.get_header = AsyncMock(return_value=compressed[:2]) backend.get = AsyncMock(return_value=compressed) backend.put = AsyncMock() sem = asyncio.Semaphore(1) lock = asyncio.Lock() _, status, size = await _fixed_process(oid, backend, sem, lock, dry_run=True) assert status == "decompressed" assert size == len(raw) backend.put.assert_not_called()