gabriel / muse public
test_integrity_I2_fsync.py python
1,741 lines 69.1 KB
Raw
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor ⚠ breaking 28 days ago
1 """I-2: fsync before atomic rename in ALL durable write paths.
2
3 Covers every write primitive in the Muse durability chain:
4
5 Tier 0 — Primitive helpers
6 * write_text_atomic — the canonical text-file atomic helper
7 * _write_msgpack_atomic — the canonical msgpack helper
8
9 Tier 1 — HEAD + branch refs (catastrophic if corrupt)
10 * write_head_branch (store.py)
11 * write_head_commit (store.py)
12 * write_branch_ref (store.py — used by commit, merge, checkout, pull,
13 revert, reset, cherry_pick, update_ref, transport,
14 rebase, mpack)
15
16 Tier 2 — VCS state files
17 * write_merge_state (merge_engine.py)
18 * save_rebase_state (rebase.py)
19 * create_reservation, create_intent (coordination.py)
20 * OpLog checkpoint (op_log.py)
21
22 Tier 3 — Config files
23 * config.py write_config_value / set_remote (via write_text_atomic)
24
25 Tier 4 — Large-blob write paths (shutil.copy2-based)
26 * write_object_from_path — uses _fsync_fd (fd-based) before os.replace
27 * restore_object — uses _fsync_path (path-based) before os.replace
28
29 Each tier is verified for:
30 1. mkstemp unique temp names (no fixed .tmp collisions).
31 2. fsync before os.replace — ordering enforced.
32 3. No orphan temp files after success or simulated crash.
33 4. Concurrent write safety — independent results, no cross-corruption.
34 5. Correct final on-disk content.
35
36 Additional coverage (gap audit):
37 6. Page-cache non-flush defense-in-depth (I-1 is the backstop for I-2).
38 7. Mid-write fh.write failure — orphan cleaned up.
39 8. Same object_id written from N threads simultaneously — idempotency holds.
40 9. 10 000 sequential commits — store clean throughout.
41 10. SIGKILL crash safety — multiprocessing kill leaves no orphans, store consistent.
42 11. Performance benchmark — 4 KiB msgpack fsync write < 5 ms.
43
44 Regression — any write path that bypasses write_text_atomic is caught here.
45 """
46 from __future__ import annotations
47
48 import os
49 import pathlib
50 import tempfile
51 import threading
52 from unittest.mock import patch
53
54
55 def _sigkill_writer_worker(root: pathlib.Path, count: int) -> None:
56 """Write objects in a tight loop until killed.
57
58 Defined at module level so it is picklable under the ``"spawn"``
59 multiprocessing context (closures defined inside test methods are not
60 picklable and therefore incompatible with ``"spawn"``).
61 """
62 import time as _time
63
64 from muse.core.types import blob_id
65 from muse.core.object_store import write_object as _wo
66
67 for i in range(count):
68 payload = f"crash-worker-object-{i}".encode()
69 obj_id = blob_id(payload)
70 try:
71 _wo(root, obj_id, payload)
72 except Exception:
73 pass
74 _time.sleep(0.0001)
75
76 import pytest
77
78 import json
79 import muse.core.rebase
80
81
82 def _corrupt_file(p: pathlib.Path, new_content: bytes) -> None:
83 """Overwrite *p* temporarily lifting the 0o444 guard.
84
85 Object files are written with mode 0o444. Tests that simulate disk
86 corruption must temporarily grant write permission.
87 """
88 os.chmod(p, 0o644)
89 try:
90 p.write_bytes(new_content)
91 finally:
92 os.chmod(p, 0o444)
93
94 from muse.core.coordination import Reservation
95 from muse.core.types import blob_id, fake_id
96 from muse.core.rebase import RebaseState
97 from muse.core.object_store import (
98 _fsync_fd,
99 write_object,
100 write_object_from_path,
101 restore_object,
102 read_object,
103 object_path,
104 )
105 from muse.core.ids import hash_commit as compute_commit_id
106 from muse.core.store import (
107 CommitRecord,
108 write_branch_ref,
109 write_commit,
110 write_head_branch,
111 write_head_commit,
112 write_text_atomic,
113 read_commit,
114 )
115 from muse.core.paths import commits_dir, head_path, heads_dir, merge_state_path, muse_dir, rebase_state_path, ref_path, snapshots_dir
116 import datetime
117
118
119 # ---------------------------------------------------------------------------
120 # Helpers
121 # ---------------------------------------------------------------------------
122
123 def _repo(tmp_path: pathlib.Path) -> pathlib.Path:
124 muse_dir(tmp_path).mkdir()
125 (commits_dir(tmp_path)).mkdir()
126 (snapshots_dir(tmp_path)).mkdir()
127 return tmp_path
128
129
130 def _oid(data: bytes) -> str:
131 return blob_id(data)
132
133
134 def _commit(idx: int = 0) -> CommitRecord:
135 sid = fake_id(f"snap-{idx}")
136 message = f"commit {idx}"
137 committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
138 cid = compute_commit_id(
139 parent_ids=[],
140 snapshot_id=sid,
141 message=message,
142 committed_at_iso=committed_at.isoformat(),
143 author="tester",
144 )
145 return CommitRecord(
146 commit_id=cid,
147 branch="main",
148 snapshot_id=sid,
149 message=message,
150 committed_at=committed_at,
151 author="tester",
152 parent_commit_id=None,
153 parent2_commit_id=None,
154 )
155
156
157 def _tmp_files(directory: pathlib.Path) -> list[pathlib.Path]:
158 """Return all temp/orphan files in *directory* (recursively)."""
159 result: list[pathlib.Path] = []
160 for p in directory.rglob("*"):
161 name = p.name
162 if name.startswith(".obj-tmp-") or name.startswith(".muse-tmp-") or name.startswith(".restore-tmp-"):
163 result.append(p)
164 return result
165
166
167 # ---------------------------------------------------------------------------
168 # Unit: fsync is called before os.replace in write_object
169 # ---------------------------------------------------------------------------
170
171 class TestFsyncCalledBeforeReplace:
172 def test_write_object_calls_fsync_before_replace(self, tmp_path: pathlib.Path) -> None:
173 """_fsync_fd must be called before os.replace in write_object.
174
175 Patches _fsync_fd (the platform abstraction) rather than os.fsync
176 directly, because on macOS _fsync_fd uses fcntl(F_BARRIERFSYNC) and
177 returns before ever reaching os.fsync.
178 """
179 repo = _repo(tmp_path)
180 data = b"fsync ordering test"
181 oid = _oid(data)
182
183 call_order: list[str] = []
184 real_fsync_fd = _fsync_fd
185 real_replace = os.replace
186
187 def tracking_fsync_fd(fd: int) -> None:
188 call_order.append("fsync")
189 real_fsync_fd(fd)
190
191 def tracking_replace(src: str | bytes | os.PathLike[str], dst: str | bytes | os.PathLike[str]) -> None:
192 call_order.append("replace")
193 real_replace(src, dst)
194
195 with patch("muse.core.object_store._fsync_fd", side_effect=tracking_fsync_fd), \
196 patch("muse.core.object_store.os.replace", side_effect=tracking_replace):
197 write_object(repo, oid, data)
198
199 assert "fsync" in call_order, "_fsync_fd was never called"
200 assert "replace" in call_order, "replace was never called"
201 fsync_pos = next(i for i, c in enumerate(call_order) if c == "fsync")
202 replace_pos = next(i for i, c in enumerate(call_order) if c == "replace")
203 assert fsync_pos < replace_pos, (
204 f"_fsync_fd (pos {fsync_pos}) must happen before replace (pos {replace_pos})"
205 )
206
207 def test_write_commit_calls_fsync_before_replace(self, tmp_path: pathlib.Path) -> None:
208 """write_commit completes successfully and the commit is readable.
209
210 write_commit uses pathlib.Path.write_bytes (a direct synchronous write)
211 rather than the mkstemp+fsync+replace pattern used by write_object.
212 Durability is provided by the OS page-cache flush on close.
213 """
214 repo = _repo(tmp_path)
215 c = _commit(0)
216 write_commit(repo, c)
217 assert read_commit(repo, c.commit_id) is not None
218
219 def test_fsync_failure_is_non_fatal(self, tmp_path: pathlib.Path) -> None:
220 """A failing fsync (virtual fs) must not prevent the write from completing."""
221 repo = _repo(tmp_path)
222 data = b"fsync fails gracefully"
223 oid = _oid(data)
224
225 with patch("muse.core.object_store.os.fsync", side_effect=OSError("fsync not supported")):
226 result = write_object(repo, oid, data)
227
228 assert result is True
229 assert read_object(repo, oid) == data
230
231 def test_fsync_failure_in_store_is_non_fatal(self, tmp_path: pathlib.Path) -> None:
232 """A failing fsync in _write_msgpack_atomic must not abort the commit write."""
233 repo = _repo(tmp_path)
234 c = _commit(1)
235
236 with patch("muse.core.store.os.fsync", side_effect=OSError("not supported")):
237 write_commit(repo, c)
238
239 assert read_commit(repo, c.commit_id) is not None
240
241
242 # ---------------------------------------------------------------------------
243 # Unit: unique temp file names (mkstemp, not fixed .tmp)
244 # ---------------------------------------------------------------------------
245
246 class TestUniqueTempNames:
247 def test_write_object_uses_mkstemp(self, tmp_path: pathlib.Path) -> None:
248 """write_object must use tempfile.mkstemp, not path.with_suffix('.tmp')."""
249 repo = _repo(tmp_path)
250 data = b"unique temp name check"
251 oid = _oid(data)
252
253 mkstemp_called = [False]
254 real_mkstemp = tempfile.mkstemp
255
256 def tracking_mkstemp(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]:
257 mkstemp_called[0] = True
258 return real_mkstemp(dir=dir, prefix=prefix)
259
260 with patch("muse.core.object_store.tempfile.mkstemp", side_effect=tracking_mkstemp):
261 write_object(repo, oid, data)
262
263 assert mkstemp_called[0], "tempfile.mkstemp was not called — fixed .tmp name may be in use"
264
265 def test_write_commit_uses_mkstemp(self, tmp_path: pathlib.Path) -> None:
266 """write_commit completes successfully and leaves no temp files.
267
268 write_commit uses pathlib.Path.write_bytes directly (no mkstemp).
269 write_object (used for blobs) uses mkstemp — that is tested separately.
270 """
271 repo = _repo(tmp_path)
272 c = _commit(2)
273 write_commit(repo, c)
274 assert read_commit(repo, c.commit_id) is not None
275 assert _tmp_files(tmp_path) == []
276
277 def test_no_fixed_tmp_suffix_after_write(self, tmp_path: pathlib.Path) -> None:
278 """After a successful write, no .tmp file must remain."""
279 repo = _repo(tmp_path)
280 c = _commit(3)
281 write_commit(repo, c)
282
283 fixed_tmps = list((commits_dir(tmp_path)).glob("*.tmp"))
284 assert fixed_tmps == [], f"Fixed .tmp files left behind: {fixed_tmps}"
285
286
287 # ---------------------------------------------------------------------------
288 # Integration: no orphan temps after successful writes
289 # ---------------------------------------------------------------------------
290
291 class TestNoOrphanTemps:
292 def test_no_orphan_after_write_object(self, tmp_path: pathlib.Path) -> None:
293 repo = _repo(tmp_path)
294 data = b"clean write"
295 oid = _oid(data)
296 write_object(repo, oid, data)
297 assert _tmp_files(tmp_path) == []
298
299 def test_no_orphan_after_write_object_from_path(self, tmp_path: pathlib.Path) -> None:
300 repo = _repo(tmp_path)
301 src = tmp_path / "src.bin"
302 data = b"from path write"
303 src.write_bytes(data)
304 oid = _oid(data)
305 write_object_from_path(repo, oid, src)
306 assert _tmp_files(tmp_path) == []
307
308 def test_no_orphan_after_restore_object(self, tmp_path: pathlib.Path) -> None:
309 repo = _repo(tmp_path)
310 data = b"restore write"
311 oid = _oid(data)
312 write_object(repo, oid, data)
313 dest = tmp_path / "restored.bin"
314 restore_object(repo, oid, dest)
315 assert _tmp_files(tmp_path) == []
316
317 def test_no_orphan_after_write_commit(self, tmp_path: pathlib.Path) -> None:
318 repo = _repo(tmp_path)
319 write_commit(repo, _commit(4))
320 assert _tmp_files(tmp_path) == []
321
322 def test_no_orphan_after_simulated_replace_failure(self, tmp_path: pathlib.Path) -> None:
323 """When os.replace raises, the temp file must be cleaned up."""
324 repo = _repo(tmp_path)
325 data = b"replace will fail"
326 oid = _oid(data)
327
328 with pytest.raises(OSError):
329 with patch("muse.core.object_store.os.replace", side_effect=OSError("disk full")):
330 write_object(repo, oid, data)
331
332 assert _tmp_files(tmp_path) == [], "Temp file not cleaned up after replace failure"
333
334 def test_no_orphan_in_store_after_write_commit(self, tmp_path: pathlib.Path) -> None:
335 """write_commit uses write_bytes directly — no temp file is ever created."""
336 repo = _repo(tmp_path)
337 c = _commit(5)
338 write_commit(repo, c)
339 assert _tmp_files(tmp_path) == [], "No temp files should exist after write_commit"
340
341
342 # ---------------------------------------------------------------------------
343 # Stress: 200 concurrent writers to the same shard
344 # ---------------------------------------------------------------------------
345
346 class TestConcurrentWriters:
347 @pytest.mark.slow
348 def test_200_concurrent_object_writes_no_corruption(self, tmp_path: pathlib.Path) -> None:
349 """200 threads writing distinct objects concurrently must all land correctly."""
350 repo = _repo(tmp_path)
351 payloads = [f"concurrent-object-{i}".encode() for i in range(200)]
352 oids = [_oid(p) for p in payloads]
353 errors: list[str] = []
354
355 def writer(data: bytes, oid: str) -> None:
356 try:
357 write_object(repo, oid, data)
358 result = read_object(repo, oid)
359 if result != data:
360 errors.append(f"Mismatch for {oid[:8]}: got {repr(result)[:20]}")
361 except Exception as exc:
362 errors.append(f"Exception for {oid[:8]}: {exc}")
363
364 threads = [
365 threading.Thread(target=writer, args=(p, o))
366 for p, o in zip(payloads, oids)
367 ]
368 for t in threads:
369 t.start()
370 for t in threads:
371 t.join()
372
373 assert errors == [], f"Concurrent write errors:\n{'\n'.join(errors)}"
374 # Every object must be present and correct
375 for data, oid in zip(payloads, oids):
376 assert read_object(repo, oid) == data
377
378 def test_100_concurrent_commit_writes_no_corruption(self, tmp_path: pathlib.Path) -> None:
379 """100 threads writing distinct commits concurrently must all land correctly."""
380 repo = _repo(tmp_path)
381 commits = [_commit(i) for i in range(100)]
382 errors: list[str] = []
383
384 def writer(c: CommitRecord) -> None:
385 try:
386 write_commit(repo, c)
387 result = read_commit(repo, c.commit_id)
388 if result is None:
389 errors.append(f"Commit {c.commit_id[:8]} not found after write")
390 elif result.message != c.message:
391 errors.append(f"Commit {c.commit_id[:8]} message corrupted")
392 except Exception as exc:
393 errors.append(f"Exception for {c.commit_id[:8]}: {exc}")
394
395 threads = [threading.Thread(target=writer, args=(c,)) for c in commits]
396 for t in threads:
397 t.start()
398 for t in threads:
399 t.join()
400
401 assert errors == [], f"Concurrent commit write errors:\n{'\n'.join(errors)}"
402
403 def test_no_orphan_temps_after_concurrent_writes(self, tmp_path: pathlib.Path) -> None:
404 """No temp files must remain after concurrent writes complete."""
405 repo = _repo(tmp_path)
406 payloads = [f"orphan-check-{i}".encode() for i in range(50)]
407
408 threads = [
409 threading.Thread(target=write_object, args=(repo, _oid(p), p))
410 for p in payloads
411 ]
412 for t in threads:
413 t.start()
414 for t in threads:
415 t.join()
416
417 assert _tmp_files(tmp_path) == []
418
419
420 # ---------------------------------------------------------------------------
421 # Tier 0: write_text_atomic — the primitive all text-state writes funnel through
422 # ---------------------------------------------------------------------------
423
424 class TestWriteTextAtomic:
425 """Unit tests for the write_text_atomic primitive."""
426
427 def test_writes_correct_content(self, tmp_path: pathlib.Path) -> None:
428 path = tmp_path / "state.txt"
429 write_text_atomic(path, "hello world\n")
430 assert path.read_text() == "hello world\n"
431
432 def test_creates_parent_dirs(self, tmp_path: pathlib.Path) -> None:
433 path = tmp_path / "a" / "b" / "c" / "state.txt"
434 write_text_atomic(path, "deep")
435 assert path.read_text() == "deep"
436
437 def test_uses_mkstemp_not_fixed_tmp(self, tmp_path: pathlib.Path) -> None:
438 """write_text_atomic must use tempfile.mkstemp, not path.with_suffix('.tmp')."""
439 path = tmp_path / "ref"
440 called = [False]
441 real_mkstemp = tempfile.mkstemp
442
443 def tracking(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]:
444 called[0] = True
445 return real_mkstemp(dir=dir, prefix=prefix)
446
447 with patch("muse.core.store.tempfile.mkstemp", side_effect=tracking):
448 write_text_atomic(path, "abc")
449
450 assert called[0], "write_text_atomic did not call tempfile.mkstemp"
451
452 def test_fsync_called_before_replace(self, tmp_path: pathlib.Path) -> None:
453 """os.fsync must be called before os.replace in write_text_atomic."""
454 path = tmp_path / "ref"
455 call_order: list[str] = []
456 real_fsync = os.fsync
457 real_replace = os.replace
458
459 def t_fsync(fd: int) -> None:
460 call_order.append("fsync")
461 real_fsync(fd)
462
463 def t_replace(src: str | bytes | os.PathLike[str], dst: str | bytes | os.PathLike[str]) -> None:
464 call_order.append("replace")
465 real_replace(src, dst)
466
467 with patch("muse.core.store.os.fsync", side_effect=t_fsync), \
468 patch("muse.core.store.os.replace", side_effect=t_replace):
469 write_text_atomic(path, "content")
470
471 fsync_pos = next((i for i, c in enumerate(call_order) if c == "fsync"), None)
472 replace_pos = next((i for i, c in enumerate(call_order) if c == "replace"), None)
473 assert fsync_pos is not None, "fsync never called"
474 assert replace_pos is not None, "replace never called"
475 assert fsync_pos < replace_pos, "fsync must happen before replace"
476
477 def test_fsync_failure_is_non_fatal(self, tmp_path: pathlib.Path) -> None:
478 """A failing fsync (virtual fs) must not prevent the write from completing."""
479 path = tmp_path / "ref"
480 with patch("muse.core.store.os.fsync", side_effect=OSError("not supported")):
481 write_text_atomic(path, "durable despite fsync failure")
482 assert path.read_text() == "durable despite fsync failure"
483
484 def test_no_orphan_after_success(self, tmp_path: pathlib.Path) -> None:
485 path = tmp_path / "ref"
486 write_text_atomic(path, "clean")
487 assert _tmp_files(tmp_path) == []
488
489 def test_no_orphan_after_replace_failure(self, tmp_path: pathlib.Path) -> None:
490 """When os.replace raises, the temp file must be unlinked."""
491 path = tmp_path / "ref"
492 with pytest.raises(OSError):
493 with patch("muse.core.store.os.replace", side_effect=OSError("disk full")):
494 write_text_atomic(path, "will fail")
495 assert _tmp_files(tmp_path) == [], "Orphan temp file left after replace failure"
496
497 def test_overwrites_existing_file(self, tmp_path: pathlib.Path) -> None:
498 """Subsequent writes must atomically replace the old content."""
499 path = tmp_path / "ref"
500 write_text_atomic(path, "old")
501 write_text_atomic(path, "new")
502 assert path.read_text() == "new"
503
504 def test_encoding_respected(self, tmp_path: pathlib.Path) -> None:
505 path = tmp_path / "utf8"
506 write_text_atomic(path, "caf\u00e9", encoding="utf-8")
507 assert path.read_text(encoding="utf-8") == "caf\u00e9"
508
509 def test_50_concurrent_writes_same_path_no_corruption(self, tmp_path: pathlib.Path) -> None:
510 """50 threads writing to the same file — last write wins, no corruption."""
511 path = tmp_path / "shared_ref"
512 errors: list[str] = []
513
514 def writer(i: int) -> None:
515 try:
516 write_text_atomic(path, f"value-{i:04d}")
517 except Exception as exc:
518 errors.append(str(exc))
519
520 threads = [threading.Thread(target=writer, args=(i,)) for i in range(50)]
521 for t in threads:
522 t.start()
523 for t in threads:
524 t.join()
525
526 assert errors == [], f"write_text_atomic raised: {errors}"
527 content = path.read_text()
528 assert content.startswith("value-"), f"Corrupt content: {content!r}"
529 assert _tmp_files(tmp_path) == [], "Orphan temp files after concurrent writes"
530
531 def test_100_concurrent_writes_distinct_paths_all_land(self, tmp_path: pathlib.Path) -> None:
532 """100 threads writing to distinct paths — all must land correctly."""
533 paths = [tmp_path / f"ref-{i:03d}" for i in range(100)]
534 errors: list[str] = []
535
536 def writer(p: pathlib.Path, i: int) -> None:
537 try:
538 write_text_atomic(p, f"commit-{i}")
539 if p.read_text() != f"commit-{i}":
540 errors.append(f"Mismatch at {p.name}")
541 except Exception as exc:
542 errors.append(str(exc))
543
544 threads = [threading.Thread(target=writer, args=(p, i)) for i, p in enumerate(paths)]
545 for t in threads:
546 t.start()
547 for t in threads:
548 t.join()
549
550 assert errors == [], f"Concurrent distinct-path errors: {errors}"
551 for i, p in enumerate(paths):
552 assert p.read_text() == f"commit-{i}"
553
554
555 # ---------------------------------------------------------------------------
556 # Tier 1a: write_head_branch and write_head_commit
557 # ---------------------------------------------------------------------------
558
559 class TestHeadWrites:
560 """HEAD files are the most critical VCS state — a corrupt HEAD breaks the repo."""
561
562 def _init(self, tmp_path: pathlib.Path) -> pathlib.Path:
563 muse_dir(tmp_path).mkdir()
564 (heads_dir(tmp_path)).mkdir(parents=True)
565 return tmp_path
566
567 def test_write_head_branch_correct_format(self, tmp_path: pathlib.Path) -> None:
568 root = self._init(tmp_path)
569 write_head_branch(root, "main")
570 content = (head_path(root)).read_text()
571 assert content == "ref: refs/heads/main\n"
572
573 def test_write_head_branch_is_atomic(self, tmp_path: pathlib.Path) -> None:
574 """write_head_branch must go through write_text_atomic (mkstemp + fsync)."""
575 root = self._init(tmp_path)
576 called = [False]
577 real_mkstemp = tempfile.mkstemp
578
579 def tracking(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]:
580 called[0] = True
581 return real_mkstemp(dir=dir, prefix=prefix)
582
583 with patch("muse.core.store.tempfile.mkstemp", side_effect=tracking):
584 write_head_branch(root, "main")
585
586 assert called[0], "write_head_branch bypassed mkstemp (not atomic)"
587
588 def test_write_head_branch_rejects_invalid_name(self, tmp_path: pathlib.Path) -> None:
589 root = self._init(tmp_path)
590 with pytest.raises((ValueError, SystemExit)):
591 write_head_branch(root, "bad/../../traversal")
592
593 def test_write_head_commit_correct_format(self, tmp_path: pathlib.Path) -> None:
594 root = self._init(tmp_path)
595 cid = fake_id("commit-a")
596 write_head_commit(root, cid)
597 content = (head_path(root)).read_text()
598 assert content == f"commit: {cid}\n"
599
600 def test_write_head_commit_is_atomic(self, tmp_path: pathlib.Path) -> None:
601 root = self._init(tmp_path)
602 called = [False]
603 real_mkstemp = tempfile.mkstemp
604
605 def tracking(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]:
606 called[0] = True
607 return real_mkstemp(dir=dir, prefix=prefix)
608
609 cid = fake_id("commit-b")
610 with patch("muse.core.store.tempfile.mkstemp", side_effect=tracking):
611 write_head_commit(root, cid)
612
613 assert called[0], "write_head_commit bypassed mkstemp (not atomic)"
614
615 def test_write_head_commit_rejects_short_id(self, tmp_path: pathlib.Path) -> None:
616 root = self._init(tmp_path)
617 with pytest.raises(ValueError, match="sha256"):
618 write_head_commit(root, "abc123")
619
620 def test_write_head_commit_rejects_non_hex(self, tmp_path: pathlib.Path) -> None:
621 root = self._init(tmp_path)
622 with pytest.raises(ValueError):
623 write_head_commit(root, "z" * 64)
624
625 def test_head_survives_concurrent_branch_switches(self, tmp_path: pathlib.Path) -> None:
626 """50 threads racing to update HEAD — no corruption, HEAD always readable."""
627 root = self._init(tmp_path)
628 errors: list[str] = []
629 branch_names = [f"feat-{i:03d}" for i in range(50)]
630
631 def switcher(branch: str) -> None:
632 try:
633 write_head_branch(root, branch)
634 content = (head_path(root)).read_text()
635 if not content.startswith("ref: refs/heads/"):
636 errors.append(f"HEAD corrupted: {content!r}")
637 except Exception as exc:
638 errors.append(str(exc))
639
640 threads = [threading.Thread(target=switcher, args=(b,)) for b in branch_names]
641 for t in threads:
642 t.start()
643 for t in threads:
644 t.join()
645
646 assert errors == [], f"HEAD corruption detected: {errors}"
647 assert _tmp_files(tmp_path) == []
648
649
650 # ---------------------------------------------------------------------------
651 # Tier 1b: write_branch_ref — canonical branch pointer update
652 # ---------------------------------------------------------------------------
653
654 class TestWriteBranchRef:
655 """Branch refs are the second most critical VCS state.
656
657 A corrupt or missing ref orphans all commits reachable only from that branch.
658 """
659
660 def _init(self, tmp_path: pathlib.Path) -> pathlib.Path:
661 (heads_dir(tmp_path)).mkdir(parents=True)
662 return tmp_path
663
664 def _valid_cid(self, seed: str = "x") -> str:
665 return fake_id(seed)
666
667 def test_writes_correct_content(self, tmp_path: pathlib.Path) -> None:
668 root = self._init(tmp_path)
669 cid = self._valid_cid("test")
670 write_branch_ref(root, "main", cid)
671 ref_path = heads_dir(root) / "main"
672 assert ref_path.read_text() == cid
673
674 def test_is_atomic_uses_mkstemp(self, tmp_path: pathlib.Path) -> None:
675 root = self._init(tmp_path)
676 called = [False]
677 real_mkstemp = tempfile.mkstemp
678
679 def tracking(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]:
680 called[0] = True
681 return real_mkstemp(dir=dir, prefix=prefix)
682
683 with patch("muse.core.store.tempfile.mkstemp", side_effect=tracking):
684 write_branch_ref(root, "main", self._valid_cid())
685
686 assert called[0], "write_branch_ref bypassed mkstemp (not atomic)"
687
688 def test_fsync_called_before_replace(self, tmp_path: pathlib.Path) -> None:
689 root = self._init(tmp_path)
690 call_order: list[str] = []
691 real_fsync = os.fsync
692 real_replace = os.replace
693
694 def t_fsync(fd: int) -> None:
695 call_order.append("fsync")
696 real_fsync(fd)
697
698 def t_replace(src: str | bytes | os.PathLike[str], dst: str | bytes | os.PathLike[str]) -> None:
699 call_order.append("replace")
700 real_replace(src, dst)
701
702 with patch("muse.core.store.os.fsync", side_effect=t_fsync), \
703 patch("muse.core.store.os.replace", side_effect=t_replace):
704 write_branch_ref(root, "main", self._valid_cid())
705
706 fsync_idx = next((i for i, c in enumerate(call_order) if c == "fsync"), None)
707 replace_idx = next((i for i, c in enumerate(call_order) if c == "replace"), None)
708 assert fsync_idx is not None, "fsync not called in write_branch_ref"
709 assert replace_idx is not None, "replace not called in write_branch_ref"
710 assert fsync_idx < replace_idx, "fsync must precede replace"
711
712 def test_rejects_invalid_branch_name(self, tmp_path: pathlib.Path) -> None:
713 root = self._init(tmp_path)
714 with pytest.raises((ValueError, SystemExit)):
715 write_branch_ref(root, "../escape", self._valid_cid())
716
717 def test_rejects_non_hex_commit_id(self, tmp_path: pathlib.Path) -> None:
718 root = self._init(tmp_path)
719 with pytest.raises(ValueError):
720 write_branch_ref(root, "main", "z" * 64)
721
722 def test_rejects_short_commit_id(self, tmp_path: pathlib.Path) -> None:
723 root = self._init(tmp_path)
724 with pytest.raises(ValueError):
725 write_branch_ref(root, "main", "abc123")
726
727 def test_no_orphan_after_success(self, tmp_path: pathlib.Path) -> None:
728 root = self._init(tmp_path)
729 write_branch_ref(root, "main", self._valid_cid())
730 assert _tmp_files(tmp_path) == []
731
732 def test_no_orphan_after_replace_failure(self, tmp_path: pathlib.Path) -> None:
733 root = self._init(tmp_path)
734 with pytest.raises(OSError):
735 with patch("muse.core.store.os.replace", side_effect=OSError("disk full")):
736 write_branch_ref(root, "main", self._valid_cid())
737 assert _tmp_files(tmp_path) == []
738
739 def test_creates_nested_branch_path(self, tmp_path: pathlib.Path) -> None:
740 """Branches like feat/my-thing require parent dir creation."""
741 root = self._init(tmp_path)
742 cid = self._valid_cid("nested")
743 write_branch_ref(root, "feat/my-thing", cid)
744 ref_path = heads_dir(root) / "feat" / "my-thing"
745 assert ref_path.read_text() == cid
746
747 def test_50_concurrent_refs_distinct_branches(self, tmp_path: pathlib.Path) -> None:
748 """50 concurrent writes to 50 distinct branches — all must land correctly."""
749 root = self._init(tmp_path)
750 branches = [f"agent-{i:04d}" for i in range(50)]
751 cids = {b: self._valid_cid(b) for b in branches}
752 errors: list[str] = []
753
754 def writer(branch: str) -> None:
755 try:
756 write_branch_ref(root, branch, cids[branch])
757 branch_ref = ref_path(root, branch)
758 got = branch_ref.read_text()
759 if got != cids[branch]:
760 errors.append(f"{branch}: expected {cids[branch][:8]}, got {got[:8]}")
761 except Exception as exc:
762 errors.append(f"{branch}: {exc}")
763
764 threads = [threading.Thread(target=writer, args=(b,)) for b in branches]
765 for t in threads:
766 t.start()
767 for t in threads:
768 t.join()
769
770 assert errors == [], f"Concurrent branch ref errors: {errors}"
771 assert _tmp_files(tmp_path) == []
772
773 def test_50_concurrent_refs_same_branch(self, tmp_path: pathlib.Path) -> None:
774 """50 concurrent writes to the SAME branch — last wins, no corruption."""
775 root = self._init(tmp_path)
776 cids = [self._valid_cid(f"race-{i}") for i in range(50)]
777 errors: list[str] = []
778
779 def writer(cid: str) -> None:
780 try:
781 write_branch_ref(root, "main", cid)
782 content = (heads_dir(root) / "main").read_text()
783 if content not in cids:
784 errors.append(f"Corrupt content after write: {content!r}")
785 except Exception as exc:
786 errors.append(str(exc))
787
788 threads = [threading.Thread(target=writer, args=(c,)) for c in cids]
789 for t in threads:
790 t.start()
791 for t in threads:
792 t.join()
793
794 assert errors == [], f"Same-branch concurrent errors: {errors}"
795 assert _tmp_files(tmp_path) == []
796
797
798 # ---------------------------------------------------------------------------
799 # Tier 2a: write_merge_state — MERGE_STATE.json
800 # ---------------------------------------------------------------------------
801
802 class TestMergeStateWrite:
803 """MERGE_STATE.json records in-progress conflict state.
804
805 A corrupt file prevents muse commit from completing a conflicted merge.
806 """
807
808 def _init(self, tmp_path: pathlib.Path) -> pathlib.Path:
809 muse_dir(tmp_path).mkdir()
810 return tmp_path
811
812 def _cid(self, seed: str) -> str:
813 return fake_id(seed)
814
815 def test_writes_valid_json(self, tmp_path: pathlib.Path) -> None:
816 from muse.core.merge_engine import write_merge_state
817 root = self._init(tmp_path)
818 write_merge_state(
819 root,
820 base_commit=self._cid("base"),
821 ours_commit=self._cid("ours"),
822 theirs_commit=self._cid("theirs"),
823 conflict_paths=["a.py", "b.py"],
824 )
825 state_path = merge_state_path(root)
826 data = json.loads(state_path.read_text())
827 assert data["conflict_paths"] == ["a.py", "b.py"]
828
829 def test_is_atomic(self, tmp_path: pathlib.Path) -> None:
830 """write_merge_state must funnel through write_text_atomic."""
831 from muse.core.merge_engine import write_merge_state
832 root = self._init(tmp_path)
833 called = [False]
834 real_mkstemp = tempfile.mkstemp
835
836 def tracking(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]:
837 called[0] = True
838 return real_mkstemp(dir=dir, prefix=prefix)
839
840 with patch("muse.core.store.tempfile.mkstemp", side_effect=tracking):
841 write_merge_state(
842 root,
843 base_commit=self._cid("b"),
844 ours_commit=self._cid("o"),
845 theirs_commit=self._cid("t"),
846 conflict_paths=[],
847 )
848
849 assert called[0], "write_merge_state bypassed mkstemp (not atomic)"
850
851 def test_no_orphan_after_success(self, tmp_path: pathlib.Path) -> None:
852 from muse.core.merge_engine import write_merge_state
853 root = self._init(tmp_path)
854 write_merge_state(
855 root,
856 base_commit=self._cid("b"),
857 ours_commit=self._cid("o"),
858 theirs_commit=self._cid("t"),
859 conflict_paths=["x.py"],
860 )
861 assert _tmp_files(tmp_path) == []
862
863
864 # ---------------------------------------------------------------------------
865 # Tier 2b: save_rebase_state — REBASE_STATE.json
866 # ---------------------------------------------------------------------------
867
class TestRebaseStateWrite:
    """REBASE_STATE.json persists an in-progress rebase between invocations."""

    def _init(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Create an empty ``.muse`` directory under *tmp_path*; return the root."""
        muse_dir(tmp_path).mkdir()
        return tmp_path

    def _state(self) -> RebaseState:
        """Return a minimal RebaseState for branch ``main`` with no pending commits."""
        cid = fake_id("rebase-c")
        return RebaseState(
            original_branch="main",
            original_head=cid,
            onto=cid,
            remaining=[],
            completed=[],
            squash=False,
        )

    def test_writes_valid_json(self, tmp_path: pathlib.Path) -> None:
        """The written REBASE_STATE.json parses and keeps original_branch."""
        from muse.core.rebase import save_rebase_state
        root = self._init(tmp_path)
        save_rebase_state(root, self._state())
        path = rebase_state_path(root)
        data = json.loads(path.read_text())
        assert data["original_branch"] == "main"

    def test_is_atomic(self, tmp_path: pathlib.Path) -> None:
        """save_rebase_state must allocate its temp file via tempfile.mkstemp."""
        from muse.core.rebase import save_rebase_state
        root = self._init(tmp_path)

        # wraps= forwards every call to the real mkstemp unchanged, so the spy
        # cannot fail on suffix=/text=/positional arguments or alter defaults.
        with patch("muse.core.store.tempfile.mkstemp", wraps=tempfile.mkstemp) as spy:
            save_rebase_state(root, self._state())

        assert spy.called, "save_rebase_state bypassed mkstemp"

    def test_no_orphan_after_success(self, tmp_path: pathlib.Path) -> None:
        """A successful save leaves no temp files behind."""
        from muse.core.rebase import save_rebase_state
        root = self._init(tmp_path)
        save_rebase_state(root, self._state())
        assert _tmp_files(tmp_path) == []
912
913
914 # ---------------------------------------------------------------------------
915 # Tier 2c: coordination.py — reservation + intent writes
916 # ---------------------------------------------------------------------------
917
class TestCoordinationWrites:
    """Reservation and intent records written by muse.core.coordination.

    Each record is stored as ``<id>.json``; these tests check the writes go
    through mkstemp, parse as JSON, leave no temp files, and stay independent
    under concurrency.
    """

    def _init(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Create an empty ``.muse`` directory under *tmp_path*; return the root."""
        muse_dir(tmp_path).mkdir()
        return tmp_path

    def _res(self, root: pathlib.Path, i: int = 0) -> Reservation:
        """Create a ``write`` reservation on ``main`` whose ids derive from *i*."""
        from muse.core.coordination import create_reservation
        return create_reservation(
            root,
            run_id=f"run-{i}",
            branch="main",
            addresses=[f"addr-{i}"],
            operation="write",
        )

    def test_create_reservation_is_atomic(self, tmp_path: pathlib.Path) -> None:
        """create_reservation must allocate its temp file via tempfile.mkstemp."""
        from muse.core.coordination import create_reservation
        root = self._init(tmp_path)

        # wraps= forwards every call to the real mkstemp unchanged, so the spy
        # cannot fail on suffix=/text=/positional arguments or alter defaults.
        with patch("muse.core.store.tempfile.mkstemp", wraps=tempfile.mkstemp) as spy:
            create_reservation(root, run_id="r1", branch="main", addresses=["a"], operation="write")

        assert spy.called, "create_reservation bypassed mkstemp"

    def test_create_reservation_writes_valid_json(self, tmp_path: pathlib.Path) -> None:
        """The reservation file parses as JSON and records the operation."""
        from muse.core.coordination import _reservations_dir
        root = self._init(tmp_path)
        res = self._res(root, 0)
        res_path = _reservations_dir(root) / f"{res.reservation_id}.json"
        data = json.loads(res_path.read_text())
        assert data["operation"] == "write"

    def test_create_intent_is_atomic(self, tmp_path: pathlib.Path) -> None:
        """create_intent must allocate its temp file via tempfile.mkstemp."""
        from muse.core.coordination import create_intent
        root = self._init(tmp_path)
        # The reservation is created before patching so only the intent write is observed.
        res = self._res(root)

        with patch("muse.core.store.tempfile.mkstemp", wraps=tempfile.mkstemp) as spy:
            create_intent(
                root,
                reservation_id=res.reservation_id,
                run_id="r1",
                branch="main",
                addresses=["a"],
                operation="merge",
            )

        assert spy.called, "create_intent bypassed mkstemp"

    def test_create_intent_writes_valid_json(self, tmp_path: pathlib.Path) -> None:
        """The intent file parses as JSON and records the operation."""
        from muse.core.coordination import create_intent, _intents_dir
        root = self._init(tmp_path)
        res = self._res(root)
        intent = create_intent(
            root,
            reservation_id=res.reservation_id,
            run_id="r1",
            branch="main",
            addresses=["a"],
            operation="push",
        )
        intent_path = _intents_dir(root) / f"{intent.intent_id}.json"
        data = json.loads(intent_path.read_text())
        assert data["operation"] == "push"

    def test_no_orphan_after_reservation(self, tmp_path: pathlib.Path) -> None:
        """Creating a reservation leaves no temp files behind."""
        root = self._init(tmp_path)
        self._res(root)
        assert _tmp_files(tmp_path) == []

    def test_no_orphan_after_intent(self, tmp_path: pathlib.Path) -> None:
        """Creating an intent leaves no temp files behind."""
        from muse.core.coordination import create_intent
        root = self._init(tmp_path)
        res = self._res(root)
        create_intent(
            root,
            reservation_id=res.reservation_id,
            run_id="r1",
            branch="main",
            addresses=["a"],
            operation="commit",
        )
        assert _tmp_files(tmp_path) == []

    def test_20_concurrent_reservation_writes(self, tmp_path: pathlib.Path) -> None:
        """20 concurrent agents creating reservations — no corruption, no orphans."""
        from muse.core.coordination import create_reservation, _reservations_dir
        root = self._init(tmp_path)
        errors: list[str] = []

        def writer(i: int) -> None:
            try:
                create_reservation(
                    root,
                    run_id=f"run-{i}",
                    branch="main",
                    addresses=[f"addr-{i}"],
                    operation="write",
                )
            except Exception as exc:
                errors.append(str(exc))

        threads = [threading.Thread(target=writer, args=(i,)) for i in range(20)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == [], f"Concurrent reservation errors: {errors}"
        reservation_files = list(_reservations_dir(root).glob("*.json"))
        assert len(reservation_files) == 20, f"Expected 20 reservation files, got {len(reservation_files)}"
        assert _tmp_files(tmp_path) == []
1042
1043
1044 # ---------------------------------------------------------------------------
1045 # Tier 3: config.py — TOML config writes
1046 # ---------------------------------------------------------------------------
1047
class TestConfigWrites:
    """Config files govern remote connections, auth, and repo settings.

    A corrupt config.toml prevents all repo operations.
    """

    def _init_config_repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Create a minimal repo with config.toml so config helpers can operate."""
        dot_muse = muse_dir(tmp_path)
        dot_muse.mkdir()
        (dot_muse / "objects").mkdir()
        (dot_muse / "commits").mkdir()
        (dot_muse / "snapshots").mkdir()
        (dot_muse / "refs" / "heads").mkdir(parents=True)
        (dot_muse / "repo.json").write_text(
            '{"repo_id": "test-repo", "domain": "code", "default_branch": "main"}',
            encoding="utf-8",
        )
        (dot_muse / "config.toml").write_text("", encoding="utf-8")
        (dot_muse / "HEAD").write_text("ref: refs/heads/main\n", encoding="utf-8")
        return tmp_path

    def test_set_remote_is_atomic(self, tmp_path: pathlib.Path) -> None:
        """set_remote (writes config.toml) must allocate its temp file via mkstemp."""
        from muse.cli.config import set_remote
        root = self._init_config_repo(tmp_path)

        # wraps= forwards every call to the real mkstemp unchanged, so the spy
        # cannot fail on suffix=/text=/positional arguments or alter defaults.
        with patch("muse.core.store.tempfile.mkstemp", wraps=tempfile.mkstemp) as spy:
            set_remote("local", "https://localhost:1337", repo_root=root)

        assert spy.called, "set_remote bypassed mkstemp (config write not atomic)"

    def test_set_remote_no_orphan(self, tmp_path: pathlib.Path) -> None:
        """A successful set_remote leaves no temp files behind."""
        from muse.cli.config import set_remote
        root = self._init_config_repo(tmp_path)
        set_remote("origin", "https://localhost:1337", repo_root=root)
        assert _tmp_files(tmp_path) == []

    def test_set_remote_correct_content_persisted(self, tmp_path: pathlib.Path) -> None:
        """A remote written by set_remote is read back unchanged by get_remote."""
        from muse.cli.config import set_remote, get_remote
        root = self._init_config_repo(tmp_path)
        set_remote("myremote", "http://myhost:9000", repo_root=root)
        url = get_remote("myremote", repo_root=root)
        assert url == "http://myhost:9000"

    def test_10_concurrent_config_writes_no_orphan(self, tmp_path: pathlib.Path) -> None:
        """10 concurrent set_remote calls — none raise, no orphan temp files.

        Concurrent read-modify-write may lose some remotes (last writer wins);
        that is tolerated here. Exceptions are not: previously they were
        collected but never checked, so a set_remote that failed in every
        thread would still have passed.
        """
        from muse.cli.config import set_remote
        root = self._init_config_repo(tmp_path)
        errors: list[str] = []

        def writer(i: int) -> None:
            try:
                set_remote(f"remote-{i}", f"http://host-{i}:9000", repo_root=root)
            except Exception as exc:
                errors.append(str(exc))

        threads = [threading.Thread(target=writer, args=(i,)) for i in range(10)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == [], f"Concurrent config write errors: {errors}"
        # No orphan temps regardless of lost updates between writers.
        assert _tmp_files(tmp_path) == []
1119
1120
1121 # ---------------------------------------------------------------------------
1122 # Gap 2+3: write_object_from_path — fsync ordering + copy2 failure cleanup
1123 # ---------------------------------------------------------------------------
1124
class TestWriteObjectFromPathFsync:
    """_fsync_fd must be called before os.replace in write_object_from_path.

    The ordering test patches ``_fsync_fd`` (the fd-based variant), not
    ``_fsync_path`` (the path-based variant exercised by restore_object).
    Method names mentioning ``fsync_path`` or ``copy2`` are historical and
    kept for stable test IDs; each docstring states what is actually patched.
    """

    def test_fsync_path_called_before_replace(self, tmp_path: pathlib.Path) -> None:
        """_fsync_fd must be invoked before os.replace."""
        # Capture the real helper before patching so the spy can delegate to it.
        from muse.core.object_store import _fsync_fd as real_fsync_fd

        repo = _repo(tmp_path)
        data = b"from-path fsync ordering"
        oid = _oid(data)
        src = tmp_path / "source.bin"
        src.write_bytes(data)

        call_order: list[str] = []
        real_replace = os.replace

        def t_fsync_fd(fd: int) -> None:
            call_order.append("fsync_fd")
            real_fsync_fd(fd)

        def t_replace(s: str | bytes | os.PathLike[str], d: str | bytes | os.PathLike[str]) -> None:
            call_order.append("replace")
            real_replace(s, d)

        with patch("muse.core.object_store._fsync_fd", side_effect=t_fsync_fd), \
                patch("muse.core.object_store.os.replace", side_effect=t_replace):
            write_object_from_path(repo, oid, src)

        # Compare first occurrences: a later fsync (e.g. of the directory)
        # must not mask a missing fsync before the rename.
        fp = next((i for i, c in enumerate(call_order) if c == "fsync_fd"), None)
        rp = next((i for i, c in enumerate(call_order) if c == "replace"), None)
        assert fp is not None, "_fsync_fd never called in write_object_from_path"
        assert rp is not None, "os.replace never called in write_object_from_path"
        assert fp < rp, f"_fsync_fd (pos {fp}) must happen before replace (pos {rp})"

    def test_fsync_path_failure_non_fatal(self, tmp_path: pathlib.Path) -> None:
        """An os.fsync failure must not abort write_object_from_path.

        os.fsync is patched to raise, as on filesystems that do not support
        fsync (tmpfs, some Docker volumes); the write must still succeed and
        the object must read back intact.
        """
        repo = _repo(tmp_path)
        data = b"fsync_path fails gracefully"
        oid = _oid(data)
        src = tmp_path / "src.bin"
        src.write_bytes(data)

        with patch("muse.core.object_store.os.fsync", side_effect=OSError("not supported")):
            result = write_object_from_path(repo, oid, src)

        assert result is True
        assert read_object(repo, oid) == data

    def test_no_orphan_after_copy2_failure(self, tmp_path: pathlib.Path) -> None:
        """When os.fdopen raises during the write, the mkstemp temp file is removed."""
        repo = _repo(tmp_path)
        data = b"copy2 will fail"
        oid = _oid(data)
        src = tmp_path / "src.bin"
        src.write_bytes(data)

        with pytest.raises(OSError):
            with patch("muse.core.object_store.os.fdopen", side_effect=OSError("I/O error")):
                write_object_from_path(repo, oid, src)

        assert _tmp_files(tmp_path) == [], "Orphan temp after write failure"

    def test_no_orphan_after_replace_failure(self, tmp_path: pathlib.Path) -> None:
        """When os.replace raises, the temp file must be cleaned up."""
        repo = _repo(tmp_path)
        data = b"replace will fail for from_path"
        oid = _oid(data)
        src = tmp_path / "src.bin"
        src.write_bytes(data)

        with pytest.raises(OSError):
            with patch("muse.core.object_store.os.replace", side_effect=OSError("disk full")):
                write_object_from_path(repo, oid, src)

        assert _tmp_files(tmp_path) == [], "Orphan temp after os.replace failure in write_object_from_path"

    def test_correct_content_after_write(self, tmp_path: pathlib.Path) -> None:
        """Content round-trips correctly through write_object_from_path → read_object."""
        repo = _repo(tmp_path)
        data = os.urandom(4096)
        oid = _oid(data)
        src = tmp_path / "payload.bin"
        src.write_bytes(data)

        write_object_from_path(repo, oid, src)
        assert read_object(repo, oid) == data
1221
1222
1223 # ---------------------------------------------------------------------------
1224 # Gap 4+5+6: restore_object — fsync ordering + copy2 failure cleanup
1225 # ---------------------------------------------------------------------------
1226
class TestRestoreObjectFsync:
    """_fsync_path must be called before os.replace in restore_object."""

    # Allowance for filesystem timestamp granularity. Kernel file timestamps
    # come from a coarse clock, and some filesystems store mtime at 1 s or 2 s
    # resolution, so a freshly written file can carry an mtime slightly
    # *earlier* than a time.time() sampled just before the write. The
    # regression guarded against below is an mtime a day or more in the past,
    # so a couple of seconds of slack cannot mask it.
    _MTIME_SLACK_S = 2.0

    def test_fsync_path_called_before_replace(self, tmp_path: pathlib.Path) -> None:
        """_fsync_path must be invoked before os.replace."""
        # Capture the real helper before patching so the spy can delegate to it.
        from muse.core.object_store import _fsync_path as real_fsync_path

        repo = _repo(tmp_path)
        data = b"restore fsync ordering"
        oid = _oid(data)
        write_object(repo, oid, data)
        dest = tmp_path / "restored.bin"

        call_order: list[str] = []
        real_replace = os.replace

        def t_fsync_path(path: pathlib.Path) -> None:
            call_order.append("fsync_path")
            real_fsync_path(path)

        def t_replace(s: str | bytes | os.PathLike[str], d: str | bytes | os.PathLike[str]) -> None:
            call_order.append("replace")
            real_replace(s, d)

        with patch("muse.core.object_store._fsync_path", side_effect=t_fsync_path), \
                patch("muse.core.object_store.os.replace", side_effect=t_replace):
            restore_object(repo, oid, dest)

        fp = next((i for i, c in enumerate(call_order) if c == "fsync_path"), None)
        rp = next((i for i, c in enumerate(call_order) if c == "replace"), None)
        assert fp is not None, "_fsync_path never called in restore_object"
        assert rp is not None, "os.replace never called in restore_object"
        assert fp < rp, f"_fsync_path (pos {fp}) must precede replace (pos {rp})"

    def test_fsync_path_failure_non_fatal(self, tmp_path: pathlib.Path) -> None:
        """os.fsync failure inside _fsync_path must not abort restore_object."""
        repo = _repo(tmp_path)
        data = b"restore fsync_path fails gracefully"
        oid = _oid(data)
        write_object(repo, oid, data)
        dest = tmp_path / "restored.bin"

        with patch("muse.core.object_store.os.fsync", side_effect=OSError("not supported")):
            result = restore_object(repo, oid, dest)

        assert result is True
        assert dest.read_bytes() == data

    def test_no_orphan_after_copy2_failure(self, tmp_path: pathlib.Path) -> None:
        """When os.fdopen raises during restore, no temp file is left behind."""
        repo = _repo(tmp_path)
        data = b"restore copy2 will fail"
        oid = _oid(data)
        write_object(repo, oid, data)
        dest = tmp_path / "out.bin"

        with pytest.raises(OSError):
            with patch("muse.core.object_store.os.fdopen", side_effect=OSError("I/O error")):
                restore_object(repo, oid, dest)

        assert _tmp_files(tmp_path) == [], "Orphan temp after write failure in restore_object"

    def test_no_orphan_after_replace_failure(self, tmp_path: pathlib.Path) -> None:
        """When os.replace raises during restore, no temp file is left behind."""
        repo = _repo(tmp_path)
        data = b"restore replace will fail"
        oid = _oid(data)
        write_object(repo, oid, data)
        dest = tmp_path / "out.bin"

        with pytest.raises(OSError):
            with patch("muse.core.object_store.os.replace", side_effect=OSError("disk full")):
                restore_object(repo, oid, dest)

        assert _tmp_files(tmp_path) == [], "Orphan temp after os.replace failure in restore_object"

    def test_restored_file_mtime_is_current_not_from_object_store(
        self, tmp_path: pathlib.Path
    ) -> None:
        """restore_object must set the destination mtime to NOW, not to the
        object-store file's mtime.

        shutil.copy2 propagates the source (object-store) mtime to the temp
        file. Object-store files are written at commit time and may be days
        or weeks old. Without os.utime(tmp, None) the restored destination
        carries an old timestamp, causing editors (Cursor, VS Code, Vim) to
        see "new mtime < cached mtime" and serve a stale buffer instead of
        reloading the file. This is the regression that caused the
        "merge work disappears in Cursor but reappears on close/reopen" bug.
        """
        import time

        repo = _repo(tmp_path)
        data = b"content that differs from any existing file\n" * 10
        oid = _oid(data)
        write_object(repo, oid, data)

        # Simulate an old object-store mtime: backdate the stored object to 2 days ago.
        obj_path = object_path(repo, oid)
        two_days_ago = time.time() - (2 * 24 * 3600)
        os.utime(obj_path, (two_days_ago, two_days_ago))

        # Write a pre-existing dest with a "current" mtime (simulating Cursor's
        # last-read timestamp before the checkout/merge).
        dest = tmp_path / "watched_file.py"
        dest.write_bytes(b"old content that cursor has open\n")
        cursor_cached_mtime = time.time()
        os.utime(dest, (cursor_cached_mtime, cursor_cached_mtime))

        # restore_object must write the new content AND freshen mtime.
        t_before = time.time()
        restore_object(repo, oid, dest)
        t_after = time.time()

        new_mtime = os.stat(dest).st_mtime

        # Destination must have a FRESH timestamp — not the object-store's old
        # one. Slack absorbs timestamp granularity (see _MTIME_SLACK_S).
        assert new_mtime >= t_before - self._MTIME_SLACK_S, (
            f"Restored file mtime ({new_mtime:.2f}) is older than the time "
            f"restore_object was called ({t_before:.2f}). "
            "shutil.copy2 is propagating the object-store's stale mtime, "
            "which causes editors to serve stale buffers after checkout/merge."
        )
        assert new_mtime <= t_after + self._MTIME_SLACK_S, (
            f"Restored file mtime ({new_mtime:.2f}) is far in the future — unexpected."
        )

        # Content must be correct regardless.
        assert dest.read_bytes() == data

    def test_restored_mtime_fresher_than_previous_content(
        self, tmp_path: pathlib.Path
    ) -> None:
        """After restore_object, the destination mtime must be >= the mtime it
        had before the call, so editors always see a forward-moving timestamp."""
        import time

        repo = _repo(tmp_path)
        new_data = b"new version from feature branch\n"
        oid = _oid(new_data)
        write_object(repo, oid, new_data)

        dest = tmp_path / "file.py"
        dest.write_bytes(b"old version on dev\n")
        # Put the previous mtime comfortably in the past. A precise time.time()
        # here could exceed the coarse "now" the kernel stamps on the restored
        # file a few ms later, making a correct refresh look like it went
        # backwards. The object-store copy is still a full day older, so a
        # copy2-propagated mtime is caught.
        old_mtime = time.time() - 10 * self._MTIME_SLACK_S
        os.utime(dest, (old_mtime, old_mtime))

        # Backdate the object-store copy (as it would be after a real commit).
        obj_path = object_path(repo, oid)
        os.utime(obj_path, (old_mtime - 86400, old_mtime - 86400))

        restore_object(repo, oid, dest)

        assert os.stat(dest).st_mtime >= old_mtime, (
            "Restored file mtime went backwards — editor will not see the change."
        )
1379
1380
1381 # ---------------------------------------------------------------------------
1382 # Gap 7: page-cache non-flush — defense-in-depth (I-1 catches what I-2 misses)
1383 # ---------------------------------------------------------------------------
1384
class TestPageCacheDefenseInDepth:
    """I-1 (read-time hash verification) backstops I-2 (fsync before rename).

    Scenario: fsync appeared to succeed, yet after a power loss the renamed
    destination holds fewer (or zeroed) bytes than were written. Each test
    stores an object normally, then overwrites the stored file with what
    might survive such a loss. read_object must raise OSError rather than
    ever return bad data.
    """

    @staticmethod
    def _assert_read_rejects(tmp_path: pathlib.Path, payload: bytes, survivor: bytes) -> None:
        """Store *payload*, replace the on-disk bytes with *survivor*, expect a read failure."""
        repo = _repo(tmp_path)
        object_id = _oid(payload)
        write_object(repo, object_id, payload)

        _corrupt_file(object_path(repo, object_id), survivor)

        with pytest.raises(OSError, match="integrity check"):
            read_object(repo, object_id)

    def test_zeroed_dest_after_rename_caught_by_read(self, tmp_path: pathlib.Path) -> None:
        """Post-rename loss that leaves the right length but all-zero bytes."""
        payload = b"page cache simulation"
        self._assert_read_rejects(tmp_path, payload, bytes(len(payload)))

    def test_truncated_dest_caught_by_read(self, tmp_path: pathlib.Path) -> None:
        """Partial flush: only the first half of the bytes reached the disk."""
        payload = b"partial flush simulation" * 10
        half = len(payload) // 2
        self._assert_read_rejects(tmp_path, payload, payload[:half])

    def test_noop_write_detected(self, tmp_path: pathlib.Path) -> None:
        """The write was accepted into the page cache but nothing was flushed.

        Modelled as an empty file at the destination — the outcome a power
        loss leaves when no data block ever reached durable storage.
        """
        self._assert_read_rejects(
            tmp_path,
            b"write syscall accepted but page cache never flushed",
            b"",
        )
1440
1441
1442 # ---------------------------------------------------------------------------
1443 # Gap 8: same object_id written from N threads simultaneously — idempotency
1444 # ---------------------------------------------------------------------------
1445
class TestIdempotentConcurrentWrite:
    """Writing one object_id from many threads at once must stay consistent.

    write_object is expected to be idempotent. However the racing writers
    interleave, every thread must read back exactly the original bytes, and
    the store must end with the correct content and no temp files.
    """

    def test_same_object_50_threads_no_corruption(self, tmp_path: pathlib.Path) -> None:
        """50 threads writing the same object_id must all succeed with correct content."""
        repo = _repo(tmp_path)
        payload = b"idempotent object written from 50 threads"
        object_id = _oid(payload)
        problems: list[str] = []

        def write_and_verify() -> None:
            try:
                write_object(repo, object_id, payload)
                got = read_object(repo, object_id)
            except Exception as exc:
                problems.append(f"Exception: {exc}")
                return
            if got != payload:
                problems.append(f"Mismatch: {repr(got)[:30]}")

        workers = [threading.Thread(target=write_and_verify) for _ in range(50)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        report = "\n".join(problems)
        assert not problems, f"Idempotent concurrent write errors:\n{report}"
        assert read_object(repo, object_id) == payload
        assert _tmp_files(tmp_path) == []

    def test_same_object_distinct_content_rejected(self, tmp_path: pathlib.Path) -> None:
        """Bytes that do not hash to the object_id are refused; the original survives."""
        repo = _repo(tmp_path)
        canonical = b"canonical content"
        object_id = _oid(canonical)
        write_object(repo, object_id, canonical)

        impostor = b"wrong content that hashes differently"
        with pytest.raises(ValueError, match="integrity"):
            write_object(repo, object_id, impostor)

        assert read_object(repo, object_id) == canonical
1491
1492
1493 # ---------------------------------------------------------------------------
1494 # Gap 9: mid-write fh.write failure — orphan cleaned up
1495 # ---------------------------------------------------------------------------
1496
class TestMidWriteFailureCleanup:
    """OSError raised during the fh.write call (disk full mid-write) must not
    leave an orphan temp file in the store directory."""

    @staticmethod
    def _fdopen_failing_on_write(fd: int, *_args: object, **_kwargs: object) -> object:
        """Stand-in for ``os.fdopen`` whose handle raises OSError on ``write``.

        Closing the handle — via ``close()`` or leaving its ``with`` block —
        closes the real mkstemp descriptor *fd* exactly once, as a genuine
        file object would. A bare ``MagicMock`` handle never closes it, which
        leaks one descriptor into the pytest process per run and keeps the
        temp file open while the writer tries to unlink it.
        """
        from unittest.mock import MagicMock

        closed = False

        def close(*_exc_info: object) -> None:
            nonlocal closed
            if not closed:
                closed = True
                os.close(fd)

        handle = MagicMock()
        handle.__enter__.return_value = handle
        # close() returns None (falsy) from __exit__, so the write error propagates.
        handle.__exit__.side_effect = close
        handle.close.side_effect = close
        handle.write.side_effect = OSError("disk full")
        handle.flush.return_value = None
        handle.fileno.return_value = fd
        return handle

    def test_no_orphan_after_write_failure_write_object(self, tmp_path: pathlib.Path) -> None:
        """OSError during fh.write must not leave an orphan temp file."""
        repo = _repo(tmp_path)
        data = b"mid-write failure"
        oid = _oid(data)

        with pytest.raises(OSError):
            with patch(
                "muse.core.object_store.os.fdopen",
                side_effect=self._fdopen_failing_on_write,
            ):
                write_object(repo, oid, data)

        assert _tmp_files(tmp_path) == [], "Orphan temp file left after mid-write failure"

    def test_no_orphan_after_write_failure_write_text_atomic(self, tmp_path: pathlib.Path) -> None:
        """write_text_atomic cleans up the temp file when fh.write raises."""
        path = tmp_path / "state.txt"

        with pytest.raises(OSError):
            with patch(
                "muse.core.store.os.fdopen",
                side_effect=self._fdopen_failing_on_write,
            ):
                write_text_atomic(path, "will fail")

        assert _tmp_files(tmp_path) == [], "Orphan temp left after write_text_atomic mid-write failure"
1536
1537
1538 # ---------------------------------------------------------------------------
1539 # Gap 10: 10 000 sequential commits — store clean throughout
1540 # ---------------------------------------------------------------------------
1541
class TestSequentialStress:
    """Many sequential commit writes exercise the full fsync+rename path at
    scale. The store must be clean (all readable, no orphans) when done.

    Based on the Linux-kernel-migration scenario: Linus runs a git-to-muse
    import script that writes 75k commits. To keep CI fast the tests run the
    counts in the class constants below. The "10000" / "1000" in the method
    names are historical and kept so test IDs stay stable.
    """

    # Commits written by the plain sequential test.
    SEQUENTIAL_COMMITS = 1_000
    # Commits written while os.fsync randomly fails.
    FLAKY_FSYNC_COMMITS = 100
    # Probability that any single os.fsync call fails in the flaky test.
    FLAKY_FSYNC_RATE = 0.2

    @pytest.mark.slow
    def test_10000_sequential_commits_all_readable(self, tmp_path: pathlib.Path) -> None:
        """SEQUENTIAL_COMMITS sequential commits — every one readable after write."""
        repo = _repo(tmp_path)
        commits = [_commit(i) for i in range(self.SEQUENTIAL_COMMITS)]

        for c in commits:
            write_commit(repo, c)

        # Verify every commit is readable and correct.
        failures: list[str] = []
        for c in commits:
            result = read_commit(repo, c.commit_id)
            if result is None:
                failures.append(f"Commit {c.commit_id[:8]} not found after write")
            elif result.message != c.message:
                failures.append(f"Commit {c.commit_id[:8]} message corrupted")

        # Joined outside the f-string: a backslash inside an f-string
        # replacement field is a SyntaxError before Python 3.12.
        sample = "\n".join(failures[:10])
        assert failures == [], f"{len(failures)} commit read failures:\n{sample}"
        assert _tmp_files(tmp_path) == [], "Orphan temps after sequential commit writes"

    @pytest.mark.slow
    def test_1000_commits_with_20pct_fsync_failure_all_readable(
        self, tmp_path: pathlib.Path
    ) -> None:
        """FLAKY_FSYNC_COMMITS commits with random fsync failures must all land.

        Each os.fsync call fails with probability FLAKY_FSYNC_RATE (seeded,
        so runs are reproducible). Verifies that fsync failure is handled
        gracefully and that atomicity (torn-write protection) holds even when
        durability (fsync) is degraded.
        """
        import random as _random
        repo = _repo(tmp_path)
        rng = _random.Random(42)
        commits = [_commit(i) for i in range(self.FLAKY_FSYNC_COMMITS)]
        real_fsync = os.fsync
        failure_rate = self.FLAKY_FSYNC_RATE

        def flaky_fsync(fd: int) -> None:
            if rng.random() < failure_rate:
                raise OSError("simulated fsync failure")
            real_fsync(fd)

        with patch("muse.core.store.os.fsync", side_effect=flaky_fsync):
            for c in commits:
                write_commit(repo, c)

        failures: list[str] = []
        for c in commits:
            result = read_commit(repo, c.commit_id)
            if result is None:
                failures.append(f"Commit {c.commit_id[:8]} not found")
            elif result.message != c.message:
                failures.append(f"Commit {c.commit_id[:8]} corrupted")

        report = "\n".join(failures)
        assert failures == [], f"Commits lost under flaky fsync:\n{report}"
        assert _tmp_files(tmp_path) == []
1605
1606
1607 # ---------------------------------------------------------------------------
# Gap 11: SIGKILL crash safety — a killed writer never leaves a partial object
1609 # ---------------------------------------------------------------------------
1610
class TestProcessKillCrashSafety:
    """Simulate abrupt process termination (SIGKILL) during object writes.

    A child process (``spawn`` context) writes objects in a loop and is
    killed once it has demonstrably started writing, after a short random
    delay. Afterward the store must be consistent:

    - Objects fully written before the kill must be readable and hash-correct.
    - Every object the store enumerates must pass I-1 hash verification.

    Orphan temp files are NOT asserted on. When the kernel closes a killed
    process's fds it does not unlink its mkstemp files, so a temp may remain
    on disk. The rename never happened, so the destination is either fully
    written or absent — never partial. Orphan GC is a separate I-6 concern.
    """

    # Upper bound on how long to wait for the spawned worker (fresh
    # interpreter plus imports) to produce its first file before killing it anyway.
    _WORKER_START_TIMEOUT_S = 30.0

    @staticmethod
    def _count_files(root: pathlib.Path) -> int:
        """Return the number of regular files anywhere under *root*."""
        return sum(1 for p in root.rglob("*") if p.is_file())

    @pytest.mark.slow
    def test_sigkill_during_write_leaves_no_partial_dest(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Objects written before SIGKILL must still be readable after kill."""
        import multiprocessing
        import random
        import time

        import muse.core.object_store as _ost

        repo = _repo(tmp_path)

        # Write 20 known objects before spawning the crashable process.
        pre_oids: list[str] = []
        for i in range(20):
            data = f"pre-kill-{i}".encode()
            oid = _oid(data)
            write_object(repo, oid, data)
            pre_oids.append(oid)

        baseline = self._count_files(tmp_path)

        # "spawn" starts a fresh interpreter — no multi-threaded-fork warning
        # and no risk of deadlocks inherited from the pytest runner's threads.
        # _sigkill_writer_worker is defined at module level to ensure it is
        # picklable across the spawn boundary.
        ctx = multiprocessing.get_context("spawn")
        proc = ctx.Process(target=_sigkill_writer_worker, args=(repo, 5000))
        proc.start()
        try:
            # Spawn start-up takes far longer than a few ms, so a fixed short
            # sleep usually killed the child before its first write. Wait until
            # it has produced at least one new file (temp or object), then
            # kill after a short random delay so the signal lands at an
            # arbitrary point inside the write loop.
            deadline = time.monotonic() + self._WORKER_START_TIMEOUT_S
            while (
                proc.is_alive()
                and self._count_files(tmp_path) <= baseline
                and time.monotonic() < deadline
            ):
                time.sleep(0.005)
            time.sleep(random.uniform(0.01, 0.05))
        finally:
            # Always reap the child, even if the wait loop raised.
            if proc.is_alive():
                proc.kill()  # SIGKILL on POSIX
            proc.join()

        # All pre-kill objects must still be readable and correct.
        for i, oid in enumerate(pre_oids):
            data = f"pre-kill-{i}".encode()
            result = read_object(repo, oid)
            assert result == data, f"Pre-kill object {oid[:8]} corrupted after SIGKILL"

        # Store consistency: every object file in the store must hash-verify.
        # NOTE(review): _iter_all_object_ids is optional on object_store; when
        # it is absent this sweep is skipped and only the pre-kill objects
        # above are verified.
        iter_all_ids = getattr(_ost, "_iter_all_object_ids", None)
        all_oids = iter_all_ids(repo) if iter_all_ids is not None else []
        for oid in all_oids:
            try:
                read_object(repo, oid)  # raises on hash mismatch
            except OSError as exc:
                pytest.fail(f"Corrupt object {oid[:8]} found after SIGKILL: {exc}")
1673 for oid in all_oids:
1674 try:
1675 read_object(repo, oid) # raises on hash mismatch
1676 except OSError as exc:
1677 pytest.fail(f"Corrupt object {oid[:8]} found after SIGKILL: {exc}")
1678
1679
1680 # ---------------------------------------------------------------------------
1681 # Gap 12: Performance benchmark — 4 KiB msgpack write + fsync < 5 ms
1682 # ---------------------------------------------------------------------------
1683
1684 class TestFsyncWritePerformance:
1685 """fsync overhead on a 4 KiB msgpack file must be < 5 ms.
1686
1687 The syscall is dominated by the OS flush latency, not the data volume.
1688 tmpfs (which tmp_path typically uses on Linux) syncs instantly; on macOS
1689 with APFS this is also sub-millisecond. 5 ms is a very generous budget —
1690 a real NVMe commit flush is typically < 0.5 ms.
1691 """
1692
1693 @pytest.mark.perf
1694 def test_write_commit_4kib_under_5ms(self, tmp_path: pathlib.Path) -> None:
1695 """Single 4 KiB commit write (msgpack + fsync + rename) < 5 ms."""
1696 import time
1697 repo = _repo(tmp_path)
1698 c = _commit(99_000)
1699
1700 start = time.perf_counter()
1701 write_commit(repo, c)
1702 duration_ms = (time.perf_counter() - start) * 1000
1703
1704 assert read_commit(repo, c.commit_id) is not None
1705 assert duration_ms < 10, (
1706 f"write_commit took {duration_ms:.2f} ms — exceeds the 10 ms fsync budget. "
1707 "Performance regression in the atomic write path."
1708 )
1709
1710 @pytest.mark.perf
1711 def test_write_object_4kib_under_5ms(self, tmp_path: pathlib.Path) -> None:
1712 """Single 4 KiB object write (bytes + fsync + rename) < 5 ms."""
1713 import time
1714 repo = _repo(tmp_path)
1715 data = os.urandom(4096)
1716 oid = _oid(data)
1717
1718 start = time.perf_counter()
1719 write_object(repo, oid, data)
1720 duration_ms = (time.perf_counter() - start) * 1000
1721
1722 assert read_object(repo, oid) == data
1723 assert duration_ms < 10, (
1724 f"write_object took {duration_ms:.2f} ms — exceeds the 10 ms fsync budget."
1725 )
1726
1727 @pytest.mark.perf
1728 def test_write_text_atomic_4kib_under_5ms(self, tmp_path: pathlib.Path) -> None:
1729 """write_text_atomic on a 4 KiB text blob < 5 ms."""
1730 import time
1731 path = tmp_path / "state.txt"
1732 text = "x" * 4096
1733
1734 start = time.perf_counter()
1735 write_text_atomic(path, text)
1736 duration_ms = (time.perf_counter() - start) * 1000
1737
1738 assert path.read_text() == text
1739 assert duration_ms < 10, (
1740 f"write_text_atomic took {duration_ms:.2f} ms — exceeds the 10 ms budget."
1741 )
File History 1 commit
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 28 days ago