"""Tests for ``muse conflicts``. Coverage tiers -------------- Unit — parser flags, _parse_conflict, _use_color, dead-code removal. Integration — no-merge state, all filters, count, exit-code, JSON schema. End-to-end — full CLI invocations: text and JSON output. Security — ANSI injection in conflict paths and branch names. Stress — 1 000 and 10 000 conflict entries, concurrent reads. """ from __future__ import annotations import json import os import pathlib import subprocess import threading import time from typing import TYPE_CHECKING import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.cli.commands.conflicts import _ConflictInfo from muse.core.merge_engine import write_merge_state from muse.core.store import get_head_commit_id if TYPE_CHECKING: import argparse runner = CliRunner() # ────────────────────────────────────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────────────────────────────────────── def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult: saved = os.getcwd() try: os.chdir(repo) return runner.invoke(None, args) finally: os.chdir(saved) def _conflicts(repo: pathlib.Path, *extra: str) -> InvokeResult: return _invoke(repo, ["conflicts", *extra]) def _commit(repo: pathlib.Path, *extra: str) -> InvokeResult: return _invoke(repo, ["commit", *extra]) @pytest.fixture() def repo(tmp_path: pathlib.Path) -> pathlib.Path: """Initialised repo with one commit on ``main``, no merge in progress.""" saved = os.getcwd() try: os.chdir(tmp_path) runner.invoke(None, ["init"]) finally: os.chdir(saved) (tmp_path / "a.py").write_text("x = 1\n") _commit(tmp_path, "-m", "initial") return tmp_path @pytest.fixture() def conflict_repo(repo: pathlib.Path) -> pathlib.Path: """Repo with an active merge state containing 2 conflicts: one whole-file (``a.py``) and one symbol-level (``src/b.py::Foo.bar``). """ sha = get_head_commit_id(repo, "main") or "" write_merge_state( repo, base_commit=sha, ours_commit=sha, theirs_commit=sha, conflict_paths=["a.py", "src/b.py::Foo.bar"], other_branch="feat", ) return repo def _write_state( repo: pathlib.Path, *, paths: list[str], branch: str = "feat", ) -> None: sha = get_head_commit_id(repo, "main") or "" write_merge_state( repo, base_commit=sha, ours_commit=sha, theirs_commit=sha, conflict_paths=paths, other_branch=branch, ) # ────────────────────────────────────────────────────────────────────────────── # Unit — parser flags # ────────────────────────────────────────────────────────────────────────────── class TestRegisterFlags: def _parse(self, *args: str) -> "argparse.Namespace": import argparse from muse.cli.commands.conflicts import register p = argparse.ArgumentParser() sub = p.add_subparsers() register(sub) return p.parse_args(["conflicts", *args]) def test_default_fmt_is_text(self) -> None: ns = self._parse() assert ns.fmt == "text" def test_json_flag_sets_fmt(self) -> None: ns = self._parse("--json") assert ns.fmt == "json" def test_format_json_flag(self) -> None: ns = self._parse("--format", "json") assert ns.fmt == "json" def test_default_filter_is_all(self) -> None: ns = self._parse() assert ns.kind_filter == "all" def test_filter_symbol(self) -> None: ns = self._parse("--filter", "symbol") assert ns.kind_filter == "symbol" def test_filter_file(self) -> None: ns = self._parse("--filter", "file") assert ns.kind_filter == "file" def test_filter_deleted(self) -> None: ns = self._parse("--filter", "deleted") assert ns.kind_filter == "deleted" def test_filter_modified(self) -> None: ns = self._parse("--filter", "modified") assert ns.kind_filter == "modified" def test_count_flag(self) -> None: ns = self._parse("--count") assert ns.count is True def test_count_short_flag(self) -> None: ns = self._parse("-n") assert ns.count is True def test_exit_code_flag(self) -> None: ns = self._parse("--exit-code") assert ns.exit_code is True def test_exit_code_short_flag(self) -> None: ns = self._parse("-z") assert ns.exit_code is True def test_exit_code_default_false(self) -> None: ns = self._parse() assert ns.exit_code is False # ────────────────────────────────────────────────────────────────────────────── # Unit — _parse_conflict # ────────────────────────────────────────────────────────────────────────────── class TestParseConflict: def _parse(self, path: str) -> _ConflictInfo: from muse.cli.commands.conflicts import _parse_conflict return _parse_conflict(path) def test_file_conflict_kind(self) -> None: c = self._parse("src/billing.py") assert c["kind"] == "file" assert c["file"] == "src/billing.py" assert c["symbol"] is None assert c["path"] == "src/billing.py" def test_symbol_conflict_kind(self) -> None: c = self._parse("src/billing.py::Invoice.charge") assert c["kind"] == "symbol" assert c["file"] == "src/billing.py" assert c["symbol"] == "Invoice.charge" assert c["path"] == "src/billing.py::Invoice.charge" def test_double_colon_in_symbol(self) -> None: """Only the first ``::`` splits file from symbol.""" c = self._parse("src/foo.py::A::B") assert c["kind"] == "symbol" assert c["file"] == "src/foo.py" assert c["symbol"] == "A::B" def test_ansi_in_path_sanitized(self) -> None: c = self._parse("\x1b[31mevil/path\x1b[0m") assert "\x1b" not in (c["path"] or "") assert "\x1b" not in (c["file"] or "") def test_ansi_in_symbol_sanitized(self) -> None: c = self._parse("src/a.py::\x1b[31mEvil\x1b[0m") assert "\x1b" not in (c["symbol"] or "") def test_empty_path_kind_file(self) -> None: c = self._parse("") assert c["kind"] == "file" # ────────────────────────────────────────────────────────────────────────────── # Unit — _use_color # ────────────────────────────────────────────────────────────────────────────── class TestUseColor: def test_no_color_env_disables_color(self, monkeypatch: pytest.MonkeyPatch) -> None: from muse.cli.commands.conflicts import _use_color monkeypatch.setenv("NO_COLOR", "1") assert _use_color() is False def test_term_dumb_disables_color(self, monkeypatch: pytest.MonkeyPatch) -> None: from muse.cli.commands.conflicts import _use_color monkeypatch.setenv("TERM", "dumb") assert _use_color() is False def test_no_color_empty_string_disables(self, monkeypatch: pytest.MonkeyPatch) -> None: """NO_COLOR must be present (any value) to suppress colour.""" from muse.cli.commands.conflicts import _use_color # NO_COLOR="" is truthy in the spec but empty str is falsy in Python # Spec says "any value" so we rely on presence. monkeypatch.delenv("NO_COLOR", raising=False) monkeypatch.delenv("TERM", raising=False) # Cannot assert True here since stdout is not a TTY in tests, but # we can at least confirm the function doesn't crash. result = _use_color() assert isinstance(result, bool) # ────────────────────────────────────────────────────────────────────────────── # Integration — no merge in progress # ────────────────────────────────────────────────────────────────────────────── class TestNoMerge: def test_no_merge_exits_0(self, repo: pathlib.Path) -> None: result = _conflicts(repo) assert result.exit_code == 0 def test_no_merge_text_message(self, repo: pathlib.Path) -> None: result = _conflicts(repo) assert "No merge in progress" in result.output def test_no_merge_json_schema(self, repo: pathlib.Path) -> None: result = _conflicts(repo, "--json") data = json.loads(result.output) assert data["merge_in_progress"] is False assert data["conflict_count"] == 0 assert data["conflicts"] == [] assert data["merge_from"] is None assert data["ours_commit"] is None assert data["theirs_commit"] is None assert data["base_commit"] is None def test_no_merge_json_next_steps_empty(self, repo: pathlib.Path) -> None: result = _conflicts(repo, "--json") data = json.loads(result.output) assert data["next_steps"] == {} def test_no_merge_count_is_zero(self, repo: pathlib.Path) -> None: result = _conflicts(repo, "--count") assert result.output.strip() == "0" def test_no_merge_exit_code_is_0(self, repo: pathlib.Path) -> None: result = _conflicts(repo, "--exit-code") assert result.exit_code == 0 # ────────────────────────────────────────────────────────────────────────────── # Integration — active merge with conflicts # ────────────────────────────────────────────────────────────────────────────── class TestWithConflicts: def test_exits_0_by_default(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo) assert result.exit_code == 0 def test_text_shows_conflict_count(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo) assert "2" in result.output def test_text_shows_merge_from(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo) assert "feat" in result.output def test_text_shows_file_paths(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo) assert "a.py" in result.output assert "src/b.py" in result.output def test_json_schema_completeness(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--json") data = json.loads(result.output) assert data["merge_in_progress"] is True assert data["merge_from"] == "feat" assert data["conflict_count"] == 2 assert data["total_conflict_count"] == 2 assert "ours_commit" in data assert "theirs_commit" in data assert "base_commit" in data assert isinstance(data["conflicts"], list) assert isinstance(data["next_steps"], dict) def test_json_conflict_entry_schema(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--json") data = json.loads(result.output) for entry in data["conflicts"]: assert "path" in entry assert "file" in entry assert "symbol" in entry assert "kind" in entry def test_json_file_conflict_has_null_symbol(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--json") data = json.loads(result.output) file_conflicts = [c for c in data["conflicts"] if c["kind"] == "file"] assert file_conflicts assert file_conflicts[0]["symbol"] is None def test_json_symbol_conflict_has_symbol(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--json") data = json.loads(result.output) sym_conflicts = [c for c in data["conflicts"] if c["kind"] == "symbol"] assert sym_conflicts assert sym_conflicts[0]["symbol"] == "Foo.bar" def test_json_next_steps_has_all_keys(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--json") data = json.loads(result.output) ns = data["next_steps"] assert "resolve_ours" in ns assert "resolve_theirs" in ns assert "resolve_all_ours" in ns assert "resolve_all_theirs" in ns assert "commit" in ns assert "abort" in ns # ────────────────────────────────────────────────────────────────────────────── # Integration — --filter # ────────────────────────────────────────────────────────────────────────────── class TestFilter: def test_filter_all_returns_all(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--filter", "all", "--json") data = json.loads(result.output) assert data["conflict_count"] == 2 def test_filter_symbol_returns_only_symbols(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--filter", "symbol", "--json") data = json.loads(result.output) assert data["conflict_count"] == 1 assert data["conflicts"][0]["kind"] == "symbol" def test_filter_file_returns_only_files(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--filter", "file", "--json") data = json.loads(result.output) assert data["conflict_count"] == 1 assert data["conflicts"][0]["kind"] == "file" def test_filter_deleted_returns_file_conflicts(self, conflict_repo: pathlib.Path) -> None: """'deleted' maps to whole-file conflicts (one side removed the file).""" result = _conflicts(conflict_repo, "--filter", "deleted", "--json") data = json.loads(result.output) assert data["conflict_count"] == 1 assert all(c["kind"] == "file" for c in data["conflicts"]) def test_filter_modified_returns_symbol_conflicts(self, conflict_repo: pathlib.Path) -> None: """'modified' maps to symbol-level edit conflicts.""" result = _conflicts(conflict_repo, "--filter", "modified", "--json") data = json.loads(result.output) assert data["conflict_count"] == 1 assert all(c["kind"] == "symbol" for c in data["conflicts"]) def test_filter_total_count_reflects_all(self, conflict_repo: pathlib.Path) -> None: """total_conflict_count is always the unfiltered total.""" result = _conflicts(conflict_repo, "--filter", "symbol", "--json") data = json.loads(result.output) assert data["total_conflict_count"] == 2 assert data["conflict_count"] == 1 def test_filter_no_match_exits_0(self, repo: pathlib.Path) -> None: _write_state(repo, paths=["a.py"]) # only file conflicts result = _conflicts(repo, "--filter", "symbol") assert result.exit_code == 0 assert "No conflicts match" in result.output def test_filter_symbol_empty_exits_0(self, repo: pathlib.Path) -> None: _write_state(repo, paths=["a.py"]) result = _conflicts(repo, "--filter", "symbol", "--json") data = json.loads(result.output) assert data["conflict_count"] == 0 def test_filter_symbol_count_respects_filter(self, conflict_repo: pathlib.Path) -> None: """--filter symbol --count must count only symbol conflicts.""" result = _conflicts(conflict_repo, "--filter", "symbol", "--count") assert result.output.strip() == "1" def test_filter_file_count_respects_filter(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--filter", "file", "--count") assert result.output.strip() == "1" def test_filter_deleted_count_respects_filter(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--filter", "deleted", "--count") assert result.output.strip() == "1" def test_filter_modified_count_respects_filter(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--filter", "modified", "--count") assert result.output.strip() == "1" def test_filter_all_count(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--count") assert result.output.strip() == "2" # ────────────────────────────────────────────────────────────────────────────── # Integration — --count # ────────────────────────────────────────────────────────────────────────────── class TestCount: def test_count_is_numeric(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--count") assert result.output.strip().isdigit() def test_count_matches_json(self, conflict_repo: pathlib.Path) -> None: count_r = _conflicts(conflict_repo, "--count") json_r = _conflicts(conflict_repo, "--json") data = json.loads(json_r.output) assert int(count_r.output.strip()) == data["conflict_count"] def test_count_no_merge(self, repo: pathlib.Path) -> None: result = _conflicts(repo, "--count") assert result.output.strip() == "0" def test_count_exit_code_zero_when_clean(self, repo: pathlib.Path) -> None: result = _conflicts(repo, "--count") assert result.exit_code == 0 def test_count_exit_code_with_exit_code_flag(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--count", "--exit-code") assert result.exit_code == 1 def test_count_exit_code_zero_when_filtered_to_zero(self, repo: pathlib.Path) -> None: _write_state(repo, paths=["a.py"]) # only file conflicts result = _conflicts(repo, "--filter", "symbol", "--count", "--exit-code") assert result.output.strip() == "0" assert result.exit_code == 0 # ────────────────────────────────────────────────────────────────────────────── # Integration — --exit-code # ────────────────────────────────────────────────────────────────────────────── class TestExitCode: def test_exit_code_1_when_conflicts(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--exit-code") assert result.exit_code == 1 def test_exit_code_0_when_no_merge(self, repo: pathlib.Path) -> None: result = _conflicts(repo, "--exit-code") assert result.exit_code == 0 def test_exit_code_0_when_filter_matches_zero(self, repo: pathlib.Path) -> None: _write_state(repo, paths=["a.py"]) # file only result = _conflicts(repo, "--filter", "symbol", "--exit-code") assert result.exit_code == 0 def test_exit_code_1_with_json(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--exit-code", "--json") assert result.exit_code == 1 data = json.loads(result.output) assert data["conflict_count"] > 0 def test_exit_code_still_outputs_json(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--exit-code", "--json") data = json.loads(result.output) assert "conflicts" in data def test_exit_code_still_outputs_text(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--exit-code") assert "conflict" in result.output.lower() def test_exit_code_with_all_resolved(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") or "" write_merge_state( repo, base_commit=sha, ours_commit=sha, theirs_commit=sha, conflict_paths=[], # all resolved other_branch="feat", ) result = _conflicts(repo, "--exit-code") assert result.exit_code == 0 # ────────────────────────────────────────────────────────────────────────────── # Integration — all-resolved state # ────────────────────────────────────────────────────────────────────────────── class TestAllResolved: def test_all_resolved_exits_0(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") or "" write_merge_state( repo, base_commit=sha, ours_commit=sha, theirs_commit=sha, conflict_paths=[], other_branch="feat", ) result = _conflicts(repo) assert result.exit_code == 0 def test_all_resolved_text_message(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") or "" write_merge_state( repo, base_commit=sha, ours_commit=sha, theirs_commit=sha, conflict_paths=[], other_branch="feat", ) result = _conflicts(repo) assert "All conflicts resolved" in result.output def test_all_resolved_json_count_zero(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") or "" write_merge_state( repo, base_commit=sha, ours_commit=sha, theirs_commit=sha, conflict_paths=[], other_branch="feat", ) result = _conflicts(repo, "--json") data = json.loads(result.output) assert data["conflict_count"] == 0 assert data["merge_in_progress"] is True # ────────────────────────────────────────────────────────────────────────────── # Integration — JSON schema stable keys # ────────────────────────────────────────────────────────────────────────────── class TestJsonSchemastability: REQUIRED_KEYS = { "merge_in_progress", "merge_from", "ours_commit", "theirs_commit", "base_commit", "conflict_count", "total_conflict_count", "conflicts", "next_steps", } def test_no_merge_has_all_keys(self, repo: pathlib.Path) -> None: result = _conflicts(repo, "--json") data = json.loads(result.output) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in no-merge JSON: {missing}" def test_with_conflicts_has_all_keys(self, conflict_repo: pathlib.Path) -> None: result = _conflicts(conflict_repo, "--json") data = json.loads(result.output) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing keys in conflict JSON: {missing}" def test_no_merge_all_nullable_fields_are_null(self, repo: pathlib.Path) -> None: result = _conflicts(repo, "--json") data = json.loads(result.output) assert data["merge_from"] is None assert data["ours_commit"] is None assert data["theirs_commit"] is None assert data["base_commit"] is None # ────────────────────────────────────────────────────────────────────────────── # Security — ANSI injection # ────────────────────────────────────────────────────────────────────────────── class TestSecurityAnsi: ESC = "\x1b[" def _setup_ansi_state(self, repo: pathlib.Path) -> None: sha = get_head_commit_id(repo, "main") or "" write_merge_state( repo, base_commit=sha, ours_commit=sha, theirs_commit=sha, conflict_paths=[f"{self.ESC}31mevil/path{self.ESC}0m", f"src/ok.py::{self.ESC}31mEvilSymbol{self.ESC}0m"], other_branch=f"{self.ESC}31mevil-branch{self.ESC}0m", ) def test_ansi_in_merge_from_sanitized_in_text(self, repo: pathlib.Path) -> None: self._setup_ansi_state(repo) result = _conflicts(repo) assert self.ESC not in result.output def test_ansi_in_conflict_path_sanitized_in_text(self, repo: pathlib.Path) -> None: self._setup_ansi_state(repo) result = _conflicts(repo) assert self.ESC not in result.output def test_ansi_in_merge_from_sanitized_in_json(self, repo: pathlib.Path) -> None: self._setup_ansi_state(repo) result = _conflicts(repo, "--json") data = json.loads(result.output) assert self.ESC not in (data.get("merge_from") or "") def test_ansi_in_conflict_paths_sanitized_in_json(self, repo: pathlib.Path) -> None: self._setup_ansi_state(repo) result = _conflicts(repo, "--json") data = json.loads(result.output) for entry in data["conflicts"]: assert self.ESC not in (entry.get("path") or "") assert self.ESC not in (entry.get("file") or "") assert self.ESC not in (entry.get("symbol") or "") def test_ansi_in_symbol_sanitized_in_json(self, repo: pathlib.Path) -> None: self._setup_ansi_state(repo) result = _conflicts(repo, "--json") data = json.loads(result.output) sym_conflicts = [c for c in data["conflicts"] if c["kind"] == "symbol"] for c in sym_conflicts: assert self.ESC not in (c.get("symbol") or "") def test_no_ansi_in_count_output(self, repo: pathlib.Path) -> None: self._setup_ansi_state(repo) result = _conflicts(repo, "--count") assert self.ESC not in result.output # ────────────────────────────────────────────────────────────────────────────── # Stress # ────────────────────────────────────────────────────────────────────────────── @pytest.mark.slow class TestStress: def test_1000_conflicts_fast(self, repo: pathlib.Path) -> None: """Listing 1000 conflicts must complete in under 500ms.""" paths = [f"src/module_{i:04d}.py::Func{i}" for i in range(1000)] _write_state(repo, paths=paths) t0 = time.perf_counter() result = _conflicts(repo, "--json") elapsed = (time.perf_counter() - t0) * 1000 data = json.loads(result.output) assert data["conflict_count"] == 1000 assert elapsed < 500, f"1000 conflicts took {elapsed:.0f}ms (limit 500ms)" def test_10000_conflicts_fast(self, repo: pathlib.Path) -> None: """Listing 10 000 conflicts must complete in under 2s.""" paths = [f"src/m_{i:05d}.py::F{i}" for i in range(10000)] _write_state(repo, paths=paths) t0 = time.perf_counter() result = _conflicts(repo, "--count") elapsed = (time.perf_counter() - t0) * 1000 assert result.output.strip() == "10000" assert elapsed < 2000, f"10000 conflicts took {elapsed:.0f}ms (limit 2s)" def test_1000_file_conflicts_filter_fast(self, repo: pathlib.Path) -> None: """Filtering 1000 file conflicts must be fast.""" paths = [f"file_{i:04d}.py" for i in range(500)] + \ [f"src/s_{i:04d}.py::Sym{i}" for i in range(500)] _write_state(repo, paths=paths) t0 = time.perf_counter() result = _conflicts(repo, "--filter", "file", "--count") elapsed = (time.perf_counter() - t0) * 1000 assert result.output.strip() == "500" assert elapsed < 500, f"Filtered count of 500 took {elapsed:.0f}ms" def test_concurrent_reads_do_not_corrupt(self, repo: pathlib.Path) -> None: """Multiple threads reading conflict state must not interfere.""" paths = [f"src/c_{i}.py::F{i}" for i in range(200)] _write_state(repo, paths=paths) errors: list[str] = [] def read_conflicts() -> None: try: result = _conflicts(repo, "--json") data = json.loads(result.output) if data["conflict_count"] != 200: errors.append(f"Expected 200, got {data['conflict_count']}") except Exception as e: errors.append(str(e)) threads = [threading.Thread(target=read_conflicts) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() assert not errors, "Concurrent read errors:\n" + "\n".join(errors) def test_concurrent_reads_separate_repos(self, tmp_path: pathlib.Path) -> None: """Separate repos read concurrently must not interfere.""" errors: list[str] = [] def check_repo(idx: int) -> None: repo_dir = tmp_path / f"repo_{idx}" repo_dir.mkdir() subprocess.run(["muse", "init"], cwd=str(repo_dir), capture_output=True) (repo_dir / "x.py").write_text(f"x={idx}\n") subprocess.run( ["muse", "commit", "-m", f"base{idx}"], cwd=str(repo_dir), capture_output=True, ) paths = [f"f{i}.py" for i in range(50)] from muse.core.store import get_head_commit_id as _ghci sha = _ghci(repo_dir, "main") or "" write_merge_state( repo_dir, base_commit=sha, ours_commit=sha, theirs_commit=sha, conflict_paths=paths, other_branch="feat", ) r = subprocess.run( ["muse", "conflicts", "--json"], cwd=str(repo_dir), capture_output=True, text=True, ) try: data = json.loads(r.stdout) if data["conflict_count"] != 50: errors.append(f"repo_{idx}: expected 50, got {data['conflict_count']}") except Exception as e: errors.append(f"repo_{idx}: {e}") threads = [threading.Thread(target=check_repo, args=(i,)) for i in range(5)] for t in threads: t.start() for t in threads: t.join() assert not errors, "Concurrent multi-repo errors:\n" + "\n".join(errors)