"""Hardening tests for ``muse merge`` — security, schema, error routing. These tests cover the 9 issues fixed in the security/correctness/agent-UX audit. They are intentionally distinct from the existing test_cmd_merge.py and test_cmd_merge_dry_run.py suites, which cover the core merge algorithm. Coverage tiers -------------- Unit — parser flags, dead-code removal, _use_color, _semver_from_op_log. Integration — error routing to stderr, JSON schema stability across all statuses. End-to-end — full CLI: security, branch-name sanitization, abort, strategy JSON. Security — ANSI injection in branch names, commit messages, conflict paths. Stress — large merges, concurrent repos, abort+re-merge cycles. """ from __future__ import annotations import json import os import pathlib import subprocess import threading import time from typing import TYPE_CHECKING import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.refs import ( get_head_commit_id, read_current_branch, ) from muse.core.commits import read_commit from muse.core.paths import merge_state_path if TYPE_CHECKING: import argparse runner = CliRunner() # ────────────────────────────────────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────────────────────────────────────── JSON_REQUIRED_KEYS = { "status", "commit_id", "branch", "current_branch", "base_commit_id", "conflicts", "files_changed", "semver_impact", "strategy", "dry_run", } def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult: saved = os.getcwd() try: os.chdir(repo) return runner.invoke(None, args) finally: os.chdir(saved) def _merge(repo: pathlib.Path, *extra: str) -> InvokeResult: return _invoke(repo, ["merge", *extra]) def _commit(repo: pathlib.Path, *extra: str) -> InvokeResult: _invoke(repo, ["code", "add", "."]) return _invoke(repo, ["commit", *extra]) @pytest.fixture() def repo(tmp_path: pathlib.Path) -> pathlib.Path: """Initialised repo with one commit on ``main``.""" saved = os.getcwd() try: os.chdir(tmp_path) runner.invoke(None, ["init"]) finally: os.chdir(saved) (tmp_path / "a.py").write_text("x = 1\n") _commit(tmp_path, "-m", "initial") return tmp_path @pytest.fixture() def ff_repo(repo: pathlib.Path) -> pathlib.Path: """Repo where ``feat`` is strictly ahead of ``main`` → fast-forward.""" _invoke(repo, ["branch", "feat"]) _invoke(repo, ["checkout", "feat"]) (repo / "b.py").write_text("y = 2\n") _commit(repo, "-m", "feat add b") _invoke(repo, ["checkout", "main"]) return repo @pytest.fixture() def three_way_repo(repo: pathlib.Path) -> pathlib.Path: """Repo requiring a clean three-way merge (both sides diverged).""" _invoke(repo, ["branch", "feat"]) _invoke(repo, ["checkout", "feat"]) (repo / "b.py").write_text("y = 2\n") _commit(repo, "-m", "feat add b") _invoke(repo, ["checkout", "main"]) (repo / "c.py").write_text("z = 3\n") _commit(repo, "-m", "main add c") return repo @pytest.fixture() def conflict_repo(repo: pathlib.Path) -> pathlib.Path: """Repo where both sides modified the same file — conflict.""" _invoke(repo, ["branch", "feat"]) _invoke(repo, ["checkout", "feat"]) (repo / "a.py").write_text("x = 999\n") _commit(repo, "-m", "feat modify a") _invoke(repo, ["checkout", "main"]) (repo / "a.py").write_text("x = 42\n") _commit(repo, "-m", "main modify a") return repo # ────────────────────────────────────────────────────────────────────────────── # Unit — parser flags # ────────────────────────────────────────────────────────────────────────────── class TestRegisterFlags: def _parse(self, *args: str) -> "argparse.Namespace": import argparse from muse.cli.commands.merge import register p = argparse.ArgumentParser() sub = p.add_subparsers() register(sub) return p.parse_args(["merge", *args]) def test_default_json_is_false(self) -> None: ns = self._parse("feat") assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = self._parse("feat", "--json") assert ns.json_out is True def test_dry_run_default_false(self) -> None: ns = self._parse("feat") assert ns.dry_run is False def test_dry_run_flag(self) -> None: ns = self._parse("feat", "--dry-run") assert ns.dry_run is True def test_strategy_default_none(self) -> None: ns = self._parse("feat") assert ns.strategy is None def test_strategy_ours(self) -> None: ns = self._parse("feat", "--strategy", "ours") assert ns.strategy == "ours" def test_strategy_theirs(self) -> None: ns = self._parse("feat", "--strategy", "theirs") assert ns.strategy == "theirs" def test_no_ff_default_false(self) -> None: ns = self._parse("feat") assert ns.no_ff is False def test_abort_default_false(self) -> None: ns = self._parse() assert ns.abort is False def test_harmony_autoupdate_default_true(self) -> None: ns = self._parse("feat") assert ns.harmony_autoupdate is True def test_no_harmony_autoupdate(self) -> None: ns = self._parse("feat", "--no-harmony-autoupdate") assert ns.harmony_autoupdate is False # ────────────────────────────────────────────────────────────────────────────── # Unit — dead-code removal # ────────────────────────────────────────────────────────────────────────────── class TestDeadCodeRemoved: def test_read_branch_wrapper_removed(self) -> None: import muse.cli.commands.merge as m assert not hasattr(m, "_read_branch"), ( "_read_branch was a dead one-liner wrapper and must be deleted" ) def test_restore_from_manifest_wrapper_removed(self) -> None: import muse.cli.commands.merge as m assert not hasattr(m, "_restore_from_manifest"), ( "_restore_from_manifest was a dead one-liner wrapper and must be deleted" ) # ────────────────────────────────────────────────────────────────────────────── # Unit — _use_color # ────────────────────────────────────────────────────────────────────────────── class TestUseColor: def test_no_color_env_disables_color(self, monkeypatch: pytest.MonkeyPatch) -> None: from muse.core.terminal import use_color monkeypatch.setenv("NO_COLOR", "1") assert use_color() is False def test_term_dumb_disables_color(self, monkeypatch: pytest.MonkeyPatch) -> None: from muse.core.terminal import use_color monkeypatch.setenv("TERM", "dumb") assert use_color() is False def test_c_helper_respects_use_color(self, monkeypatch: pytest.MonkeyPatch) -> None: from muse.cli.commands.merge import _c, _GREEN monkeypatch.setenv("NO_COLOR", "1") result = _c("hello", _GREEN) assert "\x1b[" not in result assert result == "hello" # ────────────────────────────────────────────────────────────────────────────── # Unit — _semver_from_op_log # ────────────────────────────────────────────────────────────────────────────── class TestSemverFromOpLog: """Verify _semver_from_op_log with an empty list (the only type-safe call site). Non-empty behaviour is exercised via dry-run integration tests below, which receive semver_impact in the JSON output after a real plugin diff.""" def test_empty_returns_empty(self) -> None: from muse.cli.commands.merge import _semver_from_op_log assert _semver_from_op_log([]) == "" def test_semver_impact_present_in_dry_run_json( self, three_way_repo: pathlib.Path ) -> None: """semver_impact is always a str in the dry-run JSON schema.""" result = _merge(three_way_repo, "feat", "--dry-run", "--json") data = json.loads(result.output) assert "semver_impact" in data assert isinstance(data["semver_impact"], str) def test_semver_impact_present_in_live_merge_json( self, three_way_repo: pathlib.Path ) -> None: result = _merge(three_way_repo, "feat", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data["semver_impact"], str) # ────────────────────────────────────────────────────────────────────────────── # Integration — error routing to stderr # ────────────────────────────────────────────────────────────────────────────── class TestErrorRouting: def test_merge_itself_error_to_stderr(self, repo: pathlib.Path) -> None: result = _merge(repo, "main") assert result.exit_code == 1 assert "Cannot merge" in (result.stderr or "") assert "Cannot merge" not in result.output.replace(result.stderr or "", "") def test_no_branch_arg_error_to_stderr(self, repo: pathlib.Path) -> None: result = _merge(repo) assert result.exit_code == 1 assert "Usage" in (result.stderr or "") def test_nonexistent_branch_error_to_stderr(self, repo: pathlib.Path) -> None: result = _merge(repo, "ghost-branch") assert result.exit_code == 1 assert "no commits" in (result.stderr or "").lower() def test_unrecognized_flag_exits_nonzero(self, repo: pathlib.Path) -> None: result = _merge(repo, "main", "--no-such-flag") assert result.exit_code != 0 def test_conflict_error_to_stderr(self, conflict_repo: pathlib.Path) -> None: result = _merge(conflict_repo, "feat") assert result.exit_code == 1 assert "conflict" in (result.stderr or "").lower() def test_abort_no_merge_error_to_stderr(self, repo: pathlib.Path) -> None: result = _merge(repo, "--abort") assert result.exit_code == 1 assert "No merge in progress" in (result.stderr or "") # ────────────────────────────────────────────────────────────────────────────── # Integration — JSON schema stability # ────────────────────────────────────────────────────────────────────────────── class TestJsonSchema: def test_up_to_date_has_all_keys(self, ff_repo: pathlib.Path) -> None: """First merge puts us at up-to-date; merge again to get up_to_date status.""" _merge(ff_repo, "feat") result = _merge(ff_repo, "feat", "--json") data = json.loads(result.output) assert data["status"] == "up_to_date" missing = JSON_REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in up_to_date JSON: {missing}" def test_fast_forward_has_all_keys(self, ff_repo: pathlib.Path) -> None: result = _merge(ff_repo, "feat", "--json") data = json.loads(result.output) assert data["status"] == "fast_forward" missing = JSON_REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in fast_forward JSON: {missing}" def test_three_way_merged_has_all_keys(self, three_way_repo: pathlib.Path) -> None: result = _merge(three_way_repo, "feat", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["status"] == "merged" missing = JSON_REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in three-way merged JSON: {missing}" def test_conflict_has_all_keys(self, conflict_repo: pathlib.Path) -> None: result = _merge(conflict_repo, "feat", "--json") assert result.exit_code == 1 data = json.loads(result.output) assert data["status"] == "conflict" # conflict status uses same schema (minus commit_id which is null) core_keys = JSON_REQUIRED_KEYS - {"symbol_conflicts"} missing = core_keys - set(data) assert not missing, f"Missing keys in conflict JSON: {missing}" def test_dry_run_merged_has_all_keys(self, three_way_repo: pathlib.Path) -> None: result = _merge(three_way_repo, "feat", "--dry-run", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["dry_run"] is True missing = JSON_REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in dry-run merged JSON: {missing}" def test_strategy_ours_has_all_keys(self, conflict_repo: pathlib.Path) -> None: result = _merge(conflict_repo, "feat", "--strategy", "ours", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["strategy"] == "ours" missing = JSON_REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in strategy=ours JSON: {missing}" def test_strategy_theirs_has_all_keys(self, conflict_repo: pathlib.Path) -> None: result = _merge(conflict_repo, "feat", "--strategy", "theirs", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["strategy"] == "theirs" missing = JSON_REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in strategy=theirs JSON: {missing}" def test_fast_forward_has_base_commit_id(self, ff_repo: pathlib.Path) -> None: result = _merge(ff_repo, "feat", "--json") data = json.loads(result.output) assert "base_commit_id" in data assert data["base_commit_id"] is not None # FF always has a base def test_three_way_has_files_changed(self, three_way_repo: pathlib.Path) -> None: result = _merge(three_way_repo, "feat", "--json") assert result.exit_code == 0 data = json.loads(result.output) fc = data["files_changed"] assert "added" in fc and "modified" in fc and "deleted" in fc def test_three_way_has_semver_impact(self, three_way_repo: pathlib.Path) -> None: result = _merge(three_way_repo, "feat", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert "semver_impact" in data assert isinstance(data["semver_impact"], str) def test_three_way_has_strategy_null(self, three_way_repo: pathlib.Path) -> None: result = _merge(three_way_repo, "feat", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["strategy"] is None def test_three_way_has_dry_run_false(self, three_way_repo: pathlib.Path) -> None: result = _merge(three_way_repo, "feat", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["dry_run"] is False def test_dry_run_commit_id_is_null(self, three_way_repo: pathlib.Path) -> None: result = _merge(three_way_repo, "feat", "--dry-run", "--json") data = json.loads(result.output) assert data["commit_id"] is None def test_live_merge_commit_id_is_sha(self, three_way_repo: pathlib.Path) -> None: result = _merge(three_way_repo, "feat", "--json") data = json.loads(result.output) assert data["commit_id"] is not None assert data["commit_id"].startswith("sha256:") # canonical OID def test_files_changed_correct_for_ff(self, ff_repo: pathlib.Path) -> None: result = _merge(ff_repo, "feat", "--json") data = json.loads(result.output) fc = data["files_changed"] assert fc["added"] == 1 # b.py added on feat assert fc["modified"] == 0 assert fc["deleted"] == 0 # ────────────────────────────────────────────────────────────────────────────── # Integration — abort # ────────────────────────────────────────────────────────────────────────────── class TestAbort: def test_abort_no_merge_exits_1(self, repo: pathlib.Path) -> None: result = _merge(repo, "--abort") assert result.exit_code == 1 def test_abort_no_merge_json(self, repo: pathlib.Path) -> None: result = _merge(repo, "--abort", "--json") data = json.loads(result.output) assert data["error"] == "no_merge_in_progress" def test_abort_restores_working_tree(self, conflict_repo: pathlib.Path) -> None: original = (conflict_repo / "a.py").read_text() _merge(conflict_repo, "feat") # leaves conflict state # Verify conflict state was created assert (merge_state_path(conflict_repo)).exists() result = _merge(conflict_repo, "--abort") assert result.exit_code == 0 # MERGE_STATE should be gone assert not (merge_state_path(conflict_repo)).exists() def test_abort_json_has_status(self, conflict_repo: pathlib.Path) -> None: _merge(conflict_repo, "feat") result = _merge(conflict_repo, "--abort", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["status"] == "aborted" assert "restored_to" in data def test_abort_uses_read_merge_state(self, conflict_repo: pathlib.Path) -> None: """_run_abort must use read_merge_state() not raw json.loads().""" import inspect from muse.cli.commands.merge import _run_abort src = inspect.getsource(_run_abort) assert "json.loads" not in src, ( "_run_abort must use read_merge_state() instead of raw json.loads() " "to benefit from schema validation" ) assert "read_merge_state" in src # ────────────────────────────────────────────────────────────────────────────── # Security — ANSI injection # ────────────────────────────────────────────────────────────────────────────── class TestSecurityAnsi: ESC = "\x1b[" def test_error_routing_no_ansi_in_stdout(self, repo: pathlib.Path) -> None: """Error messages for invalid operations must not bleed ANSI into stdout.""" result = _merge(repo, "main") # merge into itself assert self.ESC not in result.output.replace(result.stderr or "", "") def test_ansi_in_strategy_arg_sanitized_in_stderr(self, repo: pathlib.Path) -> None: result = _merge(repo, "main", "--strategy", f"{self.ESC}31mxml{self.ESC}0m") assert self.ESC not in (result.stderr or "") def test_conflict_paths_sanitized_in_json(self, conflict_repo: pathlib.Path) -> None: """Any ANSI that leaked into conflict_paths must be sanitized in JSON output.""" result = _merge(conflict_repo, "feat", "--json") data = json.loads(result.output) for path in data.get("conflicts", []): assert self.ESC not in path def test_merge_message_sanitized_in_commit(self, three_way_repo: pathlib.Path) -> None: """Branch name embedded in merge commit message must be ANSI-clean.""" result = _merge(three_way_repo, "feat") assert result.exit_code == 0 cid = get_head_commit_id(three_way_repo, "main") assert cid is not None commit = read_commit(three_way_repo, cid) assert commit is not None assert self.ESC not in commit.message def test_applied_strategies_sanitized(self, three_way_repo: pathlib.Path) -> None: """Any applied_strategies entries are sanitized before printing.""" result = _merge(three_way_repo, "feat") assert self.ESC not in result.output # ────────────────────────────────────────────────────────────────────────────── # Integration — strategy shortcuts # ────────────────────────────────────────────────────────────────────────────── class TestStrategy: def test_strategy_ours_resolves_conflict(self, conflict_repo: pathlib.Path) -> None: result = _merge(conflict_repo, "feat", "--strategy", "ours") assert result.exit_code == 0 def test_strategy_ours_keeps_our_content(self, conflict_repo: pathlib.Path) -> None: _merge(conflict_repo, "feat", "--strategy", "ours") content = (conflict_repo / "a.py").read_text() assert "42" in content # main's version def test_strategy_theirs_keeps_their_content(self, conflict_repo: pathlib.Path) -> None: _merge(conflict_repo, "feat", "--strategy", "theirs") content = (conflict_repo / "a.py").read_text() assert "999" in content # feat's version def test_strategy_ours_creates_merge_commit(self, conflict_repo: pathlib.Path) -> None: before = get_head_commit_id(conflict_repo, "main") _merge(conflict_repo, "feat", "--strategy", "ours") after = get_head_commit_id(conflict_repo, "main") assert after != before def test_strategy_json_has_correct_strategy_field(self, conflict_repo: pathlib.Path) -> None: result = _merge(conflict_repo, "feat", "--strategy", "ours", "--json") data = json.loads(result.output) assert data["strategy"] == "ours" def test_strategy_json_has_files_changed(self, conflict_repo: pathlib.Path) -> None: result = _merge(conflict_repo, "feat", "--strategy", "ours", "--json") data = json.loads(result.output) assert "files_changed" in data def test_strategy_dry_run_does_not_write(self, conflict_repo: pathlib.Path) -> None: before = get_head_commit_id(conflict_repo, "main") # --dry-run bypasses strategy shortcuts; simulates three-way instead result = _merge(conflict_repo, "feat", "--strategy", "ours", "--dry-run") after = get_head_commit_id(conflict_repo, "main") assert before == after # ────────────────────────────────────────────────────────────────────────────── # Stress # ────────────────────────────────────────────────────────────────────────────── @pytest.mark.slow class TestStress: def test_merge_100_file_branch_fast(self, repo: pathlib.Path) -> None: """Merging 100 new files must complete in under 5s.""" _invoke(repo, ["branch", "big-feat"]) _invoke(repo, ["checkout", "big-feat"]) for i in range(100): (repo / f"f{i:03d}.py").write_text(f"x={i}\n") _commit(repo, "-m", "add 100 files") _invoke(repo, ["checkout", "main"]) (repo / "main_extra.py").write_text("m=1\n") _commit(repo, "-m", "main diverges") t0 = time.perf_counter() result = _merge(repo, "big-feat") elapsed = (time.perf_counter() - t0) * 1000 assert result.exit_code == 0 assert elapsed < 5000, f"100-file merge took {elapsed:.0f}ms (limit 5s)" def test_dry_run_100_file_branch_fast(self, repo: pathlib.Path) -> None: """Dry-run of a 100-file merge must complete in under 3s.""" _invoke(repo, ["branch", "big-feat"]) _invoke(repo, ["checkout", "big-feat"]) for i in range(100): (repo / f"g{i:03d}.py").write_text(f"y={i}\n") _commit(repo, "-m", "add 100 files") _invoke(repo, ["checkout", "main"]) (repo / "main_extra2.py").write_text("m=2\n") _commit(repo, "-m", "main diverges") t0 = time.perf_counter() result = _merge(repo, "big-feat", "--dry-run") elapsed = (time.perf_counter() - t0) * 1000 assert result.exit_code == 0 assert elapsed < 3000, f"100-file dry-run took {elapsed:.0f}ms (limit 3s)" def test_abort_cycle_10_times(self, conflict_repo: pathlib.Path) -> None: """Abort should cleanly reset MERGE_STATE each time.""" for i in range(10): r_merge = _merge(conflict_repo, "feat") assert r_merge.exit_code == 1 # conflict assert (merge_state_path(conflict_repo)).exists() r_abort = _merge(conflict_repo, "--abort") assert r_abort.exit_code == 0 assert not (merge_state_path(conflict_repo)).exists() def test_concurrent_merges_separate_repos(self, tmp_path: pathlib.Path) -> None: """Multiple repos merging concurrently must not interfere.""" errors: list[str] = [] def do_merge(idx: int) -> None: repo_dir = tmp_path / f"repo_{idx}" repo_dir.mkdir() subprocess.run(["muse", "init"], cwd=str(repo_dir), capture_output=True) (repo_dir / "a.py").write_text(f"x={idx}\n") subprocess.run( ["muse", "commit", "-m", f"base{idx}"], cwd=str(repo_dir), capture_output=True, ) subprocess.run( ["muse", "branch", "feat"], cwd=str(repo_dir), capture_output=True ) subprocess.run( ["muse", "checkout", "feat"], cwd=str(repo_dir), capture_output=True ) (repo_dir / "b.py").write_text(f"y={idx}\n") subprocess.run( ["muse", "code", "add", "."], cwd=str(repo_dir), capture_output=True, ) subprocess.run( ["muse", "commit", "-m", f"feat{idx}"], cwd=str(repo_dir), capture_output=True, ) subprocess.run( ["muse", "checkout", "main"], cwd=str(repo_dir), capture_output=True ) r = subprocess.run( ["muse", "merge", "feat", "--json"], cwd=str(repo_dir), capture_output=True, text=True, ) if r.returncode != 0: errors.append(f"repo_{idx}: exit={r.returncode}") return try: data = json.loads(r.stdout) if data["status"] not in ("fast_forward", "merged"): errors.append(f"repo_{idx}: unexpected status {data['status']}") except Exception as e: errors.append(f"repo_{idx}: {e}") threads = [threading.Thread(target=do_merge, args=(i,)) for i in range(6)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent merge errors:\n{'\n'.join(errors)}"