"""TDD tests for two _hub_api hardening fixes. Bug A — localhost SSL --------------------- ``_hub_api`` uses bare ``urllib.request.urlopen`` without an SSL context. For ``https://localhost:1337`` (self-signed cert), this raises ``CERTIFICATE_VERIFY_FAILED`` even though the cert IS trusted by muse (the same CA cert that ``muse push`` loads via ``transport._httpx_verify`` is stored at ``musehub/deploy/local-tls/localhost.crt``). Expected: ``_hub_api`` must build an SSL context that loads the local CA cert for localhost URLs, mirroring ``transport._httpx_verify``. Bug B — unauthenticated GET on public resources ----------------------------------------------- ``_hub_api`` exits non-zero when no signing identity is found, even for GET requests on public repos (e.g. reading issues on a public MuseHub repo). Expected: for GET requests, if no signing identity is available, proceed without an ``Authorization`` header rather than exiting with an error. Mutating requests (POST/PUT/DELETE/PATCH) still require auth. Seven test tiers ---------------- Unit — function-level mocks, no network Contract — mock HTTP server, verify wire behaviour Integration — CliRunner through the real command chain Property — hypothesis: any localhost URL gets a non-default SSL context Regression — previously failing cases pinned as non-regressions Security — auth header absent on public GET; present on mutating requests Stress — 8 threads calling _hub_api concurrently, no deadlock """ from __future__ import annotations import io import json import ssl import threading import urllib.error import urllib.request from typing import TYPE_CHECKING from unittest.mock import MagicMock, call, patch import pytest from hypothesis import given, settings from hypothesis import strategies as st from muse.core.types import fake_id from tests.cli_test_helper import CliRunner type _IdentityDict = dict[str, str] if TYPE_CHECKING: pass runner = CliRunner() # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- _IDENTITY: _IdentityDict = {"type": "human", "handle": "gabriel"} _HUB_LOCAL = "https://localhost:1337" _HUB_REMOTE = "https://staging.musehub.ai" def _mock_resp(body: bytes = b'{"ok": true}') -> MagicMock: resp = MagicMock() resp.__enter__ = lambda s: s resp.__exit__ = MagicMock(return_value=False) resp.read.return_value = body return resp def _make_signing() -> MagicMock: from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey key = MagicMock(spec=Ed25519PrivateKey) key.sign.return_value = b"\x00" * 64 key.public_key.return_value.public_bytes.return_value = b"\x00" * 32 from muse.core.transport import SigningIdentity return SigningIdentity(handle="gabriel", private_key=key) # =========================================================================== # Tier 1 — Unit # =========================================================================== class TestHubApiSslContextUnit: """_hub_api selects the correct SSL context for each host.""" def test_localhost_https_gets_custom_ssl_context(self) -> None: """Bug A: localhost HTTPS must use local CA cert, not system store.""" from muse.cli.commands.hub._core import _hub_api captured_kwargs: list[dict] = [] def fake_urlopen(req: urllib.request.Request, timeout: float = 10, context: ssl.SSLContext | None = None) -> MagicMock: captured_kwargs.append({"context": context}) return _mock_resp() with patch("muse.cli.config.get_signing_identity", return_value=_make_signing()): with patch("urllib.request.urlopen", side_effect=fake_urlopen): _hub_api(_HUB_LOCAL, _IDENTITY, "GET", "/api/test") assert captured_kwargs, "urlopen was not called" ctx = captured_kwargs[0]["context"] assert ctx is not None, ( "_hub_api passed context=None to urlopen for localhost — " "self-signed cert will be rejected by the OS CA store" ) assert isinstance(ctx, ssl.SSLContext), ( f"Expected ssl.SSLContext, got {type(ctx)}" ) def test_remote_https_uses_system_ca(self) -> None: """Non-localhost HTTPS must use the system CA store (context=None).""" from muse.cli.commands.hub._core import _hub_api captured_kwargs: list[dict] = [] def fake_urlopen(req: urllib.request.Request, timeout: float = 10, context: ssl.SSLContext | None = None) -> MagicMock: captured_kwargs.append({"context": context}) return _mock_resp() with patch("muse.cli.config.get_signing_identity", return_value=_make_signing()): with patch("urllib.request.urlopen", side_effect=fake_urlopen): _hub_api(_HUB_REMOTE, _IDENTITY, "GET", "/api/test") assert captured_kwargs ctx = captured_kwargs[0]["context"] assert ctx is None, ( "Non-localhost hub must use system CA (context=None), " f"but got {ctx!r}" ) def test_127_0_0_1_https_gets_custom_ssl_context(self) -> None: """127.0.0.1 is loopback — also gets the local CA cert context.""" from muse.cli.commands.hub._core import _hub_api captured_kwargs: list[dict] = [] def fake_urlopen(req: urllib.request.Request, timeout: float = 10, context: ssl.SSLContext | None = None) -> MagicMock: captured_kwargs.append({"context": context}) return _mock_resp() with patch("muse.cli.config.get_signing_identity", return_value=_make_signing()): with patch("urllib.request.urlopen", side_effect=fake_urlopen): _hub_api("https://127.0.0.1:1337", _IDENTITY, "GET", "/api/test") ctx = captured_kwargs[0]["context"] assert ctx is not None and isinstance(ctx, ssl.SSLContext) class TestHubApiPublicGetUnit: """Bug B: unauthenticated GET on public endpoint proceeds without auth.""" def test_get_without_signing_proceeds(self) -> None: """No signing identity → GET proceeds without Authorization header.""" from muse.cli.commands.hub._core import _hub_api captured_headers: list[dict] = [] def fake_urlopen(req: urllib.request.Request, timeout: float = 10, context: ssl.SSLContext | None = None) -> MagicMock: captured_headers.append(dict(req.headers)) return _mock_resp() with patch("muse.cli.config.get_signing_identity", return_value=None): with patch("urllib.request.urlopen", side_effect=fake_urlopen): result = _hub_api("http://localhost:9999", _IDENTITY, "GET", "/api/test") assert result == {"ok": True} assert captured_headers, "urlopen was never called" assert "Authorization" not in captured_headers[0], ( "Authorization header must not be sent when no signing identity is available" ) def test_get_without_signing_does_not_exit(self) -> None: """Bug B: no signing identity on GET must NOT raise SystemExit.""" from muse.cli.commands.hub._core import _hub_api with patch("muse.cli.config.get_signing_identity", return_value=None): with patch("urllib.request.urlopen", return_value=_mock_resp()): # Must not raise SystemExit result = _hub_api("http://localhost:9999", _IDENTITY, "GET", "/api/test") assert isinstance(result, dict) def test_post_without_signing_exits(self) -> None: """POST without signing identity must still exit — mutating ops need auth.""" from muse.cli.commands.hub._core import _hub_api with patch("muse.cli.config.get_signing_identity", return_value=None): with patch("urllib.request.urlopen") as mock_net: with pytest.raises(SystemExit): _hub_api("http://localhost:9999", _IDENTITY, "POST", "/api/test", body={"key": "val"}) mock_net.assert_not_called() def test_delete_without_signing_exits(self) -> None: """DELETE without signing must exit.""" from muse.cli.commands.hub._core import _hub_api with patch("muse.cli.config.get_signing_identity", return_value=None): with patch("urllib.request.urlopen") as mock_net: with pytest.raises(SystemExit): _hub_api("http://localhost:9999", _IDENTITY, "DELETE", "/api/test") mock_net.assert_not_called() def test_get_with_signing_includes_auth_header(self) -> None: """When signing IS available, GET requests include Authorization.""" from muse.cli.commands.hub._core import _hub_api captured_headers: list[dict] = [] def fake_urlopen(req: urllib.request.Request, timeout: float = 10, context: ssl.SSLContext | None = None) -> MagicMock: captured_headers.append(dict(req.headers)) return _mock_resp() with patch("muse.cli.config.get_signing_identity", return_value=_make_signing()): with patch("urllib.request.urlopen", side_effect=fake_urlopen): _hub_api("http://localhost:9999", _IDENTITY, "GET", "/api/test") assert "Authorization" in captured_headers[0], ( "Authorization header must be included when signing identity is available" ) # =========================================================================== # Tier 2 — Contract (mock HTTP server verifying wire behaviour) # =========================================================================== class TestHubApiContract: """Verify wire-level behaviour through a mock HTTP handler.""" def test_public_get_sends_no_auth_header(self) -> None: """Wire: GET to public endpoint carries no Authorization field.""" from muse.cli.commands.hub._core import _hub_api received: list[dict] = [] def fake_urlopen(req: urllib.request.Request, timeout: float = 10, context: ssl.SSLContext | None = None) -> MagicMock: received.append({"headers": dict(req.headers), "method": req.get_method()}) return _mock_resp(b'{"number": 5, "title": "muse bridge"}') with patch("muse.cli.config.get_signing_identity", return_value=None): with patch("urllib.request.urlopen", side_effect=fake_urlopen): result = _hub_api("http://localhost:9999", _IDENTITY, "GET", "/gabriel/musehub/issues/5") assert received[0]["method"] == "GET" assert "Authorization" not in received[0]["headers"] assert result.get("number") == 5 def test_authenticated_get_sends_msign_header(self) -> None: """Wire: GET with identity carries MSign Authorization header.""" from muse.cli.commands.hub._core import _hub_api received: list[dict] = [] def fake_urlopen(req: urllib.request.Request, timeout: float = 10, context: ssl.SSLContext | None = None) -> MagicMock: received.append({"headers": dict(req.headers)}) return _mock_resp() with patch("muse.cli.config.get_signing_identity", return_value=_make_signing()): with patch("urllib.request.urlopen", side_effect=fake_urlopen): _hub_api("http://localhost:9999", _IDENTITY, "GET", "/api/test") auth = received[0]["headers"].get("Authorization", "") assert auth.startswith("MSign "), f"Expected MSign header, got: {auth!r}" def test_localhost_ssl_context_cafile_set(self) -> None: """Wire: SSL context passed to urlopen for localhost loads the CA file.""" from muse.cli.commands.hub._core import _hub_api import pathlib local_cert = ( pathlib.Path(__file__).parent.parent.parent / "musehub" / "deploy" / "local-tls" / "localhost.crt" ) captured_ctx: list[ssl.SSLContext | None] = [] def fake_urlopen(req: urllib.request.Request, timeout: float = 10, context: ssl.SSLContext | None = None) -> MagicMock: captured_ctx.append(context) return _mock_resp() with patch("muse.cli.config.get_signing_identity", return_value=_make_signing()): with patch("urllib.request.urlopen", side_effect=fake_urlopen): _hub_api(_HUB_LOCAL, _IDENTITY, "GET", "/api/test") ctx = captured_ctx[0] assert ctx is not None # The context must have the local cert loaded — verify it accepts localhost if local_cert.exists(): # Build the same context directly and compare verify_mode expected = ssl.create_default_context(cafile=str(local_cert)) assert ctx.verify_mode == expected.verify_mode # =========================================================================== # Tier 3 — Integration (CliRunner through real command chain) # =========================================================================== class TestHubIssueReadPublicIntegration: """muse hub issue read on a public repo works without a signing identity.""" _FAKE_REPO_ID = fake_id("repo") def test_issue_read_public_no_auth_succeeds(self) -> None: """Integration: hub issue read proceeds without signing identity.""" issue_body = json.dumps({ "number": 5, "title": "muse bridge", "body": "Implement muse bridge for git interop", "state": "open", "labels": [], }).encode() with patch("muse.cli.config.get_signing_identity", return_value=None): with patch( "muse.cli.commands.hub._resolve_repo_id", return_value=self._FAKE_REPO_ID, ): with patch("urllib.request.urlopen", return_value=_mock_resp(issue_body)): result = runner.invoke( None, ["hub", "issue", "read", "5", "--hub", "http://localhost:9999/gabriel/musehub", "--json"], ) assert result.exit_code == 0, ( f"exit_code={result.exit_code}\n{result.output}\n{result.stderr}" ) data = json.loads(result.output) assert data.get("number") == 5 or "title" in data or data.get("exit_code") == 0 def test_issue_list_public_no_auth_succeeds(self) -> None: """Integration: hub issue list on public repo proceeds without auth.""" issues_body = json.dumps({ "issues": [{"number": 5, "title": "muse bridge", "state": "open", "labels": []}], "total": 1, }).encode() with patch("muse.cli.config.get_signing_identity", return_value=None): with patch( "muse.cli.commands.hub._resolve_repo_id", return_value=self._FAKE_REPO_ID, ): with patch("urllib.request.urlopen", return_value=_mock_resp(issues_body)): result = runner.invoke( None, ["hub", "issue", "list", "--hub", "http://localhost:9999/gabriel/musehub", "--json"], ) # Must not fail with "not authenticated" error assert "signing" not in result.stderr.lower() assert "not authenticated" not in result.stderr.lower() # =========================================================================== # Tier 4 — Property (hypothesis) # =========================================================================== class TestHubApiSslContextProperty: """Property: any localhost/127 URL produces a non-None SSL context.""" @given( port=st.integers(min_value=1024, max_value=65535), path=st.text( alphabet=st.characters(whitelist_categories=("Lu", "Ll", "Nd"), whitelist_characters="/-_"), min_size=1, max_size=40, ), ) @settings(max_examples=20, deadline=2000) def test_localhost_always_gets_custom_context(self, port: int, path: str) -> None: """Any https://localhost: must produce a non-None SSL context.""" from muse.cli.commands.hub._core import _hub_api captured: list[ssl.SSLContext | None] = [] def fake_urlopen(req: urllib.request.Request, timeout: float = 10, context: ssl.SSLContext | None = None) -> MagicMock: captured.append(context) return _mock_resp() url = f"https://localhost:{port}" with patch("muse.cli.config.get_signing_identity", return_value=_make_signing()): with patch("urllib.request.urlopen", side_effect=fake_urlopen): try: _hub_api(url, _IDENTITY, "GET", f"/{path.lstrip('/')}") except SystemExit: pass # scheme or URL validation may exit — that's fine if captured: assert captured[0] is not None, ( f"localhost:{port} got None SSL context — self-signed cert will be rejected" ) @given(method=st.sampled_from(["POST", "PUT", "DELETE", "PATCH"])) @settings(max_examples=10, deadline=2000) def test_mutating_methods_require_auth(self, method: str) -> None: """Property: mutating HTTP methods always require a signing identity.""" from muse.cli.commands.hub._core import _hub_api with patch("muse.cli.config.get_signing_identity", return_value=None): with patch("urllib.request.urlopen") as mock_net: with pytest.raises(SystemExit): _hub_api("http://localhost:9999", _IDENTITY, method, "/api/test", body={"x": "y"} if method != "DELETE" else None) mock_net.assert_not_called() # =========================================================================== # Tier 5 — Regression (pinned non-regressions) # =========================================================================== class TestHubApiRegression: """Pinned regressions: previously broken cases must stay fixed.""" def test_R1_hub_issue_read_no_ssl_error_on_localhost(self) -> None: """R1: Reading an issue from https://localhost must not raise SSL error.""" from muse.cli.commands.hub._core import _hub_api ssl_errors: list[Exception] = [] def fake_urlopen(req: urllib.request.Request, timeout: float = 10, context: ssl.SSLContext | None = None) -> MagicMock: # Simulate what happens WITHOUT a proper context — verify the fix # prevents this from reaching urlopen with the system CA store. if context is None: ssl_errors.append( ssl.SSLCertVerificationError("CERTIFICATE_VERIFY_FAILED") ) return _mock_resp(b'{"number": 5}') with patch("muse.cli.config.get_signing_identity", return_value=_make_signing()): with patch("urllib.request.urlopen", side_effect=fake_urlopen): _hub_api(_HUB_LOCAL, _IDENTITY, "GET", "/gabriel/musehub/issues/5") assert not ssl_errors, ( f"R1 regression: _hub_api passed context=None to urlopen on localhost, " f"which would cause CERTIFICATE_VERIFY_FAILED: {ssl_errors}" ) def test_R2_public_issue_read_no_exit_without_identity(self) -> None: """R2: hub issue read on public repo must NOT exit 1 when no identity set.""" from muse.cli.commands.hub._core import _hub_api with patch("muse.cli.config.get_signing_identity", return_value=None): with patch("urllib.request.urlopen", return_value=_mock_resp(b'{"number": 5}')): # Must not raise SystemExit result = _hub_api("http://localhost:9999", _IDENTITY, "GET", "/api/issues/5") assert result == {"number": 5} def test_R3_file_scheme_still_blocked(self) -> None: """R3: file:// scheme is still blocked regardless of signing identity.""" from muse.cli.commands.hub._core import _hub_api with patch("urllib.request.urlopen") as mock_net: with pytest.raises(SystemExit): _hub_api("file:///etc/passwd", _IDENTITY, "GET", "/test") mock_net.assert_not_called() def test_R4_response_size_cap_still_enforced(self) -> None: """R4: size cap still applies on unauthenticated GET responses.""" from muse.cli.commands.hub._core import _hub_api, _MAX_API_RESPONSE_BYTES big_resp = _mock_resp(b"x" * (_MAX_API_RESPONSE_BYTES + 2)) with patch("muse.cli.config.get_signing_identity", return_value=None): with patch("urllib.request.urlopen", return_value=big_resp): with pytest.raises(SystemExit): _hub_api("http://localhost:9999", _IDENTITY, "GET", "/api/test") def test_R5_existing_auth_tests_unaffected(self) -> None: """R5: authenticated calls continue to include MSign header after the fix.""" from muse.cli.commands.hub._core import _hub_api headers_seen: list[dict] = [] def fake_urlopen(req: urllib.request.Request, timeout: float = 10, context: ssl.SSLContext | None = None) -> MagicMock: headers_seen.append(dict(req.headers)) return _mock_resp() with patch("muse.cli.config.get_signing_identity", return_value=_make_signing()): with patch("urllib.request.urlopen", side_effect=fake_urlopen): _hub_api("http://localhost:9999", _IDENTITY, "GET", "/api/test") assert headers_seen[0].get("Authorization", "").startswith("MSign ") # =========================================================================== # Tier 6 — Security # =========================================================================== class TestHubApiSecurity: """Security invariants for the two fixes.""" def test_no_auth_header_leaked_on_unauthenticated_get(self) -> None: """Auth header must be completely absent — not empty — on public GET.""" from muse.cli.commands.hub._core import _hub_api headers_seen: list[dict] = [] def fake_urlopen(req: urllib.request.Request, timeout: float = 10, context: ssl.SSLContext | None = None) -> MagicMock: headers_seen.append(dict(req.headers)) return _mock_resp() with patch("muse.cli.config.get_signing_identity", return_value=None): with patch("urllib.request.urlopen", side_effect=fake_urlopen): _hub_api("http://localhost:9999", _IDENTITY, "GET", "/api/test") h = headers_seen[0] assert "Authorization" not in h assert "authorization" not in {k.lower() for k in h} def test_ansi_in_http_error_body_still_sanitized( self, capsys: pytest.CaptureFixture[str] ) -> None: """ANSI sequences in error bodies are sanitized on unauthenticated errors.""" from muse.cli.commands.hub._core import _hub_api ansi_body = b'{"detail": "\\x1b[31merror\\x1b[0m"}' exc = urllib.error.HTTPError( url="", code=401, msg="Unauthorized", hdrs=MagicMock(), fp=io.BytesIO(ansi_body), ) with patch("muse.cli.config.get_signing_identity", return_value=None): with patch("urllib.request.urlopen", side_effect=exc): with pytest.raises(SystemExit): _hub_api("http://localhost:9999", _IDENTITY, "GET", "/api/test") captured = capsys.readouterr() assert "\x1b[" not in captured.err def test_ssl_context_does_not_disable_verification(self) -> None: """The custom SSL context must not set CERT_NONE — it loads a CA, not disables checking.""" from muse.cli.commands.hub._core import _hub_api captured_ctx: list[ssl.SSLContext | None] = [] def fake_urlopen(req: urllib.request.Request, timeout: float = 10, context: ssl.SSLContext | None = None) -> MagicMock: captured_ctx.append(context) return _mock_resp() with patch("muse.cli.config.get_signing_identity", return_value=_make_signing()): with patch("urllib.request.urlopen", side_effect=fake_urlopen): _hub_api(_HUB_LOCAL, _IDENTITY, "GET", "/api/test") ctx = captured_ctx[0] if ctx is not None: assert ctx.verify_mode != ssl.CERT_NONE, ( "SSL context must not disable verification — " "use a CA cert file, not ssl.CERT_NONE" ) def test_ssrf_scheme_blocking_unaffected_by_public_get_change(self) -> None: """The public-GET change must not relax SSRF scheme blocking.""" from muse.cli.commands.hub._core import _hub_api for scheme in ("file", "ftp", "javascript", "data"): with patch("urllib.request.urlopen") as mock_net: with pytest.raises(SystemExit): _hub_api(f"{scheme}://evil.example.com", _IDENTITY, "GET", "/api/test") mock_net.assert_not_called() # =========================================================================== # Tier 7 — Stress # =========================================================================== class TestHubApiStress: """Concurrent calls to _hub_api must not deadlock or corrupt state.""" def test_concurrent_unauthenticated_gets_no_deadlock(self) -> None: """8 threads calling _hub_api(GET, no auth) concurrently — no deadlock.""" from muse.cli.commands.hub._core import _hub_api results: list[dict | Exception] = [] lock = threading.Lock() def fake_urlopen(req: urllib.request.Request, timeout: float = 10, context: ssl.SSLContext | None = None) -> MagicMock: return _mock_resp(b'{"ok": true}') def worker() -> None: try: with patch("muse.cli.config.get_signing_identity", return_value=None): with patch("urllib.request.urlopen", side_effect=fake_urlopen): r = _hub_api("http://localhost:9999", _IDENTITY, "GET", "/api/test") with lock: results.append(r) except Exception as exc: with lock: results.append(exc) threads = [threading.Thread(target=worker) for _ in range(8)] for t in threads: t.start() for t in threads: t.join(timeout=5) assert len(results) == 8, f"Only {len(results)}/8 threads completed" errors = [r for r in results if isinstance(r, Exception)] assert not errors, f"Thread errors: {errors}" def test_concurrent_authenticated_gets_no_deadlock(self) -> None: """8 threads calling _hub_api(GET, with auth) concurrently — no deadlock.""" from muse.cli.commands.hub._core import _hub_api results: list[dict | Exception] = [] lock = threading.Lock() def fake_urlopen(req: urllib.request.Request, timeout: float = 10, context: ssl.SSLContext | None = None) -> MagicMock: return _mock_resp(b'{"ok": true}') def worker() -> None: try: with patch("muse.cli.config.get_signing_identity", return_value=_make_signing()): with patch("urllib.request.urlopen", side_effect=fake_urlopen): r = _hub_api("http://localhost:9999", _IDENTITY, "GET", "/api/test") with lock: results.append(r) except Exception as exc: with lock: results.append(exc) threads = [threading.Thread(target=worker) for _ in range(8)] for t in threads: t.start() for t in threads: t.join(timeout=5) assert len(results) == 8 errors = [r for r in results if isinstance(r, Exception)] assert not errors, f"Thread errors: {errors}"