gabriel / muse public
test_core_cohen_transform.py python
890 lines 38.0 KB
Raw
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor ⚠ breaking 28 days ago
1 """Tests for muse.core.cohen_transform — the Cohen Transform three-way merge.
2
3 Named in honour of Bram Cohen (creator of BitTorrent, Manyana CRDT weave),
4 whose conflict-presentation insight is the direct inspiration for this module.
5
6 Test categories
7 ---------------
8 TestClassifyAction — unit: classify_action()
9 TestAnnotateHunkAction — unit: annotate_hunk_action()
10 TestComputeRegions — unit: compute_regions() / _find_sync_regions()
11 TestThreeWayMergeLines — unit: three_way_merge_lines() clean + conflict
12 TestConflictMarkerFormat — unit: marker syntax, diff3 style, label content
13 TestFormatConflictDiff — unit: format_conflict_diff() rendering
14 TestEdgeCases — unit: empty sequences, binary-safe text, unicode
15 TestIntegration — integration: realistic multi-hunk scenarios
16 TestStress — stress: large files, many conflicts, many hunks
17 TestDataIntegrity — data integrity: determinism, idempotency
18 TestSecurity — security: ANSI injection, label injection, null bytes
19 """
20
21 from __future__ import annotations
22
import difflib
import pathlib
import threading
import time
from collections.abc import Callable, Sequence
from unittest.mock import MagicMock
29
30 import pytest
31
32 from muse.core.cohen_transform import (
33 CONFLICT_SEPARATOR,
34 MergeRegion,
35 annotate_hunk_action,
36 classify_action,
37 compute_regions,
38 format_conflict_diff,
39 three_way_merge_lines,
40 )
41
42
43 # ──────────────────────────────────────────────────────────────────────────────
44 # Helpers
45 # ──────────────────────────────────────────────────────────────────────────────
46
47 def _lines(*text: str) -> list[str]:
48 """Split text into lines, each ending with \\n."""
49 return [ln + "\n" for ln in "\n".join(text).split("\n") if ln + "\n"]
50
51
def _merge(base: Sequence[str], ours: Sequence[str], theirs: Sequence[str], **kw: str) -> tuple[list[str], bool]:
    """Run three_way_merge_lines() on loosely-typed test inputs.

    A ``list`` or ``tuple`` argument is expanded through ``_lines()``; any
    other sequence is copied with ``list()``.  Extra keyword arguments are
    forwarded to ``three_way_merge_lines()`` unchanged.
    """
    def _prepare(side: Sequence[str]) -> list[str]:
        if isinstance(side, (list, tuple)):
            return _lines(*side)
        return list(side)

    base_lines = _prepare(base)
    ours_lines = _prepare(ours)
    theirs_lines = _prepare(theirs)
    return three_way_merge_lines(base_lines, ours_lines, theirs_lines, **kw)
59
60
61 def _has_marker(lines: list[str], marker: str) -> bool:
62 return any(marker in ln for ln in lines)
63
64
65 # ──────────────────────────────────────────────────────────────────────────────
66 # TestClassifyAction
67 # ──────────────────────────────────────────────────────────────────────────────
68
class TestClassifyAction:
    """Unit tests for classify_action(): inserted / deleted / modified."""

    def test_inserted_when_base_empty(self) -> None:
        # Nothing on the base side, something on the other side.
        action = classify_action([], ["new\n"])
        assert action == "inserted"

    def test_deleted_when_other_empty(self) -> None:
        # Something on the base side, nothing on the other side.
        action = classify_action(["old\n"], [])
        assert action == "deleted"

    def test_modified_when_both_non_empty(self) -> None:
        before, after = ["old\n"], ["new\n"]
        assert classify_action(before, after) == "modified"

    def test_modified_when_multi_line_both(self) -> None:
        before = ["a\n", "b\n"]
        after = ["c\n", "d\n"]
        assert classify_action(before, after) == "modified"

    def test_inserted_single_line(self) -> None:
        action = classify_action([], ["x\n"])
        assert action == "inserted"

    def test_deleted_single_line(self) -> None:
        action = classify_action(["x\n"], [])
        assert action == "deleted"
87
88
89 # ──────────────────────────────────────────────────────────────────────────────
90 # TestAnnotateHunkAction
91 # ──────────────────────────────────────────────────────────────────────────────
92
class TestAnnotateHunkAction:
    """Unit tests for annotate_hunk_action(): labels on ``@@`` hunk headers."""

    def _make_hunk(self, adds: int = 0, dels: int = 0) -> list[str]:
        """Build a one-hunk diff with *adds* '+' lines followed by *dels* '-' lines."""
        preamble = ["--- a/f.py", "+++ b/f.py", "@@ -1,3 +1,3 @@", " context\n"]
        plus_lines = [f"+added{n}\n" for n in range(adds)]
        minus_lines = [f"-deleted{n}\n" for n in range(dels)]
        return preamble + plus_lines + minus_lines

    def test_pure_insert_hunk_labelled_inserted(self) -> None:
        annotated = annotate_hunk_action(self._make_hunk(adds=2, dels=0), "ours")
        header = next(filter(lambda ln: ln.startswith("@@"), annotated))
        assert "[ours: inserted]" in header

    def test_pure_delete_hunk_labelled_deleted(self) -> None:
        annotated = annotate_hunk_action(self._make_hunk(adds=0, dels=2), "theirs")
        header = next(filter(lambda ln: ln.startswith("@@"), annotated))
        assert "[theirs: deleted]" in header

    def test_mixed_hunk_labelled_modified(self) -> None:
        annotated = annotate_hunk_action(self._make_hunk(adds=1, dels=1), "ours")
        header = next(filter(lambda ln: ln.startswith("@@"), annotated))
        assert "[ours: modified]" in header

    def test_header_lines_preserved(self) -> None:
        annotated = annotate_hunk_action(self._make_hunk(adds=1), "ours")
        # Both file-header lines of the diff must survive annotation.
        for prefix in ("---", "+++"):
            assert any(ln.startswith(prefix) for ln in annotated)

    def test_multiple_hunks_each_annotated(self) -> None:
        two_hunks = [
            "--- a/f.py",
            "+++ b/f.py",
            "@@ -1,2 +1,2 @@",
            " ctx\n",
            "+add1\n",
            "@@ -10,2 +10,2 @@",
            " ctx2\n",
            "-del1\n",
        ]
        annotated = annotate_hunk_action(two_hunks, "ours")
        headers = [ln for ln in annotated if ln.startswith("@@")]
        assert len(headers) == 2
        first, second = headers[0], headers[1]
        assert "[ours: inserted]" in first
        assert "[ours: deleted]" in second

    def test_empty_hunk_list_returns_empty(self) -> None:
        annotated = annotate_hunk_action([], "ours")
        assert annotated == []

    def test_no_at_markers_unchanged(self) -> None:
        # Without any "@@" line there is nothing to annotate.
        plain = ["--- a/f.py", "+++ b/f.py", " context\n"]
        assert annotate_hunk_action(plain, "ours") == plain

    def test_side_label_appears_in_annotation(self) -> None:
        hunk = self._make_hunk(adds=1)
        for side in ("main", "feature/auth", "theirs"):
            annotated = annotate_hunk_action(hunk, side)
            header = next(filter(lambda ln: ln.startswith("@@"), annotated))
            assert side in header
150
151
152 # ──────────────────────────────────────────────────────────────────────────────
153 # TestComputeRegions
154 # ──────────────────────────────────────────────────────────────────────────────
155
class TestComputeRegions:
    """Unit tests for compute_regions(): the ``kind`` and line lists of each region."""

    def test_all_stable_no_changes(self) -> None:
        same = ["a\n", "b\n", "c\n"]
        for region in compute_regions(same, same, same):
            assert region.kind == "stable"

    def test_ours_only(self) -> None:
        base = ["a\n", "b\n", "c\n"]
        ours = ["a\n", "B\n", "c\n"]
        found = [region.kind for region in compute_regions(base, ours, base)]
        assert "ours_only" in found
        assert "conflict" not in found

    def test_theirs_only(self) -> None:
        base = ["a\n", "b\n", "c\n"]
        theirs = ["a\n", "T\n", "c\n"]
        found = [region.kind for region in compute_regions(base, base, theirs)]
        assert "theirs_only" in found
        assert "conflict" not in found

    def test_both_same(self) -> None:
        base = ["a\n", "b\n"]
        edited = ["a\n", "X\n"]
        found = [region.kind for region in compute_regions(base, edited, edited)]
        assert "both_same" in found
        assert "conflict" not in found

    def test_conflict(self) -> None:
        base = ["a\n", "b\n"]
        ours = ["a\n", "O\n"]
        theirs = ["a\n", "T\n"]
        found = [region.kind for region in compute_regions(base, ours, theirs)]
        assert "conflict" in found

    def test_empty_sequences(self) -> None:
        assert compute_regions([], [], []) == []

    def test_region_lines_are_lists(self) -> None:
        base = ["a\n", "b\n"]
        ours = ["a\n", "X\n"]
        for region in compute_regions(base, ours, base):
            for side in (region.base_lines, region.ours_lines, region.theirs_lines):
                assert isinstance(side, list)

    def test_stable_region_all_three_equal(self) -> None:
        same = ["x\n", "y\n"]
        for region in compute_regions(same, same, same):
            assert region.base_lines == region.ours_lines == region.theirs_lines

    def test_pure_insertion_ours(self) -> None:
        base = ["a\n", "c\n"]
        ours = ["a\n", "b\n", "c\n"]  # inserted b
        found = [region.kind for region in compute_regions(base, ours, base)]
        assert "ours_only" in found or "stable" in found
        assert "conflict" not in found

    def test_pure_insertion_conflict(self) -> None:
        # Both sides insert a different line at the same position.
        base = ["a\n", "c\n"]
        ours = ["a\n", "B\n", "c\n"]
        theirs = ["a\n", "T\n", "c\n"]
        found = [region.kind for region in compute_regions(base, ours, theirs)]
        assert "conflict" in found
228
229
230 # ──────────────────────────────────────────────────────────────────────────────
231 # TestThreeWayMergeLines — clean merges
232 # ──────────────────────────────────────────────────────────────────────────────
233
class TestThreeWayMergeLines:
    """Unit tests for three_way_merge_lines(): clean merges first, then conflicts."""

    @staticmethod
    def _merged_a_o_t() -> list[str]:
        """Merge the minimal one-line conflict (base 'a', ours 'O', theirs 'T')."""
        merged, _ = three_way_merge_lines(["a\n"], ["O\n"], ["T\n"])
        return merged

    # ── Clean cases ──────────────────────────────────────────────────────────

    def test_no_changes_returns_base(self) -> None:
        original = ["a\n", "b\n", "c\n"]
        result, had_conflict = three_way_merge_lines(original, original, original)
        assert result == original
        assert not had_conflict

    def test_ours_only_change_applied(self) -> None:
        base = ["a\n", "b\n", "c\n"]
        ours = ["a\n", "B\n", "c\n"]
        result, had_conflict = three_way_merge_lines(base, ours, base)
        assert result == ours
        assert not had_conflict

    def test_theirs_only_change_applied(self) -> None:
        base = ["a\n", "b\n", "c\n"]
        theirs = ["a\n", "T\n", "c\n"]
        result, had_conflict = three_way_merge_lines(base, base, theirs)
        assert result == theirs
        assert not had_conflict

    def test_both_same_change_applied_once(self) -> None:
        base = ["a\n", "b\n"]
        edited = ["a\n", "X\n"]
        result, had_conflict = three_way_merge_lines(base, edited, edited)
        assert result == edited
        assert not had_conflict

    def test_non_overlapping_changes_both_applied(self) -> None:
        base = ["a\n", "b\n", "c\n", "d\n"]
        ours = ["a\n", "B\n", "c\n", "d\n"]  # b→B
        theirs = ["a\n", "b\n", "c\n", "D\n"]  # d→D
        result, had_conflict = three_way_merge_lines(base, ours, theirs)
        for expected in ("B\n", "D\n"):
            assert expected in result
        assert not had_conflict

    def test_ours_inserts_line(self) -> None:
        base = ["a\n", "c\n"]
        ours = ["a\n", "b\n", "c\n"]
        result, had_conflict = three_way_merge_lines(base, ours, base)
        assert result == ours
        assert not had_conflict

    def test_theirs_deletes_line(self) -> None:
        base = ["a\n", "b\n", "c\n"]
        theirs = ["a\n", "c\n"]
        result, had_conflict = three_way_merge_lines(base, base, theirs)
        assert result == theirs
        assert not had_conflict

    def test_empty_base_both_add(self) -> None:
        # Both add same content to empty base → clean
        new_lines = ["line1\n", "line2\n"]
        result, had_conflict = three_way_merge_lines([], new_lines, new_lines)
        assert result == new_lines
        assert not had_conflict

    # ── Conflict cases ───────────────────────────────────────────────────────

    def test_conflict_detected(self) -> None:
        base = ["a\n", "b\n"]
        ours = ["a\n", "O\n"]
        theirs = ["a\n", "T\n"]
        _result, had_conflict = three_way_merge_lines(base, ours, theirs)
        assert had_conflict

    def test_conflict_has_ours_marker(self) -> None:
        assert _has_marker(self._merged_a_o_t(), "<<<<<<<")

    def test_conflict_has_base_marker(self) -> None:
        assert _has_marker(self._merged_a_o_t(), "|||||||")

    def test_conflict_has_sep_marker(self) -> None:
        assert _has_marker(self._merged_a_o_t(), "=======")

    def test_conflict_has_end_marker(self) -> None:
        assert _has_marker(self._merged_a_o_t(), ">>>>>>> end conflict")

    def test_conflict_ours_content_present(self) -> None:
        result, _ = three_way_merge_lines(["x\n"], ["ours_line\n"], ["theirs_line\n"])
        assert any("ours_line" in ln for ln in result)

    def test_conflict_theirs_content_present(self) -> None:
        result, _ = three_way_merge_lines(["x\n"], ["ours_line\n"], ["theirs_line\n"])
        assert any("theirs_line" in ln for ln in result)

    def test_conflict_base_content_in_diff3_section(self) -> None:
        result, _ = three_way_merge_lines(
            ["base_line\n"], ["ours_line\n"], ["theirs_line\n"]
        )
        assert any("base_line" in ln for ln in result)

    def test_stable_lines_outside_conflict_preserved(self) -> None:
        base = ["preamble\n", "conflict_zone\n", "epilogue\n"]
        ours = ["preamble\n", "ours_content\n", "epilogue\n"]
        theirs = ["preamble\n", "theirs_content\n", "epilogue\n"]
        result, had_conflict = three_way_merge_lines(base, ours, theirs)
        assert had_conflict
        for stable_text in ("preamble", "epilogue"):
            assert any(stable_text in ln for ln in result)

    def test_multiple_independent_conflicts(self) -> None:
        base = ["a\n", "b\n", "c\n", "d\n", "e\n"]
        ours = ["a\n", "O1\n", "c\n", "O2\n", "e\n"]
        theirs = ["a\n", "T1\n", "c\n", "T2\n", "e\n"]
        result, had_conflict = three_way_merge_lines(base, ours, theirs)
        assert had_conflict
        openers = [ln for ln in result if ln.startswith("<<<<<<< ")]
        assert len(openers) == 2

    def test_both_empty_insertion_at_same_point_conflicts(self) -> None:
        base = ["a\n", "c\n"]
        ours = ["a\n", "B\n", "c\n"]
        theirs = ["a\n", "T\n", "c\n"]
        _result, had_conflict = three_way_merge_lines(base, ours, theirs)
        assert had_conflict

    def test_custom_labels_appear_in_markers(self) -> None:
        result, _ = three_way_merge_lines(
            ["x\n"],
            ["O\n"],
            ["T\n"],
            label_ours="feature/login",
            label_base="merge-base",
            label_theirs="main",
        )
        for label in ("feature/login", "main", "merge-base"):
            assert any(label in ln for ln in result)
390
391
392 # ──────────────────────────────────────────────────────────────────────────────
393 # TestConflictMarkerFormat — exact marker syntax / Cohen action labels
394 # ──────────────────────────────────────────────────────────────────────────────
395
class TestConflictMarkerFormat:
    """Unit tests for the conflict marker lines and their action labels."""

    def _conflict_merged(self, base_text: str, ours_text: str, theirs_text: str) -> list[str]:
        """Merge three text blobs (split with line endings kept) and return the merged lines."""
        base, ours, theirs = (
            blob.splitlines(keepends=True)
            for blob in (base_text, ours_text, theirs_text)
        )
        merged_lines, _had_conflict = three_way_merge_lines(base, ours, theirs)
        return merged_lines

    def test_deleted_action_on_ours_marker(self) -> None:
        # ours removed the line entirely while theirs rewrote it.
        output = self._conflict_merged("func()\n", "", "other()\n")
        assert any("<<<<<<< ours [deleted]" in ln for ln in output)

    def test_inserted_action_on_theirs_marker(self) -> None:
        output = self._conflict_merged("", "ours_line\n", "theirs_line\n")
        assert any("[inserted]" in ln for ln in output)

    def test_modified_action_on_modified_conflict(self) -> None:
        output = self._conflict_merged("original\n", "ours_ver\n", "theirs_ver\n")
        assert any("[modified]" in ln for ln in output)

    def test_base_section_marker_present(self) -> None:
        output = self._conflict_merged("base\n", "ours\n", "theirs\n")
        assert any("|||||||" in ln for ln in output)

    def test_end_conflict_marker_present(self) -> None:
        output = self._conflict_merged("base\n", "ours\n", "theirs\n")
        assert any(">>>>>>> end conflict" in ln for ln in output)

    def test_marker_ordering(self) -> None:
        """<<<<<<< must precede ||||||| must precede ======= must precede >>>>>>>."""
        output = self._conflict_merged("base\n", "ours\n", "theirs\n")
        # Checked in this order; the first token found on a line wins.
        tokens = (
            ("<<<<<<<", "start"),
            ("|||||||", "base"),
            ("=======", "sep"),
            (">>>>>>>", "end"),
        )
        seen_at = {}
        for index, line in enumerate(output):
            for token, slot in tokens:
                if token in line:
                    seen_at[slot] = index
                    break
        assert seen_at["start"] < seen_at["base"] < seen_at["sep"] < seen_at["end"]
439
440
441 # ──────────────────────────────────────────────────────────────────────────────
442 # TestFormatConflictDiff
443 # ──────────────────────────────────────────────────────────────────────────────
444
class TestFormatConflictDiff:
    """Tests for format_conflict_diff() — the muse diff --conflict renderer."""

    def _make_manifests(
        self,
    ) -> tuple[
        dict[str, str],
        dict[str, str],
        dict[str, str],
        Callable[[pathlib.Path, str], bytes | None],
    ]:
        """Build base/ours/theirs manifests for one path plus an in-memory reader.

        Returns:
            ``(base_manifest, ours_manifest, theirs_manifest, read_fn)``.  Each
            manifest maps ``"src/util.py"`` to a distinct 64-character object
            id.  ``read_fn(root, oid)`` ignores *root* and returns the bytes
            registered for *oid*, or ``None`` for an unknown id.
        """
        base_content = b"base line\n"
        ours_content = b"ours line\n"
        theirs_content = b"theirs line\n"
        path = "src/util.py"

        base_oid = "aaaa" * 16
        ours_oid = "bbbb" * 16
        theirs_oid = "cccc" * 16

        base_manifest = {path: base_oid}
        ours_manifest = {path: ours_oid}
        theirs_manifest = {path: theirs_oid}

        def _read(root: pathlib.Path, oid: str) -> bytes | None:
            return {base_oid: base_content, ours_oid: ours_content, theirs_oid: theirs_content}.get(oid)

        return base_manifest, ours_manifest, theirs_manifest, _read

    def test_output_contains_conflict_header(self, tmp_path: pathlib.Path) -> None:
        base_m, ours_m, theirs_m, read_fn = self._make_manifests()
        lines = format_conflict_diff(
            "src/util.py", tmp_path, base_m, ours_m, theirs_m, read_fn,
        )
        assert any("CONFLICT" in ln for ln in lines)
        assert any("src/util.py" in ln for ln in lines)

    def test_output_contains_ours_section(self, tmp_path: pathlib.Path) -> None:
        base_m, ours_m, theirs_m, read_fn = self._make_manifests()
        lines = format_conflict_diff(
            "src/util.py", tmp_path, base_m, ours_m, theirs_m, read_fn,
        )
        assert any("[ours]" in ln for ln in lines)

    def test_output_contains_theirs_section(self, tmp_path: pathlib.Path) -> None:
        base_m, ours_m, theirs_m, read_fn = self._make_manifests()
        lines = format_conflict_diff(
            "src/util.py", tmp_path, base_m, ours_m, theirs_m, read_fn,
        )
        assert any("[theirs]" in ln for ln in lines)

    def test_custom_labels_appear(self, tmp_path: pathlib.Path) -> None:
        base_m, ours_m, theirs_m, read_fn = self._make_manifests()
        lines = format_conflict_diff(
            "src/util.py", tmp_path, base_m, ours_m, theirs_m, read_fn,
            ours_label="feature/auth",
            theirs_label="main",
        )
        assert any("feature/auth" in ln for ln in lines)
        assert any("main" in ln for ln in lines)

    def test_returns_list_of_strings(self, tmp_path: pathlib.Path) -> None:
        base_m, ours_m, theirs_m, read_fn = self._make_manifests()
        result = format_conflict_diff(
            "src/util.py", tmp_path, base_m, ours_m, theirs_m, read_fn,
        )
        assert isinstance(result, list)
        assert all(isinstance(ln, str) for ln in result)

    def test_ansi_sanitized_in_path(self, tmp_path: pathlib.Path) -> None:
        """ANSI escape sequences in the path must not appear in output."""
        malicious_path = "\x1b[31mmalicious\x1b[0m/file.py"
        # Only the reader is needed: the manifests passed below are empty.
        _, _, _, read_fn = self._make_manifests()
        # Use the malicious path (will have no manifest entry → empty diffs)
        lines = format_conflict_diff(
            malicious_path, tmp_path, {}, {}, {}, read_fn,
        )
        output = "\n".join(lines)
        assert "\x1b[31m" not in output

    def test_missing_file_shows_no_changes_message(self, tmp_path: pathlib.Path) -> None:
        def _read(root: pathlib.Path, oid: str) -> bytes | None:
            return None

        lines = format_conflict_diff(
            "missing.py", tmp_path, {}, {}, {}, _read,
        )
        assert any("no changes" in ln.lower() for ln in lines)

    def test_no_color_mode(self, tmp_path: pathlib.Path) -> None:
        base_m, ours_m, theirs_m, read_fn = self._make_manifests()
        lines = format_conflict_diff(
            "src/util.py", tmp_path, base_m, ours_m, theirs_m, read_fn,
            use_color=False,
        )
        output = "\n".join(lines)
        # With colour disabled no escape sequence of any kind may be emitted.
        assert "\x1b[" not in output
535
536
537 # ──────────────────────────────────────────────────────────────────────────────
538 # TestEdgeCases
539 # ──────────────────────────────────────────────────────────────────────────────
540
class TestEdgeCases:
    """Boundary inputs for three_way_merge_lines(): empty sides, unicode, long lines."""

    def test_all_empty(self) -> None:
        result, had_conflict = three_way_merge_lines([], [], [])
        assert result == []
        assert not had_conflict

    def test_base_empty_ours_adds(self) -> None:
        result, had_conflict = three_way_merge_lines([], ["new\n"], [])
        assert result == ["new\n"]
        assert not had_conflict

    def test_base_empty_theirs_adds(self) -> None:
        result, had_conflict = three_way_merge_lines([], [], ["new\n"])
        assert result == ["new\n"]
        assert not had_conflict

    def test_base_empty_both_add_same(self) -> None:
        result, had_conflict = three_way_merge_lines([], ["same\n"], ["same\n"])
        assert result == ["same\n"]
        assert not had_conflict

    def test_base_empty_both_add_different_conflicts(self) -> None:
        outcome = three_way_merge_lines([], ["A\n"], ["B\n"])
        had_conflict = outcome[1]
        assert had_conflict

    def test_ours_equals_base_theirs_deletes(self) -> None:
        only_line = ["x\n"]
        result, had_conflict = three_way_merge_lines(only_line, only_line, [])
        assert result == []
        assert not had_conflict

    def test_both_delete_everything(self) -> None:
        result, had_conflict = three_way_merge_lines(["x\n", "y\n"], [], [])
        assert result == []
        assert not had_conflict

    def test_unicode_content(self) -> None:
        base = ["héllo\n", "wörld\n"]
        ours = ["héllo\n", "WÖRLD\n"]
        theirs = ["héllo\n", "wörld\n"]
        result, had_conflict = three_way_merge_lines(base, ours, theirs)
        assert not had_conflict
        assert "WÖRLD\n" in result

    def test_long_lines_no_crash(self) -> None:
        width = 10_000
        untouched = ["x" * width + "\n"]
        rewritten = ["y" * width + "\n"]
        _result, had_conflict = three_way_merge_lines(untouched, rewritten, untouched)
        assert not had_conflict

    def test_many_identical_lines(self) -> None:
        repeated = ["same\n"] * 1000
        result, had_conflict = three_way_merge_lines(repeated, repeated, repeated)
        assert result == repeated
        assert not had_conflict

    def test_lines_without_trailing_newline(self) -> None:
        # Not all editors guarantee trailing newlines.
        base = ["no newline"]
        ours = ["changed"]
        result, had_conflict = three_way_merge_lines(base, ours, base)
        assert not had_conflict
        first_line = result[0]
        assert "changed" in first_line
607
608
609 # ──────────────────────────────────────────────────────────────────────────────
610 # TestIntegration — realistic multi-hunk merges
611 # ──────────────────────────────────────────────────────────────────────────────
612
class TestIntegration:
    """Realistic multi-hunk merge scenarios matching real developer workflows."""

    def test_ours_adds_docstring_theirs_renames_param(self) -> None:
        signature = "def calculate(x):\n"
        body = "    return x * 2\n"
        base = [signature, body]
        ours = [signature, '    """Double the input."""\n', body]
        theirs = [
            "def calculate(value):\n",
            "    return value * 2\n",
        ]
        result, had_conflict = three_way_merge_lines(base, ours, theirs)
        # ours inserted a docstring; theirs renamed x→value.
        # Whether these overlap depends on how the changes are grouped, so
        # either a clean merge or a conflict is acceptable here.
        # Just assert no crash and well-formed output.
        assert isinstance(result, list)
        assert isinstance(had_conflict, bool)

    def test_both_fix_same_typo(self) -> None:
        misspelled = ["# Calcualtion module\n"]
        corrected_ours = ["# Calculation module\n"]
        corrected_theirs = ["# Calculation module\n"]
        result, had_conflict = three_way_merge_lines(
            misspelled, corrected_ours, corrected_theirs
        )
        assert not had_conflict
        assert result == corrected_ours

    def test_ours_deletes_block_theirs_modifies_different_block(self) -> None:
        block_a = ["block_a_line_1\n", "block_a_line_2\n"]
        separator = ["separator\n"]
        block_b = ["block_b_line_1\n", "block_b_line_2\n"]

        base = block_a + separator + block_b
        ours = separator + block_b  # block A removed
        theirs = block_a + separator + [
            "block_b_LINE_1\n",  # modified
            "block_b_line_2\n",
        ]
        result, had_conflict = three_way_merge_lines(base, ours, theirs)
        # No overlap between ours' deletion (top) and theirs' modification (bottom)
        assert not had_conflict
        assert "block_b_LINE_1\n" in result
        assert "block_a_line_1\n" not in result

    def test_conflict_preserves_stable_context_above_and_below(self) -> None:
        base = ["header\n", "conflict_zone\n", "footer\n"]
        ours = ["header\n", "ours_version\n", "footer\n"]
        theirs = ["header\n", "theirs_version\n", "footer\n"]
        result, had_conflict = three_way_merge_lines(base, ours, theirs)
        assert had_conflict
        rendered = "".join(result)
        for stable_line in ("header\n", "footer\n"):
            assert stable_line in rendered

    def test_append_only_change_ours(self) -> None:
        base = ["line1\n", "line2\n"]
        ours = ["line1\n", "line2\n", "appended\n"]
        result, had_conflict = three_way_merge_lines(base, ours, base)
        assert not had_conflict
        assert result == ours

    def test_prepend_only_change_theirs(self) -> None:
        base = ["line1\n"]
        theirs = ["prepended\n", "line1\n"]
        result, had_conflict = three_way_merge_lines(base, base, theirs)
        assert not had_conflict
        assert result == theirs

    def test_roundtrip_clean_merge_deterministic(self) -> None:
        base = ["a\n", "b\n", "c\n"]
        ours = ["a\n", "B\n", "c\n"]
        theirs = ["a\n", "b\n", "C\n"]
        first_lines, first_flag = three_way_merge_lines(base, ours, theirs)
        second_lines, second_flag = three_way_merge_lines(base, ours, theirs)
        assert first_lines == second_lines
        assert first_flag == second_flag
705
706
707 # ──────────────────────────────────────────────────────────────────────────────
708 # TestStress
709 # ──────────────────────────────────────────────────────────────────────────────
710
class TestStress:
    """Larger inputs and concurrent calls for three_way_merge_lines()."""

    def test_large_file_clean_merge_fast(self) -> None:
        """500-line file with non-overlapping changes in each half: < 2 s."""
        base = [f"line_{i:04d}\n" for i in range(500)]
        # One edit near the top on our side, one near the bottom on theirs.
        ours = base[:50] + ["OURS_CHANGE\n"] + base[51:]
        theirs = base[:450] + ["THEIRS_CHANGE\n"] + base[451:]

        began = time.monotonic()
        result, had_conflict = three_way_merge_lines(base, ours, theirs)
        elapsed = time.monotonic() - began

        assert not had_conflict
        assert elapsed < 2.0, f"merge took {elapsed:.2f}s"
        assert "OURS_CHANGE\n" in result
        assert "THEIRS_CHANGE\n" in result

    def test_many_conflicts_no_crash(self) -> None:
        """Alternating conflict zones throughout a 200-line file."""
        base = [f"line_{i}\n" for i in range(200)]
        # Every 10th line conflicts.
        ours = [
            f"ours_{i}\n" if i % 10 == 0 else line for i, line in enumerate(base)
        ]
        theirs = [
            f"theirs_{i}\n" if i % 10 == 0 else line for i, line in enumerate(base)
        ]

        result, had_conflict = three_way_merge_lines(base, ours, theirs)
        assert had_conflict
        assert isinstance(result, list)

    def test_concurrent_merges_consistent(self) -> None:
        """Concurrent invocations must return identical results (no shared mutable state)."""
        base = ["a\n", "conflict\n", "b\n"]
        ours = ["a\n", "ours\n", "b\n"]
        theirs = ["a\n", "theirs\n", "b\n"]
        outcomes: list[tuple[list[str], bool]] = []
        failures: list[Exception] = []

        def _worker() -> None:
            try:
                outcome = three_way_merge_lines(base, ours, theirs)
            except Exception as exc:
                failures.append(exc)
            else:
                outcomes.append(outcome)

        workers = [threading.Thread(target=_worker) for _ in range(10)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        assert not failures
        assert len(outcomes) == 10
        reference = outcomes[0]
        assert all(outcome == reference for outcome in outcomes)
766
767
768 # ──────────────────────────────────────────────────────────────────────────────
769 # TestDataIntegrity
770 # ──────────────────────────────────────────────────────────────────────────────
771
class TestDataIntegrity:
    """Determinism, input immutability and marker balance of three_way_merge_lines()."""

    def test_clean_merge_is_deterministic(self) -> None:
        base = ["a\n", "b\n", "c\n"]
        ours = ["a\n", "B\n", "c\n"]
        theirs = ["a\n", "b\n", "C\n"]
        first_lines, first_flag = three_way_merge_lines(base, ours, theirs)
        second_lines, second_flag = three_way_merge_lines(base, ours, theirs)
        assert first_lines == second_lines
        assert first_flag == second_flag

    def test_conflict_output_is_deterministic(self) -> None:
        sides = (["x\n"], ["O\n"], ["T\n"])
        first_lines, _ = three_way_merge_lines(*sides)
        second_lines, _ = three_way_merge_lines(*sides)
        assert first_lines == second_lines

    def test_input_sequences_not_mutated(self) -> None:
        base = ["a\n", "b\n"]
        ours = ["a\n", "O\n"]
        theirs = ["a\n", "T\n"]
        # Snapshot each input before the call, compare afterwards.
        base_before, ours_before, theirs_before = list(base), list(ours), list(theirs)
        three_way_merge_lines(base, ours, theirs)
        assert base == base_before
        assert ours == ours_before
        assert theirs == theirs_before

    def test_merged_content_contains_all_clean_changes(self) -> None:
        # ours changes b→B (pos 1); theirs changes d→D (pos 3).
        # 'a' (pos 0) and 'c' (pos 2) are unchanged on both sides, so the two
        # edits are expected to land in separate regions → clean merge.
        base = ["a\n", "b\n", "c\n", "d\n"]
        ours = ["a\n", "B\n", "c\n", "d\n"]
        theirs = ["a\n", "b\n", "c\n", "D\n"]
        result, had_conflict = three_way_merge_lines(base, ours, theirs)
        assert not had_conflict
        rendered = "".join(result)
        for expected in ("B\n", "D\n", "a\n", "c\n"):
            assert expected in rendered

    def test_conflict_markers_are_balanced(self) -> None:
        """Every <<<<<<< must have a matching >>>>>>>."""
        base = ["x\n", "y\n"]
        ours = ["O1\n", "O2\n"]
        theirs = ["T1\n", "T2\n"]
        result, _ = three_way_merge_lines(base, ours, theirs)
        open_count = len([ln for ln in result if ln.startswith("<<<<<<<")])
        close_count = len([ln for ln in result if ln.startswith(">>>>>>>")])
        assert open_count == close_count
        assert open_count >= 1
827
828
829 # ──────────────────────────────────────────────────────────────────────────────
830 # TestSecurity
831 # ──────────────────────────────────────────────────────────────────────────────
832
class TestSecurity:
    """Hostile input: ANSI escapes, null bytes, oversized labels, newline injection."""

    def test_ansi_in_content_does_not_spoof_markers(self) -> None:
        """ANSI sequences in file content must not produce fake conflict markers."""
        base = ["normal\n"]
        ours = ["\x1b[31m<<<<<<< fake ours\x1b[0m\n"]
        theirs = ["other\n"]
        merged, conflict = three_way_merge_lines(base, ours, theirs)
        assert isinstance(merged, list)
        assert isinstance(conflict, bool)
        # Both sides rewrote the only line differently, so this is a conflict.
        assert conflict
        # The ANSI-containing line is just content: it begins with ESC, not
        # with "<", so the single real conflict must be the only opener.
        real_opens = sum(1 for ln in merged if ln.startswith("<<<<<<<"))
        assert real_opens == 1

    def test_null_bytes_in_content_handled(self) -> None:
        """Null bytes in text content must not crash the merge."""
        base = ["a\x00b\n"]
        ours = ["a\x00B\n"]
        theirs = ["a\x00b\n"]
        merged, conflict = three_way_merge_lines(base, ours, theirs)
        assert not conflict  # only ours changed

    def test_very_long_label_no_overflow(self) -> None:
        long_label = "x" * 2000
        base = ["a\n"]
        ours = ["O\n"]
        theirs = ["T\n"]
        merged, conflict = three_way_merge_lines(
            base, ours, theirs, label_ours=long_label, label_theirs=long_label
        )
        assert conflict
        assert any(long_label in ln for ln in merged)

    def test_label_injection_via_newline(self) -> None:
        """Newline in a label must not inject extra lines into markers."""
        malicious = "branch\n>>>>>>> malicious injection"
        base = ["a\n"]
        ours = ["O\n"]
        theirs = ["T\n"]
        merged, _ = three_way_merge_lines(base, ours, theirs, label_ours=malicious)
        positions_of_end = [i for i, ln in enumerate(merged) if ">>>>>>> end conflict" in ln]
        positions_of_open = [i for i, ln in enumerate(merged) if ln.startswith("<<<<<<<")]
        assert len(positions_of_end) >= 1
        assert len(positions_of_open) >= 1
        # No phantom closing marker may appear as its own line ahead of the
        # real one; the label text must stay inside the opening marker.
        first_real_end = positions_of_end[0]
        assert not any(ln.startswith(">>>>>>>") for ln in merged[:first_real_end])

    def test_format_conflict_diff_ansi_in_path_sanitized(
        self, tmp_path: pathlib.Path
    ) -> None:
        """ANSI codes in the file path argument must not appear in rendered output."""
        malicious_path = "\x1b[31minjected\x1b[0m/file.py"
        lines = format_conflict_diff(
            malicious_path, tmp_path, {}, {}, {},
            lambda root, oid: None,
        )
        output = "\n".join(lines)
        assert "\x1b[31m" not in output
File History 1 commit
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 28 days ago