gabriel / muse public
test_integrity_I2_fsync.py python
1,743 lines 69.1 KB
Raw
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 6 days ago
1 """I-2: fsync before atomic rename in ALL durable write paths.
2
3 Covers every write primitive in the Muse durability chain:
4
5 Tier 0 — Primitive helpers
6 * write_text_atomic — the canonical text-file atomic helper
7 * _write_json_atomic — the canonical JSON atomic helper
8
9 Tier 1 — HEAD + branch refs (catastrophic if corrupt)
10 * write_head_branch (store.py)
11 * write_head_commit (store.py)
12 * write_branch_ref (store.py — used by commit, merge, checkout, pull,
13 revert, reset, cherry_pick, update_ref, transport,
14 rebase, mpack)
15
16 Tier 2 — VCS state files
17 * write_merge_state (merge_engine.py)
18 * save_rebase_state (rebase.py)
19 * create_reservation, create_intent (coordination.py)
20 * OpLog checkpoint (op_log.py)
21
22 Tier 3 — Config files
23 * config.py write_config_value / set_remote (via write_text_atomic)
24
25 Tier 4 — Large-blob write paths (shutil.copy2-based)
26 * write_object_from_path — uses _fsync_fd (fd-based) before os.replace
27 * restore_object — uses _fsync_path (path-based) before os.replace
28
29 Each tier is verified for:
30 1. mkstemp unique temp names (no fixed .tmp collisions).
31 2. fsync before os.replace — ordering enforced.
32 3. No orphan temp files after success or simulated crash.
33 4. Concurrent write safety — independent results, no cross-corruption.
34 5. Correct final on-disk content.
35
36 Additional coverage (gap audit):
37 6. Page-cache non-flush defense-in-depth (I-1 is the backstop for I-2).
38 7. Mid-write fh.write failure — orphan cleaned up.
39 8. Same object_id written from N threads simultaneously — idempotency holds.
40 9. 10 000 sequential commits — store clean throughout.
41 10. SIGKILL crash safety — multiprocessing kill leaves no orphans, store consistent.
42 11. Performance benchmark — 4 KiB JSON fsync write < 5 ms.
43
44 Regression — any write path that bypasses write_text_atomic is caught here.
45 """
46 from __future__ import annotations
47
48 import os
49 import pathlib
50 import tempfile
51 import threading
52 from unittest.mock import patch
53
54
def _sigkill_writer_worker(root: pathlib.Path, count: int) -> None:
    """Write up to *count* distinct blobs into the store at *root*, one by one.

    Meant to run in a child process that the test kills mid-loop, so write
    errors are deliberately ignored: the worker only has to keep the store
    busy, and the parent inspects what is left on disk afterwards.

    It lives at module scope because the ``"spawn"`` multiprocessing context
    must pickle its target, and functions nested inside test methods cannot
    be pickled.
    """
    import time as _time

    from muse.core.types import blob_id
    from muse.core.object_store import write_object as _wo

    index = 0
    while index < count:
        body = f"crash-worker-object-{index}".encode()
        object_id = blob_id(body)
        try:
            _wo(root, object_id, body)
        except Exception:
            # Best-effort on purpose: this process exists to be killed.
            pass
        _time.sleep(0.0001)
        index += 1
75
76 import pytest
77
78 import json
79 import muse.core.rebase
80
81
82 def _corrupt_file(p: pathlib.Path, new_content: bytes) -> None:
83 """Overwrite *p* temporarily lifting the 0o444 guard.
84
85 Object files are written with mode 0o444. Tests that simulate disk
86 corruption must temporarily grant write permission.
87 """
88 os.chmod(p, 0o644)
89 try:
90 p.write_bytes(new_content)
91 finally:
92 os.chmod(p, 0o444)
93
94 from muse.core.coordination import Reservation
95 from muse.core.types import blob_id, fake_id
96 from muse.core.rebase import RebaseState
97 from muse.core.object_store import (
98 _fsync_fd,
99 write_object,
100 write_object_from_path,
101 restore_object,
102 read_object,
103 object_path,
104 )
105 from muse.core.ids import hash_commit as compute_commit_id
106 from muse.core.io import write_text_atomic
107 from muse.core.refs import (
108 write_branch_ref,
109 write_head_branch,
110 write_head_commit,
111 )
112 from muse.core.commits import (
113 CommitRecord,
114 read_commit,
115 write_commit,
116 )
117 from muse.core.paths import commits_dir, head_path, heads_dir, merge_state_path, muse_dir, rebase_state_path, ref_path, snapshots_dir
118 import datetime
119
120
121 # ---------------------------------------------------------------------------
122 # Helpers
123 # ---------------------------------------------------------------------------
124
def _repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Lay out a bare ``.muse`` tree (plus commits/snapshots dirs) under *tmp_path*.

    Directories are created without ``parents=True``, in the order below, so
    this fails loudly if the layout already exists. Returns *tmp_path*.
    """
    for layout_dir in (muse_dir, commits_dir, snapshots_dir):
        layout_dir(tmp_path).mkdir()
    return tmp_path
130
131
def _oid(data: bytes) -> str:
    """Return the content-derived blob id of *data* (a thin alias for ``blob_id``)."""
    object_id = blob_id(data)
    return object_id
134
135
def _commit(idx: int = 0) -> CommitRecord:
    """Build a deterministic parentless ``CommitRecord`` on ``main`` for *idx*.

    The snapshot id and the message both derive from *idx*, and the commit id
    is hashed from them. The same index therefore always reproduces the same
    record. The timestamp is fixed at 2026-01-01 UTC and the author at
    ``"tester"``.
    """
    author = "tester"
    snapshot = fake_id(f"snap-{idx}")
    text = f"commit {idx}"
    when = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    digest = compute_commit_id(
        parent_ids=[],
        snapshot_id=snapshot,
        message=text,
        committed_at_iso=when.isoformat(),
        author=author,
    )
    return CommitRecord(
        commit_id=digest,
        branch="main",
        snapshot_id=snapshot,
        message=text,
        committed_at=when,
        author=author,
        parent_commit_id=None,
        parent2_commit_id=None,
    )
157
158
159 def _tmp_files(directory: pathlib.Path) -> list[pathlib.Path]:
160 """Return all temp/orphan files in *directory* (recursively)."""
161 result: list[pathlib.Path] = []
162 for p in directory.rglob("*"):
163 name = p.name
164 if name.startswith(".obj-tmp-") or name.startswith(".muse-tmp-") or name.startswith(".restore-tmp-"):
165 result.append(p)
166 return result
167
168
169 # ---------------------------------------------------------------------------
170 # Unit: fsync is called before os.replace in write_object
171 # ---------------------------------------------------------------------------
172
class TestFsyncCalledBeforeReplace:
    """fsync ordering and fsync-failure tolerance for object and commit writes."""

    def test_write_object_calls_fsync_before_replace(self, tmp_path: pathlib.Path) -> None:
        """_fsync_fd must be called before os.replace in write_object.

        Patches _fsync_fd (the platform abstraction) rather than os.fsync
        directly, because on macOS _fsync_fd uses fcntl(F_BARRIERFSYNC) and
        returns before ever reaching os.fsync.
        """
        store_root = _repo(tmp_path)
        payload = b"fsync ordering test"
        object_id = _oid(payload)

        events: list[str] = []
        original_fsync_fd = _fsync_fd
        original_replace = os.replace

        def spy_fsync_fd(fd: int) -> None:
            events.append("fsync")
            original_fsync_fd(fd)

        def spy_replace(
            src: str | bytes | os.PathLike[str],
            dst: str | bytes | os.PathLike[str],
        ) -> None:
            events.append("replace")
            original_replace(src, dst)

        with patch("muse.core.object_store._fsync_fd", side_effect=spy_fsync_fd), \
                patch("muse.core.object_store.os.replace", side_effect=spy_replace):
            write_object(store_root, object_id, payload)

        assert "fsync" in events, "_fsync_fd was never called"
        assert "replace" in events, "replace was never called"
        # Both are known to be present, so .index() finds the first of each.
        fsync_pos = events.index("fsync")
        replace_pos = events.index("replace")
        assert fsync_pos < replace_pos, (
            f"_fsync_fd (pos {fsync_pos}) must happen before replace (pos {replace_pos})"
        )

    def test_write_commit_calls_fsync_before_replace(self, tmp_path: pathlib.Path) -> None:
        """write_commit completes and the written commit can be read back.

        Despite its name, this test does not observe fsync/replace ordering.
        write_commit uses pathlib.Path.write_bytes (a direct synchronous
        write) rather than the mkstemp+fsync+replace pattern used by
        write_object.

        NOTE(review): closing a file does not by itself guarantee the data
        reached stable storage. Confirm whether commit writes are meant to
        be durable without an fsync.
        """
        store_root = _repo(tmp_path)
        record = _commit(0)
        write_commit(store_root, record)
        assert read_commit(store_root, record.commit_id) is not None

    def test_fsync_failure_is_non_fatal(self, tmp_path: pathlib.Path) -> None:
        """A failing fsync (virtual fs) must not prevent the write from completing.

        NOTE(review): on macOS _fsync_fd may not reach os.fsync at all (see
        the ordering test above), so this patch may not exercise the failure
        path there. Confirm.
        """
        store_root = _repo(tmp_path)
        payload = b"fsync fails gracefully"
        object_id = _oid(payload)

        broken_fsync = OSError("fsync not supported")
        with patch("muse.core.object_store.os.fsync", side_effect=broken_fsync):
            written = write_object(store_root, object_id, payload)

        assert written is True
        assert read_object(store_root, object_id) == payload

    def test_fsync_failure_in_store_is_non_fatal(self, tmp_path: pathlib.Path) -> None:
        """A failing fsync in the atomic write path must not abort the commit write.

        NOTE(review): if write_commit never calls os.fsync, this patch is
        inert and the test only checks that a plain write succeeds. Confirm.
        """
        store_root = _repo(tmp_path)
        record = _commit(1)

        broken_fsync = OSError("not supported")
        with patch("muse.core.commits.os.fsync", side_effect=broken_fsync):
            write_commit(store_root, record)

        assert read_commit(store_root, record.commit_id) is not None
242
243
244 # ---------------------------------------------------------------------------
245 # Unit: unique temp file names (mkstemp, not fixed .tmp)
246 # ---------------------------------------------------------------------------
247
class TestUniqueTempNames:
    """Temp files must come from tempfile.mkstemp, never a fixed ``.tmp`` name."""

    def test_write_object_uses_mkstemp(self, tmp_path: pathlib.Path) -> None:
        """write_object must use tempfile.mkstemp, not path.with_suffix('.tmp')."""
        repo = _repo(tmp_path)
        data = b"unique temp name check"
        oid = _oid(data)

        # wraps= records the call and forwards every argument (positional,
        # suffix=, text=, ...) to the real mkstemp. A hand-written wrapper
        # with a fixed (dir, prefix) signature would TypeError if the call
        # site ever passed anything else.
        with patch(
            "muse.core.object_store.tempfile.mkstemp", wraps=tempfile.mkstemp
        ) as mkstemp_spy:
            write_object(repo, oid, data)

        assert mkstemp_spy.called, (
            "tempfile.mkstemp was not called — fixed .tmp name may be in use"
        )

    def test_write_commit_uses_mkstemp(self, tmp_path: pathlib.Path) -> None:
        """write_commit completes successfully and leaves no temp files.

        write_commit uses pathlib.Path.write_bytes directly (no mkstemp).
        write_object (used for blobs) uses mkstemp — that is tested separately.
        """
        repo = _repo(tmp_path)
        c = _commit(2)
        write_commit(repo, c)
        assert read_commit(repo, c.commit_id) is not None
        assert _tmp_files(tmp_path) == []

    def test_no_fixed_tmp_suffix_after_write(self, tmp_path: pathlib.Path) -> None:
        """After a successful write, no .tmp file must remain."""
        repo = _repo(tmp_path)
        c = _commit(3)
        write_commit(repo, c)

        fixed_tmps = list(commits_dir(tmp_path).glob("*.tmp"))
        assert fixed_tmps == [], f"Fixed .tmp files left behind: {fixed_tmps}"
287
288
289 # ---------------------------------------------------------------------------
290 # Integration: no orphan temps after successful writes
291 # ---------------------------------------------------------------------------
292
class TestNoOrphanTemps:
    """Each durable write path leaves zero temp files once it has returned."""

    def test_no_orphan_after_write_object(self, tmp_path: pathlib.Path) -> None:
        store_root = _repo(tmp_path)
        payload = b"clean write"
        write_object(store_root, _oid(payload), payload)
        assert _tmp_files(tmp_path) == []

    def test_no_orphan_after_write_object_from_path(self, tmp_path: pathlib.Path) -> None:
        store_root = _repo(tmp_path)
        source_file = tmp_path / "src.bin"
        payload = b"from path write"
        source_file.write_bytes(payload)
        write_object_from_path(store_root, _oid(payload), source_file)
        assert _tmp_files(tmp_path) == []

    def test_no_orphan_after_restore_object(self, tmp_path: pathlib.Path) -> None:
        store_root = _repo(tmp_path)
        payload = b"restore write"
        object_id = _oid(payload)
        write_object(store_root, object_id, payload)
        restore_object(store_root, object_id, tmp_path / "restored.bin")
        assert _tmp_files(tmp_path) == []

    def test_no_orphan_after_write_commit(self, tmp_path: pathlib.Path) -> None:
        store_root = _repo(tmp_path)
        write_commit(store_root, _commit(4))
        assert _tmp_files(tmp_path) == []

    def test_no_orphan_after_simulated_replace_failure(self, tmp_path: pathlib.Path) -> None:
        """When os.replace raises, the temp file must be cleaned up."""
        store_root = _repo(tmp_path)
        payload = b"replace will fail"
        object_id = _oid(payload)

        # pytest.raises is the outer manager, exactly as in a nested form:
        # the patch unwinds first, then the OSError is asserted.
        with pytest.raises(OSError), \
                patch("muse.core.object_store.os.replace", side_effect=OSError("disk full")):
            write_object(store_root, object_id, payload)

        assert _tmp_files(tmp_path) == [], "Temp file not cleaned up after replace failure"

    def test_no_orphan_in_store_after_write_commit(self, tmp_path: pathlib.Path) -> None:
        """write_commit uses write_bytes directly — no temp file is ever created."""
        store_root = _repo(tmp_path)
        record = _commit(5)
        write_commit(store_root, record)
        assert _tmp_files(tmp_path) == [], "No temp files should exist after write_commit"
342
343
344 # ---------------------------------------------------------------------------
345 # Stress: 200 concurrent writers to the same shard
346 # ---------------------------------------------------------------------------
347
class TestConcurrentWriters:
    """Stress: many threads writing objects/commits into one store at once."""

    @staticmethod
    def _start_and_join(threads: list[threading.Thread]) -> None:
        """Start every thread, then block until all of them have finished."""
        for t in threads:
            t.start()
        for t in threads:
            t.join()

    @pytest.mark.slow
    def test_200_concurrent_object_writes_no_corruption(self, tmp_path: pathlib.Path) -> None:
        """200 threads writing distinct objects concurrently must all land correctly."""
        repo = _repo(tmp_path)
        payloads = [f"concurrent-object-{i}".encode() for i in range(200)]
        oids = [_oid(p) for p in payloads]
        errors: list[str] = []

        def writer(data: bytes, oid: str) -> None:
            try:
                write_object(repo, oid, data)
                result = read_object(repo, oid)
                if result != data:
                    errors.append(f"Mismatch for {oid[:8]}: got {repr(result)[:20]}")
            except Exception as exc:
                errors.append(f"Exception for {oid[:8]}: {exc}")

        self._start_and_join(
            [threading.Thread(target=writer, args=(p, o)) for p, o in zip(payloads, oids)]
        )

        # Join outside the f-string: a backslash inside an f-string
        # replacement field is a SyntaxError before Python 3.12 (PEP 701).
        details = "\n".join(errors)
        assert errors == [], f"Concurrent write errors:\n{details}"
        # Every object must be present and correct
        for data, oid in zip(payloads, oids):
            assert read_object(repo, oid) == data

    def test_100_concurrent_commit_writes_no_corruption(self, tmp_path: pathlib.Path) -> None:
        """100 threads writing distinct commits concurrently must all land correctly."""
        repo = _repo(tmp_path)
        commits = [_commit(i) for i in range(100)]
        errors: list[str] = []

        def writer(c: CommitRecord) -> None:
            try:
                write_commit(repo, c)
                result = read_commit(repo, c.commit_id)
                if result is None:
                    errors.append(f"Commit {c.commit_id[:8]} not found after write")
                elif result.message != c.message:
                    errors.append(f"Commit {c.commit_id[:8]} message corrupted")
            except Exception as exc:
                errors.append(f"Exception for {c.commit_id[:8]}: {exc}")

        self._start_and_join([threading.Thread(target=writer, args=(c,)) for c in commits])

        details = "\n".join(errors)
        assert errors == [], f"Concurrent commit write errors:\n{details}"

    def test_no_orphan_temps_after_concurrent_writes(self, tmp_path: pathlib.Path) -> None:
        """No temp files must remain after concurrent writes complete."""
        repo = _repo(tmp_path)
        payloads = [f"orphan-check-{i}".encode() for i in range(50)]
        errors: list[str] = []

        def writer(data: bytes, oid: str) -> None:
            # An exception raised in a Thread target is not re-raised by
            # join(). Record it so a failed write cannot pass unnoticed.
            try:
                write_object(repo, oid, data)
            except Exception as exc:
                errors.append(f"Exception for {oid[:8]}: {exc}")

        self._start_and_join(
            [threading.Thread(target=writer, args=(p, _oid(p))) for p in payloads]
        )

        details = "\n".join(errors)
        assert errors == [], f"Concurrent write errors:\n{details}"
        assert _tmp_files(tmp_path) == []
420
421
422 # ---------------------------------------------------------------------------
423 # Tier 0: write_text_atomic — the primitive all text-state writes funnel through
424 # ---------------------------------------------------------------------------
425
class TestWriteTextAtomic:
    """Unit tests for the write_text_atomic primitive."""

    def test_writes_correct_content(self, tmp_path: pathlib.Path) -> None:
        path = tmp_path / "state.txt"
        write_text_atomic(path, "hello world\n")
        assert path.read_text() == "hello world\n"

    def test_creates_parent_dirs(self, tmp_path: pathlib.Path) -> None:
        path = tmp_path / "a" / "b" / "c" / "state.txt"
        write_text_atomic(path, "deep")
        assert path.read_text() == "deep"

    def test_uses_mkstemp_not_fixed_tmp(self, tmp_path: pathlib.Path) -> None:
        """write_text_atomic must use tempfile.mkstemp, not path.with_suffix('.tmp')."""
        path = tmp_path / "ref"
        # wraps= records the call and forwards all arguments to the real
        # mkstemp, so the spy works whatever signature the call site uses.
        with patch("muse.core.io.tempfile.mkstemp", wraps=tempfile.mkstemp) as mkstemp_spy:
            write_text_atomic(path, "abc")

        assert mkstemp_spy.called, "write_text_atomic did not call tempfile.mkstemp"

    def test_fsync_called_before_replace(self, tmp_path: pathlib.Path) -> None:
        """os.fsync must be called before os.replace in write_text_atomic."""
        path = tmp_path / "ref"
        call_order: list[str] = []
        real_fsync = os.fsync
        real_replace = os.replace

        def t_fsync(fd: int) -> None:
            call_order.append("fsync")
            real_fsync(fd)

        def t_replace(
            src: str | bytes | os.PathLike[str],
            dst: str | bytes | os.PathLike[str],
        ) -> None:
            call_order.append("replace")
            real_replace(src, dst)

        with patch("muse.core.io.os.fsync", side_effect=t_fsync), \
                patch("muse.core.io.os.replace", side_effect=t_replace):
            write_text_atomic(path, "content")

        assert "fsync" in call_order, "fsync never called"
        assert "replace" in call_order, "replace never called"
        # Compare the FIRST fsync with the first replace; a later
        # directory fsync (if any) does not affect the ordering claim.
        assert call_order.index("fsync") < call_order.index("replace"), (
            "fsync must happen before replace"
        )

    def test_fsync_failure_is_non_fatal(self, tmp_path: pathlib.Path) -> None:
        """A failing fsync (virtual fs) must not prevent the write from completing."""
        path = tmp_path / "ref"
        with patch("muse.core.io.os.fsync", side_effect=OSError("not supported")):
            write_text_atomic(path, "durable despite fsync failure")
        assert path.read_text() == "durable despite fsync failure"

    def test_no_orphan_after_success(self, tmp_path: pathlib.Path) -> None:
        path = tmp_path / "ref"
        write_text_atomic(path, "clean")
        assert _tmp_files(tmp_path) == []

    def test_no_orphan_after_replace_failure(self, tmp_path: pathlib.Path) -> None:
        """When os.replace raises, the temp file must be unlinked."""
        path = tmp_path / "ref"
        with pytest.raises(OSError):
            with patch("muse.core.io.os.replace", side_effect=OSError("disk full")):
                write_text_atomic(path, "will fail")
        assert _tmp_files(tmp_path) == [], "Orphan temp file left after replace failure"

    def test_overwrites_existing_file(self, tmp_path: pathlib.Path) -> None:
        """Subsequent writes must atomically replace the old content."""
        path = tmp_path / "ref"
        write_text_atomic(path, "old")
        write_text_atomic(path, "new")
        assert path.read_text() == "new"

    def test_encoding_respected(self, tmp_path: pathlib.Path) -> None:
        path = tmp_path / "utf8"
        write_text_atomic(path, "caf\u00e9", encoding="utf-8")
        assert path.read_text(encoding="utf-8") == "caf\u00e9"

    def test_50_concurrent_writes_same_path_no_corruption(self, tmp_path: pathlib.Path) -> None:
        """50 threads writing to the same file — last write wins, no corruption."""
        path = tmp_path / "shared_ref"
        expected = {f"value-{i:04d}" for i in range(50)}
        errors: list[str] = []

        def writer(i: int) -> None:
            try:
                write_text_atomic(path, f"value-{i:04d}")
            except Exception as exc:
                errors.append(str(exc))

        threads = [threading.Thread(target=writer, args=(i,)) for i in range(50)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == [], f"write_text_atomic raised: {errors}"
        content = path.read_text()
        # Exact membership, not just a "value-" prefix: a torn or interleaved
        # write could keep the prefix intact and still be corrupt.
        assert content in expected, f"Corrupt content: {content!r}"
        assert _tmp_files(tmp_path) == [], "Orphan temp files after concurrent writes"

    def test_100_concurrent_writes_distinct_paths_all_land(self, tmp_path: pathlib.Path) -> None:
        """100 threads writing to distinct paths — all must land correctly."""
        paths = [tmp_path / f"ref-{i:03d}" for i in range(100)]
        errors: list[str] = []

        def writer(p: pathlib.Path, i: int) -> None:
            try:
                write_text_atomic(p, f"commit-{i}")
                if p.read_text() != f"commit-{i}":
                    errors.append(f"Mismatch at {p.name}")
            except Exception as exc:
                errors.append(str(exc))

        threads = [threading.Thread(target=writer, args=(p, i)) for i, p in enumerate(paths)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == [], f"Concurrent distinct-path errors: {errors}"
        for i, p in enumerate(paths):
            assert p.read_text() == f"commit-{i}"
555
556
557 # ---------------------------------------------------------------------------
558 # Tier 1a: write_head_branch and write_head_commit
559 # ---------------------------------------------------------------------------
560
class TestHeadWrites:
    """HEAD files are the most critical VCS state — a corrupt HEAD breaks the repo."""

    def _init(self, tmp_path: pathlib.Path) -> pathlib.Path:
        muse_dir(tmp_path).mkdir()
        heads_dir(tmp_path).mkdir(parents=True)
        return tmp_path

    def test_write_head_branch_correct_format(self, tmp_path: pathlib.Path) -> None:
        root = self._init(tmp_path)
        write_head_branch(root, "main")
        content = head_path(root).read_text()
        assert content == "ref: refs/heads/main\n"

    def test_write_head_branch_is_atomic(self, tmp_path: pathlib.Path) -> None:
        """write_head_branch must go through write_text_atomic (mkstemp + fsync)."""
        root = self._init(tmp_path)
        # wraps= forwards every argument to the real mkstemp while recording
        # the call, so the spy cannot break on suffix=/text=/positional use.
        with patch("muse.core.io.tempfile.mkstemp", wraps=tempfile.mkstemp) as mkstemp_spy:
            write_head_branch(root, "main")

        assert mkstemp_spy.called, "write_head_branch bypassed mkstemp (not atomic)"

    def test_write_head_branch_rejects_invalid_name(self, tmp_path: pathlib.Path) -> None:
        root = self._init(tmp_path)
        with pytest.raises((ValueError, SystemExit)):
            write_head_branch(root, "bad/../../traversal")

    def test_write_head_commit_correct_format(self, tmp_path: pathlib.Path) -> None:
        root = self._init(tmp_path)
        cid = fake_id("commit-a")
        write_head_commit(root, cid)
        content = head_path(root).read_text()
        assert content == f"commit: {cid}\n"

    def test_write_head_commit_is_atomic(self, tmp_path: pathlib.Path) -> None:
        root = self._init(tmp_path)
        cid = fake_id("commit-b")
        with patch("muse.core.io.tempfile.mkstemp", wraps=tempfile.mkstemp) as mkstemp_spy:
            write_head_commit(root, cid)

        assert mkstemp_spy.called, "write_head_commit bypassed mkstemp (not atomic)"

    def test_write_head_commit_rejects_short_id(self, tmp_path: pathlib.Path) -> None:
        root = self._init(tmp_path)
        with pytest.raises(ValueError, match="sha256"):
            write_head_commit(root, "abc123")

    def test_write_head_commit_rejects_non_hex(self, tmp_path: pathlib.Path) -> None:
        root = self._init(tmp_path)
        with pytest.raises(ValueError):
            write_head_commit(root, "z" * 64)

    def test_head_survives_concurrent_branch_switches(self, tmp_path: pathlib.Path) -> None:
        """50 threads racing to update HEAD — no corruption, HEAD always readable."""
        root = self._init(tmp_path)
        errors: list[str] = []
        branch_names = [f"feat-{i:03d}" for i in range(50)]
        # Every legal HEAD body this race can produce (format pinned by
        # test_write_head_branch_correct_format). Exact membership catches
        # torn writes that a prefix check would miss.
        expected = {f"ref: refs/heads/{b}\n" for b in branch_names}

        def switcher(branch: str) -> None:
            try:
                write_head_branch(root, branch)
                content = head_path(root).read_text()
                if content not in expected:
                    errors.append(f"HEAD corrupted: {content!r}")
            except Exception as exc:
                errors.append(str(exc))

        threads = [threading.Thread(target=switcher, args=(b,)) for b in branch_names]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == [], f"HEAD corruption detected: {errors}"
        final = head_path(root).read_text()
        assert final in expected, f"HEAD corrupted after race: {final!r}"
        assert _tmp_files(tmp_path) == []
650
651
652 # ---------------------------------------------------------------------------
653 # Tier 1b: write_branch_ref — canonical branch pointer update
654 # ---------------------------------------------------------------------------
655
class TestWriteBranchRef:
    """Branch refs are the second most critical VCS state.

    A corrupt or missing ref orphans all commits reachable only from that branch.
    """

    def _init(self, tmp_path: pathlib.Path) -> pathlib.Path:
        heads_dir(tmp_path).mkdir(parents=True)
        return tmp_path

    def _valid_cid(self, seed: str = "x") -> str:
        return fake_id(seed)

    def test_writes_correct_content(self, tmp_path: pathlib.Path) -> None:
        root = self._init(tmp_path)
        cid = self._valid_cid("test")
        write_branch_ref(root, "main", cid)
        # Named branch_file (not ref_path) so it does not shadow the
        # ref_path() helper imported from muse.core.paths.
        branch_file = heads_dir(root) / "main"
        assert branch_file.read_text() == cid

    def test_is_atomic_uses_mkstemp(self, tmp_path: pathlib.Path) -> None:
        root = self._init(tmp_path)
        # wraps= records the call and forwards all arguments unchanged to
        # the real mkstemp, whatever signature the call site uses.
        with patch("muse.core.io.tempfile.mkstemp", wraps=tempfile.mkstemp) as mkstemp_spy:
            write_branch_ref(root, "main", self._valid_cid())

        assert mkstemp_spy.called, "write_branch_ref bypassed mkstemp (not atomic)"

    def test_fsync_called_before_replace(self, tmp_path: pathlib.Path) -> None:
        root = self._init(tmp_path)
        call_order: list[str] = []
        real_fsync = os.fsync
        real_replace = os.replace

        def t_fsync(fd: int) -> None:
            call_order.append("fsync")
            real_fsync(fd)

        def t_replace(
            src: str | bytes | os.PathLike[str],
            dst: str | bytes | os.PathLike[str],
        ) -> None:
            call_order.append("replace")
            real_replace(src, dst)

        with patch("muse.core.io.os.fsync", side_effect=t_fsync), \
                patch("muse.core.io.os.replace", side_effect=t_replace):
            write_branch_ref(root, "main", self._valid_cid())

        assert "fsync" in call_order, "fsync not called in write_branch_ref"
        assert "replace" in call_order, "replace not called in write_branch_ref"
        assert call_order.index("fsync") < call_order.index("replace"), (
            "fsync must precede replace"
        )

    def test_rejects_invalid_branch_name(self, tmp_path: pathlib.Path) -> None:
        root = self._init(tmp_path)
        with pytest.raises((ValueError, SystemExit)):
            write_branch_ref(root, "../escape", self._valid_cid())

    def test_rejects_non_hex_commit_id(self, tmp_path: pathlib.Path) -> None:
        root = self._init(tmp_path)
        with pytest.raises(ValueError):
            write_branch_ref(root, "main", "z" * 64)

    def test_rejects_short_commit_id(self, tmp_path: pathlib.Path) -> None:
        root = self._init(tmp_path)
        with pytest.raises(ValueError):
            write_branch_ref(root, "main", "abc123")

    def test_no_orphan_after_success(self, tmp_path: pathlib.Path) -> None:
        root = self._init(tmp_path)
        write_branch_ref(root, "main", self._valid_cid())
        assert _tmp_files(tmp_path) == []

    def test_no_orphan_after_replace_failure(self, tmp_path: pathlib.Path) -> None:
        root = self._init(tmp_path)
        with pytest.raises(OSError):
            with patch("muse.core.io.os.replace", side_effect=OSError("disk full")):
                write_branch_ref(root, "main", self._valid_cid())
        assert _tmp_files(tmp_path) == []

    def test_creates_nested_branch_path(self, tmp_path: pathlib.Path) -> None:
        """Branches like feat/my-thing require parent dir creation."""
        root = self._init(tmp_path)
        cid = self._valid_cid("nested")
        write_branch_ref(root, "feat/my-thing", cid)
        branch_file = heads_dir(root) / "feat" / "my-thing"
        assert branch_file.read_text() == cid

    def test_50_concurrent_refs_distinct_branches(self, tmp_path: pathlib.Path) -> None:
        """50 concurrent writes to 50 distinct branches — all must land correctly."""
        root = self._init(tmp_path)
        branches = [f"agent-{i:04d}" for i in range(50)]
        cids = {b: self._valid_cid(b) for b in branches}
        errors: list[str] = []

        def writer(branch: str) -> None:
            try:
                write_branch_ref(root, branch, cids[branch])
                got = ref_path(root, branch).read_text()
                if got != cids[branch]:
                    errors.append(f"{branch}: expected {cids[branch][:8]}, got {got[:8]}")
            except Exception as exc:
                errors.append(f"{branch}: {exc}")

        threads = [threading.Thread(target=writer, args=(b,)) for b in branches]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == [], f"Concurrent branch ref errors: {errors}"
        assert _tmp_files(tmp_path) == []

    def test_50_concurrent_refs_same_branch(self, tmp_path: pathlib.Path) -> None:
        """50 concurrent writes to the SAME branch — last wins, no corruption."""
        root = self._init(tmp_path)
        cids = [self._valid_cid(f"race-{i}") for i in range(50)]
        valid = set(cids)  # O(1) membership for every post-write check
        errors: list[str] = []

        def writer(cid: str) -> None:
            try:
                write_branch_ref(root, "main", cid)
                content = (heads_dir(root) / "main").read_text()
                if content not in valid:
                    errors.append(f"Corrupt content after write: {content!r}")
            except Exception as exc:
                errors.append(str(exc))

        threads = [threading.Thread(target=writer, args=(c,)) for c in cids]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == [], f"Same-branch concurrent errors: {errors}"
        final = (heads_dir(root) / "main").read_text()
        assert final in valid, f"Corrupt content after race: {final!r}"
        assert _tmp_files(tmp_path) == []
798
799
800 # ---------------------------------------------------------------------------
801 # Tier 2a: write_merge_state — MERGE_STATE.json
802 # ---------------------------------------------------------------------------
803
class TestMergeStateWrite:
    """MERGE_STATE.json records in-progress conflict state.

    A corrupt file prevents muse commit from completing a conflicted merge.
    """

    def _init(self, tmp_path: pathlib.Path) -> pathlib.Path:
        muse_dir(tmp_path).mkdir()
        return tmp_path

    def _cid(self, seed: str) -> str:
        return fake_id(seed)

    def test_writes_valid_json(self, tmp_path: pathlib.Path) -> None:
        from muse.core.merge_engine import write_merge_state
        root = self._init(tmp_path)
        write_merge_state(
            root,
            base_commit=self._cid("base"),
            ours_commit=self._cid("ours"),
            theirs_commit=self._cid("theirs"),
            conflict_paths=["a.py", "b.py"],
        )
        state_path = merge_state_path(root)
        data = json.loads(state_path.read_text())
        assert data["conflict_paths"] == ["a.py", "b.py"]

    def test_is_atomic(self, tmp_path: pathlib.Path) -> None:
        """write_merge_state must funnel through write_text_atomic."""
        from muse.core.merge_engine import write_merge_state
        root = self._init(tmp_path)

        # wraps= records the call and forwards every argument to the real
        # mkstemp, so the spy cannot break on suffix=/text=/positional use.
        with patch("muse.core.io.tempfile.mkstemp", wraps=tempfile.mkstemp) as mkstemp_spy:
            write_merge_state(
                root,
                base_commit=self._cid("b"),
                ours_commit=self._cid("o"),
                theirs_commit=self._cid("t"),
                conflict_paths=[],
            )

        assert mkstemp_spy.called, "write_merge_state bypassed mkstemp (not atomic)"

    def test_no_orphan_after_success(self, tmp_path: pathlib.Path) -> None:
        from muse.core.merge_engine import write_merge_state
        root = self._init(tmp_path)
        write_merge_state(
            root,
            base_commit=self._cid("b"),
            ours_commit=self._cid("o"),
            theirs_commit=self._cid("t"),
            conflict_paths=["x.py"],
        )
        assert _tmp_files(tmp_path) == []
864
865
866 # ---------------------------------------------------------------------------
867 # Tier 2b: save_rebase_state — REBASE_STATE.json
868 # ---------------------------------------------------------------------------
869
class TestRebaseStateWrite:
    """Durability checks for REBASE_STATE.json written by save_rebase_state."""

    def _init(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Create the .muse directory under *tmp_path* and return the repo root."""
        muse_dir(tmp_path).mkdir()
        return tmp_path

    def _state(self) -> RebaseState:
        """Build a minimal RebaseState on branch ``main`` with empty work lists."""
        commit_id = fake_id("rebase-c")
        return RebaseState(
            original_branch="main",
            original_head=commit_id,
            onto=commit_id,
            remaining=[],
            completed=[],
            squash=False,
        )

    def test_writes_valid_json(self, tmp_path: pathlib.Path) -> None:
        from muse.core.rebase import save_rebase_state

        repo_root = self._init(tmp_path)
        save_rebase_state(repo_root, self._state())
        payload = json.loads(rebase_state_path(repo_root).read_text())
        assert payload["original_branch"] == "main"

    def test_is_atomic(self, tmp_path: pathlib.Path) -> None:
        from muse.core.rebase import save_rebase_state

        repo_root = self._init(tmp_path)
        used_mkstemp = False
        original_mkstemp = tempfile.mkstemp

        # Spy that records the call and then defers to the real mkstemp.
        def spy(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]:
            nonlocal used_mkstemp
            used_mkstemp = True
            return original_mkstemp(dir=dir, prefix=prefix)

        with patch("muse.core.io.tempfile.mkstemp", side_effect=spy):
            save_rebase_state(repo_root, self._state())

        assert used_mkstemp, "save_rebase_state bypassed mkstemp"

    def test_no_orphan_after_success(self, tmp_path: pathlib.Path) -> None:
        from muse.core.rebase import save_rebase_state

        repo_root = self._init(tmp_path)
        save_rebase_state(repo_root, self._state())
        leftovers = _tmp_files(tmp_path)
        assert leftovers == []
914
915
916 # ---------------------------------------------------------------------------
917 # Tier 2c: coordination.py — reservation + intent writes
918 # ---------------------------------------------------------------------------
919
class TestCoordinationWrites:
    """Atomicity, content and orphan checks for reservation and intent files."""

    def _init(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Create the .muse directory under *tmp_path* and return the repo root."""
        muse_dir(tmp_path).mkdir()
        return tmp_path

    def _res(self, root: pathlib.Path, i: int = 0) -> Reservation:
        """Create reservation *i* (run ``run-i``, address ``addr-i``) on ``main``."""
        from muse.core.coordination import create_reservation

        return create_reservation(
            root,
            run_id=f"run-{i}",
            branch="main",
            addresses=[f"addr-{i}"],
            operation="write",
        )

    def test_create_reservation_is_atomic(self, tmp_path: pathlib.Path) -> None:
        from muse.core.coordination import create_reservation

        repo_root = self._init(tmp_path)
        used_mkstemp = False
        original_mkstemp = tempfile.mkstemp

        # Spy that records the call and then defers to the real mkstemp.
        def spy(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]:
            nonlocal used_mkstemp
            used_mkstemp = True
            return original_mkstemp(dir=dir, prefix=prefix)

        with patch("muse.core.io.tempfile.mkstemp", side_effect=spy):
            create_reservation(
                repo_root, run_id="r1", branch="main", addresses=["a"], operation="write"
            )

        assert used_mkstemp, "create_reservation bypassed mkstemp"

    def test_create_reservation_writes_valid_json(self, tmp_path: pathlib.Path) -> None:
        from muse.core.coordination import _reservations_dir

        repo_root = self._init(tmp_path)
        reservation = self._res(repo_root, 0)
        record_file = _reservations_dir(repo_root) / f"{reservation.reservation_id}.json"
        record = json.loads(record_file.read_text())
        assert record["operation"] == "write"

    def test_create_intent_is_atomic(self, tmp_path: pathlib.Path) -> None:
        from muse.core.coordination import create_intent

        repo_root = self._init(tmp_path)
        # Reserve first, outside the patch, so only create_intent is observed.
        reservation = self._res(repo_root)
        used_mkstemp = False
        original_mkstemp = tempfile.mkstemp

        def spy(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]:
            nonlocal used_mkstemp
            used_mkstemp = True
            return original_mkstemp(dir=dir, prefix=prefix)

        with patch("muse.core.io.tempfile.mkstemp", side_effect=spy):
            create_intent(
                repo_root,
                reservation_id=reservation.reservation_id,
                run_id="r1",
                branch="main",
                addresses=["a"],
                operation="merge",
            )

        assert used_mkstemp, "create_intent bypassed mkstemp"

    def test_create_intent_writes_valid_json(self, tmp_path: pathlib.Path) -> None:
        from muse.core.coordination import create_intent, _intents_dir

        repo_root = self._init(tmp_path)
        reservation = self._res(repo_root)
        intent = create_intent(
            repo_root,
            reservation_id=reservation.reservation_id,
            run_id="r1",
            branch="main",
            addresses=["a"],
            operation="push",
        )
        record_file = _intents_dir(repo_root) / f"{intent.intent_id}.json"
        record = json.loads(record_file.read_text())
        assert record["operation"] == "push"

    def test_no_orphan_after_reservation(self, tmp_path: pathlib.Path) -> None:
        repo_root = self._init(tmp_path)
        self._res(repo_root)
        leftovers = _tmp_files(tmp_path)
        assert leftovers == []

    def test_no_orphan_after_intent(self, tmp_path: pathlib.Path) -> None:
        from muse.core.coordination import create_intent

        repo_root = self._init(tmp_path)
        reservation = self._res(repo_root)
        create_intent(
            repo_root,
            reservation_id=reservation.reservation_id,
            run_id="r1",
            branch="main",
            addresses=["a"],
            operation="commit",
        )
        leftovers = _tmp_files(tmp_path)
        assert leftovers == []

    def test_20_concurrent_reservation_writes(self, tmp_path: pathlib.Path) -> None:
        """20 concurrent agents creating reservations — no corruption, no orphans."""
        from muse.core.coordination import create_reservation, _reservations_dir

        repo_root = self._init(tmp_path)
        failures: list[str] = []

        def make_reservation(idx: int) -> None:
            try:
                create_reservation(
                    repo_root,
                    run_id=f"run-{idx}",
                    branch="main",
                    addresses=[f"addr-{idx}"],
                    operation="write",
                )
            except Exception as exc:
                failures.append(str(exc))

        workers = [threading.Thread(target=make_reservation, args=(idx,)) for idx in range(20)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        assert failures == [], f"Concurrent reservation errors: {failures}"
        written = list(_reservations_dir(repo_root).glob("*.json"))
        assert len(written) == 20, f"Expected 20 reservation files, got {len(written)}"
        assert _tmp_files(tmp_path) == []
1044
1045
1046 # ---------------------------------------------------------------------------
1047 # Tier 3: config.py — TOML config writes
1048 # ---------------------------------------------------------------------------
1049
class TestConfigWrites:
    """Config files govern remote connections, auth, and repo settings.

    A corrupt config.toml prevents all repo operations.
    """

    def _init_config_repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Create a minimal repo with config.toml so config helpers can operate."""
        dot_muse = muse_dir(tmp_path)
        dot_muse.mkdir()
        (dot_muse / "objects").mkdir()
        (dot_muse / "commits").mkdir()
        (dot_muse / "snapshots").mkdir()
        (dot_muse / "refs" / "heads").mkdir(parents=True)
        (dot_muse / "repo.json").write_text(
            '{"repo_id": "test-repo", "domain": "code", "default_branch": "main"}',
            encoding="utf-8",
        )
        (dot_muse / "config.toml").write_text("", encoding="utf-8")
        (dot_muse / "HEAD").write_text("ref: refs/heads/main\n", encoding="utf-8")
        return tmp_path

    def test_set_remote_is_atomic(self, tmp_path: pathlib.Path) -> None:
        """set_remote (writes config.toml) must funnel through write_text_atomic."""
        from muse.cli.config import set_remote
        root = self._init_config_repo(tmp_path)
        called = [False]
        real_mkstemp = tempfile.mkstemp

        def tracking(dir: pathlib.Path | None = None, prefix: str = "") -> tuple[int, str]:
            called[0] = True
            return real_mkstemp(dir=dir, prefix=prefix)

        with patch("muse.core.io.tempfile.mkstemp", side_effect=tracking):
            set_remote("local", "https://localhost:1337", repo_root=root)

        assert called[0], "set_remote bypassed mkstemp (config write not atomic)"

    def test_set_remote_no_orphan(self, tmp_path: pathlib.Path) -> None:
        from muse.cli.config import set_remote
        root = self._init_config_repo(tmp_path)
        set_remote("origin", "https://localhost:1337", repo_root=root)
        assert _tmp_files(tmp_path) == []

    def test_set_remote_correct_content_persisted(self, tmp_path: pathlib.Path) -> None:
        from muse.cli.config import set_remote, get_remote
        root = self._init_config_repo(tmp_path)
        set_remote("myremote", "http://myhost:9000", repo_root=root)
        url = get_remote("myremote", repo_root=root)
        assert url == "http://myhost:9000"

    def test_10_concurrent_config_writes_no_orphan(self, tmp_path: pathlib.Path) -> None:
        """10 concurrent set_remote calls — no exceptions and no orphan temp files.

        Concurrent read-modify-write of config.toml may lose updates (last
        writer wins), so the set of persisted remotes is not asserted.
        However, no call may raise. With atomic replacement, a reader never
        sees a torn file, so a parse error here would indicate a durability
        bug.
        """
        from muse.cli.config import set_remote
        root = self._init_config_repo(tmp_path)
        errors: list[str] = []

        def writer(i: int) -> None:
            try:
                set_remote(f"remote-{i}", f"http://host-{i}:9000", repo_root=root)
            except Exception as exc:
                errors.append(str(exc))

        threads = [threading.Thread(target=writer, args=(i,)) for i in range(10)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Previously `errors` was collected but never checked, so worker
        # exceptions were silently swallowed.
        assert errors == [], f"Concurrent set_remote errors: {errors}"
        # No orphan temps regardless of lost-update races between writers.
        assert _tmp_files(tmp_path) == []
1121
1122
1123 # ---------------------------------------------------------------------------
1124 # Gap 2+3: write_object_from_path — fsync ordering + copy2 failure cleanup
1125 # ---------------------------------------------------------------------------
1126
class TestWriteObjectFromPathFsync:
    """_fsync_fd must be called before os.replace in write_object_from_path.

    write_object_from_path stages the source bytes in a mkstemp temp file.
    It then uses that temp file's fd for fchmod + _fsync_fd before the atomic
    rename. The ordering test patches _fsync_fd (the fd-based variant), NOT
    _fsync_path, which is the path-based variant used by restore_object.
    Some method names below still say ``fsync_path`` for historical reasons.
    """

    def test_fsync_path_called_before_replace(self, tmp_path: pathlib.Path) -> None:
        """_fsync_fd must be invoked before os.replace."""
        import muse.core.object_store as object_store

        repo = _repo(tmp_path)
        data = b"from-path fsync ordering"
        oid = _oid(data)
        src = tmp_path / "source.bin"
        src.write_bytes(data)

        call_order: list[str] = []
        # Capture the real implementations before patching so the wrappers
        # still perform the underlying work.
        real_fsync_fd = object_store._fsync_fd
        real_replace = os.replace

        def t_fsync_fd(fd: int) -> None:
            call_order.append("fsync_fd")
            real_fsync_fd(fd)

        def t_replace(s: str | bytes | os.PathLike[str], d: str | bytes | os.PathLike[str]) -> None:
            call_order.append("replace")
            real_replace(s, d)

        with patch("muse.core.object_store._fsync_fd", side_effect=t_fsync_fd), \
             patch("muse.core.object_store.os.replace", side_effect=t_replace):
            write_object_from_path(repo, oid, src)

        fp = next((i for i, c in enumerate(call_order) if c == "fsync_fd"), None)
        rp = next((i for i, c in enumerate(call_order) if c == "replace"), None)
        assert fp is not None, "_fsync_fd never called in write_object_from_path"
        assert rp is not None, "os.replace never called in write_object_from_path"
        assert fp < rp, f"_fsync_fd (pos {fp}) must happen before replace (pos {rp})"

    def test_fsync_path_failure_non_fatal(self, tmp_path: pathlib.Path) -> None:
        """os.fsync failure inside _fsync_fd must not abort write_object_from_path.

        _fsync_fd swallows OSError internally. We patch os.fsync so the
        function's own try/except absorbs the failure, exactly as it would on
        a filesystem that does not support fsync (tmpfs, some Docker volumes).
        """
        repo = _repo(tmp_path)
        data = b"fsync_path fails gracefully"
        oid = _oid(data)
        src = tmp_path / "src.bin"
        src.write_bytes(data)

        with patch("muse.core.object_store.os.fsync", side_effect=OSError("not supported")):
            result = write_object_from_path(repo, oid, src)

        assert result is True
        assert read_object(repo, oid) == data

    def test_no_orphan_after_copy2_failure(self, tmp_path: pathlib.Path) -> None:
        """When opening the temp fd for writing raises, the mkstemp file must be cleaned up."""
        repo = _repo(tmp_path)
        data = b"copy2 will fail"
        oid = _oid(data)
        src = tmp_path / "src.bin"
        src.write_bytes(data)

        with pytest.raises(OSError):
            with patch("muse.core.object_store.os.fdopen", side_effect=OSError("I/O error")):
                write_object_from_path(repo, oid, src)

        assert _tmp_files(tmp_path) == [], "Orphan temp after write failure"

    def test_no_orphan_after_replace_failure(self, tmp_path: pathlib.Path) -> None:
        """When os.replace raises, the temp file must be cleaned up."""
        repo = _repo(tmp_path)
        data = b"replace will fail for from_path"
        oid = _oid(data)
        src = tmp_path / "src.bin"
        src.write_bytes(data)

        with pytest.raises(OSError):
            with patch("muse.core.object_store.os.replace", side_effect=OSError("disk full")):
                write_object_from_path(repo, oid, src)

        assert _tmp_files(tmp_path) == [], "Orphan temp after os.replace failure in write_object_from_path"

    def test_correct_content_after_write(self, tmp_path: pathlib.Path) -> None:
        """Content round-trips correctly through write_object_from_path → read_object."""
        repo = _repo(tmp_path)
        data = os.urandom(4096)
        oid = _oid(data)
        src = tmp_path / "payload.bin"
        src.write_bytes(data)

        write_object_from_path(repo, oid, src)
        assert read_object(repo, oid) == data
1223
1224
1225 # ---------------------------------------------------------------------------
1226 # Gap 4+5+6: restore_object — fsync ordering + copy2 failure cleanup
1227 # ---------------------------------------------------------------------------
1228
class TestRestoreObjectFsync:
    """_fsync_path must be called before os.replace in restore_object."""

    # Filesystem timestamps are not comparable to time.time() at sub-second
    # precision. Linux stamps inodes with a coarse clock (one scheduler tick
    # behind CLOCK_REALTIME), and HFS+/FAT store 1–2 s granularity. A file
    # touched immediately after time.time() can therefore report an earlier
    # mtime. The regression these tests guard against is a days-old mtime,
    # so a couple of seconds of slack keeps them meaningful and non-flaky.
    _MTIME_SLACK_S = 2.0

    def test_fsync_path_called_before_replace(self, tmp_path: pathlib.Path) -> None:
        import muse.core.object_store as object_store

        repo = _repo(tmp_path)
        data = b"restore fsync ordering"
        oid = _oid(data)
        write_object(repo, oid, data)
        dest = tmp_path / "restored.bin"

        call_order: list[str] = []
        # Capture the real implementations before patching so the wrappers
        # still perform the underlying work.
        real_fsync_path = object_store._fsync_path
        real_replace = os.replace

        def t_fsync_path(path: pathlib.Path) -> None:
            call_order.append("fsync_path")
            real_fsync_path(path)

        def t_replace(s: str | bytes | os.PathLike[str], d: str | bytes | os.PathLike[str]) -> None:
            call_order.append("replace")
            real_replace(s, d)

        with patch("muse.core.object_store._fsync_path", side_effect=t_fsync_path), \
             patch("muse.core.object_store.os.replace", side_effect=t_replace):
            restore_object(repo, oid, dest)

        fp = next((i for i, c in enumerate(call_order) if c == "fsync_path"), None)
        rp = next((i for i, c in enumerate(call_order) if c == "replace"), None)
        assert fp is not None, "_fsync_path never called in restore_object"
        assert rp is not None, "os.replace never called in restore_object"
        assert fp < rp, f"_fsync_path (pos {fp}) must precede replace (pos {rp})"

    def test_fsync_path_failure_non_fatal(self, tmp_path: pathlib.Path) -> None:
        """os.fsync failure inside _fsync_path must not abort restore_object."""
        repo = _repo(tmp_path)
        data = b"restore fsync_path fails gracefully"
        oid = _oid(data)
        write_object(repo, oid, data)
        dest = tmp_path / "restored.bin"

        with patch("muse.core.object_store.os.fsync", side_effect=OSError("not supported")):
            result = restore_object(repo, oid, dest)

        assert result is True
        assert dest.read_bytes() == data

    def test_no_orphan_after_copy2_failure(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        data = b"restore copy2 will fail"
        oid = _oid(data)
        write_object(repo, oid, data)
        dest = tmp_path / "out.bin"

        with pytest.raises(OSError):
            with patch("muse.core.object_store.os.fdopen", side_effect=OSError("I/O error")):
                restore_object(repo, oid, dest)

        assert _tmp_files(tmp_path) == [], "Orphan temp after write failure in restore_object"

    def test_no_orphan_after_replace_failure(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        data = b"restore replace will fail"
        oid = _oid(data)
        write_object(repo, oid, data)
        dest = tmp_path / "out.bin"

        with pytest.raises(OSError):
            with patch("muse.core.object_store.os.replace", side_effect=OSError("disk full")):
                restore_object(repo, oid, dest)

        assert _tmp_files(tmp_path) == [], "Orphan temp after os.replace failure in restore_object"

    def test_restored_file_mtime_is_current_not_from_object_store(
        self, tmp_path: pathlib.Path
    ) -> None:
        """restore_object must set the destination mtime to NOW, not to the
        object-store file's mtime.

        shutil.copy2 propagates the source (object-store) mtime to the temp
        file. Object-store files are written at commit time and may be days
        or weeks old. Without os.utime(tmp, None), the restored destination
        carries an old timestamp. Editors (Cursor, VS Code, Vim) then see
        "new mtime < cached mtime" and serve a stale buffer instead of
        reloading the file. This is the regression behind the
        "merge work disappears in Cursor but reappears on close/reopen" bug.
        """
        import time

        repo = _repo(tmp_path)
        data = b"content that differs from any existing file\n" * 10
        oid = _oid(data)
        write_object(repo, oid, data)

        # Simulate an old object-store mtime: backdate the stored object to 2 days ago.
        obj_path = object_path(repo, oid)
        two_days_ago = time.time() - (2 * 24 * 3600)
        os.utime(obj_path, (two_days_ago, two_days_ago))

        # Write a pre-existing dest with a "current" mtime (simulating Cursor's
        # last-read timestamp before the checkout/merge).
        dest = tmp_path / "watched_file.py"
        dest.write_bytes(b"old content that cursor has open\n")
        cursor_cached_mtime = time.time()
        os.utime(dest, (cursor_cached_mtime, cursor_cached_mtime))

        # restore_object must write the new content AND freshen mtime.
        t_before = time.time()
        restore_object(repo, oid, dest)
        t_after = time.time()

        new_mtime = os.stat(dest).st_mtime

        # Destination must have a FRESH timestamp — not the object-store's old
        # one. Allow for coarse filesystem timestamp granularity.
        assert new_mtime >= t_before - self._MTIME_SLACK_S, (
            f"Restored file mtime ({new_mtime:.2f}) is older than the time "
            f"restore_object was called ({t_before:.2f}). "
            "shutil.copy2 is propagating the object-store's stale mtime, "
            "which causes editors to serve stale buffers after checkout/merge."
        )
        assert new_mtime <= t_after + 1.0, (
            f"Restored file mtime ({new_mtime:.2f}) is far in the future — unexpected."
        )

        # Content must be correct regardless.
        assert dest.read_bytes() == data

    def test_restored_mtime_fresher_than_previous_content(
        self, tmp_path: pathlib.Path
    ) -> None:
        """After restore_object, the destination mtime must be >= the mtime it
        had before the call, so editors always see a forward-moving timestamp."""
        import time

        repo = _repo(tmp_path)
        new_data = b"new version from feature branch\n"
        oid = _oid(new_data)
        write_object(repo, oid, new_data)

        dest = tmp_path / "file.py"
        dest.write_bytes(b"old version on dev\n")
        # Place the "previous" mtime a few seconds in the past. A fine-grained
        # time.time() taken milliseconds before the restore can exceed the
        # coarse clock the filesystem stamps the restored file with, which
        # made the >= check flaky. Real editors compare against a previously
        # stamped mtime, which is always in the past.
        old_mtime = time.time() - 5 * self._MTIME_SLACK_S
        os.utime(dest, (old_mtime, old_mtime))

        # Backdate the object-store copy (as it would be after a real commit).
        obj_path = object_path(repo, oid)
        os.utime(obj_path, (old_mtime - 86400, old_mtime - 86400))

        restore_object(repo, oid, dest)

        assert os.stat(dest).st_mtime >= old_mtime, (
            "Restored file mtime went backwards — editor will not see the change."
        )
1381
1382
1383 # ---------------------------------------------------------------------------
1384 # Gap 7: page-cache non-flush — defense-in-depth (I-1 catches what I-2 misses)
1385 # ---------------------------------------------------------------------------
1386
class TestPageCacheDefenseInDepth:
    """I-1 (read-time hash verification) is the safety net behind I-2.

    This models the unlikely case where fsync appeared to succeed but the data
    never reached disk, e.g. power loss after the rename but before the flush.
    Each test writes an object normally and then overwrites the stored file
    with the debris such a crash could leave. read_object must raise OSError
    naming the integrity check; the store never silently serves bad data.
    """

    def _assert_damage_detected(
        self, tmp_path: pathlib.Path, payload: bytes, leftover: bytes
    ) -> None:
        """Store *payload*, replace its on-disk bytes with *leftover*, expect rejection."""
        repo = _repo(tmp_path)
        oid = _oid(payload)
        write_object(repo, oid, payload)
        _corrupt_file(object_path(repo, oid), leftover)

        with pytest.raises(OSError, match="integrity check"):
            read_object(repo, oid)

    def test_zeroed_dest_after_rename_caught_by_read(self, tmp_path: pathlib.Path) -> None:
        """Simulate post-rename page-cache loss: zero the stored file, then read."""
        payload = b"page cache simulation"
        # Same length as the original, but every byte zeroed.
        self._assert_damage_detected(tmp_path, payload, b"\x00" * len(payload))

    def test_truncated_dest_caught_by_read(self, tmp_path: pathlib.Path) -> None:
        """Simulate partial flush: only first half of bytes survived power loss."""
        payload = b"partial flush simulation" * 10
        self._assert_damage_detected(tmp_path, payload, payload[: len(payload) // 2])

    def test_noop_write_detected(self, tmp_path: pathlib.Path) -> None:
        """Simulate fh.write no-op (page cache accepted write, never flushed).

        The object is written correctly, then the stored file is emptied to
        mimic what a post-rename flush failure leaves behind. I-1's hash check
        must catch it.
        """
        payload = b"write syscall accepted but page cache never flushed"
        # Zero bytes — what a power loss leaves.
        self._assert_damage_detected(tmp_path, payload, b"")
1442
1443
1444 # ---------------------------------------------------------------------------
1445 # Gap 8: same object_id written from N threads simultaneously — idempotency
1446 # ---------------------------------------------------------------------------
1447
class TestIdempotentConcurrentWrite:
    """write_object is idempotent under concurrent writers of the same id.

    Many threads writing the same object_id at once must never corrupt the
    stored object. Every writer must succeed, and every read must return the
    original bytes. How the implementation resolves the race (skipping when
    the object already exists, last-rename-wins, …) is deliberately not
    asserted here.
    """

    def test_same_object_50_threads_no_corruption(self, tmp_path: pathlib.Path) -> None:
        """50 threads writing the same object_id must all succeed with correct content."""
        repo = _repo(tmp_path)
        data = b"idempotent object written from 50 threads"
        oid = _oid(data)
        n_threads = 50
        errors: list[str] = []
        # Release every writer at the same moment. Without a gate, threads
        # started one by one often finish before the next begins, and the
        # writes never actually overlap.
        start_gate = threading.Barrier(n_threads)

        def writer() -> None:
            try:
                start_gate.wait(timeout=30)
                write_object(repo, oid, data)
                result = read_object(repo, oid)
                if result != data:
                    errors.append(f"Mismatch: {repr(result)[:30]}")
            except Exception as exc:
                errors.append(f"Exception: {exc}")

        threads = [threading.Thread(target=writer) for _ in range(n_threads)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Join outside the f-string: a backslash inside an f-string
        # expression is a SyntaxError before Python 3.12.
        detail = "\n".join(errors)
        assert errors == [], f"Idempotent concurrent write errors:\n{detail}"
        assert read_object(repo, oid) == data
        assert _tmp_files(tmp_path) == []

    def test_same_object_distinct_content_rejected(self, tmp_path: pathlib.Path) -> None:
        """Writing different bytes under the same object_id is always rejected."""
        repo = _repo(tmp_path)
        data = b"canonical content"
        oid = _oid(data)
        wrong = b"wrong content that hashes differently"

        write_object(repo, oid, data)

        with pytest.raises(ValueError, match="integrity"):
            write_object(repo, oid, wrong)

        # The rejected write must not have disturbed the stored object.
        assert read_object(repo, oid) == data
1493
1494
1495 # ---------------------------------------------------------------------------
1496 # Gap 9: mid-write fh.write failure — orphan cleaned up
1497 # ---------------------------------------------------------------------------
1498
class TestMidWriteFailureCleanup:
    """A failing fh.write (disk full mid-write) must not strand a temp file
    in the store directory."""

    @staticmethod
    def _disk_full_handle() -> object:
        """Build a context-manager file handle whose ``write`` raises OSError."""
        from unittest.mock import MagicMock

        handle = MagicMock()
        handle.__enter__.return_value = handle
        handle.write.side_effect = OSError("disk full")
        handle.flush.return_value = None
        handle.fileno.return_value = -1
        return handle

    def test_no_orphan_after_write_failure_write_object(self, tmp_path: pathlib.Path) -> None:
        """OSError during fh.write must not leave an orphan temp file."""
        repo = _repo(tmp_path)
        payload = b"mid-write failure"
        oid = _oid(payload)
        failing = self._disk_full_handle()

        with pytest.raises(OSError):
            with patch("muse.core.object_store.os.fdopen", return_value=failing):
                write_object(repo, oid, payload)

        leftovers = _tmp_files(tmp_path)
        assert leftovers == [], "Orphan temp file left after mid-write failure"

    def test_no_orphan_after_write_failure_write_text_atomic(self, tmp_path: pathlib.Path) -> None:
        """write_text_atomic cleans up the temp file when fh.write raises."""
        target = tmp_path / "state.txt"
        failing = self._disk_full_handle()

        with pytest.raises(OSError):
            with patch("muse.core.io.os.fdopen", return_value=failing):
                write_text_atomic(target, "will fail")

        leftovers = _tmp_files(tmp_path)
        assert leftovers == [], "Orphan temp left after write_text_atomic mid-write failure"
1538
1539
1540 # ---------------------------------------------------------------------------
# Gap 10: sequential commit stress (1 000 commits) — store clean throughout
1542 # ---------------------------------------------------------------------------
1543
class TestSequentialStress:
    """Sequential commit writes exercise the full fsync+rename path at scale.

    The store must be clean when done: every commit readable and no orphan
    temp files. The scenario is a bulk git-to-muse import (the
    Linux-kernel migration writes ~75k commits). The counts are scaled down
    via the class constants below to keep CI fast. The test method names
    keep their historical counts.
    """

    # Commits written by the plain sequential test.
    _N_SEQUENTIAL = 1_000
    # Commits written while os.fsync randomly fails.
    _N_FLAKY = 100
    # Probability that any single os.fsync call fails in the flaky test.
    _FSYNC_FAILURE_RATE = 0.2

    @pytest.mark.slow
    def test_10000_sequential_commits_all_readable(self, tmp_path: pathlib.Path) -> None:
        """_N_SEQUENTIAL sequential commits — every one must be readable after write."""
        repo = _repo(tmp_path)
        commits = [_commit(i) for i in range(self._N_SEQUENTIAL)]

        for c in commits:
            write_commit(repo, c)

        # Verify every commit is readable and correct.
        failures: list[str] = []
        for c in commits:
            result = read_commit(repo, c.commit_id)
            if result is None:
                failures.append(f"Commit {c.commit_id[:8]} not found after write")
            elif result.message != c.message:
                failures.append(f"Commit {c.commit_id[:8]} message corrupted")

        # Join outside the f-string: a backslash inside an f-string
        # expression is a SyntaxError before Python 3.12.
        detail = "\n".join(failures[:10])
        assert failures == [], f"{len(failures)} commit read failures:\n{detail}"
        assert _tmp_files(tmp_path) == [], "Orphan temps after sequential commit writes"

    @pytest.mark.slow
    def test_1000_commits_with_20pct_fsync_failure_all_readable(
        self, tmp_path: pathlib.Path
    ) -> None:
        """_N_FLAKY commits with ~20% random fsync failures must all land correctly.

        Verifies that fsync failure is handled gracefully and that atomicity
        (torn-write protection) holds even when durability (fsync) is degraded.
        """
        import random as _random
        repo = _repo(tmp_path)
        # Fixed seed so the failure pattern is reproducible across runs.
        rng = _random.Random(42)
        commits = [_commit(i) for i in range(self._N_FLAKY)]
        real_fsync = os.fsync

        def flaky_fsync(fd: int) -> None:
            if rng.random() < self._FSYNC_FAILURE_RATE:
                raise OSError("simulated fsync failure")
            real_fsync(fd)

        # NOTE: this target resolves to the shared `os` module, so the patch
        # affects every os.fsync caller for the duration of the block.
        with patch("muse.core.commits.os.fsync", side_effect=flaky_fsync):
            for c in commits:
                write_commit(repo, c)

        failures: list[str] = []
        for c in commits:
            result = read_commit(repo, c.commit_id)
            if result is None:
                failures.append(f"Commit {c.commit_id[:8]} not found")
            elif result.message != c.message:
                failures.append(f"Commit {c.commit_id[:8]} corrupted")

        detail = "\n".join(failures)
        assert failures == [], f"Commits lost under flaky fsync:\n{detail}"
        assert _tmp_files(tmp_path) == []
1607
1608
1609 # ---------------------------------------------------------------------------
1610 # Gap 11: SIGKILL crash safety — process kill leaves no orphans
1611 # ---------------------------------------------------------------------------
1612
class TestProcessKillCrashSafety:
    """Simulate abrupt process termination (SIGKILL) during object writes.

    A child process (spawn context) writes objects in a loop. It is killed
    with SIGKILL shortly after it has started modifying the store.
    Afterwards, the store must be consistent:
      - Objects fully written before the child started must be readable and
        hash-correct.
      - Every object id the store enumerates must hash-verify. This check
        only runs when object_store exposes ``_iter_all_object_ids``.

    Orphan temp files are NOT asserted. When a process dies, the kernel
    closes its mkstemp fd but does not unlink the file, so the temp file can
    remain on disk. Because the rename never happens, the destination is
    either fully written or absent, never partial. Orphan GC is a separate
    I-6 concern.
    """

    # Upper bound on waiting for the spawned child to touch the store.
    _STARTUP_DEADLINE_S = 20.0
    # Upper bound on waiting for the killed child to be reaped.
    _JOIN_TIMEOUT_S = 20.0

    @staticmethod
    def _count_files(root: pathlib.Path) -> int:
        """Count regular files under *root*.

        os.walk ignores errors by default, so files renamed concurrently by
        the child do not break the scan.
        """
        return sum(len(files) for _dirpath, _dirnames, files in os.walk(root))

    @pytest.mark.slow
    def test_sigkill_during_write_leaves_no_partial_dest(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Objects written before SIGKILL must still be readable after kill."""
        import multiprocessing
        import random
        import signal
        import time

        repo = _repo(tmp_path)

        # Write 20 known objects before spawning the crashable process.
        pre_oids: list[str] = []
        for i in range(20):
            data = f"pre-kill-{i}".encode()
            oid = _oid(data)
            write_object(repo, oid, data)
            pre_oids.append(oid)

        # "spawn" starts a fresh interpreter — no multi-threaded-fork warning
        # and no risk of deadlocks inherited from the pytest runner's threads.
        # _sigkill_writer_worker is defined at module level to ensure it is
        # picklable across the spawn boundary.
        ctx = multiprocessing.get_context("spawn")
        proc = ctx.Process(target=_sigkill_writer_worker, args=(repo, 5000))
        baseline_files = self._count_files(tmp_path)
        proc.start()

        # A spawned interpreter needs far longer than a few tens of ms just to
        # start and import muse. Killing after a fixed short sleep would
        # almost always land before the first write. Wait until the child has
        # visibly created files in the store (bounded by a deadline).
        deadline = time.monotonic() + self._STARTUP_DEADLINE_S
        while proc.is_alive() and time.monotonic() < deadline:
            if self._count_files(tmp_path) > baseline_files:
                break
            time.sleep(0.005)

        # Then kill after a short random delay so the kill lands mid-stream.
        time.sleep(random.uniform(0.01, 0.05))
        if proc.is_alive():
            assert proc.pid is not None
            os.kill(proc.pid, signal.SIGKILL)
        proc.join(timeout=self._JOIN_TIMEOUT_S)
        assert not proc.is_alive(), "Writer process was not reaped after SIGKILL"

        # All pre-kill objects must still be readable and correct.
        for i, oid in enumerate(pre_oids):
            data = f"pre-kill-{i}".encode()
            result = read_object(repo, oid)
            assert result == data, f"Pre-kill object {oid[:8]} corrupted after SIGKILL"

        # Store consistency: every object file in the store must hash-verify.
        # NOTE(review): if object_store does not expose _iter_all_object_ids,
        # this whole-store sweep is skipped and only the pre-kill objects
        # above are verified.
        import muse.core.object_store as _ost
        all_oids = _ost._iter_all_object_ids(repo) if hasattr(_ost, "_iter_all_object_ids") else []
        for oid in all_oids:
            try:
                read_object(repo, oid)  # raises on hash mismatch
            except OSError as exc:
                pytest.fail(f"Corrupt object {oid[:8]} found after SIGKILL: {exc}")
1675 for oid in all_oids:
1676 try:
1677 read_object(repo, oid) # raises on hash mismatch
1678 except OSError as exc:
1679 pytest.fail(f"Corrupt object {oid[:8]} found after SIGKILL: {exc}")
1680
1681
1682 # ---------------------------------------------------------------------------
# Gap 12: Performance benchmark — 4 KiB atomic write + fsync < 10 ms
1684 # ---------------------------------------------------------------------------
1685
class TestFsyncWritePerformance:
    """fsync overhead on a single ~4 KiB durable write must stay under budget.

    Each test times exactly one call through an atomic write path (temp file
    + fsync + rename) and asserts it finishes within ``_BUDGET_MS``.  The
    cost is dominated by OS flush latency, not data volume.  ``tmp_path``
    lives under the system temp directory, which is often tmpfs on Linux
    (where fsync is effectively free); on a real NVMe/APFS volume a small
    flush is typically sub-millisecond.  The budget is deliberately generous
    so the tests catch regressions in the write path without flaking on
    loaded CI machines.

    The ``*_under_5ms`` test names predate the current budget and are kept
    for stable test IDs; ``_BUDGET_MS`` is the single source of truth.
    """

    # Wall-clock budget, in milliseconds, for one ~4 KiB atomic write.
    _BUDGET_MS = 10

    @pytest.mark.perf
    def test_write_commit_4kib_under_5ms(self, tmp_path: pathlib.Path) -> None:
        """A single commit write (JSON + fsync + rename) stays within _BUDGET_MS."""
        import time

        repo = _repo(tmp_path)
        c = _commit(99_000)

        start = time.perf_counter()
        write_commit(repo, c)
        duration_ms = (time.perf_counter() - start) * 1000

        assert read_commit(repo, c.commit_id) is not None
        assert duration_ms < self._BUDGET_MS, (
            f"write_commit took {duration_ms:.2f} ms — exceeds the "
            f"{self._BUDGET_MS} ms fsync budget. "
            "Performance regression in the atomic write path."
        )

    @pytest.mark.perf
    def test_write_object_4kib_under_5ms(self, tmp_path: pathlib.Path) -> None:
        """A single 4 KiB object write (bytes + fsync + rename) stays within _BUDGET_MS."""
        import time

        repo = _repo(tmp_path)
        data = os.urandom(4096)
        oid = _oid(data)

        start = time.perf_counter()
        write_object(repo, oid, data)
        duration_ms = (time.perf_counter() - start) * 1000

        assert read_object(repo, oid) == data
        assert duration_ms < self._BUDGET_MS, (
            f"write_object took {duration_ms:.2f} ms — exceeds the "
            f"{self._BUDGET_MS} ms fsync budget."
        )

    @pytest.mark.perf
    def test_write_text_atomic_4kib_under_5ms(self, tmp_path: pathlib.Path) -> None:
        """write_text_atomic on a 4 KiB text blob stays within _BUDGET_MS."""
        import time

        path = tmp_path / "state.txt"
        text = "x" * 4096

        start = time.perf_counter()
        write_text_atomic(path, text)
        duration_ms = (time.perf_counter() - start) * 1000

        assert path.read_text() == text
        assert duration_ms < self._BUDGET_MS, (
            f"write_text_atomic took {duration_ms:.2f} ms — exceeds the "
            f"{self._BUDGET_MS} ms budget."
        )
File History 1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 6 days ago