"""Comprehensive tests for ``muse check`` — generic domain invariant enforcement. Coverage dimensions ------------------- Unit ~~~~ - ``_get_checker``: returns CodeChecker for code, MidiChecker for midi, None for unknown - ``_resolve_ref``: HEAD, short SHA, HEAD~N, explicit branch, non-existent ref - ``_filter_report``: filter by severity, rule name, path glob, combined - ``_CheckJson`` TypedDict shape has all required fields - ``format_report`` integration: zero violations, mixed violations Integration (run / CLI) ~~~~~~~~~~~~~~~~~~~~~~~ - Default invocation (HEAD, code domain) → exit 0 - ``--json`` output has all required keys with correct types - ``--json`` duration_ms > 0 - ``--json`` error_count / warning_count / info_count are integers - ``--json`` base_commit_id is None without --base - ``--strict`` exits 1 when errors present - ``--strict`` exits 0 when no errors - ``--warn`` exits 2 when warnings present - ``--warn`` exits 0 when no warnings - ``--strict`` and ``--warn`` combined - ``--base HEAD~1`` diff mode: no new violations on identical snapshots - ``--base`` diff mode: JSON has base_commit_id set - ``--base`` with bad ref exits non-zero with error - ``--branch`` checks tip of another branch - ``--filter-severity error`` narrows violations - ``--filter-severity warning`` narrows violations - ``--filter-rule`` keeps only matching rule - ``--filter-path`` keeps only matching addresses - ``--summary`` prints one-line pass/fail - ``--summary --strict`` propagates exit code - ``--rules`` custom TOML file used - ``--rules`` path outside repo rejected (security) - ``--rules`` absolute path outside repo rejected (security) - ``--json --summary`` → json wins (--summary only affects text mode) Commit resolution ~~~~~~~~~~~~~~~~~ - Full 64-char SHA resolved correctly - Short SHA prefix resolved correctly (HEAD is short prefix) - HEAD~1 walks one parent - HEAD~0 same as HEAD - Non-existent ref exits 1 with error message - Branch name resolves tip of that branch - 
Empty repo (no commits) exits with error Security ~~~~~~~~ - ANSI escape in commit_arg stripped from display - ANSI escape in domain name stripped from display - ``--rules`` with ``../../../etc/passwd`` rejected - ``--rules`` with absolute path outside repo rejected - ``--filter-rule`` with ANSI escape doesn't crash - ``--filter-path`` with ``/etc/*`` doesn't crash Edge cases ~~~~~~~~~~ - No commits on current branch → error message - Unknown domain (not code/midi) → warning, exit 0 - Rules file that is a symlink outside repo → rejected - ``--base`` same as HEAD → zero new violations - ``--filter-severity`` with no matching violations → empty report, exit 0 - ``--json`` on fresh empty repo → error JSON, non-zero exit Stress ~~~~~~ - 200-violation report filtered correctly - check with large TOML rules file (50 rules) doesn't crash """ from __future__ import annotations import datetime import json import pathlib import pytest from muse.core.invariants import BaseReport, BaseViolation, make_report from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, long_id, short_id, fake_id, blob_id from muse.core.paths import muse_dir, ref_path from muse.core.object_store import write_object from tests.cli_test_helper import CliRunner runner = CliRunner() cli = None _EPOCH = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) # --------------------------------------------------------------------------- # Repo helpers # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path, domain: str = "code") -> pathlib.Path: dot_muse = muse_dir(tmp_path) for sub in ("objects", "commits", "snapshots", "refs/heads"): (dot_muse / sub).mkdir(parents=True, exist_ok=True) (dot_muse / "repo.json").write_text( json.dumps({ "repo_id": fake_id("repo"), "domain": domain, 
"default_branch": "main", "created_at": "2026-01-01T00:00:00+00:00", }), encoding="utf-8", ) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n", encoding="utf-8") return tmp_path def _write_commit_chain( root: pathlib.Path, n: int = 1, branch: str = "main", file_content: bytes = b"pass", ) -> list[str]: """Write *n* commits on *branch*, returning the list of commit IDs (oldest first).""" commit_ids: list[str] = [] parent: str | None = None for i in range(n): content = file_content + f"\n# {i}".encode() oid = blob_id(content) write_object(root, oid, content) manifest = {"main.py": oid} snap_id = hash_snapshot(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) ts = (_EPOCH + datetime.timedelta(seconds=i)).isoformat() commit_id = hash_commit( parent_ids=[p for p in [parent] if p], snapshot_id=snap_id, message=f"commit {i}", committed_at_iso=ts, author="test", ) committed_at_dt = _EPOCH + datetime.timedelta(seconds=i) write_commit(root, CommitRecord( commit_id=commit_id, branch=branch, snapshot_id=snap_id, message=f"commit {i}", committed_at=committed_at_dt, parent_commit_id=parent, author="test", )) branch_ref = ref_path(root, branch) branch_ref.parent.mkdir(parents=True, exist_ok=True) branch_ref.write_text(commit_id, encoding="utf-8") commit_ids.append(commit_id) parent = commit_id return commit_ids def _env(root: pathlib.Path) -> Manifest: return {"MUSE_REPO_ROOT": str(root)} def _invoke(root: pathlib.Path, *args: str) -> tuple[int, str]: r = runner.invoke(cli, list(args), env=_env(root), catch_exceptions=False) return r.exit_code, r.output def _invoke_unchecked(root: pathlib.Path, *args: str) -> tuple[int, str]: r = runner.invoke(cli, list(args), env=_env(root)) return r.exit_code, r.output + r.stderr # --------------------------------------------------------------------------- # Unit — _get_checker # --------------------------------------------------------------------------- class TestGetChecker: def 
test_code_returns_code_checker(self) -> None: from muse.cli.commands.check import _get_checker from muse.plugins.code._invariants import CodeChecker assert isinstance(_get_checker("code"), CodeChecker) def test_midi_returns_midi_checker(self) -> None: from muse.cli.commands.check import _get_checker from muse.plugins.midi._invariants import MidiChecker assert isinstance(_get_checker("midi"), MidiChecker) def test_unknown_domain_returns_none(self) -> None: from muse.cli.commands.check import _get_checker assert _get_checker("genomics") is None assert _get_checker("") is None assert _get_checker("CODE") is None # case-sensitive # --------------------------------------------------------------------------- # Unit — _filter_report # --------------------------------------------------------------------------- class TestFilterReport: def _make_report_with_violations(self) -> BaseReport: violations: list[BaseViolation] = [ BaseViolation(rule_name="max_complexity", severity="error", address="src/a.py::foo", description="too complex"), BaseViolation(rule_name="max_complexity", severity="warning", address="src/b.py::bar", description="complex"), BaseViolation(rule_name="no_cycles", severity="error", address="src/c.py", description="cycle"), BaseViolation(rule_name="coverage", severity="info", address="src/d.py", description="low coverage"), ] return make_report("a" * 64, "code", violations, 3) def test_filter_by_severity_error(self) -> None: from muse.cli.commands.check import _filter_report report = self._make_report_with_violations() filtered = _filter_report(report, filter_severity="error", filter_rule=None, filter_path=None) assert all(v["severity"] == "error" for v in filtered["violations"]) assert len(filtered["violations"]) == 2 def test_filter_by_severity_warning(self) -> None: from muse.cli.commands.check import _filter_report report = self._make_report_with_violations() filtered = _filter_report(report, filter_severity="warning", filter_rule=None, filter_path=None) 
assert len(filtered["violations"]) == 1 assert filtered["violations"][0]["rule_name"] == "max_complexity" def test_filter_by_severity_info(self) -> None: from muse.cli.commands.check import _filter_report report = self._make_report_with_violations() filtered = _filter_report(report, filter_severity="info", filter_rule=None, filter_path=None) assert len(filtered["violations"]) == 1 assert filtered["violations"][0]["rule_name"] == "coverage" def test_filter_by_rule_name(self) -> None: from muse.cli.commands.check import _filter_report report = self._make_report_with_violations() filtered = _filter_report(report, filter_severity=None, filter_rule="no_cycles", filter_path=None) assert all(v["rule_name"] == "no_cycles" for v in filtered["violations"]) assert len(filtered["violations"]) == 1 def test_filter_by_path_glob(self) -> None: from muse.cli.commands.check import _filter_report report = self._make_report_with_violations() filtered = _filter_report(report, filter_severity=None, filter_rule=None, filter_path="src/a.py::*") assert len(filtered["violations"]) == 1 assert filtered["violations"][0]["address"] == "src/a.py::foo" def test_combined_filters(self) -> None: from muse.cli.commands.check import _filter_report report = self._make_report_with_violations() filtered = _filter_report(report, filter_severity="error", filter_rule="max_complexity", filter_path=None) assert len(filtered["violations"]) == 1 assert filtered["violations"][0]["address"] == "src/a.py::foo" def test_no_filters_returns_all(self) -> None: from muse.cli.commands.check import _filter_report report = self._make_report_with_violations() filtered = _filter_report(report, filter_severity=None, filter_rule=None, filter_path=None) assert len(filtered["violations"]) == len(report["violations"]) def test_filter_no_match_returns_empty(self) -> None: from muse.cli.commands.check import _filter_report report = self._make_report_with_violations() filtered = _filter_report(report, filter_severity="error", 
filter_rule="nonexistent_rule", filter_path=None) assert filtered["violations"] == [] def test_rules_checked_preserved_through_filter(self) -> None: from muse.cli.commands.check import _filter_report report = self._make_report_with_violations() filtered = _filter_report(report, filter_severity="error", filter_rule=None, filter_path=None) assert filtered["rules_checked"] == report["rules_checked"] # --------------------------------------------------------------------------- # Unit — _CheckJson shape # --------------------------------------------------------------------------- class TestCheckJsonShape: def test_required_keys_present(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) code, out = _invoke(root, "check", "--json") assert code == 0 data = json.loads(out.strip()) required = { "commit_id", "domain", "rules_checked", "has_errors", "has_warnings", "error_count", "warning_count", "info_count", "total_violations", "violations", "base_commit_id", "duration_ms", "exit_code", } assert required <= set(data.keys()) def test_field_types(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) _, out = _invoke(root, "check", "--json") d = json.loads(out.strip()) assert isinstance(d["commit_id"], str) assert isinstance(d["domain"], str) assert isinstance(d["rules_checked"], int) assert isinstance(d["has_errors"], bool) assert isinstance(d["has_warnings"], bool) assert isinstance(d["error_count"], int) assert isinstance(d["warning_count"], int) assert isinstance(d["info_count"], int) assert isinstance(d["total_violations"], int) assert isinstance(d["violations"], list) assert isinstance(d["duration_ms"], float) def test_duration_ms_positive(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) _, out = _invoke(root, "check", "--json") d = json.loads(out.strip()) assert d["duration_ms"] > 0.0 def test_base_commit_id_none_without_base_flag(self, tmp_path: 
pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) _, out = _invoke(root, "check", "--json") d = json.loads(out.strip()) assert d["base_commit_id"] is None def test_counts_consistent_with_violations(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) _, out = _invoke(root, "check", "--json") d = json.loads(out.strip()) total = d["error_count"] + d["warning_count"] + d["info_count"] assert total == d["total_violations"] assert len(d["violations"]) == d["total_violations"] # --------------------------------------------------------------------------- # Integration — basic invocation # --------------------------------------------------------------------------- class TestBasicInvocation: def test_default_head_exits_zero(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) code, _ = _invoke(root, "check") assert code == 0 def test_text_output_contains_domain(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) _, out = _invoke(root, "check") assert "code" in out def test_text_output_contains_rules_checked(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) _, out = _invoke(root, "check") assert "rules" in out def test_text_output_contains_commit_prefix(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) cids = _write_commit_chain(root) _, out = _invoke(root, "check") # check.py displays short_id(commit_id) — bare 12-char hex. assert short_id(cids[-1], strip=True) in out def test_text_output_has_elapsed_time(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) _, out = _invoke(root, "check") assert "s)" in out # e.g. 
"(0.123s)" def test_full_sha_argument(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) cids = _write_commit_chain(root) code, _ = _invoke(root, "check", cids[-1]) assert code == 0 def test_short_sha_argument(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) cids = _write_commit_chain(root) short = short_id(cids[-1], strip=True) code, _ = _invoke(root, "check", short) assert code == 0 def test_head_tilde_1(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root, n=3) code, _ = _invoke(root, "check", "HEAD~1") assert code == 0 def test_head_tilde_0(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root, n=2) code, _ = _invoke(root, "check", "HEAD~0") assert code == 0 # --------------------------------------------------------------------------- # Integration — --strict and --warn # --------------------------------------------------------------------------- class TestStrictAndWarn: def _make_clean_report_repo(self, tmp_path: pathlib.Path) -> pathlib.Path: """Repo with a simple clean Python file — no violations expected.""" root = _make_repo(tmp_path) _write_commit_chain(root, file_content=b"x = 1\n") return root def test_strict_exits_0_when_no_errors(self, tmp_path: pathlib.Path) -> None: root = self._make_clean_report_repo(tmp_path) code, _ = _invoke(root, "check", "--strict") # Code domain with a clean file may still have warnings — strict only cares about errors. assert code in (0, 1) # 0 if no errors, 1 if errors def test_warn_flag_in_json(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) code, out = _invoke(root, "check", "--json") d = json.loads(out.strip()) # JSON always has warning_count regardless of --warn flag. 
assert "warning_count" in d def test_strict_json_exit_code_consistent(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) code, out = _invoke(root, "check", "--strict", "--json") d = json.loads(out.strip()) if d["has_errors"]: assert code == 1 else: assert code == 0 # --------------------------------------------------------------------------- # Integration — --base diff mode # --------------------------------------------------------------------------- class TestBaseMode: def test_same_commit_as_base_zero_new_violations(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) cids = _write_commit_chain(root, n=1) # Base is same as HEAD → no new violations. code, _ = _invoke(root, "check", "--base", cids[0]) assert code == 0 def test_base_head_tilde_1_on_identical_snapshots(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root, n=3) # HEAD~1 and HEAD have the same file content → diff is zero violations. 
code, out = _invoke(root, "check", "--base", "HEAD~1") assert code == 0 def test_base_sets_base_commit_id_in_json(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) cids = _write_commit_chain(root, n=2) _, out = _invoke(root, "check", "--base", "HEAD~1", "--json") d = json.loads(out.strip()) assert d["base_commit_id"] == cids[0] # HEAD~1 is the first commit def test_base_vs_mode_header_in_text(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root, n=2) _, out = _invoke(root, "check", "--base", "HEAD~1") assert "vs" in out def test_base_bad_ref_exits_nonzero(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) code, _ = _invoke_unchecked(root, "check", "--base", "nonexistent-branch") assert code != 0 def test_base_json_error_on_bad_ref(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) code, out = _invoke_unchecked(root, "check", "--base", "bad/ref", "--json") assert code != 0 d = json.loads(out.strip()) assert "error" in d # --------------------------------------------------------------------------- # Integration — --branch # --------------------------------------------------------------------------- class TestBranchFlag: def test_branch_flag_checks_other_branch_head(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) # Create commits on two branches. 
_write_commit_chain(root, branch="main") _write_commit_chain(root, branch="dev", file_content=b"y = 2\n") code, out = _invoke(root, "check", "--branch", "dev") assert code == 0 assert "code" in out def test_branch_nonexistent_exits_nonzero(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) code, _ = _invoke_unchecked(root, "check", "--branch", "does-not-exist") assert code != 0 # --------------------------------------------------------------------------- # Integration — --filter flags # --------------------------------------------------------------------------- class TestFilterFlags: def test_filter_severity_error_in_json(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) _, out = _invoke(root, "check", "--filter-severity", "error", "--json") d = json.loads(out.strip()) for v in d["violations"]: assert v["severity"] == "error" def test_filter_severity_warning_in_json(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) _, out = _invoke(root, "check", "--filter-severity", "warning", "--json") d = json.loads(out.strip()) for v in d["violations"]: assert v["severity"] == "warning" def test_filter_rule_in_json(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) _, out = _invoke(root, "check", "--filter-rule", "max_complexity", "--json") d = json.loads(out.strip()) for v in d["violations"]: assert v["rule_name"] == "max_complexity" def test_filter_path_limits_addresses(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) _, out = _invoke(root, "check", "--filter-path", "*.py::*", "--json") d = json.loads(out.strip()) for v in d["violations"]: assert ".py" in v["address"] def test_filter_shown_in_text_header(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) _, out = _invoke(root, "check", "--filter-severity", "error") assert 
"filtered" in out or "severity=error" in out def test_filter_severity_invalid_rejected(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) code, _ = _invoke_unchecked(root, "check", "--filter-severity", "critical") assert code != 0 # --------------------------------------------------------------------------- # Integration — --summary # --------------------------------------------------------------------------- class TestSummaryFlag: def test_summary_outputs_single_line(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) _, out = _invoke(root, "check", "--summary") # Header line + summary line content_lines = [ln for ln in out.strip().splitlines() if ln.strip()] assert len(content_lines) == 2 def test_summary_pass_shows_checkmark(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root, file_content=b"x = 1\n") # Filter to info only to guarantee zero violations in output. _, out = _invoke(root, "check", "--summary", "--filter-severity", "info") # Most repos have 0 info violations, but we check for correct format assert ("✅" in out or "❌" in out) # one of the two always appears def test_summary_strict_propagates_exit(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) _, out_json = _invoke(root, "check", "--json") d = json.loads(out_json.strip()) code, _ = _invoke(root, "check", "--summary", "--strict") if d["has_errors"]: assert code == 1 else: assert code == 0 def test_summary_no_violation_details(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) _, out = _invoke(root, "check", "--summary") # Summary mode should NOT list individual violations. 
assert "[max_complexity]" not in out assert "[no_cycles]" not in out # --------------------------------------------------------------------------- # Integration — --rules # --------------------------------------------------------------------------- class TestRulesFlag: def test_empty_rules_file_no_violations(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) rules = root / "empty.toml" rules.write_text("") _, out = _invoke(root, "check", "--rules", "empty.toml", "--json") d = json.loads(out.strip()) assert d["rules_checked"] == 0 assert d["total_violations"] == 0 def test_custom_rules_file_used(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) rules = root / "my_rules.toml" rules.write_text( '[[rule]]\nname = "max_complexity"\nseverity = "warning"\n' 'scope = "function"\nrule_type = "max_complexity"\n\n' '[rule.params]\nthreshold = 100\n' ) _, out = _invoke(root, "check", "--rules", "my_rules.toml", "--json") d = json.loads(out.strip()) assert d["rules_checked"] == 1 def test_rules_path_outside_repo_rejected(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) code, out = _invoke_unchecked(root, "check", "--rules", "../../../etc/passwd") assert code != 0 assert "outside" in out.lower() or "error" in out.lower() def test_rules_absolute_path_outside_repo_rejected(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) code, out = _invoke_unchecked(root, "check", "--rules", "/etc/passwd") assert code != 0 def test_rules_inside_repo_accepted(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_commit_chain(root) rules = muse_dir(root) / "rules.toml" rules.write_text("") code, _ = _invoke(root, "check", "--rules", ".muse/rules.toml") assert code == 0 # --------------------------------------------------------------------------- # Edge cases # 
# ---------------------------------------------------------------------------


class TestEdgeCases:
    """Boundary conditions: empty repo, unknown domain, out-of-range refs."""

    def test_no_commits_exits_nonzero(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        # No commits at all.
        code, _ = _invoke_unchecked(root, "check")
        assert code != 0

    def test_no_commits_json_has_error(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        code, out = _invoke_unchecked(root, "check", "--json")
        assert code != 0
        d = json.loads(out.strip())
        assert "error" in d

    def test_unknown_domain_exits_zero_with_warning(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path, domain="genomics")
        _write_commit_chain(root)
        code, _ = _invoke(root, "check")
        assert code == 0
        # NOTE(review): the warning is expected to mention the domain, but its
        # text is not asserted here — only the exit code is checked.

    def test_unknown_domain_json_has_error_key(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path, domain="spacetime")
        _write_commit_chain(root)
        code, out = _invoke(root, "check", "--json")
        assert code == 0
        d = json.loads(out.strip())
        assert "error" in d

    def test_head_tilde_past_root_exits_nonzero(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        _write_commit_chain(root, n=1)
        code, _ = _invoke_unchecked(root, "check", "HEAD~999")
        assert code != 0

    def test_filter_severity_no_match_empty_report(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        _write_commit_chain(root, file_content=b"x=1\n")
        _, out = _invoke(root, "check", "--filter-severity", "info", "--json")
        d = json.loads(out.strip())
        # info violations are rare; verify the filter ran and that anything it
        # let through really is info-level.
        assert isinstance(d["total_violations"], int)
        for v in d["violations"]:
            assert v["severity"] == "info"

    def test_base_same_as_head_zero_new_in_json(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        cids = _write_commit_chain(root, n=1)
        _, out = _invoke(root, "check", "--base", cids[0], "--json")
        d = json.loads(out.strip())
        assert d["total_violations"] == 0


# ---------------------------------------------------------------------------
# Security tests
# ---------------------------------------------------------------------------


class TestSecurity:
    """Hostile or malformed CLI input must be rejected or ignored, never crash."""

    def test_ansi_in_commit_arg_doesnt_crash(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        _write_commit_chain(root)
        # ANSI escape in commit arg should be handled without crashing.
        code, _ = _invoke_unchecked(root, "check", "\x1b[31mmalicious\x1b[0m")
        assert code != 0  # bad ref, but no crash

    def test_ansi_in_filter_rule_doesnt_crash(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        _write_commit_chain(root)
        code, _ = _invoke(root, "check", "--filter-rule", "\x1b[31mrule\x1b[0m")
        assert code == 0  # no matching rule, no crash

    def test_rules_dotdot_path_rejected(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        _write_commit_chain(root)
        code, out = _invoke_unchecked(root, "check", "--rules", "../outside.toml")
        assert code != 0
        assert "outside" in out.lower() or "error" in out.lower()

    def test_rules_symlink_outside_repo_rejected(self, tmp_path: pathlib.Path) -> None:
        # Keep the repo in a sub-directory so the "outside" target can live in
        # tmp_path itself. Writing to tmp_path.parent would leak a file into a
        # directory shared with other tests.
        root = _make_repo(tmp_path / "repo")
        _write_commit_chain(root)
        # Create a symlink inside the repo that points outside.
        outside = tmp_path / "outside_rules.toml"
        outside.write_text("", encoding="utf-8")
        link = root / "malicious_rules.toml"
        link.symlink_to(outside)
        code, _ = _invoke_unchecked(root, "check", "--rules", "malicious_rules.toml")
        assert code != 0

    def test_filter_path_slash_etc_doesnt_crash(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        _write_commit_chain(root)
        code, _ = _invoke(root, "check", "--filter-path", "/etc/*")
        assert code == 0

    def test_null_byte_in_commit_arg_doesnt_crash(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        _write_commit_chain(root)
        code, _ = _invoke_unchecked(root, "check", "abc\x00def")
        assert code != 0  # bad ref, no crash


# ---------------------------------------------------------------------------
# Stress tests
# ---------------------------------------------------------------------------


class TestStress:
    """Larger inputs: big violation lists, big rules files, longer histories."""

    def test_filter_on_200_violation_report(self, tmp_path: pathlib.Path) -> None:
        """_filter_report handles a 200-violation list efficiently."""
        from muse.cli.commands.check import _filter_report
        violations: list[BaseViolation] = [
            BaseViolation(
                rule_name="max_complexity" if i % 2 == 0 else "no_cycles",
                severity="error" if i % 3 == 0 else "warning",
                address=f"src/module_{i}.py::func_{i}",
                description=f"violation {i}",
            )
            for i in range(200)
        ]
        report = make_report("a" * 64, "code", violations, 3)
        filtered = _filter_report(report, filter_severity="error",
                                  filter_rule=None, filter_path=None)
        assert all(v["severity"] == "error" for v in filtered["violations"])
        # Deterministic count: every 3rd item (0-indexed) is error.
        expected = sum(1 for i in range(200) if i % 3 == 0)
        assert len(filtered["violations"]) == expected

    def test_check_with_50_rule_toml(self, tmp_path: pathlib.Path) -> None:
        """muse check with a large rules TOML doesn't crash."""
        root = _make_repo(tmp_path)
        _write_commit_chain(root, file_content=b"x = 1\n")
        rules_lines: list[str] = []
        for i in range(50):
            # Only the name and threshold vary per rule; the rest is constant,
            # so those lines are plain strings rather than f-strings.
            rules_lines.extend([
                "[[rule]]",
                f'name = "rule_{i}"',
                'severity = "warning"',
                'scope = "function"',
                'rule_type = "max_complexity"',
                "[rule.params]",
                f"threshold = {1000 + i}",
                "",
            ])
        rules = root / "big_rules.toml"
        rules.write_text("\n".join(rules_lines), encoding="utf-8")
        code, out = _invoke(root, "check", "--rules", "big_rules.toml", "--json")
        assert code == 0
        d = json.loads(out.strip())
        assert d["rules_checked"] == 50

    def test_filter_report_with_glob_on_200_items(self, tmp_path: pathlib.Path) -> None:
        """Path glob filter on a large violation list is correct."""
        from muse.cli.commands.check import _filter_report
        # First 100 addresses live under src/a/, the remaining 100 under src/b/.
        violations: list[BaseViolation] = [
            BaseViolation(
                rule_name="max_complexity",
                severity="warning",
                address=(f"src/a/module_{i}.py::func" if i < 100
                         else f"src/b/module_{i}.py::func"),
                description=f"v{i}",
            )
            for i in range(200)
        ]
        report = make_report("a" * 64, "code", violations, 1)
        filtered = _filter_report(report, filter_severity=None,
                                  filter_rule=None, filter_path="src/a/*")
        assert len(filtered["violations"]) == 100
        assert all("src/a/" in v["address"] for v in filtered["violations"])

    def test_json_output_with_many_commits(self, tmp_path: pathlib.Path) -> None:
        """muse check --json works correctly on a repo with 20 commits."""
        root = _make_repo(tmp_path)
        _write_commit_chain(root, n=20)
        code, out = _invoke(root, "check", "--json")
        assert code == 0
        d = json.loads(out.strip())
        assert isinstance(d["total_violations"], int)
        assert d["duration_ms"] > 0


# ---------------------------------------------------------------------------
# exit_code in JSON — agent gating without shell $?
# ---------------------------------------------------------------------------


class TestExitCodeInJson:
    """The ``--json`` payload carries ``exit_code`` so agents need not read $?."""

    def test_exit_code_present_in_json(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _write_commit_chain(repo)
        stdout = _invoke(repo, "check", "--json")[1]
        payload = json.loads(stdout.strip())
        assert "exit_code" in payload
        assert isinstance(payload["exit_code"], int)

    def test_exit_code_zero_when_no_strict_or_warn(self, tmp_path: pathlib.Path) -> None:
        """With neither --strict nor --warn the reported and actual exit codes are 0."""
        repo = _make_repo(tmp_path)
        _write_commit_chain(repo)
        status, stdout = _invoke(repo, "check", "--json")
        payload = json.loads(stdout.strip())
        assert payload["exit_code"] == 0
        assert status == payload["exit_code"]

    def test_exit_code_matches_process_exit_with_strict(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _write_commit_chain(repo)
        status, stdout = _invoke(repo, "check", "--strict", "--json")
        payload = json.loads(stdout.strip())
        assert payload["exit_code"] == status

    def test_exit_code_matches_process_exit_with_warn(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _write_commit_chain(repo)
        status, stdout = _invoke(repo, "check", "--warn", "--json")
        payload = json.loads(stdout.strip())
        assert payload["exit_code"] == status

    def test_exit_code_matches_process_exit_strict_and_warn(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _write_commit_chain(repo)
        status, stdout = _invoke(repo, "check", "--strict", "--warn", "--json")
        payload = json.loads(stdout.strip())
        assert payload["exit_code"] == status

    def test_exit_code_in_json_with_filter(self, tmp_path: pathlib.Path) -> None:
        """``exit_code`` is still reported when filters narrow the violation list."""
        repo = _make_repo(tmp_path)
        _write_commit_chain(repo)
        flags = ("--json", "--filter-severity", "error", "--strict")
        status, stdout = _invoke(repo, "check", *flags)
        payload = json.loads(stdout.strip())
        assert "exit_code" in payload
        assert payload["exit_code"] == status


# ---------------------------------------------------------------------------
# Flag registration tests
# ---------------------------------------------------------------------------

import argparse as _argparse
from muse.cli.commands.check import register as _register_check
from muse.core.paths import ref_path


def _parse_check(*args: str) -> _argparse.Namespace:
    """Parse ``check <args>`` with a fresh parser populated by ``register()``."""
    parser = _argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest="cmd")
    _register_check(subparsers)
    argv = ["check", *args]
    return parser.parse_args(argv)


class TestRegisterFlags:
    """Flags exposed by the ``check`` sub-command's argument parser."""

    def test_default_json_out_is_false(self) -> None:
        parsed = _parse_check()
        assert parsed.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        parsed = _parse_check("--json")
        assert parsed.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        parsed = _parse_check("-j")
        assert parsed.json_out is True

    def test_strict_flag(self) -> None:
        parsed = _parse_check("--strict")
        assert parsed.strict is True

    def test_format_flag_no_longer_exists(self) -> None:
        import pytest
        # The parser exits via SystemExit when given the unrecognised --format flag.
        with pytest.raises(SystemExit):
            _parse_check("--format", "json")