gabriel / muse public
test_security_env_injection.py python
569 lines 21.2 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
1 """Phase 2.7 — Environment variable injection security tests.
2
3 Attack surface
4 --------------
5 Muse reads six environment variables:
6
7 MUSE_REPO_ROOT — overrides repository root discovery.
8 MUSE_AGENT_ID — agent provenance stored in commit records.
9 MUSE_MODEL_ID — model provenance stored in commit records.
10 MUSE_TOOLCHAIN_ID — toolchain provenance stored in commit records.
11 MUSE_PROMPT_HASH — prompt hash stored in commit records.
12 MUSE_TEST_ENV — passed through to CI gate subprocesses.
13
14 Each variable represents a trust boundary: an attacker who can influence the
15 process environment (CI pipeline, shared-host user, container escape) can
16 inject crafted values.
17
18 Attack vectors discovered via muse recon
19 -----------------------------------------
20 1. **MUSE_REPO_ROOT — empty/whitespace string**: ``pathlib.Path("").resolve()``
21 returns the current working directory, so an empty override silently
22 behaves as if no override was set. Now explicitly ignored (falls through
23 to directory walk) with a debug log.
24
25 2. **MUSE_REPO_ROOT — control characters**: a path containing ESC or BEL
26 could not be a real filesystem path on any OS; now rejected to prevent
27 logging or display injection of the invalid value.
28
29 3. **MUSE_REPO_ROOT — overly long path**: values longer than PATH_MAX (4096)
30 are rejected as injection payloads rather than passed to ``pathlib``.
31
32 4. **Agent provenance fields (MUSE_AGENT_ID, MUSE_MODEL_ID,
33 MUSE_TOOLCHAIN_ID, MUSE_PROMPT_HASH)**: the comment in ``commit.py``
34 said "prevent control-character-laden strings" but only the length cap
35 (256 chars) was implemented, not control-character sanitization.
36 ESC sequences in agent_id → stored in commit records → terminal injection
37 when provenance is rendered in future display paths, agent dashboards,
38 or log pipelines.
39
40 5. **Challenge nonce — CRLF injection**: a nonce containing ``\\r\\n`` would
41 attempt to inject arbitrary HTTP headers. Python's ``http.client`` blocks
42 the injection at the wire level (``ValueError: Invalid header value``), but
43 now rejected at ingestion time so the error is surfaced as a clear
44 diagnostic rather than a confusing transport exception.
45
46 6. **Challenge nonce — control characters, excessive length**: non-printable chars
47 and pathologically long values (> 8192 chars) are now rejected by
48 ``sanitize_token`` before reaching the HTTP stack.
49
50 Fixes
51 -----
52 - ``sanitize_provenance()`` added to ``muse.core.validation`` — strips all
53 C0 (0x00–0x1F), DEL (0x7F), and C1 (0x80–0x9F) control characters.
54 Applied to all four provenance fields in ``muse/cli/commands/commit.py``.
55 - ``sanitize_token()`` added to ``muse.core.validation`` — strips whitespace,
56 rejects control chars and values longer than 8192 chars.
57 - ``find_repo_root()`` in ``muse/core/repo.py`` now explicitly ignores empty
58 and whitespace-only ``MUSE_REPO_ROOT`` values, logs a debug message, and
59 rejects values containing control characters or exceeding 4096 chars.
60 """
61
62 from __future__ import annotations
63
64 import os
65 import pathlib
66 import tempfile
67
68 import pytest
69
70 from muse.core.paths import muse_dir
71 from muse.core.validation import sanitize_provenance, sanitize_token
72 from muse.core.repo import find_repo_root
73
74
75 # ---------------------------------------------------------------------------
76 # Helpers
77 # ---------------------------------------------------------------------------
78
79 def _make_muse_dir(tmp_path: pathlib.Path) -> pathlib.Path:
80 """Create a minimal .muse/ skeleton and return the repo root."""
81 muse_dir(tmp_path).mkdir()
82 return tmp_path
83
84
85 # ===========================================================================
86 # sanitize_provenance — unit tests
87 # ===========================================================================
88
89
90 class TestSanitizeProvenance:
91 """sanitize_provenance must strip all C0/DEL/C1 control characters."""
92
93 def test_clean_string_unchanged(self) -> None:
94 assert sanitize_provenance("my-agent-v1") == "my-agent-v1"
95
96 def test_empty_string(self) -> None:
97 assert sanitize_provenance("") == ""
98
99 def test_unicode_allowed(self) -> None:
100 assert sanitize_provenance("agent-αβγ") == "agent-αβγ"
101
102 def test_spaces_allowed(self) -> None:
103 """Space (0x20) is not a control char and must be preserved."""
104 assert sanitize_provenance("my agent") == "my agent"
105
106 @pytest.mark.parametrize("char,description", [
107 ("\x00", "NUL"),
108 ("\x01", "SOH"),
109 ("\x07", "BEL"),
110 ("\x08", "BS"),
111 ("\x09", "HT (tab)"),
112 ("\x0a", "LF (newline)"),
113 ("\x0b", "VT"),
114 ("\x0c", "FF"),
115 ("\x0d", "CR"),
116 ("\x0e", "SO"),
117 ("\x1b", "ESC — ANSI injection entry point"),
118 ("\x1f", "US"),
119 ("\x7f", "DEL"),
120 ("\x80", "C1 PAD"),
121 ("\x9b", "CSI — ANSI CSI sequence introducer"),
122 ("\x9f", "C1 APC"),
123 ])
124 def test_control_char_stripped(self, char: str, description: str) -> None:
125 result = sanitize_provenance(f"prefix{char}suffix")
126 assert char not in result
127 assert "prefixsuffix" == result
128
129 def test_esc_sequence_stripped(self) -> None:
130 """Full ANSI colour sequence embedded in agent_id must be stripped."""
131 result = sanitize_provenance("\x1b[31mmalicious-agent\x1b[0m")
132 assert "\x1b" not in result
133 assert result == "[31mmalicious-agent[0m"
134
135 def test_newline_stripped(self) -> None:
136 """Newline in agent_id would split log lines — must be removed."""
137 result = sanitize_provenance("agent\nid\nsplitting")
138 assert "\n" not in result
139 assert result == "agentidsplitting"
140
141 def test_crlf_stripped(self) -> None:
142 result = sanitize_provenance("agent\r\nid")
143 assert "\r" not in result
144 assert "\n" not in result
145
146 def test_bel_stripped(self) -> None:
147 """BEL (0x07) causes terminal bell — must be stripped."""
148 result = sanitize_provenance("agent\x07id")
149 assert "\x07" not in result
150
151 def test_rtl_override_preserved(self) -> None:
152 """U+202E is not a C0/C1 char; sanitize_provenance does not strip Unicode bidi."""
153 # Unicode bidi control characters are a separate concern handled by
154 # rendering layers. sanitize_provenance only strips C0/DEL/C1.
155 s = "agent\u202eid"
156 result = sanitize_provenance(s)
157 assert result == s
158
159 def test_multiple_control_chars(self) -> None:
160 payload = "\x1b[31m\x07\x00agent\x1b[0m"
161 result = sanitize_provenance(payload)
162 assert "\x1b" not in result
163 assert "\x07" not in result
164 assert "\x00" not in result
165 assert "agent" in result
166
167 def test_does_not_truncate(self) -> None:
168 """sanitize_provenance does not enforce length — callers do [:256]."""
169 long_s = "a" * 300
170 assert len(sanitize_provenance(long_s)) == 300
171
172
173 # ===========================================================================
174 # sanitize_token — unit tests
175 # ===========================================================================
176
177
178 class TestSanitizeToken:
179 """sanitize_token must strip whitespace and reject control chars / overlength."""
180
181 def test_valid_opaque_token(self) -> None:
182 tok = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ1c2VyIn0.abc123"
183 result = sanitize_token(tok)
184 assert result == tok
185
186 def test_strips_leading_trailing_whitespace(self) -> None:
187 result = sanitize_token(" mytoken ")
188 assert result == "mytoken"
189
190 def test_empty_string_returns_none(self) -> None:
191 assert sanitize_token("") is None
192
193 def test_whitespace_only_returns_none(self) -> None:
194 assert sanitize_token(" \t\n ") is None
195
196 def test_overlength_returns_none(self) -> None:
197 assert sanitize_token("a" * 8193) is None
198
199 def test_max_length_accepted(self) -> None:
200 result = sanitize_token("a" * 8192)
201 assert result is not None
202 assert len(result) == 8192
203
204 @pytest.mark.parametrize("char,description", [
205 ("\r", "CR — HTTP header line terminator"),
206 ("\n", "LF — HTTP header line terminator"),
207 ("\r\n", "CRLF — HTTP header injection sequence"),
208 ("\x00", "NUL"),
209 ("\x01", "SOH"),
210 ("\x1b", "ESC"),
211 ("\x1f", "US"),
212 ("\x7f", "DEL"),
213 ])
214 def test_control_char_returns_none(self, char: str, description: str) -> None:
215 result = sanitize_token(f"good_token{char}malicious")
216 assert result is None
217
218 def test_crlf_header_injection_blocked(self) -> None:
219 """Classic HTTP header injection payload must be rejected."""
220 payload = "good_token\r\nX-Injected: pwned\r\nAuthorization: MSign attacker"
221 assert sanitize_token(payload) is None
222
223 def test_unicode_printable_allowed(self) -> None:
224 """Unicode printable chars (e.g., in opaque tokens) must be accepted."""
225 tok = "token-αβγ-δεζ"
226 result = sanitize_token(tok)
227 assert result == tok
228
229 def test_bare_api_key_format(self) -> None:
230 tok = "sk-abc123XYZ_-."
231 result = sanitize_token(tok)
232 assert result == tok
233
234
235 # ===========================================================================
236 # find_repo_root — MUSE_REPO_ROOT hardening
237 # ===========================================================================
238
239
240 class TestFindRepoRootEnvHardening:
241 """find_repo_root must safely handle all MUSE_REPO_ROOT attack payloads."""
242
243 def _with_env(self, key: str, value: str) -> None:
244 os.environ[key] = value
245
246 def _clear_env(self, key: str) -> None:
247 os.environ.pop(key, None)
248
249 def test_empty_string_ignored_falls_through(self, tmp_path: pathlib.Path) -> None:
250 """Empty MUSE_REPO_ROOT must not redirect to cwd; falls through to walk."""
251 _make_muse_dir(tmp_path)
252 old_cwd = os.getcwd()
253 try:
254 os.chdir(tmp_path)
255 self._with_env("MUSE_REPO_ROOT", "")
256 result = find_repo_root()
257 # Should find the cwd repo, not crash or return None
258 assert result is not None
259 finally:
260 self._clear_env("MUSE_REPO_ROOT")
261 os.chdir(old_cwd)
262
263 def test_whitespace_only_ignored(self, tmp_path: pathlib.Path) -> None:
264 """Whitespace-only MUSE_REPO_ROOT must be ignored."""
265 _make_muse_dir(tmp_path)
266 old_cwd = os.getcwd()
267 try:
268 os.chdir(tmp_path)
269 self._with_env("MUSE_REPO_ROOT", " \t ")
270 result = find_repo_root()
271 # Falls through to walk — finds repo at tmp_path
272 assert result is not None
273 finally:
274 self._clear_env("MUSE_REPO_ROOT")
275 os.chdir(old_cwd)
276
277 def test_control_char_in_path_returns_none(self) -> None:
278 """MUSE_REPO_ROOT containing ESC must be rejected, not resolved."""
279 self._with_env("MUSE_REPO_ROOT", "/tmp/\x1b[31mattack")
280 try:
281 result = find_repo_root()
282 assert result is None
283 finally:
284 self._clear_env("MUSE_REPO_ROOT")
285
286 def test_nul_byte_in_path_returns_none(self) -> None:
287 """MUSE_REPO_ROOT with embedded NUL (0x01 since 0x00 can't be in env) rejected."""
288 self._with_env("MUSE_REPO_ROOT", "/tmp/\x01attack")
289 try:
290 result = find_repo_root()
291 assert result is None
292 finally:
293 self._clear_env("MUSE_REPO_ROOT")
294
295 def test_path_max_exceeded_returns_none(self) -> None:
296 """MUSE_REPO_ROOT longer than PATH_MAX (4096) must be rejected."""
297 self._with_env("MUSE_REPO_ROOT", f"/tmp/{'a' * 4092}")
298 try:
299 result = find_repo_root()
300 assert result is None
301 finally:
302 self._clear_env("MUSE_REPO_ROOT")
303
304 def test_valid_override_to_attacker_dir_with_muse(
305 self, tmp_path: pathlib.Path
306 ) -> None:
307 """MUSE_REPO_ROOT pointing to a dir with real .muse/ is accepted (by design)."""
308 _make_muse_dir(tmp_path)
309 self._with_env("MUSE_REPO_ROOT", str(tmp_path))
310 try:
311 result = find_repo_root()
312 assert result is not None
313 assert result.resolve() == tmp_path.resolve()
314 finally:
315 self._clear_env("MUSE_REPO_ROOT")
316
317 def test_valid_override_to_dir_without_muse_returns_none(
318 self, tmp_path: pathlib.Path
319 ) -> None:
320 """MUSE_REPO_ROOT pointing to a dir without .muse/ returns None."""
321 self._with_env("MUSE_REPO_ROOT", str(tmp_path))
322 try:
323 result = find_repo_root()
324 assert result is None
325 finally:
326 self._clear_env("MUSE_REPO_ROOT")
327
328 def test_symlinked_muse_dir_rejected(self, tmp_path: pathlib.Path) -> None:
329 """MUSE_REPO_ROOT pointing to a dir with a symlinked .muse/ returns None."""
330 real = tmp_path / "real"
331 real.mkdir()
332 muse_dir(real).mkdir()
333 attacker = tmp_path / "attacker"
334 attacker.mkdir()
335 muse_dir(attacker).symlink_to(muse_dir(real))
336
337 self._with_env("MUSE_REPO_ROOT", str(attacker))
338 try:
339 result = find_repo_root()
340 assert result is None
341 finally:
342 self._clear_env("MUSE_REPO_ROOT")
343
344 def test_path_traversal_resolved_safely(self, tmp_path: pathlib.Path) -> None:
345 """MUSE_REPO_ROOT with ../../ is resolved by pathlib — no .muse/ means None."""
346 self._with_env("MUSE_REPO_ROOT", "/tmp/../../nonexistent")
347 try:
348 result = find_repo_root()
349 # Either None (no .muse/ there) or a resolved path without .muse/ → None
350 assert result is None
351 finally:
352 self._clear_env("MUSE_REPO_ROOT")
353
354 def test_nonexistent_path_returns_none(self) -> None:
355 """MUSE_REPO_ROOT pointing to a non-existent path returns None."""
356 self._with_env("MUSE_REPO_ROOT", "/tmp/muse_definitely_does_not_exist_xyz")
357 try:
358 result = find_repo_root()
359 assert result is None
360 finally:
361 self._clear_env("MUSE_REPO_ROOT")
362
363 def test_devnull_returns_none(self) -> None:
364 """/dev/null is not a directory with .muse/ — returns None."""
365 self._with_env("MUSE_REPO_ROOT", "/dev/null")
366 try:
367 result = find_repo_root()
368 assert result is None
369 finally:
370 self._clear_env("MUSE_REPO_ROOT")
371
372 def test_filesystem_root_returns_none(self) -> None:
373 """MUSE_REPO_ROOT=/ returns None (no .muse/ at /). Confirms no special behaviour."""
374 self._with_env("MUSE_REPO_ROOT", "/")
375 try:
376 result = find_repo_root()
377 assert result is None
378 finally:
379 self._clear_env("MUSE_REPO_ROOT")
380
381
382 # ===========================================================================
383 # Agent provenance — end-to-end sanitization in stored records
384 # ===========================================================================
385
386
387 class TestProvenanceSanitizationEndToEnd:
388 """After commit.py applies sanitize_provenance, commit records must be clean."""
389
390 def test_esc_in_agent_id_stripped(self) -> None:
391 """ESC injection in MUSE_AGENT_ID must not survive into the stored value."""
392 raw = "\x1b[31mmalicias-agent\x1b[0m"
393 clean = sanitize_provenance(raw[:256])
394 assert "\x1b" not in clean
395 # Printable text is preserved
396 assert "malicias-agent" in clean
397
398 def test_newline_in_model_id_stripped(self) -> None:
399 raw = "gpt-4\nX-Injected: pwned"
400 clean = sanitize_provenance(raw[:256])
401 assert "\n" not in clean
402
403 def test_tab_in_toolchain_id_stripped(self) -> None:
404 raw = "cursor-agent\tv2"
405 clean = sanitize_provenance(raw[:256])
406 assert "\t" not in clean
407
408 def test_all_c0_chars_stripped_from_prompt_hash(self) -> None:
409 for byte_val in range(0x00, 0x20):
410 char = chr(byte_val)
411 raw = f"hash{char}value"
412 clean = sanitize_provenance(raw)
413 assert char not in clean, f"Control char 0x{byte_val:02x} survived sanitize_provenance"
414
415 def test_length_truncation_then_sanitize(self) -> None:
416 """Simulate commit.py: truncate to _MAX_PROV then sanitize."""
417 _MAX_PROV = 256
418 payload = f"{'a' * 200}\x1b[31m{'b' * 100}"
419 stored = sanitize_provenance(payload[:_MAX_PROV])
420 assert len(stored) <= _MAX_PROV
421 assert "\x1b" not in stored
422
423 def test_clean_agent_id_survives(self) -> None:
424 raw = "counterpoint-bot-v2.1"
425 assert sanitize_provenance(raw[:256]) == raw
426
427 def test_unicode_agent_id_survives(self) -> None:
428 raw = "agent-αβγ-2024"
429 assert sanitize_provenance(raw[:256]) == raw
430
431 def test_hyphen_underscore_dot_survive(self) -> None:
432 raw = "cursor-agent_v2.1"
433 assert sanitize_provenance(raw[:256]) == raw
434
435
436 # ===========================================================================
437 # sanitize_token — integration with identity.py resolve_token
438 # ===========================================================================
439
440
441 class TestSanitizeTokenIntegration:
442 """sanitize_token must block CRLF and control chars before HTTP stack."""
443
444 def test_crlf_blocked_before_http_client(self) -> None:
445 """A CRLF token must be caught by sanitize_token, not by http.client."""
446 payload = "good\r\nX-Injected: pwned"
447 result = sanitize_token(payload)
448 assert result is None
449
450 def test_newline_only_blocked(self) -> None:
451 assert sanitize_token("token\nmalicious") is None
452
453 def test_cr_only_blocked(self) -> None:
454 assert sanitize_token("token\rmalicious") is None
455
456 def test_http_client_would_also_block_crlf(self) -> None:
457 """Demonstrate that Python's http.client blocks CRLF at the wire level.
458
459 This proves the http.client defence exists but our sanitize_token defence
460 should fire first so the user gets a clear diagnostic. We call
461 ``putrequest`` + ``putheader`` directly to trigger validation without
462 opening a socket.
463 """
464 import http.client
465
466 payload = "good_token\r\nX-Injected: pwned"
467 conn = http.client.HTTPConnection("example.com")
468 conn.putrequest("GET", "/")
469 with pytest.raises((ValueError, Exception)):
470 conn.putheader("Authorization", f"MSign {payload}")
471
472
473 # ===========================================================================
474 # Concurrency — env var reads are snapshot-safe
475 # ===========================================================================
476
477
478 class TestConcurrentEnvVarReads:
479 """Multiple threads calling sanitize_provenance/sanitize_token concurrently."""
480
481 def test_concurrent_sanitize_provenance(self) -> None:
482 import threading
483
484 results: list[str] = []
485 errors: list[str] = []
486
487 def worker(payload: str) -> None:
488 try:
489 results.append(sanitize_provenance(payload))
490 except Exception as exc:
491 errors.append(str(exc))
492
493 payloads = [
494 f"agent-{i}\x1b[31m\x07\r\n" for i in range(20)
495 ]
496 threads = [threading.Thread(target=worker, args=(p,)) for p in payloads]
497 for t in threads:
498 t.start()
499 for t in threads:
500 t.join()
501
502 assert errors == []
503 assert len(results) == 20
504 for r in results:
505 assert "\x1b" not in r
506 assert "\x07" not in r
507 assert "\r" not in r
508 assert "\n" not in r
509
510 def test_concurrent_sanitize_token(self) -> None:
511 import threading
512
513 good: list[str] = []
514 bad: list[None] = []
515
516 def worker(tok: str) -> None:
517 result = sanitize_token(tok)
518 if result is None:
519 bad.append(None)
520 else:
521 good.append(result)
522
523 valid_tokens = [f"valid-token-{i}" for i in range(10)]
524 invalid_tokens = [f"bad\r\ntoken-{i}" for i in range(10)]
525
526 threads = [
527 threading.Thread(target=worker, args=(t,))
528 for t in valid_tokens + invalid_tokens
529 ]
530 for t in threads:
531 t.start()
532 for t in threads:
533 t.join()
534
535 assert len(good) == 10
536 assert len(bad) == 10
537
538
539 # ===========================================================================
540 # Fuzzing — random payloads
541 # ===========================================================================
542
543
544 class TestFuzzedEnvVarPayloads:
545
546 @pytest.mark.parametrize("seed", range(20))
547 def test_random_control_char_in_provenance_always_stripped(self, seed: int) -> None:
548 import random
549 rng = random.Random(seed)
550 char = chr(rng.randint(0x00, 0x1F))
551 payload = f"prefix{char}suffix"
552 result = sanitize_provenance(payload)
553 assert char not in result
554
555 @pytest.mark.parametrize("seed", range(10))
556 def test_random_crlf_token_always_rejected(self, seed: int) -> None:
557 import random
558 rng = random.Random(seed + 50)
559 crlf = rng.choice(["\r\n", "\r", "\n"])
560 payload = f"token{crlf}malicious"
561 assert sanitize_token(payload) is None
562
563 @pytest.mark.parametrize("seed", range(5))
564 def test_random_overlength_token_rejected(self, seed: int) -> None:
565 import random
566 rng = random.Random(seed + 100)
567 length = rng.randint(8193, 20000)
568 payload = "a" * length
569 assert sanitize_token(payload) is None
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago