"""Security-focused regression tests for all porcelain hardening fixes. These tests verify the specific security improvements made during the porcelain hardening pass: - ReDoS guard in content-grep (pattern length limit) - Zip-slip prevention in archive and snapshot export - validate_branch_name added to checkout and rebase - sanitize_display applied to all user-sourced echoed strings - Atomic shelf writes (no temp file corruption) - Snapshot ID glob prefix sanitisation """ from __future__ import annotations import datetime import json import pathlib import pytest from tests.cli_test_helper import CliRunner from muse.core.types import NULL_COMMIT_ID, long_id, blob_id, fake_id from muse.core.object_store import object_path from muse.core.paths import heads_dir, muse_dir, shelf_dir cli = None # argparse migration — CliRunner ignores this arg runner = CliRunner() # --------------------------------------------------------------------------- # Shared repo setup helper # --------------------------------------------------------------------------- def _env(root: pathlib.Path) -> Manifest: return {"MUSE_REPO_ROOT": str(root)} def _init_repo(tmp_path: pathlib.Path, domain: str = "code") -> tuple[pathlib.Path, str]: dot_muse = muse_dir(tmp_path) dot_muse.mkdir() repo_id = fake_id("repo") (dot_muse / "repo.json").write_text(json.dumps({ "repo_id": repo_id, "domain": domain, "default_branch": "main", "created_at": "2025-01-01T00:00:00+00:00", }), encoding="utf-8") (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "refs" / "heads").mkdir(parents=True) (dot_muse / "snapshots").mkdir() (dot_muse / "commits").mkdir() (dot_muse / "objects").mkdir() return tmp_path, repo_id def _make_commit(root: pathlib.Path, repo_id: str, message: str = "test") -> str: from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.ids import hash_snapshot as compute_snapshot_id, hash_commit as compute_commit_id ref_file = heads_dir(root) / "main" parent_id = ref_file.read_text().strip() if ref_file.exists() else None manifest: Manifest = {} snap_id = compute_snapshot_id(manifest) committed_at = datetime.datetime.now(datetime.timezone.utc) commit_id = compute_commit_id( parent_ids=[parent_id] if parent_id else [], snapshot_id=snap_id, message=message, committed_at_iso=committed_at.isoformat(), ) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) write_commit(root, CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=message, committed_at=committed_at, parent_commit_id=parent_id, )) ref_file.parent.mkdir(parents=True, exist_ok=True) ref_file.write_text(commit_id, encoding="utf-8") return commit_id # --------------------------------------------------------------------------- # content-grep: ReDoS guard # --------------------------------------------------------------------------- class TestContentGrepSecurity: def test_pattern_too_long_rejected(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) long_pattern = "a" * 501 # > 500 char limit result = runner.invoke(cli, ["content-grep", long_pattern], env=_env(root)) assert result.exit_code != 0 assert "too long" in result.stderr or "Pattern" in result.stderr def test_pattern_exactly_500_chars_accepted(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) pattern_500 = "a" * 500 result = runner.invoke(cli, ["content-grep", pattern_500], env=_env(root)) # No match → exit 1, but not a ReDoS validation failure assert result.exit_code in (0, 1) def test_invalid_regex_rejected(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = runner.invoke(cli, ["content-grep", "[invalid regex"], env=_env(root)) assert result.exit_code != 0 assert "regex" in result.stderr.lower() or "invalid" in result.stderr.lower() def test_output_sanitized_no_ansi_injection(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) content = b"normal line\n\x1b[31mRED\x1b[0m line\nanother\n" obj_id = blob_id(content) obj_path = object_path(root, obj_id) obj_path.parent.mkdir(parents=True, exist_ok=True) obj_path.write_bytes(content) from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.ids import hash_snapshot as compute_snapshot_id, hash_commit as compute_commit_id manifest = {"file.txt": obj_id} snap_id = compute_snapshot_id(manifest) committed_at = datetime.datetime.now(datetime.timezone.utc) commit_id = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message="test", committed_at_iso=committed_at.isoformat(), ) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) write_commit(root, CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message="test", committed_at=committed_at, parent_commit_id=None, )) (heads_dir(root) / "main").write_text(commit_id) result = runner.invoke(cli, ["content-grep", "RED"], env=_env(root)) if result.exit_code == 0: assert "\x1b" not in result.output # --------------------------------------------------------------------------- # archive: zip-slip guard # --------------------------------------------------------------------------- class TestArchiveSecurity: def test_archive_prefix_with_dotdot_rejected(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = runner.invoke(cli, ["archive", "--prefix", "../../malicious"], env=_env(root)) assert result.exit_code != 0 def test_zip_slip_guard_in_safe_arcname(self) -> None: from muse.cli.commands.archive import _safe_arcname assert _safe_arcname("safe", "../../../etc/passwd") is None assert _safe_arcname("safe", "/etc/passwd") is None assert _safe_arcname("safe", "normal/path.txt") == "safe/normal/path.txt" # --------------------------------------------------------------------------- # snapshot: glob prefix sanitisation # --------------------------------------------------------------------------- class TestSnapshotSecurity: def test_validate_snapshot_id_prefix_strips_metacharacters(self) -> None: from muse.cli.commands.snapshot_cmd import _validate_snapshot_id_prefix prefix = _validate_snapshot_id_prefix("*bad[0-9]?glob*") assert "*" not in prefix assert "[" not in prefix assert "?" not in prefix assert all(c in "0123456789abcdef" for c in prefix) def test_snapshot_show_with_glob_meta_no_injection( self, tmp_path: pathlib.Path ) -> None: """Glob metacharacters in the snapshot ID prefix must be sanitised.""" root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) # The '*' prefix is sanitised to empty string (no hex chars), so the # command finds nothing but must not raise an exception or expose paths. result = runner.invoke(cli, ["snapshot", "show", "*"], env=_env(root)) # Should not crash; may exit 0 (empty match) or non-zero (not found) assert "\x1b" not in result.output assert result.exception is None def test_safe_arcname_in_snapshot(self) -> None: from muse.cli.commands.snapshot_cmd import _safe_arcname assert _safe_arcname("", "../../../etc/passwd") is None assert _safe_arcname("prefix", "safe.txt") == "prefix/safe.txt" # --------------------------------------------------------------------------- # checkout: validate_branch_name on switch # --------------------------------------------------------------------------- class TestCheckoutSecurity: def test_checkout_invalid_branch_name_rejected(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = runner.invoke(cli, ["checkout", "../traversal"], env=_env(root)) assert result.exit_code != 0 def test_checkout_double_dot_rejected(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = runner.invoke(cli, ["checkout", ".."], env=_env(root)) assert result.exit_code != 0 def test_checkout_valid_existing_branch_works(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) # Create a second branch and switch to it (heads_dir(root) / "dev").write_text( (heads_dir(root) / "main").read_text() ) result = runner.invoke(cli, ["checkout", "dev"], env=_env(root), catch_exceptions=False) assert result.exit_code == 0 # --------------------------------------------------------------------------- # rebase: validate_branch_name on upstream/onto # --------------------------------------------------------------------------- class TestRebaseSecurity: def test_rebase_invalid_upstream_fails(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = runner.invoke(cli, ["rebase", "../../../etc/passwd"], env=_env(root)) assert result.exit_code != 0 # --------------------------------------------------------------------------- # shelf: atomic write regression # --------------------------------------------------------------------------- class TestShelfAtomicWrite: def test_no_temp_files_after_save(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) from muse.cli.commands.shelf import ShelfEntry, _compute_shelf_id from muse.core.shelf import write_shelf_entry raw = { "name": "dev/000", "snapshot": {}, "deleted": [], "snapshot_id": long_id("a" * 64), "parent_commit": long_id("b" * 64), "branch": "main", "created_at": "2025-01-01T00:00:00+00:00", "created_by": "human", "intent_type": "checkpoint", "intent": None, "resumable": False, "tags": [], "expires_at": None, "domain_state": {}, } shelf_id = _compute_shelf_id(raw) entry = ShelfEntry(id=shelf_id, **raw) # type: ignore[misc] write_shelf_entry(root, entry) # type: ignore[arg-type] assert list(muse_dir(root).glob(".muse-tmp-*")) == [] assert (shelf_dir(root)).is_dir() def test_shelf_file_contents_after_atomic_write(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) from muse.cli.commands.shelf import _load_shelf, ShelfEntry, _compute_shelf_id from muse.core.shelf import write_shelf_entry raw = { "name": "dev/000", "snapshot": {"a.py": long_id("c" * 64)}, "deleted": [], "snapshot_id": long_id("b" * 64), "parent_commit": long_id("d" * 64), "branch": "main", "created_at": "2025-06-01T12:00:00+00:00", "created_by": "human", "intent_type": "checkpoint", "intent": None, "resumable": False, "tags": [], "expires_at": None, "domain_state": {}, } shelf_id = _compute_shelf_id(raw) entry = ShelfEntry(id=shelf_id, **raw) # type: ignore[misc] write_shelf_entry(root, entry) # type: ignore[arg-type] loaded = _load_shelf(root) assert len(loaded) == 1 assert loaded[0]["name"] == "dev/000" # --------------------------------------------------------------------------- # show: sanitize_display regression # --------------------------------------------------------------------------- class TestShowDisplaySanitize: def test_commit_message_ansi_not_in_output(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.ids import hash_snapshot as compute_snapshot_id, hash_commit as compute_commit_id snap_id = compute_snapshot_id({}) committed_at = datetime.datetime.now(datetime.timezone.utc) # Compute the commit_id from the actual message that will be stored. actual_message = "malicious\x1b[31mRED\x1b[0m message" commit_id = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message=actual_message, committed_at_iso=committed_at.isoformat(), author="Alice\x1b[0m",) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest={})) write_commit(root, CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=actual_message, committed_at=committed_at, parent_commit_id=None, author="Alice\x1b[0m", )) (heads_dir(root) / "main").write_text(commit_id) result = runner.invoke(cli, ["read"], env=_env(root), catch_exceptions=False) assert result.exit_code == 0 assert "\x1b" not in result.output # --------------------------------------------------------------------------- # reflog: operation sanitization regression # --------------------------------------------------------------------------- class TestReflogSanitize: def test_operation_ansi_not_in_output(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) from muse.core.reflog import append_reflog _make_commit(root, repo_id) append_reflog( root, "main", old_id=NULL_COMMIT_ID, new_id="a" * 64, author="user", operation="malicious\x1b[31mRED\x1b[0m", ) result = runner.invoke(cli, ["reflog"], env=_env(root), catch_exceptions=False) assert "\x1b" not in result.output