test_cmd_coord_sync.py
python
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
20 days ago
| 1 | """Comprehensive tests for ``muse coord sync push`` and ``muse coord sync pull``. |
| 2 | |
| 3 | Coverage matrix |
| 4 | --------------- |
| 5 | Unit |
| 6 | ~~~~ |
| 7 | * _gather_local_records — reads reservations from disk |
| 8 | * _gather_local_records — reads heartbeats from disk |
| 9 | * _gather_local_records — kinds filter respected |
| 10 | * _gather_local_records — corrupt file skipped gracefully |
| 11 | * _gather_local_records — claims use claimer_run_id field |
| 12 | * _gather_local_records — all 7 kinds gathered |
| 13 | * _write_remote_records — writes correct paths under remote/ |
| 14 | * _write_remote_records — overwrites existing files |
| 15 | * _write_remote_records — record with no record_id skipped |
| 16 | * _write_remote_records — unknown kind rejected (path traversal prevention) |
| 17 | * _write_remote_records — unsafe record_id rejected (path traversal prevention) |
| 18 | * _write_remote_records — compact JSON written (no indent) |
| 19 | |
| 20 | Integration (all network calls mocked) |
| 21 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| 22 | * push with no local records — text says "(no local coordination records to push)" |
| 23 | * push with reservations — calls push_to_hub with correct args |
| 24 | * push CoordBusError — exits 1 with error message |
| 25 | * pull empty result — prints "Pulled 0 new record(s)" |
| 26 | * pull with records — writes files to remote dir |
| 27 | * pull CoordBusError — exits 1 with error message |
| 28 | * --format json push — valid JSON with inserted/skipped/total/failed/elapsed/schema_version |
| 29 | * --format json pull — valid JSON with count/cursor/records/elapsed/schema_version |
| 30 | * --json shorthand push — same as --format json |
| 31 | * --json shorthand pull — same as --format json |
| 32 | * --since-id N — passed through to pull_from_hub |
| 33 | * --kinds filter push — restricts gathered kinds |
| 34 | * --limit N — passed through to pull_from_hub |
| 35 | * duration_ms present in both push and pull JSON output |
| 36 | |
| 37 | Input validation |
| 38 | ~~~~~~~~~~~~~~~~ |
| 39 | * --owner too long → exit 1 before any I/O |
| 40 | * --slug too long → exit 1 before any I/O |
| 41 | * --since-id negative → exit 1 before any I/O |
| 42 | * --limit = 0 → exit 1 before any I/O |
| 43 | * --limit > 1000 → exit 1 before any I/O |
| 44 | * --limit at boundary 1 → accepted |
| 45 | * --limit at boundary 1000 → accepted |
| 46 | * push owner/slug validation fires before require_repo |
| 47 | * pull since-id/limit validation fires before require_repo |
| 48 | |
| 49 | Security |
| 50 | ~~~~~~~~ |
| 51 | * owner/slug with path traversal chars are passed as strings to push_to_hub |
| 52 | * token not echoed in output |
| 53 | * _write_remote_records rejects unknown kind (prevents escaping remote/) |
| 54 | * _write_remote_records rejects traversal in record_id |
| 55 | |
| 56 | Stress |
| 57 | ~~~~~~ |
| 58 | * push 600 records splits into multiple batches |
| 59 | * pull returns 1000 records, all written to disk |
| 60 | """ |
| 61 | |
| 62 | from __future__ import annotations |
| 63 | |
| 64 | import itertools |
| 65 | import json |
| 66 | import pathlib |
| 67 | from typing import TYPE_CHECKING |
| 68 | |
| 69 | import pytest |
| 70 | from unittest.mock import patch, MagicMock, call |
| 71 | |
| 72 | from tests.cli_test_helper import CliRunner |
| 73 | from muse.core.types import MsgpackDict, content_hash |
| 74 | from muse.core.coord_bus import CoordBusError, JsonDict |
| 75 | |
# Module-wide counter; each call to _new_id() consumes the next value.
_id_seq = itertools.count()


def _new_id() -> str:
    """Return a fresh identifier: ``content_hash`` of the next counter value."""
    sequence_number = next(_id_seq)
    return content_hash({"seq": sequence_number})
| 81 | |
| 82 | if TYPE_CHECKING: |
| 83 | from muse.core.transport import SigningIdentity |
| 84 | from muse.core.paths import coordination_dir, muse_dir |
| 85 | from muse.cli.commands.coord_sync import ( |
| 86 | _MAX_OWNER_LEN, |
| 87 | _MAX_SLUG_LEN, |
| 88 | _MAX_PULL_LIMIT, |
| 89 | _ALL_KINDS, |
| 90 | ) |
| 91 | |
# Single runner instance shared by every CLI invocation in this module.
runner = CliRunner()
# Passed as the first argument of runner.invoke(); presumably the helper
# runner resolves the real CLI entry point itself — TODO confirm.
cli = None

# Dotted paths handed to unittest.mock.patch to stub the hub calls.
_PUSH_TARGET = "muse.cli.commands.coord_sync.push_to_hub"
_PULL_TARGET = "muse.cli.commands.coord_sync.pull_from_hub"
| 97 | |
| 98 | |
| 99 | # ── Fixtures ────────────────────────────────────────────────────────────────── |
| 100 | |
| 101 | |
@pytest.fixture()
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Create a minimal repository under ``tmp_path`` and return its root.

    The directory returned by ``muse_dir(tmp_path)`` is created with a ``HEAD``
    file pointing at ``refs/heads/main``, and ``MUSE_REPO_ROOT`` is set to the
    temporary root for the duration of the test.
    """
    muse_root = muse_dir(tmp_path)
    muse_root.mkdir()
    head_file = muse_root / "HEAD"
    head_file.write_text("ref: refs/heads/main\n")
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
    return tmp_path
| 109 | |
| 110 | |
| 111 | # ── Helpers ─────────────────────────────────────────────────────────────────── |
| 112 | |
| 113 | |
def _write_local_claim(repo: pathlib.Path, task_id: str | None = None, run_id: str = "worker-1") -> str:
    """Write a claim JSON file under ``claims/`` and return its task id.

    A falsy ``task_id`` is replaced by a freshly generated id. The file is
    named ``<task_id>.json`` and records ``run_id`` as ``claimer_run_id``.
    """
    claims_path = coordination_dir(repo) / "claims"
    claims_path.mkdir(parents=True, exist_ok=True)
    tid = task_id or _new_id()
    serialized = json.dumps(
        {
            "task_id": tid,
            "claimer_run_id": run_id,
            "claimed_at": "2026-01-01T00:00:00+00:00",
            "expires_at": "2026-12-31T00:00:00+00:00",
        }
    )
    claim_file = claims_path / f"{tid}.json"
    claim_file.write_text(serialized)
    return tid
| 126 | |
| 127 | |
def _write_local_reservation(repo: pathlib.Path, run_id: str = "agent-1") -> str:
    """Write a reservation JSON file under ``reservations/`` and return its id.

    The reservation id is freshly generated on every call; the file is named
    ``<reservation_id>.json``.
    """
    reservations_path = coordination_dir(repo) / "reservations"
    reservations_path.mkdir(parents=True, exist_ok=True)
    reservation_id = _new_id()
    serialized = json.dumps(
        {
            "reservation_id": reservation_id,
            "run_id": run_id,
            "branch": "main",
            "addresses": ["src/x.py::foo"],
            "operation": None,
            "created_at": "2026-01-01T00:00:00+00:00",
            "expires_at": "2026-12-31T00:00:00+00:00",
        }
    )
    reservation_file = reservations_path / f"{reservation_id}.json"
    reservation_file.write_text(serialized)
    return reservation_id
| 143 | |
| 144 | |
def _write_local_heartbeat(repo: pathlib.Path, run_id: str = "agent-1") -> str:
    """Write a heartbeat JSON file under ``heartbeats/`` and return ``run_id``.

    The file is named ``<run_id>.json``, so a second call with the same
    ``run_id`` overwrites the first.
    """
    heartbeats_path = coordination_dir(repo) / "heartbeats"
    heartbeats_path.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(
        {
            "run_id": run_id,
            "last_seen": "2026-01-01T00:01:00+00:00",
            "expires_at": "2026-12-31T00:00:00+00:00",
        }
    )
    heartbeat_file = heartbeats_path / f"{run_id}.json"
    heartbeat_file.write_text(serialized)
    return run_id
| 155 | |
| 156 | |
# Baseline argv for ``coord sync push``; tests append extra flags to it.
_PUSH_ARGS = [
    "coord",
    "sync",
    "push",
    "--hub",
    "https://localhost:1337",
    "--owner",
    "gabriel",
    "--slug",
    "myrepo",
]

# Baseline argv for ``coord sync pull``. The trailing ``--since-id 0`` pair is
# deliberately last: one test slices it off with ``_PULL_ARGS[:-2]``.
_PULL_ARGS = [
    "coord",
    "sync",
    "pull",
    "--hub",
    "https://localhost:1337",
    "--owner",
    "gabriel",
    "--slug",
    "myrepo",
    "--since-id",
    "0",
]
| 173 | |
| 174 | |
def _push_ok(inserted: int = 1, skipped: int = 0) -> MsgpackDict:
    """Build a fake push response carrying ``inserted`` and ``skipped`` counts."""
    return dict(inserted=inserted, skipped=skipped)
| 177 | |
| 178 | |
| 179 | def _pull_ok(records: list[MsgpackDict] | None = None, cursor: int = 0) -> MsgpackDict: |
| 180 | return {"records": records or [], "cursor": cursor} |
| 181 | |
| 182 | |
| 183 | # ── Unit: _gather_local_records ─────────────────────────────────────────────── |
| 184 | |
| 185 | |
class TestGatherLocalRecords:
    """Unit tests for ``_gather_local_records`` against files written to disk."""

    def test_empty_coordination_dir_returns_empty(self, repo: pathlib.Path) -> None:
        """With nothing written for the requested kind, the result is empty."""
        from muse.cli.commands.coord_sync import _gather_local_records

        assert _gather_local_records(repo, kinds=["reservation"]) == []

    def test_reads_reservation_from_disk(self, repo: pathlib.Path) -> None:
        """One reservation file yields one record carrying its reservation id."""
        from muse.cli.commands.coord_sync import _gather_local_records

        reservation_id = _write_local_reservation(repo)
        gathered = _gather_local_records(repo, kinds=["reservation"])

        assert len(gathered) == 1
        only = gathered[0]
        assert only["kind"] == "reservation"
        assert only["record_id"] == reservation_id

    def test_reads_heartbeat_from_disk(self, repo: pathlib.Path) -> None:
        """One heartbeat file yields one record carrying its run id."""
        from muse.cli.commands.coord_sync import _gather_local_records

        heartbeat_run = _write_local_heartbeat(repo, "hb-agent")
        gathered = _gather_local_records(repo, kinds=["heartbeat"])

        assert len(gathered) == 1
        only = gathered[0]
        assert only["kind"] == "heartbeat"
        assert only["run_id"] == heartbeat_run

    def test_kinds_filter_excludes_heartbeats(self, repo: pathlib.Path) -> None:
        """Asking only for reservations must not return heartbeat records."""
        from muse.cli.commands.coord_sync import _gather_local_records

        _write_local_reservation(repo)
        _write_local_heartbeat(repo)

        gathered = _gather_local_records(repo, kinds=["reservation"])
        assert all(rec["kind"] == "reservation" for rec in gathered)

    def test_kinds_filter_excludes_reservations(self, repo: pathlib.Path) -> None:
        """Asking only for heartbeats must not return reservation records."""
        from muse.cli.commands.coord_sync import _gather_local_records

        _write_local_reservation(repo)
        _write_local_heartbeat(repo)

        gathered = _gather_local_records(repo, kinds=["heartbeat"])
        assert all(rec["kind"] == "heartbeat" for rec in gathered)

    def test_corrupt_file_skipped_gracefully(self, repo: pathlib.Path) -> None:
        """A file that is not valid JSON contributes no record and raises nothing."""
        from muse.cli.commands.coord_sync import _gather_local_records

        reservations_path = coordination_dir(repo) / "reservations"
        reservations_path.mkdir(parents=True, exist_ok=True)
        broken_file = reservations_path / "bad.json"
        broken_file.write_text("not-valid-json{{{{")

        assert _gather_local_records(repo, kinds=["reservation"]) == []

    def test_multiple_records_all_returned(self, repo: pathlib.Path) -> None:
        """Five reservation files produce five records."""
        from muse.cli.commands.coord_sync import _gather_local_records

        for index in range(5):
            _write_local_reservation(repo, run_id=f"agent-{index}")

        gathered = _gather_local_records(repo, kinds=["reservation"])
        assert len(gathered) == 5

    def test_payload_field_contains_original_data(self, repo: pathlib.Path) -> None:
        """The record's ``payload`` carries the fields written to the file."""
        from muse.cli.commands.coord_sync import _gather_local_records

        reservation_id = _write_local_reservation(repo, run_id="my-agent")
        gathered = _gather_local_records(repo, kinds=["reservation"])

        payload = gathered[0]["payload"]
        assert payload["reservation_id"] == reservation_id
        assert payload["run_id"] == "my-agent"
| 243 | |
| 244 | |
| 245 | # ── Unit: _write_remote_records ─────────────────────────────────────────────── |
| 246 | |
| 247 | |
class TestWriteRemoteRecords:
    """Unit tests for ``_write_remote_records`` writing under ``remote/``."""

    def test_writes_file_to_correct_path(self, repo: pathlib.Path) -> None:
        """A record lands at ``remote/<kind>/<record_id>.json``."""
        from muse.cli.commands.coord_sync import _write_remote_records

        _write_remote_records(
            repo,
            [{"kind": "reservation", "record_id": "abc-123", "payload": {"x": 1}}],
        )

        expected = coordination_dir(repo) / "remote" / "reservation" / "abc-123.json"
        assert expected.exists()

    def test_written_file_contains_correct_data(self, repo: pathlib.Path) -> None:
        """The written JSON round-trips the record's payload."""
        from muse.cli.commands.coord_sync import _write_remote_records

        _write_remote_records(
            repo,
            [{"kind": "reservation", "record_id": "def-456", "payload": {"y": 2}}],
        )

        written = coordination_dir(repo) / "remote" / "reservation" / "def-456.json"
        parsed = json.loads(written.read_text())
        assert parsed["payload"]["y"] == 2

    def test_overwrites_existing_file(self, repo: pathlib.Path) -> None:
        """A pre-existing file for the same record id is replaced."""
        from muse.cli.commands.coord_sync import _write_remote_records

        reservation_dir = coordination_dir(repo) / "remote" / "reservation"
        reservation_dir.mkdir(parents=True, exist_ok=True)
        existing = reservation_dir / "ghi-789.json"
        existing.write_text('{"old": true}')

        _write_remote_records(
            repo,
            [{"kind": "reservation", "record_id": "ghi-789", "payload": {"new": True}}],
        )

        parsed = json.loads((reservation_dir / "ghi-789.json").read_text())
        assert parsed["payload"]["new"] is True

    def test_record_without_record_id_skipped(self, repo: pathlib.Path) -> None:
        """An empty ``record_id`` results in no JSON file under ``remote/``."""
        from muse.cli.commands.coord_sync import _write_remote_records

        _write_remote_records(repo, [{"kind": "reservation", "record_id": "", "payload": {}}])

        remote_root = coordination_dir(repo) / "remote"
        assert not (remote_root.exists() and any(remote_root.rglob("*.json")))

    def test_multiple_kinds_written_to_separate_dirs(self, repo: pathlib.Path) -> None:
        """Each kind gets its own sub-directory under ``remote/``."""
        from muse.cli.commands.coord_sync import _write_remote_records

        batch = [
            {"kind": "reservation", "record_id": "r1", "payload": {}},
            {"kind": "heartbeat", "record_id": "h1", "payload": {}},
        ]
        _write_remote_records(repo, batch)

        assert (coordination_dir(repo) / "remote" / "reservation" / "r1.json").exists()
        assert (coordination_dir(repo) / "remote" / "heartbeat" / "h1.json").exists()
| 290 | |
| 291 | |
| 292 | # ── Integration: push ───────────────────────────────────────────────────────── |
| 293 | |
| 294 | |
class TestCoordSyncPushIntegration:
    """``coord sync push`` driven through the CLI runner with ``push_to_hub`` patched."""

    def test_push_no_local_records_text_output(self, repo: pathlib.Path) -> None:
        """With no local records the hub is never called and a notice is printed."""
        with patch(_PUSH_TARGET) as hub_push:
            outcome = runner.invoke(cli, _PUSH_ARGS)

        assert outcome.exit_code == 0
        assert "no local coordination records to push" in outcome.output
        hub_push.assert_not_called()

    def test_push_with_reservation_calls_push_to_hub(self, repo: pathlib.Path) -> None:
        """One local reservation triggers exactly one hub call with owner and slug."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)) as hub_push:
            outcome = runner.invoke(cli, _PUSH_ARGS)

        assert outcome.exit_code == 0
        hub_push.assert_called_once()
        positional = hub_push.call_args[0]
        assert positional[1] == "gabriel"  # owner
        assert positional[2] == "myrepo"  # slug

    def test_push_text_output_contains_inserted_skipped(self, repo: pathlib.Path) -> None:
        """Text output mentions both the inserted and skipped counters."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS)

        assert outcome.exit_code == 0
        for word in ("inserted", "skipped"):
            assert word in outcome.output

    def test_push_coord_bus_error_exits_1(self, repo: pathlib.Path) -> None:
        """A ``CoordBusError`` from the hub exits 1 and reports on stderr."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, side_effect=CoordBusError("hub down")):
            outcome = runner.invoke(cli, _PUSH_ARGS)

        assert outcome.exit_code == 1
        assert "error" in outcome.stderr.lower() or "hub down" in outcome.stderr

    def test_push_format_json_valid_structure(self, repo: pathlib.Path) -> None:
        """``--json`` output parses and exposes the four summary keys."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["--json"])

        assert outcome.exit_code == 0
        summary = json.loads(outcome.output.strip())
        for key in ("inserted", "skipped", "total", "failed"):
            assert key in summary

    def test_push_json_shorthand(self, repo: pathlib.Path) -> None:
        """Two ``--json`` runs agree once the volatile fields are dropped."""
        # NOTE(review): both invocations pass the same flag, so this checks
        # run-to-run stability rather than equivalence of two spellings —
        # confirm whether a second spelling still exists.
        _write_local_reservation(repo)
        json_args = _PUSH_ARGS + ["--json"]
        with patch(_PUSH_TARGET, return_value=_push_ok(2, 1)):
            first_run = runner.invoke(cli, json_args)
            second_run = runner.invoke(cli, json_args)

        first = json.loads(first_run.output.strip())
        second = json.loads(second_run.output.strip())
        for volatile in ("timestamp", "duration_ms"):
            first.pop(volatile, None)
            second.pop(volatile, None)
        assert first == second

    def test_push_json_no_records_returns_zeros(self, repo: pathlib.Path) -> None:
        """With nothing to push, the JSON summary reports zero total and inserted."""
        with patch(_PUSH_TARGET):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["--json"])

        assert outcome.exit_code == 0
        summary = json.loads(outcome.output.strip())
        assert summary["total"] == 0
        assert summary["inserted"] == 0

    def test_push_kinds_filter_passed_through(self, repo: pathlib.Path) -> None:
        """``--kinds reservation`` keeps heartbeats out of the pushed batch."""
        _write_local_reservation(repo)
        _write_local_heartbeat(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)) as hub_push:
            outcome = runner.invoke(cli, _PUSH_ARGS + ["--kinds", "reservation"])

        assert outcome.exit_code == 0
        # The batch is the fourth positional argument of the hub call.
        pushed_batch = hub_push.call_args[0][3]
        assert all(rec["kind"] == "reservation" for rec in pushed_batch)

    def test_push_coord_bus_error_json_failed_true(self, repo: pathlib.Path) -> None:
        """On hub failure with ``--json`` the last JSON line has ``failed`` true."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, side_effect=CoordBusError("oops")):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["--json"])

        assert outcome.exit_code == 1
        json_lines = [line for line in outcome.output.splitlines() if line.startswith("{")]
        assert json_lines, "Expected at least one JSON output line"
        # An error object may be printed first; the summary is read from the end.
        last_object = json.loads(json_lines[-1])
        assert last_object["failed"] is True
| 381 | |
| 382 | |
| 383 | # ── Integration: pull ───────────────────────────────────────────────────────── |
| 384 | |
| 385 | |
class TestCoordSyncPullIntegration:
    """``coord sync pull`` driven through the CLI runner with ``pull_from_hub`` patched."""

    def test_pull_empty_result_exits_0(self, repo: pathlib.Path) -> None:
        """An empty hub response exits 0 and reports zero pulled records."""
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            outcome = runner.invoke(cli, _PULL_ARGS)

        assert outcome.exit_code == 0
        assert "Pulled 0 new record(s)" in outcome.output

    def test_pull_with_records_writes_files(self, repo: pathlib.Path) -> None:
        """A pulled record is written under ``remote/<kind>/``."""
        pulled = [{"kind": "reservation", "record_id": "r1", "payload": {"x": 1}}]
        with patch(_PULL_TARGET, return_value=_pull_ok(pulled, cursor=1)):
            outcome = runner.invoke(cli, _PULL_ARGS)

        assert outcome.exit_code == 0
        expected = coordination_dir(repo) / "remote" / "reservation" / "r1.json"
        assert expected.exists()

    def test_pull_text_output_contains_cursor(self, repo: pathlib.Path) -> None:
        """Text output echoes the cursor returned by the hub."""
        pulled = [{"kind": "reservation", "record_id": "r42", "payload": {}}]
        with patch(_PULL_TARGET, return_value=_pull_ok(pulled, cursor=42)):
            outcome = runner.invoke(cli, _PULL_ARGS)

        assert outcome.exit_code == 0
        assert "cursor: 42" in outcome.output

    def test_pull_coord_bus_error_exits_1(self, repo: pathlib.Path) -> None:
        """A ``CoordBusError`` from the hub exits 1 with its message on stderr."""
        with patch(_PULL_TARGET, side_effect=CoordBusError("connection refused")):
            outcome = runner.invoke(cli, _PULL_ARGS)

        assert outcome.exit_code == 1
        assert "connection refused" in outcome.stderr

    def test_pull_format_json_valid_structure(self, repo: pathlib.Path) -> None:
        """``--json`` output parses and exposes count, cursor and records."""
        with patch(_PULL_TARGET, return_value=_pull_ok(cursor=7)):
            outcome = runner.invoke(cli, _PULL_ARGS + ["--json"])

        assert outcome.exit_code == 0
        summary = json.loads(outcome.output.strip())
        for key in ("count", "cursor", "records"):
            assert key in summary

    def test_pull_json_shorthand(self, repo: pathlib.Path) -> None:
        """Two ``--json`` runs agree once the volatile fields are dropped."""
        # NOTE(review): both invocations pass the same flag, so this checks
        # run-to-run stability rather than equivalence of two spellings —
        # confirm whether a second spelling still exists.
        json_args = _PULL_ARGS + ["--json"]
        with patch(_PULL_TARGET, return_value=_pull_ok(cursor=3)):
            first_run = runner.invoke(cli, json_args)
            second_run = runner.invoke(cli, json_args)

        first = json.loads(first_run.output.strip())
        second = json.loads(second_run.output.strip())
        for volatile in ("timestamp", "duration_ms"):
            first.pop(volatile, None)
            second.pop(volatile, None)
        assert first == second

    def test_pull_since_id_passed_to_pull_from_hub(self, repo: pathlib.Path) -> None:
        """``--since-id 99`` reaches the hub call as positional argument 3."""
        # Drop the default trailing "--since-id 0" pair before adding our own.
        argv = _PULL_ARGS[:-2] + ["--since-id", "99"]
        with patch(_PULL_TARGET, return_value=_pull_ok()) as hub_pull:
            runner.invoke(cli, argv)

        assert hub_pull.called
        positional = hub_pull.call_args[0]
        assert positional[3] == 99  # since_id

    def test_pull_limit_passed_to_pull_from_hub(self, repo: pathlib.Path) -> None:
        """``--limit 42`` reaches the hub call as positional argument 5."""
        with patch(_PULL_TARGET, return_value=_pull_ok()) as hub_pull:
            runner.invoke(cli, _PULL_ARGS + ["--limit", "42"])

        positional = hub_pull.call_args[0]
        assert positional[5] == 42  # limit

    def test_pull_kinds_filter_passed_to_pull_from_hub(self, repo: pathlib.Path) -> None:
        """Both requested kinds appear in positional argument 4 of the hub call."""
        with patch(_PULL_TARGET, return_value=_pull_ok()) as hub_pull:
            runner.invoke(cli, _PULL_ARGS + ["--kinds", "reservation", "heartbeat"])

        requested_kinds = hub_pull.call_args[0][4]
        assert "reservation" in requested_kinds
        assert "heartbeat" in requested_kinds

    def test_pull_json_count_matches_records_length(self, repo: pathlib.Path) -> None:
        """The JSON ``count`` equals the number of records returned."""
        pulled = [
            {"kind": "reservation", "record_id": f"r{index}", "payload": {}}
            for index in range(5)
        ]
        with patch(_PULL_TARGET, return_value=_pull_ok(pulled, cursor=5)):
            outcome = runner.invoke(cli, _PULL_ARGS + ["--json"])

        summary = json.loads(outcome.output.strip())
        assert summary["count"] == 5
        assert len(summary["records"]) == 5

    def test_pull_signing_passed_to_pull_from_hub(self, repo: pathlib.Path) -> None:
        """With no identity configured, positional argument 6 (signing) is ``None``."""
        with patch(_PULL_TARGET, return_value=_pull_ok()) as hub_pull:
            runner.invoke(cli, _PULL_ARGS)

        positional = hub_pull.call_args[0]
        assert positional[6] is None  # signing (no identity configured in test)
| 471 | |
| 472 | |
| 473 | # ── Security ────────────────────────────────────────────────────────────────── |
| 474 | |
| 475 | |
class TestCoordSyncSecurity:
    """Security-oriented checks: traversal strings and secrets in output."""

    def test_push_traversal_owner_passed_as_string(self, repo: pathlib.Path) -> None:
        """An owner containing ``../`` is handed to ``push_to_hub`` unchanged."""
        _write_local_reservation(repo)
        malicious_owner = "../traversal"
        argv = [
            "coord", "sync", "push",
            "--hub", "https://localhost:1337",
            "--owner", malicious_owner,
            "--slug", "myrepo",
        ]
        with patch(_PUSH_TARGET, return_value=_push_ok()) as hub_push:
            runner.invoke(cli, argv)

        # NOTE(review): nothing is asserted when the hub was never called —
        # confirm that outcome is an acceptable pass.
        if hub_push.called:
            positional = hub_push.call_args[0]
            assert positional[1] == malicious_owner  # owner string passed verbatim

    def test_token_not_in_output(self, repo: pathlib.Path) -> None:
        """Push output must not contain the substring ``tok``."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS)

        assert "tok" not in outcome.output

    def test_pull_token_not_in_output(self, repo: pathlib.Path) -> None:
        """Pull output must not contain the substring ``tok``."""
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            outcome = runner.invoke(cli, _PULL_ARGS)

        assert "tok" not in outcome.output

    def test_write_remote_records_rejects_unknown_kind(self, repo: pathlib.Path) -> None:
        """A server-supplied kind of ``../../malicious`` writes nothing under ``remote/``."""
        from muse.cli.commands.coord_sync import _write_remote_records

        hostile = {"kind": "../../malicious", "record_id": "abc123", "payload": {}}
        _write_remote_records(repo, [hostile])

        remote_root = coordination_dir(repo) / "remote"
        assert not (remote_root.exists() and any(remote_root.rglob("*.json")))

    def test_write_remote_records_rejects_traversal_record_id(self, repo: pathlib.Path) -> None:
        """A server-supplied record id of ``../../../etc/passwd`` writes nothing under ``remote/``."""
        from muse.cli.commands.coord_sync import _write_remote_records

        hostile = {"kind": "reservation", "record_id": "../../../etc/passwd", "payload": {}}
        _write_remote_records(repo, [hostile])

        remote_root = coordination_dir(repo) / "remote"
        assert not (remote_root.exists() and any(remote_root.rglob("*.json")))
| 520 | |
| 521 | |
| 522 | # ── Unit: _gather_local_records — claim field name ───────────────────────────── |
| 523 | |
| 524 | |
class TestGatherLocalRecordsClaims:
    """Unit tests for how ``_gather_local_records`` maps claim files to records."""

    def test_claim_uses_claimer_run_id_field(self, repo: pathlib.Path) -> None:
        """Bug fix: claims must read 'claimer_run_id', not 'claimed_by'."""
        from muse.cli.commands.coord_sync import _gather_local_records

        # The returned task id is not needed here; only run_id is checked.
        _write_local_claim(repo, run_id="correct-worker")
        records = _gather_local_records(repo, kinds=["claim"])

        assert len(records) == 1
        assert records[0]["run_id"] == "correct-worker"

    def test_claim_record_id_is_task_id(self, repo: pathlib.Path) -> None:
        """The claim's ``task_id`` is surfaced as the record's ``record_id``."""
        from muse.cli.commands.coord_sync import _gather_local_records

        tid = _write_local_claim(repo)
        records = _gather_local_records(repo, kinds=["claim"])

        assert records[0]["record_id"] == tid

    def test_claim_expires_at_included(self, repo: pathlib.Path) -> None:
        """A gathered claim record carries a non-``None`` ``expires_at``."""
        from muse.cli.commands.coord_sync import _gather_local_records

        _write_local_claim(repo)
        records = _gather_local_records(repo, kinds=["claim"])

        assert records[0]["expires_at"] is not None
| 545 | |
| 546 | |
| 547 | # ── Unit: _write_remote_records — compact JSON ──────────────────────────────── |
| 548 | |
| 549 | |
class TestWriteRemoteRecordsCompact:
    """``_write_remote_records``: compact output plus kind / record-id validation."""

    def test_written_json_is_compact(self, repo: pathlib.Path) -> None:
        """The written file is single-line JSON (no indentation)."""
        from muse.cli.commands.coord_sync import _write_remote_records

        _write_remote_records(
            repo,
            [{"kind": "reservation", "record_id": "compact-test", "payload": {"x": 1}}],
        )

        written = coordination_dir(repo) / "remote" / "reservation" / "compact-test.json"
        body = written.read_text().strip()
        # A trailing newline is tolerated; interior newlines are not.
        assert "\n" not in body

    def test_valid_kinds_all_accepted(self, repo: pathlib.Path) -> None:
        """Every kind in ``_ALL_KINDS`` produces exactly one written file."""
        from muse.cli.commands.coord_sync import _write_remote_records

        batch = [
            {"kind": kind, "record_id": f"id-{position}", "payload": {}}
            for position, kind in enumerate(_ALL_KINDS)
        ]
        _write_remote_records(repo, batch)

        remote_root = coordination_dir(repo) / "remote"
        json_files = list(remote_root.rglob("*.json"))
        assert len(json_files) == len(_ALL_KINDS)

    def test_empty_record_id_still_skipped(self, repo: pathlib.Path) -> None:
        """An empty ``record_id`` writes nothing."""
        from muse.cli.commands.coord_sync import _write_remote_records

        _write_remote_records(repo, [{"kind": "reservation", "record_id": "", "payload": {}}])

        remote_root = coordination_dir(repo) / "remote"
        assert not (remote_root.exists() and any(remote_root.rglob("*.json")))

    def test_record_id_with_dots_rejected(self, repo: pathlib.Path) -> None:
        """A ``record_id`` starting with ``..`` writes nothing."""
        from muse.cli.commands.coord_sync import _write_remote_records

        _write_remote_records(
            repo,
            [{"kind": "reservation", "record_id": "..malicious", "payload": {}}],
        )

        remote_root = coordination_dir(repo) / "remote"
        assert not (remote_root.exists() and any(remote_root.rglob("*.json")))

    def test_record_id_with_slash_rejected(self, repo: pathlib.Path) -> None:
        """A ``record_id`` containing ``/`` writes nothing."""
        from muse.cli.commands.coord_sync import _write_remote_records

        _write_remote_records(repo, [{"kind": "reservation", "record_id": "a/b", "payload": {}}])

        remote_root = coordination_dir(repo) / "remote"
        assert not (remote_root.exists() and any(remote_root.rglob("*.json")))

    def test_record_id_too_long_rejected(self, repo: pathlib.Path) -> None:
        """A 129-character ``record_id`` writes nothing."""
        from muse.cli.commands.coord_sync import _write_remote_records

        oversized_id = "a" * 129
        _write_remote_records(
            repo,
            [{"kind": "reservation", "record_id": oversized_id, "payload": {}}],
        )

        remote_root = coordination_dir(repo) / "remote"
        assert not (remote_root.exists() and any(remote_root.rglob("*.json")))
| 602 | |
| 603 | |
| 604 | # ── Input validation ────────────────────────────────────────────────────────── |
| 605 | |
| 606 | |
| 607 | class TestSyncInputValidation: |
| 608 | def _push_args(self, owner: str = "gabriel", slug: str = "myrepo", extra: list[str] | None = None) -> list[str]: |
| 609 | args = [ |
| 610 | "coord", "sync", "push", |
| 611 | "--hub", "https://localhost:1337", |
| 612 | "--owner", owner, |
| 613 | "--slug", slug, |
| 614 | |
| 615 | ] |
| 616 | if extra: |
| 617 | args.extend(extra) |
| 618 | return args |
| 619 | |
| 620 | def _pull_args(self, owner: str = "gabriel", slug: str = "myrepo", extra: list[str] | None = None) -> list[str]: |
| 621 | args = [ |
| 622 | "coord", "sync", "pull", |
| 623 | "--hub", "https://localhost:1337", |
| 624 | "--owner", owner, |
| 625 | "--slug", slug, |
| 626 | |
| 627 | ] |
| 628 | if extra: |
| 629 | args.extend(extra) |
| 630 | return args |
| 631 | |
def test_push_owner_too_long_exits_1(self, repo: pathlib.Path) -> None:
    """An owner one character over ``_MAX_OWNER_LEN`` makes push exit 1."""
    oversized_owner = "x" * (_MAX_OWNER_LEN + 1)
    argv = self._push_args(owner=oversized_owner)
    outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 1
| 636 | |
def test_push_slug_too_long_exits_1(self, repo: pathlib.Path) -> None:
    """A slug one character over ``_MAX_SLUG_LEN`` makes push exit 1."""
    oversized_slug = "x" * (_MAX_SLUG_LEN + 1)
    argv = self._push_args(slug=oversized_slug)
    outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 1
| 641 | |
def test_push_owner_at_max_accepted(self, repo: pathlib.Path) -> None:
    """An owner of exactly ``_MAX_OWNER_LEN`` characters passes validation."""
    boundary_owner = "x" * _MAX_OWNER_LEN
    argv = self._push_args(owner=boundary_owner)
    with patch(_PUSH_TARGET, return_value=_push_ok()):
        outcome = runner.invoke(cli, argv)
    # No records were written, so a successful run exits 0 without pushing.
    assert outcome.exit_code == 0
| 648 | |
def test_push_slug_at_max_accepted(self, repo: pathlib.Path) -> None:
    """A slug of exactly ``_MAX_SLUG_LEN`` characters passes validation."""
    boundary_slug = "x" * _MAX_SLUG_LEN
    argv = self._push_args(slug=boundary_slug)
    with patch(_PUSH_TARGET, return_value=_push_ok()):
        outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 0
| 654 | |
def test_push_owner_too_long_json_error(self, repo: pathlib.Path) -> None:
    """With ``--json``, an oversized owner exits 1 and reports ``bad_args``."""
    oversized_owner = "x" * (_MAX_OWNER_LEN + 1)
    argv = self._push_args(owner=oversized_owner) + ["--json"]
    outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 1
    error_payload = json.loads(outcome.output.strip())
    assert error_payload["status"] == "bad_args"
| 661 | |
def test_pull_owner_too_long_exits_1(self, repo: pathlib.Path) -> None:
    """An owner one character over ``_MAX_OWNER_LEN`` makes pull exit 1."""
    oversized_owner = "x" * (_MAX_OWNER_LEN + 1)
    argv = self._pull_args(owner=oversized_owner)
    outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 1
| 666 | |
def test_pull_slug_too_long_exits_1(self, repo: pathlib.Path) -> None:
    """Pull exits 1 when --slug is one character over ``_MAX_SLUG_LEN``."""
    oversized = "x" * (_MAX_SLUG_LEN + 1)
    argv = self._pull_args(slug=oversized)
    outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 1
| 671 | |
def test_pull_since_id_negative_exits_1(self, repo: pathlib.Path) -> None:
    """Pull exits 1 when --since-id is negative."""
    argv = self._pull_args(extra=["--since-id", "-1"])
    outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 1
| 675 | |
def test_pull_since_id_negative_json_error(self, repo: pathlib.Path) -> None:
    """With --json, a negative --since-id exits 1 and reports status ``bad_args``."""
    argv = self._pull_args(extra=["--since-id", "-1", "--json"])
    outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 1
    payload = json.loads(outcome.output.strip())
    assert payload["status"] == "bad_args"
| 681 | |
def test_pull_limit_zero_exits_1(self, repo: pathlib.Path) -> None:
    """Pull exits 1 when --limit is 0."""
    argv = self._pull_args(extra=["--limit", "0"])
    outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 1
| 685 | |
def test_pull_limit_over_max_exits_1(self, repo: pathlib.Path) -> None:
    """Pull exits 1 when --limit is one above ``_MAX_PULL_LIMIT``."""
    too_many = str(_MAX_PULL_LIMIT + 1)
    outcome = runner.invoke(cli, self._pull_args(extra=["--limit", too_many]))
    assert outcome.exit_code == 1
| 689 | |
def test_pull_limit_at_min_accepted(self, repo: pathlib.Path) -> None:
    """Pull exits 0 with --limit 1 when the hub call is mocked to succeed."""
    argv = self._pull_args(extra=["--limit", "1"])
    with patch(_PULL_TARGET, return_value=_pull_ok()):
        outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 0
| 694 | |
def test_pull_limit_at_max_accepted(self, repo: pathlib.Path) -> None:
    """Pull exits 0 with --limit set to exactly ``_MAX_PULL_LIMIT``."""
    ceiling = str(_MAX_PULL_LIMIT)
    with patch(_PULL_TARGET, return_value=_pull_ok()):
        outcome = runner.invoke(cli, self._pull_args(extra=["--limit", ceiling]))
    assert outcome.exit_code == 0
| 699 | |
def test_pull_limit_over_max_json_error(self, repo: pathlib.Path) -> None:
    """With --json, an over-max --limit exits 1 and reports status ``bad_args``."""
    too_many = str(_MAX_PULL_LIMIT + 1)
    argv = self._pull_args(extra=["--limit", too_many, "--json"])
    outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 1
    payload = json.loads(outcome.output.strip())
    assert payload["status"] == "bad_args"
| 708 | |
def test_push_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """An over-long --owner exits 1 even though no repository is present.

    ``MUSE_REPO_ROOT`` is pointed at a bare temp directory (no ``.muse``
    dir), so the command has no repo to find.
    """
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
    argv = [
        "coord", "sync", "push",
        "--hub", "https://localhost:1337",
        "--owner", "x" * (_MAX_OWNER_LEN + 1),
        "--slug", "myrepo",
    ]
    outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 1
| 721 | |
def test_pull_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """A negative --since-id exits 1 even though no repository is present.

    ``MUSE_REPO_ROOT`` is pointed at a bare temp directory (no ``.muse``
    dir), so the command has no repo to find.
    """
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
    argv = [
        "coord", "sync", "pull",
        "--hub", "https://localhost:1337",
        "--owner", "gabriel",
        "--slug", "myrepo",
        "--since-id", "-5",
    ]
    outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 1
| 733 | |
| 734 | |
| 735 | # ── JSON schema: schema_version and duration_ms ────────────────────────── |
| 736 | |
| 737 | |
class TestSyncJsonSchema:
    """Checks that sync output carries the ``schema`` and ``duration_ms`` fields."""

    def test_push_json_includes_schema_version(self, repo: pathlib.Path) -> None:
        """Push ``--json`` output contains a ``schema`` key."""
        _write_local_reservation(repo)
        argv = _PUSH_ARGS + ["--json"]
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output.strip())
        assert "schema" in payload

    def test_push_json_includes_duration_ms(self, repo: pathlib.Path) -> None:
        """Push ``--json`` output contains a float ``duration_ms``."""
        _write_local_reservation(repo)
        argv = _PUSH_ARGS + ["--json"]
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output.strip())
        assert "duration_ms" in payload
        assert isinstance(payload["duration_ms"], float)

    def test_push_json_no_records_includes_elapsed(self, repo: pathlib.Path) -> None:
        """Push ``--json`` with no local records still reports ``duration_ms``."""
        argv = _PUSH_ARGS + ["--json"]
        with patch(_PUSH_TARGET):
            outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output.strip())
        assert "duration_ms" in payload

    def test_pull_json_includes_schema_version(self, repo: pathlib.Path) -> None:
        """Pull ``--json`` output contains a ``schema`` key."""
        argv = _PULL_ARGS + ["--json"]
        with patch(_PULL_TARGET, return_value=_pull_ok(cursor=3)):
            outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output.strip())
        assert "schema" in payload

    def test_pull_json_includes_duration_ms(self, repo: pathlib.Path) -> None:
        """Pull ``--json`` output contains a float ``duration_ms``."""
        argv = _PULL_ARGS + ["--json"]
        with patch(_PULL_TARGET, return_value=_pull_ok(cursor=3)):
            outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output.strip())
        assert "duration_ms" in payload
        assert isinstance(payload["duration_ms"], float)

    def test_push_text_includes_elapsed(self, repo: pathlib.Path) -> None:
        """Push text output contains the ``s)`` tail of the elapsed figure."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS)
        assert outcome.exit_code == 0
        # The elapsed time is expected in the form "(Ns)".
        assert "s)" in outcome.output

    def test_pull_text_includes_elapsed(self, repo: pathlib.Path) -> None:
        """Pull text output contains the ``s)`` tail of the elapsed figure."""
        with patch(_PULL_TARGET, return_value=_pull_ok(cursor=1)):
            outcome = runner.invoke(cli, _PULL_ARGS)
        assert outcome.exit_code == 0
        assert "s)" in outcome.output
| 791 | |
| 792 | |
| 793 | # ── Gather all 7 kinds ──────────────────────────────────────────────────────── |
| 794 | |
| 795 | |
class TestGatherAllKinds:
    """Exercises ``_gather_local_records`` against records of several kinds."""

    @staticmethod
    def _persist(repo: pathlib.Path, subdir: str, stem: str, body: dict[str, object]) -> str:
        """Write ``body`` as ``<coordination_dir>/<subdir>/<stem>.json``; return ``stem``."""
        folder = coordination_dir(repo) / subdir
        folder.mkdir(parents=True, exist_ok=True)
        (folder / f"{stem}.json").write_text(json.dumps(body))
        return stem

    def _write_intent(self, repo: pathlib.Path) -> str:
        """Create one intent record on disk and return its id."""
        intent_id = _new_id()
        body = {"intent_id": intent_id, "run_id": "agent", "expires_at": None}
        return self._persist(repo, "intents", intent_id, body)

    def _write_release(self, repo: pathlib.Path) -> str:
        """Create one release record on disk and return its id."""
        release_id = _new_id()
        body = {"release_id": release_id, "run_id": "agent"}
        return self._persist(repo, "releases", release_id, body)

    def _write_dependency(self, repo: pathlib.Path) -> str:
        """Create one dependency record on disk and return its id."""
        reservation_id = _new_id()
        body = {"reservation_id": reservation_id}
        return self._persist(repo, "dependencies", reservation_id, body)

    def _write_task(self, repo: pathlib.Path) -> str:
        """Create one task record on disk and return its id."""
        task_id = _new_id()
        body = {"task_id": task_id, "run_id": "creator"}
        return self._persist(repo, "tasks", task_id, body)

    def test_all_kinds_gathered(self, repo: pathlib.Path) -> None:
        """With one record of each kind on disk, every kind in ``_ALL_KINDS`` is returned."""
        from muse.cli.commands.coord_sync import _gather_local_records

        _write_local_reservation(repo)
        _write_local_heartbeat(repo, "hb-1")
        for writer in (
            self._write_intent,
            self._write_release,
            self._write_dependency,
            self._write_task,
        ):
            writer(repo)
        _write_local_claim(repo)

        gathered = _gather_local_records(repo, kinds=list(_ALL_KINDS))
        seen_kinds = {entry["kind"] for entry in gathered}
        assert seen_kinds == set(_ALL_KINDS)

    def test_each_kind_has_correct_record_id(self, repo: pathlib.Path) -> None:
        """A gathered reservation carries the id it was written with."""
        from muse.cli.commands.coord_sync import _gather_local_records

        expected_id = _write_local_reservation(repo)
        gathered = _gather_local_records(repo, kinds=["reservation"])
        assert gathered[0]["record_id"] == expected_id

    def test_release_has_none_expires_at(self, repo: pathlib.Path) -> None:
        """A gathered release has ``expires_at`` set to ``None``."""
        from muse.cli.commands.coord_sync import _gather_local_records

        self._write_release(repo)
        gathered = _gather_local_records(repo, kinds=["release"])
        assert gathered[0]["expires_at"] is None

    def test_dependency_has_none_expires_at(self, repo: pathlib.Path) -> None:
        """A gathered dependency has ``expires_at`` set to ``None``."""
        from muse.cli.commands.coord_sync import _gather_local_records

        self._write_dependency(repo)
        gathered = _gather_local_records(repo, kinds=["dependency"])
        assert gathered[0]["expires_at"] is None

    def test_task_has_none_expires_at(self, repo: pathlib.Path) -> None:
        """A gathered task has ``expires_at`` set to ``None``."""
        from muse.cli.commands.coord_sync import _gather_local_records

        self._write_task(repo)
        gathered = _gather_local_records(repo, kinds=["task"])
        assert gathered[0]["expires_at"] is None
| 865 | |
| 866 | |
| 867 | # ── Stress tests ────────────────────────────────────────────────────────────── |
| 868 | |
| 869 | |
class TestSyncStress:
    """Larger-volume push/pull scenarios with the hub calls mocked."""

    def test_push_600_records_batched(self, repo: pathlib.Path) -> None:
        """600 records must be split across ≥ 2 batches of MAX_PUSH_BATCH."""
        from muse.core.coord_bus import MAX_PUSH_BATCH

        for i in range(600):
            _write_local_reservation(repo, run_id=f"agent-{i}")

        batch_sizes: list[int] = []

        def fake_push(hub: str, owner: str, slug: str, batch: list[JsonDict], token: SigningIdentity | None) -> MsgpackDict:
            # Only record the size here: an assert raised inside the mocked
            # call would surface as an opaque non-zero exit code instead of a
            # readable test failure.
            batch_sizes.append(len(batch))
            return {"inserted": len(batch), "skipped": 0}

        with patch(_PUSH_TARGET, side_effect=fake_push):
            result = runner.invoke(cli, _PUSH_ARGS + ["--kinds", "reservation"])
        assert all(size <= MAX_PUSH_BATCH for size in batch_sizes), batch_sizes
        assert result.exit_code == 0
        assert len(batch_sizes) >= 2

    def test_push_600_records_inserted_count_correct(self, repo: pathlib.Path) -> None:
        """Pushing 600 reservations reports ``total`` == 600 in the JSON output."""
        for i in range(600):
            _write_local_reservation(repo, run_id=f"agent-{i}")
        with patch(_PUSH_TARGET, return_value=_push_ok(inserted=1, skipped=0)):
            result = runner.invoke(cli, _PUSH_ARGS + ["--kinds", "reservation", "--json"])
        assert result.exit_code == 0
        data = json.loads(result.output.strip())
        assert data["total"] == 600

    def test_pull_1000_records_all_written(self, repo: pathlib.Path) -> None:
        """1000 pulled reservations produce 1000 JSON files under remote/reservation."""
        records = [
            {"kind": "reservation", "record_id": _new_id(), "payload": {}}
            for _ in range(1000)
        ]
        with patch(_PULL_TARGET, return_value=_pull_ok(records, cursor=1000)):
            result = runner.invoke(cli, _PULL_ARGS + ["--limit", "1000"])
        assert result.exit_code == 0
        remote_dir = coordination_dir(repo) / "remote" / "reservation"
        written = list(remote_dir.glob("*.json"))
        assert len(written) == 1000

    def test_pull_mixed_invalid_records_skipped(self, repo: pathlib.Path) -> None:
        """Records with invalid kind/record_id are skipped; valid ones still written."""
        good = {"kind": "reservation", "record_id": "good-record-1", "payload": {}}
        bad_kind = {"kind": "../traversal", "record_id": "malicious-id", "payload": {}}
        bad_id = {"kind": "reservation", "record_id": "../etc/passwd", "payload": {}}
        with patch(_PULL_TARGET, return_value=_pull_ok([good, bad_kind, bad_id], cursor=3)):
            result = runner.invoke(cli, _PULL_ARGS)
        assert result.exit_code == 0
        good_file = coordination_dir(repo) / "remote" / "reservation" / "good-record-1.json"
        assert good_file.exists()
        # Only 1 file written (the good record)
        remote_dir = coordination_dir(repo) / "remote"
        all_files = list(remote_dir.rglob("*.json"))
        assert len(all_files) == 1

    def test_push_partial_failure_reports_failed_true(self, repo: pathlib.Path) -> None:
        """If one batch fails, failed=True in JSON even if other batches succeed."""
        from muse.core.coord_bus import MAX_PUSH_BATCH

        # Write enough for 2 batches
        for i in range(MAX_PUSH_BATCH + 1):
            _write_local_reservation(repo, run_id=f"agent-{i}")

        call_count = 0

        def sometimes_fail(hub: str, owner: str, slug: str, batch: list[JsonDict], token: SigningIdentity | None) -> MsgpackDict:
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                raise CoordBusError("first batch failed")
            return {"inserted": len(batch), "skipped": 0}

        with patch(_PUSH_TARGET, side_effect=sometimes_fail):
            result = runner.invoke(
                cli, _PUSH_ARGS + ["--kinds", "reservation", "--json"]
            )
        assert result.exit_code == 1
        # Find JSON line (error from first batch goes to stdout too in JSON mode)
        json_lines = [ln for ln in result.output.splitlines() if ln.startswith("{")]
        # Fail with the captured output rather than a bare IndexError when no
        # JSON line was printed at all.
        assert json_lines, f"No JSON line in output: {result.output!r}"
        final = json.loads(json_lines[-1])
        assert final["failed"] is True
| 949 | |
| 950 | |
| 951 | # --------------------------------------------------------------------------- |
| 952 | # Extended — muse coord sync push |
| 953 | # --------------------------------------------------------------------------- |
| 954 | |
| 955 | |
class TestCoordSyncPushExtended:
    """Additional output-shape and help-text checks for ``coord sync push``."""

    def test_j_alias_works(self, repo: pathlib.Path) -> None:
        """-j is accepted by push and yields JSON containing ``inserted``."""
        with patch(_PUSH_TARGET, return_value=_push_ok(0, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        assert outcome.exit_code == 0, outcome.output
        payload = json.loads(outcome.output.strip())
        assert "inserted" in payload

    def test_help_flag(self, repo: pathlib.Path) -> None:
        """``push --help`` exits 0."""
        outcome = runner.invoke(cli, ["coord", "sync", "push", "--help"])
        assert outcome.exit_code == 0

    def test_json_compact_single_line(self, repo: pathlib.Path) -> None:
        """JSON output occupies exactly one line that starts with ``{``."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        assert outcome.exit_code == 0
        brace_lines = [
            line for line in outcome.output.splitlines() if line.strip().startswith("{")
        ]
        assert len(brace_lines) == 1, f"Expected compact JSON, got: {outcome.output!r}"

    def test_json_all_required_fields(self, repo: pathlib.Path) -> None:
        """JSON has schema, inserted, skipped, total, failed and duration_ms."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        payload = json.loads(outcome.output.strip())
        required = ("schema", "inserted", "skipped", "total", "failed", "duration_ms")
        for field in required:
            assert field in payload, f"Missing field: {field}"

    def test_json_inserted_is_int(self, repo: pathlib.Path) -> None:
        """``inserted``, ``skipped`` and ``total`` are all ints."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        payload = json.loads(outcome.output.strip())
        assert isinstance(payload["inserted"], int)
        assert isinstance(payload["skipped"], int)
        assert isinstance(payload["total"], int)

    def test_json_failed_is_bool(self, repo: pathlib.Path) -> None:
        """``failed`` is the boolean ``False`` on a clean push."""
        with patch(_PUSH_TARGET, return_value=_push_ok(0, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        payload = json.loads(outcome.output.strip())
        assert isinstance(payload["failed"], bool)
        assert payload["failed"] is False

    def test_json_duration_ms_is_number(self, repo: pathlib.Path) -> None:
        """``duration_ms`` is a non-negative int or float."""
        with patch(_PUSH_TARGET, return_value=_push_ok(0, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        payload = json.loads(outcome.output.strip())
        assert isinstance(payload["duration_ms"], (int, float))
        assert payload["duration_ms"] >= 0

    def test_json_schema_is_int(self, repo: pathlib.Path) -> None:
        """``schema`` is an int of at least 1."""
        with patch(_PUSH_TARGET, return_value=_push_ok(0, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        payload = json.loads(outcome.output.strip())
        assert isinstance(payload["schema"], int)
        assert payload["schema"] >= 1

    def test_json_total_matches_gathered_records(self, repo: pathlib.Path) -> None:
        """Two local reservations give ``total`` == 2."""
        for _ in range(2):
            _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(2, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        payload = json.loads(outcome.output.strip())
        assert payload["total"] == 2

    def test_json_no_records_total_is_zero(self, repo: pathlib.Path) -> None:
        """With nothing to push, total/inserted/skipped are all 0."""
        outcome = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output.strip())
        assert payload["total"] == 0
        assert payload["inserted"] == 0
        assert payload["skipped"] == 0

    def test_idempotent_second_push_all_skipped(self, repo: pathlib.Path) -> None:
        """When the mocked hub reports everything skipped, the JSON mirrors it."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(0, 1)):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        payload = json.loads(outcome.output.strip())
        assert payload["skipped"] == 1
        assert payload["inserted"] == 0

    def test_all_7_kinds_pushed(self, repo: pathlib.Path) -> None:
        """One record file in each of the 7 kind directories gives ``total`` == 7."""
        root = coordination_dir(repo)
        # (directory name, id field, run field) for every kind directory.
        layout = [
            ("reservations", "reservation_id", "run_id"),
            ("heartbeats", "run_id", "run_id"),
            ("intents", "intent_id", "run_id"),
            ("releases", "release_id", "run_id"),
            ("dependencies", "reservation_id", "reservation_id"),
            ("tasks", "task_id", "run_id"),
            ("claims", "task_id", "claimer_run_id"),
        ]
        for subdir, id_field, run_field in layout:
            folder = root / subdir
            folder.mkdir(parents=True, exist_ok=True)
            record_id = _new_id()
            # When both field names coincide the later "r1" entry wins.
            body = json.dumps({id_field: record_id, run_field: "r1"})
            (folder / f"{record_id}.json").write_text(body)

        with patch(_PUSH_TARGET, return_value=_push_ok(7, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output.strip())
        assert payload["total"] == 7

    def test_text_output_shows_owner_slug(self, repo: pathlib.Path) -> None:
        """Text output mentions both ``gabriel`` and ``myrepo``."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS)
        for expected in ("gabriel", "myrepo"):
            assert expected in outcome.output

    def test_text_output_shows_checkmark_on_success(self, repo: pathlib.Path) -> None:
        """A successful push prints the check-mark emoji."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, _PUSH_ARGS)
        assert "✅" in outcome.output

    def test_text_output_shows_cross_on_failure(self, repo: pathlib.Path) -> None:
        """A ``CoordBusError`` shows the cross emoji on stderr or exits 1."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, side_effect=CoordBusError("hub down")):
            outcome = runner.invoke(cli, _PUSH_ARGS)
        # NOTE(review): because of the ``or``, exit code 1 alone satisfies
        # this assertion, so the emoji is not strictly required — confirm
        # whether that leniency is intended.
        assert "❌" in outcome.stderr or outcome.exit_code == 1

    def test_help_shows_agent_quickstart(self, repo: pathlib.Path) -> None:
        """``push --help`` contains the ``Agent quickstart`` heading."""
        outcome = runner.invoke(cli, ["coord", "sync", "push", "--help"])
        assert "Agent quickstart" in outcome.output

    def test_help_shows_json_schema(self, repo: pathlib.Path) -> None:
        """``push --help`` contains the ``JSON output schema`` heading."""
        outcome = runner.invoke(cli, ["coord", "sync", "push", "--help"])
        assert "JSON output schema" in outcome.output

    def test_help_shows_exit_codes(self, repo: pathlib.Path) -> None:
        """``push --help`` contains the ``Exit codes`` heading."""
        outcome = runner.invoke(cli, ["coord", "sync", "push", "--help"])
        assert "Exit codes" in outcome.output
| 1096 | |
| 1097 | |
| 1098 | # --------------------------------------------------------------------------- |
| 1099 | # Security — muse coord sync push |
| 1100 | # --------------------------------------------------------------------------- |
| 1101 | |
| 1102 | |
class TestCoordSyncPushSecurity:
    """Output-hygiene checks for ``coord sync push`` (escape codes, secrets, tracebacks)."""

    @staticmethod
    def _argv(owner: str = "gabriel", slug: str = "myrepo", *tail: str) -> list[str]:
        """Return a push argv for the local test hub, followed by ``tail``."""
        return [
            "coord", "sync", "push",
            "--hub", "https://localhost:1337",
            "--owner", owner,
            "--slug", slug,
            *tail,
        ]

    def test_ansi_in_owner_sanitized_in_text_output(self, repo: pathlib.Path) -> None:
        """No ESC byte reaches the output when --owner contains ANSI codes."""
        _write_local_reservation(repo)
        argv = self._argv(owner="\x1b[31mmalicious\x1b[0m")
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, argv)
        assert "\x1b" not in outcome.output

    def test_ansi_in_slug_sanitized_in_text_output(self, repo: pathlib.Path) -> None:
        """No ESC byte reaches the output when --slug contains ANSI codes."""
        _write_local_reservation(repo)
        argv = self._argv(slug="\x1b[32minjected\x1b[0m")
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, argv)
        assert "\x1b" not in outcome.output

    def test_token_not_in_json_output(self, repo: pathlib.Path) -> None:
        """The secret string must not appear in JSON output."""
        _write_local_reservation(repo)
        secret = "super-secret-token-xyz"
        # NOTE(review): ``secret`` is never handed to the CLI in this argv,
        # so the assertion cannot fail through a leak — confirm which option
        # (if any) should carry it.
        argv = self._argv("gabriel", "myrepo", "--json")
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, argv)
        assert secret not in outcome.output

    def test_token_not_in_text_output(self, repo: pathlib.Path) -> None:
        """The secret string must not appear in text output."""
        _write_local_reservation(repo)
        secret = "super-secret-token-abc"
        # NOTE(review): ``secret`` is never handed to the CLI in this argv,
        # so the assertion cannot fail through a leak — confirm which option
        # (if any) should carry it.
        argv = self._argv()
        with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)):
            outcome = runner.invoke(cli, argv)
        assert secret not in outcome.output

    def test_no_traceback_on_coord_bus_error(self, repo: pathlib.Path) -> None:
        """A ``CoordBusError`` from the hub call prints no ``Traceback``."""
        _write_local_reservation(repo)
        with patch(_PUSH_TARGET, side_effect=CoordBusError("network failure")):
            outcome = runner.invoke(cli, _PUSH_ARGS)
        assert "Traceback" not in outcome.output

    def test_owner_length_cap_before_io(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """An over-long --owner exits 1 with no traceback when the repo root is a bare temp dir."""
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
        argv = self._argv(owner="x" * (_MAX_OWNER_LEN + 1))
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 1
        assert "Traceback" not in outcome.output
| 1183 | |
| 1184 | |
| 1185 | # --------------------------------------------------------------------------- |
| 1186 | # Stress — muse coord sync push |
| 1187 | # --------------------------------------------------------------------------- |
| 1188 | |
| 1189 | |
class TestCoordSyncPushStress:
    """Repetition, volume and concurrency scenarios for ``coord sync push``."""

    def test_50_sequential_push_calls_no_records(self, repo: pathlib.Path) -> None:
        """50 sequential pushes with no records all exit 0."""
        for i in range(50):
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
            assert result.exit_code == 0, f"Call {i}: {result.output}"
            data = json.loads(result.output.strip())
            assert data["total"] == 0

    def test_push_1200_records_correct_batch_count(self, repo: pathlib.Path) -> None:
        """1200 records → ceil(1200 / MAX_PUSH_BATCH) calls to the hub."""
        from muse.core.coord_bus import MAX_PUSH_BATCH

        for i in range(1200):
            _write_local_reservation(repo, run_id=f"agent-{i}")
        call_count = 0

        def counting_push(hub: str, owner: str, slug: str, batch: list[JsonDict], token: SigningIdentity | None) -> MsgpackDict:
            nonlocal call_count
            call_count += 1
            return {"inserted": len(batch), "skipped": 0}

        with patch(_PUSH_TARGET, side_effect=counting_push):
            result = runner.invoke(cli, _PUSH_ARGS + ["--kinds", "reservation", "-j"])
        assert result.exit_code == 0
        expected_batches = -(-1200 // MAX_PUSH_BATCH)  # ceil division
        assert call_count == expected_batches
        data = json.loads(result.output.strip())
        assert data["inserted"] == 1200

    def test_concurrent_push_8_threads(self, repo: pathlib.Path) -> None:
        """8 threads each call run_push directly; patches applied at test level.

        The goal is to verify that concurrent calls to run_push do not crash
        or corrupt internal state. All threads share the same repo fixture;
        per-thread module mutation is intentionally avoided here because
        unguarded write-then-restore of a module attribute across threads is a
        race condition that can leave the module permanently patched after the
        test completes, polluting later tests.
        """
        import argparse
        import threading

        from muse.cli.commands.coord_sync import run_push

        _write_local_reservation(repo, run_id="shared-agent")

        errors: list[str] = []

        def worker(idx: int) -> None:
            args = argparse.Namespace(
                hub="https://localhost:1337",
                owner="gabriel",
                slug="myrepo",
                signing=None,
                kinds=list(_ALL_KINDS),
                json_out=True,
            )
            try:
                run_push(args)
            except SystemExit as exc:
                # ``SystemExit`` with code None means a successful exit, just
                # like 0; only other codes count as failures.
                if exc.code not in (0, None):
                    errors.append(f"Thread {idx}: exit {exc.code}")
            except Exception as exc:
                errors.append(f"Thread {idx}: {exc}")

        def fake_push(hub: str, owner: str, slug: str, batch: list[JsonDict], token: SigningIdentity | None) -> MsgpackDict:
            return {"inserted": len(batch), "skipped": 0}

        # Entering both patchers through ``with`` guarantees each one that
        # started is undone, even if the second fails to start or a thread
        # operation raises.
        with patch(_PUSH_TARGET, side_effect=fake_push), patch(
            "muse.cli.commands.coord_sync.require_repo", return_value=repo
        ):
            threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
        assert not errors, f"Concurrent failures: {errors}"
| 1270 | |
| 1271 | |
| 1272 | # --------------------------------------------------------------------------- |
| 1273 | # Extended — muse coord sync pull |
| 1274 | # --------------------------------------------------------------------------- |
| 1275 | |
| 1276 | |
| 1277 | class TestCoordSyncPullExtended: |
def test_j_alias_works(self, repo: pathlib.Path) -> None:
    """-j is accepted by pull and yields JSON containing ``count``."""
    argv = _PULL_ARGS + ["-j"]
    with patch(_PULL_TARGET, return_value=_pull_ok()):
        outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 0, outcome.output
    payload = json.loads(outcome.output.strip())
    assert "count" in payload
| 1285 | |
def test_help_flag(self, repo: pathlib.Path) -> None:
    """``pull --help`` exits 0."""
    argv = ["coord", "sync", "pull", "--help"]
    outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 0
| 1289 | |
def test_json_compact_single_line(self, repo: pathlib.Path) -> None:
    """JSON output occupies exactly one line that starts with ``{``."""
    with patch(_PULL_TARGET, return_value=_pull_ok()):
        outcome = runner.invoke(cli, _PULL_ARGS + ["-j"])
    assert outcome.exit_code == 0
    brace_lines = [
        line for line in outcome.output.splitlines() if line.strip().startswith("{")
    ]
    assert len(brace_lines) == 1, f"Expected compact JSON, got: {outcome.output!r}"
| 1297 | |
def test_json_all_required_fields(self, repo: pathlib.Path) -> None:
    """JSON has schema, count, cursor, records and duration_ms."""
    with patch(_PULL_TARGET, return_value=_pull_ok()):
        outcome = runner.invoke(cli, _PULL_ARGS + ["-j"])
    payload = json.loads(outcome.output.strip())
    required = ("schema", "count", "cursor", "records", "duration_ms")
    for field in required:
        assert field in payload, f"Missing field: {field}"
| 1305 | |
def test_json_count_is_int(self, repo: pathlib.Path) -> None:
    """``count`` and ``cursor`` are both ints."""
    with patch(_PULL_TARGET, return_value=_pull_ok()):
        outcome = runner.invoke(cli, _PULL_ARGS + ["-j"])
    payload = json.loads(outcome.output.strip())
    assert isinstance(payload["count"], int)
    assert isinstance(payload["cursor"], int)
| 1312 | |
def test_json_records_is_list(self, repo: pathlib.Path) -> None:
    """``records`` is a list."""
    with patch(_PULL_TARGET, return_value=_pull_ok()):
        outcome = runner.invoke(cli, _PULL_ARGS + ["-j"])
    payload = json.loads(outcome.output.strip())
    assert isinstance(payload["records"], list)
| 1318 | |
def test_json_duration_ms_is_number(self, repo: pathlib.Path) -> None:
    """``duration_ms`` is a non-negative int or float."""
    with patch(_PULL_TARGET, return_value=_pull_ok()):
        outcome = runner.invoke(cli, _PULL_ARGS + ["-j"])
    payload = json.loads(outcome.output.strip())
    elapsed = payload["duration_ms"]
    assert isinstance(elapsed, (int, float))
    assert elapsed >= 0
| 1325 | |
def test_json_schema_is_int(self, repo: pathlib.Path) -> None:
    """``schema`` is an int of at least 1."""
    with patch(_PULL_TARGET, return_value=_pull_ok()):
        outcome = runner.invoke(cli, _PULL_ARGS + ["-j"])
    payload = json.loads(outcome.output.strip())
    version = payload["schema"]
    assert isinstance(version, int)
    assert version >= 1
| 1332 | |
def test_json_count_matches_records_length(self, repo: pathlib.Path) -> None:
    """Two records from the mocked hub give ``count`` == 2 and two ``records``."""
    hub_records = [
        {"kind": "reservation", "record_id": _new_id(), "run_id": run, "payload": {}}
        for run in ("r1", "r2")
    ]
    with patch(_PULL_TARGET, return_value=_pull_ok(hub_records, cursor=2)):
        outcome = runner.invoke(cli, _PULL_ARGS + ["-j"])
    payload = json.loads(outcome.output.strip())
    assert payload["count"] == 2
    assert len(payload["records"]) == 2
| 1343 | |
def test_json_cursor_reflects_hub_cursor(self, repo: pathlib.Path) -> None:
    """The ``cursor`` in the JSON payload equals the cursor of the mocked hub reply."""
    hub_reply = _pull_ok([], cursor=42)
    with patch(_PULL_TARGET, return_value=hub_reply):
        outcome = runner.invoke(cli, _PULL_ARGS + ["-j"])
    payload = json.loads(outcome.output.strip())
    assert payload["cursor"] == 42
| 1349 | |
def test_zero_records_exits_0(self, repo: pathlib.Path) -> None:
    """An empty pull is a success: exit code 0 and ``count`` of 0."""
    argv = _PULL_ARGS + ["-j"]
    with patch(_PULL_TARGET, return_value=_pull_ok()):
        outcome = runner.invoke(cli, argv)
    assert outcome.exit_code == 0
    payload = json.loads(outcome.output.strip())
    assert payload["count"] == 0
| 1357 | |
def test_incremental_pull_since_id_forwarded(self, repo: pathlib.Path) -> None:
    """--since-id is passed through to pull_from_hub.

    The value is accepted either as the ``since_id`` keyword argument or as
    the fourth positional argument of the mocked hub call.
    """
    with patch(_PULL_TARGET, return_value=_pull_ok()) as mock_pull:
        runner.invoke(cli, _PULL_ARGS + ["--since-id", "99"])
    # Fail with a readable message instead of a TypeError from unpacking
    # ``call_args`` when the hub function was never reached.
    assert mock_pull.called, "pull_from_hub was not called"
    positional, keyword = mock_pull.call_args
    if "since_id" in keyword:
        forwarded = keyword["since_id"]
    else:
        # Only index the positional tuple once we know it is long enough;
        # indexing first would raise IndexError on a keyword-only call.
        assert len(positional) > 3, f"since_id not forwarded: {mock_pull.call_args!r}"
        forwarded = positional[3]
    assert forwarded == 99
| 1364 | |
def test_records_written_to_remote_dir(self, repo: pathlib.Path) -> None:
    """A pulled reservation lands at ``remote/reservation/<record_id>.json``.

    The path is resolved relative to ``coordination_dir(repo)``.
    """
    record_id = _new_id()
    hub_records = [
        {"kind": "reservation", "record_id": record_id, "run_id": "r1", "payload": {}},
    ]
    with patch(_PULL_TARGET, return_value=_pull_ok(hub_records)):
        outcome = runner.invoke(cli, _PULL_ARGS)
    assert outcome.exit_code == 0
    expected_file = coordination_dir(repo) / "remote" / "reservation" / f"{record_id}.json"
    assert expected_file.exists()
| 1374 | |
def test_text_output_shows_owner_slug(self, repo: pathlib.Path) -> None:
    """Text output contains both "gabriel" and "myrepo"."""
    with patch(_PULL_TARGET, return_value=_pull_ok()):
        text = runner.invoke(cli, _PULL_ARGS).output
    for expected in ("gabriel", "myrepo"):
        assert expected in text
| 1380 | |
def test_text_output_shows_cursor(self, repo: pathlib.Path) -> None:
    """Text output contains the cursor value returned by the mocked hub."""
    # A single digit is too easy to match by accident: the hub URL used by
    # the tests in this file ("https://localhost:1337") already contains a 7.
    # NOTE(review): assumes _PULL_ARGS uses that same hub URL — confirm.
    # A three-digit value makes the assertion actually depend on the cursor.
    cursor = 839
    with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=cursor)):
        result = runner.invoke(cli, _PULL_ARGS)
    assert str(cursor) in result.output
| 1385 | |
def test_text_output_shows_remote_path_when_records(self, repo: pathlib.Path) -> None:
    """With one record pulled, the text output mentions "remote" (any case)."""
    record_id = _new_id()
    hub_records = [
        {"kind": "reservation", "record_id": record_id, "run_id": "r1", "payload": {}},
    ]
    with patch(_PULL_TARGET, return_value=_pull_ok(hub_records)):
        outcome = runner.invoke(cli, _PULL_ARGS)
    lowered = outcome.output.lower()
    assert "remote" in lowered
| 1392 | |
def test_help_shows_agent_quickstart(self, repo: pathlib.Path) -> None:
    """``coord sync pull --help`` output contains the text "Agent quickstart"."""
    help_argv = ["coord", "sync", "pull", "--help"]
    help_text = runner.invoke(cli, help_argv).output
    assert "Agent quickstart" in help_text
| 1396 | |
def test_help_shows_json_schema(self, repo: pathlib.Path) -> None:
    """``coord sync pull --help`` output contains the text "JSON output schema"."""
    help_argv = ["coord", "sync", "pull", "--help"]
    help_text = runner.invoke(cli, help_argv).output
    assert "JSON output schema" in help_text
| 1400 | |
def test_help_shows_exit_codes(self, repo: pathlib.Path) -> None:
    """``coord sync pull --help`` output contains the text "Exit codes"."""
    help_argv = ["coord", "sync", "pull", "--help"]
    help_text = runner.invoke(cli, help_argv).output
    assert "Exit codes" in help_text
| 1404 | |
| 1405 | |
| 1406 | # --------------------------------------------------------------------------- |
| 1407 | # Security — muse coord sync pull |
| 1408 | # --------------------------------------------------------------------------- |
| 1409 | |
| 1410 | |
class TestCoordSyncPullSecurity:
    """Security-oriented checks for ``muse coord sync pull`` with the hub call mocked."""

    def test_ansi_in_owner_sanitized_in_text_output(self, repo: pathlib.Path) -> None:
        """ANSI codes in --owner must not bleed into text output."""
        ansi_owner = "\x1b[31mmalicious\x1b[0m"
        pull_args = [
            "coord", "sync", "pull",
            "--hub", "https://localhost:1337",
            "--owner", ansi_owner,
            "--slug", "myrepo",
            "--since-id", "0",
        ]
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            result = runner.invoke(cli, pull_args)
        assert "\x1b" not in result.output

    def test_ansi_in_slug_sanitized_in_text_output(self, repo: pathlib.Path) -> None:
        """ANSI codes in --slug must not bleed into text output."""
        ansi_slug = "\x1b[32minjected\x1b[0m"
        pull_args = [
            "coord", "sync", "pull",
            "--hub", "https://localhost:1337",
            "--owner", "gabriel",
            "--slug", ansi_slug,
            "--since-id", "0",
        ]
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            result = runner.invoke(cli, pull_args)
        assert "\x1b" not in result.output

    def test_token_not_in_json_output(self, repo: pathlib.Path) -> None:
        """Auth token must never appear in JSON output."""
        # NOTE(review): ``secret`` is never handed to the command (no option,
        # env var or patched credential carries it), so the assertion below
        # cannot fail as written. TODO: feed the secret through whatever auth
        # input the pull command accepts so this test can actually detect a leak.
        secret = "super-secret-pull-token"
        pull_args = [
            "coord", "sync", "pull",
            "--hub", "https://localhost:1337",
            "--owner", "gabriel",
            "--slug", "myrepo",
            "--since-id", "0",
            "--json",
        ]
        with patch(_PULL_TARGET, return_value=_pull_ok()):
            result = runner.invoke(cli, pull_args)
        assert secret not in result.output

    def test_no_traceback_on_coord_bus_error(self, repo: pathlib.Path) -> None:
        """A CoordBusError from the hub call must not surface a Python traceback."""
        with patch(_PULL_TARGET, side_effect=CoordBusError("timeout")):
            result = runner.invoke(cli, _PULL_ARGS)
        assert "Traceback" not in result.output

    def test_remote_records_with_traversal_record_id_skipped(self, repo: pathlib.Path) -> None:
        """A record with path-traversal record_id must not escape remote/."""
        malicious_records = [
            {"kind": "reservation", "record_id": "../../malicious", "run_id": "r1", "payload": {}},
        ]
        with patch(_PULL_TARGET, return_value=_pull_ok(malicious_records)):
            result = runner.invoke(cli, _PULL_ARGS)
        assert result.exit_code == 0
        coord_root = coordination_dir(repo)
        malicious_path = coord_root / "remote" / "reservation" / "../../malicious.json"
        assert not malicious_path.exists()
        # The un-normalised path above only resolves when remote/reservation/
        # exists, so also check the location the traversal would land on.
        assert not (coord_root / "malicious.json").exists()
        # Confirm nothing was written at all
        remote_dir = coord_root / "remote"
        if remote_dir.exists():
            assert list(remote_dir.rglob("*.json")) == []

    def test_remote_records_with_unknown_kind_skipped(self, repo: pathlib.Path) -> None:
        """A record with an unknown kind must not be written anywhere."""
        malicious_records = [
            {"kind": "../malicious_dir", "record_id": "safe-id", "run_id": "r1", "payload": {}},
        ]
        with patch(_PULL_TARGET, return_value=_pull_ok(malicious_records)):
            result = runner.invoke(cli, _PULL_ARGS)
        assert result.exit_code == 0
        coord_root = coordination_dir(repo)
        # "remote/../malicious_dir" normalises to a sibling of remote/, which
        # the rglob below would never see, so check it explicitly.
        assert not (coord_root / "malicious_dir").exists()
        remote_dir = coord_root / "remote"
        if remote_dir.exists():
            assert list(remote_dir.rglob("*.json")) == []
| 1488 | |
| 1489 | |
| 1490 | # --------------------------------------------------------------------------- |
| 1491 | # Stress — muse coord sync pull |
| 1492 | # --------------------------------------------------------------------------- |
| 1493 | |
| 1494 | |
class TestCoordSyncPullStress:
    """Repeated and concurrent invocations of ``muse coord sync pull`` with the hub call mocked."""

    def test_50_sequential_pull_calls_no_records(self, repo: pathlib.Path) -> None:
        """50 sequential pulls with no records all exit 0."""
        for i in range(50):
            with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=i)):
                result = runner.invoke(cli, _PULL_ARGS + ["-j"])
            assert result.exit_code == 0, f"Call {i}: {result.output}"
            data = json.loads(result.output.strip())
            assert data["count"] == 0

    def test_incremental_cursor_chain_100_steps(self, repo: pathlib.Path) -> None:
        """100 incremental pulls each use the cursor from the previous step.

        Each mocked hub reply advances the cursor by one, and the cursor read
        from the JSON output is fed back as ``--since-id`` for the next pull.
        """
        cursor = 0
        for i in range(100):
            rid = _new_id()
            fake_records = [{"kind": "reservation", "record_id": rid, "run_id": f"r{i}", "payload": {}}]
            with patch(_PULL_TARGET, return_value=_pull_ok(fake_records, cursor=cursor + 1)):
                args = _PULL_ARGS + ["--since-id", str(cursor), "-j"]
                result = runner.invoke(cli, args)
            assert result.exit_code == 0, f"Step {i}: {result.output}"
            data = json.loads(result.output.strip())
            cursor = data["cursor"]
        assert cursor == 100

    def test_concurrent_pull_8_threads(self, repo: pathlib.Path) -> None:
        """8 threads each call run_pull concurrently; patches applied at test level."""
        import argparse
        import threading

        from muse.cli.commands.coord_sync import run_pull

        # Failures are collected here because an exception raised inside a
        # worker thread would not fail the test on its own.
        errors: list[str] = []

        def worker(idx: int) -> None:
            args = argparse.Namespace(
                hub="https://localhost:1337",
                owner="gabriel",
                slug="myrepo",
                signing=None,
                since_id=0,
                kinds=[],
                limit=500,
                json_out=True,
            )
            try:
                run_pull(args)
            except SystemExit as exc:
                # A zero exit code is a normal, successful termination.
                if exc.code != 0:
                    errors.append(f"Thread {idx}: exit {exc.code}")
            except Exception as exc:
                errors.append(f"Thread {idx}: {exc}")

        # Context managers guarantee both patches are undone even if entering
        # the second one fails; a manual start()/stop() pair outside ``try``
        # would leave the first patch active in that case.
        with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=0)), \
                patch("muse.cli.commands.coord_sync.require_repo", return_value=repo):
            threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
        assert not errors, f"Concurrent failures: {errors}"
| 1561 | |
| 1562 | |
| 1563 | # --------------------------------------------------------------------------- |
| 1564 | # TestRegisterFlags — --json / -j normalized at argparse level |
| 1565 | # --------------------------------------------------------------------------- |
| 1566 | |
| 1567 | |
class TestRegisterFlags:
    """Checks that ``register()`` wires ``--json``/``-j`` to ``dest=json_out``.

    Only the ``sync pull`` sub-subparser is exercised by the tests below.
    """

    # --json/-j lives on the pull/push sub-subparsers, not on sync itself.
    _PULL_REQUIRED = ["sync", "pull", "--owner", "o", "--slug", "s"]

    def _make_parser(self) -> "argparse.ArgumentParser":
        """Build a fresh root parser and let ``register()`` attach its subcommands."""
        import argparse as ap
        from muse.cli.commands.coord_sync import register

        parser = ap.ArgumentParser()
        subparsers = parser.add_subparsers()
        register(subparsers)
        return parser

    def test_json_out_default_false(self) -> None:
        """Without any JSON flag, ``json_out`` is False."""
        namespace = self._make_parser().parse_args(self._PULL_REQUIRED)
        assert namespace.json_out is False

    def test_json_out_true_with_json_flag(self) -> None:
        """``--json`` sets ``json_out`` to True."""
        argv = [*self._PULL_REQUIRED, "--json"]
        namespace = self._make_parser().parse_args(argv)
        assert namespace.json_out is True

    def test_json_out_true_with_j_flag(self) -> None:
        """``-j`` sets ``json_out`` to True."""
        argv = [*self._PULL_REQUIRED, "-j"]
        namespace = self._make_parser().parse_args(argv)
        assert namespace.json_out is True
File History
4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
20 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e
fix: rename objects→blobs in push client and all stale test…
Sonnet 4.6
patch
22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a
fix: repair four test failures from post-migration audit
Sonnet 4.6
patch
28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
28 days ago