""" Tests for the bug: write_text_atomic silently swallowed ALL OSError from os.fsync() — including ENOSPC and EIO — instead of only suppressing EINVAL (the errno virtual filesystems return to indicate fsync is unsupported). Root cause (muse/core/store.py): write_text_atomic lines 324–327: try: os.fsync(fh.fileno()) except OSError: pass # best-effort ← BUG: swallows ENOSPC, EIO, etc. When a disk is full (ENOSPC) or has a hardware error (EIO), fsync raises an OSError with errno.ENOSPC or errno.EIO. The current code silently swallows these errors. tmp.replace(path) then succeeds — the target file now points at a temp file whose data is only in the page cache. The caller sees a normal return (no exception) and believes the write succeeded. The OS may silently discard the page-cache data if it cannot flush it to disk. The fix: only suppress errno.EINVAL. Re-raise everything else (ENOSPC, EIO, EROFS, EBADF, …). Coverage: Unit — write_text_atomic raises on ENOSPC, EIO; suppresses EINVAL Data integrity — after ENOSPC, no misleading success state in caller Security — ENOSPC during HEAD/branch ref writes propagates (not silenced) Integration — coord record write propagates ENOSPC to _write_remote_records E2E — CLI coord sync gets clean error, not silent corruption Stress — rapid repeated ENOSPC raises, never succeeds silently Performance — suppressed EINVAL path (normal) is not dramatically slower Regression — EINVAL is still suppressed (virtual filesystem compatibility) """ from __future__ import annotations import errno import os import pathlib import sys import tempfile import threading import time from unittest.mock import MagicMock, patch import pytest from muse.core.types import MsgpackDict from muse.core.paths import coordination_dir, head_path, heads_dir, muse_dir # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path) 
-> pathlib.Path: muse_dir(tmp_path).mkdir(parents=True, exist_ok=True) return tmp_path def _oserror(err: int) -> OSError: e = OSError(err, os.strerror(err)) e.errno = err return e # ============================================================================= # 1. UNIT — write_text_atomic fsync error handling # ============================================================================= class TestWriteTextAtomicFsync: """write_text_atomic must re-raise fatal OSErrors and suppress only EINVAL.""" def test_enospc_raises(self, tmp_path: pathlib.Path) -> None: """ENOSPC from fsync must propagate — disk full is a fatal error.""" from muse.core.io import write_text_atomic with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)): with pytest.raises(OSError) as exc_info: write_text_atomic(tmp_path / "test.txt", "hello") assert exc_info.value.errno == errno.ENOSPC def test_eio_raises(self, tmp_path: pathlib.Path) -> None: """EIO from fsync must propagate — hardware error is fatal.""" from muse.core.io import write_text_atomic with patch("os.fsync", side_effect=_oserror(errno.EIO)): with pytest.raises(OSError) as exc_info: write_text_atomic(tmp_path / "test.txt", "hello") assert exc_info.value.errno == errno.EIO def test_erofs_raises(self, tmp_path: pathlib.Path) -> None: """EROFS (read-only filesystem) from fsync must propagate.""" from muse.core.io import write_text_atomic with patch("os.fsync", side_effect=_oserror(errno.EROFS)): with pytest.raises(OSError) as exc_info: write_text_atomic(tmp_path / "test.txt", "hello") assert exc_info.value.errno == errno.EROFS def test_einval_suppressed(self, tmp_path: pathlib.Path) -> None: """EINVAL from fsync must be silently suppressed (virtual filesystem compat).""" from muse.core.io import write_text_atomic with patch("os.fsync", side_effect=_oserror(errno.EINVAL)): write_text_atomic(tmp_path / "test.txt", "hello") # must not raise assert (tmp_path / "test.txt").read_text() == "hello" def test_enospc_leaves_no_temp_files(self, 
tmp_path: pathlib.Path) -> None: """On ENOSPC, the temp file must be cleaned up — no orphaned .muse-tmp-* files.""" from muse.core.io import write_text_atomic with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)): with pytest.raises(OSError): write_text_atomic(tmp_path / "output.txt", "data") tmp_files = list(tmp_path.glob(".muse-tmp-*")) assert tmp_files == [], f"orphaned temp files after ENOSPC: {tmp_files}" def test_enospc_does_not_create_target(self, tmp_path: pathlib.Path) -> None: """On ENOSPC, the target file must not be created (rename never called).""" from muse.core.io import write_text_atomic target = tmp_path / "should-not-exist.txt" with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)): with pytest.raises(OSError): write_text_atomic(target, "data") assert not target.exists(), "target file created despite ENOSPC" def test_enospc_does_not_overwrite_existing(self, tmp_path: pathlib.Path) -> None: """On ENOSPC, an existing target file must be preserved (not replaced).""" from muse.core.io import write_text_atomic target = tmp_path / "existing.txt" target.write_text("original content") with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)): with pytest.raises(OSError): write_text_atomic(target, "new content") assert target.read_text() == "original content", ( "existing file was overwritten despite ENOSPC" ) def test_successful_write_still_works(self, tmp_path: pathlib.Path) -> None: """After the fix, normal writes (no fsync error) must still succeed.""" from muse.core.io import write_text_atomic write_text_atomic(tmp_path / "ok.txt", "success") assert (tmp_path / "ok.txt").read_text() == "success" def test_multiple_enospc_all_raise(self, tmp_path: pathlib.Path) -> None: """Every ENOSPC call raises — no silent tolerance after repeated failures.""" from muse.core.io import write_text_atomic for i in range(10): with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)): with pytest.raises(OSError) as exc_info: write_text_atomic(tmp_path / 
f"file-{i}.txt", f"content-{i}") assert exc_info.value.errno == errno.ENOSPC # ============================================================================= # 2. DATA INTEGRITY — caller sees exception, not silent success # ============================================================================= class TestDataIntegrityOnEnospc: """After ENOSPC, callers must see an exception — never a silent success.""" def test_write_text_atomic_enospc_exception_propagates_to_caller(self, tmp_path: pathlib.Path) -> None: """Callers of write_text_atomic must see OSError on ENOSPC.""" from muse.core.io import write_text_atomic result = None exception = None with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)): try: write_text_atomic(tmp_path / "out.txt", "data") result = "success" except OSError as e: exception = e assert result is None, "write_text_atomic returned normally despite ENOSPC" assert exception is not None assert exception.errno == errno.ENOSPC def test_no_stale_state_after_enospc(self, tmp_path: pathlib.Path) -> None: """After ENOSPC, no partial state should exist in the target path.""" from muse.core.io import write_text_atomic target = tmp_path / "state.txt" with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)): with pytest.raises(OSError): write_text_atomic(target, "new state") # Target must not exist (was not pre-existing) assert not target.exists() def test_old_file_preserved_after_enospc(self, tmp_path: pathlib.Path) -> None: """When overwriting, ENOSPC must leave the old file intact.""" from muse.core.io import write_text_atomic target = tmp_path / "config.txt" target.write_text("version: 1") with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)): with pytest.raises(OSError): write_text_atomic(target, "version: 2") assert target.read_text() == "version: 1", "old config was destroyed on ENOSPC" # ============================================================================= # 3. 
# SECURITY — critical VCS state writes must propagate ENOSPC
# =============================================================================


class TestSecurityCriticalWritesEnospc:
    """HEAD, branch refs, and coordination records must not silently corrupt on ENOSPC."""

    def test_write_head_enospc_raises(self, tmp_path: pathlib.Path) -> None:
        """Writing HEAD ref must propagate ENOSPC and leave no HEAD file behind."""
        from muse.core.io import write_text_atomic

        hp = head_path(tmp_path)
        hp.parent.mkdir(parents=True, exist_ok=True)
        with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)):
            with pytest.raises(OSError) as exc_info:
                write_text_atomic(hp, "ref: refs/heads/main\n")
        assert exc_info.value.errno == errno.ENOSPC
        assert not hp.exists()

    def test_write_branch_ref_enospc_raises(self, tmp_path: pathlib.Path) -> None:
        """Writing branch ref must propagate ENOSPC."""
        from muse.core.io import write_text_atomic

        ref_path = heads_dir(tmp_path) / "main"
        ref_path.parent.mkdir(parents=True, exist_ok=True)
        with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)):
            with pytest.raises(OSError) as exc_info:
                write_text_atomic(ref_path, "abc123def456\n")
        assert exc_info.value.errno == errno.ENOSPC

    def test_write_coord_record_enospc_propagates(self, tmp_path: pathlib.Path) -> None:
        """Writing a coordination record must propagate ENOSPC."""
        from muse.cli.commands.coord_sync import _write_remote_records

        root = _make_repo(tmp_path)
        rec: MsgpackDict = {
            "kind": "reservation",
            "record_id": "res-enospc-test",
            "run_id": "run-test",
            "payload": {"data": "important"},
            "expires_at": "2099-12-31T23:59:59+00:00",
        }
        with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)):
            with pytest.raises(OSError) as exc_info:
                _write_remote_records(root, [rec])
        assert exc_info.value.errno == errno.ENOSPC

    def test_enospc_does_not_silently_produce_empty_head(self, tmp_path: pathlib.Path) -> None:
        """A zero-byte HEAD would cause every muse command to fail — must not happen.

        HEAD did not exist before the failed write, so it must not exist
        afterwards in any form.  A HEAD that exists *with* content would mean
        the rename went ahead on unsynced data, which is just as wrong as an
        empty one, so plain absence is asserted rather than "non-empty if
        present".
        """
        from muse.core.io import write_text_atomic

        hp = head_path(tmp_path)
        hp.parent.mkdir(parents=True, exist_ok=True)
        with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)):
            with pytest.raises(OSError):
                write_text_atomic(hp, "ref: refs/heads/main\n")
        # HEAD must not exist at all (not as a zero-byte file, not with content).
        assert not hp.exists(), (
            f"HEAD was created ({hp.stat().st_size} bytes) despite ENOSPC"
        )


# =============================================================================
# 4. INTEGRATION — coord _write_remote_records propagates ENOSPC
# =============================================================================


class TestIntegrationCoordEnospc:
    """_write_remote_records uses write_text_atomic — ENOSPC must bubble up."""

    def _make_rec(self, kind: str = "reservation", record_id: str = "res-001") -> MsgpackDict:
        """Return a coordination record dict of *kind* / *record_id* with a 1 KiB payload."""
        return {
            "kind": kind,
            "record_id": record_id,
            "run_id": "run-torvalds",
            "payload": {"data": "x" * 1024},
            "expires_at": "2099-12-31T23:59:59+00:00",
        }

    def test_enospc_raises_from_write_remote_records(self, tmp_path: pathlib.Path) -> None:
        """ENOSPC raised by fsync must surface from _write_remote_records."""
        from muse.cli.commands.coord_sync import _write_remote_records

        root = _make_repo(tmp_path)
        with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)):
            with pytest.raises(OSError) as exc_info:
                _write_remote_records(root, [self._make_rec()])
        assert exc_info.value.errno == errno.ENOSPC

    def test_eio_raises_from_write_remote_records(self, tmp_path: pathlib.Path) -> None:
        """EIO raised by fsync must surface from _write_remote_records."""
        from muse.cli.commands.coord_sync import _write_remote_records

        root = _make_repo(tmp_path)
        with patch("os.fsync", side_effect=_oserror(errno.EIO)):
            with pytest.raises(OSError) as exc_info:
                _write_remote_records(root, [self._make_rec()])
        assert exc_info.value.errno == errno.EIO

    def test_einval_suppressed_in_write_remote_records(self, tmp_path: pathlib.Path) -> None:
        """Virtual filesystem compat: EINVAL from fsync must be suppressed."""
        from muse.cli.commands.coord_sync import _write_remote_records

        root = _make_repo(tmp_path)
        with patch("os.fsync", side_effect=_oserror(errno.EINVAL)):
            _write_remote_records(root, [self._make_rec()])  # must not raise
        # The record file must be present (only existence is checked here,
        # not the validity of its contents).
        path = (
            coordination_dir(tmp_path) / "remote" / "reservation" / "res-001.json"
        )
        assert path.exists()

    def test_enospc_on_second_record_first_record_still_written(
        self, tmp_path: pathlib.Path
    ) -> None:
        """ENOSPC on the second record must not prevent the first from being written.

        NOTE(review): this assumes one fsync call per record.  If
        write_text_atomic issues more than one fsync per file, the injected
        failure lands inside the first record instead — verify against the
        implementation.
        """
        from muse.cli.commands.coord_sync import _write_remote_records

        root = _make_repo(tmp_path)
        recs = [
            self._make_rec("reservation", "res-first"),
            self._make_rec("intent", "intent-second"),
        ]
        fsync_calls = 0
        # Captured before patching so the first call performs a real fsync.
        original_fsync = os.fsync

        def fsync_side_effect(fd: int) -> None:
            nonlocal fsync_calls
            fsync_calls += 1
            if fsync_calls >= 2:
                raise _oserror(errno.ENOSPC)
            original_fsync(fd)

        with patch("os.fsync", side_effect=fsync_side_effect):
            with pytest.raises(OSError):
                _write_remote_records(root, recs)

        first_path = (
            coordination_dir(tmp_path) / "remote" / "reservation" / "res-first.json"
        )
        assert first_path.exists(), "first record was not written before ENOSPC"


# =============================================================================
# 5. STRESS — repeated ENOSPC never silently succeeds
# =============================================================================


class TestStressEnospc:
    """Repeated ENOSPC must always raise — the bug must never be intermittent."""

    def test_100_consecutive_enospc_all_raise(self, tmp_path: pathlib.Path) -> None:
        """100 back-to-back writes under ENOSPC: none may return normally."""
        from muse.core.io import write_text_atomic

        silent_successes = 0
        for i in range(100):
            with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)):
                try:
                    write_text_atomic(tmp_path / f"file-{i}.txt", f"data-{i}")
                except OSError:
                    pass  # expected: the injected failure propagated
                else:
                    silent_successes += 1
        assert silent_successes == 0, (
            f"{silent_successes} writes silently succeeded despite ENOSPC"
        )

    def test_concurrent_threads_all_see_enospc(self, tmp_path: pathlib.Path) -> None:
        """Under concurrent load, every thread sees ENOSPC — not just some.

        Patch os.fsync globally before spawning threads so the mock is in place
        for all of them.  Patching inside each thread is unsafe because `patch`
        modifies a module-level attribute (global state) and concurrent
        `with patch(...)` blocks race with each other.
        """
        from muse.core.io import write_text_atomic

        silent_successes: list[int] = []
        exceptions: list[int] = []
        seen_errnos: set[int | None] = set()
        lock = threading.Lock()

        def worker(idx: int) -> None:
            try:
                write_text_atomic(tmp_path / f"t-{idx}.txt", f"data-{idx}")
            except OSError as exc:
                with lock:
                    exceptions.append(idx)
                    seen_errnos.add(exc.errno)
            else:
                with lock:
                    silent_successes.append(idx)

        with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)):
            threads = [threading.Thread(target=worker, args=(i,)) for i in range(20)]
            for t in threads:
                t.start()
            # Join inside the patch so no worker can run against the real fsync.
            for t in threads:
                t.join()

        assert silent_successes == [], (
            f"Threads {silent_successes} silently succeeded despite ENOSPC"
        )
        assert len(exceptions) == 20
        # Every failure must be the injected ENOSPC, not some unrelated OSError.
        assert seen_errnos == {errno.ENOSPC}, f"unexpected errnos: {seen_errnos}"

    def test_no_orphaned_temp_files_after_100_enospc(self, tmp_path: pathlib.Path) -> None:
        """100 ENOSPC writes must not leave orphaned temp files."""
        from muse.core.io import write_text_atomic

        for i in range(100):
            with patch("os.fsync", side_effect=_oserror(errno.ENOSPC)):
                with pytest.raises(OSError):
                    write_text_atomic(tmp_path / "file.txt", f"data-{i}")
        tmp_files = list(tmp_path.glob(".muse-tmp-*"))
        assert tmp_files == [], f"{len(tmp_files)} orphaned temp files"


# =============================================================================
# 6.
# REGRESSION — EINVAL is still suppressed (virtual filesystem compat)
# =============================================================================


class TestRegressionEinvalSuppressed:
    """Guard against the fix breaking filesystems whose fsync reports EINVAL."""

    def test_write_text_atomic_einval_suppressed(self, tmp_path: pathlib.Path) -> None:
        """With fsync raising EINVAL, the write completes and the content lands."""
        from muse.core.io import write_text_atomic

        destination = tmp_path / "v.txt"
        with patch("os.fsync", side_effect=_oserror(errno.EINVAL)):
            write_text_atomic(destination, "virtual-fs-content")
        assert destination.read_text() == "virtual-fs-content"

    def test_no_fsync_error_still_works(self, tmp_path: pathlib.Path) -> None:
        """With fsync left unpatched, write_text_atomic still writes the file."""
        from muse.core.io import write_text_atomic

        destination = tmp_path / "normal.txt"
        write_text_atomic(destination, "hello world")
        assert destination.read_text() == "hello world"

    def test_docker_tmpfs_compat_einval_suppressed_batch(self, tmp_path: pathlib.Path) -> None:
        """20 writes with EINVAL suppressed — simulates Docker tmpfs environment."""
        from muse.core.io import write_text_atomic

        # Destination path -> expected content, in write order.
        expected = {tmp_path / f"file-{i}.txt": f"content-{i}" for i in range(20)}

        with patch("os.fsync", side_effect=_oserror(errno.EINVAL)):
            for destination, content in expected.items():
                write_text_atomic(destination, content)

        for destination, content in expected.items():
            assert destination.read_text() == content


# =============================================================================
# 7.
# PERFORMANCE — suppressed EINVAL (normal path) is not dramatically slower
# =============================================================================


class TestPerformanceNormalPath:
    """The common, error-free write path must not pick up significant overhead."""

    def test_1000_writes_complete_under_5s(self, tmp_path: pathlib.Path) -> None:
        """Time 1000 atomic text writes with the real (unpatched) fsync.

        NOTE(review): the method name says 5s but the enforced budget is 15s;
        the name is kept so the test id stays stable — confirm which figure
        is intended.
        """
        from muse.core.io import write_text_atomic

        budget_seconds = 15.0
        started = time.monotonic()
        for i in range(1000):
            destination = tmp_path / f"perf-{i:04d}.txt"
            write_text_atomic(destination, f"data-{i}" * 32)
        elapsed = time.monotonic() - started
        assert elapsed < budget_seconds, (
            f"1000 atomic text writes took {elapsed:.3f}s (> 15s)"
        )