"""Phase 2.7 — Environment variable injection security tests. Attack surface -------------- Muse reads six environment variables: MUSE_REPO_ROOT — overrides repository root discovery. MUSE_AGENT_ID — agent provenance stored in commit records. MUSE_MODEL_ID — model provenance stored in commit records. MUSE_TOOLCHAIN_ID — toolchain provenance stored in commit records. MUSE_PROMPT_HASH — prompt hash stored in commit records. MUSE_TEST_ENV — passed through to CI gate subprocesses. Each variable represents a trust boundary: an attacker who can influence the process environment (CI pipeline, shared-host user, container escape) can inject crafted values. Attack vectors discovered via muse recon ----------------------------------------- 1. **MUSE_REPO_ROOT — empty/whitespace string**: ``pathlib.Path("").resolve()`` returns the current working directory, so an empty override silently behaves as if no override was set. Now explicitly ignored (falls through to directory walk) with a debug log. 2. **MUSE_REPO_ROOT — control characters**: a path containing ESC or BEL could not be a real filesystem path on any OS; now rejected to prevent logging or display injection of the invalid value. 3. **MUSE_REPO_ROOT — overly long path**: values longer than PATH_MAX (4096) are rejected as injection payloads rather than passed to ``pathlib``. 4. **Agent provenance fields (MUSE_AGENT_ID, MUSE_MODEL_ID, MUSE_TOOLCHAIN_ID, MUSE_PROMPT_HASH)**: the comment in ``commit.py`` said "prevent control-character-laden strings" but only the length cap (256 chars) was implemented, not control-character sanitization. ESC sequences in agent_id → stored in commit records → terminal injection when provenance is rendered in future display paths, agent dashboards, or log pipelines. 5. **Challenge nonce — CRLF injection**: a nonce containing ``\\r\\n`` would attempt to inject arbitrary HTTP headers. Python's ``http.client`` blocks the injection at the wire level (``ValueError: Invalid header value``), but now rejected at ingestion time so the error is surfaced as a clear diagnostic rather than a confusing transport exception. 6. **Challenge nonce — control characters, excessive length**: non-printable chars and pathologically long values (> 8192 chars) are now rejected by ``sanitize_token`` before reaching the HTTP stack. Fixes ----- - ``sanitize_provenance()`` added to ``muse.core.validation`` — strips all C0 (0x00–0x1F), DEL (0x7F), and C1 (0x80–0x9F) control characters. Applied to all four provenance fields in ``muse/cli/commands/commit.py``. - ``sanitize_token()`` added to ``muse.core.validation`` — strips whitespace, rejects control chars and values longer than 8192 chars. - ``find_repo_root()`` in ``muse/core/repo.py`` now explicitly ignores empty and whitespace-only ``MUSE_REPO_ROOT`` values, logs a debug message, and rejects values containing control characters or exceeding 4096 chars. """ from __future__ import annotations import os import pathlib import tempfile import pytest from muse.core.paths import muse_dir from muse.core.validation import sanitize_provenance, sanitize_token from muse.core.repo import find_repo_root # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_muse_dir(tmp_path: pathlib.Path) -> pathlib.Path: """Create a minimal .muse/ skeleton and return the repo root.""" muse_dir(tmp_path).mkdir() return tmp_path # =========================================================================== # sanitize_provenance — unit tests # =========================================================================== class TestSanitizeProvenance: """sanitize_provenance must strip all C0/DEL/C1 control characters.""" def test_clean_string_unchanged(self) -> None: assert sanitize_provenance("my-agent-v1") == "my-agent-v1" def test_empty_string(self) -> None: assert sanitize_provenance("") == "" def test_unicode_allowed(self) -> None: assert sanitize_provenance("agent-αβγ") == "agent-αβγ" def test_spaces_allowed(self) -> None: """Space (0x20) is not a control char and must be preserved.""" assert sanitize_provenance("my agent") == "my agent" @pytest.mark.parametrize("char,description", [ ("\x00", "NUL"), ("\x01", "SOH"), ("\x07", "BEL"), ("\x08", "BS"), ("\x09", "HT (tab)"), ("\x0a", "LF (newline)"), ("\x0b", "VT"), ("\x0c", "FF"), ("\x0d", "CR"), ("\x0e", "SO"), ("\x1b", "ESC — ANSI injection entry point"), ("\x1f", "US"), ("\x7f", "DEL"), ("\x80", "C1 PAD"), ("\x9b", "CSI — ANSI CSI sequence introducer"), ("\x9f", "C1 APC"), ]) def test_control_char_stripped(self, char: str, description: str) -> None: result = sanitize_provenance(f"prefix{char}suffix") assert char not in result assert "prefixsuffix" == result def test_esc_sequence_stripped(self) -> None: """Full ANSI colour sequence embedded in agent_id must be stripped.""" result = sanitize_provenance("\x1b[31mmalicious-agent\x1b[0m") assert "\x1b" not in result assert result == "[31mmalicious-agent[0m" def test_newline_stripped(self) -> None: """Newline in agent_id would split log lines — must be removed.""" result = sanitize_provenance("agent\nid\nsplitting") assert "\n" not in result assert result == "agentidsplitting" def test_crlf_stripped(self) -> None: result = sanitize_provenance("agent\r\nid") assert "\r" not in result assert "\n" not in result def test_bel_stripped(self) -> None: """BEL (0x07) causes terminal bell — must be stripped.""" result = sanitize_provenance("agent\x07id") assert "\x07" not in result def test_rtl_override_preserved(self) -> None: """U+202E is not a C0/C1 char; sanitize_provenance does not strip Unicode bidi.""" # Unicode bidi control characters are a separate concern handled by # rendering layers. sanitize_provenance only strips C0/DEL/C1. s = "agent\u202eid" result = sanitize_provenance(s) assert result == s def test_multiple_control_chars(self) -> None: payload = "\x1b[31m\x07\x00agent\x1b[0m" result = sanitize_provenance(payload) assert "\x1b" not in result assert "\x07" not in result assert "\x00" not in result assert "agent" in result def test_does_not_truncate(self) -> None: """sanitize_provenance does not enforce length — callers do [:256].""" long_s = "a" * 300 assert len(sanitize_provenance(long_s)) == 300 # =========================================================================== # sanitize_token — unit tests # =========================================================================== class TestSanitizeToken: """sanitize_token must strip whitespace and reject control chars / overlength.""" def test_valid_opaque_token(self) -> None: tok = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ1c2VyIn0.abc123" result = sanitize_token(tok) assert result == tok def test_strips_leading_trailing_whitespace(self) -> None: result = sanitize_token(" mytoken ") assert result == "mytoken" def test_empty_string_returns_none(self) -> None: assert sanitize_token("") is None def test_whitespace_only_returns_none(self) -> None: assert sanitize_token(" \t\n ") is None def test_overlength_returns_none(self) -> None: assert sanitize_token("a" * 8193) is None def test_max_length_accepted(self) -> None: result = sanitize_token("a" * 8192) assert result is not None assert len(result) == 8192 @pytest.mark.parametrize("char,description", [ ("\r", "CR — HTTP header line terminator"), ("\n", "LF — HTTP header line terminator"), ("\r\n", "CRLF — HTTP header injection sequence"), ("\x00", "NUL"), ("\x01", "SOH"), ("\x1b", "ESC"), ("\x1f", "US"), ("\x7f", "DEL"), ]) def test_control_char_returns_none(self, char: str, description: str) -> None: result = sanitize_token(f"good_token{char}malicious") assert result is None def test_crlf_header_injection_blocked(self) -> None: """Classic HTTP header injection payload must be rejected.""" payload = "good_token\r\nX-Injected: pwned\r\nAuthorization: MSign attacker" assert sanitize_token(payload) is None def test_unicode_printable_allowed(self) -> None: """Unicode printable chars (e.g., in opaque tokens) must be accepted.""" tok = "token-αβγ-δεζ" result = sanitize_token(tok) assert result == tok def test_bare_api_key_format(self) -> None: tok = "sk-abc123XYZ_-." result = sanitize_token(tok) assert result == tok # =========================================================================== # find_repo_root — MUSE_REPO_ROOT hardening # =========================================================================== class TestFindRepoRootEnvHardening: """find_repo_root must safely handle all MUSE_REPO_ROOT attack payloads.""" def _with_env(self, key: str, value: str) -> None: os.environ[key] = value def _clear_env(self, key: str) -> None: os.environ.pop(key, None) def test_empty_string_ignored_falls_through(self, tmp_path: pathlib.Path) -> None: """Empty MUSE_REPO_ROOT must not redirect to cwd; falls through to walk.""" _make_muse_dir(tmp_path) old_cwd = os.getcwd() try: os.chdir(tmp_path) self._with_env("MUSE_REPO_ROOT", "") result = find_repo_root() # Should find the cwd repo, not crash or return None assert result is not None finally: self._clear_env("MUSE_REPO_ROOT") os.chdir(old_cwd) def test_whitespace_only_ignored(self, tmp_path: pathlib.Path) -> None: """Whitespace-only MUSE_REPO_ROOT must be ignored.""" _make_muse_dir(tmp_path) old_cwd = os.getcwd() try: os.chdir(tmp_path) self._with_env("MUSE_REPO_ROOT", " \t ") result = find_repo_root() # Falls through to walk — finds repo at tmp_path assert result is not None finally: self._clear_env("MUSE_REPO_ROOT") os.chdir(old_cwd) def test_control_char_in_path_returns_none(self) -> None: """MUSE_REPO_ROOT containing ESC must be rejected, not resolved.""" self._with_env("MUSE_REPO_ROOT", "/tmp/\x1b[31mattack") try: result = find_repo_root() assert result is None finally: self._clear_env("MUSE_REPO_ROOT") def test_nul_byte_in_path_returns_none(self) -> None: """MUSE_REPO_ROOT with embedded NUL (0x01 since 0x00 can't be in env) rejected.""" self._with_env("MUSE_REPO_ROOT", "/tmp/\x01attack") try: result = find_repo_root() assert result is None finally: self._clear_env("MUSE_REPO_ROOT") def test_path_max_exceeded_returns_none(self) -> None: """MUSE_REPO_ROOT longer than PATH_MAX (4096) must be rejected.""" self._with_env("MUSE_REPO_ROOT", f"/tmp/{'a' * 4092}") try: result = find_repo_root() assert result is None finally: self._clear_env("MUSE_REPO_ROOT") def test_valid_override_to_attacker_dir_with_muse( self, tmp_path: pathlib.Path ) -> None: """MUSE_REPO_ROOT pointing to a dir with real .muse/ is accepted (by design).""" _make_muse_dir(tmp_path) self._with_env("MUSE_REPO_ROOT", str(tmp_path)) try: result = find_repo_root() assert result is not None assert result.resolve() == tmp_path.resolve() finally: self._clear_env("MUSE_REPO_ROOT") def test_valid_override_to_dir_without_muse_returns_none( self, tmp_path: pathlib.Path ) -> None: """MUSE_REPO_ROOT pointing to a dir without .muse/ returns None.""" self._with_env("MUSE_REPO_ROOT", str(tmp_path)) try: result = find_repo_root() assert result is None finally: self._clear_env("MUSE_REPO_ROOT") def test_symlinked_muse_dir_rejected(self, tmp_path: pathlib.Path) -> None: """MUSE_REPO_ROOT pointing to a dir with a symlinked .muse/ returns None.""" real = tmp_path / "real" real.mkdir() muse_dir(real).mkdir() attacker = tmp_path / "attacker" attacker.mkdir() muse_dir(attacker).symlink_to(muse_dir(real)) self._with_env("MUSE_REPO_ROOT", str(attacker)) try: result = find_repo_root() assert result is None finally: self._clear_env("MUSE_REPO_ROOT") def test_path_traversal_resolved_safely(self, tmp_path: pathlib.Path) -> None: """MUSE_REPO_ROOT with ../../ is resolved by pathlib — no .muse/ means None.""" self._with_env("MUSE_REPO_ROOT", "/tmp/../../nonexistent") try: result = find_repo_root() # Either None (no .muse/ there) or a resolved path without .muse/ → None assert result is None finally: self._clear_env("MUSE_REPO_ROOT") def test_nonexistent_path_returns_none(self) -> None: """MUSE_REPO_ROOT pointing to a non-existent path returns None.""" self._with_env("MUSE_REPO_ROOT", "/tmp/muse_definitely_does_not_exist_xyz") try: result = find_repo_root() assert result is None finally: self._clear_env("MUSE_REPO_ROOT") def test_devnull_returns_none(self) -> None: """/dev/null is not a directory with .muse/ — returns None.""" self._with_env("MUSE_REPO_ROOT", "/dev/null") try: result = find_repo_root() assert result is None finally: self._clear_env("MUSE_REPO_ROOT") def test_filesystem_root_returns_none(self) -> None: """MUSE_REPO_ROOT=/ returns None (no .muse/ at /). Confirms no special behaviour.""" self._with_env("MUSE_REPO_ROOT", "/") try: result = find_repo_root() assert result is None finally: self._clear_env("MUSE_REPO_ROOT") # =========================================================================== # Agent provenance — end-to-end sanitization in stored records # =========================================================================== class TestProvenanceSanitizationEndToEnd: """After commit.py applies sanitize_provenance, commit records must be clean.""" def test_esc_in_agent_id_stripped(self) -> None: """ESC injection in MUSE_AGENT_ID must not survive into the stored value.""" raw = "\x1b[31mmalicias-agent\x1b[0m" clean = sanitize_provenance(raw[:256]) assert "\x1b" not in clean # Printable text is preserved assert "malicias-agent" in clean def test_newline_in_model_id_stripped(self) -> None: raw = "gpt-4\nX-Injected: pwned" clean = sanitize_provenance(raw[:256]) assert "\n" not in clean def test_tab_in_toolchain_id_stripped(self) -> None: raw = "cursor-agent\tv2" clean = sanitize_provenance(raw[:256]) assert "\t" not in clean def test_all_c0_chars_stripped_from_prompt_hash(self) -> None: for byte_val in range(0x00, 0x20): char = chr(byte_val) raw = f"hash{char}value" clean = sanitize_provenance(raw) assert char not in clean, f"Control char 0x{byte_val:02x} survived sanitize_provenance" def test_length_truncation_then_sanitize(self) -> None: """Simulate commit.py: truncate to _MAX_PROV then sanitize.""" _MAX_PROV = 256 payload = f"{'a' * 200}\x1b[31m{'b' * 100}" stored = sanitize_provenance(payload[:_MAX_PROV]) assert len(stored) <= _MAX_PROV assert "\x1b" not in stored def test_clean_agent_id_survives(self) -> None: raw = "counterpoint-bot-v2.1" assert sanitize_provenance(raw[:256]) == raw def test_unicode_agent_id_survives(self) -> None: raw = "agent-αβγ-2024" assert sanitize_provenance(raw[:256]) == raw def test_hyphen_underscore_dot_survive(self) -> None: raw = "cursor-agent_v2.1" assert sanitize_provenance(raw[:256]) == raw # =========================================================================== # sanitize_token — integration with identity.py resolve_token # =========================================================================== class TestSanitizeTokenIntegration: """sanitize_token must block CRLF and control chars before HTTP stack.""" def test_crlf_blocked_before_http_client(self) -> None: """A CRLF token must be caught by sanitize_token, not by http.client.""" payload = "good\r\nX-Injected: pwned" result = sanitize_token(payload) assert result is None def test_newline_only_blocked(self) -> None: assert sanitize_token("token\nmalicious") is None def test_cr_only_blocked(self) -> None: assert sanitize_token("token\rmalicious") is None def test_http_client_would_also_block_crlf(self) -> None: """Demonstrate that Python's http.client blocks CRLF at the wire level. This proves the http.client defence exists but our sanitize_token defence should fire first so the user gets a clear diagnostic. We call ``putrequest`` + ``putheader`` directly to trigger validation without opening a socket. """ import http.client payload = "good_token\r\nX-Injected: pwned" conn = http.client.HTTPConnection("example.com") conn.putrequest("GET", "/") with pytest.raises((ValueError, Exception)): conn.putheader("Authorization", f"MSign {payload}") # =========================================================================== # Concurrency — env var reads are snapshot-safe # =========================================================================== class TestConcurrentEnvVarReads: """Multiple threads calling sanitize_provenance/sanitize_token concurrently.""" def test_concurrent_sanitize_provenance(self) -> None: import threading results: list[str] = [] errors: list[str] = [] def worker(payload: str) -> None: try: results.append(sanitize_provenance(payload)) except Exception as exc: errors.append(str(exc)) payloads = [ f"agent-{i}\x1b[31m\x07\r\n" for i in range(20) ] threads = [threading.Thread(target=worker, args=(p,)) for p in payloads] for t in threads: t.start() for t in threads: t.join() assert errors == [] assert len(results) == 20 for r in results: assert "\x1b" not in r assert "\x07" not in r assert "\r" not in r assert "\n" not in r def test_concurrent_sanitize_token(self) -> None: import threading good: list[str] = [] bad: list[None] = [] def worker(tok: str) -> None: result = sanitize_token(tok) if result is None: bad.append(None) else: good.append(result) valid_tokens = [f"valid-token-{i}" for i in range(10)] invalid_tokens = [f"bad\r\ntoken-{i}" for i in range(10)] threads = [ threading.Thread(target=worker, args=(t,)) for t in valid_tokens + invalid_tokens ] for t in threads: t.start() for t in threads: t.join() assert len(good) == 10 assert len(bad) == 10 # =========================================================================== # Fuzzing — random payloads # =========================================================================== class TestFuzzedEnvVarPayloads: @pytest.mark.parametrize("seed", range(20)) def test_random_control_char_in_provenance_always_stripped(self, seed: int) -> None: import random rng = random.Random(seed) char = chr(rng.randint(0x00, 0x1F)) payload = f"prefix{char}suffix" result = sanitize_provenance(payload) assert char not in result @pytest.mark.parametrize("seed", range(10)) def test_random_crlf_token_always_rejected(self, seed: int) -> None: import random rng = random.Random(seed + 50) crlf = rng.choice(["\r\n", "\r", "\n"]) payload = f"token{crlf}malicious" assert sanitize_token(payload) is None @pytest.mark.parametrize("seed", range(5)) def test_random_overlength_token_rejected(self, seed: int) -> None: import random rng = random.Random(seed + 100) length = rng.randint(8193, 20000) payload = "a" * length assert sanitize_token(payload) is None