"""Tests for muse.core.coord_bus — coordination bus HTTP client. Covers all acceptance criteria: Unit: - _build_url: owner/slug %-encoding, path traversal prevention - push_to_hub: request body format, token header, response parsing - pull_from_hub: request body format, cursor, kind filter, limit Integration (mock HTTP): - Successful push returns inserted/skipped counts - Successful pull returns records + cursor - 401 produces CoordBusError with "Authentication failed" - 404 produces CoordBusError with status_code=404 - Network error produces CoordBusError with status_code=0 - Oversized response produces CoordBusError Security: - Owner with path traversal (../etc) is %-encoded in URL - Slug with special chars is %-encoded in URL - Redirect refused (status_code propagated) - Token never appears in error messages (401 body masked) Validation: - push_to_hub with empty records raises ValueError - push_to_hub with too many records raises ValueError - pull_from_hub with out-of-range limit raises ValueError Stress: - 500-record push payload serializes correctly - 1000-record pull response parsed correctly - 100 sequential push_to_hub calls with mock (sub-second) """ from __future__ import annotations import itertools import json import time from contextlib import AbstractContextManager from io import BytesIO from unittest.mock import MagicMock, patch import pytest from muse.core.types import MsgpackDict, content_hash _id_seq = itertools.count() def _new_id() -> str: return content_hash({"seq": next(_id_seq)}) from muse.core.coord_bus import ( CoordBusError, MAX_PULL_LIMIT, MAX_PUSH_BATCH, _build_url, pull_from_hub, push_to_hub, ) # ── Helpers ──────────────────────────────────────────────────────────────────── def _make_signing() -> "SigningIdentity": from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from muse.core.transport import SigningIdentity return SigningIdentity(handle="testuser", private_key=Ed25519PrivateKey.generate()) def _mock_response(body: MsgpackDict, status: int = 200) -> MagicMock: """Build a mock urllib response object for ``_open_url``.""" raw = json.dumps(body).encode("utf-8") resp = MagicMock() resp.read.return_value = raw resp.__enter__ = lambda s: s resp.__exit__ = MagicMock(return_value=False) return resp def _patch_open(body: MsgpackDict, status: int = 200) -> AbstractContextManager[MagicMock]: """Context manager that patches ``_open_url`` in coord_bus.""" return patch( "muse.core.coord_bus._STRICT_OPENER.open", return_value=_mock_response(body, status), ) def _make_record(kind: str = "reservation") -> MsgpackDict: return { "kind": kind, "record_id": _new_id(), "run_id": "agent-1", "payload": {"note": "test"}, } # ── Unit: _build_url ─────────────────────────────────────────────────────────── class TestBuildUrl: def test_basic_url(self) -> None: url = _build_url("https://localhost:1337", "gabriel", "myrepo", "coord/push") assert url == "https://localhost:1337/gabriel/myrepo/coord/push" def test_trailing_slash_stripped_from_hub(self) -> None: url = _build_url("https://localhost:1337/", "gabriel", "myrepo", "coord/push") assert url == "https://localhost:1337/gabriel/myrepo/coord/push" def test_owner_percent_encoded(self) -> None: url = _build_url("https://localhost:1337", "../traversal", "myrepo", "coord/push") assert "../traversal" not in url assert "%2F" in url or "%2E%2E" in url or "..%2Ftraversal" in url def test_slug_percent_encoded(self) -> None: url = _build_url("https://localhost:1337", "gabriel", "my repo", "coord/push") assert " " not in url assert "my%20repo" in url def test_path_traversal_in_owner_encoded(self) -> None: url = _build_url("http://hub", "../../etc", "passwd", "coord/push") # Internal slashes within the owner component MUST be %-encoded so the # owner cannot span multiple path segments (RFC 3986 — %2F ≠ path sep). # The URL structure is /owner/slug/endpoint — only structural '/' chars # are literal; internal ones become %2F. assert "%2F" in url # slashes within owner are encoded # There must be no more than 4 literal slashes: after scheme + host + 3 path seps. path = url.split("://", 1)[1] # strip scheme path_part = path.split("/", 1)[1] if "/" in path else path # strip host # In the path, the ONLY literal '/' must be the owner/slug/endpoint separators. # The traversal '..' must not produce new path segments. assert "..%2F" in url or "%2F.." in url def test_special_chars_in_slug(self) -> None: url = _build_url("http://hub", "gabriel", "my@repo", "coord/pull") assert "my%40repo" in url def test_push_and_pull_endpoints(self) -> None: push_url = _build_url("http://hub", "u", "r", "coord/push") pull_url = _build_url("http://hub", "u", "r", "coord/pull") assert push_url.endswith("coord/push") assert pull_url.endswith("coord/pull") # ── Unit: push_to_hub ───────────────────────────────────────────────────────── class TestPushToHub: def test_push_returns_inserted_skipped(self) -> None: records = [_make_record()] with _patch_open({"inserted": 1, "skipped": 0}): result = push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing()) assert result["inserted"] == 1 assert result["skipped"] == 0 def test_push_empty_records_raises(self) -> None: with pytest.raises(ValueError, match="non-empty"): push_to_hub("http://hub", "gabriel", "repo", [], signing=_make_signing()) def test_push_too_many_records_raises(self) -> None: records = [_make_record() for _ in range(MAX_PUSH_BATCH + 1)] with pytest.raises(ValueError, match="maximum batch size"): push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing()) def test_push_sends_authorization_header(self) -> None: records = [_make_record()] with patch("muse.core.coord_bus._STRICT_OPENER.open") as mock_open: mock_open.return_value = _mock_response({"inserted": 1, "skipped": 0}) push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing()) req = mock_open.call_args[0][0] assert req.get_header("Authorization").startswith("MSign ") def test_push_sends_correct_content_type(self) -> None: records = [_make_record()] with patch("muse.core.coord_bus._STRICT_OPENER.open") as mock_open: mock_open.return_value = _mock_response({"inserted": 1, "skipped": 0}) push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing()) req = mock_open.call_args[0][0] assert "application/json" in req.get_header("Content-type") def test_push_request_body_structure(self) -> None: records = [_make_record()] captured = {} with patch("muse.core.coord_bus._STRICT_OPENER.open") as mock_open: mock_open.return_value = _mock_response({"inserted": 1, "skipped": 0}) push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing()) req = mock_open.call_args[0][0] captured["body"] = json.loads(req.data) assert "records" in captured["body"] assert len(captured["body"]["records"]) == 1 def test_push_401_raises_with_auth_message(self) -> None: import urllib.error records = [_make_record()] err = urllib.error.HTTPError( "http://hub", 401, "Unauthorized", {}, BytesIO(b"token leaked here") ) with patch("muse.core.coord_bus._STRICT_OPENER.open", side_effect=err): with pytest.raises(CoordBusError) as exc_info: push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing()) # 401 body must NOT appear in the message (credentials could be reflected). assert "token leaked here" not in str(exc_info.value) assert "Authentication failed" in str(exc_info.value) assert exc_info.value.status_code == 401 def test_push_404_raises_with_status_code(self) -> None: import urllib.error records = [_make_record()] err = urllib.error.HTTPError("http://hub", 404, "Not Found", {}, BytesIO(b"not found")) with patch("muse.core.coord_bus._STRICT_OPENER.open", side_effect=err): with pytest.raises(CoordBusError) as exc_info: push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing()) assert exc_info.value.status_code == 404 def test_push_network_error_raises(self) -> None: import urllib.error records = [_make_record()] err = urllib.error.URLError("Connection refused") with patch("muse.core.coord_bus._STRICT_OPENER.open", side_effect=err): with pytest.raises(CoordBusError) as exc_info: push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing()) assert exc_info.value.status_code == 0 def test_push_oversized_response_raises(self) -> None: from muse.core.coord_bus import MAX_COORD_RESPONSE_BYTES huge_body = b"x" * (MAX_COORD_RESPONSE_BYTES + 2) resp = MagicMock() resp.read.return_value = huge_body resp.__enter__ = lambda s: s resp.__exit__ = MagicMock(return_value=False) records = [_make_record()] with patch("muse.core.coord_bus._STRICT_OPENER.open", return_value=resp): with pytest.raises(CoordBusError, match="exceeded"): push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing()) def test_push_max_batch_exactly_accepted(self) -> None: records = [_make_record() for _ in range(MAX_PUSH_BATCH)] with _patch_open({"inserted": MAX_PUSH_BATCH, "skipped": 0}): result = push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing()) assert result["inserted"] == MAX_PUSH_BATCH def test_push_no_token_does_not_send_auth_header(self) -> None: records = [_make_record()] with patch("muse.core.coord_bus._STRICT_OPENER.open") as mock_open: mock_open.return_value = _mock_response({"inserted": 1, "skipped": 0}) push_to_hub("http://hub", "gabriel", "repo", records, signing=None) req = mock_open.call_args[0][0] assert req.get_header("Authorization") is None # ── Unit: pull_from_hub ─────────────────────────────────────────────────────── class TestPullFromHub: def test_pull_returns_records_and_cursor(self) -> None: uid = _new_id() records = [{"id": 1, "kind": "reservation", "record_id": uid, "payload": {}}] with _patch_open({"records": records, "cursor": 1}): result = pull_from_hub("http://hub", "gabriel", "repo", signing=_make_signing()) assert len(result["records"]) == 1 assert result["cursor"] == 1 def test_pull_out_of_range_limit_raises(self) -> None: with pytest.raises(ValueError, match="limit must be"): pull_from_hub("http://hub", "gabriel", "repo", limit=0) with pytest.raises(ValueError, match="limit must be"): pull_from_hub("http://hub", "gabriel", "repo", limit=MAX_PULL_LIMIT + 1) def test_pull_sends_correct_body(self) -> None: captured = {} with patch("muse.core.coord_bus._STRICT_OPENER.open") as mock_open: mock_open.return_value = _mock_response({"records": [], "cursor": 0}) pull_from_hub( "http://hub", "gabriel", "repo", since_id=42, kinds=["reservation"], limit=100, signing=_make_signing(), ) req = mock_open.call_args[0][0] captured["body"] = json.loads(req.data) assert captured["body"]["since_id"] == 42 assert "reservation" in captured["body"]["kinds"] assert captured["body"]["limit"] == 100 def test_pull_empty_kinds_sends_empty_list(self) -> None: captured = {} with patch("muse.core.coord_bus._STRICT_OPENER.open") as mock_open: mock_open.return_value = _mock_response({"records": [], "cursor": 0}) pull_from_hub("http://hub", "gabriel", "repo", signing=_make_signing()) req = mock_open.call_args[0][0] captured["body"] = json.loads(req.data) assert captured["body"]["kinds"] == [] def test_pull_401_raises_with_masked_body(self) -> None: import urllib.error err = urllib.error.HTTPError( "http://hub", 401, "Unauthorized", {}, BytesIO(b"secret_token_here") ) with patch("muse.core.coord_bus._STRICT_OPENER.open", side_effect=err): with pytest.raises(CoordBusError) as exc_info: pull_from_hub("http://hub", "gabriel", "repo", signing=_make_signing()) assert "secret_token_here" not in str(exc_info.value) assert exc_info.value.status_code == 401 def test_pull_network_error(self) -> None: import urllib.error err = urllib.error.URLError("Name resolution failure") with patch("muse.core.coord_bus._STRICT_OPENER.open", side_effect=err): with pytest.raises(CoordBusError) as exc_info: pull_from_hub("http://hub", "gabriel", "repo", signing=_make_signing()) assert exc_info.value.status_code == 0 def test_pull_default_since_id_is_zero(self) -> None: captured = {} with patch("muse.core.coord_bus._STRICT_OPENER.open") as mock_open: mock_open.return_value = _mock_response({"records": [], "cursor": 0}) pull_from_hub("http://hub", "gabriel", "repo") req = mock_open.call_args[0][0] captured["body"] = json.loads(req.data) assert captured["body"]["since_id"] == 0 def test_pull_exact_max_limit_accepted(self) -> None: with _patch_open({"records": [], "cursor": 0}): result = pull_from_hub( "http://hub", "gabriel", "repo", limit=MAX_PULL_LIMIT ) assert result["cursor"] == 0 # ── Security tests ───────────────────────────────────────────────────────────── class TestCoordBusSecurity: def test_redirect_refused(self) -> None: """_NoRedirectHandler refuses all redirects.""" import urllib.error from muse.core.coord_bus import _NoRedirectHandler handler = _NoRedirectHandler() req_mock = MagicMock() req_mock.full_url = "http://hub/push" fp_mock = MagicMock() headers_mock = MagicMock() with pytest.raises(urllib.error.HTTPError) as exc_info: handler.redirect_request( req_mock, fp_mock, 301, "Moved Permanently", headers_mock, "http://other-host/malicious" ) assert "Redirect refused" in str(exc_info.value.msg) def test_401_body_never_exposed(self) -> None: """401 response body must not appear in CoordBusError message.""" import urllib.error sensitive = "SENSITIVE_CREDENTIAL_DATA" err = urllib.error.HTTPError("http://hub", 401, "Unauth", {}, BytesIO(sensitive.encode())) records = [_make_record()] with patch("muse.core.coord_bus._STRICT_OPENER.open", side_effect=err): with pytest.raises(CoordBusError) as exc_info: push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing()) assert "SENSITIVE_TOKEN" not in str(exc_info.value) def test_signing_not_in_url(self) -> None: """Signing identity must appear only in Authorization header, never in the URL.""" records = [_make_record()] with patch("muse.core.coord_bus._STRICT_OPENER.open") as mock_open: mock_open.return_value = _mock_response({"inserted": 1, "skipped": 0}) si = _make_signing() push_to_hub("http://hub", "gabriel", "repo", records, signing=si) req = mock_open.call_args[0][0] assert "testuser" not in req.full_url def test_invalid_json_response_raises(self) -> None: """Non-JSON response body raises CoordBusError, not unhandled exception.""" resp = MagicMock() resp.read.return_value = b"not json at all %%" resp.__enter__ = lambda s: s resp.__exit__ = MagicMock(return_value=False) records = [_make_record()] with patch("muse.core.coord_bus._STRICT_OPENER.open", return_value=resp): with pytest.raises(CoordBusError, match="Invalid JSON"): push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing()) # ── Stress tests ─────────────────────────────────────────────────────────────── class TestCoordBusStress: def test_push_500_records_serializes_fast(self) -> None: """500-record batch push serializes and deserializes in < 1 second.""" records = [_make_record() for _ in range(500)] t0 = time.monotonic() with _patch_open({"inserted": 500, "skipped": 0}): result = push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing()) elapsed = time.monotonic() - t0 assert result["inserted"] == 500 assert elapsed < 1.0, f"500-record push took {elapsed:.2f}s" def test_pull_1000_records_parsed_fast(self) -> None: """Parsing a 1000-record pull response completes in < 1 second.""" uid_list = [_new_id() for _ in range(1000)] records = [ {"id": i + 1, "kind": "reservation", "record_id": uid, "payload": {}} for i, uid in enumerate(uid_list) ] t0 = time.monotonic() with _patch_open({"records": records, "cursor": 1000}): result = pull_from_hub("http://hub", "gabriel", "repo", limit=1000) elapsed = time.monotonic() - t0 assert len(result["records"]) == 1000 assert elapsed < 1.0, f"1000-record pull took {elapsed:.2f}s" def test_100_sequential_push_calls_with_mock(self) -> None: """100 sequential push_to_hub calls complete in < 2 seconds with mocked HTTP.""" records = [_make_record()] t0 = time.monotonic() for _ in range(100): with _patch_open({"inserted": 1, "skipped": 0}): push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing()) elapsed = time.monotonic() - t0 assert elapsed < 2.0, f"100 sequential pushes took {elapsed:.2f}s" def test_build_url_100k_calls_fast(self) -> None: """_build_url is called frequently; 100k calls must complete in < 1s.""" t0 = time.monotonic() for _ in range(100_000): _build_url("https://localhost:1337", "gabriel", "myrepo", "coord/push") elapsed = time.monotonic() - t0 assert elapsed < 1.0, f"100k _build_url calls took {elapsed:.2f}s"