gabriel / muse public
heartbeat_coord.py python
295 lines 11.3 KB
Raw
sha256:e6465e8a9b7fa8e6223ed4a3576e96c568c913ae2caeb9c31f15e7a81b250b40 docs: add | jq convention to --json section of agent-guide Sonnet 4.6 1 day ago
1 """``muse coord heartbeat`` — extend a reservation's TTL without modifying it.
2
3 Long-running agents use heartbeats to keep their reservations alive past the
4 original TTL. Each heartbeat call atomically rewrites a single keep-alive file
5 at ``.muse/coordination/heartbeats/<reservation-id>.json``, updating
6 ``extended_expires_at`` to ``now + extension_seconds``.
7
8 The immutable reservation record is never modified. :func:`active_reservations`
9 and :func:`filter_reservations` automatically use
10 ``max(reservation.expires_at, heartbeat.extended_expires_at)`` as the effective
11 expiry, so the heartbeat transparently extends the reservation's liveness window.
12
13 Usage::
14
15 muse coord heartbeat <reservation-id> --run-id AGENT-42
16 muse coord heartbeat <reservation-id> --run-id AGENT-42 --extension 7200
17 muse coord heartbeat <reservation-id> --run-id AGENT-42 --json
18
19 Recommended pattern
20 -------------------
21 Poll every ``extension_seconds / 2`` to keep a safety margin::
22
23 while working:
24 do_work()
25 muse coord heartbeat $RES_ID --run-id $RUN_ID --extension 3600
26
27 Exit code 1 (``already_released``) is a hard signal to stop — the reservation
28 was cancelled by another agent or coordinator. Agents should treat it the same
29 as a graceful shutdown request.
30
JSON output schema (responses with a non-``"ok"`` status omit ``run_id`` and
the timestamp/TTL fields)::
32
33 {
34 "status": "ok" | "already_released" | "not_found",
35 "reservation_id": str,
36 "run_id": str,
37 "last_beat_at": str, // ISO 8601
38 "extended_expires_at": str, // ISO 8601
39 "ttl_extended_seconds": int,
40 "duration_ms": float
41 }
42
43 Exit codes::
44
45 0 — heartbeat written successfully
46 1 — bad arguments (--run-id too long, --extension out of range, bad content ID)
47 or reservation already released (stop sending heartbeats)
48 4 — reservation not found
49
50 Flags:
51
52 ``--run-id ID``
53 Agent/pipeline identifier for the audit trail (required).
54 Maximum length: 256 characters.
55
56 ``--extension SECONDS``
57 Number of seconds from now to set as the new expiry (default: 3600).
58 Range: 1–31 536 000 (1 s to 1 year).
59 """
60
61 import argparse
62 import json
63 import sys
64
65 from muse.core.coordination import (
66 _validate_reservation_id,
67 create_heartbeat,
68 load_all_reservations,
69 load_released_ids,
70 )
71 from muse.core.envelope import EnvelopeJson, make_envelope
72 from muse.core.errors import ExitCode
73 from muse.core.repo import require_repo
74 from muse.core.validation import clamp_int, sanitize_display
75 from muse.core.timing import start_timer
76
# ── Input constraints ─────────────────────────────────────────────────────────

#: Maximum length of the ``--run-id`` value, measured in characters (it is
#: compared against ``len()`` of the string, not its encoded byte size).
#: Mirrors the limit in ``reserve.py`` and ``release_coord.py`` so audit
#: records stay bounded.
_MAX_RUN_ID_LEN: int = 256

#: Maximum value for ``--extension``.  Same ceiling as ``reserve.py``'s
#: ``--ttl`` so a single heartbeat cannot pin a reservation open indefinitely.
_MAX_EXTENSION_SECONDS: int = 31_536_000  # 1 year
86
class _HeartbeatJson(EnvelopeJson):
    """JSON output for a successful ``muse coord heartbeat --json`` call.

    Extends :class:`~muse.core.envelope.EnvelopeJson`, so the standard
    envelope fields accompany the heartbeat-specific fields listed below.

    Within this module the type is only built on the success path, so
    ``status`` is always ``"ok"`` here; the ``"already_released"`` and
    ``"not_found"`` responses are emitted as :class:`_HeartbeatErrorJson`.

    Fields
    ------
    status                ``"ok"`` once the heartbeat has been written.
    reservation_id        ID of the reservation whose heartbeat was written,
                          taken from the record returned by
                          ``create_heartbeat``.
    run_id                Agent/pipeline identifier from ``--run-id``, echoed
                          back for audit correlation.
    last_beat_at          ``isoformat()`` rendering of the heartbeat's
                          ``last_beat_at`` timestamp.
    extended_expires_at   ``isoformat()`` rendering of the heartbeat's
                          ``extended_expires_at`` timestamp (the new
                          effective expiry).
    ttl_extended_seconds  The ``--extension`` value that was applied, in
                          seconds.
    """

    status: str
    reservation_id: str
    run_id: str
    last_beat_at: str
    extended_expires_at: str
    ttl_extended_seconds: int
113
class _HeartbeatErrorJson(EnvelopeJson):
    """JSON output for ``muse coord heartbeat --json`` when no heartbeat is written.

    Emitted when the reservation is already released or is not found.
    Extends :class:`~muse.core.envelope.EnvelopeJson`; the envelope is built
    with the exit code of the failure.

    Fields
    ------
    status          ``"already_released"`` or ``"not_found"``.
    reservation_id  The reservation ID given on the command line, echoed back.
    """

    status: str
    reservation_id: str
119
120 # ── CLI registration ──────────────────────────────────────────────────────────
121
def register(
    subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]",
) -> None:
    """Attach the ``heartbeat`` subcommand to *subparsers* (under ``muse coord``).

    The module docstring is used verbatim as the ``--help`` description.
    Declares the positional ``RESERVATION_ID``, the required ``--run-id``,
    the integer ``--extension`` (default 3600) and the ``--json``/``-j``
    switch, then binds :func:`run` as the handler via ``func``.
    """
    # Help strings that embed the module-level limits are assembled up front
    # so the argument declarations below stay compact.
    run_id_help = (
        "Agent run-id sending the heartbeat (for audit trail). "
        f"Max {_MAX_RUN_ID_LEN} characters."
    )
    extension_help = (
        "Seconds from now to set as the new expiry "
        f"(default: 3600; range: 1–{_MAX_EXTENSION_SECONDS:,})."
    )

    heartbeat = subparsers.add_parser(
        "heartbeat",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
        help="Extend a reservation's TTL by writing a heartbeat.",
    )

    heartbeat.add_argument(
        "reservation_id",
        help="content-addressed ID of the reservation to keep alive.",
        metavar="RESERVATION_ID",
    )
    heartbeat.add_argument(
        "--run-id",
        dest="run_id",
        metavar="ID",
        required=True,
        help=run_id_help,
    )
    heartbeat.add_argument(
        "--extension",
        dest="extension_seconds",
        metavar="SECONDS",
        type=int,
        default=3600,
        help=extension_help,
    )
    heartbeat.add_argument(
        "--json", "-j",
        dest="json_out",
        action="store_true",
        help="Emit machine-readable JSON.",
    )

    heartbeat.set_defaults(func=run)
169
170 # ── Command implementation ────────────────────────────────────────────────────
171
def _report_bad_input(json_out: bool, status: str, error: str, text: str) -> None:
    """Print an argument-validation failure in the requested output mode.

    JSON mode prints a bare ``{"error": ..., "status": ...}`` object to
    stdout (no envelope fields); text mode prints ``❌ <text>`` to stderr.
    The caller remains responsible for raising ``SystemExit``.
    """
    if json_out:
        print(json.dumps({"error": error, "status": status}))
    else:
        print(f"❌ {text}", file=sys.stderr)


def _report_reservation_error(
    elapsed,
    json_out: bool,
    exit_code: "ExitCode",
    status: str,
    reservation_id: str,
    text: str,
) -> None:
    """Print a released / not-found failure in the requested output mode.

    JSON mode prints a :class:`_HeartbeatErrorJson` whose envelope is built
    from *elapsed* and *exit_code*; text mode prints
    ``❌ reservation <id> <text>`` to stderr, with the ID passed through
    ``sanitize_display``.  The caller remains responsible for raising
    ``SystemExit``.
    """
    if json_out:
        print(json.dumps(_HeartbeatErrorJson(
            **make_envelope(elapsed, exit_code=exit_code),
            status=status,
            reservation_id=reservation_id,
        )))
    else:
        rid = sanitize_display(reservation_id)
        print(f"❌ reservation {rid} {text}", file=sys.stderr)


def run(args: argparse.Namespace) -> None:
    """Write or refresh a heartbeat for a reservation and report the outcome.

    Arguments are validated before the repository is touched: ``--run-id``
    length, ``--extension`` (via ``clamp_int``) and the reservation ID (via
    ``_validate_reservation_id``).  The repository root is then resolved
    with ``require_repo``, the reservation is checked against the released
    IDs and the known reservations, and finally ``create_heartbeat`` is
    called.  A status of ``"already_released"`` tells the agent to stop
    work and stop sending heartbeats.

    Agent quickstart
    ----------------
    ::

        muse coord heartbeat <reservation-id> --run-id agent-42 --json
        muse coord heartbeat <reservation-id> --extension 120 --run-id agent-42 --json

    JSON fields
    -----------
    status                ``"ok"`` on success; ``"already_released"`` or
                          ``"not_found"`` when no heartbeat is written
                          (only ``reservation_id`` accompanies these);
                          ``"bad_args"`` / ``"bad_id"`` for rejected
                          arguments (bare object with an ``error`` message).
    reservation_id        ID of the reservation the call refers to.
    run_id                Agent run ID that sent the heartbeat.
    last_beat_at          ISO 8601 timestamp of this heartbeat.
    extended_expires_at   New expiry after the extension is applied.
    ttl_extended_seconds  The ``--extension`` value that was applied.

    Exit codes
    ----------
    0   Heartbeat written successfully.
    1   Invalid arguments; or reservation already released (stop work).
    4   Reservation not found.

    Args:
        args: Parsed CLI namespace providing ``reservation_id``, ``run_id``,
            ``extension_seconds`` and ``json_out``.

    Raises:
        SystemExit: With ``ExitCode.USER_ERROR`` for rejected arguments or a
            released reservation, and ``ExitCode.NOT_FOUND`` when the
            reservation ID is unknown.
    """
    elapsed = start_timer()
    reservation_id: str = args.reservation_id
    run_id: str = args.run_id
    extension_seconds: int = args.extension_seconds
    json_out: bool = args.json_out

    # ── Input validation (before any file I/O) ────────────────────────────────

    if len(run_id) > _MAX_RUN_ID_LEN:
        msg = f"--run-id is too long ({len(run_id)} chars; max {_MAX_RUN_ID_LEN})"
        _report_bad_input(json_out, "bad_args", msg, msg)
        raise SystemExit(ExitCode.USER_ERROR)

    try:
        extension_seconds = clamp_int(
            extension_seconds, 1, _MAX_EXTENSION_SECONDS, "--extension"
        )
    except ValueError as exc:
        msg = str(exc)
        _report_bad_input(json_out, "bad_args", msg, f"Invalid --extension: {msg}")
        raise SystemExit(ExitCode.USER_ERROR)

    try:
        _validate_reservation_id(reservation_id)
    except ValueError as exc:
        msg = str(exc)
        _report_bad_input(json_out, "bad_id", msg, msg)
        raise SystemExit(ExitCode.USER_ERROR)

    root = require_repo()

    # ── Released guard ────────────────────────────────────────────────────────
    # Checked before existence so a cancelled reservation reports
    # "already_released" rather than falling through to the lookup below.
    if reservation_id in load_released_ids(root):
        _report_reservation_error(
            elapsed,
            json_out,
            ExitCode.USER_ERROR,
            "already_released",
            reservation_id,
            "is already released — stop sending heartbeats",
        )
        raise SystemExit(ExitCode.USER_ERROR)

    # ── Existence check ───────────────────────────────────────────────────────
    # A single membership test does not need an intermediate set.
    is_known = any(
        res.reservation_id == reservation_id
        for res in load_all_reservations(root)
    )
    if not is_known:
        _report_reservation_error(
            elapsed,
            json_out,
            ExitCode.NOT_FOUND,
            "not_found",
            reservation_id,
            "not found",
        )
        raise SystemExit(ExitCode.NOT_FOUND)

    # ── Write heartbeat ───────────────────────────────────────────────────────
    hb = create_heartbeat(root, reservation_id, run_id, extension_seconds)

    if json_out:
        print(json.dumps(_HeartbeatJson(
            **make_envelope(elapsed),
            status="ok",
            reservation_id=hb.reservation_id,
            run_id=hb.run_id,
            last_beat_at=hb.last_beat_at.isoformat(),
            extended_expires_at=hb.extended_expires_at.isoformat(),
            ttl_extended_seconds=extension_seconds,
        )))
    else:
        rid = sanitize_display(hb.reservation_id)
        run_label = sanitize_display(hb.run_id)
        # Trim to seconds precision ("YYYY-MM-DDTHH:MM:SS") and label it "Z".
        # NOTE(review): assumes extended_expires_at is in UTC — confirm
        # against create_heartbeat.
        exp_str = hb.extended_expires_at.isoformat()[:19]
        print(
            f"💓 heartbeat {rid}  run={run_label}"
            f"  extended until {exp_str}Z  (+{extension_seconds}s)"
        )
File History 1 commit
sha256:e6465e8a9b7fa8e6223ed4a3576e96c568c913ae2caeb9c31f15e7a81b250b40 docs: add | jq convention to --json section of agent-guide Sonnet 4.6 1 day ago