"""TDD — bundle objects must be zstd-compressed on the bundle push path. Git compresses pack objects in C (libxdiff/zlib). Our custom Python delta loop was O(n) per blob in pure Python — 18s for 5162 real-world objects. The fix: use zstandard (C extension) for per-object compression. No custom delta loop. One C call per object. Three invariants: 1. Wire size with compression < 40% of raw payload. zstd on Python-like text achieves ~4-6x ratio easily. 2. Every compressed object reconstructs to bytes matching its object_id. sha256(zstd.decompress(content)) == object_id. This is the correctness proof — content-addressing is the verification. 3. build_mpack_from_walk(compress=True) completes in < 1s for 200 commits × 100 files × 5 changed/commit. This is the speed gate — C extension, not Python loop. Repo dimensions: 200 commits × 100 files × 5 changed/commit, BLOB_SIZE=1024. """ from __future__ import annotations import datetime import pathlib import time import pytest import zstandard from muse.core.object_store import write_object from muse.core.mpack import build_mpack_from_walk, walk_commits from muse.core.paths import muse_dir from muse.core.ids import hash_commit, hash_snapshot from muse.core.refs import write_branch_ref from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import blob_id # --------------------------------------------------------------------------- # Dimensions # --------------------------------------------------------------------------- _N_FILES = 100 _N_COMMITS = 200 _FILES_CHANGED = 5 _BLOB_SIZE = 1024 # --------------------------------------------------------------------------- # Repo builder # --------------------------------------------------------------------------- def _make_repo(tmp: pathlib.Path) -> tuple[pathlib.Path, str]: tmp.mkdir(parents=True, exist_ok=True) dot = muse_dir(tmp) dot.mkdir() (dot / "repo.json").write_text('{"repo_id":"compress-test","owner":"gabriel"}') for d in ("commits", "snapshots", "objects"): (dot / d).mkdir() (dot / "refs" / "heads").mkdir(parents=True) (dot / "HEAD").write_text("ref: refs/heads/main\n") (dot / "config.toml").write_text("") blob_ids: list[str] = [] for i in range(_N_FILES): data = (f"# file {i:04d}\n" + f"x = {i}\n" * (_BLOB_SIZE // 8)).encode() oid = blob_id(data) write_object(tmp, oid, data) blob_ids.append(oid) base_manifest: dict[str, str] = { f"src/file_{i:04d}.py": blob_ids[i] for i in range(_N_FILES) } parent: str | None = None tip = "" ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) for i in range(_N_COMMITS): manifest = dict(base_manifest) for j in range(_FILES_CHANGED): idx = (i * _FILES_CHANGED + j) % _N_FILES data = ( f"# file {idx:04d}\n" + f"x = {idx}\n" * (_BLOB_SIZE // 8) + f"# commit {i:05d}\n" ).encode() oid = blob_id(data) write_object(tmp, oid, data) manifest[f"src/file_{idx:04d}.py"] = oid sid = hash_snapshot(manifest) write_snapshot(tmp, SnapshotRecord(snapshot_id=sid, manifest=manifest)) msg = f"commit-{i:05d}" cid = hash_commit( parent_ids=[parent] if parent else [], snapshot_id=sid, message=msg, committed_at_iso=ts.isoformat(), author="gabriel", ) write_commit(tmp, CommitRecord( commit_id=cid, branch="main", snapshot_id=sid, message=msg, committed_at=ts, parent_commit_id=parent, parent2_commit_id=None, author="gabriel", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", )) parent = cid tip = cid ts += datetime.timedelta(seconds=60) write_branch_ref(tmp, "main", tip) return tmp, tip # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- def test_bundle_compression_reduces_wire_size(tmp_path: pathlib.Path) -> None: """Compressed payload must be < 40% of raw — zstd on text achieves 4-6x.""" repo, head = _make_repo(tmp_path / "repo") walk = walk_commits(repo, [head]) bundle_raw = build_mpack_from_walk(repo, walk) bundle_comp = build_mpack_from_walk(repo, walk, compress=True) raw_bytes = sum(len(o.get("content") or b"") for o in bundle_raw["blobs"]) comp_bytes = sum(len(o.get("content") or b"") for o in bundle_comp["blobs"]) ratio = comp_bytes / raw_bytes if raw_bytes else 1.0 assert ratio < 0.40, ( f"compressed payload is {ratio:.1%} of raw — expected < 40%\n" f" raw: {raw_bytes:,} bytes\n" f" compressed: {comp_bytes:,} bytes\n" f" blobs: {len(bundle_comp['blobs'])}" ) def test_bundle_compressed_objects_reconstructable(tmp_path: pathlib.Path) -> None: """sha256(zstd.decompress(content)) == object_id for every compressed object. Content-addressing is the proof — no external verification needed. """ repo, head = _make_repo(tmp_path / "repo") walk = walk_commits(repo, [head]) bundle = build_mpack_from_walk(repo, walk, compress=True) dctx = zstandard.ZstdDecompressor() for obj in bundle["blobs"]: enc = obj.get("encoding", "raw") content: bytes = obj.get("content") or b"" oid: str = obj["object_id"] if enc == "raw": assert blob_id(content) == oid elif enc == "zstd": raw = dctx.decompress(content) assert blob_id(raw) == oid, f"zstd object {oid[:16]}… sha256 mismatch" else: pytest.fail(f"unexpected encoding {enc!r} on object {oid[:16]}…") def test_bundle_compression_is_fast(tmp_path: pathlib.Path) -> None: """build_mpack_from_walk(compress=True) must complete in < 1s. zstd is a C extension — one call per object, no Python loop. This gate proves we are not running a pure-Python compression loop. """ repo, head = _make_repo(tmp_path / "repo") walk = walk_commits(repo, [head]) t0 = time.perf_counter() bundle = build_mpack_from_walk(repo, walk, compress=True) elapsed = time.perf_counter() - t0 n_blobs = len(bundle["blobs"]) assert elapsed < 1.0, ( f"build_mpack_from_walk(compress=True) took {elapsed:.2f}s — expected < 1s\n" f" {_N_COMMITS} commits × {_N_FILES} files × {_FILES_CHANGED} changed\n" f" {n_blobs} blobs" ) print(f"\n {n_blobs} blobs compressed in {elapsed*1000:.0f}ms")