"""Tests for muse.core.mpack — MPack build and apply operations.""" from __future__ import annotations import datetime import json import pathlib import pytest from muse.core.object_store import has_object, read_object, write_object from muse.core.mpack import ( BlobPayload, MPack, apply_mpack, build_mpack, ) from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.types import Manifest, NULL_LONG_ID, long_id from muse.core.commits import ( CommitRecord, read_commit, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, read_snapshot, write_snapshot, ) from muse.core.paths import commits_dir, objects_dir, snapshots_dir, muse_dir # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def repo(tmp_path: pathlib.Path) -> pathlib.Path: """Minimal .muse/ repo structure.""" dot_muse = muse_dir(tmp_path) (dot_muse / "commits").mkdir(parents=True) (dot_muse / "snapshots").mkdir(parents=True) (dot_muse / "objects").mkdir(parents=True) (dot_muse / "refs" / "heads").mkdir(parents=True) (dot_muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"})) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") (dot_muse / "refs" / "heads" / "main").write_text("") return tmp_path def _make_object(root: pathlib.Path, content: bytes) -> str: """Write raw bytes into the object store; return the object_id.""" from muse.core.types import blob_id oid = blob_id(content) write_object(root, oid, content) return oid def _make_snapshot(root: pathlib.Path, manifest: Manifest) -> str: """Write a snapshot with a valid content-hash snapshot_id. Returns the snapshot_id.""" snap_id = compute_snapshot_id(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) return snap_id def _make_commit( root: pathlib.Path, snapshot_id: str, message: str = "test", parent: str | None = None, ) -> str: """Write a commit with a valid content-hash commit_id. Returns the commit_id.""" committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) parent_ids = [parent] if parent else [] commit_id = compute_commit_id( parent_ids=parent_ids, snapshot_id=snapshot_id, message=message, committed_at_iso=committed_at.isoformat(), ) c = CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snapshot_id, message=message, committed_at=committed_at, parent_commit_id=parent, ) write_commit(root, c) return commit_id # --------------------------------------------------------------------------- # build_mpack tests # --------------------------------------------------------------------------- class TestBuildMPack: def test_single_commit_no_history(self, repo: pathlib.Path) -> None: content = b"hello world" oid = _make_object(repo, content) snap_id = _make_snapshot(repo, {"file.txt": oid}) c1_id = _make_commit(repo, snap_id) mpack = build_mpack(repo, [c1_id]) assert len(mpack.get("commits") or []) == 1 assert len(mpack.get("snapshots") or []) == 1 assert len(mpack.get("blobs") or []) == 1 assert (mpack.get("blobs") or [{}])[0]["object_id"] == oid def test_object_content_is_raw_bytes(self, repo: pathlib.Path) -> None: content = b"\x00\x01\x02\x03" oid = _make_object(repo, content) snap_id = _make_snapshot(repo, {"bin.dat": oid}) c1_id = _make_commit(repo, snap_id) mpack = build_mpack(repo, [c1_id]) objs = mpack.get("blobs") or [] assert len(objs) == 1 assert objs[0]["content"] == content def test_multi_commit_chain(self, repo: pathlib.Path) -> None: oid1 = _make_object(repo, b"v1") oid2 = _make_object(repo, b"v2") snap1_id = _make_snapshot(repo, {"f.txt": oid1}) snap2_id = _make_snapshot(repo, {"f.txt": oid2}) c1_id = _make_commit(repo, snap1_id) c2_id = _make_commit(repo, snap2_id, parent=c1_id) mpack = build_mpack(repo, [c2_id]) assert len(mpack.get("commits") or []) == 2 assert len(mpack.get("snapshots") or []) == 2 assert len(mpack.get("blobs") or []) == 2 def test_have_excludes_ancestor_commits(self, repo: pathlib.Path) -> None: oid1 = _make_object(repo, b"v1") oid2 = _make_object(repo, b"v2") snap1_id = _make_snapshot(repo, {"f.txt": oid1}) snap2_id = _make_snapshot(repo, {"f.txt": oid2}) c1_id = _make_commit(repo, snap1_id) c2_id = _make_commit(repo, snap2_id, parent=c1_id) mpack = build_mpack(repo, [c2_id], have=[c1_id]) # Only c2 should be in the mpack; c1 is in have. commit_ids = [c["commit_id"] for c in (mpack.get("commits") or [])] assert c2_id in commit_ids assert c1_id not in commit_ids def test_deduplicates_shared_objects(self, repo: pathlib.Path) -> None: shared_oid = _make_object(repo, b"shared") snap1_id = _make_snapshot(repo, {"a.txt": shared_oid}) snap2_id = _make_snapshot(repo, {"b.txt": shared_oid}) c1_id = _make_commit(repo, snap1_id) c2_id = _make_commit(repo, snap2_id, parent=c1_id) mpack = build_mpack(repo, [c2_id]) # Shared object should appear only once. object_ids = [o["object_id"] for o in (mpack.get("blobs") or [])] assert object_ids.count(shared_oid) == 1 def test_empty_commit_ids_returns_empty_mpack(self, repo: pathlib.Path) -> None: mpack = build_mpack(repo, []) assert (mpack.get("commits") or []) == [] assert (mpack.get("blobs") or []) == [] def test_missing_commit_skipped_gracefully(self, repo: pathlib.Path) -> None: # Should not raise even if a commit_id does not exist. mpack = build_mpack(repo, [NULL_LONG_ID]) assert (mpack.get("commits") or []) == [] def test_snapshot_always_included_for_every_commit(self, repo: pathlib.Path) -> None: """Every commit in the mpack must have its snapshot included. This is the data-integrity invariant that prevents the corruption pattern where commits arrive on the remote without their snapshots, making them permanently unreadable after a local .muse wipe. """ oid = _make_object(repo, b"content") snap_id = _make_snapshot(repo, {"a.txt": oid}) c_id = _make_commit(repo, snap_id) mpack = build_mpack(repo, [c_id]) commit_snap_ids = {c["snapshot_id"] for c in (mpack.get("commits") or [])} bundled_snap_ids = {s["snapshot_id"] for s in (mpack.get("snapshots") or [])} assert commit_snap_ids == bundled_snap_ids, ( "Every commit's snapshot_id must appear in the mpack's snapshots list" ) def test_missing_snapshot_raises_not_skips(self, repo: pathlib.Path) -> None: """build_mpack must raise ValueError when a commit's snapshot is absent. Silently skipping was the root cause of the recurring snapshot corruption: commits reached the remote without their snapshots, and subsequent pulls restored commits but not snapshots. """ # Write commit record directly — no snapshot written import datetime from muse.core.ids import hash_commit as compute_commit_id from muse.core.types import long_id as _long_id snap_id = _long_id("ab" * 32) # valid prefixed ID, but no snapshot file exists committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) c_id = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message="orphan", committed_at_iso=committed_at.isoformat(), ) write_commit(repo, CommitRecord( commit_id=c_id, branch="main", snapshot_id=snap_id, message="orphan", committed_at=committed_at, )) with pytest.raises(ValueError, match="Push aborted"): build_mpack(repo, [c_id]) def test_merge_commit_includes_both_parents(self, repo: pathlib.Path) -> None: oid_a = _make_object(repo, b"branch-a") oid_b = _make_object(repo, b"branch-b") snap_a_id = _make_snapshot(repo, {"a.txt": oid_a}) snap_b_id = _make_snapshot(repo, {"b.txt": oid_b}) snap_m_id = _make_snapshot(repo, {"a.txt": oid_a, "b.txt": oid_b}) c_a_id = _make_commit(repo, snap_a_id) c_b_id = _make_commit(repo, snap_b_id) # Merge commit with two parents — compute its ID from both parent hashes. committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) c_merge_id = compute_commit_id( parent_ids=[c_a_id, c_b_id], snapshot_id=snap_m_id, message="merge", committed_at_iso=committed_at.isoformat(), ) c_merge = CommitRecord( commit_id=c_merge_id, branch="main", snapshot_id=snap_m_id, message="merge", committed_at=committed_at, parent_commit_id=c_a_id, parent2_commit_id=c_b_id, ) write_commit(repo, c_merge) mpack = build_mpack(repo, [c_merge_id]) commit_ids = {c["commit_id"] for c in (mpack.get("commits") or [])} assert {c_merge_id, c_a_id, c_b_id}.issubset(commit_ids) # --------------------------------------------------------------------------- # apply_mpack tests # --------------------------------------------------------------------------- class TestApplyMPack: def test_round_trip(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None: """build_mpack → apply_mpack in a fresh repo produces identical data.""" content = b"round trip" oid = _make_object(repo, content) snap_id = _make_snapshot(repo, {"f.txt": oid}) c1_id = _make_commit(repo, snap_id, message="initial") mpack = build_mpack(repo, [c1_id]) # Apply into a fresh repo. dest = tmp_path / "dest" dot_muse = muse_dir(dest) (dot_muse / "commits").mkdir(parents=True) (dot_muse / "snapshots").mkdir(parents=True) (dot_muse / "objects").mkdir(parents=True) result = apply_mpack(dest, mpack) assert result["blobs_written"] == 1 assert has_object(dest, oid) assert read_object(dest, oid) == content assert read_snapshot(dest, snap_id) is not None assert read_commit(dest, c1_id) is not None def test_idempotent_apply(self, repo: pathlib.Path) -> None: """Applying the same mpack twice does not raise and new_count = 0.""" content = b"idempotent" oid = _make_object(repo, content) snap_id = _make_snapshot(repo, {"f.txt": oid}) c1_id = _make_commit(repo, snap_id) mpack = build_mpack(repo, [c1_id]) apply_mpack(repo, mpack) result = apply_mpack(repo, mpack) assert result["blobs_written"] == 0 # All already present. def test_malformed_object_skipped(self, repo: pathlib.Path) -> None: # content must be bytes; passing wrong type is caught gracefully mpack: MPack = { "commits": [], "snapshots": [], "blobs": [BlobPayload(object_id="abc123", content=b"")], } result = apply_mpack(repo, mpack) assert result["blobs_written"] == 0 def test_empty_mpack_is_noop(self, repo: pathlib.Path) -> None: mpack: MPack = {} result = apply_mpack(repo, mpack) assert result["blobs_written"] == 0 def test_apply_preserves_commit_metadata( self, repo: pathlib.Path, tmp_path: pathlib.Path ) -> None: oid = _make_object(repo, b"data") snap_id = _make_snapshot(repo, {"data.bin": oid}) c1_id = _make_commit(repo, snap_id, message="preserve me") mpack = build_mpack(repo, [c1_id]) dest = tmp_path / "d" (commits_dir(dest)).mkdir(parents=True) (snapshots_dir(dest)).mkdir(parents=True) (objects_dir(dest)).mkdir(parents=True) apply_mpack(dest, mpack) commit = read_commit(dest, c1_id) assert commit is not None assert commit.message == "preserve me" assert commit.snapshot_id == snap_id def test_apply_returns_new_object_count( self, repo: pathlib.Path, tmp_path: pathlib.Path ) -> None: oid1 = _make_object(repo, b"obj1") oid2 = _make_object(repo, b"obj2") snap_id = _make_snapshot(repo, {"a": oid1, "b": oid2}) c1_id = _make_commit(repo, snap_id) mpack = build_mpack(repo, [c1_id]) dest = tmp_path / "d" (commits_dir(dest)).mkdir(parents=True) (snapshots_dir(dest)).mkdir(parents=True) (objects_dir(dest)).mkdir(parents=True) result = apply_mpack(dest, mpack) assert result["blobs_written"] == 2 def test_apply_full_manifest_snapshot_from_server( self, repo: pathlib.Path, tmp_path: pathlib.Path ) -> None: """apply_mpack must write snapshots sent in full-manifest format. The server fetch response may include WireSnapshot dicts with a ``manifest`` key (full content, no delta encoding). _apply_snapshot_deltas only understands the ``delta_upsert``/``delta_remove`` format used by build_mpack. When it receives a full-manifest dict: {"snapshot_id": "sha256:...", "manifest": {"f.txt": "sha256:..."}, "directories": [], "created_at": ""} it finds delta_upsert={} and delta_remove=[], reconstructs base={}, computes sha256(empty) = "sha256:e3b0c44...", which mismatches the real snapshot_id → snapshot skipped → pull aborted with "snapshot referenced by commit" error. The fix must handle both formats in _apply_snapshot_deltas: - delta format: ``{snapshot_id, parent_snapshot_id, delta_upsert, delta_remove}`` - full format: ``{snapshot_id, manifest, directories, ...}`` """ oid = _make_object(repo, b"stream content") snap_id = _make_snapshot(repo, {"stream.txt": oid}) c_id = _make_commit(repo, snap_id, message="stream commit") # Simulate what _coerce_snapshot_dict produces from a full-manifest snapshot: # a dict with 'manifest' key, NO 'delta_upsert' or 'delta_remove'. full_manifest_snapshot = { "snapshot_id": snap_id, "manifest": {"stream.txt": oid}, "directories": [], "created_at": "", } commit_dict = read_commit(repo, c_id) assert commit_dict is not None dest = tmp_path / "dest" (commits_dir(dest)).mkdir(parents=True) (snapshots_dir(dest)).mkdir(parents=True) (objects_dir(dest)).mkdir(parents=True) write_object(dest, oid, b"stream content") # object already present mpack: MPack = { "commits": [commit_dict.to_dict()], "snapshots": [full_manifest_snapshot], "blobs": [], } result = apply_mpack(dest, mpack) assert read_snapshot(dest, snap_id) is not None, ( "Snapshot with full manifest format was not written — " "_apply_snapshot_deltas did not handle the 'manifest' key" ) assert result["snapshots_written"] == 1