gabriel / muse public
coord_sync.py python
769 lines 29.5 KB
Raw
1 """``muse coord sync`` — push/pull local coordination state to/from MuseHub.
2
3 Enables distributed agent swarms to share coordination records (reservations,
4 intents, releases, heartbeats, dependencies, tasks, claims) across machines
5 via the MuseHub coordination bus.
6
7 Sub-commands
8 ------------
9 ``muse coord sync push``
10 Reads all local coordination records from ``.muse/coordination/`` and
11 pushes them to MuseHub in batches. Records already on the hub are
12 silently skipped (idempotent). Requires a valid signing identity.
13
14 ``muse coord sync pull``
15 Pulls coordination records from MuseHub into local read-only JSON files
16 under ``.muse/coordination/remote/``. Local agents can read these to
17 discover reservations and tasks created by remote agents. Requires auth
18 for private repos; public repos allow unauthenticated pull.
19
20 Usage::
21
22 muse coord sync push --hub https://localhost:1337 --owner gabriel --slug myrepo
23 muse coord sync pull --hub https://localhost:1337 --owner gabriel --slug myrepo
24 muse coord sync pull --hub https://localhost:1337 --owner gabriel --slug myrepo \\
25 --since-id 42 --kinds reservation heartbeat
26
27 Output (push)::
28
29 ✅ Pushed 12 record(s) to gabriel/myrepo — inserted: 10, skipped: 2
30
31 Output (pull)::
32
33 ✅ Pulled 8 new record(s) from gabriel/myrepo — cursor: 20
34
35 JSON output (push)::
36
    {
      "schema_version": "...",
      "inserted": 10,
      "skipped": 2,
      "total": 12,
      "failed": false,
      "errors": [],
      "duration_ms": 0.123
    }
45
46 JSON output (pull)::
47
48 {
49 "schema_version": "...",
50 "count": 8,
51 "cursor": 20,
52 "records": [...],
53 "duration_ms": 0.045
54 }
55
56 Flags (common):
57
58 ``--hub URL``
59 MuseHub base URL (default: read from ``[hub] url`` in ``config.toml``).
60
61 ``--owner NAME``
62 Repository owner username. Maximum 256 characters.
63
64 ``--slug NAME``
65 Repository URL slug. Maximum 256 characters.
66
67 ``--json``
68 Emit results as JSON.
69
70 Push-only flags:
71
72 ``--kinds KIND [KIND ...]``
73 Limit push to specific record kinds (default: all kinds).
74
75 Pull-only flags:
76
77 ``--since-id N``
78 Return only records with id > N (default: 0 = all records). Must be >= 0.
79
80 ``--kinds KIND [KIND ...]``
81 Filter by record kind.
82
83 ``--limit N``
84 Maximum records to pull per call (1–1000, default: 500).
85
86 Exit codes::
87
    0 — success
    1 — bad arguments, auth failure, or hub communication error
    2 — not inside a Muse repository
90 """
91
92 import argparse
93 import json
94 import logging
95 import pathlib
96 import re
97 import sys
98 from datetime import datetime, timezone
99 from typing import TYPE_CHECKING
100
101 from muse.core.types import load_json_file
102 from muse.core.paths import coordination_dir as _coordination_dir
103 from muse.core.coord_bus import (
104 CoordBusError,
105 MAX_PUSH_BATCH,
106 pull_from_hub,
107 push_to_hub,
108 )
109 from muse.core.coord_bus import JsonDict
110 from muse.core.errors import ExitCode
111 from muse.core.repo import require_repo
112 from muse.core.io import write_text_atomic
113 from muse.core.validation import clamp_int, sanitize_display
114 from muse.core.envelope import EnvelopeJson, make_envelope
115 from muse.core.timing import start_timer
116 from typing import TypedDict
117
118 if TYPE_CHECKING:
119 from muse.core.transport import SigningIdentity
120
121 logger = logging.getLogger(__name__)
122
123 # Coordination record kinds supported by the bus.
124 _ALL_KINDS = (
125 "reservation",
126 "intent",
127 "release",
128 "heartbeat",
129 "dependency",
130 "task",
131 "claim",
132 )
133
134 # Directory under .muse/coordination/ where pulled remote records are stored.
135 _REMOTE_DIR_NAME = "remote"
136
137 # ── Input constraints ─────────────────────────────────────────────────────────
138
139 #: Maximum length of --owner and --slug to prevent oversized HTTP requests.
140 _MAX_OWNER_LEN: int = 256
141 _MAX_SLUG_LEN: int = 256
142
143 #: Maximum records to pull per call.
144 _MAX_PULL_LIMIT: int = 1000
145
146 #: Safe record ID pattern — alphanumeric, hyphens, underscores, colons.
147 #: Accepts sha256 genesis IDs (sha256:<64-hex>) and opaque run_id strings.
148 #: Prevents path traversal when server-supplied record_id is used to
149 #: construct filesystem paths under .muse/coordination/remote/<kind>/<id>.json.
150 _SAFE_RECORD_ID_RE: re.Pattern[str] = re.compile(r"^[a-zA-Z0-9_\-:]{1,128}$")
151
152 # ── Error helper ──────────────────────────────────────────────────────────────
153
154 def _err(msg: str, json_out: bool = False, status: str = "error") -> None:
155 """Print an error and return. Caller is responsible for raising SystemExit."""
156 if json_out:
157 print(json.dumps({"error": msg, "status": status}))
158 else:
159 print(f"❌ {msg}", file=sys.stderr)
160
class _CoordSyncPushJson(EnvelopeJson):
    """Shape of the JSON document printed by ``muse coord sync push --json``.

    These fields are added on top of whatever :class:`EnvelopeJson` declares.
    """

    # Sum of the ``inserted`` counts reported by the hub across all batches.
    inserted: int
    # Sum of the ``skipped`` counts reported by the hub across all batches.
    skipped: int
    # Number of local records gathered for this push.
    total: int
    # True when at least one batch raised a hub error.
    failed: bool
    # One message per failed batch; empty when nothing failed.
    errors: list[str]
169
class _CoordSyncPullJson(EnvelopeJson):
    """Shape of the JSON document printed by ``muse coord sync pull --json``.

    These fields are added on top of whatever :class:`EnvelopeJson` declares.
    """

    # Number of records contained in ``records``.
    count: int
    # Value to pass back as ``--since-id`` on the next incremental pull.
    cursor: int
    # Record dicts exactly as returned by the hub.
    records: list[dict]
176
def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
    """Attach the ``sync`` command and its ``pull`` / ``push`` children.

    ``sync`` itself requires a sub-command.  Both children accept the shared
    ``--hub``, ``--owner``, ``--slug`` and ``--json`` / ``-j`` flags; ``pull``
    additionally takes ``--since-id``, ``--kinds`` and ``--limit``, while
    ``push`` takes only ``--kinds``.  The parsed namespace's ``func`` is set to
    :func:`run_pull` or :func:`run_push` respectively.

    Args:
        subparsers: The sub-parser action of the parent command on which
            ``sync`` is registered.
    """
    raw_formatter = argparse.RawDescriptionHelpFormatter
    kind_choices = ", ".join(_ALL_KINDS)

    sync_parser = subparsers.add_parser(
        "sync",
        help="Push/pull coordination records to/from MuseHub.",
        description=__doc__,
        formatter_class=raw_formatter,
    )
    sync_subs = sync_parser.add_subparsers(dest="sync_command", metavar="SYNC_COMMAND")
    sync_subs.required = True

    # ── Flags shared by push and pull, in the order they appear in --help ─────
    shared_flags = (
        (
            ("--hub",),
            {
                "default": None,
                "metavar": "URL",
                "help": "MuseHub base URL (default: from config.toml [hub] url).",
            },
        ),
        (
            ("--owner",),
            {
                "required": True,
                "metavar": "NAME",
                "help": f"Repository owner username (max {_MAX_OWNER_LEN} chars).",
            },
        ),
        (
            ("--slug",),
            {
                "required": True,
                "metavar": "NAME",
                "help": f"Repository URL slug (max {_MAX_SLUG_LEN} chars).",
            },
        ),
        (
            ("--json", "-j"),
            {
                "action": "store_true",
                "dest": "json_out",
                "help": "Emit machine-readable JSON.",
            },
        ),
    )

    def _add_common(p: argparse.ArgumentParser) -> None:
        for names, options in shared_flags:
            p.add_argument(*names, **options)

    # ── Pull ──────────────────────────────────────────────────────────────────
    pull_description = "\n".join((
        "Fetch coordination records from MuseHub and write them to",
        ".muse/coordination/remote/<kind>/<id>.json.",
        "Use --since-id <cursor> for incremental pulls — pass the cursor",
        "from the previous JSON output back as --since-id on the next call.",
        "",
        "Agent quickstart",
        "----------------",
        " muse coord sync pull --owner gabriel --slug myrepo --json",
        " muse coord sync pull --owner gabriel --slug myrepo -j",
        " muse coord sync pull --owner gabriel --slug myrepo -j | jq .cursor",
        " muse coord sync pull --owner gabriel --slug myrepo \\",
        " --since-id 42 --kinds reservation intent -j",
        "",
        "JSON output schema",
        "------------------",
        ' {"schema_version": "<str>", "count": <int>, "cursor": <int>,',
        ' "records": [...], "duration_ms": <float>}',
        "",
        "Exit codes",
        "----------",
        " 0 — pull succeeded (0 records is a valid success)",
        " 1 — bad arguments, auth error, or hub communication error",
        " 2 — not inside a Muse repository",
    )) + "\n"
    pull_parser = sync_subs.add_parser(
        "pull",
        help="Pull coordination records from MuseHub.",
        description=pull_description,
        formatter_class=raw_formatter,
    )
    _add_common(pull_parser)
    pull_parser.add_argument(
        "--since-id",
        dest="since_id",
        type=int,
        default=0,
        metavar="N",
        help="Only pull records with id > N (default: 0 = all). Must be >= 0.",
    )
    pull_parser.add_argument(
        "--kinds",
        nargs="+",
        choices=_ALL_KINDS,
        default=[],
        metavar="KIND",
        help=f"Filter by kind. Choices: {kind_choices}.",
    )
    pull_parser.add_argument(
        "--limit",
        type=int,
        default=500,
        metavar="N",
        help=f"Maximum records to pull per call (1–{_MAX_PULL_LIMIT}, default: 500).",
    )
    pull_parser.set_defaults(func=run_pull)

    # ── Push ──────────────────────────────────────────────────────────────────
    push_description = "\n".join((
        "Read all local coordination files from .muse/coordination/ and",
        "POST them to MuseHub in batches. Records already on the hub are",
        "silently skipped (idempotent). Fails with exit 1 if any batch",
        "errors; other batches still complete.",
        "",
        "Agent quickstart",
        "----------------",
        " muse coord sync push --owner gabriel --slug myrepo --json",
        " muse coord sync push --owner gabriel --slug myrepo -j",
        " muse coord sync push --owner gabriel --slug myrepo -j | jq .inserted",
        " muse coord sync push --owner gabriel --slug myrepo \\",
        " --kinds reservation intent -j",
        "",
        "JSON output schema",
        "------------------",
        ' {"schema_version": "<str>", "inserted": <int>, "skipped": <int>,',
        ' "total": <int>, "failed": <bool>, "duration_ms": <float>}',
        "",
        "Exit codes",
        "----------",
        " 0 — all batches pushed successfully (or nothing to push)",
        " 1 — bad arguments, auth error, or at least one batch failed",
        " 2 — not inside a Muse repository",
    )) + "\n"
    push_parser = sync_subs.add_parser(
        "push",
        help="Push local coordination records to MuseHub.",
        description=push_description,
        formatter_class=raw_formatter,
    )
    _add_common(push_parser)
    push_parser.add_argument(
        "--kinds",
        nargs="+",
        choices=_ALL_KINDS,
        default=list(_ALL_KINDS),
        metavar="KIND",
        help=f"Limit push to these kinds (default: all). Choices: {kind_choices}.",
    )
    push_parser.set_defaults(func=run_push)
313
314 # ── Helpers ────────────────────────────────────────────────────────────────────
315
def _resolve_hub_and_signing(
    args: argparse.Namespace,
    json_out: bool = False,
) -> "tuple[str, SigningIdentity | None]":
    """Return ``(hub_url, signing)`` from args or config.

    The hub URL comes from ``args.hub`` when given, otherwise from
    ``muse.cli.config.get_hub_url()``.  The signing identity is always looked
    up through ``muse.cli.config.get_signing_identity(hub_url)``; there is no
    CLI flag for it.

    A failure while loading the signing identity is *not* fatal: it is logged
    at DEBUG level and ``None`` is returned for *signing*, so unauthenticated
    access to public repos keeps working.

    Args:
        args: Parsed ``argparse.Namespace`` with a ``hub`` attribute.
        json_out: Route error messages to stdout as JSON when ``True``,
            otherwise to stderr.

    Returns:
        ``(hub_url, signing)`` where *signing* may be ``None``.

    Raises:
        SystemExit: With :attr:`~muse.core.errors.ExitCode.USER_ERROR` when no
            hub URL can be determined.
    """
    hub_url: str | None = args.hub
    signing: SigningIdentity | None = None

    try:
        # Imported lazily, inside the ``try``, so an unusable config module is
        # handled the same way as a failing lookup.
        from muse.cli.config import get_hub_url, get_signing_identity

        if hub_url is None:
            hub_url = get_hub_url()
        # Only ask for an identity once a hub URL is known; otherwise a
        # signing failure would be misreported as a hub-URL failure below.
        if hub_url is not None:
            signing = get_signing_identity(hub_url)
    except Exception as exc:  # noqa: BLE001
        if hub_url is None:
            msg = (
                "no --hub URL supplied and could not read hub URL from config: "
                f"{exc}"
            )
            _err(msg, json_out, "no_hub_url")
            raise SystemExit(ExitCode.USER_ERROR)
        # Best effort: continue unauthenticated, but leave a trace.
        logger.debug(
            "resolve_hub_and_signing: no signing identity for %s: %s",
            hub_url,
            exc,
        )

    if hub_url is None:
        _err("--hub URL is required", json_out, "no_hub_url")
        raise SystemExit(ExitCode.USER_ERROR)

    return hub_url, signing
362
#: Layout of ``.muse/coordination/`` per record kind, in push order:
#: ``(kind, subdirectory, payload key for record_id, payload key for run_id,
#: whether the payload's ``expires_at`` is forwarded)``.
_LOCAL_RECORD_SPECS: tuple[tuple[str, str, str, str, bool], ...] = (
    ("reservation", "reservations", "reservation_id", "run_id", True),
    ("heartbeat", "heartbeats", "run_id", "run_id", True),
    ("intent", "intents", "intent_id", "run_id", True),
    ("release", "releases", "release_id", "run_id", False),
    ("dependency", "dependencies", "reservation_id", "run_id", False),
    ("task", "tasks", "task_id", "run_id", False),
    ("claim", "claims", "task_id", "claimer_run_id", True),
)


def _gather_local_records(
    root: pathlib.Path,
    kinds: list[str],
) -> list[JsonDict]:
    """Load local coordination records from ``.muse/coordination/``.

    Returns a flat list of record dicts, each with the fields ``kind``,
    ``record_id``, ``run_id``, ``payload`` and ``expires_at``.  ``record_id``
    falls back to the file stem and ``run_id`` to ``""`` when the payload has
    no corresponding key; ``expires_at`` is ``None`` for kinds that do not
    forward it (release, dependency, task).

    Only kinds listed in *kinds* are included.  Files are visited in sorted
    path order within each kind so repeated runs build identical batches.
    Files for which ``load_json_file`` returns ``None`` are ignored, and any
    exception while reading or shaping a file is logged at DEBUG level and the
    file skipped, so a single bad file never blocks a push.

    Args:
        root: Repository root directory (contains ``.muse/``).
        kinds: Subset of :data:`_ALL_KINDS` to include.

    Returns:
        List of record dicts ready for ``push_to_hub``.
    """
    coord_dir = _coordination_dir(root)
    wanted = set(kinds)
    records: list[JsonDict] = []

    for kind, subdir, id_key, run_key, forwards_expiry in _LOCAL_RECORD_SPECS:
        if kind not in wanted:
            continue
        kind_dir = coord_dir / subdir
        if not kind_dir.is_dir():
            continue

        for fpath in sorted(kind_dir.glob("*.json")):
            try:
                data = load_json_file(fpath)
                if data is None:
                    continue
                records.append({
                    "kind": kind,
                    "record_id": data.get(id_key, fpath.stem),
                    "run_id": data.get(run_key, ""),
                    "payload": data,
                    "expires_at": data.get("expires_at") if forwards_expiry else None,
                })
            except Exception:  # noqa: BLE001
                # Deliberate best effort: one corrupt file must not abort the
                # push.  Keep the traceback available at DEBUG level.
                logger.debug(
                    "gather_local_records: skipping %s %s",
                    kind,
                    fpath,
                    exc_info=True,
                )

    return records
521
def _write_remote_records(
    root: pathlib.Path,
    records: list[JsonDict],
) -> None:
    """Write pulled remote records to ``.muse/coordination/remote/``.

    Each record is written to ``remote/<kind>/<record_id>.json`` as a single
    line of JSON.  Existing files are overwritten to keep remote state up to
    date.

    Security
    --------
    Every record comes from the MuseHub response and is treated as untrusted
    before any part of it is used in a filesystem path:

    * Entries that are not dicts are skipped.
    * **kind** must be a string in the :data:`_ALL_KINDS` allowlist, so a
      server cannot choose a directory outside ``remote/<known kind>/``.
    * **record_id** must be a string that matches :data:`_SAFE_RECORD_ID_RE`
      in full (``fullmatch``), which rules out separators, ``..`` and a
      trailing newline.  Non-string IDs (for example integers) are rejected
      instead of raising ``TypeError`` inside the regex engine.

    Rejected records are logged at WARNING level and do not stop the
    remaining records from being written.

    Args:
        root: Repository root directory (contains ``.muse/``).
        records: List of record dicts returned by :func:`pull_from_hub`.
    """
    remote_dir = _coordination_dir(root) / _REMOTE_DIR_NAME
    prepared_dirs: set[str] = set()  # kinds whose directory was already created

    for rec in records:
        if not isinstance(rec, dict):
            logger.warning(
                "write_remote_records: rejected non-object record of type %s",
                type(rec).__name__,
            )
            continue

        kind = rec.get("kind", "")
        record_id = rec.get("record_id", "")

        # Allowlist check — reject unknown kinds outright.
        if not isinstance(kind, str) or kind not in _ALL_KINDS:
            logger.warning("write_remote_records: rejected unknown kind %r", kind)
            continue

        # ID safety check — reject non-strings and traversal-capable strings.
        if not isinstance(record_id, str) or not _SAFE_RECORD_ID_RE.fullmatch(record_id):
            logger.warning(
                "write_remote_records: rejected unsafe record_id %r (kind=%r)",
                record_id,
                kind,
            )
            continue

        kind_dir = remote_dir / kind
        if kind not in prepared_dirs:
            kind_dir.mkdir(parents=True, exist_ok=True)
            prepared_dirs.add(kind)

        target = kind_dir / f"{record_id}.json"
        write_text_atomic(target, f"{json.dumps(rec)}\n")
571
572 # ── Command handlers ───────────────────────────────────────────────────────────
573
def run_push(args: argparse.Namespace) -> None:
    """Push local coordination records to MuseHub.

    Gathers the local ``.muse/coordination/`` records for the requested kinds,
    slices them into batches of ``MAX_PUSH_BATCH`` and sends each batch with
    ``push_to_hub``.  A batch that raises ``CoordBusError`` is recorded as a
    failure and the remaining batches still run.

    Agent quickstart
    ----------------
    ::

        muse coord sync push --owner gabriel --slug muse --json
        muse coord sync push --owner gabriel --slug muse --kinds reservation intent --json

    JSON fields
    -----------
    inserted    Number of records the hub accepted as new.
    skipped     Number of records already present on the hub.
    total       Total local records gathered.
    failed      ``true`` if any batch raised an error.
    errors      List of error strings (empty on success).

    Exit codes
    ----------
    0   All batches pushed (or nothing to push).
    1   Bad arguments, auth error, or at least one batch failed.
    2   Not inside a Muse repository.
    """
    json_out: bool = args.json_out
    owner: str = args.owner
    slug: str = args.slug

    # ── Input validation (before any file I/O) ────────────────────────────────
    for flag, value, ceiling in (
        ("--owner", owner, _MAX_OWNER_LEN),
        ("--slug", slug, _MAX_SLUG_LEN),
    ):
        if len(value) > ceiling:
            _err(
                f"{flag} is too long ({len(value)} chars; max {ceiling})",
                json_out,
                "bad_args",
            )
            raise SystemExit(ExitCode.USER_ERROR)

    root = require_repo()
    hub_url, signing = _resolve_hub_and_signing(args, json_out)
    kinds: list[str] = args.kinds

    elapsed = start_timer()
    records = _gather_local_records(root, kinds)
    record_count = len(records)

    def _emit_json(inserted: int, skipped: int, failed: bool, errors: list[str]) -> None:
        # Exactly one JSON document per invocation goes to stdout.
        print(json.dumps(_CoordSyncPushJson(
            **make_envelope(elapsed),
            inserted=inserted,
            skipped=skipped,
            total=record_count,
            failed=failed,
            errors=errors,
        )))

    if record_count == 0:
        if json_out:
            _emit_json(0, 0, False, [])
        else:
            print(" (no local coordination records to push)")
        return

    # ── Push in batches ───────────────────────────────────────────────────────
    inserted_sum = 0
    skipped_sum = 0
    batch_errors: list[str] = []
    any_failed = False

    batch_starts = range(0, record_count, MAX_PUSH_BATCH)
    for batch_no, start in enumerate(batch_starts, start=1):
        chunk = records[start : start + MAX_PUSH_BATCH]
        try:
            reply = push_to_hub(hub_url, owner, slug, chunk, signing)
            inserted_sum += int(reply.get("inserted", 0))
            skipped_sum += int(reply.get("skipped", 0))
        except CoordBusError as exc:
            logger.error("coord sync push: batch %d failed: %s", batch_no, exc)
            if not json_out:
                # Text mode reports the failure right away on stderr.  JSON
                # mode only collects it, because a second JSON object on
                # stdout would break callers that json.loads() the output.
                _err(str(exc), json_out=False, status="hub_error")
            batch_errors.append(str(exc))
            any_failed = True

    if json_out:
        _emit_json(inserted_sum, skipped_sum, any_failed, batch_errors)
    else:
        status_icon = "❌" if any_failed else "✅"
        print(
            f"\n{status_icon} Pushed {record_count} record(s) to "
            f"{sanitize_display(owner)}/{sanitize_display(slug)}"
            f" — inserted: {inserted_sum}, skipped: {skipped_sum}"
            f" ({elapsed():.3f}s)"
        )

    if any_failed:
        raise SystemExit(ExitCode.USER_ERROR)
679
def run_pull(args: argparse.Namespace) -> None:
    """Pull coordination records from MuseHub.

    Validates the arguments, calls ``pull_from_hub`` and hands the returned
    records to :func:`_write_remote_records`, which stores the acceptable ones
    under ``.muse/coordination/remote/<kind>/<id>.json``.  Pulling zero
    records is a success.

    Agent quickstart
    ----------------
    ::

        muse coord sync pull --owner gabriel --slug muse --json
        muse coord sync pull --owner gabriel --slug muse --since-id 42 --json

    JSON fields
    -----------
    count       Number of records returned by this pull.
    cursor      Value to pass as ``--since-id`` on the next pull.  Never lower
                than the ``--since-id`` of this call, so feeding it back
                cannot rewind an incremental pull when nothing new arrived.
    records     Full list of record dicts as returned by the hub.

    Exit codes
    ----------
    0   Pull succeeded (including when 0 records were available).
    1   Bad arguments, auth error, or hub communication error.
    2   Not inside a Muse repository.
    """
    json_out: bool = args.json_out
    owner: str = args.owner
    slug: str = args.slug

    # ── Input validation (before any file I/O) ────────────────────────────────

    if len(owner) > _MAX_OWNER_LEN:
        msg = f"--owner is too long ({len(owner)} chars; max {_MAX_OWNER_LEN})"
        _err(msg, json_out, "bad_args")
        raise SystemExit(ExitCode.USER_ERROR)

    if len(slug) > _MAX_SLUG_LEN:
        msg = f"--slug is too long ({len(slug)} chars; max {_MAX_SLUG_LEN})"
        _err(msg, json_out, "bad_args")
        raise SystemExit(ExitCode.USER_ERROR)

    since_id: int = args.since_id
    if since_id < 0:
        msg = f"--since-id must be >= 0, got {since_id}"
        _err(msg, json_out, "bad_args")
        raise SystemExit(ExitCode.USER_ERROR)

    raw_limit: int = args.limit
    if not (1 <= raw_limit <= _MAX_PULL_LIMIT):
        msg = f"--limit must be 1–{_MAX_PULL_LIMIT}, got {raw_limit}"
        _err(msg, json_out, "bad_args")
        raise SystemExit(ExitCode.USER_ERROR)

    # Already range-checked above; the clamp is kept as a second line of defence.
    limit: int = clamp_int(raw_limit, 1, _MAX_PULL_LIMIT, "limit")
    kinds: list[str] = args.kinds or []

    root = require_repo()
    hub_url, signing = _resolve_hub_and_signing(args, json_out)

    elapsed = start_timer()
    try:
        result = pull_from_hub(hub_url, owner, slug, since_id, kinds, limit, signing)
    except CoordBusError as exc:
        _err(str(exc), json_out, "hub_error")
        raise SystemExit(ExitCode.USER_ERROR)

    # Treat a missing or null ``records`` field as "nothing pulled".
    pulled_records: list[JsonDict] = result.get("records") or []

    # The cursor is fed back as --since-id.  A hub reply without a usable
    # cursor, or with one below the requested start (e.g. 0 when no records
    # were returned), must not send the next incremental pull back to the
    # beginning, so the cursor is never allowed to drop below ``since_id``.
    hub_cursor = result.get("cursor", 0)
    cursor: int = max(hub_cursor, since_id) if isinstance(hub_cursor, int) else since_id

    # Write remote records to disk (path-safe: kind + record id are sanitized inside).
    if pulled_records:
        _write_remote_records(root, pulled_records)

    if json_out:
        print(json.dumps(_CoordSyncPullJson(
            **make_envelope(elapsed),
            count=len(pulled_records),
            cursor=cursor,
            records=pulled_records,
        )))
    else:
        print(
            f"\n✅ Pulled {len(pulled_records)} new record(s) from "
            f"{sanitize_display(owner)}/{sanitize_display(slug)}"
            f" — cursor: {cursor} ({elapsed():.3f}s)"
        )
        if pulled_records:
            print(" Records written to .muse/coordination/remote/")
File History 1 commit