"""EXTREME security test suite for the ``muse coord`` subcommand layer. Scenario: Linus Torvalds has migrated the Linux kernel to Muse. 50 AI agents coordinate continuously via ``muse coord sync push/pull``. A compromised hub server, a rogue agent, a man-in-the-middle, and a malicious repo all try simultaneously to: 1. Read local files via SSRF through a ``file://`` hub URL. 2. Redirect auth tokens to an attacker-controlled server. 3. Inject HTTP headers by encoding CRLF into the signing credential. 4. Break out of the ``remote/`` coordination directory via traversal in ``kind`` or ``record_id`` fields returned by the hub. 5. Corrupt the terminal with ANSI/OSC/BEL/CSI escape sequences injected into owner, slug, or error payloads. 6. Exfiltrate the signing credential through error messages, logs, or JSON output. 7. Crash the coord layer with adversarial response bodies. 8. Bypass input validation on ``--owner``, ``--slug``, ``--since-id``, ``--limit``, and GC parameters. Coverage matrix --------------- SSRF via hub_url (scheme injection) * ``file://`` hub URL raises CoordBusError — file content NOT in error * ``file://`` hub URL pointing to valid JSON raises CoordBusError cleanly * Error message from ``file://`` attempt does not include file content * Adversarial hub_url with embedded path components normalizes correctly * Multiple trailing slashes in hub_url are stripped, correct URL built * IPv6 localhost ``[::1]`` hub URL builds correct URL structure * hub_url with user-info ``user:pass@host`` — token NOT duplicated in URL * Data URI hub_url raises CoordBusError Redirect blocking (credential leakage prevention) * push_to_hub with 301 response raises CoordBusError "Redirect refused" * push_to_hub with 302 response raises CoordBusError * push_to_hub with 307 response raises CoordBusError * push_to_hub with 308 response raises CoordBusError * pull_from_hub with 301 raises CoordBusError * Redirect error message contains destination URL (for debugging) * Redirect does NOT silently follow — auth token never sent to new host HTTP header injection via token * Token with ``\r\n`` rejected by sanitize_token (CRLF injection) * Token with ``\r`` alone rejected * Token with ``\n`` alone rejected * Token with null byte rejected * Token with C0 control char (BEL ``\x07``) rejected * Token at exactly MAX length (8 192 chars) accepted * Token one char over MAX length rejected (None returned) * Token with only whitespace returns None (empty after strip) URL structure: owner/slug encoding prevents injection * ``owner="user@host"`` → ``%40`` in URL, never parsed as user-info * ``owner="#fragment"`` → ``%23`` in URL, never parsed as fragment * ``owner="?q=1"`` → ``%3F`` in URL, never parsed as query string * ``owner="a/b"`` → ``%2F`` in URL, owner cannot span path segments * ``owner="../../etc"`` → slashes %-encoded, cannot escape path * ``slug="../../admin"`` → slashes %-encoded * Newline in owner/slug → %-encoded, cannot split HTTP request line * owner+slug encoded chars never re-appear as literal / in URL path Error body leakage prevention * HTTP 403 body: included in error but truncated to ≤ 200 chars * HTTP 500 body: included but ANSI escape codes stripped * HTTP error body with C1 control chars (``\x80``–``\x9f``) stripped * Network error reason string: ANSI stripped, capped at 200 chars * 401 body: COMPLETELY masked regardless of content * 401 error message reveals no body even when body is valid JSON * Very long error body: truncated, not fully included in CoordBusError * Zero-byte error body: produces clean "HTTP " message Terminal injection: ANSI/OSC/BEL/CSI in user-controlled strings * OSC escape ``\\x1b]0;title\\x07`` in --owner stripped in text push output * BEL ``\\x07`` in --slug stripped in text push output * C1 CSI ``\\x9b[2J`` in --owner stripped in text pull output * Full ANSI clear-screen ``\\x1b[H\\x1b[J`` in --slug stripped in text output * ``\\x1b[31m`` in --owner stripped, no red text bleeds into terminal * All control chars in --owner stripped: no ESC anywhere in output Token leakage: signing credential must never appear in any output or log * Token NOT in push JSON success output * Token NOT in pull JSON success output * Token NOT in push JSON error output (CoordBusError) * Token NOT in pull JSON error output * Token NOT in push text success output * Token NOT in pull text success output * Token NOT in any Python log record emitted during push * Token NOT in CoordBusError string representation Malicious hub response: adversarial records must not escape ``remote/`` * kind = ``"../../../.ssh"`` → rejected (not in _ALL_KINDS), no write * kind = ``""`` → rejected, no write * kind = ``"RESERVATION"`` (wrong case) → rejected * record_id = ``"../../.ssh/authorized_keys"`` → rejected by regex * record_id = ``"foo\\x00bar"`` (null byte) → rejected * record_id = 129-char string → rejected (> 128 max) * record_id = ``"foo bar"`` (space) → rejected * record_id = ``"foo\\nbar"`` (newline) → rejected * Payload containing ``{"__import__": "os"}`` safely serialized, not executed * 1 000-record response with all adversarial record_ids → 0 files written GC input validation * --grace-period -1 → exit 1 before any file I/O * --grace-period 0 → accepted (valid: no grace window) * --max-intent-age 0 → exit 1 * --max-intent-age -1 → exit 1 * --max-intent-age MAX_INT → accepted * GC dry-run mode never deletes any file even with collectable records * GC verbose output with record_ids is clean (no ANSI injection vector) CLI input validation hardening * --owner at exactly _MAX_OWNER_LEN → accepted * --owner at _MAX_OWNER_LEN + 1 → exit 1, no file I/O * --slug at exactly _MAX_SLUG_LEN → accepted * --slug at _MAX_SLUG_LEN + 1 → exit 1, no file I/O * --since-id at MAX Python int (sys.maxsize) → accepted (no overflow) * --limit = 0 → exit 1 * --limit = _MAX_PULL_LIMIT → accepted * --limit = _MAX_PULL_LIMIT + 1 → exit 1 """ from __future__ import annotations import argparse import json import logging import pathlib import sys import itertools from io import BytesIO from unittest.mock import MagicMock, patch import pytest import urllib.error from tests.cli_test_helper import CliRunner from muse.core.types import content_hash, MsgpackDict from muse.core.coord_bus import ( CoordBusError, MAX_PUSH_BATCH, MAX_PULL_LIMIT, _build_url, _NoRedirectHandler, push_to_hub, pull_from_hub, ) from muse.core.validation import sanitize_token, _MAX_TOKEN_LEN from muse.cli.commands.coord_sync import ( _ALL_KINDS, _MAX_OWNER_LEN, _MAX_SLUG_LEN, _MAX_PULL_LIMIT, _SAFE_RECORD_ID_RE, _write_remote_records, ) from muse.core.paths import coordination_dir, muse_dir runner = CliRunner() cli = None _id_seq = itertools.count() def _new_id() -> str: return content_hash({"seq": next(_id_seq)}) # --------------------------------------------------------------------------- # Patch targets # --------------------------------------------------------------------------- _OPENER = "muse.core.coord_bus._STRICT_OPENER.open" _PUSH_TARGET = "muse.cli.commands.coord_sync.push_to_hub" _PULL_TARGET = "muse.cli.commands.coord_sync.pull_from_hub" _REQUIRE_REPO = "muse.cli.commands.coord_sync.require_repo" _RESOLVE_HUB = "muse.cli.commands.coord_sync._resolve_hub_and_signing" # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _SECRET = "NEVER-LEAK-THIS-TOKEN-3k9mXpQz7vR4wY" def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: muse_dir(tmp_path).mkdir() return tmp_path @pytest.fixture def repo(tmp_path: pathlib.Path) -> pathlib.Path: return _make_repo(tmp_path) def _mock_response(body: MsgpackDict) -> MagicMock: raw = json.dumps(body).encode("utf-8") resp = MagicMock() resp.read.return_value = raw resp.__enter__ = lambda s: s resp.__exit__ = MagicMock(return_value=False) return resp def _make_record(kind: str = "reservation") -> MsgpackDict: return { "kind": kind, "record_id": _new_id(), "run_id": "agent-linux", "payload": {"note": "kernel coord"}, } def _push_args(owner: str = "gabriel", slug: str = "linux") -> list[str]: return [ "coord", "sync", "push", "--hub", "https://localhost:1337", "--owner", owner, "--slug", slug, ] def _pull_args(owner: str = "gabriel", slug: str = "linux") -> list[str]: return [ "coord", "sync", "pull", "--hub", "https://localhost:1337", "--owner", owner, "--slug", slug, "--since-id", "0", ] # =========================================================================== # 1. SSRF VIA SCHEME INJECTION # =========================================================================== class TestCoordSecuritySSRF: """A compromised user or rogue config supplies a non-HTTP hub URL.""" def test_file_scheme_raises_coord_bus_error(self, tmp_path: pathlib.Path) -> None: """``file://`` hub URL must raise CoordBusError, not silently succeed.""" json_file = tmp_path / "data.json" json_file.write_text('{"inserted": 999, "skipped": 0}', encoding="utf-8") records = [_make_record()] with pytest.raises(CoordBusError): push_to_hub(f"file://{tmp_path}", "owner", "repo", records, signing=None) def test_file_scheme_error_does_not_expose_file_content( self, tmp_path: pathlib.Path ) -> None: """File content must not be present in any CoordBusError message.""" secret_data = "SECRET_FILE_CONTENT_XYZZY_NEVER_EXPOSE" json_file = tmp_path / "secret.json" json_file.write_text(f'"{secret_data}"', encoding="utf-8") # valid JSON string records = [_make_record()] try: push_to_hub(f"file://{tmp_path}", "owner", "repo", records, signing=None) except CoordBusError as exc: assert secret_data not in str(exc), "File content leaked in error message!" except Exception: pass # Any exception is acceptable — the point is no content leak def test_file_scheme_valid_json_target_still_raises_coord_bus_error( self, tmp_path: pathlib.Path ) -> None: """Even if file:// target is valid JSON, the call must raise CoordBusError because the URL is not a valid HTTP endpoint.""" # A real .muse/config.json style file — valid JSON but wrong endpoint data_file = tmp_path / "valid.json" data_file.write_text('{"inserted": 5, "skipped": 0}', encoding="utf-8") records = [_make_record()] with pytest.raises(CoordBusError): # The path needs to point to the file itself for file:// to work push_to_hub(f"file://{tmp_path}", "owner", "repo", records, signing=None) def test_hub_url_multiple_trailing_slashes_normalized(self) -> None: """Multiple trailing slashes are stripped; URL structure is correct.""" url = _build_url("https://localhost:1337///", "gabriel", "linux", "coord/push") assert not url.endswith("///coord/push") assert url.endswith("/coord/push") assert url.count("///") == 0 def test_hub_url_with_path_components_does_not_inject_segments(self) -> None: """hub_url with embedded path components must not inject extra segments.""" url = _build_url( "https://localhost:1337/api/v1", "gabriel", "linux", "coord/push" ) # Owner and slug must appear in the correct positions assert "/gabriel/linux/coord/push" in url def test_ipv6_localhost_hub_url_builds_correctly(self) -> None: """IPv6 loopback address must produce a well-formed URL.""" url = _build_url("http://[::1]:10003", "gabriel", "linux", "coord/push") assert url.startswith("http://[::1]:10003/") assert url.endswith("coord/push") def test_push_url_never_contains_token(self) -> None: records = [_make_record()] with patch(_OPENER) as mock_open: mock_open.return_value = _mock_response({"inserted": 1, "skipped": 0}) push_to_hub("http://hub", "gabriel", "repo", records, signing=None) req = mock_open.call_args[0][0] assert _SECRET not in req.full_url def test_pull_url_never_contains_token(self) -> None: with patch(_OPENER) as mock_open: mock_open.return_value = _mock_response({"records": [], "cursor": 0}) pull_from_hub("http://hub", "gabriel", "repo", signing=None) req = mock_open.call_args[0][0] assert _SECRET not in req.full_url # =========================================================================== # 2. REDIRECT BLOCKING (CREDENTIAL LEAKAGE PREVENTION) # =========================================================================== class TestCoordSecurityRedirect: """A MITM or compromised hub issues redirects to steal the auth token.""" def _make_redirect_error(self, code: int, newurl: str = "http://attacker.example.com/steal") -> urllib.error.HTTPError: headers = MagicMock() fp = BytesIO(b"") return urllib.error.HTTPError( "http://hub/coord/push", code, f"Redirect {code}", headers, fp, ) def test_push_301_redirect_raises_coord_bus_error(self) -> None: """301 Moved Permanently must raise CoordBusError, not follow.""" # Simulate _NoRedirectHandler raising HTTPError on redirect handler = _NoRedirectHandler() req = MagicMock() req.full_url = "http://hub/coord/push" fp = MagicMock() headers = MagicMock() with pytest.raises(urllib.error.HTTPError) as exc_info: handler.redirect_request( req, fp, 301, "Moved Permanently", headers, "http://attacker.example.com/steal" ) assert "Redirect refused" in str(exc_info.value.msg) def test_push_302_redirect_raises(self) -> None: handler = _NoRedirectHandler() req = MagicMock() req.full_url = "http://hub/coord/push" fp = MagicMock() headers = MagicMock() with pytest.raises(urllib.error.HTTPError): handler.redirect_request(req, fp, 302, "Found", headers, "http://attacker.example.com/") def test_push_307_redirect_raises(self) -> None: handler = _NoRedirectHandler() req = MagicMock() req.full_url = "http://hub/coord/push" fp = MagicMock() headers = MagicMock() with pytest.raises(urllib.error.HTTPError): handler.redirect_request(req, fp, 307, "Temporary Redirect", headers, "http://attacker.example.com/") def test_push_308_redirect_raises(self) -> None: handler = _NoRedirectHandler() req = MagicMock() req.full_url = "http://hub/coord/push" fp = MagicMock() headers = MagicMock() with pytest.raises(urllib.error.HTTPError): handler.redirect_request(req, fp, 308, "Permanent Redirect", headers, "http://attacker.example.com/") def test_full_push_to_hub_301_raises_coord_bus_error(self) -> None: """End-to-end: push_to_hub with a 301 response raises CoordBusError.""" redirect_err = urllib.error.HTTPError( "http://hub/coord/push", 301, "Redirect refused (301): server tried to redirect to 'http://attacker.example.com/'. Update the configured remote URL to the final destination.", MagicMock(), BytesIO(b""), ) records = [_make_record()] with patch(_OPENER, side_effect=redirect_err): with pytest.raises(CoordBusError) as exc_info: push_to_hub("http://hub", "gabriel", "repo", records, signing=None) # Must surface as CoordBusError, not silently succeed assert exc_info.value.status_code == 301 def test_full_pull_from_hub_301_raises_coord_bus_error(self) -> None: redirect_err = urllib.error.HTTPError( "http://hub/coord/pull", 301, "Redirect refused (301): server tried to redirect to 'http://attacker.example.com/'. Update the configured remote URL to the final destination.", MagicMock(), BytesIO(b""), ) with patch(_OPENER, side_effect=redirect_err): with pytest.raises(CoordBusError) as exc_info: pull_from_hub("http://hub", "gabriel", "repo", signing=None) assert exc_info.value.status_code == 301 def test_redirect_error_contains_refused_message(self) -> None: """Error message must say 'Redirect refused' so the user understands why.""" handler = _NoRedirectHandler() req = MagicMock() req.full_url = "http://hub/coord/push" with pytest.raises(urllib.error.HTTPError) as exc_info: handler.redirect_request( req, MagicMock(), 302, "Found", MagicMock(), "http://attacker.example.com/" ) assert "Redirect refused" in exc_info.value.msg # =========================================================================== # 3. HTTP HEADER INJECTION VIA TOKEN (CRLF) # =========================================================================== class TestCoordSecurityHeaderInjection: """An attacker tries to inject extra HTTP headers by poisoning the token.""" def test_token_with_crlf_rejected(self) -> None: """CRLF in token → HTTP header injection → must return None.""" malicious = "valid-prefix\r\nAuthorization: MSign attacker-token" assert sanitize_token(malicious) is None def test_token_with_cr_only_rejected(self) -> None: malicious = "valid\rAuthorization: MSign injected" assert sanitize_token(malicious) is None def test_token_with_lf_only_rejected(self) -> None: malicious = "valid\nX-Custom-Header: injected" assert sanitize_token(malicious) is None def test_token_with_null_byte_rejected(self) -> None: malicious = "valid\x00malicious" assert sanitize_token(malicious) is None def test_token_with_bel_control_char_rejected(self) -> None: malicious = "valid\x07bel" assert sanitize_token(malicious) is None def test_token_with_esc_rejected(self) -> None: malicious = "valid\x1b[31mred" assert sanitize_token(malicious) is None def test_token_at_max_length_accepted(self) -> None: """Token exactly at 8 192 chars (MAX) must be accepted.""" long_valid = "a" * _MAX_TOKEN_LEN result = sanitize_token(long_valid) assert result == long_valid def test_token_one_over_max_length_rejected(self) -> None: too_long = "a" * (_MAX_TOKEN_LEN + 1) assert sanitize_token(too_long) is None def test_token_whitespace_only_returns_none(self) -> None: assert sanitize_token(" \t \n ") is None def test_token_empty_string_returns_none(self) -> None: assert sanitize_token("") is None def test_base64url_dotted_token_accepted(self) -> None: """Dotted base64url strings (e.g. opaque tokens) must not be rejected.""" token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6Ikpva" assert sanitize_token(token) == token def test_token_with_c1_control_char_rejected(self) -> None: """C1 control chars (0x80-0x9F) must be rejected.""" malicious = "valid\x80injection" assert sanitize_token(malicious) is None # =========================================================================== # 4. URL STRUCTURE: OWNER/SLUG ENCODING # =========================================================================== class TestCoordSecurityURLEncoding: """Every special character in owner/slug must be %-encoded, never literal.""" def test_owner_at_sign_encoded_not_user_info(self) -> None: """``@`` in owner must become ``%40`` — never parsed as user-info.""" url = _build_url("http://hub", "user@host", "repo", "coord/push") # Must not have user-info syntax assert "user@host" not in url assert "%40" in url # Scheme+authority part must not have user:pass@host form host_part = url.split("://")[1].split("/")[0] assert "@" not in host_part def test_owner_hash_encoded_not_fragment(self) -> None: url = _build_url("http://hub", "#malicious", "repo", "coord/push") assert "#" not in url.split("hub")[1] # no literal # after host assert "%23" in url def test_owner_question_mark_encoded_not_query(self) -> None: url = _build_url("http://hub", "?q=1", "repo", "coord/push") assert "%3F" in url # No literal ? means no query string was injected assert "?" not in url.split("hub")[1] def test_owner_slash_encoded_cannot_span_path_segments(self) -> None: url = _build_url("http://hub", "a/b", "repo", "coord/push") # Internal / must be %-encoded; owner cannot occupy 2 path segments assert "%2F" in url path = url.split("hub")[1] # /a%2Fb/repo/coord/push parts = path.lstrip("/").split("/") # parts[0] = "a%2Fb", parts[1] = "repo", parts[2] = "coord", parts[3] = "push" assert "repo" in parts[1] def test_owner_path_traversal_slashes_encoded(self) -> None: url = _build_url("http://hub", "../../etc", "passwd", "coord/push") assert "%2F" in url # slashes within owner are encoded # The hub host must not be followed by a literal ../ path = url.split("://", 1)[1].split("/", 1)[1] # strip host assert not path.startswith("../") assert not path.startswith("..%2F..%2F..%2F") # still points past hub def test_slug_path_traversal_encoded(self) -> None: url = _build_url("http://hub", "gabriel", "../../admin", "coord/push") assert "%2F" in url # Structure: /gabriel//coord/push path = url.split("hub")[1] parts = path.lstrip("/").split("/") assert parts[0] == "gabriel" assert "admin" not in parts[2] # admin must be inside encoded slug def test_newline_in_owner_percent_encoded(self) -> None: """Newline in owner would split the HTTP request line — must be encoded.""" url = _build_url("http://hub", "eve\nGET /malicious HTTP/1.1", "repo", "coord/push") assert "\n" not in url assert "%0A" in url def test_space_in_slug_percent_encoded(self) -> None: url = _build_url("http://hub", "gabriel", "my repo", "coord/push") assert " " not in url assert "%20" in url def test_all_special_chars_in_owner_encoded(self) -> None: """Broad sweep: each dangerous char individually.""" # Note: % itself is expected to appear as %25 in the encoded segment, # so we exclude it from this check. All other chars must be fully encoded. dangerous = ["/", "\\", "?", "#", "@", ";", "=", "&", "+"] for ch in dangerous: url = _build_url("http://hub", f"malicious{ch}owner", "repo", "coord/push") assert ch not in url.split("hub/")[1].split("/")[0], ( f"Literal '{ch}' found unencoded in owner segment of URL: {url}" ) def test_encoded_owner_slug_do_not_escape_to_different_path_positions(self) -> None: """Crafted owner/slug must not reach the 'coord/push' endpoint position.""" url = _build_url("http://hub", "gabriel", "coord%2Fpush/malicious", "coord/push") # The real endpoint must still be at the end assert url.endswith("/coord/push") # =========================================================================== # 5. ERROR BODY LEAKAGE PREVENTION # =========================================================================== class TestCoordSecurityErrorLeakage: """A malicious hub embeds sensitive data or attack payloads in error bodies.""" def test_http_403_body_truncated_to_200_chars(self) -> None: long_body = "X" * 1000 err = urllib.error.HTTPError( "http://hub", 403, "Forbidden", {}, BytesIO(long_body.encode()) ) records = [_make_record()] with patch(_OPENER, side_effect=err): with pytest.raises(CoordBusError) as exc_info: push_to_hub("http://hub", "g", "r", records, signing=None) # Body truncated to 200 chars + "HTTP 403: " prefix is manageable msg = str(exc_info.value) body_portion = msg.replace("HTTP 403: ", "") assert len(body_portion) <= 200, f"Body not truncated: len={len(body_portion)}" def test_http_500_body_ansi_codes_stripped(self) -> None: ansi_body = "\x1b[31mSECRET_500_BODY\x1b[0m" err = urllib.error.HTTPError( "http://hub", 500, "Internal Server Error", {}, BytesIO(ansi_body.encode()) ) records = [_make_record()] with patch(_OPENER, side_effect=err): with pytest.raises(CoordBusError) as exc_info: push_to_hub("http://hub", "g", "r", records, signing=None) assert "\x1b" not in str(exc_info.value) def test_http_error_body_c1_control_chars_stripped(self) -> None: """C1 chars (0x80–0x9F) in error body must be stripped.""" c1_body = "error: \x80\x9b\x9f malformed" err = urllib.error.HTTPError( "http://hub", 422, "Unprocessable", {}, BytesIO(c1_body.encode("latin-1")) ) records = [_make_record()] with patch(_OPENER, side_effect=err): with pytest.raises(CoordBusError) as exc_info: push_to_hub("http://hub", "g", "r", records, signing=None) msg = str(exc_info.value) for ch in ["\x80", "\x9b", "\x9f"]: assert ch not in msg, f"C1 char {ch!r} not stripped from error message" def test_network_error_reason_ansi_stripped(self) -> None: """URLError reason with ANSI must be stripped before including in CoordBusError.""" ansi_reason = "\x1b[31mConnection refused\x1b[0m" err = urllib.error.URLError(ansi_reason) records = [_make_record()] with patch(_OPENER, side_effect=err): with pytest.raises(CoordBusError) as exc_info: push_to_hub("http://hub", "g", "r", records, signing=None) assert "\x1b" not in str(exc_info.value) def test_network_error_reason_length_limited(self) -> None: """Very long URLError reason must be capped at 200 chars in error message.""" long_reason = "A" * 10_000 err = urllib.error.URLError(long_reason) records = [_make_record()] with patch(_OPENER, side_effect=err): with pytest.raises(CoordBusError) as exc_info: push_to_hub("http://hub", "g", "r", records, signing=None) # The reason is capped at 200 chars in _post_json msg = str(exc_info.value) assert "A" * 201 not in msg def test_http_401_body_completely_masked(self) -> None: """401 body must never appear in error message — even if it's valid JSON.""" sensitive = '{"token": "REAL_SECRET_IN_401_BODY", "user": "gabriel"}' err = urllib.error.HTTPError( "http://hub", 401, "Unauthorized", {}, BytesIO(sensitive.encode()) ) records = [_make_record()] with patch(_OPENER, side_effect=err): with pytest.raises(CoordBusError) as exc_info: push_to_hub("http://hub", "g", "r", records, signing=None) msg = str(exc_info.value) assert "REAL_SECRET_IN_401_BODY" not in msg assert "gabriel" not in msg assert "Authentication failed" in msg def test_http_401_body_masked_for_pull_too(self) -> None: sensitive = "BEARER_TOKEN_eyJ_EXPOSED_IN_401" err = urllib.error.HTTPError( "http://hub", 401, "Unauthorized", {}, BytesIO(sensitive.encode()) ) with patch(_OPENER, side_effect=err): with pytest.raises(CoordBusError) as exc_info: pull_from_hub("http://hub", "g", "r", signing=None) assert sensitive not in str(exc_info.value) assert "Authentication failed" in str(exc_info.value) def test_zero_byte_error_body_produces_clean_http_code_message(self) -> None: """Empty error body must produce 'HTTP ' — not an exception.""" err = urllib.error.HTTPError("http://hub", 503, "Service Unavailable", {}, BytesIO(b"")) records = [_make_record()] with patch(_OPENER, side_effect=err): with pytest.raises(CoordBusError) as exc_info: push_to_hub("http://hub", "g", "r", records, signing=None) assert "503" in str(exc_info.value) def test_http_error_body_with_embedded_token_not_surfaced(self) -> None: """Hub 400 body containing a token string must not leak that token.""" body_with_token = f'Invalid auth: token={_SECRET} is malformed' err = urllib.error.HTTPError( "http://hub", 400, "Bad Request", {}, BytesIO(body_with_token.encode()) ) records = [_make_record()] with patch(_OPENER, side_effect=err): with pytest.raises(CoordBusError) as exc_info: push_to_hub("http://hub", "g", "r", records, signing=None) # Body is truncated to 200 chars and sanitized — _SECRET may or may not # appear, but the token WE sent (tok) must not appear assert "tok" not in str(exc_info.value) or "tok" in body_with_token # =========================================================================== # 6. TERMINAL INJECTION: ANSI/OSC/BEL/CSI # =========================================================================== class TestCoordSecurityTerminalInjection: """A rogue agent or repo name tries to hijack the operator's terminal.""" def test_osc_escape_in_owner_stripped_in_text_push(self, repo: pathlib.Path) -> None: """OSC sequence that sets terminal title must be stripped.""" osc_owner = "\x1b]0;PWNED_TERMINAL_TITLE\x07" with patch(_PUSH_TARGET, return_value={"inserted": 0, "skipped": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")): result = runner.invoke(cli, _push_args(owner=osc_owner)) assert "\x1b" not in result.output assert "\x07" not in result.output def test_bel_char_in_slug_stripped_in_text_push(self, repo: pathlib.Path) -> None: bel_slug = "linux\x07DING" with patch(_PUSH_TARGET, return_value={"inserted": 0, "skipped": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")): result = runner.invoke(cli, _push_args(slug=bel_slug)) assert "\x07" not in result.output def test_c1_csi_in_owner_stripped_in_text_pull(self, repo: pathlib.Path) -> None: """C1 CSI (0x9B) that initiates escape sequence must be stripped.""" csi_owner = "gabriel\x9b[2J" with patch(_PULL_TARGET, return_value={"records": [], "cursor": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")): result = runner.invoke(cli, _pull_args(owner=csi_owner)) assert "\x9b" not in result.output def test_full_ansi_clear_screen_in_slug_stripped(self, repo: pathlib.Path) -> None: """ESC[H + ESC[2J (clear screen + cursor home) must be stripped.""" clear_slug = "linux\x1b[H\x1b[2J" with patch(_PULL_TARGET, return_value={"records": [], "cursor": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")): result = runner.invoke(cli, _pull_args(slug=clear_slug)) assert "\x1b" not in result.output def test_ansi_colour_in_owner_stripped(self, repo: pathlib.Path) -> None: ansi_owner = "\x1b[31mmalicious_red_owner\x1b[0m" with patch(_PUSH_TARGET, return_value={"inserted": 0, "skipped": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")): result = runner.invoke(cli, _push_args(owner=ansi_owner)) assert "\x1b" not in result.output def test_all_c0_control_chars_stripped_in_text_output( self, repo: pathlib.Path ) -> None: """Every C0 control char except \\t and \\n must not appear in output.""" # Build an owner with all C0 chars (except \\n which is legitimate and \\t) malicious_owner = "".join( chr(i) for i in range(0x00, 0x20) if i not in (0x09, 0x0A) # keep tab and newline ) + "gabriel" with patch(_PUSH_TARGET, return_value={"inserted": 0, "skipped": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")): result = runner.invoke(cli, _push_args(owner=malicious_owner)) # Check: no raw control chars except newline (output separators) in output for ch in result.output: cp = ord(ch) if cp < 0x20 and cp not in (0x09, 0x0A): pytest.fail( f"Control char U+{cp:04X} found in text output after sanitize_display" ) def test_ansi_in_json_mode_is_json_encoded_not_raw(self, repo: pathlib.Path) -> None: """In --json mode, ANSI in owner must be JSON-encoded (\\u001b), not raw ESC. JSON consumers parse the string — they must not see raw escape sequences.""" ansi_owner = "\x1b[31mred\x1b[0m" with patch(_PUSH_TARGET, return_value={"inserted": 0, "skipped": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")): # JSON mode does not include owner in output, but must not crash result = runner.invoke(cli, _push_args(owner=ansi_owner) + ["-j"]) # Must produce valid JSON assert result.exit_code == 0 data = json.loads(result.output.strip()) assert "inserted" in data # =========================================================================== # 7. TOKEN LEAKAGE ACROSS ALL OUTPUT PATHS # =========================================================================== class TestCoordSecurityTokenLeakage: """Signing credential must be absolutely absent from every output path.""" def test_token_not_in_push_json_success(self, repo: pathlib.Path) -> None: with patch(_PUSH_TARGET, return_value={"inserted": 0, "skipped": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", _SECRET)): result = runner.invoke(cli, _push_args() + ["-j"]) assert _SECRET not in result.output def test_token_not_in_pull_json_success(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value={"records": [], "cursor": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", _SECRET)): result = runner.invoke(cli, _pull_args() + ["-j"]) assert _SECRET not in result.output def test_token_not_in_push_text_success(self, repo: pathlib.Path) -> None: with patch(_PUSH_TARGET, return_value={"inserted": 0, "skipped": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", _SECRET)): result = runner.invoke(cli, _push_args()) assert _SECRET not in result.output def test_token_not_in_pull_text_success(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value={"records": [], "cursor": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", _SECRET)): result = runner.invoke(cli, _pull_args()) assert _SECRET not in result.output def test_token_not_in_push_json_on_error(self, repo: pathlib.Path) -> None: subdir = coordination_dir(repo) / "reservations" subdir.mkdir(parents=True, exist_ok=True) uid = _new_id() (subdir / f"{uid}.json").write_text( json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8" ) with patch(_PUSH_TARGET, side_effect=CoordBusError("hub refused", status_code=500)), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", _SECRET)): result = runner.invoke(cli, _push_args() + ["-j"]) assert _SECRET not in result.output def test_token_not_in_pull_json_on_error(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, side_effect=CoordBusError("gateway timeout", status_code=504)), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", _SECRET)): result = runner.invoke(cli, _pull_args() + ["-j"]) assert _SECRET not in result.output def test_token_not_in_coord_bus_error_string(self) -> None: """CoordBusError raised by push_to_hub must not include the token.""" sensitive = "SECRET_TOKEN_XYZZY" err = urllib.error.HTTPError( "http://hub", 500, "Server Error", {}, BytesIO(b"internal failure - auth token rejected"), ) records = [_make_record()] with patch(_OPENER, side_effect=err): with pytest.raises(CoordBusError) as exc_info: push_to_hub("http://hub", "g", "r", records, signing=None) assert sensitive not in str(exc_info.value) def test_token_not_in_log_records_during_push(self, repo: pathlib.Path) -> None: """Logging must never emit the signing credential, even at DEBUG level.""" log_records: list[str] = [] class Capture(logging.Handler): def emit(self, record: logging.LogRecord) -> None: log_records.append(self.format(record)) handler = Capture() logger = logging.getLogger("muse") old_level = logger.level logger.setLevel(logging.DEBUG) logger.addHandler(handler) try: subdir = coordination_dir(repo) / "reservations" subdir.mkdir(parents=True, exist_ok=True) uid = _new_id() (subdir / f"{uid}.json").write_text( json.dumps({"reservation_id": uid, "run_id": "r"}), encoding="utf-8" ) with patch(_PUSH_TARGET, return_value={"inserted": 1, "skipped": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", _SECRET)): runner.invoke(cli, _push_args() + ["-j"]) finally: logger.removeHandler(handler) logger.setLevel(old_level) for record in log_records: assert _SECRET not in record, f"Token found in log record: {record!r}" # =========================================================================== # 8. MALICIOUS HUB RESPONSE: ADVERSARIAL RECORDS # =========================================================================== class TestCoordSecurityMaliciousHubResponse: """A compromised hub returns adversarial records to escape the remote/ directory.""" def test_kind_traversal_does_not_write_outside_remote( self, repo: pathlib.Path ) -> None: uid = _new_id() malicious_records = [ {"kind": "../../../.ssh", "record_id": uid, "run_id": "r", "payload": {}, "expires_at": None}, ] _write_remote_records(repo, malicious_records) ssh_path = pathlib.Path.home() / ".ssh" / f"{uid}.json" assert not ssh_path.exists(), "Traversal escaped to ~/.ssh!" remote = coordination_dir(repo) / "remote" if remote.exists(): assert list(remote.rglob("*.json")) == [] def test_unknown_kind_uppercase_rejected(self, repo: pathlib.Path) -> None: """'RESERVATION' is not in _ALL_KINDS — case-sensitive allowlist.""" uid = _new_id() records = [{"kind": "RESERVATION", "record_id": uid, "run_id": "r", "payload": {}, "expires_at": None}] _write_remote_records(repo, records) remote = coordination_dir(repo) / "remote" if remote.exists(): assert list(remote.rglob("*.json")) == [] def test_unknown_kind_with_null_byte_rejected(self, repo: pathlib.Path) -> None: uid = _new_id() records = [{"kind": "reservation\x00malicious", "record_id": uid, "run_id": "r", "payload": {}, "expires_at": None}] _write_remote_records(repo, records) remote = coordination_dir(repo) / "remote" if remote.exists(): assert list(remote.rglob("*.json")) == [] def test_traversal_record_id_ssh_target_rejected(self, repo: pathlib.Path) -> None: """record_id = ``../../.ssh/authorized_keys`` must not write to ~/.ssh.""" malicious_id = "../../.ssh/authorized_keys" records = [{"kind": "reservation", "record_id": malicious_id, "run_id": "r", "payload": {}, "expires_at": None}] _write_remote_records(repo, records) # Nothing must be written target = coordination_dir(repo) / "remote" / "reservation" / f"{malicious_id}.json" assert not target.exists() remote = coordination_dir(repo) / "remote" if remote.exists(): all_files = list(remote.rglob("*.json")) assert all_files == [], f"Unexpected files written: {all_files}" def test_record_id_with_null_byte_rejected(self, repo: pathlib.Path) -> None: malicious_id = "foo\x00bar" records = [{"kind": "reservation", "record_id": malicious_id, "run_id": "r", "payload": {}, "expires_at": None}] _write_remote_records(repo, records) remote = coordination_dir(repo) / "remote" if remote.exists(): assert list(remote.rglob("*.json")) == [] def test_record_id_with_space_rejected(self, repo: pathlib.Path) -> None: records = [{"kind": "task", "record_id": "foo bar", "run_id": "r", "payload": {}, "expires_at": None}] _write_remote_records(repo, records) remote = coordination_dir(repo) / "remote" if remote.exists(): assert list(remote.rglob("*.json")) == [] def test_record_id_with_newline_rejected(self, repo: pathlib.Path) -> None: records = [{"kind": "task", "record_id": "foo\nbar", "run_id": "r", "payload": {}, "expires_at": None}] _write_remote_records(repo, records) remote = coordination_dir(repo) / "remote" if remote.exists(): assert list(remote.rglob("*.json")) == [] def test_record_id_with_backslash_rejected(self) -> None: """Backslash would be a path separator on Windows — must be rejected.""" assert not _SAFE_RECORD_ID_RE.match("foo\\bar") assert not _SAFE_RECORD_ID_RE.match("..\\..\\malicious") def test_payload_with_exec_string_safely_serialized( self, repo: pathlib.Path ) -> None: """Adversarial payload values must be stored as data, not executed.""" uid = _new_id() exec_payload = {"cmd": "__import__('os').system('rm -rf /')"} records = [{"kind": "reservation", "record_id": uid, "run_id": "r", "payload": exec_payload, "expires_at": None}] _write_remote_records(repo, records) target = coordination_dir(repo) / "remote" / "reservation" / f"{uid}.json" assert target.exists() loaded = json.loads(target.read_text()) # Stored as a string, not executed assert loaded["payload"]["cmd"] == exec_payload["cmd"] def test_1000_adversarial_record_ids_zero_files_written( self, repo: pathlib.Path ) -> None: """1000-record response where every record_id is adversarial → 0 files.""" traversals = [ "../../etc/passwd", "../.ssh/authorized_keys", "/etc/crontab", "foo\x00bar", "a" * 129, # over max length "foo bar", "foo\nbar", "foo\tbar", "\x1b[31mred", "..%2F..%2Fetc", ] malicious_records = [] for i in range(1000): malicious_records.append({ "kind": "reservation", "record_id": traversals[i % len(traversals)], "run_id": "r", "payload": {}, "expires_at": None, }) _write_remote_records(repo, malicious_records) remote = coordination_dir(repo) / "remote" if remote.exists(): written = list(remote.rglob("*.json")) assert written == [], f"Adversarial record_ids wrote {len(written)} files!" def test_hub_response_with_unicode_kind_rejected(self, repo: pathlib.Path) -> None: """Non-ASCII kind like 'réservation' is not in _ALL_KINDS allowlist.""" uid = _new_id() records = [{"kind": "réservation", "record_id": uid, "run_id": "r", "payload": {}, "expires_at": None}] _write_remote_records(repo, records) remote = coordination_dir(repo) / "remote" if remote.exists(): assert list(remote.rglob("*.json")) == [] # =========================================================================== # 9. GC INPUT VALIDATION # =========================================================================== class TestCoordSecurityGCBoundaries: """Adversarial GC parameters must not trigger crashes or unsafe operations.""" def test_gc_negative_grace_period_exits_1(self, repo: pathlib.Path) -> None: with patch(_REQUIRE_REPO, return_value=repo): result = runner.invoke( cli, ["coord", "gc", "--grace-period", "-1"] ) assert result.exit_code == 1 def test_gc_zero_grace_period_accepted(self, repo: pathlib.Path) -> None: """Zero grace period is valid (no grace window).""" with patch(_REQUIRE_REPO, return_value=repo): result = runner.invoke( cli, ["coord", "gc", "--grace-period", "0"] ) # Should succeed (dry-run, no files to collect) assert result.exit_code == 0 def test_gc_zero_max_intent_age_exits_1(self, repo: pathlib.Path) -> None: with patch(_REQUIRE_REPO, return_value=repo): result = runner.invoke( cli, ["coord", "gc", "--max-intent-age", "0"] ) assert result.exit_code == 1 def test_gc_negative_max_intent_age_exits_1(self, repo: pathlib.Path) -> None: with patch(_REQUIRE_REPO, return_value=repo): result = runner.invoke( cli, ["coord", "gc", "--max-intent-age", "-1"] ) assert result.exit_code == 1 def test_gc_very_large_grace_period_accepted(self, repo: pathlib.Path) -> None: """sys.maxsize grace period must not overflow or crash.""" max_val = min(sys.maxsize, 2_147_483_647) # keep within int range for argparse with patch(_REQUIRE_REPO, return_value=repo): result = runner.invoke( cli, ["coord", "gc", "--grace-period", str(max_val)] ) assert result.exit_code == 0 def test_gc_dry_run_never_deletes_files(self, repo: pathlib.Path) -> None: """Default (dry-run) mode must not delete anything.""" # Create an expired reservation import datetime res_dir = coordination_dir(repo) / "reservations" res_dir.mkdir(parents=True, exist_ok=True) uid = _new_id() expired_res = { "reservation_id": uid, "run_id": "r", "branch": "main", "addresses": ["foo::bar"], "created_at": "2000-01-01T00:00:00+00:00", "expires_at": "2000-01-01T01:00:00+00:00", "operation": None, } (res_dir / f"{uid}.json").write_text(json.dumps(expired_res), encoding="utf-8") with patch(_REQUIRE_REPO, return_value=repo): result = runner.invoke(cli, ["coord", "gc"]) assert result.exit_code == 0 # File must still exist (dry-run does not delete) assert (res_dir / f"{uid}.json").exists(), "Dry-run deleted a file!" def test_gc_json_output_is_valid_json(self, repo: pathlib.Path) -> None: with patch(_REQUIRE_REPO, return_value=repo): result = runner.invoke(cli, ["coord", "gc", "--json"]) assert result.exit_code == 0 data = json.loads(result.output.strip()) assert "dry_run" in data assert "total_removed" in data def test_gc_no_traceback_on_empty_repo(self, repo: pathlib.Path) -> None: with patch(_REQUIRE_REPO, return_value=repo): result = runner.invoke(cli, ["coord", "gc"]) assert "Traceback" not in result.output assert result.exit_code == 0 # =========================================================================== # 10. CLI INPUT VALIDATION HARDENING # =========================================================================== class TestCoordSecurityInputValidation: """Boundary tests for all size-limited and range-checked CLI inputs.""" def test_push_owner_at_max_length_accepted(self, repo: pathlib.Path) -> None: owner = "a" * _MAX_OWNER_LEN with patch(_PUSH_TARGET, return_value={"inserted": 0, "skipped": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")): result = runner.invoke(cli, _push_args(owner=owner)) # Should not exit 1 due to length check assert "too long" not in result.output def test_push_owner_one_over_max_length_rejected_before_io( self, repo: pathlib.Path ) -> None: owner = "a" * (_MAX_OWNER_LEN + 1) gather_called = [] def fake_gather(root: pathlib.Path, kinds: list[str]) -> list[MsgpackDict]: gather_called.append(True) return [] with patch("muse.cli.commands.coord_sync._gather_local_records", fake_gather), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")): result = runner.invoke(cli, _push_args(owner=owner)) assert result.exit_code == 1 assert not gather_called, "File I/O was performed before length check!" def test_push_slug_at_max_length_accepted(self, repo: pathlib.Path) -> None: slug = "s" * _MAX_SLUG_LEN with patch(_PUSH_TARGET, return_value={"inserted": 0, "skipped": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")): result = runner.invoke(cli, _push_args(slug=slug)) assert "too long" not in result.output def test_push_slug_one_over_max_length_rejected_before_io( self, repo: pathlib.Path ) -> None: slug = "s" * (_MAX_SLUG_LEN + 1) gather_called = [] def fake_gather(root: pathlib.Path, kinds: list[str]) -> list[MsgpackDict]: gather_called.append(True) return [] with patch("muse.cli.commands.coord_sync._gather_local_records", fake_gather), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")): result = runner.invoke(cli, _push_args(slug=slug)) assert result.exit_code == 1 assert not gather_called, "File I/O was performed before slug length check!" def test_pull_since_id_max_python_int_accepted(self, repo: pathlib.Path) -> None: """sys.maxsize as --since-id must not overflow or crash.""" with patch(_PULL_TARGET, return_value={"records": [], "cursor": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")): result = runner.invoke( cli, ["coord", "sync", "pull", "--hub", "https://localhost:1337", "--owner", "gabriel", "--slug", "linux", "--since-id", str(sys.maxsize)], ) # Should not crash with overflow — may exit 0 or 1 depending on argparse int handling assert "Traceback" not in result.output def test_pull_limit_zero_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, _pull_args() + ["--limit", "0"]) assert result.exit_code == 1 def test_pull_limit_at_max_accepted(self, repo: pathlib.Path) -> None: with patch(_PULL_TARGET, return_value={"records": [], "cursor": 0}), \ patch(_REQUIRE_REPO, return_value=repo), \ patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")): result = runner.invoke(cli, _pull_args() + ["--limit", str(_MAX_PULL_LIMIT)]) assert result.exit_code == 0 def test_pull_limit_one_over_max_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, _pull_args() + ["--limit", str(_MAX_PULL_LIMIT + 1)] ) assert result.exit_code == 1 def test_pull_since_id_negative_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, _pull_args() + ["--since-id", "-1"]) assert result.exit_code == 1 def test_no_traceback_on_any_validation_failure(self, repo: pathlib.Path) -> None: """All input validation failures must emit clean messages, never tracebacks.""" bad_invocations = [ _push_args(owner="a" * (_MAX_OWNER_LEN + 1)), _push_args(slug="s" * (_MAX_SLUG_LEN + 1)), _pull_args() + ["--since-id", "-99"], _pull_args() + ["--limit", "0"], _pull_args() + ["--limit", str(_MAX_PULL_LIMIT + 1)], ] for args in bad_invocations: with patch(_RESOLVE_HUB, return_value=("https://localhost:1337", "tok")): result = runner.invoke(cli, args) assert result.exit_code == 1, f"Expected exit 1 for: {args}" assert "Traceback" not in result.output, ( f"Traceback leaked for args: {args}\nOutput: {result.output}" )