"""Stress tests for the commit DAG and merge-base algorithm.

Exercises:

- Linear chains of 500 commits.
- Wide fan-out / fan-in (octopus merge shapes).
- Criss-cross merge (ambiguous LCA — should still find *some* ancestor).
- Independent histories (no common ancestor → None).
- find_merge_base symmetry: find_merge_base(a, b) == find_merge_base(b, a).
- Missing commit handled gracefully (an ID that was never written → None).
- Diamond topology: four-node diamond always finds the root.
- Double diamond: two diamonds chained together.
- Long parallel branches that converge at a single point.
"""

import datetime
import pathlib

import pytest

from muse.core.types import fake_id
from muse.core.paths import muse_dir
from muse.core.merge_engine import find_merge_base
from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
from muse.core.commits import (
    CommitRecord,
    write_commit,
)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# Every commit shares one snapshot and one timestamp, so a commit ID is
# determined by its label and its parents alone.
_SNAP_ID: str = compute_snapshot_id({})
_BASE_TS: datetime.datetime = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)


def _write(
    root: pathlib.Path,
    label: str,
    parent: str | None = None,
    parent2: str | None = None,
) -> str:
    """Write a commit with a real content-addressed ID.

    The ID is computed from the non-None parents (in the order given), the
    shared snapshot ID, *label* as the message, and the shared timestamp.
    The commit is then written under *root* on branch ``"main"``.

    Returns the commit_id.
    """
    parent_ids = [p for p in (parent, parent2) if p is not None]
    cid = compute_commit_id(
        parent_ids=parent_ids,
        snapshot_id=_SNAP_ID,
        message=label,
        committed_at_iso=_BASE_TS.isoformat(),
    )
    write_commit(root, CommitRecord(
        commit_id=cid,
        branch="main",
        snapshot_id=_SNAP_ID,
        message=label,
        committed_at=_BASE_TS,
        parent_commit_id=parent,
        parent2_commit_id=parent2,
    ))
    return cid


def _write_chain(
    root: pathlib.Path,
    label_format: str,
    length: int,
    parent: str | None = None,
) -> list[str]:
    """Write *length* commits in a straight line and return their IDs, oldest first.

    Commit ``i`` is labelled ``label_format.format(i)``.  The first commit's
    parent is *parent* (``None`` makes it a root commit); each later commit's
    parent is the commit written just before it.
    """
    ids: list[str] = []
    prev = parent
    for i in range(length):
        prev = _write(root, label_format.format(i), prev)
        ids.append(prev)
    return ids


@pytest.fixture
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Return a temporary directory with ``commits`` and ``refs/heads`` created
    under its muse directory."""
    dot_muse = muse_dir(tmp_path)
    (dot_muse / "commits").mkdir(parents=True)
    (dot_muse / "refs" / "heads").mkdir(parents=True)
    return tmp_path


# ---------------------------------------------------------------------------
# Linear chain
# ---------------------------------------------------------------------------


class TestLinearChain:
    def test_chain_of_500_finds_base(self, repo: pathlib.Path) -> None:
        """LCA of two commits on a 500-long linear chain is the shared ancestor."""
        ids = _write_chain(repo, "c{:04d}", 500)
        # Branch off at the commit at index 100
        branch_tip_id = _write(repo, "branch-tip", ids[100])
        base = find_merge_base(repo, ids[499], branch_tip_id)
        assert base == ids[100]

    def test_lca_of_adjacent_commits_is_parent(self, repo: pathlib.Path) -> None:
        root_id = _write(repo, "root")
        child_id = _write(repo, "child", root_id)
        assert find_merge_base(repo, root_id, child_id) == root_id
        assert find_merge_base(repo, child_id, root_id) == root_id

    def test_long_chain_lca_symmetry(self, repo: pathlib.Path) -> None:
        """find_merge_base(a, b) == find_merge_base(b, a) on a long chain."""
        ids = _write_chain(repo, "n{:03d}", 100)
        left_id = _write(repo, "left", ids[50])
        right_id = _write(repo, "right", ids[50])
        assert find_merge_base(repo, left_id, right_id) == ids[50]
        assert find_merge_base(repo, right_id, left_id) == ids[50]

    def test_same_commit_returns_itself(self, repo: pathlib.Path) -> None:
        solo_id = _write(repo, "solo")
        assert find_merge_base(repo, solo_id, solo_id) == solo_id

    def test_one_is_ancestor_of_other(self, repo: pathlib.Path) -> None:
        """When A is a direct ancestor of B, LCA is A."""
        ids = _write_chain(repo, "x{:02d}", 20)
        assert find_merge_base(repo, ids[0], ids[19]) == ids[0]
        assert find_merge_base(repo, ids[19], ids[0]) == ids[0]


# ---------------------------------------------------------------------------
# Diamond topology
# ---------------------------------------------------------------------------


class TestDiamondTopology:
    def test_simple_diamond(self, repo: pathlib.Path) -> None:
        """
            root
           /    \\
          L      R
           \\    /
             M   (merge commit, not relevant here — just find LCA of L and R)
        """
        root_id = _write(repo, "root")
        l_id = _write(repo, "L", root_id)
        r_id = _write(repo, "R", root_id)
        assert find_merge_base(repo, l_id, r_id) == root_id

    def test_double_diamond(self, repo: pathlib.Path) -> None:
        """
              A
             / \\
            B   C
             \\ /
              D
             / \\
            E   F
             \\ /
              G

        LCA(E, F) should be D.
        """
        a_id = _write(repo, "A")
        b_id = _write(repo, "B", a_id)
        c_id = _write(repo, "C", a_id)
        d_id = _write(repo, "D", b_id, c_id)
        e_id = _write(repo, "E", d_id)
        f_id = _write(repo, "F", d_id)
        assert find_merge_base(repo, e_id, f_id) == d_id

    def test_criss_cross_merge(self, repo: pathlib.Path) -> None:
        """
        Criss-cross: M1 and M2 each merge the same two commits, L1 and R1,
        with the parents in opposite order.

            X → L1 → M1(L1,R1)
            X → R1 → M2(R1,L1)

        L1 and R1 are both lowest common ancestors of M1 and M2, so the
        answer is ambiguous.  X is a common ancestor as well (though not a
        lowest one) and this test tolerates it.
        The algorithm must not return None or crash.
        """
        x_id = _write(repo, "X")
        l1_id = _write(repo, "L1", x_id)
        r1_id = _write(repo, "R1", x_id)
        m1_id = _write(repo, "M1", l1_id, r1_id)
        m2_id = _write(repo, "M2", r1_id, l1_id)
        base = find_merge_base(repo, m1_id, m2_id)
        # Any of X, L1, R1 is a valid common ancestor; None is not acceptable.
        # The explicit None check is kept so that failure reads clearly.
        assert base is not None
        assert base in {x_id, l1_id, r1_id}

    def test_octopus_three_branch_fan_in(self, repo: pathlib.Path) -> None:
        """Three branches that all diverged from the same root."""
        root_id = _write(repo, "root")
        ba_id = _write(repo, "branch-a", root_id)
        bb_id = _write(repo, "branch-b", root_id)
        bc_id = _write(repo, "branch-c", root_id)
        assert find_merge_base(repo, ba_id, bb_id) == root_id
        assert find_merge_base(repo, ba_id, bc_id) == root_id
        assert find_merge_base(repo, bb_id, bc_id) == root_id


# ---------------------------------------------------------------------------
# Independent histories
# ---------------------------------------------------------------------------


class TestDisjointHistories:
    def test_no_common_ancestor_returns_none(self, repo: pathlib.Path) -> None:
        island_a_id = _write(repo, "island-a")
        island_b_id = _write(repo, "island-b")
        assert find_merge_base(repo, island_a_id, island_b_id) is None

    def test_long_independent_chains_return_none(self, repo: pathlib.Path) -> None:
        """Two 20-commit chains with separate roots share no ancestor."""
        chain_a = _write_chain(repo, "a{:02d}", 20)
        chain_b = _write_chain(repo, "b{:02d}", 20)
        assert find_merge_base(repo, chain_a[-1], chain_b[-1]) is None

    def test_missing_commit_id_graceful(self, repo: pathlib.Path) -> None:
        """Asking for an LCA where one commit doesn't exist should return None, not raise."""
        real_id = _write(repo, "real")
        result = find_merge_base(repo, real_id, fake_id("ghost-commit-that-does-not-exist"))
        # The ghost commit was never written, so no common ancestor is expected.
        assert result is None


# ---------------------------------------------------------------------------
# Ancestor-set correctness
# ---------------------------------------------------------------------------


class TestAncestorCorrectness:
    def test_merge_commit_has_both_parents_as_ancestors(self, repo: pathlib.Path) -> None:
        root_id = _write(repo, "root")
        a_id = _write(repo, "A", root_id)
        b_id = _write(repo, "B", root_id)
        merge_id = _write(repo, "merge", a_id, b_id)
        feature_id = _write(repo, "feature", a_id)
        # LCA of feature and merge: feature branched from A, merge contains A.
        # So A is the common ancestor.
        base = find_merge_base(repo, feature_id, merge_id)
        assert base == a_id

    def test_wide_history_with_shared_root(self, repo: pathlib.Path) -> None:
        """50 branches diverging from a shared root, pairwise LCA is root."""
        # Named once so the docstring, the fan-out and the sampling bounds
        # cannot drift apart.
        branch_count = 50
        stride = 10
        root_id = _write(repo, "root")
        branch_ids = [_write(repo, f"br{i:03d}", root_id) for i in range(branch_count)]
        # Check a sampling of pairs
        for i in range(0, branch_count, stride):
            for j in range(i + 1, branch_count, stride):
                assert find_merge_base(repo, branch_ids[i], branch_ids[j]) == root_id

    def test_deep_branch_divergence(self, repo: pathlib.Path) -> None:
        """Branches diverge at root, each has 50 commits. LCA is root."""
        root_id = _write(repo, "root")
        chain_a = _write_chain(repo, "da{:02d}", 50, parent=root_id)
        chain_b = _write_chain(repo, "db{:02d}", 50, parent=root_id)
        assert find_merge_base(repo, chain_a[-1], chain_b[-1]) == root_id

    def test_multiple_merge_bases_chain(self, repo: pathlib.Path) -> None:
        """A → B → C; branch D from B. LCA of C and D is B."""
        a_id = _write(repo, "A")
        b_id = _write(repo, "B", a_id)
        c_id = _write(repo, "C", b_id)
        d_id = _write(repo, "D", b_id)
        assert find_merge_base(repo, c_id, d_id) == b_id
        assert find_merge_base(repo, d_id, c_id) == b_id