test_core_cohen_transform.py
file-level
1
files
1
commits
0
hotspots
0
π§ dead
0
π₯ blast risk
| 1 | """Tests for muse.core.cohen_transform — the Cohen Transform three-way merge. |
| 2 | |
| 3 | Named in honour of Bram Cohen (creator of BitTorrent, Manyana CRDT weave), |
| 4 | whose conflict-presentation insight is the direct inspiration for this module. |
| 5 | |
| 6 | Test categories |
| 7 | --------------- |
| 8 | TestClassifyAction — unit: classify_action() |
| 9 | TestAnnotateHunkAction — unit: annotate_hunk_action() |
| 10 | TestComputeRegions — unit: compute_regions() / _find_sync_regions() |
| 11 | TestThreeWayMergeLines — unit: three_way_merge_lines() clean + conflict |
| 12 | TestConflictMarkerFormat — unit: marker syntax, diff3 style, label content |
| 13 | TestFormatConflictDiff — unit: format_conflict_diff() rendering |
| 14 | TestEdgeCases — unit: empty sequences, binary-safe text, unicode |
| 15 | TestIntegration — integration: realistic multi-hunk scenarios |
| 16 | TestStress — stress: large files, many conflicts, many hunks |
| 17 | TestDataIntegrity — data integrity: determinism, idempotency |
| 18 | TestSecurity — security: ANSI injection, path traversal, null bytes |
| 19 | """ |
| 20 | |
| 21 | from __future__ import annotations |
| 22 | |
| 23 | import difflib |
| 24 | import pathlib |
| 25 | import threading |
| 26 | import time |
| 27 | from collections.abc import Sequence |
| 28 | from unittest.mock import MagicMock |
| 29 | |
| 30 | import pytest |
| 31 | |
| 32 | from muse.core.cohen_transform import ( |
| 33 | CONFLICT_SEPARATOR, |
| 34 | MergeRegion, |
| 35 | annotate_hunk_action, |
| 36 | classify_action, |
| 37 | compute_regions, |
| 38 | format_conflict_diff, |
| 39 | three_way_merge_lines, |
| 40 | ) |
| 41 | |
| 42 | |
| 43 | # ────────────────────────────────────────────────────────────────────────────── |
| 44 | # Helpers |
| 45 | # ────────────────────────────────────────────────────────────────────────────── |
| 46 | |
| 47 | def _lines(*text: str) -> list[str]: |
| 48 | """Split text into lines, each ending with \\n.""" |
| 49 | return [ln + "\n" for ln in "\n".join(text).split("\n") if ln + "\n"] |
| 50 | |
| 51 | |
def _merge(base: Sequence[str], ours: Sequence[str], theirs: Sequence[str], **kw: str) -> tuple[list[str], bool]:
    """Run three_way_merge_lines() on three loosely specified inputs.

    A list or tuple is expanded through _lines(), so its items become
    newline-terminated lines; any other sequence is simply copied into a list.
    Keyword arguments are forwarded to three_way_merge_lines() unchanged.
    """

    def _normalise(side: Sequence[str]) -> list[str]:
        # Lists/tuples hold text fragments; everything else is taken as-is.
        if isinstance(side, (list, tuple)):
            return _lines(*side)
        return list(side)

    base_lines = _normalise(base)
    ours_lines = _normalise(ours)
    theirs_lines = _normalise(theirs)
    return three_way_merge_lines(base_lines, ours_lines, theirs_lines, **kw)
| 59 | |
| 60 | |
| 61 | def _has_marker(lines: list[str], marker: str) -> bool: |
| 62 | return any(marker in ln for ln in lines) |
| 63 | |
| 64 | |
| 65 | # ────────────────────────────────────────────────────────────────────────────── |
| 66 | # TestClassifyAction |
| 67 | # ────────────────────────────────────────────────────────────────────────────── |
| 68 | |
class TestClassifyAction:
    """Unit tests for classify_action(): inserted / deleted / modified labels."""

    def test_inserted_when_base_empty(self) -> None:
        """Nothing on the base side and something on the other side is an insert."""
        action = classify_action([], ["new\n"])
        assert action == "inserted"

    def test_deleted_when_other_empty(self) -> None:
        """Something on the base side and nothing on the other side is a delete."""
        action = classify_action(["old\n"], [])
        assert action == "deleted"

    def test_modified_when_both_non_empty(self) -> None:
        """Content on both sides is reported as a modification."""
        action = classify_action(["old\n"], ["new\n"])
        assert action == "modified"

    def test_modified_when_multi_line_both(self) -> None:
        """Multi-line content on both sides is still a modification."""
        before = ["a\n", "b\n"]
        after = ["c\n", "d\n"]
        assert classify_action(before, after) == "modified"

    def test_inserted_single_line(self) -> None:
        """A single added line is an insert."""
        action = classify_action([], ["x\n"])
        assert action == "inserted"

    def test_deleted_single_line(self) -> None:
        """A single removed line is a delete."""
        action = classify_action(["x\n"], [])
        assert action == "deleted"
| 87 | |
| 88 | |
| 89 | # ────────────────────────────────────────────────────────────────────────────── |
| 90 | # TestAnnotateHunkAction |
| 91 | # ────────────────────────────────────────────────────────────────────────────── |
| 92 | |
class TestAnnotateHunkAction:
    """Unit tests for annotate_hunk_action(): labels appended to ``@@`` lines."""

    def _make_hunk(self, adds: int = 0, dels: int = 0) -> list[str]:
        """Build a one-hunk unified diff with *adds* '+' lines then *dels* '-' lines."""
        header = ["--- a/f.py", "+++ b/f.py", "@@ -1,3 +1,3 @@", " context\n"]
        added = [f"+added{i}\n" for i in range(adds)]
        removed = [f"-deleted{i}\n" for i in range(dels)]
        return header + added + removed

    @staticmethod
    def _first_at_line(diff_lines: list[str]) -> str:
        """Return the first line of *diff_lines* that starts with ``@@``."""
        return next(ln for ln in diff_lines if ln.startswith("@@"))

    def test_pure_insert_hunk_labelled_inserted(self) -> None:
        annotated = annotate_hunk_action(self._make_hunk(adds=2, dels=0), "ours")
        assert "[ours: inserted]" in self._first_at_line(annotated)

    def test_pure_delete_hunk_labelled_deleted(self) -> None:
        annotated = annotate_hunk_action(self._make_hunk(adds=0, dels=2), "theirs")
        assert "[theirs: deleted]" in self._first_at_line(annotated)

    def test_mixed_hunk_labelled_modified(self) -> None:
        annotated = annotate_hunk_action(self._make_hunk(adds=1, dels=1), "ours")
        assert "[ours: modified]" in self._first_at_line(annotated)

    def test_header_lines_preserved(self) -> None:
        annotated = annotate_hunk_action(self._make_hunk(adds=1), "ours")
        # Both file-header lines must survive annotation.
        assert any(ln.startswith("---") for ln in annotated)
        assert any(ln.startswith("+++") for ln in annotated)

    def test_multiple_hunks_each_annotated(self) -> None:
        two_hunks = [
            "--- a/f.py",
            "+++ b/f.py",
            # First hunk: one addition only.
            "@@ -1,2 +1,2 @@",
            " ctx\n",
            "+add1\n",
            # Second hunk: one deletion only.
            "@@ -10,2 +10,2 @@",
            " ctx2\n",
            "-del1\n",
        ]
        annotated = annotate_hunk_action(two_hunks, "ours")
        at_lines = [ln for ln in annotated if ln.startswith("@@")]
        assert len(at_lines) == 2
        first, second = at_lines[0], at_lines[1]
        assert "[ours: inserted]" in first
        assert "[ours: deleted]" in second

    def test_empty_hunk_list_returns_empty(self) -> None:
        annotated = annotate_hunk_action([], "ours")
        assert annotated == []

    def test_no_at_markers_unchanged(self) -> None:
        headers_only = ["--- a/f.py", "+++ b/f.py", " context\n"]
        annotated = annotate_hunk_action(headers_only, "ours")
        assert annotated == headers_only

    def test_side_label_appears_in_annotation(self) -> None:
        hunk = self._make_hunk(adds=1)
        for side in ("main", "feature/auth", "theirs"):
            annotated = annotate_hunk_action(hunk, side)
            assert side in self._first_at_line(annotated)
| 150 | |
| 151 | |
| 152 | # ────────────────────────────────────────────────────────────────────────────── |
| 153 | # TestComputeRegions |
| 154 | # ────────────────────────────────────────────────────────────────────────────── |
| 155 | |
class TestComputeRegions:
    """Unit tests for compute_regions(): the ``kind`` assigned to each region."""

    @staticmethod
    def _kinds(base: list[str], ours: list[str], theirs: list[str]) -> list[str]:
        """Return the ``kind`` of every region compute_regions() produces."""
        return [region.kind for region in compute_regions(base, ours, theirs)]

    def test_all_stable_no_changes(self) -> None:
        same = ["a\n", "b\n", "c\n"]
        kinds = self._kinds(same, same, same)
        assert all(k == "stable" for k in kinds)

    def test_ours_only(self) -> None:
        base = ["a\n", "b\n", "c\n"]
        ours = ["a\n", "B\n", "c\n"]
        kinds = self._kinds(base, ours, base)
        assert "ours_only" in kinds
        assert "conflict" not in kinds

    def test_theirs_only(self) -> None:
        base = ["a\n", "b\n", "c\n"]
        theirs = ["a\n", "T\n", "c\n"]
        kinds = self._kinds(base, base, theirs)
        assert "theirs_only" in kinds
        assert "conflict" not in kinds

    def test_both_same(self) -> None:
        base = ["a\n", "b\n"]
        edited = ["a\n", "X\n"]
        # Both sides carry the very same edit.
        kinds = self._kinds(base, edited, edited)
        assert "both_same" in kinds
        assert "conflict" not in kinds

    def test_conflict(self) -> None:
        base = ["a\n", "b\n"]
        ours = ["a\n", "O\n"]
        theirs = ["a\n", "T\n"]
        assert "conflict" in self._kinds(base, ours, theirs)

    def test_empty_sequences(self) -> None:
        assert compute_regions([], [], []) == []

    def test_region_lines_are_lists(self) -> None:
        base = ["a\n", "b\n"]
        ours = ["a\n", "X\n"]
        for region in compute_regions(base, ours, base):
            assert isinstance(region.base_lines, list)
            assert isinstance(region.ours_lines, list)
            assert isinstance(region.theirs_lines, list)

    def test_stable_region_all_three_equal(self) -> None:
        same = ["x\n", "y\n"]
        for region in compute_regions(same, same, same):
            assert region.base_lines == region.ours_lines == region.theirs_lines

    def test_pure_insertion_ours(self) -> None:
        base = ["a\n", "c\n"]
        ours = ["a\n", "b\n", "c\n"]  # "b" inserted between "a" and "c"
        kinds = self._kinds(base, ours, base)
        assert "ours_only" in kinds or "stable" in kinds
        assert "conflict" not in kinds

    def test_pure_insertion_conflict(self) -> None:
        base = ["a\n", "c\n"]
        ours = ["a\n", "B\n", "c\n"]
        theirs = ["a\n", "T\n", "c\n"]
        # Different lines inserted at the same position.
        assert "conflict" in self._kinds(base, ours, theirs)
| 228 | |
| 229 | |
| 230 | # ────────────────────────────────────────────────────────────────────────────── |
| 231 | # TestThreeWayMergeLines — clean merges |
| 232 | # ────────────────────────────────────────────────────────────────────────────── |
| 233 | |
class TestThreeWayMergeLines:
    """Unit tests for three_way_merge_lines(): clean merges, then conflicts."""

    @staticmethod
    def _single_line_conflict() -> list[str]:
        """Merge three different one-line versions and return the merged lines."""
        merged, _ = three_way_merge_lines(["a\n"], ["O\n"], ["T\n"])
        return merged

    @staticmethod
    def _named_conflict(base_text: str) -> list[str]:
        """Merge ``ours_line`` / ``theirs_line`` against a one-line base."""
        merged, _ = three_way_merge_lines(
            [base_text], ["ours_line\n"], ["theirs_line\n"],
        )
        return merged

    # ── Clean cases ──────────────────────────────────────────────────────────

    def test_no_changes_returns_base(self) -> None:
        original = ["a\n", "b\n", "c\n"]
        result, had_conflict = three_way_merge_lines(original, original, original)
        assert result == original
        assert not had_conflict

    def test_ours_only_change_applied(self) -> None:
        original = ["a\n", "b\n", "c\n"]
        mine = ["a\n", "B\n", "c\n"]
        result, had_conflict = three_way_merge_lines(original, mine, original)
        assert result == mine
        assert not had_conflict

    def test_theirs_only_change_applied(self) -> None:
        original = ["a\n", "b\n", "c\n"]
        other = ["a\n", "T\n", "c\n"]
        result, had_conflict = three_way_merge_lines(original, original, other)
        assert result == other
        assert not had_conflict

    def test_both_same_change_applied_once(self) -> None:
        original = ["a\n", "b\n"]
        edited = ["a\n", "X\n"]
        result, had_conflict = three_way_merge_lines(original, edited, edited)
        assert result == edited
        assert not had_conflict

    def test_non_overlapping_changes_both_applied(self) -> None:
        original = ["a\n", "b\n", "c\n", "d\n"]
        mine = ["a\n", "B\n", "c\n", "d\n"]  # b→B
        other = ["a\n", "b\n", "c\n", "D\n"]  # d→D
        result, had_conflict = three_way_merge_lines(original, mine, other)
        assert "B\n" in result
        assert "D\n" in result
        assert not had_conflict

    def test_ours_inserts_line(self) -> None:
        original = ["a\n", "c\n"]
        mine = ["a\n", "b\n", "c\n"]
        result, had_conflict = three_way_merge_lines(original, mine, original)
        assert result == mine
        assert not had_conflict

    def test_theirs_deletes_line(self) -> None:
        original = ["a\n", "b\n", "c\n"]
        other = ["a\n", "c\n"]
        result, had_conflict = three_way_merge_lines(original, original, other)
        assert result == other
        assert not had_conflict

    def test_empty_base_both_add(self) -> None:
        # Identical content added on both sides of an empty base merges cleanly.
        new_lines = ["line1\n", "line2\n"]
        result, had_conflict = three_way_merge_lines([], new_lines, new_lines)
        assert result == new_lines
        assert not had_conflict

    # ── Conflict cases ───────────────────────────────────────────────────────

    def test_conflict_detected(self) -> None:
        original = ["a\n", "b\n"]
        mine = ["a\n", "O\n"]
        other = ["a\n", "T\n"]
        _result, had_conflict = three_way_merge_lines(original, mine, other)
        assert had_conflict

    def test_conflict_has_ours_marker(self) -> None:
        assert _has_marker(self._single_line_conflict(), "<<<<<<<")

    def test_conflict_has_base_marker(self) -> None:
        assert _has_marker(self._single_line_conflict(), "|||||||")

    def test_conflict_has_sep_marker(self) -> None:
        assert _has_marker(self._single_line_conflict(), "=======")

    def test_conflict_has_end_marker(self) -> None:
        assert _has_marker(self._single_line_conflict(), ">>>>>>> end conflict")

    def test_conflict_ours_content_present(self) -> None:
        result = self._named_conflict("x\n")
        assert any("ours_line" in ln for ln in result)

    def test_conflict_theirs_content_present(self) -> None:
        result = self._named_conflict("x\n")
        assert any("theirs_line" in ln for ln in result)

    def test_conflict_base_content_in_diff3_section(self) -> None:
        result = self._named_conflict("base_line\n")
        assert any("base_line" in ln for ln in result)

    def test_stable_lines_outside_conflict_preserved(self) -> None:
        original = ["preamble\n", "conflict_zone\n", "epilogue\n"]
        mine = ["preamble\n", "ours_content\n", "epilogue\n"]
        other = ["preamble\n", "theirs_content\n", "epilogue\n"]
        result, had_conflict = three_way_merge_lines(original, mine, other)
        assert had_conflict
        assert any("preamble" in ln for ln in result)
        assert any("epilogue" in ln for ln in result)

    def test_multiple_independent_conflicts(self) -> None:
        original = ["a\n", "b\n", "c\n", "d\n", "e\n"]
        mine = ["a\n", "O1\n", "c\n", "O2\n", "e\n"]
        other = ["a\n", "T1\n", "c\n", "T2\n", "e\n"]
        result, had_conflict = three_way_merge_lines(original, mine, other)
        assert had_conflict
        opening_markers = sum(1 for ln in result if ln.startswith("<<<<<<< "))
        assert opening_markers == 2

    def test_both_empty_insertion_at_same_point_conflicts(self) -> None:
        original = ["a\n", "c\n"]
        mine = ["a\n", "B\n", "c\n"]
        other = ["a\n", "T\n", "c\n"]
        _result, had_conflict = three_way_merge_lines(original, mine, other)
        assert had_conflict

    def test_custom_labels_appear_in_markers(self) -> None:
        result, _ = three_way_merge_lines(
            ["x\n"],
            ["O\n"],
            ["T\n"],
            label_ours="feature/login",
            label_base="merge-base",
            label_theirs="main",
        )
        assert any("feature/login" in ln for ln in result)
        assert any("main" in ln for ln in result)
        assert any("merge-base" in ln for ln in result)
| 390 | |
| 391 | |
| 392 | # ────────────────────────────────────────────────────────────────────────────── |
| 393 | # TestConflictMarkerFormat — exact marker syntax / Cohen action labels |
| 394 | # ────────────────────────────────────────────────────────────────────────────── |
| 395 | |
class TestConflictMarkerFormat:
    """Unit tests for the conflict-marker lines emitted by three_way_merge_lines()."""

    def _conflict_merged(self, base_text: str, ours_text: str, theirs_text: str) -> list[str]:
        """Split the three texts into lines (keeping line ends) and merge them."""
        sides = [
            text.splitlines(keepends=True)
            for text in (base_text, ours_text, theirs_text)
        ]
        merged, _ = three_way_merge_lines(sides[0], sides[1], sides[2])
        return merged

    def test_deleted_action_on_ours_marker(self) -> None:
        result = self._conflict_merged("func()\n", "", "other()\n")
        assert any("<<<<<<< ours [deleted]" in ln for ln in result)

    def test_inserted_action_on_theirs_marker(self) -> None:
        result = self._conflict_merged("", "ours_line\n", "theirs_line\n")
        assert any("[inserted]" in ln for ln in result)

    def test_modified_action_on_modified_conflict(self) -> None:
        result = self._conflict_merged("original\n", "ours_ver\n", "theirs_ver\n")
        assert any("[modified]" in ln for ln in result)

    def test_base_section_marker_present(self) -> None:
        result = self._conflict_merged("base\n", "ours\n", "theirs\n")
        assert any("|||||||" in ln for ln in result)

    def test_end_conflict_marker_present(self) -> None:
        result = self._conflict_merged("base\n", "ours\n", "theirs\n")
        assert any(">>>>>>> end conflict" in ln for ln in result)

    def test_marker_ordering(self) -> None:
        """<<<<<<< must precede ||||||| must precede ======= must precede >>>>>>>."""
        result = self._conflict_merged("base\n", "ours\n", "theirs\n")
        # Checked in this order per line; the first marker found on a line wins,
        # and a later line with the same marker overwrites the earlier index.
        marker_keys = (
            ("<<<<<<<", "start"),
            ("|||||||", "base"),
            ("=======", "sep"),
            (">>>>>>>", "end"),
        )
        positions: dict[str, int] = {}
        for index, line in enumerate(result):
            for marker, key in marker_keys:
                if marker in line:
                    positions[key] = index
                    break
        assert positions["start"] < positions["base"] < positions["sep"] < positions["end"]
| 439 | |
| 440 | |
| 441 | # ────────────────────────────────────────────────────────────────────────────── |
| 442 | # TestFormatConflictDiff |
| 443 | # ────────────────────────────────────────────────────────────────────────────── |
| 444 | |
class TestFormatConflictDiff:
    """Tests for format_conflict_diff() — the muse diff --conflict renderer."""

    def _make_manifests(self) -> tuple[dict, dict, dict, MagicMock]:
        """Return base/ours/theirs manifests for ``src/util.py`` plus a blob reader.

        Each manifest maps the path to a distinct 64-character object id; the
        reader maps those ids to one-line byte contents and returns None for
        any other id.
        """
        # NOTE(review): the fourth element is a plain function, not a MagicMock
        # as the return annotation says; annotation left as found.
        tracked_path = "src/util.py"
        base_oid, ours_oid, theirs_oid = "aaaa" * 16, "bbbb" * 16, "cccc" * 16
        blobs = {
            base_oid: b"base line\n",
            ours_oid: b"ours line\n",
            theirs_oid: b"theirs line\n",
        }

        def _read(root: pathlib.Path, oid: str) -> bytes | None:
            return blobs.get(oid)

        return (
            {tracked_path: base_oid},
            {tracked_path: ours_oid},
            {tracked_path: theirs_oid},
            _read,
        )

    def _render(self, tmp_path: pathlib.Path, **options: object) -> list[str]:
        """Render the conflict diff for ``src/util.py`` using the stock manifests."""
        base_m, ours_m, theirs_m, read_fn = self._make_manifests()
        return format_conflict_diff(
            "src/util.py", tmp_path, base_m, ours_m, theirs_m, read_fn, **options,
        )

    def test_output_contains_conflict_header(self, tmp_path: pathlib.Path) -> None:
        rendered = self._render(tmp_path)
        assert any("CONFLICT" in ln for ln in rendered)
        assert any("src/util.py" in ln for ln in rendered)

    def test_output_contains_ours_section(self, tmp_path: pathlib.Path) -> None:
        rendered = self._render(tmp_path)
        assert any("[ours]" in ln for ln in rendered)

    def test_output_contains_theirs_section(self, tmp_path: pathlib.Path) -> None:
        rendered = self._render(tmp_path)
        assert any("[theirs]" in ln for ln in rendered)

    def test_custom_labels_appear(self, tmp_path: pathlib.Path) -> None:
        rendered = self._render(
            tmp_path, ours_label="feature/auth", theirs_label="main",
        )
        assert any("feature/auth" in ln for ln in rendered)
        assert any("main" in ln for ln in rendered)

    def test_returns_list_of_strings(self, tmp_path: pathlib.Path) -> None:
        rendered = self._render(tmp_path)
        assert isinstance(rendered, list)
        assert all(isinstance(ln, str) for ln in rendered)

    def test_ansi_sanitized_in_path(self, tmp_path: pathlib.Path) -> None:
        """ANSI escape sequences in the path must not appear in output."""
        hostile_path = "\x1b[31mmalicious\x1b[0m/file.py"
        *_, read_fn = self._make_manifests()
        # Empty manifests: the hostile path has no entry on any side.
        rendered = format_conflict_diff(
            hostile_path, tmp_path, {}, {}, {}, read_fn,
        )
        assert "\x1b[31m" not in "\n".join(rendered)

    def test_missing_file_shows_no_changes_message(self, tmp_path: pathlib.Path) -> None:
        def _read(root: pathlib.Path, oid: str) -> bytes | None:
            # Every lookup misses.
            return None

        rendered = format_conflict_diff(
            "missing.py", tmp_path, {}, {}, {}, _read,
        )
        assert any("no changes" in ln.lower() for ln in rendered)

    def test_no_color_mode(self, tmp_path: pathlib.Path) -> None:
        rendered = self._render(tmp_path, use_color=False)
        # No escape-sequence introducer may remain when colour is off.
        assert "\x1b[" not in "\n".join(rendered)
| 535 | |
| 536 | |
| 537 | # ────────────────────────────────────────────────────────────────────────────── |
| 538 | # TestEdgeCases |
| 539 | # ────────────────────────────────────────────────────────────────────────────── |
| 540 | |
class TestEdgeCases:
    """Edge cases for three_way_merge_lines(): empty sides, unicode, odd line shapes."""

    def test_all_empty(self) -> None:
        merged, conflict = three_way_merge_lines([], [], [])
        assert merged == []
        assert not conflict

    def test_base_empty_ours_adds(self) -> None:
        merged, conflict = three_way_merge_lines([], ["new\n"], [])
        assert merged == ["new\n"]
        assert not conflict

    def test_base_empty_theirs_adds(self) -> None:
        merged, conflict = three_way_merge_lines([], [], ["new\n"])
        assert merged == ["new\n"]
        assert not conflict

    def test_base_empty_both_add_same(self) -> None:
        merged, conflict = three_way_merge_lines([], ["same\n"], ["same\n"])
        assert merged == ["same\n"]
        assert not conflict

    def test_base_empty_both_add_different_conflicts(self) -> None:
        _, conflict = three_way_merge_lines([], ["A\n"], ["B\n"])
        assert conflict

    def test_ours_equals_base_theirs_deletes(self) -> None:
        base = ["x\n"]
        merged, conflict = three_way_merge_lines(base, base, [])
        assert merged == []
        assert not conflict

    def test_both_delete_everything(self) -> None:
        base = ["x\n", "y\n"]
        merged, conflict = three_way_merge_lines(base, [], [])
        assert merged == []
        assert not conflict

    def test_unicode_content(self) -> None:
        # The literals had been mis-decoded ("hΓ©llo", "wΓΆrld", "WΓRLD");
        # they are restored to the accented text they encode.
        base = ["héllo\n", "wörld\n"]
        ours = ["héllo\n", "WÖRLD\n"]
        theirs = ["héllo\n", "wörld\n"]
        merged, conflict = three_way_merge_lines(base, ours, theirs)
        assert not conflict
        assert "WÖRLD\n" in merged

    def test_long_lines_no_crash(self) -> None:
        long_line = "x" * 10_000 + "\n"
        base = [long_line]
        ours = ["y" * 10_000 + "\n"]
        theirs = [long_line]
        merged, conflict = three_way_merge_lines(base, ours, theirs)
        assert not conflict

    def test_many_identical_lines(self) -> None:
        base = ["same\n"] * 1000
        merged, conflict = three_way_merge_lines(base, base, base)
        assert merged == base
        assert not conflict

    def test_lines_without_trailing_newline(self) -> None:
        # Not all editors guarantee trailing newlines.
        base = ["no newline"]
        ours = ["changed"]
        merged, conflict = three_way_merge_lines(base, ours, base)
        assert not conflict
        assert "changed" in merged[0]
| 607 | |
| 608 | |
| 609 | # ────────────────────────────────────────────────────────────────────────────── |
| 610 | # TestIntegration — realistic multi-hunk merges |
| 611 | # ────────────────────────────────────────────────────────────────────────────── |
| 612 | |
class TestIntegration:
    """Realistic multi-hunk merge scenarios matching real developer workflows."""

    def test_ours_adds_docstring_theirs_renames_param(self) -> None:
        original = [
            "def calculate(x):\n",
            "    return x * 2\n",
        ]
        with_docstring = [
            "def calculate(x):\n",
            '    """Double the input."""\n',
            "    return x * 2\n",
        ]
        renamed_param = [
            "def calculate(value):\n",
            "    return value * 2\n",
        ]
        result, had_conflict = three_way_merge_lines(
            original, with_docstring, renamed_param,
        )
        # ours inserts a docstring; theirs renames x→value. Whether the two
        # edits are reported as overlapping depends on how the changes get
        # grouped, so either outcome is accepted here: only the shape of the
        # return value is checked.
        assert isinstance(result, list)
        assert isinstance(had_conflict, bool)

    def test_both_fix_same_typo(self) -> None:
        misspelt = ["# Calcualtion module\n"]
        fixed_ours = ["# Calculation module\n"]
        fixed_theirs = ["# Calculation module\n"]
        result, had_conflict = three_way_merge_lines(misspelt, fixed_ours, fixed_theirs)
        assert not had_conflict
        assert result == fixed_ours

    def test_ours_deletes_block_theirs_modifies_different_block(self) -> None:
        block_a = ["block_a_line_1\n", "block_a_line_2\n"]
        tail = ["separator\n", "block_b_line_1\n", "block_b_line_2\n"]
        original = block_a + tail
        mine = list(tail)  # block A removed
        other = block_a + [
            "separator\n",
            "block_b_LINE_1\n",  # modified
            "block_b_line_2\n",
        ]
        result, had_conflict = three_way_merge_lines(original, mine, other)
        # Ours deletes at the top, theirs edits at the bottom: no overlap.
        assert not had_conflict
        assert "block_b_LINE_1\n" in result
        assert "block_a_line_1\n" not in result

    def test_conflict_preserves_stable_context_above_and_below(self) -> None:
        original = ["header\n", "conflict_zone\n", "footer\n"]
        mine = ["header\n", "ours_version\n", "footer\n"]
        other = ["header\n", "theirs_version\n", "footer\n"]
        result, had_conflict = three_way_merge_lines(original, mine, other)
        assert had_conflict
        joined = "".join(result)
        assert "header\n" in joined
        assert "footer\n" in joined

    def test_append_only_change_ours(self) -> None:
        original = ["line1\n", "line2\n"]
        mine = ["line1\n", "line2\n", "appended\n"]
        result, had_conflict = three_way_merge_lines(original, mine, original)
        assert not had_conflict
        assert result == mine

    def test_prepend_only_change_theirs(self) -> None:
        original = ["line1\n"]
        other = ["prepended\n", "line1\n"]
        result, had_conflict = three_way_merge_lines(original, original, other)
        assert not had_conflict
        assert result == other

    def test_roundtrip_clean_merge_deterministic(self) -> None:
        original = ["a\n", "b\n", "c\n"]
        mine = ["a\n", "B\n", "c\n"]
        other = ["a\n", "b\n", "C\n"]
        first_lines, first_flag = three_way_merge_lines(original, mine, other)
        second_lines, second_flag = three_way_merge_lines(original, mine, other)
        assert first_lines == second_lines
        assert first_flag == second_flag
| 705 | |
| 706 | |
| 707 | # ────────────────────────────────────────────────────────────────────────────── |
| 708 | # TestStress |
| 709 | # ────────────────────────────────────────────────────────────────────────────── |
| 710 | |
class TestStress:
    """Stress tests: a large input, many conflicts, and concurrent callers."""

    def test_large_file_clean_merge_fast(self) -> None:
        """500-line file with non-overlapping changes in each half: < 2 s."""
        original = [f"line_{i:04d}\n" for i in range(500)]
        mine = list(original)
        other = list(original)
        mine[50] = "OURS_CHANGE\n"
        other[450] = "THEIRS_CHANGE\n"

        began = time.monotonic()
        result, had_conflict = three_way_merge_lines(original, mine, other)
        elapsed = time.monotonic() - began

        assert not had_conflict
        assert elapsed < 2.0, f"merge took {elapsed:.2f}s"
        assert "OURS_CHANGE\n" in result
        assert "THEIRS_CHANGE\n" in result

    def test_many_conflicts_no_crash(self) -> None:
        """Alternating conflict zones throughout a 200-line file."""
        original = [f"line_{i}\n" for i in range(200)]
        mine = list(original)
        other = list(original)
        # Both sides rewrite every 10th line differently.
        for index in range(0, 200, 10):
            mine[index] = f"ours_{index}\n"
            other[index] = f"theirs_{index}\n"

        result, had_conflict = three_way_merge_lines(original, mine, other)
        assert had_conflict
        assert isinstance(result, list)

    def test_concurrent_merges_consistent(self) -> None:
        """Concurrent invocations must return identical results (no shared mutable state)."""
        original = ["a\n", "conflict\n", "b\n"]
        mine = ["a\n", "ours\n", "b\n"]
        other = ["a\n", "theirs\n", "b\n"]
        outcomes: list[tuple[list[str], bool]] = []
        failures: list[Exception] = []

        def _worker() -> None:
            # Record the failure instead of letting it die with the thread.
            try:
                outcome = three_way_merge_lines(original, mine, other)
            except Exception as exc:
                failures.append(exc)
            else:
                outcomes.append(outcome)

        workers = [threading.Thread(target=_worker) for _ in range(10)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        assert not failures
        assert len(outcomes) == 10
        reference = outcomes[0]
        assert all(outcome == reference for outcome in outcomes)
| 766 | |
| 767 | |
| 768 | # ────────────────────────────────────────────────────────────────────────────── |
| 769 | # TestDataIntegrity |
| 770 | # ────────────────────────────────────────────────────────────────────────────── |
| 771 | |
class TestDataIntegrity:
    """Data-integrity checks: repeatable output, untouched inputs, balanced markers."""

    def test_clean_merge_is_deterministic(self) -> None:
        """Two identical clean merges must yield the same lines and flag."""
        base = ["a\n", "b\n", "c\n"]
        ours = ["a\n", "B\n", "c\n"]
        theirs = ["a\n", "b\n", "C\n"]
        runs = [three_way_merge_lines(base, ours, theirs) for _ in range(2)]
        (first_lines, first_flag), (second_lines, second_flag) = runs
        assert first_lines == second_lines
        assert first_flag == second_flag

    def test_conflict_output_is_deterministic(self) -> None:
        """Two identical conflicting merges must render the same lines."""
        base = ["x\n"]
        ours = ["O\n"]
        theirs = ["T\n"]
        first_lines, _ = three_way_merge_lines(base, ours, theirs)
        second_lines, _ = three_way_merge_lines(base, ours, theirs)
        assert first_lines == second_lines

    def test_input_sequences_not_mutated(self) -> None:
        """The merge must leave all three input lists exactly as they were."""
        base = ["a\n", "b\n"]
        ours = ["a\n", "O\n"]
        theirs = ["a\n", "T\n"]
        # Snapshot each input before the call so any in-place edit is detectable.
        base_before, ours_before, theirs_before = base[:], ours[:], theirs[:]
        three_way_merge_lines(base, ours, theirs)
        assert base == base_before
        assert ours == ours_before
        assert theirs == theirs_before

    def test_merged_content_contains_all_clean_changes(self) -> None:
        """Independent edits from both sides must all survive a clean merge."""
        # ours changes b→B (pos 1); theirs changes d→D (pos 3).
        # 'a' (pos 0) and 'c' (pos 2) are untouched on both sides, so the two
        # edits are expected to land in separate regions and merge cleanly.
        base = ["a\n", "b\n", "c\n", "d\n"]
        ours = ["a\n", "B\n", "c\n", "d\n"]
        theirs = ["a\n", "b\n", "c\n", "D\n"]
        merged, conflict = three_way_merge_lines(base, ours, theirs)
        assert not conflict
        merged_str = "".join(merged)
        for expected in ("B\n", "D\n", "a\n", "c\n"):
            assert expected in merged_str

    def test_conflict_markers_are_balanced(self) -> None:
        """Every <<<<<<< must have a matching >>>>>>>."""
        base = ["x\n", "y\n"]
        ours = ["O1\n", "O2\n"]
        theirs = ["T1\n", "T2\n"]
        merged, _ = three_way_merge_lines(base, ours, theirs)
        opens = len([ln for ln in merged if ln.startswith("<<<<<<<")])
        closes = len([ln for ln in merged if ln.startswith(">>>>>>>")])
        assert opens == closes
        assert opens >= 1
| 827 | |
| 828 | |
| 829 | # ────────────────────────────────────────────────────────────────────────────── |
| 830 | # TestSecurity |
| 831 | # ────────────────────────────────────────────────────────────────────────────── |
| 832 | |
class TestSecurity:
    """Security checks: marker spoofing, hostile labels, and hostile paths."""

    def test_ansi_in_content_does_not_spoof_markers(self) -> None:
        """ANSI sequences in file content must not produce fake conflict markers."""
        base = ["normal\n"]
        ours = ["\x1b[31m<<<<<<< fake ours\x1b[0m\n"]
        theirs = ["other\n"]
        merged, conflict = three_way_merge_lines(base, ours, theirs)
        assert isinstance(merged, list)
        assert isinstance(conflict, bool)
        # There may be a real conflict (ours ≠ theirs) with its own genuine
        # opener, but any output line carrying the ANSI-wrapped text must stay
        # plain content: it must never begin with the 7-char opener itself.
        spoofed = [
            ln
            for ln in merged
            if "fake ours" in ln and ln.startswith("<<<<<<<")
        ]
        assert not spoofed

    def test_null_bytes_in_content_handled(self) -> None:
        """Null bytes in text content must not crash the merge."""
        base = ["a\x00b\n"]
        ours = ["a\x00B\n"]
        theirs = ["a\x00b\n"]
        merged, conflict = three_way_merge_lines(base, ours, theirs)
        assert not conflict  # only ours changed
        # The changed line must come through with its NUL byte intact.
        assert "a\x00B\n" in merged

    def test_very_long_label_no_overflow(self) -> None:
        """A 2000-character label must be carried into the conflict output."""
        long_label = "x" * 2000
        base = ["a\n"]
        ours = ["O\n"]
        theirs = ["T\n"]
        merged, conflict = three_way_merge_lines(
            base, ours, theirs, label_ours=long_label, label_theirs=long_label
        )
        assert conflict
        assert any(long_label in ln for ln in merged)

    def test_label_injection_via_newline(self) -> None:
        """Newline in a label must not inject extra lines into markers."""
        malicious = "branch\n>>>>>>> malicious injection"
        base = ["a\n"]
        ours = ["O\n"]
        theirs = ["T\n"]
        merged, _ = three_way_merge_lines(base, ours, theirs, label_ours=malicious)
        positions_of_end = [
            i for i, ln in enumerate(merged) if ">>>>>>> end conflict" in ln
        ]
        positions_of_open = [
            i for i, ln in enumerate(merged) if ln.startswith("<<<<<<<")
        ]
        assert len(positions_of_end) >= 1
        assert len(positions_of_open) >= 1
        # The malicious label travels with the opener; the first entry that
        # starts with a closer must be the genuine end marker, i.e. no phantom
        # ">>>>>>>" entry may precede the real one.
        positions_of_closer = [
            i for i, ln in enumerate(merged) if ln.startswith(">>>>>>>")
        ]
        assert positions_of_closer
        assert positions_of_closer[0] == positions_of_end[0]

    def test_format_conflict_diff_ansi_in_path_sanitized(
        self, tmp_path: pathlib.Path
    ) -> None:
        """ANSI codes in the file path argument must not appear in rendered output."""
        malicious_path = "\x1b[31minjected\x1b[0m/file.py"
        # Empty mappings and a loader that always returns None: only the
        # handling of the path argument is under test here.
        lines = format_conflict_diff(
            malicious_path, tmp_path, {}, {}, {},
            lambda root, oid: None,
        )
        output = "\n".join(lines)
        assert "\x1b[31m" not in output