gabriel / musehub public
test_decompress_objects.py python
387 lines 12.7 KB
Raw
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor ⚠ breaking 22 days ago
1 """TDD — deploy/decompress_objects.py correctness.
2
3 Root cause investigation: ghost objects on staging (2026-05-08).
4
5 The decompress backfill script has two bugs on the success path:
6
7 Bug 1 — NameError: `bare_oid` is only assigned inside the hash-mismatch
8 branch but referenced after it, so every object that *should*
9 be decompressed crashes with NameError, is caught silently as
10 "error", and nothing gets fixed.
11
12 Bug 2 — Wrong argument: even if bare_oid were defined, passing bare hex
13 to backend.put() stores the object at the wrong R2 key
14 ("objects/<hex>" instead of "objects/sha256:<hex>"), making it
15 invisible to backend.exists().
16
17 Layer 1 tests — pure Python, no DB, no R2 connection needed.
18
19 D1 _process returns ("error", 0) for a valid zlib object when bare_oid
20 is undefined → confirms the NameError bug exists.
21
22 D2 After fix: _process returns ("decompressed", N) for a valid zlib object.
23
24 D3 After fix: backend.put is called with the canonical "sha256:<hex>" oid,
25 not bare hex.
26
27 D4 Hash-mismatch objects are skipped cleanly — no put call, status
28 "hash_mismatch".
29
30 D5 Already-plain objects are skipped — no put call, status "plain".
31
D6 Objects missing from R2 (header fetch returns None) are treated as plain
(already migrated / gone) — no error.

D7 dry_run=True reports "decompressed" with the decompressed size but never
calls backend.put.
34 """
35 from __future__ import annotations
36
37 import asyncio
38 import zlib
39 from unittest.mock import AsyncMock, MagicMock
40
41 import pytest
42
43 from muse.core.types import blob_id, split_id
44
45
46 # ---------------------------------------------------------------------------
47 # Helpers — build test objects
48 # ---------------------------------------------------------------------------
49
def _zlib_compress(data: bytes) -> bytes:
    """Return *data* compressed with zlib at the library's default level."""
    compressed = zlib.compress(data)
    return compressed
52
53
def _oid(raw: bytes) -> str:
    """Return the object id that ``blob_id`` computes for *raw*."""
    object_id = blob_id(raw)
    return object_id
56
57
def _bare(raw: bytes) -> str:
    """Return the second component of ``split_id(blob_id(raw))``.

    The tests treat this as the bare hex digest, i.e. the object id without
    its leading algorithm prefix.
    """
    full_id = blob_id(raw)
    _prefix, digest = split_id(full_id)
    return digest
61
62
63 # ---------------------------------------------------------------------------
64 # Inline re-implementation of the BUGGY _process for D1 assertion
65 # (mirrors deploy/decompress_objects.py verbatim as of investigation date)
66 # ---------------------------------------------------------------------------
67
async def _buggy_process(oid: str, backend: MagicMock, sem: asyncio.Semaphore, progress_lock: asyncio.Lock, dry_run: bool) -> tuple[str, str, int]:
    """Verbatim copy of the buggy _process from decompress_objects.py.

    Kept here as a regression anchor — this must match the pre-fix code.
    If D1 starts passing without a code change, this copy has drifted.

    Do not "clean up" this function: the unused locals and the reference to
    the unbound ``bare_oid`` are the point of the copy.

    Args:
        oid: Object id passed to every backend call and compared against
            ``blob_id(decompressed)``.
        backend: Store exposing awaitable ``get_header``, ``get`` and ``put``.
        sem: Semaphore held while the object is processed.
        progress_lock: Accepted but never used in this copy.
        dry_run: When true, return before the ``backend.put`` call.

    Returns:
        ``(oid, status, size)`` where ``status`` is ``"plain"``, ``"error"``,
        ``"hash_mismatch"`` or ``"decompressed"``; ``size`` is the
        decompressed length for ``"decompressed"`` and 0 otherwise.
    """
    import zlib as _zlib

    _blob_id = blob_id
    _split_id = split_id
    # Two-byte prefixes this code treats as "looks like a zlib stream".
    _ZLIB_MAGIC = (b"\x78\x01", b"\x78\x9c", b"\x78\xda", b"\x78\x5e")

    def _is_zlib(data: bytes) -> bool:
        # Only the first two bytes are inspected, so a header fetch suffices.
        return len(data) >= 2 and data[:2] in _ZLIB_MAGIC

    def _decompress(data: bytes) -> bytes | None:
        # A corrupt stream yields None instead of raising.
        try:
            return _zlib.decompress(data)
        except _zlib.error:
            return None

    # Never read in this copy — presumably progress counters in the real
    # script (TODO confirm); retained so the copy stays verbatim.
    done_count = 0
    error_count = 0
    plain_count = 0
    decompressed_count = 0
    hash_mismatch_count = 0

    async with sem:
        try:
            header = await backend.get_header(oid)
        except Exception as exc:
            return oid, "error", 0

        # A missing header or a non-zlib prefix means there is nothing to do.
        if header is None or not _is_zlib(header):
            return oid, "plain", 0

        try:
            data = await backend.get(oid)
        except Exception:
            return oid, "error", 0

        if data is None:
            return oid, "plain", 0

        decompressed = _decompress(data)
        if decompressed is None:
            return oid, "error", 0

        # The decompressed bytes must hash back to the id they are stored under.
        if _blob_id(decompressed) != oid:
            _, bare_oid = _split_id(oid)  # ← only assigned in this branch
            _, actual = _split_id(_blob_id(decompressed))
            return oid, "hash_mismatch", 0

        new_size = len(decompressed)

        if dry_run:
            return oid, "decompressed", new_size

        try:
            # BUG: bare_oid is not defined here — NameError on success path
            # (bare_oid is a function local, so Python raises
            # UnboundLocalError, a NameError subclass; the broad except below
            # swallows it and the object is reported as "error").
            await backend.put(bare_oid, decompressed)  # type: ignore[name-defined]  # noqa: F821
        except Exception as exc:
            return oid, "error", 0

        return oid, "decompressed", new_size
133
134
135 # ---------------------------------------------------------------------------
136 # D1 — NameError on success path (confirms the bug)
137 # ---------------------------------------------------------------------------
138
@pytest.mark.asyncio
async def test_D1_bare_oid_name_error_on_success_path() -> None:
    """Pin the pre-fix behaviour: a valid zlib object is reported as 'error'.

    On the success path (hash matches) the buggy copy reaches backend.put
    with bare_oid still unbound. The resulting NameError is swallowed by the
    surrounding ``except Exception``, so the status is 'error' and put() is
    never invoked.
    """
    payload = b"hello world a plain object"
    blob = _zlib_compress(payload)

    store = MagicMock(
        get_header=AsyncMock(return_value=blob[:2]),
        get=AsyncMock(return_value=blob),
        put=AsyncMock(),
    )

    outcome = await _buggy_process(
        _oid(payload),
        store,
        asyncio.Semaphore(1),
        asyncio.Lock(),
        dry_run=False,
    )
    _, status, _ = outcome

    # The correct answer would be "decompressed"; the bug turns it into "error".
    assert status == "error", (
        f"Expected 'error' (NameError bug), got {status!r}. "
        "The bug may have been fixed — remove this test after D2 passes."
    )
    # The NameError fires while building the call, so put() never runs.
    store.put.assert_not_called()
168
169
170 # ---------------------------------------------------------------------------
171 # Fixed version of _process
172 # ---------------------------------------------------------------------------
173
174 async def _fixed_process(oid: str, backend: MagicMock, sem: asyncio.Semaphore, progress_lock: asyncio.Lock, dry_run: bool) -> None:
175 """Fixed _process: bare_oid extracted before the hash check, put uses canonical oid."""
176 import zlib as _zlib
177
178 _blob_id = blob_id
179 _split_id = split_id
180 _ZLIB_MAGIC = (b"\x78\x01", b"\x78\x9c", b"\x78\xda", b"\x78\x5e")
181
182 def _is_zlib(data: bytes) -> bool:
183 return len(data) >= 2 and data[:2] in _ZLIB_MAGIC
184
185 def _decompress(data: bytes) -> bytes | None:
186 try:
187 return _zlib.decompress(data)
188 except _zlib.error:
189 return None
190
191 async with sem:
192 try:
193 header = await backend.get_header(oid)
194 except Exception:
195 return oid, "error", 0
196
197 if header is None or not _is_zlib(header):
198 return oid, "plain", 0
199
200 try:
201 data = await backend.get(oid)
202 except Exception:
203 return oid, "error", 0
204
205 if data is None:
206 return oid, "plain", 0
207
208 decompressed = _decompress(data)
209 if decompressed is None:
210 return oid, "error", 0
211
212 # FIX: extract bare_oid before the hash check so it's always defined
213 _, bare_oid = _split_id(oid)
214
215 if _blob_id(decompressed) != oid:
216 _, actual_hex = _split_id(_blob_id(decompressed))
217 return oid, "hash_mismatch", 0
218
219 new_size = len(decompressed)
220
221 if dry_run:
222 return oid, "decompressed", new_size
223
224 try:
225 # FIX: pass canonical oid (sha256:<hex>), not bare_oid
226 await backend.put(oid, decompressed)
227 except Exception:
228 return oid, "error", 0
229
230 return oid, "decompressed", new_size
231
232
233 # ---------------------------------------------------------------------------
234 # D2 — fixed _process returns "decompressed" for valid zlib object
235 # ---------------------------------------------------------------------------
236
@pytest.mark.asyncio
async def test_D2_fixed_process_decompresses_valid_zlib() -> None:
    """A valid zlib object yields status 'decompressed' with the plain size."""
    payload = b"hello world a plain object"
    blob = _zlib_compress(payload)

    store = MagicMock(
        get_header=AsyncMock(return_value=blob[:2]),
        get=AsyncMock(return_value=blob),
        put=AsyncMock(),
    )

    outcome = await _fixed_process(
        _oid(payload),
        store,
        asyncio.Semaphore(1),
        asyncio.Lock(),
        dry_run=False,
    )
    _, status, size = outcome

    assert status == "decompressed"
    assert size == len(payload)
    # Exactly one write-back is expected on the success path.
    store.put.assert_called_once()
257
258
259 # ---------------------------------------------------------------------------
260 # D3 — fixed _process passes canonical oid to backend.put, not bare hex
261 # ---------------------------------------------------------------------------
262
@pytest.mark.asyncio
async def test_D3_fixed_process_puts_with_canonical_oid() -> None:
    """backend.put must be keyed by 'sha256:<hex>', never by the bare digest."""
    payload = b"canonical key test"
    blob = _zlib_compress(payload)
    expected_oid = _oid(payload)

    store = MagicMock(
        get_header=AsyncMock(return_value=blob[:2]),
        get=AsyncMock(return_value=blob),
        put=AsyncMock(),
    )

    await _fixed_process(
        expected_oid,
        store,
        asyncio.Semaphore(1),
        asyncio.Lock(),
        dry_run=False,
    )

    # call_args[0] holds the positional arguments of the put() call.
    positional = store.put.call_args[0]
    put_oid = positional[0]
    assert put_oid.startswith("sha256:"), (
        f"backend.put received {put_oid!r} — should be canonical 'sha256:<hex>'"
    )
    assert put_oid == expected_oid
    put_data = positional[1]
    assert put_data == payload
287
288
289 # ---------------------------------------------------------------------------
290 # D4 — hash mismatch → skip, no put
291 # ---------------------------------------------------------------------------
292
@pytest.mark.asyncio
async def test_D4_hash_mismatch_skipped() -> None:
    """An object whose plain bytes do not hash to its oid is left untouched."""
    stored = b"real content"
    claimed = b"different content"
    # The blob holds `stored`, but the id is derived from `claimed`.
    blob = _zlib_compress(stored)
    mismatched_oid = _oid(claimed)

    store = MagicMock(
        get_header=AsyncMock(return_value=blob[:2]),
        get=AsyncMock(return_value=blob),
        put=AsyncMock(),
    )

    outcome = await _fixed_process(
        mismatched_oid,
        store,
        asyncio.Semaphore(1),
        asyncio.Lock(),
        dry_run=False,
    )
    status = outcome[1]

    assert status == "hash_mismatch"
    store.put.assert_not_called()
313
314
315 # ---------------------------------------------------------------------------
316 # D5 — already-plain objects (no zlib header) are skipped
317 # ---------------------------------------------------------------------------
318
@pytest.mark.asyncio
async def test_D5_plain_object_skipped() -> None:
    """A header without zlib magic means 'plain': no full fetch, no write."""
    payload = b"plain bytes no compression"

    store = MagicMock(
        get_header=AsyncMock(return_value=payload[:2]),  # not a zlib prefix
        get=AsyncMock(),  # must stay untouched
        put=AsyncMock(),
    )

    outcome = await _fixed_process(
        _oid(payload),
        store,
        asyncio.Semaphore(1),
        asyncio.Lock(),
        dry_run=False,
    )
    status = outcome[1]

    assert status == "plain"
    store.get.assert_not_called()
    store.put.assert_not_called()
338
339
340 # ---------------------------------------------------------------------------
341 # D6 — missing from R2 (header returns None) treated as plain
342 # ---------------------------------------------------------------------------
343
@pytest.mark.asyncio
async def test_D6_missing_from_r2_treated_as_plain() -> None:
    """A None header (key gone from R2) is reported as plain, not as an error."""
    store = MagicMock(
        get_header=AsyncMock(return_value=None),
        get=AsyncMock(),
        put=AsyncMock(),
    )

    outcome = await _fixed_process(
        _oid(b"gone object"),
        store,
        asyncio.Semaphore(1),
        asyncio.Lock(),
        dry_run=False,
    )
    status = outcome[1]

    assert status == "plain"
    # Neither the full body nor a write-back should be attempted.
    store.get.assert_not_called()
    store.put.assert_not_called()
362
363
364 # ---------------------------------------------------------------------------
365 # D7 — dry_run skips the put
366 # ---------------------------------------------------------------------------
367
@pytest.mark.asyncio
async def test_D7_dry_run_does_not_put() -> None:
    """With dry_run=True the status is 'decompressed' but nothing is written."""
    payload = b"dry run test content"
    blob = _zlib_compress(payload)

    store = MagicMock(
        get_header=AsyncMock(return_value=blob[:2]),
        get=AsyncMock(return_value=blob),
        put=AsyncMock(),
    )

    outcome = await _fixed_process(
        _oid(payload),
        store,
        asyncio.Semaphore(1),
        asyncio.Lock(),
        dry_run=True,
    )
    _, status, size = outcome

    assert status == "decompressed"
    assert size == len(payload)
    store.put.assert_not_called()
File History 1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 22 days ago