"""Comprehensive tests for ``muse coord sync push`` and ``muse coord sync pull``. Coverage matrix --------------- Unit ~~~~ * _gather_local_records — reads reservations from disk * _gather_local_records — reads heartbeats from disk * _gather_local_records — kinds filter respected * _gather_local_records — corrupt file skipped gracefully * _gather_local_records — claims use claimer_run_id field * _gather_local_records — all 7 kinds gathered * _write_remote_records — writes correct paths under remote/ * _write_remote_records — overwrites existing files * _write_remote_records — record with no record_id skipped * _write_remote_records — unknown kind rejected (path traversal prevention) * _write_remote_records — unsafe record_id rejected (path traversal prevention) * _write_remote_records — compact JSON written (no indent) Integration (all network calls mocked) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * push with no local records — text says "(no local coordination records to push)" * push with reservations — calls push_to_hub with correct args * push CoordBusError — exits 1 with error message * pull empty result — prints "Pulled 0 new record(s)" * pull with records — writes files to remote dir * pull CoordBusError — exits 1 with error message * --format json push — valid JSON with inserted/skipped/total/failed/elapsed/schema_version * --format json pull — valid JSON with count/cursor/records/elapsed/schema_version * --json shorthand push — same as --format json * --json shorthand pull — same as --format json * --since-id N — passed through to pull_from_hub * --kinds filter push — restricts gathered kinds * --limit N — passed through to pull_from_hub * duration_ms present in both push and pull JSON output Input validation ~~~~~~~~~~~~~~~~ * --owner too long → exit 1 before any I/O * --slug too long → exit 1 before any I/O * --since-id negative → exit 1 before any I/O * --limit = 0 → exit 1 before any I/O * --limit > 1000 → exit 1 before any I/O * --limit at boundary 1 → accepted * --limit at boundary 1000 → accepted * push owner/slug validation fires before require_repo * pull since-id/limit validation fires before require_repo Security ~~~~~~~~ * owner/slug with path traversal chars are passed as strings to push_to_hub * token not echoed in output * _write_remote_records rejects unknown kind (prevents escaping remote/) * _write_remote_records rejects traversal in record_id Stress ~~~~~~ * push 600 records splits into multiple batches * pull returns 1000 records, all written to disk """ from __future__ import annotations import itertools import json import pathlib from typing import TYPE_CHECKING import pytest from unittest.mock import patch, MagicMock, call from tests.cli_test_helper import CliRunner from muse.core.types import MsgpackDict, content_hash from muse.core.coord_bus import CoordBusError, JsonDict _id_seq = itertools.count() def _new_id() -> str: return content_hash({"seq": next(_id_seq)}) if TYPE_CHECKING: from muse.core.transport import SigningIdentity from muse.core.paths import coordination_dir, muse_dir from muse.cli.commands.coord_sync import ( _MAX_OWNER_LEN, _MAX_SLUG_LEN, _MAX_PULL_LIMIT, _ALL_KINDS, ) runner = CliRunner() cli = None _PUSH_TARGET = "muse.cli.commands.coord_sync.push_to_hub" _PULL_TARGET = "muse.cli.commands.coord_sync.pull_from_hub" # ── Fixtures ────────────────────────────────────────────────────────────────── @pytest.fixture() def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) return tmp_path # ── Helpers ─────────────────────────────────────────────────────────────────── def _write_local_claim(repo: pathlib.Path, task_id: str | None = None, run_id: str = "worker-1") -> str: claim_dir = coordination_dir(repo) / "claims" claim_dir.mkdir(parents=True, exist_ok=True) tid = task_id or _new_id() data = { "task_id": tid, "claimer_run_id": run_id, "claimed_at": "2026-01-01T00:00:00+00:00", "expires_at": "2026-12-31T00:00:00+00:00", } (claim_dir / f"{tid}.json").write_text(json.dumps(data)) return tid def _write_local_reservation(repo: pathlib.Path, run_id: str = "agent-1") -> str: coord_dir = coordination_dir(repo) / "reservations" coord_dir.mkdir(parents=True, exist_ok=True) rid = _new_id() data = { "reservation_id": rid, "run_id": run_id, "branch": "main", "addresses": ["src/x.py::foo"], "operation": None, "created_at": "2026-01-01T00:00:00+00:00", "expires_at": "2026-12-31T00:00:00+00:00", } (coord_dir / f"{rid}.json").write_text(json.dumps(data)) return rid def _write_local_heartbeat(repo: pathlib.Path, run_id: str = "agent-1") -> str: hb_dir = coordination_dir(repo) / "heartbeats" hb_dir.mkdir(parents=True, exist_ok=True) data = { "run_id": run_id, "last_seen": "2026-01-01T00:01:00+00:00", "expires_at": "2026-12-31T00:00:00+00:00", } (hb_dir / f"{run_id}.json").write_text(json.dumps(data)) return run_id _PUSH_ARGS = [ "coord", "sync", "push", "--hub", "https://localhost:1337", "--owner", "gabriel", "--slug", "myrepo", ] _PULL_ARGS = [ "coord", "sync", "pull", "--hub", "https://localhost:1337", "--owner", "gabriel", "--slug", "myrepo", "--since-id", "0", ] def _push_ok(inserted: int = 1, skipped: int = 0) -> MsgpackDict: return {"inserted": inserted, "skipped": skipped} def _pull_ok(records: list[MsgpackDict] | None = None, cursor: int = 0) -> MsgpackDict: return {"records": records or [], "cursor": cursor} # ── Unit: _gather_local_records ─────────────────────────────────────────────── class TestGatherLocalRecords: def test_empty_coordination_dir_returns_empty(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _gather_local_records records = _gather_local_records(repo, kinds=["reservation"]) assert records == [] def test_reads_reservation_from_disk(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _gather_local_records rid = _write_local_reservation(repo) records = _gather_local_records(repo, kinds=["reservation"]) assert len(records) == 1 assert records[0]["kind"] == "reservation" assert records[0]["record_id"] == rid def test_reads_heartbeat_from_disk(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _gather_local_records run_id = _write_local_heartbeat(repo, "hb-agent") records = _gather_local_records(repo, kinds=["heartbeat"]) assert len(records) == 1 assert records[0]["kind"] == "heartbeat" assert records[0]["run_id"] == run_id def test_kinds_filter_excludes_heartbeats(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _gather_local_records _write_local_reservation(repo) _write_local_heartbeat(repo) records = _gather_local_records(repo, kinds=["reservation"]) assert all(r["kind"] == "reservation" for r in records) def test_kinds_filter_excludes_reservations(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _gather_local_records _write_local_reservation(repo) _write_local_heartbeat(repo) records = _gather_local_records(repo, kinds=["heartbeat"]) assert all(r["kind"] == "heartbeat" for r in records) def test_corrupt_file_skipped_gracefully(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _gather_local_records coord_dir = coordination_dir(repo) / "reservations" coord_dir.mkdir(parents=True, exist_ok=True) (coord_dir / "bad.json").write_text("not-valid-json{{{{") records = _gather_local_records(repo, kinds=["reservation"]) assert records == [] def test_multiple_records_all_returned(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _gather_local_records for i in range(5): _write_local_reservation(repo, run_id=f"agent-{i}") records = _gather_local_records(repo, kinds=["reservation"]) assert len(records) == 5 def test_payload_field_contains_original_data(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _gather_local_records rid = _write_local_reservation(repo, run_id="my-agent") records = _gather_local_records(repo, kinds=["reservation"]) assert records[0]["payload"]["reservation_id"] == rid assert records[0]["payload"]["run_id"] == "my-agent" # ── Unit: _write_remote_records ─────────────────────────────────────────────── class TestWriteRemoteRecords: def test_writes_file_to_correct_path(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _write_remote_records rec = {"kind": "reservation", "record_id": "abc-123", "payload": {"x": 1}} _write_remote_records(repo, [rec]) target = coordination_dir(repo) / "remote" / "reservation" / "abc-123.json" assert target.exists() def test_written_file_contains_correct_data(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _write_remote_records rec = {"kind": "reservation", "record_id": "def-456", "payload": {"y": 2}} _write_remote_records(repo, [rec]) target = coordination_dir(repo) / "remote" / "reservation" / "def-456.json" data = json.loads(target.read_text()) assert data["payload"]["y"] == 2 def test_overwrites_existing_file(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _write_remote_records kind_dir = coordination_dir(repo) / "remote" / "reservation" kind_dir.mkdir(parents=True, exist_ok=True) (kind_dir / "ghi-789.json").write_text('{"old": true}') rec = {"kind": "reservation", "record_id": "ghi-789", "payload": {"new": True}} _write_remote_records(repo, [rec]) data = json.loads((kind_dir / "ghi-789.json").read_text()) assert data["payload"]["new"] is True def test_record_without_record_id_skipped(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _write_remote_records rec = {"kind": "reservation", "record_id": "", "payload": {}} _write_remote_records(repo, [rec]) remote_dir = coordination_dir(repo) / "remote" assert not remote_dir.exists() or not any(remote_dir.rglob("*.json")) def test_multiple_kinds_written_to_separate_dirs(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _write_remote_records records = [ {"kind": "reservation", "record_id": "r1", "payload": {}}, {"kind": "heartbeat", "record_id": "h1", "payload": {}}, ] _write_remote_records(repo, records) assert (coordination_dir(repo) / "remote" / "reservation" / "r1.json").exists() assert (coordination_dir(repo) / "remote" / "heartbeat" / "h1.json").exists() # ── Integration: push ───────────────────────────────────────────────────────── class TestCoordSyncPushIntegration: def test_push_no_local_records_text_output(self, repo: pathlib.Path) -> None: with patch(_PUSH_TARGET) as mock_push: result = runner.invoke(cli, _PUSH_ARGS) assert result.exit_code == 0 assert "no local coordination records to push" in result.output mock_push.assert_not_called() def test_push_with_reservation_calls_push_to_hub(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)) as mock_push: result = runner.invoke(cli, _PUSH_ARGS) assert result.exit_code == 0 mock_push.assert_called_once() call_args = mock_push.call_args assert call_args[0][1] == "gabriel" # owner assert call_args[0][2] == "myrepo" # slug def test_push_text_output_contains_inserted_skipped(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)): result = runner.invoke(cli, _PUSH_ARGS) assert result.exit_code == 0 assert "inserted" in result.output assert "skipped" in result.output def test_push_coord_bus_error_exits_1(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) with patch(_PUSH_TARGET, side_effect=CoordBusError("hub down")): result = runner.invoke(cli, _PUSH_ARGS) assert result.exit_code == 1 assert "error" in result.stderr.lower() or "hub down" in result.stderr def test_push_format_json_valid_structure(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)): result = runner.invoke(cli, _PUSH_ARGS + ["--json"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert "inserted" in data assert "skipped" in data assert "total" in data assert "failed" in data def test_push_json_shorthand(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) with patch(_PUSH_TARGET, return_value=_push_ok(2, 1)): r1 = runner.invoke(cli, _PUSH_ARGS + ["--json"]) r2 = runner.invoke(cli, _PUSH_ARGS + ["--json"]) d1 = json.loads(r1.output.strip()) d2 = json.loads(r2.output.strip()) d1.pop("timestamp", None) d2.pop("timestamp", None) d1.pop("duration_ms", None) d2.pop("duration_ms", None) assert d1 == d2 def test_push_json_no_records_returns_zeros(self, repo: pathlib.Path) -> None: with patch(_PUSH_TARGET): result = runner.invoke(cli, _PUSH_ARGS + ["--json"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert data["total"] == 0 assert data["inserted"] == 0 def test_push_kinds_filter_passed_through(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) _write_local_heartbeat(repo) with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)) as mock_push: result = runner.invoke(cli, _PUSH_ARGS + ["--kinds", "reservation"]) assert result.exit_code == 0 # Only 1 kind → batch should contain only reservations batch_arg = mock_push.call_args[0][3] assert all(r["kind"] == "reservation" for r in batch_arg) def test_push_coord_bus_error_json_failed_true(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) with patch(_PUSH_TARGET, side_effect=CoordBusError("oops")): result = runner.invoke(cli, _PUSH_ARGS + ["--json"]) # exit_code 1; summary JSON (with failed=True) is the last JSON line assert result.exit_code == 1 json_lines = [ln for ln in result.output.splitlines() if ln.startswith("{")] assert json_lines, "Expected at least one JSON output line" # The summary JSON is last; the error JSON ({"error": ...}) may appear before it summary = json.loads(json_lines[-1]) assert summary["failed"] is True # ── Integration: pull ───────────────────────────────────────────────────────── class TestCoordSyncPullIntegration: def test_pull_empty_result_exits_0(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, _PULL_ARGS) assert result.exit_code == 0 assert "Pulled 0 new record(s)" in result.output def test_pull_with_records_writes_files(self, repo: pathlib.Path) -> None: records = [{"kind": "reservation", "record_id": "r1", "payload": {"x": 1}}] with patch(_PULL_TARGET, return_value=_pull_ok(records, cursor=1)): result = runner.invoke(cli, _PULL_ARGS) assert result.exit_code == 0 target = coordination_dir(repo) / "remote" / "reservation" / "r1.json" assert target.exists() def test_pull_text_output_contains_cursor(self, repo: pathlib.Path) -> None: records = [{"kind": "reservation", "record_id": "r42", "payload": {}}] with patch(_PULL_TARGET, return_value=_pull_ok(records, cursor=42)): result = runner.invoke(cli, _PULL_ARGS) assert result.exit_code == 0 assert "cursor: 42" in result.output def test_pull_coord_bus_error_exits_1(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, side_effect=CoordBusError("connection refused")): result = runner.invoke(cli, _PULL_ARGS) assert result.exit_code == 1 assert "connection refused" in result.stderr def test_pull_format_json_valid_structure(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok(cursor=7)): result = runner.invoke(cli, _PULL_ARGS + ["--json"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert "count" in data assert "cursor" in data assert "records" in data def test_pull_json_shorthand(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok(cursor=3)): r1 = runner.invoke(cli, _PULL_ARGS + ["--json"]) r2 = runner.invoke(cli, _PULL_ARGS + ["--json"]) d1 = json.loads(r1.output.strip()) d2 = json.loads(r2.output.strip()) d1.pop("timestamp", None) d2.pop("timestamp", None) d1.pop("duration_ms", None) d2.pop("duration_ms", None) assert d1 == d2 def test_pull_since_id_passed_to_pull_from_hub(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok()) as mock_pull: runner.invoke(cli, _PULL_ARGS[:-2] + ["--since-id", "99"]) assert mock_pull.called call_args = mock_pull.call_args[0] assert call_args[3] == 99 # since_id def test_pull_limit_passed_to_pull_from_hub(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok()) as mock_pull: runner.invoke(cli, _PULL_ARGS + ["--limit", "42"]) call_args = mock_pull.call_args[0] assert call_args[5] == 42 # limit def test_pull_kinds_filter_passed_to_pull_from_hub(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok()) as mock_pull: runner.invoke(cli, _PULL_ARGS + ["--kinds", "reservation", "heartbeat"]) call_args = mock_pull.call_args[0] assert "reservation" in call_args[4] assert "heartbeat" in call_args[4] def test_pull_json_count_matches_records_length(self, repo: pathlib.Path) -> None: records = [ {"kind": "reservation", "record_id": f"r{i}", "payload": {}} for i in range(5) ] with patch(_PULL_TARGET, return_value=_pull_ok(records, cursor=5)): result = runner.invoke(cli, _PULL_ARGS + ["--json"]) data = json.loads(result.output.strip()) assert data["count"] == 5 assert len(data["records"]) == 5 def test_pull_signing_passed_to_pull_from_hub(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok()) as mock_pull: runner.invoke(cli, _PULL_ARGS) call_args = mock_pull.call_args[0] assert call_args[6] is None # signing (no identity configured in test) # ── Security ────────────────────────────────────────────────────────────────── class TestCoordSyncSecurity: def test_push_traversal_owner_passed_as_string(self, repo: pathlib.Path) -> None: """Path-traversal chars in owner are passed as-is to push_to_hub (encoding is push_to_hub's job).""" _write_local_reservation(repo) malicious_owner = "../traversal" args = [ "coord", "sync", "push", "--hub", "https://localhost:1337", "--owner", malicious_owner, "--slug", "myrepo", ] with patch(_PUSH_TARGET, return_value=_push_ok()) as mock_push: runner.invoke(cli, args) if mock_push.called: call_args = mock_push.call_args[0] assert call_args[1] == malicious_owner # owner string passed verbatim def test_token_not_in_output(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)): result = runner.invoke(cli, _PUSH_ARGS) assert "tok" not in result.output def test_pull_token_not_in_output(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, _PULL_ARGS) assert "tok" not in result.output def test_write_remote_records_rejects_unknown_kind(self, repo: pathlib.Path) -> None: """Server-supplied kind '../../malicious' must not escape remote/ directory.""" from muse.cli.commands.coord_sync import _write_remote_records rec = {"kind": "../../malicious", "record_id": "abc123", "payload": {}} _write_remote_records(repo, [rec]) remote_dir = coordination_dir(repo) / "remote" assert not remote_dir.exists() or not any(remote_dir.rglob("*.json")) def test_write_remote_records_rejects_traversal_record_id(self, repo: pathlib.Path) -> None: """Server-supplied record_id '../../../etc/passwd' must not escape kind dir.""" from muse.cli.commands.coord_sync import _write_remote_records rec = {"kind": "reservation", "record_id": "../../../etc/passwd", "payload": {}} _write_remote_records(repo, [rec]) remote_dir = coordination_dir(repo) / "remote" assert not remote_dir.exists() or not any(remote_dir.rglob("*.json")) # ── Unit: _gather_local_records — claim field name ───────────────────────────── class TestGatherLocalRecordsClaims: def test_claim_uses_claimer_run_id_field(self, repo: pathlib.Path) -> None: """Bug fix: claims must read 'claimer_run_id', not 'claimed_by'.""" from muse.cli.commands.coord_sync import _gather_local_records tid = _write_local_claim(repo, run_id="correct-worker") records = _gather_local_records(repo, kinds=["claim"]) assert len(records) == 1 assert records[0]["run_id"] == "correct-worker" def test_claim_record_id_is_task_id(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _gather_local_records tid = _write_local_claim(repo) records = _gather_local_records(repo, kinds=["claim"]) assert records[0]["record_id"] == tid def test_claim_expires_at_included(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _gather_local_records _write_local_claim(repo) records = _gather_local_records(repo, kinds=["claim"]) assert records[0]["expires_at"] is not None # ── Unit: _write_remote_records — compact JSON ──────────────────────────────── class TestWriteRemoteRecordsCompact: def test_written_json_is_compact(self, repo: pathlib.Path) -> None: """No indent=2 — remote files must be compact single-line JSON.""" from muse.cli.commands.coord_sync import _write_remote_records rec = {"kind": "reservation", "record_id": "compact-test", "payload": {"x": 1}} _write_remote_records(repo, [rec]) target = coordination_dir(repo) / "remote" / "reservation" / "compact-test.json" raw = target.read_text().strip() # Compact JSON has no interior newlines assert "\n" not in raw def test_valid_kinds_all_accepted(self, repo: pathlib.Path) -> None: """All 7 kinds are accepted by the allowlist.""" from muse.cli.commands.coord_sync import _write_remote_records records = [ {"kind": k, "record_id": f"id-{i}", "payload": {}} for i, k in enumerate(_ALL_KINDS) ] _write_remote_records(repo, records) remote_dir = coordination_dir(repo) / "remote" written = list(remote_dir.rglob("*.json")) assert len(written) == len(_ALL_KINDS) def test_empty_record_id_still_skipped(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _write_remote_records rec = {"kind": "reservation", "record_id": "", "payload": {}} _write_remote_records(repo, [rec]) remote_dir = coordination_dir(repo) / "remote" assert not remote_dir.exists() or not any(remote_dir.rglob("*.json")) def test_record_id_with_dots_rejected(self, repo: pathlib.Path) -> None: """Dots in record_id are not allowed (could be used for traversal).""" from muse.cli.commands.coord_sync import _write_remote_records rec = {"kind": "reservation", "record_id": "..malicious", "payload": {}} _write_remote_records(repo, [rec]) remote_dir = coordination_dir(repo) / "remote" assert not remote_dir.exists() or not any(remote_dir.rglob("*.json")) def test_record_id_with_slash_rejected(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _write_remote_records rec = {"kind": "reservation", "record_id": "a/b", "payload": {}} _write_remote_records(repo, [rec]) remote_dir = coordination_dir(repo) / "remote" assert not remote_dir.exists() or not any(remote_dir.rglob("*.json")) def test_record_id_too_long_rejected(self, repo: pathlib.Path) -> None: """record_ids over 128 chars are rejected.""" from muse.cli.commands.coord_sync import _write_remote_records rec = {"kind": "reservation", "record_id": "a" * 129, "payload": {}} _write_remote_records(repo, [rec]) remote_dir = coordination_dir(repo) / "remote" assert not remote_dir.exists() or not any(remote_dir.rglob("*.json")) # ── Input validation ────────────────────────────────────────────────────────── class TestSyncInputValidation: def _push_args(self, owner: str = "gabriel", slug: str = "myrepo", extra: list[str] | None = None) -> list[str]: args = [ "coord", "sync", "push", "--hub", "https://localhost:1337", "--owner", owner, "--slug", slug, ] if extra: args.extend(extra) return args def _pull_args(self, owner: str = "gabriel", slug: str = "myrepo", extra: list[str] | None = None) -> list[str]: args = [ "coord", "sync", "pull", "--hub", "https://localhost:1337", "--owner", owner, "--slug", slug, ] if extra: args.extend(extra) return args def test_push_owner_too_long_exits_1(self, repo: pathlib.Path) -> None: owner = "x" * (_MAX_OWNER_LEN + 1) result = runner.invoke(cli, self._push_args(owner=owner)) assert result.exit_code == 1 def test_push_slug_too_long_exits_1(self, repo: pathlib.Path) -> None: slug = "x" * (_MAX_SLUG_LEN + 1) result = runner.invoke(cli, self._push_args(slug=slug)) assert result.exit_code == 1 def test_push_owner_at_max_accepted(self, repo: pathlib.Path) -> None: owner = "x" * _MAX_OWNER_LEN with patch(_PUSH_TARGET, return_value=_push_ok()): result = runner.invoke(cli, self._push_args(owner=owner)) # No validation error — exits 0 (no records to push) assert result.exit_code == 0 def test_push_slug_at_max_accepted(self, repo: pathlib.Path) -> None: slug = "x" * _MAX_SLUG_LEN with patch(_PUSH_TARGET, return_value=_push_ok()): result = runner.invoke(cli, self._push_args(slug=slug)) assert result.exit_code == 0 def test_push_owner_too_long_json_error(self, repo: pathlib.Path) -> None: owner = "x" * (_MAX_OWNER_LEN + 1) result = runner.invoke(cli, self._push_args(owner=owner) + ["--json"]) assert result.exit_code == 1 data = json.loads(result.output.strip()) assert data["status"] == "bad_args" def test_pull_owner_too_long_exits_1(self, repo: pathlib.Path) -> None: owner = "x" * (_MAX_OWNER_LEN + 1) result = runner.invoke(cli, self._pull_args(owner=owner)) assert result.exit_code == 1 def test_pull_slug_too_long_exits_1(self, repo: pathlib.Path) -> None: slug = "x" * (_MAX_SLUG_LEN + 1) result = runner.invoke(cli, self._pull_args(slug=slug)) assert result.exit_code == 1 def test_pull_since_id_negative_exits_1(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, self._pull_args(extra=["--since-id", "-1"])) assert result.exit_code == 1 def test_pull_since_id_negative_json_error(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, self._pull_args(extra=["--since-id", "-1", "--json"])) assert result.exit_code == 1 data = json.loads(result.output.strip()) assert data["status"] == "bad_args" def test_pull_limit_zero_exits_1(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, self._pull_args(extra=["--limit", "0"])) assert result.exit_code == 1 def test_pull_limit_over_max_exits_1(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, self._pull_args(extra=["--limit", str(_MAX_PULL_LIMIT + 1)])) assert result.exit_code == 1 def test_pull_limit_at_min_accepted(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, self._pull_args(extra=["--limit", "1"])) assert result.exit_code == 0 def test_pull_limit_at_max_accepted(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, self._pull_args(extra=["--limit", str(_MAX_PULL_LIMIT)])) assert result.exit_code == 0 def test_pull_limit_over_max_json_error(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, self._pull_args(extra=["--limit", str(_MAX_PULL_LIMIT + 1), "--json"]), ) assert result.exit_code == 1 data = json.loads(result.output.strip()) assert data["status"] == "bad_args" def test_push_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: """Validation must not touch filesystem — no repo needed.""" monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) # no .muse dir owner = "x" * (_MAX_OWNER_LEN + 1) result = runner.invoke(cli, [ "coord", "sync", "push", "--hub", "https://localhost:1337", "--owner", owner, "--slug", "myrepo", ]) assert result.exit_code == 1 def test_pull_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) # no .muse dir result = runner.invoke(cli, [ "coord", "sync", "pull", "--hub", "https://localhost:1337", "--owner", "gabriel", "--slug", "myrepo", "--since-id", "-5", ]) assert result.exit_code == 1 # ── JSON schema: schema_version and duration_ms ────────────────────────── class TestSyncJsonSchema: def test_push_json_includes_schema_version(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)): result = runner.invoke(cli, _PUSH_ARGS + ["--json"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert "schema" in data def test_push_json_includes_duration_ms(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)): result = runner.invoke(cli, _PUSH_ARGS + ["--json"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert "duration_ms" in data assert isinstance(data["duration_ms"], float) def test_push_json_no_records_includes_elapsed(self, repo: pathlib.Path) -> None: with patch(_PUSH_TARGET): result = runner.invoke(cli, _PUSH_ARGS + ["--json"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert "duration_ms" in data def test_pull_json_includes_schema_version(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok(cursor=3)): result = runner.invoke(cli, _PULL_ARGS + ["--json"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert "schema" in data def test_pull_json_includes_duration_ms(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok(cursor=3)): result = runner.invoke(cli, _PULL_ARGS + ["--json"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert "duration_ms" in data assert isinstance(data["duration_ms"], float) def test_push_text_includes_elapsed(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)): result = runner.invoke(cli, _PUSH_ARGS) assert result.exit_code == 0 # elapsed is formatted as (Ns) assert "s)" in result.output def test_pull_text_includes_elapsed(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok(cursor=1)): result = runner.invoke(cli, _PULL_ARGS) assert result.exit_code == 0 assert "s)" in result.output # ── Gather all 7 kinds ──────────────────────────────────────────────────────── class TestGatherAllKinds: def _write_intent(self, repo: pathlib.Path) -> str: d = coordination_dir(repo) / "intents" d.mkdir(parents=True, exist_ok=True) iid = _new_id() data = {"intent_id": iid, "run_id": "agent", "expires_at": None} (d / f"{iid}.json").write_text(json.dumps(data)) return iid def _write_release(self, repo: pathlib.Path) -> str: d = coordination_dir(repo) / "releases" d.mkdir(parents=True, exist_ok=True) rid = _new_id() data = {"release_id": rid, "run_id": "agent"} (d / f"{rid}.json").write_text(json.dumps(data)) return rid def _write_dependency(self, repo: pathlib.Path) -> str: d = coordination_dir(repo) / "dependencies" d.mkdir(parents=True, exist_ok=True) rid = _new_id() data = {"reservation_id": rid} (d / f"{rid}.json").write_text(json.dumps(data)) return rid def _write_task(self, repo: pathlib.Path) -> str: d = coordination_dir(repo) / "tasks" d.mkdir(parents=True, exist_ok=True) tid = _new_id() data = {"task_id": tid, "run_id": "creator"} (d / f"{tid}.json").write_text(json.dumps(data)) return tid def test_all_kinds_gathered(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _gather_local_records _write_local_reservation(repo) _write_local_heartbeat(repo, "hb-1") self._write_intent(repo) self._write_release(repo) self._write_dependency(repo) self._write_task(repo) _write_local_claim(repo) records = _gather_local_records(repo, kinds=list(_ALL_KINDS)) kinds_found = {r["kind"] for r in records} assert kinds_found == set(_ALL_KINDS) def test_each_kind_has_correct_record_id(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _gather_local_records rid = _write_local_reservation(repo) records = _gather_local_records(repo, kinds=["reservation"]) assert records[0]["record_id"] == rid def test_release_has_none_expires_at(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _gather_local_records self._write_release(repo) records = _gather_local_records(repo, kinds=["release"]) assert records[0]["expires_at"] is None def test_dependency_has_none_expires_at(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _gather_local_records self._write_dependency(repo) records = _gather_local_records(repo, kinds=["dependency"]) assert records[0]["expires_at"] is None def test_task_has_none_expires_at(self, repo: pathlib.Path) -> None: from muse.cli.commands.coord_sync import _gather_local_records self._write_task(repo) records = _gather_local_records(repo, kinds=["task"]) assert records[0]["expires_at"] is None # ── Stress tests ────────────────────────────────────────────────────────────── class TestSyncStress: def test_push_600_records_batched(self, repo: pathlib.Path) -> None: """600 records must be split across ≥ 2 batches of MAX_PUSH_BATCH.""" from muse.core.coord_bus import MAX_PUSH_BATCH for i in range(600): _write_local_reservation(repo, run_id=f"agent-{i}") call_count = 0 def fake_push(hub: str, owner: str, slug: str, batch: list[JsonDict], token: SigningIdentity | None) -> MsgpackDict: nonlocal call_count call_count += 1 assert len(batch) <= MAX_PUSH_BATCH return {"inserted": len(batch), "skipped": 0} with patch(_PUSH_TARGET, side_effect=fake_push): result = runner.invoke(cli, _PUSH_ARGS + ["--kinds", "reservation"]) assert result.exit_code == 0 assert call_count >= 2 def test_push_600_records_inserted_count_correct(self, repo: pathlib.Path) -> None: for i in range(600): _write_local_reservation(repo, run_id=f"agent-{i}") with patch(_PUSH_TARGET, return_value=_push_ok(inserted=1, skipped=0)) as mock: result = runner.invoke(cli, _PUSH_ARGS + ["--kinds", "reservation", "--json"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert data["total"] == 600 def test_pull_1000_records_all_written(self, repo: pathlib.Path) -> None: records = [ {"kind": "reservation", "record_id": _new_id(), "payload": {}} for _ in range(1000) ] with patch(_PULL_TARGET, return_value=_pull_ok(records, cursor=1000)): result = runner.invoke(cli, _PULL_ARGS + ["--limit", "1000"]) assert result.exit_code == 0 remote_dir = coordination_dir(repo) / "remote" / "reservation" written = list(remote_dir.glob("*.json")) assert len(written) == 1000 def test_pull_mixed_invalid_records_skipped(self, repo: pathlib.Path) -> None: """Records with invalid kind/record_id are skipped; valid ones still written.""" good = {"kind": "reservation", "record_id": "good-record-1", "payload": {}} bad_kind = {"kind": "../traversal", "record_id": "malicious-id", "payload": {}} bad_id = {"kind": "reservation", "record_id": "../etc/passwd", "payload": {}} with patch(_PULL_TARGET, return_value=_pull_ok([good, bad_kind, bad_id], cursor=3)): result = runner.invoke(cli, _PULL_ARGS) assert result.exit_code == 0 good_file = coordination_dir(repo) / "remote" / "reservation" / "good-record-1.json" assert good_file.exists() # Only 1 file written (the good record) remote_dir = coordination_dir(repo) / "remote" all_files = list(remote_dir.rglob("*.json")) assert len(all_files) == 1 def test_push_partial_failure_reports_failed_true(self, repo: pathlib.Path) -> None: """If one batch fails, failed=True in JSON even if other batches succeed.""" from muse.core.coord_bus import MAX_PUSH_BATCH # Write enough for 2 batches for i in range(MAX_PUSH_BATCH + 1): _write_local_reservation(repo, run_id=f"agent-{i}") call_count = 0 def sometimes_fail(hub: str, owner: str, slug: str, batch: list[JsonDict], token: SigningIdentity | None) -> MsgpackDict: nonlocal call_count call_count += 1 if call_count == 1: raise CoordBusError("first batch failed") return {"inserted": len(batch), "skipped": 0} with patch(_PUSH_TARGET, side_effect=sometimes_fail): result = runner.invoke( cli, _PUSH_ARGS + ["--kinds", "reservation", "--json"] ) assert result.exit_code == 1 # Find JSON line (error from first batch goes to stdout too in JSON mode) json_lines = [ln for ln in result.output.splitlines() if ln.startswith("{")] final = json.loads(json_lines[-1]) assert final["failed"] is True # --------------------------------------------------------------------------- # Extended — muse coord sync push # --------------------------------------------------------------------------- class TestCoordSyncPushExtended: def test_j_alias_works(self, repo: pathlib.Path) -> None: """-j is equivalent to --json for push.""" with patch(_PUSH_TARGET, return_value=_push_ok(0, 0)): result = runner.invoke(cli, _PUSH_ARGS + ["-j"]) assert result.exit_code == 0, result.output data = json.loads(result.output.strip()) assert "inserted" in data def test_help_flag(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "sync", "push", "--help"]) assert result.exit_code == 0 def test_json_compact_single_line(self, repo: pathlib.Path) -> None: """JSON output is a single compact line — no indent=2.""" _write_local_reservation(repo) with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)): result = runner.invoke(cli, _PUSH_ARGS + ["-j"]) assert result.exit_code == 0 lines = [l for l in result.output.splitlines() if l.strip().startswith("{")] assert len(lines) == 1, f"Expected compact JSON, got: {result.output!r}" def test_json_all_required_fields(self, repo: pathlib.Path) -> None: """JSON always has schema_version, inserted, skipped, total, failed, duration_ms.""" _write_local_reservation(repo) with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)): result = runner.invoke(cli, _PUSH_ARGS + ["-j"]) data = json.loads(result.output.strip()) for field in ("schema", "inserted", "skipped", "total", "failed", "duration_ms"): assert field in data, f"Missing field: {field}" def test_json_inserted_is_int(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)): result = runner.invoke(cli, _PUSH_ARGS + ["-j"]) data = json.loads(result.output.strip()) assert isinstance(data["inserted"], int) assert isinstance(data["skipped"], int) assert isinstance(data["total"], int) def test_json_failed_is_bool(self, repo: pathlib.Path) -> None: with patch(_PUSH_TARGET, return_value=_push_ok(0, 0)): result = runner.invoke(cli, _PUSH_ARGS + ["-j"]) data = json.loads(result.output.strip()) assert isinstance(data["failed"], bool) assert data["failed"] is False def test_json_duration_ms_is_number(self, repo: pathlib.Path) -> None: with patch(_PUSH_TARGET, return_value=_push_ok(0, 0)): result = runner.invoke(cli, _PUSH_ARGS + ["-j"]) data = json.loads(result.output.strip()) assert isinstance(data["duration_ms"], (int, float)) assert data["duration_ms"] >= 0 def test_json_schema_is_int(self, repo: pathlib.Path) -> None: with patch(_PUSH_TARGET, return_value=_push_ok(0, 0)): result = runner.invoke(cli, _PUSH_ARGS + ["-j"]) data = json.loads(result.output.strip()) assert isinstance(data["schema"], int) assert data["schema"] >= 1 def test_json_total_matches_gathered_records(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) _write_local_reservation(repo) with patch(_PUSH_TARGET, return_value=_push_ok(2, 0)): result = runner.invoke(cli, _PUSH_ARGS + ["-j"]) data = json.loads(result.output.strip()) assert data["total"] == 2 def test_json_no_records_total_is_zero(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, _PUSH_ARGS + ["-j"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert data["total"] == 0 assert data["inserted"] == 0 assert data["skipped"] == 0 def test_idempotent_second_push_all_skipped(self, repo: pathlib.Path) -> None: """Second push: all records skipped (hub already has them).""" _write_local_reservation(repo) with patch(_PUSH_TARGET, return_value=_push_ok(0, 1)): result = runner.invoke(cli, _PUSH_ARGS + ["-j"]) data = json.loads(result.output.strip()) assert data["skipped"] == 1 assert data["inserted"] == 0 def test_all_7_kinds_pushed(self, repo: pathlib.Path) -> None: """Push with all 7 kinds gathers records from every kind directory.""" coord_dir = coordination_dir(repo) kind_dirs = { "reservations": ("reservation_id", "run_id"), "heartbeats": ("run_id", "run_id"), "intents": ("intent_id", "run_id"), "releases": ("release_id", "run_id"), "dependencies": ("reservation_id", "reservation_id"), "tasks": ("task_id", "run_id"), "claims": ("task_id", "claimer_run_id"), } for subdir, (id_field, run_field) in kind_dirs.items(): d = coord_dir / subdir d.mkdir(parents=True, exist_ok=True) rid = _new_id() (d / f"{rid}.json").write_text(json.dumps({id_field: rid, run_field: "r1"})) with patch(_PUSH_TARGET, return_value=_push_ok(7, 0)) as mock_push: result = runner.invoke(cli, _PUSH_ARGS + ["-j"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert data["total"] == 7 def test_text_output_shows_owner_slug(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)): result = runner.invoke(cli, _PUSH_ARGS) assert "gabriel" in result.output assert "myrepo" in result.output def test_text_output_shows_checkmark_on_success(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)): result = runner.invoke(cli, _PUSH_ARGS) assert "✅" in result.output def test_text_output_shows_cross_on_failure(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) with patch(_PUSH_TARGET, side_effect=CoordBusError("hub down")): result = runner.invoke(cli, _PUSH_ARGS) assert "❌" in result.stderr or result.exit_code == 1 def test_help_shows_agent_quickstart(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "sync", "push", "--help"]) assert "Agent quickstart" in result.output def test_help_shows_json_schema(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "sync", "push", "--help"]) assert "JSON output schema" in result.output def test_help_shows_exit_codes(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "sync", "push", "--help"]) assert "Exit codes" in result.output # --------------------------------------------------------------------------- # Security — muse coord sync push # --------------------------------------------------------------------------- class TestCoordSyncPushSecurity: def test_ansi_in_owner_sanitized_in_text_output(self, repo: pathlib.Path) -> None: """ANSI codes in --owner must not bleed into text output.""" _write_local_reservation(repo) ansi_owner = "\x1b[31mmalicious\x1b[0m" push_args = [ "coord", "sync", "push", "--hub", "https://localhost:1337", "--owner", ansi_owner, "--slug", "myrepo", ] with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)): result = runner.invoke(cli, push_args) assert "\x1b" not in result.output def test_ansi_in_slug_sanitized_in_text_output(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) ansi_slug = "\x1b[32minjected\x1b[0m" push_args = [ "coord", "sync", "push", "--hub", "https://localhost:1337", "--owner", "gabriel", "--slug", ansi_slug, ] with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)): result = runner.invoke(cli, push_args) assert "\x1b" not in result.output def test_token_not_in_json_output(self, repo: pathlib.Path) -> None: """Auth token must never appear in JSON output.""" _write_local_reservation(repo) secret = "super-secret-token-xyz" push_args = [ "coord", "sync", "push", "--hub", "https://localhost:1337", "--owner", "gabriel", "--slug", "myrepo", "--json", ] with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)): result = runner.invoke(cli, push_args) assert secret not in result.output def test_token_not_in_text_output(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) secret = "super-secret-token-abc" push_args = [ "coord", "sync", "push", "--hub", "https://localhost:1337", "--owner", "gabriel", "--slug", "myrepo", ] with patch(_PUSH_TARGET, return_value=_push_ok(1, 0)): result = runner.invoke(cli, push_args) assert secret not in result.output def test_no_traceback_on_coord_bus_error(self, repo: pathlib.Path) -> None: _write_local_reservation(repo) with patch(_PUSH_TARGET, side_effect=CoordBusError("network failure")): result = runner.invoke(cli, _PUSH_ARGS) assert "Traceback" not in result.output def test_owner_length_cap_before_io(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: """Owner length check fires before any file system access.""" monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) long_owner = "x" * (_MAX_OWNER_LEN + 1) push_args = [ "coord", "sync", "push", "--hub", "https://localhost:1337", "--owner", long_owner, "--slug", "myrepo", ] result = runner.invoke(cli, push_args) assert result.exit_code == 1 assert "Traceback" not in result.output # --------------------------------------------------------------------------- # Stress — muse coord sync push # --------------------------------------------------------------------------- class TestCoordSyncPushStress: def test_50_sequential_push_calls_no_records(self, repo: pathlib.Path) -> None: """50 sequential pushes with no records all exit 0.""" for i in range(50): result = runner.invoke(cli, _PUSH_ARGS + ["-j"]) assert result.exit_code == 0, f"Call {i}: {result.output}" data = json.loads(result.output.strip()) assert data["total"] == 0 def test_push_1200_records_correct_batch_count(self, repo: pathlib.Path) -> None: """1200 records → ceil(1200/500) = 3 batches.""" from muse.core.coord_bus import MAX_PUSH_BATCH for i in range(1200): _write_local_reservation(repo, run_id=f"agent-{i}") call_count = 0 def counting_push(hub: str, owner: str, slug: str, batch: list[JsonDict], token: SigningIdentity | None) -> MsgpackDict: nonlocal call_count call_count += 1 return {"inserted": len(batch), "skipped": 0} with patch(_PUSH_TARGET, side_effect=counting_push): result = runner.invoke(cli, _PUSH_ARGS + ["--kinds", "reservation", "-j"]) assert result.exit_code == 0 expected_batches = -(-1200 // MAX_PUSH_BATCH) # ceil division assert call_count == expected_batches data = json.loads(result.output.strip()) assert data["inserted"] == 1200 def test_concurrent_push_8_threads(self, repo: pathlib.Path) -> None: """8 threads each call run_push directly; patches applied at test level. The goal is to verify that concurrent calls to run_push do not crash or corrupt internal state. All threads share the same repo fixture; per-thread module mutation is intentionally avoided here because unguarded write-then-restore of a module attribute across threads is a race condition that can leave the module permanently patched after the test completes, polluting later tests. """ import argparse import threading from muse.cli.commands.coord_sync import run_push _write_local_reservation(repo, run_id="shared-agent") errors: list[str] = [] def worker(idx: int) -> None: args = argparse.Namespace( hub="https://localhost:1337", owner="gabriel", slug="myrepo", signing=None, kinds=list(_ALL_KINDS), json_out=True, ) try: run_push(args) except SystemExit as exc: if exc.code != 0: errors.append(f"Thread {idx}: exit {exc.code}") except Exception as exc: errors.append(f"Thread {idx}: {exc}") def fake_push(hub: str, owner: str, slug: str, batch: list[JsonDict], token: SigningIdentity | None) -> MsgpackDict: return {"inserted": len(batch), "skipped": 0} push_p = patch(_PUSH_TARGET, side_effect=fake_push) repo_p = patch("muse.cli.commands.coord_sync.require_repo", return_value=repo) push_p.start() repo_p.start() try: threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() finally: push_p.stop() repo_p.stop() assert not errors, f"Concurrent failures: {errors}" # --------------------------------------------------------------------------- # Extended — muse coord sync pull # --------------------------------------------------------------------------- class TestCoordSyncPullExtended: def test_j_alias_works(self, repo: pathlib.Path) -> None: """-j is equivalent to --json for pull.""" with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, _PULL_ARGS + ["-j"]) assert result.exit_code == 0, result.output data = json.loads(result.output.strip()) assert "count" in data def test_help_flag(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "sync", "pull", "--help"]) assert result.exit_code == 0 def test_json_compact_single_line(self, repo: pathlib.Path) -> None: """JSON output is a single compact line — no indent=2.""" with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, _PULL_ARGS + ["-j"]) assert result.exit_code == 0 lines = [l for l in result.output.splitlines() if l.strip().startswith("{")] assert len(lines) == 1, f"Expected compact JSON, got: {result.output!r}" def test_json_all_required_fields(self, repo: pathlib.Path) -> None: """JSON always has schema, count, cursor, records, duration_ms.""" with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, _PULL_ARGS + ["-j"]) data = json.loads(result.output.strip()) for field in ("schema", "count", "cursor", "records", "duration_ms"): assert field in data, f"Missing field: {field}" def test_json_count_is_int(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, _PULL_ARGS + ["-j"]) data = json.loads(result.output.strip()) assert isinstance(data["count"], int) assert isinstance(data["cursor"], int) def test_json_records_is_list(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, _PULL_ARGS + ["-j"]) data = json.loads(result.output.strip()) assert isinstance(data["records"], list) def test_json_duration_ms_is_number(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, _PULL_ARGS + ["-j"]) data = json.loads(result.output.strip()) assert isinstance(data["duration_ms"], (int, float)) assert data["duration_ms"] >= 0 def test_json_schema_is_int(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, _PULL_ARGS + ["-j"]) data = json.loads(result.output.strip()) assert isinstance(data["schema"], int) assert data["schema"] >= 1 def test_json_count_matches_records_length(self, repo: pathlib.Path) -> None: fake_records = [ {"kind": "reservation", "record_id": _new_id(), "run_id": "r1", "payload": {}}, {"kind": "reservation", "record_id": _new_id(), "run_id": "r2", "payload": {}}, ] with patch(_PULL_TARGET, return_value=_pull_ok(fake_records, cursor=2)): result = runner.invoke(cli, _PULL_ARGS + ["-j"]) data = json.loads(result.output.strip()) assert data["count"] == 2 assert len(data["records"]) == 2 def test_json_cursor_reflects_hub_cursor(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=42)): result = runner.invoke(cli, _PULL_ARGS + ["-j"]) data = json.loads(result.output.strip()) assert data["cursor"] == 42 def test_zero_records_exits_0(self, repo: pathlib.Path) -> None: """0 records returned is a valid success.""" with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, _PULL_ARGS + ["-j"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert data["count"] == 0 def test_incremental_pull_since_id_forwarded(self, repo: pathlib.Path) -> None: """--since-id is passed through to pull_from_hub.""" with patch(_PULL_TARGET, return_value=_pull_ok()) as mock_pull: runner.invoke(cli, _PULL_ARGS + ["--since-id", "99"]) _, kwargs = mock_pull.call_args assert mock_pull.call_args[0][3] == 99 or kwargs.get("since_id") == 99 or mock_pull.call_args[0][3] == 99 def test_records_written_to_remote_dir(self, repo: pathlib.Path) -> None: """Records returned by hub are written to .muse/coordination/remote/.""" rid = _new_id() fake_records = [{"kind": "reservation", "record_id": rid, "run_id": "r1", "payload": {}}] with patch(_PULL_TARGET, return_value=_pull_ok(fake_records)): result = runner.invoke(cli, _PULL_ARGS) assert result.exit_code == 0 written = coordination_dir(repo) / "remote" / "reservation" / f"{rid}.json" assert written.exists() def test_text_output_shows_owner_slug(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, _PULL_ARGS) assert "gabriel" in result.output assert "myrepo" in result.output def test_text_output_shows_cursor(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=7)): result = runner.invoke(cli, _PULL_ARGS) assert "7" in result.output def test_text_output_shows_remote_path_when_records(self, repo: pathlib.Path) -> None: rid = _new_id() fake_records = [{"kind": "reservation", "record_id": rid, "run_id": "r1", "payload": {}}] with patch(_PULL_TARGET, return_value=_pull_ok(fake_records)): result = runner.invoke(cli, _PULL_ARGS) assert "remote" in result.output.lower() def test_help_shows_agent_quickstart(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "sync", "pull", "--help"]) assert "Agent quickstart" in result.output def test_help_shows_json_schema(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "sync", "pull", "--help"]) assert "JSON output schema" in result.output def test_help_shows_exit_codes(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["coord", "sync", "pull", "--help"]) assert "Exit codes" in result.output # --------------------------------------------------------------------------- # Security — muse coord sync pull # --------------------------------------------------------------------------- class TestCoordSyncPullSecurity: def test_ansi_in_owner_sanitized_in_text_output(self, repo: pathlib.Path) -> None: """ANSI codes in --owner must not bleed into text output.""" ansi_owner = "\x1b[31mmalicious\x1b[0m" pull_args = [ "coord", "sync", "pull", "--hub", "https://localhost:1337", "--owner", ansi_owner, "--slug", "myrepo", "--since-id", "0", ] with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, pull_args) assert "\x1b" not in result.output def test_ansi_in_slug_sanitized_in_text_output(self, repo: pathlib.Path) -> None: ansi_slug = "\x1b[32minjected\x1b[0m" pull_args = [ "coord", "sync", "pull", "--hub", "https://localhost:1337", "--owner", "gabriel", "--slug", ansi_slug, "--since-id", "0", ] with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, pull_args) assert "\x1b" not in result.output def test_token_not_in_json_output(self, repo: pathlib.Path) -> None: """Auth token must never appear in JSON output.""" secret = "super-secret-pull-token" pull_args = [ "coord", "sync", "pull", "--hub", "https://localhost:1337", "--owner", "gabriel", "--slug", "myrepo", "--since-id", "0", "--json", ] with patch(_PULL_TARGET, return_value=_pull_ok()): result = runner.invoke(cli, pull_args) assert secret not in result.output def test_no_traceback_on_coord_bus_error(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, side_effect=CoordBusError("timeout")): result = runner.invoke(cli, _PULL_ARGS) assert "Traceback" not in result.output def test_remote_records_with_traversal_record_id_skipped(self, repo: pathlib.Path) -> None: """A record with path-traversal record_id must not escape remote/.""" malicious_records = [ {"kind": "reservation", "record_id": "../../malicious", "run_id": "r1", "payload": {}}, ] with patch(_PULL_TARGET, return_value=_pull_ok(malicious_records)): result = runner.invoke(cli, _PULL_ARGS) assert result.exit_code == 0 malicious_path = coordination_dir(repo) / "remote" / "reservation" / "../../malicious.json" assert not malicious_path.exists() # Confirm nothing was written at all remote_dir = coordination_dir(repo) / "remote" if remote_dir.exists(): assert list(remote_dir.rglob("*.json")) == [] def test_remote_records_with_unknown_kind_skipped(self, repo: pathlib.Path) -> None: """A record with an unknown kind must not be written anywhere.""" malicious_records = [ {"kind": "../malicious_dir", "record_id": "safe-id", "run_id": "r1", "payload": {}}, ] with patch(_PULL_TARGET, return_value=_pull_ok(malicious_records)): result = runner.invoke(cli, _PULL_ARGS) assert result.exit_code == 0 remote_dir = coordination_dir(repo) / "remote" if remote_dir.exists(): assert list(remote_dir.rglob("*.json")) == [] # --------------------------------------------------------------------------- # Stress — muse coord sync pull # --------------------------------------------------------------------------- class TestCoordSyncPullStress: def test_50_sequential_pull_calls_no_records(self, repo: pathlib.Path) -> None: """50 sequential pulls with no records all exit 0.""" for i in range(50): with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=i)): result = runner.invoke(cli, _PULL_ARGS + ["-j"]) assert result.exit_code == 0, f"Call {i}: {result.output}" data = json.loads(result.output.strip()) assert data["count"] == 0 def test_incremental_cursor_chain_100_steps(self, repo: pathlib.Path) -> None: """100 incremental pulls each use the cursor from the previous step.""" cursor = 0 for i in range(100): rid = _new_id() fake_records = [{"kind": "reservation", "record_id": rid, "run_id": f"r{i}", "payload": {}}] with patch(_PULL_TARGET, return_value=_pull_ok(fake_records, cursor=cursor + 1)): args = _PULL_ARGS + ["--since-id", str(cursor), "-j"] result = runner.invoke(cli, args) assert result.exit_code == 0, f"Step {i}: {result.output}" data = json.loads(result.output.strip()) cursor = data["cursor"] assert cursor == 100 def test_concurrent_pull_8_threads(self, repo: pathlib.Path) -> None: """8 threads each call run_pull concurrently; patches applied at test level.""" import argparse import threading from muse.cli.commands.coord_sync import run_pull errors: list[str] = [] def worker(idx: int) -> None: args = argparse.Namespace( hub="https://localhost:1337", owner="gabriel", slug="myrepo", signing=None, since_id=0, kinds=[], limit=500, json_out=True, ) try: run_pull(args) except SystemExit as exc: if exc.code != 0: errors.append(f"Thread {idx}: exit {exc.code}") except Exception as exc: errors.append(f"Thread {idx}: {exc}") pull_p = patch(_PULL_TARGET, return_value=_pull_ok([], cursor=0)) repo_p = patch("muse.cli.commands.coord_sync.require_repo", return_value=repo) pull_p.start() repo_p.start() try: threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() finally: pull_p.stop() repo_p.stop() assert not errors, f"Concurrent failures: {errors}" # --------------------------------------------------------------------------- # TestRegisterFlags — --json / -j normalized at argparse level # --------------------------------------------------------------------------- class TestRegisterFlags: """register() must expose --json/-j with dest=json_out on pull and push.""" def _make_parser(self) -> "argparse.ArgumentParser": import argparse as ap from muse.cli.commands.coord_sync import register root = ap.ArgumentParser() subs = root.add_subparsers() register(subs) return root # --json/-j lives on the pull/push sub-subparsers, not on sync itself. _PULL_REQUIRED = ["sync", "pull", "--owner", "o", "--slug", "s"] def test_json_out_default_false(self) -> None: p = self._make_parser() ns = p.parse_args(self._PULL_REQUIRED) assert ns.json_out is False def test_json_out_true_with_json_flag(self) -> None: p = self._make_parser() ns = p.parse_args(self._PULL_REQUIRED + ["--json"]) assert ns.json_out is True def test_json_out_true_with_j_flag(self) -> None: p = self._make_parser() ns = p.parse_args(self._PULL_REQUIRED + ["-j"]) assert ns.json_out is True