"""Tests for muse.core.mpack — MPack build and apply operations.""" from __future__ import annotations import datetime import json import pathlib import pytest from muse.core.object_store import has_object, read_object, write_object from muse.core.mpack import ( BlobPayload, MPack, SnapshotDeltaDict, apply_mpack, build_mpack, ) from muse.core.commits import CommitDict from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.types import Manifest, blob_id, fake_id from muse.core.commits import ( CommitRecord, read_commit, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, read_snapshot, write_snapshot, ) from muse.core.paths import commits_dir, objects_dir, snapshots_dir, muse_dir # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def repo(tmp_path: pathlib.Path) -> pathlib.Path: """Minimal .muse/ repo structure.""" dot_muse = muse_dir(tmp_path) (dot_muse / "commits").mkdir(parents=True) (dot_muse / "snapshots").mkdir(parents=True) (dot_muse / "objects").mkdir(parents=True) (dot_muse / "refs" / "heads").mkdir(parents=True) (dot_muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"})) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") (dot_muse / "refs" / "heads" / "main").write_text("") return tmp_path def _make_object(root: pathlib.Path, content: bytes) -> str: """Write raw bytes into the object store; return the object_id.""" oid = blob_id(content) write_object(root, oid, content) return oid def _make_snapshot(root: pathlib.Path, manifest: Manifest) -> str: """Write a snapshot with a valid content-hash snapshot_id. Returns the snapshot_id.""" snap_id = compute_snapshot_id(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) return snap_id def _make_commit( root: pathlib.Path, snapshot_id: str, message: str = "test", parent: str | None = None, ) -> str: """Write a commit with a valid content-hash commit_id. Returns the commit_id.""" committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) parent_ids = [parent] if parent else [] commit_id = compute_commit_id( parent_ids=parent_ids, snapshot_id=snapshot_id, message=message, committed_at_iso=committed_at.isoformat(), ) c = CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snapshot_id, message=message, committed_at=committed_at, parent_commit_id=parent, ) write_commit(root, c) return commit_id # --------------------------------------------------------------------------- # build_mpack tests # --------------------------------------------------------------------------- class TestBuildPack: def test_single_commit_no_history(self, repo: pathlib.Path) -> None: content = b"hello world" oid = _make_object(repo, content) snap_id = _make_snapshot(repo, {"file.txt": oid}) c1_id = _make_commit(repo, snap_id) mpack = build_mpack(repo, [c1_id]) assert len(mpack.get("commits") or []) == 1 assert len(mpack.get("snapshots") or []) == 1 assert len(mpack.get("blobs") or []) == 1 assert (mpack.get("blobs") or [{}])[0]["object_id"] == oid def test_object_content_is_raw_bytes(self, repo: pathlib.Path) -> None: content = b"\x00\x01\x02\x03" oid = _make_object(repo, content) snap_id = _make_snapshot(repo, {"bin.dat": oid}) c1_id = _make_commit(repo, snap_id) mpack = build_mpack(repo, [c1_id]) objs = mpack.get("blobs") or [] assert len(objs) == 1 assert objs[0]["content"] == content def test_multi_commit_chain(self, repo: pathlib.Path) -> None: oid1 = _make_object(repo, b"v1") oid2 = _make_object(repo, b"v2") snap1_id = _make_snapshot(repo, {"f.txt": oid1}) snap2_id = _make_snapshot(repo, {"f.txt": oid2}) c1_id = _make_commit(repo, snap1_id) c2_id = _make_commit(repo, snap2_id, parent=c1_id) mpack = build_mpack(repo, [c2_id]) assert len(mpack.get("commits") or []) == 2 assert len(mpack.get("snapshots") or []) == 2 assert len(mpack.get("blobs") or []) == 2 def test_have_excludes_ancestor_commits(self, repo: pathlib.Path) -> None: oid1 = _make_object(repo, b"v1") oid2 = _make_object(repo, b"v2") snap1_id = _make_snapshot(repo, {"f.txt": oid1}) snap2_id = _make_snapshot(repo, {"f.txt": oid2}) c1_id = _make_commit(repo, snap1_id) c2_id = _make_commit(repo, snap2_id, parent=c1_id) mpack = build_mpack(repo, [c2_id], have=[c1_id]) # Only c2 should be in the mpack; c1 is in have. commit_ids = [c["commit_id"] for c in (mpack.get("commits") or [])] assert c2_id in commit_ids assert c1_id not in commit_ids def test_deduplicates_shared_objects(self, repo: pathlib.Path) -> None: shared_oid = _make_object(repo, b"shared") snap1_id = _make_snapshot(repo, {"a.txt": shared_oid}) snap2_id = _make_snapshot(repo, {"b.txt": shared_oid}) c1_id = _make_commit(repo, snap1_id) c2_id = _make_commit(repo, snap2_id, parent=c1_id) mpack = build_mpack(repo, [c2_id]) # Shared object should appear only once. object_ids = [o["object_id"] for o in (mpack.get("blobs") or [])] assert object_ids.count(shared_oid) == 1 def test_empty_commit_ids_returns_empty_bundle(self, repo: pathlib.Path) -> None: mpack = build_mpack(repo, []) assert (mpack.get("commits") or []) == [] assert (mpack.get("blobs") or []) == [] def test_missing_commit_skipped_gracefully(self, repo: pathlib.Path) -> None: # Should not raise even if a commit_id does not exist. mpack = build_mpack(repo, [fake_id("nonexistent")]) assert (mpack.get("commits") or []) == [] def test_snapshot_always_included_for_every_commit(self, repo: pathlib.Path) -> None: """Every commit in the pack must have its snapshot included. This is the data-integrity invariant that prevents the corruption pattern where commits arrive on the remote without their snapshots, making them permanently unreadable after a local .muse wipe. """ oid = _make_object(repo, b"content") snap_id = _make_snapshot(repo, {"a.txt": oid}) c_id = _make_commit(repo, snap_id) mpack = build_mpack(repo, [c_id]) commit_snap_ids = {c["snapshot_id"] for c in (mpack.get("commits") or [])} bundled_snap_ids = {s["snapshot_id"] for s in (mpack.get("snapshots") or [])} assert commit_snap_ids == bundled_snap_ids, ( "Every commit's snapshot_id must appear in the mpack's snapshots list" ) def test_missing_snapshot_raises_not_skips(self, repo: pathlib.Path) -> None: """build_mpack must raise ValueError when a commit's snapshot is absent. Silently skipping was the root cause of the recurring snapshot corruption: commits reached the remote without their snapshots, and subsequent pulls restored commits but not snapshots. """ # Write commit record directly — no snapshot written import datetime from muse.core.ids import hash_commit as compute_commit_id snap_id = fake_id("ab-missing-snapshot") # valid prefixed ID, but no snapshot file exists committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) c_id = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message="orphan", committed_at_iso=committed_at.isoformat(), ) write_commit(repo, CommitRecord( commit_id=c_id, branch="main", snapshot_id=snap_id, message="orphan", committed_at=committed_at, )) with pytest.raises(ValueError, match="Push aborted"): build_mpack(repo, [c_id]) def test_merge_commit_includes_both_parents(self, repo: pathlib.Path) -> None: oid_a = _make_object(repo, b"branch-a") oid_b = _make_object(repo, b"branch-b") snap_a_id = _make_snapshot(repo, {"a.txt": oid_a}) snap_b_id = _make_snapshot(repo, {"b.txt": oid_b}) snap_m_id = _make_snapshot(repo, {"a.txt": oid_a, "b.txt": oid_b}) c_a_id = _make_commit(repo, snap_a_id) c_b_id = _make_commit(repo, snap_b_id) # Merge commit with two parents — compute its ID from both parent hashes. committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) c_merge_id = compute_commit_id( parent_ids=[c_a_id, c_b_id], snapshot_id=snap_m_id, message="merge", committed_at_iso=committed_at.isoformat(), ) c_merge = CommitRecord( commit_id=c_merge_id, branch="main", snapshot_id=snap_m_id, message="merge", committed_at=committed_at, parent_commit_id=c_a_id, parent2_commit_id=c_b_id, ) write_commit(repo, c_merge) mpack = build_mpack(repo, [c_merge_id]) commit_ids = {c["commit_id"] for c in (mpack.get("commits") or [])} assert {c_merge_id, c_a_id, c_b_id}.issubset(commit_ids) # --------------------------------------------------------------------------- # apply_mpack tests # --------------------------------------------------------------------------- class TestApplyPack: def test_round_trip(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None: """build_mpack → apply_mpack in a fresh repo produces identical data.""" content = b"round trip" oid = _make_object(repo, content) snap_id = _make_snapshot(repo, {"f.txt": oid}) c1_id = _make_commit(repo, snap_id, message="initial") mpack = build_mpack(repo, [c1_id]) # Apply into a fresh repo. dest = tmp_path / "dest" dot_muse = muse_dir(dest) (dot_muse / "commits").mkdir(parents=True) (dot_muse / "snapshots").mkdir(parents=True) (dot_muse / "objects").mkdir(parents=True) result = apply_mpack(dest, mpack) assert result["blobs_written"] ==1 assert has_object(dest, oid) assert read_object(dest, oid) == content assert read_snapshot(dest, snap_id) is not None assert read_commit(dest, c1_id) is not None def test_idempotent_apply(self, repo: pathlib.Path) -> None: """Applying the same mpack twice does not raise and new_count = 0.""" content = b"idempotent" oid = _make_object(repo, content) snap_id = _make_snapshot(repo, {"f.txt": oid}) c1_id = _make_commit(repo, snap_id) mpack = build_mpack(repo, [c1_id]) apply_mpack(repo, mpack) result = apply_mpack(repo, mpack) assert result["blobs_written"] ==0 # All already present. def test_malformed_object_skipped(self, repo: pathlib.Path) -> None: # content must be bytes; passing wrong type is caught gracefully mpack: MPack = { "commits": [], "snapshots": [], "blobs": [BlobPayload(object_id="abc123", content=b"")], } result = apply_mpack(repo, mpack) assert result["blobs_written"] ==0 def test_empty_bundle_is_noop(self, repo: pathlib.Path) -> None: mpack: MPack = {} result = apply_mpack(repo, mpack) assert result["blobs_written"] ==0 def test_apply_preserves_commit_metadata( self, repo: pathlib.Path, tmp_path: pathlib.Path ) -> None: oid = _make_object(repo, b"data") snap_id = _make_snapshot(repo, {"data.bin": oid}) c1_id = _make_commit(repo, snap_id, message="preserve me") mpack = build_mpack(repo, [c1_id]) dest = tmp_path / "d" (commits_dir(dest)).mkdir(parents=True) (snapshots_dir(dest)).mkdir(parents=True) (objects_dir(dest)).mkdir(parents=True) apply_mpack(dest, mpack) commit = read_commit(dest, c1_id) assert commit is not None assert commit.message == "preserve me" assert commit.snapshot_id == snap_id def test_apply_returns_new_object_count( self, repo: pathlib.Path, tmp_path: pathlib.Path ) -> None: oid1 = _make_object(repo, b"obj1") oid2 = _make_object(repo, b"obj2") snap_id = _make_snapshot(repo, {"a": oid1, "b": oid2}) c1_id = _make_commit(repo, snap_id) mpack = build_mpack(repo, [c1_id]) dest = tmp_path / "d" (commits_dir(dest)).mkdir(parents=True) (snapshots_dir(dest)).mkdir(parents=True) (objects_dir(dest)).mkdir(parents=True) result = apply_mpack(dest, mpack) assert result["blobs_written"] ==2 # --------------------------------------------------------------------------- # Commit-without-snapshot guard — regression for the "snaps=0" pull bug. # # When the server returns a commit but omits its snapshot (e.g. snaps=0 due to # a wire_fetch bug), apply_mpack must NOT write the commit to the local store. # Writing the commit without its snapshot leaves the store in an inconsistent # state: on the next pull the commit is in `have`, the server returns nothing # new, and pull aborts with "snapshot missing" forever. # --------------------------------------------------------------------------- def _wire_snap(manifest: Manifest) -> SnapshotDeltaDict: """Build a full-manifest wire snapshot dict (no delta chain).""" snap_id = compute_snapshot_id(manifest) return SnapshotDeltaDict( snapshot_id=snap_id, parent_snapshot_id=None, delta_upsert=manifest, delta_remove=[], ) def _wire_commit(snap_id: str, message: str = "c", parent: str | None = None) -> CommitDict: """Build a minimal wire commit dict whose commit_id matches hash_commit exactly.""" import datetime from muse.core.ids import hash_commit committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) parent_ids = [parent] if parent else [] author = "gabriel" commit_id = hash_commit( parent_ids=parent_ids, snapshot_id=snap_id, message=message, committed_at_iso=committed_at.isoformat(), author=author, signer_public_key="", ) return { "commit_id": commit_id, "branch": "main", "snapshot_id": snap_id, "message": message, "committed_at": committed_at.isoformat(), "parent_commit_id": parent, "parent2_commit_id": None, "author": author, "agent_id": "", "model_id": "", "toolchain_id": "", } class TestApplyMpackMissingSnapshotGuard: """apply_mpack must not write a commit whose snapshot is absent from both the mpack and the local store (snaps=0 wire_fetch bug regression).""" def test_commit_not_written_when_snapshot_absent_from_mpack( self, repo: pathlib.Path ) -> None: """Core regression: commit arrives in mpack but its snapshot does not. The commit must NOT be written to the local store.""" oid = blob_id(b"some content") snap_id = compute_snapshot_id({"f.txt": oid}) wire_c = _wire_commit(snap_id) # mpack has the commit but zero snapshots — simulates snaps=0 result = apply_mpack(repo, {"commits": [wire_c], "snapshots": [], "blobs": []}) assert result["commits_written"] == 0, ( "commit must not be written when its snapshot is absent from the mpack" ) assert read_commit(repo, wire_c["commit_id"]) is None, ( "commit must not be present in the local store" ) def test_next_pull_can_succeed_after_refused_commit( self, repo: pathlib.Path ) -> None: """After the guard refuses the commit, a second apply with the snapshot included must succeed and write both commit and snapshot.""" oid = blob_id(b"content") manifest = {"f.txt": oid} snap_id = compute_snapshot_id(manifest) wire_c = _wire_commit(snap_id) wire_s = _wire_snap(manifest) # First apply: snapshot missing → commit refused r1 = apply_mpack(repo, {"commits": [wire_c], "snapshots": [], "blobs": []}) assert r1["commits_written"] == 0 # Second apply: snapshot included → commit written r2 = apply_mpack(repo, {"commits": [wire_c], "snapshots": [wire_s], "blobs": []}) assert r2["commits_written"] == 1 assert read_commit(repo, wire_c["commit_id"]) is not None assert read_snapshot(repo, snap_id) is not None def test_commit_written_when_snapshot_already_in_local_store( self, repo: pathlib.Path ) -> None: """If the snapshot is already in the local store (from a prior fetch), the commit must be written even if the mpack has zero snapshots.""" oid = _make_object(repo, b"pre-existing content") snap_id = _make_snapshot(repo, {"f.txt": oid}) # already in local store wire_c = _wire_commit(snap_id) result = apply_mpack(repo, {"commits": [wire_c], "snapshots": [], "blobs": []}) assert result["commits_written"] == 1, ( "commit must be written when its snapshot is already in the local store" ) assert read_commit(repo, wire_c["commit_id"]) is not None def test_commit_written_when_snapshot_in_same_mpack( self, repo: pathlib.Path ) -> None: """Happy path: both commit and snapshot in mpack → commit is written.""" oid = blob_id(b"happy path") manifest = {"g.txt": oid} snap_id = compute_snapshot_id(manifest) wire_c = _wire_commit(snap_id) wire_s = _wire_snap(manifest) result = apply_mpack(repo, {"commits": [wire_c], "snapshots": [wire_s], "blobs": []}) assert result["commits_written"] == 1 assert read_commit(repo, wire_c["commit_id"]) is not None