"""TDD — Phase 4: shelf entries from binary msgpack to git-header+JSON. Phase 4 requirements (issue #12): - shelf_entry_path() returns a path with NO extension - write_shelf_entry() writes "shelf \0" framing (same as commits) - read_shelf_entry() parses header+JSON; falls back to .msgpack on miss (silent upgrade) - list_shelf_entries() finds new-format files (no extension) - delete_shelf_entry() removes new-format AND legacy .msgpack files - gc._collect_shelf_objects() finds object IDs in new-format shelf entries """ from __future__ import annotations import json import pathlib from typing import TypedDict import msgpack import pytest from muse.core.ids import hash_blob from muse.core.paths import shelf_dir from muse.core.shelf import ( delete_shelf_entry, list_shelf_entries, read_shelf_entry, shelf_entry_path, write_shelf_entry, ) from muse.core.types import long_id, split_id # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- _OBJ_A = hash_blob(b"object-a") _OBJ_B = hash_blob(b"object-b") # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: (tmp_path / ".muse" / "shelf").mkdir(parents=True) return tmp_path class _ShelfEntry(TypedDict): id: str name: str snapshot: dict[str, str] created_at: str message: str branch: str intent_type: str resumable: bool metadata: dict[str, str] def _shelf_id(tag: str = "a") -> str: return hash_blob(f"shelf-entry-{tag}".encode()) def _entry(tag: str = "a") -> _ShelfEntry: eid = _shelf_id(tag) return { "id": eid, "name": f"entry-{tag}", "snapshot": {f"file_{tag}.py": _OBJ_A}, "created_at": "2026-05-21T00:00:00+00:00", "message": f"shelf {tag}", "branch": "dev", "intent_type": "manual", "resumable": False, "metadata": {}, } def _legacy_path(repo: 
pathlib.Path, entry_id: str) -> pathlib.Path: """Old-format .msgpack path for a shelf entry.""" return shelf_entry_path(repo, entry_id).with_suffix(".msgpack") def _write_legacy_shelf(repo: pathlib.Path, entry: _ShelfEntry) -> pathlib.Path: """Write a shelf entry in the old binary msgpack format.""" path = _legacy_path(repo, str(entry["id"])) path.parent.mkdir(parents=True, exist_ok=True) path.write_bytes(msgpack.packb(entry, use_bin_type=True)) return path # --------------------------------------------------------------------------- # shelf_entry_path — no extension # --------------------------------------------------------------------------- class TestShelfEntryPath: def test_shelf_entry_path_has_no_extension(self, tmp_path: pathlib.Path) -> None: """shelf_entry_path() must return a path with no file extension.""" eid = _shelf_id() p = shelf_entry_path(tmp_path, eid) assert p.suffix == "", f"Expected no extension, got {p.suffix!r}" def test_shelf_entry_path_structure(self, tmp_path: pathlib.Path) -> None: """shelf_entry_path() uses .muse/shelf// layout.""" eid = _shelf_id() p = shelf_entry_path(tmp_path, eid) algo, hex_id = split_id(eid) assert p.name == hex_id assert p.parent.name == algo # --------------------------------------------------------------------------- # write_shelf_entry — git-header+JSON format # --------------------------------------------------------------------------- class TestWriteShelfEntry: def test_write_produces_shelf_header(self, tmp_path: pathlib.Path) -> None: """write_shelf_entry() must write a file starting with 'shelf \\0'.""" repo = _make_repo(tmp_path) e = _entry() write_shelf_entry(repo, e) path = shelf_entry_path(repo, str(e["id"])) assert path.exists() raw = path.read_bytes() null_idx = raw.index(b"\0") header = raw[:null_idx].decode() type_str, size_str = header.split(" ", 1) assert type_str == "shelf" assert int(size_str) == len(raw[null_idx + 1:]) def test_write_payload_is_valid_json(self, tmp_path: pathlib.Path) -> None: 
"""The payload after the null byte must be valid UTF-8 JSON.""" repo = _make_repo(tmp_path) e = _entry() write_shelf_entry(repo, e) path = shelf_entry_path(repo, str(e["id"])) raw = path.read_bytes() null_idx = raw.index(b"\0") data = json.loads(raw[null_idx + 1:].decode("utf-8")) assert data["id"] == e["id"] def test_write_read_roundtrip(self, tmp_path: pathlib.Path) -> None: """write_shelf_entry() then read_shelf_entry() returns the same dict.""" repo = _make_repo(tmp_path) e = _entry("rt") write_shelf_entry(repo, e) result = read_shelf_entry(repo, str(e["id"])) assert result is not None assert result["id"] == e["id"] assert result["message"] == "shelf rt" assert result["snapshot"] == {f"file_rt.py": _OBJ_A} def test_file_has_no_extension(self, tmp_path: pathlib.Path) -> None: """The file created by write_shelf_entry() must have no extension.""" repo = _make_repo(tmp_path) e = _entry() write_shelf_entry(repo, e) path = shelf_entry_path(repo, str(e["id"])) assert path.suffix == "" assert path.exists() # --------------------------------------------------------------------------- # list_shelf_entries — finds new-format files # --------------------------------------------------------------------------- class TestListShelfEntries: def test_list_finds_new_format_entries(self, tmp_path: pathlib.Path) -> None: """list_shelf_entries() returns entries written in the new format.""" repo = _make_repo(tmp_path) e = _entry("list") write_shelf_entry(repo, e) results = list_shelf_entries(repo) assert len(results) == 1 assert results[0]["id"] == e["id"] def test_list_multiple_entries(self, tmp_path: pathlib.Path) -> None: """list_shelf_entries() returns all new-format entries sorted by created_at.""" repo = _make_repo(tmp_path) for tag in ("x", "y", "z"): write_shelf_entry(repo, _entry(tag)) results = list_shelf_entries(repo) assert len(results) == 3 # --------------------------------------------------------------------------- # Legacy .msgpack upgrade — read_shelf_entry # 
# ---------------------------------------------------------------------------


class TestLegacyShelfUpgrade:
    """Legacy ``.msgpack`` entries stay readable and are migrated when read or listed."""

    def test_legacy_msgpack_readable_via_read_shelf_entry(self, tmp_path: pathlib.Path) -> None:
        """A legacy .msgpack entry can be loaded through read_shelf_entry()."""
        repo = _make_repo(tmp_path)
        entry = _entry("leg")
        _write_legacy_shelf(repo, entry)

        loaded = read_shelf_entry(repo, str(entry["id"]))

        assert loaded is not None
        assert loaded["id"] == entry["id"]
        assert loaded["message"] == "shelf leg"

    def test_legacy_msgpack_readable_via_list(self, tmp_path: pathlib.Path) -> None:
        """A legacy .msgpack entry shows up in list_shelf_entries()."""
        repo = _make_repo(tmp_path)
        entry = _entry("leglist")
        _write_legacy_shelf(repo, entry)

        listed = list_shelf_entries(repo)

        assert len(listed) == 1
        assert listed[0]["id"] == entry["id"]

    def test_legacy_migrated_to_new_format_on_read(self, tmp_path: pathlib.Path) -> None:
        """Reading a legacy entry leaves a new-format file with a shelf header."""
        repo = _make_repo(tmp_path)
        entry = _entry("migr")
        entry_id = str(entry["id"])
        _write_legacy_shelf(repo, entry)

        read_shelf_entry(repo, entry_id)

        migrated = shelf_entry_path(repo, entry_id)
        assert migrated.exists(), "New-format shelf entry must exist after migration"
        contents = migrated.read_bytes()
        assert contents.startswith(b"shelf "), "Migrated file must use shelf header format"

    def test_legacy_msgpack_removed_after_migration(self, tmp_path: pathlib.Path) -> None:
        """Reading a legacy entry deletes the old .msgpack file."""
        repo = _make_repo(tmp_path)
        entry = _entry("del")
        old_file = _write_legacy_shelf(repo, entry)
        assert old_file.exists()

        read_shelf_entry(repo, str(entry["id"]))

        assert not old_file.exists(), "Old .msgpack shelf file must be removed after migration"

    def test_legacy_migrated_on_list(self, tmp_path: pathlib.Path) -> None:
        """Listing entries migrates legacy .msgpack files as well."""
        repo = _make_repo(tmp_path)
        entry = _entry("lmig")
        old_file = _write_legacy_shelf(repo, entry)

        list_shelf_entries(repo)

        migrated = shelf_entry_path(repo, str(entry["id"]))
        assert migrated.exists()
        assert not old_file.exists()

    def test_mixed_format_list(self, tmp_path: pathlib.Path) -> None:
        """Listing returns new-format and legacy entries side by side."""
        repo = _make_repo(tmp_path)
        fresh = _entry("new")
        stale = _entry("old")
        write_shelf_entry(repo, fresh)
        _write_legacy_shelf(repo, stale)

        listed = list_shelf_entries(repo)

        found_ids = {item["id"] for item in listed}
        assert fresh["id"] in found_ids
        assert stale["id"] in found_ids
        assert len(listed) == 2


# ---------------------------------------------------------------------------
# delete_shelf_entry — removes both new and legacy files
# ---------------------------------------------------------------------------


class TestDeleteShelfEntry:
    """delete_shelf_entry() handles new-format, legacy and absent entries."""

    def test_delete_removes_new_format_file(self, tmp_path: pathlib.Path) -> None:
        """Deleting removes the extension-less new-format file and returns True."""
        repo = _make_repo(tmp_path)
        entry = _entry("delnew")
        entry_id = str(entry["id"])
        write_shelf_entry(repo, entry)
        assert shelf_entry_path(repo, entry_id).exists()

        removed = delete_shelf_entry(repo, entry_id)

        assert removed is True
        assert not shelf_entry_path(repo, entry_id).exists()

    def test_delete_removes_legacy_msgpack_file(self, tmp_path: pathlib.Path) -> None:
        """Deleting removes a legacy .msgpack file and returns True."""
        repo = _make_repo(tmp_path)
        entry = _entry("delleg")
        old_file = _write_legacy_shelf(repo, entry)
        assert old_file.exists()

        removed = delete_shelf_entry(repo, str(entry["id"]))

        assert removed is True
        assert not old_file.exists()

    def test_delete_absent_entry_returns_false(self, tmp_path: pathlib.Path) -> None:
        """Deleting an entry that was never written returns False."""
        repo = _make_repo(tmp_path)
        entry = _entry("absent")

        removed = delete_shelf_entry(repo, str(entry["id"]))

        assert removed is False


# ---------------------------------------------------------------------------
# GC — finds object IDs in new-format shelf entries
# ---------------------------------------------------------------------------


class TestGcShelfWalk:
    """GC reachability walk over shelf entries."""

    def test_gc_finds_object_ids_in_new_format_entry(self, tmp_path: pathlib.Path) -> None:
        """_collect_shelf_objects() adds a new-format entry's object ID to the reachable set."""
        from muse.core.gc import _collect_shelf_objects

        repo = _make_repo(tmp_path)
        write_shelf_entry(repo, _entry("gc"))

        reachable: set[str] = set()
        _collect_shelf_objects(repo, reachable)

        assert _OBJ_A in reachable, (
            "object ID from a new-format shelf entry must appear in GC reachable set"
        )