gabriel / muse public
task_queue.py python
1,518 lines 54.6 KB
Raw
sha256:1ddad36d76d3a8d323f9b3664169cb184b7a38b39208214a2ae504154260826f fix: show full cryptographic IDs in all human-readable CLI output Sonnet 4.6 patch 1 day ago
1 """``muse coord`` task-queue subcommands — real work distribution.
2
3 Provides five subcommands for operating the file-system–based task queue:
4
5 ``muse coord enqueue``
6 Add a task to the queue.
7
8 ``muse coord claim``
9 Atomically claim the highest-priority pending task. Exactly one agent
10 wins when multiple agents call ``claim`` concurrently.
11
12 ``muse coord complete``
13 Mark a claimed task as successfully completed.
14
15 ``muse coord fail-task``
16 Mark a claimed task as failed (agent could not complete the work).
17
18 ``muse coord tasks``
19 List tasks with optional status/queue/run-id filtering.
20
21 Typical multi-agent workflow::
22
23 # Orchestrator enqueues work:
24 muse coord enqueue "Refactor billing module" \\
25 --priority 5 --payload '{"addresses":["billing.py::*"]}' \\
26 --run-id orchestrator --tags billing
27
28 # N agents compete for tasks (exactly one wins per call):
29 TASK=$(muse coord claim --run-id $AGENT_ID --json)
30 TASK_ID=$(echo "$TASK" | python3 -c "import sys,json; print(json.load(sys.stdin)['task_id'])")
31
32 # Agent works on the task …
33
34 # Agent reports result:
35 muse coord complete $TASK_ID --run-id $AGENT_ID --result '{"pr": 42}'
36 # or on failure:
37 muse coord fail-task $TASK_ID --run-id $AGENT_ID --error "timed out"
38
39 Exit codes (all subcommands)::
40
41 0 — success
42 1 — bad arguments, task not found, queue empty, permission error, or unexpected error
43
44 JSON output
45 -----------
46 All subcommands accept ``--json`` / ``--format json``. The JSON schema for
47 each is documented in the individual function docstrings below.
48 """
49
50 import argparse
51 import json
52 import sys
53 import time
54 from collections.abc import Callable
55 from typing import TypedDict
56
57 from muse.core.types import JsonValue
58 from muse.core.envelope import EnvelopeJson, make_envelope
59 from muse.core.errors import ExitCode
60 from muse.core.repo import require_repo
61 from muse.core.task_queue import (
62 ClaimRecord,
63 TaskRecord,
64 _MAX_QUEUE_LEN,
65 _MAX_TAG_LEN,
66 _MAX_TAGS,
67 _MAX_TITLE_LEN,
68 _validate_queue_name,
69 cancel_task,
70 claim_next_task,
71 complete_task,
72 create_task,
73 fail_task,
74 get_task_status,
75 heartbeat_claim,
76 load_all_claims,
77 load_all_tasks,
78 load_claim,
79 load_task,
80 )
81 from muse.core.validation import sanitize_display
82 from muse.core.timing import start_timer
83
# Mapping from string keys to integer values.
# NOTE(review): not referenced in the visible part of this file — presumably
# used for per-status counters further down; confirm before removing.
type _IntMap = dict[str, int]
# A JSON object: string keys mapped to arbitrary JSON values.  Used for the
# ``payload`` / ``result`` / ``task`` fields of the wire-shape TypedDicts.
type _Payload = dict[str, JsonValue]
86
87 # ── Wire-shape TypedDicts ─────────────────────────────────────────────────────
88
class _EnqueueJson(EnvelopeJson):
    """Wire shape of the ``enqueue`` JSON response: envelope fields plus the task record."""

    task_id: str  # content-addressed ID of the newly created task
    title: str  # human-readable task description
    priority: int  # higher = claimed first
    queue: str  # target queue name
    ttl_seconds: int  # seconds the task may remain pending
    created_by: str  # enqueuing agent identifier (--run-id)
    created_at: str  # ISO 8601 UTC creation timestamp
    tags: list[str]  # string tags attached to the task
    payload: _Payload  # arbitrary JSON object attached to the task
99
class _ClaimSuccessJson(EnvelopeJson):
    """Wire shape of a successful ``claim`` JSON response: envelope, claim record and task."""

    task_id: str  # content-addressed ID of the claimed task
    claimer_run_id: str  # agent identifier that claimed the task
    claimed_at: str  # ISO 8601 UTC time the claim was created
    expires_at: str  # ISO 8601 UTC time the claim expires
    status: str  # "claimed" for this response
    heartbeat_at: str  # ISO 8601 UTC time of the last heartbeat
    claim_nonce: str  # nonce used for optimistic re-claim verification
    result: _Payload | None  # null at claim time (set by ``complete``)
    error: str | None  # null at claim time (set by ``fail-task``)
    task: _Payload  # the full task record as a JSON object
111
class _ClaimEmptyJson(EnvelopeJson):
    """Wire shape of the ``claim`` JSON response when no task could be claimed."""

    status: str  # ``run_claim`` sets this to "empty"
    queue: str | None  # the --queue filter that was applied, or None when unfiltered
115
class _ClaimRecordJson(EnvelopeJson):
    """Wire shape for complete / fail-task / cancel-task responses.

    Envelope fields plus the serialised claim record.
    """

    task_id: str  # content-addressed ID of the task the claim belongs to
    claimer_run_id: str  # agent identifier recorded on the claim
    claimed_at: str  # ISO 8601 UTC time the task was originally claimed
    expires_at: str  # ISO 8601 UTC expiry of the claim
    status: str  # e.g. "completed" or "failed"
    heartbeat_at: str  # ISO 8601 UTC time of the last heartbeat
    claim_nonce: str  # nonce assigned at claim time
    result: _Payload | None  # outcome object supplied via --result, else null
    error: str | None  # failure message supplied via --error, else null
128
class _TaskItemJson(TypedDict):
    """One entry of ``_TasksJson.items``: a task record plus claim-derived fields.

    NOTE(review): the code that builds these items is outside the visible part
    of the file; field meanings below follow the names used elsewhere here.
    """

    task_id: str  # content-addressed task ID
    title: str  # human-readable task description
    priority: int  # higher = claimed first
    queue: str  # queue the task belongs to
    status: str  # task status string (see the counters on ``_TasksJson``)
    created_by: str  # enqueuing agent identifier
    created_at: str  # ISO 8601 UTC creation timestamp
    ttl_seconds: int  # seconds the task may remain pending
    tags: list[str]  # string tags attached to the task
    payload: _Payload  # arbitrary JSON object attached to the task
    claimer_run_id: str | None  # presumably None while the task is unclaimed — confirm
    expires_at: str | None  # presumably the claim expiry, None when unclaimed — confirm
142
class _TasksJson(EnvelopeJson):
    """Wire shape of a task-listing JSON response: summary counters plus the items.

    NOTE(review): the producer is outside the visible part of the file; the
    counter fields presumably hold per-status task counts — confirm.
    """

    total: int  # overall number of tasks counted
    pending: int
    claimed: int
    timed_out: int
    completed: int
    failed: int
    cancelled: int
    limit: int  # presumably the --limit that was applied (see ``_MAX_LIMIT``)
    truncated: bool  # presumably True when ``items`` was cut off at ``limit``
    items: list[_TaskItemJson]  # the listed tasks
154
155 # ── Module-level constants ────────────────────────────────────────────────────
156
#: Maximum character-length of ``--run-id`` (checked with ``len()`` on the
#: string, not on its encoded bytes). Mirrors all other coordination commands.
_MAX_RUN_ID_LEN: int = 256

#: Maximum byte-length of the ``--payload`` JSON string when UTF-8 encoded.
#: Prevents unbounded memory use when the payload is later loaded into RAM.
_MAX_PAYLOAD_BYTES: int = 65536  # 64 KiB

#: Minimum claim TTL in seconds. A TTL of 0 creates an immediately-expired
#: claim which would be immediately re-claimable by any other agent.
_MIN_CLAIM_TTL: int = 1

#: Maximum claim TTL in seconds (24 h). Prevents agents from holding tasks
#: indefinitely with no heartbeat path.
_MAX_CLAIM_TTL: int = 86400

#: Maximum ``--wait`` polling duration in seconds (1 h).
_MAX_WAIT_SECONDS: int = 3600

#: Maximum byte-length of the ``--result`` JSON string when UTF-8 encoded.
#: Mirrors :data:`_MAX_PAYLOAD_BYTES`; keeps claim files bounded in size.
_MAX_RESULT_BYTES: int = 65536  # 64 KiB

#: Maximum character-length of the ``--error`` message on fail-task.
#: Bounds claim file sizes and prevents pathological memory use when loading
#: large sets of failed claims for analysis.
_MAX_ERROR_LEN: int = 4096

#: Maximum value of ``--limit`` on ``tasks``. Prevents accidentally dumping
#: the entire queue in one shot on very large repos.
_MAX_LIMIT: int = 10000
187
188 # ── Shared argument helpers ───────────────────────────────────────────────────
189
190 def _add_format_args(parser: argparse.ArgumentParser) -> None:
191 """Add --json / -j to *parser*."""
192 parser.add_argument(
193 "--json", "-j",
194 action="store_true",
195 dest="json_out",
196 help="Emit machine-readable JSON on stdout.",
197 )
198 parser.set_defaults(json_out=False)
199
200 def _err(
201 msg: str,
202 json_out: bool = False,
203 status: str = "error",
204 elapsed: Callable[[], float] | None = None,
205 ) -> None:
206 """Print an error message.
207
208 In JSON mode, emits a JSON envelope with ``error`` and ``status`` to
209 *stdout* (so the machine-readable stream is never broken by a bare text
210 line). In text mode, prefixes with ``❌`` and writes to *stderr*.
211 """
212 if json_out:
213 env = make_envelope(elapsed, exit_code=ExitCode.USER_ERROR) if elapsed is not None else {}
214 print(json.dumps({**env, "error": msg, "status": status}))
215 else:
216 print(f"❌ {msg}", file=sys.stderr)
217
218 # ── muse coord enqueue ────────────────────────────────────────────────────────
219
def register_enqueue(
    subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]",
) -> None:
    """Attach the ``enqueue`` subcommand to *subparsers* (under ``muse coord``).

    Creates the ``enqueue`` parser, registers its arguments in the order
    listed below, adds the shared ``--json`` / ``-j`` switch through
    :func:`_add_format_args`, and sets the ``func`` default to
    :func:`run_enqueue` so the dispatcher can invoke it.

    Arguments registered
    --------------------
    ``title``
        Positional. Short human-readable task description.
    ``--priority N``
        Integer priority; higher = processed first. Default: 0.
    ``--queue QUEUE``
        Target queue name. Default: ``"default"``.
    ``--ttl SECONDS``
        Stored as ``ttl_seconds``. Seconds the task may remain pending.
        Default: 86400.
    ``--run-id RUNID``
        Stored as ``run_id``. Enqueuing agent identifier. Default:
        ``"unknown"``.
    ``--payload JSON``
        JSON object attached to the task. Default: ``"{}"``.
    ``--tags TAG1,TAG2``
        Comma-separated tag list. Default: empty string.

    No validation happens here; limits quoted in the help texts are enforced
    by :func:`run_enqueue`, whose docstring also documents the JSON output
    and exit codes.
    """
    enqueue_parser = subparsers.add_parser(
        "enqueue",
        help="Add a task to the coordination task queue.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="""Add a work item to the task queue for agents to claim and execute.

Tasks are ordered by priority (higher = first) then creation time (FIFO within
same priority). The ``payload`` field carries arbitrary JSON that the claiming
agent receives along with the task definition.
""",
    )

    title_help = f"Short human-readable task description (≤ {_MAX_TITLE_LEN} chars)."
    queue_help = (
        f"Target queue name (default: 'default'). "
        f"Max {_MAX_QUEUE_LEN} chars, pattern [a-zA-Z0-9_-]."
    )
    run_id_help = (
        f"Enqueuing agent / orchestrator identifier. "
        f"Maximum {_MAX_RUN_ID_LEN} characters."
    )
    payload_help = (
        f"Arbitrary JSON object attached to the task (default: {{}}). "
        f"Must be a JSON object (not array). "
        f"Maximum {_MAX_PAYLOAD_BYTES} bytes when UTF-8 encoded."
    )
    tags_help = (
        f"Comma-separated list of tags. "
        f"Maximum {_MAX_TAGS} tags, each ≤ {_MAX_TAG_LEN} chars."
    )

    # (flags, add_argument keyword options), in the order they appear in --help.
    argument_table = (
        (("title",), {"help": title_help}),
        (
            ("--priority",),
            {
                "type": int,
                "default": 0,
                "metavar": "N",
                "help": "Task priority (higher = processed first, default 0).",
            },
        ),
        (
            ("--queue",),
            {"default": "default", "metavar": "QUEUE", "help": queue_help},
        ),
        (
            ("--ttl",),
            {
                "type": int,
                "default": 86400,
                "dest": "ttl_seconds",
                "metavar": "SECONDS",
                "help": "Seconds the task may remain pending before expiry (default 86400, min 1).",
            },
        ),
        (
            ("--run-id",),
            {
                "default": "unknown",
                "dest": "run_id",
                "metavar": "RUNID",
                "help": run_id_help,
            },
        ),
        (
            ("--payload",),
            {"default": "{}", "metavar": "JSON", "help": payload_help},
        ),
        (
            ("--tags",),
            {"default": "", "metavar": "TAG1,TAG2", "help": tags_help},
        ),
    )
    for flags, options in argument_table:
        enqueue_parser.add_argument(*flags, **options)

    _add_format_args(enqueue_parser)
    enqueue_parser.set_defaults(func=run_enqueue)
345
def run_enqueue(args: argparse.Namespace) -> None:
    """Create and persist a new task in the coordination queue.

    All input validation (title, run-id, ttl, payload size and shape, tag
    count, queue name) runs before the repository is located or any task is
    created.  On the first failed check an error is reported via
    :func:`_err` and the process exits with ``ExitCode.USER_ERROR``.

    Agent quickstart::

        muse coord enqueue "Refactor billing" --run-id orchestrator --json
        muse coord enqueue "Fix bug" --priority 5 --queue hotfix --run-id orch --json
        muse coord enqueue "Analyse" --payload '{"file":"x.py"}' --tags analysis --run-id orch --json

    Args:
        args: Parsed namespace produced by the parser built in
            :func:`register_enqueue`.  Reads ``title``, ``priority``,
            ``queue``, ``ttl_seconds``, ``run_id``, ``payload``, ``tags``
            and ``json_out``.

    JSON output (``--json``): the envelope from ``make_envelope`` merged
    with ``task.to_dict()``.  Task fields::

        task_id      content-addressed ID of the newly created task.
        title        Human-readable task description.
        priority     Integer priority (higher = claimed first).
        queue        Target queue name.
        ttl_seconds  Seconds the task may remain pending.
        created_by   Enqueuing agent identifier (--run-id).
        created_at   ISO 8601 UTC creation timestamp.
        tags         List of string tags.
        payload      Arbitrary JSON object attached to the task.

    Error statuses reported in JSON mode: ``bad_args``, ``bad_payload``
    (unparseable, non-object, or too deeply nested payload) and
    ``bad_queue``.

    Raises:
        SystemExit: With ``ExitCode.USER_ERROR`` on any validation failure
            or when ``create_task`` raises ``ValueError``.
    """
    elapsed = start_timer()
    json_out: bool = args.json_out

    def reject(message: str, status: str = "bad_args") -> SystemExit:
        """Report *message* and return the exit exception for the caller to raise."""
        _err(message, json_out, status, elapsed)
        return SystemExit(ExitCode.USER_ERROR)

    # ── Input validation (before any file I/O) ────────────────────────────────

    if not args.title or not args.title.strip():
        raise reject("title must be non-empty")

    if len(args.run_id) > _MAX_RUN_ID_LEN:
        raise reject(
            f"--run-id is too long ({len(args.run_id)} chars; max {_MAX_RUN_ID_LEN})"
        )

    if args.ttl_seconds < 1:
        raise reject(f"--ttl must be ≥ 1, got {args.ttl_seconds}")

    # Size is checked on the encoded text before parsing so that oversized
    # input is rejected without ever being decoded.
    payload_size = len(args.payload.encode())
    if payload_size > _MAX_PAYLOAD_BYTES:
        raise reject(
            f"--payload is too large ({payload_size} bytes; "
            f"max {_MAX_PAYLOAD_BYTES})"
        )

    try:
        payload = json.loads(args.payload)
        if not isinstance(payload, dict):
            raise ValueError("payload must be a JSON object")
    except (ValueError, RecursionError) as exc:
        # json.JSONDecodeError is a ValueError.  RecursionError is raised by
        # the decoder for pathologically nested input that still fits within
        # the size cap; report it as a bad payload instead of crashing.
        raise reject(f"invalid --payload: {exc}", "bad_payload")

    tags = [t.strip() for t in args.tags.split(",") if t.strip()] if args.tags else []
    if len(tags) > _MAX_TAGS:
        raise reject(f"too many tags: {len(tags)} (max {_MAX_TAGS})")

    try:
        _validate_queue_name(args.queue)
    except ValueError as exc:
        raise reject(str(exc), "bad_queue")

    root = require_repo()

    try:
        task = create_task(
            root,
            args.title,
            payload=payload,
            priority=args.priority,
            queue=args.queue,
            ttl_seconds=args.ttl_seconds,
            created_by=args.run_id,
            tags=tags,
        )
    except ValueError as exc:
        raise reject(str(exc))

    if json_out:
        print(json.dumps({**make_envelope(elapsed), **task.to_dict()}))
        return

    # Human-readable summary; every user-controlled value is sanitised.
    print("\n✅ Task enqueued")
    print(f" Task ID: {sanitize_display(task.task_id)}")
    print(f" Title: {sanitize_display(task.title)}")
    print(f" Queue: {sanitize_display(task.queue)} priority={task.priority}")
    print(f" TTL: {task.ttl_seconds}s")
    if task.tags:
        print(f" Tags: {', '.join(sanitize_display(t) for t in task.tags)}")
    print(f"\n ({elapsed():.3f}s)")
460
461 # ── muse coord claim ──────────────────────────────────────────────────────────
462
def register_claim(
    subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]",
) -> None:
    """Attach the ``claim`` subcommand to *subparsers* (under ``muse coord``).

    Creates the ``claim`` parser, registers its options in the order listed
    below, adds the shared ``--json`` / ``-j`` switch through
    :func:`_add_format_args`, and sets the ``func`` default to
    :func:`run_claim`.

    Options registered
    ------------------
    ``--run-id RUNID``
        Required; stored as ``run_id``. Claiming agent identifier.
    ``--queue QUEUE``
        Restrict claiming to one queue. Default: ``None`` (any queue).
    ``--claim-ttl SECONDS``
        Stored as ``claim_ttl``. Claim lifetime in seconds. Default: 3600.
    ``--wait SECONDS``
        Stored as ``wait``. How long to keep polling an empty queue.
        Default: 0 (return immediately).

    No validation happens here; the ranges quoted in the help texts are
    enforced by :func:`run_claim`, whose docstring also documents the JSON
    output and exit codes.
    """
    claim_parser = subparsers.add_parser(
        "claim",
        help="Atomically claim the highest-priority pending task.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="""Scan the task queue and atomically claim the highest-priority
pending task using O_CREAT|O_EXCL. When multiple agents call 'claim'
simultaneously, exactly one wins per task — no task is ever claimed twice.

Timed-out tasks (claim.expires_at < now) are eligible for re-claiming using
optimistic concurrency (atomic rename + read-back nonce verification).
""",
    )

    run_id_help = (
        f"Claiming agent identifier (stored in the claim record). "
        f"Maximum {_MAX_RUN_ID_LEN} characters."
    )
    queue_help = (
        "Only claim from this queue (default: any queue). "
        "Must match [a-zA-Z0-9_-]+ when provided."
    )
    claim_ttl_help = (
        f"Claim TTL in seconds "
        f"(min {_MIN_CLAIM_TTL}, max {_MAX_CLAIM_TTL}, default 3600). "
        f"Heartbeat every claim-ttl/2 seconds to keep the claim alive."
    )
    wait_help = (
        f"If the queue is empty, poll until a task appears or SECONDS "
        f"elapses (default 0 = return immediately). "
        f"Maximum {_MAX_WAIT_SECONDS} s. Uses exponential backoff."
    )

    # (flag, add_argument keyword options), in the order they appear in --help.
    option_table = (
        (
            "--run-id",
            {
                "required": True,
                "dest": "run_id",
                "metavar": "RUNID",
                "help": run_id_help,
            },
        ),
        (
            "--queue",
            {"default": None, "metavar": "QUEUE", "help": queue_help},
        ),
        (
            "--claim-ttl",
            {
                "type": int,
                "default": 3600,
                "dest": "claim_ttl",
                "metavar": "SECONDS",
                "help": claim_ttl_help,
            },
        ),
        (
            "--wait",
            {
                "type": int,
                "default": 0,
                "dest": "wait",
                "metavar": "SECONDS",
                "help": wait_help,
            },
        ),
    )
    for flag, options in option_table:
        claim_parser.add_argument(flag, **options)

    _add_format_args(claim_parser)
    claim_parser.set_defaults(func=run_claim)
576
def run_claim(args: argparse.Namespace) -> None:
    """Claim the next available task for ``--run-id``, optionally waiting for one.

    Validates the arguments, locates the repository, then calls
    ``claim_next_task``.  When nothing is claimable and ``--wait`` is
    positive, the call is retried with a growing sleep between attempts
    (0.5 s, multiplied by 1.5 each round, capped at 5 s) until a task is
    claimed or the wait budget is spent.  One final attempt is always made
    after the last sleep.

    Agent quickstart::

        muse coord claim --run-id agent-1 --json
        muse coord claim --run-id agent-1 --queue hotfix --json
        muse coord claim --run-id agent-1 --wait 60 --json

    Args:
        args: Parsed namespace produced by the parser built in
            :func:`register_claim`.  Reads ``run_id``, ``queue``,
            ``claim_ttl``, ``wait`` (treated as 0 when absent) and
            ``json_out``.

    JSON output on success: the envelope from ``make_envelope`` merged with
    ``claim.to_dict()``, plus a ``task`` key holding ``task.to_dict()``.

    JSON output when nothing was claimed: a ``USER_ERROR`` envelope with::

        status  Always "empty".
        queue   Queue filter that was applied (null if none).

    Raises:
        SystemExit: With ``ExitCode.USER_ERROR`` when an argument is out of
            range, when ``claim_next_task`` raises ``ValueError`` or
            ``OSError``, or when no task could be claimed.
    """
    elapsed = start_timer()
    json_out: bool = args.json_out
    wait_seconds: int = getattr(args, "wait", 0)

    def reject(message: str, status: str = "bad_args") -> SystemExit:
        """Report *message* and return the exit exception for the caller to raise."""
        _err(message, json_out, status, elapsed)
        return SystemExit(ExitCode.USER_ERROR)

    # ── Input validation (before any file I/O) ────────────────────────────────

    if len(args.run_id) > _MAX_RUN_ID_LEN:
        raise reject(
            f"--run-id is too long ({len(args.run_id)} chars; max {_MAX_RUN_ID_LEN})"
        )

    if not (_MIN_CLAIM_TTL <= args.claim_ttl <= _MAX_CLAIM_TTL):
        raise reject(
            f"--claim-ttl must be between {_MIN_CLAIM_TTL} and "
            f"{_MAX_CLAIM_TTL}, got {args.claim_ttl}"
        )

    if not (0 <= wait_seconds <= _MAX_WAIT_SECONDS):
        raise reject(
            f"--wait must be between 0 and {_MAX_WAIT_SECONDS}, got {wait_seconds}"
        )

    if args.queue is not None:
        try:
            _validate_queue_name(args.queue)
        except ValueError as exc:
            raise reject(str(exc), "bad_queue")

    root = require_repo()

    # ── Claim loop (with optional --wait polling) ─────────────────────────────

    # With --wait 0 the deadline is "now", so the loop runs exactly once.
    deadline = time.monotonic() + wait_seconds
    poll = 0.5
    while True:
        try:
            result = claim_next_task(
                root,
                args.run_id,
                queue=args.queue,
                claim_ttl_seconds=args.claim_ttl,
            )
        except (ValueError, OSError) as exc:
            raise reject(str(exc), "error")
        if result is not None:
            break

        # Read the clock once per round: using the same reading for both the
        # deadline test and the sleep length guarantees the value handed to
        # time.sleep() is never negative (a negative value raises ValueError).
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(poll, remaining))
        poll = min(poll * 1.5, 5.0)

    if result is None:
        if json_out:
            print(json.dumps(_ClaimEmptyJson(
                **make_envelope(elapsed, exit_code=ExitCode.USER_ERROR),
                status="empty",
                queue=args.queue,
            )))
        else:
            queue_str = sanitize_display(args.queue or "any")
            waited = f" after {elapsed():.1f}s" if wait_seconds > 0 else ""
            print(f"\n Queue [{queue_str}] is empty — no pending tasks{waited}.")
            print(f"\n ({elapsed():.3f}s)")
        # An empty queue is reported through the exit code so shell loops can
        # branch on it.
        raise SystemExit(ExitCode.USER_ERROR)

    task, claim = result

    if json_out:
        print(json.dumps({**make_envelope(elapsed), **claim.to_dict(), "task": task.to_dict()}))
        return

    # Full claim lifetime (expiry minus claim time), shown as "Expires in".
    ttl_remaining = int((claim.expires_at - claim.claimed_at).total_seconds())
    print("\n🔒 Task claimed")
    print(f" Task ID: {sanitize_display(claim.task_id)}")
    print(f" Title: {sanitize_display(task.title)}")
    print(f" Claimer: {sanitize_display(claim.claimer_run_id)}")
    print(f" Queue: {sanitize_display(task.queue)} priority={task.priority}")
    print(f" Expires in: {ttl_remaining}s ({claim.expires_at.isoformat()})")
    if task.payload:
        # Payload preview is truncated to keep the summary on one line.
        print(f" Payload: {json.dumps(task.payload, indent=None)[:120]}")
    if task.tags:
        print(f" Tags: {', '.join(sanitize_display(t) for t in task.tags)}")
    print(f"\n ({elapsed():.3f}s)")
719
720 # ── muse coord complete ───────────────────────────────────────────────────────
721
def register_complete(
    subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]",
) -> None:
    """Attach the ``complete`` subcommand to *subparsers* (under ``muse coord``).

    Creates the ``complete`` parser, registers its arguments, adds the
    shared ``--json`` / ``-j`` switch through :func:`_add_format_args`, and
    sets the ``func`` default to :func:`run_complete`.

    Arguments registered
    --------------------
    ``task_id`` (positional)
        content-addressed ID of the task to mark completed.
    ``--run-id RUNID``
        Required; stored as ``run_id``. Must match the original claimer.
    ``--result JSON``
        JSON object describing the outcome. Default: ``"{}"``.

    No validation happens here; limits quoted in the help texts are enforced
    by :func:`run_complete`, whose docstring also documents the JSON output
    and exit codes.
    """
    parser = subparsers.add_parser(
        "complete",
        help="Mark a claimed task as successfully completed.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # NOTE(review): inside a function ``__doc__`` resolves to the *module*
        # docstring, so ``complete --help`` shows the whole-module overview
        # rather than text specific to this subcommand — confirm this is
        # intended.
        description=__doc__,
    )
    parser.add_argument("task_id", help="content-addressed ID of the task to complete.")
    parser.add_argument(
        "--run-id",
        required=True,
        dest="run_id",
        metavar="RUNID",
        help=(
            "Claiming agent identifier — must match the original claimer. "
            f"Maximum {_MAX_RUN_ID_LEN} characters."
        ),
    )
    parser.add_argument(
        "--result",
        default="{}",
        metavar="JSON",
        help=(
            # Plain (non-f) string: braces are written singly so that --help
            # shows "{}" rather than a literal "{{}}".
            "JSON object describing the outcome (default: {}). "
            f"Maximum {_MAX_RESULT_BYTES} bytes serialised."
        ),
    )
    _add_format_args(parser)
    parser.set_defaults(func=run_complete)
796
def run_complete(args: argparse.Namespace) -> None:
    """Mark a claimed task as completed.

    Validates the arguments, locates the repository, then calls
    ``complete_task`` with the task ID, the caller's run-id and the parsed
    ``--result`` object (an empty object is passed as ``None``).

    Agent quickstart::

        muse coord complete <task-id> --run-id agent-1 --json
        muse coord complete <task-id> --run-id agent-1 --result '{"pr":42}' --json

    Args:
        args: Parsed namespace produced by the parser built in
            :func:`register_complete`.  Reads ``task_id``, ``run_id``,
            ``result`` and ``json_out``.

    JSON output (``--json``): the envelope from ``make_envelope`` merged
    with ``claim.to_dict()``.

    Error statuses reported in JSON mode: ``bad_args`` (run-id too long,
    result too large / unparseable / not an object), ``bad_task_id``, and
    ``error`` for failures raised by ``complete_task``.

    Raises:
        SystemExit: With ``ExitCode.USER_ERROR`` on any validation failure
            or when ``complete_task`` raises ``FileNotFoundError``,
            ``PermissionError``, ``RuntimeError`` or ``ValueError``.
    """
    # Function-scope import, as in the original: the validator is not part of
    # the module-level import list.
    from muse.core.task_queue import _validate_task_id

    elapsed = start_timer()
    json_out: bool = args.json_out

    def reject(message: str, status: str = "bad_args") -> SystemExit:
        """Report *message* and return the exit exception for the caller to raise."""
        _err(message, json_out, status, elapsed)
        return SystemExit(ExitCode.USER_ERROR)

    # ── Input validation (before any file I/O) ────────────────────────────────

    if len(args.run_id) > _MAX_RUN_ID_LEN:
        raise reject(
            f"--run-id is too long ({len(args.run_id)} chars; max {_MAX_RUN_ID_LEN})"
        )

    # Enforce the size cap on the raw text *before* parsing, mirroring the
    # --payload handling in ``enqueue``: oversized input is rejected without
    # ever being decoded into Python objects.
    result_bytes = len(args.result.encode())
    if result_bytes > _MAX_RESULT_BYTES:
        raise reject(
            f"--result is too large ({result_bytes} bytes; max {_MAX_RESULT_BYTES})"
        )

    try:
        result_data = json.loads(args.result)
        if not isinstance(result_data, dict):
            raise ValueError("--result must be a JSON object, not a scalar or array")
    except (ValueError, RecursionError) as exc:
        # json.JSONDecodeError is a ValueError.  RecursionError is raised by
        # the decoder for pathologically nested input that still fits within
        # the size cap; report it as bad input instead of crashing.
        raise reject(f"invalid --result: {exc}")

    try:
        _validate_task_id(args.task_id)
    except ValueError as exc:
        raise reject(str(exc), "bad_task_id")

    root = require_repo()

    try:
        # An empty result object is stored as "no result".
        claim = complete_task(root, args.task_id, args.run_id, result=result_data or None)
    except (FileNotFoundError, PermissionError, RuntimeError, ValueError) as exc:
        raise reject(str(exc), "error")

    if json_out:
        print(json.dumps({**make_envelope(elapsed), **claim.to_dict()}))
        return

    # The task record is only needed for the human-readable summary; it may
    # be missing, in which case title and queue are simply omitted.
    task = load_task(root, claim.task_id)
    print("\n✅ Task completed")
    print(f" Task ID: {sanitize_display(claim.task_id)}")
    if task is not None:
        print(f" Title: {sanitize_display(task.title)}")
        print(f" Queue: {sanitize_display(task.queue)}")
    print(f" By: {sanitize_display(claim.claimer_run_id)}")
    if result_data:
        # Result preview is truncated to keep the summary on one line.
        print(f" Result: {json.dumps(result_data, indent=None)[:120]}")
    print(f"\n ({elapsed():.3f}s)")
885
886 # ── muse coord fail-task ──────────────────────────────────────────────────────
887
def register_fail_task(
    subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]",
) -> None:
    """Attach the ``fail-task`` subcommand to *subparsers* (under ``muse coord``).

    Creates the ``fail-task`` parser, registers its arguments in the order
    listed below, adds the shared ``--json`` / ``-j`` switch through
    :func:`_add_format_args`, and sets the ``func`` default to
    :func:`run_fail_task`.

    Arguments registered
    --------------------
    ``task_id`` (positional)
        content-addressed ID of the task to mark failed.
    ``--run-id RUNID``
        Required; stored as ``run_id``. Must match the original claimer.
    ``--error MESSAGE``
        Human-readable failure description. Default: empty string.

    No validation happens here; limits quoted in the help texts are enforced
    by :func:`run_fail_task`.
    """
    run_id_help = (
        "Claiming agent identifier — must match the original claimer. "
        f"Maximum {_MAX_RUN_ID_LEN} characters."
    )
    error_help = (
        "Human-readable error message describing why the task failed. "
        f"Maximum {_MAX_ERROR_LEN} characters. Always provide a message "
        "so orchestrators can diagnose failures without reading logs."
    )

    fail_parser = subparsers.add_parser(
        "fail-task",
        help="Mark a claimed task as failed.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # NOTE(review): ``__doc__`` here is the module docstring, not this
        # function's — confirm that is the intended --help description.
        description=__doc__,
    )

    # (flag, add_argument keyword options), in the order they appear in --help.
    argument_table = (
        ("task_id", {"help": "content-addressed ID of the task to fail."}),
        (
            "--run-id",
            {
                "required": True,
                "dest": "run_id",
                "metavar": "RUNID",
                "help": run_id_help,
            },
        ),
        (
            "--error",
            {"default": "", "metavar": "MESSAGE", "help": error_help},
        ),
    )
    for flag, options in argument_table:
        fail_parser.add_argument(flag, **options)

    _add_format_args(fail_parser)
    fail_parser.set_defaults(func=run_fail_task)
963
def run_fail_task(args: argparse.Namespace) -> None:
    """Report a claimed task as failed.

    Checks the lengths of ``--run-id`` and ``--error`` and the format of the
    task ID before the repository is located, then hands off to
    ``fail_task`` to record the failure.  The orchestrator can read the
    stored error and decide whether to re-enqueue or escalate, so pass an
    ``--error`` message detailed enough to diagnose the failure without logs.

    Agent quickstart::

        muse coord fail-task <task-id> --run-id agent-1 --json
        muse coord fail-task <task-id> --run-id agent-1 --error "timed out" --json

    JSON output is the mapping from ``make_envelope`` merged with
    ``claim.to_dict()``.  Documented fields::

        task_id          content-addressed ID of the failed task.
        claimer_run_id   Agent that reported the failure.
        claimed_at       ISO 8601 UTC when originally claimed.
        expires_at       ISO 8601 UTC expiry of the original claim.
        status           Always "failed".
        heartbeat_at     ISO 8601 UTC of last heartbeat.
        claim_nonce      Original nonce from claim time.
        result           null.
        error            Human-readable failure message from --error.
        muse_version     Muse release that produced this output.
        schema           Envelope schema version (int).
        exit_code        0 on success, 1 on error.
        duration_ms      Wall-clock milliseconds for the command.
        timestamp        ISO-8601 UTC timestamp of command completion.
        warnings         List of non-fatal advisory messages.

    Exit codes::

        0  Task marked failed.
        1  Bad arguments, task not found, not claimed, or wrong run-id.
    """
    elapsed = start_timer()
    as_json: bool = args.json_out

    # ── Input validation (nothing below touches the file system yet) ─────────

    run_id_len = len(args.run_id)
    if run_id_len > _MAX_RUN_ID_LEN:
        _err(
            f"--run-id is too long ({run_id_len} chars; max {_MAX_RUN_ID_LEN})",
            as_json,
            "bad_args",
            elapsed,
        )
        raise SystemExit(ExitCode.USER_ERROR)

    error_len = len(args.error)
    if error_len > _MAX_ERROR_LEN:
        _err(
            f"--error is too long ({error_len} chars; max {_MAX_ERROR_LEN})",
            as_json,
            "bad_args",
            elapsed,
        )
        raise SystemExit(ExitCode.USER_ERROR)

    try:
        from muse.core.task_queue import _validate_task_id
        _validate_task_id(args.task_id)
    except ValueError as bad_id:
        _err(str(bad_id), as_json, "bad_task_id", elapsed)
        raise SystemExit(ExitCode.USER_ERROR)

    root = require_repo()

    # Every failure raised by the core call is reported as a user error.
    try:
        failed_claim = fail_task(root, args.task_id, args.run_id, error=args.error)
    except (FileNotFoundError, PermissionError, RuntimeError, ValueError) as failure:
        _err(str(failure), as_json, elapsed=elapsed)
        raise SystemExit(ExitCode.USER_ERROR)

    if as_json:
        payload = {**make_envelope(elapsed), **failed_claim.to_dict()}
        print(json.dumps(payload))
        return

    # Human-readable summary; task metadata is shown only when it can be loaded.
    task = load_task(root, failed_claim.task_id)
    report = [
        "\n❌ Task failed",
        f" Task ID: {sanitize_display(failed_claim.task_id)}",
    ]
    if task is not None:
        report.append(f" Title: {sanitize_display(task.title)}")
        report.append(f" Queue: {sanitize_display(task.queue)}")
    report.append(f" By: {sanitize_display(failed_claim.claimer_run_id)}")
    if args.error:
        # Only the first 200 characters of the message are echoed back.
        report.append(f" Error: {sanitize_display(args.error[:200])}")
    for line in report:
        print(line)
    print(f"\n ({elapsed():.3f}s)")
1043
1044 # ── muse coord cancel-task ────────────────────────────────────────────────────
1045
def register_cancel_task(
    subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]",
) -> None:
    """Attach the ``cancel-task`` subcommand to *subparsers* (``muse coord``).

    Declares each argument with its default and help text, and routes the
    parsed namespace to :func:`run_cancel_task` through the ``func`` default.

    Arguments declared
    ------------------
    ``task_id`` (positional)
        Content-addressed ID of the task to cancel.  Must be a valid
        sha256: content ID; :func:`run_cancel_task` validates it before any
        file I/O.
    ``--run-id RUNID``
        Required.  Identifier of the calling agent.  For pending tasks this
        becomes the ``claimer_run_id`` in the cancel record.  For claimed
        tasks it must match the original claimer unless ``--force`` is
        given.  Limited to :data:`_MAX_RUN_ID_LEN` characters.
    ``--force``
        Boolean flag, off by default.  Cancels even when a different agent
        holds the claim.  Use with caution: in-flight work is aborted
        without notifying the claimer.  Harmless for pending tasks.
    ``--format`` / ``--json``
        Added by ``_add_format_args``; emits compact JSON to stdout instead
        of the default human-readable text.

    JSON output
    -----------
    The claim record (``task_id``, ``claimer_run_id``, ``claimed_at``,
    ``expires_at``, ``status`` == ``"cancelled"``, ``heartbeat_at``,
    ``claim_nonce``, ``result``, ``error``) merged with the envelope fields;
    see :func:`run_cancel_task` for the field list.

    Exit codes::

        0 — task cancelled
        1 — bad arguments, task not found, already terminal, or permission denied
    """
    run_id_help = (
        "Calling agent identifier. For claimed tasks, must match the "
        "original claimer unless --force is used. "
        f"Maximum {_MAX_RUN_ID_LEN} characters."
    )
    force_help = (
        "Cancel even if the task is claimed by a different agent. "
        "Use with caution — aborts in-flight work without notifying "
        "the claimer."
    )

    # NOTE: inside a function body ``__doc__`` resolves to the module-level
    # docstring, so ``--help`` shows the module overview as the description.
    cancel_parser = subparsers.add_parser(
        "cancel-task",
        help="Cancel a pending or claimed task.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )
    cancel_parser.add_argument(
        "task_id",
        help="content-addressed ID of the task to cancel.",
    )
    cancel_parser.add_argument(
        "--run-id",
        required=True,
        dest="run_id",
        metavar="RUNID",
        help=run_id_help,
    )
    cancel_parser.add_argument(
        "--force",
        action="store_true",
        default=False,
        help=force_help,
    )
    _add_format_args(cancel_parser)
    cancel_parser.set_defaults(func=run_cancel_task)
1122
def run_cancel_task(args: argparse.Namespace) -> None:
    """Cancel a task that is still pending or currently claimed.

    Pending tasks get a cancelled claim created atomically via
    O_CREAT|O_EXCL.  Claimed tasks are cancelled only when ``--run-id``
    matches the claimer, unless ``--force`` is given.  Tasks already in a
    terminal state (completed/failed/cancelled) cannot be cancelled.  The
    ``--run-id`` length and the task-ID format are checked before the
    repository is located; the cancellation itself is done by
    ``cancel_task``.

    Agent quickstart::

        muse coord cancel-task <task-id> --run-id agent-1 --json
        muse coord cancel-task <task-id> --run-id orch --force --json

    JSON output is the mapping from ``make_envelope`` merged with
    ``claim.to_dict()``.  Documented fields::

        task_id          content-addressed ID of the cancelled task.
        claimer_run_id   Agent that cancelled the task.
        claimed_at       ISO 8601 UTC of the cancel claim.
        expires_at       ISO 8601 UTC expiry.
        status           Always "cancelled".
        heartbeat_at     ISO 8601 UTC of last heartbeat.
        claim_nonce      Nonce from claim time.
        result           null.
        error            Cancellation reason (if any).
        muse_version     Muse release that produced this output.
        schema           Envelope schema version (int).
        exit_code        0 on success, 1 on error.
        duration_ms      Wall-clock milliseconds for the command.
        timestamp        ISO-8601 UTC timestamp of command completion.
        warnings         List of non-fatal advisory messages.

    Exit codes::

        0  Task cancelled.
        1  Bad arguments, task not found, already terminal, or permission denied.
    """
    elapsed = start_timer()
    as_json: bool = args.json_out

    # ── Input validation (nothing below touches the file system yet) ─────────

    run_id_len = len(args.run_id)
    if run_id_len > _MAX_RUN_ID_LEN:
        _err(
            f"--run-id is too long ({run_id_len} chars; max {_MAX_RUN_ID_LEN})",
            as_json,
            "bad_args",
            elapsed,
        )
        raise SystemExit(ExitCode.USER_ERROR)

    try:
        from muse.core.task_queue import _validate_task_id
        _validate_task_id(args.task_id)
    except ValueError as bad_id:
        _err(str(bad_id), as_json, "bad_task_id", elapsed)
        raise SystemExit(ExitCode.USER_ERROR)

    root = require_repo()

    # Every failure raised by the core call is reported as a user error.
    handled = (
        FileNotFoundError,
        FileExistsError,
        PermissionError,
        RuntimeError,
        ValueError,
    )
    try:
        cancelled_claim = cancel_task(root, args.task_id, args.run_id, force=args.force)
    except handled as failure:
        _err(str(failure), as_json, elapsed=elapsed)
        raise SystemExit(ExitCode.USER_ERROR)

    if as_json:
        payload = {**make_envelope(elapsed), **cancelled_claim.to_dict()}
        print(json.dumps(payload))
        return

    # Human-readable summary; task metadata is shown only when it can be loaded.
    task = load_task(root, cancelled_claim.task_id)
    report = [
        "\n🚫 Task cancelled",
        f" Task ID: {sanitize_display(cancelled_claim.task_id)}",
    ]
    if task is not None:
        report.append(f" Title: {sanitize_display(task.title)}")
        report.append(f" Queue: {sanitize_display(task.queue)}")
    report.append(f" By: {sanitize_display(cancelled_claim.claimer_run_id)}")
    if args.force:
        report.append(" (forced)")
    for line in report:
        print(line)
    print(f"\n ({elapsed():.3f}s)")
1197
1198 # ── muse coord tasks ──────────────────────────────────────────────────────────
1199
def register_tasks(
    subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]",
) -> None:
    """Attach the ``tasks`` listing subcommand to *subparsers* (``muse coord``).

    Declares each filter flag with its default and help text, and routes the
    parsed namespace to :func:`run_tasks` through the ``func`` default.

    Arguments declared
    ------------------
    ``--status STATUS``
        Restrict the listed items to one status: ``pending``, ``claimed``,
        ``timed_out``, ``completed``, ``failed`` or ``cancelled``.  The
        status counts in the output still cover the *full* queue.
    ``--queue QUEUE``
        Restrict the listed items to one queue.  The name must match
        ``[a-zA-Z0-9_-]+``; :func:`run_tasks` validates it before any file
        I/O.
    ``--run-id RUNID``
        Restrict the listed items to tasks whose claimer is *RUNID*
        (claimed, completed, and failed tasks only).  Limited to
        :data:`_MAX_RUN_ID_LEN` characters.
    ``--limit N``
        Upper bound on the number of items returned (default: 200; max:
        :data:`_MAX_LIMIT`).  Only the ``items`` list is truncated; the
        status counts always cover the full queue.
    ``--format`` / ``--json``
        Added by ``_add_format_args``; emits compact JSON to stdout instead
        of the default human-readable text.

    Derived status values::

        pending    — not yet claimed
        claimed    — actively claimed, claim TTL not expired
        timed_out  — claim TTL expired (eligible for re-claiming)
        completed  — done successfully
        failed     — done with error
        cancelled  — cancelled before or after claiming

    JSON output
    -----------
    Envelope fields plus ``total``, one integer count per status,
    ``limit``, ``truncated`` and ``items``.  Each item carries::

        {
          "task_id":        str,
          "title":          str,
          "priority":       int,
          "queue":          str,
          "status":         str,
          "created_by":     str,
          "created_at":     str,         // ISO 8601
          "ttl_seconds":    int,
          "tags":           [str, ...],
          "payload":        dict,
          "claimer_run_id": str | null,
          "expires_at":     str | null   // ISO 8601; null if not claimed
        }

    Exit codes::

        0 — success (empty queue is still success)
        1 — bad arguments or unexpected error
    """
    status_choices = (
        "pending",
        "claimed",
        "timed_out",
        "completed",
        "failed",
        "cancelled",
    )
    status_help = "Filter by status: " + " | ".join(status_choices)
    queue_help = (
        "Filter by queue name. Must match [a-zA-Z0-9_-]+. "
        "Validated before any file I/O."
    )
    run_id_help = (
        "Filter by claimer run_id (claimed/completed/failed tasks only). "
        f"Maximum {_MAX_RUN_ID_LEN} characters."
    )
    limit_help = (
        f"Maximum items to return (default: 200; max: {_MAX_LIMIT}). "
        "Status counts always reflect the full queue."
    )

    # NOTE: inside a function body ``__doc__`` resolves to the module-level
    # docstring, so ``--help`` shows the module overview as the description.
    tasks_parser = subparsers.add_parser(
        "tasks",
        help="List tasks with optional status/queue/run-id/limit filtering.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__,
    )
    tasks_parser.add_argument(
        "--status",
        default=None,
        choices=status_choices,
        metavar="STATUS",
        help=status_help,
    )
    tasks_parser.add_argument(
        "--queue",
        default=None,
        metavar="QUEUE",
        help=queue_help,
    )
    tasks_parser.add_argument(
        "--run-id",
        default=None,
        dest="run_id",
        metavar="RUNID",
        help=run_id_help,
    )
    tasks_parser.add_argument(
        "--limit",
        type=int,
        default=200,
        metavar="N",
        help=limit_help,
    )
    _add_format_args(tasks_parser)
    tasks_parser.set_defaults(func=run_tasks)
1323
def run_tasks(args: argparse.Namespace) -> None:
    """List tasks from the coordination queue with filtering and pagination.

    Loads all tasks and claims, derives each task's status once, applies the
    optional filters (--status, --queue, --run-id), sorts by priority desc /
    created_at asc, and truncates to --limit.  The per-status counts are
    computed over every task, so they reflect the full queue regardless of
    filters; the "showing N of M" notice reports M as the number of tasks
    that matched the filters.

    Agent quickstart::

        muse coord tasks --json
        muse coord tasks --status pending --json
        muse coord tasks --queue hotfix --limit 50 --json
        muse coord tasks --run-id agent-1 --json

    JSON fields::

        total          Total tasks across all statuses.
        pending        Count of pending tasks.
        claimed        Count of actively claimed tasks.
        timed_out      Count of tasks with expired claims.
        completed      Count of completed tasks.
        failed         Count of failed tasks.
        cancelled      Count of cancelled tasks.
        limit          --limit value applied to the items list.
        truncated      true if items were truncated to limit.
        items          List of task+status entries.
        muse_version   Muse release that produced this output.
        schema         Envelope schema version (int).
        exit_code      0 on success, 1 on bad arguments.
        duration_ms    Wall-clock milliseconds for the command.
        timestamp      ISO-8601 UTC timestamp of command completion.
        warnings       List of non-fatal advisory messages.

    Exit codes::

        0  Success (empty queue is still success).
        1  Bad arguments or unexpected error.
    """
    import datetime as _dt

    elapsed = start_timer()
    json_out: bool = args.json_out
    limit: int = getattr(args, "limit", 200)

    # ── Input validation (before any file I/O) ────────────────────────────────

    if args.queue is not None:
        try:
            _validate_queue_name(args.queue)
        except ValueError as exc:
            _err(str(exc), json_out, "bad_queue", elapsed)
            raise SystemExit(ExitCode.USER_ERROR)

    if args.run_id is not None and len(args.run_id) > _MAX_RUN_ID_LEN:
        msg = f"--run-id is too long ({len(args.run_id)} chars; max {_MAX_RUN_ID_LEN})"
        _err(msg, json_out, "bad_args", elapsed)
        raise SystemExit(ExitCode.USER_ERROR)

    if not (1 <= limit <= _MAX_LIMIT):
        msg = f"--limit must be between 1 and {_MAX_LIMIT}, got {limit}"
        _err(msg, json_out, "bad_args", elapsed)
        raise SystemExit(ExitCode.USER_ERROR)

    root = require_repo()

    all_tasks = load_all_tasks(root)
    all_claims = load_all_claims(root)
    now_ts = _dt.datetime.now(_dt.timezone.utc)

    # Single pass over the queue: the status of each task is derived once and
    # used both for the global counts (never filtered, so the summary bar
    # reflects queue health) and for the filtered item list.
    counts: _IntMap = {
        s: 0 for s in ("pending", "claimed", "timed_out", "completed", "failed", "cancelled")
    }
    items = []
    for task in all_tasks:
        claim = all_claims.get(task.task_id)
        status = get_task_status(task, claim, now_ts)
        counts[status] += 1

        if args.status and status != args.status:
            continue
        if args.queue and task.queue != args.queue:
            continue
        if args.run_id:
            if claim is None or claim.claimer_run_id != args.run_id:
                continue

        items.append({
            "task_id": task.task_id,
            "title": task.title,
            "priority": task.priority,
            "queue": task.queue,
            "status": status,
            "created_by": task.created_by,
            "created_at": task.created_at.isoformat(),
            "ttl_seconds": task.ttl_seconds,
            "tags": task.tags,
            "payload": task.payload,
            "claimer_run_id": claim.claimer_run_id if claim else None,
            "expires_at": claim.expires_at.isoformat() if claim else None,
        })

    # Sort: priority desc, then created_at asc.
    items.sort(key=lambda i: (-i["priority"], i["created_at"]))

    # Apply limit after sort so the highest-priority items are always included.
    # The pre-truncation size is kept so the text output can report how many
    # tasks matched the filters (not how many exist in the whole queue).
    matched_total = len(items)
    truncated = matched_total > limit
    items = items[:limit]

    if json_out:
        print(json.dumps({
            **make_envelope(elapsed),
            "total": sum(counts.values()),
            **counts,
            "limit": limit,
            "truncated": truncated,
            "items": items,
        }))
        return

    # Text output.
    filter_parts = []
    if args.status:
        filter_parts.append(f"status={args.status}")
    if args.queue:
        filter_parts.append(f"queue={sanitize_display(args.queue)}")
    if args.run_id:
        filter_parts.append(f"run-id={sanitize_display(args.run_id)}")
    filter_str = f" filter: {', '.join(filter_parts)}" if filter_parts else ""

    print(f"\nTask queue — {sum(counts.values())} task(s)")
    if filter_str:
        print(filter_str)
    print(
        f" {counts['pending']} pending "
        f"{counts['claimed']} claimed "
        f"{counts['timed_out']} timed_out "
        f"{counts['completed']} completed "
        f"{counts['failed']} failed "
        f"{counts['cancelled']} cancelled"
    )
    print("─" * 80)

    if not items:
        print("\n (no tasks matching filter)")
    else:
        # Rows show the full task ID, so the ID column is sized to the widest
        # displayed ID (never narrower than the historical 8 columns) to keep
        # the header aligned with the rows.
        shown_ids = [sanitize_display(item["task_id"]) for item in items]
        id_width = max(8, *(len(shown) for shown in shown_ids))

        print(f"\n{'ID':<{id_width}} {'ST':10} {'PRI':3} {'QUEUE':12} {'CLAIMER':20} TITLE")
        print("─" * 80)
        _STATUS_ICONS = {
            "pending": "⏳",
            "claimed": "🔒",
            "timed_out": "⏰",
            "completed": "✅",
            "failed": "❌",
            "cancelled": "🚫",
        }
        for tid, item in zip(shown_ids, items):
            icon = _STATUS_ICONS.get(item["status"], "?")
            st = f"{icon}{item['status'][:9]}"
            pri = str(item["priority"])
            q = sanitize_display(item["queue"][:12])
            claimer = sanitize_display((item["claimer_run_id"] or "-")[:20])
            title = sanitize_display(item["title"][:40])
            print(f"{tid:<{id_width}} {st:<11} {pri:>3} {q:<12} {claimer:<20} {title}")

        if truncated:
            print(f"\n (showing {len(items)} of {matched_total} — use --limit to see more)")

    print(f"\n ({elapsed():.3f}s)")
1501
1502 # ── Registration ──────────────────────────────────────────────────────────────
1503
def register_all(
    subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]",
) -> None:
    """Attach every task-queue subcommand to *subparsers*.

    Called from :func:`muse.cli.app.main` to wire the task-queue commands
    (``enqueue``, ``claim``, ``complete``, ``fail-task``, ``cancel-task``
    and ``tasks``) into the ``muse coord`` subparser group, in that order.
    """
    registrars = (
        register_enqueue,
        register_claim,
        register_complete,
        register_fail_task,
        register_cancel_task,
        register_tasks,
    )
    for register in registrars:
        register(subparsers)
1518
File History 1 commit
sha256:1ddad36d76d3a8d323f9b3664169cb184b7a38b39208214a2ae504154260826f fix: show full cryptographic IDs in all human-readable CLI output Sonnet 4.6 patch 1 day ago