test_security_env_injection.py
python
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
21 days ago
| 1 | """Phase 2.7 — Environment variable injection security tests. |
| 2 | |
| 3 | Attack surface |
| 4 | -------------- |
| 5 | Muse reads six environment variables: |
| 6 | |
| 7 | MUSE_REPO_ROOT — overrides repository root discovery. |
| 8 | MUSE_AGENT_ID — agent provenance stored in commit records. |
| 9 | MUSE_MODEL_ID — model provenance stored in commit records. |
| 10 | MUSE_TOOLCHAIN_ID — toolchain provenance stored in commit records. |
| 11 | MUSE_PROMPT_HASH — prompt hash stored in commit records. |
| 12 | MUSE_TEST_ENV — passed through to CI gate subprocesses. |
| 13 | |
| 14 | Each variable represents a trust boundary: an attacker who can influence the |
| 15 | process environment (CI pipeline, shared-host user, container escape) can |
| 16 | inject crafted values. |
| 17 | |
| 18 | Attack vectors discovered via muse recon |
| 19 | ----------------------------------------- |
| 20 | 1. **MUSE_REPO_ROOT — empty/whitespace string**: ``pathlib.Path("").resolve()`` |
| 21 | returns the current working directory, so an empty override silently |
| 22 | behaves as if no override was set. Such values are now explicitly ignored |
| 23 | (falling through to the directory walk), and a debug message is logged. |
| 24 | |
| 25 | 2. **MUSE_REPO_ROOT — control characters**: a path containing ESC or BEL |
| 26 | could not be a real filesystem path on any OS; now rejected to prevent |
| 27 | logging or display injection of the invalid value. |
| 28 | |
| 29 | 3. **MUSE_REPO_ROOT — overly long path**: values longer than PATH_MAX (4096) |
| 30 | are rejected as injection payloads rather than passed to ``pathlib``. |
| 31 | |
| 32 | 4. **Agent provenance fields (MUSE_AGENT_ID, MUSE_MODEL_ID, |
| 33 | MUSE_TOOLCHAIN_ID, MUSE_PROMPT_HASH)**: the comment in ``commit.py`` |
| 34 | said "prevent control-character-laden strings" but only the length cap |
| 35 | (256 chars) was implemented, not control-character sanitization. |
| 36 | ESC sequences in agent_id → stored in commit records → terminal injection |
| 37 | when provenance is rendered in future display paths, agent dashboards, |
| 38 | or log pipelines. |
| 39 | |
| 40 | 5. **Challenge nonce — CRLF injection**: a nonce containing ``\\r\\n`` would |
| 41 | attempt to inject arbitrary HTTP headers. Python's ``http.client`` blocks |
| 42 | the injection at the wire level (``ValueError: Invalid header value``), but |
| 43 | now rejected at ingestion time so the error is surfaced as a clear |
| 44 | diagnostic rather than a confusing transport exception. |
| 45 | |
| 46 | 6. **Challenge nonce — control characters, excessive length**: non-printable chars |
| 47 | and pathologically long values (> 8192 chars) are now rejected by |
| 48 | ``sanitize_token`` before reaching the HTTP stack. |
| 49 | |
| 50 | Fixes |
| 51 | ----- |
| 52 | - ``sanitize_provenance()`` added to ``muse.core.validation`` — strips all |
| 53 | C0 (0x00–0x1F), DEL (0x7F), and C1 (0x80–0x9F) control characters. |
| 54 | Applied to all four provenance fields in ``muse/cli/commands/commit.py``. |
| 55 | - ``sanitize_token()`` added to ``muse.core.validation`` — strips whitespace, |
| 56 | rejects control chars and values longer than 8192 chars. |
| 57 | - ``find_repo_root()`` in ``muse/core/repo.py`` now explicitly ignores empty |
| 58 | and whitespace-only ``MUSE_REPO_ROOT`` values, logs a debug message, and |
| 59 | rejects values containing control characters or exceeding 4096 chars. |
| 60 | """ |
| 61 | |
| 62 | from __future__ import annotations |
| 63 | |
| 64 | import os |
| 65 | import pathlib |
| 66 | import tempfile |
| 67 | |
| 68 | import pytest |
| 69 | |
| 70 | from muse.core.paths import muse_dir |
| 71 | from muse.core.validation import sanitize_provenance, sanitize_token |
| 72 | from muse.core.repo import find_repo_root |
| 73 | |
| 74 | |
| 75 | # --------------------------------------------------------------------------- |
| 76 | # Helpers |
| 77 | # --------------------------------------------------------------------------- |
| 78 | |
def _make_muse_dir(tmp_path: pathlib.Path) -> pathlib.Path:
    """Build a bare-bones ``.muse/`` skeleton under *tmp_path*.

    The directory is whatever ``muse_dir`` resolves for *tmp_path*; the
    function hands back *tmp_path* itself so it can be used as the repo root.
    """
    skeleton = muse_dir(tmp_path)
    skeleton.mkdir()
    return tmp_path
| 83 | |
| 84 | |
| 85 | # =========================================================================== |
| 86 | # sanitize_provenance — unit tests |
| 87 | # =========================================================================== |
| 88 | |
| 89 | |
class TestSanitizeProvenance:
    """Unit checks: sanitize_provenance removes every C0, DEL and C1 control char."""

    def test_clean_string_unchanged(self) -> None:
        benign = "my-agent-v1"
        assert sanitize_provenance(benign) == benign

    def test_empty_string(self) -> None:
        cleaned = sanitize_provenance("")
        assert cleaned == ""

    def test_unicode_allowed(self) -> None:
        greek = "agent-αβγ"
        assert sanitize_provenance(greek) == greek

    def test_spaces_allowed(self) -> None:
        """A plain space (0x20) sits just above the C0 range and must survive."""
        spaced = "my agent"
        assert sanitize_provenance(spaced) == spaced

    @pytest.mark.parametrize(
        "char,description",
        [
            ("\x00", "NUL"),
            ("\x01", "SOH"),
            ("\x07", "BEL"),
            ("\x08", "BS"),
            ("\x09", "HT (tab)"),
            ("\x0a", "LF (newline)"),
            ("\x0b", "VT"),
            ("\x0c", "FF"),
            ("\x0d", "CR"),
            ("\x0e", "SO"),
            ("\x1b", "ESC — ANSI injection entry point"),
            ("\x1f", "US"),
            ("\x7f", "DEL"),
            ("\x80", "C1 PAD"),
            ("\x9b", "CSI — ANSI CSI sequence introducer"),
            ("\x9f", "C1 APC"),
        ],
    )
    def test_control_char_stripped(self, char: str, description: str) -> None:
        # The control char is sandwiched between two printable halves so the
        # expected output is simply the halves glued together.
        sanitized = sanitize_provenance(f"prefix{char}suffix")
        assert char not in sanitized
        assert sanitized == "prefixsuffix"

    def test_esc_sequence_stripped(self) -> None:
        """Only the ESC byte of an ANSI colour sequence is removed; the rest stays."""
        sanitized = sanitize_provenance("\x1b[31mmalicious-agent\x1b[0m")
        assert "\x1b" not in sanitized
        assert sanitized == "[31mmalicious-agent[0m"

    def test_newline_stripped(self) -> None:
        """Line feeds would break log lines apart, so they must be dropped."""
        sanitized = sanitize_provenance("agent\nid\nsplitting")
        assert "\n" not in sanitized
        assert sanitized == "agentidsplitting"

    def test_crlf_stripped(self) -> None:
        sanitized = sanitize_provenance("agent\r\nid")
        for terminator in ("\r", "\n"):
            assert terminator not in sanitized

    def test_bel_stripped(self) -> None:
        """BEL (0x07) rings the terminal bell and must not survive."""
        sanitized = sanitize_provenance("agent\x07id")
        assert "\x07" not in sanitized

    def test_rtl_override_preserved(self) -> None:
        """U+202E lies outside C0/DEL/C1, so sanitize_provenance leaves it alone."""
        # Bidi control characters are treated as a rendering-layer concern;
        # this function's remit is limited to C0/DEL/C1.
        bidi = "agent\u202eid"
        assert sanitize_provenance(bidi) == bidi

    def test_multiple_control_chars(self) -> None:
        sanitized = sanitize_provenance("\x1b[31m\x07\x00agent\x1b[0m")
        for forbidden in ("\x1b", "\x07", "\x00"):
            assert forbidden not in sanitized
        assert "agent" in sanitized

    def test_does_not_truncate(self) -> None:
        """Length is not enforced here — callers apply the [:256] cap themselves."""
        oversized = "a" * 300
        sanitized = sanitize_provenance(oversized)
        assert len(sanitized) == 300
| 171 | |
| 172 | |
| 173 | # =========================================================================== |
| 174 | # sanitize_token — unit tests |
| 175 | # =========================================================================== |
| 176 | |
| 177 | |
class TestSanitizeToken:
    """Unit checks: sanitize_token trims whitespace and returns None for bad input."""

    def test_valid_opaque_token(self) -> None:
        jwt_like = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ1c2VyIn0.abc123"
        assert sanitize_token(jwt_like) == jwt_like

    def test_strips_leading_trailing_whitespace(self) -> None:
        assert sanitize_token(" mytoken ") == "mytoken"

    def test_empty_string_returns_none(self) -> None:
        outcome = sanitize_token("")
        assert outcome is None

    def test_whitespace_only_returns_none(self) -> None:
        outcome = sanitize_token(" \t\n ")
        assert outcome is None

    def test_overlength_returns_none(self) -> None:
        # One character past the 8192 limit.
        too_long = "a" * 8193
        assert sanitize_token(too_long) is None

    def test_max_length_accepted(self) -> None:
        # Exactly at the 8192 limit: accepted and returned at full length.
        outcome = sanitize_token("a" * 8192)
        assert outcome is not None
        assert len(outcome) == 8192

    @pytest.mark.parametrize(
        "char,description",
        [
            ("\r", "CR — HTTP header line terminator"),
            ("\n", "LF — HTTP header line terminator"),
            ("\r\n", "CRLF — HTTP header injection sequence"),
            ("\x00", "NUL"),
            ("\x01", "SOH"),
            ("\x1b", "ESC"),
            ("\x1f", "US"),
            ("\x7f", "DEL"),
        ],
    )
    def test_control_char_returns_none(self, char: str, description: str) -> None:
        # The control sequence is placed mid-token so stripping the ends
        # cannot rescue the value.
        tainted = f"good_token{char}malicious"
        assert sanitize_token(tainted) is None

    def test_crlf_header_injection_blocked(self) -> None:
        """A textbook HTTP header-injection payload has to be refused."""
        attack = "good_token\r\nX-Injected: pwned\r\nAuthorization: MSign attacker"
        outcome = sanitize_token(attack)
        assert outcome is None

    def test_unicode_printable_allowed(self) -> None:
        """Printable non-ASCII characters inside an opaque token are acceptable."""
        greek = "token-αβγ-δεζ"
        assert sanitize_token(greek) == greek

    def test_bare_api_key_format(self) -> None:
        api_key = "sk-abc123XYZ_-."
        assert sanitize_token(api_key) == api_key
| 233 | |
| 234 | |
| 235 | # =========================================================================== |
| 236 | # find_repo_root — MUSE_REPO_ROOT hardening |
| 237 | # =========================================================================== |
| 238 | |
| 239 | |
class TestFindRepoRootEnvHardening:
    """find_repo_root must safely handle all MUSE_REPO_ROOT attack payloads.

    Every test sets ``MUSE_REPO_ROOT`` (and, where needed, the working
    directory) through pytest's ``monkeypatch`` fixture.  Unlike writing to
    ``os.environ`` and popping the key afterwards, ``monkeypatch`` restores
    whatever value was present before the test, so a ``MUSE_REPO_ROOT`` that
    was already set in the developer's or CI environment is not lost.
    """

    def test_empty_string_ignored_falls_through(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Empty MUSE_REPO_ROOT must not redirect to cwd; falls through to walk."""
        _make_muse_dir(tmp_path)
        monkeypatch.chdir(tmp_path)
        monkeypatch.setenv("MUSE_REPO_ROOT", "")
        # Should find the cwd repo, not crash or return None
        assert find_repo_root() is not None

    def test_whitespace_only_ignored(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Whitespace-only MUSE_REPO_ROOT must be ignored."""
        _make_muse_dir(tmp_path)
        monkeypatch.chdir(tmp_path)
        monkeypatch.setenv("MUSE_REPO_ROOT", " \t ")
        # Falls through to walk — finds repo at tmp_path
        assert find_repo_root() is not None

    def test_control_char_in_path_returns_none(
        self, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """MUSE_REPO_ROOT containing ESC must be rejected, not resolved."""
        monkeypatch.setenv("MUSE_REPO_ROOT", "/tmp/\x1b[31mattack")
        assert find_repo_root() is None

    def test_nul_byte_in_path_returns_none(
        self, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """MUSE_REPO_ROOT with an embedded low control byte must be rejected.

        SOH (0x01) stands in for NUL here because 0x00 cannot be placed in an
        environment value.
        """
        monkeypatch.setenv("MUSE_REPO_ROOT", "/tmp/\x01attack")
        assert find_repo_root() is None

    def test_path_max_exceeded_returns_none(
        self, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """MUSE_REPO_ROOT longer than PATH_MAX (4096) must be rejected."""
        # "/tmp/" is 5 characters, so the full value is 4097 characters long.
        monkeypatch.setenv("MUSE_REPO_ROOT", f"/tmp/{'a' * 4092}")
        assert find_repo_root() is None

    def test_valid_override_to_attacker_dir_with_muse(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """MUSE_REPO_ROOT pointing to a dir with real .muse/ is accepted (by design)."""
        _make_muse_dir(tmp_path)
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
        result = find_repo_root()
        assert result is not None
        assert result.resolve() == tmp_path.resolve()

    def test_valid_override_to_dir_without_muse_returns_none(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """MUSE_REPO_ROOT pointing to a dir without .muse/ returns None."""
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
        assert find_repo_root() is None

    def test_symlinked_muse_dir_rejected(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """MUSE_REPO_ROOT pointing to a dir with a symlinked .muse/ returns None."""
        real = tmp_path / "real"
        real.mkdir()
        muse_dir(real).mkdir()
        attacker = tmp_path / "attacker"
        attacker.mkdir()
        # The attacker directory only borrows the real repo's .muse/ via a link.
        muse_dir(attacker).symlink_to(muse_dir(real))

        monkeypatch.setenv("MUSE_REPO_ROOT", str(attacker))
        assert find_repo_root() is None

    def test_path_traversal_resolved_safely(
        self, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """MUSE_REPO_ROOT with ../../ is resolved by pathlib — no .muse/ means None."""
        monkeypatch.setenv("MUSE_REPO_ROOT", "/tmp/../../nonexistent")
        # Either None (no .muse/ there) or a resolved path without .muse/ → None
        assert find_repo_root() is None

    def test_nonexistent_path_returns_none(
        self, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """MUSE_REPO_ROOT pointing to a non-existent path returns None."""
        monkeypatch.setenv("MUSE_REPO_ROOT", "/tmp/muse_definitely_does_not_exist_xyz")
        assert find_repo_root() is None

    def test_devnull_returns_none(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """/dev/null is not a directory with .muse/ — returns None."""
        monkeypatch.setenv("MUSE_REPO_ROOT", "/dev/null")
        assert find_repo_root() is None

    def test_filesystem_root_returns_none(
        self, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """MUSE_REPO_ROOT=/ returns None (no .muse/ at /). Confirms no special behaviour."""
        monkeypatch.setenv("MUSE_REPO_ROOT", "/")
        assert find_repo_root() is None
| 380 | |
| 381 | |
| 382 | # =========================================================================== |
| 383 | # Agent provenance — end-to-end sanitization in stored records |
| 384 | # =========================================================================== |
| 385 | |
| 386 | |
class TestProvenanceSanitizationEndToEnd:
    """Mirror the truncate-then-sanitize step and check the stored value is clean."""

    def test_esc_in_agent_id_stripped(self) -> None:
        """An ESC sequence supplied as MUSE_AGENT_ID must be absent from the result."""
        injected = "\x1b[31mmalicias-agent\x1b[0m"
        stored = sanitize_provenance(injected[:256])
        assert "\x1b" not in stored
        # The printable portion must come through intact.
        assert "malicias-agent" in stored

    def test_newline_in_model_id_stripped(self) -> None:
        injected = "gpt-4\nX-Injected: pwned"
        stored = sanitize_provenance(injected[:256])
        assert "\n" not in stored

    def test_tab_in_toolchain_id_stripped(self) -> None:
        injected = "cursor-agent\tv2"
        stored = sanitize_provenance(injected[:256])
        assert "\t" not in stored

    def test_all_c0_chars_stripped_from_prompt_hash(self) -> None:
        # Sweep the whole C0 range (0x00–0x1F), one code point at a time.
        for code in range(0x20):
            ctrl = chr(code)
            stored = sanitize_provenance(f"hash{ctrl}value")
            assert ctrl not in stored, f"Control char 0x{code:02x} survived sanitize_provenance"

    def test_length_truncation_then_sanitize(self) -> None:
        """Apply the length cap first, then sanitize, as the docstring of this
        suite describes for commit.py."""
        _MAX_PROV = 256
        # 200 printable chars, an ESC sequence, then 100 more chars: the ESC
        # falls inside the first 256 characters and so survives truncation.
        injected = f"{'a' * 200}\x1b[31m{'b' * 100}"
        capped = injected[:_MAX_PROV]
        stored = sanitize_provenance(capped)
        assert len(stored) <= _MAX_PROV
        assert "\x1b" not in stored

    def test_clean_agent_id_survives(self) -> None:
        benign = "counterpoint-bot-v2.1"
        assert sanitize_provenance(benign[:256]) == benign

    def test_unicode_agent_id_survives(self) -> None:
        benign = "agent-αβγ-2024"
        assert sanitize_provenance(benign[:256]) == benign

    def test_hyphen_underscore_dot_survive(self) -> None:
        benign = "cursor-agent_v2.1"
        assert sanitize_provenance(benign[:256]) == benign
| 434 | |
| 435 | |
| 436 | # =========================================================================== |
| 437 | # sanitize_token — integration with identity.py resolve_token |
| 438 | # =========================================================================== |
| 439 | |
| 440 | |
class TestSanitizeTokenIntegration:
    """sanitize_token must block CRLF and control chars before HTTP stack."""

    def test_crlf_blocked_before_http_client(self) -> None:
        """A CRLF token must be caught by sanitize_token, not by http.client."""
        payload = "good\r\nX-Injected: pwned"
        result = sanitize_token(payload)
        assert result is None

    def test_newline_only_blocked(self) -> None:
        assert sanitize_token("token\nmalicious") is None

    def test_cr_only_blocked(self) -> None:
        assert sanitize_token("token\rmalicious") is None

    def test_http_client_would_also_block_crlf(self) -> None:
        """Demonstrate that Python's http.client blocks CRLF at the wire level.

        This proves the http.client defence exists but our sanitize_token defence
        should fire first so the user gets a clear diagnostic. We call
        ``putrequest`` + ``putheader`` directly to trigger validation without
        opening a socket.
        """
        import http.client

        payload = "good_token\r\nX-Injected: pwned"
        conn = http.client.HTTPConnection("example.com")
        conn.putrequest("GET", "/")
        # Expect ValueError specifically: catching the base ``Exception`` as
        # well would let any unrelated failure satisfy this test.
        with pytest.raises(ValueError):
            conn.putheader("Authorization", f"MSign {payload}")
| 471 | |
| 472 | |
| 473 | # =========================================================================== |
| 474 | # Concurrency — env var reads are snapshot-safe |
| 475 | # =========================================================================== |
| 476 | |
| 477 | |
class TestConcurrentEnvVarReads:
    """Exercise sanitize_provenance and sanitize_token from many threads at once."""

    def test_concurrent_sanitize_provenance(self) -> None:
        import threading

        cleaned: list[str] = []
        failures: list[str] = []

        def worker(payload: str) -> None:
            # Record the failure text instead of letting the thread die silently.
            try:
                outcome = sanitize_provenance(payload)
            except Exception as exc:
                failures.append(str(exc))
            else:
                cleaned.append(outcome)

        pool = []
        for i in range(20):
            pool.append(
                threading.Thread(target=worker, args=(f"agent-{i}\x1b[31m\x07\r\n",))
            )
        for thread in pool:
            thread.start()
        for thread in pool:
            thread.join()

        assert failures == []
        assert len(cleaned) == 20
        for value in cleaned:
            for forbidden in ("\x1b", "\x07", "\r", "\n"):
                assert forbidden not in value

    def test_concurrent_sanitize_token(self) -> None:
        import threading

        accepted: list[str] = []
        rejected: list[None] = []

        def worker(tok: str) -> None:
            outcome = sanitize_token(tok)
            if outcome is not None:
                accepted.append(outcome)
            else:
                rejected.append(None)

        # Ten tokens that should pass followed by ten carrying a CRLF.
        candidates = [f"valid-token-{i}" for i in range(10)]
        candidates += [f"bad\r\ntoken-{i}" for i in range(10)]

        pool = [threading.Thread(target=worker, args=(tok,)) for tok in candidates]
        for thread in pool:
            thread.start()
        for thread in pool:
            thread.join()

        assert len(accepted) == 10
        assert len(rejected) == 10
| 537 | |
| 538 | |
| 539 | # =========================================================================== |
| 540 | # Fuzzing — random payloads |
| 541 | # =========================================================================== |
| 542 | |
| 543 | |
class TestFuzzedEnvVarPayloads:
    """Seeded pseudo-random payloads; each seed yields the same input on every run."""

    @pytest.mark.parametrize("seed", range(20))
    def test_random_control_char_in_provenance_always_stripped(self, seed: int) -> None:
        import random

        # Draw one code point from the C0 range (0x00–0x1F inclusive).
        ctrl = chr(random.Random(seed).randint(0x00, 0x1F))
        sanitized = sanitize_provenance(f"prefix{ctrl}suffix")
        assert ctrl not in sanitized

    @pytest.mark.parametrize("seed", range(10))
    def test_random_crlf_token_always_rejected(self, seed: int) -> None:
        import random

        # Seeds are offset by 50 for this test.
        terminator = random.Random(seed + 50).choice(["\r\n", "\r", "\n"])
        outcome = sanitize_token(f"token{terminator}malicious")
        assert outcome is None

    @pytest.mark.parametrize("seed", range(5))
    def test_random_overlength_token_rejected(self, seed: int) -> None:
        import random

        # Any length from one past the 8192 limit up to 20000.
        size = random.Random(seed + 100).randint(8193, 20000)
        outcome = sanitize_token("a" * size)
        assert outcome is None
File History
4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e
fix: rename objects→blobs in push client and all stale test…
Sonnet 4.6
patch
22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a
fix: repair four test failures from post-migration audit
Sonnet 4.6
patch
28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
29 days ago