gabriel / muse public
test_cmd_coord_lifecycle.py python
1,189 lines 52.5 KB
Raw
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 6 days ago
1 """Tests for ``muse coord release``, ``muse coord heartbeat``, and ``muse coord gc``.
2
3 Coverage matrix
4 ---------------
5 Unit
6 ~~~~
7 * :func:`muse.core.coordination.create_release` — write, idempotency, record_id guard
8 * :func:`muse.core.coordination.load_all_releases` — load, corrupt-file tolerance
9 * :func:`muse.core.coordination.load_released_ids` — efficiency path (stems only)
10 * :func:`muse.core.coordination.create_heartbeat` — write, atomic overwrite
11 * :func:`muse.core.coordination.load_heartbeat_map` — load, corrupt-file tolerance
12 * :func:`muse.core.coordination.run_coord_gc` — dry-run, execute, grace period, orphans
13 * :func:`muse.core.coordination.active_reservations` — lifecycle-aware liveness
14
15 Integration
16 ~~~~~~~~~~~
17 * ``muse coord release`` — single, --all-for-run, idempotent, not-found, bad args
18 * ``muse coord heartbeat`` — extends TTL, released guard, not-found, bad args
19 * ``muse coord gc`` — dry-run, execute, --include-intents, --verbose, JSON output
20 * ``muse coord list`` — lifecycle-aware fields (released, effective_expires_at)
21
22 Security
23 ~~~~~~~~
24 * record_id injection in reservation_id → rejected before file I/O
25 * Path traversal attempts via reservation_id → ValueError from _validate_reservation_id
26 * ANSI escape sequences in run_id / reservation_id not written to files
27 * GC does not follow symlinks or traverse outside .muse/coordination/
28
29 Stress
30 ~~~~~~
31 * 200 reservations → release all via --all-for-run in < 2 s
32 * 500 expired reservations → GC collects all in < 3 s
33 * 100 heartbeats → load_heartbeat_map in < 0.5 s
34 * release + heartbeat + gc round-trip correctness under concurrent-ish writes
35 """
36
37 from __future__ import annotations
38
39 import datetime
40 import itertools
41 import json
42 import pathlib
43 import time
44
45 import pytest
46
47 from muse.core.types import MsgpackDict, content_hash, fake_id
48 from muse.core.paths import muse_dir
49 from muse.core.coordination import Reservation
50
# Module-level counter; every call to ``_new_id`` consumes the next value.
_id_seq = itertools.count()


def _new_id() -> str:
    """Return the content hash of a payload built from the next counter value.

    Each call hashes a different ``{"seq": n}`` payload, so successive calls
    are fed distinct inputs.
    """
    payload = {"seq": next(_id_seq)}
    return content_hash(payload)
55
56 # ── Helpers ───────────────────────────────────────────────────────────────────
57
58
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create the bare minimum of a muse repo under *tmp_path*.

    Only the muse directory and a ``HEAD`` file pointing at ``main`` are
    written. Returns *tmp_path* itself so it can be used as the repo root.
    """
    muse_root = muse_dir(tmp_path)
    muse_root.mkdir()
    head_file = muse_root / "HEAD"
    head_file.write_text("ref: refs/heads/main\n")
    return tmp_path
65
66
def _now_utc() -> datetime.datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc)
69
70
def _dt_iso(dt: datetime.datetime) -> str:
    """Return *dt* formatted with ``datetime.isoformat()``."""
    iso_text = dt.isoformat()
    return iso_text
73
74
75 # ── Fixtures ──────────────────────────────────────────────────────────────────
76
77
@pytest.fixture()
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Provide a minimal muse repo rooted at pytest's per-test ``tmp_path``."""
    repo_root = _make_repo(tmp_path)
    return repo_root
81
82
@pytest.fixture()
def reservation(repo: pathlib.Path) -> Reservation:
    """Create and return a reservation in *repo* with a one-hour TTL."""
    from muse.core.coordination import create_reservation

    settings = {
        "run_id": "agent-test",
        "branch": "feat/test",
        "addresses": ["src/billing.py::compute_total"],
        "ttl_seconds": 3600,
    }
    return create_reservation(repo, **settings)
94
95
@pytest.fixture()
def expired_reservation(repo: pathlib.Path) -> Reservation:
    """Create a reservation in *repo* whose ``expires_at`` is already in the past.

    The reservation is created normally and then back-dated by ten seconds,
    both in its JSON file on disk and on the returned object. A single
    timestamp is computed and used for both, so the persisted record and the
    in-memory ``Reservation`` agree exactly.
    """
    from muse.core.coordination import _reservations_dir, create_reservation

    res = create_reservation(
        repo,
        run_id="agent-expired",
        branch="feat/expired",
        addresses=["src/old.py::dead_code"],
        ttl_seconds=1,
    )
    # One timestamp for disk and memory; two separate _now_utc() calls would
    # leave the two copies a few microseconds apart.
    expired_at = _now_utc() - datetime.timedelta(seconds=10)

    # Back-date expires_at on disk so the record is expired.
    path = _reservations_dir(repo) / f"{res.reservation_id}.json"
    data = json.loads(path.read_text())
    data["expires_at"] = expired_at.isoformat()
    path.write_text(json.dumps(data))

    res.expires_at = expired_at
    return res
116
117
118 # ─────────────────────────────────────────────────────────────────────────────
119 # Unit tests — coordination.py primitives
120 # ─────────────────────────────────────────────────────────────────────────────
121
122
class TestCreateRelease:
    """Unit tests for ``create_release``: tombstone writing and input validation."""

    def test_creates_tombstone_file(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """Releasing writes ``<reservation_id>.json`` into the releases directory."""
        from muse.core.coordination import _releases_dir, create_release

        release = create_release(repo, reservation.reservation_id, "agent-test", "completed")
        tombstone = _releases_dir(repo) / f"{reservation.reservation_id}.json"
        assert tombstone.exists()
        assert release.reason == "completed"

    def test_roundtrip_from_dict(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """``to_dict`` followed by ``Release.from_dict`` keeps the id and the reason."""
        from muse.core.coordination import Release, create_release

        original = create_release(repo, reservation.reservation_id, "agent-test", "cancelled")
        restored = Release.from_dict(original.to_dict())
        assert restored.reservation_id == original.reservation_id
        assert restored.reason == "cancelled"

    def test_raises_file_exists_on_double_release(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """A second release of the same reservation raises ``FileExistsError``."""
        from muse.core.coordination import create_release

        rid = reservation.reservation_id
        create_release(repo, rid, "agent-test", "completed")
        with pytest.raises(FileExistsError, match="already released"):
            create_release(repo, rid, "agent-test", "cancelled")

    def test_rejects_invalid_record_id(self, repo: pathlib.Path) -> None:
        """A traversal-style id is rejected with a ``ValueError`` mentioning sha256."""
        from muse.core.coordination import create_release

        with pytest.raises(ValueError, match="sha256"):
            create_release(repo, "../../etc/passwd", "agent", "completed")

    def test_rejects_invalid_reason(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """An unrecognised reason string is rejected with ``ValueError``."""
        from muse.core.coordination import create_release

        with pytest.raises(ValueError, match="reason must be one of"):
            create_release(repo, reservation.reservation_id, "agent", "unknown-reason")

    def test_valid_reasons(self, repo: pathlib.Path) -> None:
        """Each of the three accepted reasons is stored on the returned release."""
        from muse.core.coordination import create_reservation, create_release

        accepted = ("completed", "cancelled", "superseded")
        for index, reason in enumerate(accepted):
            agent = f"ag{index}"
            res = create_reservation(repo, run_id=agent, branch="b", addresses=["x::y"])
            release = create_release(repo, res.reservation_id, agent, reason)
            assert release.reason == reason

    def test_path_containment_check(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """record_id validation prevents path traversal before containment check."""
        from muse.core.coordination import create_release

        malformed_ids = (
            "../../../etc",
            "a/b/c",
            "not-a-record-id",
            "",
        )
        for candidate in malformed_ids:
            with pytest.raises(ValueError):
                create_release(repo, candidate, "agent", "completed")
174
175
class TestLoadReleases:
    """Unit tests for ``load_all_releases`` and ``load_released_ids``."""

    def test_empty_dir(self, repo: pathlib.Path) -> None:
        """With no releases written, both loaders return empty results."""
        from muse.core.coordination import load_all_releases, load_released_ids

        all_releases = load_all_releases(repo)
        released_ids = load_released_ids(repo)
        assert all_releases == []
        assert released_ids == frozenset()

    def test_missing_dir_returns_empty(self, tmp_path: pathlib.Path) -> None:
        """A freshly made repo (built here, not via the fixture) yields empty results."""
        repo = _make_repo(tmp_path)
        from muse.core.coordination import load_all_releases, load_released_ids

        all_releases = load_all_releases(repo)
        released_ids = load_released_ids(repo)
        assert all_releases == []
        assert released_ids == frozenset()

    def test_load_released_ids_uses_stems(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """A released reservation's id appears in ``load_released_ids``."""
        from muse.core.coordination import create_release, load_released_ids

        rid = reservation.reservation_id
        create_release(repo, rid, "ag", "completed")
        assert rid in load_released_ids(repo)

    def test_corrupt_file_skipped(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """A non-JSON file next to a valid release is not returned by the loader."""
        from muse.core.coordination import _releases_dir, create_release, load_all_releases

        create_release(repo, reservation.reservation_id, "ag", "completed")
        # Drop an unparsable file beside the valid tombstone.
        garbage = _releases_dir(repo) / "corrupt.json"
        garbage.write_text("not-json")
        # Exactly one record comes back: the valid one.
        assert len(load_all_releases(repo)) == 1

    def test_load_released_ids_includes_corrupt_stem(self, repo: pathlib.Path) -> None:
        """load_released_ids uses stems only — corrupt JSON doesn't block it."""
        from muse.core.coordination import _releases_dir, _ensure_coord_dirs, load_released_ids

        _ensure_coord_dirs(repo)
        stem_id = fake_id("corrupt-release-stem")
        unparsable = _releases_dir(repo) / f"{stem_id}.json"
        unparsable.write_text("not-json")
        assert stem_id in load_released_ids(repo)
211
212
class TestCreateHeartbeat:
    """Unit tests for ``create_heartbeat``: file creation, overwrite and validation."""

    def test_creates_heartbeat_file(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """A heartbeat writes ``<reservation_id>.json`` and extends into the future."""
        from muse.core.coordination import _heartbeats_dir, create_heartbeat

        heartbeat = create_heartbeat(repo, reservation.reservation_id, "agent-test", 3600)
        hb_file = _heartbeats_dir(repo) / f"{reservation.reservation_id}.json"
        assert hb_file.exists()
        assert heartbeat.extended_expires_at > _now_utc()

    def test_atomic_overwrite(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """A later heartbeat with a longer extension reports a later expiry."""
        from muse.core.coordination import create_heartbeat

        rid = reservation.reservation_id
        short = create_heartbeat(repo, rid, "ag", 600)
        long = create_heartbeat(repo, rid, "ag", 7200)
        # The second heartbeat must reach further than the first.
        assert long.extended_expires_at > short.extended_expires_at

    def test_rejects_invalid_record_id(self, repo: pathlib.Path) -> None:
        """A malformed reservation id is rejected with ``ValueError``."""
        from muse.core.coordination import create_heartbeat

        with pytest.raises(ValueError, match="sha256"):
            create_heartbeat(repo, "not/a-record-id", "ag", 3600)

    def test_rejects_non_positive_extension(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """Zero and negative extensions are both rejected with ``ValueError``."""
        from muse.core.coordination import create_heartbeat

        for bad_extension in (0, -1):
            with pytest.raises(ValueError, match="extension_seconds"):
                create_heartbeat(repo, reservation.reservation_id, "ag", bad_extension)

    def test_roundtrip(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """``to_dict`` followed by ``Heartbeat.from_dict`` keeps the id and run id."""
        from muse.core.coordination import Heartbeat, create_heartbeat

        original = create_heartbeat(repo, reservation.reservation_id, "ag", 1800)
        restored = Heartbeat.from_dict(original.to_dict())
        assert restored.reservation_id == original.reservation_id
        assert restored.run_id == original.run_id
247
248
class TestLoadHeartbeatMap:
    """Unit tests for ``load_heartbeat_map``."""

    def test_empty(self, repo: pathlib.Path) -> None:
        """With no heartbeats written, the map is empty."""
        from muse.core.coordination import load_heartbeat_map

        loaded = load_heartbeat_map(repo)
        assert loaded == {}

    def test_missing_dir_returns_empty(self, tmp_path: pathlib.Path) -> None:
        """A freshly made repo (built here, not via the fixture) yields an empty map."""
        repo = _make_repo(tmp_path)
        from muse.core.coordination import load_heartbeat_map

        loaded = load_heartbeat_map(repo)
        assert loaded == {}

    def test_load_single(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """A written heartbeat is keyed by its reservation id."""
        from muse.core.coordination import create_heartbeat, load_heartbeat_map

        rid = reservation.reservation_id
        create_heartbeat(repo, rid, "ag", 3600)
        assert rid in load_heartbeat_map(repo)

    def test_corrupt_file_skipped(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """A non-JSON file next to a valid heartbeat does not add a map entry."""
        from muse.core.coordination import (
            _heartbeats_dir,
            create_heartbeat,
            load_heartbeat_map,
        )

        create_heartbeat(repo, reservation.reservation_id, "ag", 3600)
        garbage = _heartbeats_dir(repo) / "corrupt.json"
        garbage.write_text("bad-json")
        assert len(load_heartbeat_map(repo)) == 1
273
274
class TestActiveReservations:
    """Unit tests for ``active_reservations`` with releases and heartbeats in play."""

    @staticmethod
    def _active_ids(repo: pathlib.Path) -> set[str]:
        """Return the reservation ids currently reported by ``active_reservations``."""
        from muse.core.coordination import active_reservations

        return {entry.reservation_id for entry in active_reservations(repo)}

    def test_active_reservation_included(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """A fresh reservation is reported as active."""
        assert reservation.reservation_id in self._active_ids(repo)

    def test_released_excluded(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """A released reservation is no longer reported as active."""
        from muse.core.coordination import create_release

        create_release(repo, reservation.reservation_id, "ag", "completed")
        assert reservation.reservation_id not in self._active_ids(repo)

    def test_expired_excluded(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
        """A back-dated reservation is not reported as active."""
        assert expired_reservation.reservation_id not in self._active_ids(repo)

    def test_heartbeat_extends_expired(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
        """A heartbeat on a back-dated reservation makes it active again."""
        from muse.core.coordination import create_heartbeat

        create_heartbeat(repo, expired_reservation.reservation_id, "ag", 3600)
        assert expired_reservation.reservation_id in self._active_ids(repo)

    def test_released_with_heartbeat_excluded(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """A released reservation stays excluded even if a heartbeat exists."""
        from muse.core.coordination import create_heartbeat, create_release

        rid = reservation.reservation_id
        create_release(repo, rid, "ag", "completed")
        create_heartbeat(repo, rid, "ag", 7200)
        assert rid not in self._active_ids(repo)
313
314
class TestRunCoordGc:
    """Unit tests for ``run_coord_gc``: dry-run, execution, grace period, orphans, intents."""

    def test_dry_run_removes_nothing(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
        """A dry run reports the expired reservation but leaves its file on disk."""
        from muse.core.coordination import _reservations_dir, run_coord_gc

        result = run_coord_gc(repo, dry_run=True, grace_period_seconds=0)
        assert result.dry_run is True
        assert result.reservations_removed >= 1
        # File still exists after dry run.
        path = _reservations_dir(repo) / f"{expired_reservation.reservation_id}.json"
        assert path.exists()

    def test_execute_removes_expired(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
        """An executed GC deletes the expired reservation file."""
        from muse.core.coordination import _reservations_dir, run_coord_gc

        result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
        assert result.reservations_removed >= 1
        path = _reservations_dir(repo) / f"{expired_reservation.reservation_id}.json"
        assert not path.exists()

    def test_active_reservation_not_collected(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """An unexpired reservation survives an executed GC."""
        from muse.core.coordination import _reservations_dir, run_coord_gc

        run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
        path = _reservations_dir(repo) / f"{reservation.reservation_id}.json"
        assert path.exists()

    def test_grace_period_protects_recently_expired(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
        """With a large grace period, the recently-expired record is not collected."""
        from muse.core.coordination import _reservations_dir, run_coord_gc

        result = run_coord_gc(repo, dry_run=False, grace_period_seconds=9999)
        assert result.reservations_removed == 0
        path = _reservations_dir(repo) / f"{expired_reservation.reservation_id}.json"
        assert path.exists()

    def test_removes_release_tombstone_with_reservation(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
        """The release tombstone of an expired reservation is deleted along with it."""
        from muse.core.coordination import (
            _releases_dir,
            create_release,
            run_coord_gc,
        )

        create_release(repo, expired_reservation.reservation_id, "ag", "completed")
        run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
        path = _releases_dir(repo) / f"{expired_reservation.reservation_id}.json"
        assert not path.exists()

    def test_removes_heartbeat_with_reservation(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
        """A back-dated heartbeat of an expired reservation is deleted by GC."""
        from muse.core.coordination import (
            _heartbeats_dir,
            create_heartbeat,
            run_coord_gc,
        )

        # The heartbeat must be back-dated too, so the effective expiry stays
        # in the past. Its return value is not needed; only the file matters.
        create_heartbeat(repo, expired_reservation.reservation_id, "ag", 1)
        path = _heartbeats_dir(repo) / f"{expired_reservation.reservation_id}.json"
        data = json.loads(path.read_text())
        data["extended_expires_at"] = (_now_utc() - datetime.timedelta(seconds=5)).isoformat()
        path.write_text(json.dumps(data))
        run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
        assert not path.exists()

    def test_removes_orphaned_release(self, repo: pathlib.Path) -> None:
        """Release file without a matching reservation is an orphan — collect it."""
        from muse.core.coordination import (
            _ensure_coord_dirs,
            _releases_dir,
            run_coord_gc,
        )

        _ensure_coord_dirs(repo)
        orphan_id = _new_id()
        orphan_path = _releases_dir(repo) / f"{orphan_id}.json"
        orphan_path.write_text(json.dumps({"reservation_id": orphan_id, "reason": "completed"}))
        result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
        assert result.releases_removed >= 1
        assert not orphan_path.exists()

    def test_removes_orphaned_heartbeat(self, repo: pathlib.Path) -> None:
        """Heartbeat file without a matching reservation is an orphan — collect it."""
        from muse.core.coordination import (
            _ensure_coord_dirs,
            _heartbeats_dir,
            run_coord_gc,
        )

        _ensure_coord_dirs(repo)
        orphan_id = _new_id()
        orphan_path = _heartbeats_dir(repo) / f"{orphan_id}.json"
        orphan_path.write_text(json.dumps({"reservation_id": orphan_id}))
        result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
        assert result.heartbeats_removed >= 1
        assert not orphan_path.exists()

    def test_include_intents_off_by_default(self, repo: pathlib.Path) -> None:
        """Without ``include_intents``, GC reports zero intents removed."""
        from muse.core.coordination import create_reservation, create_intent, run_coord_gc

        res = create_reservation(repo, run_id="ag", branch="b", addresses=["x::y"])
        create_intent(repo, res.reservation_id, "ag", "b", ["x::y"], "rename", "old→new")
        result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
        assert result.intents_removed == 0

    def test_include_intents_removes_old(self, repo: pathlib.Path) -> None:
        """With ``include_intents``, an intent older than the max age is deleted."""
        from muse.core.coordination import (
            _intents_dir,
            create_intent,
            create_reservation,
            run_coord_gc,
        )

        res = create_reservation(repo, run_id="ag", branch="b", addresses=["x::y"])
        create_intent(repo, res.reservation_id, "ag", "b", ["x::y"], "rename", "old→new")
        # Back-date the intent so it is older than max_intent_age_seconds.
        intent_path = next(_intents_dir(repo).glob("*.json"))
        data = json.loads(intent_path.read_text())
        data["created_at"] = (_now_utc() - datetime.timedelta(days=8)).isoformat()
        intent_path.write_text(json.dumps(data))
        result = run_coord_gc(
            repo,
            dry_run=False,
            grace_period_seconds=0,
            include_intents=True,
            max_intent_age_seconds=3600,
        )
        assert result.intents_removed == 1
        assert not intent_path.exists()

    def test_result_totals_match_parts(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
        """The total counters equal the sum of the per-kind counters."""
        from muse.core.coordination import run_coord_gc

        result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
        assert result.total_removed == (
            result.reservations_removed
            + result.releases_removed
            + result.heartbeats_removed
            + result.intents_removed
        )
        assert result.total_removed_bytes == (
            result.reservations_removed_bytes
            + result.releases_removed_bytes
            + result.heartbeats_removed_bytes
            + result.intents_removed_bytes
        )

    def test_duration_ms_recorded(self, repo: pathlib.Path) -> None:
        """The result carries a non-negative ``duration_ms``."""
        from muse.core.coordination import run_coord_gc

        result = run_coord_gc(repo, dry_run=True)
        assert result.duration_ms >= 0

    def test_removed_ids_populated(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
        """The collected reservation's id is listed in ``removed_ids``."""
        from muse.core.coordination import run_coord_gc

        result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
        assert expired_reservation.reservation_id in result.removed_ids
447
448
449 # ─────────────────────────────────────────────────────────────────────────────
450 # Integration tests — muse coord release
451 # ─────────────────────────────────────────────────────────────────────────────
452
453
class TestReleaseCmdSingle:
    """Integration tests for ``muse coord release`` with a single reservation id."""

    def _run(self, args: MsgpackDict, repo: pathlib.Path) -> int:
        """Invoke the release command from inside *repo* and return its exit code.

        *args* may carry ``reservation_id``, ``run_id``, ``reason`` and ``fmt``
        (``"text"`` turns JSON output off). Missing keys fall back to the
        defaults below. Returns ``0`` when the command returns normally or
        exits with a ``None`` code, otherwise the ``SystemExit`` code. The
        working directory is always restored.
        """
        import argparse
        import os

        from muse.cli.commands.release_coord import run as release_run

        ns = argparse.Namespace(
            reservation_id=args.get("reservation_id"),
            run_id=args.get("run_id", "agent-42"),
            reason=args.get("reason", "completed"),
            all_for_run=None,
            json_out=args.get("fmt", "json") != "text",
        )
        old = os.getcwd()
        os.chdir(repo)
        try:
            release_run(ns)
        except SystemExit as exc:
            # A bare sys.exit() carries code None, which means success.
            return 0 if exc.code is None else exc.code
        finally:
            os.chdir(old)
        return 0

    def test_release_success(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
        """A successful release exits 0 and prints the expected JSON fields."""
        code = self._run({"reservation_id": reservation.reservation_id}, repo)
        assert code == 0
        out = capsys.readouterr().out
        data = json.loads(out)
        assert data["status"] == "released"
        assert data["reservation_id"] == reservation.reservation_id
        assert data["reason"] == "completed"
        assert isinstance(data["duration_ms"], float)

    def test_release_reason_cancelled(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
        """The requested reason is echoed in the JSON output."""
        code = self._run(
            {"reservation_id": reservation.reservation_id, "reason": "cancelled"},
            repo,
        )
        assert code == 0
        out = capsys.readouterr().out
        data = json.loads(out)
        assert data["reason"] == "cancelled"

    def test_release_already_released_is_idempotent(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
        """Releasing an already-released reservation exits 0 with ``already_released``."""
        from muse.core.coordination import create_release

        create_release(repo, reservation.reservation_id, "ag", "completed")
        code = self._run({"reservation_id": reservation.reservation_id}, repo)
        assert code == 0
        out = capsys.readouterr().out
        data = json.loads(out)
        assert data["status"] == "already_released"

    def test_release_not_found_exits_4(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """An id with no reservation on disk exits 4 with ``not_found``."""
        nonexistent_id = fake_id("nonexistent-release")
        code = self._run({"reservation_id": nonexistent_id}, repo)
        assert code == 4  # ExitCode.NOT_FOUND
        out = capsys.readouterr().out
        data = json.loads(out)
        assert data["status"] == "not_found"

    def test_bad_record_id_exits_1(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A malformed reservation id exits 1."""
        code = self._run({"reservation_id": "not-a-record-id"}, repo)
        assert code == 1  # ExitCode.USER_ERROR

    def test_text_output(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
        """Text mode prints "released" and the first eight characters of the id."""
        # fmt="text" makes _run build the Namespace with json_out=False; the
        # exit code is deliberately ignored here, only the output is checked.
        self._run({"reservation_id": reservation.reservation_id, "fmt": "text"}, repo)
        out = capsys.readouterr().out
        assert "released" in out
        assert reservation.reservation_id[:8] in out
537
538
class TestReleaseCmdBatch:
    """Integration tests for ``muse coord release --all-for-run``."""

    def _run_batch(self, repo: pathlib.Path, all_for_run: str, run_id: str = "controller") -> None:
        """Invoke the release command in batch mode from inside *repo*.

        Any ``SystemExit`` is swallowed and the working directory is restored.
        """
        from muse.cli.commands.release_coord import run as release_run
        import argparse
        import os

        batch_args = argparse.Namespace(
            reservation_id=None,
            run_id=run_id,
            reason="completed",
            all_for_run=all_for_run,
            json_out=True,
        )
        start_dir = os.getcwd()
        os.chdir(repo)
        try:
            release_run(batch_args)
        except SystemExit:
            pass
        finally:
            os.chdir(start_dir)

    def test_batch_releases_all_for_run(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Every reservation of the named run is released; other runs are untouched."""
        from muse.core.coordination import create_reservation, load_released_ids

        batch_ids = [
            create_reservation(
                repo, run_id="batch-agent", branch="b", addresses=[f"f.py::fn{i}"]
            ).reservation_id
            for i in range(5)
        ]
        # A reservation owned by a different run must be left alone.
        bystander = create_reservation(
            repo, run_id="other-agent", branch="b", addresses=["g.py::fn"]
        )
        self._run_batch(repo, "batch-agent")
        released = load_released_ids(repo)
        assert all(rid in released for rid in batch_ids)
        assert bystander.reservation_id not in released

    def test_batch_skips_already_released(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
        """An already-released reservation is counted under ``skipped_already_released``."""
        from muse.core.coordination import create_release

        create_release(repo, reservation.reservation_id, "ag", "completed")
        self._run_batch(repo, "agent-test")
        report = json.loads(capsys.readouterr().out)
        assert report["skipped_already_released"] >= 1

    def test_cannot_combine_id_and_all_for_run(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
        """Passing both a reservation id and ``all_for_run`` exits 1."""
        from muse.cli.commands.release_coord import run as release_run
        import argparse
        import os

        conflicting = argparse.Namespace(
            reservation_id=reservation.reservation_id,
            run_id="ag",
            reason="completed",
            all_for_run="ag",
            json_out=True,
        )
        start_dir = os.getcwd()
        os.chdir(repo)
        try:
            with pytest.raises(SystemExit) as caught:
                release_run(conflicting)
        finally:
            os.chdir(start_dir)
        assert caught.value.code == 1  # ExitCode.USER_ERROR

    def test_must_specify_id_or_all_for_run(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Passing neither a reservation id nor ``all_for_run`` exits 1."""
        from muse.cli.commands.release_coord import run as release_run
        import argparse
        import os

        neither = argparse.Namespace(
            reservation_id=None,
            run_id="ag",
            reason="completed",
            all_for_run=None,
            json_out=True,
        )
        start_dir = os.getcwd()
        os.chdir(repo)
        try:
            with pytest.raises(SystemExit) as caught:
                release_run(neither)
        finally:
            os.chdir(start_dir)
        assert caught.value.code == 1  # ExitCode.USER_ERROR
623
624
625 # ─────────────────────────────────────────────────────────────────────────────
626 # Integration tests — muse coord heartbeat
627 # ─────────────────────────────────────────────────────────────────────────────
628
629
class TestHeartbeatCmd:
    """Integration tests for ``muse coord heartbeat``."""

    def _invoke(self, args: MsgpackDict, repo: pathlib.Path) -> None:
        """Call the heartbeat command from inside *repo*, letting ``SystemExit`` propagate.

        *args* may carry ``reservation_id``, ``run_id``, ``extension_seconds``
        and ``fmt`` (``"text"`` turns JSON output off). Missing keys fall back
        to the defaults below. The working directory is always restored.
        """
        import argparse
        import os

        from muse.cli.commands.heartbeat_coord import run as hb_run

        ns = argparse.Namespace(
            reservation_id=args.get("reservation_id"),
            run_id=args.get("run_id", "agent-42"),
            extension_seconds=args.get("extension_seconds", 3600),
            json_out=args.get("fmt", "json") != "text",
        )
        old = os.getcwd()
        os.chdir(repo)
        try:
            hb_run(ns)
        finally:
            os.chdir(old)

    def _run(self, args: MsgpackDict, repo: pathlib.Path) -> int:
        """Run the command, require that it exits, and return the exit code.

        ``pytest.raises`` fails the calling test if no ``SystemExit`` is
        raised, so this helper is only for the error-path tests.
        """
        with pytest.raises(SystemExit) as exc:
            self._invoke(args, repo)
        return exc.value.code

    def _run_no_exit(self, args: MsgpackDict, repo: pathlib.Path) -> None:
        """Run the command and ignore any ``SystemExit`` it raises."""
        try:
            self._invoke(args, repo)
        except SystemExit:
            # Deliberate: callers check output or on-disk state, not the code.
            pass

    def test_heartbeat_success(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
        """A successful heartbeat prints the expected JSON fields."""
        self._run_no_exit({"reservation_id": reservation.reservation_id}, repo)
        out = capsys.readouterr().out
        data = json.loads(out)
        assert data["status"] == "ok"
        assert data["reservation_id"] == reservation.reservation_id
        assert data["ttl_extended_seconds"] == 3600
        assert isinstance(data["duration_ms"], float)

    def test_heartbeat_extends_further_on_repeat(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
        """A second heartbeat with a longer extension records a later expiry."""
        from muse.core.coordination import load_heartbeat_map

        self._run_no_exit(
            {"reservation_id": reservation.reservation_id, "extension_seconds": 600},
            repo,
        )
        hb1 = load_heartbeat_map(repo)[reservation.reservation_id]
        self._run_no_exit(
            {"reservation_id": reservation.reservation_id, "extension_seconds": 7200},
            repo,
        )
        hb2 = load_heartbeat_map(repo)[reservation.reservation_id]
        assert hb2.extended_expires_at > hb1.extended_expires_at

    def test_heartbeat_makes_expired_active(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
        """A heartbeat moves a back-dated reservation back into the active set."""
        from muse.core.coordination import active_reservations

        # Before heartbeat, expired.
        assert expired_reservation.reservation_id not in [
            r.reservation_id for r in active_reservations(repo)
        ]
        self._run_no_exit(
            {"reservation_id": expired_reservation.reservation_id},
            repo,
        )
        # After heartbeat, active.
        assert expired_reservation.reservation_id in [
            r.reservation_id for r in active_reservations(repo)
        ]

    def test_heartbeat_refused_for_released(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
        """A heartbeat on a released reservation exits 1 with ``already_released``."""
        from muse.core.coordination import create_release

        create_release(repo, reservation.reservation_id, "ag", "completed")
        code = self._run({"reservation_id": reservation.reservation_id}, repo)
        assert code == 1
        out = capsys.readouterr().out
        data = json.loads(out)
        assert data["status"] == "already_released"

    def test_heartbeat_not_found_exits_4(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """An id with no reservation on disk exits 4 with ``not_found``."""
        nonexistent_id = fake_id("nonexistent-heartbeat")
        code = self._run({"reservation_id": nonexistent_id}, repo)
        assert code == 4  # ExitCode.NOT_FOUND
        out = capsys.readouterr().out
        data = json.loads(out)
        assert data["status"] == "not_found"

    def test_bad_record_id_exits_1(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A malformed reservation id exits 1."""
        code = self._run({"reservation_id": "bad//id"}, repo)
        assert code == 1  # ExitCode.USER_ERROR

    def test_negative_extension_exits_1(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
        """A negative extension exits 1."""
        code = self._run(
            {"reservation_id": reservation.reservation_id, "extension_seconds": -1},
            repo,
        )
        assert code == 1  # ExitCode.USER_ERROR

    def test_zero_extension_exits_1(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
        """A zero extension exits 1."""
        code = self._run(
            {"reservation_id": reservation.reservation_id, "extension_seconds": 0},
            repo,
        )
        assert code == 1  # ExitCode.USER_ERROR

    def test_text_output(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
        """Text mode prints "heartbeat" and the first eight characters of the id."""
        # fmt="text" makes the shared helper build the Namespace with json_out=False.
        self._run_no_exit(
            {"reservation_id": reservation.reservation_id, "fmt": "text"},
            repo,
        )
        out = capsys.readouterr().out
        assert "heartbeat" in out
        assert reservation.reservation_id[:8] in out
762
763
764 # ─────────────────────────────────────────────────────────────────────────────
765 # Integration tests — muse coord gc
766 # ─────────────────────────────────────────────────────────────────────────────
767
768
class TestCoordGcCmd:
    """Integration tests that drive ``muse coord gc`` through its ``run`` entry point."""

    def _run(self, args: MsgpackDict, repo: pathlib.Path) -> None:
        """Run the gc command from inside *repo*, overriding defaults with *args*.

        ``args["fmt"] == "text"`` selects text output; any other value, or a
        missing key, selects JSON.  ``SystemExit`` is swallowed so that callers
        can inspect the captured output whatever the exit code was.
        """
        import argparse
        import os

        from muse.cli.commands.coord_gc import run as gc_run

        fallbacks = {
            "execute": False,
            "grace_period_seconds": 300,
            "include_intents": False,
            "max_intent_age_seconds": 604800,
            "verbose": False,
        }
        options = {key: args.get(key, default) for key, default in fallbacks.items()}
        namespace = argparse.Namespace(
            json_out=args.get("fmt", "json") != "text",
            **options,
        )

        previous_cwd = os.getcwd()
        os.chdir(repo)
        try:
            gc_run(namespace)
        except SystemExit:
            pass
        finally:
            os.chdir(previous_cwd)

    def test_dry_run_by_default(
        self,
        repo: pathlib.Path,
        expired_reservation: Reservation,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """Without ``execute`` the JSON report has ``dry_run`` set to True."""
        self._run({}, repo)
        report = json.loads(capsys.readouterr().out)
        assert report["dry_run"] is True

    def test_dry_run_does_not_delete(
        self,
        repo: pathlib.Path,
        expired_reservation: Reservation,
    ) -> None:
        """A dry run leaves the expired reservation's file on disk."""
        from muse.core.coordination import _reservations_dir

        self._run({"grace_period_seconds": 0}, repo)
        record_file = _reservations_dir(repo) / f"{expired_reservation.reservation_id}.json"
        assert record_file.exists()

    def test_execute_deletes_expired(
        self,
        repo: pathlib.Path,
        expired_reservation: Reservation,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """With ``execute`` the expired reservation is reported and its file removed."""
        from muse.core.coordination import _reservations_dir

        self._run({"execute": True, "grace_period_seconds": 0}, repo)
        report = json.loads(capsys.readouterr().out)
        assert report["dry_run"] is False
        assert report["reservations_removed"] >= 1
        record_file = _reservations_dir(repo) / f"{expired_reservation.reservation_id}.json"
        assert not record_file.exists()

    def test_json_schema_complete(
        self,
        repo: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """The JSON report contains every key of the expected schema."""
        self._run({}, repo)
        report = json.loads(capsys.readouterr().out)
        required_keys = {
            "dry_run",
            "grace_period_seconds",
            "include_intents",
            "max_intent_age_seconds",
            "reservations_removed",
            "reservations_removed_bytes",
            "releases_removed",
            "releases_removed_bytes",
            "heartbeats_removed",
            "heartbeats_removed_bytes",
            "intents_removed",
            "intents_removed_bytes",
            "total_removed",
            "total_removed_bytes",
            "removed_ids",
            "duration_ms",
        }
        assert required_keys.issubset(report.keys())

    def test_duration_ms_is_float(
        self,
        repo: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """``duration_ms`` decodes from JSON as a float."""
        self._run({}, repo)
        report = json.loads(capsys.readouterr().out)
        assert isinstance(report["duration_ms"], float)

    def test_include_intents_false_preserves_intents(
        self,
        repo: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """An executing GC without ``include_intents`` reports zero intents removed."""
        from muse.core.coordination import create_intent, create_reservation

        held = create_reservation(repo, run_id="ag", branch="b", addresses=["x::y"])
        create_intent(repo, held.reservation_id, "ag", "b", ["x::y"], "rename", "")
        self._run({"execute": True, "grace_period_seconds": 0}, repo)
        report = json.loads(capsys.readouterr().out)
        assert report["intents_removed"] == 0

    def test_bad_grace_period_exits_1(self, repo: pathlib.Path) -> None:
        """A grace period of -1 makes the command exit with code 1."""
        import argparse
        import os

        from muse.cli.commands.coord_gc import run as gc_run

        # Built by hand rather than via _run, which would swallow SystemExit.
        namespace = argparse.Namespace(
            execute=False,
            grace_period_seconds=-1,
            include_intents=False,
            max_intent_age_seconds=604800,
            verbose=False,
            json_out=True,
        )
        previous_cwd = os.getcwd()
        os.chdir(repo)
        try:
            with pytest.raises(SystemExit) as exc:
                gc_run(namespace)
        finally:
            os.chdir(previous_cwd)
        assert exc.value.code == 1  # ExitCode.USER_ERROR

    def test_text_output_dry_run(
        self,
        repo: pathlib.Path,
        expired_reservation: Reservation,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """Text output of a dry run contains ``DRY RUN``."""
        self._run({"fmt": "text", "grace_period_seconds": 0}, repo)
        printed = capsys.readouterr().out
        assert "DRY RUN" in printed

    def test_text_output_execute(
        self,
        repo: pathlib.Path,
        expired_reservation: Reservation,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """Text output of an executing run contains ``GC complete``."""
        self._run({"fmt": "text", "execute": True, "grace_period_seconds": 0}, repo)
        printed = capsys.readouterr().out
        assert "GC complete" in printed

    def test_verbose_lists_removed_ids(
        self,
        repo: pathlib.Path,
        expired_reservation: Reservation,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """Verbose text output names the collected reservation id."""
        request = {
            "fmt": "text",
            "execute": True,
            "grace_period_seconds": 0,
            "verbose": True,
        }
        self._run(request, repo)
        printed = capsys.readouterr().out
        assert expired_reservation.reservation_id in printed

    def test_nothing_to_collect(
        self,
        repo: pathlib.Path,
        reservation: Reservation,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """Active reservation → nothing collected."""
        self._run({"execute": True, "grace_period_seconds": 300}, repo)
        report = json.loads(capsys.readouterr().out)
        assert report["total_removed"] == 0

    def test_fmt_bytes_helper(self) -> None:
        """``_fmt_bytes`` renders byte counts in B, KiB and MiB."""
        from muse.cli.commands.coord_gc import _fmt_bytes

        expected_by_size = {
            0: "0 B",
            512: "512 B",
            1024: "1.0 KiB",
            1048576: "1.0 MiB",
        }
        for size, rendered in expected_by_size.items():
            assert _fmt_bytes(size) == rendered
893
894
895 # ─────────────────────────────────────────────────────────────────────────────
896 # Integration tests — muse coord list (lifecycle-aware)
897 # ─────────────────────────────────────────────────────────────────────────────
898
899
class TestListCoordLifecycle:
    """Integration tests for the lifecycle-aware fields of ``muse coord list``."""

    def _run_json(self, repo: pathlib.Path, extra_args: MsgpackDict | None = None) -> None:
        """Run the list command in JSON mode from inside *repo*.

        *extra_args* are passed through as additional namespace attributes.
        Unlike the other command runners in this file, ``SystemExit`` is not
        caught here.
        """
        import argparse
        import os

        from muse.cli.commands.list_coord import run as list_run

        overrides = extra_args or {}
        namespace = argparse.Namespace(
            include_expired=True,
            kind="all",
            run_id=None,
            branch=None,
            address_glob=None,
            operation=None,
            limit=None,
            summary=False,
            json_out=True,
            **overrides,
        )
        previous_cwd = os.getcwd()
        os.chdir(repo)
        try:
            list_run(namespace)
        finally:
            os.chdir(previous_cwd)

    def test_released_field_present(
        self,
        repo: pathlib.Path,
        reservation: Reservation,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """An unreleased reservation is listed with ``released`` set to False."""
        self._run_json(repo)
        listing = json.loads(capsys.readouterr().out)
        by_id = {item["reservation_id"]: item for item in listing["reservations"]}
        record = by_id[reservation.reservation_id]
        assert "released" in record
        assert record["released"] is False

    def test_released_field_true_after_release(
        self,
        repo: pathlib.Path,
        reservation: Reservation,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """After ``create_release`` the entry is released and no longer active."""
        from muse.core.coordination import create_release

        create_release(repo, reservation.reservation_id, "ag", "completed")
        self._run_json(repo)
        listing = json.loads(capsys.readouterr().out)
        by_id = {item["reservation_id"]: item for item in listing["reservations"]}
        record = by_id[reservation.reservation_id]
        assert record["released"] is True
        assert record["is_active"] is False

    def test_effective_expires_at_field(
        self,
        repo: pathlib.Path,
        reservation: Reservation,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """The first listed entry carries a parseable ``effective_expires_at``."""
        self._run_json(repo)
        listing = json.loads(capsys.readouterr().out)
        first = listing["reservations"][0]
        assert "effective_expires_at" in first
        # fromisoformat raises if the value is not a valid ISO timestamp.
        datetime.datetime.fromisoformat(first["effective_expires_at"])

    def test_effective_expires_extended_by_heartbeat(
        self,
        repo: pathlib.Path,
        expired_reservation: Reservation,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """A heartbeat on an expired reservation moves its effective expiry into the future."""
        from muse.core.coordination import create_heartbeat

        create_heartbeat(repo, expired_reservation.reservation_id, "ag", 7200)
        self._run_json(repo)
        listing = json.loads(capsys.readouterr().out)
        by_id = {item["reservation_id"]: item for item in listing["reservations"]}
        record = by_id[expired_reservation.reservation_id]
        effective_expiry = datetime.datetime.fromisoformat(record["effective_expires_at"])
        assert effective_expiry > _now_utc()
        assert record["ttl_remaining_seconds"] > 0

    def test_released_reservations_count_in_json(
        self,
        repo: pathlib.Path,
        reservation: Reservation,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """The JSON listing has a ``released_reservations`` count of at least 1."""
        from muse.core.coordination import create_release

        create_release(repo, reservation.reservation_id, "ag", "completed")
        self._run_json(repo)
        listing = json.loads(capsys.readouterr().out)
        assert "released_reservations" in listing
        assert listing["released_reservations"] >= 1
973
974
975 # ─────────────────────────────────────────────────────────────────────────────
976 # Security tests
977 # ─────────────────────────────────────────────────────────────────────────────
978
979
class TestSecurity:
    """Security tests: hostile reservation ids and GC path confinement."""

    def test_release_record_id_injection_rejected(self, repo: pathlib.Path) -> None:
        """``create_release`` raises for every hostile reservation id.

        Either ``ValueError`` or ``OSError`` counts as a rejection.
        """
        from muse.core.coordination import create_release

        attacks = [
            "../../etc/passwd",
            "../releases/legit-id",
            "a" * 200,
            "\x00malicious",
            "' OR 1=1 --",
        ]
        for attack in attacks:
            # FileNotFoundError is a subclass of OSError, so OSError alone
            # already covers it.
            with pytest.raises((ValueError, OSError)):
                create_release(repo, attack, "ag", "completed")

    def test_heartbeat_record_id_injection_rejected(self, repo: pathlib.Path) -> None:
        """``create_heartbeat`` raises ``ValueError`` for every hostile reservation id."""
        from muse.core.coordination import create_heartbeat

        attacks = ["../../etc/passwd", "../heartbeats/x", "not-a-record-id"]
        for attack in attacks:
            with pytest.raises(ValueError):
                create_heartbeat(repo, attack, "ag", 3600)

    def test_release_cmd_rejects_traversal(
        self,
        repo: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """The release command exits with code 1 for a path-traversal reservation id."""
        import argparse
        import os

        from muse.cli.commands.release_coord import run as release_run

        ns = argparse.Namespace(
            reservation_id="../../etc/passwd",
            run_id="ag",
            reason="completed",
            all_for_run=None,
            json_out=True,
        )
        # The command is run from inside the repo directory; always restore the cwd.
        old = os.getcwd()
        os.chdir(repo)
        try:
            with pytest.raises(SystemExit) as exc:
                release_run(ns)
        finally:
            os.chdir(old)
        assert exc.value.code == 1  # ExitCode.USER_ERROR

    def test_heartbeat_cmd_rejects_traversal(
        self,
        repo: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
    ) -> None:
        """The heartbeat command exits with code 1 for a path-traversal reservation id."""
        import argparse
        import os

        from muse.cli.commands.heartbeat_coord import run as hb_run

        ns = argparse.Namespace(
            reservation_id="../../etc/shadow",
            run_id="ag",
            extension_seconds=3600,
            json_out=True,
        )
        old = os.getcwd()
        os.chdir(repo)
        try:
            with pytest.raises(SystemExit) as exc:
                hb_run(ns)
        finally:
            os.chdir(old)
        assert exc.value.code == 1  # ExitCode.USER_ERROR

    def test_gc_does_not_escape_coord_dir(self, repo: pathlib.Path) -> None:
        """GC only scans known subdirs — no user input used for path construction."""
        from muse.core.coordination import run_coord_gc

        # Create a file outside .muse/coordination/ — GC must not touch it.
        sentinel = repo / "DO_NOT_DELETE.txt"
        sentinel.write_text("safe")
        run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
        assert sentinel.exists()
1046
1047
1048 # ─────────────────────────────────────────────────────────────────────────────
1049 # Stress tests
1050 # ─────────────────────────────────────────────────────────────────────────────
1051
1052
class TestStress:
    """Stress tests: wall-clock budgets and bulk correctness for coordination records."""

    def test_batch_release_200_reservations_under_2s(self, repo: pathlib.Path) -> None:
        """Releasing 200 reservations via --all-for-run completes in < 2 s."""
        import argparse
        import os

        from muse.cli.commands.release_coord import run as release_run
        from muse.core.coordination import create_reservation, load_released_ids

        N = 200
        for i in range(N):
            create_reservation(
                repo,
                run_id="stress-agent",
                branch="feat/stress",
                addresses=[f"src/mod{i}.py::fn{i}"],
            )

        ns = argparse.Namespace(
            reservation_id=None,
            run_id="controller",
            reason="completed",
            all_for_run="stress-agent",
            json_out=True,
        )
        old = os.getcwd()
        os.chdir(repo)
        # Only the release command itself is timed, not the fixture setup above.
        t0 = time.monotonic()
        try:
            release_run(ns)
        except SystemExit:
            # The exit code is not checked; the released count below is.
            pass
        finally:
            os.chdir(old)
        elapsed = time.monotonic() - t0

        released = load_released_ids(repo)
        assert len(released) == N
        assert elapsed < 2.0, f"batch release took {elapsed:.2f}s — too slow"

    def test_gc_500_expired_reservations_under_3s(self, repo: pathlib.Path) -> None:
        """GC collects 500 expired reservations in < 3 s."""
        from muse.core.coordination import (
            _ensure_coord_dirs,
            _reservations_dir,
            run_coord_gc,
        )

        _ensure_coord_dirs(repo)
        res_dir = _reservations_dir(repo)
        now = _now_utc()
        # Both timestamps are the same for every record, so build them once:
        # created three hours ago, expired two hours ago.
        created_at = (now - datetime.timedelta(hours=3)).isoformat()
        expired_at = (now - datetime.timedelta(hours=2)).isoformat()

        N = 500
        for i in range(N):
            rid = _new_id()
            # Records are written directly as JSON files in the reservations dir.
            data = {
                "schema_version": "0.0.0",
                "reservation_id": rid,
                "run_id": f"ag-{i}",
                "branch": "feat/x",
                "addresses": [f"f{i}.py::fn"],
                "created_at": created_at,
                "expires_at": expired_at,
                "operation": None,
            }
            (res_dir / f"{rid}.json").write_text(json.dumps(data))

        t0 = time.monotonic()
        result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
        elapsed = time.monotonic() - t0

        assert result.reservations_removed == N
        assert elapsed < 3.0, f"GC took {elapsed:.2f}s — too slow"

    def test_load_heartbeat_map_100_entries_under_500ms(self, repo: pathlib.Path) -> None:
        """Loading 100 heartbeat files completes in < 500 ms."""
        from muse.core.coordination import (
            _ensure_coord_dirs,
            _heartbeats_dir,
            load_heartbeat_map,
        )

        _ensure_coord_dirs(repo)
        hb_dir = _heartbeats_dir(repo)
        now = _now_utc()
        # Both timestamps are the same for every record, so build them once.
        last_beat_at = now.isoformat()
        extended_expires_at = (now + datetime.timedelta(hours=1)).isoformat()

        N = 100
        for i in range(N):
            rid = _new_id()
            data = {
                "schema_version": "0.0.0",
                "reservation_id": rid,
                "run_id": f"ag-{i}",
                "last_beat_at": last_beat_at,
                "extended_expires_at": extended_expires_at,
            }
            (hb_dir / f"{rid}.json").write_text(json.dumps(data))

        t0 = time.monotonic()
        hb_map = load_heartbeat_map(repo)
        elapsed = time.monotonic() - t0

        assert len(hb_map) == N
        assert elapsed < 0.5, f"load_heartbeat_map took {elapsed:.3f}s — too slow"

    def test_round_trip_correctness_under_concurrent_writes(self, repo: pathlib.Path) -> None:
        """release → heartbeat attempt → gc correctness under rapid writes."""
        from muse.core.coordination import (
            active_reservations,
            create_heartbeat,
            create_release,
            create_reservation,
            run_coord_gc,
        )

        N = 50
        reservations = [
            create_reservation(
                repo, run_id=f"ag-{i}", branch="b", addresses=[f"f{i}.py::fn"]
            )
            for i in range(N)
        ]
        released_half = reservations[: N // 2]
        heartbeat_half = reservations[N // 2 :]

        # Release half.
        for res in released_half:
            create_release(repo, res.reservation_id, res.run_id, "completed")

        # Heartbeat the other half.
        for res in heartbeat_half:
            create_heartbeat(repo, res.reservation_id, res.run_id, 7200)

        active = active_reservations(repo)
        active_ids = {r.reservation_id for r in active}

        # Released half must not be active.
        for res in released_half:
            assert res.reservation_id not in active_ids

        # Heartbeat half must be active.
        for res in heartbeat_half:
            assert res.reservation_id in active_ids

        # GC with large grace period → nothing removed yet.
        result = run_coord_gc(repo, dry_run=False, grace_period_seconds=9999)
        assert result.reservations_removed == 0
File History 1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 6 days ago