gabriel / muse public
test_cmd_coord_lifecycle.py python
1,182 lines 52.1 KB
Raw
sha256:1c4b3e3a9a1f300774c3ee662b572a698d5fd405bf765a71e6011a2e9c3eaaaa feat: Muse — version control for the agent era Human 72 days ago
1 """Tests for ``muse coord release``, ``muse coord heartbeat``, and ``muse coord gc``.
2
3 Coverage matrix
4 ---------------
5 Unit
6 ~~~~
7 * :func:`muse.core.coordination.create_release` — write, idempotency, UUID guard
8 * :func:`muse.core.coordination.load_all_releases` — load, corrupt-file tolerance
9 * :func:`muse.core.coordination.load_released_ids` — efficiency path (stems only)
10 * :func:`muse.core.coordination.create_heartbeat` — write, atomic overwrite
11 * :func:`muse.core.coordination.load_heartbeat_map` — load, corrupt-file tolerance
12 * :func:`muse.core.coordination.run_coord_gc` — dry-run, execute, grace period, orphans
13 * :func:`muse.core.coordination.active_reservations` — lifecycle-aware liveness
14
15 Integration
16 ~~~~~~~~~~~
17 * ``muse coord release`` — single, --all-for-run, idempotent, not-found, bad args
18 * ``muse coord heartbeat`` — extends TTL, released guard, not-found, bad args
19 * ``muse coord gc`` — dry-run, execute, --include-intents, --verbose, JSON output
20 * ``muse coord list`` — lifecycle-aware fields (released, effective_expires_at)
21
22 Security
23 ~~~~~~~~
24 * UUID injection in reservation_id → rejected before file I/O
25 * Path traversal attempts via reservation_id → ValueError from _validate_reservation_id
26 * ANSI escape sequences in run_id / reservation_id not written to files
27 * GC does not follow symlinks or traverse outside .muse/coordination/
28
29 Stress
30 ~~~~~~
31 * 200 reservations → release all via --all-for-run in < 2 s
32 * 500 expired reservations → GC collects all in < 3 s
33 * 100 heartbeats → load_heartbeat_map in < 0.5 s
34 * release + heartbeat + gc round-trip correctness under concurrent-ish writes
35 """
36
37 from __future__ import annotations
38
39 import datetime
40 import json
41 import pathlib
42 import time
43 import uuid
44
45 import pytest
46
47 from muse.core._types import MsgpackDict
48 from muse.core.coordination import Reservation
49
50 # ── Helpers ───────────────────────────────────────────────────────────────────
51
52
53 def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
54 """Initialise a minimal muse repo (just the .muse dir + HEAD)."""
55 muse_dir = tmp_path / ".muse"
56 muse_dir.mkdir()
57 (muse_dir / "HEAD").write_text("ref: refs/heads/main\n")
58 return tmp_path
59
60
61 def _now_utc() -> datetime.datetime:
62 return datetime.datetime.now(datetime.timezone.utc)
63
64
65 def _dt_iso(dt: datetime.datetime) -> str:
66 return dt.isoformat()
67
68
69 # ── Fixtures ──────────────────────────────────────────────────────────────────
70
71
72 @pytest.fixture()
73 def repo(tmp_path: pathlib.Path) -> pathlib.Path:
74 return _make_repo(tmp_path)
75
76
77 @pytest.fixture()
78 def reservation(repo: pathlib.Path) -> Reservation:
79 """A fresh active reservation in *repo*."""
80 from muse.core.coordination import create_reservation
81 return create_reservation(
82 repo,
83 run_id="agent-test",
84 branch="feat/test",
85 addresses=["src/billing.py::compute_total"],
86 ttl_seconds=3600,
87 )
88
89
90 @pytest.fixture()
91 def expired_reservation(repo: pathlib.Path) -> Reservation:
92 """A reservation whose TTL has already passed."""
93 from muse.core.coordination import create_reservation
94 res = create_reservation(
95 repo,
96 run_id="agent-expired",
97 branch="feat/expired",
98 addresses=["src/old.py::dead_code"],
99 ttl_seconds=1,
100 )
101 # Back-date expires_at on disk so it's expired.
102 import json as _json
103 from muse.core.coordination import _reservations_dir
104 path = _reservations_dir(repo) / f"{res.reservation_id}.json"
105 data = _json.loads(path.read_text())
106 data["expires_at"] = (_now_utc() - datetime.timedelta(seconds=10)).isoformat()
107 path.write_text(_json.dumps(data))
108 res.expires_at = _now_utc() - datetime.timedelta(seconds=10)
109 return res
110
111
112 # ─────────────────────────────────────────────────────────────────────────────
113 # Unit tests — coordination.py primitives
114 # ─────────────────────────────────────────────────────────────────────────────
115
116
117 class TestCreateRelease:
118 def test_creates_tombstone_file(self, repo: pathlib.Path, reservation: Reservation) -> None:
119 from muse.core.coordination import _releases_dir, create_release
120 rel = create_release(repo, reservation.reservation_id, "agent-test", "completed")
121 path = _releases_dir(repo) / f"{reservation.reservation_id}.json"
122 assert path.exists()
123 assert rel.reason == "completed"
124
125 def test_roundtrip_from_dict(self, repo: pathlib.Path, reservation: Reservation) -> None:
126 from muse.core.coordination import Release, create_release
127 rel = create_release(repo, reservation.reservation_id, "agent-test", "cancelled")
128 data = rel.to_dict()
129 rel2 = Release.from_dict(data)
130 assert rel2.reservation_id == rel.reservation_id
131 assert rel2.reason == "cancelled"
132
133 def test_raises_file_exists_on_double_release(self, repo: pathlib.Path, reservation: Reservation) -> None:
134 from muse.core.coordination import create_release
135 create_release(repo, reservation.reservation_id, "agent-test", "completed")
136 with pytest.raises(FileExistsError, match="already released"):
137 create_release(repo, reservation.reservation_id, "agent-test", "cancelled")
138
139 def test_rejects_invalid_uuid(self, repo: pathlib.Path) -> None:
140 from muse.core.coordination import create_release
141 with pytest.raises(ValueError, match="valid UUID"):
142 create_release(repo, "../../etc/passwd", "agent", "completed")
143
144 def test_rejects_invalid_reason(self, repo: pathlib.Path, reservation: Reservation) -> None:
145 from muse.core.coordination import create_release
146 with pytest.raises(ValueError, match="reason must be one of"):
147 create_release(repo, reservation.reservation_id, "agent", "unknown-reason")
148
149 def test_valid_reasons(self, repo: pathlib.Path) -> None:
150 from muse.core.coordination import create_reservation, create_release
151 for reason in ("completed", "cancelled", "superseded"):
152 res = create_reservation(repo, run_id="ag", branch="b", addresses=["x::y"])
153 rel = create_release(repo, res.reservation_id, "ag", reason)
154 assert rel.reason == reason
155
156 def test_path_containment_check(self, repo: pathlib.Path, reservation: Reservation) -> None:
157 """UUID validation prevents path traversal before containment check."""
158 from muse.core.coordination import create_release
159 bad_ids = [
160 "../../../etc",
161 "a/b/c",
162 "not-a-uuid",
163 "",
164 ]
165 for bad_id in bad_ids:
166 with pytest.raises(ValueError):
167 create_release(repo, bad_id, "agent", "completed")
168
169
170 class TestLoadReleases:
171 def test_empty_dir(self, repo: pathlib.Path) -> None:
172 from muse.core.coordination import load_all_releases, load_released_ids
173 assert load_all_releases(repo) == []
174 assert load_released_ids(repo) == frozenset()
175
176 def test_missing_dir_returns_empty(self, tmp_path: pathlib.Path) -> None:
177 repo = _make_repo(tmp_path)
178 from muse.core.coordination import load_all_releases, load_released_ids
179 assert load_all_releases(repo) == []
180 assert load_released_ids(repo) == frozenset()
181
182 def test_load_released_ids_uses_stems(self, repo: pathlib.Path, reservation: Reservation) -> None:
183 from muse.core.coordination import create_release, load_released_ids
184 create_release(repo, reservation.reservation_id, "ag", "completed")
185 ids = load_released_ids(repo)
186 assert reservation.reservation_id in ids
187
188 def test_corrupt_file_skipped(self, repo: pathlib.Path, reservation: Reservation) -> None:
189 from muse.core.coordination import _releases_dir, create_release, load_all_releases
190 create_release(repo, reservation.reservation_id, "ag", "completed")
191 # Write a corrupt file alongside.
192 (_releases_dir(repo) / "corrupt.json").write_text("not-json")
193 releases = load_all_releases(repo)
194 # Only the valid one is loaded; corrupt one is silently skipped.
195 assert len(releases) == 1
196
197 def test_load_released_ids_includes_corrupt_stem(self, repo: pathlib.Path) -> None:
198 """load_released_ids uses stems only — corrupt JSON doesn't block it."""
199 from muse.core.coordination import _releases_dir, _ensure_coord_dirs, load_released_ids
200 _ensure_coord_dirs(repo)
201 fake_id = str(uuid.uuid4())
202 (_releases_dir(repo) / f"{fake_id}.json").write_text("not-json")
203 ids = load_released_ids(repo)
204 assert fake_id in ids
205
206
207 class TestCreateHeartbeat:
208 def test_creates_heartbeat_file(self, repo: pathlib.Path, reservation: Reservation) -> None:
209 from muse.core.coordination import _heartbeats_dir, create_heartbeat
210 hb = create_heartbeat(repo, reservation.reservation_id, "agent-test", 3600)
211 path = _heartbeats_dir(repo) / f"{reservation.reservation_id}.json"
212 assert path.exists()
213 assert hb.extended_expires_at > _now_utc()
214
215 def test_atomic_overwrite(self, repo: pathlib.Path, reservation: Reservation) -> None:
216 from muse.core.coordination import create_heartbeat
217 hb1 = create_heartbeat(repo, reservation.reservation_id, "ag", 600)
218 hb2 = create_heartbeat(repo, reservation.reservation_id, "ag", 7200)
219 # Second heartbeat extends further.
220 assert hb2.extended_expires_at > hb1.extended_expires_at
221
222 def test_rejects_invalid_uuid(self, repo: pathlib.Path) -> None:
223 from muse.core.coordination import create_heartbeat
224 with pytest.raises(ValueError, match="valid UUID"):
225 create_heartbeat(repo, "not/a-uuid", "ag", 3600)
226
227 def test_rejects_non_positive_extension(self, repo: pathlib.Path, reservation: Reservation) -> None:
228 from muse.core.coordination import create_heartbeat
229 with pytest.raises(ValueError, match="extension_seconds"):
230 create_heartbeat(repo, reservation.reservation_id, "ag", 0)
231 with pytest.raises(ValueError, match="extension_seconds"):
232 create_heartbeat(repo, reservation.reservation_id, "ag", -1)
233
234 def test_roundtrip(self, repo: pathlib.Path, reservation: Reservation) -> None:
235 from muse.core.coordination import Heartbeat, create_heartbeat
236 hb = create_heartbeat(repo, reservation.reservation_id, "ag", 1800)
237 data = hb.to_dict()
238 hb2 = Heartbeat.from_dict(data)
239 assert hb2.reservation_id == hb.reservation_id
240 assert hb2.run_id == hb.run_id
241
242
243 class TestLoadHeartbeatMap:
244 def test_empty(self, repo: pathlib.Path) -> None:
245 from muse.core.coordination import load_heartbeat_map
246 assert load_heartbeat_map(repo) == {}
247
248 def test_missing_dir_returns_empty(self, tmp_path: pathlib.Path) -> None:
249 repo = _make_repo(tmp_path)
250 from muse.core.coordination import load_heartbeat_map
251 assert load_heartbeat_map(repo) == {}
252
253 def test_load_single(self, repo: pathlib.Path, reservation: Reservation) -> None:
254 from muse.core.coordination import create_heartbeat, load_heartbeat_map
255 create_heartbeat(repo, reservation.reservation_id, "ag", 3600)
256 hb_map = load_heartbeat_map(repo)
257 assert reservation.reservation_id in hb_map
258
259 def test_corrupt_file_skipped(self, repo: pathlib.Path, reservation: Reservation) -> None:
260 from muse.core.coordination import (
261 _heartbeats_dir, create_heartbeat, load_heartbeat_map,
262 )
263 create_heartbeat(repo, reservation.reservation_id, "ag", 3600)
264 (_heartbeats_dir(repo) / "corrupt.json").write_text("bad-json")
265 hb_map = load_heartbeat_map(repo)
266 assert len(hb_map) == 1
267
268
269 class TestActiveReservations:
270 def test_active_reservation_included(self, repo: pathlib.Path, reservation: Reservation) -> None:
271 from muse.core.coordination import active_reservations
272 active = active_reservations(repo)
273 ids = [r.reservation_id for r in active]
274 assert reservation.reservation_id in ids
275
276 def test_released_excluded(self, repo: pathlib.Path, reservation: Reservation) -> None:
277 from muse.core.coordination import active_reservations, create_release
278 create_release(repo, reservation.reservation_id, "ag", "completed")
279 active = active_reservations(repo)
280 ids = [r.reservation_id for r in active]
281 assert reservation.reservation_id not in ids
282
283 def test_expired_excluded(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
284 from muse.core.coordination import active_reservations
285 active = active_reservations(repo)
286 ids = [r.reservation_id for r in active]
287 assert expired_reservation.reservation_id not in ids
288
289 def test_heartbeat_extends_expired(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
290 from muse.core.coordination import active_reservations, create_heartbeat
291 # Heartbeat extends the expired reservation.
292 create_heartbeat(repo, expired_reservation.reservation_id, "ag", 3600)
293 active = active_reservations(repo)
294 ids = [r.reservation_id for r in active]
295 assert expired_reservation.reservation_id in ids
296
297 def test_released_with_heartbeat_excluded(self, repo: pathlib.Path, reservation: Reservation) -> None:
298 """A released reservation stays excluded even if a heartbeat exists."""
299 from muse.core.coordination import (
300 active_reservations, create_heartbeat, create_release,
301 )
302 create_release(repo, reservation.reservation_id, "ag", "completed")
303 create_heartbeat(repo, reservation.reservation_id, "ag", 7200)
304 active = active_reservations(repo)
305 ids = [r.reservation_id for r in active]
306 assert reservation.reservation_id not in ids
307
308
309 class TestRunCoordGc:
310 def test_dry_run_removes_nothing(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
311 from muse.core.coordination import _reservations_dir, run_coord_gc
312 result = run_coord_gc(repo, dry_run=True, grace_period_seconds=0)
313 assert result.dry_run is True
314 assert result.reservations_removed >= 1
315 # File still exists after dry run.
316 path = _reservations_dir(repo) / f"{expired_reservation.reservation_id}.json"
317 assert path.exists()
318
319 def test_execute_removes_expired(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
320 from muse.core.coordination import _reservations_dir, run_coord_gc
321 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
322 assert result.reservations_removed >= 1
323 path = _reservations_dir(repo) / f"{expired_reservation.reservation_id}.json"
324 assert not path.exists()
325
326 def test_active_reservation_not_collected(self, repo: pathlib.Path, reservation: Reservation) -> None:
327 from muse.core.coordination import _reservations_dir, run_coord_gc
328 run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
329 path = _reservations_dir(repo) / f"{reservation.reservation_id}.json"
330 assert path.exists()
331
332 def test_grace_period_protects_recently_expired(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
333 """With a large grace period, the recently-expired record is not collected."""
334 from muse.core.coordination import _reservations_dir, run_coord_gc
335 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=9999)
336 assert result.reservations_removed == 0
337 path = _reservations_dir(repo) / f"{expired_reservation.reservation_id}.json"
338 assert path.exists()
339
340 def test_removes_release_tombstone_with_reservation(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
341 from muse.core.coordination import (
342 _releases_dir, create_release, run_coord_gc,
343 )
344 create_release(repo, expired_reservation.reservation_id, "ag", "completed")
345 run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
346 path = _releases_dir(repo) / f"{expired_reservation.reservation_id}.json"
347 assert not path.exists()
348
349 def test_removes_heartbeat_with_reservation(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
350 from muse.core.coordination import (
351 _heartbeats_dir, create_heartbeat, run_coord_gc,
352 )
353 # Need to back-date the heartbeat too, so effective expiry is still in the past.
354 hb = create_heartbeat(repo, expired_reservation.reservation_id, "ag", 1)
355 # Back-date extended_expires_at.
356 import json as _json
357 path = _heartbeats_dir(repo) / f"{expired_reservation.reservation_id}.json"
358 data = _json.loads(path.read_text())
359 data["extended_expires_at"] = (_now_utc() - datetime.timedelta(seconds=5)).isoformat()
360 path.write_text(_json.dumps(data))
361 run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
362 assert not path.exists()
363
364 def test_removes_orphaned_release(self, repo: pathlib.Path) -> None:
365 """Release file without a matching reservation is an orphan — collect it."""
366 from muse.core.coordination import (
367 _ensure_coord_dirs, _releases_dir, run_coord_gc,
368 )
369 _ensure_coord_dirs(repo)
370 orphan_id = str(uuid.uuid4())
371 orphan_path = _releases_dir(repo) / f"{orphan_id}.json"
372 orphan_path.write_text(json.dumps({"reservation_id": orphan_id, "reason": "completed"}))
373 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
374 assert result.releases_removed >= 1
375 assert not orphan_path.exists()
376
377 def test_removes_orphaned_heartbeat(self, repo: pathlib.Path) -> None:
378 """Heartbeat file without a matching reservation is an orphan — collect it."""
379 from muse.core.coordination import (
380 _ensure_coord_dirs, _heartbeats_dir, run_coord_gc,
381 )
382 _ensure_coord_dirs(repo)
383 orphan_id = str(uuid.uuid4())
384 orphan_path = _heartbeats_dir(repo) / f"{orphan_id}.json"
385 orphan_path.write_text(json.dumps({"reservation_id": orphan_id}))
386 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
387 assert result.heartbeats_removed >= 1
388 assert not orphan_path.exists()
389
390 def test_include_intents_off_by_default(self, repo: pathlib.Path) -> None:
391 from muse.core.coordination import create_reservation, create_intent, run_coord_gc
392 res = create_reservation(repo, run_id="ag", branch="b", addresses=["x::y"])
393 create_intent(repo, res.reservation_id, "ag", "b", ["x::y"], "rename", "old→new")
394 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
395 assert result.intents_removed == 0
396
397 def test_include_intents_removes_old(self, repo: pathlib.Path) -> None:
398 from muse.core.coordination import (
399 _intents_dir, create_intent, create_reservation, run_coord_gc,
400 )
401 res = create_reservation(repo, run_id="ag", branch="b", addresses=["x::y"])
402 create_intent(repo, res.reservation_id, "ag", "b", ["x::y"], "rename", "old→new")
403 # Back-date the intent.
404 intent_path = next(_intents_dir(repo).glob("*.json"))
405 import json as _json
406 data = _json.loads(intent_path.read_text())
407 data["created_at"] = (_now_utc() - datetime.timedelta(days=8)).isoformat()
408 intent_path.write_text(_json.dumps(data))
409 result = run_coord_gc(
410 repo, dry_run=False, grace_period_seconds=0,
411 include_intents=True, max_intent_age_seconds=3600,
412 )
413 assert result.intents_removed == 1
414 assert not intent_path.exists()
415
416 def test_result_totals_match_parts(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
417 from muse.core.coordination import run_coord_gc
418 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
419 assert result.total_removed == (
420 result.reservations_removed
421 + result.releases_removed
422 + result.heartbeats_removed
423 + result.intents_removed
424 )
425 assert result.total_removed_bytes == (
426 result.reservations_removed_bytes
427 + result.releases_removed_bytes
428 + result.heartbeats_removed_bytes
429 + result.intents_removed_bytes
430 )
431
432 def test_elapsed_seconds_recorded(self, repo: pathlib.Path) -> None:
433 from muse.core.coordination import run_coord_gc
434 result = run_coord_gc(repo, dry_run=True)
435 assert result.elapsed_seconds >= 0
436
437 def test_removed_ids_populated(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
438 from muse.core.coordination import run_coord_gc
439 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
440 assert expired_reservation.reservation_id in result.removed_ids
441
442
443 # ─────────────────────────────────────────────────────────────────────────────
444 # Integration tests — muse coord release
445 # ─────────────────────────────────────────────────────────────────────────────
446
447
448 class TestReleaseCmdSingle:
449 def _run(self, args: MsgpackDict, repo: pathlib.Path) -> int:
450 from muse.cli.commands.release_coord import run as release_run
451 import argparse, os
452 ns = argparse.Namespace(
453 reservation_id=args.get("reservation_id"),
454 run_id=args.get("run_id", "agent-42"),
455 reason=args.get("reason", "completed"),
456 all_for_run=None,
457 fmt=args.get("fmt", "json"),
458 )
459 old = os.getcwd()
460 os.chdir(repo)
461 try:
462 release_run(ns)
463 return 0
464 except SystemExit as exc:
465 return exc.code
466 finally:
467 os.chdir(old)
468
469 def test_release_success(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
470 code = self._run({"reservation_id": reservation.reservation_id}, repo)
471 assert code == 0
472 out = capsys.readouterr().out
473 data = json.loads(out)
474 assert data["status"] == "released"
475 assert data["reservation_id"] == reservation.reservation_id
476 assert data["reason"] == "completed"
477 assert isinstance(data["elapsed_seconds"], float)
478
479 def test_release_reason_cancelled(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
480 code = self._run(
481 {"reservation_id": reservation.reservation_id, "reason": "cancelled"},
482 repo,
483 )
484 assert code == 0
485 out = capsys.readouterr().out
486 data = json.loads(out)
487 assert data["reason"] == "cancelled"
488
489 def test_release_already_released_is_idempotent(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
490 from muse.core.coordination import create_release
491 create_release(repo, reservation.reservation_id, "ag", "completed")
492 code = self._run({"reservation_id": reservation.reservation_id}, repo)
493 assert code == 0
494 out = capsys.readouterr().out
495 data = json.loads(out)
496 assert data["status"] == "already_released"
497
498 def test_release_not_found_exits_4(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
499 fake_id = str(uuid.uuid4())
500 code = self._run({"reservation_id": fake_id}, repo)
501 assert code == 4 # ExitCode.NOT_FOUND
502 out = capsys.readouterr().out
503 data = json.loads(out)
504 assert data["status"] == "not_found"
505
506 def test_bad_uuid_exits_1(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
507 code = self._run({"reservation_id": "not-a-uuid"}, repo)
508 assert code == 1 # ExitCode.USER_ERROR
509
510 def test_text_output(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
511 from muse.cli.commands.release_coord import run as release_run
512 import argparse, os
513 ns = argparse.Namespace(
514 reservation_id=reservation.reservation_id,
515 run_id="agent-42",
516 reason="completed",
517 all_for_run=None,
518 fmt="text",
519 )
520 old = os.getcwd()
521 os.chdir(repo)
522 try:
523 release_run(ns)
524 except SystemExit:
525 pass
526 finally:
527 os.chdir(old)
528 out = capsys.readouterr().out
529 assert "released" in out
530 assert reservation.reservation_id[:8] in out
531
532
533 class TestReleaseCmdBatch:
534 def _run_batch(self, repo: pathlib.Path, all_for_run: str, run_id: str = "controller") -> None:
535 from muse.cli.commands.release_coord import run as release_run
536 import argparse, os
537 ns = argparse.Namespace(
538 reservation_id=None,
539 run_id=run_id,
540 reason="completed",
541 all_for_run=all_for_run,
542 fmt="json",
543 )
544 old = os.getcwd()
545 os.chdir(repo)
546 try:
547 release_run(ns)
548 except SystemExit:
549 pass
550 finally:
551 os.chdir(old)
552
553 def test_batch_releases_all_for_run(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
554 from muse.core.coordination import create_reservation, load_released_ids
555 ids = []
556 for i in range(5):
557 res = create_reservation(
558 repo, run_id="batch-agent", branch="b", addresses=[f"f.py::fn{i}"]
559 )
560 ids.append(res.reservation_id)
561 # One extra reservation for a different run.
562 other = create_reservation(
563 repo, run_id="other-agent", branch="b", addresses=["g.py::fn"]
564 )
565 self._run_batch(repo, "batch-agent")
566 released = load_released_ids(repo)
567 for rid in ids:
568 assert rid in released
569 # Other agent's reservation not released.
570 assert other.reservation_id not in released
571
572 def test_batch_skips_already_released(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
573 from muse.core.coordination import create_release
574 create_release(repo, reservation.reservation_id, "ag", "completed")
575 self._run_batch(repo, "agent-test")
576 out = capsys.readouterr().out
577 data = json.loads(out)
578 assert data["skipped_already_released"] >= 1
579
580 def test_cannot_combine_id_and_all_for_run(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
581 from muse.cli.commands.release_coord import run as release_run
582 import argparse, os
583 ns = argparse.Namespace(
584 reservation_id=reservation.reservation_id,
585 run_id="ag",
586 reason="completed",
587 all_for_run="ag",
588 fmt="json",
589 )
590 old = os.getcwd()
591 os.chdir(repo)
592 try:
593 with pytest.raises(SystemExit) as exc:
594 release_run(ns)
595 finally:
596 os.chdir(old)
597 assert exc.value.code == 1 # ExitCode.USER_ERROR
598
599 def test_must_specify_id_or_all_for_run(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
600 from muse.cli.commands.release_coord import run as release_run
601 import argparse, os
602 ns = argparse.Namespace(
603 reservation_id=None,
604 run_id="ag",
605 reason="completed",
606 all_for_run=None,
607 fmt="json",
608 )
609 old = os.getcwd()
610 os.chdir(repo)
611 try:
612 with pytest.raises(SystemExit) as exc:
613 release_run(ns)
614 finally:
615 os.chdir(old)
616 assert exc.value.code == 1 # ExitCode.USER_ERROR
617
618
619 # ─────────────────────────────────────────────────────────────────────────────
620 # Integration tests — muse coord heartbeat
621 # ─────────────────────────────────────────────────────────────────────────────
622
623
624 class TestHeartbeatCmd:
625 def _run(self, args: MsgpackDict, repo: pathlib.Path) -> int:
626 from muse.cli.commands.heartbeat_coord import run as hb_run
627 import argparse, os
628 ns = argparse.Namespace(
629 reservation_id=args.get("reservation_id"),
630 run_id=args.get("run_id", "agent-42"),
631 extension_seconds=args.get("extension_seconds", 3600),
632 fmt=args.get("fmt", "json"),
633 )
634 old = os.getcwd()
635 os.chdir(repo)
636 try:
637 with pytest.raises(SystemExit) as exc:
638 hb_run(ns)
639 return exc.value.code
640 except SystemExit as exc:
641 return exc.code
642 finally:
643 os.chdir(old)
644
645 def _run_no_exit(self, args: MsgpackDict, repo: pathlib.Path) -> None:
646 from muse.cli.commands.heartbeat_coord import run as hb_run
647 import argparse, os
648 ns = argparse.Namespace(
649 reservation_id=args.get("reservation_id"),
650 run_id=args.get("run_id", "agent-42"),
651 extension_seconds=args.get("extension_seconds", 3600),
652 fmt=args.get("fmt", "json"),
653 )
654 old = os.getcwd()
655 os.chdir(repo)
656 try:
657 hb_run(ns)
658 except SystemExit:
659 pass
660 finally:
661 os.chdir(old)
662
663 def test_heartbeat_success(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
664 self._run_no_exit({"reservation_id": reservation.reservation_id}, repo)
665 out = capsys.readouterr().out
666 data = json.loads(out)
667 assert data["status"] == "ok"
668 assert data["reservation_id"] == reservation.reservation_id
669 assert data["ttl_extended_seconds"] == 3600
670 assert isinstance(data["elapsed_seconds"], float)
671
672 def test_heartbeat_extends_further_on_repeat(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
673 from muse.core.coordination import load_heartbeat_map
674 self._run_no_exit(
675 {"reservation_id": reservation.reservation_id, "extension_seconds": 600},
676 repo,
677 )
678 hb1 = load_heartbeat_map(repo)[reservation.reservation_id]
679 self._run_no_exit(
680 {"reservation_id": reservation.reservation_id, "extension_seconds": 7200},
681 repo,
682 )
683 hb2 = load_heartbeat_map(repo)[reservation.reservation_id]
684 assert hb2.extended_expires_at > hb1.extended_expires_at
685
686 def test_heartbeat_makes_expired_active(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
687 from muse.core.coordination import active_reservations
688 # Before heartbeat, expired.
689 assert expired_reservation.reservation_id not in [
690 r.reservation_id for r in active_reservations(repo)
691 ]
692 self._run_no_exit(
693 {"reservation_id": expired_reservation.reservation_id},
694 repo,
695 )
696 # After heartbeat, active.
697 assert expired_reservation.reservation_id in [
698 r.reservation_id for r in active_reservations(repo)
699 ]
700
701 def test_heartbeat_refused_for_released(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
702 from muse.core.coordination import create_release
703 create_release(repo, reservation.reservation_id, "ag", "completed")
704 code = self._run({"reservation_id": reservation.reservation_id}, repo)
705 assert code == 1
706 out = capsys.readouterr().out
707 data = json.loads(out)
708 assert data["status"] == "already_released"
709
710 def test_heartbeat_not_found_exits_4(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
711 code = self._run({"reservation_id": str(uuid.uuid4())}, repo)
712 assert code == 4 # ExitCode.NOT_FOUND
713 out = capsys.readouterr().out
714 data = json.loads(out)
715 assert data["status"] == "not_found"
716
717 def test_bad_uuid_exits_1(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
718 code = self._run({"reservation_id": "bad//id"}, repo)
719 assert code == 1 # ExitCode.USER_ERROR
720
721 def test_negative_extension_exits_1(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
722 code = self._run(
723 {"reservation_id": reservation.reservation_id, "extension_seconds": -1},
724 repo,
725 )
726 assert code == 1 # ExitCode.USER_ERROR
727
728 def test_zero_extension_exits_1(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
729 code = self._run(
730 {"reservation_id": reservation.reservation_id, "extension_seconds": 0},
731 repo,
732 )
733 assert code == 1 # ExitCode.USER_ERROR
734
735 def test_text_output(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
736 from muse.cli.commands.heartbeat_coord import run as hb_run
737 import argparse, os
738 ns = argparse.Namespace(
739 reservation_id=reservation.reservation_id,
740 run_id="agent-42",
741 extension_seconds=3600,
742 fmt="text",
743 )
744 old = os.getcwd()
745 os.chdir(repo)
746 try:
747 hb_run(ns)
748 except SystemExit:
749 pass
750 finally:
751 os.chdir(old)
752 out = capsys.readouterr().out
753 assert "heartbeat" in out
754 assert reservation.reservation_id[:8] in out
755
756
757 # ─────────────────────────────────────────────────────────────────────────────
758 # Integration tests — muse coord gc
759 # ─────────────────────────────────────────────────────────────────────────────
760
761
762 class TestCoordGcCmd:
763 def _run(self, args: MsgpackDict, repo: pathlib.Path) -> None:
764 from muse.cli.commands.coord_gc import run as gc_run
765 import argparse, os
766 ns = argparse.Namespace(
767 execute=args.get("execute", False),
768 grace_period_seconds=args.get("grace_period_seconds", 300),
769 include_intents=args.get("include_intents", False),
770 max_intent_age_seconds=args.get("max_intent_age_seconds", 604800),
771 verbose=args.get("verbose", False),
772 fmt=args.get("fmt", "json"),
773 )
774 old = os.getcwd()
775 os.chdir(repo)
776 try:
777 gc_run(ns)
778 except SystemExit:
779 pass
780 finally:
781 os.chdir(old)
782
783 def test_dry_run_by_default(self, repo: pathlib.Path, expired_reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
784 self._run({}, repo)
785 out = capsys.readouterr().out
786 data = json.loads(out)
787 assert data["dry_run"] is True
788
789 def test_dry_run_does_not_delete(self, repo: pathlib.Path, expired_reservation: Reservation) -> None:
790 from muse.core.coordination import _reservations_dir
791 self._run({"grace_period_seconds": 0}, repo)
792 path = _reservations_dir(repo) / f"{expired_reservation.reservation_id}.json"
793 assert path.exists()
794
795 def test_execute_deletes_expired(self, repo: pathlib.Path, expired_reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
796 from muse.core.coordination import _reservations_dir
797 self._run({"execute": True, "grace_period_seconds": 0}, repo)
798 out = capsys.readouterr().out
799 data = json.loads(out)
800 assert data["dry_run"] is False
801 assert data["reservations_removed"] >= 1
802 path = _reservations_dir(repo) / f"{expired_reservation.reservation_id}.json"
803 assert not path.exists()
804
805 def test_json_schema_complete(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
806 self._run({}, repo)
807 out = capsys.readouterr().out
808 data = json.loads(out)
809 required_keys = {
810 "dry_run", "grace_period_seconds", "include_intents",
811 "max_intent_age_seconds", "reservations_removed",
812 "reservations_removed_bytes", "releases_removed",
813 "releases_removed_bytes", "heartbeats_removed",
814 "heartbeats_removed_bytes", "intents_removed",
815 "intents_removed_bytes", "total_removed",
816 "total_removed_bytes", "removed_ids", "elapsed_seconds",
817 }
818 assert required_keys.issubset(data.keys())
819
820 def test_elapsed_seconds_is_float(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
821 self._run({}, repo)
822 out = capsys.readouterr().out
823 data = json.loads(out)
824 assert isinstance(data["elapsed_seconds"], float)
825
826 def test_include_intents_false_preserves_intents(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
827 from muse.core.coordination import create_reservation, create_intent
828 res = create_reservation(repo, run_id="ag", branch="b", addresses=["x::y"])
829 create_intent(repo, res.reservation_id, "ag", "b", ["x::y"], "rename", "")
830 self._run({"execute": True, "grace_period_seconds": 0}, repo)
831 out = capsys.readouterr().out
832 data = json.loads(out)
833 assert data["intents_removed"] == 0
834
835 def test_bad_grace_period_exits_1(self, repo: pathlib.Path) -> None:
836 from muse.cli.commands.coord_gc import run as gc_run
837 import argparse, os
838 ns = argparse.Namespace(
839 execute=False,
840 grace_period_seconds=-1,
841 include_intents=False,
842 max_intent_age_seconds=604800,
843 verbose=False,
844 fmt="json",
845 )
846 old = os.getcwd()
847 os.chdir(repo)
848 try:
849 with pytest.raises(SystemExit) as exc:
850 gc_run(ns)
851 finally:
852 os.chdir(old)
853 assert exc.value.code == 1 # ExitCode.USER_ERROR
854
855 def test_text_output_dry_run(self, repo: pathlib.Path, expired_reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
856 self._run({"fmt": "text", "grace_period_seconds": 0}, repo)
857 out = capsys.readouterr().out
858 assert "DRY RUN" in out
859
860 def test_text_output_execute(self, repo: pathlib.Path, expired_reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
861 self._run({"fmt": "text", "execute": True, "grace_period_seconds": 0}, repo)
862 out = capsys.readouterr().out
863 assert "GC complete" in out
864
865 def test_verbose_lists_removed_ids(self, repo: pathlib.Path, expired_reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
866 self._run(
867 {"fmt": "text", "execute": True, "grace_period_seconds": 0, "verbose": True},
868 repo,
869 )
870 out = capsys.readouterr().out
871 assert expired_reservation.reservation_id in out
872
873 def test_nothing_to_collect(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
874 """Active reservation → nothing collected."""
875 self._run({"execute": True, "grace_period_seconds": 300}, repo)
876 out = capsys.readouterr().out
877 data = json.loads(out)
878 assert data["total_removed"] == 0
879
880 def test_fmt_bytes_helper(self) -> None:
881 from muse.cli.commands.coord_gc import _fmt_bytes
882 assert _fmt_bytes(0) == "0 B"
883 assert _fmt_bytes(512) == "512 B"
884 assert _fmt_bytes(1024) == "1.0 KiB"
885 assert _fmt_bytes(1048576) == "1.0 MiB"
886
887
888 # ─────────────────────────────────────────────────────────────────────────────
889 # Integration tests — muse coord list (lifecycle-aware)
890 # ─────────────────────────────────────────────────────────────────────────────
891
892
893 class TestListCoordLifecycle:
894 def _run_json(self, repo: pathlib.Path, extra_args: MsgpackDict | None = None) -> None:
895 from muse.cli.commands.list_coord import run as list_run
896 import argparse, os
897 ns = argparse.Namespace(
898 include_expired=True,
899 kind="all",
900 run_id=None,
901 branch=None,
902 address_glob=None,
903 operation=None,
904 limit=None,
905 summary=False,
906 fmt="json",
907 **(extra_args or {}),
908 )
909 old = os.getcwd()
910 os.chdir(repo)
911 try:
912 list_run(ns)
913 finally:
914 os.chdir(old)
915
916 def test_released_field_present(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
917 self._run_json(repo)
918 out = capsys.readouterr().out
919 data = json.loads(out)
920 entries = {e["reservation_id"]: e for e in data["reservations"]}
921 entry = entries[reservation.reservation_id]
922 assert "released" in entry
923 assert entry["released"] is False
924
925 def test_released_field_true_after_release(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
926 from muse.core.coordination import create_release
927 create_release(repo, reservation.reservation_id, "ag", "completed")
928 self._run_json(repo)
929 out = capsys.readouterr().out
930 data = json.loads(out)
931 entries = {e["reservation_id"]: e for e in data["reservations"]}
932 entry = entries[reservation.reservation_id]
933 assert entry["released"] is True
934 assert entry["is_active"] is False
935
936 def test_effective_expires_at_field(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
937 self._run_json(repo)
938 out = capsys.readouterr().out
939 data = json.loads(out)
940 entry = data["reservations"][0]
941 assert "effective_expires_at" in entry
942 # Parse without error.
943 datetime.datetime.fromisoformat(entry["effective_expires_at"])
944
945 def test_effective_expires_extended_by_heartbeat(self, repo: pathlib.Path, expired_reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
946 from muse.core.coordination import create_heartbeat
947 create_heartbeat(repo, expired_reservation.reservation_id, "ag", 7200)
948 self._run_json(repo)
949 out = capsys.readouterr().out
950 data = json.loads(out)
951 entries = {e["reservation_id"]: e for e in data["reservations"]}
952 entry = entries[expired_reservation.reservation_id]
953 # effective_expires_at should be in the future.
954 eff = datetime.datetime.fromisoformat(entry["effective_expires_at"])
955 assert eff > _now_utc()
956 assert entry["ttl_remaining_seconds"] > 0
957
958 def test_released_reservations_count_in_json(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None:
959 from muse.core.coordination import create_release
960 create_release(repo, reservation.reservation_id, "ag", "completed")
961 self._run_json(repo)
962 out = capsys.readouterr().out
963 data = json.loads(out)
964 assert "released_reservations" in data
965 assert data["released_reservations"] >= 1
966
967
968 # ─────────────────────────────────────────────────────────────────────────────
969 # Security tests
970 # ─────────────────────────────────────────────────────────────────────────────
971
972
973 class TestSecurity:
974 def test_release_uuid_injection_rejected(self, repo: pathlib.Path) -> None:
975 from muse.core.coordination import create_release
976 attacks = [
977 "../../etc/passwd",
978 "../releases/legit-uuid",
979 "a" * 200,
980 "\x00evil",
981 "' OR 1=1 --",
982 ]
983 for attack in attacks:
984 with pytest.raises((ValueError, FileNotFoundError, OSError)):
985 create_release(repo, attack, "ag", "completed")
986
987 def test_heartbeat_uuid_injection_rejected(self, repo: pathlib.Path) -> None:
988 from muse.core.coordination import create_heartbeat
989 attacks = ["../../etc/passwd", "../heartbeats/x", "not-a-uuid"]
990 for attack in attacks:
991 with pytest.raises(ValueError):
992 create_heartbeat(repo, attack, "ag", 3600)
993
994 def test_release_cmd_rejects_traversal(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
995 from muse.cli.commands.release_coord import run as release_run
996 import argparse, os
997 ns = argparse.Namespace(
998 reservation_id="../../etc/passwd",
999 run_id="ag",
1000 reason="completed",
1001 all_for_run=None,
1002 fmt="json",
1003 )
1004 old = os.getcwd()
1005 os.chdir(repo)
1006 try:
1007 with pytest.raises(SystemExit) as exc:
1008 release_run(ns)
1009 finally:
1010 os.chdir(old)
1011 assert exc.value.code == 1 # ExitCode.USER_ERROR
1012
1013 def test_heartbeat_cmd_rejects_traversal(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1014 from muse.cli.commands.heartbeat_coord import run as hb_run
1015 import argparse, os
1016 ns = argparse.Namespace(
1017 reservation_id="../../etc/shadow",
1018 run_id="ag",
1019 extension_seconds=3600,
1020 fmt="json",
1021 )
1022 old = os.getcwd()
1023 os.chdir(repo)
1024 try:
1025 with pytest.raises(SystemExit) as exc:
1026 hb_run(ns)
1027 finally:
1028 os.chdir(old)
1029 assert exc.value.code == 1 # ExitCode.USER_ERROR
1030
1031 def test_gc_does_not_escape_coord_dir(self, repo: pathlib.Path) -> None:
1032 """GC only scans known subdirs — no user input used for path construction."""
1033 from muse.core.coordination import run_coord_gc
1034 # Create a file outside .muse/coordination/ — GC must not touch it.
1035 sentinel = repo / "DO_NOT_DELETE.txt"
1036 sentinel.write_text("safe")
1037 run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
1038 assert sentinel.exists()
1039
1040
1041 # ─────────────────────────────────────────────────────────────────────────────
1042 # Stress tests
1043 # ─────────────────────────────────────────────────────────────────────────────
1044
1045
1046 class TestStress:
1047 def test_batch_release_200_reservations_under_2s(self, repo: pathlib.Path) -> None:
1048 """Releasing 200 reservations via --all-for-run completes in < 2 s."""
1049 from muse.core.coordination import create_reservation, load_released_ids
1050 from muse.cli.commands.release_coord import run as release_run
1051 import argparse, os
1052
1053 N = 200
1054 for i in range(N):
1055 create_reservation(
1056 repo,
1057 run_id="stress-agent",
1058 branch="feat/stress",
1059 addresses=[f"src/mod{i}.py::fn{i}"],
1060 )
1061
1062 ns = argparse.Namespace(
1063 reservation_id=None,
1064 run_id="controller",
1065 reason="completed",
1066 all_for_run="stress-agent",
1067 fmt="json",
1068 )
1069 old = os.getcwd()
1070 os.chdir(repo)
1071 t0 = time.monotonic()
1072 try:
1073 release_run(ns)
1074 except SystemExit:
1075 pass
1076 finally:
1077 os.chdir(old)
1078 elapsed = time.monotonic() - t0
1079
1080 released = load_released_ids(repo)
1081 assert len(released) == N
1082 assert elapsed < 2.0, f"batch release took {elapsed:.2f}s — too slow"
1083
1084 def test_gc_500_expired_reservations_under_3s(self, repo: pathlib.Path) -> None:
1085 """GC collects 500 expired reservations in < 3 s."""
1086 from muse.core.coordination import (
1087 _reservations_dir, _ensure_coord_dirs, run_coord_gc,
1088 )
1089
1090 _ensure_coord_dirs(repo)
1091 res_dir = _reservations_dir(repo)
1092 now = _now_utc()
1093 expired_at = (now - datetime.timedelta(hours=2)).isoformat()
1094
1095 N = 500
1096 for i in range(N):
1097 rid = str(uuid.uuid4())
1098 data = {
1099 "schema_version": "0.0.0",
1100 "reservation_id": rid,
1101 "run_id": f"ag-{i}",
1102 "branch": "feat/x",
1103 "addresses": [f"f{i}.py::fn"],
1104 "created_at": (now - datetime.timedelta(hours=3)).isoformat(),
1105 "expires_at": expired_at,
1106 "operation": None,
1107 }
1108 (res_dir / f"{rid}.json").write_text(json.dumps(data))
1109
1110 t0 = time.monotonic()
1111 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
1112 elapsed = time.monotonic() - t0
1113
1114 assert result.reservations_removed == N
1115 assert elapsed < 3.0, f"GC took {elapsed:.2f}s — too slow"
1116
1117 def test_load_heartbeat_map_100_entries_under_500ms(self, repo: pathlib.Path) -> None:
1118 """Loading 100 heartbeat files completes in < 500 ms."""
1119 from muse.core.coordination import (
1120 _ensure_coord_dirs, _heartbeats_dir, load_heartbeat_map,
1121 )
1122
1123 _ensure_coord_dirs(repo)
1124 hb_dir = _heartbeats_dir(repo)
1125 now = _now_utc()
1126
1127 N = 100
1128 for i in range(N):
1129 rid = str(uuid.uuid4())
1130 data = {
1131 "schema_version": "0.0.0",
1132 "reservation_id": rid,
1133 "run_id": f"ag-{i}",
1134 "last_beat_at": now.isoformat(),
1135 "extended_expires_at": (now + datetime.timedelta(hours=1)).isoformat(),
1136 }
1137 (hb_dir / f"{rid}.json").write_text(json.dumps(data))
1138
1139 t0 = time.monotonic()
1140 hb_map = load_heartbeat_map(repo)
1141 elapsed = time.monotonic() - t0
1142
1143 assert len(hb_map) == N
1144 assert elapsed < 0.5, f"load_heartbeat_map took {elapsed:.3f}s — too slow"
1145
1146 def test_round_trip_correctness_under_concurrent_writes(self, repo: pathlib.Path) -> None:
1147 """release → heartbeat attempt → gc correctness under rapid writes."""
1148 from muse.core.coordination import (
1149 active_reservations, create_heartbeat, create_release,
1150 create_reservation, load_released_ids, run_coord_gc,
1151 )
1152
1153 N = 50
1154 reservations = [
1155 create_reservation(
1156 repo, run_id=f"ag-{i}", branch="b", addresses=[f"f{i}.py::fn"]
1157 )
1158 for i in range(N)
1159 ]
1160
1161 # Release half.
1162 for res in reservations[:N // 2]:
1163 create_release(repo, res.reservation_id, res.run_id, "completed")
1164
1165 # Heartbeat the other half.
1166 for res in reservations[N // 2:]:
1167 create_heartbeat(repo, res.reservation_id, res.run_id, 7200)
1168
1169 active = active_reservations(repo)
1170 active_ids = {r.reservation_id for r in active}
1171
1172 # Released half must not be active.
1173 for res in reservations[:N // 2]:
1174 assert res.reservation_id not in active_ids
1175
1176 # Heartbeat half must be active.
1177 for res in reservations[N // 2:]:
1178 assert res.reservation_id in active_ids
1179
1180 # GC with large grace period → nothing removed yet.
1181 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=9999)
1182 assert result.reservations_removed == 0
File History 1 commit
sha256:1c4b3e3a9a1f300774c3ee662b572a698d5fd405bf765a71e6011a2e9c3eaaaa feat: Muse — version control for the agent era Human 72 days ago