test_security_ast_dos.py
python
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
6 days ago
| 1 | """Security tests: unbounded ast.parse — CPU/memory denial of service. |
| 2 | |
| 3 | Python's ast.parse exhibits super-linear behaviour on certain constructs: |
| 4 | deeply nested list/dict literals, long chains of binary operators, and |
| 5 | multi-megabyte source files all cause parsing time to spike non-linearly. |
| 6 | |
| 7 | A malicious agent can commit a crafted Python file that causes any command |
| 8 | which calls ast.parse on workspace files (blast-risk, entangle, |
| 9 | semantic-test-coverage, narrative, gravity, contract, rename, dead) to peg |
| 10 | a CPU core indefinitely. |
| 11 | |
| 12 | The fix: check len(source_bytes) > MAX_AST_BYTES (2 MB) before calling |
| 13 | ast.parse. Commands must gracefully skip or report an error rather than |
| 14 | blocking the event loop. |
| 15 | """ |
| 16 | |
| 17 | from __future__ import annotations |
| 18 | |
| 19 | import ast |
| 20 | import datetime |
| 21 | import hashlib |
| 22 | import json |
| 23 | import pathlib |
| 24 | import time |
| 25 | import pytest |
| 26 | |
| 27 | from tests.cli_test_helper import CliRunner |
| 28 | from muse.core.object_store import object_path |
| 29 | from muse.core.types import fake_id |
| 30 | from muse.core.paths import heads_dir, muse_dir |
| 31 | |
# Application object handed to ``runner.invoke`` by the CLI tests below.
# NOTE(review): it is ``None`` here — presumably the project's CliRunner
# resolves the real entry point itself; confirm against tests.cli_test_helper.
cli = None
# Single CliRunner instance shared by every CLI test in this module.
runner = CliRunner()

# Upper bound, in seconds, that the timing assertions in this module allow.
_AST_DOS_BUDGET_S: float = 10.0  # hard wall-clock limit per test
# Local mirror of the production size cap used to size the test payloads.
_MAX_AST_BYTES: int = 2 * 1024 * 1024  # 2 MB — must match validation.MAX_AST_BYTES
| 37 | |
| 38 | |
| 39 | # --------------------------------------------------------------------------- |
| 40 | # Shared repo helpers (duplicated-minimal version — no shared conftest dep) |
| 41 | # --------------------------------------------------------------------------- |
| 42 | |
def _env(root: pathlib.Path) -> dict[str, str]:
    """Build the environment overrides that point a CLI invocation at *root*.

    Args:
        root: Repository root directory to expose to the command.

    Returns:
        A one-entry mapping whose ``MUSE_REPO_ROOT`` value is ``str(root)``.
    """
    # Annotated as a plain str -> str mapping: the value is an environment
    # dict, and the previously used ``Manifest`` name is not imported in this
    # module, so it could not be resolved by type checkers or get_type_hints.
    return {"MUSE_REPO_ROOT": str(root)}
| 45 | |
| 46 | |
def _init_code_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Lay out a minimal "code"-domain repository skeleton under *tmp_path*.

    Creates the muse metadata directory, writes ``repo.json`` and ``HEAD``
    (pointing at ``refs/heads/main``), and creates the ``refs/heads``,
    ``snapshots``, ``commits`` and ``objects`` directories.

    Returns:
        ``(tmp_path, repo_id)`` where *repo_id* is the freshly generated id
        recorded in ``repo.json``.
    """
    meta_dir = muse_dir(tmp_path)
    meta_dir.mkdir()
    repo_id = fake_id("repo")

    # Repository descriptor; the timestamp is a fixed literal.
    descriptor = {
        "repo_id": repo_id,
        "domain": "code",
        "default_branch": "main",
        "created_at": "2025-01-01T00:00:00+00:00",
    }
    repo_json = meta_dir / "repo.json"
    repo_json.write_text(json.dumps(descriptor), encoding="utf-8")

    head_file = meta_dir / "HEAD"
    head_file.write_text("ref: refs/heads/main", encoding="utf-8")

    # refs/heads needs parents=True because "refs" does not exist yet.
    (meta_dir / "refs" / "heads").mkdir(parents=True)
    for store_name in ("snapshots", "commits", "objects"):
        (meta_dir / store_name).mkdir()

    return tmp_path, repo_id
| 66 | |
| 67 | |
def _store_object(root: pathlib.Path, content: bytes) -> str:
    """Write *content* to the object store of *root* and return its blob id.

    The id is computed from *content* with ``blob_id`` and the same id is
    used as the key passed to ``write_object``.
    """
    from muse.core.object_store import write_object
    from muse.core.types import blob_id

    object_id = blob_id(content)
    write_object(root, object_id, content)
    return object_id
| 74 | |
| 75 | |
def _make_commit(
    root: pathlib.Path,
    repo_id: str,
    message: str = "init",
    manifest: Manifest | None = None,
) -> str:
    """Write a snapshot and a commit on ``main`` and advance the head ref.

    Args:
        root: Repository root directory.
        repo_id: Repository id; accepted for call-site symmetry and not used
            by this helper.
        message: Commit message.
        manifest: Mapping stored in the snapshot; ``None`` or an empty
            mapping yields an empty manifest.

    Returns:
        The id of the commit that was written.
    """
    # NOTE(review): ``Manifest`` is not imported in the visible part of this
    # file; the annotation only works because annotations are evaluated lazily
    # under ``from __future__ import annotations`` — confirm the intended import.
    from muse.core.commits import CommitRecord, write_commit
    from muse.core.ids import hash_commit, hash_snapshot
    from muse.core.snapshots import SnapshotRecord, write_snapshot

    ref_file = heads_dir(root) / "main"
    try:
        # Read with the same encoding the ref is written with below, and treat
        # an empty ref file exactly like a missing one so that ``parent_ids``
        # and ``parent_commit_id`` can never disagree ("" vs. no parent).
        parent_id = ref_file.read_text(encoding="utf-8").strip() or None
    except FileNotFoundError:
        parent_id = None

    snapshot_manifest = manifest or {}
    snap_id = hash_snapshot(snapshot_manifest)
    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = hash_commit(
        parent_ids=[parent_id] if parent_id else [],
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )

    write_snapshot(
        root,
        SnapshotRecord(snapshot_id=snap_id, manifest=snapshot_manifest),
    )
    write_commit(
        root,
        CommitRecord(
            commit_id=commit_id,
            branch="main",
            snapshot_id=snap_id,
            message=message,
            committed_at=committed_at,
            parent_commit_id=parent_id,
        ),
    )

    # Point refs/heads/main at the new commit, creating the directory if needed.
    ref_file.parent.mkdir(parents=True, exist_ok=True)
    ref_file.write_text(commit_id, encoding="utf-8")
    return commit_id
| 115 | |
| 116 | |
| 117 | # --------------------------------------------------------------------------- |
| 118 | # Payload generators |
| 119 | # --------------------------------------------------------------------------- |
| 120 | |
def _oversized_py_source() -> bytes:
    """Produce valid Python source that is strictly larger than _MAX_AST_BYTES.

    The payload is a comment header followed by repeated ``x = 1``
    assignments — valid Python with a flat, linear AST.

    Returns:
        ASCII-encoded source of at least ``_MAX_AST_BYTES + 1`` bytes and at
        most one assignment line more than that.
    """
    header = b"# generated oversized file\n"
    line = b"x = 1\n"
    target = _MAX_AST_BYTES + 1
    # Ceiling division: rounding *down* only clears the limit when the
    # remaining byte count happens to be an exact multiple of the line length
    # (true for the current 2 MB value, but not in general).  Rounding up
    # guarantees len(result) >= target for any header/line/limit combination.
    lines_needed = -(-(target - len(header)) // len(line))
    return header + line * lines_needed
| 129 | |
| 130 | |
def _deep_nesting_bomb(depth: int = 2_000) -> bytes:
    """Return source assigning a list literal nested *depth* levels deep to ``x``.

    Deeply nested literals are the construct this suite treats as an AST
    "bomb".  The default depth is deliberately moderate so the tests stay
    fast on CI while still exercising the pattern.

    Args:
        depth: Number of ``[`` ... ``]`` pairs wrapped around the literal ``0``;
            zero or a negative value yields plain ``x = 0``.

    Returns:
        The encoded source line, terminated by a newline.
    """
    opening = "[" * depth
    closing = "]" * depth
    return f"x = {opening}0{closing}\n".encode()
| 142 | |
| 143 | |
| 144 | # --------------------------------------------------------------------------- |
| 145 | # § 1 — MAX_AST_BYTES constant is exported |
| 146 | # --------------------------------------------------------------------------- |
| 147 | |
class TestMaxAstBytesConstant:
    """Checks the exported size cap and PythonAdapter's behaviour past it."""

    def test_constant_exported_from_validation(self) -> None:
        """MAX_AST_BYTES is an int, equals 2 MB, and matches the local mirror."""
        from muse.core.validation import MAX_AST_BYTES

        assert isinstance(MAX_AST_BYTES, int)
        assert MAX_AST_BYTES == 2 * 1024 * 1024
        # The payload generators in this module are sized from the local
        # mirror constant; fail loudly if the two ever drift apart.
        assert MAX_AST_BYTES == _MAX_AST_BYTES, (
            "_MAX_AST_BYTES in this test module is out of sync with "
            "muse.core.validation.MAX_AST_BYTES"
        )

    def test_python_adapter_respects_limit(self) -> None:
        """PythonAdapter.parse_symbols must reject oversized files gracefully."""
        from muse.plugins.code.ast_parser import PythonAdapter

        adapter = PythonAdapter()
        oversized = _oversized_py_source()
        assert len(oversized) > _MAX_AST_BYTES

        # Expected outcome: an empty result, without raising or hanging.
        t0 = time.monotonic()
        result = adapter.parse_symbols(oversized, "big.py")
        elapsed = time.monotonic() - t0

        assert isinstance(result, dict)
        # Grace: either rejected (empty) or parsed quickly (< 5s).
        assert len(result) == 0 or elapsed < 5.0, (
            f"PythonAdapter spent {elapsed:.1f}s on a {len(oversized)}-byte file; "
            "MAX_AST_BYTES guard is missing"
        )

    def test_python_adapter_file_content_id_respects_limit(self) -> None:
        """file_content_id must also apply the size limit."""
        from muse.plugins.code.ast_parser import PythonAdapter

        adapter = PythonAdapter()
        oversized = _oversized_py_source()
        # Same precondition as the parse_symbols test: without it the timing
        # check below says nothing about the oversized path.
        assert len(oversized) > _MAX_AST_BYTES

        t0 = time.monotonic()
        cid = adapter.file_content_id(oversized)
        elapsed = time.monotonic() - t0

        # "sha256:" prefix (7 chars) followed by 64 more characters.
        assert cid.startswith("sha256:") and len(cid) == 71
        assert elapsed < 5.0, (
            f"file_content_id spent {elapsed:.1f}s on oversized file; "
            "MAX_AST_BYTES guard is missing from file_content_id path"
        )
| 184 | |
| 185 | |
| 186 | # --------------------------------------------------------------------------- |
| 187 | # § 2 — Deep-nesting AST bomb |
| 188 | # --------------------------------------------------------------------------- |
| 189 | |
class TestDeepNestingBomb:
    """Timing bounds for PythonAdapter on a deeply nested list literal."""

    def test_deep_nesting_parse_symbols_bounded(self) -> None:
        """parse_symbols on a depth-2000 nested list finishes within the budget."""
        from muse.plugins.code.ast_parser import PythonAdapter

        payload = _deep_nesting_bomb(depth=2_000)
        # The payload must stay below the size cap so only nesting is exercised.
        assert len(payload) < _MAX_AST_BYTES

        parser = PythonAdapter()
        started = time.monotonic()
        symbols = parser.parse_symbols(payload, "bomb.py")
        elapsed = time.monotonic() - started

        assert elapsed < _AST_DOS_BUDGET_S, (
            f"parse_symbols spent {elapsed:.1f}s on a depth-2000 nesting bomb "
            f"(budget {_AST_DOS_BUDGET_S}s)"
        )
        assert isinstance(symbols, dict)

    def test_deep_nesting_file_content_id_bounded(self) -> None:
        """file_content_id on the same payload is bounded and returns a sha256 id."""
        from muse.plugins.code.ast_parser import PythonAdapter

        payload = _deep_nesting_bomb(depth=2_000)
        parser = PythonAdapter()

        started = time.monotonic()
        content_id = parser.file_content_id(payload)
        elapsed = time.monotonic() - started

        assert content_id.startswith("sha256:") and len(content_id) == 71
        assert elapsed < _AST_DOS_BUDGET_S, (
            f"file_content_id spent {elapsed:.1f}s on depth-2000 bomb"
        )
| 219 | |
| 220 | |
| 221 | # --------------------------------------------------------------------------- |
| 222 | # § 3 — CLI commands reject oversized Python files gracefully |
| 223 | # --------------------------------------------------------------------------- |
| 224 | |
def _oversized_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Build a repository whose only committed file is an oversized Python file.

    The payload is stored as an object, written to ``src/huge.py`` in the
    working tree, and committed under the manifest key ``"src/huge.py"``.

    Returns:
        ``(root, repo_id)`` of the freshly initialised repository.
    """
    repo_root, repo_id = _init_code_repo(tmp_path)
    payload = _oversized_py_source()
    blob = _store_object(repo_root, payload)

    target = repo_root / "src" / "huge.py"
    target.parent.mkdir()
    target.write_bytes(payload)

    _make_commit(repo_root, repo_id, "add oversized file", {"src/huge.py": blob})
    return repo_root, repo_id
| 235 | |
| 236 | |
class TestOversizedFileCli:
    """AST-parsing commands must cope with an oversized file without hanging."""

    def _run_bounded(
        self,
        root: pathlib.Path,
        args: list[str],
        budget_s: float = _AST_DOS_BUDGET_S,
    ) -> None:
        """Invoke *args* against *root* and assert it is fast and did not raise.

        Args:
            root: Repository root exposed to the command via the environment.
            args: Command-line arguments passed to the runner.
            budget_s: Maximum wall-clock seconds the invocation may take.
        """
        started = time.monotonic()
        outcome = runner.invoke(cli, args, env=_env(root))
        elapsed = time.monotonic() - started

        assert elapsed < budget_s, (
            f"Command {args} took {elapsed:.1f}s > budget {budget_s}s on "
            "oversized Python file — MAX_AST_BYTES guard is missing"
        )
        # A non-zero exit code (file skipped / error reported) is acceptable;
        # only a recorded exception counts as a failure.
        assert outcome.exception is None, (
            f"Command raised unexpectedly: {outcome.exception}"
        )

    def test_symbols_bounded(self, tmp_path: pathlib.Path) -> None:
        """``code symbols`` stays within the time budget."""
        repo_root, _repo_id = _oversized_repo(tmp_path)
        command = ["code", "symbols"]
        self._run_bounded(repo_root, command)

    def test_dead_bounded(self, tmp_path: pathlib.Path) -> None:
        """``code dead`` stays within the time budget."""
        repo_root, _repo_id = _oversized_repo(tmp_path)
        command = ["code", "dead"]
        self._run_bounded(repo_root, command)

    def test_blast_risk_bounded(self, tmp_path: pathlib.Path) -> None:
        """``code blast-risk`` stays within the time budget."""
        repo_root, _repo_id = _oversized_repo(tmp_path)
        command = ["code", "blast-risk", "--max-commits", "5"]
        self._run_bounded(repo_root, command)

    def test_semantic_test_coverage_bounded(self, tmp_path: pathlib.Path) -> None:
        """``code semantic-test-coverage`` stays within the time budget."""
        repo_root, _repo_id = _oversized_repo(tmp_path)
        command = ["code", "semantic-test-coverage", "--max-commits", "5"]
        self._run_bounded(repo_root, command)

    def test_narrative_bounded(self, tmp_path: pathlib.Path) -> None:
        """``code narrative`` on a symbol of the huge file stays within budget."""
        repo_root, _repo_id = _oversized_repo(tmp_path)
        command = ["code", "narrative", "src/huge.py::x", "--max-commits", "5"]
        self._run_bounded(repo_root, command)

    def test_gravity_bounded(self, tmp_path: pathlib.Path) -> None:
        """``code gravity`` on a symbol of the huge file stays within budget."""
        repo_root, _repo_id = _oversized_repo(tmp_path)
        command = ["code", "gravity", "src/huge.py::x", "--max-commits", "5"]
        self._run_bounded(repo_root, command)

    def test_contract_bounded(self, tmp_path: pathlib.Path) -> None:
        """``code contract`` on a symbol of the huge file stays within budget."""
        repo_root, _repo_id = _oversized_repo(tmp_path)
        command = ["code", "contract", "src/huge.py::x", "--max-commits", "5"]
        self._run_bounded(repo_root, command)
File History
1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
6 days ago