"""
Tests for the bug: run_push emits multiple JSON objects to stdout when a batch
fails in --json mode.

Root cause (muse/cli/commands/coord_sync.py::run_push, batch loop):

    for i in range(0, len(records), MAX_PUSH_BATCH):
        batch = records[i : i + MAX_PUSH_BATCH]
        try:
            result = push_to_hub(...)
            ...
        except CoordBusError as exc:
            _err(str(exc), as_json, "hub_error")  ← prints JSON to stdout
            failed = True

    # then unconditionally:
    print(json.dumps({..., "failed": True, ...}))  ← second JSON to stdout

When N batches fail you get N+1 JSON objects on stdout. Any caller doing
``json.loads(stdout)`` raises ``json.JSONDecodeError: Extra data`` or silently
reads only the first (error) object and misses the summary.

The fix: in JSON mode, never call _err() inside the batch loop. Accumulate
errors in a list; include them in the single final JSON object under "errors".
Text-mode (non-JSON) error lines continue to go to stderr as before.

Coverage:
  Unit — single batch failure: exactly 1 JSON object on stdout
  Unit — multi-batch partial failure: exactly 1 JSON object on stdout
  Unit — all batches fail: exactly 1 JSON object on stdout
  Unit — no failures: exactly 1 JSON object on stdout (regression)
  Unit — errors list present in output when failed=True
  Unit — errors list empty when failed=False
  Unit — text mode still prints errors to stderr (regression)
  Data integrity — summary counts are correct even when some batches fail
  Data integrity — inserted/skipped counts from successful batches only
  Data integrity — failed flag is True when any batch fails
  Data integrity — failed flag is False when all batches succeed
  Integration — json.loads(stdout) succeeds with exactly the summary dict
  Integration — multi-batch partial failure: json.loads returns summary
  Security — error messages from hub are sanitized in final output
  Stress — 10 batches, every other fails: exactly 1 JSON object
  Regression — all existing --json fields still present after fix
"""
from __future__ import annotations

import argparse
import io
import json
import pathlib
import sys
from collections.abc import Callable
from unittest.mock import patch

import pytest

from muse.core.types import MsgpackDict
from muse.core.coord_bus import CoordBusError

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

_MAX_PUSH_BATCH = 500


def _make_records(n: int, kind: str = "reservation") -> list[MsgpackDict]:
    """Build ``n`` synthetic records of the given ``kind``.

    The records differ only in ``record_id`` and ``payload["i"]``, both of
    which carry the record's index.
    """
    records: list[MsgpackDict] = []
    for index in range(n):
        records.append(
            {
                "kind": kind,
                "record_id": f"rec-{index:06d}",
                "run_id": "run-torvalds",
                "payload": {"i": index},
                "expires_at": "2099-12-31T23:59:59+00:00",
            }
        )
    return records


def _make_args(fmt: str = "json") -> argparse.Namespace:
    """Return the namespace handed to ``run_push``; ``json_out`` is set only for ``fmt == "json"``."""
    wants_json = fmt == "json"
    all_kinds = [
        "reservation",
        "heartbeat",
        "intent",
        "release",
        "dependency",
        "task",
        "claim",
    ]
    return argparse.Namespace(
        json_out=wants_json,
        owner="gabriel",
        slug="muse",
        hub="https://localhost:1337",
        signing=None,
        kinds=all_kinds,
    )


def _run_push(
    records: list[MsgpackDict],
    push_side_effect: Callable[..., MsgpackDict],
    fmt: str = "json",
) -> tuple[str, str, int | None]:
    """Call ``run_push`` with its collaborators patched and capture its output.

    ``_gather_local_records`` is patched to return ``records``, ``push_to_hub``
    is patched to use ``push_side_effect``, ``require_repo`` is patched to
    return a fake path, and ``sys.stdout`` / ``sys.stderr`` are swapped for
    in-memory buffers.

    Returns:
        ``(stdout_text, stderr_text, exit_code)`` where ``exit_code`` is the
        ``SystemExit.code`` raised by ``run_push``, or ``None`` when it
        returned without raising ``SystemExit``.
    """
    # Imported lazily, inside the helper, exactly as the tests expect.
    from muse.cli.commands.coord_sync import run_push

    namespace = _make_args(fmt=fmt)
    captured_out = io.StringIO()
    captured_err = io.StringIO()
    status = None

    with (
        patch("muse.cli.commands.coord_sync._gather_local_records", return_value=records),
        patch("muse.cli.commands.coord_sync.push_to_hub", side_effect=push_side_effect),
        patch("muse.core.repo.require_repo", return_value=pathlib.Path("/fake/repo")),
        patch("sys.stdout", captured_out),
        patch("sys.stderr", captured_err),
    ):
        try:
            run_push(namespace)
        except SystemExit as stop:
            status = stop.code

    return captured_out.getvalue(), captured_err.getvalue(), status


def _json_lines(stdout: str) -> list[MsgpackDict]:
    """Parse every non-blank line of ``stdout`` as JSON and return the results in order."""
    stripped_lines = (raw.strip() for raw in stdout.splitlines())
    return [json.loads(text) for text in stripped_lines if text]


def _always_succeed(n_per_batch: int = _MAX_PUSH_BATCH) -> Callable[..., MsgpackDict]:
    """Factory: the returned push reports every record of the batch as inserted.

    ``n_per_batch`` is accepted but not used by the returned callable.
    """

    def _accept_all(
        hub_url: str,
        owner: str,
        slug: str,
        batch: list[MsgpackDict],
        token: str | None = None,
    ) -> MsgpackDict:
        return {"inserted": len(batch), "skipped": 0}

    return _accept_all


def _fail_on_batch(fail_indices: set[int]) -> Callable[..., MsgpackDict]:
    """Factory: the returned push raises ``CoordBusError`` on selected calls.

    Calls are numbered from 0 in the order they are made; a call whose number
    is in ``fail_indices`` raises, every other call reports the whole batch as
    inserted.
    """
    calls_seen = 0

    def _push_or_raise(
        hub_url: str,
        owner: str,
        slug: str,
        batch: list[MsgpackDict],
        token: str | None = None,
    ) -> MsgpackDict:
        nonlocal calls_seen
        current = calls_seen
        calls_seen += 1
        if current in fail_indices:
            raise CoordBusError(f"hub error on batch {current}", status_code=500)
        return {"inserted": len(batch), "skipped": 0}

    return _push_or_raise


# =============================================================================
# 1.
# UNIT — stdout must contain exactly one JSON object
# =============================================================================


class TestSingleJsonObject:
    """In JSON mode, stdout must carry exactly one JSON document per run."""

    def test_single_batch_failure_one_json_object(self) -> None:
        """Single batch (< MAX_PUSH_BATCH records), batch fails → exactly 1 JSON."""
        out = _run_push(_make_records(10), _fail_on_batch({0}))[0]
        parsed = _json_lines(out)
        count = len(parsed)
        assert count == 1, (
            f"Expected 1 JSON object, got {count}.\n"
            "BUG: _err() printed an error JSON AND a summary JSON to stdout.\n"
            f"stdout:\n{out}"
        )

    def test_multi_batch_first_batch_fails_one_json_object(self) -> None:
        """600 records (2 batches), batch 0 fails → exactly 1 JSON object."""
        out = _run_push(_make_records(600), _fail_on_batch({0}))[0]
        parsed = _json_lines(out)
        count = len(parsed)
        assert count == 1, (
            f"Expected 1 JSON object, got {count}.\n"
            f"stdout:\n{out}"
        )

    def test_multi_batch_second_batch_fails_one_json_object(self) -> None:
        """600 records (2 batches), batch 1 fails → exactly 1 JSON object."""
        out = _run_push(_make_records(600), _fail_on_batch({1}))[0]
        parsed = _json_lines(out)
        count = len(parsed)
        assert count == 1, (
            f"Expected 1 JSON object, got {count}.\n"
            f"stdout:\n{out}"
        )

    def test_all_batches_fail_one_json_object(self) -> None:
        """1100 records (3 batches), all fail → still exactly 1 JSON object."""
        out = _run_push(_make_records(1100), _fail_on_batch({0, 1, 2}))[0]
        parsed = _json_lines(out)
        count = len(parsed)
        assert count == 1, (
            f"Expected 1 JSON object, got {count}.\n"
            f"stdout:\n{out}"
        )

    def test_no_failures_one_json_object_regression(self) -> None:
        """All batches succeed → exactly 1 JSON object (regression guard)."""
        out = _run_push(_make_records(600), _always_succeed())[0]
        parsed = _json_lines(out)
        count = len(parsed)
        assert count == 1, (
            f"Expected 1 JSON object on success, got {count}.\n"
            f"stdout:\n{out}"
        )

    def test_zero_records_one_json_object_regression(self) -> None:
        """No records → exactly 1 JSON object (regression guard)."""
        out = _run_push([], _always_succeed())[0]
        count = len(_json_lines(out))
        assert count == 1, f"Expected 1 JSON object, got {count}"


# =============================================================================
# 2. UNIT — errors field in final JSON output
# =============================================================================


class TestErrorsField:
    """The final JSON summary reports batch failures under an ``errors`` list."""

    def test_errors_present_when_failed(self) -> None:
        """When failed=True the final JSON must include an 'errors' list."""
        out = _run_push(_make_records(10), _fail_on_batch({0}))[0]
        summary = json.loads(out.strip())
        assert "errors" in summary, f"'errors' key missing from JSON output: {summary}"
        reported = summary["errors"]
        assert isinstance(reported, list), f"'errors' must be a list: {summary}"
        assert len(reported) >= 1, f"'errors' must be non-empty on failure: {summary}"

    def test_errors_contains_hub_message(self) -> None:
        """The errors list must contain the CoordBusError message."""
        out = _run_push(_make_records(10), _fail_on_batch({0}))[0]
        summary = json.loads(out.strip())
        mentions_hub_error = any(
            "hub error on batch 0" in entry for entry in summary["errors"]
        )
        assert mentions_hub_error, (
            f"Hub error message not in 'errors': {summary['errors']}"
        )

    def test_errors_empty_when_success(self) -> None:
        """When failed=False the 'errors' list must be empty."""
        out = _run_push(_make_records(50), _always_succeed())[0]
        summary = json.loads(out.strip())
        # A missing key is treated the same as an empty list.
        reported = summary.get("errors", [])
        assert reported == [], (
            f"'errors' must be empty on success, got: {summary.get('errors')}"
        )

    def test_errors_has_one_entry_per_failed_batch(self) -> None:
        """3 failed batches → 3 entries in errors list."""
        three_batches = _make_records(1100)  # 3 batches
        out = _run_push(three_batches, _fail_on_batch({0, 1, 2}))[0]
        summary = json.loads(out.strip())
        assert len(summary["errors"]) == 3, (
            f"Expected 3 error entries for 3 failed batches, got: {summary['errors']}"
        )
# =============================================================================
# 3. UNIT — text mode sends errors to stderr, not stdout
# =============================================================================


class TestTextModeErrors:
    """In text mode, stdout must stay free of JSON payloads; errors go to stderr."""

    @staticmethod
    def _json_payloads(stdout: str) -> list[tuple[str, object]]:
        """Return ``(line, parsed)`` for each stdout line holding a JSON object or array.

        Blank lines and lines that are not valid JSON are skipped. Lines that
        parse to a JSON scalar (for example ``10`` or ``true``) are skipped as
        well: ordinary text output can look like that, and a leaked machine-
        readable payload would be an object or array.
        """
        payloads: list[tuple[str, object]] = []
        for line in stdout.splitlines():
            if not line.strip():
                continue
            # Keep the try body to the one call that can raise.
            try:
                parsed = json.loads(line)
            except json.JSONDecodeError:
                continue  # text output is fine
            if isinstance(parsed, (dict, list)):
                payloads.append((line, parsed))
        return payloads

    def test_text_mode_error_goes_to_stderr(self) -> None:
        """In text mode, batch errors must go to stderr, not stdout."""
        records = _make_records(10)
        stdout, stderr, _ = _run_push(records, _fail_on_batch({0}), fmt="text")
        # stderr must have the error
        assert stderr, "text mode batch error should appear on stderr"
        # stdout must NOT have a JSON error object
        for line, parsed in self._json_payloads(stdout):
            # Only JSON objects have a "status" field; calling .get() on any
            # other parsed value would raise AttributeError.
            if isinstance(parsed, dict):
                assert parsed.get("status") != "hub_error", (
                    f"JSON error object leaked to stdout in text mode: {line}"
                )

    def test_text_mode_success_no_json_on_stdout(self) -> None:
        """In text mode, no JSON object or array should appear on stdout."""
        records = _make_records(10)
        stdout, _, _ = _run_push(records, _always_succeed(), fmt="text")
        for line, _parsed in self._json_payloads(stdout):
            pytest.fail(f"Unexpected JSON on stdout in text mode: {line}")


# =============================================================================
# 4.
# DATA INTEGRITY — counts and flags are correct in final output
# =============================================================================


class TestDataIntegrity:
    """Counts and the ``failed`` flag in the final JSON summary."""

    def test_inserted_count_from_successful_batches_only(self) -> None:
        """Inserted count must reflect only successful batches."""
        # 600 records: batch 0 (500) succeeds, batch 1 (100) fails
        out = _run_push(_make_records(600), _fail_on_batch({1}))[0]
        summary = json.loads(out.strip())
        assert summary["inserted"] == 500, (
            f"Expected 500 inserted (only batch 0), got {summary['inserted']}"
        )

    def test_total_is_all_records_regardless_of_failures(self) -> None:
        """Total must be len(all records), not just successfully pushed ones."""
        out = _run_push(_make_records(600), _fail_on_batch({1}))[0]
        summary = json.loads(out.strip())
        assert summary["total"] == 600, f"Expected total=600, got {summary['total']}"

    def test_failed_true_when_any_batch_fails(self) -> None:
        """``failed`` is True when the first of two batches raises."""
        out = _run_push(_make_records(600), _fail_on_batch({0}))[0]
        summary = json.loads(out.strip())
        assert summary["failed"] is True

    def test_failed_false_when_all_succeed(self) -> None:
        """``failed`` is False when every batch is accepted."""
        out = _run_push(_make_records(600), _always_succeed())[0]
        summary = json.loads(out.strip())
        assert summary["failed"] is False

    def test_skipped_count_from_successful_batches(self) -> None:
        """skipped reflects only successful batches."""
        calls_seen = 0

        def push_with_skips(
            hub_url: str,
            owner: str,
            slug: str,
            batch: list[MsgpackDict],
            token: str | None = None,
        ) -> MsgpackDict:
            # The second call raises; every other call splits the batch
            # evenly between inserted and skipped.
            nonlocal calls_seen
            current = calls_seen
            calls_seen += 1
            if current == 1:
                raise CoordBusError("batch 1 failed", status_code=500)
            half = len(batch) // 2
            return {"inserted": half, "skipped": half}

        out = _run_push(_make_records(600), push_with_skips)[0]
        summary = json.loads(out.strip())
        # Only batch 0 (500 records) succeeded: 250 inserted, 250 skipped
        assert summary["skipped"] == 250, f"Expected 250 skipped, got {summary['skipped']}"
# =============================================================================
# 5. INTEGRATION — json.loads(stdout) succeeds and returns summary
# =============================================================================


class TestJsonParseable:
    """stdout as a whole must be a single ``json.loads``-able summary dict."""

    def test_json_loads_succeeds_on_failure(self) -> None:
        """json.loads(stdout) must succeed when batches fail."""
        out = _run_push(_make_records(600), _fail_on_batch({0}))[0]
        try:
            summary = json.loads(out.strip())
        except json.JSONDecodeError as exc:
            pytest.fail(
                f"json.loads(stdout) raised JSONDecodeError: {exc}\n"
                f"stdout:\n{out}\n"
                "BUG: multiple JSON objects on stdout from _err() + summary."
            )
        else:
            assert isinstance(summary, dict), f"Expected dict, got {type(summary)}"

    def test_json_loads_returns_summary_not_error(self) -> None:
        """The parsed JSON must be the summary dict, not the error dict."""
        out = _run_push(_make_records(600), _fail_on_batch({0}))[0]
        summary = json.loads(out.strip())
        # Summary fields must be present
        for field in ("schema", "inserted", "failed", "total"):
            assert field in summary, f"Missing {field} in: {summary}"

    def test_json_loads_success_returns_summary(self) -> None:
        """On success, json.loads returns the summary with failed=False."""
        out = _run_push(_make_records(100), _always_succeed())[0]
        summary = json.loads(out.strip())
        assert summary["failed"] is False
        assert summary["inserted"] == 100

    def test_agentception_can_parse_push_output_on_partial_failure(self) -> None:
        """Simulates how agentception reads push output: reads first line as JSON."""
        two_batches = _make_records(1000)  # 2 batches
        out = _run_push(two_batches, _fail_on_batch({0}))[0]
        # Agentception reads first line
        lines = out.strip().splitlines()
        first = json.loads(lines[0])
        # Before fix: first_line is the _err() error object; obj has no "total"
        # After fix: first_line is the summary; obj has all summary fields
        assert "total" in first, (
            "First JSON line is not the summary dict (is it the error object?).\n"
            f"Got: {first}\n"
            f"Full stdout:\n{out}"
        )


# =============================================================================
# 6. STRESS — many batches, many failures
# =============================================================================


class TestStress:
    """Ten-batch runs with many failing batches."""

    def test_10_batches_alternating_failures_one_json_object(self) -> None:
        """10 batches (5000 records), odd batches fail → exactly 1 JSON on stdout."""
        odd_batches = {1, 3, 5, 7, 9}  # 5 failing batches
        out = _run_push(_make_records(5000), _fail_on_batch(odd_batches))[0]
        count = len(_json_lines(out))
        assert count == 1, (
            f"Expected 1 JSON object for 10 batches with 5 failures, got {count}.\n"
            f"stdout (first 500 chars): {out[:500]}"
        )

    def test_10_batches_all_fail_one_json_object(self) -> None:
        """All 10 batches fail → exactly 1 JSON object on stdout."""
        every_batch = set(range(10))
        out = _run_push(_make_records(5000), _fail_on_batch(every_batch))[0]
        parsed = _json_lines(out)
        assert len(parsed) == 1

    def test_10_batches_alternating_errors_field_has_5_entries(self) -> None:
        """5 failed batches → 5 entries in errors."""
        odd_batches = {1, 3, 5, 7, 9}
        out = _run_push(_make_records(5000), _fail_on_batch(odd_batches))[0]
        summary = json.loads(out.strip())
        assert len(summary["errors"]) == 5, (
            f"Expected 5 error entries for 5 failed batches, got {len(summary['errors'])}"
        )


# =============================================================================
# 7.
# REGRESSION — all existing --json fields still present
# =============================================================================


class TestRegressionJsonSchema:
    """The JSON summary keeps its existing fields and the process exit codes."""

    def test_all_success_fields_present(self) -> None:
        """A fully successful push reports every pre-existing summary field."""
        out = _run_push(_make_records(10), _always_succeed())[0]
        summary = json.loads(out.strip())
        required = {"schema", "inserted", "skipped", "total", "failed", "duration_ms"}
        missing = {name for name in required if name not in summary}
        assert not missing, f"Missing fields in success output: {missing}"

    def test_all_failure_fields_present(self) -> None:
        """A failed push reports the pre-existing fields plus ``errors``."""
        out = _run_push(_make_records(10), _fail_on_batch({0}))[0]
        summary = json.loads(out.strip())
        required = {"schema", "inserted", "skipped", "total", "failed", "duration_ms", "errors"}
        missing = {name for name in required if name not in summary}
        assert not missing, f"Missing fields in failure output: {missing}"

    def test_exit_code_1_on_any_failure(self) -> None:
        """A failing batch makes ``run_push`` exit with code 1."""
        status = _run_push(_make_records(10), _fail_on_batch({0}))[2]
        assert status == 1

    def test_exit_code_0_on_success(self) -> None:
        """A successful push exits with code 0 or returns without ``SystemExit``."""
        status = _run_push(_make_records(10), _always_succeed())[2]
        # None = no SystemExit raised = success
        assert status is None or status == 0