"""Comprehensive tests for ``muse code grep``. Coverage -------- Unit _normalise_language — case folding, unknown passthrough _file_matches — exact, suffix, separator normalisation _resolve_file_filter — single match, ambiguous, no match _MAX_PATTERN_LEN — boundary value _KIND_ICON — all documented kinds present Integration grep basic substring — match, no-match, kind filter, language filter grep --regex — valid, invalid (→ exit 1), boundary grep --file — scoped to one file (faster path) grep --count — prints integer, no extra lines grep --json — schema correctness, unicode, qualified-name search grep --hashes — content-id appears in output grep --commit — historical snapshot qualified-name search — "Invoice.compute_total" hits only that method Security / ReDoS Pattern length cap — 512 accepted, 513 rejected Catastrophic regex — (a+)+ type does not hang (timeout guard) NUL bytes in pattern — handled without crash Control chars — handled without crash Stress 1 000 symbols — search completes in < 5 s 512-char regex — compiles and runs without hang """ from __future__ import annotations import json import pathlib import textwrap import time import pytest from tests.cli_test_helper import CliRunner from muse.cli.commands.grep import ( _KIND_ICON, _MAX_PATTERN_LEN, _file_matches, normalise_language as _normalise_language, _resolve_file_filter, ) cli = None runner = CliRunner() # --------------------------------------------------------------------------- # Shared fixture # --------------------------------------------------------------------------- @pytest.fixture def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: """Initialise a fresh code-domain Muse repo with two Python files.""" monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) result = runner.invoke(cli, ["init", "--domain", "code"]) assert result.exit_code == 0, result.output (tmp_path / "billing.py").write_text(textwrap.dedent("""\ class Invoice: def 
compute_total(self, items: list[int]) -> int: return sum(items) def apply_discount(self, total: float, pct: float) -> float: return total * (1 - pct) def validate_amount(amount: float) -> bool: return amount > 0 """)) (tmp_path / "auth.py").write_text(textwrap.dedent("""\ def validate_token(token: str) -> bool: return len(token) > 0 class Validator: def validate(self, value: object) -> bool: # type: ignore[override] return bool(value) """)) r = runner.invoke(cli, ["commit", "-m", "initial"]) assert r.exit_code == 0, r.output return tmp_path # --------------------------------------------------------------------------- # Unit — _normalise_language # --------------------------------------------------------------------------- class TestNormaliseLanguage: def test_python_lowercase(self) -> None: assert _normalise_language("python") == "Python" def test_python_uppercase(self) -> None: assert _normalise_language("PYTHON") == "Python" def test_python_mixed(self) -> None: assert _normalise_language("PyThOn") == "Python" def test_unknown_passthrough(self) -> None: # Unknown languages are passed through unchanged (after strip). result = _normalise_language("Cobol") assert result == "Cobol" def test_strips_whitespace(self) -> None: result = _normalise_language(" python ") assert result == "Python" def test_empty_string(self) -> None: # Empty string is not a known language — passed through. 
result = _normalise_language("") assert result == "" # --------------------------------------------------------------------------- # Unit — _file_matches # --------------------------------------------------------------------------- class TestFileMatches: def test_exact_match(self) -> None: assert _file_matches("src/billing.py", "src/billing.py") def test_suffix_match(self) -> None: assert _file_matches("src/billing.py", "billing.py") def test_no_match(self) -> None: assert not _file_matches("src/billing.py", "other.py") def test_partial_name_no_match(self) -> None: # "illing.py" is a suffix of "billing.py" but should not match without # a leading slash boundary. assert not _file_matches("src/billing.py", "illing.py") def test_backslash_normalised(self) -> None: # Windows-style separators in the filter are normalised before suffix check. assert _file_matches("a/src/billing.py", "src\\billing.py") def test_empty_filter(self) -> None: # Empty filter matches nothing sensible but must not crash. # The function returns True only for exact or slash-prefixed suffix. 
# "src/billing.py".endswith("/" + "") == endswith("/") → False # "src/billing.py" == "" → False assert not _file_matches("src/billing.py", "") def test_deep_path_suffix(self) -> None: assert _file_matches("a/b/c/d.py", "c/d.py") def test_same_filename_different_dir_no_match(self) -> None: assert not _file_matches("src/billing.py", "tests/billing.py") # --------------------------------------------------------------------------- # Unit — _resolve_file_filter # --------------------------------------------------------------------------- class TestResolveFileFilter: def test_single_match_returns_full_path(self) -> None: manifest = {"src/billing.py": "abc", "src/auth.py": "def"} result = _resolve_file_filter("billing.py", manifest) assert result == "src/billing.py" def test_no_match_returns_none(self) -> None: manifest = {"src/billing.py": "abc"} result = _resolve_file_filter("nonexistent.py", manifest) assert result is None def test_ambiguous_raises_system_exit(self) -> None: manifest = { "a/billing.py": "hash1", "b/billing.py": "hash2", } with pytest.raises(SystemExit): _resolve_file_filter("billing.py", manifest) def test_exact_path_returns_itself(self) -> None: manifest = {"src/billing.py": "abc"} result = _resolve_file_filter("src/billing.py", manifest) assert result == "src/billing.py" def test_empty_manifest_returns_none(self) -> None: result = _resolve_file_filter("billing.py", {}) assert result is None # --------------------------------------------------------------------------- # Unit — _MAX_PATTERN_LEN and _KIND_ICON constants # --------------------------------------------------------------------------- class TestConstants: def test_max_pattern_len_is_512(self) -> None: assert _MAX_PATTERN_LEN == 512 def test_kind_icon_has_function(self) -> None: assert "function" in _KIND_ICON def test_kind_icon_has_class(self) -> None: assert "class" in _KIND_ICON def test_kind_icon_has_method(self) -> None: assert "method" in _KIND_ICON # 
# ---------------------------------------------------------------------------
# Integration — basic substring search
# ---------------------------------------------------------------------------


class TestGrepBasic:
    """Plain substring search with optional kind and language filters."""

    def test_finds_function_by_name(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate"])
        assert result.exit_code == 0, result.output
        assert "validate" in result.output.lower()

    def test_no_match_exits_zero(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "zzznomatch999"])
        assert result.exit_code == 0
        assert "no symbols" in result.output.lower()

    def test_kind_filter_function(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate", "--kind", "function"])
        assert result.exit_code == 0
        # Only functions should appear (methods excluded).
        assert "fn" in result.output

    def test_kind_filter_class(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "Invoice", "--kind", "class"])
        assert result.exit_code == 0
        assert "Invoice" in result.output
        # Methods of Invoice should NOT appear.
        assert "compute_total" not in result.output

    def test_language_filter(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate", "--language", "python"])
        assert result.exit_code == 0
        assert "validate" in result.output.lower()

    def test_language_filter_unknown_exits_zero(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate", "--language", "COBOL"])
        # No COBOL files — 0 matches, but not an error.
        assert result.exit_code == 0

    def test_match_count_suffix(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate"])
        # Check the command succeeded so the word "match" cannot come from
        # an error message.
        assert result.exit_code == 0, result.output
        assert "match" in result.output.lower()


# ---------------------------------------------------------------------------
# Integration — --count flag
# ---------------------------------------------------------------------------


class TestGrepCount:
    """``--count`` prints a leading integer followed by ``match(es)``."""

    def test_count_only_prints_integer(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate", "--count"])
        assert result.exit_code == 0
        line = result.output.strip().splitlines()[0]
        assert line.endswith("match(es)")
        # The leading token must be an integer.
        count_str = line.split()[0]
        assert count_str.isdigit()

    def test_count_zero_for_no_match(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "zzz_nothing", "--count"])
        assert result.exit_code == 0
        assert result.output.strip().startswith("0")


# ---------------------------------------------------------------------------
# Integration — --json output
# ---------------------------------------------------------------------------


class TestGrepJson:
    """Shape and consistency of the ``--json`` payload."""

    def test_json_schema(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate", "--json"])
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert "total_matches" in data
        assert "results" in data
        assert isinstance(data["results"], list)

    def test_json_result_fields(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate", "--json"])
        data = json.loads(result.output)
        # Field checks only apply when there is at least one result.
        if data["results"]:
            r = data["results"][0]
            for field in ("address", "kind", "name", "lineno"):
                assert field in r, f"missing field {field!r}"

    def test_json_total_matches_consistent(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate", "--json"])
        data = json.loads(result.output)
        assert data["total_matches"] == len(data["results"])

    def test_json_no_match_empty_results(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "zzz_nope_ever", "--json"])
        data = json.loads(result.output)
        assert data["total_matches"] == 0
        assert data["results"] == []

    def test_json_pattern_echoed(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate", "--json"])
        data = json.loads(result.output)
        assert data["pattern"] == "validate"

    def test_json_unicode_pattern(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "café", "--json"])
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert data["pattern"] == "café"


# ---------------------------------------------------------------------------
# Integration — --file scoped search
# ---------------------------------------------------------------------------


class TestGrepFile:
    """``--file`` restricts the search to one file and rejects ambiguity."""

    def test_file_scoped_results_only_from_that_file(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate", "--file", "billing.py"])
        assert result.exit_code == 0
        if "validate" in result.output:
            assert "auth.py" not in result.output

    def test_file_ambiguous_exits_nonzero(self, repo: pathlib.Path) -> None:
        # Create two files with the same basename in different dirs.
        (repo / "sub").mkdir()
        (repo / "sub" / "billing.py").write_text("def helper(): pass\n")
        runner.invoke(cli, ["code", "add", "."])
        runner.invoke(cli, ["commit", "-m", "add sub billing"])
        result = runner.invoke(cli, ["code", "grep", "helper", "--file", "billing.py"])
        # With two billing.py files it should be ambiguous.
        assert result.exit_code == 1 or "ambiguous" in result.output.lower()


# ---------------------------------------------------------------------------
# Integration — --hashes flag
# ---------------------------------------------------------------------------


class TestGrepHashes:
    """``--hashes`` adds a truncated content hash to each result."""

    def test_hashes_appear_in_output(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate", "--hashes"])
        assert result.exit_code == 0
        # Content hash prefix should appear (8 hex chars + "..")
        assert ".." in result.output


# ---------------------------------------------------------------------------
# Integration — qualified name search
# ---------------------------------------------------------------------------


class TestGrepQualifiedName:
    """Patterns containing ``.`` or ``::`` are matched against qualified names."""

    def test_dot_separator_hits_qualified_name(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "Invoice.compute_total"])
        assert result.exit_code == 0
        assert "compute_total" in result.output

    def test_double_colon_separator_hits_qualified_name(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "Invoice::compute_total"])
        assert result.exit_code == 0

    def test_qualified_name_does_not_match_unrelated(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "Invoice.zzz_missing"])
        assert result.exit_code == 0
        assert "no symbols" in result.output.lower() or "0 match" in result.output.lower()


# ---------------------------------------------------------------------------
# Integration — --regex flag
# ---------------------------------------------------------------------------


class TestGrepRegex:
    """``--regex`` accepts valid patterns and exits 1 on invalid ones."""

    def test_valid_regex_matches(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "--regex", "^validate"])
        assert result.exit_code == 0

    def test_invalid_regex_exits_one(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "--regex", "[unclosed"])
        assert result.exit_code == 1
        assert "regex" in result.stderr.lower() or "invalid" in result.stderr.lower()

    def test_regex_anchored_no_match(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "--regex", "^zzz_nothing$"])
        assert result.exit_code == 0
        assert "0 match" in result.output.lower() or "no symbols" in result.output.lower()


# ---------------------------------------------------------------------------
# Security — ReDoS guards
# ---------------------------------------------------------------------------


class TestGrepSecurity:
    """Pattern-length cap, hostile patterns and the no-repo error path."""

    def test_pattern_at_512_accepted(self, repo: pathlib.Path) -> None:
        pattern = "a" * 512
        result = runner.invoke(cli, ["code", "grep", pattern])
        assert "too long" not in result.output.lower()

    def test_pattern_at_513_rejected(self, repo: pathlib.Path) -> None:
        pattern = "a" * 513
        result = runner.invoke(cli, ["code", "grep", pattern])
        assert result.exit_code == 1
        assert "512" in result.stderr or "too long" in result.stderr.lower()

    def test_catastrophic_regex_does_not_hang(self, repo: pathlib.Path) -> None:
        # (a+)+ is exponential on backtracking engines; Python's re module
        # with IGNORECASE+escape still builds a safe compiled pattern.
        # Without --regex the pattern is escaped so it literally searches for
        # "(a+)+" as a substring — which must return quickly.
        start = time.monotonic()
        result = runner.invoke(cli, ["code", "grep", "(a+)+"])
        elapsed = time.monotonic() - start
        assert result.exit_code == 0
        assert elapsed < 5.0, f"grep took {elapsed:.1f}s — possible hang"

    def test_null_bytes_in_pattern_handled(self, repo: pathlib.Path) -> None:
        # NUL byte in pattern must not crash the process.
        result = runner.invoke(cli, ["code", "grep", "val\x00idate"])
        assert result.exit_code in (0, 1)

    def test_control_chars_in_pattern_handled(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "val\x01\x02idate"])
        assert result.exit_code in (0, 1)

    def test_requires_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        # No ``init`` has been run here and MUSE_REPO_ROOT is cleared.
        monkeypatch.chdir(tmp_path)
        monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
        result = runner.invoke(cli, ["code", "grep", "validate"])
        assert result.exit_code != 0


# ---------------------------------------------------------------------------
# Stress — 1 000 symbols, search must complete in < 5 s
# ---------------------------------------------------------------------------


class TestGrepStress:
    """Timing and correctness checks against a 1 000-function corpus."""

    @pytest.fixture
    def large_repo(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> pathlib.Path:
        """Repo with ~1 000 Python symbols across 10 files."""
        monkeypatch.chdir(tmp_path)
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
        # Fail here if init breaks, rather than later in an unrelated grep
        # assertion.
        init_result = runner.invoke(cli, ["init", "--domain", "code"])
        assert init_result.exit_code == 0, init_result.output

        # 10 modules x 100 functions named compute_<file>_<symbol>.
        for file_idx in range(10):
            lines: list[str] = []
            for sym_idx in range(100):
                lines.append(f"def compute_{file_idx}_{sym_idx}(x: int) -> int:")
                lines.append(f"    return x + {sym_idx}")
                lines.append("")
            (tmp_path / f"module_{file_idx}.py").write_text("\n".join(lines))

        r = runner.invoke(cli, ["commit", "-m", "large module"])
        assert r.exit_code == 0, r.output
        return tmp_path

    def test_search_1000_symbols_under_5s(self, large_repo: pathlib.Path) -> None:
        start = time.monotonic()
        result = runner.invoke(cli, ["code", "grep", "compute"])
        elapsed = time.monotonic() - start
        assert result.exit_code == 0, result.output
        assert "1000" in result.output or "match" in result.output.lower()
        assert elapsed < 5.0, f"grep took {elapsed:.1f}s on 1 000 symbols"

    def test_count_mode_1000_symbols_under_5s(self, large_repo: pathlib.Path) -> None:
        start = time.monotonic()
        result = runner.invoke(cli, ["code", "grep", "compute", "--count"])
        elapsed = time.monotonic() - start
        assert result.exit_code == 0
        assert elapsed < 5.0

    def test_json_mode_1000_symbols_schema_valid(self, large_repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "compute", "--json"])
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert data["total_matches"] == len(data["results"])
        assert data["total_matches"] >= 1000

    def test_regex_search_1000_symbols_under_5s(self, large_repo: pathlib.Path) -> None:
        start = time.monotonic()
        result = runner.invoke(cli, ["code", "grep", "--regex", r"^compute_\d"])
        elapsed = time.monotonic() - start
        assert result.exit_code == 0
        assert elapsed < 5.0

    def test_kind_filter_1000_symbols_correct_count(self, large_repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "compute", "--kind", "function", "--count"])
        assert result.exit_code == 0
        line = result.output.strip().splitlines()[0]
        count = int(line.split()[0])
        assert count >= 1000

    def test_512_char_regex_does_not_hang_on_large_corpus(
        self, large_repo: pathlib.Path
    ) -> None:
        pattern = f"compute_{'0' * 504}"  # exactly 512 chars
        start = time.monotonic()
        result = runner.invoke(cli, ["code", "grep", pattern])
        elapsed = time.monotonic() - start
        assert elapsed < 5.0, f"512-char pattern took {elapsed:.1f}s — possible hang"
        assert result.exit_code in (0, 1)  # no match is fine


# ---------------------------------------------------------------------------
# --files flag (long form only) — one file path per line, unique, sorted
# ---------------------------------------------------------------------------


class TestGrepFiles:
    """``muse code grep --files`` prints one unique file path per line.

    Ergonomics goal: trivially pipeable without JSON parsing.
    Mirrors ``grep -l`` / ``rg -l`` behaviour.
    """

    def test_files_lists_matching_file(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate", "--files"])
        assert result.exit_code == 0, result.output
        lines = [line for line in result.output.splitlines() if line.strip()]
        assert any("billing.py" in line or "auth.py" in line for line in lines)

    def test_files_output_is_unique_paths(self, repo: pathlib.Path) -> None:
        """Each file path appears at most once, even if it has multiple matches."""
        result = runner.invoke(cli, ["code", "grep", "validate", "--files"])
        assert result.exit_code == 0, result.output
        lines = [line.strip() for line in result.output.splitlines() if line.strip()]
        assert len(lines) == len(set(lines)), "duplicate file paths in --files output"

    def test_files_output_is_sorted(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate", "--files"])
        assert result.exit_code == 0, result.output
        lines = [line.strip() for line in result.output.splitlines() if line.strip()]
        assert lines == sorted(lines), "--files output must be sorted"

    def test_files_no_match_empty_output(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "zzznomatch", "--files"])
        assert result.exit_code == 0
        assert result.output.strip() == ""

    def test_files_excludes_non_matching_files(self, repo: pathlib.Path) -> None:
        """Only files that contain at least one match appear."""
        result = runner.invoke(cli, ["code", "grep", "Invoice", "--files"])
        assert result.exit_code == 0, result.output
        lines = [line.strip() for line in result.output.splitlines() if line.strip()]
        # Invoice is only in billing.py
        assert all("billing.py" in line for line in lines)
        assert not any("auth.py" in line for line in lines)

    def test_files_mutually_exclusive_with_json(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate", "--files", "--json"])
        assert result.exit_code != 0

    def test_files_mutually_exclusive_with_count(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["code", "grep", "validate", "--files", "--count"])
        assert result.exit_code != 0

    def test_files_long_flag_only(self, repo: pathlib.Path) -> None:
        """``--files`` is the only form (``-l`` is taken by ``--language``)."""
        result = runner.invoke(cli, ["code", "grep", "validate", "--files"])
        assert result.exit_code == 0, result.output
        lines = [line.strip() for line in result.output.splitlines() if line.strip()]
        assert len(lines) > 0

    def test_files_compatible_with_kind_filter(self, repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["code", "grep", "validate", "--files", "--kind", "function"]
        )
        assert result.exit_code == 0, result.output
        lines = [line.strip() for line in result.output.splitlines() if line.strip()]
        # validate_amount and validate_token are functions
        assert len(lines) > 0

    def test_files_compatible_with_file_filter(self, repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["code", "grep", "validate", "--files", "--file", "billing.py"]
        )
        assert result.exit_code == 0, result.output
        lines = [line.strip() for line in result.output.splitlines() if line.strip()]
        assert all("billing.py" in line for line in lines)