coord_sync.py
python
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
6 days ago
| 1 | """``muse coord sync`` — push/pull local coordination state to/from MuseHub. |
| 2 | |
| 3 | Enables distributed agent swarms to share coordination records (reservations, |
| 4 | intents, releases, heartbeats, dependencies, tasks, claims) across machines |
| 5 | via the MuseHub coordination bus. |
| 6 | |
| 7 | Sub-commands |
| 8 | ------------ |
| 9 | ``muse coord sync push`` |
| 10 | Reads all local coordination records from ``.muse/coordination/`` and |
| 11 | pushes them to MuseHub in batches. Records already on the hub are |
| 12 | silently skipped (idempotent). Requires a valid signing identity. |
| 13 | |
| 14 | ``muse coord sync pull`` |
| 15 | Pulls coordination records from MuseHub into local read-only JSON files |
| 16 | under ``.muse/coordination/remote/``. Local agents can read these to |
| 17 | discover reservations and tasks created by remote agents. Requires auth |
| 18 | for private repos; public repos allow unauthenticated pull. |
| 19 | |
| 20 | Usage:: |
| 21 | |
| 22 | muse coord sync push --hub https://localhost:1337 --owner gabriel --slug myrepo |
| 23 | muse coord sync pull --hub https://localhost:1337 --owner gabriel --slug myrepo |
| 24 | muse coord sync pull --hub https://localhost:1337 --owner gabriel --slug myrepo \\ |
| 25 | --since-id 42 --kinds reservation heartbeat |
| 26 | |
| 27 | Output (push):: |
| 28 | |
| 29 | ✅ Pushed 12 record(s) to gabriel/myrepo — inserted: 10, skipped: 2 |
| 30 | |
| 31 | Output (pull):: |
| 32 | |
| 33 | ✅ Pulled 8 new record(s) from gabriel/myrepo — cursor: 20 |
| 34 | |
| 35 | JSON output (push):: |
| 36 | |
| 37 | { |
| 38 | "schema_version": "...", |
| 39 | "inserted": 10, |
| 40 | "skipped": 2, |
| 41 | "total": 12, |
| 42 | "failed": false, |
| 43 | "duration_ms": 0.123 |
| 44 | } |
| 45 | |
| 46 | JSON output (pull):: |
| 47 | |
| 48 | { |
| 49 | "schema_version": "...", |
| 50 | "count": 8, |
| 51 | "cursor": 20, |
| 52 | "records": [...], |
| 53 | "duration_ms": 0.045 |
| 54 | } |
| 55 | |
| 56 | Flags (common): |
| 57 | |
| 58 | ``--hub URL`` |
| 59 | MuseHub base URL (default: read from ``[hub] url`` in ``config.toml``). |
| 60 | |
| 61 | ``--owner NAME`` |
| 62 | Repository owner username. Maximum 256 characters. |
| 63 | |
| 64 | ``--slug NAME`` |
| 65 | Repository URL slug. Maximum 256 characters. |
| 66 | |
| 67 | ``--json`` |
| 68 | Emit results as JSON. |
| 69 | |
| 70 | Push-only flags: |
| 71 | |
| 72 | ``--kinds KIND [KIND ...]`` |
| 73 | Limit push to specific record kinds (default: all kinds). |
| 74 | |
| 75 | Pull-only flags: |
| 76 | |
| 77 | ``--since-id N`` |
| 78 | Return only records with id > N (default: 0 = all records). Must be >= 0. |
| 79 | |
| 80 | ``--kinds KIND [KIND ...]`` |
| 81 | Filter by record kind. |
| 82 | |
| 83 | ``--limit N`` |
| 84 | Maximum records to pull per call (1–1000, default: 500). |
| 85 | |
| 86 | Exit codes:: |
| 87 | |
| 88 | 0 — success |
| 89 | 1 — bad arguments, auth failure, or hub communication error |
| 90 | """ |
| 91 | |
| 92 | import argparse |
| 93 | import json |
| 94 | import logging |
| 95 | import pathlib |
| 96 | import re |
| 97 | import sys |
| 98 | from datetime import datetime, timezone |
| 99 | from typing import TYPE_CHECKING |
| 100 | |
| 101 | from muse.core.types import load_json_file |
| 102 | from muse.core.paths import coordination_dir as _coordination_dir |
| 103 | from muse.core.coord_bus import ( |
| 104 | CoordBusError, |
| 105 | MAX_PUSH_BATCH, |
| 106 | pull_from_hub, |
| 107 | push_to_hub, |
| 108 | ) |
| 109 | from muse.core.coord_bus import JsonDict |
| 110 | from muse.core.errors import ExitCode |
| 111 | from muse.core.repo import require_repo |
| 112 | from muse.core.io import write_text_atomic |
| 113 | from muse.core.validation import clamp_int, sanitize_display |
| 114 | from muse.core.envelope import EnvelopeJson, make_envelope |
| 115 | from muse.core.timing import start_timer |
| 116 | from typing import TypedDict |
| 117 | |
| 118 | if TYPE_CHECKING: |
| 119 | from muse.core.transport import SigningIdentity |
| 120 | |
logger = logging.getLogger(__name__)

# Coordination record kinds supported by the bus.
_ALL_KINDS = (
    "reservation",
    "intent",
    "release",
    "heartbeat",
    "dependency",
    "task",
    "claim",
)

# Directory under .muse/coordination/ where pulled remote records are stored.
_REMOTE_DIR_NAME = "remote"

# ── Input constraints ─────────────────────────────────────────────────────────

#: Maximum length of --owner and --slug to prevent oversized HTTP requests.
_MAX_OWNER_LEN: int = 256
_MAX_SLUG_LEN: int = 256

#: Maximum records to pull per call.
_MAX_PULL_LIMIT: int = 1000

#: Safe record ID pattern — alphanumeric, hyphens, underscores, colons.
#: Accepts sha256 genesis IDs (sha256:<64-hex>) and opaque run_id strings.
#: Prevents path traversal when server-supplied record_id is used to
#: construct filesystem paths under .muse/coordination/remote/<kind>/<id>.json.
#:
#: Anchored with ``\A`` / ``\Z`` instead of ``^`` / ``$``: ``$`` also matches
#: just before a trailing newline, so ``"abc\n"`` would otherwise be accepted
#: and end up embedded in a file name.
_SAFE_RECORD_ID_RE: re.Pattern[str] = re.compile(r"\A[a-zA-Z0-9_\-:]{1,128}\Z")
| 151 | |
| 152 | # ── Error helper ────────────────────────────────────────────────────────────── |
| 153 | |
def _err(msg: str, json_out: bool = False, status: str = "error") -> None:
    """Report *msg* to the user without terminating the process.

    When *json_out* is true a ``{"error": ..., "status": ...}`` object is
    printed to stdout; otherwise a ``❌``-prefixed line is written to stderr
    and *status* is not used. The caller decides whether to raise
    ``SystemExit`` afterwards.
    """
    if not json_out:
        print(f"❌ {msg}", file=sys.stderr)
        return

    payload = {"error": msg, "status": status}
    print(json.dumps(payload))
| 160 | |
class _CoordSyncPushJson(EnvelopeJson):
    """Shape of the object printed by ``muse coord sync push --json``.

    Extends the shared :class:`EnvelopeJson` fields with the push totals.
    """

    # Sum of the ``inserted`` counts reported by the hub across all batches.
    inserted: int
    # Sum of the ``skipped`` counts reported by the hub across all batches.
    skipped: int
    # Number of local records gathered for the push.
    total: int
    # True when at least one batch raised ``CoordBusError``.
    failed: bool
    # One message per failed batch; empty when every batch succeeded.
    errors: list[str]
| 169 | |
class _CoordSyncPullJson(EnvelopeJson):
    """JSON output for ``muse coord sync pull --json``.

    Extends the shared :class:`EnvelopeJson` fields with the pull result.
    """

    # Number of records returned by this pull (``len(records)``).
    count: int
    # Cursor taken from the hub response; callers pass it back as ``--since-id``.
    cursor: int
    # Record dicts exactly as returned by the hub. Typed with the module's
    # ``JsonDict`` alias (instead of a bare ``dict``) so the field agrees with
    # the ``list[JsonDict]`` value that ``run_pull`` stores here.
    records: list[JsonDict]
| 176 | |
def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
    """Attach ``sync`` and its ``push`` / ``pull`` children to *subparsers*.

    ``sync`` itself only dispatches: one of ``pull`` or ``push`` is required.
    Both children accept ``--hub``, ``--owner``, ``--slug`` and ``--json`` /
    ``-j``. ``pull`` additionally takes ``--since-id``, ``--kinds`` and
    ``--limit``; ``push`` additionally takes ``--kinds``. The parsers' ``func``
    default is set to :func:`run_pull` and :func:`run_push` respectively.
    """
    kind_choices = ", ".join(_ALL_KINDS)
    raw_help = argparse.RawDescriptionHelpFormatter

    pull_description = (
        "Fetch coordination records from MuseHub and write them to\n"
        ".muse/coordination/remote/<kind>/<id>.json.\n"
        "Use --since-id <cursor> for incremental pulls — pass the cursor\n"
        "from the previous JSON output back as --since-id on the next call.\n\n"
        "Agent quickstart\n"
        "----------------\n"
        " muse coord sync pull --owner gabriel --slug myrepo --json\n"
        " muse coord sync pull --owner gabriel --slug myrepo -j\n"
        " muse coord sync pull --owner gabriel --slug myrepo -j | jq .cursor\n"
        " muse coord sync pull --owner gabriel --slug myrepo \\\n"
        " --since-id 42 --kinds reservation intent -j\n\n"
        "JSON output schema\n"
        "------------------\n"
        ' {"schema_version": "<str>", "count": <int>, "cursor": <int>,\n'
        ' "records": [...], "duration_ms": <float>}\n\n'
        "Exit codes\n"
        "----------\n"
        " 0 — pull succeeded (0 records is a valid success)\n"
        " 1 — bad arguments, auth error, or hub communication error\n"
        " 2 — not inside a Muse repository\n"
    )
    push_description = (
        "Read all local coordination files from .muse/coordination/ and\n"
        "POST them to MuseHub in batches. Records already on the hub are\n"
        "silently skipped (idempotent). Fails with exit 1 if any batch\n"
        "errors; other batches still complete.\n\n"
        "Agent quickstart\n"
        "----------------\n"
        " muse coord sync push --owner gabriel --slug myrepo --json\n"
        " muse coord sync push --owner gabriel --slug myrepo -j\n"
        " muse coord sync push --owner gabriel --slug myrepo -j | jq .inserted\n"
        " muse coord sync push --owner gabriel --slug myrepo \\\n"
        " --kinds reservation intent -j\n\n"
        "JSON output schema\n"
        "------------------\n"
        ' {"schema_version": "<str>", "inserted": <int>, "skipped": <int>,\n'
        ' "total": <int>, "failed": <bool>, "duration_ms": <float>}\n\n'
        "Exit codes\n"
        "----------\n"
        " 0 — all batches pushed successfully (or nothing to push)\n"
        " 1 — bad arguments, auth error, or at least one batch failed\n"
        " 2 — not inside a Muse repository\n"
    )

    def attach_shared_flags(parser: argparse.ArgumentParser) -> None:
        # Flags that push and pull have in common, registered in help order.
        parser.add_argument(
            "--hub",
            metavar="URL",
            default=None,
            help="MuseHub base URL (default: from config.toml [hub] url).",
        )
        parser.add_argument(
            "--owner",
            metavar="NAME",
            required=True,
            help=f"Repository owner username (max {_MAX_OWNER_LEN} chars).",
        )
        parser.add_argument(
            "--slug",
            metavar="NAME",
            required=True,
            help=f"Repository URL slug (max {_MAX_SLUG_LEN} chars).",
        )
        parser.add_argument(
            "--json", "-j",
            dest="json_out",
            action="store_true",
            help="Emit machine-readable JSON.",
        )

    sync_parser = subparsers.add_parser(
        "sync",
        help="Push/pull coordination records to/from MuseHub.",
        description=__doc__,
        formatter_class=raw_help,
    )
    sync_subs = sync_parser.add_subparsers(
        dest="sync_command",
        metavar="SYNC_COMMAND",
        required=True,
    )

    # ── Pull ──────────────────────────────────────────────────────────────────
    pull_parser = sync_subs.add_parser(
        "pull",
        help="Pull coordination records from MuseHub.",
        description=pull_description,
        formatter_class=raw_help,
    )
    attach_shared_flags(pull_parser)
    pull_parser.add_argument(
        "--since-id",
        dest="since_id",
        metavar="N",
        type=int,
        default=0,
        help="Only pull records with id > N (default: 0 = all). Must be >= 0.",
    )
    pull_parser.add_argument(
        "--kinds",
        metavar="KIND",
        nargs="+",
        choices=_ALL_KINDS,
        default=[],
        help=f"Filter by kind. Choices: {kind_choices}.",
    )
    pull_parser.add_argument(
        "--limit",
        metavar="N",
        type=int,
        default=500,
        help=f"Maximum records to pull per call (1–{_MAX_PULL_LIMIT}, default: 500).",
    )
    pull_parser.set_defaults(func=run_pull)

    # ── Push ──────────────────────────────────────────────────────────────────
    push_parser = sync_subs.add_parser(
        "push",
        help="Push local coordination records to MuseHub.",
        description=push_description,
        formatter_class=raw_help,
    )
    attach_shared_flags(push_parser)
    push_parser.add_argument(
        "--kinds",
        metavar="KIND",
        nargs="+",
        choices=_ALL_KINDS,
        default=list(_ALL_KINDS),
        help=f"Limit push to these kinds (default: all). Choices: {kind_choices}.",
    )
    push_parser.set_defaults(func=run_push)
| 313 | |
| 314 | # ── Helpers ──────────────────────────────────────────────────────────────────── |
| 315 | |
def _resolve_hub_and_signing(
    args: argparse.Namespace,
    json_out: bool = False,
) -> "tuple[str, SigningIdentity | None]":
    """Return ``(hub_url, signing)`` from args or config.

    The hub URL is taken from ``args.hub`` when given, otherwise from
    ``muse.cli.config.get_hub_url()``. The signing identity is always looked
    up with ``muse.cli.config.get_signing_identity(hub_url)``; there is no CLI
    flag for it.

    A failure while resolving the signing identity is not fatal as long as a
    hub URL is known: it is logged at DEBUG level and ``None`` is returned for
    *signing*, leaving the hub to decide whether unauthenticated access is
    acceptable.

    Args:
        args: Parsed ``argparse.Namespace``; only the ``hub`` attribute is read.
        json_out: Route error messages to stdout as JSON when ``True``,
            otherwise to stderr.

    Returns:
        ``(hub_url, signing)`` where *signing* may be ``None`` for
        unauthenticated access to public repos.

    Raises:
        SystemExit: With :attr:`~muse.core.errors.ExitCode.USER_ERROR` when no
            hub URL can be determined.
    """
    hub_url: str | None = args.hub
    signing: SigningIdentity | None = None

    try:
        # Imported lazily, inside the try, so an unavailable/broken config
        # module is reported through the same error path as a missing URL.
        from muse.cli.config import get_hub_url, get_signing_identity

        if hub_url is None:
            hub_url = get_hub_url()
        # Only look up an identity once there is a hub to look it up for;
        # a missing URL is reported below with its own message.
        if hub_url is not None:
            signing = get_signing_identity(hub_url)
    except Exception as exc:  # noqa: BLE001
        if hub_url is None:
            msg = (
                "no --hub URL supplied and could not read hub URL from config: "
                f"{exc}"
            )
            _err(msg, json_out, "no_hub_url")
            raise SystemExit(ExitCode.USER_ERROR)
        # Best effort: carry on unauthenticated, but leave a trace of why.
        logger.debug(
            "resolve_hub_and_signing: no signing identity for %s: %s",
            hub_url,
            exc,
        )

    if hub_url is None:
        _err("--hub URL is required", json_out, "no_hub_url")
        raise SystemExit(ExitCode.USER_ERROR)

    return hub_url, signing
| 362 | |
#: Per-kind layout of ``.muse/coordination/``, listed in the order records are
#: gathered. Each entry is
#: ``(kind, subdirectory, record-id key, run-id key, has expires_at)``.
_RECORD_SOURCES: tuple[tuple[str, str, str, str, bool], ...] = (
    ("reservation", "reservations", "reservation_id", "run_id", True),
    ("heartbeat", "heartbeats", "run_id", "run_id", True),
    ("intent", "intents", "intent_id", "run_id", True),
    ("release", "releases", "release_id", "run_id", False),
    ("dependency", "dependencies", "reservation_id", "run_id", False),
    ("task", "tasks", "task_id", "run_id", False),
    ("claim", "claims", "task_id", "claimer_run_id", True),
)


def _gather_local_records(
    root: pathlib.Path,
    kinds: list[str],
) -> list[JsonDict]:
    """Load local coordination records from ``.muse/coordination/``.

    Returns a flat list of record dicts with the keys ``kind``, ``record_id``,
    ``run_id``, ``payload`` and ``expires_at``. ``record_id`` falls back to the
    file stem and ``run_id`` to ``""`` when the file lacks the corresponding
    key; ``expires_at`` is ``None`` for kinds that do not carry an expiry
    (release, dependency, task).

    Only kinds listed in *kinds* are included, always in the fixed order of
    :data:`_RECORD_SOURCES` regardless of the order of *kinds*. Within a kind,
    files are visited in sorted path order so repeated pushes build the same
    batches. Files that fail to load, or whose content cannot be read as a
    mapping, are skipped and logged at DEBUG level so a single bad file never
    blocks a push.

    Args:
        root: Repository root directory (contains ``.muse/``).
        kinds: Subset of :data:`_ALL_KINDS` to include.

    Returns:
        List of record dicts ready for ``push_to_hub``.
    """
    coord_dir = _coordination_dir(root)
    records: list[JsonDict] = []

    for kind, subdir, id_key, run_key, has_expiry in _RECORD_SOURCES:
        if kind not in kinds:
            continue
        kind_dir = coord_dir / subdir
        if not kind_dir.is_dir():
            continue

        for fpath in sorted(kind_dir.glob("*.json")):
            try:
                data = load_json_file(fpath)
                if data is None:
                    continue
                records.append({
                    "kind": kind,
                    "record_id": data.get(id_key, fpath.stem),
                    "run_id": data.get(run_key, ""),
                    "payload": data,
                    "expires_at": data.get("expires_at") if has_expiry else None,
                })
            except Exception as exc:  # noqa: BLE001
                # Deliberately broad: any unreadable or malformed file is
                # skipped rather than aborting the whole push.
                logger.debug(
                    "gather_local_records: skipping %s %s: %s", kind, fpath, exc
                )

    return records
| 521 | |
def _write_remote_records(
    root: pathlib.Path,
    records: list[JsonDict],
) -> None:
    """Write pulled remote records to ``.muse/coordination/remote/``.

    Each record is written to ``remote/<kind>/<record_id>.json``.
    Existing files are overwritten to keep remote state up to date.

    Security
    --------
    Both *kind* and *record_id* come from the MuseHub response and must be
    sanitized before use in filesystem paths:

    * **kind** — validated against the :data:`_ALL_KINDS` allowlist. Records
      with an unrecognised kind are skipped; a server cannot write outside the
      ``remote/`` subtree by supplying ``"../traversal"`` as the kind.
    * **record_id** — must be a ``str`` that matches
      :data:`_SAFE_RECORD_ID_RE` in full (alphanumeric, hyphens, underscores,
      colons, 1–128 chars). ``fullmatch`` is used so that an ID with a
      trailing newline is rejected even though ``$`` would tolerate it.
      Records with an unsafe ID are skipped; a server cannot escape the kind
      directory via ``"../../../etc/passwd"`` or similar.

    Entries that are not dicts are skipped as well, so one malformed element
    in the response does not abort the remaining writes.

    Args:
        root: Repository root directory (contains ``.muse/``).
        records: List of record dicts returned by :func:`pull_from_hub`.
    """
    remote_dir = _coordination_dir(root) / _REMOTE_DIR_NAME
    for rec in records:
        if not isinstance(rec, dict):
            logger.warning(
                "write_remote_records: rejected non-object record of type %s",
                type(rec).__name__,
            )
            continue

        kind = rec.get("kind", "")
        record_id = rec.get("record_id", "")

        # Allowlist check — reject unknown kinds outright.
        if kind not in _ALL_KINDS:
            logger.warning("write_remote_records: rejected unknown kind %r", kind)
            continue

        # ID safety check — reject non-strings (which would make the regex
        # raise TypeError) and traversal-capable strings.
        if (
            not isinstance(record_id, str)
            or _SAFE_RECORD_ID_RE.fullmatch(record_id) is None
        ):
            logger.warning(
                "write_remote_records: rejected unsafe record_id %r (kind=%r)",
                record_id,
                kind,
            )
            continue

        kind_dir = remote_dir / kind
        kind_dir.mkdir(parents=True, exist_ok=True)
        target = kind_dir / f"{record_id}.json"
        write_text_atomic(target, f"{json.dumps(rec)}\n")
| 571 | |
| 572 | # ── Command handlers ─────────────────────────────────────────────────────────── |
| 573 | |
def run_push(args: argparse.Namespace) -> None:
    """Handle ``muse coord sync push``: send local records to MuseHub.

    Gathers the local ``.muse/coordination/`` records for the requested kinds
    and posts them to the hub in slices of ``MAX_PUSH_BATCH``. A batch that
    raises ``CoordBusError`` is recorded and the remaining batches still run;
    the command then exits non-zero.

    Agent quickstart
    ----------------
    ::

        muse coord sync push --owner gabriel --slug muse --json
        muse coord sync push --owner gabriel --slug muse --kinds reservation intent --json

    JSON fields
    -----------
    inserted    Sum of the ``inserted`` counts returned by the hub.
    skipped     Sum of the ``skipped`` counts returned by the hub.
    total       Total local records gathered.
    failed      ``true`` if any batch raised an error.
    errors      List of error strings (empty on success).

    Exit codes
    ----------
    0   All batches pushed (or nothing to push).
    1   Bad arguments, auth error, or at least one batch failed.
    2   Not inside a Muse repository.
    """
    json_out: bool = args.json_out
    owner: str = args.owner
    slug: str = args.slug

    # ── Input validation (before any file I/O) ────────────────────────────────
    length_limits = (
        ("--owner", owner, _MAX_OWNER_LEN),
        ("--slug", slug, _MAX_SLUG_LEN),
    )
    for flag, value, cap in length_limits:
        if len(value) > cap:
            _err(f"{flag} is too long ({len(value)} chars; max {cap})", json_out, "bad_args")
            raise SystemExit(ExitCode.USER_ERROR)

    root = require_repo()
    hub_url, signing = _resolve_hub_and_signing(args, json_out)
    kinds: list[str] = args.kinds

    elapsed = start_timer()
    records = _gather_local_records(root, kinds)

    def emit_json(inserted: int, skipped: int, failed: bool, errors: list[str]) -> None:
        # Single place that prints the one-and-only JSON object for this run.
        print(json.dumps(_CoordSyncPushJson(
            **make_envelope(elapsed),
            inserted=inserted,
            skipped=skipped,
            total=len(records),
            failed=failed,
            errors=errors,
        )))

    if not records:
        if json_out:
            emit_json(0, 0, False, [])
        else:
            print(" (no local coordination records to push)")
        return

    # Push in batches, tallying what the hub reports for each one.
    inserted_total = 0
    skipped_total = 0
    batch_errors: list[str] = []

    batch_starts = range(0, len(records), MAX_PUSH_BATCH)
    for batch_no, start in enumerate(batch_starts, start=1):
        chunk = records[start : start + MAX_PUSH_BATCH]
        try:
            reply = push_to_hub(hub_url, owner, slug, chunk, signing)
            inserted_total += int(reply.get("inserted", 0))
            skipped_total += int(reply.get("skipped", 0))
        except CoordBusError as exc:
            logger.error("coord sync push: batch %d failed: %s", batch_no, exc)
            reason = str(exc)
            if not json_out:
                # Text mode reports each failure on stderr right away. JSON
                # mode stays silent here: errors are collected and emitted in
                # the single final object so stdout remains one JSON document.
                _err(reason, json_out=False, status="hub_error")
            batch_errors.append(reason)

    failed = bool(batch_errors)

    if json_out:
        emit_json(inserted_total, skipped_total, failed, batch_errors)
    else:
        status_icon = "❌" if failed else "✅"
        print(
            f"\n{status_icon} Pushed {len(records)} record(s) to "
            f"{sanitize_display(owner)}/{sanitize_display(slug)}"
            f" — inserted: {inserted_total}, skipped: {skipped_total}"
            f" ({elapsed():.3f}s)"
        )

    if failed:
        raise SystemExit(ExitCode.USER_ERROR)
| 679 | |
def run_pull(args: argparse.Namespace) -> None:
    """Handle ``muse coord sync pull``: fetch records from MuseHub.

    Pulled records are handed to :func:`_write_remote_records`, which stores
    them under ``.muse/coordination/remote/<kind>/<id>.json``. The cursor from
    the hub response is reported so it can be passed back as ``--since-id`` on
    the next call. A pull that returns no records is still a success.

    Agent quickstart
    ----------------
    ::

        muse coord sync pull --owner gabriel --slug muse --json
        muse coord sync pull --owner gabriel --slug muse --since-id 42 --json

    JSON fields
    -----------
    count    Number of records returned by this pull.
    cursor   Cursor from the hub response; pass as ``--since-id`` for the next
             pull. ``0`` when the response carries no cursor.
    records  Full list of record dicts as returned by the hub.

    Exit codes
    ----------
    0   Pull succeeded (an empty result also exits 0).
    1   Bad arguments, auth error, or hub communication error.
    2   Not inside a Muse repository.
    """
    json_out: bool = args.json_out
    owner: str = args.owner
    slug: str = args.slug

    def reject(message: str) -> None:
        # Report a bad-argument error and stop the command.
        _err(message, json_out, "bad_args")
        raise SystemExit(ExitCode.USER_ERROR)

    # ── Input validation (before any file I/O) ────────────────────────────────
    if len(owner) > _MAX_OWNER_LEN:
        reject(f"--owner is too long ({len(owner)} chars; max {_MAX_OWNER_LEN})")

    if len(slug) > _MAX_SLUG_LEN:
        reject(f"--slug is too long ({len(slug)} chars; max {_MAX_SLUG_LEN})")

    since_id: int = args.since_id
    if since_id < 0:
        reject(f"--since-id must be >= 0, got {since_id}")

    raw_limit: int = args.limit
    if raw_limit < 1 or raw_limit > _MAX_PULL_LIMIT:
        reject(f"--limit must be 1–{_MAX_PULL_LIMIT}, got {raw_limit}")

    limit: int = clamp_int(raw_limit, 1, _MAX_PULL_LIMIT, "limit")
    kinds: list[str] = args.kinds or []

    root = require_repo()
    hub_url, signing = _resolve_hub_and_signing(args, json_out)

    elapsed = start_timer()
    try:
        result = pull_from_hub(hub_url, owner, slug, since_id, kinds, limit, signing)
    except CoordBusError as exc:
        _err(str(exc), json_out, "hub_error")
        raise SystemExit(ExitCode.USER_ERROR)

    pulled_records: list[JsonDict] = result.get("records", [])
    cursor: int = result.get("cursor", 0)
    pulled_count = len(pulled_records)

    # Persist what came back; kind and record id are sanitized by the writer.
    if pulled_records:
        _write_remote_records(root, pulled_records)

    if json_out:
        payload = _CoordSyncPullJson(
            **make_envelope(elapsed),
            count=pulled_count,
            cursor=cursor,
            records=pulled_records,
        )
        print(json.dumps(payload))
        return

    print(
        f"\n✅ Pulled {pulled_count} new record(s) from "
        f"{sanitize_display(owner)}/{sanitize_display(slug)}"
        f" — cursor: {cursor} ({elapsed():.3f}s)"
    )
    if pulled_records:
        print(" Records written to .muse/coordination/remote/")
File History
1 commit
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
6 days ago