test_security_branch_ref_injection.py
python
sha256:c5131d76c6eada02939111fda4aa8e51b0c1456b9983727cfd6be101916de14e
merge: pull local/dev — resolve trivial _EXT_MAP symbol con…
Sonnet 4.6
patch
12 days ago
| 1 | """Phase 2.6 — Branch and ref injection security tests. |
| 2 | |
| 3 | Attack surface |
| 4 | -------------- |
| 5 | Branch names are user-controlled strings that become filesystem paths: |
| 6 | .muse/refs/heads/<branch> |
| 7 | |
| 8 | A permissive validator allows an attacker to: |
| 9 | 1. Escape the ref store via path traversal (../../etc/cron.d/pwned). |
| 10 | 2. Inject terminal-escape sequences into for-each-ref text output via ESC or |
| 11 | other C0 control characters in the branch name. |
| 12 | 3. Create phantom branch aliases: ``feat/./sub`` resolves to the same inode |
| 13 | as ``feat/sub`` on every POSIX filesystem, so two names share one file. |
| 14 | 4. Produce .lock-suffixed files that look like stale atomic-write temp files |
| 15 | to any tooling scanning the ref directory. |
| 16 | 5. Inject git reflog notation (``@{``) into pipeline outputs, confusing |
| 17 | downstream parsers. |
| 18 | 6. Smuggle glob metacharacters that expand unexpectedly if branch names are |
| 19 | ever used in a glob pattern. |
| 20 | |
| 21 | Fixes |
| 22 | ----- |
| 23 | ``_BRANCH_FORBIDDEN_RE`` in ``muse.core.validation`` was extended to block: |
| 24 | - All C0 control chars (0x00–0x1F), space (0x20), DEL (0x7F). |
| 25 | - Git-banned punctuation: ``~``, ``^``, ``:``, ``?``, ``*``, ``[``. |
| 26 | - Single-dot path component (``/./``). |
| 27 | - Any path component ending in ``.lock``. |
| 28 | - The ``@{`` sequence and the bare ``@`` string. |
| 29 | |
| 30 | All ref-writing commands (``update-ref``, ``symbolic-ref``, |
| 31 | ``branch``) call ``validate_branch_name`` before any filesystem operation, |
| 32 | so these fixes propagate automatically to every write path. |
| 33 | """ |
| 34 | |
| 35 | from __future__ import annotations |
| 36 | from collections.abc import Mapping |
| 37 | |
| 38 | import json |
| 39 | import os |
| 40 | import pathlib |
| 41 | from typing import TypedDict |
| 42 | |
| 43 | import pytest |
| 44 | |
| 45 | from muse.core.validation import validate_branch_name |
| 46 | from muse.core.refs import write_branch_ref, write_head_branch |
| 47 | from tests.cli_test_helper import CliRunner |
| 48 | from muse.core.types import NULL_LONG_ID, long_id |
| 49 | from muse.core.paths import commits_dir, head_path, heads_dir, objects_dir, repo_json_path, snapshots_dir |
| 50 | |
| 51 | |
| 52 | class _CheckRefFormatResult(TypedDict, total=False): |
| 53 | """Shape of muse check-ref-format --json output.""" |
| 54 | all_valid: bool |
| 55 | valid_count: int |
| 56 | invalid_count: int |
| 57 | results: list[Mapping[str, str | bool | None]] |
| 58 | max_length: int |
| 59 | forbidden_chars: list[str] |
| 60 | forbidden_patterns: list[str] |
| 61 | notes: str |
| 62 | |
| 63 | cli = None # argparse migration — CliRunner ignores this |
| 64 | runner = CliRunner() |
| 65 | |
| 66 | |
| 67 | # --------------------------------------------------------------------------- |
| 68 | # Helpers |
| 69 | # --------------------------------------------------------------------------- |
| 70 | |
| 71 | def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: |
| 72 | """Create a minimal .muse repo skeleton for integration tests.""" |
| 73 | (heads_dir(tmp_path)).mkdir(parents=True) |
| 74 | (commits_dir(tmp_path)).mkdir(parents=True) |
| 75 | (snapshots_dir(tmp_path)).mkdir(parents=True) |
| 76 | (objects_dir(tmp_path)).mkdir(parents=True) |
| 77 | (head_path(tmp_path)).write_text("ref: refs/heads/main\n") |
| 78 | (repo_json_path(tmp_path)).write_text( |
| 79 | '{"repo_id": "test-repo", "name": "test"}' |
| 80 | ) |
| 81 | return tmp_path |
| 82 | |
| 83 | |
| 84 | def _invoke_in_repo(tmp_path: pathlib.Path, args: list[str]) -> tuple[int, str]: |
| 85 | """Invoke the muse CLI inside *tmp_path* (which must contain a .muse dir).""" |
| 86 | old_cwd = os.getcwd() |
| 87 | try: |
| 88 | os.chdir(tmp_path) |
| 89 | result = runner.invoke(cli, args) |
| 90 | return result.exit_code, result.output + result.stderr |
| 91 | finally: |
| 92 | os.chdir(old_cwd) |
| 93 | |
| 94 | |
| 95 | _ZERO_OID = NULL_LONG_ID |
| 96 | |
| 97 | |
| 98 | # =========================================================================== |
| 99 | # Unit tests — validate_branch_name |
| 100 | # =========================================================================== |
| 101 | |
| 102 | |
| 103 | class TestValidBranchNames: |
| 104 | """Names that must be accepted.""" |
| 105 | |
| 106 | @pytest.mark.parametrize("name", [ |
| 107 | "main", |
| 108 | "dev", |
| 109 | "feature/my-branch", |
| 110 | "fix/auth-token-exposure", |
| 111 | "feat/v2/core", |
| 112 | "release/1.2.0", |
| 113 | "bugfix/PROJ-42", |
| 114 | "hotfix/auth", |
| 115 | "branch-123_test", |
| 116 | "a", |
| 117 | "A", |
| 118 | "Z9", |
| 119 | "a" * 255, |
| 120 | "-branch", # leading dash: allowed (Git allows it; no shell interpolation) |
| 121 | "branch-", # trailing dash: allowed |
| 122 | "feat/--desc", # double dash in namespace: allowed |
| 123 | ]) |
| 124 | def test_accepted(self, name: str) -> None: |
| 125 | assert validate_branch_name(name) == name |
| 126 | |
| 127 | |
| 128 | # --------------------------------------------------------------------------- |
| 129 | # C0/C1 control character injection |
| 130 | # --------------------------------------------------------------------------- |
| 131 | |
| 132 | |
| 133 | class TestControlCharInjection: |
| 134 | """All C0 control chars must be rejected to prevent terminal injection. |
| 135 | |
| 136 | ESC (0x1b) is the highest-risk char: a branch named ``main\x1b[31m`` |
| 137 | would inject ANSI colour sequences into ``for-each-ref --format text`` |
| 138 | output, potentially hiding output, changing terminal colours, or |
| 139 | triggering OSC 8 hyperlinks in compliant terminal emulators. |
| 140 | """ |
| 141 | |
| 142 | @pytest.mark.parametrize("char,description", [ |
| 143 | ("\x00", "NUL"), |
| 144 | ("\x01", "SOH"), |
| 145 | ("\x02", "STX"), |
| 146 | ("\x03", "ETX"), |
| 147 | ("\x04", "EOT"), |
| 148 | ("\x05", "ENQ"), |
| 149 | ("\x06", "ACK"), |
| 150 | ("\x07", "BEL"), |
| 151 | ("\x08", "BS"), |
| 152 | ("\x09", "HT (tab)"), |
| 153 | ("\x0a", "LF"), |
| 154 | ("\x0b", "VT"), |
| 155 | ("\x0c", "FF"), |
| 156 | ("\x0d", "CR"), |
| 157 | ("\x0e", "SO"), |
| 158 | ("\x0f", "SI"), |
| 159 | ("\x10", "DLE"), |
| 160 | ("\x11", "DC1"), |
| 161 | ("\x12", "DC2"), |
| 162 | ("\x13", "DC3"), |
| 163 | ("\x14", "DC4"), |
| 164 | ("\x15", "NAK"), |
| 165 | ("\x16", "SYN"), |
| 166 | ("\x17", "ETB"), |
| 167 | ("\x18", "CAN"), |
| 168 | ("\x19", "EM"), |
| 169 | ("\x1a", "SUB"), |
| 170 | ("\x1b", "ESC — highest risk, ANSI sequence introducer"), |
| 171 | ("\x1c", "FS"), |
| 172 | ("\x1d", "GS"), |
| 173 | ("\x1e", "RS"), |
| 174 | ("\x1f", "US"), |
| 175 | ("\x20", "space (0x20) — shell interpolation / log-parsing hazard"), |
| 176 | ("\x7f", "DEL"), |
| 177 | ]) |
| 178 | def test_control_char_rejected(self, char: str, description: str) -> None: |
| 179 | with pytest.raises((ValueError, TypeError)): |
| 180 | validate_branch_name(f"main{char}malicious") |
| 181 | |
| 182 | def test_esc_at_start(self) -> None: |
| 183 | with pytest.raises(ValueError): |
| 184 | validate_branch_name("\x1bmain") |
| 185 | |
| 186 | def test_esc_at_end(self) -> None: |
| 187 | with pytest.raises(ValueError): |
| 188 | validate_branch_name("main\x1b") |
| 189 | |
| 190 | def test_multiple_control_chars(self) -> None: |
| 191 | """Payloads combining multiple control chars are still rejected.""" |
| 192 | with pytest.raises(ValueError): |
| 193 | validate_branch_name("feat\x1b[31m/\x07sub") |
| 194 | |
| 195 | def test_space_only(self) -> None: |
| 196 | with pytest.raises(ValueError): |
| 197 | validate_branch_name(" ") |
| 198 | |
| 199 | def test_space_in_namespace(self) -> None: |
| 200 | with pytest.raises(ValueError): |
| 201 | validate_branch_name("feat/my branch") |
| 202 | |
| 203 | |
| 204 | # --------------------------------------------------------------------------- |
| 205 | # Git-banned punctuation |
| 206 | # --------------------------------------------------------------------------- |
| 207 | |
| 208 | |
| 209 | class TestGitBannedPunctuation: |
| 210 | """Characters forbidden by git-check-ref-format that Muse now also rejects.""" |
| 211 | |
| 212 | @pytest.mark.parametrize("char,description", [ |
| 213 | ("~", "tilde — git ancestry operator"), |
| 214 | ("^", "caret — git ancestry operator"), |
| 215 | (":", "colon — refspec separator"), |
| 216 | ("?", "question mark — glob wildcard"), |
| 217 | ("*", "asterisk — glob wildcard"), |
| 218 | ("[", "open bracket — character class in glob"), |
| 219 | ]) |
| 220 | def test_git_banned_char_in_name(self, char: str, description: str) -> None: |
| 221 | with pytest.raises(ValueError): |
| 222 | validate_branch_name(f"feat{char}malicious") |
| 223 | |
| 224 | def test_tilde_suffix(self) -> None: |
| 225 | """feat~1 looks like a git ancestry ref; must be rejected.""" |
| 226 | with pytest.raises(ValueError): |
| 227 | validate_branch_name("feat~1") |
| 228 | |
| 229 | def test_colon_refspec(self) -> None: |
| 230 | """feat:main is a refspec; must be rejected.""" |
| 231 | with pytest.raises(ValueError): |
| 232 | validate_branch_name("feat:main") |
| 233 | |
| 234 | def test_glob_expansion_star(self) -> None: |
| 235 | with pytest.raises(ValueError): |
| 236 | validate_branch_name("feat/*") |
| 237 | |
| 238 | def test_glob_expansion_question(self) -> None: |
| 239 | with pytest.raises(ValueError): |
| 240 | validate_branch_name("feat/fo?") |
| 241 | |
| 242 | def test_glob_char_class(self) -> None: |
| 243 | with pytest.raises(ValueError): |
| 244 | validate_branch_name("feat/[abc]") |
| 245 | |
| 246 | |
| 247 | # --------------------------------------------------------------------------- |
| 248 | # Single-dot path component (inode aliasing) |
| 249 | # --------------------------------------------------------------------------- |
| 250 | |
| 251 | |
| 252 | class TestSingleDotPathComponent: |
| 253 | """``feat/./sub`` and ``feat/sub`` resolve to the same inode on disk. |
| 254 | |
| 255 | If both were valid branch names, writing to the first would silently |
| 256 | overwrite the second's ref file. This is a subtle data-corruption vector |
| 257 | that requires no privilege escalation. |
| 258 | """ |
| 259 | |
| 260 | def test_dot_slash_dot_slash(self) -> None: |
| 261 | """feat/./sub — single dot in the middle.""" |
| 262 | with pytest.raises(ValueError): |
| 263 | validate_branch_name("feat/./sub") |
| 264 | |
| 265 | def test_dot_slash_at_end(self) -> None: |
| 266 | """feat/. — trailing slash-dot.""" |
| 267 | with pytest.raises(ValueError): |
| 268 | validate_branch_name("feat/.") |
| 269 | |
| 270 | def test_deep_dot_path(self) -> None: |
| 271 | """a/b/./c/d — dot buried deep in a hierarchy.""" |
| 272 | with pytest.raises(ValueError): |
| 273 | validate_branch_name("a/b/./c/d") |
| 274 | |
| 275 | def test_multiple_dots(self) -> None: |
| 276 | """Two single-dot components in a row.""" |
| 277 | with pytest.raises(ValueError): |
| 278 | validate_branch_name("a/././b") |
| 279 | |
| 280 | def test_dot_as_entire_name(self) -> None: |
| 281 | """Bare dot is already rejected by the leading-dot rule.""" |
| 282 | with pytest.raises(ValueError): |
| 283 | validate_branch_name(".") |
| 284 | |
| 285 | def test_inode_aliasing_proven(self, tmp_path: pathlib.Path) -> None: |
| 286 | """Demonstrate the attack: /tmp/x/feat/./sub IS the same file as /tmp/x/feat/sub.""" |
| 287 | import os |
| 288 | (tmp_path / "feat").mkdir() |
| 289 | (tmp_path / "feat" / "sub").write_text("ORIGINAL") |
| 290 | alias = tmp_path / "feat" / "." / "sub" |
| 291 | assert alias.exists(), "alias should exist via filesystem normalisation" |
| 292 | assert os.stat(tmp_path / "feat" / "sub").st_ino == os.stat(alias).st_ino |
| 293 | alias.write_text("OVERWRITTEN") |
| 294 | assert (tmp_path / "feat" / "sub").read_text() == "OVERWRITTEN" |
| 295 | |
| 296 | |
| 297 | # --------------------------------------------------------------------------- |
| 298 | # .lock suffix |
| 299 | # --------------------------------------------------------------------------- |
| 300 | |
| 301 | |
| 302 | class TestLockSuffix: |
| 303 | """Names ending in .lock on any path component must be rejected. |
| 304 | |
| 305 | The VCS convention reserves ``.lock`` for exclusive-lock files. Allowing |
| 306 | ``main.lock`` would create ``.muse/refs/heads/main.lock`` — a file that |
| 307 | tooling scanning the ref directory could mistake for a stale lock or a |
| 308 | failed atomic write. |
| 309 | """ |
| 310 | |
| 311 | def test_top_level_lock(self) -> None: |
| 312 | with pytest.raises(ValueError): |
| 313 | validate_branch_name("main.lock") |
| 314 | |
| 315 | def test_namespaced_lock(self) -> None: |
| 316 | with pytest.raises(ValueError): |
| 317 | validate_branch_name("feat/my-branch.lock") |
| 318 | |
| 319 | def test_lock_as_midpath_component(self) -> None: |
| 320 | with pytest.raises(ValueError): |
| 321 | validate_branch_name("feat/foo.lock/sub") |
| 322 | |
| 323 | def test_lock_prefix_only_is_allowed(self) -> None: |
| 324 | """A branch named 'lockdown' does not end in .lock; must be allowed.""" |
| 325 | assert validate_branch_name("lockdown") == "lockdown" |
| 326 | |
| 327 | def test_lock_substring_allowed(self) -> None: |
| 328 | """'lockfix' does not end in .lock; must be allowed.""" |
| 329 | assert validate_branch_name("lockfix") == "lockfix" |
| 330 | |
| 331 | def test_dotlock_exact_name(self) -> None: |
| 332 | """.lock alone is rejected by the leading-dot rule first.""" |
| 333 | with pytest.raises(ValueError): |
| 334 | validate_branch_name(".lock") |
| 335 | |
| 336 | |
| 337 | # --------------------------------------------------------------------------- |
| 338 | # @{ sequence and bare @ |
| 339 | # --------------------------------------------------------------------------- |
| 340 | |
| 341 | |
| 342 | class TestAtBraceSequence: |
| 343 | """The @{ sequence is git reflog notation; it must be rejected. |
| 344 | |
| 345 | A branch named ``feat/@{0}`` would confuse any tool that parses |
| 346 | ``<branch>@{<n>}`` as a reflog reference — including Muse's own future |
| 347 | reflog implementation. |
| 348 | """ |
| 349 | |
| 350 | def test_at_brace_top_level(self) -> None: |
| 351 | with pytest.raises(ValueError): |
| 352 | validate_branch_name("@{upstream}") |
| 353 | |
| 354 | def test_at_brace_in_namespace(self) -> None: |
| 355 | with pytest.raises(ValueError): |
| 356 | validate_branch_name("feat/@{0}") |
| 357 | |
| 358 | def test_at_brace_suffix(self) -> None: |
| 359 | with pytest.raises(ValueError): |
| 360 | validate_branch_name("feat@{0}") |
| 361 | |
| 362 | def test_bare_at(self) -> None: |
| 363 | """Bare @ is git HEAD shorthand; rejected for the same reason.""" |
| 364 | with pytest.raises(ValueError): |
| 365 | validate_branch_name("@") |
| 366 | |
| 367 | def test_at_in_normal_name_allowed(self) -> None: |
| 368 | """@ followed by anything other than { is not the forbidden sequence.""" |
| 369 | # e.g. "feat@42" is unusual but not the @{ reflog pattern |
| 370 | # validate_branch_name should allow it (@ is ASCII printable, not |
| 371 | # in the C0 or punctuation block). |
| 372 | result = validate_branch_name("feat@42") |
| 373 | assert result == "feat@42" |
| 374 | |
| 375 | |
| 376 | # --------------------------------------------------------------------------- |
| 377 | # Existing rules (regression: they must still work after the regex change) |
| 378 | # --------------------------------------------------------------------------- |
| 379 | |
| 380 | |
| 381 | class TestExistingRulesRegression: |
| 382 | """Ensure the new regex does not break pre-existing rejections.""" |
| 383 | |
| 384 | def test_backslash(self) -> None: |
| 385 | with pytest.raises(ValueError): |
| 386 | validate_branch_name("malicious\\branch") |
| 387 | |
| 388 | def test_null_byte(self) -> None: |
| 389 | with pytest.raises(ValueError): |
| 390 | validate_branch_name("branch\x00name") |
| 391 | |
| 392 | def test_carriage_return(self) -> None: |
| 393 | with pytest.raises(ValueError): |
| 394 | validate_branch_name("branch\rname") |
| 395 | |
| 396 | def test_linefeed(self) -> None: |
| 397 | with pytest.raises(ValueError): |
| 398 | validate_branch_name("branch\nname") |
| 399 | |
| 400 | def test_tab(self) -> None: |
| 401 | with pytest.raises(ValueError): |
| 402 | validate_branch_name("branch\tname") |
| 403 | |
| 404 | def test_leading_dot(self) -> None: |
| 405 | with pytest.raises(ValueError): |
| 406 | validate_branch_name(".hidden") |
| 407 | |
| 408 | def test_trailing_dot(self) -> None: |
| 409 | with pytest.raises(ValueError): |
| 410 | validate_branch_name("branch.") |
| 411 | |
| 412 | def test_consecutive_dots(self) -> None: |
| 413 | with pytest.raises(ValueError): |
| 414 | validate_branch_name("branch..name") |
| 415 | |
| 416 | def test_double_slash(self) -> None: |
| 417 | with pytest.raises(ValueError): |
| 418 | validate_branch_name("feat//branch") |
| 419 | |
| 420 | def test_leading_slash(self) -> None: |
| 421 | with pytest.raises(ValueError): |
| 422 | validate_branch_name("/branch") |
| 423 | |
| 424 | def test_trailing_slash(self) -> None: |
| 425 | with pytest.raises(ValueError): |
| 426 | validate_branch_name("branch/") |
| 427 | |
| 428 | def test_empty_string(self) -> None: |
| 429 | with pytest.raises(ValueError): |
| 430 | validate_branch_name("") |
| 431 | |
| 432 | def test_too_long(self) -> None: |
| 433 | with pytest.raises(ValueError): |
| 434 | validate_branch_name("a" * 256) |
| 435 | |
| 436 | def test_dotdot_traversal(self) -> None: |
| 437 | with pytest.raises(ValueError): |
| 438 | validate_branch_name("../../etc/passwd") |
| 439 | |
| 440 | def test_dotdot_in_namespace(self) -> None: |
| 441 | with pytest.raises(ValueError): |
| 442 | validate_branch_name("feat/../main") |
| 443 | |
| 444 | |
| 445 | # =========================================================================== |
| 446 | # Integration tests — store-level gatekeeping |
| 447 | # =========================================================================== |
| 448 | |
| 449 | |
| 450 | class TestWriteBranchRefGatekeeping: |
| 451 | """write_branch_ref validates the branch name before writing any file.""" |
| 452 | |
| 453 | def test_traversal_rejected_before_write(self, tmp_path: pathlib.Path) -> None: |
| 454 | repo = _make_repo(tmp_path) |
| 455 | with pytest.raises(ValueError): |
| 456 | write_branch_ref(repo, "../../etc/passwd", _ZERO_OID) |
| 457 | assert not (tmp_path / "etc" / "passwd").exists() |
| 458 | |
| 459 | def test_esc_injection_rejected_before_write(self, tmp_path: pathlib.Path) -> None: |
| 460 | repo = _make_repo(tmp_path) |
| 461 | with pytest.raises(ValueError): |
| 462 | write_branch_ref(repo, "main\x1b[31m", _ZERO_OID) |
| 463 | |
| 464 | def test_single_dot_component_rejected(self, tmp_path: pathlib.Path) -> None: |
| 465 | repo = _make_repo(tmp_path) |
| 466 | with pytest.raises(ValueError): |
| 467 | write_branch_ref(repo, "feat/./sub", _ZERO_OID) |
| 468 | |
| 469 | def test_lock_suffix_rejected(self, tmp_path: pathlib.Path) -> None: |
| 470 | repo = _make_repo(tmp_path) |
| 471 | with pytest.raises(ValueError): |
| 472 | write_branch_ref(repo, "main.lock", _ZERO_OID) |
| 473 | |
| 474 | def test_at_brace_rejected(self, tmp_path: pathlib.Path) -> None: |
| 475 | repo = _make_repo(tmp_path) |
| 476 | with pytest.raises(ValueError): |
| 477 | write_branch_ref(repo, "feat/@{0}", _ZERO_OID) |
| 478 | |
| 479 | def test_space_in_name_rejected(self, tmp_path: pathlib.Path) -> None: |
| 480 | repo = _make_repo(tmp_path) |
| 481 | with pytest.raises(ValueError): |
| 482 | write_branch_ref(repo, "feat branch", _ZERO_OID) |
| 483 | |
| 484 | def test_valid_name_writes_file(self, tmp_path: pathlib.Path) -> None: |
| 485 | repo = _make_repo(tmp_path) |
| 486 | write_branch_ref(repo, "feat/ok", _ZERO_OID) |
| 487 | ref_path = heads_dir(repo) / "feat" / "ok" |
| 488 | assert ref_path.read_text().strip() == _ZERO_OID |
| 489 | |
| 490 | def test_valid_name_no_file_escape(self, tmp_path: pathlib.Path) -> None: |
| 491 | """A valid name must not write outside .muse/refs/heads/.""" |
| 492 | repo = _make_repo(tmp_path) |
| 493 | write_branch_ref(repo, "main", _ZERO_OID) |
| 494 | ref_path = heads_dir(repo) / "main" |
| 495 | assert ref_path.exists() |
| 496 | assert not (repo / "main").exists() |
| 497 | |
| 498 | |
| 499 | class TestWriteHeadBranchGatekeeping: |
| 500 | """write_head_branch validates the branch name before writing HEAD.""" |
| 501 | |
| 502 | def test_esc_injection_rejected(self, tmp_path: pathlib.Path) -> None: |
| 503 | repo = _make_repo(tmp_path) |
| 504 | with pytest.raises(ValueError): |
| 505 | write_head_branch(repo, "main\x1b[31m") |
| 506 | |
| 507 | def test_dotdot_traversal_rejected(self, tmp_path: pathlib.Path) -> None: |
| 508 | repo = _make_repo(tmp_path) |
| 509 | with pytest.raises(ValueError): |
| 510 | write_head_branch(repo, "../../etc/passwd") |
| 511 | |
| 512 | def test_valid_name_writes_head(self, tmp_path: pathlib.Path) -> None: |
| 513 | repo = _make_repo(tmp_path) |
| 514 | write_head_branch(repo, "feat/ok") |
| 515 | head = (head_path(repo)).read_text() |
| 516 | assert "feat/ok" in head |
| 517 | assert "../../" not in head |
| 518 | |
| 519 | |
| 520 | # =========================================================================== |
| 521 | # Integration tests — CLI commands via CliRunner |
| 522 | # =========================================================================== |
| 523 | |
| 524 | |
| 525 | class TestUpdateRefCLIGatekeeping: |
| 526 | """muse update-ref rejects injection branch names at the CLI level.""" |
| 527 | |
| 528 | def test_dotdot_traversal(self, tmp_path: pathlib.Path) -> None: |
| 529 | _make_repo(tmp_path) |
| 530 | code, out = _invoke_in_repo(tmp_path, ["update-ref", "../../etc/passwd", _ZERO_OID]) |
| 531 | assert code != 0 |
| 532 | assert "Invalid branch name" in out or "forbidden" in out.lower() or "error" in out.lower() |
| 533 | |
| 534 | def test_esc_injection(self, tmp_path: pathlib.Path) -> None: |
| 535 | _make_repo(tmp_path) |
| 536 | code, out = _invoke_in_repo(tmp_path, ["update-ref", "main\x1b[31m", _ZERO_OID]) |
| 537 | assert code != 0 |
| 538 | |
| 539 | def test_lock_suffix(self, tmp_path: pathlib.Path) -> None: |
| 540 | _make_repo(tmp_path) |
| 541 | code, out = _invoke_in_repo(tmp_path, ["update-ref", "main.lock", _ZERO_OID]) |
| 542 | assert code != 0 |
| 543 | assert not (heads_dir(tmp_path) / "main.lock").exists() |
| 544 | |
| 545 | def test_single_dot_component(self, tmp_path: pathlib.Path) -> None: |
| 546 | _make_repo(tmp_path) |
| 547 | code, out = _invoke_in_repo(tmp_path, ["update-ref", "feat/./sub", _ZERO_OID]) |
| 548 | assert code != 0 |
| 549 | # The alias must not have silently created feat/sub |
| 550 | assert not (heads_dir(tmp_path) / "feat" / "sub").exists() |
| 551 | |
| 552 | def test_at_brace(self, tmp_path: pathlib.Path) -> None: |
| 553 | _make_repo(tmp_path) |
| 554 | code, out = _invoke_in_repo(tmp_path, ["update-ref", "feat/@{0}", _ZERO_OID]) |
| 555 | assert code != 0 |
| 556 | |
| 557 | def test_space_in_name(self, tmp_path: pathlib.Path) -> None: |
| 558 | _make_repo(tmp_path) |
| 559 | code, out = _invoke_in_repo(tmp_path, ["update-ref", "feat branch", _ZERO_OID]) |
| 560 | assert code != 0 |
| 561 | |
| 562 | def test_tilde(self, tmp_path: pathlib.Path) -> None: |
| 563 | _make_repo(tmp_path) |
| 564 | code, out = _invoke_in_repo(tmp_path, ["update-ref", "feat~1", _ZERO_OID]) |
| 565 | assert code != 0 |
| 566 | |
| 567 | |
| 568 | class TestSymbolicRefCLIGatekeeping: |
| 569 | """muse symbolic-ref --set rejects injection branch names.""" |
| 570 | |
| 571 | def test_dotdot_traversal(self, tmp_path: pathlib.Path) -> None: |
| 572 | _make_repo(tmp_path) |
| 573 | code, out = _invoke_in_repo(tmp_path, [ |
| 574 | "symbolic-ref", "HEAD", |
| 575 | "--set", "../../etc/passwd", "--create-branch", |
| 576 | ]) |
| 577 | assert code != 0 |
| 578 | |
| 579 | def test_esc_injection(self, tmp_path: pathlib.Path) -> None: |
| 580 | _make_repo(tmp_path) |
| 581 | code, out = _invoke_in_repo(tmp_path, [ |
| 582 | "symbolic-ref", "HEAD", |
| 583 | "--set", "main\x1b[31m", "--create-branch", |
| 584 | ]) |
| 585 | assert code != 0 |
| 586 | |
| 587 | def test_lock_suffix(self, tmp_path: pathlib.Path) -> None: |
| 588 | _make_repo(tmp_path) |
| 589 | code, out = _invoke_in_repo(tmp_path, [ |
| 590 | "symbolic-ref", "HEAD", |
| 591 | "--set", "main.lock", "--create-branch", |
| 592 | ]) |
| 593 | assert code != 0 |
| 594 | |
| 595 | def test_at_brace(self, tmp_path: pathlib.Path) -> None: |
| 596 | _make_repo(tmp_path) |
| 597 | code, out = _invoke_in_repo(tmp_path, [ |
| 598 | "symbolic-ref", "HEAD", |
| 599 | "--set", "@{0}", "--create-branch", |
| 600 | ]) |
| 601 | assert code != 0 |
| 602 | |
| 603 | |
| 604 | class TestCheckRefFormatCLI: |
| 605 | """muse check-ref-format reflects the full rule set.""" |
| 606 | |
| 607 | def _run_check(self, tmp_path: pathlib.Path, name: str) -> tuple[int, _CheckRefFormatResult]: |
| 608 | _make_repo(tmp_path) |
| 609 | code, out = _invoke_in_repo(tmp_path, ["check-ref-format", name, "--json"]) |
| 610 | raw = out.strip() |
| 611 | data: _CheckRefFormatResult = json.loads(raw) if raw else {} |
| 612 | return code, data |
| 613 | |
| 614 | def test_valid_name_passes(self, tmp_path: pathlib.Path) -> None: |
| 615 | code, data = self._run_check(tmp_path, "feat/ok") |
| 616 | assert code == 0 |
| 617 | assert data["all_valid"] is True |
| 618 | |
| 619 | def test_dotdot_traversal_fails(self, tmp_path: pathlib.Path) -> None: |
| 620 | code, data = self._run_check(tmp_path, "../../etc/passwd") |
| 621 | assert code != 0 |
| 622 | assert data.get("all_valid") is False |
| 623 | |
| 624 | def test_esc_injection_fails(self, tmp_path: pathlib.Path) -> None: |
| 625 | code, data = self._run_check(tmp_path, "main\x1b[31m") |
| 626 | assert code != 0 |
| 627 | assert data.get("all_valid") is False |
| 628 | |
| 629 | def test_lock_suffix_fails(self, tmp_path: pathlib.Path) -> None: |
| 630 | code, data = self._run_check(tmp_path, "main.lock") |
| 631 | assert code != 0 |
| 632 | assert data.get("all_valid") is False |
| 633 | |
| 634 | def test_single_dot_component_fails(self, tmp_path: pathlib.Path) -> None: |
| 635 | code, data = self._run_check(tmp_path, "feat/./sub") |
| 636 | assert code != 0 |
| 637 | assert data.get("all_valid") is False |
| 638 | |
| 639 | def test_at_brace_fails(self, tmp_path: pathlib.Path) -> None: |
| 640 | code, data = self._run_check(tmp_path, "@{upstream}") |
| 641 | assert code != 0 |
| 642 | assert data.get("all_valid") is False |
| 643 | |
| 644 | def test_space_fails(self, tmp_path: pathlib.Path) -> None: |
| 645 | code, data = self._run_check(tmp_path, "feat branch") |
| 646 | assert code != 0 |
| 647 | assert data.get("all_valid") is False |
| 648 | |
| 649 | def test_tilde_fails(self, tmp_path: pathlib.Path) -> None: |
| 650 | code, data = self._run_check(tmp_path, "feat~1") |
| 651 | assert code != 0 |
| 652 | assert data.get("all_valid") is False |
| 653 | |
| 654 | def test_rules_endpoint_lists_new_patterns(self, tmp_path: pathlib.Path) -> None: |
| 655 | """--rules must mention the new forbidden patterns.""" |
| 656 | _make_repo(tmp_path) |
| 657 | code, out = _invoke_in_repo(tmp_path, ["check-ref-format", "--rules", "--json"]) |
| 658 | rules = json.loads(out.strip()) |
| 659 | patterns = rules.get("forbidden_patterns", []) |
| 660 | assert any("lock" in p for p in patterns), "missing .lock rule" |
| 661 | assert any("dot" in p.lower() and "/" in p for p in patterns), "missing /./rule" |
| 662 | assert any("@{" in p for p in patterns), "missing @{ rule" |
| 663 | |
| 664 | |
| 665 | # =========================================================================== |
| 666 | # Concurrency / race — validate blocks before any write |
| 667 | # =========================================================================== |
| 668 | |
| 669 | |
| 670 | class TestConcurrentWriteWithInjectionName: |
| 671 | """Two threads racing to write a traversal branch name: both must fail.""" |
| 672 | |
| 673 | def test_concurrent_traversal_both_fail(self, tmp_path: pathlib.Path) -> None: |
| 674 | import threading |
| 675 | |
| 676 | repo = _make_repo(tmp_path) |
| 677 | errors: list[str] = [] |
| 678 | successes: list[str] = [] |
| 679 | |
| 680 | def try_write(name: str) -> None: |
| 681 | try: |
| 682 | write_branch_ref(repo, name, _ZERO_OID) |
| 683 | successes.append(name) |
| 684 | except (ValueError, TypeError) as exc: |
| 685 | errors.append(str(exc)) |
| 686 | |
| 687 | threads = [ |
| 688 | threading.Thread(target=try_write, args=("../../etc/passwd",)), |
| 689 | threading.Thread(target=try_write, args=("feat\x1b[31m",)), |
| 690 | threading.Thread(target=try_write, args=("main.lock",)), |
| 691 | threading.Thread(target=try_write, args=("feat/./sub",)), |
| 692 | ] |
| 693 | for t in threads: |
| 694 | t.start() |
| 695 | for t in threads: |
| 696 | t.join() |
| 697 | |
| 698 | assert successes == [], f"Expected all writes to fail; successes: {successes}" |
| 699 | assert len(errors) == 4 |
| 700 | |
| 701 | def test_concurrent_valid_writes_succeed(self, tmp_path: pathlib.Path) -> None: |
| 702 | """Ensure the fix does not regress valid concurrent writes.""" |
| 703 | import threading |
| 704 | |
| 705 | repo = _make_repo(tmp_path) |
| 706 | errors: list[str] = [] |
| 707 | |
| 708 | def try_write(name: str) -> None: |
| 709 | try: |
| 710 | write_branch_ref(repo, name, _ZERO_OID) |
| 711 | except Exception as exc: |
| 712 | errors.append(f"{name}: {exc}") |
| 713 | |
| 714 | threads = [ |
| 715 | threading.Thread(target=try_write, args=(f"feat/branch-{i}",)) |
| 716 | for i in range(8) |
| 717 | ] |
| 718 | for t in threads: |
| 719 | t.start() |
| 720 | for t in threads: |
| 721 | t.join() |
| 722 | |
| 723 | assert errors == [], f"Valid writes unexpectedly failed: {errors}" |
| 724 | |
| 725 | |
| 726 | # =========================================================================== |
| 727 | # Fuzzing — randomised injection payloads |
| 728 | # =========================================================================== |
| 729 | |
| 730 | |
| 731 | class TestFuzzedBranchNames: |
| 732 | """Randomised payloads: any name containing a forbidden char must be rejected.""" |
| 733 | |
| 734 | @pytest.mark.parametrize("seed", range(20)) |
| 735 | def test_random_control_char_payload(self, seed: int) -> None: |
| 736 | import random |
| 737 | rng = random.Random(seed) |
| 738 | # Build a name with a random C0 or DEL char embedded |
| 739 | forbidden = [chr(c) for c in range(0x00, 0x21)] + ["\x7f"] |
| 740 | char = rng.choice(forbidden) |
| 741 | name = f"feat/{rng.randbytes(4).hex()}{char}suffix" |
| 742 | with pytest.raises((ValueError, TypeError)): |
| 743 | validate_branch_name(name) |
| 744 | |
| 745 | @pytest.mark.parametrize("seed", range(10)) |
| 746 | def test_random_git_punct_payload(self, seed: int) -> None: |
| 747 | import random |
| 748 | rng = random.Random(seed + 100) |
| 749 | git_banned = list("~^:?*[") |
| 750 | char = rng.choice(git_banned) |
| 751 | name = f"branch{char}{rng.randbytes(3).hex()}" |
| 752 | with pytest.raises((ValueError, TypeError)): |
| 753 | validate_branch_name(name) |
File History
5 commits
sha256:c5131d76c6eada02939111fda4aa8e51b0c1456b9983727cfd6be101916de14e
merge: pull local/dev — resolve trivial _EXT_MAP symbol con…
Sonnet 4.6
patch
12 days ago
sha256:9c33d61749fff814c5226d5386aa2af7064c2c02788594a25fdd709358132eea
fix: _PROPOSAL_PREFIX_RESOLVE_LIMIT 200 → 100 to match hub …
Sonnet 4.6
19 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e
fix: rename objects→blobs in push client and all stale test…
Sonnet 4.6
patch
22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a
fix: repair four test failures from post-migration audit
Sonnet 4.6
patch
28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
28 days ago