"""Tests for ``muse coord release``, ``muse coord heartbeat``, and ``muse coord gc``. Coverage matrix --------------- Unit ~~~~ * :func:`muse.core.coordination.create_release` — write, idempotency, UUID guard * :func:`muse.core.coordination.load_all_releases` — load, corrupt-file tolerance * :func:`muse.core.coordination.load_released_ids` — efficiency path (stems only) * :func:`muse.core.coordination.create_heartbeat` — write, atomic overwrite * :func:`muse.core.coordination.load_heartbeat_map` — load, corrupt-file tolerance * :func:`muse.core.coordination.run_coord_gc` — dry-run, execute, grace period, orphans * :func:`muse.core.coordination.active_reservations` — lifecycle-aware liveness Integration ~~~~~~~~~~~ * ``muse coord release`` — single, --all-for-run, idempotent, not-found, bad args * ``muse coord heartbeat`` — extends TTL, released guard, not-found, bad args * ``muse coord gc`` — dry-run, execute, --include-intents, --verbose, JSON output * ``muse coord list`` — lifecycle-aware fields (released, effective_expires_at) Security ~~~~~~~~ * UUID injection in reservation_id → rejected before file I/O * Path traversal attempts via reservation_id → ValueError from _validate_reservation_id * ANSI escape sequences in run_id / reservation_id not written to files * GC does not follow symlinks or traverse outside .muse/coordination/ Stress ~~~~~~ * 200 reservations → release all via --all-for-run in < 2 s * 500 expired reservations → GC collects all in < 3 s * 100 heartbeats → load_heartbeat_map in < 0.5 s * release + heartbeat + gc round-trip correctness under concurrent-ish writes """ from __future__ import annotations import datetime import json import pathlib import time import uuid import pytest from muse.core._types import MsgpackDict from muse.core.coordination import Reservation # ── Helpers ─────────────────────────────────────────────────────────────────── def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: """Initialise a minimal muse repo (just the .muse dir + HEAD).""" muse_dir = tmp_path / ".muse" muse_dir.mkdir() (muse_dir / "HEAD").write_text("ref: refs/heads/main\n") return tmp_path def _now_utc() -> datetime.datetime: return datetime.datetime.now(datetime.timezone.utc) def _dt_iso(dt: datetime.datetime) -> str: return dt.isoformat() # ── Fixtures ────────────────────────────────────────────────────────────────── @pytest.fixture() def repo(tmp_path: pathlib.Path) -> pathlib.Path: return _make_repo(tmp_path) @pytest.fixture() def reservation(repo: pathlib.Path) -> Reservation: """A fresh active reservation in *repo*.""" from muse.core.coordination import create_reservation return create_reservation( repo, run_id="agent-test", branch="feat/test", addresses=["src/billing.py::compute_total"], ttl_seconds=3600, ) @pytest.fixture() def expired_reservation(repo: pathlib.Path) -> Reservation: """A reservation whose TTL has already passed.""" from muse.core.coordination import create_reservation res = create_reservation( repo, run_id="agent-expired", branch="feat/expired", addresses=["src/old.py::dead_code"], ttl_seconds=1, ) # Back-date expires_at on disk so it's expired. import json as _json from muse.core.coordination import _reservations_dir path = _reservations_dir(repo) / f"{res.reservation_id}.json" data = _json.loads(path.read_text()) data["expires_at"] = (_now_utc() - datetime.timedelta(seconds=10)).isoformat() path.write_text(_json.dumps(data)) res.expires_at = _now_utc() - datetime.timedelta(seconds=10) return res # ───────────────────────────────────────────────────────────────────────────── # Unit tests — coordination.py primitives # ───────────────────────────────────────────────────────────────────────────── class TestCreateRelease: def test_creates_tombstone_file(self, repo: pathlib.Path, reservation: Reservation) -> None: from muse.core.coordination import _releases_dir, create_release rel = create_release(repo, reservation.reservation_id, "agent-test", "completed") path = _releases_dir(repo) / f"{reservation.reservation_id}.json" assert path.exists() assert rel.reason == "completed" def test_roundtrip_from_dict(self, repo: pathlib.Path, reservation: Reservation) -> None: from muse.core.coordination import Release, create_release rel = create_release(repo, reservation.reservation_id, "agent-test", "cancelled") data = rel.to_dict() rel2 = Release.from_dict(data) assert rel2.reservation_id == rel.reservation_id assert rel2.reason == "cancelled" def test_raises_file_exists_on_double_release(self, repo: pathlib.Path, reservation: Reservation) -> None: from muse.core.coordination import create_release create_release(repo, reservation.reservation_id, "agent-test", "completed") with pytest.raises(FileExistsError, match="already released"): create_release(repo, reservation.reservation_id, "agent-test", "cancelled") def test_rejects_invalid_uuid(self, repo: pathlib.Path) -> None: from muse.core.coordination import create_release with pytest.raises(ValueError, match="valid UUID"): create_release(repo, "../../etc/passwd", "agent", "completed") def test_rejects_invalid_reason(self, repo: pathlib.Path, reservation: Reservation) -> None: from muse.core.coordination import create_release with pytest.raises(ValueError, match="reason must be one of"): create_release(repo, reservation.reservation_id, "agent", "unknown-reason") def test_valid_reasons(self, repo: pathlib.Path) -> None: from muse.core.coordination import create_reservation, create_release for reason in ("completed", "cancelled", "superseded"): res = create_reservation(repo, run_id="ag", branch="b", addresses=["x::y"]) rel = create_release(repo, res.reservation_id, "ag", reason) assert rel.reason == reason def test_path_containment_check(self, repo: pathlib.Path, reservation: Reservation) -> None: """UUID validation prevents path traversal before containment check.""" from muse.core.coordination import create_release bad_ids = [ "../../../etc", "a/b/c", "not-a-uuid", "", ] for bad_id in bad_ids: with pytest.raises(ValueError): create_release(repo, bad_id, "agent", "completed") class TestLoadReleases: def test_empty_dir(self, repo: pathlib.Path) -> None: from muse.core.coordination import load_all_releases, load_released_ids assert load_all_releases(repo) == [] assert load_released_ids(repo) == frozenset() def test_missing_dir_returns_empty(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) from muse.core.coordination import load_all_releases, load_released_ids assert load_all_releases(repo) == [] assert load_released_ids(repo) == frozenset() def test_load_released_ids_uses_stems(self, repo: pathlib.Path, reservation: Reservation) -> None: from muse.core.coordination import create_release, load_released_ids create_release(repo, reservation.reservation_id, "ag", "completed") ids = load_released_ids(repo) assert reservation.reservation_id in ids def test_corrupt_file_skipped(self, repo: pathlib.Path, reservation: Reservation) -> None: from muse.core.coordination import _releases_dir, create_release, load_all_releases create_release(repo, reservation.reservation_id, "ag", "completed") # Write a corrupt file alongside. (_releases_dir(repo) / "corrupt.json").write_text("not-json") releases = load_all_releases(repo) # Only the valid one is loaded; corrupt one is silently skipped. assert len(releases) == 1 def test_load_released_ids_includes_corrupt_stem(self, repo: pathlib.Path) -> None: """load_released_ids uses stems only — corrupt JSON doesn't block it.""" from muse.core.coordination import _releases_dir, _ensure_coord_dirs, load_released_ids _ensure_coord_dirs(repo) fake_id = str(uuid.uuid4()) (_releases_dir(repo) / f"{fake_id}.json").write_text("not-json") ids = load_released_ids(repo) assert fake_id in ids class TestCreateHeartbeat: def test_creates_heartbeat_file(self, repo: pathlib.Path, reservation: Reservation) -> None: from muse.core.coordination import _heartbeats_dir, create_heartbeat hb = create_heartbeat(repo, reservation.reservation_id, "agent-test", 3600) path = _heartbeats_dir(repo) / f"{reservation.reservation_id}.json" assert path.exists() assert hb.extended_expires_at > _now_utc() def test_atomic_overwrite(self, repo: pathlib.Path, reservation: Reservation) -> None: from muse.core.coordination import create_heartbeat hb1 = create_heartbeat(repo, reservation.reservation_id, "ag", 600) hb2 = create_heartbeat(repo, reservation.reservation_id, "ag", 7200) # Second heartbeat extends further. assert hb2.extended_expires_at > hb1.extended_expires_at def test_rejects_invalid_uuid(self, repo: pathlib.Path) -> None: from muse.core.coordination import create_heartbeat with pytest.raises(ValueError, match="valid UUID"): create_heartbeat(repo, "not/a-uuid", "ag", 3600) def test_rejects_non_positive_extension(self, repo: pathlib.Path, reservation: Reservation) -> None: from muse.core.coordination import create_heartbeat with pytest.raises(ValueError, match="extension_seconds"): create_heartbeat(repo, reservation.reservation_id, "ag", 0) with pytest.raises(ValueError, match="extension_seconds"): create_heartbeat(repo, reservation.reservation_id, "ag", -1) def test_roundtrip(self, repo: pathlib.Path, reservation: Reservation) -> None: from muse.core.coordination import Heartbeat, create_heartbeat hb = create_heartbeat(repo, reservation.reservation_id, "ag", 1800) data = hb.to_dict() hb2 = Heartbeat.from_dict(data) assert hb2.reservation_id == hb.reservation_id assert hb2.run_id == hb.run_id class TestLoadHeartbeatMap: def test_empty(self, repo: pathlib.Path) -> None: from muse.core.coordination import load_heartbeat_map assert load_heartbeat_map(repo) == {} def test_missing_dir_returns_empty(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) from muse.core.coordination import load_heartbeat_map assert load_heartbeat_map(repo) == {} def test_load_single(self, repo: pathlib.Path, reservation: Reservation) -> None: from muse.core.coordination import create_heartbeat, load_heartbeat_map create_heartbeat(repo, reservation.reservation_id, "ag", 3600) hb_map = load_heartbeat_map(repo) assert reservation.reservation_id in hb_map def test_corrupt_file_skipped(self, repo: pathlib.Path, reservation: Reservation) -> None: from muse.core.coordination import ( _heartbeats_dir, create_heartbeat, load_heartbeat_map, ) create_heartbeat(repo, reservation.reservation_id, "ag", 3600) (_heartbeats_dir(repo) / "corrupt.json").write_text("bad-json") hb_map = load_heartbeat_map(repo) assert len(hb_map) == 1 class TestActiveReservations: def test_active_reservation_included(self, repo: pathlib.Path, reservation: Reservation) -> None: from muse.core.coordination import active_reservations active = active_reservations(repo) ids = [r.reservation_id for r in active] assert reservation.reservation_id in ids def test_released_excluded(self, repo: pathlib.Path, reservation: Reservation) -> None: from muse.core.coordination import active_reservations, create_release create_release(repo, reservation.reservation_id, "ag", "completed") active = active_reservations(repo) ids = [r.reservation_id for r in active] assert reservation.reservation_id not in ids def test_expired_excluded(self, repo: pathlib.Path, expired_reservation: Reservation) -> None: from muse.core.coordination import active_reservations active = active_reservations(repo) ids = [r.reservation_id for r in active] assert expired_reservation.reservation_id not in ids def test_heartbeat_extends_expired(self, repo: pathlib.Path, expired_reservation: Reservation) -> None: from muse.core.coordination import active_reservations, create_heartbeat # Heartbeat extends the expired reservation. create_heartbeat(repo, expired_reservation.reservation_id, "ag", 3600) active = active_reservations(repo) ids = [r.reservation_id for r in active] assert expired_reservation.reservation_id in ids def test_released_with_heartbeat_excluded(self, repo: pathlib.Path, reservation: Reservation) -> None: """A released reservation stays excluded even if a heartbeat exists.""" from muse.core.coordination import ( active_reservations, create_heartbeat, create_release, ) create_release(repo, reservation.reservation_id, "ag", "completed") create_heartbeat(repo, reservation.reservation_id, "ag", 7200) active = active_reservations(repo) ids = [r.reservation_id for r in active] assert reservation.reservation_id not in ids class TestRunCoordGc: def test_dry_run_removes_nothing(self, repo: pathlib.Path, expired_reservation: Reservation) -> None: from muse.core.coordination import _reservations_dir, run_coord_gc result = run_coord_gc(repo, dry_run=True, grace_period_seconds=0) assert result.dry_run is True assert result.reservations_removed >= 1 # File still exists after dry run. path = _reservations_dir(repo) / f"{expired_reservation.reservation_id}.json" assert path.exists() def test_execute_removes_expired(self, repo: pathlib.Path, expired_reservation: Reservation) -> None: from muse.core.coordination import _reservations_dir, run_coord_gc result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert result.reservations_removed >= 1 path = _reservations_dir(repo) / f"{expired_reservation.reservation_id}.json" assert not path.exists() def test_active_reservation_not_collected(self, repo: pathlib.Path, reservation: Reservation) -> None: from muse.core.coordination import _reservations_dir, run_coord_gc run_coord_gc(repo, dry_run=False, grace_period_seconds=0) path = _reservations_dir(repo) / f"{reservation.reservation_id}.json" assert path.exists() def test_grace_period_protects_recently_expired(self, repo: pathlib.Path, expired_reservation: Reservation) -> None: """With a large grace period, the recently-expired record is not collected.""" from muse.core.coordination import _reservations_dir, run_coord_gc result = run_coord_gc(repo, dry_run=False, grace_period_seconds=9999) assert result.reservations_removed == 0 path = _reservations_dir(repo) / f"{expired_reservation.reservation_id}.json" assert path.exists() def test_removes_release_tombstone_with_reservation(self, repo: pathlib.Path, expired_reservation: Reservation) -> None: from muse.core.coordination import ( _releases_dir, create_release, run_coord_gc, ) create_release(repo, expired_reservation.reservation_id, "ag", "completed") run_coord_gc(repo, dry_run=False, grace_period_seconds=0) path = _releases_dir(repo) / f"{expired_reservation.reservation_id}.json" assert not path.exists() def test_removes_heartbeat_with_reservation(self, repo: pathlib.Path, expired_reservation: Reservation) -> None: from muse.core.coordination import ( _heartbeats_dir, create_heartbeat, run_coord_gc, ) # Need to back-date the heartbeat too, so effective expiry is still in the past. hb = create_heartbeat(repo, expired_reservation.reservation_id, "ag", 1) # Back-date extended_expires_at. import json as _json path = _heartbeats_dir(repo) / f"{expired_reservation.reservation_id}.json" data = _json.loads(path.read_text()) data["extended_expires_at"] = (_now_utc() - datetime.timedelta(seconds=5)).isoformat() path.write_text(_json.dumps(data)) run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert not path.exists() def test_removes_orphaned_release(self, repo: pathlib.Path) -> None: """Release file without a matching reservation is an orphan — collect it.""" from muse.core.coordination import ( _ensure_coord_dirs, _releases_dir, run_coord_gc, ) _ensure_coord_dirs(repo) orphan_id = str(uuid.uuid4()) orphan_path = _releases_dir(repo) / f"{orphan_id}.json" orphan_path.write_text(json.dumps({"reservation_id": orphan_id, "reason": "completed"})) result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert result.releases_removed >= 1 assert not orphan_path.exists() def test_removes_orphaned_heartbeat(self, repo: pathlib.Path) -> None: """Heartbeat file without a matching reservation is an orphan — collect it.""" from muse.core.coordination import ( _ensure_coord_dirs, _heartbeats_dir, run_coord_gc, ) _ensure_coord_dirs(repo) orphan_id = str(uuid.uuid4()) orphan_path = _heartbeats_dir(repo) / f"{orphan_id}.json" orphan_path.write_text(json.dumps({"reservation_id": orphan_id})) result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert result.heartbeats_removed >= 1 assert not orphan_path.exists() def test_include_intents_off_by_default(self, repo: pathlib.Path) -> None: from muse.core.coordination import create_reservation, create_intent, run_coord_gc res = create_reservation(repo, run_id="ag", branch="b", addresses=["x::y"]) create_intent(repo, res.reservation_id, "ag", "b", ["x::y"], "rename", "old→new") result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert result.intents_removed == 0 def test_include_intents_removes_old(self, repo: pathlib.Path) -> None: from muse.core.coordination import ( _intents_dir, create_intent, create_reservation, run_coord_gc, ) res = create_reservation(repo, run_id="ag", branch="b", addresses=["x::y"]) create_intent(repo, res.reservation_id, "ag", "b", ["x::y"], "rename", "old→new") # Back-date the intent. intent_path = next(_intents_dir(repo).glob("*.json")) import json as _json data = _json.loads(intent_path.read_text()) data["created_at"] = (_now_utc() - datetime.timedelta(days=8)).isoformat() intent_path.write_text(_json.dumps(data)) result = run_coord_gc( repo, dry_run=False, grace_period_seconds=0, include_intents=True, max_intent_age_seconds=3600, ) assert result.intents_removed == 1 assert not intent_path.exists() def test_result_totals_match_parts(self, repo: pathlib.Path, expired_reservation: Reservation) -> None: from muse.core.coordination import run_coord_gc result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert result.total_removed == ( result.reservations_removed + result.releases_removed + result.heartbeats_removed + result.intents_removed ) assert result.total_removed_bytes == ( result.reservations_removed_bytes + result.releases_removed_bytes + result.heartbeats_removed_bytes + result.intents_removed_bytes ) def test_elapsed_seconds_recorded(self, repo: pathlib.Path) -> None: from muse.core.coordination import run_coord_gc result = run_coord_gc(repo, dry_run=True) assert result.elapsed_seconds >= 0 def test_removed_ids_populated(self, repo: pathlib.Path, expired_reservation: Reservation) -> None: from muse.core.coordination import run_coord_gc result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert expired_reservation.reservation_id in result.removed_ids # ───────────────────────────────────────────────────────────────────────────── # Integration tests — muse coord release # ───────────────────────────────────────────────────────────────────────────── class TestReleaseCmdSingle: def _run(self, args: MsgpackDict, repo: pathlib.Path) -> int: from muse.cli.commands.release_coord import run as release_run import argparse, os ns = argparse.Namespace( reservation_id=args.get("reservation_id"), run_id=args.get("run_id", "agent-42"), reason=args.get("reason", "completed"), all_for_run=None, fmt=args.get("fmt", "json"), ) old = os.getcwd() os.chdir(repo) try: release_run(ns) return 0 except SystemExit as exc: return exc.code finally: os.chdir(old) def test_release_success(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: code = self._run({"reservation_id": reservation.reservation_id}, repo) assert code == 0 out = capsys.readouterr().out data = json.loads(out) assert data["status"] == "released" assert data["reservation_id"] == reservation.reservation_id assert data["reason"] == "completed" assert isinstance(data["elapsed_seconds"], float) def test_release_reason_cancelled(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: code = self._run( {"reservation_id": reservation.reservation_id, "reason": "cancelled"}, repo, ) assert code == 0 out = capsys.readouterr().out data = json.loads(out) assert data["reason"] == "cancelled" def test_release_already_released_is_idempotent(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_release create_release(repo, reservation.reservation_id, "ag", "completed") code = self._run({"reservation_id": reservation.reservation_id}, repo) assert code == 0 out = capsys.readouterr().out data = json.loads(out) assert data["status"] == "already_released" def test_release_not_found_exits_4(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: fake_id = str(uuid.uuid4()) code = self._run({"reservation_id": fake_id}, repo) assert code == 4 # ExitCode.NOT_FOUND out = capsys.readouterr().out data = json.loads(out) assert data["status"] == "not_found" def test_bad_uuid_exits_1(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: code = self._run({"reservation_id": "not-a-uuid"}, repo) assert code == 1 # ExitCode.USER_ERROR def test_text_output(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: from muse.cli.commands.release_coord import run as release_run import argparse, os ns = argparse.Namespace( reservation_id=reservation.reservation_id, run_id="agent-42", reason="completed", all_for_run=None, fmt="text", ) old = os.getcwd() os.chdir(repo) try: release_run(ns) except SystemExit: pass finally: os.chdir(old) out = capsys.readouterr().out assert "released" in out assert reservation.reservation_id[:8] in out class TestReleaseCmdBatch: def _run_batch(self, repo: pathlib.Path, all_for_run: str, run_id: str = "controller") -> None: from muse.cli.commands.release_coord import run as release_run import argparse, os ns = argparse.Namespace( reservation_id=None, run_id=run_id, reason="completed", all_for_run=all_for_run, fmt="json", ) old = os.getcwd() os.chdir(repo) try: release_run(ns) except SystemExit: pass finally: os.chdir(old) def test_batch_releases_all_for_run(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation, load_released_ids ids = [] for i in range(5): res = create_reservation( repo, run_id="batch-agent", branch="b", addresses=[f"f.py::fn{i}"] ) ids.append(res.reservation_id) # One extra reservation for a different run. other = create_reservation( repo, run_id="other-agent", branch="b", addresses=["g.py::fn"] ) self._run_batch(repo, "batch-agent") released = load_released_ids(repo) for rid in ids: assert rid in released # Other agent's reservation not released. assert other.reservation_id not in released def test_batch_skips_already_released(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_release create_release(repo, reservation.reservation_id, "ag", "completed") self._run_batch(repo, "agent-test") out = capsys.readouterr().out data = json.loads(out) assert data["skipped_already_released"] >= 1 def test_cannot_combine_id_and_all_for_run(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: from muse.cli.commands.release_coord import run as release_run import argparse, os ns = argparse.Namespace( reservation_id=reservation.reservation_id, run_id="ag", reason="completed", all_for_run="ag", fmt="json", ) old = os.getcwd() os.chdir(repo) try: with pytest.raises(SystemExit) as exc: release_run(ns) finally: os.chdir(old) assert exc.value.code == 1 # ExitCode.USER_ERROR def test_must_specify_id_or_all_for_run(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.cli.commands.release_coord import run as release_run import argparse, os ns = argparse.Namespace( reservation_id=None, run_id="ag", reason="completed", all_for_run=None, fmt="json", ) old = os.getcwd() os.chdir(repo) try: with pytest.raises(SystemExit) as exc: release_run(ns) finally: os.chdir(old) assert exc.value.code == 1 # ExitCode.USER_ERROR # ───────────────────────────────────────────────────────────────────────────── # Integration tests — muse coord heartbeat # ───────────────────────────────────────────────────────────────────────────── class TestHeartbeatCmd: def _run(self, args: MsgpackDict, repo: pathlib.Path) -> int: from muse.cli.commands.heartbeat_coord import run as hb_run import argparse, os ns = argparse.Namespace( reservation_id=args.get("reservation_id"), run_id=args.get("run_id", "agent-42"), extension_seconds=args.get("extension_seconds", 3600), fmt=args.get("fmt", "json"), ) old = os.getcwd() os.chdir(repo) try: with pytest.raises(SystemExit) as exc: hb_run(ns) return exc.value.code except SystemExit as exc: return exc.code finally: os.chdir(old) def _run_no_exit(self, args: MsgpackDict, repo: pathlib.Path) -> None: from muse.cli.commands.heartbeat_coord import run as hb_run import argparse, os ns = argparse.Namespace( reservation_id=args.get("reservation_id"), run_id=args.get("run_id", "agent-42"), extension_seconds=args.get("extension_seconds", 3600), fmt=args.get("fmt", "json"), ) old = os.getcwd() os.chdir(repo) try: hb_run(ns) except SystemExit: pass finally: os.chdir(old) def test_heartbeat_success(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: self._run_no_exit({"reservation_id": reservation.reservation_id}, repo) out = capsys.readouterr().out data = json.loads(out) assert data["status"] == "ok" assert data["reservation_id"] == reservation.reservation_id assert data["ttl_extended_seconds"] == 3600 assert isinstance(data["elapsed_seconds"], float) def test_heartbeat_extends_further_on_repeat(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import load_heartbeat_map self._run_no_exit( {"reservation_id": reservation.reservation_id, "extension_seconds": 600}, repo, ) hb1 = load_heartbeat_map(repo)[reservation.reservation_id] self._run_no_exit( {"reservation_id": reservation.reservation_id, "extension_seconds": 7200}, repo, ) hb2 = load_heartbeat_map(repo)[reservation.reservation_id] assert hb2.extended_expires_at > hb1.extended_expires_at def test_heartbeat_makes_expired_active(self, repo: pathlib.Path, expired_reservation: Reservation) -> None: from muse.core.coordination import active_reservations # Before heartbeat, expired. assert expired_reservation.reservation_id not in [ r.reservation_id for r in active_reservations(repo) ] self._run_no_exit( {"reservation_id": expired_reservation.reservation_id}, repo, ) # After heartbeat, active. assert expired_reservation.reservation_id in [ r.reservation_id for r in active_reservations(repo) ] def test_heartbeat_refused_for_released(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_release create_release(repo, reservation.reservation_id, "ag", "completed") code = self._run({"reservation_id": reservation.reservation_id}, repo) assert code == 1 out = capsys.readouterr().out data = json.loads(out) assert data["status"] == "already_released" def test_heartbeat_not_found_exits_4(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: code = self._run({"reservation_id": str(uuid.uuid4())}, repo) assert code == 4 # ExitCode.NOT_FOUND out = capsys.readouterr().out data = json.loads(out) assert data["status"] == "not_found" def test_bad_uuid_exits_1(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: code = self._run({"reservation_id": "bad//id"}, repo) assert code == 1 # ExitCode.USER_ERROR def test_negative_extension_exits_1(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: code = self._run( {"reservation_id": reservation.reservation_id, "extension_seconds": -1}, repo, ) assert code == 1 # ExitCode.USER_ERROR def test_zero_extension_exits_1(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: code = self._run( {"reservation_id": reservation.reservation_id, "extension_seconds": 0}, repo, ) assert code == 1 # ExitCode.USER_ERROR def test_text_output(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: from muse.cli.commands.heartbeat_coord import run as hb_run import argparse, os ns = argparse.Namespace( reservation_id=reservation.reservation_id, run_id="agent-42", extension_seconds=3600, fmt="text", ) old = os.getcwd() os.chdir(repo) try: hb_run(ns) except SystemExit: pass finally: os.chdir(old) out = capsys.readouterr().out assert "heartbeat" in out assert reservation.reservation_id[:8] in out # ───────────────────────────────────────────────────────────────────────────── # Integration tests — muse coord gc # ───────────────────────────────────────────────────────────────────────────── class TestCoordGcCmd: def _run(self, args: MsgpackDict, repo: pathlib.Path) -> None: from muse.cli.commands.coord_gc import run as gc_run import argparse, os ns = argparse.Namespace( execute=args.get("execute", False), grace_period_seconds=args.get("grace_period_seconds", 300), include_intents=args.get("include_intents", False), max_intent_age_seconds=args.get("max_intent_age_seconds", 604800), verbose=args.get("verbose", False), fmt=args.get("fmt", "json"), ) old = os.getcwd() os.chdir(repo) try: gc_run(ns) except SystemExit: pass finally: os.chdir(old) def test_dry_run_by_default(self, repo: pathlib.Path, expired_reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: self._run({}, repo) out = capsys.readouterr().out data = json.loads(out) assert data["dry_run"] is True def test_dry_run_does_not_delete(self, repo: pathlib.Path, expired_reservation: Reservation) -> None: from muse.core.coordination import _reservations_dir self._run({"grace_period_seconds": 0}, repo) path = _reservations_dir(repo) / f"{expired_reservation.reservation_id}.json" assert path.exists() def test_execute_deletes_expired(self, repo: pathlib.Path, expired_reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import _reservations_dir self._run({"execute": True, "grace_period_seconds": 0}, repo) out = capsys.readouterr().out data = json.loads(out) assert data["dry_run"] is False assert data["reservations_removed"] >= 1 path = _reservations_dir(repo) / f"{expired_reservation.reservation_id}.json" assert not path.exists() def test_json_schema_complete(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: self._run({}, repo) out = capsys.readouterr().out data = json.loads(out) required_keys = { "dry_run", "grace_period_seconds", "include_intents", "max_intent_age_seconds", "reservations_removed", "reservations_removed_bytes", "releases_removed", "releases_removed_bytes", "heartbeats_removed", "heartbeats_removed_bytes", "intents_removed", "intents_removed_bytes", "total_removed", "total_removed_bytes", "removed_ids", "elapsed_seconds", } assert required_keys.issubset(data.keys()) def test_elapsed_seconds_is_float(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: self._run({}, repo) out = capsys.readouterr().out data = json.loads(out) assert isinstance(data["elapsed_seconds"], float) def test_include_intents_false_preserves_intents(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation, create_intent res = create_reservation(repo, run_id="ag", branch="b", addresses=["x::y"]) create_intent(repo, res.reservation_id, "ag", "b", ["x::y"], "rename", "") self._run({"execute": True, "grace_period_seconds": 0}, repo) out = capsys.readouterr().out data = json.loads(out) assert data["intents_removed"] == 0 def test_bad_grace_period_exits_1(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_gc import run as gc_run import argparse, os ns = argparse.Namespace( execute=False, grace_period_seconds=-1, include_intents=False, max_intent_age_seconds=604800, verbose=False, fmt="json", ) old = os.getcwd() os.chdir(repo) try: with pytest.raises(SystemExit) as exc: gc_run(ns) finally: os.chdir(old) assert exc.value.code == 1 # ExitCode.USER_ERROR def test_text_output_dry_run(self, repo: pathlib.Path, expired_reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: self._run({"fmt": "text", "grace_period_seconds": 0}, repo) out = capsys.readouterr().out assert "DRY RUN" in out def test_text_output_execute(self, repo: pathlib.Path, expired_reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: self._run({"fmt": "text", "execute": True, "grace_period_seconds": 0}, repo) out = capsys.readouterr().out assert "GC complete" in out def test_verbose_lists_removed_ids(self, repo: pathlib.Path, expired_reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: self._run( {"fmt": "text", "execute": True, "grace_period_seconds": 0, "verbose": True}, repo, ) out = capsys.readouterr().out assert expired_reservation.reservation_id in out def test_nothing_to_collect(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: """Active reservation → nothing collected.""" self._run({"execute": True, "grace_period_seconds": 300}, repo) out = capsys.readouterr().out data = json.loads(out) assert data["total_removed"] == 0 def test_fmt_bytes_helper(self) -> None: from muse.cli.commands.coord_gc import _fmt_bytes assert _fmt_bytes(0) == "0 B" assert _fmt_bytes(512) == "512 B" assert _fmt_bytes(1024) == "1.0 KiB" assert _fmt_bytes(1048576) == "1.0 MiB" # ───────────────────────────────────────────────────────────────────────────── # Integration tests — muse coord list (lifecycle-aware) # ───────────────────────────────────────────────────────────────────────────── class TestListCoordLifecycle: def _run_json(self, repo: pathlib.Path, extra_args: MsgpackDict | None = None) -> None: from muse.cli.commands.list_coord import run as list_run import argparse, os ns = argparse.Namespace( include_expired=True, kind="all", run_id=None, branch=None, address_glob=None, operation=None, limit=None, summary=False, fmt="json", **(extra_args or {}), ) old = os.getcwd() os.chdir(repo) try: list_run(ns) finally: os.chdir(old) def test_released_field_present(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: self._run_json(repo) out = capsys.readouterr().out data = json.loads(out) entries = {e["reservation_id"]: e for e in data["reservations"]} entry = entries[reservation.reservation_id] assert "released" in entry assert entry["released"] is False def test_released_field_true_after_release(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_release create_release(repo, reservation.reservation_id, "ag", "completed") self._run_json(repo) out = capsys.readouterr().out data = json.loads(out) entries = {e["reservation_id"]: e for e in data["reservations"]} entry = entries[reservation.reservation_id] assert entry["released"] is True assert entry["is_active"] is False def test_effective_expires_at_field(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: self._run_json(repo) out = capsys.readouterr().out data = json.loads(out) entry = data["reservations"][0] assert "effective_expires_at" in entry # Parse without error. datetime.datetime.fromisoformat(entry["effective_expires_at"]) def test_effective_expires_extended_by_heartbeat(self, repo: pathlib.Path, expired_reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_heartbeat create_heartbeat(repo, expired_reservation.reservation_id, "ag", 7200) self._run_json(repo) out = capsys.readouterr().out data = json.loads(out) entries = {e["reservation_id"]: e for e in data["reservations"]} entry = entries[expired_reservation.reservation_id] # effective_expires_at should be in the future. eff = datetime.datetime.fromisoformat(entry["effective_expires_at"]) assert eff > _now_utc() assert entry["ttl_remaining_seconds"] > 0 def test_released_reservations_count_in_json(self, repo: pathlib.Path, reservation: Reservation, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_release create_release(repo, reservation.reservation_id, "ag", "completed") self._run_json(repo) out = capsys.readouterr().out data = json.loads(out) assert "released_reservations" in data assert data["released_reservations"] >= 1 # ───────────────────────────────────────────────────────────────────────────── # Security tests # ───────────────────────────────────────────────────────────────────────────── class TestSecurity: def test_release_uuid_injection_rejected(self, repo: pathlib.Path) -> None: from muse.core.coordination import create_release attacks = [ "../../etc/passwd", "../releases/legit-uuid", "a" * 200, "\x00evil", "' OR 1=1 --", ] for attack in attacks: with pytest.raises((ValueError, FileNotFoundError, OSError)): create_release(repo, attack, "ag", "completed") def test_heartbeat_uuid_injection_rejected(self, repo: pathlib.Path) -> None: from muse.core.coordination import create_heartbeat attacks = ["../../etc/passwd", "../heartbeats/x", "not-a-uuid"] for attack in attacks: with pytest.raises(ValueError): create_heartbeat(repo, attack, "ag", 3600) def test_release_cmd_rejects_traversal(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.cli.commands.release_coord import run as release_run import argparse, os ns = argparse.Namespace( reservation_id="../../etc/passwd", run_id="ag", reason="completed", all_for_run=None, fmt="json", ) old = os.getcwd() os.chdir(repo) try: with pytest.raises(SystemExit) as exc: release_run(ns) finally: os.chdir(old) assert exc.value.code == 1 # ExitCode.USER_ERROR def test_heartbeat_cmd_rejects_traversal(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.cli.commands.heartbeat_coord import run as hb_run import argparse, os ns = argparse.Namespace( reservation_id="../../etc/shadow", run_id="ag", extension_seconds=3600, fmt="json", ) old = os.getcwd() os.chdir(repo) try: with pytest.raises(SystemExit) as exc: hb_run(ns) finally: os.chdir(old) assert exc.value.code == 1 # ExitCode.USER_ERROR def test_gc_does_not_escape_coord_dir(self, repo: pathlib.Path) -> None: """GC only scans known subdirs — no user input used for path construction.""" from muse.core.coordination import run_coord_gc # Create a file outside .muse/coordination/ — GC must not touch it. sentinel = repo / "DO_NOT_DELETE.txt" sentinel.write_text("safe") run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert sentinel.exists() # ───────────────────────────────────────────────────────────────────────────── # Stress tests # ───────────────────────────────────────────────────────────────────────────── class TestStress: def test_batch_release_200_reservations_under_2s(self, repo: pathlib.Path) -> None: """Releasing 200 reservations via --all-for-run completes in < 2 s.""" from muse.core.coordination import create_reservation, load_released_ids from muse.cli.commands.release_coord import run as release_run import argparse, os N = 200 for i in range(N): create_reservation( repo, run_id="stress-agent", branch="feat/stress", addresses=[f"src/mod{i}.py::fn{i}"], ) ns = argparse.Namespace( reservation_id=None, run_id="controller", reason="completed", all_for_run="stress-agent", fmt="json", ) old = os.getcwd() os.chdir(repo) t0 = time.monotonic() try: release_run(ns) except SystemExit: pass finally: os.chdir(old) elapsed = time.monotonic() - t0 released = load_released_ids(repo) assert len(released) == N assert elapsed < 2.0, f"batch release took {elapsed:.2f}s — too slow" def test_gc_500_expired_reservations_under_3s(self, repo: pathlib.Path) -> None: """GC collects 500 expired reservations in < 3 s.""" from muse.core.coordination import ( _reservations_dir, _ensure_coord_dirs, run_coord_gc, ) _ensure_coord_dirs(repo) res_dir = _reservations_dir(repo) now = _now_utc() expired_at = (now - datetime.timedelta(hours=2)).isoformat() N = 500 for i in range(N): rid = str(uuid.uuid4()) data = { "schema_version": "0.0.0", "reservation_id": rid, "run_id": f"ag-{i}", "branch": "feat/x", "addresses": [f"f{i}.py::fn"], "created_at": (now - datetime.timedelta(hours=3)).isoformat(), "expires_at": expired_at, "operation": None, } (res_dir / f"{rid}.json").write_text(json.dumps(data)) t0 = time.monotonic() result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) elapsed = time.monotonic() - t0 assert result.reservations_removed == N assert elapsed < 3.0, f"GC took {elapsed:.2f}s — too slow" def test_load_heartbeat_map_100_entries_under_500ms(self, repo: pathlib.Path) -> None: """Loading 100 heartbeat files completes in < 500 ms.""" from muse.core.coordination import ( _ensure_coord_dirs, _heartbeats_dir, load_heartbeat_map, ) _ensure_coord_dirs(repo) hb_dir = _heartbeats_dir(repo) now = _now_utc() N = 100 for i in range(N): rid = str(uuid.uuid4()) data = { "schema_version": "0.0.0", "reservation_id": rid, "run_id": f"ag-{i}", "last_beat_at": now.isoformat(), "extended_expires_at": (now + datetime.timedelta(hours=1)).isoformat(), } (hb_dir / f"{rid}.json").write_text(json.dumps(data)) t0 = time.monotonic() hb_map = load_heartbeat_map(repo) elapsed = time.monotonic() - t0 assert len(hb_map) == N assert elapsed < 0.5, f"load_heartbeat_map took {elapsed:.3f}s — too slow" def test_round_trip_correctness_under_concurrent_writes(self, repo: pathlib.Path) -> None: """release → heartbeat attempt → gc correctness under rapid writes.""" from muse.core.coordination import ( active_reservations, create_heartbeat, create_release, create_reservation, load_released_ids, run_coord_gc, ) N = 50 reservations = [ create_reservation( repo, run_id=f"ag-{i}", branch="b", addresses=[f"f{i}.py::fn"] ) for i in range(N) ] # Release half. for res in reservations[:N // 2]: create_release(repo, res.reservation_id, res.run_id, "completed") # Heartbeat the other half. for res in reservations[N // 2:]: create_heartbeat(repo, res.reservation_id, res.run_id, 7200) active = active_reservations(repo) active_ids = {r.reservation_id for r in active} # Released half must not be active. for res in reservations[:N // 2]: assert res.reservation_id not in active_ids # Heartbeat half must be active. for res in reservations[N // 2:]: assert res.reservation_id in active_ids # GC with large grace period → nothing removed yet. result = run_coord_gc(repo, dry_run=False, grace_period_seconds=9999) assert result.reservations_removed == 0