"""Comprehensive hardening tests for ``muse config``. Coverage -------- Unit — muse/cli/config.py internals - _escape: backslash, quote, newline, carriage-return, null-byte escaping - _validate_toml_key: unsafe characters rejected, safe keys pass - _dump_toml: limits section round-trips, domain key injection blocked, remote name injection blocked; [user] section is never emitted - set_config_value: limits namespace writes correctly, TOML key injection blocked, unknown limits key rejected, non-integer limits rejected, invalid shard_prefix_length rejected, user.* namespace raises ValueError - get_config_value: limits keys read back correctly after write - config_as_dict: limits section included in output; [user] section absent Integration — CLI commands via CliRunner - run_show: TOML text and JSON outputs, limits displayed in both formats, sanitize_display applied in text mode, --format json alias - run_get: bare value to stdout, --json schema, not-set exits nonzero, key sanitized in stderr message - run_set: success to stderr, --json schema, blocked namespace rejected, TOML injection rejected, limits set and readable, non-integer limits rejected, user.* namespace blocked (identity lives in identity.toml) - run_edit: no-repo exits, missing config exits, bad editor exits Security - TOML key injection: newline, bracket, equals, quote in domain key blocked - ANSI in key/value sanitized in run_show text mode - ANSI in exception message sanitized in run_set stderr - run_set success message goes to stderr (stdout clean for scripting) - run_get error goes to stderr E2E (full round-trip via CLI) - set then get is consistent for hub/domain/limits keys - set limits then show --json contains limits - set domain then show TOML is valid TOML - limits fall-through to domain is fixed (writes to [limits] not [domain]) - user.* set is blocked end-to-end Stress - 8 concurrent set_config_value calls to isolated repos: no corruption """ from __future__ import annotations import json import pathlib import threading import tomllib from typing import TYPE_CHECKING from unittest.mock import MagicMock, patch import pytest from muse.core.paths import config_toml_path, muse_dir from tests.cli_test_helper import CliRunner, InvokeResult if TYPE_CHECKING: pass from muse.cli.commands.config_cmd import _GetJson, _SetJson from muse.core.types import JsonValue type _ReadJson = dict[str, JsonValue] from muse.core.types import MsgpackDict cli = None runner = CliRunner() # ── fixtures ────────────────────────────────────────────────────────────────── @pytest.fixture def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: """Minimal .muse/ repo with an empty config.toml.""" from muse._version import __version__ dot_muse = muse_dir(tmp_path) for sub in ("refs/heads", "objects", "commits", "snapshots"): (dot_muse / sub).mkdir(parents=True, exist_ok=True) (dot_muse / "repo.json").write_text( json.dumps({"repo_id": "test-repo", "schema_version": __version__, "domain": "code"}) ) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") (dot_muse / "refs" / "heads" / "main").write_text("") (dot_muse / "config.toml").write_text("") monkeypatch.chdir(tmp_path) return tmp_path def _json_get(result: InvokeResult) -> _GetJson: for line in result.output.splitlines(): stripped = line.strip() if stripped.startswith("{"): d: _GetJson = json.loads(stripped) return d raise ValueError(f"No JSON in output:\n{result.output!r}") def _json_set(result: InvokeResult) -> _SetJson: for line in result.output.splitlines(): stripped = line.strip() if stripped.startswith("{"): d: _SetJson = json.loads(stripped) return d raise ValueError(f"No JSON in output:\n{result.output!r}") def _json_show(result: InvokeResult) -> _ReadJson: """Extract the config payload from ``muse config read --json`` output. The envelope wraps config sections under a ``config`` key; return that inner dict so callers can access ``data["hub"]`` / ``data["domain"]`` etc. directly without caring about the envelope structure. """ lines = result.output.splitlines() start = None for i, line in enumerate(lines): if line.strip().startswith("{"): start = i break if start is None: raise ValueError(f"No JSON in output:\n{result.output!r}") depth = 0 collected: list[str] = [] for line in lines[start:]: collected.append(line) depth += line.count("{") - line.count("}") if depth <= 0: break envelope = json.loads("\n".join(collected)) # Unwrap envelope: return the inner config dict when present so assertions # like data["hub"] work without knowing the envelope structure. return envelope.get("config", envelope) # type: ignore[return-value] # ── Unit: _escape ───────────────────────────────────────────────────────────── class TestEscape: def test_backslash_escaped(self) -> None: from muse.cli.config import _escape assert _escape("back\\slash") == "back\\\\slash" def test_double_quote_escaped(self) -> None: from muse.cli.config import _escape assert _escape('say "hi"') == 'say \\"hi\\"' def test_newline_escaped(self) -> None: from muse.cli.config import _escape assert _escape("line1\nline2") == "line1\\nline2" def test_carriage_return_escaped(self) -> None: from muse.cli.config import _escape assert _escape("text\rmore") == "text\\rmore" def test_null_byte_removed(self) -> None: from muse.cli.config import _escape assert "\0" not in _escape("bad\0byte") def test_clean_string_passthrough(self) -> None: from muse.cli.config import _escape assert _escape("hello world") == "hello world" # ── Unit: _validate_toml_key ────────────────────────────────────────────────── class TestValidateTomlKey: def test_newline_in_key_rejected(self) -> None: from muse.cli.config import _validate_toml_key with pytest.raises(ValueError, match="TOML keys"): _validate_toml_key("bad\nkey") def test_carriage_return_rejected(self) -> None: from muse.cli.config import _validate_toml_key with pytest.raises(ValueError, match="TOML keys"): _validate_toml_key("bad\rkey") def test_closing_bracket_rejected(self) -> None: from muse.cli.config import _validate_toml_key with pytest.raises(ValueError, match="TOML keys"): _validate_toml_key("x]injection") def test_opening_bracket_rejected(self) -> None: from muse.cli.config import _validate_toml_key with pytest.raises(ValueError, match="TOML keys"): _validate_toml_key("[malicious") def test_equals_rejected(self) -> None: from muse.cli.config import _validate_toml_key with pytest.raises(ValueError, match="TOML keys"): _validate_toml_key("k=v") def test_double_quote_rejected(self) -> None: from muse.cli.config import _validate_toml_key with pytest.raises(ValueError, match="TOML keys"): _validate_toml_key('key"val') def test_null_byte_rejected(self) -> None: from muse.cli.config import _validate_toml_key with pytest.raises(ValueError, match="TOML keys"): _validate_toml_key("bad\0key") def test_safe_key_passes(self) -> None: from muse.cli.config import _validate_toml_key _validate_toml_key("ticks_per_beat") _validate_toml_key("my-key.123") _validate_toml_key("CamelCase") # ── Unit: _dump_toml ────────────────────────────────────────────────────────── class TestDumpToml: def test_limits_section_round_trips(self) -> None: from muse.cli.config import LimitsConfig, MuseConfig, _dump_toml cfg: MuseConfig = {"limits": LimitsConfig(max_walk_commits=99, max_ancestors=500)} toml_text = _dump_toml(cfg) parsed = tomllib.loads(toml_text) assert parsed["limits"]["max_walk_commits"] == 99 assert parsed["limits"]["max_ancestors"] == 500 def test_limits_shard_prefix_length_written(self) -> None: from muse.cli.config import LimitsConfig, MuseConfig, _dump_toml cfg: MuseConfig = {"limits": LimitsConfig(shard_prefix_length=4)} toml_text = _dump_toml(cfg) parsed = tomllib.loads(toml_text) assert parsed["limits"]["shard_prefix_length"] == 4 def test_domain_key_injection_blocked_in_dump(self) -> None: from muse.cli.config import MuseConfig, _dump_toml cfg: MuseConfig = {"domain": {"malicious\nkey": "val"}} with pytest.raises(ValueError, match="TOML keys"): _dump_toml(cfg) def test_remote_name_injection_blocked(self) -> None: from muse.cli.config import MuseConfig, RemoteEntry, _dump_toml cfg: MuseConfig = {"remotes": {"malicious\nname": RemoteEntry(url="http://localhost")}} with pytest.raises(ValueError, match="TOML keys"): _dump_toml(cfg) def test_value_with_newline_escaped_not_injected(self) -> None: from muse.cli.config import MuseConfig, _dump_toml cfg: MuseConfig = {"domain": {"key": "line1\nline2"}} toml_text = _dump_toml(cfg) parsed = tomllib.loads(toml_text) # The value line should contain \\n (escaped), not a literal newline value_line = next(l for l in toml_text.splitlines() if l.startswith("key")) assert "\n" not in value_line assert "\\n" in value_line assert "line1" in parsed["domain"]["key"] def test_user_section_not_emitted(self) -> None: """_dump_toml must never emit a [user] section — identity lives in identity.toml.""" from muse.cli.config import HubConfig, MuseConfig, _dump_toml cfg: MuseConfig = { "hub": HubConfig(url="https://musehub.ai"), "domain": {"k": "v"}, } toml_text = _dump_toml(cfg) assert "[user]" not in toml_text def test_hub_before_domain_in_section_order(self) -> None: from muse.cli.config import HubConfig, MuseConfig, _dump_toml cfg: MuseConfig = { "hub": HubConfig(url="https://musehub.ai"), "domain": {"k": "v"}, } toml_text = _dump_toml(cfg) hub_pos = toml_text.index("[hub]") domain_pos = toml_text.index("[domain]") assert hub_pos < domain_pos # ── Unit: set_config_value + get_config_value ───────────────────────────────── class TestSetGetConfigValue: def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "config.toml").write_text("") return tmp_path def test_limits_max_walk_commits_writes_to_limits_section( self, tmp_path: pathlib.Path ) -> None: root = self._make_repo(tmp_path) from muse.cli.config import set_config_value set_config_value("limits.max_walk_commits", "5000", root) raw = (config_toml_path(root)).read_text() parsed = tomllib.loads(raw) assert "limits" in parsed assert parsed["limits"]["max_walk_commits"] == 5000 assert "domain" not in parsed def test_limits_max_walk_commits_NOT_written_to_domain( self, tmp_path: pathlib.Path ) -> None: """Regression: previously limits fell through to domain code path.""" root = self._make_repo(tmp_path) from muse.cli.config import set_config_value set_config_value("limits.max_walk_commits", "1000", root) raw = (config_toml_path(root)).read_text() parsed = tomllib.loads(raw) assert "domain" not in parsed def test_limits_shard_prefix_length_valid(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) from muse.cli.config import get_config_value, set_config_value set_config_value("limits.shard_prefix_length", "4", root) assert get_config_value("limits.shard_prefix_length", root) == "4" def test_limits_shard_prefix_length_invalid_rejected( self, tmp_path: pathlib.Path ) -> None: root = self._make_repo(tmp_path) from muse.cli.config import set_config_value with pytest.raises(ValueError, match="shard_prefix_length must be 2 or 4"): set_config_value("limits.shard_prefix_length", "3", root) def test_limits_non_integer_rejected(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) from muse.cli.config import set_config_value with pytest.raises(ValueError, match="integer"): set_config_value("limits.max_walk_commits", "notanint", root) def test_limits_zero_rejected(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) from muse.cli.config import set_config_value with pytest.raises(ValueError, match="positive"): set_config_value("limits.max_walk_commits", "0", root) def test_limits_negative_rejected(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) from muse.cli.config import set_config_value with pytest.raises(ValueError, match="positive"): set_config_value("limits.max_walk_commits", "-1", root) def test_limits_unknown_key_rejected(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) from muse.cli.config import set_config_value with pytest.raises(ValueError, match="Unknown \\[limits\\]"): set_config_value("limits.unknown_key", "5", root) def test_domain_key_injection_rejected(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) from muse.cli.config import set_config_value with pytest.raises(ValueError, match="TOML keys"): set_config_value("domain.malicious\nkey", "bad", root) def test_domain_key_with_bracket_injection_rejected( self, tmp_path: pathlib.Path ) -> None: root = self._make_repo(tmp_path) from muse.cli.config import set_config_value with pytest.raises(ValueError, match="TOML keys"): set_config_value("domain.x][malicious", "bad", root) def test_domain_safe_key_written(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) from muse.cli.config import get_config_value, set_config_value set_config_value("domain.ticks_per_beat", "480", root) assert get_config_value("domain.ticks_per_beat", root) == "480" def test_get_config_value_limits_after_write(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) from muse.cli.config import get_config_value, set_config_value set_config_value("limits.max_ancestors", "25000", root) assert get_config_value("limits.max_ancestors", root) == "25000" def test_user_namespace_raises_value_error(self, tmp_path: pathlib.Path) -> None: """user.* writes must be blocked — identity lives in identity.toml.""" root = self._make_repo(tmp_path) from muse.cli.config import set_config_value with pytest.raises(ValueError): set_config_value("user.handle", "alice", root) def test_user_email_raises_value_error(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) from muse.cli.config import set_config_value with pytest.raises(ValueError): set_config_value("user.email", "a@b.com", root) def test_user_type_raises_value_error(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) from muse.cli.config import set_config_value with pytest.raises(ValueError): set_config_value("user.type", "human", root) # ── Unit: config_as_dict ───────────────────────────────────────────────────── class TestConfigAsDict: def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(tmp_path) dot_muse.mkdir() return tmp_path def test_limits_included_in_output(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) (config_toml_path(root)).write_text( "[limits]\nmax_walk_commits = 5000\n" ) from muse.cli.config import config_as_dict d = config_as_dict(root) assert "limits" in d assert d["limits"]["max_walk_commits"] == "5000" def test_limits_absent_when_not_set(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) # Use a [hub] section to populate the config without [limits] (config_toml_path(root)).write_text('[hub]\nurl = "https://musehub.ai"\n') from muse.cli.config import config_as_dict d = config_as_dict(root) assert "limits" not in d def test_empty_config_returns_empty_dict(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) (config_toml_path(root)).write_text("") from muse.cli.config import config_as_dict assert config_as_dict(root) == {} def test_user_section_silently_dropped(self, tmp_path: pathlib.Path) -> None: """Old config.toml files with a [user] section must have it silently dropped.""" root = self._make_repo(tmp_path) (config_toml_path(root)).write_text( '[user]\nhandle = "alice"\n' ) from muse.cli.config import config_as_dict d = config_as_dict(root) assert "user" not in d # ── Integration: run_show ───────────────────────────────────────────────────── class TestRunRead: def test_read_json_includes_limits(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "limits.max_walk_commits", "7777"]) result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = _json_show(result) assert "limits" in data limits = data["limits"] assert isinstance(limits, dict) assert limits["max_walk_commits"] == "7777" def test_read_json_schema_no_user_section(self, repo: pathlib.Path) -> None: """[user] section must be absent from config read --json output. User identity now lives exclusively in identity.toml; config.toml no longer stores or emits a [user] section. """ result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = _json_show(result) assert "user" not in data def test_read_json_flag_emits_json(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 json_lines = [l for l in result.output.splitlines() if l.strip().startswith("{")] assert len(json_lines) >= 1 def test_read_format_invalid_exits(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "read", "--format", "xml"]) assert result.exit_code != 0 def test_read_text_mode_ansi_sanitized( self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: # Write a config with ANSI in value (directly to file to bypass validation) (config_toml_path(repo)).write_text( '[domain]\nticks = "\\x1b[31mmalicious\\x1b[0m"\n' ) result = runner.invoke(cli, ["config", "read"]) assert "\x1b[" not in result.output def test_read_text_empty_config(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "read"]) assert result.exit_code == 0 assert "No configuration set" in result.output def test_read_text_limits_section_displayed(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "limits.max_ancestors", "30000"]) result = runner.invoke(cli, ["config", "read"]) assert result.exit_code == 0 assert "[limits]" in result.output assert "max_ancestors" in result.output def test_read_json_stdout_clean(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "read", "--json"]) json_lines = [l for l in result.output.splitlines() if l.strip().startswith("{")] assert len(json_lines) >= 1 def test_read_no_repo_still_works( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: # show gracefully shows empty config outside a repo (uses cwd) monkeypatch.chdir(tmp_path) result = runner.invoke(cli, ["config", "read"]) assert result.exit_code == 0 # ── Integration: run_get ───────────────────────────────────────────────────── class TestRunGet: def test_get_existing_hub_key_raw_value(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "get", "hub.url"]) assert result.exit_code == 0 assert "musehub.ai" in result.output def test_get_json_schema_domain_key(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "480"]) result = runner.invoke(cli, ["config", "get", "domain.ticks_per_beat", "--json"]) assert result.exit_code == 0 data = _json_get(result) assert data["key"] == "domain.ticks_per_beat" assert data["value"] == "480" def test_get_missing_key_exits_nonzero(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "get", "hub.url"]) assert result.exit_code != 0 def test_get_missing_key_error_to_stderr( self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: result = runner.invoke(cli, ["config", "get", "hub.url"]) assert result.exit_code != 0 assert "not set" in result.stderr def test_get_limits_key_after_set(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "limits.max_walk_commits", "12345"]) result = runner.invoke(cli, ["config", "get", "limits.max_walk_commits"]) assert result.exit_code == 0 assert "12345" in result.output def test_get_json_stdout_clean(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "domain.sample_rate", "44100"]) result = runner.invoke(cli, ["config", "get", "domain.sample_rate", "--json"]) json_lines = [l for l in result.output.splitlines() if l.strip().startswith("{")] assert len(json_lines) >= 1 def test_get_user_namespace_exits_nonzero(self, repo: pathlib.Path) -> None: """user.* get must exit nonzero — user identity is not stored in config.toml.""" result = runner.invoke(cli, ["config", "get", "user.handle"]) assert result.exit_code != 0 # ── Integration: run_set ───────────────────────────────────────────────────── class TestRunSet: def test_set_success_json_schema_domain(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["config", "set", "domain.ticks_per_beat", "960", "--json"] ) assert result.exit_code == 0 data = _json_set(result) assert data["status"] == "ok" assert data["key"] == "domain.ticks_per_beat" assert data["value"] == "960" def test_set_success_stderr_message_domain( self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: result = runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "960"]) assert result.exit_code == 0 assert "960" in result.stderr def test_set_json_stdout_clean(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["config", "set", "domain.ticks_per_beat", "480", "--json"] ) json_lines = [l for l in result.output.splitlines() if l.strip().startswith("{")] assert len(json_lines) >= 1 def test_set_user_namespace_blocked(self, repo: pathlib.Path) -> None: """user.* writes are blocked — identity lives in identity.toml.""" result = runner.invoke(cli, ["config", "set", "user.handle", "Alice"]) assert result.exit_code != 0 def test_set_user_namespace_error_mentions_auth(self, repo: pathlib.Path) -> None: """Error message for user.* must guide user toward auth/identity commands.""" result = runner.invoke(cli, ["config", "set", "user.handle", "Alice"]) assert result.exit_code != 0 assert "auth" in result.stderr.lower() or "identity" in result.stderr.lower() def test_set_blocked_namespace_exits(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "auth.token", "secret"]) assert result.exit_code != 0 def test_set_blocked_remotes_namespace_exits(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "remotes.origin", "url"]) assert result.exit_code != 0 def test_set_domain_newline_injection_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "domain.malicious\nkey", "bad"]) assert result.exit_code != 0 def test_set_domain_bracket_injection_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "domain.x][malicious", "bad"]) assert result.exit_code != 0 def test_set_limits_max_walk_commits(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["config", "set", "limits.max_walk_commits", "20000"] ) assert result.exit_code == 0 get_result = runner.invoke(cli, ["config", "get", "limits.max_walk_commits"]) assert "20000" in get_result.output def test_set_limits_non_integer_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["config", "set", "limits.max_walk_commits", "abc"] ) assert result.exit_code != 0 def test_set_limits_zero_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["config", "set", "limits.max_walk_commits", "0"] ) assert result.exit_code != 0 def test_set_limits_shard_prefix_valid(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["config", "set", "limits.shard_prefix_length", "4"] ) assert result.exit_code == 0 def test_set_limits_shard_prefix_invalid_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["config", "set", "limits.shard_prefix_length", "3"] ) assert result.exit_code != 0 def test_set_error_ansi_sanitized_in_output( self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: # Use a key with ANSI that would appear in the error message ansi_key = "domain.\x1b[31mmalicious\x1b[0m\nkey" result = runner.invoke(cli, ["config", "set", ansi_key, "val"]) assert result.exit_code != 0 assert "\x1b[" not in result.output def test_set_limits_writes_to_limits_not_domain(self, repo: pathlib.Path) -> None: """Regression: limits namespace must not fall through to domain code.""" runner.invoke(cli, ["config", "set", "limits.max_walk_commits", "9999"]) raw = (config_toml_path(repo)).read_text() parsed = tomllib.loads(raw) assert "domain" not in parsed assert parsed["limits"]["max_walk_commits"] == 9999 # ── Integration: run_edit ───────────────────────────────────────────────────── class TestRunEdit: def test_edit_no_repo_exits( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.chdir(tmp_path) result = runner.invoke(cli, ["config", "edit"]) assert result.exit_code != 0 def test_edit_missing_config_file_autocreated( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """Missing config.toml must be auto-created before the editor opens.""" (config_toml_path(repo)).unlink() monkeypatch.setenv("EDITOR", "true") monkeypatch.delenv("VISUAL", raising=False) result = runner.invoke(cli, ["config", "edit"]) assert result.exit_code == 0 assert (config_toml_path(repo)).exists() def test_edit_bad_editor_exits( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.setenv("EDITOR", "nonexistent-editor-xyz") monkeypatch.delenv("VISUAL", raising=False) result = runner.invoke(cli, ["config", "edit"]) assert result.exit_code != 0 def test_edit_invokes_editor( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.setenv("EDITOR", "true") monkeypatch.delenv("VISUAL", raising=False) result = runner.invoke(cli, ["config", "edit"]) assert result.exit_code == 0 # ── Security ────────────────────────────────────────────────────────────────── class TestConfigSecurity: def test_toml_key_injection_blocked_end_to_end(self, repo: pathlib.Path) -> None: """Setting domain key with newline must not corrupt config.toml.""" result = runner.invoke( cli, ["config", "set", "domain.malicious\nkey", "bad"] ) assert result.exit_code != 0 raw = (config_toml_path(repo)).read_text() assert "\nkey" not in raw def test_bracket_injection_blocked(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["config", "set", "domain.x][malicious", "val"] ) assert result.exit_code != 0 raw = (config_toml_path(repo)).read_text() assert "[malicious]" not in raw def test_equals_injection_blocked(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["config", "set", "domain.x=y", "val"] ) assert result.exit_code != 0 def test_ansi_in_read_text_stripped(self, repo: pathlib.Path) -> None: (config_toml_path(repo)).write_text( '[domain]\nticks = "\\x1b[31mred\\x1b[0m"\n' ) result = runner.invoke(cli, ["config", "read"]) assert "\x1b[" not in result.output def test_auth_namespace_always_blocked(self, repo: pathlib.Path) -> None: for key in ("auth.token", "auth.password", "auth.secret"): result = runner.invoke(cli, ["config", "set", key, "val"]) assert result.exit_code != 0, f"Expected {key!r} to be blocked" def test_user_namespace_always_blocked(self, repo: pathlib.Path) -> None: """user.* writes must always be blocked; identity.toml owns that namespace.""" for key in ("user.handle", "user.email", "user.type", "user.display_name"): result = runner.invoke(cli, ["config", "set", key, "val"]) assert result.exit_code != 0, f"Expected {key!r} to be blocked" def test_credentials_not_in_json_output(self, repo: pathlib.Path) -> None: (config_toml_path(repo)).write_text( '[hub]\nurl = "https://localhost:1337"\n' '[auth]\ntoken = "secret-token"\n' ) result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 assert "secret-token" not in result.output def test_value_newline_escaped_in_toml(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "domain.label", "line1\nline2"]) raw = (config_toml_path(repo)).read_text() # The literal newline must not appear in the value field label_line = [l for l in raw.splitlines() if "label" in l][0] assert "\n" not in label_line def test_error_message_to_stderr_not_stdout(self, repo: pathlib.Path) -> None: result = runner.invoke( cli, ["config", "set", "domain.malicious\nkey", "val"] ) assert result.exit_code != 0 json_lines = [l for l in result.output.splitlines() if l.strip().startswith("{")] assert len(json_lines) == 0 def test_user_section_dropped_from_old_config(self, repo: pathlib.Path) -> None: """Old config.toml with [user] section must not expose it in read output.""" (config_toml_path(repo)).write_text( '[user]\nhandle = "alice"\n' ) result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = _json_show(result) assert "user" not in data # ── E2E round-trips ─────────────────────────────────────────────────────────── class TestE2ERoundTrips: def test_set_user_namespace_blocked_end_to_end(self, repo: pathlib.Path) -> None: """user.* is blocked at the CLI layer — exit nonzero, nothing written.""" result = runner.invoke(cli, ["config", "set", "user.handle", "DeepBlue"]) assert result.exit_code != 0 raw = (config_toml_path(repo)).read_text() assert "DeepBlue" not in raw def test_set_hub_then_get(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "get", "hub.url"]) assert result.exit_code == 0 assert "musehub.ai" in result.output def test_set_limits_then_read_json(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "limits.max_walk_commits", "3333"]) result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = _json_show(result) limits = data.get("limits") assert isinstance(limits, dict) assert limits.get("max_walk_commits") == "3333" def test_set_domain_then_read_valid_toml(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "960"]) result = runner.invoke(cli, ["config", "read"]) assert result.exit_code == 0 assert "960" in result.output def test_limits_written_to_limits_section_not_domain( self, repo: pathlib.Path ) -> None: runner.invoke(cli, ["config", "set", "limits.max_graph_commits", "8888"]) raw = (config_toml_path(repo)).read_text() parsed = tomllib.loads(raw) assert "domain" not in parsed assert parsed["limits"]["max_graph_commits"] == 8888 def test_multiple_writes_preserve_all_sections(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "480"]) runner.invoke(cli, ["config", "set", "limits.max_walk_commits", "2000"]) raw = (config_toml_path(repo)).read_text() parsed = tomllib.loads(raw) assert parsed["hub"]["url"] == "https://musehub.ai" assert parsed["domain"]["ticks_per_beat"] == "480" assert parsed["limits"]["max_walk_commits"] == 2000 def test_set_json_get_json_consistent_domain(self, repo: pathlib.Path) -> None: set_result = runner.invoke( cli, ["config", "set", "domain.ticks_per_beat", "960", "--json"] ) get_result = runner.invoke( cli, ["config", "get", "domain.ticks_per_beat", "--json"] ) set_data = _json_set(set_result) get_data = _json_get(get_result) assert set_data["value"] == get_data["value"] == "960" # ── Stress ──────────────────────────────────────────────────────────────────── class TestStress: def test_8_concurrent_set_domain_to_isolated_repos( self, tmp_path: pathlib.Path ) -> None: """8 threads writing domain keys to independent repos must not corrupt each other.""" from muse._version import __version__ from muse.cli.config import get_config_value, set_config_value errors: list[str] = [] def _do(idx: int) -> None: try: repo_dir = tmp_path / f"repo_{idx}" dot_muse = muse_dir(repo_dir) dot_muse.mkdir(parents=True) (dot_muse / "config.toml").write_text("") (dot_muse / "repo.json").write_text( json.dumps({ "repo_id": f"repo-{idx}", "schema_version": __version__, "domain": "code", }) ) value = f"val_{idx}" set_config_value("domain.label", value, repo_dir) result = get_config_value("domain.label", repo_dir) assert result == value, f"Expected {value!r}, got {result!r}" except Exception as exc: errors.append(f"Thread {idx}: {exc}") threads = [threading.Thread(target=_do, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Concurrent config write failures:\n{'\n'.join(errors)}" def test_8_concurrent_user_set_all_blocked( self, tmp_path: pathlib.Path ) -> None: """8 threads attempting user.* writes must all raise ValueError.""" from muse._version import __version__ from muse.cli.config import set_config_value errors: list[str] = [] def _do(idx: int) -> None: try: repo_dir = tmp_path / f"user_repo_{idx}" dot_muse = muse_dir(repo_dir) dot_muse.mkdir(parents=True) (dot_muse / "config.toml").write_text("") (dot_muse / "repo.json").write_text( json.dumps({ "repo_id": f"repo-{idx}", "schema_version": __version__, "domain": "code", }) ) try: set_config_value("user.handle", f"user_{idx}", repo_dir) errors.append(f"Thread {idx}: expected ValueError, got none") except ValueError: pass # expected except Exception as exc: errors.append(f"Thread {idx}: unexpected error: {exc}") threads = [threading.Thread(target=_do, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Unexpected results in user.* block tests:\n{'\n'.join(errors)}" def test_8_concurrent_set_limits_isolated( self, tmp_path: pathlib.Path ) -> None: """8 threads writing limits to isolated repos must write to [limits] not [domain].""" from muse._version import __version__ from muse.cli.config import set_config_value errors: list[str] = [] def _do(idx: int) -> None: try: repo_dir = tmp_path / f"limits_repo_{idx}" dot_muse = muse_dir(repo_dir) dot_muse.mkdir(parents=True) (dot_muse / "config.toml").write_text("") (dot_muse / "repo.json").write_text( json.dumps({ "repo_id": f"repo-{idx}", "schema_version": __version__, "domain": "code", }) ) set_config_value("limits.max_walk_commits", str(1000 + idx), repo_dir) raw = (dot_muse / "config.toml").read_text() parsed = tomllib.loads(raw) assert "limits" in parsed, "limits section missing" assert "domain" not in parsed, "limits fell through to domain" assert parsed["limits"]["max_walk_commits"] == 1000 + idx except Exception as exc: errors.append(f"Thread {idx}: {exc}") threads = [threading.Thread(target=_do, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Concurrent limits write failures:\n{'\n'.join(errors)}" # ============================================================================= # muse config read — extended hardening # ============================================================================= class TestRunReadExtended: """Additional coverage for ``muse config read`` gaps.""" # ── flag aliases ────────────────────────────────────────────────────────── def test_j_short_flag_emits_json(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "read", "-j"]) assert result.exit_code == 0 data = _json_show(result) assert isinstance(data, dict) def test_j_flag_same_as_json_flag(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) r1 = runner.invoke(cli, ["config", "read", "--json"]) r2 = runner.invoke(cli, ["config", "read", "-j"]) assert r1.exit_code == 0 assert r2.exit_code == 0 # Compare only config sections, not timing fields which naturally differ d1 = {k: v for k, v in _json_show(r1).items() if k not in ("duration_ms", "exit_code")} d2 = {k: v for k, v in _json_show(r2).items() if k not in ("duration_ms", "exit_code")} assert d1 == d2 def test_j_shorthand_emits_json(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "480"]) result = runner.invoke(cli, ["config", "read", "-j"]) assert result.exit_code == 0 data = _json_show(result) assert isinstance(data, dict) def test_default_is_text_output(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "read"]) assert result.exit_code == 0 assert "[hub]" in result.output assert "{" not in result.output # no JSON def test_json_flag_emits_single_object( self, repo: pathlib.Path ) -> None: """--json must emit exactly one JSON object, not duplicate output.""" runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 # Must be parseable as a single JSON object data = _json_show(result) assert isinstance(data, dict) # ── JSON structure ──────────────────────────────────────────────────────── def test_json_is_compact(self, repo: pathlib.Path) -> None: """JSON output must be compact — agents parse it, not humans.""" runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 # Compact JSON is a single line json_lines = [l for l in result.output.splitlines() if l.strip().startswith("{")] assert len(json_lines) == 1 json.loads(json_lines[0]) # must be valid JSON def test_json_hub_section_present(self, repo: pathlib.Path) -> None: from muse.cli.config import set_hub_url set_hub_url("https://musehub.ai", repo) result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = _json_show(result) assert "hub" in data hub = data["hub"] assert isinstance(hub, dict) assert hub["url"] == "https://musehub.ai" def test_json_remotes_section_present(self, repo: pathlib.Path) -> None: from muse.cli.config import set_remote set_remote("origin", "https://hub.example.com/owner/repo", repo) result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = _json_show(result) assert "remotes" in data remotes = data["remotes"] assert isinstance(remotes, dict) assert "origin" in remotes def test_json_domain_section_present(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "960"]) result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = _json_show(result) assert "domain" in data domain = data["domain"] assert isinstance(domain, dict) assert domain["ticks_per_beat"] == "960" def test_json_no_user_section(self, repo: pathlib.Path) -> None: """[user] must never appear in config read --json output.""" runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "480"]) runner.invoke(cli, ["config", "set", "limits.max_walk_commits", "5000"]) result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = _json_show(result) assert "user" not in data assert "domain" in data assert "limits" in data def test_json_empty_config_has_no_sections(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = _json_show(result) # Config sections absent; only timing/status keys present for section in ("user", "hub", "remotes", "domain", "limits"): assert section not in data def test_json_no_credentials(self, repo: pathlib.Path) -> None: """Credentials (auth, token) must never appear in JSON output.""" # Write a config with a fake [auth] section directly (config_toml_path(repo)).write_text( '[hub]\nurl = "https://musehub.ai"\n\n[auth]\ntoken = "secret"\n' ) result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 assert "secret" not in result.output assert "token" not in result.output # ── text mode structure ─────────────────────────────────────────────────── def test_text_output_to_stdout(self, repo: pathlib.Path) -> None: """Text mode config content goes to stdout, not only stderr.""" runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "read"]) assert result.exit_code == 0 assert "[hub]" in result.output def test_text_no_user_section_header(self, repo: pathlib.Path) -> None: """Text mode must never emit [user] — user identity is not in config.toml.""" runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "read"]) assert "[user]" not in result.output def test_text_hub_section_header(self, repo: pathlib.Path) -> None: from muse.cli.config import set_hub_url set_hub_url("https://musehub.ai", repo) result = runner.invoke(cli, ["config", "read"]) assert "[hub]" in result.output def test_text_remotes_section_header(self, repo: pathlib.Path) -> None: from muse.cli.config import set_remote set_remote("origin", "https://hub.example.com/owner/repo", repo) result = runner.invoke(cli, ["config", "read"]) assert "[remotes." in result.output def test_text_domain_section_header(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "480"]) result = runner.invoke(cli, ["config", "read"]) assert "[domain]" in result.output def test_text_key_value_format_domain(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "480"]) result = runner.invoke(cli, ["config", "read"]) # TOML key = "value" format assert 'ticks_per_beat = "480"' in result.output def test_text_limits_no_quotes_on_values(self, repo: pathlib.Path) -> None: """Limits values are integers in TOML — no quotes.""" runner.invoke(cli, ["config", "set", "limits.max_walk_commits", "8000"]) result = runner.invoke(cli, ["config", "read"]) assert "max_walk_commits = 8000" in result.output def test_read_help_contains_quickstart(self) -> None: result = runner.invoke(cli, ["config", "read", "--help"]) assert result.exit_code == 0 assert "quickstart" in result.output.lower() or "jq" in result.output def test_read_help_contains_exit_codes(self) -> None: result = runner.invoke(cli, ["config", "read", "--help"]) assert result.exit_code == 0 assert "Exit codes" in result.output or "exit" in result.output.lower() class TestRunReadStress: """Stress tests for ``muse config read``.""" def test_concurrent_read_calls(self, repo: pathlib.Path) -> None: """Concurrent config_as_dict reads of the same config must all succeed. Uses config_as_dict directly — CliRunner shares a buffer across threads so full CLI invocations cannot be called concurrently from different threads. """ import threading from muse.cli.config import config_as_dict, set_config_value set_config_value("hub.url", "https://musehub.ai", repo) set_config_value("limits.max_walk_commits", "1000", repo) errors: list[str] = [] def _do(idx: int) -> None: try: data = config_as_dict(repo) assert "hub" in data assert data["hub"]["url"] == "https://musehub.ai" except Exception as exc: errors.append(f"Thread {idx}: {exc}") threads = [threading.Thread(target=_do, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert errors == [], "\n".join(errors) def test_read_large_domain_config(self, repo: pathlib.Path) -> None: """Read handles a config with many domain keys without truncation.""" for i in range(20): runner.invoke(cli, ["config", "set", f"domain.key_{i}", str(i * 10)]) result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = _json_show(result) domain = data.get("domain") assert isinstance(domain, dict) assert len(domain) == 20 def test_read_json_output_is_valid_json(self, repo: pathlib.Path) -> None: """JSON output must always be parseable regardless of config contents.""" runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "480"]) runner.invoke(cli, ["config", "set", "limits.max_ancestors", "50000"]) result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 # json.loads raises on invalid JSON parsed = json.loads(result.output) assert isinstance(parsed, dict) # ============================================================================= # muse config get — extended hardening # ============================================================================= class TestRunGetExtended: """Additional coverage for ``muse config get`` gaps.""" # ── flag aliases ────────────────────────────────────────────────────────── def test_j_short_flag_emits_json(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "get", "hub.url", "-j"]) assert result.exit_code == 0 data = _json_get(result) assert "musehub.ai" in data["value"] def test_j_same_as_json_flag(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) r1 = runner.invoke(cli, ["config", "get", "hub.url", "--json"]) r2 = runner.invoke(cli, ["config", "get", "hub.url", "-j"]) assert r1.exit_code == 0 and r2.exit_code == 0 # Compare only stable fields — timing fields naturally differ between calls skip = {"duration_ms", "timestamp"} d1 = {k: v for k, v in _json_get(r1).items() if k not in skip} d2 = {k: v for k, v in _json_get(r2).items() if k not in skip} assert d1 == d2 # ── key format validation ───────────────────────────────────────────────── def test_key_without_dot_exits_nonzero(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "get", "username"]) assert result.exit_code != 0 def test_key_without_dot_shows_format_hint(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "get", "username"]) assert "namespace.subkey" in result.stderr or "hub.url" in result.stderr def test_key_without_dot_does_not_say_not_set( self, repo: pathlib.Path ) -> None: """Malformed key must not produce the misleading 'is not set' message.""" result = runner.invoke(cli, ["config", "get", "badkey"]) assert "is not set" not in result.output def test_empty_key_exits_nonzero(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "get", ""]) assert result.exit_code != 0 def test_unknown_namespace_exits_nonzero(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "get", "unknown.key"]) assert result.exit_code != 0 # ── user.* get returns not-set (no longer in config.toml) ───────────────── def test_get_user_handle_not_set(self, repo: pathlib.Path) -> None: """user.handle is not stored in config.toml — must exit nonzero.""" result = runner.invoke(cli, ["config", "get", "user.handle"]) assert result.exit_code != 0 def test_get_user_email_not_set(self, repo: pathlib.Path) -> None: """user.email is not stored in config.toml — must exit nonzero.""" result = runner.invoke(cli, ["config", "get", "user.email"]) assert result.exit_code != 0 def test_get_user_type_not_set(self, repo: pathlib.Path) -> None: """user.type is not stored in config.toml — must exit nonzero.""" result = runner.invoke(cli, ["config", "get", "user.type"]) assert result.exit_code != 0 # ── all supported key namespaces ────────────────────────────────────────── def test_get_hub_url(self, repo: pathlib.Path) -> None: from muse.cli.config import set_hub_url set_hub_url("https://musehub.ai", repo) result = runner.invoke(cli, ["config", "get", "hub.url"]) assert result.exit_code == 0 assert "musehub.ai" in result.output def test_get_domain_key(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "960"]) result = runner.invoke(cli, ["config", "get", "domain.ticks_per_beat"]) assert result.exit_code == 0 assert "960" in result.output def test_get_limits_max_ancestors(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "limits.max_ancestors", "20000"]) result = runner.invoke(cli, ["config", "get", "limits.max_ancestors"]) assert result.exit_code == 0 assert "20000" in result.output def test_get_limits_max_graph_commits(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "limits.max_graph_commits", "500"]) result = runner.invoke(cli, ["config", "get", "limits.max_graph_commits"]) assert result.exit_code == 0 assert "500" in result.output def test_get_limits_shard_prefix_length(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "limits.shard_prefix_length", "4"]) result = runner.invoke(cli, ["config", "get", "limits.shard_prefix_length"]) assert result.exit_code == 0 assert "4" in result.output # ── JSON structure ──────────────────────────────────────────────────────── def test_json_key_field_matches_input(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "get", "hub.url", "--json"]) assert result.exit_code == 0 data = _json_get(result) assert data["key"] == "hub.url" def test_json_value_matches_text_output(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "480"]) r_text = runner.invoke(cli, ["config", "get", "domain.ticks_per_beat"]) r_json = runner.invoke(cli, ["config", "get", "domain.ticks_per_beat", "--json"]) assert r_text.exit_code == 0 and r_json.exit_code == 0 json_value = _json_get(r_json)["value"] assert json_value in r_text.output def test_json_missing_key_exits_nonzero(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "get", "hub.url", "--json"]) assert result.exit_code != 0 def test_json_stdout_clean_on_success(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "get", "hub.url", "--json"]) assert result.exit_code == 0 # Output must be valid JSON parsed = json.loads(result.output) assert "key" in parsed and "value" in parsed # ── text mode output ────────────────────────────────────────────────────── def test_text_value_printed_to_stdout(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "960"]) result = runner.invoke(cli, ["config", "get", "domain.ticks_per_beat"]) assert result.exit_code == 0 assert "960" in result.output def test_text_error_goes_to_stderr_marker(self, repo: pathlib.Path) -> None: """Not-set error must not appear in stdout (goes to stderr).""" result = runner.invoke(cli, ["config", "get", "hub.url"]) # CliRunner merges stderr — ensure exit is nonzero and message present assert result.exit_code != 0 assert "not set" in result.stderr or "not" in result.stderr.lower() def test_get_help_contains_key_reference(self) -> None: result = runner.invoke(cli, ["config", "get", "--help"]) assert result.exit_code == 0 assert "user.handle" in result.output or "hub.url" in result.output def test_get_help_contains_exit_codes(self) -> None: result = runner.invoke(cli, ["config", "get", "--help"]) assert result.exit_code == 0 assert "Exit codes" in result.output or "exit" in result.output.lower() # ── outside repo ───────────────────────────────────────────────────────── def test_get_outside_repo_key_not_set( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """Outside a repo, all keys return not-set (gracefully).""" monkeypatch.chdir(tmp_path) result = runner.invoke(cli, ["config", "get", "hub.url"]) assert result.exit_code != 0 class TestRunGetStress: """Stress tests for ``muse config get``.""" def test_concurrent_get_reads(self, repo: pathlib.Path) -> None: """Concurrent config_as_dict reads are thread-safe.""" import threading from muse.cli.config import get_config_value, set_config_value set_config_value("domain.label", "StressTest", repo) errors: list[str] = [] def _do(idx: int) -> None: try: value = get_config_value("domain.label", repo) assert value == "StressTest" except Exception as exc: errors.append(f"Thread {idx}: {exc}") threads = [threading.Thread(target=_do, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert errors == [], "\n".join(errors) def test_all_limits_keys_readable(self, repo: pathlib.Path) -> None: """All four limits keys must be individually gettable after set.""" pairs = [ ("limits.max_walk_commits", "9000"), ("limits.max_ancestors", "40000"), ("limits.max_graph_commits", "250"), ("limits.shard_prefix_length", "4"), ] for key, val in pairs: runner.invoke(cli, ["config", "set", key, val]) for key, expected in pairs: result = runner.invoke(cli, ["config", "get", key]) assert result.exit_code == 0, f"Failed for {key}" assert expected in result.output, f"Expected {expected!r} for {key}" def test_get_many_different_keys_sequentially( self, repo: pathlib.Path ) -> None: """Reading many keys in sequence must not corrupt state.""" runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "480"]) runner.invoke(cli, ["config", "set", "domain.sample_rate", "44100"]) runner.invoke(cli, ["config", "set", "limits.max_walk_commits", "1000"]) for _ in range(20): for key, expected in [ ("hub.url", "musehub.ai"), ("domain.ticks_per_beat", "480"), ("domain.sample_rate", "44100"), ("limits.max_walk_commits", "1000"), ]: result = runner.invoke(cli, ["config", "get", key]) assert result.exit_code == 0 assert expected in result.output # ── Extended: run_set ───────────────────────────────────────────────────────── class TestRunSetExtended: """Extended hardening tests for ``muse config set``.""" def test_j_alias_works(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "480", "-j"]) assert result.exit_code == 0 data = _json_set(result) assert data["status"] == "ok" assert data["key"] == "domain.ticks_per_beat" assert data["value"] == "480" def test_key_without_dot_exits_with_format_error(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "username", "Alice"]) assert result.exit_code != 0 assert "namespace.subkey" in result.stderr def test_key_without_dot_mentions_example(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "badkey", "val"]) assert result.exit_code != 0 assert "user.handle" in result.stderr or "hub.url" in result.stderr def test_unknown_namespace_exits_nonzero(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "mystery.key", "val"]) assert result.exit_code != 0 def test_unknown_namespace_error_message_helpful(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "mystery.key", "val"]) assert "mystery" in result.stderr or "Unknown" in result.stderr def test_user_handle_blocked(self, repo: pathlib.Path) -> None: """user.handle is blocked — identity lives in identity.toml.""" result = runner.invoke(cli, ["config", "set", "user.handle", "Bob"]) assert result.exit_code != 0 def test_user_email_blocked(self, repo: pathlib.Path) -> None: """user.email is blocked — identity lives in identity.toml.""" result = runner.invoke(cli, ["config", "set", "user.email", "bob@example.com"]) assert result.exit_code != 0 def test_user_type_blocked(self, repo: pathlib.Path) -> None: """user.type is blocked — identity lives in identity.toml.""" result = runner.invoke(cli, ["config", "set", "user.type", "human"]) assert result.exit_code != 0 def test_user_handle_blocked_error_mentions_auth(self, repo: pathlib.Path) -> None: """Block message for user.* must redirect toward auth/identity commands.""" result = runner.invoke(cli, ["config", "set", "user.handle", "Bob"]) assert result.exit_code != 0 assert "auth" in result.stderr.lower() or "identity" in result.stderr.lower() def test_hub_url_https_settable(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) assert result.exit_code == 0 def test_hub_url_http_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "hub.url", "http://musehub.ai"]) assert result.exit_code != 0 def test_hub_unknown_subkey_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "hub.secret", "val"]) assert result.exit_code != 0 def test_domain_key_settable(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "480"]) assert result.exit_code == 0 def test_limits_max_ancestors_settable(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "limits.max_ancestors", "25000"]) assert result.exit_code == 0 get_result = runner.invoke(cli, ["config", "get", "limits.max_ancestors"]) assert "25000" in get_result.output def test_limits_max_graph_commits_settable(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "limits.max_graph_commits", "5000"]) assert result.exit_code == 0 get_result = runner.invoke(cli, ["config", "get", "limits.max_graph_commits"]) assert "5000" in get_result.output def test_limits_shard_prefix_2_valid(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "limits.shard_prefix_length", "2"]) assert result.exit_code == 0 def test_limits_shard_prefix_4_valid(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "limits.shard_prefix_length", "4"]) assert result.exit_code == 0 def test_limits_shard_prefix_1_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "limits.shard_prefix_length", "1"]) assert result.exit_code != 0 def test_limits_shard_prefix_3_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "limits.shard_prefix_length", "3"]) assert result.exit_code != 0 def test_limits_negative_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "limits.max_walk_commits", "-5"]) assert result.exit_code != 0 def test_blocked_auth_shows_redirect(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "auth.token", "secret"]) assert result.exit_code != 0 assert "auth" in result.stderr.lower() or "login" in result.stderr.lower() def test_blocked_remotes_shows_redirect(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "remotes.origin", "url"]) assert result.exit_code != 0 assert "remote" in result.stderr.lower() def test_success_json_status_ok(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai", "--json"]) data = _json_set(result) assert data["status"] == "ok" def test_success_json_stdout_only_has_json(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai", "--json"]) assert result.exit_code == 0 json_lines = [l for l in result.output.splitlines() if l.strip().startswith("{")] assert len(json_lines) >= 1 def test_success_text_goes_to_output(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "domain.ticks_per_beat", "480"]) assert result.exit_code == 0 assert "480" in result.stderr def test_help_contains_settable_namespaces(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "--help"]) assert "hub.url" in result.output assert "domain" in result.output assert "limits" in result.output def test_help_contains_blocked_namespaces(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "--help"]) assert "auth" in result.output assert "remotes" in result.output def test_help_mentions_exit_codes(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "--help"]) assert "Exit" in result.output or "exit" in result.output # ── Security: run_set ───────────────────────────────────────────────────────── class TestRunSetSecurity: """Security-focused tests for ``muse config set``.""" def test_ansi_in_key_sanitized_in_error(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "domain.\x1b[31mmalicious\x1b[0m\nkey", "val"]) assert result.exit_code != 0 assert "\x1b[" not in result.output def test_ansi_in_value_sanitized_in_text_success(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "domain.label", "\x1b[31mRed\x1b[0m"]) assert result.exit_code == 0 assert "\x1b[" not in result.output def test_null_byte_in_domain_key_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "domain.malicious\x00key", "val"]) assert result.exit_code != 0 def test_bracket_in_domain_key_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "domain.x][malicious", "val"]) assert result.exit_code != 0 def test_equals_in_domain_key_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "domain.key=malicious", "val"]) assert result.exit_code != 0 def test_newline_in_domain_key_rejected(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "domain.malicious\nkey", "val"]) assert result.exit_code != 0 def test_key_without_dot_shows_format_hint_not_generic_error(self, repo: pathlib.Path) -> None: """Ensures format error, not a generic 'unknown namespace' message.""" result = runner.invoke(cli, ["config", "set", "nodot", "val"]) assert result.exit_code != 0 assert "namespace.subkey" in result.stderr def test_hub_http_blocked_not_stored(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "http://attacker.example.com"]) result = runner.invoke(cli, ["config", "get", "hub.url"]) assert result.exit_code != 0 or "attacker.example.com" not in result.output def test_user_namespace_not_stored_on_block(self, repo: pathlib.Path) -> None: """Blocked user.* write must not write anything to config.toml.""" runner.invoke(cli, ["config", "set", "user.handle", "ShouldNotStore"]) raw = (config_toml_path(repo)).read_text() assert "ShouldNotStore" not in raw assert "[user]" not in raw # ── Stress: run_set ─────────────────────────────────────────────────────────── class TestRunSetStress: """Concurrency and volume tests for ``muse config set``.""" def test_concurrent_set_to_different_repos_no_corruption( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """8 threads writing to separate repos must each produce correct values.""" import threading from muse._version import __version__ from muse.cli.config import get_config_value, set_config_value errors: list[str] = [] def _worker(idx: int) -> None: repo_path = tmp_path / f"repo_{idx}" dot_muse = muse_dir(repo_path) for sub in ("refs/heads", "objects", "commits", "snapshots"): (dot_muse / sub).mkdir(parents=True, exist_ok=True) (dot_muse / "repo.json").write_text( json.dumps({"repo_id": f"repo-{idx}", "schema_version": __version__, "domain": "code"}) ) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") (dot_muse / "config.toml").write_text("") expected = f"val_{idx}" set_config_value("domain.label", expected, repo_path) got = get_config_value("domain.label", repo_path) if got != expected: errors.append(f"repo_{idx}: expected {expected!r}, got {got!r}") threads = [threading.Thread(target=_worker, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert errors == [], "\n".join(errors) def test_all_four_limits_keys_set_round_trip(self, repo: pathlib.Path) -> None: """All four limits keys written then read back.""" from muse.cli.config import get_config_value, set_config_value pairs = [ ("limits.max_walk_commits", "10000"), ("limits.max_ancestors", "5000"), ("limits.max_graph_commits", "2500"), ("limits.shard_prefix_length", "4"), ] for key, val in pairs: set_config_value(key, val, repo) for key, expected in pairs: got = get_config_value(key, repo) assert got == expected, f"{key}: expected {expected!r}, got {got!r}" def test_20_sequential_domain_sets_no_state_corruption(self, repo: pathlib.Path) -> None: """Writing 20 different domain.label values sequentially — last write wins.""" from muse.cli.config import get_config_value, set_config_value for i in range(20): set_config_value("domain.label", f"val_{i}", repo) got = get_config_value("domain.label", repo) assert got == "val_19" def test_user_set_always_raises_not_written(self, repo: pathlib.Path) -> None: """20 sequential user.* writes must all raise ValueError, nothing stored.""" from muse.cli.config import set_config_value for i in range(20): with pytest.raises(ValueError): set_config_value("user.handle", f"user_{i}", repo) raw = (config_toml_path(repo)).read_text() assert "[user]" not in raw # ── Extended: run_edit ──────────────────────────────────────────────────────── class TestRunEditExtended: """Extended hardening tests for ``muse config edit``.""" def test_visual_takes_precedence_over_editor( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """$VISUAL must be used when both $VISUAL and $EDITOR are set.""" calls: list[list[str]] = [] import subprocess as _sp real_run = _sp.run def _fake_run(cmd: list[str], **kwargs: str | bool | int | None) -> _sp.CompletedProcess[bytes]: calls.append(cmd) return real_run(["true"], **{k: v for k, v in kwargs.items()}) monkeypatch.setattr(_sp, "run", _fake_run) monkeypatch.setenv("VISUAL", "visual-editor") monkeypatch.setenv("EDITOR", "editor-fallback") runner.invoke(cli, ["config", "edit"]) assert calls and calls[0][0] == "visual-editor" def test_editor_used_when_visual_unset( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: calls: list[list[str]] = [] import subprocess as _sp real_run = _sp.run def _fake_run(cmd: list[str], **kwargs: str | bool | int | None) -> _sp.CompletedProcess[bytes]: calls.append(cmd) return real_run(["true"], **{k: v for k, v in kwargs.items()}) monkeypatch.setattr(_sp, "run", _fake_run) monkeypatch.delenv("VISUAL", raising=False) monkeypatch.setenv("EDITOR", "my-editor") runner.invoke(cli, ["config", "edit"]) assert calls and calls[0][0] == "my-editor" def test_vi_fallback_when_both_unset( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: calls: list[list[str]] = [] import subprocess as _sp real_run = _sp.run def _fake_run(cmd: list[str], **kwargs: str | bool | int | None) -> _sp.CompletedProcess[bytes]: calls.append(cmd) return real_run(["true"], **{k: v for k, v in kwargs.items()}) monkeypatch.setattr(_sp, "run", _fake_run) monkeypatch.delenv("VISUAL", raising=False) monkeypatch.delenv("EDITOR", raising=False) runner.invoke(cli, ["config", "edit"]) assert calls and calls[0][0] == "vi" def test_multiword_editor_split_correctly( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """EDITOR='code --wait' must be split to ['code', '--wait', path].""" calls: list[list[str]] = [] import subprocess as _sp real_run = _sp.run def _fake_run(cmd: list[str], **kwargs: str | bool | int | None) -> _sp.CompletedProcess[bytes]: calls.append(cmd) return real_run(["true"], **{k: v for k, v in kwargs.items()}) monkeypatch.setattr(_sp, "run", _fake_run) monkeypatch.setenv("EDITOR", "code --wait") monkeypatch.delenv("VISUAL", raising=False) runner.invoke(cli, ["config", "edit"]) assert calls and calls[0][:2] == ["code", "--wait"] def test_multiword_editor_passes_config_path( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """Multi-word editor: config path must be the final argument.""" calls: list[list[str]] = [] import subprocess as _sp real_run = _sp.run def _fake_run(cmd: list[str], **kwargs: str | bool | int | None) -> _sp.CompletedProcess[bytes]: calls.append(cmd) return real_run(["true"], **{k: v for k, v in kwargs.items()}) monkeypatch.setattr(_sp, "run", _fake_run) monkeypatch.setenv("EDITOR", "emacs -nw") monkeypatch.delenv("VISUAL", raising=False) runner.invoke(cli, ["config", "edit"]) assert calls and "config.toml" in calls[0][-1] def test_auto_create_config_when_missing( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: (config_toml_path(repo)).unlink() monkeypatch.setenv("EDITOR", "true") monkeypatch.delenv("VISUAL", raising=False) assert not (config_toml_path(repo)).exists() result = runner.invoke(cli, ["config", "edit"]) assert result.exit_code == 0 assert (config_toml_path(repo)).exists() def test_auto_create_info_message_on_stderr( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: (config_toml_path(repo)).unlink() monkeypatch.setenv("EDITOR", "true") monkeypatch.delenv("VISUAL", raising=False) result = runner.invoke(cli, ["config", "edit"]) assert "Created" in result.stderr or "config.toml" in result.stderr def test_editor_invoked_as_list_not_shell( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """subprocess.run must receive a list, never shell=True.""" captured: MsgpackDict = {} import subprocess as _sp def _fake_run(cmd: list[str] | str, **kwargs: str | bool | int | None) -> _sp.CompletedProcess[bytes]: captured["cmd"] = cmd captured["shell"] = kwargs.get("shell", False) from subprocess import CompletedProcess return CompletedProcess([], 0) monkeypatch.setattr(_sp, "run", _fake_run) monkeypatch.setenv("EDITOR", "true") monkeypatch.delenv("VISUAL", raising=False) runner.invoke(cli, ["config", "edit"]) assert isinstance(captured.get("cmd"), list) assert not captured.get("shell") def test_editor_nonzero_exit_exits_nonzero( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.setenv("EDITOR", "false") # /bin/false always exits 1 monkeypatch.delenv("VISUAL", raising=False) result = runner.invoke(cli, ["config", "edit"]) assert result.exit_code != 0 def test_editor_nonzero_exit_message_contains_code( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.setenv("EDITOR", "false") monkeypatch.delenv("VISUAL", raising=False) result = runner.invoke(cli, ["config", "edit"]) assert result.exit_code != 0 # Message should mention the exit code number assert any(ch.isdigit() for ch in result.stderr) def test_outside_repo_exits_nonzero( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.chdir(tmp_path) monkeypatch.delenv("MUSE_REPO_ROOT", raising=False) result = runner.invoke(cli, ["config", "edit"]) assert result.exit_code != 0 def test_outside_repo_error_message( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.chdir(tmp_path) monkeypatch.delenv("MUSE_REPO_ROOT", raising=False) result = runner.invoke(cli, ["config", "edit"]) assert "repository" in result.stderr.lower() or "repo" in result.stderr.lower() def test_help_mentions_visual(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "edit", "--help"]) assert "VISUAL" in result.output def test_help_mentions_editor(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "edit", "--help"]) assert "EDITOR" in result.output def test_help_mentions_agent_alternative(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "edit", "--help"]) assert "muse config set" in result.output or "agent" in result.output def test_help_mentions_exit_codes(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "edit", "--help"]) assert "Exit" in result.output or "exit" in result.output def test_config_path_passed_to_editor( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """Editor must receive the config.toml path as its argument.""" calls: list[list[str]] = [] import subprocess as _sp real_run = _sp.run def _fake_run(cmd: list[str], **kwargs: str | bool | int | None) -> _sp.CompletedProcess[bytes]: calls.append(cmd) return real_run(["true"], **{k: v for k, v in kwargs.items()}) monkeypatch.setattr(_sp, "run", _fake_run) monkeypatch.setenv("EDITOR", "my-editor") monkeypatch.delenv("VISUAL", raising=False) runner.invoke(cli, ["config", "edit"]) assert calls assert str(config_toml_path(repo)) in calls[0] # ── Security: run_edit ──────────────────────────────────────────────────────── class TestRunEditSecurity: """Security-focused tests for ``muse config edit``.""" def test_ansi_in_editor_env_sanitized_in_error( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.setenv("EDITOR", "\x1b[31mmalicious\x1b[0m-editor") monkeypatch.delenv("VISUAL", raising=False) result = runner.invoke(cli, ["config", "edit"]) assert result.exit_code != 0 assert "\x1b[" not in result.output def test_editor_invoked_without_shell( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """Verify shell=True is never passed to subprocess.run.""" captured: MsgpackDict = {} import subprocess as _sp def _fake_run(cmd: list[str] | str, **kwargs: str | bool | int | None) -> _sp.CompletedProcess[bytes]: captured["shell"] = kwargs.get("shell", False) from subprocess import CompletedProcess return CompletedProcess([], 0) monkeypatch.setattr(_sp, "run", _fake_run) monkeypatch.setenv("EDITOR", "true") monkeypatch.delenv("VISUAL", raising=False) runner.invoke(cli, ["config", "edit"]) assert not captured.get("shell") def test_malformed_editor_command_exits_gracefully( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """An unparseable $EDITOR value must exit cleanly, not crash.""" # shlex.split raises ValueError on unmatched quotes monkeypatch.setenv("EDITOR", "editor 'unclosed quote") monkeypatch.delenv("VISUAL", raising=False) result = runner.invoke(cli, ["config", "edit"]) assert result.exit_code != 0 def test_shell_metacharacters_in_editor_not_shell_executed( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """$EDITOR with shell metacharacters must not trigger shell execution. shlex.split turns 'true; echo injected' into ['true;', 'echo', 'injected']. subprocess.run receives a list (never shell=True), so it tries to exec a binary literally named 'true;' — which doesn't exist. FileNotFoundError fires; no shell command is ever evaluated. """ monkeypatch.setenv("EDITOR", "true; echo injected") monkeypatch.delenv("VISUAL", raising=False) result = runner.invoke(cli, ["config", "edit"]) # Binary 'true;' doesn't exist → non-zero exit; no shell evaluation. assert result.exit_code != 0 # Error message quotes the editor string, not the result of shell execution. assert "Editor not found" in result.stderr # ============================================================================= # Supercharge — duration_ms + exit_code in all JSON paths # ============================================================================= _GET_FULL_KEYS = frozenset({"key", "value", "duration_ms", "exit_code"}) _SET_FULL_KEYS = frozenset({"status", "key", "value", "duration_ms", "exit_code"}) class TestElapsedSeconds: """duration_ms must appear in every JSON output path.""" def test_read_json_has_elapsed(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "duration_ms" in data def test_read_elapsed_is_float(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data["duration_ms"], float) def test_read_elapsed_non_negative(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert data["duration_ms"] >= 0.0 def test_get_json_has_elapsed(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "get", "hub.url", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "duration_ms" in data def test_get_elapsed_is_float(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "get", "hub.url", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data["duration_ms"], float) def test_get_elapsed_non_negative(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "get", "hub.url", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert data["duration_ms"] >= 0.0 def test_set_json_has_elapsed(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "duration_ms" in data def test_set_elapsed_is_float(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data["duration_ms"], float) def test_set_elapsed_non_negative(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert data["duration_ms"] >= 0.0 class TestExitCode: """exit_code must appear in every JSON output path and mirror process exit.""" def test_read_json_has_exit_code(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "exit_code" in data def test_read_exit_code_zero(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert data["exit_code"] == 0 def test_get_json_has_exit_code(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "get", "hub.url", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "exit_code" in data def test_get_exit_code_zero_on_success(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "get", "hub.url", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert data["exit_code"] == 0 def test_set_json_has_exit_code(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "exit_code" in data def test_set_exit_code_zero_on_success(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert data["exit_code"] == 0 class TestJsonSchemaComplete: """Every JSON output path must carry the full schema — no follow-up reads needed.""" def test_read_full_keys(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "read", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "duration_ms" in data and "exit_code" in data def test_get_full_keys(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "get", "hub.url", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) missing = _GET_FULL_KEYS - set(data.keys()) assert not missing, f"Missing keys: {missing}" def test_set_full_keys(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai", "--json"]) assert result.exit_code == 0 data = json.loads(result.output) missing = _SET_FULL_KEYS - set(data.keys()) assert not missing, f"Missing keys: {missing}" def test_get_output_is_single_line_json(self, repo: pathlib.Path) -> None: runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai"]) result = runner.invoke(cli, ["config", "get", "hub.url", "--json"]) assert result.exit_code == 0 json_lines = [l for l in result.output.splitlines() if l.strip().startswith("{")] assert len(json_lines) == 1 json.loads(json_lines[0]) def test_set_output_is_single_line_json(self, repo: pathlib.Path) -> None: result = runner.invoke(cli, ["config", "set", "hub.url", "https://musehub.ai", "--json"]) assert result.exit_code == 0 json_lines = [l for l in result.output.splitlines() if l.strip().startswith("{")] assert len(json_lines) == 1 json.loads(json_lines[0])