gabriel / muse public
test_security_ast_dos.py python
288 lines 10.8 KB
Raw
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 6 days ago
1 """Security tests: unbounded ast.parse — CPU/memory denial of service.
2
3 Python's ast.parse exhibits super-linear behaviour on certain constructs:
4 deeply nested list/dict literals, long chains of binary operators, and
5 multi-megabyte source files all cause parsing time to spike non-linearly.
6
7 A malicious agent can commit a crafted Python file that causes any command
8 which calls ast.parse on workspace files (blast-risk, entangle,
9 semantic-test-coverage, narrative, gravity, contract, rename, dead) to peg
10 a CPU core indefinitely.
11
12 The fix: check len(source_bytes) > MAX_AST_BYTES (2 MB) before calling
13 ast.parse. Commands must gracefully skip or report an error rather than
14 blocking the event loop.
15 """
16
17 from __future__ import annotations
18
19 import ast
20 import datetime
21 import hashlib
22 import json
23 import pathlib
24 import time
25 import pytest
26
27 from tests.cli_test_helper import CliRunner
28 from muse.core.object_store import object_path
29 from muse.core.types import fake_id
30 from muse.core.paths import heads_dir, muse_dir
31
# Object passed as the first argument to ``runner.invoke`` in the CLI tests.
cli = None
runner = CliRunner()

# Hard wall-clock limit per test, in seconds.
_AST_DOS_BUDGET_S: float = 10.0
# 2 MB — must match validation.MAX_AST_BYTES.
_MAX_AST_BYTES: int = 2 * 1024**2
37
38
39 # ---------------------------------------------------------------------------
40 # Shared repo helpers (duplicated-minimal version — no shared conftest dep)
41 # ---------------------------------------------------------------------------
42
def _env(root: pathlib.Path) -> dict[str, str]:
    """Return the environment overrides that point a CLI invocation at *root*.

    The mapping holds a single key, ``MUSE_REPO_ROOT``, whose value is
    ``str(root)``.
    """
    return {"MUSE_REPO_ROOT": str(root)}
45
46
def _init_code_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Lay out a minimal on-disk "code" repository under *tmp_path*.

    Creates the directory returned by ``muse_dir(tmp_path)``, writes
    ``repo.json`` and ``HEAD`` into it, and creates the empty ``refs/heads``,
    ``snapshots``, ``commits`` and ``objects`` sub-directories.

    Returns:
        ``(tmp_path, repo_id)`` where ``repo_id`` is the freshly generated id
        stored in ``repo.json``.
    """
    meta_dir = muse_dir(tmp_path)
    meta_dir.mkdir()
    repo_id = fake_id("repo")

    repo_meta = {
        "repo_id": repo_id,
        "domain": "code",
        "default_branch": "main",
        "created_at": "2025-01-01T00:00:00+00:00",
    }
    (meta_dir / "repo.json").write_text(json.dumps(repo_meta), encoding="utf-8")
    (meta_dir / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

    # refs/heads needs its intermediate ``refs`` directory created as well.
    (meta_dir / "refs" / "heads").mkdir(parents=True)
    for store_name in ("snapshots", "commits", "objects"):
        (meta_dir / store_name).mkdir()

    return tmp_path, repo_id
66
67
def _store_object(root: pathlib.Path, content: bytes) -> str:
    """Write *content* to the object store under *root* and return its blob id."""
    from muse.core.types import blob_id
    from muse.core.object_store import write_object

    object_id = blob_id(content)
    write_object(root, object_id, content)
    return object_id
74
75
def _make_commit(
    root: pathlib.Path,
    repo_id: str,
    message: str = "init",
    manifest: dict[str, str] | None = None,
) -> str:
    """Write a snapshot and a commit on ``main``, then advance the branch ref.

    Args:
        root: Repository root directory.
        repo_id: Repository id. Accepted for call-site symmetry; it is not
            used when building the records in this helper.
        message: Commit message.
        manifest: Snapshot manifest. Callers in this file pass a mapping of
            workspace path to object id. ``None`` (or an empty mapping) yields
            an empty manifest.

    Returns:
        The id of the newly written commit.
    """
    from muse.core.commits import (
        CommitRecord,
        write_commit,
    )
    from muse.core.snapshots import (
        SnapshotRecord,
        write_snapshot,
    )
    from muse.core.ids import hash_snapshot as compute_snapshot_id, hash_commit as compute_commit_id

    ref_file = heads_dir(root) / "main"
    # The current tip of main, if any, becomes the parent of the new commit.
    # Read with the same encoding the ref is written with below.
    parent_id = (
        ref_file.read_text(encoding="utf-8").strip() if ref_file.exists() else None
    )
    m: dict[str, str] = manifest or {}
    snap_id = compute_snapshot_id(m)
    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = compute_commit_id(
        parent_ids=[parent_id] if parent_id else [],
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )
    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=m))
    write_commit(root, CommitRecord(
        commit_id=commit_id,
        branch="main",
        snapshot_id=snap_id,
        message=message,
        committed_at=committed_at,
        parent_commit_id=parent_id,
    ))
    # Point refs/heads/main at the new commit.
    ref_file.parent.mkdir(parents=True, exist_ok=True)
    ref_file.write_text(commit_id, encoding="utf-8")
    return commit_id
115
116
117 # ---------------------------------------------------------------------------
118 # Payload generators
119 # ---------------------------------------------------------------------------
120
def _oversized_py_source(min_bytes: int | None = None) -> bytes:
    """Produce valid Python source that is at least *min_bytes* bytes long.

    With the default (``None``) the target is ``_MAX_AST_BYTES + 1``, i.e. a
    file just over the 2 MB limit.

    Args:
        min_bytes: Minimum size of the returned payload in bytes. The result
            overshoots by less than one filler line. If the header alone is
            already long enough, only the header is returned.

    Returns:
        A comment header followed by repeated ``x = 1`` lines, as bytes.
    """
    # Simple repeated variable assignments — valid Python, linear AST.
    header = b"# generated oversized file\n"
    line = b"x = 1\n"
    target = _MAX_AST_BYTES + 1 if min_bytes is None else min_bytes
    # Ceiling division: flooring can leave the payload a few bytes short of
    # the target whenever the remainder is not a multiple of the line length.
    lines_needed = max(0, -(-(target - len(header)) // len(line)))
    return header + line * lines_needed
129
130
def _deep_nesting_bomb(depth: int = 2_000) -> bytes:
    """Produce Python source assigning a *depth*-level nested list to ``x``.

    For example, ``depth=2`` yields ``b"x = [[0]]\\n"``; a depth of zero or
    less yields ``b"x = 0\\n"``.

    The payload is meant to model input whose parsing cost grows
    super-linearly with nesting depth. A moderate default keeps the test fast
    on CI.

    NOTE(review): CPython's tokenizer rejects bracket nesting beyond a fixed
    depth with ``SyntaxError``, so at this depth ``ast.parse`` may bail out
    early instead of running slowly — confirm on the supported interpreters.
    """
    # Built in one shot; wrapping level by level re-copies the whole string on
    # every iteration, which is quadratic in depth.
    nested = "[" * depth + "0" + "]" * depth
    return f"x = {nested}\n".encode()
142
143
144 # ---------------------------------------------------------------------------
145 # § 1 — MAX_AST_BYTES constant is exported
146 # ---------------------------------------------------------------------------
147
class TestMaxAstBytesConstant:
    """The size limit is exported and the Python adapter copes with oversized input."""

    def test_constant_exported_from_validation(self) -> None:
        """``validation.MAX_AST_BYTES`` is an int equal to 2 MB."""
        from muse.core.validation import MAX_AST_BYTES

        two_megabytes = 2 * 1024 * 1024
        assert isinstance(MAX_AST_BYTES, int)
        assert MAX_AST_BYTES == two_megabytes

    def test_python_adapter_respects_limit(self) -> None:
        """PythonAdapter.parse_symbols must reject oversized files gracefully."""
        from muse.plugins.code.ast_parser import PythonAdapter

        py_adapter = PythonAdapter()
        oversized = _oversized_py_source()
        assert len(oversized) > _MAX_AST_BYTES

        # Should return empty SymbolTree, not raise or hang.
        started = time.monotonic()
        symbols = py_adapter.parse_symbols(oversized, "big.py")
        elapsed = time.monotonic() - started

        assert isinstance(symbols, dict)
        # Grace: either rejected (empty) or parsed quickly (< 5s).
        was_rejected = len(symbols) == 0
        assert was_rejected or elapsed < 5.0, (
            f"PythonAdapter spent {elapsed:.1f}s on a {len(oversized)}-byte file; "
            "MAX_AST_BYTES guard is missing"
        )

    def test_python_adapter_file_content_id_respects_limit(self) -> None:
        """file_content_id must also apply the size limit."""
        from muse.plugins.code.ast_parser import PythonAdapter

        py_adapter = PythonAdapter()
        oversized = _oversized_py_source()

        started = time.monotonic()
        content_id = py_adapter.file_content_id(oversized)
        elapsed = time.monotonic() - started

        # "sha256:" prefix plus digest: 71 characters in total.
        has_prefix = content_id.startswith("sha256:")
        assert has_prefix and len(content_id) == 71
        assert elapsed < 5.0, (
            f"file_content_id spent {elapsed:.1f}s on oversized file; "
            "MAX_AST_BYTES guard is missing from file_content_id path"
        )
184
185
186 # ---------------------------------------------------------------------------
187 # § 2 — Deep-nesting AST bomb
188 # ---------------------------------------------------------------------------
189
class TestDeepNestingBomb:
    """Deeply nested literals must be handled within the wall-clock budget."""

    def test_deep_nesting_parse_symbols_bounded(self) -> None:
        """A 2000-deep nested list must not block parse_symbols for > 10s."""
        from muse.plugins.code.ast_parser import PythonAdapter

        py_adapter = PythonAdapter()
        payload = _deep_nesting_bomb(depth=2_000)
        # Still under the size limit, so a size guard alone does not cover it.
        assert len(payload) < _MAX_AST_BYTES

        started = time.monotonic()
        symbols = py_adapter.parse_symbols(payload, "bomb.py")
        elapsed = time.monotonic() - started

        assert elapsed < _AST_DOS_BUDGET_S, (
            f"parse_symbols spent {elapsed:.1f}s on a depth-2000 nesting bomb "
            f"(budget {_AST_DOS_BUDGET_S}s)"
        )
        assert isinstance(symbols, dict)

    def test_deep_nesting_file_content_id_bounded(self) -> None:
        """file_content_id must also be bounded on deeply nested structures."""
        from muse.plugins.code.ast_parser import PythonAdapter

        py_adapter = PythonAdapter()
        payload = _deep_nesting_bomb(depth=2_000)

        started = time.monotonic()
        content_id = py_adapter.file_content_id(payload)
        elapsed = time.monotonic() - started

        # "sha256:" prefix plus digest: 71 characters in total.
        has_prefix = content_id.startswith("sha256:")
        assert has_prefix and len(content_id) == 71
        assert elapsed < _AST_DOS_BUDGET_S, (
            f"file_content_id spent {elapsed:.1f}s on depth-2000 bomb"
        )
219
220
221 # ---------------------------------------------------------------------------
222 # § 3 — CLI commands reject oversized Python files gracefully
223 # ---------------------------------------------------------------------------
224
def _oversized_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Build a repo whose single commit tracks one Python file over MAX_AST_BYTES.

    The same payload is stored as an object and written to ``src/huge.py`` in
    the working tree; the commit's manifest maps that path to the object id.

    Returns:
        ``(root, repo_id)`` of the prepared repository.
    """
    root, repo_id = _init_code_repo(tmp_path)
    payload = _oversized_py_source()
    object_id = _store_object(root, payload)

    huge_file = root / "src" / "huge.py"
    huge_file.parent.mkdir()
    huge_file.write_bytes(payload)

    _make_commit(root, repo_id, "add oversized file", {"src/huge.py": object_id})
    return root, repo_id
235
236
class TestOversizedFileCli:
    """Commands that parse Python AST must handle oversized files without hanging."""

    def _run_bounded(
        self,
        root: pathlib.Path,
        args: list[str],
        budget_s: float = _AST_DOS_BUDGET_S,
    ) -> None:
        """Invoke the CLI with *args* against *root* and enforce the time budget.

        Fails if the invocation takes *budget_s* seconds or longer, or if the
        runner recorded an exception. The exit code is deliberately not
        checked.
        """
        environment = _env(root)
        started = time.monotonic()
        r = runner.invoke(cli, args, env=environment)
        elapsed = time.monotonic() - started

        assert elapsed < budget_s, (
            f"Command {args} took {elapsed:.1f}s > budget {budget_s}s on "
            "oversized Python file — MAX_AST_BYTES guard is missing"
        )
        # exit_code may be non-zero (file skipped / error reported) — that's fine.
        assert r.exception is None, f"Command raised unexpectedly: {r.exception}"

    def test_symbols_bounded(self, tmp_path: pathlib.Path) -> None:
        """``code symbols`` stays within budget on the oversized repo."""
        repo_root = _oversized_repo(tmp_path)[0]
        argv = ["code", "symbols"]
        self._run_bounded(repo_root, argv)

    def test_dead_bounded(self, tmp_path: pathlib.Path) -> None:
        """``code dead`` stays within budget on the oversized repo."""
        repo_root = _oversized_repo(tmp_path)[0]
        argv = ["code", "dead"]
        self._run_bounded(repo_root, argv)

    def test_blast_risk_bounded(self, tmp_path: pathlib.Path) -> None:
        """``code blast-risk`` stays within budget on the oversized repo."""
        repo_root = _oversized_repo(tmp_path)[0]
        argv = ["code", "blast-risk", "--max-commits", "5"]
        self._run_bounded(repo_root, argv)

    def test_semantic_test_coverage_bounded(self, tmp_path: pathlib.Path) -> None:
        """``code semantic-test-coverage`` stays within budget on the oversized repo."""
        repo_root = _oversized_repo(tmp_path)[0]
        argv = ["code", "semantic-test-coverage", "--max-commits", "5"]
        self._run_bounded(repo_root, argv)

    def test_narrative_bounded(self, tmp_path: pathlib.Path) -> None:
        """``code narrative`` on a symbol in the huge file stays within budget."""
        repo_root = _oversized_repo(tmp_path)[0]
        argv = ["code", "narrative", "src/huge.py::x", "--max-commits", "5"]
        self._run_bounded(repo_root, argv)

    def test_gravity_bounded(self, tmp_path: pathlib.Path) -> None:
        """``code gravity`` on a symbol in the huge file stays within budget."""
        repo_root = _oversized_repo(tmp_path)[0]
        argv = ["code", "gravity", "src/huge.py::x", "--max-commits", "5"]
        self._run_bounded(repo_root, argv)

    def test_contract_bounded(self, tmp_path: pathlib.Path) -> None:
        """``code contract`` on a symbol in the huge file stays within budget."""
        repo_root = _oversized_repo(tmp_path)[0]
        argv = ["code", "contract", "src/huge.py::x", "--max-commits", "5"]
        self._run_bounded(repo_root, argv)
File History 1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 6 days ago