test_phase1_cas_branch_ref_callers.py
python
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
6 days ago
| 1 | """Phase 1 — CAS guards on all write_branch_ref callers. |
| 2 | |
| 3 | Invariant: any command that advances an existing branch ref must detect |
| 4 | a concurrent advance and fail with RefConflictError rather than silently |
| 5 | orphaning the other agent's commit. |
| 6 | |
| 7 | Commands covered by the invariant: merge, cherry-pick, revert, pull, rebase (the tests in this file exercise merge, cherry-pick and revert). |
| 8 | |
| 9 | Race injection pattern |
| 10 | ---------------------- |
| 11 | We cannot reliably pause a running command between its HEAD-read and its |
| 12 | write_branch_ref call. Instead we patch write_branch_ref in the target |
| 13 | command module to: |
| 14 | |
| 15 | 1. Advance the branch to a fake concurrent commit (simulates another |
| 16 | agent landing just before us). |
| 17 | 2. Call the real write_branch_ref with the original arguments. |
| 18 | |
| 19 | Before the fix (no expected_id): step 2 unconditionally overwrites the |
| 20 | concurrent commit — silent orphan. |
| 21 | After the fix (expected_id passed): step 2 finds current != expected and |
| 22 | raises RefConflictError — detected and surfaced to the caller. |
| 23 | |
| 24 | Testing tiers |
| 25 | ------------- |
| 26 | Unit: write_branch_ref CAS covered in test_commit_concurrent_ref_safety.py |
| 27 | Integration: each command raises / exits non-zero when branch moves mid-op |
| 28 | E2E: CLI exit code != 0, error output contains an actionable keyword |
| 29 | Data: branch ref equals the concurrent commit after a failed CAS |
| 30 | (the original write did NOT land — no orphaned commit) |
| 31 | Stress: concurrent write_branch_ref pairs; exactly one wins per slot |
| 32 | (covered by test_commit_concurrent_ref_safety.py — same lock) |
| 33 | Security: n/a — CAS is internal, not driven by user-controlled input |
| 34 | Performance: CAS uncontested round-trip < 50 ms |
| 35 | """ |
| 36 | |
| 37 | from __future__ import annotations |
| 38 | |
| 39 | import contextlib |
| 40 | import json |
| 41 | import pathlib |
| 42 | import time |
| 43 | from unittest.mock import patch, MagicMock |
| 44 | |
| 45 | import pytest |
| 46 | |
| 47 | from muse.core import refs as _store |
| 48 | from muse.core.refs import RefConflictError, write_branch_ref |
| 49 | from muse.core.types import fake_id |
| 50 | from muse.core.paths import head_path, ref_path, repo_json_path |
| 51 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 52 | |
# Shared CLI runner used by the helper functions in this module.
runner = CliRunner()

# Commit id the race injector writes to the branch to stand in for a commit
# landed by another agent just before the command under test writes its own.
_CONCURRENT_ID = fake_id("concurrent-agent")
| 56 | |
| 57 | |
def _run(repo: pathlib.Path, *args: str) -> None:
    """Invoke the CLI with *args* from *repo* and require a zero exit code.

    Used for fixture setup steps that must succeed; the captured output is
    included in the assertion message when they do not.
    """
    outcome = runner.invoke(None, list(args), cwd=repo)
    succeeded = outcome.exit_code == 0
    assert succeeded, f"muse {' '.join(args)} failed:\n{outcome.output}"
| 61 | |
| 62 | |
def _try(repo: pathlib.Path, *args: str) -> InvokeResult:
    """Invoke the CLI with *args* from *repo* and return the raw result.

    Unlike ``_run`` this never asserts on the exit code, so callers can
    inspect failures themselves.
    """
    argv = list(args)
    outcome = runner.invoke(None, argv, cwd=repo)
    return outcome
| 65 | |
| 66 | |
def _head(repo: pathlib.Path) -> str:
    """Return the ``commit_id`` reported by ``rev-parse HEAD --json`` in *repo*.

    The exit code is checked before the output is parsed so that a failing
    ``rev-parse`` is reported with its own output instead of surfacing as an
    opaque JSON decoding error.
    """
    r = runner.invoke(None, ["rev-parse", "HEAD", "--json"], cwd=repo)
    assert r.exit_code == 0, f"muse rev-parse HEAD --json failed:\n{r.output}"
    return json.loads(r.output)["commit_id"]
| 70 | |
| 71 | |
def _branch(repo: pathlib.Path) -> str:
    """Return the ``branch`` reported by ``rev-parse --abbrev-ref HEAD --json``.

    The exit code is checked before the output is parsed so that a failing
    ``rev-parse`` is reported with its own output instead of surfacing as an
    opaque JSON decoding error.
    """
    r = runner.invoke(None, ["rev-parse", "--abbrev-ref", "HEAD", "--json"], cwd=repo)
    assert r.exit_code == 0, (
        f"muse rev-parse --abbrev-ref HEAD --json failed:\n{r.output}"
    )
    return json.loads(r.output)["branch"]
| 75 | |
| 76 | |
def _ref(repo: pathlib.Path, branch: str) -> str:
    """Read the ref file for *branch* in *repo* and return its stripped text."""
    ref_file = ref_path(repo, branch)
    raw = ref_file.read_text()
    return raw.strip()
| 79 | |
| 80 | |
def _make_race_injector(module_path: str) -> contextlib.AbstractContextManager[MagicMock]:
    """Build a patcher that simulates a concurrent advance on every ref write.

    The returned context manager replaces ``write_branch_ref`` as looked up
    in *module_path*. Each call to the replacement first moves the branch to
    ``_CONCURRENT_ID`` (another agent landing first) and then forwards the
    caller's original arguments to the real ``write_branch_ref``.

    Without an ``expected_id`` the forwarded write overwrites the concurrent
    commit unconditionally (the orphaning bug). With ``expected_id`` set to
    the pre-operation HEAD, the forwarded write is expected to see the
    mismatch and raise ``RefConflictError``.
    """
    # Capture the genuine implementation now, before any patch is active.
    genuine_write = _store.write_branch_ref
    target = f"{module_path}.write_branch_ref"

    # Parameter names mirror the original wrapper so keyword calls still bind.
    def _race_then_write(
        root: pathlib.Path, branch: str, commit_id: str, **kwargs: str
    ) -> None:
        # Step 1: the "other agent" lands its commit on the branch.
        genuine_write(root, branch, _CONCURRENT_ID)
        # Step 2: the command's own write, with whatever CAS arguments it passed.
        return genuine_write(root, branch, commit_id, **kwargs)

    return patch(target, side_effect=_race_then_write)
| 97 | |
| 98 | |
| 99 | # --------------------------------------------------------------------------- |
| 100 | # merge |
| 101 | # --------------------------------------------------------------------------- |
| 102 | |
| 103 | |
class TestMergeCASGuard:
    """muse merge must detect a concurrent branch advance and fail cleanly."""

    @pytest.fixture()
    def two_branch_repo(self, muse_repo: pathlib.Path) -> pathlib.Path:
        """Commit ``base.py``, add a ``feat`` branch with one more commit, return on ``main``."""
        (muse_repo / "base.py").write_text("base\n")
        _run(muse_repo, "code", "add", "base.py")
        _run(muse_repo, "commit", "-m", "base")
        _run(muse_repo, "checkout", "-b", "feat")
        (muse_repo / "feat.py").write_text("feat\n")
        _run(muse_repo, "code", "add", "feat.py")
        _run(muse_repo, "commit", "-m", "feat commit")
        _run(muse_repo, "checkout", "main")
        return muse_repo

    def test_merge_fails_when_branch_advances_concurrently(
        self, two_branch_repo: pathlib.Path
    ) -> None:
        """A merge racing with another writer must exit non-zero."""
        with _make_race_injector("muse.cli.commands.merge") as injected:
            result = _try(two_branch_repo, "merge", "feat")
        # Guard against a vacuous pass: a non-zero exit only demonstrates the
        # CAS guard if the command actually reached its write_branch_ref call.
        assert injected.called, (
            f"merge never reached write_branch_ref:\n{result.output}"
        )
        assert result.exit_code != 0, "merge succeeded despite concurrent branch advance"

    def test_merge_error_is_actionable(self, two_branch_repo: pathlib.Path) -> None:
        """The failure output must contain at least one actionable keyword."""
        with _make_race_injector("muse.cli.commands.merge") as injected:
            result = _try(two_branch_repo, "merge", "feat")
        combined = result.output + (result.stderr or "")
        # The keyword check is only meaningful for the CAS failure path.
        assert injected.called, f"merge never reached write_branch_ref:\n{combined}"
        assert any(
            kw in combined.lower()
            for kw in ("conflict", "moved", "concurrent", "retry", "pull", "changed")
        ), f"no actionable guidance:\n{combined}"

    def test_merge_succeeds_when_branch_has_not_moved(
        self, two_branch_repo: pathlib.Path
    ) -> None:
        """Without the race injector the merge must succeed."""
        result = _try(two_branch_repo, "merge", "feat")
        assert result.exit_code == 0, result.output

    def test_merge_branch_ref_unchanged_after_failed_cas(
        self, two_branch_repo: pathlib.Path
    ) -> None:
        """After the failed merge the branch ref must still be the concurrent commit."""
        repo = two_branch_repo
        branch = _branch(repo)
        with _make_race_injector("muse.cli.commands.merge"):
            _try(repo, "merge", "feat")
        assert _ref(repo, branch) == _CONCURRENT_ID, (
            "failed merge CAS overwrote the concurrent commit"
        )
| 151 | |
| 152 | |
| 153 | # --------------------------------------------------------------------------- |
| 154 | # cherry-pick |
| 155 | # --------------------------------------------------------------------------- |
| 156 | |
| 157 | |
class TestCherryPickCASGuard:
    """muse cherry-pick must detect a concurrent branch advance and fail cleanly."""

    @pytest.fixture()
    def cherry_repo(self, muse_repo: pathlib.Path) -> tuple[pathlib.Path, str]:
        """Return ``(repo, pick_id)``: a commit made on ``source``, with ``main`` checked out."""
        (muse_repo / "a.py").write_text("v1\n")
        _run(muse_repo, "code", "add", "a.py")
        _run(muse_repo, "commit", "-m", "first")
        _run(muse_repo, "checkout", "-b", "source")
        (muse_repo / "a.py").write_text("v2\n")
        _run(muse_repo, "code", "add", "a.py")
        _run(muse_repo, "commit", "-m", "the pick")
        pick_id = _head(muse_repo)
        _run(muse_repo, "checkout", "main")
        return muse_repo, pick_id

    def test_cherry_pick_fails_when_branch_advances_concurrently(
        self, cherry_repo: tuple[pathlib.Path, str]
    ) -> None:
        """A cherry-pick racing with another writer must exit non-zero."""
        repo, pick_id = cherry_repo
        with _make_race_injector("muse.cli.commands.cherry_pick") as injected:
            result = _try(repo, "cherry-pick", pick_id)
        # Guard against a vacuous pass: a non-zero exit only demonstrates the
        # CAS guard if the command actually reached its write_branch_ref call.
        assert injected.called, (
            f"cherry-pick never reached write_branch_ref:\n{result.output}"
        )
        assert result.exit_code != 0, "cherry-pick succeeded despite concurrent branch advance"

    def test_cherry_pick_succeeds_when_branch_has_not_moved(
        self, cherry_repo: tuple[pathlib.Path, str]
    ) -> None:
        """Without the race injector the cherry-pick must succeed."""
        repo, pick_id = cherry_repo
        result = _try(repo, "cherry-pick", pick_id)
        assert result.exit_code == 0, result.output

    def test_cherry_pick_branch_ref_unchanged_after_failed_cas(
        self, cherry_repo: tuple[pathlib.Path, str]
    ) -> None:
        """After the failed cherry-pick the branch ref must still be the concurrent commit."""
        repo, pick_id = cherry_repo
        branch = _branch(repo)
        with _make_race_injector("muse.cli.commands.cherry_pick"):
            _try(repo, "cherry-pick", pick_id)
        assert _ref(repo, branch) == _CONCURRENT_ID
| 196 | |
| 197 | |
| 198 | # --------------------------------------------------------------------------- |
| 199 | # revert |
| 200 | # --------------------------------------------------------------------------- |
| 201 | |
| 202 | |
class TestRevertCASGuard:
    """muse revert must detect a concurrent branch advance and fail cleanly."""

    @pytest.fixture()
    def revert_repo(self, muse_repo: pathlib.Path) -> pathlib.Path:
        """Return a repo holding two successive commits to ``a.py``."""
        (muse_repo / "a.py").write_text("v1\n")
        _run(muse_repo, "code", "add", "a.py")
        _run(muse_repo, "commit", "-m", "first")
        (muse_repo / "a.py").write_text("v2\n")
        _run(muse_repo, "code", "add", "a.py")
        _run(muse_repo, "commit", "-m", "second")
        return muse_repo

    def test_revert_fails_when_branch_advances_concurrently(
        self, revert_repo: pathlib.Path
    ) -> None:
        """A revert racing with another writer must exit non-zero."""
        with _make_race_injector("muse.cli.commands.revert") as injected:
            result = _try(revert_repo, "revert", "HEAD")
        # Guard against a vacuous pass: a non-zero exit only demonstrates the
        # CAS guard if the command actually reached its write_branch_ref call.
        assert injected.called, (
            f"revert never reached write_branch_ref:\n{result.output}"
        )
        assert result.exit_code != 0, "revert succeeded despite concurrent branch advance"

    def test_revert_succeeds_when_branch_has_not_moved(
        self, revert_repo: pathlib.Path
    ) -> None:
        """Without the race injector the revert must succeed."""
        result = _try(revert_repo, "revert", "HEAD")
        assert result.exit_code == 0, result.output

    def test_revert_branch_ref_unchanged_after_failed_cas(
        self, revert_repo: pathlib.Path
    ) -> None:
        """After the failed revert the branch ref must still be the concurrent commit."""
        repo = revert_repo
        branch = _branch(repo)
        with _make_race_injector("muse.cli.commands.revert"):
            _try(repo, "revert", "HEAD")
        assert _ref(repo, branch) == _CONCURRENT_ID
| 236 | |
| 237 | |
| 238 | # --------------------------------------------------------------------------- |
| 239 | # Performance — CAS happy-path overhead |
| 240 | # --------------------------------------------------------------------------- |
| 241 | |
| 242 | |
class TestCASPerformance:
    """Timing guard for the uncontested (happy-path) CAS write."""

    def test_write_branch_ref_cas_uncontested_under_50ms(
        self, bare_muse_repo: pathlib.Path
    ) -> None:
        """The fastest of several uncontested CAS writes must take < 50 ms.

        A single wall-clock sample is sensitive to scheduler and filesystem
        noise, so the write is repeated a few times, flipping the ref between
        two ids, and the best sample is compared against the bound. Every
        repetition passes the id currently on disk as ``expected_id``.
        """
        repo = bare_muse_repo
        (head_path(repo)).write_text("ref: refs/heads/main\n")
        (repo_json_path(repo)).write_text(
            json.dumps({"repo_id": "perf-test"})
        )
        id_a = fake_id("a")
        id_b = fake_id("b")
        write_branch_ref(repo, "main", id_a)

        current, target = id_a, id_b
        samples: list[float] = []
        for _ in range(5):
            start = time.perf_counter()
            write_branch_ref(repo, "main", target, expected_id=current)
            samples.append(time.perf_counter() - start)
            # The ref now holds `target`; swap roles for the next round.
            current, target = target, current

        elapsed = min(samples)
        assert elapsed < 0.05, f"CAS took {elapsed:.3f}s — expected < 0.05s"
File History
1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
6 days ago