gabriel / muse public
test_cmd_heartbeat_coord.py python
854 lines 33.0 KB
Raw
1 """Comprehensive tests for ``muse coord heartbeat``.
2
3 Coverage matrix
4 ---------------
5 Unit
6 ~~~~
7 * create_heartbeat roundtrip — write, load, fields intact
8 * Heartbeat.to_dict — required keys present
9 * Heartbeat extends TTL — extended_expires_at is now + extension_seconds
10 * Negative extension rejected — ValueError raised
11 * Zero extension rejected — ValueError raised
12 * Path traversal in reservation_id rejected before file I/O
13
14 Integration — CLI
15 ~~~~~~~~~~~~~~~~~
16 * Basic heartbeat — text output, exit 0
17 * --extension 7200 — custom extension applied
18 * Released guard — heartbeat after release exits USER_ERROR (1)
19 * Not-found reservation_id exits NOT_FOUND (4)
20 * Invalid reservation_id exits USER_ERROR (1)
21 * --format json — valid JSON, required keys, compact (no indent)
22 * --json shorthand — valid JSON
23 * Idempotent — second heartbeat extends expiry further
24
25 Input validation
26 ~~~~~~~~~~~~~~~~
27 * --run-id at exactly 256 chars accepted
28 * --run-id over 256 chars exits USER_ERROR (1) before file I/O
29 * --extension 0 / negative exits USER_ERROR (1)
30 * --extension above 31_536_000 exits USER_ERROR (1)
31 * --extension at exactly 31_536_000 accepted
32 * Validation fires before any file I/O
33
34 Security
35 ~~~~~~~~
36 * Path traversal in reservation_id rejected before file I/O
37 * ANSI escape sequences in run_id are safe (sanitized in output)
38 * null byte in reservation_id rejected
39 * Exit codes: USER_ERROR for bad_id, NOT_FOUND for not_found
40
41 Concurrent
42 ~~~~~~~~~~
43 * 20 threads racing to heartbeat the same reservation — all exit 0
44 * last writer's extended_expires_at survives in the heartbeat map
45
46 Stress
47 ~~~~~~
48 * 100 heartbeats created and loaded via load_heartbeat_map < 1 s
49 * 500 sequential CLI heartbeats complete < 30 s
50 """
51
52 from __future__ import annotations
53
54 import datetime
55 import json
56 import pathlib
57 import time
58 import threading
59
60 import pytest
61
62 from tests.cli_test_helper import CliRunner
63 from muse.core.coordination import (
64 Heartbeat,
65 create_heartbeat,
66 create_release,
67 create_reservation,
68 load_heartbeat_map,
69 load_released_ids,
70 )
71 from muse.cli.commands.heartbeat_coord import _MAX_EXTENSION_SECONDS, _MAX_RUN_ID_LEN
72 from muse.core.errors import ExitCode
73 from muse.core.paths import muse_dir
74 from muse.core.types import fake_id
75
# ``cli`` is the first positional argument handed to every ``runner.invoke``
# call in this module.
# NOTE(review): it is ``None`` here — presumably the project's CliRunner
# helper resolves the application itself; confirm in tests.cli_test_helper.
cli = None
# Single runner instance shared by the fixtures and tests below.
runner = CliRunner()
78
79
80 # ---------------------------------------------------------------------------
81 # Helpers
82 # ---------------------------------------------------------------------------
83
84
def _now_utc() -> datetime.datetime:
    """Return the current instant as a timezone-aware UTC ``datetime``."""
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc)
87
88
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Lay down a minimal repository skeleton under *tmp_path*.

    Creates the directory returned by ``muse_dir(tmp_path)`` and writes a
    ``HEAD`` file inside it that points at ``refs/heads/main``. No CLI
    command is involved.

    Returns:
        *tmp_path* itself, to be used as the repository root.
    """
    meta_dir = muse_dir(tmp_path)
    meta_dir.mkdir()
    head_file = meta_dir / "HEAD"
    head_file.write_text("ref: refs/heads/main\n")
    return tmp_path
94
95
@pytest.fixture()
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Provide a repository initialised through the CLI.

    The working directory is switched to *tmp_path*, ``MUSE_REPO_ROOT`` is
    pointed at it, and ``init --domain code`` is invoked via the shared
    runner. The fixture fails immediately if that command exits non-zero.

    Returns:
        *tmp_path*, the root of the freshly initialised repository.
    """
    monkeypatch.chdir(tmp_path)
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
    init_argv = ["init", "--domain", "code"]
    init_result = runner.invoke(cli, init_argv)
    assert init_result.exit_code == 0, init_result.output
    return tmp_path
103
104
@pytest.fixture()
def reservation(repo: pathlib.Path) -> Reservation:
    """Create one reservation in *repo* for the CLI tests to heartbeat.

    The reservation is owned by run id ``agent-test`` on branch
    ``feat/test``, covers a single address and has a 3600-second TTL.
    """
    # NOTE(review): ``Reservation`` is not among this module's imports; the
    # annotation only works because ``from __future__ import annotations``
    # keeps it unevaluated.
    reservation_kwargs = {
        "run_id": "agent-test",
        "branch": "feat/test",
        "addresses": ["src/billing.py::compute_total"],
        "ttl_seconds": 3600,
    }
    return create_reservation(repo, **reservation_kwargs)
114
115
116 # ---------------------------------------------------------------------------
117 # Unit — core helpers
118 # ---------------------------------------------------------------------------
119
120
class TestCreateHeartbeatRoundtrip:
    """Unit tests for ``create_heartbeat`` and ``load_heartbeat_map``.

    These tests bypass the CLI: they build a skeleton repository with
    ``_make_repo`` and call the coordination helpers directly.
    """

    @staticmethod
    def _reserve(tmp_path: pathlib.Path) -> "tuple[pathlib.Path, Reservation]":
        """Build a skeleton repo under *tmp_path* holding one reservation.

        Returns:
            ``(root, reservation)`` where the reservation belongs to run id
            ``agent-1`` on branch ``main`` with a 3600-second TTL.
        """
        root = _make_repo(tmp_path)
        res = create_reservation(root, run_id="agent-1", branch="main",
                                 addresses=["a.py::f"], ttl_seconds=3600)
        return root, res

    def test_roundtrip_fields(self, tmp_path: pathlib.Path) -> None:
        """The returned heartbeat carries the ids and datetime fields."""
        root, res = self._reserve(tmp_path)
        rid = res.reservation_id
        hb = create_heartbeat(root, rid, run_id="agent-1", extension_seconds=3600)
        assert hb.reservation_id == rid
        assert hb.run_id == "agent-1"
        assert isinstance(hb.last_beat_at, datetime.datetime)
        assert isinstance(hb.extended_expires_at, datetime.datetime)

    def test_roundtrip_persisted_to_disk(self, tmp_path: pathlib.Path) -> None:
        """A written heartbeat is returned by ``load_heartbeat_map`` intact."""
        root, res = self._reserve(tmp_path)
        rid = res.reservation_id
        written = create_heartbeat(root, rid, run_id="agent-1", extension_seconds=3600)
        hb_map = load_heartbeat_map(root)
        assert rid in hb_map
        # Membership alone does not prove a roundtrip: compare the reloaded
        # fields against what was written.
        loaded = hb_map[rid]
        assert loaded.reservation_id == rid
        assert loaded.run_id == "agent-1"
        assert loaded.extended_expires_at == written.extended_expires_at

    def test_to_dict_required_keys(self, tmp_path: pathlib.Path) -> None:
        """``Heartbeat.to_dict`` exposes every required key."""
        root, res = self._reserve(tmp_path)
        hb = create_heartbeat(root, res.reservation_id, run_id="agent-1")
        d = hb.to_dict()
        assert "reservation_id" in d
        assert "run_id" in d
        assert "last_beat_at" in d
        assert "extended_expires_at" in d
        assert "schema_version" in d

    def test_extends_ttl_by_extension_seconds(self, tmp_path: pathlib.Path) -> None:
        """The new expiry is roughly *now* + ``extension_seconds``."""
        root, res = self._reserve(tmp_path)
        t_before = _now_utc()
        hb = create_heartbeat(root, res.reservation_id, run_id="agent-1",
                              extension_seconds=7200)
        t_after = _now_utc()
        # One second of slack on each side of the call window.
        lower = t_before + datetime.timedelta(seconds=7199)
        upper = t_after + datetime.timedelta(seconds=7201)
        assert lower <= hb.extended_expires_at <= upper

    def test_default_extension_is_3600s(self, tmp_path: pathlib.Path) -> None:
        """Omitting ``extension_seconds`` extends by about 3600 seconds."""
        root, res = self._reserve(tmp_path)
        t_before = _now_utc()
        hb = create_heartbeat(root, res.reservation_id, run_id="agent-1")
        t_after = _now_utc()
        lower = t_before + datetime.timedelta(seconds=3599)
        upper = t_after + datetime.timedelta(seconds=3601)
        assert lower <= hb.extended_expires_at <= upper

    def test_negative_extension_raises(self, tmp_path: pathlib.Path) -> None:
        """A negative extension raises ``ValueError``."""
        root, res = self._reserve(tmp_path)
        with pytest.raises(ValueError, match="extension_seconds must be > 0"):
            create_heartbeat(root, res.reservation_id, run_id="agent-1",
                             extension_seconds=-1)

    def test_zero_extension_raises(self, tmp_path: pathlib.Path) -> None:
        """A zero extension raises ``ValueError``."""
        root, res = self._reserve(tmp_path)
        with pytest.raises(ValueError, match="extension_seconds must be > 0"):
            create_heartbeat(root, res.reservation_id, run_id="agent-1",
                             extension_seconds=0)

    def test_path_traversal_reservation_id_rejected(self, tmp_path: pathlib.Path) -> None:
        """A traversal-style id raises ``ValueError`` matching ``sha256``."""
        root = _make_repo(tmp_path)
        with pytest.raises(ValueError, match="sha256"):
            create_heartbeat(root, "../../etc/passwd", run_id="agent-1")

    def test_path_traversal_with_slashes_rejected(self, tmp_path: pathlib.Path) -> None:
        """A relative path used as the id raises ``ValueError``."""
        root = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            create_heartbeat(root, "../sneaky/path", run_id="agent-1")

    def test_non_content_id_rejected(self, tmp_path: pathlib.Path) -> None:
        """An arbitrary string used as the id raises ``ValueError``."""
        root = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            create_heartbeat(root, "not-a-content-id", run_id="agent-1")

    def test_second_heartbeat_overwrites_first(self, tmp_path: pathlib.Path) -> None:
        """A later heartbeat replaces the earlier one in the heartbeat map."""
        root, res = self._reserve(tmp_path)
        rid = res.reservation_id
        hb1 = create_heartbeat(root, rid, run_id="agent-1", extension_seconds=1000)
        hb2 = create_heartbeat(root, rid, run_id="agent-1", extension_seconds=5000)
        assert hb2.extended_expires_at > hb1.extended_expires_at
        hb_map = load_heartbeat_map(root)
        assert hb_map[rid].extended_expires_at == hb2.extended_expires_at
218
219
220 # ---------------------------------------------------------------------------
221 # Integration — CLI
222 # ---------------------------------------------------------------------------
223
224
class TestHeartbeatBasicCli:
    """End-to-end checks of ``coord heartbeat`` through the shared runner.

    Setup steps that go through the CLI (``coord release``) are asserted to
    succeed, so a broken setup is reported as such instead of surfacing as
    a confusing failure in the assertion under test.
    """

    @staticmethod
    def _beat(rid: str, *extra: str, run_id: str = "agent-test"):
        """Invoke ``coord heartbeat <rid> --run-id <run_id> [extra ...]``."""
        return runner.invoke(
            cli, ["coord", "heartbeat", rid, "--run-id", run_id, *extra]
        )

    @staticmethod
    def _release(rid: str) -> None:
        """Release *rid* via the CLI and fail loudly if that step breaks."""
        released = runner.invoke(
            cli, ["coord", "release", rid, "--run-id", "agent-test"]
        )
        assert released.exit_code == 0, released.output

    def test_basic_heartbeat_exits_zero(self, repo: pathlib.Path, reservation: Reservation) -> None:
        result = self._beat(reservation.reservation_id)
        assert result.exit_code == 0, result.output

    def test_basic_heartbeat_text_contains_reservation_id(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        rid = reservation.reservation_id
        result = self._beat(rid)
        # Either the full id or its first eight characters is acceptable.
        assert rid[:8] in result.output or rid in result.output

    def test_basic_heartbeat_text_contains_heartbeat(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        result = self._beat(reservation.reservation_id)
        assert "heartbeat" in result.output.lower()

    def test_basic_heartbeat_text_contains_extended(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        result = self._beat(reservation.reservation_id)
        text = result.output.lower()
        assert "extended" in text or "until" in text

    def test_extension_7200_accepted(self, repo: pathlib.Path, reservation: Reservation) -> None:
        result = self._beat(reservation.reservation_id, "--extension", "7200")
        assert result.exit_code == 0, result.output

    def test_extension_7200_shown_in_output(self, repo: pathlib.Path, reservation: Reservation) -> None:
        result = self._beat(reservation.reservation_id, "--extension", "7200")
        assert "7200" in result.output

    def test_heartbeat_after_release_exits_nonzero(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        rid = reservation.reservation_id
        self._release(rid)
        result = self._beat(rid)
        assert result.exit_code != 0

    def test_heartbeat_after_release_mentions_released(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        rid = reservation.reservation_id
        self._release(rid)
        result = self._beat(rid)
        combined = result.output + (result.stderr or "")
        assert "released" in combined.lower()

    def test_not_found_id_exits_nonzero(self, repo: pathlib.Path) -> None:
        nonexistent = fake_id("nonexistent-heartbeat-2")
        result = self._beat(nonexistent, run_id="agent-1")
        assert result.exit_code != 0

    def test_not_found_id_mentions_not_found(self, repo: pathlib.Path) -> None:
        nonexistent = fake_id("nonexistent-heartbeat-1")
        result = self._beat(nonexistent, run_id="agent-1")
        combined = result.output + (result.stderr or "")
        assert "not found" in combined.lower()

    def test_invalid_id_exits_nonzero(self, repo: pathlib.Path) -> None:
        result = self._beat("not-a-content-id", run_id="agent-1")
        assert result.exit_code != 0

    def test_negative_extension_exits_nonzero(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        result = self._beat(
            reservation.reservation_id, "--extension", "-1", run_id="agent-1"
        )
        assert result.exit_code != 0

    def test_zero_extension_exits_nonzero(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        result = self._beat(
            reservation.reservation_id, "--extension", "0", run_id="agent-1"
        )
        assert result.exit_code != 0

    def test_idempotent_second_heartbeat_exits_zero(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        rid = reservation.reservation_id
        r1 = self._beat(rid)
        assert r1.exit_code == 0, r1.output
        r2 = self._beat(rid, "--extension", "7200")
        assert r2.exit_code == 0, r2.output

    def test_idempotent_second_heartbeat_extends_further(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        rid = reservation.reservation_id
        r1 = self._beat(rid, "--extension", "1000", "--json")
        # Check the exit code before parsing so a CLI failure is reported
        # with its output rather than as a JSON decode error.
        assert r1.exit_code == 0, r1.output
        d1 = json.loads(r1.output)
        r2 = self._beat(rid, "--extension", "7200", "--json")
        assert r2.exit_code == 0, r2.output
        d2 = json.loads(r2.output)
        exp1 = datetime.datetime.fromisoformat(d1["extended_expires_at"])
        exp2 = datetime.datetime.fromisoformat(d2["extended_expires_at"])
        assert exp2 > exp1
371
372
class TestHeartbeatFormatJson:
    """Checks on the JSON payload printed by ``coord heartbeat --json``."""

    @staticmethod
    def _run_json(rid: str, *extra: str, run_id: str = "agent-test"):
        """Invoke ``coord heartbeat`` for *rid* with ``--json`` appended last."""
        argv = ["coord", "heartbeat", rid, "--run-id", run_id, *extra, "--json"]
        return runner.invoke(cli, argv)

    def test_format_json_flag(self, repo: pathlib.Path, reservation: Reservation) -> None:
        # NOTE(review): despite the name, this passes ``--json`` — the same
        # arguments as ``test_json_shorthand``.
        rid = reservation.reservation_id
        outcome = self._run_json(rid)
        assert outcome.exit_code == 0, outcome.output
        payload = json.loads(outcome.output)
        assert payload["reservation_id"] == rid
        assert payload["run_id"] == "agent-test"

    def test_json_shorthand(self, repo: pathlib.Path, reservation: Reservation) -> None:
        rid = reservation.reservation_id
        outcome = self._run_json(rid)
        assert outcome.exit_code == 0, outcome.output
        payload = json.loads(outcome.output)
        assert payload["reservation_id"] == rid

    def test_json_status_ok(self, repo: pathlib.Path, reservation: Reservation) -> None:
        outcome = self._run_json(reservation.reservation_id)
        payload = json.loads(outcome.output)
        assert payload["status"] == "ok"

    def test_json_required_keys_present(self, repo: pathlib.Path, reservation: Reservation) -> None:
        outcome = self._run_json(reservation.reservation_id)
        payload = json.loads(outcome.output)
        required = (
            "reservation_id",
            "run_id",
            "last_beat_at",
            "extended_expires_at",
            "ttl_extended_seconds",
            "duration_ms",
        )
        # Report the first absent key, in declaration order.
        key = next((name for name in required if name not in payload), None)
        assert key is None, f"missing key: {key}"

    def test_json_extended_expires_at_is_iso(self, repo: pathlib.Path, reservation: Reservation) -> None:
        outcome = self._run_json(reservation.reservation_id)
        payload = json.loads(outcome.output)
        # Parsing must succeed; a malformed timestamp raises here.
        expires = datetime.datetime.fromisoformat(payload["extended_expires_at"])
        earliest = _now_utc() - datetime.timedelta(seconds=5)
        assert expires > earliest

    def test_json_ttl_extended_seconds_matches_extension(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        outcome = self._run_json(reservation.reservation_id, "--extension", "7200")
        payload = json.loads(outcome.output)
        assert payload["ttl_extended_seconds"] == 7200

    def test_json_duration_ms_is_float(self, repo: pathlib.Path, reservation: Reservation) -> None:
        outcome = self._run_json(reservation.reservation_id)
        payload = json.loads(outcome.output)
        assert isinstance(payload["duration_ms"], float)

    def test_json_not_found_has_status(self, repo: pathlib.Path) -> None:
        missing_id = fake_id("nonexistent-heartbeat-2")
        outcome = self._run_json(missing_id, run_id="agent-1")
        payload = json.loads(outcome.output)
        assert payload["status"] == "not_found"

    def test_json_already_released_has_status(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        rid = reservation.reservation_id
        release_argv = ["coord", "release", rid, "--run-id", "agent-test"]
        runner.invoke(cli, release_argv)
        outcome = self._run_json(rid)
        payload = json.loads(outcome.output)
        assert payload["status"] == "already_released"
460
461
462 # ---------------------------------------------------------------------------
463 # Security
464 # ---------------------------------------------------------------------------
465
466
class TestHeartbeatSecurity:
    """Hostile-input checks for ``coord heartbeat``."""

    def test_path_traversal_in_reservation_id_rejected(self, repo: pathlib.Path) -> None:
        argv = ["coord", "heartbeat", "../../etc/passwd", "--run-id", "agent-1"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code != 0
        # Nothing named ``etc`` may appear in the repo or its parent.
        for base in (repo, repo.parent):
            assert not (base / "etc").exists()

    def test_path_traversal_with_dots_rejected(self, repo: pathlib.Path) -> None:
        argv = ["coord", "heartbeat", "../sneaky", "--run-id", "agent-1"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code != 0

    def test_null_byte_in_reservation_id_rejected(self, repo: pathlib.Path) -> None:
        argv = ["coord", "heartbeat", "abc\x00def", "--run-id", "agent-1"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code != 0

    def test_ansi_in_run_id_does_not_crash(self, repo: pathlib.Path, reservation: Reservation) -> None:
        hostile_run_id = "\x1b[31magent-malicious\x1b[0m"
        argv = [
            "coord", "heartbeat", reservation.reservation_id,
            "--run-id", hostile_run_id,
        ]
        outcome = runner.invoke(cli, argv)
        # Only the exit code is inspected: it must be one of the expected
        # codes, i.e. the command did not die on an unhandled exception.
        acceptable = (0, 1, 2)
        assert outcome.exit_code in acceptable
496
497
498 # ---------------------------------------------------------------------------
499 # Stress
500 # ---------------------------------------------------------------------------
501
502
class TestHeartbeatStress:
    """Coarse wall-clock ceilings for heartbeat creation and the CLI path."""

    def test_100_heartbeats_created_and_loaded_under_1s(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Write 100 heartbeats and load the map back in under one second.

        Reservation creation is setup and is excluded from the timed region.
        """
        root = _make_repo(tmp_path)
        rids: list[str] = [
            create_reservation(
                root,
                run_id="stress-agent",
                branch="feat/stress",
                addresses=[f"src/s{i}.py::func"],
                ttl_seconds=3600,
            ).reservation_id
            for i in range(100)
        ]

        t0 = time.monotonic()
        for rid in rids:
            create_heartbeat(root, rid, run_id="stress-agent", extension_seconds=3600)
        hb_map = load_heartbeat_map(root)
        elapsed = time.monotonic() - t0

        assert len(hb_map) == 100, f"expected 100 heartbeats, got {len(hb_map)}"
        assert elapsed < 1.0, f"100 heartbeats took {elapsed:.3f}s (> 1s limit)"
        for rid in rids:
            assert rid in hb_map

    def test_500_sequential_cli_heartbeats_under_10s(self, repo: pathlib.Path) -> None:
        """500 back-to-back CLI invocations must complete in under 30 s.

        The method name still says ``10s``; it is kept so the test ID stays
        stable. The enforced ceiling is the 30-second value below.
        """
        # Single source of truth for both the check and its failure message.
        limit_s = 30.0
        res = create_reservation(
            repo,
            run_id="seq-stress-agent",
            branch="main",
            addresses=["src/seq.py::func"],
            ttl_seconds=3600,
        )
        rid = res.reservation_id
        t0 = time.monotonic()
        for _ in range(500):
            result = runner.invoke(
                cli, ["coord", "heartbeat", rid, "--run-id", "seq-stress-agent", "--json"]
            )
            assert result.exit_code == 0, result.output
        elapsed = time.monotonic() - t0
        assert elapsed < limit_s, (
            f"500 sequential heartbeats took {elapsed:.2f}s (> {limit_s:.0f}s)"
        )
548
549
550 # ---------------------------------------------------------------------------
551 # Input validation
552 # ---------------------------------------------------------------------------
553
554
class TestHeartbeatInputValidation:
    """Exit-code and side-effect checks for rejected and boundary inputs."""

    @staticmethod
    def _beat(rid: str, run_id: str, *extra: str):
        """Invoke ``coord heartbeat <rid> --run-id <run_id> [extra ...]``."""
        argv = ["coord", "heartbeat", rid, "--run-id", run_id, *extra]
        return runner.invoke(cli, argv)

    def test_run_id_at_max_length_accepted(self, repo: pathlib.Path, reservation: Reservation) -> None:
        boundary_run_id = "a" * _MAX_RUN_ID_LEN
        outcome = self._beat(reservation.reservation_id, boundary_run_id)
        assert outcome.exit_code == 0, outcome.output

    def test_run_id_over_max_length_exits_user_error(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        oversized = "a" * (_MAX_RUN_ID_LEN + 1)
        outcome = self._beat(reservation.reservation_id, oversized)
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_run_id_over_max_leaves_no_heartbeat_file(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        """A rejected run id must not leave a heartbeat behind."""
        rid = reservation.reservation_id
        oversized = "a" * (_MAX_RUN_ID_LEN + 1)
        self._beat(rid, oversized)
        assert rid not in load_heartbeat_map(repo)

    def test_extension_zero_exits_user_error(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        outcome = self._beat(reservation.reservation_id, "agent-1", "--extension", "0")
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_extension_negative_exits_user_error(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        outcome = self._beat(reservation.reservation_id, "agent-1", "--extension", "-100")
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_extension_above_max_exits_user_error(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        beyond_limit = str(_MAX_EXTENSION_SECONDS + 1)
        outcome = self._beat(
            reservation.reservation_id, "agent-1", "--extension", beyond_limit
        )
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_extension_at_max_accepted(self, repo: pathlib.Path, reservation: Reservation) -> None:
        at_limit = str(_MAX_EXTENSION_SECONDS)
        outcome = self._beat(
            reservation.reservation_id, "agent-1", "--extension", at_limit
        )
        assert outcome.exit_code == 0, outcome.output

    def test_extension_above_max_leaves_no_heartbeat(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        """A rejected extension must not leave a heartbeat behind."""
        rid = reservation.reservation_id
        beyond_limit = str(_MAX_EXTENSION_SECONDS + 1)
        self._beat(rid, "agent-1", "--extension", beyond_limit)
        assert rid not in load_heartbeat_map(repo)

    def test_invalid_id_exits_user_error(self, repo: pathlib.Path) -> None:
        outcome = self._beat("not-a-content-id", "agent-1")
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_not_found_id_exits_not_found(self, repo: pathlib.Path) -> None:
        missing_id = fake_id("nonexistent-heartbeat-3")
        outcome = self._beat(missing_id, "agent-1")
        assert outcome.exit_code == ExitCode.NOT_FOUND

    def test_already_released_exits_user_error(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        rid = reservation.reservation_id
        create_release(repo, rid, run_id="agent-test")
        outcome = self._beat(rid, "agent-test")
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_invalid_id_json_has_status(self, repo: pathlib.Path) -> None:
        outcome = self._beat("not-valid", "agent-1", "--json")
        payload = json.loads(outcome.output)
        assert payload["status"] == "bad_id"
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_not_found_json_has_status(self, repo: pathlib.Path) -> None:
        missing_id = fake_id("nonexistent-heartbeat-4")
        outcome = self._beat(missing_id, "agent-1", "--json")
        payload = json.loads(outcome.output)
        assert payload["status"] == "not_found"
        assert outcome.exit_code == ExitCode.NOT_FOUND

    def test_already_released_json_has_status(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        rid = reservation.reservation_id
        create_release(repo, rid, run_id="agent-test")
        outcome = self._beat(rid, "agent-test", "--json")
        payload = json.loads(outcome.output)
        assert payload["status"] == "already_released"
        assert outcome.exit_code == ExitCode.USER_ERROR
689
690
691 # ---------------------------------------------------------------------------
692 # JSON output format
693 # ---------------------------------------------------------------------------
694
695
class TestHeartbeatJsonFormat:
    """Shape and formatting checks on the ``--json`` output."""

    @staticmethod
    def _run_json(rid: str, *extra: str, run_id: str = "agent-test"):
        """Invoke ``coord heartbeat`` for *rid* with ``--json`` appended last."""
        argv = ["coord", "heartbeat", rid, "--run-id", run_id, *extra, "--json"]
        return runner.invoke(cli, argv)

    def test_ok_json_is_compact(self, repo: pathlib.Path, reservation: Reservation) -> None:
        """Successful output is a single line once outer whitespace is stripped."""
        outcome = self._run_json(reservation.reservation_id)
        assert outcome.exit_code == 0, outcome.output
        body = outcome.output.strip()
        assert "\n" not in body

    def test_not_found_json_is_compact(self, repo: pathlib.Path) -> None:
        missing_id = fake_id("nonexistent-heartbeat-5")
        outcome = self._run_json(missing_id, run_id="agent-1")
        body = outcome.output.strip()
        assert "\n" not in body

    def test_already_released_json_is_compact(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        rid = reservation.reservation_id
        create_release(repo, rid, run_id="agent-test")
        outcome = self._run_json(rid)
        body = outcome.output.strip()
        assert "\n" not in body

    def test_ok_json_all_required_keys(self, repo: pathlib.Path, reservation: Reservation) -> None:
        outcome = self._run_json(reservation.reservation_id)
        payload = json.loads(outcome.output)
        required = (
            "status",
            "reservation_id",
            "run_id",
            "last_beat_at",
            "extended_expires_at",
            "ttl_extended_seconds",
            "duration_ms",
        )
        # Report the first absent key, in declaration order.
        key = next((name for name in required if name not in payload), None)
        assert key is None, f"missing JSON key: {key}"

    def test_ttl_extended_seconds_matches_extension_arg(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        outcome = self._run_json(reservation.reservation_id, "--extension", "9999")
        payload = json.loads(outcome.output)
        assert payload["ttl_extended_seconds"] == 9999

    def test_extended_expires_at_is_future(self, repo: pathlib.Path, reservation: Reservation) -> None:
        outcome = self._run_json(reservation.reservation_id, "--extension", "3600")
        payload = json.loads(outcome.output)
        expires = datetime.datetime.fromisoformat(payload["extended_expires_at"])
        current = datetime.datetime.now(datetime.timezone.utc)
        assert expires > current

    def test_duration_ms_non_negative(self, repo: pathlib.Path, reservation: Reservation) -> None:
        outcome = self._run_json(reservation.reservation_id)
        payload = json.loads(outcome.output)
        assert payload["duration_ms"] >= 0.0
765
766
767 # ---------------------------------------------------------------------------
768 # Concurrent
769 # ---------------------------------------------------------------------------
770
771
class TestHeartbeatConcurrent:
    """Races several threads against one reservation through the CLI runner."""

    def test_20_threads_race_to_heartbeat_same_reservation(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        """Every one of the 20 threads must report exit code 0.

        Afterwards the heartbeat map must still load and contain the
        reservation.
        """
        rid = reservation.reservation_id
        thread_count = 20
        exit_codes: list[int] = []
        lock = threading.Lock()

        def _beat() -> None:
            result = runner.invoke(
                cli, ["coord", "heartbeat", rid, "--run-id", "agent-race", "--json"]
            )
            with lock:
                exit_codes.append(result.exit_code)

        threads = [threading.Thread(target=_beat) for _ in range(thread_count)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # A worker that raises never records an exit code; without this
        # count check ``all(...)`` below would pass on a short (even empty)
        # list and hide the crash.
        assert len(exit_codes) == thread_count, (
            f"only {len(exit_codes)} of {thread_count} threads reported an exit code"
        )
        assert all(c == 0 for c in exit_codes), f"Non-zero exits: {exit_codes}"
        # Heartbeat file must be parseable after concurrent writes
        hb_map = load_heartbeat_map(repo)
        assert rid in hb_map

    def test_concurrent_heartbeat_last_writer_wins(
        self, repo: pathlib.Path, reservation: Reservation
    ) -> None:
        """After concurrent writes the heartbeat map must contain a valid entry.

        Five threads heartbeat with different extensions. The test requires
        at least one success and a loadable entry whose expiry is a
        ``datetime``; it does not check which writer's value survived.
        """
        rid = reservation.reservation_id
        extensions = [1000, 2000, 3000, 4000, 5000]
        results: list[dict] = []
        lock = threading.Lock()

        def _beat(ext: int) -> None:
            result = runner.invoke(
                cli,
                ["coord", "heartbeat", rid, "--run-id", "agent-lww",
                 "--extension", str(ext), "--json"],
            )
            # Only successful invocations with output are recorded.
            if result.exit_code == 0 and result.output.strip():
                with lock:
                    results.append(json.loads(result.output))

        threads = [threading.Thread(target=_beat, args=(e,)) for e in extensions]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert results, "No successful heartbeat in concurrent run"
        hb_map = load_heartbeat_map(repo)
        assert rid in hb_map
        # The surviving heartbeat's extended_expires_at must parse cleanly
        hb = hb_map[rid]
        assert isinstance(hb.extended_expires_at, datetime.datetime)
830
831
class TestRegisterFlags:
    """Checks how the parser built by ``register`` maps JSON flags to ``json_out``."""

    def _make_parser(self) -> "argparse.ArgumentParser":
        """Return a fresh parser with the heartbeat subcommand registered."""
        import argparse
        from muse.cli.commands.heartbeat_coord import register

        parser = argparse.ArgumentParser()
        subparsers = parser.add_subparsers()
        register(subparsers)
        return parser

    def _parse(self, *flags: str):
        """Parse the common heartbeat argv with *flags* appended."""
        argv = ["heartbeat", "some-reservation-id", "--run-id", "agent-1", *flags]
        return self._make_parser().parse_args(argv)

    def test_default_json_out_is_false(self) -> None:
        namespace = self._parse()
        assert namespace.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        namespace = self._parse("--json")
        assert namespace.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        namespace = self._parse("-j")
        assert namespace.json_out is True
File History 1 commit