test_checkout_integrity.py
python
sha256:f81d1ddcc1eec1eaccc667f22067476a0cf138860691611738c0c799df24a1db
fixing more failing tests
Human
3 days ago
| 1 | """Tests for checkout data-integrity guarantees. |
| 2 | |
| 3 | Coverage tiers |
| 4 | -------------- |
| 5 | Unit — read_checkout_head / checkout_head_path helpers. |
| 6 | Integration — pre-flight aborts cleanly (zero mutations), CHECKOUT_HEAD |
| 7 | marker lifecycle, status detection of interrupted checkout. |
| 8 | End-to-end — full CLI round-trips: missing object, simulated mid-flight |
| 9 | kill (marker left behind), recovery via retry checkout. |
| 10 | Stress — rapid branch switching never leaves a stale marker. |
| 11 | |
| 12 | Key invariants under test |
| 13 | ------------------------- |
| 14 | 1. Pre-flight: if any restore object is absent from the store, checkout |
| 15 | prints an error and does NOT modify any file on disk. |
| 16 | 2. CHECKOUT_HEAD written before first mutation; removed on success. |
| 17 | 3. ``muse status`` (text + JSON) detects a stale marker and warns loudly. |
| 18 | 4. A successful checkout on retry clears the stale marker. |
| 19 | 5. Interrupted checkout leaves HEAD pointing to the *old* branch. |
| 20 | """ |
| 21 | |
| 22 | from __future__ import annotations |
| 23 | |
| 24 | import json |
| 25 | import os |
| 26 | import pathlib |
| 27 | import shutil |
| 28 | |
| 29 | import pytest |
| 30 | |
| 31 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 32 | from muse.core.types import MsgpackDict |
| 33 | from muse.core.refs import ( |
| 34 | get_head_commit_id, |
| 35 | read_current_branch, |
| 36 | ) |
| 37 | from muse.core.object_store import has_object |
| 38 | from muse.cli.commands.checkout import checkout_head_path, read_checkout_head |
| 39 | |
| 40 | runner = CliRunner() |
| 41 | |
| 42 | # ────────────────────────────────────────────────────────────────────────────── |
| 43 | # Helpers |
| 44 | # ────────────────────────────────────────────────────────────────────────────── |
| 45 | |
| 46 | |
| 47 | def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult: |
| 48 | saved = os.getcwd() |
| 49 | try: |
| 50 | os.chdir(repo) |
| 51 | return runner.invoke(None, args) |
| 52 | finally: |
| 53 | os.chdir(saved) |
| 54 | |
| 55 | |
| 56 | def _commit(repo: pathlib.Path, msg: str = "commit") -> InvokeResult: |
| 57 | _invoke(repo, ["code", "add", "."]) |
| 58 | return _invoke(repo, ["commit", "-m", msg]) |
| 59 | |
| 60 | |
| 61 | def _checkout(repo: pathlib.Path, branch: str, *flags: str) -> InvokeResult: |
| 62 | return _invoke(repo, ["checkout", *flags, branch]) |
| 63 | |
| 64 | |
| 65 | def _status_json(repo: pathlib.Path) -> MsgpackDict: |
| 66 | r = _invoke(repo, ["status", "--json"]) |
| 67 | return json.loads(r.output) |
| 68 | |
| 69 | |
| 70 | def _status_text(repo: pathlib.Path) -> tuple[str, str]: |
| 71 | """Return (stdout, stderr) of muse status.""" |
| 72 | r = _invoke(repo, ["status"]) |
| 73 | return r.output, (r.stderr or "") |
| 74 | |
| 75 | |
| 76 | # ────────────────────────────────────────────────────────────────────────────── |
| 77 | # Fixtures |
| 78 | # ────────────────────────────────────────────────────────────────────────────── |
| 79 | |
| 80 | |
| 81 | @pytest.fixture() |
| 82 | def two_branch_repo(tmp_path: pathlib.Path) -> pathlib.Path: |
| 83 | """Repo with main and feat branches, each with distinct files. |
| 84 | |
| 85 | main: a.py |
| 86 | feat: a.py (unchanged) + b.py (feat-only) |
| 87 | """ |
| 88 | saved = os.getcwd() |
| 89 | try: |
| 90 | os.chdir(tmp_path) |
| 91 | runner.invoke(None, ["init"]) |
| 92 | finally: |
| 93 | os.chdir(saved) |
| 94 | (tmp_path / "a.py").write_text("x = 1\n") |
| 95 | _commit(tmp_path, "main initial") |
| 96 | _invoke(tmp_path, ["checkout", "-b", "feat"]) |
| 97 | (tmp_path / "b.py").write_text("y = 2\n") |
| 98 | _commit(tmp_path, "feat adds b.py") |
| 99 | _checkout(tmp_path, "main") |
| 100 | return tmp_path |
| 101 | |
| 102 | |
| 103 | # ────────────────────────────────────────────────────────────────────────────── |
| 104 | # Unit: helper functions |
| 105 | # ────────────────────────────────────────────────────────────────────────────── |
| 106 | |
| 107 | |
| 108 | class TestCheckoutHeadHelpers: |
| 109 | def test_checkout_head_path_points_into_muse_dir(self, two_branch_repo: pathlib.Path) -> None: |
| 110 | p = checkout_head_path(two_branch_repo) |
| 111 | assert p.parent == two_branch_repo / ".muse" |
| 112 | assert p.name == "CHECKOUT_HEAD" |
| 113 | |
| 114 | def test_read_checkout_head_returns_none_when_absent(self, two_branch_repo: pathlib.Path) -> None: |
| 115 | assert read_checkout_head(two_branch_repo) is None |
| 116 | |
| 117 | def test_read_checkout_head_returns_content_when_present(self, two_branch_repo: pathlib.Path) -> None: |
| 118 | marker = checkout_head_path(two_branch_repo) |
| 119 | marker.write_text("feat\n", encoding="utf-8") |
| 120 | assert read_checkout_head(two_branch_repo) == "feat" |
| 121 | marker.unlink() |
| 122 | |
| 123 | def test_read_checkout_head_strips_trailing_newline(self, two_branch_repo: pathlib.Path) -> None: |
| 124 | marker = checkout_head_path(two_branch_repo) |
| 125 | marker.write_text("feat\n\n", encoding="utf-8") |
| 126 | assert read_checkout_head(two_branch_repo) == "feat" |
| 127 | marker.unlink() |
| 128 | |
| 129 | def test_read_checkout_head_empty_file_returns_none(self, two_branch_repo: pathlib.Path) -> None: |
| 130 | marker = checkout_head_path(two_branch_repo) |
| 131 | marker.write_text("", encoding="utf-8") |
| 132 | assert read_checkout_head(two_branch_repo) is None |
| 133 | marker.unlink() |
| 134 | |
| 135 | |
| 136 | # ────────────────────────────────────────────────────────────────────────────── |
| 137 | # Integration: pre-flight object existence check |
| 138 | # ────────────────────────────────────────────────────────────────────────────── |
| 139 | |
| 140 | |
| 141 | class TestPreflightObjectCheck: |
| 142 | """Pre-flight: abort before any mutation if a restore object is missing.""" |
| 143 | |
| 144 | def _sabotage_object(self, repo: pathlib.Path, rel_path: str) -> pathlib.Path: |
| 145 | """Remove the object backing *rel_path* from the store; return its path.""" |
| 146 | from muse.core.refs import get_head_commit_id |
| 147 | from muse.core.commits import read_commit |
| 148 | from muse.core.snapshots import read_snapshot |
| 149 | from muse.core.object_store import _object_path_with_fallback |
| 150 | |
| 151 | # Switch to feat first so its snapshot is current, then read its manifest |
| 152 | _checkout(repo, "feat") |
| 153 | branch = read_current_branch(repo) |
| 154 | commit_id = get_head_commit_id(repo, branch) |
| 155 | assert commit_id |
| 156 | commit = read_commit(repo, commit_id) |
| 157 | assert commit |
| 158 | snap = read_snapshot(repo, commit.snapshot_id) |
| 159 | assert snap |
| 160 | obj_id = snap.manifest[rel_path] |
| 161 | obj_path = _object_path_with_fallback(repo, obj_id) |
| 162 | assert obj_path.exists(), f"Object for {rel_path} not in store" |
| 163 | # Back on main before sabotaging |
| 164 | _checkout(repo, "main") |
| 165 | # Now remove the object |
| 166 | obj_path.unlink() |
| 167 | return obj_path |
| 168 | |
| 169 | def test_missing_object_exits_nonzero(self, two_branch_repo: pathlib.Path) -> None: |
| 170 | self._sabotage_object(two_branch_repo, "b.py") |
| 171 | result = _checkout(two_branch_repo, "feat") |
| 172 | assert result.exit_code != 0 |
| 173 | |
| 174 | def test_missing_object_error_on_stderr(self, two_branch_repo: pathlib.Path) -> None: |
| 175 | self._sabotage_object(two_branch_repo, "b.py") |
| 176 | result = _checkout(two_branch_repo, "feat") |
| 177 | err = result.stderr or result.output |
| 178 | assert "missing" in err.lower() or "object" in err.lower() |
| 179 | |
| 180 | def test_missing_object_working_tree_unchanged(self, two_branch_repo: pathlib.Path) -> None: |
| 181 | """Zero mutations — b.py must not appear on main after a failed checkout.""" |
| 182 | self._sabotage_object(two_branch_repo, "b.py") |
| 183 | _checkout(two_branch_repo, "feat") |
| 184 | # Working tree must not contain b.py — no partial mutations |
| 185 | assert not (two_branch_repo / "b.py").exists() |
| 186 | |
| 187 | def test_missing_object_a_py_unchanged(self, two_branch_repo: pathlib.Path) -> None: |
| 188 | """Files that would be kept unchanged must also remain untouched.""" |
| 189 | self._sabotage_object(two_branch_repo, "b.py") |
| 190 | _checkout(two_branch_repo, "feat") |
| 191 | # a.py was already on main and unchanged; must still be present |
| 192 | assert (two_branch_repo / "a.py").exists() |
| 193 | |
| 194 | def test_missing_object_head_stays_on_old_branch(self, two_branch_repo: pathlib.Path) -> None: |
| 195 | """HEAD must not advance when pre-flight aborts.""" |
| 196 | self._sabotage_object(two_branch_repo, "b.py") |
| 197 | _checkout(two_branch_repo, "feat") |
| 198 | assert read_current_branch(two_branch_repo) == "main" |
| 199 | |
| 200 | def test_missing_object_no_checkout_head_marker(self, two_branch_repo: pathlib.Path) -> None: |
| 201 | """Pre-flight abort fires before any marker is written.""" |
| 202 | self._sabotage_object(two_branch_repo, "b.py") |
| 203 | _checkout(two_branch_repo, "feat") |
| 204 | # Marker must NOT be present — pre-flight aborted before any mutation |
| 205 | assert read_checkout_head(two_branch_repo) is None |
| 206 | |
| 207 | def test_stderr_names_the_missing_file(self, two_branch_repo: pathlib.Path) -> None: |
| 208 | self._sabotage_object(two_branch_repo, "b.py") |
| 209 | result = _checkout(two_branch_repo, "feat") |
| 210 | err = result.stderr or result.output |
| 211 | assert "b.py" in err |
| 212 | |
| 213 | def test_stderr_says_working_tree_not_modified(self, two_branch_repo: pathlib.Path) -> None: |
| 214 | self._sabotage_object(two_branch_repo, "b.py") |
| 215 | result = _checkout(two_branch_repo, "feat") |
| 216 | err = result.stderr or result.output |
| 217 | assert "NOT modified" in err or "not modified" in err.lower() |
| 218 | |
| 219 | |
| 220 | # ────────────────────────────────────────────────────────────────────────────── |
| 221 | # Integration: CHECKOUT_HEAD marker lifecycle |
| 222 | # ────────────────────────────────────────────────────────────────────────────── |
| 223 | |
| 224 | |
| 225 | class TestCheckoutHeadMarker: |
| 226 | """CHECKOUT_HEAD written before mutations, cleared after success.""" |
| 227 | |
| 228 | def test_marker_absent_after_clean_checkout(self, two_branch_repo: pathlib.Path) -> None: |
| 229 | _checkout(two_branch_repo, "feat") |
| 230 | assert read_checkout_head(two_branch_repo) is None |
| 231 | |
| 232 | def test_marker_absent_after_round_trip(self, two_branch_repo: pathlib.Path) -> None: |
| 233 | _checkout(two_branch_repo, "feat") |
| 234 | _checkout(two_branch_repo, "main") |
| 235 | assert read_checkout_head(two_branch_repo) is None |
| 236 | |
| 237 | def test_stale_marker_survives_failed_checkout(self, two_branch_repo: pathlib.Path) -> None: |
| 238 | """Manually plant a stale marker; it must persist (not auto-cleared).""" |
| 239 | marker = checkout_head_path(two_branch_repo) |
| 240 | marker.write_text("feat\n", encoding="utf-8") |
| 241 | # Successful checkout to main should clear it |
| 242 | _checkout(two_branch_repo, "main") |
| 243 | # main is already current, but checkout still fires the snapshot path |
| 244 | # which should clear the marker on success |
| 245 | # (Already on main — 'already_on' path does NOT call _checkout_snapshot, |
| 246 | # so the marker is unaffected. Plant while NOT on main.) |
| 247 | # Reset: switch to feat first, plant marker, switch back |
| 248 | _checkout(two_branch_repo, "feat") |
| 249 | marker.write_text("main\n", encoding="utf-8") |
| 250 | _checkout(two_branch_repo, "main") |
| 251 | assert read_checkout_head(two_branch_repo) is None |
| 252 | |
| 253 | def test_simulated_interrupted_marker_persists(self, two_branch_repo: pathlib.Path) -> None: |
| 254 | """Simulate kill mid-checkout: plant marker, verify it stays.""" |
| 255 | marker = checkout_head_path(two_branch_repo) |
| 256 | marker.write_text("feat\n", encoding="utf-8") |
| 257 | # Do not run checkout — marker lingers |
| 258 | assert read_checkout_head(two_branch_repo) == "feat" |
| 259 | marker.unlink() |
| 260 | |
| 261 | def test_retry_checkout_clears_stale_marker(self, two_branch_repo: pathlib.Path) -> None: |
| 262 | """Retry of the interrupted checkout must clear the marker.""" |
| 263 | # Simulate interrupted checkout of feat (marker left behind, |
| 264 | # b.py not restored) |
| 265 | marker = checkout_head_path(two_branch_repo) |
| 266 | marker.write_text("feat\n", encoding="utf-8") |
| 267 | # Retry — should succeed and clear marker |
| 268 | result = _checkout(two_branch_repo, "feat") |
| 269 | assert result.exit_code == 0 |
| 270 | assert read_checkout_head(two_branch_repo) is None |
| 271 | |
| 272 | def test_marker_records_target_branch_name(self, two_branch_repo: pathlib.Path) -> None: |
| 273 | """After a successful checkout the marker is gone; its content was the target.""" |
| 274 | # We can't easily inspect the marker mid-flight without mocking, |
| 275 | # but we can write it ourselves and verify read_checkout_head returns it. |
| 276 | marker = checkout_head_path(two_branch_repo) |
| 277 | marker.write_text("feat\n") |
| 278 | assert read_checkout_head(two_branch_repo) == "feat" |
| 279 | marker.unlink() |
| 280 | |
| 281 | |
| 282 | # ────────────────────────────────────────────────────────────────────────────── |
| 283 | # Integration: muse status detects interrupted checkout |
| 284 | # ────────────────────────────────────────────────────────────────────────────── |
| 285 | |
| 286 | |
| 287 | class TestStatusDetectsInterruptedCheckout: |
| 288 | """muse status warns loudly when CHECKOUT_HEAD exists.""" |
| 289 | |
| 290 | def test_json_checkout_interrupted_false_normally(self, two_branch_repo: pathlib.Path) -> None: |
| 291 | data = _status_json(two_branch_repo) |
| 292 | assert data["checkout_interrupted"] is False |
| 293 | |
| 294 | def test_json_checkout_target_null_normally(self, two_branch_repo: pathlib.Path) -> None: |
| 295 | data = _status_json(two_branch_repo) |
| 296 | assert data["checkout_target"] is None |
| 297 | |
| 298 | def test_json_checkout_interrupted_true_with_marker(self, two_branch_repo: pathlib.Path) -> None: |
| 299 | marker = checkout_head_path(two_branch_repo) |
| 300 | marker.write_text("feat\n") |
| 301 | try: |
| 302 | data = _status_json(two_branch_repo) |
| 303 | assert data["checkout_interrupted"] is True |
| 304 | finally: |
| 305 | marker.unlink(missing_ok=True) |
| 306 | |
| 307 | def test_json_checkout_target_set_with_marker(self, two_branch_repo: pathlib.Path) -> None: |
| 308 | marker = checkout_head_path(two_branch_repo) |
| 309 | marker.write_text("feat\n") |
| 310 | try: |
| 311 | data = _status_json(two_branch_repo) |
| 312 | assert data["checkout_target"] == "feat" |
| 313 | finally: |
| 314 | marker.unlink(missing_ok=True) |
| 315 | |
| 316 | def test_json_keys_always_present(self, two_branch_repo: pathlib.Path) -> None: |
| 317 | data = _status_json(two_branch_repo) |
| 318 | assert "checkout_interrupted" in data |
| 319 | assert "checkout_target" in data |
| 320 | |
| 321 | def test_text_status_warns_on_interrupted_checkout(self, two_branch_repo: pathlib.Path) -> None: |
| 322 | marker = checkout_head_path(two_branch_repo) |
| 323 | marker.write_text("feat\n") |
| 324 | try: |
| 325 | _, stderr = _status_text(two_branch_repo) |
| 326 | assert "CHECKOUT INTERRUPTED" in stderr or "CHECKOUT INTERRUPTED" in stderr.upper() |
| 327 | finally: |
| 328 | marker.unlink(missing_ok=True) |
| 329 | |
| 330 | def test_text_status_names_target_branch(self, two_branch_repo: pathlib.Path) -> None: |
| 331 | marker = checkout_head_path(two_branch_repo) |
| 332 | marker.write_text("feat\n") |
| 333 | try: |
| 334 | _, stderr = _status_text(two_branch_repo) |
| 335 | assert "feat" in stderr |
| 336 | finally: |
| 337 | marker.unlink(missing_ok=True) |
| 338 | |
| 339 | def test_text_status_mentions_retry_command(self, two_branch_repo: pathlib.Path) -> None: |
| 340 | marker = checkout_head_path(two_branch_repo) |
| 341 | marker.write_text("feat\n") |
| 342 | try: |
| 343 | _, stderr = _status_text(two_branch_repo) |
| 344 | assert "muse checkout" in stderr |
| 345 | finally: |
| 346 | marker.unlink(missing_ok=True) |
| 347 | |
| 348 | def test_text_status_clean_no_warning(self, two_branch_repo: pathlib.Path) -> None: |
| 349 | _, stderr = _status_text(two_branch_repo) |
| 350 | assert "CHECKOUT INTERRUPTED" not in stderr |
| 351 | |
| 352 | |
| 353 | # ────────────────────────────────────────────────────────────────────────────── |
| 354 | # End-to-end: full recovery flow |
| 355 | # ────────────────────────────────────────────────────────────────────────────── |
| 356 | |
| 357 | |
| 358 | class TestCheckoutIntegrityE2E: |
| 359 | """Full end-to-end scenarios for the checkout integrity system.""" |
| 360 | |
| 361 | def test_successful_checkout_leaves_clean_status(self, two_branch_repo: pathlib.Path) -> None: |
| 362 | _checkout(two_branch_repo, "feat") |
| 363 | data = _status_json(two_branch_repo) |
| 364 | assert data["checkout_interrupted"] is False |
| 365 | assert data["checkout_target"] is None |
| 366 | assert data["clean"] is True |
| 367 | |
| 368 | def test_interrupted_checkout_then_status_then_retry(self, two_branch_repo: pathlib.Path) -> None: |
| 369 | """Full recovery flow: interrupt → status warns → retry → clean.""" |
| 370 | # Simulate interruption: plant marker and delete b.py as if |
| 371 | # the checkout partially ran (deleted files but didn't restore yet) |
| 372 | marker = checkout_head_path(two_branch_repo) |
| 373 | marker.write_text("feat\n") |
| 374 | |
| 375 | # status should warn |
| 376 | data = _status_json(two_branch_repo) |
| 377 | assert data["checkout_interrupted"] is True |
| 378 | |
| 379 | # retry the checkout — should succeed and clean up |
| 380 | result = _checkout(two_branch_repo, "feat") |
| 381 | assert result.exit_code == 0 |
| 382 | |
| 383 | # marker gone, status clean |
| 384 | assert read_checkout_head(two_branch_repo) is None |
| 385 | data2 = _status_json(two_branch_repo) |
| 386 | assert data2["checkout_interrupted"] is False |
| 387 | assert data2["checkout_target"] is None |
| 388 | |
| 389 | def test_interrupted_checkout_head_unchanged(self, two_branch_repo: pathlib.Path) -> None: |
| 390 | """HEAD must still point to the old branch after a simulated interruption.""" |
| 391 | # On main, plant a marker pretending we were switching to feat |
| 392 | marker = checkout_head_path(two_branch_repo) |
| 393 | marker.write_text("feat\n") |
| 394 | # HEAD has not changed — we only wrote the marker, didn't run checkout |
| 395 | assert read_current_branch(two_branch_repo) == "main" |
| 396 | marker.unlink() |
| 397 | |
| 398 | def test_b_py_present_after_recovery_checkout(self, two_branch_repo: pathlib.Path) -> None: |
| 399 | """After successful retry checkout to feat, feat-only file must exist.""" |
| 400 | # First do a clean checkout to feat to confirm it works |
| 401 | result = _checkout(two_branch_repo, "feat") |
| 402 | assert result.exit_code == 0 |
| 403 | assert (two_branch_repo / "b.py").exists() |
| 404 | |
| 405 | def test_b_py_absent_after_checkout_back_to_main(self, two_branch_repo: pathlib.Path) -> None: |
| 406 | """After checking back out to main, feat-only file must be gone.""" |
| 407 | _checkout(two_branch_repo, "feat") |
| 408 | result = _checkout(two_branch_repo, "main") |
| 409 | assert result.exit_code == 0 |
| 410 | assert not (two_branch_repo / "b.py").exists() |
| 411 | |
| 412 | def test_preflight_then_retry_with_restored_object( |
| 413 | self, two_branch_repo: pathlib.Path |
| 414 | ) -> None: |
| 415 | """Remove an object, verify clean abort, restore object, verify checkout succeeds.""" |
| 416 | from muse.core.refs import get_head_commit_id |
| 417 | from muse.core.commits import read_commit |
| 418 | from muse.core.snapshots import read_snapshot |
| 419 | from muse.core.object_store import _object_path_with_fallback |
| 420 | |
| 421 | # Read the object ID for b.py from the feat snapshot |
| 422 | _checkout(two_branch_repo, "feat") |
| 423 | commit_id = get_head_commit_id(two_branch_repo, "feat") |
| 424 | commit = read_commit(two_branch_repo, commit_id) |
| 425 | snap = read_snapshot(two_branch_repo, commit.snapshot_id) |
| 426 | obj_id = snap.manifest["b.py"] |
| 427 | obj_path = _object_path_with_fallback(two_branch_repo, obj_id) |
| 428 | # Save contents before sabotage |
| 429 | saved = obj_path.read_bytes() |
| 430 | |
| 431 | _checkout(two_branch_repo, "main") |
| 432 | obj_path.unlink() |
| 433 | |
| 434 | # Pre-flight fails cleanly |
| 435 | result = _checkout(two_branch_repo, "feat") |
| 436 | assert result.exit_code != 0 |
| 437 | assert not (two_branch_repo / "b.py").exists() |
| 438 | assert read_current_branch(two_branch_repo) == "main" |
| 439 | |
| 440 | # Restore the object (simulating a fetch) |
| 441 | obj_path.write_bytes(saved) |
| 442 | |
| 443 | # Retry succeeds |
| 444 | result2 = _checkout(two_branch_repo, "feat") |
| 445 | assert result2.exit_code == 0 |
| 446 | assert (two_branch_repo / "b.py").exists() |
| 447 | assert read_current_branch(two_branch_repo) == "feat" |
| 448 | |
| 449 | |
| 450 | # ────────────────────────────────────────────────────────────────────────────── |
| 451 | # Stress: rapid switching never leaves a stale marker |
| 452 | # ────────────────────────────────────────────────────────────────────────────── |
| 453 | |
| 454 | |
| 455 | class TestCheckoutIntegrityStress: |
| 456 | def test_rapid_switching_no_stale_marker(self, two_branch_repo: pathlib.Path) -> None: |
| 457 | """Switching branches 50 times must never leave CHECKOUT_HEAD behind.""" |
| 458 | for i in range(25): |
| 459 | r1 = _checkout(two_branch_repo, "feat") |
| 460 | assert r1.exit_code == 0, f"iter {i}: checkout feat failed" |
| 461 | assert read_checkout_head(two_branch_repo) is None, f"iter {i}: marker after feat checkout" |
| 462 | r2 = _checkout(two_branch_repo, "main") |
| 463 | assert r2.exit_code == 0, f"iter {i}: checkout main failed" |
| 464 | assert read_checkout_head(two_branch_repo) is None, f"iter {i}: marker after main checkout" |
| 465 | |
| 466 | def test_status_json_schema_stable_across_branches(self, two_branch_repo: pathlib.Path) -> None: |
| 467 | """checkout_interrupted and checkout_target always present in JSON output.""" |
| 468 | required = {"checkout_interrupted", "checkout_target"} |
| 469 | for branch in ("feat", "main", "feat", "main"): |
| 470 | _checkout(two_branch_repo, branch) |
| 471 | data = _status_json(two_branch_repo) |
| 472 | missing = required - data.keys() |
| 473 | assert not missing, f"Missing keys after checkout to {branch}: {missing}" |
| 474 | |
| 475 | |
| 476 | # ────────────────────────────────────────────────────────────────────────────── |
| 477 | # Regression: checkout must update HEAD to the target branch |
| 478 | # ────────────────────────────────────────────────────────────────────────────── |
| 479 | |
| 480 | |
| 481 | class TestCheckoutHeadUpdate: |
| 482 | """Regression tests for the _ref_path-not-imported bug. |
| 483 | |
| 484 | Previously, checkout exited with code 1 (NameError on _ref_path) and left |
| 485 | HEAD pointing at the old branch. Every subsequent commit then landed on |
| 486 | the wrong branch. These tests pin the invariant that a successful checkout |
| 487 | always updates HEAD. |
| 488 | """ |
| 489 | |
| 490 | def test_checkout_updates_head_to_target_branch( |
| 491 | self, two_branch_repo: pathlib.Path |
| 492 | ) -> None: |
| 493 | """Switching to feat must update HEAD; switching back must restore main.""" |
| 494 | assert read_current_branch(two_branch_repo) == "main" |
| 495 | |
| 496 | result = _checkout(two_branch_repo, "feat") |
| 497 | assert result.exit_code == 0, f"checkout feat failed: {result.output}" |
| 498 | assert read_current_branch(two_branch_repo) == "feat", ( |
| 499 | "HEAD must point to feat after successful checkout" |
| 500 | ) |
| 501 | |
| 502 | result = _checkout(two_branch_repo, "main") |
| 503 | assert result.exit_code == 0, f"checkout main failed: {result.output}" |
| 504 | assert read_current_branch(two_branch_repo) == "main", ( |
| 505 | "HEAD must point to main after switching back" |
| 506 | ) |
| 507 | |
| 508 | def test_commit_after_checkout_lands_on_correct_branch( |
| 509 | self, two_branch_repo: pathlib.Path |
| 510 | ) -> None: |
| 511 | """A commit made after checkout must advance the target branch tip, not main.""" |
| 512 | from muse.core.refs import get_head_commit_id |
| 513 | |
| 514 | tip_main_before = get_head_commit_id(two_branch_repo, "main") |
| 515 | |
| 516 | result = _checkout(two_branch_repo, "feat") |
| 517 | assert result.exit_code == 0 |
| 518 | |
| 519 | # Write a new file and commit on feat |
| 520 | (two_branch_repo / "on_feat.py").write_text("x = 1\n") |
| 521 | _invoke(two_branch_repo, ["code", "add", "on_feat.py"]) |
| 522 | _invoke(two_branch_repo, ["commit", "-m", "test: commit on feat"]) |
| 523 | |
| 524 | # main tip must be unchanged |
| 525 | tip_main_after = get_head_commit_id(two_branch_repo, "main") |
| 526 | assert tip_main_before == tip_main_after, ( |
| 527 | "Commit after checkout feat must not advance main" |
| 528 | ) |
| 529 | |
| 530 | # feat tip must have advanced |
| 531 | tip_feat = get_head_commit_id(two_branch_repo, "feat") |
| 532 | assert tip_feat != tip_main_after, ( |
| 533 | "Commit after checkout feat must advance feat, not main" |
| 534 | ) |
File History
1 commit
sha256:f81d1ddcc1eec1eaccc667f22067476a0cf138860691611738c0c799df24a1db
fixing more failing tests
Human
3 days ago