"""Security tests: unbounded ast.parse — CPU/memory denial of service. Python's ast.parse exhibits super-linear behaviour on certain constructs: deeply nested list/dict literals, long chains of binary operators, and multi-megabyte source files all cause parsing time to spike non-linearly. A malicious agent can commit a crafted Python file that causes any command which calls ast.parse on workspace files (blast-risk, entangle, semantic-test-coverage, narrative, gravity, contract, rename, dead) to peg a CPU core indefinitely. The fix: check len(source_bytes) > MAX_AST_BYTES (2 MB) before calling ast.parse. Commands must gracefully skip or report an error rather than blocking the event loop. """ from __future__ import annotations import ast import datetime import hashlib import json import pathlib import time import pytest from tests.cli_test_helper import CliRunner from muse.core.object_store import object_path from muse.core.types import fake_id from muse.core.paths import heads_dir, muse_dir cli = None runner = CliRunner() _AST_DOS_BUDGET_S: float = 10.0 # hard wall-clock limit per test _MAX_AST_BYTES: int = 2 * 1024 * 1024 # 2 MB — must match validation.MAX_AST_BYTES # --------------------------------------------------------------------------- # Shared repo helpers (duplicated-minimal version — no shared conftest dep) # --------------------------------------------------------------------------- def _env(root: pathlib.Path) -> Manifest: return {"MUSE_REPO_ROOT": str(root)} def _init_code_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]: dot_muse = muse_dir(tmp_path) dot_muse.mkdir() repo_id = fake_id("repo") (dot_muse / "repo.json").write_text( json.dumps({ "repo_id": repo_id, "domain": "code", "default_branch": "main", "created_at": "2025-01-01T00:00:00+00:00", }), encoding="utf-8", ) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "refs" / "heads").mkdir(parents=True) (dot_muse / "snapshots").mkdir() (dot_muse / "commits").mkdir() (dot_muse / "objects").mkdir() return tmp_path, repo_id def _store_object(root: pathlib.Path, content: bytes) -> str: from muse.core.types import blob_id from muse.core.object_store import write_object oid = blob_id(content) write_object(root, oid, content) return oid def _make_commit( root: pathlib.Path, repo_id: str, message: str = "init", manifest: Manifest | None = None, ) -> str: from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.ids import hash_snapshot as compute_snapshot_id, hash_commit as compute_commit_id ref_file = heads_dir(root) / "main" parent_id = ref_file.read_text().strip() if ref_file.exists() else None m: Manifest = manifest or {} snap_id = compute_snapshot_id(m) committed_at = datetime.datetime.now(datetime.timezone.utc) commit_id = compute_commit_id( parent_ids=[parent_id] if parent_id else [], snapshot_id=snap_id, message=message, committed_at_iso=committed_at.isoformat(), ) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=m)) write_commit(root, CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=message, committed_at=committed_at, parent_commit_id=parent_id, )) ref_file.parent.mkdir(parents=True, exist_ok=True) ref_file.write_text(commit_id, encoding="utf-8") return commit_id # --------------------------------------------------------------------------- # Payload generators # --------------------------------------------------------------------------- def _oversized_py_source() -> bytes: """Produce a valid Python source file just over MAX_AST_BYTES (2 MB + 1).""" # Simple repeated variable assignments — valid Python, linear AST. header = "# generated oversized file\n" line = "x = 1\n" target = _MAX_AST_BYTES + 1 lines_needed = (target - len(header.encode())) // len(line.encode()) return (header + line * lines_needed).encode() def _deep_nesting_bomb(depth: int = 2_000) -> bytes: """Produce a Python source with *depth*-level nested list literals. CPython's compile stage (inside ast.parse) shows super-linear behaviour on this input; at depth 10_000 it can take minutes. We use a moderate depth here to keep the test fast on CI while still showing the pattern. """ inner = "0" for _ in range(depth): inner = f"[{inner}]" return f"x = {inner}\n".encode() # --------------------------------------------------------------------------- # § 1 — MAX_AST_BYTES constant is exported # --------------------------------------------------------------------------- class TestMaxAstBytesConstant: def test_constant_exported_from_validation(self) -> None: from muse.core.validation import MAX_AST_BYTES assert isinstance(MAX_AST_BYTES, int) assert MAX_AST_BYTES == 2 * 1024 * 1024 def test_python_adapter_respects_limit(self) -> None: """PythonAdapter.parse_symbols must reject oversized files gracefully.""" from muse.plugins.code.ast_parser import PythonAdapter adapter = PythonAdapter() oversized = _oversized_py_source() assert len(oversized) > _MAX_AST_BYTES # Should return empty SymbolTree, not raise or hang. t0 = time.monotonic() result = adapter.parse_symbols(oversized, "big.py") elapsed = time.monotonic() - t0 assert isinstance(result, dict) # Grace: either rejected (empty) or parsed quickly (< 5s). assert len(result) == 0 or elapsed < 5.0, ( f"PythonAdapter spent {elapsed:.1f}s on a {len(oversized)}-byte file; " "MAX_AST_BYTES guard is missing" ) def test_python_adapter_file_content_id_respects_limit(self) -> None: """file_content_id must also apply the size limit.""" from muse.plugins.code.ast_parser import PythonAdapter adapter = PythonAdapter() oversized = _oversized_py_source() t0 = time.monotonic() cid = adapter.file_content_id(oversized) elapsed = time.monotonic() - t0 assert cid.startswith("sha256:") and len(cid) == 71 assert elapsed < 5.0, ( f"file_content_id spent {elapsed:.1f}s on oversized file; " "MAX_AST_BYTES guard is missing from file_content_id path" ) # --------------------------------------------------------------------------- # § 2 — Deep-nesting AST bomb # --------------------------------------------------------------------------- class TestDeepNestingBomb: def test_deep_nesting_parse_symbols_bounded(self) -> None: """A 2000-deep nested list must not block parse_symbols for > 10s.""" from muse.plugins.code.ast_parser import PythonAdapter adapter = PythonAdapter() bomb = _deep_nesting_bomb(depth=2_000) assert len(bomb) < _MAX_AST_BYTES # still under the size limit t0 = time.monotonic() result = adapter.parse_symbols(bomb, "bomb.py") elapsed = time.monotonic() - t0 assert elapsed < _AST_DOS_BUDGET_S, ( f"parse_symbols spent {elapsed:.1f}s on a depth-2000 nesting bomb " f"(budget {_AST_DOS_BUDGET_S}s)" ) assert isinstance(result, dict) def test_deep_nesting_file_content_id_bounded(self) -> None: """file_content_id must also be bounded on deeply nested structures.""" from muse.plugins.code.ast_parser import PythonAdapter adapter = PythonAdapter() bomb = _deep_nesting_bomb(depth=2_000) t0 = time.monotonic() cid = adapter.file_content_id(bomb) elapsed = time.monotonic() - t0 assert cid.startswith("sha256:") and len(cid) == 71 assert elapsed < _AST_DOS_BUDGET_S, ( f"file_content_id spent {elapsed:.1f}s on depth-2000 bomb" ) # --------------------------------------------------------------------------- # § 3 — CLI commands reject oversized Python files gracefully # --------------------------------------------------------------------------- def _oversized_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]: """Create a repo containing one oversized Python file (> MAX_AST_BYTES).""" root, repo_id = _init_code_repo(tmp_path) src = _oversized_py_source() oid = _store_object(root, src) src_dir = root / "src" src_dir.mkdir() (src_dir / "huge.py").write_bytes(src) _make_commit(root, repo_id, "add oversized file", {"src/huge.py": oid}) return root, repo_id class TestOversizedFileCli: """Commands that parse Python AST must handle oversized files without hanging.""" def _run_bounded( self, root: pathlib.Path, args: list[str], budget_s: float = _AST_DOS_BUDGET_S, ) -> None: t0 = time.monotonic() r = runner.invoke(cli, args, env=_env(root)) elapsed = time.monotonic() - t0 assert elapsed < budget_s, ( f"Command {args} took {elapsed:.1f}s > budget {budget_s}s on " "oversized Python file — MAX_AST_BYTES guard is missing" ) # exit_code may be non-zero (file skipped / error reported) — that's fine. assert r.exception is None, f"Command raised unexpectedly: {r.exception}" def test_symbols_bounded(self, tmp_path: pathlib.Path) -> None: root, _ = _oversized_repo(tmp_path) self._run_bounded(root, ["code", "symbols"]) def test_dead_bounded(self, tmp_path: pathlib.Path) -> None: root, _ = _oversized_repo(tmp_path) self._run_bounded(root, ["code", "dead"]) def test_blast_risk_bounded(self, tmp_path: pathlib.Path) -> None: root, _ = _oversized_repo(tmp_path) self._run_bounded(root, ["code", "blast-risk", "--max-commits", "5"]) def test_semantic_test_coverage_bounded(self, tmp_path: pathlib.Path) -> None: root, _ = _oversized_repo(tmp_path) self._run_bounded(root, ["code", "semantic-test-coverage", "--max-commits", "5"]) def test_narrative_bounded(self, tmp_path: pathlib.Path) -> None: root, _ = _oversized_repo(tmp_path) self._run_bounded( root, ["code", "narrative", "src/huge.py::x", "--max-commits", "5"] ) def test_gravity_bounded(self, tmp_path: pathlib.Path) -> None: root, _ = _oversized_repo(tmp_path) self._run_bounded( root, ["code", "gravity", "src/huge.py::x", "--max-commits", "5"] ) def test_contract_bounded(self, tmp_path: pathlib.Path) -> None: root, _ = _oversized_repo(tmp_path) self._run_bounded( root, ["code", "contract", "src/huge.py::x", "--max-commits", "5"] )