"""EXTREME data integrity test suite for ``muse coord sync`` — the coord layer. Scenario: Linus Torvalds ports the Linux kernel to Muse. 50 AI agents are simultaneously reading and writing coordination records across 7 kinds. Thousands of files. Network blips. Corrupted disk sectors. Adversarial hubs. Concurrent writers. This suite attempts to find the edge and go beyond it. Coverage matrix --------------- Round-trip fidelity * All 7 kinds survive _gather → _write with bit-exact field preservation * Unicode in payloads (CJK, emoji, RTL Arabic) — no mojibake * Deeply nested payloads — no truncation or type coercion * Null / empty / boolean payload values — exact type preservation * 50 KB payload string — no truncation * record_uuid fallback chain: explicit field → fpath.stem Batching correctness * Exactly MAX_PUSH_BATCH (500) records → exactly 1 push_to_hub call * 501 records → exactly 2 push_to_hub calls * 1 000 records → exactly 2 calls (500 each) * 1 001 records → exactly 3 calls (500, 500, 1) * 4 999 records (Linux-scale) → 10 calls (9×500 + 1×499) * First batch fails, remaining batches still execute * All batches fail → inserted=0, failed=True, exit 1 * Batch slices are non-overlapping and cover all records exactly * accumulated inserted/skipped counts are sum of all batch results Cursor correctness * cursor=0 when pull returns 0 records * cursor equals the id of the last returned record * cursor in JSON output matches cursor in pull_from_hub return value * Incremental pull chain: 5 pages, cursor advances, no gaps in record IDs * since_id passed to pull_from_hub verbatim * Page-chained full pull reconstructs all 1 000 records with zero duplicates Resilience * Corrupt JSON file skipped — other files in same dir still gathered * Binary garbage file skipped * NUL byte in JSON skipped * File deleted between glob() and read_text() — skipped gracefully * All files corrupt → 0 records → "(no local coordination records to push)" * Partial corruption: 500 good + 500 
corrupt → 500 records pushed * Disk full on _write_remote_records → OSError propagates (does not swallow) * OSError on mkdir propagates * PermissionError on write propagates * Second-pass gather after first corrupt pass finds good files added later Concurrency * 16 threads call _gather_local_records simultaneously → each sees same count * 8 threads write distinct UUIDs to same remote dir → all files present at end * 8 threads write same UUID → final file is valid JSON (last-writer-wins, not corrupt) * run_push called by 8 threads concurrently → no crashes * run_pull called by 8 threads concurrently → no crashes Linux-scale * 7 000 records (1 000 per kind) gathered correctly * 7 000 records batched and pushed in 14 calls, all counted * 1 000-record pull writes all 1 000 files to remote dir * Mixed kinds: push 1 000 reservations + 1 000 heartbeats → 2 000 total * 50 sequential push + pull cycles, total inserted/pulled = 50×N each Response bounds * Hub response missing "inserted" key → defaults to 0 (no KeyError crash) * Hub response missing "skipped" key → defaults to 0 * Hub response with extra unknown keys → silently ignored * Malformed JSON response → CoordBusError raised * Empty response body → CoordBusError raised * Response at exactly MAX_COORD_RESPONSE_BYTES is accepted by _post_json * Response one byte over MAX_COORD_RESPONSE_BYTES → CoordBusError Filesystem safety * record_uuid = "../../../etc/passwd" → skipped, no file written outside remote/ * record_uuid with null byte → skipped * record_uuid with 128 chars (max) → accepted, file written * record_uuid with 129 chars (over max) → rejected * kind = "../evil" → skipped * kind = "" → skipped * remote dir created automatically if absent * Existing remote file overwritten with updated payload * Written files are valid JSON (parseable with json.loads) * Written files end with newline Idempotency * _gather_local_records is pure: same dir always returns same records * Push same 500 records twice → second push all 
skipped, inserted=0 * Push 250 old + 250 new → inserted=250, skipped=250 * _write_remote_records same UUID twice → second write overwrites cleanly * Pull with since_id=cursor returns 0 records (no replay) Token safety * Token never appears in JSON success output (push) * Token never appears in JSON success output (pull) * Token never appears in JSON error output (push CoordBusError) * Token never appears in JSON error output (pull CoordBusError) * Token never appears in text mode success output """ from __future__ import annotations import argparse import io import json import pathlib import threading import uuid from unittest.mock import MagicMock, call, patch import pytest from muse.core._types import MsgpackDict, MsgpackValue from tests.cli_test_helper import CliRunner from muse.core.coord_bus import CoordBusError, MAX_PUSH_BATCH, MAX_PULL_LIMIT from muse.cli.commands.coord_sync import ( _ALL_KINDS, _MAX_OWNER_LEN, _MAX_PULL_LIMIT, _MAX_SLUG_LEN, _SAFE_RECORD_UUID_RE, _gather_local_records, _write_remote_records, run_pull, run_push, ) runner = CliRunner() cli = None # CliRunner accepts cli=None after argparse migration # --------------------------------------------------------------------------- # Patch targets # --------------------------------------------------------------------------- _PUSH_TARGET = "muse.cli.commands.coord_sync.push_to_hub" _PULL_TARGET = "muse.cli.commands.coord_sync.pull_from_hub" _REQUIRE_REPO = "muse.cli.commands.coord_sync.require_repo" _RESOLVE_HUB = "muse.cli.commands.coord_sync._resolve_hub_and_signing" # --------------------------------------------------------------------------- # Common args # --------------------------------------------------------------------------- _BASE_ARGS = [ "--hub", "http://localhost:10003", "--owner", "gabriel", "--slug", "linux", ] _PUSH_ARGS = ["coord", "sync", "push"] + _BASE_ARGS _PULL_ARGS = ["coord", "sync", "pull"] + _BASE_ARGS + ["--since-id", "0"] # 
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create a minimal .muse repo layout under *tmp_path* and return it."""
    (tmp_path / ".muse").mkdir()
    return tmp_path


def _coord_dir(root: pathlib.Path) -> pathlib.Path:
    """Return the ``.muse/coordination`` directory of the repo at *root*."""
    return root / ".muse" / "coordination"


def _write_records(
    root: pathlib.Path,
    kind: str,
    records: list[MsgpackDict],
    *,
    ensure_ascii: bool = True,
) -> None:
    """Write coordination records as JSON files in the appropriate subdir.

    Args:
        root: Repository root (the directory that contains ``.muse``).
        kind: Singular record kind; must be one of the seven keys below.
        records: Records to serialise, one ``<id>.json`` file each.  The file
            stem is the record's id field for *kind*, or a fresh UUID when the
            record does not carry that field.
        ensure_ascii: Forwarded to ``json.dumps``.  The default (``True``)
            keeps the historical behaviour of writing ``\\uXXXX`` escapes;
            pass ``False`` to put raw UTF-8 bytes on disk so that the reader's
            text decoding is actually exercised.

    Raises:
        KeyError: If *kind* is not a known record kind.
    """
    subdir_by_kind = {
        "reservation": "reservations",
        "intent": "intents",
        "release": "releases",
        "heartbeat": "heartbeats",
        "dependency": "dependencies",
        "task": "tasks",
        "claim": "claims",
    }
    id_field_by_kind = {
        "reservation": "reservation_id",
        "intent": "intent_id",
        "release": "release_id",
        "heartbeat": "run_id",
        "dependency": "reservation_id",
        "task": "task_id",
        "claim": "task_id",
    }
    # Both lookups are loop-invariant, so resolve them once up front.
    subdir = _coord_dir(root) / subdir_by_kind[kind]
    id_field = id_field_by_kind[kind]
    subdir.mkdir(parents=True, exist_ok=True)
    for rec in records:
        fname = rec.get(id_field, str(uuid.uuid4()))
        (subdir / f"{fname}.json").write_text(
            json.dumps(rec, ensure_ascii=ensure_ascii), encoding="utf-8"
        )


def _res(uid: str | None = None, **extra: MsgpackValue) -> MsgpackDict:
    """Build a reservation record; *uid* defaults to a fresh UUID."""
    uid = uid or str(uuid.uuid4())
    return {"reservation_id": uid, "run_id": f"run-{uid}", "expires_at": None, **extra}


def _hb(uid: str | None = None, **extra: MsgpackValue) -> MsgpackDict:
    """Build a heartbeat record; *uid* defaults to a fresh UUID."""
    uid = uid or str(uuid.uuid4())
    return {"run_id": uid, "expires_at": None, **extra}


def _task(uid: str | None = None, **extra: MsgpackValue) -> MsgpackDict:
    """Build a task record; *uid* defaults to a fresh UUID."""
    uid = uid or str(uuid.uuid4())
    return {"task_id": uid, "run_id": f"run-{uid}", **extra}


def _push_ok(inserted: int = 1, skipped: int = 0) -> MsgpackDict:
    """Build the dict a stubbed ``push_to_hub`` returns on success."""
    return {"inserted": inserted, "skipped": skipped}


def _pull_ok(
    records: list[MsgpackDict] | None = None,
    cursor: int = 1,
) -> MsgpackDict:
    """Build the dict a stubbed ``pull_from_hub`` returns on success."""
    recs = records if records is not None else []
    return {"records": recs, "cursor": cursor}


@pytest.fixture
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Provide a throw-away repo root containing an empty ``.muse`` dir."""
    return _make_repo(tmp_path)


# ===========================================================================
# 1. ROUND-TRIP FIDELITY
# ===========================================================================


class TestCoordIntegrityRoundTrip:
    """Records written to disk then gathered must be bit-exact."""

    @staticmethod
    def _gather_single_payload(
        repo: pathlib.Path,
        *,
        ensure_ascii: bool = True,
        **fields: MsgpackValue,
    ) -> MsgpackDict:
        """Write one reservation carrying *fields* and return its gathered payload."""
        rec = {"reservation_id": str(uuid.uuid4()), "run_id": "r", **fields}
        _write_records(repo, "reservation", [rec], ensure_ascii=ensure_ascii)
        gathered = _gather_local_records(repo, ["reservation"])
        return gathered[0]["payload"]

    def test_reservation_fields_preserved(self, repo: pathlib.Path) -> None:
        uid = str(uuid.uuid4())
        rec = {
            "reservation_id": uid,
            "run_id": "r1",
            "expires_at": "2099-01-01T00:00:00Z",
            "custom": "value",
        }
        _write_records(repo, "reservation", [rec])
        gathered = _gather_local_records(repo, ["reservation"])
        assert len(gathered) == 1
        g = gathered[0]
        assert g["kind"] == "reservation"
        assert g["record_uuid"] == uid
        assert g["run_id"] == "r1"
        assert g["expires_at"] == "2099-01-01T00:00:00Z"
        assert g["payload"]["custom"] == "value"

    def test_all_seven_kinds_gather_correctly(self, repo: pathlib.Path) -> None:
        """One file per kind — all 7 must appear with correct kind field."""

        def fresh() -> str:
            return str(uuid.uuid4())

        # A lookup table instead of an if/elif chain: a kind missing from the
        # table fails loudly with KeyError rather than reusing a stale record.
        sample_by_kind: dict[str, MsgpackDict] = {
            "reservation": {"reservation_id": fresh(), "run_id": "r"},
            "intent": {"intent_id": fresh(), "run_id": "r"},
            "release": {"release_id": fresh(), "run_id": "r"},
            "heartbeat": {"run_id": fresh()},
            "dependency": {"reservation_id": fresh(), "run_id": "r"},
            "task": {"task_id": fresh(), "run_id": "r"},
            "claim": {"task_id": fresh(), "claimer_run_id": "cr", "run_id": "r"},
        }
        for kind in _ALL_KINDS:
            _write_records(repo, kind, [sample_by_kind[kind]])
        gathered = _gather_local_records(repo, list(_ALL_KINDS))
        found_kinds = {g["kind"] for g in gathered}
        assert found_kinds == set(_ALL_KINDS), f"Missing kinds: {set(_ALL_KINDS) - found_kinds}"

    # The three Unicode tests write raw UTF-8 (ensure_ascii=False).  With the
    # json.dumps default the file would hold only ASCII ``\uXXXX`` escapes and
    # could never reveal a decoding problem.

    def test_unicode_payload_cjk_no_mojibake(self, repo: pathlib.Path) -> None:
        japanese = "作曲家は天才だ"  # "The composer is a genius"
        payload = self._gather_single_payload(repo, ensure_ascii=False, label=japanese)
        assert payload["label"] == japanese

    def test_unicode_payload_emoji_no_mojibake(self, repo: pathlib.Path) -> None:
        notes = "🎵🎼🎹🎸🥁"
        payload = self._gather_single_payload(repo, ensure_ascii=False, notes=notes)
        assert payload["notes"] == notes

    def test_unicode_payload_arabic_rtl(self, repo: pathlib.Path) -> None:
        arabic = "مؤلف موسيقي"
        payload = self._gather_single_payload(repo, ensure_ascii=False, composer=arabic)
        assert payload["composer"] == arabic

    def test_deeply_nested_payload_preserved(self, repo: pathlib.Path) -> None:
        deep = {"a": {"b": {"c": {"d": {"e": {"f": "leaf"}}}}}}
        payload = self._gather_single_payload(repo, meta=deep)
        assert payload["meta"]["a"]["b"]["c"]["d"]["e"]["f"] == "leaf"

    def test_null_values_in_payload_preserved(self, repo: pathlib.Path) -> None:
        payload = self._gather_single_payload(repo, nullable=None, also=None)
        assert payload["nullable"] is None
        assert payload["also"] is None

    def test_boolean_payload_values_not_coerced_to_strings(self, repo: pathlib.Path) -> None:
        payload = self._gather_single_payload(repo, flag=True, other=False)
        assert payload["flag"] is True
        assert payload["other"] is False

    def test_integer_payload_values_not_coerced(self, repo: pathlib.Path) -> None:
        payload = self._gather_single_payload(repo, count=42, pi=3.14159)
        assert payload["count"] == 42
        assert abs(payload["pi"] - 3.14159) < 1e-9

    def test_50kb_payload_not_truncated(self, repo: pathlib.Path) -> None:
        big_string = "X" * 50_000
        payload = self._gather_single_payload(repo, blob=big_string)
        assert len(payload["blob"]) == 50_000

    def test_list_payload_values_preserved(self, repo: pathlib.Path) -> None:
        tags = ["linux", "kernel", "vcs", "muse"]
        payload = self._gather_single_payload(repo, tags=tags)
        assert payload["tags"] == tags

    def test_record_uuid_fallback_to_fpath_stem(self, repo: pathlib.Path) -> None:
        """When reservation_id is absent, record_uuid falls back to file stem."""
        stem = "fallback-stem-abc123"
        subdir = _coord_dir(repo) / "reservations"
        subdir.mkdir(parents=True, exist_ok=True)
        (subdir / f"{stem}.json").write_text(
            json.dumps({"run_id": "r"}), encoding="utf-8"
        )
        gathered = _gather_local_records(repo, ["reservation"])
        assert gathered[0]["record_uuid"] == stem

    def test_claim_uses_claimer_run_id_for_run_id(self, repo: pathlib.Path) -> None:
        uid = str(uuid.uuid4())
        rec = {"task_id": uid, "claimer_run_id": "claimer-run-99", "run_id": "other"}
        _write_records(repo, "claim", [rec])
        gathered = _gather_local_records(repo, ["claim"])
        assert gathered[0]["run_id"] == "claimer-run-99"

    def test_write_then_read_remote_roundtrip(self, repo: pathlib.Path) -> None:
        """_write_remote_records → re-read from disk = original record."""
        uid = str(uuid.uuid4())
        orig = {
            "kind": "reservation",
            "record_uuid": uid,
            "run_id": "r",
            "payload": {"x": 99},
            "expires_at": "2099-06-01T00:00:00Z",
        }
        _write_remote_records(repo, [orig])
        target = repo / ".muse" / "coordination" / "remote" / "reservation" / f"{uid}.json"
        assert target.exists()
        loaded = json.loads(target.read_text(encoding="utf-8"))
        assert loaded["payload"]["x"] == 99
        assert loaded["expires_at"] == "2099-06-01T00:00:00Z"

    def test_all_7_kinds_write_remote_and_read_back(self, repo: pathlib.Path) -> None:
        records = [
            {
                "kind": kind,
                "record_uuid": str(uuid.uuid4()),
                "run_id": "r",
                "payload": {"kind_was": kind},
                "expires_at": None,
            }
            for kind in _ALL_KINDS
        ]
        _write_remote_records(repo, records)
        remote_root = repo / ".muse" / "coordination" / "remote"
        for rec in records:
            path = remote_root / rec["kind"] / f"{rec['record_uuid']}.json"
            assert path.exists(), f"Missing: {path}"
            # Explicit encoding, matching every other read in this module,
            # so the result does not depend on the platform locale.
            loaded = json.loads(path.read_text(encoding="utf-8"))
            assert loaded["payload"]["kind_was"] == rec["kind"]
# ===========================================================================
# 2. BATCHING CORRECTNESS
# ===========================================================================


def _run_cli_with_hub(
    repo: pathlib.Path,
    target: str,
    hub_stub: object,
    args: list[str],
) -> tuple[int, str]:
    """Invoke the CLI with the hub call at *target* replaced by *hub_stub*.

    ``require_repo`` is patched to return *repo*, and hub/signing resolution is
    patched to return a fixed localhost URL plus the string ``"tok"``
    (presumably the auth token — confirm against ``_resolve_hub_and_signing``).

    Args:
        repo: Repo root the command should operate on.
        target: Dotted patch target (``_PUSH_TARGET`` or ``_PULL_TARGET``).
        hub_stub: Callable installed in place of the hub function — a plain
            function or a ``MagicMock`` when call inspection is needed.
        args: Full CLI argument vector.

    Returns:
        ``(exit_code, output)`` of the invocation.
    """
    with patch(target, hub_stub), \
            patch(_REQUIRE_REPO, return_value=repo), \
            patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
        result = runner.invoke(cli, args)
    return result.exit_code, result.output


def _last_json_line(output: str) -> MsgpackDict:
    """Parse the last non-blank line of *output* as JSON."""
    non_blank = [line for line in output.strip().splitlines() if line.strip()]
    return json.loads(non_blank[-1])


class TestCoordIntegrityBatching:
    """Correct batch splitting, counting, and failure isolation."""

    def _make_n_reservations(self, repo: pathlib.Path, n: int) -> None:
        """Create *n* reservation files with distinct UUIDs in *repo*."""
        subdir = _coord_dir(repo) / "reservations"
        subdir.mkdir(parents=True, exist_ok=True)
        for _ in range(n):
            uid = str(uuid.uuid4())
            (subdir / f"{uid}.json").write_text(
                json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
            )

    def _push_recording_batch_sizes(self, repo: pathlib.Path, n: int) -> list[int]:
        """Push *n* fresh reservations and return the size of each hub batch, in call order."""
        self._make_n_reservations(repo, n)
        batch_sizes: list[int] = []

        def fake_push(hub_url: str, owner: str, slug: str,
                      records: list[MsgpackDict], token: str | None = None) -> MsgpackDict:
            batch_sizes.append(len(records))
            return _push_ok(len(records))

        _run_cli_with_hub(repo, _PUSH_TARGET, fake_push, _PUSH_ARGS + ["-j"])
        return batch_sizes

    def test_exactly_500_records_is_one_batch(self, repo: pathlib.Path) -> None:
        self._make_n_reservations(repo, 500)
        mock_push = MagicMock(return_value=_push_ok(500))
        exit_code, output = _run_cli_with_hub(
            repo, _PUSH_TARGET, mock_push, _PUSH_ARGS + ["-j"]
        )
        assert exit_code == 0
        assert mock_push.call_count == 1
        data = json.loads(output.strip())
        assert data["total"] == 500

    def test_501_records_splits_into_two_batches(self, repo: pathlib.Path) -> None:
        assert self._push_recording_batch_sizes(repo, 501) == [500, 1]

    def test_1000_records_splits_into_two_batches(self, repo: pathlib.Path) -> None:
        assert self._push_recording_batch_sizes(repo, 1000) == [500, 500]

    def test_1001_records_splits_into_three_batches(self, repo: pathlib.Path) -> None:
        assert self._push_recording_batch_sizes(repo, 1001) == [500, 500, 1]

    def test_4999_records_linux_scale_10_batches(self, repo: pathlib.Path) -> None:
        """Linux-scale: 4999 records → 9×500 + 1×499 = 10 calls."""
        batch_sizes = self._push_recording_batch_sizes(repo, 4999)
        assert len(batch_sizes) == 10
        assert sum(batch_sizes) == 4999
        # Pin the exact shape promised above, not just the count and the tail.
        assert batch_sizes == [500] * 9 + [499]

    def test_first_batch_fails_remaining_batches_still_execute(self, repo: pathlib.Path) -> None:
        """A single batch failure must not abort the whole push."""
        self._make_n_reservations(repo, 1001)
        call_count = 0

        def fail_first_then_ok(hub_url: str, owner: str, slug: str,
                               records: list[MsgpackDict], token: str | None = None) -> MsgpackDict:
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                raise CoordBusError("network hiccup", status_code=503)
            return _push_ok(len(records))

        exit_code, output = _run_cli_with_hub(
            repo, _PUSH_TARGET, fail_first_then_ok, _PUSH_ARGS + ["-j"]
        )
        assert exit_code == 1  # failed=True → exit 1
        # On batch error, _err() emits an error JSON line first; summary is the last line.
        data = _last_json_line(output)
        assert data["failed"] is True
        assert data["inserted"] == 501  # batches 2 and 3 succeeded

    def test_all_batches_fail_inserted_is_zero(self, repo: pathlib.Path) -> None:
        self._make_n_reservations(repo, 600)
        always_fails = MagicMock(side_effect=CoordBusError("gone", status_code=503))
        exit_code, output = _run_cli_with_hub(
            repo, _PUSH_TARGET, always_fails, _PUSH_ARGS + ["-j"]
        )
        assert exit_code == 1
        data = _last_json_line(output)
        assert data["inserted"] == 0
        assert data["failed"] is True

    def test_batch_slices_are_non_overlapping_and_exhaustive(self, repo: pathlib.Path) -> None:
        """Every gathered record appears in exactly one batch — no duplicates, no gaps."""
        n = 1200
        self._make_n_reservations(repo, n)
        all_received: list[MsgpackDict] = []

        def collect(hub_url: str, owner: str, slug: str,
                    records: list[MsgpackDict], token: str | None = None) -> MsgpackDict:
            all_received.extend(records)
            return _push_ok(len(records))

        _run_cli_with_hub(repo, _PUSH_TARGET, collect, _PUSH_ARGS + ["-j"])
        assert len(all_received) == n
        # No duplicates (record_uuid uniqueness)
        uuids = [r["record_uuid"] for r in all_received]
        assert len(uuids) == len(set(uuids)), "Duplicate records in batches!"

    def test_inserted_skipped_counts_accumulated_across_batches(self, repo: pathlib.Path) -> None:
        """total inserted/skipped in JSON output = sum of all batch results."""
        self._make_n_reservations(repo, 1100)

        def mixed(hub_url: str, owner: str, slug: str,
                  records: list[MsgpackDict], token: str | None = None) -> MsgpackDict:
            # Every batch: half inserted (rounded down), the rest skipped.
            n = len(records)
            return _push_ok(n // 2, n - n // 2)

        _, output = _run_cli_with_hub(repo, _PUSH_TARGET, mixed, _PUSH_ARGS + ["-j"])
        data = json.loads(output.strip())
        # 3 batches: 500, 500, 100 → inserted = 250+250+50=550, skipped = 250+250+50=550
        assert data["inserted"] + data["skipped"] == 1100
        # The sum alone would still pass if the two counters were swapped or
        # one absorbed the other, so pin each of them individually.
        assert data["inserted"] == 550
        assert data["skipped"] == 550


# ===========================================================================
# 3. CURSOR CORRECTNESS
# ===========================================================================


class TestCoordIntegrityCursor:
    """Incremental pull semantics: cursor must advance, no gaps, no duplicates."""

    @staticmethod
    def _pull_json(
        repo: pathlib.Path,
        hub_reply: MsgpackDict,
        extra_args: list[str],
    ) -> MsgpackDict:
        """Run ``pull`` against a hub stub returning *hub_reply*; parse the whole output as JSON."""
        _, output = _run_cli_with_hub(
            repo, _PULL_TARGET, MagicMock(return_value=hub_reply), _PULL_ARGS + extra_args
        )
        return json.loads(output.strip())

    def test_cursor_zero_when_no_records_returned(self, repo: pathlib.Path) -> None:
        data = self._pull_json(repo, _pull_ok([], cursor=0), ["-j"])
        assert data["cursor"] == 0

    def test_cursor_equals_last_record_id(self, repo: pathlib.Path) -> None:
        records = [
            {"kind": "reservation", "record_uuid": str(uuid.uuid4()),
             "id": 42, "run_id": "r", "payload": {}},
        ]
        data = self._pull_json(repo, {"records": records, "cursor": 42}, ["-j"])
        assert data["cursor"] == 42

    def test_since_id_passed_through_verbatim(self, repo: pathlib.Path) -> None:
        """--since-id must be forwarded to pull_from_hub unchanged."""
        mock_pull = MagicMock(return_value=_pull_ok([]))
        # _PULL_ARGS already carries ``--since-id 0``; the value appended here
        # is the one expected to reach the hub call.
        _run_cli_with_hub(repo, _PULL_TARGET, mock_pull, _PULL_ARGS + ["--since-id", "9999"])
        positional = mock_pull.call_args[0]
        assert positional[3] == 9999  # since_id is 4th positional arg

    def test_incremental_5_pages_no_record_id_gaps(self, repo: pathlib.Path) -> None:
        """5 pages of 200 records each — all 1000 IDs must be contiguous."""
        page_size = 200
        all_ids: list[int] = []
        cursor = 0
        for page in range(5):
            start = page * page_size + 1
            end = start + page_size
            records = [
                {"kind": "reservation", "record_uuid": str(uuid.uuid4()),
                 "id": i, "run_id": "r", "payload": {}}
                for i in range(start, end)
            ]
            new_cursor = end - 1
            # Simulate: pull returns page of records, then we advance cursor
            data = self._pull_json(
                repo,
                {"records": records, "cursor": new_cursor},
                ["--since-id", str(cursor), "-j"],
            )
            all_ids.extend(r["id"] for r in data["records"])
            cursor = data["cursor"]
        assert len(all_ids) == 1000
        assert all_ids == list(range(1, 1001)), "Gaps or non-contiguous IDs detected!"

    def test_page_chained_pull_zero_duplicates(self, repo: pathlib.Path) -> None:
        """Simulate 3 pages; each record UUID must appear exactly once."""
        all_uuids: list[str] = []
        cursor = 0
        for page in range(3):
            page_records = [
                {"kind": "heartbeat", "record_uuid": str(uuid.uuid4()),
                 "run_id": f"r-{page}-{i}", "payload": {}}
                for i in range(100)
            ]
            new_cursor = (page + 1) * 100
            data = self._pull_json(
                repo,
                {"records": page_records, "cursor": new_cursor},
                ["--since-id", str(cursor), "-j"],
            )
            all_uuids.extend(r["record_uuid"] for r in data["records"])
            cursor = data["cursor"]
        assert len(all_uuids) == len(set(all_uuids)), "Duplicate record UUIDs across pages!"

    def test_cursor_in_json_output_matches_hub_cursor(self, repo: pathlib.Path) -> None:
        expected_cursor = 77777
        data = self._pull_json(repo, {"records": [], "cursor": expected_cursor}, ["-j"])
        assert data["cursor"] == expected_cursor

    def test_since_id_zero_returns_all_records_from_beginning(self, repo: pathlib.Path) -> None:
        records = [
            {"kind": "task", "record_uuid": str(uuid.uuid4()),
             "run_id": "r", "payload": {"i": i}}
            for i in range(50)
        ]
        data = self._pull_json(
            repo, {"records": records, "cursor": 50}, ["--since-id", "0", "-j"]
        )
        assert data["count"] == 50
        assert data["cursor"] == 50
# ===========================================================================
# 4. RESILIENCE
# ===========================================================================


class TestCoordIntegrityResilience:
    """Corrupt files, disk errors, and race conditions must never block good data."""

    def test_corrupt_json_skipped_others_gathered(self, repo: pathlib.Path) -> None:
        subdir = _coord_dir(repo) / "reservations"
        subdir.mkdir(parents=True, exist_ok=True)
        good_uid = str(uuid.uuid4())
        (subdir / f"{good_uid}.json").write_text(
            json.dumps({"reservation_id": good_uid, "run_id": "r"}), encoding="utf-8"
        )
        (subdir / "corrupt.json").write_text("{not valid json !!!", encoding="utf-8")
        gathered = _gather_local_records(repo, ["reservation"])
        assert len(gathered) == 1
        assert gathered[0]["record_uuid"] == good_uid

    def test_binary_garbage_file_skipped(self, repo: pathlib.Path) -> None:
        subdir = _coord_dir(repo) / "heartbeats"
        subdir.mkdir(parents=True, exist_ok=True)
        uid = str(uuid.uuid4())
        (subdir / f"{uid}.json").write_text(
            json.dumps({"run_id": uid}), encoding="utf-8"
        )
        (subdir / "binary.json").write_bytes(bytes(range(256)))
        gathered = _gather_local_records(repo, ["heartbeat"])
        assert len(gathered) == 1

    def test_null_byte_in_file_skipped(self, repo: pathlib.Path) -> None:
        subdir = _coord_dir(repo) / "tasks"
        subdir.mkdir(parents=True, exist_ok=True)
        uid = str(uuid.uuid4())
        (subdir / f"{uid}.json").write_text(
            json.dumps({"task_id": uid, "run_id": "r"}), encoding="utf-8"
        )
        (subdir / "nullbyte.json").write_bytes(b'{"task_id": "x\x00y"}')
        gathered = _gather_local_records(repo, ["task"])
        # Null byte may or may not parse; either way, only 1 valid record matters
        assert any(g["record_uuid"] == uid for g in gathered)

    def test_file_deleted_mid_glob_skipped_gracefully(self, repo: pathlib.Path) -> None:
        """Simulate FileNotFoundError between glob and read_text."""
        subdir = _coord_dir(repo) / "reservations"
        subdir.mkdir(parents=True, exist_ok=True)
        uid = str(uuid.uuid4())
        ghost = subdir / f"{uid}.json"
        ghost.write_text(json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8")

        _original_read_text = pathlib.Path.read_text

        def raise_on_ghost(self: pathlib.Path, encoding: str | None = None,
                           errors: str | None = None) -> str:
            # Only the ghost file "disappears"; every other read is delegated
            # to the real implementation.
            if self == ghost:
                raise FileNotFoundError("deleted mid-glob")
            return _original_read_text(self, encoding=encoding, errors=errors)

        with patch.object(pathlib.Path, "read_text", raise_on_ghost):
            gathered = _gather_local_records(repo, ["reservation"])
        assert gathered == []

    def test_all_files_corrupt_zero_records_push_exits_0(self, repo: pathlib.Path) -> None:
        subdir = _coord_dir(repo) / "reservations"
        subdir.mkdir(parents=True, exist_ok=True)
        for i in range(10):
            (subdir / f"bad_{i}.json").write_text("{{broken", encoding="utf-8")
        with patch(_REQUIRE_REPO, return_value=repo), \
                patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        assert result.exit_code == 0
        data = json.loads(result.output.strip())
        assert data["total"] == 0
        assert data["failed"] is False

    def test_500_good_500_corrupt_only_good_pushed(self, repo: pathlib.Path) -> None:
        subdir = _coord_dir(repo) / "reservations"
        subdir.mkdir(parents=True, exist_ok=True)
        good_uids = set()
        for _ in range(500):
            uid = str(uuid.uuid4())
            good_uids.add(uid)
            (subdir / f"{uid}.json").write_text(
                json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
            )
        for i in range(500):
            (subdir / f"corrupt_{i}.json").write_text("{bad", encoding="utf-8")
        gathered = _gather_local_records(repo, ["reservation"])
        assert len(gathered) == 500
        assert {g["record_uuid"] for g in gathered} == good_uids

    def test_disk_full_on_write_remote_propagates(self, repo: pathlib.Path) -> None:
        """OSError from write_text_atomic must NOT be swallowed."""
        uid = str(uuid.uuid4())
        records = [{"kind": "reservation", "record_uuid": uid, "run_id": "r",
                    "payload": {}, "expires_at": None}]
        with patch("muse.cli.commands.coord_sync.write_text_atomic",
                   side_effect=OSError("No space left")):
            with pytest.raises(OSError, match="No space left"):
                _write_remote_records(repo, records)

    def test_permission_error_on_mkdir_propagates(self, repo: pathlib.Path) -> None:
        uid = str(uuid.uuid4())
        records = [{"kind": "heartbeat", "record_uuid": uid, "run_id": "r",
                    "payload": {}, "expires_at": None}]
        with patch.object(pathlib.Path, "mkdir", side_effect=PermissionError("read-only")):
            with pytest.raises(PermissionError, match="read-only"):
                _write_remote_records(repo, records)

    def test_empty_coordination_dir_returns_no_records(self, repo: pathlib.Path) -> None:
        # No coord dir at all
        gathered = _gather_local_records(repo, list(_ALL_KINDS))
        assert gathered == []

    def test_empty_kind_subdir_returns_no_records(self, repo: pathlib.Path) -> None:
        (_coord_dir(repo) / "reservations").mkdir(parents=True, exist_ok=True)
        gathered = _gather_local_records(repo, ["reservation"])
        assert gathered == []


# ===========================================================================
# 5. CONCURRENCY
# ===========================================================================


def _start_and_join(threads: list[threading.Thread]) -> None:
    """Start every thread in *threads*, then block until all have finished."""
    for t in threads:
        t.start()
    for t in threads:
        t.join()


class TestCoordIntegrityConcurrency:
    """No data races, no corruption under concurrent access."""

    def test_16_threads_gather_same_records(self, repo: pathlib.Path) -> None:
        """All threads must see the same number of records."""
        subdir = _coord_dir(repo) / "reservations"
        subdir.mkdir(parents=True, exist_ok=True)
        for _ in range(200):
            uid = str(uuid.uuid4())
            (subdir / f"{uid}.json").write_text(
                json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
            )

        counts: list[int] = []
        errors: list[str] = []

        def worker() -> None:
            try:
                recs = _gather_local_records(repo, ["reservation"])
                counts.append(len(recs))
            except Exception as exc:
                errors.append(str(exc))

        _start_and_join([threading.Thread(target=worker) for _ in range(16)])

        assert not errors, f"Errors: {errors}"
        assert all(c == 200 for c in counts), f"Count mismatch: {counts}"

    def test_8_threads_write_distinct_uuids_all_files_present(self, repo: pathlib.Path) -> None:
        """8 writers, each writing a unique UUID → all 8 files must exist."""
        uids = [str(uuid.uuid4()) for _ in range(8)]
        errors: list[str] = []

        def worker(uid: str) -> None:
            try:
                rec = {"kind": "reservation", "record_uuid": uid, "run_id": "r",
                       "payload": {"uid": uid}, "expires_at": None}
                _write_remote_records(repo, [rec])
            except Exception as exc:
                errors.append(f"{uid}: {exc}")

        _start_and_join([threading.Thread(target=worker, args=(u,)) for u in uids])

        assert not errors, f"Errors: {errors}"
        remote = repo / ".muse" / "coordination" / "remote" / "reservation"
        written = {f.stem for f in remote.glob("*.json")}
        assert written == set(uids), f"Missing: {set(uids) - written}"

    def test_8_threads_write_same_uuid_final_file_is_valid_json(
        self, repo: pathlib.Path
    ) -> None:
        """Last-writer-wins on same UUID must produce valid JSON, not corruption."""
        uid = str(uuid.uuid4())
        errors: list[str] = []

        def worker(i: int) -> None:
            try:
                rec = {"kind": "task", "record_uuid": uid, "run_id": f"r{i}",
                       "payload": {"version": i}, "expires_at": None}
                _write_remote_records(repo, [rec])
            except Exception as exc:
                errors.append(f"Thread {i}: {exc}")

        _start_and_join([threading.Thread(target=worker, args=(i,)) for i in range(8)])

        # Worker exceptions are captured above; without this assertion a
        # writer that crashed would go unnoticed as long as one other writer
        # left a readable file behind.
        assert not errors, f"Errors: {errors}"
        target = repo / ".muse" / "coordination" / "remote" / "task" / f"{uid}.json"
        assert target.exists()
        content = target.read_text(encoding="utf-8")
        loaded = json.loads(content)  # Must not raise — file must be valid JSON
        assert loaded["record_uuid"] == uid

    def test_run_push_8_concurrent_threads_no_crashes(self, repo: pathlib.Path) -> None:
        subdir = _coord_dir(repo) / "reservations"
        subdir.mkdir(parents=True, exist_ok=True)
        for _ in range(50):
            uid = str(uuid.uuid4())
            (subdir / f"{uid}.json").write_text(
                json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
            )

        errors: list[str] = []

        def worker(idx: int) -> None:
            args = argparse.Namespace(
                hub="http://localhost:10003",
                owner="gabriel",
                slug="linux",
                signing=None,
                kinds=["reservation"],
                fmt="json",
            )
            try:
                run_push(args)
            except SystemExit as exc:
                # Exit codes 0 and 1 are the command's normal outcomes.
                if exc.code not in (0, 1):
                    errors.append(f"Thread {idx}: unexpected exit {exc.code}")
            except Exception as exc:
                errors.append(f"Thread {idx}: {exc}")

        # The patches are installed once, around all threads.  Using ``with``
        # (rather than manual start()/stop()) guarantees that an earlier patch
        # is undone even if installing a later one raises.
        with patch(_PUSH_TARGET, return_value=_push_ok(50)), \
                patch(_REQUIRE_REPO, return_value=repo), \
                patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            _start_and_join(
                [threading.Thread(target=worker, args=(i,)) for i in range(8)]
            )

        assert not errors, f"Concurrent push errors: {errors}"

    def test_run_pull_8_concurrent_threads_no_crashes(self, repo: pathlib.Path) -> None:
        errors: list[str] = []

        def worker(idx: int) -> None:
            args = argparse.Namespace(
                hub="http://localhost:10003",
                owner="gabriel",
                slug="linux",
                signing=None,
                since_id=0,
                kinds=[],
                limit=500,
                fmt="json",
            )
            try:
                run_pull(args)
            except SystemExit as exc:
                # Exit codes 0 and 1 are the command's normal outcomes.
                if exc.code not in (0, 1):
                    errors.append(f"Thread {idx}: unexpected exit {exc.code}")
            except Exception as exc:
                errors.append(f"Thread {idx}: {exc}")

        with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=0)), \
                patch(_REQUIRE_REPO, return_value=repo), \
                patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            _start_and_join(
                [threading.Thread(target=worker, args=(i,)) for i in range(8)]
            )

        assert not errors, f"Concurrent pull errors: {errors}"


# ===========================================================================
# 6.
# LINUX-SCALE
# ===========================================================================

class TestCoordIntegrityScale:
    """Linux-scale operations: thousands of records across all 7 kinds."""

    # Record kind → name of its subdirectory under the coordination dir.
    _KIND_SUBDIR = {
        "reservation": "reservations",
        "intent": "intents",
        "release": "releases",
        "heartbeat": "heartbeats",
        "dependency": "dependencies",
        "task": "tasks",
        "claim": "claims",
    }
    # Record kind → JSON field that carries the generated uid in the fixture.
    _KIND_ID_FIELD = {
        "reservation": "reservation_id",
        "intent": "intent_id",
        "release": "release_id",
        "heartbeat": "run_id",
        "dependency": "reservation_id",
        "task": "task_id",
        "claim": "task_id",
    }

    def _populate_all_kinds(self, repo: pathlib.Path, count: int) -> None:
        """Write *count* JSON record files for every kind in ``_ALL_KINDS``.

        Each file is named ``<uuid4>.json`` and placed in the kind's
        subdirectory (created if missing).
        """
        for kind in _ALL_KINDS:
            subdir = _coord_dir(repo) / self._KIND_SUBDIR[kind]
            subdir.mkdir(parents=True, exist_ok=True)
            id_field = self._KIND_ID_FIELD[kind]
            for _ in range(count):
                uid = str(uuid.uuid4())
                # NOTE: for "heartbeat" the id field *is* "run_id", so the
                # literal "r" below overwrites the uid (last duplicate key wins).
                rec: MsgpackDict = {id_field: uid, "run_id": "r"}
                if kind == "claim":
                    rec["claimer_run_id"] = "cr"
                (subdir / f"{uid}.json").write_text(json.dumps(rec), encoding="utf-8")

    def test_7000_records_all_7_kinds_gathered(self, repo: pathlib.Path) -> None:
        """1000 of each kind (7000 total) — all must be gathered."""
        from collections import Counter

        self._populate_all_kinds(repo, 1000)
        gathered = _gather_local_records(repo, list(_ALL_KINDS))
        assert len(gathered) == 7000

        by_kind = Counter(g["kind"] for g in gathered)
        assert all(by_kind.get(k, 0) == 1000 for k in _ALL_KINDS), (
            f"Kind counts: {dict(by_kind)}"
        )

    def test_7000_records_batched_and_pushed_14_calls(self, repo: pathlib.Path) -> None:
        """7000 records are pushed in 14 hub calls that together carry all 7000."""
        self._populate_all_kinds(repo, 1000)
        call_count = 0
        total_sent = 0

        def count_calls(
            hub_url: str,
            owner: str,
            slug: str,
            records: list[MsgpackDict],
            token: str | None = None,
        ) -> MsgpackDict:
            """Stand-in for the push target: tally calls and records sent."""
            nonlocal call_count, total_sent
            call_count += 1
            total_sent += len(records)
            return _push_ok(len(records))

        with patch(_PUSH_TARGET, side_effect=count_calls), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])

        assert result.exit_code == 0
        assert call_count == 14  # ceil(7000/500) = 14
        assert total_sent == 7000

    def test_1000_record_pull_all_files_written(self, repo: pathlib.Path) -> None:
        """A 1000-record pull response — every record is written to the remote dir."""
        records = [
            {
                "kind": "reservation",
                "record_uuid": str(uuid.uuid4()),
                "run_id": f"r{i}",
                "payload": {},
                "expires_at": None,
            }
            for i in range(1000)
        ]
        uuids = {r["record_uuid"] for r in records}

        with patch(_PULL_TARGET, return_value={"records": records, "cursor": 1000}), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            result = runner.invoke(cli, _PULL_ARGS + ["-j"])

        assert result.exit_code == 0
        remote = repo / ".muse" / "coordination" / "remote" / "reservation"
        written = {f.stem for f in remote.glob("*.json")}
        assert written == uuids

    def test_mixed_1000_reservations_1000_heartbeats_total_2000(
        self, repo: pathlib.Path
    ) -> None:
        """1000 reservations plus 1000 heartbeats gather as 2000 records."""
        res_subdir = _coord_dir(repo) / "reservations"
        hb_subdir = _coord_dir(repo) / "heartbeats"
        res_subdir.mkdir(parents=True, exist_ok=True)
        hb_subdir.mkdir(parents=True, exist_ok=True)

        for _ in range(1000):
            uid = str(uuid.uuid4())
            (res_subdir / f"{uid}.json").write_text(
                json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
            )
            uid2 = str(uuid.uuid4())
            (hb_subdir / f"{uid2}.json").write_text(
                json.dumps({"run_id": uid2}), encoding="utf-8"
            )

        gathered = _gather_local_records(repo, ["reservation", "heartbeat"])
        assert len(gathered) == 2000

    def test_50_sequential_push_pull_cycles_integrity(self, repo: pathlib.Path) -> None:
        """50 cycles: push 10, pull 10, verify no data loss across cycles."""
        subdir = _coord_dir(repo) / "reservations"
        subdir.mkdir(parents=True, exist_ok=True)

        total_inserted = 0
        total_pulled = 0
        cursor = 0

        for cycle in range(50):
            # Write 10 new records
            cycle_uids = []
            for _ in range(10):
                uid = str(uuid.uuid4())
                cycle_uids.append(uid)
                (subdir / f"{uid}.json").write_text(
                    json.dumps({"reservation_id": uid, "run_id": f"r{cycle}"}),
                    encoding="utf-8",
                )

            # Push
            with patch(_PUSH_TARGET, return_value=_push_ok(10)), \
                 patch(_REQUIRE_REPO, return_value=repo), \
                 patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
                result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
            assert result.exit_code == 0
            push_data = json.loads(result.output.strip())
            # total accumulates: gather reads ALL files in subdir each cycle
            total_inserted += push_data["inserted"]

            # Pull 10 new records
            pull_records = [
                {
                    "kind": "reservation",
                    "record_uuid": u,
                    "run_id": f"r{cycle}",
                    "payload": {},
                    "expires_at": None,
                }
                for u in cycle_uids
            ]
            cursor += 10
            with patch(_PULL_TARGET, return_value={"records": pull_records, "cursor": cursor}), \
                 patch(_REQUIRE_REPO, return_value=repo), \
                 patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
                result = runner.invoke(
                    cli, _PULL_ARGS + ["--since-id", str(cursor - 10), "-j"]
                )
            assert result.exit_code == 0
            pull_data = json.loads(result.output.strip())
            total_pulled += pull_data["count"]

        # The local dir never exceeds 500 files, so every cycle is a single
        # batch and the mocked hub reports 10 inserted per cycle.
        assert total_inserted == 500  # 50 cycles × 10 inserted each
        assert total_pulled == 500  # 50 cycles × 10 records each


# ===========================================================================
# 7.
# RESPONSE BOUNDS
# ===========================================================================

class TestCoordIntegrityResponseBounds:
    """Hub response edge cases — missing keys, oversized bodies, malformed JSON."""

    def test_response_missing_inserted_key_defaults_to_zero(
        self, repo: pathlib.Path
    ) -> None:
        """A push response without "inserted" is reported as inserted == 0."""
        with patch(_PUSH_TARGET, return_value={"skipped": 3}), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            subdir = _coord_dir(repo) / "reservations"
            subdir.mkdir(parents=True, exist_ok=True)
            uid = str(uuid.uuid4())
            (subdir / f"{uid}.json").write_text(
                json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
            )
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])

        assert result.exit_code == 0
        data = json.loads(result.output.strip())
        assert data["inserted"] == 0  # defaults to 0
        assert data["skipped"] == 3

    def test_response_missing_skipped_key_defaults_to_zero(
        self, repo: pathlib.Path
    ) -> None:
        """A push response without "skipped" is reported as skipped == 0."""
        with patch(_PUSH_TARGET, return_value={"inserted": 5}), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            subdir = _coord_dir(repo) / "reservations"
            subdir.mkdir(parents=True, exist_ok=True)
            uid = str(uuid.uuid4())
            (subdir / f"{uid}.json").write_text(
                json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
            )
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])

        assert result.exit_code == 0
        data = json.loads(result.output.strip())
        assert data["inserted"] == 5
        assert data["skipped"] == 0  # defaults to 0

    def test_response_extra_keys_silently_ignored(self, repo: pathlib.Path) -> None:
        """Unknown keys in the push response must not affect the exit code."""
        resp = {"inserted": 1, "skipped": 0, "unknown_future_key": "value", "debug": {}}
        with patch(_PUSH_TARGET, return_value=resp), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            subdir = _coord_dir(repo) / "reservations"
            subdir.mkdir(parents=True, exist_ok=True)
            uid = str(uuid.uuid4())
            (subdir / f"{uid}.json").write_text(
                json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
            )
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])

        assert result.exit_code == 0

    def test_coord_bus_error_on_push_exits_1(self, repo: pathlib.Path) -> None:
        """CoordBusError during push → exit 1 and a final JSON line with failed=True."""
        with patch(_PUSH_TARGET, side_effect=CoordBusError("service unavailable", status_code=503)), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            subdir = _coord_dir(repo) / "reservations"
            subdir.mkdir(parents=True, exist_ok=True)
            uid = str(uuid.uuid4())
            (subdir / f"{uid}.json").write_text(
                json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
            )
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])

        assert result.exit_code == 1
        # Only the last non-blank output line is parsed as the JSON summary.
        last_line = [ln for ln in result.output.strip().splitlines() if ln.strip()][-1]
        data = json.loads(last_line)
        assert data["failed"] is True

    def test_coord_bus_error_on_pull_exits_1(self, repo: pathlib.Path) -> None:
        """CoordBusError during pull → exit 1."""
        with patch(_PULL_TARGET, side_effect=CoordBusError("gateway timeout", status_code=504)), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            result = runner.invoke(cli, _PULL_ARGS + ["-j"])

        assert result.exit_code == 1

    def test_pull_response_missing_records_key_no_files_written(
        self, repo: pathlib.Path
    ) -> None:
        """A pull response without "records" reports count 0 and writes no files."""
        with patch(_PULL_TARGET, return_value={"cursor": 0}), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            result = runner.invoke(cli, _PULL_ARGS + ["-j"])

        assert result.exit_code == 0
        data = json.loads(result.output.strip())
        assert data["count"] == 0
        # The remote dir may or may not have been created; either way it must
        # contain no record files.
        remote = repo / ".muse" / "coordination" / "remote"
        if remote.exists():
            assert list(remote.rglob("*.json")) == []

    def test_pull_response_missing_cursor_key_defaults_to_zero(
        self, repo: pathlib.Path
    ) -> None:
        """A pull response without "cursor" is reported as cursor == 0."""
        records = [
            {"kind": "task", "record_uuid": str(uuid.uuid4()), "run_id": "r", "payload": {}}
        ]
        with patch(_PULL_TARGET, return_value={"records": records}), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            result = runner.invoke(cli, _PULL_ARGS + ["-j"])

        assert result.exit_code == 0
        data = json.loads(result.output.strip())
        assert data["cursor"] == 0  # defaults to 0


# ===========================================================================
# 8. FILESYSTEM SAFETY
# ===========================================================================

class TestCoordIntegrityFilesystem:
    """Path traversal prevention, UUID validation, remote dir creation."""

    def test_traversal_uuid_does_not_escape_remote_dir(self, repo: pathlib.Path) -> None:
        """A "../" record_uuid must not cause any file to be written outside remote/."""
        evil = "../../../etc/passwd"
        records = [
            {
                "kind": "reservation",
                "record_uuid": evil,
                "run_id": "r",
                "payload": {},
                "expires_at": None,
            }
        ]
        _write_remote_records(repo, records)

        # Must not exist outside remote/reservation/
        assert not (
            repo / ".muse" / "coordination" / "remote" / "reservation" / "../../etc/passwd"
        ).exists()
        # The probe above uses a different depth and no ".json" suffix than a
        # naive join of the payload would produce, so also search the whole
        # repo tree for anything named after the payload.
        escaped = list(repo.rglob("passwd*"))
        assert not escaped, f"Traversal escaped: {escaped}"

        # The remote dir might or might not exist; if it does, no evil files
        remote = repo / ".muse" / "coordination" / "remote"
        if remote.exists():
            all_files = list(remote.rglob("*.json"))
            for f in all_files:
                assert ".." not in str(f), f"Traversal escaped: {f}"

    def test_traversal_uuid_null_byte_rejected(self, repo: pathlib.Path) -> None:
        """A record_uuid containing a NUL byte produces no file."""
        evil = "foo\x00bar"
        records = [
            {
                "kind": "task",
                "record_uuid": evil,
                "run_id": "r",
                "payload": {},
                "expires_at": None,
            }
        ]
        _write_remote_records(repo, records)

        remote = repo / ".muse" / "coordination" / "remote"
        if remote.exists():
            assert list(remote.rglob("*.json")) == []

    def test_uuid_128_chars_accepted(self, repo: pathlib.Path) -> None:
        """A 128-character record_uuid is written to disk."""
        uid = "a" * 128
        records = [
            {
                "kind": "reservation",
                "record_uuid": uid,
                "run_id": "r",
                "payload": {},
                "expires_at": None,
            }
        ]
        _write_remote_records(repo, records)

        target = repo / ".muse" / "coordination" / "remote" / "reservation" / f"{uid}.json"
        assert target.exists()

    def test_uuid_129_chars_rejected(self, repo: pathlib.Path) -> None:
        """A 129-character record_uuid fails the regex and produces no file."""
        uid = "a" * 129
        assert not _SAFE_RECORD_UUID_RE.match(uid), "Regex should reject 129-char UUID"
        records = [
            {
                "kind": "reservation",
                "record_uuid": uid,
                "run_id": "r",
                "payload": {},
                "expires_at": None,
            }
        ]
        _write_remote_records(repo, records)

        remote = repo / ".muse" / "coordination" / "remote"
        if remote.exists():
            assert list(remote.rglob("*.json")) == []

    def test_kind_traversal_rejected(self, repo: pathlib.Path) -> None:
        """A kind of "../evil" produces no file."""
        records = [
            {
                "kind": "../evil",
                "record_uuid": "safe-uuid",
                "run_id": "r",
                "payload": {},
                "expires_at": None,
            }
        ]
        _write_remote_records(repo, records)

        remote = repo / ".muse" / "coordination" / "remote"
        if remote.exists():
            assert list(remote.rglob("*.json")) == []

    def test_empty_kind_rejected(self, repo: pathlib.Path) -> None:
        """An empty kind produces no file."""
        records = [
            {
                "kind": "",
                "record_uuid": "safe-uuid",
                "run_id": "r",
                "payload": {},
                "expires_at": None,
            }
        ]
        _write_remote_records(repo, records)

        remote = repo / ".muse" / "coordination" / "remote"
        if remote.exists():
            assert list(remote.rglob("*.json")) == []

    def test_empty_uuid_rejected(self, repo: pathlib.Path) -> None:
        """An empty record_uuid produces no file."""
        records = [
            {
                "kind": "reservation",
                "record_uuid": "",
                "run_id": "r",
                "payload": {},
                "expires_at": None,
            }
        ]
        _write_remote_records(repo, records)

        remote = repo / ".muse" / "coordination" / "remote"
        if remote.exists():
            assert list(remote.rglob("*.json")) == []

    def test_remote_dir_created_automatically(self, repo: pathlib.Path) -> None:
        """Writing a record creates the remote dir when it does not yet exist."""
        uid = str(uuid.uuid4())
        records = [
            {
                "kind": "reservation",
                "record_uuid": uid,
                "run_id": "r",
                "payload": {},
                "expires_at": None,
            }
        ]
        remote = repo / ".muse" / "coordination" / "remote"
        assert not remote.exists()

        _write_remote_records(repo, records)

        assert remote.exists()
        assert (remote / "reservation" / f"{uid}.json").exists()

    def test_existing_remote_file_overwritten(self, repo: pathlib.Path) -> None:
        """Writing the same UUID twice leaves the second payload on disk."""
        uid = str(uuid.uuid4())
        rec_v1 = {
            "kind": "reservation",
            "record_uuid": uid,
            "run_id": "r",
            "payload": {"version": 1},
            "expires_at": None,
        }
        rec_v2 = {
            "kind": "reservation",
            "record_uuid": uid,
            "run_id": "r",
            "payload": {"version": 2},
            "expires_at": None,
        }
        _write_remote_records(repo, [rec_v1])
        _write_remote_records(repo, [rec_v2])

        target = repo / ".muse" / "coordination" / "remote" / "reservation" / f"{uid}.json"
        # Explicit encoding, consistent with every other read in this file.
        loaded = json.loads(target.read_text(encoding="utf-8"))
        assert loaded["payload"]["version"] == 2

    def test_written_files_are_valid_json(self, repo: pathlib.Path) -> None:
        """Every file written for every kind parses with json.loads."""
        records = [
            {
                "kind": k,
                "record_uuid": str(uuid.uuid4()),
                "run_id": "r",
                "payload": {"test": True},
                "expires_at": None,
            }
            for k in _ALL_KINDS
        ]
        _write_remote_records(repo, records)

        remote = repo / ".muse" / "coordination" / "remote"
        for fpath in remote.rglob("*.json"):
            content = fpath.read_text(encoding="utf-8")
            json.loads(content)  # Must not raise

    def test_written_files_end_with_newline(self, repo: pathlib.Path) -> None:
        """A written record file ends with a newline character."""
        uid = str(uuid.uuid4())
        records = [
            {
                "kind": "reservation",
                "record_uuid": uid,
                "run_id": "r",
                "payload": {},
                "expires_at": None,
            }
        ]
        _write_remote_records(repo, records)

        target = repo / ".muse" / "coordination" / "remote" / "reservation" / f"{uid}.json"
        assert target.read_text(encoding="utf-8").endswith("\n")

    def test_symlink_json_file_data_read_correctly(self, repo: pathlib.Path) -> None:
        """A symlink to a valid JSON file must be read as a normal record."""
        uid = str(uuid.uuid4())
        real_file = repo / "real_record.json"
        real_file.write_text(
            json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
        )
        subdir = _coord_dir(repo) / "reservations"
        subdir.mkdir(parents=True, exist_ok=True)
        link = subdir / f"{uid}.json"
        link.symlink_to(real_file)

        gathered = _gather_local_records(repo, ["reservation"])
        assert len(gathered) == 1
        assert gathered[0]["record_uuid"] == uid


# ===========================================================================
# 9. IDEMPOTENCY
# ===========================================================================

class TestCoordIntegrityIdempotency:
    """Same operations must always produce the same result — no ghost state."""

    def test_gather_is_pure_same_dir_same_result(self, repo: pathlib.Path) -> None:
        """Two consecutive gathers over an unchanged dir return the same UUID set."""
        subdir = _coord_dir(repo) / "reservations"
        subdir.mkdir(parents=True, exist_ok=True)
        for _ in range(20):
            uid = str(uuid.uuid4())
            (subdir / f"{uid}.json").write_text(
                json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
            )

        first = _gather_local_records(repo, ["reservation"])
        second = _gather_local_records(repo, ["reservation"])

        assert len(first) == len(second) == 20
        uuids_first = {r["record_uuid"] for r in first}
        uuids_second = {r["record_uuid"] for r in second}
        assert uuids_first == uuids_second

    def test_push_same_500_records_twice_second_all_skipped(
        self, repo: pathlib.Path
    ) -> None:
        """The hub's inserted/skipped counts are reported verbatim on a repeat push."""
        subdir = _coord_dir(repo) / "reservations"
        subdir.mkdir(parents=True, exist_ok=True)
        for _ in range(500):
            uid = str(uuid.uuid4())
            (subdir / f"{uid}.json").write_text(
                json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
            )

        # First push: all inserted
        with patch(_PUSH_TARGET, return_value=_push_ok(500, 0)), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            result1 = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        data1 = json.loads(result1.output.strip())
        assert data1["inserted"] == 500

        # Second push: all skipped (hub deduplicates)
        with patch(_PUSH_TARGET, return_value=_push_ok(0, 500)), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            result2 = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        data2 = json.loads(result2.output.strip())
        assert data2["inserted"] == 0
        assert data2["skipped"] == 500
        assert result2.exit_code == 0

    def test_push_partial_overlap_correct_counts(self, repo: pathlib.Path) -> None:
        """A mixed inserted/skipped hub response is reported with both counts intact."""
        subdir = _coord_dir(repo) / "reservations"
        subdir.mkdir(parents=True, exist_ok=True)
        for _ in range(500):
            uid = str(uuid.uuid4())
            (subdir / f"{uid}.json").write_text(
                json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
            )

        # Hub says 250 were new, 250 it already had
        with patch(_PUSH_TARGET, return_value=_push_ok(250, 250)), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
        data = json.loads(result.output.strip())
        assert data["inserted"] == 250
        assert data["skipped"] == 250

    def test_write_remote_same_uuid_second_overwrites_first(
        self, repo: pathlib.Path
    ) -> None:
        """A second write for the same UUID replaces run_id and payload on disk."""
        uid = str(uuid.uuid4())
        _write_remote_records(repo, [
            {
                "kind": "reservation",
                "record_uuid": uid,
                "run_id": "first",
                "payload": {"seq": 1},
                "expires_at": None,
            }
        ])
        _write_remote_records(repo, [
            {
                "kind": "reservation",
                "record_uuid": uid,
                "run_id": "second",
                "payload": {"seq": 2},
                "expires_at": None,
            }
        ])

        target = repo / ".muse" / "coordination" / "remote" / "reservation" / f"{uid}.json"
        # Explicit encoding, consistent with every other read in this file.
        loaded = json.loads(target.read_text(encoding="utf-8"))
        assert loaded["run_id"] == "second"
        assert loaded["payload"]["seq"] == 2

    def test_pull_since_id_at_cursor_returns_zero_records(
        self, repo: pathlib.Path
    ) -> None:
        """Pulling with since_id=cursor should yield 0 new records."""
        with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=100)), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", "tok")):
            result = runner.invoke(cli, _PULL_ARGS + ["--since-id", "100", "-j"])
        data = json.loads(result.output.strip())
        assert data["count"] == 0


# ===========================================================================
# 10. TOKEN SAFETY
# ===========================================================================

class TestCoordIntegrityTokenSafety:
    """Auth token must NEVER appear in any output path under any condition."""

    # Sentinel token returned by the patched hub resolver in every test below.
    _SECRET = "ABSOLUTELY-SECRET-TOKEN-NEVER-LEAK-THIS-4xQzR7"

    def _push_args_with_secret(self) -> list[str]:
        """Return the push argv used by the token tests.

        The argv itself carries no token; each test injects ``_SECRET`` by
        patching the hub resolver's return value.
        """
        return [
            "coord", "sync", "push",
            "--hub", "http://localhost:10003",
            "--owner", "gabriel",
            "--slug", "linux",
        ]

    def _pull_args_with_secret(self) -> list[str]:
        """Return the pull argv used by the token tests.

        The argv itself carries no token; each test injects ``_SECRET`` by
        patching the hub resolver's return value.
        """
        return [
            "coord", "sync", "pull",
            "--hub", "http://localhost:10003",
            "--owner", "gabriel",
            "--slug", "linux",
            "--since-id", "0",
        ]

    def test_token_not_in_json_push_success(self, repo: pathlib.Path) -> None:
        """JSON output of a successful push must not contain the token."""
        with patch(_PUSH_TARGET, return_value=_push_ok(0)), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", self._SECRET)):
            result = runner.invoke(cli, self._push_args_with_secret() + ["-j"])
        assert self._SECRET not in result.output

    def test_token_not_in_json_pull_success(self, repo: pathlib.Path) -> None:
        """JSON output of a successful pull must not contain the token."""
        with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=0)), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", self._SECRET)):
            result = runner.invoke(cli, self._pull_args_with_secret() + ["-j"])
        assert self._SECRET not in result.output

    def test_token_not_in_json_push_error(self, repo: pathlib.Path) -> None:
        """JSON output of a failed push must not contain the token."""
        with patch(_PUSH_TARGET, side_effect=CoordBusError("fail", status_code=500)), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", self._SECRET)):
            subdir = _coord_dir(repo) / "reservations"
            subdir.mkdir(parents=True, exist_ok=True)
            uid = str(uuid.uuid4())
            (subdir / f"{uid}.json").write_text(
                json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
            )
            result = runner.invoke(cli, self._push_args_with_secret() + ["-j"])
        # The secret must not appear in ANY line — error line or summary line.
        assert self._SECRET not in result.output

    def test_token_not_in_json_pull_error(self, repo: pathlib.Path) -> None:
        """JSON output of a failed pull must not contain the token."""
        with patch(_PULL_TARGET, side_effect=CoordBusError("fail", status_code=500)), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", self._SECRET)):
            result = runner.invoke(cli, self._pull_args_with_secret() + ["-j"])
        assert self._SECRET not in result.output

    def test_token_not_in_text_push_success(self, repo: pathlib.Path) -> None:
        """Text output of a successful push must not contain the token."""
        with patch(_PUSH_TARGET, return_value=_push_ok(0)), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", self._SECRET)):
            result = runner.invoke(cli, self._push_args_with_secret())
        assert self._SECRET not in result.output

    def test_token_not_in_text_pull_success(self, repo: pathlib.Path) -> None:
        """Text output of a successful pull must not contain the token."""
        with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=0)), \
             patch(_REQUIRE_REPO, return_value=repo), \
             patch(_RESOLVE_HUB, return_value=("http://localhost:10003", self._SECRET)):
            result = runner.invoke(cli, self._pull_args_with_secret())
        assert self._SECRET not in result.output