gabriel / muse public
test_op_merge.py python
704 lines 27.8 KB
Raw
sha256:26e130b630b6a51f141dc4cc495ddfd11a1186ef4407b9530ed1c8ae528cd3ce fix: ops_commute handles AddressedInsertOp/AddressedDeleteO… Sonnet 4.6 patch 3 days ago
1 """Tests for the operation-level merge engine.
2
3 Covers every commutativity rule from the spec table, the sequence position
4 adjustment function, and the full three-way ``merge_op_lists`` algorithm.
5 Each test is named after the specific behaviour it verifies so that a failure
6 message is self-documenting.
7 """
8
9 import pytest
10
11 from muse.core.op_merge import (
12 MergeOpsResult,
13 _adjust_insert_positions,
14 _op_key,
15 adjust_sequence_positions,
16 merge_op_lists,
17 merge_structured,
18 ops_commute,
19 )
20 from muse.domain import (
21 AddressedDeleteOp,
22 AddressedInsertOp,
23 DeleteOp,
24 DomainOp,
25 InsertOp,
26 MoveOp,
27 PatchOp,
28 ReplaceOp,
29 StructuredDelta,
30 )
31
32
33 # ---------------------------------------------------------------------------
34 # Helpers for building typed ops
35 # ---------------------------------------------------------------------------
36
37
38 def _ins(addr: str, pos: int | None, cid: str = "cid-a") -> InsertOp:
39 return InsertOp(op="insert", address=addr, position=pos, content_id=cid, content_summary=cid)
40
41
42 def _del(addr: str, pos: int | None, cid: str = "cid-a") -> DeleteOp:
43 return DeleteOp(op="delete", address=addr, position=pos, content_id=cid, content_summary=cid)
44
45
46 def _mov(addr: str, from_pos: int, to_pos: int, cid: str = "cid-a") -> MoveOp:
47 return MoveOp(op="move", address=addr, from_position=from_pos, to_position=to_pos, content_id=cid)
48
49
50 def _rep(addr: str, old: str, new: str) -> ReplaceOp:
51 return ReplaceOp(
52 op="replace",
53 address=addr,
54 position=None,
55 old_content_id=old,
56 new_content_id=new,
57 old_summary="old",
58 new_summary="new",
59 )
60
61
62 def _patch(addr: str, child_ops: list[DomainOp] | None = None) -> PatchOp:
63 return PatchOp(
64 op="patch",
65 address=addr,
66 child_ops=child_ops or [],
67 child_domain="test",
68 child_summary="test patch",
69 )
70
71
72 def _delta(ops: list[DomainOp], *, domain: str = "midi") -> StructuredDelta:
73 return StructuredDelta(domain=domain, ops=ops, summary="test")
74
75
76 # ===========================================================================
77 # Part 1 — ops_commute: commutativity oracle
78 # ===========================================================================
79
80
81 class TestOpsCommuteInserts:
82 def test_inserts_at_different_positions_commute(self) -> None:
83 a = _ins("f.mid", pos=2)
84 b = _ins("f.mid", pos=5)
85 assert ops_commute(a, b) is True
86
87 def test_inserts_at_same_position_do_not_commute(self) -> None:
88 a = _ins("f.mid", pos=3)
89 b = _ins("f.mid", pos=3)
90 assert ops_commute(a, b) is False
91
92 def test_inserts_with_none_position_commute_unordered(self) -> None:
93 a = _ins("files/", pos=None, cid="aa")
94 b = _ins("files/", pos=None, cid="bb")
95 assert ops_commute(a, b) is True
96
97 def test_inserts_with_none_and_int_position_commute_unordered(self) -> None:
98 # If either side is unordered, treat as commuting.
99 a = _ins("files/", pos=None)
100 b = _ins("files/", pos=3)
101 assert ops_commute(a, b) is True
102
103 def test_inserts_at_different_addresses_commute(self) -> None:
104 a = _ins("a.mid", pos=0)
105 b = _ins("b.mid", pos=0)
106 assert ops_commute(a, b) is True
107
108
109 class TestOpsCommuteDeletes:
110 def test_deletes_at_different_addresses_commute(self) -> None:
111 assert ops_commute(_del("a.mid", 0), _del("b.mid", 0)) is True
112
113 def test_consensus_delete_same_address_commutes(self) -> None:
114 # Both branches deleted the same file — idempotent, not a conflict.
115 a = _del("f.mid", pos=0, cid="same")
116 b = _del("f.mid", pos=0, cid="same")
117 assert ops_commute(a, b) is True
118
119 def test_deletes_at_same_address_different_content_still_commute(self) -> None:
120 # Two deletes always commute — the result is "both deleted something".
121 a = _del("f.mid", pos=1, cid="c1")
122 b = _del("f.mid", pos=2, cid="c2")
123 assert ops_commute(a, b) is True
124
125
126 class TestOpsCommuteReplaces:
127 def test_replaces_at_different_addresses_commute(self) -> None:
128 assert ops_commute(_rep("a.mid", "o", "n"), _rep("b.mid", "o", "n")) is True
129
130 def test_replaces_at_same_address_do_not_commute(self) -> None:
131 assert ops_commute(_rep("f.mid", "old", "v1"), _rep("f.mid", "old", "v2")) is False
132
133
134 class TestOpsCommuteMoves:
135 def test_moves_from_different_positions_commute(self) -> None:
136 assert ops_commute(_mov("f.mid", 2, 5), _mov("f.mid", 7, 1)) is True
137
138 def test_moves_from_same_position_do_not_commute(self) -> None:
139 assert ops_commute(_mov("f.mid", 3, 0), _mov("f.mid", 3, 9)) is False
140
141 def test_move_and_delete_same_position_do_not_commute(self) -> None:
142 move = _mov("f.mid", 5, 9)
143 delete = _del("f.mid", pos=5)
144 assert ops_commute(move, delete) is False
145
146 def test_move_and_delete_different_positions_commute(self) -> None:
147 move = _mov("f.mid", 5, 9)
148 delete = _del("f.mid", pos=2)
149 assert ops_commute(move, delete) is True
150
151 def test_delete_and_move_same_position_is_symmetric(self) -> None:
152 move = _mov("f.mid", 5, 9)
153 delete = _del("f.mid", pos=5)
154 # commute(move, delete) == commute(delete, move)
155 assert ops_commute(delete, move) is False
156
157 def test_delete_with_none_position_and_move_commute(self) -> None:
158 move = _mov("f.mid", 5, 9)
159 delete = _del("files/", pos=None)
160 assert ops_commute(move, delete) is True
161
162
163 class TestOpsCommutePatches:
164 def test_patches_at_different_addresses_commute(self) -> None:
165 a = _patch("a.mid")
166 b = _patch("b.mid")
167 assert ops_commute(a, b) is True
168
169 def test_patch_at_same_address_with_non_conflicting_children_commutes(self) -> None:
170 child_a = _ins("note:0", pos=1)
171 child_b = _ins("note:0", pos=5)
172 a = _patch("f.mid", child_ops=[child_a])
173 b = _patch("f.mid", child_ops=[child_b])
174 assert ops_commute(a, b) is True
175
176 def test_patch_at_same_address_with_conflicting_children_does_not_commute(self) -> None:
177 child_a = _rep("note:0", "old", "v1")
178 child_b = _rep("note:0", "old", "v2")
179 a = _patch("f.mid", child_ops=[child_a])
180 b = _patch("f.mid", child_ops=[child_b])
181 assert ops_commute(a, b) is False
182
183 def test_empty_patch_children_always_commute(self) -> None:
184 a = _patch("f.mid", child_ops=[])
185 b = _patch("f.mid", child_ops=[])
186 assert ops_commute(a, b) is True
187
188
189 class TestOpsCommuteMixedTypes:
190 def test_insert_and_delete_at_different_addresses_commute(self) -> None:
191 assert ops_commute(_ins("a.mid", 0), _del("b.mid", 0)) is True
192
193 def test_insert_and_delete_at_same_address_do_not_commute(self) -> None:
194 assert ops_commute(_ins("f.mid", 2), _del("f.mid", 5)) is False
195
196 def test_delete_and_insert_symmetry(self) -> None:
197 a = _ins("f.mid", 2)
198 b = _del("f.mid", 5)
199 assert ops_commute(a, b) == ops_commute(b, a)
200
201 def test_replace_and_insert_at_different_addresses_commute(self) -> None:
202 assert ops_commute(_rep("a.mid", "o", "n"), _ins("b.mid", 0)) is True
203
204 def test_replace_and_insert_at_same_address_do_not_commute(self) -> None:
205 assert ops_commute(_rep("f.mid", "o", "n"), _ins("f.mid", 0)) is False
206
207 def test_patch_and_replace_at_different_addresses_commute(self) -> None:
208 assert ops_commute(_patch("a.mid"), _rep("b.mid", "o", "n")) is True
209
210 def test_patch_and_replace_at_same_address_do_not_commute(self) -> None:
211 assert ops_commute(_patch("f.mid"), _rep("f.mid", "o", "n")) is False
212
213
214 # ===========================================================================
215 # Part 2 — transform: position adjustment for commuting ops
216 # ===========================================================================
217
218
219 class TestAdjustSequencePositions:
220 def test_insert_before_insert_shifts_later_op(self) -> None:
221 # a inserts at pos 2, b inserts at pos 5. a < b, so b' = 6.
222 a = _ins("f.mid", pos=2, cid="a")
223 b = _ins("f.mid", pos=5, cid="b")
224 a_prime, b_prime = adjust_sequence_positions(a, b)
225 assert a_prime["position"] == 2 # unchanged
226 assert b_prime["position"] == 6 # shifted by a
227
228 def test_insert_after_insert_shifts_earlier_op(self) -> None:
229 # a inserts at pos 7, b inserts at pos 3. a > b, so a' = 8.
230 a = _ins("f.mid", pos=7, cid="a")
231 b = _ins("f.mid", pos=3, cid="b")
232 a_prime, b_prime = adjust_sequence_positions(a, b)
233 assert a_prime["position"] == 8 # shifted by b
234 assert b_prime["position"] == 3 # unchanged
235
236 def test_adjust_preserves_content_id(self) -> None:
237 a = _ins("f.mid", pos=1, cid="note-a")
238 b = _ins("f.mid", pos=10, cid="note-b")
239 a_prime, b_prime = adjust_sequence_positions(a, b)
240 assert a_prime["content_id"] == "note-a"
241 assert b_prime["content_id"] == "note-b"
242
243 def test_unordered_inserts_identity(self) -> None:
244 # position=None → identity (unordered collection, no adjustment needed).
245 a = _ins("files/", pos=None, cid="a")
246 b = _ins("files/", pos=None, cid="b")
247 a_prime, b_prime = adjust_sequence_positions(a, b)
248 assert a_prime is a
249 assert b_prime is b
250
251 def test_non_insert_ops_identity(self) -> None:
252 # For all non-sequence commuting pairs, returns identity.
253 a = _del("a.mid", pos=3)
254 b = _del("b.mid", pos=7)
255 a_prime, b_prime = adjust_sequence_positions(a, b)
256 assert a_prime is a
257 assert b_prime is b
258
259 def test_replace_ops_identity(self) -> None:
260 a = _rep("a.mid", "o", "n")
261 b = _rep("b.mid", "o", "n")
262 a_prime, b_prime = adjust_sequence_positions(a, b)
263 assert a_prime is a
264 assert b_prime is b
265
266 def test_diamond_property_two_inserts(self) -> None:
267 """Verify that a ∘ b' == b ∘ a' — the diamond convergence property.
268
269 We simulate applying inserts to a sequence and check the final order
270 matches regardless of which is applied first.
271 """
272 # Start with base list indices [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
273 # a = insert 'X' at position 3; b = insert 'Y' at position 7
274 a = _ins("seq", pos=3, cid="X")
275 b = _ins("seq", pos=7, cid="Y")
276 a_prime, b_prime = adjust_sequence_positions(a, b)
277
278 # Apply a then b': X at 3, Y at 8 → [0,1,2,X,3,4,5,6,7,Y,8,9]
279 seq = list(range(10))
280 a_pos = a["position"]
281 b_prime_pos = b_prime["position"]
282 assert a_pos is not None and b_prime_pos is not None
283 seq.insert(a_pos, "X")
284 seq.insert(b_prime_pos, "Y")
285 path_ab = seq[:]
286
287 # Apply b then a': Y at 7, X at 3 → [0,1,2,X,3,4,5,6,Y,7,8,9]
288 seq2 = list(range(10))
289 b_pos = b["position"]
290 a_prime_pos = a_prime["position"]
291 assert b_pos is not None and a_prime_pos is not None
292 seq2.insert(b_pos, "Y")
293 seq2.insert(a_prime_pos, "X")
294 path_ba = seq2[:]
295
296 assert path_ab == path_ba
297
298
299 # ===========================================================================
300 # Part 3 — _adjust_insert_positions (counting formula)
301 # ===========================================================================
302
303
304 class TestAdjustInsertPositions:
305 def test_no_other_ops_identity(self) -> None:
306 ops = [_ins("f.mid", pos=5, cid="a")]
307 result = _adjust_insert_positions(ops, [])
308 assert result[0]["position"] == 5
309
310 def test_single_other_before_shifts_position(self) -> None:
311 ops = [_ins("f.mid", pos=5, cid="a")]
312 others = [_ins("f.mid", pos=3, cid="x")]
313 result = _adjust_insert_positions(ops, others)
314 assert result[0]["position"] == 6 # shifted by 1
315
316 def test_other_after_does_not_shift(self) -> None:
317 ops = [_ins("f.mid", pos=3, cid="a")]
318 others = [_ins("f.mid", pos=5, cid="x")]
319 result = _adjust_insert_positions(ops, others)
320 assert result[0]["position"] == 3 # unchanged
321
322 def test_multiple_others_all_before_shifts_by_count(self) -> None:
323 ops = [_ins("f.mid", pos=10, cid="a")]
324 others = [_ins("f.mid", pos=2, cid="x"), _ins("f.mid", pos=7, cid="y")]
325 result = _adjust_insert_positions(ops, others)
326 assert result[0]["position"] == 12 # shifted by 2
327
328 def test_mixed_addresses_does_not_cross_contaminate(self) -> None:
329 ops = [_ins("a.mid", pos=3, cid="a")]
330 others = [_ins("b.mid", pos=1, cid="x")] # different address
331 result = _adjust_insert_positions(ops, others)
332 assert result[0]["position"] == 3 # not shifted
333
334 def test_non_insert_ops_pass_through_unchanged(self) -> None:
335 ops: list[DomainOp] = [_del("f.mid", pos=3, cid="x")]
336 result = _adjust_insert_positions(ops, [_ins("f.mid", pos=1, cid="y")])
337 assert result[0] is ops[0]
338
339 def test_unordered_insert_passes_through(self) -> None:
340 ops = [_ins("files/", pos=None, cid="a")]
341 others = [_ins("files/", pos=None, cid="x")]
342 result = _adjust_insert_positions(ops, others)
343 assert result[0]["position"] is None
344
345 def test_concrete_example_four_note_insertions(self) -> None:
346 """Verify counting formula on the four-note example from the spec."""
347 ours = [_ins("f.mid", pos=5, cid="V"), _ins("f.mid", pos=10, cid="W")]
348 theirs = [_ins("f.mid", pos=3, cid="X"), _ins("f.mid", pos=8, cid="Y")]
349
350 ours_adj = _adjust_insert_positions(ours, theirs)
351 theirs_adj = _adjust_insert_positions(theirs, ours)
352
353 # V(5) shifted by X(3) which is ≤ 5: V → 6
354 assert ours_adj[0]["position"] == 6
355 # W(10) shifted by X(3) and Y(8) both ≤ 10: W → 12
356 assert ours_adj[1]["position"] == 12
357 # X(3) no ours inserts ≤ 3: stays 3
358 assert theirs_adj[0]["position"] == 3
359 # Y(8) shifted by V(5) ≤ 8: Y → 9
360 assert theirs_adj[1]["position"] == 9
361
362
363 # ===========================================================================
364 # Part 4 — merge_op_lists: three-way merge
365 # ===========================================================================
366
367
368 class TestMergeOpLists:
369 def test_empty_inputs_return_empty_result(self) -> None:
370 result = merge_op_lists([], [], [])
371 assert result.merged_ops == []
372 assert result.conflict_ops == []
373 assert result.is_clean is True
374
375 def test_ours_only_additions_pass_through(self) -> None:
376 op = _ins("f.mid", pos=2, cid="x")
377 result = merge_op_lists([], [op], [])
378 assert len(result.merged_ops) == 1
379 assert result.conflict_ops == []
380
381 def test_theirs_only_additions_pass_through(self) -> None:
382 op = _del("f.mid", pos=0)
383 result = merge_op_lists([], [], [op])
384 assert len(result.merged_ops) == 1
385 assert result.conflict_ops == []
386
387 def test_non_conflicting_inserts_both_included(self) -> None:
388 ours_op = _ins("f.mid", pos=2, cid="V")
389 theirs_op = _ins("f.mid", pos=5, cid="W")
390 result = merge_op_lists([], [ours_op], [theirs_op])
391 assert result.is_clean is True
392 positions = {op["position"] for op in result.merged_ops if op["op"] == "insert"}
393 # Ours at 2 stays 2 (no theirs ≤ 2); theirs at 5 → 6 (ours at 2 ≤ 5).
394 assert 2 in positions
395 assert 6 in positions
396
397 def test_same_position_insert_produces_conflict(self) -> None:
398 ours_op = _ins("f.mid", pos=3, cid="A")
399 theirs_op = _ins("f.mid", pos=3, cid="B")
400 result = merge_op_lists([], [ours_op], [theirs_op])
401 assert not result.is_clean
402 assert len(result.conflict_ops) == 1
403 assert result.conflict_ops[0][0]["content_id"] == "A"
404 assert result.conflict_ops[0][1]["content_id"] == "B"
405
406 def test_consensus_addition_included_once(self) -> None:
407 op = _ins("f.mid", pos=4, cid="shared")
408 result = merge_op_lists([], [op], [op])
409 # Consensus: both added the same op → include exactly once.
410 assert len(result.merged_ops) == 1
411 assert result.conflict_ops == []
412
413 def test_base_ops_kept_by_both_sides_included(self) -> None:
414 base_op = _ins("f.mid", pos=0, cid="base")
415 # Both sides still have the base op.
416 result = merge_op_lists([base_op], [base_op], [base_op])
417 assert base_op in result.merged_ops
418
419 def test_base_op_deleted_by_ours_not_in_merged(self) -> None:
420 base_op = _ins("f.mid", pos=0, cid="base")
421 # Ours removed it, theirs kept it.
422 result = merge_op_lists([base_op], [], [base_op])
423 # The base op is NOT in kept (ours removed it) and NOT in ours_new
424 # (it was in base). It remains in theirs, so theirs "kept" it.
425 # Only ops in base AND in both branches end up in kept.
426 assert base_op not in result.merged_ops
427
428 def test_replace_conflict_at_same_address(self) -> None:
429 ours_op = _rep("f.mid", "old", "v-ours")
430 theirs_op = _rep("f.mid", "old", "v-theirs")
431 result = merge_op_lists([], [ours_op], [theirs_op])
432 assert not result.is_clean
433 assert len(result.conflict_ops) == 1
434
435 def test_replace_at_different_addresses_no_conflict(self) -> None:
436 ours_op = _rep("a.mid", "old", "new-a")
437 theirs_op = _rep("b.mid", "old", "new-b")
438 result = merge_op_lists([], [ours_op], [theirs_op])
439 assert result.is_clean
440 assert len(result.merged_ops) == 2
441
442 def test_consensus_delete_included_once(self) -> None:
443 del_op = _del("f.mid", pos=2, cid="gone")
444 result = merge_op_lists([], [del_op], [del_op])
445 assert len(result.merged_ops) == 1
446
447 def test_note_level_multi_insert_positions_adjusted_correctly(self) -> None:
448 """Simulate two musicians adding notes at non-overlapping bars."""
449 ours_ops: list[DomainOp] = [
450 _ins("lead.mid", pos=5, cid="note-A"),
451 _ins("lead.mid", pos=10, cid="note-B"),
452 ]
453 theirs_ops: list[DomainOp] = [
454 _ins("lead.mid", pos=3, cid="note-X"),
455 _ins("lead.mid", pos=8, cid="note-Y"),
456 ]
457 result = merge_op_lists([], ours_ops, theirs_ops)
458 assert result.is_clean is True
459 assert len(result.merged_ops) == 4
460
461 # Expected positions after adjustment (counting formula):
462 # note-A(5) → 5 + count(theirs ≤ 5) = 5 + 1[X(3)] = 6
463 # note-B(10) → 10 + 2[X(3),Y(8)] = 12
464 # note-X(3) → 3 + 0 = 3
465 # note-Y(8) → 8 + 1[A(5)] = 9
466 pos_by_cid = {
467 op["content_id"]: op["position"]
468 for op in result.merged_ops
469 if op["op"] == "insert"
470 }
471 assert pos_by_cid["note-A"] == 6
472 assert pos_by_cid["note-B"] == 12
473 assert pos_by_cid["note-X"] == 3
474 assert pos_by_cid["note-Y"] == 9
475
476 def test_mixed_conflict_and_clean_ops(self) -> None:
477 """A conflict on one file should not contaminate clean ops on others."""
478 conflict_ours = _rep("shared.mid", "old", "v-ours")
479 conflict_theirs = _rep("shared.mid", "old", "v-theirs")
480 clean_ours = _ins("only-ours.mid", pos=0, cid="ours-new-file")
481 clean_theirs = _del("only-theirs.mid", pos=2, cid="their-del")
482
483 result = merge_op_lists(
484 [],
485 [conflict_ours, clean_ours],
486 [conflict_theirs, clean_theirs],
487 )
488 assert len(result.conflict_ops) == 1
489 # Clean ops from both sides should appear in merged.
490 merged_cids = {
491 op.get("content_id", "") or op.get("new_content_id", "")
492 for op in result.merged_ops
493 }
494 assert "ours-new-file" in merged_cids
495 assert "their-del" in merged_cids
496
497 def test_patch_ops_at_different_files_both_included(self) -> None:
498 ours_op = _patch("track-a.mid")
499 theirs_op = _patch("track-b.mid")
500 result = merge_op_lists([], [ours_op], [theirs_op])
501 assert result.is_clean is True
502 assert len(result.merged_ops) == 2
503
504 def test_patch_ops_at_same_file_with_non_conflicting_children(self) -> None:
505 child_a = _ins("note:0", pos=1, cid="note-1")
506 child_b = _ins("note:0", pos=4, cid="note-2")
507 ours_op = _patch("f.mid", child_ops=[child_a])
508 theirs_op = _patch("f.mid", child_ops=[child_b])
509 result = merge_op_lists([], [ours_op], [theirs_op])
510 # PatchOps at same address with commuting children should commute.
511 assert result.is_clean is True
512
513 def test_move_and_delete_conflict_detected(self) -> None:
514 move_op = _mov("f.mid", from_pos=5, to_pos=0)
515 del_op = _del("f.mid", pos=5)
516 result = merge_op_lists([], [move_op], [del_op])
517 assert not result.is_clean
518
519 def test_merge_op_lists_is_deterministic(self) -> None:
520 """Same inputs → same output on every call."""
521 ours = [_ins("f.mid", pos=2, cid="a"), _del("g.mid", pos=0, cid="b")]
522 theirs = [_ins("f.mid", pos=7, cid="c"), _rep("h.mid", "x", "y")]
523 r1 = merge_op_lists([], ours, theirs)
524 r2 = merge_op_lists([], ours, theirs)
525 assert [_op_key(o) for o in r1.merged_ops] == [_op_key(o) for o in r2.merged_ops]
526 assert r1.conflict_ops == r2.conflict_ops
527
528
529 # ===========================================================================
530 # Part 5 — merge_structured: StructuredDelta entry point
531 # ===========================================================================
532
533
534 class TestMergeStructured:
535 def test_empty_deltas_produce_clean_result(self) -> None:
536 base = _delta([])
537 ours = _delta([])
538 theirs = _delta([])
539 result = merge_structured(base, ours, theirs)
540 assert result.is_clean is True
541 assert result.merged_ops == []
542
543 def test_non_conflicting_deltas_auto_merge(self) -> None:
544 op_a = _ins("a.mid", pos=1, cid="A")
545 op_b = _ins("b.mid", pos=2, cid="B")
546 result = merge_structured(_delta([]), _delta([op_a]), _delta([op_b]))
547 assert result.is_clean is True
548 assert len(result.merged_ops) == 2
549
550 def test_conflicting_deltas_reported(self) -> None:
551 op_a = _rep("shared.mid", "old", "v-a")
552 op_b = _rep("shared.mid", "old", "v-b")
553 result = merge_structured(_delta([]), _delta([op_a]), _delta([op_b]))
554 assert not result.is_clean
555 assert len(result.conflict_ops) == 1
556
557 def test_base_ops_respected_by_both_sides(self) -> None:
558 shared = _ins("f.mid", pos=0, cid="shared")
559 result = merge_structured(
560 _delta([shared]),
561 _delta([shared, _ins("f.mid", pos=5, cid="extra-ours")]),
562 _delta([shared]),
563 )
564 assert result.is_clean is True
565 # The 'shared' op is kept; 'extra-ours' is new and passes through.
566 assert len(result.merged_ops) >= 1
567
568
569 # ===========================================================================
570 # Part 6 — MergeOpsResult
571 # ===========================================================================
572
573
574 class TestMergeOpsResult:
575 def test_is_clean_when_no_conflicts(self) -> None:
576 r = MergeOpsResult(merged_ops=[], conflict_ops=[])
577 assert r.is_clean is True
578
579 def test_is_not_clean_when_conflicts_present(self) -> None:
580 a = _ins("f.mid", pos=1)
581 b = _ins("f.mid", pos=1)
582 r = MergeOpsResult(merged_ops=[], conflict_ops=[(a, b)])
583 assert r.is_clean is False
584
585 def test_default_factory_empty_lists(self) -> None:
586 r = MergeOpsResult()
587 assert r.merged_ops == []
588 assert r.conflict_ops == []
589
590
591 # ===========================================================================
592 # Part 7 — _op_key determinism and uniqueness
593 # ===========================================================================
594
595
596 class TestOpKey:
597 def test_insert_key_includes_all_fields(self) -> None:
598 op = _ins("f.mid", pos=3, cid="abc")
599 key = _op_key(op)
600 assert "insert" in key
601 assert "f.mid" in key
602 assert "3" in key
603 assert "abc" in key
604
605 def test_same_op_produces_same_key(self) -> None:
606 op = _del("f.mid", pos=2, cid="xyz")
607 assert _op_key(op) == _op_key(op)
608
609 def test_different_positions_produce_different_keys(self) -> None:
610 a = _ins("f.mid", pos=1, cid="c")
611 b = _ins("f.mid", pos=2, cid="c")
612 assert _op_key(a) != _op_key(b)
613
614 def test_move_key_includes_from_and_to(self) -> None:
615 op = _mov("f.mid", from_pos=3, to_pos=7)
616 key = _op_key(op)
617 assert "3" in key
618 assert "7" in key
619
620 def test_replace_key_includes_old_and_new(self) -> None:
621 op = _rep("f.mid", "old-id", "new-id")
622 key = _op_key(op)
623 assert "old-id" in key
624 assert "new-id" in key
625
626 def test_patch_key_includes_address_and_domain(self) -> None:
627 op = _patch("f.mid")
628 key = _op_key(op)
629 assert "patch" in key
630 assert "f.mid" in key
631
632
633 # ---------------------------------------------------------------------------
634 # Helpers for AddressedInsertOp / AddressedDeleteOp (code domain — no position)
635 # ---------------------------------------------------------------------------
636
637 def _ains(addr: str, cid: str = "cid-a") -> AddressedInsertOp:
638 return AddressedInsertOp(op="insert", address=addr, content_id=cid, content_summary=cid)
639
640
641 def _adel(addr: str, cid: str = "cid-a") -> AddressedDeleteOp:
642 return AddressedDeleteOp(op="delete", address=addr, content_id=cid, content_summary=cid)
643
644
645 # ===========================================================================
646 # AddressedInsertOp / AddressedDeleteOp — code domain, no position key
647 # ===========================================================================
648
649
650 class TestOpsCommuteAddressedOps:
651 """ops_commute must handle AddressedInsertOp/AddressedDeleteOp (no position key).
652
653 Before the fix these raised KeyError: 'position' because ops_commute did
654 a["position"] unconditionally for any op with op=="insert" or op=="delete".
655 """
656
657 def test_addressed_inserts_at_different_addresses_commute(self) -> None:
658 a = _ains("src/main.py::foo")
659 b = _ains("src/main.py::bar")
660 assert ops_commute(a, b) is True
661
662 def test_addressed_inserts_at_same_address_do_not_commute(self) -> None:
663 # Two branches both added the same symbol — conflict.
664 a = _ains("src/main.py::foo", cid="v1")
665 b = _ains("src/main.py::foo", cid="v2")
666 assert ops_commute(a, b) is False
667
668 def test_addressed_deletes_at_different_addresses_commute(self) -> None:
669 a = _adel("src/main.py::foo")
670 b = _adel("src/main.py::bar")
671 assert ops_commute(a, b) is True
672
673 def test_addressed_deletes_at_same_address_commute_consensus(self) -> None:
674 # Consensus delete: both branches deleted the same symbol — fine.
675 a = _adel("src/main.py::foo")
676 b = _adel("src/main.py::foo")
677 assert ops_commute(a, b) is True
678
679 def test_addressed_insert_and_addressed_delete_different_addresses_commute(self) -> None:
680 a = _ains("src/a.py::foo")
681 b = _adel("src/b.py::bar")
682 assert ops_commute(a, b) is True
683
684 def test_addressed_delete_and_move_commute(self) -> None:
685 # AddressedDeleteOp (code) vs MoveOp (MIDI) — different domains, always commutes.
686 a = _adel("src/main.py::foo")
687 b = _mov("track.mid", from_pos=0, to_pos=3)
688 assert ops_commute(a, b) is True
689
690 def test_move_and_addressed_delete_commute(self) -> None:
691 a = _mov("track.mid", from_pos=0, to_pos=3)
692 b = _adel("src/main.py::foo")
693 assert ops_commute(a, b) is True
694
695 def test_addressed_insert_inside_patch_commutes_at_different_addresses(self) -> None:
696 # PatchOp containing AddressedInsertOps — the code plugin's actual call pattern.
697 patch_a = _patch("src/main.py", child_ops=[_ains("src/main.py::foo")])
698 patch_b = _patch("src/main.py", child_ops=[_ains("src/main.py::bar")])
699 assert ops_commute(patch_a, patch_b) is True
700
701 def test_addressed_insert_inside_patch_conflicts_at_same_address(self) -> None:
702 patch_a = _patch("src/main.py", child_ops=[_ains("src/main.py::foo", cid="v1")])
703 patch_b = _patch("src/main.py", child_ops=[_ains("src/main.py::foo", cid="v2")])
704 assert ops_commute(patch_a, patch_b) is False
File History 1 commit
sha256:26e130b630b6a51f141dc4cc495ddfd11a1186ef4407b9530ed1c8ae528cd3ce fix: ops_commute handles AddressedInsertOp/AddressedDeleteO… Sonnet 4.6 patch 3 days ago