"""Tests for ``muse coord watch`` (watch_coord.py). Coverage matrix --------------- Unit TestWatchEvent — to_dict() schema, all fields present TestScanDirs — empty dir, missing dir, multiple kinds, TOCTOU safety TestDiffSnapshots — no change, added, removed, modified, cross-kind TestPassesFilters — kind/run_id/branch filters, AND semantics, empty data TestEmitEventJson — JSON mode produces valid NDJSON, all event types TestEmitEventText — text mode, all kinds, missing data, ANSI stripped TestCheckExpirations — expired emitted, active not, removed not double-counted TestMakeEvent — timestamp is UTC ISO 8601 Integration TestWatchLoopOnce — --once emits snapshot for all existing records TestWatchLoopFilters — kind/run_id/branch filters in loop TestWatchLoopDetectsAdded — new reservation appears → added event TestWatchLoopDetectsModified — heartbeat updated → modified event TestWatchLoopDetectsRemoved — file deleted → removed event (with cached data) TestWatchLoopDetectsExpiry — reservation expires → expired event (no file change) TestWatchLoopTimeout — loop exits after timeout seconds TestWatchLoopJsonOutput — all events are valid NDJSON in json mode TestWatchLoopMaxEvents — loop exits after --max-events events emitted End-to-end TestRunCommand — run() with --once via mock require_repo TestRunCommandValidation — run() validation: run-id/poll-interval/timeout/max-events TestBackendSelection — kqueue backend on macOS, polling on other TestSignalHandling — SIGTERM triggers clean exit Security TestAnsiInjectionSanitized — ANSI escapes in run_id/branch stripped from text TestSymlinkDirRejected — kqueue raises ValueError on symlinked coord dir TestKindFilterAllowlist — invalid kind strings rejected by argparse Stress TestStress500RapidAdds — 500 reservations added → all 500 added events TestStressLargeSnapshot — 1000 existing records → snapshot < 1 s TestStressDiffPerformance — diff of 2×1000-entry snapshots < 50 ms TestStressManyExpirations — 200 expiring reservations → 
all expired events < 2 s TestStressMaxEvents — max_events cap respected under 500-record load """ from __future__ import annotations import argparse import datetime import io import itertools import json import pathlib import tempfile import time from collections.abc import Callable from contextlib import redirect_stdout from unittest.mock import MagicMock, patch import pytest from muse.core.types import Manifest, MsgpackDict, MsgpackValue, content_hash, fake_id, now_utc_iso from muse.core.paths import muse_dir _id_seq = itertools.count() def _new_id() -> str: return content_hash({"seq": next(_id_seq)}) # Module under test. from muse.cli.commands.watch_coord import ( WatchEvent, _Backend, _KqueueBackend, _MAX_RUN_ID_LEN, _MIN_POLL_INTERVAL, _MAX_POLL_INTERVAL, _PollingBackend, _Snapshot, _check_expirations, _coord_dir, _diff_snapshots, _emit_event, _ensure_coord_dirs, _load_record, _make_event, _passes_filters, _scan_dirs, _watch_loop, ) from muse.core.coordination import ( Reservation, create_heartbeat, create_intent, create_reservation, ) from muse.core.errors import ExitCode # ───────────────────────────────────────────────────────────────────────────── # Fixtures # ───────────────────────────────────────────────────────────────────────────── @pytest.fixture def repo(tmp_path: pathlib.Path) -> pathlib.Path: """Minimal muse repository with coord dirs created.""" dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "repo.json").write_text( json.dumps({"repo_id": fake_id("watch-coord-repo"), "name": "test-repo"}) ) _ensure_coord_dirs(tmp_path) return tmp_path class _ImmediateBackend(_Backend): """Test backend that fires a side-effect once then sleeps so the timeout fires. Design: ``wait()`` calls the side_effect on the first invocation (so files can be written between the snapshot and the next scan), then sleeps for ``timeout`` on all subsequent calls. 
This means the outer ``_watch_loop`` will do exactly one meaningful diff iteration (the one that sees the side-effect's changes) and then block until the timeout deadline expires. Always pass ``timeout`` (a finite deadline) when using ``once=False``. """ name = "immediate" def __init__( self, *, side_effect: Callable[[], None] | None = None, ) -> None: self._side_effect = side_effect self._fired = False def wait(self, timeout: float) -> bool: if not self._fired: if self._side_effect: self._side_effect() self._fired = True return True # Signal: something may have changed. # Already fired — sleep so the loop deadline can expire. time.sleep(max(0.0, timeout)) return False def close(self) -> None: pass def _make_reservation(repo: pathlib.Path, **kwargs: MsgpackValue) -> Reservation: """Create a reservation with sensible defaults.""" return create_reservation( repo, run_id=kwargs.get("run_id", "agent-1"), branch=kwargs.get("branch", "main"), addresses=kwargs.get("addresses", ["src/mod.py::fn"]), ttl_seconds=kwargs.get("ttl_seconds", 3600), operation=kwargs.get("operation", "modify"), ) def _run_loop( repo: pathlib.Path, *, side_effect: "callable | None" = None, kind_filter: str | None = None, run_id_filter: str | None = None, branch_filter: str | None = None, as_json: bool = True, once: bool = True, # When once=False, a finite timeout is required so the loop exits. # Default 1.0 s is enough for one side-effect iteration. timeout: float | None = None, poll_interval: float = 0.05, max_events: int | None = None, ) -> list[dict]: """Run _watch_loop with an ImmediateBackend; return parsed events. For ``once=False`` tests, pass ``timeout`` explicitly (e.g. ``timeout=1.0``). The backend fires the side_effect on the first wait, then sleeps until the deadline, giving the loop exactly one diff iteration to detect changes. """ # Enforce finite timeout for non-once mode to prevent hangs. 
if not once and timeout is None: timeout = 1.0 buf = io.StringIO() backend = _ImmediateBackend(side_effect=side_effect) with redirect_stdout(buf): _watch_loop( repo, backend, kind_filter=kind_filter, run_id_filter=run_id_filter, branch_filter=branch_filter, as_json=as_json, once=once, timeout=timeout, poll_interval=poll_interval, max_events=max_events, ) lines = [l for l in buf.getvalue().splitlines() if l.strip()] return [json.loads(line) for line in lines] # ───────────────────────────────────────────────────────────────────────────── # Unit — WatchEvent # ───────────────────────────────────────────────────────────────────────────── class TestWatchEvent: def test_to_dict_has_all_fields(self) -> None: ev = WatchEvent("added", "reservation", "uid-1", "2026-01-01T00:00:00+00:00", {}) d = ev.to_dict() assert set(d.keys()) == { "schema_version", "event_type", "kind", "id", "timestamp", "data" } def test_to_dict_values_round_trip(self) -> None: payload = {"run_id": "a", "branch": "main"} ev = WatchEvent("modified", "heartbeat", "uid-2", "2026-01-01T00:00:00+00:00", payload) d = ev.to_dict() assert d["event_type"] == "modified" assert d["kind"] == "heartbeat" assert d["id"] == "uid-2" assert d["data"] == payload def test_to_dict_schema_version_is_string(self) -> None: ev = WatchEvent("snapshot", "intent", "uid-3", "ts", {}) assert isinstance(ev.to_dict()["schema_version"], str) # ───────────────────────────────────────────────────────────────────────────── # Unit — _scan_dirs # ───────────────────────────────────────────────────────────────────────────── class TestScanDirs: def test_empty_coord_dirs_returns_empty_dicts(self, repo: pathlib.Path) -> None: snap = _scan_dirs(repo) for sub in ("reservations", "intents", "releases", "heartbeats"): assert snap[sub] == {} def test_missing_subdir_returns_empty(self, tmp_path: pathlib.Path) -> None: # No .muse/ dir at all. 
snap = _scan_dirs(tmp_path) for sub in ("reservations", "intents", "releases", "heartbeats"): assert snap[sub] == {} def test_file_appears_in_snapshot(self, repo: pathlib.Path) -> None: path = _coord_dir(repo) / "reservations" / "abc123.json" path.write_text('{"x": 1}') snap = _scan_dirs(repo) assert "abc123" in snap["reservations"] def test_snapshot_entry_is_mtime_ns_and_size(self, repo: pathlib.Path) -> None: path = _coord_dir(repo) / "reservations" / "abc123.json" path.write_text('{"x": 1}') snap = _scan_dirs(repo) mtime_ns, size = snap["reservations"]["abc123"] st = path.stat() assert mtime_ns == st.st_mtime_ns assert size == st.st_size def test_non_json_files_ignored(self, repo: pathlib.Path) -> None: (_coord_dir(repo) / "reservations" / "not-a-json.txt").write_text("hi") snap = _scan_dirs(repo) assert snap["reservations"] == {} def test_multiple_kinds_all_scanned(self, repo: pathlib.Path) -> None: (_coord_dir(repo) / "reservations" / "r1.json").write_text("{}") (_coord_dir(repo) / "intents" / "i1.json").write_text("{}") snap = _scan_dirs(repo) assert "r1" in snap["reservations"] assert "i1" in snap["intents"] def test_toctou_deleted_file_skipped(self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: """File that disappears between glob and stat is silently skipped.""" real_glob = pathlib.Path.glob def patched_glob(self: pathlib.Path, pattern: str) -> list[pathlib.Path]: results = list(real_glob(self, pattern)) # Inject a ghost path that does not exist. 
ghost = self / "ghost.json" return results + [ghost] monkeypatch.setattr(pathlib.Path, "glob", patched_glob) snap = _scan_dirs(repo) assert "ghost" not in snap["reservations"] # ───────────────────────────────────────────────────────────────────────────── # Unit — _diff_snapshots # ───────────────────────────────────────────────────────────────────────────── class TestDiffSnapshots: def _empty(self) -> _Snapshot: return {sub: {} for sub in ("reservations", "intents", "releases", "heartbeats")} def test_no_change_no_events(self) -> None: snap = self._empty() snap["reservations"]["uid1"] = (100, 50) assert _diff_snapshots(snap, snap) == [] def test_added_detected(self) -> None: old = self._empty() new = {**old, "reservations": {"uid1": (100, 50)}} events = _diff_snapshots(old, new) assert ("added", "reservations", "uid1") in events def test_removed_detected(self) -> None: old = {**self._empty(), "reservations": {"uid1": (100, 50)}} new = self._empty() events = _diff_snapshots(old, new) assert ("removed", "reservations", "uid1") in events def test_modified_detected(self) -> None: snap = {**self._empty(), "reservations": {"uid1": (100, 50)}} new_snap = {**self._empty(), "reservations": {"uid1": (200, 51)}} events = _diff_snapshots(snap, new_snap) assert ("modified", "reservations", "uid1") in events def test_same_mtime_different_size_is_modified(self) -> None: snap = {**self._empty(), "reservations": {"uid1": (100, 50)}} new_snap = {**self._empty(), "reservations": {"uid1": (100, 99)}} events = _diff_snapshots(snap, new_snap) assert ("modified", "reservations", "uid1") in events def test_unchanged_mtime_and_size_is_not_modified(self) -> None: snap = {**self._empty(), "reservations": {"uid1": (100, 50)}} events = _diff_snapshots(snap, snap) assert not any(e[0] == "modified" for e in events) def test_multiple_kinds_in_one_diff(self) -> None: old = {**self._empty(), "reservations": {"r1": (1, 10)}} new = { "reservations": {}, "intents": {"i1": (2, 20)}, "releases": {}, 
"heartbeats": {}, } events = _diff_snapshots(old, new) event_types = {(e[0], e[2]) for e in events} assert ("removed", "r1") in event_types assert ("added", "i1") in event_types def test_output_is_sorted_deterministic(self) -> None: """IDs within each kind are sorted for reproducible test output.""" old = self._empty() new = { **self._empty(), "reservations": {"zzz": (1, 1), "aaa": (2, 2), "mmm": (3, 3)}, } events = _diff_snapshots(old, new) added_ids = [uid for et, kind, uid in events if et == "added"] assert added_ids == sorted(added_ids) # ───────────────────────────────────────────────────────────────────────────── # Unit — _passes_filters # ───────────────────────────────────────────────────────────────────────────── class TestPassesFilters: def _data(self, run_id: str = "agent-1", branch: str = "main") -> Manifest: return {"run_id": run_id, "branch": branch} def test_no_filters_always_passes(self) -> None: assert _passes_filters("reservation", self._data(), None, None, None) def test_kind_filter_match(self) -> None: assert _passes_filters("reservation", self._data(), "reservation", None, None) def test_kind_filter_no_match(self) -> None: assert not _passes_filters("intent", self._data(), "reservation", None, None) def test_run_id_filter_match(self) -> None: assert _passes_filters("reservation", self._data(), None, "agent-1", None) def test_run_id_filter_no_match(self) -> None: assert not _passes_filters("reservation", self._data(), None, "agent-9", None) def test_branch_filter_match(self) -> None: assert _passes_filters("reservation", self._data(), None, None, "main") def test_branch_filter_no_match(self) -> None: assert not _passes_filters("reservation", self._data(), None, None, "feature/x") def test_all_filters_and_semantics(self) -> None: """All three filters must pass simultaneously.""" d = self._data("agent-1", "main") assert _passes_filters("reservation", d, "reservation", "agent-1", "main") assert not _passes_filters("reservation", d, "reservation", 
"agent-1", "other") def test_empty_data_fails_run_id_filter(self) -> None: """Empty data (removed event with no cache) fails run_id/branch filters.""" assert not _passes_filters("reservation", {}, None, "agent-1", None) def test_empty_data_passes_when_no_filters(self) -> None: assert _passes_filters("reservation", {}, None, None, None) # ───────────────────────────────────────────────────────────────────────────── # Unit — _emit_event JSON mode # ───────────────────────────────────────────────────────────────────────────── class TestEmitEventJson: def _capture_json(self, ev: WatchEvent) -> MsgpackDict: buf = io.StringIO() with redirect_stdout(buf): _emit_event(ev, as_json=True) return json.loads(buf.getvalue().strip()) def test_json_is_valid_ndjson(self) -> None: ev = WatchEvent("added", "reservation", "uid1", "2026-01-01T00:00:00+00:00", {"run_id": "a", "branch": "b"}) d = self._capture_json(ev) assert d["event_type"] == "added" assert d["kind"] == "reservation" def test_all_event_types_serialise(self) -> None: for et in ("snapshot", "added", "modified", "removed", "expired"): ev = WatchEvent(et, "intent", "uid2", "ts", {}) d = self._capture_json(ev) assert d["event_type"] == et def test_data_payload_preserved(self) -> None: payload = {"run_id": "x", "addresses": ["f.py::fn"], "branch": "dev"} ev = WatchEvent("added", "reservation", "uid3", "ts", payload) d = self._capture_json(ev) assert d["data"] == payload # ───────────────────────────────────────────────────────────────────────────── # Unit — _emit_event text mode # ───────────────────────────────────────────────────────────────────────────── class TestEmitEventText: def _capture_text(self, ev: WatchEvent) -> str: buf = io.StringIO() with redirect_stdout(buf): _emit_event(ev, as_json=False) return buf.getvalue() def test_reservation_icon_and_kind(self) -> None: ev = WatchEvent("added", "reservation", "a" * 8, "ts", {"run_id": "r1", "branch": "main", "addresses": ["f.py::fn"]}) out = self._capture_text(ev) 
assert "+" in out assert "reservation" in out def test_snapshot_icon(self) -> None: ev = WatchEvent("snapshot", "reservation", "a" * 8, "ts", {"run_id": "r1", "branch": "main", "addresses": ["f.py::fn"]}) out = self._capture_text(ev) assert "·" in out def test_expired_icon(self) -> None: ev = WatchEvent("expired", "reservations", "a" * 8, "ts", {"run_id": "r1", "branch": "main", "addresses": ["f.py::fn"]}) out = self._capture_text(ev) assert "!" in out def test_empty_data_does_not_crash(self) -> None: ev = WatchEvent("removed", "reservation", "b" * 8, "ts", {}) out = self._capture_text(ev) assert "bbbbbbbb" in out def test_heartbeat_shows_extends_to(self) -> None: ev = WatchEvent("modified", "heartbeat", "c" * 8, "ts", {"run_id": "r1", "reservation_id": "d" * 8, "extended_expires_at": "2099-01-01T00:00:00+00:00"}) out = self._capture_text(ev) assert "extends to" in out def test_release_shows_reason(self) -> None: ev = WatchEvent("added", "release", "e" * 8, "ts", {"run_id": "r1", "reason": "completed", "reservation_id": "f" * 8}) out = self._capture_text(ev) assert "completed" in out def test_intent_shows_operation(self) -> None: ev = WatchEvent("added", "intent", "g" * 8, "ts", {"run_id": "r1", "branch": "main", "operation": "delete", "addresses": ["src/mod.py::fn"]}) out = self._capture_text(ev) assert "delete" in out def test_ansi_escape_stripped_from_run_id(self) -> None: """ANSI escape in run_id must not reach terminal output.""" malicious_run_id = "\x1b[31mmalicious\x1b[0m" ev = WatchEvent("added", "reservation", "h" * 8, "ts", {"run_id": malicious_run_id, "branch": "main", "addresses": ["f.py::fn"]}) out = self._capture_text(ev) assert "\x1b" not in out assert "malicious" in out # Content preserved, escapes stripped. 
def test_long_address_list_truncated(self) -> None: addrs = [f"f.py::fn{i}" for i in range(10)] ev = WatchEvent("added", "reservation", "i" * 8, "ts", {"run_id": "r1", "branch": "main", "addresses": addrs}) out = self._capture_text(ev) assert "+7 more" in out # ───────────────────────────────────────────────────────────────────────────── # Unit — _check_expirations # ───────────────────────────────────────────────────────────────────────────── class TestCheckExpirations: def test_no_change_no_events(self, repo: pathlib.Path) -> None: res = _make_reservation(repo) active_ids = {res.reservation_id} events, curr = _check_expirations(repo, active_ids, set()) assert events == [] assert res.reservation_id in curr def test_expired_reservation_emits_event(self, repo: pathlib.Path) -> None: """An ID that was active but is no longer → expired event.""" gone_id = _new_id() active_ids = {gone_id} # Create the file with an explicit past expiry so _load_record can find it # and active_reservations() correctly excludes it. 
past_iso = ( datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2) ).isoformat() now_iso = now_utc_iso() path = _coord_dir(repo) / "reservations" / f"{gone_id}.json" path.write_text(json.dumps({ "reservation_id": gone_id, "run_id": "r1", "branch": "main", "addresses": ["src/mod.py::fn"], "created_at": now_iso, "expires_at": past_iso, "operation": "modify", "schema_version": "1.0.0", })) events, curr = _check_expirations(repo, active_ids, set()) assert any(e.event_type == "expired" and e.id == gone_id for e in events) assert gone_id not in curr def test_removed_id_not_double_counted(self, repo: pathlib.Path) -> None: """ID in removed_ids must NOT produce an expired event too.""" gone_id = _new_id() events, _ = _check_expirations(repo, {gone_id}, {gone_id}) assert events == [] def test_active_reservation_stays_active(self, repo: pathlib.Path) -> None: """Live reservation is NOT reported as expired.""" res = _make_reservation(repo) prev = {res.reservation_id} events, curr = _check_expirations(repo, prev, set()) assert not any(e.event_type == "expired" for e in events) def test_returns_current_active_ids(self, repo: pathlib.Path) -> None: res = _make_reservation(repo) _, curr = _check_expirations(repo, set(), set()) assert res.reservation_id in curr # ───────────────────────────────────────────────────────────────────────────── # Unit — _make_event # ───────────────────────────────────────────────────────────────────────────── class TestMakeEvent: def test_timestamp_is_utc_iso8601(self) -> None: ev = _make_event("added", "reservation", "uid1", {}) # Must parse as UTC datetime. 
dt = datetime.datetime.fromisoformat(ev.timestamp) assert dt.tzinfo is not None def test_fields_set_correctly(self) -> None: ev = _make_event("removed", "intent", "uid2", {"x": 1}) assert ev.event_type == "removed" assert ev.kind == "intent" assert ev.id == "uid2" assert ev.data == {"x": 1} # ───────────────────────────────────────────────────────────────────────────── # Integration — _watch_loop --once # ───────────────────────────────────────────────────────────────────────────── class TestWatchLoopOnce: def test_empty_repo_no_events(self, repo: pathlib.Path) -> None: events = _run_loop(repo, once=True) assert events == [] def test_existing_reservation_emits_snapshot(self, repo: pathlib.Path) -> None: _make_reservation(repo, run_id="agent-1") events = _run_loop(repo, once=True) kinds = [e["kind"] for e in events] assert "reservations" in kinds def test_snapshot_event_type(self, repo: pathlib.Path) -> None: _make_reservation(repo) events = _run_loop(repo, once=True) assert all(e["event_type"] == "snapshot" for e in events) def test_all_kinds_emitted_in_snapshot(self, repo: pathlib.Path) -> None: res = _make_reservation(repo) create_intent(repo, res.reservation_id, "agent-1", "main", ["src/mod.py::fn"], "modify") events = _run_loop(repo, once=True) kinds = {e["kind"] for e in events} assert "reservations" in kinds assert "intents" in kinds def test_snapshot_carries_data(self, repo: pathlib.Path) -> None: _make_reservation(repo, run_id="agent-snapshot-test") events = _run_loop(repo, once=True) res_events = [e for e in events if e["kind"] == "reservations"] assert any(e["data"].get("run_id") == "agent-snapshot-test" for e in res_events) def test_once_does_not_loop(self, repo: pathlib.Path) -> None: """--once must return quickly (no blocking wait calls).""" start = time.monotonic() _run_loop(repo, once=True) elapsed = time.monotonic() - start assert elapsed < 1.0 # Must not block for poll_interval. 
# ─────────────────────────────────────────────────────────────────────────────
# Integration — filters in _watch_loop
# ─────────────────────────────────────────────────────────────────────────────


class TestWatchLoopFilters:
    """Integration tests for kind/run_id/branch filtering in ``_watch_loop``."""

    def test_kind_filter_reservations(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo)
        create_intent(repo, res.reservation_id, "agent-1", "main", ["src/mod.py::fn"], "modify")
        events = _run_loop(repo, once=True, kind_filter="reservations")
        assert all(e["kind"] == "reservations" for e in events)

    def test_kind_filter_intents(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo)
        create_intent(repo, res.reservation_id, "agent-1", "main", ["src/mod.py::fn"], "modify")
        events = _run_loop(repo, once=True, kind_filter="intents")
        assert all(e["kind"] == "intents" for e in events)

    def test_run_id_filter(self, repo: pathlib.Path) -> None:
        _make_reservation(repo, run_id="agent-A")
        _make_reservation(repo, run_id="agent-B")
        events = _run_loop(repo, once=True, run_id_filter="agent-A")
        assert all(e["data"].get("run_id") == "agent-A" for e in events)
        assert len(events) == 1

    def test_branch_filter(self, repo: pathlib.Path) -> None:
        _make_reservation(repo, branch="main")
        _make_reservation(repo, branch="feature/x")
        events = _run_loop(repo, once=True, branch_filter="main")
        assert all(e["data"].get("branch") == "main" for e in events)
        assert len(events) == 1

    def test_combined_filters(self, repo: pathlib.Path) -> None:
        _make_reservation(repo, run_id="agent-A", branch="main")
        _make_reservation(repo, run_id="agent-A", branch="feature/x")
        _make_reservation(repo, run_id="agent-B", branch="main")
        events = _run_loop(repo, once=True, run_id_filter="agent-A", branch_filter="main")
        assert len(events) == 1
        assert events[0]["data"]["run_id"] == "agent-A"
        assert events[0]["data"]["branch"] == "main"

    def test_no_match_no_events(self, repo: pathlib.Path) -> None:
        _make_reservation(repo, run_id="agent-X")
        events = _run_loop(repo, once=True, run_id_filter="agent-NOBODY")
        assert events == []


# ─────────────────────────────────────────────────────────────────────────────
# Integration — added events
# ─────────────────────────────────────────────────────────────────────────────


class TestWatchLoopDetectsAdded:
    """Integration tests: records created mid-loop surface as ``added`` events."""

    def test_new_reservation_emits_added(self, repo: pathlib.Path) -> None:
        def _write_res() -> None:
            _make_reservation(repo, run_id="agent-new")

        events = _run_loop(repo, side_effect=_write_res, once=False)
        added = [e for e in events if e["event_type"] == "added"]
        assert len(added) == 1
        assert added[0]["kind"] == "reservations"
        assert added[0]["data"]["run_id"] == "agent-new"

    def test_new_intent_emits_added(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo)

        def _write_intent() -> None:
            create_intent(
                repo, res.reservation_id, "agent-1", "main", ["src/mod.py::fn"], "modify"
            )

        events = _run_loop(repo, side_effect=_write_intent, once=False)
        added = [e for e in events if e["event_type"] == "added" and e["kind"] == "intents"]
        assert len(added) == 1

    def test_added_event_carries_data(self, repo: pathlib.Path) -> None:
        def _write_res() -> None:
            _make_reservation(repo, run_id="agent-data-test", branch="feature/y")

        events = _run_loop(repo, side_effect=_write_res, once=False)
        added = [e for e in events if e["event_type"] == "added"]
        assert added[0]["data"]["run_id"] == "agent-data-test"
        assert added[0]["data"]["branch"] == "feature/y"


# ─────────────────────────────────────────────────────────────────────────────
# Integration — modified events
# ─────────────────────────────────────────────────────────────────────────────


class TestWatchLoopDetectsModified:
    """Integration tests: rewritten records surface as ``modified`` events."""

    def test_heartbeat_update_emits_modified(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo)
        # Write an initial heartbeat so the file exists in the snapshot.
        create_heartbeat(repo, res.reservation_id, "agent-1", extension_seconds=100)

        def _update_hb() -> None:
            # Small sleep ensures mtime changes on fast filesystems.
            time.sleep(0.02)
            create_heartbeat(repo, res.reservation_id, "agent-1", extension_seconds=200)

        events = _run_loop(repo, side_effect=_update_hb, once=False)
        modified = [
            e for e in events
            if e["event_type"] == "modified" and e["kind"] == "heartbeats"
        ]
        assert len(modified) == 1

    def test_modified_event_carries_new_data(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo)
        create_heartbeat(repo, res.reservation_id, "agent-1", extension_seconds=100)

        def _update_hb() -> None:
            time.sleep(0.02)
            create_heartbeat(repo, res.reservation_id, "agent-1", extension_seconds=9999)

        events = _run_loop(repo, side_effect=_update_hb, once=False)
        modified = [
            e for e in events
            if e["event_type"] == "modified" and e["kind"] == "heartbeats"
        ]
        # Require the event to exist: a conditional check would pass vacuously
        # if the loop emitted nothing at all.
        assert modified
        # Data should reflect the new heartbeat.
        assert "extended_expires_at" in modified[0]["data"]


# ─────────────────────────────────────────────────────────────────────────────
# Integration — removed events
# ─────────────────────────────────────────────────────────────────────────────


class TestWatchLoopDetectsRemoved:
    """Integration tests: deleted records surface as ``removed`` events."""

    def test_deleted_file_emits_removed(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo)
        # Snapshot includes the reservation.
        path = _coord_dir(repo) / "reservations" / f"{res.reservation_id}.json"

        def _delete_file() -> None:
            path.unlink()

        events = _run_loop(repo, side_effect=_delete_file, once=False)
        removed = [e for e in events if e["event_type"] == "removed"]
        assert len(removed) == 1
        assert removed[0]["id"] == res.reservation_id

    def test_removed_event_carries_cached_data(self, repo: pathlib.Path) -> None:
        """Even after deletion, removed events carry the last-known data."""
        res = _make_reservation(repo, run_id="agent-cached")
        path = _coord_dir(repo) / "reservations" / f"{res.reservation_id}.json"

        def _delete_file() -> None:
            path.unlink()

        events = _run_loop(repo, side_effect=_delete_file, once=False)
        removed = [e for e in events if e["event_type"] == "removed"]
        assert removed[0]["data"].get("run_id") == "agent-cached"

    def test_removed_event_passes_run_id_filter(self, repo: pathlib.Path) -> None:
        """Filter by run_id must work for removed events using cached data."""
        res = _make_reservation(repo, run_id="agent-filter-test")
        path = _coord_dir(repo) / "reservations" / f"{res.reservation_id}.json"

        def _delete_file() -> None:
            path.unlink()

        events = _run_loop(
            repo,
            side_effect=_delete_file,
            once=False,
            run_id_filter="agent-filter-test",
        )
        removed = [e for e in events if e["event_type"] == "removed"]
        assert len(removed) == 1


# ─────────────────────────────────────────────────────────────────────────────
# Integration — expiration events
# ─────────────────────────────────────────────────────────────────────────────


class TestWatchLoopDetectsExpiry:
    """Integration tests for ``expired`` events emitted by ``_watch_loop``."""

    def test_expired_reservation_emits_expired(self, repo: pathlib.Path) -> None:
        """Reservation active at startup that expires during loop → expired event.

        Strategy: create an active reservation, then the side_effect rewrites
        the file with expires_at in the past. The loop sees it as modified on
        the FS AND sees it drop out of active_reservations() → fires expired
        event.
        """
        res = _make_reservation(repo, ttl_seconds=3600)
        path = _coord_dir(repo) / "reservations" / f"{res.reservation_id}.json"

        def _expire_it() -> None:
            data = json.loads(path.read_text())
            past = (
                datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=2)
            ).isoformat()
            data["expires_at"] = past
            path.write_text(json.dumps(data))

        events = _run_loop(repo, side_effect=_expire_it, once=False)
        expired = [e for e in events if e["event_type"] == "expired"]
        assert any(e["id"] == res.reservation_id for e in expired)

    def test_active_reservation_no_expired_event(self, repo: pathlib.Path) -> None:
        res = _make_reservation(repo, ttl_seconds=9999)
        events = _run_loop(repo, once=False)
        expired = [e for e in events if e["event_type"] == "expired"]
        assert not any(e["id"] == res.reservation_id for e in expired)

    def test_removed_reservation_not_expired(self, repo: pathlib.Path) -> None:
        """GC'd reservation (file deleted) must not also fire expired.

        Strategy: create an active reservation, then the side_effect deletes it
        (simulating GC). Must see a removed event but NOT an additional expired
        event for the same ID.
        """
        res = _make_reservation(repo, ttl_seconds=3600)
        path = _coord_dir(repo) / "reservations" / f"{res.reservation_id}.json"

        def _delete_file() -> None:
            path.unlink()

        events = _run_loop(repo, side_effect=_delete_file, once=False)
        removed = [
            e for e in events
            if e["event_type"] == "removed" and e["id"] == res.reservation_id
        ]
        expired_dup = [
            e for e in events
            if e["event_type"] == "expired" and e["id"] == res.reservation_id
        ]
        # Must see a removed event, must NOT also see expired for the same ID.
        assert removed
        assert not expired_dup


# ─────────────────────────────────────────────────────────────────────────────
# Integration — timeout
# ─────────────────────────────────────────────────────────────────────────────


class TestWatchLoopTimeout:
    """Integration tests for the ``timeout`` deadline of ``_watch_loop``."""

    def test_timeout_zero_equivalent_to_once(self, repo: pathlib.Path) -> None:
        """timeout=0 should exit after snapshot, no looping."""
        _make_reservation(repo)
        start = time.monotonic()
        events = _run_loop(repo, once=False, timeout=0.0)
        elapsed = time.monotonic() - start
        # Should return quickly (snapshot only).
        assert elapsed < 1.0
        # Should still emit the snapshot.
        assert any(e["event_type"] == "snapshot" for e in events)

    def test_loop_exits_after_timeout(self, repo: pathlib.Path) -> None:
        """With a real PollingBackend and short timeout, loop exits."""
        buf = io.StringIO()
        backend = _PollingBackend(0.05)
        start = time.monotonic()
        with redirect_stdout(buf):
            _watch_loop(
                repo,
                backend,
                kind_filter=None,
                run_id_filter=None,
                branch_filter=None,
                as_json=True,
                once=False,
                timeout=0.2,
                poll_interval=0.05,
            )
        elapsed = time.monotonic() - start
        assert elapsed < 2.0  # Must not run forever.


# ─────────────────────────────────────────────────────────────────────────────
# Integration — JSON output
# ─────────────────────────────────────────────────────────────────────────────


class TestWatchLoopJsonOutput:
    """Integration tests for the JSON event structure emitted by ``_watch_loop``."""

    def test_all_snapshot_events_are_valid_json(self, repo: pathlib.Path) -> None:
        _make_reservation(repo)
        events = _run_loop(repo, once=True, as_json=True)
        for ev in events:
            # Already parsed by _run_loop, so this just checks structure.
            assert "schema_version" in ev
            assert "event_type" in ev
            assert "kind" in ev
            assert "id" in ev
            assert "timestamp" in ev
            assert "data" in ev

    def test_json_event_type_values(self, repo: pathlib.Path) -> None:
        _make_reservation(repo)
        events = _run_loop(repo, once=True)
        valid_types = {"snapshot", "added", "modified", "removed", "expired"}
        for ev in events:
            assert ev["event_type"] in valid_types


# ─────────────────────────────────────────────────────────────────────────────
# End-to-end — run() with mocked require_repo
# ─────────────────────────────────────────────────────────────────────────────


class TestRunCommand:
    """End-to-end tests for ``run()`` with ``require_repo`` patched to the fixture repo."""

    def _make_args(self, repo: pathlib.Path, **kwargs: MsgpackValue) -> argparse.Namespace:
        """Build the namespace passed to ``run()``; ``kwargs`` override defaults.

        ``repo`` is accepted for call-site symmetry but is not stored on the
        namespace; the tests supply the repo by patching ``require_repo``.
        """
        ns = argparse.Namespace()
        ns.once = kwargs.get("once", True)
        ns.timeout = kwargs.get("timeout", None)
        ns.max_events = kwargs.get("max_events", None)
        ns.poll_interval = kwargs.get("poll_interval", 0.1)
        ns.run_id = kwargs.get("run_id", None)
        ns.branch_filter = kwargs.get("branch_filter", None)
        ns.kind = kwargs.get("kind", None)
        ns.json_out = kwargs.get("json_out", True)
        return ns

    def test_run_once_emits_snapshot(self, repo: pathlib.Path) -> None:
        from muse.cli.commands.watch_coord import run

        _make_reservation(repo)
        args = self._make_args(repo, once=True)
        buf = io.StringIO()
        with (
            patch("muse.cli.commands.watch_coord.require_repo", return_value=repo),
            redirect_stdout(buf),
        ):
            run(args)
        lines = [line for line in buf.getvalue().splitlines() if line.strip()]
        events = [json.loads(line) for line in lines]
        assert any(e["event_type"] == "snapshot" for e in events)

    def test_run_text_mode_header_printed(
        self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        from muse.cli.commands.watch_coord import run

        args = self._make_args(repo, once=True, json_out=False)
        with patch("muse.cli.commands.watch_coord.require_repo", return_value=repo):
            run(args)
        captured = capsys.readouterr()
        assert "muse coord watch" in captured.out
        assert "watch ended" in captured.out

    def test_run_json_mode_no_header(
        self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        from muse.cli.commands.watch_coord import run

        args = self._make_args(repo, once=True, json_out=True)
        with patch("muse.cli.commands.watch_coord.require_repo", return_value=repo):
            run(args)
        captured = capsys.readouterr()
        lines = [line for line in captured.out.splitlines() if line.strip()]
        for line in lines:
            # Every non-empty line must be valid JSON.
            json.loads(line)

    def test_run_timeout_zero_treated_as_once(self, repo: pathlib.Path) -> None:
        """--timeout 0 must not block."""
        from muse.cli.commands.watch_coord import run

        args = self._make_args(repo, once=False, timeout=0.0, json_out=True)
        start = time.monotonic()
        buf = io.StringIO()
        with (
            patch("muse.cli.commands.watch_coord.require_repo", return_value=repo),
            redirect_stdout(buf),
        ):
            run(args)
        assert time.monotonic() - start < 1.0


# ─────────────────────────────────────────────────────────────────────────────
# End-to-end — backend selection
# ─────────────────────────────────────────────────────────────────────────────


class TestBackendSelection:
    """Tests for the polling backend and, where available, the kqueue backend."""

    def test_polling_backend_wait_sleeps(self) -> None:
        b = _PollingBackend(0.05)
        start = time.monotonic()
        result = b.wait(0.1)
        elapsed = time.monotonic() - start
        assert result is True
        assert elapsed >= 0.04  # Should have slept.
        b.close()

    def test_polling_backend_respects_timeout_cap(self) -> None:
        """wait(timeout) must not sleep longer than timeout."""
        b = _PollingBackend(10.0)  # Long interval.
        start = time.monotonic()
        b.wait(0.05)  # Short timeout.
        elapsed = time.monotonic() - start
        assert elapsed < 1.0  # Capped at 0.05 s.
        b.close()

    def test_polling_backend_close_is_idempotent(self) -> None:
        b = _PollingBackend(1.0)
        b.close()
        b.close()  # Should not raise.

    @pytest.mark.skipif(
        not hasattr(__import__("select"), "kqueue"),
        reason="kqueue not available on this platform",
    )
    def test_kqueue_backend_initialises(self, repo: pathlib.Path) -> None:
        """kqueue backend can be created and closed without error."""
        from muse.cli.commands.watch_coord import _SUBDIRS

        dirs = [_coord_dir(repo) / sub for sub in _SUBDIRS]
        b = _KqueueBackend(dirs)
        assert b.name == "kqueue"
        b.close()

    @pytest.mark.skipif(
        not hasattr(__import__("select"), "kqueue"),
        reason="kqueue not available on this platform",
    )
    def test_kqueue_backend_detects_new_file(self, repo: pathlib.Path) -> None:
        """kqueue wakes when a new JSON file is added to a watched dir."""
        from muse.cli.commands.watch_coord import _SUBDIRS

        dirs = [_coord_dir(repo) / sub for sub in _SUBDIRS]
        b = _KqueueBackend(dirs)
        try:
            # No change — should time out. The result is intentionally not
            # asserted; only the wake-up after the write below is checked.
            b.wait(0.05)
            # Add a file.
            (_coord_dir(repo) / "reservations" / "newfile.json").write_text("{}")
            # Should detect change.
            result_after = b.wait(0.5)
            assert result_after is True
        finally:
            b.close()

    @pytest.mark.skipif(
        not hasattr(__import__("select"), "kqueue"),
        reason="kqueue not available on this platform",
    )
    def test_kqueue_backend_close_releases_fds(self, repo: pathlib.Path) -> None:
        """Smoke test: creating and closing the backend must not raise.

        This does not count open descriptors, so it cannot detect a leak.
        """
        from muse.cli.commands.watch_coord import _SUBDIRS

        dirs = [_coord_dir(repo) / sub for sub in _SUBDIRS]
        b = _KqueueBackend(dirs)
        b.close()
        # Just ensure no exception — fd leak would only show in long-running tests.
# ─────────────────────────────────────────────────────────────────────────────
# Security tests
# ─────────────────────────────────────────────────────────────────────────────


class TestAnsiInjectionSanitized:
    """Escape sequences in record fields: absent from text output, kept in JSON."""

    @staticmethod
    def _capture(event: "WatchEvent", *, as_json: bool) -> str:
        """Pass *event* to ``_emit_event`` and return whatever it wrote to stdout."""
        sink = io.StringIO()
        with redirect_stdout(sink):
            _emit_event(event, as_json=as_json)
        return sink.getvalue()

    def test_ansi_in_run_id_stripped_text_output(self) -> None:
        """ESC sequences in run_id must not reach stdout in text mode."""
        hostile_run_id = "\x1b[1;31mROOT\x1b[0m"
        payload = {"run_id": hostile_run_id, "branch": "main", "addresses": ["f.py::fn"]}
        event = WatchEvent("added", "reservation", "a" * 8, "ts", payload)

        rendered = self._capture(event, as_json=False)

        assert "\x1b" not in rendered
        # The visible text between the escape sequences is still printed.
        assert "ROOT" in rendered

    def test_ansi_in_branch_stripped(self) -> None:
        """ESC sequences in branch must not reach stdout in text mode."""
        hostile_branch = "\x1b[4mmaster\x1b[0m"
        payload = {"run_id": "agent", "branch": hostile_branch, "addresses": ["f.py::fn"]}
        event = WatchEvent("added", "reservation", "b" * 8, "ts", payload)

        rendered = self._capture(event, as_json=False)

        assert "\x1b" not in rendered

    def test_ansi_in_address_stripped(self) -> None:
        """ESC sequences in an address must not reach stdout in text mode."""
        hostile_address = "\x1b[32msrc/malicious.py::inject\x1b[0m"
        payload = {"run_id": "agent", "branch": "main", "addresses": [hostile_address]}
        event = WatchEvent("added", "reservation", "c" * 8, "ts", payload)

        rendered = self._capture(event, as_json=False)

        assert "\x1b" not in rendered

    def test_json_output_preserves_raw_data(self) -> None:
        """JSON output must preserve raw data as-is (no sanitisation)."""
        hostile_run_id = "\x1b[31mmalicious\x1b[0m"
        payload = {"run_id": hostile_run_id, "branch": "main", "addresses": ["f.py::fn"]}
        event = WatchEvent("added", "reservation", "d" * 8, "ts", payload)

        decoded = json.loads(self._capture(event, as_json=True))

        # Stored as-is in JSON.
        assert decoded["data"]["run_id"] == hostile_run_id
class TestSymlinkDirRejected:
    """The kqueue backend must not follow a symlinked coordination directory."""

    @pytest.mark.skipif(
        not hasattr(__import__("select"), "kqueue"),
        reason="kqueue not available on this platform",
    )
    def test_symlinked_coord_dir_raises_valueerror(
        self, repo: pathlib.Path, tmp_path: pathlib.Path
    ) -> None:
        """kqueue backend must refuse to watch a symlinked directory."""
        import shutil

        # Create a real target dir for the symlink to point at.
        real_dir = tmp_path / "real_target"
        real_dir.mkdir()
        link_dir = _coord_dir(repo) / "reservations"
        # Remove the existing real directory before replacing with a symlink.
        shutil.rmtree(link_dir, ignore_errors=True)
        link_dir.symlink_to(real_dir)
        try:
            dirs = [link_dir]
            with pytest.raises(ValueError, match="symlink"):
                _KqueueBackend(dirs)
        finally:
            link_dir.unlink(missing_ok=True)
            # Restore the real dir for other tests.
            link_dir.mkdir(exist_ok=True)


class TestKindFilterAllowlist:
    """Kind filtering in ``_passes_filters`` and in the argparse ``--kind`` flag."""

    def test_valid_kinds_accepted(self) -> None:
        """Each known kind passes when the kind filter names that same kind."""
        for kind in ("reservations", "intents", "releases", "heartbeats"):
            assert _passes_filters(kind, {}, kind, None, None)

    def test_arbitrary_kind_string_rejected_by_filter(self) -> None:
        """_passes_filters rejects unrecognised kinds when kind_filter is set."""
        assert not _passes_filters("../etc", {}, "reservations", None, None)

    def test_argparse_rejects_invalid_kind(self) -> None:
        """argparse choices validation rejects invalid --kind values."""
        from muse.cli.commands.watch_coord import register

        # ``argparse`` is already imported at module level; no local import needed.
        p = argparse.ArgumentParser()
        subs = p.add_subparsers()
        register(subs)
        with pytest.raises(SystemExit):
            p.parse_args(["watch", "--kind", "invalid_kind"])


# ─────────────────────────────────────────────────────────────────────────────
# Stress tests
# ─────────────────────────────────────────────────────────────────────────────


def _iso_utc_offset(delta: datetime.timedelta) -> str:
    """Return the current UTC time shifted by *delta* as an ISO 8601 string."""
    return (datetime.datetime.now(datetime.timezone.utc) + delta).isoformat()


def _write_reservation_files(
    repo: pathlib.Path,
    count: int,
    *,
    expires_at: str,
    address_stem: str = "mod",
) -> list[str]:
    """Write *count* reservation JSON files directly into the coord directory.

    The files are written by hand (bypassing ``create_reservation``) for speed.

    Args:
        repo: Repository root whose ``reservations`` coord dir receives the files.
        count: Number of reservation files to write.
        expires_at: ISO 8601 string stored in every record's ``expires_at``.
        address_stem: File-name stem used in each record's single address
            (``src/<stem><i>.py::fn``).

    Returns:
        The generated reservation ids, in creation order.
    """
    res_dir = _coord_dir(repo) / "reservations"
    created_at = now_utc_iso()
    ids: list[str] = []
    for i in range(count):
        uid = _new_id()
        record = {
            "reservation_id": uid,
            "run_id": f"agent-{i}",
            "branch": "main",
            "addresses": [f"src/{address_stem}{i}.py::fn"],
            "created_at": created_at,
            "expires_at": expires_at,
            "operation": "modify",
            "schema_version": "1.0.0",
        }
        (res_dir / f"{uid}.json").write_text(json.dumps(record))
        ids.append(uid)
    return ids


class TestStress500RapidAdds:
    """Snapshot of 500 reservation files written before the loop starts."""

    def test_500_reservations_all_detected(self, repo: pathlib.Path) -> None:
        """500 pre-existing reservations must all appear in the snapshot."""
        ids = _write_reservation_files(
            repo, 500, expires_at=_iso_utc_offset(datetime.timedelta(hours=1))
        )

        start = time.monotonic()
        events = _run_loop(repo, once=True)
        elapsed = time.monotonic() - start

        snapshot_events = [e for e in events if e["event_type"] == "snapshot"]
        snapshot_ids = {e["id"] for e in snapshot_events}
        assert snapshot_ids == set(ids)
        # The set comparison alone would hide duplicate events for one record.
        assert len(snapshot_events) == len(ids)
        assert elapsed < 3.0, f"500 snapshot took {elapsed:.2f}s (limit: 3s)"


class TestStressLargeSnapshot:
    """Scan time for 1000 records spread over the four coord kinds."""

    def test_1000_records_snapshot_under_1s(self, repo: pathlib.Path) -> None:
        """Snapshot of 1000 mixed records must complete in under 1 second."""
        now_iso = now_utc_iso()
        exp_iso = _iso_utc_offset(datetime.timedelta(hours=1))
        coord = _coord_dir(repo)
        for i in range(250):
            for kind in ("reservations", "intents", "releases", "heartbeats"):
                uid = _new_id()
                (coord / kind / f"{uid}.json").write_text(
                    json.dumps(
                        {
                            "id": uid,
                            "run_id": f"agent-{i}",
                            "branch": "main",
                            "created_at": now_iso,
                            "expires_at": exp_iso,
                        }
                    )
                )

        start = time.monotonic()
        snap = _scan_dirs(repo)
        elapsed = time.monotonic() - start

        total = sum(len(v) for v in snap.values())
        assert total == 1000
        assert elapsed < 1.0, f"scan of 1000 records took {elapsed:.3f}s"


class TestStressDiffPerformance:
    """Timing of ``_diff_snapshots`` on two 1000-entry snapshots."""

    def test_diff_1000_entries_under_50ms(self) -> None:
        """Diffing two 1000-entry snapshots must average < 50 ms per call.

        The budget is enforced over 100 calls (5 s total) rather than per call.
        """
        kinds = ("reservations", "intents", "releases", "heartbeats")
        old: _Snapshot = {sub: {} for sub in kinds}
        new: _Snapshot = {sub: {} for sub in kinds}
        for i in range(250):
            for sub in kinds:
                uid = _new_id()
                old[sub][uid] = (i * 1000, i * 10)
                new[sub][uid] = (i * 1000, i * 10)
        # Add 10 new entries to make the diff non-trivial.
        for _ in range(10):
            new["reservations"][_new_id()] = (999999, 99)

        start = time.monotonic()
        for _ in range(100):  # 100 diff calls.
            _diff_snapshots(old, new)
        elapsed = time.monotonic() - start

        assert elapsed < 5.0, f"100 × diff took {elapsed:.3f}s (limit: 5s)"


class TestStressManyExpirations:
    """Expiry detection over 200 reservations whose ``expires_at`` is in the past."""

    def test_200_expired_reservations_detected(self, repo: pathlib.Path) -> None:
        """200 expired reservations must all emit expired events."""
        ids = set(
            _write_reservation_files(
                repo,
                200,
                # Already expired.
                expires_at=_iso_utc_offset(-datetime.timedelta(hours=1)),
                address_stem="m",
            )
        )

        # prev_active_ids claims all 200 were active.
        start = time.monotonic()
        exp_events, _ = _check_expirations(repo, ids, set())
        elapsed = time.monotonic() - start

        expired_ids = {e.id for e in exp_events}
        assert expired_ids == ids
        assert elapsed < 2.0, f"200 expiration checks took {elapsed:.3f}s (limit: 2s)"


# ─────────────────────────────────────────────────────────────────────────────
# Integration — _watch_loop with max_events
# ─────────────────────────────────────────────────────────────────────────────


class TestWatchLoopMaxEvents:
    """``max_events`` caps emitted events; ``run()`` rejects non-positive values."""

    @staticmethod
    def _run_rejected(repo: pathlib.Path, max_events: int) -> tuple[object, str]:
        """Call ``run()`` in JSON mode with *max_events*, expecting ``SystemExit``.

        Returns:
            The ``SystemExit`` code and everything written to stdout.
        """
        from muse.cli.commands.watch_coord import run as watch_run

        ns = argparse.Namespace(
            once=True,
            timeout=None,
            max_events=max_events,
            poll_interval=0.1,
            run_id=None,
            branch_filter=None,
            kind=None,
            json_out=True,
        )
        buf = io.StringIO()
        with (
            patch("muse.cli.commands.watch_coord.require_repo", return_value=repo),
            redirect_stdout(buf),
        ):
            with pytest.raises(SystemExit) as exc_info:
                watch_run(ns)
        return exc_info.value.code, buf.getvalue()

    def test_max_events_1_returns_single_snapshot(self, repo: pathlib.Path) -> None:
        """max_events=1 must emit exactly 1 event even if more exist."""
        for i in range(5):
            _make_reservation(repo, run_id=f"ag-{i}")
        events = _run_loop(repo, once=True, max_events=1)
        assert len(events) == 1

    def test_max_events_3_caps_at_3(self, repo: pathlib.Path) -> None:
        """max_events=3 must emit exactly 3 events when 10 records exist."""
        for i in range(10):
            _make_reservation(repo, run_id=f"ag-{i}")
        events = _run_loop(repo, once=True, max_events=3)
        assert len(events) == 3

    def test_max_events_larger_than_existing_returns_all(self, repo: pathlib.Path) -> None:
        """A cap above the record count must not drop or pad events."""
        for i in range(4):
            _make_reservation(repo, run_id=f"ag-{i}")
        events = _run_loop(repo, once=True, max_events=100)
        assert len(events) == 4

    def test_max_events_zero_is_rejected_by_run(self, repo: pathlib.Path) -> None:
        """run() must reject max_events=0 with USER_ERROR."""
        code, _ = self._run_rejected(repo, 0)
        assert code == ExitCode.USER_ERROR

    def test_max_events_negative_is_rejected_by_run(self, repo: pathlib.Path) -> None:
        """run() must reject a negative max_events with USER_ERROR."""
        code, _ = self._run_rejected(repo, -5)
        assert code == ExitCode.USER_ERROR

    def test_max_events_caps_added_events_in_loop(self, repo: pathlib.Path) -> None:
        """max_events must also cap events emitted during the loop (not just snapshots)."""

        # No pre-existing records; add 10 during the side effect.
        def _add_records() -> None:
            for i in range(10):
                _make_reservation(repo, run_id=f"ag-{i}")

        events = _run_loop(
            repo,
            once=False,
            timeout=1.0,
            side_effect=_add_records,
            max_events=3,
        )
        assert len(events) <= 3

    def test_max_events_json_error_is_compact(self, repo: pathlib.Path) -> None:
        """Error for bad max_events in JSON mode must be a single compact line."""
        _, stdout = self._run_rejected(repo, 0)
        out = stdout.strip()
        assert "\n" not in out
        data = json.loads(out)
        assert "error" in data


# ─────────────────────────────────────────────────────────────────────────────
# End-to-end — run() input validation
# ─────────────────────────────────────────────────────────────────────────────


class TestRunCommandValidation:
    """Argument validation performed by ``run()``."""

    def _make_args(self, **kwargs: MsgpackValue) -> argparse.Namespace:
        """Build a Namespace for ``run()``; *kwargs* override the defaults below."""
        ns = argparse.Namespace()
        ns.once = kwargs.get("once", True)
        ns.timeout = kwargs.get("timeout", None)
        ns.max_events = kwargs.get("max_events", None)
        ns.poll_interval = kwargs.get("poll_interval", 0.1)
        ns.run_id = kwargs.get("run_id", None)
        ns.branch_filter = kwargs.get("branch_filter", None)
        ns.kind = kwargs.get("kind", None)
        ns.json_out = kwargs.get("json_out", False)
        return ns

    @staticmethod
    def _run_ok(repo: pathlib.Path, ns: argparse.Namespace) -> None:
        """Call ``run()`` with ``require_repo`` patched to *repo*, discarding stdout."""
        from muse.cli.commands.watch_coord import run as watch_run

        buf = io.StringIO()
        with (
            patch("muse.cli.commands.watch_coord.require_repo", return_value=repo),
            redirect_stdout(buf),
        ):
            watch_run(ns)  # must not raise

    def test_run_id_at_max_length_accepted(self, repo: pathlib.Path) -> None:
        """A run-id of exactly ``_MAX_RUN_ID_LEN`` characters is accepted."""
        run_id = "a" * _MAX_RUN_ID_LEN
        self._run_ok(repo, self._make_args(run_id=run_id, once=True))

    def test_run_id_over_max_exits_user_error(self, repo: pathlib.Path) -> None:
        """A run-id one character over the limit exits with USER_ERROR."""
        from muse.cli.commands.watch_coord import run as watch_run

        run_id = "a" * (_MAX_RUN_ID_LEN + 1)
        ns = self._make_args(run_id=run_id)
        with pytest.raises(SystemExit) as exc_info:
            watch_run(ns)
        assert exc_info.value.code == ExitCode.USER_ERROR

    def test_run_id_over_max_json_returns_error_field(self, repo: pathlib.Path) -> None:
        """In JSON mode the over-long run-id error is one compact JSON object."""
        from muse.cli.commands.watch_coord import run as watch_run

        run_id = "a" * (_MAX_RUN_ID_LEN + 1)
        ns = self._make_args(run_id=run_id, json_out=True)
        buf = io.StringIO()
        with redirect_stdout(buf):
            with pytest.raises(SystemExit):
                watch_run(ns)
        out = buf.getvalue().strip()
        assert "\n" not in out  # Compact JSON.
        data = json.loads(out)
        assert "error" in data
        assert data.get("status") == "bad_args"

    def test_poll_interval_below_min_exits_user_error(self, repo: pathlib.Path) -> None:
        """A poll interval of 0.001 exits with USER_ERROR."""
        from muse.cli.commands.watch_coord import run as watch_run

        ns = self._make_args(poll_interval=0.001)
        with pytest.raises(SystemExit) as exc_info:
            watch_run(ns)
        assert exc_info.value.code == ExitCode.USER_ERROR

    def test_poll_interval_above_max_exits_user_error(self, repo: pathlib.Path) -> None:
        """A poll interval of 99999.0 exits with USER_ERROR."""
        from muse.cli.commands.watch_coord import run as watch_run

        ns = self._make_args(poll_interval=99999.0)
        with pytest.raises(SystemExit) as exc_info:
            watch_run(ns)
        assert exc_info.value.code == ExitCode.USER_ERROR

    def test_poll_interval_at_min_accepted(self, repo: pathlib.Path) -> None:
        """A poll interval of exactly ``_MIN_POLL_INTERVAL`` is accepted."""
        self._run_ok(repo, self._make_args(poll_interval=_MIN_POLL_INTERVAL, once=True))

    def test_poll_interval_at_max_accepted(self, repo: pathlib.Path) -> None:
        """A poll interval of exactly ``_MAX_POLL_INTERVAL`` is accepted."""
        self._run_ok(repo, self._make_args(poll_interval=_MAX_POLL_INTERVAL, once=True))

    def test_timeout_negative_exits_user_error(self, repo: pathlib.Path) -> None:
        """A negative timeout exits with USER_ERROR."""
        from muse.cli.commands.watch_coord import run as watch_run

        ns = self._make_args(timeout=-1.0)
        with pytest.raises(SystemExit) as exc_info:
            watch_run(ns)
        assert exc_info.value.code == ExitCode.USER_ERROR

    def test_timeout_zero_accepted_as_once(self, repo: pathlib.Path) -> None:
        """--timeout 0 must complete quickly (treated as --once)."""
        # once=False is explicit: with the _make_args default (once=True) the run
        # would return immediately regardless of how timeout=0 is handled.
        ns = self._make_args(once=False, timeout=0.0, json_out=True)
        start = time.monotonic()
        self._run_ok(repo, ns)
        assert time.monotonic() - start < 1.0

    def test_validation_fires_before_require_repo(self) -> None:
        """Bad --run-id must fail before require_repo() is ever called."""
        from muse.cli.commands.watch_coord import run as watch_run

        run_id = "x" * (_MAX_RUN_ID_LEN + 1)
        ns = self._make_args(run_id=run_id)
        called = []
        with patch(
            "muse.cli.commands.watch_coord.require_repo",
            side_effect=lambda: called.append(True),
        ):
            with pytest.raises(SystemExit):
                watch_run(ns)
        assert called == [], "require_repo must not be called before validation passes"

    def test_error_message_has_check_mark_prefix(
        self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Validation error on stderr must contain the ❌ marker.

        Only presence is asserted, not position, so any prefix on the line
        does not fail the test.
        """
        from muse.cli.commands.watch_coord import run as watch_run

        ns = self._make_args(poll_interval=0.0001)
        with pytest.raises(SystemExit):
            watch_run(ns)
        err = capsys.readouterr().err
        assert "❌" in err


# ─────────────────────────────────────────────────────────────────────────────
# Stress — max_events under load
# ─────────────────────────────────────────────────────────────────────────────


class TestStressMaxEvents:
    """``max_events`` cap with 500 reservation files on disk."""

    def test_max_events_10_from_500_records_returns_exactly_10(
        self, repo: pathlib.Path
    ) -> None:
        """max_events=10 must return exactly 10 events even with 500 records."""
        _write_reservation_files(
            repo, 500, expires_at=_iso_utc_offset(datetime.timedelta(hours=1))
        )

        start = time.monotonic()
        events = _run_loop(repo, once=True, max_events=10)
        elapsed = time.monotonic() - start

        assert len(events) == 10
        assert elapsed < 2.0, f"max_events cap with 500 records took {elapsed:.2f}s"


# ---------------------------------------------------------------------------
# Flag registration
# ---------------------------------------------------------------------------


class TestRegisterFlags:
    """Flags registered by ``register()`` on the ``watch`` subcommand."""

    def _parse(self, *args: str) -> argparse.Namespace:
        """Parse ``watch <args>`` with a fresh parser wired up by ``register()``."""
        from muse.cli.commands.watch_coord import register

        p = argparse.ArgumentParser()
        sub = p.add_subparsers()
        register(sub)
        return p.parse_args(["watch", *args])

    def test_default_json_out_is_false(self) -> None:
        """Without flags, ``json_out`` is False."""
        ns = self._parse()
        assert ns.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        """``--json`` sets ``json_out`` to True."""
        ns = self._parse("--json")
        assert ns.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        """``-j`` sets ``json_out`` to True."""
        ns = self._parse("-j")
        assert ns.json_out is True