test_security_code_porcelain.py
python
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
20 days ago
| 1 | """Security regression tests for Muse code domain porcelain commands. |
| 2 | |
| 3 | Red-hat findings from the semantic audit, turned into blue-hat defences: |
| 4 | |
| 5 | 1. ANSI / OSC terminal injection — 30+ commands printed user-controlled strings |
| 6 | (symbol addresses, commit messages, file paths) without sanitize_display. |
| 7 | Any commit with an OSC-52 payload in its message could hijack the clipboard |
| 8 | of every developer whose terminal renders that output. |
| 9 | |
| 10 | 2. Integer denial-of-service — 13 commands accept unbounded int arguments |
| 11 | (--top, --max-commits, --workers, --context, --limit, --min-co-changes, |
| 12 | --window, --predict). Passing 2147483647 triggers enormous allocations or |
| 13 | infinite-feeling loops that exhaust memory and CPU. |
| 14 | |
| 15 | 3. Output-path traversal — docs_cmd wrote to pathlib.Path(args.output) without |
| 16 | contain_path, allowing --output /etc/cron.d/malicious to escape the repo. |
| 17 | """ |
| 18 | |
| 19 | from __future__ import annotations |
| 20 | |
| 21 | import datetime |
| 22 | import json |
| 23 | import pathlib |
| 24 | import time |
| 25 | |
| 26 | import pytest |
| 27 | |
| 28 | from tests.cli_test_helper import CliRunner |
| 29 | from muse.core.types import fake_id, blob_id |
| 30 | from muse.core.object_store import write_object as _write_obj_store |
| 31 | from muse.core.paths import heads_dir, muse_dir |
| 32 | |
# Placeholder passed as the first argument of every runner.invoke call below.
cli = None # post-argparse migration stub
# Single runner instance shared by all tests in this module.
runner = CliRunner()

# ---------------------------------------------------------------------------
# OSC-52 payload — NOT stripped by CliRunner._strip_ansi (which only strips
# \x1b[...m sequences), but IS stripped by sanitize_display (which removes
# every C0/C1 control character including ESC = 0x1B and BEL = 0x07).
# ---------------------------------------------------------------------------
# Used as the message of the second fixture commit and as sanitize_display input.
_ANSI_PAYLOAD: str = "sec\x1b]52;c;HACKED==\x07end"
# The ESC byte; tests assert that it is absent from captured output.
_ANSI_MARKER: str = "\x1b"
| 43 | |
| 44 | |
| 45 | # --------------------------------------------------------------------------- |
| 46 | # Shared repo helpers |
| 47 | # --------------------------------------------------------------------------- |
| 48 | |
def _env(root: pathlib.Path) -> dict[str, str]:
    """Return the environment overrides that point the CLI at *root*.

    Args:
        root: Directory to expose as the repository root.

    Returns:
        A one-entry mapping with ``MUSE_REPO_ROOT`` set to ``str(root)``,
        suitable for the ``env=`` argument of ``runner.invoke``.
    """
    # The previous annotation (``Manifest``) was never imported and did not
    # describe this value: it is a plain str -> str environment mapping.
    return {"MUSE_REPO_ROOT": str(root)}
| 51 | |
| 52 | |
def _init_code_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Lay out an empty ``code``-domain repository skeleton under *tmp_path*.

    Creates the metadata directory returned by ``muse_dir``, writes
    ``repo.json`` and ``HEAD`` into it, and creates the ``refs/heads``,
    ``snapshots``, ``commits`` and ``objects`` sub-directories.

    Returns:
        ``(tmp_path, repo_id)`` where ``repo_id`` comes from ``fake_id("repo")``.
    """
    meta_dir = muse_dir(tmp_path)
    meta_dir.mkdir()

    repo_id = fake_id("repo")
    repo_meta = {
        "repo_id": repo_id,
        "domain": "code",
        "default_branch": "main",
        "created_at": "2025-01-01T00:00:00+00:00",
    }
    (meta_dir / "repo.json").write_text(json.dumps(repo_meta), encoding="utf-8")
    (meta_dir / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

    # refs/heads needs its intermediate "refs" directory; the rest are flat.
    (meta_dir / "refs" / "heads").mkdir(parents=True)
    for store_name in ("snapshots", "commits", "objects"):
        (meta_dir / store_name).mkdir()

    return tmp_path, repo_id
| 72 | |
| 73 | |
def _store_object(root: pathlib.Path, content: bytes) -> str:
    """Persist *content* in the object store under *root*.

    Returns:
        The sha256:-prefixed object id computed by ``blob_id`` for *content*.
    """
    object_id = blob_id(content)
    _write_obj_store(root, object_id, content)
    return object_id
| 79 | |
| 80 | |
def _make_commit(
    root: pathlib.Path,
    repo_id: str,
    message: str = "init",
    manifest: dict[str, str] | None = None,
) -> str:
    """Write a snapshot and a commit on ``main`` and advance the branch ref.

    Args:
        root: Repository root the records are written under.
        repo_id: Repository id; accepted for call-site symmetry, not used here.
        message: Commit message stored verbatim in the commit record.
        manifest: Mapping of repo-relative path to object id. ``None`` (or an
            empty mapping) produces a commit with an empty manifest.

    Returns:
        The id of the newly written commit.
    """
    from muse.core.commits import (
        CommitRecord,
        write_commit,
    )
    from muse.core.snapshots import (
        SnapshotRecord,
        write_snapshot,
    )
    from muse.core.ids import hash_snapshot as compute_snapshot_id, hash_commit as compute_commit_id

    ref_file = heads_dir(root) / "main"
    # Read with the same encoding the ref is written with below, rather than
    # relying on the platform default.
    parent_id = (
        ref_file.read_text(encoding="utf-8").strip() if ref_file.exists() else None
    )
    m: dict[str, str] = manifest or {}
    snap_id = compute_snapshot_id(m)
    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = compute_commit_id(
        parent_ids=[parent_id] if parent_id else [],
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )
    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=m))
    write_commit(root, CommitRecord(
        commit_id=commit_id,
        branch="main",
        snapshot_id=snap_id,
        message=message,
        committed_at=committed_at,
        parent_commit_id=parent_id,
    ))
    # Point refs/heads/main at the new commit so the next call chains onto it.
    ref_file.parent.mkdir(parents=True, exist_ok=True)
    ref_file.write_text(commit_id, encoding="utf-8")
    return commit_id
| 120 | |
| 121 | |
def _ansi_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Build a two-commit repo whose latest commit message is the OSC-52 payload.

    Both commits track ``src/module.py``; the second changes the body of
    ``alpha`` so history-based commands (hotspots, stable, ...) see churn.
    The file is also written to the working tree so working-tree commands
    can resolve it. The commit message is the primary injection vector
    exercised by the tests that use this fixture.

    Returns:
        ``(root, repo_id)`` as produced by ``_init_code_repo``.
    """
    root, repo_id = _init_code_repo(tmp_path)

    source_dir = root / "src"
    source_dir.mkdir()
    module_file = source_dir / "module.py"

    # (file content, commit message) for each revision, oldest first.
    revisions = (
        (b"def alpha():\n return 1\n\ndef beta():\n return 2\n", "initial commit"),
        (b"def alpha():\n return 99\n\ndef beta():\n return 2\n", _ANSI_PAYLOAD),
    )
    for body, message in revisions:
        object_id = _store_object(root, body)
        module_file.write_bytes(body)
        _make_commit(root, repo_id, message, {"src/module.py": object_id})

    return root, repo_id
| 148 | |
| 149 | |
| 150 | # --------------------------------------------------------------------------- |
| 151 | # § 1 — ANSI / OSC terminal injection |
| 152 | # |
| 153 | # Each test invokes one code porcelain command against the ANSI fixture repo |
| 154 | # and asserts that ESC (0x1B) is absent from the captured output. |
| 155 | # |
| 156 | # CliRunner._strip_ansi removes \x1b[...m sequences but NOT OSC sequences |
| 157 | # like \x1b]52;...\x07. sanitize_display (which the commands must call) |
| 158 | # removes ALL C0/C1 control characters including ESC. |
| 159 | # --------------------------------------------------------------------------- |
| 160 | |
class TestAnsiInjectionCommit:
    """Commands that display commit messages must not echo raw ESC bytes."""

    def _assert_no_escape(self, tmp_path: pathlib.Path, argv: list[str]) -> None:
        """Run *argv* against the ANSI fixture repo and require ESC-free output."""
        root, _ = _ansi_repo(tmp_path)
        result = runner.invoke(cli, argv, env=_env(root))
        assert _ANSI_MARKER not in result.output

    def test_symbol_log_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "symbol-log", "src/module.py::alpha"])

    def test_blame_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "blame", "src/module.py::alpha"])

    def test_find_symbol_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "find-symbol", "--name", "alpha"])

    def test_narrative_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(
            tmp_path,
            ["code", "narrative", "src/module.py::alpha", "--max-commits", "5"],
        )

    def test_contract_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(
            tmp_path,
            ["code", "contract", "src/module.py::alpha", "--max-commits", "5"],
        )

    def test_detect_refactor_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(
            tmp_path, ["code", "detect-refactor", "--max-commits", "5"]
        )

    def test_query_history_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "query-history", "kind=function"])
| 208 | |
| 209 | |
class TestAnsiInjectionAddress:
    """Commands that display symbol addresses must not echo raw ESC bytes.

    We verify the sanitize_display path is called for address output. The
    OSC-52 injection payload is embedded in the commit message (guaranteed
    to appear in commands that echo messages). Symbol-address injection via
    filesystem paths is impossible on most OSes; we rely on the code-review
    audit and sanitize_display application at all print sites for that case.
    """

    # Address with an OSC-52 sequence in the symbol part. Used by the
    # error-path tests, where the command echoes the address it was given.
    # Plain literal: there are no placeholders, so no f-prefix is needed.
    _MALICIOUS_ADDR = "src/module.py::\x1b]52;c;malicious\x07func"

    def _assert_no_escape(
        self, tmp_path: pathlib.Path, argv: list[str], **invoke_kwargs: str
    ) -> None:
        """Run *argv* against the ANSI fixture repo and require ESC-free output.

        Extra keyword arguments (e.g. ``input=``) are forwarded unchanged to
        ``runner.invoke``.
        """
        root, _ = _ansi_repo(tmp_path)
        result = runner.invoke(cli, argv, env=_env(root), **invoke_kwargs)
        assert _ANSI_MARKER not in result.output

    def test_hotspots_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "hotspots", "--top", "10"])

    def test_stable_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "stable", "--top", "10"])

    def test_symbols_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "symbols"])

    def test_grep_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "grep", "alpha"])

    def test_cat_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "cat", "src/module.py::alpha"])

    def test_blast_risk_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(
            tmp_path, ["code", "blast-risk", "--top", "5", "--max-commits", "5"]
        )

    def test_age_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(
            tmp_path, ["code", "age", "src/module.py::alpha", "--max-commits", "5"]
        )

    def test_velocity_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(
            tmp_path, ["code", "velocity", "--top", "5", "--max-commits", "5"]
        )

    def test_entangle_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(
            tmp_path, ["code", "entangle", "--top", "5", "--max-commits", "5"]
        )

    def test_gravity_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(
            tmp_path,
            ["code", "gravity", "src/module.py::alpha", "--max-commits", "5"],
        )

    def test_impact_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "impact", "src/module.py::alpha"])

    def test_deps_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "deps", "src/module.py"])

    def test_coverage_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "coverage", "src/module.py::alpha"])

    def test_lineage_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "lineage", "src/module.py::alpha"])

    def test_api_surface_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "api-surface"])

    def test_dead_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "dead"])

    def test_clones_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "clones"])

    def test_codemap_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "codemap", "--top", "5"])

    def test_coupling_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(
            tmp_path, ["code", "coupling", "--top", "5", "--min", "1"]
        )

    def test_compare_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "compare", "HEAD~1", "HEAD"])

    def test_semantic_test_coverage_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(
            tmp_path, ["code", "semantic-test-coverage", "--max-commits", "5"]
        )

    def test_predict_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(
            tmp_path, ["code", "predict", "--top", "5", "--max-commits", "5"]
        )

    def test_patch_error_message_no_ansi(self, tmp_path: pathlib.Path) -> None:
        """patch echoes the address back on error — must sanitize it."""
        self._assert_no_escape(
            tmp_path,
            ["code", "patch", self._MALICIOUS_ADDR, "--body", "-"],
            input="def func(): pass",
        )

    def test_checkout_symbol_error_message_no_ansi(self, tmp_path: pathlib.Path) -> None:
        """checkout-symbol echoes the address on error — must sanitize."""
        self._assert_no_escape(
            tmp_path, ["code", "checkout-symbol", self._MALICIOUS_ADDR]
        )

    def test_semantic_cherry_pick_error_no_ansi(self, tmp_path: pathlib.Path) -> None:
        """semantic-cherry-pick is given the malicious address — output must be ESC-free."""
        self._assert_no_escape(
            tmp_path,
            ["code", "semantic-cherry-pick", self._MALICIOUS_ADDR, "--from", "HEAD~1"],
        )

    def test_query_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(tmp_path, ["code", "query", "kind=function"])

    def test_docs_cmd_no_ansi(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_escape(
            tmp_path, ["code", "docs", "history", "src/module.py::alpha"]
        )
| 409 | |
| 410 | |
| 411 | # --------------------------------------------------------------------------- |
| 412 | # § 2 — Integer denial-of-service |
| 413 | # |
| 414 | # Commands with unbounded --top / --max-commits / --workers etc. must reject |
| 415 | # extreme values rather than allocating gigabytes of memory or looping for |
| 416 | # unbounded time. |
| 417 | # |
| 418 | # The test passes if: the command returns exit_code != 0 (clamped and |
| 419 | # rejected) OR it completes within a generous 5-second wall-clock budget |
| 420 | # (the correct behaviour after clamping is applied). |
| 421 | # --------------------------------------------------------------------------- |
| 422 | |
_DOS_BUDGET_S: float = 5.0  # max wall-clock seconds for a command with huge arg


class TestIntegerDoS:
    """Unbounded numeric args must be clamped; commands must not hang or OOM."""

    # Largest signed 32-bit integer, passed as the extreme value by most tests.
    _MAX_INT = "2147483647"

    def _check(
        self,
        root: pathlib.Path,
        args: list[str],
        huge_value: str = "2147483647",
    ) -> None:
        """Run *args* and require rejection or completion within the budget.

        Args:
            root: Repository root exposed to the command via ``_env``.
            args: Full argument vector; it must already contain *huge_value*.
            huge_value: The extreme value this test exercises. It is not
                injected into *args*; it is only used to verify that the
                caller really passed it, so a test cannot silently run with
                a harmless argument.

        Asserts: either exit_code != 0 (arg rejected) OR elapsed < _DOS_BUDGET_S.
        A command that simply produces no output in time is fine. Note that
        the time is measured after the command returns, so a command that
        never returns blocks the test instead of failing it.
        """
        assert huge_value in args, (
            f"Test for {args} does not pass the extreme value {huge_value!r}"
        )
        started = time.monotonic()
        result = runner.invoke(cli, args, env=_env(root))
        elapsed = time.monotonic() - started
        if result.exit_code == 0:
            assert elapsed < _DOS_BUDGET_S, (
                f"Command {args} took {elapsed:.1f}s > budget {_DOS_BUDGET_S}s "
                "with max-int arg — clamp_int guard is missing"
            )
        # exit_code != 0 means the guard rejected the huge value (preferred)

    def test_hotspots_top_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "hotspots", "--top", self._MAX_INT])

    def test_hotspots_max_commits_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "hotspots", "--max-commits", self._MAX_INT])

    def test_stable_top_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "stable", "--top", self._MAX_INT])

    def test_coupling_top_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "coupling", "--top", self._MAX_INT])

    def test_coupling_min_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "coupling", "--min", self._MAX_INT])

    def test_blast_risk_top_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "blast-risk", "--top", self._MAX_INT])

    def test_blast_risk_max_commits_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "blast-risk", "--max-commits", self._MAX_INT])

    def test_age_max_commits_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(
            root,
            ["code", "age", "src/module.py::alpha", "--max-commits", self._MAX_INT],
        )

    def test_velocity_top_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "velocity", "--top", self._MAX_INT])

    def test_velocity_max_commits_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "velocity", "--max-commits", self._MAX_INT])

    def test_entangle_top_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "entangle", "--top", self._MAX_INT])

    def test_entangle_max_commits_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "entangle", "--max-commits", self._MAX_INT])

    def test_entangle_min_co_changes_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "entangle", "--min-co-changes", self._MAX_INT])

    def test_find_symbol_limit_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(
            root,
            ["code", "find-symbol", "--name", "alpha", "--limit", self._MAX_INT],
        )

    def test_dead_workers_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        # This test uses a smaller extreme value, so it names it explicitly.
        self._check(root, ["code", "dead", "--workers", "99999"], huge_value="99999")

    def test_codemap_top_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "codemap", "--top", self._MAX_INT])

    def test_cat_context_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(
            root,
            ["code", "cat", "src/module.py::alpha", "--context", self._MAX_INT],
        )

    def test_detect_refactor_max_commits_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(root, ["code", "detect-refactor", "--max-commits", self._MAX_INT])

    def test_blame_max_dos(self, tmp_path: pathlib.Path) -> None:
        root, _ = _ansi_repo(tmp_path)
        self._check(
            root, ["code", "blame", "src/module.py::alpha", "--max", self._MAX_INT]
        )
| 535 | |
| 536 | |
| 537 | # --------------------------------------------------------------------------- |
| 538 | # § 3 — Output-path traversal (docs_cmd --output) |
| 539 | # --------------------------------------------------------------------------- |
| 540 | |
class TestOutputPathTraversal:
    """docs_cmd --output must not write files outside the repo root."""

    def test_absolute_path_rejected(self, tmp_path: pathlib.Path) -> None:
        """An absolute --output path that escapes the repo must not be written."""
        root, _ = _ansi_repo(tmp_path)
        outside = tmp_path.parent / "escaped_output.txt"
        runner.invoke(
            cli,
            ["code", "docs", "generate", "--output", str(outside)],
            env=_env(root),
        )
        try:
            # Checked regardless of exit code: a command that fails *after*
            # writing outside the repo is just as unsafe as one that succeeds.
            assert not outside.exists(), (
                "docs --output wrote a file outside the repo root — "
                "validate_output_path guard is missing"
            )
        finally:
            # Do not leave the escaped file behind for later tests or runs.
            if outside.exists():
                outside.unlink()

    def test_dotdot_traversal_rejected(self, tmp_path: pathlib.Path) -> None:
        """../escape.txt must not land outside the repo root."""
        root, _ = _ansi_repo(tmp_path)
        runner.invoke(
            cli,
            ["code", "docs", "generate", "--output", "../../escape.txt"],
            env=_env(root),
        )
        # Two levels above root can never be inside root, so existence alone
        # decides the outcome (the old "or startswith(root)" clause was dead).
        escaped = (root / "../../escape.txt").resolve()
        try:
            assert not escaped.exists(), (
                "docs --output allowed ../ traversal out of repo root"
            )
        finally:
            if escaped.exists():
                escaped.unlink()

    def test_safe_relative_path_allowed(self, tmp_path: pathlib.Path) -> None:
        """A relative path inside the repo should succeed (or exit cleanly)."""
        root, _ = _ansi_repo(tmp_path)
        runner.invoke(
            cli,
            ["code", "docs", "generate", "--output", "out/docs.md"],
            env=_env(root),
        )
        # We don't assert exit_code here — the command may legitimately fail
        # (e.g. no doc-ci config), but it must NOT write to a path outside root.
        out_file = root / "out" / "docs.md"
        if out_file.exists():
            # Component-wise containment: a string prefix test would also
            # accept a sibling such as "<root>-evil/out/docs.md".
            assert root.resolve() in out_file.resolve().parents
| 588 | |
| 589 | |
| 590 | # --------------------------------------------------------------------------- |
| 591 | # § 4 — Sanitize_display unit contract |
| 592 | # |
| 593 | # Verify that the sanitize_display primitive itself correctly handles the |
| 594 | # OSC-52 payload used in §1 so that when commands adopt it, the guarantee |
| 595 | # is sound. |
| 596 | # --------------------------------------------------------------------------- |
| 597 | |
class TestSanitizeDisplayContract:
    """sanitize_display must strip ESC (0x1B) and BEL (0x07) unconditionally."""

    @staticmethod
    def _sanitize(text: str) -> str:
        """Import sanitize_display at call time and apply it to *text*."""
        from muse.core.validation import sanitize_display
        return sanitize_display(text)

    def test_osc52_stripped(self) -> None:
        cleaned = self._sanitize(_ANSI_PAYLOAD)
        assert _ANSI_MARKER not in cleaned
        assert "\x07" not in cleaned

    def test_csi_color_stripped(self) -> None:
        cleaned = self._sanitize("\x1b[31mRED\x1b[0m")
        assert _ANSI_MARKER not in cleaned

    def test_plain_text_preserved(self) -> None:
        original = "hello world 123 αβγ"
        assert self._sanitize(original) == original

    def test_newline_and_tab_preserved(self) -> None:
        original = "line1\n\tindented\n"
        assert self._sanitize(original) == original

    def test_null_byte_stripped(self) -> None:
        cleaned = self._sanitize("null\x00byte")
        assert "\x00" not in cleaned

    def test_bel_stripped(self) -> None:
        cleaned = self._sanitize("ring\x07bell")
        assert "\x07" not in cleaned

    def test_hyperlink_osc8_stripped(self) -> None:
        """OSC 8 hyperlink injection must be neutralised."""
        cleaned = self._sanitize(
            "\x1b]8;;https://malicious.example\x07click\x1b]8;;\x07"
        )
        assert _ANSI_MARKER not in cleaned
        assert "\x07" not in cleaned
| 636 | |
| 637 | |
| 638 | # --------------------------------------------------------------------------- |
| 639 | # § 5 — clamp_int / clamp_natural unit contract |
| 640 | # --------------------------------------------------------------------------- |
| 641 | |
class TestClampNatural:
    """clamp_natural must accept [0, max_val] and reject anything outside."""

    @staticmethod
    def _clamp(value: int, max_val: int) -> int:
        """Import clamp_natural at call time and apply it."""
        from muse.core.validation import clamp_natural
        return clamp_natural(value, max_val)

    def test_value_in_range(self) -> None:
        assert self._clamp(50, 100) == 50

    def test_zero_allowed(self) -> None:
        assert self._clamp(0, 100) == 0

    def test_max_val_allowed(self) -> None:
        assert self._clamp(100, 100) == 100

    def test_negative_rejected(self) -> None:
        with pytest.raises(ValueError, match="value"):
            self._clamp(-1, 100)

    def test_above_max_rejected(self) -> None:
        with pytest.raises(ValueError):
            self._clamp(101, 100)

    def test_maxint_rejected(self) -> None:
        with pytest.raises(ValueError):
            self._clamp(2_147_483_647, 10_000)
| 671 | |
| 672 | |
| 673 | # --------------------------------------------------------------------------- |
| 674 | # § 6 — validate_output_path unit contract |
| 675 | # --------------------------------------------------------------------------- |
| 676 | |
class TestValidateOutputPath:
    """validate_output_path must confine the resolved path to the repo root."""

    def test_relative_path_inside_root(self, tmp_path: pathlib.Path) -> None:
        from muse.core.validation import validate_output_path
        result = validate_output_path("out/report.md", tmp_path)
        # Component-wise containment; a string prefix comparison would also
        # accept a sibling directory whose name merely starts with the root's.
        assert tmp_path.resolve() in pathlib.Path(result).parents

    def test_dotdot_rejected(self, tmp_path: pathlib.Path) -> None:
        from muse.core.validation import validate_output_path
        with pytest.raises(ValueError, match="traversal"):
            validate_output_path("../../etc/passwd", tmp_path)

    def test_absolute_outside_root_rejected(self, tmp_path: pathlib.Path) -> None:
        from muse.core.validation import validate_output_path
        with pytest.raises(ValueError, match="traversal"):
            validate_output_path("/etc/cron.d/malicious", tmp_path)

    def test_nested_relative_allowed(self, tmp_path: pathlib.Path) -> None:
        from muse.core.validation import validate_output_path
        result = validate_output_path("a/b/c/report.txt", tmp_path)
        # Compare in POSIX form so the check does not depend on the platform's
        # path separator.
        assert "a/b/c/report.txt" in pathlib.Path(result).as_posix()
File History
4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
20 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e
fix: rename objects→blobs in push client and all stale test…
Sonnet 4.6
patch
22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a
fix: repair four test failures from post-migration audit
Sonnet 4.6
patch
28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
28 days ago