gabriel / muse public
test_write_snapshot_incoming_verify.py python
341 lines 14.2 KB
Raw
sha256:a73c3f57b665e8c0be2c9e977b3ebefdb7ae8d46f196986d911c6a8f5d8b8d49 docs: update store.py references to focused module paths Sonnet 4.6 27 days ago
1 """Tests for Bug 9: write_snapshot accepts incoming SnapshotRecord whose
2 snapshot_id doesn't match the hash of its manifest — creating a file that
3 read_snapshot always reports as corrupt (hash mismatch), permanently unreadable.
4
5 The symmetrical fix to Bug 8b (write_commit incoming verification): both
6 write_commit and write_snapshot must verify the incoming record hash before
7 touching disk.
8
9 Attack scenarios:
10 - apply_mpack receives a mpack where snapshot_id is wrong (corruption/attack)
11 - A manually-constructed SnapshotRecord is passed with mismatched snapshot_id
12 - An adversary injects a snapshot that checksums fine for the wrong ID
13
14 Scope of tests
15 --------------
16 Unit (write_snapshot incoming hash verification):
17 - write_snapshot rejects incoming record with wrong snapshot_id (new file)
18 - write_snapshot rejects incoming record with wrong snapshot_id (existing good file)
19 - write_snapshot accepts valid incoming record (new file)
20 - write_snapshot is idempotent on valid record (second call skips)
21 - write_snapshot rejects incoming record with one wrong manifest entry
22 - write_snapshot rejects incoming record with extra injected manifest entry
23 - write_snapshot rejects incoming record with missing manifest entry
24 - write_snapshot rejects incoming record with wrong directories hash
25
26 Integration (apply_mpack with corrupt snapshot_id):
27 - apply_mpack skips snapshot with wrong snapshot_id
28 - apply_mpack does not skip valid snapshots when one is corrupt
29 - apply_mpack: written snapshot must be readable via read_snapshot
30 - apply_mpack: corrupt snapshot_id in mpack cannot poison existing good snapshot
31
32 Security:
33 - A mpack cannot substitute a manifest that passes a different snapshot's hash
34 - Injected manifest entries are rejected before reaching disk
35
36 Stress:
37 - 100-snapshot mpack with one corrupt snapshot_id: 99 written, 1 skipped
38 """
39 from __future__ import annotations
40
41 import datetime
42 import pathlib
43
44 import pytest
45
46 from muse.core.mpack import apply_mpack, MPack, SnapshotDeltaDict
47 from muse.core.paths import muse_dir
48 from muse.core.ids import hash_snapshot as compute_snapshot_id
49
50 from muse.core.types import Manifest, NULL_COMMIT_ID, blob_id, fake_id, long_id
51 from muse.core.snapshots import (
52 SnapshotRecord,
53 read_snapshot,
54 write_snapshot,
55 )
56
57 _TS = datetime.datetime(2024, 6, 15, 10, 0, 0, tzinfo=datetime.timezone.utc)
58
59
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create an empty repository skeleton under *tmp_path* and return its root.

    Two directories are created: ``<tmp_path>/repo`` and the directory that
    ``muse_dir`` reports for that root. Nothing else is initialised.
    """
    root = tmp_path.joinpath("repo")
    root.mkdir()
    metadata_dir = muse_dir(root)
    metadata_dir.mkdir()
    return root
65
66
def _good_snap(manifest: Manifest | None = None) -> SnapshotRecord:
    """Build a SnapshotRecord whose snapshot_id is computed from its manifest.

    Args:
        manifest: Manifest to embed in the record. ``None`` selects a
            single-entry default manifest. An explicitly supplied empty
            manifest is kept as-is rather than being replaced by the default.

    Returns:
        A record with ``directories=[]``, ``created_at=_TS`` and an empty
        note, whose ``snapshot_id`` is ``compute_snapshot_id(manifest)``.
    """
    # Test against None explicitly: ``manifest or default`` would silently
    # discard an intentionally empty manifest because ``{}`` is falsy.
    m = {"src/main.py": long_id("a" * 64)} if manifest is None else manifest
    return SnapshotRecord(
        snapshot_id=compute_snapshot_id(m),
        manifest=m,
        directories=[],
        created_at=_TS,
        note="",
    )
77
78
def _snap_with_wrong_id(manifest: Manifest | None = None) -> SnapshotRecord:
    """Build a SnapshotRecord whose snapshot_id doesn't match the hash of its manifest.

    Args:
        manifest: Manifest to embed in the record. ``None`` selects a
            single-entry default manifest. An explicitly supplied empty
            manifest is kept as-is rather than being replaced by the default.

    Returns:
        A record with ``directories=[]``, ``created_at=_TS`` and an empty
        note, whose ``snapshot_id`` is the fixed value ``long_id("f" * 64)``
        instead of a value derived from the manifest.
    """
    # Test against None explicitly: ``manifest or default`` would silently
    # discard an intentionally empty manifest because ``{}`` is falsy.
    m = {"src/main.py": long_id("a" * 64)} if manifest is None else manifest
    return SnapshotRecord(
        snapshot_id=long_id("f" * 64),  # wrong — doesn't match manifest hash
        manifest=m,
        directories=[],
        created_at=_TS,
        note="",
    )
89
90
91 # ──────────────────────────────────────────────────────────────────────────────
92 # Unit: write_snapshot incoming hash verification
93 # ──────────────────────────────────────────────────────────────────────────────
94
class TestWriteSnapshotIncomingVerification:
    """Unit tests for write_snapshot's handling of records whose ID and content disagree."""

    # Exception types any rejecting test accepts from write_snapshot.
    _REJECTION = (ValueError, OSError)

    @staticmethod
    def _record(
        snapshot_id: str,
        manifest: Manifest,
        directories: list[str] | None = None,
    ) -> SnapshotRecord:
        """Assemble a SnapshotRecord with the shared timestamp and an empty note.

        ``directories`` defaults to a fresh empty list when omitted.
        """
        return SnapshotRecord(
            snapshot_id=snapshot_id,
            manifest=manifest,
            directories=[] if directories is None else directories,
            created_at=_TS,
            note="",
        )

    def test_rejects_wrong_snapshot_id_new_file(self, tmp_path: pathlib.Path) -> None:
        """A record with a mismatched snapshot_id must raise when nothing was written before."""
        repo = _make_repo(tmp_path)
        mismatched = _snap_with_wrong_id()
        with pytest.raises(self._REJECTION):
            write_snapshot(repo, mismatched)

    def test_rejects_wrong_snapshot_id_existing_good_file(self, tmp_path: pathlib.Path) -> None:
        """A bad record reusing a stored snapshot's ID must raise and leave the stored one intact.

        The bad record carries the same snapshot_id as the good one but a
        different manifest.
        """
        repo = _make_repo(tmp_path)
        original = _good_snap()
        write_snapshot(repo, original)

        # Same ID, different content: the ID cannot match this manifest.
        impostor = self._record(
            original.snapshot_id,
            {"other.py": long_id("b" * 64)},
        )
        with pytest.raises(self._REJECTION):
            write_snapshot(repo, impostor)

        # The previously written snapshot must still read back unchanged.
        on_disk = read_snapshot(repo, original.snapshot_id)
        assert on_disk is not None
        assert on_disk.manifest == original.manifest

    def test_accepts_valid_incoming_record(self, tmp_path: pathlib.Path) -> None:
        """A record whose ID was computed from its manifest is written and reads back."""
        repo = _make_repo(tmp_path)
        valid = _good_snap()
        write_snapshot(repo, valid)  # must not raise
        on_disk = read_snapshot(repo, valid.snapshot_id)
        assert on_disk is not None
        assert on_disk.snapshot_id == valid.snapshot_id

    def test_idempotent_on_valid_record(self, tmp_path: pathlib.Path) -> None:
        """Writing the same valid record twice must not raise, and it stays readable."""
        repo = _make_repo(tmp_path)
        valid = _good_snap()
        for _ in range(2):  # the second pass is the one under test
            write_snapshot(repo, valid)
        assert read_snapshot(repo, valid.snapshot_id) is not None

    def test_rejects_incoming_with_wrong_object_id(self, tmp_path: pathlib.Path) -> None:
        """Incoming snapshot with a tampered object ID must be rejected."""
        repo = _make_repo(tmp_path)
        reference = _good_snap({"src/main.py": long_id("a" * 64)})

        # Keep the reference ID but point the same path at another object.
        altered = self._record(
            reference.snapshot_id,
            {"src/main.py": long_id("b" * 64)},
        )
        with pytest.raises(self._REJECTION):
            write_snapshot(repo, altered)

    def test_rejects_incoming_with_injected_manifest_entry(self, tmp_path: pathlib.Path) -> None:
        """Incoming snapshot with an injected extra file must be rejected."""
        repo = _make_repo(tmp_path)
        reference = _good_snap({"src/main.py": long_id("a" * 64)})

        # Original entry plus one extra path, still claiming the reference ID.
        padded_manifest = {
            "src/main.py": long_id("a" * 64),
            "malicious.sh": long_id("e" * 64),
        }
        altered = self._record(reference.snapshot_id, padded_manifest)
        with pytest.raises(self._REJECTION):
            write_snapshot(repo, altered)

    def test_rejects_incoming_with_missing_manifest_entry(self, tmp_path: pathlib.Path) -> None:
        """Incoming snapshot with a removed file entry must be rejected."""
        repo = _make_repo(tmp_path)
        reference = _good_snap(
            {"src/a.py": long_id("a" * 64), "src/b.py": long_id("b" * 64)}
        )

        # Drop src/b.py while keeping the ID computed over both entries.
        altered = self._record(
            reference.snapshot_id,
            {"src/a.py": long_id("a" * 64)},
        )
        with pytest.raises(self._REJECTION):
            write_snapshot(repo, altered)

    def test_rejects_incoming_with_wrong_directories_hash(self, tmp_path: pathlib.Path) -> None:
        """Incoming snapshot with different directories list must be rejected."""
        repo = _make_repo(tmp_path)
        manifest = {"src/main.py": long_id("a" * 64)}
        snap_id = compute_snapshot_id(manifest, ["src"])

        # Baseline: the ID was computed with exactly this directories list.
        write_snapshot(repo, self._record(snap_id, manifest, ["src"]))

        # Same ID and manifest, but an additional directory entry.
        altered = self._record(snap_id, manifest, ["src", "malicious"])
        with pytest.raises(self._REJECTION):
            write_snapshot(repo, altered)
211
212
213 # ──────────────────────────────────────────────────────────────────────────────
214 # Integration: apply_mpack with corrupt snapshot_id
215 # ──────────────────────────────────────────────────────────────────────────────
216
def _bundle_with_snapshots(snapshots: list[SnapshotDeltaDict]) -> MPack:
    """Wrap *snapshots* in an MPack whose objects, commits and tags are empty lists."""
    bundle = MPack(objects=[], snapshots=snapshots, commits=[], tags=[])
    return bundle
219
220
def _to_delta(snap: SnapshotRecord) -> SnapshotDeltaDict:
    """Express *snap* as a parentless SnapshotDeltaDict for mpack construction.

    The delta has no parent snapshot, removes nothing, and adds every manifest
    entry. ``delta_add`` is a new dict, so later edits to the returned wire
    value do not modify ``snap.manifest``.
    """
    wire_id = snap.snapshot_id
    added_entries = dict(snap.manifest)
    return SnapshotDeltaDict(
        snapshot_id=wire_id,
        parent_snapshot_id=None,
        delta_add=added_entries,
        delta_remove=[],
    )
229
230
class TestApplyPackCorruptSnapshotId:
    """Integration tests: apply_mpack given snapshot entries whose ID and content disagree."""

    def test_apply_pack_skips_snapshot_with_wrong_snapshot_id(self, tmp_path: pathlib.Path) -> None:
        """apply_mpack must not write a snapshot with mismatched snapshot_id."""
        repo = _make_repo(tmp_path)
        good = _good_snap()
        wire = _to_delta(good)
        wire["snapshot_id"] = long_id("f" * 64)  # mismatch

        mpack = _bundle_with_snapshots([wire])
        apply_mpack(repo, mpack)

        # Nothing may be served under the bogus ID. This assertion fires only
        # when read_snapshot DID return a record, so the message describes
        # that situation.
        result = read_snapshot(repo, long_id("f" * 64))
        assert result is None, (
            "SECURITY: apply_mpack stored a snapshot under a snapshot_id that "
            "does not match its manifest hash, and read_snapshot returned it."
        )

    def test_apply_pack_valid_snapshot_is_readable(self, tmp_path: pathlib.Path) -> None:
        """Regression: valid snapshots must still be written and readable."""
        repo = _make_repo(tmp_path)
        good = _good_snap()

        mpack = _bundle_with_snapshots([_to_delta(good)])
        result = apply_mpack(repo, mpack)

        assert result["snapshots_written"] == 1
        stored = read_snapshot(repo, good.snapshot_id)
        assert stored is not None
        assert stored.manifest == good.manifest

    def test_apply_pack_one_corrupt_does_not_block_valid_snapshots(self, tmp_path: pathlib.Path) -> None:
        """One corrupt snapshot in a mpack must not block the valid ones."""
        repo = _make_repo(tmp_path)
        good1 = _good_snap({"a.py": long_id("a" * 64)})
        good2 = _good_snap({"b.py": long_id("b" * 64)})

        # Same content as good1, but filed under an ID that cannot match it.
        corrupt_wire = _to_delta(good1)
        corrupt_wire["snapshot_id"] = NULL_COMMIT_ID  # mismatch

        # The corrupt entry goes first so the valid ones are processed after it.
        mpack = _bundle_with_snapshots([
            corrupt_wire,
            _to_delta(good1),
            _to_delta(good2),
        ])
        result = apply_mpack(repo, mpack)

        # Lower bound only: both valid entries must have been written.
        assert result["snapshots_written"] >= 2
        assert read_snapshot(repo, good1.snapshot_id) is not None
        assert read_snapshot(repo, good2.snapshot_id) is not None

    def test_apply_pack_corrupt_bundle_cannot_poison_existing_good_snapshot(self, tmp_path: pathlib.Path) -> None:
        """A malicious mpack must not be able to overwrite an existing valid snapshot."""
        repo = _make_repo(tmp_path)
        good = _good_snap({"src/main.py": long_id("a" * 64)})
        write_snapshot(repo, good)  # write the good snapshot first

        # MPack: same snapshot_id, different (injected) delta_add — hash won't match
        wire = SnapshotDeltaDict(
            snapshot_id=good.snapshot_id,
            parent_snapshot_id=None,
            delta_add={"src/main.py": long_id("b" * 64)},  # tampered
            delta_remove=[],
        )
        mpack = _bundle_with_snapshots([wire])
        apply_mpack(repo, mpack)

        # The snapshot written before the mpack was applied must be unchanged.
        stored = read_snapshot(repo, good.snapshot_id)
        assert stored is not None, "Good snapshot was destroyed by malicious mpack"
        assert stored.manifest == good.manifest, (
            f"SECURITY: manifest was overwritten by malicious mpack. "
            f"Expected {good.manifest}, got {stored.manifest}"
        )
305
306
307 # ──────────────────────────────────────────────────────────────────────────────
308 # Stress: large mpack with one corrupt snapshot_id
309 # ──────────────────────────────────────────────────────────────────────────────
310
class TestApplyPackSnapshotStress:
    """Stress test: a large mpack in which exactly one snapshot entry has a mismatched ID."""

    def test_100_snapshot_bundle_one_corrupt(self, tmp_path: pathlib.Path) -> None:
        """100-snapshot mpack, one with wrong snapshot_id: 99 written, no crash."""
        repo = _make_repo(tmp_path)
        corrupt_index = 50
        corrupt_id = fake_id("corrupt-snap-50")
        snaps = [_good_snap({f"src/f{i}.py": fake_id(f"obj-{i}")}) for i in range(100)]

        wires: list[SnapshotDeltaDict] = []
        valid_ids = []
        for i, snap in enumerate(snaps):
            wire = _to_delta(snap)
            if i == corrupt_index:
                # File this entry under an ID unrelated to its content.
                wire["snapshot_id"] = corrupt_id
            else:
                valid_ids.append(snap.snapshot_id)
            wires.append(wire)

        result = apply_mpack(repo, _bundle_with_snapshots(wires))

        # Every valid entry must be counted. This is a lower bound, like the
        # sibling integration test, which does not pin how a rejected entry
        # is counted.
        assert result["snapshots_written"] >= len(valid_ids)

        # corrupt entry must not be readable (asserted unconditionally)
        assert read_snapshot(repo, corrupt_id) is None

        # Check every valid snapshot, not just a sample.
        for snap_id in valid_ids:
            assert read_snapshot(repo, snap_id) is not None, (
                f"Valid snapshot {snap_id[:8]} not readable after apply_mpack"
            )
File History 2 commits
sha256:a73c3f57b665e8c0be2c9e977b3ebefdb7ae8d46f196986d911c6a8f5d8b8d49 docs: update store.py references to focused module paths Sonnet 4.6 27 days ago
sha256:b6cae4448122b2cc690d913be26f7e0a539f11855b8d288bd48be43eb532b5b2 refactor: migrate all source callers off muse.core.store re… Sonnet 4.6 minor 27 days ago