"""Tests for muse.core.transport — HttpTransport and response parsers.""" from __future__ import annotations import json import signal import socket import threading import time import unittest.mock import msgpack import pytest import hashlib from muse.core.types import MsgpackDict, b64url_decode from muse.core.mpack import MPack, RemoteInfo from muse.core.msign import build_msign_header from muse.core.transport import ( _Request, HttpTransport, SigningIdentity, TransportError, _parse_mpack, _parse_push_result, _parse_remote_info, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_signing() -> "SigningIdentity": """Generate a fresh Ed25519 SigningIdentity for tests.""" from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from muse.core.transport import SigningIdentity return SigningIdentity(handle="testuser", private_key=Ed25519PrivateKey.generate()) def _mock_response( body: bytes, status: int = 200, content_type: str = "application/x-msgpack", ) -> unittest.mock.MagicMock: """Return a mock httpx response.""" resp = unittest.mock.MagicMock() resp.content = body resp.status_code = status resp.headers = {"Content-Type": content_type} return resp def _mp(data: MsgpackDict) -> bytes: """Encode data as msgpack.""" return msgpack.packb(data, use_bin_type=True) # --------------------------------------------------------------------------- # _parse_remote_info # --------------------------------------------------------------------------- class TestParseRemoteInfo: def test_valid_response(self) -> None: raw = _mp( { "repo_id": "r123", "domain": "midi", "default_branch": "main", "branch_heads": {"main": "abc123", "dev": "def456"}, } ) info = _parse_remote_info(raw) assert info["repo_id"] == "r123" assert info["domain"] == "midi" assert info["default_branch"] == "main" assert info["branch_heads"] == {"main": "abc123", "dev": "def456"} def test_invalid_msgpack_raises_transport_error(self) -> None: with pytest.raises(TransportError): _parse_remote_info(b"\xff\xff\xff\xff\xff invalid") def test_non_dict_response_returns_defaults(self) -> None: raw = _mp([1, 2, 3]) info = _parse_remote_info(raw) assert info["repo_id"] == "" assert info["branch_heads"] == {} def test_missing_fields_get_defaults(self) -> None: raw = _mp({"repo_id": "x"}) info = _parse_remote_info(raw) assert info["repo_id"] == "x" assert info["domain"] == "midi" assert info["default_branch"] == "main" assert info["branch_heads"] == {} def test_non_string_branch_heads_excluded(self) -> None: raw = _mp({"branch_heads": {"main": "abc", "bad": 123}}) info = _parse_remote_info(raw) assert "main" in info["branch_heads"] assert "bad" not in info["branch_heads"] # --------------------------------------------------------------------------- # build_msign_header — module-level signing utility # --------------------------------------------------------------------------- class TestBuildMsignHeader: def _make_signing(self) -> SigningIdentity: from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey return SigningIdentity(handle="testuser", private_key=Ed25519PrivateKey.generate()) def test_header_format(self) -> None: header = build_msign_header(self._make_signing(), "GET", "https://example.com/path", None) assert header.startswith('MSign handle="testuser"') assert " ts=" in header assert " sig=" in header def test_timestamp_is_recent(self) -> None: import time before = int(time.time()) header = build_msign_header(self._make_signing(), "GET", "https://example.com/p", None) after = int(time.time()) ts_part = next(p for p in header.split() if p.startswith("ts=")) ts = int(ts_part[3:]) assert before <= ts <= after + 1 def test_signature_is_verifiable(self) -> None: """The sig= value must verify against the Ed25519 public key for the canonical input.""" from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey private_key = Ed25519PrivateKey.generate() signing = SigningIdentity(handle="testuser", private_key=private_key) method = "POST" url = "https://hub.example.com/owner/repo/push" body = b"some body data" header = build_msign_header(signing, method, url, body) parts: dict[str, str] = {} for part in header[len("MSign "):].split(): k, _, v = part.partition("=") parts[k] = v.strip('"') ts = int(parts["ts"]) sig_bytes = b64url_decode(parts["sig"]) body_hash = "sha256:" + hashlib.sha256(body).hexdigest() canonical = f"ed25519\n{method}\nhub.example.com\n/owner/repo/push\n{ts}\n{body_hash}".encode() # raises cryptography.exceptions.InvalidSignature on failure private_key.public_key().verify(sig_bytes, canonical) def test_query_string_included_in_canonical(self) -> None: """Query parameters must be part of the signed path, not dropped.""" from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey private_key = Ed25519PrivateKey.generate() signing = SigningIdentity(handle="u", private_key=private_key) url = "https://hub.example.com/path?foo=bar&baz=1" header = build_msign_header(signing, "GET", url, None) parts: dict[str, str] = {} for part in header[len("MSign "):].split(): k, _, v = part.partition("=") parts[k] = v.strip('"') ts = int(parts["ts"]) sig_bytes = b64url_decode(parts["sig"]) body_hash = "sha256:" + hashlib.sha256(b"").hexdigest() canonical = f"ed25519\nGET\nhub.example.com\n/path?foo=bar&baz=1\n{ts}\n{body_hash}".encode() private_key.public_key().verify(sig_bytes, canonical) def test_none_body_treated_as_empty_bytes(self) -> None: """None and b'' must produce the same SHA-256 body hash in the canonical form.""" from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey private_key = Ed25519PrivateKey.generate() signing = SigningIdentity(handle="u", private_key=private_key) url = "https://hub.example.com/path" header = build_msign_header(signing, "GET", url, None) parts: dict[str, str] = {} for part in header[len("MSign "):].split(): k, _, v = part.partition("=") parts[k] = v.strip('"') ts = int(parts["ts"]) sig_bytes = b64url_decode(parts["sig"]) # body=None → b"" → sha256(b"") is the canonical body hash body_hash = "sha256:" + hashlib.sha256(b"").hexdigest() canonical = f"ed25519\nGET\nhub.example.com\n/path\n{ts}\n{body_hash}".encode() private_key.public_key().verify(sig_bytes, canonical) def test_different_methods_produce_different_headers(self) -> None: """Two calls with different methods on the same URL must differ (method is in canonical).""" from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey private_key = Ed25519PrivateKey.generate() signing = SigningIdentity(handle="u", private_key=private_key) with unittest.mock.patch("muse.core.msign.time") as mt: mt.time.return_value = 1_700_000_000 h_get = build_msign_header(signing, "GET", "https://example.com/x", b"") h_post = build_msign_header(signing, "POST", "https://example.com/x", b"") assert h_get != h_post def test_different_bodies_produce_different_sigs(self) -> None: """Body content must influence the signature (body hash is in canonical).""" from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey private_key = Ed25519PrivateKey.generate() signing = SigningIdentity(handle="u", private_key=private_key) with unittest.mock.patch("muse.core.msign.time") as mt: mt.time.return_value = 1_700_000_000 h1 = build_msign_header(signing, "POST", "https://example.com/x", b"body-a") h2 = build_msign_header(signing, "POST", "https://example.com/x", b"body-b") assert h1 != h2 def test_handle_embedded_in_header(self) -> None: """The MSign header must carry the signing identity's handle.""" from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey private_key = Ed25519PrivateKey.generate() signing = SigningIdentity(handle="my-agent-42", private_key=private_key) header = build_msign_header(signing, "GET", "https://example.com/x", None) assert 'handle="my-agent-42"' in header # --------------------------------------------------------------------------- # _parse_mpack # --------------------------------------------------------------------------- class TestParseBundle: def test_empty_msgpack_object_returns_empty_bundle(self) -> None: mpack = _parse_mpack(_mp({})) assert mpack == {} def test_non_dict_returns_empty_bundle(self) -> None: mpack = _parse_mpack(_mp([])) assert mpack == {} def test_commits_extracted(self) -> None: raw = _mp( { "commits": [ { "commit_id": "c1", "repo_id": "r1", "branch": "main", "snapshot_id": "1" * 64, "message": "test", "committed_at": "2026-01-01T00:00:00+00:00", "parent_commit_id": None, "parent2_commit_id": None, "author": "bob", "metadata": {}, } ] } ) mpack = _parse_mpack(raw) commits = mpack.get("commits") or [] assert len(commits) == 1 assert commits[0]["commit_id"] == "c1" def test_objects_extracted(self) -> None: raw = _mp( { "blobs": [ { "object_id": "abc123", "content": b"hello", } ] } ) mpack = _parse_mpack(raw) objs = mpack.get("blobs") or [] assert len(objs) == 1 assert objs[0]["object_id"] == "abc123" assert objs[0]["content"] == b"hello" def test_object_missing_content_excluded(self) -> None: raw = _mp({"blobs": [{"object_id": "abc"}]}) mpack = _parse_mpack(raw) assert (mpack.get("blobs") or []) == [] def test_branch_heads_extracted(self) -> None: raw = _mp({"branch_heads": {"main": "abc123"}}) mpack = _parse_mpack(raw) assert mpack.get("branch_heads") == {"main": "abc123"} # --------------------------------------------------------------------------- # _parse_push_result # --------------------------------------------------------------------------- class TestParsePushResult: def test_success_response(self) -> None: raw = _mp({"ok": True, "message": "pushed", "branch_heads": {"main": "abc"}}) result = _parse_push_result(raw) assert result["ok"] is True assert result["message"] == "pushed" assert result["branch_heads"] == {"main": "abc"} def test_failure_response(self) -> None: raw = _mp({"ok": False, "message": "rejected", "branch_heads": {}}) result = _parse_push_result(raw) assert result["ok"] is False assert result["message"] == "rejected" def test_non_msgpack_raises_transport_error(self) -> None: with pytest.raises(TransportError): _parse_push_result(b"\xff\xff invalid msgpack") def test_missing_ok_defaults_false(self) -> None: raw = _mp({"message": "hm", "branch_heads": {}}) result = _parse_push_result(raw) assert result["ok"] is False # --------------------------------------------------------------------------- # HttpTransport — mocked urlopen # --------------------------------------------------------------------------- def _mock_urllib_do(body: bytes, status: int = 200) -> "Callable[[str, str, dict[str, str], bytes | None], bytes]": """Return a side_effect for patching _urllib_do. _urllib_do converts HTTPError/URLError → TransportError internally, so the mock raises TransportError directly to match what callers see. """ def _side_effect(method: str, url: str, headers: "dict[str, str]", data: "bytes | None" = None, **kwargs: "str | int | bool") -> bytes: if status >= 400: raise TransportError(f"HTTP {status}", status) return body return _side_effect class TestHttpTransportFetchRemoteInfo: def test_calls_correct_endpoint(self) -> None: body = _mp({ "repo_id": "r1", "domain": "midi", "default_branch": "main", "branch_heads": {"main": "abc"}, }) calls: list[tuple] = [] def _fake_do(method: str, url: str, headers: "dict[str, str]", data: "bytes | None" = None, **kw: "str | int | bool") -> bytes: calls.append((method, url, headers)) return body with unittest.mock.patch("muse.core.transport._urllib_do", side_effect=_fake_do): info = HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None) assert calls[0][1] == "https://hub.example.com/repos/r1/refs" assert info["repo_id"] == "r1" def test_msign_header_sent(self) -> None: body = _mp({"repo_id": "r1", "domain": "midi", "default_branch": "main", "branch_heads": {}}) calls: list[tuple] = [] def _fake_do(method: str, url: str, headers: "dict[str, str]", data: "bytes | None" = None, **kw: "str | int | bool") -> bytes: calls.append(headers) return body signing = _make_signing() with unittest.mock.patch("muse.core.transport._urllib_do", side_effect=_fake_do): with unittest.mock.patch("muse.core.hub_trust.check_and_pin"): HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", signing) auth = calls[0].get("Authorization") or calls[0].get("authorization") assert auth and auth.startswith("MSign handle=\"testuser\"") def test_no_token_no_auth_header(self) -> None: body = _mp({"repo_id": "r1", "domain": "midi", "default_branch": "main", "branch_heads": {}}) calls: list[tuple] = [] def _fake_do(method: str, url: str, headers: "dict[str, str]", data: "bytes | None" = None, **kw: "str | int | bool") -> bytes: calls.append(headers) return body with unittest.mock.patch("muse.core.transport._urllib_do", side_effect=_fake_do): HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None) auth = calls[0].get("Authorization") or calls[0].get("authorization") assert auth is None def test_http_401_raises_transport_error(self) -> None: with unittest.mock.patch("muse.core.transport._urllib_do", side_effect=_mock_urllib_do(b"Unauthorized", 401)): with pytest.raises(TransportError) as exc_info: HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None) assert exc_info.value.status_code == 401 def test_http_404_raises_transport_error(self) -> None: with unittest.mock.patch("muse.core.transport._urllib_do", side_effect=_mock_urllib_do(b"Not Found", 404)): with pytest.raises(TransportError) as exc_info: HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None) assert exc_info.value.status_code == 404 def test_http_500_raises_transport_error(self) -> None: with unittest.mock.patch("muse.core.transport._urllib_do", side_effect=_mock_urllib_do(b"Internal Error", 500)): with pytest.raises(TransportError) as exc_info: HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1", None) assert exc_info.value.status_code == 500 def test_url_error_raises_transport_error_with_code_0(self) -> None: def _fail(method: str, url: str, headers: "dict[str, str]", data: "bytes | None" = None, **kw: "str | int | bool") -> bytes: raise TransportError("Name or service not known", 0) with unittest.mock.patch("muse.core.transport._urllib_do", side_effect=_fail): with pytest.raises(TransportError) as exc_info: HttpTransport().fetch_remote_info("https://unreachable.invalid/r", None) assert exc_info.value.status_code == 0 def test_trailing_slash_stripped_from_url(self) -> None: body = _mp({"repo_id": "r", "domain": "midi", "default_branch": "main", "branch_heads": {}}) calls: list[tuple] = [] def _fake_do(method: str, url: str, headers: "dict[str, str]", data: "bytes | None" = None, **kw: "str | int | bool") -> bytes: calls.append(url) return body with unittest.mock.patch("muse.core.transport._urllib_do", side_effect=_fake_do): HttpTransport().fetch_remote_info("https://hub.example.com/repos/r1/", None) assert calls[0] == "https://hub.example.com/repos/r1/refs" # --------------------------------------------------------------------------- # HttpTransport._build_request — credential security and loopback allowlist # --------------------------------------------------------------------------- class TestBuildRequest: """_build_request enforces HTTPS for non-loopback URLs with signing identity.""" def _build(self, url: str, with_signing: bool = True) -> "_Request": signing = _make_signing() if with_signing else None with unittest.mock.patch("muse.core.hub_trust.check_and_pin"): return HttpTransport()._build_request("GET", url, signing) # ── Loopback hosts allowed over plain HTTP ──────────────────────────── def test_localhost_http_with_token_allowed(self) -> None: req = self._build("https://localhost:1337/repo/refs") assert req.headers.get("Authorization", "").startswith("MSign ") def test_127_0_0_1_http_with_token_allowed(self) -> None: req = self._build("http://127.0.0.1:10003/repo/refs") assert req.headers.get("Authorization", "").startswith("MSign ") def test_ipv6_loopback_http_with_token_allowed(self) -> None: req = self._build("http://[::1]:10003/repo/refs") assert req.headers.get("Authorization", "").startswith("MSign ") def test_host_docker_internal_http_with_token_allowed(self) -> None: """host.docker.internal is Docker Desktop's alias for the host loopback. Agent swarms run inside Docker and use http://host.docker.internal:10003 to reach a local MuseHub instance. Credentials must be sent over this plain-HTTP connection — the traffic never leaves the machine. """ req = self._build("http://host.docker.internal:10003/gabriel/repo/refs") assert req.headers.get("Authorization", "").startswith("MSign ") def test_https_any_host_with_token_allowed(self) -> None: req = self._build("https://musehub.ai/gabriel/repo/refs") assert req.headers.get("Authorization", "").startswith("MSign ") # ── Non-loopback HTTP with token must be rejected ───────────────────── def test_non_loopback_http_token_raises_transport_error(self) -> None: with pytest.raises(TransportError, match="non-HTTPS"): self._build("http://musehub.ai/gabriel/repo/refs") def test_arbitrary_hostname_http_token_raises(self) -> None: with pytest.raises(TransportError, match="non-HTTPS"): self._build("http://attacker.example.com/steal") def test_localhost_lookalike_http_token_raises(self) -> None: """'localhost.attacker.example.com' must NOT be mistaken for the loopback interface.""" with pytest.raises(TransportError, match="non-HTTPS"): self._build("http://localhost.attacker.example.com/repo/refs") def test_host_docker_internal_lookalike_http_token_raises(self) -> None: """'host.docker.internal.attacker.example.com' must not bypass the check.""" with pytest.raises(TransportError, match="non-HTTPS"): self._build("http://host.docker.internal.attacker.example.com/repo") # ── No token — scheme restriction does not apply ────────────────────── def test_non_loopback_http_without_token_allowed(self) -> None: req = self._build("http://musehub.ai/public/repo/refs", with_signing=False) assert "Authorization" not in req.headers # ── Request structure ───────────────────────────────────────────────── def test_accept_header_always_set(self) -> None: req = self._build("https://musehub.ai/repo/refs") assert "msgpack" in req.headers.get("Accept", "") def test_method_preserved(self) -> None: req = HttpTransport()._build_request("POST", "https://musehub.ai/x", None) assert req.method == "POST" def test_body_sets_content_type(self) -> None: req = HttpTransport()._build_request( "POST", "https://musehub.ai/x", None, body_bytes=b"data" ) assert req.headers.get("Content-Type") == "application/x-msgpack" def test_no_body_omits_content_type(self) -> None: req = self._build("https://musehub.ai/x", with_signing=False) assert "Content-Type" not in req.headers # --------------------------------------------------------------------------- # SIGPIPE regression — large push body with early-close server # --------------------------------------------------------------------------- def _early_close_server( server_sock: socket.socket, response_code: int, resp_body: bytes, ) -> None: """Accept one connection, read HTTP headers, send response, close immediately. Simulates the scenario where the server sends a 4xx/5xx response while the client is still uploading a large request body. Without the ``_ignore_sigpipe`` guard, the client process dies with exit code 141 (SIGPIPE) instead of raising ``TransportError``. """ try: conn, _ = server_sock.accept() conn.settimeout(5.0) try: buf = b"" deadline = time.time() + 5 while time.time() < deadline: try: chunk = conn.recv(4096) if not chunk: break buf += chunk if b"\r\n\r\n" in buf: break except socket.timeout: break status_line = f"HTTP/1.1 {response_code} Error\r\n" resp_headers = ( "Content-Type: application/json\r\n" f"Content-Length: {len(resp_body)}\r\n" "Connection: close\r\n\r\n" ) conn.sendall((status_line + resp_headers).encode() + resp_body) finally: conn.close() except Exception: pass finally: server_sock.close() class TestSigpipeRegression: """Regression tests for SIGPIPE on large push bodies. ``muse/cli/app.py`` sets ``SIGPIPE = SIG_DFL`` so that piping output to ``head``/``grep``/``jq`` exits cleanly. Without the ``_ignore_sigpipe`` guard the push command dies with exit 141 when the server closes the connection while the client is still uploading a large body. """ def _run_early_close_scenario(self, payload_mb: float, response_code: int) -> None: """Assert that TransportError is raised, not a process-killing SIGPIPE.""" # Reproduce app.py's startup action — SIG_DFL kills the process on SIGPIPE. if hasattr(signal, "SIGPIPE"): original = signal.signal(signal.SIGPIPE, signal.SIG_DFL) else: original = None body = b"X" * int(payload_mb * 1024 * 1024) resp_body = b'{"detail":"test error"}' server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_sock.bind(("127.0.0.1", 0)) server_sock.listen(1) port = server_sock.getsockname()[1] t = threading.Thread( target=_early_close_server, args=(server_sock, response_code, resp_body), daemon=True, ) t.start() try: url = f"http://127.0.0.1:{port}/owner/repo/push" transport = HttpTransport() req = transport._build_request("POST", url, None, body, "application/x-msgpack") with pytest.raises(TransportError): transport._execute(req) finally: t.join(timeout=2) if original is not None and hasattr(signal, "SIGPIPE"): signal.signal(signal.SIGPIPE, original) def test_sigpipe_not_fatal_409_large_body(self) -> None: """20 MB body + server closes after headers → TransportError, not exit 141.""" self._run_early_close_scenario(payload_mb=20.0, response_code=409) def test_sigpipe_not_fatal_401_large_body(self) -> None: """15 MB body + server sends 401 early → TransportError, not SIGPIPE crash.""" self._run_early_close_scenario(payload_mb=15.0, response_code=401) def test_sigpipe_not_fatal_500_large_body(self) -> None: """10 MB body + server crashes (500) → TransportError, not SIGPIPE crash.""" self._run_early_close_scenario(payload_mb=10.0, response_code=500)