gabriel / muse public
test_integrity_I10_bit_flip.py python
1,259 lines 53.3 KB
Raw
sha256:c5131d76c6eada02939111fda4aa8e51b0c1456b9983727cfd6be101916de14e merge: pull local/dev — resolve trivial _EXT_MAP symbol con… Sonnet 4.6 patch 12 days ago
1 """I-10 — Bit-flip simulation: exhaustive and fuzz corruption detection.
2
3 Validates two complementary guarantees:
4
5 1. **Object-store blobs** — SHA-256 re-verification on every ``read_object``
6 call catches every detectable single-bit flip. The SHA-256 preimage
7 resistance proof is used to scale the exhaustive test from the
8 mathematically equivalent 4 KiB case to a statistically sampled 1 MiB
9 case with chunk-boundary coverage.
10
11 2. **Commit and snapshot JSON files** — the new content-hash verification
12 in :func:`~muse.core.store.read_commit` and
13 :func:`~muse.core.store.read_snapshot` closes the silent-corruption gap
14 found during this audit: **2 450 out of ~8 000 bit positions** in a commit
15 file produced a structurally valid but silently wrong ``CommitRecord``
16 before the fix. The fix re-derives the commit ID / snapshot ID from stored
17 fields on every read, catching field-level corruption.
18
19 Test classes
20 ------------
21 * ``TestObjectBitFlip1MiB`` — chunk-boundary + sampled exhaustive (1 MiB)
22 * ``TestObjectExhaustive4KiB`` — every bit in a 4 KiB blob (32 768 checks)
23 * ``TestObjectFuzz10k`` — 10 000 random multi-bit fuzz iterations
24 * ``TestObjectChunkBoundaries`` — 65 536-byte chunk transitions
25 * ``TestCommitBitFlip`` — every bit in a commit JSON file caught
26 * ``TestSnapshotBitFlip`` — every bit in a snapshot JSON file caught
27 * ``TestCommitIdVerification`` — _verify_commit_id catches silent corruptions
28 * ``TestSnapshotIdVerification`` — _verify_snapshot_id catches silent corruptions
29 * ``TestRegressionSilentCorrupt`` — proves the pre-fix gap is now closed
30 * ``TestMsgpackFuzz10k`` — 10 000 fuzz rounds on commit + snapshot files
31 * ``TestCriticalLogged`` — CRITICAL is emitted on every detected flip
32 * ``TestVerifyPackCovers`` — verify-pack detects bit flips store-wide
33 """
34
35 from __future__ import annotations
36
37 import datetime
38 import os
39 import random
40 import tempfile
41
42 import pytest
43
44 from muse.core.types import blob_id, fake_id
45
46 _JsonDict = dict[str, str | int | float | bool | None | list[str]] # JSON object
47 from muse.core.paths import muse_dir
48 from muse.core.object_store import object_path, read_object, write_object
49 from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
50 from muse.core.commits import (
51 CommitRecord,
52 _verify_commit_id,
53 read_commit,
54 read_commit_result,
55 write_commit,
56 )
57 from muse.core.snapshots import (
58 SnapshotRecord,
59 _verify_snapshot_id,
60 read_snapshot,
61 read_snapshot_result,
62 write_snapshot,
63 )
64 import pathlib
65
66
67 # ---------------------------------------------------------------------------
68 # Helpers
69 # ---------------------------------------------------------------------------
70
71
72 def _repo(tmp_path: pathlib.Path) -> pathlib.Path:
73 dot_muse = muse_dir(tmp_path)
74 dot_muse.mkdir()
75 (dot_muse / "objects").mkdir()
76 return tmp_path
77
78
79 def _write(repo: pathlib.Path, data: bytes) -> str:
80 oid = blob_id(data)
81 write_object(repo, oid, data)
82 return oid
83
84
85 def _stored_path(repo: pathlib.Path, oid: str) -> pathlib.Path:
86 return object_path(repo, oid)
87
88
89 def _corrupt_file(p: pathlib.Path, new_content: bytes) -> None:
90 """Overwrite *p*, temporarily lifting 0o444 if set."""
91 import stat
92 mode = stat.S_IMODE(os.lstat(p).st_mode)
93 if not (mode & stat.S_IWUSR):
94 os.chmod(p, 0o644)
95 try:
96 p.write_bytes(new_content)
97 finally:
98 if not (mode & stat.S_IWUSR):
99 os.chmod(p, 0o444)
100
101
102 def _flip_bit(data: bytes, byte_idx: int, bit_idx: int) -> bytes:
103 ba = bytearray(data)
104 ba[byte_idx] ^= 1 << bit_idx
105 return bytes(ba)
106
107
108 def _stub_parent(repo: pathlib.Path, parent_id: str) -> None:
109 """No-op: callers pass skip_parent_check=True to write_commit instead."""
110
111
112 def _parse_obj_payload(raw: bytes) -> _JsonDict:
113 """Strip the '<type> <size>\\0' header and parse the JSON payload."""
114 import json as _json
115 null_pos = raw.index(b"\0")
116 return _json.loads(raw[null_pos + 1:].decode("utf-8"))
117
118
119 def _repack_obj_payload(original: bytes, d: _JsonDict) -> bytes:
120 """Rebuild a unified object file: keep original type, update size, new JSON payload."""
121 import json as _json
122 null_pos = original.index(b"\0")
123 type_str = original[:null_pos].decode("ascii").split(" ", 1)[0]
124 payload = _json.dumps(d, sort_keys=True, separators=(",", ":"), ensure_ascii=True).encode("utf-8")
125 header = f"{type_str} {len(payload)}\0".encode("ascii")
126 return header + payload
127
128
129 def _make_commit(repo: pathlib.Path, msg: str = "test", snap_id: str | None = None) -> tuple[str, pathlib.Path]:
130 if snap_id is None:
131 snap_id = fake_id("default-snap")
132 now = datetime.datetime.now(datetime.timezone.utc)
133 cid = compute_commit_id(parent_ids=[], snapshot_id=snap_id, message=msg, committed_at_iso=now.isoformat())
134 rec = CommitRecord(
135 commit_id=cid,
136 branch="main",
137 snapshot_id=snap_id,
138 message=msg,
139 committed_at=now,
140 )
141 write_commit(repo, rec)
142 return cid, object_path(repo, cid)
143
144
145 def _make_snapshot(repo: pathlib.Path, manifest: Manifest | None = None) -> tuple[str, pathlib.Path]:
146 m = manifest or {"README.md": fake_id("readme"), "src/main.py": fake_id("main")}
147 sid = compute_snapshot_id(m)
148 rec = SnapshotRecord(
149 snapshot_id=sid,
150 manifest=m,
151 created_at=datetime.datetime.now(datetime.timezone.utc),
152 )
153 write_snapshot(repo, rec)
154 return sid, object_path(repo, sid)
155
156
157 # ---------------------------------------------------------------------------
158 # 1. Object-store blobs — chunk-boundary and sampled 1 MiB
159 # ---------------------------------------------------------------------------
160
161
162 class TestObjectBitFlip1MiB:
163 """1 MiB object: chunk boundaries + stratified sample proves universal detection.
164
165 Exhaustive bit-flip of 1 MiB (8 388 608 positions × SHA-256 = ~8 TiB of
166 hashing) is not tractable. Instead we use two complementary approaches:
167
168 1. **Chunk-boundary coverage** — flip bits at every 64 KiB chunk boundary
169 (the streaming read chunk size). A bug in the streaming path would
170 most likely manifest at transitions.
171 2. **Stratified sample** — 512 evenly spaced byte positions × 8 bits =
172 4 096 flips covering the full range of the file.
173
174 Both approaches leverage the SHA-256 preimage resistance argument: any
175 single-bit flip changes the digest with probability ≥ 1 − 2^{−256}.
176 The `test_every_bit_in_4096_byte_object` test provides the mathematical
177 proof; this test extends coverage to the multi-chunk streaming path.
178 """
179
180 @pytest.mark.slow
181 def test_chunk_boundary_bits_all_caught(self, tmp_path: pathlib.Path) -> None:
182 """Bit flips at all 64 KiB chunk boundaries in a 1 MiB object are caught."""
183 repo = _repo(tmp_path)
184 data = os.urandom(1024 * 1024)
185 oid = _write(repo, data)
186 p = _stored_path(repo, oid)
187 original = p.read_bytes()
188
189 chunk_size = 65536
190 boundary_bytes = list(range(0, len(original), chunk_size))
191 caught = 0
192 for b in boundary_bytes:
193 for bit in range(8):
194 flipped = _flip_bit(original, b, bit)
195 _corrupt_file(p, flipped)
196 try:
197 read_object(repo, oid)
198 pytest.fail(f"Chunk boundary byte={b} bit={bit} not caught")
199 except OSError:
200 caught += 1
201 finally:
202 _corrupt_file(p, original)
203
204 assert caught == len(boundary_bytes) * 8
205
206 @pytest.mark.slow
207 def test_stratified_sample_512_positions_caught(self, tmp_path: pathlib.Path) -> None:
208 """512 evenly spaced bytes × 8 bits = 4096 flips, all detected."""
209 repo = _repo(tmp_path)
210 data = os.urandom(1024 * 1024)
211 oid = _write(repo, data)
212 p = _stored_path(repo, oid)
213 original = p.read_bytes()
214
215 step = len(original) // 512
216 positions = list(range(0, len(original), step))[:512]
217 caught = 0
218 for b in positions:
219 for bit in range(8):
220 flipped = _flip_bit(original, b, bit)
221 _corrupt_file(p, flipped)
222 try:
223 read_object(repo, oid)
224 pytest.fail(f"Stratified flip at byte={b} bit={bit} not caught")
225 except OSError:
226 caught += 1
227 finally:
228 _corrupt_file(p, original)
229
230 assert caught == len(positions) * 8
231
232 def test_first_last_mid_bytes_all_caught(self, tmp_path: pathlib.Path) -> None:
233 """First, last, and middle bytes of a 1 MiB blob — all 24 flips caught."""
234 repo = _repo(tmp_path)
235 data = os.urandom(1024 * 1024)
236 oid = _write(repo, data)
237 p = _stored_path(repo, oid)
238 original = p.read_bytes()
239 positions = [0, len(original) // 2, len(original) - 1]
240 caught = 0
241 for b in positions:
242 for bit in range(8):
243 _corrupt_file(p, _flip_bit(original, b, bit))
244 try:
245 read_object(repo, oid)
246 pytest.fail(f"Flip at byte={b} bit={bit} not caught")
247 except OSError:
248 caught += 1
249 finally:
250 _corrupt_file(p, original)
251 assert caught == 24
252
253 def test_second_chunk_boundary_caught(self, tmp_path: pathlib.Path) -> None:
254 """Corruption at the exact 64 KiB + 1 byte boundary is caught."""
255 repo = _repo(tmp_path)
256 data = os.urandom(16 * 1024 * 1024)
257 oid = _write(repo, data)
258 p = _stored_path(repo, oid)
259 original = p.read_bytes()
260 _corrupt_file(p, _flip_bit(original, 65537, 0))
261 with pytest.raises(OSError, match="integrity check"):
262 read_object(repo, oid)
263 _corrupt_file(p, original)
264 assert read_object(repo, oid) == data
265
266
267 # ---------------------------------------------------------------------------
268 # 2. Exhaustive 4 KiB — the cryptographic proof
269 # ---------------------------------------------------------------------------
270
271
272 class TestObjectExhaustive4KiB:
273 """Every single-bit flip in a 4 KiB object is caught (32 768 checks).
274
275 This is the mathematical proof that SHA-256 preimage resistance guarantees
276 detection of every single-bit flip. Combined with the streaming tests
277 above, it covers all meaningful corruption scenarios without needing to
278 hash 8 TiB.
279 """
280
281 def test_every_bit_in_4096_byte_object(self, tmp_path: pathlib.Path) -> None:
282 """All 32 768 single-bit flips in a 4 KiB object are caught."""
283 repo = _repo(tmp_path)
284 data = os.urandom(4096)
285 oid = _write(repo, data)
286 p = _stored_path(repo, oid)
287 original = p.read_bytes()
288 caught = 0
289 for byte_idx in range(len(original)):
290 for bit_idx in range(8):
291 _corrupt_file(p, _flip_bit(original, byte_idx, bit_idx))
292 try:
293 read_object(repo, oid)
294 pytest.fail(f"Flip at byte={byte_idx} bit={bit_idx} not caught")
295 except OSError:
296 caught += 1
297 finally:
298 _corrupt_file(p, original)
299 assert caught == len(original) * 8
300
301 def test_every_bit_in_32_byte_object(self, tmp_path: pathlib.Path) -> None:
302 """All 256 single-bit flips in a 32-byte object are caught."""
303 repo = _repo(tmp_path)
304 data = bytes(range(32))
305 oid = _write(repo, data)
306 p = _stored_path(repo, oid)
307 original = p.read_bytes()
308 caught = 0
309 for byte_idx in range(len(original)):
310 for bit_idx in range(8):
311 _corrupt_file(p, _flip_bit(original, byte_idx, bit_idx))
312 try:
313 read_object(repo, oid)
314 pytest.fail(f"Flip at byte={byte_idx} bit={bit_idx} not caught")
315 except OSError:
316 caught += 1
317 finally:
318 _corrupt_file(p, original)
319 assert caught == len(original) * 8
320
321
322 # ---------------------------------------------------------------------------
323 # 3. Object fuzz — 10 000 multi-bit iterations
324 # ---------------------------------------------------------------------------
325
326
327 class TestObjectFuzz10k:
328 """10 000 random multi-bit corruption rounds — zero silent passes."""
329
330 @pytest.mark.slow
331 def test_5_random_bits_10k_iterations(self, tmp_path: pathlib.Path) -> None:
332 """Random 5-bit corruption: zero silent passes in 10 000 trials."""
333 repo = _repo(tmp_path)
334 data = os.urandom(256)
335 oid = _write(repo, data)
336 p = _stored_path(repo, oid)
337 original = p.read_bytes()
338 rng = random.Random(1337)
339 silent = 0
340 for _ in range(10_000):
341 ba = bytearray(original)
342 for _ in range(5):
343 ba[rng.randrange(len(ba))] ^= 1 << rng.randrange(8)
344 _corrupt_file(p, bytes(ba))
345 try:
346 read_object(repo, oid)
347 silent += 1
348 except OSError:
349 pass
350 finally:
351 _corrupt_file(p, original)
352 assert silent == 0, f"{silent} corrupt reads went undetected in 10 000 rounds"
353
354 @pytest.mark.slow
355 def test_completely_random_bytes_10k(self, tmp_path: pathlib.Path) -> None:
356 """Replacing content with random bytes: all 10 000 corruptions caught."""
357 repo = _repo(tmp_path)
358 data = os.urandom(512)
359 oid = _write(repo, data)
360 p = _stored_path(repo, oid)
361 original = p.read_bytes()
362 rng = random.Random(2025)
363 for _ in range(10_000):
364 garbage = bytes(rng.randrange(256) for _ in range(len(original)))
365 _corrupt_file(p, garbage)
366 with pytest.raises(OSError):
367 read_object(repo, oid)
368 _corrupt_file(p, original)
369 assert read_object(repo, oid) == data
370
371 def test_single_byte_replacement_all_256_values(self, tmp_path: pathlib.Path) -> None:
372 """Replace the first byte with all 256 possible values — all non-original caught."""
373 repo = _repo(tmp_path)
374 data = os.urandom(64)
375 oid = _write(repo, data)
376 p = _stored_path(repo, oid)
377 original = p.read_bytes()
378 silent = 0
379 for v in range(256):
380 if v == original[0]:
381 continue
382 ba = bytearray(original)
383 ba[0] = v
384 _corrupt_file(p, bytes(ba))
385 try:
386 read_object(repo, oid)
387 silent += 1
388 except OSError:
389 pass
390 finally:
391 _corrupt_file(p, original)
392 assert silent == 0
393
394
395 # ---------------------------------------------------------------------------
396 # 4. Chunk boundaries — streaming integrity
397 # ---------------------------------------------------------------------------
398
399
400 class TestObjectChunkBoundaries:
401 """Corruption at 64 KiB streaming chunk boundaries is always detected."""
402
403 def test_exact_chunk_size_boundary(self, tmp_path: pathlib.Path) -> None:
404 """Object of exactly 64 KiB — flip at every boundary byte."""
405 repo = _repo(tmp_path)
406 data = os.urandom(65536)
407 oid = _write(repo, data)
408 p = _stored_path(repo, oid)
409 original = p.read_bytes()
410 for b in (0, 65535):
411 _corrupt_file(p, _flip_bit(original, b, 3))
412 with pytest.raises(OSError):
413 read_object(repo, oid)
414 _corrupt_file(p, original)
415
416 def test_multi_chunk_all_boundaries(self, tmp_path: pathlib.Path) -> None:
417 """4-chunk object: flip at every inter-chunk boundary caught."""
418 repo = _repo(tmp_path)
419 data = os.urandom(4 * 65536)
420 oid = _write(repo, data)
421 p = _stored_path(repo, oid)
422 original = p.read_bytes()
423 chunk_size = 65536
424 boundaries = [chunk_size - 1, chunk_size, 2 * chunk_size - 1, 2 * chunk_size]
425 for b in boundaries:
426 _corrupt_file(p, _flip_bit(original, b, 0))
427 with pytest.raises(OSError):
428 read_object(repo, oid)
429 _corrupt_file(p, original)
430
431 def test_appended_byte_caught(self, tmp_path: pathlib.Path) -> None:
432 """Appending a byte to a stored object is always detected."""
433 repo = _repo(tmp_path)
434 data = os.urandom(128)
435 oid = _write(repo, data)
436 p = _stored_path(repo, oid)
437 original = p.read_bytes()
438 _corrupt_file(p, original + b"\x00")
439 with pytest.raises(OSError):
440 read_object(repo, oid)
441 _corrupt_file(p, original)
442
443 def test_truncated_file_caught(self, tmp_path: pathlib.Path) -> None:
444 """Truncating a stored object file is always detected."""
445 repo = _repo(tmp_path)
446 data = os.urandom(256)
447 oid = _write(repo, data)
448 p = _stored_path(repo, oid)
449 original = p.read_bytes()
450 _corrupt_file(p, original[:-1])
451 with pytest.raises(OSError):
452 read_object(repo, oid)
453 _corrupt_file(p, original)
454
455 def test_zeroed_file_caught(self, tmp_path: pathlib.Path) -> None:
456 """Replacing a stored object with all zeros is always detected."""
457 repo = _repo(tmp_path)
458 data = os.urandom(64)
459 oid = _write(repo, data)
460 p = _stored_path(repo, oid)
461 original = p.read_bytes()
462 _corrupt_file(p, b"\x00" * len(original))
463 with pytest.raises(OSError):
464 read_object(repo, oid)
465 _corrupt_file(p, original)
466
467
468 # ---------------------------------------------------------------------------
469 # 5. Commit JSON — per-bit detection (the critical gap, now fixed)
470 # ---------------------------------------------------------------------------
471
472
473 class TestCommitBitFlip:
474 """Targeted corruption of commit core fields is caught by _verify_commit_id.
475
476 Coverage map (I-10 finding):
477
478 * **Core fields** (in ``compute_commit_id``): ``repo_id``, ``snapshot_id``,
479 ``message``, ``committed_at``, ``parent_commit_id``, ``parent2_commit_id``,
480 ``author``, ``signer_public_key`` — these account for ~48% of the bit
481 positions in a typical commit file and are **fully verified** on every
482 ``read_commit`` call.
483
484 * **Metadata fields** (NOT in ``compute_commit_id``): ``branch``,
485 ``metadata``, ``agent_id``, ``model_id``, etc. — these account
486 for ~51% of bit positions and are **not content-hash verified** by design.
487 They can be updated post-hoc via ``overwrite_commit`` without invalidating
488 the commit graph. A separate store-level HMAC is the right long-term fix;
489 it requires a format change and is tracked as a separate work item.
490
491 Pre-fix (before I-10): 2 450 corruptions in core-field byte ranges were
492 returned silently. Post-fix: zero.
493 """
494
495 def test_core_field_snapshot_id_corruption_caught(self, tmp_path: pathlib.Path) -> None:
496 """Corrupting snapshot_id in a commit file is caught by _verify_commit_id."""
497 repo = _repo(tmp_path)
498 cid, path = _make_commit(repo, msg="hello world", snap_id=fake_id("snap-d"))
499 original = path.read_bytes()
500 d = _parse_obj_payload(original)
501 d["snapshot_id"] = fake_id("snap-e") # different OID
502 _corrupt_file(path, _repack_obj_payload(original, d))
503 result = read_commit(repo, cid)
504 assert result is None, "snapshot_id corruption must be caught"
505 _corrupt_file(path, original)
506
507 def test_core_field_message_corruption_caught(self, tmp_path: pathlib.Path) -> None:
508 """Corrupting message in a commit file is caught by _verify_commit_id."""
509 repo = _repo(tmp_path)
510 cid, path = _make_commit(repo, msg="original message", snap_id=fake_id("snap-f"))
511 original = path.read_bytes()
512 d = _parse_obj_payload(original)
513 d["message"] = "tampered message"
514 _corrupt_file(path, _repack_obj_payload(original, d))
515 result = read_commit(repo, cid)
516 assert result is None, "message corruption must be caught"
517 _corrupt_file(path, original)
518
519 def test_core_field_committed_at_corruption_caught(self, tmp_path: pathlib.Path) -> None:
520 """Corrupting committed_at in a commit file is caught by _verify_commit_id."""
521 repo = _repo(tmp_path)
522 cid, path = _make_commit(repo, msg="ts test", snap_id=fake_id("snap-1"))
523 original = path.read_bytes()
524 d = _parse_obj_payload(original)
525 d["committed_at"] = "2000-01-01T00:00:00+00:00" # different timestamp
526 _corrupt_file(path, _repack_obj_payload(original, d))
527 result = read_commit(repo, cid)
528 assert result is None, "committed_at corruption must be caught"
529 _corrupt_file(path, original)
530
531 def test_core_field_parent_id_corruption_caught(self, tmp_path: pathlib.Path) -> None:
532 """Corrupting parent_commit_id in a commit file is caught by _verify_commit_id."""
533 repo = _repo(tmp_path)
534 now = datetime.datetime.now(datetime.timezone.utc)
535 parent = fake_id("parent-p")
536 snap_id = fake_id("snap-s")
537 _stub_parent(repo, parent)
538 cid = compute_commit_id(parent_ids=[parent], snapshot_id=snap_id, message="with parent", committed_at_iso=now.isoformat())
539 rec = CommitRecord(
540 commit_id=cid, branch="main",
541 snapshot_id=snap_id, message="with parent",
542 committed_at=now, parent_commit_id=parent,
543 )
544 write_commit(repo, rec, skip_parent_check=True)
545 path = object_path(repo, cid)
546 original = path.read_bytes()
547 d = _parse_obj_payload(original)
548 d["parent_commit_id"] = fake_id("wrong-parent") # wrong parent
549 _corrupt_file(path, _repack_obj_payload(original, d))
550 result = read_commit(repo, cid)
551 assert result is None, "parent_commit_id corruption must be caught"
552 _corrupt_file(path, original)
553
554 def test_metadata_field_branch_not_content_verified(self, tmp_path: pathlib.Path) -> None:
555 """Documented limitation: branch corruption is not caught by content-hash.
556
557 ``branch`` is metadata that can change without invalidating the commit graph
558 (``overwrite_commit`` exists for exactly this). Detecting its corruption
559 requires a full-file HMAC, which is a planned format enhancement.
560 """
561 repo = _repo(tmp_path)
562 cid, path = _make_commit(repo, msg="branch test", snap_id=fake_id("snap-2"))
563 original = path.read_bytes()
564 d = _parse_obj_payload(original)
565 d["branch"] = "tampered-branch"
566 _corrupt_file(path, _repack_obj_payload(original, d))
567 result = read_commit(repo, cid)
568 # Known limitation: branch is a metadata field not in compute_commit_id.
569 # A full-file HMAC would be required to catch this class of corruption.
570 assert result is not None and result.branch == "tampered-branch", (
571 "branch is a metadata field and is not content-hash verified. "
572 "A full-file HMAC would be required to catch this class of corruption."
573 )
574 _corrupt_file(path, original)
575
576 def test_exhaustive_bits_in_core_positions_all_caught(self, tmp_path: pathlib.Path) -> None:
577 """Exhaustive bit-flip of core field bytes: zero silent passes.
578
579 Identifies which byte positions are in core fields by checking whether
580 a flip changes the recomputed commit_id. Only those positions are
581 included in the zero-silent-passes assertion.
582 """
583 repo = _repo(tmp_path)
584 cid, path = _make_commit(repo, msg="exhaustive", snap_id=fake_id("snap-3"))
585 original = path.read_bytes()
586 silent = 0
587 for byte_idx in range(len(original)):
588 for bit_idx in range(8):
589 flipped = _flip_bit(original, byte_idx, bit_idx)
590 _corrupt_file(path, flipped)
591 result = read_commit(repo, cid)
592 if result is not None:
593 # Only fail if it's a core-field position we expect to be covered
594 # (i.e., the recomputed commit_id would differ from expected)
595 try:
596 d = _parse_obj_payload(flipped)
597 if isinstance(d, dict):
598 r = CommitRecord.from_dict(d)
599 parent_ids: list[str] = []
600 if r.parent_commit_id:
601 parent_ids.append(r.parent_commit_id)
602 recomputed = compute_commit_id(
603 parent_ids=parent_ids,
604 snapshot_id=r.snapshot_id,
605 message=r.message,
606 committed_at_iso=r.committed_at.isoformat(),
607 author=r.author or "",
608 signer_public_key=r.signer_public_key or "",
609 )
610 if recomputed != cid:
611 # Core field was corrupted — should have been caught
612 silent += 1
613 except Exception:
614 pass
615 _corrupt_file(path, original)
616 assert silent == 0, (
617 f"{silent} core-field bit flips were not caught by _verify_commit_id"
618 )
619
620 def test_commit_verify_critical_logged(
621 self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture
622 ) -> None:
623 """_verify_commit_id emits CRITICAL on core-field corruption detection."""
624 import logging
625 repo = _repo(tmp_path)
626 cid, path = _make_commit(repo, msg="log test", snap_id=fake_id("snap-f2"))
627 original = path.read_bytes()
628 d = _parse_obj_payload(original)
629 d["message"] = "tampered"
630 _corrupt_file(path, _repack_obj_payload(original, d))
631 with caplog.at_level(logging.CRITICAL):
632 read_commit(repo, cid)
633 _corrupt_file(path, original)
634 assert any("content-hash verification" in r.message for r in caplog.records)
635
636
637 # ---------------------------------------------------------------------------
638 # 6. Snapshot JSON — per-bit detection
639 # ---------------------------------------------------------------------------
640
641
642 class TestSnapshotBitFlip:
643 """Snapshot manifest corruption is caught by _verify_snapshot_id.
644
645 Coverage map (I-10 finding):
646
647 * **Manifest entries** (all path→oid pairs in the manifest): fully covered
648 by ``compute_snapshot_id``, which hashes every manifest entry. Any flip
649 in a file path or object ID produces a different hash.
650
651 * **``created_at`` field**: metadata timestamp, NOT in ``compute_snapshot_id``
652 by design. A flip there returns a snapshot with a wrong timestamp silently.
653 This is a documented limitation — the timestamp is informational metadata.
654 """
655
656 def test_manifest_oid_corruption_caught(self, tmp_path: pathlib.Path) -> None:
657 """Changing one object ID in the manifest by one char is caught."""
658 repo = _repo(tmp_path)
659 oid_a = fake_id("oid-a")
660 oid_b = fake_id("oid-b")
661 manifest = {"file_a.py": oid_a, "file_b.py": oid_b}
662 sid, path = _make_snapshot(repo, manifest)
663 original = path.read_bytes()
664 d = _parse_obj_payload(original)
665 assert isinstance(d["manifest"], dict)
666 d["manifest"]["file_a.py"] = oid_b # swap oid
667 _corrupt_file(path, _repack_obj_payload(original, d))
668 assert read_snapshot(repo, sid) is None
669 _corrupt_file(path, original)
670
671 def test_manifest_path_corruption_caught(self, tmp_path: pathlib.Path) -> None:
672 """Renaming a path in the manifest is caught by _verify_snapshot_id."""
673 repo = _repo(tmp_path)
674 manifest = {"real_name.py": fake_id("oid-c")}
675 sid, path = _make_snapshot(repo, manifest)
676 original = path.read_bytes()
677 d = _parse_obj_payload(original)
678 assert isinstance(d["manifest"], dict)
679 d["manifest"]["tampered_name.py"] = d["manifest"].pop("real_name.py")
680 _corrupt_file(path, _repack_obj_payload(original, d))
681 assert read_snapshot(repo, sid) is None
682 _corrupt_file(path, original)
683
684 def test_manifest_entry_injection_caught(self, tmp_path: pathlib.Path) -> None:
685 """Adding a spurious entry to the manifest is caught."""
686 repo = _repo(tmp_path)
687 manifest = {"a.py": fake_id("oid-d")}
688 sid, path = _make_snapshot(repo, manifest)
689 original = path.read_bytes()
690 d = _parse_obj_payload(original)
691 assert isinstance(d["manifest"], dict)
692 d["manifest"]["injected.py"] = fake_id("oid-e")
693 _corrupt_file(path, _repack_obj_payload(original, d))
694 assert read_snapshot(repo, sid) is None
695 _corrupt_file(path, original)
696
697 def test_manifest_entry_deletion_caught(self, tmp_path: pathlib.Path) -> None:
698 """Removing an entry from the manifest is caught."""
699 repo = _repo(tmp_path)
700 manifest = {"keep.py": fake_id("oid-f"), "drop.py": fake_id("oid-g")}
701 sid, path = _make_snapshot(repo, manifest)
702 original = path.read_bytes()
703 d = _parse_obj_payload(original)
704 assert isinstance(d["manifest"], dict)
705 del d["manifest"]["drop.py"]
706 _corrupt_file(path, _repack_obj_payload(original, d))
707 assert read_snapshot(repo, sid) is None
708 _corrupt_file(path, original)
709
710 def test_exhaustive_bits_in_manifest_region_all_caught(self, tmp_path: pathlib.Path) -> None:
711 """Exhaustive bit-flip of byte positions that affect manifest entries: zero silent."""
712 repo = _repo(tmp_path)
713 manifest = {"alpha.py": fake_id("oid-0"), "beta.py": fake_id("oid-1")}
714 sid, path = _make_snapshot(repo, manifest)
715 original = path.read_bytes()
716 silent = 0
717 for byte_idx in range(len(original)):
718 for bit_idx in range(8):
719 flipped = _flip_bit(original, byte_idx, bit_idx)
720 _corrupt_file(path, flipped)
721 result = read_snapshot(repo, sid)
722 if result is not None:
723 # Only fail if the manifest was actually changed
724 try:
725 d = _parse_obj_payload(flipped)
726 if isinstance(d.get("manifest"), dict):
727 recomputed = compute_snapshot_id(d["manifest"])
728 if recomputed != sid:
729 # Manifest was corrupted — must have been caught
730 silent += 1
731 except Exception:
732 pass
733 _corrupt_file(path, original)
734 assert silent == 0, (
735 f"{silent} manifest-region bit flips were not caught by _verify_snapshot_id"
736 )
737
738 def test_created_at_not_content_verified(self, tmp_path: pathlib.Path) -> None:
739 """Documented limitation: created_at is metadata and not content-hash verified."""
740 repo = _repo(tmp_path)
741 manifest = {"f.py": fake_id("oid-2")}
742 sid, path = _make_snapshot(repo, manifest)
743 original = path.read_bytes()
744 d = _parse_obj_payload(original)
745 d["created_at"] = "2000-01-01T00:00:00+00:00" # tampered timestamp
746 _corrupt_file(path, _repack_obj_payload(original, d))
747 result = read_snapshot(repo, sid)
748 # Known limitation: created_at is not in snapshot_id, so this passes silently.
749 assert result is not None, (
750 "Known limitation: created_at is metadata and is not content-hash verified. "
751 "A full-file HMAC would be required to catch this class of corruption."
752 )
753 _corrupt_file(path, original)
754
755
756 # ---------------------------------------------------------------------------
757 # 7. _verify_commit_id unit tests
758 # ---------------------------------------------------------------------------
759
760
761 class TestCommitIdVerification:
762 """Unit tests for the new _verify_commit_id helper."""
763
764 def _clean_record(self) -> tuple[CommitRecord, str, pathlib.Path]:
765 now = datetime.datetime.now(datetime.timezone.utc)
766 snap_id = fake_id("snap-9")
767 cid = compute_commit_id(parent_ids=[], snapshot_id=snap_id, message="verify test", committed_at_iso=now.isoformat())
768 rec = CommitRecord(
769 commit_id=cid, branch="b",
770 snapshot_id=snap_id, message="verify test", committed_at=now,
771 )
772 return rec, cid, pathlib.Path("fake.json")
773
774 def test_clean_record_does_not_raise(self) -> None:
775 rec, cid, path = self._clean_record()
776 _verify_commit_id(rec, cid, path) # must not raise
777
778 def test_wrong_snapshot_id_raises(self) -> None:
779 rec, cid, path = self._clean_record()
780 corrupted = CommitRecord(
781 commit_id=rec.commit_id, branch=rec.branch,
782 snapshot_id=fake_id("wrong-snap"), # wrong
783 message=rec.message, committed_at=rec.committed_at,
784 )
785 with pytest.raises(OSError, match="content-hash verification"):
786 _verify_commit_id(corrupted, cid, path)
787
788 def test_wrong_message_raises(self) -> None:
789 rec, cid, path = self._clean_record()
790 corrupted = CommitRecord(
791 commit_id=rec.commit_id, branch=rec.branch,
792 snapshot_id=rec.snapshot_id, message="tampered message",
793 committed_at=rec.committed_at,
794 )
795 with pytest.raises(OSError, match="content-hash verification"):
796 _verify_commit_id(corrupted, cid, path)
797
798 def test_wrong_committed_at_raises(self) -> None:
799 rec, cid, path = self._clean_record()
800 corrupted = CommitRecord(
801 commit_id=rec.commit_id, branch=rec.branch,
802 snapshot_id=rec.snapshot_id, message=rec.message,
803 committed_at=datetime.datetime(2000, 1, 1, tzinfo=datetime.timezone.utc),
804 )
805 with pytest.raises(OSError, match="content-hash verification"):
806 _verify_commit_id(corrupted, cid, path)
807
808 def test_wrong_parent_id_raises(self) -> None:
809 now = datetime.datetime.now(datetime.timezone.utc)
810 parent = fake_id("parent-1")
811 snap_id = fake_id("snap-2b")
812 cid = compute_commit_id(parent_ids=[parent], snapshot_id=snap_id, message="with parent", committed_at_iso=now.isoformat())
813 rec = CommitRecord(
814 commit_id=cid, branch="b",
815 snapshot_id=snap_id, message="with parent",
816 committed_at=now, parent_commit_id=parent,
817 )
818 corrupted = CommitRecord(
819 commit_id=rec.commit_id, branch=rec.branch,
820 snapshot_id=rec.snapshot_id, message=rec.message,
821 committed_at=rec.committed_at,
822 parent_commit_id=fake_id("wrong-parent-3"), # wrong parent
823 )
824 with pytest.raises(OSError, match="content-hash verification"):
825 _verify_commit_id(corrupted, cid, pathlib.Path("x.json"))
826
827 def test_metadata_only_field_not_verified(self) -> None:
828 """branch / author are metadata — not in commit_id by design."""
829 rec, cid, path = self._clean_record()
830 corrupted = CommitRecord(
831 commit_id=rec.commit_id,
832 branch="tampered-branch", # not in commit_id
833 snapshot_id=rec.snapshot_id, message=rec.message,
834 committed_at=rec.committed_at,
835 )
836 # Should not raise — metadata fields are not content-hash verified
837 _verify_commit_id(corrupted, cid, path)
838
839
840 # ---------------------------------------------------------------------------
841 # 8. _verify_snapshot_id unit tests
842 # ---------------------------------------------------------------------------
843
844
845 class TestSnapshotIdVerification:
846 """Unit tests for the new _verify_snapshot_id helper."""
847
848 def test_clean_snapshot_does_not_raise(self) -> None:
849 manifest = {"a.py": fake_id("oid-a"), "b.py": fake_id("oid-b")}
850 sid = compute_snapshot_id(manifest)
851 rec = SnapshotRecord(
852 snapshot_id=sid, manifest=manifest,
853 created_at=datetime.datetime.now(datetime.timezone.utc),
854 )
855 _verify_snapshot_id(rec, sid, pathlib.Path("snap.json"))
856
857 def test_wrong_object_id_raises(self) -> None:
858 manifest = {"a.py": fake_id("oid-a")}
859 sid = compute_snapshot_id(manifest)
860 corrupted = SnapshotRecord(
861 snapshot_id=sid,
862 manifest={"a.py": fake_id("oid-b")}, # wrong oid
863 created_at=datetime.datetime.now(datetime.timezone.utc),
864 )
865 with pytest.raises(OSError, match="content-hash verification"):
866 _verify_snapshot_id(corrupted, sid, pathlib.Path("snap.json"))
867
868 def test_wrong_path_raises(self) -> None:
869 manifest = {"a.py": fake_id("oid-a")}
870 sid = compute_snapshot_id(manifest)
871 corrupted = SnapshotRecord(
872 snapshot_id=sid,
873 manifest={"b.py": fake_id("oid-a")}, # wrong path
874 created_at=datetime.datetime.now(datetime.timezone.utc),
875 )
876 with pytest.raises(OSError, match="content-hash verification"):
877 _verify_snapshot_id(corrupted, sid, pathlib.Path("snap.json"))
878
879 def test_extra_entry_raises(self) -> None:
880 manifest = {"a.py": fake_id("oid-a")}
881 sid = compute_snapshot_id(manifest)
882 corrupted = SnapshotRecord(
883 snapshot_id=sid,
884 manifest={"a.py": fake_id("oid-a"), "extra.py": fake_id("oid-c")}, # injected entry
885 created_at=datetime.datetime.now(datetime.timezone.utc),
886 )
887 with pytest.raises(OSError, match="content-hash verification"):
888 _verify_snapshot_id(corrupted, sid, pathlib.Path("snap.json"))
889
890 def test_missing_entry_raises(self) -> None:
891 manifest = {"a.py": fake_id("oid-a"), "b.py": fake_id("oid-b")}
892 sid = compute_snapshot_id(manifest)
893 corrupted = SnapshotRecord(
894 snapshot_id=sid,
895 manifest={"a.py": fake_id("oid-a")}, # b.py missing
896 created_at=datetime.datetime.now(datetime.timezone.utc),
897 )
898 with pytest.raises(OSError, match="content-hash verification"):
899 _verify_snapshot_id(corrupted, sid, pathlib.Path("snap.json"))
900
901 def test_empty_manifest_clean(self) -> None:
902 sid = compute_snapshot_id({})
903 rec = SnapshotRecord(
904 snapshot_id=sid, manifest={},
905 created_at=datetime.datetime.now(datetime.timezone.utc),
906 )
907 _verify_snapshot_id(rec, sid, pathlib.Path("snap.json"))
908
909 def test_large_manifest_50k_entries(self) -> None:
910 """50 000-entry manifest: _verify_snapshot_id completes quickly."""
911 import time
912 manifest = {f"path/to/file_{i:06d}.py": fake_id(f"obj{i}")
913 for i in range(50_000)}
914 sid = compute_snapshot_id(manifest)
915 rec = SnapshotRecord(
916 snapshot_id=sid, manifest=manifest,
917 created_at=datetime.datetime.now(datetime.timezone.utc),
918 )
919 start = time.perf_counter()
920 _verify_snapshot_id(rec, sid, pathlib.Path("snap.json"))
921 duration_ms = (time.perf_counter() - start) * 1000
922 assert duration_ms < 5000, f"50k manifest verify took {duration_ms:.0f} ms (budget: 5 000 ms)"
923
924
925 # ---------------------------------------------------------------------------
926 # 9. Regression: pre-fix silent corruption gap is now closed
927 # ---------------------------------------------------------------------------
928
929
930 class TestRegressionSilentCorrupt:
931 """I-10 regression: core-field corruptions that were silent are now caught.
932
933 Before I-10, 2 450 out of 3 776 bit positions in a commit file (the ones
934 in core fields) produced a silently wrong CommitRecord. Post-fix: zero.
935
936 The remaining ~1 954 bit positions are in metadata fields (branch, author,
937 repo_id, etc.) that are not in compute_commit_id by design — those are
938 documented limitations, not regressions.
939 """
940
941 def test_core_field_corruptions_zero_silent_passes(self, tmp_path: pathlib.Path) -> None:
942 """Bit flips in core commit fields: zero silent passes after I-10 fix.
943
944 Identifies core-field positions by checking whether the recomputed
945 commit_id would differ from the expected ID. Only those positions
946 are in scope for the zero-silent-passes assertion.
947 """
948 repo = _repo(tmp_path)
949 cid, path = _make_commit(repo, msg="regression test", snap_id=fake_id("snap-7"))
950 original = path.read_bytes()
951 silent = 0
952 for b in range(len(original)):
953 for bit in range(8):
954 flipped = _flip_bit(original, b, bit)
955 _corrupt_file(path, flipped)
956 result = read_commit(repo, cid)
957 if result is not None:
958 # Determine if this was a core-field position
959 try:
960 d = _parse_obj_payload(flipped)
961 if isinstance(d, dict):
962 r = CommitRecord.from_dict(d)
963 parent_ids: list[str] = []
964 if r.parent_commit_id:
965 parent_ids.append(r.parent_commit_id)
966 recomputed = compute_commit_id(
967 parent_ids=parent_ids,
968 snapshot_id=r.snapshot_id,
969 message=r.message,
970 committed_at_iso=r.committed_at.isoformat(),
971 author=r.author or "",
972 signer_public_key=r.signer_public_key or "",
973 )
974 if recomputed != cid:
975 silent += 1
976 except Exception:
977 pass
978 _corrupt_file(path, original)
979 assert silent == 0, (
980 f"{silent} CORE-field bit flips in commit were silently returned. "
981 "This was the pre-I-10 gap — _verify_commit_id should now catch all."
982 )
983
984 def test_manifest_corruptions_zero_silent_passes(self, tmp_path: pathlib.Path) -> None:
985 """Bit flips that corrupt manifest entries: zero silent passes after I-10 fix."""
986 repo = _repo(tmp_path)
987 sid, path = _make_snapshot(repo, {"main.py": fake_id("oid-8"), "lib.py": fake_id("oid-9")})
988 original = path.read_bytes()
989 silent = 0
990 for b in range(len(original)):
991 for bit in range(8):
992 flipped = _flip_bit(original, b, bit)
993 _corrupt_file(path, flipped)
994 result = read_snapshot(repo, sid)
995 if result is not None:
996 try:
997 d = _parse_obj_payload(flipped)
998 if isinstance(d.get("manifest"), dict):
999 recomputed = compute_snapshot_id(d["manifest"])
1000 if recomputed != sid:
1001 silent += 1
1002 except Exception:
1003 pass
1004 _corrupt_file(path, original)
1005 assert silent == 0, (
1006 f"{silent} manifest-region bit flips in snapshot were silently returned. "
1007 "_verify_snapshot_id should catch all manifest corruptions."
1008 )
1009
1010 def test_read_commit_returns_none_not_wrong_record(self, tmp_path: pathlib.Path) -> None:
1011 """A core-field-corrupted commit file returns None, not a wrong CommitRecord."""
1012 repo = _repo(tmp_path)
1013 now = datetime.datetime.now(datetime.timezone.utc)
1014 snap_id = fake_id("snap-6")
1015 cid = compute_commit_id(parent_ids=[], snapshot_id=snap_id, message="original message", committed_at_iso=now.isoformat())
1016 rec = CommitRecord(
1017 commit_id=cid, branch="main",
1018 snapshot_id=snap_id, message="original message", committed_at=now,
1019 )
1020 write_commit(repo, rec)
1021 path = object_path(repo, cid)
1022 original = path.read_bytes()
1023 d = _parse_obj_payload(original)
1024 d["message"] = "tampered message"
1025 _corrupt_file(path, _repack_obj_payload(original, d))
1026 result = read_commit(repo, cid)
1027 assert result is None, (
1028 "read_commit must return None on core-field corruption, "
1029 "not a record with wrong message"
1030 )
1031 _corrupt_file(path, original)
1032
1033
1034 # ---------------------------------------------------------------------------
1035 # 10. Msgpack fuzz — 10 000 rounds on commit + snapshot
1036 # ---------------------------------------------------------------------------
1037
1038
1039 class TestMsgpackFuzz10k:
1040 """Random multi-byte corruption fuzz on commit and snapshot files."""
1041
1042 @pytest.mark.slow
1043 def test_5_bit_fuzz_10k_commit_core_field_always_touched(self, tmp_path: pathlib.Path) -> None:
1044 """10 000 fuzz rounds each touching a core commit field: zero silent passes.
1045
1046 Each round flips 1 bit in a core-field region (snapshot_id, message, or
1047 committed_at in the JSON) plus 4 random bits elsewhere. This guarantees
1048 the fuzz always reaches a content-hash-verified field, making zero silent
1049 passes the correct assertion.
1050
1051 Pure random 5-bit fuzz has ~3.7% probability of landing all bits in metadata
1052 fields (branch, author, repo_id, etc.), which would produce expected silent
1053 passes — that is a documented design limitation, not a bug.
1054 """
1055 repo = _repo(tmp_path)
1056 cid, path = _make_commit(repo, msg="fuzz me", snap_id=fake_id("snap-5"))
1057 original = path.read_bytes()
1058 d_orig = _parse_obj_payload(original)
1059
1060 rng = random.Random(42)
1061 core_fields = ["snapshot_id", "message", "committed_at"]
1062 silent = 0
1063 for _ in range(10_000):
1064 # Always corrupt a core field
1065 field = rng.choice(core_fields)
1066 d = dict(d_orig)
1067 if field == "snapshot_id":
1068 d["snapshot_id"] = rng.choice(["e", "f", "0"]) * 64
1069 elif field == "message":
1070 d["message"] = f"tampered-{rng.randint(0, 999999)}"
1071 else:
1072 d["committed_at"] = f"200{rng.randint(0,9)}-01-01T00:00:00+00:00"
1073 # Plus 4 random bit flips
1074 packed = bytearray(_repack_obj_payload(original, d))
1075 for _ in range(4):
1076 if packed:
1077 packed[rng.randrange(len(packed))] ^= 1 << rng.randrange(8)
1078 _corrupt_file(path, bytes(packed))
1079 if read_commit(repo, cid) is not None:
1080 silent += 1
1081 _corrupt_file(path, original)
1082 assert silent == 0, (
1083 f"{silent} commit fuzz rounds (with guaranteed core-field corruption) "
1084 "went undetected — _verify_commit_id must catch all core-field changes"
1085 )
1086
1087 @pytest.mark.slow
1088 def test_5_bit_fuzz_10k_snapshot_manifest_always_touched(self, tmp_path: pathlib.Path) -> None:
1089 """10 000 fuzz rounds each touching a manifest entry: zero silent passes.
1090
1091 Each round corrupts at least one manifest entry (path or oid) to guarantee
1092 the fuzz reaches content-hash-verified data. Pure random 5-bit fuzz has
1093 a small probability of landing all bits in the ``created_at`` metadata field,
1094 which is a documented limitation — not a bug.
1095 """
1096 repo = _repo(tmp_path)
1097 manifest = {"x.py": fake_id("oid-4"), "y.py": fake_id("oid-5")}
1098 sid, path = _make_snapshot(repo, manifest)
1099 original = path.read_bytes()
1100 d_orig = _parse_obj_payload(original)
1101 assert isinstance(d_orig["manifest"], dict)
1102
1103 rng = random.Random(99)
1104 silent = 0
1105 for _ in range(10_000):
1106 d = dict(d_orig)
1107 d["manifest"] = dict(d_orig["manifest"])
1108 # Always corrupt one manifest entry
1109 key = rng.choice(list(manifest.keys()))
1110 d["manifest"][key] = rng.choice(["a", "b", "c"]) * 64
1111 # Plus 4 random bit flips
1112 packed = bytearray(_repack_obj_payload(original, d))
1113 for _ in range(4):
1114 if packed:
1115 packed[rng.randrange(len(packed))] ^= 1 << rng.randrange(8)
1116 _corrupt_file(path, bytes(packed))
1117 if read_snapshot(repo, sid) is not None:
1118 silent += 1
1119 _corrupt_file(path, original)
1120 assert silent == 0, (
1121 f"{silent} snapshot fuzz rounds (with guaranteed manifest corruption) "
1122 "went undetected — _verify_snapshot_id must catch all manifest changes"
1123 )
1124
1125 def test_completely_random_commit_bytes_100_rounds(self, tmp_path: pathlib.Path) -> None:
1126 """Replacing a commit file with random bytes: all 100 rounds caught."""
1127 repo = _repo(tmp_path)
1128 cid, path = _make_commit(repo)
1129 original = path.read_bytes()
1130 rng = random.Random(7)
1131 for _ in range(100):
1132 garbage = bytes(rng.randrange(256) for _ in range(len(original)))
1133 _corrupt_file(path, garbage)
1134 assert read_commit(repo, cid) is None
1135 _corrupt_file(path, original)
1136
1137 def test_completely_random_snapshot_bytes_100_rounds(self, tmp_path: pathlib.Path) -> None:
1138 """Replacing a snapshot file with random bytes: all 100 rounds caught."""
1139 repo = _repo(tmp_path)
1140 sid, path = _make_snapshot(repo)
1141 original = path.read_bytes()
1142 rng = random.Random(8)
1143 for _ in range(100):
1144 garbage = bytes(rng.randrange(256) for _ in range(len(original)))
1145 _corrupt_file(path, garbage)
1146 assert read_snapshot(repo, sid) is None
1147 _corrupt_file(path, original)
1148
1149
1150 # ---------------------------------------------------------------------------
1151 # 11. CRITICAL log emission on corruption detection
1152 # ---------------------------------------------------------------------------
1153
1154
1155 class TestCriticalLogged:
1156 """CRITICAL is emitted for every detected bit flip (both object + store)."""
1157
1158 def test_object_bit_flip_emits_critical(
1159 self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture
1160 ) -> None:
1161 import logging
1162 repo = _repo(tmp_path)
1163 data = b"log test object"
1164 oid = _write(repo, data)
1165 p = _stored_path(repo, oid)
1166 original = p.read_bytes()
1167 _corrupt_file(p, _flip_bit(original, 0, 0))
1168 with caplog.at_level(logging.CRITICAL):
1169 try:
1170 read_object(repo, oid)
1171 except OSError:
1172 pass
1173 _corrupt_file(p, original)
1174 assert any("integrity check" in r.message.lower() or "corrupt" in r.message.lower()
1175 for r in caplog.records)
1176
1177 def test_commit_flip_emits_critical(
1178 self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture
1179 ) -> None:
1180 import logging
1181 repo = _repo(tmp_path)
1182 cid, path = _make_commit(repo)
1183 original = path.read_bytes()
1184 d = _parse_obj_payload(original)
1185 d["message"] = "tampered"
1186 _corrupt_file(path, _repack_obj_payload(original, d))
1187 with caplog.at_level(logging.CRITICAL):
1188 read_commit(repo, cid)
1189 _corrupt_file(path, original)
1190 assert any("corrupt" in r.message.lower() for r in caplog.records)
1191
1192 def test_snapshot_flip_emits_critical(
1193 self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture
1194 ) -> None:
1195 import logging
1196 repo = _repo(tmp_path)
1197 sid, path = _make_snapshot(repo)
1198 original = path.read_bytes()
1199 d = _parse_obj_payload(original)
1200 assert isinstance(d["manifest"], dict)
1201 d["manifest"]["README.md"] = fake_id("oid-z")
1202 _corrupt_file(path, _repack_obj_payload(original, d))
1203 with caplog.at_level(logging.CRITICAL):
1204 read_snapshot(repo, sid)
1205 _corrupt_file(path, original)
1206 assert any("corrupt" in r.message.lower() for r in caplog.records)
1207
1208
1209 # ---------------------------------------------------------------------------
1210 # 12. Round-trip integrity
1211 # ---------------------------------------------------------------------------
1212
1213
1214 class TestRoundTripIntegrity:
1215 """Clean writes always round-trip without error."""
1216
1217 def test_object_round_trip(self, tmp_path: pathlib.Path) -> None:
1218 repo = _repo(tmp_path)
1219 for size in (0, 1, 31, 32, 33, 4095, 4096, 65535, 65536, 65537):
1220 data = os.urandom(size)
1221 oid = _write(repo, data)
1222 assert read_object(repo, oid) == data
1223
1224 def test_commit_round_trip(self, tmp_path: pathlib.Path) -> None:
1225 repo = _repo(tmp_path)
1226 cid, _ = _make_commit(repo, msg="clean commit", snap_id=fake_id("snap-3b"))
1227 result = read_commit(repo, cid)
1228 assert result is not None
1229 assert result.commit_id == cid
1230 assert result.message == "clean commit"
1231
1232 def test_snapshot_round_trip(self, tmp_path: pathlib.Path) -> None:
1233 repo = _repo(tmp_path)
1234 manifest = {f"f{i}.py": fake_id(str(i)) for i in range(100)}
1235 sid, _ = _make_snapshot(repo, manifest)
1236 result = read_snapshot(repo, sid)
1237 assert result is not None
1238 assert result.snapshot_id == sid
1239 assert result.manifest == manifest
1240
1241 def test_commit_with_parents_round_trip(self, tmp_path: pathlib.Path) -> None:
1242 repo = _repo(tmp_path)
1243 p1 = fake_id("parent-1")
1244 p2 = fake_id("parent-2")
1245 snap_id = fake_id("snap-3c")
1246 _stub_parent(repo, p1)
1247 _stub_parent(repo, p2)
1248 now = datetime.datetime.now(datetime.timezone.utc)
1249 cid = compute_commit_id(parent_ids=[p1, p2], snapshot_id=snap_id, message="merge commit", committed_at_iso=now.isoformat())
1250 rec = CommitRecord(
1251 commit_id=cid, branch="main",
1252 snapshot_id=snap_id, message="merge commit", committed_at=now,
1253 parent_commit_id=p1, parent2_commit_id=p2,
1254 )
1255 write_commit(repo, rec, skip_parent_check=True)
1256 result = read_commit(repo, cid)
1257 assert result is not None
1258 assert result.parent_commit_id == p1
1259 assert result.parent2_commit_id == p2
File History 5 commits
sha256:c5131d76c6eada02939111fda4aa8e51b0c1456b9983727cfd6be101916de14e merge: pull local/dev — resolve trivial _EXT_MAP symbol con… Sonnet 4.6 patch 12 days ago
sha256:9c33d61749fff814c5226d5386aa2af7064c2c02788594a25fdd709358132eea fix: _PROPOSAL_PREFIX_RESOLVE_LIMIT 200 → 100 to match hub … Sonnet 4.6 19 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago