gabriel / muse public
test_core_transport.py python
634 lines 26.1 KB
Raw
sha256:79ffe87f5fe2ec146e35f05521218bbf54dffdb0440c07f970bad05f16efb89f chore: merge main — carry all urllib/typing/test fixes from dev Sonnet 4.6 minor ⚠ breaking 19 days ago
1 """Tests for muse.core.transport — HttpTransport and response parsers."""
2
3 from __future__ import annotations
4
5 import json
6 import signal
7 import socket
8 import threading
9 import time
10 import unittest.mock
11
12 import msgpack
13 import pytest
14
15 import hashlib
16
17 from muse.core.types import MsgpackDict, b64url_decode
18 from muse.core.mpack import MPack, RemoteInfo
19 from muse.core.msign import build_msign_header
20 from muse.core.transport import (
21 _Request,
22 HttpTransport,
23 SigningIdentity,
24 TransportError,
25 _parse_mpack,
26 _parse_push_result,
27 _parse_remote_info,
28 )
29
30
31 # ---------------------------------------------------------------------------
32 # Helpers
33 # ---------------------------------------------------------------------------
34
35
36 def _make_signing() -> "SigningIdentity":
37 """Generate a fresh Ed25519 SigningIdentity for tests."""
38 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
39 from muse.core.transport import SigningIdentity
40 return SigningIdentity(handle="testuser", private_key=Ed25519PrivateKey.generate())
41
42
43 def _mock_response(
44 body: bytes,
45 status: int = 200,
46 content_type: str = "application/x-msgpack",
47 ) -> unittest.mock.MagicMock:
48 """Return a mock httpx response."""
49 resp = unittest.mock.MagicMock()
50 resp.content = body
51 resp.status_code = status
52 resp.headers = {"Content-Type": content_type}
53 return resp
54
55
56 def _mp(data: MsgpackDict) -> bytes:
57 """Encode data as msgpack."""
58 return msgpack.packb(data, use_bin_type=True)
59
60
61
62 # ---------------------------------------------------------------------------
63 # _parse_remote_info
64 # ---------------------------------------------------------------------------
65
66
67 class TestParseRemoteInfo:
68 def test_valid_response(self) -> None:
69 raw = _mp(
70 {
71 "repo_id": "r123",
72 "domain": "midi",
73 "default_branch": "main",
74 "branch_heads": {"main": "abc123", "dev": "def456"},
75 }
76 )
77 info = _parse_remote_info(raw)
78 assert info["repo_id"] == "r123"
79 assert info["domain"] == "midi"
80 assert info["default_branch"] == "main"
81 assert info["branch_heads"] == {"main": "abc123", "dev": "def456"}
82
83 def test_invalid_msgpack_raises_transport_error(self) -> None:
84 with pytest.raises(TransportError):
85 _parse_remote_info(b"\xff\xff\xff\xff\xff invalid")
86
87 def test_non_dict_response_returns_defaults(self) -> None:
88 raw = _mp([1, 2, 3])
89 info = _parse_remote_info(raw)
90 assert info["repo_id"] == ""
91 assert info["branch_heads"] == {}
92
93 def test_missing_fields_get_defaults(self) -> None:
94 raw = _mp({"repo_id": "x"})
95 info = _parse_remote_info(raw)
96 assert info["repo_id"] == "x"
97 assert info["domain"] == "midi"
98 assert info["default_branch"] == "main"
99 assert info["branch_heads"] == {}
100
101 def test_non_string_branch_heads_excluded(self) -> None:
102 raw = _mp({"branch_heads": {"main": "abc", "bad": 123}})
103 info = _parse_remote_info(raw)
104 assert "main" in info["branch_heads"]
105 assert "bad" not in info["branch_heads"]
106
107
108
109 # ---------------------------------------------------------------------------
110 # build_msign_header — module-level signing utility
111 # ---------------------------------------------------------------------------
112
113
114 class TestBuildMsignHeader:
115 def _make_signing(self) -> SigningIdentity:
116 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
117
118 return SigningIdentity(handle="testuser", private_key=Ed25519PrivateKey.generate())
119
120 def test_header_format(self) -> None:
121 header = build_msign_header(self._make_signing(), "GET", "https://example.com/path", None)
122 assert header.startswith('MSign handle="testuser"')
123 assert " ts=" in header
124 assert " sig=" in header
125
126 def test_timestamp_is_recent(self) -> None:
127 import time
128
129 before = int(time.time())
130 header = build_msign_header(self._make_signing(), "GET", "https://example.com/p", None)
131 after = int(time.time())
132 ts_part = next(p for p in header.split() if p.startswith("ts="))
133 ts = int(ts_part[3:])
134 assert before <= ts <= after + 1
135
136 def test_signature_is_verifiable(self) -> None:
137 """The sig= value must verify against the Ed25519 public key for the canonical input."""
138 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
139
140 private_key = Ed25519PrivateKey.generate()
141 signing = SigningIdentity(handle="testuser", private_key=private_key)
142 method = "POST"
143 url = "https://hub.example.com/owner/repo/push"
144 body = b"some body data"
145
146 header = build_msign_header(signing, method, url, body)
147
148 parts: dict[str, str] = {}
149 for part in header[len("MSign "):].split():
150 k, _, v = part.partition("=")
151 parts[k] = v.strip('"')
152
153 ts = int(parts["ts"])
154 sig_bytes = b64url_decode(parts["sig"])
155
156 body_hash = "sha256:" + hashlib.sha256(body).hexdigest()
157 canonical = f"ed25519\n{method}\nhub.example.com\n/owner/repo/push\n{ts}\n{body_hash}".encode()
158
159 # raises cryptography.exceptions.InvalidSignature on failure
160 private_key.public_key().verify(sig_bytes, canonical)
161
162 def test_query_string_included_in_canonical(self) -> None:
163 """Query parameters must be part of the signed path, not dropped."""
164 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
165
166 private_key = Ed25519PrivateKey.generate()
167 signing = SigningIdentity(handle="u", private_key=private_key)
168 url = "https://hub.example.com/path?foo=bar&baz=1"
169
170 header = build_msign_header(signing, "GET", url, None)
171
172 parts: dict[str, str] = {}
173 for part in header[len("MSign "):].split():
174 k, _, v = part.partition("=")
175 parts[k] = v.strip('"')
176
177 ts = int(parts["ts"])
178 sig_bytes = b64url_decode(parts["sig"])
179 body_hash = "sha256:" + hashlib.sha256(b"").hexdigest()
180 canonical = f"ed25519\nGET\nhub.example.com\n/path?foo=bar&baz=1\n{ts}\n{body_hash}".encode()
181
182 private_key.public_key().verify(sig_bytes, canonical)
183
184 def test_none_body_treated_as_empty_bytes(self) -> None:
185 """None and b'' must produce the same SHA-256 body hash in the canonical form."""
186 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
187
188 private_key = Ed25519PrivateKey.generate()
189 signing = SigningIdentity(handle="u", private_key=private_key)
190 url = "https://hub.example.com/path"
191
192 header = build_msign_header(signing, "GET", url, None)
193
194 parts: dict[str, str] = {}
195 for part in header[len("MSign "):].split():
196 k, _, v = part.partition("=")
197 parts[k] = v.strip('"')
198
199 ts = int(parts["ts"])
200 sig_bytes = b64url_decode(parts["sig"])
201 # body=None → b"" → sha256(b"") is the canonical body hash
202 body_hash = "sha256:" + hashlib.sha256(b"").hexdigest()
203 canonical = f"ed25519\nGET\nhub.example.com\n/path\n{ts}\n{body_hash}".encode()
204
205 private_key.public_key().verify(sig_bytes, canonical)
206
207 def test_different_methods_produce_different_headers(self) -> None:
208 """Two calls with different methods on the same URL must differ (method is in canonical)."""
209
210 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
211
212 private_key = Ed25519PrivateKey.generate()
213 signing = SigningIdentity(handle="u", private_key=private_key)
214
215 with unittest.mock.patch("muse.core.msign.time") as mt:
216 mt.time.return_value = 1_700_000_000
217 h_get = build_msign_header(signing, "GET", "https://example.com/x", b"")
218 h_post = build_msign_header(signing, "POST", "https://example.com/x", b"")
219
220 assert h_get != h_post
221
222 def test_different_bodies_produce_different_sigs(self) -> None:
223 """Body content must influence the signature (body hash is in canonical)."""
224
225 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
226
227 private_key = Ed25519PrivateKey.generate()
228 signing = SigningIdentity(handle="u", private_key=private_key)
229
230 with unittest.mock.patch("muse.core.msign.time") as mt:
231 mt.time.return_value = 1_700_000_000
232 h1 = build_msign_header(signing, "POST", "https://example.com/x", b"body-a")
233 h2 = build_msign_header(signing, "POST", "https://example.com/x", b"body-b")
234
235 assert h1 != h2
236
237 def test_handle_embedded_in_header(self) -> None:
238 """The MSign header must carry the signing identity's handle."""
239
240 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
241
242 private_key = Ed25519PrivateKey.generate()
243 signing = SigningIdentity(handle="my-agent-42", private_key=private_key)
244 header = build_msign_header(signing, "GET", "https://example.com/x", None)
245 assert 'handle="my-agent-42"' in header
246
247
248 # ---------------------------------------------------------------------------
249 # _parse_mpack
250 # ---------------------------------------------------------------------------
251
252
253 class TestParseBundle:
254 def test_empty_msgpack_object_returns_empty_bundle(self) -> None:
255 mpack = _parse_mpack(_mp({}))
256 assert mpack == {}
257
258 def test_non_dict_returns_empty_bundle(self) -> None:
259 mpack = _parse_mpack(_mp([]))
260 assert mpack == {}
261
262 def test_commits_extracted(self) -> None:
263 raw = _mp(
264 {
265 "commits": [
266 {
267 "commit_id": "c1",
268 "repo_id": "r1",
269 "branch": "main",
270 "snapshot_id": "1" * 64,
271 "message": "test",
272 "committed_at": "2026-01-01T00:00:00+00:00",
273 "parent_commit_id": None,
274 "parent2_commit_id": None,
275 "author": "bob",
276 "metadata": {},
277 }
278 ]
279 }
280 )
281 mpack = _parse_mpack(raw)
282 commits = mpack.get("commits") or []
283 assert len(commits) == 1
284 assert commits[0]["commit_id"] == "c1"
285
286 def test_objects_extracted(self) -> None:
287 raw = _mp(
288 {
289 "blobs": [
290 {
291 "object_id": "abc123",
292 "content": b"hello",
293 }
294 ]
295 }
296 )
297 mpack = _parse_mpack(raw)
298 objs = mpack.get("blobs") or []
299 assert len(objs) == 1
300 assert objs[0]["object_id"] == "abc123"
301 assert objs[0]["content"] == b"hello"
302
303 def test_object_missing_content_excluded(self) -> None:
304 raw = _mp({"blobs": [{"object_id": "abc"}]})
305 mpack = _parse_mpack(raw)
306 assert (mpack.get("blobs") or []) == []
307
308 def test_branch_heads_extracted(self) -> None:
309 raw = _mp({"branch_heads": {"main": "abc123"}})
310 mpack = _parse_mpack(raw)
311 assert mpack.get("branch_heads") == {"main": "abc123"}
312
313
314 # ---------------------------------------------------------------------------
315 # _parse_push_result
316 # ---------------------------------------------------------------------------
317
318
319 class TestParsePushResult:
320 def test_success_response(self) -> None:
321 raw = _mp({"ok": True, "message": "pushed", "branch_heads": {"main": "abc"}})
322 result = _parse_push_result(raw)
323 assert result["ok"] is True
324 assert result["message"] == "pushed"
325 assert result["branch_heads"] == {"main": "abc"}
326
327 def test_failure_response(self) -> None:
328 raw = _mp({"ok": False, "message": "rejected", "branch_heads": {}})
329 result = _parse_push_result(raw)
330 assert result["ok"] is False
331 assert result["message"] == "rejected"
332
333 def test_non_msgpack_raises_transport_error(self) -> None:
334 with pytest.raises(TransportError):
335 _parse_push_result(b"\xff\xff invalid msgpack")
336
337 def test_missing_ok_defaults_false(self) -> None:
338 raw = _mp({"message": "hm", "branch_heads": {}})
339 result = _parse_push_result(raw)
340 assert result["ok"] is False
341
342
343 # ---------------------------------------------------------------------------
344 # HttpTransport — mocked urlopen
345 # ---------------------------------------------------------------------------
346
347
348 def _mock_httpx_client_resp(body: bytes, status: int = 200) -> unittest.mock.MagicMock:
349 """Return a mock httpx client whose .request() returns a response with body/status.
350
351 Supports context-manager usage: ``with _httpx_mod.Client(...) as client:``.
352 """
353 resp = unittest.mock.MagicMock()
354 resp.status_code = status
355 resp.content = body
356 resp.text = body.decode("utf-8", errors="replace")
357 client = unittest.mock.MagicMock()
358 client.is_closed = False
359 client.request = unittest.mock.MagicMock(return_value=resp)
360 client.__enter__ = unittest.mock.MagicMock(return_value=client)
361 client.__exit__ = unittest.mock.MagicMock(return_value=False)
362 return client
363
364
365 class TestHttpTransportFetchRemoteInfo:
366 def test_calls_correct_endpoint(self) -> None:
367 body = _mp({
368 "repo_id": "r1", "domain": "midi",
369 "default_branch": "main", "branch_heads": {"main": "abc"},
370 })
371 client = _mock_httpx_client_resp(body)
372 with unittest.mock.patch("muse.core.transport._httpx_mod") as mock_mod:
373 mock_mod.Client = unittest.mock.MagicMock(return_value=client)
374 info = HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None)
375 url_called = client.request.call_args[0][1]
376 assert url_called == "https://hub.example.com/repos/r1/refs"
377 assert info["repo_id"] == "r1"
378
379 def test_msign_header_sent(self) -> None:
380 body = _mp({"repo_id": "r1", "domain": "midi", "default_branch": "main", "branch_heads": {}})
381 client = _mock_httpx_client_resp(body)
382 signing = _make_signing()
383 with unittest.mock.patch("muse.core.transport._httpx_mod") as mock_mod:
384 mock_mod.Client = unittest.mock.MagicMock(return_value=client)
385 HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", signing)
386 headers = client.request.call_args.kwargs.get("headers", {})
387 auth = headers.get("Authorization") or headers.get("authorization")
388 assert auth and auth.startswith("MSign handle=\"testuser\"")
389
390 def test_no_token_no_auth_header(self) -> None:
391 body = _mp({"repo_id": "r1", "domain": "midi", "default_branch": "main", "branch_heads": {}})
392 client = _mock_httpx_client_resp(body)
393 with unittest.mock.patch("muse.core.transport._httpx_mod") as mock_mod:
394 mock_mod.Client = unittest.mock.MagicMock(return_value=client)
395 HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None)
396 headers = client.request.call_args.kwargs.get("headers", {})
397 auth = headers.get("Authorization") or headers.get("authorization")
398 assert auth is None
399
400 def test_http_401_raises_transport_error(self) -> None:
401 client = _mock_httpx_client_resp(b"Unauthorized", status=401)
402 with unittest.mock.patch("muse.core.transport._httpx_mod") as mock_mod:
403 mock_mod.Client = unittest.mock.MagicMock(return_value=client)
404 with pytest.raises(TransportError) as exc_info:
405 HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None)
406 assert exc_info.value.status_code == 401
407
408 def test_http_404_raises_transport_error(self) -> None:
409 client = _mock_httpx_client_resp(b"Not Found", status=404)
410 with unittest.mock.patch("muse.core.transport._httpx_mod") as mock_mod:
411 mock_mod.Client = unittest.mock.MagicMock(return_value=client)
412 with pytest.raises(TransportError) as exc_info:
413 HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None)
414 assert exc_info.value.status_code == 404
415
416 def test_http_500_raises_transport_error(self) -> None:
417 client = _mock_httpx_client_resp(b"Internal Error", status=500)
418 with unittest.mock.patch("muse.core.transport._httpx_mod") as mock_mod:
419 mock_mod.Client = unittest.mock.MagicMock(return_value=client)
420 with pytest.raises(TransportError) as exc_info:
421 HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None)
422 assert exc_info.value.status_code == 500
423
424 def test_url_error_raises_transport_error_with_code_0(self) -> None:
425 client = unittest.mock.MagicMock()
426 client.is_closed = False
427 client.request = unittest.mock.MagicMock(side_effect=Exception("Name or service not known"))
428 with unittest.mock.patch("muse.core.transport._httpx_mod") as mock_mod:
429 mock_mod.Client = unittest.mock.MagicMock(return_value=client)
430 with pytest.raises(TransportError) as exc_info:
431 HttpTransport().fetch_remote_info("https://bad.host/r", None)
432 assert exc_info.value.status_code == 0
433
434 def test_trailing_slash_stripped_from_url(self) -> None:
435 body = _mp({"repo_id": "r", "domain": "midi", "default_branch": "main", "branch_heads": {}})
436 client = _mock_httpx_client_resp(body)
437 with unittest.mock.patch("muse.core.transport._httpx_mod") as mock_mod:
438 mock_mod.Client = unittest.mock.MagicMock(return_value=client)
439 HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1/", None)
440 url_called = client.request.call_args[0][1]
441 assert url_called == "https://hub.example.com/repos/r1/refs"
442
443
444 # ---------------------------------------------------------------------------
445 # HttpTransport._build_request — credential security and loopback allowlist
446 # ---------------------------------------------------------------------------
447
448
449 class TestBuildRequest:
450 """_build_request enforces HTTPS for non-loopback URLs with signing identity."""
451
452 def _build(self, url: str, with_signing: bool = True) -> "_Request":
453 signing = _make_signing() if with_signing else None
454 with unittest.mock.patch("muse.core.hub_trust.check_and_pin"):
455 return HttpTransport()._build_request("GET", url, signing)
456
457 # ── Loopback hosts allowed over plain HTTP ────────────────────────────
458
459 def test_localhost_http_with_token_allowed(self) -> None:
460 req = self._build("https://localhost:1337/repo/refs")
461 assert req.headers.get("Authorization", "").startswith("MSign ")
462
463 def test_127_0_0_1_http_with_token_allowed(self) -> None:
464 req = self._build("http://127.0.0.1:10003/repo/refs")
465 assert req.headers.get("Authorization", "").startswith("MSign ")
466
467 def test_ipv6_loopback_http_with_token_allowed(self) -> None:
468 req = self._build("http://[::1]:10003/repo/refs")
469 assert req.headers.get("Authorization", "").startswith("MSign ")
470
471 def test_host_docker_internal_http_with_token_allowed(self) -> None:
472 """host.docker.internal is Docker Desktop's alias for the host loopback.
473
474 Agent swarms run inside Docker and use http://host.docker.internal:10003
475 to reach a local MuseHub instance. Credentials must be sent over this
476 plain-HTTP connection — the traffic never leaves the machine.
477 """
478 req = self._build("http://host.docker.internal:10003/gabriel/repo/refs")
479 assert req.headers.get("Authorization", "").startswith("MSign ")
480
481 def test_https_any_host_with_token_allowed(self) -> None:
482 req = self._build("https://musehub.ai/gabriel/repo/refs")
483 assert req.headers.get("Authorization", "").startswith("MSign ")
484
485 # ── Non-loopback HTTP with token must be rejected ─────────────────────
486
487 def test_non_loopback_http_token_raises_transport_error(self) -> None:
488 with pytest.raises(TransportError, match="non-HTTPS"):
489 self._build("http://musehub.ai/gabriel/repo/refs")
490
491 def test_arbitrary_hostname_http_token_raises(self) -> None:
492 with pytest.raises(TransportError, match="non-HTTPS"):
493 self._build("http://attacker.example.com/steal")
494
495 def test_localhost_lookalike_http_token_raises(self) -> None:
496 """'localhost.attacker.example.com' must NOT be mistaken for the loopback interface."""
497 with pytest.raises(TransportError, match="non-HTTPS"):
498 self._build("http://localhost.attacker.example.com/repo/refs")
499
500 def test_host_docker_internal_lookalike_http_token_raises(self) -> None:
501 """'host.docker.internal.attacker.example.com' must not bypass the check."""
502 with pytest.raises(TransportError, match="non-HTTPS"):
503 self._build("http://host.docker.internal.attacker.example.com/repo")
504
505 # ── No token — scheme restriction does not apply ──────────────────────
506
507 def test_non_loopback_http_without_token_allowed(self) -> None:
508 req = self._build("http://musehub.ai/public/repo/refs", with_signing=False)
509 assert "Authorization" not in req.headers
510
511 # ── Request structure ─────────────────────────────────────────────────
512
513 def test_accept_header_always_set(self) -> None:
514 req = self._build("https://musehub.ai/repo/refs")
515 assert "msgpack" in req.headers.get("Accept", "")
516
517 def test_method_preserved(self) -> None:
518 req = HttpTransport()._build_request("POST", "https://musehub.ai/x", None)
519 assert req.method == "POST"
520
521 def test_body_sets_content_type(self) -> None:
522 req = HttpTransport()._build_request(
523 "POST", "https://musehub.ai/x", None, body_bytes=b"data"
524 )
525 assert req.headers.get("Content-Type") == "application/x-msgpack"
526
527 def test_no_body_omits_content_type(self) -> None:
528 req = self._build("https://musehub.ai/x", with_signing=False)
529 assert "Content-Type" not in req.headers
530
531
532 # ---------------------------------------------------------------------------
533 # SIGPIPE regression — large push body with early-close server
534 # ---------------------------------------------------------------------------
535
536
537 def _early_close_server(
538 server_sock: socket.socket,
539 response_code: int,
540 resp_body: bytes,
541 ) -> None:
542 """Accept one connection, read HTTP headers, send response, close immediately.
543
544 Simulates the scenario where the server sends a 4xx/5xx response while
545 the client is still uploading a large request body. Without the
546 ``_ignore_sigpipe`` guard, the client process dies with exit code 141
547 (SIGPIPE) instead of raising ``TransportError``.
548 """
549 try:
550 conn, _ = server_sock.accept()
551 conn.settimeout(5.0)
552 try:
553 buf = b""
554 deadline = time.time() + 5
555 while time.time() < deadline:
556 try:
557 chunk = conn.recv(4096)
558 if not chunk:
559 break
560 buf += chunk
561 if b"\r\n\r\n" in buf:
562 break
563 except socket.timeout:
564 break
565 status_line = f"HTTP/1.1 {response_code} Error\r\n"
566 resp_headers = (
567 "Content-Type: application/json\r\n"
568 f"Content-Length: {len(resp_body)}\r\n"
569 "Connection: close\r\n\r\n"
570 )
571 conn.sendall((status_line + resp_headers).encode() + resp_body)
572 finally:
573 conn.close()
574 except Exception:
575 pass
576 finally:
577 server_sock.close()
578
579
580 class TestSigpipeRegression:
581 """Regression tests for SIGPIPE on large push bodies.
582
583 ``muse/cli/app.py`` sets ``SIGPIPE = SIG_DFL`` so that piping output to
584 ``head``/``grep``/``jq`` exits cleanly. Without the ``_ignore_sigpipe``
585 guard the push command dies with exit 141 when the server closes the
586 connection while the client is still uploading a large body.
587 """
588
589 def _run_early_close_scenario(self, payload_mb: float, response_code: int) -> None:
590 """Assert that TransportError is raised, not a process-killing SIGPIPE."""
591 # Reproduce app.py's startup action — SIG_DFL kills the process on SIGPIPE.
592 if hasattr(signal, "SIGPIPE"):
593 original = signal.signal(signal.SIGPIPE, signal.SIG_DFL)
594 else:
595 original = None
596
597 body = b"X" * int(payload_mb * 1024 * 1024)
598 resp_body = b'{"detail":"test error"}'
599
600 server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
601 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
602 server_sock.bind(("127.0.0.1", 0))
603 server_sock.listen(1)
604 port = server_sock.getsockname()[1]
605
606 t = threading.Thread(
607 target=_early_close_server,
608 args=(server_sock, response_code, resp_body),
609 daemon=True,
610 )
611 t.start()
612
613 try:
614 url = f"http://127.0.0.1:{port}/owner/repo/push"
615 transport = HttpTransport()
616 req = transport._build_request("POST", url, None, body, "application/x-msgpack")
617 with pytest.raises(TransportError):
618 transport._execute(req)
619 finally:
620 t.join(timeout=2)
621 if original is not None and hasattr(signal, "SIGPIPE"):
622 signal.signal(signal.SIGPIPE, original)
623
624 def test_sigpipe_not_fatal_409_large_body(self) -> None:
625 """20 MB body + server closes after headers → TransportError, not exit 141."""
626 self._run_early_close_scenario(payload_mb=20.0, response_code=409)
627
628 def test_sigpipe_not_fatal_401_large_body(self) -> None:
629 """15 MB body + server sends 401 early → TransportError, not SIGPIPE crash."""
630 self._run_early_close_scenario(payload_mb=15.0, response_code=401)
631
632 def test_sigpipe_not_fatal_500_large_body(self) -> None:
633 """10 MB body + server crashes (500) → TransportError, not SIGPIPE crash."""
634 self._run_early_close_scenario(payload_mb=10.0, response_code=500)
File History 2 commits
sha256:79ffe87f5fe2ec146e35f05521218bbf54dffdb0440c07f970bad05f16efb89f chore: merge main — carry all urllib/typing/test fixes from dev Sonnet 4.6 minor 19 days ago
sha256:0bea7600d1eee83e87950be49933b1006fa9dc2c71e7c4ee748d324f61138156 chore: bump version to 0.2.0rc11; fix typing audit violatio… Sonnet 4.6 minor 19 days ago