gabriel / muse public
test_core_merge_engine.py python
616 lines 25.0 KB
Raw
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 6 days ago
1 """Tests for muse.core.merge_engine — three-way merge logic.
2
3 Extended to cover the address-keyed map merge path via
4 :func:`~muse.core.op_merge.merge_structured` and the
5 :class:`~muse.domain.AddressedMergePlugin` integration.
6 """
7
8 from collections.abc import Mapping
9 import datetime
10 import json
11 import pathlib
12 import unittest.mock
13
14 import pytest
15
16 from muse.core.types import blob_id, long_id
17 from muse.core.merge_engine import (
18 MergeState,
19 apply_merge,
20 clear_merge_state,
21 detect_conflicts,
22 diff_snapshots,
23 find_merge_base,
24 read_merge_state,
25 write_merge_state,
26 )
27 from muse.core.op_merge import MergeOpsResult, merge_op_lists, merge_structured
28 from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
29 from muse.core.commits import (
30 CommitRecord,
31 write_commit,
32 )
33 from muse.domain import (
34 DeleteOp,
35 DomainOp,
36 InsertOp,
37 ReplaceOp,
38 SnapshotManifest,
39 AddressedMergePlugin,
40 StructuredDelta,
41 )
42 from muse.core.attributes import AttributeRule
43 from muse.plugins.code.plugin import CodePlugin
44 from muse.plugins.midi.plugin import MidiPlugin
45 from muse.core.paths import muse_dir
46
# Blob ids produced by blob_id() from fixed byte strings; used as file
# content ids in the snapshot manifests built by the tests below.
_OID_AAA = blob_id(b"aaa")
_OID_OLD = blob_id(b"old")
_OID_NEW = blob_id(b"new")
_OID_BASE = blob_id(b"base")
_OID_OURS = blob_id(b"ours")
_OID_THEIRS = blob_id(b"theirs")
_OID_K = blob_id(b"k")
_OID_FIXED = blob_id(b"fixed")
55
56
@pytest.fixture
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Provide a temporary repo root with the muse directory skeleton created.

    Creates ``commits`` and ``refs/heads`` beneath ``muse_dir(tmp_path)`` and
    returns ``tmp_path`` itself (not the muse directory).
    """
    store = muse_dir(tmp_path)
    for parts in (("commits",), ("refs", "heads")):
        store.joinpath(*parts).mkdir(parents=True)
    return tmp_path
63
64
def _commit(root: pathlib.Path, cid: str, parent: str | None = None, parent2: str | None = None) -> str:
    """Persist a commit on branch ``main`` and return its computed commit id.

    ``cid`` is stored as the commit *message* and fed into the id hash; the
    id that ends up on disk is whatever ``compute_commit_id`` returns, so
    callers must use the return value to refer to the commit afterwards.

    Args:
        root: Repository root handed to ``write_commit``.
        cid: Label used as the commit message.
        parent: Optional first parent commit id.
        parent2: Optional second parent commit id (merge commits).

    Returns:
        The commit id computed for the written record.
    """
    when = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    empty_snapshot = compute_snapshot_id({})
    # Only parents that were actually supplied take part in the hash.
    parents = [pid for pid in (parent, parent2) if pid is not None]
    new_id = compute_commit_id(
        parent_ids=parents,
        snapshot_id=empty_snapshot,
        message=cid,
        committed_at_iso=when.isoformat(),
    )
    record = CommitRecord(
        commit_id=new_id,
        branch="main",
        snapshot_id=empty_snapshot,
        message=cid,
        committed_at=when,
        parent_commit_id=parent,
        parent2_commit_id=parent2,
    )
    write_commit(root, record)
    return new_id
86
87
class TestDiffSnapshots:
    """Expectations for the set of paths diff_snapshots reports as changed."""

    def test_no_change(self) -> None:
        manifest = {"a.mid": "h1", "b.mid": "h2"}
        changed = diff_snapshots(manifest, manifest)
        assert changed == set()

    def test_added(self) -> None:
        changed = diff_snapshots({}, {"a.mid": "h1"})
        assert changed == {"a.mid"}

    def test_removed(self) -> None:
        changed = diff_snapshots({"a.mid": "h1"}, {})
        assert changed == {"a.mid"}

    def test_modified(self) -> None:
        before = {"a.mid": "old"}
        after = {"a.mid": "new"}
        assert diff_snapshots(before, after) == {"a.mid"}
101
102
class TestDetectConflicts:
    """Expectations for which doubly-changed paths detect_conflicts reports."""

    def test_no_conflict_disjoint(self) -> None:
        ours_m = {"a.mid": "h_a"}
        theirs_m = {"b.mid": "h_b"}
        assert detect_conflicts({"a.mid"}, {"b.mid"}, ours_m, theirs_m) == set()

    def test_conflict_divergent_content(self) -> None:
        ours_m = {"a.mid": "h_a", "b.mid": "h_b_ours"}
        theirs_m = {"b.mid": "h_b_theirs", "c.mid": "h_c"}
        assert detect_conflicts({"a.mid", "b.mid"}, {"b.mid", "c.mid"}, ours_m, theirs_m) == {"b.mid"}

    def test_both_empty(self) -> None:
        assert detect_conflicts(set(), set(), {}, {}) == set()

    def test_convergent_both_delete(self) -> None:
        """Both branches deleted the same file — convergent, NOT a conflict."""
        # Annotated with a builtin generic: the ``Manifest`` name used here
        # previously is not imported or defined anywhere in this module.
        ours_m: dict[str, str] = {}  # a.py deleted
        theirs_m: dict[str, str] = {}  # a.py deleted
        assert detect_conflicts({"a.py"}, {"a.py"}, ours_m, theirs_m) == set()

    def test_convergent_same_add(self) -> None:
        """Both branches independently added the same file with identical content."""
        ours_m = {"new.py": "hash_n"}
        theirs_m = {"new.py": "hash_n"}
        assert detect_conflicts({"new.py"}, {"new.py"}, ours_m, theirs_m) == set()

    def test_delete_vs_modify_is_conflict(self) -> None:
        """One side deleted, other modified — genuinely divergent."""
        ours_m: dict[str, str] = {}  # deleted a.py
        theirs_m = {"a.py": "hash_new"}  # modified a.py
        assert detect_conflicts({"a.py"}, {"a.py"}, ours_m, theirs_m) == {"a.py"}
134
135
class TestApplyMerge:
    """Expectations for the manifest apply_merge builds from base/ours/theirs."""

    def test_clean_merge(self) -> None:
        merged = apply_merge(
            {"a.mid": "h0", "b.mid": "h0"},        # base
            {"a.mid": "h_ours", "b.mid": "h0"},    # ours
            {"a.mid": "h0", "b.mid": "h_theirs"},  # theirs
            {"a.mid"},                             # changed on ours
            {"b.mid"},                             # changed on theirs
            set(),                                 # no conflicts
        )
        assert merged == {"a.mid": "h_ours", "b.mid": "h_theirs"}

    def test_conflict_paths_excluded(self) -> None:
        # The same set object is deliberately passed for both sides.
        both_changed = {"a.mid"}
        merged = apply_merge(
            {"a.mid": "h0"},
            {"a.mid": "h_ours"},
            {"a.mid": "h_theirs"},
            both_changed,
            both_changed,
            {"a.mid"},
        )
        assert merged == {"a.mid": "h0"}  # Falls back to base

    def test_ours_deletion_applied(self) -> None:
        unchanged = {"a.mid": "h0", "b.mid": "h0"}
        without_a = {"b.mid": "h0"}  # a.mid deleted on ours
        merged = apply_merge(
            unchanged,
            without_a,
            {"a.mid": "h0", "b.mid": "h0"},
            {"a.mid"},
            set(),
            set(),
        )
        assert "a.mid" not in merged
160
161
class TestMergeStateIO:
    """Round-trip tests for write_merge_state / read_merge_state / clear_merge_state."""

    def test_write_and_read(self, repo: pathlib.Path) -> None:
        commits = {
            "base_commit": long_id("b" * 64),
            "ours_commit": long_id("1" * 64),
            "theirs_commit": long_id("2" * 64),
        }
        write_merge_state(
            repo,
            conflict_paths=["a.mid", "b.mid"],
            other_branch="feature/x",
            **commits,
        )
        loaded = read_merge_state(repo)
        assert loaded is not None
        assert loaded.base_commit == commits["base_commit"]
        assert loaded.conflict_paths == ["a.mid", "b.mid"]
        assert loaded.other_branch == "feature/x"

    def test_read_no_state(self, repo: pathlib.Path) -> None:
        loaded = read_merge_state(repo)
        assert loaded is None

    def test_clear(self, repo: pathlib.Path) -> None:
        write_merge_state(
            repo,
            base_commit=long_id("b" * 64),
            ours_commit=long_id("c" * 64),
            theirs_commit=long_id("d" * 64),
            conflict_paths=[],
        )
        clear_merge_state(repo)
        assert read_merge_state(repo) is None
188
189
class TestFindMergeBase:
    """Expectations for the common ancestor returned by find_merge_base."""

    def test_direct_parent(self, repo: pathlib.Path) -> None:
        root_id = _commit(repo, "root")
        a_id = _commit(repo, "a", parent=root_id)
        b_id = _commit(repo, "b", parent=root_id)
        base = find_merge_base(repo, a_id, b_id)
        assert base == root_id

    def test_same_commit(self, repo: pathlib.Path) -> None:
        """A commit compared with itself is its own merge base."""
        # Use the id returned by _commit: the "root" label is only the commit
        # message, so passing it as a commit id would never reference the
        # commit that was actually written.
        root_id = _commit(repo, "root")
        base = find_merge_base(repo, root_id, root_id)
        assert base == root_id

    def test_linear_history(self, repo: pathlib.Path) -> None:
        a_id = _commit(repo, "a")
        b_id = _commit(repo, "b", parent=a_id)
        c_id = _commit(repo, "c", parent=b_id)
        base = find_merge_base(repo, c_id, b_id)
        assert base == b_id

    def test_no_common_ancestor(self, repo: pathlib.Path) -> None:
        x_id = _commit(repo, "x")
        y_id = _commit(repo, "y")
        assert find_merge_base(repo, x_id, y_id) is None

    def test_bidirectional_terminates_early(self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """Bidirectional BFS reads O(distance_to_LCA) commits, not O(total_history)."""
        import muse.core.graph as graph_mod
        from muse.core.commits import read_commit

        # 100 commits in total: root → c0 → ... → c96 → head_a
        #                                           ↘ feat (branches from c96)
        # LCA = c96, one hop from each tip
        root = _commit(repo, "root")
        tip = root
        for i in range(97):
            tip = _commit(repo, f"c{i}", parent=tip)
        lca = tip
        head_a = _commit(repo, "head_a", parent=lca)
        feat = _commit(repo, "feat", parent=lca)

        call_count = 0
        original = read_commit

        def counting_read(rr: pathlib.Path, cid: str) -> CommitRecord | None:
            # Count every commit read performed through muse.core.graph.
            nonlocal call_count
            call_count += 1
            return original(rr, cid)

        monkeypatch.setattr(graph_mod, "read_commit", counting_read)

        base = find_merge_base(repo, head_a, feat)
        assert base == lca
        # Bidirectional BFS finds LCA in ~2 reads (one per tip).
        # Old two-phase BFS read all 99 commits on A's chain before touching B.
        assert call_count <= 10

    def test_deep_chain_diamond(self, repo: pathlib.Path) -> None:
        """LCA correct for a long chain with diverging feature branches."""
        root = _commit(repo, "root")
        tip = root
        for i in range(50):
            tip = _commit(repo, f"m{i}", parent=tip)
        lca = tip
        branch_a = _commit(repo, "a0", parent=lca)
        branch_b = _commit(repo, "b0", parent=lca)
        # Extend both branches four more commits past the fork point.
        for i in range(1, 5):
            branch_a = _commit(repo, f"a{i}", parent=branch_a)
            branch_b = _commit(repo, f"b{i}", parent=branch_b)
        base = find_merge_base(repo, branch_a, branch_b)
        assert base == lca
261
262
263 # ===========================================================================
264 # Structured merge engine integration tests
265 # ===========================================================================
266
267
def _ins(addr: str, pos: int | None, cid: str) -> InsertOp:
    """Build an insert op at ``addr``/``pos`` whose summary equals its content id."""
    return InsertOp(
        op="insert",
        address=addr,
        position=pos,
        content_id=cid,
        content_summary=cid,
    )
270
271
def _del(addr: str, pos: int | None, cid: str) -> DeleteOp:
    """Build a delete op at ``addr``/``pos`` whose summary equals its content id."""
    return DeleteOp(
        op="delete",
        address=addr,
        position=pos,
        content_id=cid,
        content_summary=cid,
    )
274
275
def _rep(addr: str, old: str, new: str) -> ReplaceOp:
    """Build a position-less replace op at ``addr`` going from ``old`` to ``new``.

    The summaries are the fixed strings ``"old"`` and ``"new"``, independent
    of the content ids passed in.
    """
    replacement = ReplaceOp(
        op="replace",
        address=addr,
        position=None,
        old_content_id=old,
        new_content_id=new,
        old_summary="old",
        new_summary="new",
    )
    return replacement
286
287
def _delta(ops: list[DomainOp]) -> StructuredDelta:
    """Wrap ``ops`` in a ``midi``-domain StructuredDelta with summary ``"test"``."""
    wrapped = StructuredDelta(domain="midi", ops=ops, summary="test")
    return wrapped
290
291
class TestMergeStructuredIntegration:
    """Verify merge_structured delegates correctly to merge_op_lists."""

    def test_clean_non_overlapping_file_ops(self) -> None:
        empty_base = _delta([])
        left = _delta([_ins("a.mid", pos=0, cid="a-hash")])
        right = _delta([_ins("b.mid", pos=0, cid="b-hash")])
        outcome = merge_structured(empty_base, left, right)
        assert outcome.is_clean is True
        assert len(outcome.merged_ops) == 2

    def test_conflicting_same_address_replaces_detected(self) -> None:
        left = _delta([_rep("shared.mid", "old", "v-ours")])
        right = _delta([_rep("shared.mid", "old", "v-theirs")])
        outcome = merge_structured(_delta([]), left, right)
        assert outcome.is_clean is False
        assert len(outcome.conflict_ops) == 1

    def test_base_ops_kept_by_both_sides_preserved(self) -> None:
        shared = _ins("base.mid", pos=0, cid="base-cid")
        # Each side gets its own delta/list wrapping the same op.
        base_delta, ours_delta, theirs_delta = (_delta([shared]) for _ in range(3))
        outcome = merge_structured(base_delta, ours_delta, theirs_delta)
        assert outcome.is_clean is True
        wanted = _op_key_tuple(shared)
        assert any(_op_key_tuple(op) == wanted for op in outcome.merged_ops)

    def test_position_adjustment_in_structured_merge(self) -> None:
        """Non-conflicting note inserts get position-adjusted in structured merge."""
        left = _delta([_ins("lead.mid", pos=3, cid="note-A")])
        right = _delta([_ins("lead.mid", pos=7, cid="note-B")])
        outcome = merge_structured(_delta([]), left, right)
        assert outcome.is_clean is True
        positions: dict = {}
        for op in outcome.merged_ops:
            if op["op"] == "insert":
                positions[op["content_id"]] = op["position"]
        # note-A(3): no theirs ≤ 3 → stays 3
        assert positions["note-A"] == 3
        # note-B(7): ours A(3) ≤ 7 → 7+1 = 8
        assert positions["note-B"] == 8
334
335
def _op_key_tuple(op: DomainOp) -> tuple[str, ...]:
    """Re-implementation of _op_key for test assertions.

    Insert and delete ops key on (kind, address, position, content id);
    replace ops key on (kind, address, position, old id, new id); every other
    kind keys on (kind, address) only. The position is stringified.
    """
    kind = op["op"]
    if kind in ("insert", "delete"):
        return (kind, op["address"], str(op["position"]), op["content_id"])
    if kind == "replace":
        return (
            kind,
            op["address"],
            str(op["position"]),
            op["old_content_id"],
            op["new_content_id"],
        )
    return (kind, op["address"])
345
346
class TestStructuredMergePluginProtocol:
    """Verify MidiPlugin satisfies the AddressedMergePlugin protocol."""

    def test_midi_plugin_isinstance_addressed_merge_plugin(self) -> None:
        assert isinstance(MidiPlugin(), AddressedMergePlugin)

    def test_merge_ops_non_conflicting_files_is_clean(self) -> None:
        midi = MidiPlugin()
        empty = SnapshotManifest(files={}, domain="midi")
        left_snap = SnapshotManifest(files={"a.mid": "hash-a"}, domain="midi")
        right_snap = SnapshotManifest(files={"b.mid": "hash-b"}, domain="midi")
        left_ops: list[DomainOp] = [_ins("a.mid", pos=None, cid="hash-a")]
        right_ops: list[DomainOp] = [_ins("b.mid", pos=None, cid="hash-b")]

        outcome = midi.merge_ops(empty, left_snap, right_snap, left_ops, right_ops)

        assert outcome.is_clean is True
        merged_files = outcome.merged["files"]
        assert "a.mid" in merged_files
        assert "b.mid" in merged_files

    def test_merge_ops_conflicting_same_file_replace_not_clean(self) -> None:
        midi = MidiPlugin()
        start = SnapshotManifest(files={"f.mid": "base-hash"}, domain="midi")
        left_snap = SnapshotManifest(files={"f.mid": "ours-hash"}, domain="midi")
        right_snap = SnapshotManifest(files={"f.mid": "theirs-hash"}, domain="midi")
        left_ops: list[DomainOp] = [_rep("f.mid", "base-hash", "ours-hash")]
        right_ops: list[DomainOp] = [_rep("f.mid", "base-hash", "theirs-hash")]

        outcome = midi.merge_ops(start, left_snap, right_snap, left_ops, right_ops)

        assert not outcome.is_clean
        assert "f.mid" in outcome.conflicts

    def test_merge_ops_ours_strategy_resolves_conflict(self) -> None:
        midi = MidiPlugin()
        start = SnapshotManifest(files={"f.mid": "base"}, domain="midi")
        left_snap = SnapshotManifest(files={"f.mid": "ours-v"}, domain="midi")
        right_snap = SnapshotManifest(files={"f.mid": "theirs-v"}, domain="midi")
        left_ops: list[DomainOp] = [_rep("f.mid", "base", "ours-v")]
        right_ops: list[DomainOp] = [_rep("f.mid", "base", "theirs-v")]

        outcome = midi.merge_ops(start, left_snap, right_snap, left_ops, right_ops)

        # Without .museattributes the conflict stands — verify conflict is reported.
        assert not outcome.is_clean

    def test_merge_ops_delete_on_only_one_side_is_clean(self) -> None:
        midi = MidiPlugin()
        start = SnapshotManifest(files={"keep.mid": "k", "remove.mid": "r"}, domain="midi")
        left_snap = SnapshotManifest(files={"keep.mid": "k"}, domain="midi")
        right_snap = SnapshotManifest(files={"keep.mid": "k", "remove.mid": "r"}, domain="midi")
        left_ops: list[DomainOp] = [_del("remove.mid", pos=None, cid="r")]
        right_ops: list[DomainOp] = []

        outcome = midi.merge_ops(start, left_snap, right_snap, left_ops, right_ops)

        assert outcome.is_clean is True
        merged_files = outcome.merged["files"]
        assert "keep.mid" in merged_files
        assert "remove.mid" not in merged_files

    def test_merge_ops_empty_changes_returns_base(self) -> None:
        midi = MidiPlugin()
        only = SnapshotManifest(files={"f.mid": "h"}, domain="midi")
        # The same snapshot object stands in for base, ours and theirs.
        outcome = midi.merge_ops(only, only, only, [], [])
        assert outcome.is_clean is True
        assert outcome.merged["files"] == {"f.mid": "h"}
422
423
424 # ---------------------------------------------------------------------------
425 # Bug: "manual" attribute strategy on l==r paths causes false conflicts
426 #
427 # The `manual` strategy fires "even when the engine would auto-resolve" a
428 # DIVERGENT change (one side changed, the engine would take it automatically).
429 # It must NOT fire when both sides agree (l == r) — whether nothing changed
430 # (b == l == r) or both made the same convergent edit (b != l == r).
431 #
432 # Regression for: muse merge task/core-cat → 73 false conflicts in muse/core/**
433 # caused by [[rules]] path="muse/core/**" strategy="manual" in .museattributes.
434 # ---------------------------------------------------------------------------
435
# Placeholder repo root handed to CodePlugin.merge() by _merge_with_manual,
# which patches load_attributes for the duration of the call.
_DUMMY_ROOT = pathlib.Path("/nonexistent-repo-for-testing")
437
438
def _code_plugin() -> CodePlugin:
    """Return a freshly constructed CodePlugin."""
    plugin = CodePlugin()
    return plugin
441
442
def _snap(files: Mapping[str, str]) -> SnapshotManifest:
    """Wrap a path → content-id mapping in a ``code``-domain SnapshotManifest."""
    manifest = SnapshotManifest(files=files, domain="code")
    return manifest
445
446
def _manual_attrs() -> list[AttributeRule]:
    """Return one rule applying the ``manual`` strategy to ``core/**`` (priority 100)."""
    rule = AttributeRule(
        path_pattern="core/**",
        dimension="*",
        strategy="manual",
        priority=100,
    )
    return [rule]
449
450
def _merge_with_manual(
    base: SnapshotManifest,
    ours: SnapshotManifest,
    theirs: SnapshotManifest,
) -> "MergeResult":
    """Run CodePlugin.merge with the attribute rules replaced by _manual_attrs().

    ``muse.plugins.code.plugin.load_attributes`` is patched for the duration
    of the call so the merge sees the single ``manual`` rule regardless of
    what is on disk; ``_DUMMY_ROOT`` is passed as ``repo_root``.

    Args:
        base: Common-ancestor snapshot.
        ours: Snapshot of the current branch.
        theirs: Snapshot of the branch being merged in.

    Returns:
        Whatever ``CodePlugin.merge`` returns for the three snapshots.
    """
    # NOTE(review): ``MergeResult`` is not imported in this module, so the
    # return annotation is written as a string to avoid a NameError at
    # definition time on interpreters that evaluate annotations eagerly.
    # Importing the real type is the proper follow-up — TODO confirm where
    # it is exported from.
    plugin = _code_plugin()
    with unittest.mock.patch(
        "muse.plugins.code.plugin.load_attributes",
        return_value=_manual_attrs(),
    ):
        return plugin.merge(base, ours, theirs, repo_root=_DUMMY_ROOT)
462
463
class TestManualStrategyUnchangedFiles:
    """manual fires for single-branch changes but NEVER for l == r paths."""

    def test_unchanged_no_conflict(self) -> None:
        """b == l == r: neither branch touched the file → no conflict."""
        untouched = _snap({"core/store.py": _OID_AAA})
        outcome = _merge_with_manual(untouched, untouched, untouched)
        assert outcome.is_clean
        assert "core/store.py" not in outcome.conflicts

    def test_convergent_same_change_no_conflict(self) -> None:
        """b != l == r: both independently made the same edit → convergent, no conflict."""
        before = _snap({"core/store.py": _OID_OLD})
        after = _snap({"core/store.py": _OID_NEW})
        outcome = _merge_with_manual(before, after, after)
        assert outcome.is_clean
        assert "core/store.py" not in outcome.conflicts

    def test_only_ours_changed_manual_forces_conflict(self) -> None:
        """b == theirs != ours: one side changed → manual forces human review."""
        outcome = _merge_with_manual(
            _snap({"core/store.py": _OID_OLD}),
            _snap({"core/store.py": _OID_NEW}),
            _snap({"core/store.py": _OID_OLD}),
        )
        assert "core/store.py" in outcome.conflicts

    def test_only_theirs_changed_manual_forces_conflict(self) -> None:
        """b == ours != theirs: one side changed → manual forces human review."""
        outcome = _merge_with_manual(
            _snap({"core/store.py": _OID_OLD}),
            _snap({"core/store.py": _OID_OLD}),
            _snap({"core/store.py": _OID_NEW}),
        )
        assert "core/store.py" in outcome.conflicts

    def test_divergent_changes_conflict(self) -> None:
        """Both changed differently → conflict regardless."""
        outcome = _merge_with_manual(
            _snap({"core/store.py": _OID_BASE}),
            _snap({"core/store.py": _OID_OURS}),
            _snap({"core/store.py": _OID_THEIRS}),
        )
        assert "core/store.py" in outcome.conflicts

    def test_73_unchanged_plus_one_real_conflict(self) -> None:
        """Regression: 73 unchanged core files + 1 real conflict → only 1 conflict."""
        unchanged = {}
        for i in range(73):
            unchanged[f"core/file_{i}.py"] = blob_id(f"file_{i}".encode())

        def with_store(oid):
            # The 73 shared files plus core/store.py at the given content id.
            return _snap({**unchanged, "core/store.py": oid})

        base_snap = with_store(_OID_BASE)
        ours_snap = with_store(_OID_OURS)
        theirs_snap = with_store(_OID_THEIRS)
        outcome = _merge_with_manual(base_snap, ours_snap, theirs_snap)
        false_conflicts = [p for p in outcome.conflicts if p != "core/store.py"]
        assert false_conflicts == [], f"{len(false_conflicts)} false conflicts: {false_conflicts[:5]}"
        assert "core/store.py" in outcome.conflicts
518
519
520 # ---------------------------------------------------------------------------
521 # Bug: one-sided changes must NEVER produce false conflicts
522 #
523 # When only one branch changes a file (b == ours or b == theirs), the merge
524 # must take the changed side cleanly — no conflict, no manual review needed.
525 # This covers both the file-level merge() path and the operation-level
526 # merge_ops() path used by the code plugin (AddressedMergePlugin).
527 # ---------------------------------------------------------------------------
528
529
class TestOneSidedChangeNeverConflicts:
    """One side changes a file, other side doesn't → always clean."""

    def test_only_theirs_changed_no_conflict(self) -> None:
        original = {"pyproject.toml": _OID_OLD, "describe.py": _OID_OLD}
        updated = {"pyproject.toml": _OID_NEW, "describe.py": _OID_FIXED}
        result = _code_plugin().merge(
            _snap(original),
            _snap(dict(original)),
            _snap(updated),
            repo_root=None,
        )
        assert result.is_clean
        assert result.conflicts == []
        merged_files = result.merged["files"]
        assert merged_files["pyproject.toml"] == _OID_NEW
        assert merged_files["describe.py"] == _OID_FIXED

    def test_only_ours_changed_no_conflict(self) -> None:
        result = _code_plugin().merge(
            _snap({"a.py": _OID_OLD}),
            _snap({"a.py": _OID_OURS}),
            _snap({"a.py": _OID_OLD}),
            repo_root=None,
        )
        assert result.is_clean
        assert result.conflicts == []
        assert result.merged["files"]["a.py"] == _OID_OURS

    def test_theirs_deleted_file_ours_untouched(self) -> None:
        result = _code_plugin().merge(
            _snap({"gone.py": _OID_OLD, "keep.py": _OID_K}),
            _snap({"gone.py": _OID_OLD, "keep.py": _OID_K}),
            _snap({"keep.py": _OID_K}),
            repo_root=None,
        )
        assert result.is_clean
        assert "gone.py" not in result.merged["files"]

    def test_merge_ops_one_sided_no_conflict(self) -> None:
        """merge_ops() must not flag one-sided changes as conflicts."""
        plugin = _code_plugin()
        base = _snap({"pyproject.toml": _OID_OLD})
        ours = _snap({"pyproject.toml": _OID_OLD})
        theirs = _snap({"pyproject.toml": _OID_NEW})
        # Ours made no changes; theirs carries a single patch op for the file.
        ours_delta: StructuredDelta = {"ops": [], "summary": "", "domain": "code"}
        patch_op = {
            "op": "patch",
            "address": "pyproject.toml",
            "child_ops": [],
            "file_change": "modified",
            "content_summary": "",
        }
        theirs_delta: StructuredDelta = {
            "ops": [patch_op],
            "summary": "1 change",
            "domain": "code",
        }
        result = plugin.merge_ops(base, ours, theirs, ours_delta["ops"], theirs_delta["ops"])
        assert result.conflicts == [], f"False conflicts: {result.conflicts}"
575
576
577 # ---------------------------------------------------------------------------
578 # Bug: merge commit with two parents must pass write_commit hash verification
579 #
580 # compute_commit_id and _verify_commit_id must be symmetric for merge commits
581 # (two parents). If they disagree, write_commit raises ValueError and the
582 # merge cannot be completed — data is permanently stuck.
583 # ---------------------------------------------------------------------------
584
585
class TestMergeCommitHashVerification:
    """write_commit must accept merge commits (two parents) without raising."""

    def test_two_parent_commit_passes_verification(self, repo: pathlib.Path) -> None:
        first = _commit(repo, "ours")
        second = _commit(repo, "theirs")
        # A merge commit with both parents must write cleanly.
        _commit(repo, "merge", parent=first, parent2=second)

    def test_merge_commit_id_is_deterministic(self, repo: pathlib.Path) -> None:
        """Same inputs → same commit_id regardless of parent order in the list."""
        snap_id = compute_snapshot_id({})
        stamp = datetime.datetime(2026, 3, 1, tzinfo=datetime.timezone.utc)
        p1 = _commit(repo, "p1")
        p2 = _commit(repo, "p2")

        def merge_id_for(parents: list[str]) -> str:
            # Hash a "merge" commit over the given parent ordering.
            return compute_commit_id(
                parent_ids=parents,
                snapshot_id=snap_id,
                message="merge",
                committed_at_iso=stamp.isoformat(),
            )

        id_ab = merge_id_for([p1, p2])
        id_ba = merge_id_for([p2, p1])
        assert id_ab == id_ba, "Merge commit ID must be order-independent"

    def test_verify_sees_same_id_as_compute(self, repo: pathlib.Path) -> None:
        """_verify_commit_id (called inside write_commit) must agree with compute_commit_id."""
        from muse.core.commits import read_commit

        first = _commit(repo, "ours")
        second = _commit(repo, "theirs")
        merge_id = _commit(repo, "merge", parent=first, parent2=second)
        # If write_commit succeeded, read_commit must return the record intact.
        stored = read_commit(repo, merge_id)
        assert stored is not None
        assert stored.commit_id == merge_id
        assert stored.parent_commit_id == first
        assert stored.parent2_commit_id == second
File History 1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 6 days ago