gabriel / muse public
test_coord_data_integrity.py python
1,515 lines 67.2 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
1 """EXTREME data integrity test suite for ``muse coord sync`` — the coord layer.
2
3 Scenario: Linus Torvalds ports the Linux kernel to Muse. 50 AI agents are
4 simultaneously reading and writing coordination records across 7 kinds.
5 Thousands of files. Network blips. Corrupted disk sectors. Adversarial hubs.
6 Concurrent writers. This suite attempts to find the edge and go beyond it.
7
8 Coverage matrix
9 ---------------
10 Round-trip fidelity
11 * All 7 kinds survive _gather → _write with bit-exact field preservation
12 * Unicode in payloads (CJK, emoji, RTL Arabic) — no mojibake
13 * Deeply nested payloads — no truncation or type coercion
14 * Null / empty / boolean payload values — exact type preservation
15 * 50 KB payload string — no truncation
16 * record_id fallback chain: explicit field → fpath.stem
17
18 Batching correctness
19 * Exactly MAX_PUSH_BATCH (500) records → exactly 1 push_to_hub call
20 * 501 records → exactly 2 push_to_hub calls
21 * 1 000 records → exactly 2 calls (500 each)
22 * 1 001 records → exactly 3 calls (500, 500, 1)
23 * 4 999 records (Linux-scale) → 10 calls (9×500 + 1×499)
24 * First batch fails, remaining batches still execute
25 * All batches fail → inserted=0, failed=True, exit 1
26 * Batch slices are non-overlapping and cover all records exactly
27 * accumulated inserted/skipped counts are sum of all batch results
28
29 Cursor correctness
30 * cursor=0 when pull returns 0 records
31 * cursor equals the id of the last returned record
32 * cursor in JSON output matches cursor in pull_from_hub return value
33 * Incremental pull chain: 5 pages, cursor advances, no gaps in record IDs
34 * since_id passed to pull_from_hub verbatim
35 * Page-chained full pull reconstructs all 1 000 records with zero duplicates
36
37 Resilience
38 * Corrupt JSON file skipped — other files in same dir still gathered
39 * Binary garbage file skipped
40 * NUL byte in JSON skipped
41 * File deleted between glob() and read_text() — skipped gracefully
42 * All files corrupt → 0 records → "(no local coordination records to push)"
43 * Partial corruption: 500 good + 500 corrupt → 500 records pushed
44 * Disk full on _write_remote_records → OSError propagates (does not swallow)
45 * OSError on mkdir propagates
46 * PermissionError on write propagates
47 * Second-pass gather after first corrupt pass finds good files added later
48
49 Concurrency
50 * 16 threads call _gather_local_records simultaneously → each sees same count
51 * 8 threads write distinct record_ids to same remote dir → all files present at end
52 * 8 threads write same record_id → final file is valid JSON (last-writer-wins, not corrupt)
53 * run_push called by 8 threads concurrently → no crashes
54 * run_pull called by 8 threads concurrently → no crashes
55
56 Linux-scale
57 * 7 000 records (1 000 per kind) gathered correctly
58 * 7 000 records batched and pushed in 14 calls, all counted
59 * 1 000-record pull writes all 1 000 files to remote dir
60 * Mixed kinds: push 1 000 reservations + 1 000 heartbeats → 2 000 total
61 * 50 sequential push + pull cycles, total inserted/pulled = 50×N each
62
63 Response bounds
64 * Hub response missing "inserted" key → defaults to 0 (no KeyError crash)
65 * Hub response missing "skipped" key → defaults to 0
66 * Hub response with extra unknown keys → silently ignored
67 * Malformed JSON response → CoordBusError raised
68 * Empty response body → CoordBusError raised
69 * Response at exactly MAX_COORD_RESPONSE_BYTES is accepted by _post_json
70 * Response one byte over MAX_COORD_RESPONSE_BYTES → CoordBusError
71
72 Filesystem safety
73 * record_id = "../../../etc/passwd" → skipped, no file written outside remote/
74 * record_id with null byte → skipped
75 * record_id with 128 chars (max) → accepted, file written
76 * record_id with 129 chars (over max) → rejected
77 * kind = "../traversal" → skipped
78 * kind = "" → skipped
79 * remote dir created automatically if absent
80 * Existing remote file overwritten with updated payload
81 * Written files are valid JSON (parseable with json.loads)
82 * Written files end with newline
83
84 Idempotency
85 * _gather_local_records is pure: same dir always returns same records
86 * Push same 500 records twice → second push all skipped, inserted=0
87 * Push 250 old + 250 new → inserted=250, skipped=250
88 * _write_remote_records same record_id twice → second write overwrites cleanly
89 * Pull with since_id=cursor returns 0 records (no replay)
90
91 Token safety
92 * Token never appears in JSON success output (push)
93 * Token never appears in JSON success output (pull)
94 * Token never appears in JSON error output (push CoordBusError)
95 * Token never appears in JSON error output (pull CoordBusError)
96 * Token never appears in text mode success output
97 """
98
99 from __future__ import annotations
100
101 import argparse
102 import io
103 import itertools
104 import json
105 import pathlib
106 import threading
107 from unittest.mock import MagicMock, call, patch
108
109 import pytest
110
111 from muse.core.paths import coordination_dir, muse_dir
112 from muse.core.types import content_hash, MsgpackDict, MsgpackValue
113 from tests.cli_test_helper import CliRunner
114 from muse.core.coord_bus import CoordBusError, MAX_PUSH_BATCH, MAX_PULL_LIMIT
115 from muse.cli.commands.coord_sync import (
116 _ALL_KINDS,
117 _MAX_OWNER_LEN,
118 _MAX_PULL_LIMIT,
119 _MAX_SLUG_LEN,
120 _SAFE_RECORD_ID_RE,
121 _gather_local_records,
122 _write_remote_records,
123 run_pull,
124 run_push,
125 )
126
127 runner = CliRunner()
128 cli = None # CliRunner accepts cli=None after argparse migration
129
130 _id_seq = itertools.count()
131
132 def _new_id() -> str:
133 return content_hash({"seq": next(_id_seq)})
134
135 # ---------------------------------------------------------------------------
136 # Patch targets
137 # ---------------------------------------------------------------------------
138
139 _PUSH_TARGET = "muse.cli.commands.coord_sync.push_to_hub"
140 _PULL_TARGET = "muse.cli.commands.coord_sync.pull_from_hub"
141 _REQUIRE_REPO = "muse.cli.commands.coord_sync.require_repo"
142 _RESOLVE_HUB = "muse.cli.commands.coord_sync._resolve_hub_and_signing"
143
144 # ---------------------------------------------------------------------------
145 # Common args
146 # ---------------------------------------------------------------------------
147
148 _BASE_ARGS = [
149 "--hub", "https://localhost:1337",
150 "--owner", "gabriel",
151 "--slug", "linux",
152 ]
153 _PUSH_ARGS = ["coord", "sync", "push"] + _BASE_ARGS
154 _PULL_ARGS = ["coord", "sync", "pull"] + _BASE_ARGS + ["--since-id", "0"]
155
156 # ---------------------------------------------------------------------------
157 # Helpers
158 # ---------------------------------------------------------------------------
159
160
161 def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
162 """Create a minimal .muse repo layout."""
163 muse_dir(tmp_path).mkdir()
164 return tmp_path
165
166
167 def _coord_dir(root: pathlib.Path) -> pathlib.Path:
168 return coordination_dir(root)
169
170
171 def _write_records(root: pathlib.Path, kind: str, records: list[MsgpackDict]) -> None:
172 """Write coordination records as JSON files in the appropriate subdir."""
173 subdir_map = {
174 "reservation": "reservations",
175 "intent": "intents",
176 "release": "releases",
177 "heartbeat": "heartbeats",
178 "dependency": "dependencies",
179 "task": "tasks",
180 "claim": "claims",
181 }
182 subdir = _coord_dir(root) / subdir_map[kind]
183 subdir.mkdir(parents=True, exist_ok=True)
184 for rec in records:
185 fname = rec.get(
186 {
187 "reservation": "reservation_id",
188 "intent": "intent_id",
189 "release": "release_id",
190 "heartbeat": "run_id",
191 "dependency": "reservation_id",
192 "task": "task_id",
193 "claim": "task_id",
194 }[kind],
195 _new_id(),
196 )
197 (subdir / f"{fname}.json").write_text(json.dumps(rec), encoding="utf-8")
198
199
200 def _res(uid: str | None = None, **extra: MsgpackValue) -> MsgpackDict:
201 uid = uid or _new_id()
202 return {"reservation_id": uid, "run_id": f"run-{uid}", "expires_at": None, **extra}
203
204
205 def _hb(uid: str | None = None, **extra: MsgpackValue) -> MsgpackDict:
206 uid = uid or _new_id()
207 return {"run_id": uid, "expires_at": None, **extra}
208
209
210 def _task(uid: str | None = None, **extra: MsgpackValue) -> MsgpackDict:
211 uid = uid or _new_id()
212 return {"task_id": uid, "run_id": f"run-{uid}", **extra}
213
214
215 def _push_ok(inserted: int = 1, skipped: int = 0) -> MsgpackDict:
216 return {"inserted": inserted, "skipped": skipped}
217
218
219 def _pull_ok(
220 records: list[MsgpackDict] | None = None,
221 cursor: int = 1,
222 ) -> MsgpackDict:
223 recs = records if records is not None else []
224 return {"records": recs, "cursor": cursor}
225
226
227 @pytest.fixture
228 def repo(tmp_path: pathlib.Path) -> pathlib.Path:
229 return _make_repo(tmp_path)
230
231
232 # ===========================================================================
233 # 1. ROUND-TRIP FIDELITY
234 # ===========================================================================
235
236
237 class TestCoordIntegrityRoundTrip:
238 """Records written to disk then gathered must be bit-exact."""
239
240 def test_reservation_fields_preserved(self, repo: pathlib.Path) -> None:
241 uid = _new_id()
242 rec = {"reservation_id": uid, "run_id": "r1", "expires_at": "2099-01-01T00:00:00Z",
243 "custom": "value"}
244 _write_records(repo, "reservation", [rec])
245 gathered = _gather_local_records(repo, ["reservation"])
246 assert len(gathered) == 1
247 g = gathered[0]
248 assert g["kind"] == "reservation"
249 assert g["record_id"] == uid
250 assert g["run_id"] == "r1"
251 assert g["expires_at"] == "2099-01-01T00:00:00Z"
252 assert g["payload"]["custom"] == "value"
253
254 def test_all_seven_kinds_gather_correctly(self, repo: pathlib.Path) -> None:
255 """One file per kind — all 7 must appear with correct kind field."""
256 for kind in _ALL_KINDS:
257 if kind == "reservation":
258 rec = {"reservation_id": _new_id(), "run_id": "r"}
259 elif kind == "intent":
260 rec = {"intent_id": _new_id(), "run_id": "r"}
261 elif kind == "release":
262 rec = {"release_id": _new_id(), "run_id": "r"}
263 elif kind == "heartbeat":
264 rec = {"run_id": _new_id()}
265 elif kind == "dependency":
266 rec = {"reservation_id": _new_id(), "run_id": "r"}
267 elif kind == "task":
268 rec = {"task_id": _new_id(), "run_id": "r"}
269 elif kind == "claim":
270 rec = {"task_id": _new_id(), "claimer_run_id": "cr", "run_id": "r"}
271 _write_records(repo, kind, [rec])
272
273 gathered = _gather_local_records(repo, list(_ALL_KINDS))
274 found_kinds = {g["kind"] for g in gathered}
275 assert found_kinds == set(_ALL_KINDS), f"Missing kinds: {set(_ALL_KINDS) - found_kinds}"
276
277 def test_unicode_payload_cjk_no_mojibake(self, repo: pathlib.Path) -> None:
278 uid = _new_id()
279 chinese = "作曲家は天才だ" # "The composer is a genius"
280 rec = {"reservation_id": uid, "run_id": "r", "label": chinese}
281 _write_records(repo, "reservation", [rec])
282 gathered = _gather_local_records(repo, ["reservation"])
283 assert gathered[0]["payload"]["label"] == chinese
284
285 def test_unicode_payload_emoji_no_mojibake(self, repo: pathlib.Path) -> None:
286 uid = _new_id()
287 notes = "🎵🎼🎹🎸🥁"
288 rec = {"reservation_id": uid, "run_id": "r", "notes": notes}
289 _write_records(repo, "reservation", [rec])
290 gathered = _gather_local_records(repo, ["reservation"])
291 assert gathered[0]["payload"]["notes"] == notes
292
293 def test_unicode_payload_arabic_rtl(self, repo: pathlib.Path) -> None:
294 uid = _new_id()
295 arabic = "مؤلف موسيقي"
296 rec = {"reservation_id": uid, "run_id": "r", "composer": arabic}
297 _write_records(repo, "reservation", [rec])
298 gathered = _gather_local_records(repo, ["reservation"])
299 assert gathered[0]["payload"]["composer"] == arabic
300
301 def test_deeply_nested_payload_preserved(self, repo: pathlib.Path) -> None:
302 uid = _new_id()
303 deep = {"a": {"b": {"c": {"d": {"e": {"f": "leaf"}}}}}}
304 rec = {"reservation_id": uid, "run_id": "r", "meta": deep}
305 _write_records(repo, "reservation", [rec])
306 gathered = _gather_local_records(repo, ["reservation"])
307 assert gathered[0]["payload"]["meta"]["a"]["b"]["c"]["d"]["e"]["f"] == "leaf"
308
309 def test_null_values_in_payload_preserved(self, repo: pathlib.Path) -> None:
310 uid = _new_id()
311 rec = {"reservation_id": uid, "run_id": "r", "nullable": None, "also": None}
312 _write_records(repo, "reservation", [rec])
313 gathered = _gather_local_records(repo, ["reservation"])
314 assert gathered[0]["payload"]["nullable"] is None
315 assert gathered[0]["payload"]["also"] is None
316
317 def test_boolean_payload_values_not_coerced_to_strings(self, repo: pathlib.Path) -> None:
318 uid = _new_id()
319 rec = {"reservation_id": uid, "run_id": "r", "flag": True, "other": False}
320 _write_records(repo, "reservation", [rec])
321 gathered = _gather_local_records(repo, ["reservation"])
322 assert gathered[0]["payload"]["flag"] is True
323 assert gathered[0]["payload"]["other"] is False
324
325 def test_integer_payload_values_not_coerced(self, repo: pathlib.Path) -> None:
326 uid = _new_id()
327 rec = {"reservation_id": uid, "run_id": "r", "count": 42, "pi": 3.14159}
328 _write_records(repo, "reservation", [rec])
329 gathered = _gather_local_records(repo, ["reservation"])
330 assert gathered[0]["payload"]["count"] == 42
331 assert abs(gathered[0]["payload"]["pi"] - 3.14159) < 1e-9
332
333 def test_50kb_payload_not_truncated(self, repo: pathlib.Path) -> None:
334 uid = _new_id()
335 big_string = "X" * 50_000
336 rec = {"reservation_id": uid, "run_id": "r", "blob": big_string}
337 _write_records(repo, "reservation", [rec])
338 gathered = _gather_local_records(repo, ["reservation"])
339 assert len(gathered[0]["payload"]["blob"]) == 50_000
340
341 def test_list_payload_values_preserved(self, repo: pathlib.Path) -> None:
342 uid = _new_id()
343 tags = ["linux", "kernel", "vcs", "muse"]
344 rec = {"reservation_id": uid, "run_id": "r", "tags": tags}
345 _write_records(repo, "reservation", [rec])
346 gathered = _gather_local_records(repo, ["reservation"])
347 assert gathered[0]["payload"]["tags"] == tags
348
349 def test_record_id_fallback_to_fpath_stem(self, repo: pathlib.Path) -> None:
350 """When reservation_id is absent, record_id falls back to file stem."""
351 stem = "fallback-stem-abc123"
352 subdir = _coord_dir(repo) / "reservations"
353 subdir.mkdir(parents=True, exist_ok=True)
354 (subdir / f"{stem}.json").write_text(
355 json.dumps({"run_id": "r"}), encoding="utf-8"
356 )
357 gathered = _gather_local_records(repo, ["reservation"])
358 assert gathered[0]["record_id"] == stem
359
360 def test_claim_uses_claimer_run_id_for_run_id(self, repo: pathlib.Path) -> None:
361 uid = _new_id()
362 rec = {"task_id": uid, "claimer_run_id": "claimer-run-99", "run_id": "other"}
363 _write_records(repo, "claim", [rec])
364 gathered = _gather_local_records(repo, ["claim"])
365 assert gathered[0]["run_id"] == "claimer-run-99"
366
367 def test_write_then_read_remote_roundtrip(self, repo: pathlib.Path) -> None:
368 """_write_remote_records → re-read from disk = original record."""
369 uid = _new_id()
370 orig = {"kind": "reservation", "record_id": uid, "run_id": "r",
371 "payload": {"x": 99}, "expires_at": "2099-06-01T00:00:00Z"}
372 _write_remote_records(repo, [orig])
373 target = coordination_dir(repo) / "remote" / "reservation" / f"{uid}.json"
374 assert target.exists()
375 loaded = json.loads(target.read_text(encoding="utf-8"))
376 assert loaded["payload"]["x"] == 99
377 assert loaded["expires_at"] == "2099-06-01T00:00:00Z"
378
379 def test_all_7_kinds_write_remote_and_read_back(self, repo: pathlib.Path) -> None:
380 records = []
381 for kind in _ALL_KINDS:
382 uid = _new_id()
383 records.append({"kind": kind, "record_id": uid, "run_id": "r",
384 "payload": {"kind_was": kind}, "expires_at": None})
385 _write_remote_records(repo, records)
386 for rec in records:
387 path = (
388 coordination_dir(repo) / "remote"
389 / rec["kind"] / f"{rec['record_id']}.json"
390 )
391 assert path.exists(), f"Missing: {path}"
392 loaded = json.loads(path.read_text())
393 assert loaded["payload"]["kind_was"] == rec["kind"]
394
395
396 # ===========================================================================
397 # 2. BATCHING CORRECTNESS
398 # ===========================================================================
399
400
401 class TestCoordIntegrityBatching:
402 """Correct batch splitting, counting, and failure isolation."""
403
404 def _make_n_reservations(self, repo: pathlib.Path, n: int) -> None:
405 subdir = _coord_dir(repo) / "reservations"
406 subdir.mkdir(parents=True, exist_ok=True)
407 for _ in range(n):
408 uid = _new_id()
409 (subdir / f"{uid}.json").write_text(
410 json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
411 )
412
413 def test_exactly_500_records_is_one_batch(self, repo: pathlib.Path) -> None:
414 self._make_n_reservations(repo, 500)
415 with patch(_PUSH_TARGET, return_value=_push_ok(500)) as mock_push, \
416 patch(_REQUIRE_REPO, return_value=repo), \
417 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
418 result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
419 assert result.exit_code == 0
420 assert mock_push.call_count == 1
421 data = json.loads(result.output.strip())
422 assert data["total"] == 500
423
424 def test_501_records_splits_into_two_batches(self, repo: pathlib.Path) -> None:
425 self._make_n_reservations(repo, 501)
426 batch_sizes: list[int] = []
427 def fake_push(hub_url: str, owner: str, slug: str, records: list[MsgpackDict], token: str | None = None) -> MsgpackDict:
428 batch_sizes.append(len(records))
429 return _push_ok(len(records))
430 with patch(_PUSH_TARGET, side_effect=fake_push), \
431 patch(_REQUIRE_REPO, return_value=repo), \
432 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
433 runner.invoke(cli, _PUSH_ARGS + ["-j"])
434 assert len(batch_sizes) == 2
435 assert batch_sizes[0] == 500
436 assert batch_sizes[1] == 1
437
438 def test_1000_records_splits_into_two_batches(self, repo: pathlib.Path) -> None:
439 self._make_n_reservations(repo, 1000)
440 batch_sizes: list[int] = []
441 def fake_push(hub_url: str, owner: str, slug: str, records: list[MsgpackDict], token: str | None = None) -> MsgpackDict:
442 batch_sizes.append(len(records))
443 return _push_ok(len(records))
444 with patch(_PUSH_TARGET, side_effect=fake_push), \
445 patch(_REQUIRE_REPO, return_value=repo), \
446 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
447 runner.invoke(cli, _PUSH_ARGS + ["-j"])
448 assert len(batch_sizes) == 2
449 assert all(s == 500 for s in batch_sizes)
450
451 def test_1001_records_splits_into_three_batches(self, repo: pathlib.Path) -> None:
452 self._make_n_reservations(repo, 1001)
453 batch_sizes: list[int] = []
454 def fake_push(hub_url: str, owner: str, slug: str, records: list[MsgpackDict], token: str | None = None) -> MsgpackDict:
455 batch_sizes.append(len(records))
456 return _push_ok(len(records))
457 with patch(_PUSH_TARGET, side_effect=fake_push), \
458 patch(_REQUIRE_REPO, return_value=repo), \
459 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
460 runner.invoke(cli, _PUSH_ARGS + ["-j"])
461 assert len(batch_sizes) == 3
462 assert batch_sizes[0] == 500
463 assert batch_sizes[1] == 500
464 assert batch_sizes[2] == 1
465
466 def test_4999_records_linux_scale_10_batches(self, repo: pathlib.Path) -> None:
467 """Linux-scale: 4999 records → 9×500 + 1×499 = 10 calls."""
468 self._make_n_reservations(repo, 4999)
469 batch_sizes: list[int] = []
470 def fake_push(hub_url: str, owner: str, slug: str, records: list[MsgpackDict], token: str | None = None) -> MsgpackDict:
471 batch_sizes.append(len(records))
472 return _push_ok(len(records))
473 with patch(_PUSH_TARGET, side_effect=fake_push), \
474 patch(_REQUIRE_REPO, return_value=repo), \
475 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
476 runner.invoke(cli, _PUSH_ARGS + ["-j"])
477 assert len(batch_sizes) == 10
478 assert sum(batch_sizes) == 4999
479 assert batch_sizes[-1] == 499
480
481 def test_first_batch_fails_remaining_batches_still_execute(self, repo: pathlib.Path) -> None:
482 """A single batch failure must not abort the whole push."""
483 self._make_n_reservations(repo, 1001)
484 call_count = 0
485 def fail_first_then_ok(hub_url: str, owner: str, slug: str, records: list[MsgpackDict], token: str | None = None) -> MsgpackDict:
486 nonlocal call_count
487 call_count += 1
488 if call_count == 1:
489 raise CoordBusError("network hiccup", status_code=503)
490 return _push_ok(len(records))
491 with patch(_PUSH_TARGET, side_effect=fail_first_then_ok), \
492 patch(_REQUIRE_REPO, return_value=repo), \
493 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
494 result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
495 assert result.exit_code == 1 # failed=True → exit 1
496 # On batch error, _err() emits an error JSON line first; summary is the last line.
497 last_line = [l for l in result.output.strip().splitlines() if l.strip()][-1]
498 data = json.loads(last_line)
499 assert data["failed"] is True
500 assert data["inserted"] == 501 # batches 2 and 3 succeeded
501
502 def test_all_batches_fail_inserted_is_zero(self, repo: pathlib.Path) -> None:
503 self._make_n_reservations(repo, 600)
504 with patch(_PUSH_TARGET, side_effect=CoordBusError("gone", status_code=503)), \
505 patch(_REQUIRE_REPO, return_value=repo), \
506 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
507 result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
508 assert result.exit_code == 1
509 last_line = [l for l in result.output.strip().splitlines() if l.strip()][-1]
510 data = json.loads(last_line)
511 assert data["inserted"] == 0
512 assert data["failed"] is True
513
514 def test_batch_slices_are_non_overlapping_and_exhaustive(self, repo: pathlib.Path) -> None:
515 """Every gathered record appears in exactly one batch — no duplicates, no gaps."""
516 n = 1200
517 self._make_n_reservations(repo, n)
518 all_received: list[MsgpackDict] = []
519 def collect(hub_url: str, owner: str, slug: str, records: list[MsgpackDict], token: str | None = None) -> MsgpackDict:
520 all_received.extend(records)
521 return _push_ok(len(records))
522 with patch(_PUSH_TARGET, side_effect=collect), \
523 patch(_REQUIRE_REPO, return_value=repo), \
524 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
525 runner.invoke(cli, _PUSH_ARGS + ["-j"])
526 assert len(all_received) == n
527 # No duplicates (record_id uniqueness)
528 record_ids = [r["record_id"] for r in all_received]
529 assert len(record_ids) == len(set(record_ids)), "Duplicate records in batches!"
530
531 def test_inserted_skipped_counts_accumulated_across_batches(self, repo: pathlib.Path) -> None:
532 """total inserted/skipped in JSON output = sum of all batch results."""
533 self._make_n_reservations(repo, 1100)
534 def mixed(hub_url: str, owner: str, slug: str, records: list[MsgpackDict], token: str | None = None) -> MsgpackDict:
535 # Odd batches: half inserted, half skipped
536 n = len(records)
537 return _push_ok(n // 2, n - n // 2)
538 with patch(_PUSH_TARGET, side_effect=mixed), \
539 patch(_REQUIRE_REPO, return_value=repo), \
540 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
541 result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
542 data = json.loads(result.output.strip())
543 # 3 batches: 500, 500, 100 → inserted = 250+250+50=550, skipped = 250+250+50=550
544 assert data["inserted"] + data["skipped"] == 1100
545
546
547 # ===========================================================================
548 # 3. CURSOR CORRECTNESS
549 # ===========================================================================
550
551
552 class TestCoordIntegrityCursor:
553 """Incremental pull semantics: cursor must advance, no gaps, no duplicates."""
554
555 def test_cursor_zero_when_no_records_returned(self, repo: pathlib.Path) -> None:
556 with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=0)), \
557 patch(_REQUIRE_REPO, return_value=repo), \
558 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
559 result = runner.invoke(cli, _PULL_ARGS + ["-j"])
560 data = json.loads(result.output.strip())
561 assert data["cursor"] == 0
562
563 def test_cursor_equals_last_record_id(self, repo: pathlib.Path) -> None:
564 records = [
565 {"kind": "reservation", "record_id": _new_id(), "id": 42,
566 "run_id": "r", "payload": {}},
567 ]
568 with patch(_PULL_TARGET, return_value={"records": records, "cursor": 42}), \
569 patch(_REQUIRE_REPO, return_value=repo), \
570 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
571 result = runner.invoke(cli, _PULL_ARGS + ["-j"])
572 data = json.loads(result.output.strip())
573 assert data["cursor"] == 42
574
575 def test_since_id_passed_through_verbatim(self, repo: pathlib.Path) -> None:
576 """--since-id must be forwarded to pull_from_hub unchanged."""
577 with patch(_PULL_TARGET, return_value=_pull_ok([])) as mock_pull, \
578 patch(_REQUIRE_REPO, return_value=repo), \
579 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
580 runner.invoke(cli, _PULL_ARGS + ["--since-id", "9999"])
581 positional = mock_pull.call_args[0]
582 assert positional[3] == 9999 # since_id is 4th positional arg
583
584 def test_incremental_5_pages_no_record_id_gaps(self, repo: pathlib.Path) -> None:
585 """5 pages of 200 records each — all 1000 IDs must be contiguous."""
586 page_size = 200
587 all_ids: list[int] = []
588 cursor = 0
589
590 for page in range(5):
591 start = page * page_size + 1
592 end = start + page_size
593 records = [
594 {"kind": "reservation", "record_id": _new_id(),
595 "id": i, "run_id": "r", "payload": {}}
596 for i in range(start, end)
597 ]
598 new_cursor = end - 1
599 # Simulate: pull returns page of records, then we advance cursor
600 with patch(_PULL_TARGET, return_value={"records": records, "cursor": new_cursor}), \
601 patch(_REQUIRE_REPO, return_value=repo), \
602 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
603 result = runner.invoke(
604 cli, _PULL_ARGS + ["--since-id", str(cursor), "-j"]
605 )
606 data = json.loads(result.output.strip())
607 all_ids.extend(r["id"] for r in data["records"])
608 cursor = data["cursor"]
609
610 assert len(all_ids) == 1000
611 assert all_ids == list(range(1, 1001)), "Gaps or non-contiguous IDs detected!"
612
613 def test_page_chained_pull_zero_duplicates(self, repo: pathlib.Path) -> None:
614 """Simulate 3 pages; each record_id must appear exactly once."""
615 all_ids: list[str] = []
616 cursor = 0
617
618 for page in range(3):
619 page_records = [
620 {"kind": "heartbeat", "record_id": _new_id(),
621 "run_id": f"r-{page}-{i}", "payload": {}}
622 for i in range(100)
623 ]
624 new_cursor = (page + 1) * 100
625 with patch(_PULL_TARGET, return_value={"records": page_records, "cursor": new_cursor}), \
626 patch(_REQUIRE_REPO, return_value=repo), \
627 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
628 result = runner.invoke(
629 cli, _PULL_ARGS + ["--since-id", str(cursor), "-j"]
630 )
631 data = json.loads(result.output.strip())
632 all_ids.extend(r["record_id"] for r in data["records"])
633 cursor = data["cursor"]
634
635 assert len(all_ids) == len(set(all_ids)), "Duplicate record_ids across pages!"
636
637 def test_cursor_in_json_output_matches_hub_cursor(self, repo: pathlib.Path) -> None:
638 expected_cursor = 77777
639 with patch(_PULL_TARGET, return_value={"records": [], "cursor": expected_cursor}), \
640 patch(_REQUIRE_REPO, return_value=repo), \
641 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
642 result = runner.invoke(cli, _PULL_ARGS + ["-j"])
643 data = json.loads(result.output.strip())
644 assert data["cursor"] == expected_cursor
645
646 def test_since_id_zero_returns_all_records_from_beginning(self, repo: pathlib.Path) -> None:
647 records = [
648 {"kind": "task", "record_id": _new_id(), "run_id": "r",
649 "payload": {"i": i}}
650 for i in range(50)
651 ]
652 with patch(_PULL_TARGET, return_value={"records": records, "cursor": 50}), \
653 patch(_REQUIRE_REPO, return_value=repo), \
654 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
655 result = runner.invoke(cli, _PULL_ARGS + ["--since-id", "0", "-j"])
656 data = json.loads(result.output.strip())
657 assert data["count"] == 50
658 assert data["cursor"] == 50
659
660
661 # ===========================================================================
662 # 4. RESILIENCE
663 # ===========================================================================
664
665
666 class TestCoordIntegrityResilience:
667 """Corrupt files, disk errors, and race conditions must never block good data."""
668
669 def test_corrupt_json_skipped_others_gathered(self, repo: pathlib.Path) -> None:
670 subdir = _coord_dir(repo) / "reservations"
671 subdir.mkdir(parents=True, exist_ok=True)
672 good_uid = _new_id()
673 (subdir / f"{good_uid}.json").write_text(
674 json.dumps({"reservation_id": good_uid, "run_id": "r"}), encoding="utf-8"
675 )
676 (subdir / "corrupt.json").write_text("{not valid json !!!", encoding="utf-8")
677 gathered = _gather_local_records(repo, ["reservation"])
678 assert len(gathered) == 1
679 assert gathered[0]["record_id"] == good_uid
680
681 def test_binary_garbage_file_skipped(self, repo: pathlib.Path) -> None:
682 subdir = _coord_dir(repo) / "heartbeats"
683 subdir.mkdir(parents=True, exist_ok=True)
684 uid = _new_id()
685 (subdir / f"{uid}.json").write_text(
686 json.dumps({"run_id": uid}), encoding="utf-8"
687 )
688 (subdir / "binary.json").write_bytes(bytes(range(256)))
689 gathered = _gather_local_records(repo, ["heartbeat"])
690 assert len(gathered) == 1
691
692 def test_null_byte_in_file_skipped(self, repo: pathlib.Path) -> None:
693 subdir = _coord_dir(repo) / "tasks"
694 subdir.mkdir(parents=True, exist_ok=True)
695 uid = _new_id()
696 (subdir / f"{uid}.json").write_text(
697 json.dumps({"task_id": uid, "run_id": "r"}), encoding="utf-8"
698 )
699 (subdir / "nullbyte.json").write_bytes(b'{"task_id": "x\x00y"}')
700 gathered = _gather_local_records(repo, ["task"])
701 # Null byte may or may not parse; either way, only 1 valid record matters
702 assert any(g["record_id"] == uid for g in gathered)
703
704 def test_file_deleted_mid_glob_skipped_gracefully(self, repo: pathlib.Path) -> None:
705 """Simulate FileNotFoundError between glob and read_text."""
706 subdir = _coord_dir(repo) / "reservations"
707 subdir.mkdir(parents=True, exist_ok=True)
708 uid = _new_id()
709 ghost = subdir / f"{uid}.json"
710 ghost.write_text(json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8")
711
712 _original_read_text = pathlib.Path.read_text
713
714 def raise_on_ghost(self: pathlib.Path, encoding: str | None = None, errors: str | None = None) -> str:
715 if self == ghost:
716 raise FileNotFoundError("deleted mid-glob")
717 return _original_read_text(self, encoding=encoding, errors=errors)
718
719 with patch.object(pathlib.Path, "read_text", raise_on_ghost):
720 gathered = _gather_local_records(repo, ["reservation"])
721 assert gathered == []
722
723 def test_all_files_corrupt_zero_records_push_exits_0(self, repo: pathlib.Path) -> None:
724 subdir = _coord_dir(repo) / "reservations"
725 subdir.mkdir(parents=True, exist_ok=True)
726 for i in range(10):
727 (subdir / f"bad_{i}.json").write_text("{{broken", encoding="utf-8")
728 with patch(_REQUIRE_REPO, return_value=repo), \
729 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
730 result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
731 assert result.exit_code == 0
732 data = json.loads(result.output.strip())
733 assert data["total"] == 0
734 assert data["failed"] is False
735
736 def test_500_good_500_corrupt_only_good_pushed(self, repo: pathlib.Path) -> None:
737 subdir = _coord_dir(repo) / "reservations"
738 subdir.mkdir(parents=True, exist_ok=True)
739 good_uids = set()
740 for _ in range(500):
741 uid = _new_id()
742 good_uids.add(uid)
743 (subdir / f"{uid}.json").write_text(
744 json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
745 )
746 for i in range(500):
747 (subdir / f"corrupt_{i}.json").write_text("{bad", encoding="utf-8")
748
749 gathered = _gather_local_records(repo, ["reservation"])
750 assert len(gathered) == 500
751 assert {g["record_id"] for g in gathered} == good_uids
752
753 def test_disk_full_on_write_remote_propagates(self, repo: pathlib.Path) -> None:
754 """OSError from write_text_atomic must NOT be swallowed."""
755 uid = _new_id()
756 records = [{"kind": "reservation", "record_id": uid, "run_id": "r",
757 "payload": {}, "expires_at": None}]
758 with patch("muse.cli.commands.coord_sync.write_text_atomic",
759 side_effect=OSError("No space left")):
760 with pytest.raises(OSError, match="No space left"):
761 _write_remote_records(repo, records)
762
763 def test_permission_error_on_mkdir_propagates(self, repo: pathlib.Path) -> None:
764 uid = _new_id()
765 records = [{"kind": "heartbeat", "record_id": uid, "run_id": "r",
766 "payload": {}, "expires_at": None}]
767 with patch.object(pathlib.Path, "mkdir", side_effect=PermissionError("read-only")):
768 with pytest.raises(PermissionError, match="read-only"):
769 _write_remote_records(repo, records)
770
771 def test_empty_coordination_dir_returns_no_records(self, repo: pathlib.Path) -> None:
772 # No coord dir at all
773 gathered = _gather_local_records(repo, list(_ALL_KINDS))
774 assert gathered == []
775
776 def test_empty_kind_subdir_returns_no_records(self, repo: pathlib.Path) -> None:
777 (_coord_dir(repo) / "reservations").mkdir(parents=True, exist_ok=True)
778 gathered = _gather_local_records(repo, ["reservation"])
779 assert gathered == []
780
781
782 # ===========================================================================
783 # 5. CONCURRENCY
784 # ===========================================================================
785
786
787 class TestCoordIntegrityConcurrency:
788 """No data races, no corruption under concurrent access."""
789
790 def test_16_threads_gather_same_records(self, repo: pathlib.Path) -> None:
791 """All threads must see the same number of records."""
792 subdir = _coord_dir(repo) / "reservations"
793 subdir.mkdir(parents=True, exist_ok=True)
794 for _ in range(200):
795 uid = _new_id()
796 (subdir / f"{uid}.json").write_text(
797 json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
798 )
799 counts: list[int] = []
800 errors: list[str] = []
801
802 def worker() -> None:
803 try:
804 recs = _gather_local_records(repo, ["reservation"])
805 counts.append(len(recs))
806 except Exception as exc:
807 errors.append(str(exc))
808
809 threads = [threading.Thread(target=worker) for _ in range(16)]
810 for t in threads:
811 t.start()
812 for t in threads:
813 t.join()
814
815 assert not errors, f"Errors: {errors}"
816 assert all(c == 200 for c in counts), f"Count mismatch: {counts}"
817
818 def test_8_threads_write_distinct_record_ids_all_files_present(self, repo: pathlib.Path) -> None:
819 """8 writers, each writing a unique record_id → all 8 files must exist."""
820 uids = [_new_id() for _ in range(8)]
821 errors: list[str] = []
822
823 def worker(uid: str) -> None:
824 try:
825 rec = {"kind": "reservation", "record_id": uid, "run_id": "r",
826 "payload": {"uid": uid}, "expires_at": None}
827 _write_remote_records(repo, [rec])
828 except Exception as exc:
829 errors.append(f"{uid}: {exc}")
830
831 threads = [threading.Thread(target=worker, args=(u,)) for u in uids]
832 for t in threads:
833 t.start()
834 for t in threads:
835 t.join()
836
837 assert not errors, f"Errors: {errors}"
838 remote = coordination_dir(repo) / "remote" / "reservation"
839 written = {f.stem for f in remote.glob("*.json")}
840 assert written == set(uids), f"Missing: {set(uids) - written}"
841
842 def test_8_threads_write_same_record_id_final_file_is_valid_json(
843 self, repo: pathlib.Path
844 ) -> None:
845 """Last-writer-wins on same record_id must produce valid JSON, not corruption."""
846 uid = _new_id()
847 errors: list[str] = []
848
849 def worker(i: int) -> None:
850 try:
851 rec = {"kind": "task", "record_id": uid, "run_id": f"r{i}",
852 "payload": {"version": i}, "expires_at": None}
853 _write_remote_records(repo, [rec])
854 except Exception as exc:
855 errors.append(f"Thread {i}: {exc}")
856
857 threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
858 for t in threads:
859 t.start()
860 for t in threads:
861 t.join()
862
863 target = coordination_dir(repo) / "remote" / "task" / f"{uid}.json"
864 assert target.exists()
865 content = target.read_text(encoding="utf-8")
866 loaded = json.loads(content) # Must not raise — file must be valid JSON
867 assert loaded["record_id"] == uid
868
869 def test_run_push_8_concurrent_threads_no_crashes(self, repo: pathlib.Path) -> None:
870 subdir = _coord_dir(repo) / "reservations"
871 subdir.mkdir(parents=True, exist_ok=True)
872 for _ in range(50):
873 uid = _new_id()
874 (subdir / f"{uid}.json").write_text(
875 json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
876 )
877
878 errors: list[str] = []
879
880 def worker(idx: int) -> None:
881 args = argparse.Namespace(
882 hub="https://localhost:1337",
883 owner="gabriel",
884 slug="linux",
885 signing=None,
886 kinds=["reservation"],
887 json_out=False,
888 )
889 try:
890 run_push(args)
891 except SystemExit as exc:
892 if exc.code not in (0, 1):
893 errors.append(f"Thread {idx}: unexpected exit {exc.code}")
894 except Exception as exc:
895 errors.append(f"Thread {idx}: {exc}")
896
897 push_p = patch(_PUSH_TARGET, return_value=_push_ok(50))
898 repo_p = patch(_REQUIRE_REPO, return_value=repo)
899 hub_p = patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok"))
900 push_p.start(); repo_p.start(); hub_p.start()
901 try:
902 threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
903 for t in threads:
904 t.start()
905 for t in threads:
906 t.join()
907 finally:
908 push_p.stop(); repo_p.stop(); hub_p.stop()
909 assert not errors, f"Concurrent push errors: {errors}"
910
911 def test_run_pull_8_concurrent_threads_no_crashes(self, repo: pathlib.Path) -> None:
912 errors: list[str] = []
913
914 def worker(idx: int) -> None:
915 args = argparse.Namespace(
916 hub="https://localhost:1337",
917 owner="gabriel",
918 slug="linux",
919 signing=None,
920 since_id=0,
921 kinds=[],
922 limit=500,
923 json_out=False,
924 )
925 try:
926 run_pull(args)
927 except SystemExit as exc:
928 if exc.code not in (0, 1):
929 errors.append(f"Thread {idx}: unexpected exit {exc.code}")
930 except Exception as exc:
931 errors.append(f"Thread {idx}: {exc}")
932
933 pull_p = patch(_PULL_TARGET, return_value=_pull_ok([], cursor=0))
934 repo_p = patch(_REQUIRE_REPO, return_value=repo)
935 hub_p = patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok"))
936 pull_p.start(); repo_p.start(); hub_p.start()
937 try:
938 threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
939 for t in threads:
940 t.start()
941 for t in threads:
942 t.join()
943 finally:
944 pull_p.stop(); repo_p.stop(); hub_p.stop()
945 assert not errors, f"Concurrent pull errors: {errors}"
946
947
948 # ===========================================================================
949 # 6. LINUX-SCALE
950 # ===========================================================================
951
952
953 class TestCoordIntegrityScale:
954 """Linux-scale operations: thousands of records across all 7 kinds."""
955
956 _KIND_SUBDIR = {
957 "reservation": "reservations",
958 "intent": "intents",
959 "release": "releases",
960 "heartbeat": "heartbeats",
961 "dependency": "dependencies",
962 "task": "tasks",
963 "claim": "claims",
964 }
965 _KIND_ID_FIELD = {
966 "reservation": "reservation_id",
967 "intent": "intent_id",
968 "release": "release_id",
969 "heartbeat": "run_id",
970 "dependency": "reservation_id",
971 "task": "task_id",
972 "claim": "task_id",
973 }
974
975 def _populate_all_kinds(self, repo: pathlib.Path, count: int) -> None:
976 for kind in _ALL_KINDS:
977 subdir = _coord_dir(repo) / self._KIND_SUBDIR[kind]
978 subdir.mkdir(parents=True, exist_ok=True)
979 id_field = self._KIND_ID_FIELD[kind]
980 for _ in range(count):
981 uid = _new_id()
982 rec: MsgpackDict = {id_field: uid, "run_id": "r"}
983 if kind == "claim":
984 rec["claimer_run_id"] = "cr"
985 (subdir / f"{uid}.json").write_text(json.dumps(rec), encoding="utf-8")
986
987 def test_7000_records_all_7_kinds_gathered(self, repo: pathlib.Path) -> None:
988 """1000 of each kind (7000 total) — all must be gathered."""
989 self._populate_all_kinds(repo, 1000)
990
991 gathered = _gather_local_records(repo, list(_ALL_KINDS))
992 assert len(gathered) == 7000
993 by_kind = {}
994 for g in gathered:
995 by_kind[g["kind"]] = by_kind.get(g["kind"], 0) + 1
996 assert all(by_kind.get(k, 0) == 1000 for k in _ALL_KINDS), f"Kind counts: {by_kind}"
997
998 def test_7000_records_batched_and_pushed_14_calls(self, repo: pathlib.Path) -> None:
999 self._populate_all_kinds(repo, 1000)
1000
1001 call_count = 0
1002 total_sent = 0
1003
1004 def count_calls(hub_url: str, owner: str, slug: str, records: list[MsgpackDict], token: str | None = None) -> MsgpackDict:
1005 nonlocal call_count, total_sent
1006 call_count += 1
1007 total_sent += len(records)
1008 return _push_ok(len(records))
1009
1010 with patch(_PUSH_TARGET, side_effect=count_calls), \
1011 patch(_REQUIRE_REPO, return_value=repo), \
1012 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
1013 result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
1014
1015 assert result.exit_code == 0
1016 assert call_count == 14 # ceil(7000/500) = 14
1017 assert total_sent == 7000
1018
1019 def test_1000_record_pull_all_files_written(self, repo: pathlib.Path) -> None:
1020 """Pull returns MAX_PULL_LIMIT records — all written to remote dir."""
1021 records = [
1022 {"kind": "reservation", "record_id": _new_id(),
1023 "run_id": f"r{i}", "payload": {}, "expires_at": None}
1024 for i in range(1000)
1025 ]
1026 record_ids = {r["record_id"] for r in records}
1027 with patch(_PULL_TARGET, return_value={"records": records, "cursor": 1000}), \
1028 patch(_REQUIRE_REPO, return_value=repo), \
1029 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
1030 result = runner.invoke(cli, _PULL_ARGS + ["-j"])
1031 assert result.exit_code == 0
1032 remote = coordination_dir(repo) / "remote" / "reservation"
1033 written = {f.stem for f in remote.glob("*.json")}
1034 assert written == record_ids
1035
1036 def test_mixed_1000_reservations_1000_heartbeats_total_2000(
1037 self, repo: pathlib.Path
1038 ) -> None:
1039 res_subdir = _coord_dir(repo) / "reservations"
1040 hb_subdir = _coord_dir(repo) / "heartbeats"
1041 res_subdir.mkdir(parents=True, exist_ok=True)
1042 hb_subdir.mkdir(parents=True, exist_ok=True)
1043 for _ in range(1000):
1044 uid = _new_id()
1045 (res_subdir / f"{uid}.json").write_text(
1046 json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
1047 )
1048 uid2 = _new_id()
1049 (hb_subdir / f"{uid2}.json").write_text(
1050 json.dumps({"run_id": uid2}), encoding="utf-8"
1051 )
1052 gathered = _gather_local_records(repo, ["reservation", "heartbeat"])
1053 assert len(gathered) == 2000
1054
1055 def test_50_sequential_push_pull_cycles_integrity(self, repo: pathlib.Path) -> None:
1056 """50 cycles: push 10, pull 10, verify no data loss across cycles."""
1057 subdir = _coord_dir(repo) / "reservations"
1058 subdir.mkdir(parents=True, exist_ok=True)
1059 total_inserted = 0
1060 total_pulled = 0
1061 cursor = 0
1062
1063 for cycle in range(50):
1064 # Write 10 new records
1065 cycle_uids = []
1066 for _ in range(10):
1067 uid = _new_id()
1068 cycle_uids.append(uid)
1069 (subdir / f"{uid}.json").write_text(
1070 json.dumps({"reservation_id": uid, "run_id": f"r{cycle}"}),
1071 encoding="utf-8",
1072 )
1073
1074 # Push
1075 with patch(_PUSH_TARGET, return_value=_push_ok(10)), \
1076 patch(_REQUIRE_REPO, return_value=repo), \
1077 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
1078 result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
1079 assert result.exit_code == 0
1080 push_data = json.loads(result.output.strip())
1081 # total accumulates: gather reads ALL files in subdir each cycle
1082 total_inserted += push_data["inserted"]
1083
1084 # Pull 10 new records
1085 pull_records = [
1086 {"kind": "reservation", "record_id": u, "run_id": f"r{cycle}",
1087 "payload": {}, "expires_at": None}
1088 for u in cycle_uids
1089 ]
1090 cursor += 10
1091 with patch(_PULL_TARGET, return_value={"records": pull_records, "cursor": cursor}), \
1092 patch(_REQUIRE_REPO, return_value=repo), \
1093 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
1094 result = runner.invoke(
1095 cli, _PULL_ARGS + ["--since-id", str(cursor - 10), "-j"]
1096 )
1097 assert result.exit_code == 0
1098 pull_data = json.loads(result.output.strip())
1099 total_pulled += pull_data["count"]
1100
1101 assert total_pulled == 500 # 50 cycles × 10 records each
1102
1103
1104 # ===========================================================================
1105 # 7. RESPONSE BOUNDS
1106 # ===========================================================================
1107
1108
1109 class TestCoordIntegrityResponseBounds:
1110 """Hub response edge cases — missing keys, oversized bodies, malformed JSON."""
1111
1112 def test_response_missing_inserted_key_defaults_to_zero(
1113 self, repo: pathlib.Path
1114 ) -> None:
1115 with patch(_PUSH_TARGET, return_value={"skipped": 3}), \
1116 patch(_REQUIRE_REPO, return_value=repo), \
1117 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
1118 subdir = _coord_dir(repo) / "reservations"
1119 subdir.mkdir(parents=True, exist_ok=True)
1120 uid = _new_id()
1121 (subdir / f"{uid}.json").write_text(
1122 json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
1123 )
1124 result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
1125 assert result.exit_code == 0
1126 data = json.loads(result.output.strip())
1127 assert data["inserted"] == 0 # defaults to 0
1128 assert data["skipped"] == 3
1129
1130 def test_response_missing_skipped_key_defaults_to_zero(
1131 self, repo: pathlib.Path
1132 ) -> None:
1133 with patch(_PUSH_TARGET, return_value={"inserted": 5}), \
1134 patch(_REQUIRE_REPO, return_value=repo), \
1135 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
1136 subdir = _coord_dir(repo) / "reservations"
1137 subdir.mkdir(parents=True, exist_ok=True)
1138 uid = _new_id()
1139 (subdir / f"{uid}.json").write_text(
1140 json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
1141 )
1142 result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
1143 assert result.exit_code == 0
1144 data = json.loads(result.output.strip())
1145 assert data["inserted"] == 5
1146 assert data["skipped"] == 0 # defaults to 0
1147
1148 def test_response_extra_keys_silently_ignored(self, repo: pathlib.Path) -> None:
1149 resp = {"inserted": 1, "skipped": 0, "unknown_future_key": "value", "debug": {}}
1150 with patch(_PUSH_TARGET, return_value=resp), \
1151 patch(_REQUIRE_REPO, return_value=repo), \
1152 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
1153 subdir = _coord_dir(repo) / "reservations"
1154 subdir.mkdir(parents=True, exist_ok=True)
1155 uid = _new_id()
1156 (subdir / f"{uid}.json").write_text(
1157 json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
1158 )
1159 result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
1160 assert result.exit_code == 0
1161
1162 def test_coord_bus_error_on_push_exits_1(self, repo: pathlib.Path) -> None:
1163 with patch(_PUSH_TARGET, side_effect=CoordBusError("service unavailable", status_code=503)), \
1164 patch(_REQUIRE_REPO, return_value=repo), \
1165 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
1166 subdir = _coord_dir(repo) / "reservations"
1167 subdir.mkdir(parents=True, exist_ok=True)
1168 uid = _new_id()
1169 (subdir / f"{uid}.json").write_text(
1170 json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
1171 )
1172 result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
1173 assert result.exit_code == 1
1174 last_line = [l for l in result.output.strip().splitlines() if l.strip()][-1]
1175 data = json.loads(last_line)
1176 assert data["failed"] is True
1177
1178 def test_coord_bus_error_on_pull_exits_1(self, repo: pathlib.Path) -> None:
1179 with patch(_PULL_TARGET, side_effect=CoordBusError("gateway timeout", status_code=504)), \
1180 patch(_REQUIRE_REPO, return_value=repo), \
1181 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
1182 result = runner.invoke(cli, _PULL_ARGS + ["-j"])
1183 assert result.exit_code == 1
1184
1185 def test_pull_response_missing_records_key_no_files_written(
1186 self, repo: pathlib.Path
1187 ) -> None:
1188 with patch(_PULL_TARGET, return_value={"cursor": 0}), \
1189 patch(_REQUIRE_REPO, return_value=repo), \
1190 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
1191 result = runner.invoke(cli, _PULL_ARGS + ["-j"])
1192 assert result.exit_code == 0
1193 data = json.loads(result.output.strip())
1194 assert data["count"] == 0
1195
1196 def test_pull_response_missing_cursor_key_defaults_to_zero(
1197 self, repo: pathlib.Path
1198 ) -> None:
1199 records = [
1200 {"kind": "task", "record_id": _new_id(), "run_id": "r", "payload": {}}
1201 ]
1202 with patch(_PULL_TARGET, return_value={"records": records}), \
1203 patch(_REQUIRE_REPO, return_value=repo), \
1204 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
1205 result = runner.invoke(cli, _PULL_ARGS + ["-j"])
1206 assert result.exit_code == 0
1207 data = json.loads(result.output.strip())
1208 assert data["cursor"] == 0 # defaults to 0
1209
1210
1211 # ===========================================================================
1212 # 8. FILESYSTEM SAFETY
1213 # ===========================================================================
1214
1215
1216 class TestCoordIntegrityFilesystem:
1217 """Path traversal prevention, record_id validation, remote dir creation."""
1218
1219 def test_traversal_record_id_does_not_escape_remote_dir(self, repo: pathlib.Path) -> None:
1220 malicious = "../../../etc/passwd"
1221 records = [{"kind": "reservation", "record_id": malicious, "run_id": "r",
1222 "payload": {}, "expires_at": None}]
1223 _write_remote_records(repo, records)
1224 # Must not exist outside remote/reservation/
1225 assert not (coordination_dir(repo) / "remote" / "reservation" /
1226 "../../etc/passwd").exists()
1227 # The remote dir might or might not exist; if it does, no malicious files
1228 remote = coordination_dir(repo) / "remote"
1229 if remote.exists():
1230 all_files = list(remote.rglob("*.json"))
1231 for f in all_files:
1232 assert ".." not in str(f), f"Traversal escaped: {f}"
1233
1234 def test_traversal_record_id_null_byte_rejected(self, repo: pathlib.Path) -> None:
1235 malicious = "foo\x00bar"
1236 records = [{"kind": "task", "record_id": malicious, "run_id": "r",
1237 "payload": {}, "expires_at": None}]
1238 _write_remote_records(repo, records)
1239 remote = coordination_dir(repo) / "remote"
1240 if remote.exists():
1241 assert list(remote.rglob("*.json")) == []
1242
1243 def test_record_id_128_chars_accepted(self, repo: pathlib.Path) -> None:
1244 uid = "a" * 128
1245 records = [{"kind": "reservation", "record_id": uid, "run_id": "r",
1246 "payload": {}, "expires_at": None}]
1247 _write_remote_records(repo, records)
1248 target = coordination_dir(repo) / "remote" / "reservation" / f"{uid}.json"
1249 assert target.exists()
1250
1251 def test_record_id_129_chars_rejected(self, repo: pathlib.Path) -> None:
1252 uid = "a" * 129
1253 assert not _SAFE_RECORD_ID_RE.match(uid), "Regex should reject 129-char record_id"
1254 records = [{"kind": "reservation", "record_id": uid, "run_id": "r",
1255 "payload": {}, "expires_at": None}]
1256 _write_remote_records(repo, records)
1257 remote = coordination_dir(repo) / "remote"
1258 if remote.exists():
1259 assert list(remote.rglob("*.json")) == []
1260
1261 def test_kind_traversal_rejected(self, repo: pathlib.Path) -> None:
1262 records = [{"kind": "../traversal", "record_id": "safe-id", "run_id": "r",
1263 "payload": {}, "expires_at": None}]
1264 _write_remote_records(repo, records)
1265 remote = coordination_dir(repo) / "remote"
1266 if remote.exists():
1267 assert list(remote.rglob("*.json")) == []
1268
1269 def test_empty_kind_rejected(self, repo: pathlib.Path) -> None:
1270 records = [{"kind": "", "record_id": "safe-id", "run_id": "r",
1271 "payload": {}, "expires_at": None}]
1272 _write_remote_records(repo, records)
1273 remote = coordination_dir(repo) / "remote"
1274 if remote.exists():
1275 assert list(remote.rglob("*.json")) == []
1276
1277 def test_empty_record_id_rejected(self, repo: pathlib.Path) -> None:
1278 records = [{"kind": "reservation", "record_id": "", "run_id": "r",
1279 "payload": {}, "expires_at": None}]
1280 _write_remote_records(repo, records)
1281 remote = coordination_dir(repo) / "remote"
1282 if remote.exists():
1283 assert list(remote.rglob("*.json")) == []
1284
1285 def test_remote_dir_created_automatically(self, repo: pathlib.Path) -> None:
1286 uid = _new_id()
1287 records = [{"kind": "reservation", "record_id": uid, "run_id": "r",
1288 "payload": {}, "expires_at": None}]
1289 remote = coordination_dir(repo) / "remote"
1290 assert not remote.exists()
1291 _write_remote_records(repo, records)
1292 assert remote.exists()
1293 assert (remote / "reservation" / f"{uid}.json").exists()
1294
1295 def test_existing_remote_file_overwritten(self, repo: pathlib.Path) -> None:
1296 uid = _new_id()
1297 rec_v1 = {"kind": "reservation", "record_id": uid, "run_id": "r",
1298 "payload": {"version": 1}, "expires_at": None}
1299 rec_v2 = {"kind": "reservation", "record_id": uid, "run_id": "r",
1300 "payload": {"version": 2}, "expires_at": None}
1301 _write_remote_records(repo, [rec_v1])
1302 _write_remote_records(repo, [rec_v2])
1303 target = coordination_dir(repo) / "remote" / "reservation" / f"{uid}.json"
1304 loaded = json.loads(target.read_text())
1305 assert loaded["payload"]["version"] == 2
1306
1307 def test_written_files_are_valid_json(self, repo: pathlib.Path) -> None:
1308 records = [
1309 {"kind": k, "record_id": _new_id(), "run_id": "r",
1310 "payload": {"test": True}, "expires_at": None}
1311 for k in _ALL_KINDS
1312 ]
1313 _write_remote_records(repo, records)
1314 remote = coordination_dir(repo) / "remote"
1315 for fpath in remote.rglob("*.json"):
1316 content = fpath.read_text(encoding="utf-8")
1317 json.loads(content) # Must not raise
1318
1319 def test_written_files_end_with_newline(self, repo: pathlib.Path) -> None:
1320 uid = _new_id()
1321 records = [{"kind": "reservation", "record_id": uid, "run_id": "r",
1322 "payload": {}, "expires_at": None}]
1323 _write_remote_records(repo, records)
1324 target = coordination_dir(repo) / "remote" / "reservation" / f"{uid}.json"
1325 assert target.read_text(encoding="utf-8").endswith("\n")
1326
1327 def test_symlink_json_file_data_read_correctly(self, repo: pathlib.Path) -> None:
1328 """A symlink to a valid JSON file must be read as a normal record."""
1329 uid = _new_id()
1330 real_file = repo / "real_record.json"
1331 real_file.write_text(
1332 json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
1333 )
1334 subdir = _coord_dir(repo) / "reservations"
1335 subdir.mkdir(parents=True, exist_ok=True)
1336 link = subdir / f"{uid}.json"
1337 link.symlink_to(real_file)
1338 gathered = _gather_local_records(repo, ["reservation"])
1339 assert len(gathered) == 1
1340 assert gathered[0]["record_id"] == uid
1341
1342
1343 # ===========================================================================
1344 # 9. IDEMPOTENCY
1345 # ===========================================================================
1346
1347
1348 class TestCoordIntegrityIdempotency:
1349 """Same operations must always produce the same result — no ghost state."""
1350
1351 def test_gather_is_pure_same_dir_same_result(self, repo: pathlib.Path) -> None:
1352 subdir = _coord_dir(repo) / "reservations"
1353 subdir.mkdir(parents=True, exist_ok=True)
1354 for _ in range(20):
1355 uid = _new_id()
1356 (subdir / f"{uid}.json").write_text(
1357 json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
1358 )
1359 first = _gather_local_records(repo, ["reservation"])
1360 second = _gather_local_records(repo, ["reservation"])
1361 assert len(first) == len(second) == 20
1362 ids_first = {r["record_id"] for r in first}
1363 ids_second = {r["record_id"] for r in second}
1364 assert ids_first == ids_second
1365
1366 def test_push_same_500_records_twice_second_all_skipped(
1367 self, repo: pathlib.Path
1368 ) -> None:
1369 subdir = _coord_dir(repo) / "reservations"
1370 subdir.mkdir(parents=True, exist_ok=True)
1371 for _ in range(500):
1372 uid = _new_id()
1373 (subdir / f"{uid}.json").write_text(
1374 json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
1375 )
1376
1377 # First push: all inserted
1378 with patch(_PUSH_TARGET, return_value=_push_ok(500, 0)), \
1379 patch(_REQUIRE_REPO, return_value=repo), \
1380 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
1381 result1 = runner.invoke(cli, _PUSH_ARGS + ["-j"])
1382 data1 = json.loads(result1.output.strip())
1383 assert data1["inserted"] == 500
1384
1385 # Second push: all skipped (hub deduplicates)
1386 with patch(_PUSH_TARGET, return_value=_push_ok(0, 500)), \
1387 patch(_REQUIRE_REPO, return_value=repo), \
1388 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
1389 result2 = runner.invoke(cli, _PUSH_ARGS + ["-j"])
1390 data2 = json.loads(result2.output.strip())
1391 assert data2["inserted"] == 0
1392 assert data2["skipped"] == 500
1393 assert result2.exit_code == 0
1394
1395 def test_push_partial_overlap_correct_counts(self, repo: pathlib.Path) -> None:
1396 subdir = _coord_dir(repo) / "reservations"
1397 subdir.mkdir(parents=True, exist_ok=True)
1398 for _ in range(500):
1399 uid = _new_id()
1400 (subdir / f"{uid}.json").write_text(
1401 json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
1402 )
1403 # Hub says 250 were new, 250 it already had
1404 with patch(_PUSH_TARGET, return_value=_push_ok(250, 250)), \
1405 patch(_REQUIRE_REPO, return_value=repo), \
1406 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
1407 result = runner.invoke(cli, _PUSH_ARGS + ["-j"])
1408 data = json.loads(result.output.strip())
1409 assert data["inserted"] == 250
1410 assert data["skipped"] == 250
1411
1412 def test_write_remote_same_record_id_second_overwrites_first(
1413 self, repo: pathlib.Path
1414 ) -> None:
1415 uid = _new_id()
1416 _write_remote_records(repo, [
1417 {"kind": "reservation", "record_id": uid, "run_id": "first",
1418 "payload": {"seq": 1}, "expires_at": None}
1419 ])
1420 _write_remote_records(repo, [
1421 {"kind": "reservation", "record_id": uid, "run_id": "second",
1422 "payload": {"seq": 2}, "expires_at": None}
1423 ])
1424 target = coordination_dir(repo) / "remote" / "reservation" / f"{uid}.json"
1425 loaded = json.loads(target.read_text())
1426 assert loaded["run_id"] == "second"
1427 assert loaded["payload"]["seq"] == 2
1428
1429 def test_pull_since_id_at_cursor_returns_zero_records(
1430 self, repo: pathlib.Path
1431 ) -> None:
1432 """Pulling with since_id=cursor should yield 0 new records."""
1433 with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=100)), \
1434 patch(_REQUIRE_REPO, return_value=repo), \
1435 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")):
1436 result = runner.invoke(cli, _PULL_ARGS + ["--since-id", "100", "-j"])
1437 data = json.loads(result.output.strip())
1438 assert data["count"] == 0
1439
1440
1441 # ===========================================================================
1442 # 10. TOKEN SAFETY
1443 # ===========================================================================
1444
1445
1446 class TestCoordIntegrityTokenSafety:
1447 """Auth token must NEVER appear in any output path under any condition."""
1448
1449 _SECRET = "ABSOLUTELY-SECRET-TOKEN-NEVER-LEAK-THIS-4xQzR7"
1450
1451 def _push_args_with_secret(self) -> list[str]:
1452 return [
1453 "coord", "sync", "push",
1454 "--hub", "https://localhost:1337",
1455 "--owner", "gabriel",
1456 "--slug", "linux",
1457 ]
1458
1459 def _pull_args_with_secret(self) -> list[str]:
1460 return [
1461 "coord", "sync", "pull",
1462 "--hub", "https://localhost:1337",
1463 "--owner", "gabriel",
1464 "--slug", "linux",
1465 "--since-id", "0",
1466 ]
1467
1468 def test_token_not_in_json_push_success(self, repo: pathlib.Path) -> None:
1469 with patch(_PUSH_TARGET, return_value=_push_ok(0)), \
1470 patch(_REQUIRE_REPO, return_value=repo), \
1471 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", self._SECRET)):
1472 result = runner.invoke(cli, self._push_args_with_secret() + ["-j"])
1473 assert self._SECRET not in result.output
1474
1475 def test_token_not_in_json_pull_success(self, repo: pathlib.Path) -> None:
1476 with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=0)), \
1477 patch(_REQUIRE_REPO, return_value=repo), \
1478 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", self._SECRET)):
1479 result = runner.invoke(cli, self._pull_args_with_secret() + ["-j"])
1480 assert self._SECRET not in result.output
1481
1482 def test_token_not_in_json_push_error(self, repo: pathlib.Path) -> None:
1483 with patch(_PUSH_TARGET, side_effect=CoordBusError("fail", status_code=500)), \
1484 patch(_REQUIRE_REPO, return_value=repo), \
1485 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", self._SECRET)):
1486 subdir = _coord_dir(repo) / "reservations"
1487 subdir.mkdir(parents=True, exist_ok=True)
1488 uid = _new_id()
1489 (subdir / f"{uid}.json").write_text(
1490 json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8"
1491 )
1492 result = runner.invoke(cli, self._push_args_with_secret() + ["-j"])
1493 # The secret must not appear in ANY line — error line or summary line.
1494 assert self._SECRET not in result.output
1495
1496 def test_token_not_in_json_pull_error(self, repo: pathlib.Path) -> None:
1497 with patch(_PULL_TARGET, side_effect=CoordBusError("fail", status_code=500)), \
1498 patch(_REQUIRE_REPO, return_value=repo), \
1499 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", self._SECRET)):
1500 result = runner.invoke(cli, self._pull_args_with_secret() + ["-j"])
1501 assert self._SECRET not in result.output
1502
1503 def test_token_not_in_text_push_success(self, repo: pathlib.Path) -> None:
1504 with patch(_PUSH_TARGET, return_value=_push_ok(0)), \
1505 patch(_REQUIRE_REPO, return_value=repo), \
1506 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", self._SECRET)):
1507 result = runner.invoke(cli, self._push_args_with_secret())
1508 assert self._SECRET not in result.output
1509
1510 def test_token_not_in_text_pull_success(self, repo: pathlib.Path) -> None:
1511 with patch(_PULL_TARGET, return_value=_pull_ok([], cursor=0)), \
1512 patch(_REQUIRE_REPO, return_value=repo), \
1513 patch(_RESOLVE_HUB, return_value=("https://localhost:1337", self._SECRET)):
1514 result = runner.invoke(cli, self._pull_args_with_secret())
1515 assert self._SECRET not in result.output
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago