gabriel / muse public
watch_coord.py python
976 lines 37.1 KB
Raw
sha256:1ddad36d76d3a8d323f9b3664169cb184b7a38b39208214a2ae504154260826f fix: show full cryptographic IDs in all human-readable CLI output Sonnet 4.6 patch 8 days ago
1 """``muse coord watch`` — stream coordination events in real time.
2
3 Watches ``.muse/coordination/`` for file-system changes and emits a structured
4 event for every coordination record that is added, modified, removed, or
expired. On macOS/BSD the kqueue kernel interface is used, so the process
sleeps in the kernel and wakes as soon as a coordination directory changes;
otherwise it wakes only once per ``--poll-interval`` to check for expired
reservations. On Linux and other platforms a configurable polling fallback
is used instead.
9
10 This replaces the polling anti-pattern where agents repeatedly call
11 ``muse coord list`` to detect swarm state changes. Piping
12 ``muse coord watch --json`` gives an agent an event-driven coordination bus.
13
14 Event types
15 -----------
16 ``snapshot``
17 Emitted once per existing record immediately after startup, before any
18 change events. Agents should consume all ``snapshot`` events first to
19 build an accurate initial picture of the swarm, then react to deltas.
20
21 ``added``
22 A new coordination file appeared on disk (e.g. another agent just called
23 ``muse coord reserve``).
24
25 ``modified``
26 An existing file changed (e.g. a heartbeat that extended a reservation's
27 TTL, or an intent that was amended).
28
29 ``removed``
30 A file was deleted from disk (e.g. GC cleaned up an expired reservation
31 with ``muse coord gc --execute``). The event carries the last-known data
32 for the record so filters still work correctly.
33
34 ``expired``
35 A reservation that was previously active has crossed its effective
36 expiry timestamp. The file may still exist on disk (not GC'd yet) —
37 this event fires when ``active_reservations()`` stops returning it.
38
39 Event kinds
40 -----------
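``reservations`` — ``.muse/coordination/reservations/<id>.json``
``intents``      — ``.muse/coordination/intents/<id>.json``
``releases``     — ``.muse/coordination/releases/<id>.json``
``heartbeats``   — ``.muse/coordination/heartbeats/<id>.json``
``tasks``        — ``.muse/coordination/tasks/<id>.json``
``claims``       — ``.muse/coordination/claims/<id>.json``
``dependencies`` — ``.muse/coordination/dependencies/<id>.json``

The ``kind`` string of an event is the name of the subdirectory the record
lives in.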
45
46 Usage::
47
48 muse coord watch # stream all events, run forever
49 muse coord watch --once # emit current state as snapshots, then exit
50 muse coord watch --timeout 60 # run for 60 seconds then exit
51 muse coord watch --max-events 10 # exit after receiving 10 events
    muse coord watch --kind reservations   # only reservation events
53 muse coord watch --run-id agent-42 # only events from agent-42
54 muse coord watch --branch feature/x # only events on feature/x
55 muse coord watch --json # NDJSON output (one event per line)
56 muse coord watch --poll-interval 0.5 # faster fallback poll cadence
57
58 JSON output schema (NDJSON — one JSON object per line)::
59
60 {
61 "schema_version": str,
62 "event_type": "snapshot" | "added" | "modified" | "removed" | "expired",
        "kind": "reservations" | "intents" | "releases" | "heartbeats" | "tasks" | "claims" | "dependencies",
64 "id": str, // content-addressed ID of the changed record
65 "timestamp": str, // ISO 8601 UTC — when the event was generated
66 "data": dict | {} // parsed record content; empty for "removed"
67 // if the cache had no prior data
68 }
69
70 Text output example::
71
72 muse coord watch — watching .muse/coordination/ (kqueue)
73 ──────────────────────────────────────────────────────────────
    · reservations agent-42@feature/billing   a1b2c3d4 [billing.py::compute_total]
    + reservations agent-99@main              e5f6a7b8 [auth.py::verify_token]
    ~ heartbeats   agent-42                   c9d0e1f2 extends to 2026-03-30T15:00Z (res: a1b2c3d4)
    + releases     agent-42                   f3a4b5c6 completed (res: a1b2c3d4)
    ! reservations agent-99@main              e5f6a7b8 [auth.py::verify_token]
79
80 Flags::
81
82 --once Emit current state as snapshot events, then exit.
83 --timeout SECONDS Stop watching after N seconds (>= 0; 0 = --once).
84 --max-events N Exit after emitting N events (>= 1). Useful for
85 batch processing without a time-based timeout.
86 --poll-interval SECS Polling/kqueue-timeout interval in seconds
87 (default 1.0; range [0.01, 3600]).
88 --run-id RUNID Only emit events where data.run_id == RUNID.
89 Maximum 256 characters.
    --branch / -b BRANCH  Only emit events where data.branch == BRANCH.
    --kind KIND           Only emit events for this kind (reservations | intents |
                          releases | heartbeats | tasks | claims | dependencies).
    --json / -j           Output NDJSON (one event per line) instead of text.
95
96 Security::
97
98 All agent-supplied strings are passed through
99 :func:`~muse.core.validation.sanitize_display` before printing to strip
100 ANSI escape sequences and terminal control characters.
101
102 The ``--kind`` argument is validated against an allowlist before use.
103 The coordination directories are checked for symlink attacks before any
104 kqueue fd is opened.
105
106 Performance::
107
    kqueue backend   Sleeps in the kernel between wake-ups — woken as soon as a
                     watched directory changes, or after ``--poll-interval`` so
                     reservation expiry can be checked. Each wake-up rescans
                     the coord dirs. Handles ``write_text_atomic``'s
                     mkstemp+rename pattern correctly (confirmed via testing).

    polling backend  O(n) scan every ``--poll-interval`` seconds, where n is
                     the total number of JSON files across all coordination
                     subdirectories.
114
115 Record cache The last-known data for every live record is kept in
116 memory so ``removed`` events can carry the record's data
117 even after the file is gone. The cache is bounded by the
118 number of live coordination records (typically ≪ 10 000).
119
120 Exit codes::
121
122 0 — normal exit (--once, --timeout, --max-events, SIGINT, or SIGTERM)
123 1 — bad arguments or backend initialisation failure
124 """
125
126 import argparse
127 import dataclasses
128 import json
129 import logging
130 import os
131 import pathlib
132 import select
133 import signal
134 import sys
135 import time
136 import types
137
138 from muse import __version__
139 from muse.core.types import load_json_file, now_utc_iso
140 from muse.core.paths import coordination_dir as _coordination_dir
141 from muse.core.coordination import active_reservations
142 from muse.core.errors import ExitCode
143 from muse.core.repo import require_repo
144 from muse.core.validation import sanitize_display
145
146 type _IconMap = dict[str, str]
147 logger = logging.getLogger(__name__)
148
149 # JSON-compatible value type for parsed coordination record payloads.
150 JsonValue = str | int | float | bool | None | list["JsonValue"] | "JsonDict"
151 JsonDict = dict[str, JsonValue]
152
# ── Input constraints ─────────────────────────────────────────────────────────

#: Maximum length of the ``--run-id`` filter value, in characters (``run``
#: compares it against ``len()`` of the ``str``, not its encoded byte length).
#: Matches the cap applied to run-id in all other coordination commands.
_MAX_RUN_ID_LEN: int = 256

#: Allowed range for ``--poll-interval`` in seconds, inclusive at both ends.
_MIN_POLL_INTERVAL: float = 0.01
_MAX_POLL_INTERVAL: float = 3600.0

# ── Directory layout ──────────────────────────────────────────────────────────

#: Coordination subdirectories under ``.muse/coordination/``. Each name is also
#: the ``kind`` string carried by watch events and accepted by ``--kind``
#: (note the plural form, e.g. ``"reservations"``). The tuple order is the
#: canonical order in which records are scanned, diffed and emitted.
_SUBDIRS: tuple[str, ...] = (
    "reservations",
    "intents",
    "releases",
    "heartbeats",
    "tasks",
    "claims",
    "dependencies",
)
_VALID_KINDS: frozenset[str] = frozenset(_SUBDIRS)
167
def _coord_dir(root: pathlib.Path) -> pathlib.Path:
    """Resolve the coordination root (``.muse/coordination/``) for repo *root*.

    Thin local alias for :func:`muse.core.paths.coordination_dir`, used by
    every helper in this module that needs the coordination path.
    """
    coordination_root = _coordination_dir(root)
    return coordination_root
171
def _ensure_coord_dirs(root: pathlib.Path) -> None:
    """Create every coordination subdirectory named in ``_SUBDIRS``.

    Idempotent: existing directories are left alone (``exist_ok=True``), and
    missing parents, including ``.muse/coordination/`` itself, are created
    (``parents=True``).
    """
    coord = _coord_dir(root)  # Loop-invariant: resolve the root once.
    for sub in _SUBDIRS:
        (coord / sub).mkdir(parents=True, exist_ok=True)
176
177 # ── Data model ────────────────────────────────────────────────────────────────
178
# Snapshot of one coordination subdirectory:
#   record_id (JSON filename without ".json") -> (st_mtime_ns, st_size)
_SnapshotInner = dict[str, tuple[int, int]]
# Snapshot of the whole coordination tree: kind (subdirectory name) -> the above.
_Snapshot = dict[str, _SnapshotInner]
182
@dataclasses.dataclass
class WatchEvent:
    """One coordination event produced by the watch loop.

    Attributes
    ----------
    event_type:
        ``"snapshot"``, ``"added"``, ``"modified"``, ``"removed"`` or
        ``"expired"``.
    kind:
        Name of the coordination subdirectory the record lives in, e.g.
        ``"reservations"``, ``"intents"``, ``"releases"`` or ``"heartbeats"``.
    id:
        Content-addressed ID of the record (its JSON filename without the
        ``.json`` suffix).
    timestamp:
        ISO 8601 UTC string recording when this event object was created —
        not when the underlying file changed.
    data:
        Parsed JSON payload of the record; an empty dict when nothing could be
        loaded or cached (typically a ``"removed"`` event).
    """

    event_type: str
    kind: str
    id: str
    timestamp: str
    data: JsonDict

    def to_dict(self) -> JsonDict:
        """Return the event as a dict following the public NDJSON schema.

        ``schema_version`` (the running Muse version) is always the first key,
        followed by the dataclass fields in declaration order.
        """
        payload: JsonDict = {"schema_version": __version__}
        payload["event_type"] = self.event_type
        payload["kind"] = self.kind
        payload["id"] = self.id
        payload["timestamp"] = self.timestamp
        payload["data"] = self.data
        return payload
221
222 # ── Backend ABC ───────────────────────────────────────────────────────────────
223
class _Backend:
    """Common interface shared by the kqueue and polling watch backends.

    Concrete backends override :meth:`wait`, and :meth:`close` when they own
    OS resources. ``name`` is a short label shown in the text-mode banner.
    """

    name: str = "unknown"

    def wait(self, timeout: float) -> bool:
        """Block for at most *timeout* seconds waiting for a change.

        Implementations return ``True`` when a file-system change may have
        happened. The kqueue backend returns it only when the kernel delivered
        an event; the polling backend returns it every time after sleeping.
        ``False`` means the wait timed out with nothing observed.
        """
        raise NotImplementedError

    def close(self) -> None:
        """Free any OS resources the backend holds; a no-op by default."""
        return None
245
246 # ── kqueue backend (macOS / BSD) ──────────────────────────────────────────────
247
class _KqueueBackend(_Backend):
    """File-system event watcher built on BSD kqueue.

    Opens one ``O_RDONLY`` descriptor per coordination subdirectory. On each
    one it registers a ``KQ_FILTER_VNODE`` interest for ``NOTE_WRITE``,
    ``NOTE_DELETE``, ``NOTE_EXTEND`` and ``NOTE_RENAME``, with ``EV_CLEAR`` so
    the event state resets after it is retrieved.

    :meth:`wait` then blocks in the kernel until one of those directories
    changes, or until the timeout expires. A change means an entry was added,
    removed or replaced, which is what ``write_text_atomic``'s mkstemp+rename
    pattern produces.

    Parameters
    ----------
    dirs:
        Directories to watch. Each is created if it does not exist. A
        directory that is a symlink (e.g. swapped to point outside the repo)
        is rejected.

    Raises
    ------
    ValueError
        If any watched directory is a symlink.
    OSError
        If a directory cannot be created or opened for watching, or if the
        kevents cannot be registered. This includes a symlink swapped in after
        the check, which ``O_NOFOLLOW`` refuses where the platform defines it.

    If construction fails part-way, the kqueue and every descriptor opened so
    far are closed before the exception propagates, so a failed constructor
    never leaks file descriptors.
    """

    name = "kqueue"

    def __init__(self, dirs: list[pathlib.Path]) -> None:
        self._kq = select.kqueue()
        self._fds: list[int] = []
        # O_NOFOLLOW closes the race between the is_symlink() check and
        # os.open(): a directory replaced by a symlink in that window makes the
        # open fail (ELOOP) instead of silently following the link.
        open_flags = os.O_RDONLY | getattr(os, "O_NOFOLLOW", 0)
        try:
            kevents = []
            for d in dirs:
                d.mkdir(parents=True, exist_ok=True)
                # Symlink-swap guard: reject symlinked directories.
                if d.is_symlink():
                    raise ValueError(
                        f"Coordination directory is a symlink — refusing to watch: {d}"
                    )
                fd = os.open(str(d), open_flags)
                self._fds.append(fd)
                kevents.append(select.kevent(
                    fd,
                    filter=select.KQ_FILTER_VNODE,
                    flags=(
                        select.KQ_EV_ADD
                        | select.KQ_EV_ENABLE
                        | select.KQ_EV_CLEAR
                    ),
                    fflags=(
                        select.KQ_NOTE_WRITE
                        | select.KQ_NOTE_DELETE
                        | select.KQ_NOTE_EXTEND
                        | select.KQ_NOTE_RENAME
                    ),
                ))
            # Register all events in one syscall.
            self._kq.control(kevents, 0)
        except BaseException:
            # Release the kqueue and every fd opened before the failure.
            self.close()
            raise

    def wait(self, timeout: float) -> bool:
        """Block up to *timeout* seconds (negative counts as 0); return True if any FS change fired."""
        events = self._kq.control(None, 64, max(0.0, timeout))
        return len(events) > 0

    def close(self) -> None:
        """Close the kqueue and all watched directory fds (safe to call twice)."""
        try:
            self._kq.close()
        except OSError:
            pass
        for fd in self._fds:
            try:
                os.close(fd)
            except OSError:
                pass
        self._fds.clear()
322
323 # ── Polling backend (Linux / other) ──────────────────────────────────────────
324
class _PollingBackend(_Backend):
    """Sleep-based backend for platforms without kqueue (e.g. Linux).

    It never observes the file system itself: :meth:`wait` just sleeps and
    returns ``True``, so the caller always rescans and diffs.

    Parameters
    ----------
    interval:
        Sleep interval in seconds, clamped to
        [``_MIN_POLL_INTERVAL``, ``_MAX_POLL_INTERVAL``], the same bounds
        :func:`run` validates ``--poll-interval`` against.
    """

    name = "polling"

    def __init__(self, interval: float) -> None:
        # Reuse the module-level bounds rather than repeating the literals, so
        # the clamp cannot drift from the CLI validation.
        self._interval = max(_MIN_POLL_INTERVAL, min(_MAX_POLL_INTERVAL, interval))

    def wait(self, timeout: float) -> bool:
        """Sleep ``min(interval, timeout)`` seconds (negative timeout counts as 0); always return True."""
        time.sleep(min(self._interval, max(0.0, timeout)))
        return True

    def close(self) -> None:
        """Nothing to release: the polling backend holds no OS resources."""
349
350 # ── Core scanning and diffing ─────────────────────────────────────────────────
351
def _scan_dirs(root: pathlib.Path) -> _Snapshot:
    """Snapshot the ``(mtime_ns, size)`` of every ``.json`` file in the coord dirs.

    Returns ``{kind: {record_id: (mtime_ns, size_bytes)}}`` with one entry per
    name in ``_SUBDIRS``; *record_id* is the filename without its ``.json``
    suffix. A subdirectory yields an empty dict if it is missing, or if it
    vanishes or becomes unreadable while being listed. A file deleted between
    listing and ``stat`` is skipped. Either way, a concurrent GC cannot crash
    the watcher.

    Cost is O(n) in the total number of JSON files across all subdirectories.
    """
    snap: _Snapshot = {sub: {} for sub in _SUBDIRS}
    coord = _coord_dir(root)
    for sub in _SUBDIRS:
        d = coord / sub
        if not d.exists():
            continue
        try:
            # Materialise the listing inside the try: the directory itself may
            # be removed (or made unreadable) between exists() and the scan.
            paths = list(d.glob("*.json"))
        except OSError:
            continue
        entries = snap[sub]
        for path in paths:
            try:
                st = path.stat()
            except OSError:
                continue  # Deleted between listing and stat; skip gracefully.
            entries[path.stem] = (st.st_mtime_ns, st.st_size)
    return snap
376
def _diff_snapshots(
    old: _Snapshot,
    new: _Snapshot,
) -> list[tuple[str, str, str]]:
    """Return the ``(event_type, kind, record_id)`` changes from *old* to *new*.

    * ``"added"``    — id present only in *new*
    * ``"removed"``  — id present only in *old*
    * ``"modified"`` — id present in both with a different ``(mtime_ns, size)``

    Kinds are visited in ``_SUBDIRS`` order; kinds not listed there are
    ignored. Within a kind, all additions come first, then removals, then
    modifications, each group sorted by record id. The output is therefore
    deterministic.
    """
    changes: list[tuple[str, str, str]] = []
    for kind in _SUBDIRS:
        before = old.get(kind, {})
        after = new.get(kind, {})
        changes.extend(
            ("added", kind, rid) for rid in sorted(after.keys() - before.keys())
        )
        changes.extend(
            ("removed", kind, rid) for rid in sorted(before.keys() - after.keys())
        )
        changes.extend(
            ("modified", kind, rid)
            for rid in sorted(before.keys() & after.keys())
            if before[rid] != after[rid]
        )
    return changes
406
def _load_record(root: pathlib.Path, kind: str, uid: str) -> JsonDict | None:
    """Load and parse ``<coordination>/<kind>/<uid>.json``.

    Returns ``None`` if the file does not exist or cannot be parsed. This
    covers the TOCTOU window between the snapshot and this load.

    Also returns ``None`` when the file holds valid JSON that is not an
    object. Callers use the result as a mapping (``data.get(...)`` in the
    filters and the text formatter). A top-level array or scalar in a
    corrupted or hand-edited record would otherwise crash the watch loop.
    """
    path = _coord_dir(root) / kind / f"{uid}.json"
    data = load_json_file(path)
    return data if isinstance(data, dict) else None
415
416 # ── Filtering ─────────────────────────────────────────────────────────────────
417
def _passes_filters(
    kind: str,
    data: JsonDict,
    kind_filter: str | None,
    run_id_filter: str | None,
    branch_filter: str | None,
) -> bool:
    """Decide whether an event of *kind* carrying *data* survives every filter.

    Filters are ANDed. A filter that is ``None`` (or empty) is inactive:

    * ``kind_filter``   — must equal *kind* exactly.
    * ``run_id_filter`` — must equal ``data["run_id"]`` exactly.
    * ``branch_filter`` — must equal ``data["branch"]`` exactly.

    A *data* dict lacking ``run_id`` / ``branch`` (e.g. a ``removed`` event
    with nothing cached) fails any active run-id / branch filter. This is
    deliberate: the watch loop's record cache supplies removal events with
    the record's last-known data so they can still be matched.
    """
    rejected = (
        (kind_filter and kind != kind_filter)
        or (run_id_filter and data.get("run_id") != run_id_filter)
        or (branch_filter and data.get("branch") != branch_filter)
    )
    return not rejected
445
446 # ── Event emission ────────────────────────────────────────────────────────────
447
def _make_event(
    event_type: str,
    kind: str,
    uid: str,
    data: JsonDict,
) -> WatchEvent:
    """Return a :class:`WatchEvent` for *kind*/*uid*, timestamped with the current UTC time."""
    generated_at = now_utc_iso()
    # Positional order mirrors the dataclass fields:
    # event_type, kind, id, timestamp, data.
    return WatchEvent(event_type, kind, uid, generated_at, data)
462
# One-character prefix for each text-mode line, keyed by event_type.
# Event types missing from this map are printed with "?" by _emit_event.
_TEXT_ICONS: _IconMap = dict(
    snapshot="·",
    added="+",
    modified="~",
    removed="-",
    expired="!",
)
471
def _emit_event(ev: WatchEvent, as_json: bool) -> None:
    """Print *ev* to stdout as NDJSON or as one human-readable text line.

    **JSON mode**: ``ev.to_dict()`` serialised on a single line and flushed
    immediately, so agents reading from a pipe get events without buffering.

    **Text mode**: ``<icon> <kind> <agent> <id-prefix> <details>``. This is
    concise enough to scan in a terminal, and the first 8 characters of the
    record ID let lines be correlated with ``muse coord list`` output.
    Reservations, intents, releases and heartbeats get a kind-specific detail
    column; other kinds print only agent and ID.

    Events from the watch loop carry the plural subdirectory name as their
    kind (``"reservations"``); the singular spelling is accepted as well.

    Every agent-supplied value (run_id, branch, addresses, operation, reason,
    timestamps, IDs) is coerced to ``str`` and passed through
    :func:`~muse.core.validation.sanitize_display` before printing. A record
    with ``null`` or numeric fields, or a non-list ``addresses``, is
    therefore rendered rather than crashing the watcher.
    """
    if as_json:
        print(json.dumps(ev.to_dict()), flush=True)
        return

    icon = _TEXT_ICONS.get(ev.event_type, "?")
    uid_short = sanitize_display(ev.id[:8])
    kind_col = f"{ev.kind:<12}"

    data = ev.data
    if not data:
        # Removed event with an empty cache, or a record that could not be loaded.
        print(f"{icon} {kind_col} {uid_short}", flush=True)
        return

    def clean(value: object, limit: int | None = None) -> str:
        # JSON values may be null, numbers or containers: stringify first so
        # slicing and sanitize_display always operate on text.
        text = value if isinstance(value, str) else str(value)
        return sanitize_display(text[:limit])

    run_id = clean(data.get("run_id", "?"))
    branch = clean(data.get("branch") or "")
    agent_str = f"{run_id}@{branch}" if branch else run_id

    raw_addrs = data.get("addresses", [])
    addrs = raw_addrs if isinstance(raw_addrs, list) else []

    kind = ev.kind
    if kind in ("reservation", "reservations"):
        addr_parts = [clean(a) for a in addrs[:3]]
        suffix = f" +{len(addrs) - 3} more" if len(addrs) > 3 else ""
        addr_str = ", ".join(addr_parts) + suffix
        print(
            f"{icon} {kind_col} {agent_str:<32} {uid_short} [{addr_str}]",
            flush=True,
        )

    elif kind in ("intent", "intents"):
        op = clean(data.get("operation", "?"))
        first_addr = clean(addrs[0]) if addrs else "?"
        print(
            f"{icon} {kind_col} {agent_str:<32} {uid_short} {op} {first_addr}",
            flush=True,
        )

    elif kind in ("release", "releases"):
        reason = clean(data.get("reason", "?"))
        res_id = clean(data.get("reservation_id") or "", 8)
        print(
            f"{icon} {kind_col} {run_id:<32} {uid_short} {reason} (res: {res_id})",
            flush=True,
        )

    elif kind in ("heartbeat", "heartbeats"):
        ext = clean(data.get("extended_expires_at", "?"))
        res_id = clean(data.get("reservation_id") or "", 8)
        print(
            f"{icon} {kind_col} {run_id:<32} {uid_short} extends to {ext} (res: {res_id})",
            flush=True,
        )

    else:
        print(f"{icon} {kind_col} {agent_str:<32} {uid_short}", flush=True)
540
541 # ── Expiration detection ──────────────────────────────────────────────────────
542
def _check_expirations(
    root: pathlib.Path,
    prev_active_ids: set[str],
    removed_ids: set[str],
) -> tuple[list[WatchEvent], set[str]]:
    """Detect reservations that crossed their effective expiry since last check.

    Compares the set of currently-active reservation IDs (from
    :func:`~muse.core.coordination.active_reservations`) against
    *prev_active_ids*. An ID that was previously active but is no longer is
    treated as expired, unless its file was explicitly removed from disk
    this cycle.

    Parameters
    ----------
    root:
        Repository root.
    prev_active_ids:
        IDs that were active on the previous call.
    removed_ids:
        IDs of reservations whose JSON files were deleted this cycle.
        These are already emitted as ``removed`` events and must not be
        double-counted as ``expired``.

    Returns
    -------
    (expiry_events, current_active_ids)
        The new expiration events, ordered by reservation ID, and the
        refreshed active-ID set (to be passed as *prev_active_ids* on the
        next call).
    """
    curr_active_ids: set[str] = {
        r.reservation_id for r in active_reservations(root)
    }

    events: list[WatchEvent] = []
    # Sorted so output is deterministic, like the ID-sorted change events
    # from _diff_snapshots; plain set iteration order varies between runs.
    # GC'd reservations (removed_ids) were already reported as "removed".
    for uid in sorted(prev_active_ids - curr_active_ids - removed_ids):
        data = _load_record(root, "reservations", uid) or {}
        events.append(_make_event("expired", "reservations", uid, data))

    return events, curr_active_ids
584
585 # ── Core watch loop ───────────────────────────────────────────────────────────
586
def _watch_loop(
    root: pathlib.Path,
    backend: _Backend,
    *,
    kind_filter: str | None,
    run_id_filter: str | None,
    branch_filter: str | None,
    as_json: bool,
    once: bool,
    timeout: float | None,
    poll_interval: float,
    max_events: int | None = None,
) -> None:
    """Core event loop — scan, diff, emit, repeat.

    This function is deliberately separated from :func:`run` so that tests can
    inject a mock backend without subprocess overhead.

    Algorithm
    ---------
    1. **Snapshot**: scan every coordination subdirectory in ``_SUBDIRS`` and
       build ``{kind: {record_id: (mtime_ns, size)}}``.
    2. **Snapshot events**: emit one ``snapshot`` event per existing record
       that passes the filters. This gives agents an accurate initial picture
       of swarm state before any deltas.
    3. **Record cache**: store last-known data for every record, filtered or
       not, so ``removed`` events carry the record's content even after the
       file is gone. This lets ``--run-id`` / ``--branch`` filters match
       removal events.
    4. **Loop**: wait for the backend (kqueue event or poll interval), rescan,
       diff against the previous snapshot, emit change events.
    5. **Expiration**: after every wake-up, whether or not any file changed,
       compare the set of active reservation IDs against the previous set.
       IDs that left the active set without their file being removed in the
       same cycle fire ``expired`` events.
    6. **Max-events cap**: if *max_events* is set, the function returns as
       soon as the number of *emitted* events reaches the cap. Only events
       that passed the filters count, across snapshot, change and expired.

    Parameters
    ----------
    root:
        Repository root (directory containing ``.muse/``).
    backend:
        Initialised :class:`_KqueueBackend` or :class:`_PollingBackend`.
    kind_filter:
        If set, only emit events for this kind (a ``_SUBDIRS`` name).
    run_id_filter:
        If set, only emit events where ``data["run_id"]`` matches.
    branch_filter:
        If set, only emit events where ``data["branch"]`` matches.
    as_json:
        If True, output NDJSON; otherwise human-readable text.
    once:
        If True, emit snapshot events then return immediately (no loop).
    timeout:
        Stop looping after this many seconds. ``None`` = run forever.
    poll_interval:
        Maximum time to block in :meth:`_Backend.wait`. It is also the kqueue
        timeout, so expirations are checked at least this often.
    max_events:
        If set, exit after emitting this many events (across snapshot and
        change events). ``None`` = no cap.
    """
    emitted = 0  # Total events emitted so far (for max_events enforcement).

    def _emit(ev: WatchEvent) -> bool:
        """Emit *ev*; return False if this emission reached the max_events cap.

        The event is always printed. A False return tells the caller to stop.
        """
        nonlocal emitted
        _emit_event(ev, as_json)
        emitted += 1
        return max_events is None or emitted < max_events

    # Snapshot initial state.
    snap = _scan_dirs(root)

    # Record cache: (kind, record_id) → last-known parsed data.
    record_cache: dict[tuple[str, str], JsonDict] = {}

    # Emit initial state as snapshot events. Records are cached before the
    # filter check so a later removal of a filtered-out record can still be
    # matched against its data.
    for sub in _SUBDIRS:
        for uid in sorted(snap.get(sub, {})):
            data = _load_record(root, sub, uid) or {}
            record_cache[(sub, uid)] = data
            if _passes_filters(sub, data, kind_filter, run_id_filter, branch_filter):
                ev = _make_event("snapshot", sub, uid, data)
                if not _emit(ev):
                    return

    if once:
        return

    # Seed expiration tracker with currently-active reservations.
    prev_active_ids: set[str] = {
        r.reservation_id for r in active_reservations(root)
    }

    # Monotonic clock so wall-clock adjustments cannot shorten or extend --timeout.
    deadline = time.monotonic() + timeout if timeout is not None else None

    while True:
        remaining = (deadline - time.monotonic()) if deadline is not None else None
        if remaining is not None and remaining <= 0.0:
            break

        # Block up to poll_interval (or remaining timeout, whichever is less).
        # The return value is not needed: the loop rescans on every wake-up.
        wait_sec = min(
            remaining if remaining is not None else poll_interval,
            poll_interval,
        )
        backend.wait(wait_sec)

        # Rescan unconditionally — handles both FS events and timeout wakeups
        # (needed for expiration checks even when no files changed).
        new_snap = _scan_dirs(root)
        diff = _diff_snapshots(snap, new_snap)
        snap = new_snap

        # Track which reservation files were deleted this cycle (for expiry dedup).
        removed_reservation_ids: set[str] = set()

        for event_type, kind, uid in diff:
            if event_type == "removed":
                # Use cached data so filters work even after the file is gone.
                # pop() also drops the entry, keeping the cache limited to
                # records that still exist on disk.
                data = record_cache.pop((kind, uid), {})
                if kind == "reservations":
                    removed_reservation_ids.add(uid)
            else:
                data = _load_record(root, kind, uid) or {}
                # Only a successful load replaces the cache entry, so an
                # unreadable rewrite keeps the previous data for a later removal.
                if data:
                    record_cache[(kind, uid)] = data

            if _passes_filters(kind, data, kind_filter, run_id_filter, branch_filter):
                ev = _make_event(event_type, kind, uid, data)
                if not _emit(ev):
                    return

        # Check for expired reservations (independent of FS changes).
        exp_events, prev_active_ids = _check_expirations(
            root, prev_active_ids, removed_reservation_ids
        )
        for ev in exp_events:
            if _passes_filters(
                ev.kind, ev.data, kind_filter, run_id_filter, branch_filter
            ):
                if not _emit(ev):
                    return
731
732 # ── CLI registration ──────────────────────────────────────────────────────────
733
def register(
    subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]",
) -> None:
    """Register the ``watch`` subcommand on *subparsers* (under ``muse coord``).

    Wires all flags with their defaults, choices, and help text so that
    ``--help`` output is accurate, and sets ``func`` to :func:`run`.

    ``--kind`` accepts each coordination subdirectory name, which is the
    ``kind`` string events carry (e.g. ``reservations``). It also accepts the
    singular spelling used in the usage examples (``reservation``). argparse
    applies ``type=`` before checking ``choices``, so the alias is normalised
    to the subdirectory name before validation.
    """

    def _kind_arg(value: str) -> str:
        """Map a singular kind alias onto its subdirectory name."""
        if value in _VALID_KINDS:
            return value
        for kind in _VALID_KINDS:
            singular = (
                kind[:-3] + "y" if kind.endswith("ies") else kind.removesuffix("s")
            )
            if value == singular:
                return kind
        # Unknown values pass through unchanged so argparse reports them with
        # its normal "invalid choice" error.
        return value

    parser = subparsers.add_parser(
        "watch",
        help="Stream coordination events in real time (kqueue/polling).",
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--once",
        action="store_true",
        default=False,
        help="Emit current state as snapshot events, then exit.",
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=None,
        metavar="SECONDS",
        help=(
            "Stop watching after SECONDS seconds (>= 0). "
            "0 is equivalent to --once."
        ),
    )
    parser.add_argument(
        "--max-events",
        type=int,
        default=None,
        metavar="N",
        dest="max_events",
        help=(
            "Exit after emitting N events (>= 1). Useful for batch "
            "processing without a time-based timeout."
        ),
    )
    parser.add_argument(
        "--poll-interval",
        type=float,
        default=1.0,
        metavar="SECS",
        dest="poll_interval",
        help=(
            f"Polling/kqueue-timeout interval in seconds "
            f"(default: 1.0; range: [{_MIN_POLL_INTERVAL}, {_MAX_POLL_INTERVAL}])."
        ),
    )
    parser.add_argument(
        "--run-id",
        default=None,
        metavar="RUNID",
        dest="run_id",
        help=(
            "Only emit events where data.run_id matches. "
            f"Maximum {_MAX_RUN_ID_LEN} characters."
        ),
    )
    parser.add_argument(
        "--branch", "-b",
        default=None,
        metavar="BRANCH",
        dest="branch_filter",
        help="Only emit events where data.branch matches.",
    )
    parser.add_argument(
        "--kind",
        default=None,
        type=_kind_arg,
        choices=sorted(_VALID_KINDS),
        metavar="KIND",
        help=(
            "Only emit events for this kind "
            f"({' | '.join(sorted(_VALID_KINDS))}); "
            "singular spellings such as 'reservation' are also accepted."
        ),
    )
    parser.add_argument(
        "--json", "-j",
        action="store_true",
        dest="json_out",
        help="Emit machine-readable JSON events on stdout.",
    )
    parser.set_defaults(func=run, json_out=False)
820
821 # ── Command entry point ───────────────────────────────────────────────────────
822
def run(args: argparse.Namespace) -> None:
    """Stream coordination events from ``.muse/coordination/``.

    Emits one event for every coordination record that is added, modified,
    removed, or expired: NDJSON with ``--json``, otherwise one human-readable
    line per event. Uses kqueue on macOS/BSD and a configurable polling
    fallback elsewhere. Either way, the loop also wakes at least once per
    ``--poll-interval`` to check for expired reservations. SIGTERM maps to
    SystemExit(0) for clean process-manager shutdown.

    Agent quickstart::

        muse coord watch --json                       # stream all events forever
        muse coord watch --once --json                # snapshot current state, exit
        muse coord watch --kind reservations --json   # reservation events only
        muse coord watch --run-id agent-42 --json     # filter by agent run-id
        muse coord watch --timeout 60 --json          # exit after 60 seconds

    JSON fields (NDJSON — one object per line)::

        schema_version  Muse version string that produced this event.
        event_type      "snapshot", "added", "modified", "removed", or "expired".
        kind            Coordination subdirectory of the record: "reservations",
                        "intents", "releases", "heartbeats", "tasks", "claims",
                        or "dependencies".
        id              Content-addressed ID of the changed coordination record.
        timestamp       ISO-8601 UTC timestamp when the event was generated.
        data            Parsed record content; empty dict when unavailable
                        (e.g. removed events with no cache).

    Exit codes::

        0  Normal exit (--once, --timeout, --max-events, SIGINT, or SIGTERM).
        1  Bad arguments or backend initialisation failure.
    """
    as_json: bool = args.json_out
    kind_filter: str | None = args.kind
    run_id_filter: str | None = args.run_id
    branch_filter: str | None = args.branch_filter
    once: bool = args.once
    timeout: float | None = args.timeout
    max_events: int | None = args.max_events
    poll_interval: float = args.poll_interval

    def _bad_args(msg: str) -> SystemExit:
        """Report *msg* in the active output format; return the exit to raise."""
        if as_json:
            print(json.dumps({"error": msg, "status": "bad_args"}))
        else:
            print(f"❌ {msg}", file=sys.stderr)
        return SystemExit(ExitCode.USER_ERROR)

    # ── Input validation (before any file I/O) ────────────────────────────────

    if run_id_filter is not None and len(run_id_filter) > _MAX_RUN_ID_LEN:
        raise _bad_args(
            f"--run-id is too long ({len(run_id_filter)} chars; max {_MAX_RUN_ID_LEN})"
        )

    # The chained comparison is False for NaN as well, so NaN is rejected here.
    if not (_MIN_POLL_INTERVAL <= poll_interval <= _MAX_POLL_INTERVAL):
        raise _bad_args(
            f"--poll-interval must be in [{_MIN_POLL_INTERVAL}, {_MAX_POLL_INTERVAL}],"
            f" got {poll_interval}"
        )

    # ``not timeout >= 0.0`` (rather than ``timeout < 0.0``) also rejects NaN.
    # A NaN timeout would produce a NaN deadline that never expires, and the
    # loop would then spin with zero-length backend waits.
    if timeout is not None and not timeout >= 0.0:
        raise _bad_args(f"--timeout must be >= 0, got {timeout}")

    if max_events is not None and max_events < 1:
        raise _bad_args(f"--max-events must be >= 1, got {max_events}")

    # --timeout 0 is equivalent to --once.
    if timeout is not None and timeout == 0.0:
        once = True
        timeout = None

    root = require_repo()

    # Ensure all coord dirs exist before we try to open or watch them.
    _ensure_coord_dirs(root)

    # Select and initialise the backend.
    try:
        if hasattr(select, "kqueue"):
            dirs = [_coord_dir(root) / sub for sub in _SUBDIRS]
            backend: _Backend = _KqueueBackend(dirs)
        else:
            backend = _PollingBackend(poll_interval)
    except (OSError, ValueError) as exc:
        _err(f"Failed to initialise watch backend: {exc}")
        raise SystemExit(ExitCode.USER_ERROR)

    # Register SIGTERM → SystemExit(0) so process managers can stop us cleanly.
    prev_sigterm = signal.getsignal(signal.SIGTERM)

    def _handle_sigterm(sig: int, frame: types.FrameType | None) -> None:  # noqa: ARG001
        raise SystemExit(0)

    signal.signal(signal.SIGTERM, _handle_sigterm)

    if not as_json:
        backend_label = backend.name
        print(
            f"\nmuse coord watch — watching .muse/coordination/ ({backend_label})"
        )
        parts: list[str] = []
        if kind_filter:
            parts.append(f"kind={kind_filter}")
        if run_id_filter:
            parts.append(f"run-id={sanitize_display(run_id_filter)}")
        if branch_filter:
            parts.append(f"branch={sanitize_display(branch_filter)}")
        if max_events is not None:
            parts.append(f"max-events={max_events}")
        if parts:
            print(f"  filter: {', '.join(parts)}")
        print("─" * 62)

    try:
        _watch_loop(
            root,
            backend,
            kind_filter=kind_filter,
            run_id_filter=run_id_filter,
            branch_filter=branch_filter,
            as_json=as_json,
            once=once,
            timeout=timeout,
            poll_interval=poll_interval,
            max_events=max_events,
        )
    except KeyboardInterrupt:
        pass
    finally:
        backend.close()
        # getsignal() returns None when the previous handler was not installed
        # from Python; signal.signal() rejects None, so restore SIG_DFL instead.
        signal.signal(
            signal.SIGTERM,
            prev_sigterm if prev_sigterm is not None else signal.SIG_DFL,
        )
        if not as_json:
            print("\n  (watch ended)", flush=True)
966
967 # ── Helpers ───────────────────────────────────────────────────────────────────
968
def _err(msg: str) -> None:
    """Write *msg* to stderr on its own line, prefixed with ``❌``.

    Used for failures outside argument validation, currently backend
    initialisation in :func:`run`. Validation errors in :func:`run` format
    their own message before raising ``SystemExit``.
    """
    sys.stderr.write(f"❌ {msg}\n")
File History 1 commit
sha256:1ddad36d76d3a8d323f9b3664169cb184b7a38b39208214a2ae504154260826f fix: show full cryptographic IDs in all human-readable CLI output Sonnet 4.6 patch 8 days ago