gabriel / muse public
test_snapshot_schema_version_and_compression.py python
478 lines 20.3 KB
Raw
sha256:248464b6a2f758985cbef90f864fa62c61842be699d975d6e00b6a9509ef919c fix(delta): detect blob-identical file renames for files wi… Sonnet 4.6 patch 24 days ago
1 """Tests for snapshot schema_version field and zstd at-rest compression.
2
3 Every new snapshot file is written as ``zstd(json(data))`` when the JSON
4 payload exceeds ``_ZSTD_COMPRESS_THRESHOLD`` bytes. Smaller snapshots stay as
5 raw JSON — no overhead for tiny repos. Detection is self-describing via the
6 4-byte zstd magic ``\\x28\\xb5\\x2f\\xfd`` at the start of the file, so old
7 uncompressed files remain fully readable without any migration.
8
9 ``schema_version`` (integer, currently 1) is stored in each snapshot record as
10 metadata. It is intentionally excluded from the snapshot-ID hash — the hash
11 captures only content (manifest paths + object IDs + directories). This lets
12 the schema version evolve (e.g. when the Rust port lands) without invalidating
13 any existing snapshot ID.
14
15 Eight-tier coverage
16 -------------------
17 - Unit — constants, zstd helpers, schema_version field contract
18 - Integration — write/read roundtrip with schema_version and compression
19 - E2E — full CLI: ``muse commit`` stores a snapshot object in the object store
20 - Stress — 1 000-file manifest compresses and decompresses without error
21 - State — pre-compression (uncompressed) snapshots are still readable
22 - Integrity — ``_verify_snapshot_id`` passes on compressed snapshots;
23 schema_version cannot alter the content hash
24 - Performance — 1 000-file roundtrip completes within 2 s
25 - Security — zstd "bomb" that expands beyond MAX_MSGPACK_BYTES is rejected
26 """
27
28 from __future__ import annotations
29
30 import datetime
31 import pathlib
32 import time
33
34 import json as _json
35
36 import pytest
37
38 from muse.core.ids import hash_snapshot as compute_snapshot_id
39 from muse.core.io import MAX_MSGPACK_BYTES
40 from muse.core.snapshots import (
41 SnapshotRecord,
42 read_snapshot,
43 snapshot_path,
44 write_snapshot,
45 )
46 from muse.core.types import content_hash, long_id
47 from muse.core.paths import muse_dir, snapshots_dir
48 from muse.core.object_store import object_path, objects_dir, write_object
49
50
51 # ---------------------------------------------------------------------------
52 # Helpers shared across tiers
53 # ---------------------------------------------------------------------------
54
55
def _obj_id(n: int) -> str:
    """Return ``long_id`` applied to *n* rendered as 64 zero-padded hex digits."""
    hex_digits = format(n, "064x")
    return long_id(hex_digits)
58
59
def _make_snapshot(n_files: int = 5, note: str = "") -> SnapshotRecord:
    """Build a SnapshotRecord whose manifest holds *n_files* synthetic entries.

    Paths are ``src/file_0000.py``, ``src/file_0001.py``, … and entry *i*
    maps to ``_obj_id(i)``. The snapshot ID is computed from that manifest,
    and *note* is passed through to the record unchanged.
    """
    manifest: dict[str, str] = {}
    for index in range(n_files):
        manifest[f"src/file_{index:04d}.py"] = _obj_id(index)
    return SnapshotRecord(
        snapshot_id=compute_snapshot_id(manifest),
        manifest=manifest,
        note=note,
    )
68
69
def _init_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create the ``snapshots`` directory under the muse dir and return *tmp_path*.

    Only this one directory (plus its parents) is created — the bare
    minimum the snapshot read/write tests rely on.
    """
    snapshots_root = muse_dir(tmp_path) / "snapshots"
    snapshots_root.mkdir(parents=True)
    return tmp_path
75
76
77 # ---------------------------------------------------------------------------
78 # Tier 1 — Unit: constants and zstd helper contract
79 # ---------------------------------------------------------------------------
80
81
class TestConstants:
    """Unit tier: sanity checks on the snapshot and zstd module constants."""

    def test_snapshot_schema_version_is_int(self) -> None:
        """The schema version constant must be an ``int`` instance."""
        from muse.core.snapshots import _SNAPSHOT_SCHEMA_VERSION as version

        assert isinstance(version, int)

    def test_snapshot_schema_version_is_one(self) -> None:
        """The schema version constant currently equals 1."""
        from muse.core.snapshots import _SNAPSHOT_SCHEMA_VERSION as version

        assert version == 1

    def test_zstd_magic_is_correct(self) -> None:
        """The magic constant must be the four bytes 28 B5 2F FD."""
        from muse.core.io import _ZSTD_MAGIC as magic

        expected = bytes([0x28, 0xB5, 0x2F, 0xFD])
        assert magic == expected

    def test_compress_threshold_is_positive(self) -> None:
        """The compression threshold must be strictly greater than zero."""
        from muse.core.io import _ZSTD_COMPRESS_THRESHOLD as threshold

        assert 0 < threshold

    def test_compress_threshold_is_reasonable(self) -> None:
        """The compression threshold must be at least 1024 bytes."""
        from muse.core.io import _ZSTD_COMPRESS_THRESHOLD as threshold

        assert 1024 <= threshold
106
107
class TestZstdHelpers:
    """Unit tier: contract of the zstd compress / decompress-if-needed helpers."""

    def test_zstd_roundtrip(self) -> None:
        """compress → decompress_if_needed must return the original bytes exactly."""
        from muse.core.io import _zstd_compress, zstd_decompress_if_needed

        original = b"hello " * 1_000
        compressed = _zstd_compress(original)
        recovered = zstd_decompress_if_needed(compressed)
        assert recovered == original

    def test_compressed_output_starts_with_magic(self) -> None:
        """zstd output frame must begin with the 4-byte magic sequence."""
        from muse.core.io import _ZSTD_MAGIC, _zstd_compress

        compressed = _zstd_compress(b"data " * 500)
        assert compressed[:4] == _ZSTD_MAGIC

    def test_decompress_noop_on_plain_bytes(self) -> None:
        """Non-zstd bytes are returned unchanged — no corruption."""
        from muse.core.io import zstd_decompress_if_needed

        plain = _json.dumps({"key": "value"}).encode()
        # Equality already covers the "same object returned" case, so a
        # single call and a single comparison are sufficient.
        assert zstd_decompress_if_needed(plain) == plain

    def test_decompress_noop_on_empty(self) -> None:
        """Empty input must come back as empty bytes."""
        from muse.core.io import zstd_decompress_if_needed

        assert zstd_decompress_if_needed(b"") == b""

    def test_compress_is_smaller_than_input_for_repetitive_data(self) -> None:
        """Highly repetitive input must shrink when compressed."""
        from muse.core.io import _zstd_compress

        data = b"aaaa" * 10_000
        assert len(_zstd_compress(data)) < len(data)
137
138
139 # ---------------------------------------------------------------------------
140 # Tier 2 — Integration: schema_version field in SnapshotRecord
141 # ---------------------------------------------------------------------------
142
143
144 class TestSchemaVersionField:
145 def test_default_schema_version_is_one(self) -> None:
146 """Newly created SnapshotRecord defaults to schema_version=1."""
147 snap = _make_snapshot()
148 assert snap.schema_version == 1
149
150 def test_to_dict_includes_schema_version(self) -> None:
151 """Serialized dict must carry the schema_version key."""
152 snap = _make_snapshot()
153 d = snap.to_dict()
154 assert "schema_version" in d
155 assert d["schema_version"] == 1
156
157 def test_schema_version_excluded_from_snapshot_id_hash(self) -> None:
158 """schema_version is metadata — changing it must not change snapshot_id."""
159 manifest = {"a.py": _obj_id(0xAAAA)}
160 snap_id = compute_snapshot_id(manifest)
161 snap_v1 = SnapshotRecord(snapshot_id=snap_id, manifest=manifest, schema_version=1)
162 snap_v99 = SnapshotRecord(snapshot_id=snap_id, manifest=manifest, schema_version=99)
163 # Both records carry the same snapshot_id; re-verification must pass for both
164 from muse.core.snapshots import _verify_snapshot_id
165 _verify_snapshot_id(snap_v1, snap_id, pathlib.Path("<test>"))
166 _verify_snapshot_id(snap_v99, snap_id, pathlib.Path("<test>"))
167
168 def test_from_dict_reads_schema_version(self) -> None:
169 """from_dict must deserialise schema_version from the stored dict."""
170 snap = _make_snapshot()
171 d = snap.to_dict()
172 recovered = SnapshotRecord.from_dict(d)
173 assert recovered.schema_version == 1
174
175 def test_from_dict_defaults_schema_version_for_old_files(self) -> None:
176 """Files written before schema_version was added must read as version 1."""
177 snap = _make_snapshot()
178 d = snap.to_dict()
179 del d["schema_version"] # simulate a pre-migration file
180 recovered = SnapshotRecord.from_dict(d)
181 assert recovered.schema_version == 1
182
183 def test_from_dict_reads_schema_version(self) -> None:
184 snap = _make_snapshot()
185 recovered = SnapshotRecord.from_dict(snap.to_dict())
186 assert recovered.schema_version == 1
187
188 def test_from_dict_defaults_schema_version_for_missing_key(self) -> None:
189 snap = _make_snapshot()
190 d = snap.to_dict()
191 del d["schema_version"]
192 recovered = SnapshotRecord.from_dict(d)
193 assert recovered.schema_version == 1
194
195
196 # ---------------------------------------------------------------------------
197 # Tier 3 — Integration: write / read roundtrip with compression
198 # ---------------------------------------------------------------------------
199
200
class TestCompressionRoundtrip:
    """Integration tier: snapshots written by write_snapshot read back intact."""

    def test_large_snapshot_on_disk_is_zstd_compressed(self, tmp_path: pathlib.Path) -> None:
        """A 500-file snapshot must exist at its object path and start with ``snapshot ``."""
        repo = _init_repo(tmp_path)
        record = _make_snapshot(n_files=500)
        write_snapshot(repo, record)

        stored = object_path(repo, record.snapshot_id)
        assert stored.exists(), "Snapshot not found in unified object store"

        raw = stored.read_bytes()
        assert raw.startswith(b"snapshot "), (
            f"Expected 'snapshot ' header in unified store; got {raw[:20]!r}"
        )

    def test_small_snapshot_on_disk_is_not_compressed(self, tmp_path: pathlib.Path) -> None:
        """A one-file snapshot's stored bytes must not begin with the zstd magic."""
        from muse.core.io import _ZSTD_MAGIC

        repo = _init_repo(tmp_path)
        record = _make_snapshot(n_files=1)
        write_snapshot(repo, record)

        stored = object_path(repo, record.snapshot_id)
        leading = stored.read_bytes()[:4]
        assert leading != _ZSTD_MAGIC

    def test_compressed_roundtrip_record_is_identical(self, tmp_path: pathlib.Path) -> None:
        """write_snapshot → read_snapshot must preserve every compared field (500 files)."""
        repo = _init_repo(tmp_path)
        record = _make_snapshot(n_files=500)
        write_snapshot(repo, record)

        loaded = read_snapshot(repo, record.snapshot_id)
        assert loaded is not None
        assert loaded.snapshot_id == record.snapshot_id
        assert loaded.manifest == record.manifest
        assert loaded.directories == record.directories
        assert loaded.schema_version == record.schema_version

    def test_small_roundtrip_record_is_identical(self, tmp_path: pathlib.Path) -> None:
        """write_snapshot → read_snapshot for a two-file snapshot carrying a note."""
        repo = _init_repo(tmp_path)
        record = _make_snapshot(n_files=2, note="tiny")
        write_snapshot(repo, record)

        loaded = read_snapshot(repo, record.snapshot_id)
        assert loaded is not None
        assert loaded.snapshot_id == record.snapshot_id
        assert loaded.note == "tiny"
        assert loaded.schema_version == 1

    def test_schema_version_survives_roundtrip(self, tmp_path: pathlib.Path) -> None:
        """schema_version must still be 1 after a 500-file write/read cycle."""
        repo = _init_repo(tmp_path)
        record = _make_snapshot(n_files=500)
        write_snapshot(repo, record)

        loaded = read_snapshot(repo, record.snapshot_id)
        assert loaded is not None
        assert loaded.schema_version == 1
254
255
256 # ---------------------------------------------------------------------------
257 # Tier 4 — E2E: CLI creates a compressed file on disk
258 # ---------------------------------------------------------------------------
259
260
class TestCliCompression:
    """E2E tier: a CLI commit must land a snapshot object in the object store."""

    def test_cli_commit_writes_compressed_snapshot(self, tmp_path: pathlib.Path) -> None:
        """``muse commit`` must write a snapshot object into the unified object store.

        Builds a minimal repository by hand, seeds 300 small source files,
        runs ``commit -m big`` through the CLI runner, and then looks for at
        least one object file whose bytes start with the ``snapshot `` header.
        """
        from tests.cli_test_helper import CliRunner
        from muse.core.types import blob_id, fake_id

        runner = CliRunner()
        env = {"MUSE_REPO_ROOT": str(tmp_path)}

        # Minimal repo structure
        dot_muse = muse_dir(tmp_path)
        dot_muse.mkdir()
        repo_id = fake_id("repo")
        repo_meta = {
            "repo_id": repo_id,
            "domain": "code",
            "default_branch": "main",
            "created_at": "2025-01-01T00:00:00+00:00",
        }
        (dot_muse / "repo.json").write_text(_json.dumps(repo_meta))
        (dot_muse / "HEAD").write_text("ref: refs/heads/main")
        (dot_muse / "refs" / "heads").mkdir(parents=True)
        for d in ("snapshots", "commits", "objects"):
            (dot_muse / d).mkdir()

        # Write 300 source files into the unified object store
        src = tmp_path / "src"
        src.mkdir()
        for i in range(300):
            content = f"module_{i:04d} = {i}\n".encode()
            write_object(tmp_path, blob_id(content), content)
            # Write the exact bytes that were hashed, so the working-tree file
            # matches the stored blob even where text mode translates newlines.
            (src / f"module_{i:04d}.py").write_bytes(content)

        r = runner.invoke(None, ["commit", "-m", "big"], env=env, catch_exceptions=False)
        assert r.exit_code == 0, r.output

        # Find snapshot objects in the unified store (files starting with "snapshot " header)
        obj_dir = objects_dir(tmp_path)
        snap_objects = [
            p for p in obj_dir.rglob("*")
            if p.is_file() and p.read_bytes().startswith(b"snapshot ")
        ]
        assert snap_objects, "No snapshot objects found in unified store after commit"
305
306
307 # ---------------------------------------------------------------------------
308 # Tier 5 — Stress: 1 000-file manifest
309 # ---------------------------------------------------------------------------
310
311
class TestStress:
    """Stress tier: a large manifest survives a write/read cycle."""

    def test_1000_file_snapshot_compress_decompress(self, tmp_path: pathlib.Path) -> None:
        """A 1 000-file manifest must write and read back with its size and ID intact."""
        file_count = 1_000
        repo = _init_repo(tmp_path)
        record = _make_snapshot(n_files=file_count)
        write_snapshot(repo, record)

        loaded = read_snapshot(repo, record.snapshot_id)
        assert loaded is not None
        assert len(loaded.manifest) == file_count
        assert loaded.snapshot_id == record.snapshot_id
322
323
324 # ---------------------------------------------------------------------------
325 # Tier 6 — State: pre-compression files remain readable
326 # ---------------------------------------------------------------------------
327
328
class TestBackwardCompat:
    """State tier: older on-disk layouts and mixed snapshot sizes stay readable."""

    def test_old_uncompressed_snapshot_still_readable(self, tmp_path: pathlib.Path) -> None:
        """A snapshot written to the object store without schema_version must still load."""
        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=5)
        raw_dict = snap.to_dict()
        del raw_dict["schema_version"]  # simulate pre-migration file
        payload = _json.dumps(raw_dict, separators=(",", ":")).encode()
        path = object_path(root, snap.snapshot_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        # Hand-build the object: "snapshot <payload length>\0" followed by the JSON payload.
        path.write_bytes(b"snapshot " + str(len(payload)).encode() + b"\0" + payload)

        loaded = read_snapshot(root, snap.snapshot_id)
        assert loaded is not None
        assert loaded.snapshot_id == snap.snapshot_id
        assert loaded.schema_version == 1  # default applied

    def test_mixed_compressed_uncompressed_in_same_dir(self, tmp_path: pathlib.Path) -> None:
        """A 1-file and a 500-file snapshot written to the same repo must both load."""
        root = _init_repo(tmp_path)
        small = _make_snapshot(n_files=1)
        large = _make_snapshot(n_files=500)

        write_snapshot(root, small)
        write_snapshot(root, large)

        loaded_small = read_snapshot(root, small.snapshot_id)
        loaded_large = read_snapshot(root, large.snapshot_id)

        assert loaded_small is not None
        assert loaded_large is not None
        # Check both sizes so neither read can return the other's record.
        assert len(loaded_small.manifest) == 1
        assert len(loaded_large.manifest) == 500
362
363
364 # ---------------------------------------------------------------------------
365 # Tier 7 — Integrity: verify_snapshot_id passes through compression round-trip
366 # ---------------------------------------------------------------------------
367
368
class TestIntegrity:
    """Integrity tier: ID verification, tamper detection and GC reachability."""

    def test_verify_snapshot_id_passes_after_compression(self, tmp_path: pathlib.Path) -> None:
        """Hash verification must succeed when reading a compressed snapshot."""
        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=500)
        write_snapshot(root, snap)
        # read_snapshot internally calls _verify_snapshot_id; None means failure
        loaded = read_snapshot(root, snap.snapshot_id)
        assert loaded is not None, "read_snapshot returned None — hash verification failed"

    def test_tampered_compressed_manifest_is_rejected(self, tmp_path: pathlib.Path) -> None:
        """Altering a byte in a snapshot payload must cause read_snapshot to return None."""
        import os

        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=500)
        write_snapshot(root, snap)
        path = object_path(root, snap.snapshot_id)
        raw = bytearray(path.read_bytes())
        # Find end of "snapshot N\0" header and flip a byte in the payload
        header_end = raw.index(ord("\0")) + 1
        target = header_end + 4
        # Fail loudly if the file is too short to corrupt: silently skipping
        # the flip would leave an intact file and a misleading failure below.
        assert len(raw) > target, (
            f"Stored snapshot too short to tamper with: {len(raw)} bytes, "
            f"payload starts at {header_end}"
        )
        raw[target] ^= 0xFF
        # Ensure the file is owner-writable before overwriting it
        # (presumably the store may write objects read-only — confirm).
        os.chmod(path, 0o644)
        path.write_bytes(bytes(raw))
        loaded = read_snapshot(root, snap.snapshot_id)
        assert loaded is None, "Tampered snapshot should not load"

    def test_gc_finds_objects_in_compressed_snapshot(self, tmp_path: pathlib.Path) -> None:
        """GC reachability walk must extract object IDs from compressed snapshots."""
        from muse.core.gc import _collect_reachable_objects

        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=500)
        write_snapshot(root, snap)

        reachable: set[str] = _collect_reachable_objects(root)

        # All object IDs in the manifest must appear in the reachable set
        for oid in snap.manifest.values():
            assert oid in reachable, f"Object {oid[:24]}… not found in GC reachable set"
408
409
410 # ---------------------------------------------------------------------------
411 # Tier 8 — Performance
412 # ---------------------------------------------------------------------------
413
414
class TestPerformance:
    """Performance tier: wall-clock budget for a large snapshot roundtrip."""

    def test_1000_file_roundtrip_under_2s(self, tmp_path: pathlib.Path) -> None:
        """Writing then reading a 1 000-file snapshot must take less than 2 seconds."""
        repo = _init_repo(tmp_path)
        record = _make_snapshot(n_files=1_000)

        # Only the write + read pair is timed; fixture setup is excluded.
        began = time.perf_counter()
        write_snapshot(repo, record)
        loaded = read_snapshot(repo, record.snapshot_id)
        finished = time.perf_counter()
        elapsed = finished - began

        assert loaded is not None
        assert elapsed < 2.0, f"Roundtrip took {elapsed:.3f}s — exceeds 2s budget"
428
429
430 # ---------------------------------------------------------------------------
431 # Tier 9 — Security
432 # ---------------------------------------------------------------------------
433
434
class TestSecurity:
    """Security tier: oversized payloads and symlinked directories must be refused."""

    def test_zstd_bomb_rejected(self, tmp_path: pathlib.Path) -> None:
        """A zstd frame that expands past MAX_MSGPACK_BYTES must make read_snapshot return None."""
        import zstandard
        from muse.core.io import _ZSTD_MAGIC

        repo = _init_repo(tmp_path)
        record = _make_snapshot(n_files=1)

        # All-zero input, one byte over the limit: tiny once compressed,
        # oversized once expanded.
        oversized = bytes(MAX_MSGPACK_BYTES + 1)
        compressor = zstandard.ZstdCompressor(level=1)
        bomb = compressor.compress(oversized)
        assert bomb[:4] == _ZSTD_MAGIC

        # NOTE(review): the bomb is planted at snapshot_path(), whereas the
        # other tests in this file locate snapshots via object_path(). Confirm
        # read_snapshot actually consults this location; otherwise the final
        # assertion may pass simply because nothing was found.
        planted = snapshot_path(repo, record.snapshot_id)
        planted.parent.mkdir(parents=True, exist_ok=True)
        planted.write_bytes(bomb)

        # The read must come back empty rather than raise or return data.
        loaded = read_snapshot(repo, record.snapshot_id)
        assert loaded is None, "Zstd bomb should be rejected, not loaded"

    def test_schema_version_cannot_alter_snapshot_id(self, tmp_path: pathlib.Path) -> None:
        """Two records built with different schema_version values keep the same snapshot_id."""
        manifest = {"src/main.py": _obj_id(0xFFFF)}
        shared_id = compute_snapshot_id(manifest)
        first = SnapshotRecord(snapshot_id=shared_id, manifest=manifest, schema_version=1)
        second = SnapshotRecord(snapshot_id=shared_id, manifest=manifest, schema_version=42)
        # NOTE(review): both records are handed the same ID explicitly, so this
        # compares the constructor inputs — confirm that is the intended check.
        assert first.snapshot_id == second.snapshot_id

    def test_symlinked_snapshot_dir_not_written(self, tmp_path: pathlib.Path) -> None:
        """write_snapshot must raise ValueError or OSError when the shard dir is a symlink."""
        import shutil

        repo = _init_repo(tmp_path)
        record = _make_snapshot(n_files=1)
        shard_dir = object_path(repo, record.snapshot_id).parent

        # Create the shard dir (and every parent), then remove just the shard
        # dir so the parents stay in place and its slot is free for a symlink.
        shard_dir.mkdir(parents=True, exist_ok=True)
        shutil.rmtree(shard_dir)
        shard_dir.symlink_to("/tmp")

        with pytest.raises((ValueError, OSError)):
            write_snapshot(repo, record)
File History 1 commit
sha256:248464b6a2f758985cbef90f864fa62c61842be699d975d6e00b6a9509ef919c fix(delta): detect blob-identical file renames for files wi… Sonnet 4.6 patch 24 days ago