test_rebase_supercharge.py
file-level
1
files
1
commits
0
hotspots
0
🧊 dead
0
💥 blast risk
| 1 | """Supercharged tests for ``muse rebase`` — TDD for all gaps. |
| 2 | |
| 3 | Covers every JSON output path for: |
| 4 | - ``duration_ms`` (float, milliseconds) |
| 5 | - ``exit_code`` (int, 0/1/3) |
| 6 | - ``replayed_commit_ids`` (list[str], sha256:-prefixed) |
| 7 | |
| 8 | Covers sha256:-prefix correctness: |
| 9 | - ``_resolve_ref_to_id`` with sha256:-prefixed content in ref files |
| 10 | - ``_short_id`` keeps the sha256: prefix and truncates only the hex portion |
| 11 | - ``new_head``/``onto`` in JSON are sha256:-prefixed |
| 12 | |
| 13 | Covers all integration and lifecycle paths: |
| 14 | - completed (normal), aborted, up_to_date, conflict, dry_run, status, squash |
| 15 | |
| 16 | Security, performance, and stress: |
| 17 | - symlink guard on REBASE_STATE.json (load, save, clear) |
| 18 | - size cap on REBASE_STATE.json |
| 19 | - 50-commit dry-run, concurrent status reads |
| 20 | """ |
| 21 | |
| 22 | from __future__ import annotations |
| 23 | from collections.abc import Mapping |
| 24 | |
| 25 | import datetime |
| 26 | import argparse |
| 27 | import json |
| 28 | import pathlib |
| 29 | import threading |
| 30 | import time |
| 31 | |
| 32 | import pytest |
| 33 | |
| 34 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 35 | from muse.core.object_store import write_object |
| 36 | from muse.core.rebase import ( |
| 37 | RebaseState, |
| 38 | _MAX_STATE_BYTES, |
| 39 | clear_rebase_state, |
| 40 | collect_commits_to_replay, |
| 41 | get_rebase_progress, |
| 42 | load_rebase_state, |
| 43 | save_rebase_state, |
| 44 | ) |
| 45 | from muse.core.paths import muse_dir, rebase_state_path, ref_path |
| 46 | from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id |
| 47 | from muse.core.commits import ( |
| 48 | CommitRecord, |
| 49 | write_commit, |
| 50 | ) |
| 51 | from muse.core.snapshots import ( |
| 52 | SnapshotRecord, |
| 53 | write_snapshot, |
| 54 | ) |
| 55 | from muse.core.types import Manifest, blob_id, long_id, short_id |
| 56 | |
| 57 | runner = CliRunner() |
| 58 | _REPO_ID = "rebase-supercharge-test" |
| 59 | |
| 60 | |
| 61 | # --------------------------------------------------------------------------- |
| 62 | # Helpers |
| 63 | # --------------------------------------------------------------------------- |
| 64 | |
| 65 | |
| 66 | def _oid(content: bytes) -> str: |
| 67 | """Return a sha256:-prefixed object ID.""" |
| 68 | return blob_id(content) |
| 69 | |
| 70 | |
| 71 | _counter = 0 |
| 72 | _counter_lock = threading.Lock() |
| 73 | |
| 74 | |
| 75 | def _make_commit( |
| 76 | root: pathlib.Path, |
| 77 | parent_id: str | None = None, |
| 78 | content: bytes = b"data", |
| 79 | branch: str = "main", |
| 80 | ) -> str: |
| 81 | """Create a commit with correct sha256:-prefixed object IDs. Returns the commit ID.""" |
| 82 | global _counter |
| 83 | with _counter_lock: |
| 84 | _counter += 1 |
| 85 | c_val = _counter |
| 86 | c = content + str(c_val).encode() |
| 87 | obj_id = _oid(c) |
| 88 | write_object(root, obj_id, c) |
| 89 | manifest: Manifest = {f"f_{c_val}.txt": obj_id} |
| 90 | snap_id = compute_snapshot_id(manifest) |
| 91 | write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) |
| 92 | committed_at = datetime.datetime.now(datetime.timezone.utc) |
| 93 | parent_ids = [parent_id] if parent_id else [] |
| 94 | commit_id = compute_commit_id( |
| 95 | parent_ids=parent_ids, |
| 96 | snapshot_id=snap_id, |
| 97 | message=f"commit {c_val}", |
| 98 | committed_at_iso=committed_at.isoformat(), |
| 99 | ) |
| 100 | write_commit(root, CommitRecord( |
| 101 | commit_id=commit_id, |
| 102 | branch=branch, |
| 103 | snapshot_id=snap_id, |
| 104 | message=f"commit {c_val}", |
| 105 | committed_at=committed_at, |
| 106 | parent_commit_id=parent_id, |
| 107 | )) |
| 108 | (ref_path(root, branch)).write_text(commit_id, encoding="utf-8") |
| 109 | return commit_id |
| 110 | |
| 111 | |
| 112 | def _init_repo(path: pathlib.Path) -> pathlib.Path: |
| 113 | muse = muse_dir(path) |
| 114 | for d in ("commits", "snapshots", "objects", "refs/heads"): |
| 115 | (muse / d).mkdir(parents=True, exist_ok=True) |
| 116 | (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") |
| 117 | (muse / "repo.json").write_text( |
| 118 | json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8" |
| 119 | ) |
| 120 | return path |
| 121 | |
| 122 | |
| 123 | def _env(repo: pathlib.Path) -> Mapping[str, str]: |
| 124 | return {"MUSE_REPO_ROOT": str(repo)} |
| 125 | |
| 126 | |
| 127 | def _invoke(args: list[str], repo: pathlib.Path) -> InvokeResult: |
| 128 | return runner.invoke(None, args, env=_env(repo)) |
| 129 | |
| 130 | |
| 131 | def _json_from(output: str) -> Mapping[str, object]: |
| 132 | for line in output.splitlines(): |
| 133 | line = line.strip() |
| 134 | if line.startswith("{"): |
| 135 | return json.loads(line) |
| 136 | return json.loads(output.strip()) |
| 137 | |
| 138 | |
| 139 | # --------------------------------------------------------------------------- |
| 140 | # _short_id helper — prefix is canonical, only hex portion is truncated |
| 141 | # --------------------------------------------------------------------------- |
| 142 | |
| 143 | |
| 144 | class TestShortId: |
| 145 | """_short_id keeps the sha256: prefix and truncates only the hex portion.""" |
| 146 | |
| 147 | def test_short_id_keeps_prefix(self, tmp_path: pathlib.Path) -> None: |
| 148 | """_short_id must keep the sha256: prefix — it is canonical in Muse.""" |
| 149 | |
| 150 | cid = long_id("a" * 64) |
| 151 | result = short_id(cid) |
| 152 | assert result.startswith("sha256:"), f"Expected sha256: prefix, got {result!r}" |
| 153 | |
| 154 | def test_short_id_truncates_hex_to_12(self, tmp_path: pathlib.Path) -> None: |
| 155 | """_short_id returns sha256: + first 12 hex chars.""" |
| 156 | |
| 157 | cid = long_id("deadbeef" * 8) |
| 158 | result = short_id(cid) |
| 159 | assert result == "sha256:deadbeefdead" # prefix + 12 hex chars |
| 160 | |
| 161 | def test_short_id_total_length(self, tmp_path: pathlib.Path) -> None: |
| 162 | """sha256: (7) + 12 hex chars = 19 total chars.""" |
| 163 | |
| 164 | cid = long_id("cafebabe" * 8) |
| 165 | result = short_id(cid) |
| 166 | assert len(result) == 19 # "sha256:" (7) + 12 hex chars |
| 167 | |
| 168 | def test_short_id_bare_hex_passthrough(self, tmp_path: pathlib.Path) -> None: |
| 169 | """_short_id with a bare hex string (no prefix) returns first 12 chars.""" |
| 170 | |
| 171 | bare = f"1234567890ab{'cd' * 26}" # 64 chars total |
| 172 | result = short_id(bare) |
| 173 | assert result == "1234567890ab" |
| 174 | |
| 175 | def test_text_output_shows_sha256_short_id(self, tmp_path: pathlib.Path) -> None: |
| 176 | """Text output must show sha256:<12 hex chars> short IDs, not bare hex.""" |
| 177 | _init_repo(tmp_path) |
| 178 | base = _make_commit(tmp_path, content=b"base") |
| 179 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 180 | c1 = _make_commit(tmp_path, parent_id=base, content=b"c1") |
| 181 | result = _invoke(["rebase", "--dry-run", "upstream"], tmp_path) |
| 182 | assert result.exit_code == 0 |
| 183 | # The output must contain sha256:<first 12 hex chars of c1> |
| 184 | expected_short = long_id(c1[7:19])# prefix + 12 hex chars |
| 185 | assert expected_short in result.output, ( |
| 186 | f"Expected {expected_short!r} in dry-run text output.\n" |
| 187 | f"Got: {result.output!r}" |
| 188 | ) |
| 189 | |
| 190 | |
| 191 | # --------------------------------------------------------------------------- |
| 192 | # _resolve_ref_to_id — must handle sha256:-prefixed content in ref files |
| 193 | # --------------------------------------------------------------------------- |
| 194 | |
| 195 | |
| 196 | class TestResolveRefToId: |
| 197 | """_resolve_ref_to_id must handle ref files whose content has sha256: prefix.""" |
| 198 | |
| 199 | def test_resolves_sha256_prefixed_ref_file(self, tmp_path: pathlib.Path) -> None: |
| 200 | """Bug: len(raw) == 64 check fails when ref file contains sha256:-prefixed ID (71 chars).""" |
| 201 | from muse.cli.commands.rebase import _resolve_ref_to_id |
| 202 | _init_repo(tmp_path) |
| 203 | commit_id = _make_commit(tmp_path, content=b"sha256-prefix-test") |
| 204 | # commit_id is sha256:-prefixed (71 chars) — the ref file already has this |
| 205 | resolved = _resolve_ref_to_id(tmp_path, "main", "main") |
| 206 | assert resolved == commit_id, ( |
| 207 | f"Expected {commit_id!r}, got {resolved!r}. " |
| 208 | "Bug: _resolve_ref_to_id len check fails for sha256:-prefixed IDs." |
| 209 | ) |
| 210 | |
| 211 | def test_resolves_head(self, tmp_path: pathlib.Path) -> None: |
| 212 | """HEAD resolves to the current branch's commit.""" |
| 213 | from muse.cli.commands.rebase import _resolve_ref_to_id |
| 214 | _init_repo(tmp_path) |
| 215 | commit_id = _make_commit(tmp_path, content=b"head-test") |
| 216 | result = _resolve_ref_to_id(tmp_path, "main", "HEAD") |
| 217 | assert result == commit_id |
| 218 | |
| 219 | def test_returns_none_for_missing_branch(self, tmp_path: pathlib.Path) -> None: |
| 220 | """Unknown branch name resolves to None.""" |
| 221 | from muse.cli.commands.rebase import _resolve_ref_to_id |
| 222 | _init_repo(tmp_path) |
| 223 | _make_commit(tmp_path) |
| 224 | result = _resolve_ref_to_id(tmp_path, "main", "nonexistent-branch") |
| 225 | assert result is None |
| 226 | |
| 227 | def test_resolved_id_is_sha256_prefixed(self, tmp_path: pathlib.Path) -> None: |
| 228 | """The resolved commit ID must be sha256:-prefixed.""" |
| 229 | from muse.cli.commands.rebase import _resolve_ref_to_id |
| 230 | _init_repo(tmp_path) |
| 231 | _make_commit(tmp_path, content=b"prefix-check") |
| 232 | result = _resolve_ref_to_id(tmp_path, "main", "main") |
| 233 | assert result is not None |
| 234 | assert result.startswith("sha256:") |
| 235 | |
| 236 | |
| 237 | # --------------------------------------------------------------------------- |
| 238 | # duration_ms — all JSON output paths must include it |
| 239 | # --------------------------------------------------------------------------- |
| 240 | |
| 241 | |
| 242 | class TestJsonSchemaDurationMs: |
| 243 | """Every JSON output path must include duration_ms.""" |
| 244 | |
| 245 | def test_status_inactive_has_duration_ms(self, tmp_path: pathlib.Path) -> None: |
| 246 | _init_repo(tmp_path) |
| 247 | _make_commit(tmp_path) |
| 248 | result = _invoke(["rebase", "--status", "--json"], tmp_path) |
| 249 | assert result.exit_code == 0 |
| 250 | data = _json_from(result.output) |
| 251 | assert "duration_ms" in data, f"Missing duration_ms in status JSON: {data}" |
| 252 | assert isinstance(data["duration_ms"], (int, float)) |
| 253 | assert data["duration_ms"] >= 0 |
| 254 | |
| 255 | def test_status_active_has_duration_ms(self, tmp_path: pathlib.Path) -> None: |
| 256 | _init_repo(tmp_path) |
| 257 | state = RebaseState( |
| 258 | original_branch="main", original_head="a" * 64, onto="b" * 64, |
| 259 | remaining=["c" * 64], completed=[], squash=False, |
| 260 | ) |
| 261 | save_rebase_state(tmp_path, state) |
| 262 | result = _invoke(["rebase", "--status", "--json"], tmp_path) |
| 263 | assert result.exit_code == 0 |
| 264 | data = _json_from(result.output) |
| 265 | assert "duration_ms" in data |
| 266 | assert isinstance(data["duration_ms"], (int, float)) |
| 267 | |
| 268 | def test_abort_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None: |
| 269 | _init_repo(tmp_path) |
| 270 | base = _make_commit(tmp_path) |
| 271 | state = RebaseState( |
| 272 | original_branch="main", original_head=base, onto=base, |
| 273 | remaining=[], completed=[], squash=False, |
| 274 | ) |
| 275 | save_rebase_state(tmp_path, state) |
| 276 | result = _invoke(["rebase", "--abort", "--json"], tmp_path) |
| 277 | assert result.exit_code == 0, result.output |
| 278 | data = _json_from(result.output) |
| 279 | assert "duration_ms" in data, f"Missing duration_ms in abort JSON: {data}" |
| 280 | assert isinstance(data["duration_ms"], (int, float)) |
| 281 | |
| 282 | def test_up_to_date_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None: |
| 283 | _init_repo(tmp_path) |
| 284 | cid = _make_commit(tmp_path) |
| 285 | (ref_path(tmp_path, "up")).write_text(cid, encoding="utf-8") |
| 286 | result = _invoke(["rebase", "--json", "up"], tmp_path) |
| 287 | assert result.exit_code == 0, result.output |
| 288 | data = _json_from(result.output) |
| 289 | assert "duration_ms" in data, f"Missing duration_ms in up_to_date JSON: {data}" |
| 290 | assert isinstance(data["duration_ms"], (int, float)) |
| 291 | |
| 292 | def test_dry_run_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None: |
| 293 | _init_repo(tmp_path) |
| 294 | base = _make_commit(tmp_path) |
| 295 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 296 | _make_commit(tmp_path, parent_id=base) |
| 297 | result = _invoke(["rebase", "--dry-run", "--json", "upstream"], tmp_path) |
| 298 | assert result.exit_code == 0, result.output |
| 299 | data = _json_from(result.output) |
| 300 | assert "duration_ms" in data, f"Missing duration_ms in dry_run JSON: {data}" |
| 301 | assert isinstance(data["duration_ms"], (int, float)) |
| 302 | |
| 303 | def test_completed_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None: |
| 304 | _init_repo(tmp_path) |
| 305 | base = _make_commit(tmp_path) |
| 306 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 307 | _make_commit(tmp_path, parent_id=base) |
| 308 | result = _invoke(["rebase", "--json", "upstream"], tmp_path) |
| 309 | assert result.exit_code == 0, result.output |
| 310 | data = _json_from(result.output) |
| 311 | assert "duration_ms" in data, f"Missing duration_ms in completed JSON: {data}" |
| 312 | assert isinstance(data["duration_ms"], (int, float)) |
| 313 | |
| 314 | |
| 315 | # --------------------------------------------------------------------------- |
| 316 | # exit_code — all JSON output paths must include it |
| 317 | # --------------------------------------------------------------------------- |
| 318 | |
| 319 | |
| 320 | class TestJsonSchemaExitCode: |
| 321 | """Every JSON output path must include exit_code.""" |
| 322 | |
| 323 | def test_status_json_has_exit_code_0(self, tmp_path: pathlib.Path) -> None: |
| 324 | _init_repo(tmp_path) |
| 325 | _make_commit(tmp_path) |
| 326 | result = _invoke(["rebase", "--status", "--json"], tmp_path) |
| 327 | assert result.exit_code == 0 |
| 328 | data = _json_from(result.output) |
| 329 | assert "exit_code" in data, f"Missing exit_code: {data}" |
| 330 | assert data["exit_code"] == 0 |
| 331 | |
| 332 | def test_abort_json_has_exit_code_0(self, tmp_path: pathlib.Path) -> None: |
| 333 | _init_repo(tmp_path) |
| 334 | base = _make_commit(tmp_path) |
| 335 | state = RebaseState( |
| 336 | original_branch="main", original_head=base, onto=base, |
| 337 | remaining=[], completed=[], squash=False, |
| 338 | ) |
| 339 | save_rebase_state(tmp_path, state) |
| 340 | result = _invoke(["rebase", "--abort", "--json"], tmp_path) |
| 341 | assert result.exit_code == 0, result.output |
| 342 | data = _json_from(result.output) |
| 343 | assert "exit_code" in data, f"Missing exit_code: {data}" |
| 344 | assert data["exit_code"] == 0 |
| 345 | |
| 346 | def test_up_to_date_json_has_exit_code_0(self, tmp_path: pathlib.Path) -> None: |
| 347 | _init_repo(tmp_path) |
| 348 | cid = _make_commit(tmp_path) |
| 349 | (ref_path(tmp_path, "up")).write_text(cid, encoding="utf-8") |
| 350 | result = _invoke(["rebase", "--json", "up"], tmp_path) |
| 351 | assert result.exit_code == 0, result.output |
| 352 | data = _json_from(result.output) |
| 353 | assert "exit_code" in data |
| 354 | assert data["exit_code"] == 0 |
| 355 | |
| 356 | def test_dry_run_json_has_exit_code_0(self, tmp_path: pathlib.Path) -> None: |
| 357 | _init_repo(tmp_path) |
| 358 | base = _make_commit(tmp_path) |
| 359 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 360 | _make_commit(tmp_path, parent_id=base) |
| 361 | result = _invoke(["rebase", "--dry-run", "--json", "upstream"], tmp_path) |
| 362 | assert result.exit_code == 0, result.output |
| 363 | data = _json_from(result.output) |
| 364 | assert "exit_code" in data |
| 365 | assert data["exit_code"] == 0 |
| 366 | |
| 367 | def test_completed_json_has_exit_code_0(self, tmp_path: pathlib.Path) -> None: |
| 368 | _init_repo(tmp_path) |
| 369 | base = _make_commit(tmp_path) |
| 370 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 371 | _make_commit(tmp_path, parent_id=base) |
| 372 | result = _invoke(["rebase", "--json", "upstream"], tmp_path) |
| 373 | assert result.exit_code == 0, result.output |
| 374 | data = _json_from(result.output) |
| 375 | assert "exit_code" in data |
| 376 | assert data["exit_code"] == 0 |
| 377 | |
| 378 | def test_duration_ms_is_nonnegative_float(self, tmp_path: pathlib.Path) -> None: |
| 379 | """duration_ms must be a non-negative number.""" |
| 380 | _init_repo(tmp_path) |
| 381 | cid = _make_commit(tmp_path) |
| 382 | (ref_path(tmp_path, "up")).write_text(cid, encoding="utf-8") |
| 383 | result = _invoke(["rebase", "--json", "up"], tmp_path) |
| 384 | data = _json_from(result.output) |
| 385 | assert data["duration_ms"] >= 0.0 |
| 386 | |
| 387 | |
| 388 | # --------------------------------------------------------------------------- |
| 389 | # replayed_commit_ids — completed result JSON must list new commit IDs |
| 390 | # --------------------------------------------------------------------------- |
| 391 | |
| 392 | |
| 393 | class TestReplayedCommitIds: |
| 394 | """Completed rebase JSON must include replayed_commit_ids.""" |
| 395 | |
| 396 | def test_completed_has_replayed_commit_ids(self, tmp_path: pathlib.Path) -> None: |
| 397 | _init_repo(tmp_path) |
| 398 | base = _make_commit(tmp_path) |
| 399 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 400 | _make_commit(tmp_path, parent_id=base) |
| 401 | result = _invoke(["rebase", "--json", "upstream"], tmp_path) |
| 402 | assert result.exit_code == 0, result.output |
| 403 | data = _json_from(result.output) |
| 404 | assert "replayed_commit_ids" in data, f"Missing replayed_commit_ids: {data}" |
| 405 | assert isinstance(data["replayed_commit_ids"], list) |
| 406 | |
| 407 | def test_replayed_commit_ids_count_matches_replayed(self, tmp_path: pathlib.Path) -> None: |
| 408 | _init_repo(tmp_path) |
| 409 | base = _make_commit(tmp_path) |
| 410 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 411 | c1 = _make_commit(tmp_path, parent_id=base) |
| 412 | c2 = _make_commit(tmp_path, parent_id=c1) |
| 413 | result = _invoke(["rebase", "--json", "upstream"], tmp_path) |
| 414 | assert result.exit_code == 0, result.output |
| 415 | data = _json_from(result.output) |
| 416 | assert data["replayed"] == 2 |
| 417 | assert len(data["replayed_commit_ids"]) == 2 |
| 418 | |
| 419 | def test_replayed_commit_ids_are_sha256_prefixed(self, tmp_path: pathlib.Path) -> None: |
| 420 | _init_repo(tmp_path) |
| 421 | base = _make_commit(tmp_path) |
| 422 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 423 | _make_commit(tmp_path, parent_id=base) |
| 424 | result = _invoke(["rebase", "--json", "upstream"], tmp_path) |
| 425 | assert result.exit_code == 0, result.output |
| 426 | data = _json_from(result.output) |
| 427 | for cid in data["replayed_commit_ids"]: |
| 428 | assert cid.startswith("sha256:"), f"Not sha256:-prefixed: {cid!r}" |
| 429 | |
| 430 | def test_abort_has_replayed_commit_ids_empty(self, tmp_path: pathlib.Path) -> None: |
| 431 | """Aborted rebase has no new commits — replayed_commit_ids must be empty list.""" |
| 432 | _init_repo(tmp_path) |
| 433 | base = _make_commit(tmp_path) |
| 434 | state = RebaseState( |
| 435 | original_branch="main", original_head=base, onto=base, |
| 436 | remaining=[], completed=[], squash=False, |
| 437 | ) |
| 438 | save_rebase_state(tmp_path, state) |
| 439 | result = _invoke(["rebase", "--abort", "--json"], tmp_path) |
| 440 | assert result.exit_code == 0, result.output |
| 441 | data = _json_from(result.output) |
| 442 | assert "replayed_commit_ids" in data |
| 443 | assert data["replayed_commit_ids"] == [] |
| 444 | |
| 445 | def test_up_to_date_has_replayed_commit_ids_empty(self, tmp_path: pathlib.Path) -> None: |
| 446 | _init_repo(tmp_path) |
| 447 | cid = _make_commit(tmp_path) |
| 448 | (ref_path(tmp_path, "up")).write_text(cid, encoding="utf-8") |
| 449 | result = _invoke(["rebase", "--json", "up"], tmp_path) |
| 450 | assert result.exit_code == 0, result.output |
| 451 | data = _json_from(result.output) |
| 452 | assert "replayed_commit_ids" in data |
| 453 | assert data["replayed_commit_ids"] == [] |
| 454 | |
| 455 | |
| 456 | # --------------------------------------------------------------------------- |
| 457 | # Data integrity — IDs in JSON must be sha256:-prefixed |
| 458 | # --------------------------------------------------------------------------- |
| 459 | |
| 460 | |
| 461 | class TestDataIntegrity: |
| 462 | """All commit IDs in JSON output must be sha256:-prefixed.""" |
| 463 | |
| 464 | def test_new_head_is_sha256_prefixed(self, tmp_path: pathlib.Path) -> None: |
| 465 | _init_repo(tmp_path) |
| 466 | base = _make_commit(tmp_path) |
| 467 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 468 | _make_commit(tmp_path, parent_id=base) |
| 469 | result = _invoke(["rebase", "--json", "upstream"], tmp_path) |
| 470 | assert result.exit_code == 0, result.output |
| 471 | data = _json_from(result.output) |
| 472 | assert data["new_head"].startswith("sha256:"), f"new_head not sha256:-prefixed: {data['new_head']!r}" |
| 473 | |
| 474 | def test_onto_is_sha256_prefixed(self, tmp_path: pathlib.Path) -> None: |
| 475 | _init_repo(tmp_path) |
| 476 | base = _make_commit(tmp_path) |
| 477 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 478 | _make_commit(tmp_path, parent_id=base) |
| 479 | result = _invoke(["rebase", "--json", "upstream"], tmp_path) |
| 480 | assert result.exit_code == 0, result.output |
| 481 | data = _json_from(result.output) |
| 482 | assert data["onto"].startswith("sha256:"), f"onto not sha256:-prefixed: {data['onto']!r}" |
| 483 | |
| 484 | def test_up_to_date_new_head_is_sha256_prefixed(self, tmp_path: pathlib.Path) -> None: |
| 485 | _init_repo(tmp_path) |
| 486 | cid = _make_commit(tmp_path) |
| 487 | (ref_path(tmp_path, "upstream")).write_text(cid, encoding="utf-8") |
| 488 | result = _invoke(["rebase", "--json", "upstream"], tmp_path) |
| 489 | assert result.exit_code == 0, result.output |
| 490 | data = _json_from(result.output) |
| 491 | assert data["new_head"].startswith("sha256:") |
| 492 | |
| 493 | def test_abort_new_head_is_sha256_prefixed(self, tmp_path: pathlib.Path) -> None: |
| 494 | _init_repo(tmp_path) |
| 495 | base = _make_commit(tmp_path) |
| 496 | state = RebaseState( |
| 497 | original_branch="main", original_head=base, onto=base, |
| 498 | remaining=[], completed=[], squash=False, |
| 499 | ) |
| 500 | save_rebase_state(tmp_path, state) |
| 501 | result = _invoke(["rebase", "--abort", "--json"], tmp_path) |
| 502 | assert result.exit_code == 0, result.output |
| 503 | data = _json_from(result.output) |
| 504 | assert data["new_head"].startswith("sha256:"), ( |
| 505 | f"abort new_head not sha256:-prefixed: {data['new_head']!r}" |
| 506 | ) |
| 507 | |
| 508 | def test_dry_run_commit_ids_are_sha256_prefixed(self, tmp_path: pathlib.Path) -> None: |
| 509 | _init_repo(tmp_path) |
| 510 | base = _make_commit(tmp_path) |
| 511 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 512 | _make_commit(tmp_path, parent_id=base) |
| 513 | result = _invoke(["rebase", "--dry-run", "--json", "upstream"], tmp_path) |
| 514 | assert result.exit_code == 0, result.output |
| 515 | data = _json_from(result.output) |
| 516 | for entry in data["commits"]: |
| 517 | assert entry["commit_id"].startswith("sha256:"), ( |
| 518 | f"dry_run commit_id not sha256:-prefixed: {entry['commit_id']!r}" |
| 519 | ) |
| 520 | |
| 521 | def test_status_original_head_is_sha256_prefixed(self, tmp_path: pathlib.Path) -> None: |
| 522 | _init_repo(tmp_path) |
| 523 | base = _make_commit(tmp_path) |
| 524 | state = RebaseState( |
| 525 | original_branch="main", original_head=base, onto=base, |
| 526 | remaining=[], completed=[], squash=False, |
| 527 | ) |
| 528 | save_rebase_state(tmp_path, state) |
| 529 | result = _invoke(["rebase", "--status", "--json"], tmp_path) |
| 530 | assert result.exit_code == 0 |
| 531 | data = _json_from(result.output) |
| 532 | assert data["original_head"].startswith("sha256:"), ( |
| 533 | f"status original_head not sha256:-prefixed: {data['original_head']!r}" |
| 534 | ) |
| 535 | |
| 536 | |
| 537 | # --------------------------------------------------------------------------- |
| 538 | # Full JSON schema — all fields present on each path |
| 539 | # --------------------------------------------------------------------------- |
| 540 | |
| 541 | |
| 542 | class TestJsonSchemaComplete: |
| 543 | """Verify all required fields exist in each output path.""" |
| 544 | |
| 545 | _RESULT_FIELDS = { |
| 546 | "status", "branch", "new_head", "onto", "squash", |
| 547 | "replayed", "replayed_commit_ids", "conflicts", |
| 548 | "duration_ms", "exit_code", |
| 549 | } |
| 550 | _STATUS_FIELDS = { |
| 551 | "active", "original_branch", "original_head", "onto", |
| 552 | "total", "done", "remaining", "squash", |
| 553 | "duration_ms", "exit_code", |
| 554 | } |
| 555 | _DRY_RUN_FIELDS = { |
| 556 | "branch", "onto", "commits", "count", "squash", |
| 557 | "duration_ms", "exit_code", |
| 558 | } |
| 559 | |
| 560 | def test_completed_schema(self, tmp_path: pathlib.Path) -> None: |
| 561 | _init_repo(tmp_path) |
| 562 | base = _make_commit(tmp_path) |
| 563 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 564 | _make_commit(tmp_path, parent_id=base) |
| 565 | result = _invoke(["rebase", "--json", "upstream"], tmp_path) |
| 566 | assert result.exit_code == 0, result.output |
| 567 | data = _json_from(result.output) |
| 568 | missing = self._RESULT_FIELDS - set(data) |
| 569 | assert not missing, f"completed JSON missing fields: {missing}" |
| 570 | assert data["status"] == "completed" |
| 571 | assert data["exit_code"] == 0 |
| 572 | assert data["replayed"] == 1 |
| 573 | assert len(data["replayed_commit_ids"]) == 1 |
| 574 | |
| 575 | def test_aborted_schema(self, tmp_path: pathlib.Path) -> None: |
| 576 | _init_repo(tmp_path) |
| 577 | base = _make_commit(tmp_path) |
| 578 | tip = _make_commit(tmp_path, parent_id=base) |
| 579 | state = RebaseState( |
| 580 | original_branch="main", original_head=base, onto=base, |
| 581 | remaining=[tip], completed=[], squash=False, |
| 582 | ) |
| 583 | save_rebase_state(tmp_path, state) |
| 584 | result = _invoke(["rebase", "--abort", "--json"], tmp_path) |
| 585 | assert result.exit_code == 0, result.output |
| 586 | data = _json_from(result.output) |
| 587 | missing = self._RESULT_FIELDS - set(data) |
| 588 | assert not missing, f"aborted JSON missing fields: {missing}" |
| 589 | assert data["status"] == "aborted" |
| 590 | assert data["exit_code"] == 0 |
| 591 | assert data["new_head"] == base |
| 592 | assert data["replayed_commit_ids"] == [] |
| 593 | |
| 594 | def test_up_to_date_schema(self, tmp_path: pathlib.Path) -> None: |
| 595 | _init_repo(tmp_path) |
| 596 | cid = _make_commit(tmp_path) |
| 597 | (ref_path(tmp_path, "upstream")).write_text(cid, encoding="utf-8") |
| 598 | result = _invoke(["rebase", "--json", "upstream"], tmp_path) |
| 599 | assert result.exit_code == 0, result.output |
| 600 | data = _json_from(result.output) |
| 601 | missing = self._RESULT_FIELDS - set(data) |
| 602 | assert not missing, f"up_to_date JSON missing fields: {missing}" |
| 603 | assert data["status"] == "up_to_date" |
| 604 | assert data["exit_code"] == 0 |
| 605 | assert data["replayed"] == 0 |
| 606 | assert data["replayed_commit_ids"] == [] |
| 607 | |
| 608 | def test_dry_run_schema(self, tmp_path: pathlib.Path) -> None: |
| 609 | _init_repo(tmp_path) |
| 610 | base = _make_commit(tmp_path) |
| 611 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 612 | c1 = _make_commit(tmp_path, parent_id=base) |
| 613 | result = _invoke(["rebase", "--dry-run", "--json", "upstream"], tmp_path) |
| 614 | assert result.exit_code == 0, result.output |
| 615 | data = _json_from(result.output) |
| 616 | missing = self._DRY_RUN_FIELDS - set(data) |
| 617 | assert not missing, f"dry_run JSON missing fields: {missing}" |
| 618 | assert data["count"] == 1 |
| 619 | assert data["commits"][0]["commit_id"] == c1 |
| 620 | assert data["exit_code"] == 0 |
| 621 | |
| 622 | def test_status_schema_inactive(self, tmp_path: pathlib.Path) -> None: |
| 623 | _init_repo(tmp_path) |
| 624 | _make_commit(tmp_path) |
| 625 | result = _invoke(["rebase", "--status", "--json"], tmp_path) |
| 626 | assert result.exit_code == 0 |
| 627 | data = _json_from(result.output) |
| 628 | missing = self._STATUS_FIELDS - set(data) |
| 629 | assert not missing, f"status JSON missing fields: {missing}" |
| 630 | assert data["active"] is False |
| 631 | assert data["exit_code"] == 0 |
| 632 | |
| 633 | def test_status_schema_active(self, tmp_path: pathlib.Path) -> None: |
| 634 | _init_repo(tmp_path) |
| 635 | base = _make_commit(tmp_path) |
| 636 | state = RebaseState( |
| 637 | original_branch="feat/x", |
| 638 | original_head=base, |
| 639 | onto=base, |
| 640 | remaining=[base], |
| 641 | completed=[], |
| 642 | squash=True, |
| 643 | ) |
| 644 | save_rebase_state(tmp_path, state) |
| 645 | result = _invoke(["rebase", "--status", "--json"], tmp_path) |
| 646 | assert result.exit_code == 0 |
| 647 | data = _json_from(result.output) |
| 648 | missing = self._STATUS_FIELDS - set(data) |
| 649 | assert not missing, f"status (active) JSON missing fields: {missing}" |
| 650 | assert data["active"] is True |
| 651 | assert data["original_branch"] == "feat/x" |
| 652 | assert data["exit_code"] == 0 |
| 653 | |
| 654 | |
| 655 | # --------------------------------------------------------------------------- |
| 656 | # Lifecycle integration tests |
| 657 | # --------------------------------------------------------------------------- |
| 658 | |
| 659 | |
| 660 | class TestRebaseLifecycle: |
| 661 | """Full lifecycle: init → rebase → result; abort restores HEAD.""" |
| 662 | |
| 663 | def test_simple_rebase_completed(self, tmp_path: pathlib.Path) -> None: |
| 664 | _init_repo(tmp_path) |
| 665 | base = _make_commit(tmp_path) |
| 666 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 667 | _make_commit(tmp_path, parent_id=base) |
| 668 | result = _invoke(["rebase", "upstream"], tmp_path) |
| 669 | assert result.exit_code == 0, result.output |
| 670 | assert "complete" in result.output.lower() or "up to date" in result.output.lower() |
| 671 | |
| 672 | def test_abort_restores_head(self, tmp_path: pathlib.Path) -> None: |
| 673 | _init_repo(tmp_path) |
| 674 | base = _make_commit(tmp_path) |
| 675 | tip = _make_commit(tmp_path, parent_id=base) |
| 676 | state = RebaseState( |
| 677 | original_branch="main", original_head=base, onto=base, |
| 678 | remaining=[tip], completed=[], squash=False, |
| 679 | ) |
| 680 | save_rebase_state(tmp_path, state) |
| 681 | result = _invoke(["rebase", "--abort"], tmp_path) |
| 682 | assert result.exit_code == 0 |
| 683 | assert "aborted" in result.output.lower() |
| 684 | assert load_rebase_state(tmp_path) is None |
| 685 | restored = (ref_path(tmp_path, "main")).read_text(encoding="utf-8").strip() |
| 686 | assert restored == base |
| 687 | |
| 688 | def test_abort_text_shows_sha256_short_id(self, tmp_path: pathlib.Path) -> None: |
| 689 | """Abort text output must show sha256:<12 hex chars>, not bare hex.""" |
| 690 | _init_repo(tmp_path) |
| 691 | base = _make_commit(tmp_path) |
| 692 | state = RebaseState( |
| 693 | original_branch="main", original_head=base, onto=base, |
| 694 | remaining=[], completed=[], squash=False, |
| 695 | ) |
| 696 | save_rebase_state(tmp_path, state) |
| 697 | result = _invoke(["rebase", "--abort"], tmp_path) |
| 698 | assert result.exit_code == 0 |
| 699 | expected_short = long_id(base[7:19])# prefix + 12 hex chars |
| 700 | assert expected_short in result.output, ( |
| 701 | f"Expected {expected_short!r} in abort text output: {result.output!r}" |
| 702 | ) |
| 703 | |
| 704 | def test_already_up_to_date_text(self, tmp_path: pathlib.Path) -> None: |
| 705 | _init_repo(tmp_path) |
| 706 | cid = _make_commit(tmp_path) |
| 707 | (ref_path(tmp_path, "up")).write_text(cid, encoding="utf-8") |
| 708 | result = _invoke(["rebase", "up"], tmp_path) |
| 709 | assert result.exit_code == 0 |
| 710 | assert "up to date" in result.output.lower() |
| 711 | |
| 712 | def test_dry_run_no_side_effects(self, tmp_path: pathlib.Path) -> None: |
| 713 | """--dry-run must not write REBASE_STATE.json or modify branch refs.""" |
| 714 | _init_repo(tmp_path) |
| 715 | base = _make_commit(tmp_path) |
| 716 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 717 | c1 = _make_commit(tmp_path, parent_id=base) |
| 718 | original_head = (ref_path(tmp_path, "main")).read_text(encoding="utf-8").strip() |
| 719 | result = _invoke(["rebase", "--dry-run", "upstream"], tmp_path) |
| 720 | assert result.exit_code == 0 |
| 721 | assert not (rebase_state_path(tmp_path)).exists() |
| 722 | new_head = (ref_path(tmp_path, "main")).read_text(encoding="utf-8").strip() |
| 723 | assert new_head == original_head |
| 724 | expected_short = long_id(c1[7:19]) |
| 725 | assert expected_short in result.output |
| 726 | |
| 727 | def test_dry_run_squash_flag(self, tmp_path: pathlib.Path) -> None: |
| 728 | _init_repo(tmp_path) |
| 729 | base = _make_commit(tmp_path) |
| 730 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 731 | _make_commit(tmp_path, parent_id=base) |
| 732 | result = _invoke(["rebase", "--dry-run", "--squash", "--json", "upstream"], tmp_path) |
| 733 | assert result.exit_code == 0, result.output |
| 734 | data = _json_from(result.output) |
| 735 | assert data["squash"] is True |
| 736 | |
| 737 | def test_status_text_inactive(self, tmp_path: pathlib.Path) -> None: |
| 738 | _init_repo(tmp_path) |
| 739 | _make_commit(tmp_path) |
| 740 | result = _invoke(["rebase", "--status"], tmp_path) |
| 741 | assert result.exit_code == 0 |
| 742 | assert "No rebase" in result.output |
| 743 | |
| 744 | def test_status_text_active(self, tmp_path: pathlib.Path) -> None: |
| 745 | _init_repo(tmp_path) |
| 746 | base = _make_commit(tmp_path) |
| 747 | state = RebaseState( |
| 748 | original_branch="feat/y", original_head=base, onto=base, |
| 749 | remaining=[base], completed=[], squash=True, |
| 750 | ) |
| 751 | save_rebase_state(tmp_path, state) |
| 752 | result = _invoke(["rebase", "--status"], tmp_path) |
| 753 | assert result.exit_code == 0 |
| 754 | assert "feat/y" in result.output |
| 755 | |
| 756 | def test_completed_clears_state_file(self, tmp_path: pathlib.Path) -> None: |
| 757 | """After a clean rebase, REBASE_STATE.json must be removed.""" |
| 758 | _init_repo(tmp_path) |
| 759 | base = _make_commit(tmp_path) |
| 760 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 761 | _make_commit(tmp_path, parent_id=base) |
| 762 | result = _invoke(["rebase", "upstream"], tmp_path) |
| 763 | assert result.exit_code == 0, result.output |
| 764 | assert load_rebase_state(tmp_path) is None |
| 765 | |
| 766 | def test_max_commits_cap(self, tmp_path: pathlib.Path) -> None: |
| 767 | """--max-commits 2 on a 5-commit chain reports at most 2.""" |
| 768 | _init_repo(tmp_path) |
| 769 | base = _make_commit(tmp_path) |
| 770 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 771 | prev = base |
| 772 | for _ in range(5): |
| 773 | prev = _make_commit(tmp_path, parent_id=prev) |
| 774 | result = _invoke( |
| 775 | ["rebase", "--dry-run", "--json", "--max-commits", "2", "upstream"], tmp_path |
| 776 | ) |
| 777 | assert result.exit_code == 0, result.output |
| 778 | data = _json_from(result.output) |
| 779 | assert data["count"] <= 2 |
| 780 | |
| 781 | |
| 782 | # --------------------------------------------------------------------------- |
| 783 | # Error paths |
| 784 | # --------------------------------------------------------------------------- |
| 785 | |
| 786 | |
| 787 | class TestErrors: |
| 788 | """Error conditions must exit non-zero.""" |
| 789 | |
| 790 | def test_no_upstream_exits_nonzero(self, tmp_path: pathlib.Path) -> None: |
| 791 | _init_repo(tmp_path) |
| 792 | _make_commit(tmp_path) |
| 793 | result = _invoke(["rebase"], tmp_path) |
| 794 | assert result.exit_code != 0 |
| 795 | |
| 796 | def test_unknown_upstream_exits_nonzero(self, tmp_path: pathlib.Path) -> None: |
| 797 | _init_repo(tmp_path) |
| 798 | _make_commit(tmp_path) |
| 799 | result = _invoke(["rebase", "nonexistent-branch-xyz"], tmp_path) |
| 800 | assert result.exit_code != 0 |
| 801 | assert "not found" in result.stderr.lower() |
| 802 | |
| 803 | def test_abort_no_state_exits_nonzero(self, tmp_path: pathlib.Path) -> None: |
| 804 | _init_repo(tmp_path) |
| 805 | result = _invoke(["rebase", "--abort"], tmp_path) |
| 806 | assert result.exit_code != 0 |
| 807 | |
| 808 | def test_continue_no_state_exits_nonzero(self, tmp_path: pathlib.Path) -> None: |
| 809 | _init_repo(tmp_path) |
| 810 | result = _invoke(["rebase", "--continue"], tmp_path) |
| 811 | assert result.exit_code != 0 |
| 812 | |
| 813 | def test_rebase_in_progress_exits_nonzero(self, tmp_path: pathlib.Path) -> None: |
| 814 | _init_repo(tmp_path) |
| 815 | base = _make_commit(tmp_path) |
| 816 | state = RebaseState( |
| 817 | original_branch="main", original_head=base, onto=base, |
| 818 | remaining=[], completed=[], squash=False, |
| 819 | ) |
| 820 | save_rebase_state(tmp_path, state) |
| 821 | result = _invoke(["rebase", "main"], tmp_path) |
| 822 | assert result.exit_code != 0 |
| 823 | assert "--continue" in result.stderr or "--abort" in result.stderr |
| 824 | |
| 825 | |
| 826 | # --------------------------------------------------------------------------- |
| 827 | # Security — symlink and size guards (from hardening tests) |
| 828 | # --------------------------------------------------------------------------- |
| 829 | |
| 830 | |
| 831 | class TestSecurity: |
| 832 | """Symlink and size-cap guards on REBASE_STATE.json.""" |
| 833 | |
| 834 | def test_load_rebase_state_symlink_rejected(self, tmp_path: pathlib.Path) -> None: |
| 835 | _init_repo(tmp_path) |
| 836 | state_path = rebase_state_path(tmp_path) |
| 837 | target = tmp_path / "sensitive.json" |
| 838 | target.write_text( |
| 839 | json.dumps({ |
| 840 | "original_branch": "main", |
| 841 | "original_head": "a" * 64, |
| 842 | "onto": "b" * 64, |
| 843 | "remaining": [], |
| 844 | "completed": [], |
| 845 | "squash": False, |
| 846 | }), |
| 847 | encoding="utf-8", |
| 848 | ) |
| 849 | state_path.symlink_to(target) |
| 850 | result = load_rebase_state(tmp_path) |
| 851 | assert result is None, "Symlinked state file must be rejected" |
| 852 | |
| 853 | def test_save_rebase_state_symlink_rejected(self, tmp_path: pathlib.Path) -> None: |
| 854 | _init_repo(tmp_path) |
| 855 | state_path = rebase_state_path(tmp_path) |
| 856 | target = tmp_path / "victim.json" |
| 857 | target.write_text("{}", encoding="utf-8") |
| 858 | state_path.symlink_to(target) |
| 859 | state = RebaseState( |
| 860 | original_branch="main", original_head="a" * 64, onto="b" * 64, |
| 861 | remaining=[], completed=[], squash=False, |
| 862 | ) |
| 863 | with pytest.raises(OSError, match="symlink"): |
| 864 | save_rebase_state(tmp_path, state) |
| 865 | assert target.read_text(encoding="utf-8") == "{}" |
| 866 | |
| 867 | def test_clear_rebase_state_symlink_not_deleted(self, tmp_path: pathlib.Path) -> None: |
| 868 | _init_repo(tmp_path) |
| 869 | state_path = rebase_state_path(tmp_path) |
| 870 | target = tmp_path / "do_not_delete.json" |
| 871 | target.write_text("important", encoding="utf-8") |
| 872 | state_path.symlink_to(target) |
| 873 | clear_rebase_state(tmp_path) |
| 874 | assert target.exists() |
| 875 | |
| 876 | def test_load_rebase_state_size_cap_rejected(self, tmp_path: pathlib.Path) -> None: |
| 877 | _init_repo(tmp_path) |
| 878 | state_path = rebase_state_path(tmp_path) |
| 879 | state_path.write_bytes(b"x" * (_MAX_STATE_BYTES + 1)) |
| 880 | result = load_rebase_state(tmp_path) |
| 881 | assert result is None |
| 882 | |
| 883 | def test_load_rebase_state_exactly_at_cap_rejected(self, tmp_path: pathlib.Path) -> None: |
| 884 | _init_repo(tmp_path) |
| 885 | state_path = rebase_state_path(tmp_path) |
| 886 | state_path.write_bytes(b"y" * _MAX_STATE_BYTES) |
| 887 | result = load_rebase_state(tmp_path) |
| 888 | assert result is None # invalid JSON, size check fires first |
| 889 | |
| 890 | |
| 891 | # --------------------------------------------------------------------------- |
| 892 | # Performance |
| 893 | # --------------------------------------------------------------------------- |
| 894 | |
| 895 | |
| 896 | class TestPerformance: |
| 897 | """Timing guards — key operations must complete quickly.""" |
| 898 | |
| 899 | def test_status_completes_within_200ms(self, tmp_path: pathlib.Path) -> None: |
| 900 | _init_repo(tmp_path) |
| 901 | _make_commit(tmp_path) |
| 902 | t0 = time.monotonic() |
| 903 | result = _invoke(["rebase", "--status", "--json"], tmp_path) |
| 904 | elapsed = time.monotonic() - t0 |
| 905 | assert result.exit_code == 0 |
| 906 | assert elapsed < 0.2, f"--status took {elapsed*1000:.1f}ms (expected <200ms)" |
| 907 | |
| 908 | def test_dry_run_50_commits_completes_within_5s(self, tmp_path: pathlib.Path) -> None: |
| 909 | _init_repo(tmp_path) |
| 910 | base = _make_commit(tmp_path, content=b"perf-base") |
| 911 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 912 | prev = base |
| 913 | for i in range(50): |
| 914 | prev = _make_commit(tmp_path, parent_id=prev, content=f"p{i}".encode()) |
| 915 | t0 = time.monotonic() |
| 916 | result = _invoke(["rebase", "--dry-run", "--json", "upstream"], tmp_path) |
| 917 | elapsed = time.monotonic() - t0 |
| 918 | assert result.exit_code == 0, result.output |
| 919 | data = _json_from(result.output) |
| 920 | assert data["count"] == 50 |
| 921 | assert elapsed < 5.0, f"dry-run 50 commits took {elapsed:.2f}s (expected <5s)" |
| 922 | |
| 923 | def test_duration_ms_is_positive(self, tmp_path: pathlib.Path) -> None: |
| 924 | _init_repo(tmp_path) |
| 925 | cid = _make_commit(tmp_path) |
| 926 | (ref_path(tmp_path, "up")).write_text(cid, encoding="utf-8") |
| 927 | result = _invoke(["rebase", "--json", "up"], tmp_path) |
| 928 | data = _json_from(result.output) |
| 929 | # duration_ms must be a number (could be 0.0 on very fast systems, but always a float) |
| 930 | assert isinstance(data["duration_ms"], (int, float)) |
| 931 | |
| 932 | |
| 933 | # --------------------------------------------------------------------------- |
| 934 | # Stress |
| 935 | # --------------------------------------------------------------------------- |
| 936 | |
| 937 | |
| 938 | class TestStress: |
| 939 | """Large rebase chains and concurrent operations.""" |
| 940 | |
| 941 | def test_collect_20_commits(self, tmp_path: pathlib.Path) -> None: |
| 942 | _init_repo(tmp_path) |
| 943 | base = _make_commit(tmp_path, content=b"stress-base") |
| 944 | prev = base |
| 945 | ids = [] |
| 946 | for i in range(20): |
| 947 | prev = _make_commit(tmp_path, parent_id=prev, content=f"s{i}".encode()) |
| 948 | ids.append(prev) |
| 949 | result = collect_commits_to_replay(tmp_path, stop_at=base, tip=prev) |
| 950 | assert len(result) == 20 |
| 951 | assert result[0].commit_id == ids[0] |
| 952 | assert result[-1].commit_id == ids[-1] |
| 953 | |
| 954 | def test_50_commit_dry_run_json(self, tmp_path: pathlib.Path) -> None: |
| 955 | _init_repo(tmp_path) |
| 956 | base = _make_commit(tmp_path, content=b"fifty-base") |
| 957 | (ref_path(tmp_path, "upstream")).write_text(base, encoding="utf-8") |
| 958 | prev = base |
| 959 | ids = [] |
| 960 | for i in range(50): |
| 961 | prev = _make_commit(tmp_path, parent_id=prev, content=f"t{i}".encode()) |
| 962 | ids.append(prev) |
| 963 | result = _invoke(["rebase", "--dry-run", "--json", "upstream"], tmp_path) |
| 964 | assert result.exit_code == 0, result.output |
| 965 | data = _json_from(result.output) |
| 966 | assert data["count"] == 50 |
| 967 | assert len(data["commits"]) == 50 |
| 968 | assert data["commits"][0]["commit_id"] == ids[0] |
| 969 | assert data["commits"][-1]["commit_id"] == ids[-1] |
| 970 | # All IDs must be sha256:-prefixed |
| 971 | for entry in data["commits"]: |
| 972 | assert entry["commit_id"].startswith("sha256:") |
| 973 | |
| 974 | def test_concurrent_status_reads(self, tmp_path: pathlib.Path) -> None: |
| 975 | """Multiple threads calling get_rebase_progress must not crash.""" |
| 976 | _init_repo(tmp_path) |
| 977 | state = RebaseState( |
| 978 | original_branch="main", original_head="a" * 64, onto="b" * 64, |
| 979 | remaining=["c" * 64] * 10, completed=["d" * 64] * 5, squash=False, |
| 980 | ) |
| 981 | save_rebase_state(tmp_path, state) |
| 982 | errors: list[str] = [] |
| 983 | |
| 984 | def _read() -> None: |
| 985 | try: |
| 986 | p = get_rebase_progress(tmp_path) |
| 987 | assert p["active"] is True |
| 988 | except Exception as exc: |
| 989 | errors.append(str(exc)) |
| 990 | |
| 991 | threads = [threading.Thread(target=_read) for _ in range(20)] |
| 992 | for t in threads: |
| 993 | t.start() |
| 994 | for t in threads: |
| 995 | t.join() |
| 996 | assert not errors, f"Concurrent status failures: {errors}" |
| 997 | |
| 998 | def test_status_1000_element_state(self, tmp_path: pathlib.Path) -> None: |
| 999 | """get_rebase_progress is fast even with a 1000-element state.""" |
| 1000 | _init_repo(tmp_path) |
| 1001 | state = RebaseState( |
| 1002 | original_branch="main", original_head="a" * 64, onto="b" * 64, |
| 1003 | remaining=["c" * 64] * 500, completed=["d" * 64] * 500, squash=False, |
| 1004 | ) |
| 1005 | save_rebase_state(tmp_path, state) |
| 1006 | p = get_rebase_progress(tmp_path) |
| 1007 | assert p["total"] == 1000 |
| 1008 | assert p["done"] == 500 |
| 1009 | assert p["remaining"] == 500 |
| 1010 | |
| 1011 | |
| 1012 | # --------------------------------------------------------------------------- |
| 1013 | # TestRegisterFlags — argparse-level verification |
| 1014 | # --------------------------------------------------------------------------- |
| 1015 | |
| 1016 | |
| 1017 | class TestRegisterFlags: |
| 1018 | """Verify that register() wires --json / -j correctly.""" |
| 1019 | |
| 1020 | def _make_parser(self) -> "argparse.ArgumentParser": |
| 1021 | import argparse |
| 1022 | from muse.cli.commands.rebase import register |
| 1023 | ap = argparse.ArgumentParser() |
| 1024 | subs = ap.add_subparsers() |
| 1025 | register(subs) |
| 1026 | return ap |
| 1027 | |
| 1028 | def test_json_flag_long(self) -> None: |
| 1029 | ns = self._make_parser().parse_args(["rebase", "--json"]) |
| 1030 | assert ns.json_out is True |
| 1031 | |
| 1032 | def test_j_alias(self) -> None: |
| 1033 | ns = self._make_parser().parse_args(["rebase", "-j"]) |
| 1034 | assert ns.json_out is True |
| 1035 | |
| 1036 | def test_default_is_text(self) -> None: |
| 1037 | ns = self._make_parser().parse_args(["rebase"]) |
| 1038 | assert ns.json_out is False |
| 1039 | |
| 1040 | def test_dest_is_json_out(self) -> None: |
| 1041 | ns = self._make_parser().parse_args(["rebase", "-j"]) |
| 1042 | assert hasattr(ns, "json_out") |
| 1043 | assert not hasattr(ns, "fmt") |