gabriel / muse public
test_cmd_watch_coord.py python
1,565 lines 70.1 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
1 """Tests for ``muse coord watch`` (watch_coord.py).
2
3 Coverage matrix
4 ---------------
5 Unit
6 TestWatchEvent — to_dict() schema, all fields present
7 TestScanDirs — empty dir, missing dir, multiple kinds, TOCTOU safety
8 TestDiffSnapshots — no change, added, removed, modified, cross-kind
9 TestPassesFilters — kind/run_id/branch filters, AND semantics, empty data
10 TestEmitEventJson — JSON mode produces valid NDJSON, all event types
11 TestEmitEventText — text mode, all kinds, missing data, ANSI stripped
12 TestCheckExpirations — expired emitted, active not, removed not double-counted
13 TestMakeEvent — timestamp is UTC ISO 8601
14
15 Integration
16 TestWatchLoopOnce — --once emits snapshot for all existing records
17 TestWatchLoopFilters — kind/run_id/branch filters in loop
18 TestWatchLoopDetectsAdded — new reservation appears → added event
19 TestWatchLoopDetectsModified — heartbeat updated → modified event
20 TestWatchLoopDetectsRemoved — file deleted → removed event (with cached data)
21 TestWatchLoopDetectsExpiry — reservation expires → expired event (no file change)
22 TestWatchLoopTimeout — loop exits after timeout seconds
23 TestWatchLoopJsonOutput — all events are valid NDJSON in json mode
24 TestWatchLoopMaxEvents — loop exits after --max-events events emitted
25
26 End-to-end
27 TestRunCommand — run() with --once via mock require_repo
28 TestRunCommandValidation — run() validation: run-id/poll-interval/timeout/max-events
29 TestBackendSelection — kqueue backend on macOS, polling on other
30 TestSignalHandling — SIGTERM triggers clean exit
31
32 Security
33 TestAnsiInjectionSanitized — ANSI escapes in run_id/branch stripped from text
34 TestSymlinkDirRejected — kqueue raises ValueError on symlinked coord dir
35 TestKindFilterAllowlist — invalid kind strings rejected by argparse
36
37 Stress
38 TestStress500RapidAdds — 500 reservations added → all 500 added events
39 TestStressLargeSnapshot — 1000 existing records → snapshot < 1 s
40 TestStressDiffPerformance — diff of 2×1000-entry snapshots < 50 ms
41 TestStressManyExpirations — 200 expiring reservations → all expired events < 2 s
42 TestStressMaxEvents — max_events cap respected under 500-record load
43 """
44
45 from __future__ import annotations
46
47 import argparse
48 import datetime
49 import io
50 import itertools
51 import json
52 import pathlib
53 import tempfile
54 import time
55 from collections.abc import Callable
56 from contextlib import redirect_stdout
57 from unittest.mock import MagicMock, patch
58
59 import pytest
60
61 from muse.core.types import Manifest, MsgpackDict, MsgpackValue, content_hash, fake_id, now_utc_iso
62 from muse.core.paths import muse_dir
63
_id_seq = itertools.count()


def _new_id() -> str:
    """Return the content hash of a one-key dict holding the next ``_id_seq`` value."""
    seq_no = next(_id_seq)
    return content_hash({"seq": seq_no})
69
70 # Module under test.
71 from muse.cli.commands.watch_coord import (
72 WatchEvent,
73 _Backend,
74 _KqueueBackend,
75 _MAX_RUN_ID_LEN,
76 _MIN_POLL_INTERVAL,
77 _MAX_POLL_INTERVAL,
78 _PollingBackend,
79 _Snapshot,
80 _check_expirations,
81 _coord_dir,
82 _diff_snapshots,
83 _emit_event,
84 _ensure_coord_dirs,
85 _load_record,
86 _make_event,
87 _passes_filters,
88 _scan_dirs,
89 _watch_loop,
90 )
91 from muse.core.coordination import (
92 Reservation,
93 create_heartbeat,
94 create_intent,
95 create_reservation,
96 )
97 from muse.core.errors import ExitCode
98
99
100 # ─────────────────────────────────────────────────────────────────────────────
101 # Fixtures
102 # ─────────────────────────────────────────────────────────────────────────────
103
104
@pytest.fixture
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Set up a bare-bones muse repository under ``tmp_path`` and return its root.

    The muse directory is created, a ``repo.json`` carrying a repo id and a
    name is written into it, and ``_ensure_coord_dirs`` is called on the root.
    """
    muse_root = muse_dir(tmp_path)
    muse_root.mkdir()
    repo_meta = {"repo_id": fake_id("watch-coord-repo"), "name": "test-repo"}
    (muse_root / "repo.json").write_text(json.dumps(repo_meta))
    _ensure_coord_dirs(tmp_path)
    return tmp_path
115
116
class _ImmediateBackend(_Backend):
    """Fake backend for tests: one immediate wake-up, then plain sleeping.

    The first ``wait()`` call runs the optional ``side_effect`` callback (a
    convenient hook for writing or deleting files between two scans) and
    returns ``True``. Every later call sleeps for the requested ``timeout``
    and returns ``False``.

    When driving ``_watch_loop`` with ``once=False``, always supply a finite
    ``timeout`` so the loop has a deadline to reach.
    """

    name = "immediate"

    def __init__(
        self,
        *,
        side_effect: Callable[[], None] | None = None,
    ) -> None:
        self._fired = False
        self._side_effect = side_effect

    def wait(self, timeout: float) -> bool:
        """Return ``True`` on the first call, ``False`` (after sleeping) afterwards."""
        if self._fired:
            # The one-shot wake-up is spent: just burn the requested time.
            time.sleep(max(0.0, timeout))
            return False
        if self._side_effect:
            self._side_effect()
        # Only marked as fired once the callback has returned.
        self._fired = True
        return True  # Signal: something may have changed.

    def close(self) -> None:
        """Nothing to release."""
        pass
151
152
def _make_reservation(repo: pathlib.Path, **kwargs: MsgpackValue) -> Reservation:
    """Call ``create_reservation`` on *repo*, filling in any field not supplied.

    Recognised overrides are ``run_id``, ``branch``, ``addresses``,
    ``ttl_seconds`` and ``operation``; any other keyword is ignored.
    """
    fallbacks = {
        "run_id": "agent-1",
        "branch": "main",
        "addresses": ["src/mod.py::fn"],
        "ttl_seconds": 3600,
        "operation": "modify",
    }
    fields = {name: kwargs.get(name, fallback) for name, fallback in fallbacks.items()}
    return create_reservation(repo, **fields)
163
164
def _run_loop(
    repo: pathlib.Path,
    *,
    side_effect: Callable[[], None] | None = None,
    kind_filter: str | None = None,
    run_id_filter: str | None = None,
    branch_filter: str | None = None,
    as_json: bool = True,
    once: bool = True,
    # ``None`` is replaced by 1.0 s when once=False so the loop always exits.
    timeout: float | None = None,
    poll_interval: float = 0.05,
    max_events: int | None = None,
) -> list[dict]:
    """Run ``_watch_loop`` with an ``_ImmediateBackend``; return the parsed events.

    Everything the loop prints to stdout is captured and each non-blank line
    is decoded with ``json.loads``, so callers should leave ``as_json=True``.

    Args:
        repo: Repository root handed to ``_watch_loop``.
        side_effect: Zero-argument callback the backend invokes on its first
            ``wait()`` call.
        kind_filter, run_id_filter, branch_filter: Forwarded unchanged.
        as_json: Forwarded unchanged.
        once: Forwarded unchanged.
        timeout: Forwarded to the loop; when ``once`` is false and no value is
            given, 1.0 second is used to prevent a hang.
        poll_interval: Forwarded unchanged.
        max_events: Forwarded unchanged.

    Returns:
        One decoded JSON object per non-blank output line, in output order.
    """
    # Enforce a finite timeout for non-once mode to prevent hangs.
    if not once and timeout is None:
        timeout = 1.0
    buf = io.StringIO()
    backend = _ImmediateBackend(side_effect=side_effect)
    with redirect_stdout(buf):
        _watch_loop(
            repo,
            backend,
            kind_filter=kind_filter,
            run_id_filter=run_id_filter,
            branch_filter=branch_filter,
            as_json=as_json,
            once=once,
            timeout=timeout,
            poll_interval=poll_interval,
            max_events=max_events,
        )
    return [json.loads(line) for line in buf.getvalue().splitlines() if line.strip()]
206
207
208 # ─────────────────────────────────────────────────────────────────────────────
209 # Unit — WatchEvent
210 # ─────────────────────────────────────────────────────────────────────────────
211
212
class TestWatchEvent:
    """Unit tests for ``WatchEvent.to_dict()``."""

    def test_to_dict_has_all_fields(self) -> None:
        """The serialised dict exposes exactly the six expected keys."""
        expected_keys = {
            "schema_version", "event_type", "kind", "id", "timestamp", "data"
        }
        event = WatchEvent("added", "reservation", "uid-1", "2026-01-01T00:00:00+00:00", {})
        assert set(event.to_dict().keys()) == expected_keys

    def test_to_dict_values_round_trip(self) -> None:
        """Constructor arguments come back unchanged under their dict keys."""
        data = {"run_id": "a", "branch": "main"}
        event = WatchEvent("modified", "heartbeat", "uid-2", "2026-01-01T00:00:00+00:00", data)
        serialised = event.to_dict()
        assert serialised["event_type"] == "modified"
        assert serialised["kind"] == "heartbeat"
        assert serialised["id"] == "uid-2"
        assert serialised["data"] == data

    def test_to_dict_schema_version_is_string(self) -> None:
        """``schema_version`` is serialised as a ``str``."""
        serialised = WatchEvent("snapshot", "intent", "uid-3", "ts", {}).to_dict()
        assert isinstance(serialised["schema_version"], str)
233
234
235 # ─────────────────────────────────────────────────────────────────────────────
236 # Unit — _scan_dirs
237 # ─────────────────────────────────────────────────────────────────────────────
238
239
class TestScanDirs:
    """Unit tests for ``_scan_dirs``."""

    def test_empty_coord_dirs_returns_empty_dicts(self, repo: pathlib.Path) -> None:
        """With coord dirs present but empty, every kind maps to ``{}``."""
        snapshot = _scan_dirs(repo)
        for kind_dir in ("reservations", "intents", "releases", "heartbeats"):
            assert snapshot[kind_dir] == {}

    def test_missing_subdir_returns_empty(self, tmp_path: pathlib.Path) -> None:
        """A path with no .muse/ directory at all still yields four empty dicts."""
        snapshot = _scan_dirs(tmp_path)
        for kind_dir in ("reservations", "intents", "releases", "heartbeats"):
            assert snapshot[kind_dir] == {}

    def test_file_appears_in_snapshot(self, repo: pathlib.Path) -> None:
        """A ``.json`` file is keyed in the snapshot by its stem."""
        record = _coord_dir(repo) / "reservations" / "abc123.json"
        record.write_text('{"x": 1}')
        assert "abc123" in _scan_dirs(repo)["reservations"]

    def test_snapshot_entry_is_mtime_ns_and_size(self, repo: pathlib.Path) -> None:
        """Each entry unpacks to the file's ``st_mtime_ns`` and ``st_size``."""
        record = _coord_dir(repo) / "reservations" / "abc123.json"
        record.write_text('{"x": 1}')
        snapshot = _scan_dirs(repo)
        mtime_ns, size = snapshot["reservations"]["abc123"]
        stat_result = record.stat()
        assert mtime_ns == stat_result.st_mtime_ns
        assert size == stat_result.st_size

    def test_non_json_files_ignored(self, repo: pathlib.Path) -> None:
        """Files without a ``.json`` suffix do not show up in the snapshot."""
        stray = _coord_dir(repo) / "reservations" / "not-a-json.txt"
        stray.write_text("hi")
        assert _scan_dirs(repo)["reservations"] == {}

    def test_multiple_kinds_all_scanned(self, repo: pathlib.Path) -> None:
        """Records in different kind directories land under their own key."""
        coord_root = _coord_dir(repo)
        (coord_root / "reservations" / "r1.json").write_text("{}")
        (_coord_dir(repo) / "intents" / "i1.json").write_text("{}")
        snapshot = _scan_dirs(repo)
        assert "r1" in snapshot["reservations"]
        assert "i1" in snapshot["intents"]

    def test_toctou_deleted_file_skipped(self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """File that disappears between glob and stat is silently skipped."""
        original_glob = pathlib.Path.glob

        def glob_with_ghost(self: pathlib.Path, pattern: str) -> list[pathlib.Path]:
            # Append a path that was never created, mimicking a file that
            # vanished after being listed.
            return [*original_glob(self, pattern), self / "ghost.json"]

        monkeypatch.setattr(pathlib.Path, "glob", glob_with_ghost)
        snapshot = _scan_dirs(repo)
        assert "ghost" not in snapshot["reservations"]
292
293
294 # ─────────────────────────────────────────────────────────────────────────────
295 # Unit — _diff_snapshots
296 # ─────────────────────────────────────────────────────────────────────────────
297
298
class TestDiffSnapshots:
    """Unit tests for ``_diff_snapshots``.

    Snapshots are built by hand as ``{kind: {uid: (mtime_ns, size)}}`` and the
    result is inspected as a list of ``(event_type, kind, uid)`` tuples.
    """

    def _empty(self) -> _Snapshot:
        """Return a snapshot with all four kinds present and no entries."""
        return {sub: {} for sub in ("reservations", "intents", "releases", "heartbeats")}

    def test_no_change_no_events(self) -> None:
        """Diffing a snapshot against itself yields nothing."""
        snap = self._empty()
        snap["reservations"]["uid1"] = (100, 50)
        assert _diff_snapshots(snap, snap) == []

    def test_added_detected(self) -> None:
        """An ID present only in the new snapshot is reported as added."""
        old = self._empty()
        new = {**old, "reservations": {"uid1": (100, 50)}}
        events = _diff_snapshots(old, new)
        assert ("added", "reservations", "uid1") in events

    def test_removed_detected(self) -> None:
        """An ID present only in the old snapshot is reported as removed."""
        old = {**self._empty(), "reservations": {"uid1": (100, 50)}}
        new = self._empty()
        events = _diff_snapshots(old, new)
        assert ("removed", "reservations", "uid1") in events

    def test_modified_detected(self) -> None:
        """A changed (mtime, size) pair is reported as modified."""
        snap = {**self._empty(), "reservations": {"uid1": (100, 50)}}
        new_snap = {**self._empty(), "reservations": {"uid1": (200, 51)}}
        events = _diff_snapshots(snap, new_snap)
        assert ("modified", "reservations", "uid1") in events

    def test_same_mtime_different_size_is_modified(self) -> None:
        """A size change alone is enough to report modified."""
        snap = {**self._empty(), "reservations": {"uid1": (100, 50)}}
        new_snap = {**self._empty(), "reservations": {"uid1": (100, 99)}}
        events = _diff_snapshots(snap, new_snap)
        assert ("modified", "reservations", "uid1") in events

    def test_unchanged_mtime_and_size_is_not_modified(self) -> None:
        """An identical (mtime, size) pair never produces modified."""
        snap = {**self._empty(), "reservations": {"uid1": (100, 50)}}
        events = _diff_snapshots(snap, snap)
        assert not any(e[0] == "modified" for e in events)

    def test_multiple_kinds_in_one_diff(self) -> None:
        """Changes in different kinds are all reported by a single diff."""
        old = {**self._empty(), "reservations": {"r1": (1, 10)}}
        new = {
            "reservations": {},
            "intents": {"i1": (2, 20)},
            "releases": {},
            "heartbeats": {},
        }
        events = _diff_snapshots(old, new)
        event_types = {(e[0], e[2]) for e in events}
        assert ("removed", "r1") in event_types
        assert ("added", "i1") in event_types

    def test_output_is_sorted_deterministic(self) -> None:
        """IDs within each kind are sorted for reproducible test output."""
        old = self._empty()
        new = {
            **self._empty(),
            "reservations": {"zzz": (1, 1), "aaa": (2, 2), "mmm": (3, 3)},
        }
        events = _diff_snapshots(old, new)
        added_ids = [uid for et, kind, uid in events if et == "added"]
        # Compare against the literal expected order: ``x == sorted(x)`` also
        # holds for an empty list, so it would pass if nothing were reported.
        assert added_ids == ["aaa", "mmm", "zzz"]
360
361
362 # ─────────────────────────────────────────────────────────────────────────────
363 # Unit — _passes_filters
364 # ─────────────────────────────────────────────────────────────────────────────
365
366
class TestPassesFilters:
    """Unit tests for ``_passes_filters(kind, data, kind_f, run_id_f, branch_f)``."""

    def _data(self, run_id: str = "agent-1", branch: str = "main") -> Manifest:
        """Build a minimal record payload with just ``run_id`` and ``branch``."""
        return {"run_id": run_id, "branch": branch}

    def test_no_filters_always_passes(self) -> None:
        record = self._data()
        assert _passes_filters("reservation", record, None, None, None)

    def test_kind_filter_match(self) -> None:
        record = self._data()
        assert _passes_filters("reservation", record, "reservation", None, None)

    def test_kind_filter_no_match(self) -> None:
        record = self._data()
        assert not _passes_filters("intent", record, "reservation", None, None)

    def test_run_id_filter_match(self) -> None:
        record = self._data()
        assert _passes_filters("reservation", record, None, "agent-1", None)

    def test_run_id_filter_no_match(self) -> None:
        record = self._data()
        assert not _passes_filters("reservation", record, None, "agent-9", None)

    def test_branch_filter_match(self) -> None:
        record = self._data()
        assert _passes_filters("reservation", record, None, None, "main")

    def test_branch_filter_no_match(self) -> None:
        record = self._data()
        assert not _passes_filters("reservation", record, None, None, "feature/x")

    def test_all_filters_and_semantics(self) -> None:
        """All three filters must pass simultaneously."""
        record = self._data("agent-1", "main")
        matching = _passes_filters("reservation", record, "reservation", "agent-1", "main")
        assert matching
        wrong_branch = _passes_filters("reservation", record, "reservation", "agent-1", "other")
        assert not wrong_branch

    def test_empty_data_fails_run_id_filter(self) -> None:
        """Empty data (removed event with no cache) fails run_id/branch filters."""
        no_data: dict = {}
        assert not _passes_filters("reservation", no_data, None, "agent-1", None)

    def test_empty_data_passes_when_no_filters(self) -> None:
        no_data: dict = {}
        assert _passes_filters("reservation", no_data, None, None, None)
404
405
406 # ─────────────────────────────────────────────────────────────────────────────
407 # Unit — _emit_event JSON mode
408 # ─────────────────────────────────────────────────────────────────────────────
409
410
class TestEmitEventJson:
    """Unit tests for ``_emit_event`` with ``as_json=True``."""

    def _capture_json(self, ev: WatchEvent) -> MsgpackDict:
        """Emit *ev* in JSON mode and return the decoded stdout."""
        sink = io.StringIO()
        with redirect_stdout(sink):
            _emit_event(ev, as_json=True)
        raw = sink.getvalue().strip()
        return json.loads(raw)

    def test_json_is_valid_ndjson(self) -> None:
        """The output decodes as JSON and carries the event type and kind."""
        event = WatchEvent(
            "added", "reservation", "uid1", "2026-01-01T00:00:00+00:00",
            {"run_id": "a", "branch": "b"},
        )
        decoded = self._capture_json(event)
        assert decoded["event_type"] == "added"
        assert decoded["kind"] == "reservation"

    def test_all_event_types_serialise(self) -> None:
        """Each of the five event types survives the JSON round trip."""
        for event_type in ("snapshot", "added", "modified", "removed", "expired"):
            decoded = self._capture_json(WatchEvent(event_type, "intent", "uid2", "ts", {}))
            assert decoded["event_type"] == event_type

    def test_data_payload_preserved(self) -> None:
        """The ``data`` payload is emitted unchanged."""
        data = {"run_id": "x", "addresses": ["f.py::fn"], "branch": "dev"}
        decoded = self._capture_json(WatchEvent("added", "reservation", "uid3", "ts", data))
        assert decoded["data"] == data
436
437
438 # ─────────────────────────────────────────────────────────────────────────────
439 # Unit — _emit_event text mode
440 # ─────────────────────────────────────────────────────────────────────────────
441
442
class TestEmitEventText:
    """Unit tests for ``_emit_event`` with ``as_json=False``."""

    def _capture_text(self, ev: WatchEvent) -> str:
        """Emit *ev* in text mode and return whatever reached stdout."""
        sink = io.StringIO()
        with redirect_stdout(sink):
            _emit_event(ev, as_json=False)
        return sink.getvalue()

    def test_reservation_icon_and_kind(self) -> None:
        """An added reservation prints a ``+`` and the kind name."""
        data = {"run_id": "r1", "branch": "main", "addresses": ["f.py::fn"]}
        rendered = self._capture_text(WatchEvent("added", "reservation", "a" * 8, "ts", data))
        assert "+" in rendered
        assert "reservation" in rendered

    def test_snapshot_icon(self) -> None:
        """A snapshot event prints a middle dot."""
        data = {"run_id": "r1", "branch": "main", "addresses": ["f.py::fn"]}
        rendered = self._capture_text(WatchEvent("snapshot", "reservation", "a" * 8, "ts", data))
        assert "·" in rendered

    def test_expired_icon(self) -> None:
        """An expired event prints a ``!``."""
        data = {"run_id": "r1", "branch": "main", "addresses": ["f.py::fn"]}
        rendered = self._capture_text(WatchEvent("expired", "reservations", "a" * 8, "ts", data))
        assert "!" in rendered

    def test_empty_data_does_not_crash(self) -> None:
        """With an empty payload the event ID is still printed."""
        rendered = self._capture_text(WatchEvent("removed", "reservation", "b" * 8, "ts", {}))
        assert "bbbbbbbb" in rendered

    def test_heartbeat_shows_extends_to(self) -> None:
        """A heartbeat line contains the phrase ``extends to``."""
        data = {
            "run_id": "r1",
            "reservation_id": "d" * 8,
            "extended_expires_at": "2099-01-01T00:00:00+00:00",
        }
        rendered = self._capture_text(WatchEvent("modified", "heartbeat", "c" * 8, "ts", data))
        assert "extends to" in rendered

    def test_release_shows_reason(self) -> None:
        """A release line contains its ``reason``."""
        data = {"run_id": "r1", "reason": "completed", "reservation_id": "f" * 8}
        rendered = self._capture_text(WatchEvent("added", "release", "e" * 8, "ts", data))
        assert "completed" in rendered

    def test_intent_shows_operation(self) -> None:
        """An intent line contains its ``operation``."""
        data = {
            "run_id": "r1",
            "branch": "main",
            "operation": "delete",
            "addresses": ["src/mod.py::fn"],
        }
        rendered = self._capture_text(WatchEvent("added", "intent", "g" * 8, "ts", data))
        assert "delete" in rendered

    def test_ansi_escape_stripped_from_run_id(self) -> None:
        """ANSI escape in run_id must not reach terminal output."""
        hostile_run_id = "\x1b[31mmalicious\x1b[0m"
        data = {"run_id": hostile_run_id, "branch": "main", "addresses": ["f.py::fn"]}
        rendered = self._capture_text(WatchEvent("added", "reservation", "h" * 8, "ts", data))
        assert "\x1b" not in rendered
        assert "malicious" in rendered  # Content preserved, escapes stripped.

    def test_long_address_list_truncated(self) -> None:
        """Ten addresses are summarised with a ``+7 more`` suffix."""
        many_addresses = [f"f.py::fn{i}" for i in range(10)]
        data = {"run_id": "r1", "branch": "main", "addresses": many_addresses}
        rendered = self._capture_text(WatchEvent("added", "reservation", "i" * 8, "ts", data))
        assert "+7 more" in rendered
511
512
513 # ─────────────────────────────────────────────────────────────────────────────
514 # Unit — _check_expirations
515 # ─────────────────────────────────────────────────────────────────────────────
516
517
class TestCheckExpirations:
    """Unit tests for ``_check_expirations(repo, prev_active_ids, removed_ids)``."""

    def test_no_change_no_events(self, repo: pathlib.Path) -> None:
        """A live reservation already known as active produces no events."""
        reservation = _make_reservation(repo)
        previously_active = {reservation.reservation_id}
        events, now_active = _check_expirations(repo, previously_active, set())
        assert events == []
        assert reservation.reservation_id in now_active

    def test_expired_reservation_emits_event(self, repo: pathlib.Path) -> None:
        """An ID that was active but is no longer → expired event."""
        gone_id = _new_id()
        previously_active = {gone_id}
        # Write the record by hand with an expiry two hours in the past, so the
        # file exists on disk but is no longer an active reservation.
        two_hours = datetime.timedelta(hours=2)
        past_iso = (datetime.datetime.now(datetime.timezone.utc) - two_hours).isoformat()
        now_iso = now_utc_iso()
        record = {
            "reservation_id": gone_id,
            "run_id": "r1",
            "branch": "main",
            "addresses": ["src/mod.py::fn"],
            "created_at": now_iso,
            "expires_at": past_iso,
            "operation": "modify",
            "schema_version": "1.0.0",
        }
        record_path = _coord_dir(repo) / "reservations" / f"{gone_id}.json"
        record_path.write_text(json.dumps(record))
        events, now_active = _check_expirations(repo, previously_active, set())
        assert any(ev.event_type == "expired" and ev.id == gone_id for ev in events)
        assert gone_id not in now_active

    def test_removed_id_not_double_counted(self, repo: pathlib.Path) -> None:
        """ID in removed_ids must NOT produce an expired event too."""
        gone_id = _new_id()
        events, _ = _check_expirations(repo, {gone_id}, {gone_id})
        assert events == []

    def test_active_reservation_stays_active(self, repo: pathlib.Path) -> None:
        """Live reservation is NOT reported as expired."""
        reservation = _make_reservation(repo)
        previously_active = {reservation.reservation_id}
        events, _now_active = _check_expirations(repo, previously_active, set())
        assert not any(ev.event_type == "expired" for ev in events)

    def test_returns_current_active_ids(self, repo: pathlib.Path) -> None:
        """The second return value contains the live reservation's ID."""
        reservation = _make_reservation(repo)
        _, now_active = _check_expirations(repo, set(), set())
        assert reservation.reservation_id in now_active
569
570
571 # ─────────────────────────────────────────────────────────────────────────────
572 # Unit — _make_event
573 # ─────────────────────────────────────────────────────────────────────────────
574
575
class TestMakeEvent:
    """Unit tests for ``_make_event``."""

    def test_timestamp_is_utc_iso8601(self) -> None:
        """The timestamp parses as ISO 8601 and carries a zero UTC offset."""
        ev = _make_event("added", "reservation", "uid1", {})
        dt = datetime.datetime.fromisoformat(ev.timestamp)
        # Being timezone-aware is necessary but not sufficient: any fixed
        # offset would satisfy it, so also pin the offset to UTC.
        assert dt.tzinfo is not None
        assert dt.utcoffset() == datetime.timedelta(0)

    def test_fields_set_correctly(self) -> None:
        """Positional arguments map onto event_type, kind, id and data."""
        ev = _make_event("removed", "intent", "uid2", {"x": 1})
        assert ev.event_type == "removed"
        assert ev.kind == "intent"
        assert ev.id == "uid2"
        assert ev.data == {"x": 1}
589
590
591 # ─────────────────────────────────────────────────────────────────────────────
592 # Integration — _watch_loop --once
593 # ─────────────────────────────────────────────────────────────────────────────
594
595
class TestWatchLoopOnce:
    """Integration tests for ``_watch_loop`` in ``once=True`` (snapshot) mode."""

    def test_empty_repo_no_events(self, repo: pathlib.Path) -> None:
        """With no records on disk nothing is emitted."""
        events = _run_loop(repo, once=True)
        assert events == []

    def test_existing_reservation_emits_snapshot(self, repo: pathlib.Path) -> None:
        """An existing reservation shows up under kind ``reservations``."""
        _make_reservation(repo, run_id="agent-1")
        events = _run_loop(repo, once=True)
        kinds = [e["kind"] for e in events]
        assert "reservations" in kinds

    def test_snapshot_event_type(self, repo: pathlib.Path) -> None:
        """Every event emitted in once mode has type ``snapshot``."""
        _make_reservation(repo)
        events = _run_loop(repo, once=True)
        # all() is True for an empty list; require output so a loop that
        # emits nothing cannot pass this test.
        assert events
        assert all(e["event_type"] == "snapshot" for e in events)

    def test_all_kinds_emitted_in_snapshot(self, repo: pathlib.Path) -> None:
        """Reservations and intents both appear in the same snapshot."""
        res = _make_reservation(repo)
        create_intent(repo, res.reservation_id, "agent-1", "main",
                      ["src/mod.py::fn"], "modify")
        events = _run_loop(repo, once=True)
        kinds = {e["kind"] for e in events}
        assert "reservations" in kinds
        assert "intents" in kinds

    def test_snapshot_carries_data(self, repo: pathlib.Path) -> None:
        """Snapshot events include the record payload (checked via run_id)."""
        _make_reservation(repo, run_id="agent-snapshot-test")
        events = _run_loop(repo, once=True)
        res_events = [e for e in events if e["kind"] == "reservations"]
        assert any(e["data"].get("run_id") == "agent-snapshot-test" for e in res_events)

    def test_once_does_not_loop(self, repo: pathlib.Path) -> None:
        """--once must return quickly (no blocking wait calls)."""
        start = time.monotonic()
        _run_loop(repo, once=True)
        elapsed = time.monotonic() - start
        assert elapsed < 1.0  # Must not block for poll_interval.
633
634
635 # ─────────────────────────────────────────────────────────────────────────────
636 # Integration — filters in _watch_loop
637 # ─────────────────────────────────────────────────────────────────────────────
638
639
class TestWatchLoopFilters:
    """Integration tests for kind / run_id / branch filtering in ``_watch_loop``."""

    def test_kind_filter_reservations(self, repo: pathlib.Path) -> None:
        """Only reservation events pass ``kind_filter="reservations"``."""
        res = _make_reservation(repo)
        create_intent(repo, res.reservation_id, "agent-1", "main",
                      ["src/mod.py::fn"], "modify")
        events = _run_loop(repo, once=True, kind_filter="reservations")
        # all() is True for an empty list; a filter that drops everything
        # must not pass this test.
        assert events
        assert all(e["kind"] == "reservations" for e in events)

    def test_kind_filter_intents(self, repo: pathlib.Path) -> None:
        """Only intent events pass ``kind_filter="intents"``."""
        res = _make_reservation(repo)
        create_intent(repo, res.reservation_id, "agent-1", "main",
                      ["src/mod.py::fn"], "modify")
        events = _run_loop(repo, once=True, kind_filter="intents")
        # Same guard against a vacuously true all().
        assert events
        assert all(e["kind"] == "intents" for e in events)

    def test_run_id_filter(self, repo: pathlib.Path) -> None:
        """Of two reservations, only the one with the matching run_id is emitted."""
        _make_reservation(repo, run_id="agent-A")
        _make_reservation(repo, run_id="agent-B")
        events = _run_loop(repo, once=True, run_id_filter="agent-A")
        assert all(e["data"].get("run_id") == "agent-A" for e in events)
        assert len(events) == 1

    def test_branch_filter(self, repo: pathlib.Path) -> None:
        """Of two reservations, only the one on the matching branch is emitted."""
        _make_reservation(repo, branch="main")
        _make_reservation(repo, branch="feature/x")
        events = _run_loop(repo, once=True, branch_filter="main")
        assert all(e["data"].get("branch") == "main" for e in events)
        assert len(events) == 1

    def test_combined_filters(self, repo: pathlib.Path) -> None:
        """run_id and branch filters combine with AND semantics."""
        _make_reservation(repo, run_id="agent-A", branch="main")
        _make_reservation(repo, run_id="agent-A", branch="feature/x")
        _make_reservation(repo, run_id="agent-B", branch="main")
        events = _run_loop(repo, once=True, run_id_filter="agent-A", branch_filter="main")
        assert len(events) == 1
        assert events[0]["data"]["run_id"] == "agent-A"
        assert events[0]["data"]["branch"] == "main"

    def test_no_match_no_events(self, repo: pathlib.Path) -> None:
        """A run_id filter that matches no record yields no events."""
        _make_reservation(repo, run_id="agent-X")
        events = _run_loop(repo, once=True, run_id_filter="agent-NOBODY")
        assert events == []
682
683
684 # ─────────────────────────────────────────────────────────────────────────────
685 # Integration — added events
686 # ─────────────────────────────────────────────────────────────────────────────
687
688
class TestWatchLoopDetectsAdded:
    """Integration tests: records created while the loop runs yield ``added`` events.

    Each test hands ``_run_loop`` a side effect that writes a record; the
    backend invokes it on its first ``wait()`` call.
    """

    def test_new_reservation_emits_added(self, repo: pathlib.Path) -> None:
        """A reservation written mid-loop produces exactly one added event."""

        def _write_res() -> None:
            _make_reservation(repo, run_id="agent-new")

        events = _run_loop(repo, side_effect=_write_res, once=False)
        added = [e for e in events if e["event_type"] == "added"]
        assert len(added) == 1
        assert added[0]["kind"] == "reservations"
        assert added[0]["data"]["run_id"] == "agent-new"

    def test_new_intent_emits_added(self, repo: pathlib.Path) -> None:
        """An intent written mid-loop produces exactly one added intent event."""
        res = _make_reservation(repo)

        def _write_intent() -> None:
            create_intent(repo, res.reservation_id, "agent-1", "main",
                          ["src/mod.py::fn"], "modify")

        events = _run_loop(repo, side_effect=_write_intent, once=False)
        added = [e for e in events if e["event_type"] == "added" and e["kind"] == "intents"]
        assert len(added) == 1

    def test_added_event_carries_data(self, repo: pathlib.Path) -> None:
        """The added event's payload reflects the record that was written."""

        def _write_res() -> None:
            _make_reservation(repo, run_id="agent-data-test", branch="feature/y")

        events = _run_loop(repo, side_effect=_write_res, once=False)
        added = [e for e in events if e["event_type"] == "added"]
        # Fail with a readable assertion rather than an IndexError below.
        assert len(added) == 1
        assert added[0]["data"]["run_id"] == "agent-data-test"
        assert added[0]["data"]["branch"] == "feature/y"
721
722
723 # ─────────────────────────────────────────────────────────────────────────────
724 # Integration — modified events
725 # ─────────────────────────────────────────────────────────────────────────────
726
727
class TestWatchLoopDetectsModified:
    """Integration tests: a heartbeat rewritten mid-loop yields a ``modified`` event."""

    def test_heartbeat_update_emits_modified(self, repo: pathlib.Path) -> None:
        """Rewriting an existing heartbeat produces exactly one modified heartbeat event."""
        res = _make_reservation(repo)
        # Write an initial heartbeat so the file exists in the snapshot.
        create_heartbeat(repo, res.reservation_id, "agent-1", extension_seconds=100)

        def _update_hb() -> None:
            # Small sleep ensures mtime changes on fast filesystems.
            time.sleep(0.02)
            create_heartbeat(repo, res.reservation_id, "agent-1", extension_seconds=200)

        events = _run_loop(repo, side_effect=_update_hb, once=False)
        modified = [e for e in events if e["event_type"] == "modified" and e["kind"] == "heartbeats"]
        assert len(modified) == 1

    def test_modified_event_carries_new_data(self, repo: pathlib.Path) -> None:
        """The modified heartbeat event exposes ``extended_expires_at`` in its data."""
        res = _make_reservation(repo)
        create_heartbeat(repo, res.reservation_id, "agent-1", extension_seconds=100)

        def _update_hb() -> None:
            # Small sleep ensures mtime changes on fast filesystems.
            time.sleep(0.02)
            create_heartbeat(repo, res.reservation_id, "agent-1", extension_seconds=9999)

        events = _run_loop(repo, side_effect=_update_hb, once=False)
        modified = [
            e for e in events
            if e["event_type"] == "modified" and e["kind"] == "heartbeats"
        ]
        # The original guarded the check with ``if modified:``, which let the
        # test pass when no event was detected at all. Require the event.
        assert modified, "expected a modified heartbeat event"
        assert "extended_expires_at" in modified[0]["data"]
756
757
758 # ─────────────────────────────────────────────────────────────────────────────
759 # Integration — removed events
760 # ─────────────────────────────────────────────────────────────────────────────
761
762
class TestWatchLoopDetectsRemoved:
    """Integration tests: a record deleted mid-loop yields a ``removed`` event.

    The reservation file's bound ``unlink`` method is used directly as the
    side effect, so the file disappears on the backend's first ``wait()``.
    """

    def test_deleted_file_emits_removed(self, repo: pathlib.Path) -> None:
        """Deleting the reservation file produces one removed event with its ID."""
        reservation = _make_reservation(repo)
        record_path = _coord_dir(repo) / "reservations" / f"{reservation.reservation_id}.json"

        events = _run_loop(repo, side_effect=record_path.unlink, once=False)

        removed = [ev for ev in events if ev["event_type"] == "removed"]
        assert len(removed) == 1
        assert removed[0]["id"] == reservation.reservation_id

    def test_removed_event_carries_cached_data(self, repo: pathlib.Path) -> None:
        """Even after deletion, removed events carry the last-known data."""
        reservation = _make_reservation(repo, run_id="agent-cached")
        record_path = _coord_dir(repo) / "reservations" / f"{reservation.reservation_id}.json"

        events = _run_loop(repo, side_effect=record_path.unlink, once=False)

        removed = [ev for ev in events if ev["event_type"] == "removed"]
        assert removed[0]["data"].get("run_id") == "agent-cached"

    def test_removed_event_passes_run_id_filter(self, repo: pathlib.Path) -> None:
        """Filter by run_id must work for removed events using cached data."""
        reservation = _make_reservation(repo, run_id="agent-filter-test")
        record_path = _coord_dir(repo) / "reservations" / f"{reservation.reservation_id}.json"

        events = _run_loop(
            repo,
            side_effect=record_path.unlink,
            once=False,
            run_id_filter="agent-filter-test",
        )

        removed = [ev for ev in events if ev["event_type"] == "removed"]
        assert len(removed) == 1
803
804
805 # ─────────────────────────────────────────────────────────────────────────────
806 # Integration — expiration events
807 # ─────────────────────────────────────────────────────────────────────────────
808
809
class TestWatchLoopDetectsExpiry:
    """Integration tests for ``expired`` events emitted by ``_watch_loop``."""

    def test_expired_reservation_emits_expired(self, repo: pathlib.Path) -> None:
        """Reservation active at startup that expires during loop → expired event.

        The reservation starts with a one-hour TTL. The side effect then
        rewrites its file with ``expires_at`` two hours in the past, and the
        test expects an expired event carrying that reservation's ID.
        """
        reservation = _make_reservation(repo, ttl_seconds=3600)
        record_path = _coord_dir(repo) / "reservations" / f"{reservation.reservation_id}.json"

        def _expire_it() -> None:
            record = json.loads(record_path.read_text())
            two_hours = datetime.timedelta(hours=2)
            stale = datetime.datetime.now(datetime.timezone.utc) - two_hours
            record["expires_at"] = stale.isoformat()
            record_path.write_text(json.dumps(record))

        events = _run_loop(repo, side_effect=_expire_it, once=False)
        expired_ids = [ev["id"] for ev in events if ev["event_type"] == "expired"]
        assert any(uid == reservation.reservation_id for uid in expired_ids)

    def test_active_reservation_no_expired_event(self, repo: pathlib.Path) -> None:
        """A reservation with a long TTL is never reported as expired."""
        reservation = _make_reservation(repo, ttl_seconds=9999)
        events = _run_loop(repo, once=False)
        expired_ids = [ev["id"] for ev in events if ev["event_type"] == "expired"]
        assert not any(uid == reservation.reservation_id for uid in expired_ids)

    def test_removed_reservation_not_expired(self, repo: pathlib.Path) -> None:
        """GC'd reservation (file deleted) must not also fire expired.

        The side effect deletes the file of an active reservation. The test
        expects a removed event for its ID and no expired event for the same ID.
        """
        reservation = _make_reservation(repo, ttl_seconds=3600)
        target_id = reservation.reservation_id
        record_path = _coord_dir(repo) / "reservations" / f"{target_id}.json"

        def _delete_file() -> None:
            record_path.unlink()

        events = _run_loop(repo, side_effect=_delete_file, once=False)
        for_target = [ev for ev in events if ev["id"] == target_id]
        removed = [ev for ev in for_target if ev["event_type"] == "removed"]
        expired_dup = [ev for ev in for_target if ev["event_type"] == "expired"]
        # Must see a removed event, must NOT also see expired for the same ID.
        assert removed
        assert not expired_dup
862
863
864 # ─────────────────────────────────────────────────────────────────────────────
865 # Integration — timeout
866 # ─────────────────────────────────────────────────────────────────────────────
867
868
class TestWatchLoopTimeout:
    """Integration tests for the watch loop's ``timeout`` argument."""

    def test_timeout_zero_equivalent_to_once(self, repo: pathlib.Path) -> None:
        """timeout=0 should exit after snapshot, no looping."""
        _make_reservation(repo)

        began = time.monotonic()
        emitted = _run_loop(repo, once=False, timeout=0.0)
        took = time.monotonic() - began

        # A snapshot-only run returns quickly...
        assert took < 1.0
        # ...and still reports the pre-existing record.
        event_types = [ev["event_type"] for ev in emitted]
        assert "snapshot" in event_types

    def test_loop_exits_after_timeout(self, repo: pathlib.Path) -> None:
        """With a real PollingBackend and short timeout, loop exits."""
        sink = io.StringIO()
        polling = _PollingBackend(0.05)

        began = time.monotonic()
        with redirect_stdout(sink):
            _watch_loop(
                repo,
                polling,
                kind_filter=None,
                run_id_filter=None,
                branch_filter=None,
                as_json=True,
                once=False,
                timeout=0.2,
                poll_interval=0.05,
            )
        took = time.monotonic() - began

        # Generous bound: the point is only that the loop terminates.
        assert took < 2.0
894
895
896 # ─────────────────────────────────────────────────────────────────────────────
897 # Integration — JSON output
898 # ─────────────────────────────────────────────────────────────────────────────
899
900
class TestWatchLoopJsonOutput:
    """Integration tests for the structure of events parsed from loop output."""

    def test_all_snapshot_events_are_valid_json(self, repo: pathlib.Path) -> None:
        """Every event from a --once JSON run carries the full set of top-level keys."""
        _make_reservation(repo)
        required_keys = (
            "schema_version",
            "event_type",
            "kind",
            "id",
            "timestamp",
            "data",
        )

        # _run_loop has already parsed each line, so only structure is checked.
        for event in _run_loop(repo, once=True, as_json=True):
            for key in required_keys:
                assert key in event

    def test_json_event_type_values(self, repo: pathlib.Path) -> None:
        """``event_type`` is always one of the five known values."""
        _make_reservation(repo)
        known_types = {"snapshot", "added", "modified", "removed", "expired"}

        seen_types = [event["event_type"] for event in _run_loop(repo, once=True)]

        for event_type in seen_types:
            assert event_type in known_types
920
921
922 # ─────────────────────────────────────────────────────────────────────────────
923 # End-to-end — run() with mocked require_repo
924 # ─────────────────────────────────────────────────────────────────────────────
925
926
class TestRunCommand:
    """End-to-end tests that drive ``run()`` with ``require_repo`` patched out."""

    def _make_args(self, repo: pathlib.Path, **kwargs: MsgpackValue) -> argparse.Namespace:
        """Build the argument namespace passed to ``run()``.

        Any attribute may be overridden through ``kwargs``; the defaults are a
        single-pass (``once=True``) JSON run with a 0.1 s poll interval and no
        filters.  ``repo`` is accepted but not used.
        """
        import argparse

        defaults = {
            "once": True,
            "timeout": None,
            "max_events": None,
            "poll_interval": 0.1,
            "run_id": None,
            "branch_filter": None,
            "kind": None,
            "json_out": True,
        }
        namespace = argparse.Namespace()
        for attr, fallback in defaults.items():
            setattr(namespace, attr, kwargs.get(attr, fallback))
        return namespace

    def test_run_once_emits_snapshot(self, repo: pathlib.Path) -> None:
        """``run()`` with once=True prints at least one snapshot event as JSON."""
        from muse.cli.commands.watch_coord import run

        _make_reservation(repo)
        args = self._make_args(repo, once=True)
        sink = io.StringIO()
        with patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), \
                redirect_stdout(sink):
            run(args)

        parsed = [
            json.loads(line)
            for line in sink.getvalue().splitlines()
            if line.strip()
        ]
        assert any(event["event_type"] == "snapshot" for event in parsed)

    def test_run_text_mode_header_printed(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Text mode prints both the command banner and the closing line to stdout."""
        from muse.cli.commands.watch_coord import run

        args = self._make_args(repo, once=True, json_out=False)
        with patch("muse.cli.commands.watch_coord.require_repo", return_value=repo):
            run(args)

        stdout_text = capsys.readouterr().out
        assert "muse coord watch" in stdout_text
        assert "watch ended" in stdout_text

    def test_run_json_mode_no_header(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """In JSON mode every non-empty stdout line must parse as JSON."""
        from muse.cli.commands.watch_coord import run

        args = self._make_args(repo, once=True, json_out=True)
        with patch("muse.cli.commands.watch_coord.require_repo", return_value=repo):
            run(args)

        stdout_text = capsys.readouterr().out
        for line in stdout_text.splitlines():
            if not line.strip():
                continue
            # json.loads raises on anything that is not valid JSON.
            json.loads(line)

    def test_run_timeout_zero_treated_as_once(self, repo: pathlib.Path) -> None:
        """--timeout 0 must not block."""
        from muse.cli.commands.watch_coord import run

        args = self._make_args(repo, once=False, timeout=0.0, json_out=True)
        began = time.monotonic()
        sink = io.StringIO()
        with patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), \
                redirect_stdout(sink):
            run(args)
        assert time.monotonic() - began < 1.0
987
988
989 # ─────────────────────────────────────────────────────────────────────────────
990 # End-to-end — backend selection
991 # ─────────────────────────────────────────────────────────────────────────────
992
993
class TestBackendSelection:
    """Tests for the polling and kqueue wait backends.

    The kqueue tests are skipped on platforms whose ``select`` module has no
    ``kqueue`` attribute.
    """

    def test_polling_backend_wait_sleeps(self) -> None:
        """``wait()`` on the polling backend returns True after actually sleeping."""
        b = _PollingBackend(0.05)
        try:
            start = time.monotonic()
            result = b.wait(0.1)
            elapsed = time.monotonic() - start
            assert result is True
            assert elapsed >= 0.04  # Should have slept.
        finally:
            # Close even when an assertion fails.
            b.close()

    def test_polling_backend_respects_timeout_cap(self) -> None:
        """wait(timeout) must not sleep longer than timeout."""
        b = _PollingBackend(10.0)  # Long interval.
        try:
            start = time.monotonic()
            b.wait(0.05)  # Short timeout.
            elapsed = time.monotonic() - start
            # Loose bound: far below the 10 s interval, so the timeout won.
            assert elapsed < 1.0
        finally:
            b.close()

    def test_polling_backend_close_is_idempotent(self) -> None:
        """Calling ``close()`` twice must not raise."""
        b = _PollingBackend(1.0)
        b.close()
        b.close()  # Should not raise.

    @pytest.mark.skipif(
        not hasattr(__import__("select"), "kqueue"),
        reason="kqueue not available on this platform",
    )
    def test_kqueue_backend_initialises(self, repo: pathlib.Path) -> None:
        """kqueue backend can be created and closed without error."""
        from muse.cli.commands.watch_coord import _SUBDIRS
        dirs = [_coord_dir(repo) / sub for sub in _SUBDIRS]
        b = _KqueueBackend(dirs)
        try:
            assert b.name == "kqueue"
        finally:
            b.close()

    @pytest.mark.skipif(
        not hasattr(__import__("select"), "kqueue"),
        reason="kqueue not available on this platform",
    )
    def test_kqueue_backend_detects_new_file(self, repo: pathlib.Path) -> None:
        """kqueue wakes when a new JSON file is added to a watched dir."""
        from muse.cli.commands.watch_coord import _SUBDIRS
        dirs = [_coord_dir(repo) / sub for sub in _SUBDIRS]
        b = _KqueueBackend(dirs)
        try:
            # Wait once with nothing changed.  The original never asserted on
            # this result, so it is deliberately discarded rather than bound
            # to an unused name.
            b.wait(0.05)
            # Add a file.
            (_coord_dir(repo) / "reservations" / "newfile.json").write_text("{}")
            # Should detect change.
            result_after = b.wait(0.5)
            assert result_after is True
        finally:
            b.close()

    @pytest.mark.skipif(
        not hasattr(__import__("select"), "kqueue"),
        reason="kqueue not available on this platform",
    )
    def test_kqueue_backend_close_releases_fds(self, repo: pathlib.Path) -> None:
        """All fds must be released after close.

        Counts the process's open descriptors via ``/dev/fd`` before the
        backend is created and again after ``close()``; the two counts must
        match.  Skipped when ``/dev/fd`` is not a directory.
        """
        import os
        from muse.cli.commands.watch_coord import _SUBDIRS

        fd_dir = "/dev/fd"
        if not os.path.isdir(fd_dir):
            pytest.skip("/dev/fd is not available; cannot count open descriptors")

        def _open_fd_count() -> int:
            # The listing itself briefly opens a descriptor, but it does so on
            # both measurements, so the comparison stays fair.
            return len(os.listdir(fd_dir))

        dirs = [_coord_dir(repo) / sub for sub in _SUBDIRS]
        fds_before = _open_fd_count()
        b = _KqueueBackend(dirs)
        b.close()
        assert _open_fd_count() == fds_before, "kqueue backend leaked file descriptors"
1063
1064
1065 # ─────────────────────────────────────────────────────────────────────────────
1066 # Security tests
1067 # ─────────────────────────────────────────────────────────────────────────────
1068
1069
class TestAnsiInjectionSanitized:
    """Security tests: escape sequences in record fields vs. emitted output."""

    @staticmethod
    def _render(event: WatchEvent, *, as_json: bool) -> str:
        """Return whatever ``_emit_event`` writes to stdout for ``event``."""
        sink = io.StringIO()
        with redirect_stdout(sink):
            _emit_event(event, as_json=as_json)
        return sink.getvalue()

    def test_ansi_in_run_id_stripped_text_output(self) -> None:
        """ESC sequences in run_id must not reach stdout in text mode."""
        hostile_run_id = "\x1b[1;31mROOT\x1b[0m"
        payload = {"run_id": hostile_run_id, "branch": "main", "addresses": ["f.py::fn"]}
        event = WatchEvent("added", "reservation", "a" * 8, "ts", payload)

        text = self._render(event, as_json=False)

        # The escape bytes are gone while the visible text survives.
        assert "\x1b" not in text
        assert "ROOT" in text

    def test_ansi_in_branch_stripped(self) -> None:
        """ESC sequences in branch must not reach stdout in text mode."""
        hostile_branch = "\x1b[4mmaster\x1b[0m"
        payload = {"run_id": "agent", "branch": hostile_branch, "addresses": ["f.py::fn"]}
        event = WatchEvent("added", "reservation", "b" * 8, "ts", payload)

        assert "\x1b" not in self._render(event, as_json=False)

    def test_ansi_in_address_stripped(self) -> None:
        """ESC sequences in an address must not reach stdout in text mode."""
        hostile_address = "\x1b[32msrc/malicious.py::inject\x1b[0m"
        payload = {"run_id": "agent", "branch": "main", "addresses": [hostile_address]}
        event = WatchEvent("added", "reservation", "c" * 8, "ts", payload)

        assert "\x1b" not in self._render(event, as_json=False)

    def test_json_output_preserves_raw_data(self) -> None:
        """JSON output must preserve raw data as-is (no sanitisation)."""
        hostile_run_id = "\x1b[31mmalicious\x1b[0m"
        payload = {"run_id": hostile_run_id, "branch": "main", "addresses": ["f.py::fn"]}
        event = WatchEvent("added", "reservation", "d" * 8, "ts", payload)

        decoded = json.loads(self._render(event, as_json=True))

        # The value round-trips through JSON unchanged.
        assert decoded["data"]["run_id"] == hostile_run_id
1110
1111
class TestSymlinkDirRejected:
    """Security test: the kqueue backend must not accept a symlinked directory."""

    @pytest.mark.skipif(
        not hasattr(__import__("select"), "kqueue"),
        reason="kqueue not available on this platform",
    )
    def test_symlinked_coord_dir_raises_valueerror(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        """kqueue backend must refuse to watch a symlinked directory."""
        import shutil

        # Somewhere real for the symlink to point at.
        symlink_target = tmp_path / "real_target"
        symlink_target.mkdir()

        reservations_dir = _coord_dir(repo) / "reservations"
        # Swap the genuine directory for a symlink.
        shutil.rmtree(reservations_dir, ignore_errors=True)
        reservations_dir.symlink_to(symlink_target)
        try:
            with pytest.raises(ValueError, match="symlink"):
                _KqueueBackend([reservations_dir])
        finally:
            # Drop the symlink and put a real directory back.
            reservations_dir.unlink(missing_ok=True)
            reservations_dir.mkdir(exist_ok=True)
1135
1136
class TestKindFilterAllowlist:
    """Security tests for kind filtering in ``_passes_filters`` and argparse."""

    def test_valid_kinds_accepted(self) -> None:
        """Each known kind passes when the kind filter names that same kind."""
        known_kinds = ("reservations", "intents", "releases", "heartbeats")
        for known in known_kinds:
            assert _passes_filters(known, {}, known, None, None)

    def test_arbitrary_kind_string_rejected_by_filter(self) -> None:
        """_passes_filters rejects unrecognised kinds when kind_filter is set."""
        accepted = _passes_filters("../etc", {}, "reservations", None, None)
        assert not accepted

    def test_argparse_rejects_invalid_kind(self) -> None:
        """argparse choices validation rejects invalid --kind values."""
        import argparse
        from muse.cli.commands.watch_coord import register

        parser = argparse.ArgumentParser()
        register(parser.add_subparsers())

        bad_argv = ["watch", "--kind", "invalid_kind"]
        with pytest.raises(SystemExit):
            parser.parse_args(bad_argv)
1155
1156
1157 # ─────────────────────────────────────────────────────────────────────────────
1158 # Stress tests
1159 # ─────────────────────────────────────────────────────────────────────────────
1160
1161
class TestStress500RapidAdds:
    """Stress test: snapshot completeness with 500 reservation files."""

    def test_500_reservations_all_detected(self, repo: pathlib.Path) -> None:
        """500 pre-existing reservations must all appear in the snapshot."""
        # Files are written directly rather than through create_reservation,
        # for speed.
        reservations_dir = _coord_dir(repo) / "reservations"
        created_at = now_utc_iso()
        one_hour_ahead = (
            datetime.datetime.now(datetime.timezone.utc)
            + datetime.timedelta(hours=1)
        )
        expires_at = one_hour_ahead.isoformat()

        written_ids = []
        for index in range(500):
            record_id = _new_id()
            written_ids.append(record_id)
            record = {
                "reservation_id": record_id,
                "run_id": f"agent-{index}",
                "branch": "main",
                "addresses": [f"src/mod{index}.py::fn"],
                "created_at": created_at,
                "expires_at": expires_at,
                "operation": "modify",
                "schema_version": "1.0.0",
            }
            (reservations_dir / f"{record_id}.json").write_text(json.dumps(record))

        began = time.monotonic()
        emitted = _run_loop(repo, once=True)
        elapsed = time.monotonic() - began

        snapshot_ids = {ev["id"] for ev in emitted if ev["event_type"] == "snapshot"}
        assert snapshot_ids == set(written_ids)
        assert elapsed < 3.0, f"500 snapshot took {elapsed:.2f}s (limit: 3s)"
1196
1197
class TestStressLargeSnapshot:
    """Stress test: ``_scan_dirs`` over 1000 records across four kinds."""

    def test_1000_records_snapshot_under_1s(self, repo: pathlib.Path) -> None:
        """Snapshot of 1000 mixed records must complete in under 1 second."""
        created_at = now_utc_iso()
        one_hour_ahead = (
            datetime.datetime.now(datetime.timezone.utc)
            + datetime.timedelta(hours=1)
        )
        expires_at = one_hour_ahead.isoformat()
        kinds = ("reservations", "intents", "releases", "heartbeats")

        # 250 rounds × 4 kinds = 1000 files.
        for index in range(250):
            for kind in kinds:
                record_id = _new_id()
                record = {
                    "id": record_id,
                    "run_id": f"agent-{index}",
                    "branch": "main",
                    "created_at": created_at,
                    "expires_at": expires_at,
                }
                target = _coord_dir(repo) / kind / f"{record_id}.json"
                target.write_text(json.dumps(record))

        began = time.monotonic()
        snapshot = _scan_dirs(repo)
        elapsed = time.monotonic() - began

        record_count = sum(len(entries) for entries in snapshot.values())
        assert record_count == 1000
        assert elapsed < 1.0, f"scan of 1000 records took {elapsed:.3f}s"
1219
1220
class TestStressDiffPerformance:
    """Stress test: repeated ``_diff_snapshots`` calls on large snapshots."""

    def test_diff_1000_entries_under_50ms(self) -> None:
        """Diffing two 1000-entry snapshots must take < 50 ms.

        The bound is enforced in aggregate: 100 diffs must finish within 5 s.
        """
        kinds = ("reservations", "intents", "releases", "heartbeats")
        old: _Snapshot = {kind: {} for kind in kinds}
        new: _Snapshot = {kind: {} for kind in kinds}

        # 250 rounds × 4 kinds = 1000 identical entries on both sides.
        for index in range(250):
            entry = (index * 1000, index * 10)
            for kind in kinds:
                record_id = _new_id()
                old[kind][record_id] = entry
                new[kind][record_id] = entry

        # Ten entries present only in ``new`` make the diff non-trivial.
        for _ in range(10):
            new["reservations"][_new_id()] = (999999, 99)

        began = time.monotonic()
        for _ in range(100):
            _diff_snapshots(old, new)
        elapsed = time.monotonic() - began

        assert elapsed < 5.0, f"100 × diff took {elapsed:.3f}s (limit: 5s)"
1240
1241
class TestStressManyExpirations:
    """Stress test: ``_check_expirations`` over 200 already-expired records."""

    def test_200_expired_reservations_detected(self, repo: pathlib.Path) -> None:
        """200 expired reservations must all emit expired events."""
        reservations_dir = _coord_dir(repo) / "reservations"
        one_hour_ago = (
            datetime.datetime.now(datetime.timezone.utc)
            - datetime.timedelta(hours=1)
        )
        expired_at = one_hour_ago.isoformat()
        created_at = now_utc_iso()

        written_ids = set()
        for index in range(200):
            record_id = _new_id()
            written_ids.add(record_id)
            record = {
                "reservation_id": record_id,
                "run_id": f"agent-{index}",
                "branch": "main",
                "addresses": [f"src/m{index}.py::fn"],
                "created_at": created_at,
                "expires_at": expired_at,  # Already in the past.
                "operation": "modify",
                "schema_version": "1.0.0",
            }
            (reservations_dir / f"{record_id}.json").write_text(json.dumps(record))

        # All 200 IDs are passed as the previously-active set.
        began = time.monotonic()
        expiry_events, _ = _check_expirations(repo, written_ids, set())
        elapsed = time.monotonic() - began

        assert {event.id for event in expiry_events} == written_ids
        assert elapsed < 2.0, f"200 expiration checks took {elapsed:.3f}s (limit: 2s)"
1275
1276
1277 # ─────────────────────────────────────────────────────────────────────────────
1278 # Integration — _watch_loop with max_events
1279 # ─────────────────────────────────────────────────────────────────────────────
1280
1281
class TestWatchLoopMaxEvents:
    """Integration tests for the ``max_events`` cap on emitted events."""

    @staticmethod
    def _seed_reservations(repo: pathlib.Path, count: int) -> None:
        """Create ``count`` reservations with run IDs ``ag-0`` … ``ag-{count-1}``."""
        for index in range(count):
            _make_reservation(repo, run_id=f"ag-{index}")

    @staticmethod
    def _json_args(max_events: int) -> argparse.Namespace:
        """Namespace for a once-only JSON run with the given ``max_events``."""
        return argparse.Namespace(
            once=True,
            timeout=None,
            max_events=max_events,
            poll_interval=0.1,
            run_id=None,
            branch_filter=None,
            kind=None,
            json_out=True,
        )

    def test_max_events_1_returns_single_snapshot(self, repo: pathlib.Path) -> None:
        """max_events=1 must emit exactly 1 event even if more exist."""
        self._seed_reservations(repo, 5)
        emitted = _run_loop(repo, once=True, max_events=1)
        assert len(emitted) == 1

    def test_max_events_3_caps_at_3(self, repo: pathlib.Path) -> None:
        """With ten records present, max_events=3 yields exactly three events."""
        self._seed_reservations(repo, 10)
        emitted = _run_loop(repo, once=True, max_events=3)
        assert len(emitted) == 3

    def test_max_events_larger_than_existing_returns_all(self, repo: pathlib.Path) -> None:
        """A cap above the record count does not limit output: four records, four events."""
        self._seed_reservations(repo, 4)
        emitted = _run_loop(repo, once=True, max_events=100)
        assert len(emitted) == 4

    def test_max_events_zero_is_rejected_by_run(self, repo: pathlib.Path) -> None:
        """run() must reject max_events=0 with USER_ERROR."""
        from muse.cli.commands.watch_coord import run as watch_run

        args = self._json_args(0)
        sink = io.StringIO()
        with patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), \
                redirect_stdout(sink), \
                pytest.raises(SystemExit) as exit_info:
            watch_run(args)
        assert exit_info.value.code == ExitCode.USER_ERROR

    def test_max_events_negative_is_rejected_by_run(self, repo: pathlib.Path) -> None:
        """run() must reject a negative max_events with USER_ERROR."""
        from muse.cli.commands.watch_coord import run as watch_run

        args = self._json_args(-5)
        sink = io.StringIO()
        with patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), \
                redirect_stdout(sink), \
                pytest.raises(SystemExit) as exit_info:
            watch_run(args)
        assert exit_info.value.code == ExitCode.USER_ERROR

    def test_max_events_caps_added_events_in_loop(self, repo: pathlib.Path) -> None:
        """max_events must also cap events emitted during the loop (not just snapshots)."""

        # Nothing exists up front; ten records arrive via the side effect.
        def _add_ten() -> None:
            self._seed_reservations(repo, 10)

        emitted = _run_loop(
            repo,
            once=False,
            timeout=1.0,
            side_effect=_add_ten,
            max_events=3,
        )
        assert len(emitted) <= 3

    def test_max_events_json_error_is_compact(self, repo: pathlib.Path) -> None:
        """Error for bad max_events in JSON mode must be a single compact line."""
        from muse.cli.commands.watch_coord import run as watch_run

        args = self._json_args(0)
        sink = io.StringIO()
        with patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), \
                redirect_stdout(sink), \
                pytest.raises(SystemExit):
            watch_run(args)

        printed = sink.getvalue().strip()
        assert "\n" not in printed
        decoded = json.loads(printed)
        assert "error" in decoded
1370
1371
1372 # ─────────────────────────────────────────────────────────────────────────────
1373 # End-to-end — run() input validation
1374 # ─────────────────────────────────────────────────────────────────────────────
1375
1376
class TestRunCommandValidation:
    """End-to-end tests for argument validation performed by ``run()``."""

    def _make_args(self, **kwargs: MsgpackValue) -> argparse.Namespace:
        """Build the argument namespace passed to ``run()``.

        Defaults describe a single-pass (``once=True``) text-mode run with a
        0.1 s poll interval and no filters; ``kwargs`` override any of them.
        """
        defaults = {
            "once": True,
            "timeout": None,
            "max_events": None,
            "poll_interval": 0.1,
            "run_id": None,
            "branch_filter": None,
            "kind": None,
            "json_out": False,
        }
        namespace = argparse.Namespace()
        for attr, fallback in defaults.items():
            setattr(namespace, attr, kwargs.get(attr, fallback))
        return namespace

    def test_run_id_at_max_length_accepted(self, repo: pathlib.Path) -> None:
        """A run ID of exactly ``_MAX_RUN_ID_LEN`` characters is accepted."""
        from muse.cli.commands.watch_coord import run as watch_run

        args = self._make_args(run_id="a" * _MAX_RUN_ID_LEN, once=True)
        sink = io.StringIO()
        with patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), \
                redirect_stdout(sink):
            watch_run(args)  # must not raise

    def test_run_id_over_max_exits_user_error(self, repo: pathlib.Path) -> None:
        """A run ID one character over the limit exits with USER_ERROR."""
        from muse.cli.commands.watch_coord import run as watch_run

        args = self._make_args(run_id="a" * (_MAX_RUN_ID_LEN + 1))
        with pytest.raises(SystemExit) as exit_info:
            watch_run(args)
        assert exit_info.value.code == ExitCode.USER_ERROR

    def test_run_id_over_max_json_returns_error_field(self, repo: pathlib.Path) -> None:
        """In JSON mode an over-long run ID prints one JSON line with error and status."""
        from muse.cli.commands.watch_coord import run as watch_run

        args = self._make_args(run_id="a" * (_MAX_RUN_ID_LEN + 1), json_out=True)
        sink = io.StringIO()
        with redirect_stdout(sink), pytest.raises(SystemExit):
            watch_run(args)

        printed = sink.getvalue().strip()
        assert "\n" not in printed  # Compact JSON.
        decoded = json.loads(printed)
        assert "error" in decoded
        assert decoded.get("status") == "bad_args"

    def test_poll_interval_below_min_exits_user_error(self, repo: pathlib.Path) -> None:
        """A poll interval of 0.001 exits with USER_ERROR."""
        from muse.cli.commands.watch_coord import run as watch_run

        args = self._make_args(poll_interval=0.001)
        with pytest.raises(SystemExit) as exit_info:
            watch_run(args)
        assert exit_info.value.code == ExitCode.USER_ERROR

    def test_poll_interval_above_max_exits_user_error(self, repo: pathlib.Path) -> None:
        """A poll interval of 99999.0 exits with USER_ERROR."""
        from muse.cli.commands.watch_coord import run as watch_run

        args = self._make_args(poll_interval=99999.0)
        with pytest.raises(SystemExit) as exit_info:
            watch_run(args)
        assert exit_info.value.code == ExitCode.USER_ERROR

    def test_poll_interval_at_min_accepted(self, repo: pathlib.Path) -> None:
        """A poll interval equal to ``_MIN_POLL_INTERVAL`` is accepted."""
        from muse.cli.commands.watch_coord import run as watch_run

        args = self._make_args(poll_interval=_MIN_POLL_INTERVAL, once=True)
        sink = io.StringIO()
        with patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), \
                redirect_stdout(sink):
            watch_run(args)  # must not raise

    def test_poll_interval_at_max_accepted(self, repo: pathlib.Path) -> None:
        """A poll interval equal to ``_MAX_POLL_INTERVAL`` is accepted."""
        from muse.cli.commands.watch_coord import run as watch_run

        args = self._make_args(poll_interval=_MAX_POLL_INTERVAL, once=True)
        sink = io.StringIO()
        with patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), \
                redirect_stdout(sink):
            watch_run(args)  # must not raise

    def test_timeout_negative_exits_user_error(self, repo: pathlib.Path) -> None:
        """A negative timeout exits with USER_ERROR."""
        from muse.cli.commands.watch_coord import run as watch_run

        args = self._make_args(timeout=-1.0)
        with pytest.raises(SystemExit) as exit_info:
            watch_run(args)
        assert exit_info.value.code == ExitCode.USER_ERROR

    def test_timeout_zero_accepted_as_once(self, repo: pathlib.Path) -> None:
        """--timeout 0 must complete quickly (treated as --once)."""
        from muse.cli.commands.watch_coord import run as watch_run

        args = self._make_args(timeout=0.0, json_out=True)
        sink = io.StringIO()
        began = time.monotonic()
        with patch("muse.cli.commands.watch_coord.require_repo", return_value=repo), \
                redirect_stdout(sink):
            watch_run(args)
        assert time.monotonic() - began < 1.0

    def test_validation_fires_before_require_repo(self) -> None:
        """Bad --run-id must fail before require_repo() is ever called."""
        from muse.cli.commands.watch_coord import run as watch_run

        args = argparse.Namespace(
            once=True,
            timeout=None,
            max_events=None,
            poll_interval=0.1,
            run_id="x" * (_MAX_RUN_ID_LEN + 1),
            branch_filter=None,
            kind=None,
            json_out=False,
        )
        called = []

        def _record_call() -> None:
            # Any invocation of the patched require_repo is logged here.
            called.append(True)

        with patch(
            "muse.cli.commands.watch_coord.require_repo",
            side_effect=_record_call,
        ), pytest.raises(SystemExit):
            watch_run(args)
        assert called == [], "require_repo must not be called before validation passes"

    def test_error_message_has_check_mark_prefix(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Validation error output on stderr must contain ❌."""
        from muse.cli.commands.watch_coord import run as watch_run

        args = self._make_args(poll_interval=0.0001)
        with pytest.raises(SystemExit):
            watch_run(args)

        stderr_text = capsys.readouterr().err
        assert "❌" in stderr_text
1503
1504
1505 # ─────────────────────────────────────────────────────────────────────────────
1506 # Stress — max_events under load
1507 # ─────────────────────────────────────────────────────────────────────────────
1508
1509
class TestStressMaxEvents:
    """Stress test: the ``max_events`` cap with 500 reservation files present."""

    def test_max_events_10_from_500_records_returns_exactly_10(self, repo: pathlib.Path) -> None:
        """max_events=10 must return exactly 10 events even with 500 records."""
        reservations_dir = _coord_dir(repo) / "reservations"
        created_at = now_utc_iso()
        one_hour_ahead = (
            datetime.datetime.now(datetime.timezone.utc)
            + datetime.timedelta(hours=1)
        )
        expires_at = one_hour_ahead.isoformat()

        for index in range(500):
            record_id = _new_id()
            record = {
                "reservation_id": record_id,
                "run_id": f"agent-{index}",
                "branch": "main",
                "addresses": [f"src/mod{index}.py::fn"],
                "created_at": created_at,
                "expires_at": expires_at,
                "operation": "modify",
                "schema_version": "1.0.0",
            }
            (reservations_dir / f"{record_id}.json").write_text(json.dumps(record))

        began = time.monotonic()
        emitted = _run_loop(repo, once=True, max_events=10)
        elapsed = time.monotonic() - began

        assert len(emitted) == 10
        assert elapsed < 2.0, f"max_events cap with 500 records took {elapsed:.2f}s"
1539
1540
1541 # ---------------------------------------------------------------------------
1542 # Flag registration
1543 # ---------------------------------------------------------------------------
1544
1545
class TestRegisterFlags:
    """Tests for the ``json_out`` flag as registered on the ``watch`` subcommand."""

    def _parse(self, *args: str) -> "argparse.Namespace":
        """Parse ``watch <args…>`` with a fresh parser that ``register`` populated."""
        import argparse
        from muse.cli.commands.watch_coord import register

        parser = argparse.ArgumentParser()
        register(parser.add_subparsers())
        argv = ["watch", *args]
        return parser.parse_args(argv)

    def test_default_json_out_is_false(self) -> None:
        """With no flags given, ``json_out`` is False."""
        parsed = self._parse()
        assert parsed.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        """``--json`` sets ``json_out`` to True."""
        parsed = self._parse("--json")
        assert parsed.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        """``-j`` sets ``json_out`` to True."""
        parsed = self._parse("-j")
        assert parsed.json_out is True
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 29 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago