gabriel / muse public
test_coord_push_null_counts.py python
794 lines 34.5 KB
Raw
sha256:06dba78c2a78e251b580422dd1fd547f3c8357ff18f7709a860873b2d24dbbbf chore: bump version to 0.2.0rc14 Sonnet 4.6 patch 16 hours ago
1 """
2 Tests for the bug: hub response with null/non-integer inserted/skipped counts
3 causes an unhandled TypeError or ValueError that escapes run_push as a raw
4 traceback instead of a clean CoordBusError.
5
6 Root cause (coord_bus.py::push_to_hub lines 233-234):
7
8 return {
9 "inserted": int(result.get("inserted", 0)), # BUG
10 "skipped": int(result.get("skipped", 0)), # BUG
11 }
12
13 When the hub returns {"inserted": null}:
14 - result.get("inserted", 0) → None (key EXISTS so default is NOT used)
15 - int(None) → TypeError
16
17 When the hub returns {"inserted": "three"}:
18 - int("three") → ValueError
19
20 Neither TypeError nor ValueError is CoordBusError.
21 run_push only catches CoordBusError — the exception escapes as a raw traceback.
22
Coverage:
    Unit           — push_to_hub directly, all bad-value variants
    Integration    — run_push with a bad hub response (two layers deep)
    End-to-end     — run_push invoked as the CLI would call it (stdout captured,
                     JSON and text modes); asserts no traceback in the output
    Stress         — multi-batch pushes (up to 14 batches) where some or every
                     batch returns a bad response
    Performance    — bad response handling overhead is negligible
    Security       — hub cannot cause arbitrary code execution via a count field
    Data integrity — partial-batch failures produce correct counts in the output
31 """
32 from __future__ import annotations
33
34 import argparse
35 import json
36 import pathlib
37 import time
38 from io import BytesIO
39 from unittest.mock import MagicMock, patch
40
41 import pytest
42
43 from muse.core.types import MsgpackDict, MsgpackValue
44 from muse.core.paths import muse_dir
45
46 # ---------------------------------------------------------------------------
47 # Shared helpers
48 # ---------------------------------------------------------------------------
49
50 _ALL_KINDS = ("reservation", "intent", "release", "heartbeat", "dependency", "task", "claim")
51 _FUTURE_TS = "2099-12-31T23:59:59+00:00"
52
53
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create the muse metadata directory (as located by ``muse_dir``) under
    *tmp_path*, creating parents as needed, and return *tmp_path* itself."""
    meta_dir = muse_dir(tmp_path)
    meta_dir.mkdir(exist_ok=True, parents=True)
    return tmp_path
57
58
def _one_record() -> MsgpackDict:
    """Build a fresh, minimal reservation record that expires at ``_FUTURE_TS``."""
    rid = "res-000001"
    payload = {"reservation_id": rid, "expires_at": _FUTURE_TS}
    return {
        "kind": "reservation",
        "record_id": rid,
        "run_id": "run-0",
        "payload": payload,
        "expires_at": _FUTURE_TS,
    }
67
68
69 def _make_http_response(body: MsgpackDict) -> BytesIO:
70 return BytesIO(json.dumps(body).encode())
71
72
def _run_push_with_hub_response(
    tmp_path: pathlib.Path, hub_response: MsgpackDict
) -> tuple[int | str | None, str]:
    """
    Run run_push for one local record with _post_json mocked to return
    hub_response, capturing everything run_push writes to stdout.

    Repository discovery, hub/signing resolution and local record gathering
    are patched as well, so no real repository or network is touched.

    Returns (exit_code, stdout_captured):
      - exit_code is SystemExit.code if run_push raised SystemExit, or None
        if it returned normally.
      - If any other exception escapes run_push, returns
        ("CRASH", "<ExceptionType>: <message>") so callers can assert
        ``code != "CRASH"`` and still see the escaped error in the message.
    """
    import io

    # Imported outside the try below so an ImportError surfaces as a test
    # error instead of being reported as a run_push "CRASH".
    from muse.cli.commands.coord_sync import run_push

    root = _make_repo(tmp_path)
    captured = io.StringIO()
    exit_code: int | str | None = None
    args = argparse.Namespace(
        owner="torvalds", slug="linux",
        json_out=True, hub_url=None,
        kinds=["reservation"],
    )

    with patch("muse.cli.commands.coord_sync._gather_local_records",
               return_value=[_one_record()]), \
         patch("muse.core.coord_bus._post_json", return_value=hub_response), \
         patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
         patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
               return_value=("https://localhost:1337", "tok")), \
         patch("sys.stdout", captured):
        try:
            run_push(args)
        except SystemExit as exc:
            exit_code = exc.code
        except Exception as exc:
            # A non-SystemExit escape is exactly the bug under test: report it
            # as data rather than letting it abort the calling test.
            return ("CRASH", f"{type(exc).__name__}: {exc}")

    return (exit_code, captured.getvalue())
109
110
111 # =============================================================================
112 # 1. UNIT — push_to_hub directly
113 # =============================================================================
114
class TestPushToHubNullCountsUnit:
    """
    Unit tests on coord_bus.push_to_hub.

    _post_json is mocked to return bad count values. For those, push_to_hub
    must raise CoordBusError: it must never let TypeError/ValueError escape,
    and it must never return normally with an invalid count.
    """

    @staticmethod
    def _push(
        hub_response: MsgpackDict,
        records: list[MsgpackDict] | None = None,
    ) -> dict[str, int]:
        """Call push_to_hub for *records* (default: one record) while the
        hub's reply is mocked to be *hub_response*."""
        from muse.core.coord_bus import push_to_hub

        batch = [_one_record()] if records is None else records
        with patch("muse.core.coord_bus._post_json", return_value=hub_response):
            return push_to_hub("https://localhost:1337", "torvalds", "linux",
                               batch, signing=None)

    @pytest.mark.parametrize("bad_inserted", [
        None,             # JSON null
        "three",          # non-numeric string
        [],               # list
        {},               # dict
        "1; drop table",  # injection attempt
    ])
    def test_bad_inserted_raises_coord_bus_error_not_typeerror(self, bad_inserted: MsgpackValue) -> None:
        from muse.core.coord_bus import CoordBusError
        with pytest.raises(CoordBusError):
            self._push({"inserted": bad_inserted, "skipped": 0})

    @pytest.mark.parametrize("bad_skipped", [
        None,
        "three",
        [],
        {},
        "1; drop table",
    ])
    def test_bad_skipped_raises_coord_bus_error_not_typeerror(self, bad_skipped: MsgpackValue) -> None:
        from muse.core.coord_bus import CoordBusError
        with pytest.raises(CoordBusError):
            self._push({"inserted": 1, "skipped": bad_skipped})

    def test_both_null_raises_coord_bus_error(self) -> None:
        from muse.core.coord_bus import CoordBusError
        with pytest.raises(CoordBusError):
            self._push({"inserted": None, "skipped": None})

    def test_null_never_raises_raw_typeerror(self) -> None:
        """The specific confirmed bug: int(None) must not escape as TypeError.

        Returning normally is also a failure, because null is not a valid count.
        """
        from muse.core.coord_bus import CoordBusError
        try:
            self._push({"inserted": None, "skipped": 0})
        except CoordBusError:
            return  # correct
        except TypeError as exc:
            pytest.fail(f"Raw TypeError escaped push_to_hub: {exc}")
        pytest.fail("push_to_hub returned normally for inserted=None; expected CoordBusError")

    def test_invalid_string_never_raises_raw_valueerror(self) -> None:
        """int('three') must not escape as ValueError.

        Returning normally is also a failure, because 'three' is not a count.
        """
        from muse.core.coord_bus import CoordBusError
        try:
            self._push({"inserted": "three", "skipped": 0})
        except CoordBusError:
            return  # correct
        except ValueError as exc:
            pytest.fail(f"Raw ValueError escaped push_to_hub: {exc}")
        pytest.fail("push_to_hub returned normally for inserted='three'; expected CoordBusError")

    # The remaining cases must SUCCEED: confirm that good values still work.
    def test_valid_integer_counts_pass_through(self) -> None:
        result = self._push({"inserted": 1, "skipped": 0})
        assert result == {"inserted": 1, "skipped": 0}

    def test_float_count_truncated_to_int(self) -> None:
        """Float counts are a hub bug but truncate cleanly when within bounds."""
        # Send 3 records so int(2.9)=2 and int(0.1)=0 both pass the bounds check.
        result = self._push({"inserted": 2.9, "skipped": 0.1},
                            records=[_one_record() for _ in range(3)])
        assert result["inserted"] == 2
        assert result["skipped"] == 0

    def test_zero_counts_valid(self) -> None:
        result = self._push({"inserted": 0, "skipped": 0})
        assert result == {"inserted": 0, "skipped": 0}

    def test_missing_both_keys_defaults_to_zero(self) -> None:
        """Hub omits both keys entirely — already handled by .get default."""
        result = self._push({})
        assert result == {"inserted": 0, "skipped": 0}

    def test_negative_count_raises_coord_bus_error(self) -> None:
        """Hub returning negative counts is a protocol violation."""
        from muse.core.coord_bus import CoordBusError
        with pytest.raises(CoordBusError):
            self._push({"inserted": -5, "skipped": 0})

    def test_count_exceeding_batch_size_raises_coord_bus_error(self) -> None:
        """Hub claims it inserted more records than were sent — impossible."""
        from muse.core.coord_bus import CoordBusError, MAX_PUSH_BATCH
        with pytest.raises(CoordBusError):
            self._push({"inserted": MAX_PUSH_BATCH + 1, "skipped": 0})
240
241
242 # =============================================================================
243 # 2. INTEGRATION — run_push with bad hub response (two layers deep)
244 # =============================================================================
245
class TestRunPushNullCountsIntegration:
    """
    Integration tests that drive run_push with the hub mocked at the wire
    level (_post_json).

    A malformed hub reply must end in a clean error exit (SystemExit(1)).
    An unhandled exception escaping run_push always counts as a failure.
    """

    @staticmethod
    def _non_blank_lines(text: str) -> list[str]:
        """Split the stripped *text* into lines, dropping whitespace-only ones."""
        return [row for row in text.strip().splitlines() if row.strip()]

    def test_null_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None:
        hub_reply = {"inserted": None, "skipped": 0}
        exit_code, stdout = _run_push_with_hub_response(tmp_path, hub_reply)
        assert exit_code != "CRASH", f"run_push crashed: {stdout}"

    def test_null_skipped_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None:
        hub_reply = {"inserted": 1, "skipped": None}
        exit_code, stdout = _run_push_with_hub_response(tmp_path, hub_reply)
        assert exit_code != "CRASH", f"run_push crashed: {stdout}"

    def test_both_null_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None:
        hub_reply = {"inserted": None, "skipped": None}
        exit_code, stdout = _run_push_with_hub_response(tmp_path, hub_reply)
        assert exit_code != "CRASH", f"run_push crashed: {stdout}"

    def test_string_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None:
        hub_reply = {"inserted": "three", "skipped": 0}
        exit_code, stdout = _run_push_with_hub_response(tmp_path, hub_reply)
        assert exit_code != "CRASH", f"run_push crashed: {stdout}"

    def test_list_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None:
        hub_reply = {"inserted": [1, 2, 3], "skipped": 0}
        exit_code, stdout = _run_push_with_hub_response(tmp_path, hub_reply)
        assert exit_code != "CRASH", f"run_push crashed: {stdout}"

    def test_dict_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None:
        hub_reply = {"inserted": {"count": 1}, "skipped": 0}
        exit_code, stdout = _run_push_with_hub_response(tmp_path, hub_reply)
        assert exit_code != "CRASH", f"run_push crashed: {stdout}"

    def test_bad_response_exits_with_code_1(self, tmp_path: pathlib.Path) -> None:
        """Bad hub response should be an error exit (code 1), not success (None/0)."""
        exit_code, _ = _run_push_with_hub_response(
            tmp_path, {"inserted": None, "skipped": None}
        )
        # A failed push ends in SystemExit(1); a clean success just returns None.
        assert exit_code == 1, f"expected exit code 1 for bad hub response, got {exit_code!r}"

    def test_bad_response_json_output_has_failed_true(self, tmp_path: pathlib.Path) -> None:
        """JSON output must have failed=true, not a raw exception message."""
        _, stdout = _run_push_with_hub_response(
            tmp_path, {"inserted": None, "skipped": None}
        )
        rows = self._non_blank_lines(stdout)
        assert rows, "no output produced"
        summary = json.loads(rows[-1])
        assert summary.get("failed") is True, f"expected failed=true in {summary}"

    def test_bad_response_output_contains_no_traceback(self, tmp_path: pathlib.Path) -> None:
        """Traceback must never appear in stdout."""
        _, stdout = _run_push_with_hub_response(
            tmp_path, {"inserted": None, "skipped": None}
        )
        checks = (
            ("Traceback", "traceback"),
            ("TypeError", "TypeError"),
            ("ValueError", "ValueError"),
        )
        for needle, label in checks:
            assert needle not in stdout, f"{label} leaked to stdout:\n{stdout}"

    def test_good_response_still_works_after_fix(self, tmp_path: pathlib.Path) -> None:
        """Valid hub response must still succeed after the fix is applied."""
        exit_code, stdout = _run_push_with_hub_response(
            tmp_path, {"inserted": 1, "skipped": 0}
        )
        # On success run_push never raises SystemExit, so exit_code stays None.
        assert exit_code in (0, None), f"expected clean exit for valid response, got {exit_code!r}"
        summary = json.loads(self._non_blank_lines(stdout)[-1])
        assert summary["inserted"] == 1
        assert summary["skipped"] == 0
        assert summary["failed"] is False
326
327
328 # =============================================================================
329 # 3. END-TO-END — CLI output is valid JSON with no tracebacks
330 # =============================================================================
331
class TestRunPushNullCountsEndToEnd:
    """
    End-to-end checks of what an operator would see at the terminal.

    For every malformed hub reply, each output line must be valid JSON, no
    Python traceback text may appear, and run_push must not raise an
    unhandled exception.
    """

    @pytest.mark.parametrize("bad_response", [
        {"inserted": None, "skipped": None},
        {"inserted": None, "skipped": 0},
        {"inserted": 1, "skipped": None},
        {"inserted": "bad", "skipped": 0},
        {"inserted": [], "skipped": 0},
        {},                  # completely empty
        {"other": "keys"},   # no inserted/skipped at all (already handled)
    ])
    def test_cli_output_is_valid_json_for_bad_hub_response(self, tmp_path: pathlib.Path, bad_response: MsgpackDict) -> None:
        exit_code, stdout = _run_push_with_hub_response(tmp_path, bad_response)
        assert exit_code != "CRASH", f"run_push crashed on {bad_response}: {stdout}"
        emitted = [row for row in stdout.strip().splitlines() if row.strip()]
        assert emitted, f"no output for hub response {bad_response}"
        # Each emitted line has to parse as JSON on its own.
        for row in emitted:
            try:
                json.loads(row)
            except json.JSONDecodeError:
                pytest.fail(f"non-JSON line in output for {bad_response}: {row!r}")

    def test_cli_output_never_contains_exception_class_names(self, tmp_path: pathlib.Path) -> None:
        markers = ("TypeError", "ValueError", "AttributeError",
                   "Traceback", "most recent call")
        for bad_val in (None, "bad", [], {}):
            _, stdout = _run_push_with_hub_response(
                tmp_path, {"inserted": bad_val, "skipped": 0}
            )
            leaked = next((marker for marker in markers if marker in stdout), None)
            assert leaked is None, (
                f"{leaked!r} leaked into CLI output for inserted={bad_val!r}:\n{stdout}"
            )

    def test_text_mode_also_clean_on_bad_response(self, tmp_path: pathlib.Path) -> None:
        """Non-JSON (text) mode must also not crash."""
        import io

        repo_root = _make_repo(tmp_path)
        stdout_buf = io.StringIO()
        text_args = argparse.Namespace(
            owner="torvalds", slug="linux",
            json_out=False, hub_url=None,
            kinds=["reservation"],
        )

        with patch("muse.cli.commands.coord_sync._gather_local_records",
                   return_value=[_one_record()]), \
             patch("muse.core.coord_bus._post_json",
                   return_value={"inserted": None, "skipped": None}), \
             patch("muse.cli.commands.coord_sync.require_repo", return_value=repo_root), \
             patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
                   return_value=("https://localhost:1337", "tok")), \
             patch("sys.stdout", stdout_buf):
            try:
                from muse.cli.commands.coord_sync import run_push
                run_push(text_args)
            except SystemExit:
                pass
            except Exception as exc:
                pytest.fail(f"text mode crashed: {type(exc).__name__}: {exc}")

        rendered = stdout_buf.getvalue()
        assert "Traceback" not in rendered
        assert "TypeError" not in rendered
402
403
404 # =============================================================================
405 # 4. STRESS — 14 batches, every batch returns a bad response
406 # =============================================================================
407
class TestRunPushNullCountsStress:
    """
    Stress tests: pushes spanning several full batches (up to 14 *
    MAX_PUSH_BATCH records) where some or all batches come back with null
    counts.

    run_push must survive every bad batch, keep counting the good ones, and
    emit a summary with failed=true.
    """

    def _run_push_n_batches(
        self,
        tmp_path: pathlib.Path,
        n_records: int,
        hub_responses: list[MsgpackDict],
    ) -> tuple[int | str | None, MsgpackDict]:
        """
        Push n_records synthetic reservations through run_push.

        _post_json answers successive calls with successive entries of
        hub_responses. Once the list is exhausted, it answers
        {"inserted": 0, "skipped": 0}.

        Returns (exit_code, summary):
          - exit_code is SystemExit.code, or None if run_push returned normally.
          - summary is the last non-blank JSON line printed, or {} if nothing
            was printed.
        If any other exception escapes run_push, returns
        ("CRASH", {"crash": "<ExceptionType>: <message>"}) so the escaped
        error shows up in assertion messages.
        """
        import io

        from muse.cli.commands.coord_sync import run_push

        root = _make_repo(tmp_path)
        records = [
            {
                "kind": "reservation",
                "record_id": f"res-{i:06d}",
                "run_id": f"run-{i}",
                "payload": {},
                "expires_at": _FUTURE_TS,
            }
            for i in range(n_records)
        ]

        response_iter = iter(hub_responses)

        def fake_post_json(url: str, body: MsgpackDict, token: str) -> MsgpackDict:
            # Fall back to an all-zero reply once the scripted responses run out.
            return next(response_iter, {"inserted": 0, "skipped": 0})

        captured = io.StringIO()
        exit_code: int | str | None = None
        args = argparse.Namespace(
            owner="torvalds", slug="linux",
            json_out=True, hub_url=None,
            kinds=["reservation"],
        )

        with patch("muse.cli.commands.coord_sync._gather_local_records",
                   return_value=records), \
             patch("muse.core.coord_bus._post_json", side_effect=fake_post_json), \
             patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
             patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
                   return_value=("https://localhost:1337", "tok")), \
             patch("sys.stdout", captured):
            try:
                run_push(args)
            except SystemExit as exc:
                exit_code = exc.code
            except Exception as exc:
                return ("CRASH", {"crash": f"{type(exc).__name__}: {exc}"})

        lines = [line for line in captured.getvalue().strip().splitlines() if line.strip()]
        summary = json.loads(lines[-1]) if lines else {}
        return (exit_code, summary)

    def test_all_14_batches_return_null_no_crash(self, tmp_path: pathlib.Path) -> None:
        from muse.core.coord_bus import MAX_PUSH_BATCH
        n = MAX_PUSH_BATCH * 14
        responses = [{"inserted": None, "skipped": None}] * 14
        code, summary = self._run_push_n_batches(tmp_path, n, responses)
        assert code != "CRASH", f"run_push crashed on 14 null-count batches: {summary}"

    def test_all_14_batches_return_null_exit_code_1(self, tmp_path: pathlib.Path) -> None:
        from muse.core.coord_bus import MAX_PUSH_BATCH
        n = MAX_PUSH_BATCH * 14
        responses = [{"inserted": None, "skipped": None}] * 14
        code, summary = self._run_push_n_batches(tmp_path, n, responses)
        assert code == 1, f"expected exit 1 for all-null batches, got {code!r} ({summary})"

    def test_all_14_batches_return_null_failed_true_in_output(self, tmp_path: pathlib.Path) -> None:
        from muse.core.coord_bus import MAX_PUSH_BATCH
        n = MAX_PUSH_BATCH * 14
        responses = [{"inserted": None, "skipped": None}] * 14
        _, summary = self._run_push_n_batches(tmp_path, n, responses)
        assert summary.get("failed") is True, f"expected failed=true, got {summary}"

    def test_alternating_good_and_null_batches(self, tmp_path: pathlib.Path) -> None:
        """Batches 1, 3 and 5 succeed; batches 2, 4 and 6 return null counts.

        No crash, failed=true, and only the three good batches are counted.
        """
        from muse.core.coord_bus import MAX_PUSH_BATCH
        n = MAX_PUSH_BATCH * 6
        responses = [
            {"inserted": MAX_PUSH_BATCH, "skipped": 0} if i % 2 == 0
            else {"inserted": None, "skipped": None}
            for i in range(6)
        ]

        code, summary = self._run_push_n_batches(tmp_path, n, responses)
        assert code != "CRASH", f"run_push crashed: {summary}"
        assert summary.get("failed") is True
        # 3 good batches × MAX_PUSH_BATCH inserted
        assert summary.get("inserted") == MAX_PUSH_BATCH * 3

    def test_first_batch_null_rest_succeed(self, tmp_path: pathlib.Path) -> None:
        from muse.core.coord_bus import MAX_PUSH_BATCH
        n = MAX_PUSH_BATCH * 3
        responses = [
            {"inserted": None, "skipped": None},
            {"inserted": MAX_PUSH_BATCH, "skipped": 0},
            {"inserted": MAX_PUSH_BATCH, "skipped": 0},
        ]
        code, summary = self._run_push_n_batches(tmp_path, n, responses)
        assert code != "CRASH", f"run_push crashed: {summary}"
        assert summary.get("failed") is True
        # The 2 good batches must still be counted.
        assert summary.get("inserted") == MAX_PUSH_BATCH * 2

    def test_last_batch_null_rest_succeed(self, tmp_path: pathlib.Path) -> None:
        from muse.core.coord_bus import MAX_PUSH_BATCH
        n = MAX_PUSH_BATCH * 3
        responses = [
            {"inserted": MAX_PUSH_BATCH, "skipped": 0},
            {"inserted": MAX_PUSH_BATCH, "skipped": 0},
            {"inserted": None, "skipped": None},
        ]
        code, summary = self._run_push_n_batches(tmp_path, n, responses)
        assert code != "CRASH", f"run_push crashed: {summary}"
        assert summary.get("failed") is True
        assert summary.get("inserted") == MAX_PUSH_BATCH * 2
532
533
534 # =============================================================================
535 # 5. PERFORMANCE — bad response handling overhead is negligible
536 # =============================================================================
537
class TestRunPushNullCountsPerformance:
    """
    Bad response handling must not introduce measurable overhead.

    The error path through run_push should cost about the same as the success
    path. Only run_push itself is timed: mock patching and repository setup
    stay outside the measured window.
    """

    @staticmethod
    def _push_args() -> argparse.Namespace:
        """Build the JSON-mode CLI arguments used by every timed push."""
        return argparse.Namespace(
            owner="torvalds", slug="linux",
            json_out=True, hub_url=None,
            kinds=["reservation"],
        )

    def _measure_push(self, tmp_path: pathlib.Path, response: MsgpackDict) -> float:
        """Return the seconds a single run_push call takes against *response*."""
        import io

        from muse.cli.commands.coord_sync import run_push

        root = _make_repo(tmp_path)
        records = [_one_record()]
        args = self._push_args()

        with patch("muse.cli.commands.coord_sync._gather_local_records",
                   return_value=records), \
             patch("muse.core.coord_bus._post_json", return_value=response), \
             patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
             patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
                   return_value=("https://localhost:1337", "tok")), \
             patch("sys.stdout", io.StringIO()):
            t0 = time.perf_counter()
            try:
                run_push(args)
            except SystemExit:
                pass
            return time.perf_counter() - t0

    def test_null_response_not_slower_than_good_response(self, tmp_path: pathlib.Path) -> None:
        # Warm up imports and any lazy initialisation on both paths.
        self._measure_push(tmp_path, {"inserted": 1, "skipped": 0})
        self._measure_push(tmp_path / "x", {"inserted": None, "skipped": None})

        good = self._measure_push(tmp_path / "good", {"inserted": 1, "skipped": 0})
        bad = self._measure_push(tmp_path / "bad", {"inserted": None, "skipped": None})

        assert bad < max(good * 10, 0.100), (
            f"null response path ({bad:.4f}s) is unexpectedly slower than "
            f"good response ({good:.4f}s)"
        )

    def test_100_consecutive_null_responses_under_1s(self, tmp_path: pathlib.Path) -> None:
        import io

        from muse.cli.commands.coord_sync import run_push

        root = _make_repo(tmp_path)
        records = [_one_record()]

        # Patch once, outside the timed loop, so the budget covers run_push's
        # error handling rather than mock setup/teardown.
        with patch("muse.cli.commands.coord_sync._gather_local_records",
                   return_value=records), \
             patch("muse.core.coord_bus._post_json",
                   return_value={"inserted": None, "skipped": None}), \
             patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
             patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
                   return_value=("https://localhost:1337", "tok")), \
             patch("sys.stdout", io.StringIO()):
            t0 = time.perf_counter()
            for _ in range(100):
                # Fresh args per push, in case run_push mutates its Namespace.
                try:
                    run_push(self._push_args())
                except SystemExit:
                    pass
            elapsed = time.perf_counter() - t0
        assert elapsed < 1.0, f"100 null-response pushes took {elapsed:.3f}s (> 1s)"
610
611
612 # =============================================================================
613 # 6. SECURITY — hub cannot cause code execution via count fields
614 # =============================================================================
615
class TestRunPushNullCountsSecurity:
    """
    Security: a malicious hub cannot exploit the count parsing path.

    Every attack payload must be rejected with CoordBusError. It must never be
    accepted as a count, and never evaluated: os.system is patched and
    asserted untouched so that evaluation would be detected.
    """

    @pytest.mark.parametrize("attack_payload", [
        "__import__('os').system('ls')",
        "1; __import__('os').system('ls')",
        "exec('import os')",
        "${7*7}",
        "{{7*7}}",
        "' OR 1=1 --",
        "\x00\x01\x02",
        "9" * 10000,  # absurdly long numeric string
        "1e308",      # float overflow
        "inf",
        "nan",
        "-inf",
    ])
    def test_malicious_inserted_raises_coord_bus_error_not_exec(self, attack_payload: str) -> None:
        from muse.core.coord_bus import push_to_hub, CoordBusError
        with patch("muse.core.coord_bus._post_json",
                   return_value={"inserted": attack_payload, "skipped": 0}), \
             patch("os.system") as fake_system:
            try:
                push_to_hub("https://localhost:1337", "torvalds", "linux",
                            [_one_record()], signing=None)
            except CoordBusError:
                pass  # correct — attack contained
            except Exception as exc:
                pytest.fail(
                    f"Attack payload {attack_payload!r} escaped as "
                    f"{type(exc).__name__}: {exc}"
                )
            else:
                pytest.fail(
                    f"Attack payload {attack_payload!r} was accepted as a count "
                    f"instead of raising CoordBusError"
                )
        fake_system.assert_not_called()

    def test_attack_payload_causes_coord_bus_error_not_execution(self, tmp_path: pathlib.Path) -> None:
        """
        A malicious count value must be rejected as CoordBusError, not executed.

        The error message may contain the repr of the bad value (that is fine
        for a CLI tool), but the Python expression must never be evaluated.
        """
        attack = "__import__('os').system('id')"
        with patch("os.system") as fake_system:
            code, output = _run_push_with_hub_response(
                tmp_path, {"inserted": attack, "skipped": 0}
            )
        # Check for a crash first so the escaped error appears in the message.
        assert code != "CRASH", f"attack payload caused crash: {output}"
        # Must be an error exit, not success.
        assert code == 1, f"expected exit 1 for attack payload, got {code!r}"
        fake_system.assert_not_called()
        # Output must be valid JSON (no raw traceback).
        for line in (line for line in output.strip().splitlines() if line.strip()):
            try:
                json.loads(line)
            except json.JSONDecodeError:
                pytest.fail(f"non-JSON output for attack payload: {line!r}")

    def test_extremely_large_count_rejected(self, tmp_path: pathlib.Path) -> None:
        """Hub claiming it inserted 2^63 records is impossible; treat as error."""
        huge = 2**63
        code, output = _run_push_with_hub_response(
            tmp_path, {"inserted": huge, "skipped": 0}
        )
        # Must not silently succeed with a nonsensical count.
        assert code != "CRASH", f"run_push crashed: {output}"
        lines = [line for line in output.strip().splitlines() if line.strip()]
        summary = json.loads(lines[-1]) if lines else {}
        # Either it fails, or if it "succeeds" the count must be sane (not 2^63).
        if summary.get("failed") is False:
            assert summary.get("inserted", 0) <= 10**9, (
                f"hub's 2^63 count was accepted verbatim: {summary}"
            )
688
689
690 # =============================================================================
691 # 7. DATA INTEGRITY — counts in output reflect reality
692 # =============================================================================
693
class TestRunPushNullCountsDataIntegrity:
    """
    When some batches succeed and some return bad counts, the summary output
    must reflect only the records from successful batches.
    """

    @staticmethod
    def _summary(code: int | str | None, output: str) -> MsgpackDict:
        """Return the last non-blank JSON line of *output* as a dict.

        Fails with the escaped error if run_push crashed, and with a clear
        message if nothing was printed.
        """
        assert code != "CRASH", f"run_push crashed: {output}"
        lines = [line for line in output.strip().splitlines() if line.strip()]
        assert lines, f"no output produced: {output!r}"
        return json.loads(lines[-1])

    @staticmethod
    def _assert_plain_int(summary: MsgpackDict, key: str) -> None:
        """Assert summary[key] is a real int (JSON booleans are rejected)."""
        value = summary.get(key)
        assert isinstance(value, int) and not isinstance(value, bool), (
            f"{key} must be int in summary, got {value!r}"
        )

    def test_total_reflects_records_sent_not_hub_count(self, tmp_path: pathlib.Path) -> None:
        """'total' in output is len(local records), independent of hub response."""
        summary = self._summary(*_run_push_with_hub_response(
            tmp_path, {"inserted": None, "skipped": None}
        ))
        # total must be 1 (we sent 1 record) regardless of hub response.
        assert summary.get("total") == 1, (
            f"total should be 1 (records sent), got {summary.get('total')}"
        )

    def test_inserted_is_zero_when_hub_returns_null(self, tmp_path: pathlib.Path) -> None:
        """When hub returns null for inserted, the count must be 0, not garbage."""
        summary = self._summary(*_run_push_with_hub_response(
            tmp_path, {"inserted": None, "skipped": 1}
        ))
        self._assert_plain_int(summary, "inserted")
        # The only batch reported a null count, so nothing may be credited.
        assert summary["inserted"] == 0, f"expected inserted=0, got {summary['inserted']!r}"

    def test_skipped_is_zero_when_hub_returns_null(self, tmp_path: pathlib.Path) -> None:
        """When hub returns null for skipped, the count must be 0, not garbage."""
        summary = self._summary(*_run_push_with_hub_response(
            tmp_path, {"inserted": 1, "skipped": None}
        ))
        self._assert_plain_int(summary, "skipped")
        assert summary["skipped"] == 0, f"expected skipped=0, got {summary['skipped']!r}"

    def test_partial_null_counts_are_accumulated_correctly(self, tmp_path: pathlib.Path) -> None:
        """
        3 batches whose hub replies report inserted=[5, null, 3].

        The null batch contributes nothing, so the summary must show
        inserted = 5 + 0 + 3 = 8 and failed=true.
        """
        import io

        from muse.cli.commands.coord_sync import run_push
        from muse.core.coord_bus import MAX_PUSH_BATCH

        root = _make_repo(tmp_path)
        # Exactly three full batches. Every record gets its own record_id, so
        # any de-duplication by id cannot change the number of batches sent.
        records = [
            {"kind": "reservation", "record_id": f"res-{i:06d}",
             "run_id": "r", "payload": {}, "expires_at": _FUTURE_TS}
            for i in range(3 * MAX_PUSH_BATCH)
        ]

        scripted = iter([
            {"inserted": 5, "skipped": 0},
            {"inserted": None, "skipped": None},
            {"inserted": 3, "skipped": 0},
        ])

        def fake_post(url: str, body: MsgpackDict, token: str) -> MsgpackDict:
            reply = next(scripted, None)
            if reply is None:
                raise AssertionError("run_push sent more than the 3 expected batches")
            return reply

        captured = io.StringIO()
        args = argparse.Namespace(
            owner="torvalds", slug="linux",
            json_out=True, hub_url=None,
            kinds=["reservation"],
        )
        with patch("muse.cli.commands.coord_sync._gather_local_records",
                   return_value=records), \
             patch("muse.core.coord_bus._post_json", side_effect=fake_post), \
             patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
             patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
                   return_value=("https://localhost:1337", "tok")), \
             patch("sys.stdout", captured):
            try:
                run_push(args)
            except SystemExit:
                pass
            except Exception as exc:
                pytest.fail(f"Crashed: {type(exc).__name__}: {exc}")

        summary = self._summary(None, captured.getvalue())
        assert summary.get("inserted") == 8, (
            f"expected inserted=8 (5+0+3), got {summary.get('inserted')}"
        )
        assert summary.get("failed") is True, "middle batch failed, so failed must be True"

    def test_output_json_schema_complete_on_bad_response(self, tmp_path: pathlib.Path) -> None:
        """All required keys must be present in JSON output even on error."""
        summary = self._summary(*_run_push_with_hub_response(
            tmp_path, {"inserted": None, "skipped": None}
        ))
        for key in ("schema", "inserted", "skipped", "total", "failed", "duration_ms"):
            assert key in summary, f"key {key!r} missing from summary: {summary}"
File History 1 commit
sha256:06dba78c2a78e251b580422dd1fd547f3c8357ff18f7709a860873b2d24dbbbf chore: bump version to 0.2.0rc14 Sonnet 4.6 patch 16 hours ago