"""Phase 3 — Merge Strategy Engine tests (issue #37). Tier 1 — Unit (pure, no DB) Domain classifier: - Extension-based classification for code/midi/stem/payment - Path-prefix overrides beat extension rules - Unknown extensions fall back to "code" - paths_for_domains filters correctly OVERLAY: - from_branch wins all conflicts - to_branch-only files preserved - files_added / files_modified / files_removed counts correct - conflicts populated when ancestor provided and both sides changed same file - no conflicts when ancestor not provided WEAVE: - only-from changes applied cleanly - only-to changes preserved - unchanged files from ancestor preserved - true conflicts recorded and resolved from_wins - from_branch deletions honoured - to_branch deletions honoured when from didn't touch REPLAY: - applies only from_branch delta (vs ancestor) onto to_branch - to_branch-only changes preserved (unlike overlay) - conflicts recorded when both sides changed same file - from_branch removals applied SELECTIVE: - only files in selected domains taken from from_branch - files in other domains unchanged from to_branch - files_skipped counts non-selected files - ValueError on empty selective_domains PHASED: - fallback to overlay when no phase_manifests provided - strategy name set to "phased" - phase_results populated with at least one entry - full phased execution applies deltas in order execute_merge_strategy router: - routes all 5 strategies correctly - raises ValueError on unknown strategy - weave falls back to overlay when no ancestor Tier 5 — Integration (DB) - merge_proposal uses OVERLAY by default - merge_proposal uses SELECTIVE and only applies selected domains - merge_proposal logs strategy info (smoke test on execution path) """ from __future__ import annotations import os from datetime import datetime, timezone from typing import Any import pytest from muse.core.types import blob_id, fake_id, short_id from sqlalchemy.ext.asyncio import AsyncSession from 
from musehub.types.json_types import StrDict
from musehub.services.proposal_merge_strategies import (
    ConflictEntry,
    MergeResult,
    classify_domain,
    execute_merge_strategy,
    merge_selective,
    merge_phased,
    merge_overlay,
    merge_replay,
    merge_weave,
    paths_for_domains,
)

# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────


def _now() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    return datetime.now(timezone.utc)


def _uid() -> str:
    """Return a short identifier built from 16 bytes of ``os.urandom``."""
    entropy = os.urandom(16)
    return short_id(blob_id(entropy), strip=True)


def _oid(label: int | str) -> str:
    """Deterministic test object ID — delegates to the canonical fake_id utility."""
    return fake_id(str(label))


# ─────────────────────────────────────────────────────────────────────────────
# Tier 1 — Domain classifier
# ─────────────────────────────────────────────────────────────────────────────


class TestDomainClassifier:
    """Unit tests for ``classify_domain`` and ``paths_for_domains``."""

    def test_python_file_is_code(self) -> None:
        domain = classify_domain("src/auth.py")
        assert domain == "code"

    def test_typescript_file_is_code(self) -> None:
        domain = classify_domain("src/components/App.tsx")
        assert domain == "code"

    def test_midi_extension(self) -> None:
        domain = classify_domain("tracks/main.mid")
        assert domain == "midi"

    def test_midi_dot_midi_extension(self) -> None:
        domain = classify_domain("sequences/intro.midi")
        assert domain == "midi"

    def test_wav_is_stem(self) -> None:
        domain = classify_domain("samples/kick.wav")
        assert domain == "stem"

    def test_flac_is_stem(self) -> None:
        domain = classify_domain("recordings/session.flac")
        assert domain == "stem"

    def test_unknown_extension_fallback_to_code(self) -> None:
        domain = classify_domain("assets/mystery.xyz")
        assert domain == "code"

    def test_path_prefix_midi_beats_extension(self) -> None:
        # A .py file that lives under midi/ is expected to classify as midi.
        domain = classify_domain("midi/generator.py")
        assert domain == "midi"

    def test_path_prefix_stem(self) -> None:
        domain = classify_domain("stems/vocal.wav")
        assert domain == "stem"

    def test_path_prefix_payment(self) -> None:
        domain = classify_domain("payments/claim_001.json")
        assert domain == "payment"

    def test_path_prefix_identity(self) -> None:
        domain = classify_domain("identity/keys.toml")
        assert domain == "identity"

    def test_paths_for_domains_filter(self) -> None:
        manifest = {
            "src/main.py": _oid(1),
            "tracks/beat.mid": _oid(2),
            "stems/vocal.wav": _oid(3),
        }

        only_code = paths_for_domains(manifest, ["code"])
        assert "src/main.py" in only_code
        assert "tracks/beat.mid" not in only_code

        audio_like = paths_for_domains(manifest, ["midi", "stem"])
        assert "tracks/beat.mid" in audio_like
        assert "stems/vocal.wav" in audio_like
        assert "src/main.py" not in audio_like


# ─────────────────────────────────────────────────────────────────────────────
# Tier 1 — STATE_OVERLAY
# ─────────────────────────────────────────────────────────────────────────────


class TestStateOverlay:
    """Unit tests for ``merge_overlay``."""

    def _to(self) -> StrDict:
        """Target-branch fixture: one shared path and one target-only path."""
        return {
            "shared.py": _oid(1),
            "only_to.py": _oid(2),
        }

    def _from(self) -> StrDict:
        """Source-branch fixture: one shared path and one source-only path."""
        return {
            "shared.py": _oid(3),
            "only_from.py": _oid(4),
        }

    def test_from_wins_shared_file(self) -> None:
        base = self._to()
        incoming = self._from()
        outcome = merge_overlay(base, incoming)
        assert outcome.manifest["shared.py"] == incoming["shared.py"]

    def test_to_only_files_preserved(self) -> None:
        base = self._to()
        incoming = self._from()
        outcome = merge_overlay(base, incoming)
        assert "only_to.py" in outcome.manifest

    def test_from_only_files_added(self) -> None:
        base = self._to()
        incoming = self._from()
        outcome = merge_overlay(base, incoming)
        assert outcome.manifest["only_from.py"] == incoming["only_from.py"]

    def test_counts(self) -> None:
        base = {"a.py": _oid(1), "b.py": _oid(2)}
        # Relative to base: b.py is modified and c.py is new.
        incoming = {"b.py": _oid(3), "c.py": _oid(4)}
        outcome = merge_overlay(base, incoming)
        assert outcome.files_added == 1  # c.py
        assert outcome.files_modified == 1  # b.py

    def test_no_conflicts_without_ancestor(self) -> None:
        base = self._to()
        incoming = self._from()
        outcome = merge_overlay(base, incoming)
        assert outcome.conflicts == []

    def test_conflicts_when_ancestor_provided(self) -> None:
        ancestor = {"shared.py": _oid(0)}
        base = {"shared.py": _oid(1)}  # target changed it
        incoming = {"shared.py": _oid(2)}  # source changed it differently
        outcome = merge_overlay(base, incoming, ancestor_manifest=ancestor)
        assert len(outcome.conflicts) == 1
        first = outcome.conflicts[0]
        assert first.path == "shared.py"
        assert first.resolution == "from_wins"

    def test_no_conflict_when_only_one_side_changed(self) -> None:
        ancestor = {"shared.py": _oid(0), "to_only.py": _oid(1)}
        # Target touched only to_only.py; source touched only shared.py.
        base = {"shared.py": _oid(0), "to_only.py": _oid(2)}
        incoming = {"shared.py": _oid(3)}
        outcome = merge_overlay(base, incoming, ancestor_manifest=ancestor)
        assert outcome.conflicts == []

    def test_strategy_name(self) -> None:
        assert merge_overlay({}, {}).strategy == "overlay"

    def test_domains_merged_populated(self) -> None:
        base: StrDict = {}
        incoming = {"tracks/beat.mid": _oid(1), "src/main.py": _oid(2)}
        outcome = merge_overlay(base, incoming)
        assert "midi" in outcome.domains_merged
        assert "code" in outcome.domains_merged


# ─────────────────────────────────────────────────────────────────────────────
# Tier 1 — STATE_WEAVE
# ─────────────────────────────────────────────────────────────────────────────


class TestStateWeave:
    """Unit tests for ``merge_weave`` (always called with an ancestor manifest)."""

    def test_clean_from_change_applied(self) -> None:
        ancestor = {"a.py": _oid(1), "b.py": _oid(2)}
        base = {"a.py": _oid(1), "b.py": _oid(2)}  # identical to ancestor
        incoming = {"a.py": _oid(3), "b.py": _oid(2)}  # a.py changed
        outcome = merge_weave(base, incoming, ancestor_manifest=ancestor)
        assert outcome.manifest["a.py"] == _oid(3)
        assert outcome.conflicts == []

    def test_clean_to_change_preserved(self) -> None:
        ancestor = {"a.py": _oid(1)}
        base = {"a.py": _oid(2)}  # target changed it
        incoming = {"a.py": _oid(1)}  # source left it alone
        outcome = merge_weave(base, incoming, ancestor_manifest=ancestor)
        assert outcome.manifest["a.py"] == _oid(2)
        assert outcome.conflicts == []

    def test_unchanged_file_kept(self) -> None:
        ancestor = {"stable.py": _oid(9)}
        base = {"stable.py": _oid(9)}
        incoming = {"stable.py": _oid(9)}
        outcome = merge_weave(base, incoming, ancestor_manifest=ancestor)
        assert outcome.manifest["stable.py"] == _oid(9)

    def test_true_conflict_recorded_from_wins(self) -> None:
        ancestor = {"conflict.py": _oid(0)}
        base = {"conflict.py": _oid(1)}
        incoming = {"conflict.py": _oid(2)}
        outcome = merge_weave(base, incoming, ancestor_manifest=ancestor)
        assert outcome.manifest["conflict.py"] == _oid(2)
        assert len(outcome.conflicts) == 1
        assert outcome.conflicts[0].resolution == "from_wins"

    def test_from_deletion_honoured(self) -> None:
        ancestor = {"gone.py": _oid(1)}
        base = {"gone.py": _oid(1)}  # untouched on target
        incoming: StrDict = {}  # removed on source
        outcome = merge_weave(base, incoming, ancestor_manifest=ancestor)
        assert "gone.py" not in outcome.manifest

    def test_to_deletion_honoured_when_from_untouched(self) -> None:
        ancestor = {"bye.py": _oid(1)}
        base: StrDict = {}  # removed on target
        incoming = {"bye.py": _oid(1)}  # untouched on source
        outcome = merge_weave(base, incoming, ancestor_manifest=ancestor)
        assert "bye.py" not in outcome.manifest

    def test_new_file_from_only_added(self) -> None:
        ancestor: StrDict = {}
        base: StrDict = {}
        incoming = {"new.py": _oid(5)}
        outcome = merge_weave(base, incoming, ancestor_manifest=ancestor)
        assert outcome.manifest["new.py"] == _oid(5)

    def test_strategy_name(self) -> None:
        assert merge_weave({}, {}, ancestor_manifest={}).strategy == "weave"


# ─────────────────────────────────────────────────────────────────────────────
# Tier 1 — STATE_REBASE (exercised via merge_replay)
# ─────────────────────────────────────────────────────────────────────────────


class TestStateRebase:
    """Unit tests for ``merge_replay``; the reported strategy name is "replay"."""

    def test_from_delta_applied_onto_to(self) -> None:
        ancestor = {"a.py": _oid(1), "b.py": _oid(2)}
        base = {"a.py": _oid(1), "b.py": _oid(3)}  # target changed b.py
        incoming = {"a.py": _oid(4), "b.py": _oid(2)}  # source changed a.py
        outcome = merge_replay(base, incoming, ancestor_manifest=ancestor)
        # a.py takes the source version; b.py keeps the target's change.
        assert outcome.manifest["a.py"] == _oid(4)
        assert outcome.manifest["b.py"] == _oid(3)

    def test_to_only_changes_preserved(self) -> None:
        ancestor = {"a.py": _oid(1)}
        base = {"a.py": _oid(1), "to_added.py": _oid(10)}  # target added a file
        incoming = {"a.py": _oid(1)}  # source never touched it
        outcome = merge_replay(base, incoming, ancestor_manifest=ancestor)
        assert "to_added.py" in outcome.manifest

    def test_conflict_when_both_modified_same_file(self) -> None:
        ancestor = {"x.py": _oid(0)}
        base = {"x.py": _oid(1)}
        incoming = {"x.py": _oid(2)}
        outcome = merge_replay(base, incoming, ancestor_manifest=ancestor)
        assert outcome.manifest["x.py"] == _oid(2)
        assert len(outcome.conflicts) == 1

    def test_from_removal_applied(self) -> None:
        ancestor = {"old.py": _oid(1)}
        base = {"old.py": _oid(1)}
        incoming: StrDict = {}
        outcome = merge_replay(base, incoming, ancestor_manifest=ancestor)
        assert "old.py" not in outcome.manifest
        assert outcome.files_removed == 1

    def test_strategy_name(self) -> None:
        assert merge_replay({}, {}, ancestor_manifest={}).strategy == "replay"


# ─────────────────────────────────────────────────────────────────────────────
# Tier 1 — DOMAIN_SELECTIVE
# ─────────────────────────────────────────────────────────────────────────────


class TestDomainSelective:
    """Unit tests for ``merge_selective``."""

    def test_selected_domain_applied(self) -> None:
        base = {"src/main.py": _oid(1), "tracks/beat.mid": _oid(2)}
        incoming = {"src/main.py": _oid(3), "tracks/beat.mid": _oid(4)}
        outcome = merge_selective(base, incoming, selective_domains=["code"])
        assert outcome.manifest["src/main.py"] == _oid(3)  # code → source wins
        assert outcome.manifest["tracks/beat.mid"] == _oid(2)  # midi → untouched

    def test_non_selected_files_unchanged(self) -> None:
        base = {"tracks/beat.mid": _oid(1)}
        incoming = {"tracks/beat.mid": _oid(2), "src/new.py": _oid(3)}
        outcome = merge_selective(base, incoming, selective_domains=["code"])
        assert outcome.manifest["tracks/beat.mid"] == _oid(1)  # midi not selected

    def test_files_skipped_counted(self) -> None:
        base = {"a.py": _oid(1)}
        incoming = {
            "a.py": _oid(2),
            "beat.mid": _oid(3),
            "vocal.wav": _oid(4),
        }
        outcome = merge_selective(base, incoming, selective_domains=["code"])
        assert outcome.files_skipped >= 2  # the .mid and .wav are not selected

    def test_empty_selective_domains_raises(self) -> None:
        with pytest.raises(ValueError, match="selective_domains"):
            merge_selective({}, {}, selective_domains=[])

    def test_domains_merged_lists_selected(self) -> None:
        base: StrDict = {}
        incoming = {"src/x.py": _oid(1)}
        outcome = merge_selective(
            base, incoming, selective_domains=["code", "midi"]
        )
        assert "code" in outcome.domains_merged

    def test_strategy_name(self) -> None:
        outcome = merge_selective({}, {}, selective_domains=["code"])
        assert outcome.strategy == "selective"

    def test_conflict_recorded_when_ancestor_provided(self) -> None:
        ancestor = {"src/x.py": _oid(0)}
        base = {"src/x.py": _oid(1)}
        incoming = {"src/x.py": _oid(2)}
        outcome = merge_selective(
            base,
            incoming,
            selective_domains=["code"],
            ancestor_manifest=ancestor,
        )
        assert len(outcome.conflicts) == 1


# ─────────────────────────────────────────────────────────────────────────────
# Tier 1 — PHASED
# ─────────────────────────────────────────────────────────────────────────────


class TestMergePhased:
    """Unit tests for ``merge_phased``."""

    def test_fallback_overlay_when_no_phase_manifests(self) -> None:
        base = {"a.py": _oid(1)}
        incoming = {"a.py": _oid(2), "b.py": _oid(3)}
        outcome = merge_phased(base, incoming)
        assert outcome.strategy == "phased"
        assert outcome.manifest["a.py"] == _oid(2)
        assert outcome.manifest["b.py"] == _oid(3)

    def test_fallback_has_one_phase_result(self) -> None:
        outcome = merge_phased({}, {"new.py": _oid(1)})
        assert len(outcome.phase_results) == 1
        assert outcome.phase_results[0].dependency_proposal_id == "self"

    def test_full_phased_applies_in_order(self) -> None:
        ancestor = {"a.py": _oid(0)}
        base = {"a.py": _oid(0)}
        incoming = {"a.py": _oid(3), "c.py": _oid(4)}

        first_dep = "dep1"
        second_dep = "dep2"
        phases = {
            first_dep: {"a.py": _oid(1)},  # dep1 changed a.py
            second_dep: {"a.py": _oid(2), "b.py": _oid(5)},  # dep2 changed a.py + added b.py
        }

        outcome = merge_phased(
            base,
            incoming,
            ancestor_manifest=ancestor,
            dependency_order=[first_dep, second_dep],
            phase_manifests=phases,
        )

        assert outcome.strategy == "phased"
        # Final merge: from_branch (a=3, c=4) applied on top of dep2 result
        assert outcome.manifest["c.py"] == _oid(4)
        assert len(outcome.phase_results) == 3  # dep1, dep2, self

    def test_phase_results_have_correct_indices(self) -> None:
        dep_id = "dep_x"
        outcome = merge_phased(
            {"a.py": _oid(1)},
            {"a.py": _oid(2)},
            dependency_order=[dep_id],
            phase_manifests={dep_id: {"a.py": _oid(1)}},
        )
        observed = [phase.phase_index for phase in outcome.phase_results]
        # Indices must come back already sorted in ascending order.
        assert observed == sorted(observed)

    def test_strategy_name(self) -> None:
        assert merge_phased({}, {}).strategy == "phased"


# ─────────────────────────────────────────────────────────────────────────────
# Tier 1 — execute_merge_strategy router
# ─────────────────────────────────────────────────────────────────────────────


class TestStrategyRouter:
    """Unit tests for the ``execute_merge_strategy`` dispatcher."""

    def test_routes_overlay(self) -> None:
        outcome = execute_merge_strategy("overlay", {}, {"x.py": _oid(1)})
        assert outcome.strategy == "overlay"

    def test_routes_weave(self) -> None:
        outcome = execute_merge_strategy("weave", {}, {}, ancestor_manifest={})
        assert outcome.strategy == "weave"

    def test_routes_replay(self) -> None:
        outcome = execute_merge_strategy("replay", {}, {}, ancestor_manifest={})
        assert outcome.strategy == "replay"

    def test_routes_selective(self) -> None:
        outcome = execute_merge_strategy(
            "selective", {}, {}, selective_domains=["code"]
        )
        assert outcome.strategy == "selective"

    def test_routes_phased(self) -> None:
        outcome = execute_merge_strategy("phased", {}, {})
        assert outcome.strategy == "phased"

    def test_unknown_strategy_raises(self) -> None:
        with pytest.raises(ValueError, match="Unknown merge strategy"):
            execute_merge_strategy("magic_merge", {}, {})

    def test_weave_fallback_without_ancestor(self) -> None:
        # No ancestor_manifest is passed; the result must still report "weave".
        outcome = execute_merge_strategy("weave", {}, {"a.py": _oid(1)})
        assert outcome.strategy == "weave"
        assert outcome.manifest["a.py"] == _oid(1)

    def test_replay_fallback_without_ancestor(self) -> None:
        # No ancestor_manifest is passed; the result must still report "replay".
        outcome = execute_merge_strategy("replay", {}, {"a.py": _oid(1)})
        assert outcome.strategy == "replay"


# ─────────────────────────────────────────────────────────────────────────────
# Tier 5 — Integration: strategy wired into merge_proposal
# ─────────────────────────────────────────────────────────────────────────────


async def _make_repo(session: AsyncSession) -> str:
    """Insert a public ``MusehubRepo`` owned by "strattest" and return its repo_id.

    The slug embeds a fresh ``_uid()`` so each call produces a distinct repo
    name. The row is added to *session* and flushed, not committed.
    """
    # Project imports stay function-local, as in the rest of this section.
    from musehub.core.genesis import compute_identity_id, compute_repo_id
    from musehub.db.musehub_repo_models import MusehubRepo

    owner = "strattest"
    slug = f"repo-{_uid()}"
    owner_id = compute_identity_id(owner.encode())
    created_at = _now()
    repo = MusehubRepo(
        repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()),
        name=slug,
        owner=owner,
        slug=slug,
        visibility="public",
        owner_user_id=owner_id,
        description="",
        tags=[],
        created_at=created_at,
    )
    session.add(repo)
    await session.flush()
    return repo.repo_id


async def _make_branch_with_commit(
    session: AsyncSession,
    repo_id: str,
    branch_name: str,
    manifest: StrDict,
) -> None:
    """Create a branch + one commit + snapshot with the given manifest.

    The commit has no parents and its message is ``"init <branch_name>"``.
    The new branch's ``head_commit_id`` points at that commit. Everything is
    flushed to *session* but not committed.
    """
    from musehub.core.genesis import compute_branch_id
    from musehub.db.musehub_repo_models import (
        MusehubBranch,
        MusehubCommit,
        MusehubCommitRef,
    )
    from musehub.muse_cli.snapshot import compute_commit_id, compute_snapshot_id
    from musehub.services.musehub_snapshot import upsert_snapshot_entries

    created_at = _now()
    message = f"init {branch_name}"

    # Store the snapshot first; the commit id is computed from snapshot_id.
    snapshot_id = compute_snapshot_id(manifest)
    await upsert_snapshot_entries(session, repo_id, snapshot_id, manifest)

    commit_id = compute_commit_id(
        [],
        snapshot_id,
        message,
        created_at.isoformat(),
        author="strattest",
        signer_public_key="",
    )
    session.add(
        MusehubCommit(
            commit_id=commit_id,
            branch=branch_name,
            parent_ids=[],
            message=message,
            author="strattest",
            timestamp=created_at,
            snapshot_id=snapshot_id,
        )
    )
    session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id))
    session.add(
        MusehubBranch(
            branch_id=compute_branch_id(repo_id, branch_name),
            repo_id=repo_id,
            name=branch_name,
            head_commit_id=commit_id,
        )
    )
    await session.flush()


async def _merge_via_proposal(
    session: AsyncSession,
    *,
    to_manifest: StrDict,
    from_manifest: StrDict,
    from_branch: str,
    title: str,
    **proposal_options: Any,
) -> StrDict:
    """Run one proposal merge end-to-end and return the merged snapshot manifest.

    Creates a fresh repo, a ``dev`` branch holding *to_manifest* and a
    *from_branch* holding *from_manifest*. It then opens a proposal from
    *from_branch* into ``dev``, merges it, and reads back the snapshot of the
    merge commit.

    Args:
        session: Database session shared with the test.
        to_manifest: Manifest committed to the target branch ``dev``.
        from_manifest: Manifest committed to *from_branch*.
        from_branch: Name of the source branch to create and merge.
        title: Proposal title.
        **proposal_options: Extra keyword arguments forwarded unchanged to
            ``create_proposal`` (for example ``merge_strategy``). When none
            are given, ``create_proposal`` applies its own defaults.

    Returns:
        The value of ``get_snapshot_manifest`` for the merge commit's snapshot.
        NOTE(review): annotated as ``StrDict`` to mirror the manifests written
        above — confirm against ``get_snapshot_manifest``'s signature.
    """
    from musehub.db.musehub_repo_models import MusehubCommit
    from musehub.services.musehub_proposals import create_proposal, merge_proposal
    from musehub.services.musehub_snapshot import get_snapshot_manifest

    repo_id = await _make_repo(session)
    await _make_branch_with_commit(session, repo_id, "dev", to_manifest)
    await _make_branch_with_commit(session, repo_id, from_branch, from_manifest)

    proposal_resp = await create_proposal(
        session,
        repo_id=repo_id,
        title=title,
        from_branch=from_branch,
        to_branch="dev",
        author="strattest",
        **proposal_options,
    )
    merged_resp = await merge_proposal(session, repo_id, proposal_resp.proposal_id)

    # Find the merge commit and read its snapshot.
    merge_commit = await session.get(MusehubCommit, merged_resp.merge_commit_id)
    assert merge_commit is not None
    return await get_snapshot_manifest(session, merge_commit.snapshot_id)


class TestMergeProposalStrategyIntegration:
    """DB-backed tests checking that ``merge_proposal`` applies the proposal's strategy."""

    @pytest.mark.asyncio
    async def test_default_overlay_merges_manifests(
        self, db_session: AsyncSession
    ) -> None:
        """With no strategy given, the merged manifest matches overlay semantics."""
        merged = await _merge_via_proposal(
            db_session,
            to_manifest={"shared.py": _oid(1), "to_only.py": _oid(2)},
            from_manifest={"shared.py": _oid(3), "from_only.py": _oid(4)},
            from_branch="feat/overlay",
            title="overlay merge",
        )

        assert merged["shared.py"] == _oid(3)  # from_branch won
        assert merged["to_only.py"] == _oid(2)  # preserved from to
        assert merged["from_only.py"] == _oid(4)  # added from from

    @pytest.mark.asyncio
    async def test_domain_selective_only_applies_selected_domain(
        self, db_session: AsyncSession
    ) -> None:
        """Selecting only "code" takes the .py file from the source and leaves the .mid alone."""
        merged = await _merge_via_proposal(
            db_session,
            to_manifest={"src/main.py": _oid(1), "tracks/beat.mid": _oid(2)},
            from_manifest={"src/main.py": _oid(3), "tracks/beat.mid": _oid(4)},
            from_branch="feat/selective",
            title="selective merge",
            merge_strategy="selective",
            selective_domains=["code"],
        )

        assert merged["src/main.py"] == _oid(3)  # code domain applied
        assert merged["tracks/beat.mid"] == _oid(2)  # midi not selected — untouched