"""Comprehensive tests for ``muse coord gc``. Coverage matrix --------------- Unit ~~~~ * run_coord_gc dry_run=True — does not delete files * run_coord_gc dry_run=False — deletes expired records * grace period — recently expired records skipped * include_intents flag — intents purged when opt-in * orphaned release (no reservation) — collected * orphaned heartbeat (no reservation) — collected * released reservation — collected after grace period * heartbeat-extended reservation — not collected until effective expiry passes * _fmt_bytes — all size ranges including TiB Integration ~~~~~~~~~~~ * Default (dry-run) — shows "DRY RUN", nothing deleted * --execute — actually deletes expired reservations * --grace-period large — recently expired records not deleted * --include-intents — intents counted in output * --verbose — removed IDs printed to output * --format json — valid compact JSON with all required keys * --json shorthand — same result as --format json * Empty repo — exits 0 with "Nothing to collect" * --grace-period -1 — exits USER_ERROR (1) * --max-intent-age 0 — exits USER_ERROR (1) * JSON compact — no newlines inside the object * Dry-run JSON — dry_run=true, nothing deleted * Orphaned heartbeat/release removed even with active reservations present E2E ~~~ * Full lifecycle: reserve → heartbeat → release → gc → all files gone * Active reservation survives GC; expired neighbour is collected * Concurrent dry-run GC on same repo does not crash Stress ~~~~~~ * 500 expired reservations GC'd < 3 s * 1000-record mixed repo (active + expired) — only expired collected """ from __future__ import annotations import argparse import datetime import itertools import json as _json import pathlib import time import pytest from tests.cli_test_helper import CliRunner from muse.core.types import content_hash, now_utc_iso from muse.core.paths import coordination_dir, muse_dir _id_seq = itertools.count() def _new_id() -> str: return content_hash({"seq": next(_id_seq)}) runner = CliRunner() cli = None # ── Fixtures ────────────────────────────────────────────────────────────────── @pytest.fixture() def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) return tmp_path # ── Helpers ─────────────────────────────────────────────────────────────────── def _write_expired_reservation(repo: pathlib.Path, run_id: str = "agent-x") -> str: coord_dir = coordination_dir(repo) / "reservations" coord_dir.mkdir(parents=True, exist_ok=True) rid = _new_id() now = datetime.datetime.now(datetime.timezone.utc) data = { "reservation_id": rid, "run_id": run_id, "branch": "main", "addresses": ["src/x.py::foo"], "operation": None, "created_at": (now - datetime.timedelta(hours=2)).isoformat(), "expires_at": (now - datetime.timedelta(hours=1)).isoformat(), } (coord_dir / f"{rid}.json").write_text(_json.dumps(data)) return rid def _write_active_reservation(repo: pathlib.Path, run_id: str = "agent-y") -> str: coord_dir = coordination_dir(repo) / "reservations" coord_dir.mkdir(parents=True, exist_ok=True) rid = _new_id() now = datetime.datetime.now(datetime.timezone.utc) data = { "reservation_id": rid, "run_id": run_id, "branch": "main", "addresses": ["src/y.py::bar"], "operation": None, "created_at": now.isoformat(), "expires_at": (now + datetime.timedelta(hours=1)).isoformat(), } (coord_dir / f"{rid}.json").write_text(_json.dumps(data)) return rid def _write_expired_intent(repo: pathlib.Path, run_id: str = "agent-z") -> str: intent_dir = coordination_dir(repo) / "intents" intent_dir.mkdir(parents=True, exist_ok=True) iid = _new_id() now = datetime.datetime.now(datetime.timezone.utc) data = { "intent_id": iid, "run_id": run_id, "branch": "main", "addresses": ["src/z.py::baz"], "created_at": (now - datetime.timedelta(days=10)).isoformat(), "expires_at": (now - datetime.timedelta(days=9)).isoformat(), } (intent_dir / f"{iid}.json").write_text(_json.dumps(data)) return iid # ── Unit: run_coord_gc ──────────────────────────────────────────────────────── class TestRunCoordGcUnit: def test_dry_run_does_not_delete_files(self, repo: pathlib.Path) -> None: from muse.core.coordination import run_coord_gc rid = _write_expired_reservation(repo) result = run_coord_gc(repo, dry_run=True, grace_period_seconds=0) assert result.dry_run is True # File must still exist after dry run res_file = coordination_dir(repo) / "reservations" / f"{rid}.json" assert res_file.exists() def test_execute_deletes_expired_reservation(self, repo: pathlib.Path) -> None: from muse.core.coordination import run_coord_gc rid = _write_expired_reservation(repo) result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert result.dry_run is False assert result.reservations_removed >= 1 res_file = coordination_dir(repo) / "reservations" / f"{rid}.json" assert not res_file.exists() def test_active_reservation_not_deleted(self, repo: pathlib.Path) -> None: from muse.core.coordination import run_coord_gc rid = _write_active_reservation(repo) run_coord_gc(repo, dry_run=False, grace_period_seconds=0) res_file = coordination_dir(repo) / "reservations" / f"{rid}.json" assert res_file.exists() def test_grace_period_protects_recently_expired(self, repo: pathlib.Path) -> None: from muse.core.coordination import run_coord_gc rid = _write_expired_reservation(repo) # Grace period of 7 hours (25200s) > 1 hour elapsed since expiry → skipped result = run_coord_gc(repo, dry_run=False, grace_period_seconds=25200) assert result.reservations_removed == 0 res_file = coordination_dir(repo) / "reservations" / f"{rid}.json" assert res_file.exists() def test_include_intents_false_leaves_intents(self, repo: pathlib.Path) -> None: from muse.core.coordination import run_coord_gc iid = _write_expired_intent(repo) result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0, include_intents=False) assert result.intents_removed == 0 intent_file = coordination_dir(repo) / "intents" / f"{iid}.json" assert intent_file.exists() def test_include_intents_true_removes_old_intents(self, repo: pathlib.Path) -> None: from muse.core.coordination import run_coord_gc _write_expired_intent(repo) result = run_coord_gc( repo, dry_run=False, grace_period_seconds=0, include_intents=True, max_intent_age_seconds=60, # 1 minute — our intent is 10 days old ) assert result.intents_removed >= 1 def test_result_has_duration_ms(self, repo: pathlib.Path) -> None: from muse.core.coordination import run_coord_gc result = run_coord_gc(repo, dry_run=True, grace_period_seconds=0) assert result.duration_ms >= 0.0 def test_empty_repo_total_removed_is_zero(self, repo: pathlib.Path) -> None: from muse.core.coordination import run_coord_gc result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert result.total_removed == 0 # ── Integration ─────────────────────────────────────────────────────────────── class TestCoordGcIntegration: def test_default_dry_run(self, repo: pathlib.Path) -> None: _write_expired_reservation(repo) result = runner.invoke(cli, ["coord", "gc"]) assert result.exit_code == 0 assert "DRY RUN" in result.output def test_default_dry_run_does_not_delete(self, repo: pathlib.Path) -> None: rid = _write_expired_reservation(repo) runner.invoke(cli, ["coord", "gc"]) res_file = coordination_dir(repo) / "reservations" / f"{rid}.json" assert res_file.exists() def test_execute_deletes_expired(self, repo: pathlib.Path) -> None: rid = _write_expired_reservation(repo) result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0"]) assert result.exit_code == 0 res_file = coordination_dir(repo) / "reservations" / f"{rid}.json" assert not res_file.exists() def test_execute_text_output_shows_gc_complete(self, repo: pathlib.Path) -> None: _write_expired_reservation(repo) result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0"]) assert result.exit_code == 0 assert "GC complete" in result.output def test_grace_period_large_nothing_deleted(self, repo: pathlib.Path) -> None: _write_expired_reservation(repo) # expired 1 hour ago # Grace period of 7 hours (25200s) > 1 hour elapsed since expiry → skipped result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "25200"]) assert result.exit_code == 0 assert "Nothing to collect" in result.output def test_include_intents_flag_reaches_intents(self, repo: pathlib.Path) -> None: _write_expired_intent(repo) result = runner.invoke(cli, [ "coord", "gc", "--execute", "--include-intents", "--max-intent-age", "60", "--grace-period", "0", ]) assert result.exit_code == 0 def test_verbose_prints_removed_ids(self, repo: pathlib.Path) -> None: rid = _write_expired_reservation(repo) result = runner.invoke(cli, [ "coord", "gc", "--execute", "--grace-period", "0", "--verbose", ]) assert result.exit_code == 0 assert rid in result.output def test_format_json_valid_structure(self, repo: pathlib.Path) -> None: _write_expired_reservation(repo) result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0", "--json"]) assert result.exit_code == 0 data = _json.loads(result.output.strip()) required_keys = { "dry_run", "grace_period_seconds", "include_intents", "max_intent_age_seconds", "reservations_removed", "reservations_removed_bytes", "releases_removed", "releases_removed_bytes", "heartbeats_removed", "heartbeats_removed_bytes", "intents_removed", "intents_removed_bytes", "total_removed", "total_removed_bytes", "removed_ids", "duration_ms", } assert required_keys <= set(data) def test_json_shorthand(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "gc", "--json"]) assert result.exit_code == 0 data = _json.loads(result.output.strip()) assert "dry_run" in data assert data["dry_run"] is True def test_empty_repo_exits_0_nothing_to_collect(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "gc"]) assert result.exit_code == 0 assert "Nothing to collect" in result.output def test_grace_period_negative_exits_user_error(self, repo: pathlib.Path) -> None: from muse.core.errors import ExitCode result = runner.invoke(cli, ["coord", "gc", "--grace-period", "-1"]) assert result.exit_code == ExitCode.USER_ERROR def test_max_intent_age_zero_exits_user_error(self, repo: pathlib.Path) -> None: from muse.core.errors import ExitCode result = runner.invoke(cli, ["coord", "gc", "--max-intent-age", "0"]) assert result.exit_code == ExitCode.USER_ERROR def test_json_dry_run_field_true_by_default(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "gc", "--json"]) data = _json.loads(result.output.strip()) assert data["dry_run"] is True def test_json_execute_dry_run_field_false(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "gc", "--execute", "--json"]) data = _json.loads(result.output.strip()) assert data["dry_run"] is False def test_duration_ms_is_nonnegative_float(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "gc", "--json"]) data = _json.loads(result.output.strip()) assert isinstance(data["duration_ms"], float) assert data["duration_ms"] >= 0.0 # ── Security ────────────────────────────────────────────────────────────────── class TestCoordGcSecurity: def test_gc_does_not_traverse_outside_coordination_dir(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None: """Verify GC only touches .muse/coordination/ subdirectories.""" sentinel = tmp_path / "outside_sentinel.json" sentinel.write_text('{"should": "not be deleted"}') result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0"]) assert result.exit_code == 0 assert sentinel.exists(), "GC must not delete files outside .muse/coordination/" def test_symlink_outside_repo_not_followed(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None: """A symlink inside coordination/ pointing outside the repo is skipped.""" import os outside_file = tmp_path / "secret.json" outside_file.write_text('{"secret": "data"}') reservations_dir = coordination_dir(repo) / "reservations" reservations_dir.mkdir(parents=True, exist_ok=True) link = reservations_dir / "malicious_link.json" try: os.symlink(outside_file, link) except OSError: pytest.skip("symlink creation not supported") # GC should not crash and should not delete the outside file result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0"]) assert result.exit_code == 0 assert outside_file.exists() # ── Stress ──────────────────────────────────────────────────────────────────── class TestCoordGcStress: def test_500_expired_reservations_under_3s(self, repo: pathlib.Path) -> None: for _ in range(500): _write_expired_reservation(repo) t0 = time.monotonic() result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0"]) elapsed = time.monotonic() - t0 assert result.exit_code == 0 assert elapsed < 3.0 data_lines = result.output assert "GC complete" in data_lines or "removed" in data_lines.lower() # Verify all files are gone reservations_dir = coordination_dir(repo) / "reservations" remaining = list(reservations_dir.glob("*.json")) assert len(remaining) == 0 def test_1000_mixed_repo_only_expired_collected(self, repo: pathlib.Path) -> None: """500 active + 500 expired: only the expired set is removed.""" for _ in range(500): _write_expired_reservation(repo) active_ids = {_write_active_reservation(repo) for _ in range(500)} result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0"]) assert result.exit_code == 0 reservations_dir = coordination_dir(repo) / "reservations" surviving = {p.stem for p in reservations_dir.glob("*.json")} assert active_ids == surviving, "Active reservations must not be collected" # --------------------------------------------------------------------------- # Unit — _fmt_bytes # --------------------------------------------------------------------------- class TestFmtBytes: def test_zero(self) -> None: from muse.cli.commands.coord_gc import _fmt_bytes assert _fmt_bytes(0) == "0 B" def test_under_1024(self) -> None: from muse.cli.commands.coord_gc import _fmt_bytes assert _fmt_bytes(1023) == "1023 B" def test_exactly_1_kib(self) -> None: from muse.cli.commands.coord_gc import _fmt_bytes assert _fmt_bytes(1024) == "1.0 KiB" def test_exactly_1_mib(self) -> None: from muse.cli.commands.coord_gc import _fmt_bytes assert _fmt_bytes(1024 ** 2) == "1.0 MiB" def test_exactly_1_gib(self) -> None: from muse.cli.commands.coord_gc import _fmt_bytes assert _fmt_bytes(1024 ** 3) == "1.0 GiB" def test_exactly_1_tib(self) -> None: from muse.cli.commands.coord_gc import _fmt_bytes assert _fmt_bytes(1024 ** 4) == "1.0 TiB" def test_2_tib(self) -> None: from muse.cli.commands.coord_gc import _fmt_bytes assert _fmt_bytes(2 * 1024 ** 4) == "2.0 TiB" def test_gib_does_not_overflow_to_wrong_unit(self) -> None: from muse.cli.commands.coord_gc import _fmt_bytes # 512 GiB — must stay in GiB, not TiB assert _fmt_bytes(512 * 1024 ** 3) == "512.0 GiB" def test_1023_gib_stays_gib(self) -> None: from muse.cli.commands.coord_gc import _fmt_bytes result = _fmt_bytes(1023 * 1024 ** 3) assert "GiB" in result # --------------------------------------------------------------------------- # Unit — orphan / lifecycle # --------------------------------------------------------------------------- class TestRunCoordGcOrphanAndLifecycle: def test_orphaned_release_collected(self, repo: pathlib.Path) -> None: """A release tombstone with no matching reservation is an orphan — collect it.""" from muse.core.coordination import run_coord_gc releases_dir = coordination_dir(repo) / "releases" releases_dir.mkdir(parents=True, exist_ok=True) orphan_id = _new_id() orphan_path = releases_dir / f"{orphan_id}.json" orphan_path.write_text(_json.dumps({ "reservation_id": orphan_id, "run_id": "ghost", "released_at": now_utc_iso(), "reason": "completed", })) result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert result.releases_removed >= 1 assert not orphan_path.exists() def test_orphaned_heartbeat_collected(self, repo: pathlib.Path) -> None: """A heartbeat file with no matching reservation is an orphan — collect it.""" from muse.core.coordination import run_coord_gc hb_dir = coordination_dir(repo) / "heartbeats" hb_dir.mkdir(parents=True, exist_ok=True) orphan_id = _new_id() orphan_path = hb_dir / f"{orphan_id}.json" now = datetime.datetime.now(datetime.timezone.utc) orphan_path.write_text(_json.dumps({ "reservation_id": orphan_id, "run_id": "ghost", "last_beat_at": now.isoformat(), "extended_expires_at": (now + datetime.timedelta(hours=1)).isoformat(), })) result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert result.heartbeats_removed >= 1 assert not orphan_path.exists() def test_released_reservation_collected_after_grace(self, repo: pathlib.Path) -> None: """Reserve → release → gc(grace=0): reservation + tombstone both gone.""" from muse.core.coordination import ( run_coord_gc, create_reservation, create_release ) res = create_reservation( repo, run_id="agent-r", branch="main", addresses=["a.py::f"], ttl_seconds=3600, ) rid = res.reservation_id create_release(repo, rid, run_id="agent-r") result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert result.reservations_removed >= 1 assert result.releases_removed >= 1 res_path = coordination_dir(repo) / "reservations" / f"{rid}.json" rel_path = coordination_dir(repo) / "releases" / f"{rid}.json" assert not res_path.exists() assert not rel_path.exists() def test_heartbeat_extended_reservation_not_expired(self, repo: pathlib.Path) -> None: """A reservation past its original TTL but heartbeated is still active.""" from muse.core.coordination import ( run_coord_gc, create_reservation, create_heartbeat ) # Reservation expired 30 minutes ago coord_dir = coordination_dir(repo) / "reservations" coord_dir.mkdir(parents=True, exist_ok=True) rid = _new_id() now = datetime.datetime.now(datetime.timezone.utc) data = { "reservation_id": rid, "run_id": "agent-hb", "branch": "main", "addresses": ["a.py::f"], "operation": None, "created_at": (now - datetime.timedelta(hours=2)).isoformat(), "expires_at": (now - datetime.timedelta(minutes=30)).isoformat(), } (coord_dir / f"{rid}.json").write_text(_json.dumps(data)) # Heartbeat extends it 2 hours into the future hb_dir = coordination_dir(repo) / "heartbeats" hb_dir.mkdir(parents=True, exist_ok=True) (hb_dir / f"{rid}.json").write_text(_json.dumps({ "reservation_id": rid, "run_id": "agent-hb", "last_beat_at": now.isoformat(), "extended_expires_at": (now + datetime.timedelta(hours=2)).isoformat(), })) result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert result.reservations_removed == 0 assert (coord_dir / f"{rid}.json").exists(), "Heartbeated reservation must survive" def test_dry_run_removed_ids_populated(self, repo: pathlib.Path) -> None: """dry_run=True must populate removed_ids even though nothing is deleted.""" from muse.core.coordination import run_coord_gc rid = _write_expired_reservation(repo) result = run_coord_gc(repo, dry_run=True, grace_period_seconds=0) assert rid in result.removed_ids res_path = coordination_dir(repo) / "reservations" / f"{rid}.json" assert res_path.exists(), "dry_run must not delete files" def test_total_removed_is_sum_of_parts(self, repo: pathlib.Path) -> None: """total_removed == sum of all category counters.""" from muse.core.coordination import run_coord_gc _write_expired_reservation(repo) result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) expected = ( result.reservations_removed + result.releases_removed + result.heartbeats_removed + result.intents_removed ) assert result.total_removed == expected def test_total_bytes_is_sum_of_parts(self, repo: pathlib.Path) -> None: """total_removed_bytes == sum of all byte counters.""" from muse.core.coordination import run_coord_gc _write_expired_reservation(repo) result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) expected = ( result.reservations_removed_bytes + result.releases_removed_bytes + result.heartbeats_removed_bytes + result.intents_removed_bytes ) assert result.total_removed_bytes == expected # --------------------------------------------------------------------------- # Integration — JSON compact format and new exit codes # --------------------------------------------------------------------------- class TestCoordGcJsonAndExitCodes: def test_json_output_is_compact(self, repo: pathlib.Path) -> None: """JSON must be single-line (no indent=2 pretty-printing).""" result = runner.invoke(cli, ["coord", "gc", "--json"]) assert result.exit_code == 0 assert "\n" not in result.output.strip() def test_json_execute_compact(self, repo: pathlib.Path) -> None: _write_expired_reservation(repo) result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0", "--json"]) assert result.exit_code == 0 assert "\n" not in result.output.strip() def test_grace_period_negative_json_has_status(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "gc", "--grace-period", "-1", "--json"]) data = _json.loads(result.output) assert data["status"] == "bad_args" def test_max_intent_age_zero_json_has_status(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "gc", "--max-intent-age", "0", "--json"]) data = _json.loads(result.output) assert data["status"] == "bad_args" def test_grace_period_negative_exits_user_error(self, repo: pathlib.Path) -> None: from muse.core.errors import ExitCode result = runner.invoke(cli, ["coord", "gc", "--grace-period", "-1"]) assert result.exit_code == ExitCode.USER_ERROR def test_max_intent_age_zero_exits_user_error(self, repo: pathlib.Path) -> None: from muse.core.errors import ExitCode result = runner.invoke(cli, ["coord", "gc", "--max-intent-age", "0"]) assert result.exit_code == ExitCode.USER_ERROR def test_json_removed_ids_list_populated(self, repo: pathlib.Path) -> None: rid = _write_expired_reservation(repo) result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0", "--json"]) data = _json.loads(result.output) assert rid in data["removed_ids"] def test_json_dry_run_removed_ids_populated(self, repo: pathlib.Path) -> None: """removed_ids is populated in dry-run JSON too.""" rid = _write_expired_reservation(repo) result = runner.invoke(cli, ["coord", "gc", "--json"]) data = _json.loads(result.output) assert rid in data["removed_ids"] assert data["dry_run"] is True def test_orphaned_release_in_json_output(self, repo: pathlib.Path) -> None: """Orphaned release collected and reflected in JSON releases_removed.""" releases_dir = coordination_dir(repo) / "releases" releases_dir.mkdir(parents=True, exist_ok=True) orphan_id = _new_id() (releases_dir / f"{orphan_id}.json").write_text(_json.dumps({ "reservation_id": orphan_id, "run_id": "ghost", "released_at": now_utc_iso(), "reason": "completed", })) result = runner.invoke(cli, [ "coord", "gc", "--execute", "--grace-period", "0", "--json" ]) data = _json.loads(result.output) assert data["releases_removed"] >= 1 def test_error_message_uses_emoji_prefix(self, repo: pathlib.Path) -> None: """Validation errors must start with ❌, not bare 'error:'.""" result = runner.invoke(cli, ["coord", "gc", "--grace-period", "-1"]) combined = result.output + (result.stderr or "") assert "❌" in combined # --------------------------------------------------------------------------- # E2E — full lifecycle # --------------------------------------------------------------------------- class TestCoordGcE2E: def test_full_lifecycle_reserve_release_gc(self, repo: pathlib.Path) -> None: """Reserve → release → GC: reservation + tombstone both removed.""" from muse.core.coordination import ( create_reservation, create_release, run_coord_gc, load_all_reservations, load_released_ids, ) res = create_reservation( repo, run_id="e2e-agent", branch="main", addresses=["src/e2e.py::func"], ttl_seconds=3600, ) rid = res.reservation_id create_release(repo, rid, run_id="e2e-agent") gc_result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert rid in gc_result.removed_ids res_path = coordination_dir(repo) / "reservations" / f"{rid}.json" rel_path = coordination_dir(repo) / "releases" / f"{rid}.json" assert not res_path.exists() assert not rel_path.exists() def test_active_survives_while_expired_neighbour_collected(self, repo: pathlib.Path) -> None: """One active + one expired: only expired is collected.""" from muse.core.coordination import run_coord_gc active_id = _write_active_reservation(repo) expired_id = _write_expired_reservation(repo) run_coord_gc(repo, dry_run=False, grace_period_seconds=0) res_dir = coordination_dir(repo) / "reservations" surviving = {p.stem for p in res_dir.glob("*.json")} assert active_id in surviving assert expired_id not in surviving def test_gc_with_heartbeat_extended_reservation(self, repo: pathlib.Path) -> None: """Expired-by-TTL but heartbeat-extended: survives GC.""" from muse.core.coordination import ( create_reservation, create_heartbeat, run_coord_gc ) res = create_reservation( repo, run_id="hb-agent", branch="main", addresses=["src/hb.py::func"], ttl_seconds=1, ) rid = res.reservation_id # Heartbeat extends 2 hours into the future create_heartbeat(repo, rid, run_id="hb-agent", extension_seconds=7200) gc_result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert rid not in gc_result.removed_ids res_path = coordination_dir(repo) / "reservations" / f"{rid}.json" assert res_path.exists() def test_repeated_gc_is_idempotent(self, repo: pathlib.Path) -> None: """Running GC twice on an already-clean repo returns zero totals.""" from muse.core.coordination import run_coord_gc _write_expired_reservation(repo) run_coord_gc(repo, dry_run=False, grace_period_seconds=0) result2 = run_coord_gc(repo, dry_run=False, grace_period_seconds=0) assert result2.total_removed == 0 def test_gc_via_cli_full_lifecycle(self, repo: pathlib.Path) -> None: """End-to-end via CLI: reserve → release → gc --execute → JSON confirms removal.""" from muse.core.coordination import create_reservation, create_release res = create_reservation( repo, run_id="cli-e2e", branch="main", addresses=["src/cli.py::func"], ttl_seconds=3600, ) rid = res.reservation_id create_release(repo, rid, run_id="cli-e2e") result = runner.invoke(cli, [ "coord", "gc", "--execute", "--grace-period", "0", "--json", ]) assert result.exit_code == 0 data = _json.loads(result.output) assert rid in data["removed_ids"] assert data["reservations_removed"] >= 1 assert data["releases_removed"] >= 1 # --------------------------------------------------------------------------- # Concurrent # --------------------------------------------------------------------------- class TestCoordGcConcurrent: def test_concurrent_dry_run_does_not_crash(self, repo: pathlib.Path) -> None: """20 concurrent dry-run GC passes on the same repo must all succeed.""" import threading for _ in range(50): _write_expired_reservation(repo) errors: list[Exception] = [] lock = threading.Lock() def _gc() -> None: try: result = runner.invoke(cli, ["coord", "gc", "--json"]) assert result.exit_code == 0 except Exception as exc: # noqa: BLE001 with lock: errors.append(exc) threads = [threading.Thread(target=_gc) for _ in range(20)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent dry-run errors: {errors}" def test_concurrent_execute_leaves_no_files(self, repo: pathlib.Path) -> None: """Two concurrent execute passes must not leave any collectable files on disk. Both passes may succeed or one may race ahead — either is fine. The critical invariant is that all expired files are gone afterward. """ import threading from muse.core.coordination import run_coord_gc for _ in range(100): _write_expired_reservation(repo) def _gc() -> None: run_coord_gc(repo, dry_run=False, grace_period_seconds=0) t1 = threading.Thread(target=_gc) t2 = threading.Thread(target=_gc) t1.start() t2.start() t1.join() t2.join() res_dir = coordination_dir(repo) / "reservations" if res_dir.exists(): remaining = list(res_dir.glob("*.json")) assert remaining == [], f"Files left after concurrent GC: {remaining}" # --------------------------------------------------------------------------- # TestRegisterFlags — --json / -j normalized at argparse level # --------------------------------------------------------------------------- class TestRegisterFlags: """register() must expose --json with -j shorthand and dest=json_out.""" def _make_parser(self) -> "argparse.ArgumentParser": import argparse as ap from muse.cli.commands.coord_gc import register root = ap.ArgumentParser() subs = root.add_subparsers() register(subs) return root def test_json_out_default_false(self) -> None: p = self._make_parser() ns = p.parse_args(['gc']) assert ns.json_out is False def test_json_out_true_with_json_flag(self) -> None: p = self._make_parser() ns = p.parse_args(['gc', '--json']) assert ns.json_out is True def test_json_out_true_with_j_flag(self) -> None: p = self._make_parser() ns = p.parse_args(['gc', '-j']) assert ns.json_out is True