gabriel / muse public
test_coord_write_remote_atomic.py python
408 lines 16.6 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
1 """
2 Tests for the bug: _write_remote_records uses write_text() which truncates
3 the target file to 0 bytes before writing. Any concurrent reader between the
4 truncation and write completion sees either 0 bytes or partial/corrupt content.
5
6 Root cause (coord_sync.py::_write_remote_records, last line):
7
8 target.write_text(f"{json.dumps(rec)}\n", encoding="utf-8")
10
11 write_text() opens with mode 'w', which truncates to 0 bytes immediately.
12 The write then fills the file. Between truncation and final close() there is
13 a window where the file is partially written or empty.
14
15 The fix: use write_text_atomic() which writes to a mkstemp temp file and then
16 os.rename()s into place. rename() is atomic on POSIX — readers always see
17 either the old content or the complete new content, never 0 bytes.
18
19 write_text_atomic is already imported in coord_sync.py and used throughout
20 the codebase for exactly this purpose.
21
22 Coverage:
23 Unit — _write_remote_records produces valid files post-write
24 Concurrency — concurrent writers never leave 0-byte or invalid-JSON files
25 Data integrity — written content exactly matches the source record
26 Idempotency — overwriting same record_id produces correct final state
27 Security — path validation still enforced after atomicity fix
28 Performance — atomic write is not dramatically slower than write_text
29 Regression — all existing _write_remote_records security tests still pass
30 """
31 from __future__ import annotations
32
33 import json
34 import pathlib
35 import threading
36 import time
37 from unittest.mock import patch
38
39 import pytest
40
41 from muse.core.types import MsgpackDict
42 from muse.core.paths import coordination_dir, muse_dir
43
44 # ---------------------------------------------------------------------------
45 # Helpers
46 # ---------------------------------------------------------------------------
47
48 _ALL_KINDS = ("reservation", "intent", "release", "heartbeat", "dependency", "task", "claim")
49 _FUTURE_TS = "2099-12-31T23:59:59+00:00"
50
51
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create the muse directory for *tmp_path* (if missing) and return *tmp_path*."""
    repo_meta_dir = muse_dir(tmp_path)
    repo_meta_dir.mkdir(parents=True, exist_ok=True)
    return tmp_path
55
56
def _record(kind: str, record_id: str, payload_size: int = 64) -> MsgpackDict:
    """Build a test record dict of the given *kind* and *record_id*.

    The payload holds a single ``data`` string of ``payload_size`` ``"k"``
    characters, which lets callers control the size of the serialized file.
    """
    payload = {"data": "k" * payload_size}
    record = {
        "kind": kind,
        "record_id": record_id,
        "run_id": "run-torvalds",
        "payload": payload,
        "expires_at": _FUTURE_TS,
    }
    return record
65
66
def _remote_path(root: pathlib.Path, kind: str, record_id: str) -> pathlib.Path:
    """Return the path ``<coordination dir>/remote/<kind>/<record_id>.json`` for *root*."""
    remote_root = coordination_dir(root) / "remote"
    file_name = f"{record_id}.json"
    return remote_root / kind / file_name
69
70
def _write_remote(root: pathlib.Path, records: list[dict]) -> None:
    """Pass *records* to ``coord_sync._write_remote_records`` for the repo at *root*."""
    # The function under test is imported at call time rather than at module import.
    from muse.cli.commands.coord_sync import _write_remote_records as write_records

    write_records(root, records)
74
75
76 # =============================================================================
77 # 1. UNIT — basic correctness after write
78 # =============================================================================
79
class TestWriteRemoteUnit:
    """Single-threaded checks on the files left on disk after a write."""

    def test_written_file_is_valid_json(self, tmp_path: pathlib.Path) -> None:
        """A written record exists and parses as JSON with the expected kind and id."""
        repo = _make_repo(tmp_path)
        _write_remote(repo, [_record("reservation", "res-000001")])

        written = _remote_path(repo, "reservation", "res-000001")
        assert written.exists()
        parsed = json.loads(written.read_text())
        assert parsed["kind"] == "reservation"
        assert parsed["record_id"] == "res-000001"

    def test_written_file_content_exactly_matches_record(self, tmp_path: pathlib.Path) -> None:
        """The JSON read back from disk equals the record that was written."""
        repo = _make_repo(tmp_path)
        source = _record("task", "task-abc123", payload_size=256)
        _write_remote(repo, [source])

        raw = _remote_path(repo, "task", "task-abc123").read_text()
        assert json.loads(raw) == source

    def test_file_is_never_empty_after_write(self, tmp_path: pathlib.Path) -> None:
        """Each of 20 freshly written heartbeat files has non-empty content."""
        repo = _make_repo(tmp_path)
        for i in range(20):
            _write_remote(repo, [_record("heartbeat", f"hb-{i:04d}")])
            path = _remote_path(repo, "heartbeat", f"hb-{i:04d}")
            content = path.read_text()
            assert content, f"file {path} is empty after write"

    def test_overwrite_produces_new_content(self, tmp_path: pathlib.Path) -> None:
        """Writing the same record_id twice leaves the second payload on disk."""
        repo = _make_repo(tmp_path)
        for version in (1, 2):
            rec = _record("claim", "claim-001")
            rec["payload"] = {"version": version}
            _write_remote(repo, [rec])

        stored = json.loads(_remote_path(repo, "claim", "claim-001").read_text())
        assert stored["payload"]["version"] == 2

    def test_all_seven_kinds_written_correctly(self, tmp_path: pathlib.Path) -> None:
        """One record per kind in ``_ALL_KINDS`` is written in a single call."""
        repo = _make_repo(tmp_path)
        batch = []
        for kind in _ALL_KINDS:
            batch.append(_record(kind, f"{kind}-001"))
        _write_remote(repo, batch)

        for kind in _ALL_KINDS:
            path = _remote_path(repo, kind, f"{kind}-001")
            assert path.exists(), f"{kind} file not written"
            stored = json.loads(path.read_text())
            assert stored["kind"] == kind
132
133
134 # =============================================================================
135 # 2. CONCURRENCY — concurrent writers must never produce 0-byte or corrupt files
136 # =============================================================================
137
class TestWriteRemoteConcurrency:
    """
    These tests FAIL before the fix (write_text) and PASS after (write_text_atomic).

    The bug: write_text opens with 'w' which truncates to 0 bytes before writing.
    Between truncation and close(), concurrent readers see empty or partial files.

    Every thread target is wrapped with ``_capture`` so that an exception raised
    inside a thread fails the test instead of silently ending that thread.
    """

    @staticmethod
    def _capture(target, failures: list[BaseException]):
        """Return a wrapper around *target* that records any exception in *failures*.

        ``threading.Thread`` does not re-raise a target's exception in the thread
        that calls ``join()``. Without this wrapper a crashed writer or reader
        would leave the test passing without having exercised anything.
        """

        def guarded(*args: object) -> None:
            try:
                target(*args)
            except Exception as exc:
                failures.append(exc)

        return guarded

    def test_concurrent_writes_same_record_id_no_zero_byte_reads(self, tmp_path: pathlib.Path) -> None:
        """
        Writer thread: overwrites the same file 500 times.
        Reader thread: reads the file concurrently, checks for 0-byte or corrupt reads.
        """
        root = _make_repo(tmp_path)
        rec = _record("reservation", "res-concurrent", payload_size=4096)
        # Prime the file so it exists before the reader starts.
        _write_remote(root, [rec])

        target = _remote_path(root, "reservation", "res-concurrent")
        zero_byte_reads: list[int] = []
        corrupt_reads: list[str] = []
        thread_failures: list[BaseException] = []
        stop = threading.Event()

        def writer() -> None:
            for _ in range(500):
                _write_remote(root, [rec])

        def reader() -> None:
            iteration = 0
            while not stop.is_set():
                try:
                    content = target.read_text()
                    if not content.strip():
                        zero_byte_reads.append(iteration)
                    else:
                        json.loads(content)
                except json.JSONDecodeError as exc:
                    corrupt_reads.append(str(exc))
                except FileNotFoundError:
                    pass  # tolerated: the target name may be momentarily absent
                iteration += 1

        t_writer = threading.Thread(target=self._capture(writer, thread_failures))
        t_reader = threading.Thread(target=self._capture(reader, thread_failures))
        t_reader.start()
        try:
            t_writer.start()
            t_writer.join()
        finally:
            # Always release the reader; its loop only exits once ``stop`` is set.
            stop.set()
            t_reader.join()

        assert thread_failures == [], f"worker thread raised: {thread_failures[0]!r}"
        assert zero_byte_reads == [], (
            f"Saw {len(zero_byte_reads)} zero-byte reads — write_text truncation window exposed.\n"
            f"First occurrence at reader iteration {zero_byte_reads[0]}.\n"
            f"Fix: use write_text_atomic() instead of write_text()."
        )
        assert corrupt_reads == [], (
            f"Saw {len(corrupt_reads)} corrupt-JSON reads — torn write occurred.\n"
            f"First error: {corrupt_reads[0]}"
        )

    def test_concurrent_writes_different_record_ids_all_valid(self, tmp_path: pathlib.Path) -> None:
        """
        8 threads each writing their own record_id 100 times.
        All files must be valid JSON after all threads complete.
        """
        root = _make_repo(tmp_path)
        n_threads = 8
        n_writes = 100
        thread_failures: list[BaseException] = []

        def worker(idx: int) -> None:
            rec = _record("intent", f"intent-{idx:04d}", payload_size=2048)
            for _ in range(n_writes):
                _write_remote(root, [rec])

        threads = [
            threading.Thread(target=self._capture(worker, thread_failures), args=(i,))
            for i in range(n_threads)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # A worker that crashed after its first write would still leave a valid
        # file behind, so the final-state checks below cannot detect it.
        assert thread_failures == [], f"worker thread raised: {thread_failures[0]!r}"

        # All files must be valid JSON
        for i in range(n_threads):
            path = _remote_path(root, "intent", f"intent-{i:04d}")
            assert path.exists(), f"intent-{i:04d}.json missing"
            content = path.read_text()
            assert content, f"intent-{i:04d}.json is empty"
            data = json.loads(content)
            assert data["record_id"] == f"intent-{i:04d}"

    def test_concurrent_pull_and_read_no_corruption(self, tmp_path: pathlib.Path) -> None:
        """
        Simulates two concurrent coord sync pull operations writing to the same remote dir.
        Both pull the same record. Reader checks consistency throughout.
        """
        root = _make_repo(tmp_path)
        rec = _record("release", "rel-kernel-6-14", payload_size=8192)
        _write_remote(root, [rec])  # prime

        target = _remote_path(root, "release", "rel-kernel-6-14")
        errors: list[str] = []
        thread_failures: list[BaseException] = []
        stop = threading.Event()

        def puller() -> None:
            for _ in range(300):
                _write_remote(root, [rec])

        def reader() -> None:
            while not stop.is_set():
                try:
                    content = target.read_text()
                    if not content.strip():
                        errors.append("zero-byte file read")
                    else:
                        json.loads(content)
                except json.JSONDecodeError as exc:
                    errors.append(f"corrupt JSON: {exc}")
                except FileNotFoundError:
                    pass

        t1 = threading.Thread(target=self._capture(puller, thread_failures))
        t2 = threading.Thread(target=self._capture(puller, thread_failures))
        t_reader = threading.Thread(target=self._capture(reader, thread_failures))

        t_reader.start()
        try:
            t1.start()
            t2.start()
            t1.join()
            t2.join()
        finally:
            # Always release the reader; its loop only exits once ``stop`` is set.
            stop.set()
            t_reader.join()

        assert thread_failures == [], f"worker thread raised: {thread_failures[0]!r}"
        assert errors == [], (
            f"{len(errors)} corruption events during concurrent pull simulation.\n"
            f"First: {errors[0]}"
        )
273
274
275 # =============================================================================
276 # 3. DATA INTEGRITY — written content survives overwrite correctly
277 # =============================================================================
278
class TestWriteRemoteDataIntegrity:
    """Checks that what is read back from disk is complete and is the latest write."""

    def test_large_payload_written_completely(self, tmp_path: pathlib.Path) -> None:
        """50KB payload must be written and read back completely."""
        repo = _make_repo(tmp_path)
        _write_remote(repo, [_record("dependency", "dep-kernel-mm", payload_size=50 * 1024)])

        stored = json.loads(_remote_path(repo, "dependency", "dep-kernel-mm").read_text())
        assert len(stored["payload"]["data"]) == 50 * 1024

    def test_sequential_overwrites_produce_correct_final_state(self, tmp_path: pathlib.Path) -> None:
        """100 sequential overwrites of the same record_id — final content is correct."""
        repo = _make_repo(tmp_path)
        version = 0
        while version < 100:
            rec = _record("reservation", "res-overwrite")
            rec["payload"]["version"] = version
            _write_remote(repo, [rec])
            version += 1

        stored = json.loads(_remote_path(repo, "reservation", "res-overwrite").read_text())
        assert stored["payload"]["version"] == 99

    def test_batch_write_all_files_present(self, tmp_path: pathlib.Path) -> None:
        """Writing 1000 records in one call — all files must be present and valid."""
        repo = _make_repo(tmp_path)
        batch = []
        for i in range(1000):
            kind = _ALL_KINDS[i % len(_ALL_KINDS)]
            batch.append(_record(kind, f"rec-{i:06d}"))
        _write_remote(repo, batch)

        for i, rec in enumerate(batch):
            path = _remote_path(repo, rec["kind"], f"rec-{i:06d}")
            assert path.exists(), f"rec-{i:06d} missing"
            stored = json.loads(path.read_text())
            assert stored["record_id"] == f"rec-{i:06d}"

    def test_file_never_partially_written_single_thread(self, tmp_path: pathlib.Path) -> None:
        """Single-threaded: read immediately after each write must be complete."""
        repo = _make_repo(tmp_path)
        for i in range(50):
            # Payload size cycles through 1..10 KiB so successive writes differ in length.
            size = 1024 * (i % 10 + 1)
            rec = _record("task", "task-sequential", payload_size=size)
            _write_remote(repo, [rec])
            content = _remote_path(repo, "task", "task-sequential").read_text()
            assert content, f"empty file after write {i}"
            assert json.loads(content) == rec, f"content mismatch at write {i}"
329
330
331 # =============================================================================
332 # 4. SECURITY — path validation still enforced (regression)
333 # =============================================================================
334
class TestWriteRemoteSecurity:
    """Verify the fix doesn't break existing security validation."""

    def test_unknown_kind_rejected(self, tmp_path: pathlib.Path) -> None:
        """A ``kind`` containing ``..`` must not produce a file at the traversal target."""
        root = _make_repo(tmp_path)
        rec = {"kind": "../traversal", "record_id": "safe-id", "payload": {}}
        _write_remote(root, [rec])
        malicious_path = coordination_dir(root) / "remote" / ".." / "traversal" / "safe-id.json"
        assert not malicious_path.resolve().exists(), "path traversal via kind succeeded"

    def test_unsafe_record_id_rejected(self, tmp_path: pathlib.Path) -> None:
        """A ``record_id`` containing ``..`` must write nothing, inside or outside ``remote/``."""
        root = _make_repo(tmp_path)
        rec = {"kind": "reservation", "record_id": "../../../etc/passwd", "payload": {}}
        _write_remote(root, [rec])
        # Where the file would land if the record_id were joined into the path
        # unchecked; resolve() collapses the ".." segments before the check.
        malicious = coordination_dir(root) / "remote" / "reservation" / "../../../etc/passwd.json"
        assert not malicious.resolve().exists(), "path traversal via record_id succeeded"
        # The rejected record must not leave any file inside remote/ either.
        remote_dir = coordination_dir(root) / "remote"
        written = list(remote_dir.rglob("*.json")) if remote_dir.exists() else []
        assert written == [], f"unsafe record_id produced files: {written}"

    def test_null_byte_record_id_rejected(self, tmp_path: pathlib.Path) -> None:
        """A ``record_id`` containing a NUL byte must not produce any file under ``remote/``."""
        root = _make_repo(tmp_path)
        rec = {"kind": "reservation", "record_id": "valid\x00malicious", "payload": {}}
        _write_remote(root, [rec])
        remote_dir = coordination_dir(root) / "remote"
        written = list(remote_dir.rglob("*.json")) if remote_dir.exists() else []
        assert written == [], f"null-byte record_id produced files: {written}"

    def test_valid_records_still_written_after_invalid_ones_skipped(self, tmp_path: pathlib.Path) -> None:
        """Invalid records in a batch are skipped without blocking the valid ones."""
        root = _make_repo(tmp_path)
        records = [
            {"kind": "../traversal", "record_id": "id-1", "payload": {}},  # bad kind
            _record("reservation", "res-valid-001"),  # good
            {"kind": "reservation", "record_id": "../bad", "payload": {}},  # bad record_id
            _record("task", "task-valid-001"),  # good
        ]
        _write_remote(root, records)

        assert _remote_path(root, "reservation", "res-valid-001").exists()
        assert _remote_path(root, "task", "task-valid-001").exists()
        # Only 2 valid files
        remote_dir = coordination_dir(root) / "remote"
        written = list(remote_dir.rglob("*.json"))
        assert len(written) == 2
380
381
382 # =============================================================================
383 # 5. PERFORMANCE — atomic write is not dramatically slower than write_text
384 # =============================================================================
385
class TestWriteRemotePerformance:
    """Coarse wall-clock bounds on ``_write_remote`` for batches of records."""

    def test_500_records_completes_under_3s(self, tmp_path: pathlib.Path) -> None:
        """A single call writing 500 records across all kinds finishes in under 3 seconds."""
        repo = _make_repo(tmp_path)
        batch = []
        for i in range(500):
            kind = _ALL_KINDS[i % len(_ALL_KINDS)]
            batch.append(_record(kind, f"perf-{i:06d}", payload_size=512))

        started = time.monotonic()
        _write_remote(repo, batch)
        elapsed = time.monotonic() - started
        assert elapsed < 3.0, f"500 atomic writes took {elapsed:.3f}s (> 3s)"

    def test_atomic_write_throughput_records_per_second(self, tmp_path: pathlib.Path) -> None:
        """A single call writing 200 reservation records sustains at least 50 records/second."""
        repo = _make_repo(tmp_path)
        batch = [_record("reservation", f"tput-{i:06d}") for i in range(200)]

        started = time.monotonic()
        _write_remote(repo, batch)
        elapsed = time.monotonic() - started

        # Guard against a zero reading from the clock.
        if elapsed > 0:
            rps = 200 / elapsed
        else:
            rps = float("inf")
        assert rps >= 50, f"write_remote throughput {rps:.0f} rec/s is below minimum 50/s"
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago