""" Tests for the bug: _write_remote_records uses write_text() which truncates the target file to 0 bytes before writing. Any concurrent reader between the truncation and write completion sees either 0 bytes or partial/corrupt content. Root cause (coord_sync.py::_write_remote_records, last line): target.write_text(f"{json.dumps(rec)} ", encoding="utf-8") write_text() opens with mode 'w', which truncates to 0 bytes immediately. The write then fills the file. Between truncation and final close() there is a window where the file is partially written or empty. The fix: use write_text_atomic() which writes to a mkstemp temp file and then os.rename()s into place. rename() is atomic on POSIX — readers always see either the old content or the complete new content, never 0 bytes. write_text_atomic is already imported in coord_sync.py and used throughout the codebase for exactly this purpose. Coverage: Unit — _write_remote_records produces valid files post-write Concurrency — concurrent writers never leave 0-byte or invalid-JSON files Data integrity — written content exactly matches the source record Idempotency — overwriting same record_id produces correct final state Security — path validation still enforced after atomicity fix Performance — atomic write is not dramatically slower than write_text Regression — all existing _write_remote_records security tests still pass """ from __future__ import annotations import json import pathlib import threading import time from unittest.mock import patch import pytest from muse.core.types import MsgpackDict from muse.core.paths import coordination_dir, muse_dir # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _ALL_KINDS = ("reservation", "intent", "release", "heartbeat", "dependency", "task", "claim") _FUTURE_TS = "2099-12-31T23:59:59+00:00" def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: 
muse_dir(tmp_path).mkdir(parents=True, exist_ok=True) return tmp_path def _record(kind: str, record_id: str, payload_size: int = 64) -> MsgpackDict: return { "kind": kind, "record_id": record_id, "run_id": "run-torvalds", "payload": {"data": "k" * payload_size}, "expires_at": _FUTURE_TS, } def _remote_path(root: pathlib.Path, kind: str, record_id: str) -> pathlib.Path: return coordination_dir(root) / "remote" / kind / f"{record_id}.json" def _write_remote(root: pathlib.Path, records: list[dict]) -> None: from muse.cli.commands.coord_sync import _write_remote_records _write_remote_records(root, records) # ============================================================================= # 1. UNIT — basic correctness after write # ============================================================================= class TestWriteRemoteUnit: def test_written_file_is_valid_json(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) rec = _record("reservation", "res-000001") _write_remote(root, [rec]) path = _remote_path(root, "reservation", "res-000001") assert path.exists() data = json.loads(path.read_text()) assert data["kind"] == "reservation" assert data["record_id"] == "res-000001" def test_written_file_content_exactly_matches_record(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) rec = _record("task", "task-abc123", payload_size=256) _write_remote(root, [rec]) path = _remote_path(root, "task", "task-abc123") data = json.loads(path.read_text()) assert data == rec def test_file_is_never_empty_after_write(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) for i in range(20): rec = _record("heartbeat", f"hb-{i:04d}") _write_remote(root, [rec]) path = _remote_path(root, "heartbeat", f"hb-{i:04d}") content = path.read_text() assert content, f"file {path} is empty after write" def test_overwrite_produces_new_content(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) rec_v1 = _record("claim", "claim-001") 
rec_v1["payload"] = {"version": 1} _write_remote(root, [rec_v1]) rec_v2 = _record("claim", "claim-001") rec_v2["payload"] = {"version": 2} _write_remote(root, [rec_v2]) path = _remote_path(root, "claim", "claim-001") data = json.loads(path.read_text()) assert data["payload"]["version"] == 2 def test_all_seven_kinds_written_correctly(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) records = [_record(kind, f"{kind}-001") for kind in _ALL_KINDS] _write_remote(root, records) for kind in _ALL_KINDS: path = _remote_path(root, kind, f"{kind}-001") assert path.exists(), f"{kind} file not written" data = json.loads(path.read_text()) assert data["kind"] == kind # ============================================================================= # 2. CONCURRENCY — concurrent writers must never produce 0-byte or corrupt files # ============================================================================= class TestWriteRemoteConcurrency: """ These tests FAIL before the fix (write_text) and PASS after (write_text_atomic). The bug: write_text opens with 'w' which truncates to 0 bytes before writing. Between truncation and close(), concurrent readers see empty or partial files. """ def test_concurrent_writes_same_record_id_no_zero_byte_reads(self, tmp_path: pathlib.Path) -> None: """ Writer thread: overwrites the same file 500 times. Reader thread: reads the file concurrently, checks for 0-byte or corrupt reads. 
""" root = _make_repo(tmp_path) rec = _record("reservation", "res-concurrent", payload_size=4096) # Prime the file _write_remote(root, [rec]) target = _remote_path(root, "reservation", "res-concurrent") zero_byte_reads: list[int] = [] corrupt_reads: list[str] = [] stop = threading.Event() def writer() -> None: for _ in range(500): _write_remote(root, [rec]) def reader() -> None: iteration = 0 while not stop.is_set(): try: content = target.read_text() if not content.strip(): zero_byte_reads.append(iteration) else: json.loads(content) except json.JSONDecodeError as exc: corrupt_reads.append(str(exc)) except FileNotFoundError: pass # transient — rename may briefly make old name disappear iteration += 1 t_writer = threading.Thread(target=writer) t_reader = threading.Thread(target=reader) t_reader.start() t_writer.start() t_writer.join() stop.set() t_reader.join() assert zero_byte_reads == [], ( f"Saw {len(zero_byte_reads)} zero-byte reads — write_text truncation window exposed.\n" f"First occurrence at reader iteration {zero_byte_reads[0]}.\n" f"Fix: use write_text_atomic() instead of write_text()." ) assert corrupt_reads == [], ( f"Saw {len(corrupt_reads)} corrupt-JSON reads — torn write occurred.\n" f"First error: {corrupt_reads[0]}" ) def test_concurrent_writes_different_record_ids_all_valid(self, tmp_path: pathlib.Path) -> None: """ 8 threads each writing their own record_id 100 times. All files must be valid JSON after all threads complete. 
""" root = _make_repo(tmp_path) n_threads = 8 n_writes = 100 def worker(idx: int) -> None: rec = _record("intent", f"intent-{idx:04d}", payload_size=2048) for _ in range(n_writes): _write_remote(root, [rec]) threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_threads)] for t in threads: t.start() for t in threads: t.join() # All files must be valid JSON for i in range(n_threads): path = _remote_path(root, "intent", f"intent-{i:04d}") assert path.exists(), f"intent-{i:04d}.json missing" content = path.read_text() assert content, f"intent-{i:04d}.json is empty" data = json.loads(content) assert data["record_id"] == f"intent-{i:04d}" def test_concurrent_pull_and_read_no_corruption(self, tmp_path: pathlib.Path) -> None: """ Simulates two concurrent coord sync pull operations writing to the same remote dir. Both pull the same record. Reader checks consistency throughout. """ root = _make_repo(tmp_path) rec = _record("release", "rel-kernel-6-14", payload_size=8192) _write_remote(root, [rec]) # prime target = _remote_path(root, "release", "rel-kernel-6-14") errors: list[str] = [] stop = threading.Event() def puller() -> None: for _ in range(300): _write_remote(root, [rec]) def reader() -> None: while not stop.is_set(): try: content = target.read_text() if not content.strip(): errors.append("zero-byte file read") else: json.loads(content) except json.JSONDecodeError as exc: errors.append(f"corrupt JSON: {exc}") except FileNotFoundError: pass t1 = threading.Thread(target=puller) t2 = threading.Thread(target=puller) t_reader = threading.Thread(target=reader) t_reader.start() t1.start() t2.start() t1.join() t2.join() stop.set() t_reader.join() assert errors == [], ( f"{len(errors)} corruption events during concurrent pull simulation.\n" f"First: {errors[0]}" ) # ============================================================================= # 3. 
# DATA INTEGRITY — written content survives overwrite correctly
# =============================================================================


class TestWriteRemoteDataIntegrity:
    """Checks that what is read back from disk is complete and matches what was written."""

    def test_large_payload_written_completely(self, tmp_path: pathlib.Path) -> None:
        """50KB payload must be written and read back completely."""
        root = _make_repo(tmp_path)
        rec = _record("dependency", "dep-kernel-mm", payload_size=50 * 1024)
        _write_remote(root, [rec])

        path = _remote_path(root, "dependency", "dep-kernel-mm")
        data = json.loads(path.read_text())
        assert len(data["payload"]["data"]) == 50 * 1024

    def test_sequential_overwrites_produce_correct_final_state(self, tmp_path: pathlib.Path) -> None:
        """100 sequential overwrites of the same record_id — final content is correct."""
        root = _make_repo(tmp_path)
        for version in range(100):
            rec = _record("reservation", "res-overwrite")
            rec["payload"]["version"] = version
            _write_remote(root, [rec])

        path = _remote_path(root, "reservation", "res-overwrite")
        data = json.loads(path.read_text())
        assert data["payload"]["version"] == 99

    def test_batch_write_all_files_present(self, tmp_path: pathlib.Path) -> None:
        """Writing 1000 records in one call — all files must be present and valid."""
        root = _make_repo(tmp_path)
        records = [
            _record(_ALL_KINDS[i % len(_ALL_KINDS)], f"rec-{i:06d}")
            for i in range(1000)
        ]
        _write_remote(root, records)

        for i, rec in enumerate(records):
            path = _remote_path(root, rec["kind"], f"rec-{i:06d}")
            assert path.exists(), f"rec-{i:06d} missing"
            data = json.loads(path.read_text())
            assert data["record_id"] == f"rec-{i:06d}"

    def test_file_never_partially_written_single_thread(self, tmp_path: pathlib.Path) -> None:
        """Single-threaded: read immediately after each write must be complete."""
        root = _make_repo(tmp_path)
        for i in range(50):
            # Payload size cycles through 1 KiB .. 10 KiB so successive
            # overwrites both grow and shrink the file.
            rec = _record("task", "task-sequential", payload_size=1024 * (i % 10 + 1))
            _write_remote(root, [rec])
            path = _remote_path(root, "task", "task-sequential")
            content = path.read_text()
            assert content, f"empty file after write {i}"
            data = json.loads(content)
            assert data == rec, f"content mismatch at write {i}"


# =============================================================================
# 4. SECURITY — path validation still enforced (regression)
# =============================================================================


class TestWriteRemoteSecurity:
    """Verify the fix doesn't break existing security validation."""

    def test_unknown_kind_rejected(self, tmp_path: pathlib.Path) -> None:
        """A kind containing ``..`` must not create a file at the traversed location."""
        root = _make_repo(tmp_path)
        rec = {"kind": "../traversal", "record_id": "safe-id", "payload": {}}
        _write_remote(root, [rec])

        malicious_path = coordination_dir(root) / "remote" / ".." / "traversal" / "safe-id.json"
        assert not malicious_path.resolve().exists(), "path traversal via kind succeeded"

    def test_unsafe_record_id_rejected(self, tmp_path: pathlib.Path) -> None:
        """A record_id containing ``..`` segments must not produce any file."""
        root = _make_repo(tmp_path)
        rec = {"kind": "reservation", "record_id": "../../../etc/passwd", "payload": {}}
        _write_remote(root, [rec])

        # Where an unvalidated join of remote/<kind>/<record_id>.json would land.
        # This replaces an always-true "... or True" assertion that never
        # inspected this path.
        malicious = coordination_dir(root) / "remote" / "reservation" / "../../../etc/passwd.json"
        assert not malicious.resolve().exists(), "path traversal via record_id succeeded"

        # Nothing may be written under remote/ for the rejected record either.
        remote_dir = coordination_dir(root) / "remote"
        written = list(remote_dir.rglob("*.json")) if remote_dir.exists() else []
        assert written == [], f"unsafe record_id produced files: {written}"

    def test_null_byte_record_id_rejected(self, tmp_path: pathlib.Path) -> None:
        """A record_id containing a NUL byte must not produce any file under remote/."""
        root = _make_repo(tmp_path)
        rec = {"kind": "reservation", "record_id": "valid\x00malicious", "payload": {}}
        _write_remote(root, [rec])

        remote_dir = coordination_dir(root) / "remote"
        written = list(remote_dir.rglob("*.json")) if remote_dir.exists() else []
        assert written == [], f"null-byte record_id produced files: {written}"

    def test_valid_records_still_written_after_invalid_ones_skipped(self, tmp_path: pathlib.Path) -> None:
        """Invalid records in a batch are skipped without blocking the valid ones."""
        root = _make_repo(tmp_path)
        records = [
            {"kind": "../traversal", "record_id": "id-1", "payload": {}},  # bad kind
            _record("reservation", "res-valid-001"),  # good
            {"kind": "reservation", "record_id": "../bad", "payload": {}},  # bad record_id
            _record("task", "task-valid-001"),  # good
        ]
        _write_remote(root, records)

        assert _remote_path(root, "reservation", "res-valid-001").exists()
        assert _remote_path(root, "task", "task-valid-001").exists()

        # Only 2 valid files
        remote_dir = coordination_dir(root) / "remote"
        written = list(remote_dir.rglob("*.json"))
        assert len(written) == 2


# =============================================================================
# 5. PERFORMANCE — atomic write is not dramatically slower than write_text
# =============================================================================


class TestWriteRemotePerformance:
    """Coarse wall-clock bounds on a single batched write call."""

    def test_500_records_completes_under_3s(self, tmp_path: pathlib.Path) -> None:
        """One call writing 500 records must finish in under 3 seconds."""
        root = _make_repo(tmp_path)
        records = [
            _record(_ALL_KINDS[i % len(_ALL_KINDS)], f"perf-{i:06d}", payload_size=512)
            for i in range(500)
        ]
        t0 = time.monotonic()
        _write_remote(root, records)
        elapsed = time.monotonic() - t0
        assert elapsed < 3.0, f"500 atomic writes took {elapsed:.3f}s (> 3s)"

    def test_atomic_write_throughput_records_per_second(self, tmp_path: pathlib.Path) -> None:
        """One call writing 200 records must sustain at least 50 records per second."""
        root = _make_repo(tmp_path)
        records = [_record("reservation", f"tput-{i:06d}") for i in range(200)]
        t0 = time.monotonic()
        _write_remote(root, records)
        elapsed = time.monotonic() - t0
        # Derive the count from the batch so the rate cannot drift from it;
        # guard against a zero reading from the clock.
        rps = len(records) / elapsed if elapsed > 0 else float("inf")
        assert rps >= 50, f"write_remote throughput {rps:.0f} rec/s is below minimum 50/s"