"""Comprehensive tests for ``muse stash``. Covers: - Unit: _load_stash / _save_stash atomic write, size guard - Integration: stash → pop, list, drop - E2E: full CLI via CliRunner - Security: stash.json size limit, atomic writes, sanitized output - Stress: many stash entries, repeated save/load """ from __future__ import annotations import datetime import json import pathlib import uuid import pytest from tests.cli_test_helper import CliRunner cli = None # argparse migration — CliRunner ignores this arg runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _env(root: pathlib.Path) -> Manifest: return {"MUSE_REPO_ROOT": str(root)} def _init_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]: muse_dir = tmp_path / ".muse" muse_dir.mkdir() repo_id = str(uuid.uuid4()) (muse_dir / "repo.json").write_text(json.dumps({ "repo_id": repo_id, "domain": "code", "default_branch": "main", "created_at": "2025-01-01T00:00:00+00:00", }), encoding="utf-8") (muse_dir / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (muse_dir / "refs" / "heads").mkdir(parents=True) (muse_dir / "snapshots").mkdir() (muse_dir / "commits").mkdir() (muse_dir / "objects").mkdir() return tmp_path, repo_id def _make_commit(root: pathlib.Path, repo_id: str, message: str = "init") -> str: from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot from muse.core.snapshot import compute_snapshot_id, compute_commit_id ref_file = root / ".muse" / "refs" / "heads" / "main" parent_id = ref_file.read_text().strip() if ref_file.exists() else None manifest: Manifest = {} snap_id = compute_snapshot_id(manifest) committed_at = datetime.datetime.now(datetime.timezone.utc) commit_id = compute_commit_id( parent_ids=[parent_id] if parent_id else [], snapshot_id=snap_id, message=message, committed_at_iso=committed_at.isoformat(), ) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) write_commit(root, CommitRecord( commit_id=commit_id, repo_id=repo_id, branch="main", snapshot_id=snap_id, message=message, committed_at=committed_at, parent_commit_id=parent_id, )) ref_file.parent.mkdir(parents=True, exist_ok=True) ref_file.write_text(commit_id, encoding="utf-8") return commit_id # --------------------------------------------------------------------------- # Unit tests # --------------------------------------------------------------------------- class TestStashUnit: def test_load_stash_empty(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) from muse.cli.commands.stash import _load_stash assert _load_stash(root) == [] def test_save_and_load_stash_roundtrip(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) from muse.cli.commands.stash import _load_stash, _save_stash, StashEntry entry = StashEntry( snapshot_id="a" * 64, delta={"file.mid": "b" * 64}, deleted=[], branch="main", stashed_at="2025-01-01T00:00:00+00:00", message=None, ) _save_stash(root, [entry]) loaded = _load_stash(root) assert len(loaded) == 1 assert loaded[0]["snapshot_id"] == "a" * 64 assert loaded[0]["branch"] == "main" assert loaded[0]["delta"] == {"file.mid": "b" * 64} assert loaded[0]["deleted"] == [] def test_save_stash_is_atomic(self, tmp_path: pathlib.Path) -> None: """After _save_stash, no temp files should remain in .muse/.""" root, _ = _init_repo(tmp_path) from muse.cli.commands.stash import _save_stash, StashEntry entry = StashEntry( snapshot_id="c" * 64, delta={}, deleted=[], branch="dev", stashed_at="2025-01-01T00:00:00+00:00", message=None, ) _save_stash(root, [entry]) tmp_files = list((root / ".muse").glob(".stash_tmp_*")) assert tmp_files == [] assert (root / ".muse" / "stash.json").exists() def test_load_stash_ignores_oversized_file(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) stash_path = root / ".muse" / "stash.json" stash_path.write_bytes(b"x" * (65 * 1024 * 1024)) # 65 MiB > 64 MiB limit from muse.cli.commands.stash import _load_stash result = _load_stash(root) assert result == [] # --------------------------------------------------------------------------- # Integration tests # --------------------------------------------------------------------------- class TestStashIntegration: def test_stash_with_no_changes_reports_nothing(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = runner.invoke(cli, ["stash"], env=_env(root), catch_exceptions=False) assert "Nothing to stash" in result.output def test_stash_list_empty(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = runner.invoke(cli, ["stash", "list"], env=_env(root), catch_exceptions=False) assert "No stash entries" in result.output def test_stash_pop_empty_fails(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = runner.invoke(cli, ["stash", "pop"], env=_env(root)) assert result.exit_code != 0 def test_stash_drop_empty_fails(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = runner.invoke(cli, ["stash", "drop"], env=_env(root)) assert result.exit_code != 0 def test_stash_list_shows_entries(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) from muse.cli.commands.stash import _save_stash, StashEntry _save_stash(root, [ StashEntry(snapshot_id="a" * 64, delta={}, deleted=[], branch="main", stashed_at="2025-01-01T00:00:00+00:00", message=None), ]) result = runner.invoke(cli, ["stash", "list"], env=_env(root), catch_exceptions=False) assert "stash@{0}" in result.output assert "main" in result.output # --------------------------------------------------------------------------- # Security tests # --------------------------------------------------------------------------- class TestStashSecurity: def test_stash_list_sanitizes_branch_name_with_control_chars( self, tmp_path: pathlib.Path ) -> None: root, _ = _init_repo(tmp_path) from muse.cli.commands.stash import _save_stash, StashEntry malicious_branch = "feat/\x1b[31mred\x1b[0m" _save_stash(root, [ StashEntry(snapshot_id="a" * 64, delta={}, deleted=[], branch=malicious_branch, stashed_at="2025-01-01T00:00:00+00:00", message=None), ]) result = runner.invoke(cli, ["stash", "list"], env=_env(root), catch_exceptions=False) assert result.exit_code == 0 assert "\x1b" not in result.output def test_stash_pop_sanitizes_branch_name(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) from muse.cli.commands.stash import _save_stash, StashEntry _save_stash(root, [ StashEntry(snapshot_id="a" * 64, delta={}, deleted=[], branch="feat/\x1b[31mred\x1b[0m", stashed_at="2025-01-01T00:00:00+00:00", message=None), ]) result = runner.invoke(cli, ["stash", "pop"], env=_env(root), catch_exceptions=False) assert "\x1b" not in result.output # --------------------------------------------------------------------------- # Stress tests # --------------------------------------------------------------------------- class TestStashStress: def test_many_stash_entries_save_load(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) from muse.cli.commands.stash import _save_stash, _load_stash, StashEntry entries = [ StashEntry(snapshot_id=f"{'a' * 63}{i % 10}", delta={"file.mid": "b" * 64}, deleted=[], branch="main", stashed_at=f"2025-01-{i % 28 + 1:02d}T00:00:00+00:00", message=None) for i in range(50) ] _save_stash(root, entries) loaded = _load_stash(root) assert len(loaded) == 50 def test_repeated_save_load_no_corruption(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) from muse.cli.commands.stash import _save_stash, _load_stash, StashEntry for i in range(20): entry = StashEntry(snapshot_id=f"{'b' * 63}{i % 10}", delta={}, deleted=[], branch="main", stashed_at="2025-01-01T00:00:00+00:00", message=None) loaded = _load_stash(root) loaded.insert(0, entry) _save_stash(root, loaded) final = _load_stash(root) assert len(final) == 20 # --------------------------------------------------------------------------- # Regression tests — delta-based stash correctness # --------------------------------------------------------------------------- class TestStashDeltaRegression: """Regression tests for the stash full-snapshot bug. Before the fix, ``muse stash`` saved the entire working tree (all tracked files) and ``stash pop`` wrote ALL of them back — overwriting any files that had been committed since the stash was created with their older stashed versions. After the fix, only the delta (files that differ from HEAD) is saved and restored, so intervening commits are never clobbered. """ def _make_commit_with_files( self, root: pathlib.Path, repo_id: str, files: Manifest, message: str = "commit", ) -> str: """Write *files* to disk and record a commit whose snapshot includes them.""" from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot from muse.core.snapshot import compute_snapshot_id, compute_commit_id from muse.core.object_store import write_object_from_path import datetime for rel, content in files.items(): p = root / rel p.parent.mkdir(parents=True, exist_ok=True) p.write_text(content, encoding="utf-8") manifest: Manifest = {} for rel in files: p = root / rel import hashlib data = p.read_bytes() oid = hashlib.sha256(data).hexdigest() manifest[rel] = oid write_object_from_path(root, oid, p) ref_file = root / ".muse" / "refs" / "heads" / "main" parent_id = ref_file.read_text().strip() if ref_file.exists() and ref_file.stat().st_size else None snap_id = compute_snapshot_id(manifest) committed_at = datetime.datetime.now(datetime.timezone.utc) commit_id = compute_commit_id( parent_ids=[parent_id] if parent_id else [], snapshot_id=snap_id, message=message, committed_at_iso=committed_at.isoformat(), ) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) write_commit(root, CommitRecord( commit_id=commit_id, repo_id=repo_id, branch="main", snapshot_id=snap_id, message=message, committed_at=committed_at, parent_commit_id=parent_id, )) ref_file.parent.mkdir(parents=True, exist_ok=True) ref_file.write_text(commit_id, encoding="utf-8") return commit_id def test_stash_saves_only_delta_not_full_snapshot( self, tmp_path: pathlib.Path ) -> None: """Stash entry delta must contain only changed files, not all tracked files.""" root, repo_id = _init_repo(tmp_path) # Commit two files. self._make_commit_with_files(root, repo_id, { "stable.py": "unchanged content\n", "changing.py": "original content\n", }) # Modify only one. (root / "changing.py").write_text("modified content\n", encoding="utf-8") result = runner.invoke(cli, ["stash"], env=_env(root), catch_exceptions=False) assert result.exit_code == 0 from muse.cli.commands.stash import _load_stash entries = _load_stash(root) assert len(entries) == 1 # Delta must contain only the changed file. assert "changing.py" in entries[0]["delta"] assert "stable.py" not in entries[0]["delta"] assert entries[0]["deleted"] == [] def test_stash_pop_does_not_overwrite_newer_commits( self, tmp_path: pathlib.Path ) -> None: """Pop must not clobber files that were committed after the stash was created. Sequence: 1. Commit file_a="v1" and file_b="original" 2. Modify file_b only → stash 3. Commit file_a="v2" (new committed version) 4. Pop stash 5. Assert file_a == "v2" (committed version preserved) 6. Assert file_b == "modified" (stashed change restored) """ root, repo_id = _init_repo(tmp_path) # Step 1: initial commit. self._make_commit_with_files(root, repo_id, { "file_a.py": "v1\n", "file_b.py": "original\n", }, message="initial") # Step 2: modify file_b and stash. (root / "file_b.py").write_text("modified\n", encoding="utf-8") stash_result = runner.invoke(cli, ["stash"], env=_env(root), catch_exceptions=False) assert stash_result.exit_code == 0 # After stash, working tree is at HEAD — file_b back to "original". assert (root / "file_b.py").read_text() == "original\n" # Step 3: commit a new version of file_a (simulates work done after stash). self._make_commit_with_files(root, repo_id, { "file_a.py": "v2\n", "file_b.py": "original\n", }, message="post-stash commit") assert (root / "file_a.py").read_text() == "v2\n" # Step 4: pop the stash. pop_result = runner.invoke(cli, ["stash", "pop"], env=_env(root), catch_exceptions=False) assert pop_result.exit_code == 0 # Step 5 & 6: verify. assert (root / "file_a.py").read_text() == "v2\n", ( "stash pop overwrote file_a with its stashed version — regression!" ) assert (root / "file_b.py").read_text() == "modified\n", ( "stash pop did not restore file_b" ) def test_stash_pop_restores_deleted_files(self, tmp_path: pathlib.Path) -> None: """A file deleted before stashing must be re-deleted on pop.""" root, repo_id = _init_repo(tmp_path) self._make_commit_with_files(root, repo_id, { "keep.py": "keep\n", "to_delete.py": "gone\n", }) # Delete one file before stashing. (root / "to_delete.py").unlink() runner.invoke(cli, ["stash"], env=_env(root), catch_exceptions=False) # After stash, HEAD is restored — to_delete.py is back. assert (root / "to_delete.py").exists() runner.invoke(cli, ["stash", "pop"], env=_env(root), catch_exceptions=False) assert not (root / "to_delete.py").exists(), ( "stash pop did not re-delete the file that was deleted before stashing" ) assert (root / "keep.py").exists() def test_stash_show_lists_only_changed_files(self, tmp_path: pathlib.Path) -> None: """stash show must list only changed files, not the entire working tree.""" root, repo_id = _init_repo(tmp_path) self._make_commit_with_files(root, repo_id, { "a.py": "a\n", "b.py": "b\n", "c.py": "c\n", }) # Modify only one file. (root / "b.py").write_text("b modified\n", encoding="utf-8") runner.invoke(cli, ["stash"], env=_env(root), catch_exceptions=False) result = runner.invoke( cli, ["stash", "show", "--json"], env=_env(root), catch_exceptions=False ) assert result.exit_code == 0 data = json.loads(result.output) assert data["files_count"] == 1 assert data["files"] == ["b.py"] # Unchanged files must not appear. assert "a.py" not in data["files"] assert "c.py" not in data["files"] def test_stash_files_count_is_delta_size(self, tmp_path: pathlib.Path) -> None: """files_count in stash output reflects the delta, not the full repo.""" root, repo_id = _init_repo(tmp_path) self._make_commit_with_files(root, repo_id, { "x.py": "x\n", "y.py": "y\n", "z.py": "z\n", }) (root / "x.py").write_text("x changed\n", encoding="utf-8") result = runner.invoke( cli, ["stash", "--json"], env=_env(root), catch_exceptions=False ) assert result.exit_code == 0 data = json.loads(result.output) assert data["files_count"] == 1 # only x.py changed