"""Phase 2.6 — Branch and ref injection security tests.

Attack surface
--------------
Branch names are user-controlled strings that become filesystem paths:

    .muse/refs/heads/<name>

A permissive validator allows an attacker to:

1. Escape the ref store via path traversal (../../etc/cron.d/pwned).
2. Inject terminal-escape sequences into for-each-ref text output via ESC
   or other C0 control characters in the branch name.
3. Create phantom branch aliases: ``feat/./sub`` resolves to the same inode
   as ``feat/sub`` on every POSIX filesystem, so two names share one file.
4. Produce .lock-suffixed files that look like stale atomic-write temp files
   to any tooling scanning the ref directory.
5. Inject git reflog notation (``@{``) into pipeline outputs, confusing
   downstream parsers.
6. Smuggle glob metacharacters that expand unexpectedly if branch names are
   ever used in a glob pattern.

Fixes
-----
``_BRANCH_FORBIDDEN_RE`` in ``muse.core.validation`` was extended to block:

- All C0 control chars (0x00–0x1F), space (0x20), DEL (0x7F).
- Git-banned punctuation: ``~``, ``^``, ``:``, ``?``, ``*``, ``[``.
- Single-dot path component (``/./``).
- Any path component ending in ``.lock``.
- The ``@{`` sequence and the bare ``@`` string.

All ref-writing commands (``update-ref``, ``symbolic-ref``, ``branch``) call
``validate_branch_name`` before any filesystem operation, so these fixes
propagate automatically to every write path.
"""

from __future__ import annotations

from collections.abc import Mapping
import json
import os
import pathlib
from typing import TypedDict

import pytest

from muse.core.validation import validate_branch_name
from muse.core.refs import write_branch_ref, write_head_branch
from tests.cli_test_helper import CliRunner
from muse.core.types import NULL_LONG_ID, long_id
from muse.core.paths import commits_dir, head_path, heads_dir, objects_dir, repo_json_path, snapshots_dir


class _CheckRefFormatResult(TypedDict, total=False):
    """Shape of muse check-ref-format --json output."""

    all_valid: bool
    valid_count: int
    invalid_count: int
    results: list[Mapping[str, str | bool | None]]
    max_length: int
    forbidden_chars: list[str]
    forbidden_patterns: list[str]
    notes: str


cli = None  # argparse migration — CliRunner ignores this
runner = CliRunner()


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Lay out the minimal repo skeleton the integration tests rely on.

    Creates the heads, commits, snapshots and objects directories, points
    HEAD at ``refs/heads/main`` and writes a stub repo JSON file.

    Returns:
        *tmp_path* itself, so callers can use it as the repo root.
    """
    # Same creation order as always: heads, commits, snapshots, objects.
    for locate_dir in (heads_dir, commits_dir, snapshots_dir, objects_dir):
        locate_dir(tmp_path).mkdir(parents=True)

    head_file = head_path(tmp_path)
    head_file.write_text("ref: refs/heads/main\n")

    repo_json = repo_json_path(tmp_path)
    repo_json.write_text('{"repo_id": "test-repo", "name": "test"}')
    return tmp_path


def _invoke_in_repo(tmp_path: pathlib.Path, args: list[str]) -> tuple[int, str]:
    """Run the muse CLI with *args* while the working directory is *tmp_path*.

    The previous working directory is restored afterwards, even when the
    invocation raises.

    Returns:
        ``(exit_code, text)`` where *text* is the result's ``output``
        followed by its ``stderr``.
    """
    saved_cwd = os.getcwd()
    try:
        os.chdir(tmp_path)
        outcome = runner.invoke(cli, args)
        exit_code = outcome.exit_code
        combined_text = outcome.output + outcome.stderr
        return exit_code, combined_text
    finally:
        os.chdir(saved_cwd)


_ZERO_OID = NULL_LONG_ID


# ===========================================================================
# Unit tests — validate_branch_name
# ===========================================================================
class TestValidBranchNames:
    """Names that must be accepted."""

    @pytest.mark.parametrize("name", [
        "main",
        "dev",
        "feature/my-branch",
        "fix/auth-token-exposure",
        "feat/v2/core",
        "release/1.2.0",
        "bugfix/PROJ-42",
        "hotfix/auth",
        "branch-123_test",
        "a",
        "A",
        "Z9",
        "a" * 255,
        "-branch",      # leading dash: allowed (Git allows it; no shell interpolation)
        "branch-",      # trailing dash: allowed
        "feat/--desc",  # double dash in namespace: allowed
    ])
    def test_accepted(self, name: str) -> None:
        """A valid name is returned unchanged by the validator."""
        assert validate_branch_name(name) == name


# ---------------------------------------------------------------------------
# C0 control character (plus space and DEL) injection
# ---------------------------------------------------------------------------


class TestControlCharInjection:
    """All C0 control chars must be rejected to prevent terminal injection.

    ESC (0x1b) is the highest-risk char: a branch named ``main\\x1b[31m``
    would inject ANSI colour sequences into ``for-each-ref --format text``
    output, potentially hiding output, changing terminal colours, or
    triggering OSC 8 hyperlinks in compliant terminal emulators.
    """

    @pytest.mark.parametrize("char,description", [
        ("\x00", "NUL"),
        ("\x01", "SOH"),
        ("\x02", "STX"),
        ("\x03", "ETX"),
        ("\x04", "EOT"),
        ("\x05", "ENQ"),
        ("\x06", "ACK"),
        ("\x07", "BEL"),
        ("\x08", "BS"),
        ("\x09", "HT (tab)"),
        ("\x0a", "LF"),
        ("\x0b", "VT"),
        ("\x0c", "FF"),
        ("\x0d", "CR"),
        ("\x0e", "SO"),
        ("\x0f", "SI"),
        ("\x10", "DLE"),
        ("\x11", "DC1"),
        ("\x12", "DC2"),
        ("\x13", "DC3"),
        ("\x14", "DC4"),
        ("\x15", "NAK"),
        ("\x16", "SYN"),
        ("\x17", "ETB"),
        ("\x18", "CAN"),
        ("\x19", "EM"),
        ("\x1a", "SUB"),
        ("\x1b", "ESC — highest risk, ANSI sequence introducer"),
        ("\x1c", "FS"),
        ("\x1d", "GS"),
        ("\x1e", "RS"),
        ("\x1f", "US"),
        ("\x20", "space (0x20) — shell interpolation / log-parsing hazard"),
        ("\x7f", "DEL"),
    ])
    def test_control_char_rejected(self, char: str, description: str) -> None:
        """Each forbidden char embedded mid-name must be rejected.

        *description* only labels the case for the reader; it is not used.
        """
        with pytest.raises((ValueError, TypeError)):
            validate_branch_name(f"main{char}malicious")

    def test_esc_at_start(self) -> None:
        """ESC as the first character is rejected."""
        with pytest.raises(ValueError):
            validate_branch_name("\x1bmain")

    def test_esc_at_end(self) -> None:
        """ESC as the last character is rejected."""
        with pytest.raises(ValueError):
            validate_branch_name("main\x1b")

    def test_multiple_control_chars(self) -> None:
        """Payloads combining multiple control chars are still rejected."""
        with pytest.raises(ValueError):
            validate_branch_name("feat\x1b[31m/\x07sub")

    def test_space_only(self) -> None:
        """A name consisting of a single space is rejected."""
        with pytest.raises(ValueError):
            validate_branch_name(" ")

    def test_space_in_namespace(self) -> None:
        """A space inside a namespaced component is rejected."""
        with pytest.raises(ValueError):
            validate_branch_name("feat/my branch")


# ---------------------------------------------------------------------------
# Git-banned punctuation
# ---------------------------------------------------------------------------


class TestGitBannedPunctuation:
    """Characters forbidden by git-check-ref-format that Muse now also rejects."""

    @pytest.mark.parametrize("char,description", [
        ("~", "tilde — git ancestry operator"),
        ("^", "caret — git ancestry operator"),
        (":", "colon — refspec separator"),
        ("?", "question mark — glob wildcard"),
        ("*", "asterisk — glob wildcard"),
        ("[", "open bracket — character class in glob"),
    ])
    def test_git_banned_char_in_name(self, char: str, description: str) -> None:
        """Each banned punctuation char embedded mid-name must be rejected.

        *description* only labels the case for the reader; it is not used.
        """
        with pytest.raises(ValueError):
            validate_branch_name(f"feat{char}malicious")

    def test_tilde_suffix(self) -> None:
        """feat~1 looks like a git ancestry ref; must be rejected."""
        with pytest.raises(ValueError):
            validate_branch_name("feat~1")

    def test_colon_refspec(self) -> None:
        """feat:main is a refspec; must be rejected."""
        with pytest.raises(ValueError):
            validate_branch_name("feat:main")

    def test_glob_expansion_star(self) -> None:
        """A ``*`` path component is rejected."""
        with pytest.raises(ValueError):
            validate_branch_name("feat/*")

    def test_glob_expansion_question(self) -> None:
        """A ``?`` inside a component is rejected."""
        with pytest.raises(ValueError):
            validate_branch_name("feat/fo?")

    def test_glob_char_class(self) -> None:
        """A ``[...]`` character class is rejected."""
        with pytest.raises(ValueError):
            validate_branch_name("feat/[abc]")


# ---------------------------------------------------------------------------
# Single-dot path component (inode aliasing)
# ---------------------------------------------------------------------------


class TestSingleDotPathComponent:
    """``feat/./sub`` and ``feat/sub`` resolve to the same inode on disk.

    If both were valid branch names, writing to the first would silently
    overwrite the second's ref file. This is a subtle data-corruption vector
    that requires no privilege escalation.
    """

    def test_dot_slash_dot_slash(self) -> None:
        """feat/./sub — single dot in the middle."""
        with pytest.raises(ValueError):
            validate_branch_name("feat/./sub")

    def test_dot_slash_at_end(self) -> None:
        """feat/. — trailing slash-dot."""
        with pytest.raises(ValueError):
            validate_branch_name("feat/.")

    def test_deep_dot_path(self) -> None:
        """a/b/./c/d — dot buried deep in a hierarchy."""
        with pytest.raises(ValueError):
            validate_branch_name("a/b/./c/d")

    def test_multiple_dots(self) -> None:
        """Two single-dot components in a row."""
        with pytest.raises(ValueError):
            validate_branch_name("a/././b")

    def test_dot_as_entire_name(self) -> None:
        """Bare dot is already rejected by the leading-dot rule."""
        with pytest.raises(ValueError):
            validate_branch_name(".")

    def test_inode_aliasing_proven(self, tmp_path: pathlib.Path) -> None:
        """Demonstrate the attack: /tmp/x/feat/./sub IS the same file as /tmp/x/feat/sub."""
        target = tmp_path / "feat" / "sub"
        target.parent.mkdir()
        target.write_text("ORIGINAL")

        # pathlib collapses "." components when a path is constructed, so a
        # Path-based alias would be the very same path object as *target* and
        # prove nothing. Build the alias as a raw string so the single-dot
        # component really reaches the filesystem.
        alias = os.path.join(str(tmp_path), "feat", ".", "sub")
        assert f"{os.sep}.{os.sep}" in alias

        assert os.path.exists(alias), "alias should exist via filesystem normalisation"
        assert os.stat(target).st_ino == os.stat(alias).st_ino

        with open(alias, "w") as handle:
            handle.write("OVERWRITTEN")
        assert target.read_text() == "OVERWRITTEN"


# ---------------------------------------------------------------------------
# .lock suffix
# ---------------------------------------------------------------------------


class TestLockSuffix:
    """Names ending in .lock on any path component must be rejected.

    The VCS convention reserves ``.lock`` for exclusive-lock files. Allowing
    ``main.lock`` would create ``.muse/refs/heads/main.lock`` — a file that
    tooling scanning the ref directory could mistake for a stale lock or a
    failed atomic write.
    """

    def test_top_level_lock(self) -> None:
        """A top-level name ending in .lock is rejected."""
        with pytest.raises(ValueError):
            validate_branch_name("main.lock")

    def test_namespaced_lock(self) -> None:
        """A namespaced leaf ending in .lock is rejected."""
        with pytest.raises(ValueError):
            validate_branch_name("feat/my-branch.lock")

    def test_lock_as_midpath_component(self) -> None:
        """A non-leaf component ending in .lock is rejected."""
        with pytest.raises(ValueError):
            validate_branch_name("feat/foo.lock/sub")

    def test_lock_prefix_only_is_allowed(self) -> None:
        """A branch named 'lockdown' does not end in .lock; must be allowed."""
        assert validate_branch_name("lockdown") == "lockdown"

    def test_lock_substring_allowed(self) -> None:
        """'lockfix' does not end in .lock; must be allowed."""
        assert validate_branch_name("lockfix") == "lockfix"

    def test_dotlock_exact_name(self) -> None:
        """.lock alone is rejected by the leading-dot rule first."""
        with pytest.raises(ValueError):
            validate_branch_name(".lock")


# ---------------------------------------------------------------------------
# @{ sequence and bare @
# ---------------------------------------------------------------------------


class TestAtBraceSequence:
    """The @{ sequence is git reflog notation; it must be rejected.

    A branch named ``feat/@{0}`` would confuse any tool that parses ``@{}``
    as a reflog reference — including Muse's own future reflog
    implementation.
    """

    def test_at_brace_top_level(self) -> None:
        """``@{`` at the start of the name is rejected."""
        with pytest.raises(ValueError):
            validate_branch_name("@{upstream}")

    def test_at_brace_in_namespace(self) -> None:
        """``@{`` at the start of a nested component is rejected."""
        with pytest.raises(ValueError):
            validate_branch_name("feat/@{0}")

    def test_at_brace_suffix(self) -> None:
        """``@{`` appended to an otherwise valid name is rejected."""
        with pytest.raises(ValueError):
            validate_branch_name("feat@{0}")

    def test_bare_at(self) -> None:
        """Bare @ is git HEAD shorthand; rejected for the same reason."""
        with pytest.raises(ValueError):
            validate_branch_name("@")

    def test_at_in_normal_name_allowed(self) -> None:
        """@ followed by anything other than { is not the forbidden sequence."""
        # e.g. "feat@42" is unusual but not the @{ reflog pattern
        # validate_branch_name should allow it (@ is ASCII printable, not
        # in the C0 or punctuation block).
        result = validate_branch_name("feat@42")
        assert result == "feat@42"


# ---------------------------------------------------------------------------
# Existing rules (regression: they must still work after the regex change)
# ---------------------------------------------------------------------------


class TestExistingRulesRegression:
    """Ensure the new regex does not break pre-existing rejections."""

    def test_backslash(self) -> None:
        with pytest.raises(ValueError):
            validate_branch_name("malicious\\branch")

    def test_null_byte(self) -> None:
        with pytest.raises(ValueError):
            validate_branch_name("branch\x00name")

    def test_carriage_return(self) -> None:
        with pytest.raises(ValueError):
            validate_branch_name("branch\rname")

    def test_linefeed(self) -> None:
        with pytest.raises(ValueError):
            validate_branch_name("branch\nname")

    def test_tab(self) -> None:
        with pytest.raises(ValueError):
            validate_branch_name("branch\tname")

    def test_leading_dot(self) -> None:
        with pytest.raises(ValueError):
            validate_branch_name(".hidden")

    def test_trailing_dot(self) -> None:
        with pytest.raises(ValueError):
            validate_branch_name("branch.")

    def test_consecutive_dots(self) -> None:
        with pytest.raises(ValueError):
            validate_branch_name("branch..name")

    def test_double_slash(self) -> None:
        with pytest.raises(ValueError):
            validate_branch_name("feat//branch")

    def test_leading_slash(self) -> None:
        with pytest.raises(ValueError):
            validate_branch_name("/branch")

    def test_trailing_slash(self) -> None:
        with pytest.raises(ValueError):
            validate_branch_name("branch/")

    def test_empty_string(self) -> None:
        with pytest.raises(ValueError):
            validate_branch_name("")

    def test_too_long(self) -> None:
        """One character over the 255-char name accepted elsewhere in this file."""
        with pytest.raises(ValueError):
            validate_branch_name("a" * 256)

    def test_dotdot_traversal(self) -> None:
        with pytest.raises(ValueError):
            validate_branch_name("../../etc/passwd")

    def test_dotdot_in_namespace(self) -> None:
        with pytest.raises(ValueError):
            validate_branch_name("feat/../main")


# ===========================================================================
# Integration tests — store-level gatekeeping
# ===========================================================================


class TestWriteBranchRefGatekeeping:
    """write_branch_ref validates the branch name before writing any file."""

    def test_traversal_rejected_before_write(self, tmp_path: pathlib.Path) -> None:
        """A ``../..`` name raises and leaves nothing at the traversal target."""
        repo = _make_repo(tmp_path)
        payload = "../../etc/passwd"
        with pytest.raises(ValueError):
            write_branch_ref(repo, payload, _ZERO_OID)

        # Ref files live under heads_dir(repo), so a naive join would land
        # relative to that directory rather than the repo root. Check the
        # location the payload would really resolve to, as well as the
        # repo-root location the test has always checked.
        escaped_target = (heads_dir(repo) / payload).resolve()
        assert not escaped_target.exists()
        assert not (tmp_path / "etc" / "passwd").exists()

    def test_esc_injection_rejected_before_write(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            write_branch_ref(repo, "main\x1b[31m", _ZERO_OID)

    def test_single_dot_component_rejected(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            write_branch_ref(repo, "feat/./sub", _ZERO_OID)

    def test_lock_suffix_rejected(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            write_branch_ref(repo, "main.lock", _ZERO_OID)

    def test_at_brace_rejected(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            write_branch_ref(repo, "feat/@{0}", _ZERO_OID)

    def test_space_in_name_rejected(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            write_branch_ref(repo, "feat branch", _ZERO_OID)

    def test_valid_name_writes_file(self, tmp_path: pathlib.Path) -> None:
        """A valid namespaced name produces a ref file holding the given id."""
        repo = _make_repo(tmp_path)
        write_branch_ref(repo, "feat/ok", _ZERO_OID)
        ref_path = heads_dir(repo) / "feat" / "ok"
        assert ref_path.read_text().strip() == _ZERO_OID

    def test_valid_name_no_file_escape(self, tmp_path: pathlib.Path) -> None:
        """A valid name must not write outside .muse/refs/heads/."""
        repo = _make_repo(tmp_path)
        write_branch_ref(repo, "main", _ZERO_OID)
        ref_path = heads_dir(repo) / "main"
        assert ref_path.exists()
        assert not (repo / "main").exists()


class TestWriteHeadBranchGatekeeping:
    """write_head_branch validates the branch name before writing HEAD."""

    def test_esc_injection_rejected(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            write_head_branch(repo, "main\x1b[31m")

    def test_dotdot_traversal_rejected(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            write_head_branch(repo, "../../etc/passwd")

    def test_valid_name_writes_head(self, tmp_path: pathlib.Path) -> None:
        """A valid name ends up in the HEAD file contents."""
        repo = _make_repo(tmp_path)
        write_head_branch(repo, "feat/ok")
        head = head_path(repo).read_text()
        assert "feat/ok" in head
        assert "../../" not in head


# ===========================================================================
# Integration tests — CLI commands via CliRunner
# ===========================================================================


class TestUpdateRefCLIGatekeeping:
    """muse update-ref rejects injection branch names at the CLI level."""

    def test_dotdot_traversal(self, tmp_path: pathlib.Path) -> None:
        """Traversal exits non-zero and reports an error in the output."""
        _make_repo(tmp_path)
        code, out = _invoke_in_repo(tmp_path, ["update-ref", "../../etc/passwd", _ZERO_OID])
        assert code != 0
        assert "Invalid branch name" in out or "forbidden" in out.lower() or "error" in out.lower()

    def test_esc_injection(self, tmp_path: pathlib.Path) -> None:
        _make_repo(tmp_path)
        code, out = _invoke_in_repo(tmp_path, ["update-ref", "main\x1b[31m", _ZERO_OID])
        assert code != 0

    def test_lock_suffix(self, tmp_path: pathlib.Path) -> None:
        """The command fails and no ``main.lock`` ref file is left behind."""
        _make_repo(tmp_path)
        code, out = _invoke_in_repo(tmp_path, ["update-ref", "main.lock", _ZERO_OID])
        assert code != 0
        assert not (heads_dir(tmp_path) / "main.lock").exists()

    def test_single_dot_component(self, tmp_path: pathlib.Path) -> None:
        _make_repo(tmp_path)
        code, out = _invoke_in_repo(tmp_path, ["update-ref", "feat/./sub", _ZERO_OID])
        assert code != 0
        # The alias must not have silently created feat/sub
        assert not (heads_dir(tmp_path) / "feat" / "sub").exists()

    def test_at_brace(self, tmp_path: pathlib.Path) -> None:
        _make_repo(tmp_path)
        code, out = _invoke_in_repo(tmp_path, ["update-ref", "feat/@{0}", _ZERO_OID])
        assert code != 0

    def test_space_in_name(self, tmp_path: pathlib.Path) -> None:
        _make_repo(tmp_path)
        code, out = _invoke_in_repo(tmp_path, ["update-ref", "feat branch", _ZERO_OID])
        assert code != 0

    def test_tilde(self, tmp_path: pathlib.Path) -> None:
        _make_repo(tmp_path)
        code, out = _invoke_in_repo(tmp_path, ["update-ref", "feat~1", _ZERO_OID])
        assert code != 0


class TestSymbolicRefCLIGatekeeping:
    """muse symbolic-ref --set rejects injection branch names."""

    def test_dotdot_traversal(self, tmp_path: pathlib.Path) -> None:
        _make_repo(tmp_path)
        code, out = _invoke_in_repo(tmp_path, [
            "symbolic-ref", "HEAD", "--set", "../../etc/passwd", "--create-branch",
        ])
        assert code != 0

    def test_esc_injection(self, tmp_path: pathlib.Path) -> None:
        _make_repo(tmp_path)
        code, out = _invoke_in_repo(tmp_path, [
            "symbolic-ref", "HEAD", "--set", "main\x1b[31m", "--create-branch",
        ])
        assert code != 0

    def test_lock_suffix(self, tmp_path: pathlib.Path) -> None:
        _make_repo(tmp_path)
        code, out = _invoke_in_repo(tmp_path, [
            "symbolic-ref", "HEAD", "--set", "main.lock", "--create-branch",
        ])
        assert code != 0

    def test_at_brace(self, tmp_path: pathlib.Path) -> None:
        _make_repo(tmp_path)
        code, out = _invoke_in_repo(tmp_path, [
            "symbolic-ref", "HEAD", "--set", "@{0}", "--create-branch",
        ])
        assert code != 0


class TestCheckRefFormatCLI:
    """muse check-ref-format reflects the full rule set."""

    def _run_check(self, tmp_path: pathlib.Path, name: str) -> tuple[int, _CheckRefFormatResult]:
        """Run ``check-ref-format <name> --json`` in a fresh repo.

        Returns:
            ``(exit_code, data)`` where *data* is the parsed JSON output, or
            an empty mapping when the command printed nothing.
        """
        _make_repo(tmp_path)
        code, out = _invoke_in_repo(tmp_path, ["check-ref-format", name, "--json"])
        raw = out.strip()
        data: _CheckRefFormatResult = json.loads(raw) if raw else {}
        return code, data

    def test_valid_name_passes(self, tmp_path: pathlib.Path) -> None:
        code, data = self._run_check(tmp_path, "feat/ok")
        assert code == 0
        assert data["all_valid"] is True

    def test_dotdot_traversal_fails(self, tmp_path: pathlib.Path) -> None:
        code, data = self._run_check(tmp_path, "../../etc/passwd")
        assert code != 0
        assert data.get("all_valid") is False

    def test_esc_injection_fails(self, tmp_path: pathlib.Path) -> None:
        code, data = self._run_check(tmp_path, "main\x1b[31m")
        assert code != 0
        assert data.get("all_valid") is False

    def test_lock_suffix_fails(self, tmp_path: pathlib.Path) -> None:
        code, data = self._run_check(tmp_path, "main.lock")
        assert code != 0
        assert data.get("all_valid") is False

    def test_single_dot_component_fails(self, tmp_path: pathlib.Path) -> None:
        code, data = self._run_check(tmp_path, "feat/./sub")
        assert code != 0
        assert data.get("all_valid") is False

    def test_at_brace_fails(self, tmp_path: pathlib.Path) -> None:
        code, data = self._run_check(tmp_path, "@{upstream}")
        assert code != 0
        assert data.get("all_valid") is False

    def test_space_fails(self, tmp_path: pathlib.Path) -> None:
        code, data = self._run_check(tmp_path, "feat branch")
        assert code != 0
        assert data.get("all_valid") is False

    def test_tilde_fails(self, tmp_path: pathlib.Path) -> None:
        code, data = self._run_check(tmp_path, "feat~1")
        assert code != 0
        assert data.get("all_valid") is False

    def test_rules_endpoint_lists_new_patterns(self, tmp_path: pathlib.Path) -> None:
        """--rules must mention the new forbidden patterns."""
        _make_repo(tmp_path)
        code, out = _invoke_in_repo(tmp_path, ["check-ref-format", "--rules", "--json"])
        rules = json.loads(out.strip())
        patterns = rules.get("forbidden_patterns", [])
        assert any("lock" in p for p in patterns), "missing .lock rule"
        assert any("dot" in p.lower() and "/" in p for p in patterns), "missing /./rule"
        assert any("@{" in p for p in patterns), "missing @{ rule"


# ===========================================================================
# Concurrency
# / race — validate blocks before any write
# ===========================================================================


class TestConcurrentWriteWithInjectionName:
    """Several threads racing to write invalid branch names: every one must fail."""

    def test_concurrent_traversal_both_fail(self, tmp_path: pathlib.Path) -> None:
        """Each injection payload, written from its own thread, is rejected."""
        import threading

        repo = _make_repo(tmp_path)
        errors: list[str] = []
        successes: list[str] = []

        def try_write(name: str) -> None:
            # Record the outcome instead of raising, so the main thread can
            # inspect every worker's result after joining.
            try:
                write_branch_ref(repo, name, _ZERO_OID)
            except (ValueError, TypeError) as exc:
                errors.append(str(exc))
            else:
                successes.append(name)

        payloads = (
            "../../etc/passwd",
            "feat\x1b[31m",
            "main.lock",
            "feat/./sub",
        )
        workers = [
            threading.Thread(target=try_write, args=(payload,))
            for payload in payloads
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        assert successes == [], f"Expected all writes to fail; successes: {successes}"
        assert len(errors) == len(payloads)

    def test_concurrent_valid_writes_succeed(self, tmp_path: pathlib.Path) -> None:
        """Ensure the fix does not regress valid concurrent writes."""
        import threading

        repo = _make_repo(tmp_path)
        errors: list[str] = []

        def try_write(name: str) -> None:
            # Any exception at all counts as a failure for a valid name.
            try:
                write_branch_ref(repo, name, _ZERO_OID)
            except Exception as exc:
                errors.append(f"{name}: {exc}")

        workers = []
        for i in range(8):
            workers.append(
                threading.Thread(target=try_write, args=(f"feat/branch-{i}",))
            )
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        assert errors == [], f"Valid writes unexpectedly failed: {errors}"


# ===========================================================================
# Fuzzing — randomised injection payloads
# ===========================================================================


class TestFuzzedBranchNames:
    """Randomised payloads: any name containing a forbidden char must be rejected."""

    @pytest.mark.parametrize("seed", range(20))
    def test_random_control_char_payload(self, seed: int) -> None:
        """A seeded random C0 / space / DEL char embedded in a name is rejected."""
        import random

        rng = random.Random(seed)
        # Candidate pool: code points 0x00–0x20 inclusive, then DEL.
        forbidden = [*map(chr, range(0x00, 0x21)), "\x7f"]
        # Draw order matters for reproducibility: the char first, then the bytes.
        char = rng.choice(forbidden)
        random_hex = rng.randbytes(4).hex()
        name = f"feat/{random_hex}{char}suffix"
        with pytest.raises((ValueError, TypeError)):
            validate_branch_name(name)

    @pytest.mark.parametrize("seed", range(10))
    def test_random_git_punct_payload(self, seed: int) -> None:
        """A seeded random git-banned punctuation char in a name is rejected."""
        import random

        rng = random.Random(seed + 100)
        git_banned = ["~", "^", ":", "?", "*", "["]
        # Draw order matters for reproducibility: the char first, then the bytes.
        char = rng.choice(git_banned)
        random_hex = rng.randbytes(3).hex()
        name = f"branch{char}{random_hex}"
        with pytest.raises((ValueError, TypeError)):
            validate_branch_name(name)