test_cmd_watch_coord.py
file-level
1
files
1
commits
0
hotspots
0
π§ dead
0
π₯ blast risk
| 1 | """Tests for ``muse coord watch`` (watch_coord.py). |
| 2 | |
| 3 | Coverage matrix |
| 4 | --------------- |
| 5 | Unit |
| 6 | TestWatchEvent β to_dict() schema, all fields present |
| 7 | TestScanDirs β empty dir, missing dir, multiple kinds, TOCTOU safety |
| 8 | TestDiffSnapshots β no change, added, removed, modified, cross-kind |
| 9 | TestPassesFilters β kind/run_id/branch filters, AND semantics, empty data |
| 10 | TestEmitEventJson β JSON mode produces valid NDJSON, all event types |
| 11 | TestEmitEventText β text mode, all kinds, missing data, ANSI stripped |
| 12 | TestCheckExpirations β expired emitted, active not, removed not double-counted |
| 13 | TestMakeEvent β timestamp is UTC ISO 8601 |
| 14 | |
| 15 | Integration |
| 16 | TestWatchLoopOnce β --once emits snapshot for all existing records |
| 17 | TestWatchLoopFilters β kind/run_id/branch filters in loop |
| 18 | TestWatchLoopDetectsAdded β new reservation appears β added event |
| 19 | TestWatchLoopDetectsModified β heartbeat updated β modified event |
| 20 | TestWatchLoopDetectsRemoved β file deleted β removed event (with cached data) |
| 21 | TestWatchLoopDetectsExpiry β reservation expires β expired event (no file change) |
| 22 | TestWatchLoopTimeout β loop exits after timeout seconds |
| 23 | TestWatchLoopJsonOutput β all events are valid NDJSON in json mode |
| 24 | TestWatchLoopMaxEvents β loop exits after --max-events events emitted |
| 25 | |
| 26 | End-to-end |
| 27 | TestRunCommand β run() with --once via mock require_repo |
| 28 | TestRunCommandValidation β run() validation: run-id/poll-interval/timeout/max-events |
| 29 | TestBackendSelection β kqueue backend on macOS, polling on other |
| 30 | TestSignalHandling β SIGTERM triggers clean exit |
| 31 | |
| 32 | Security |
| 33 | TestAnsiInjectionSanitized β ANSI escapes in run_id/branch stripped from text |
| 34 | TestSymlinkDirRejected β kqueue raises ValueError on symlinked coord dir |
| 35 | TestKindFilterAllowlist β invalid kind strings rejected by argparse |
| 36 | |
| 37 | Stress |
| 38 | TestStress500RapidAdds β 500 reservations added β all 500 added events |
| 39 | TestStressLargeSnapshot β 1000 existing records β snapshot < 1 s |
| 40 | TestStressDiffPerformance β diff of 2Γ1000-entry snapshots < 50 ms |
| 41 | TestStressManyExpirations β 200 expiring reservations β all expired events < 2 s |
| 42 | TestStressMaxEvents β max_events cap respected under 500-record load |
| 43 | """ |
| 44 | |
| 45 | from __future__ import annotations |
| 46 | |
| 47 | import argparse |
| 48 | import datetime |
| 49 | import io |
| 50 | import itertools |
| 51 | import json |
| 52 | import pathlib |
| 53 | import tempfile |
| 54 | import time |
| 55 | from collections.abc import Callable |
| 56 | from contextlib import redirect_stdout |
| 57 | from unittest.mock import MagicMock, patch |
| 58 | |
| 59 | import pytest |
| 60 | |
| 61 | from muse.core.types import Manifest, MsgpackDict, MsgpackValue, content_hash, fake_id, now_utc_iso |
| 62 | from muse.core.paths import muse_dir |
| 63 | |
| 64 | _id_seq = itertools.count() |
| 65 | |
| 66 | |
| 67 | def _new_id() -> str: |
| 68 | return content_hash({"seq": next(_id_seq)}) |
| 69 | |
| 70 | # Module under test. |
| 71 | from muse.cli.commands.watch_coord import ( |
| 72 | WatchEvent, |
| 73 | _Backend, |
| 74 | _KqueueBackend, |
| 75 | _MAX_RUN_ID_LEN, |
| 76 | _MIN_POLL_INTERVAL, |
| 77 | _MAX_POLL_INTERVAL, |
| 78 | _PollingBackend, |
| 79 | _Snapshot, |
| 80 | _check_expirations, |
| 81 | _coord_dir, |
| 82 | _diff_snapshots, |
| 83 | _emit_event, |
| 84 | _ensure_coord_dirs, |
| 85 | _load_record, |
| 86 | _make_event, |
| 87 | _passes_filters, |
| 88 | _scan_dirs, |
| 89 | _watch_loop, |
| 90 | ) |
| 91 | from muse.core.coordination import ( |
| 92 | Reservation, |
| 93 | create_heartbeat, |
| 94 | create_intent, |
| 95 | create_reservation, |
| 96 | ) |
| 97 | from muse.core.errors import ExitCode |
| 98 | |
| 99 | |
| 100 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 101 | # Fixtures |
| 102 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 103 | |
| 104 | |
| 105 | @pytest.fixture |
| 106 | def repo(tmp_path: pathlib.Path) -> pathlib.Path: |
| 107 | """Minimal muse repository with coord dirs created.""" |
| 108 | dot_muse = muse_dir(tmp_path) |
| 109 | dot_muse.mkdir() |
| 110 | (dot_muse / "repo.json").write_text( |
| 111 | json.dumps({"repo_id": fake_id("watch-coord-repo"), "name": "test-repo"}) |
| 112 | ) |
| 113 | _ensure_coord_dirs(tmp_path) |
| 114 | return tmp_path |
| 115 | |
| 116 | |
| 117 | class _ImmediateBackend(_Backend): |
| 118 | """Test backend that fires a side-effect once then sleeps so the timeout fires. |
| 119 | |
| 120 | Design: ``wait()`` calls the side_effect on the first invocation (so files |
| 121 | can be written between the snapshot and the next scan), then sleeps for |
| 122 | ``timeout`` on all subsequent calls. This means the outer ``_watch_loop`` |
| 123 | will do exactly one meaningful diff iteration (the one that sees the |
| 124 | side-effect's changes) and then block until the timeout deadline expires. |
| 125 | |
| 126 | Always pass ``timeout`` (a finite deadline) when using ``once=False``. |
| 127 | """ |
| 128 | |
| 129 | name = "immediate" |
| 130 | |
| 131 | def __init__( |
| 132 | self, |
| 133 | *, |
| 134 | side_effect: Callable[[], None] | None = None, |
| 135 | ) -> None: |
| 136 | self._side_effect = side_effect |
| 137 | self._fired = False |
| 138 | |
| 139 | def wait(self, timeout: float) -> bool: |
| 140 | if not self._fired: |
| 141 | if self._side_effect: |
| 142 | self._side_effect() |
| 143 | self._fired = True |
| 144 | return True # Signal: something may have changed. |
| 145 | # Already fired β sleep so the loop deadline can expire. |
| 146 | time.sleep(max(0.0, timeout)) |
| 147 | return False |
| 148 | |
| 149 | def close(self) -> None: |
| 150 | pass |
| 151 | |
| 152 | |
| 153 | def _make_reservation(repo: pathlib.Path, **kwargs: MsgpackValue) -> Reservation: |
| 154 | """Create a reservation with sensible defaults.""" |
| 155 | return create_reservation( |
| 156 | repo, |
| 157 | run_id=kwargs.get("run_id", "agent-1"), |
| 158 | branch=kwargs.get("branch", "main"), |
| 159 | addresses=kwargs.get("addresses", ["src/mod.py::fn"]), |
| 160 | ttl_seconds=kwargs.get("ttl_seconds", 3600), |
| 161 | operation=kwargs.get("operation", "modify"), |
| 162 | ) |
| 163 | |
| 164 | |
| 165 | def _run_loop( |
| 166 | repo: pathlib.Path, |
| 167 | *, |
| 168 | side_effect: "callable | None" = None, |
| 169 | kind_filter: str | None = None, |
| 170 | run_id_filter: str | None = None, |
| 171 | branch_filter: str | None = None, |
| 172 | as_json: bool = True, |
| 173 | once: bool = True, |
| 174 | # When once=False, a finite timeout is required so the loop exits. |
| 175 | # Default 1.0 s is enough for one side-effect iteration. |
| 176 | timeout: float | None = None, |
| 177 | poll_interval: float = 0.05, |
| 178 | max_events: int | None = None, |
| 179 | ) -> list[dict]: |
| 180 | """Run _watch_loop with an ImmediateBackend; return parsed events. |
| 181 | |
| 182 | For ``once=False`` tests, pass ``timeout`` explicitly (e.g. ``timeout=1.0``). |
| 183 | The backend fires the side_effect on the first wait, then sleeps until the |
| 184 | deadline, giving the loop exactly one diff iteration to detect changes. |
| 185 | """ |
| 186 | # Enforce finite timeout for non-once mode to prevent hangs. |
| 187 | if not once and timeout is None: |
| 188 | timeout = 1.0 |
| 189 | buf = io.StringIO() |
| 190 | backend = _ImmediateBackend(side_effect=side_effect) |
| 191 | with redirect_stdout(buf): |
| 192 | _watch_loop( |
| 193 | repo, |
| 194 | backend, |
| 195 | kind_filter=kind_filter, |
| 196 | run_id_filter=run_id_filter, |
| 197 | branch_filter=branch_filter, |
| 198 | as_json=as_json, |
| 199 | once=once, |
| 200 | timeout=timeout, |
| 201 | poll_interval=poll_interval, |
| 202 | max_events=max_events, |
| 203 | ) |
| 204 | lines = [l for l in buf.getvalue().splitlines() if l.strip()] |
| 205 | return [json.loads(line) for line in lines] |
| 206 | |
| 207 | |
| 208 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 209 | # Unit β WatchEvent |
| 210 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 211 | |
| 212 | |
| 213 | class TestWatchEvent: |
| 214 | def test_to_dict_has_all_fields(self) -> None: |
| 215 | ev = WatchEvent("added", "reservation", "uid-1", "2026-01-01T00:00:00+00:00", {}) |
| 216 | d = ev.to_dict() |
| 217 | assert set(d.keys()) == { |
| 218 | "schema_version", "event_type", "kind", "id", "timestamp", "data" |
| 219 | } |
| 220 | |
| 221 | def test_to_dict_values_round_trip(self) -> None: |
| 222 | payload = {"run_id": "a", "branch": "main"} |
| 223 | ev = WatchEvent("modified", "heartbeat", "uid-2", "2026-01-01T00:00:00+00:00", payload) |
| 224 | d = ev.to_dict() |
| 225 | assert d["event_type"] == "modified" |
| 226 | assert d["kind"] == "heartbeat" |
| 227 | assert d["id"] == "uid-2" |
| 228 | assert d["data"] == payload |
| 229 | |
| 230 | def test_to_dict_schema_version_is_string(self) -> None: |
| 231 | ev = WatchEvent("snapshot", "intent", "uid-3", "ts", {}) |
| 232 | assert isinstance(ev.to_dict()["schema_version"], str) |
| 233 | |
| 234 | |
| 235 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 236 | # Unit β _scan_dirs |
| 237 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 238 | |
| 239 | |
| 240 | class TestScanDirs: |
| 241 | def test_empty_coord_dirs_returns_empty_dicts(self, repo: pathlib.Path) -> None: |
| 242 | snap = _scan_dirs(repo) |
| 243 | for sub in ("reservations", "intents", "releases", "heartbeats"): |
| 244 | assert snap[sub] == {} |
| 245 | |
| 246 | def test_missing_subdir_returns_empty(self, tmp_path: pathlib.Path) -> None: |
| 247 | # No .muse/ dir at all. |
| 248 | snap = _scan_dirs(tmp_path) |
| 249 | for sub in ("reservations", "intents", "releases", "heartbeats"): |
| 250 | assert snap[sub] == {} |
| 251 | |
| 252 | def test_file_appears_in_snapshot(self, repo: pathlib.Path) -> None: |
| 253 | path = _coord_dir(repo) / "reservations" / "abc123.json" |
| 254 | path.write_text('{"x": 1}') |
| 255 | snap = _scan_dirs(repo) |
| 256 | assert "abc123" in snap["reservations"] |
| 257 | |
| 258 | def test_snapshot_entry_is_mtime_ns_and_size(self, repo: pathlib.Path) -> None: |
| 259 | path = _coord_dir(repo) / "reservations" / "abc123.json" |
| 260 | path.write_text('{"x": 1}') |
| 261 | snap = _scan_dirs(repo) |
| 262 | mtime_ns, size = snap["reservations"]["abc123"] |
| 263 | st = path.stat() |
| 264 | assert mtime_ns == st.st_mtime_ns |
| 265 | assert size == st.st_size |
| 266 | |
| 267 | def test_non_json_files_ignored(self, repo: pathlib.Path) -> None: |
| 268 | (_coord_dir(repo) / "reservations" / "not-a-json.txt").write_text("hi") |
| 269 | snap = _scan_dirs(repo) |
| 270 | assert snap["reservations"] == {} |
| 271 | |
| 272 | def test_multiple_kinds_all_scanned(self, repo: pathlib.Path) -> None: |
| 273 | (_coord_dir(repo) / "reservations" / "r1.json").write_text("{}") |
| 274 | (_coord_dir(repo) / "intents" / "i1.json").write_text("{}") |
| 275 | snap = _scan_dirs(repo) |
| 276 | assert "r1" in snap["reservations"] |
| 277 | assert "i1" in snap["intents"] |
| 278 | |
| 279 | def test_toctou_deleted_file_skipped(self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: |
| 280 | """File that disappears between glob and stat is silently skipped.""" |
| 281 | real_glob = pathlib.Path.glob |
| 282 | |
| 283 | def patched_glob(self: pathlib.Path, pattern: str) -> list[pathlib.Path]: |
| 284 | results = list(real_glob(self, pattern)) |
| 285 | # Inject a ghost path that does not exist. |
| 286 | ghost = self / "ghost.json" |
| 287 | return results + [ghost] |
| 288 | |
| 289 | monkeypatch.setattr(pathlib.Path, "glob", patched_glob) |
| 290 | snap = _scan_dirs(repo) |
| 291 | assert "ghost" not in snap["reservations"] |
| 292 | |
| 293 | |
| 294 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 295 | # Unit β _diff_snapshots |
| 296 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 297 | |
| 298 | |
| 299 | class TestDiffSnapshots: |
| 300 | def _empty(self) -> _Snapshot: |
| 301 | return {sub: {} for sub in ("reservations", "intents", "releases", "heartbeats")} |
| 302 | |
| 303 | def test_no_change_no_events(self) -> None: |
| 304 | snap = self._empty() |
| 305 | snap["reservations"]["uid1"] = (100, 50) |
| 306 | assert _diff_snapshots(snap, snap) == [] |
| 307 | |
| 308 | def test_added_detected(self) -> None: |
| 309 | old = self._empty() |
| 310 | new = {**old, "reservations": {"uid1": (100, 50)}} |
| 311 | events = _diff_snapshots(old, new) |
| 312 | assert ("added", "reservations", "uid1") in events |
| 313 | |
| 314 | def test_removed_detected(self) -> None: |
| 315 | old = {**self._empty(), "reservations": {"uid1": (100, 50)}} |
| 316 | new = self._empty() |
| 317 | events = _diff_snapshots(old, new) |
| 318 | assert ("removed", "reservations", "uid1") in events |
| 319 | |
| 320 | def test_modified_detected(self) -> None: |
| 321 | snap = {**self._empty(), "reservations": {"uid1": (100, 50)}} |
| 322 | new_snap = {**self._empty(), "reservations": {"uid1": (200, 51)}} |
| 323 | events = _diff_snapshots(snap, new_snap) |
| 324 | assert ("modified", "reservations", "uid1") in events |
| 325 | |
| 326 | def test_same_mtime_different_size_is_modified(self) -> None: |
| 327 | snap = {**self._empty(), "reservations": {"uid1": (100, 50)}} |
| 328 | new_snap = {**self._empty(), "reservations": {"uid1": (100, 99)}} |
| 329 | events = _diff_snapshots(snap, new_snap) |
| 330 | assert ("modified", "reservations", "uid1") in events |
| 331 | |
| 332 | def test_unchanged_mtime_and_size_is_not_modified(self) -> None: |
| 333 | snap = {**self._empty(), "reservations": {"uid1": (100, 50)}} |
| 334 | events = _diff_snapshots(snap, snap) |
| 335 | assert not any(e[0] == "modified" for e in events) |
| 336 | |
| 337 | def test_multiple_kinds_in_one_diff(self) -> None: |
| 338 | old = {**self._empty(), "reservations": {"r1": (1, 10)}} |
| 339 | new = { |
| 340 | "reservations": {}, |
| 341 | "intents": {"i1": (2, 20)}, |
| 342 | "releases": {}, |
| 343 | "heartbeats": {}, |
| 344 | } |
| 345 | events = _diff_snapshots(old, new) |
| 346 | event_types = {(e[0], e[2]) for e in events} |
| 347 | assert ("removed", "r1") in event_types |
| 348 | assert ("added", "i1") in event_types |
| 349 | |
| 350 | def test_output_is_sorted_deterministic(self) -> None: |
| 351 | """IDs within each kind are sorted for reproducible test output.""" |
| 352 | old = self._empty() |
| 353 | new = { |
| 354 | **self._empty(), |
| 355 | "reservations": {"zzz": (1, 1), "aaa": (2, 2), "mmm": (3, 3)}, |
| 356 | } |
| 357 | events = _diff_snapshots(old, new) |
| 358 | added_ids = [uid for et, kind, uid in events if et == "added"] |
| 359 | assert added_ids == sorted(added_ids) |
| 360 | |
| 361 | |
| 362 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 363 | # Unit β _passes_filters |
| 364 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 365 | |
| 366 | |
| 367 | class TestPassesFilters: |
| 368 | def _data(self, run_id: str = "agent-1", branch: str = "main") -> Manifest: |
| 369 | return {"run_id": run_id, "branch": branch} |
| 370 | |
| 371 | def test_no_filters_always_passes(self) -> None: |
| 372 | assert _passes_filters("reservation", self._data(), None, None, None) |
| 373 | |
| 374 | def test_kind_filter_match(self) -> None: |
| 375 | assert _passes_filters("reservation", self._data(), "reservation", None, None) |
| 376 | |
| 377 | def test_kind_filter_no_match(self) -> None: |
| 378 | assert not _passes_filters("intent", self._data(), "reservation", None, None) |
| 379 | |
| 380 | def test_run_id_filter_match(self) -> None: |
| 381 | assert _passes_filters("reservation", self._data(), None, "agent-1", None) |
| 382 | |
| 383 | def test_run_id_filter_no_match(self) -> None: |
| 384 | assert not _passes_filters("reservation", self._data(), None, "agent-9", None) |
| 385 | |
| 386 | def test_branch_filter_match(self) -> None: |
| 387 | assert _passes_filters("reservation", self._data(), None, None, "main") |
| 388 | |
| 389 | def test_branch_filter_no_match(self) -> None: |
| 390 | assert not _passes_filters("reservation", self._data(), None, None, "feature/x") |
| 391 | |
| 392 | def test_all_filters_and_semantics(self) -> None: |
| 393 | """All three filters must pass simultaneously.""" |
| 394 | d = self._data("agent-1", "main") |
| 395 | assert _passes_filters("reservation", d, "reservation", "agent-1", "main") |
| 396 | assert not _passes_filters("reservation", d, "reservation", "agent-1", "other") |
| 397 | |
| 398 | def test_empty_data_fails_run_id_filter(self) -> None: |
| 399 | """Empty data (removed event with no cache) fails run_id/branch filters.""" |
| 400 | assert not _passes_filters("reservation", {}, None, "agent-1", None) |
| 401 | |
| 402 | def test_empty_data_passes_when_no_filters(self) -> None: |
| 403 | assert _passes_filters("reservation", {}, None, None, None) |
| 404 | |
| 405 | |
| 406 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 407 | # Unit β _emit_event JSON mode |
| 408 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 409 | |
| 410 | |
| 411 | class TestEmitEventJson: |
| 412 | def _capture_json(self, ev: WatchEvent) -> MsgpackDict: |
| 413 | buf = io.StringIO() |
| 414 | with redirect_stdout(buf): |
| 415 | _emit_event(ev, as_json=True) |
| 416 | return json.loads(buf.getvalue().strip()) |
| 417 | |
| 418 | def test_json_is_valid_ndjson(self) -> None: |
| 419 | ev = WatchEvent("added", "reservation", "uid1", "2026-01-01T00:00:00+00:00", |
| 420 | {"run_id": "a", "branch": "b"}) |
| 421 | d = self._capture_json(ev) |
| 422 | assert d["event_type"] == "added" |
| 423 | assert d["kind"] == "reservation" |
| 424 | |
| 425 | def test_all_event_types_serialise(self) -> None: |
| 426 | for et in ("snapshot", "added", "modified", "removed", "expired"): |
| 427 | ev = WatchEvent(et, "intent", "uid2", "ts", {}) |
| 428 | d = self._capture_json(ev) |
| 429 | assert d["event_type"] == et |
| 430 | |
| 431 | def test_data_payload_preserved(self) -> None: |
| 432 | payload = {"run_id": "x", "addresses": ["f.py::fn"], "branch": "dev"} |
| 433 | ev = WatchEvent("added", "reservation", "uid3", "ts", payload) |
| 434 | d = self._capture_json(ev) |
| 435 | assert d["data"] == payload |
| 436 | |
| 437 | |
| 438 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 439 | # Unit β _emit_event text mode |
| 440 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 441 | |
| 442 | |
| 443 | class TestEmitEventText: |
| 444 | def _capture_text(self, ev: WatchEvent) -> str: |
| 445 | buf = io.StringIO() |
| 446 | with redirect_stdout(buf): |
| 447 | _emit_event(ev, as_json=False) |
| 448 | return buf.getvalue() |
| 449 | |
| 450 | def test_reservation_icon_and_kind(self) -> None: |
| 451 | ev = WatchEvent("added", "reservation", "a" * 8, "ts", |
| 452 | {"run_id": "r1", "branch": "main", "addresses": ["f.py::fn"]}) |
| 453 | out = self._capture_text(ev) |
| 454 | assert "+" in out |
| 455 | assert "reservation" in out |
| 456 | |
| 457 | def test_snapshot_icon(self) -> None: |
| 458 | ev = WatchEvent("snapshot", "reservation", "a" * 8, "ts", |
| 459 | {"run_id": "r1", "branch": "main", "addresses": ["f.py::fn"]}) |
| 460 | out = self._capture_text(ev) |
| 461 | assert "Β·" in out |
| 462 | |
| 463 | def test_expired_icon(self) -> None: |
| 464 | ev = WatchEvent("expired", "reservations", "a" * 8, "ts", |
| 465 | {"run_id": "r1", "branch": "main", "addresses": ["f.py::fn"]}) |
| 466 | out = self._capture_text(ev) |
| 467 | assert "!" in out |
| 468 | |
| 469 | def test_empty_data_does_not_crash(self) -> None: |
| 470 | ev = WatchEvent("removed", "reservation", "b" * 8, "ts", {}) |
| 471 | out = self._capture_text(ev) |
| 472 | assert "bbbbbbbb" in out |
| 473 | |
| 474 | def test_heartbeat_shows_extends_to(self) -> None: |
| 475 | ev = WatchEvent("modified", "heartbeat", "c" * 8, "ts", |
| 476 | {"run_id": "r1", "reservation_id": "d" * 8, |
| 477 | "extended_expires_at": "2099-01-01T00:00:00+00:00"}) |
| 478 | out = self._capture_text(ev) |
| 479 | assert "extends to" in out |
| 480 | |
| 481 | def test_release_shows_reason(self) -> None: |
| 482 | ev = WatchEvent("added", "release", "e" * 8, "ts", |
| 483 | {"run_id": "r1", "reason": "completed", |
| 484 | "reservation_id": "f" * 8}) |
| 485 | out = self._capture_text(ev) |
| 486 | assert "completed" in out |
| 487 | |
| 488 | def test_intent_shows_operation(self) -> None: |
| 489 | ev = WatchEvent("added", "intent", "g" * 8, "ts", |
| 490 | {"run_id": "r1", "branch": "main", "operation": "delete", |
| 491 | "addresses": ["src/mod.py::fn"]}) |
| 492 | out = self._capture_text(ev) |
| 493 | assert "delete" in out |
| 494 | |
| 495 | def test_ansi_escape_stripped_from_run_id(self) -> None: |
| 496 | """ANSI escape in run_id must not reach terminal output.""" |
| 497 | malicious_run_id = "\x1b[31mmalicious\x1b[0m" |
| 498 | ev = WatchEvent("added", "reservation", "h" * 8, "ts", |
| 499 | {"run_id": malicious_run_id, "branch": "main", |
| 500 | "addresses": ["f.py::fn"]}) |
| 501 | out = self._capture_text(ev) |
| 502 | assert "\x1b" not in out |
| 503 | assert "malicious" in out # Content preserved, escapes stripped. |
| 504 | |
| 505 | def test_long_address_list_truncated(self) -> None: |
| 506 | addrs = [f"f.py::fn{i}" for i in range(10)] |
| 507 | ev = WatchEvent("added", "reservation", "i" * 8, "ts", |
| 508 | {"run_id": "r1", "branch": "main", "addresses": addrs}) |
| 509 | out = self._capture_text(ev) |
| 510 | assert "+7 more" in out |
| 511 | |
| 512 | |
| 513 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 514 | # Unit β _check_expirations |
| 515 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 516 | |
| 517 | |
| 518 | class TestCheckExpirations: |
| 519 | def test_no_change_no_events(self, repo: pathlib.Path) -> None: |
| 520 | res = _make_reservation(repo) |
| 521 | active_ids = {res.reservation_id} |
| 522 | events, curr = _check_expirations(repo, active_ids, set()) |
| 523 | assert events == [] |
| 524 | assert res.reservation_id in curr |
| 525 | |
| 526 | def test_expired_reservation_emits_event(self, repo: pathlib.Path) -> None: |
| 527 | """An ID that was active but is no longer β expired event.""" |
| 528 | gone_id = _new_id() |
| 529 | active_ids = {gone_id} |
| 530 | # Create the file with an explicit past expiry so _load_record can find it |
| 531 | # and active_reservations() correctly excludes it. |
| 532 | past_iso = ( |
| 533 | datetime.datetime.now(datetime.timezone.utc) |
| 534 | - datetime.timedelta(hours=2) |
| 535 | ).isoformat() |
| 536 | now_iso = now_utc_iso() |
| 537 | path = _coord_dir(repo) / "reservations" / f"{gone_id}.json" |
| 538 | path.write_text(json.dumps({ |
| 539 | "reservation_id": gone_id, |
| 540 | "run_id": "r1", |
| 541 | "branch": "main", |
| 542 | "addresses": ["src/mod.py::fn"], |
| 543 | "created_at": now_iso, |
| 544 | "expires_at": past_iso, |
| 545 | "operation": "modify", |
| 546 | "schema_version": "1.0.0", |
| 547 | })) |
| 548 | events, curr = _check_expirations(repo, active_ids, set()) |
| 549 | assert any(e.event_type == "expired" and e.id == gone_id for e in events) |
| 550 | assert gone_id not in curr |
| 551 | |
| 552 | def test_removed_id_not_double_counted(self, repo: pathlib.Path) -> None: |
| 553 | """ID in removed_ids must NOT produce an expired event too.""" |
| 554 | gone_id = _new_id() |
| 555 | events, _ = _check_expirations(repo, {gone_id}, {gone_id}) |
| 556 | assert events == [] |
| 557 | |
| 558 | def test_active_reservation_stays_active(self, repo: pathlib.Path) -> None: |
| 559 | """Live reservation is NOT reported as expired.""" |
| 560 | res = _make_reservation(repo) |
| 561 | prev = {res.reservation_id} |
| 562 | events, curr = _check_expirations(repo, prev, set()) |
| 563 | assert not any(e.event_type == "expired" for e in events) |
| 564 | |
| 565 | def test_returns_current_active_ids(self, repo: pathlib.Path) -> None: |
| 566 | res = _make_reservation(repo) |
| 567 | _, curr = _check_expirations(repo, set(), set()) |
| 568 | assert res.reservation_id in curr |
| 569 | |
| 570 | |
| 571 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 572 | # Unit β _make_event |
| 573 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 574 | |
| 575 | |
| 576 | class TestMakeEvent: |
| 577 | def test_timestamp_is_utc_iso8601(self) -> None: |
| 578 | ev = _make_event("added", "reservation", "uid1", {}) |
| 579 | # Must parse as UTC datetime. |
| 580 | dt = datetime.datetime.fromisoformat(ev.timestamp) |
| 581 | assert dt.tzinfo is not None |
| 582 | |
| 583 | def test_fields_set_correctly(self) -> None: |
| 584 | ev = _make_event("removed", "intent", "uid2", {"x": 1}) |
| 585 | assert ev.event_type == "removed" |
| 586 | assert ev.kind == "intent" |
| 587 | assert ev.id == "uid2" |
| 588 | assert ev.data == {"x": 1} |
| 589 | |
| 590 | |
| 591 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 592 | # Integration β _watch_loop --once |
| 593 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 594 | |
| 595 | |
| 596 | class TestWatchLoopOnce: |
| 597 | def test_empty_repo_no_events(self, repo: pathlib.Path) -> None: |
| 598 | events = _run_loop(repo, once=True) |
| 599 | assert events == [] |
| 600 | |
| 601 | def test_existing_reservation_emits_snapshot(self, repo: pathlib.Path) -> None: |
| 602 | _make_reservation(repo, run_id="agent-1") |
| 603 | events = _run_loop(repo, once=True) |
| 604 | kinds = [e["kind"] for e in events] |
| 605 | assert "reservations" in kinds |
| 606 | |
| 607 | def test_snapshot_event_type(self, repo: pathlib.Path) -> None: |
| 608 | _make_reservation(repo) |
| 609 | events = _run_loop(repo, once=True) |
| 610 | assert all(e["event_type"] == "snapshot" for e in events) |
| 611 | |
| 612 | def test_all_kinds_emitted_in_snapshot(self, repo: pathlib.Path) -> None: |
| 613 | res = _make_reservation(repo) |
| 614 | create_intent(repo, res.reservation_id, "agent-1", "main", |
| 615 | ["src/mod.py::fn"], "modify") |
| 616 | events = _run_loop(repo, once=True) |
| 617 | kinds = {e["kind"] for e in events} |
| 618 | assert "reservations" in kinds |
| 619 | assert "intents" in kinds |
| 620 | |
| 621 | def test_snapshot_carries_data(self, repo: pathlib.Path) -> None: |
| 622 | _make_reservation(repo, run_id="agent-snapshot-test") |
| 623 | events = _run_loop(repo, once=True) |
| 624 | res_events = [e for e in events if e["kind"] == "reservations"] |
| 625 | assert any(e["data"].get("run_id") == "agent-snapshot-test" for e in res_events) |
| 626 | |
| 627 | def test_once_does_not_loop(self, repo: pathlib.Path) -> None: |
| 628 | """--once must return quickly (no blocking wait calls).""" |
| 629 | start = time.monotonic() |
| 630 | _run_loop(repo, once=True) |
| 631 | elapsed = time.monotonic() - start |
| 632 | assert elapsed < 1.0 # Must not block for poll_interval. |
| 633 | |
| 634 | |
| 635 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 636 | # Integration β filters in _watch_loop |
| 637 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 638 | |
| 639 | |
| 640 | class TestWatchLoopFilters: |
| 641 | def test_kind_filter_reservations(self, repo: pathlib.Path) -> None: |
| 642 | res = _make_reservation(repo) |
| 643 | create_intent(repo, res.reservation_id, "agent-1", "main", |
| 644 | ["src/mod.py::fn"], "modify") |
| 645 | events = _run_loop(repo, once=True, kind_filter="reservations") |
| 646 | assert all(e["kind"] == "reservations" for e in events) |
| 647 | |
| 648 | def test_kind_filter_intents(self, repo: pathlib.Path) -> None: |
| 649 | res = _make_reservation(repo) |
| 650 | create_intent(repo, res.reservation_id, "agent-1", "main", |
| 651 | ["src/mod.py::fn"], "modify") |
| 652 | events = _run_loop(repo, once=True, kind_filter="intents") |
| 653 | assert all(e["kind"] == "intents" for e in events) |
| 654 | |
| 655 | def test_run_id_filter(self, repo: pathlib.Path) -> None: |
| 656 | _make_reservation(repo, run_id="agent-A") |
| 657 | _make_reservation(repo, run_id="agent-B") |
| 658 | events = _run_loop(repo, once=True, run_id_filter="agent-A") |
| 659 | assert all(e["data"].get("run_id") == "agent-A" for e in events) |
| 660 | assert len(events) == 1 |
| 661 | |
| 662 | def test_branch_filter(self, repo: pathlib.Path) -> None: |
| 663 | _make_reservation(repo, branch="main") |
| 664 | _make_reservation(repo, branch="feature/x") |
| 665 | events = _run_loop(repo, once=True, branch_filter="main") |
| 666 | assert all(e["data"].get("branch") == "main" for e in events) |
| 667 | assert len(events) == 1 |
| 668 | |
| 669 | def test_combined_filters(self, repo: pathlib.Path) -> None: |
| 670 | _make_reservation(repo, run_id="agent-A", branch="main") |
| 671 | _make_reservation(repo, run_id="agent-A", branch="feature/x") |
| 672 | _make_reservation(repo, run_id="agent-B", branch="main") |
| 673 | events = _run_loop(repo, once=True, run_id_filter="agent-A", branch_filter="main") |
| 674 | assert len(events) == 1 |
| 675 | assert events[0]["data"]["run_id"] == "agent-A" |
| 676 | assert events[0]["data"]["branch"] == "main" |
| 677 | |
| 678 | def test_no_match_no_events(self, repo: pathlib.Path) -> None: |
| 679 | _make_reservation(repo, run_id="agent-X") |
| 680 | events = _run_loop(repo, once=True, run_id_filter="agent-NOBODY") |
| 681 | assert events == [] |
| 682 | |
| 683 | |
| 684 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 685 | # Integration β added events |
| 686 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 687 | |
| 688 | |
| 689 | class TestWatchLoopDetectsAdded: |
| 690 | def test_new_reservation_emits_added(self, repo: pathlib.Path) -> None: |
| 691 | added = [] |
| 692 | |
| 693 | def _write_res() -> None: |
| 694 | _make_reservation(repo, run_id="agent-new") |
| 695 | |
| 696 | events = _run_loop(repo, side_effect=_write_res, once=False) |
| 697 | added = [e for e in events if e["event_type"] == "added"] |
| 698 | assert len(added) == 1 |
| 699 | assert added[0]["kind"] == "reservations" |
| 700 | assert added[0]["data"]["run_id"] == "agent-new" |
| 701 | |
| 702 | def test_new_intent_emits_added(self, repo: pathlib.Path) -> None: |
| 703 | res = _make_reservation(repo) |
| 704 | |
| 705 | def _write_intent() -> None: |
| 706 | create_intent(repo, res.reservation_id, "agent-1", "main", |
| 707 | ["src/mod.py::fn"], "modify") |
| 708 | |
| 709 | events = _run_loop(repo, side_effect=_write_intent, once=False) |
| 710 | added = [e for e in events if e["event_type"] == "added" and e["kind"] == "intents"] |
| 711 | assert len(added) == 1 |
| 712 | |
| 713 | def test_added_event_carries_data(self, repo: pathlib.Path) -> None: |
| 714 | def _write_res() -> None: |
| 715 | _make_reservation(repo, run_id="agent-data-test", branch="feature/y") |
| 716 | |
| 717 | events = _run_loop(repo, side_effect=_write_res, once=False) |
| 718 | added = [e for e in events if e["event_type"] == "added"] |
| 719 | assert added[0]["data"]["run_id"] == "agent-data-test" |
| 720 | assert added[0]["data"]["branch"] == "feature/y" |
| 721 | |
| 722 | |
| 723 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 724 | # Integration β modified events |
| 725 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 726 | |
| 727 | |
| 728 | class TestWatchLoopDetectsModified: |
| 729 | def test_heartbeat_update_emits_modified(self, repo: pathlib.Path) -> None: |
| 730 | res = _make_reservation(repo) |
| 731 | # Write an initial heartbeat so the file exists in the snapshot. |
| 732 | create_heartbeat(repo, res.reservation_id, "agent-1", extension_seconds=100) |
| 733 | |
| 734 | def _update_hb() -> None: |
| 735 | # Small sleep ensures mtime changes on fast filesystems. |
| 736 | time.sleep(0.02) |
| 737 | create_heartbeat(repo, res.reservation_id, "agent-1", extension_seconds=200) |
| 738 | |
| 739 | events = _run_loop(repo, side_effect=_update_hb, once=False) |
| 740 | modified = [e for e in events if e["event_type"] == "modified" and e["kind"] == "heartbeats"] |
| 741 | assert len(modified) == 1 |
| 742 | |
| 743 | def test_modified_event_carries_new_data(self, repo: pathlib.Path) -> None: |
| 744 | res = _make_reservation(repo) |
| 745 | create_heartbeat(repo, res.reservation_id, "agent-1", extension_seconds=100) |
| 746 | |
| 747 | def _update_hb() -> None: |
| 748 | time.sleep(0.02) |
| 749 | create_heartbeat(repo, res.reservation_id, "agent-1", extension_seconds=9999) |
| 750 | |
| 751 | events = _run_loop(repo, side_effect=_update_hb, once=False) |
| 752 | modified = [e for e in events if e["event_type"] == "modified"] |
| 753 | if modified: |
| 754 | # Data should reflect the new heartbeat. |
| 755 | assert "extended_expires_at" in modified[0]["data"] |
| 756 | |
| 757 | |
| 758 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 759 | # Integration β removed events |
| 760 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 761 | |
| 762 | |
| 763 | class TestWatchLoopDetectsRemoved: |
| 764 | def test_deleted_file_emits_removed(self, repo: pathlib.Path) -> None: |
| 765 | res = _make_reservation(repo) |
| 766 | # Snapshot includes the reservation. |
| 767 | path = _coord_dir(repo) / "reservations" / f"{res.reservation_id}.json" |
| 768 | |
| 769 | def _delete_file() -> None: |
| 770 | path.unlink() |
| 771 | |
| 772 | events = _run_loop(repo, side_effect=_delete_file, once=False) |
| 773 | removed = [e for e in events if e["event_type"] == "removed"] |
| 774 | assert len(removed) == 1 |
| 775 | assert removed[0]["id"] == res.reservation_id |
| 776 | |
| 777 | def test_removed_event_carries_cached_data(self, repo: pathlib.Path) -> None: |
| 778 | """Even after deletion, removed events carry the last-known data.""" |
| 779 | res = _make_reservation(repo, run_id="agent-cached") |
| 780 | path = _coord_dir(repo) / "reservations" / f"{res.reservation_id}.json" |
| 781 | |
| 782 | def _delete_file() -> None: |
| 783 | path.unlink() |
| 784 | |
| 785 | events = _run_loop(repo, side_effect=_delete_file, once=False) |
| 786 | removed = [e for e in events if e["event_type"] == "removed"] |
| 787 | assert removed[0]["data"].get("run_id") == "agent-cached" |
| 788 | |
| 789 | def test_removed_event_passes_run_id_filter(self, repo: pathlib.Path) -> None: |
| 790 | """Filter by run_id must work for removed events using cached data.""" |
| 791 | res = _make_reservation(repo, run_id="agent-filter-test") |
| 792 | path = _coord_dir(repo) / "reservations" / f"{res.reservation_id}.json" |
| 793 | |
| 794 | def _delete_file() -> None: |
| 795 | path.unlink() |
| 796 | |
| 797 | events = _run_loop( |
| 798 | repo, side_effect=_delete_file, once=False, |
| 799 | run_id_filter="agent-filter-test", |
| 800 | ) |
| 801 | removed = [e for e in events if e["event_type"] == "removed"] |
| 802 | assert len(removed) == 1 |
| 803 | |
| 804 | |
| 805 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 806 | # Integration β expiration events |
| 807 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 808 | |
| 809 | |
| 810 | class TestWatchLoopDetectsExpiry: |
| 811 | def test_expired_reservation_emits_expired(self, repo: pathlib.Path) -> None: |
| 812 | """Reservation active at startup that expires during loop β expired event. |
| 813 | |
| 814 | Strategy: create an active reservation, then the side_effect rewrites the |
| 815 | file with expires_at in the past. The loop sees it as modified on the FS |
| 816 | AND sees it drop out of active_reservations() β fires expired event. |
| 817 | """ |
| 818 | res = _make_reservation(repo, ttl_seconds=3600) |
| 819 | path = _coord_dir(repo) / "reservations" / f"{res.reservation_id}.json" |
| 820 | |
| 821 | def _expire_it() -> None: |
| 822 | data = json.loads(path.read_text()) |
| 823 | past = ( |
| 824 | datetime.datetime.now(datetime.timezone.utc) |
| 825 | - datetime.timedelta(hours=2) |
| 826 | ).isoformat() |
| 827 | data["expires_at"] = past |
| 828 | path.write_text(json.dumps(data)) |
| 829 | |
| 830 | events = _run_loop(repo, side_effect=_expire_it, once=False) |
| 831 | expired = [e for e in events if e["event_type"] == "expired"] |
| 832 | assert any(e["id"] == res.reservation_id for e in expired) |
| 833 | |
| 834 | def test_active_reservation_no_expired_event(self, repo: pathlib.Path) -> None: |
| 835 | res = _make_reservation(repo, ttl_seconds=9999) |
| 836 | events = _run_loop(repo, once=False) |
| 837 | expired = [e for e in events if e["event_type"] == "expired"] |
| 838 | assert not any(e["id"] == res.reservation_id for e in expired) |
| 839 | |
| 840 | def test_removed_reservation_not_expired(self, repo: pathlib.Path) -> None: |
| 841 | """GC'd reservation (file deleted) must not also fire expired. |
| 842 | |
| 843 | Strategy: create an active reservation, then the side_effect deletes it |
| 844 | (simulating GC). Must see a removed event but NOT an additional expired |
| 845 | event for the same ID. |
| 846 | """ |
| 847 | res = _make_reservation(repo, ttl_seconds=3600) |
| 848 | path = _coord_dir(repo) / "reservations" / f"{res.reservation_id}.json" |
| 849 | |
| 850 | def _delete_file() -> None: |
| 851 | path.unlink() |
| 852 | |
| 853 | events = _run_loop(repo, side_effect=_delete_file, once=False) |
| 854 | removed = [e for e in events if e["event_type"] == "removed" and e["id"] == res.reservation_id] |
| 855 | expired_dup = [ |
| 856 | e for e in events |
| 857 | if e["event_type"] == "expired" and e["id"] == res.reservation_id |
| 858 | ] |
| 859 | # Must see a removed event, must NOT also see expired for the same ID. |
| 860 | assert removed |
| 861 | assert not expired_dup |
| 862 | |
| 863 | |
| 864 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 865 | # Integration β timeout |
| 866 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 867 | |
| 868 | |
| 869 | class TestWatchLoopTimeout: |
| 870 | def test_timeout_zero_equivalent_to_once(self, repo: pathlib.Path) -> None: |
| 871 | """timeout=0 should exit after snapshot, no looping.""" |
| 872 | _make_reservation(repo) |
| 873 | start = time.monotonic() |
| 874 | events = _run_loop(repo, once=False, timeout=0.0) |
| 875 | elapsed = time.monotonic() - start |
| 876 | # Should return quickly (snapshot only). |
| 877 | assert elapsed < 1.0 |
| 878 | # Should still emit the snapshot. |
| 879 | assert any(e["event_type"] == "snapshot" for e in events) |
| 880 | |
| 881 | def test_loop_exits_after_timeout(self, repo: pathlib.Path) -> None: |
| 882 | """With a real PollingBackend and short timeout, loop exits.""" |
| 883 | buf = io.StringIO() |
| 884 | backend = _PollingBackend(0.05) |
| 885 | start = time.monotonic() |
| 886 | with redirect_stdout(buf): |
| 887 | _watch_loop( |
| 888 | repo, backend, |
| 889 | kind_filter=None, run_id_filter=None, branch_filter=None, |
| 890 | as_json=True, once=False, timeout=0.2, poll_interval=0.05, |
| 891 | ) |
| 892 | elapsed = time.monotonic() - start |
| 893 | assert elapsed < 2.0 # Must not run forever. |
| 894 | |
| 895 | |
| 896 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 897 | # Integration β JSON output |
| 898 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 899 | |
| 900 | |
| 901 | class TestWatchLoopJsonOutput: |
| 902 | def test_all_snapshot_events_are_valid_json(self, repo: pathlib.Path) -> None: |
| 903 | _make_reservation(repo) |
| 904 | events = _run_loop(repo, once=True, as_json=True) |
| 905 | for ev in events: |
| 906 | # Already parsed by _run_loop, so this just checks structure. |
| 907 | assert "schema_version" in ev |
| 908 | assert "event_type" in ev |
| 909 | assert "kind" in ev |
| 910 | assert "id" in ev |
| 911 | assert "timestamp" in ev |
| 912 | assert "data" in ev |
| 913 | |
| 914 | def test_json_event_type_values(self, repo: pathlib.Path) -> None: |
| 915 | _make_reservation(repo) |
| 916 | events = _run_loop(repo, once=True) |
| 917 | valid_types = {"snapshot", "added", "modified", "removed", "expired"} |
| 918 | for ev in events: |
| 919 | assert ev["event_type"] in valid_types |
| 920 | |
| 921 | |
| 922 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 923 | # End-to-end β run() with mocked require_repo |
| 924 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 925 | |
| 926 | |
| 927 | class TestRunCommand: |
| 928 | def _make_args(self, repo: pathlib.Path, **kwargs: MsgpackValue) -> argparse.Namespace: |
| 929 | import argparse |
| 930 | ns = argparse.Namespace() |
| 931 | ns.once = kwargs.get("once", True) |
| 932 | ns.timeout = kwargs.get("timeout", None) |
| 933 | ns.max_events = kwargs.get("max_events", None) |
| 934 | ns.poll_interval = kwargs.get("poll_interval", 0.1) |
| 935 | ns.run_id = kwargs.get("run_id", None) |
| 936 | ns.branch_filter = kwargs.get("branch_filter", None) |
| 937 | ns.kind = kwargs.get("kind", None) |
| 938 | ns.json_out = kwargs.get("json_out", True) |
| 939 | return ns |
| 940 | |
| 941 | def test_run_once_emits_snapshot(self, repo: pathlib.Path) -> None: |
| 942 | from muse.cli.commands.watch_coord import run |
| 943 | _make_reservation(repo) |
| 944 | args = self._make_args(repo, once=True) |
| 945 | buf = io.StringIO() |
| 946 | with ( |
| 947 | patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), |
| 948 | redirect_stdout(buf), |
| 949 | ): |
| 950 | run(args) |
| 951 | lines = [l for l in buf.getvalue().splitlines() if l.strip()] |
| 952 | events = [json.loads(l) for l in lines] |
| 953 | assert any(e["event_type"] == "snapshot" for e in events) |
| 954 | |
| 955 | def test_run_text_mode_header_printed(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 956 | from muse.cli.commands.watch_coord import run |
| 957 | args = self._make_args(repo, once=True, json_out=False) |
| 958 | with patch("muse.cli.commands.watch_coord.require_repo", return_value=repo): |
| 959 | run(args) |
| 960 | captured = capsys.readouterr() |
| 961 | assert "muse coord watch" in captured.out |
| 962 | assert "watch ended" in captured.out |
| 963 | |
| 964 | def test_run_json_mode_no_header(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 965 | from muse.cli.commands.watch_coord import run |
| 966 | args = self._make_args(repo, once=True, json_out=True) |
| 967 | with patch("muse.cli.commands.watch_coord.require_repo", return_value=repo): |
| 968 | run(args) |
| 969 | captured = capsys.readouterr() |
| 970 | lines = [l for l in captured.out.splitlines() if l.strip()] |
| 971 | for line in lines: |
| 972 | # Every non-empty line must be valid JSON. |
| 973 | json.loads(line) |
| 974 | |
| 975 | def test_run_timeout_zero_treated_as_once(self, repo: pathlib.Path) -> None: |
| 976 | """--timeout 0 must not block.""" |
| 977 | from muse.cli.commands.watch_coord import run |
| 978 | args = self._make_args(repo, once=False, timeout=0.0, json_out=True) |
| 979 | start = time.monotonic() |
| 980 | buf = io.StringIO() |
| 981 | with ( |
| 982 | patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), |
| 983 | redirect_stdout(buf), |
| 984 | ): |
| 985 | run(args) |
| 986 | assert time.monotonic() - start < 1.0 |
| 987 | |
| 988 | |
| 989 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 990 | # End-to-end β backend selection |
| 991 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 992 | |
| 993 | |
| 994 | class TestBackendSelection: |
| 995 | def test_polling_backend_wait_sleeps(self) -> None: |
| 996 | b = _PollingBackend(0.05) |
| 997 | start = time.monotonic() |
| 998 | result = b.wait(0.1) |
| 999 | elapsed = time.monotonic() - start |
| 1000 | assert result is True |
| 1001 | assert elapsed >= 0.04 # Should have slept. |
| 1002 | b.close() |
| 1003 | |
| 1004 | def test_polling_backend_respects_timeout_cap(self) -> None: |
| 1005 | """wait(timeout) must not sleep longer than timeout.""" |
| 1006 | b = _PollingBackend(10.0) # Long interval. |
| 1007 | start = time.monotonic() |
| 1008 | b.wait(0.05) # Short timeout. |
| 1009 | elapsed = time.monotonic() - start |
| 1010 | assert elapsed < 1.0 # Capped at 0.05 s. |
| 1011 | b.close() |
| 1012 | |
| 1013 | def test_polling_backend_close_is_idempotent(self) -> None: |
| 1014 | b = _PollingBackend(1.0) |
| 1015 | b.close() |
| 1016 | b.close() # Should not raise. |
| 1017 | |
| 1018 | @pytest.mark.skipif( |
| 1019 | not hasattr(__import__("select"), "kqueue"), |
| 1020 | reason="kqueue not available on this platform", |
| 1021 | ) |
| 1022 | def test_kqueue_backend_initialises(self, repo: pathlib.Path) -> None: |
| 1023 | """kqueue backend can be created and closed without error.""" |
| 1024 | from muse.cli.commands.watch_coord import _SUBDIRS |
| 1025 | dirs = [_coord_dir(repo) / sub for sub in _SUBDIRS] |
| 1026 | b = _KqueueBackend(dirs) |
| 1027 | assert b.name == "kqueue" |
| 1028 | b.close() |
| 1029 | |
| 1030 | @pytest.mark.skipif( |
| 1031 | not hasattr(__import__("select"), "kqueue"), |
| 1032 | reason="kqueue not available on this platform", |
| 1033 | ) |
| 1034 | def test_kqueue_backend_detects_new_file(self, repo: pathlib.Path) -> None: |
| 1035 | """kqueue wakes when a new JSON file is added to a watched dir.""" |
| 1036 | from muse.cli.commands.watch_coord import _SUBDIRS |
| 1037 | dirs = [_coord_dir(repo) / sub for sub in _SUBDIRS] |
| 1038 | b = _KqueueBackend(dirs) |
| 1039 | try: |
| 1040 | # No change β should time out. |
| 1041 | result_before = b.wait(0.05) |
| 1042 | # Add a file. |
| 1043 | (_coord_dir(repo) / "reservations" / "newfile.json").write_text("{}") |
| 1044 | # Should detect change. |
| 1045 | result_after = b.wait(0.5) |
| 1046 | assert result_after is True |
| 1047 | finally: |
| 1048 | b.close() |
| 1049 | |
| 1050 | @pytest.mark.skipif( |
| 1051 | not hasattr(__import__("select"), "kqueue"), |
| 1052 | reason="kqueue not available on this platform", |
| 1053 | ) |
| 1054 | def test_kqueue_backend_close_releases_fds(self, repo: pathlib.Path) -> None: |
| 1055 | """All fds must be released after close.""" |
| 1056 | import resource |
| 1057 | from muse.cli.commands.watch_coord import _SUBDIRS |
| 1058 | dirs = [_coord_dir(repo) / sub for sub in _SUBDIRS] |
| 1059 | fds_before = resource.getrlimit(resource.RLIMIT_NOFILE)[0] |
| 1060 | b = _KqueueBackend(dirs) |
| 1061 | b.close() |
| 1062 | # Just ensure no exception β fd leak would only show in long-running tests. |
| 1063 | |
| 1064 | |
| 1065 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 1066 | # Security tests |
| 1067 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 1068 | |
| 1069 | |
| 1070 | class TestAnsiInjectionSanitized: |
| 1071 | def test_ansi_in_run_id_stripped_text_output(self) -> None: |
| 1072 | """ESC sequences in run_id must not reach stdout in text mode.""" |
| 1073 | malicious = "\x1b[1;31mROOT\x1b[0m" |
| 1074 | ev = WatchEvent("added", "reservation", "a" * 8, "ts", |
| 1075 | {"run_id": malicious, "branch": "main", "addresses": ["f.py::fn"]}) |
| 1076 | buf = io.StringIO() |
| 1077 | with redirect_stdout(buf): |
| 1078 | _emit_event(ev, as_json=False) |
| 1079 | assert "\x1b" not in buf.getvalue() |
| 1080 | assert "ROOT" in buf.getvalue() |
| 1081 | |
| 1082 | def test_ansi_in_branch_stripped(self) -> None: |
| 1083 | malicious_branch = "\x1b[4mmaster\x1b[0m" |
| 1084 | ev = WatchEvent("added", "reservation", "b" * 8, "ts", |
| 1085 | {"run_id": "agent", "branch": malicious_branch, "addresses": ["f.py::fn"]}) |
| 1086 | buf = io.StringIO() |
| 1087 | with redirect_stdout(buf): |
| 1088 | _emit_event(ev, as_json=False) |
| 1089 | assert "\x1b" not in buf.getvalue() |
| 1090 | |
| 1091 | def test_ansi_in_address_stripped(self) -> None: |
| 1092 | malicious_addr = "\x1b[32msrc/malicious.py::inject\x1b[0m" |
| 1093 | ev = WatchEvent("added", "reservation", "c" * 8, "ts", |
| 1094 | {"run_id": "agent", "branch": "main", "addresses": [malicious_addr]}) |
| 1095 | buf = io.StringIO() |
| 1096 | with redirect_stdout(buf): |
| 1097 | _emit_event(ev, as_json=False) |
| 1098 | assert "\x1b" not in buf.getvalue() |
| 1099 | |
| 1100 | def test_json_output_preserves_raw_data(self) -> None: |
| 1101 | """JSON output must preserve raw data as-is (no sanitisation).""" |
| 1102 | malicious = "\x1b[31mmalicious\x1b[0m" |
| 1103 | ev = WatchEvent("added", "reservation", "d" * 8, "ts", |
| 1104 | {"run_id": malicious, "branch": "main", "addresses": ["f.py::fn"]}) |
| 1105 | buf = io.StringIO() |
| 1106 | with redirect_stdout(buf): |
| 1107 | _emit_event(ev, as_json=True) |
| 1108 | parsed = json.loads(buf.getvalue()) |
| 1109 | assert parsed["data"]["run_id"] == malicious # Stored as-is in JSON. |
| 1110 | |
| 1111 | |
| 1112 | class TestSymlinkDirRejected: |
| 1113 | @pytest.mark.skipif( |
| 1114 | not hasattr(__import__("select"), "kqueue"), |
| 1115 | reason="kqueue not available on this platform", |
| 1116 | ) |
| 1117 | def test_symlinked_coord_dir_raises_valueerror(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None: |
| 1118 | """kqueue backend must refuse to watch a symlinked directory.""" |
| 1119 | import shutil |
| 1120 | # Create a real target dir for the symlink to point at. |
| 1121 | real_dir = tmp_path / "real_target" |
| 1122 | real_dir.mkdir() |
| 1123 | link_dir = _coord_dir(repo) / "reservations" |
| 1124 | # Remove the existing real directory before replacing with a symlink. |
| 1125 | shutil.rmtree(link_dir, ignore_errors=True) |
| 1126 | link_dir.symlink_to(real_dir) |
| 1127 | try: |
| 1128 | dirs = [link_dir] |
| 1129 | with pytest.raises(ValueError, match="symlink"): |
| 1130 | _KqueueBackend(dirs) |
| 1131 | finally: |
| 1132 | link_dir.unlink(missing_ok=True) |
| 1133 | # Restore the real dir for other tests. |
| 1134 | link_dir.mkdir(exist_ok=True) |
| 1135 | |
| 1136 | |
| 1137 | class TestKindFilterAllowlist: |
| 1138 | def test_valid_kinds_accepted(self) -> None: |
| 1139 | for kind in ("reservations", "intents", "releases", "heartbeats"): |
| 1140 | assert _passes_filters(kind, {}, kind, None, None) |
| 1141 | |
| 1142 | def test_arbitrary_kind_string_rejected_by_filter(self) -> None: |
| 1143 | """_passes_filters rejects unrecognised kinds when kind_filter is set.""" |
| 1144 | assert not _passes_filters("../etc", {}, "reservations", None, None) |
| 1145 | |
| 1146 | def test_argparse_rejects_invalid_kind(self) -> None: |
| 1147 | """argparse choices validation rejects invalid --kind values.""" |
| 1148 | import argparse |
| 1149 | from muse.cli.commands.watch_coord import register |
| 1150 | p = argparse.ArgumentParser() |
| 1151 | subs = p.add_subparsers() |
| 1152 | register(subs) |
| 1153 | with pytest.raises(SystemExit): |
| 1154 | p.parse_args(["watch", "--kind", "invalid_kind"]) |
| 1155 | |
| 1156 | |
| 1157 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 1158 | # Stress tests |
| 1159 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 1160 | |
| 1161 | |
| 1162 | class TestStress500RapidAdds: |
| 1163 | def test_500_reservations_all_detected(self, repo: pathlib.Path) -> None: |
| 1164 | """500 pre-existing reservations must all appear in the snapshot.""" |
| 1165 | # Write 500 reservation files directly (bypass create_reservation for speed). |
| 1166 | res_dir = _coord_dir(repo) / "reservations" |
| 1167 | now_iso = now_utc_iso() |
| 1168 | exp_iso = ( |
| 1169 | datetime.datetime.now(datetime.timezone.utc) |
| 1170 | + datetime.timedelta(hours=1) |
| 1171 | ).isoformat() |
| 1172 | ids = [] |
| 1173 | for i in range(500): |
| 1174 | uid = _new_id() |
| 1175 | ids.append(uid) |
| 1176 | data = { |
| 1177 | "reservation_id": uid, |
| 1178 | "run_id": f"agent-{i}", |
| 1179 | "branch": "main", |
| 1180 | "addresses": [f"src/mod{i}.py::fn"], |
| 1181 | "created_at": now_iso, |
| 1182 | "expires_at": exp_iso, |
| 1183 | "operation": "modify", |
| 1184 | "schema_version": "1.0.0", |
| 1185 | } |
| 1186 | (res_dir / f"{uid}.json").write_text(json.dumps(data)) |
| 1187 | |
| 1188 | start = time.monotonic() |
| 1189 | events = _run_loop(repo, once=True) |
| 1190 | elapsed = time.monotonic() - start |
| 1191 | |
| 1192 | snapshot_events = [e for e in events if e["event_type"] == "snapshot"] |
| 1193 | snapshot_ids = {e["id"] for e in snapshot_events} |
| 1194 | assert snapshot_ids == set(ids) |
| 1195 | assert elapsed < 3.0, f"500 snapshot took {elapsed:.2f}s (limit: 3s)" |
| 1196 | |
| 1197 | |
| 1198 | class TestStressLargeSnapshot: |
| 1199 | def test_1000_records_snapshot_under_1s(self, repo: pathlib.Path) -> None: |
| 1200 | """Snapshot of 1000 mixed records must complete in under 1 second.""" |
| 1201 | now_iso = now_utc_iso() |
| 1202 | exp_iso = ( |
| 1203 | datetime.datetime.now(datetime.timezone.utc) |
| 1204 | + datetime.timedelta(hours=1) |
| 1205 | ).isoformat() |
| 1206 | for i in range(250): |
| 1207 | for kind in ("reservations", "intents", "releases", "heartbeats"): |
| 1208 | uid = _new_id() |
| 1209 | (_coord_dir(repo) / kind / f"{uid}.json").write_text( |
| 1210 | json.dumps({"id": uid, "run_id": f"agent-{i}", "branch": "main", |
| 1211 | "created_at": now_iso, "expires_at": exp_iso}) |
| 1212 | ) |
| 1213 | start = time.monotonic() |
| 1214 | snap = _scan_dirs(repo) |
| 1215 | elapsed = time.monotonic() - start |
| 1216 | total = sum(len(v) for v in snap.values()) |
| 1217 | assert total == 1000 |
| 1218 | assert elapsed < 1.0, f"scan of 1000 records took {elapsed:.3f}s" |
| 1219 | |
| 1220 | |
| 1221 | class TestStressDiffPerformance: |
| 1222 | def test_diff_1000_entries_under_50ms(self) -> None: |
| 1223 | """Diffing two 1000-entry snapshots must take < 50 ms.""" |
| 1224 | old: _Snapshot = {sub: {} for sub in ("reservations", "intents", "releases", "heartbeats")} |
| 1225 | new: _Snapshot = {sub: {} for sub in ("reservations", "intents", "releases", "heartbeats")} |
| 1226 | for i in range(250): |
| 1227 | for sub in ("reservations", "intents", "releases", "heartbeats"): |
| 1228 | uid = _new_id() |
| 1229 | old[sub][uid] = (i * 1000, i * 10) |
| 1230 | new[sub][uid] = (i * 1000, i * 10) |
| 1231 | # Add 10 new entries to make the diff non-trivial. |
| 1232 | for _ in range(10): |
| 1233 | new["reservations"][_new_id()] = (999999, 99) |
| 1234 | |
| 1235 | start = time.monotonic() |
| 1236 | for _ in range(100): # 100 diff calls. |
| 1237 | _diff_snapshots(old, new) |
| 1238 | elapsed = time.monotonic() - start |
| 1239 | assert elapsed < 5.0, f"100 Γ diff took {elapsed:.3f}s (limit: 5s)" |
| 1240 | |
| 1241 | |
| 1242 | class TestStressManyExpirations: |
| 1243 | def test_200_expired_reservations_detected(self, repo: pathlib.Path) -> None: |
| 1244 | """200 expired reservations must all emit expired events.""" |
| 1245 | res_dir = _coord_dir(repo) / "reservations" |
| 1246 | past_iso = ( |
| 1247 | datetime.datetime.now(datetime.timezone.utc) |
| 1248 | - datetime.timedelta(hours=1) |
| 1249 | ).isoformat() |
| 1250 | now_iso = now_utc_iso() |
| 1251 | ids = set() |
| 1252 | for i in range(200): |
| 1253 | uid = _new_id() |
| 1254 | ids.add(uid) |
| 1255 | data = { |
| 1256 | "reservation_id": uid, |
| 1257 | "run_id": f"agent-{i}", |
| 1258 | "branch": "main", |
| 1259 | "addresses": [f"src/m{i}.py::fn"], |
| 1260 | "created_at": now_iso, |
| 1261 | "expires_at": past_iso, # Already expired. |
| 1262 | "operation": "modify", |
| 1263 | "schema_version": "1.0.0", |
| 1264 | } |
| 1265 | (res_dir / f"{uid}.json").write_text(json.dumps(data)) |
| 1266 | |
| 1267 | # prev_active_ids claims all 200 were active. |
| 1268 | start = time.monotonic() |
| 1269 | exp_events, curr = _check_expirations(repo, ids, set()) |
| 1270 | elapsed = time.monotonic() - start |
| 1271 | |
| 1272 | expired_ids = {e.id for e in exp_events} |
| 1273 | assert expired_ids == ids |
| 1274 | assert elapsed < 2.0, f"200 expiration checks took {elapsed:.3f}s (limit: 2s)" |
| 1275 | |
| 1276 | |
| 1277 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 1278 | # Integration β _watch_loop with max_events |
| 1279 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 1280 | |
| 1281 | |
| 1282 | class TestWatchLoopMaxEvents: |
| 1283 | def test_max_events_1_returns_single_snapshot(self, repo: pathlib.Path) -> None: |
| 1284 | """max_events=1 must emit exactly 1 event even if more exist.""" |
| 1285 | for i in range(5): |
| 1286 | _make_reservation(repo, run_id=f"ag-{i}") |
| 1287 | events = _run_loop(repo, once=True, max_events=1) |
| 1288 | assert len(events) == 1 |
| 1289 | |
| 1290 | def test_max_events_3_caps_at_3(self, repo: pathlib.Path) -> None: |
| 1291 | for i in range(10): |
| 1292 | _make_reservation(repo, run_id=f"ag-{i}") |
| 1293 | events = _run_loop(repo, once=True, max_events=3) |
| 1294 | assert len(events) == 3 |
| 1295 | |
| 1296 | def test_max_events_larger_than_existing_returns_all(self, repo: pathlib.Path) -> None: |
| 1297 | for i in range(4): |
| 1298 | _make_reservation(repo, run_id=f"ag-{i}") |
| 1299 | events = _run_loop(repo, once=True, max_events=100) |
| 1300 | assert len(events) == 4 |
| 1301 | |
| 1302 | def test_max_events_zero_is_rejected_by_run(self, repo: pathlib.Path) -> None: |
| 1303 | """run() must reject max_events=0 with USER_ERROR.""" |
| 1304 | from muse.cli.commands.watch_coord import run as watch_run |
| 1305 | ns = argparse.Namespace( |
| 1306 | once=True, timeout=None, max_events=0, |
| 1307 | poll_interval=0.1, run_id=None, |
| 1308 | branch_filter=None, kind=None, json_out=True, |
| 1309 | ) |
| 1310 | buf = io.StringIO() |
| 1311 | with ( |
| 1312 | patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), |
| 1313 | redirect_stdout(buf), |
| 1314 | ): |
| 1315 | with pytest.raises(SystemExit) as exc_info: |
| 1316 | watch_run(ns) |
| 1317 | assert exc_info.value.code == ExitCode.USER_ERROR |
| 1318 | |
| 1319 | def test_max_events_negative_is_rejected_by_run(self, repo: pathlib.Path) -> None: |
| 1320 | from muse.cli.commands.watch_coord import run as watch_run |
| 1321 | ns = argparse.Namespace( |
| 1322 | once=True, timeout=None, max_events=-5, |
| 1323 | poll_interval=0.1, run_id=None, |
| 1324 | branch_filter=None, kind=None, json_out=True, |
| 1325 | ) |
| 1326 | buf = io.StringIO() |
| 1327 | with ( |
| 1328 | patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), |
| 1329 | redirect_stdout(buf), |
| 1330 | ): |
| 1331 | with pytest.raises(SystemExit) as exc_info: |
| 1332 | watch_run(ns) |
| 1333 | assert exc_info.value.code == ExitCode.USER_ERROR |
| 1334 | |
| 1335 | def test_max_events_caps_added_events_in_loop(self, repo: pathlib.Path) -> None: |
| 1336 | """max_events must also cap events emitted during the loop (not just snapshots).""" |
| 1337 | # No pre-existing records; add 10 during the side effect. |
| 1338 | def _add_records() -> None: |
| 1339 | for i in range(10): |
| 1340 | _make_reservation(repo, run_id=f"ag-{i}") |
| 1341 | |
| 1342 | events = _run_loop( |
| 1343 | repo, |
| 1344 | once=False, |
| 1345 | timeout=1.0, |
| 1346 | side_effect=_add_records, |
| 1347 | max_events=3, |
| 1348 | ) |
| 1349 | assert len(events) <= 3 |
| 1350 | |
| 1351 | def test_max_events_json_error_is_compact(self, repo: pathlib.Path) -> None: |
| 1352 | """Error for bad max_events in JSON mode must be a single compact line.""" |
| 1353 | from muse.cli.commands.watch_coord import run as watch_run |
| 1354 | ns = argparse.Namespace( |
| 1355 | once=True, timeout=None, max_events=0, |
| 1356 | poll_interval=0.1, run_id=None, |
| 1357 | branch_filter=None, kind=None, json_out=True, |
| 1358 | ) |
| 1359 | buf = io.StringIO() |
| 1360 | with ( |
| 1361 | patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), |
| 1362 | redirect_stdout(buf), |
| 1363 | ): |
| 1364 | with pytest.raises(SystemExit): |
| 1365 | watch_run(ns) |
| 1366 | out = buf.getvalue().strip() |
| 1367 | assert "\n" not in out |
| 1368 | data = json.loads(out) |
| 1369 | assert "error" in data |
| 1370 | |
| 1371 | |
| 1372 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 1373 | # End-to-end β run() input validation |
| 1374 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 1375 | |
| 1376 | |
| 1377 | class TestRunCommandValidation: |
| 1378 | def _make_args(self, **kwargs: MsgpackValue) -> argparse.Namespace: |
| 1379 | ns = argparse.Namespace() |
| 1380 | ns.once = kwargs.get("once", True) |
| 1381 | ns.timeout = kwargs.get("timeout", None) |
| 1382 | ns.max_events = kwargs.get("max_events", None) |
| 1383 | ns.poll_interval = kwargs.get("poll_interval", 0.1) |
| 1384 | ns.run_id = kwargs.get("run_id", None) |
| 1385 | ns.branch_filter = kwargs.get("branch_filter", None) |
| 1386 | ns.kind = kwargs.get("kind", None) |
| 1387 | ns.json_out = kwargs.get("json_out", False) |
| 1388 | return ns |
| 1389 | |
| 1390 | def test_run_id_at_max_length_accepted(self, repo: pathlib.Path) -> None: |
| 1391 | from muse.cli.commands.watch_coord import run as watch_run |
| 1392 | run_id = "a" * _MAX_RUN_ID_LEN |
| 1393 | ns = self._make_args(run_id=run_id, once=True) |
| 1394 | buf = io.StringIO() |
| 1395 | with ( |
| 1396 | patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), |
| 1397 | redirect_stdout(buf), |
| 1398 | ): |
| 1399 | watch_run(ns) # must not raise |
| 1400 | |
| 1401 | def test_run_id_over_max_exits_user_error(self, repo: pathlib.Path) -> None: |
| 1402 | from muse.cli.commands.watch_coord import run as watch_run |
| 1403 | run_id = "a" * (_MAX_RUN_ID_LEN + 1) |
| 1404 | ns = self._make_args(run_id=run_id) |
| 1405 | with pytest.raises(SystemExit) as exc_info: |
| 1406 | watch_run(ns) |
| 1407 | assert exc_info.value.code == ExitCode.USER_ERROR |
| 1408 | |
| 1409 | def test_run_id_over_max_json_returns_error_field(self, repo: pathlib.Path) -> None: |
| 1410 | from muse.cli.commands.watch_coord import run as watch_run |
| 1411 | run_id = "a" * (_MAX_RUN_ID_LEN + 1) |
| 1412 | ns = self._make_args(run_id=run_id, json_out=True) |
| 1413 | buf = io.StringIO() |
| 1414 | with redirect_stdout(buf): |
| 1415 | with pytest.raises(SystemExit): |
| 1416 | watch_run(ns) |
| 1417 | out = buf.getvalue().strip() |
| 1418 | assert "\n" not in out # Compact JSON. |
| 1419 | data = json.loads(out) |
| 1420 | assert "error" in data |
| 1421 | assert data.get("status") == "bad_args" |
| 1422 | |
| 1423 | def test_poll_interval_below_min_exits_user_error(self, repo: pathlib.Path) -> None: |
| 1424 | from muse.cli.commands.watch_coord import run as watch_run |
| 1425 | ns = self._make_args(poll_interval=0.001) |
| 1426 | with pytest.raises(SystemExit) as exc_info: |
| 1427 | watch_run(ns) |
| 1428 | assert exc_info.value.code == ExitCode.USER_ERROR |
| 1429 | |
| 1430 | def test_poll_interval_above_max_exits_user_error(self, repo: pathlib.Path) -> None: |
| 1431 | from muse.cli.commands.watch_coord import run as watch_run |
| 1432 | ns = self._make_args(poll_interval=99999.0) |
| 1433 | with pytest.raises(SystemExit) as exc_info: |
| 1434 | watch_run(ns) |
| 1435 | assert exc_info.value.code == ExitCode.USER_ERROR |
| 1436 | |
| 1437 | def test_poll_interval_at_min_accepted(self, repo: pathlib.Path) -> None: |
| 1438 | from muse.cli.commands.watch_coord import run as watch_run |
| 1439 | ns = self._make_args(poll_interval=_MIN_POLL_INTERVAL, once=True) |
| 1440 | buf = io.StringIO() |
| 1441 | with ( |
| 1442 | patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), |
| 1443 | redirect_stdout(buf), |
| 1444 | ): |
| 1445 | watch_run(ns) # must not raise |
| 1446 | |
| 1447 | def test_poll_interval_at_max_accepted(self, repo: pathlib.Path) -> None: |
| 1448 | from muse.cli.commands.watch_coord import run as watch_run |
| 1449 | ns = self._make_args(poll_interval=_MAX_POLL_INTERVAL, once=True) |
| 1450 | buf = io.StringIO() |
| 1451 | with ( |
| 1452 | patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), |
| 1453 | redirect_stdout(buf), |
| 1454 | ): |
| 1455 | watch_run(ns) # must not raise |
| 1456 | |
| 1457 | def test_timeout_negative_exits_user_error(self, repo: pathlib.Path) -> None: |
| 1458 | from muse.cli.commands.watch_coord import run as watch_run |
| 1459 | ns = self._make_args(timeout=-1.0) |
| 1460 | with pytest.raises(SystemExit) as exc_info: |
| 1461 | watch_run(ns) |
| 1462 | assert exc_info.value.code == ExitCode.USER_ERROR |
| 1463 | |
| 1464 | def test_timeout_zero_accepted_as_once(self, repo: pathlib.Path) -> None: |
| 1465 | """--timeout 0 must complete quickly (treated as --once).""" |
| 1466 | from muse.cli.commands.watch_coord import run as watch_run |
| 1467 | ns = self._make_args(timeout=0.0, json_out=True) |
| 1468 | buf = io.StringIO() |
| 1469 | start = time.monotonic() |
| 1470 | with ( |
| 1471 | patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), |
| 1472 | redirect_stdout(buf), |
| 1473 | ): |
| 1474 | watch_run(ns) |
| 1475 | assert time.monotonic() - start < 1.0 |
| 1476 | |
| 1477 | def test_validation_fires_before_require_repo(self) -> None: |
| 1478 | """Bad --run-id must fail before require_repo() is ever called.""" |
| 1479 | from muse.cli.commands.watch_coord import run as watch_run |
| 1480 | run_id = "x" * (_MAX_RUN_ID_LEN + 1) |
| 1481 | ns = argparse.Namespace( |
| 1482 | once=True, timeout=None, max_events=None, |
| 1483 | poll_interval=0.1, run_id=run_id, |
| 1484 | branch_filter=None, kind=None, json_out=False, |
| 1485 | ) |
| 1486 | called = [] |
| 1487 | with patch( |
| 1488 | "muse.cli.commands.watch_coord.require_repo", |
| 1489 | side_effect=lambda: called.append(True), |
| 1490 | ): |
| 1491 | with pytest.raises(SystemExit): |
| 1492 | watch_run(ns) |
| 1493 | assert called == [], "require_repo must not be called before validation passes" |
| 1494 | |
| 1495 | def test_error_message_has_check_mark_prefix(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 1496 | """Validation error on stderr must start with β.""" |
| 1497 | from muse.cli.commands.watch_coord import run as watch_run |
| 1498 | ns = self._make_args(poll_interval=0.0001) |
| 1499 | with pytest.raises(SystemExit): |
| 1500 | watch_run(ns) |
| 1501 | err = capsys.readouterr().err |
| 1502 | assert "β" in err |
| 1503 | |
| 1504 | |
| 1505 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 1506 | # Stress β max_events under load |
| 1507 | # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| 1508 | |
| 1509 | |
| 1510 | class TestStressMaxEvents: |
| 1511 | def test_max_events_10_from_500_records_returns_exactly_10(self, repo: pathlib.Path) -> None: |
| 1512 | """max_events=10 must return exactly 10 events even with 500 records.""" |
| 1513 | res_dir = _coord_dir(repo) / "reservations" |
| 1514 | now_iso = now_utc_iso() |
| 1515 | exp_iso = ( |
| 1516 | datetime.datetime.now(datetime.timezone.utc) |
| 1517 | + datetime.timedelta(hours=1) |
| 1518 | ).isoformat() |
| 1519 | for i in range(500): |
| 1520 | uid = _new_id() |
| 1521 | data = { |
| 1522 | "reservation_id": uid, |
| 1523 | "run_id": f"agent-{i}", |
| 1524 | "branch": "main", |
| 1525 | "addresses": [f"src/mod{i}.py::fn"], |
| 1526 | "created_at": now_iso, |
| 1527 | "expires_at": exp_iso, |
| 1528 | "operation": "modify", |
| 1529 | "schema_version": "1.0.0", |
| 1530 | } |
| 1531 | (res_dir / f"{uid}.json").write_text(json.dumps(data)) |
| 1532 | |
| 1533 | start = time.monotonic() |
| 1534 | events = _run_loop(repo, once=True, max_events=10) |
| 1535 | elapsed = time.monotonic() - start |
| 1536 | |
| 1537 | assert len(events) == 10 |
| 1538 | assert elapsed < 2.0, f"max_events cap with 500 records took {elapsed:.2f}s" |
| 1539 | |
| 1540 | |
| 1541 | # --------------------------------------------------------------------------- |
| 1542 | # Flag registration |
| 1543 | # --------------------------------------------------------------------------- |
| 1544 | |
| 1545 | |
| 1546 | class TestRegisterFlags: |
| 1547 | def _parse(self, *args: str) -> "argparse.Namespace": |
| 1548 | import argparse |
| 1549 | from muse.cli.commands.watch_coord import register |
| 1550 | p = argparse.ArgumentParser() |
| 1551 | sub = p.add_subparsers() |
| 1552 | register(sub) |
| 1553 | return p.parse_args(["watch", *args]) |
| 1554 | |
| 1555 | def test_default_json_out_is_false(self) -> None: |
| 1556 | ns = self._parse() |
| 1557 | assert ns.json_out is False |
| 1558 | |
| 1559 | def test_json_flag_sets_json_out(self) -> None: |
| 1560 | ns = self._parse("--json") |
| 1561 | assert ns.json_out is True |
| 1562 | |
| 1563 | def test_j_shorthand_sets_json_out(self) -> None: |
| 1564 | ns = self._parse("-j") |
| 1565 | assert ns.json_out is True |