gabriel / muse public
test_porcelain_security.py python
343 lines 14.3 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 20 days ago
1 """Security-focused regression tests for all porcelain hardening fixes.
2
3 These tests verify the specific security improvements made during the
4 porcelain hardening pass:
5
6 - ReDoS guard in content-grep (pattern length limit)
7 - Zip-slip prevention in archive and snapshot export
8 - validate_branch_name added to checkout and rebase
9 - sanitize_display applied to all user-sourced echoed strings
10 - Atomic shelf writes (no temp file corruption)
11 - Snapshot ID glob prefix sanitisation
12 """
13
14 from __future__ import annotations
15
16 import datetime
17 import json
18 import pathlib
19
20 import pytest
21 from tests.cli_test_helper import CliRunner
22 from muse.core.types import NULL_COMMIT_ID, long_id, blob_id, fake_id
23 from muse.core.object_store import object_path
24 from muse.core.paths import heads_dir, muse_dir, shelf_dir
25
# Placeholder handed to ``runner.invoke`` as its first positional argument.
# argparse migration — CliRunner ignores this arg
cli = None

# Module-level runner shared by the CLI-invoking tests below.
runner = CliRunner()
29
30
31 # ---------------------------------------------------------------------------
32 # Shared repo setup helper
33 # ---------------------------------------------------------------------------
34
35 def _env(root: pathlib.Path) -> Manifest:
36 return {"MUSE_REPO_ROOT": str(root)}
37
38
def _init_repo(tmp_path: pathlib.Path, domain: str = "code") -> tuple[pathlib.Path, str]:
    """Lay out a minimal repository skeleton under *tmp_path*.

    Inside the directory returned by ``muse_dir(tmp_path)`` this writes
    ``repo.json`` and ``HEAD`` (pointing at ``refs/heads/main``) and creates
    the ``refs/heads``, ``snapshots``, ``commits`` and ``objects`` directories.

    Args:
        tmp_path: Directory that becomes the repository root.
        domain: Value stored under the ``"domain"`` key of ``repo.json``.

    Returns:
        ``(tmp_path, repo_id)`` where ``repo_id`` is ``fake_id("repo")``.
    """
    meta_root = muse_dir(tmp_path)
    meta_root.mkdir()

    repo_id = fake_id("repo")
    metadata = {
        "repo_id": repo_id,
        "domain": domain,
        "default_branch": "main",
        "created_at": "2025-01-01T00:00:00+00:00",
    }
    (meta_root / "repo.json").write_text(json.dumps(metadata), encoding="utf-8")
    (meta_root / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

    # refs/heads needs its intermediate "refs" directory; the rest are flat.
    (meta_root / "refs" / "heads").mkdir(parents=True)
    for subdir in ("snapshots", "commits", "objects"):
        (meta_root / subdir).mkdir()

    return tmp_path, repo_id
55
56
def _make_commit(root: pathlib.Path, repo_id: str, message: str = "test") -> str:
    """Write an empty-manifest commit on ``main`` and advance the branch ref.

    The current content of the ``main`` ref (if the file exists and is not
    empty) becomes the parent of the new commit. A snapshot with an empty
    manifest and a commit record are written, then the ref is overwritten
    with the new commit ID.

    Args:
        root: Repository root whose ``main`` ref is read and updated.
        repo_id: Accepted for call-site symmetry; not referenced in the body.
        message: Commit message stored in the record and fed into the ID hash.

    Returns:
        The ID of the newly written commit.
    """
    from muse.core.commits import CommitRecord, write_commit
    from muse.core.ids import hash_commit as compute_commit_id
    from muse.core.ids import hash_snapshot as compute_snapshot_id
    from muse.core.snapshots import SnapshotRecord, write_snapshot

    ref_file = heads_dir(root) / "main"
    parent_id: str | None = None
    if ref_file.exists():
        # Decode with the same explicit encoding the ref is written with
        # below, and treat an empty ref as "no parent" so the hash input and
        # the stored record agree.
        parent_id = ref_file.read_text(encoding="utf-8").strip() or None

    # Path -> object-id mapping; empty because this helper commits no files.
    manifest: dict[str, str] = {}
    snap_id = compute_snapshot_id(manifest)
    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = compute_commit_id(
        parent_ids=[parent_id] if parent_id else [],
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )

    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
    write_commit(root, CommitRecord(
        commit_id=commit_id,
        branch="main",
        snapshot_id=snap_id,
        message=message,
        committed_at=committed_at,
        parent_commit_id=parent_id,
    ))

    ref_file.parent.mkdir(parents=True, exist_ok=True)
    ref_file.write_text(commit_id, encoding="utf-8")
    return commit_id
87
88
89 # ---------------------------------------------------------------------------
90 # content-grep: ReDoS guard
91 # ---------------------------------------------------------------------------
92
class TestContentGrepSecurity:
    """Checks on ``content-grep`` pattern validation and output escaping."""

    def test_pattern_too_long_rejected(self, tmp_path: pathlib.Path) -> None:
        """A 501-character pattern makes the command exit non-zero."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        oversized = "a" * 501  # > 500 char limit
        result = runner.invoke(cli, ["content-grep", oversized], env=_env(root))
        assert result.exit_code != 0
        assert any(marker in result.stderr for marker in ("too long", "Pattern"))

    def test_pattern_exactly_500_chars_accepted(self, tmp_path: pathlib.Path) -> None:
        """A pattern of exactly 500 characters is not rejected by validation."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        at_limit = "a" * 500
        result = runner.invoke(cli, ["content-grep", at_limit], env=_env(root))
        # No match → exit 1, but not a ReDoS validation failure
        assert result.exit_code == 0 or result.exit_code == 1

    def test_invalid_regex_rejected(self, tmp_path: pathlib.Path) -> None:
        """An unterminated character class is reported as an error."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        result = runner.invoke(cli, ["content-grep", "[invalid regex"], env=_env(root))
        assert result.exit_code != 0
        assert any(word in result.stderr.lower() for word in ("regex", "invalid"))

    def test_output_sanitized_no_ansi_injection(self, tmp_path: pathlib.Path) -> None:
        """Escape bytes stored in a blob do not appear in a successful grep's output."""
        from muse.core.commits import CommitRecord, write_commit
        from muse.core.ids import hash_commit as compute_commit_id
        from muse.core.ids import hash_snapshot as compute_snapshot_id
        from muse.core.snapshots import SnapshotRecord, write_snapshot

        root, repo_id = _init_repo(tmp_path)

        # Store a blob whose middle line carries raw ANSI colour sequences.
        payload = b"normal line\n\x1b[31mRED\x1b[0m line\nanother\n"
        obj_id = blob_id(payload)
        blob_file = object_path(root, obj_id)
        blob_file.parent.mkdir(parents=True, exist_ok=True)
        blob_file.write_bytes(payload)

        # Commit a snapshot that references the blob as file.txt.
        manifest = {"file.txt": obj_id}
        snap_id = compute_snapshot_id(manifest)
        committed_at = datetime.datetime.now(datetime.timezone.utc)
        commit_id = compute_commit_id(
            parent_ids=[],
            snapshot_id=snap_id,
            message="test",
            committed_at_iso=committed_at.isoformat(),
        )
        write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
        record = CommitRecord(
            commit_id=commit_id,
            branch="main",
            snapshot_id=snap_id,
            message="test",
            committed_at=committed_at,
            parent_commit_id=None,
        )
        write_commit(root, record)
        (heads_dir(root) / "main").write_text(commit_id)

        result = runner.invoke(cli, ["content-grep", "RED"], env=_env(root))
        # The output is only inspected when the command reports success.
        if result.exit_code != 0:
            return
        assert "\x1b" not in result.output
154
155
156 # ---------------------------------------------------------------------------
157 # archive: zip-slip guard
158 # ---------------------------------------------------------------------------
159
class TestArchiveSecurity:
    """Checks that ``archive`` refuses member paths that escape the archive root."""

    def test_archive_prefix_with_dotdot_rejected(self, tmp_path: pathlib.Path) -> None:
        """``--prefix ../../malicious`` makes the command exit non-zero."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        argv = ["archive", "--prefix", "../../malicious"]
        result = runner.invoke(cli, argv, env=_env(root))
        assert result.exit_code != 0

    def test_zip_slip_guard_in_safe_arcname(self) -> None:
        """``_safe_arcname`` yields ``None`` for unsafe members, a joined name otherwise."""
        from muse.cli.commands.archive import _safe_arcname

        for unsafe in ("../../../etc/passwd", "/etc/passwd"):
            assert _safe_arcname("safe", unsafe) is None
        assert _safe_arcname("safe", "normal/path.txt") == "safe/normal/path.txt"
172
173
174 # ---------------------------------------------------------------------------
175 # snapshot: glob prefix sanitisation
176 # ---------------------------------------------------------------------------
177
class TestSnapshotSecurity:
    """Checks on snapshot ID prefix sanitisation and archive member naming."""

    def test_validate_snapshot_id_prefix_strips_metacharacters(self) -> None:
        """The validated prefix holds no glob metacharacters, only lowercase hex."""
        from muse.cli.commands.snapshot_cmd import _validate_snapshot_id_prefix

        prefix = _validate_snapshot_id_prefix("*bad[0-9]?glob*")
        for meta in ("*", "[", "?"):
            assert meta not in prefix
        for ch in prefix:
            assert ch in "0123456789abcdef"

    def test_snapshot_show_with_glob_meta_no_injection(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Glob metacharacters in the snapshot ID prefix must be sanitised."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        # The '*' prefix is sanitised to empty string (no hex chars), so the
        # command finds nothing but must not raise an exception or expose paths.
        argv = ["snapshot", "show", "*"]
        result = runner.invoke(cli, argv, env=_env(root))
        # Should not crash; may exit 0 (empty match) or non-zero (not found)
        assert "\x1b" not in result.output
        assert result.exception is None

    def test_safe_arcname_in_snapshot(self) -> None:
        """The snapshot command's ``_safe_arcname`` rejects a traversal member."""
        from muse.cli.commands.snapshot_cmd import _safe_arcname

        traversal = _safe_arcname("", "../../../etc/passwd")
        assert traversal is None
        joined = _safe_arcname("prefix", "safe.txt")
        assert joined == "prefix/safe.txt"
204
205
206 # ---------------------------------------------------------------------------
207 # checkout: validate_branch_name on switch
208 # ---------------------------------------------------------------------------
209
class TestCheckoutSecurity:
    """Checks that ``checkout`` rejects traversal-style names but accepts a real branch."""

    def test_checkout_invalid_branch_name_rejected(self, tmp_path: pathlib.Path) -> None:
        """``checkout ../traversal`` exits non-zero."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        argv = ["checkout", "../traversal"]
        result = runner.invoke(cli, argv, env=_env(root))
        assert result.exit_code != 0

    def test_checkout_double_dot_rejected(self, tmp_path: pathlib.Path) -> None:
        """``checkout ..`` exits non-zero."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        argv = ["checkout", ".."]
        result = runner.invoke(cli, argv, env=_env(root))
        assert result.exit_code != 0

    def test_checkout_valid_existing_branch_works(self, tmp_path: pathlib.Path) -> None:
        """Switching to an existing ``dev`` ref that mirrors ``main`` succeeds."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        # Create a second branch and switch to it
        main_tip = (heads_dir(root) / "main").read_text()
        (heads_dir(root) / "dev").write_text(main_tip)
        result = runner.invoke(
            cli,
            ["checkout", "dev"],
            env=_env(root),
            catch_exceptions=False,
        )
        assert result.exit_code == 0
232
233
234 # ---------------------------------------------------------------------------
235 # rebase: validate_branch_name on upstream/onto
236 # ---------------------------------------------------------------------------
237
class TestRebaseSecurity:
    """Checks that ``rebase`` rejects a traversal-style upstream argument."""

    def test_rebase_invalid_upstream_fails(self, tmp_path: pathlib.Path) -> None:
        """``rebase ../../../etc/passwd`` exits non-zero."""
        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)
        argv = ["rebase", "../../../etc/passwd"]
        result = runner.invoke(cli, argv, env=_env(root))
        assert result.exit_code != 0
244
245
246 # ---------------------------------------------------------------------------
247 # shelf: atomic write regression
248 # ---------------------------------------------------------------------------
249
class TestShelfAtomicWrite:
    """Checks on the files left behind by ``write_shelf_entry``."""

    def test_no_temp_files_after_save(self, tmp_path: pathlib.Path) -> None:
        """No ``.muse-tmp-*`` file remains in the muse dir and the shelf dir exists."""
        from muse.cli.commands.shelf import ShelfEntry, _compute_shelf_id
        from muse.core.shelf import write_shelf_entry

        root, _ = _init_repo(tmp_path)
        # Key order is kept as-is because the ID is computed from this dict.
        raw = {
            "name": "dev/000",
            "snapshot": {},
            "deleted": [],
            "snapshot_id": long_id("a" * 64),
            "parent_commit": long_id("b" * 64),
            "branch": "main",
            "created_at": "2025-01-01T00:00:00+00:00",
            "created_by": "human",
            "intent_type": "checkpoint",
            "intent": None,
            "resumable": False,
            "tags": [],
            "expires_at": None,
            "domain_state": {},
        }
        entry = ShelfEntry(id=_compute_shelf_id(raw), **raw)  # type: ignore[misc]
        write_shelf_entry(root, entry)  # type: ignore[arg-type]

        leftovers = list(muse_dir(root).glob(".muse-tmp-*"))
        assert leftovers == []
        assert shelf_dir(root).is_dir()

    def test_shelf_file_contents_after_atomic_write(self, tmp_path: pathlib.Path) -> None:
        """A written entry is the only one loaded back and keeps its name."""
        from muse.cli.commands.shelf import _load_shelf, ShelfEntry, _compute_shelf_id
        from muse.core.shelf import write_shelf_entry

        root, _ = _init_repo(tmp_path)
        # Key order is kept as-is because the ID is computed from this dict.
        raw = {
            "name": "dev/000",
            "snapshot": {"a.py": long_id("c" * 64)},
            "deleted": [],
            "snapshot_id": long_id("b" * 64),
            "parent_commit": long_id("d" * 64),
            "branch": "main",
            "created_at": "2025-06-01T12:00:00+00:00",
            "created_by": "human",
            "intent_type": "checkpoint",
            "intent": None,
            "resumable": False,
            "tags": [],
            "expires_at": None,
            "domain_state": {},
        }
        entry = ShelfEntry(id=_compute_shelf_id(raw), **raw)  # type: ignore[misc]
        write_shelf_entry(root, entry)  # type: ignore[arg-type]

        loaded = _load_shelf(root)
        assert len(loaded) == 1
        first = loaded[0]
        assert first["name"] == "dev/000"
285
286
287 # ---------------------------------------------------------------------------
288 # show: sanitize_display regression
289 # ---------------------------------------------------------------------------
290
class TestShowDisplaySanitize:
    """Checks that escape bytes in commit metadata are not echoed by ``read``."""

    def test_commit_message_ansi_not_in_output(self, tmp_path: pathlib.Path) -> None:
        """ANSI sequences in the message and author are absent from the output."""
        from muse.core.commits import CommitRecord, write_commit
        from muse.core.ids import hash_commit as compute_commit_id
        from muse.core.ids import hash_snapshot as compute_snapshot_id
        from muse.core.snapshots import SnapshotRecord, write_snapshot

        root, repo_id = _init_repo(tmp_path)

        snap_id = compute_snapshot_id({})
        committed_at = datetime.datetime.now(datetime.timezone.utc)
        # Compute the commit_id from the actual message that will be stored.
        actual_message = "malicious\x1b[31mRED\x1b[0m message"
        tainted_author = "Alice\x1b[0m"
        commit_id = compute_commit_id(
            parent_ids=[],
            snapshot_id=snap_id,
            message=actual_message,
            committed_at_iso=committed_at.isoformat(),
            author=tainted_author,
        )

        write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest={}))
        record = CommitRecord(
            commit_id=commit_id,
            branch="main",
            snapshot_id=snap_id,
            message=actual_message,
            committed_at=committed_at,
            parent_commit_id=None,
            author=tainted_author,
        )
        write_commit(root, record)
        (heads_dir(root) / "main").write_text(commit_id)

        result = runner.invoke(cli, ["read"], env=_env(root), catch_exceptions=False)
        assert result.exit_code == 0
        assert "\x1b" not in result.output
326
327
328 # ---------------------------------------------------------------------------
329 # reflog: operation sanitization regression
330 # ---------------------------------------------------------------------------
331
class TestReflogSanitize:
    """Checks that escape bytes in a reflog operation are not echoed by ``reflog``."""

    def test_operation_ansi_not_in_output(self, tmp_path: pathlib.Path) -> None:
        """An ANSI-laden operation string does not surface in the command output."""
        from muse.core.reflog import append_reflog

        root, repo_id = _init_repo(tmp_path)
        _make_commit(root, repo_id)

        tainted_operation = "malicious\x1b[31mRED\x1b[0m"
        append_reflog(
            root,
            "main",
            old_id=NULL_COMMIT_ID,
            new_id="a" * 64,
            author="user",
            operation=tainted_operation,
        )

        result = runner.invoke(cli, ["reflog"], env=_env(root), catch_exceptions=False)
        assert "\x1b" not in result.output
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 20 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago