""" Tests for the bug: hub response with null/non-integer inserted/skipped counts causes an unhandled TypeError or ValueError that escapes run_push as a raw traceback instead of a clean CoordBusError. Root cause (coord_bus.py::push_to_hub lines 233-234): return { "inserted": int(result.get("inserted", 0)), # BUG "skipped": int(result.get("skipped", 0)), # BUG } When the hub returns {"inserted": null}: - result.get("inserted", 0) → None (key EXISTS so default is NOT used) - int(None) → TypeError When the hub returns {"inserted": "three"}: - int("three") → ValueError Neither TypeError nor ValueError is CoordBusError. run_push only catches CoordBusError — the exception escapes as a raw traceback. Coverage: Unit — push_to_hub directly, all bad-value variants Integration — run_push with bad hub response (two layers deep) End-to-end — CLI CliRunner invocation, asserts no traceback in output Stress — 14-batch push, every batch returns a bad response Performance — bad response handling overhead is negligible Security — hub cannot cause arbitrary code exec via count field Data integrity — partial-batch failures produce correct counts in output """ from __future__ import annotations import argparse import json import pathlib import time from io import BytesIO from unittest.mock import MagicMock, patch import pytest from muse.core._types import MsgpackDict, MsgpackValue # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- _ALL_KINDS = ("reservation", "intent", "release", "heartbeat", "dependency", "task", "claim") _FUTURE_TS = "2099-12-31T23:59:59+00:00" def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: (tmp_path / ".muse").mkdir(parents=True, exist_ok=True) return tmp_path def _one_record() -> MsgpackDict: return { "kind": "reservation", "record_uuid": "res-000001", "run_id": "run-0", "payload": {"reservation_id": "res-000001", "expires_at": _FUTURE_TS}, "expires_at": _FUTURE_TS, } def _make_http_response(body: MsgpackDict) -> BytesIO: return BytesIO(json.dumps(body).encode()) def _run_push_with_hub_response(tmp_path: pathlib.Path, hub_response: MsgpackDict) -> tuple[int | None, str]: """ Run run_push with _post_json mocked to return hub_response. Returns (exit_code, stdout_captured). exit_code is None if no SystemExit was raised (i.e. the function returned normally). """ import io import sys root = _make_repo(tmp_path) captured = io.StringIO() exit_code = None with patch("muse.cli.commands.coord_sync._gather_local_records", return_value=[_one_record()]), \ patch("muse.core.coord_bus._post_json", return_value=hub_response), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("http://localhost:10003", "tok")), \ patch("sys.stdout", captured): args = argparse.Namespace( owner="torvalds", slug="linux", fmt="json", hub_url=None, kinds=["reservation"], ) try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit as exc: exit_code = exc.code except Exception as exc: # If a non-SystemExit exception escapes, surface it explicitly return ("CRASH", f"{type(exc).__name__}: {exc}") return (exit_code, captured.getvalue()) # ============================================================================= # 1. UNIT — push_to_hub directly # ============================================================================= class TestPushToHubNullCountsUnit: """ Unit tests on coord_bus.push_to_hub. _post_json is mocked to return bad count values. Assert that push_to_hub raises CoordBusError, NOT TypeError or ValueError. """ @pytest.mark.parametrize("bad_inserted", [ None, # JSON null "three", # non-numeric string [], # list {}, # dict "1; drop table", # injection attempt ]) def test_bad_inserted_raises_coord_bus_error_not_typeerror(self, bad_inserted: MsgpackValue) -> None: from muse.core.coord_bus import push_to_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"inserted": bad_inserted, "skipped": 0}): with pytest.raises(CoordBusError): push_to_hub("http://localhost:10003", "torvalds", "linux", [_one_record()], signing=None) @pytest.mark.parametrize("bad_skipped", [ None, "three", [], {}, "1; drop table", ]) def test_bad_skipped_raises_coord_bus_error_not_typeerror(self, bad_skipped: MsgpackValue) -> None: from muse.core.coord_bus import push_to_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"inserted": 1, "skipped": bad_skipped}): with pytest.raises(CoordBusError): push_to_hub("http://localhost:10003", "torvalds", "linux", [_one_record()], signing=None) def test_both_null_raises_coord_bus_error(self) -> None: from muse.core.coord_bus import push_to_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"inserted": None, "skipped": None}): with pytest.raises(CoordBusError): push_to_hub("http://localhost:10003", "torvalds", "linux", [_one_record()], signing=None) def test_null_never_raises_raw_typeerror(self) -> None: """The specific confirmed bug: int(None) must not escape as TypeError.""" from muse.core.coord_bus import push_to_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"inserted": None, "skipped": 0}): try: push_to_hub("http://localhost:10003", "torvalds", "linux", [_one_record()], signing=None) except CoordBusError: pass # correct except TypeError as exc: pytest.fail(f"Raw TypeError escaped push_to_hub: {exc}") def test_invalid_string_never_raises_raw_valueerror(self) -> None: """int('three') must not escape as ValueError.""" from muse.core.coord_bus import push_to_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"inserted": "three", "skipped": 0}): try: push_to_hub("http://localhost:10003", "torvalds", "linux", [_one_record()], signing=None) except CoordBusError: pass # correct except ValueError as exc: pytest.fail(f"Raw ValueError escaped push_to_hub: {exc}") # These should SUCCEED — confirm good values still work def test_valid_integer_counts_pass_through(self) -> None: from muse.core.coord_bus import push_to_hub with patch("muse.core.coord_bus._post_json", return_value={"inserted": 1, "skipped": 0}): result = push_to_hub("http://localhost:10003", "torvalds", "linux", [_one_record()], signing=None) assert result == {"inserted": 1, "skipped": 0} def test_float_count_truncated_to_int(self) -> None: """Float counts are a hub bug but truncate cleanly when within bounds.""" from muse.core.coord_bus import push_to_hub # Send 3 records so int(2.9)=2 and int(0.1)=0 both pass the bounds check three_records = [_one_record(), _one_record(), _one_record()] with patch("muse.core.coord_bus._post_json", return_value={"inserted": 2.9, "skipped": 0.1}): result = push_to_hub("http://localhost:10003", "torvalds", "linux", three_records, signing=None) assert result["inserted"] == 2 assert result["skipped"] == 0 def test_zero_counts_valid(self) -> None: from muse.core.coord_bus import push_to_hub with patch("muse.core.coord_bus._post_json", return_value={"inserted": 0, "skipped": 0}): result = push_to_hub("http://localhost:10003", "torvalds", "linux", [_one_record()], signing=None) assert result == {"inserted": 0, "skipped": 0} def test_missing_both_keys_defaults_to_zero(self) -> None: """Hub omits both keys entirely — already handled by .get default.""" from muse.core.coord_bus import push_to_hub with patch("muse.core.coord_bus._post_json", return_value={}): result = push_to_hub("http://localhost:10003", "torvalds", "linux", [_one_record()], signing=None) assert result == {"inserted": 0, "skipped": 0} def test_negative_count_raises_coord_bus_error(self) -> None: """Hub returning negative counts is a protocol violation.""" from muse.core.coord_bus import push_to_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"inserted": -5, "skipped": 0}): with pytest.raises(CoordBusError): push_to_hub("http://localhost:10003", "torvalds", "linux", [_one_record()], signing=None) def test_count_exceeding_batch_size_raises_coord_bus_error(self) -> None: """Hub claims it inserted more records than were sent — impossible.""" from muse.core.coord_bus import push_to_hub, CoordBusError, MAX_PUSH_BATCH with patch("muse.core.coord_bus._post_json", return_value={"inserted": MAX_PUSH_BATCH + 1, "skipped": 0}): with pytest.raises(CoordBusError): push_to_hub("http://localhost:10003", "torvalds", "linux", [_one_record()], signing=None) # ============================================================================= # 2. INTEGRATION — run_push with bad hub response (two layers deep) # ============================================================================= class TestRunPushNullCountsIntegration: """ Integration tests: run_push with _post_json mocked at the wire level. Asserts clean exit (SystemExit(1) for hub errors) — never an unhandled exception. """ def test_null_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": 0} ) assert code != "CRASH", f"run_push crashed: {output}" def test_null_skipped_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_push_with_hub_response( tmp_path, {"inserted": 1, "skipped": None} ) assert code != "CRASH", f"run_push crashed: {output}" def test_both_null_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": None} ) assert code != "CRASH", f"run_push crashed: {output}" def test_string_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_push_with_hub_response( tmp_path, {"inserted": "three", "skipped": 0} ) assert code != "CRASH", f"run_push crashed: {output}" def test_list_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_push_with_hub_response( tmp_path, {"inserted": [1, 2, 3], "skipped": 0} ) assert code != "CRASH", f"run_push crashed: {output}" def test_dict_inserted_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_push_with_hub_response( tmp_path, {"inserted": {"count": 1}, "skipped": 0} ) assert code != "CRASH", f"run_push crashed: {output}" def test_bad_response_exits_with_code_1(self, tmp_path: pathlib.Path) -> None: """Bad hub response should be an error exit (code 1), not success (None/0).""" code, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": None} ) # run_push raises SystemExit(1) when failed=True; clean success returns None assert code == 1, f"expected exit code 1 for bad hub response, got {code!r}" def test_bad_response_json_output_has_failed_true(self, tmp_path: pathlib.Path) -> None: """JSON output must have failed=true, not a raw exception message.""" code, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": None} ) lines = [l for l in output.strip().splitlines() if l.strip()] assert lines, "no output produced" summary = json.loads(lines[-1]) assert summary.get("failed") is True, f"expected failed=true in {summary}" def test_bad_response_output_contains_no_traceback(self, tmp_path: pathlib.Path) -> None: """Traceback must never appear in stdout.""" _, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": None} ) assert "Traceback" not in output, f"traceback leaked to stdout:\n{output}" assert "TypeError" not in output, f"TypeError leaked to stdout:\n{output}" assert "ValueError" not in output, f"ValueError leaked to stdout:\n{output}" def test_good_response_still_works_after_fix(self, tmp_path: pathlib.Path) -> None: """Valid hub response must still succeed after the fix is applied.""" code, output = _run_push_with_hub_response( tmp_path, {"inserted": 1, "skipped": 0} ) # run_push does not raise SystemExit on success — exit_code stays None assert code in (0, None), f"expected clean exit for valid response, got {code!r}" summary = json.loads(output.strip().splitlines()[-1]) assert summary["inserted"] == 1 assert summary["skipped"] == 0 assert summary["failed"] is False # ============================================================================= # 3. END-TO-END — CLI output is valid JSON with no tracebacks # ============================================================================= class TestRunPushNullCountsEndToEnd: """ End-to-end: simulate what an operator would see at the terminal. Output must be valid JSON, must contain no Python traceback text, and the process must exit cleanly (no unhandled exception). """ @pytest.mark.parametrize("bad_response", [ {"inserted": None, "skipped": None}, {"inserted": None, "skipped": 0}, {"inserted": 1, "skipped": None}, {"inserted": "bad", "skipped": 0}, {"inserted": [], "skipped": 0}, {}, # completely empty {"other": "keys"}, # no inserted/skipped at all (already handled) ]) def test_cli_output_is_valid_json_for_bad_hub_response(self, tmp_path: pathlib.Path, bad_response: MsgpackDict) -> None: code, output = _run_push_with_hub_response(tmp_path, bad_response) assert code != "CRASH", f"run_push crashed on {bad_response}: {output}" lines = [l for l in output.strip().splitlines() if l.strip()] assert lines, f"no output for hub response {bad_response}" # Every output line must be valid JSON for line in lines: try: json.loads(line) except json.JSONDecodeError: pytest.fail(f"non-JSON line in output for {bad_response}: {line!r}") def test_cli_output_never_contains_exception_class_names(self, tmp_path: pathlib.Path) -> None: for bad_val in [None, "bad", [], {}]: _, output = _run_push_with_hub_response( tmp_path, {"inserted": bad_val, "skipped": 0} ) for forbidden in ("TypeError", "ValueError", "AttributeError", "Traceback", "most recent call"): assert forbidden not in output, ( f"{forbidden!r} leaked into CLI output for inserted={bad_val!r}:\n{output}" ) def test_text_mode_also_clean_on_bad_response(self, tmp_path: pathlib.Path) -> None: """Non-JSON (text) mode must also not crash.""" import io import sys root = _make_repo(tmp_path) captured = io.StringIO() with patch("muse.cli.commands.coord_sync._gather_local_records", return_value=[_one_record()]), \ patch("muse.core.coord_bus._post_json", return_value={"inserted": None, "skipped": None}), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("http://localhost:10003", "tok")), \ patch("sys.stdout", captured): args = argparse.Namespace( owner="torvalds", slug="linux", fmt="text", hub_url=None, kinds=["reservation"], ) try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit: pass except Exception as exc: pytest.fail(f"text mode crashed: {type(exc).__name__}: {exc}") output = captured.getvalue() assert "Traceback" not in output assert "TypeError" not in output # ============================================================================= # 4. STRESS — 14 batches, every batch returns a bad response # ============================================================================= class TestRunPushNullCountsStress: """ Stress tests: 14 batches (7000 records) all return null counts. The system must not crash on any batch and must emit correct summary output. """ def _run_push_n_batches( self, tmp_path: pathlib.Path, n_records: int, hub_responses: list[MsgpackDict], ) -> tuple[int | None, MsgpackDict]: import io root = _make_repo(tmp_path) records = [ { "kind": "reservation", "record_uuid": f"res-{i:06d}", "run_id": f"run-{i}", "payload": {}, "expires_at": _FUTURE_TS, } for i in range(n_records) ] response_iter = iter(hub_responses) def fake_post_json(url: str, body: MsgpackDict, token: str) -> MsgpackDict: try: return next(response_iter) except StopIteration: return {"inserted": 0, "skipped": 0} captured = io.StringIO() exit_code = None with patch("muse.cli.commands.coord_sync._gather_local_records", return_value=records), \ patch("muse.core.coord_bus._post_json", side_effect=fake_post_json), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("http://localhost:10003", "tok")), \ patch("sys.stdout", captured): args = argparse.Namespace( owner="torvalds", slug="linux", fmt="json", hub_url=None, kinds=["reservation"], ) try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit as exc: exit_code = exc.code except Exception as exc: return ("CRASH", {}) lines = [l for l in captured.getvalue().strip().splitlines() if l.strip()] summary = json.loads(lines[-1]) if lines else {} return (exit_code, summary) def test_all_14_batches_return_null_no_crash(self, tmp_path: pathlib.Path) -> None: from muse.core.coord_bus import MAX_PUSH_BATCH n = MAX_PUSH_BATCH * 14 # 7000 records responses = [{"inserted": None, "skipped": None}] * 14 code, summary = self._run_push_n_batches(tmp_path, n, responses) assert code != "CRASH", "run_push crashed on 14 null-count batches" def test_all_14_batches_return_null_exit_code_1(self, tmp_path: pathlib.Path) -> None: from muse.core.coord_bus import MAX_PUSH_BATCH n = MAX_PUSH_BATCH * 14 responses = [{"inserted": None, "skipped": None}] * 14 code, summary = self._run_push_n_batches(tmp_path, n, responses) assert code == 1, f"expected exit 1 for all-null batches, got {code!r}" def test_all_14_batches_return_null_failed_true_in_output(self, tmp_path: pathlib.Path) -> None: from muse.core.coord_bus import MAX_PUSH_BATCH n = MAX_PUSH_BATCH * 14 responses = [{"inserted": None, "skipped": None}] * 14 code, summary = self._run_push_n_batches(tmp_path, n, responses) assert summary.get("failed") is True def test_alternating_good_and_null_batches(self, tmp_path: pathlib.Path) -> None: """Odd batches succeed, even batches return null. No crash. failed=true.""" from muse.core.coord_bus import MAX_PUSH_BATCH n = MAX_PUSH_BATCH * 6 # 3000 records responses = [] for i in range(6): if i % 2 == 0: responses.append({"inserted": MAX_PUSH_BATCH, "skipped": 0}) else: responses.append({"inserted": None, "skipped": None}) code, summary = self._run_push_n_batches(tmp_path, n, responses) assert code != "CRASH" assert summary.get("failed") is True # 3 good batches × MAX_PUSH_BATCH inserted assert summary.get("inserted") == MAX_PUSH_BATCH * 3 def test_first_batch_null_rest_succeed(self, tmp_path: pathlib.Path) -> None: from muse.core.coord_bus import MAX_PUSH_BATCH n = MAX_PUSH_BATCH * 3 responses = [ {"inserted": None, "skipped": None}, {"inserted": MAX_PUSH_BATCH, "skipped": 0}, {"inserted": MAX_PUSH_BATCH, "skipped": 0}, ] code, summary = self._run_push_n_batches(tmp_path, n, responses) assert code != "CRASH" assert summary.get("failed") is True # 2 good batches should still be counted assert summary.get("inserted") == MAX_PUSH_BATCH * 2 def test_last_batch_null_rest_succeed(self, tmp_path: pathlib.Path) -> None: from muse.core.coord_bus import MAX_PUSH_BATCH n = MAX_PUSH_BATCH * 3 responses = [ {"inserted": MAX_PUSH_BATCH, "skipped": 0}, {"inserted": MAX_PUSH_BATCH, "skipped": 0}, {"inserted": None, "skipped": None}, ] code, summary = self._run_push_n_batches(tmp_path, n, responses) assert code != "CRASH" assert summary.get("failed") is True assert summary.get("inserted") == MAX_PUSH_BATCH * 2 # ============================================================================= # 5. PERFORMANCE — bad response handling overhead is negligible # ============================================================================= class TestRunPushNullCountsPerformance: """ Bad response handling must not introduce measurable overhead. Error paths in push_to_hub should be as fast as success paths. """ def _measure_push(self, tmp_path: pathlib.Path, response: MsgpackDict) -> float: import io root = _make_repo(tmp_path) records = [_one_record()] with patch("muse.cli.commands.coord_sync._gather_local_records", return_value=records), \ patch("muse.core.coord_bus._post_json", return_value=response), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("http://localhost:10003", "tok")), \ patch("sys.stdout", io.StringIO()): args = argparse.Namespace( owner="torvalds", slug="linux", fmt="json", hub_url=None, kinds=["reservation"], ) t0 = time.monotonic() try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit: pass return time.monotonic() - t0 def test_null_response_not_slower_than_good_response(self, tmp_path: pathlib.Path) -> None: # Warm up self._measure_push(tmp_path, {"inserted": 1, "skipped": 0}) self._measure_push(tmp_path / "x", {"inserted": None, "skipped": None}) good = self._measure_push(tmp_path / "good", {"inserted": 1, "skipped": 0}) bad = self._measure_push(tmp_path / "bad", {"inserted": None, "skipped": None}) assert bad < max(good * 10, 0.100), ( f"null response path ({bad:.4f}s) is unexpectedly slower than " f"good response ({good:.4f}s)" ) def test_100_consecutive_null_responses_under_1s(self, tmp_path: pathlib.Path) -> None: import io root = _make_repo(tmp_path) records = [_one_record()] t0 = time.monotonic() for i in range(100): with patch("muse.cli.commands.coord_sync._gather_local_records", return_value=records), \ patch("muse.core.coord_bus._post_json", return_value={"inserted": None, "skipped": None}), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("http://localhost:10003", "tok")), \ patch("sys.stdout", io.StringIO()): args = argparse.Namespace( owner="torvalds", slug="linux", fmt="json", hub_url=None, kinds=["reservation"], ) try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit: pass elapsed = time.monotonic() - t0 assert elapsed < 1.0, f"100 null-response pushes took {elapsed:.3f}s (> 1s)" # ============================================================================= # 6. SECURITY — hub cannot cause code execution via count fields # ============================================================================= class TestRunPushNullCountsSecurity: """ Security: a malicious hub cannot exploit the count parsing path. All attack payloads must result in CoordBusError, never in exec/import. """ @pytest.mark.parametrize("attack_payload", [ "__import__('os').system('ls')", "1; __import__('os').system('ls')", "exec('import os')", "${7*7}", "{{7*7}}", "' OR 1=1 --", "\x00\x01\x02", "9" * 10000, # absurdly long numeric string "1e308", # float overflow "inf", "nan", "-inf", ]) def test_malicious_inserted_raises_coord_bus_error_not_exec(self, attack_payload: str | int | float | None) -> None: from muse.core.coord_bus import push_to_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"inserted": attack_payload, "skipped": 0}): try: push_to_hub("http://localhost:10003", "torvalds", "linux", [_one_record()], signing=None) except CoordBusError: pass # correct — attack contained except Exception as exc: pytest.fail( f"Attack payload {attack_payload!r} escaped as " f"{type(exc).__name__}: {exc}" ) def test_attack_payload_causes_coord_bus_error_not_execution(self, tmp_path: pathlib.Path) -> None: """ Malicious count value must be rejected as CoordBusError — not executed. The error message may contain the repr of the bad value (that is fine for a CLI tool), but the Python expression must never be evaluated. """ attack = "__import__('os').system('id')" code, output = _run_push_with_hub_response( tmp_path, {"inserted": attack, "skipped": 0} ) # Must be an error exit, not success assert code == 1, f"expected exit 1 for attack payload, got {code!r}" # Must not crash with unhandled exception assert code != "CRASH", f"attack payload caused crash: {output}" # Output must be valid JSON (no raw traceback) lines = [l for l in output.strip().splitlines() if l.strip()] for line in lines: try: json.loads(line) except json.JSONDecodeError: pytest.fail(f"non-JSON output for attack payload: {line!r}") def test_extremely_large_count_rejected(self, tmp_path: pathlib.Path) -> None: """Hub claiming it inserted 2^63 records is impossible; treat as error.""" huge = 2**63 code, output = _run_push_with_hub_response( tmp_path, {"inserted": huge, "skipped": 0} ) # Should not silently succeed with a nonsensical count assert code != "CRASH" lines = [l for l in output.strip().splitlines() if l.strip()] summary = json.loads(lines[-1]) if lines else {} # Either it fails, or if it "succeeds" the count must be sane (not 2^63) if summary.get("failed") is False: assert summary.get("inserted", 0) <= 10**9, ( f"hub's 2^63 count was accepted verbatim: {summary}" ) # ============================================================================= # 7. DATA INTEGRITY — counts in output reflect reality # ============================================================================= class TestRunPushNullCountsDataIntegrity: """ When some batches succeed and some return bad counts, the summary output must accurately reflect only the records from successful batches. """ def test_total_reflects_records_sent_not_hub_count(self, tmp_path: pathlib.Path) -> None: """'total' in output is len(local records), independent of hub response.""" code, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": None} ) lines = [l for l in output.strip().splitlines() if l.strip()] summary = json.loads(lines[-1]) # total must be 1 (we sent 1 record) regardless of hub response assert summary.get("total") == 1, ( f"total should be 1 (records sent), got {summary.get('total')}" ) def test_inserted_is_zero_when_hub_returns_null(self, tmp_path: pathlib.Path) -> None: """When hub returns null for inserted, the count must be 0, not garbage.""" code, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": 1} ) lines = [l for l in output.strip().splitlines() if l.strip()] summary = json.loads(lines[-1]) # After fix: inserted should be 0 (not crashing, not garbage) assert isinstance(summary.get("inserted"), int), ( f"inserted must be int in summary, got {summary.get('inserted')!r}" ) def test_skipped_is_zero_when_hub_returns_null(self, tmp_path: pathlib.Path) -> None: code, output = _run_push_with_hub_response( tmp_path, {"inserted": 1, "skipped": None} ) lines = [l for l in output.strip().splitlines() if l.strip()] summary = json.loads(lines[-1]) assert isinstance(summary.get("skipped"), int), ( f"skipped must be int in summary, got {summary.get('skipped')!r}" ) def test_partial_null_counts_are_accumulated_correctly(self, tmp_path: pathlib.Path) -> None: """ 3 batches: inserted=[5, null, 3]. After fix: total inserted = 5 + 0 + 3 = 8. """ import io from muse.core.coord_bus import MAX_PUSH_BATCH root = _make_repo(tmp_path) records = [ {"kind": "reservation", "record_uuid": f"res-{i:06d}", "run_id": "r", "payload": {}, "expires_at": _FUTURE_TS} for i in range(3) ] responses = iter([ {"inserted": 5, "skipped": 0}, {"inserted": None, "skipped": None}, {"inserted": 3, "skipped": 0}, ]) def fake_post(url: str, body: MsgpackDict, token: str) -> MsgpackDict: return next(responses) captured = io.StringIO() with patch("muse.cli.commands.coord_sync._gather_local_records", return_value=records * MAX_PUSH_BATCH), \ patch("muse.core.coord_bus._post_json", side_effect=fake_post), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("http://localhost:10003", "tok")), \ patch("sys.stdout", captured): args = argparse.Namespace( owner="torvalds", slug="linux", fmt="json", hub_url=None, kinds=["reservation"], ) try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit: pass except Exception as exc: pytest.fail(f"Crashed: {type(exc).__name__}: {exc}") lines = [l for l in captured.getvalue().strip().splitlines() if l.strip()] summary = json.loads(lines[-1]) assert summary.get("inserted") == 8, ( f"expected inserted=8 (5+0+3), got {summary.get('inserted')}" ) assert summary.get("failed") is True, "middle batch failed, so failed must be True" def test_output_json_schema_complete_on_bad_response(self, tmp_path: pathlib.Path) -> None: """All required keys must be present in JSON output even on error.""" code, output = _run_push_with_hub_response( tmp_path, {"inserted": None, "skipped": None} ) lines = [l for l in output.strip().splitlines() if l.strip()] summary = json.loads(lines[-1]) for key in ("schema_version", "inserted", "skipped", "total", "failed", "elapsed_seconds"): assert key in summary, f"key {key!r} missing from summary: {summary}"