"""Phase 3.6 — ``muse merge`` at scale.

Targets
-------
- ``muse merge --dry-run`` on two branches each with 5 000 modified files from
  a 75 000-file base: < 30 s.
- ``detect_conflicts`` on 1 000 files modified by both branches: < 5 s
  (actual measurement: < 1 ms — target is trivially exceeded; the test
  documents the asymptotic behaviour).
- ``find_merge_base`` on 50-commit-deep chains completes correctly;
  exceeding the ``max_ancestors`` cap raises ``MuseCLIError`` cleanly.

Additional coverage discovered during reconnaissance
----------------------------------------------------
- Convergent both-delete: NOT a conflict, absent from merged manifest.
- Convergent same-add same-hash: NOT a conflict, present in merged manifest.
- Delete-vs-modify: IS a conflict.
- Criss-cross DAG: ``find_merge_base`` returns a valid (though non-unique) LCA.
- Disjoint histories (no common ancestor): returns ``None``.
- Fast-forward merge semantics at 75 000-file scale.
- Memory: three 75 000-file manifests stay under 64 MB peak.
- Snapshot I/O round-trip at 75 000 files: write < 500 ms, read < 500 ms.
- Full three-way pipeline (diff → detect → apply) at 75 000 files: < 5 s.
"""

from __future__ import annotations

from collections.abc import Mapping
import datetime
import hashlib
import json
import pathlib
import shutil
import tempfile
import time
import tracemalloc

import pytest

from muse.core.merge_engine import (
    apply_merge,
    detect_conflicts,
    diff_snapshots,
    find_merge_base,
    read_merge_state,
    write_merge_state,
)
from muse.core.errors import MuseCLIError
from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
from muse.core.object_store import object_path
from muse.core.commits import (
    CommitRecord,
    write_commit,
)
from muse.core.snapshots import (
    SnapshotRecord,
    read_snapshot,
    write_snapshot,
)
from muse.core.types import Manifest, blob_id
from muse.core.paths import muse_dir


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# Fixed reference instant so commit IDs are reproducible between runs.
_NOW = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
_REPO_ID = "bench"


def _s256(data: bytes) -> str:
    """Return the blob identifier of *data* (thin alias for ``blob_id``)."""
    return blob_id(data)


def _fresh_repo(path: pathlib.Path, max_ancestors: int = 200_000) -> pathlib.Path:
    """Create an empty repository skeleton under *path* and return *path*.

    Writes ``repo.json``, ``config.toml`` (with the given ``max_ancestors``
    limit), the ``commits`` / ``snapshots`` / ``objects`` / ``refs/heads``
    directories and a ``HEAD`` pointing at ``refs/heads/main``.

    Raises:
        FileExistsError: if the muse directory already exists, because the
            directory is created without ``exist_ok``.
    """
    dot_muse = muse_dir(path)
    dot_muse.mkdir(parents=True)
    (dot_muse / "repo.json").write_text(
        f'{{"repo_id":"{_REPO_ID}","owner":"bench"}}', encoding="utf-8"
    )
    (dot_muse / "config.toml").write_text(
        f"[limits]\nmax_ancestors = {max_ancestors}\n", encoding="utf-8"
    )
    for sub in ("commits", "snapshots", "objects"):
        (dot_muse / sub).mkdir()
    (dot_muse / "refs" / "heads").mkdir(parents=True)
    (dot_muse / "HEAD").write_text("ref: refs/heads/main\n", encoding="utf-8")
    return path


def _write_empty_snapshot(root: pathlib.Path) -> str:
    """Write the snapshot of an empty manifest into *root* and return its ID."""
    snap_id = compute_snapshot_id({})
    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest={}, created_at=_NOW))
    return snap_id


def _make_commit(
    root: pathlib.Path,
    parent_id: str | None,
    snap_id: str,
    msg: str,
    offset_secs: int = 0,
    *,
    parent2_id: str | None = None,
    branch: str = "br",
    author: str = "bench",
) -> str:
    """Write one commit into *root* and return its commit ID.

    Args:
        root: Repository root.
        parent_id: First parent, or ``None`` for a root commit.
        snap_id: Snapshot the commit points at.
        msg: Commit message (also feeds the commit ID).
        offset_secs: Seconds added to ``_NOW`` to form the commit timestamp.
        parent2_id: Optional second parent, for merge commits.
        branch: Branch name recorded on the commit.
        author: Author recorded on the commit (also feeds the commit ID).
    """
    # Falsy parents are dropped, so a root commit hashes an empty parent list.
    parent_ids = [pid for pid in (parent_id, parent2_id) if pid]
    ts = _NOW + datetime.timedelta(seconds=offset_secs)
    cid = compute_commit_id(
        parent_ids=parent_ids,
        snapshot_id=snap_id,
        message=msg,
        committed_at_iso=ts.isoformat(),
        author=author,
    )
    write_commit(
        root,
        CommitRecord(
            commit_id=cid,
            branch=branch,
            message=msg,
            author=author,
            committed_at=ts,
            parent_commit_id=parent_id,
            parent2_commit_id=parent2_id,
            snapshot_id=snap_id,
            metadata={},
            sem_ver_bump="PATCH",
        ),
    )
    return cid


def _write_chain(root: pathlib.Path, depth: int, prefix: str, parent_id: str) -> str:
    """Write a linear chain of *depth* commits on top of *parent_id*. Returns tip.

    Every commit in the chain points at the empty-manifest snapshot and is
    named ``{prefix}-{i}``; with ``depth == 0`` the tip is *parent_id* itself.
    """
    snap_id = _write_empty_snapshot(root)
    tip = parent_id
    for i in range(depth):
        tip = _make_commit(root, tip, snap_id, f"{prefix}-{i}", i)
    return tip


def _base_manifest(n_files: int) -> dict[str, str]:
    """Return the *n_files*-entry base manifest ``f000000.py`` … used at scale."""
    return {f"f{i:06d}.py": _s256(bytes([i % 256] * 64)) for i in range(n_files)}


def _build_manifests(
    n_base: int,
    n_ours: int,
    n_theirs: int,
    n_conflict: int,
) -> tuple[Mapping[str, str], Mapping[str, str], Mapping[str, str]]:
    """Return (base, ours, theirs) manifests for a scale scenario.

    Path indices are assigned in three consecutive ranges: the first *n_ours*
    are changed only in ``ours``, the next *n_theirs* only in ``theirs``, and
    the following *n_conflict* are changed differently on both sides.
    """
    base = _base_manifest(n_base)
    ours = dict(base)
    theirs = dict(base)
    for i in range(n_ours):
        ours[f"f{i:06d}.py"] = _s256(b"ours" + bytes([i % 256] * 60))
    for i in range(n_ours, n_ours + n_theirs):
        theirs[f"f{i:06d}.py"] = _s256(b"theirs" + bytes([i % 256] * 60))
    conflict_start = n_ours + n_theirs
    for i in range(conflict_start, conflict_start + n_conflict):
        ours[f"f{i:06d}.py"] = _s256(b"ours_c" + bytes([i % 256] * 56))
        theirs[f"f{i:06d}.py"] = _s256(b"theirs_c" + bytes([i % 256] * 56))
    return base, ours, theirs


def _merge_pipeline(
    base: Mapping[str, str],
    ours: Mapping[str, str],
    theirs: Mapping[str, str],
) -> tuple[set[str], Mapping[str, str]]:
    """Run diff → detect → apply and return ``(conflicts, merged)``."""
    ours_changed = diff_snapshots(base, ours)
    theirs_changed = diff_snapshots(base, theirs)
    conflicts = detect_conflicts(ours_changed, theirs_changed, ours, theirs)
    merged = apply_merge(base, ours, theirs, ours_changed, theirs_changed, conflicts)
    return conflicts, merged


# ---------------------------------------------------------------------------
# TestDiffSnapshotsAtScale — the pure set-arithmetic hot path
# ---------------------------------------------------------------------------


class TestDiffSnapshotsAtScale:
    """diff_snapshots is pure Python set arithmetic — verifies correctness at 75k."""

    def test_diff_empty_manifests(self) -> None:
        """Two empty manifests have no changed paths."""
        assert diff_snapshots({}, {}) == set()

    def test_diff_no_changes(self) -> None:
        """A manifest diffed against itself has no changed paths."""
        m = {f"f{i}.py": _s256(bytes([i % 256])) for i in range(1000)}
        assert diff_snapshots(m, m) == set()

    def test_diff_all_added(self) -> None:
        """Every path of *other* is reported when the base is empty."""
        other = {f"f{i}.py": _s256(bytes([i % 256])) for i in range(500)}
        result = diff_snapshots({}, other)
        assert result == set(other)

    def test_diff_all_deleted(self) -> None:
        """Every path of *base* is reported when the other side is empty."""
        base = {f"f{i}.py": _s256(bytes([i % 256])) for i in range(500)}
        assert diff_snapshots(base, {}) == set(base)

    def test_diff_partial_modify(self) -> None:
        """Exactly the 100 re-hashed paths are reported, nothing else."""
        base = {f"f{i}.py": _s256(bytes([i % 256])) for i in range(1000)}
        other = dict(base)
        for i in range(100):
            other[f"f{i}.py"] = _s256(b"new" + bytes([i % 256]))
        result = diff_snapshots(base, other)
        assert result == {f"f{i}.py" for i in range(100)}

    def test_diff_symmetry_add_delete(self) -> None:
        """diff_snapshots(a, b) == diff_snapshots(b, a) for add/delete (same paths, different meaning)."""
        base = {"x.py": "h1"}
        other = {"y.py": "h2"}
        assert diff_snapshots(base, other) == diff_snapshots(other, base)

    def test_diff_75k_under_500ms(self) -> None:
        """diff_snapshots on 75k-file manifests completes in < 500 ms."""
        base, ours, _ = _build_manifests(75_000, 5_000, 0, 0)
        t0 = time.perf_counter()
        result = diff_snapshots(base, ours)
        duration_ms = (time.perf_counter() - t0) * 1000
        assert len(result) == 5_000
        assert duration_ms < 500, f"diff_snapshots took {duration_ms:.1f}ms (limit: 500ms)"


# ---------------------------------------------------------------------------
# TestDetectConflictsSemantics — correctness of the new convergence semantics
# ---------------------------------------------------------------------------


class TestDetectConflictsSemantics:
    """Verifies the fixed convergence semantics of detect_conflicts."""

    def test_disjoint_changes_no_conflict(self) -> None:
        """Changes to different paths never conflict."""
        ours_m = {"a.py": "h_a"}
        theirs_m = {"b.py": "h_b"}
        assert detect_conflicts({"a.py"}, {"b.py"}, ours_m, theirs_m) == set()

    def test_divergent_change_is_conflict(self) -> None:
        """Same path, different resulting hashes — a conflict."""
        ours_m = {"x.py": "hash_ours"}
        theirs_m = {"x.py": "hash_theirs"}
        assert detect_conflicts({"x.py"}, {"x.py"}, ours_m, theirs_m) == {"x.py"}

    def test_both_delete_is_convergent(self) -> None:
        """Both branches deleted the same file — convergent, NOT a conflict."""
        assert detect_conflicts({"gone.py"}, {"gone.py"}, {}, {}) == set()

    def test_both_delete_absent_from_merged(self) -> None:
        """Both-delete: convergent → apply_merge correctly omits the file."""
        base = {"gone.py": "h_old", "keep.py": "h_k"}
        ours = {"keep.py": "h_k"}
        theirs = {"keep.py": "h_k"}
        _, merged = _merge_pipeline(base, ours, theirs)
        assert "gone.py" not in merged, "Both-delete must remove file from merged"
        assert "keep.py" in merged

    def test_same_add_same_hash_convergent(self) -> None:
        """Both branches independently added the same file with identical content."""
        ours_m = {"new.py": "hash42"}
        theirs_m = {"new.py": "hash42"}
        assert detect_conflicts({"new.py"}, {"new.py"}, ours_m, theirs_m) == set()

    def test_same_add_present_in_merged(self) -> None:
        """Same-add convergent → apply_merge includes the file at the agreed hash."""
        base: Manifest = {}
        h = _s256(b"shared-content")
        ours = {"new.py": h}
        theirs = {"new.py": h}
        conflicts, merged = _merge_pipeline(base, ours, theirs)
        assert "new.py" not in conflicts
        assert merged.get("new.py") == h

    def test_delete_vs_modify_is_conflict(self) -> None:
        """One side deleted, other modified — genuinely divergent."""
        ours_m: Manifest = {}
        theirs_m = {"a.py": "hash_new"}
        assert detect_conflicts({"a.py"}, {"a.py"}, ours_m, theirs_m) == {"a.py"}

    def test_same_add_different_hash_is_conflict(self) -> None:
        """Both added same path with DIFFERENT content — real conflict."""
        ours_m = {"new.py": "h_v1"}
        theirs_m = {"new.py": "h_v2"}
        assert detect_conflicts({"new.py"}, {"new.py"}, ours_m, theirs_m) == {"new.py"}

    def test_commutativity(self) -> None:
        """detect_conflicts(a, b, ma, mb) == detect_conflicts(b, a, mb, ma)."""
        ours_m = {f"f{i}.py": f"h_ours_{i}" for i in range(50)}
        theirs_m = {f"f{i}.py": f"h_theirs_{i}" for i in range(30, 80)}
        oc = set(ours_m)
        tc = set(theirs_m)
        forward = detect_conflicts(oc, tc, ours_m, theirs_m)
        backward = detect_conflicts(tc, oc, theirs_m, ours_m)
        assert forward == backward

    def test_detect_conflicts_1k_under_5s(self) -> None:
        """1 000 conflicting paths detected in well under 5 s (actually < 1 ms)."""
        paths = {f"conflict-{i:04d}.py" for i in range(1_000)}
        ours_m = {p: f"ours-{p}" for p in paths}
        theirs_m = {p: f"theirs-{p}" for p in paths}
        t0 = time.perf_counter()
        result = detect_conflicts(paths, paths, ours_m, theirs_m)
        duration_ms = (time.perf_counter() - t0) * 1000
        assert result == paths
        assert duration_ms < 5_000, (
            f"detect_conflicts 1k took {duration_ms:.1f}ms (limit: 5000ms)"
        )

    def test_detect_conflicts_75k_under_500ms(self) -> None:
        """detect_conflicts on 75k-path intersection completes in < 500 ms."""
        n = 75_000
        paths = {f"f{i:06d}.py" for i in range(n)}
        ours_m = {p: f"ours-{p}" for p in paths}
        theirs_m = {p: f"theirs-{p}" for p in paths}
        t0 = time.perf_counter()
        result = detect_conflicts(paths, paths, ours_m, theirs_m)
        duration_ms = (time.perf_counter() - t0) * 1000
        assert len(result) == n
        assert duration_ms < 500, (
            f"detect_conflicts 75k took {duration_ms:.1f}ms (limit: 500ms)"
        )


# ---------------------------------------------------------------------------
# TestApplyMergeAtScale — correctness and timing of apply_merge
# ---------------------------------------------------------------------------


class TestApplyMergeAtScale:
    """apply_merge correctness and performance at 75k files."""

    def test_apply_no_changes(self) -> None:
        """With no changed paths on either side the merge equals the base."""
        base = {"a.py": "h1", "b.py": "h2"}
        merged = apply_merge(base, base, base, set(), set(), set())
        assert merged == base

    def test_apply_ours_only_addition(self) -> None:
        """A path added only by ours appears in the merge at our hash."""
        base: Manifest = {}
        ours = {"new.py": "h_new"}
        merged = apply_merge(base, ours, {}, {"new.py"}, set(), set())
        assert merged["new.py"] == "h_new"

    def test_apply_theirs_only_deletion(self) -> None:
        """A path deleted only by theirs is removed; untouched paths remain."""
        base = {"del.py": "h_old", "keep.py": "h_k"}
        ours = dict(base)
        theirs = {"keep.py": "h_k"}
        merged = apply_merge(base, ours, theirs, set(), {"del.py"}, set())
        assert "del.py" not in merged
        assert "keep.py" in merged

    def test_apply_conflict_stays_at_base(self) -> None:
        """A conflicting path keeps its base hash in the merged manifest."""
        base = {"c.py": "h_base"}
        ours = {"c.py": "h_ours"}
        theirs = {"c.py": "h_theirs"}
        merged = apply_merge(base, ours, theirs, {"c.py"}, {"c.py"}, {"c.py"})
        assert merged["c.py"] == "h_base"

    def test_apply_all_conflict_returns_base(self) -> None:
        """When every path is a conflict, apply_merge returns a copy of base."""
        base = {f"f{i}.py": f"h{i}" for i in range(100)}
        ours = {p: f"ours-{v}" for p, v in base.items()}
        theirs = {p: f"theirs-{v}" for p, v in base.items()}
        oc = set(base)
        tc = set(base)
        merged = apply_merge(base, ours, theirs, oc, tc, oc)
        assert merged == base

    def test_apply_75k_5k_5k_under_500ms(self) -> None:
        """apply_merge on 75k files with 5k ours + 5k theirs changes: < 500 ms."""
        base, ours, theirs = _build_manifests(75_000, 5_000, 5_000, 0)
        oc = diff_snapshots(base, ours)
        tc = diff_snapshots(base, theirs)
        conflicts = detect_conflicts(oc, tc, ours, theirs)
        # Only apply_merge is timed; diff and detect are set-up here.
        t0 = time.perf_counter()
        merged = apply_merge(base, ours, theirs, oc, tc, conflicts)
        duration_ms = (time.perf_counter() - t0) * 1000
        assert len(conflicts) == 0
        assert len(merged) == 75_000
        assert duration_ms < 500, f"apply_merge 75k took {duration_ms:.1f}ms (limit: 500ms)"

    def test_apply_75k_with_1k_conflicts_under_500ms(self) -> None:
        """apply_merge with 1k conflict paths at 75k scale: < 500 ms."""
        base, ours, theirs = _build_manifests(75_000, 5_000, 5_000, 1_000)
        oc = diff_snapshots(base, ours)
        tc = diff_snapshots(base, theirs)
        conflicts = detect_conflicts(oc, tc, ours, theirs)
        t0 = time.perf_counter()
        merged = apply_merge(base, ours, theirs, oc, tc, conflicts)
        duration_ms = (time.perf_counter() - t0) * 1000
        assert len(conflicts) == 1_000
        # Every change is a modification, so no path is added or removed.
        assert len(merged) == 75_000
        assert duration_ms < 500, (
            f"apply_merge 75k+conflicts took {duration_ms:.1f}ms (limit: 500ms)"
        )


# ---------------------------------------------------------------------------
# TestFullMergePipeline — diff → detect → apply at scale
# ---------------------------------------------------------------------------


class TestFullMergePipeline:
    """End-to-end pure-function pipeline timing and correctness."""

    def test_pipeline_correctness_small(self) -> None:
        """Non-overlapping edits from both sides all land in the merge."""
        base = {"a.py": "base_a", "b.py": "base_b", "c.py": "base_c"}
        # ours: modifies a.py, leaves b.py and c.py unchanged
        ours = {"a.py": "ours_a", "b.py": "base_b", "c.py": "base_c"}
        # theirs: leaves a.py unchanged, modifies b.py, leaves c.py, adds d.py
        theirs = {"a.py": "base_a", "b.py": "theirs_b", "c.py": "base_c", "d.py": "new_d"}
        _, merged = _merge_pipeline(base, ours, theirs)
        assert merged["a.py"] == "ours_a"  # ours-only modification
        assert merged["b.py"] == "theirs_b"  # theirs-only modification
        assert merged["d.py"] == "new_d"  # theirs-only addition
        assert merged["c.py"] == "base_c"  # untouched by both

    def test_pipeline_both_delete_resolved(self) -> None:
        """Pipeline: both-delete produces no conflict and absent file in merged."""
        base = {"rm.py": "old", "keep.py": "k"}
        ours = {"keep.py": "k"}
        theirs = {"keep.py": "k"}
        conflicts, merged = _merge_pipeline(base, ours, theirs)
        assert "rm.py" not in conflicts
        assert "rm.py" not in merged

    def test_pipeline_same_add_resolved(self) -> None:
        """Pipeline: same-add same-hash produces no conflict and file in merged."""
        base: Manifest = {}
        h = _s256(b"shared")
        ours = {"new.py": h}
        theirs = {"new.py": h}
        conflicts, merged = _merge_pipeline(base, ours, theirs)
        assert "new.py" not in conflicts
        assert merged.get("new.py") == h

    def test_pipeline_75k_5k_5k_under_5s(self) -> None:
        """Full pipeline (diff×2 + detect + apply) at 75k files, 5k+5k changes: < 5 s."""
        base, ours, theirs = _build_manifests(75_000, 5_000, 5_000, 0)
        t0 = time.perf_counter()
        conflicts, merged = _merge_pipeline(base, ours, theirs)
        elapsed = time.perf_counter() - t0
        assert len(conflicts) == 0
        assert len(merged) == 75_000
        assert elapsed < 5, f"Full pipeline 75k took {elapsed:.2f}s (limit: 5s)"

    def test_pipeline_75k_with_1k_conflicts_under_5s(self) -> None:
        """Full pipeline with 1k conflict paths at 75k scale: < 5 s."""
        base, ours, theirs = _build_manifests(75_000, 5_000, 5_000, 1_000)
        t0 = time.perf_counter()
        conflicts, merged = _merge_pipeline(base, ours, theirs)
        elapsed = time.perf_counter() - t0
        assert len(conflicts) == 1_000
        # Every change is a modification, so no path is added or removed.
        assert len(merged) == 75_000
        assert elapsed < 5, f"Full pipeline 75k+conflicts took {elapsed:.2f}s (limit: 5s)"

    def test_pipeline_memory_3_manifests_under_64mb(self) -> None:
        """Three 75k-file manifests (base + ours + theirs) peak < 64 MB."""
        tracemalloc.start()
        try:
            manifests = _build_manifests(75_000, 5_000, 5_000, 1_000)
            _, peak = tracemalloc.get_traced_memory()
        finally:
            # Always stop tracing so a failure here cannot slow later tests.
            tracemalloc.stop()
        assert len(manifests) == 3
        peak_mb = peak / 1024 / 1024
        assert peak_mb < 64, f"3 manifests at 75k peak {peak_mb:.1f}MB (limit: 64MB)"


# ---------------------------------------------------------------------------
# TestSnapshotIOAtScale — read_snapshot / write_snapshot at 75k files
# ---------------------------------------------------------------------------


class TestSnapshotIOAtScale:
    """Snapshot serialisation/deserialisation timing at 75k-file scale."""

    def test_write_snapshot_75k_under_500ms(self, tmp_path: pathlib.Path) -> None:
        """write_snapshot of a 75k-file manifest completes in < 500 ms."""
        root = _fresh_repo(tmp_path)
        manifest = _base_manifest(75_000)
        snap_id = compute_snapshot_id(manifest)
        snap = SnapshotRecord(snapshot_id=snap_id, manifest=manifest, created_at=_NOW)
        t0 = time.perf_counter()
        write_snapshot(root, snap)
        duration_ms = (time.perf_counter() - t0) * 1000
        assert duration_ms < 500, f"write_snapshot 75k took {duration_ms:.1f}ms (limit: 500ms)"

    def test_read_snapshot_75k_under_500ms(self, tmp_path: pathlib.Path) -> None:
        """read_snapshot of a 75k-file manifest completes in < 500 ms."""
        root = _fresh_repo(tmp_path)
        manifest = _base_manifest(75_000)
        snap_id = compute_snapshot_id(manifest)
        write_snapshot(
            root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest, created_at=_NOW)
        )
        t0 = time.perf_counter()
        loaded = read_snapshot(root, snap_id)
        duration_ms = (time.perf_counter() - t0) * 1000
        assert loaded is not None
        assert len(loaded.manifest) == 75_000
        assert duration_ms < 500, f"read_snapshot 75k took {duration_ms:.1f}ms (limit: 500ms)"

    def test_snapshot_roundtrip_integrity(self, tmp_path: pathlib.Path) -> None:
        """write + read produces bit-identical manifest."""
        root = _fresh_repo(tmp_path)
        manifest = {f"f{i:04d}.py": _s256(bytes([i % 256])) for i in range(10_000)}
        snap_id = compute_snapshot_id(manifest)
        write_snapshot(
            root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest, created_at=_NOW)
        )
        loaded = read_snapshot(root, snap_id)
        assert loaded is not None
        assert loaded.manifest == manifest


# ---------------------------------------------------------------------------
# TestFindMergeBaseCorrectness — DAG edge cases
# ---------------------------------------------------------------------------


class TestFindMergeBaseCorrectness:
    """find_merge_base correctness across edge cases."""

    def _repo_with_base(
        self, tmp_path: pathlib.Path, max_ancestors: int = 200_000
    ) -> tuple[pathlib.Path, str, str]:
        """Return (root, base_commit_id, snap_id)."""
        root = _fresh_repo(tmp_path, max_ancestors=max_ancestors)
        snap_id = _write_empty_snapshot(root)
        base_id = _make_commit(root, None, snap_id, "base", 0)
        return root, base_id, snap_id

    def test_lca_simple_diverging_chains(self, tmp_path: pathlib.Path) -> None:
        """Two 10-commit chains off one base share that base as LCA."""
        root, base_id, _ = self._repo_with_base(tmp_path)
        tip_a = _write_chain(root, 10, "a", base_id)
        tip_b = _write_chain(root, 10, "b", base_id)
        result = find_merge_base(root, tip_a, tip_b)
        assert result == base_id

    def test_lca_identical_tips(self, tmp_path: pathlib.Path) -> None:
        """find_merge_base(tip, tip) == tip."""
        root, base_id, _ = self._repo_with_base(tmp_path)
        tip = _write_chain(root, 5, "a", base_id)
        assert find_merge_base(root, tip, tip) == tip

    def test_lca_a_is_ancestor_of_b(self, tmp_path: pathlib.Path) -> None:
        """When a is a direct ancestor of b, LCA is a."""
        root, base_id, _ = self._repo_with_base(tmp_path)
        mid = _write_chain(root, 3, "chain", base_id)
        tip = _write_chain(root, 3, "ext", mid)
        result = find_merge_base(root, mid, tip)
        assert result == mid

    def test_lca_disjoint_histories_returns_none(self, tmp_path: pathlib.Path) -> None:
        """Two root commits with no shared ancestor have no merge base."""
        root = _fresh_repo(tmp_path)
        snap_id = _write_empty_snapshot(root)
        a = _make_commit(root, None, snap_id, "a", 0)
        b = _make_commit(root, None, snap_id, "b", 1)
        assert find_merge_base(root, a, b) is None

    def test_lca_criss_cross_dag_valid_lca(self, tmp_path: pathlib.Path) -> None:
        """Criss-cross DAG (merge_1 has parents a+b, merge_2 has parents b+a).

        Both a and b are valid LCAs; BFS must return one of them.
        """
        root, base_id, snap_id = self._repo_with_base(tmp_path)
        a = _make_commit(root, base_id, snap_id, "a", 1)
        b = _make_commit(root, base_id, snap_id, "b", 2)
        # m1: a merges b
        m1_id = _make_commit(
            root, a, snap_id, "merge1", 3, parent2_id=b, branch="a", author="b"
        )
        # m2: b merges a
        m2_id = _make_commit(
            root, b, snap_id, "merge2", 4, parent2_id=a, branch="b", author="b"
        )
        result = find_merge_base(root, m1_id, m2_id)
        assert result in {a, b}, f"Expected a or b as LCA, got {result}"

    def test_lca_cap_raises_clean_error(self, tmp_path: pathlib.Path) -> None:
        """Ancestor graph > max_ancestors raises MuseCLIError, not a silent None."""
        root = _fresh_repo(tmp_path, max_ancestors=20)
        snap_id = _write_empty_snapshot(root)
        # 25-long chains share no ancestor — A side will hit cap
        prev_a: str | None = None
        for i in range(25):
            prev_a = _make_commit(root, prev_a, snap_id, f"a-{i}", i)
        prev_b: str | None = None
        for i in range(25):
            prev_b = _make_commit(root, prev_b, snap_id, f"b-{i}", i + 25)
        assert prev_a is not None
        assert prev_b is not None
        with pytest.raises(MuseCLIError, match="Ancestor graph exceeds"):
            find_merge_base(root, prev_a, prev_b)

    def test_lca_missing_commit_handled(self, tmp_path: pathlib.Path) -> None:
        """BFS continues gracefully when a commit file is missing (None from read_commit)."""
        root, base_id, snap_id = self._repo_with_base(tmp_path)
        # Chain a: base → real commit
        tip_a = _write_chain(root, 5, "a", base_id)
        # Chain b: points to a non-existent commit ID (ghost)
        ghost_id = _s256(b"ghost-commit-does-not-exist")
        # Stub the ghost so write_commit's parent-existence guard passes;
        # find_merge_base will encounter the unreadable stub and stop.
        ghost_payload = json.dumps({"commit_id": ghost_id}, separators=(",", ":")).encode()
        stub_path = object_path(root, ghost_id)
        stub_path.parent.mkdir(parents=True, exist_ok=True)
        stub_path.write_bytes(f"commit {len(ghost_payload)}\0".encode() + ghost_payload)
        # wrap the ghost in a real commit so find_merge_base walks into it
        wrap_id = _make_commit(root, ghost_id, snap_id, "wrap", 100, branch="b", author="b")
        # Should not raise — returns None because the ghost branch has no real ancestors
        result = find_merge_base(root, tip_a, wrap_id)
        assert result is None  # ghost side has no ancestors in common with a


# ---------------------------------------------------------------------------
# TestFindMergeBasePerformance — timing at real-world commit depths
# ---------------------------------------------------------------------------


class TestFindMergeBasePerformance:
    """find_merge_base timing. Each test builds fresh chains in a temp dir."""

    @staticmethod
    def _time_diverging_lca(
        tmp_path: pathlib.Path, depth: int
    ) -> tuple[str | None, str, float]:
        """Build two *depth*-commit chains off one base and time the LCA lookup.

        Returns ``(result, base_id, elapsed_seconds)``; only the
        ``find_merge_base`` call is inside the timed region.
        """
        root = _fresh_repo(tmp_path)
        snap_id = _write_empty_snapshot(root)
        base_id = _make_commit(root, None, snap_id, "base", 0)
        tip_a = _write_chain(root, depth, "a", base_id)
        tip_b = _write_chain(root, depth, "b", base_id)
        t0 = time.perf_counter()
        result = find_merge_base(root, tip_a, tip_b)
        elapsed = time.perf_counter() - t0
        return result, base_id, elapsed

    @pytest.mark.parametrize("depth", [100, 500])
    def test_find_merge_base_depth_under_2s(
        self, depth: int, tmp_path: pathlib.Path
    ) -> None:
        """find_merge_base on two diverging {depth}-commit chains: < 2 s."""
        result, base_id, elapsed = self._time_diverging_lca(tmp_path, depth)
        assert result == base_id, f"Wrong LCA at depth={depth}"
        assert elapsed < 2, f"find_merge_base depth={depth} took {elapsed:.2f}s (limit: 2s)"

    @pytest.mark.slow
    def test_find_merge_base_1k_depth_under_5s(self, tmp_path: pathlib.Path) -> None:
        """find_merge_base on 1k-commit diverging chains: < 5 s."""
        result, base_id, elapsed = self._time_diverging_lca(tmp_path, 1_000)
        assert result == base_id
        assert elapsed < 5, f"find_merge_base 1k took {elapsed:.2f}s (limit: 5s)"

    @pytest.mark.slow
    def test_find_merge_base_5k_depth_under_30s(self, tmp_path: pathlib.Path) -> None:
        """find_merge_base on 5k-commit diverging chains: < 30 s (the merge target)."""
        result, base_id, elapsed = self._time_diverging_lca(tmp_path, 5_000)
        assert result == base_id
        assert elapsed < 30, f"find_merge_base 5k took {elapsed:.2f}s (limit: 30s)"


# ---------------------------------------------------------------------------
# TestFullMergeScaleSlow — the 30-second overall target (@slow)
# ---------------------------------------------------------------------------


@pytest.mark.slow
class TestFullMergeScaleSlow:
    """End-to-end scale targets that require @slow to keep CI fast."""

    def test_full_pipeline_75k_5k_5k_1k_conflicts_under_30s(self) -> None:
        """Full pipeline: 75k files, 5k ours, 5k theirs, 1k conflicts — < 30 s."""
        base, ours, theirs = _build_manifests(75_000, 5_000, 5_000, 1_000)
        t0 = time.perf_counter()
        conflicts, merged = _merge_pipeline(base, ours, theirs)
        elapsed = time.perf_counter() - t0
        assert len(conflicts) == 1_000
        assert len(merged) == 75_000
        assert elapsed < 30, f"Full pipeline 75k+1k conflicts took {elapsed:.2f}s (limit: 30s)"

    def test_snapshot_io_75k_three_reads_plus_write_under_30s(
        self, tmp_path: pathlib.Path
    ) -> None:
        """3 reads + 1 write of a 75k-file snapshot (realistic merge I/O) < 30 s."""
        root = _fresh_repo(tmp_path)
        manifest = _base_manifest(75_000)
        snap_id = compute_snapshot_id(manifest)
        write_snapshot(
            root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest, created_at=_NOW)
        )
        t0 = time.perf_counter()
        for _ in range(3):
            snap = read_snapshot(root, snap_id)
            assert snap is not None and len(snap.manifest) == 75_000
        # write the merged result
        merged_snap_id = compute_snapshot_id(manifest)
        write_snapshot(
            root,
            SnapshotRecord(snapshot_id=merged_snap_id, manifest=manifest, created_at=_NOW),
        )
        elapsed = time.perf_counter() - t0
        assert elapsed < 30, f"3 reads + 1 write 75k took {elapsed:.2f}s (limit: 30s)"

    def test_write_merge_state_1k_conflicts_roundtrip(
        self, tmp_path: pathlib.Path
    ) -> None:
        """write_merge_state + read_merge_state with 1 000 conflict paths round-trips cleanly."""
        root = _fresh_repo(tmp_path)
        conflict_paths = [f"conflict/f{i:04d}.py" for i in range(1_000)]
        write_merge_state(
            root,
            base_commit="a" * 64,
            ours_commit="b" * 64,
            theirs_commit="c" * 64,
            conflict_paths=conflict_paths,
            other_branch="feat/big-change",
        )
        state = read_merge_state(root)
        assert state is not None
        assert len(state.conflict_paths) == 1_000
        assert state.other_branch == "feat/big-change"
        # Paths must round-trip correctly
        assert sorted(state.conflict_paths) == sorted(conflict_paths)