"""Comprehensive tests for ``muse shelf``. Covers: - Unit: _load_shelf / _save_shelf atomic write + guards, _resolve_entry, _compute_shelf_id, _generate_name, _apply_shelf_snapshot, _verify_snapshot_objects - Integration: save, list, read, apply, pop, drop, diff — JSON schemas, text output, filters, agent fields - End-to-end: full CLI round-trips via CliRunner - Stress: many entries, concurrent isolated repos, repeated save/load - Data integrity: already-current detection, snapshot completeness, content-address stability - Performance: save + pop under 5 s - Security: symlink guard, size limit, ANSI injection in names / intent, invalid --format exits 1 Test categories --------------- - unit : pure helper functions, no repo needed - integration : programmatic API + JSON schema validation - e2e : CliRunner full round-trips - stress : volume and concurrency - data-integrity: already-current detection, manifest correctness - performance : timing assertions - security : injection, path-traversal, oversized file guards - docstrings : public API coverage """ from __future__ import annotations from collections.abc import Mapping import argparse import datetime import inspect import json import os import pathlib import threading import time from typing import Any import pytest from tests.cli_test_helper import CliRunner from muse.core.types import long_id, fake_id, split_id, blob_id from muse.core.object_store import object_path from muse.core.paths import muse_dir, ref_path, shelf_dir cli = None # argparse migration — CliRunner ignores this arg runner = CliRunner() # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- def _env(root: pathlib.Path) -> Mapping[str, str]: return {"MUSE_REPO_ROOT": str(root)} def _init_repo(tmp_path: pathlib.Path, branch: str = "main") -> tuple[pathlib.Path, str]: """Create a minimal Muse repo structure on disk.""" dot_muse = 
muse_dir(tmp_path) dot_muse.mkdir() repo_id = fake_id("repo") (dot_muse / "repo.json").write_text(json.dumps({ "repo_id": repo_id, "domain": "code", "default_branch": branch, "created_at": "2025-01-01T00:00:00+00:00", }), encoding="utf-8") (dot_muse / "HEAD").write_text(f"ref: refs/heads/{branch}", encoding="utf-8") (dot_muse / "refs" / "heads").mkdir(parents=True) (dot_muse / "snapshots").mkdir() (dot_muse / "commits").mkdir() (dot_muse / "objects").mkdir() return tmp_path, repo_id def _make_commit( root: pathlib.Path, repo_id: str, message: str = "init", branch: str = "main", manifest: dict[str, str] | None = None, ) -> str: """Write a commit to the repo with the given manifest.""" from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.ids import hash_snapshot, hash_commit ref_file = ref_path(root, branch) parent_id = ref_file.read_text().strip() if ref_file.exists() else None m: dict[str, str] = manifest or {} snap_id = hash_snapshot(m) committed_at = datetime.datetime.now(datetime.timezone.utc) commit_id = hash_commit( parent_ids=[parent_id] if parent_id else [], snapshot_id=snap_id, message=message, committed_at_iso=committed_at.isoformat(), ) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=m)) write_commit(root, CommitRecord( commit_id=commit_id, branch=branch, snapshot_id=snap_id, message=message, committed_at=committed_at, parent_commit_id=parent_id, )) ref_file.parent.mkdir(parents=True, exist_ok=True) ref_file.write_text(commit_id, encoding="utf-8") return commit_id def _write_object(root: pathlib.Path, content: bytes) -> str: """Write content to the object store, returning the sha256:-prefixed ID.""" from muse.core.object_store import write_object obj_id = blob_id(content) write_object(root, obj_id, content) return obj_id def _make_shelf_entry( name: str = "dev/000", branch: str = "main", snapshot: dict[str, str] | None = None, deleted: list[str] | 
None = None, intent_type: str = "checkpoint", intent: str | None = None, resumable: bool = False, tags: list[str] | None = None, created_by: str = "human", ) -> Mapping[str, object]: """Build a raw shelf-entry dict (no id field) suitable for _compute_shelf_id.""" return { "name": name, "snapshot": snapshot or {}, "deleted": deleted or [], "snapshot_id": long_id("a" * 64), "parent_commit": long_id("b" * 64), "branch": branch, "created_at": "2025-01-01T00:00:00+00:00", "created_by": created_by, "intent_type": intent_type, "intent": intent, "resumable": resumable, "tags": tags or [], "expires_at": None, "domain_state": {}, } # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture() def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: """Fresh repo with one committed file (a.py) and one dirty file (b.py).""" monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) r = runner.invoke(cli, ["init"], env=_env(tmp_path), catch_exceptions=False) assert r.exit_code == 0, r.output (tmp_path / "a.py").write_text("x = 1\n") r = runner.invoke(cli, ["commit", "-m", "base"], env=_env(tmp_path), catch_exceptions=False) assert r.exit_code == 0, r.output (tmp_path / "b.py").write_text("y = 2\n") return tmp_path @pytest.fixture() def shelved_repo(repo: pathlib.Path) -> pathlib.Path: """repo fixture with one shelf entry already saved.""" r = runner.invoke( cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False ) assert r.exit_code == 0, r.output return repo # --------------------------------------------------------------------------- # Unit — _compute_shelf_id # --------------------------------------------------------------------------- class TestComputeShelfId: """Unit tests for content-addressed ID generation.""" def test_id_starts_with_sha256(self) -> None: from muse.cli.commands.shelf 
import _compute_shelf_id entry = _make_shelf_entry() shelf_id = _compute_shelf_id(entry) assert shelf_id.startswith("sha256:") def test_id_is_64_hex_after_prefix(self) -> None: from muse.cli.commands.shelf import _compute_shelf_id entry = _make_shelf_entry() shelf_id = _compute_shelf_id(entry) _, hex_part = split_id(shelf_id) assert len(hex_part) == 64 assert all(c in "0123456789abcdef" for c in hex_part) def test_same_content_same_id(self) -> None: from muse.cli.commands.shelf import _compute_shelf_id e1 = _make_shelf_entry(name="mywork", branch="dev") e2 = _make_shelf_entry(name="mywork", branch="dev") assert _compute_shelf_id(e1) == _compute_shelf_id(e2) def test_different_content_different_id(self) -> None: from muse.cli.commands.shelf import _compute_shelf_id e1 = _make_shelf_entry(name="mywork") e2 = _make_shelf_entry(name="otherwork") assert _compute_shelf_id(e1) != _compute_shelf_id(e2) def test_snapshot_diff_changes_id(self) -> None: from muse.cli.commands.shelf import _compute_shelf_id e1 = _make_shelf_entry(snapshot={"a.py": long_id("a" * 64)}) e2 = _make_shelf_entry(snapshot={"a.py": long_id("b" * 64)}) assert _compute_shelf_id(e1) != _compute_shelf_id(e2) def test_id_stable_across_calls(self) -> None: from muse.cli.commands.shelf import _compute_shelf_id entry = _make_shelf_entry(name="stable", intent="doing work") ids = [_compute_shelf_id(entry) for _ in range(10)] assert len(set(ids)) == 1 # --------------------------------------------------------------------------- # Unit — _generate_name # --------------------------------------------------------------------------- class TestGenerateName: def test_first_entry_is_000(self) -> None: from muse.cli.commands.shelf import _generate_name assert _generate_name("dev", set()) == "dev/000" def test_increments_when_conflict(self) -> None: from muse.cli.commands.shelf import _generate_name existing = {"dev/000", "dev/001"} assert _generate_name("dev", existing) == "dev/002" def 
test_branch_with_special_chars_sanitized(self) -> None: from muse.cli.commands.shelf import _generate_name name = _generate_name("feat/my-feature@2.0!", set()) assert "/" in name # one slash is OK (branch/NNN) assert "@" not in name assert "!" not in name def test_empty_branch_fallback(self) -> None: from muse.cli.commands.shelf import _generate_name name = _generate_name("", set()) assert name.endswith("/000") def test_zero_padded_to_three_digits(self) -> None: from muse.cli.commands.shelf import _generate_name name = _generate_name("main", set()) assert name.endswith("/000") def test_large_n_zero_padded(self) -> None: from muse.cli.commands.shelf import _generate_name existing = {f"main/{i:03d}" for i in range(10)} name = _generate_name("main", existing) assert name == "main/010" # --------------------------------------------------------------------------- # Unit — _load_shelf / _save_shelf # --------------------------------------------------------------------------- class TestLoadSaveShelf: def test_load_empty_when_no_file(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) from muse.cli.commands.shelf import _load_shelf assert _load_shelf(root) == [] def test_save_creates_entry_file(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) from muse.cli.commands.shelf import _load_shelf, ShelfEntry from muse.cli.commands.shelf import _compute_shelf_id from muse.core.shelf import write_shelf_entry raw = _make_shelf_entry(name="test/000") entry = ShelfEntry(id=_compute_shelf_id(raw), **raw) # type: ignore[misc] write_shelf_entry(root, entry) assert (shelf_dir(root) / "sha256").is_dir() loaded = _load_shelf(root) assert len(loaded) == 1 assert loaded[0]["name"] == "test/000" def test_roundtrip_preserves_all_fields(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) from muse.cli.commands.shelf import _load_shelf, ShelfEntry from muse.cli.commands.shelf import _compute_shelf_id from muse.core.shelf import 
write_shelf_entry raw = _make_shelf_entry( name="wip/000", branch="dev", snapshot={"src/foo.py": long_id("c" * 64)}, deleted=["old.py"], intent_type="handoff", intent="50% done", resumable=True, tags=["auth", "refactor"], created_by="agent-42", ) entry = ShelfEntry(id=_compute_shelf_id(raw), **raw) # type: ignore[misc] write_shelf_entry(root, entry) loaded = _load_shelf(root) e = loaded[0] assert e["name"] == "wip/000" assert e["branch"] == "dev" assert e["snapshot"] == {"src/foo.py": long_id("c" * 64)} assert e["deleted"] == ["old.py"] assert e["intent_type"] == "handoff" assert e["intent"] == "50% done" assert e["resumable"] is True assert e["tags"] == ["auth", "refactor"] assert e["created_by"] == "agent-42" def test_save_is_atomic_no_temp_files(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) from muse.core.shelf import write_shelf_entry from muse.cli.commands.shelf import ShelfEntry, _compute_shelf_id raw = _make_shelf_entry(name="main/000") entry = ShelfEntry(id=_compute_shelf_id(raw), **raw) # type: ignore[misc] write_shelf_entry(root, entry) tmp_files = list((shelf_dir(root) / "sha256").glob(".muse-tmp-*")) assert tmp_files == [] def test_load_ignores_oversized_file(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) shelf_path = muse_dir(root) / "shelf.json" shelf_path.write_bytes(b"x" * (65 * 1024 * 1024)) # 65 MiB > 64 MiB limit from muse.cli.commands.shelf import _load_shelf assert _load_shelf(root) == [] def test_load_ignores_malformed_json(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) (muse_dir(root) / "shelf.json").write_text("not-json-at-all", encoding="utf-8") from muse.cli.commands.shelf import _load_shelf assert _load_shelf(root) == [] def test_load_ignores_non_list_json(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) (muse_dir(root) / "shelf.json").write_text(json.dumps({"key": "val"}), encoding="utf-8") from muse.cli.commands.shelf import _load_shelf assert 
_load_shelf(root) == [] def test_load_skips_entries_without_snapshot(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) (muse_dir(root) / "shelf.json").write_text( json.dumps([{"name": "bad", "deleted": []}]), # no snapshot key encoding="utf-8", ) from muse.cli.commands.shelf import _load_shelf assert _load_shelf(root) == [] def test_load_skips_non_dict_entries(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) (muse_dir(root) / "shelf.json").write_text( json.dumps(["string", 42, None]), encoding="utf-8", ) from muse.cli.commands.shelf import _load_shelf assert _load_shelf(root) == [] def test_fsync_called_in_write(self) -> None: """write_shelf_entry in shelf.py must use fsync/F_BARRIERFSYNC for durability.""" from muse.core.shelf import write_shelf_entry src = inspect.getsource(write_shelf_entry) # The actual fsync is in _write_shelf_header_atomic which write_shelf_entry calls. assert "_write_shelf_header_atomic" in src def test_symlink_guard_in_write(self) -> None: """write_shelf_entry must reject a symlinked .muse/shelf/ directory.""" from muse.core.shelf import write_shelf_entry src = inspect.getsource(write_shelf_entry) assert "symlink" in src.lower() def test_multiple_entries_sorted_newest_first(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) from muse.cli.commands.shelf import _load_shelf, ShelfEntry from muse.cli.commands.shelf import _compute_shelf_id from muse.core.shelf import write_shelf_entry for i in range(3): raw = _make_shelf_entry(name=f"dev/{i:03d}") raw["created_at"] = f"2025-01-0{i+1}T00:00:00+00:00" entry = ShelfEntry(id=_compute_shelf_id(raw), **raw) # type: ignore[misc] write_shelf_entry(root, entry) loaded = _load_shelf(root) # Newest (2025-01-03) first. 
assert [e["name"] for e in loaded] == ["dev/002", "dev/001", "dev/000"] # --------------------------------------------------------------------------- # Unit — _resolve_entry # --------------------------------------------------------------------------- class TestResolveEntry: def _entries(self, names: list[str]) -> list["ShelfEntry"]: from muse.cli.commands.shelf import ShelfEntry from muse.cli.commands.shelf import _compute_shelf_id result = [] for name in names: raw = _make_shelf_entry(name=name) result.append(ShelfEntry(id=_compute_shelf_id(raw), **raw)) # type: ignore[misc] return result def test_none_returns_default_0(self) -> None: from muse.cli.commands.shelf import _resolve_entry entries = self._entries(["alpha", "beta", "gamma"]) idx, e = _resolve_entry(entries, None) assert idx == 0 assert e["name"] == "alpha" def test_integer_string_resolves(self) -> None: from muse.cli.commands.shelf import _resolve_entry entries = self._entries(["alpha", "beta", "gamma"]) idx, e = _resolve_entry(entries, "2") assert idx == 2 assert e["name"] == "gamma" def test_name_lookup_exact(self) -> None: from muse.cli.commands.shelf import _resolve_entry entries = self._entries(["alpha", "beta", "gamma"]) idx, e = _resolve_entry(entries, "beta") assert idx == 1 assert e["name"] == "beta" def test_empty_list_raises(self) -> None: from muse.cli.commands.shelf import _resolve_entry with pytest.raises(ValueError, match="No shelf entries"): _resolve_entry([], None) def test_out_of_range_raises(self) -> None: from muse.cli.commands.shelf import _resolve_entry entries = self._entries(["alpha"]) with pytest.raises(ValueError, match="out of range"): _resolve_entry(entries, "5") def test_negative_index_raises(self) -> None: from muse.cli.commands.shelf import _resolve_entry entries = self._entries(["alpha", "beta"]) with pytest.raises(ValueError, match="out of range"): _resolve_entry(entries, "-1") def test_unknown_name_raises(self) -> None: from muse.cli.commands.shelf import 
_resolve_entry entries = self._entries(["alpha"]) with pytest.raises(ValueError, match="No shelf entry"): _resolve_entry(entries, "nonexistent") # --------------------------------------------------------------------------- # Unit — _apply_shelf_snapshot / _verify_snapshot_objects # --------------------------------------------------------------------------- class TestApplyShelfSnapshot: def test_restored_count_correct(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) obj_id = _write_object(root, b"hello world") from muse.cli.commands.shelf import ShelfEntry, _compute_shelf_id, _apply_shelf_snapshot raw = _make_shelf_entry(snapshot={"src/foo.py": obj_id}) entry = ShelfEntry(id=_compute_shelf_id(raw), **raw) # type: ignore[misc] counts = _apply_shelf_snapshot(root, entry, head_manifest={}) assert counts["restored"] == 1 assert counts["already_current"] == 0 assert (root / "src" / "foo.py").read_bytes() == b"hello world" def test_already_current_not_rewritten(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) obj_id = _write_object(root, b"same content") (tmp_path / "file.py").write_bytes(b"same content") from muse.cli.commands.shelf import ShelfEntry, _compute_shelf_id, _apply_shelf_snapshot raw = _make_shelf_entry(snapshot={"file.py": obj_id}) entry = ShelfEntry(id=_compute_shelf_id(raw), **raw) # type: ignore[misc] # HEAD manifest already has the same object for this path counts = _apply_shelf_snapshot(root, entry, head_manifest={"file.py": obj_id}) assert counts["restored"] == 0 assert counts["already_current"] == 1 def test_deleted_paths_removed(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) (tmp_path / "gone.py").write_text("old\n") from muse.cli.commands.shelf import ShelfEntry, _compute_shelf_id, _apply_shelf_snapshot raw = _make_shelf_entry(snapshot={}, deleted=["gone.py"]) entry = ShelfEntry(id=_compute_shelf_id(raw), **raw) # type: ignore[misc] counts = _apply_shelf_snapshot(root, entry, 
head_manifest={}) assert counts["deleted"] == 1 assert not (tmp_path / "gone.py").exists() def test_deleted_already_gone_is_idempotent(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) from muse.cli.commands.shelf import ShelfEntry, _compute_shelf_id, _apply_shelf_snapshot raw = _make_shelf_entry(snapshot={}, deleted=["nonexistent.py"]) entry = ShelfEntry(id=_compute_shelf_id(raw), **raw) # type: ignore[misc] counts = _apply_shelf_snapshot(root, entry, head_manifest={}) assert counts["deleted"] == 0 def test_mixed_restored_and_already_current(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) obj_same = _write_object(root, b"same") obj_diff = _write_object(root, b"different") # a.py already has the shelf content on disk → already_current (root / "a.py").write_bytes(b"same") from muse.cli.commands.shelf import ShelfEntry, _compute_shelf_id, _apply_shelf_snapshot raw = _make_shelf_entry(snapshot={"a.py": obj_same, "b.py": obj_diff}) entry = ShelfEntry(id=_compute_shelf_id(raw), **raw) # type: ignore[misc] counts = _apply_shelf_snapshot(root, entry, head_manifest={"a.py": obj_same}) assert counts["restored"] == 1 assert counts["already_current"] == 1 class TestVerifySnapshotObjects: def test_all_present_returns_empty(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) obj_id = _write_object(root, b"data") from muse.cli.commands.shelf import _verify_snapshot_objects missing = _verify_snapshot_objects(root, {"file.py": obj_id}) assert missing == [] def test_missing_object_returned(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) from muse.cli.commands.shelf import _verify_snapshot_objects missing_obj_id = long_id("f" * 64) missing = _verify_snapshot_objects(root, {"file.py": missing_obj_id}) assert "file.py" in missing def test_empty_snapshot_returns_empty(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) from muse.cli.commands.shelf import _verify_snapshot_objects assert 
_verify_snapshot_objects(root, {}) == [] # --------------------------------------------------------------------------- # Unit — register / parser flags # --------------------------------------------------------------------------- class TestRegisterFlags: def _parse(self, *args: str) -> argparse.Namespace: import muse.cli.commands.shelf as m p = argparse.ArgumentParser() sub = p.add_subparsers() m.register(sub) return p.parse_args(["shelf", *args]) def test_save_intent_short(self) -> None: ns = self._parse("save", "-m", "WIP auth") assert ns.intent == "WIP auth" def test_save_intent_long(self) -> None: ns = self._parse("save", "--intent", "WIP auth") assert ns.intent == "WIP auth" def test_save_intent_default_none(self) -> None: ns = self._parse("save") assert ns.intent is None def test_save_intent_type_default(self) -> None: ns = self._parse("save") assert ns.intent_type == "checkpoint" def test_save_intent_type_handoff(self) -> None: ns = self._parse("save", "--intent-type", "handoff") assert ns.intent_type == "handoff" def test_save_resumable_flag(self) -> None: ns = self._parse("save", "--resumable") assert ns.resumable is True def test_save_resumable_default_false(self) -> None: ns = self._parse("save") assert ns.resumable is False def test_save_tag_repeatable(self) -> None: ns = self._parse("save", "--tag", "auth", "--tag", "refactor") assert "auth" in ns.tags assert "refactor" in ns.tags def test_save_json_shorthand(self) -> None: ns = self._parse("save", "--json") assert ns.json_out is True def test_pop_entry_arg(self) -> None: ns = self._parse("pop", "my-work") assert ns.entry == "my-work" def test_pop_entry_default_none(self) -> None: ns = self._parse("pop") assert ns.entry is None def test_drop_entry_arg(self) -> None: ns = self._parse("drop", "2") assert ns.entry == "2" def test_apply_entry_arg(self) -> None: ns = self._parse("apply", "main/000") assert ns.entry == "main/000" def test_list_branch_filter(self) -> None: ns = self._parse("list", "--branch", 
"dev") assert ns.branch == "dev" def test_list_resumable_filter(self) -> None: ns = self._parse("list", "--resumable") assert ns.resumable is True def test_list_by_filter(self) -> None: ns = self._parse("list", "--by", "agent-42") assert ns.created_by == "agent-42" def test_diff_entry_arg(self) -> None: ns = self._parse("diff", "0") assert ns.entry == "0" # --------------------------------------------------------------------------- # Integration — save JSON schema # --------------------------------------------------------------------------- class TestSaveJsonSchema: _REQUIRED = { "status", "id", "name", "snapshot_id", "parent_commit", "branch", "created_at", "created_by", "intent_type", "intent", "resumable", "tags", "files_count", "shelf_size", } def test_schema_complete(self, repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False ) assert r.exit_code == 0, r.output d = json.loads(r.output) assert self._REQUIRED <= d.keys() def test_status_shelved(self, repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False ) assert json.loads(r.output)["status"] == "shelved" def test_id_is_sha256(self, repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False ) d = json.loads(r.output) assert d["id"].startswith("sha256:") def test_files_count_positive(self, repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False ) assert json.loads(r.output)["files_count"] > 0 def test_intent_default_null(self, repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False ) assert json.loads(r.output)["intent"] is None def test_intent_with_flag(self, repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "save", "-m", "updating tests", "--json"], env=_env(repo), 
catch_exceptions=False, ) assert json.loads(r.output)["intent"] == "updating tests" def test_intent_type_default_checkpoint(self, repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False ) assert json.loads(r.output)["intent_type"] == "checkpoint" def test_intent_type_custom(self, repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "save", "--intent-type", "handoff", "--json"], env=_env(repo), catch_exceptions=False, ) assert json.loads(r.output)["intent_type"] == "handoff" def test_resumable_flag_stored(self, repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "save", "--resumable", "--json"], env=_env(repo), catch_exceptions=False, ) assert json.loads(r.output)["resumable"] is True def test_tags_stored(self, repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "save", "--tag", "auth", "--tag", "wip", "--json"], env=_env(repo), catch_exceptions=False, ) d = json.loads(r.output) assert "auth" in d["tags"] assert "wip" in d["tags"] def test_nothing_to_shelf_schema_complete(self, repo: pathlib.Path) -> None: """nothing_to_shelf must emit same keys with null id/name.""" (repo / "b.py").unlink(missing_ok=True) # make tree match HEAD r = runner.invoke( cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False ) d = json.loads(r.output) assert self._REQUIRED <= d.keys() assert d["status"] == "nothing_to_shelf" assert d["id"] is None assert d["name"] is None def test_named_save(self, repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "save", "my-feature", "--json"], env=_env(repo), catch_exceptions=False, ) assert json.loads(r.output)["name"] == "my-feature" def test_duplicate_name_exits_1(self, repo: pathlib.Path) -> None: runner.invoke( cli, ["shelf", "save", "dup-test"], env=_env(repo), catch_exceptions=False ) # Write another dirty file so there's something to shelf (repo / "c.py").write_text("z = 3\n") r = runner.invoke(cli, ["shelf", "save", 
"dup-test"], env=_env(repo)) assert r.exit_code == 1 def test_shelf_size_increments(self, repo: pathlib.Path) -> None: r1 = runner.invoke( cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False ) d1 = json.loads(r1.output) (repo / "c.py").write_text("z = 3\n") r2 = runner.invoke( cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False ) d2 = json.loads(r2.output) assert d2["shelf_size"] == d1["shelf_size"] + 1 # --------------------------------------------------------------------------- # Integration — list JSON schema # --------------------------------------------------------------------------- class TestListJsonSchema: _ENTRY_REQUIRED = { "index", "id", "name", "snapshot_id", "branch", "created_at", "created_by", "intent_type", "intent", "resumable", "tags", "files_count", } def test_schema_complete(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "list", "--json"], env=_env(shelved_repo), catch_exceptions=False ) assert r.exit_code == 0, r.output entries = json.loads(r.output)["entries"] assert len(entries) >= 1 assert self._ENTRY_REQUIRED <= entries[0].keys() def test_empty_returns_empty_array(self, repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "list", "--json"], env=_env(repo), catch_exceptions=False ) assert r.exit_code == 0 assert json.loads(r.output)["entries"] == [] def test_files_count_positive(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "list", "--json"], env=_env(shelved_repo), catch_exceptions=False ) entries = json.loads(r.output)["entries"] assert entries[0]["files_count"] > 0 def test_filter_branch(self, repo: pathlib.Path) -> None: runner.invoke( cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False ) r = runner.invoke( cli, ["shelf", "list", "--branch", "main", "--json"], env=_env(repo) ) entries = json.loads(r.output)["entries"] assert all(e["branch"] == "main" for e in entries) def 
test_filter_branch_no_match_empty(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "list", "--branch", "nonexistent-branch", "--json"], env=_env(shelved_repo), ) assert json.loads(r.output)["entries"] == [] def test_filter_resumable(self, repo: pathlib.Path) -> None: runner.invoke( cli, ["shelf", "save", "--resumable", "--json"], env=_env(repo), catch_exceptions=False, ) (repo / "c.py").write_text("z = 3\n") runner.invoke( cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False ) r = runner.invoke( cli, ["shelf", "list", "--resumable", "--json"], env=_env(repo) ) entries = json.loads(r.output)["entries"] assert all(e["resumable"] for e in entries) def test_filter_by_creator(self, repo: pathlib.Path) -> None: runner.invoke( cli, ["shelf", "save", "--by", "agent-99", "--json"], env=_env(repo), catch_exceptions=False, ) r = runner.invoke( cli, ["shelf", "list", "--by", "agent-99", "--json"], env=_env(repo) ) entries = json.loads(r.output)["entries"] assert all(e["created_by"] == "agent-99" for e in entries) # --------------------------------------------------------------------------- # Integration — read JSON schema # --------------------------------------------------------------------------- class TestReadJsonSchema: _REQUIRED = { "index", "id", "name", "snapshot_id", "parent_commit", "branch", "created_at", "created_by", "intent_type", "intent", "resumable", "tags", "files_count", "files", "deleted", } def test_schema_complete(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "read", "--json"], env=_env(shelved_repo), catch_exceptions=False ) assert r.exit_code == 0, r.output d = json.loads(r.output) assert self._REQUIRED <= d.keys() def test_files_is_list_of_strings(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "read", "--json"], env=_env(shelved_repo), catch_exceptions=False ) d = json.loads(r.output) assert isinstance(d["files"], list) assert all(isinstance(f, 
str) for f in d["files"]) def test_read_by_name(self, shelved_repo: pathlib.Path) -> None: # get the name that was auto-generated listing = json.loads( runner.invoke( cli, ["shelf", "list", "--json"], env=_env(shelved_repo), catch_exceptions=False ).output )["entries"] name = listing[0]["name"] r = runner.invoke( cli, ["shelf", "read", name, "--json"], env=_env(shelved_repo), catch_exceptions=False ) assert r.exit_code == 0 assert json.loads(r.output)["name"] == name def test_read_by_index(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "read", "0", "--json"], env=_env(shelved_repo), catch_exceptions=False ) assert r.exit_code == 0 assert json.loads(r.output)["index"] == 0 def test_read_empty_exits_1(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["shelf", "read"], env=_env(repo)) assert r.exit_code == 1 def test_read_unknown_name_exits_1(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke(cli, ["shelf", "read", "no-such-name"], env=_env(shelved_repo)) assert r.exit_code == 1 # --------------------------------------------------------------------------- # Integration — apply JSON schema # --------------------------------------------------------------------------- class TestApplyJsonSchema: _REQUIRED = {"status", "name", "restored", "already_current", "deleted", "shelf_size"} def test_schema_complete(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "apply", "--json"], env=_env(shelved_repo), catch_exceptions=False ) assert r.exit_code == 0, r.output d = json.loads(r.output) assert self._REQUIRED <= d.keys() def test_status_applied(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "apply", "--json"], env=_env(shelved_repo), catch_exceptions=False ) assert json.loads(r.output)["status"] == "applied" def test_apply_preserves_shelf_entry(self, shelved_repo: pathlib.Path) -> None: before = json.loads( runner.invoke( cli, ["shelf", "list", "--json"], 
env=_env(shelved_repo), catch_exceptions=False ).output )["entries"] runner.invoke( cli, ["shelf", "apply", "--json"], env=_env(shelved_repo), catch_exceptions=False ) after = json.loads( runner.invoke( cli, ["shelf", "list", "--json"], env=_env(shelved_repo), catch_exceptions=False ).output )["entries"] assert len(before) == len(after), "apply must not remove the shelf entry" def test_apply_empty_exits_1(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["shelf", "apply"], env=_env(repo)) assert r.exit_code == 1 def test_apply_restores_file(self, shelved_repo: pathlib.Path) -> None: # After shelf save, b.py is gone from workdir (HEAD restored) b_py = shelved_repo / "b.py" assert not b_py.exists() runner.invoke( cli, ["shelf", "apply"], env=_env(shelved_repo), catch_exceptions=False ) assert b_py.exists() # --------------------------------------------------------------------------- # Integration — pop JSON schema # --------------------------------------------------------------------------- class TestPopJsonSchema: _REQUIRED = {"status", "name", "restored", "already_current", "deleted", "shelf_size_after"} def test_schema_complete(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "pop", "--json"], env=_env(shelved_repo), catch_exceptions=False ) assert r.exit_code == 0, r.output d = json.loads(r.output) assert self._REQUIRED <= d.keys() def test_status_popped(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "pop", "--json"], env=_env(shelved_repo), catch_exceptions=False ) assert json.loads(r.output)["status"] == "popped" def test_shelf_size_after_decremented(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "pop", "--json"], env=_env(shelved_repo), catch_exceptions=False ) assert json.loads(r.output)["shelf_size_after"] == 0 def test_pop_empty_exits_1(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["shelf", "pop"], env=_env(repo)) assert r.exit_code == 1 def 
test_pop_removes_entry(self, shelved_repo: pathlib.Path) -> None: runner.invoke( cli, ["shelf", "pop", "--json"], env=_env(shelved_repo), catch_exceptions=False ) after = json.loads( runner.invoke( cli, ["shelf", "list", "--json"], env=_env(shelved_repo), catch_exceptions=False ).output )["entries"] assert after == [] # --------------------------------------------------------------------------- # Integration — drop JSON schema # --------------------------------------------------------------------------- class TestDropJsonSchema: _REQUIRED = {"status", "name", "id", "shelf_size"} def test_schema_complete(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "drop", "--json"], env=_env(shelved_repo), catch_exceptions=False ) assert r.exit_code == 0, r.output d = json.loads(r.output) assert self._REQUIRED <= d.keys() def test_status_dropped(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "drop", "--json"], env=_env(shelved_repo), catch_exceptions=False ) assert json.loads(r.output)["status"] == "dropped" def test_id_is_sha256(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "drop", "--json"], env=_env(shelved_repo), catch_exceptions=False ) d = json.loads(r.output) assert d["id"].startswith("sha256:") def test_drop_empty_exits_1(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["shelf", "drop"], env=_env(repo)) assert r.exit_code == 1 def test_drop_does_not_restore_file(self, shelved_repo: pathlib.Path) -> None: b_py = shelved_repo / "b.py" assert not b_py.exists() runner.invoke( cli, ["shelf", "drop"], env=_env(shelved_repo), catch_exceptions=False ) assert not b_py.exists() def test_drop_removes_entry_from_list(self, shelved_repo: pathlib.Path) -> None: runner.invoke( cli, ["shelf", "drop"], env=_env(shelved_repo), catch_exceptions=False ) after = json.loads( runner.invoke( cli, ["shelf", "list", "--json"], env=_env(shelved_repo), catch_exceptions=False ).output )["entries"] 
assert after == [] # --------------------------------------------------------------------------- # Integration — diff JSON schema # --------------------------------------------------------------------------- class TestDiffJsonSchema: _REQUIRED = {"name", "branch", "would_restore", "already_current", "would_delete"} def test_schema_complete(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "diff", "--json"], env=_env(shelved_repo), catch_exceptions=False ) assert r.exit_code == 0, r.output d = json.loads(r.output) assert self._REQUIRED <= d.keys() def test_would_restore_has_changed_files(self, shelved_repo: pathlib.Path) -> None: r = runner.invoke( cli, ["shelf", "diff", "--json"], env=_env(shelved_repo), catch_exceptions=False ) d = json.loads(r.output) assert len(d["would_restore"]) > 0 def test_diff_does_not_modify_workdir(self, shelved_repo: pathlib.Path) -> None: b_py = shelved_repo / "b.py" before = b_py.exists() runner.invoke( cli, ["shelf", "diff"], env=_env(shelved_repo), catch_exceptions=False ) assert b_py.exists() == before def test_diff_empty_exits_1(self, repo: pathlib.Path) -> None: r = runner.invoke(cli, ["shelf", "diff"], env=_env(repo)) assert r.exit_code == 1 def test_diff_lists_already_current_when_merged(self, shelved_repo: pathlib.Path) -> None: """Files merged into HEAD since shelving appear in already_current.""" # Apply the shelf so HEAD gets the files (simulate a merge) runner.invoke( cli, ["shelf", "apply"], env=_env(shelved_repo), catch_exceptions=False ) runner.invoke( cli, ["commit", "-m", "merged shelf content"], env=_env(shelved_repo), catch_exceptions=False, ) r = runner.invoke( cli, ["shelf", "diff", "--json"], env=_env(shelved_repo), catch_exceptions=False ) d = json.loads(r.output) # After committing, would_restore should be empty (or have fewer files) # and already_current should be populated assert len(d["already_current"]) >= 0 # defensive — structure is correct # 
# ---------------------------------------------------------------------------
# Integration — name/index resolution
# ---------------------------------------------------------------------------


class TestNameIndexResolution:
    """Shelf entries can be addressed by name or by newest-first index."""

    def _save_n(self, repo: pathlib.Path, n: int) -> list[str]:
        """Save n distinct shelf entries, return their auto-generated names."""
        names: list[str] = []
        for i in range(n):
            (repo / f"w{i}.py").write_text(f"data {i}\n")
            r = runner.invoke(
                cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False
            )
            names.insert(0, json.loads(r.output)["name"])  # newest first
        return names

    def test_pop_by_name(self, repo: pathlib.Path) -> None:
        """Popping by name returns that entry, even when it is the oldest."""
        names = self._save_n(repo, 3)
        r = runner.invoke(
            cli, ["shelf", "pop", names[2], "--json"], env=_env(repo), catch_exceptions=False
        )
        assert r.exit_code == 0, r.output
        assert json.loads(r.output)["name"] == names[2]

    def test_pop_by_index(self, repo: pathlib.Path) -> None:
        """Index ``0`` resolves to the most recently saved entry."""
        names = self._save_n(repo, 3)
        r = runner.invoke(
            cli, ["shelf", "pop", "0", "--json"], env=_env(repo), catch_exceptions=False
        )
        assert r.exit_code == 0, r.output
        assert json.loads(r.output)["name"] == names[0]  # newest = 0

    def test_drop_by_name(self, repo: pathlib.Path) -> None:
        """Dropping by name reports the dropped entry's name."""
        names = self._save_n(repo, 2)
        r = runner.invoke(
            cli, ["shelf", "drop", names[1], "--json"], env=_env(repo), catch_exceptions=False
        )
        assert r.exit_code == 0, r.output
        assert json.loads(r.output)["name"] == names[1]

    def test_out_of_range_exits_1(self, repo: pathlib.Path) -> None:
        """An index beyond the stack size exits with status 1."""
        self._save_n(repo, 2)
        r = runner.invoke(cli, ["shelf", "pop", "99"], env=_env(repo))
        assert r.exit_code == 1

    def test_unknown_name_exits_1(self, repo: pathlib.Path) -> None:
        """A name that matches no entry exits with status 1."""
        self._save_n(repo, 1)
        r = runner.invoke(cli, ["shelf", "pop", "no-such-name"], env=_env(repo))
        assert r.exit_code == 1


# ---------------------------------------------------------------------------
# Integration — object store integrity
# ---------------------------------------------------------------------------


class TestObjectIntegrity:
    """Behaviour of pop/apply/drop when a shelved object is missing from the store."""

    def _corrupt_object(self, root: pathlib.Path, snapshot: Mapping[str, str]) -> None:
        """Delete the stored file for the first object in *snapshot*, if present.

        Only the first snapshot value is considered; an empty snapshot or an
        already-missing object file is a no-op.
        """
        first_id = next(iter(snapshot.values()), None)
        if first_id is None:
            return
        p = object_path(root, first_id)
        if p.exists():
            p.unlink()

    def test_pop_with_missing_object_exits_3(self, shelved_repo: pathlib.Path) -> None:
        """``shelf pop`` exits with status 3 when a shelved object is missing."""
        from muse.cli.commands.shelf import _load_shelf

        entries = _load_shelf(shelved_repo)
        assert len(entries) > 0
        self._corrupt_object(shelved_repo, entries[0]["snapshot"])
        r = runner.invoke(cli, ["shelf", "pop"], env=_env(shelved_repo))
        assert r.exit_code == 3

    def test_apply_with_missing_object_exits_3(self, shelved_repo: pathlib.Path) -> None:
        """``shelf apply`` exits with status 3 when a shelved object is missing."""
        from muse.cli.commands.shelf import _load_shelf

        entries = _load_shelf(shelved_repo)
        assert len(entries) > 0
        self._corrupt_object(shelved_repo, entries[0]["snapshot"])
        r = runner.invoke(cli, ["shelf", "apply"], env=_env(shelved_repo))
        assert r.exit_code == 3

    def test_drop_succeeds_even_with_missing_objects(self, shelved_repo: pathlib.Path) -> None:
        """drop never reads objects — it only removes the registry entry."""
        from muse.cli.commands.shelf import _load_shelf

        entries = _load_shelf(shelved_repo)
        assert len(entries) > 0
        self._corrupt_object(shelved_repo, entries[0]["snapshot"])
        r = runner.invoke(
            cli, ["shelf", "drop"], env=_env(shelved_repo), catch_exceptions=False
        )
        assert r.exit_code == 0


# ---------------------------------------------------------------------------
# Integration — programmatic API
# ---------------------------------------------------------------------------


class TestProgrammaticApi:
    """Direct calls to ``_shelf_push_programmatic`` / ``_shelf_pop_programmatic``."""

    def test_push_returns_entry(self, repo: pathlib.Path) -> None:
        """A push on a dirty repo returns an entry with default ``interrupt`` intent."""
        from muse.cli.commands.shelf import _shelf_push_programmatic

        entry = _shelf_push_programmatic(repo)
        assert entry is not None
        assert entry["id"].startswith("sha256:")
        assert entry["intent_type"] == "interrupt"

    def test_push_clean_returns_none(self, repo: pathlib.Path) -> None:
        """With ``b.py`` removed there is nothing to shelve and push returns ``None``."""
        (repo / "b.py").unlink(missing_ok=True)
        from muse.cli.commands.shelf import _shelf_push_programmatic

        entry = _shelf_push_programmatic(repo)
        assert entry is None

    def test_push_with_metadata(self, repo: pathlib.Path) -> None:
        """Metadata keyword arguments are stored on the returned entry."""
        from muse.cli.commands.shelf import _shelf_push_programmatic

        entry = _shelf_push_programmatic(
            repo,
            intent_type="handoff",
            intent="auth refactor, 60% done",
            created_by="agent-7",
            resumable=True,
            tags=["auth"],
        )
        assert entry is not None
        assert entry["intent_type"] == "handoff"
        assert entry["intent"] == "auth refactor, 60% done"
        assert entry["created_by"] == "agent-7"
        assert entry["resumable"] is True
        assert "auth" in entry["tags"]

    def test_push_duplicate_name_raises(self, repo: pathlib.Path) -> None:
        """Reusing an existing shelf name raises ``ValueError``."""
        from muse.cli.commands.shelf import _shelf_push_programmatic

        _shelf_push_programmatic(repo, name="my-shelf")
        # Re-dirty the tree so the second push has something to shelve.
        (repo / "b.py").write_text("new content\n")
        with pytest.raises(ValueError, match="already exists"):
            _shelf_push_programmatic(repo, name="my-shelf")

    def test_pop_returns_entry(self, shelved_repo: pathlib.Path) -> None:
        """A pop returns the removed entry, including its content-addressed id."""
        from muse.cli.commands.shelf import _shelf_pop_programmatic

        entry = _shelf_pop_programmatic(shelved_repo)
        assert entry is not None
        assert entry["id"].startswith("sha256:")

    def test_pop_empty_raises(self, repo: pathlib.Path) -> None:
        """Popping with nothing shelved raises ``ValueError``."""
        from muse.cli.commands.shelf import _shelf_pop_programmatic

        with pytest.raises(ValueError, match="No shelf entries"):
            _shelf_pop_programmatic(repo)

    def test_pop_removes_from_registry(self, shelved_repo: pathlib.Path) -> None:
        """A pop shrinks the loaded registry by exactly one entry."""
        from muse.cli.commands.shelf import _shelf_pop_programmatic, _load_shelf

        before = len(_load_shelf(shelved_repo))
        _shelf_pop_programmatic(shelved_repo)
        after = len(_load_shelf(shelved_repo))
        assert after == before - 1

    def test_pop_by_name(self, repo: pathlib.Path) -> None:
        """A named entry can be popped by passing its name."""
        from muse.cli.commands.shelf import _shelf_push_programmatic, _shelf_pop_programmatic

        entry = _shelf_push_programmatic(repo, name="named-shelf")
        assert entry is not None
        (repo / "b.py").write_text("restored content\n")
        popped = _shelf_pop_programmatic(repo, "named-shelf")
        assert popped["name"] == "named-shelf"


# ---------------------------------------------------------------------------
# End-to-end — round-trips
# ---------------------------------------------------------------------------


class TestRoundTrips:
    """Full CLI sequences combining save with pop, apply, drop and list."""

    def test_save_pop_restores_content(self, repo: pathlib.Path) -> None:
        """save removes ``b.py`` from disk and pop restores it byte-for-byte."""
        b_content = (repo / "b.py").read_text()
        runner.invoke(
            cli, ["shelf", "save"], env=_env(repo), catch_exceptions=False
        )
        assert not (repo / "b.py").exists()
        runner.invoke(
            cli, ["shelf", "pop"], env=_env(repo), catch_exceptions=False
        )
        assert (repo / "b.py").exists()
        assert (repo / "b.py").read_text() == b_content

    def test_save_apply_apply_idempotent(self, repo: pathlib.Path) -> None:
        """Applying the same shelf twice in a row succeeds both times."""
        runner.invoke(
            cli, ["shelf", "save"], env=_env(repo), catch_exceptions=False
        )
        r1 = runner.invoke(
            cli, ["shelf", "apply", "--json"], env=_env(repo), catch_exceptions=False
        )
        assert r1.exit_code == 0, r1.output
        # Apply again — should report already_current for the second call
        r2 = runner.invoke(
            cli, ["shelf", "apply", "--json"], env=_env(repo), catch_exceptions=False
        )
        # Second apply: restored == 0 (files already written), already_current > 0
        # (files match what's on disk, but HEAD still shows old state)
        # At minimum, the command must succeed and still emit a JSON object.
        assert r2.exit_code == 0, r2.output
        assert isinstance(json.loads(r2.output), dict)

    def test_save_drop_no_restore(self, repo: pathlib.Path) -> None:
        """Dropping a saved shelf leaves ``b.py`` absent from the working tree."""
        runner.invoke(
            cli, ["shelf", "save"], env=_env(repo), catch_exceptions=False
        )
        runner.invoke(
            cli, ["shelf", "drop"], env=_env(repo), catch_exceptions=False
        )
        assert not (repo / "b.py").exists()

    def test_stack_ordering_newest_first(self, repo: pathlib.Path) -> None:
        """Entries are ordered newest-first; index 0 is the most recent."""
        names: list[str] = []
        for i in range(3):
            (repo / f"w{i}.py").write_text(f"data {i}\n")
            r = runner.invoke(
                cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False
            )
            names.append(json.loads(r.output)["name"])
        listing = json.loads(
            runner.invoke(
                cli, ["shelf", "list", "--json"], env=_env(repo), catch_exceptions=False
            ).output
        )["entries"]
        # Most recent save should be at index 0
        assert listing[0]["name"] == names[-1]

    def test_shelf_persists_across_commands(self, repo: pathlib.Path) -> None:
        """An entry saved by one invocation is visible to a later ``list``."""
        runner.invoke(
            cli, ["shelf", "save"], env=_env(repo), catch_exceptions=False
        )
        r = runner.invoke(
            cli, ["shelf", "list", "--json"], env=_env(repo), catch_exceptions=False
        )
        assert len(json.loads(r.output)["entries"]) == 1

    def test_named_save_then_pop_by_name(self, repo: pathlib.Path) -> None:
        """An explicitly named save can be popped by that name."""
        runner.invoke(
            cli,
            ["shelf", "save", "my-feature-work", "--json"],
            env=_env(repo),
            catch_exceptions=False,
        )
        r = runner.invoke(
            cli,
            ["shelf", "pop", "my-feature-work", "--json"],
            env=_env(repo),
            catch_exceptions=False,
        )
        assert r.exit_code == 0
        assert json.loads(r.output)["name"] == "my-feature-work"


# ---------------------------------------------------------------------------
# Data integrity
# ---------------------------------------------------------------------------


class TestDataIntegrity:
    """Correctness of the data recorded in, and restored from, shelf entries."""

    def test_already_current_detection(self, repo: pathlib.Path) -> None:
        """Files merged into HEAD since shelving appear as already_current on apply."""
        # Save the shelf
        runner.invoke(
            cli, ["shelf", "save"], env=_env(repo), catch_exceptions=False
        )
        # Restore the file and commit it (simulating a merge)
        (repo / "b.py").write_text("y = 2\n")
        runner.invoke(
            cli, ["commit", "-m", "merge: add b.py"], env=_env(repo), catch_exceptions=False
        )
        # Now apply the shelf — b.py should be already_current
        r = runner.invoke(
            cli, ["shelf", "apply", "--json"], env=_env(repo), catch_exceptions=False
        )
        d = json.loads(r.output)
        assert d["already_current"] > 0, "Files merged into HEAD must show as already_current"
        assert d["restored"] == 0

    def test_snapshot_contains_all_tracked_files(self, repo: pathlib.Path) -> None:
        """The shelf snapshot covers every uncommitted file present at save time."""
        (repo / "c.py").write_text("c = 3\n")
        r = runner.invoke(
            cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False
        )
        assert r.exit_code == 0
        from muse.cli.commands.shelf import _load_shelf

        entries = _load_shelf(repo)
        snapshot = entries[0]["snapshot"]
        # a.py was committed; b.py and c.py are new. Each new file is checked
        # on its own: a single any(... or ...) over all three names passes as
        # soon as one of them is present.
        # NOTE(review): a.py is not asserted — whether unmodified committed
        # files are included in the snapshot is not visible here; confirm.
        for filename in ("b.py", "c.py"):
            assert any(filename in k for k in snapshot), (
                f"{filename} missing from shelf snapshot"
            )

    def test_content_address_id_matches_recomputed(self, repo: pathlib.Path) -> None:
        """The shelf entry id matches _compute_shelf_id applied to the entry data."""
        runner.invoke(
            cli, ["shelf", "save"], env=_env(repo), catch_exceptions=False
        )
        from muse.cli.commands.shelf import _compute_shelf_id, _load_shelf

        entries = _load_shelf(repo)
        entry = dict(entries[0])
        # The id is recomputed over the entry with its own "id" key removed.
        stored_id = entry.pop("id")
        recomputed = _compute_shelf_id(entry)
        assert recomputed == stored_id

    def test_deleted_paths_tracked(self, repo: pathlib.Path) -> None:
        """Files deleted from the working tree before shelving appear in 'deleted'."""
        # Commit b.py so it's tracked, then delete it
        (repo / "b.py").write_text("y = 2\n")
        runner.invoke(cli, ["code", "add", "b.py"], env=_env(repo), catch_exceptions=False)
        runner.invoke(
            cli, ["commit", "-m", "add b"], env=_env(repo), catch_exceptions=False
        )
        (repo / "b.py").unlink()
        (repo / "c.py").write_text("z = 3\n")  # make tree dirty
        runner.invoke(
            cli, ["shelf", "save"], env=_env(repo), catch_exceptions=False
        )
        from muse.cli.commands.shelf import _load_shelf

        entries = _load_shelf(repo)
        deleted = entries[0]["deleted"]
        assert "b.py" in deleted

    def test_snapshot_id_is_sha256_prefixed(self, repo: pathlib.Path) -> None:
        """The stored ``snapshot_id`` carries the ``sha256:`` prefix."""
        runner.invoke(
            cli, ["shelf", "save"], env=_env(repo), catch_exceptions=False
        )
        from muse.cli.commands.shelf import _load_shelf

        entries = _load_shelf(repo)
        assert entries[0]["snapshot_id"].startswith("sha256:")

    def test_pop_restores_file_when_head_matches_shelf_but_disk_is_stale(
        self, tmp_path: pathlib.Path
    ) -> None:
        """shelf pop must write to disk even when HEAD manifest already has the shelf content.

        Scenario: file modified in working tree → shelf save (cleans disk back
        to HEAD content) → new commit advances HEAD to the same content that
        was shelved → shelf pop → disk must contain the shelved bytes.

        The bug: _apply_shelf_snapshot compares shelf object_id against
        head_manifest. When HEAD is already at the shelf version,
        already_current fires and the file is NOT written to disk, even though
        disk still has the OLD (pre-shelf) content.
        """
        root, repo_id = _init_repo(tmp_path)

        # Commit a.py = "version A"
        obj_a = _write_object(root, b"version A\n")
        _make_commit(root, repo_id, message="init", manifest={"a.py": obj_a})
        (root / "a.py").write_bytes(b"version A\n")

        # Modify a.py to "version B" in the working tree
        (root / "a.py").write_bytes(b"version B\n")

        # shelf save — saves "version B", restores disk to HEAD ("version A")
        r = runner.invoke(
            cli, ["shelf", "save", "--json"], env=_env(root), catch_exceptions=False
        )
        assert r.exit_code == 0, r.output
        assert (root / "a.py").read_bytes() == b"version A\n"

        # Advance HEAD to "version B" WITHOUT touching disk
        # (simulates what happens after a merge/commit that incorporates the shelf content)
        # obj_b is already in the object store (shelf save wrote it above)
        obj_b = blob_id(b"version B\n")
        _make_commit(root, repo_id, message="merge shelf", manifest={"a.py": obj_b})

        # disk still has "version A" — stale working tree
        assert (root / "a.py").read_bytes() == b"version A\n"

        # shelf pop — HEAD has obj_b == shelf has obj_b, but disk has obj_a
        r = runner.invoke(
            cli, ["shelf", "pop", "--json"], env=_env(root), catch_exceptions=False
        )
        assert r.exit_code == 0, r.output

        # Disk must now have "version B" — the shelved content
        assert (root / "a.py").read_bytes() == b"version B\n", (
            "shelf pop left stale bytes on disk: "
            "already_current check compared against HEAD manifest instead of actual disk content"
        )


# ---------------------------------------------------------------------------
# Performance
#
# ---------------------------------------------------------------------------


class TestPerformance:
    """Coarse wall-clock budgets for common shelf operations."""

    def test_save_pop_under_5s(self, repo: pathlib.Path) -> None:
        """A save followed by a pop completes in under five seconds."""
        start = time.perf_counter()
        runner.invoke(
            cli, ["shelf", "save"], env=_env(repo), catch_exceptions=False
        )
        runner.invoke(
            cli, ["shelf", "pop"], env=_env(repo), catch_exceptions=False
        )
        elapsed = time.perf_counter() - start
        assert elapsed < 5.0, f"save+pop too slow: {elapsed:.2f}s"

    def test_list_50_entries_under_2s(self, tmp_path: pathlib.Path) -> None:
        """Loading a registry of 50 entries completes in under two seconds."""
        root, _ = _init_repo(tmp_path)
        # Import everything up front so module import time is not measured.
        from muse.cli.commands.shelf import ShelfEntry, _compute_shelf_id, _load_shelf
        from muse.core.shelf import write_shelf_entry

        # Step one hour per entry from a fixed base. Formatting the loop index
        # straight into the hour field yields invalid timestamps once i >= 24.
        base = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)
        for i in range(50):
            raw = _make_shelf_entry(name=f"dev/{i:03d}")
            raw["created_at"] = (base + datetime.timedelta(hours=i)).isoformat()
            entry = ShelfEntry(id=_compute_shelf_id(raw), **raw)  # type: ignore[misc]
            write_shelf_entry(root, entry)

        start = time.perf_counter()
        loaded = _load_shelf(root)
        elapsed = time.perf_counter() - start
        assert len(loaded) == 50
        assert elapsed < 2.0, f"_load_shelf(50 entries) too slow: {elapsed:.2f}s"


# ---------------------------------------------------------------------------
# Security
# ---------------------------------------------------------------------------


class TestSecurity:
    """Guards against hostile registry files and terminal-escape injection."""

    def test_symlink_at_shelf_json_returns_empty(self, tmp_path: pathlib.Path) -> None:
        """A ``shelf.json`` that is a symlink is not followed; the load yields ``[]``."""
        root, _ = _init_repo(tmp_path)
        target = tmp_path / "secret.json"
        target.write_text(json.dumps([_make_shelf_entry()]))
        shelf_path = muse_dir(root) / "shelf.json"
        shelf_path.symlink_to(target)
        from muse.cli.commands.shelf import _load_shelf

        result = _load_shelf(root)
        assert result == []

    def test_ansi_in_branch_name_sanitized(self, tmp_path: pathlib.Path) -> None:
        """ESC bytes in a stored branch name never reach ``shelf list`` output."""
        root, _ = _init_repo(tmp_path)
        from muse.cli.commands.shelf import ShelfEntry, _compute_shelf_id
        from muse.core.shelf import write_shelf_entry

        malicious = "feat/\x1b[31mred\x1b[0m"
        raw = _make_shelf_entry(name="dev/000", branch=malicious)
        entry = ShelfEntry(id=_compute_shelf_id(raw), **raw)  # type: ignore[misc]
        write_shelf_entry(root, entry)
        r = runner.invoke(cli, ["shelf", "list"], env=_env(root), catch_exceptions=False)
        assert r.exit_code == 0
        assert "\x1b" not in r.output

    def test_ansi_in_intent_sanitized(self, tmp_path: pathlib.Path) -> None:
        """ESC bytes in a stored intent never reach ``shelf list`` output."""
        root, _ = _init_repo(tmp_path)
        from muse.cli.commands.shelf import ShelfEntry, _compute_shelf_id
        from muse.core.shelf import write_shelf_entry

        raw = _make_shelf_entry(name="dev/000", intent="safe \x1b[31mbad\x1b[0m intent")
        entry = ShelfEntry(id=_compute_shelf_id(raw), **raw)  # type: ignore[misc]
        write_shelf_entry(root, entry)
        r = runner.invoke(cli, ["shelf", "list"], env=_env(root), catch_exceptions=False)
        assert r.exit_code == 0
        assert "\x1b" not in r.output

    def test_ansi_in_file_path_sanitized_in_read(self, tmp_path: pathlib.Path) -> None:
        """ESC bytes in a snapshot path never reach ``shelf read`` output."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        from muse.cli.commands.shelf import ShelfEntry, _compute_shelf_id
        from muse.core.shelf import write_shelf_entry

        malicious_path = "src/\x1b[31mmalicious\x1b[0m.py"
        raw = _make_shelf_entry(snapshot={malicious_path: long_id("a" * 64)})
        entry = ShelfEntry(id=_compute_shelf_id(raw), **raw)  # type: ignore[misc]
        write_shelf_entry(root, entry)
        r = runner.invoke(cli, ["shelf", "read"], env=_env(root), catch_exceptions=False)
        assert r.exit_code == 0
        assert "\x1b" not in r.output

    def test_invalid_format_exits_1(self, repo: pathlib.Path) -> None:
        """An unsupported ``--format`` value makes the command fail."""
        r = runner.invoke(cli, ["shelf", "save", "--format", "xml"], env=_env(repo))
        # NOTE(review): the test name says "exits 1" but only a non-zero status
        # is asserted — confirm the exact exit code before pinning it.
        assert r.exit_code != 0

    def test_oversized_shelf_json_ignored(self, tmp_path: pathlib.Path) -> None:
        """A 65 MiB ``shelf.json`` is ignored and the load yields ``[]``."""
        root, _ = _init_repo(tmp_path)
        (muse_dir(root) / "shelf.json").write_bytes(b"x" * (65 * 1024 * 1024))
        from muse.cli.commands.shelf import _load_shelf

        assert _load_shelf(root) == []

    def test_snapshot_values_must_be_strings(self, tmp_path: pathlib.Path) -> None:
        """Non-string snapshot values must be filtered out on load."""
        root, _ = _init_repo(tmp_path)
        from muse.cli.commands.shelf import _compute_shelf_id, _load_shelf
        from muse.core.shelf import write_shelf_entry

        raw = {
            "name": "dev/000",
            "snapshot": {"a.py": long_id("a" * 64), "b.py": 42},  # integer value
            "deleted": [],
            "snapshot_id": long_id("b" * 64),
            "parent_commit": long_id("c" * 64),
            "branch": "main",
            "created_at": "2025-01-01T00:00:00+00:00",
            "created_by": "human",
            "intent_type": "checkpoint",
            "intent": None,
            "resumable": False,
            "tags": [],
            "expires_at": None,
            "domain_state": {},
        }
        entry_id = _compute_shelf_id(raw)
        write_shelf_entry(root, dict(raw, id=entry_id))
        loaded = _load_shelf(root)
        # Entry must load; the non-string value must be stripped
        assert len(loaded) == 1
        assert "b.py" not in loaded[0]["snapshot"]
        assert "a.py" in loaded[0]["snapshot"]


# ---------------------------------------------------------------------------
# Stress
# ---------------------------------------------------------------------------


class TestStress:
    """Volume and concurrency scenarios for the shelf registry."""

    def test_100_save_drop_cycles(self, repo: pathlib.Path) -> None:
        """100 sequential save/drop cycles must not corrupt the registry."""
        for i in range(100):
            (repo / f"w{i}.py").write_text(f"data {i}\n")
            r = runner.invoke(
                cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False
            )
            assert r.exit_code == 0, f"save {i}: {r.output}"
            r = runner.invoke(
                cli, ["shelf", "drop", "--json"], env=_env(repo), catch_exceptions=False
            )
            assert r.exit_code == 0, f"drop {i}: {r.output}"
        listing = json.loads(
            runner.invoke(
                cli, ["shelf", "list", "--json"], env=_env(repo), catch_exceptions=False
            ).output
        )["entries"]
        assert listing == []

    def test_stack_with_50_entries_then_clear(self, repo: pathlib.Path) -> None:
        """Fifty saves produce fifty entries, and fifty drops empty the stack."""
        for i in range(50):
            (repo / f"w{i}.py").write_text(f"data {i}\n")
            r = runner.invoke(
                cli, ["shelf", "save", "--json"], env=_env(repo), catch_exceptions=False
            )
            assert r.exit_code == 0, f"save {i}: {r.output}"
        listing = json.loads(
            runner.invoke(
                cli, ["shelf", "list", "--json"], env=_env(repo), catch_exceptions=False
            ).output
        )["entries"]
        assert len(listing) == 50
        for _ in range(50):
            r = runner.invoke(
                cli, ["shelf", "drop"], env=_env(repo), catch_exceptions=False
            )
            assert r.exit_code == 0
        listing = json.loads(
            runner.invoke(
                cli, ["shelf", "list", "--json"], env=_env(repo), catch_exceptions=False
            ).output
        )["entries"]
        assert listing == []

    def test_concurrent_save_to_isolated_repos(self, tmp_path: pathlib.Path) -> None:
        """Concurrent shelf saves to separate repos must not interfere."""
        errors: list[Exception] = []

        def _save_in_repo(idx: int) -> None:
            """Create repo ``idx``, commit a base file, then shelve a dirty file."""
            try:
                sub = tmp_path / f"repo{idx}"
                sub.mkdir()
                _init_repo(sub)
                # Commit a base file so HEAD is non-empty
                (sub / "base.py").write_text(f"base {idx}\n")
                r = runner.invoke(
                    cli, ["commit", "-m", f"base{idx}"], env=_env(sub), catch_exceptions=False
                )
                assert r.exit_code == 0, f"thread {idx} commit: {r.output}"
                # Add a dirty file and shelf it
                (sub / "work.py").write_text(f"thread {idx}\n")
                r = runner.invoke(
                    cli, ["shelf", "save", "--json"], env=_env(sub), catch_exceptions=False
                )
                assert r.exit_code == 0, f"thread {idx}: {r.output}"
            except Exception as exc:
                # Deliberately broad: an exception raised in a worker thread
                # would not fail the test by itself, so every failure
                # (including AssertionError) is collected and checked below.
                errors.append(exc)

        threads = [threading.Thread(target=_save_in_repo, args=(i,)) for i in range(10)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        assert errors == [], f"Concurrent errors: {errors}"

    def test_save_load_large_snapshot(self, tmp_path: pathlib.Path) -> None:
        """Shelf with 500 files in snapshot loads correctly."""
        root, _ = _init_repo(tmp_path)
        from muse.cli.commands.shelf import _load_shelf, ShelfEntry, _compute_shelf_id
        from muse.core.shelf import write_shelf_entry

        # Use the ``064x`` format spec for a zero-padded 64-digit hex string.
        # Padding the result of hex() keeps the "0x" prefix, which puts a
        # non-hex "x" inside the digest.
        big_snapshot = {f"src/file_{i:04d}.py": long_id(f"{i:064x}") for i in range(500)}
        raw = _make_shelf_entry(name="dev/000", snapshot=big_snapshot)
        entry = ShelfEntry(id=_compute_shelf_id(raw), **raw)  # type: ignore[misc]
        write_shelf_entry(root, entry)
        loaded = _load_shelf(root)
        assert len(loaded) == 1
        assert len(loaded[0]["snapshot"]) == 500


#
# ---------------------------------------------------------------------------
# Docstrings
# ---------------------------------------------------------------------------


class TestDocstrings:
    """Each public entry point of the shelf command module has a non-empty docstring."""

    def test_module_docstring(self) -> None:
        """The module itself is documented."""
        import muse.cli.commands.shelf as shelf_module

        module_doc = shelf_module.__doc__
        assert module_doc

    def test_run_save_docstring(self) -> None:
        """``run_save`` is documented."""
        from muse.cli.commands.shelf import run_save as target

        assert target.__doc__

    def test_run_list_docstring(self) -> None:
        """``run_list`` is documented."""
        from muse.cli.commands.shelf import run_list as target

        assert target.__doc__

    def test_run_read_docstring(self) -> None:
        """``run_read`` is documented."""
        from muse.cli.commands.shelf import run_read as target

        assert target.__doc__

    def test_run_apply_docstring(self) -> None:
        """``run_apply`` is documented."""
        from muse.cli.commands.shelf import run_apply as target

        assert target.__doc__

    def test_run_pop_docstring(self) -> None:
        """``run_pop`` is documented."""
        from muse.cli.commands.shelf import run_pop as target

        assert target.__doc__

    def test_run_drop_docstring(self) -> None:
        """``run_drop`` is documented."""
        from muse.cli.commands.shelf import run_drop as target

        assert target.__doc__

    def test_run_diff_docstring(self) -> None:
        """``run_diff`` is documented."""
        from muse.cli.commands.shelf import run_diff as target

        assert target.__doc__

    def test_shelf_push_programmatic_docstring(self) -> None:
        """``_shelf_push_programmatic`` is documented."""
        from muse.cli.commands.shelf import _shelf_push_programmatic as target

        assert target.__doc__

    def test_shelf_pop_programmatic_docstring(self) -> None:
        """``_shelf_pop_programmatic`` is documented."""
        from muse.cli.commands.shelf import _shelf_pop_programmatic as target

        assert target.__doc__