gabriel / muse public
test_run_push_json_multi_object.py python
501 lines 19.4 KB
Raw
sha256:ff478cfdcdd4b7fd6de89cb68896601a981f945634463275ec333bd20ca36402 Merge branch 'dev' into main Human 20 days ago
"""
Tests for the bug: run_push emits multiple JSON objects to stdout when a batch
fails in --json mode.

Root cause (muse/cli/commands/coord_sync.py::run_push, batch loop):

    for i in range(0, len(records), MAX_PUSH_BATCH):
        batch = records[i : i + MAX_PUSH_BATCH]
        try:
            result = push_to_hub(...)
            ...
        except CoordBusError as exc:
            _err(str(exc), as_json, "hub_error")       ← prints JSON to stdout
            failed = True

    # then unconditionally:
    print(json.dumps({..., "failed": True, ...}))      ← second JSON to stdout

When N batches fail you get N+1 JSON objects on stdout.  Any caller doing
``json.loads(stdout)`` raises ``json.JSONDecodeError: Extra data``, and a
caller that reads only the first line gets the first (error) object and
misses the summary.

The fix: in JSON mode, never call _err() inside the batch loop.  Accumulate
errors in a list; include them in the single final JSON object under "errors".
Text-mode (non-JSON) error lines continue to go to stderr as before.

Coverage:
    Unit — single batch failure: exactly 1 JSON object on stdout
    Unit — multi-batch partial failure: exactly 1 JSON object on stdout
    Unit — all batches fail: exactly 1 JSON object on stdout
    Unit — no failures: exactly 1 JSON object on stdout (regression)
    Unit — errors list present in output when failed=True
    Unit — errors list empty when failed=False
    Unit — text mode still prints errors to stderr (regression)
    Data integrity — summary counts are correct even when some batches fail
    Data integrity — inserted/skipped counts from successful batches only
    Data integrity — failed flag is True when any batch fails
    Data integrity — failed flag is False when all batches succeed
    Integration — json.loads(stdout) succeeds with exactly the summary dict
    Integration — multi-batch partial failure: json.loads returns summary
    Security — error messages from hub are sanitized in final output
        (listed as a goal; no test in this file exercises it yet)
    Stress — 10 batches, every other fails: exactly 1 JSON object
    Regression — all existing --json fields still present after fix
"""
45 from __future__ import annotations
46
47 import argparse
48 import io
49 import json
50 import pathlib
51 import sys
52 from collections.abc import Callable
53 from unittest.mock import patch
54
55 import pytest
56
57 from muse.core.types import MsgpackDict
58 from muse.core.coord_bus import CoordBusError
59
60 # ---------------------------------------------------------------------------
61 # Helpers
62 # ---------------------------------------------------------------------------
63
# Batch size the record counts in this file assume (600 records -> 2 batches,
# 1100 -> 3, 5000 -> 10).
# NOTE(review): hard-coded copy; presumably mirrors MAX_PUSH_BATCH in
# muse/cli/commands/coord_sync.py — confirm the two stay in sync.
_MAX_PUSH_BATCH = 500
65
66
def _make_records(n: int, kind: str = "reservation") -> list[MsgpackDict]:
    """Build ``n`` synthetic coordination records of the given ``kind``.

    Records are numbered from zero: each gets a zero-padded ``record_id``
    (``rec-000000``, ``rec-000001``, ...), the same ``run_id``, a payload
    holding its index, and a fixed far-future ``expires_at`` timestamp.
    """
    records: list[MsgpackDict] = []
    for index in range(n):
        records.append(
            {
                "kind": kind,
                "record_id": f"rec-{index:06d}",
                "run_id": "run-torvalds",
                "payload": {"i": index},
                "expires_at": "2099-12-31T23:59:59+00:00",
            }
        )
    return records
78
79
def _make_args(fmt: str = "json") -> argparse.Namespace:
    """Return the argument namespace these tests hand to ``run_push``.

    ``json_out`` is true only when ``fmt`` is exactly ``"json"``; every other
    value selects text output.  The remaining fields are fixed test values.
    """
    wants_json = fmt == "json"
    all_kinds = [
        "reservation",
        "heartbeat",
        "intent",
        "release",
        "dependency",
        "task",
        "claim",
    ]
    return argparse.Namespace(
        json_out=wants_json,
        owner="gabriel",
        slug="muse",
        hub="https://localhost:1337",
        signing=None,
        kinds=all_kinds,
    )
89
90
def _run_push(
    records: list[MsgpackDict],
    push_side_effect: Callable[..., MsgpackDict],
    fmt: str = "json",
) -> tuple[str, str, int | None]:
    """Invoke ``run_push`` with its collaborators mocked and capture its output.

    ``_gather_local_records`` is patched to return ``records`` and
    ``push_to_hub`` is patched to behave like ``push_side_effect``.  While the
    command runs, ``sys.stdout`` and ``sys.stderr`` are replaced by in-memory
    buffers.

    Returns:
        (stdout_str, stderr_str, exit_code_or_None) — the last item is
        ``SystemExit.code`` when ``run_push`` exits, ``None`` when it returns.
    """
    from muse.cli.commands.coord_sync import run_push

    captured_out = io.StringIO()
    captured_err = io.StringIO()
    namespace = _make_args(fmt=fmt)
    code = None

    # NOTE(review): require_repo is patched on muse.core.repo; if coord_sync
    # binds it with a from-import this patch would not reach it — confirm.
    with (
        patch("muse.cli.commands.coord_sync._gather_local_records", return_value=records),
        patch("muse.cli.commands.coord_sync.push_to_hub", side_effect=push_side_effect),
        patch("muse.core.repo.require_repo", return_value=pathlib.Path("/fake/repo")),
        patch("sys.stdout", captured_out),
        patch("sys.stderr", captured_err),
    ):
        try:
            run_push(namespace)
        except SystemExit as exit_signal:
            code = exit_signal.code

    return captured_out.getvalue(), captured_err.getvalue(), code
117
118
def _json_lines(stdout: str) -> list[MsgpackDict]:
    """Return every JSON value found in ``stdout``, in order of appearance.

    The text is scanned as a stream of whitespace-separated JSON documents
    rather than parsed line by line, so the count stays correct when an object
    is pretty-printed over several lines or when two objects share one line.

    Args:
        stdout: Captured standard output of the command under test.

    Returns:
        The decoded values; an empty list when ``stdout`` is empty or blank.

    Raises:
        json.JSONDecodeError: If any non-whitespace content is not valid JSON.
    """
    decoder = json.JSONDecoder()
    values: list[MsgpackDict] = []
    pos = 0
    end = len(stdout)
    while pos < end:
        # raw_decode() does not skip leading whitespace itself.
        if stdout[pos].isspace():
            pos += 1
            continue
        value, pos = decoder.raw_decode(stdout, pos)
        values.append(value)
    return values
127
128
def _always_succeed(n_per_batch: int = _MAX_PUSH_BATCH) -> Callable[..., MsgpackDict]:
    """Factory for a ``push_to_hub`` stand-in that accepts every batch.

    The returned callable reports the whole batch as inserted and nothing as
    skipped.  ``n_per_batch`` is accepted but not used by the stand-in.
    """
    def _accept_all(
        hub_url: str,
        owner: str,
        slug: str,
        batch: list[MsgpackDict],
        token: str | None = None,
    ) -> MsgpackDict:
        accepted = len(batch)
        return {"inserted": accepted, "skipped": 0}

    return _accept_all
134
135
def _fail_on_batch(fail_indices: set[int]) -> Callable[..., MsgpackDict]:
    """Factory for a ``push_to_hub`` stand-in that fails on chosen calls.

    Calls are numbered from zero in the order they arrive.  A call whose
    number is in ``fail_indices`` raises ``CoordBusError`` with status 500;
    every other call reports its whole batch as inserted.
    """
    calls_seen = 0

    def _push(
        hub_url: str,
        owner: str,
        slug: str,
        batch: list[MsgpackDict],
        token: str | None = None,
    ) -> MsgpackDict:
        nonlocal calls_seen
        current = calls_seen
        # Count the call before deciding, so failed calls advance the index too.
        calls_seen += 1
        if current in fail_indices:
            raise CoordBusError(f"hub error on batch {current}", status_code=500)
        return {"inserted": len(batch), "skipped": 0}

    return _push
148
149
150 # =============================================================================
151 # 1. UNIT — stdout must contain exactly one JSON object
152 # =============================================================================
153
154
class TestSingleJsonObject:
    """stdout must carry exactly one JSON object, whatever the batches do."""

    def test_single_batch_failure_one_json_object(self) -> None:
        """A lone batch (fewer records than one full batch) fails: 1 JSON object."""
        captured, _, _ = _run_push(_make_records(10), _fail_on_batch({0}))

        count = len(_json_lines(captured))
        assert count == 1, (
            f"Expected 1 JSON object, got {count}.\n"
            "BUG: _err() printed an error JSON AND a summary JSON to stdout.\n"
            f"stdout:\n{captured}"
        )

    def test_multi_batch_first_batch_fails_one_json_object(self) -> None:
        """600 records (2 batches) with batch 0 failing: 1 JSON object."""
        captured, _, _ = _run_push(_make_records(600), _fail_on_batch({0}))

        count = len(_json_lines(captured))
        assert count == 1, (
            f"Expected 1 JSON object, got {count}.\n"
            f"stdout:\n{captured}"
        )

    def test_multi_batch_second_batch_fails_one_json_object(self) -> None:
        """600 records (2 batches) with batch 1 failing: 1 JSON object."""
        captured, _, _ = _run_push(_make_records(600), _fail_on_batch({1}))

        count = len(_json_lines(captured))
        assert count == 1, (
            f"Expected 1 JSON object, got {count}.\n"
            f"stdout:\n{captured}"
        )

    def test_all_batches_fail_one_json_object(self) -> None:
        """1100 records (3 batches), every batch failing: still 1 JSON object."""
        captured, _, _ = _run_push(_make_records(1100), _fail_on_batch({0, 1, 2}))

        count = len(_json_lines(captured))
        assert count == 1, (
            f"Expected 1 JSON object, got {count}.\n"
            f"stdout:\n{captured}"
        )

    def test_no_failures_one_json_object_regression(self) -> None:
        """Regression guard: an all-success run prints 1 JSON object."""
        captured, _, _ = _run_push(_make_records(600), _always_succeed())

        count = len(_json_lines(captured))
        assert count == 1, (
            f"Expected 1 JSON object on success, got {count}.\n"
            f"stdout:\n{captured}"
        )

    def test_zero_records_one_json_object_regression(self) -> None:
        """Regression guard: pushing no records still prints 1 JSON object."""
        captured, _, _ = _run_push([], _always_succeed())

        count = len(_json_lines(captured))
        assert count == 1, f"Expected 1 JSON object, got {count}"
219
220
221 # =============================================================================
222 # 2. UNIT — errors field in final JSON output
223 # =============================================================================
224
225
class TestErrorsField:
    """The summary JSON reports batch failures through an ``errors`` list."""

    def test_errors_present_when_failed(self) -> None:
        """A failed push must yield a non-empty ``errors`` list in the summary."""
        out, _, _ = _run_push(_make_records(10), _fail_on_batch({0}))
        summary = json.loads(out.strip())

        assert "errors" in summary, f"'errors' key missing from JSON output: {summary}"
        assert isinstance(summary["errors"], list), f"'errors' must be a list: {summary}"
        assert len(summary["errors"]) >= 1, f"'errors' must be non-empty on failure: {summary}"

    def test_errors_contains_hub_message(self) -> None:
        """The hub's error text must show up in at least one ``errors`` entry."""
        out, _, _ = _run_push(_make_records(10), _fail_on_batch({0}))
        summary = json.loads(out.strip())

        found = any("hub error on batch 0" in entry for entry in summary["errors"])
        assert found, f"Hub error message not in 'errors': {summary['errors']}"

    def test_errors_empty_when_success(self) -> None:
        """A fully successful push reports no errors (key absent or empty list)."""
        out, _, _ = _run_push(_make_records(50), _always_succeed())
        summary = json.loads(out.strip())

        reported = summary.get("errors", [])
        assert reported == [], (
            f"'errors' must be empty on success, got: {summary.get('errors')}"
        )

    def test_errors_has_one_entry_per_failed_batch(self) -> None:
        """Three failing batches produce three ``errors`` entries."""
        three_batches = _make_records(1100)
        out, _, _ = _run_push(three_batches, _fail_on_batch({0, 1, 2}))
        summary = json.loads(out.strip())

        assert len(summary["errors"]) == 3, (
            f"Expected 3 error entries for 3 failed batches, got: {summary['errors']}"
        )
267
268
269 # =============================================================================
270 # 3. UNIT — text mode sends errors to stderr, not stdout
271 # =============================================================================
272
273
class TestTextModeErrors:
    """Text mode: batch errors go to stderr and stdout carries no JSON."""

    def test_text_mode_error_goes_to_stderr(self) -> None:
        """In text mode, batch errors must go to stderr, not stdout."""
        records = _make_records(10)
        stdout, stderr, _ = _run_push(records, _fail_on_batch({0}), fmt="text")

        # stderr must have the error
        assert stderr, "text mode batch error should appear on stderr"
        # stdout must NOT have a JSON error object
        for line in stdout.splitlines():
            if not line.strip():
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue  # text output is fine
            # A text line such as "10" or "true" is valid JSON but decodes to a
            # scalar with no .get(); only a JSON object can be the leaked error.
            if isinstance(obj, dict):
                assert obj.get("status") != "hub_error", (
                    f"JSON error object leaked to stdout in text mode: {line}"
                )

    def test_text_mode_success_no_json_on_stdout(self) -> None:
        """In text mode, no JSON should appear on stdout."""
        records = _make_records(10)
        stdout, _, _ = _run_push(records, _always_succeed(), fmt="text")

        for line in stdout.splitlines():
            if not line.strip():
                continue
            try:
                json.loads(line)
            except json.JSONDecodeError:
                continue  # expected: the line is plain text
            pytest.fail(f"Unexpected JSON on stdout in text mode: {line}")
306
307
308 # =============================================================================
309 # 4. DATA INTEGRITY — counts and flags are correct in final output
310 # =============================================================================
311
312
class TestDataIntegrity:
    """Counts and flags in the summary stay correct when batches fail."""

    def test_inserted_count_from_successful_batches_only(self) -> None:
        """``inserted`` counts records from successful batches only."""
        # 600 records: batch 0 (500) succeeds, batch 1 (100) fails
        out, _, _ = _run_push(_make_records(600), _fail_on_batch({1}))
        summary = json.loads(out.strip())

        assert summary["inserted"] == 500, (
            f"Expected 500 inserted (only batch 0), got {summary['inserted']}"
        )

    def test_total_is_all_records_regardless_of_failures(self) -> None:
        """``total`` is the number of records offered, pushed or not."""
        out, _, _ = _run_push(_make_records(600), _fail_on_batch({1}))
        summary = json.loads(out.strip())

        assert summary["total"] == 600, f"Expected total=600, got {summary['total']}"

    def test_failed_true_when_any_batch_fails(self) -> None:
        """A single failing batch sets ``failed`` to True."""
        out, _, _ = _run_push(_make_records(600), _fail_on_batch({0}))
        summary = json.loads(out.strip())

        assert summary["failed"] is True

    def test_failed_false_when_all_succeed(self) -> None:
        """With every batch accepted, ``failed`` is False."""
        out, _, _ = _run_push(_make_records(600), _always_succeed())
        summary = json.loads(out.strip())

        assert summary["failed"] is False

    def test_skipped_count_from_successful_batches(self) -> None:
        """``skipped`` counts records from successful batches only."""
        calls_seen = 0

        def push_with_skips(
            hub_url: str,
            owner: str,
            slug: str,
            batch: list[MsgpackDict],
            token: str | None = None,
        ) -> MsgpackDict:
            nonlocal calls_seen
            current = calls_seen
            calls_seen += 1
            if current == 1:
                raise CoordBusError("batch 1 failed", status_code=500)
            half = len(batch) // 2
            return {"inserted": half, "skipped": half}

        out, _, _ = _run_push(_make_records(600), push_with_skips)
        summary = json.loads(out.strip())

        # Only batch 0 (500 records) succeeded: 250 inserted, 250 skipped
        assert summary["skipped"] == 250, f"Expected 250 skipped, got {summary['skipped']}"
365
366
367 # =============================================================================
368 # 5. INTEGRATION — json.loads(stdout) succeeds and returns summary
369 # =============================================================================
370
371
class TestJsonParseable:
    """``json.loads(stdout)`` must work and must return the summary dict."""

    def test_json_loads_succeeds_on_failure(self) -> None:
        """json.loads(stdout) must succeed when batches fail."""
        records = _make_records(600)
        stdout, _, _ = _run_push(records, _fail_on_batch({0}))

        try:
            obj = json.loads(stdout.strip())
        except json.JSONDecodeError as exc:
            pytest.fail(
                f"json.loads(stdout) raised JSONDecodeError: {exc}\n"
                f"stdout:\n{stdout}\n"
                f"BUG: multiple JSON objects on stdout from _err() + summary."
            )

        assert isinstance(obj, dict), f"Expected dict, got {type(obj)}"

    def test_json_loads_returns_summary_not_error(self) -> None:
        """The parsed JSON must be the summary dict, not the error dict."""
        records = _make_records(600)
        stdout, _, _ = _run_push(records, _fail_on_batch({0}))

        obj = json.loads(stdout.strip())
        # Summary fields must be present
        assert "schema" in obj, f"Missing schema in: {obj}"
        assert "inserted" in obj, f"Missing inserted in: {obj}"
        assert "failed" in obj, f"Missing failed in: {obj}"
        assert "total" in obj, f"Missing total in: {obj}"

    def test_json_loads_success_returns_summary(self) -> None:
        """On success, json.loads returns the summary with failed=False."""
        records = _make_records(100)
        stdout, _, _ = _run_push(records, _always_succeed())

        obj = json.loads(stdout.strip())
        assert obj["failed"] is False
        assert obj["inserted"] == 100

    def test_agentception_can_parse_push_output_on_partial_failure(self) -> None:
        """Simulates how agentception reads push output: reads first line as JSON."""
        records = _make_records(1000)  # 2 batches
        stdout, stderr, _ = _run_push(records, _fail_on_batch({0}))

        # Fail with a readable message, not a bare IndexError, if nothing was
        # printed at all.
        lines = stdout.strip().splitlines()
        assert lines, (
            f"run_push wrote nothing to stdout in --json mode.\n"
            f"stderr:\n{stderr}"
        )

        # Agentception reads first line
        first_line = lines[0]
        obj = json.loads(first_line)

        # Before fix: first_line is the _err() error object; obj has no "total"
        # After fix: first_line is the summary; obj has all summary fields
        assert "total" in obj, (
            f"First JSON line is not the summary dict (is it the error object?).\n"
            f"Got: {obj}\n"
            f"Full stdout:\n{stdout}"
        )
427
428
429 # =============================================================================
430 # 6. STRESS — many batches, many failures
431 # =============================================================================
432
433
class TestStress:
    """Many batches with many failures still yield one summary object."""

    def test_10_batches_alternating_failures_one_json_object(self) -> None:
        """5000 records (10 batches), odd-numbered batches fail: 1 JSON object."""
        odd_batches = {1, 3, 5, 7, 9}  # 5 failing batches
        captured, _, _ = _run_push(_make_records(5000), _fail_on_batch(odd_batches))

        count = len(_json_lines(captured))
        assert count == 1, (
            f"Expected 1 JSON object for 10 batches with 5 failures, got {count}.\n"
            f"stdout (first 500 chars): {captured[:500]}"
        )

    def test_10_batches_all_fail_one_json_object(self) -> None:
        """Every one of the 10 batches fails: still 1 JSON object on stdout."""
        every_batch = set(range(10))
        captured, _, _ = _run_push(_make_records(5000), _fail_on_batch(every_batch))

        parsed = _json_lines(captured)
        assert len(parsed) == 1

    def test_10_batches_alternating_errors_field_has_5_entries(self) -> None:
        """Five failing batches give five entries in ``errors``."""
        odd_batches = {1, 3, 5, 7, 9}
        captured, _, _ = _run_push(_make_records(5000), _fail_on_batch(odd_batches))

        summary = json.loads(captured.strip())
        error_count = len(summary["errors"])
        assert error_count == 5, (
            f"Expected 5 error entries for 5 failed batches, got {error_count}"
        )
466
467
468 # =============================================================================
469 # 7. REGRESSION — all existing --json fields still present
470 # =============================================================================
471
472
class TestRegressionJsonSchema:
    """The pre-existing --json fields and exit codes survive the fix."""

    def test_all_success_fields_present(self) -> None:
        """A successful push still emits every established summary field."""
        out, _, _ = _run_push(_make_records(10), _always_succeed())
        summary = json.loads(out.strip())

        required = {"schema", "inserted", "skipped", "total", "failed", "duration_ms"}
        missing = required - set(summary)
        assert not missing, f"Missing fields in success output: {missing}"

    def test_all_failure_fields_present(self) -> None:
        """A failed push emits the established fields plus ``errors``."""
        out, _, _ = _run_push(_make_records(10), _fail_on_batch({0}))
        summary = json.loads(out.strip())

        required = {"schema", "inserted", "skipped", "total", "failed", "duration_ms", "errors"}
        missing = required - set(summary)
        assert not missing, f"Missing fields in failure output: {missing}"

    def test_exit_code_1_on_any_failure(self) -> None:
        """Any failing batch makes the command exit with status 1."""
        _, _, status = _run_push(_make_records(10), _fail_on_batch({0}))
        assert status == 1

    def test_exit_code_0_on_success(self) -> None:
        """Success exits with 0, or returns without raising SystemExit."""
        _, _, status = _run_push(_make_records(10), _always_succeed())
        assert status in (0, None)  # None = no SystemExit raised = success
File History 2 commits
sha256:ff478cfdcdd4b7fd6de89cb68896601a981f945634463275ec333bd20ca36402 Merge branch 'dev' into main Human 20 days ago
sha256:1c4b3e3a9a1f300774c3ee662b572a698d5fd405bf765a71e6011a2e9c3eaaaa feat: Muse — version control for the agent era Human 73 days ago