"""Comprehensive tests for ``muse coord reserve``. Coverage matrix --------------- Unit — create_reservation directly create_reservation roundtrip: fields written and returned correctly conflict detection: active reservation on same addr detected TTL clamping: clamp_int raises on out-of-range value ID validation in --depends-on: non-content-ID rejected before file I/O path traversal in --depends-on: traversal string rejected by ID validator from_dict missing fields: graceful defaults, no KeyError from_dict malformed timestamps: ValueError surfaces clearly write_text_atomic used: reservation file written atomically load_all_reservations corrupt file: corrupt JSON skipped, others loaded load_all_reservations empty dir: returns empty list Integration — CLI via runner.invoke (["coord", "reserve", ...]) basic reserve success: exits 0, success message printed multiple addresses: two addresses accepted in one call --run-id: run-id appears in output --ttl: custom TTL accepted (within bounds) --op: operation printed in text output --json flag: valid JSON with required keys --json shorthand: same as --json flag default text output: text output (default behaviour) conflict warning shown: warns when address already taken by other agent --depends-on single: exits 0 or 1 depending on DAG state --depends-on multiple: two --depends-on flags accumulated conflict exits 0 (not blocking): reservation still created despite conflicts no --run-id defaults to 'unknown': default run_id used missing repo exits nonzero: no .muse dir → non-zero exit --ttl zero rejected: exits 1, clean error to stderr --ttl negative rejected: exits 1, clean error to stderr --ttl above max rejected: exits 1, clean error to stderr --run-id at max length accepted: exactly 256 chars succeeds --run-id over max length rejected: 257 chars exits 1 --op invalid value rejected: argparse rejects unknown op --op valid values all accepted: rename/move/modify/extract/delete work address count at limit accepted: exactly 1000 
succeeds address count over limit rejected: 1001 exits 1 dep_error exits USER_ERROR not 1: consistent exit code json output no trailing whitespace: compact JSON (no indent=2) Security — CLI path traversal in ADDRESS: stored without traversal (no FS escape) null byte in run_id: stored verbatim (no crash) ANSI in run_id: stored verbatim in JSON invalid ID in --depends-on: non-content-ID value rejected, exits 1 --depends-on validates ID before file I/O: fs untouched on invalid ID very long address value: no crash, stored verbatim unicode in address: stored correctly, round-trips self-dependency rejected: DAG layer raises ValueError cycle detection rejects edge: circular dependency exits 1 concurrent writes produce unique IDs: no file collision Stress — timing 50 addresses in one reservation: create_reservation succeeds, round-trip valid 200 reservations < 3 s: bulk creation within time budget active_reservations filters expired < 1 s: query fast under load 1000-address reservation < 1 s: max-address limit processed quickly conflict check O(n) not O(n²): 10k active reservations still fast """ from __future__ import annotations import datetime import json import pathlib import itertools import threading import time import pytest from tests.cli_test_helper import CliRunner from muse.core.types import MsgpackDict, content_hash, fake_id, load_json_file from muse.core.coordination import ( Reservation, active_reservations, create_reservation, load_all_reservations, ) from muse.core.validation import clamp_int from muse.cli.commands.reserve import _MAX_ADDRESSES, _MAX_RUN_ID_LEN, _VALID_OPS from muse.core.paths import coordination_dir, muse_dir cli = None runner = CliRunner() _id_seq = itertools.count() def _new_id() -> str: return content_hash({"seq": next(_id_seq)}) # --------------------------------------------------------------------------- # Required JSON keys for the reserve command output # --------------------------------------------------------------------------- 
_REQUIRED_JSON_KEYS = {
    "reservation_id",
    "run_id",
    "branch",
    "addresses",
    "created_at",
    "expires_at",
    "operation",
    "conflicts",
    "depends_on",
    "dependency_error",
}


# ---------------------------------------------------------------------------
# Module-level fixture
# ---------------------------------------------------------------------------


@pytest.fixture()
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Create a minimal repository in ``tmp_path`` and return its root.

    The muse directory is created with a ``HEAD`` file pointing at
    ``refs/heads/main``, and ``MUSE_REPO_ROOT`` is set to the root for the
    duration of the test.
    """
    dot_muse = muse_dir(tmp_path)
    dot_muse.mkdir()
    (dot_muse / "HEAD").write_text("ref: refs/heads/main\n")
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
    return tmp_path


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _now() -> datetime.datetime:
    """Return the current timezone-aware UTC time."""
    return datetime.datetime.now(datetime.timezone.utc)


def _past(seconds: int = 120) -> datetime.datetime:
    """Return the UTC time *seconds* before now."""
    return _now() - datetime.timedelta(seconds=seconds)


def _make_reservation(
    root: pathlib.Path,
    *,
    run_id: str = "agent-1",
    branch: str = "main",
    addresses: list[str] | None = None,
    ttl_seconds: int = 3600,
    operation: str | None = None,
) -> Reservation:
    """Call ``create_reservation`` with test defaults.

    A falsy *addresses* (``None`` or an empty list) is replaced by a single
    default address.
    """
    return create_reservation(
        root,
        run_id=run_id,
        branch=branch,
        addresses=addresses or ["src/billing.py::compute_total"],
        ttl_seconds=ttl_seconds,
        operation=operation,
    )


def _reservation_path(root: pathlib.Path, reservation_id: str) -> pathlib.Path:
    """Return the path these tests expect for a reservation's JSON file."""
    return coordination_dir(root) / "reservations" / f"{reservation_id}.json"


def _expire_on_disk(root: pathlib.Path, res: Reservation, seconds_ago: int) -> None:
    """Move ``res.expires_at`` into the past and overwrite its file on disk."""
    res.expires_at = _past(seconds_ago)
    _reservation_path(root, res.reservation_id).write_text(
        json.dumps(res.to_dict(), indent=2) + "\n"
    )


# ---------------------------------------------------------------------------
# TestReserveUnit
# ---------------------------------------------------------------------------


class TestReserveUnit:
    """Unit tests that call the core coordination functions directly."""

    def test_create_reservation_roundtrip(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo)
        assert res.reservation_id
        assert res.reservation_id.startswith("sha256:")
        assert res.run_id == "agent-1"
        assert res.branch == "main"
        assert "src/billing.py::compute_total" in res.addresses
        assert res.expires_at > res.created_at

    def test_create_reservation_returns_reservation_instance(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo)
        assert isinstance(res, Reservation)

    def test_create_reservation_operation_stored(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo, operation="modify")
        assert res.operation == "modify"

    def test_create_reservation_operation_none(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo)
        assert res.operation is None

    def test_conflict_detection_via_active_reservations(self, repo: pathlib.Path) -> None:
        # Agent-1 reserves an address; agent-2 creates a different reservation
        # on the same address. active_reservations should return both.
        _make_reservation(repo, run_id="agent-1", addresses=["src/a.py::foo"])
        _make_reservation(repo, run_id="agent-2", addresses=["src/a.py::foo"])
        active = active_reservations(repo)
        run_ids = {r.run_id for r in active}
        assert "agent-1" in run_ids
        assert "agent-2" in run_ids

    def test_expired_reservation_not_in_active(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo, ttl_seconds=3600)
        # Push expires_at into the past and overwrite the file on disk with
        # the expired timestamp.
        _expire_on_disk(repo, res, seconds_ago=60)
        active = active_reservations(repo)
        assert all(r.reservation_id != res.reservation_id for r in active)

    def test_ttl_clamp_rejects_zero(self) -> None:
        with pytest.raises(ValueError, match="ttl"):
            clamp_int(0, 1, 31536000, "ttl")

    def test_ttl_clamp_rejects_negative(self) -> None:
        with pytest.raises(ValueError, match="ttl"):
            clamp_int(-1, 1, 31536000, "ttl")

    def test_ttl_clamp_rejects_above_max(self) -> None:
        with pytest.raises(ValueError, match="ttl"):
            clamp_int(31536001, 1, 31536000, "ttl")

    def test_ttl_clamp_accepts_boundary_values(self) -> None:
        assert clamp_int(1, 1, 31536000, "ttl") == 1
        assert clamp_int(31536000, 1, 31536000, "ttl") == 31536000

    def test_id_validation_rejects_non_content_id(self, repo: pathlib.Path) -> None:
        # add_dependencies validates each dep ID before file I/O.
        from muse.core.dag import add_dependencies

        res = _make_reservation(repo)
        with pytest.raises(ValueError):
            add_dependencies(repo, res.reservation_id, ["not-a-content-id"])

    def test_id_validation_rejects_path_traversal(self, repo: pathlib.Path) -> None:
        from muse.core.dag import add_dependencies

        res = _make_reservation(repo)
        with pytest.raises(ValueError):
            add_dependencies(repo, res.reservation_id, ["../../../etc/passwd"])


# ---------------------------------------------------------------------------
# TestReserveIntegration
# ---------------------------------------------------------------------------


class TestReserveIntegration:
    """End-to-end tests that drive ``coord reserve`` through the CLI runner."""

    def test_basic_reserve_success(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/billing.py::compute_total", "--run-id", "agent-1"],
        )
        assert r.exit_code == 0
        # Case-insensitive match; this also covers the capitalised "Reserved".
        assert "reserved" in r.output.lower()

    def test_success_message_contains_address_count(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/billing.py::compute_total", "--run-id", "agent-1"],
        )
        assert r.exit_code == 0
        assert "1 address" in r.output

    def test_multiple_addresses(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            [
                "coord",
                "reserve",
                "src/billing.py::compute_total",
                "src/billing.py::apply_discount",
                "--run-id",
                "agent-1",
            ],
        )
        assert r.exit_code == 0
        assert "2 address" in r.output

    def test_run_id_in_output(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/mod.py::foo", "--run-id", "pipeline-99"],
        )
        assert r.exit_code == 0
        assert "pipeline-99" in r.output

    def test_custom_ttl_accepted(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/mod.py::bar", "--run-id", "agent-1", "--ttl", "600"],
        )
        assert r.exit_code == 0

    def test_op_flag_shown_in_text(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            [
                "coord",
                "reserve",
                "src/mod.py::bar",
                "--run-id",
                "agent-1",
                "--op",
                "modify",
            ],
        )
        assert r.exit_code == 0
        assert "modify" in r.output.lower() or "Operation" in r.output

    def test_format_json_returns_valid_json(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            [
                "coord",
                "reserve",
                "src/billing.py::compute_total",
                "--run-id",
                "agent-1",
                "--json",
            ],
        )
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert isinstance(data, dict)

    def test_format_json_has_required_keys(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            [
                "coord",
                "reserve",
                "src/billing.py::compute_total",
                "--run-id",
                "agent-1",
                "--json",
            ],
        )
        assert r.exit_code == 0
        data = json.loads(r.output)
        missing = _REQUIRED_JSON_KEYS - data.keys()
        assert not missing, f"Missing JSON keys: {missing}"

    def test_json_shorthand_flag(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/mod.py::baz", "--run-id", "agent-1", "--json"],
        )
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert "reservation_id" in data

    def test_text_default_output(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            [
                "coord",
                "reserve",
                "src/mod.py::baz",
                "--run-id",
                "agent-1",
            ],
        )
        assert r.exit_code == 0
        # Without --json only the presence of the success wording is checked.
        assert "Reserved" in r.output or "Reservation" in r.output

    def test_conflict_warning_shown_in_text(self, repo: pathlib.Path) -> None:
        # Agent-other holds the address first.
        _make_reservation(repo, run_id="agent-other", addresses=["src/hot.py::fn"])
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/hot.py::fn", "--run-id", "agent-new"],
        )
        # Conflict reported but exit_code still 0.
        assert r.exit_code == 0
        # NOTE(review): the second clause likely also matches the plain
        # success message, so this may not strictly prove a warning was shown.
        assert "agent-other" in r.output or "reserved" in r.output.lower()

    def test_conflict_exits_zero(self, repo: pathlib.Path) -> None:
        _make_reservation(repo, run_id="agent-alpha", addresses=["src/c.py::g"])
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/c.py::g", "--run-id", "agent-beta"],
        )
        assert r.exit_code == 0

    def test_depends_on_single_valid_content_id(self, repo: pathlib.Path) -> None:
        dep_res = _make_reservation(repo, run_id="dep-agent")
        r = runner.invoke(
            cli,
            [
                "coord",
                "reserve",
                "src/mod.py::fn",
                "--run-id",
                "agent-1",
                "--depends-on",
                dep_res.reservation_id,
            ],
        )
        # Either succeeds (0) or fails with dep error (1) — both are valid.
        assert r.exit_code in (0, 1)

    def test_depends_on_multiple_flags(self, repo: pathlib.Path) -> None:
        dep1 = _make_reservation(repo, run_id="dep-1", addresses=["src/a.py::x"])
        dep2 = _make_reservation(repo, run_id="dep-2", addresses=["src/b.py::y"])
        r = runner.invoke(
            cli,
            [
                "coord",
                "reserve",
                "src/main.py::run",
                "--run-id",
                "orchestrator",
                "--depends-on",
                dep1.reservation_id,
                "--depends-on",
                dep2.reservation_id,
            ],
        )
        assert r.exit_code in (0, 1)

    def test_no_run_id_defaults_to_unknown(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/mod.py::fn", "--json"],
        )
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert data["run_id"] == "unknown"

    def test_missing_repo_exits_nonzero(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        # Point MUSE_REPO_ROOT at a directory with no .muse.
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/mod.py::fn", "--run-id", "agent-1"],
        )
        assert r.exit_code != 0

    def test_json_reservation_id_is_content_addressed(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/mod.py::fn", "--run-id", "agent-1", "--json"],
        )
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert data["reservation_id"].startswith("sha256:")

    def test_json_addresses_matches_input(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            [
                "coord",
                "reserve",
                "src/billing.py::compute_total",
                "src/billing.py::apply_discount",
                "--run-id",
                "agent-1",
                "--json",
            ],
        )
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert set(data["addresses"]) == {
            "src/billing.py::compute_total",
            "src/billing.py::apply_discount",
        }

    def test_json_conflicts_list_on_conflict(self, repo: pathlib.Path) -> None:
        _make_reservation(repo, run_id="blocker", addresses=["src/x.py::fn"])
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/x.py::fn", "--run-id", "challenger", "--json"],
        )
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert isinstance(data["conflicts"], list)
        assert len(data["conflicts"]) >= 1

    def test_json_no_conflicts_empty_list(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/unique.py::fn", "--run-id", "agent-1", "--json"],
        )
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert data["conflicts"] == []


# ---------------------------------------------------------------------------
# TestReserveSecurity
# ---------------------------------------------------------------------------


class TestReserveSecurity:
    """Hostile-input tests: traversal strings, control characters, bad IDs."""

    def test_path_traversal_in_address_stored_safely(self, repo: pathlib.Path) -> None:
        # The address is stored verbatim in the JSON file — it must not
        # cause the reservation file to be written outside .muse/coordination/.
        traversal_addr = "../../etc/passwd::malicious"
        r = runner.invoke(
            cli,
            ["coord", "reserve", traversal_addr, "--run-id", "attacker"],
        )
        # Command may succeed or fail, but must not write outside the repo.
        coord_dir = coordination_dir(repo)
        malicious_path = repo / "etc" / "passwd"
        assert not malicious_path.exists()
        # If it succeeded, the address should appear in the stored reservation.
        if r.exit_code == 0:
            res_files = list((coord_dir / "reservations").glob("*.json"))
            assert res_files, "Expected at least one reservation file"
            stored = load_json_file(res_files[-1])
            assert traversal_addr in stored.get("addresses", [])

    def test_null_byte_in_run_id_stored_verbatim(self, repo: pathlib.Path) -> None:
        # A null byte in run_id must not crash the command.
        null_run_id = "agent\x001"
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/mod.py::fn", "--run-id", null_run_id, "--json"],
        )
        if r.exit_code == 0:
            data = json.loads(r.output)
            assert data["run_id"] == null_run_id

    def test_ansi_in_run_id_stored_verbatim_in_json(self, repo: pathlib.Path) -> None:
        ansi_run_id = "\x1b[31mred-agent\x1b[0m"
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/mod.py::fn", "--run-id", ansi_run_id, "--json"],
        )
        if r.exit_code == 0:
            data = json.loads(r.output)
            assert data["run_id"] == ansi_run_id

    def test_invalid_id_in_depends_on_rejected(self, repo: pathlib.Path) -> None:
        # A non-content-ID value must be rejected and the command must exit non-zero.
        r = runner.invoke(
            cli,
            [
                "coord",
                "reserve",
                "src/mod.py::fn",
                "--run-id",
                "agent-1",
                "--depends-on",
                "not-a-content-id-at-all",
            ],
        )
        assert r.exit_code != 0

    def test_depends_on_path_traversal_rejected(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            [
                "coord",
                "reserve",
                "src/mod.py::fn",
                "--run-id",
                "agent-1",
                "--depends-on",
                "../../../etc/shadow",
            ],
        )
        assert r.exit_code != 0

    def test_depends_on_validates_id_before_file_io(self, repo: pathlib.Path) -> None:
        """An invalid ``--depends-on`` ID must leave the dependencies dir untouched."""
        dag_dir = coordination_dir(repo) / "dependencies"

        def dependency_files() -> set[pathlib.Path]:
            # The directory may legitimately not exist; only files count.
            if not dag_dir.exists():
                return set()
            return {p for p in dag_dir.rglob("*") if p.is_file()}

        before = dependency_files()
        r = runner.invoke(
            cli,
            [
                "coord",
                "reserve",
                "src/mod.py::fn",
                "--run-id",
                "agent-1",
                "--depends-on",
                "INVALID",
            ],
        )
        assert r.exit_code != 0
        after = dependency_files()
        assert after == before, "Dependency file written despite invalid --depends-on ID"

    def test_self_dependency_rejected(self, repo: pathlib.Path) -> None:
        # --depends-on with the reservation's own ID should produce an error.
        # We cannot know the new ID ahead of time, but we can create one first
        # and then attempt to set up a self-loop via add_dependencies directly.
        from muse.core.dag import add_dependencies

        res = _make_reservation(repo)
        with pytest.raises(ValueError, match="itself"):
            add_dependencies(repo, res.reservation_id, [res.reservation_id])


# ---------------------------------------------------------------------------
# TestReserveStress
# ---------------------------------------------------------------------------


@pytest.mark.slow
class TestReserveStress:
    """Timing-budget tests; marked ``slow``."""

    def test_50_addresses_in_one_reservation(self, repo: pathlib.Path) -> None:
        addresses = [f"src/module_{i}.py::fn_{i}" for i in range(50)]
        start = time.monotonic()
        res = create_reservation(repo, "stress-agent", "main", addresses, ttl_seconds=3600)
        elapsed = time.monotonic() - start
        assert res.reservation_id
        assert len(res.addresses) == 50
        assert elapsed < 2.0, f"50-address reservation took {elapsed:.2f}s"

    def test_50_addresses_cli_json(self, repo: pathlib.Path) -> None:
        addresses = [f"src/file_{i}.py::symbol_{i}" for i in range(50)]
        args = ["coord", "reserve"] + addresses + ["--run-id", "bulk-agent", "--json"]
        r = runner.invoke(cli, args)
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert len(data["addresses"]) == 50

    def test_200_reservations_created_under_3s(self, repo: pathlib.Path) -> None:
        start = time.monotonic()
        for i in range(200):
            create_reservation(
                repo,
                run_id=f"agent-{i}",
                branch="main",
                addresses=[f"src/file_{i}.py::fn"],
                ttl_seconds=3600,
            )
        elapsed = time.monotonic() - start
        assert elapsed < 3.0, f"200 reservations took {elapsed:.2f}s"

    def test_active_reservations_filters_expired_under_1s(self, repo: pathlib.Path) -> None:
        # Create 100 active and 100 expired reservations.
        for i in range(100):
            create_reservation(
                repo,
                run_id=f"active-{i}",
                branch="main",
                addresses=[f"src/active_{i}.py::fn"],
                ttl_seconds=3600,
            )
        for i in range(100):
            res = create_reservation(
                repo,
                run_id=f"expired-{i}",
                branch="main",
                addresses=[f"src/expired_{i}.py::fn"],
                ttl_seconds=3600,
            )
            _expire_on_disk(repo, res, seconds_ago=3600)

        start = time.monotonic()
        active = active_reservations(repo)
        elapsed = time.monotonic() - start

        assert elapsed < 1.0, f"active_reservations over 200 records took {elapsed:.2f}s"
        # All active agents appear; no expired agent appears. Set equality
        # also fails on an empty result, which an all() check would accept.
        expected_run_ids = {f"active-{i}" for i in range(100)}
        active_run_ids = {r.run_id for r in active}
        missing = expected_run_ids - active_run_ids
        unexpected = active_run_ids - expected_run_ids
        assert active_run_ids == expected_run_ids, (
            f"Missing active entries: {missing}; unexpected entries: {unexpected}"
        )


# ---------------------------------------------------------------------------
# TestReserveUnitExtended — additional unit coverage for core functions
# ---------------------------------------------------------------------------


class TestReserveUnitExtended:
    """Unit tests for core layer behaviour not covered by the base unit class."""

    def test_from_dict_missing_reservation_id_defaults_empty(self) -> None:
        d: MsgpackDict = {
            "run_id": "a",
            "branch": "main",
            "addresses": [],
            "created_at": "2026-01-01T00:00:00+00:00",
            "expires_at": "2026-01-01T01:00:00+00:00",
        }
        res = Reservation.from_dict(d)
        assert res.reservation_id == ""

    def test_from_dict_missing_timestamps_fall_back_to_now(self) -> None:
        d: MsgpackDict = {
            "reservation_id": fake_id("reserve-from-dict-1"),
            "run_id": "a",
            "branch": "main",
            "addresses": [],
        }
        before = datetime.datetime.now(datetime.timezone.utc)
        res = Reservation.from_dict(d)
        after = datetime.datetime.now(datetime.timezone.utc)
        assert before <= res.created_at <= after
        assert before <= res.expires_at <= after

    def test_from_dict_addresses_non_list_becomes_empty(self) -> None:
        d: MsgpackDict = {
            "reservation_id": fake_id("reserve-from-dict-2"),
            "run_id": "a",
            "branch": "main",
            "addresses": "not-a-list",
            "created_at": "2026-01-01T00:00:00+00:00",
            "expires_at": "2026-01-01T01:00:00+00:00",
        }
        res = Reservation.from_dict(d)
        assert res.addresses == []

    def test_from_dict_operation_none_preserved(self) -> None:
        d: MsgpackDict = {
            "reservation_id": fake_id("reserve-from-dict-3"),
            "run_id": "a",
            "branch": "main",
            "addresses": [],
            "created_at": "2026-01-01T00:00:00+00:00",
            "expires_at": "2026-01-01T01:00:00+00:00",
            "operation": None,
        }
        res = Reservation.from_dict(d)
        assert res.operation is None

    def test_from_dict_operation_string_preserved(self) -> None:
        d: MsgpackDict = {
            "reservation_id": fake_id("reserve-from-dict-4"),
            "run_id": "a",
            "branch": "main",
            "addresses": [],
            "created_at": "2026-01-01T00:00:00+00:00",
            "expires_at": "2026-01-01T01:00:00+00:00",
            "operation": "rename",
        }
        res = Reservation.from_dict(d)
        assert res.operation == "rename"

    def test_to_dict_roundtrip(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo, operation="modify")
        d = res.to_dict()
        res2 = Reservation.from_dict(d)
        assert res2.reservation_id == res.reservation_id
        assert res2.run_id == res.run_id
        assert res2.branch == res.branch
        assert res2.addresses == res.addresses
        assert res2.operation == res.operation

    def test_reservation_file_written_to_correct_path(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo)
        expected = _reservation_path(repo, res.reservation_id)
        assert expected.exists(), f"Reservation file not found at {expected}"

    def test_reservation_file_is_valid_json(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo)
        path = _reservation_path(repo, res.reservation_id)
        data = load_json_file(path)
        assert data["reservation_id"] == res.reservation_id

    def test_reservation_file_not_temp_file(self, repo: pathlib.Path) -> None:
        """write_text_atomic must clean up its temp file on success."""
        _make_reservation(repo)
        res_dir = coordination_dir(repo) / "reservations"
        tmp_files = list(res_dir.glob(".muse-tmp-*"))
        assert tmp_files == [], f"Stale temp files found: {tmp_files}"

    def test_load_all_reservations_empty_dir_returns_empty(self, repo: pathlib.Path) -> None:
        # Ensure coord dirs exist but are empty.
        from muse.core.coordination import _ensure_coord_dirs

        _ensure_coord_dirs(repo)
        result = load_all_reservations(repo)
        assert result == []

    def test_load_all_reservations_absent_dir_returns_empty(self, repo: pathlib.Path) -> None:
        result = load_all_reservations(repo)
        assert result == []

    def test_load_all_reservations_skips_corrupt_file(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo)
        # Corrupt the file.
        path = _reservation_path(repo, res.reservation_id)
        path.write_text("this is not json {{{{")
        # A second valid reservation must still be loaded.
        res2 = _make_reservation(repo, run_id="agent-2")
        loaded = load_all_reservations(repo)
        loaded_ids = {r.reservation_id for r in loaded}
        assert res2.reservation_id in loaded_ids
        assert res.reservation_id not in loaded_ids

    def test_load_all_reservations_includes_expired(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo)
        # Manually expire it.
        path = _reservation_path(repo, res.reservation_id)
        data = load_json_file(path)
        data["expires_at"] = "2000-01-01T00:00:00+00:00"
        path.write_text(json.dumps(data))
        loaded = load_all_reservations(repo)
        loaded_ids = {r.reservation_id for r in loaded}
        assert res.reservation_id in loaded_ids  # load_all includes expired

    def test_ttl_remaining_seconds_positive_when_active(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo, ttl_seconds=3600)
        assert res.ttl_remaining_seconds() > 0

    def test_ttl_remaining_seconds_negative_when_expired(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo, ttl_seconds=3600)
        res.expires_at = _past(60)
        assert res.ttl_remaining_seconds() < 0

    def test_is_active_true_when_not_expired(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo, ttl_seconds=3600)
        assert res.is_active() is True

    def test_is_active_false_when_expired(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo, ttl_seconds=3600)
        res.expires_at = _past(60)
        assert res.is_active() is False


# ---------------------------------------------------------------------------
# TestReserveInputValidation — new CLI-level validation paths
# ---------------------------------------------------------------------------


class TestReserveInputValidation:
    """Tests for the validation guards added in the hardening pass."""

    def test_ttl_zero_exits_nonzero(self, repo: pathlib.Path) -> None:
        r = runner.invoke(cli, ["coord", "reserve", "src/a.py::fn", "--ttl", "0"])
        assert r.exit_code != 0

    def test_ttl_zero_error_to_stderr(self, repo: pathlib.Path) -> None:
        r = runner.invoke(cli, ["coord", "reserve", "src/a.py::fn", "--ttl", "0"])
        # Fall back to the combined output when stderr is empty.
        err = r.stderr or r.output
        assert "ttl" in err.lower() or "invalid" in err.lower()

    def test_ttl_negative_exits_nonzero(self, repo: pathlib.Path) -> None:
        r = runner.invoke(cli, ["coord", "reserve", "src/a.py::fn", "--ttl", "-1"])
        assert r.exit_code != 0

    def test_ttl_above_max_exits_nonzero(self, repo: pathlib.Path) -> None:
        r = runner.invoke(cli, ["coord", "reserve", "src/a.py::fn", "--ttl", "99999999"])
        assert r.exit_code != 0

    def test_ttl_max_value_accepted(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/a.py::fn", "--ttl", "31536000", "--json"],
        )
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert data["reservation_id"]

    def test_ttl_min_value_accepted(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/a.py::fn", "--ttl", "1", "--json"],
        )
        assert r.exit_code == 0

    def test_run_id_at_max_length_accepted(self, repo: pathlib.Path) -> None:
        run_id = "x" * _MAX_RUN_ID_LEN
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/a.py::fn", "--run-id", run_id, "--json"],
        )
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert data["run_id"] == run_id

    def test_run_id_over_max_length_exits_nonzero(self, repo: pathlib.Path) -> None:
        run_id = "x" * (_MAX_RUN_ID_LEN + 1)
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/a.py::fn", "--run-id", run_id],
        )
        assert r.exit_code != 0

    def test_run_id_over_max_length_error_to_stderr(self, repo: pathlib.Path) -> None:
        run_id = "x" * (_MAX_RUN_ID_LEN + 1)
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/a.py::fn", "--run-id", run_id],
        )
        err = r.stderr or r.output
        assert "run-id" in err.lower() or "too long" in err.lower()

    def test_op_invalid_value_rejected_by_argparse(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/a.py::fn", "--op", "obliterate"],
        )
        assert r.exit_code != 0

    def test_op_rename_accepted(self, repo: pathlib.Path) -> None:
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/a.py::fn", "--op", "rename", "--json"],
        )
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert data["operation"] == "rename"

    def test_op_all_valid_values_accepted(self, repo: pathlib.Path) -> None:
        for op in sorted(_VALID_OPS):
            r = runner.invoke(
                cli,
                ["coord", "reserve", f"src/{op}.py::fn", "--op", op, "--json"],
            )
            assert r.exit_code == 0, f"--op {op!r} unexpectedly rejected"

    def test_address_count_at_limit_accepted(self, repo: pathlib.Path) -> None:
        addresses = [f"src/m{i}.py::fn" for i in range(_MAX_ADDRESSES)]
        args = ["coord", "reserve"] + addresses + ["--json"]
        r = runner.invoke(cli, args)
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert len(data["addresses"]) == _MAX_ADDRESSES

    def test_address_count_over_limit_exits_nonzero(self, repo: pathlib.Path) -> None:
        addresses = [f"src/m{i}.py::fn" for i in range(_MAX_ADDRESSES + 1)]
        args = ["coord", "reserve"] + addresses
        r = runner.invoke(cli, args)
        assert r.exit_code != 0

    def test_address_count_over_limit_error_to_stderr(self, repo: pathlib.Path) -> None:
        addresses = [f"src/m{i}.py::fn" for i in range(_MAX_ADDRESSES + 1)]
        args = ["coord", "reserve"] + addresses
        r = runner.invoke(cli, args)
        err = r.stderr or r.output
        assert "address" in err.lower() or "too many" in err.lower()

    def test_ttl_error_no_file_written(self, repo: pathlib.Path) -> None:
        """A bad --ttl must not create any reservation file."""
        from muse.core.coordination import _ensure_coord_dirs

        _ensure_coord_dirs(repo)
        res_dir = coordination_dir(repo) / "reservations"
        before = set(res_dir.glob("*.json"))
        runner.invoke(cli, ["coord", "reserve", "src/a.py::fn", "--ttl", "0"])
        after = set(res_dir.glob("*.json"))
        assert after == before, "Reservation file written despite bad --ttl"

    def test_run_id_oversize_no_file_written(self, repo: pathlib.Path) -> None:
        from muse.core.coordination import _ensure_coord_dirs

        _ensure_coord_dirs(repo)
        res_dir = coordination_dir(repo) / "reservations"
        before = set(res_dir.glob("*.json"))
        runner.invoke(
            cli,
            ["coord", "reserve", "src/a.py::fn", "--run-id", "x" * 10000],
        )
        after = set(res_dir.glob("*.json"))
        assert after == before, "Reservation file written despite oversize --run-id"


# ---------------------------------------------------------------------------
# TestReserveSecurityExtended — additional security invariants
# ---------------------------------------------------------------------------


class TestReserveSecurityExtended:
    """Security invariants added in the hardening pass."""

    def test_very_long_address_stored_verbatim(self, repo: pathlib.Path) -> None:
        long_addr = f"src/{'a' * 4096}.py::fn"
        r = runner.invoke(
            cli,
            ["coord", "reserve", long_addr, "--json"],
        )
        if r.exit_code == 0:
            data = json.loads(r.output)
            assert long_addr in data["addresses"]

    def test_unicode_address_roundtrips(self, repo: pathlib.Path) -> None:
        addr = "src/模块.py::函数"
        r = runner.invoke(cli, ["coord", "reserve", addr, "--json"])
        if r.exit_code == 0:
            data = json.loads(r.output)
            assert addr in data["addresses"]

    def test_cycle_detected_exits_nonzero(self, repo: pathlib.Path) -> None:
        # A → B, B → A is a cycle.
        res_a = _make_reservation(repo, run_id="a", addresses=["src/a.py::fn"])
        res_b = _make_reservation(repo, run_id="b", addresses=["src/b.py::fn"])
        # Reserve a third address (run "c") that depends on B through the CLI.
        # The reservation itself is advisory — it may succeed — so the result
        # is deliberately not inspected.
        runner.invoke(
            cli,
            [
                "coord",
                "reserve",
                "src/c.py::fn",
                "--run-id",
                "c",
                "--depends-on",
                res_b.reservation_id,
            ],
        )
        # We can't reproduce a CLI-level cycle easily without knowing res_c's ID,
        # so we test the DAG layer directly.
        from muse.core.dag import add_dependencies

        # a depends on b
        try:
            add_dependencies(repo, res_a.reservation_id, [res_b.reservation_id])
        except (ValueError, FileExistsError):
            pass  # Already exists or cycle — either is fine
        # b depends on a → cycle
        with pytest.raises(ValueError, match="cycle"):
            add_dependencies(repo, res_b.reservation_id, [res_a.reservation_id])

    def test_concurrent_writes_produce_separate_files(self, repo: pathlib.Path) -> None:
        """Two threads writing reservations simultaneously must not collide."""
        errors: list[str] = []
        ids: list[str] = []

        def write_one(i: int) -> None:
            # Collect failures instead of raising: an exception inside a
            # worker thread would not fail the test on its own.
            try:
                res = create_reservation(
                    repo, f"agent-{i}", "main", [f"src/f{i}.py::fn"], 3600
                )
                ids.append(res.reservation_id)
            except Exception as exc:
                errors.append(str(exc))

        threads = [threading.Thread(target=write_one, args=(i,)) for i in range(20)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert not errors, f"Errors in concurrent writes: {errors}"
        assert len(set(ids)) == 20, "reservation_id collision detected"
        res_dir = coordination_dir(repo) / "reservations"
        files = list(res_dir.glob("*.json"))
        assert len(files) == 20

    def test_dep_error_exits_user_error_not_raw_1(self, repo: pathlib.Path) -> None:
        """Dependency error must use ExitCode.USER_ERROR, not raw sys.exit(1)."""
        from muse.core.errors import ExitCode

        r = runner.invoke(
            cli,
            [
                "coord",
                "reserve",
                "src/a.py::fn",
                "--run-id",
                "agent",
                "--depends-on",
                "not-a-content-id",
            ],
        )
        assert r.exit_code == ExitCode.USER_ERROR

    def test_json_output_no_pretty_indent(self, repo: pathlib.Path) -> None:
        """JSON output must be compact (no indent=2 multiline bloat)."""
        r = runner.invoke(
            cli,
            ["coord", "reserve", "src/a.py::fn", "--run-id", "a", "--json"],
        )
        assert r.exit_code == 0
        # Compact JSON fits on one line; pretty-printed JSON has line breaks.
        assert "\n" not in r.output.strip(), (
            "JSON output is pretty-printed (has newlines); expected compact output"
        )

    def test_conflict_detection_same_run_id_not_reported(self, repo: pathlib.Path) -> None:
        """An agent re-reserving its own address must not self-report a conflict."""
        _make_reservation(repo, run_id="agent-self", addresses=["src/a.py::fn"])
        r = runner.invoke(
            cli,
            [
                "coord",
                "reserve",
                "src/a.py::fn",
                "--run-id",
                "agent-self",
                "--json",
            ],
        )
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert data["conflicts"] == []

    def test_reservation_file_not_world_writable(self, repo: pathlib.Path) -> None:
        """Reservation files should not be group/other writable (mode 0o644 max)."""
        import stat

        res = _make_reservation(repo)
        path = _reservation_path(repo, res.reservation_id)
        mode = path.stat().st_mode
        assert not (mode & stat.S_IWGRP), "Group-write bit set on reservation file"
        assert not (mode & stat.S_IWOTH), "Other-write bit set on reservation file"


# ---------------------------------------------------------------------------
# TestReserveStressExtended — additional performance invariants
# ---------------------------------------------------------------------------


@pytest.mark.slow
class TestReserveStressExtended:
    def test_1000_address_reservation_under_1s(self, repo: pathlib.Path) -> None:
        """Creating the max-address reservation must be fast."""
        addresses = [f"src/m{i}.py::fn" for i in range(_MAX_ADDRESSES)]
        start = time.monotonic()
        res = create_reservation(repo, "bulk-agent", "main", addresses, 3600)
        elapsed = time.monotonic() - start
        assert len(res.addresses) == _MAX_ADDRESSES
        assert elapsed < 1.0, f"1000-address reservation took {elapsed:.2f}s"

    def test_conflict_check_linear_not_quadratic(self, repo: pathlib.Path) -> None:
        """Conflict detection must not be O(addresses × reservations)."""
        # 500 active reservations each covering a unique address.
for i in range(500): create_reservation( repo, f"agent-{i}", "main", [f"src/file_{i}.py::fn"], 3600 ) # Reserve 10 addresses against those 500 active reservations. addresses = [f"src/new_{i}.py::fn" for i in range(10)] args = ["coord", "reserve"] + addresses + ["--run-id", "perf-agent", "--json"] start = time.monotonic() r = runner.invoke(cli, args) elapsed = time.monotonic() - start assert r.exit_code == 0 assert elapsed < 2.0, ( f"Conflict check over 500 reservations + 10 addresses took {elapsed:.2f}s" ) def test_400_reservations_active_query_under_2s(self, repo: pathlib.Path) -> None: for i in range(400): create_reservation( repo, f"agent-{i}", "main", [f"src/f{i}.py::fn"], 3600 ) start = time.monotonic() active = active_reservations(repo) elapsed = time.monotonic() - start assert len(active) == 400 assert elapsed < 2.0, f"active_reservations over 400 records took {elapsed:.2f}s" def test_json_output_parseable_for_100_cli_calls(self, repo: pathlib.Path) -> None: """100 sequential CLI reserve calls must all produce parseable JSON.""" errors = [] for i in range(100): r = runner.invoke( cli, [ "coord", "reserve", f"src/x{i}.py::fn", "--run-id", f"agent-{i}", "--json", ], ) if r.exit_code != 0: errors.append(f"call {i}: exit {r.exit_code}") continue try: data = json.loads(r.output) if "reservation_id" not in data: errors.append(f"call {i}: missing reservation_id") except json.JSONDecodeError as exc: errors.append(f"call {i}: {exc}") assert not errors, "\n".join(errors) class TestRegisterFlags: def test_default_json_out_is_false(self) -> None: import argparse from muse.cli.commands.reserve import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["reserve", "billing.py::compute_total"]) assert args.json_out is False def test_json_flag_sets_json_out(self) -> None: import argparse from muse.cli.commands.reserve import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = 
p.parse_args(["reserve", "billing.py::compute_total", "--json"]) assert args.json_out is True def test_j_shorthand_sets_json_out(self) -> None: import argparse from muse.cli.commands.reserve import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["reserve", "billing.py::compute_total", "-j"]) assert args.json_out is True