"""Tests for muse.core.cohen_transform — the Cohen Transform three-way merge. Named in honour of Bram Cohen (creator of BitTorrent, Manyana CRDT weave), whose conflict-presentation insight is the direct inspiration for this module. Test categories --------------- TestClassifyAction — unit: classify_action() TestAnnotateHunkAction — unit: annotate_hunk_action() TestComputeRegions — unit: compute_regions() / _find_sync_regions() TestThreeWayMergeLines — unit: three_way_merge_lines() clean + conflict TestConflictMarkerFormat — unit: marker syntax, diff3 style, label content TestFormatConflictDiff — unit: format_conflict_diff() rendering TestEdgeCases — unit: empty sequences, binary-safe text, unicode TestIntegration — integration: realistic multi-hunk scenarios TestStress — stress: large files, many conflicts, many hunks TestDataIntegrity — data integrity: determinism, idempotency TestSecurity — security: ANSI injection, path traversal, null bytes """ from __future__ import annotations import difflib import pathlib import threading import time from collections.abc import Sequence from unittest.mock import MagicMock import pytest from muse.core.cohen_transform import ( CONFLICT_SEPARATOR, MergeRegion, annotate_hunk_action, classify_action, compute_regions, format_conflict_diff, three_way_merge_lines, ) # ────────────────────────────────────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────────────────────────────────────── def _lines(*text: str) -> list[str]: """Split text into lines, each ending with \\n.""" return [ln + "\n" for ln in "\n".join(text).split("\n") if ln + "\n"] def _merge(base: Sequence[str], ours: Sequence[str], theirs: Sequence[str], **kw: str) -> tuple[list[str], bool]: return three_way_merge_lines( _lines(*base) if isinstance(base, (list, tuple)) else list(base), _lines(*ours) if isinstance(ours, (list, tuple)) else list(ours), _lines(*theirs) if isinstance(theirs, (list, tuple)) else list(theirs), **kw, ) def _has_marker(lines: list[str], marker: str) -> bool: return any(marker in ln for ln in lines) # ────────────────────────────────────────────────────────────────────────────── # TestClassifyAction # ────────────────────────────────────────────────────────────────────────────── class TestClassifyAction: def test_inserted_when_base_empty(self) -> None: assert classify_action([], ["new\n"]) == "inserted" def test_deleted_when_other_empty(self) -> None: assert classify_action(["old\n"], []) == "deleted" def test_modified_when_both_non_empty(self) -> None: assert classify_action(["old\n"], ["new\n"]) == "modified" def test_modified_when_multi_line_both(self) -> None: assert classify_action(["a\n", "b\n"], ["c\n", "d\n"]) == "modified" def test_inserted_single_line(self) -> None: assert classify_action([], ["x\n"]) == "inserted" def test_deleted_single_line(self) -> None: assert classify_action(["x\n"], []) == "deleted" # ────────────────────────────────────────────────────────────────────────────── # TestAnnotateHunkAction # ────────────────────────────────────────────────────────────────────────────── class TestAnnotateHunkAction: def _make_hunk(self, adds: int = 0, dels: int = 0) -> list[str]: lines = ["--- a/f.py", "+++ b/f.py", "@@ -1,3 +1,3 @@", " context\n"] lines.extend([f"+added{i}\n" for i in range(adds)]) lines.extend([f"-deleted{i}\n" for i in range(dels)]) return lines def test_pure_insert_hunk_labelled_inserted(self) -> None: hunk = self._make_hunk(adds=2, dels=0) result = annotate_hunk_action(hunk, "ours") at_line = next(ln for ln in result if ln.startswith("@@")) assert "[ours: inserted]" in at_line def test_pure_delete_hunk_labelled_deleted(self) -> None: hunk = self._make_hunk(adds=0, dels=2) result = annotate_hunk_action(hunk, "theirs") at_line = next(ln for ln in result if ln.startswith("@@")) assert "[theirs: deleted]" in at_line def test_mixed_hunk_labelled_modified(self) -> None: hunk = self._make_hunk(adds=1, dels=1) result = annotate_hunk_action(hunk, "ours") at_line = next(ln for ln in result if ln.startswith("@@")) assert "[ours: modified]" in at_line def test_header_lines_preserved(self) -> None: hunk = self._make_hunk(adds=1) result = annotate_hunk_action(hunk, "ours") assert any(ln.startswith("---") for ln in result) assert any(ln.startswith("+++") for ln in result) def test_multiple_hunks_each_annotated(self) -> None: hunk = [ "--- a/f.py", "+++ b/f.py", "@@ -1,2 +1,2 @@", " ctx\n", "+add1\n", "@@ -10,2 +10,2 @@", " ctx2\n", "-del1\n", ] result = annotate_hunk_action(hunk, "ours") at_lines = [ln for ln in result if ln.startswith("@@")] assert len(at_lines) == 2 assert "[ours: inserted]" in at_lines[0] assert "[ours: deleted]" in at_lines[1] def test_empty_hunk_list_returns_empty(self) -> None: assert annotate_hunk_action([], "ours") == [] def test_no_at_markers_unchanged(self) -> None: lines = ["--- a/f.py", "+++ b/f.py", " context\n"] result = annotate_hunk_action(lines, "ours") assert result == lines def test_side_label_appears_in_annotation(self) -> None: hunk = self._make_hunk(adds=1) for label in ("main", "feature/auth", "theirs"): result = annotate_hunk_action(hunk, label) at_line = next(ln for ln in result if ln.startswith("@@")) assert label in at_line # ────────────────────────────────────────────────────────────────────────────── # TestComputeRegions # ────────────────────────────────────────────────────────────────────────────── class TestComputeRegions: def test_all_stable_no_changes(self) -> None: lines = ["a\n", "b\n", "c\n"] regions = compute_regions(lines, lines, lines) kinds = [r.kind for r in regions] assert all(k == "stable" for k in kinds) def test_ours_only(self) -> None: base = ["a\n", "b\n", "c\n"] ours = ["a\n", "B\n", "c\n"] regions = compute_regions(base, ours, base) conflict_kinds = [r.kind for r in regions] assert "ours_only" in conflict_kinds assert "conflict" not in conflict_kinds def test_theirs_only(self) -> None: base = ["a\n", "b\n", "c\n"] theirs = ["a\n", "T\n", "c\n"] regions = compute_regions(base, base, theirs) kinds = [r.kind for r in regions] assert "theirs_only" in kinds assert "conflict" not in kinds def test_both_same(self) -> None: base = ["a\n", "b\n"] changed = ["a\n", "X\n"] regions = compute_regions(base, changed, changed) kinds = [r.kind for r in regions] assert "both_same" in kinds assert "conflict" not in kinds def test_conflict(self) -> None: base = ["a\n", "b\n"] ours = ["a\n", "O\n"] theirs = ["a\n", "T\n"] regions = compute_regions(base, ours, theirs) kinds = [r.kind for r in regions] assert "conflict" in kinds def test_empty_sequences(self) -> None: regions = compute_regions([], [], []) assert regions == [] def test_region_lines_are_lists(self) -> None: base = ["a\n", "b\n"] ours = ["a\n", "X\n"] for r in compute_regions(base, ours, base): assert isinstance(r.base_lines, list) assert isinstance(r.ours_lines, list) assert isinstance(r.theirs_lines, list) def test_stable_region_all_three_equal(self) -> None: lines = ["x\n", "y\n"] regions = compute_regions(lines, lines, lines) for r in regions: assert r.base_lines == r.ours_lines == r.theirs_lines def test_pure_insertion_ours(self) -> None: base = ["a\n", "c\n"] ours = ["a\n", "b\n", "c\n"] # inserted b regions = compute_regions(base, ours, base) kinds = [r.kind for r in regions] assert "ours_only" in kinds or "stable" in kinds assert "conflict" not in kinds def test_pure_insertion_conflict(self) -> None: base = ["a\n", "c\n"] ours = ["a\n", "B\n", "c\n"] theirs = ["a\n", "T\n", "c\n"] regions = compute_regions(base, ours, theirs) kinds = [r.kind for r in regions] assert "conflict" in kinds # ────────────────────────────────────────────────────────────────────────────── # TestThreeWayMergeLines — clean merges # ────────────────────────────────────────────────────────────────────────────── class TestThreeWayMergeLines: # ── Clean cases ────────────────────────────────────────────────────────── def test_no_changes_returns_base(self) -> None: base = ["a\n", "b\n", "c\n"] merged, conflict = three_way_merge_lines(base, base, base) assert merged == base assert not conflict def test_ours_only_change_applied(self) -> None: base = ["a\n", "b\n", "c\n"] ours = ["a\n", "B\n", "c\n"] merged, conflict = three_way_merge_lines(base, ours, base) assert merged == ours assert not conflict def test_theirs_only_change_applied(self) -> None: base = ["a\n", "b\n", "c\n"] theirs = ["a\n", "T\n", "c\n"] merged, conflict = three_way_merge_lines(base, base, theirs) assert merged == theirs assert not conflict def test_both_same_change_applied_once(self) -> None: base = ["a\n", "b\n"] changed = ["a\n", "X\n"] merged, conflict = three_way_merge_lines(base, changed, changed) assert merged == changed assert not conflict def test_non_overlapping_changes_both_applied(self) -> None: base = ["a\n", "b\n", "c\n", "d\n"] ours = ["a\n", "B\n", "c\n", "d\n"] # b→B theirs = ["a\n", "b\n", "c\n", "D\n"] # d→D merged, conflict = three_way_merge_lines(base, ours, theirs) assert "B\n" in merged assert "D\n" in merged assert not conflict def test_ours_inserts_line(self) -> None: base = ["a\n", "c\n"] ours = ["a\n", "b\n", "c\n"] merged, conflict = three_way_merge_lines(base, ours, base) assert merged == ours assert not conflict def test_theirs_deletes_line(self) -> None: base = ["a\n", "b\n", "c\n"] theirs = ["a\n", "c\n"] merged, conflict = three_way_merge_lines(base, base, theirs) assert merged == theirs assert not conflict def test_empty_base_both_add(self) -> None: # Both add same content to empty base → clean added = ["line1\n", "line2\n"] merged, conflict = three_way_merge_lines([], added, added) assert merged == added assert not conflict # ── Conflict cases ─────────────────────────────────────────────────────── def test_conflict_detected(self) -> None: base = ["a\n", "b\n"] ours = ["a\n", "O\n"] theirs = ["a\n", "T\n"] merged, conflict = three_way_merge_lines(base, ours, theirs) assert conflict def test_conflict_has_ours_marker(self) -> None: base = ["a\n"] ours = ["O\n"] theirs = ["T\n"] merged, _ = three_way_merge_lines(base, ours, theirs) assert _has_marker(merged, "<<<<<<<") def test_conflict_has_base_marker(self) -> None: base = ["a\n"] ours = ["O\n"] theirs = ["T\n"] merged, _ = three_way_merge_lines(base, ours, theirs) assert _has_marker(merged, "|||||||") def test_conflict_has_sep_marker(self) -> None: base = ["a\n"] ours = ["O\n"] theirs = ["T\n"] merged, _ = three_way_merge_lines(base, ours, theirs) assert _has_marker(merged, "=======") def test_conflict_has_end_marker(self) -> None: base = ["a\n"] ours = ["O\n"] theirs = ["T\n"] merged, _ = three_way_merge_lines(base, ours, theirs) assert _has_marker(merged, ">>>>>>> end conflict") def test_conflict_ours_content_present(self) -> None: base = ["x\n"] ours = ["ours_line\n"] theirs = ["theirs_line\n"] merged, _ = three_way_merge_lines(base, ours, theirs) assert any("ours_line" in ln for ln in merged) def test_conflict_theirs_content_present(self) -> None: base = ["x\n"] ours = ["ours_line\n"] theirs = ["theirs_line\n"] merged, _ = three_way_merge_lines(base, ours, theirs) assert any("theirs_line" in ln for ln in merged) def test_conflict_base_content_in_diff3_section(self) -> None: base = ["base_line\n"] ours = ["ours_line\n"] theirs = ["theirs_line\n"] merged, _ = three_way_merge_lines(base, ours, theirs) assert any("base_line" in ln for ln in merged) def test_stable_lines_outside_conflict_preserved(self) -> None: base = ["preamble\n", "conflict_zone\n", "epilogue\n"] ours = ["preamble\n", "ours_content\n", "epilogue\n"] theirs = ["preamble\n", "theirs_content\n", "epilogue\n"] merged, conflict = three_way_merge_lines(base, ours, theirs) assert conflict assert any("preamble" in ln for ln in merged) assert any("epilogue" in ln for ln in merged) def test_multiple_independent_conflicts(self) -> None: base = ["a\n", "b\n", "c\n", "d\n", "e\n"] ours = ["a\n", "O1\n", "c\n", "O2\n", "e\n"] theirs = ["a\n", "T1\n", "c\n", "T2\n", "e\n"] merged, conflict = three_way_merge_lines(base, ours, theirs) assert conflict assert sum(1 for ln in merged if ln.startswith("<<<<<<< ")) == 2 def test_both_empty_insertion_at_same_point_conflicts(self) -> None: base = ["a\n", "c\n"] ours = ["a\n", "B\n", "c\n"] theirs = ["a\n", "T\n", "c\n"] merged, conflict = three_way_merge_lines(base, ours, theirs) assert conflict def test_custom_labels_appear_in_markers(self) -> None: base = ["x\n"] ours = ["O\n"] theirs = ["T\n"] merged, _ = three_way_merge_lines( base, ours, theirs, label_ours="feature/login", label_base="merge-base", label_theirs="main", ) assert any("feature/login" in ln for ln in merged) assert any("main" in ln for ln in merged) assert any("merge-base" in ln for ln in merged) # ────────────────────────────────────────────────────────────────────────────── # TestConflictMarkerFormat — exact marker syntax / Cohen action labels # ────────────────────────────────────────────────────────────────────────────── class TestConflictMarkerFormat: def _conflict_merged(self, base_text: str, ours_text: str, theirs_text: str) -> list[str]: base = base_text.splitlines(keepends=True) ours = ours_text.splitlines(keepends=True) theirs = theirs_text.splitlines(keepends=True) merged, _ = three_way_merge_lines(base, ours, theirs) return merged def test_deleted_action_on_ours_marker(self) -> None: merged = self._conflict_merged("func()\n", "", "other()\n") assert any("<<<<<<< ours [deleted]" in ln for ln in merged) def test_inserted_action_on_theirs_marker(self) -> None: merged = self._conflict_merged("", "ours_line\n", "theirs_line\n") assert any("[inserted]" in ln for ln in merged) def test_modified_action_on_modified_conflict(self) -> None: merged = self._conflict_merged("original\n", "ours_ver\n", "theirs_ver\n") assert any("[modified]" in ln for ln in merged) def test_base_section_marker_present(self) -> None: merged = self._conflict_merged("base\n", "ours\n", "theirs\n") assert any("|||||||" in ln for ln in merged) def test_end_conflict_marker_present(self) -> None: merged = self._conflict_merged("base\n", "ours\n", "theirs\n") assert any(">>>>>>> end conflict" in ln for ln in merged) def test_marker_ordering(self) -> None: """<<<<<<< must precede ||||||| must precede ======= must precede >>>>>>>.""" merged = self._conflict_merged("base\n", "ours\n", "theirs\n") positions = {} for i, ln in enumerate(merged): if "<<<<<<<" in ln: positions["start"] = i elif "|||||||" in ln: positions["base"] = i elif "=======" in ln: positions["sep"] = i elif ">>>>>>>" in ln: positions["end"] = i assert positions["start"] < positions["base"] < positions["sep"] < positions["end"] # ────────────────────────────────────────────────────────────────────────────── # TestFormatConflictDiff # ────────────────────────────────────────────────────────────────────────────── class TestFormatConflictDiff: """Tests for format_conflict_diff() — the muse diff --conflict renderer.""" def _make_manifests(self) -> tuple[dict, dict, dict, MagicMock]: base_content = b"base line\n" ours_content = b"ours line\n" theirs_content = b"theirs line\n" path = "src/util.py" base_oid = "aaaa" * 16 ours_oid = "bbbb" * 16 theirs_oid = "cccc" * 16 base_manifest = {path: base_oid} ours_manifest = {path: ours_oid} theirs_manifest = {path: theirs_oid} def _read(root: pathlib.Path, oid: str) -> bytes | None: return {base_oid: base_content, ours_oid: ours_content, theirs_oid: theirs_content}.get(oid) return base_manifest, ours_manifest, theirs_manifest, _read def test_output_contains_conflict_header(self, tmp_path: pathlib.Path) -> None: base_m, ours_m, theirs_m, read_fn = self._make_manifests() lines = format_conflict_diff( "src/util.py", tmp_path, base_m, ours_m, theirs_m, read_fn, ) assert any("CONFLICT" in ln for ln in lines) assert any("src/util.py" in ln for ln in lines) def test_output_contains_ours_section(self, tmp_path: pathlib.Path) -> None: base_m, ours_m, theirs_m, read_fn = self._make_manifests() lines = format_conflict_diff( "src/util.py", tmp_path, base_m, ours_m, theirs_m, read_fn, ) assert any("[ours]" in ln for ln in lines) def test_output_contains_theirs_section(self, tmp_path: pathlib.Path) -> None: base_m, ours_m, theirs_m, read_fn = self._make_manifests() lines = format_conflict_diff( "src/util.py", tmp_path, base_m, ours_m, theirs_m, read_fn, ) assert any("[theirs]" in ln for ln in lines) def test_custom_labels_appear(self, tmp_path: pathlib.Path) -> None: base_m, ours_m, theirs_m, read_fn = self._make_manifests() lines = format_conflict_diff( "src/util.py", tmp_path, base_m, ours_m, theirs_m, read_fn, ours_label="feature/auth", theirs_label="main", ) assert any("feature/auth" in ln for ln in lines) assert any("main" in ln for ln in lines) def test_returns_list_of_strings(self, tmp_path: pathlib.Path) -> None: base_m, ours_m, theirs_m, read_fn = self._make_manifests() result = format_conflict_diff( "src/util.py", tmp_path, base_m, ours_m, theirs_m, read_fn, ) assert isinstance(result, list) assert all(isinstance(ln, str) for ln in result) def test_ansi_sanitized_in_path(self, tmp_path: pathlib.Path) -> None: """ANSI escape sequences in the path must not appear in output.""" malicious_path = "\x1b[31mmalicious\x1b[0m/file.py" base_m, ours_m, theirs_m, read_fn = self._make_manifests() # Use the malicious path (will have no manifest entry → empty diffs) lines = format_conflict_diff( malicious_path, tmp_path, {}, {}, {}, read_fn, ) output = "\n".join(lines) assert "\x1b[31m" not in output def test_missing_file_shows_no_changes_message(self, tmp_path: pathlib.Path) -> None: def _read(root: pathlib.Path, oid: str) -> bytes | None: return None lines = format_conflict_diff( "missing.py", tmp_path, {}, {}, {}, _read, ) assert any("no changes" in ln.lower() for ln in lines) def test_no_color_mode(self, tmp_path: pathlib.Path) -> None: base_m, ours_m, theirs_m, read_fn = self._make_manifests() lines = format_conflict_diff( "src/util.py", tmp_path, base_m, ours_m, theirs_m, read_fn, use_color=False, ) output = "\n".join(lines) assert "\x1b[" not in output # ────────────────────────────────────────────────────────────────────────────── # TestEdgeCases # ────────────────────────────────────────────────────────────────────────────── class TestEdgeCases: def test_all_empty(self) -> None: merged, conflict = three_way_merge_lines([], [], []) assert merged == [] assert not conflict def test_base_empty_ours_adds(self) -> None: merged, conflict = three_way_merge_lines([], ["new\n"], []) assert merged == ["new\n"] assert not conflict def test_base_empty_theirs_adds(self) -> None: merged, conflict = three_way_merge_lines([], [], ["new\n"]) assert merged == ["new\n"] assert not conflict def test_base_empty_both_add_same(self) -> None: merged, conflict = three_way_merge_lines([], ["same\n"], ["same\n"]) assert merged == ["same\n"] assert not conflict def test_base_empty_both_add_different_conflicts(self) -> None: _, conflict = three_way_merge_lines([], ["A\n"], ["B\n"]) assert conflict def test_ours_equals_base_theirs_deletes(self) -> None: base = ["x\n"] merged, conflict = three_way_merge_lines(base, base, []) assert merged == [] assert not conflict def test_both_delete_everything(self) -> None: base = ["x\n", "y\n"] merged, conflict = three_way_merge_lines(base, [], []) assert merged == [] assert not conflict def test_unicode_content(self) -> None: base = ["héllo\n", "wörld\n"] ours = ["héllo\n", "WÖRLD\n"] theirs = ["héllo\n", "wörld\n"] merged, conflict = three_way_merge_lines(base, ours, theirs) assert not conflict assert "WÖRLD\n" in merged def test_long_lines_no_crash(self) -> None: long_line = "x" * 10_000 + "\n" base = [long_line] ours = ["y" * 10_000 + "\n"] theirs = [long_line] merged, conflict = three_way_merge_lines(base, ours, theirs) assert not conflict def test_many_identical_lines(self) -> None: base = ["same\n"] * 1000 merged, conflict = three_way_merge_lines(base, base, base) assert merged == base assert not conflict def test_lines_without_trailing_newline(self) -> None: # Not all editors guarantee trailing newlines. base = ["no newline"] ours = ["changed"] merged, conflict = three_way_merge_lines(base, ours, base) assert not conflict assert "changed" in merged[0] # ────────────────────────────────────────────────────────────────────────────── # TestIntegration — realistic multi-hunk merges # ────────────────────────────────────────────────────────────────────────────── class TestIntegration: """Realistic multi-hunk merge scenarios matching real developer workflows.""" def test_ours_adds_docstring_theirs_renames_param(self) -> None: base = [ "def calculate(x):\n", " return x * 2\n", ] ours = [ "def calculate(x):\n", ' """Double the input."""\n', " return x * 2\n", ] theirs = [ "def calculate(value):\n", " return value * 2\n", ] merged, conflict = three_way_merge_lines(base, ours, theirs) # ours inserted a docstring; theirs renamed x→value. # They touch different parts of the function — should be conflict-free # if ours and theirs don't overlap on line 1. # (They DO overlap on line 1 if difflib groups them — conflict is acceptable) # Just assert no crash and well-formed output. assert isinstance(merged, list) assert isinstance(conflict, bool) def test_both_fix_same_typo(self) -> None: base = ["# Calcualtion module\n"] ours = ["# Calculation module\n"] theirs = ["# Calculation module\n"] merged, conflict = three_way_merge_lines(base, ours, theirs) assert not conflict assert merged == ours def test_ours_deletes_block_theirs_modifies_different_block(self) -> None: base = [ "block_a_line_1\n", "block_a_line_2\n", "separator\n", "block_b_line_1\n", "block_b_line_2\n", ] ours = [ "separator\n", "block_b_line_1\n", "block_b_line_2\n", ] theirs = [ "block_a_line_1\n", "block_a_line_2\n", "separator\n", "block_b_LINE_1\n", # modified "block_b_line_2\n", ] merged, conflict = three_way_merge_lines(base, ours, theirs) # No overlap between ours' deletion (top) and theirs' modification (bottom) assert not conflict assert "block_b_LINE_1\n" in merged assert "block_a_line_1\n" not in merged def test_conflict_preserves_stable_context_above_and_below(self) -> None: base = ["header\n", "conflict_zone\n", "footer\n"] ours = ["header\n", "ours_version\n", "footer\n"] theirs = ["header\n", "theirs_version\n", "footer\n"] merged, conflict = three_way_merge_lines(base, ours, theirs) assert conflict text = "".join(merged) assert "header\n" in text assert "footer\n" in text def test_append_only_change_ours(self) -> None: base = ["line1\n", "line2\n"] ours = ["line1\n", "line2\n", "appended\n"] merged, conflict = three_way_merge_lines(base, ours, base) assert not conflict assert merged == ours def test_prepend_only_change_theirs(self) -> None: base = ["line1\n"] theirs = ["prepended\n", "line1\n"] merged, conflict = three_way_merge_lines(base, base, theirs) assert not conflict assert merged == theirs def test_roundtrip_clean_merge_deterministic(self) -> None: base = ["a\n", "b\n", "c\n"] ours = ["a\n", "B\n", "c\n"] theirs = ["a\n", "b\n", "C\n"] m1, c1 = three_way_merge_lines(base, ours, theirs) m2, c2 = three_way_merge_lines(base, ours, theirs) assert m1 == m2 assert c1 == c2 # ────────────────────────────────────────────────────────────────────────────── # TestStress # ────────────────────────────────────────────────────────────────────────────── class TestStress: def test_large_file_clean_merge_fast(self) -> None: """500-line file with non-overlapping changes in each half: < 2 s.""" base = [f"line_{i:04d}\n" for i in range(500)] ours = list(base) ours[50] = "OURS_CHANGE\n" theirs = list(base) theirs[450] = "THEIRS_CHANGE\n" start = time.monotonic() merged, conflict = three_way_merge_lines(base, ours, theirs) elapsed = time.monotonic() - start assert not conflict assert elapsed < 2.0, f"merge took {elapsed:.2f}s" assert "OURS_CHANGE\n" in merged assert "THEIRS_CHANGE\n" in merged def test_many_conflicts_no_crash(self) -> None: """Alternating conflict zones throughout a 200-line file.""" base = [f"line_{i}\n" for i in range(200)] ours = list(base) theirs = list(base) # Every 10th line conflicts. for i in range(0, 200, 10): ours[i] = f"ours_{i}\n" theirs[i] = f"theirs_{i}\n" merged, conflict = three_way_merge_lines(base, ours, theirs) assert conflict assert isinstance(merged, list) def test_concurrent_merges_consistent(self) -> None: """Concurrent invocations must return identical results (no shared mutable state).""" base = ["a\n", "conflict\n", "b\n"] ours = ["a\n", "ours\n", "b\n"] theirs = ["a\n", "theirs\n", "b\n"] results: list[tuple[list[str], bool]] = [] errors: list[Exception] = [] def _run() -> None: try: results.append(three_way_merge_lines(base, ours, theirs)) except Exception as exc: errors.append(exc) threads = [threading.Thread(target=_run) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() assert not errors assert len(results) == 10 assert all(r == results[0] for r in results) # ────────────────────────────────────────────────────────────────────────────── # TestDataIntegrity # ────────────────────────────────────────────────────────────────────────────── class TestDataIntegrity: def test_clean_merge_is_deterministic(self) -> None: base = ["a\n", "b\n", "c\n"] ours = ["a\n", "B\n", "c\n"] theirs = ["a\n", "b\n", "C\n"] r1, c1 = three_way_merge_lines(base, ours, theirs) r2, c2 = three_way_merge_lines(base, ours, theirs) assert r1 == r2 assert c1 == c2 def test_conflict_output_is_deterministic(self) -> None: base = ["x\n"] ours = ["O\n"] theirs = ["T\n"] r1, _ = three_way_merge_lines(base, ours, theirs) r2, _ = three_way_merge_lines(base, ours, theirs) assert r1 == r2 def test_input_sequences_not_mutated(self) -> None: base = ["a\n", "b\n"] ours = ["a\n", "O\n"] theirs = ["a\n", "T\n"] base_copy = list(base) ours_copy = list(ours) theirs_copy = list(theirs) three_way_merge_lines(base, ours, theirs) assert base == base_copy assert ours == ours_copy assert theirs == theirs_copy def test_merged_content_contains_all_clean_changes(self) -> None: # ours changes b→B (pos 1); theirs changes d→D (pos 3). # 'a' (pos 0) and 'c' (pos 2) are stable in both LCS runs, # so the two changes land in separate non-overlapping regions → clean. base = ["a\n", "b\n", "c\n", "d\n"] ours = ["a\n", "B\n", "c\n", "d\n"] theirs = ["a\n", "b\n", "c\n", "D\n"] merged, conflict = three_way_merge_lines(base, ours, theirs) assert not conflict merged_str = "".join(merged) assert "B\n" in merged_str assert "D\n" in merged_str assert "a\n" in merged_str assert "c\n" in merged_str def test_conflict_markers_are_balanced(self) -> None: """Every <<<<<<< must have a matching >>>>>>>.""" base = ["x\n", "y\n"] ours = ["O1\n", "O2\n"] theirs = ["T1\n", "T2\n"] merged, _ = three_way_merge_lines(base, ours, theirs) opens = sum(1 for ln in merged if ln.startswith("<<<<<<<")) closes = sum(1 for ln in merged if ln.startswith(">>>>>>>")) assert opens == closes assert opens >= 1 # ────────────────────────────────────────────────────────────────────────────── # TestSecurity # ────────────────────────────────────────────────────────────────────────────── class TestSecurity: def test_ansi_in_content_does_not_spoof_markers(self) -> None: """ANSI sequences in file content must not produce fake conflict markers.""" base = ["normal\n"] ours = ["\x1b[31m<<<<<<< fake ours\x1b[0m\n"] theirs = ["other\n"] merged, conflict = three_way_merge_lines(base, ours, theirs) # The ANSI-containing line is just content — the conflict marker check # should find real markers (7 < chars), not ANSI-faked ones. real_opens = sum(1 for ln in merged if ln.startswith("<<<<<<<")) # There may be a real conflict (ours ≠ theirs), but the ANSI line itself # should appear as content, not as an extra conflict opener. assert isinstance(merged, list) assert isinstance(conflict, bool) def test_null_bytes_in_content_handled(self) -> None: """Null bytes in text content must not crash the merge.""" base = ["a\x00b\n"] ours = ["a\x00B\n"] theirs = ["a\x00b\n"] merged, conflict = three_way_merge_lines(base, ours, theirs) assert not conflict # only ours changed def test_very_long_label_no_overflow(self) -> None: long_label = "x" * 2000 base = ["a\n"] ours = ["O\n"] theirs = ["T\n"] merged, conflict = three_way_merge_lines( base, ours, theirs, label_ours=long_label, label_theirs=long_label ) assert conflict assert any(long_label in ln for ln in merged) def test_label_injection_via_newline(self) -> None: """Newline in a label must not inject extra lines into markers.""" malicious = "branch\n>>>>>>> malicious injection" base = ["a\n"] ours = ["O\n"] theirs = ["T\n"] merged, _ = three_way_merge_lines(base, ours, theirs, label_ours=malicious) # The malicious label appears in the <<<<<<< line; verify no phantom >>>>>>> before the real one positions_of_end = [i for i, ln in enumerate(merged) if ">>>>>>> end conflict" in ln] positions_of_open = [i for i, ln in enumerate(merged) if ln.startswith("<<<<<<<")] assert len(positions_of_end) >= 1 assert len(positions_of_open) >= 1 def test_format_conflict_diff_ansi_in_path_sanitized( self, tmp_path: pathlib.Path ) -> None: """ANSI codes in the file path argument must not appear in rendered output.""" malicious_path = "\x1b[31minjected\x1b[0m/file.py" lines = format_conflict_diff( malicious_path, tmp_path, {}, {}, {}, lambda root, oid: None, ) output = "\n".join(lines) assert "\x1b[31m" not in output