"""Comprehensive tests for ``muse coord release``. Coverage matrix --------------- Unit ~~~~ * create_release roundtrip — write, load, fields intact * Release.to_dict — required keys present * Valid reasons accepted — completed, cancelled, superseded * Invalid reason rejected — ValueError raised * Double release raises FileExistsError * Path traversal in reservation_id rejected before file I/O Integration — CLI ~~~~~~~~~~~~~~~~~ * Single release — text output, exit 0 * --reason cancelled — accepted * --reason superseded — accepted * --all-for-run — releases all active reservations for run-id * --all-for-run with no active reservations — 0 released, exit 0 * Double release exits 0 (already released — idempotent) * Not-found sha256 ID exits ExitCode.NOT_FOUND (4) * Invalid ID exits ExitCode.USER_ERROR (1) * --format json — valid JSON, required keys, compact (no indent) * --json shorthand — valid JSON Input validation ~~~~~~~~~~~~~~~~ * --run-id at exactly 256 chars accepted * --run-id over 256 chars rejected with USER_ERROR (1) * --run-id validation fires before any file I/O (no reservation file left) * Mutual exclusion errors use USER_ERROR (1) Security ~~~~~~~~ * sha256 ID path traversal rejected before file I/O * ANSI escape sequences in run_id are safe (sanitized in output) * null byte in reservation_id rejected * sha256 ID validation before require_repo — no side effects on bad input Concurrent ~~~~~~~~~~ * Two threads racing to release the same reservation — both exit 0 Stress ~~~~~~ * 200 reservations released via --all-for-run < 3 s * 50 concurrent --all-for-run reads do not corrupt each other """ from __future__ import annotations import datetime import json import pathlib import time from muse.core.types import fake_id from muse.core.paths import muse_dir import threading import pytest from tests.cli_test_helper import CliRunner from muse.core.coordination import ( Release, Reservation, create_release, create_reservation, load_all_reservations, load_released_ids, ) from muse.cli.commands.release_coord import _MAX_RUN_ID_LEN from muse.core.errors import ExitCode cli = None runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _now_utc() -> datetime.datetime: return datetime.datetime.now(datetime.timezone.utc) def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") return tmp_path @pytest.fixture() def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) r = runner.invoke(cli, ["init", "--domain", "code"]) assert r.exit_code == 0, r.output return tmp_path @pytest.fixture() def reservation(repo: pathlib.Path) -> Reservation: return create_reservation( repo, run_id="agent-test", branch="feat/test", addresses=["src/billing.py::compute_total"], ttl_seconds=3600, ) # --------------------------------------------------------------------------- # Unit — core helpers # --------------------------------------------------------------------------- class TestCreateReleaseRoundtrip: def test_roundtrip_fields(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="agent-1", branch="main", addresses=["a.py::f"], ttl_seconds=3600) rid = res.reservation_id rel = create_release(root, rid, run_id="agent-1", reason="completed") assert rel.reservation_id == rid assert rel.run_id == "agent-1" assert rel.reason == "completed" assert isinstance(rel.released_at, datetime.datetime) def test_release_persisted_to_disk(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="agent-1", branch="main", addresses=["a.py::f"], ttl_seconds=3600) rid = res.reservation_id create_release(root, rid, run_id="agent-1", reason="completed") released = load_released_ids(root) assert rid in released def test_to_dict_keys(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="agent-1", branch="main", addresses=["a.py::f"], ttl_seconds=3600) rel = create_release(root, res.reservation_id, run_id="agent-1") d = rel.to_dict() assert "reservation_id" in d assert "run_id" in d assert "released_at" in d assert "reason" in d assert "schema_version" in d def test_valid_reason_completed(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="a", branch="main", addresses=["x.py::g"], ttl_seconds=3600) rel = create_release(root, res.reservation_id, run_id="a", reason="completed") assert rel.reason == "completed" def test_valid_reason_cancelled(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="a", branch="main", addresses=["x.py::g"], ttl_seconds=3600) rel = create_release(root, res.reservation_id, run_id="a", reason="cancelled") assert rel.reason == "cancelled" def test_valid_reason_superseded(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="a", branch="main", addresses=["x.py::g"], ttl_seconds=3600) rel = create_release(root, res.reservation_id, run_id="a", reason="superseded") assert rel.reason == "superseded" def test_invalid_reason_raises(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="a", branch="main", addresses=["x.py::g"], ttl_seconds=3600) with pytest.raises(ValueError, match="reason must be one of"): create_release(root, res.reservation_id, run_id="a", reason="bogus") def test_double_release_raises_file_exists(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) res = create_reservation(root, run_id="a", branch="main", addresses=["x.py::g"], ttl_seconds=3600) create_release(root, res.reservation_id, run_id="a") with pytest.raises(FileExistsError): create_release(root, res.reservation_id, run_id="a") def test_path_traversal_reservation_id_rejected(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) with pytest.raises(ValueError, match="sha256"): create_release(root, "../../etc/passwd", run_id="a") def test_path_traversal_with_slashes_rejected(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) with pytest.raises(ValueError): create_release(root, "../traversal/path", run_id="a") def test_non_sha256_string_rejected(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) with pytest.raises(ValueError): create_release(root, "not-a-valid-id", run_id="a") # --------------------------------------------------------------------------- # Integration — CLI # --------------------------------------------------------------------------- class TestReleaseSingleCli: def test_basic_release_exits_zero(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke(cli, ["coord", "release", rid, "--run-id", "agent-1"]) assert result.exit_code == 0, result.output def test_basic_release_text_contains_id(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke(cli, ["coord", "release", rid, "--run-id", "agent-1"]) assert rid[:8] in result.output or rid in result.output def test_basic_release_text_contains_released(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke(cli, ["coord", "release", rid, "--run-id", "agent-1"]) assert "released" in result.output.lower() def test_reason_cancelled(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "release", rid, "--run-id", "agent-1", "--reason", "cancelled"] ) assert result.exit_code == 0, result.output assert "cancelled" in result.output def test_reason_superseded(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "release", rid, "--run-id", "agent-1", "--reason", "superseded"] ) assert result.exit_code == 0, result.output assert "superseded" in result.output def test_double_release_exits_zero_idempotent(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id r1 = runner.invoke(cli, ["coord", "release", rid, "--run-id", "agent-1"]) assert r1.exit_code == 0, r1.output r2 = runner.invoke(cli, ["coord", "release", rid, "--run-id", "agent-1"]) # Already released — idempotent path returns 0 assert r2.exit_code == 0, r2.output assert "already released" in r2.output.lower() def test_not_found_id_exits_nonzero(self, repo: pathlib.Path) -> None: nonexistent = fake_id("nonexistent-reservation") result = runner.invoke( cli, ["coord", "release", nonexistent, "--run-id", "agent-1"] ) assert result.exit_code != 0 combined = result.output + (result.stderr or "") assert "not found" in combined.lower() def test_invalid_id_exits_nonzero(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["coord", "release", "not-a-valid-id", "--run-id", "agent-1"] ) assert result.exit_code != 0 def test_no_reservation_id_and_no_all_for_run_exits_nonzero( self, repo: pathlib.Path ) -> None: result = runner.invoke(cli, ["coord", "release", "--run-id", "agent-1"]) assert result.exit_code != 0 def test_both_reservation_id_and_all_for_run_exits_nonzero( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "release", rid, "--run-id", "agent-1", "--all-for-run", "agent-1"], ) assert result.exit_code != 0 class TestReleaseFormatJson: def test_format_json_flag(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "release", rid, "--run-id", "agent-1", "--json"], ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["reservation_id"] == rid assert data["run_id"] == "agent-1" assert "reason" in data assert "released_at" in data def test_json_shorthand(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "release", rid, "--run-id", "agent-1", "--json"] ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["reservation_id"] == rid def test_json_status_released(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "release", rid, "--run-id", "agent-1", "--json"] ) data = json.loads(result.output) assert data["status"] == "released" def test_json_duration_ms_present(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "release", rid, "--run-id", "agent-1", "--json"] ) data = json.loads(result.output) assert "duration_ms" in data assert isinstance(data["duration_ms"], float) def test_json_not_found_has_status(self, repo: pathlib.Path) -> None: nonexistent = fake_id("nonexistent-reservation") result = runner.invoke( cli, ["coord", "release", nonexistent, "--run-id", "agent-1", "--json"] ) data = json.loads(result.output) assert data["status"] == "not_found" def test_json_already_released_has_status(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id runner.invoke(cli, ["coord", "release", rid, "--run-id", "agent-1"]) result = runner.invoke( cli, ["coord", "release", rid, "--run-id", "agent-1", "--json"] ) data = json.loads(result.output) assert data["status"] == "already_released" class TestReleaseAllForRun: def test_all_for_run_releases_all(self, repo: pathlib.Path) -> None: for i in range(5): create_reservation( repo, run_id="batch-agent", branch="main", addresses=[f"src/m{i}.py::f"], ttl_seconds=3600, ) result = runner.invoke( cli, ["coord", "release", "--all-for-run", "batch-agent", "--run-id", "batch-agent"] ) assert result.exit_code == 0, result.output released = load_released_ids(repo) all_res = load_all_reservations(repo) batch_ids = {r.reservation_id for r in all_res if r.run_id == "batch-agent"} assert batch_ids == batch_ids & released def test_all_for_run_no_active_exits_zero(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["coord", "release", "--all-for-run", "nonexistent-run", "--run-id", "agent-1"], ) assert result.exit_code == 0, result.output def test_all_for_run_no_active_text_shows_zero(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["coord", "release", "--all-for-run", "nonexistent-run", "--run-id", "agent-1"], ) assert "0" in result.output def test_all_for_run_json_schema(self, repo: pathlib.Path) -> None: for i in range(3): create_reservation( repo, run_id="run-j", branch="main", addresses=[f"src/x{i}.py::f"], ttl_seconds=3600, ) result = runner.invoke( cli, ["coord", "release", "--all-for-run", "run-j", "--run-id", "run-j", "--json"], ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert "released" in data assert "skipped_already_released" in data assert "duration_ms" in data assert data["status"] == "ok" assert len(data["released"]) == 3 def test_all_for_run_released_entries_have_required_keys(self, repo: pathlib.Path) -> None: create_reservation( repo, run_id="run-k", branch="main", addresses=["a.py::b"], ttl_seconds=3600, ) result = runner.invoke( cli, ["coord", "release", "--all-for-run", "run-k", "--run-id", "run-k", "--json"], ) data = json.loads(result.output) entry = data["released"][0] assert "reservation_id" in entry assert "run_id" in entry assert "released_at" in entry assert "reason" in entry def test_all_for_run_only_targets_matching_run_id(self, repo: pathlib.Path) -> None: res_a = create_reservation( repo, run_id="run-a", branch="main", addresses=["a.py::x"], ttl_seconds=3600, ) create_reservation( repo, run_id="run-b", branch="main", addresses=["b.py::y"], ttl_seconds=3600, ) runner.invoke( cli, ["coord", "release", "--all-for-run", "run-a", "--run-id", "run-a"], ) released = load_released_ids(repo) assert res_a.reservation_id in released # run-b should not be released all_res = load_all_reservations(repo) run_b_ids = {r.reservation_id for r in all_res if r.run_id == "run-b"} assert not run_b_ids & released def test_all_for_run_with_reason_cancelled(self, repo: pathlib.Path) -> None: create_reservation( repo, run_id="run-c", branch="main", addresses=["c.py::f"], ttl_seconds=3600, ) result = runner.invoke( cli, [ "coord", "release", "--all-for-run", "run-c", "--run-id", "run-c", "--reason", "cancelled", "--json", ], ) data = json.loads(result.output) assert data["released"][0]["reason"] == "cancelled" # --------------------------------------------------------------------------- # Security # --------------------------------------------------------------------------- class TestReleaseSecurity: def test_path_traversal_in_reservation_id_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["coord", "release", "../../etc/passwd", "--run-id", "agent-1"] ) assert result.exit_code != 0 # Must not have created any file outside coord dir assert not (repo / "etc").exists() assert not (repo.parent / "etc").exists() def test_path_traversal_with_dots_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["coord", "release", "../sneaky", "--run-id", "agent-1"] ) assert result.exit_code != 0 def test_ansi_in_run_id_does_not_crash(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id ansi_run_id = "\x1b[31magent-malicious\x1b[0m" result = runner.invoke( cli, ["coord", "release", rid, "--run-id", ansi_run_id] ) # Should complete without crashing (exit 0 or 1 depending on sanitization) # The important thing is it doesn't raise an unhandled exception assert result.exit_code in (0, 1, 2) def test_null_byte_in_reservation_id_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["coord", "release", "abc\x00def", "--run-id", "agent-1"] ) assert result.exit_code != 0 # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestReleaseStress: def test_200_reservations_all_for_run_under_3s(self, repo: pathlib.Path) -> None: for i in range(200): create_reservation( repo, run_id="stress-agent", branch="feat/stress", addresses=[f"src/stress_{i}.py::func"], ttl_seconds=3600, ) t0 = time.monotonic() result = runner.invoke( cli, ["coord", "release", "--all-for-run", "stress-agent", "--run-id", "stress-agent"], ) elapsed = time.monotonic() - t0 assert result.exit_code == 0, result.output assert elapsed < 3.0, f"Batch release took {elapsed:.2f}s (> 3s limit)" released = load_released_ids(repo) all_res = load_all_reservations(repo) stress_ids = {r.reservation_id for r in all_res if r.run_id == "stress-agent"} assert len(stress_ids) == 200 assert stress_ids <= released def test_50_concurrent_list_reads_do_not_corrupt(self, repo: pathlib.Path) -> None: """Concurrent reads of the released-IDs set must never raise.""" for i in range(20): res = create_reservation( repo, run_id="concurrent-read-agent", branch="main", addresses=[f"src/r{i}.py::f"], ttl_seconds=3600, ) create_release(repo, res.reservation_id, run_id="concurrent-read-agent") errors: list[Exception] = [] def _reader() -> None: try: ids = load_released_ids(repo) assert len(ids) >= 20 except Exception as exc: # noqa: BLE001 errors.append(exc) threads = [threading.Thread(target=_reader) for _ in range(50)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent read errors: {errors}" # --------------------------------------------------------------------------- # Input validation # --------------------------------------------------------------------------- class TestReleaseInputValidation: def test_run_id_at_max_length_accepted(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id long_run_id = "x" * _MAX_RUN_ID_LEN result = runner.invoke( cli, ["coord", "release", rid, "--run-id", long_run_id] ) assert result.exit_code == 0, result.output def test_run_id_over_max_length_exits_user_error(self, repo: pathlib.Path) -> None: too_long = "x" * (_MAX_RUN_ID_LEN + 1) result = runner.invoke( cli, ["coord", "release", "--run-id", too_long, "--all-for-run", "agent-1"] ) assert result.exit_code == ExitCode.USER_ERROR def test_run_id_over_max_length_message_on_stderr(self, repo: pathlib.Path) -> None: too_long = "x" * (_MAX_RUN_ID_LEN + 1) result = runner.invoke( cli, ["coord", "release", "--run-id", too_long, "--all-for-run", "agent-1"] ) combined = result.output + (result.stderr or "") assert "run-id" in combined.lower() or "too long" in combined.lower() def test_run_id_over_max_leaves_no_release_file(self, repo: pathlib.Path, reservation: Reservation) -> None: """Validation must fire before any file I/O.""" too_long = "x" * (_MAX_RUN_ID_LEN + 1) runner.invoke( cli, ["coord", "release", reservation.reservation_id, "--run-id", too_long], ) released = load_released_ids(repo) assert reservation.reservation_id not in released def test_mutual_exclusion_exits_user_error(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "release", rid, "--run-id", "agent-1", "--all-for-run", "agent-1"], ) assert result.exit_code == ExitCode.USER_ERROR def test_missing_both_exits_user_error(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "release", "--run-id", "agent-1"]) assert result.exit_code == ExitCode.USER_ERROR def test_invalid_id_exits_user_error(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["coord", "release", "not-a-valid-id", "--run-id", "agent-1"] ) assert result.exit_code == ExitCode.USER_ERROR def test_not_found_id_exits_not_found(self, repo: pathlib.Path) -> None: nonexistent = fake_id("nonexistent-reservation") result = runner.invoke( cli, ["coord", "release", nonexistent, "--run-id", "agent-1"] ) assert result.exit_code == ExitCode.NOT_FOUND def test_not_found_id_json_status(self, repo: pathlib.Path) -> None: nonexistent = fake_id("nonexistent-reservation") result = runner.invoke( cli, ["coord", "release", nonexistent, "--run-id", "agent-1", "--json"] ) data = json.loads(result.output) assert data["status"] == "not_found" assert result.exit_code == ExitCode.NOT_FOUND def test_invalid_reason_rejected_by_parser(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "release", rid, "--run-id", "agent-1", "--reason", "bogus-reason"] ) assert result.exit_code != 0 # --------------------------------------------------------------------------- # JSON output format # --------------------------------------------------------------------------- class TestReleaseJsonFormat: def test_single_release_json_is_compact(self, repo: pathlib.Path, reservation: Reservation) -> None: """JSON output must be compact (no indent=2 pretty-printing).""" rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "release", rid, "--run-id", "agent-1", "--json"] ) assert result.exit_code == 0, result.output # Compact JSON has no newlines inside the object assert "\n" not in result.output.strip() def test_batch_json_is_compact(self, repo: pathlib.Path) -> None: for i in range(3): create_reservation( repo, run_id="compact-agent", branch="main", addresses=[f"src/c{i}.py::f"], ttl_seconds=3600, ) result = runner.invoke( cli, ["coord", "release", "--all-for-run", "compact-agent", "--run-id", "compact-agent", "--json"], ) assert result.exit_code == 0, result.output assert "\n" not in result.output.strip() def test_already_released_json_released_at_is_null( self, repo: pathlib.Path, reservation: Reservation ) -> None: rid = reservation.reservation_id runner.invoke(cli, ["coord", "release", rid, "--run-id", "agent-1"]) result = runner.invoke( cli, ["coord", "release", rid, "--run-id", "agent-1", "--json"] ) data = json.loads(result.output) assert data["status"] == "already_released" assert data["released_at"] is None def test_batch_json_skipped_already_released_count(self, repo: pathlib.Path) -> None: reservations = [ create_reservation( repo, run_id="skip-agent", branch="main", addresses=[f"src/s{i}.py::f"], ttl_seconds=3600, ) for i in range(4) ] # Pre-release 2 of the 4 for res in reservations[:2]: create_release(repo, res.reservation_id, run_id="skip-agent") result = runner.invoke( cli, ["coord", "release", "--all-for-run", "skip-agent", "--run-id", "skip-agent", "--json"], ) data = json.loads(result.output) assert data["skipped_already_released"] == 2 assert len(data["released"]) == 2 def test_single_json_has_duration_ms(self, repo: pathlib.Path, reservation: Reservation) -> None: rid = reservation.reservation_id result = runner.invoke( cli, ["coord", "release", rid, "--run-id", "agent-1", "--json"] ) data = json.loads(result.output) assert isinstance(data["duration_ms"], float) assert data["duration_ms"] >= 0.0 def test_batch_json_has_duration_ms(self, repo: pathlib.Path) -> None: create_reservation( repo, run_id="elapsed-agent", branch="main", addresses=["x.py::f"], ttl_seconds=3600, ) result = runner.invoke( cli, ["coord", "release", "--all-for-run", "elapsed-agent", "--run-id", "elapsed-agent", "--json"], ) data = json.loads(result.output) assert isinstance(data["duration_ms"], float) # --------------------------------------------------------------------------- # Concurrent # --------------------------------------------------------------------------- class TestReleaseConcurrent: def test_two_threads_race_to_release_same_reservation( self, repo: pathlib.Path, reservation: Reservation ) -> None: """Both threads must exit 0; one writes the tombstone, the other sees already_released.""" rid = reservation.reservation_id exit_codes: list[int] = [] lock = threading.Lock() def _release() -> None: result = runner.invoke( cli, ["coord", "release", rid, "--run-id", "agent-race"] ) with lock: exit_codes.append(result.exit_code) t1 = threading.Thread(target=_release) t2 = threading.Thread(target=_release) t1.start() t2.start() t1.join() t2.join() assert exit_codes == [0, 0], f"Expected both exits 0, got {exit_codes}" released = load_released_ids(repo) assert rid in released def test_concurrent_batch_releases_idempotent(self, repo: pathlib.Path) -> None: """Multiple agents calling --all-for-run concurrently produce the same final state.""" for i in range(10): create_reservation( repo, run_id="concurrent-batch", branch="main", addresses=[f"src/cb{i}.py::f"], ttl_seconds=3600, ) results: list[int] = [] lock = threading.Lock() def _batch() -> None: result = runner.invoke( cli, ["coord", "release", "--all-for-run", "concurrent-batch", "--run-id", "concurrent-batch"], ) with lock: results.append(result.exit_code) threads = [threading.Thread(target=_batch) for _ in range(5)] for t in threads: t.start() for t in threads: t.join() assert all(c == 0 for c in results), f"Non-zero exit in concurrent batch: {results}" released = load_released_ids(repo) all_res = load_all_reservations(repo) batch_ids = {r.reservation_id for r in all_res if r.run_id == "concurrent-batch"} assert batch_ids == batch_ids & released class TestRegisterFlags: def test_default_json_out_is_false(self) -> None: import argparse from muse.cli.commands.release_coord import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["release", "--run-id", "agent-1"]) assert args.json_out is False def test_json_flag_sets_json_out(self) -> None: import argparse from muse.cli.commands.release_coord import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["release", "--run-id", "agent-1", "--json"]) assert args.json_out is True def test_j_shorthand_sets_json_out(self) -> None: import argparse from muse.cli.commands.release_coord import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["release", "--run-id", "agent-1", "-j"]) assert args.json_out is True