gabriel / muse public

test_cmd_coord_sync.py file-level

at sha256:5 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:4 Merge branch 'dev' into main · gabriel · Jun 17, 2026
1 """Comprehensive tests for ``muse coord sync push`` and ``muse coord sync pull``.
2
3 Coverage matrix
4 ---------------
5 Unit
6 ~~~~
7 * _gather_local_records — reads reservations from disk
8 * _gather_local_records — reads heartbeats from disk
9 * _gather_local_records — kinds filter respected
10 * _gather_local_records — corrupt file skipped gracefully
11 * _gather_local_records — claims use claimer_run_id field
12 * _gather_local_records — all 7 kinds gathered
13 * _write_remote_records — writes correct paths under remote/
14 * _write_remote_records — overwrites existing files
15 * _write_remote_records — record with no record_id skipped
16 * _write_remote_records — unknown kind rejected (path traversal prevention)
17 * _write_remote_records — unsafe record_id rejected (path traversal prevention)
18 * _write_remote_records — compact JSON written (no indent)
19
20 Integration (all network calls mocked)
21 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
22 * push with no local records — text says "(no local coordination records to push)"
23 * push with reservations — calls push_to_hub with correct args
24 * push CoordBusError — exits 1 with error message
25 * pull empty result — prints "Pulled 0 new record(s)"
26 * pull with records — writes files to remote dir
27 * pull CoordBusError — exits 1 with error message
28 * --format json push — valid JSON with inserted/skipped/total/failed/elapsed/schema_version
29 * --format json pull — valid JSON with count/cursor/records/elapsed/schema_version
30 * --json shorthand push — same as --format json
31 * --json shorthand pull — same as --format json
32 * --since-id N — passed through to pull_from_hub
33 * --kinds filter push — restricts gathered kinds
34 * --limit N — passed through to pull_from_hub
35 * duration_ms present in both push and pull JSON output
36
37 Input validation
38 ~~~~~~~~~~~~~~~~
39 * --owner too long → exit 1 before any I/O
40 * --slug too long → exit 1 before any I/O
41 * --since-id negative → exit 1 before any I/O
42 * --limit = 0 → exit 1 before any I/O
43 * --limit > 1000 → exit 1 before any I/O
44 * --limit at boundary 1 → accepted
45 * --limit at boundary 1000 → accepted
46 * push owner/slug validation fires before require_repo
47 * pull since-id/limit validation fires before require_repo
48
49 Security
50 ~~~~~~~~
51 * owner/slug with path traversal chars are passed as strings to push_to_hub
52 * token not echoed in output
53 * _write_remote_records rejects unknown kind (prevents escaping remote/)
54 * _write_remote_records rejects traversal in record_id
55
56 Stress
57 ~~~~~~
58 * push 600 records splits into multiple batches
59 * pull returns 1000 records, all written to disk
60 """
61
62 from __future__ import annotations
63
64 import itertools
65 import json
66 import pathlib
67 from typing import TYPE_CHECKING
68
69 import pytest
70 from unittest.mock import patch, MagicMock, call
71
72 from tests.cli_test_helper import CliRunner
73 from muse.core.types import MsgpackDict, content_hash
74 from muse.core.coord_bus import CoordBusError, JsonDict
75
# Ever-increasing counter that seeds _new_id(); each call draws the next integer.
_id_seq = itertools.count()


def _new_id() -> str:
    """Return an identifier derived from the next value of the module counter.

    The identifier is whatever ``content_hash`` produces for a one-key
    mapping holding the freshly drawn sequence number.
    """
    seed = {"seq": next(_id_seq)}
    return content_hash(seed)
81
82 if TYPE_CHECKING:
83 from muse.core.transport import SigningIdentity
84 from muse.core.paths import coordination_dir, muse_dir
85 from muse.cli.commands.coord_sync import (
86 _MAX_OWNER_LEN,
87 _MAX_SLUG_LEN,
88 _MAX_PULL_LIMIT,
89 _ALL_KINDS,
90 )
91
# One CLI harness shared by every integration test in this module.
runner = CliRunner()
# Passed as the first argument of ``runner.invoke``; this module leaves it as None.
cli = None

# Dotted paths given to ``unittest.mock.patch`` so no test reaches the hub.
_SYNC_MODULE = "muse.cli.commands.coord_sync"
_PUSH_TARGET = f"{_SYNC_MODULE}.push_to_hub"
_PULL_TARGET = f"{_SYNC_MODULE}.pull_from_hub"
97
98
99 # ── Fixtures ──────────────────────────────────────────────────────────────────
100
101
@pytest.fixture()
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Provide a minimal repository rooted at *tmp_path*.

    Creates the directory returned by ``muse_dir(tmp_path)``, writes a
    ``HEAD`` file pointing at ``refs/heads/main`` into it, and exports
    ``MUSE_REPO_ROOT`` so the code under test resolves to this directory.

    Returns:
        The repository root (``tmp_path`` itself).
    """
    metadata_dir = muse_dir(tmp_path)
    metadata_dir.mkdir()
    head_file = metadata_dir / "HEAD"
    head_file.write_text("ref: refs/heads/main\n")
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
    return tmp_path
109
110
111 # ── Helpers ───────────────────────────────────────────────────────────────────
112
113
def _write_local_claim(repo: pathlib.Path, task_id: str | None = None, run_id: str = "worker-1") -> str:
    """Write a claim JSON file under ``claims/`` and return its task id.

    Args:
        repo: Repository root whose coordination directory receives the file.
        task_id: Identifier to use; a falsy value is replaced by ``_new_id()``.
        run_id: Stored under the ``claimer_run_id`` key.

    Returns:
        The task id that names the written ``<task_id>.json`` file.
    """
    claims_root = coordination_dir(repo) / "claims"
    claims_root.mkdir(parents=True, exist_ok=True)
    tid = task_id if task_id else _new_id()
    claim = {
        "task_id": tid,
        "claimer_run_id": run_id,
        "claimed_at": "2026-01-01T00:00:00+00:00",
        "expires_at": "2026-12-31T00:00:00+00:00",
    }
    destination = claims_root / f"{tid}.json"
    destination.write_text(json.dumps(claim))
    return tid
126
127
def _write_local_reservation(repo: pathlib.Path, run_id: str = "agent-1") -> str:
    """Write a reservation JSON file under ``reservations/`` and return its id.

    Args:
        repo: Repository root whose coordination directory receives the file.
        run_id: Stored under the ``run_id`` key of the reservation.

    Returns:
        The generated reservation id, which also names the file.
    """
    reservations_root = coordination_dir(repo) / "reservations"
    reservations_root.mkdir(parents=True, exist_ok=True)
    reservation_id = _new_id()
    reservation = {
        "reservation_id": reservation_id,
        "run_id": run_id,
        "branch": "main",
        "addresses": ["src/x.py::foo"],
        "operation": None,
        "created_at": "2026-01-01T00:00:00+00:00",
        "expires_at": "2026-12-31T00:00:00+00:00",
    }
    destination = reservations_root / f"{reservation_id}.json"
    destination.write_text(json.dumps(reservation))
    return reservation_id
143
144
def _write_local_heartbeat(repo: pathlib.Path, run_id: str = "agent-1") -> str:
    """Write a heartbeat JSON file under ``heartbeats/`` and return *run_id*.

    The file is named ``<run_id>.json``, so writing twice for the same run
    replaces the earlier heartbeat.
    """
    heartbeats_root = coordination_dir(repo) / "heartbeats"
    heartbeats_root.mkdir(parents=True, exist_ok=True)
    heartbeat = {
        "run_id": run_id,
        "last_seen": "2026-01-01T00:01:00+00:00",
        "expires_at": "2026-12-31T00:00:00+00:00",
    }
    destination = heartbeats_root / f"{run_id}.json"
    destination.write_text(json.dumps(heartbeat))
    return run_id
155
156
157 _PUSH_ARGS = [
158 "coord", "sync", "push",
159 "--hub", "https://localhost:1337",
160 "--owner", "gabriel",
161 "--slug", "myrepo",
162
163 ]
164
165 _PULL_ARGS = [
166 "coord", "sync", "pull",
167 "--hub", "https://localhost:1337",
168 "--owner", "gabriel",
169 "--slug", "myrepo",
170
171 "--since-id", "0",
172 ]
173
174
175 def _push_ok(inserted: int = 1, skipped: int = 0) -> MsgpackDict:
176 return {"inserted": inserted, "skipped": skipped}
177
178
179 def _pull_ok(records: list[MsgpackDict] | None = None, cursor: int = 0) -> MsgpackDict:
180 return {"records": records or [], "cursor": cursor}
181
182
183 # ── Unit: _gather_local_records ───────────────────────────────────────────────
184
185
class TestGatherLocalRecords:
    """Unit tests for ``_gather_local_records`` over reservations and heartbeats."""

    @staticmethod
    def _gather(repo: pathlib.Path, kind: str):
        """Import the function under test lazily and gather records of one *kind*."""
        from muse.cli.commands.coord_sync import _gather_local_records

        return _gather_local_records(repo, kinds=[kind])

    def test_empty_coordination_dir_returns_empty(self, repo: pathlib.Path) -> None:
        """Nothing written locally means nothing gathered."""
        assert self._gather(repo, "reservation") == []

    def test_reads_reservation_from_disk(self, repo: pathlib.Path) -> None:
        """A single reservation file is surfaced with its kind and record id."""
        expected_id = _write_local_reservation(repo)
        gathered = self._gather(repo, "reservation")
        assert len(gathered) == 1
        only = gathered[0]
        assert only["kind"] == "reservation"
        assert only["record_id"] == expected_id

    def test_reads_heartbeat_from_disk(self, repo: pathlib.Path) -> None:
        """A single heartbeat file is surfaced with its kind and run id."""
        expected_run = _write_local_heartbeat(repo, "hb-agent")
        gathered = self._gather(repo, "heartbeat")
        assert len(gathered) == 1
        only = gathered[0]
        assert only["kind"] == "heartbeat"
        assert only["run_id"] == expected_run

    def test_kinds_filter_excludes_heartbeats(self, repo: pathlib.Path) -> None:
        """Asking for reservations never returns another kind."""
        _write_local_reservation(repo)
        _write_local_heartbeat(repo)
        gathered = self._gather(repo, "reservation")
        assert all(item["kind"] == "reservation" for item in gathered)

    def test_kinds_filter_excludes_reservations(self, repo: pathlib.Path) -> None:
        """Asking for heartbeats never returns another kind."""
        _write_local_reservation(repo)
        _write_local_heartbeat(repo)
        gathered = self._gather(repo, "heartbeat")
        assert all(item["kind"] == "heartbeat" for item in gathered)

    def test_corrupt_file_skipped_gracefully(self, repo: pathlib.Path) -> None:
        """An unparseable JSON file is ignored and does not raise."""
        reservations_root = coordination_dir(repo) / "reservations"
        reservations_root.mkdir(parents=True, exist_ok=True)
        corrupt = reservations_root / "bad.json"
        corrupt.write_text("not-valid-json{{{{")
        assert self._gather(repo, "reservation") == []

    def test_multiple_records_all_returned(self, repo: pathlib.Path) -> None:
        """Every one of five reservation files is gathered."""
        for agent in (f"agent-{n}" for n in range(5)):
            _write_local_reservation(repo, run_id=agent)
        assert len(self._gather(repo, "reservation")) == 5

    def test_payload_field_contains_original_data(self, repo: pathlib.Path) -> None:
        """The ``payload`` entry carries the fields that were written to disk."""
        expected_id = _write_local_reservation(repo, run_id="my-agent")
        payload = self._gather(repo, "reservation")[0]["payload"]
        assert payload["reservation_id"] == expected_id
        assert payload["run_id"] == "my-agent"
243
244
245 # ── Unit: _write_remote_records ───────────────────────────────────────────────
246
247
class TestWriteRemoteRecords:
    """Unit tests for where and what ``_write_remote_records`` writes."""

    @staticmethod
    def _write(repo: pathlib.Path, records: list) -> None:
        """Import the function under test lazily and write *records*."""
        from muse.cli.commands.coord_sync import _write_remote_records

        _write_remote_records(repo, records)

    @staticmethod
    def _remote_file(repo: pathlib.Path, kind: str, record_id: str) -> pathlib.Path:
        """Path the tests expect for *record_id* of *kind* under ``remote/``."""
        return coordination_dir(repo) / "remote" / kind / f"{record_id}.json"

    def test_writes_file_to_correct_path(self, repo: pathlib.Path) -> None:
        """A record lands at ``remote/<kind>/<record_id>.json``."""
        self._write(repo, [{"kind": "reservation", "record_id": "abc-123", "payload": {"x": 1}}])
        assert self._remote_file(repo, "reservation", "abc-123").exists()

    def test_written_file_contains_correct_data(self, repo: pathlib.Path) -> None:
        """The written JSON round-trips the record's payload."""
        self._write(repo, [{"kind": "reservation", "record_id": "def-456", "payload": {"y": 2}}])
        written = self._remote_file(repo, "reservation", "def-456")
        stored = json.loads(written.read_text())
        assert stored["payload"]["y"] == 2

    def test_overwrites_existing_file(self, repo: pathlib.Path) -> None:
        """A pre-existing file for the same record id is replaced."""
        stale = self._remote_file(repo, "reservation", "ghi-789")
        stale.parent.mkdir(parents=True, exist_ok=True)
        stale.write_text('{"old": true}')
        self._write(repo, [{"kind": "reservation", "record_id": "ghi-789", "payload": {"new": True}}])
        stored = json.loads(stale.read_text())
        assert stored["payload"]["new"] is True

    def test_record_without_record_id_skipped(self, repo: pathlib.Path) -> None:
        """An empty ``record_id`` produces no JSON file under ``remote/``."""
        self._write(repo, [{"kind": "reservation", "record_id": "", "payload": {}}])
        remote_root = coordination_dir(repo) / "remote"
        assert not (remote_root.exists() and any(remote_root.rglob("*.json")))

    def test_multiple_kinds_written_to_separate_dirs(self, repo: pathlib.Path) -> None:
        """Each kind gets its own sub-directory."""
        batch = [
            {"kind": "reservation", "record_id": "r1", "payload": {}},
            {"kind": "heartbeat", "record_id": "h1", "payload": {}},
        ]
        self._write(repo, batch)
        assert self._remote_file(repo, "reservation", "r1").exists()
        assert self._remote_file(repo, "heartbeat", "h1").exists()
290
291
292 # ── Integration: push ─────────────────────────────────────────────────────────
293
294
class TestCoordSyncPushIntegration:
    """``coord sync push`` driven through the CLI runner with ``push_to_hub`` patched."""

    @staticmethod
    def _run_push(*extra: str):
        """Invoke the push command with the default arguments followed by *extra*."""
        return runner.invoke(cli, _PUSH_ARGS + list(extra))

    def test_push_no_local_records_text_output(self, repo: pathlib.Path) -> None:
        """With nothing to push the hub is never contacted and a notice is printed."""
        with patch(_PUSH_TARGET) as hub_push:
            outcome = self._run_push()
        assert outcome.exit_code == 0
        assert "no local coordination records to push" in outcome.output
        hub_push.assert_not_called()

    def test_push_with_reservation_calls_push_to_hub(self, repo: pathlib.Path) -> None:
        """Owner and slug reach ``push_to_hub`` as positional arguments 1 and 2."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)) as hub_push:
            outcome = self._run_push()
        assert outcome.exit_code == 0
        hub_push.assert_called_once()
        positional = hub_push.call_args[0]
        assert positional[1] == "gabriel"  # owner
        assert positional[2] == "myrepo"  # slug

    def test_push_text_output_contains_inserted_skipped(self, repo: pathlib.Path) -> None:
        """Text output mentions both counters."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = self._run_push()
        assert outcome.exit_code == 0
        for word in ("inserted", "skipped"):
            assert word in outcome.output

    def test_push_coord_bus_error_exits_1(self, repo: pathlib.Path) -> None:
        """A ``CoordBusError`` from the hub turns into exit code 1 and a stderr message."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, side_effect=CoordBusError("hub down")):
            outcome = self._run_push()
        assert outcome.exit_code == 1
        assert "error" in outcome.stderr.lower() or "hub down" in outcome.stderr

    def test_push_format_json_valid_structure(self, repo: pathlib.Path) -> None:
        """JSON output parses and carries the four summary keys."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = self._run_push("--json")
        assert outcome.exit_code == 0
        summary = json.loads(outcome.output.strip())
        for key in ("inserted", "skipped", "total", "failed"):
            assert key in summary

    def test_push_json_shorthand(self, repo: pathlib.Path) -> None:
        """Two ``--json`` runs agree once volatile fields are dropped."""
        # NOTE(review): both runs pass ``--json``; if ``--format json`` is still a
        # supported spelling, one run should presumably use it — confirm.
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(2, 1)):
            outcomes = [self._run_push("--json") for _ in range(2)]
        first, second = (json.loads(o.output.strip()) for o in outcomes)
        for volatile in ("timestamp", "duration_ms"):
            first.pop(volatile, None)
            second.pop(volatile, None)
        assert first == second

    def test_push_json_no_records_returns_zeros(self, repo: pathlib.Path) -> None:
        """With nothing to push the JSON summary reports zero totals."""
        with patch(_PUSH_TARGET):
            outcome = self._run_push("--json")
        assert outcome.exit_code == 0
        summary = json.loads(outcome.output.strip())
        assert summary["total"] == 0
        assert summary["inserted"] == 0

    def test_push_kinds_filter_passed_through(self, repo: pathlib.Path) -> None:
        """``--kinds reservation`` keeps heartbeats out of the pushed batch."""
        _write_local_reservation(repo)
        _write_local_heartbeat(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)) as hub_push:
            outcome = self._run_push("--kinds", "reservation")
        assert outcome.exit_code == 0
        # Only 1 kind → batch should contain only reservations
        pushed_batch = hub_push.call_args[0][3]
        assert all(item["kind"] == "reservation" for item in pushed_batch)

    def test_push_coord_bus_error_json_failed_true(self, repo: pathlib.Path) -> None:
        """On hub failure the final JSON line is the summary with ``failed`` set."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, side_effect=CoordBusError("oops")):
            outcome = self._run_push("--json")
        # exit_code 1; summary JSON (with failed=True) is the last JSON line
        assert outcome.exit_code == 1
        json_lines = [line for line in outcome.output.splitlines() if line.startswith("{")]
        assert json_lines, "Expected at least one JSON output line"
        # The summary JSON is last; the error JSON ({"error": ...}) may appear before it
        summary = json.loads(json_lines[-1])
        assert summary["failed"] is True
381
382
383 # ── Integration: pull ─────────────────────────────────────────────────────────
384
385
class TestCoordSyncPullIntegration:
    """``coord sync pull`` driven through the CLI runner with ``pull_from_hub`` patched."""

    @staticmethod
    def _run_pull(*extra: str):
        """Invoke the pull command with the default arguments followed by *extra*."""
        return runner.invoke(cli, _PULL_ARGS + list(extra))

    def test_pull_empty_result_exits_0(self, repo: pathlib.Path) -> None:
        """An empty hub response is reported as zero pulled records."""
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            outcome = self._run_pull()
        assert outcome.exit_code == 0
        assert "Pulled 0 new record(s)" in outcome.output

    def test_pull_with_records_writes_files(self, repo: pathlib.Path) -> None:
        """A pulled record is written under ``remote/<kind>/``."""
        pulled = [{"kind": "reservation", "record_id": "r1", "payload": {"x": 1}}]
        with patch(_PULL_TARGET, return_value=_pull_ok(pulled, cursor=1)):
            outcome = self._run_pull()
        assert outcome.exit_code == 0
        expected_file = coordination_dir(repo) / "remote" / "reservation" / "r1.json"
        assert expected_file.exists()

    def test_pull_text_output_contains_cursor(self, repo: pathlib.Path) -> None:
        """Text output echoes the cursor returned by the hub."""
        pulled = [{"kind": "reservation", "record_id": "r42", "payload": {}}]
        with patch(_PULL_TARGET, return_value=_pull_ok(pulled, cursor=42)):
            outcome = self._run_pull()
        assert outcome.exit_code == 0
        assert "cursor: 42" in outcome.output

    def test_pull_coord_bus_error_exits_1(self, repo: pathlib.Path) -> None:
        """A ``CoordBusError`` becomes exit code 1 with its message on stderr."""
        with patch(_PULL_TARGET, side_effect=CoordBusError("connection refused")):
            outcome = self._run_pull()
        assert outcome.exit_code == 1
        assert "connection refused" in outcome.stderr

    def test_pull_format_json_valid_structure(self, repo: pathlib.Path) -> None:
        """JSON output parses and carries count, cursor and records."""
        with patch(_PULL_TARGET, return_value=_pull_ok(cursor=7)):
            outcome = self._run_pull("--json")
        assert outcome.exit_code == 0
        summary = json.loads(outcome.output.strip())
        for key in ("count", "cursor", "records"):
            assert key in summary

    def test_pull_json_shorthand(self, repo: pathlib.Path) -> None:
        """Two ``--json`` runs agree once volatile fields are dropped."""
        # NOTE(review): both runs pass ``--json``; if ``--format json`` is still a
        # supported spelling, one run should presumably use it — confirm.
        with patch(_PULL_TARGET, return_value=_pull_ok(cursor=3)):
            outcomes = [self._run_pull("--json") for _ in range(2)]
        first, second = (json.loads(o.output.strip()) for o in outcomes)
        for volatile in ("timestamp", "duration_ms"):
            first.pop(volatile, None)
            second.pop(volatile, None)
        assert first == second

    def test_pull_since_id_passed_to_pull_from_hub(self, repo: pathlib.Path) -> None:
        """``--since-id`` arrives as an int in positional slot 3."""
        # Drop the default trailing ``--since-id 0`` before supplying a new value.
        argv = _PULL_ARGS[:-2] + ["--since-id", "99"]
        with patch(_PULL_TARGET, return_value=_pull_ok()) as hub_pull:
            runner.invoke(cli, argv)
        assert hub_pull.called
        positional = hub_pull.call_args[0]
        assert positional[3] == 99  # since_id

    def test_pull_limit_passed_to_pull_from_hub(self, repo: pathlib.Path) -> None:
        """``--limit`` arrives as an int in positional slot 5."""
        with patch(_PULL_TARGET, return_value=_pull_ok()) as hub_pull:
            self._run_pull("--limit", "42")
        positional = hub_pull.call_args[0]
        assert positional[5] == 42  # limit

    def test_pull_kinds_filter_passed_to_pull_from_hub(self, repo: pathlib.Path) -> None:
        """Every requested kind appears in positional slot 4."""
        with patch(_PULL_TARGET, return_value=_pull_ok()) as hub_pull:
            self._run_pull("--kinds", "reservation", "heartbeat")
        requested_kinds = hub_pull.call_args[0][4]
        assert "reservation" in requested_kinds
        assert "heartbeat" in requested_kinds

    def test_pull_json_count_matches_records_length(self, repo: pathlib.Path) -> None:
        """``count`` equals the number of records echoed in the JSON."""
        pulled = [
            {"kind": "reservation", "record_id": f"r{n}", "payload": {}}
            for n in range(5)
        ]
        with patch(_PULL_TARGET, return_value=_pull_ok(pulled, cursor=5)):
            outcome = self._run_pull("--json")
        summary = json.loads(outcome.output.strip())
        assert summary["count"] == 5
        assert len(summary["records"]) == 5

    def test_pull_signing_passed_to_pull_from_hub(self, repo: pathlib.Path) -> None:
        """Positional slot 6 is ``None`` in this test environment."""
        with patch(_PULL_TARGET, return_value=_pull_ok()) as hub_pull:
            self._run_pull()
        positional = hub_pull.call_args[0]
        assert positional[6] is None  # signing (no identity configured in test)
471
472
473 # ── Security ──────────────────────────────────────────────────────────────────
474
475
class TestCoordSyncSecurity:
    """Security-oriented checks: traversal input and output hygiene."""

    @staticmethod
    def _assert_nothing_written(repo: pathlib.Path, record: dict) -> None:
        """Write *record* and assert no JSON file appeared under ``remote/``."""
        from muse.cli.commands.coord_sync import _write_remote_records

        _write_remote_records(repo, [record])
        remote_root = coordination_dir(repo) / "remote"
        assert not (remote_root.exists() and any(remote_root.rglob("*.json")))

    def test_push_traversal_owner_passed_as_string(self, repo: pathlib.Path) -> None:
        """Path-traversal chars in owner are passed as-is to push_to_hub (encoding is push_to_hub's job)."""
        _write_local_reservation(repo)
        malicious_owner = "../traversal"
        argv = ["coord", "sync", "push"]
        argv += ["--hub", "https://localhost:1337"]
        argv += ["--owner", malicious_owner]
        argv += ["--slug", "myrepo"]
        with patch(_PUSH_TARGET, return_value=_push_ok()) as hub_push:
            runner.invoke(cli, argv)
        # The assertion only applies when the command got as far as calling the hub.
        if hub_push.called:
            positional = hub_push.call_args[0]
            assert positional[1] == malicious_owner  # owner string passed verbatim

    def test_token_not_in_output(self, repo: pathlib.Path) -> None:
        """Push output never contains the substring ``tok``."""
        # NOTE(review): ``_PUSH_ARGS`` passes no token, so this only guards the
        # literal substring — confirm whether a token argument should be supplied.
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS)
        assert "tok" not in outcome.output

    def test_pull_token_not_in_output(self, repo: pathlib.Path) -> None:
        """Pull output never contains the substring ``tok``."""
        # NOTE(review): ``_PULL_ARGS`` passes no token either — see the push variant.
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            outcome = runner.invoke(cli, _PULL_ARGS)
        assert "tok" not in outcome.output

    def test_write_remote_records_rejects_unknown_kind(self, repo: pathlib.Path) -> None:
        """Server-supplied kind '../../malicious' must not escape remote/ directory."""
        hostile = {"kind": "../../malicious", "record_id": "abc123", "payload": {}}
        self._assert_nothing_written(repo, hostile)

    def test_write_remote_records_rejects_traversal_record_id(self, repo: pathlib.Path) -> None:
        """Server-supplied record_id '../../../etc/passwd' must not escape kind dir."""
        hostile = {"kind": "reservation", "record_id": "../../../etc/passwd", "payload": {}}
        self._assert_nothing_written(repo, hostile)
520
521
522 # ── Unit: _gather_local_records — claim field name ─────────────────────────────
523
524
class TestGatherLocalRecordsClaims:
    """Unit tests for how ``_gather_local_records`` maps claim files to records."""

    def test_claim_uses_claimer_run_id_field(self, repo: pathlib.Path) -> None:
        """Bug fix: claims must read 'claimer_run_id', not 'claimed_by'."""
        from muse.cli.commands.coord_sync import _gather_local_records

        # The returned task id is irrelevant here; only the run id is checked.
        _write_local_claim(repo, run_id="correct-worker")
        records = _gather_local_records(repo, kinds=["claim"])
        assert len(records) == 1
        assert records[0]["run_id"] == "correct-worker"

    def test_claim_record_id_is_task_id(self, repo: pathlib.Path) -> None:
        """The record id of a gathered claim is the claim's task id."""
        from muse.cli.commands.coord_sync import _gather_local_records

        tid = _write_local_claim(repo)
        records = _gather_local_records(repo, kinds=["claim"])
        assert records[0]["record_id"] == tid

    def test_claim_expires_at_included(self, repo: pathlib.Path) -> None:
        """A gathered claim carries a non-None ``expires_at``."""
        from muse.cli.commands.coord_sync import _gather_local_records

        _write_local_claim(repo)
        records = _gather_local_records(repo, kinds=["claim"])
        assert records[0]["expires_at"] is not None
545
546
547 # ── Unit: _write_remote_records — compact JSON ────────────────────────────────
548
549
class TestWriteRemoteRecordsCompact:
    """Output format and allow-list checks for ``_write_remote_records``."""

    @staticmethod
    def _write(repo: pathlib.Path, records: list) -> pathlib.Path:
        """Write *records* and return the ``remote/`` directory for inspection."""
        from muse.cli.commands.coord_sync import _write_remote_records

        _write_remote_records(repo, records)
        return coordination_dir(repo) / "remote"

    def _assert_rejected(self, repo: pathlib.Path, record_id: str) -> None:
        """Assert a reservation with *record_id* leaves no JSON file behind."""
        remote_root = self._write(
            repo, [{"kind": "reservation", "record_id": record_id, "payload": {}}]
        )
        assert not (remote_root.exists() and any(remote_root.rglob("*.json")))

    def test_written_json_is_compact(self, repo: pathlib.Path) -> None:
        """No indent=2 — remote files must be compact single-line JSON."""
        remote_root = self._write(
            repo, [{"kind": "reservation", "record_id": "compact-test", "payload": {"x": 1}}]
        )
        written = remote_root / "reservation" / "compact-test.json"
        text = written.read_text().strip()
        # Compact JSON has no interior newlines
        assert "\n" not in text

    def test_valid_kinds_all_accepted(self, repo: pathlib.Path) -> None:
        """All 7 kinds are accepted by the allowlist."""
        batch = []
        for index, kind in enumerate(_ALL_KINDS):
            batch.append({"kind": kind, "record_id": f"id-{index}", "payload": {}})
        remote_root = self._write(repo, batch)
        written_files = list(remote_root.rglob("*.json"))
        assert len(written_files) == len(_ALL_KINDS)

    def test_empty_record_id_still_skipped(self, repo: pathlib.Path) -> None:
        """An empty record id writes nothing."""
        self._assert_rejected(repo, "")

    def test_record_id_with_dots_rejected(self, repo: pathlib.Path) -> None:
        """Dots in record_id are not allowed (could be used for traversal)."""
        self._assert_rejected(repo, "..malicious")

    def test_record_id_with_slash_rejected(self, repo: pathlib.Path) -> None:
        """A slash in record_id writes nothing."""
        self._assert_rejected(repo, "a/b")

    def test_record_id_too_long_rejected(self, repo: pathlib.Path) -> None:
        """record_ids over 128 chars are rejected."""
        self._assert_rejected(repo, "a" * 129)
602
603
604 # ── Input validation ──────────────────────────────────────────────────────────
605
606
class TestSyncInputValidation:
    """Argument validation for ``coord sync push`` and ``coord sync pull``."""

    @staticmethod
    def _sync_args(verb: str, owner: str, slug: str, extra: list[str] | None) -> list[str]:
        """Build the argv for ``coord sync <verb>`` with optional trailing *extra*."""
        argv = [
            "coord", "sync", verb,
            "--hub", "https://localhost:1337",
            "--owner", owner,
            "--slug", slug,
        ]
        if extra:
            argv += extra
        return argv

    def _push_args(self, owner: str = "gabriel", slug: str = "myrepo", extra: list[str] | None = None) -> list[str]:
        """Argv for a push with the given owner/slug and optional extra flags."""
        return self._sync_args("push", owner, slug, extra)

    def _pull_args(self, owner: str = "gabriel", slug: str = "myrepo", extra: list[str] | None = None) -> list[str]:
        """Argv for a pull with the given owner/slug and optional extra flags."""
        return self._sync_args("pull", owner, slug, extra)

    def test_push_owner_too_long_exits_1(self, repo: pathlib.Path) -> None:
        """One character past the owner limit is rejected."""
        oversized = "x" * (_MAX_OWNER_LEN + 1)
        outcome = runner.invoke(cli, self._push_args(owner=oversized))
        assert outcome.exit_code == 1

    def test_push_slug_too_long_exits_1(self, repo: pathlib.Path) -> None:
        """One character past the slug limit is rejected."""
        oversized = "x" * (_MAX_SLUG_LEN + 1)
        outcome = runner.invoke(cli, self._push_args(slug=oversized))
        assert outcome.exit_code == 1

    def test_push_owner_at_max_accepted(self, repo: pathlib.Path) -> None:
        """An owner exactly at the limit passes validation."""
        at_limit = "x" * _MAX_OWNER_LEN
        with patch(_PUSH_TARGET, return_value=_push_ok()):
            outcome = runner.invoke(cli, self._push_args(owner=at_limit))
        # No validation error — exits 0 (no records to push)
        assert outcome.exit_code == 0

    def test_push_slug_at_max_accepted(self, repo: pathlib.Path) -> None:
        """A slug exactly at the limit passes validation."""
        at_limit = "x" * _MAX_SLUG_LEN
        with patch(_PUSH_TARGET, return_value=_push_ok()):
            outcome = runner.invoke(cli, self._push_args(slug=at_limit))
        assert outcome.exit_code == 0

    def test_push_owner_too_long_json_error(self, repo: pathlib.Path) -> None:
        """In JSON mode a too-long owner reports status ``bad_args``."""
        oversized = "x" * (_MAX_OWNER_LEN + 1)
        outcome = runner.invoke(cli, self._push_args(owner=oversized) + ["--json"])
        assert outcome.exit_code == 1
        report = json.loads(outcome.output.strip())
        assert report["status"] == "bad_args"

    def test_pull_owner_too_long_exits_1(self, repo: pathlib.Path) -> None:
        """Pull applies the same owner length limit."""
        oversized = "x" * (_MAX_OWNER_LEN + 1)
        outcome = runner.invoke(cli, self._pull_args(owner=oversized))
        assert outcome.exit_code == 1

    def test_pull_slug_too_long_exits_1(self, repo: pathlib.Path) -> None:
        """Pull applies the same slug length limit."""
        oversized = "x" * (_MAX_SLUG_LEN + 1)
        outcome = runner.invoke(cli, self._pull_args(slug=oversized))
        assert outcome.exit_code == 1

    def test_pull_since_id_negative_exits_1(self, repo: pathlib.Path) -> None:
        """A negative ``--since-id`` is rejected."""
        argv = self._pull_args(extra=["--since-id", "-1"])
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 1

    def test_pull_since_id_negative_json_error(self, repo: pathlib.Path) -> None:
        """In JSON mode a negative ``--since-id`` reports status ``bad_args``."""
        argv = self._pull_args(extra=["--since-id", "-1", "--json"])
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 1
        report = json.loads(outcome.output.strip())
        assert report["status"] == "bad_args"

    def test_pull_limit_zero_exits_1(self, repo: pathlib.Path) -> None:
        """``--limit 0`` is rejected."""
        argv = self._pull_args(extra=["--limit", "0"])
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 1

    def test_pull_limit_over_max_exits_1(self, repo: pathlib.Path) -> None:
        """One past the maximum pull limit is rejected."""
        over_limit = str(_MAX_PULL_LIMIT + 1)
        outcome = runner.invoke(cli, self._pull_args(extra=["--limit", over_limit]))
        assert outcome.exit_code == 1

    def test_pull_limit_at_min_accepted(self, repo: pathlib.Path) -> None:
        """``--limit 1`` passes validation."""
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            outcome = runner.invoke(cli, self._pull_args(extra=["--limit", "1"]))
        assert outcome.exit_code == 0

    def test_pull_limit_at_max_accepted(self, repo: pathlib.Path) -> None:
        """The maximum pull limit itself passes validation."""
        at_limit = str(_MAX_PULL_LIMIT)
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            outcome = runner.invoke(cli, self._pull_args(extra=["--limit", at_limit]))
        assert outcome.exit_code == 0

    def test_pull_limit_over_max_json_error(self, repo: pathlib.Path) -> None:
        """In JSON mode an over-limit ``--limit`` reports status ``bad_args``."""
        over_limit = str(_MAX_PULL_LIMIT + 1)
        argv = self._pull_args(extra=["--limit", over_limit, "--json"])
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 1
        report = json.loads(outcome.output.strip())
        assert report["status"] == "bad_args"

    def test_push_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """Validation must not touch filesystem — no repo needed."""
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))  # no .muse dir
        oversized = "x" * (_MAX_OWNER_LEN + 1)
        outcome = runner.invoke(cli, self._push_args(owner=oversized))
        assert outcome.exit_code == 1

    def test_pull_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """A negative ``--since-id`` is rejected even without a repository on disk."""
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))  # no .muse dir
        argv = self._pull_args(extra=["--since-id", "-5"])
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 1
733
734
735 # ── JSON schema: schema_version and duration_ms ──────────────────────────
736
737
class TestSyncJsonSchema:
    """Presence of schema and timing fields in push/pull output."""

    @staticmethod
    def _json_of(outcome) -> dict:
        """Assert the run succeeded and return its output parsed as JSON."""
        assert outcome.exit_code == 0
        return json.loads(outcome.output.strip())

    def test_push_json_includes_schema_version(self, repo: pathlib.Path) -> None:
        """Push JSON carries a ``schema`` key."""
        # NOTE(review): the test name says schema_version but the asserted key is
        # "schema" — confirm which name the command is meant to emit.
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["--json"])
        assert "schema" in self._json_of(outcome)

    def test_push_json_includes_duration_ms(self, repo: pathlib.Path) -> None:
        """Push JSON carries a float ``duration_ms``."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["--json"])
        summary = self._json_of(outcome)
        assert "duration_ms" in summary
        assert isinstance(summary["duration_ms"], float)

    def test_push_json_no_records_includes_elapsed(self, repo: pathlib.Path) -> None:
        """``duration_ms`` is present even when there was nothing to push."""
        with patch(_PUSH_TARGET):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["--json"])
        assert "duration_ms" in self._json_of(outcome)

    def test_pull_json_includes_schema_version(self, repo: pathlib.Path) -> None:
        """Pull JSON carries a ``schema`` key."""
        # NOTE(review): same naming mismatch as the push variant — confirm.
        with patch(_PULL_TARGET, return_value=_pull_ok(cursor=3)):
            outcome = runner.invoke(cli, _PULL_ARGS + ["--json"])
        assert "schema" in self._json_of(outcome)

    def test_pull_json_includes_duration_ms(self, repo: pathlib.Path) -> None:
        """Pull JSON carries a float ``duration_ms``."""
        with patch(_PULL_TARGET, return_value=_pull_ok(cursor=3)):
            outcome = runner.invoke(cli, _PULL_ARGS + ["--json"])
        summary = self._json_of(outcome)
        assert "duration_ms" in summary
        assert isinstance(summary["duration_ms"], float)

    def test_push_text_includes_elapsed(self, repo: pathlib.Path) -> None:
        """Push text output ends its timing with ``s)``."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS)
        assert outcome.exit_code == 0
        # elapsed is formatted as (Ns)
        assert "s)" in outcome.output

    def test_pull_text_includes_elapsed(self, repo: pathlib.Path) -> None:
        """Pull text output ends its timing with ``s)``."""
        with patch(_PULL_TARGET, return_value=_pull_ok(cursor=1)):
            outcome = runner.invoke(cli, _PULL_ARGS)
        assert outcome.exit_code == 0
        assert "s)" in outcome.output
791
792
793 # ── Gather all 7 kinds ────────────────────────────────────────────────────────
794
795
class TestGatherAllKinds:
    """``_gather_local_records`` picks up records from every kind directory."""

    @staticmethod
    def _write_record(
        repo: pathlib.Path,
        subdir: str,
        id_field: str,
        extra: JsonDict | None = None,
    ) -> str:
        """Write ``<coordination>/<subdir>/<id>.json`` and return the new id.

        The JSON object holds *id_field* first, followed by the keys of
        *extra* in their given order.
        """
        target_dir = coordination_dir(repo) / subdir
        target_dir.mkdir(parents=True, exist_ok=True)
        record_id = _new_id()
        record = {id_field: record_id}
        if extra is not None:
            record.update(extra)
        (target_dir / f"{record_id}.json").write_text(json.dumps(record))
        return record_id

    def _write_intent(self, repo: pathlib.Path) -> str:
        """Create one intent record on disk and return its id."""
        return self._write_record(
            repo, "intents", "intent_id", {"run_id": "agent", "expires_at": None}
        )

    def _write_release(self, repo: pathlib.Path) -> str:
        """Create one release record on disk and return its id."""
        return self._write_record(repo, "releases", "release_id", {"run_id": "agent"})

    def _write_dependency(self, repo: pathlib.Path) -> str:
        """Create one dependency record on disk and return its id."""
        return self._write_record(repo, "dependencies", "reservation_id")

    def _write_task(self, repo: pathlib.Path) -> str:
        """Create one task record on disk and return its id."""
        return self._write_record(repo, "tasks", "task_id", {"run_id": "creator"})

    def test_all_kinds_gathered(self, repo: pathlib.Path) -> None:
        """One record per kind on disk yields exactly the kinds in ``_ALL_KINDS``."""
        from muse.cli.commands.coord_sync import _gather_local_records

        _write_local_reservation(repo)
        _write_local_heartbeat(repo, "hb-1")
        self._write_intent(repo)
        self._write_release(repo)
        self._write_dependency(repo)
        self._write_task(repo)
        _write_local_claim(repo)

        gathered = _gather_local_records(repo, kinds=list(_ALL_KINDS))
        seen_kinds = set()
        for entry in gathered:
            seen_kinds.add(entry["kind"])
        assert seen_kinds == set(_ALL_KINDS)

    def test_each_kind_has_correct_record_id(self, repo: pathlib.Path) -> None:
        """A gathered reservation carries the id returned by the writer."""
        from muse.cli.commands.coord_sync import _gather_local_records

        expected_id = _write_local_reservation(repo)
        gathered = _gather_local_records(repo, kinds=["reservation"])
        first = gathered[0]
        assert first["record_id"] == expected_id

    def test_release_has_none_expires_at(self, repo: pathlib.Path) -> None:
        """A gathered release has ``expires_at`` set to ``None``."""
        from muse.cli.commands.coord_sync import _gather_local_records

        self._write_release(repo)
        gathered = _gather_local_records(repo, kinds=["release"])
        first = gathered[0]
        assert first["expires_at"] is None

    def test_dependency_has_none_expires_at(self, repo: pathlib.Path) -> None:
        """A gathered dependency has ``expires_at`` set to ``None``."""
        from muse.cli.commands.coord_sync import _gather_local_records

        self._write_dependency(repo)
        gathered = _gather_local_records(repo, kinds=["dependency"])
        first = gathered[0]
        assert first["expires_at"] is None

    def test_task_has_none_expires_at(self, repo: pathlib.Path) -> None:
        """A gathered task has ``expires_at`` set to ``None``."""
        from muse.cli.commands.coord_sync import _gather_local_records

        self._write_task(repo)
        gathered = _gather_local_records(repo, kinds=["task"])
        first = gathered[0]
        assert first["expires_at"] is None
865
866
867 # ── Stress tests ──────────────────────────────────────────────────────────────
868
869
class TestSyncStress:
    """Large-volume and partial-failure scenarios for push and pull."""

    def test_push_600_records_batched(self, repo: pathlib.Path) -> None:
        """600 records must be split across ≥ 2 batches of MAX_PUSH_BATCH."""
        from muse.core.coord_bus import MAX_PUSH_BATCH

        for i in range(600):
            _write_local_reservation(repo, run_id=f"agent-{i}")
        call_count = 0

        def fake_push(hub: str, owner: str, slug: str, batch: list[JsonDict], token: SigningIdentity | None) -> MsgpackDict:
            # Count invocations and enforce the per-batch size cap.
            nonlocal call_count
            call_count += 1
            assert len(batch) <= MAX_PUSH_BATCH
            return {"inserted": len(batch), "skipped": 0}

        with patch(_PUSH_TARGET, side_effect=fake_push):
            result = runner.invoke(cli, _PUSH_ARGS + ["--kinds", "reservation"])
        assert result.exit_code == 0
        assert call_count >= 2

    def test_push_600_records_inserted_count_correct(self, repo: pathlib.Path) -> None:
        """Pushing 600 reservations reports ``total == 600`` in JSON mode."""
        for i in range(600):
            _write_local_reservation(repo, run_id=f"agent-{i}")
        # The mock object itself is never inspected, so it is not bound.
        with patch(_PUSH_TARGET, return_value=_push_ok(inserted=1, skipped=0)):
            result = runner.invoke(cli, _PUSH_ARGS + ["--kinds", "reservation", "--json"])
        assert result.exit_code == 0
        data = json.loads(result.output.strip())
        assert data["total"] == 600

    def test_pull_1000_records_all_written(self, repo: pathlib.Path) -> None:
        """1000 pulled reservations produce 1000 files under ``remote/reservation``."""
        records = [
            {"kind": "reservation", "record_id": _new_id(), "payload": {}}
            for _ in range(1000)
        ]
        with patch(_PULL_TARGET, return_value=_pull_ok(records, cursor=1000)):
            result = runner.invoke(cli, _PULL_ARGS + ["--limit", "1000"])
        assert result.exit_code == 0
        remote_dir = coordination_dir(repo) / "remote" / "reservation"
        written = list(remote_dir.glob("*.json"))
        assert len(written) == 1000

    def test_pull_mixed_invalid_records_skipped(self, repo: pathlib.Path) -> None:
        """Records with invalid kind/record_id are skipped; valid ones still written."""
        good = {"kind": "reservation", "record_id": "good-record-1", "payload": {}}
        bad_kind = {"kind": "../traversal", "record_id": "malicious-id", "payload": {}}
        bad_id = {"kind": "reservation", "record_id": "../etc/passwd", "payload": {}}
        with patch(_PULL_TARGET, return_value=_pull_ok([good, bad_kind, bad_id], cursor=3)):
            result = runner.invoke(cli, _PULL_ARGS)
        assert result.exit_code == 0
        good_file = coordination_dir(repo) / "remote" / "reservation" / "good-record-1.json"
        assert good_file.exists()
        # Only 1 file written (the good record)
        remote_dir = coordination_dir(repo) / "remote"
        all_files = list(remote_dir.rglob("*.json"))
        assert len(all_files) == 1

    def test_push_partial_failure_reports_failed_true(self, repo: pathlib.Path) -> None:
        """If one batch fails, failed=True in JSON even if other batches succeed."""
        from muse.core.coord_bus import MAX_PUSH_BATCH

        # Write enough for 2 batches
        for i in range(MAX_PUSH_BATCH + 1):
            _write_local_reservation(repo, run_id=f"agent-{i}")

        call_count = 0

        def sometimes_fail(hub: str, owner: str, slug: str, batch: list[JsonDict], token: SigningIdentity | None) -> MsgpackDict:
            # Fail only the first batch; later batches succeed.
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                raise CoordBusError("first batch failed")
            return {"inserted": len(batch), "skipped": 0}

        with patch(_PUSH_TARGET, side_effect=sometimes_fail):
            result = runner.invoke(
                cli, _PUSH_ARGS + ["--kinds", "reservation", "--json"]
            )
        assert result.exit_code == 1
        # Other lines may precede the summary object, so take the last line
        # that looks like a JSON object.
        json_lines = [ln for ln in result.output.splitlines() if ln.startswith("{")]
        # Fail with a readable message instead of a bare IndexError.
        assert json_lines, f"No JSON line in output: {result.output!r}"
        final = json.loads(json_lines[-1])
        assert final["failed"] is True
949
950
951 # ---------------------------------------------------------------------------
# Extended — muse coord sync push
953 # ---------------------------------------------------------------------------
954
955
class TestCoordSyncPushExtended:
    """Extended CLI-level checks for ``muse coord sync push``."""

    def test_j_alias_works(self, repo: pathlib.Path) -> None:
        """-j is equivalent to --json for push."""
        with patch(_PUSH_TARGET, return_value=_push_ok(0, 0)):
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        assert result.exit_code == 0, result.output
        data = json.loads(result.output.strip())
        assert "inserted" in data

    def test_help_flag(self, repo: pathlib.Path) -> None:
        """``push --help`` exits 0."""
        result = runner.invoke(cli, ["coord", "sync", "push", "--help"])
        assert result.exit_code == 0

    def test_json_compact_single_line(self, repo: pathlib.Path) -> None:
        """JSON output is a single compact line — no indent=2."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        assert result.exit_code == 0
        # Indented JSON would put the opening brace on a line of its own and
        # nested objects on further lines; compact output has one such line.
        json_lines = [line for line in result.output.splitlines() if line.strip().startswith("{")]
        assert len(json_lines) == 1, f"Expected compact JSON, got: {result.output!r}"

    def test_json_all_required_fields(self, repo: pathlib.Path) -> None:
        """JSON always has schema, inserted, skipped, total, failed, duration_ms."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        data = json.loads(result.output.strip())
        for field in ("schema", "inserted", "skipped", "total", "failed", "duration_ms"):
            assert field in data, f"Missing field: {field}"

    def test_json_inserted_is_int(self, repo: pathlib.Path) -> None:
        """``inserted``, ``skipped`` and ``total`` are integers."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        data = json.loads(result.output.strip())
        assert isinstance(data["inserted"], int)
        assert isinstance(data["skipped"], int)
        assert isinstance(data["total"], int)

    def test_json_failed_is_bool(self, repo: pathlib.Path) -> None:
        """``failed`` is the boolean ``False`` on a successful push."""
        with patch(_PUSH_TARGET, return_value=_push_ok(0, 0)):
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        data = json.loads(result.output.strip())
        assert isinstance(data["failed"], bool)
        assert data["failed"] is False

    def test_json_duration_ms_is_number(self, repo: pathlib.Path) -> None:
        """``duration_ms`` is a non-negative int or float."""
        with patch(_PUSH_TARGET, return_value=_push_ok(0, 0)):
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        data = json.loads(result.output.strip())
        assert isinstance(data["duration_ms"], (int, float))
        assert data["duration_ms"] >= 0

    def test_json_schema_is_int(self, repo: pathlib.Path) -> None:
        """``schema`` is an integer of at least 1."""
        with patch(_PUSH_TARGET, return_value=_push_ok(0, 0)):
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        data = json.loads(result.output.strip())
        assert isinstance(data["schema"], int)
        assert data["schema"] >= 1

    def test_json_total_matches_gathered_records(self, repo: pathlib.Path) -> None:
        """Two local reservations are reported as ``total == 2``."""
        _write_local_reservation(repo)
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(2, 0)):
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        data = json.loads(result.output.strip())
        assert data["total"] == 2

    def test_json_no_records_total_is_zero(self, repo: pathlib.Path) -> None:
        """With no local records every counter is zero and the exit code is 0."""
        result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        assert result.exit_code == 0
        data = json.loads(result.output.strip())
        assert data["total"] == 0
        assert data["inserted"] == 0
        assert data["skipped"] == 0

    def test_idempotent_second_push_all_skipped(self, repo: pathlib.Path) -> None:
        """Second push: all records skipped (hub already has them)."""
        _write_local_reservation(repo)
        # The mocked hub reply reports the single record as already present.
        with patch(_PUSH_TARGET, return_value=_push_ok(0, 1)):
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        data = json.loads(result.output.strip())
        assert data["skipped"] == 1
        assert data["inserted"] == 0

    def test_all_7_kinds_pushed(self, repo: pathlib.Path) -> None:
        """Push with all 7 kinds gathers records from every kind directory."""
        coord_dir = coordination_dir(repo)
        # subdirectory -> (id field, run-id field) written into each record.
        kind_dirs = {
            "reservations": ("reservation_id", "run_id"),
            "heartbeats": ("run_id", "run_id"),
            "intents": ("intent_id", "run_id"),
            "releases": ("release_id", "run_id"),
            "dependencies": ("reservation_id", "reservation_id"),
            "tasks": ("task_id", "run_id"),
            "claims": ("task_id", "claimer_run_id"),
        }
        for subdir, (id_field, run_field) in kind_dirs.items():
            d = coord_dir / subdir
            d.mkdir(parents=True, exist_ok=True)
            rid = _new_id()
            (d / f"{rid}.json").write_text(json.dumps({id_field: rid, run_field: "r1"}))

        # The mock object itself is never inspected, so it is not bound.
        with patch(_PUSH_TARGET, return_value=_push_ok(7, 0)):
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        assert result.exit_code == 0
        data = json.loads(result.output.strip())
        assert data["total"] == 7

    def test_text_output_shows_owner_slug(self, repo: pathlib.Path) -> None:
        """Text output mentions both the owner and the slug."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            result = runner.invoke(cli, _PUSH_ARGS)
        assert "gabriel" in result.output
        assert "myrepo" in result.output

    def test_text_output_shows_checkmark_on_success(self, repo: pathlib.Path) -> None:
        """A successful text-mode push prints a check mark."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            result = runner.invoke(cli, _PUSH_ARGS)
        # U+2705; the literal was previously mis-encoded as mojibake.
        assert "✅" in result.output

    def test_text_output_shows_cross_on_failure(self, repo: pathlib.Path) -> None:
        """A failing push shows a cross on stderr or exits with status 1."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, side_effect=CoordBusError("hub down")):
            result = runner.invoke(cli, _PUSH_ARGS)
        # Either signal is accepted: the cross on stderr, or exit status 1.
        assert "❌" in result.stderr or result.exit_code == 1

    def test_help_shows_agent_quickstart(self, repo: pathlib.Path) -> None:
        """Help text contains the "Agent quickstart" section."""
        result = runner.invoke(cli, ["coord", "sync", "push", "--help"])
        assert "Agent quickstart" in result.output

    def test_help_shows_json_schema(self, repo: pathlib.Path) -> None:
        """Help text contains the "JSON output schema" section."""
        result = runner.invoke(cli, ["coord", "sync", "push", "--help"])
        assert "JSON output schema" in result.output

    def test_help_shows_exit_codes(self, repo: pathlib.Path) -> None:
        """Help text contains the "Exit codes" section."""
        result = runner.invoke(cli, ["coord", "sync", "push", "--help"])
        assert "Exit codes" in result.output
1096
1097
1098 # ---------------------------------------------------------------------------
# Security — muse coord sync push
1100 # ---------------------------------------------------------------------------
1101
1102
class TestCoordSyncPushSecurity:
    """Output-sanitisation and leak checks for ``muse coord sync push``."""

    @staticmethod
    def _push_argv(owner: str = "gabriel", slug: str = "myrepo", *extra: str) -> list[str]:
        """Build a ``coord sync push`` argv for *owner*/*slug*, plus any *extra* flags."""
        argv = ["coord", "sync", "push", "--hub", "https://localhost:1337"]
        argv.extend(["--owner", owner, "--slug", slug])
        argv.extend(extra)
        return argv

    def test_ansi_in_owner_sanitized_in_text_output(self, repo: pathlib.Path) -> None:
        """Escape sequences passed via ``--owner`` never reach the text output."""
        _write_local_reservation(repo)
        hostile_owner = "\x1b[31mmalicious\x1b[0m"
        argv = self._push_argv(owner=hostile_owner)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, argv)
        assert "\x1b" not in outcome.output

    def test_ansi_in_slug_sanitized_in_text_output(self, repo: pathlib.Path) -> None:
        """Escape sequences passed via ``--slug`` never reach the text output."""
        _write_local_reservation(repo)
        hostile_slug = "\x1b[32minjected\x1b[0m"
        argv = self._push_argv(slug=hostile_slug)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, argv)
        assert "\x1b" not in outcome.output

    def test_token_not_in_json_output(self, repo: pathlib.Path) -> None:
        """The secret string is absent from JSON-mode output."""
        _write_local_reservation(repo)
        secret = "super-secret-token-xyz"
        # NOTE(review): `secret` is never handed to the CLI in this test, so
        # the assertion below cannot fail as written — confirm whether a
        # credential should be injected here.
        argv = self._push_argv("gabriel", "myrepo", "--json")
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, argv)
        assert secret not in outcome.output

    def test_token_not_in_text_output(self, repo: pathlib.Path) -> None:
        """The secret string is absent from text-mode output."""
        _write_local_reservation(repo)
        secret = "super-secret-token-abc"
        # NOTE(review): `secret` is never handed to the CLI in this test, so
        # the assertion below cannot fail as written — confirm whether a
        # credential should be injected here.
        argv = self._push_argv()
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, argv)
        assert secret not in outcome.output

    def test_no_traceback_on_coord_bus_error(self, repo: pathlib.Path) -> None:
        """A ``CoordBusError`` from the hub call does not print a traceback."""
        _write_local_reservation(repo)
        hub_failure = CoordBusError("network failure")
        with patch(_PUSH_TARGET, side_effect=hub_failure):
            outcome = runner.invoke(cli, _PUSH_ARGS)
        assert "Traceback" not in outcome.output

    def test_owner_length_cap_before_io(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """An over-long owner exits 1 without a traceback, even with no repo present."""
        # tmp_path is a bare directory; no repository is created inside it.
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
        oversized_owner = "x" * (_MAX_OWNER_LEN + 1)
        outcome = runner.invoke(cli, self._push_argv(owner=oversized_owner))
        assert outcome.exit_code == 1
        assert "Traceback" not in outcome.output
1183
1184
1185 # ---------------------------------------------------------------------------
# Stress — muse coord sync push
1187 # ---------------------------------------------------------------------------
1188
1189
class TestCoordSyncPushStress:
    """Repetition, volume and concurrency checks for ``muse coord sync push``."""

    def test_50_sequential_push_calls_no_records(self, repo: pathlib.Path) -> None:
        """50 sequential pushes with no records all exit 0."""
        for i in range(50):
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
            assert result.exit_code == 0, f"Call {i}: {result.output}"
            data = json.loads(result.output.strip())
            assert data["total"] == 0

    def test_push_1200_records_correct_batch_count(self, repo: pathlib.Path) -> None:
        """1200 records → ceil(1200 / MAX_PUSH_BATCH) hub calls, all inserted."""
        from muse.core.coord_bus import MAX_PUSH_BATCH

        for i in range(1200):
            _write_local_reservation(repo, run_id=f"agent-{i}")
        call_count = 0

        def counting_push(hub: str, owner: str, slug: str, batch: list[JsonDict], token: SigningIdentity | None) -> MsgpackDict:
            # Count hub calls and report every record in the batch as inserted.
            nonlocal call_count
            call_count += 1
            return {"inserted": len(batch), "skipped": 0}

        with patch(_PUSH_TARGET, side_effect=counting_push):
            result = runner.invoke(cli, _PUSH_ARGS + ["--kinds", "reservation", "-j"])
        assert result.exit_code == 0
        expected_batches = -(-1200 // MAX_PUSH_BATCH)  # ceil division
        assert call_count == expected_batches
        data = json.loads(result.output.strip())
        assert data["inserted"] == 1200

    def test_concurrent_push_8_threads(self, repo: pathlib.Path) -> None:
        """8 threads each call run_push directly; patches applied at test level.

        The goal is to verify that concurrent calls to run_push do not crash
        or corrupt internal state. All threads share the same repo fixture;
        per-thread module mutation is intentionally avoided here because
        unguarded write-then-restore of a module attribute across threads is a
        race condition that can leave the module permanently patched after the
        test completes, polluting later tests.
        """
        import argparse
        import threading

        from muse.cli.commands.coord_sync import run_push

        _write_local_reservation(repo, run_id="shared-agent")

        errors: list[str] = []

        def worker(idx: int) -> None:
            # Each thread builds its own namespace; failures are collected
            # rather than raised so the main thread can report them all.
            args = argparse.Namespace(
                hub="https://localhost:1337",
                owner="gabriel",
                slug="myrepo",
                signing=None,
                kinds=list(_ALL_KINDS),
                json_out=True,
            )
            try:
                run_push(args)
            except SystemExit as exc:
                if exc.code != 0:
                    errors.append(f"Thread {idx}: exit {exc.code}")
            except Exception as exc:
                errors.append(f"Thread {idx}: {exc}")

        def fake_push(hub: str, owner: str, slug: str, batch: list[JsonDict], token: SigningIdentity | None) -> MsgpackDict:
            return {"inserted": len(batch), "skipped": 0}

        push_p = patch(_PUSH_TARGET, side_effect=fake_push)
        repo_p = patch("muse.cli.commands.coord_sync.require_repo", return_value=repo)
        # Entering both patches through `with` guarantees that each one that
        # was started is also stopped, even if starting the second one fails.
        with push_p, repo_p:
            threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
        assert not errors, f"Concurrent failures: {errors}"
1270
1271
1272 # ---------------------------------------------------------------------------
# Extended — muse coord sync pull
1274 # ---------------------------------------------------------------------------
1275
1276
class TestCoordSyncPullExtended:
    """Extended CLI-level checks for ``muse coord sync pull``."""

    def test_j_alias_works(self, repo: pathlib.Path) -> None:
        """-j is equivalent to --json for pull."""
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            result = runner.invoke(cli, _PULL_ARGS + ["-j"])
        assert result.exit_code == 0, result.output
        data = json.loads(result.output.strip())
        assert "count" in data

    def test_help_flag(self, repo: pathlib.Path) -> None:
        """``pull --help`` exits 0."""
        result = runner.invoke(cli, ["coord", "sync", "pull", "--help"])
        assert result.exit_code == 0

    def test_json_compact_single_line(self, repo: pathlib.Path) -> None:
        """JSON output is a single compact line — no indent=2."""
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            result = runner.invoke(cli, _PULL_ARGS + ["-j"])
        assert result.exit_code == 0
        # Indented JSON would put the opening brace on a line of its own and
        # nested objects on further lines; compact output has one such line.
        json_lines = [line for line in result.output.splitlines() if line.strip().startswith("{")]
        assert len(json_lines) == 1, f"Expected compact JSON, got: {result.output!r}"

    def test_json_all_required_fields(self, repo: pathlib.Path) -> None:
        """JSON always has schema, count, cursor, records, duration_ms."""
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            result = runner.invoke(cli, _PULL_ARGS + ["-j"])
        data = json.loads(result.output.strip())
        for field in ("schema", "count", "cursor", "records", "duration_ms"):
            assert field in data, f"Missing field: {field}"

    def test_json_count_is_int(self, repo: pathlib.Path) -> None:
        """``count`` and ``cursor`` are integers."""
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            result = runner.invoke(cli, _PULL_ARGS + ["-j"])
        data = json.loads(result.output.strip())
        assert isinstance(data["count"], int)
        assert isinstance(data["cursor"], int)

    def test_json_records_is_list(self, repo: pathlib.Path) -> None:
        """``records`` is a list."""
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            result = runner.invoke(cli, _PULL_ARGS + ["-j"])
        data = json.loads(result.output.strip())
        assert isinstance(data["records"], list)

    def test_json_duration_ms_is_number(self, repo: pathlib.Path) -> None:
        """``duration_ms`` is a non-negative int or float."""
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            result = runner.invoke(cli, _PULL_ARGS + ["-j"])
        data = json.loads(result.output.strip())
        assert isinstance(data["duration_ms"], (int, float))
        assert data["duration_ms"] >= 0

    def test_json_schema_is_int(self, repo: pathlib.Path) -> None:
        """``schema`` is an integer of at least 1."""
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            result = runner.invoke(cli, _PULL_ARGS + ["-j"])
        data = json.loads(result.output.strip())
        assert isinstance(data["schema"], int)
        assert data["schema"] >= 1

    def test_json_count_matches_records_length(self, repo: pathlib.Path) -> None:
        """``count`` equals the number of entries in ``records``."""
        fake_records = [
            {"kind": "reservation", "record_id": _new_id(), "run_id": "r1", "payload": {}},
            {"kind": "reservation", "record_id": _new_id(), "run_id": "r2", "payload": {}},
        ]
        with patch(_PULL_TARGET, return_value=_pull_ok(fake_records, cursor=2)):
            result = runner.invoke(cli, _PULL_ARGS + ["-j"])
        data = json.loads(result.output.strip())
        assert data["count"] == 2
        assert len(data["records"]) == 2

    def test_json_cursor_reflects_hub_cursor(self, repo: pathlib.Path) -> None:
        """The ``cursor`` in the output is the one returned by the hub call."""
        with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=42)):
            result = runner.invoke(cli, _PULL_ARGS + ["-j"])
        data = json.loads(result.output.strip())
        assert data["cursor"] == 42

    def test_zero_records_exits_0(self, repo: pathlib.Path) -> None:
        """0 records returned is a valid success."""
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            result = runner.invoke(cli, _PULL_ARGS + ["-j"])
        assert result.exit_code == 0
        data = json.loads(result.output.strip())
        assert data["count"] == 0

    def test_incremental_pull_since_id_forwarded(self, repo: pathlib.Path) -> None:
        """--since-id is passed through to pull_from_hub."""
        with patch(_PULL_TARGET, return_value=_pull_ok()) as mock_pull:
            runner.invoke(cli, _PULL_ARGS + ["--since-id", "99"])
        assert mock_pull.call_args is not None, "pull_from_hub was never called"
        pos_args, kw_args = mock_pull.call_args
        # Accept either calling convention: ``since_id=`` as a keyword, or the
        # fourth positional argument.  Check the keyword first so that a
        # keyword-style call does not trip an IndexError on the positionals.
        if "since_id" in kw_args:
            forwarded = kw_args["since_id"]
        else:
            assert len(pos_args) > 3, f"since_id not forwarded: {mock_pull.call_args!r}"
            forwarded = pos_args[3]
        assert forwarded == 99

    def test_records_written_to_remote_dir(self, repo: pathlib.Path) -> None:
        """Records returned by hub are written to .muse/coordination/remote/."""
        rid = _new_id()
        fake_records = [{"kind": "reservation", "record_id": rid, "run_id": "r1", "payload": {}}]
        with patch(_PULL_TARGET, return_value=_pull_ok(fake_records)):
            result = runner.invoke(cli, _PULL_ARGS)
        assert result.exit_code == 0
        written = coordination_dir(repo) / "remote" / "reservation" / f"{rid}.json"
        assert written.exists()

    def test_text_output_shows_owner_slug(self, repo: pathlib.Path) -> None:
        """Text output mentions both the owner and the slug."""
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            result = runner.invoke(cli, _PULL_ARGS)
        assert "gabriel" in result.output
        assert "myrepo" in result.output

    def test_text_output_shows_cursor(self, repo: pathlib.Path) -> None:
        """Text output contains the cursor value returned by the hub call."""
        with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=7)):
            result = runner.invoke(cli, _PULL_ARGS)
        assert "7" in result.output

    def test_text_output_shows_remote_path_when_records(self, repo: pathlib.Path) -> None:
        """When records are pulled, the text output mentions "remote"."""
        rid = _new_id()
        fake_records = [{"kind": "reservation", "record_id": rid, "run_id": "r1", "payload": {}}]
        with patch(_PULL_TARGET, return_value=_pull_ok(fake_records)):
            result = runner.invoke(cli, _PULL_ARGS)
        assert "remote" in result.output.lower()

    def test_help_shows_agent_quickstart(self, repo: pathlib.Path) -> None:
        """Help text contains the "Agent quickstart" section."""
        result = runner.invoke(cli, ["coord", "sync", "pull", "--help"])
        assert "Agent quickstart" in result.output

    def test_help_shows_json_schema(self, repo: pathlib.Path) -> None:
        """Help text contains the "JSON output schema" section."""
        result = runner.invoke(cli, ["coord", "sync", "pull", "--help"])
        assert "JSON output schema" in result.output

    def test_help_shows_exit_codes(self, repo: pathlib.Path) -> None:
        """Help text contains the "Exit codes" section."""
        result = runner.invoke(cli, ["coord", "sync", "pull", "--help"])
        assert "Exit codes" in result.output
1404
1405
1406 # ---------------------------------------------------------------------------
# Security — muse coord sync pull
1408 # ---------------------------------------------------------------------------
1409
1410
class TestCoordSyncPullSecurity:
    """Output-sanitisation, leak and path-traversal checks for ``muse coord sync pull``."""

    @staticmethod
    def _pull_argv(owner: str = "gabriel", slug: str = "myrepo", *extra: str) -> list[str]:
        """Build a ``coord sync pull`` argv with ``--since-id 0``, plus any *extra* flags."""
        argv = ["coord", "sync", "pull", "--hub", "https://localhost:1337"]
        argv.extend(["--owner", owner, "--slug", slug])
        argv.extend(["--since-id", "0"])
        argv.extend(extra)
        return argv

    def test_ansi_in_owner_sanitized_in_text_output(self, repo: pathlib.Path) -> None:
        """Escape sequences passed via ``--owner`` never reach the text output."""
        hostile_owner = "\x1b[31mmalicious\x1b[0m"
        argv = self._pull_argv(owner=hostile_owner)
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            outcome = runner.invoke(cli, argv)
        assert "\x1b" not in outcome.output

    def test_ansi_in_slug_sanitized_in_text_output(self, repo: pathlib.Path) -> None:
        """Escape sequences passed via ``--slug`` never reach the text output."""
        hostile_slug = "\x1b[32minjected\x1b[0m"
        argv = self._pull_argv(slug=hostile_slug)
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            outcome = runner.invoke(cli, argv)
        assert "\x1b" not in outcome.output

    def test_token_not_in_json_output(self, repo: pathlib.Path) -> None:
        """The secret string is absent from JSON-mode output."""
        secret = "super-secret-pull-token"
        # NOTE(review): `secret` is never handed to the CLI in this test, so
        # the assertion below cannot fail as written — confirm whether a
        # credential should be injected here.
        argv = self._pull_argv("gabriel", "myrepo", "--json")
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            outcome = runner.invoke(cli, argv)
        assert secret not in outcome.output

    def test_no_traceback_on_coord_bus_error(self, repo: pathlib.Path) -> None:
        """A ``CoordBusError`` from the hub call does not print a traceback."""
        hub_failure = CoordBusError("timeout")
        with patch(_PULL_TARGET, side_effect=hub_failure):
            outcome = runner.invoke(cli, _PULL_ARGS)
        assert "Traceback" not in outcome.output

    def test_remote_records_with_traversal_record_id_skipped(self, repo: pathlib.Path) -> None:
        """A ``record_id`` containing ``../`` is not written inside or outside ``remote/``."""
        hostile = {"kind": "reservation", "record_id": "../../malicious", "run_id": "r1", "payload": {}}
        with patch(_PULL_TARGET, return_value=_pull_ok([hostile])):
            outcome = runner.invoke(cli, _PULL_ARGS)
        assert outcome.exit_code == 0

        remote_root = coordination_dir(repo) / "remote"
        escaped = remote_root / "reservation" / "../../malicious.json"
        assert not escaped.exists()
        # Nothing at all may have been written below remote/.
        if remote_root.exists():
            leftovers = list(remote_root.rglob("*.json"))
            assert leftovers == []

    def test_remote_records_with_unknown_kind_skipped(self, repo: pathlib.Path) -> None:
        """A record whose ``kind`` is not a known kind leaves ``remote/`` empty."""
        hostile = {"kind": "../malicious_dir", "record_id": "safe-id", "run_id": "r1", "payload": {}}
        with patch(_PULL_TARGET, return_value=_pull_ok([hostile])):
            outcome = runner.invoke(cli, _PULL_ARGS)
        assert outcome.exit_code == 0

        remote_root = coordination_dir(repo) / "remote"
        if remote_root.exists():
            leftovers = list(remote_root.rglob("*.json"))
            assert leftovers == []
1488
1489
1490 # ---------------------------------------------------------------------------
# Stress — muse coord sync pull
1492 # ---------------------------------------------------------------------------
1493
1494
class TestCoordSyncPullStress:
    """Repetition, cursor-chaining and concurrency checks for ``muse coord sync pull``."""

    def test_50_sequential_pull_calls_no_records(self, repo: pathlib.Path) -> None:
        """50 sequential pulls with no records all exit 0."""
        for i in range(50):
            with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=i)):
                result = runner.invoke(cli, _PULL_ARGS + ["-j"])
            assert result.exit_code == 0, f"Call {i}: {result.output}"
            data = json.loads(result.output.strip())
            assert data["count"] == 0

    def test_incremental_cursor_chain_100_steps(self, repo: pathlib.Path) -> None:
        """100 incremental pulls each use the cursor from the previous step."""
        cursor = 0
        for i in range(100):
            rid = _new_id()
            fake_records = [{"kind": "reservation", "record_id": rid, "run_id": f"r{i}", "payload": {}}]
            # The mocked hub advances the cursor by exactly one per step.
            with patch(_PULL_TARGET, return_value=_pull_ok(fake_records, cursor=cursor + 1)):
                args = _PULL_ARGS + ["--since-id", str(cursor), "-j"]
                result = runner.invoke(cli, args)
            assert result.exit_code == 0, f"Step {i}: {result.output}"
            data = json.loads(result.output.strip())
            # Feed the reported cursor into the next iteration's --since-id.
            cursor = data["cursor"]
        assert cursor == 100

    def test_concurrent_pull_8_threads(self, repo: pathlib.Path) -> None:
        """8 threads each call run_pull concurrently; patches applied at test level."""
        import argparse
        import threading

        from muse.cli.commands.coord_sync import run_pull

        errors: list[str] = []

        def worker(idx: int) -> None:
            # Each thread builds its own namespace; failures are collected
            # rather than raised so the main thread can report them all.
            args = argparse.Namespace(
                hub="https://localhost:1337",
                owner="gabriel",
                slug="myrepo",
                signing=None,
                since_id=0,
                kinds=[],
                limit=500,
                json_out=True,
            )
            try:
                run_pull(args)
            except SystemExit as exc:
                if exc.code != 0:
                    errors.append(f"Thread {idx}: exit {exc.code}")
            except Exception as exc:
                errors.append(f"Thread {idx}: {exc}")

        pull_p = patch(_PULL_TARGET, return_value=_pull_ok([], cursor=0))
        repo_p = patch("muse.cli.commands.coord_sync.require_repo", return_value=repo)
        # Entering both patches through `with` guarantees that each one that
        # was started is also stopped, even if starting the second one fails.
        with pull_p, repo_p:
            threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
        assert not errors, f"Concurrent failures: {errors}"
1561
1562
1563 # ---------------------------------------------------------------------------
1564 # TestRegisterFlags — --json / -j normalized at argparse level
1565 # ---------------------------------------------------------------------------
1566
1567
1568 class TestRegisterFlags:
1569 """register() must expose --json/-j with dest=json_out on pull and push."""
1570
def _make_parser(self) -> "argparse.ArgumentParser":
    """Build a fresh root parser and let ``coord_sync.register`` populate it."""
    import argparse as ap
    from muse.cli.commands.coord_sync import register

    parser = ap.ArgumentParser()
    # ``register`` receives the sub-parsers action of the root parser.
    register(parser.add_subparsers())
    return parser
1578
1579 # --json/-j lives on the pull/push sub-subparsers, not on sync itself.
1580 _PULL_REQUIRED = ["sync", "pull", "--owner", "o", "--slug", "s"]
1581
def test_json_out_default_false(self) -> None:
    """With only the required pull arguments, ``json_out`` is False."""
    parsed = self._make_parser().parse_args(self._PULL_REQUIRED)
    assert parsed.json_out is False
1586
def test_json_out_true_with_json_flag(self) -> None:
    """Appending ``--json`` to the pull arguments sets ``json_out`` to True."""
    argv = [*self._PULL_REQUIRED, "--json"]
    parsed = self._make_parser().parse_args(argv)
    assert parsed.json_out is True
1591
def test_json_out_true_with_j_flag(self) -> None:
    """Appending the short ``-j`` flag sets ``json_out`` to True."""
    argv = [*self._PULL_REQUIRED, "-j"]
    parsed = self._make_parser().parse_args(argv)
    assert parsed.json_out is True