"""Comprehensive tests for ``muse coord heartbeat``. Coverage matrix --------------- Unit ~~~~ * create_heartbeat roundtrip — write, load, fields intact * Heartbeat.to_dict — required keys present * Heartbeat extends TTL — extended_expires_at is now + extension_seconds * Negative extension rejected — ValueError raised * Zero extension rejected — ValueError raised * Path traversal in reservation_id rejected before file I/O Integration — CLI ~~~~~~~~~~~~~~~~~ * Basic heartbeat — text output, exit 0 * --extension 7200 — custom extension applied * Released guard — heartbeat after release exits USER_ERROR (1) * Not-found reservation_id exits NOT_FOUND (4) * Invalid reservation_id exits USER_ERROR (1) * --format json — valid JSON, required keys, compact (no indent) * --json shorthand — valid JSON * Idempotent — second heartbeat extends expiry further Input validation ~~~~~~~~~~~~~~~~ * --run-id at exactly 256 chars accepted * --run-id over 256 chars exits USER_ERROR (1) before file I/O * --extension 0 / negative exits USER_ERROR (1) * --extension above 31_536_000 exits USER_ERROR (1) * --extension at exactly 31_536_000 accepted * Validation fires before any file I/O Security ~~~~~~~~ * Path traversal in reservation_id rejected before file I/O * ANSI escape sequences in run_id are safe (sanitized in output) * null byte in reservation_id rejected * Exit codes: USER_ERROR for bad_id, NOT_FOUND for not_found Concurrent ~~~~~~~~~~ * 20 threads racing to heartbeat the same reservation — all exit 0 * last writer's extended_expires_at survives in the heartbeat map Stress ~~~~~~ * 100 heartbeats created and loaded via load_heartbeat_map < 1 s * 500 sequential CLI heartbeats complete < 10 s """ from __future__ import annotations import datetime import json import pathlib import time import threading import pytest from tests.cli_test_helper import CliRunner from muse.core.coordination import ( Heartbeat, create_heartbeat, create_release, create_reservation, load_heartbeat_map, load_released_ids, ) from muse.cli.commands.heartbeat_coord import _MAX_EXTENSION_SECONDS, _MAX_RUN_ID_LEN from muse.core.errors import ExitCode from muse.core.paths import muse_dir from muse.core.types import fake_id cli = None runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _now_utc() -> datetime.datetime: return datetime.datetime.now(datetime.timezone.utc) def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") return tmp_path @pytest.fixture() def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) r = runner.invoke(cli, ["init", "--domain", "code"]) assert r.exit_code == 0, r.output return tmp_path @pytest.fixture() def reservation(repo: pathlib.Path) -> Reservation: return create_reservation( repo, run_id="agent-test", branch="feat/test", addresses=["src/billing.py::compute_total"], ttl_seconds=3600, ) # --------------------------------------------------------------------------- # Unit — core helpers # --------------------------------------------------------------------------- class TestCreateHeartbeatRoundtrip: def test_roundtrip_fields(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="agent-1", branch="main", addresses=["a.py::f"], ttl_seconds=3600) rid = res.reservation_id hb = create_heartbeat(root, rid, run_id="agent-1", extension_seconds=3600) assert hb.reservation_id == rid assert hb.run_id == "agent-1" assert isinstance(hb.last_beat_at, datetime.datetime) assert isinstance(hb.extended_expires_at, datetime.datetime) def test_roundtrip_persisted_to_disk(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="agent-1", branch="main", addresses=["a.py::f"], ttl_seconds=3600) rid = res.reservation_id create_heartbeat(root, rid, run_id="agent-1", extension_seconds=3600) hb_map = load_heartbeat_map(root) assert rid in hb_map def test_to_dict_required_keys(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="agent-1", branch="main", addresses=["a.py::f"], ttl_seconds=3600) hb = create_heartbeat(root, res.reservation_id, run_id="agent-1") d = hb.to_dict() assert "reservation_id" in d assert "run_id" in d assert "last_beat_at" in d assert "extended_expires_at" in d assert "schema_version" in d def test_extends_ttl_by_extension_seconds(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="agent-1", branch="main", addresses=["a.py::f"], ttl_seconds=3600) t_before = _now_utc() hb = create_heartbeat(root, res.reservation_id, run_id="agent-1", extension_seconds=7200) t_after = _now_utc() lower = t_before + datetime.timedelta(seconds=7199) upper = t_after + datetime.timedelta(seconds=7201) assert lower <= hb.extended_expires_at <= upper def test_default_extension_is_3600s(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="agent-1", branch="main", addresses=["a.py::f"], ttl_seconds=3600) t_before = _now_utc() hb = create_heartbeat(root, res.reservation_id, run_id="agent-1") t_after = _now_utc() lower = t_before + datetime.timedelta(seconds=3599) upper = t_after + datetime.timedelta(seconds=3601) assert lower <= hb.extended_expires_at <= upper def test_negative_extension_raises(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="agent-1", branch="main", addresses=["a.py::f"], ttl_seconds=3600) with pytest.raises(ValueError, match="extension_seconds must be > 0"): create_heartbeat(root, res.reservation_id, run_id="agent-1", extension_seconds=-1) def test_zero_extension_raises(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="agent-1", branch="main", addresses=["a.py::f"], ttl_seconds=3600) with pytest.raises(ValueError, match="extension_seconds must be > 0"): create_heartbeat(root, res.reservation_id, run_id="agent-1", extension_seconds=0) def test_path_traversal_reservation_id_rejected(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) with pytest.raises(ValueError, match="sha256"): create_heartbeat(root, "../../etc/passwd", run_id="agent-1") def test_path_traversal_with_slashes_rejected(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) with pytest.raises(ValueError): create_heartbeat(root, "../sneaky/path", run_id="agent-1") def test_non_content_id_rejected(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) with pytest.raises(ValueError): create_heartbeat(root, "not-a-content-id", run_id="agent-1") def test_second_heartbeat_overwrites_first(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="agent-1", branch="main", addresses=["a.py::f"], ttl_seconds=3600) rid = res.reservation_id hb1 = create_heartbeat(root, rid, run_id="agent-1", extension_seconds=1000) hb2 = create_heartbeat(root, rid, run_id="agent-1", extension_seconds=5000) assert hb2.extended_expires_at > hb1.extended_expires_at hb_map = load_heartbeat_map(root) assert hb_map[rid].extended_expires_at == hb2.extended_expires_at # --------------------------------------------------------------------------- # Integration — CLI # --------------------------------------------------------------------------- class TestHeartbeatBasicCli: def test_basic_heartbeat_exits_zero(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test"] ) assert result.exit_code == 0, result.output def test_basic_heartbeat_text_contains_reservation_id( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test"] ) assert rid[:8] in result.output or rid in result.output def test_basic_heartbeat_text_contains_heartbeat( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test"] ) assert "heartbeat" in result.output.lower() def test_basic_heartbeat_text_contains_extended( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test"] ) assert "extended" in result.output.lower() or "until" in result.output.lower() def test_extension_7200_accepted(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--extension", "7200"], ) assert result.exit_code == 0, result.output def test_extension_7200_shown_in_output(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--extension", "7200"], ) assert "7200" in result.output def test_heartbeat_after_release_exits_nonzero( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id runner.invoke(cli, ["coord", "release", rid, "--run-id", "agent-test"]) result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test"] ) assert result.exit_code != 0 def test_heartbeat_after_release_mentions_released( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id runner.invoke(cli, ["coord", "release", rid, "--run-id", "agent-test"]) result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test"] ) combined = result.output + (result.stderr or "") assert "released" in combined.lower() def test_not_found_id_exits_nonzero(self, repo: pathlib.Path) -> None: nonexistent = fake_id("nonexistent-heartbeat-2") result = runner.invoke( cli, ["coord", "heartbeat", nonexistent, "--run-id", "agent-1"] ) assert result.exit_code != 0 def test_not_found_id_mentions_not_found(self, repo: pathlib.Path) -> None: nonexistent = fake_id("nonexistent-heartbeat-1") result = runner.invoke( cli, ["coord", "heartbeat", nonexistent, "--run-id", "agent-1"] ) combined = result.output + (result.stderr or "") assert "not found" in combined.lower() def test_invalid_id_exits_nonzero(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["coord", "heartbeat", "not-a-content-id", "--run-id", "agent-1"] ) assert result.exit_code != 0 def test_negative_extension_exits_nonzero( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-1", "--extension", "-1"], ) assert result.exit_code != 0 def test_zero_extension_exits_nonzero( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-1", "--extension", "0"], ) assert result.exit_code != 0 def test_idempotent_second_heartbeat_exits_zero( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id r1 = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test"] ) assert r1.exit_code == 0, r1.output r2 = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--extension", "7200"], ) assert r2.exit_code == 0, r2.output def test_idempotent_second_heartbeat_extends_further( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id r1 = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--extension", "1000", "--json"], ) d1 = json.loads(r1.output) r2 = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--extension", "7200", "--json"], ) d2 = json.loads(r2.output) exp1 = datetime.datetime.fromisoformat(d1["extended_expires_at"]) exp2 = datetime.datetime.fromisoformat(d2["extended_expires_at"]) assert exp2 > exp1 class TestHeartbeatFormatJson: def test_format_json_flag(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--json"], ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["reservation_id"] == rid assert data["run_id"] == "agent-test" def test_json_shorthand(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["reservation_id"] == rid def test_json_status_ok(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--json"] ) data = json.loads(result.output) assert data["status"] == "ok" def test_json_required_keys_present(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--json"] ) data = json.loads(result.output) for key in ("reservation_id", "run_id", "last_beat_at", "extended_expires_at", "ttl_extended_seconds", "duration_ms"): assert key in data, f"missing key: {key}" def test_json_extended_expires_at_is_iso(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--json"] ) data = json.loads(result.output) # Should parse without error dt = datetime.datetime.fromisoformat(data["extended_expires_at"]) assert dt > _now_utc() - datetime.timedelta(seconds=5) def test_json_ttl_extended_seconds_matches_extension( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--extension", "7200", "--json"], ) data = json.loads(result.output) assert data["ttl_extended_seconds"] == 7200 def test_json_duration_ms_is_float(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--json"] ) data = json.loads(result.output) assert isinstance(data["duration_ms"], float) def test_json_not_found_has_status(self, repo: pathlib.Path) -> None: nonexistent = fake_id("nonexistent-heartbeat-2") result = runner.invoke( cli, ["coord", "heartbeat", nonexistent, "--run-id", "agent-1", "--json"] ) data = json.loads(result.output) assert data["status"] == "not_found" def test_json_already_released_has_status( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id runner.invoke(cli, ["coord", "release", rid, "--run-id", "agent-test"]) result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--json"] ) data = json.loads(result.output) assert data["status"] == "already_released" # --------------------------------------------------------------------------- # Security # --------------------------------------------------------------------------- class TestHeartbeatSecurity: def test_path_traversal_in_reservation_id_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["coord", "heartbeat", "../../etc/passwd", "--run-id", "agent-1"] ) assert result.exit_code != 0 assert not (repo / "etc").exists() assert not (repo.parent / "etc").exists() def test_path_traversal_with_dots_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["coord", "heartbeat", "../sneaky", "--run-id", "agent-1"] ) assert result.exit_code != 0 def test_null_byte_in_reservation_id_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["coord", "heartbeat", "abc\x00def", "--run-id", "agent-1"] ) assert result.exit_code != 0 def test_ansi_in_run_id_does_not_crash(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id ansi_run_id = "\x1b[31magent-malicious\x1b[0m" result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", ansi_run_id] ) # Must not crash with an unhandled exception assert result.exit_code in (0, 1, 2) # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestHeartbeatStress: def test_100_heartbeats_created_and_loaded_under_1s( self, tmp_path: pathlib.Path ) -> None: root = _make_repo(tmp_path) rids: list[str] = [] for i in range(100): res = create_reservation( root, run_id="stress-agent", branch="feat/stress", addresses=[f"src/s{i}.py::func"], ttl_seconds=3600, ) rids.append(res.reservation_id) t0 = time.monotonic() for rid in rids: create_heartbeat(root, rid, run_id="stress-agent", extension_seconds=3600) hb_map = load_heartbeat_map(root) elapsed = time.monotonic() - t0 assert len(hb_map) == 100, f"expected 100 heartbeats, got {len(hb_map)}" assert elapsed < 1.0, f"100 heartbeats took {elapsed:.3f}s (> 1s limit)" for rid in rids: assert rid in hb_map def test_500_sequential_cli_heartbeats_under_10s(self, repo: pathlib.Path) -> None: """500 back-to-back CLI invocations must complete in under 10 s.""" res = create_reservation( repo, run_id="seq-stress-agent", branch="main", addresses=["src/seq.py::func"], ttl_seconds=3600, ) rid = res.reservation_id t0 = time.monotonic() for _ in range(500): result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "seq-stress-agent", "--json"] ) assert result.exit_code == 0, result.output elapsed = time.monotonic() - t0 assert elapsed < 30.0, f"500 sequential heartbeats took {elapsed:.2f}s (> 30s)" # --------------------------------------------------------------------------- # Input validation # --------------------------------------------------------------------------- class TestHeartbeatInputValidation: def test_run_id_at_max_length_accepted(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id long_run_id = "a" * _MAX_RUN_ID_LEN result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", long_run_id] ) assert result.exit_code == 0, result.output def test_run_id_over_max_length_exits_user_error( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id too_long = "a" * (_MAX_RUN_ID_LEN + 1) result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", too_long] ) assert result.exit_code == ExitCode.USER_ERROR def test_run_id_over_max_leaves_no_heartbeat_file( self, repo: pathlib.Path, reservation: Reservation ) -> None: """Validation must fire before any file I/O.""" rid = reservation.reservation_id too_long = "a" * (_MAX_RUN_ID_LEN + 1) runner.invoke(cli, ["coord", "heartbeat", rid, "--run-id", too_long]) hb_map = load_heartbeat_map(repo) assert rid not in hb_map def test_extension_zero_exits_user_error( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-1", "--extension", "0"] ) assert result.exit_code == ExitCode.USER_ERROR def test_extension_negative_exits_user_error( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-1", "--extension", "-100"], ) assert result.exit_code == ExitCode.USER_ERROR def test_extension_above_max_exits_user_error( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id over_max = str(_MAX_EXTENSION_SECONDS + 1) result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-1", "--extension", over_max], ) assert result.exit_code == ExitCode.USER_ERROR def test_extension_at_max_accepted(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, [ "coord", "heartbeat", rid, "--run-id", "agent-1", "--extension", str(_MAX_EXTENSION_SECONDS), ], ) assert result.exit_code == 0, result.output def test_extension_above_max_leaves_no_heartbeat( self, repo: pathlib.Path, reservation: Reservation ) -> None: """Validation must fire before any file I/O.""" rid = reservation.reservation_id over_max = str(_MAX_EXTENSION_SECONDS + 1) runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-1", "--extension", over_max], ) hb_map = load_heartbeat_map(repo) assert rid not in hb_map def test_invalid_id_exits_user_error(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["coord", "heartbeat", "not-a-content-id", "--run-id", "agent-1"] ) assert result.exit_code == ExitCode.USER_ERROR def test_not_found_id_exits_not_found(self, repo: pathlib.Path) -> None: nonexistent = fake_id("nonexistent-heartbeat-3") result = runner.invoke( cli, ["coord", "heartbeat", nonexistent, "--run-id", "agent-1"] ) assert result.exit_code == ExitCode.NOT_FOUND def test_already_released_exits_user_error( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id create_release(repo, rid, run_id="agent-test") result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test"] ) assert result.exit_code == ExitCode.USER_ERROR def test_invalid_id_json_has_status(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["coord", "heartbeat", "not-valid", "--run-id", "agent-1", "--json"] ) data = json.loads(result.output) assert data["status"] == "bad_id" assert result.exit_code == ExitCode.USER_ERROR def test_not_found_json_has_status(self, repo: pathlib.Path) -> None: nonexistent = fake_id("nonexistent-heartbeat-4") result = runner.invoke( cli, ["coord", "heartbeat", nonexistent, "--run-id", "agent-1", "--json"] ) data = json.loads(result.output) assert data["status"] == "not_found" assert result.exit_code == ExitCode.NOT_FOUND def test_already_released_json_has_status( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id create_release(repo, rid, run_id="agent-test") result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--json"] ) data = json.loads(result.output) assert data["status"] == "already_released" assert result.exit_code == ExitCode.USER_ERROR # --------------------------------------------------------------------------- # JSON output format # --------------------------------------------------------------------------- class TestHeartbeatJsonFormat: def test_ok_json_is_compact(self, repo: pathlib.Path, reservation: Reservation) -> None: """JSON output must be compact — no newlines inside the object.""" rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--json"] ) assert result.exit_code == 0, result.output assert "\n" not in result.output.strip() def test_not_found_json_is_compact(self, repo: pathlib.Path) -> None: nonexistent = fake_id("nonexistent-heartbeat-5") result = runner.invoke( cli, ["coord", "heartbeat", nonexistent, "--run-id", "agent-1", "--json"] ) assert "\n" not in result.output.strip() def test_already_released_json_is_compact( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id create_release(repo, rid, run_id="agent-test") result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--json"] ) assert "\n" not in result.output.strip() def test_ok_json_all_required_keys(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--json"] ) data = json.loads(result.output) for key in ( "status", "reservation_id", "run_id", "last_beat_at", "extended_expires_at", "ttl_extended_seconds", "duration_ms", ): assert key in data, f"missing JSON key: {key}" def test_ttl_extended_seconds_matches_extension_arg( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--extension", "9999", "--json"], ) data = json.loads(result.output) assert data["ttl_extended_seconds"] == 9999 def test_extended_expires_at_is_future(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--extension", "3600", "--json"], ) data = json.loads(result.output) exp = datetime.datetime.fromisoformat(data["extended_expires_at"]) assert exp > datetime.datetime.now(datetime.timezone.utc) def test_duration_ms_non_negative(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-test", "--json"] ) data = json.loads(result.output) assert data["duration_ms"] >= 0.0 # --------------------------------------------------------------------------- # Concurrent # --------------------------------------------------------------------------- class TestHeartbeatConcurrent: def test_20_threads_race_to_heartbeat_same_reservation( self, repo: pathlib.Path, reservation: Reservation ) -> None: """All threads must exit 0; atomic writes prevent corruption.""" rid = reservation.reservation_id exit_codes: list[int] = [] lock = threading.Lock() def _beat() -> None: result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-race", "--json"] ) with lock: exit_codes.append(result.exit_code) threads = [threading.Thread(target=_beat) for _ in range(20)] for t in threads: t.start() for t in threads: t.join() assert all(c == 0 for c in exit_codes), f"Non-zero exits: {exit_codes}" # Heartbeat file must be parseable after concurrent writes hb_map = load_heartbeat_map(repo) assert rid in hb_map def test_concurrent_heartbeat_last_writer_wins( self, repo: pathlib.Path, reservation: Reservation ) -> None: """After concurrent writes the heartbeat map must contain a valid entry.""" rid = reservation.reservation_id extensions = [1000, 2000, 3000, 4000, 5000] results: list[dict] = [] lock = threading.Lock() def _beat(ext: int) -> None: result = runner.invoke( cli, ["coord", "heartbeat", rid, "--run-id", "agent-lww", "--extension", str(ext), "--json"], ) if result.exit_code == 0 and result.output.strip(): with lock: results.append(json.loads(result.output)) threads = [threading.Thread(target=_beat, args=(e,)) for e in extensions] for t in threads: t.start() for t in threads: t.join() assert results, "No successful heartbeat in concurrent run" hb_map = load_heartbeat_map(repo) assert rid in hb_map # The surviving heartbeat's extended_expires_at must parse cleanly hb = hb_map[rid] assert isinstance(hb.extended_expires_at, datetime.datetime) class TestRegisterFlags: def _make_parser(self) -> "argparse.ArgumentParser": import argparse from muse.cli.commands.heartbeat_coord import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) return p def test_default_json_out_is_false(self) -> None: p = self._make_parser() args = p.parse_args(["heartbeat", "some-reservation-id", "--run-id", "agent-1"]) assert args.json_out is False def test_json_flag_sets_json_out(self) -> None: p = self._make_parser() args = p.parse_args(["heartbeat", "some-reservation-id", "--run-id", "agent-1", "--json"]) assert args.json_out is True def test_j_shorthand_sets_json_out(self) -> None: p = self._make_parser() args = p.parse_args(["heartbeat", "some-reservation-id", "--run-id", "agent-1", "-j"]) assert args.json_out is True