""" Tests for the bug: hub response with null/non-integer inserted/skipped counts causes an unhandled TypeError or ValueError that escapes run_push as a raw traceback instead of a clean CoordBusError. Root cause (coord_bus.py::push_to_hub lines 233-234): return { "inserted": int(result.get("inserted", 0)), # BUG "skipped": int(result.get("skipped", 0)), # BUG } When the hub returns {"inserted": null}: - result.get("inserted", 0) → None (key EXISTS so default is NOT used) - int(None) → TypeError When the hub returns {"inserted": "three"}: - int("three") → ValueError Neither TypeError nor ValueError is CoordBusError. run_push only catches CoordBusError — the exception escapes as a raw traceback. Coverage: Unit — push_to_hub directly, all bad-value variants Integration — run_push with bad hub response (two layers deep) End-to-end — CLI CliRunner invocation, asserts no traceback in output Stress — 14-batch push, every batch returns a bad response Performance — bad response handling overhead is negligible Security — hub cannot cause arbitrary code exec via count field Data integrity — partial-batch failures produce correct counts in output """ from __future__ import annotations import argparse import json import pathlib import time from io import BytesIO from unittest.mock import MagicMock, patch import pytest from muse.core.types import MsgpackDict, MsgpackValue from muse.core.paths import muse_dir # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- _ALL_KINDS = ("reservation", "intent", "release", "heartbeat", "dependency", "task", "claim") _FUTURE_TS = "2099-12-31T23:59:59+00:00" def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: muse_dir(tmp_path).mkdir(parents=True, exist_ok=True) return tmp_path def _one_record() -> MsgpackDict: return { "kind": "reservation", "record_id": "res-000001", "run_id": "run-0", "payload": {"reservation_id": "res-000001", "expires_at": _FUTURE_TS}, "expires_at": _FUTURE_TS, } def _make_http_response(body: MsgpackDict) -> BytesIO: return BytesIO(json.dumps(body).encode()) def _run_push_with_hub_response(tmp_path: pathlib.Path, hub_response: MsgpackDict) -> tuple[int | None, str]: """ Run run_push with _post_json mocked to return hub_response. Returns (exit_code, stdout_captured). exit_code is None if no SystemExit was raised (i.e. the function returned normally). """ import io import sys root = _make_repo(tmp_path) captured = io.StringIO() exit_code = None with patch("muse.cli.commands.coord_sync._gather_local_records", return_value=[_one_record()]), \ patch("muse.core.coord_bus._post_json", return_value=hub_response), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("https://localhost:1337", "tok")), \ patch("sys.stdout", captured): args = argparse.Namespace( owner="torvalds", slug="linux", json_out=True, hub_url=None, kinds=["reservation"], ) try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit as exc: exit_code = exc.code except Exception as exc: # If a non-SystemExit exception escapes, surface it explicitly return ("CRASH", f"{type(exc).__name__}: {exc}") return (exit_code, captured.getvalue()) # ============================================================================= # 1. UNIT — push_to_hub directly # ============================================================================= class TestPushToHubNullCountsUnit: """ Unit tests on coord_bus.push_to_hub. _post_json is mocked to return bad count values. Assert that push_to_hub raises CoordBusError, NOT TypeError or ValueError. """ @pytest.mark.parametrize("bad_inserted", [ None, # JSON null "three", # non-numeric string [], # list {}, # dict "1; drop table", # injection attempt ]) def test_bad_inserted_raises_coord_bus_error_not_typeerror(self, bad_inserted: MsgpackValue) -> None: from muse.core.coord_bus import push_to_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"inserted": bad_inserted, "skipped": 0}): with pytest.raises(CoordBusError): push_to_hub("https://localhost:1337", "torvalds", "linux", [_one_record()], signing=None) @pytest.mark.parametrize("bad_skipped", [ None, "three", [], {}, "1; drop table", ]) def test_bad_skipped_raises_coord_bus_error_not_typeerror(self, bad_skipped: MsgpackValue) -> None: from muse.core.coord_bus import push_to_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"inserted": 1, "skipped": bad_skipped}): with pytest.raises(CoordBusError): push_to_hub("https://localhost:1337", "torvalds", "linux", [_one_record()], signing=None) def test_both_null_raises_coord_bus_error(self) -> None: from muse.core.coord_bus import push_to_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"inserted": None, "skipped": None}): with pytest.raises(CoordBusError): push_to_hub("https://localhost:1337", "torvalds", "linux", [_one_record()], signing=None) def test_null_never_raises_raw_typeerror(self) -> None: """The specific confirmed bug: int(None) must not escape as TypeError.""" from muse.core.coord_bus import push_to_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"inserted": None, "skipped": 0}): try: push_to_hub("https://localhost:1337", "torvalds", "linux", [_one_record()], signing=None) except CoordBusError: pass # correct except TypeError as exc: pytest.fail(f"Raw TypeError escaped push_to_hub: {exc}") def test_invalid_string_never_raises_raw_valueerror(self) -> None: """int('three') must not escape as ValueError.""" from muse.core.coord_bus import push_to_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"inserted": "three", "skipped": 0}): try: push_to_hub("https://localhost:1337", "torvalds", "linux", [_one_record()], signing=None) except CoordBusError: pass # correct except ValueError as exc: pytest.fail(f"Raw ValueError escaped push_to_hub: {exc}") # These should SUCCEED — confirm good values still work def test_valid_integer_counts_pass_through(self) -> None: from muse.core.coord_bus import push_to_hub with patch("muse.core.coord_bus._post_json", return_value={"inserted": 1, "skipped": 0}): result = push_to_hub("https://localhost:1337", "torvalds", "linux", [_one_record()], signing=None) assert result == {"inserted": 1, "skipped": 0} def test_float_count_truncated_to_int(self) -> None: """Float counts are a hub bug but truncate cleanly when within bounds.""" from muse.core.coord_bus import push_to_hub # Send 3 records so int(2.9)=2 and int(0.1)=0 both pass the bounds check three_records = [_one_record(), _one_record(), _one_record()] with patch("muse.core.coord_bus._post_json", return_value={"inserted": 2.9, "skipped": 0.1}): result = push_to_hub("https://localhost:1337", "torvalds", "linux", three_records, signing=None) assert result["inserted"] == 2 assert result["skipped"] == 0 def test_zero_counts_valid(self) -> None: from muse.core.coord_bus import push_to_hub with patch("muse.core.coord_bus._post_json", return_value={"inserted": 0, "skipped": 0}): result = push_to_hub("https://localhost:1337", "torvalds", "linux", [_one_record()], signing=None) assert result == {"inserted": 0, "skipped": 0} def test_missing_both_keys_defaults_to_zero(self) -> None: """Hub omits both keys entirely — already handled by .get default.""" from muse.core.coord_bus import push_to_hub with patch("muse.core.coord_bus._post_json", return_value={}): result = push_to_hub("https://localhost:1337", "torvalds", "linux", [_one_record()], signing=None) assert result == {"inserted": 0, "skipped": 0} def test_negative_count_raises_coord_bus_error(self) -> None: """Hub returning negative counts is a protocol violation.""" from muse.core.coord_bus import push_to_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"inserted": -5, "skipped": 0}): with pytest.raises(CoordBusError): push_to_hub("https://localhost:1337", "torvalds", "linux", [_one_record()], signing=None) def test_count_exceeding_batch_size_raises_coord_bus_error(self) -> None: """Hub claims it inserted more records than were sent — impossible.""" from muse.core.coord_bus import push_to_hub, CoordBusError, MAX_PUSH_BATCH with patch("muse.core.coord_bus._post_json", return_value={"inserted": MAX_PUSH_BATCH + 1, "skipped": 0}): with pytest.raises(CoordBusError): push_to_hub("https://localhost:1337", "torvalds", "linux", [_one_record()], signing=None) # ============================================================================= # 2. INTEGRATION — run_push with bad hub response (two layers deep) # ============================================================================= class TestRunPushNullCountsIntegration: """ Integration tests: run_push with _post_json mocked at the wire level. Asserts clean exit (SystemExit(1) for hub errors) — never an unhandled exception. """ def test_null_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": 0} ) assert code != "CRASH", f"run_push crashed: {output}" def test_null_skipped_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_push_with_hub_response( tmp_path, {"inserted": 1, "skipped": None} ) assert code != "CRASH", f"run_push crashed: {output}" def test_both_null_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": None} ) assert code != "CRASH", f"run_push crashed: {output}" def test_string_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_push_with_hub_response( tmp_path, {"inserted": "three", "skipped": 0} ) assert code != "CRASH", f"run_push crashed: {output}" def test_list_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_push_with_hub_response( tmp_path, {"inserted": [1, 2, 3], "skipped": 0} ) assert code != "CRASH", f"run_push crashed: {output}" def test_dict_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_push_with_hub_response( tmp_path, {"inserted": {"count": 1}, "skipped": 0} ) assert code != "CRASH", f"run_push crashed: {output}" def test_bad_response_exits_with_code_1(self, tmp_path: pathlib.Path) -> None: """Bad hub response should be an error exit (code 1), not success (None/0).""" code, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": None} ) # run_push raises SystemExit(1) when failed=True; clean success returns None assert code == 1, f"expected exit code 1 for bad hub response, got {code!r}" def test_bad_response_json_output_has_failed_true(self, tmp_path: pathlib.Path) -> None: """JSON output must have failed=true, not a raw exception message.""" code, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": None} ) lines = [l for l in output.strip().splitlines() if l.strip()] assert lines, "no output produced" summary = json.loads(lines[-1]) assert summary.get("failed") is True, f"expected failed=true in {summary}" def test_bad_response_output_contains_no_traceback(self, tmp_path: pathlib.Path) -> None: """Traceback must never appear in stdout.""" _, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": None} ) assert "Traceback" not in output, f"traceback leaked to stdout:\n{output}" assert "TypeError" not in output, f"TypeError leaked to stdout:\n{output}" assert "ValueError" not in output, f"ValueError leaked to stdout:\n{output}" def test_good_response_still_works_after_fix(self, tmp_path: pathlib.Path) -> None: """Valid hub response must still succeed after the fix is applied.""" code, output = _run_push_with_hub_response( tmp_path, {"inserted": 1, "skipped": 0} ) # run_push does not raise SystemExit on success — exit_code stays None assert code in (0, None), f"expected clean exit for valid response, got {code!r}" summary = json.loads(output.strip().splitlines()[-1]) assert summary["inserted"] == 1 assert summary["skipped"] == 0 assert summary["failed"] is False # ============================================================================= # 3. END-TO-END — CLI output is valid JSON with no tracebacks # ============================================================================= class TestRunPushNullCountsEndToEnd: """ End-to-end: simulate what an operator would see at the terminal. Output must be valid JSON, must contain no Python traceback text, and the process must exit cleanly (no unhandled exception). """ @pytest.mark.parametrize("bad_response", [ {"inserted": None, "skipped": None}, {"inserted": None, "skipped": 0}, {"inserted": 1, "skipped": None}, {"inserted": "bad", "skipped": 0}, {"inserted": [], "skipped": 0}, {}, # completely empty {"other": "keys"}, # no inserted/skipped at all (already handled) ]) def test_cli_output_is_valid_json_for_bad_hub_response(self, tmp_path: pathlib.Path, bad_response: MsgpackDict) -> None: code, output = _run_push_with_hub_response(tmp_path, bad_response) assert code != "CRASH", f"run_push crashed on {bad_response}: {output}" lines = [l for l in output.strip().splitlines() if l.strip()] assert lines, f"no output for hub response {bad_response}" # Every output line must be valid JSON for line in lines: try: json.loads(line) except json.JSONDecodeError: pytest.fail(f"non-JSON line in output for {bad_response}: {line!r}") def test_cli_output_never_contains_exception_class_names(self, tmp_path: pathlib.Path) -> None: for bad_val in [None, "bad", [], {}]: _, output = _run_push_with_hub_response( tmp_path, {"inserted": bad_val, "skipped": 0} ) for forbidden in ("TypeError", "ValueError", "AttributeError", "Traceback", "most recent call"): assert forbidden not in output, ( f"{forbidden!r} leaked into CLI output for inserted={bad_val!r}:\n{output}" ) def test_text_mode_also_clean_on_bad_response(self, tmp_path: pathlib.Path) -> None: """Non-JSON (text) mode must also not crash.""" import io import sys root = _make_repo(tmp_path) captured = io.StringIO() with patch("muse.cli.commands.coord_sync._gather_local_records", return_value=[_one_record()]), \ patch("muse.core.coord_bus._post_json", return_value={"inserted": None, "skipped": None}), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("https://localhost:1337", "tok")), \ patch("sys.stdout", captured): args = argparse.Namespace( owner="torvalds", slug="linux", json_out=False, hub_url=None, kinds=["reservation"], ) try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit: pass except Exception as exc: pytest.fail(f"text mode crashed: {type(exc).__name__}: {exc}") output = captured.getvalue() assert "Traceback" not in output assert "TypeError" not in output # ============================================================================= # 4. STRESS — 14 batches, every batch returns a bad response # ============================================================================= class TestRunPushNullCountsStress: """ Stress tests: 14 batches (7000 records) all return null counts. The system must not crash on any batch and must emit correct summary output. """ def _run_push_n_batches( self, tmp_path: pathlib.Path, n_records: int, hub_responses: list[MsgpackDict], ) -> tuple[int | None, MsgpackDict]: import io root = _make_repo(tmp_path) records = [ { "kind": "reservation", "record_id": f"res-{i:06d}", "run_id": f"run-{i}", "payload": {}, "expires_at": _FUTURE_TS, } for i in range(n_records) ] response_iter = iter(hub_responses) def fake_post_json(url: str, body: MsgpackDict, token: str) -> MsgpackDict: try: return next(response_iter) except StopIteration: return {"inserted": 0, "skipped": 0} captured = io.StringIO() exit_code = None with patch("muse.cli.commands.coord_sync._gather_local_records", return_value=records), \ patch("muse.core.coord_bus._post_json", side_effect=fake_post_json), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("https://localhost:1337", "tok")), \ patch("sys.stdout", captured): args = argparse.Namespace( owner="torvalds", slug="linux", json_out=True, hub_url=None, kinds=["reservation"], ) try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit as exc: exit_code = exc.code except Exception as exc: return ("CRASH", {}) lines = [l for l in captured.getvalue().strip().splitlines() if l.strip()] summary = json.loads(lines[-1]) if lines else {} return (exit_code, summary) def test_all_14_batches_return_null_no_crash(self, tmp_path: pathlib.Path) -> None: from muse.core.coord_bus import MAX_PUSH_BATCH n = MAX_PUSH_BATCH * 14 # 7000 records responses = [{"inserted": None, "skipped": None}] * 14 code, summary = self._run_push_n_batches(tmp_path, n, responses) assert code != "CRASH", "run_push crashed on 14 null-count batches" def test_all_14_batches_return_null_exit_code_1(self, tmp_path: pathlib.Path) -> None: from muse.core.coord_bus import MAX_PUSH_BATCH n = MAX_PUSH_BATCH * 14 responses = [{"inserted": None, "skipped": None}] * 14 code, summary = self._run_push_n_batches(tmp_path, n, responses) assert code == 1, f"expected exit 1 for all-null batches, got {code!r}" def test_all_14_batches_return_null_failed_true_in_output(self, tmp_path: pathlib.Path) -> None: from muse.core.coord_bus import MAX_PUSH_BATCH n = MAX_PUSH_BATCH * 14 responses = [{"inserted": None, "skipped": None}] * 14 code, summary = self._run_push_n_batches(tmp_path, n, responses) assert summary.get("failed") is True def test_alternating_good_and_null_batches(self, tmp_path: pathlib.Path) -> None: """Odd batches succeed, even batches return null. No crash. failed=true.""" from muse.core.coord_bus import MAX_PUSH_BATCH n = MAX_PUSH_BATCH * 6 # 3000 records responses = [] for i in range(6): if i % 2 == 0: responses.append({"inserted": MAX_PUSH_BATCH, "skipped": 0}) else: responses.append({"inserted": None, "skipped": None}) code, summary = self._run_push_n_batches(tmp_path, n, responses) assert code != "CRASH" assert summary.get("failed") is True # 3 good batches × MAX_PUSH_BATCH inserted assert summary.get("inserted") == MAX_PUSH_BATCH * 3 def test_first_batch_null_rest_succeed(self, tmp_path: pathlib.Path) -> None: from muse.core.coord_bus import MAX_PUSH_BATCH n = MAX_PUSH_BATCH * 3 responses = [ {"inserted": None, "skipped": None}, {"inserted": MAX_PUSH_BATCH, "skipped": 0}, {"inserted": MAX_PUSH_BATCH, "skipped": 0}, ] code, summary = self._run_push_n_batches(tmp_path, n, responses) assert code != "CRASH" assert summary.get("failed") is True # 2 good batches should still be counted assert summary.get("inserted") == MAX_PUSH_BATCH * 2 def test_last_batch_null_rest_succeed(self, tmp_path: pathlib.Path) -> None: from muse.core.coord_bus import MAX_PUSH_BATCH n = MAX_PUSH_BATCH * 3 responses = [ {"inserted": MAX_PUSH_BATCH, "skipped": 0}, {"inserted": MAX_PUSH_BATCH, "skipped": 0}, {"inserted": None, "skipped": None}, ] code, summary = self._run_push_n_batches(tmp_path, n, responses) assert code != "CRASH" assert summary.get("failed") is True assert summary.get("inserted") == MAX_PUSH_BATCH * 2 # ============================================================================= # 5. PERFORMANCE — bad response handling overhead is negligible # ============================================================================= class TestRunPushNullCountsPerformance: """ Bad response handling must not introduce measurable overhead. Error paths in push_to_hub should be as fast as success paths. """ def _measure_push(self, tmp_path: pathlib.Path, response: MsgpackDict) -> float: import io root = _make_repo(tmp_path) records = [_one_record()] with patch("muse.cli.commands.coord_sync._gather_local_records", return_value=records), \ patch("muse.core.coord_bus._post_json", return_value=response), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("https://localhost:1337", "tok")), \ patch("sys.stdout", io.StringIO()): args = argparse.Namespace( owner="torvalds", slug="linux", json_out=True, hub_url=None, kinds=["reservation"], ) t0 = time.monotonic() try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit: pass return time.monotonic() - t0 def test_null_response_not_slower_than_good_response(self, tmp_path: pathlib.Path) -> None: # Warm up self._measure_push(tmp_path, {"inserted": 1, "skipped": 0}) self._measure_push(tmp_path / "x", {"inserted": None, "skipped": None}) good = self._measure_push(tmp_path / "good", {"inserted": 1, "skipped": 0}) bad = self._measure_push(tmp_path / "bad", {"inserted": None, "skipped": None}) assert bad < max(good * 10, 0.100), ( f"null response path ({bad:.4f}s) is unexpectedly slower than " f"good response ({good:.4f}s)" ) def test_100_consecutive_null_responses_under_1s(self, tmp_path: pathlib.Path) -> None: import io root = _make_repo(tmp_path) records = [_one_record()] t0 = time.monotonic() for i in range(100): with patch("muse.cli.commands.coord_sync._gather_local_records", return_value=records), \ patch("muse.core.coord_bus._post_json", return_value={"inserted": None, "skipped": None}), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("https://localhost:1337", "tok")), \ patch("sys.stdout", io.StringIO()): args = argparse.Namespace( owner="torvalds", slug="linux", json_out=True, hub_url=None, kinds=["reservation"], ) try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit: pass elapsed = time.monotonic() - t0 assert elapsed < 1.0, f"100 null-response pushes took {elapsed:.3f}s (> 1s)" # ============================================================================= # 6. SECURITY — hub cannot cause code execution via count fields # ============================================================================= class TestRunPushNullCountsSecurity: """ Security: a malicious hub cannot exploit the count parsing path. All attack payloads must result in CoordBusError, never in exec/import. """ @pytest.mark.parametrize("attack_payload", [ "__import__('os').system('ls')", "1; __import__('os').system('ls')", "exec('import os')", "${7*7}", "{{7*7}}", "' OR 1=1 --", "\x00\x01\x02", "9" * 10000, # absurdly long numeric string "1e308", # float overflow "inf", "nan", "-inf", ]) def test_malicious_inserted_raises_coord_bus_error_not_exec(self, attack_payload: str | int | float | None) -> None: from muse.core.coord_bus import push_to_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"inserted": attack_payload, "skipped": 0}): try: push_to_hub("https://localhost:1337", "torvalds", "linux", [_one_record()], signing=None) except CoordBusError: pass # correct — attack contained except Exception as exc: pytest.fail( f"Attack payload {attack_payload!r} escaped as " f"{type(exc).__name__}: {exc}" ) def test_attack_payload_causes_coord_bus_error_not_execution(self, tmp_path: pathlib.Path) -> None: """ Malicious count value must be rejected as CoordBusError — not executed. The error message may contain the repr of the bad value (that is fine for a CLI tool), but the Python expression must never be evaluated. """ attack = "__import__('os').system('id')" code, output = _run_push_with_hub_response( tmp_path, {"inserted": attack, "skipped": 0} ) # Must be an error exit, not success assert code == 1, f"expected exit 1 for attack payload, got {code!r}" # Must not crash with unhandled exception assert code != "CRASH", f"attack payload caused crash: {output}" # Output must be valid JSON (no raw traceback) lines = [l for l in output.strip().splitlines() if l.strip()] for line in lines: try: json.loads(line) except json.JSONDecodeError: pytest.fail(f"non-JSON output for attack payload: {line!r}") def test_extremely_large_count_rejected(self, tmp_path: pathlib.Path) -> None: """Hub claiming it inserted 2^63 records is impossible; treat as error.""" huge = 2**63 code, output = _run_push_with_hub_response( tmp_path, {"inserted": huge, "skipped": 0} ) # Should not silently succeed with a nonsensical count assert code != "CRASH" lines = [l for l in output.strip().splitlines() if l.strip()] summary = json.loads(lines[-1]) if lines else {} # Either it fails, or if it "succeeds" the count must be sane (not 2^63) if summary.get("failed") is False: assert summary.get("inserted", 0) <= 10**9, ( f"hub's 2^63 count was accepted verbatim: {summary}" ) # ============================================================================= # 7. DATA INTEGRITY — counts in output reflect reality # ============================================================================= class TestRunPushNullCountsDataIntegrity: """ When some batches succeed and some return bad counts, the summary output must accurately reflect only the records from successful batches. """ def test_total_reflects_records_sent_not_hub_count(self, tmp_path: pathlib.Path) -> None: """'total' in output is len(local records), independent of hub response.""" code, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": None} ) lines = [l for l in output.strip().splitlines() if l.strip()] summary = json.loads(lines[-1]) # total must be 1 (we sent 1 record) regardless of hub response assert summary.get("total") == 1, ( f"total should be 1 (records sent), got {summary.get('total')}" ) def test_inserted_is_zero_when_hub_returns_null(self, tmp_path: pathlib.Path) -> None: """When hub returns null for inserted, the count must be 0, not garbage.""" code, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": 1} ) lines = [l for l in output.strip().splitlines() if l.strip()] summary = json.loads(lines[-1]) # After fix: inserted should be 0 (not crashing, not garbage) assert isinstance(summary.get("inserted"), int), ( f"inserted must be int in summary, got {summary.get('inserted')!r}" ) def test_skipped_is_zero_when_hub_returns_null(self, tmp_path: pathlib.Path) -> None: code, output = _run_push_with_hub_response( tmp_path, {"inserted": 1, "skipped": None} ) lines = [l for l in output.strip().splitlines() if l.strip()] summary = json.loads(lines[-1]) assert isinstance(summary.get("skipped"), int), ( f"skipped must be int in summary, got {summary.get('skipped')!r}" ) def test_partial_null_counts_are_accumulated_correctly(self, tmp_path: pathlib.Path) -> None: """ 3 batches: inserted=[5, null, 3]. After fix: total inserted = 5 + 0 + 3 = 8. """ import io from muse.core.coord_bus import MAX_PUSH_BATCH root = _make_repo(tmp_path) records = [ {"kind": "reservation", "record_id": f"res-{i:06d}", "run_id": "r", "payload": {}, "expires_at": _FUTURE_TS} for i in range(3) ] responses = iter([ {"inserted": 5, "skipped": 0}, {"inserted": None, "skipped": None}, {"inserted": 3, "skipped": 0}, ]) def fake_post(url: str, body: MsgpackDict, token: str) -> MsgpackDict: return next(responses) captured = io.StringIO() with patch("muse.cli.commands.coord_sync._gather_local_records", return_value=records * MAX_PUSH_BATCH), \ patch("muse.core.coord_bus._post_json", side_effect=fake_post), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("https://localhost:1337", "tok")), \ patch("sys.stdout", captured): args = argparse.Namespace( owner="torvalds", slug="linux", json_out=True, hub_url=None, kinds=["reservation"], ) try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit: pass except Exception as exc: pytest.fail(f"Crashed: {type(exc).__name__}: {exc}") lines = [l for l in captured.getvalue().strip().splitlines() if l.strip()] summary = json.loads(lines[-1]) assert summary.get("inserted") == 8, ( f"expected inserted=8 (5+0+3), got {summary.get('inserted')}" ) assert summary.get("failed") is True, "middle batch failed, so failed must be True" def test_output_json_schema_complete_on_bad_response(self, tmp_path: pathlib.Path) -> None: """All required keys must be present in JSON output even on error.""" code, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": None} ) lines = [l for l in output.strip().splitlines() if l.strip()] summary = json.loads(lines[-1]) for key in ("schema", "inserted", "skipped", "total", "failed", "duration_ms"): assert key in summary, f"key {key!r} missing from summary: {summary}"