"""Tests for snapshot schema_version field and zstd at-rest compression. Every new snapshot file is written as ``zstd(json(data))`` when the JSON payload exceeds ``_ZSTD_COMPRESS_THRESHOLD`` bytes. Smaller snapshots stay as raw JSON — no overhead for tiny repos. Detection is self-describing via the 4-byte zstd magic ``\\x28\\xb5\\x2f\\xfd`` at the start of the file, so old uncompressed files remain fully readable without any migration. ``schema_version`` (integer, currently 1) is stored in each snapshot record as metadata. It is intentionally excluded from the snapshot-ID hash — the hash captures only content (manifest paths + object IDs + directories). This lets the schema version evolve (e.g. when the Rust port lands) without invalidating any existing snapshot ID. Seven-tier coverage ------------------- - Unit — constants, zstd helpers, schema_version field contract - Integration — write/read roundtrip with schema_version and compression - E2E — full CLI: ``muse snapshot create`` stores compressed file on disk - Stress — 1 000-file manifest compresses and decompresses without error - State — pre-compression (uncompressed) snapshots are still readable - Integrity — ``_verify_snapshot_id`` passes on compressed snapshots; schema_version cannot alter the content hash - Performance — 1 000-file roundtrip completes within 2 s - Security — zstd "bomb" that expands beyond MAX_MSGPACK_BYTES is rejected """ from __future__ import annotations import datetime import pathlib import time import json as _json import pytest from muse.core.ids import hash_snapshot as compute_snapshot_id from muse.core.io import MAX_MSGPACK_BYTES from muse.core.snapshots import ( SnapshotRecord, read_snapshot, snapshot_path, write_snapshot, ) from muse.core.types import content_hash, long_id from muse.core.paths import muse_dir, snapshots_dir from muse.core.object_store import object_path, objects_dir, write_object # --------------------------------------------------------------------------- # 
# Helpers shared across tiers
# ---------------------------------------------------------------------------


def _obj_id(n: int) -> str:
    """Return a deterministic object ID built from the integer *n*."""
    return long_id(f"{n:064x}")


def _make_snapshot(n_files: int = 5, note: str = "") -> SnapshotRecord:
    """Build a SnapshotRecord with *n_files* synthetic manifest entries.

    The snapshot ID is computed from the manifest, so the record is
    internally consistent.
    """
    manifest = {f"src/file_{i:04d}.py": _obj_id(i) for i in range(n_files)}
    snap_id = compute_snapshot_id(manifest)
    return SnapshotRecord(
        snapshot_id=snap_id,
        manifest=manifest,
        note=note,
    )


def _init_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Minimal .muse/ tree — just enough for snapshot read/write."""
    muse = muse_dir(tmp_path)
    (muse / "snapshots").mkdir(parents=True)
    return tmp_path


# ---------------------------------------------------------------------------
# Tier 1 — Unit: constants and zstd helper contract
# ---------------------------------------------------------------------------


class TestConstants:
    def test_snapshot_schema_version_is_int(self) -> None:
        """_SNAPSHOT_SCHEMA_VERSION must be a plain int, not str or float."""
        from muse.core.snapshots import _SNAPSHOT_SCHEMA_VERSION

        assert isinstance(_SNAPSHOT_SCHEMA_VERSION, int)

    def test_snapshot_schema_version_is_one(self) -> None:
        """Current schema version is 1 — bump only on breaking layout changes."""
        from muse.core.snapshots import _SNAPSHOT_SCHEMA_VERSION

        assert _SNAPSHOT_SCHEMA_VERSION == 1

    def test_zstd_magic_is_correct(self) -> None:
        """The 4-byte zstd frame magic must match the zstd specification."""
        from muse.core.io import _ZSTD_MAGIC

        assert _ZSTD_MAGIC == b"\x28\xb5\x2f\xfd"

    def test_compress_threshold_is_positive(self) -> None:
        """The compression threshold must be a positive byte count."""
        from muse.core.io import _ZSTD_COMPRESS_THRESHOLD

        assert _ZSTD_COMPRESS_THRESHOLD > 0

    def test_compress_threshold_is_reasonable(self) -> None:
        """Threshold must be large enough that single-file snapshots are not compressed."""
        from muse.core.io import _ZSTD_COMPRESS_THRESHOLD

        assert _ZSTD_COMPRESS_THRESHOLD >= 1024


class TestZstdHelpers:
    def test_zstd_roundtrip(self) -> None:
        """compress → decompress_if_needed must return the original bytes exactly."""
        from muse.core.io import _zstd_compress, zstd_decompress_if_needed

        original = b"hello " * 1_000
        compressed = _zstd_compress(original)
        recovered = zstd_decompress_if_needed(compressed)
        assert recovered == original

    def test_compressed_output_starts_with_magic(self) -> None:
        """zstd output frame must begin with the 4-byte magic sequence."""
        from muse.core.io import _ZSTD_MAGIC, _zstd_compress

        compressed = _zstd_compress(b"data " * 500)
        assert compressed[:4] == _ZSTD_MAGIC

    def test_decompress_noop_on_plain_bytes(self) -> None:
        """Non-zstd bytes are returned unchanged — no corruption."""
        from muse.core.io import zstd_decompress_if_needed

        plain = _json.dumps({"key": "value"}).encode()
        # Equality already covers the "same object returned" case, so a
        # single call and a single comparison are sufficient.
        assert zstd_decompress_if_needed(plain) == plain

    def test_decompress_noop_on_empty(self) -> None:
        """Empty input must pass through as empty bytes."""
        from muse.core.io import zstd_decompress_if_needed

        assert zstd_decompress_if_needed(b"") == b""

    def test_compress_is_smaller_than_input_for_repetitive_data(self) -> None:
        """Highly repetitive input must shrink under compression."""
        from muse.core.io import _zstd_compress

        data = b"aaaa" * 10_000
        assert len(_zstd_compress(data)) < len(data)


# ---------------------------------------------------------------------------
# Tier 2 — Integration: schema_version field in SnapshotRecord
# ---------------------------------------------------------------------------


class TestSchemaVersionField:
    def test_default_schema_version_is_one(self) -> None:
        """Newly created SnapshotRecord defaults to schema_version=1."""
        snap = _make_snapshot()
        assert snap.schema_version == 1

    def test_to_dict_includes_schema_version(self) -> None:
        """Serialized dict must carry the schema_version key."""
        snap = _make_snapshot()
        d = snap.to_dict()
        assert "schema_version" in d
        assert d["schema_version"] == 1

    def test_schema_version_excluded_from_snapshot_id_hash(self) -> None:
        """schema_version is metadata — changing it must not change snapshot_id."""
        from muse.core.snapshots import _verify_snapshot_id

        manifest = {"a.py": _obj_id(0xAAAA)}
        snap_id = compute_snapshot_id(manifest)
        snap_v1 = SnapshotRecord(snapshot_id=snap_id, manifest=manifest, schema_version=1)
        snap_v99 = SnapshotRecord(snapshot_id=snap_id, manifest=manifest, schema_version=99)
        # Both records carry the same snapshot_id; re-verification must pass for both
        _verify_snapshot_id(snap_v1, snap_id, pathlib.Path(""))
        _verify_snapshot_id(snap_v99, snap_id, pathlib.Path(""))

    def test_from_dict_reads_schema_version(self) -> None:
        """from_dict must deserialise schema_version from the stored dict."""
        # This name used to be defined twice in the class; the later definition
        # silently replaced the earlier one. Only one definition is kept.
        snap = _make_snapshot()
        recovered = SnapshotRecord.from_dict(snap.to_dict())
        assert recovered.schema_version == 1

    def test_from_dict_defaults_schema_version_for_old_files(self) -> None:
        """Files written before schema_version was added must read as version 1."""
        snap = _make_snapshot()
        d = snap.to_dict()
        del d["schema_version"]  # simulate a pre-migration file
        recovered = SnapshotRecord.from_dict(d)
        assert recovered.schema_version == 1

    def test_from_dict_defaults_schema_version_for_missing_key(self) -> None:
        """A dict without the schema_version key must read as version 1.

        Same scenario as the "old files" test; the name is kept so existing
        test node IDs keep working, and the body delegates to avoid a
        copy-pasted duplicate.
        """
        self.test_from_dict_defaults_schema_version_for_old_files()


# ---------------------------------------------------------------------------
# Tier 3 — Integration: write / read roundtrip with compression
# ---------------------------------------------------------------------------


class TestCompressionRoundtrip:
    def test_large_snapshot_on_disk_is_zstd_compressed(self, tmp_path: pathlib.Path) -> None:
        """A large snapshot must be written to the unified object store with the correct header."""
        root = _init_repo(tmp_path)
        n = 500
        snap = _make_snapshot(n_files=n)
        write_snapshot(root, snap)

        path = object_path(root, snap.snapshot_id)
        assert path.exists(), "Snapshot not found in unified object store"
        raw = path.read_bytes()
        assert raw.startswith(b"snapshot "), (
            f"Expected 'snapshot ' header in unified store; got {raw[:20]!r}"
        )

    def test_small_snapshot_on_disk_is_not_compressed(self, tmp_path: pathlib.Path) -> None:
        """A small snapshot must not be stored as a bare zstd frame."""
        from muse.core.io import _ZSTD_MAGIC

        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=1)
        write_snapshot(root, snap)

        raw = object_path(root, snap.snapshot_id).read_bytes()
        # NOTE(review): if the object file always begins with a "snapshot "
        # header (as the large-snapshot test asserts), this check can never
        # fail — confirm whether the payload after the header should be
        # inspected instead.
        assert raw[:4] != _ZSTD_MAGIC

    def test_compressed_roundtrip_record_is_identical(self, tmp_path: pathlib.Path) -> None:
        """write_snapshot → read_snapshot must return an identical record (large)."""
        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=500)
        write_snapshot(root, snap)

        loaded = read_snapshot(root, snap.snapshot_id)
        assert loaded is not None
        assert loaded.snapshot_id == snap.snapshot_id
        assert loaded.manifest == snap.manifest
        assert loaded.directories == snap.directories
        assert loaded.schema_version == snap.schema_version

    def test_small_roundtrip_record_is_identical(self, tmp_path: pathlib.Path) -> None:
        """write_snapshot → read_snapshot for a tiny (uncompressed) file."""
        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=2, note="tiny")
        write_snapshot(root, snap)

        loaded = read_snapshot(root, snap.snapshot_id)
        assert loaded is not None
        assert loaded.snapshot_id == snap.snapshot_id
        assert loaded.note == "tiny"
        assert loaded.schema_version == 1

    def test_schema_version_survives_roundtrip(self, tmp_path: pathlib.Path) -> None:
        """schema_version must be preserved by a write → read cycle."""
        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=500)
        write_snapshot(root, snap)

        loaded = read_snapshot(root, snap.snapshot_id)
        assert loaded is not None
        assert loaded.schema_version == 1


# ---------------------------------------------------------------------------
# Tier 4 — E2E: CLI creates a compressed file on disk
# ---------------------------------------------------------------------------


class TestCliCompression:
    def test_cli_commit_writes_compressed_snapshot(self, tmp_path: pathlib.Path) -> None:
        """``muse commit`` must write a snapshot object into the unified object store."""
        from tests.cli_test_helper import CliRunner
        from muse.core.types import fake_id, blob_id

        runner = CliRunner()
        env = {"MUSE_REPO_ROOT": str(tmp_path)}

        # Minimal repo structure
        dot_muse = muse_dir(tmp_path)
        dot_muse.mkdir()
        repo_id = fake_id("repo")
        (dot_muse / "repo.json").write_text(
            _json.dumps({
                "repo_id": repo_id,
                "domain": "code",
                "default_branch": "main",
                "created_at": "2025-01-01T00:00:00+00:00",
            })
        )
        (dot_muse / "HEAD").write_text("ref: refs/heads/main")
        (dot_muse / "refs" / "heads").mkdir(parents=True)
        for d in ("snapshots", "commits", "objects"):
            (dot_muse / d).mkdir()

        # Write 300 source files into the unified object store
        src = tmp_path / "src"
        src.mkdir()
        for i in range(300):
            content = f"module_{i:04d} = {i}\n".encode()
            obj_id = blob_id(content)
            write_object(tmp_path, obj_id, content)
            # Write the exact bytes that were hashed so the worktree file and
            # the stored blob cannot drift apart (no newline translation).
            (src / f"module_{i:04d}.py").write_bytes(content)

        r = runner.invoke(None, ["commit", "-m", "big"], env=env, catch_exceptions=False)
        assert r.exit_code == 0, r.output

        # Find snapshot objects in the unified store (files starting with "snapshot " header)
        obj_dir = objects_dir(tmp_path)
        snap_objects = [
            p for p in obj_dir.rglob("*")
            if p.is_file() and p.read_bytes().startswith(b"snapshot ")
        ]
        assert snap_objects, "No snapshot objects found in unified store after commit"


# ---------------------------------------------------------------------------
# Tier 5 — Stress: 1 000-file manifest
# ---------------------------------------------------------------------------


class TestStress:
    def test_1000_file_snapshot_compress_decompress(self, tmp_path: pathlib.Path) -> None:
        """1 000-file manifest must write and read back correctly under compression."""
        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=1_000)
        write_snapshot(root, snap)

        loaded = read_snapshot(root, snap.snapshot_id)
        assert loaded is not None
        assert len(loaded.manifest) == 1_000
        assert loaded.snapshot_id == snap.snapshot_id


# ---------------------------------------------------------------------------
# Tier 6 — State: pre-compression files remain readable
# ---------------------------------------------------------------------------


class TestBackwardCompat:
    def test_old_uncompressed_snapshot_still_readable(self, tmp_path: pathlib.Path) -> None:
        """A snapshot written to the object store without schema_version must still load."""
        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=5)

        raw_dict = snap.to_dict()
        del raw_dict["schema_version"]  # simulate pre-migration file
        payload = _json.dumps(raw_dict, separators=(",", ":")).encode()

        # Hand-write the object file: "snapshot <len>\0" header followed by raw JSON.
        path = object_path(root, snap.snapshot_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(b"snapshot " + str(len(payload)).encode() + b"\0" + payload)

        loaded = read_snapshot(root, snap.snapshot_id)
        assert loaded is not None
        assert loaded.snapshot_id == snap.snapshot_id
        assert loaded.schema_version == 1  # default applied

    def test_mixed_compressed_uncompressed_in_same_dir(self, tmp_path: pathlib.Path) -> None:
        """Small and large snapshots may coexist in the same store and both load."""
        root = _init_repo(tmp_path)

        small = _make_snapshot(n_files=1)
        large = _make_snapshot(n_files=500)

        write_snapshot(root, small)
        write_snapshot(root, large)

        loaded_small = read_snapshot(root, small.snapshot_id)
        loaded_large = read_snapshot(root, large.snapshot_id)

        assert loaded_small is not None
        assert loaded_large is not None
        assert len(loaded_large.manifest) == 500


# ---------------------------------------------------------------------------
# Tier 7 — Integrity: verify_snapshot_id passes through compression round-trip
# ---------------------------------------------------------------------------


class TestIntegrity:
    def test_verify_snapshot_id_passes_after_compression(self, tmp_path: pathlib.Path) -> None:
        """Hash verification must succeed when reading a compressed snapshot."""
        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=500)
        write_snapshot(root, snap)
        # read_snapshot internally calls _verify_snapshot_id; None means failure
        loaded = read_snapshot(root, snap.snapshot_id)
        assert loaded is not None, "read_snapshot returned None — hash verification failed"

    def test_tampered_compressed_manifest_is_rejected(self, tmp_path: pathlib.Path) -> None:
        """Altering a byte in a snapshot payload must cause read_snapshot to return None."""
        import os

        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=500)
        write_snapshot(root, snap)

        path = object_path(root, snap.snapshot_id)
        raw = bytearray(path.read_bytes())
        # Find end of "snapshot N\0" header and flip a byte in the JSON payload
        header_end = raw.index(ord("\0")) + 1
        # Fail loudly if there is nothing to tamper with; otherwise the final
        # assertion would fail with a misleading message on an untouched file.
        assert len(raw) > header_end + 4, "Snapshot payload too short to tamper with"
        raw[header_end + 4] ^= 0xFF
        # Make the object file writable before overwriting it.
        os.chmod(path, 0o644)
        path.write_bytes(bytes(raw))

        loaded = read_snapshot(root, snap.snapshot_id)
        assert loaded is None, "Tampered snapshot should not load"

    def test_gc_finds_objects_in_compressed_snapshot(self, tmp_path: pathlib.Path) -> None:
        """GC reachability walk must extract object IDs from compressed snapshots."""
        from muse.core.gc import _collect_reachable_objects

        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=500)
        write_snapshot(root, snap)

        reachable: set[str] = _collect_reachable_objects(root)
        # All object IDs in the manifest must appear in the reachable set
        for oid in snap.manifest.values():
            assert oid in reachable, f"Object {oid[:24]}… not found in GC reachable set"


# ---------------------------------------------------------------------------
# Tier 8 — Performance
# ---------------------------------------------------------------------------


class TestPerformance:
    def test_1000_file_roundtrip_under_2s(self, tmp_path: pathlib.Path) -> None:
        """write_snapshot + read_snapshot for 1 000 files must complete within 2 s."""
        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=1_000)

        start = time.perf_counter()
        write_snapshot(root, snap)
        loaded = read_snapshot(root, snap.snapshot_id)
        elapsed = time.perf_counter() - start

        assert loaded is not None
        assert elapsed < 2.0, f"Roundtrip took {elapsed:.3f}s — exceeds 2s budget"


# ---------------------------------------------------------------------------
# Tier 9 — Security
# ---------------------------------------------------------------------------


class TestSecurity:
    def test_zstd_bomb_rejected(self, tmp_path: pathlib.Path) -> None:
        """A zstd-compressed payload that decompresses beyond MAX_MSGPACK_BYTES must be rejected."""
        import zstandard
        from muse.core.io import _ZSTD_MAGIC

        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=1)

        # Build a payload that is huge when decompressed.
        # We use a repetitive structure that compresses very well.
        huge_data = b"\x00" * (MAX_MSGPACK_BYTES + 1)
        compressed = zstandard.ZstdCompressor(level=1).compress(huge_data)
        assert compressed[:4] == _ZSTD_MAGIC

        # NOTE(review): this writes to snapshot_path() while every other test
        # in this module uses object_path(). If read_snapshot no longer reads
        # that location, the assertion below passes simply because the file
        # is never found — confirm the bomb is actually being decoded.
        path = snapshot_path(root, snap.snapshot_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(compressed)

        # read_snapshot must fail gracefully — not crash, not return data
        loaded = read_snapshot(root, snap.snapshot_id)
        assert loaded is None, "Zstd bomb should be rejected, not loaded"

    def test_schema_version_cannot_alter_snapshot_id(self, tmp_path: pathlib.Path) -> None:
        """Two records differing only in schema_version must have the same snapshot_id."""
        manifest = {"src/main.py": _obj_id(0xFFFF)}
        snap_id = compute_snapshot_id(manifest)
        r1 = SnapshotRecord(snapshot_id=snap_id, manifest=manifest, schema_version=1)
        r2 = SnapshotRecord(snapshot_id=snap_id, manifest=manifest, schema_version=42)
        assert r1.snapshot_id == r2.snapshot_id

    def test_symlinked_snapshot_dir_not_written(self, tmp_path: pathlib.Path) -> None:
        """write_snapshot must refuse to write when the object shard dir is a symlink."""
        import shutil
        import tempfile

        root = _init_repo(tmp_path)
        snap = _make_snapshot(n_files=1)

        shard_dir = object_path(root, snap.snapshot_id).parent
        # Create the shard dir together with its parents, then swap the shard
        # dir itself for a symlink pointing outside the repository. The
        # platform temp dir is used instead of a hard-coded "/tmp" so the
        # link target exists on every OS.
        shard_dir.mkdir(parents=True, exist_ok=True)
        shutil.rmtree(shard_dir)
        shard_dir.symlink_to(tempfile.gettempdir())

        with pytest.raises((ValueError, OSError)):
            write_snapshot(root, snap)