gabriel / muse public
test_security_code_porcelain.py python
698 lines 27.7 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 20 days ago
1 """Security regression tests for Muse code domain porcelain commands.
2
3 Red-hat findings from the semantic audit, turned into blue-hat defences:
4
5 1. ANSI / OSC terminal injection — 30+ commands printed user-controlled strings
6 (symbol addresses, commit messages, file paths) without sanitize_display.
7 Any commit with an OSC-52 payload in its message could hijack the clipboard
8 of every developer whose terminal renders that output.
9
10 2. Integer denial-of-service — 13 commands accept unbounded int arguments
11 (--top, --max-commits, --workers, --context, --limit, --min-co-changes,
12 --window, --predict). Passing 2147483647 triggers enormous allocations or
13 infinite-feeling loops that exhaust memory and CPU.
14
15 3. Output-path traversal — docs_cmd wrote to pathlib.Path(args.output) without
16 contain_path, allowing --output /etc/cron.d/malicious to escape the repo.
17 """
18
19 from __future__ import annotations
20
21 import datetime
22 import json
23 import pathlib
24 import time
25
26 import pytest
27
28 from tests.cli_test_helper import CliRunner
29 from muse.core.types import fake_id, blob_id
30 from muse.core.object_store import write_object as _write_obj_store
31 from muse.core.paths import heads_dir, muse_dir
32
# ``cli`` is only a placeholder first argument: every ``runner.invoke`` call in
# this module passes it through unchanged.
cli = None  # post-argparse migration stub
# One shared runner instance is reused by every test in this module.
runner = CliRunner()

# ---------------------------------------------------------------------------
# OSC-52 payload — NOT stripped by CliRunner._strip_ansi (which only strips
# \x1b[...m sequences), but expected to be stripped by sanitize_display.
# The contract tests in this module require sanitize_display to remove
# ESC = 0x1B, BEL = 0x07 and NUL = 0x00 while leaving newline and tab intact.
# ---------------------------------------------------------------------------
# ESC ] 52 ; c ; <data> BEL, wrapped between the plain words "sec" and "end".
_ANSI_PAYLOAD: str = "sec\x1b]52;c;HACKED==\x07end"
# The single ESC byte whose absence the injection tests assert on.
_ANSI_MARKER: str = "\x1b"
43
44
45 # ---------------------------------------------------------------------------
46 # Shared repo helpers
47 # ---------------------------------------------------------------------------
48
def _env(root: pathlib.Path) -> dict[str, str]:
    """Return the environment overrides that point a CLI invocation at *root*.

    The mapping holds a single key, ``MUSE_REPO_ROOT``, whose value is *root*
    rendered with ``str()``. Tests pass it as ``env=`` to ``runner.invoke``.
    """
    return {"MUSE_REPO_ROOT": str(root)}
51
52
def _init_code_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Lay out an empty ``code``-domain repository skeleton under *tmp_path*.

    Creates the metadata directory returned by ``muse_dir``, writes
    ``repo.json`` and ``HEAD`` into it, and creates the ``refs/heads``,
    ``snapshots``, ``commits`` and ``objects`` sub-directories.

    Returns:
        ``(tmp_path, repo_id)`` where *repo_id* is the id written to
        ``repo.json``.
    """
    meta_dir = muse_dir(tmp_path)
    meta_dir.mkdir()
    repo_id = fake_id("repo")

    repo_meta = {
        "repo_id": repo_id,
        "domain": "code",
        "default_branch": "main",
        "created_at": "2025-01-01T00:00:00+00:00",
    }
    (meta_dir / "repo.json").write_text(json.dumps(repo_meta), encoding="utf-8")
    (meta_dir / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

    # refs/heads is nested, so it needs parents=True; the rest are direct
    # children of the metadata directory.
    (meta_dir / "refs" / "heads").mkdir(parents=True)
    for store_name in ("snapshots", "commits", "objects"):
        (meta_dir / store_name).mkdir()

    return tmp_path, repo_id
72
73
def _store_object(root: pathlib.Path, content: bytes) -> str:
    """Persist *content* in the object store under *root* and return its id.

    The id is whatever ``blob_id`` derives from the raw bytes; the same id is
    used as the key when writing to the store.
    """
    object_id = blob_id(content)
    _write_obj_store(root, object_id, content)
    return object_id
79
80
def _make_commit(
    root: pathlib.Path,
    repo_id: str,
    message: str = "init",
    manifest: dict[str, str] | None = None,
) -> str:
    """Write a snapshot and a commit on ``main``, then advance the branch ref.

    Args:
        root: Repository root directory.
        repo_id: Accepted so call sites can pass the pair returned by
            ``_init_code_repo``; this helper does not read it.
        message: Commit message, passed unchanged to both the commit-id hash
            and the stored ``CommitRecord``.
        manifest: Mapping stored as the snapshot manifest (callers in this
            module pass ``{repo-relative path: object id}``). ``None`` or an
            empty mapping produces an empty manifest.

    Returns:
        The id of the newly written commit.
    """
    from muse.core.commits import CommitRecord, write_commit
    from muse.core.ids import hash_commit, hash_snapshot
    from muse.core.snapshots import SnapshotRecord, write_snapshot

    ref_file = heads_dir(root) / "main"
    # Read with the same encoding the ref is written with below. A missing ref
    # file means this is the first commit on the branch.
    parent_id = (
        ref_file.read_text(encoding="utf-8").strip() if ref_file.exists() else None
    )

    entries: dict[str, str] = manifest or {}
    snap_id = hash_snapshot(entries)
    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = hash_commit(
        parent_ids=[parent_id] if parent_id else [],
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )

    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=entries))
    write_commit(
        root,
        CommitRecord(
            commit_id=commit_id,
            branch="main",
            snapshot_id=snap_id,
            message=message,
            committed_at=committed_at,
            parent_commit_id=parent_id,
        ),
    )

    # Point refs/heads/main at the new commit so the next call chains onto it.
    ref_file.parent.mkdir(parents=True, exist_ok=True)
    ref_file.write_text(commit_id, encoding="utf-8")
    return commit_id
120
121
def _ansi_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Build a two-commit repo whose newest commit message is the OSC-52 payload.

    Both commits track ``src/module.py``. The file is also written to the
    working tree so commands that read from disk can find it. The second
    revision changes one function body so that history-based commands
    (hotspots, stable, ...) have churn to report.

    Returns:
        ``(root, repo_id)`` as produced by ``_init_code_repo``.
    """
    root, repo_id = _init_code_repo(tmp_path)

    source_dir = root / "src"
    source_dir.mkdir()
    module_file = source_dir / "module.py"

    revisions = (
        # Clean baseline commit.
        (b"def alpha():\n return 1\n\ndef beta():\n return 2\n", "initial commit"),
        # Same file, different body; the message is the injection vector.
        (b"def alpha():\n return 99\n\ndef beta():\n return 2\n", _ANSI_PAYLOAD),
    )
    for body, message in revisions:
        object_id = _store_object(root, body)
        module_file.write_bytes(body)
        _make_commit(root, repo_id, message, {"src/module.py": object_id})

    return root, repo_id
148
149
150 # ---------------------------------------------------------------------------
151 # § 1 — ANSI / OSC terminal injection
152 #
153 # Each test invokes one code porcelain command against the ANSI fixture repo
154 # and asserts that ESC (0x1B) is absent from the captured output.
155 #
156 # CliRunner._strip_ansi removes \x1b[...m sequences but NOT OSC sequences
157 # like \x1b]52;...\x07. sanitize_display (which the commands must call)
158 # removes ALL C0/C1 control characters including ESC.
159 # ---------------------------------------------------------------------------
160
class TestAnsiInjectionCommit:
    """Commands that display commit messages must not echo raw ESC bytes.

    Every test builds the OSC-52 fixture repo, runs one ``code`` sub-command
    and asserts that the ESC marker is absent from the captured output.
    """

    @staticmethod
    def _output_of(tmp_path: pathlib.Path, argv: list[str]) -> str:
        """Run *argv* against a fresh OSC-52 fixture repo; return its output."""
        root, _ = _ansi_repo(tmp_path)
        return runner.invoke(cli, argv, env=_env(root)).output

    def test_symbol_log_no_ansi(self, tmp_path: pathlib.Path) -> None:
        captured = self._output_of(
            tmp_path, ["code", "symbol-log", "src/module.py::alpha"]
        )
        assert _ANSI_MARKER not in captured

    def test_blame_no_ansi(self, tmp_path: pathlib.Path) -> None:
        captured = self._output_of(
            tmp_path, ["code", "blame", "src/module.py::alpha"]
        )
        assert _ANSI_MARKER not in captured

    def test_find_symbol_no_ansi(self, tmp_path: pathlib.Path) -> None:
        captured = self._output_of(
            tmp_path, ["code", "find-symbol", "--name", "alpha"]
        )
        assert _ANSI_MARKER not in captured

    def test_narrative_no_ansi(self, tmp_path: pathlib.Path) -> None:
        captured = self._output_of(
            tmp_path,
            ["code", "narrative", "src/module.py::alpha", "--max-commits", "5"],
        )
        assert _ANSI_MARKER not in captured

    def test_contract_no_ansi(self, tmp_path: pathlib.Path) -> None:
        captured = self._output_of(
            tmp_path,
            ["code", "contract", "src/module.py::alpha", "--max-commits", "5"],
        )
        assert _ANSI_MARKER not in captured

    def test_detect_refactor_no_ansi(self, tmp_path: pathlib.Path) -> None:
        captured = self._output_of(
            tmp_path, ["code", "detect-refactor", "--max-commits", "5"]
        )
        assert _ANSI_MARKER not in captured

    def test_query_history_no_ansi(self, tmp_path: pathlib.Path) -> None:
        captured = self._output_of(
            tmp_path, ["code", "query-history", "kind=function"]
        )
        assert _ANSI_MARKER not in captured
208
209
class TestAnsiInjectionAddress:
    """Commands that display symbol addresses must not echo raw ESC bytes.

    Most tests here run against the fixture repo whose commit message carries
    the OSC-52 payload and assert that ESC never reaches the output. The
    original authors note that injecting the payload through a filesystem
    path is impossible on most OSes, so address sanitisation at print sites
    is otherwise covered by code review. The three ``*_error*`` tests do
    exercise the address path directly by passing a hostile address on the
    command line.
    """

    # Hostile symbol address shared by the error-path tests. A plain literal:
    # it has no placeholders, so it needs no ``f`` prefix.
    _MALICIOUS_ADDR = "src/module.py::\x1b]52;c;malicious\x07func"

    @staticmethod
    def _output_of(
        tmp_path: pathlib.Path, argv: list[str], **invoke_kwargs: str
    ) -> str:
        """Run *argv* against a fresh OSC-52 fixture repo; return its output.

        Extra keyword arguments (e.g. ``input=``) are forwarded unchanged to
        ``runner.invoke``.
        """
        root, _ = _ansi_repo(tmp_path)
        return runner.invoke(cli, argv, env=_env(root), **invoke_kwargs).output

    def test_hotspots_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(tmp_path, ["code", "hotspots", "--top", "10"])
        assert _ANSI_MARKER not in out

    def test_stable_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(tmp_path, ["code", "stable", "--top", "10"])
        assert _ANSI_MARKER not in out

    def test_symbols_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(tmp_path, ["code", "symbols"])
        assert _ANSI_MARKER not in out

    def test_grep_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(tmp_path, ["code", "grep", "alpha"])
        assert _ANSI_MARKER not in out

    def test_cat_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(tmp_path, ["code", "cat", "src/module.py::alpha"])
        assert _ANSI_MARKER not in out

    def test_blast_risk_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(
            tmp_path, ["code", "blast-risk", "--top", "5", "--max-commits", "5"]
        )
        assert _ANSI_MARKER not in out

    def test_age_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(
            tmp_path, ["code", "age", "src/module.py::alpha", "--max-commits", "5"]
        )
        assert _ANSI_MARKER not in out

    def test_velocity_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(
            tmp_path, ["code", "velocity", "--top", "5", "--max-commits", "5"]
        )
        assert _ANSI_MARKER not in out

    def test_entangle_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(
            tmp_path, ["code", "entangle", "--top", "5", "--max-commits", "5"]
        )
        assert _ANSI_MARKER not in out

    def test_gravity_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(
            tmp_path,
            ["code", "gravity", "src/module.py::alpha", "--max-commits", "5"],
        )
        assert _ANSI_MARKER not in out

    def test_impact_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(tmp_path, ["code", "impact", "src/module.py::alpha"])
        assert _ANSI_MARKER not in out

    def test_deps_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(tmp_path, ["code", "deps", "src/module.py"])
        assert _ANSI_MARKER not in out

    def test_coverage_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(
            tmp_path, ["code", "coverage", "src/module.py::alpha"]
        )
        assert _ANSI_MARKER not in out

    def test_lineage_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(tmp_path, ["code", "lineage", "src/module.py::alpha"])
        assert _ANSI_MARKER not in out

    def test_api_surface_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(tmp_path, ["code", "api-surface"])
        assert _ANSI_MARKER not in out

    def test_dead_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(tmp_path, ["code", "dead"])
        assert _ANSI_MARKER not in out

    def test_clones_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(tmp_path, ["code", "clones"])
        assert _ANSI_MARKER not in out

    def test_codemap_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(tmp_path, ["code", "codemap", "--top", "5"])
        assert _ANSI_MARKER not in out

    def test_coupling_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(
            tmp_path, ["code", "coupling", "--top", "5", "--min", "1"]
        )
        assert _ANSI_MARKER not in out

    def test_compare_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(tmp_path, ["code", "compare", "HEAD~1", "HEAD"])
        assert _ANSI_MARKER not in out

    def test_semantic_test_coverage_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(
            tmp_path, ["code", "semantic-test-coverage", "--max-commits", "5"]
        )
        assert _ANSI_MARKER not in out

    def test_predict_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(
            tmp_path, ["code", "predict", "--top", "5", "--max-commits", "5"]
        )
        assert _ANSI_MARKER not in out

    def test_patch_error_message_no_ansi(self, tmp_path: pathlib.Path) -> None:
        """patch echoes the address back on error — must sanitize it."""
        out = self._output_of(
            tmp_path,
            ["code", "patch", self._MALICIOUS_ADDR, "--body", "-"],
            input="def func(): pass",
        )
        assert _ANSI_MARKER not in out

    def test_checkout_symbol_error_message_no_ansi(
        self, tmp_path: pathlib.Path
    ) -> None:
        """checkout-symbol echoes the address on error — must sanitize."""
        out = self._output_of(
            tmp_path, ["code", "checkout-symbol", self._MALICIOUS_ADDR]
        )
        assert _ANSI_MARKER not in out

    def test_semantic_cherry_pick_error_no_ansi(
        self, tmp_path: pathlib.Path
    ) -> None:
        """semantic-cherry-pick is given a hostile address — ESC must not leak."""
        out = self._output_of(
            tmp_path,
            ["code", "semantic-cherry-pick", self._MALICIOUS_ADDR, "--from", "HEAD~1"],
        )
        assert _ANSI_MARKER not in out

    def test_query_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(tmp_path, ["code", "query", "kind=function"])
        assert _ANSI_MARKER not in out

    def test_docs_cmd_no_ansi(self, tmp_path: pathlib.Path) -> None:
        out = self._output_of(
            tmp_path, ["code", "docs", "history", "src/module.py::alpha"]
        )
        assert _ANSI_MARKER not in out
409
410
411 # ---------------------------------------------------------------------------
412 # § 2 — Integer denial-of-service
413 #
414 # Commands with unbounded --top / --max-commits / --workers etc. must reject
415 # extreme values rather than allocating gigabytes of memory or looping for
416 # unbounded time.
417 #
418 # The test passes if: the command returns exit_code != 0 (clamped and
419 # rejected) OR it completes within a generous 5-second wall-clock budget
420 # (the correct behaviour after clamping is applied).
421 # ---------------------------------------------------------------------------
422
# Wall-clock ceiling, in seconds, compared against the elapsed time of a
# command that exits with code 0 in TestIntegerDoS._check.
_DOS_BUDGET_S: float = 5.0  # max wall-clock seconds for a command with huge arg
424
425
class TestIntegerDoS:
    """Unbounded numeric args must be clamped; commands must not hang or OOM."""

    def _check(
        self,
        root: pathlib.Path,
        args: list[str],
        huge_value: str = "2147483647",
    ) -> None:
        """Run *args* and require rejection or completion within the budget.

        Args:
            root: Repository root the command is pointed at.
            args: Full argv for the command, already containing the oversized
                value at the position of the option under test.
            huge_value: The oversized value expected to appear in *args*. It
                is used only as a sanity check that the test really passes an
                extreme value; it is not inserted into *args*.

        The check passes when the command exits non-zero (the guard rejected
        the value) or when it exits 0 in under ``_DOS_BUDGET_S`` seconds.

        NOTE: elapsed time is measured after ``runner.invoke`` returns, so a
        command that never returns blocks the test instead of failing it.
        """
        # Guard against a test that silently stops exercising the huge value.
        assert huge_value in args, (
            f"test bug: expected oversized value {huge_value!r} in {args}"
        )

        started = time.monotonic()
        result = runner.invoke(cli, args, env=_env(root))
        elapsed = time.monotonic() - started

        if result.exit_code != 0:
            # The guard rejected the huge value (preferred outcome).
            return
        assert elapsed < _DOS_BUDGET_S, (
            f"Command {args} took {elapsed:.1f}s > budget {_DOS_BUDGET_S}s "
            "with max-int arg — clamp_int guard is missing"
        )

    def test_hotspots_top_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "hotspots", "--top", "2147483647"])

    def test_hotspots_max_commits_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "hotspots", "--max-commits", "2147483647"])

    def test_stable_top_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "stable", "--top", "2147483647"])

    def test_coupling_top_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "coupling", "--top", "2147483647"])

    def test_coupling_min_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "coupling", "--min", "2147483647"])

    def test_blast_risk_top_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "blast-risk", "--top", "2147483647"])

    def test_blast_risk_max_commits_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "blast-risk", "--max-commits", "2147483647"])

    def test_age_max_commits_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(
            root,
            ["code", "age", "src/module.py::alpha", "--max-commits", "2147483647"],
        )

    def test_velocity_top_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "velocity", "--top", "2147483647"])

    def test_velocity_max_commits_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "velocity", "--max-commits", "2147483647"])

    def test_entangle_top_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "entangle", "--top", "2147483647"])

    def test_entangle_max_commits_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "entangle", "--max-commits", "2147483647"])

    def test_entangle_min_co_changes_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "entangle", "--min-co-changes", "2147483647"])

    def test_find_symbol_limit_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(
            root,
            ["code", "find-symbol", "--name", "alpha", "--limit", "2147483647"],
        )

    def test_dead_workers_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        # This test uses a smaller extreme value than the max-int default, so
        # it names that value explicitly for the sanity check in _check.
        self._check(
            root, ["code", "dead", "--workers", "99999"], huge_value="99999"
        )

    def test_codemap_top_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "codemap", "--top", "2147483647"])

    def test_cat_context_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(
            root,
            ["code", "cat", "src/module.py::alpha", "--context", "2147483647"],
        )

    def test_detect_refactor_max_commits_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "detect-refactor", "--max-commits", "2147483647"])

    def test_blame_max_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(
            root, ["code", "blame", "src/module.py::alpha", "--max", "2147483647"]
        )
535
536
537 # ---------------------------------------------------------------------------
538 # § 3 — Output-path traversal (docs_cmd --output)
539 # ---------------------------------------------------------------------------
540
class TestOutputPathTraversal:
    """docs_cmd --output must not write files outside the repo root.

    Containment is checked on path components (``root in path.parents``)
    rather than on string prefixes, so a sibling such as ``<root>-evil`` is
    never mistaken for a location inside ``<root>``.
    """

    def test_absolute_path_rejected(self, tmp_path: pathlib.Path) -> None:
        """An absolute --output path that escapes the repo must fail."""
        root, _ = _ansi_repo(tmp_path)
        outside = tmp_path.parent / "escaped_output.txt"
        r = runner.invoke(
            cli,
            ["code", "docs", "generate", "--output", str(outside)],
            env=_env(root),
        )
        # Either the command rejects the path (exit_code != 0) or the file
        # was never written outside the repo root.
        if r.exit_code == 0:
            assert not outside.exists(), (
                "docs --output wrote a file outside the repo root — "
                "validate_output_path guard is missing"
            )

    def test_dotdot_traversal_rejected(self, tmp_path: pathlib.Path) -> None:
        """../escape.txt must not land outside the repo root."""
        root, _ = _ansi_repo(tmp_path)
        r = runner.invoke(
            cli,
            ["code", "docs", "generate", "--output", "../../escape.txt"],
            env=_env(root),
        )
        # Two levels above root can never be inside root, so the only
        # acceptable outcome after a successful run is that nothing exists
        # at the escaped location.
        escaped = (root / "../../escape.txt").resolve()
        if r.exit_code == 0:
            assert not escaped.exists(), (
                "docs --output allowed ../ traversal out of repo root"
            )

    def test_safe_relative_path_allowed(self, tmp_path: pathlib.Path) -> None:
        """A relative path inside the repo should succeed (or exit cleanly)."""
        root, _ = _ansi_repo(tmp_path)
        runner.invoke(
            cli,
            ["code", "docs", "generate", "--output", "out/docs.md"],
            env=_env(root),
        )
        # We don't assert exit_code here — the command may legitimately fail
        # (e.g. no doc-ci config), but it must NOT write to a path outside root.
        out_file = root / "out" / "docs.md"
        if out_file.exists():
            # resolve() follows symlinks, so this catches an ``out`` entry
            # that redirects outside the repository.
            assert root.resolve() in out_file.resolve().parents
588
589
590 # ---------------------------------------------------------------------------
591 # § 4 — Sanitize_display unit contract
592 #
593 # Verify that the sanitize_display primitive itself correctly handles the
594 # OSC-52 payload used in §1 so that when commands adopt it, the guarantee
595 # is sound.
596 # ---------------------------------------------------------------------------
597
class TestSanitizeDisplayContract:
    """sanitize_display must strip ESC (0x1B) and BEL (0x07) unconditionally.

    Plain text, newlines and tabs must pass through unchanged.
    """

    @staticmethod
    def _clean(text: str) -> str:
        """Import ``sanitize_display`` lazily and apply it to *text*."""
        from muse.core.validation import sanitize_display

        return sanitize_display(text)

    def test_osc52_stripped(self) -> None:
        cleaned = self._clean(_ANSI_PAYLOAD)
        assert _ANSI_MARKER not in cleaned
        assert "\x07" not in cleaned

    def test_csi_color_stripped(self) -> None:
        cleaned = self._clean("\x1b[31mRED\x1b[0m")
        assert _ANSI_MARKER not in cleaned

    def test_plain_text_preserved(self) -> None:
        sample = "hello world 123 αβγ"
        assert self._clean(sample) == sample

    def test_newline_and_tab_preserved(self) -> None:
        sample = "line1\n\tindented\n"
        assert self._clean(sample) == sample

    def test_null_byte_stripped(self) -> None:
        cleaned = self._clean("null\x00byte")
        assert "\x00" not in cleaned

    def test_bel_stripped(self) -> None:
        cleaned = self._clean("ring\x07bell")
        assert "\x07" not in cleaned

    def test_hyperlink_osc8_stripped(self) -> None:
        """OSC 8 hyperlink injection must be neutralised."""
        cleaned = self._clean(
            "\x1b]8;;https://malicious.example\x07click\x1b]8;;\x07"
        )
        assert _ANSI_MARKER not in cleaned
        assert "\x07" not in cleaned
636
637
638 # ---------------------------------------------------------------------------
639 # § 5 — clamp_int / clamp_natural unit contract
640 # ---------------------------------------------------------------------------
641
class TestClampNatural:
    """clamp_natural must accept [0, max_val] and reject anything outside.

    In-range values are returned unchanged; out-of-range values raise
    ``ValueError``.
    """

    @staticmethod
    def _clamp(value: int, max_val: int) -> int:
        """Import ``clamp_natural`` lazily and call it positionally."""
        from muse.core.validation import clamp_natural

        return clamp_natural(value, max_val)

    def test_value_in_range(self) -> None:
        assert self._clamp(50, 100) == 50

    def test_zero_allowed(self) -> None:
        assert self._clamp(0, 100) == 0

    def test_max_val_allowed(self) -> None:
        assert self._clamp(100, 100) == 100

    def test_negative_rejected(self) -> None:
        # The error text is expected to contain the word "value".
        with pytest.raises(ValueError, match="value"):
            self._clamp(-1, 100)

    def test_above_max_rejected(self) -> None:
        with pytest.raises(ValueError):
            self._clamp(101, 100)

    def test_maxint_rejected(self) -> None:
        with pytest.raises(ValueError):
            self._clamp(2_147_483_647, 10_000)
671
672
673 # ---------------------------------------------------------------------------
674 # § 6 — validate_output_path unit contract
675 # ---------------------------------------------------------------------------
676
class TestValidateOutputPath:
    """validate_output_path must confine the resolved path to the repo root.

    Assertions compare path components, not raw strings, so they are immune
    to sibling directories sharing a prefix and to the platform's path
    separator.
    """

    @staticmethod
    def _validated(candidate: str, root: pathlib.Path) -> pathlib.Path:
        """Call ``validate_output_path`` and return its result as a Path."""
        from muse.core.validation import validate_output_path

        return pathlib.Path(validate_output_path(candidate, root))

    def test_relative_path_inside_root(self, tmp_path: pathlib.Path) -> None:
        result = self._validated("out/report.md", tmp_path)
        assert tmp_path.resolve() in result.resolve().parents

    def test_dotdot_rejected(self, tmp_path: pathlib.Path) -> None:
        with pytest.raises(ValueError, match="traversal"):
            self._validated("../../etc/passwd", tmp_path)

    def test_absolute_outside_root_rejected(self, tmp_path: pathlib.Path) -> None:
        with pytest.raises(ValueError, match="traversal"):
            self._validated("/etc/cron.d/malicious", tmp_path)

    def test_nested_relative_allowed(self, tmp_path: pathlib.Path) -> None:
        result = self._validated("a/b/c/report.txt", tmp_path)
        # as_posix() normalises separators so the check also holds on Windows.
        assert "a/b/c/report.txt" in result.as_posix()
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 20 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 28 days ago