""" Tests for the bug: hub response with null/non-list records or null/non-integer cursor causes unhandled TypeError that escapes run_pull as a raw traceback. Root cause (coord_bus.py::pull_from_hub): return _post_json(url, body, token) The raw hub response is returned without validation. run_pull then does: pulled_records: list[dict] = result.get("records", []) cursor: int = result.get("cursor", 0) When hub returns {"records": null}: - result.get("records", []) → None (key EXISTS, default [] not used) - if pulled_records: → False (None is falsy, so _write_remote_records skipped) - len(pulled_records) → TypeError: object of type 'NoneType' has no len() When hub returns {"records": ["a", "b"]} (list of strings not dicts): - _write_remote_records iterates; rec.get("kind") → AttributeError on str When hub returns {"cursor": null}: - cursor = None; json.dumps({"cursor": None}) → "cursor": null in output - text mode: f"cursor: {cursor}" → "cursor: None" — contract violated When hub returns {"cursor": "malicious"}: - cursor = "malicious"; propagated verbatim to JSON output None of these are CoordBusError. run_pull only catches CoordBusError. TypeError/AttributeError escape as raw tracebacks. Fix location: pull_from_hub in coord_bus.py — validate response before returning. 
Coverage: Unit — pull_from_hub directly, all bad-value variants Integration — run_pull with bad hub response, two layers deep End-to-end — CLI output is valid JSON with no tracebacks Stress — 50 consecutive pulls mixing good and bad responses Performance — bad response path not slower than good path Security — hub cannot inject arbitrary values into cursor output Data integrity — cursor and count in output reflect reality """ from __future__ import annotations import argparse import io import itertools import json import pathlib import sys import time from unittest.mock import patch import pytest from muse.core.types import MsgpackDict, MsgpackValue, content_hash from muse.core.paths import coordination_dir, muse_dir # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _FUTURE_TS = "2099-12-31T23:59:59+00:00" _id_seq = itertools.count() def _new_id() -> str: return content_hash({"seq": next(_id_seq)}) def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: muse_dir(tmp_path).mkdir(parents=True, exist_ok=True) return tmp_path def _good_record(i: int = 0) -> MsgpackDict: return { "kind": "reservation", "record_id": _new_id(), "run_id": f"run-{i}", "payload": {"reservation_id": f"res-{i:06d}", "expires_at": _FUTURE_TS}, "expires_at": _FUTURE_TS, } def _run_pull_with_hub_response( tmp_path: pathlib.Path, hub_response: MsgpackDict, ) -> tuple[int | str | None, str]: """ Run run_pull with pull_from_hub mocked to return hub_response. Returns (exit_code, stdout). exit_code is None for clean success, int for SystemExit, "CRASH" for unhandled exception. """ root = _make_repo(tmp_path) captured = io.StringIO() # Mock _post_json (not pull_from_hub) so the validation in pull_from_hub runs. 
with patch("muse.core.coord_bus._post_json", return_value=hub_response), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("https://localhost:1337", "tok")), \ patch("sys.stdout", captured): args = argparse.Namespace( owner="torvalds", slug="linux", json_out=True, hub_url=None, since_id=0, limit=1000, kinds=[], ) try: from muse.cli.commands.coord_sync import run_pull run_pull(args) except SystemExit as exc: return (exc.code, captured.getvalue()) except Exception as exc: return ("CRASH", f"{type(exc).__name__}: {exc}") return (None, captured.getvalue()) # ============================================================================= # 1. UNIT — pull_from_hub directly # ============================================================================= class TestPullFromHubNullRecordsUnit: """ Unit tests on coord_bus.pull_from_hub. _post_json mocked to return bad responses. Assert CoordBusError is raised, not TypeError/AttributeError. 
""" # --- records field --- def test_null_records_raises_coord_bus_error(self) -> None: from muse.core.coord_bus import pull_from_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"records": None, "cursor": 0}): with pytest.raises(CoordBusError): pull_from_hub("https://localhost:1337", "torvalds", "linux") def test_null_records_never_raises_raw_typeerror(self) -> None: """The confirmed crash: len(None) must not escape as TypeError.""" from muse.core.coord_bus import pull_from_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"records": None, "cursor": 0}): try: pull_from_hub("https://localhost:1337", "torvalds", "linux") except CoordBusError: pass # correct except TypeError as exc: pytest.fail(f"Raw TypeError escaped pull_from_hub: {exc}") @pytest.mark.parametrize("bad_records", [ "malicious string", 42, 3.14, True, {"key": "val"}, ]) def test_non_list_records_raises_coord_bus_error(self, bad_records: MsgpackValue) -> None: from muse.core.coord_bus import pull_from_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"records": bad_records, "cursor": 0}): with pytest.raises(CoordBusError): pull_from_hub("https://localhost:1337", "torvalds", "linux") def test_list_of_strings_raises_coord_bus_error(self) -> None: """List of strings would cause AttributeError on .get() — must be CoordBusError.""" from muse.core.coord_bus import pull_from_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"records": ["a", "b", "c"], "cursor": 3}): with pytest.raises(CoordBusError): pull_from_hub("https://localhost:1337", "torvalds", "linux") def test_list_of_strings_never_raises_raw_attributeerror(self) -> None: from muse.core.coord_bus import pull_from_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"records": ["a", "b"], "cursor": 2}): try: pull_from_hub("https://localhost:1337", "torvalds", "linux") except CoordBusError: pass except AttributeError 
as exc: pytest.fail(f"Raw AttributeError escaped pull_from_hub: {exc}") def test_missing_records_key_defaults_to_empty_list(self) -> None: """Hub omits records entirely — must default to [] not crash.""" from muse.core.coord_bus import pull_from_hub with patch("muse.core.coord_bus._post_json", return_value={"cursor": 0}): result = pull_from_hub("https://localhost:1337", "torvalds", "linux") assert result["records"] == [] def test_empty_records_list_is_valid(self) -> None: from muse.core.coord_bus import pull_from_hub with patch("muse.core.coord_bus._post_json", return_value={"records": [], "cursor": 0}): result = pull_from_hub("https://localhost:1337", "torvalds", "linux") assert result["records"] == [] assert result["cursor"] == 0 def test_valid_records_list_passes_through(self) -> None: from muse.core.coord_bus import pull_from_hub records = [_good_record(i) for i in range(5)] with patch("muse.core.coord_bus._post_json", return_value={"records": records, "cursor": 5}): result = pull_from_hub("https://localhost:1337", "torvalds", "linux") assert len(result["records"]) == 5 assert result["cursor"] == 5 # --- cursor field --- def test_null_cursor_raises_coord_bus_error(self) -> None: from muse.core.coord_bus import pull_from_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"records": [], "cursor": None}): with pytest.raises(CoordBusError): pull_from_hub("https://localhost:1337", "torvalds", "linux") @pytest.mark.parametrize("bad_cursor", [ "malicious", [], {}, "123abc", ]) def test_non_integer_cursor_raises_coord_bus_error(self, bad_cursor: MsgpackValue) -> None: from muse.core.coord_bus import pull_from_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"records": [], "cursor": bad_cursor}): with pytest.raises(CoordBusError): pull_from_hub("https://localhost:1337", "torvalds", "linux") def test_negative_cursor_raises_coord_bus_error(self) -> None: from muse.core.coord_bus import pull_from_hub, CoordBusError 
with patch("muse.core.coord_bus._post_json", return_value={"records": [], "cursor": -1}): with pytest.raises(CoordBusError): pull_from_hub("https://localhost:1337", "torvalds", "linux") def test_missing_cursor_key_defaults_to_zero(self) -> None: from muse.core.coord_bus import pull_from_hub with patch("muse.core.coord_bus._post_json", return_value={"records": []}): result = pull_from_hub("https://localhost:1337", "torvalds", "linux") assert result["cursor"] == 0 def test_float_cursor_truncated(self) -> None: """Float cursor from hub is truncated to int — no crash.""" from muse.core.coord_bus import pull_from_hub with patch("muse.core.coord_bus._post_json", return_value={"records": [], "cursor": 7.9}): result = pull_from_hub("https://localhost:1337", "torvalds", "linux") assert result["cursor"] == 7 def test_zero_cursor_valid(self) -> None: from muse.core.coord_bus import pull_from_hub with patch("muse.core.coord_bus._post_json", return_value={"records": [], "cursor": 0}): result = pull_from_hub("https://localhost:1337", "torvalds", "linux") assert result["cursor"] == 0 # --- both null --- def test_both_null_raises_coord_bus_error(self) -> None: from muse.core.coord_bus import pull_from_hub, CoordBusError with patch("muse.core.coord_bus._post_json", return_value={"records": None, "cursor": None}): with pytest.raises(CoordBusError): pull_from_hub("https://localhost:1337", "torvalds", "linux") # ============================================================================= # 2. INTEGRATION — run_pull with bad hub response # ============================================================================= class TestRunPullNullRecordsIntegration: """ Integration: run_pull with pull_from_hub mocked at the boundary. Asserts clean exit, no unhandled exceptions, correct JSON structure. 
""" def test_null_records_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_pull_with_hub_response( tmp_path, {"records": None, "cursor": 0} ) assert code != "CRASH", f"run_pull crashed: {output}" def test_null_cursor_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_pull_with_hub_response( tmp_path, {"records": [], "cursor": None} ) assert code != "CRASH", f"run_pull crashed: {output}" def test_both_null_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_pull_with_hub_response( tmp_path, {"records": None, "cursor": None} ) assert code != "CRASH", f"run_pull crashed: {output}" def test_string_records_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_pull_with_hub_response( tmp_path, {"records": "malicious", "cursor": 0} ) assert code != "CRASH", f"run_pull crashed: {output}" def test_list_of_strings_exits_cleanly_not_crash(self, tmp_path: pathlib.Path) -> None: code, output = _run_pull_with_hub_response( tmp_path, {"records": ["a", "b", "c"], "cursor": 3} ) assert code != "CRASH", f"run_pull crashed: {output}" def test_null_records_exits_with_code_1(self, tmp_path: pathlib.Path) -> None: code, output = _run_pull_with_hub_response( tmp_path, {"records": None, "cursor": 0} ) assert code == 1, f"expected exit 1 for null records, got {code!r}" def test_bad_response_json_output_has_no_traceback(self, tmp_path: pathlib.Path) -> None: _, output = _run_pull_with_hub_response( tmp_path, {"records": None, "cursor": None} ) assert "Traceback" not in output assert "TypeError" not in output assert "AttributeError" not in output def test_good_response_exits_cleanly(self, tmp_path: pathlib.Path) -> None: records = [_good_record(i) for i in range(3)] code, output = _run_pull_with_hub_response( tmp_path, {"records": records, "cursor": 3} ) assert code in (0, None), f"expected clean exit, got {code!r}" def 
test_good_response_output_has_correct_count(self, tmp_path: pathlib.Path) -> None: records = [_good_record(i) for i in range(3)] code, output = _run_pull_with_hub_response( tmp_path, {"records": records, "cursor": 3} ) lines = [l for l in output.strip().splitlines() if l.strip()] summary = json.loads(lines[-1]) assert summary["count"] == 3 assert summary["cursor"] == 3 def test_empty_records_exits_cleanly(self, tmp_path: pathlib.Path) -> None: code, output = _run_pull_with_hub_response( tmp_path, {"records": [], "cursor": 0} ) assert code in (0, None) def test_text_mode_null_records_does_not_crash(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) with patch("muse.core.coord_bus._post_json", return_value={"records": None, "cursor": 0}), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("https://localhost:1337", "tok")), \ patch("sys.stdout", io.StringIO()): args = argparse.Namespace( owner="torvalds", slug="linux", json_out=False, hub_url=None, since_id=0, limit=1000, kinds=[], ) try: from muse.cli.commands.coord_sync import run_pull run_pull(args) except SystemExit: pass except Exception as exc: pytest.fail(f"text mode crashed: {type(exc).__name__}: {exc}") # ============================================================================= # 3. 
# END-TO-END — CLI output is always valid JSON with no exception text
# =============================================================================

class TestRunPullNullRecordsEndToEnd:
    """End-to-end checks on what run_pull prints in JSON mode."""

    @pytest.mark.parametrize("bad_response", [
        {"records": None, "cursor": 0},
        {"records": None, "cursor": None},
        {"records": [], "cursor": None},
        {"records": "malicious", "cursor": 0},
        {"records": 42, "cursor": 0},
        {"records": ["a", "b"], "cursor": 2},
        {"records": None},
        {"cursor": 0},
        {},
    ])
    def test_every_bad_response_produces_valid_json_output(
        self, tmp_path: pathlib.Path, bad_response: MsgpackDict
    ) -> None:
        """Every non-blank output line must parse as JSON, whatever the hub sent."""
        code, output = _run_pull_with_hub_response(tmp_path, bad_response)
        assert code != "CRASH", f"crashed on {bad_response}: {output}"
        lines = [line for line in output.strip().splitlines() if line.strip()]
        assert lines, f"no output for {bad_response}"
        for line in lines:
            try:
                json.loads(line)
            except json.JSONDecodeError:
                pytest.fail(f"non-JSON output for {bad_response!r}: {line!r}")

    def test_output_never_contains_exception_class_names(self, tmp_path: pathlib.Path) -> None:
        """No exception class name or traceback marker may leak into the output."""
        for bad in [None, "malicious", [], 42]:
            code, output = _run_pull_with_hub_response(
                tmp_path, {"records": bad, "cursor": 0}
            )
            # The helper reports an escaped exception as ("CRASH", "<Type>: <msg>").
            # Check the status explicitly: an exception type that is not in the
            # forbidden list below would otherwise go unnoticed.
            assert code != "CRASH", f"run_pull crashed for records={bad!r}: {output}"
            for forbidden in ("TypeError", "AttributeError", "Traceback",
                              "most recent call", "ValueError"):
                assert forbidden not in output, (
                    f"{forbidden!r} leaked for records={bad!r}:\n{output}"
                )

    def test_output_json_schema_complete_on_bad_response(self, tmp_path: pathlib.Path) -> None:
        """On a bad response the last output line must still be a JSON object.

        Only the top-level shape is checked; individual keys of the error
        payload are not asserted here.
        """
        code, output = _run_pull_with_hub_response(
            tmp_path, {"records": None, "cursor": 0}
        )
        assert code != "CRASH", f"run_pull crashed: {output}"
        lines = [line for line in output.strip().splitlines() if line.strip()]
        # At minimum there should be an error line
        assert lines, "no output at all"
        # The last line must be parseable JSON
        summary = json.loads(lines[-1])
        assert isinstance(summary, dict)


# =============================================================================
# 4.
# STRESS — many pulls mixing good and bad responses
# =============================================================================

class TestRunPullNullRecordsStress:
    """Repeated pulls, each one run against its own sub-directory of tmp_path."""

    def _pull(
        self,
        tmp_path_subdir: pathlib.Path,
        hub_response: MsgpackDict,
    ) -> tuple[int | str | None, str]:
        """Forward to the module-level helper and hand back its (code, output) pair."""
        outcome = _run_pull_with_hub_response(tmp_path_subdir, hub_response)
        return outcome

    def test_50_consecutive_null_records_no_crash(self, tmp_path: pathlib.Path) -> None:
        """Fifty null-records pulls in a row, cursor varying per iteration."""
        for i in range(50):
            workdir = tmp_path / str(i)
            payload = {"records": None, "cursor": i}
            code, output = self._pull(workdir, payload)
            assert code != "CRASH", f"crashed on iteration {i}: {output}"

    def test_alternating_good_and_null_no_crash(self, tmp_path: pathlib.Path) -> None:
        """Even iterations send one good record; odd iterations send nulls."""
        for i in range(20):
            response = (
                {"records": [_good_record(i)], "cursor": i + 1}
                if i % 2 == 0
                else {"records": None, "cursor": None}
            )
            workdir = tmp_path / str(i)
            code, output = self._pull(workdir, response)
            assert code != "CRASH", f"crashed on iteration {i}: {output}"

    def test_all_bad_response_types_in_sequence_no_crash(self, tmp_path: pathlib.Path) -> None:
        """Each malformed shape, one after another, must end in exit code 1."""
        bad_responses = (
            {"records": None, "cursor": 0},
            {"records": None, "cursor": None},
            {"records": "string", "cursor": 0},
            {"records": 42, "cursor": 0},
            {"records": True, "cursor": 0},
            {"records": ["str1", "str2"], "cursor": 2},
            {"records": {}, "cursor": 0},
            {"records": [], "cursor": None},
            {"records": [], "cursor": "bad"},
            {"records": [], "cursor": -1},
        )
        for i, response in enumerate(bad_responses):
            workdir = tmp_path / str(i)
            code, output = self._pull(workdir, response)
            assert code != "CRASH", f"crashed on response {response}: {output}"
            assert code == 1, f"expected exit 1 for {response}, got {code!r}"


# =============================================================================
# 5.
# PERFORMANCE — bad response path overhead is negligible
# =============================================================================

class TestRunPullNullRecordsPerformance:
    """Wall-clock sanity checks for the rejected-response path.

    Durations are taken with time.perf_counter(), the clock the standard
    library documents for measuring short intervals; time.monotonic() can
    have much coarser resolution on some platforms, which would make the
    sub-100 ms comparisons below unreliable.
    """

    def _time_pull(self, tmp_path: pathlib.Path, response: MsgpackDict) -> float:
        """Return the elapsed seconds for one helper-driven pull of *response*."""
        t0 = time.perf_counter()
        _run_pull_with_hub_response(tmp_path, response)
        return time.perf_counter() - t0

    def test_null_response_not_slower_than_good_response(self, tmp_path: pathlib.Path) -> None:
        # warm up
        self._time_pull(tmp_path / "w1", {"records": [], "cursor": 0})
        self._time_pull(tmp_path / "w2", {"records": None, "cursor": 0})

        good = self._time_pull(tmp_path / "g", {"records": [], "cursor": 0})
        bad = self._time_pull(tmp_path / "b", {"records": None, "cursor": 0})
        # The 0.100 s floor keeps the bound meaningful when `good` is tiny.
        assert bad < max(good * 10, 0.100), (
            f"null response ({bad:.4f}s) unexpectedly slower than good ({good:.4f}s)"
        )

    def test_50_null_pulls_under_1s(self, tmp_path: pathlib.Path) -> None:
        t0 = time.perf_counter()
        for i in range(50):
            _run_pull_with_hub_response(
                tmp_path / str(i), {"records": None, "cursor": 0}
            )
        elapsed = time.perf_counter() - t0
        assert elapsed < 1.0, f"50 null-response pulls took {elapsed:.3f}s (> 1s)"


# =============================================================================
# 6.
# SECURITY — hub cannot inject arbitrary cursor values into output
# =============================================================================

class TestRunPullNullRecordsSecurity:
    """Hostile cursor values must be rejected or normalised, never echoed."""

    @pytest.mark.parametrize("attack_cursor", [
        "__import__('os').system('id')",
        "${7*7}",
        "{{7*7}}",
        "' OR 1=1 --",
        "\x00\x01\x02",
        "9" * 10000,
        "1e308",
        "inf",
        -9999999999,
    ])
    def test_attack_cursor_raises_coord_bus_error_not_exec(self, attack_cursor: str | int) -> None:
        """A hostile cursor ends in CoordBusError, or at worst a sane integer.

        Any exception other than CoordBusError is a failure. If the call
        returns instead of raising, the returned cursor must be a
        non-negative int, so the payload cannot pass through verbatim.
        """
        from muse.core.coord_bus import pull_from_hub, CoordBusError
        with patch("muse.core.coord_bus._post_json",
                   return_value={"records": [], "cursor": attack_cursor}):
            try:
                result = pull_from_hub("https://localhost:1337", "torvalds", "linux")
            except CoordBusError:
                return  # rejected — the expected outcome
            except Exception as exc:
                pytest.fail(
                    f"Attack cursor {attack_cursor!r} escaped as "
                    f"{type(exc).__name__}: {exc}"
                )
        accepted = result["cursor"]
        # bool is a subclass of int, so exclude it explicitly.
        is_plain_int = isinstance(accepted, int) and not isinstance(accepted, bool)
        assert is_plain_int and accepted >= 0, (
            f"Attack cursor {attack_cursor!r} was accepted as {accepted!r}"
        )

    def test_null_cursor_never_appears_in_output_as_none_string(self, tmp_path: pathlib.Path) -> None:
        """A null hub cursor must not surface as a non-integer cursor in output.

        For every JSON line printed: if it carries a "cursor" key, the value
        must be an int unless the line is marked as failed. A JSON null
        cursor inside a failure line is therefore tolerated.
        """
        code, output = _run_pull_with_hub_response(
            tmp_path, {"records": [], "cursor": None}
        )
        assert code != "CRASH", f"run_pull crashed: {output}"
        for line in output.strip().splitlines():
            if not line.strip():
                continue
            parsed = json.loads(line)
            # If cursor appears in output it must be an int or absent
            if "cursor" in parsed:
                assert isinstance(parsed["cursor"], int) or parsed.get("failed"), (
                    f"cursor in output is not an int: {parsed['cursor']!r}"
                )

    def test_extremely_large_cursor_rejected(self, tmp_path: pathlib.Path) -> None:
        """Hub claiming cursor=2^63 must not be accepted verbatim."""
        huge = 2**63
        code, output = _run_pull_with_hub_response(
            tmp_path, {"records": [], "cursor": huge}
        )
        assert code != "CRASH"
        lines = [line for line in output.strip().splitlines() if line.strip()]
        summary = json.loads(lines[-1])
        if not summary.get("failed"):
            # If it succeeded, cursor must be sane
            assert summary.get("cursor", 0) <= 10**15, (
                f"2^63 cursor accepted verbatim: {summary}"
            )


# =============================================================================
# 7. DATA INTEGRITY — count and cursor in output reflect reality
# =============================================================================

class TestRunPullNullRecordsDataIntegrity:
    """Output counters and on-disk state must match what the hub returned."""

    def test_count_is_zero_when_records_null(self, tmp_path: pathlib.Path) -> None:
        """Null records must not crash and must produce at least one output line.

        Despite the test name, the value of any count field is not inspected
        here; only absence of a crash and presence of output are asserted.
        """
        code, output = _run_pull_with_hub_response(
            tmp_path, {"records": None, "cursor": 0}
        )
        assert code != "CRASH"
        # Output must contain some line indicating failure
        lines = [line for line in output.strip().splitlines() if line.strip()]
        assert lines

    def test_count_equals_len_of_returned_records(self, tmp_path: pathlib.Path) -> None:
        records = [_good_record(i) for i in range(7)]
        _, output = _run_pull_with_hub_response(
            tmp_path, {"records": records, "cursor": 7}
        )
        lines = [line for line in output.strip().splitlines() if line.strip()]
        summary = json.loads(lines[-1])
        assert summary["count"] == 7

    def test_cursor_in_output_matches_hub_cursor(self, tmp_path: pathlib.Path) -> None:
        records = [_good_record(i) for i in range(3)]
        _, output = _run_pull_with_hub_response(
            tmp_path, {"records": records, "cursor": 42}
        )
        lines = [line for line in output.strip().splitlines() if line.strip()]
        summary = json.loads(lines[-1])
        assert summary["cursor"] == 42

    def test_cursor_zero_when_no_records(self, tmp_path: pathlib.Path) -> None:
        _, output = _run_pull_with_hub_response(
            tmp_path, {"records": [], "cursor": 0}
        )
        lines = [line for line in output.strip().splitlines() if line.strip()]
        summary = json.loads(lines[-1])
        assert summary["cursor"] == 0

    def test_records_written_to_disk_on_good_response(self, tmp_path: pathlib.Path) -> None:
        """Pulled records must actually be written to the remote/ directory."""
        root = _make_repo(tmp_path)
        records = [_good_record(i) for i in range(3)]
        with patch("muse.core.coord_bus._post_json",
                   return_value={"records": records, "cursor": 3}), \
             patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
             patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
                   return_value=("https://localhost:1337", "tok")), \
             patch("sys.stdout", io.StringIO()):
            args = argparse.Namespace(
                owner="torvalds",
                slug="linux",
                json_out=True,
                hub_url=None,
                since_id=0,
                limit=1000,
                kinds=[],
            )
            try:
                from muse.cli.commands.coord_sync import run_pull
                run_pull(args)
            except SystemExit:
                pass

        remote_dir = coordination_dir(root) / "remote"
        written = list(remote_dir.rglob("*.json"))
        assert len(written) == 3, f"expected 3 files written, got {len(written)}"

    def test_null_records_writes_nothing_to_disk(self, tmp_path: pathlib.Path) -> None:
        """When hub returns null records, nothing must be written to remote/."""
        root = _make_repo(tmp_path)
        with patch("muse.core.coord_bus._post_json",
                   return_value={"records": None, "cursor": 0}), \
             patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
             patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
                   return_value=("https://localhost:1337", "tok")), \
             patch("sys.stdout", io.StringIO()):
            args = argparse.Namespace(
                owner="torvalds",
                slug="linux",
                json_out=True,
                hub_url=None,
                since_id=0,
                limit=1000,
                kinds=[],
            )
            try:
                from muse.cli.commands.coord_sync import run_pull
                run_pull(args)
            except SystemExit:
                pass
            except Exception:
                # Deliberately swallowed: this test is only about disk state,
                # so the assertion below must run however run_pull ended.
                pass

        remote_dir = coordination_dir(root) / "remote"
        # remote/ may never have been created on the failure path.
        written = list(remote_dir.rglob("*.json")) if remote_dir.exists() else []
        assert written == [], f"null records caused {len(written)} files to be written"