"""Comprehensive tests for ``muse check-attr``. Audit findings addressed here ------------------------------ Performance - Default mode previously called ``resolve_strategy`` + ``_find_matching_rule`` separately → 2× rule-list iteration per path. Replaced with ``_resolve_with_rule`` — single O(N) pass returning (strategy, rule). Code quality - ``--all-rules`` manual dim-match loop extracted into ``_all_matching_rules``. - ``_dim_match`` helper eliminates repeated boolean expression. Security - Format error now goes to stderr (was stdout) — verified below. - ANSI injection in text output stripped via sanitize_display(). - Null bytes in paths now rejected with USER_ERROR. Agent UX - ``--stdin`` — read paths from stdin (one per line, # comments skipped). - ``--rules-only`` — emit loaded rules without testing paths. - ``formatter_class`` added for clean --help rendering. - ``nargs="*"`` on paths (was "+") to support --stdin and --rules-only. Coverage tiers -------------- - Unit: _dim_match, _resolve_with_rule, _all_matching_rules, _rule_to_dict, _RuleDict / _PathResult schemas - Integration: JSON/text formats, --dimension filter, --all-rules, --rules-only, --stdin, empty .museattributes, multiple rules priority, negation - Security: null bytes rejected, ANSI stripped in text, format error→stderr, no tracebacks, bad TOML errors cleanly - Stress: 1 000-path batch, 50-rule set, 200 sequential runs, stdin 1 000 paths """ from __future__ import annotations import json import pathlib import pytest from muse.core.attributes import AttributeRule from muse.core.paths import muse_dir from muse.core.errors import ExitCode from tests.cli_test_helper import CliRunner, InvokeResult runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path, domain: str = "code") -> pathlib.Path: repo = tmp_path / "repo" dot_muse = muse_dir(repo) for sub in ("objects", "commits", "snapshots", "refs/heads"): (dot_muse / sub).mkdir(parents=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main") (dot_muse / "repo.json").write_text(json.dumps({"repo_id": "r1", "domain": domain})) return repo def _write_attrs(repo: pathlib.Path, content: str) -> None: (repo / ".museattributes").write_text(content) _SIMPLE_ATTRS = """ [meta] domain = "code" [[rules]] path = "build/*" dimension = "*" strategy = "ours" comment = "Build artifacts prefer ours." priority = 10 [[rules]] path = "*.md" dimension = "*" strategy = "union" priority = 5 """ def _ca(repo: pathlib.Path, *args: str, stdin: str | None = None) -> InvokeResult: from muse.cli.app import main as cli return runner.invoke( cli, ["check-attr", *args], env={"MUSE_REPO_ROOT": str(repo)}, input=stdin, ) def _make_rule( path_pattern: str = "*", dimension: str = "*", strategy: str = "ours", priority: int = 0, source_index: int = 0, ) -> AttributeRule: from dataclasses import dataclass return AttributeRule( path_pattern=path_pattern, dimension=dimension, strategy=strategy, priority=priority, source_index=source_index, ) # --------------------------------------------------------------------------- # Unit — private helpers # --------------------------------------------------------------------------- class TestDimMatch: def test_wildcard_rule_matches_any(self) -> None: from muse.cli.commands.check_attr import _dim_match rule = _make_rule(dimension="*") assert _dim_match(rule, "notes") assert _dim_match(rule, "pitch_bend") def test_exact_dim_matches(self) -> None: from muse.cli.commands.check_attr import _dim_match rule = _make_rule(dimension="notes") assert _dim_match(rule, "notes") def test_exact_dim_no_match(self) -> None: from muse.cli.commands.check_attr import _dim_match rule = _make_rule(dimension="notes") assert not _dim_match(rule, "pitch_bend") def test_wildcard_query_matches_any_rule(self) -> None: from muse.cli.commands.check_attr import _dim_match rule = _make_rule(dimension="notes") assert _dim_match(rule, "*") class TestResolveWithRule: def test_no_rules_returns_auto(self) -> None: from muse.cli.commands.check_attr import _resolve_with_rule strategy, rule = _resolve_with_rule([], "tracks/drums.mid", "*") assert strategy == "auto" assert rule is None def test_matching_rule_returned(self) -> None: from muse.cli.commands.check_attr import _resolve_with_rule rules = [_make_rule(path_pattern="build/*", strategy="ours")] strategy, rule = _resolve_with_rule(rules, "build/out.bin", "*") assert strategy == "ours" assert rule is not None assert rule.path_pattern == "build/*" def test_non_matching_path_returns_auto(self) -> None: from muse.cli.commands.check_attr import _resolve_with_rule rules = [_make_rule(path_pattern="build/*", strategy="ours")] strategy, rule = _resolve_with_rule(rules, "tracks/drums.mid", "*") assert strategy == "auto" assert rule is None def test_first_match_wins(self) -> None: from muse.cli.commands.check_attr import _resolve_with_rule rules = [ _make_rule(path_pattern="*.mid", strategy="ours", priority=10), _make_rule(path_pattern="tracks/*", strategy="theirs", priority=5), ] strategy, rule = _resolve_with_rule(rules, "tracks/drums.mid", "*") assert strategy == "ours" def test_dimension_filter_respected(self) -> None: from muse.cli.commands.check_attr import _resolve_with_rule rules = [_make_rule(path_pattern="*", dimension="notes", strategy="union")] strategy, rule = _resolve_with_rule(rules, "tracks/drums.mid", "pitch_bend") assert strategy == "auto" assert rule is None class TestAllMatchingRules: def test_returns_all_matches(self) -> None: from muse.cli.commands.check_attr import _all_matching_rules rules = [ _make_rule(path_pattern="*.mid", strategy="ours"), _make_rule(path_pattern="tracks/*", strategy="theirs"), _make_rule(path_pattern="build/*", strategy="union"), ] matched = _all_matching_rules(rules, "tracks/drums.mid", "*") assert len(matched) == 2 def test_empty_when_no_match(self) -> None: from muse.cli.commands.check_attr import _all_matching_rules rules = [_make_rule(path_pattern="build/*", strategy="ours")] assert _all_matching_rules(rules, "tracks/drums.mid", "*") == [] class TestRuleToDict: def test_all_fields_present(self) -> None: from muse.cli.commands.check_attr import _rule_to_dict rule = _make_rule(path_pattern="*.mid", dimension="notes", strategy="ours", priority=5, source_index=2) d = _rule_to_dict(rule) assert d["path_pattern"] == "*.mid" assert d["dimension"] == "notes" assert d["strategy"] == "ours" assert d["priority"] == 5 assert d["source_index"] == 2 class TestSchemas: def test_path_result_fields(self) -> None: from muse.cli.commands.check_attr import _PathResult fields = set(_PathResult.__annotations__) assert fields == {"path", "dimension", "strategy", "rule"} def test_rule_dict_fields(self) -> None: from muse.cli.commands.check_attr import _RuleDict fields = set(_RuleDict.__annotations__) assert {"path_pattern", "dimension", "strategy", "comment", "priority", "source_index"} == fields # --------------------------------------------------------------------------- # Integration — JSON output (default mode) # --------------------------------------------------------------------------- class TestJsonOutput: def test_no_attrs_file_returns_auto(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _ca(repo, "--json", "tracks/drums.mid") assert result.exit_code == 0 data = json.loads(result.output) assert data["rules_loaded"] == 0 assert data["results"][0]["strategy"] == "auto" assert data["results"][0]["rule"] is None def test_matching_rule_present(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) data = json.loads(_ca(repo, "--json", "build/out.bin").output) assert data["results"][0]["strategy"] == "ours" assert data["results"][0]["rule"] is not None assert data["results"][0]["rule"]["path_pattern"] == "build/*" def test_non_matching_path_auto(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) data = json.loads(_ca(repo, "--json", "tracks/drums.mid").output) assert data["results"][0]["strategy"] == "auto" def test_multiple_paths(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) data = json.loads(_ca(repo, "--json", "build/out.bin", "README.md", "main.py").output) assert len(data["results"]) == 3 strategies = [r["strategy"] for r in data["results"]] assert strategies == ["ours", "union", "auto"] def test_json_shorthand(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _ca(repo, "--json", "foo.py") assert result.exit_code == 0 assert "results" in json.loads(result.output) def test_dimension_filter(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, """ [[rules]] path = "tracks/*" dimension = "notes" strategy = "union" """) data = json.loads(_ca(repo, "--json", "--dimension", "notes", "tracks/drums.mid").output) assert data["results"][0]["strategy"] == "union" def test_dimension_no_match_auto(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, """ [[rules]] path = "tracks/*" dimension = "notes" strategy = "union" """) data = json.loads(_ca(repo, "--json", "--dimension", "pitch_bend", "tracks/drums.mid").output) assert data["results"][0]["strategy"] == "auto" # --------------------------------------------------------------------------- # Integration — text output # --------------------------------------------------------------------------- class TestTextOutput: def test_strategy_in_output(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _ca(repo, "build/out.bin") assert result.exit_code == 0 assert "strategy=ours" in result.output def test_no_matching_rule_label(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _ca(repo, "main.py") assert "no matching rule" in result.output # --------------------------------------------------------------------------- # Integration — --all-rules # --------------------------------------------------------------------------- class TestAllRules: def test_all_matching_rules_returned(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, """ [[rules]] path = "tracks/*" dimension = "*" strategy = "ours" [[rules]] path = "*.mid" dimension = "*" strategy = "union" """) data = json.loads(_ca(repo, "--json", "--all-rules", "tracks/drums.mid").output) result = data["results"][0] assert len(result["matching_rules"]) == 2 def test_no_matching_rules(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) data = json.loads(_ca(repo, "--json", "--all-rules", "main.py").output) assert data["results"][0]["matching_rules"] == [] def test_all_rules_text_output(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _ca(repo, "--all-rules", "build/a.bin") assert "strategy=ours" in result.output # --------------------------------------------------------------------------- # Integration — --rules-only (new agent UX) # --------------------------------------------------------------------------- class TestRulesOnly: def test_json_rules_list(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) data = json.loads(_ca(repo, "--json", "--rules-only").output) assert "rules" in data assert len(data["rules"]) == 2 assert data["domain"] == "code" def test_empty_attrs_empty_list(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) data = json.loads(_ca(repo, "--json", "--rules-only").output) assert data["rules"] == [] assert data["rules_loaded"] == 0 def test_text_format(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _ca(repo, "--rules-only") assert result.exit_code == 0 assert "strategy=ours" in result.output def test_no_path_required(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _ca(repo, "--rules-only") assert result.exit_code == 0 # --------------------------------------------------------------------------- # Integration — --stdin (new agent UX) # --------------------------------------------------------------------------- class TestStdinMode: def test_reads_paths_from_stdin(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _ca(repo, "--json", "--stdin", stdin="build/out.bin\nmain.py\n") data = json.loads(result.output) assert len(data["results"]) == 2 assert data["results"][0]["strategy"] == "ours" assert data["results"][1]["strategy"] == "auto" def test_blank_lines_and_comments_skipped(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _ca(repo, "--json", "--stdin", stdin="\n# comment\nmain.py\n\n") data = json.loads(result.output) assert len(data["results"]) == 1 def test_stdin_combines_with_positional(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _ca(repo, "--json", "--stdin", "positional.py", stdin="from_stdin.py\n") data = json.loads(result.output) paths = [r["path"] for r in data["results"]] assert "positional.py" in paths assert "from_stdin.py" in paths def test_empty_stdin_no_positional_errors(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _ca(repo, "--stdin", stdin="") assert result.exit_code == ExitCode.USER_ERROR # --------------------------------------------------------------------------- # Security # --------------------------------------------------------------------------- class TestSecurity: def test_null_byte_in_path_rejected(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _ca(repo, "tracks/\x00malicious.mid") assert result.exit_code == ExitCode.USER_ERROR assert "null byte" in result.stderr.lower() def test_ansi_in_path_stripped_text(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _ca(repo, "\x1b[31mmalicious\x1b[0m.py") assert "\x1b" not in result.output def test_ansi_in_path_pattern_stripped_text(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, """ [[rules]] path = "\\u001b[31m*\\u001b[0m" dimension = "*" strategy = "ours" """) result = _ca(repo, "tracks/drums.mid") assert "\x1b" not in result.output def test_no_traceback_on_invalid_toml(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) (repo / ".museattributes").write_text("[broken toml !!!") result = _ca(repo, "foo.py") assert "Traceback" not in result.output def test_invalid_toml_errors(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) (repo / ".museattributes").write_text("[broken toml !!!") result = _ca(repo, "foo.py") assert result.exit_code == ExitCode.INTERNAL_ERROR assert "Traceback" not in result.output def test_no_paths_errors_to_stderr(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _ca(repo) assert result.exit_code == ExitCode.USER_ERROR assert "error" in result.stderr.lower() # --------------------------------------------------------------------------- # duration_ms # --------------------------------------------------------------------------- class TestElapsed: def test_elapsed_in_default_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) data = json.loads(_ca(repo, "--json", "foo.py").output) assert "duration_ms" in data assert isinstance(data["duration_ms"], float) assert data["duration_ms"] >= 0.0 def test_elapsed_in_rules_only_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) data = json.loads(_ca(repo, "--json", "--rules-only").output) assert "duration_ms" in data assert isinstance(data["duration_ms"], float) def test_elapsed_in_all_rules_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) data = json.loads(_ca(repo, "--json", "--all-rules", "foo.py").output) assert "duration_ms" in data assert isinstance(data["duration_ms"], float) def test_elapsed_absent_from_text_output(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _ca(repo, "foo.py") assert "duration_ms" not in result.output # --------------------------------------------------------------------------- # exit_code # --------------------------------------------------------------------------- class TestExitCode: def test_exit_code_0_in_default_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) data = json.loads(_ca(repo, "--json", "foo.py").output) assert "exit_code" in data assert data["exit_code"] == 0 def test_exit_code_in_rules_only_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) data = json.loads(_ca(repo, "--json", "--rules-only").output) assert "exit_code" in data assert data["exit_code"] == 0 def test_exit_code_in_all_rules_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) data = json.loads(_ca(repo, "--json", "--all-rules", "foo.py").output) assert "exit_code" in data assert data["exit_code"] == 0 # --------------------------------------------------------------------------- # summary block (default mode only) # --------------------------------------------------------------------------- class TestSummary: def test_summary_present_in_default_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) data = json.loads(_ca(repo, "--json", "build/out.bin", "README.md", "main.py").output) assert "summary" in data s = data["summary"] assert s["total"] == 3 assert s["matched"] == 2 assert s["unmatched"] == 1 def test_summary_by_strategy(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) data = json.loads(_ca(repo, "--json", "build/out.bin", "README.md", "main.py").output) by = data["summary"]["by_strategy"] assert by.get("ours") == 1 assert by.get("union") == 1 assert by.get("auto") == 1 def test_summary_all_unmatched(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) data = json.loads(_ca(repo, "--json", "foo.py", "bar.py").output) s = data["summary"] assert s["total"] == 2 assert s["matched"] == 0 assert s["unmatched"] == 2 assert s["by_strategy"] == {"auto": 2} def test_summary_absent_from_rules_only(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) data = json.loads(_ca(repo, "--json", "--rules-only").output) assert "summary" not in data def test_summary_absent_from_all_rules(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) data = json.loads(_ca(repo, "--json", "--all-rules", "foo.py").output) assert "summary" not in data # --------------------------------------------------------------------------- # --unmatched-only # --------------------------------------------------------------------------- class TestUnmatchedOnly: def test_filters_to_auto_paths(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) data = json.loads(_ca(repo, "--json", "--unmatched-only", "build/out.bin", "main.py").output) assert len(data["results"]) == 1 assert data["results"][0]["path"] == "main.py" assert data["results"][0]["strategy"] == "auto" def test_empty_when_all_matched(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) data = json.loads(_ca(repo, "--json", "--unmatched-only", "build/out.bin", "README.md").output) assert data["results"] == [] def test_all_when_none_matched(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) data = json.loads(_ca(repo, "--json", "--unmatched-only", "foo.py", "bar.py").output) assert len(data["results"]) == 2 def test_summary_reflects_filter(self, tmp_path: pathlib.Path) -> None: """summary.total reflects filtered count, not original count.""" repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) data = json.loads(_ca(repo, "--json", "--unmatched-only", "build/out.bin", "main.py").output) assert data["summary"]["total"] == 1 assert data["summary"]["unmatched"] == 1 def test_text_format(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _ca(repo, "--unmatched-only", "build/out.bin", "main.py") assert result.exit_code == 0 assert "build/out.bin" not in result.output assert "main.py" in result.output def test_incompatible_with_rules_only(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _ca(repo, "--unmatched-only", "--rules-only") assert result.exit_code == ExitCode.USER_ERROR def test_stdin_compatible(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _ca(repo, "--json", "--unmatched-only", "--stdin", stdin="build/out.bin\nmain.py\n") data = json.loads(result.output) assert len(data["results"]) == 1 assert data["results"][0]["path"] == "main.py" # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestStress: def test_1000_paths(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) paths = [f"build/file_{i:04d}.bin" for i in range(500)] paths += [f"src/file_{i:04d}.py" for i in range(500)] result = _ca(repo, "--json", *paths) assert result.exit_code == 0 data = json.loads(result.output) assert len(data["results"]) == 1000 ours_count = sum(1 for r in data["results"] if r["strategy"] == "ours") assert ours_count == 500 def test_50_rule_set(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) rules_toml = "\n".join( f'[[rules]]\npath = "dir_{i}/*"\ndimension = "*"\nstrategy = "ours"\npriority = {50 - i}\n' for i in range(50) ) _write_attrs(repo, rules_toml) data = json.loads(_ca(repo, "--json", "dir_0/file.py", "dir_49/file.py", "other.py").output) assert data["rules_loaded"] == 50 assert data["results"][0]["strategy"] == "ours" assert data["results"][1]["strategy"] == "ours" assert data["results"][2]["strategy"] == "auto" def test_200_sequential_runs(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) for i in range(200): result = _ca(repo, "--json", "build/out.bin") assert result.exit_code == 0, f"failed at iteration {i}" data = json.loads(result.output) assert data["results"][0]["strategy"] == "ours" def test_stdin_1000_paths(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) stdin_input = "\n".join(f"build/file_{i}.bin" for i in range(1000)) + "\n" result = _ca(repo, "--json", "--stdin", stdin=stdin_input) assert result.exit_code == 0 data = json.loads(result.output) assert len(data["results"]) == 1000 assert all(r["strategy"] == "ours" for r in data["results"]) # --------------------------------------------------------------------------- # Flag tests # --------------------------------------------------------------------------- import argparse as _argparse class TestRegisterFlags: def _parse(self, *args: str) -> _argparse.Namespace: from muse.cli.commands.check_attr import register p = _argparse.ArgumentParser() sub = p.add_subparsers() register(sub) return p.parse_args(["check-attr", *args]) def test_default_json_out_is_false(self) -> None: ns = self._parse("foo.py") assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = self._parse("--json", "foo.py") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = self._parse("-j", "foo.py") assert ns.json_out is True