gabriel / muse public
test_integrity_I1_read_verify.py python
863 lines 33.3 KB
Raw
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 7 days ago
1 """I-1: Read-time SHA-256 verification in read_object.
2
3 Every read now re-verifies the digest of the bytes returned against the
4 object_id. This test suite:
5
6 Unit — confirms clean objects pass, corrupt objects raise OSError.
7 Unit — MAX_FILE_BYTES size limit enforcement.
8 Unit — CRITICAL log emission on corruption.
9 Integration — confirms the cat-object command surfaces the error.
10 Integration — verify-pack catches bit-flipped LOCAL STORE objects.
11 Integration — verify-object --all audits the full local store.
12 Perf — 256 MiB object re-hash must complete within 500 ms on NVMe.
13 Stress — bit-flips at every byte position; multi-bit and random fuzz.
14 Regression — write→corrupt→read round-trip never silently returns bad data.
15 """
16 from __future__ import annotations
17 from collections.abc import Mapping
18
# One blob entry of an mpack bundle as built by _good_bundle_obj below:
# "object_id" holds the ID string and "content" holds the raw bytes.
type _ObjPayload = dict[str, str | bytes]
20
21 import json
22 import logging
23 import os
24 import pathlib
25 import random
26 import struct
27 import tempfile
28 import time
29 from typing import TypedDict
30
31 import msgpack
32 import pytest
33
34 from muse.core.object_store import (
35 objects_dir,
36 object_path,
37 read_object,
38 write_object,
39 )
40 from muse.core.paths import muse_dir
41 from muse.core.validation import MAX_FILE_BYTES
42 from muse.core.types import Manifest, blob_id, fake_id
43 from tests.cli_test_helper import CliRunner, InvokeResult
44
45
46 # ---------------------------------------------------------------------------
47 # Helpers
48 # ---------------------------------------------------------------------------
49
def _repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create the directory reported by ``muse_dir`` and hand back *tmp_path*.

    This is the smallest layout the object-store tests need; no
    sub-directories or config files are created here.
    """
    skeleton = muse_dir(tmp_path)
    skeleton.mkdir()
    return tmp_path
54
55
def _write(repo: pathlib.Path, data: bytes) -> str:
    """Store *data* in *repo* under the ID ``blob_id`` derives for it.

    Returns the object ID so callers can read the object back or corrupt it.
    """
    object_id = blob_id(data)
    write_object(repo, object_id, data)
    return object_id
60
61
def _stored_path(repo: pathlib.Path, oid: str) -> pathlib.Path:
    """Return the on-disk location ``object_path`` reports for *oid* in *repo*."""
    location = object_path(repo, oid)
    return location
64
65
66 def _flip_bit(data: bytes, byte_idx: int, bit_idx: int) -> bytes:
67 """Return *data* with one bit flipped at position (byte_idx, bit_idx)."""
68 ba = bytearray(data)
69 ba[byte_idx] ^= 1 << bit_idx
70 return bytes(ba)
71
72
73 def _corrupt_file(p: pathlib.Path, new_content: bytes) -> None:
74 """Overwrite *p* with *new_content*, temporarily lifting the 0o444 guard.
75
76 Object files are written with mode 0o444 (read-only) to enforce
77 content-addressability at the OS level. Corruption tests must override
78 that protection to simulate disk errors, cosmic-ray bit flips, etc.
79 The permission is restored to 0o444 after the write.
80 """
81 os.chmod(p, 0o644)
82 try:
83 p.write_bytes(new_content)
84 finally:
85 os.chmod(p, 0o444)
86
87
def _corrupt_stored(repo: pathlib.Path, oid: str, byte_idx: int = 0, bit_idx: int = 0) -> None:
    """Invert a single bit of the file that backs *oid* in *repo*.

    *byte_idx* / *bit_idx* address the bit within the on-disk file; both
    default to 0, i.e. the lowest bit of the first byte.
    """
    target = object_path(repo, oid)
    damaged = _flip_bit(target.read_bytes(), byte_idx, bit_idx)
    _corrupt_file(target, damaged)
93
94
def _invoke(repo: pathlib.Path, *args: str) -> InvokeResult:
    """Run the ``cat-object`` command with *args* against *repo*.

    The repository is selected through the ``MUSE_REPO_ROOT`` environment
    variable passed to the test CLI runner.
    """
    argv = ["cat-object"]
    argv.extend(args)
    return CliRunner().invoke(None, argv, env={"MUSE_REPO_ROOT": str(repo)})
99
100
101 # ---------------------------------------------------------------------------
102 # Unit: happy path — clean objects always pass
103 # ---------------------------------------------------------------------------
104
class TestCleanObjectsPass:
    """Untouched objects must round-trip through write and read unchanged."""

    def test_empty_bytes(self, tmp_path: pathlib.Path) -> None:
        """The empty payload reads back as the empty payload."""
        root = _repo(tmp_path)
        object_id = _write(root, b"")
        assert read_object(root, object_id) == b""

    def test_small_ascii(self, tmp_path: pathlib.Path) -> None:
        """A short ASCII payload reads back byte-for-byte."""
        root = _repo(tmp_path)
        payload = b"hello muse"
        object_id = _write(root, payload)
        assert read_object(root, object_id) == payload

    def test_binary_blob(self, tmp_path: pathlib.Path) -> None:
        """A payload covering every byte value reads back byte-for-byte."""
        root = _repo(tmp_path)
        payload = bytes(range(256)) * 100
        object_id = _write(root, payload)
        assert read_object(root, object_id) == payload

    def test_1_mib_object(self, tmp_path: pathlib.Path) -> None:
        """One MiB of random bytes reads back byte-for-byte."""
        root = _repo(tmp_path)
        payload = os.urandom(1024 * 1024)
        object_id = _write(root, payload)
        assert read_object(root, object_id) == payload

    def test_read_twice_same_result(self, tmp_path: pathlib.Path) -> None:
        """Two consecutive reads of the same object agree with each other."""
        root = _repo(tmp_path)
        object_id = _write(root, b"idempotent read")
        first = read_object(root, object_id)
        second = read_object(root, object_id)
        assert first == second

    def test_absent_returns_none(self, tmp_path: pathlib.Path) -> None:
        """Reading an ID that was never written yields None, not an error."""
        root = _repo(tmp_path)
        assert read_object(root, fake_id("absent")) is None
139
140
141 # ---------------------------------------------------------------------------
142 # Unit: single-bit corruption always raises OSError
143 # ---------------------------------------------------------------------------
144
class TestSingleBitCorruption:
    """Any small on-disk change to an object must make read_object raise OSError."""

    def test_flip_first_byte_first_bit(self, tmp_path: pathlib.Path) -> None:
        """Lowest bit of the first byte flipped."""
        root = _repo(tmp_path)
        object_id = _write(root, b"critical data")
        _corrupt_stored(root, object_id, byte_idx=0, bit_idx=0)
        with pytest.raises(OSError, match="integrity check"):
            read_object(root, object_id)

    def test_flip_first_byte_last_bit(self, tmp_path: pathlib.Path) -> None:
        """Highest bit of the first byte flipped."""
        root = _repo(tmp_path)
        object_id = _write(root, b"critical data")
        _corrupt_stored(root, object_id, byte_idx=0, bit_idx=7)
        with pytest.raises(OSError, match="integrity check"):
            read_object(root, object_id)

    def test_flip_last_byte(self, tmp_path: pathlib.Path) -> None:
        """A bit flipped at index ``len(data) - 1`` of the stored file."""
        root = _repo(tmp_path)
        payload = b"end matters too"
        object_id = _write(root, payload)
        final_index = len(payload) - 1
        _corrupt_stored(root, object_id, byte_idx=final_index, bit_idx=3)
        with pytest.raises(OSError, match="integrity check"):
            read_object(root, object_id)

    def test_flip_middle_byte(self, tmp_path: pathlib.Path) -> None:
        """A bit flipped at index ``len(data) // 2`` of the stored file."""
        root = _repo(tmp_path)
        payload = b"middle byte flip"
        object_id = _write(root, payload)
        _corrupt_stored(root, object_id, byte_idx=len(payload) // 2, bit_idx=4)
        with pytest.raises(OSError, match="integrity check"):
            read_object(root, object_id)

    def test_error_message_contains_expected_prefix(self, tmp_path: pathlib.Path) -> None:
        """The error names the object (first 8 ID chars) and the kind of failure."""
        root = _repo(tmp_path)
        object_id = _write(root, b"check message content")
        _corrupt_stored(root, object_id, byte_idx=0, bit_idx=1)
        with pytest.raises(OSError) as caught:
            read_object(root, object_id)
        text = str(caught.value)
        assert object_id[:8] in text
        assert "SHA-256" in text or "integrity" in text.lower()

    def test_error_suggests_verify_pack(self, tmp_path: pathlib.Path) -> None:
        """The error text points the user at ``verify-pack``."""
        root = _repo(tmp_path)
        object_id = _write(root, b"suggest remedy")
        _corrupt_stored(root, object_id, byte_idx=0, bit_idx=0)
        with pytest.raises(OSError) as caught:
            read_object(root, object_id)
        assert "verify-pack" in str(caught.value)

    def test_clean_sibling_unaffected(self, tmp_path: pathlib.Path) -> None:
        """Damaging one object must leave a second object readable."""
        root = _repo(tmp_path)
        clean_id = _write(root, b"object a - clean")
        damaged_id = _write(root, b"object b - will corrupt")
        _corrupt_stored(root, damaged_id, byte_idx=0, bit_idx=0)
        # The damaged object is rejected ...
        with pytest.raises(OSError):
            read_object(root, damaged_id)
        # ... while its sibling still reads back intact.
        assert read_object(root, clean_id) == b"object a - clean"

    def test_truncated_file_raises(self, tmp_path: pathlib.Path) -> None:
        """An object file emptied to zero bytes is rejected."""
        root = _repo(tmp_path)
        object_id = _write(root, b"will be truncated")
        _corrupt_file(_stored_path(root, object_id), b"")
        with pytest.raises(OSError, match="integrity check"):
            read_object(root, object_id)

    def test_fully_zeroed_file_raises(self, tmp_path: pathlib.Path) -> None:
        """An object file overwritten with NUL bytes is rejected."""
        root = _repo(tmp_path)
        payload = b"zeroed out"
        object_id = _write(root, payload)
        zeros = b"\x00" * len(payload)
        _corrupt_file(_stored_path(root, object_id), zeros)
        with pytest.raises(OSError, match="integrity check"):
            read_object(root, object_id)

    def test_appended_byte_raises(self, tmp_path: pathlib.Path) -> None:
        """One extra trailing byte is rejected."""
        root = _repo(tmp_path)
        payload = b"exact bytes"
        object_id = _write(root, payload)
        _corrupt_file(_stored_path(root, object_id), payload + b"\xff")
        with pytest.raises(OSError, match="integrity check"):
            read_object(root, object_id)

    def test_prepended_byte_raises(self, tmp_path: pathlib.Path) -> None:
        """One extra leading byte is rejected."""
        root = _repo(tmp_path)
        payload = b"exact bytes"
        object_id = _write(root, payload)
        _corrupt_file(_stored_path(root, object_id), b"\x00" + payload)
        with pytest.raises(OSError, match="integrity check"):
            read_object(root, object_id)
241
242
243 # ---------------------------------------------------------------------------
244 # Stress: exhaustive single-bit sweep
245 # ---------------------------------------------------------------------------
246
class TestExhaustiveBitFlip:
    """Sweep every single-bit flip of a stored object and require each to be caught."""

    def test_every_bit_in_32_byte_object(self, tmp_path: pathlib.Path) -> None:
        """Each single-bit flip of the stored file for a 32-byte payload raises OSError."""
        root = _repo(tmp_path)
        object_id = _write(root, bytes(range(32)))
        target = _stored_path(root, object_id)
        pristine = target.read_bytes()
        detected = 0
        for byte_idx in range(len(pristine)):
            for bit_idx in range(8):
                _corrupt_file(target, _flip_bit(pristine, byte_idx, bit_idx))
                try:
                    read_object(root, object_id)
                except OSError:
                    detected += 1
                else:
                    # A flipped file that still hashes to the original ID
                    # should never happen; reaching here means the bad bytes
                    # were handed back without complaint.
                    pytest.fail(
                        f"Bit flip at byte={byte_idx} bit={bit_idx} "
                        "was not caught — corrupt data returned silently"
                    )
                finally:
                    # Put the clean bytes back so the next flip starts fresh.
                    _corrupt_file(target, pristine)
        assert detected == len(pristine) * 8

    @pytest.mark.slow
    def test_every_bit_in_4096_byte_object(self, tmp_path: pathlib.Path) -> None:
        """Each single-bit flip of the stored file for a 4 KiB random payload raises OSError."""
        root = _repo(tmp_path)
        object_id = _write(root, os.urandom(4096))
        target = _stored_path(root, object_id)
        pristine = target.read_bytes()
        for byte_idx in range(len(pristine)):
            for bit_idx in range(8):
                damaged = _flip_bit(pristine, byte_idx, bit_idx)
                _corrupt_file(target, damaged)
                with pytest.raises(OSError):
                    read_object(root, object_id)
                _corrupt_file(target, pristine)
289
290
291 # ---------------------------------------------------------------------------
292 # Stress: multi-bit and random fuzz
293 # ---------------------------------------------------------------------------
294
class TestFuzzCorruption:
    """Seeded multi-bit, whole-file and word-level corruption must all be rejected."""

    def test_5_random_bits_1000_iterations(self, tmp_path: pathlib.Path) -> None:
        """1000 trials of five random bit flips each: none may read back cleanly."""
        root = _repo(tmp_path)
        object_id = _write(root, os.urandom(256))
        target = _stored_path(root, object_id)
        pristine = target.read_bytes()
        rng = random.Random(42)
        undetected = 0
        for _trial in range(1000):
            scratch = bytearray(pristine)
            for _flip in range(5):
                # Byte position is drawn first, then the bit within it.
                position = rng.randrange(len(scratch))
                bit = rng.randrange(8)
                scratch[position] ^= 1 << bit
            _corrupt_file(target, bytes(scratch))
            try:
                read_object(root, object_id)
            except OSError:
                pass
            else:
                undetected += 1
            finally:
                _corrupt_file(target, pristine)
        silent_passes = undetected
        assert silent_passes == 0, f"{silent_passes} corrupt reads went undetected"

    def test_completely_random_content_1000_iterations(self, tmp_path: pathlib.Path) -> None:
        """Swapping the whole file for same-length random bytes is rejected every time."""
        root = _repo(tmp_path)
        object_id = _write(root, os.urandom(128))
        target = _stored_path(root, object_id)
        pristine = target.read_bytes()
        rng = random.Random(99)
        for _trial in range(1000):
            noise = bytes(rng.randrange(256) for _ in range(len(pristine)))
            _corrupt_file(target, noise)
            with pytest.raises(OSError):
                read_object(root, object_id)
            _corrupt_file(target, pristine)

    def test_struct_pack_corruption(self, tmp_path: pathlib.Path) -> None:
        """XOR-ing any aligned 32-bit word of the stored file is rejected."""
        root = _repo(tmp_path)
        object_id = _write(root, b"struct corruption test " * 10)
        target = _stored_path(root, object_id)
        pristine = target.read_bytes()
        for offset in range(0, len(pristine) - 3, 4):
            scratch = bytearray(pristine)
            # Read one big-endian word, scramble it, and write it back in place.
            (word,) = struct.unpack_from(">I", scratch, offset)
            struct.pack_into(">I", scratch, offset, word ^ 0xDEADBEEF)
            _corrupt_file(target, bytes(scratch))
            with pytest.raises(OSError):
                read_object(root, object_id)
            _corrupt_file(target, pristine)
352
353
354 # ---------------------------------------------------------------------------
355 # Integration: cat-object surfaces the error
356 # ---------------------------------------------------------------------------
357
class TestCatObjectIntegration:
    """The ``cat-object`` command must surface read-time integrity failures."""

    def test_cat_clean_object_json(self, tmp_path: pathlib.Path) -> None:
        """``cat-object --json`` on a clean object exits 0 and reports its ID."""
        repo = _repo(tmp_path)
        data = b"cat-object integration test"
        oid = _write(repo, data)
        r = _invoke(repo, "--json", oid)
        assert r.exit_code == 0
        # Uses the module-level ``json`` import; no local re-import needed.
        d = json.loads(r.output)
        assert d["object_id"] == oid

    def test_cat_corrupt_object_errors(self, tmp_path: pathlib.Path) -> None:
        """cat-object raw mode on a bit-flipped object must exit non-zero.

        The --json (info) mode only checks file existence/size — it intentionally
        does not read content. The raw mode MUST verify the hash before streaming
        any bytes to stdout.
        """
        repo = _repo(tmp_path)
        data = b"will be corrupted for cat-object test"
        oid = _write(repo, data)
        _corrupt_stored(repo, oid, byte_idx=0, bit_idx=0)
        # Raw mode (no --json) is the mode that reads and streams bytes.
        r = _invoke(repo, oid)
        assert r.exit_code != 0

    def test_cat_corrupt_object_no_raw_data_in_output(self, tmp_path: pathlib.Path) -> None:
        """A corrupt object must NEVER have its raw bytes echoed to stdout."""
        repo = _repo(tmp_path)
        sentinel = b"TOP_SECRET_PAYLOAD_MUST_NOT_LEAK"
        marker = b"TOP_SECRET_PAYLOAD"
        oid = _write(repo, sentinel)
        # Damage the LAST byte of the stored file, not the first. Flipping
        # bit 0 of byte 0 would turn the leading "T" into "U", so the marker
        # would no longer exist in the corrupt file and the assertion below
        # would pass even if the command streamed the corrupt bytes verbatim.
        stored = _stored_path(repo, oid)
        on_disk = stored.read_bytes()
        _corrupt_file(stored, _flip_bit(on_disk, len(on_disk) - 1, 0))
        r = _invoke(repo, oid)
        # The marker must not appear anywhere in stdout.
        assert marker not in r.stdout_bytes
393
394
395 # ---------------------------------------------------------------------------
396 # Regression: write → corrupt → read never returns bad data
397 # ---------------------------------------------------------------------------
398
class TestRegressionSilentCorruption:
    """Write → corrupt → read must never hand back bad data silently."""

    def test_concurrent_read_after_corruption(self, tmp_path: pathlib.Path) -> None:
        """Corrupt and read from a worker thread: the read must raise OSError."""
        import threading
        repo = _repo(tmp_path)
        data = os.urandom(4096)
        oid = _write(repo, data)
        results: list[str] = []

        def corrupt_then_read() -> None:
            _corrupt_stored(repo, oid, byte_idx=100, bit_idx=3)
            try:
                read_object(repo, oid)
            except OSError:
                results.append("caught")
            else:
                results.append("silent_pass")

        t = threading.Thread(target=corrupt_then_read)
        t.start()
        t.join()
        assert "silent_pass" not in results, "Corrupt data returned silently in thread"
        # An exception other than OSError would kill the worker before it
        # records anything; an empty list must not count as a pass.
        assert results == ["caught"], (
            f"Worker thread did not report an integrity failure: {results!r}"
        )

    def test_large_object_stream_integrity(self, tmp_path: pathlib.Path) -> None:
        """16 MiB object: a bit flipped at file offset 65537 is caught."""
        repo = _repo(tmp_path)
        # 16 MiB — forces multiple 64 KiB streaming chunks
        data = os.urandom(16 * 1024 * 1024)
        oid = _write(repo, data)
        p = _stored_path(repo, oid)
        original = p.read_bytes()
        # Corrupt a byte just past the first 64 KiB (offset 64 KiB + 1).
        _corrupt_file(p, _flip_bit(original, 65537, 0))
        with pytest.raises(OSError, match="integrity check"):
            read_object(repo, oid)
        # Restore and confirm clean read works
        _corrupt_file(p, original)
        assert read_object(repo, oid) == data
436
437
438 # ---------------------------------------------------------------------------
439 # Gap 2: CRITICAL log emission on corruption
440 # ---------------------------------------------------------------------------
441
class TestCriticalLogOnCorruption:
    """A corrupt read must log at CRITICAL on ``muse.core.object_store``, so that
    consumers of structured logs get a severity signal and not only an exception."""

    def test_critical_logged_on_bit_flip(
        self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        """A bit flip yields a CRITICAL record that carries the short object ID."""
        root = _repo(tmp_path)
        object_id = _write(root, b"must log at critical")
        _corrupt_stored(root, object_id, byte_idx=0, bit_idx=0)

        with caplog.at_level(logging.CRITICAL, logger="muse.core.object_store"), \
                pytest.raises(OSError):
            read_object(root, object_id)

        alarms = [rec for rec in caplog.records if rec.levelno >= logging.CRITICAL]
        assert alarms, "No CRITICAL log emitted on bit-flip corruption"
        short_id = object_id[:8]
        assert any(short_id in rec.getMessage() for rec in alarms), (
            "CRITICAL log does not include the object ID"
        )

    def test_critical_message_mentions_corruption(
        self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        """The captured log text mentions corruption or integrity."""
        root = _repo(tmp_path)
        object_id = _write(root, b"critical message check")
        _corrupt_stored(root, object_id, byte_idx=5, bit_idx=2)

        with caplog.at_level(logging.CRITICAL, logger="muse.core.object_store"), \
                pytest.raises(OSError):
            read_object(root, object_id)

        messages = " ".join(rec.getMessage() for rec in caplog.records)
        lowered = messages.lower()
        assert "corrupt" in lowered or "integrity" in lowered, (
            f"CRITICAL log does not mention corruption: {messages!r}"
        )

    def test_no_critical_on_clean_read(
        self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        """Reading an intact object emits no CRITICAL record (no false alarms)."""
        root = _repo(tmp_path)
        payload = b"clean - no alarm"
        object_id = _write(root, payload)

        with caplog.at_level(logging.CRITICAL, logger="muse.core.object_store"):
            returned = read_object(root, object_id)

        assert returned == payload
        alarms = [rec for rec in caplog.records if rec.levelno >= logging.CRITICAL]
        assert alarms == [], f"False CRITICAL alarm on clean read: {alarms}"
495
496
497 # ---------------------------------------------------------------------------
498 # Gap 3: MAX_FILE_BYTES size limit enforcement
499 # ---------------------------------------------------------------------------
500
class TestMaxFileBytesLimit:
    """read_object must refuse objects whose reported size exceeds MAX_FILE_BYTES.

    The size is faked by patching ``pathlib.Path.stat`` so that no test has
    to allocate a genuinely huge file.
    """

    def test_oversized_object_raises_oserror(self, tmp_path: pathlib.Path) -> None:
        """A reported size of MAX_FILE_BYTES + 1 raises the read-limit OSError."""
        from unittest.mock import MagicMock, patch as _patch
        root = _repo(tmp_path)
        object_id = _write(root, b"placeholder")

        # Stand-in stat result: only st_size is given a real value.
        fake_stat = MagicMock(st_size=MAX_FILE_BYTES + 1)

        with _patch.object(pathlib.Path, "stat", return_value=fake_stat), \
                pytest.raises(OSError, match="MiB read limit"):
            read_object(root, object_id)

    def test_exactly_max_size_allowed(self, tmp_path: pathlib.Path) -> None:
        """A reported size of exactly MAX_FILE_BYTES is still readable."""
        from unittest.mock import MagicMock, patch as _patch
        root = _repo(tmp_path)
        payload = b"boundary"
        object_id = _write(root, payload)

        # At the limit (not above it) the guard is expected to stay quiet;
        # the real, small file is then read and verified as usual.
        fake_stat = MagicMock(st_size=MAX_FILE_BYTES)
        with _patch.object(pathlib.Path, "stat", return_value=fake_stat):
            returned = read_object(root, object_id)

        assert returned == payload

    def test_error_message_includes_mib_limit(self, tmp_path: pathlib.Path) -> None:
        """The size-limit error text mentions "MiB"."""
        from unittest.mock import MagicMock, patch as _patch
        root = _repo(tmp_path)
        object_id = _write(root, b"size limit error msg")

        fake_stat = MagicMock(st_size=MAX_FILE_BYTES + 1024)

        with _patch.object(pathlib.Path, "stat", return_value=fake_stat):
            with pytest.raises(OSError) as exc_info:
                read_object(root, object_id)

        assert "MiB" in str(exc_info.value), (
            f"Error message does not include MiB limit: {exc_info.value}"
        )
554
555
556 # ---------------------------------------------------------------------------
557 # Gap 4: verify-pack catches bit-flipped LOCAL STORE objects
558 # ---------------------------------------------------------------------------
559
def _full_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Lay out a repo that the CLI commands can run against and return *tmp_path*.

    Beyond the bare muse directory this adds the ``objects``, ``commits``,
    ``snapshots`` and ``refs/heads`` directories, a ``HEAD`` that points at
    ``refs/heads/main``, and a ``repo.json`` config file.
    """
    root = muse_dir(tmp_path)
    root.mkdir()
    for sub in ("objects", "commits", "snapshots", "refs/heads"):
        root.joinpath(sub).mkdir(parents=True, exist_ok=True)
    head_file = root / "HEAD"
    head_file.write_text("ref: refs/heads/main\n", encoding="utf-8")
    config_file = root / "repo.json"
    config_file.write_text(
        '{"repo_id": "test-repo", "domain": "code", "default_branch": "main"}',
        encoding="utf-8",
    )
    return tmp_path
572
573
class _BundleSnapEntry(TypedDict, total=False):
    """One snapshot entry of a test mpack; both keys are optional (total=False)."""

    # Identifier of the snapshot.
    snapshot_id: str
    # The snapshot's manifest; the tests below pass {path: object_id} dicts.
    manifest: Manifest
577
578
# Fixed "meta" section shared by every mpack the helpers below build.
_BUNDLE_META = {
    "mode": "full",
    "base_commits": [],
    "created_at": "2026-01-01T00:00:00Z",
}
584
585
586 class _BundleDict(TypedDict):
587 objects: list[Mapping[str, str | bytes]]
588 snapshots: list[_BundleSnapEntry]
589 commits: list[Mapping[str, str]]
590 meta: Mapping[str, object]
591
592
def _good_bundle_obj(data: bytes) -> _ObjPayload:
    """Build a blob entry whose ``object_id`` is the ID ``blob_id`` derives for *data*."""
    return {"object_id": blob_id(data), "content": data}
596
597
def _make_pack(objects: list[_ObjPayload]) -> bytes:
    """Serialise an mpack that carries *objects* as blobs and nothing else.

    Snapshots and commits are left empty; the meta section is the shared
    ``_BUNDLE_META``. The result is msgpack-encoded with ``use_bin_type=True``.
    """
    bundle: _BundleDict = {
        "blobs": objects,
        "snapshots": [],
        "commits": [],
        "meta": _BUNDLE_META,
    }
    encoded: bytes = msgpack.packb(bundle, use_bin_type=True)
    return encoded
602
603
def _snap_bundle(snap_id: str, manifest: Manifest) -> bytes:
    """Serialise an mpack with one snapshot and no blobs of its own.

    Because the pack carries no blobs, every object named in *manifest* has
    to come from the LOCAL STORE.
    """
    snapshot_entry = {"snapshot_id": snap_id, "manifest": manifest}
    bundle: _BundleDict = {
        "blobs": [],
        "snapshots": [snapshot_entry],
        "commits": [],
        "meta": _BUNDLE_META,
    }
    encoded: bytes = msgpack.packb(bundle, use_bin_type=True)
    return encoded
614
615
class TestVerifyPackLocalStoreIntegrity:
    """verify-pack must report hash mismatches in LOCAL STORE objects that an
    mpack snapshot refers to, and not only in objects shipped inside the mpack.

    Background: an existence-only check (has_object()) used to be applied to
    such objects; a hash-verified read (read_object()) is now expected, so a
    bit-flipped local object counts as a failure.
    """

    def _vp(
        self,
        repo: pathlib.Path,
        extra_args: list[str],
        env_root: pathlib.Path | None = None,
    ) -> "InvokeResult":
        """Run ``verify-pack --json`` plus *extra_args*; *env_root* overrides *repo* as the repo root."""
        root = env_root if env_root else repo
        argv = ["verify-pack", "--json", *extra_args]
        return CliRunner().invoke(None, argv, env={"MUSE_REPO_ROOT": str(root)})

    def test_clean_local_store_object_passes(self, tmp_path: pathlib.Path) -> None:
        """A snapshot pointing at an intact local object verifies cleanly."""
        root = _full_repo(tmp_path)
        payload = b"clean local object"
        object_id = blob_id(payload)
        write_object(root, object_id, payload)

        pack_file = tmp_path / "mpack.muse"
        pack_file.write_bytes(_snap_bundle(fake_id("snap1"), {"file.py": object_id}))

        outcome = self._vp(root, ["--file", str(pack_file)])
        assert outcome.exit_code == 0, f"Expected 0 but got {outcome.exit_code}: {outcome.output}"
        report = json.loads(outcome.output)
        assert report["all_ok"] is True

    def test_bit_flipped_local_store_object_fails(self, tmp_path: pathlib.Path) -> None:
        """A snapshot pointing at a bit-flipped local object must fail verification.

        Core regression test for the has_object→read_object change: with the
        existence-only check, verify-pack reported all_ok=True even though the
        local store held a corrupt object.
        """
        root = _full_repo(tmp_path)
        payload = b"will be bit-flipped in local store"
        object_id = blob_id(payload)
        write_object(root, object_id, payload)

        # Damage the lowest bit of the first byte of the stored file.
        on_disk = object_path(root, object_id)
        _corrupt_file(on_disk, _flip_bit(on_disk.read_bytes(), 0, 0))

        pack_file = tmp_path / "mpack.muse"
        pack_file.write_bytes(_snap_bundle(fake_id("snap2"), {"code.py": object_id}))

        outcome = self._vp(root, ["--file", str(pack_file)])
        assert outcome.exit_code != 0, (
            "verify-pack reported all_ok=True on a bit-flipped local store object "
            "(regression: has_object() was used instead of read_object())"
        )
        report = json.loads(outcome.output)
        assert report["all_ok"] is False
        assert any(
            "integrity" in failure["error"].lower() or "sha-256" in failure["error"].lower()
            for failure in report["failures"]
        ), f"No integrity error in failures: {report['failures']}"

    def test_zeroed_local_store_object_fails(self, tmp_path: pathlib.Path) -> None:
        """A local object overwritten with NUL bytes makes verify-pack report failure."""
        root = _full_repo(tmp_path)
        payload = b"will be zeroed"
        object_id = blob_id(payload)
        write_object(root, object_id, payload)
        _corrupt_file(object_path(root, object_id), b"\x00" * len(payload))

        pack_file = tmp_path / "mpack.muse"
        pack_file.write_bytes(_snap_bundle(fake_id("snap3"), {"z.py": object_id}))

        outcome = self._vp(root, ["--file", str(pack_file)])
        report = json.loads(outcome.output)
        assert report["all_ok"] is False

    def test_no_local_flag_skips_local_check(self, tmp_path: pathlib.Path) -> None:
        """With --no-local a corrupt local object must not surface as an integrity failure."""
        root = _full_repo(tmp_path)
        payload = b"corrupt but skipped"
        object_id = blob_id(payload)
        write_object(root, object_id, payload)
        on_disk = object_path(root, object_id)
        _corrupt_file(on_disk, _flip_bit(on_disk.read_bytes(), 0, 0))

        pack_file = tmp_path / "mpack.muse"
        pack_file.write_bytes(_snap_bundle(fake_id("snap4"), {"s.py": object_id}))

        outcome = self._vp(root, ["--file", str(pack_file), "--no-local"])
        report = json.loads(outcome.output)
        # The object is absent from the mpack and the local check is skipped,
        # so the command may report everything OK or a "missing" failure.
        # What it must not report is an integrity failure.
        if not report["all_ok"]:
            assert all(
                "integrity" not in failure["error"].lower()
                for failure in report["failures"]
            ), (
                "--no-local should not report integrity failures"
            )
726
727
728 # ---------------------------------------------------------------------------
729 # Gap 5: verify-object --all audits the full local store
730 # ---------------------------------------------------------------------------
731
class TestVerifyObjectAllCorrupt:
    """``verify-object --all`` must flag bit-flipped objects anywhere in the
    local store, not only objects named on the command line."""

    def _vobj(
        self, repo: pathlib.Path, args: list[str]
    ) -> "InvokeResult":
        """Run ``verify-object --json`` plus *args* against *repo*."""
        argv = ["verify-object", "--json", *args]
        return CliRunner().invoke(None, argv, env={"MUSE_REPO_ROOT": str(repo)})

    def test_verify_all_clean_store_passes(self, tmp_path: pathlib.Path) -> None:
        """Five intact objects: exit 0, all_ok, and a checked count of 5."""
        root = _full_repo(tmp_path)
        for index in range(5):
            payload = f"obj{index}".encode()
            write_object(root, blob_id(payload), payload)
        outcome = self._vobj(root, ["--all"])
        assert outcome.exit_code == 0
        report = json.loads(outcome.output)
        assert report["all_ok"] is True
        assert report["checked"] == 5

    def test_verify_all_catches_single_bit_flip(self, tmp_path: pathlib.Path) -> None:
        """One damaged object among ten is reported, and only that one."""
        root = _full_repo(tmp_path)
        stored_ids: list[str] = []
        for index in range(10):
            payload = f"object-{index}".encode()
            object_id = blob_id(payload)
            write_object(root, object_id, payload)
            stored_ids.append(object_id)

        # Damage exactly one of the ten.
        victim = stored_ids[4]
        victim_path = object_path(root, victim)
        _corrupt_file(victim_path, _flip_bit(victim_path.read_bytes(), 0, 0))

        outcome = self._vobj(root, ["--all"])
        assert outcome.exit_code != 0
        report = json.loads(outcome.output)
        assert report["all_ok"] is False
        assert report["failed"] == 1
        assert any(entry["object_id"] == victim for entry in report["results"] if not entry["ok"])

    def test_verify_all_catches_multiple_corruptions(self, tmp_path: pathlib.Path) -> None:
        """Every damaged object is reported, not just the first one found."""
        root = _full_repo(tmp_path)
        stored_ids: list[str] = []
        for index in range(6):
            payload = f"multi-corrupt-{index}".encode()
            object_id = blob_id(payload)
            write_object(root, object_id, payload)
            stored_ids.append(object_id)

        # Damage three of the six.
        victims = {stored_ids[0], stored_ids[2], stored_ids[5]}
        for object_id in victims:
            victim_path = object_path(root, object_id)
            _corrupt_file(victim_path, _flip_bit(victim_path.read_bytes(), 0, 0))

        outcome = self._vobj(root, ["--all"])
        report = json.loads(outcome.output)
        assert report["all_ok"] is False
        failed_ids = {entry["object_id"] for entry in report["results"] if not entry["ok"]}
        assert failed_ids == victims, f"Expected {victims}, got {failed_ids}"

    def test_verify_explicit_id_corrupt(self, tmp_path: pathlib.Path) -> None:
        """Naming a damaged object on the command line reports the failure."""
        root = _full_repo(tmp_path)
        payload = b"explicit check"
        object_id = blob_id(payload)
        write_object(root, object_id, payload)
        victim_path = object_path(root, object_id)
        _corrupt_file(victim_path, _flip_bit(victim_path.read_bytes(), 3, 1))

        outcome = self._vobj(root, [object_id])
        assert outcome.exit_code != 0
        report = json.loads(outcome.output)
        assert report["all_ok"] is False
812
813
814 # ---------------------------------------------------------------------------
815 # Gap 6: Performance benchmark — 256 MiB re-hash < 500 ms
816 # ---------------------------------------------------------------------------
817
class TestPerformanceBenchmark:
    """Latency budget for the read-time re-hash.

    Plan requirement: reading and re-hashing a 256 MiB object must take
    less than 500 ms, and a 1 MiB object less than 10 ms.

    The objects are written under pytest's ``tmp_path``, so the measured
    time depends on whatever filesystem and hardware back that directory.
    Both tests carry the ``perf`` marker so they can be selected or
    deselected as a group.
    """

    @pytest.mark.perf
    def test_256_mib_hash_under_500ms(self, tmp_path: pathlib.Path) -> None:
        """read_object on a blob just under 256 MiB finishes within 500 ms."""
        root = _repo(tmp_path)
        size = 256 * 1024 * 1024 - 64  # leave room for the blob header
        payload = os.urandom(size)
        object_id = _write(root, payload)

        # Time only the read; writing the object is setup.
        began = time.perf_counter()
        returned = read_object(root, object_id)
        duration_ms = (time.perf_counter() - began) * 1000

        assert returned == payload, "256 MiB object content corrupted"
        assert duration_ms < 500, (
            f"read_object re-hash took {duration_ms:.1f} ms on a 256 MiB object "
            f"— exceeds the 500 ms budget. SHA-256 performance regression detected."
        )

    @pytest.mark.perf
    def test_1_mib_hash_under_10ms(self, tmp_path: pathlib.Path) -> None:
        """read_object on a 1 MiB blob finishes within 10 ms."""
        root = _repo(tmp_path)
        payload = os.urandom(1024 * 1024)
        object_id = _write(root, payload)

        began = time.perf_counter()
        returned = read_object(root, object_id)
        duration_ms = (time.perf_counter() - began) * 1000

        assert returned == payload
        assert duration_ms < 10, (
            f"1 MiB read_object took {duration_ms:.1f} ms — performance regression"
        )
863 )
File History 1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 7 days ago