"""Object store write taxonomy — exhaustive correctness and safety tests.

Every path that writes OR deletes objects is enumerated here.  Each test
targets one invariant.  If a test fails, it means a write or delete path is
broken; fix the production code, not the test.

Write paths covered
-------------------
W-1  write_object()             — primary low-level write
W-2  write_object_from_path()   — write from filesystem file
W-3  commit workflow            — muse commit writes blobs then snapshot
W-4  shelf save                 — blobs written before shelf entry
W-5  fetch / pull _on_object    — objects written on receive
W-6  apply_mpack                — mpack unbundle writes objects
W-7  domain merge               — plugin merge writes merged blob
W-8  hash_object --write        — explicit low-level write

Delete paths covered
--------------------
D-1  gc non-full (default)        — orphan sweep via snapshots walker
D-2  gc full                      — tight reachability from live refs
D-3  gc full multi-branch         — objects on ALL branches survive
D-4  gc full object normalisation — sha256: prefixed IDs in reachable set
D-5  prune                        — mirrors gc non-full with expire window
D-6  maintenance gc task          — calls run_gc with full=True

Consistency invariants
----------------------
C-1  write → has_object True
C-2  write → object_state PRESENT
C-3  write → iter_stored_objects finds it
C-4  has_object and object_state agree
C-5  object_path canonical location
C-6  no write → object_state MISSING (no promisors)
C-7  no write → object_state PROMISED (promisors configured)
"""

from __future__ import annotations

import datetime
import json
import pathlib
import tempfile
from collections.abc import Mapping
from typing import TypedDict

import pytest

from muse.core.types import Manifest, blob_id, long_id, split_id
from muse.core.gc import run_gc, _collect_reachable_snapshots, _collect_reachable_commits
from muse.core.object_availability import ObjectState, load_promisor_remotes, object_state
from muse.core.object_store import (
    has_object,
    iter_stored_objects,
    object_path,
    read_object,
    write_object,
    write_object_from_path,
)
from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
from muse.core.commits import (
    CommitRecord,
    write_commit,
)
from muse.core.snapshots import (
    SnapshotRecord,
    write_snapshot,
)
from muse.core.shelf import write_shelf_entry
from muse.core.paths import muse_dir, objects_dir, ref_path, shelf_dir


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------


def _repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Minimal .muse repo skeleton.

    Creates the object/commit/snapshot/ref directories plus ``repo.json`` and
    a ``HEAD`` file pointing at ``refs/heads/main``, then returns *tmp_path*
    unchanged so it can be used as the repo root.
    """
    muse = muse_dir(tmp_path)
    for d in ("objects/sha256", "commits/sha256", "snapshots/sha256", "refs/heads"):
        (muse / d).mkdir(parents=True, exist_ok=True)
    (muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"}))
    (muse / "HEAD").write_text("ref: refs/heads/main\n")
    return tmp_path


def _write_blob(repo: pathlib.Path, content: bytes) -> str:
    """Store *content* under its ``blob_id`` and return that id."""
    oid = blob_id(content)
    write_object(repo, oid, content)
    return oid


class _ShelfEntryData(TypedDict):
    """Shape of the dict handed to ``write_shelf_entry`` by these tests."""

    snapshot: dict[str, str]
    branch: str
    created_at: str
    # Derived from the other three fields; see _write_shelf_entry.
    id: str


def _write_shelf_entry(repo: pathlib.Path, snapshot: Mapping[str, str]) -> None:
    """Write a shelf entry for *snapshot* with a content-derived id.

    The id is ``sha256:`` plus the second component of
    ``split_id(blob_id(...))`` computed over the sorted-key JSON encoding of
    the entry *without* its ``id`` field.
    """
    manifest = dict(snapshot)
    branch = "main"
    created_at = "2026-01-01T00:00:00+00:00"

    # The id is computed over the entry as it looks before "id" is attached.
    unsigned = {"snapshot": manifest, "branch": branch, "created_at": created_at}
    raw_bytes = json.dumps(unsigned, sort_keys=True).encode()
    _, hex_id = split_id(blob_id(raw_bytes))

    # Key order matches the historical construction ("id" appended last).
    entry: _ShelfEntryData = {
        "snapshot": manifest,
        "branch": branch,
        "created_at": created_at,
        "id": f"sha256:{hex_id}",
    }
    write_shelf_entry(repo, entry)


def _write_snap(repo: pathlib.Path, manifest: Manifest) -> str:
    """Write a snapshot record for *manifest* and return its snapshot id."""
    snap_id = compute_snapshot_id(manifest)
    write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
    return snap_id


def _write_commit_on_branch(
    repo: pathlib.Path,
    snap_id: str,
    branch: str = "main",
    parent_id: str | None = None,
    message: str = "test",
) -> str:
    """Write a commit for *snap_id* and point *branch*'s ref file at it.

    The commit timestamp is fixed (2026-01-01 UTC), so the commit id depends
    only on the parent, snapshot id and message — not on the branch name.
    Returns the commit id.
    """
    committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    parent_ids = [parent_id] if parent_id else []
    commit_id = compute_commit_id(
        parent_ids=parent_ids,
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )
    write_commit(
        repo,
        CommitRecord(
            commit_id=commit_id,
            branch=branch,
            snapshot_id=snap_id,
            message=message,
            committed_at=committed_at,
            parent_commit_id=parent_id,
        ),
    )
    ref = ref_path(repo, branch)
    # Branch names such as "feat/x" need intermediate directories.
    ref.parent.mkdir(parents=True, exist_ok=True)
    ref.write_text(commit_id)
    return commit_id


# ---------------------------------------------------------------------------
# W-1 write_object — canonical path
# ---------------------------------------------------------------------------


class TestWriteObject:
    """W-1: write_object() places objects at the canonical sha256/ path."""

    def test_lands_under_sha256_dir(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        oid = blob_id(b"hello")
        write_object(repo, oid, b"hello")
        p = object_path(repo, oid)
        assert p.exists()
        assert p.parent.parent.name == "sha256"

    def test_shard_prefix_is_first_two_hex_chars(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        content = b"shard-check"
        oid = blob_id(content)
        write_object(repo, oid, content)
        p = object_path(repo, oid)
        hex_id = split_id(oid)[1]
        assert p.parent.name == hex_id[:2]

    def test_filename_is_remaining_62_hex_chars(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        content = b"filename-check"
        oid = blob_id(content)
        write_object(repo, oid, content)
        p = object_path(repo, oid)
        hex_id = split_id(oid)[1]
        assert p.name == hex_id[2:]

    def test_idempotent_returns_false_on_second_write(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _repo(tmp_path)
        oid = blob_id(b"idempotent")
        assert write_object(repo, oid, b"idempotent") is True
        assert write_object(repo, oid, b"idempotent") is False

    def test_content_verifiable_after_write(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        content = b"verifiable content"
        oid = blob_id(content)
        write_object(repo, oid, content)
        assert read_object(repo, oid) == content

    def test_rejects_wrong_content(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        oid = blob_id(b"correct")
        with pytest.raises(ValueError):
            write_object(repo, oid, b"wrong content")

    def test_rejects_bare_hex_object_id(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        bare_hex = split_id(blob_id(b"bare"))[1]
        # The exact exception type is deliberately not pinned here; Exception
        # already subsumes ValueError, so listing both was redundant.
        with pytest.raises(Exception):
            write_object(repo, bare_hex, b"bare")


# ---------------------------------------------------------------------------
# W-2 write_object_from_path — canonical path
# ---------------------------------------------------------------------------


class TestWriteObjectFromPath:
    """W-2: write_object_from_path() writes from a file and lands at canonical path."""

    def test_writes_to_sha256_dir(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        src = tmp_path / "source.txt"
        content = b"from-path content"
        src.write_bytes(content)
        oid = blob_id(content)
        write_object_from_path(repo, oid, src)
        p = object_path(repo, oid)
        assert p.exists()
        assert p.parent.parent.name == "sha256"

    def test_oid_matches_blob_id(self, tmp_path: pathlib.Path) -> None:
        """The bytes stored from a file must hash back to the id they were stored under."""
        repo = _repo(tmp_path)
        content = b"oid must match blob_id"
        src = tmp_path / "f.txt"
        src.write_bytes(content)
        oid = blob_id(content)
        write_object_from_path(repo, oid, src)
        # Hash what the store actually holds; comparing oid with a second
        # blob_id(content) call would be true regardless of the write.
        stored = read_object(repo, oid)
        assert stored is not None
        assert blob_id(stored) == oid

    def test_content_readable_after_write(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        content = b"readable after write"
        src = tmp_path / "r.txt"
        src.write_bytes(content)
        oid = blob_id(content)
        write_object_from_path(repo, oid, src)
        assert read_object(repo, oid) == content


# ---------------------------------------------------------------------------
# C-1 … C-7 Consistency invariants
# ---------------------------------------------------------------------------


class TestConsistencyInvariants:
    """C-1 through C-7: consistency between write, has_object, object_state, iter."""

    def test_c1_has_object_true_after_write(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        oid = _write_blob(repo, b"c1")
        assert has_object(repo, oid)

    def test_c2_object_state_present_after_write(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        oid = _write_blob(repo, b"c2")
        state = object_state(repo, oid, [])
        assert state == ObjectState.PRESENT

    def test_c3_iter_stored_objects_finds_written(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _repo(tmp_path)
        oid = _write_blob(repo, b"c3")
        found = {o for o, _ in iter_stored_objects(repo)}
        assert oid in found

    def test_c4_has_object_and_object_state_agree_present(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _repo(tmp_path)
        oid = _write_blob(repo, b"c4-present")
        assert has_object(repo, oid)
        assert object_state(repo, oid, []) == ObjectState.PRESENT

    def test_c4_has_object_and_object_state_agree_absent(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _repo(tmp_path)
        oid = blob_id(b"never written")
        assert not has_object(repo, oid)
        assert object_state(repo, oid, []) == ObjectState.MISSING

    def test_c5_object_path_canonical_location(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        content = b"canonical"
        oid = blob_id(content)
        write_object(repo, oid, content)
        p = object_path(repo, oid)
        hex_id = split_id(oid)[1]
        expected = objects_dir(repo) / "sha256" / hex_id[:2] / hex_id[2:]
        assert p == expected
        assert p.exists()

    def test_c6_object_state_missing_when_absent_no_promisors(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _repo(tmp_path)
        oid = blob_id(b"missing")
        state = object_state(repo, oid, promisor_remotes=[])
        assert state == ObjectState.MISSING

    def test_c7_object_state_promised_when_absent_with_promisor(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _repo(tmp_path)
        oid = blob_id(b"promised")
        state = object_state(repo, oid, promisor_remotes=["staging"])
        assert state == ObjectState.PROMISED

    def test_c7_object_state_present_beats_promisor(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A present object is PRESENT even when promisors are configured."""
        repo = _repo(tmp_path)
        oid = _write_blob(repo, b"present beats promisor")
        state = object_state(repo, oid, promisor_remotes=["staging"])
        assert state == ObjectState.PRESENT


# ---------------------------------------------------------------------------
# D-1 GC non-full — orphan sweep
# ---------------------------------------------------------------------------


class TestGcNonFull:
    """D-1: default (non-full) GC sweeps orphans but retains all reachable objects."""

    def test_orphan_collected(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        oid = _write_blob(repo, b"orphan")
        run_gc(repo, grace_period_seconds=0)
        assert not object_path(repo, oid).exists()

    def test_reachable_via_snapshot_survives(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        oid = _write_blob(repo, b"reachable")
        snap_id = _write_snap(repo, {"f.txt": oid})
        _write_commit_on_branch(repo, snap_id)
        run_gc(repo, grace_period_seconds=0)
        assert object_path(repo, oid).exists()

    def test_reachable_on_non_default_branch_survives(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _repo(tmp_path)
        oid = _write_blob(repo, b"non-default branch")
        snap_id = _write_snap(repo, {"g.txt": oid})
        _write_commit_on_branch(repo, snap_id, branch="dev")
        run_gc(repo, grace_period_seconds=0)
        assert object_path(repo, oid).exists()

    def test_multiple_orphans_all_collected(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        oids = [_write_blob(repo, f"o{i}".encode()) for i in range(5)]
        result = run_gc(repo, grace_period_seconds=0)
        assert result.collected_count == 5
        for oid in oids:
            assert not object_path(repo, oid).exists()

    def test_grace_period_protects_recent_objects(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _repo(tmp_path)
        oid = _write_blob(repo, b"fresh orphan")
        result = run_gc(repo, grace_period_seconds=3600)
        assert result.collected_count == 0
        assert object_path(repo, oid).exists()


# ---------------------------------------------------------------------------
# D-2 GC full — tight reachability
# ---------------------------------------------------------------------------


class TestGcFull:
    """D-2: gc full mode uses tight reachability but must still retain all live objects."""

    def test_reachable_object_survives_full_gc(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        oid = _write_blob(repo, b"live object")
        snap_id = _write_snap(repo, {"live.txt": oid})
        _write_commit_on_branch(repo, snap_id)
        result = run_gc(repo, full=True, grace_period_seconds=0)
        assert result.collected_count == 0
        assert object_path(repo, oid).exists()

    def test_orphan_collected_by_full_gc(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        # One reachable, one orphan.
        live_oid = _write_blob(repo, b"live")
        snap_id = _write_snap(repo, {"f.txt": live_oid})
        _write_commit_on_branch(repo, snap_id)
        orphan_oid = _write_blob(repo, b"orphan")
        result = run_gc(repo, full=True, grace_period_seconds=0)
        assert result.collected_count == 1
        assert not object_path(repo, orphan_oid).exists()
        assert object_path(repo, live_oid).exists()

    def test_full_gc_dry_run_does_not_delete(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        oid = _write_blob(repo, b"dry-run orphan")
        result = run_gc(repo, full=True, dry_run=True, grace_period_seconds=0)
        assert result.dry_run is True
        assert object_path(repo, oid).exists()


# ---------------------------------------------------------------------------
# D-3 GC full multi-branch — objects on ALL live branches survive
# ---------------------------------------------------------------------------


class TestGcFullMultiBranch:
    """D-3: full GC must retain objects reachable from every live branch, not just HEAD."""

    def test_object_on_secondary_branch_survives_full_gc(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _repo(tmp_path)
        # main branch object
        main_oid = _write_blob(repo, b"main content")
        main_snap = _write_snap(repo, {"main.txt": main_oid})
        _write_commit_on_branch(repo, main_snap, branch="main")
        # dev branch object (different content)
        dev_oid = _write_blob(repo, b"dev content")
        dev_snap = _write_snap(repo, {"dev.txt": dev_oid})
        _write_commit_on_branch(repo, dev_snap, branch="dev")
        result = run_gc(repo, full=True, grace_period_seconds=0)
        assert result.collected_count == 0
        assert object_path(repo, main_oid).exists(), "main branch object deleted!"
        assert object_path(repo, dev_oid).exists(), "dev branch object deleted!"

    def test_object_on_three_branches_all_survive(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _repo(tmp_path)
        oids = []
        for branch in ("main", "dev", "feat/x"):
            oid = _write_blob(repo, f"content on {branch}".encode())
            snap_id = _write_snap(repo, {f"{branch}.txt": oid})
            _write_commit_on_branch(repo, snap_id, branch=branch)
            oids.append(oid)
        result = run_gc(repo, full=True, grace_period_seconds=0)
        assert result.collected_count == 0
        for oid in oids:
            assert object_path(repo, oid).exists()

    def test_shared_object_referenced_by_two_branches_survives(
        self, tmp_path: pathlib.Path
    ) -> None:
        """If main and dev both reference the same object, full GC must keep it."""
        repo = _repo(tmp_path)
        shared_oid = _write_blob(repo, b"shared content")
        # Same manifest, message and timestamp on both branches, so both refs
        # end up holding the same commit id.
        for branch in ("main", "dev"):
            snap_id = _write_snap(repo, {"shared.txt": shared_oid})
            _write_commit_on_branch(repo, snap_id, branch=branch)
        result = run_gc(repo, full=True, grace_period_seconds=0)
        assert result.collected_count == 0
        assert object_path(repo, shared_oid).exists()


# ---------------------------------------------------------------------------
# D-4 GC full object ID normalisation
# ---------------------------------------------------------------------------


class TestGcFullObjectNormalisation:
    """D-4: full GC reachability set uses sha256:-prefixed IDs throughout.

    This is the critical invariant that ensures the reachable-objects set
    (built from snapshot manifests) matches the stored-objects set (built from
    iter_stored_objects).  A mismatch would cause live objects to be
    incorrectly classified as unreachable and deleted.
    """

    def test_reachable_set_uses_prefixed_ids(self, tmp_path: pathlib.Path) -> None:
        """_collect_reachable_snapshots returns sha256:-prefixed object IDs."""
        repo = _repo(tmp_path)
        oid = _write_blob(repo, b"normalisation check")
        snap_id = _write_snap(repo, {"f.txt": oid})
        _write_commit_on_branch(repo, snap_id)
        reachable_commits = _collect_reachable_commits(repo)
        _, reachable_objs = _collect_reachable_snapshots(repo, reachable_commits)
        # Guard against a vacuous pass: the loop below checks nothing if the
        # reachable set comes back empty.
        assert reachable_objs, "reachable object set is empty"
        # Every entry must carry the sha256: prefix.
        for obj_id in reachable_objs:
            assert obj_id.startswith("sha256:"), (
                f"Reachable object ID missing sha256: prefix: {obj_id!r}"
            )

    def test_iter_stored_objects_uses_prefixed_ids(
        self, tmp_path: pathlib.Path
    ) -> None:
        """iter_stored_objects returns sha256:-prefixed object IDs."""
        repo = _repo(tmp_path)
        _write_blob(repo, b"stored check")
        stored_ids = [oid for oid, _ in iter_stored_objects(repo)]
        # Guard against a vacuous pass when nothing is yielded.
        assert stored_ids, "iter_stored_objects yielded nothing"
        for oid in stored_ids:
            assert oid.startswith("sha256:"), (
                f"iter_stored_objects returned unprefixed ID: {oid!r}"
            )

    def test_reachable_set_matches_stored_set_for_live_objects(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Every live object must appear in both sets with the same ID form."""
        repo = _repo(tmp_path)
        # A list (not a set) keeps the path -> object mapping deterministic.
        oids = [_write_blob(repo, f"live {i}".encode()) for i in range(3)]
        snap_id = _write_snap(repo, {f"f{i}.txt": o for i, o in enumerate(oids)})
        _write_commit_on_branch(repo, snap_id)
        reachable_commits = _collect_reachable_commits(repo)
        _, reachable_objs = _collect_reachable_snapshots(repo, reachable_commits)
        stored = {o for o, _ in iter_stored_objects(repo)}
        # All live objects must be in both sets.
        for oid in oids:
            assert oid in reachable_objs, f"{oid} missing from reachable set"
            assert oid in stored, f"{oid} missing from stored set"

    def test_full_gc_does_not_delete_prefixed_manifest_objects(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Regression: full GC must not delete objects whose IDs use sha256: prefix in the manifest."""
        repo = _repo(tmp_path)
        contents = [f"file {i} content".encode() for i in range(5)]
        manifest = {}
        for i, c in enumerate(contents):
            oid = _write_blob(repo, c)
            manifest[f"file{i}.py"] = oid
            # Confirm the manifest value is prefixed.
            assert oid.startswith("sha256:"), f"blob_id returned unprefixed: {oid}"
        snap_id = _write_snap(repo, manifest)
        _write_commit_on_branch(repo, snap_id)
        result = run_gc(repo, full=True, grace_period_seconds=0)
        assert result.collected_count == 0, (
            f"Full GC deleted {result.collected_count} live objects: {result.collected_ids}"
        )
        for oid in manifest.values():
            assert object_path(repo, oid).exists(), f"Full GC deleted live object {oid}"

    def test_full_gc_retains_large_manifest(self, tmp_path: pathlib.Path) -> None:
        """Full GC must not delete any of N live objects in a large snapshot."""
        repo = _repo(tmp_path)
        n = 50
        manifest = {}
        for i in range(n):
            oid = _write_blob(repo, f"large manifest entry {i}".encode())
            manifest[f"src/file_{i:03d}.py"] = oid
        snap_id = _write_snap(repo, manifest)
        _write_commit_on_branch(repo, snap_id)
        result = run_gc(repo, full=True, grace_period_seconds=0)
        assert result.collected_count == 0, (
            f"Full GC deleted objects from large manifest: {result.collected_ids[:5]}"
        )


# ---------------------------------------------------------------------------
# D-5 Prune — mirrors non-full GC with expire window
# ---------------------------------------------------------------------------


class TestPruneSafety:
    """D-5: muse prune must never delete reachable objects."""

    def test_prune_does_not_remove_committed_object(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Objects referenced by commits must survive prune."""
        # prune delegates to gc, so this drives the module-level run_gc import.
        repo = _repo(tmp_path)
        oid = _write_blob(repo, b"committed object")
        snap_id = _write_snap(repo, {"f.txt": oid})
        _write_commit_on_branch(repo, snap_id)
        # Non-full GC is what prune uses.
        result = run_gc(repo, grace_period_seconds=0)
        assert result.collected_count == 0
        assert object_path(repo, oid).exists()


# ---------------------------------------------------------------------------
# D-6 Maintenance gc task passes full=True
# ---------------------------------------------------------------------------


class TestMaintenanceGcUsesFull:
    """D-6: the maintenance 'gc' task must invoke run_gc with full=True."""

    def test_maintenance_gc_task_calls_run_gc_with_full_true(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Confirm _run_gc (the maintenance task) passes full=True to run_gc."""
        from muse.cli.commands import maintenance as maint_mod
        from muse.core.gc import GcResult

        calls: list[dict[str, bool]] = []

        def _capture_run_gc(
            root: pathlib.Path,
            *,
            dry_run: bool,
            grace_period_seconds: float,
            full: bool,
        ) -> GcResult:
            # Record the flags the maintenance task passed, then return an
            # inert result so the caller can carry on without touching disk.
            calls.append({"full": full, "dry_run": dry_run})
            return GcResult(
                dry_run=dry_run,
                grace_period_seconds=grace_period_seconds,
                full=full,
            )

        monkeypatch.setattr(maint_mod, "run_gc", _capture_run_gc)
        repo = _repo(tmp_path)
        maint_mod._run_gc(repo)
        assert calls, "run_gc was never called by maintenance _run_gc"
        assert calls[0]["full"] is True, (
            f"Maintenance gc must pass full=True, got full={calls[0]['full']}"
        )

    def test_maintenance_gc_retains_all_reachable_objects(
        self, tmp_path: pathlib.Path
    ) -> None:
        """End-to-end: running the maintenance gc task must not delete live objects."""
        from muse.cli.commands.maintenance import _run_gc as maintenance_run_gc

        repo = _repo(tmp_path)
        # Write objects on two branches.
        for branch, content in (("main", b"main obj"), ("dev", b"dev obj")):
            oid = _write_blob(repo, content)
            snap_id = _write_snap(repo, {f"{branch}.py": oid})
            _write_commit_on_branch(repo, snap_id, branch=branch)
        maintenance_run_gc(repo, dry_run=False)
        # Both objects must survive.
        for content in (b"main obj", b"dev obj"):
            oid = blob_id(content)
            assert object_path(repo, oid).exists(), (
                f"Maintenance gc deleted live object {oid}"
            )


# ---------------------------------------------------------------------------
# W-3 Commit workflow — objects written before commit record
# ---------------------------------------------------------------------------


class TestCommitWritePath:
    """W-3: the commit workflow must write blobs to the object store at the
    canonical path before creating the commit record.

    We test this at the store level (not the CLI) since the CLI requires
    a full working-tree environment.
    """

    def test_snapshot_manifest_objects_at_canonical_path(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Objects written for a commit land at the canonical sha256/ path."""
        repo = _repo(tmp_path)
        contents = {f"src/file{i}.py": f"content {i}".encode() for i in range(3)}
        manifest = {}
        for path, content in contents.items():
            oid = blob_id(content)
            write_object(repo, oid, content)
            manifest[path] = oid
        snap_id = _write_snap(repo, manifest)
        _write_commit_on_branch(repo, snap_id)
        # All objects reachable and at correct path.
        for oid in manifest.values():
            p = object_path(repo, oid)
            assert p.exists()
            assert p.parent.parent.name == "sha256"

    def test_all_manifest_objects_survive_full_gc(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Objects in a committed snapshot must all survive full GC."""
        repo = _repo(tmp_path)
        manifest = {}
        for i in range(10):
            content = f"committed file {i}".encode()
            oid = blob_id(content)
            write_object(repo, oid, content)
            manifest[f"file{i}.py"] = oid
        snap_id = _write_snap(repo, manifest)
        _write_commit_on_branch(repo, snap_id)
        result = run_gc(repo, full=True, grace_period_seconds=0)
        assert result.collected_count == 0
        for oid in manifest.values():
            assert object_path(repo, oid).exists()


# ---------------------------------------------------------------------------
# W-4 Shelf save — blobs written before shelf entry
# ---------------------------------------------------------------------------


class TestShelfWritePath:
    """W-4: shelf objects must survive GC even before they are committed."""

    def test_shelved_objects_survive_non_full_gc(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _repo(tmp_path)
        shelf_oid = _write_blob(repo, b"shelved work")
        _write_shelf_entry(repo, {"work.py": shelf_oid})
        result = run_gc(repo, grace_period_seconds=0)
        assert result.collected_count == 0
        assert object_path(repo, shelf_oid).exists()

    def test_shelved_objects_survive_full_gc(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        shelf_oid = _write_blob(repo, b"shelved full gc")
        _write_shelf_entry(repo, {"wip.py": shelf_oid})
        result = run_gc(repo, full=True, grace_period_seconds=0)
        assert result.collected_count == 0
        assert object_path(repo, shelf_oid).exists()