test_coord_push_null_counts.py
python
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠ breaking
30 days ago
| 1 | """ |
| 2 | Tests for the bug: hub response with null/non-integer inserted/skipped counts |
| 3 | causes an unhandled TypeError or ValueError that escapes run_push as a raw |
| 4 | traceback instead of a clean CoordBusError. |
| 5 | |
| 6 | Root cause (coord_bus.py::push_to_hub lines 233-234): |
| 7 | |
| 8 | return { |
| 9 | "inserted": int(result.get("inserted", 0)), # BUG |
| 10 | "skipped": int(result.get("skipped", 0)), # BUG |
| 11 | } |
| 12 | |
| 13 | When the hub returns {"inserted": null}: |
| 14 | - result.get("inserted", 0) → None (key EXISTS so default is NOT used) |
| 15 | - int(None) → TypeError |
| 16 | |
| 17 | When the hub returns {"inserted": "three"}: |
| 18 | - int("three") → ValueError |
| 19 | |
| 20 | Neither TypeError nor ValueError is CoordBusError. |
| 21 | run_push only catches CoordBusError — the exception escapes as a raw traceback. |
| 22 | |
| 23 | Coverage: |
| 24 | Unit — push_to_hub directly, all bad-value variants |
| 25 | Integration — run_push with bad hub response (two layers deep) |
| 26 | End-to-end — CLI CliRunner invocation, asserts no traceback in output |
| 27 | Stress — 14-batch push, every batch returns a bad response |
| 28 | Performance — bad response handling overhead is negligible |
| 29 | Security — hub cannot cause arbitrary code exec via count field |
| 30 | Data integrity — partial-batch failures produce correct counts in output |
| 31 | """ |
| 32 | from __future__ import annotations |
| 33 | |
| 34 | import argparse |
| 35 | import json |
| 36 | import pathlib |
| 37 | import time |
| 38 | from io import BytesIO |
| 39 | from unittest.mock import MagicMock, patch |
| 40 | |
| 41 | import pytest |
| 42 | |
| 43 | from muse.core.types import MsgpackDict, MsgpackValue |
| 44 | from muse.core.paths import muse_dir |
| 45 | |
| 46 | # --------------------------------------------------------------------------- |
| 47 | # Shared helpers |
| 48 | # --------------------------------------------------------------------------- |
| 49 | |
| 50 | _ALL_KINDS = ("reservation", "intent", "release", "heartbeat", "dependency", "task", "claim") |
| 51 | _FUTURE_TS = "2099-12-31T23:59:59+00:00" |
| 52 | |
| 53 | |
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Turn *tmp_path* into a minimal repository by creating its ``muse_dir``.

    Safe to call repeatedly on the same path (the directory may already
    exist). Returns *tmp_path* itself.
    """
    metadata_dir = muse_dir(tmp_path)
    metadata_dir.mkdir(exist_ok=True, parents=True)
    return tmp_path
| 57 | |
| 58 | |
def _one_record() -> MsgpackDict:
    """Build a fresh, non-expiring reservation record to feed into a push."""
    rid = "res-000001"
    return dict(
        kind="reservation",
        record_id=rid,
        run_id="run-0",
        payload={"reservation_id": rid, "expires_at": _FUTURE_TS},
        expires_at=_FUTURE_TS,
    )
| 67 | |
| 68 | |
| 69 | def _make_http_response(body: MsgpackDict) -> BytesIO: |
| 70 | return BytesIO(json.dumps(body).encode()) |
| 71 | |
| 72 | |
def _run_push_with_hub_response(
    tmp_path: pathlib.Path, hub_response: MsgpackDict
) -> tuple[int | str | None, str]:
    """
    Run run_push in JSON mode with _post_json mocked to return *hub_response*.

    Local record gathering, repo lookup and hub/signing resolution are patched,
    and stdout is captured.

    Returns ``(exit_code, output)``:

    * ``exit_code`` is ``None`` when run_push returned normally, or the
      ``SystemExit.code`` it raised;
    * ``exit_code`` is the sentinel string ``"CRASH"`` when any other exception
      escaped -- ``output`` then holds ``"<ExceptionType>: <message>"``
      instead of the captured stdout.
    """
    import io

    root = _make_repo(tmp_path)

    captured = io.StringIO()
    exit_code: int | str | None = None

    with patch("muse.cli.commands.coord_sync._gather_local_records",
               return_value=[_one_record()]), \
            patch("muse.core.coord_bus._post_json", return_value=hub_response), \
            patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
            patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
                  return_value=("https://localhost:1337", "tok")), \
            patch("sys.stdout", captured):
        args = argparse.Namespace(
            owner="torvalds", slug="linux",
            json_out=True, hub_url=None,
            kinds=["reservation"],
        )
        try:
            from muse.cli.commands.coord_sync import run_push
            run_push(args)
        except SystemExit as exc:
            exit_code = exc.code
        except Exception as exc:
            # Deliberately broad: any non-SystemExit escape is the bug under
            # test, so surface it to the caller instead of erroring the test.
            return ("CRASH", f"{type(exc).__name__}: {exc}")

    return (exit_code, captured.getvalue())
| 109 | |
| 110 | |
| 111 | # ============================================================================= |
| 112 | # 1. UNIT — push_to_hub directly |
| 113 | # ============================================================================= |
| 114 | |
class TestPushToHubNullCountsUnit:
    """
    Unit tests on coord_bus.push_to_hub.

    _post_json is mocked to return bad count values. Every malformed count must
    surface as CoordBusError -- never as a raw TypeError/ValueError, and never
    be silently accepted as if the push had succeeded.
    """

    # Malformed count values exercised for both the "inserted" and the
    # "skipped" field.
    _BAD_COUNTS = [
        None,             # JSON null
        "three",          # non-numeric string
        [],               # list
        {},               # dict
        "1; drop table",  # injection attempt
    ]

    @staticmethod
    def _push(records: list[MsgpackDict] | None = None) -> dict[str, int]:
        """Call push_to_hub against a fake hub; *records* defaults to one record."""
        from muse.core.coord_bus import push_to_hub
        if records is None:
            records = [_one_record()]
        return push_to_hub("https://localhost:1337", "torvalds", "linux",
                           records, signing=None)

    @pytest.mark.parametrize("bad_inserted", _BAD_COUNTS)
    def test_bad_inserted_raises_coord_bus_error_not_typeerror(self, bad_inserted: MsgpackValue) -> None:
        from muse.core.coord_bus import CoordBusError
        with patch("muse.core.coord_bus._post_json",
                   return_value={"inserted": bad_inserted, "skipped": 0}):
            with pytest.raises(CoordBusError):
                self._push()

    @pytest.mark.parametrize("bad_skipped", _BAD_COUNTS)
    def test_bad_skipped_raises_coord_bus_error_not_typeerror(self, bad_skipped: MsgpackValue) -> None:
        from muse.core.coord_bus import CoordBusError
        with patch("muse.core.coord_bus._post_json",
                   return_value={"inserted": 1, "skipped": bad_skipped}):
            with pytest.raises(CoordBusError):
                self._push()

    def test_both_null_raises_coord_bus_error(self) -> None:
        from muse.core.coord_bus import CoordBusError
        with patch("muse.core.coord_bus._post_json",
                   return_value={"inserted": None, "skipped": None}):
            with pytest.raises(CoordBusError):
                self._push()

    def test_null_never_raises_raw_typeerror(self) -> None:
        """The specific confirmed bug: int(None) must not escape as TypeError."""
        from muse.core.coord_bus import CoordBusError
        with patch("muse.core.coord_bus._post_json",
                   return_value={"inserted": None, "skipped": 0}):
            try:
                self._push()
            except CoordBusError:
                pass  # correct
            except TypeError as exc:
                pytest.fail(f"Raw TypeError escaped push_to_hub: {exc}")
            else:
                # Returning normally means the null count was silently accepted.
                pytest.fail("push_to_hub accepted inserted=None instead of raising CoordBusError")

    def test_invalid_string_never_raises_raw_valueerror(self) -> None:
        """int('three') must not escape as ValueError."""
        from muse.core.coord_bus import CoordBusError
        with patch("muse.core.coord_bus._post_json",
                   return_value={"inserted": "three", "skipped": 0}):
            try:
                self._push()
            except CoordBusError:
                pass  # correct
            except ValueError as exc:
                pytest.fail(f"Raw ValueError escaped push_to_hub: {exc}")
            else:
                # Returning normally means the bogus string was silently accepted.
                pytest.fail("push_to_hub accepted inserted='three' instead of raising CoordBusError")

    # These should SUCCEED -- confirm good values still work
    def test_valid_integer_counts_pass_through(self) -> None:
        with patch("muse.core.coord_bus._post_json",
                   return_value={"inserted": 1, "skipped": 0}):
            result = self._push()
        assert result == {"inserted": 1, "skipped": 0}

    def test_float_count_truncated_to_int(self) -> None:
        """Float counts are a hub bug but truncate cleanly when within bounds."""
        # Send 3 records so int(2.9)=2 and int(0.1)=0 both pass the bounds check
        three_records = [_one_record(), _one_record(), _one_record()]
        with patch("muse.core.coord_bus._post_json",
                   return_value={"inserted": 2.9, "skipped": 0.1}):
            result = self._push(three_records)
        assert result["inserted"] == 2
        assert result["skipped"] == 0

    def test_zero_counts_valid(self) -> None:
        with patch("muse.core.coord_bus._post_json",
                   return_value={"inserted": 0, "skipped": 0}):
            result = self._push()
        assert result == {"inserted": 0, "skipped": 0}

    def test_missing_both_keys_defaults_to_zero(self) -> None:
        """Hub omits both keys entirely -- already handled by .get default."""
        with patch("muse.core.coord_bus._post_json", return_value={}):
            result = self._push()
        assert result == {"inserted": 0, "skipped": 0}

    def test_negative_count_raises_coord_bus_error(self) -> None:
        """Hub returning negative counts is a protocol violation."""
        from muse.core.coord_bus import CoordBusError
        with patch("muse.core.coord_bus._post_json",
                   return_value={"inserted": -5, "skipped": 0}):
            with pytest.raises(CoordBusError):
                self._push()

    def test_count_exceeding_batch_size_raises_coord_bus_error(self) -> None:
        """Hub claims it inserted more records than were sent -- impossible."""
        from muse.core.coord_bus import CoordBusError, MAX_PUSH_BATCH
        with patch("muse.core.coord_bus._post_json",
                   return_value={"inserted": MAX_PUSH_BATCH + 1, "skipped": 0}):
            with pytest.raises(CoordBusError):
                self._push()
| 240 | |
| 241 | |
| 242 | # ============================================================================= |
| 243 | # 2. INTEGRATION — run_push with bad hub response (two layers deep) |
| 244 | # ============================================================================= |
| 245 | |
class TestRunPushNullCountsIntegration:
    """
    Integration tests: run_push with _post_json mocked at the wire level.
    Asserts clean exit (SystemExit(1) for hub errors) -- never an unhandled exception.
    """

    @staticmethod
    def _assert_no_crash(tmp_path: pathlib.Path, hub_response: MsgpackDict) -> None:
        """Run run_push against *hub_response*; fail if a non-SystemExit exception escapes."""
        code, output = _run_push_with_hub_response(tmp_path, hub_response)
        assert code != "CRASH", f"run_push crashed: {output}"

    @staticmethod
    def _summary(code: int | str | None, output: str) -> MsgpackDict:
        """Parse the final JSON line of *output*, failing clearly on a crash or no output."""
        # On a crash ``output`` is the exception text, which is not JSON; report
        # the crash itself rather than a confusing JSONDecodeError.
        assert code != "CRASH", f"run_push crashed: {output}"
        lines = [line for line in output.strip().splitlines() if line.strip()]
        assert lines, "no output produced"
        return json.loads(lines[-1])

    def test_null_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_crash(tmp_path, {"inserted": None, "skipped": 0})

    def test_null_skipped_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_crash(tmp_path, {"inserted": 1, "skipped": None})

    def test_both_null_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_crash(tmp_path, {"inserted": None, "skipped": None})

    def test_string_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_crash(tmp_path, {"inserted": "three", "skipped": 0})

    def test_list_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_crash(tmp_path, {"inserted": [1, 2, 3], "skipped": 0})

    def test_dict_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_crash(tmp_path, {"inserted": {"count": 1}, "skipped": 0})

    def test_bad_response_exits_with_code_1(self, tmp_path: pathlib.Path) -> None:
        """Bad hub response should be an error exit (code 1), not success (None/0)."""
        code, output = _run_push_with_hub_response(
            tmp_path, {"inserted": None, "skipped": None}
        )
        # run_push raises SystemExit(1) when failed=True; clean success returns None
        assert code == 1, f"expected exit code 1 for bad hub response, got {code!r}: {output}"

    def test_bad_response_json_output_has_failed_true(self, tmp_path: pathlib.Path) -> None:
        """JSON output must have failed=true, not a raw exception message."""
        code, output = _run_push_with_hub_response(
            tmp_path, {"inserted": None, "skipped": None}
        )
        summary = self._summary(code, output)
        assert summary.get("failed") is True, f"expected failed=true in {summary}"

    def test_bad_response_output_contains_no_traceback(self, tmp_path: pathlib.Path) -> None:
        """Traceback must never appear in stdout."""
        code, output = _run_push_with_hub_response(
            tmp_path, {"inserted": None, "skipped": None}
        )
        # Check for a crash first: its exception text would otherwise be
        # misreported below as having leaked to stdout.
        assert code != "CRASH", f"run_push crashed: {output}"
        for forbidden in ("Traceback", "TypeError", "ValueError"):
            assert forbidden not in output, f"{forbidden} leaked to stdout:\n{output}"

    def test_good_response_still_works_after_fix(self, tmp_path: pathlib.Path) -> None:
        """Valid hub response must still succeed after the fix is applied."""
        code, output = _run_push_with_hub_response(
            tmp_path, {"inserted": 1, "skipped": 0}
        )
        summary = self._summary(code, output)
        # run_push does not raise SystemExit on success -- exit_code stays None
        assert code in (0, None), f"expected clean exit for valid response, got {code!r}"
        assert summary["inserted"] == 1
        assert summary["skipped"] == 0
        assert summary["failed"] is False
| 326 | |
| 327 | |
| 328 | # ============================================================================= |
| 329 | # 3. END-TO-END — CLI output is valid JSON with no tracebacks |
| 330 | # ============================================================================= |
| 331 | |
class TestRunPushNullCountsEndToEnd:
    """
    End-to-end: simulate what an operator would see at the terminal.
    Output must be valid JSON, must contain no Python traceback text,
    and the process must exit cleanly (no unhandled exception).
    """

    # Text that should only ever appear if an exception leaked into user output.
    _FORBIDDEN = ("TypeError", "ValueError", "AttributeError",
                  "Traceback", "most recent call")

    @pytest.mark.parametrize("bad_response", [
        {"inserted": None, "skipped": None},
        {"inserted": None, "skipped": 0},
        {"inserted": 1, "skipped": None},
        {"inserted": "bad", "skipped": 0},
        {"inserted": [], "skipped": 0},
        {},  # completely empty
        {"other": "keys"},  # no inserted/skipped at all (already handled)
    ])
    def test_cli_output_is_valid_json_for_bad_hub_response(self, tmp_path: pathlib.Path, bad_response: MsgpackDict) -> None:
        code, output = _run_push_with_hub_response(tmp_path, bad_response)
        assert code != "CRASH", f"run_push crashed on {bad_response}: {output}"
        lines = [line for line in output.strip().splitlines() if line.strip()]
        assert lines, f"no output for hub response {bad_response}"
        # Every output line must be valid JSON
        for line in lines:
            try:
                json.loads(line)
            except json.JSONDecodeError:
                pytest.fail(f"non-JSON line in output for {bad_response}: {line!r}")

    def test_cli_output_never_contains_exception_class_names(self, tmp_path: pathlib.Path) -> None:
        for bad_val in [None, "bad", [], {}]:
            code, output = _run_push_with_hub_response(
                tmp_path, {"inserted": bad_val, "skipped": 0}
            )
            # On a crash ``output`` is the exception text itself; report it as a
            # crash rather than as a leak into CLI output.
            assert code != "CRASH", f"run_push crashed for inserted={bad_val!r}: {output}"
            for forbidden in self._FORBIDDEN:
                assert forbidden not in output, (
                    f"{forbidden!r} leaked into CLI output for inserted={bad_val!r}:\n{output}"
                )

    def test_text_mode_also_clean_on_bad_response(self, tmp_path: pathlib.Path) -> None:
        """Non-JSON (text) mode must also not crash or leak exception text."""
        import io

        root = _make_repo(tmp_path)
        captured = io.StringIO()

        with patch("muse.cli.commands.coord_sync._gather_local_records",
                   return_value=[_one_record()]), \
                patch("muse.core.coord_bus._post_json",
                      return_value={"inserted": None, "skipped": None}), \
                patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
                patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
                      return_value=("https://localhost:1337", "tok")), \
                patch("sys.stdout", captured):
            args = argparse.Namespace(
                owner="torvalds", slug="linux",
                json_out=False, hub_url=None,
                kinds=["reservation"],
            )
            try:
                from muse.cli.commands.coord_sync import run_push
                run_push(args)
            except SystemExit:
                pass
            except Exception as exc:
                pytest.fail(f"text mode crashed: {type(exc).__name__}: {exc}")

        output = captured.getvalue()
        # Same markers as the JSON-mode check, so both modes hold one standard.
        for forbidden in self._FORBIDDEN:
            assert forbidden not in output, (
                f"{forbidden!r} leaked into text-mode output:\n{output}"
            )
| 402 | |
| 403 | |
| 404 | # ============================================================================= |
| 405 | # 4. STRESS — 14 batches, every batch returns a bad response |
| 406 | # ============================================================================= |
| 407 | |
class TestRunPushNullCountsStress:
    """
    Stress tests: 14 full batches (14 * MAX_PUSH_BATCH records) all return null
    counts. The system must not crash on any batch and must emit correct
    summary output.
    """

    def _run_push_n_batches(
        self,
        tmp_path: pathlib.Path,
        n_records: int,
        hub_responses: list[MsgpackDict],
    ) -> tuple[int | str | None, MsgpackDict]:
        """
        Push *n_records* synthetic reservation records through run_push.

        The fake hub answers the k-th _post_json call with ``hub_responses[k]``
        and with ``{"inserted": 0, "skipped": 0}`` once the list is exhausted.

        Returns ``(exit_code, summary)``. ``exit_code`` is ``None`` when run_push
        returned normally and the SystemExit code when it exited; ``summary`` is
        the last JSON line printed (``{}`` if nothing was printed). When any
        other exception escaped, ``exit_code`` is ``"CRASH"`` and ``summary`` is
        ``{"crash": "<ExceptionType>: <message>"}`` so tests can show the cause.
        """
        import io

        root = _make_repo(tmp_path)
        records = [
            {
                "kind": "reservation",
                "record_id": f"res-{i:06d}",
                "run_id": f"run-{i}",
                "payload": {},
                "expires_at": _FUTURE_TS,
            }
            for i in range(n_records)
        ]

        response_iter = iter(hub_responses)

        def fake_post_json(*_args: object, **_kwargs: object) -> MsgpackDict:
            # Accept any call shape: only call order matters here, and a
            # signature mismatch would otherwise be misreported as a crash.
            return next(response_iter, {"inserted": 0, "skipped": 0})

        captured = io.StringIO()
        exit_code: int | str | None = None

        with patch("muse.cli.commands.coord_sync._gather_local_records",
                   return_value=records), \
                patch("muse.core.coord_bus._post_json", side_effect=fake_post_json), \
                patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
                patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
                      return_value=("https://localhost:1337", "tok")), \
                patch("sys.stdout", captured):
            args = argparse.Namespace(
                owner="torvalds", slug="linux",
                json_out=True, hub_url=None,
                kinds=["reservation"],
            )
            try:
                from muse.cli.commands.coord_sync import run_push
                run_push(args)
            except SystemExit as exc:
                exit_code = exc.code
            except Exception as exc:
                return ("CRASH", {"crash": f"{type(exc).__name__}: {exc}"})

        lines = [line for line in captured.getvalue().strip().splitlines() if line.strip()]
        summary = json.loads(lines[-1]) if lines else {}
        return (exit_code, summary)

    def _push_14_null_batches(self, tmp_path: pathlib.Path) -> tuple[int | str | None, MsgpackDict]:
        """Push 14 full batches, every one answered with null counts."""
        from muse.core.coord_bus import MAX_PUSH_BATCH
        n = MAX_PUSH_BATCH * 14
        responses = [{"inserted": None, "skipped": None}] * 14
        return self._run_push_n_batches(tmp_path, n, responses)

    def test_all_14_batches_return_null_no_crash(self, tmp_path: pathlib.Path) -> None:
        code, summary = self._push_14_null_batches(tmp_path)
        assert code != "CRASH", (
            f"run_push crashed on 14 null-count batches: {summary.get('crash')}"
        )

    def test_all_14_batches_return_null_exit_code_1(self, tmp_path: pathlib.Path) -> None:
        code, summary = self._push_14_null_batches(tmp_path)
        assert code == 1, f"expected exit 1 for all-null batches, got {code!r} ({summary})"

    def test_all_14_batches_return_null_failed_true_in_output(self, tmp_path: pathlib.Path) -> None:
        code, summary = self._push_14_null_batches(tmp_path)
        assert code != "CRASH", f"run_push crashed: {summary.get('crash')}"
        assert summary.get("failed") is True

    def test_alternating_good_and_null_batches(self, tmp_path: pathlib.Path) -> None:
        """Batches 1, 3, 5 (1-based) succeed; batches 2, 4, 6 return null. No crash, failed=true."""
        from muse.core.coord_bus import MAX_PUSH_BATCH
        n = MAX_PUSH_BATCH * 6
        responses = [
            {"inserted": MAX_PUSH_BATCH, "skipped": 0} if i % 2 == 0
            else {"inserted": None, "skipped": None}
            for i in range(6)
        ]

        code, summary = self._run_push_n_batches(tmp_path, n, responses)
        assert code != "CRASH", f"run_push crashed: {summary.get('crash')}"
        assert summary.get("failed") is True
        # 3 good batches x MAX_PUSH_BATCH inserted
        assert summary.get("inserted") == MAX_PUSH_BATCH * 3

    def test_first_batch_null_rest_succeed(self, tmp_path: pathlib.Path) -> None:
        from muse.core.coord_bus import MAX_PUSH_BATCH
        n = MAX_PUSH_BATCH * 3
        responses = [
            {"inserted": None, "skipped": None},
            {"inserted": MAX_PUSH_BATCH, "skipped": 0},
            {"inserted": MAX_PUSH_BATCH, "skipped": 0},
        ]
        code, summary = self._run_push_n_batches(tmp_path, n, responses)
        assert code != "CRASH", f"run_push crashed: {summary.get('crash')}"
        assert summary.get("failed") is True
        # 2 good batches should still be counted
        assert summary.get("inserted") == MAX_PUSH_BATCH * 2

    def test_last_batch_null_rest_succeed(self, tmp_path: pathlib.Path) -> None:
        from muse.core.coord_bus import MAX_PUSH_BATCH
        n = MAX_PUSH_BATCH * 3
        responses = [
            {"inserted": MAX_PUSH_BATCH, "skipped": 0},
            {"inserted": MAX_PUSH_BATCH, "skipped": 0},
            {"inserted": None, "skipped": None},
        ]
        code, summary = self._run_push_n_batches(tmp_path, n, responses)
        assert code != "CRASH", f"run_push crashed: {summary.get('crash')}"
        assert summary.get("failed") is True
        assert summary.get("inserted") == MAX_PUSH_BATCH * 2
| 532 | |
| 533 | |
| 534 | # ============================================================================= |
| 535 | # 5. PERFORMANCE — bad response handling overhead is negligible |
| 536 | # ============================================================================= |
| 537 | |
class TestRunPushNullCountsPerformance:
    """
    Bad response handling must not introduce measurable overhead.
    Error paths in push_to_hub should be as fast as success paths.

    Timings use time.perf_counter(), the highest-resolution clock for short
    intervals, and every patch is applied outside the timed region so only
    run_push itself is measured.
    """

    @staticmethod
    def _push_args() -> argparse.Namespace:
        """Arguments for a JSON-mode push of reservation records to torvalds/linux."""
        return argparse.Namespace(
            owner="torvalds", slug="linux",
            json_out=True, hub_url=None,
            kinds=["reservation"],
        )

    @staticmethod
    def _enter_offline_patches(stack, root: pathlib.Path, response: MsgpackDict) -> None:
        """Register on the ExitStack *stack* every patch isolating run_push from repo, hub and stdout."""
        import io

        stack.enter_context(patch("muse.cli.commands.coord_sync._gather_local_records",
                                  return_value=[_one_record()]))
        stack.enter_context(patch("muse.core.coord_bus._post_json", return_value=response))
        stack.enter_context(patch("muse.cli.commands.coord_sync.require_repo",
                                  return_value=root))
        stack.enter_context(patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
                                  return_value=("https://localhost:1337", "tok")))
        stack.enter_context(patch("sys.stdout", io.StringIO()))

    def _measure_push(self, tmp_path: pathlib.Path, response: MsgpackDict) -> float:
        """Return the elapsed seconds of a single run_push against *response*."""
        import contextlib
        from muse.cli.commands.coord_sync import run_push

        root = _make_repo(tmp_path)
        with contextlib.ExitStack() as stack:
            self._enter_offline_patches(stack, root, response)
            args = self._push_args()
            t0 = time.perf_counter()
            try:
                run_push(args)
            except SystemExit:
                pass
            return time.perf_counter() - t0

    def test_null_response_not_slower_than_good_response(self, tmp_path: pathlib.Path) -> None:
        # Warm up both paths so one-off import/caching cost is not measured.
        self._measure_push(tmp_path, {"inserted": 1, "skipped": 0})
        self._measure_push(tmp_path / "x", {"inserted": None, "skipped": None})

        good = self._measure_push(tmp_path / "good", {"inserted": 1, "skipped": 0})
        bad = self._measure_push(tmp_path / "bad", {"inserted": None, "skipped": None})

        # 100 ms floor keeps the check meaningful when `good` is near zero.
        assert bad < max(good * 10, 0.100), (
            f"null response path ({bad:.4f}s) is unexpectedly slower than "
            f"good response ({good:.4f}s)"
        )

    def test_100_consecutive_null_responses_under_1s(self, tmp_path: pathlib.Path) -> None:
        import contextlib
        from muse.cli.commands.coord_sync import run_push

        root = _make_repo(tmp_path)

        with contextlib.ExitStack() as stack:
            # Patch once, outside the timed loop: re-entering five patches per
            # iteration would be measured as if it were run_push overhead.
            self._enter_offline_patches(stack, root, {"inserted": None, "skipped": None})
            t0 = time.perf_counter()
            for _ in range(100):
                # Fresh Namespace per run in case run_push mutates its args.
                args = self._push_args()
                try:
                    run_push(args)
                except SystemExit:
                    pass
            elapsed = time.perf_counter() - t0
        assert elapsed < 1.0, f"100 null-response pushes took {elapsed:.3f}s (> 1s)"
| 610 | |
| 611 | |
| 612 | # ============================================================================= |
| 613 | # 6. SECURITY — hub cannot cause code execution via count fields |
| 614 | # ============================================================================= |
| 615 | |
class TestRunPushNullCountsSecurity:
    """
    Security: a malicious hub cannot exploit the count parsing path.
    All attack payloads must result in CoordBusError, never in exec/import.
    """

    @pytest.mark.parametrize("attack_payload", [
        "__import__('os').system('ls')",
        "1; __import__('os').system('ls')",
        "exec('import os')",
        "${7*7}",
        "{{7*7}}",
        "' OR 1=1 --",
        "\x00\x01\x02",
        "9" * 10000,  # absurdly long numeric string
        "1e308",  # float overflow
        "inf",
        "nan",
        "-inf",
    ])
    def test_malicious_inserted_raises_coord_bus_error_not_exec(self, attack_payload: str) -> None:
        from muse.core.coord_bus import push_to_hub, CoordBusError
        with patch("muse.core.coord_bus._post_json",
                   return_value={"inserted": attack_payload, "skipped": 0}):
            try:
                push_to_hub("https://localhost:1337", "torvalds", "linux",
                            [_one_record()], signing=None)
            except CoordBusError:
                pass  # correct -- attack contained
            except Exception as exc:
                pytest.fail(
                    f"Attack payload {attack_payload!r} escaped as "
                    f"{type(exc).__name__}: {exc}"
                )
            else:
                # A normal return means the payload was accepted as a count.
                pytest.fail(f"Attack payload {attack_payload!r} was accepted as a count")

    def test_attack_payload_causes_coord_bus_error_not_execution(self, tmp_path: pathlib.Path) -> None:
        """
        Malicious count value must be rejected as CoordBusError -- not executed.
        The error message may contain the repr of the bad value (that is fine for
        a CLI tool), but the Python expression must never be evaluated.
        """
        attack = "__import__('os').system('id')"
        # Swap in a mock for os.system: if the payload were ever evaluated, the
        # call is recorded (and no shell is spawned) so the test can detect it.
        with patch("os.system") as fake_system:
            code, output = _run_push_with_hub_response(
                tmp_path, {"inserted": attack, "skipped": 0}
            )
        # Check for an unhandled exception first; on a crash ``output`` holds it.
        assert code != "CRASH", f"attack payload caused crash: {output}"
        # Must be an error exit, not success
        assert code == 1, f"expected exit 1 for attack payload, got {code!r}"
        assert not fake_system.called, "attack payload was evaluated (os.system was called)"
        # Output must be valid JSON (no raw traceback)
        lines = [line for line in output.strip().splitlines() if line.strip()]
        for line in lines:
            try:
                json.loads(line)
            except json.JSONDecodeError:
                pytest.fail(f"non-JSON output for attack payload: {line!r}")

    def test_extremely_large_count_rejected(self, tmp_path: pathlib.Path) -> None:
        """Hub claiming it inserted 2^63 records is impossible; treat as error."""
        huge = 2**63
        code, output = _run_push_with_hub_response(
            tmp_path, {"inserted": huge, "skipped": 0}
        )
        assert code != "CRASH", f"run_push crashed: {output}"
        lines = [line for line in output.strip().splitlines() if line.strip()]
        # Without output there is no summary to judge; don't pass vacuously.
        assert lines, "no output produced"
        summary = json.loads(lines[-1])
        # Either it fails, or if it "succeeds" the count must be sane (not 2^63)
        if summary.get("failed") is False:
            assert summary.get("inserted", 0) <= 10**9, (
                f"hub's 2^63 count was accepted verbatim: {summary}"
            )
| 688 | |
| 689 | |
| 690 | # ============================================================================= |
| 691 | # 7. DATA INTEGRITY — counts in output reflect reality |
| 692 | # ============================================================================= |
| 693 | |
| 694 | class TestRunPushNullCountsDataIntegrity: |
| 695 | """ |
| 696 | When some batches succeed and some return bad counts, the summary output |
| 697 | must accurately reflect only the records from successful batches. |
| 698 | """ |
| 699 | |
| 700 | def test_total_reflects_records_sent_not_hub_count(self, tmp_path: pathlib.Path) -> None: |
| 701 | """'total' in output is len(local records), independent of hub response.""" |
| 702 | code, output = _run_push_with_hub_response( |
| 703 | tmp_path, {"inserted": None, "skipped": None} |
| 704 | ) |
| 705 | lines = [l for l in output.strip().splitlines() if l.strip()] |
| 706 | summary = json.loads(lines[-1]) |
| 707 | # total must be 1 (we sent 1 record) regardless of hub response |
| 708 | assert summary.get("total") == 1, ( |
| 709 | f"total should be 1 (records sent), got {summary.get('total')}" |
| 710 | ) |
| 711 | |
| 712 | def test_inserted_is_zero_when_hub_returns_null(self, tmp_path: pathlib.Path) -> None: |
| 713 | """When hub returns null for inserted, the count must be 0, not garbage.""" |
| 714 | code, output = _run_push_with_hub_response( |
| 715 | tmp_path, {"inserted": None, "skipped": 1} |
| 716 | ) |
| 717 | lines = [l for l in output.strip().splitlines() if l.strip()] |
| 718 | summary = json.loads(lines[-1]) |
| 719 | # After fix: inserted should be 0 (not crashing, not garbage) |
| 720 | assert isinstance(summary.get("inserted"), int), ( |
| 721 | f"inserted must be int in summary, got {summary.get('inserted')!r}" |
| 722 | ) |
| 723 | |
| 724 | def test_skipped_is_zero_when_hub_returns_null(self, tmp_path: pathlib.Path) -> None: |
| 725 | code, output = _run_push_with_hub_response( |
| 726 | tmp_path, {"inserted": 1, "skipped": None} |
| 727 | ) |
| 728 | lines = [l for l in output.strip().splitlines() if l.strip()] |
| 729 | summary = json.loads(lines[-1]) |
| 730 | assert isinstance(summary.get("skipped"), int), ( |
| 731 | f"skipped must be int in summary, got {summary.get('skipped')!r}" |
| 732 | ) |
| 733 | |
| 734 | def test_partial_null_counts_are_accumulated_correctly(self, tmp_path: pathlib.Path) -> None: |
| 735 | """ |
| 736 | 3 batches: inserted=[5, null, 3]. |
| 737 | After fix: total inserted = 5 + 0 + 3 = 8. |
| 738 | """ |
| 739 | import io |
| 740 | from muse.core.coord_bus import MAX_PUSH_BATCH |
| 741 | |
| 742 | root = _make_repo(tmp_path) |
| 743 | records = [ |
| 744 | {"kind": "reservation", "record_id": f"res-{i:06d}", |
| 745 | "run_id": "r", "payload": {}, "expires_at": _FUTURE_TS} |
| 746 | for i in range(3) |
| 747 | ] |
| 748 | |
| 749 | responses = iter([ |
| 750 | {"inserted": 5, "skipped": 0}, |
| 751 | {"inserted": None, "skipped": None}, |
| 752 | {"inserted": 3, "skipped": 0}, |
| 753 | ]) |
| 754 | |
| 755 | def fake_post(url: str, body: MsgpackDict, token: str) -> MsgpackDict: |
| 756 | return next(responses) |
| 757 | |
| 758 | captured = io.StringIO() |
| 759 | with patch("muse.cli.commands.coord_sync._gather_local_records", |
| 760 | return_value=records * MAX_PUSH_BATCH), \ |
| 761 | patch("muse.core.coord_bus._post_json", side_effect=fake_post), \ |
| 762 | patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ |
| 763 | patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", |
| 764 | return_value=("https://localhost:1337", "tok")), \ |
| 765 | patch("sys.stdout", captured): |
| 766 | args = argparse.Namespace( |
| 767 | owner="torvalds", slug="linux", |
| 768 | json_out=True, hub_url=None, |
| 769 | kinds=["reservation"], |
| 770 | ) |
| 771 | try: |
| 772 | from muse.cli.commands.coord_sync import run_push |
| 773 | run_push(args) |
| 774 | except SystemExit: |
| 775 | pass |
| 776 | except Exception as exc: |
| 777 | pytest.fail(f"Crashed: {type(exc).__name__}: {exc}") |
| 778 | |
| 779 | lines = [l for l in captured.getvalue().strip().splitlines() if l.strip()] |
| 780 | summary = json.loads(lines[-1]) |
| 781 | assert summary.get("inserted") == 8, ( |
| 782 | f"expected inserted=8 (5+0+3), got {summary.get('inserted')}" |
| 783 | ) |
| 784 | assert summary.get("failed") is True, "middle batch failed, so failed must be True" |
| 785 | |
| 786 | def test_output_json_schema_complete_on_bad_response(self, tmp_path: pathlib.Path) -> None: |
| 787 | """All required keys must be present in JSON output even on error.""" |
| 788 | code, output = _run_push_with_hub_response( |
| 789 | tmp_path, {"inserted": None, "skipped": None} |
| 790 | ) |
| 791 | lines = [l for l in output.strip().splitlines() if l.strip()] |
| 792 | summary = json.loads(lines[-1]) |
| 793 | for key in ("schema", "inserted", "skipped", "total", "failed", "duration_ms"): |
| 794 | assert key in summary, f"key {key!r} missing from summary: {summary}" |
File History
1 commit
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
30 days ago