"""TDD — apply_mpack writes pack, not loose objects (issue #70 Phase 2). After this change: - Wire-received objects land in a single .mpack + .idx file pair. - Zero loose object writes for wire-received blobs. - Commits and snapshots still go to .muse/commits/ and .muse/snapshots/. - read_object() still works transparently via the pack store fallthrough. - All existing safety invariants (dedup, size cap, integrity check, failed-object propagation) are preserved. """ from __future__ import annotations import datetime import json import pathlib from unittest.mock import patch import pytest from muse.core.mpack import MPack, apply_mpack from muse.core.object_store import has_object, read_object from muse.core.paths import muse_dir, packs_dir from muse.core.ids import hash_commit, hash_snapshot from muse.core.store import ( CommitRecord, SnapshotRecord, read_commit, read_snapshot, ) from muse.core.types import blob_id _DT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- def _init_repo(root: pathlib.Path) -> pathlib.Path: dot = muse_dir(root) dot.mkdir(parents=True) (dot / "repo.json").write_text(json.dumps({"repo_id": "ps-test"})) for d in ("commits", "snapshots", "objects", "refs/heads"): (dot / d).mkdir(parents=True, exist_ok=True) (dot / "HEAD").write_text("ref: refs/heads/main\n") (dot / "config.toml").write_text("") return root def _make_mpack(n_objects: int = 3) -> tuple[MPack, list[tuple[str, bytes]]]: """Build a minimal MPack with *n_objects* blobs, one snapshot, one commit.""" objects: list[tuple[str, bytes]] = [] manifest: dict[str, str] = {} for i in range(n_objects): content = f"file-content-{i}".encode() * 16 oid = blob_id(content) objects.append((oid, content)) manifest[f"file_{i}.txt"] = oid sid = hash_snapshot(manifest) cid = hash_commit( parent_ids=[], snapshot_id=sid, message="test commit", committed_at_iso=_DT.isoformat(), ) mpack: MPack = { "objects": [{"object_id": oid, "content": raw} for oid, raw in objects], "snapshots": [{ "snapshot_id": sid, "parent_snapshot_id": None, "delta_add": manifest, "delta_remove": [], }], "commits": [CommitRecord( commit_id=cid, branch="main", snapshot_id=sid, message="test commit", committed_at=_DT, parent_commit_id=None, parent2_commit_id=None, author="", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", ).to_dict()], "tags": [], } return mpack, objects # --------------------------------------------------------------------------- # Core behaviour # --------------------------------------------------------------------------- class TestApplyMpackWritesPack: def test_blobs_go_to_pack_not_loose(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) mpack, objects = _make_mpack(5) apply_mpack(repo, mpack) loose_dir = muse_dir(repo) / "objects" / "sha256" loose_files = {p for p in loose_dir.rglob("*") if p.is_file()} if loose_dir.exists() else set() blob_ids = {oid for oid, _ in objects} # Blobs must go to the pack store — none should appear as loose objects. for oid in blob_ids: _, hex_part = oid.split(":", 1) loose_path = loose_dir / hex_part[:2] / hex_part[2:] assert loose_path not in loose_files, f"blob {oid} written as loose object" def test_writes_exactly_one_pack_and_one_idx(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) mpack, _ = _make_mpack(5) apply_mpack(repo, mpack) pack_dir = packs_dir(repo) mpack_files = list(pack_dir.glob("*.mpack")) idx_files = list(pack_dir.glob("*.idx")) assert len(mpack_files) == 1, f"expected 1 .mpack, got {len(mpack_files)}" assert len(idx_files) == 1, f"expected 1 .idx, got {len(idx_files)}" def test_objects_readable_via_read_object(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) mpack, objects = _make_mpack(5) apply_mpack(repo, mpack) for oid, content in objects: assert read_object(repo, oid) == content def test_has_object_finds_packed_objects(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) mpack, objects = _make_mpack(3) apply_mpack(repo, mpack) for oid, _ in objects: assert has_object(repo, oid) def test_commits_still_written_to_commits_dir(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) mpack, _ = _make_mpack(2) cid = mpack["commits"][0]["commit_id"] apply_mpack(repo, mpack) assert read_commit(repo, cid) is not None def test_snapshots_still_written_to_snapshots_dir(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) mpack, _ = _make_mpack(2) sid = mpack["snapshots"][0]["snapshot_id"] apply_mpack(repo, mpack) assert read_snapshot(repo, sid) is not None def test_xl_objects_produce_two_files_not_thousands(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) mpack, _ = _make_mpack(500) apply_mpack(repo, mpack) pack_dir = packs_dir(repo) total_files = len(list(pack_dir.glob("*"))) assert total_files == 2, f"expected 2 files (1 .mpack + 1 .idx), got {total_files}" def test_apply_mpack_result_counts_objects_written(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) mpack, objects = _make_mpack(4) result = apply_mpack(repo, mpack) assert result["objects_written"] == 4 assert result["objects_skipped"] == 0 def test_apply_mpack_idempotent(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) mpack, objects = _make_mpack(3) apply_mpack(repo, mpack) result2 = apply_mpack(repo, mpack) # Second apply: all objects already present → all skipped assert result2["objects_written"] == 0 assert result2["objects_skipped"] == 3 # Still exactly one pack file (no duplicate written) assert len(list(packs_dir(repo).glob("*.mpack"))) == 1 # --------------------------------------------------------------------------- # Safety invariants preserved # --------------------------------------------------------------------------- class TestSafetyInvariantsPreserved: def test_poisoned_object_skips_its_snapshot_and_commit(self, tmp_path: pathlib.Path) -> None: """Content/ID mismatch → object, snapshot, and commit all skipped.""" repo = _init_repo(tmp_path) content = b"legitimate content" oid = blob_id(content) bad_content = b"poisoned content" # wrong bytes for this oid manifest = {"file.txt": oid} sid = hash_snapshot(manifest) cid = hash_commit( parent_ids=[], snapshot_id=sid, message="poisoned", committed_at_iso=_DT.isoformat(), ) mpack: MPack = { "objects": [{"object_id": oid, "content": bad_content}], "snapshots": [{"snapshot_id": sid, "parent_snapshot_id": None, "delta_add": manifest, "delta_remove": []}], "commits": [CommitRecord( commit_id=cid, branch="main", snapshot_id=sid, message="poisoned", committed_at=_DT, parent_commit_id=None, parent2_commit_id=None, author="", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", ).to_dict()], "tags": [], } result = apply_mpack(repo, mpack) assert not has_object(repo, oid) assert read_snapshot(repo, sid) is None assert read_commit(repo, cid) is None assert oid in result["failed_objects"] def test_oserror_on_write_pack_aborts_cleanly(self, tmp_path: pathlib.Path) -> None: """OSError from write_pack must propagate before any snapshot or commit is written.""" repo = _init_repo(tmp_path) mpack, _ = _make_mpack(1) sid = mpack["snapshots"][0]["snapshot_id"] cid = mpack["commits"][0]["commit_id"] with patch("muse.core.mpack.write_pack", side_effect=OSError("disk full")): with pytest.raises(OSError, match="disk full"): apply_mpack(repo, mpack) assert read_snapshot(repo, sid) is None assert read_commit(repo, cid) is None def test_duplicate_object_ids_skipped(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) content = b"dedup me" oid = blob_id(content) mpack: MPack = { "objects": [ {"object_id": oid, "content": content}, {"object_id": oid, "content": content}, # duplicate ], "snapshots": [], "commits": [], "tags": [], } result = apply_mpack(repo, mpack) assert result["objects_written"] == 1 assert result["objects_skipped"] == 1 def test_oversized_object_tracked_as_failed(self, tmp_path: pathlib.Path) -> None: from muse.core.validation import MAX_OBJECT_WRITE_BYTES repo = _init_repo(tmp_path) big = b"x" * (MAX_OBJECT_WRITE_BYTES + 1) oid = blob_id(big) mpack: MPack = { "objects": [{"object_id": oid, "content": big}], "snapshots": [], "commits": [], "tags": [], } result = apply_mpack(repo, mpack) assert oid in result["failed_objects"] assert not has_object(repo, oid) def test_empty_objects_list_writes_no_pack(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) mpack: MPack = {"objects": [], "snapshots": [], "commits": [], "tags": []} apply_mpack(repo, mpack) assert not packs_dir(repo).exists() or not list(packs_dir(repo).glob("*.mpack"))