gabriel / muse public
test_integrity_I9_sigkill.py python
963 lines 36.7 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
1 """I-9 — Crash safety: SIGKILL simulation and startup GC sweep.
2
3 Validates three guarantees:
4
5 1. **Startup GC correctness** — :func:`muse.core.repo.require_repo` sweeps
6 every stale temp-file family on the next command after a crash:
7 * ``.obj-tmp-*`` / ``.restore-tmp-*`` — object-store shard directories
8 * ``.muse-tmp-*`` — store/config writes in ``.muse/`` subdirectories
9 * ``.stat_cache_*.tmp`` — :class:`~muse.core.stat_cache.StatCache`
10
11 2. **SIGKILL safety at timing windows** — a process killed at T+50 ms,
12 T+100 ms, and T+200 ms into a write sequence leaves the repository in a
13 consistent state. The subsequent startup GC removes any orphan temps so
14 the *next* ``muse commit`` succeeds.
15
16 3. **Push idempotency under SIGKILL** — a partial push leaves no corruption
17 on the remote because :func:`~muse.core.object_store.write_object` is
   content-addressed and atomic; incomplete object writes are swept by the
   remote-side startup GC once they are older than its 60-second age gate.
20
21 Test classes
22 ------------
23 * ``TestCleanupMuseDirTemps`` — unit tests for ``_cleanup_muse_dir_temps``
24 * ``TestStartupGcObjectTemps`` — object-store orphan files swept by GC
25 * ``TestStartupGcMuseTemps`` — ``.muse-tmp-*`` files swept by GC
26 * ``TestStartupGcStatCacheTemps`` — ``.stat_cache_*.tmp`` swept by GC
27 * ``TestRequireRepoCallsGc`` — ``require_repo`` triggers the sweep
28 * ``TestMultipleSigkills`` — stale files from *N* crashes all swept
29 * ``TestSigkillAtTimingWindows`` — subprocess SIGKILL at T+50/100/200 ms
30 * ``TestSigkillDuringCommit`` — full CLI commit survives SIGKILL
31 * ``TestSigkillDuringPush`` — push path is idempotent under SIGKILL
32 * ``TestGcPreservesRealObjects`` — GC never deletes valid stored objects
* ``TestGcSweeperPerformance`` — sweep < 500 ms with 1 000 stale files; full GC < 200 ms with 200
34 * ``TestRestoreTempWorkdirBound`` — restore-tmp in workdir: documented scope
35 """
36
37 from __future__ import annotations
38
39 import hashlib
40 import multiprocessing
41 import os
42 import pathlib
43 import signal
44 import tempfile
45 import time
46
47 import pytest
48
49 from muse.core.types import blob_id
50 from muse.core.object_store import (
51 cleanup_stale_object_temps,
52 object_path,
53 objects_dir,
54 read_object,
55 write_object,
56 )
57 from muse.core.paths import commits_dir, muse_dir, objects_dir, releases_dir, stat_cache_path, tags_dir
58 from muse.core.repo import (
59 _MUSE_TEMP_PREFIXES,
60 _MUSE_SWEEP_DIRS,
61 _cleanup_muse_dir_temps,
62 _startup_gc,
63 require_repo,
64 )
65 from muse.core.io import write_text_atomic
66
67
68 # ---------------------------------------------------------------------------
69 # Helpers
70 # ---------------------------------------------------------------------------
71
72
def _repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Lay down a bare ``.muse/`` skeleton under *tmp_path* and return *tmp_path*.

    The skeleton holds ``commits/``, ``snapshots/``, ``branches/``,
    ``refs/heads/`` and ``objects/`` — just enough for the GC unit tests.
    Every directory is created with a plain ``mkdir()``, so an existing
    ``.muse/`` raises ``FileExistsError`` instead of being silently reused.
    """
    meta = muse_dir(tmp_path)
    meta.mkdir()
    for sub in ("commits", "snapshots", "branches", "refs", "refs/heads", "objects"):
        meta.joinpath(sub).mkdir()
    return tmp_path
84
85
def _oid(data: bytes) -> str:
    """Return the object ID of *data* as computed by ``blob_id``."""
    content_id = blob_id(data)
    return content_id
88
89
def _shard(repo: pathlib.Path, prefix: str) -> pathlib.Path:
    """Return ``<objects>/sha256/<prefix>``, the shard dir for a two-char hex prefix.

    Only the path is computed; callers create the directory themselves.
    """
    return objects_dir(repo).joinpath("sha256", prefix)
93
94
def _plant_stale_muse_tmp(muse_dir: pathlib.Path, subdir: str = "") -> pathlib.Path:
    """Plant a ``.muse-tmp-*`` file like the one a SIGKILL'd ``write_text_atomic`` leaves.

    Args:
        muse_dir: The ``.muse/`` directory.  (Inside this function the name
            shadows the module-level ``muse_dir()`` helper.)
        subdir: Optional relative subdirectory of *muse_dir*; created with
            parents if missing.  An empty string means *muse_dir* itself.

    Returns:
        Path to the planted file, which contains ``"partial content"``.
        Its mtime is left at "now" (it is not backdated).
    """
    parent = muse_dir / subdir if subdir else muse_dir
    parent.mkdir(parents=True, exist_ok=True)
    fd, raw_path = tempfile.mkstemp(dir=parent, prefix=".muse-tmp-")
    # Write through the descriptor mkstemp already opened instead of reopening.
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write("partial content")
    return pathlib.Path(raw_path)
103
104
def _plant_stale_stat_cache_tmp(muse_dir: pathlib.Path) -> pathlib.Path:
    """Plant a ``.stat_cache_*.tmp`` file like the one a SIGKILL'd ``StatCache.save`` leaves.

    The file is created in ``<muse_dir>/cache/``.  That directory is created
    if needed, but *muse_dir* itself must already exist.  The file holds
    64 NUL bytes, and its mtime is not backdated.
    """
    cache_dir = muse_dir / "cache"
    cache_dir.mkdir(exist_ok=True)
    fd, raw_path = tempfile.mkstemp(dir=cache_dir, prefix=".stat_cache_", suffix=".tmp")
    with os.fdopen(fd, "wb") as handle:
        handle.write(bytes(64))
    return pathlib.Path(raw_path)
113
114
def _make_stale(path: pathlib.Path) -> pathlib.Path:
    """Set *path*'s atime and mtime to the Unix epoch and return *path* unchanged.

    ``cleanup_stale_object_temps`` ignores temp files younger than its
    60-second age gate (``_CLEANUP_MIN_AGE_SECS``).  Backdating a freshly
    planted temp to 1970-01-01 makes it look decades old, so tests can
    exercise the sweep without sleeping.
    """
    epoch = (0, 0)
    os.utime(path, epoch)
    return path
124
125
def _plant_stale_obj_tmp(objects_shard: pathlib.Path) -> pathlib.Path:
    """Plant a backdated ``.obj-tmp-*`` file like the one a SIGKILL'd ``write_object`` leaves.

    *objects_shard* must already exist.  The file holds
    ``b"partial object bytes"``.  It is passed through :func:`_make_stale`
    after being closed, so the object-store cleanup's age gate does not skip it.
    """
    fd, name = tempfile.mkstemp(dir=objects_shard, prefix=".obj-tmp-")
    with os.fdopen(fd, "wb") as sink:
        sink.write(b"partial object bytes")
    return _make_stale(pathlib.Path(name))
132
133
def _plant_stale_restore_tmp(shard: pathlib.Path) -> pathlib.Path:
    """Plant a backdated ``.restore-tmp-*`` file like the one a SIGKILL'd ``restore_object`` leaves.

    *shard* must already exist.  The file holds ``b"partial restore"``.  Its
    mtime is pushed back to the epoch so the cleanup age gate does not skip it.
    """
    fd, name = tempfile.mkstemp(dir=shard, prefix=".restore-tmp-")
    with os.fdopen(fd, "wb") as sink:
        sink.write(b"partial restore")
    return _make_stale(pathlib.Path(name))
140
141
def _count_stale_files(repo_root: pathlib.Path) -> int:
    """Return how many temp-family files exist anywhere under ``.muse/``.

    The families are ``.muse-tmp-*``, ``.stat_cache_*``, ``.obj-tmp-*`` and
    ``.restore-tmp-*``.  Only regular files count, and file age is ignored.
    """
    families = (".muse-tmp-", ".stat_cache_", ".obj-tmp-", ".restore-tmp-")
    return sum(
        1
        for entry in muse_dir(repo_root).rglob("*")
        if entry.is_file() and entry.name.startswith(families)
    )
153
154
155 # ---------------------------------------------------------------------------
156 # Subprocess workers — defined at module level for picklability under "spawn"
157 # ---------------------------------------------------------------------------
158
159
def _write_objects_worker(root: pathlib.Path, count: int) -> None:
    """Subprocess target: write *count* distinct objects into *root*'s object store.

    Meant to be SIGKILL'd part-way through.  Payload ``n`` is
    ``b"sigkill-object-NNNNNN"``, keyed by its ``blob_id``.  A 0.2 ms sleep
    between writes stretches the loop across the kill window.
    """
    import time as _time

    from muse.core.types import blob_id as _blob_id
    from muse.core.object_store import write_object as _write

    for n in range(count):
        data = f"sigkill-object-{n:06d}".encode()
        _write(root, _blob_id(data), data)
        _time.sleep(0.0002)
172
173
def _write_store_worker(root: pathlib.Path, count: int) -> None:
    """Subprocess target: atomically write *count* fake commit files, then return.

    File ``n`` is ``<commits_dir>/fake-NNNNNN.msgpack``, written with
    ``write_text_atomic`` and the text ``"fake commit n"``.  Meant to be
    SIGKILL'd mid-loop.
    """
    import time as _time

    from muse.core.io import write_text_atomic as _atomic_write

    for n in range(count):
        destination = commits_dir(root) / f"fake-{n:06d}.msgpack"
        _atomic_write(destination, f"fake commit {n}")
        _time.sleep(0.0002)
184
185
def _full_commit_worker(repo_path: pathlib.Path, commit_msg: str) -> None:
    """Subprocess target: turn this process into ``muse commit -m <commit_msg>``.

    The worker ``exec``s the CLI inside *repo_path* instead of launching it
    with ``subprocess.run``.  With ``subprocess.run``, the SIGKILL the tests
    send to this worker's PID only killed the Python wrapper.  The
    ``muse commit`` grandchild was orphaned, kept running, and could race
    with the assertions that follow.  After ``os.execvp`` the worker's PID
    *is* the ``muse`` process, so the signal interrupts the real commit.
    ``Process.join`` then waits for that exact process.

    stdout/stderr are inherited from this process, as before.
    """
    import sys as _sys

    # exec discards Python-level buffers; flush anything pending first.
    for stream in (_sys.stdout, _sys.stderr):
        if stream is not None:
            stream.flush()
    os.chdir(repo_path)
    os.execvp("muse", ["muse", "commit", "-m", commit_msg])
197
198
199 # ---------------------------------------------------------------------------
200 # 1. _cleanup_muse_dir_temps — unit tests
201 # ---------------------------------------------------------------------------
202
203
class TestCleanupMuseDirTemps:
    """Exercise ``_cleanup_muse_dir_temps`` directly, without the object store."""

    def test_removes_muse_tmp_from_root(self, tmp_path: pathlib.Path) -> None:
        """An orphan ``.muse-tmp-*`` at the top of ``.muse/`` is deleted and counted."""
        meta = muse_dir(tmp_path)
        meta.mkdir()
        orphan = _plant_stale_muse_tmp(meta)
        assert orphan.exists()
        swept = _cleanup_muse_dir_temps(meta)
        assert swept == 1
        assert not orphan.exists()

    def test_removes_muse_tmp_from_commits_subdir(self, tmp_path: pathlib.Path) -> None:
        """``commits/`` is swept; the planting helper creates the subdir itself."""
        meta = muse_dir(tmp_path)
        meta.mkdir()
        orphan = _plant_stale_muse_tmp(meta, "commits")
        swept = _cleanup_muse_dir_temps(meta)
        assert swept == 1
        assert not orphan.exists()

    def test_removes_muse_tmp_from_branches_subdir(self, tmp_path: pathlib.Path) -> None:
        """``branches/`` is swept."""
        meta = muse_dir(tmp_path)
        meta.mkdir()
        (meta / "branches").mkdir()
        orphan = _plant_stale_muse_tmp(meta, "branches")
        swept = _cleanup_muse_dir_temps(meta)
        assert swept == 1
        assert not orphan.exists()

    def test_removes_muse_tmp_from_snapshots_subdir(self, tmp_path: pathlib.Path) -> None:
        """``snapshots/`` is swept."""
        meta = muse_dir(tmp_path)
        meta.mkdir()
        (meta / "snapshots").mkdir()
        orphan = _plant_stale_muse_tmp(meta, "snapshots")
        swept = _cleanup_muse_dir_temps(meta)
        assert swept == 1
        assert not orphan.exists()

    def test_removes_stat_cache_tmp(self, tmp_path: pathlib.Path) -> None:
        """A ``.stat_cache_*.tmp`` under ``cache/`` is swept."""
        meta = muse_dir(tmp_path)
        meta.mkdir()
        orphan = _plant_stale_stat_cache_tmp(meta)
        swept = _cleanup_muse_dir_temps(meta)
        assert swept == 1
        assert not orphan.exists()

    def test_preserves_real_muse_files(self, tmp_path: pathlib.Path) -> None:
        """HEAD, a commit record and config.toml must all survive the sweep."""
        meta = muse_dir(tmp_path)
        meta.mkdir()
        (meta / "commits").mkdir()
        head_file = meta / "HEAD"
        head_file.write_text("ref: refs/heads/main", encoding="utf-8")
        commit_file = meta / "commits" / "abc123.msgpack"
        commit_file.write_bytes(b"fake msgpack")
        config_file = meta / "config.toml"
        config_file.write_text("[core]\n", encoding="utf-8")
        swept = _cleanup_muse_dir_temps(meta)
        assert swept == 0
        for keeper in (head_file, commit_file, config_file):
            assert keeper.exists()

    def test_multiple_stale_files_across_subdirs(self, tmp_path: pathlib.Path) -> None:
        """Orphans spread over root, commits/, snapshots/ and cache/ are all counted."""
        meta = muse_dir(tmp_path)
        meta.mkdir()
        orphans: list[pathlib.Path] = [
            _plant_stale_muse_tmp(meta),
            _plant_stale_muse_tmp(meta, "commits"),
            _plant_stale_muse_tmp(meta, "snapshots"),
            _plant_stale_stat_cache_tmp(meta),
        ]
        swept = _cleanup_muse_dir_temps(meta)
        assert swept == 4
        for orphan in orphans:
            assert not orphan.exists()

    def test_nonexistent_muse_dir_returns_zero(self, tmp_path: pathlib.Path) -> None:
        """A ``.muse/`` that was never created yields a count of zero."""
        missing = muse_dir(tmp_path)
        count = _cleanup_muse_dir_temps(missing)
        assert count == 0

    def test_missing_subdir_is_skipped_silently(self, tmp_path: pathlib.Path) -> None:
        """With only the root present, absent sweep subdirs are simply skipped."""
        meta = muse_dir(tmp_path)
        meta.mkdir()
        orphan = _plant_stale_muse_tmp(meta)
        swept = _cleanup_muse_dir_temps(meta)
        assert swept == 1
        assert not orphan.exists()

    def test_idempotent_second_call(self, tmp_path: pathlib.Path) -> None:
        """Sweeping an already-clean directory returns 0 and does not raise."""
        meta = muse_dir(tmp_path)
        meta.mkdir()
        _plant_stale_muse_tmp(meta)
        _cleanup_muse_dir_temps(meta)
        second_pass = _cleanup_muse_dir_temps(meta)
        assert second_pass == 0

    def test_temp_prefixes_constant_non_empty(self) -> None:
        """_MUSE_TEMP_PREFIXES is exported and non-empty — agents can introspect it."""
        assert len(_MUSE_TEMP_PREFIXES) >= 2
        assert ".muse-tmp-" in _MUSE_TEMP_PREFIXES
        assert ".stat_cache_" in _MUSE_TEMP_PREFIXES

    def test_sweep_dirs_constant_includes_all_write_sites(self) -> None:
        """_MUSE_SWEEP_DIRS covers every directory that uses write_text_atomic / _write_shelf_header_atomic."""
        expected_sites = {"", "branches", "commits", "snapshots", "tags", "releases"}
        assert expected_sites <= set(_MUSE_SWEEP_DIRS)
312
313
314 # ---------------------------------------------------------------------------
315 # 2. Startup GC — object-store orphans
316 # ---------------------------------------------------------------------------
317
318
class TestStartupGcObjectTemps:
    """Backdated ``.obj-tmp-*`` / ``.restore-tmp-*`` orphans are removed by the startup GC."""

    def test_obj_tmp_removed_by_cleanup(self, tmp_path: pathlib.Path) -> None:
        """cleanup_stale_object_temps deletes an aged ``.obj-tmp-*``."""
        repo = _repo(tmp_path)
        shard_dir = _shard(repo, "ab")
        shard_dir.mkdir(parents=True, exist_ok=True)
        orphan = _plant_stale_obj_tmp(shard_dir)
        assert orphan.exists()
        swept = cleanup_stale_object_temps(repo)
        assert swept >= 1
        assert not orphan.exists()

    def test_restore_tmp_removed_by_cleanup(self, tmp_path: pathlib.Path) -> None:
        """cleanup_stale_object_temps deletes an aged ``.restore-tmp-*``."""
        repo = _repo(tmp_path)
        shard_dir = _shard(repo, "cd")
        shard_dir.mkdir(parents=True, exist_ok=True)
        orphan = _plant_stale_restore_tmp(shard_dir)
        swept = cleanup_stale_object_temps(repo)
        assert swept >= 1
        assert not orphan.exists()

    def test_startup_gc_delegates_to_object_cleanup(self, tmp_path: pathlib.Path) -> None:
        """_startup_gc removes both object-store temp families."""
        repo = _repo(tmp_path)
        shard_dir = _shard(repo, "ef")
        shard_dir.mkdir(parents=True, exist_ok=True)
        obj_orphan = _plant_stale_obj_tmp(shard_dir)
        restore_orphan = _plant_stale_restore_tmp(shard_dir)
        _startup_gc(repo)
        assert not obj_orphan.exists()
        assert not restore_orphan.exists()

    def test_real_objects_preserved_by_startup_gc(self, tmp_path: pathlib.Path) -> None:
        """A genuinely stored object reads back unchanged after the GC."""
        repo = _repo(tmp_path)
        blob = b"real object content"
        blob_oid = _oid(blob)
        write_object(repo, blob_oid, blob)
        _startup_gc(repo)
        restored = read_object(repo, blob_oid)
        assert restored == blob

    def test_stale_and_real_coexist_only_stale_removed(self, tmp_path: pathlib.Path) -> None:
        """In a shard holding both, only the orphan temp disappears."""
        repo = _repo(tmp_path)
        blob = b"survivor"
        blob_oid = _oid(blob)
        write_object(repo, blob_oid, blob)
        # Drop the orphan next to the real object, in the very same shard.
        orphan = _plant_stale_obj_tmp(object_path(repo, blob_oid).parent)
        _startup_gc(repo)
        assert not orphan.exists()
        assert read_object(repo, blob_oid) == blob
371
372
373 # ---------------------------------------------------------------------------
374 # 3. Startup GC — .muse-tmp-* in subdirectories
375 # ---------------------------------------------------------------------------
376
377
class TestStartupGcMuseTemps:
    """``_startup_gc`` clears ``.muse-tmp-*`` orphans from ``.muse/`` and its subdirectories."""

    @staticmethod
    def _plant_and_gc(repo: pathlib.Path, subdir: str = "") -> pathlib.Path:
        """Plant one orphan in ``.muse/<subdir>``, run the startup GC, return the orphan path."""
        orphan = _plant_stale_muse_tmp(muse_dir(repo), subdir)
        _startup_gc(repo)
        return orphan

    def test_muse_tmp_in_root_swept(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        orphan = self._plant_and_gc(repo)
        assert not orphan.exists()

    def test_muse_tmp_in_commits_swept(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        orphan = self._plant_and_gc(repo, "commits")
        assert not orphan.exists()

    def test_muse_tmp_in_branches_swept(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        orphan = self._plant_and_gc(repo, "branches")
        assert not orphan.exists()

    def test_muse_tmp_in_snapshots_swept(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        orphan = self._plant_and_gc(repo, "snapshots")
        assert not orphan.exists()

    def test_muse_tmp_in_tags_swept(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        tags_dir(repo).mkdir()
        orphan = self._plant_and_gc(repo, "tags")
        assert not orphan.exists()

    def test_muse_tmp_in_releases_swept(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        releases_dir(repo).mkdir()
        orphan = self._plant_and_gc(repo, "releases")
        assert not orphan.exists()

    def test_legacy_file_in_commits_preserved(self, tmp_path: pathlib.Path) -> None:
        """A real (non-temp) commit record is left alone by the sweep."""
        repo = _repo(tmp_path)
        record = commits_dir(repo) / "deadbeef.msgpack"
        record.write_bytes(b"\x82\xa9commit_id\xa8deadbeef")
        _startup_gc(repo)
        assert record.exists()
425
426
427 # ---------------------------------------------------------------------------
428 # 4. Startup GC — .stat_cache_*.tmp
429 # ---------------------------------------------------------------------------
430
431
class TestStartupGcStatCacheTemps:
    """``_startup_gc`` removes ``.stat_cache_*.tmp`` orphans left by ``StatCache.save``."""

    def test_stat_cache_tmp_swept(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        orphan = _plant_stale_stat_cache_tmp(muse_dir(repo))
        _startup_gc(repo)
        assert not orphan.exists()

    def test_real_stat_cache_preserved(self, tmp_path: pathlib.Path) -> None:
        """The committed stat-cache file itself is never swept."""
        repo = _repo(tmp_path)
        cache_file = stat_cache_path(repo)
        cache_file.parent.mkdir(parents=True, exist_ok=True)
        cache_file.write_bytes(b"\x82\xa7version\x02")
        _startup_gc(repo)
        assert cache_file.exists()

    def test_multiple_stat_cache_tmps_all_swept(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        meta = muse_dir(repo)
        orphans: list[pathlib.Path] = []
        for _ in range(5):
            orphans.append(_plant_stale_stat_cache_tmp(meta))
        _startup_gc(repo)
        assert all(not orphan.exists() for orphan in orphans)
455
456
457 # ---------------------------------------------------------------------------
458 # 5. require_repo calls the startup GC
459 # ---------------------------------------------------------------------------
460
461
class TestRequireRepoCallsGc:
    """require_repo() triggers the full startup GC sweep."""

    def test_require_repo_removes_obj_tmp(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        shard = _shard(repo, "aa")
        shard.mkdir(parents=True, exist_ok=True)
        stale = _plant_stale_obj_tmp(shard)
        # Point require_repo straight at the repo via ``start=``.  It must
        # resolve that root and run the sweep on the way.
        found_root = require_repo(start=repo)
        assert found_root == repo
        assert not stale.exists()

    def test_require_repo_removes_muse_tmp(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        stale = _plant_stale_muse_tmp(muse_dir(repo))
        require_repo(start=repo)
        assert not stale.exists()

    def test_require_repo_removes_stat_cache_tmp(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        stale = _plant_stale_stat_cache_tmp(muse_dir(repo))
        require_repo(start=repo)
        assert not stale.exists()

    def test_require_repo_sweeps_all_families_at_once(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        shard = _shard(repo, "bb")
        shard.mkdir(parents=True, exist_ok=True)
        f1 = _plant_stale_obj_tmp(shard)
        f2 = _plant_stale_restore_tmp(shard)
        f3 = _plant_stale_muse_tmp(muse_dir(repo))
        f4 = _plant_stale_muse_tmp(muse_dir(repo), "commits")
        f5 = _plant_stale_stat_cache_tmp(muse_dir(repo))
        require_repo(start=repo)
        for f in (f1, f2, f3, f4, f5):
            assert not f.exists(), f"{f.name} should have been swept"

    def test_require_repo_not_in_repo_still_raises(self, tmp_path: pathlib.Path) -> None:
        """On a non-repo path, require_repo still exits with SystemExit despite the added GC step."""
        with pytest.raises(SystemExit):
            require_repo(start=tmp_path)
504
505
506 # ---------------------------------------------------------------------------
507 # 6. Multiple consecutive SIGKILLs — accumulated stale files all swept
508 # ---------------------------------------------------------------------------
509
510
class TestMultipleSigkills:
    """Orphans accumulated over several simulated crashes are all removed in one sweep."""

    def test_three_crash_generations_all_swept(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        meta = muse_dir(repo)
        shard_dir = _shard(repo, "cc")
        shard_dir.mkdir(parents=True, exist_ok=True)

        # Three simulated crashes, each leaving one orphan per family/location.
        orphans: list[pathlib.Path] = []
        for _generation in range(3):
            orphans += [
                _plant_stale_obj_tmp(shard_dir),
                _plant_stale_restore_tmp(shard_dir),
                _plant_stale_muse_tmp(meta),
                _plant_stale_muse_tmp(meta, "commits"),
                _plant_stale_stat_cache_tmp(meta),
            ]

        assert len(orphans) == 15
        _startup_gc(repo)
        for orphan in orphans:
            assert not orphan.exists(), f"Stale file survived: {orphan.name}"

    def test_gc_count_is_accurate(self, tmp_path: pathlib.Path) -> None:
        """The return value equals the number of orphans actually deleted."""
        repo = _repo(tmp_path)
        meta = muse_dir(repo)
        (meta / "tags").mkdir()
        for _ in range(4):
            _plant_stale_muse_tmp(meta)
        for _ in range(3):
            _plant_stale_stat_cache_tmp(meta)
        # 4 muse temps + 3 stat-cache temps = 7 orphans in total.
        swept = _cleanup_muse_dir_temps(meta)
        assert swept == 7
        assert _count_stale_files(repo) == 0
546
547
548 # ---------------------------------------------------------------------------
549 # 7. SIGKILL at T+50ms / T+100ms / T+200ms — object-store write sequence
550 # ---------------------------------------------------------------------------
551
552
class TestSigkillAtTimingWindows:
    """Subprocess SIGKILL at fixed timing windows: the stores stay consistent.

    Workers are started with the ``spawn`` context (a fresh interpreter), so
    at the shortest windows the worker may be killed before it has written
    anything.  The assertions hold either way.
    """

    # Writes each worker attempts; at >= 0.2 ms per write the loop outlasts
    # the longest (200 ms) window.
    _WORKER_WRITES = 2000

    @staticmethod
    def _start_and_sigkill(
        proc: multiprocessing.process.BaseProcess, delay_ms: int
    ) -> None:
        """Start *proc*, SIGKILL it after *delay_ms* if still alive, then reap it.

        ``join`` runs in ``finally``, so the child is reaped even when it
        already exited on its own or an assertion fires.  Previously ``join``
        was only reached on the kill path.
        """
        proc.start()
        try:
            time.sleep(delay_ms / 1000.0)
            if proc.is_alive():
                assert proc.pid is not None
                os.kill(proc.pid, signal.SIGKILL)
        finally:
            proc.join(timeout=5)

    @pytest.mark.slow
    @pytest.mark.parametrize("delay_ms", [50, 100, 200])
    def test_object_store_consistent_after_sigkill(
        self, tmp_path: pathlib.Path, delay_ms: int
    ) -> None:
        from muse.core.object_store import has_object

        repo = _repo(tmp_path)

        # Pre-write 10 known objects before the crashable process starts.
        pre_data: list[tuple[str, bytes]] = []
        for i in range(10):
            payload = f"pre-kill-{i:03d}".encode()
            oid = _oid(payload)
            write_object(repo, oid, payload)
            pre_data.append((oid, payload))

        # A fresh process writes objects in a tight loop and is killed mid-way.
        ctx = multiprocessing.get_context("spawn")
        self._start_and_sigkill(
            ctx.Process(target=_write_objects_worker, args=(repo, self._WORKER_WRITES)),
            delay_ms,
        )

        # Startup GC: simulates the next command after the crash.
        _startup_gc(repo)

        # Every pre-kill object must still be readable and hash-verified.
        # (Stale temp cleanup is not asserted here — cleanup_stale_object_temps
        # has a 60-second age gate to protect concurrent in-progress writes, so
        # a temp file created <60 s ago is intentionally left alone. That
        # guarantee is covered by TestStartupGcObjectTemps with backdated mtimes.)
        for oid, payload in pre_data:
            assert read_object(repo, oid) == payload, (
                f"Pre-kill object {oid[:8]} corrupted after SIGKILL at T+{delay_ms}ms"
            )

        # Any object the killed worker managed to publish must be complete:
        # a torn write must never be visible under its final OID.  The payload
        # format mirrors _write_objects_worker.
        for i in range(self._WORKER_WRITES):
            payload = f"sigkill-object-{i:06d}".encode()
            oid = _oid(payload)
            if has_object(repo, oid):
                assert read_object(repo, oid) == payload, (
                    f"Worker object {oid[:8]} is torn after SIGKILL at T+{delay_ms}ms"
                )

    @pytest.mark.slow
    @pytest.mark.parametrize("delay_ms", [50, 100, 200])
    def test_store_write_consistent_after_sigkill(
        self, tmp_path: pathlib.Path, delay_ms: int
    ) -> None:
        """SIGKILL during write_text_atomic loop leaves no stale .muse-tmp-* files."""
        repo = _repo(tmp_path)

        ctx = multiprocessing.get_context("spawn")
        self._start_and_sigkill(
            ctx.Process(target=_write_store_worker, args=(repo, self._WORKER_WRITES)),
            delay_ms,
        )

        # Startup GC sweep.
        _startup_gc(repo)

        # No .muse-tmp-* files may survive.
        muse = muse_dir(repo)
        leftovers = list(muse.rglob(".muse-tmp-*"))
        assert leftovers == [], (
            f"Stale .muse-tmp-* survived SIGKILL at T+{delay_ms}ms: {leftovers}"
        )
623
624
625 # ---------------------------------------------------------------------------
626 # 8. Full CLI commit survives SIGKILL
627 # ---------------------------------------------------------------------------
628
629
class TestSigkillDuringCommit:
    """End-to-end: SIGKILL during `muse commit` leaves repo in a recoverable state."""

    # Temp-file families a killed commit may leave behind under .muse/.
    _TEMP_PREFIXES = (".obj-tmp-", ".restore-tmp-", ".muse-tmp-", ".stat_cache_")

    def _init_real_repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Create a minimal real muse repo with a committed file."""
        import subprocess

        env = os.environ.copy()
        env["MUSE_REPO_ROOT"] = str(tmp_path)

        subprocess.run(["muse", "init"], cwd=str(tmp_path), env=env, check=True,
                       capture_output=True)
        # Stage and commit a file so there is a valid HEAD.
        (tmp_path / "file.txt").write_text("hello", encoding="utf-8")
        subprocess.run(["muse", "code", "add", "."], cwd=str(tmp_path), env=env,
                       check=True, capture_output=True)
        subprocess.run(["muse", "commit", "-m", "init"], cwd=str(tmp_path), env=env,
                       check=True, capture_output=True)
        return tmp_path

    @staticmethod
    def _sigkill_commit(
        repo: pathlib.Path, commit_msg: str, delay_s: float = 0.05
    ) -> None:
        """Run ``_full_commit_worker`` in a spawned process, SIGKILL it after *delay_s*, reap it.

        ``join`` runs in ``finally``, so the child is always reaped, including
        when it already finished before the deadline.
        """
        ctx = multiprocessing.get_context("spawn")
        proc = ctx.Process(target=_full_commit_worker, args=(repo, commit_msg))
        proc.start()
        try:
            time.sleep(delay_s)
            if proc.is_alive():
                assert proc.pid is not None
                os.kill(proc.pid, signal.SIGKILL)
        finally:
            proc.join(timeout=5)

    @classmethod
    def _age_temps(cls, repo: pathlib.Path) -> None:
        """Backdate every temp-family file under ``.muse/`` to the Unix epoch.

        cleanup_stale_object_temps deliberately skips temps younger than its
        60-second age gate, because they may belong to a concurrent writer.
        A commit killed mid-write leaves *fresh* temps, so without this step
        the stale-temp assertion would depend on the age gate rather than on
        the sweep (same approach as TestSigkillDuringPush).
        """
        for entry in muse_dir(repo).rglob("*"):
            if entry.is_file() and entry.name.startswith(cls._TEMP_PREFIXES):
                _make_stale(entry)

    @pytest.mark.slow
    def test_muse_status_runs_after_sigkill(self, tmp_path: pathlib.Path) -> None:
        """`muse status` must exit cleanly (exit 0) after a SIGKILL'd commit."""
        import subprocess

        repo = self._init_real_repo(tmp_path)

        # Modify a file so there is something to commit.
        (repo / "file.txt").write_text("changed", encoding="utf-8")
        subprocess.run(["muse", "code", "add", "."], cwd=str(repo),
                       capture_output=True)

        self._sigkill_commit(repo, "crash-me")

        # Startup GC runs on next require_repo invocation (muse status triggers it).
        result = subprocess.run(
            ["muse", "status"],
            cwd=str(repo),
            capture_output=True,
            text=True,
        )
        # status must exit 0; any non-zero means repo is corrupt.
        assert result.returncode == 0, (
            f"muse status failed after SIGKILL:\n{result.stdout}\n{result.stderr}"
        )

    @pytest.mark.slow
    def test_no_stale_temps_after_sigkill_and_next_command(
        self, tmp_path: pathlib.Path
    ) -> None:
        """After SIGKILL + muse status, zero stale temps remain in .muse/."""
        import subprocess

        repo = self._init_real_repo(tmp_path)
        (repo / "file.txt").write_text("changed again", encoding="utf-8")
        subprocess.run(["muse", "code", "add", "."], cwd=str(repo), capture_output=True)

        self._sigkill_commit(repo, "crash-me-2")
        # Make whatever the killed commit left behind old enough to be swept.
        self._age_temps(repo)

        # Trigger startup GC via the next command.
        subprocess.run(["muse", "status"], cwd=str(repo), capture_output=True)

        assert _count_stale_files(repo) == 0, "Stale temps remain after startup GC"
708
709
710 # ---------------------------------------------------------------------------
711 # 9. Push path idempotency under SIGKILL
712 # ---------------------------------------------------------------------------
713
714
class TestSigkillDuringPush:
    """SIGKILL during push writes leaves no corruption: write_object is atomic."""

    @pytest.mark.slow
    def test_push_objects_atomic_under_sigkill(self, tmp_path: pathlib.Path) -> None:
        """Objects pushed before the kill are readable; no torn object is visible afterwards."""
        from muse.core.object_store import has_object

        (tmp_path / "local").mkdir()
        (tmp_path / "remote").mkdir()
        local = _repo(tmp_path / "local")
        remote = _repo(tmp_path / "remote")
        worker_writes = 2000

        # Write 10 objects to local and also push them to remote before kill.
        pre_data: list[tuple[str, bytes]] = []
        for i in range(10):
            payload = f"push-pre-kill-{i}".encode()
            oid = _oid(payload)
            write_object(local, oid, payload)
            write_object(remote, oid, payload)  # simulate push of pre-kill objects
            pre_data.append((oid, payload))

        # Spawn a process that keeps writing objects to the remote store, and
        # kill it at T+80ms.  join() sits in finally so the child is always reaped.
        ctx = multiprocessing.get_context("spawn")
        proc = ctx.Process(target=_write_objects_worker, args=(remote, worker_writes))
        proc.start()
        try:
            time.sleep(0.08)
            if proc.is_alive():
                assert proc.pid is not None
                os.kill(proc.pid, signal.SIGKILL)
        finally:
            proc.join(timeout=5)

        # Backdate all temp files left by the killed process — the 60-second
        # age gate in cleanup_stale_object_temps skips fresh files to protect
        # concurrent writers; in tests we fast-forward mtime to simulate aging.
        temp_prefixes = (".obj-tmp-", ".restore-tmp-", ".muse-tmp-", ".stat_cache_")
        for entry in muse_dir(remote).rglob("*"):
            if entry.is_file() and entry.name.startswith(temp_prefixes):
                _make_stale(entry)

        # Simulate remote-side startup GC (next muse command on the remote).
        _startup_gc(remote)

        # Remote must have no stale temps.
        assert _count_stale_files(remote) == 0

        # All pre-kill objects on remote must be intact.
        for oid, payload in pre_data:
            assert read_object(remote, oid) == payload, (
                f"Remote object {oid[:8]} corrupted after push SIGKILL"
            )

        # Every object the killed writer managed to publish must be complete;
        # a partial write must never appear under its final OID.  The payload
        # format mirrors _write_objects_worker.
        for i in range(worker_writes):
            payload = f"sigkill-object-{i:06d}".encode()
            oid = _oid(payload)
            if has_object(remote, oid):
                assert read_object(remote, oid) == payload, (
                    f"Remote object {oid[:8]} is torn after push SIGKILL"
                )

    def test_push_write_object_is_idempotent(self, tmp_path: pathlib.Path) -> None:
        """write_object returns True on the first write of an OID and False (skip) on a repeat."""
        repo = _repo(tmp_path)
        payload = b"idempotent object"
        oid = _oid(payload)
        first = write_object(repo, oid, payload)
        second = write_object(repo, oid, payload)
        assert first is True
        assert second is False
        assert read_object(repo, oid) == payload

    def test_partial_write_interrupted_at_os_level_leaves_no_dest(
        self, tmp_path: pathlib.Path
    ) -> None:
        """The mkstemp→replace contract: if replace never happens, dest is absent."""
        from muse.core.object_store import has_object

        repo = _repo(tmp_path)
        data = b"will be interrupted"
        oid = _oid(data)

        # Manually plant the stale temp (simulates SIGKILL after mkstemp, before replace)
        shard = object_path(repo, oid).parent
        shard.mkdir(parents=True, exist_ok=True)
        stale = _plant_stale_obj_tmp(shard)

        # The destination object must NOT exist (replace never happened).
        assert not has_object(repo, oid)

        # Cleanup removes the stale temp.
        cleanup_stale_object_temps(repo)
        assert not stale.exists()

        # A fresh write_object succeeds normally.
        write_object(repo, oid, data)
        assert has_object(repo, oid)
802
803
804 # ---------------------------------------------------------------------------
805 # 10. GC preserves all real stored objects
806 # ---------------------------------------------------------------------------
807
808
class TestGcPreservesRealObjects:
    """``_startup_gc`` must leave every legitimate object and metadata file intact."""

    def test_100_objects_all_survive_gc(self, tmp_path: pathlib.Path) -> None:
        repo = _repo(tmp_path)
        payloads = [f"object-{n:04d}".encode() for n in range(100)]
        written: list[tuple[str, bytes]] = []
        for payload in payloads:
            oid = _oid(payload)
            write_object(repo, oid, payload)
            written.append((oid, payload))

        _startup_gc(repo)

        for oid, payload in written:
            assert read_object(repo, oid) == payload, f"Object {oid[:8]} deleted by GC"

    def test_real_head_and_config_survive_gc(self, tmp_path: pathlib.Path) -> None:
        """HEAD and config.toml keep their exact contents across a sweep."""
        repo = _repo(tmp_path)
        meta = muse_dir(repo)
        head_file = meta / "HEAD"
        config_file = meta / "config.toml"
        head_file.write_text("ref: refs/heads/main\n", encoding="utf-8")
        config_file.write_text("[core]\n name = \"test\"\n", encoding="utf-8")

        _startup_gc(repo)

        assert head_file.read_text(encoding="utf-8") == "ref: refs/heads/main\n"
        assert "test" in config_file.read_text(encoding="utf-8")

    def test_gc_on_empty_repo_is_noop(self, tmp_path: pathlib.Path) -> None:
        """A fresh skeleton has nothing to sweep, and the GC must not raise on it."""
        repo = _repo(tmp_path)
        _startup_gc(repo)
        assert _count_stale_files(repo) == 0
844
845
846 # ---------------------------------------------------------------------------
# 11. Performance: GC sweeps 1 000 stale files in < 500 ms
848 # ---------------------------------------------------------------------------
849
850
class TestGcSweeperPerformance:
    """Latency bounds showing the startup GC is cheap enough for every require_repo call."""

    @pytest.mark.slow
    def test_sweep_1000_stale_files_under_500ms(self, tmp_path: pathlib.Path) -> None:
        """Sweeping 1 000 stale ``.muse-tmp-*`` files finishes in under 500 ms.

        The measurement is wall-clock time and includes pytest log capture.
        The budget is deliberately loose: on macOS APFS each unlink plus its
        warning emission costs roughly 50–100 μs. A real crash leaves only
        1–3 stale files, so the common case is far below 1 ms; this test pins
        the worst-case bound.
        """
        repo = _repo(tmp_path)
        dot_muse = muse_dir(repo)
        (dot_muse / "commits").mkdir(exist_ok=True)

        # 500 temps directly under .muse/, then 500 under .muse/commits/.
        for extra_args in ((), ("commits",)):
            for _ in range(500):
                _plant_stale_muse_tmp(dot_muse, *extra_args)

        t0 = time.perf_counter()
        swept = _cleanup_muse_dir_temps(dot_muse)
        elapsed_ms = 1000 * (time.perf_counter() - t0)

        assert swept == 1000
        assert elapsed_ms < 500.0, (
            f"_cleanup_muse_dir_temps took {elapsed_ms:.1f} ms for 1 000 files "
            f"(budget: 500 ms)"
        )

    @pytest.mark.slow
    def test_full_startup_gc_under_200ms(self, tmp_path: pathlib.Path) -> None:
        """A full _startup_gc (object-store and .muse/ sweeps) over 200 temps takes < 200 ms.

        A real crash leaves at most 1–3 stale files. This test checks the
        adversarial case of 200 simultaneous stale temps split between the
        object store and the .muse/ directories.
        """
        repo = _repo(tmp_path)
        dot_muse = muse_dir(repo)
        (dot_muse / "commits").mkdir(exist_ok=True)

        # Object store: shards "00".."09", ten stale temps in each.
        for shard_no in range(10):
            shard_dir = _shard(repo, f"{shard_no:02x}")
            shard_dir.mkdir(parents=True, exist_ok=True)
            for _ in range(10):
                _plant_stale_obj_tmp(shard_dir)

        # .muse/: 50 temps at the root and 50 under commits/.
        for extra_args in ((), ("commits",)):
            for _ in range(50):
                _plant_stale_muse_tmp(dot_muse, *extra_args)

        t0 = time.perf_counter()
        _startup_gc(repo)
        elapsed_ms = 1000 * (time.perf_counter() - t0)

        assert elapsed_ms < 200.0, (
            f"_startup_gc took {elapsed_ms:.1f} ms (budget: 200 ms)"
        )
        assert _count_stale_files(repo) == 0
916
917
918 # ---------------------------------------------------------------------------
919 # 12. .restore-tmp-* in working-tree: scope documentation test
920 # ---------------------------------------------------------------------------
921
922
class TestRestoreTempWorkdirBound:
    """.restore-tmp-* files in the working tree (not .muse/) are outside GC scope.

    restore_object writes to a user-provided destination directory, not inside
    .muse/. If SIGKILL occurs between mkstemp and os.replace in restore_object,
    the stale temp stays in the working tree.

    The documented guarantee: the .muse/ repo state is never corrupted, and
    the stale restore temp in the working tree is inert (it does not block the
    next checkout/merge, which simply overwrites the destination path atomically).
    """

    # How far the planted workdir temp is backdated (30 days, in seconds).
    _BACKDATE_SECONDS = 30 * 24 * 60 * 60

    def test_restore_tmp_in_workdir_not_swept_by_gc(self, tmp_path: pathlib.Path) -> None:
        """GC sweeps .muse/ only — stale restore temps in workdir are out of scope.

        The planted file is backdated so that, should the sweep apply any
        age threshold, the file's survival can only be explained by its
        location outside .muse/ and not by it merely being "too new".
        """
        repo = _repo(tmp_path)
        # Plant a stale restore temp in the working tree (outside .muse/).
        fd, stale_str = tempfile.mkstemp(dir=tmp_path, prefix=".restore-tmp-")
        os.close(fd)
        stale = pathlib.Path(stale_str)
        stale.write_bytes(b"stale workdir restore temp")
        old = time.time() - self._BACKDATE_SECONDS
        os.utime(stale, (old, old))

        # GC sweeps only .muse/; the workdir temp must be left untouched.
        _startup_gc(repo)

        # Stale in workdir persists — this is the documented limitation.
        assert stale.exists(), (
            "GC must not touch working-tree files outside .muse/"
        )
        # But .muse/ is clean.
        assert _count_stale_files(repo) == 0

        stale.unlink()  # cleanup

    def test_restore_tmp_in_obj_shard_IS_swept(self, tmp_path: pathlib.Path) -> None:
        """.restore-tmp-* inside .muse/objects/ shard dirs ARE swept (object store scope)."""
        repo = _repo(tmp_path)
        shard = _shard(repo, "dd")
        shard.mkdir(parents=True, exist_ok=True)
        stale = _plant_stale_restore_tmp(shard)
        _startup_gc(repo)
        assert not stale.exists(), ".restore-tmp-* in object shard must be swept"
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago