gabriel / muse public
test_commit_concurrent_ref_safety.py python
280 lines 10.8 KB
Raw
sha256:f6cd81bc71702f5c1c6890bd39aaba994fe58c75f019d7c03934724fa2739bb4 fix: carry dev changes harmony dropped in merge — detached … Sonnet 4.6 minor ⚠ breaking 16 days ago
1 """Concurrent ref-update safety for muse commit.
2
3 Before this fix, write_branch_ref was an unconditional overwrite.
4 Two agents committing to the same branch simultaneously would silently
5 orphan one commit — the second write_branch_ref call would overwrite
6 the first with no detection, dropping a commit from the branch history.
7
8 Fix:
9 write_branch_ref gains an optional expected_id parameter. When
10 provided, the write only proceeds if the current ref value matches.
11 If it doesn't match (another agent advanced the ref first), it raises
12 RefConflictError with a clear message.
13
14 commit passes parent_id as expected_id so any concurrent advance of
15 the branch ref between the parent_id read and the ref write is caught
16 and reported as a retryable error rather than silently overwriting.
17
18 Test structure:
19 Tier 1 — write_branch_ref CAS unit tests
20 Tier 2 — commit detects branch-moved condition
21 Tier 3 — concurrent commit threads: only one succeeds per slot
22 """
23
24 from __future__ import annotations
25
26 import pathlib
27 import threading
28 from typing import TYPE_CHECKING
29
30 import pytest
31
32 from muse.core.refs import write_branch_ref
33 from muse.core.types import fake_id
34 from muse.core.paths import heads_dir, muse_dir
35 from tests.cli_test_helper import CliRunner, InvokeResult
36
37 if TYPE_CHECKING:
38 pass
39
40 runner = CliRunner()
41
42
43 def _run(repo: pathlib.Path, *args: str) -> None:
44 r = runner.invoke(None, list(args), cwd=repo)
45 assert r.exit_code == 0, f"muse {' '.join(args)} failed:\n{r.output}"
46
47
48 def _try(repo: pathlib.Path, *args: str) -> InvokeResult:
49 return runner.invoke(None, list(args), cwd=repo)
50
51
52 def _make_bare_refs(tmp_path: pathlib.Path) -> pathlib.Path:
53 """Minimal .muse/refs/heads/ tree — enough for write_branch_ref."""
54 import json
55 muse = muse_dir(tmp_path)
56 for d in ("objects", "commits", "snapshots", "refs/heads"):
57 (muse / d).mkdir(parents=True, exist_ok=True)
58 (muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"}))
59 (muse / "HEAD").write_text("ref: refs/heads/main\n")
60 return tmp_path
61
62
63 # ---------------------------------------------------------------------------
64 # Tier 1 — write_branch_ref CAS unit tests
65 # ---------------------------------------------------------------------------
66
67
68 class TestWriteBranchRefCAS:
69 def test_unconditional_write_succeeds(self, tmp_path: pathlib.Path) -> None:
70 repo = _make_bare_refs(tmp_path)
71 id_a = fake_id("a")
72 write_branch_ref(repo, "main", id_a)
73 ref = (heads_dir(repo) / "main").read_text().strip()
74 assert ref == id_a
75
76 def test_cas_succeeds_when_expected_matches(self, tmp_path: pathlib.Path) -> None:
77 repo = _make_bare_refs(tmp_path)
78 id_a = fake_id("a")
79 id_b = fake_id("b")
80 write_branch_ref(repo, "main", id_a)
81 write_branch_ref(repo, "main", id_b, expected_id=id_a)
82 ref = (heads_dir(repo) / "main").read_text().strip()
83 assert ref == id_b
84
85 def test_cas_fails_when_expected_does_not_match(self, tmp_path: pathlib.Path) -> None:
86 from muse.core.refs import RefConflictError
87 repo = _make_bare_refs(tmp_path)
88 id_a = fake_id("a")
89 id_b = fake_id("b")
90 id_c = fake_id("c")
91 write_branch_ref(repo, "main", id_a)
92 with pytest.raises(RefConflictError):
93 write_branch_ref(repo, "main", id_c, expected_id=id_b)
94 # ref must be unchanged
95 ref = (heads_dir(repo) / "main").read_text().strip()
96 assert ref == id_a
97
98 def test_cas_fails_when_ref_is_missing_and_expected_is_set(self, tmp_path: pathlib.Path) -> None:
99 from muse.core.refs import RefConflictError
100 repo = _make_bare_refs(tmp_path)
101 id_a = fake_id("a")
102 id_b = fake_id("b")
103 # No ref written yet — expected_id=id_a should fail (ref is None, not id_a)
104 with pytest.raises(RefConflictError):
105 write_branch_ref(repo, "main", id_b, expected_id=id_a)
106
107 def test_cas_succeeds_on_first_write_when_expected_is_none(self, tmp_path: pathlib.Path) -> None:
108 repo = _make_bare_refs(tmp_path)
109 id_a = fake_id("a")
110 # expected_id=None means "no prior value" — valid for first commit on a branch
111 write_branch_ref(repo, "main", id_a, expected_id=None)
112 ref = (heads_dir(repo) / "main").read_text().strip()
113 assert ref == id_a
114
115 def test_error_contains_branch_name_and_ids(self, tmp_path: pathlib.Path) -> None:
116 from muse.core.refs import RefConflictError
117 repo = _make_bare_refs(tmp_path)
118 id_a = fake_id("a")
119 id_b = fake_id("b")
120 id_c = fake_id("c")
121 write_branch_ref(repo, "dev", id_a)
122 with pytest.raises(RefConflictError, match="dev"):
123 write_branch_ref(repo, "dev", id_c, expected_id=id_b)
124
125
126 # ---------------------------------------------------------------------------
127 # Tier 2 — commit detects branch-moved condition
128 #
129 # The race window is between get_head_commit_id (parent read) and
130 # write_branch_ref (ref advance). We simulate it by patching
131 # get_head_commit_id to return a stale ID while the branch ref has already
132 # been advanced to a different commit by a "concurrent" agent.
133 # ---------------------------------------------------------------------------
134
135
136 class TestCommitDetectsBranchMoved:
137 def test_commit_fails_when_branch_advanced_concurrently(
138 self, muse_repo: pathlib.Path
139 ) -> None:
140 """If the branch ref advances between parent-read and ref-write, commit
141 must fail with a clear error rather than silently orphaning a commit."""
142 from unittest.mock import patch
143 import json as _json
144
145 f = muse_repo / "a.py"
146 f.write_text("v1\n")
147 _run(muse_repo, "code", "add", "a.py")
148 _run(muse_repo, "commit", "-m", "first")
149
150 # Capture the real HEAD after the first commit
151 head_r = runner.invoke(None, ["rev-parse", "HEAD", "--json"], cwd=muse_repo)
152 real_parent = _json.loads(head_r.output)["commit_id"]
153
154 branch_r = runner.invoke(None, ["rev-parse", "--abbrev-ref", "HEAD", "--json"], cwd=muse_repo)
155 branch = _json.loads(branch_r.output)["branch"]
156
157 # Stage a change for the second commit
158 f.write_text("v2\n")
159 _run(muse_repo, "code", "add", "a.py")
160
161 # Advance the branch ref to simulate another agent committing first
162 other_id = fake_id("other")
163 write_branch_ref(muse_repo, branch, other_id)
164
165 # Patch get_head_commit_id so commit sees the STALE parent (real_parent),
166 # while the branch ref on disk is already at other_id.
167 # This exactly reproduces the race: parent_id read → branch advances → write fails.
168 with patch(
169 "muse.cli.commands.commit.get_head_commit_id",
170 return_value=real_parent,
171 ):
172 result = _try(muse_repo, "commit", "-m", "ours — should fail")
173
174 assert result.exit_code != 0, (
175 "commit succeeded despite branch moving concurrently — "
176 "this would have orphaned the concurrent commit"
177 )
178
179 def test_commit_succeeds_when_branch_has_not_moved(
180 self, muse_repo: pathlib.Path
181 ) -> None:
182 """Normal single-agent commit still works after the CAS fix."""
183 f = muse_repo / "b.py"
184 f.write_text("hello\n")
185 _run(muse_repo, "code", "add", "b.py")
186 r = _try(muse_repo, "commit", "-m", "normal commit")
187 assert r.exit_code == 0, r.output
188
189 def test_commit_error_message_is_actionable(
190 self, muse_repo: pathlib.Path
191 ) -> None:
192 """The error surfaced to the user must tell them to pull/retry."""
193 from unittest.mock import patch
194 import json as _json
195
196 f = muse_repo / "c.py"
197 f.write_text("v1\n")
198 _run(muse_repo, "code", "add", "c.py")
199 _run(muse_repo, "commit", "-m", "first")
200
201 head_r = runner.invoke(None, ["rev-parse", "HEAD", "--json"], cwd=muse_repo)
202 real_parent = _json.loads(head_r.output)["commit_id"]
203
204 branch_r = runner.invoke(None, ["rev-parse", "--abbrev-ref", "HEAD", "--json"], cwd=muse_repo)
205 branch = _json.loads(branch_r.output)["branch"]
206
207 f.write_text("v2\n")
208 _run(muse_repo, "code", "add", "c.py")
209
210 write_branch_ref(muse_repo, branch, fake_id("concurrent"))
211
212 with patch(
213 "muse.cli.commands.commit.get_head_commit_id",
214 return_value=real_parent,
215 ):
216 result = _try(muse_repo, "commit", "-m", "should fail")
217
218 combined = result.output + (result.stderr or "")
219 assert any(
220 kw in combined.lower()
221 for kw in ("conflict", "moved", "concurrent", "retry", "pull", "changed")
222 ), f"error output gave no actionable guidance:\n{combined}"
223
224
225 # ---------------------------------------------------------------------------
226 # Tier 3 — concurrent commit threads: only one succeeds per slot
227 #
228 # Two threads both read parent_id = A, both build a commit, both try to
229 # write the branch ref with expected_id=A. The first rename wins; the
230 # second finds current != A and raises RefConflictError.
231 # ---------------------------------------------------------------------------
232
233
234 class TestConcurrentCommitThreads:
235 def test_only_one_of_two_concurrent_commits_wins(
236 self, muse_repo: pathlib.Path
237 ) -> None:
238 """Two threads racing on write_branch_ref with the same expected_id:
239 exactly one must succeed and one must raise RefConflictError."""
240 from muse.core.refs import RefConflictError
241
242 (muse_repo / "base.py").write_text("base\n")
243 _run(muse_repo, "code", "add", "base.py")
244 _run(muse_repo, "commit", "-m", "base")
245
246 import json as _json
247 head_r = runner.invoke(None, ["rev-parse", "HEAD", "--json"], cwd=muse_repo)
248 current = _json.loads(head_r.output)["commit_id"]
249 branch_r = runner.invoke(None, ["rev-parse", "--abbrev-ref", "HEAD", "--json"], cwd=muse_repo)
250 branch = _json.loads(branch_r.output)["branch"]
251
252 id_a = fake_id("commit-a")
253 id_b = fake_id("commit-b")
254
255 results: list[bool] = [] # True = success, False = RefConflictError
256 lock = threading.Lock()
257 barrier = threading.Barrier(2)
258
259 def do_cas(new_id: str) -> None:
260 barrier.wait() # both threads start the CAS at the same moment
261 try:
262 write_branch_ref(muse_repo, branch, new_id, expected_id=current)
263 with lock:
264 results.append(True)
265 except RefConflictError:
266 with lock:
267 results.append(False)
268
269 t1 = threading.Thread(target=do_cas, args=(id_a,))
270 t2 = threading.Thread(target=do_cas, args=(id_b,))
271 t1.start()
272 t2.start()
273 t1.join()
274 t2.join()
275
276 successes = results.count(True)
277 assert successes == 1, (
278 f"expected exactly 1 CAS success, got {successes}. results: {results}"
279 )
280 assert results.count(False) == 1
File History 2 commits
sha256:43c82f6d4fa2e85dd9ed9dd1a31199ec6b481191517aba66dfa9da275dbfa1af Merge branch 'dev' into main Human 2 days ago
sha256:fb67fed5a4d3e40de84bdd163de94ef1386570bef1dd1a020a732c8a038962ce Merge branch 'dev' into main Human 21 days ago