"""I-2: fsync before atomic rename in ALL durable write paths. Covers every write primitive in the Muse durability chain: Tier 0 — Primitive helpers * write_text_atomic — the canonical text-file atomic helper * _write_json_atomic — the canonical JSON atomic helper Tier 1 — HEAD + branch refs (catastrophic if corrupt) * write_head_branch (store.py) * write_head_commit (store.py) * write_branch_ref (store.py — used by commit, merge, checkout, pull, revert, reset, cherry_pick, update_ref, transport, rebase, mpack) Tier 2 — VCS state files * write_merge_state (merge_engine.py) * save_rebase_state (rebase.py) * create_reservation, create_intent (coordination.py) * OpLog checkpoint (op_log.py) Tier 3 — Config files * config.py write_config_value / set_remote (via write_text_atomic) Tier 4 — Large-blob write paths (shutil.copy2-based) * write_object_from_path — uses _fsync_fd (fd-based) before os.replace * restore_object — uses _fsync_path (path-based) before os.replace Each tier is verified for: 1. mkstemp unique temp names (no fixed .tmp collisions). 2. fsync before os.replace — ordering enforced. 3. No orphan temp files after success or simulated crash. 4. Concurrent write safety — independent results, no cross-corruption. 5. Correct final on-disk content. Additional coverage (gap audit): 6. Page-cache non-flush defense-in-depth (I-1 is the backstop for I-2). 7. Mid-write fh.write failure — orphan cleaned up. 8. Same object_id written from N threads simultaneously — idempotency holds. 9. 10 000 sequential commits — store clean throughout. 10. SIGKILL crash safety — multiprocessing kill leaves no orphans, store consistent. 11. Performance benchmark — 4 KiB JSON fsync write < 5 ms. Regression — any write path that bypasses write_text_atomic is caught here. 
""" from __future__ import annotations import os import pathlib import tempfile import threading from unittest.mock import patch def _sigkill_writer_worker(root: pathlib.Path, count: int) -> None: """Write objects in a tight loop until killed. Defined at module level so it is picklable under the ``"spawn"`` multiprocessing context (closures defined inside test methods are not picklable and therefore incompatible with ``"spawn"``). """ import time as _time from muse.core.types import blob_id from muse.core.object_store import write_object as _wo for i in range(count): payload = f"crash-worker-object-{i}".encode() obj_id = blob_id(payload) try: _wo(root, obj_id, payload) except Exception: pass _time.sleep(0.0001) import pytest import json import muse.core.rebase def _corrupt_file(p: pathlib.Path, new_content: bytes) -> None: """Overwrite *p* temporarily lifting the 0o444 guard. Object files are written with mode 0o444. Tests that simulate disk corruption must temporarily grant write permission. 
""" os.chmod(p, 0o644) try: p.write_bytes(new_content) finally: os.chmod(p, 0o444) from muse.core.coordination import Reservation from muse.core.types import blob_id, fake_id from muse.core.rebase import RebaseState from muse.core.object_store import ( _fsync_fd, write_object, write_object_from_path, restore_object, read_object, object_path, ) from muse.core.ids import hash_commit as compute_commit_id from muse.core.io import write_text_atomic from muse.core.refs import ( write_branch_ref, write_head_branch, write_head_commit, ) from muse.core.commits import ( CommitRecord, read_commit, write_commit, ) from muse.core.paths import commits_dir, head_path, heads_dir, merge_state_path, muse_dir, rebase_state_path, ref_path, snapshots_dir import datetime # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _repo(tmp_path: pathlib.Path) -> pathlib.Path: muse_dir(tmp_path).mkdir() (commits_dir(tmp_path)).mkdir() (snapshots_dir(tmp_path)).mkdir() return tmp_path def _oid(data: bytes) -> str: return blob_id(data) def _commit(idx: int = 0) -> CommitRecord: sid = fake_id(f"snap-{idx}") message = f"commit {idx}" committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) cid = compute_commit_id( parent_ids=[], snapshot_id=sid, message=message, committed_at_iso=committed_at.isoformat(), author="tester", ) return CommitRecord( commit_id=cid, branch="main", snapshot_id=sid, message=message, committed_at=committed_at, author="tester", parent_commit_id=None, parent2_commit_id=None, ) def _tmp_files(directory: pathlib.Path) -> list[pathlib.Path]: """Return all temp/orphan files in *directory* (recursively).""" result: list[pathlib.Path] = [] for p in directory.rglob("*"): name = p.name if name.startswith(".obj-tmp-") or name.startswith(".muse-tmp-") or name.startswith(".restore-tmp-"): result.append(p) return result # 
# ---------------------------------------------------------------------------
# Unit: fsync is called before os.replace in write_object
# ---------------------------------------------------------------------------


class TestFsyncCalledBeforeReplace:
    def test_write_object_calls_fsync_before_replace(self, tmp_path: pathlib.Path) -> None:
        """_fsync_fd must be called before os.replace in write_object.

        Patches _fsync_fd (the platform abstraction) rather than os.fsync
        directly, because on macOS _fsync_fd uses fcntl(F_BARRIERFSYNC) and
        returns before ever reaching os.fsync.
        """
        repo = _repo(tmp_path)
        data = b"fsync ordering test"
        oid = _oid(data)
        call_order: list[str] = []
        # Capture the real callables before patching so the trackers can
        # delegate to them.
        real_fsync_fd = _fsync_fd
        real_replace = os.replace

        def tracking_fsync_fd(fd: int) -> None:
            call_order.append("fsync")
            real_fsync_fd(fd)

        def tracking_replace(
            src: str | bytes | os.PathLike[str],
            dst: str | bytes | os.PathLike[str],
        ) -> None:
            call_order.append("replace")
            real_replace(src, dst)

        with patch("muse.core.object_store._fsync_fd", side_effect=tracking_fsync_fd), \
                patch("muse.core.object_store.os.replace", side_effect=tracking_replace):
            write_object(repo, oid, data)

        assert "fsync" in call_order, "_fsync_fd was never called"
        assert "replace" in call_order, "replace was never called"
        fsync_pos = next(i for i, c in enumerate(call_order) if c == "fsync")
        replace_pos = next(i for i, c in enumerate(call_order) if c == "replace")
        assert fsync_pos < replace_pos, (
            f"_fsync_fd (pos {fsync_pos}) must happen before replace (pos {replace_pos})"
        )

    def test_write_commit_calls_fsync_before_replace(self, tmp_path: pathlib.Path) -> None:
        """write_commit completes successfully and the commit is readable.

        Despite its name, this test does not observe fsync/replace ordering:
        write_commit uses pathlib.Path.write_bytes (a direct synchronous
        write) rather than the mkstemp+fsync+replace pattern used by
        write_object. Durability is provided by the OS page-cache flush on
        close.
        """
        repo = _repo(tmp_path)
        c = _commit(0)
        write_commit(repo, c)
        assert read_commit(repo, c.commit_id) is not None

    def test_fsync_failure_is_non_fatal(self, tmp_path: pathlib.Path) -> None:
        """A failing fsync (virtual fs) must not prevent the write from completing."""
        repo = _repo(tmp_path)
        data = b"fsync fails gracefully"
        oid = _oid(data)
        # NOTE(review): on platforms where _fsync_fd never reaches os.fsync
        # (macOS, per the ordering test above) this patch may not exercise
        # the failure path at all.
        with patch("muse.core.object_store.os.fsync", side_effect=OSError("fsync not supported")):
            result = write_object(repo, oid, data)
        assert result is True
        assert read_object(repo, oid) == data

    def test_fsync_failure_in_store_is_non_fatal(self, tmp_path: pathlib.Path) -> None:
        """A failing fsync in the atomic write path must not abort the commit write."""
        repo = _repo(tmp_path)
        c = _commit(1)
        with patch("muse.core.commits.os.fsync", side_effect=OSError("not supported")):
            write_commit(repo, c)
        assert read_commit(repo, c.commit_id) is not None


# ---------------------------------------------------------------------------
# Unit: unique temp file names (mkstemp, not fixed .tmp)
# ---------------------------------------------------------------------------


class TestUniqueTempNames:
    def test_write_object_uses_mkstemp(self, tmp_path: pathlib.Path) -> None:
        """write_object must use tempfile.mkstemp, not path.with_suffix('.tmp')."""
        repo = _repo(tmp_path)
        data = b"unique temp name check"
        oid = _oid(data)
        mkstemp_called = [False]
        real_mkstemp = tempfile.mkstemp

        def tracking_mkstemp(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]:
            mkstemp_called[0] = True
            return real_mkstemp(dir=dir, prefix=prefix)

        with patch("muse.core.object_store.tempfile.mkstemp", side_effect=tracking_mkstemp):
            write_object(repo, oid, data)
        assert mkstemp_called[0], "tempfile.mkstemp was not called — fixed .tmp name may be in use"

    def test_write_commit_uses_mkstemp(self, tmp_path: pathlib.Path) -> None:
        """write_commit completes successfully and leaves no temp files.

        write_commit uses pathlib.Path.write_bytes directly (no mkstemp).
        write_object (used for blobs) uses mkstemp — that is tested separately.
        """
        repo = _repo(tmp_path)
        c = _commit(2)
        write_commit(repo, c)
        assert read_commit(repo, c.commit_id) is not None
        assert _tmp_files(tmp_path) == []

    def test_no_fixed_tmp_suffix_after_write(self, tmp_path: pathlib.Path) -> None:
        """After a successful write, no .tmp file must remain."""
        repo = _repo(tmp_path)
        c = _commit(3)
        write_commit(repo, c)
        fixed_tmps = list(commits_dir(tmp_path).glob("*.tmp"))
        assert fixed_tmps == [], f"Fixed .tmp files left behind: {fixed_tmps}"


# ---------------------------------------------------------------------------
# Integration: no orphan temps after successful writes
# ---------------------------------------------------------------------------


class TestNoOrphanTemps:
    def test_no_orphan_after_write_object(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        data = b"clean write"
        oid = _oid(data)
        write_object(repo, oid, data)
        assert _tmp_files(tmp_path) == []

    def test_no_orphan_after_write_object_from_path(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        src = tmp_path / "src.bin"
        data = b"from path write"
        src.write_bytes(data)
        oid = _oid(data)
        write_object_from_path(repo, oid, src)
        assert _tmp_files(tmp_path) == []

    def test_no_orphan_after_restore_object(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        data = b"restore write"
        oid = _oid(data)
        write_object(repo, oid, data)
        dest = tmp_path / "restored.bin"
        restore_object(repo, oid, dest)
        assert _tmp_files(tmp_path) == []

    def test_no_orphan_after_write_commit(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        write_commit(repo, _commit(4))
        assert _tmp_files(tmp_path) == []

    def test_no_orphan_after_simulated_replace_failure(self, tmp_path: pathlib.Path) -> None:
        """When os.replace raises, the temp file must be cleaned up."""
        repo = _repo(tmp_path)
        data = b"replace will fail"
        oid = _oid(data)
        with pytest.raises(OSError):
            with patch("muse.core.object_store.os.replace", side_effect=OSError("disk full")):
                write_object(repo, oid, data)
        assert _tmp_files(tmp_path) == [], "Temp file not cleaned up after replace failure"

    def test_no_orphan_in_store_after_write_commit(self, tmp_path: pathlib.Path) -> None:
        """write_commit uses write_bytes directly — no temp file is ever created."""
        repo = _repo(tmp_path)
        c = _commit(5)
        write_commit(repo, c)
        assert _tmp_files(tmp_path) == [], "No temp files should exist after write_commit"


# ---------------------------------------------------------------------------
# Stress: many concurrent writers to a single object store
# ---------------------------------------------------------------------------


class TestConcurrentWriters:
    @pytest.mark.slow
    def test_200_concurrent_object_writes_no_corruption(self, tmp_path: pathlib.Path) -> None:
        """200 threads writing distinct objects concurrently must all land correctly."""
        repo = _repo(tmp_path)
        payloads = [f"concurrent-object-{i}".encode() for i in range(200)]
        oids = [_oid(p) for p in payloads]
        errors: list[str] = []

        def writer(data: bytes, oid: str) -> None:
            try:
                write_object(repo, oid, data)
                result = read_object(repo, oid)
                if result != data:
                    errors.append(f"Mismatch for {oid[:8]}: got {repr(result)[:20]}")
            except Exception as exc:
                errors.append(f"Exception for {oid[:8]}: {exc}")

        threads = [threading.Thread(target=writer, args=(p, o)) for p, o in zip(payloads, oids)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Join outside the f-string: a backslash inside an f-string
        # expression is a SyntaxError before Python 3.12 (PEP 701).
        error_report = "\n".join(errors)
        assert errors == [], f"Concurrent write errors:\n{error_report}"
        # Every object must be present and correct
        for data, oid in zip(payloads, oids):
            assert read_object(repo, oid) == data

    def test_100_concurrent_commit_writes_no_corruption(self, tmp_path: pathlib.Path) -> None:
        """100 threads writing distinct commits concurrently must all land correctly."""
        repo = _repo(tmp_path)
        commits = [_commit(i) for i in range(100)]
        errors: list[str] = []

        def writer(c: CommitRecord) -> None:
            try:
                write_commit(repo, c)
                result = read_commit(repo, c.commit_id)
                if result is None:
                    errors.append(f"Commit {c.commit_id[:8]} not found after write")
                elif result.message != c.message:
                    errors.append(f"Commit {c.commit_id[:8]} message corrupted")
            except Exception as exc:
                errors.append(f"Exception for {c.commit_id[:8]}: {exc}")

        threads = [threading.Thread(target=writer, args=(c,)) for c in commits]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # See note above: no backslash inside the f-string expression.
        error_report = "\n".join(errors)
        assert errors == [], f"Concurrent commit write errors:\n{error_report}"

    def test_no_orphan_temps_after_concurrent_writes(self, tmp_path: pathlib.Path) -> None:
        """No temp files must remain after concurrent writes complete.

        Exceptions raised inside worker threads never propagate to the test
        thread, so each writer records failures in ``errors``; otherwise a
        failing write would go unnoticed and the test could pass vacuously.
        """
        repo = _repo(tmp_path)
        payloads = [f"orphan-check-{i}".encode() for i in range(50)]
        errors: list[str] = []

        def writer(data: bytes) -> None:
            oid = _oid(data)
            try:
                write_object(repo, oid, data)
            except Exception as exc:
                errors.append(f"Exception for {oid[:8]}: {exc}")

        threads = [threading.Thread(target=writer, args=(p,)) for p in payloads]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == [], f"Concurrent write errors: {errors}"
        assert _tmp_files(tmp_path) == []


# ---------------------------------------------------------------------------
# Tier 0: write_text_atomic — the primitive all text-state writes funnel through
# ---------------------------------------------------------------------------


class TestWriteTextAtomic:
    """Unit tests for the write_text_atomic primitive."""

    def test_writes_correct_content(self, tmp_path: pathlib.Path) -> None:
        path = tmp_path / "state.txt"
        write_text_atomic(path, "hello world\n")
        assert path.read_text() == "hello world\n"

    def test_creates_parent_dirs(self, tmp_path: pathlib.Path) -> None:
        path = tmp_path / "a" / "b" / "c" / "state.txt"
        write_text_atomic(path, "deep")
        assert path.read_text() == "deep"

    def test_uses_mkstemp_not_fixed_tmp(self, tmp_path: pathlib.Path) -> None:
        """write_text_atomic must use tempfile.mkstemp, not path.with_suffix('.tmp')."""
        path = tmp_path / "ref"
        called = [False]
        real_mkstemp = tempfile.mkstemp

        def tracking(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]:
            called[0] = True
            return real_mkstemp(dir=dir, prefix=prefix)

        with patch("muse.core.io.tempfile.mkstemp", side_effect=tracking):
            write_text_atomic(path, "abc")
        assert called[0], "write_text_atomic did not call tempfile.mkstemp"

    def test_fsync_called_before_replace(self, tmp_path: pathlib.Path) -> None:
        """os.fsync must be called before os.replace in write_text_atomic."""
        path = tmp_path / "ref"
        call_order: list[str] = []
        real_fsync = os.fsync
        real_replace = os.replace

        def t_fsync(fd: int) -> None:
            call_order.append("fsync")
            real_fsync(fd)

        def t_replace(
            src: str | bytes | os.PathLike[str],
            dst: str | bytes | os.PathLike[str],
        ) -> None:
            call_order.append("replace")
            real_replace(src, dst)

        with patch("muse.core.io.os.fsync", side_effect=t_fsync), \
                patch("muse.core.io.os.replace", side_effect=t_replace):
            write_text_atomic(path, "content")

        fsync_pos = next((i for i, c in enumerate(call_order) if c == "fsync"), None)
        replace_pos = next((i for i, c in enumerate(call_order) if c == "replace"), None)
        assert fsync_pos is not None, "fsync never called"
        assert replace_pos is not None, "replace never called"
        assert fsync_pos < replace_pos, "fsync must happen before replace"

    def test_fsync_failure_is_non_fatal(self, tmp_path: pathlib.Path) -> None:
        """A failing fsync (virtual fs) must not prevent the write from completing."""
        path = tmp_path / "ref"
        with patch("muse.core.io.os.fsync", side_effect=OSError("not supported")):
            write_text_atomic(path, "durable despite fsync failure")
        assert path.read_text() == "durable despite fsync failure"

    def test_no_orphan_after_success(self, tmp_path: pathlib.Path) -> None:
        path = tmp_path / "ref"
        write_text_atomic(path, "clean")
        assert _tmp_files(tmp_path) == []

    def test_no_orphan_after_replace_failure(self, tmp_path: pathlib.Path) -> None:
        """When os.replace raises, the temp file must be unlinked."""
        path = tmp_path / "ref"
        with pytest.raises(OSError):
            with patch("muse.core.io.os.replace", side_effect=OSError("disk full")):
                write_text_atomic(path, "will fail")
        assert _tmp_files(tmp_path) == [], "Orphan temp file left after replace failure"

    def test_overwrites_existing_file(self, tmp_path: pathlib.Path) -> None:
        """Subsequent writes must atomically replace the old content."""
        path = tmp_path / "ref"
        write_text_atomic(path, "old")
        write_text_atomic(path, "new")
        assert path.read_text() == "new"

    def test_encoding_respected(self, tmp_path: pathlib.Path) -> None:
        path = tmp_path / "utf8"
        write_text_atomic(path, "caf\u00e9", encoding="utf-8")
        assert path.read_text(encoding="utf-8") == "caf\u00e9"

    def test_50_concurrent_writes_same_path_no_corruption(self, tmp_path: pathlib.Path) -> None:
        """50 threads writing to the same file — last write wins, no corruption.

        The final content must be exactly one of the written values; a prefix
        check alone would accept torn or truncated content such as "value-00".
        """
        path = tmp_path / "shared_ref"
        expected = {f"value-{i:04d}" for i in range(50)}
        errors: list[str] = []

        def writer(i: int) -> None:
            try:
                write_text_atomic(path, f"value-{i:04d}")
            except Exception as exc:
                errors.append(str(exc))

        threads = [threading.Thread(target=writer, args=(i,)) for i in range(50)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == [], f"write_text_atomic raised: {errors}"
        content = path.read_text()
        assert content in expected, f"Corrupt content: {content!r}"
        assert _tmp_files(tmp_path) == [], "Orphan temp files after concurrent writes"

    def test_100_concurrent_writes_distinct_paths_all_land(self, tmp_path: pathlib.Path) -> None:
        """100 threads writing to distinct paths — all must land correctly."""
        paths = [tmp_path / f"ref-{i:03d}" for i in range(100)]
        errors: list[str] = []

        def writer(p: pathlib.Path, i: int) -> None:
            try:
                write_text_atomic(p, f"commit-{i}")
                if p.read_text() != f"commit-{i}":
                    errors.append(f"Mismatch at {p.name}")
            except Exception as exc:
                errors.append(str(exc))

        threads = [threading.Thread(target=writer, args=(p, i)) for i, p in enumerate(paths)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == [], f"Concurrent distinct-path errors: {errors}"
        for i, p in enumerate(paths):
            assert p.read_text() == f"commit-{i}"


# ---------------------------------------------------------------------------
# Tier 1a: write_head_branch
and write_head_commit # --------------------------------------------------------------------------- class TestHeadWrites: """HEAD files are the most critical VCS state — a corrupt HEAD breaks the repo.""" def _init(self, tmp_path: pathlib.Path) -> pathlib.Path: muse_dir(tmp_path).mkdir() (heads_dir(tmp_path)).mkdir(parents=True) return tmp_path def test_write_head_branch_correct_format(self, tmp_path: pathlib.Path) -> None: root = self._init(tmp_path) write_head_branch(root, "main") content = (head_path(root)).read_text() assert content == "ref: refs/heads/main\n" def test_write_head_branch_is_atomic(self, tmp_path: pathlib.Path) -> None: """write_head_branch must go through write_text_atomic (mkstemp + fsync).""" root = self._init(tmp_path) called = [False] real_mkstemp = tempfile.mkstemp def tracking(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]: called[0] = True return real_mkstemp(dir=dir, prefix=prefix) with patch("muse.core.io.tempfile.mkstemp", side_effect=tracking): write_head_branch(root, "main") assert called[0], "write_head_branch bypassed mkstemp (not atomic)" def test_write_head_branch_rejects_invalid_name(self, tmp_path: pathlib.Path) -> None: root = self._init(tmp_path) with pytest.raises((ValueError, SystemExit)): write_head_branch(root, "bad/../../traversal") def test_write_head_commit_correct_format(self, tmp_path: pathlib.Path) -> None: root = self._init(tmp_path) cid = fake_id("commit-a") write_head_commit(root, cid) content = (head_path(root)).read_text() assert content == f"commit: {cid}\n" def test_write_head_commit_is_atomic(self, tmp_path: pathlib.Path) -> None: root = self._init(tmp_path) called = [False] real_mkstemp = tempfile.mkstemp def tracking(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]: called[0] = True return real_mkstemp(dir=dir, prefix=prefix) cid = fake_id("commit-b") with patch("muse.core.io.tempfile.mkstemp", side_effect=tracking): write_head_commit(root, cid) assert called[0], 
"write_head_commit bypassed mkstemp (not atomic)" def test_write_head_commit_rejects_short_id(self, tmp_path: pathlib.Path) -> None: root = self._init(tmp_path) with pytest.raises(ValueError, match="sha256"): write_head_commit(root, "abc123") def test_write_head_commit_rejects_non_hex(self, tmp_path: pathlib.Path) -> None: root = self._init(tmp_path) with pytest.raises(ValueError): write_head_commit(root, "z" * 64) def test_head_survives_concurrent_branch_switches(self, tmp_path: pathlib.Path) -> None: """50 threads racing to update HEAD — no corruption, HEAD always readable.""" root = self._init(tmp_path) errors: list[str] = [] branch_names = [f"feat-{i:03d}" for i in range(50)] def switcher(branch: str) -> None: try: write_head_branch(root, branch) content = (head_path(root)).read_text() if not content.startswith("ref: refs/heads/"): errors.append(f"HEAD corrupted: {content!r}") except Exception as exc: errors.append(str(exc)) threads = [threading.Thread(target=switcher, args=(b,)) for b in branch_names] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"HEAD corruption detected: {errors}" assert _tmp_files(tmp_path) == [] # --------------------------------------------------------------------------- # Tier 1b: write_branch_ref — canonical branch pointer update # --------------------------------------------------------------------------- class TestWriteBranchRef: """Branch refs are the second most critical VCS state. A corrupt or missing ref orphans all commits reachable only from that branch. 
""" def _init(self, tmp_path: pathlib.Path) -> pathlib.Path: (heads_dir(tmp_path)).mkdir(parents=True) return tmp_path def _valid_cid(self, seed: str = "x") -> str: return fake_id(seed) def test_writes_correct_content(self, tmp_path: pathlib.Path) -> None: root = self._init(tmp_path) cid = self._valid_cid("test") write_branch_ref(root, "main", cid) ref_path = heads_dir(root) / "main" assert ref_path.read_text() == cid def test_is_atomic_uses_mkstemp(self, tmp_path: pathlib.Path) -> None: root = self._init(tmp_path) called = [False] real_mkstemp = tempfile.mkstemp def tracking(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]: called[0] = True return real_mkstemp(dir=dir, prefix=prefix) with patch("muse.core.io.tempfile.mkstemp", side_effect=tracking): write_branch_ref(root, "main", self._valid_cid()) assert called[0], "write_branch_ref bypassed mkstemp (not atomic)" def test_fsync_called_before_replace(self, tmp_path: pathlib.Path) -> None: root = self._init(tmp_path) call_order: list[str] = [] real_fsync = os.fsync real_replace = os.replace def t_fsync(fd: int) -> None: call_order.append("fsync") real_fsync(fd) def t_replace(src: str | bytes | os.PathLike[str], dst: str | bytes | os.PathLike[str]) -> None: call_order.append("replace") real_replace(src, dst) with patch("muse.core.io.os.fsync", side_effect=t_fsync), \ patch("muse.core.io.os.replace", side_effect=t_replace): write_branch_ref(root, "main", self._valid_cid()) fsync_idx = next((i for i, c in enumerate(call_order) if c == "fsync"), None) replace_idx = next((i for i, c in enumerate(call_order) if c == "replace"), None) assert fsync_idx is not None, "fsync not called in write_branch_ref" assert replace_idx is not None, "replace not called in write_branch_ref" assert fsync_idx < replace_idx, "fsync must precede replace" def test_rejects_invalid_branch_name(self, tmp_path: pathlib.Path) -> None: root = self._init(tmp_path) with pytest.raises((ValueError, SystemExit)): 
write_branch_ref(root, "../escape", self._valid_cid()) def test_rejects_non_hex_commit_id(self, tmp_path: pathlib.Path) -> None: root = self._init(tmp_path) with pytest.raises(ValueError): write_branch_ref(root, "main", "z" * 64) def test_rejects_short_commit_id(self, tmp_path: pathlib.Path) -> None: root = self._init(tmp_path) with pytest.raises(ValueError): write_branch_ref(root, "main", "abc123") def test_no_orphan_after_success(self, tmp_path: pathlib.Path) -> None: root = self._init(tmp_path) write_branch_ref(root, "main", self._valid_cid()) assert _tmp_files(tmp_path) == [] def test_no_orphan_after_replace_failure(self, tmp_path: pathlib.Path) -> None: root = self._init(tmp_path) with pytest.raises(OSError): with patch("muse.core.io.os.replace", side_effect=OSError("disk full")): write_branch_ref(root, "main", self._valid_cid()) assert _tmp_files(tmp_path) == [] def test_creates_nested_branch_path(self, tmp_path: pathlib.Path) -> None: """Branches like feat/my-thing require parent dir creation.""" root = self._init(tmp_path) cid = self._valid_cid("nested") write_branch_ref(root, "feat/my-thing", cid) ref_path = heads_dir(root) / "feat" / "my-thing" assert ref_path.read_text() == cid def test_50_concurrent_refs_distinct_branches(self, tmp_path: pathlib.Path) -> None: """50 concurrent writes to 50 distinct branches — all must land correctly.""" root = self._init(tmp_path) branches = [f"agent-{i:04d}" for i in range(50)] cids = {b: self._valid_cid(b) for b in branches} errors: list[str] = [] def writer(branch: str) -> None: try: write_branch_ref(root, branch, cids[branch]) branch_ref = ref_path(root, branch) got = branch_ref.read_text() if got != cids[branch]: errors.append(f"{branch}: expected {cids[branch][:8]}, got {got[:8]}") except Exception as exc: errors.append(f"{branch}: {exc}") threads = [threading.Thread(target=writer, args=(b,)) for b in branches] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Concurrent branch ref 
errors: {errors}" assert _tmp_files(tmp_path) == [] def test_50_concurrent_refs_same_branch(self, tmp_path: pathlib.Path) -> None: """50 concurrent writes to the SAME branch — last wins, no corruption.""" root = self._init(tmp_path) cids = [self._valid_cid(f"race-{i}") for i in range(50)] errors: list[str] = [] def writer(cid: str) -> None: try: write_branch_ref(root, "main", cid) content = (heads_dir(root) / "main").read_text() if content not in cids: errors.append(f"Corrupt content after write: {content!r}") except Exception as exc: errors.append(str(exc)) threads = [threading.Thread(target=writer, args=(c,)) for c in cids] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Same-branch concurrent errors: {errors}" assert _tmp_files(tmp_path) == [] # --------------------------------------------------------------------------- # Tier 2a: write_merge_state — MERGE_STATE.json # --------------------------------------------------------------------------- class TestMergeStateWrite: """MERGE_STATE.json records in-progress conflict state. A corrupt file prevents muse commit from completing a conflicted merge. 
""" def _init(self, tmp_path: pathlib.Path) -> pathlib.Path: muse_dir(tmp_path).mkdir() return tmp_path def _cid(self, seed: str) -> str: return fake_id(seed) def test_writes_valid_json(self, tmp_path: pathlib.Path) -> None: from muse.core.merge_engine import write_merge_state root = self._init(tmp_path) write_merge_state( root, base_commit=self._cid("base"), ours_commit=self._cid("ours"), theirs_commit=self._cid("theirs"), conflict_paths=["a.py", "b.py"], ) state_path = merge_state_path(root) data = json.loads(state_path.read_text()) assert data["conflict_paths"] == ["a.py", "b.py"] def test_is_atomic(self, tmp_path: pathlib.Path) -> None: """write_merge_state must funnel through write_text_atomic.""" from muse.core.merge_engine import write_merge_state root = self._init(tmp_path) called = [False] real_mkstemp = tempfile.mkstemp def tracking(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]: called[0] = True return real_mkstemp(dir=dir, prefix=prefix) with patch("muse.core.io.tempfile.mkstemp", side_effect=tracking): write_merge_state( root, base_commit=self._cid("b"), ours_commit=self._cid("o"), theirs_commit=self._cid("t"), conflict_paths=[], ) assert called[0], "write_merge_state bypassed mkstemp (not atomic)" def test_no_orphan_after_success(self, tmp_path: pathlib.Path) -> None: from muse.core.merge_engine import write_merge_state root = self._init(tmp_path) write_merge_state( root, base_commit=self._cid("b"), ours_commit=self._cid("o"), theirs_commit=self._cid("t"), conflict_paths=["x.py"], ) assert _tmp_files(tmp_path) == [] # --------------------------------------------------------------------------- # Tier 2b: save_rebase_state — REBASE_STATE.json # --------------------------------------------------------------------------- class TestRebaseStateWrite: def _init(self, tmp_path: pathlib.Path) -> pathlib.Path: muse_dir(tmp_path).mkdir() return tmp_path def _state(self) -> RebaseState: cid = fake_id("rebase-c") return RebaseState( 
original_branch="main", original_head=cid, onto=cid, remaining=[], completed=[], squash=False, ) def test_writes_valid_json(self, tmp_path: pathlib.Path) -> None: from muse.core.rebase import save_rebase_state root = self._init(tmp_path) save_rebase_state(root, self._state()) path = rebase_state_path(root) data = json.loads(path.read_text()) assert data["original_branch"] == "main" def test_is_atomic(self, tmp_path: pathlib.Path) -> None: from muse.core.rebase import save_rebase_state root = self._init(tmp_path) called = [False] real_mkstemp = tempfile.mkstemp def tracking(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]: called[0] = True return real_mkstemp(dir=dir, prefix=prefix) with patch("muse.core.io.tempfile.mkstemp", side_effect=tracking): save_rebase_state(root, self._state()) assert called[0], "save_rebase_state bypassed mkstemp" def test_no_orphan_after_success(self, tmp_path: pathlib.Path) -> None: from muse.core.rebase import save_rebase_state root = self._init(tmp_path) save_rebase_state(root, self._state()) assert _tmp_files(tmp_path) == [] # --------------------------------------------------------------------------- # Tier 2c: coordination.py — reservation + intent writes # --------------------------------------------------------------------------- class TestCoordinationWrites: def _init(self, tmp_path: pathlib.Path) -> pathlib.Path: muse_dir(tmp_path).mkdir() return tmp_path def _res(self, root: pathlib.Path, i: int = 0) -> Reservation: from muse.core.coordination import create_reservation return create_reservation( root, run_id=f"run-{i}", branch="main", addresses=[f"addr-{i}"], operation="write", ) def test_create_reservation_is_atomic(self, tmp_path: pathlib.Path) -> None: from muse.core.coordination import create_reservation root = self._init(tmp_path) called = [False] real_mkstemp = tempfile.mkstemp def tracking(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]: called[0] = True return 
real_mkstemp(dir=dir, prefix=prefix) with patch("muse.core.io.tempfile.mkstemp", side_effect=tracking): create_reservation(root, run_id="r1", branch="main", addresses=["a"], operation="write") assert called[0], "create_reservation bypassed mkstemp" def test_create_reservation_writes_valid_json(self, tmp_path: pathlib.Path) -> None: from muse.core.coordination import _reservations_dir root = self._init(tmp_path) res = self._res(root, 0) res_path = _reservations_dir(root) / f"{res.reservation_id}.json" data = json.loads(res_path.read_text()) assert data["operation"] == "write" def test_create_intent_is_atomic(self, tmp_path: pathlib.Path) -> None: from muse.core.coordination import create_intent root = self._init(tmp_path) res = self._res(root) called = [False] real_mkstemp = tempfile.mkstemp def tracking(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]: called[0] = True return real_mkstemp(dir=dir, prefix=prefix) with patch("muse.core.io.tempfile.mkstemp", side_effect=tracking): create_intent( root, reservation_id=res.reservation_id, run_id="r1", branch="main", addresses=["a"], operation="merge", ) assert called[0], "create_intent bypassed mkstemp" def test_create_intent_writes_valid_json(self, tmp_path: pathlib.Path) -> None: from muse.core.coordination import create_intent, _intents_dir root = self._init(tmp_path) res = self._res(root) intent = create_intent( root, reservation_id=res.reservation_id, run_id="r1", branch="main", addresses=["a"], operation="push", ) intent_path = _intents_dir(root) / f"{intent.intent_id}.json" data = json.loads(intent_path.read_text()) assert data["operation"] == "push" def test_no_orphan_after_reservation(self, tmp_path: pathlib.Path) -> None: root = self._init(tmp_path) self._res(root) assert _tmp_files(tmp_path) == [] def test_no_orphan_after_intent(self, tmp_path: pathlib.Path) -> None: from muse.core.coordination import create_intent root = self._init(tmp_path) res = self._res(root) create_intent( root, 
reservation_id=res.reservation_id, run_id="r1", branch="main", addresses=["a"], operation="commit", ) assert _tmp_files(tmp_path) == [] def test_20_concurrent_reservation_writes(self, tmp_path: pathlib.Path) -> None: """20 concurrent agents creating reservations — no corruption, no orphans.""" from muse.core.coordination import create_reservation, _reservations_dir root = self._init(tmp_path) errors: list[str] = [] def writer(i: int) -> None: try: create_reservation( root, run_id=f"run-{i}", branch="main", addresses=[f"addr-{i}"], operation="write", ) except Exception as exc: errors.append(str(exc)) threads = [threading.Thread(target=writer, args=(i,)) for i in range(20)] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Concurrent reservation errors: {errors}" reservation_files = list(_reservations_dir(root).glob("*.json")) assert len(reservation_files) == 20, f"Expected 20 reservation files, got {len(reservation_files)}" assert _tmp_files(tmp_path) == [] # --------------------------------------------------------------------------- # Tier 3: config.py — TOML config writes # --------------------------------------------------------------------------- class TestConfigWrites: """Config files govern remote connections, auth, and repo settings. A corrupt config.toml prevents all repo operations. 
""" def _init_config_repo(self, tmp_path: pathlib.Path) -> pathlib.Path: """Create a minimal repo with config.toml so config helpers can operate.""" dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "objects").mkdir() (dot_muse / "commits").mkdir() (dot_muse / "snapshots").mkdir() (dot_muse / "refs" / "heads").mkdir(parents=True) (dot_muse / "repo.json").write_text( '{"repo_id": "test-repo", "domain": "code", "default_branch": "main"}', encoding="utf-8", ) (dot_muse / "config.toml").write_text("", encoding="utf-8") (dot_muse / "HEAD").write_text("ref: refs/heads/main\n", encoding="utf-8") return tmp_path def test_set_remote_is_atomic(self, tmp_path: pathlib.Path) -> None: """set_remote (writes config.toml) must funnel through write_text_atomic.""" from muse.cli.config import set_remote root = self._init_config_repo(tmp_path) called = [False] real_mkstemp = tempfile.mkstemp def tracking(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]: called[0] = True return real_mkstemp(dir=dir, prefix=prefix) with patch("muse.core.io.tempfile.mkstemp", side_effect=tracking): set_remote("local", "https://localhost:1337", repo_root=root) assert called[0], "set_remote bypassed mkstemp (config write not atomic)" def test_set_remote_no_orphan(self, tmp_path: pathlib.Path) -> None: from muse.cli.config import set_remote root = self._init_config_repo(tmp_path) set_remote("origin", "https://localhost:1337", repo_root=root) assert _tmp_files(tmp_path) == [] def test_set_remote_correct_content_persisted(self, tmp_path: pathlib.Path) -> None: from muse.cli.config import set_remote, get_remote root = self._init_config_repo(tmp_path) set_remote("myremote", "http://myhost:9000", repo_root=root) url = get_remote("myremote", repo_root=root) assert url == "http://myhost:9000" def test_10_concurrent_config_writes_no_orphan(self, tmp_path: pathlib.Path) -> None: """10 concurrent set_remote calls — no orphan temp files.""" from muse.cli.config import set_remote root = 
self._init_config_repo(tmp_path) errors: list[str] = [] def writer(i: int) -> None: try: set_remote(f"remote-{i}", f"http://host-{i}:9000", repo_root=root) except Exception as exc: errors.append(str(exc)) threads = [threading.Thread(target=writer, args=(i,)) for i in range(10)] for t in threads: t.start() for t in threads: t.join() # No orphan temps regardless of config merge conflicts assert _tmp_files(tmp_path) == [] # --------------------------------------------------------------------------- # Gap 2+3: write_object_from_path — fsync ordering + copy2 failure cleanup # --------------------------------------------------------------------------- class TestWriteObjectFromPathFsync: """_fsync_fd must be called before os.replace in write_object_from_path. write_object_from_path uses shutil.copy2 then re-opens the temp file as an fd to call fchmod + _fsync_fd before the atomic rename. The test patches _fsync_fd (the fd-based variant) — NOT _fsync_path, which is the path-based variant used only by restore_object. 
""" def test_fsync_path_called_before_replace(self, tmp_path: pathlib.Path) -> None: """_fsync_fd must be invoked before os.replace.""" repo = _repo(tmp_path) data = b"from-path fsync ordering" oid = _oid(data) src = tmp_path / "source.bin" src.write_bytes(data) call_order: list[str] = [] real_fsync_fd = __import__("muse.core.object_store", fromlist=["_fsync_fd"])._fsync_fd real_replace = os.replace def t_fsync_fd(fd: int) -> None: call_order.append("fsync_fd") real_fsync_fd(fd) def t_replace(s: str | bytes | os.PathLike[str], d: str | bytes | os.PathLike[str]) -> None: call_order.append("replace") real_replace(s, d) with patch("muse.core.object_store._fsync_fd", side_effect=t_fsync_fd), \ patch("muse.core.object_store.os.replace", side_effect=t_replace): write_object_from_path(repo, oid, src) fp = next((i for i, c in enumerate(call_order) if c == "fsync_fd"), None) rp = next((i for i, c in enumerate(call_order) if c == "replace"), None) assert fp is not None, "_fsync_fd never called in write_object_from_path" assert rp is not None, "os.replace never called in write_object_from_path" assert fp < rp, f"_fsync_fd (pos {fp}) must happen before replace (pos {rp})" def test_fsync_path_failure_non_fatal(self, tmp_path: pathlib.Path) -> None: """os.fsync failure inside _fsync_fd must not abort write_object_from_path. _fsync_fd swallows OSError internally — we patch os.fsync so the function's own try/except absorbs the failure, exactly as it would on a filesystem that does not support fsync (tmpfs, some Docker volumes). 
""" repo = _repo(tmp_path) data = b"fsync_path fails gracefully" oid = _oid(data) src = tmp_path / "src.bin" src.write_bytes(data) with patch("muse.core.object_store.os.fsync", side_effect=OSError("not supported")): result = write_object_from_path(repo, oid, src) assert result is True assert read_object(repo, oid) == data def test_no_orphan_after_copy2_failure(self, tmp_path: pathlib.Path) -> None: """When the write loop raises, the mkstemp temp file must be cleaned up.""" repo = _repo(tmp_path) data = b"copy2 will fail" oid = _oid(data) src = tmp_path / "src.bin" src.write_bytes(data) with pytest.raises(OSError): with patch("muse.core.object_store.os.fdopen", side_effect=OSError("I/O error")): write_object_from_path(repo, oid, src) assert _tmp_files(tmp_path) == [], "Orphan temp after write failure" def test_no_orphan_after_replace_failure(self, tmp_path: pathlib.Path) -> None: """When os.replace raises, the temp file must be cleaned up.""" repo = _repo(tmp_path) data = b"replace will fail for from_path" oid = _oid(data) src = tmp_path / "src.bin" src.write_bytes(data) with pytest.raises(OSError): with patch("muse.core.object_store.os.replace", side_effect=OSError("disk full")): write_object_from_path(repo, oid, src) assert _tmp_files(tmp_path) == [], "Orphan temp after os.replace failure in write_object_from_path" def test_correct_content_after_write(self, tmp_path: pathlib.Path) -> None: """Content round-trips correctly through write_object_from_path → read_object.""" repo = _repo(tmp_path) data = os.urandom(4096) oid = _oid(data) src = tmp_path / "payload.bin" src.write_bytes(data) write_object_from_path(repo, oid, src) assert read_object(repo, oid) == data # --------------------------------------------------------------------------- # Gap 4+5+6: restore_object — fsync ordering + copy2 failure cleanup # --------------------------------------------------------------------------- class TestRestoreObjectFsync: """_fsync_path must be called before os.replace in 
restore_object.""" def test_fsync_path_called_before_replace(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) data = b"restore fsync ordering" oid = _oid(data) write_object(repo, oid, data) dest = tmp_path / "restored.bin" call_order: list[str] = [] real_fsync_path = __import__("muse.core.object_store", fromlist=["_fsync_path"])._fsync_path real_replace = os.replace def t_fsync_path(path: pathlib.Path) -> None: call_order.append("fsync_path") real_fsync_path(path) def t_replace(s: str | bytes | os.PathLike[str], d: str | bytes | os.PathLike[str]) -> None: call_order.append("replace") real_replace(s, d) with patch("muse.core.object_store._fsync_path", side_effect=t_fsync_path), \ patch("muse.core.object_store.os.replace", side_effect=t_replace): restore_object(repo, oid, dest) fp = next((i for i, c in enumerate(call_order) if c == "fsync_path"), None) rp = next((i for i, c in enumerate(call_order) if c == "replace"), None) assert fp is not None, "_fsync_path never called in restore_object" assert rp is not None, "os.replace never called in restore_object" assert fp < rp, f"_fsync_path (pos {fp}) must precede replace (pos {rp})" def test_fsync_path_failure_non_fatal(self, tmp_path: pathlib.Path) -> None: """os.fsync failure inside _fsync_path must not abort restore_object.""" repo = _repo(tmp_path) data = b"restore fsync_path fails gracefully" oid = _oid(data) write_object(repo, oid, data) dest = tmp_path / "restored.bin" with patch("muse.core.object_store.os.fsync", side_effect=OSError("not supported")): result = restore_object(repo, oid, dest) assert result is True assert dest.read_bytes() == data def test_no_orphan_after_copy2_failure(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) data = b"restore copy2 will fail" oid = _oid(data) write_object(repo, oid, data) dest = tmp_path / "out.bin" with pytest.raises(OSError): with patch("muse.core.object_store.os.fdopen", side_effect=OSError("I/O error")): restore_object(repo, oid, dest) assert 
_tmp_files(tmp_path) == [], "Orphan temp after write failure in restore_object" def test_no_orphan_after_replace_failure(self, tmp_path: pathlib.Path) -> None: repo = _repo(tmp_path) data = b"restore replace will fail" oid = _oid(data) write_object(repo, oid, data) dest = tmp_path / "out.bin" with pytest.raises(OSError): with patch("muse.core.object_store.os.replace", side_effect=OSError("disk full")): restore_object(repo, oid, dest) assert _tmp_files(tmp_path) == [], "Orphan temp after os.replace failure in restore_object" def test_restored_file_mtime_is_current_not_from_object_store( self, tmp_path: pathlib.Path ) -> None: """restore_object must set the destination mtime to NOW, not to the object-store file's mtime. shutil.copy2 propagates the source (object-store) mtime to the temp file. Object-store files are written at commit time and may be days or weeks old. Without os.utime(tmp, None) the restored destination carries an old timestamp, causing editors (Cursor, VS Code, Vim) to see "new mtime < cached mtime" and serve a stale buffer instead of reloading the file. This is the regression that caused the "merge work disappears in Cursor but reappears on close/reopen" bug. """ import time repo = _repo(tmp_path) data = b"content that differs from any existing file\n" * 10 oid = _oid(data) write_object(repo, oid, data) # Simulate an old object-store mtime: backdate the stored object to 2 days ago. obj_path = object_path(repo, oid) two_days_ago = time.time() - (2 * 24 * 3600) os.utime(obj_path, (two_days_ago, two_days_ago)) # Write a pre-existing dest with a "current" mtime (simulating Cursor's # last-read timestamp before the checkout/merge). dest = tmp_path / "watched_file.py" dest.write_bytes(b"old content that cursor has open\n") cursor_cached_mtime = time.time() os.utime(dest, (cursor_cached_mtime, cursor_cached_mtime)) # restore_object must write the new content AND freshen mtime. 
t_before = time.time() restore_object(repo, oid, dest) t_after = time.time() new_mtime = os.stat(dest).st_mtime # Destination must have a FRESH timestamp — not the object-store's old one. assert new_mtime >= t_before, ( f"Restored file mtime ({new_mtime:.2f}) is older than the time " f"restore_object was called ({t_before:.2f}). " "shutil.copy2 is propagating the object-store's stale mtime, " "which causes editors to serve stale buffers after checkout/merge." ) assert new_mtime <= t_after + 1.0, ( f"Restored file mtime ({new_mtime:.2f}) is far in the future — unexpected." ) # Content must be correct regardless. assert dest.read_bytes() == data def test_restored_mtime_fresher_than_previous_content( self, tmp_path: pathlib.Path ) -> None: """After restore_object, the destination mtime must be >= the mtime it had before the call, so editors always see a forward-moving timestamp.""" import time repo = _repo(tmp_path) new_data = b"new version from feature branch\n" oid = _oid(new_data) write_object(repo, oid, new_data) dest = tmp_path / "file.py" dest.write_bytes(b"old version on dev\n") old_mtime = time.time() os.utime(dest, (old_mtime, old_mtime)) # Backdate the object-store copy (as it would be after a real commit). obj_path = object_path(repo, oid) os.utime(obj_path, (old_mtime - 86400, old_mtime - 86400)) restore_object(repo, oid, dest) assert os.stat(dest).st_mtime >= old_mtime, ( "Restored file mtime went backwards — editor will not see the change." ) # --------------------------------------------------------------------------- # Gap 7: page-cache non-flush — defense-in-depth (I-1 catches what I-2 misses) # --------------------------------------------------------------------------- class TestPageCacheDefenseInDepth: """Demonstrate that I-1 (read-time hash verification) is the safety net for the unlikely scenario where fsync appeared to succeed but the kernel wrote zero bytes to disk (power loss AFTER rename, BEFORE flush). 
Simulated by: writing an object normally, then zeroing the on-disk file (mimicking a power-loss-induced empty file at the renamed destination). read_object must raise OSError — the store never silently serves bad data. """ def test_zeroed_dest_after_rename_caught_by_read(self, tmp_path: pathlib.Path) -> None: """Simulate post-rename page-cache loss: zero the stored file, then read.""" repo = _repo(tmp_path) data = b"page cache simulation" oid = _oid(data) write_object(repo, oid, data) # Mimic power loss that zeroed the file after rename. _corrupt_file(object_path(repo, oid), b"\x00" * len(data)) with pytest.raises(OSError, match="integrity check"): read_object(repo, oid) def test_truncated_dest_caught_by_read(self, tmp_path: pathlib.Path) -> None: """Simulate partial flush: only first half of bytes survived power loss.""" repo = _repo(tmp_path) data = b"partial flush simulation" * 10 oid = _oid(data) write_object(repo, oid, data) # Only the first half survived to disk. _corrupt_file(object_path(repo, oid), data[: len(data) // 2]) with pytest.raises(OSError, match="integrity check"): read_object(repo, oid) def test_noop_write_detected(self, tmp_path: pathlib.Path) -> None: """Simulate fh.write no-op (page cache accepted write, never flushed). We write the object normally and then zero the stored file to mimic the outcome of a post-rename page-cache flush failure. read_object must raise OSError — I-1's hash check is the final safety net for any I-2 failure mode. """ repo = _repo(tmp_path) data = b"write syscall accepted but page cache never flushed" oid = _oid(data) # Write correctly first, then simulate the power-loss outcome: the # renamed destination was never actually flushed to durable storage. 
write_object(repo, oid, data) stored = object_path(repo, oid) _corrupt_file(stored, b"") # zero bytes — what a power loss leaves with pytest.raises(OSError, match="integrity check"): read_object(repo, oid) # --------------------------------------------------------------------------- # Gap 8: same object_id written from N threads simultaneously — idempotency # --------------------------------------------------------------------------- class TestIdempotentConcurrentWrite: """write_object is idempotent: same object_id written from many threads concurrently must never produce corruption — only one write wins, others see exists() and skip. The content of the winner must be correct. """ def test_same_object_50_threads_no_corruption(self, tmp_path: pathlib.Path) -> None: """50 threads writing the same object_id must all succeed with correct content.""" repo = _repo(tmp_path) data = b"idempotent object written from 50 threads" oid = _oid(data) errors: list[str] = [] def writer() -> None: try: write_object(repo, oid, data) result = read_object(repo, oid) if result != data: errors.append(f"Mismatch: {repr(result)[:30]}") except Exception as exc: errors.append(f"Exception: {exc}") threads = [threading.Thread(target=writer) for _ in range(50)] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Idempotent concurrent write errors:\n{'\n'.join(errors)}" assert read_object(repo, oid) == data assert _tmp_files(tmp_path) == [] def test_same_object_distinct_content_rejected(self, tmp_path: pathlib.Path) -> None: """Writing different bytes under the same object_id is always rejected.""" repo = _repo(tmp_path) data = b"canonical content" oid = _oid(data) wrong = b"wrong content that hashes differently" write_object(repo, oid, data) with pytest.raises(ValueError, match="integrity"): write_object(repo, oid, wrong) assert read_object(repo, oid) == data # --------------------------------------------------------------------------- # Gap 9: mid-write fh.write 
failure — orphan cleaned up # --------------------------------------------------------------------------- class TestMidWriteFailureCleanup: """OSError raised during the fh.write call (disk full mid-write) must not leave an orphan temp file in the store directory.""" def test_no_orphan_after_write_failure_write_object(self, tmp_path: pathlib.Path) -> None: """OSError during fh.write must not leave an orphan temp file.""" from unittest.mock import MagicMock repo = _repo(tmp_path) data = b"mid-write failure" oid = _oid(data) mock_fh = MagicMock() mock_fh.__enter__.return_value = mock_fh mock_fh.write.side_effect = OSError("disk full") mock_fh.flush.return_value = None mock_fh.fileno.return_value = -1 with pytest.raises(OSError): with patch("muse.core.object_store.os.fdopen", return_value=mock_fh): write_object(repo, oid, data) assert _tmp_files(tmp_path) == [], "Orphan temp file left after mid-write failure" def test_no_orphan_after_write_failure_write_text_atomic(self, tmp_path: pathlib.Path) -> None: """write_text_atomic cleans up the temp file when fh.write raises.""" from unittest.mock import MagicMock path = tmp_path / "state.txt" mock_fh = MagicMock() mock_fh.__enter__.return_value = mock_fh mock_fh.write.side_effect = OSError("disk full") mock_fh.flush.return_value = None mock_fh.fileno.return_value = -1 with pytest.raises(OSError): with patch("muse.core.io.os.fdopen", return_value=mock_fh): write_text_atomic(path, "will fail") assert _tmp_files(tmp_path) == [], "Orphan temp left after write_text_atomic mid-write failure" # --------------------------------------------------------------------------- # Gap 10: 10 000 sequential commits — store clean throughout # --------------------------------------------------------------------------- class TestSequentialStress: """10 000 sequential commit writes exercise the full fsync+rename path at scale. The store must be clean (all readable, no orphans) when done. 
Based on the Linux-kernel-migration scenario: Linus runs a git-to-muse import script that writes 75k commits. We test at 10k to keep CI fast. """ @pytest.mark.slow def test_10000_sequential_commits_all_readable(self, tmp_path: pathlib.Path) -> None: """1 000 sequential commits — every one must be readable after write.""" repo = _repo(tmp_path) commits = [_commit(i) for i in range(1_000)] for c in commits: write_commit(repo, c) # Verify every commit is readable and correct. failures: list[str] = [] for c in commits: result = read_commit(repo, c.commit_id) if result is None: failures.append(f"Commit {c.commit_id[:8]} not found after write") elif result.message != c.message: failures.append(f"Commit {c.commit_id[:8]} message corrupted") assert failures == [], f"{len(failures)} commit read failures:\n{'\n'.join(failures[:10])}" assert _tmp_files(tmp_path) == [], "Orphan temps after sequential commit writes" @pytest.mark.slow def test_1000_commits_with_20pct_fsync_failure_all_readable( self, tmp_path: pathlib.Path ) -> None: """100 commits with 20% random fsync failures must all land correctly. Verifies that fsync failure is gracefully handled and atomicity (torn-write protection) is maintained even when durability (fsync) is degraded. 
""" import random as _random repo = _repo(tmp_path) rng = _random.Random(42) commits = [_commit(i) for i in range(100)] real_fsync = os.fsync def flaky_fsync(fd: int) -> None: if rng.random() < 0.2: raise OSError("simulated fsync failure") real_fsync(fd) with patch("muse.core.commits.os.fsync", side_effect=flaky_fsync): for c in commits: write_commit(repo, c) failures: list[str] = [] for c in commits: result = read_commit(repo, c.commit_id) if result is None: failures.append(f"Commit {c.commit_id[:8]} not found") elif result.message != c.message: failures.append(f"Commit {c.commit_id[:8]} corrupted") assert failures == [], f"Commits lost under flaky fsync:\n{'\n'.join(failures)}" assert _tmp_files(tmp_path) == [] # --------------------------------------------------------------------------- # Gap 11: SIGKILL crash safety — process kill leaves no orphans # --------------------------------------------------------------------------- class TestProcessKillCrashSafety: """Simulate abrupt process termination (SIGKILL) during an object write. Uses multiprocessing to run the writer in a child process, then kills it with SIGKILL at a random moment. Afterward, the store must be consistent: - Objects fully written before the kill must be readable and hash-correct. - No orphan temp files must remain (OS cleans up open fds; the temp file created by mkstemp is unlinked by the OS when the process dies, since it holds the only reference via the fd). Note: On most POSIX systems, a SIGKILL'd process that holds an open mkstemp fd will have that fd closed by the kernel. The temp file remains on disk (the fd close doesn't unlink it) but the rename never happens, so the destination is either fully written or absent — never partial. This test verifies the store consistency guarantee, not orphan cleanup (orphan GC is a separate I-6 concern). 
""" @pytest.mark.slow def test_sigkill_during_write_leaves_no_partial_dest( self, tmp_path: pathlib.Path ) -> None: """Objects written before SIGKILL must still be readable after kill.""" import multiprocessing import signal import time repo = _repo(tmp_path) # Write 20 known objects before spawning the crashable process. pre_oids: list[str] = [] for i in range(20): data = f"pre-kill-{i}".encode() oid = _oid(data) write_object(repo, oid, data) pre_oids.append(oid) # "spawn" starts a fresh interpreter — no multi-threaded-fork warning # and no risk of deadlocks inherited from the pytest runner's threads. # _sigkill_writer_worker is defined at module level to ensure it is # picklable across the spawn boundary. ctx = multiprocessing.get_context("spawn") proc = ctx.Process(target=_sigkill_writer_worker, args=(repo, 5000)) proc.start() # Kill the worker after a short random delay. import random time.sleep(random.uniform(0.01, 0.05)) if proc.is_alive(): assert proc.pid is not None os.kill(proc.pid, signal.SIGKILL) proc.join() # All pre-kill objects must still be readable and correct. for i, oid in enumerate(pre_oids): data = f"pre-kill-{i}".encode() result = read_object(repo, oid) assert result == data, f"Pre-kill object {oid[:8]} corrupted after SIGKILL" # Store consistency: every object file in the store must hash-verify. import muse.core.object_store as _ost all_oids = _ost._iter_all_object_ids(repo) if hasattr(_ost, "_iter_all_object_ids") else [] for oid in all_oids: try: read_object(repo, oid) # raises on hash mismatch except OSError as exc: pytest.fail(f"Corrupt object {oid[:8]} found after SIGKILL: {exc}") # --------------------------------------------------------------------------- # Gap 12: Performance benchmark — 4 KiB JSON write + fsync < 5 ms # --------------------------------------------------------------------------- class TestFsyncWritePerformance: """fsync overhead on a 4 KiB JSON commit write must be < 5 ms. 
The syscall is dominated by the OS flush latency, not the data volume. tmpfs (which tmp_path typically uses on Linux) syncs instantly; on macOS with APFS this is also sub-millisecond. 5 ms is a very generous budget — a real NVMe commit flush is typically < 0.5 ms. """ @pytest.mark.perf def test_write_commit_4kib_under_5ms(self, tmp_path: pathlib.Path) -> None: """Single 4 KiB commit write (JSON + fsync + rename) < 5 ms.""" import time repo = _repo(tmp_path) c = _commit(99_000) start = time.perf_counter() write_commit(repo, c) duration_ms = (time.perf_counter() - start) * 1000 assert read_commit(repo, c.commit_id) is not None assert duration_ms < 10, ( f"write_commit took {duration_ms:.2f} ms — exceeds the 10 ms fsync budget. " "Performance regression in the atomic write path." ) @pytest.mark.perf def test_write_object_4kib_under_5ms(self, tmp_path: pathlib.Path) -> None: """Single 4 KiB object write (bytes + fsync + rename) < 5 ms.""" import time repo = _repo(tmp_path) data = os.urandom(4096) oid = _oid(data) start = time.perf_counter() write_object(repo, oid, data) duration_ms = (time.perf_counter() - start) * 1000 assert read_object(repo, oid) == data assert duration_ms < 10, ( f"write_object took {duration_ms:.2f} ms — exceeds the 10 ms fsync budget." ) @pytest.mark.perf def test_write_text_atomic_4kib_under_5ms(self, tmp_path: pathlib.Path) -> None: """write_text_atomic on a 4 KiB text blob < 5 ms.""" import time path = tmp_path / "state.txt" text = "x" * 4096 start = time.perf_counter() write_text_atomic(path, text) duration_ms = (time.perf_counter() - start) * 1000 assert path.read_text() == text assert duration_ms < 10, ( f"write_text_atomic took {duration_ms:.2f} ms — exceeds the 10 ms budget." )