"""I-9 — Crash safety: SIGKILL simulation and startup GC sweep. Validates three guarantees: 1. **Startup GC correctness** — :func:`muse.core.repo.require_repo` sweeps every stale temp-file family on the next command after a crash: * ``.obj-tmp-*`` / ``.restore-tmp-*`` — object-store shard directories * ``.muse-tmp-*`` — store/config writes in ``.muse/`` subdirectories * ``.stat_cache_*.tmp`` — :class:`~muse.core.stat_cache.StatCache` 2. **SIGKILL safety at timing windows** — a process killed at T+50 ms, T+100 ms, and T+200 ms into a write sequence leaves the repository in a consistent state. The subsequent startup GC removes any orphan temps so the *next* ``muse commit`` succeeds. 3. **Push idempotency under SIGKILL** — a partial push leaves no corruption on the remote because :func:`~muse.core.object_store.write_object` is content-addressed and atomic; incomplete object writes are swept by the remote-side startup GC. Test classes ------------ * ``TestCleanupMuseDirTemps`` — unit tests for ``_cleanup_muse_dir_temps`` * ``TestStartupGcObjectTemps`` — object-store orphan files swept by GC * ``TestStartupGcMuseTemps`` — ``.muse-tmp-*`` files swept by GC * ``TestStartupGcStatCacheTemps`` — ``.stat_cache_*.tmp`` swept by GC * ``TestRequireRepoCallsGc`` — ``require_repo`` triggers the sweep * ``TestMultipleSigkills`` — stale files from *N* crashes all swept * ``TestSigkillAtTimingWindows`` — subprocess SIGKILL at T+50/100/200 ms * ``TestSigkillDuringCommit`` — full CLI commit survives SIGKILL * ``TestSigkillDuringPush`` — push path is idempotent under SIGKILL * ``TestGcPreservesRealObjects`` — GC never deletes valid stored objects * ``TestGcSweeperPerformance`` — sweep ≤ 10 ms with 1 000 stale files * ``TestRestoreTempWorkdirBound`` — restore-tmp in workdir: documented scope """ from __future__ import annotations import hashlib import multiprocessing import os import pathlib import signal import tempfile import time import pytest from muse.core.types import blob_id from 
muse.core.object_store import ( cleanup_stale_object_temps, object_path, objects_dir, read_object, write_object, ) from muse.core.paths import commits_dir, muse_dir, objects_dir, releases_dir, stat_cache_path, tags_dir from muse.core.repo import ( _MUSE_TEMP_PREFIXES, _MUSE_SWEEP_DIRS, _cleanup_muse_dir_temps, _startup_gc, require_repo, ) from muse.core.io import write_text_atomic # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _repo(tmp_path: pathlib.Path) -> pathlib.Path: """Minimal .muse/ layout for unit tests.""" muse = muse_dir(tmp_path) muse.mkdir() (muse / "commits").mkdir() (muse / "snapshots").mkdir() (muse / "branches").mkdir() (muse / "refs").mkdir() (muse / "refs" / "heads").mkdir() (muse / "objects").mkdir() return tmp_path def _oid(data: bytes) -> str: return blob_id(data) def _shard(repo: pathlib.Path, prefix: str) -> pathlib.Path: """Return the canonical shard directory for a two-char hex prefix.""" return objects_dir(repo) / "sha256" / prefix def _plant_stale_muse_tmp(muse_dir: pathlib.Path, subdir: str = "") -> pathlib.Path: """Create a fake .muse-tmp-* file as would be left by a SIGKILL'd write_text_atomic.""" target = muse_dir / subdir if subdir else muse_dir target.mkdir(parents=True, exist_ok=True) fd, path = tempfile.mkstemp(dir=target, prefix=".muse-tmp-") os.close(fd) pathlib.Path(path).write_text("partial content", encoding="utf-8") return pathlib.Path(path) def _plant_stale_stat_cache_tmp(muse_dir: pathlib.Path) -> pathlib.Path: """Create a fake .stat_cache_*.tmp file as would be left by a SIGKILL'd StatCache.save.""" cache = muse_dir / "cache" cache.mkdir(exist_ok=True) fd, path = tempfile.mkstemp(dir=cache, prefix=".stat_cache_", suffix=".tmp") os.close(fd) pathlib.Path(path).write_bytes(b"\x00" * 64) return pathlib.Path(path) def _make_stale(path: pathlib.Path) -> pathlib.Path: """Backdate *path* mtime past the 
60-second age gate in cleanup_stale_object_temps. cleanup_stale_object_temps skips files younger than _CLEANUP_MIN_AGE_SECS (60 s). Setting mtime to the Unix epoch (1970-01-01) makes freshly-created temp files look decades old so cleanup picks them up immediately in tests. """ os.utime(str(path), (0, 0)) return path def _plant_stale_obj_tmp(objects_shard: pathlib.Path) -> pathlib.Path: """Create a fake .obj-tmp-* file as would be left by a SIGKILL'd write_object.""" fd, path = tempfile.mkstemp(dir=objects_shard, prefix=".obj-tmp-") os.close(fd) pathlib.Path(path).write_bytes(b"partial object bytes") return _make_stale(pathlib.Path(path)) def _plant_stale_restore_tmp(shard: pathlib.Path) -> pathlib.Path: """Create a fake .restore-tmp-* file as would be left by a SIGKILL'd restore_object.""" fd, path = tempfile.mkstemp(dir=shard, prefix=".restore-tmp-") os.close(fd) pathlib.Path(path).write_bytes(b"partial restore") return _make_stale(pathlib.Path(path)) def _count_stale_files(repo_root: pathlib.Path) -> int: """Count all stale temp files in .muse/ (all families).""" muse = muse_dir(repo_root) total = 0 for f in muse.rglob("*"): if f.is_file() and any( f.name.startswith(p) for p in (".muse-tmp-", ".stat_cache_", ".obj-tmp-", ".restore-tmp-") ): total += 1 return total # --------------------------------------------------------------------------- # Subprocess workers — defined at module level for picklability under "spawn" # --------------------------------------------------------------------------- def _write_objects_worker(root: pathlib.Path, count: int) -> None: """Write objects in a tight loop — killed midway to simulate SIGKILL.""" import time as _t from muse.core.types import blob_id from muse.core.object_store import write_object as _wo for i in range(count): payload = f"sigkill-object-{i:06d}".encode() oid = blob_id(payload) _wo(root, oid, payload) _t.sleep(0.0002) def _write_store_worker(root: pathlib.Path, count: int) -> None: """Write commit-dir text 
atomically in a loop — killed midway.""" import time as _t from muse.core.io import write_text_atomic as _wta for i in range(count): path = commits_dir(root) / f"fake-{i:06d}.msgpack" _wta(path, f"fake commit {i}") _t.sleep(0.0002) def _full_commit_worker(repo_path: pathlib.Path, commit_msg: str) -> None: """Run `muse commit -m ` in a subprocess target (for SIGKILL testing).""" import subprocess import sys as _sys subprocess.run( ["muse", "commit", "-m", commit_msg], cwd=str(repo_path), stdout=_sys.stdout, stderr=_sys.stderr, ) # --------------------------------------------------------------------------- # 1. _cleanup_muse_dir_temps — unit tests # --------------------------------------------------------------------------- class TestCleanupMuseDirTemps: """Unit tests for the _cleanup_muse_dir_temps helper.""" def test_removes_muse_tmp_from_root(self, tmp_path: pathlib.Path) -> None: muse = muse_dir(tmp_path) muse.mkdir() stale = _plant_stale_muse_tmp(muse) assert stale.exists() removed = _cleanup_muse_dir_temps(muse) assert removed == 1 assert not stale.exists() def test_removes_muse_tmp_from_commits_subdir(self, tmp_path: pathlib.Path) -> None: muse = muse_dir(tmp_path) muse.mkdir() stale = _plant_stale_muse_tmp(muse, "commits") removed = _cleanup_muse_dir_temps(muse) assert removed == 1 assert not stale.exists() def test_removes_muse_tmp_from_branches_subdir(self, tmp_path: pathlib.Path) -> None: muse = muse_dir(tmp_path) muse.mkdir() (muse / "branches").mkdir() stale = _plant_stale_muse_tmp(muse, "branches") removed = _cleanup_muse_dir_temps(muse) assert removed == 1 assert not stale.exists() def test_removes_muse_tmp_from_snapshots_subdir(self, tmp_path: pathlib.Path) -> None: muse = muse_dir(tmp_path) muse.mkdir() (muse / "snapshots").mkdir() stale = _plant_stale_muse_tmp(muse, "snapshots") removed = _cleanup_muse_dir_temps(muse) assert removed == 1 assert not stale.exists() def test_removes_stat_cache_tmp(self, tmp_path: pathlib.Path) -> None: muse = 
muse_dir(tmp_path) muse.mkdir() stale = _plant_stale_stat_cache_tmp(muse) removed = _cleanup_muse_dir_temps(muse) assert removed == 1 assert not stale.exists() def test_preserves_real_muse_files(self, tmp_path: pathlib.Path) -> None: muse = muse_dir(tmp_path) muse.mkdir() (muse / "commits").mkdir() # Real files must NOT be deleted real_head = muse / "HEAD" real_head.write_text("ref: refs/heads/main", encoding="utf-8") real_commit = muse / "commits" / "abc123.msgpack" real_commit.write_bytes(b"fake msgpack") real_config = muse / "config.toml" real_config.write_text("[core]\n", encoding="utf-8") removed = _cleanup_muse_dir_temps(muse) assert removed == 0 assert real_head.exists() assert real_commit.exists() assert real_config.exists() def test_multiple_stale_files_across_subdirs(self, tmp_path: pathlib.Path) -> None: muse = muse_dir(tmp_path) muse.mkdir() stale: list[pathlib.Path] = [] stale.append(_plant_stale_muse_tmp(muse)) stale.append(_plant_stale_muse_tmp(muse, "commits")) stale.append(_plant_stale_muse_tmp(muse, "snapshots")) stale.append(_plant_stale_stat_cache_tmp(muse)) removed = _cleanup_muse_dir_temps(muse) assert removed == 4 for f in stale: assert not f.exists() def test_nonexistent_muse_dir_returns_zero(self, tmp_path: pathlib.Path) -> None: result = _cleanup_muse_dir_temps(muse_dir(tmp_path)) assert result == 0 def test_missing_subdir_is_skipped_silently(self, tmp_path: pathlib.Path) -> None: muse = muse_dir(tmp_path) muse.mkdir() # Only root exists; subdirs (branches, commits, …) do not stale = _plant_stale_muse_tmp(muse) removed = _cleanup_muse_dir_temps(muse) assert removed == 1 assert not stale.exists() def test_idempotent_second_call(self, tmp_path: pathlib.Path) -> None: muse = muse_dir(tmp_path) muse.mkdir() _plant_stale_muse_tmp(muse) _cleanup_muse_dir_temps(muse) # Second call on clean dir must not raise and return 0 removed = _cleanup_muse_dir_temps(muse) assert removed == 0 def test_temp_prefixes_constant_non_empty(self) -> None: 
"""_MUSE_TEMP_PREFIXES is exported and non-empty — agents can introspect it.""" assert len(_MUSE_TEMP_PREFIXES) >= 2 assert ".muse-tmp-" in _MUSE_TEMP_PREFIXES assert ".stat_cache_" in _MUSE_TEMP_PREFIXES def test_sweep_dirs_constant_includes_all_write_sites(self) -> None: """_MUSE_SWEEP_DIRS covers every directory that uses write_text_atomic / _write_shelf_header_atomic.""" required = {"", "branches", "commits", "snapshots", "tags", "releases"} assert required.issubset(set(_MUSE_SWEEP_DIRS)) # --------------------------------------------------------------------------- # 2. Startup GC — object-store orphans # --------------------------------------------------------------------------- class TestStartupGcObjectTemps: """Object-store stale temps (.obj-tmp-*, .restore-tmp-*) are swept by startup GC.""" def test_obj_tmp_removed_by_cleanup(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) shard = _shard(repo, "ab") shard.mkdir(parents=True, exist_ok=True) stale = _plant_stale_obj_tmp(shard) assert stale.exists() removed = cleanup_stale_object_temps(repo) assert removed >= 1 assert not stale.exists() def test_restore_tmp_removed_by_cleanup(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) shard = _shard(repo, "cd") shard.mkdir(parents=True, exist_ok=True) stale = _plant_stale_restore_tmp(shard) removed = cleanup_stale_object_temps(repo) assert removed >= 1 assert not stale.exists() def test_startup_gc_delegates_to_object_cleanup(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) shard = _shard(repo, "ef") shard.mkdir(parents=True, exist_ok=True) stale_obj = _plant_stale_obj_tmp(shard) stale_restore = _plant_stale_restore_tmp(shard) _startup_gc(repo) assert not stale_obj.exists() assert not stale_restore.exists() def test_real_objects_preserved_by_startup_gc(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) data = b"real object content" oid = _oid(data) write_object(repo, oid, data) _startup_gc(repo) result = read_object(repo, 
oid) assert result == data def test_stale_and_real_coexist_only_stale_removed(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) data = b"survivor" oid = _oid(data) write_object(repo, oid, data) # Plant stale temp in same shard as the real object shard = object_path(repo, oid).parent stale = _plant_stale_obj_tmp(shard) _startup_gc(repo) assert not stale.exists() assert read_object(repo, oid) == data # --------------------------------------------------------------------------- # 3. Startup GC — .muse-tmp-* in subdirectories # --------------------------------------------------------------------------- class TestStartupGcMuseTemps: """.muse-tmp-* files in all .muse/ subdirs are swept by _startup_gc.""" def test_muse_tmp_in_root_swept(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) stale = _plant_stale_muse_tmp(muse_dir(repo)) _startup_gc(repo) assert not stale.exists() def test_muse_tmp_in_commits_swept(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) stale = _plant_stale_muse_tmp(muse_dir(repo), "commits") _startup_gc(repo) assert not stale.exists() def test_muse_tmp_in_branches_swept(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) stale = _plant_stale_muse_tmp(muse_dir(repo), "branches") _startup_gc(repo) assert not stale.exists() def test_muse_tmp_in_snapshots_swept(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) stale = _plant_stale_muse_tmp(muse_dir(repo), "snapshots") _startup_gc(repo) assert not stale.exists() def test_muse_tmp_in_tags_swept(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) (tags_dir(repo)).mkdir() stale = _plant_stale_muse_tmp(muse_dir(repo), "tags") _startup_gc(repo) assert not stale.exists() def test_muse_tmp_in_releases_swept(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) (releases_dir(repo)).mkdir() stale = _plant_stale_muse_tmp(muse_dir(repo), "releases") _startup_gc(repo) assert not stale.exists() def test_legacy_file_in_commits_preserved(self, 
tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) real = commits_dir(repo) / "deadbeef.msgpack" real.write_bytes(b"\x82\xa9commit_id\xa8deadbeef") _startup_gc(repo) assert real.exists() # --------------------------------------------------------------------------- # 4. Startup GC — .stat_cache_*.tmp # --------------------------------------------------------------------------- class TestStartupGcStatCacheTemps: """.stat_cache_*.tmp files (StatCache.save) are swept by _startup_gc.""" def test_stat_cache_tmp_swept(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) stale = _plant_stale_stat_cache_tmp(muse_dir(repo)) _startup_gc(repo) assert not stale.exists() def test_real_stat_cache_preserved(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) real = stat_cache_path(repo) real.parent.mkdir(parents=True, exist_ok=True) real.write_bytes(b"\x82\xa7version\x02") _startup_gc(repo) assert real.exists() def test_multiple_stat_cache_tmps_all_swept(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) stales = [_plant_stale_stat_cache_tmp(muse_dir(repo)) for _ in range(5)] _startup_gc(repo) for s in stales: assert not s.exists() # --------------------------------------------------------------------------- # 5. 
# require_repo calls the startup GC
# ---------------------------------------------------------------------------


class TestRequireRepoCallsGc:
    """Calling require_repo() on a repository runs the full startup GC sweep."""

    def test_require_repo_removes_obj_tmp(self, tmp_path: pathlib.Path) -> None:
        """A backdated .obj-tmp-* in a shard is gone after require_repo returns."""
        root = _repo(tmp_path)
        shard_dir = _shard(root, "aa")
        shard_dir.mkdir(parents=True, exist_ok=True)
        orphan = _plant_stale_obj_tmp(shard_dir)

        # The repo location is handed to require_repo explicitly via ``start``.
        resolved = require_repo(start=root)

        assert resolved == root
        assert not orphan.exists()

    def test_require_repo_removes_muse_tmp(self, tmp_path: pathlib.Path) -> None:
        """A .muse-tmp-* in the .muse/ root is gone after require_repo returns."""
        root = _repo(tmp_path)
        orphan = _plant_stale_muse_tmp(muse_dir(root))

        require_repo(start=root)

        assert not orphan.exists()

    def test_require_repo_removes_stat_cache_tmp(self, tmp_path: pathlib.Path) -> None:
        """A .stat_cache_*.tmp is gone after require_repo returns."""
        root = _repo(tmp_path)
        orphan = _plant_stale_stat_cache_tmp(muse_dir(root))

        require_repo(start=root)

        assert not orphan.exists()

    def test_require_repo_sweeps_all_families_at_once(self, tmp_path: pathlib.Path) -> None:
        """One require_repo call removes temps from every family together."""
        root = _repo(tmp_path)
        dot_muse = muse_dir(root)
        shard_dir = _shard(root, "bb")
        shard_dir.mkdir(parents=True, exist_ok=True)

        # One orphan per family, planted in the same order as always.
        planted = (
            _plant_stale_obj_tmp(shard_dir),
            _plant_stale_restore_tmp(shard_dir),
            _plant_stale_muse_tmp(dot_muse),
            _plant_stale_muse_tmp(dot_muse, "commits"),
            _plant_stale_stat_cache_tmp(dot_muse),
        )

        require_repo(start=root)

        for f in planted:
            assert not f.exists(), f"{f.name} should have been swept"

    def test_require_repo_not_in_repo_still_raises(self, tmp_path: pathlib.Path) -> None:
        """require_repo on a non-repo path still exits — GC is not run on miss."""
        with pytest.raises(SystemExit):
            require_repo(start=tmp_path)


# ---------------------------------------------------------------------------
# 6.
# Multiple consecutive SIGKILLs — accumulated stale files all swept
# ---------------------------------------------------------------------------


class TestMultipleSigkills:
    """Stale files piled up by several simulated crashes are all removed."""

    def test_three_crash_generations_all_swept(self, tmp_path: pathlib.Path) -> None:
        """Three rounds of five planted temps (15 files) are all gone after GC."""
        root = _repo(tmp_path)
        dot_muse = muse_dir(root)
        shard_dir = _shard(root, "cc")
        shard_dir.mkdir(parents=True, exist_ok=True)

        # Each "crash" leaves one temp per family (two for .muse-tmp-*).
        stales: list[pathlib.Path] = []
        for _generation in range(3):
            stales.extend(
                [
                    _plant_stale_obj_tmp(shard_dir),
                    _plant_stale_restore_tmp(shard_dir),
                    _plant_stale_muse_tmp(dot_muse),
                    _plant_stale_muse_tmp(dot_muse, "commits"),
                    _plant_stale_stat_cache_tmp(dot_muse),
                ]
            )
        assert len(stales) == 15

        _startup_gc(root)

        for f in stales:
            assert not f.exists(), f"Stale file survived: {f.name}"

    def test_gc_count_is_accurate(self, tmp_path: pathlib.Path) -> None:
        """The count returned by _cleanup_muse_dir_temps equals the files planted."""
        root = _repo(tmp_path)
        dot_muse = muse_dir(root)
        (dot_muse / "tags").mkdir()

        # 4 + 3 = 7 stale files inside the muse dir.
        for _ in range(4):
            _plant_stale_muse_tmp(dot_muse)
        for _ in range(3):
            _plant_stale_stat_cache_tmp(dot_muse)

        swept = _cleanup_muse_dir_temps(dot_muse)

        assert swept == 7
        assert _count_stale_files(root) == 0


# ---------------------------------------------------------------------------
# 7. SIGKILL at T+50ms / T+100ms / T+200ms — object-store write sequence
# ---------------------------------------------------------------------------


class TestSigkillAtTimingWindows:
    """A writer process SIGKILLed after a fixed delay leaves the store consistent."""

    @pytest.mark.slow
    @pytest.mark.parametrize("delay_ms", [50, 100, 200])
    def test_object_store_consistent_after_sigkill(
        self, tmp_path: pathlib.Path, delay_ms: int
    ) -> None:
        """Objects written before the crash still read back intact after GC."""
        root = _repo(tmp_path)

        # Ten known objects are stored before the crashable writer exists.
        pre_data: list[tuple[str, bytes]] = []
        for blob in [f"pre-kill-{i:03d}".encode() for i in range(10)]:
            digest = _oid(blob)
            write_object(root, digest, blob)
            pre_data.append((digest, blob))

        # A freshly spawned process hammers the object store ...
        victim = multiprocessing.get_context("spawn").Process(
            target=_write_objects_worker, args=(root, 2000)
        )
        victim.start()

        # ... and is killed once the timing window has elapsed.
        time.sleep(delay_ms / 1000.0)
        if victim.is_alive():
            assert victim.pid is not None
            os.kill(victim.pid, signal.SIGKILL)
        victim.join(timeout=5)

        # Stand-in for the next command's startup sweep.
        _startup_gc(root)

        # Only the pre-kill objects are checked. Leftover temps are not
        # asserted on: cleanup_stale_object_temps has a 60-second age gate that
        # protects concurrent in-progress writes, so temps younger than 60 s
        # are intentionally left alone. That guarantee is covered by
        # TestStartupGcObjectTemps with backdated mtimes.
        for oid, payload in pre_data:
            assert read_object(root, oid) == payload, (
                f"Pre-kill object {oid[:8]} corrupted after SIGKILL at T+{delay_ms}ms"
            )

    @pytest.mark.slow
    @pytest.mark.parametrize("delay_ms", [50, 100, 200])
    def test_store_write_consistent_after_sigkill(
        self, tmp_path: pathlib.Path, delay_ms: int
    ) -> None:
        """SIGKILL during write_text_atomic loop leaves no stale .muse-tmp-* files."""
        root = _repo(tmp_path)

        victim = multiprocessing.get_context("spawn").Process(
            target=_write_store_worker, args=(root, 2000)
        )
        victim.start()

        time.sleep(delay_ms / 1000.0)
        if victim.is_alive():
            assert victim.pid is not None
            os.kill(victim.pid, signal.SIGKILL)
        victim.join(timeout=5)

        # Startup sweep, then look for survivors anywhere under .muse/.
        _startup_gc(root)

        leftovers = [*muse_dir(root).rglob(".muse-tmp-*")]
        assert leftovers == [], (
            f"Stale .muse-tmp-* survived SIGKILL at T+{delay_ms}ms: {leftovers}"
        )


# ---------------------------------------------------------------------------
# 8.
# Full CLI commit survives SIGKILL
# ---------------------------------------------------------------------------


def _backdate_temps(repo_root: pathlib.Path) -> None:
    """Set the mtime of every temp file under ``.muse/`` to the Unix epoch.

    cleanup_stale_object_temps has a 60-second age gate that skips fresh files
    to protect concurrent writers; tests fast-forward the age of whatever a
    killed process left behind so the next GC sweep is allowed to remove it.
    """
    prefixes = (".obj-tmp-", ".restore-tmp-", ".muse-tmp-", ".stat_cache_")
    for f in muse_dir(repo_root).rglob("*"):
        if f.is_file() and f.name.startswith(prefixes):
            os.utime(f, (0, 0))


class TestSigkillDuringCommit:
    """End-to-end: SIGKILL during `muse commit` leaves repo in a recoverable state."""

    def _init_real_repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Create a minimal real muse repo with a committed file.

        Runs ``muse init``, stages ``file.txt`` and commits it, all with
        ``MUSE_REPO_ROOT`` pointing at *tmp_path*. Returns *tmp_path*.
        """
        import subprocess

        env = os.environ.copy()
        env["MUSE_REPO_ROOT"] = str(tmp_path)
        subprocess.run(["muse", "init"], cwd=str(tmp_path), env=env, check=True, capture_output=True)
        # Stage and commit a file so there is a valid HEAD.
        (tmp_path / "file.txt").write_text("hello", encoding="utf-8")
        subprocess.run(["muse", "code", "add", "."], cwd=str(tmp_path), env=env, check=True, capture_output=True)
        subprocess.run(["muse", "commit", "-m", "init"], cwd=str(tmp_path), env=env, check=True, capture_output=True)
        return tmp_path

    @pytest.mark.slow
    def test_muse_status_runs_after_sigkill(self, tmp_path: pathlib.Path) -> None:
        """`muse status` must exit cleanly (exit 0) after a SIGKILL'd commit."""
        import subprocess

        repo = self._init_real_repo(tmp_path)

        # Modify a file so there is something to commit. A failed staging step
        # would make the rest of the test vacuous, so it must fail loudly.
        (repo / "file.txt").write_text("changed", encoding="utf-8")
        subprocess.run(["muse", "code", "add", "."], cwd=str(repo), check=True, capture_output=True)

        # Spawn commit subprocess and kill it immediately.
        ctx = multiprocessing.get_context("spawn")
        proc = ctx.Process(target=_full_commit_worker, args=(repo, "crash-me"))
        proc.start()
        time.sleep(0.05)
        if proc.is_alive():
            assert proc.pid is not None
            os.kill(proc.pid, signal.SIGKILL)
        proc.join(timeout=5)

        # Startup GC runs on next require_repo invocation (muse status triggers it).
        result = subprocess.run(
            ["muse", "status"],
            cwd=str(repo),
            capture_output=True,
            text=True,
        )
        # status must exit 0; any non-zero means repo is corrupt.
        assert result.returncode == 0, (
            f"muse status failed after SIGKILL:\n{result.stdout}\n{result.stderr}"
        )

    @pytest.mark.slow
    def test_no_stale_temps_after_sigkill_and_next_command(
        self, tmp_path: pathlib.Path
    ) -> None:
        """After SIGKILL + muse status, zero stale temps remain in .muse/."""
        import subprocess

        repo = self._init_real_repo(tmp_path)
        (repo / "file.txt").write_text("changed again", encoding="utf-8")
        subprocess.run(["muse", "code", "add", "."], cwd=str(repo), check=True, capture_output=True)

        ctx = multiprocessing.get_context("spawn")
        proc = ctx.Process(target=_full_commit_worker, args=(repo, "crash-me-2"))
        proc.start()
        time.sleep(0.05)
        if proc.is_alive():
            assert proc.pid is not None
            os.kill(proc.pid, signal.SIGKILL)
        proc.join(timeout=5)

        # Object temps younger than 60 s are deliberately skipped by the GC, so
        # age whatever the killed commit left behind before asserting that the
        # next command removes it.
        _backdate_temps(repo)

        # Trigger startup GC via the next command.
        subprocess.run(["muse", "status"], cwd=str(repo), capture_output=True)

        assert _count_stale_files(repo) == 0, "Stale temps remain after startup GC"


# ---------------------------------------------------------------------------
# 9. Push path idempotency under SIGKILL
# ---------------------------------------------------------------------------


class TestSigkillDuringPush:
    """SIGKILL during push writes leaves no corruption: write_object is atomic."""

    @pytest.mark.slow
    def test_push_objects_atomic_under_sigkill(self, tmp_path: pathlib.Path) -> None:
        """Objects pushed before kill are readable; partial objects are absent."""
        (tmp_path / "local").mkdir()
        (tmp_path / "remote").mkdir()
        local = _repo(tmp_path / "local")
        remote = _repo(tmp_path / "remote")

        # Write 10 objects to local and also push them to remote before kill.
        pre_data: list[tuple[str, bytes]] = []
        for i in range(10):
            payload = f"push-pre-kill-{i}".encode()
            oid = _oid(payload)
            write_object(local, oid, payload)
            write_object(remote, oid, payload)  # simulate push of pre-kill objects
            pre_data.append((oid, payload))

        # Spawn a process that keeps writing objects to the remote store.
        ctx = multiprocessing.get_context("spawn")
        proc = ctx.Process(target=_write_objects_worker, args=(remote, 2000))
        proc.start()
        time.sleep(0.08)  # T+80ms kill
        if proc.is_alive():
            assert proc.pid is not None
            os.kill(proc.pid, signal.SIGKILL)
        proc.join(timeout=5)

        # Backdate all temp files left by the killed process — the 60-second
        # age gate in cleanup_stale_object_temps skips fresh files to protect
        # concurrent writers; in tests we fast-forward mtime to simulate aging.
        _backdate_temps(remote)

        # Simulate remote-side startup GC (next muse command on the remote).
        _startup_gc(remote)

        # Remote must have no stale temps.
        assert _count_stale_files(remote) == 0

        # All pre-kill objects on remote must be intact.
        for oid, payload in pre_data:
            assert read_object(remote, oid) == payload, (
                f"Remote object {oid[:8]} corrupted after push SIGKILL"
            )

    def test_push_write_object_is_idempotent(self, tmp_path: pathlib.Path) -> None:
        """write_object for the same OID returns True first, then False (skip)."""
        repo = _repo(tmp_path)
        payload = b"idempotent object"
        oid = _oid(payload)

        first = write_object(repo, oid, payload)
        second = write_object(repo, oid, payload)

        assert first is True
        assert second is False
        assert read_object(repo, oid) == payload

    def test_partial_write_interrupted_at_os_level_leaves_no_dest(
        self, tmp_path: pathlib.Path
    ) -> None:
        """The mkstemp→replace contract: if replace never happens, dest is absent."""
        repo = _repo(tmp_path)
        data = b"will be interrupted"
        oid = _oid(data)

        # Manually plant the stale temp (simulates SIGKILL after mkstemp, before replace)
        shard = object_path(repo, oid).parent
        shard.mkdir(parents=True, exist_ok=True)
        stale = _plant_stale_obj_tmp(shard)

        # The destination object must NOT exist (replace never happened).
        from muse.core.object_store import has_object

        assert not has_object(repo, oid)

        # Cleanup removes the stale temp.
        cleanup_stale_object_temps(repo)
        assert not stale.exists()

        # A fresh write_object succeeds normally.
        write_object(repo, oid, data)
        assert has_object(repo, oid)


# ---------------------------------------------------------------------------
# 10. GC preserves all real stored objects
# ---------------------------------------------------------------------------


class TestGcPreservesRealObjects:
    """_startup_gc must never delete a valid stored object."""

    def test_100_objects_all_survive_gc(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        written: list[tuple[str, bytes]] = []
        for i in range(100):
            payload = f"object-{i:04d}".encode()
            oid = _oid(payload)
            write_object(repo, oid, payload)
            written.append((oid, payload))

        _startup_gc(repo)

        for oid, payload in written:
            assert read_object(repo, oid) == payload, f"Object {oid[:8]} deleted by GC"

    def test_real_head_and_config_survive_gc(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        muse = muse_dir(repo)
        head = muse / "HEAD"
        head.write_text("ref: refs/heads/main\n", encoding="utf-8")
        config = muse / "config.toml"
        config.write_text("[core]\n name = \"test\"\n", encoding="utf-8")

        _startup_gc(repo)

        assert head.read_text(encoding="utf-8") == "ref: refs/heads/main\n"
        assert "test" in config.read_text(encoding="utf-8")

    def test_gc_on_empty_repo_is_noop(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        # No objects, no stale files
        _startup_gc(repo)
        assert _count_stale_files(repo) == 0


# ---------------------------------------------------------------------------
# 11.
# Performance: GC sweep < 500 ms with 1 000 stale files
# ---------------------------------------------------------------------------


class TestGcSweeperPerformance:
    """The startup GC must be cheap enough to run on every require_repo call."""

    @pytest.mark.slow
    def test_sweep_1000_stale_files_under_500ms(self, tmp_path: pathlib.Path) -> None:
        """GC sweeps 1 000 stale files in < 500 ms (wall clock including logging).

        The budget is generous because macOS APFS + pytest log capture add
        overhead (~50–100 μs per unlink + warning emission). On a real crash
        scenario a repo will have at most 1–3 stale files, so steady-state
        latency is < 1 ms. This test validates the worst-case bound.
        """
        root = _repo(tmp_path)
        dot_muse = muse_dir(root)
        (dot_muse / "commits").mkdir(exist_ok=True)

        # 500 temps in the .muse/ root, then 500 more under commits/.
        for _ in range(500):
            _plant_stale_muse_tmp(dot_muse)
        for _ in range(500):
            _plant_stale_muse_tmp(dot_muse, "commits")

        t0 = time.perf_counter()
        swept = _cleanup_muse_dir_temps(dot_muse)
        t1 = time.perf_counter()
        duration_ms = (t1 - t0) * 1000

        assert swept == 1000
        assert duration_ms < 500.0, (
            f"_cleanup_muse_dir_temps took {duration_ms:.1f} ms for 1 000 files "
            f"(budget: 500 ms)"
        )

    @pytest.mark.slow
    def test_full_startup_gc_under_200ms(self, tmp_path: pathlib.Path) -> None:
        """Full _startup_gc (both sweeps) is < 200 ms with 200 stale temps.

        In production a crash leaves 1–3 stale files at most; this tests the
        adversarial bound of 200 simultaneous stale temps across both the
        object store and .muse/ directories.
        """
        root = _repo(tmp_path)
        dot_muse = muse_dir(root)
        (dot_muse / "commits").mkdir(exist_ok=True)

        # Object store: 10 shards, 10 backdated temps in each.
        for shard_no in range(10):
            shard_dir = _shard(root, f"{shard_no:02x}")
            shard_dir.mkdir(parents=True, exist_ok=True)
            for _ in range(10):
                _plant_stale_obj_tmp(shard_dir)

        # .muse/: 50 temps in the root, 50 under commits/.
        for _ in range(50):
            _plant_stale_muse_tmp(dot_muse)
        for _ in range(50):
            _plant_stale_muse_tmp(dot_muse, "commits")

        t0 = time.perf_counter()
        _startup_gc(root)
        t1 = time.perf_counter()
        duration_ms = (t1 - t0) * 1000

        assert duration_ms < 200.0, (
            f"_startup_gc took {duration_ms:.1f} ms (budget: 200 ms)"
        )
        assert _count_stale_files(root) == 0


# ---------------------------------------------------------------------------
# 12. .restore-tmp-* in working-tree: scope documentation test
# ---------------------------------------------------------------------------


class TestRestoreTempWorkdirBound:
    """.restore-tmp-* files in the working tree (not .muse/) are outside GC scope.

    restore_object writes to a user-provided destination directory, not inside
    .muse/. If SIGKILL occurs between mkstemp and os.replace in restore_object,
    the stale temp stays in the working tree. The documented guarantee: the
    .muse/ repo state is never corrupted, and the stale restore temp in the
    working tree is inert (it does not block the next checkout/merge, which
    simply overwrites the destination path atomically).
    """

    def test_restore_tmp_in_workdir_not_swept_by_gc(self, tmp_path: pathlib.Path) -> None:
        """GC sweeps .muse/ only — stale restore temps in workdir are out of scope."""
        root = _repo(tmp_path)

        # The temp lives next to .muse/, in the working tree itself.
        handle, temp_name = tempfile.mkstemp(dir=tmp_path, prefix=".restore-tmp-")
        os.close(handle)
        workdir_temp = pathlib.Path(temp_name)
        workdir_temp.write_bytes(b"stale workdir restore temp")

        _startup_gc(root)

        # Documented limitation: the working-tree temp is left in place ...
        assert workdir_temp.exists(), (
            "GC must not touch working-tree files outside .muse/"
        )
        # ... while .muse/ itself holds no temps.
        assert _count_stale_files(root) == 0

        workdir_temp.unlink()  # cleanup

    def test_restore_tmp_in_obj_shard_IS_swept(self, tmp_path: pathlib.Path) -> None:
        """.restore-tmp-* inside .muse/objects/ shard dirs ARE swept (object store scope)."""
        root = _repo(tmp_path)
        shard_dir = _shard(root, "dd")
        shard_dir.mkdir(parents=True, exist_ok=True)
        orphan = _plant_stale_restore_tmp(shard_dir)

        _startup_gc(root)

        assert not orphan.exists(), ".restore-tmp-* in object shard must be swept"