test_security_credential_leakage.py
python
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
21 days ago
| 1 | """Phase 2.5 — Credential and secret leakage tests. |
| 2 | |
| 3 | Attack surface |
| 4 | -------------- |
| 5 | A malicious or compromised remote server receives ``Authorization: MSign`` |
| 6 | and echoes the token in its HTTP error response body. Without hardening, that |
| 7 | echoed value propagates through ``TransportError.__str__()`` to every display |
| 8 | callsite — landing on stderr where any agent log-scraper reads it. |
| 9 | |
| 10 | Layers defended |
| 11 | --------------- |
| 12 | **Layer 1 — ``_http_error_message`` (transport.py)** |
| 13 | |
| 14 | - HTTP 401: NEVER includes the server error body. Always returns the generic |
| 15 | "Authentication failed" string — the one status code where the server |
| 16 | provably received the signing credential. |
| 17 | - Other HTTP errors: include the response body (capped at 200 chars) after |
| 18 | stripping all C0/C1 control characters (ESC, BEL, CSI, NUL…). This blocks |
| 19 | ANSI/OSC terminal injection while preserving diagnostic text. |
| 20 | - ``URLError.reason``: sanitized before embedding in ``TransportError``. |
| 21 | |
| 22 | **Layer 2 — display callsites** |
| 23 | |
| 24 | All ``print(f"... {exc} ...")`` paths in network-facing commands wrap |
| 25 | ``str(exc)`` with ``sanitize_display``, providing a second layer against any |
| 26 | residual control characters. |
| 27 | |
| 28 | Specific paths verified |
| 29 | ----------------------- |
| 30 | - ``transport.py::_http_error_message`` — unit tests |
| 31 | - ``transport.py::HttpTransport._execute`` — mock HTTP integration |
| 32 | - ``transport.py::HttpTransport._execute_fetch`` — mock HTTP integration |
| 33 | - Display callsite chain (sanitize_display ∘ TransportError.__str__) |
| 34 | - ``auth.py::_json_post`` error paths — ANSI injection |
| 35 | - ``whoami._mask_token`` — masking semantics (listed for completeness; no test in this file currently exercises it) |
| 36 | - ``sanitize_display`` — control char stripping |
| 37 | - ``ls_remote`` error path — mock integration |
| 38 | """ |
| 39 | from __future__ import annotations |
| 40 | |
| 41 | import io |
| 42 | import sys |
| 43 | import urllib.error |
| 44 | import urllib.request |
| 45 | from http.client import HTTPMessage |
| 46 | from unittest.mock import MagicMock, patch |
| 47 | |
| 48 | import pytest |
| 49 | |
| 50 | |
| 51 | # --------------------------------------------------------------------------- |
| 52 | # Constants |
| 53 | # --------------------------------------------------------------------------- |
| 54 | |
# Dummy JWT-shaped credential; tests assert this exact value never shows up in error output.
_FAKE_TOKEN = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.test.signature_abc"
# ESC is 0x1b — stripped by sanitize_display; remaining "[31m..." is harmless text
_ANSI_ESC = "\x1b[31mRED\x1b[0m"
# OSC 8 hyperlink — ESC (0x1b) and BEL (0x07) stripped; surrounding text remains
_OSC_LINK = "\x1b]8;;http://attacker.example.com\x07click\x1b]8;;\x07"
# A lone BEL (0x07) followed by plain text, used to check that the text survives stripping.
_BEL = "\x07ring"
| 61 | |
| 62 | |
| 63 | def _make_http_error(code: int, body: str, url: str = "http://hub.test/api") -> urllib.error.HTTPError: |
| 64 | """Construct an ``urllib.error.HTTPError`` with a controlled body.""" |
| 65 | fp = io.BytesIO(body.encode("utf-8")) |
| 66 | return urllib.error.HTTPError(url, code, f"HTTP Error {code}", HTTPMessage(), fp) |
| 67 | |
| 68 | |
| 69 | # --------------------------------------------------------------------------- |
| 70 | # 1. _http_error_message — unit tests |
| 71 | # --------------------------------------------------------------------------- |
| 72 | |
class TestHttpErrorMessage:
    """Unit tests for ``_http_error_message``, the formatter applied to HTTP errors.

    The 401 tests expect one fixed message that contains nothing from the
    response body; the tests for other statuses expect the status code plus
    the body text with ESC/BEL bytes removed.
    """

    # Statuses exercised by the "any non-401" loops below.
    _NON_401_CODES = (400, 403, 404, 409, 500, 502, 503)

    def _format(self, status: int, payload: str) -> str:
        """Return the formatted message for a fabricated error with *status* and body *payload*."""
        from muse.core.transport import _http_error_message

        fabricated = _make_http_error(status, payload)
        return _http_error_message(fabricated)

    # ---- HTTP 401 — server MAY have echoed the signing credential ----

    def test_401_never_includes_body_with_token(self) -> None:
        """A token echoed inside a 401 body must be absent from the message."""
        echoed = f"Token {_FAKE_TOKEN} is invalid."
        assert _FAKE_TOKEN not in self._format(401, echoed)

    def test_401_always_returns_generic_message(self) -> None:
        """Widely different 401 bodies all collapse to one identical message."""
        payloads = (
            f"Authorization: MSign {_FAKE_TOKEN}",
            "Unauthorized",
            "",
            "X" * 1000,
            _ANSI_ESC,
        )
        messages = set()
        for payload in payloads:
            messages.add(self._format(401, payload))
        assert len(messages) == 1, f"Multiple messages for 401: {messages}"

    def test_401_contains_authentication_failed(self) -> None:
        """The 401 message carries the phrase "Authentication failed"."""
        formatted = self._format(401, "anything")
        assert "Authentication failed" in formatted

    def test_401_status_text_not_in_result(self) -> None:
        """Arbitrary server-supplied body text does not surface in a 401 message."""
        server_text = "your_secret_data"
        assert server_text not in self._format(401, server_text)

    # ---- Non-401 codes — body included but sanitized ----

    def test_500_includes_status_code(self) -> None:
        """The numeric status appears in the message."""
        formatted = self._format(500, "Internal server error")
        assert "500" in formatted

    def test_500_includes_sanitized_body_text(self) -> None:
        """Plain body text is carried through for a 500."""
        formatted = self._format(500, "Internal server error")
        assert "Internal server error" in formatted

    def test_500_strips_esc_from_body(self) -> None:
        """ESC (0x1b) from an ANSI sequence in the body is removed."""
        assert "\x1b" not in self._format(500, _ANSI_ESC)

    def test_500_strips_bel_from_body(self) -> None:
        """BEL (0x07) from an OSC sequence in the body is removed."""
        assert "\x07" not in self._format(500, _OSC_LINK)

    def test_500_body_capped_at_200_chars(self) -> None:
        """An oversized body is truncated to at most 200 characters."""
        formatted = self._format(500, "X" * 500)
        # Drop the status prefix so only the body portion is measured.
        remainder = formatted.replace("HTTP 500: ", "")
        assert len(remainder) <= 200

    def test_500_empty_body_returns_just_status(self) -> None:
        """With no body, the message is exactly the status label."""
        assert self._format(500, "") == "HTTP 500"

    def test_404_body_included(self) -> None:
        """A 404 message carries both the status and the body text."""
        formatted = self._format(404, "Not found")
        assert "404" in formatted
        assert "Not found" in formatted

    def test_403_body_included(self) -> None:
        """403 is not given the 401 treatment: status and body both appear."""
        formatted = self._format(403, "Forbidden resource")
        assert "403" in formatted
        assert "Forbidden resource" in formatted

    def test_ansi_in_any_non_401_body_strips_esc(self) -> None:
        """ESC is removed for every non-401 status in the sample set."""
        for status in self._NON_401_CODES:
            formatted = self._format(status, _ANSI_ESC)
            assert "\x1b" not in formatted, f"ESC in {status} result: {formatted!r}"

    def test_bel_in_any_non_401_body_stripped(self) -> None:
        """BEL is removed for every non-401 status in the sample set."""
        for status in self._NON_401_CODES:
            formatted = self._format(status, _OSC_LINK)
            assert "\x07" not in formatted, f"BEL in {status} result: {formatted!r}"
| 156 | |
| 157 | |
| 158 | # --------------------------------------------------------------------------- |
| 159 | # 2. HttpTransport._execute — mock HTTP server integration |
| 160 | # --------------------------------------------------------------------------- |
| 161 | |
class TestHttpTransportExecute:
    """Checks that ``HttpTransport._execute`` turns low-level failures into safe ``TransportError``s."""

    def _transport_error_from(self, failure: Exception) -> Exception:
        """Run ``_execute`` with ``_open_url`` raising *failure*; return the ``TransportError`` raised."""
        from muse.core.transport import HttpTransport, TransportError

        request = urllib.request.Request("http://hub.test/api")
        with patch("muse.core.transport._open_url", side_effect=failure), \
                pytest.raises(TransportError) as caught:
            HttpTransport()._execute(request)
        return caught.value

    def test_401_transport_error_no_token(self) -> None:
        """A token echoed in a 401 body is absent; message and status code are the generic 401 ones."""
        failure = _make_http_error(401, f"token={_FAKE_TOKEN} is revoked")
        raised = self._transport_error_from(failure)

        text = str(raised)
        assert _FAKE_TOKEN not in text
        assert "Authentication failed" in text
        assert raised.status_code == 401

    def test_500_includes_sanitized_body(self) -> None:
        """A 500 error keeps the status and body text and contains no ESC byte."""
        failure = _make_http_error(500, "Database error on shard-42")
        text = str(self._transport_error_from(failure))

        assert "500" in text
        assert "Database error" in text
        assert "\x1b" not in text

    def test_500_ansi_in_body_strips_esc(self) -> None:
        """An ANSI sequence in a 500 body loses its ESC byte."""
        failure = _make_http_error(500, _ANSI_ESC)
        raised = self._transport_error_from(failure)

        assert "\x1b" not in str(raised)

    def test_url_error_reason_esc_stripped(self) -> None:
        """ESC inside a ``URLError`` reason does not reach the ``TransportError`` text."""
        failure = urllib.error.URLError(f"Connection refused {_ANSI_ESC}")
        raised = self._transport_error_from(failure)

        assert "\x1b" not in str(raised)
| 218 | |
| 219 | |
class TestHttpTransportExecuteFetch:
    """Checks that ``HttpTransport._execute_fetch`` gives the same guarantees as ``_execute``."""

    def _error_text_from(self, failure: Exception) -> str:
        """Run ``_execute_fetch`` with ``_open_url`` raising *failure*; return the ``TransportError`` text."""
        from muse.core.transport import HttpTransport, TransportError

        request = urllib.request.Request("http://hub.test/api")
        with patch("muse.core.transport._open_url", side_effect=failure), \
                pytest.raises(TransportError) as caught:
            HttpTransport()._execute_fetch(request)
        return str(caught.value)

    def test_401_body_with_token_not_in_error(self) -> None:
        """A token echoed in a 401 body is absent from the raised error's text."""
        malicious_body = f"Your token {_FAKE_TOKEN} has been revoked."
        text = self._error_text_from(_make_http_error(401, malicious_body))

        assert _FAKE_TOKEN not in text

    def test_500_esc_stripped(self) -> None:
        """An ANSI sequence in a 500 body loses its ESC byte."""
        text = self._error_text_from(_make_http_error(500, _ANSI_ESC))

        assert "\x1b" not in text
| 247 | |
| 248 | |
| 249 | # --------------------------------------------------------------------------- |
| 250 | # 3. Display callsite hardening — sanitize_display ∘ TransportError.__str__ |
| 251 | # --------------------------------------------------------------------------- |
| 252 | |
class TestDisplayCallsiteChain:
    """The ``sanitize_display(str(exc))`` chain works correctly for all network errors."""

    def _make_exc(self, msg: str, code: int = 500) -> "Exception":
        """Build a ``TransportError`` carrying *msg* and HTTP status *code*."""
        from muse.core.transport import TransportError
        return TransportError(msg, code)

    def test_ansi_in_transport_error_stripped_at_display(self) -> None:
        """ESC embedded in an error message is gone after ``sanitize_display``."""
        from muse.core.validation import sanitize_display
        exc = self._make_exc(f"HTTP 500: {_ANSI_ESC}")
        displayed = sanitize_display(str(exc))
        assert "\x1b" not in displayed

    def test_osc_bel_stripped_at_display(self) -> None:
        """Both ESC and BEL of an OSC hyperlink are gone after ``sanitize_display``."""
        from muse.core.validation import sanitize_display
        exc = self._make_exc(f"HTTP 500: {_OSC_LINK}")
        displayed = sanitize_display(str(exc))
        assert "\x1b" not in displayed
        assert "\x07" not in displayed

    def test_generic_auth_message_passes_through_unchanged(self) -> None:
        """A message with no control characters is returned verbatim."""
        from muse.core.validation import sanitize_display
        from muse.core.transport import TransportError
        generic = "Authentication failed (HTTP 401). Run 'muse auth register'."
        exc = TransportError(generic, 401)
        assert sanitize_display(str(exc)) == generic

    def test_token_value_not_stripped_by_sanitize_display(self) -> None:
        """sanitize_display only strips control chars — the 401 guard prevents
        token values from reaching TransportError in the first place."""
        from muse.core.validation import sanitize_display
        assert sanitize_display(_FAKE_TOKEN) == _FAKE_TOKEN

    def test_all_http_error_codes_strip_esc(self) -> None:
        """For each sampled non-401 status, neither ESC nor BEL survives in the error text."""
        from muse.core.transport import HttpTransport, TransportError
        for code in (400, 403, 404, 409, 500, 502, 503):
            # The body must contain BOTH an ESC (ANSI) and a BEL (OSC link);
            # with only the ANSI sequence the BEL assertion below could never
            # fail and would verify nothing.
            body = f"Error on server with {_ANSI_ESC} and {_OSC_LINK}"
            err = _make_http_error(code, body)
            req = urllib.request.Request("http://hub.test/api")
            with patch("muse.core.transport._open_url", side_effect=err):
                with pytest.raises(TransportError) as exc_info:
                    HttpTransport()._execute(req)
            msg = str(exc_info.value)
            assert "\x1b" not in msg, f"ESC leaked for HTTP {code}: {msg!r}"
            assert "\x07" not in msg, f"BEL leaked for HTTP {code}: {msg!r}"
| 298 | |
| 299 | |
| 300 | # --------------------------------------------------------------------------- |
| 301 | # 4. auth.py _json_post — ANSI injection in server error body |
| 302 | # --------------------------------------------------------------------------- |
| 303 | |
class TestAuthJsonPostSanitization:
    """Checks that the auth command's JSON POST helper keeps control bytes out of stderr.

    Each test makes ``urlopen`` fail with attacker-controlled text, calls
    ``auth._json_post_raw`` (which is expected to exit), and inspects what
    was written to stderr.
    """

    def _stderr_after_failed_post(self, failure: Exception) -> str:
        """Call ``_json_post_raw`` with ``urlopen`` raising *failure*; return captured stderr."""
        import muse.cli.commands.auth as auth_mod

        sink = io.StringIO()
        with patch.object(sys, "stderr", sink), \
                patch("muse.cli.commands.auth.urllib.request.urlopen", side_effect=failure), \
                pytest.raises(SystemExit):
            auth_mod._json_post_raw("http://hub.test", "/api/auth", {"key": "val"})
        return sink.getvalue()

    def test_ansi_in_http_error_body_stripped(self) -> None:
        """ESC from an ANSI sequence in a 400 body is not written to stderr."""
        failure = _make_http_error(400, f"Bad request {_ANSI_ESC}")
        assert "\x1b" not in self._stderr_after_failed_post(failure)

    def test_osc_in_http_error_body_stripped(self) -> None:
        """BEL from an OSC hyperlink in a 400 body is not written to stderr."""
        failure = _make_http_error(400, _OSC_LINK)
        assert "\x07" not in self._stderr_after_failed_post(failure)

    def test_url_error_reason_sanitized(self) -> None:
        """ESC inside a ``URLError`` reason is not written to stderr."""
        failure = urllib.error.URLError(f"DNS failure {_ANSI_ESC}")
        assert "\x1b" not in self._stderr_after_failed_post(failure)
| 345 | |
| 346 | |
| 347 | # --------------------------------------------------------------------------- |
| 348 | # --------------------------------------------------------------------------- |
| 349 | # 5. sanitize_display — control character stripping semantics |
| 350 | # --------------------------------------------------------------------------- |
| 351 | |
class TestSanitizeDisplay:
    """Pins the stripping rules of ``sanitize_display``.

    Expected contract per these tests: C0 bytes (except HT and LF), DEL and
    C1 bytes are removed; all other text, including non-ASCII, is kept.
    """

    def _clean(self, text: str) -> str:
        """Pass *text* through ``sanitize_display`` and return the result."""
        from muse.core.validation import sanitize_display

        return sanitize_display(text)

    def test_esc_stripped_from_ansi_sequence(self) -> None:
        """ESC (0x1b) is removed while the visible text of the sequence stays."""
        cleaned = self._clean(_ANSI_ESC)
        assert "\x1b" not in cleaned
        assert "RED" in cleaned  # display text preserved

    def test_esc_stripped_from_osc_sequence(self) -> None:
        """ESC is removed from an OSC hyperlink sequence."""
        assert "\x1b" not in self._clean(_OSC_LINK)

    def test_bel_stripped(self) -> None:
        """BEL (0x07) is removed and the text after it is kept."""
        cleaned = self._clean(_BEL)
        assert "\x07" not in cleaned
        assert "ring" in cleaned

    def test_null_byte_stripped(self) -> None:
        """NUL is removed; the characters on either side remain."""
        cleaned = self._clean("a\x00b")
        assert "\x00" not in cleaned
        assert "a" in cleaned
        assert "b" in cleaned

    def test_csi_byte_stripped(self) -> None:
        """C1 CSI (0x9b) stripped."""
        cleaned = self._clean("\x9b31mRED\x9bm")
        assert "\x9b" not in cleaned

    def test_newline_preserved(self) -> None:
        """LF survives sanitization."""
        cleaned = self._clean("line1\nline2")
        assert "\n" in cleaned

    def test_tab_preserved(self) -> None:
        """HT survives sanitization."""
        cleaned = self._clean("col1\tcol2")
        assert "\t" in cleaned

    def test_normal_ascii_unchanged(self) -> None:
        """Ordinary printable ASCII round-trips unchanged."""
        sample = "HTTP 500: Internal server error on shard-42"
        assert self._clean(sample) == sample

    def test_token_value_not_stripped(self) -> None:
        """sanitize_display does NOT strip token values — only C0/C1 control bytes.
        Token leakage prevention is handled at the TransportError creation layer."""
        assert self._clean(_FAKE_TOKEN) == _FAKE_TOKEN

    def test_empty_string(self) -> None:
        """The empty string maps to the empty string."""
        assert self._clean("") == ""

    def test_unicode_preserved(self) -> None:
        """Accented Latin and CJK characters round-trip unchanged."""
        sample = "héllo wörld 中文"
        assert self._clean(sample) == sample

    def test_all_c0_control_bytes_stripped_except_tab_and_lf(self) -> None:
        """Every C0 control byte 0x00–0x1f is stripped except HT (0x09) and LF (0x0a)."""
        preserved = (9, 10)  # HT and LF are preserved
        for code in range(32):
            if code not in preserved:
                char = chr(code)
                cleaned = self._clean(f"before{char}after")
                assert char not in cleaned, f"C0 byte 0x{code:02x} not stripped"

    def test_del_stripped(self) -> None:
        """DEL (0x7f) is removed."""
        delete = chr(0x7f)
        assert delete not in self._clean(f"a{delete}b")

    def test_c1_control_bytes_stripped(self) -> None:
        """C1 control bytes 0x80–0x9f stripped."""
        for code in range(0x80, 0xa0):
            char = chr(code)
            cleaned = self._clean(f"x{char}y")
            assert char not in cleaned, f"C1 byte 0x{code:02x} not stripped"
| 424 | |
| 425 | |
| 426 | # --------------------------------------------------------------------------- |
| 427 | # 7. ls_remote — error display sanitized |
| 428 | # --------------------------------------------------------------------------- |
| 429 | |
class TestLsRemoteErrorSanitized:
    """``ls_remote`` never leaks token or ANSI in error output."""

    def _stderr_from_failed_run(self, repo_root: "pathlib.Path", failure: Exception) -> str:
        """Run ``ls_remote.run`` with the transport raising *failure*; return captured stderr.

        Repo discovery, remote lookup and the signing identity are patched so
        the command gets as far as ``fetch_remote_info``, which raises
        *failure*. The signing identity is set to ``_FAKE_TOKEN`` so a test
        can check that the command itself does not print it.
        """
        import muse.cli.commands.ls_remote as ls_mod

        captured = io.StringIO()
        with patch.object(sys, "stderr", captured), \
             patch.object(ls_mod, "find_repo_root", return_value=repo_root), \
             patch.object(ls_mod, "get_remote", return_value="http://hub.test"), \
             patch.object(ls_mod, "get_signing_identity", return_value=_FAKE_TOKEN), \
             patch.object(ls_mod, "HttpTransport") as mock_t:
            mock_t.return_value.fetch_remote_info.side_effect = failure
            args = MagicMock()
            args.remote = "local"
            args.fmt = "json"
            # Best-effort: whether or not the command exits on the error, only
            # the text written to stderr matters to the callers of this helper.
            try:
                ls_mod.run(args)
            except SystemExit:
                pass
        return captured.getvalue()

    def test_ansi_in_transport_error_stripped(self, tmp_path: "pathlib.Path") -> None:
        """ESC inside a ``TransportError`` message is not written to stderr."""
        from muse.core.transport import TransportError

        exc = TransportError(f"HTTP 500: {_ANSI_ESC}", 500)

        assert "\x1b" not in self._stderr_from_failed_run(tmp_path, exc)

    def test_401_token_echo_not_in_stderr(self, tmp_path: "pathlib.Path") -> None:
        """On a generic 401 error the signing identity is not written to stderr."""
        from muse.core.transport import TransportError

        exc = TransportError("Authentication failed (HTTP 401). Run 'muse auth register'.", 401)

        assert _FAKE_TOKEN not in self._stderr_from_failed_run(tmp_path, exc)
| 479 | |
| 480 | |
| 481 | # --------------------------------------------------------------------------- |
| 482 | # 8. End-to-end: 401 body with token echo — full chain |
| 483 | # --------------------------------------------------------------------------- |
| 484 | |
class TestEndToEnd401TokenEcho:
    """End-to-end checks: a hostile HTTP error body travels through the transport to display text."""

    def _error_text(self, failure: Exception, *, fetch: bool = False) -> str:
        """Return the ``TransportError`` text produced when ``_open_url`` raises *failure*.

        With ``fetch=False`` the request goes through ``_execute``; with
        ``fetch=True`` it goes through ``_execute_fetch``.
        """
        from muse.core.transport import HttpTransport, TransportError

        request = urllib.request.Request("http://hub.test/api")
        with patch("muse.core.transport._open_url", side_effect=failure), \
                pytest.raises(TransportError) as caught:
            transport = HttpTransport()
            if fetch:
                transport._execute_fetch(request)
            else:
                transport._execute(request)
        return str(caught.value)

    def test_401_token_echo_fully_suppressed_execute(self) -> None:
        """Via ``_execute``: neither the token nor the whole 401 body appears in the error."""
        malicious_body = f"Access denied. Token: {_FAKE_TOKEN}"
        full_output = self._error_text(_make_http_error(401, malicious_body))

        assert _FAKE_TOKEN not in full_output
        assert malicious_body not in full_output

    def test_401_token_echo_fully_suppressed_execute_fetch(self) -> None:
        """Via ``_execute_fetch``: the token echoed in a 401 body does not appear in the error."""
        malicious_body = f"Your token {_FAKE_TOKEN} has been revoked."
        full_output = self._error_text(_make_http_error(401, malicious_body), fetch=True)

        assert _FAKE_TOKEN not in full_output

    def test_all_non_401_codes_strip_control_chars(self) -> None:
        """For each sampled non-401 status, the displayed text has neither ESC nor BEL."""
        from muse.core.validation import sanitize_display

        hostile_body = f"Error on server with {_ANSI_ESC} and {_OSC_LINK}"
        for code in (400, 403, 404, 409, 500, 502, 503):
            raw_msg = self._error_text(_make_http_error(code, hostile_body))
            # Mirror what a display callsite does with the exception text.
            displayed = sanitize_display(raw_msg)
            assert "\x1b" not in displayed, f"ESC in HTTP {code} output: {displayed!r}"
            assert "\x07" not in displayed, f"BEL in HTTP {code} output: {displayed!r}"
File History
4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e
fix: rename objects→blobs in push client and all stale test…
Sonnet 4.6
patch
22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a
fix: repair four test failures from post-migration audit
Sonnet 4.6
patch
29 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
29 days ago