"""Phase 2.5 — Credential and secret leakage tests. Attack surface -------------- A malicious or compromised remote server receives ``Authorization: MSign`` and echoes the token in its HTTP error response body. Without hardening, that echoed value propagates through ``TransportError.__str__()`` to every display callsite — landing on stderr where any agent log-scraper reads it. Layers defended --------------- **Layer 1 — ``_http_error_message`` (transport.py)** - HTTP 401: NEVER includes the server error body. Always returns the generic "Authentication failed" string — the one status code where the server provably received the signing credential. - Other HTTP errors: include the response body (capped at 200 chars) after stripping all C0/C1 control characters (ESC, BEL, CSI, NUL…). This blocks ANSI/OSC terminal injection while preserving diagnostic text. - ``URLError.reason``: sanitized before embedding in ``TransportError``. **Layer 2 — display callsites** All ``print(f"... {exc} ...")`` paths in network-facing commands wrap ``str(exc)`` with ``sanitize_display``, providing a second layer against any residual control characters. 
Specific paths verified ----------------------- - ``transport.py::_http_error_message`` — unit tests - ``transport.py::HttpTransport._execute`` — mock HTTP integration - ``transport.py::HttpTransport._execute_fetch`` — mock HTTP integration - Display callsite chain (sanitize_display ∘ TransportError.__str__) - ``auth.py::_json_post`` error paths — ANSI injection - ``whoami._mask_token`` — masking semantics - ``sanitize_display`` — control char stripping - ``ls_remote`` error path — mock integration """ from __future__ import annotations import io import sys import urllib.error import urllib.request from http.client import HTTPMessage from unittest.mock import MagicMock, patch import pytest # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- _FAKE_TOKEN = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.test.signature_abc" # ESC is 0x1b — stripped by sanitize_display; remaining "[31m..." is harmless text _ANSI_ESC = "\x1b[31mRED\x1b[0m" # OSC 8 hyperlink — ESC (0x1b) and BEL (0x07) stripped; surrounding text remains _OSC_LINK = "\x1b]8;;http://attacker.example.com\x07click\x1b]8;;\x07" _BEL = "\x07ring" def _make_http_error(code: int, body: str, url: str = "http://hub.test/api") -> urllib.error.HTTPError: """Construct an ``urllib.error.HTTPError`` with a controlled body.""" fp = io.BytesIO(body.encode("utf-8")) return urllib.error.HTTPError(url, code, f"HTTP Error {code}", HTTPMessage(), fp) # --------------------------------------------------------------------------- # 1. 
# _http_error_message — unit tests
# ---------------------------------------------------------------------------


class TestHttpErrorMessage:
    """``_http_error_message`` is the single safe formatter for HTTP errors.

    The 401 tests check that nothing from the response body reaches the
    message; the remaining tests check that other status codes keep the body
    text but lose control characters.
    """

    # Status codes swept by the "any non-401" tests.
    _NON_401_CODES = (400, 403, 404, 409, 500, 502, 503)

    def _call(self, code: int, body: str) -> str:
        """Format a synthetic ``HTTPError`` and return the resulting message."""
        from muse.core.transport import _http_error_message

        http_error = _make_http_error(code, body)
        return _http_error_message(http_error)

    # ---- HTTP 401 — server MAY have echoed the signing credential ----

    def test_401_never_includes_body_with_token(self) -> None:
        """A 401 body that explicitly echoes the token must NEVER appear."""
        message = self._call(401, f"Token {_FAKE_TOKEN} is invalid.")
        assert _FAKE_TOKEN not in message

    def test_401_always_returns_generic_message(self) -> None:
        """Every 401 body maps to the same generic string — no content variation."""
        messages = set()
        for body in (
            f"Authorization: MSign {_FAKE_TOKEN}",
            "Unauthorized",
            "",
            "X" * 1000,
            _ANSI_ESC,
        ):
            messages.add(self._call(401, body))
        assert len(messages) == 1, f"Multiple messages for 401: {messages}"

    def test_401_contains_authentication_failed(self) -> None:
        """The generic 401 message mentions the authentication failure."""
        message = self._call(401, "anything")
        assert "Authentication failed" in message

    def test_401_status_text_not_in_result(self) -> None:
        """No server-controlled content appears in a 401 result."""
        leaked = "your_secret_data"
        assert leaked not in self._call(401, leaked)

    # ---- Non-401 codes — body included but sanitized ----

    def test_500_includes_status_code(self) -> None:
        """The numeric status is part of a 500 message."""
        message = self._call(500, "Internal server error")
        assert "500" in message

    def test_500_includes_sanitized_body_text(self) -> None:
        """Plain body text survives formatting for a 500."""
        assert "Internal server error" in self._call(500, "Internal server error")

    def test_500_strips_esc_from_body(self) -> None:
        """ESC (0x1b) stripped — ANSI sequences cannot fire in terminals."""
        assert "\x1b" not in self._call(500, _ANSI_ESC)

    def test_500_strips_bel_from_body(self) -> None:
        """BEL (0x07) stripped — prevents terminal bell injection."""
        assert "\x07" not in self._call(500, _OSC_LINK)

    def test_500_body_capped_at_200_chars(self) -> None:
        """Error body is capped to bound output size."""
        message = self._call(500, "X" * 500)
        # Drop the status prefix so only the echoed body is measured.
        echoed = message.replace("HTTP 500: ", "")
        assert len(echoed) <= 200

    def test_500_empty_body_returns_just_status(self) -> None:
        """With an empty body the message is the bare status."""
        message = self._call(500, "")
        assert message == "HTTP 500"

    def test_404_body_included(self) -> None:
        """A 404 message carries both the status code and the body."""
        message = self._call(404, "Not found")
        for fragment in ("404", "Not found"):
            assert fragment in message

    def test_403_body_included(self) -> None:
        """403 is NOT treated like 401 — body shown (sanitized)."""
        message = self._call(403, "Forbidden resource")
        for fragment in ("403", "Forbidden resource"):
            assert fragment in message

    def test_ansi_in_any_non_401_body_strips_esc(self) -> None:
        """ESC never survives formatting, whatever the non-401 status."""
        for status in self._NON_401_CODES:
            message = self._call(status, _ANSI_ESC)
            assert "\x1b" not in message, f"ESC in {status} result: {message!r}"

    def test_bel_in_any_non_401_body_stripped(self) -> None:
        """BEL never survives formatting, whatever the non-401 status."""
        for status in self._NON_401_CODES:
            message = self._call(status, _OSC_LINK)
            assert "\x07" not in message, f"BEL in {status} result: {message!r}"


# ---------------------------------------------------------------------------
# 2.
# HttpTransport._execute — mock HTTP server integration
# ---------------------------------------------------------------------------


class TestHttpTransportExecute:
    """``_execute`` converts HTTP errors to safe ``TransportError`` instances."""

    def test_401_transport_error_no_token(self) -> None:
        """A 401 that echoes the token yields only the generic auth message."""
        from muse.core.transport import HttpTransport, TransportError

        request = urllib.request.Request("http://hub.test/api")
        failure = _make_http_error(401, f"token={_FAKE_TOKEN} is revoked")
        with patch("muse.core.transport._open_url", side_effect=failure), \
                pytest.raises(TransportError) as caught:
            HttpTransport()._execute(request)

        text = str(caught.value)
        assert _FAKE_TOKEN not in text
        assert "Authentication failed" in text
        assert caught.value.status_code == 401

    def test_500_includes_sanitized_body(self) -> None:
        """A 500 keeps the status code and the body text in the message."""
        from muse.core.transport import HttpTransport, TransportError

        request = urllib.request.Request("http://hub.test/api")
        failure = _make_http_error(500, "Database error on shard-42")
        with patch("muse.core.transport._open_url", side_effect=failure), \
                pytest.raises(TransportError) as caught:
            HttpTransport()._execute(request)

        text = str(caught.value)
        assert "500" in text
        assert "Database error" in text
        assert "\x1b" not in text

    def test_500_ansi_in_body_strips_esc(self) -> None:
        """ESC bytes in a 500 body do not reach the ``TransportError`` text."""
        from muse.core.transport import HttpTransport, TransportError

        request = urllib.request.Request("http://hub.test/api")
        failure = _make_http_error(500, _ANSI_ESC)
        with patch("muse.core.transport._open_url", side_effect=failure), \
                pytest.raises(TransportError) as caught:
            HttpTransport()._execute(request)

        assert "\x1b" not in str(caught.value)

    def test_url_error_reason_esc_stripped(self) -> None:
        """ESC bytes in a ``URLError`` reason do not reach the error text."""
        from muse.core.transport import HttpTransport, TransportError

        request = urllib.request.Request("http://hub.test/api")
        failure = urllib.error.URLError(f"Connection refused {_ANSI_ESC}")
        with patch("muse.core.transport._open_url", side_effect=failure), \
                pytest.raises(TransportError) as caught:
            HttpTransport()._execute(request)

        assert "\x1b" not in str(caught.value)


class TestHttpTransportExecuteFetch:
    """``_execute_fetch`` applies the same safety guarantees as ``_execute``."""

    def test_401_body_with_token_not_in_error(self) -> None:
        """A token echoed in a 401 body is absent from the raised error."""
        from muse.core.transport import HttpTransport, TransportError

        request = urllib.request.Request("http://hub.test/api")
        failure = _make_http_error(401, f"Your token {_FAKE_TOKEN} has been revoked.")
        with patch("muse.core.transport._open_url", side_effect=failure), \
                pytest.raises(TransportError) as caught:
            HttpTransport()._execute_fetch(request)

        assert _FAKE_TOKEN not in str(caught.value)

    def test_500_esc_stripped(self) -> None:
        """ESC bytes in a 500 body are absent from the raised error."""
        from muse.core.transport import HttpTransport, TransportError

        request = urllib.request.Request("http://hub.test/api")
        failure = _make_http_error(500, _ANSI_ESC)
        with patch("muse.core.transport._open_url", side_effect=failure), \
                pytest.raises(TransportError) as caught:
            HttpTransport()._execute_fetch(request)

        assert "\x1b" not in str(caught.value)


# ---------------------------------------------------------------------------
# 3.
# Display callsite hardening — sanitize_display ∘ TransportError.__str__
# ---------------------------------------------------------------------------


class TestDisplayCallsiteChain:
    """The ``sanitize_display(str(exc))`` chain works correctly for all network errors."""

    def _make_exc(self, msg: str, code: int = 500) -> "Exception":
        """Return a ``TransportError`` carrying *msg* and status *code*."""
        from muse.core.transport import TransportError

        return TransportError(msg, code)

    def test_ansi_in_transport_error_stripped_at_display(self) -> None:
        """ESC inside an error message is gone after ``sanitize_display``."""
        from muse.core.validation import sanitize_display

        exc = self._make_exc(f"HTTP 500: {_ANSI_ESC}")
        displayed = sanitize_display(str(exc))
        assert "\x1b" not in displayed

    def test_osc_bel_stripped_at_display(self) -> None:
        """Both ESC and BEL of an OSC 8 link are gone after ``sanitize_display``."""
        from muse.core.validation import sanitize_display

        exc = self._make_exc(f"HTTP 500: {_OSC_LINK}")
        displayed = sanitize_display(str(exc))
        assert "\x1b" not in displayed
        assert "\x07" not in displayed

    def test_generic_auth_message_passes_through_unchanged(self) -> None:
        """The generic 401 text has no control characters, so it is untouched."""
        from muse.core.validation import sanitize_display

        generic = "Authentication failed (HTTP 401). Run 'muse auth register'."
        exc = self._make_exc(generic, 401)
        assert sanitize_display(str(exc)) == generic

    def test_token_value_not_stripped_by_sanitize_display(self) -> None:
        """sanitize_display only strips control chars — the 401 guard prevents
        token values from reaching TransportError in the first place."""
        from muse.core.validation import sanitize_display

        assert sanitize_display(_FAKE_TOKEN) == _FAKE_TOKEN

    def test_all_http_error_codes_strip_esc(self) -> None:
        """Neither ESC nor BEL survives ``_execute`` for any non-401 status.

        The body carries both an ANSI colour sequence (ESC) and an OSC 8
        hyperlink (ESC + BEL).  Without the OSC payload the BEL assertion
        below could never fail, because the body would contain no BEL at all.
        """
        from muse.core.transport import HttpTransport, TransportError

        for code in (400, 403, 404, 409, 500, 502, 503):
            body = f"Error on server with {_ANSI_ESC} and {_OSC_LINK}"
            err = _make_http_error(code, body)
            req = urllib.request.Request("http://hub.test/api")
            with patch("muse.core.transport._open_url", side_effect=err):
                with pytest.raises(TransportError) as exc_info:
                    HttpTransport()._execute(req)
            msg = str(exc_info.value)
            assert "\x1b" not in msg, f"ESC leaked for HTTP {code}: {msg!r}"
            assert "\x07" not in msg, f"BEL leaked for HTTP {code}: {msg!r}"


# ---------------------------------------------------------------------------
# 4.
# auth.py _json_post — ANSI injection in server error body
# ---------------------------------------------------------------------------


class TestAuthJsonPostSanitization:
    """``auth._json_post`` sanitizes server-controlled error content before display."""

    @staticmethod
    def _stderr_after_failed_post(failure: Exception) -> str:
        """Call ``_json_post_raw`` with ``urlopen`` raising *failure*.

        ``sys.stderr`` is swapped for an in-memory buffer for the duration of
        the call, the call is required to end in ``SystemExit``, and whatever
        was written to the buffer is returned.
        """
        import muse.cli.commands.auth as auth_mod

        sink = io.StringIO()
        with patch.object(sys, "stderr", sink):
            with patch("muse.cli.commands.auth.urllib.request.urlopen", side_effect=failure):
                with pytest.raises(SystemExit):
                    auth_mod._json_post_raw("http://hub.test", "/api/auth", {"key": "val"})
        return sink.getvalue()

    def test_ansi_in_http_error_body_stripped(self) -> None:
        """ESC in an HTTP 400 body is not written to stderr."""
        failure = _make_http_error(400, f"Bad request {_ANSI_ESC}")
        assert "\x1b" not in self._stderr_after_failed_post(failure)

    def test_osc_in_http_error_body_stripped(self) -> None:
        """BEL from an OSC 8 link in an HTTP 400 body is not written to stderr."""
        failure = _make_http_error(400, _OSC_LINK)
        assert "\x07" not in self._stderr_after_failed_post(failure)

    def test_url_error_reason_sanitized(self) -> None:
        """ESC in a ``URLError`` reason is not written to stderr."""
        failure = urllib.error.URLError(f"DNS failure {_ANSI_ESC}")
        assert "\x1b" not in self._stderr_after_failed_post(failure)


# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# 5.
# sanitize_display — control character stripping semantics
# ---------------------------------------------------------------------------


class TestSanitizeDisplay:
    """``sanitize_display`` strips C0/C1 control bytes, preserving text."""

    def _sanitize(self, s: str) -> str:
        """Run *s* through ``sanitize_display`` and return the result."""
        from muse.core.validation import sanitize_display

        cleaned = sanitize_display(s)
        return cleaned

    def test_esc_stripped_from_ansi_sequence(self) -> None:
        """ESC (0x1b) is stripped; the surrounding bracket text remains harmless."""
        cleaned = self._sanitize(_ANSI_ESC)
        assert "\x1b" not in cleaned
        assert "RED" in cleaned  # display text preserved

    def test_esc_stripped_from_osc_sequence(self) -> None:
        """ESC is removed from an OSC 8 hyperlink sequence as well."""
        assert "\x1b" not in self._sanitize(_OSC_LINK)

    def test_bel_stripped(self) -> None:
        """BEL (0x07) stripped."""
        cleaned = self._sanitize(_BEL)
        assert "\x07" not in cleaned
        assert "ring" in cleaned

    def test_null_byte_stripped(self) -> None:
        """NUL is removed while the characters around it are kept."""
        cleaned = self._sanitize("a\x00b")
        assert "\x00" not in cleaned
        assert "a" in cleaned
        assert "b" in cleaned

    def test_csi_byte_stripped(self) -> None:
        """C1 CSI (0x9b) stripped."""
        cleaned = self._sanitize("\x9b31mRED\x9bm")
        assert "\x9b" not in cleaned

    def test_newline_preserved(self) -> None:
        """LF is not treated as a character to strip."""
        cleaned = self._sanitize("line1\nline2")
        assert "\n" in cleaned

    def test_tab_preserved(self) -> None:
        """HT is not treated as a character to strip."""
        cleaned = self._sanitize("col1\tcol2")
        assert "\t" in cleaned

    def test_normal_ascii_unchanged(self) -> None:
        """Printable ASCII passes through byte-for-byte."""
        plain = "HTTP 500: Internal server error on shard-42"
        assert self._sanitize(plain) == plain

    def test_token_value_not_stripped(self) -> None:
        """sanitize_display does NOT strip token values — only C0/C1 control bytes.

        Token leakage prevention is handled at the TransportError creation layer.
        """
        assert self._sanitize(_FAKE_TOKEN) == _FAKE_TOKEN

    def test_empty_string(self) -> None:
        """The empty string maps to the empty string."""
        assert self._sanitize("") == ""

    def test_unicode_preserved(self) -> None:
        """Non-ASCII printable text is returned unchanged."""
        sample = "héllo wörld 中文"
        assert self._sanitize(sample) == sample

    def test_all_c0_control_bytes_stripped_except_tab_and_lf(self) -> None:
        """Every C0 control byte 0x00–0x1f is stripped except HT (0x09) and LF (0x0a)."""
        preserved = (9, 10)  # HT and LF are preserved
        for code in (c for c in range(0, 32) if c not in preserved):
            char = chr(code)
            cleaned = self._sanitize(f"before{char}after")
            assert char not in cleaned, f"C0 byte 0x{code:02x} not stripped"

    def test_del_stripped(self) -> None:
        """DEL (0x7f) is stripped."""
        delete = chr(0x7f)
        cleaned = self._sanitize(f"a{delete}b")
        assert delete not in cleaned

    def test_c1_control_bytes_stripped(self) -> None:
        """C1 control bytes 0x80–0x9f stripped."""
        for code in range(0x80, 0xa0):
            char = chr(code)
            cleaned = self._sanitize(f"x{char}y")
            assert char not in cleaned, f"C1 byte 0x{code:02x} not stripped"


# ---------------------------------------------------------------------------
# 7.
# ls_remote — error display sanitized
# ---------------------------------------------------------------------------


class TestLsRemoteErrorSanitized:
    """``ls_remote`` never leaks token or ANSI in error output."""

    @staticmethod
    def _stderr_from_failed_run(tmp_path: "pathlib.Path", failure: Exception) -> str:
        """Run ``ls_remote.run`` with the transport raising *failure*.

        The following are patched for the duration of the call:

        - ``sys.stderr`` is replaced by an in-memory buffer;
        - ``find_repo_root`` returns *tmp_path*;
        - ``get_remote`` returns ``"http://hub.test"``;
        - ``get_signing_identity`` returns ``_FAKE_TOKEN``;
        - ``HttpTransport`` is a mock whose ``fetch_remote_info`` raises
          *failure*.

        Returns everything written to the stderr buffer.
        """
        import muse.cli.commands.ls_remote as ls_mod

        captured = io.StringIO()
        with patch.object(sys, "stderr", captured), \
                patch.object(ls_mod, "find_repo_root", return_value=tmp_path), \
                patch.object(ls_mod, "get_remote", return_value="http://hub.test"), \
                patch.object(ls_mod, "get_signing_identity", return_value=_FAKE_TOKEN), \
                patch.object(ls_mod, "HttpTransport") as mock_t:
            mock_t.return_value.fetch_remote_info.side_effect = failure
            args = MagicMock()
            args.remote = "local"
            args.fmt = "json"
            try:
                ls_mod.run(args)
            except SystemExit:
                # A SystemExit from the command is tolerated: callers of this
                # helper only assert on what was written to stderr.
                pass
        return captured.getvalue()

    def test_ansi_in_transport_error_stripped(self, tmp_path: "pathlib.Path") -> None:
        """ESC carried by a ``TransportError`` message is not written to stderr."""
        from muse.core.transport import TransportError

        exc = TransportError(f"HTTP 500: {_ANSI_ESC}", 500)
        assert "\x1b" not in self._stderr_from_failed_run(tmp_path, exc)

    def test_401_token_echo_not_in_stderr(self, tmp_path: "pathlib.Path") -> None:
        """The patched signing identity does not appear on stderr after a 401."""
        from muse.core.transport import TransportError

        exc = TransportError("Authentication failed (HTTP 401). Run 'muse auth register'.", 401)
        assert _FAKE_TOKEN not in self._stderr_from_failed_run(tmp_path, exc)


# ---------------------------------------------------------------------------
# 8.
# End-to-end: 401 body with token echo — full chain
# ---------------------------------------------------------------------------


class TestEndToEnd401TokenEcho:
    """Full chain: malicious 401 body → _http_error_message → TransportError → display."""

    def test_401_token_echo_fully_suppressed_execute(self) -> None:
        """Via ``_execute``: neither the token nor the 401 body is in the error."""
        from muse.core.transport import HttpTransport, TransportError

        hostile_body = f"Access denied. Token: {_FAKE_TOKEN}"
        request = urllib.request.Request("http://hub.test/api")
        failure = _make_http_error(401, hostile_body)
        with patch("muse.core.transport._open_url", side_effect=failure), \
                pytest.raises(TransportError) as caught:
            HttpTransport()._execute(request)

        rendered = str(caught.value)
        assert _FAKE_TOKEN not in rendered
        assert hostile_body not in rendered

    def test_401_token_echo_fully_suppressed_execute_fetch(self) -> None:
        """Via ``_execute_fetch``: the echoed token is absent from the error."""
        from muse.core.transport import HttpTransport, TransportError

        hostile_body = f"Your token {_FAKE_TOKEN} has been revoked."
        request = urllib.request.Request("http://hub.test/api")
        failure = _make_http_error(401, hostile_body)
        with patch("muse.core.transport._open_url", side_effect=failure), \
                pytest.raises(TransportError) as caught:
            HttpTransport()._execute_fetch(request)

        assert _FAKE_TOKEN not in str(caught.value)

    def test_all_non_401_codes_strip_control_chars(self) -> None:
        """After ``sanitize_display``, no ESC or BEL remains for any non-401 status."""
        from muse.core.transport import HttpTransport, TransportError
        from muse.core.validation import sanitize_display

        for status in (400, 403, 404, 409, 500, 502, 503):
            request = urllib.request.Request("http://hub.test/api")
            failure = _make_http_error(
                status, f"Error on server with {_ANSI_ESC} and {_OSC_LINK}"
            )
            with patch("muse.core.transport._open_url", side_effect=failure), \
                    pytest.raises(TransportError) as caught:
                HttpTransport()._execute(request)

            shown = sanitize_display(str(caught.value))
            assert "\x1b" not in shown, f"ESC in HTTP {status} output: {shown!r}"
            assert "\x07" not in shown, f"BEL in HTTP {status} output: {shown!r}"