"""Comprehensive tests for ``muse attributes`` CLI hardening. Audit findings addressed ------------------------ Security - ANSI injection in domain, path_pattern, dimension, strategy, comment fields stripped via sanitize_display() before text-mode output. - Null bytes in paths supplied to ``check`` rejected with USER_ERROR. - No raw Python tracebacks exposed on ValueError from load_attributes_full. - All diagnostic messages routed to stderr; stdout carries data only. Performance - Single-pass parsing (load_attributes_full) instead of two separate _parse_raw calls (read_attributes_meta + load_attributes). - File size cap (_MAX_ATTRIBUTES_BYTES) prevents OOM. Correctness - ``list`` now shows comment and priority columns. - JSON always includes ``domain`` (empty string when unset). - JSON includes ``comment`` and ``priority`` fields on every rule. - Empty-file vs. missing-file messages are distinct. - validate exit-codes 0 on valid, USER_ERROR on invalid. Agent UX - ``muse attributes list --json`` stable schema. - ``muse attributes check --json`` stable schema with rule_index. - ``muse attributes validate --json`` stable schema. - subcommand structure mirrors hub/config/auth pattern. Coverage tiers -------------- - Unit: _resolve_with_index, _rule_to_json, TypedDict schemas - Integration: run_list, run_check, run_validate via CliRunner - Security: ANSI injection, null bytes, stderr routing, no tracebacks - E2E: full CLI invocation, JSON round-trips, exit codes - Stress: 1 000 paths, 200 rules, concurrent isolated parses """ from __future__ import annotations import json import pathlib import threading from typing import TYPE_CHECKING from unittest.mock import patch import pytest from unittest.mock import patch from muse.core.attributes import ( AttributeRule, AttributesMeta, _MAX_ATTRIBUTES_BYTES, load_attributes_full, ) from muse.core.errors import ExitCode from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.paths import muse_dir if TYPE_CHECKING: from muse.cli.commands.attributes import ( _CheckJson, _CheckResultJson, _ListJson, _RuleJson, _ValidateJson, ) runner = CliRunner() cli = None # argparse-based; CliRunner ignores this # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path, domain: str = "code") -> pathlib.Path: """Create a minimal Muse repo layout at *tmp_path*.""" repo = tmp_path / "repo" dot_muse = muse_dir(repo) for sub in ("objects", "commits", "snapshots", "refs/heads"): (dot_muse / sub).mkdir(parents=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": "r1", "domain": domain}), encoding="utf-8", ) return repo def _write_attrs(repo: pathlib.Path, content: str) -> None: (repo / ".museattributes").write_text(content, encoding="utf-8") _SIMPLE_ATTRS = """\ [meta] domain = "code" [[rules]] path = "build/*" dimension = "*" strategy = "ours" comment = "Build artifacts prefer ours." priority = 10 [[rules]] path = "*.md" dimension = "*" strategy = "theirs" comment = "Docs from incoming branch." priority = 5 [[rules]] path = "*" dimension = "*" strategy = "auto" comment = "" priority = 0 """ _ANSI = "\x1b[31mmalicious\x1b[0m" def _invoke(repo: pathlib.Path, *args: str) -> InvokeResult: """Run ``muse attributes `` inside *repo* via MUSE_REPO_ROOT.""" return runner.invoke( cli, ["attributes", *args], env={"MUSE_REPO_ROOT": str(repo)}, ) # --------------------------------------------------------------------------- # Unit — TypedDicts exist with correct keys # --------------------------------------------------------------------------- class TestTypedDicts: def test_rule_json_keys(self) -> None: from muse.cli.commands.attributes import _RuleJson rule: _RuleJson = { "path_pattern": "x", "dimension": "y", "strategy": "auto", "comment": "", "priority": 0, "source_index": 0, } assert set(rule.keys()) == { "path_pattern", "dimension", "strategy", "comment", "priority", "source_index", } def test_list_json_keys(self) -> None: from muse.cli.commands.attributes import _ListJson payload: _ListJson = {"domain": "", "rules": []} assert set(payload.keys()) == {"domain", "rules"} def test_check_json_keys(self) -> None: from muse.cli.commands.attributes import _CheckJson, _CheckResultJson item: _CheckResultJson = { "path": "x.mid", "dimension": "*", "strategy": "auto", "rule_index": -1, } payload: _CheckJson = {"results": [item]} assert "results" in payload def test_validate_json_keys(self) -> None: from muse.cli.commands.attributes import _ValidateJson, _ValidateErrorJson err: _ValidateErrorJson = {"kind": "missing", "message": "no file"} payload: _ValidateJson = {"valid": False, "errors": [err]} assert set(payload.keys()) == {"valid", "errors"} # --------------------------------------------------------------------------- # Unit — _resolve_with_index # --------------------------------------------------------------------------- class TestResolveWithIndex: def _rules(self) -> list[AttributeRule]: return [ AttributeRule("build/*", "*", "ours", "x", 10, 0), AttributeRule("*.md", "*", "theirs", "", 5, 1), AttributeRule("*", "*", "auto", "", 0, 2), ] def test_first_rule_match(self) -> None: from muse.cli.commands.attributes import _resolve_with_index strategy, idx = _resolve_with_index(self._rules(), "build/foo.bin", "*") assert strategy == "ours" assert idx == 0 def test_second_rule_match(self) -> None: from muse.cli.commands.attributes import _resolve_with_index strategy, idx = _resolve_with_index(self._rules(), "README.md", "*") assert strategy == "theirs" assert idx == 1 def test_fallthrough_to_wildcard(self) -> None: from muse.cli.commands.attributes import _resolve_with_index strategy, idx = _resolve_with_index(self._rules(), "src/main.py", "*") assert strategy == "auto" assert idx == 2 def test_no_match_returns_auto_neg1(self) -> None: from muse.cli.commands.attributes import _resolve_with_index rules: list[AttributeRule] = [ AttributeRule("build/*", "notes", "ours", "", 0, 0), ] strategy, idx = _resolve_with_index(rules, "src/main.py", "notes") assert strategy == "auto" assert idx == -1 def test_dimension_filter_respected(self) -> None: from muse.cli.commands.attributes import _resolve_with_index rules: list[AttributeRule] = [ AttributeRule("*.mid", "pitch_bend", "manual", "", 0, 0), AttributeRule("*.mid", "*", "auto", "", 0, 1), ] strategy, idx = _resolve_with_index(rules, "track.mid", "notes") assert strategy == "auto" assert idx == 1 def test_empty_rules_returns_auto(self) -> None: from muse.cli.commands.attributes import _resolve_with_index strategy, idx = _resolve_with_index([], "anything.mid", "*") assert strategy == "auto" assert idx == -1 # --------------------------------------------------------------------------- # Unit — _rule_to_json # --------------------------------------------------------------------------- class TestRuleToJson: def test_all_fields_present(self) -> None: from muse.cli.commands.attributes import _rule_to_json rule = AttributeRule("drums/*", "*", "ours", "Drums are ours", 10, 3) j = _rule_to_json(rule) assert j["path_pattern"] == "drums/*" assert j["dimension"] == "*" assert j["strategy"] == "ours" assert j["comment"] == "Drums are ours" assert j["priority"] == 10 assert j["source_index"] == 3 def test_defaults_preserved(self) -> None: from muse.cli.commands.attributes import _rule_to_json rule = AttributeRule("*", "*", "auto") j = _rule_to_json(rule) assert j["comment"] == "" assert j["priority"] == 0 assert j["source_index"] == 0 # --------------------------------------------------------------------------- # Unit — load_attributes_full single-pass # --------------------------------------------------------------------------- class TestLoadAttributesFull: def test_missing_file_returns_empty_meta_and_rules( self, tmp_path: pathlib.Path ) -> None: meta, rules = load_attributes_full(tmp_path) assert meta == {} assert rules == [] def test_returns_meta_and_rules_together(self, tmp_path: pathlib.Path) -> None: _write_attrs(tmp_path, _SIMPLE_ATTRS) meta, rules = load_attributes_full(tmp_path) assert meta.get("domain") == "code" assert len(rules) == 3 def test_priority_sort_applied(self, tmp_path: pathlib.Path) -> None: _write_attrs(tmp_path, _SIMPLE_ATTRS) _, rules = load_attributes_full(tmp_path) priorities = [r.priority for r in rules] assert priorities == sorted(priorities, reverse=True) def test_invalid_strategy_raises_value_error( self, tmp_path: pathlib.Path ) -> None: _write_attrs( tmp_path, '[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "bogus"\n', ) with pytest.raises(ValueError, match="unknown strategy"): load_attributes_full(tmp_path) def test_file_too_large_raises_value_error( self, tmp_path: pathlib.Path ) -> None: giant = tmp_path / ".museattributes" giant.write_bytes(b"# " + b"x" * (_MAX_ATTRIBUTES_BYTES + 1)) with pytest.raises(ValueError, match="file too large"): load_attributes_full(tmp_path) def test_bad_toml_raises_value_error(self, tmp_path: pathlib.Path) -> None: _write_attrs(tmp_path, "[[rules]]\npath = << None: """load_attributes_full must invoke _parse_raw exactly once.""" _write_attrs(tmp_path, _SIMPLE_ATTRS) call_count = 0 from muse.core import attributes as attrs_mod original = attrs_mod._parse_raw def counting_parse(root: pathlib.Path) -> attrs_mod.MuseAttributesFile: nonlocal call_count call_count += 1 return original(root) with patch.object(attrs_mod, "_parse_raw", counting_parse): load_attributes_full(tmp_path) assert call_count == 1, f"Expected 1 parse, got {call_count}" # --------------------------------------------------------------------------- # Integration — muse attributes list # --------------------------------------------------------------------------- class TestRunList: def test_missing_file_exits_zero_text_to_stderr( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "list") assert result.exit_code == 0 assert "No .museattributes" in result.stderr def test_empty_file_exits_zero_no_rules_message( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, "") result = _invoke(repo, "list") assert result.exit_code == 0 assert "no rules" in result.stderr.lower() or "empty" in result.stderr.lower() def test_table_shows_all_three_rules(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list") assert result.exit_code == 0 assert "build/*" in result.output assert "*.md" in result.output def test_table_shows_domain(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list") assert "Domain: code" in result.output def test_table_shows_comment_column(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list") assert "Build artifacts prefer ours." in result.output def test_table_shows_priority_column(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list") assert "10" in result.output # priority for build/* rule def test_table_shows_pri_header(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list") assert "Pri" in result.output def test_invalid_strategy_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "bogus"\n', ) result = _invoke(repo, "list") assert result.exit_code != 0 def test_bad_toml_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, "[[broken\n") result = _invoke(repo, "list") assert result.exit_code != 0 def test_bad_toml_no_traceback(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, "[[broken\n") result = _invoke(repo, "list") assert "Traceback" not in result.output assert "Traceback" not in (result.output or "") class TestRunListJson: def _parse(self, result: InvokeResult) -> "_ListJson": from muse.cli.commands.attributes import _ListJson, _RuleJson start = next( i for i, l in enumerate(result.output.splitlines()) if l.strip().startswith("{") ) blob = "\n".join(result.output.splitlines()[start:]) depth = 0 end = 0 for i, ch in enumerate(blob): if ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: end = i + 1 break raw = json.loads(blob[:end]) assert isinstance(raw, dict) domain = raw.get("domain", "") assert isinstance(domain, str) rules_raw = raw.get("rules", []) assert isinstance(rules_raw, list) rules: list[_RuleJson] = [] for r in rules_raw: assert isinstance(r, dict) rules.append( _RuleJson( path_pattern=str(r.get("path_pattern", "")), dimension=str(r.get("dimension", "")), strategy=str(r.get("strategy", "")), comment=str(r.get("comment", "")), priority=int(r.get("priority", 0)), source_index=int(r.get("source_index", 0)), ) ) return _ListJson(domain=domain, rules=rules) def test_json_schema_domain_always_present( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") data = self._parse(result) assert "domain" in data assert data["domain"] == "code" def test_json_domain_empty_string_when_no_meta( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "auto"\n', ) result = _invoke(repo, "list", "--json") data = self._parse(result) assert data["domain"] == "" def test_json_rules_is_list(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") data = self._parse(result) assert isinstance(data["rules"], list) assert len(data["rules"]) == 3 def test_json_rule_has_all_fields(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") data = self._parse(result) rule = data["rules"][0] for field in ("path_pattern", "dimension", "strategy", "comment", "priority", "source_index"): assert field in rule, f"Missing field: {field}" def test_json_includes_comment(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") data = self._parse(result) comments = [r["comment"] for r in data["rules"]] assert any("Build artifacts" in c for c in comments) def test_json_includes_priority(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") data = self._parse(result) priorities = [r["priority"] for r in data["rules"]] assert 10 in priorities def test_json_missing_file_exits_zero(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "list", "--json") assert result.exit_code == 0 def test_json_missing_file_has_empty_rules( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "list", "--json") data = self._parse(result) assert data["rules"] == [] assert data["domain"] == "" # --------------------------------------------------------------------------- # Integration — muse attributes check # --------------------------------------------------------------------------- class TestRunCheck: def _parse_check(self, result: InvokeResult) -> "_CheckJson": from muse.cli.commands.attributes import _CheckJson, _CheckResultJson start = result.output.index("{") blob = result.output[start:] depth = 0 end = 0 for i, ch in enumerate(blob): if ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: end = i + 1 break raw = json.loads(blob[:end]) assert isinstance(raw, dict) results_raw = raw.get("results", []) assert isinstance(results_raw, list) results: list[_CheckResultJson] = [] for item in results_raw: assert isinstance(item, dict) results.append( _CheckResultJson( path=str(item.get("path", "")), dimension=str(item.get("dimension", "")), strategy=str(item.get("strategy", "")), rule_index=int(item.get("rule_index", -1)), ) ) return _CheckJson(results=results) def test_check_resolves_single_path(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/foo.o") assert result.exit_code == 0 assert "ours" in result.output def test_check_shows_rule_index(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/foo.o") assert "rule #" in result.output def test_check_unmatched_shows_default(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "build/*"\ndimension = "*"\nstrategy = "ours"\n', ) result = _invoke(repo, "check", "src/main.py") assert result.exit_code == 0 assert "auto" in result.output assert "default" in result.output def test_check_multiple_paths(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/x", "README.md", "src/a.py") assert result.exit_code == 0 assert "build/x" in result.output assert "README.md" in result.output assert "src/a.py" in result.output def test_check_dimension_filter(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) content = ( '[[rules]]\npath = "*.mid"\ndimension = "pitch_bend"\n' 'strategy = "manual"\n\n' '[[rules]]\npath = "*.mid"\ndimension = "*"\nstrategy = "auto"\n' ) _write_attrs(repo, content) result = _invoke(repo, "check", "track.mid", "--dimension", "pitch_bend") assert result.exit_code == 0 assert "manual" in result.output def test_check_dimension_star_matches_any( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/x", "--dimension", "notes") assert "ours" in result.output def test_check_json_schema(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/foo.o", "--json") assert result.exit_code == 0 data = self._parse_check(result) assert "results" in data item = data["results"][0] for field in ("path", "dimension", "strategy", "rule_index"): assert field in item, f"Missing: {field}" def test_check_json_rule_index_positive_on_match( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/foo.o", "--json") data = self._parse_check(result) assert data["results"][0]["rule_index"] >= 0 def test_check_json_rule_index_neg1_on_no_match( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "build/*"\ndimension = "*"\nstrategy = "ours"\n', ) result = _invoke(repo, "check", "src/a.py", "--json") data = self._parse_check(result) assert data["results"][0]["rule_index"] == -1 def test_check_invalid_strategy_in_file_exits_nonzero( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "bogus"\n', ) result = _invoke(repo, "check", "foo.mid") assert result.exit_code != 0 def test_check_no_file_exits_zero_no_match( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "check", "any/path.mid") assert result.exit_code == 0 assert "auto" in result.output # --------------------------------------------------------------------------- # Integration — muse attributes validate # --------------------------------------------------------------------------- class TestRunValidate: def _parse_validate(self, result: InvokeResult) -> "_ValidateJson": from muse.cli.commands.attributes import _ValidateErrorJson, _ValidateJson start = result.output.index("{") blob = result.output[start:] depth = 0 end = 0 for i, ch in enumerate(blob): if ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: end = i + 1 break raw = json.loads(blob[:end]) assert isinstance(raw, dict) valid_val = raw.get("valid", False) assert isinstance(valid_val, bool) errors_raw = raw.get("errors", []) assert isinstance(errors_raw, list) errors: list[_ValidateErrorJson] = [] for e in errors_raw: assert isinstance(e, dict) errors.append( _ValidateErrorJson( kind=str(e.get("kind", "")), message=str(e.get("message", "")), ) ) return _ValidateJson(valid=valid_val, errors=errors) def test_valid_file_exits_zero(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "validate") assert result.exit_code == 0 def test_valid_file_shows_success_message( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "validate") assert "valid" in result.output.lower() or "✅" in result.output def test_valid_file_shows_rule_count(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "validate") assert "3 rule" in result.output def test_missing_file_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "validate") assert result.exit_code != 0 def test_bad_strategy_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "zap"\n', ) result = _invoke(repo, "validate") assert result.exit_code != 0 def test_bad_toml_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, "[[broken\n") result = _invoke(repo, "validate") assert result.exit_code != 0 def test_bad_toml_no_traceback(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, "[[broken\n") result = _invoke(repo, "validate") assert "Traceback" not in result.output def test_json_valid_schema(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "validate", "--json") assert result.exit_code == 0 data = self._parse_validate(result) assert data["valid"] is True assert data["errors"] == [] def test_json_invalid_has_errors(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "oops"\n', ) result = _invoke(repo, "validate", "--json") assert result.exit_code != 0 data = self._parse_validate(result) assert data["valid"] is False assert len(data["errors"]) > 0 def test_json_missing_file_error_kind(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "validate", "--json") assert result.exit_code != 0 data = self._parse_validate(result) assert data["errors"][0]["kind"] == "missing" def test_json_semantic_error_kind(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "zap"\n', ) result = _invoke(repo, "validate", "--json") data = self._parse_validate(result) assert data["errors"][0]["kind"] == "semantic" # --------------------------------------------------------------------------- # Security # --------------------------------------------------------------------------- class TestAttributesSecurity: def test_ansi_in_domain_stripped_from_text_output( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, f'[meta]\ndomain = "{_ANSI}"\n' '[[rules]]\npath="*"\ndimension="*"\nstrategy="auto"\n', ) result = _invoke(repo, "list") assert "\x1b[" not in result.output def test_ansi_in_path_pattern_stripped(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, f'[[rules]]\npath = "{_ANSI}"\ndimension = "*"\nstrategy = "auto"\n', ) result = _invoke(repo, "list") assert "\x1b[" not in result.output def test_ansi_in_comment_stripped(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, f'[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "auto"\n' f'comment = "{_ANSI}"\n', ) result = _invoke(repo, "list") assert "\x1b[" not in result.output def test_ansi_in_check_path_stripped_in_output( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) ansi_path = f"{_ANSI}/foo.mid" result = _invoke(repo, "check", ansi_path) assert result.exit_code == 0 assert "\x1b[" not in result.output def test_null_byte_in_check_path_exits_user_error( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "foo\x00bar") assert result.exit_code == ExitCode.USER_ERROR.value def test_null_byte_rejected_even_without_attrs_file( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "check", "foo\x00bar") assert result.exit_code == ExitCode.USER_ERROR.value def test_error_messages_to_stderr_not_only_stdout( self, tmp_path: pathlib.Path ) -> None: """Errors for bad TOML must not produce raw tracebacks.""" repo = _make_repo(tmp_path) _write_attrs(repo, "[[broken\n") result = _invoke(repo, "list") assert result.exit_code != 0 assert "Traceback" not in result.output def test_oversized_file_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) giant = repo / ".museattributes" giant.write_bytes(b"# " + b"x" * (_MAX_ATTRIBUTES_BYTES + 1)) result = _invoke(repo, "list") assert result.exit_code != 0 def test_oversized_file_no_oom(self, tmp_path: pathlib.Path) -> None: """Validate the size cap fires before TOML parsing.""" repo = _make_repo(tmp_path) giant = repo / ".museattributes" giant.write_bytes(b"# " + b"x" * (_MAX_ATTRIBUTES_BYTES + 1)) result = _invoke(repo, "validate") assert result.exit_code != 0 assert "Traceback" not in result.output def test_json_output_on_stdout_error_on_stderr_list( self, tmp_path: pathlib.Path ) -> None: """When --json used for list, valid JSON goes to stdout.""" repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") assert result.exit_code == 0 first_json_line = next( (l for l in result.output.splitlines() if l.strip().startswith("{")), None, ) assert first_json_line is not None, "No JSON on stdout" # --------------------------------------------------------------------------- # E2E — full CLI invocation with CliRunner # --------------------------------------------------------------------------- class TestE2E: def test_list_subcommand_is_required_without_subcommand( self, tmp_path: pathlib.Path ) -> None: """muse attributes (no subcommand) should exit non-zero (subcommand required).""" repo = _make_repo(tmp_path) result = runner.invoke( cli, ["attributes"], env={"MUSE_REPO_ROOT": str(repo)}, ) # subcommand is required — argparse exits 2 assert result.exit_code != 0 def test_list_json_round_trips(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") assert result.exit_code == 0 raw = next( i for i, l in enumerate(result.output.splitlines()) if l.strip().startswith("{") ) blob = "\n".join(result.output.splitlines()[raw:]) depth = 0 end = 0 for i, ch in enumerate(blob): if ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: end = i + 1 break data = json.loads(blob[:end]) assert data["domain"] == "code" assert len(data["rules"]) == 3 def test_check_json_round_trips(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/x", "--json") assert result.exit_code == 0 start = result.output.index("{") blob = result.output[start:] depth = 0 end = 0 for i, ch in enumerate(blob): if ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: end = i + 1 break data = json.loads(blob[:end]) assert data["results"][0]["strategy"] == "ours" def test_validate_exits_0_valid_file(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "validate") assert result.exit_code == 0 def test_validate_exits_nonzero_missing_file( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "validate") assert result.exit_code != 0 def test_help_list_available(self, tmp_path: pathlib.Path) -> None: result = runner.invoke(cli, ["attributes", "--help"]) assert result.exit_code == 0 assert "list" in result.output assert "check" in result.output assert "validate" in result.output def test_all_valid_strategies_accepted(self, tmp_path: pathlib.Path) -> None: from muse.core.attributes import VALID_STRATEGIES repo = _make_repo(tmp_path) for strategy in sorted(VALID_STRATEGIES): attrs = ( f'[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "{strategy}"\n' ) _write_attrs(repo, attrs) result = _invoke(repo, "validate") assert result.exit_code == 0, f"Strategy {strategy!r} should be valid" def test_check_text_format_includes_path_and_strategy( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "README.md") assert "README.md" in result.output assert "theirs" in result.output # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestStress: def test_1000_paths_resolved_efficiently( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) paths = [f"build/file_{i}.o" for i in range(1000)] result = _invoke(repo, "check", *paths) assert result.exit_code == 0 # All 1 000 paths should resolve to "ours" ours_count = result.output.count("ours") assert ours_count == 1000 def test_200_rules_loads_correctly(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) rules_toml = "".join( f'[[rules]]\npath = "dir{i}/*"\ndimension = "*"\nstrategy = "auto"\n\n' for i in range(200) ) _write_attrs(repo, rules_toml) result = _invoke(repo, "list", "--json") assert result.exit_code == 0 start = result.output.index("{") blob = result.output[start:] depth = 0 end = 0 for i, ch in enumerate(blob): if ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: end = i + 1 break data = json.loads(blob[:end]) assert len(data["rules"]) == 200 def test_concurrent_load_isolated_repos( self, tmp_path: pathlib.Path ) -> None: """Eight threads each call load_attributes_full on isolated repos. Tests the core parsing layer for thread-safety without going through the CliRunner (whose env patching is not thread-safe). """ errors: list[str] = [] def worker(idx: int) -> None: try: repo_dir = tmp_path / f"repo{idx}" repo_dir.mkdir(parents=True, exist_ok=True) _write_attrs(repo_dir, _SIMPLE_ATTRS) meta, rules = load_attributes_full(repo_dir) if len(rules) != 3: errors.append(f"Thread {idx}: got {len(rules)} rules, want 3") if meta.get("domain") != "code": errors.append(f"Thread {idx}: wrong domain {meta.get('domain')!r}") except Exception as exc: errors.append(f"Thread {idx}: {exc}") threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Concurrent failures: {errors}" def test_concurrent_resolve_with_index(self, tmp_path: pathlib.Path) -> None: """Eight threads each call _resolve_with_index on independent rule lists. Tests the resolution helper for thread-safety without shared state. """ from muse.core.attributes import AttributeRule from muse.cli.commands.attributes import _resolve_with_index errors: list[str] = [] rules = [ AttributeRule("build/*", "*", "ours", "", 10, 0), AttributeRule("*.md", "*", "theirs", "", 5, 1), AttributeRule("*", "*", "auto", "", 0, 2), ] def worker(idx: int) -> None: try: # Each thread has its own path — no shared mutable state path = f"build/file_{idx}.o" strategy, rule_idx = _resolve_with_index(rules, path, "*") if strategy != "ours": errors.append(f"Thread {idx}: got {strategy!r}, want 'ours'") if rule_idx != 0: errors.append(f"Thread {idx}: rule_index {rule_idx}, want 0") except Exception as exc: errors.append(f"Thread {idx}: {exc}") threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Concurrent failures: {errors}" # --------------------------------------------------------------------------- # Extended — muse attributes list (deeper coverage) # --------------------------------------------------------------------------- class TestRunListExtended: def test_help_mentions_json_flag(self, tmp_path: pathlib.Path) -> None: result = _invoke(tmp_path, "list", "--help") assert "--json" in result.output or "-j" in result.output def test_j_alias_works(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "-j") assert result.exit_code == 0 data = json.loads(result.output.strip()) assert "domain" in data assert "rules" in data def test_json_is_valid_and_parseable(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert "domain" in data and "rules" in data and "rule_count" in data def test_json_parses_cleanly(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") data = json.loads(result.output.strip()) assert isinstance(data, dict) def test_json_rule_count_matches_attrs(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") data = json.loads(result.output.strip()) assert len(data["rules"]) == 3 def test_json_priority_is_int(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") data = json.loads(result.output.strip()) for rule in data["rules"]: assert isinstance(rule["priority"], int) def test_json_source_index_is_int(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") data = json.loads(result.output.strip()) for rule in data["rules"]: assert isinstance(rule["source_index"], int) def test_json_source_index_sequential(self, tmp_path: pathlib.Path) -> None: """source_index values form a complete 0..N-1 set.""" repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") data = json.loads(result.output.strip()) indices = sorted(r["source_index"] for r in data["rules"]) assert indices == list(range(len(data["rules"]))) def test_json_strategy_strings_valid(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") data = json.loads(result.output.strip()) from muse.core.attributes import VALID_STRATEGIES for rule in data["rules"]: assert rule["strategy"] in VALID_STRATEGIES def test_text_header_always_present(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list") assert "Path pattern" in result.output assert "Strategy" in result.output def test_text_separator_row_present(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list") assert "---" in result.output def test_text_no_comment_column_when_no_comments(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "auto"\n', ) result = _invoke(repo, "list") assert result.exit_code == 0 assert "Comment" not in result.output def test_text_comment_column_present_when_any_comment(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list") assert "Comment" in result.output def test_json_missing_file_exits_zero_empty_rules(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "list", "--json") assert result.exit_code == 0 data = json.loads(result.output.strip()) assert data["rules"] == [] assert data["domain"] == "" def test_text_missing_file_message_present(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "list") assert result.exit_code == 0 assert ".museattributes" in result.stderr def test_json_single_rule_roundtrip(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "src/*.py"\ndimension = "code"\nstrategy = "ours"\ncomment = "Python wins"\npriority = 7\n', ) result = _invoke(repo, "list", "-j") assert result.exit_code == 0 data = json.loads(result.output.strip()) rule = data["rules"][0] assert rule["path_pattern"] == "src/*.py" assert rule["dimension"] == "code" assert rule["strategy"] == "ours" assert rule["comment"] == "Python wins" assert rule["priority"] == 7 def test_help_shows_exit_codes(self, tmp_path: pathlib.Path) -> None: result = _invoke(tmp_path, "list", "--help") assert "Exit code" in result.output or "exit code" in result.output def test_domain_in_text_output(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list") assert "Domain: code" in result.output # --------------------------------------------------------------------------- # Security — muse attributes list # --------------------------------------------------------------------------- class TestRunListSecurity: def test_ansi_in_domain_stripped_text(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, f'[meta]\ndomain = "{_ANSI}"\n\n[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "auto"\n', ) result = _invoke(repo, "list") assert "\x1b[" not in result.output def test_ansi_in_path_pattern_stripped_text(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ansi_path = _ANSI + "/*" _write_attrs( repo, f'[[rules]]\npath = "{ansi_path}"\ndimension = "*"\nstrategy = "auto"\n', ) result = _invoke(repo, "list") assert "\x1b[" not in result.output def test_ansi_in_comment_stripped_text(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, f'[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "auto"\ncomment = "{_ANSI}"\n', ) result = _invoke(repo, "list") assert "\x1b[" not in result.output def test_unicode_comment_preserved_verbatim_in_json(self, tmp_path: pathlib.Path) -> None: """Unicode in comments is preserved verbatim in JSON (sanitization is text-only). TOML rejects control characters (including ANSI ESC \x1b) as illegal, so only valid Unicode can appear in TOML-sourced comments. """ repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "auto"\ncomment = "caf\u00e9 note"\n', ) result = _invoke(repo, "list", "--json") assert result.exit_code == 0 data = json.loads(result.output.strip()) comments = [r["comment"] for r in data["rules"]] assert any("café" in c for c in comments) def test_oversized_file_exits_nonzero_list(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) from muse.core.attributes import _MAX_ATTRIBUTES_BYTES (repo / ".museattributes").write_bytes(b"x" * (_MAX_ATTRIBUTES_BYTES + 1)) result = _invoke(repo, "list") assert result.exit_code != 0 def test_json_stdout_only_no_stray_text(self, tmp_path: pathlib.Path) -> None: """In JSON mode, stdout must be valid JSON and nothing else.""" repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") assert result.exit_code == 0 json.loads(result.output.strip()) # --------------------------------------------------------------------------- # Stress — muse attributes list # --------------------------------------------------------------------------- class TestRunListStress: def _make_attrs_with_n_rules(self, n: int) -> str: lines = ['[meta]\ndomain = "stress"\n'] for i in range(n): lines.append( f'[[rules]]\npath = "dir_{i}/*.bin"\ndimension = "*"\n' f'strategy = "ours"\ncomment = "rule {i}"\npriority = {i}\n' ) return "\n".join(lines) def test_500_rules_text_renders(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, self._make_attrs_with_n_rules(500)) result = _invoke(repo, "list") assert result.exit_code == 0 assert "dir_0" in result.output def test_500_rules_json_serializes(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, self._make_attrs_with_n_rules(500)) result = _invoke(repo, "list", "--json") assert result.exit_code == 0 data = json.loads(result.output.strip()) assert len(data["rules"]) == 500 def test_concurrent_list_isolated_repos(self, tmp_path: pathlib.Path) -> None: """Ten threads each call load_attributes_full on isolated repos — no shared state. Uses the core layer directly to avoid CliRunner stdout-interleaving. """ errors: list[str] = [] def worker(idx: int) -> None: try: repo = _make_repo(tmp_path / f"repo_{idx}") _write_attrs(repo, _SIMPLE_ATTRS) meta, rules = load_attributes_full(repo) if len(rules) != 3: errors.append(f"Thread {idx}: expected 3 rules, got {len(rules)}") if meta.get("domain") != "code": errors.append(f"Thread {idx}: wrong domain {meta.get('domain')!r}") except Exception as exc: errors.append(f"Thread {idx}: {exc}") threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Concurrent failures: {errors}" # --------------------------------------------------------------------------- # Extended — muse attributes check (deeper coverage) # --------------------------------------------------------------------------- class TestRunCheckExtended: def test_j_alias_works(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/foo.o", "-j") assert result.exit_code == 0 data = json.loads(result.output.strip()) assert "results" in data def test_help_mentions_json_flag(self, tmp_path: pathlib.Path) -> None: result = _invoke(tmp_path, "check", "--help") assert "--json" in result.output or "-j" in result.output def test_help_shows_exit_codes(self, tmp_path: pathlib.Path) -> None: result = _invoke(tmp_path, "check", "--help") assert "Exit code" in result.output or "exit code" in result.output def test_json_is_valid_and_parseable(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/foo.o", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert "results" in data and isinstance(data["results"], list) def test_json_parses_cleanly(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/foo.o", "--json") data = json.loads(result.output.strip()) assert isinstance(data, dict) def test_json_result_count_matches_path_count(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/x", "README.md", "src/a.py", "--json") data = json.loads(result.output.strip()) assert len(data["results"]) == 3 def test_json_rule_index_is_int(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/foo.o", "--json") data = json.loads(result.output.strip()) assert isinstance(data["results"][0]["rule_index"], int) def test_json_all_four_fields_present(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/foo.o", "--json") data = json.loads(result.output.strip()) item = data["results"][0] for field in ("path", "dimension", "strategy", "rule_index"): assert field in item, f"Missing field: {field}" def test_json_dimension_echoed_from_arg(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/foo.o", "--dimension", "notes", "--json") data = json.loads(result.output.strip()) assert data["results"][0]["dimension"] == "notes" def test_json_path_echoed_verbatim(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/foo.o", "--json") data = json.loads(result.output.strip()) assert data["results"][0]["path"] == "build/foo.o" def test_json_no_match_rule_index_neg1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "build/*"\ndimension = "*"\nstrategy = "ours"\n', ) result = _invoke(repo, "check", "src/unmatched.py", "--json") data = json.loads(result.output.strip()) assert data["results"][0]["rule_index"] == -1 assert data["results"][0]["strategy"] == "auto" def test_json_no_file_exits_zero_returns_auto(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "check", "any/path.mid", "--json") assert result.exit_code == 0 data = json.loads(result.output.strip()) assert data["results"][0]["strategy"] == "auto" assert data["results"][0]["rule_index"] == -1 def test_d_alias_for_dimension(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) content = ( '[[rules]]\npath = "*.mid"\ndimension = "pitch_bend"\n' 'strategy = "manual"\n' ) _write_attrs(repo, content) result = _invoke(repo, "check", "track.mid", "-d", "pitch_bend", "--json") data = json.loads(result.output.strip()) assert data["results"][0]["strategy"] == "manual" def test_text_format_path_colon_strategy(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/foo.o") assert result.exit_code == 0 assert "build/foo.o:" in result.output assert "ours" in result.output def test_text_no_match_shows_default(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "check", "unmatched.txt") assert "default" in result.output def test_text_match_shows_rule_number(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/foo.o") assert "rule #" in result.output def test_invalid_strategy_exits_nonzero_json_mode(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "bogus"\n', ) result = _invoke(repo, "check", "foo.mid", "--json") assert result.exit_code != 0 def test_multiple_paths_results_in_order(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) paths = ["build/a.o", "README.md", "src/b.py"] result = _invoke(repo, "check", *paths, "--json") data = json.loads(result.output.strip()) assert [r["path"] for r in data["results"]] == paths # --------------------------------------------------------------------------- # Security — muse attributes check # --------------------------------------------------------------------------- class TestRunCheckSecurity: def test_null_byte_in_path_exits_user_error(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "path\x00malicious") assert result.exit_code == ExitCode.USER_ERROR.value def test_null_byte_in_json_mode_exits_user_error(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "check", "path\x00malicious", "--json") assert result.exit_code == ExitCode.USER_ERROR.value def test_null_byte_error_to_stderr_not_stdout(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "check", "path\x00malicious") # CliRunner merges streams; error should mention null byte assert "null byte" in result.stderr or "null" in result.stderr def test_ansi_in_caller_path_stripped_text(self, tmp_path: pathlib.Path) -> None: """Caller-supplied path with ANSI is sanitized in text output.""" repo = _make_repo(tmp_path) result = _invoke(repo, "check", f"{_ANSI}/file.py") assert result.exit_code == 0 assert "\x1b[" not in result.output def test_multiple_paths_null_byte_in_second_exits_error(self, tmp_path: pathlib.Path) -> None: """Null byte anywhere in the path list aborts processing.""" repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/ok.o", "bad\x00path") assert result.exit_code == ExitCode.USER_ERROR.value def test_json_stdout_only_no_stray_text(self, tmp_path: pathlib.Path) -> None: """In JSON mode, stdout must be valid JSON and nothing else.""" repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/foo.o", "--json") assert result.exit_code == 0 json.loads(result.output.strip()) # --------------------------------------------------------------------------- # Stress — muse attributes check # --------------------------------------------------------------------------- class TestRunCheckStress: def _make_many_rules(self, n: int) -> str: lines: list[str] = [] for i in range(n): lines.append( f'[[rules]]\npath = "dir_{i}/*.bin"\ndimension = "*"\n' f'strategy = "ours"\npriority = {i}\n' ) return "\n".join(lines) def test_1000_paths_against_500_rules_core(self, tmp_path: pathlib.Path) -> None: """1000 path resolutions against 500 rules — core layer, no CLI overhead.""" repo = _make_repo(tmp_path) _write_attrs(repo, self._make_many_rules(500)) _, rules = load_attributes_full(repo) from muse.cli.commands.attributes import _resolve_with_index for i in range(1000): strategy, _ = _resolve_with_index(rules, f"dir_{i % 500}/file.bin", "*") assert strategy == "ours" def test_500_paths_json_serializes(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) paths = [f"build/file_{i}.o" for i in range(500)] result = _invoke(repo, "check", *paths, "--json") assert result.exit_code == 0 data = json.loads(result.output.strip()) assert len(data["results"]) == 500 def test_concurrent_check_core_no_shared_state(self, tmp_path: pathlib.Path) -> None: """Eight threads resolve paths concurrently using the core layer.""" from muse.cli.commands.attributes import _resolve_with_index errors: list[str] = [] repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) _, rules = load_attributes_full(repo) def worker(idx: int) -> None: try: strategy, rule_idx = _resolve_with_index(rules, f"build/file_{idx}.o", "*") if strategy != "ours": errors.append(f"Thread {idx}: expected 'ours', got {strategy!r}") if rule_idx < 0: errors.append(f"Thread {idx}: expected rule_index >= 0, got {rule_idx}") except Exception as exc: errors.append(f"Thread {idx}: {exc}") threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Concurrent failures: {errors}" # --------------------------------------------------------------------------- # Extended — muse attributes validate (deeper coverage) # --------------------------------------------------------------------------- class TestRunValidateExtended: def test_j_alias_works_valid_file(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "validate", "-j") assert result.exit_code == 0 data = json.loads(result.output.strip()) assert data["valid"] is True def test_help_mentions_json_flag(self, tmp_path: pathlib.Path) -> None: result = _invoke(tmp_path, "validate", "--help") assert "--json" in result.output or "-j" in result.output def test_help_shows_exit_codes(self, tmp_path: pathlib.Path) -> None: result = _invoke(tmp_path, "validate", "--help") assert "Exit code" in result.output or "exit code" in result.output def test_json_is_valid_and_parseable(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "validate", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert "valid" in data and "rule_count" in data and "errors" in data def test_json_valid_has_true_and_empty_errors(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "validate", "--json") data = json.loads(result.output.strip()) assert data["valid"] is True assert data["errors"] == [] def test_json_invalid_has_false_and_nonempty_errors(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "bogus"\n', ) result = _invoke(repo, "validate", "--json") assert result.exit_code != 0 data = json.loads(result.output.strip()) assert data["valid"] is False assert len(data["errors"]) > 0 def test_json_missing_file_kind_is_missing(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "validate", "--json") assert result.exit_code != 0 data = json.loads(result.output.strip()) assert data["errors"][0]["kind"] == "missing" def test_json_bad_strategy_kind_is_semantic(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "zap"\n', ) result = _invoke(repo, "validate", "--json") data = json.loads(result.output.strip()) assert data["errors"][0]["kind"] == "semantic" def test_json_bad_toml_kind_is_semantic(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, "[[broken\n") result = _invoke(repo, "validate", "--json") data = json.loads(result.output.strip()) assert data["errors"][0]["kind"] == "semantic" def test_json_errors_is_always_array(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "validate", "--json") data = json.loads(result.output.strip()) assert isinstance(data["errors"], list) def test_json_valid_is_bool(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "validate", "--json") data = json.loads(result.output.strip()) assert isinstance(data["valid"], bool) def test_json_error_message_is_string(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "validate", "--json") data = json.loads(result.output.strip()) assert isinstance(data["errors"][0]["message"], str) assert len(data["errors"][0]["message"]) > 0 def test_text_success_shows_rule_count(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "validate") assert "3 rule" in result.output def test_text_success_shows_domain(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "validate") assert "code" in result.output def test_text_no_traceback_on_bad_toml(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, "[[broken\n") result = _invoke(repo, "validate") assert "Traceback" not in result.output def test_json_exit_zero_on_valid(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "validate", "--json") assert result.exit_code == 0 def test_json_exit_nonzero_on_invalid(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs( repo, '[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "bad"\n', ) result = _invoke(repo, "validate", "--json") assert result.exit_code != 0 def test_empty_rules_file_exits_zero(self, tmp_path: pathlib.Path) -> None: """A .museattributes with no rules but valid TOML is still valid.""" repo = _make_repo(tmp_path) _write_attrs(repo, '[meta]\ndomain = "x"\n') result = _invoke(repo, "validate") assert result.exit_code == 0 # --------------------------------------------------------------------------- # Security — muse attributes validate # --------------------------------------------------------------------------- class TestRunValidateSecurity: def test_ansi_in_domain_stripped_text_success(self, tmp_path: pathlib.Path) -> None: """Domain with ANSI sequences is sanitized in the success message.""" repo = _make_repo(tmp_path) _write_attrs( repo, f'[meta]\ndomain = "safe"\n\n[[rules]]\npath = "*"\ndimension = "*"\nstrategy = "auto"\n', ) result = _invoke(repo, "validate") assert result.exit_code == 0 assert "\x1b[" not in result.output def test_oversized_file_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) from muse.core.attributes import _MAX_ATTRIBUTES_BYTES (repo / ".museattributes").write_bytes(b"x" * (_MAX_ATTRIBUTES_BYTES + 1)) result = _invoke(repo, "validate") assert result.exit_code != 0 def test_oversized_file_json_valid_false(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) from muse.core.attributes import _MAX_ATTRIBUTES_BYTES (repo / ".museattributes").write_bytes(b"x" * (_MAX_ATTRIBUTES_BYTES + 1)) result = _invoke(repo, "validate", "--json") assert result.exit_code != 0 data = json.loads(result.output.strip()) assert data["valid"] is False def test_bad_toml_no_traceback_json_mode(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, "[[broken\n") result = _invoke(repo, "validate", "--json") assert "Traceback" not in result.output def test_json_stdout_only_on_success(self, tmp_path: pathlib.Path) -> None: """In JSON mode on success, stdout is exactly one valid JSON line.""" repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "validate", "--json") assert result.exit_code == 0 json.loads(result.output.strip()) def test_json_stdout_only_on_failure(self, tmp_path: pathlib.Path) -> None: """In JSON mode on failure, stdout is exactly one valid JSON line.""" repo = _make_repo(tmp_path) result = _invoke(repo, "validate", "--json") assert result.exit_code != 0 json.loads(result.output.strip()) # --------------------------------------------------------------------------- # Stress — muse attributes validate # --------------------------------------------------------------------------- class TestRunValidateStress: def _make_attrs_with_n_rules(self, n: int) -> str: lines = ['[meta]\ndomain = "stress"\n'] for i in range(n): lines.append( f'[[rules]]\npath = "dir_{i}/*.bin"\ndimension = "*"\n' f'strategy = "ours"\npriority = {i}\n' ) return "\n".join(lines) def test_500_rule_file_validates_successfully(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, self._make_attrs_with_n_rules(500)) result = _invoke(repo, "validate", "--json") assert result.exit_code == 0 data = json.loads(result.output.strip()) assert data["valid"] is True def test_file_with_one_bad_rule_among_200_reports_error(self, tmp_path: pathlib.Path) -> None: content = self._make_attrs_with_n_rules(200) content += '\n[[rules]]\npath = "bad/*"\ndimension = "*"\nstrategy = "INVALID"\n' repo = _make_repo(tmp_path) _write_attrs(repo, content) result = _invoke(repo, "validate", "--json") assert result.exit_code != 0 data = json.loads(result.output.strip()) assert data["valid"] is False def test_concurrent_validate_core_no_shared_state(self, tmp_path: pathlib.Path) -> None: """Eight threads validate isolated repos via the core layer.""" errors: list[str] = [] def worker(idx: int) -> None: try: repo = _make_repo(tmp_path / f"repo_{idx}") _write_attrs(repo, _SIMPLE_ATTRS) meta, rules = load_attributes_full(repo) if len(rules) != 3: errors.append(f"Thread {idx}: got {len(rules)} rules, want 3") if not meta.get("domain"): errors.append(f"Thread {idx}: missing domain") except Exception as exc: errors.append(f"Thread {idx}: {exc}") threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Concurrent failures: {errors}" # --------------------------------------------------------------------------- # New-flag tests — list --strategy, list --dimension, list rule_count, # check --match-required, validate rule_count, indent=2 # --------------------------------------------------------------------------- _MIXED_ATTRS = """\ [meta] domain = "midi" [[rules]] path = "drums/*" dimension = "notes" strategy = "ours" comment = "Drum notes stay ours." priority = 20 [[rules]] path = "keys/*" dimension = "pitch_bend" strategy = "theirs" comment = "Keys pitch-bend from remote." priority = 15 [[rules]] path = "stems/*" dimension = "notes" strategy = "union" comment = "Union note additions." priority = 10 [[rules]] path = "*" dimension = "*" strategy = "auto" comment = "" priority = 0 """ class TestListStrategyFilter: def test_strategy_filter_returns_only_matching(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _MIXED_ATTRS) result = _invoke(repo, "list", "--strategy", "ours", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert all(r["strategy"] == "ours" for r in data["rules"]) assert data["rule_count"] == 1 def test_strategy_filter_excludes_others(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _MIXED_ATTRS) result = _invoke(repo, "list", "--strategy", "theirs", "--json") data = json.loads(result.output) for r in data["rules"]: assert r["strategy"] == "theirs" def test_strategy_filter_no_match_returns_empty(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _MIXED_ATTRS) result = _invoke(repo, "list", "--strategy", "manual", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["rules"] == [] assert data["rule_count"] == 0 def test_strategy_filter_text_mode(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _MIXED_ATTRS) result = _invoke(repo, "list", "--strategy", "union") assert result.exit_code == 0 assert "union" in result.output assert "theirs" not in result.output def test_strategy_short_flag(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _MIXED_ATTRS) result = _invoke(repo, "list", "-s", "ours", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["rule_count"] == 1 class TestListDimensionFilter: def test_dimension_filter_returns_only_matching(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _MIXED_ATTRS) result = _invoke(repo, "list", "--dimension", "notes", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert all(r["dimension"] == "notes" for r in data["rules"]) assert data["rule_count"] == 2 def test_dimension_filter_no_match_empty(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _MIXED_ATTRS) result = _invoke(repo, "list", "--dimension", "reverb", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["rules"] == [] assert data["rule_count"] == 0 def test_strategy_and_dimension_combined(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _MIXED_ATTRS) result = _invoke(repo, "list", "--strategy", "ours", "--dimension", "notes", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["rule_count"] == 1 assert data["rules"][0]["strategy"] == "ours" assert data["rules"][0]["dimension"] == "notes" def test_combined_no_match(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _MIXED_ATTRS) result = _invoke(repo, "list", "--strategy", "ours", "--dimension", "pitch_bend", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["rule_count"] == 0 class TestListRuleCount: def test_rule_count_present_in_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _MIXED_ATTRS) result = _invoke(repo, "list", "--json") data = json.loads(result.output) assert "rule_count" in data def test_rule_count_matches_rules_length(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _MIXED_ATTRS) result = _invoke(repo, "list", "--json") data = json.loads(result.output) assert data["rule_count"] == len(data["rules"]) def test_rule_count_zero_on_empty_file(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, "[meta]\ndomain = \"code\"\n") result = _invoke(repo, "list", "--json") data = json.loads(result.output) assert data["rule_count"] == 0 assert data["rules"] == [] class TestCheckMatchRequired: def test_match_required_exits_0_when_all_matched(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _MIXED_ATTRS) result = _invoke(repo, "check", "drums/kick.mid", "--match-required") assert result.exit_code == 0 def test_match_required_exits_1_when_unmatched(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) # Only rule: drums/* → ours. Check a path that won't match. attrs = "[meta]\ndomain=\"midi\"\n[[rules]]\npath=\"drums/*\"\ndimension=\"*\"\nstrategy=\"ours\"\n" _write_attrs(repo, attrs) result = _invoke(repo, "check", "untracked/file.mid", "--match-required") assert result.exit_code != 0 def test_match_required_error_on_stderr(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) attrs = "[meta]\ndomain=\"midi\"\n[[rules]]\npath=\"drums/*\"\ndimension=\"*\"\nstrategy=\"ours\"\n" _write_attrs(repo, attrs) result = _invoke(repo, "check", "other/file.mid", "--match-required") assert result.exit_code != 0 assert "❌" in result.stderr def test_match_required_json_still_prints_before_exit(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) attrs = "[meta]\ndomain=\"midi\"\n[[rules]]\npath=\"drums/*\"\ndimension=\"*\"\nstrategy=\"ours\"\n" _write_attrs(repo, attrs) result = _invoke(repo, "check", "other/file.mid", "--match-required", "--json") assert result.exit_code != 0 # Error lands on stderr; JSON is on stdout. CliRunner merges them in # result.output, so split on the first closing brace to isolate JSON. assert "❌" in result.stderr json_part = result.output[: result.output.rfind("}") + 1] data = json.loads(json_part) assert "results" in data def test_match_required_with_mixed_paths(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) attrs = "[meta]\ndomain=\"midi\"\n[[rules]]\npath=\"drums/*\"\ndimension=\"*\"\nstrategy=\"ours\"\n" _write_attrs(repo, attrs) # One matched, one unmatched → exit 1 result = _invoke(repo, "check", "drums/kick.mid", "other/file.mid", "--match-required") assert result.exit_code != 0 def test_match_required_false_by_default(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) attrs = "[meta]\ndomain=\"midi\"\n[[rules]]\npath=\"drums/*\"\ndimension=\"*\"\nstrategy=\"ours\"\n" _write_attrs(repo, attrs) # Without --match-required, unmatched path is still exit 0 result = _invoke(repo, "check", "other/file.mid") assert result.exit_code == 0 class TestValidateRuleCount: def test_rule_count_present_on_success(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _MIXED_ATTRS) result = _invoke(repo, "validate", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert "rule_count" in data assert data["rule_count"] == 4 def test_rule_count_zero_on_missing(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "validate", "--json") assert result.exit_code != 0 data = json.loads(result.output) assert data["rule_count"] == 0 def test_rule_count_zero_on_parse_error(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, "not valid toml ][[[") result = _invoke(repo, "validate", "--json") assert result.exit_code != 0 data = json.loads(result.output) assert data["rule_count"] == 0 class TestJsonCompact: def test_list_json_valid(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "list", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert "rules" in data def test_check_json_valid(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "check", "build/foo.o", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert "results" in data def test_validate_json_valid(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_attrs(repo, _SIMPLE_ATTRS) result = _invoke(repo, "validate", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert "valid" in data # --------------------------------------------------------------------------- # Flag registration tests # --------------------------------------------------------------------------- import argparse as _argparse from muse.cli.commands.attributes import register as _register_attributes def _parse_attrs(*args: str) -> _argparse.Namespace: """Build an argument parser via register() and parse args.""" root_p = _argparse.ArgumentParser() subs = root_p.add_subparsers(dest="cmd") _register_attributes(subs) return root_p.parse_args(["attributes", *args]) class TestRegisterFlags: # ── check subcommand ──────────────────────────────────────────────────── def test_check_default_json_out_is_false(self) -> None: ns = _parse_attrs("check", "src/foo.py") assert ns.json_out is False def test_check_json_flag_sets_json_out(self) -> None: ns = _parse_attrs("check", "src/foo.py", "--json") assert ns.json_out is True def test_check_j_shorthand_sets_json_out(self) -> None: ns = _parse_attrs("check", "src/foo.py", "-j") assert ns.json_out is True def test_check_dimension_default_is_star(self) -> None: ns = _parse_attrs("check", "src/foo.py") assert ns.dimension == "*" def test_check_dimension_flag(self) -> None: ns = _parse_attrs("check", "src/foo.py", "--dimension", "notes") assert ns.dimension == "notes" def test_check_d_shorthand(self) -> None: ns = _parse_attrs("check", "src/foo.py", "-d", "pitch_bend") assert ns.dimension == "pitch_bend" def test_check_match_required_default_false(self) -> None: ns = _parse_attrs("check", "src/foo.py") assert ns.match_required is False def test_check_match_required_flag(self) -> None: ns = _parse_attrs("check", "src/foo.py", "--match-required") assert ns.match_required is True # ── list subcommand ───────────────────────────────────────────────────── def test_list_default_json_out_is_false(self) -> None: ns = _parse_attrs("list") assert ns.json_out is False def test_list_json_flag_sets_json_out(self) -> None: ns = _parse_attrs("list", "--json") assert ns.json_out is True def test_list_j_shorthand_sets_json_out(self) -> None: ns = _parse_attrs("list", "-j") assert ns.json_out is True def test_list_strategy_default_none(self) -> None: ns = _parse_attrs("list") assert ns.strategy is None def test_list_strategy_flag(self) -> None: ns = _parse_attrs("list", "--strategy", "ours") assert ns.strategy == "ours" def test_list_dimension_default_none(self) -> None: ns = _parse_attrs("list") assert ns.dimension is None def test_list_dimension_flag(self) -> None: ns = _parse_attrs("list", "--dimension", "notes") assert ns.dimension == "notes" # ── validate subcommand ───────────────────────────────────────────────── def test_validate_default_json_out_is_false(self) -> None: ns = _parse_attrs("validate") assert ns.json_out is False def test_validate_json_flag_sets_json_out(self) -> None: ns = _parse_attrs("validate", "--json") assert ns.json_out is True def test_validate_j_shorthand_sets_json_out(self) -> None: ns = _parse_attrs("validate", "-j") assert ns.json_out is True