gabriel / muse public
test_core_transport.py python
631 lines 26.0 KB
Raw
sha256:79ffe87f5fe2ec146e35f05521218bbf54dffdb0440c07f970bad05f16efb89f chore: merge main — carry all urllib/typing/test fixes from dev Sonnet 4.6 minor ⚠ breaking 18 days ago
1 """Tests for muse.core.transport — HttpTransport and response parsers."""
2
3 from __future__ import annotations
4
import hashlib
import json
import signal
import socket
import threading
import time
import unittest.mock
from collections.abc import Callable

import msgpack
import pytest

from muse.core.mpack import MPack, RemoteInfo
from muse.core.msign import build_msign_header
from muse.core.transport import (
    _Request,
    HttpTransport,
    SigningIdentity,
    TransportError,
    _parse_mpack,
    _parse_push_result,
    _parse_remote_info,
)
from muse.core.types import MsgpackDict, b64url_decode
29
30
31 # ---------------------------------------------------------------------------
32 # Helpers
33 # ---------------------------------------------------------------------------
34
35
def _make_signing() -> SigningIdentity:
    """Return a ``SigningIdentity`` for handle ``"testuser"`` with a new Ed25519 key.

    A private key is generated on every call, so two identities returned by
    separate calls never share key material.
    """
    # Imported here, as in the other tests of this file, rather than at module level.
    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

    # SigningIdentity comes from the module-level import; no local re-import needed.
    return SigningIdentity(handle="testuser", private_key=Ed25519PrivateKey.generate())
41
42
def _mock_response(
    body: bytes,
    status: int = 200,
    content_type: str = "application/x-msgpack",
) -> unittest.mock.MagicMock:
    """Build a ``MagicMock`` that stands in for an HTTP response object.

    The mock exposes ``content`` (the raw *body*), ``status_code`` (*status*)
    and ``headers`` (a dict holding only ``Content-Type``).
    """
    # Keyword arguments to MagicMock are set as attributes on the new mock.
    return unittest.mock.MagicMock(
        content=body,
        status_code=status,
        headers={"Content-Type": content_type},
    )
54
55
def _mp(data: MsgpackDict) -> bytes:
    """Serialize *data* to msgpack bytes, packing with ``use_bin_type=True``."""
    packed = msgpack.packb(data, use_bin_type=True)
    return packed
59
60
61
62 # ---------------------------------------------------------------------------
63 # _parse_remote_info
64 # ---------------------------------------------------------------------------
65
66
class TestParseRemoteInfo:
    """``_parse_remote_info`` on complete, partial and malformed payloads."""

    def test_valid_response(self) -> None:
        """A complete payload is returned field-for-field."""
        parsed = _parse_remote_info(
            _mp(
                {
                    "repo_id": "r123",
                    "domain": "midi",
                    "default_branch": "main",
                    "branch_heads": {"main": "abc123", "dev": "def456"},
                }
            )
        )
        assert parsed["repo_id"] == "r123"
        assert parsed["domain"] == "midi"
        assert parsed["default_branch"] == "main"
        assert parsed["branch_heads"] == {"main": "abc123", "dev": "def456"}

    def test_invalid_msgpack_raises_transport_error(self) -> None:
        """Bytes that are not valid msgpack surface as ``TransportError``."""
        garbage = b"\xff\xff\xff\xff\xff invalid"
        with pytest.raises(TransportError):
            _parse_remote_info(garbage)

    def test_non_dict_response_returns_defaults(self) -> None:
        """A msgpack array (not a map) yields the default field values."""
        parsed = _parse_remote_info(_mp([1, 2, 3]))
        assert parsed["repo_id"] == ""
        assert parsed["branch_heads"] == {}

    def test_missing_fields_get_defaults(self) -> None:
        """Fields absent from the payload are filled with their defaults."""
        parsed = _parse_remote_info(_mp({"repo_id": "x"}))
        assert parsed["repo_id"] == "x"
        assert parsed["domain"] == "midi"
        assert parsed["default_branch"] == "main"
        assert parsed["branch_heads"] == {}

    def test_non_string_branch_heads_excluded(self) -> None:
        """Branch heads whose value is not a string are dropped."""
        parsed = _parse_remote_info(_mp({"branch_heads": {"main": "abc", "bad": 123}}))
        heads = parsed["branch_heads"]
        assert "main" in heads
        assert "bad" not in heads
106
107
108
109 # ---------------------------------------------------------------------------
110 # build_msign_header — module-level signing utility
111 # ---------------------------------------------------------------------------
112
113
class TestBuildMsignHeader:
    """``build_msign_header`` output format and signature coverage.

    The signature tests rebuild the canonical string the signer is expected
    to have signed and verify it against the Ed25519 public key.
    """

    def _make_signing(self) -> SigningIdentity:
        """Return a ``SigningIdentity`` for ``"testuser"`` with a fresh Ed25519 key."""
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

        return SigningIdentity(handle="testuser", private_key=Ed25519PrivateKey.generate())

    @staticmethod
    def _header_fields(header: str) -> dict[str, str]:
        """Split an ``MSign k=v k=v ...`` header into a ``{key: value}`` dict.

        Tokens are whitespace-separated; each is split on its first ``=`` and
        surrounding double quotes are stripped from the value.
        """
        fields: dict[str, str] = {}
        for token in header[len("MSign "):].split():
            key, _, value = token.partition("=")
            fields[key] = value.strip('"')
        return fields

    @staticmethod
    def _canonical(method: str, host: str, path: str, ts: int, body: bytes) -> bytes:
        """Return the canonical byte string these tests expect to be signed.

        Newline-joined: algorithm tag, method, host, path (including any query
        string), timestamp, and ``sha256:<hex digest of body>``.
        """
        body_hash = "sha256:" + hashlib.sha256(body).hexdigest()
        return f"ed25519\n{method}\n{host}\n{path}\n{ts}\n{body_hash}".encode()

    def test_header_format(self) -> None:
        """The header starts with the handle and carries ``ts=`` and ``sig=`` fields."""
        header = build_msign_header(self._make_signing(), "GET", "https://example.com/path", None)
        assert header.startswith('MSign handle="testuser"')
        assert " ts=" in header
        assert " sig=" in header

    def test_timestamp_is_recent(self) -> None:
        """``ts=`` lies between the wall-clock readings taken around the call."""
        before = int(time.time())
        header = build_msign_header(self._make_signing(), "GET", "https://example.com/p", None)
        after = int(time.time())
        ts_part = next(p for p in header.split() if p.startswith("ts="))
        ts = int(ts_part[3:])
        # One second of slack on the upper bound.
        assert before <= ts <= after + 1

    def test_signature_is_verifiable(self) -> None:
        """The sig= value must verify against the Ed25519 public key for the canonical input."""
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

        private_key = Ed25519PrivateKey.generate()
        signing = SigningIdentity(handle="testuser", private_key=private_key)
        method = "POST"
        url = "https://hub.example.com/owner/repo/push"
        body = b"some body data"

        header = build_msign_header(signing, method, url, body)

        fields = self._header_fields(header)
        canonical = self._canonical(
            method, "hub.example.com", "/owner/repo/push", int(fields["ts"]), body
        )

        # raises cryptography.exceptions.InvalidSignature on failure
        private_key.public_key().verify(b64url_decode(fields["sig"]), canonical)

    def test_query_string_included_in_canonical(self) -> None:
        """Query parameters must be part of the signed path, not dropped."""
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

        private_key = Ed25519PrivateKey.generate()
        signing = SigningIdentity(handle="u", private_key=private_key)
        url = "https://hub.example.com/path?foo=bar&baz=1"

        header = build_msign_header(signing, "GET", url, None)

        fields = self._header_fields(header)
        canonical = self._canonical(
            "GET", "hub.example.com", "/path?foo=bar&baz=1", int(fields["ts"]), b""
        )

        private_key.public_key().verify(b64url_decode(fields["sig"]), canonical)

    def test_none_body_treated_as_empty_bytes(self) -> None:
        """None and b'' must produce the same SHA-256 body hash in the canonical form."""
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

        private_key = Ed25519PrivateKey.generate()
        signing = SigningIdentity(handle="u", private_key=private_key)
        url = "https://hub.example.com/path"

        header = build_msign_header(signing, "GET", url, None)

        fields = self._header_fields(header)
        # body=None → b"" → sha256(b"") is the canonical body hash
        canonical = self._canonical("GET", "hub.example.com", "/path", int(fields["ts"]), b"")

        private_key.public_key().verify(b64url_decode(fields["sig"]), canonical)

    def test_different_methods_produce_different_headers(self) -> None:
        """Two calls with different methods on the same URL must differ (method is in canonical)."""
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

        private_key = Ed25519PrivateKey.generate()
        signing = SigningIdentity(handle="u", private_key=private_key)

        # Freeze the signer's clock so the timestamp cannot be what differs.
        with unittest.mock.patch("muse.core.msign.time") as mt:
            mt.time.return_value = 1_700_000_000
            h_get = build_msign_header(signing, "GET", "https://example.com/x", b"")
            h_post = build_msign_header(signing, "POST", "https://example.com/x", b"")

        assert h_get != h_post

    def test_different_bodies_produce_different_sigs(self) -> None:
        """Body content must influence the signature (body hash is in canonical)."""
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

        private_key = Ed25519PrivateKey.generate()
        signing = SigningIdentity(handle="u", private_key=private_key)

        # Freeze the signer's clock so the timestamp cannot be what differs.
        with unittest.mock.patch("muse.core.msign.time") as mt:
            mt.time.return_value = 1_700_000_000
            h1 = build_msign_header(signing, "POST", "https://example.com/x", b"body-a")
            h2 = build_msign_header(signing, "POST", "https://example.com/x", b"body-b")

        assert h1 != h2

    def test_handle_embedded_in_header(self) -> None:
        """The MSign header must carry the signing identity's handle."""
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

        private_key = Ed25519PrivateKey.generate()
        signing = SigningIdentity(handle="my-agent-42", private_key=private_key)
        header = build_msign_header(signing, "GET", "https://example.com/x", None)
        assert 'handle="my-agent-42"' in header
246
247
248 # ---------------------------------------------------------------------------
249 # _parse_mpack
250 # ---------------------------------------------------------------------------
251
252
class TestParseBundle:
    """``_parse_mpack`` extraction of commits, blobs and branch heads."""

    def test_empty_msgpack_object_returns_empty_bundle(self) -> None:
        """An empty msgpack map parses to an empty result."""
        assert _parse_mpack(_mp({})) == {}

    def test_non_dict_returns_empty_bundle(self) -> None:
        """A msgpack array instead of a map parses to an empty result."""
        assert _parse_mpack(_mp([])) == {}

    def test_commits_extracted(self) -> None:
        """A single well-formed commit record is carried through."""
        parsed = _parse_mpack(
            _mp(
                {
                    "commits": [
                        {
                            "commit_id": "c1",
                            "repo_id": "r1",
                            "branch": "main",
                            "snapshot_id": "1" * 64,
                            "message": "test",
                            "committed_at": "2026-01-01T00:00:00+00:00",
                            "parent_commit_id": None,
                            "parent2_commit_id": None,
                            "author": "bob",
                            "metadata": {},
                        }
                    ]
                }
            )
        )
        found = parsed.get("commits") or []
        assert len(found) == 1
        assert found[0]["commit_id"] == "c1"

    def test_objects_extracted(self) -> None:
        """A blob with both ``object_id`` and ``content`` is carried through."""
        parsed = _parse_mpack(
            _mp({"blobs": [{"object_id": "abc123", "content": b"hello"}]})
        )
        blobs = parsed.get("blobs") or []
        assert len(blobs) == 1
        only = blobs[0]
        assert only["object_id"] == "abc123"
        assert only["content"] == b"hello"

    def test_object_missing_content_excluded(self) -> None:
        """A blob entry without ``content`` does not appear in the result."""
        parsed = _parse_mpack(_mp({"blobs": [{"object_id": "abc"}]}))
        assert (parsed.get("blobs") or []) == []

    def test_branch_heads_extracted(self) -> None:
        """``branch_heads`` is returned as sent."""
        parsed = _parse_mpack(_mp({"branch_heads": {"main": "abc123"}}))
        assert parsed.get("branch_heads") == {"main": "abc123"}
312
313
314 # ---------------------------------------------------------------------------
315 # _parse_push_result
316 # ---------------------------------------------------------------------------
317
318
class TestParsePushResult:
    """``_parse_push_result`` on success, failure and malformed payloads."""

    def test_success_response(self) -> None:
        """All three fields of an accepted push are returned as sent."""
        outcome = _parse_push_result(
            _mp({"ok": True, "message": "pushed", "branch_heads": {"main": "abc"}})
        )
        assert outcome["ok"] is True
        assert outcome["message"] == "pushed"
        assert outcome["branch_heads"] == {"main": "abc"}

    def test_failure_response(self) -> None:
        """A rejected push keeps ``ok`` false and its message."""
        outcome = _parse_push_result(
            _mp({"ok": False, "message": "rejected", "branch_heads": {}})
        )
        assert outcome["ok"] is False
        assert outcome["message"] == "rejected"

    def test_non_msgpack_raises_transport_error(self) -> None:
        """Bytes that are not valid msgpack surface as ``TransportError``."""
        garbage = b"\xff\xff invalid msgpack"
        with pytest.raises(TransportError):
            _parse_push_result(garbage)

    def test_missing_ok_defaults_false(self) -> None:
        """A payload without ``ok`` is treated as not ok."""
        outcome = _parse_push_result(_mp({"message": "hm", "branch_heads": {}}))
        assert outcome["ok"] is False
341
342
343 # ---------------------------------------------------------------------------
# HttpTransport.fetch_remote_info — with _urllib_do patched out
345 # ---------------------------------------------------------------------------
346
347
348 def _mock_urllib_do(body: bytes, status: int = 200) -> "Callable[[str, str, dict[str, str], bytes | None], bytes]":
349 """Return a side_effect for patching _urllib_do.
350
351 _urllib_do converts HTTPError/URLError → TransportError internally, so
352 the mock raises TransportError directly to match what callers see.
353 """
354 def _side_effect(method: str, url: str, headers: "dict[str, str]", data: "bytes | None" = None, **kwargs: "str | int | bool") -> bytes:
355 if status >= 400:
356 raise TransportError(f"HTTP {status}", status)
357 return body
358
359 return _side_effect
360
361
class TestHttpTransportFetchRemoteInfo:
    """``HttpTransport.fetch_remote_info`` with ``muse.core.transport._urllib_do`` patched.

    Each test swaps ``_urllib_do`` for a fake that either records what it was
    called with or raises ``TransportError``, so no network traffic occurs.
    """

    @staticmethod
    def _assert_http_error(status: int, body: bytes) -> None:
        """Assert that a simulated HTTP *status* surfaces as ``TransportError.status_code``."""
        with unittest.mock.patch(
            "muse.core.transport._urllib_do",
            side_effect=_mock_urllib_do(body, status),
        ):
            with pytest.raises(TransportError) as exc_info:
                HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None)
        assert exc_info.value.status_code == status

    def test_calls_correct_endpoint(self) -> None:
        """The request goes to ``<repo url>/refs`` and the body is parsed."""
        body = _mp({
            "repo_id": "r1", "domain": "midi",
            "default_branch": "main", "branch_heads": {"main": "abc"},
        })
        calls: list[tuple[str, str, dict[str, str]]] = []

        def _fake_do(
            method: str,
            url: str,
            headers: dict[str, str],
            data: bytes | None = None,
            **kw: str | int | bool,
        ) -> bytes:
            calls.append((method, url, headers))
            return body

        with unittest.mock.patch("muse.core.transport._urllib_do", side_effect=_fake_do):
            info = HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None)
        assert calls[0][1] == "https://hub.example.com/repos/r1/refs"
        assert info["repo_id"] == "r1"

    def test_msign_header_sent(self) -> None:
        """With a signing identity, an ``MSign`` Authorization header is sent."""
        body = _mp({"repo_id": "r1", "domain": "midi", "default_branch": "main", "branch_heads": {}})
        # Records the headers dict of every call.
        calls: list[dict[str, str]] = []

        def _fake_do(
            method: str,
            url: str,
            headers: dict[str, str],
            data: bytes | None = None,
            **kw: str | int | bool,
        ) -> bytes:
            calls.append(headers)
            return body

        signing = _make_signing()
        with unittest.mock.patch("muse.core.transport._urllib_do", side_effect=_fake_do):
            with unittest.mock.patch("muse.core.hub_trust.check_and_pin"):
                HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", signing)
        # Accept either capitalisation of the header name.
        auth = calls[0].get("Authorization") or calls[0].get("authorization")
        assert auth and auth.startswith("MSign handle=\"testuser\"")

    def test_no_token_no_auth_header(self) -> None:
        """Without a signing identity, no Authorization header is sent."""
        body = _mp({"repo_id": "r1", "domain": "midi", "default_branch": "main", "branch_heads": {}})
        # Records the headers dict of every call.
        calls: list[dict[str, str]] = []

        def _fake_do(
            method: str,
            url: str,
            headers: dict[str, str],
            data: bytes | None = None,
            **kw: str | int | bool,
        ) -> bytes:
            calls.append(headers)
            return body

        with unittest.mock.patch("muse.core.transport._urllib_do", side_effect=_fake_do):
            HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None)
        auth = calls[0].get("Authorization") or calls[0].get("authorization")
        assert auth is None

    def test_http_401_raises_transport_error(self) -> None:
        """A 401 propagates as ``TransportError`` with ``status_code == 401``."""
        self._assert_http_error(401, b"Unauthorized")

    def test_http_404_raises_transport_error(self) -> None:
        """A 404 propagates as ``TransportError`` with ``status_code == 404``."""
        self._assert_http_error(404, b"Not Found")

    def test_http_500_raises_transport_error(self) -> None:
        """A 500 propagates as ``TransportError`` with ``status_code == 500``."""
        self._assert_http_error(500, b"Internal Error")

    def test_url_error_raises_transport_error_with_code_0(self) -> None:
        """A ``TransportError`` with status 0 from ``_urllib_do`` propagates unchanged."""

        def _fail(
            method: str,
            url: str,
            headers: dict[str, str],
            data: bytes | None = None,
            **kw: str | int | bool,
        ) -> bytes:
            raise TransportError("Name or service not known", 0)

        with unittest.mock.patch("muse.core.transport._urllib_do", side_effect=_fail):
            with pytest.raises(TransportError) as exc_info:
                HttpTransport().fetch_remote_info("https://unreachable.invalid/r", None)
        assert exc_info.value.status_code == 0

    def test_trailing_slash_stripped_from_url(self) -> None:
        """A trailing ``/`` on the repo URL does not produce ``//refs``."""
        body = _mp({"repo_id": "r", "domain": "midi", "default_branch": "main", "branch_heads": {}})
        # Records the URL of every call.
        calls: list[str] = []

        def _fake_do(
            method: str,
            url: str,
            headers: dict[str, str],
            data: bytes | None = None,
            **kw: str | int | bool,
        ) -> bytes:
            calls.append(url)
            return body

        with unittest.mock.patch("muse.core.transport._urllib_do", side_effect=_fake_do):
            HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1/", None)
        assert calls[0] == "https://hub.example.com/repos/r1/refs"
439
440
441 # ---------------------------------------------------------------------------
442 # HttpTransport._build_request — credential security and loopback allowlist
443 # ---------------------------------------------------------------------------
444
445
class TestBuildRequest:
    """_build_request enforces HTTPS for non-loopback URLs with signing identity."""

    def _build(self, url: str, with_signing: bool = True) -> _Request:
        """Build a GET request for *url*, signed unless *with_signing* is false.

        ``muse.core.hub_trust.check_and_pin`` is patched out for the duration
        of the call.
        """
        signing = _make_signing() if with_signing else None
        with unittest.mock.patch("muse.core.hub_trust.check_and_pin"):
            return HttpTransport()._build_request("GET", url, signing)

    # ── Loopback hosts allowed over plain HTTP ────────────────────────────

    def test_localhost_http_with_token_allowed(self) -> None:
        # Must be plain http://: an https:// URL passes for any host, so it
        # would not exercise the loopback allowlist this test is named for.
        req = self._build("http://localhost:1337/repo/refs")
        assert req.headers.get("Authorization", "").startswith("MSign ")

    def test_127_0_0_1_http_with_token_allowed(self) -> None:
        req = self._build("http://127.0.0.1:10003/repo/refs")
        assert req.headers.get("Authorization", "").startswith("MSign ")

    def test_ipv6_loopback_http_with_token_allowed(self) -> None:
        req = self._build("http://[::1]:10003/repo/refs")
        assert req.headers.get("Authorization", "").startswith("MSign ")

    def test_host_docker_internal_http_with_token_allowed(self) -> None:
        """host.docker.internal is Docker Desktop's alias for the host loopback.

        Agent swarms run inside Docker and use http://host.docker.internal:10003
        to reach a local MuseHub instance. Credentials must be sent over this
        plain-HTTP connection — the traffic never leaves the machine.
        """
        req = self._build("http://host.docker.internal:10003/gabriel/repo/refs")
        assert req.headers.get("Authorization", "").startswith("MSign ")

    def test_https_any_host_with_token_allowed(self) -> None:
        req = self._build("https://musehub.ai/gabriel/repo/refs")
        assert req.headers.get("Authorization", "").startswith("MSign ")

    # ── Non-loopback HTTP with token must be rejected ─────────────────────

    def test_non_loopback_http_token_raises_transport_error(self) -> None:
        with pytest.raises(TransportError, match="non-HTTPS"):
            self._build("http://musehub.ai/gabriel/repo/refs")

    def test_arbitrary_hostname_http_token_raises(self) -> None:
        with pytest.raises(TransportError, match="non-HTTPS"):
            self._build("http://attacker.example.com/steal")

    def test_localhost_lookalike_http_token_raises(self) -> None:
        """'localhost.attacker.example.com' must NOT be mistaken for the loopback interface."""
        with pytest.raises(TransportError, match="non-HTTPS"):
            self._build("http://localhost.attacker.example.com/repo/refs")

    def test_host_docker_internal_lookalike_http_token_raises(self) -> None:
        """'host.docker.internal.attacker.example.com' must not bypass the check."""
        with pytest.raises(TransportError, match="non-HTTPS"):
            self._build("http://host.docker.internal.attacker.example.com/repo")

    # ── No token — scheme restriction does not apply ──────────────────────

    def test_non_loopback_http_without_token_allowed(self) -> None:
        req = self._build("http://musehub.ai/public/repo/refs", with_signing=False)
        assert "Authorization" not in req.headers

    # ── Request structure ─────────────────────────────────────────────────

    def test_accept_header_always_set(self) -> None:
        req = self._build("https://musehub.ai/repo/refs")
        assert "msgpack" in req.headers.get("Accept", "")

    def test_method_preserved(self) -> None:
        req = HttpTransport()._build_request("POST", "https://musehub.ai/x", None)
        assert req.method == "POST"

    def test_body_sets_content_type(self) -> None:
        req = HttpTransport()._build_request(
            "POST", "https://musehub.ai/x", None, body_bytes=b"data"
        )
        assert req.headers.get("Content-Type") == "application/x-msgpack"

    def test_no_body_omits_content_type(self) -> None:
        req = self._build("https://musehub.ai/x", with_signing=False)
        assert "Content-Type" not in req.headers
527
528
529 # ---------------------------------------------------------------------------
530 # SIGPIPE regression — large push body with early-close server
531 # ---------------------------------------------------------------------------
532
533
def _early_close_server(
    server_sock: socket.socket,
    response_code: int,
    resp_body: bytes,
) -> None:
    """Accept one connection, read HTTP headers, send response, close immediately.

    Simulates the scenario where the server sends a 4xx/5xx response while
    the client is still uploading a large request body. Without the
    ``_ignore_sigpipe`` guard, the client process dies with exit code 141
    (SIGPIPE) instead of raising ``TransportError``.

    Args:
        server_sock: A listening socket; it is always closed before returning.
        response_code: Status code written into the response status line.
        resp_body: Bytes sent as the response body (with a matching
            ``Content-Length``).
    """
    try:
        conn, _ = server_sock.accept()
        conn.settimeout(5.0)
        try:
            buf = b""
            # Monotonic clock: the 5 s budget is unaffected by wall-clock changes.
            deadline = time.monotonic() + 5
            while time.monotonic() < deadline:
                try:
                    chunk = conn.recv(4096)
                except socket.timeout:
                    break
                if not chunk:
                    # Peer closed before finishing the headers.
                    break
                buf += chunk
                if b"\r\n\r\n" in buf:
                    # End of the header section — stop reading; the rest of
                    # the request body is deliberately left unread.
                    break
            status_line = f"HTTP/1.1 {response_code} Error\r\n"
            resp_headers = (
                "Content-Type: application/json\r\n"
                f"Content-Length: {len(resp_body)}\r\n"
                "Connection: close\r\n\r\n"
            )
            conn.sendall((status_line + resp_headers).encode() + resp_body)
        finally:
            conn.close()
    except Exception:
        # Deliberately best-effort: errors in this helper are swallowed and
        # the caller's own assertions decide the test outcome.
        pass
    finally:
        server_sock.close()
575
576
class TestSigpipeRegression:
    """Regression tests for SIGPIPE on large push bodies.

    ``muse/cli/app.py`` sets ``SIGPIPE = SIG_DFL`` so that piping output to
    ``head``/``grep``/``jq`` exits cleanly. Without the ``_ignore_sigpipe``
    guard the push command dies with exit 141 when the server closes the
    connection while the client is still uploading a large body.
    """

    def _run_early_close_scenario(self, payload_mb: float, response_code: int) -> None:
        """Assert that TransportError is raised, not a process-killing SIGPIPE.

        Args:
            payload_mb: Size of the request body to upload, in MiB.
            response_code: Status the early-closing server replies with.
        """
        # Reproduce app.py's startup action — SIG_DFL kills the process on SIGPIPE.
        if hasattr(signal, "SIGPIPE"):
            original = signal.signal(signal.SIGPIPE, signal.SIG_DFL)
        else:
            original = None

        # Everything after the handler change runs under try/finally so the
        # previous SIGPIPE disposition is restored even if setup itself fails.
        try:
            body = b"X" * int(payload_mb * 1024 * 1024)
            resp_body = b'{"detail":"test error"}'

            server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                # Port 0 lets the OS pick a free port.
                server_sock.bind(("127.0.0.1", 0))
                server_sock.listen(1)
                port = server_sock.getsockname()[1]

                t = threading.Thread(
                    target=_early_close_server,
                    args=(server_sock, response_code, resp_body),
                    daemon=True,
                )
                t.start()
            except Exception:
                # The server thread never took ownership of the socket.
                server_sock.close()
                raise

            try:
                url = f"http://127.0.0.1:{port}/owner/repo/push"
                transport = HttpTransport()
                req = transport._build_request("POST", url, None, body, "application/x-msgpack")
                with pytest.raises(TransportError):
                    transport._execute(req)
            finally:
                t.join(timeout=2)
        finally:
            # signal.signal() may return None for the previous handler; in
            # that case there is nothing that can be passed back to restore.
            if original is not None:
                signal.signal(signal.SIGPIPE, original)

    def test_sigpipe_not_fatal_409_large_body(self) -> None:
        """20 MB body + server closes after headers → TransportError, not exit 141."""
        self._run_early_close_scenario(payload_mb=20.0, response_code=409)

    def test_sigpipe_not_fatal_401_large_body(self) -> None:
        """15 MB body + server sends 401 early → TransportError, not SIGPIPE crash."""
        self._run_early_close_scenario(payload_mb=15.0, response_code=401)

    def test_sigpipe_not_fatal_500_large_body(self) -> None:
        """10 MB body + server crashes (500) → TransportError, not SIGPIPE crash."""
        self._run_early_close_scenario(payload_mb=10.0, response_code=500)
File History 2 commits
sha256:79ffe87f5fe2ec146e35f05521218bbf54dffdb0440c07f970bad05f16efb89f chore: merge main — carry all urllib/typing/test fixes from dev Sonnet 4.6 minor 18 days ago
sha256:0bea7600d1eee83e87950be49933b1006fa9dc2c71e7c4ee748d324f61138156 chore: bump version to 0.2.0rc11; fix typing audit violatio… Sonnet 4.6 minor 18 days ago