"""Tests for muse.core.merge_engine — three-way merge logic. Extended to cover the address-keyed map merge path via :func:`~muse.core.op_merge.merge_structured` and the :class:`~muse.domain.AddressedMergePlugin` integration. """ from collections.abc import Mapping import datetime import json import pathlib import unittest.mock import pytest from muse.core.types import blob_id, long_id from muse.core.merge_engine import ( MergeState, apply_merge, clear_merge_state, detect_conflicts, diff_snapshots, find_merge_base, read_merge_state, write_merge_state, ) from muse.core.op_merge import MergeOpsResult, merge_op_lists, merge_structured from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.commits import ( CommitRecord, write_commit, ) from muse.domain import ( DeleteOp, DomainOp, InsertOp, ReplaceOp, SnapshotManifest, AddressedMergePlugin, StructuredDelta, ) from muse.core.attributes import AttributeRule from muse.plugins.code.plugin import CodePlugin from muse.plugins.midi.plugin import MidiPlugin from muse.core.paths import muse_dir _OID_AAA = blob_id(b"aaa") _OID_OLD = blob_id(b"old") _OID_NEW = blob_id(b"new") _OID_BASE = blob_id(b"base") _OID_OURS = blob_id(b"ours") _OID_THEIRS = blob_id(b"theirs") _OID_K = blob_id(b"k") _OID_FIXED = blob_id(b"fixed") @pytest.fixture def repo(tmp_path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(tmp_path) (dot_muse / "commits").mkdir(parents=True) (dot_muse / "refs" / "heads").mkdir(parents=True) return tmp_path def _commit(root: pathlib.Path, cid: str, parent: str | None = None, parent2: str | None = None) -> str: """Write a commit with a valid content-hash commit_id. Returns the actual commit_id.""" snap_id = compute_snapshot_id({}) committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) parent_ids = [p for p in [parent, parent2] if p is not None] commit_id = compute_commit_id( parent_ids=parent_ids, snapshot_id=snap_id, message=cid, committed_at_iso=committed_at.isoformat(), ) write_commit(root, CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=cid, committed_at=committed_at, parent_commit_id=parent, parent2_commit_id=parent2, )) return commit_id class TestDiffSnapshots: def test_no_change(self) -> None: m = {"a.mid": "h1", "b.mid": "h2"} assert diff_snapshots(m, m) == set() def test_added(self) -> None: assert diff_snapshots({}, {"a.mid": "h1"}) == {"a.mid"} def test_removed(self) -> None: assert diff_snapshots({"a.mid": "h1"}, {}) == {"a.mid"} def test_modified(self) -> None: assert diff_snapshots({"a.mid": "old"}, {"a.mid": "new"}) == {"a.mid"} class TestDetectConflicts: def test_no_conflict_disjoint(self) -> None: ours_m = {"a.mid": "h_a"} theirs_m = {"b.mid": "h_b"} assert detect_conflicts({"a.mid"}, {"b.mid"}, ours_m, theirs_m) == set() def test_conflict_divergent_content(self) -> None: ours_m = {"a.mid": "h_a", "b.mid": "h_b_ours"} theirs_m = {"b.mid": "h_b_theirs", "c.mid": "h_c"} assert detect_conflicts({"a.mid", "b.mid"}, {"b.mid", "c.mid"}, ours_m, theirs_m) == {"b.mid"} def test_both_empty(self) -> None: assert detect_conflicts(set(), set(), {}, {}) == set() def test_convergent_both_delete(self) -> None: """Both branches deleted the same file — convergent, NOT a conflict.""" ours_m: Manifest = {} # a.py deleted theirs_m: Manifest = {} # a.py deleted assert detect_conflicts({"a.py"}, {"a.py"}, ours_m, theirs_m) == set() def test_convergent_same_add(self) -> None: """Both branches independently added the same file with identical content.""" ours_m = {"new.py": "hash_n"} theirs_m = {"new.py": "hash_n"} assert detect_conflicts({"new.py"}, {"new.py"}, ours_m, theirs_m) == set() def test_delete_vs_modify_is_conflict(self) -> None: """One side deleted, other modified — genuinely divergent.""" ours_m: Manifest = {} # deleted a.py theirs_m = {"a.py": "hash_new"} # modified a.py assert detect_conflicts({"a.py"}, {"a.py"}, ours_m, theirs_m) == {"a.py"} class TestApplyMerge: def test_clean_merge(self) -> None: base = {"a.mid": "h0", "b.mid": "h0"} ours = {"a.mid": "h_ours", "b.mid": "h0"} theirs = {"a.mid": "h0", "b.mid": "h_theirs"} ours_changed = {"a.mid"} theirs_changed = {"b.mid"} result = apply_merge(base, ours, theirs, ours_changed, theirs_changed, set()) assert result == {"a.mid": "h_ours", "b.mid": "h_theirs"} def test_conflict_paths_excluded(self) -> None: base = {"a.mid": "h0"} ours = {"a.mid": "h_ours"} theirs = {"a.mid": "h_theirs"} ours_changed = theirs_changed = {"a.mid"} result = apply_merge(base, ours, theirs, ours_changed, theirs_changed, {"a.mid"}) assert result == {"a.mid": "h0"} # Falls back to base def test_ours_deletion_applied(self) -> None: base = {"a.mid": "h0", "b.mid": "h0"} ours = {"b.mid": "h0"} # a.mid deleted on ours theirs = {"a.mid": "h0", "b.mid": "h0"} result = apply_merge(base, ours, theirs, {"a.mid"}, set(), set()) assert "a.mid" not in result class TestMergeStateIO: def test_write_and_read(self, repo: pathlib.Path) -> None: base_id = long_id("b" * 64) ours_id = long_id("1" * 64) theirs_id = long_id("2" * 64) write_merge_state( repo, base_commit=base_id, ours_commit=ours_id, theirs_commit=theirs_id, conflict_paths=["a.mid", "b.mid"], other_branch="feature/x", ) state = read_merge_state(repo) assert state is not None assert state.base_commit == base_id assert state.conflict_paths == ["a.mid", "b.mid"] assert state.other_branch == "feature/x" def test_read_no_state(self, repo: pathlib.Path) -> None: assert read_merge_state(repo) is None def test_clear(self, repo: pathlib.Path) -> None: write_merge_state(repo, base_commit=long_id("b" * 64), ours_commit=long_id("c" * 64), theirs_commit=long_id("d" * 64), conflict_paths=[]) clear_merge_state(repo) assert read_merge_state(repo) is None class TestFindMergeBase: def test_direct_parent(self, repo: pathlib.Path) -> None: root_id = _commit(repo, "root") a_id = _commit(repo, "a", parent=root_id) b_id = _commit(repo, "b", parent=root_id) base = find_merge_base(repo, a_id, b_id) assert base == root_id def test_same_commit(self, repo: pathlib.Path) -> None: _commit(repo, "root") base = find_merge_base(repo, "root", "root") assert base == "root" def test_linear_history(self, repo: pathlib.Path) -> None: a_id = _commit(repo, "a") b_id = _commit(repo, "b", parent=a_id) c_id = _commit(repo, "c", parent=b_id) base = find_merge_base(repo, c_id, b_id) assert base == b_id def test_no_common_ancestor(self, repo: pathlib.Path) -> None: x_id = _commit(repo, "x") y_id = _commit(repo, "y") assert find_merge_base(repo, x_id, y_id) is None def test_bidirectional_terminates_early(self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: """Bidirectional BFS reads O(distance_to_LCA) commits, not O(total_history).""" import muse.core.graph as graph_mod from muse.core.commits import read_commit # 100-commit chain: root → c0 → ... → c97 → head_a # ↘ feat (branches from c97) # LCA = c97, one hop from each tip root = _commit(repo, "root") tip = root for i in range(97): tip = _commit(repo, f"c{i}", parent=tip) lca = tip head_a = _commit(repo, "head_a", parent=lca) feat = _commit(repo, "feat", parent=lca) call_count = 0 original = read_commit def counting_read(rr: pathlib.Path, cid: str) -> CommitRecord | None: nonlocal call_count call_count += 1 return original(rr, cid) monkeypatch.setattr(graph_mod, "read_commit", counting_read) base = find_merge_base(repo, head_a, feat) assert base == lca # Bidirectional BFS finds LCA in ~2 reads (one per tip). # Old two-phase BFS read all 99 A ancestors before touching B. assert call_count <= 10 def test_deep_chain_diamond(self, repo: pathlib.Path) -> None: """LCA correct for a long chain with diverging feature branches.""" root = _commit(repo, "root") tip = root for i in range(50): tip = _commit(repo, f"m{i}", parent=tip) lca = tip branch_a = _commit(repo, "a0", parent=lca) branch_b = _commit(repo, "b0", parent=lca) for i in range(1, 5): branch_a = _commit(repo, f"a{i}", parent=branch_a) branch_b = _commit(repo, f"b{i}", parent=branch_b) base = find_merge_base(repo, branch_a, branch_b) assert base == lca # =========================================================================== # Structured merge engine integration tests # =========================================================================== def _ins(addr: str, pos: int | None, cid: str) -> InsertOp: return InsertOp(op="insert", address=addr, position=pos, content_id=cid, content_summary=cid) def _del(addr: str, pos: int | None, cid: str) -> DeleteOp: return DeleteOp(op="delete", address=addr, position=pos, content_id=cid, content_summary=cid) def _rep(addr: str, old: str, new: str) -> ReplaceOp: return ReplaceOp( op="replace", address=addr, position=None, old_content_id=old, new_content_id=new, old_summary="old", new_summary="new", ) def _delta(ops: list[DomainOp]) -> StructuredDelta: return StructuredDelta(domain="midi", ops=ops, summary="test") class TestMergeStructuredIntegration: """Verify merge_structured delegates correctly to merge_op_lists.""" def test_clean_non_overlapping_file_ops(self) -> None: ours = _delta([_ins("a.mid", pos=0, cid="a-hash")]) theirs = _delta([_ins("b.mid", pos=0, cid="b-hash")]) result = merge_structured(_delta([]), ours, theirs) assert result.is_clean is True assert len(result.merged_ops) == 2 def test_conflicting_same_address_replaces_detected(self) -> None: ours = _delta([_rep("shared.mid", "old", "v-ours")]) theirs = _delta([_rep("shared.mid", "old", "v-theirs")]) result = merge_structured(_delta([]), ours, theirs) assert result.is_clean is False assert len(result.conflict_ops) == 1 def test_base_ops_kept_by_both_sides_preserved(self) -> None: shared = _ins("base.mid", pos=0, cid="base-cid") result = merge_structured( _delta([shared]), _delta([shared]), _delta([shared]), ) assert result.is_clean is True assert any(_op_key_tuple(op) == _op_key_tuple(shared) for op in result.merged_ops) def test_position_adjustment_in_structured_merge(self) -> None: """Non-conflicting note inserts get position-adjusted in structured merge.""" ours = _delta([_ins("lead.mid", pos=3, cid="note-A")]) theirs = _delta([_ins("lead.mid", pos=7, cid="note-B")]) result = merge_structured(_delta([]), ours, theirs) assert result.is_clean is True pos_by_cid = { op["content_id"]: op["position"] for op in result.merged_ops if op["op"] == "insert" } # note-A(3): no theirs ≤ 3 → stays 3 assert pos_by_cid["note-A"] == 3 # note-B(7): ours A(3) ≤ 7 → 7+1 = 8 assert pos_by_cid["note-B"] == 8 def _op_key_tuple(op: DomainOp) -> tuple[str, ...]: """Re-implementation of _op_key for test assertions.""" if op["op"] == "insert": return ("insert", op["address"], str(op["position"]), op["content_id"]) if op["op"] == "delete": return ("delete", op["address"], str(op["position"]), op["content_id"]) if op["op"] == "replace": return ("replace", op["address"], str(op["position"]), op["old_content_id"], op["new_content_id"]) return (op["op"], op["address"]) class TestStructuredMergePluginProtocol: """Verify MidiPlugin satisfies the AddressedMergePlugin protocol.""" def test_midi_plugin_isinstance_addressed_merge_plugin(self) -> None: plugin = MidiPlugin() assert isinstance(plugin, AddressedMergePlugin) def test_merge_ops_non_conflicting_files_is_clean(self) -> None: plugin = MidiPlugin() base = SnapshotManifest(files={}, domain="midi") ours_snap = SnapshotManifest(files={"a.mid": "hash-a"}, domain="midi") theirs_snap = SnapshotManifest(files={"b.mid": "hash-b"}, domain="midi") ours_ops: list[DomainOp] = [_ins("a.mid", pos=None, cid="hash-a")] theirs_ops: list[DomainOp] = [_ins("b.mid", pos=None, cid="hash-b")] result = plugin.merge_ops( base, ours_snap, theirs_snap, ours_ops, theirs_ops ) assert result.is_clean is True assert "a.mid" in result.merged["files"] assert "b.mid" in result.merged["files"] def test_merge_ops_conflicting_same_file_replace_not_clean(self) -> None: plugin = MidiPlugin() base = SnapshotManifest(files={"f.mid": "base-hash"}, domain="midi") ours_snap = SnapshotManifest(files={"f.mid": "ours-hash"}, domain="midi") theirs_snap = SnapshotManifest(files={"f.mid": "theirs-hash"}, domain="midi") ours_ops: list[DomainOp] = [_rep("f.mid", "base-hash", "ours-hash")] theirs_ops: list[DomainOp] = [_rep("f.mid", "base-hash", "theirs-hash")] result = plugin.merge_ops( base, ours_snap, theirs_snap, ours_ops, theirs_ops ) assert not result.is_clean assert "f.mid" in result.conflicts def test_merge_ops_ours_strategy_resolves_conflict(self) -> None: plugin = MidiPlugin() base = SnapshotManifest(files={"f.mid": "base"}, domain="midi") ours_snap = SnapshotManifest(files={"f.mid": "ours-v"}, domain="midi") theirs_snap = SnapshotManifest(files={"f.mid": "theirs-v"}, domain="midi") ours_ops: list[DomainOp] = [_rep("f.mid", "base", "ours-v")] theirs_ops: list[DomainOp] = [_rep("f.mid", "base", "theirs-v")] result = plugin.merge_ops( base, ours_snap, theirs_snap, ours_ops, theirs_ops, ) # Without .museattributes the conflict stands — verify conflict is reported. assert not result.is_clean def test_merge_ops_delete_on_only_one_side_is_clean(self) -> None: plugin = MidiPlugin() base = SnapshotManifest(files={"keep.mid": "k", "remove.mid": "r"}, domain="midi") ours_snap = SnapshotManifest(files={"keep.mid": "k"}, domain="midi") theirs_snap = SnapshotManifest(files={"keep.mid": "k", "remove.mid": "r"}, domain="midi") ours_ops: list[DomainOp] = [_del("remove.mid", pos=None, cid="r")] theirs_ops: list[DomainOp] = [] result = plugin.merge_ops( base, ours_snap, theirs_snap, ours_ops, theirs_ops ) assert result.is_clean is True assert "keep.mid" in result.merged["files"] assert "remove.mid" not in result.merged["files"] def test_merge_ops_empty_changes_returns_base(self) -> None: plugin = MidiPlugin() base = SnapshotManifest(files={"f.mid": "h"}, domain="midi") result = plugin.merge_ops(base, base, base, [], []) assert result.is_clean is True assert result.merged["files"] == {"f.mid": "h"} # --------------------------------------------------------------------------- # Bug: "manual" attribute strategy on l==r paths causes false conflicts # # The `manual` strategy fires "even when the engine would auto-resolve" a # DIVERGENT change (one side changed, the engine would take it automatically). # It must NOT fire when both sides agree (l == r) — whether nothing changed # (b == l == r) or both made the same convergent edit (b != l == r). # # Regression for: muse merge task/core-cat → 73 false conflicts in muse/core/** # caused by [[rules]] path="muse/core/**" strategy="manual" in .museattributes. # --------------------------------------------------------------------------- _DUMMY_ROOT = pathlib.Path("/nonexistent-repo-for-testing") def _code_plugin() -> CodePlugin: return CodePlugin() def _snap(files: Mapping[str, str]) -> SnapshotManifest: return SnapshotManifest(files=files, domain="code") def _manual_attrs() -> list[AttributeRule]: return [AttributeRule(path_pattern="core/**", dimension="*", strategy="manual", priority=100)] def _merge_with_manual( base: SnapshotManifest, ours: SnapshotManifest, theirs: SnapshotManifest, ) -> MergeResult: plugin = _code_plugin() with unittest.mock.patch( "muse.plugins.code.plugin.load_attributes", return_value=_manual_attrs(), ): return plugin.merge(base, ours, theirs, repo_root=_DUMMY_ROOT) class TestManualStrategyUnchangedFiles: """manual fires for single-branch changes but NEVER for l == r paths.""" def test_unchanged_no_conflict(self) -> None: """b == l == r: neither branch touched the file → no conflict.""" base = _snap({"core/store.py": _OID_AAA}) result = _merge_with_manual(base, base, base) assert result.is_clean assert "core/store.py" not in result.conflicts def test_convergent_same_change_no_conflict(self) -> None: """b != l == r: both independently made the same edit → convergent, no conflict.""" base = _snap({"core/store.py": _OID_OLD}) same = _snap({"core/store.py": _OID_NEW}) result = _merge_with_manual(base, same, same) assert result.is_clean assert "core/store.py" not in result.conflicts def test_only_ours_changed_manual_forces_conflict(self) -> None: """b == theirs != ours: one side changed → manual forces human review.""" base = _snap({"core/store.py": _OID_OLD}) ours = _snap({"core/store.py": _OID_NEW}) theirs = _snap({"core/store.py": _OID_OLD}) result = _merge_with_manual(base, ours, theirs) assert "core/store.py" in result.conflicts def test_only_theirs_changed_manual_forces_conflict(self) -> None: """b == ours != theirs: one side changed → manual forces human review.""" base = _snap({"core/store.py": _OID_OLD}) ours = _snap({"core/store.py": _OID_OLD}) theirs = _snap({"core/store.py": _OID_NEW}) result = _merge_with_manual(base, ours, theirs) assert "core/store.py" in result.conflicts def test_divergent_changes_conflict(self) -> None: """Both changed differently → conflict regardless.""" base = _snap({"core/store.py": _OID_BASE}) ours = _snap({"core/store.py": _OID_OURS}) theirs = _snap({"core/store.py": _OID_THEIRS}) result = _merge_with_manual(base, ours, theirs) assert "core/store.py" in result.conflicts def test_73_unchanged_plus_one_real_conflict(self) -> None: """Regression: 73 unchanged core files + 1 real conflict → only 1 conflict.""" unchanged = {f"core/file_{i}.py": blob_id(f"file_{i}".encode()) for i in range(73)} base_files = {**unchanged, "core/store.py": _OID_BASE} ours_files = {**unchanged, "core/store.py": _OID_OURS} theirs_files = {**unchanged, "core/store.py": _OID_THEIRS} result = _merge_with_manual( _snap(base_files), _snap(ours_files), _snap(theirs_files) ) false_conflicts = [p for p in result.conflicts if p != "core/store.py"] assert false_conflicts == [], f"{len(false_conflicts)} false conflicts: {false_conflicts[:5]}" assert "core/store.py" in result.conflicts # --------------------------------------------------------------------------- # Bug: one-sided changes must NEVER produce false conflicts # # When only one branch changes a file (b == ours or b == theirs), the merge # must take the changed side cleanly — no conflict, no manual review needed. # This covers both the file-level merge() path and the operation-level # merge_ops() path used by the code plugin (AddressedMergePlugin). # --------------------------------------------------------------------------- class TestOneSidedChangeNeverConflicts: """One side changes a file, other side doesn't → always clean.""" def test_only_theirs_changed_no_conflict(self) -> None: base = _snap({"pyproject.toml": _OID_OLD, "describe.py": _OID_OLD}) ours = _snap({"pyproject.toml": _OID_OLD, "describe.py": _OID_OLD}) theirs = _snap({"pyproject.toml": _OID_NEW, "describe.py": _OID_FIXED}) result = _code_plugin().merge(base, ours, theirs, repo_root=None) assert result.is_clean assert result.conflicts == [] assert result.merged["files"]["pyproject.toml"] == _OID_NEW assert result.merged["files"]["describe.py"] == _OID_FIXED def test_only_ours_changed_no_conflict(self) -> None: base = _snap({"a.py": _OID_OLD}) ours = _snap({"a.py": _OID_OURS}) theirs = _snap({"a.py": _OID_OLD}) result = _code_plugin().merge(base, ours, theirs, repo_root=None) assert result.is_clean assert result.conflicts == [] assert result.merged["files"]["a.py"] == _OID_OURS def test_theirs_deleted_file_ours_untouched(self) -> None: base = _snap({"gone.py": _OID_OLD, "keep.py": _OID_K}) ours = _snap({"gone.py": _OID_OLD, "keep.py": _OID_K}) theirs = _snap({"keep.py": _OID_K}) result = _code_plugin().merge(base, ours, theirs, repo_root=None) assert result.is_clean assert "gone.py" not in result.merged["files"] def test_merge_ops_one_sided_no_conflict(self) -> None: """merge_ops() must not flag one-sided changes as conflicts.""" plugin = _code_plugin() base = _snap({"pyproject.toml": _OID_OLD}) ours = _snap({"pyproject.toml": _OID_OLD}) theirs = _snap({"pyproject.toml": _OID_NEW}) ours_delta: StructuredDelta = {"ops": [], "summary": "", "domain": "code"} theirs_delta: StructuredDelta = { "ops": [{"op": "patch", "address": "pyproject.toml", "child_ops": [], "file_change": "modified", "content_summary": ""}], "summary": "1 change", "domain": "code", } result = plugin.merge_ops(base, ours, theirs, ours_delta["ops"], theirs_delta["ops"]) assert result.conflicts == [], f"False conflicts: {result.conflicts}" # --------------------------------------------------------------------------- # Bug: merge commit with two parents must pass write_commit hash verification # # compute_commit_id and _verify_commit_id must be symmetric for merge commits # (two parents). If they disagree, write_commit raises ValueError and the # merge cannot be completed — data is permanently stuck. # --------------------------------------------------------------------------- class TestMergeCommitHashVerification: """write_commit must accept merge commits (two parents) without raising.""" def test_two_parent_commit_passes_verification(self, repo: pathlib.Path) -> None: parent1 = _commit(repo, "ours") parent2 = _commit(repo, "theirs") # A merge commit with both parents must write cleanly. _commit(repo, "merge", parent=parent1, parent2=parent2) def test_merge_commit_id_is_deterministic(self, repo: pathlib.Path) -> None: """Same inputs → same commit_id regardless of parent order in the list.""" snap_id = compute_snapshot_id({}) committed_at = datetime.datetime(2026, 3, 1, tzinfo=datetime.timezone.utc) p1 = _commit(repo, "p1") p2 = _commit(repo, "p2") id_ab = compute_commit_id(parent_ids=[p1, p2], snapshot_id=snap_id, message="merge", committed_at_iso=committed_at.isoformat()) id_ba = compute_commit_id(parent_ids=[p2, p1], snapshot_id=snap_id, message="merge", committed_at_iso=committed_at.isoformat()) assert id_ab == id_ba, "Merge commit ID must be order-independent" def test_verify_sees_same_id_as_compute(self, repo: pathlib.Path) -> None: """_verify_commit_id (called inside write_commit) must agree with compute_commit_id.""" from muse.core.commits import read_commit parent1 = _commit(repo, "ours") parent2 = _commit(repo, "theirs") merge_id = _commit(repo, "merge", parent=parent1, parent2=parent2) # If write_commit succeeded, read_commit must return the record intact. rec = read_commit(repo, merge_id) assert rec is not None assert rec.commit_id == merge_id assert rec.parent_commit_id == parent1 assert rec.parent2_commit_id == parent2