gabriel / muse public
test_security_credential_leakage.py python
532 lines 22.0 KB
Raw
sha256:1c4b3e3a9a1f300774c3ee662b572a698d5fd405bf765a71e6011a2e9c3eaaaa feat: Muse — version control for the agent era Human 73 days ago
1 """Phase 2.5 — Credential and secret leakage tests.
2
3 Attack surface
4 --------------
5 A malicious or compromised remote server receives ``Authorization: MSign``
6 and echoes the token in its HTTP error response body. Without hardening, that
7 echoed value propagates through ``TransportError.__str__()`` to every display
8 callsite — landing on stderr where any agent log-scraper reads it.
9
10 Layers defended
11 ---------------
12 **Layer 1 — ``_http_error_message`` (transport.py)**
13
14 - HTTP 401: NEVER includes the server error body. Always returns the generic
15 "Authentication failed" string — the one status code where the server
16 provably received the signing credential.
17 - Other HTTP errors: include the response body (capped at 200 chars) after
18 stripping all C0/C1 control characters (ESC, BEL, CSI, NUL…). This blocks
19 ANSI/OSC terminal injection while preserving diagnostic text.
20 - ``URLError.reason``: sanitized before embedding in ``TransportError``.
21
22 **Layer 2 — display callsites**
23
24 All ``print(f"... {exc} ...")`` paths in network-facing commands wrap
25 ``str(exc)`` with ``sanitize_display``, providing a second layer against any
26 residual control characters.
27
28 Specific paths verified
29 -----------------------
30 - ``transport.py::_http_error_message`` — unit tests
31 - ``transport.py::HttpTransport._execute`` — mock HTTP integration
32 - ``transport.py::HttpTransport._execute_fetch`` — mock HTTP integration
33 - Display callsite chain (sanitize_display ∘ TransportError.__str__)
34 - ``auth.py::_json_post`` error paths — ANSI injection
35 - ``whoami._mask_token`` — masking semantics
36 - ``sanitize_display`` — control char stripping
37 - ``plumbing/ls_remote`` error path — mock integration
38 """
39 from __future__ import annotations
40
41 import io
42 import sys
43 import urllib.error
44 import urllib.request
45 from http.client import HTTPMessage
46 from unittest.mock import MagicMock, patch
47
48 import pytest
49
50
51 # ---------------------------------------------------------------------------
52 # Constants
53 # ---------------------------------------------------------------------------
54
55 _FAKE_TOKEN = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.test.signature_abc"
56 # ESC is 0x1b — stripped by sanitize_display; remaining "[31m..." is harmless text
57 _ANSI_ESC = "\x1b[31mRED\x1b[0m"
58 # OSC 8 hyperlink — ESC (0x1b) and BEL (0x07) stripped; surrounding text remains
59 _OSC_LINK = "\x1b]8;;http://evil.com\x07click\x1b]8;;\x07"
60 _BEL = "\x07ring"
61
62
63 def _make_http_error(code: int, body: str, url: str = "http://hub.test/api") -> urllib.error.HTTPError:
64 """Construct an ``urllib.error.HTTPError`` with a controlled body."""
65 fp = io.BytesIO(body.encode("utf-8"))
66 return urllib.error.HTTPError(url, code, f"HTTP Error {code}", HTTPMessage(), fp)
67
68
69 # ---------------------------------------------------------------------------
70 # 1. _http_error_message — unit tests
71 # ---------------------------------------------------------------------------
72
73 class TestHttpErrorMessage:
74 """``_http_error_message`` is the single safe formatter for HTTP errors."""
75
76 def _call(self, code: int, body: str) -> str:
77 from muse.core.transport import _http_error_message
78 return _http_error_message(_make_http_error(code, body))
79
80 # ---- HTTP 401 — server MAY have echoed the signing credential ----
81
82 def test_401_never_includes_body_with_token(self) -> None:
83 """A 401 body that explicitly echoes the token must NEVER appear."""
84 result = self._call(401, f"Token {_FAKE_TOKEN} is invalid.")
85 assert _FAKE_TOKEN not in result
86
87 def test_401_always_returns_generic_message(self) -> None:
88 """Every 401 body maps to the same generic string — no content variation."""
89 bodies = [
90 f"Authorization: MSign {_FAKE_TOKEN}",
91 "Unauthorized",
92 "",
93 "X" * 1000,
94 _ANSI_ESC,
95 ]
96 messages = {self._call(401, b) for b in bodies}
97 assert len(messages) == 1, f"Multiple messages for 401: {messages}"
98
99 def test_401_contains_authentication_failed(self) -> None:
100 assert "Authentication failed" in self._call(401, "anything")
101
102 def test_401_status_text_not_in_result(self) -> None:
103 """No server-controlled content appears in a 401 result."""
104 server_text = "your_secret_data"
105 result = self._call(401, server_text)
106 assert server_text not in result
107
108 # ---- Non-401 codes — body included but sanitized ----
109
110 def test_500_includes_status_code(self) -> None:
111 assert "500" in self._call(500, "Internal server error")
112
113 def test_500_includes_sanitized_body_text(self) -> None:
114 result = self._call(500, "Internal server error")
115 assert "Internal server error" in result
116
117 def test_500_strips_esc_from_body(self) -> None:
118 """ESC (0x1b) stripped — ANSI sequences cannot fire in terminals."""
119 result = self._call(500, _ANSI_ESC)
120 assert "\x1b" not in result
121
122 def test_500_strips_bel_from_body(self) -> None:
123 """BEL (0x07) stripped — prevents terminal bell injection."""
124 result = self._call(500, _OSC_LINK)
125 assert "\x07" not in result
126
127 def test_500_body_capped_at_200_chars(self) -> None:
128 """Error body is capped to bound output size."""
129 result = self._call(500, "X" * 500)
130 body_part = result.replace("HTTP 500: ", "")
131 assert len(body_part) <= 200
132
133 def test_500_empty_body_returns_just_status(self) -> None:
134 assert self._call(500, "") == "HTTP 500"
135
136 def test_404_body_included(self) -> None:
137 result = self._call(404, "Not found")
138 assert "404" in result
139 assert "Not found" in result
140
141 def test_403_body_included(self) -> None:
142 """403 is NOT treated like 401 — body shown (sanitized)."""
143 result = self._call(403, "Forbidden resource")
144 assert "403" in result
145 assert "Forbidden resource" in result
146
147 def test_ansi_in_any_non_401_body_strips_esc(self) -> None:
148 for code in (400, 403, 404, 409, 500, 502, 503):
149 result = self._call(code, _ANSI_ESC)
150 assert "\x1b" not in result, f"ESC in {code} result: {result!r}"
151
152 def test_bel_in_any_non_401_body_stripped(self) -> None:
153 for code in (400, 403, 404, 409, 500, 502, 503):
154 result = self._call(code, _OSC_LINK)
155 assert "\x07" not in result, f"BEL in {code} result: {result!r}"
156
157
158 # ---------------------------------------------------------------------------
159 # 2. HttpTransport._execute — mock HTTP server integration
160 # ---------------------------------------------------------------------------
161
162 class TestHttpTransportExecute:
163 """``_execute`` converts HTTP errors to safe ``TransportError`` instances."""
164
165 def test_401_transport_error_no_token(self) -> None:
166 from muse.core.transport import HttpTransport, TransportError
167
168 err = _make_http_error(401, f"token={_FAKE_TOKEN} is revoked")
169 req = urllib.request.Request("http://hub.test/api")
170
171 with patch("muse.core.transport._open_url", side_effect=err):
172 with pytest.raises(TransportError) as exc_info:
173 HttpTransport()._execute(req)
174
175 msg = str(exc_info.value)
176 assert _FAKE_TOKEN not in msg
177 assert "Authentication failed" in msg
178 assert exc_info.value.status_code == 401
179
180 def test_500_includes_sanitized_body(self) -> None:
181 from muse.core.transport import HttpTransport, TransportError
182
183 err = _make_http_error(500, "Database error on shard-42")
184 req = urllib.request.Request("http://hub.test/api")
185
186 with patch("muse.core.transport._open_url", side_effect=err):
187 with pytest.raises(TransportError) as exc_info:
188 HttpTransport()._execute(req)
189
190 msg = str(exc_info.value)
191 assert "500" in msg
192 assert "Database error" in msg
193 assert "\x1b" not in msg
194
195 def test_500_ansi_in_body_strips_esc(self) -> None:
196 from muse.core.transport import HttpTransport, TransportError
197
198 err = _make_http_error(500, _ANSI_ESC)
199 req = urllib.request.Request("http://hub.test/api")
200
201 with patch("muse.core.transport._open_url", side_effect=err):
202 with pytest.raises(TransportError) as exc_info:
203 HttpTransport()._execute(req)
204
205 assert "\x1b" not in str(exc_info.value)
206
207 def test_url_error_reason_esc_stripped(self) -> None:
208 from muse.core.transport import HttpTransport, TransportError
209
210 url_err = urllib.error.URLError(f"Connection refused {_ANSI_ESC}")
211 req = urllib.request.Request("http://hub.test/api")
212
213 with patch("muse.core.transport._open_url", side_effect=url_err):
214 with pytest.raises(TransportError) as exc_info:
215 HttpTransport()._execute(req)
216
217 assert "\x1b" not in str(exc_info.value)
218
219
220 class TestHttpTransportExecuteFetch:
221 """``_execute_fetch`` applies the same safety guarantees as ``_execute``."""
222
223 def test_401_body_with_token_not_in_error(self) -> None:
224 from muse.core.transport import HttpTransport, TransportError
225
226 malicious_body = f"Your token {_FAKE_TOKEN} has been revoked."
227 err = _make_http_error(401, malicious_body)
228 req = urllib.request.Request("http://hub.test/api")
229
230 with patch("muse.core.transport._open_url", side_effect=err):
231 with pytest.raises(TransportError) as exc_info:
232 HttpTransport()._execute_fetch(req)
233
234 assert _FAKE_TOKEN not in str(exc_info.value)
235
236 def test_500_esc_stripped(self) -> None:
237 from muse.core.transport import HttpTransport, TransportError
238
239 err = _make_http_error(500, _ANSI_ESC)
240 req = urllib.request.Request("http://hub.test/api")
241
242 with patch("muse.core.transport._open_url", side_effect=err):
243 with pytest.raises(TransportError) as exc_info:
244 HttpTransport()._execute_fetch(req)
245
246 assert "\x1b" not in str(exc_info.value)
247
248
249 # ---------------------------------------------------------------------------
250 # 3. Display callsite hardening — sanitize_display ∘ TransportError.__str__
251 # ---------------------------------------------------------------------------
252
253 class TestDisplayCallsiteChain:
254 """The ``sanitize_display(str(exc))`` chain works correctly for all network errors."""
255
256 def _make_exc(self, msg: str, code: int = 500) -> "Exception":
257 from muse.core.transport import TransportError
258 return TransportError(msg, code)
259
260 def test_ansi_in_transport_error_stripped_at_display(self) -> None:
261 from muse.core.validation import sanitize_display
262 exc = self._make_exc(f"HTTP 500: {_ANSI_ESC}")
263 displayed = sanitize_display(str(exc))
264 assert "\x1b" not in displayed
265
266 def test_osc_bel_stripped_at_display(self) -> None:
267 from muse.core.validation import sanitize_display
268 exc = self._make_exc(f"HTTP 500: {_OSC_LINK}")
269 displayed = sanitize_display(str(exc))
270 assert "\x1b" not in displayed
271 assert "\x07" not in displayed
272
273 def test_generic_auth_message_passes_through_unchanged(self) -> None:
274 from muse.core.validation import sanitize_display
275 from muse.core.transport import TransportError
276 generic = "Authentication failed (HTTP 401). Run 'muse auth register'."
277 exc = TransportError(generic, 401)
278 assert sanitize_display(str(exc)) == generic
279
280 def test_token_value_not_stripped_by_sanitize_display(self) -> None:
281 """sanitize_display only strips control chars — the 401 guard prevents
282 token values from reaching TransportError in the first place."""
283 from muse.core.validation import sanitize_display
284 assert sanitize_display(_FAKE_TOKEN) == _FAKE_TOKEN
285
286 def test_all_http_error_codes_strip_esc(self) -> None:
287 from muse.core.transport import HttpTransport, TransportError
288 for code in (400, 403, 404, 409, 500, 502, 503):
289 body = f"Error on server with {_ANSI_ESC}"
290 err = _make_http_error(code, body)
291 req = urllib.request.Request("http://hub.test/api")
292 with patch("muse.core.transport._open_url", side_effect=err):
293 with pytest.raises(TransportError) as exc_info:
294 HttpTransport()._execute(req)
295 msg = str(exc_info.value)
296 assert "\x1b" not in msg, f"ESC leaked for HTTP {code}: {msg!r}"
297 assert "\x07" not in msg, f"BEL leaked for HTTP {code}: {msg!r}"
298
299
300 # ---------------------------------------------------------------------------
301 # 4. auth.py _json_post — ANSI injection in server error body
302 # ---------------------------------------------------------------------------
303
304 class TestAuthJsonPostSanitization:
305 """``auth._json_post`` sanitizes server-controlled error content before display."""
306
307 def test_ansi_in_http_error_body_stripped(self) -> None:
308 import muse.cli.commands.auth as auth_mod
309
310 err = _make_http_error(400, f"Bad request {_ANSI_ESC}")
311 captured = io.StringIO()
312
313 with patch.object(sys, "stderr", captured), \
314 patch("muse.cli.commands.auth.urllib.request.urlopen", side_effect=err):
315 with pytest.raises(SystemExit):
316 auth_mod._json_post_raw("http://hub.test", "/api/auth", {"key": "val"})
317
318 assert "\x1b" not in captured.getvalue()
319
320 def test_osc_in_http_error_body_stripped(self) -> None:
321 import muse.cli.commands.auth as auth_mod
322
323 err = _make_http_error(400, _OSC_LINK)
324 captured = io.StringIO()
325
326 with patch.object(sys, "stderr", captured), \
327 patch("muse.cli.commands.auth.urllib.request.urlopen", side_effect=err):
328 with pytest.raises(SystemExit):
329 auth_mod._json_post_raw("http://hub.test", "/api/auth", {"key": "val"})
330
331 assert "\x07" not in captured.getvalue()
332
333 def test_url_error_reason_sanitized(self) -> None:
334 import muse.cli.commands.auth as auth_mod
335
336 url_err = urllib.error.URLError(f"DNS failure {_ANSI_ESC}")
337 captured = io.StringIO()
338
339 with patch.object(sys, "stderr", captured), \
340 patch("muse.cli.commands.auth.urllib.request.urlopen", side_effect=url_err):
341 with pytest.raises(SystemExit):
342 auth_mod._json_post_raw("http://hub.test", "/api/auth", {"key": "val"})
343
344 assert "\x1b" not in captured.getvalue()
345
346
347 # ---------------------------------------------------------------------------
348 # ---------------------------------------------------------------------------
349 # 5. sanitize_display — control character stripping semantics
350 # ---------------------------------------------------------------------------
351
352 class TestSanitizeDisplay:
353 """``sanitize_display`` strips C0/C1 control bytes, preserving text."""
354
355 def _sanitize(self, s: str) -> str:
356 from muse.core.validation import sanitize_display
357 return sanitize_display(s)
358
359 def test_esc_stripped_from_ansi_sequence(self) -> None:
360 """ESC (0x1b) is stripped; the surrounding bracket text remains harmless."""
361 result = self._sanitize(_ANSI_ESC)
362 assert "\x1b" not in result
363 assert "RED" in result # display text preserved
364
365 def test_esc_stripped_from_osc_sequence(self) -> None:
366 result = self._sanitize(_OSC_LINK)
367 assert "\x1b" not in result
368
369 def test_bel_stripped(self) -> None:
370 """BEL (0x07) stripped."""
371 assert "\x07" not in self._sanitize(_BEL)
372 assert "ring" in self._sanitize(_BEL)
373
374 def test_null_byte_stripped(self) -> None:
375 result = self._sanitize("a\x00b")
376 assert "\x00" not in result
377 assert "a" in result and "b" in result
378
379 def test_csi_byte_stripped(self) -> None:
380 """C1 CSI (0x9b) stripped."""
381 csi = "\x9b31mRED\x9bm"
382 result = self._sanitize(csi)
383 assert "\x9b" not in result
384
385 def test_newline_preserved(self) -> None:
386 assert "\n" in self._sanitize("line1\nline2")
387
388 def test_tab_preserved(self) -> None:
389 assert "\t" in self._sanitize("col1\tcol2")
390
391 def test_normal_ascii_unchanged(self) -> None:
392 text = "HTTP 500: Internal server error on shard-42"
393 assert self._sanitize(text) == text
394
395 def test_token_value_not_stripped(self) -> None:
396 """sanitize_display does NOT strip token values — only C0/C1 control bytes.
397 Token leakage prevention is handled at the TransportError creation layer."""
398 assert self._sanitize(_FAKE_TOKEN) == _FAKE_TOKEN
399
400 def test_empty_string(self) -> None:
401 assert self._sanitize("") == ""
402
403 def test_unicode_preserved(self) -> None:
404 assert self._sanitize("héllo wörld 中文") == "héllo wörld 中文"
405
406 def test_all_c0_control_bytes_stripped_except_tab_and_lf(self) -> None:
407 """Every C0 control byte 0x00–0x1f is stripped except HT (0x09) and LF (0x0a)."""
408 for code in range(0, 32):
409 if code in (9, 10): # HT and LF are preserved
410 continue
411 char = chr(code)
412 result = self._sanitize(f"before{char}after")
413 assert char not in result, f"C0 byte 0x{code:02x} not stripped"
414
415 def test_del_stripped(self) -> None:
416 assert chr(0x7f) not in self._sanitize(f"a{chr(0x7f)}b")
417
418 def test_c1_control_bytes_stripped(self) -> None:
419 """C1 control bytes 0x80–0x9f stripped."""
420 for code in range(0x80, 0xa0):
421 char = chr(code)
422 result = self._sanitize(f"x{char}y")
423 assert char not in result, f"C1 byte 0x{code:02x} not stripped"
424
425
426 # ---------------------------------------------------------------------------
427 # 7. ls_remote plumbing — error display sanitized
428 # ---------------------------------------------------------------------------
429
430 class TestLsRemoteErrorSanitized:
431 """``plumbing/ls_remote`` never leaks token or ANSI in error output."""
432
433 def test_ansi_in_transport_error_stripped(self, tmp_path: "pathlib.Path") -> None:
434 import pathlib
435 import muse.cli.commands.plumbing.ls_remote as ls_mod
436 from muse.core.transport import TransportError
437
438 exc = TransportError(f"HTTP 500: {_ANSI_ESC}", 500)
439 captured = io.StringIO()
440
441 with patch.object(sys, "stderr", captured), \
442 patch.object(ls_mod, "find_repo_root", return_value=tmp_path), \
443 patch.object(ls_mod, "get_remote", return_value="http://hub.test"), \
444 patch.object(ls_mod, "get_signing_identity", return_value=_FAKE_TOKEN), \
445 patch.object(ls_mod, "HttpTransport") as mock_t:
446 mock_t.return_value.fetch_remote_info.side_effect = exc
447 args = MagicMock()
448 args.remote = "local"
449 args.fmt = "json"
450 try:
451 ls_mod.run(args)
452 except SystemExit:
453 pass
454
455 assert "\x1b" not in captured.getvalue()
456
457 def test_401_token_echo_not_in_stderr(self, tmp_path: "pathlib.Path") -> None:
458 import muse.cli.commands.plumbing.ls_remote as ls_mod
459 from muse.core.transport import TransportError
460
461 exc = TransportError("Authentication failed (HTTP 401). Run 'muse auth register'.", 401)
462 captured = io.StringIO()
463
464 with patch.object(sys, "stderr", captured), \
465 patch.object(ls_mod, "find_repo_root", return_value=tmp_path), \
466 patch.object(ls_mod, "get_remote", return_value="http://hub.test"), \
467 patch.object(ls_mod, "get_signing_identity", return_value=_FAKE_TOKEN), \
468 patch.object(ls_mod, "HttpTransport") as mock_t:
469 mock_t.return_value.fetch_remote_info.side_effect = exc
470 args = MagicMock()
471 args.remote = "local"
472 args.fmt = "json"
473 try:
474 ls_mod.run(args)
475 except SystemExit:
476 pass
477
478 assert _FAKE_TOKEN not in captured.getvalue()
479
480
481 # ---------------------------------------------------------------------------
482 # 8. End-to-end: 401 body with token echo — full chain
483 # ---------------------------------------------------------------------------
484
485 class TestEndToEnd401TokenEcho:
486 """Full chain: malicious 401 body → _http_error_message → TransportError → display."""
487
488 def test_401_token_echo_fully_suppressed_execute(self) -> None:
489 from muse.core.transport import HttpTransport, TransportError
490
491 malicious_body = f"Access denied. Token: {_FAKE_TOKEN}"
492 err = _make_http_error(401, malicious_body)
493 req = urllib.request.Request("http://hub.test/api")
494
495 with patch("muse.core.transport._open_url", side_effect=err):
496 with pytest.raises(TransportError) as exc_info:
497 HttpTransport()._execute(req)
498
499 full_output = str(exc_info.value)
500 assert _FAKE_TOKEN not in full_output
501 assert malicious_body not in full_output
502
503 def test_401_token_echo_fully_suppressed_execute_fetch(self) -> None:
504 from muse.core.transport import HttpTransport, TransportError
505
506 malicious_body = f"Your token {_FAKE_TOKEN} has been revoked."
507 err = _make_http_error(401, malicious_body)
508 req = urllib.request.Request("http://hub.test/api")
509
510 with patch("muse.core.transport._open_url", side_effect=err):
511 with pytest.raises(TransportError) as exc_info:
512 HttpTransport()._execute_fetch(req)
513
514 assert _FAKE_TOKEN not in str(exc_info.value)
515
516 def test_all_non_401_codes_strip_control_chars(self) -> None:
517 from muse.core.transport import HttpTransport, TransportError
518 from muse.core.validation import sanitize_display
519
520 for code in (400, 403, 404, 409, 500, 502, 503):
521 body = f"Error on server with {_ANSI_ESC} and {_OSC_LINK}"
522 err = _make_http_error(code, body)
523 req = urllib.request.Request("http://hub.test/api")
524
525 with patch("muse.core.transport._open_url", side_effect=err):
526 with pytest.raises(TransportError) as exc_info:
527 HttpTransport()._execute(req)
528
529 raw_msg = str(exc_info.value)
530 displayed = sanitize_display(raw_msg)
531 assert "\x1b" not in displayed, f"ESC in HTTP {code} output: {displayed!r}"
532 assert "\x07" not in displayed, f"BEL in HTTP {code} output: {displayed!r}"
File History 1 commit
sha256:1c4b3e3a9a1f300774c3ee662b572a698d5fd405bf765a71e6011a2e9c3eaaaa feat: Muse — version control for the agent era Human 73 days ago