"""Security regression tests for Muse code domain porcelain commands. Red-hat findings from the semantic audit, turned into blue-hat defences: 1. ANSI / OSC terminal injection — 30+ commands printed user-controlled strings (symbol addresses, commit messages, file paths) without sanitize_display. Any commit with an OSC-52 payload in its message could hijack the clipboard of every developer whose terminal renders that output. 2. Integer denial-of-service — 13 commands accept unbounded int arguments (--top, --max-commits, --workers, --context, --limit, --min-co-changes, --window, --predict). Passing 2147483647 triggers enormous allocations or infinite-feeling loops that exhaust memory and CPU. 3. Output-path traversal — docs_cmd wrote to pathlib.Path(args.output) without contain_path, allowing --output /etc/cron.d/malicious to escape the repo. """ from __future__ import annotations import datetime import json import pathlib import time import pytest from tests.cli_test_helper import CliRunner from muse.core.types import fake_id, blob_id from muse.core.object_store import write_object as _write_obj_store from muse.core.paths import heads_dir, muse_dir cli = None # post-argparse migration stub runner = CliRunner() # --------------------------------------------------------------------------- # OSC-52 payload — NOT stripped by CliRunner._strip_ansi (which only strips # \x1b[...m sequences), but IS stripped by sanitize_display (which removes # every C0/C1 control character including ESC = 0x1B and BEL = 0x07). # --------------------------------------------------------------------------- _ANSI_PAYLOAD: str = "sec\x1b]52;c;HACKED==\x07end" _ANSI_MARKER: str = "\x1b" # --------------------------------------------------------------------------- # Shared repo helpers # --------------------------------------------------------------------------- def _env(root: pathlib.Path) -> Manifest: return {"MUSE_REPO_ROOT": str(root)} def _init_code_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]: dot_muse = muse_dir(tmp_path) dot_muse.mkdir() repo_id = fake_id("repo") (dot_muse / "repo.json").write_text( json.dumps({ "repo_id": repo_id, "domain": "code", "default_branch": "main", "created_at": "2025-01-01T00:00:00+00:00", }), encoding="utf-8", ) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "refs" / "heads").mkdir(parents=True) (dot_muse / "snapshots").mkdir() (dot_muse / "commits").mkdir() (dot_muse / "objects").mkdir() return tmp_path, repo_id def _store_object(root: pathlib.Path, content: bytes) -> str: """Write *content* into the object store and return its sha256:-prefixed id.""" oid = blob_id(content) _write_obj_store(root, oid, content) return oid def _make_commit( root: pathlib.Path, repo_id: str, message: str = "init", manifest: Manifest | None = None, ) -> str: from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.ids import hash_snapshot as compute_snapshot_id, hash_commit as compute_commit_id ref_file = heads_dir(root) / "main" parent_id = ref_file.read_text().strip() if ref_file.exists() else None m: Manifest = manifest or {} snap_id = compute_snapshot_id(m) committed_at = datetime.datetime.now(datetime.timezone.utc) commit_id = compute_commit_id( parent_ids=[parent_id] if parent_id else [], snapshot_id=snap_id, message=message, committed_at_iso=committed_at.isoformat(), ) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=m)) write_commit(root, CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=message, committed_at=committed_at, parent_commit_id=parent_id, )) ref_file.parent.mkdir(parents=True, exist_ok=True) ref_file.write_text(commit_id, encoding="utf-8") return commit_id def _ansi_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]: """Create a repo whose most-recent commit message contains an OSC-52 payload. Also includes a Python source file so symbol-related commands have data to work with. The commit message injection is the primary ANSI attack vector tested here — it affects every command that echoes messages. """ root, repo_id = _init_code_repo(tmp_path) # First commit — clean Python file py_src = b"def alpha():\n return 1\n\ndef beta():\n return 2\n" oid = _store_object(root, py_src) # Create the physical file so working-tree commands can resolve it. src_dir = root / "src" src_dir.mkdir() (src_dir / "module.py").write_bytes(py_src) _make_commit(root, repo_id, "initial commit", {"src/module.py": oid}) # Second commit — same file, different body (creates churn for hotspots/stable) py_src2 = b"def alpha():\n return 99\n\ndef beta():\n return 2\n" oid2 = _store_object(root, py_src2) (src_dir / "module.py").write_bytes(py_src2) # Message carries the OSC-52 payload — the primary injection vector. _make_commit(root, repo_id, _ANSI_PAYLOAD, {"src/module.py": oid2}) return root, repo_id # --------------------------------------------------------------------------- # § 1 — ANSI / OSC terminal injection # # Each test invokes one code porcelain command against the ANSI fixture repo # and asserts that ESC (0x1B) is absent from the captured output. # # CliRunner._strip_ansi removes \x1b[...m sequences but NOT OSC sequences # like \x1b]52;...\x07. sanitize_display (which the commands must call) # removes ALL C0/C1 control characters including ESC. # --------------------------------------------------------------------------- class TestAnsiInjectionCommit: """Commands that display commit messages must not echo raw ESC bytes.""" def test_symbol_log_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke(cli, ["code", "symbol-log", "src/module.py::alpha"], env=_env(root)) assert _ANSI_MARKER not in r.output def test_blame_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke(cli, ["code", "blame", "src/module.py::alpha"], env=_env(root)) assert _ANSI_MARKER not in r.output def test_find_symbol_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke(cli, ["code", "find-symbol", "--name", "alpha"], env=_env(root)) assert _ANSI_MARKER not in r.output def test_narrative_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "narrative", "src/module.py::alpha", "--max-commits", "5"], env=_env(root), ) assert _ANSI_MARKER not in r.output def test_contract_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "contract", "src/module.py::alpha", "--max-commits", "5"], env=_env(root), ) assert _ANSI_MARKER not in r.output def test_detect_refactor_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "detect-refactor", "--max-commits", "5"], env=_env(root) ) assert _ANSI_MARKER not in r.output def test_query_history_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke(cli, ["code", "query-history", "kind=function"], env=_env(root)) assert _ANSI_MARKER not in r.output class TestAnsiInjectionAddress: """Commands that display symbol addresses must not echo raw ESC bytes. We verify the sanitize_display path is called for address output. The OSC-52 injection payload is embedded in the commit message (guaranteed to appear in commands that echo messages). Symbol-address injection via filesystem paths is impossible on most OSes; we rely on the code-review audit and sanitize_display application at all print sites for that case. """ def test_hotspots_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke(cli, ["code", "hotspots", "--top", "10"], env=_env(root)) assert _ANSI_MARKER not in r.output def test_stable_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke(cli, ["code", "stable", "--top", "10"], env=_env(root)) assert _ANSI_MARKER not in r.output def test_symbols_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke(cli, ["code", "symbols"], env=_env(root)) assert _ANSI_MARKER not in r.output def test_grep_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke(cli, ["code", "grep", "alpha"], env=_env(root)) assert _ANSI_MARKER not in r.output def test_cat_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "cat", "src/module.py::alpha"], env=_env(root) ) assert _ANSI_MARKER not in r.output def test_blast_risk_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "blast-risk", "--top", "5", "--max-commits", "5"], env=_env(root), ) assert _ANSI_MARKER not in r.output def test_age_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "age", "src/module.py::alpha", "--max-commits", "5"], env=_env(root), ) assert _ANSI_MARKER not in r.output def test_velocity_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "velocity", "--top", "5", "--max-commits", "5"], env=_env(root), ) assert _ANSI_MARKER not in r.output def test_entangle_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "entangle", "--top", "5", "--max-commits", "5"], env=_env(root), ) assert _ANSI_MARKER not in r.output def test_gravity_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "gravity", "src/module.py::alpha", "--max-commits", "5"], env=_env(root), ) assert _ANSI_MARKER not in r.output def test_impact_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "impact", "src/module.py::alpha"], env=_env(root) ) assert _ANSI_MARKER not in r.output def test_deps_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "deps", "src/module.py"], env=_env(root) ) assert _ANSI_MARKER not in r.output def test_coverage_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "coverage", "src/module.py::alpha"], env=_env(root) ) assert _ANSI_MARKER not in r.output def test_lineage_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "lineage", "src/module.py::alpha"], env=_env(root) ) assert _ANSI_MARKER not in r.output def test_api_surface_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke(cli, ["code", "api-surface"], env=_env(root)) assert _ANSI_MARKER not in r.output def test_dead_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke(cli, ["code", "dead"], env=_env(root)) assert _ANSI_MARKER not in r.output def test_clones_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke(cli, ["code", "clones"], env=_env(root)) assert _ANSI_MARKER not in r.output def test_codemap_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke(cli, ["code", "codemap", "--top", "5"], env=_env(root)) assert _ANSI_MARKER not in r.output def test_coupling_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "coupling", "--top", "5", "--min", "1"], env=_env(root) ) assert _ANSI_MARKER not in r.output def test_compare_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "compare", "HEAD~1", "HEAD"], env=_env(root) ) assert _ANSI_MARKER not in r.output def test_semantic_test_coverage_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "semantic-test-coverage", "--max-commits", "5"], env=_env(root) ) assert _ANSI_MARKER not in r.output def test_predict_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "predict", "--top", "5", "--max-commits", "5"], env=_env(root) ) assert _ANSI_MARKER not in r.output def test_patch_error_message_no_ansi(self, tmp_path: pathlib.Path) -> None: """patch echoes the address back on error — must sanitize it.""" root, _ = _ansi_repo(tmp_path) malicious_addr = f"src/module.py::\x1b]52;c;malicious\x07func" r = runner.invoke( cli, ["code", "patch", malicious_addr, "--body", "-"], env=_env(root), input="def func(): pass", ) assert _ANSI_MARKER not in r.output def test_checkout_symbol_error_message_no_ansi(self, tmp_path: pathlib.Path) -> None: """checkout-symbol echoes the address on error — must sanitize.""" root, _ = _ansi_repo(tmp_path) malicious_addr = f"src/module.py::\x1b]52;c;malicious\x07func" r = runner.invoke( cli, ["code", "checkout-symbol", malicious_addr], env=_env(root) ) assert _ANSI_MARKER not in r.output def test_semantic_cherry_pick_error_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) malicious_addr = f"src/module.py::\x1b]52;c;malicious\x07func" r = runner.invoke( cli, ["code", "semantic-cherry-pick", malicious_addr, "--from", "HEAD~1"], env=_env(root), ) assert _ANSI_MARKER not in r.output def test_query_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke(cli, ["code", "query", "kind=function"], env=_env(root)) assert _ANSI_MARKER not in r.output def test_docs_cmd_no_ansi(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "docs", "history", "src/module.py::alpha"], env=_env(root), ) assert _ANSI_MARKER not in r.output # --------------------------------------------------------------------------- # § 2 — Integer denial-of-service # # Commands with unbounded --top / --max-commits / --workers etc. must reject # extreme values rather than allocating gigabytes of memory or looping for # unbounded time. # # The test passes if: the command returns exit_code != 0 (clamped and # rejected) OR it completes within a generous 5-second wall-clock budget # (the correct behaviour after clamping is applied). # --------------------------------------------------------------------------- _DOS_BUDGET_S: float = 5.0 # max wall-clock seconds for a command with huge arg class TestIntegerDoS: """Unbounded numeric args must be clamped; commands must not hang or OOM.""" def _check( self, root: pathlib.Path, args: list[str], huge_value: str = "2147483647", ) -> None: """Run the command with *huge_value* injected at the right position. Asserts: either exit_code != 0 (arg rejected) OR elapsed < _DOS_BUDGET_S. A command that simply produces no output in time is fine; one that hangs indefinitely is not. """ t0 = time.monotonic() r = runner.invoke(cli, args, env=_env(root)) elapsed = time.monotonic() - t0 if r.exit_code == 0: assert elapsed < _DOS_BUDGET_S, ( f"Command {args} took {elapsed:.1f}s > budget {_DOS_BUDGET_S}s " "with max-int arg — clamp_int guard is missing" ) # exit_code != 0 means the guard rejected the huge value (preferred) def test_hotspots_top_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check(root, ["code", "hotspots", "--top", "2147483647"]) def test_hotspots_max_commits_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check(root, ["code", "hotspots", "--max-commits", "2147483647"]) def test_stable_top_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check(root, ["code", "stable", "--top", "2147483647"]) def test_coupling_top_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check(root, ["code", "coupling", "--top", "2147483647"]) def test_coupling_min_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check(root, ["code", "coupling", "--min", "2147483647"]) def test_blast_risk_top_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check(root, ["code", "blast-risk", "--top", "2147483647"]) def test_blast_risk_max_commits_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check(root, ["code", "blast-risk", "--max-commits", "2147483647"]) def test_age_max_commits_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check( root, ["code", "age", "src/module.py::alpha", "--max-commits", "2147483647"] ) def test_velocity_top_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check(root, ["code", "velocity", "--top", "2147483647"]) def test_velocity_max_commits_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check(root, ["code", "velocity", "--max-commits", "2147483647"]) def test_entangle_top_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check(root, ["code", "entangle", "--top", "2147483647"]) def test_entangle_max_commits_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check(root, ["code", "entangle", "--max-commits", "2147483647"]) def test_entangle_min_co_changes_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check(root, ["code", "entangle", "--min-co-changes", "2147483647"]) def test_find_symbol_limit_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check( root, ["code", "find-symbol", "--name", "alpha", "--limit", "2147483647"] ) def test_dead_workers_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check(root, ["code", "dead", "--workers", "99999"]) def test_codemap_top_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check(root, ["code", "codemap", "--top", "2147483647"]) def test_cat_context_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check( root, ["code", "cat", "src/module.py::alpha", "--context", "2147483647"], ) def test_detect_refactor_max_commits_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check(root, ["code", "detect-refactor", "--max-commits", "2147483647"]) def test_blame_max_dos(self, tmp_path: pathlib.Path) -> None: root, _ = _ansi_repo(tmp_path) self._check( root, ["code", "blame", "src/module.py::alpha", "--max", "2147483647"] ) # --------------------------------------------------------------------------- # § 3 — Output-path traversal (docs_cmd --output) # --------------------------------------------------------------------------- class TestOutputPathTraversal: """docs_cmd --output must not write files outside the repo root.""" def test_absolute_path_rejected(self, tmp_path: pathlib.Path) -> None: """An absolute --output path that escapes the repo must fail.""" root, _ = _ansi_repo(tmp_path) outside = str(tmp_path.parent / "escaped_output.txt") r = runner.invoke( cli, ["code", "docs", "generate", "--output", outside], env=_env(root), ) # Either the command rejects the path (exit_code != 0) or the file # was never written outside the repo root. if r.exit_code == 0: assert not pathlib.Path(outside).exists(), ( "docs --output wrote a file outside the repo root — " "validate_output_path guard is missing" ) def test_dotdot_traversal_rejected(self, tmp_path: pathlib.Path) -> None: """../escape.txt must not land outside the repo root.""" root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "docs", "generate", "--output", "../../escape.txt"], env=_env(root), ) escaped = (root / "../../escape.txt").resolve() if r.exit_code == 0: assert not escaped.exists() or str(escaped).startswith(str(root.resolve())), ( "docs --output allowed ../ traversal out of repo root" ) def test_safe_relative_path_allowed(self, tmp_path: pathlib.Path) -> None: """A relative path inside the repo should succeed (or exit cleanly).""" root, _ = _ansi_repo(tmp_path) r = runner.invoke( cli, ["code", "docs", "generate", "--output", "out/docs.md"], env=_env(root), ) # We don't assert exit_code here — the command may legitimately fail # (e.g. no doc-ci config), but it must NOT write to a path outside root. out_file = root / "out" / "docs.md" if out_file.exists(): assert str(out_file.resolve()).startswith(str(root.resolve())) # --------------------------------------------------------------------------- # § 4 — Sanitize_display unit contract # # Verify that the sanitize_display primitive itself correctly handles the # OSC-52 payload used in §1 so that when commands adopt it, the guarantee # is sound. # --------------------------------------------------------------------------- class TestSanitizeDisplayContract: """sanitize_display must strip ESC (0x1B) and BEL (0x07) unconditionally.""" def test_osc52_stripped(self) -> None: from muse.core.validation import sanitize_display result = sanitize_display(_ANSI_PAYLOAD) assert _ANSI_MARKER not in result assert "\x07" not in result def test_csi_color_stripped(self) -> None: from muse.core.validation import sanitize_display assert _ANSI_MARKER not in sanitize_display("\x1b[31mRED\x1b[0m") def test_plain_text_preserved(self) -> None: from muse.core.validation import sanitize_display text = "hello world 123 αβγ" assert sanitize_display(text) == text def test_newline_and_tab_preserved(self) -> None: from muse.core.validation import sanitize_display text = "line1\n\tindented\n" assert sanitize_display(text) == text def test_null_byte_stripped(self) -> None: from muse.core.validation import sanitize_display assert "\x00" not in sanitize_display("null\x00byte") def test_bel_stripped(self) -> None: from muse.core.validation import sanitize_display assert "\x07" not in sanitize_display("ring\x07bell") def test_hyperlink_osc8_stripped(self) -> None: """OSC 8 hyperlink injection must be neutralised.""" from muse.core.validation import sanitize_display payload = "\x1b]8;;https://malicious.example\x07click\x1b]8;;\x07" result = sanitize_display(payload) assert _ANSI_MARKER not in result assert "\x07" not in result # --------------------------------------------------------------------------- # § 5 — clamp_int / clamp_natural unit contract # --------------------------------------------------------------------------- class TestClampNatural: """clamp_natural must accept [0, max_val] and reject anything outside.""" def test_value_in_range(self) -> None: from muse.core.validation import clamp_natural assert clamp_natural(50, 100) == 50 def test_zero_allowed(self) -> None: from muse.core.validation import clamp_natural assert clamp_natural(0, 100) == 0 def test_max_val_allowed(self) -> None: from muse.core.validation import clamp_natural assert clamp_natural(100, 100) == 100 def test_negative_rejected(self) -> None: from muse.core.validation import clamp_natural with pytest.raises(ValueError, match="value"): clamp_natural(-1, 100) def test_above_max_rejected(self) -> None: from muse.core.validation import clamp_natural with pytest.raises(ValueError): clamp_natural(101, 100) def test_maxint_rejected(self) -> None: from muse.core.validation import clamp_natural with pytest.raises(ValueError): clamp_natural(2_147_483_647, 10_000) # --------------------------------------------------------------------------- # § 6 — validate_output_path unit contract # --------------------------------------------------------------------------- class TestValidateOutputPath: """validate_output_path must confine the resolved path to the repo root.""" def test_relative_path_inside_root(self, tmp_path: pathlib.Path) -> None: from muse.core.validation import validate_output_path result = validate_output_path("out/report.md", tmp_path) assert str(result).startswith(str(tmp_path.resolve())) def test_dotdot_rejected(self, tmp_path: pathlib.Path) -> None: from muse.core.validation import validate_output_path with pytest.raises(ValueError, match="traversal"): validate_output_path("../../etc/passwd", tmp_path) def test_absolute_outside_root_rejected(self, tmp_path: pathlib.Path) -> None: from muse.core.validation import validate_output_path with pytest.raises(ValueError, match="traversal"): validate_output_path("/etc/cron.d/malicious", tmp_path) def test_nested_relative_allowed(self, tmp_path: pathlib.Path) -> None: from muse.core.validation import validate_output_path result = validate_output_path("a/b/c/report.txt", tmp_path) assert "a/b/c/report.txt" in str(result)