test_branch_supercharge.py
python
sha256:f81d1ddcc1eec1eaccc667f22067476a0cf138860691611738c0c799df24a1db
fixing more failing tests
Human
3 days ago
| 1 | """Supercharge tests for ``muse branch``. |
| 2 | |
| 3 | Gaps filled versus the baseline test_cmd_branch.py |
| 4 | --------------------------------------------------- |
| 5 | 1. ``committed_at`` field present in list --json schema (baseline missed it) |
| 6 | 2. ``--sort committeddate`` actual ordering (baseline only checked exit 0) |
| 7 | 3. ``-r`` remote-tracking listing E2E |
| 8 | 4. ``-a`` combined local + remote listing E2E |
| 9 | 5. ``-dr`` remote-tracking ref deletion — success path |
| 10 | 6. ``-vv`` upstream shown in text output |
| 11 | 7. Diamond-merge DAG correctness for ``--merged`` |
| 12 | 8. Data integrity: empty parent dirs cleaned after nested branch delete |
| 13 | 9. Rename into nested path creates parent dirs |
| 14 | 10. Force-rename/copy leaves destination at correct tip |
| 15 | 11. JSON error schemas for delete/rename/copy operations |
| 16 | 12. Performance: ``--sort committeddate`` with 50 branches under 3 s |
| 17 | 13. Security: ANSI injection in ``--merged`` / ``--no-merged`` / ``--contains`` |
| 18 | 14. Docstring coverage for all public helpers |
| 19 | |
| 20 | Test categories |
| 21 | --------------- |
| 22 | - unit : _cleanup_empty_dirs, _ref_file, _list_remotes |
| 23 | - integration : remote ops (-r/-a/-dr), committeddate ordering, DAG, -vv |
| 24 | - e2e : full CLI round-trips for new scenarios |
| 25 | - security : ANSI in filter flags, error output sanitisation |
| 26 | - data_integrity: empty-dir cleanup, atomic rename into deep paths, force overwrites |
| 27 | - performance : --sort committeddate with 50 branches |
| 28 | - docstrings : public helper docstring coverage |
| 29 | """ |
| 30 | |
| 31 | from __future__ import annotations |
| 32 | from collections.abc import Mapping |
| 33 | |
| 34 | import json |
| 35 | import os |
| 36 | import pathlib |
| 37 | import time |
| 38 | |
| 39 | import pytest |
| 40 | |
| 41 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 42 | from muse.core.refs import ( |
| 43 | get_head_commit_id, |
| 44 | read_current_branch, |
| 45 | ) |
| 46 | from muse.core.types import long_id |
| 47 | from muse.core.paths import config_toml_path, heads_dir, muse_dir, remotes_dir |
| 48 | |
| 49 | runner = CliRunner() |
| 50 | |
| 51 | type _AnyObj = object |
| 52 | |
| 53 | |
| 54 | # --------------------------------------------------------------------------- |
| 55 | # Helpers |
| 56 | # --------------------------------------------------------------------------- |
| 57 | |
| 58 | |
| 59 | def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult: |
| 60 | saved = os.getcwd() |
| 61 | try: |
| 62 | os.chdir(repo) |
| 63 | return runner.invoke(None, args) |
| 64 | finally: |
| 65 | os.chdir(saved) |
| 66 | |
| 67 | |
| 68 | def _branch(repo: pathlib.Path, *extra: str) -> InvokeResult: |
| 69 | return _invoke(repo, ["branch", *extra]) |
| 70 | |
| 71 | |
| 72 | def _commit(repo: pathlib.Path, *extra: str) -> InvokeResult: |
| 73 | _invoke(repo, ["code", "add", "."]) |
| 74 | return _invoke(repo, ["commit", *extra]) |
| 75 | |
| 76 | |
| 77 | @pytest.fixture() |
| 78 | def repo(tmp_path: pathlib.Path) -> pathlib.Path: |
| 79 | """Initialised repo with one commit on ``main``.""" |
| 80 | saved = os.getcwd() |
| 81 | try: |
| 82 | os.chdir(tmp_path) |
| 83 | runner.invoke(None, ["init"]) |
| 84 | finally: |
| 85 | os.chdir(saved) |
| 86 | (tmp_path / "a.py").write_text("x = 1\n") |
| 87 | _commit(tmp_path, "-m", "initial") |
| 88 | return tmp_path |
| 89 | |
| 90 | |
| 91 | @pytest.fixture() |
| 92 | def two_commit_repo(repo: pathlib.Path) -> pathlib.Path: |
| 93 | (repo / "b.py").write_text("y = 2\n") |
| 94 | _commit(repo, "-m", "second") |
| 95 | return repo |
| 96 | |
| 97 | |
| 98 | def _first_json(result: InvokeResult) -> Mapping[str, object]: |
| 99 | """Extract the first JSON object from mixed stdout+stderr output.""" |
| 100 | for line in result.output.splitlines(): |
| 101 | stripped = line.strip() |
| 102 | if stripped.startswith("{"): |
| 103 | return json.loads(stripped) |
| 104 | raise ValueError(f"No JSON object in output:\n{result.output!r}") |
| 105 | |
| 106 | |
| 107 | def _make_remote_ref( |
| 108 | repo: pathlib.Path, remote: str, branch: str, commit_id: str |
| 109 | ) -> None: |
| 110 | """Write a local remote-tracking ref to simulate a previous push.""" |
| 111 | ref_dir = remotes_dir(repo) / remote |
| 112 | ref_file = ref_dir / branch |
| 113 | ref_file.parent.mkdir(parents=True, exist_ok=True) |
| 114 | ref_file.write_text(commit_id + "\n", encoding="utf-8") |
| 115 | |
| 116 | |
| 117 | # --------------------------------------------------------------------------- |
| 118 | # Unit: _ref_file |
| 119 | # --------------------------------------------------------------------------- |
| 120 | |
| 121 | |
| 122 | class TestRefFile: |
| 123 | def test_simple_branch(self, tmp_path: pathlib.Path) -> None: |
| 124 | from muse.cli.commands.branch import _ref_file |
| 125 | p = _ref_file(tmp_path, "main") |
| 126 | assert p == heads_dir(tmp_path) / "main" |
| 127 | |
| 128 | def test_nested_branch(self, tmp_path: pathlib.Path) -> None: |
| 129 | from muse.cli.commands.branch import _ref_file |
| 130 | p = _ref_file(tmp_path, "feat/sub/task") |
| 131 | assert p == heads_dir(tmp_path) / "feat" / "sub" / "task" |
| 132 | |
| 133 | def test_returns_pathlib_path(self, tmp_path: pathlib.Path) -> None: |
| 134 | from muse.cli.commands.branch import _ref_file |
| 135 | p = _ref_file(tmp_path, "dev") |
| 136 | assert isinstance(p, pathlib.Path) |
| 137 | |
| 138 | |
| 139 | # --------------------------------------------------------------------------- |
| 140 | # Unit: _cleanup_empty_dirs |
| 141 | # --------------------------------------------------------------------------- |
| 142 | |
| 143 | |
| 144 | class TestCleanupEmptyDirs: |
| 145 | def test_removes_empty_parent_dir(self, tmp_path: pathlib.Path) -> None: |
| 146 | from muse.cli.commands.branch import _cleanup_empty_dirs |
| 147 | heads = tmp_path / "heads" |
| 148 | ref = heads / "feat" / "task" |
| 149 | ref.parent.mkdir(parents=True) |
| 150 | ref.write_text("") |
| 151 | ref.unlink() |
| 152 | _cleanup_empty_dirs(ref, heads) |
| 153 | assert not (heads / "feat").exists() |
| 154 | |
| 155 | def test_stops_at_heads_dir(self, tmp_path: pathlib.Path) -> None: |
| 156 | from muse.cli.commands.branch import _cleanup_empty_dirs |
| 157 | heads = tmp_path / "heads" |
| 158 | heads.mkdir() |
| 159 | ref = heads / "main" |
| 160 | ref.write_text("") |
| 161 | ref.unlink() |
| 162 | _cleanup_empty_dirs(ref, heads) |
| 163 | assert heads.exists() |
| 164 | |
| 165 | def test_leaves_non_empty_parent(self, tmp_path: pathlib.Path) -> None: |
| 166 | from muse.cli.commands.branch import _cleanup_empty_dirs |
| 167 | heads = tmp_path / "heads" |
| 168 | ref_a = heads / "feat" / "a" |
| 169 | ref_b = heads / "feat" / "b" |
| 170 | ref_a.parent.mkdir(parents=True) |
| 171 | ref_a.write_text("") |
| 172 | ref_b.write_text("") |
| 173 | ref_a.unlink() |
| 174 | _cleanup_empty_dirs(ref_a, heads) |
| 175 | # feat/ still has b, so it must not be removed |
| 176 | assert (heads / "feat").exists() |
| 177 | assert (heads / "feat" / "b").exists() |
| 178 | |
| 179 | def test_removes_multiple_levels(self, tmp_path: pathlib.Path) -> None: |
| 180 | from muse.cli.commands.branch import _cleanup_empty_dirs |
| 181 | heads = tmp_path / "heads" |
| 182 | ref = heads / "a" / "b" / "c" |
| 183 | ref.parent.mkdir(parents=True) |
| 184 | ref.write_text("") |
| 185 | ref.unlink() |
| 186 | _cleanup_empty_dirs(ref, heads) |
| 187 | assert not (heads / "a").exists() |
| 188 | |
| 189 | |
| 190 | # --------------------------------------------------------------------------- |
| 191 | # Unit: _list_remotes |
| 192 | # --------------------------------------------------------------------------- |
| 193 | |
| 194 | |
| 195 | class TestListRemotes: |
| 196 | def test_empty_when_no_remotes_dir(self, tmp_path: pathlib.Path) -> None: |
| 197 | from muse.cli.commands.branch import _list_remotes |
| 198 | muse_dir(tmp_path).mkdir() |
| 199 | assert _list_remotes(tmp_path) == [] |
| 200 | |
| 201 | def test_lists_single_remote(self, tmp_path: pathlib.Path) -> None: |
| 202 | from muse.cli.commands.branch import _list_remotes |
| 203 | ref = remotes_dir(tmp_path) / "origin" / "main" |
| 204 | ref.parent.mkdir(parents=True) |
| 205 | ref.write_text(long_id("a" * 64)) |
| 206 | remotes = _list_remotes(tmp_path) |
| 207 | assert "origin/main" in remotes |
| 208 | |
| 209 | def test_lists_nested_remote_branch(self, tmp_path: pathlib.Path) -> None: |
| 210 | from muse.cli.commands.branch import _list_remotes |
| 211 | ref = remotes_dir(tmp_path) / "origin" / "feat" / "task" |
| 212 | ref.parent.mkdir(parents=True) |
| 213 | ref.write_text(long_id("b" * 64)) |
| 214 | remotes = _list_remotes(tmp_path) |
| 215 | assert "origin/feat/task" in remotes |
| 216 | |
| 217 | def test_skips_hidden_files(self, tmp_path: pathlib.Path) -> None: |
| 218 | from muse.cli.commands.branch import _list_remotes |
| 219 | ref_dir = remotes_dir(tmp_path) / "origin" |
| 220 | ref_dir.mkdir(parents=True) |
| 221 | (ref_dir / ".hidden").write_text("ignored") |
| 222 | (ref_dir / "main").write_text(long_id("c" * 64)) |
| 223 | remotes = _list_remotes(tmp_path) |
| 224 | assert all(not r.endswith(".hidden") for r in remotes) |
| 225 | |
| 226 | def test_sorted_output(self, tmp_path: pathlib.Path) -> None: |
| 227 | from muse.cli.commands.branch import _list_remotes |
| 228 | for name in ("z-branch", "a-branch", "m-branch"): |
| 229 | ref = remotes_dir(tmp_path) / "origin" / name |
| 230 | ref.parent.mkdir(parents=True, exist_ok=True) |
| 231 | ref.write_text(long_id("d" * 64)) |
| 232 | remotes = _list_remotes(tmp_path) |
| 233 | assert remotes == sorted(remotes) |
| 234 | |
| 235 | |
| 236 | # --------------------------------------------------------------------------- |
| 237 | # Integration: JSON schema — committed_at field (gap in baseline) |
| 238 | # --------------------------------------------------------------------------- |
| 239 | |
| 240 | |
| 241 | class TestListJsonCommittedAt: |
| 242 | """committed_at must be present in the listing JSON schema.""" |
| 243 | |
| 244 | def test_committed_at_present_in_schema(self, repo: pathlib.Path) -> None: |
| 245 | result = _branch(repo, "--json") |
| 246 | data = json.loads(result.output) |
| 247 | assert len(data) >= 1 |
| 248 | assert "committed_at" in data[0], ( |
| 249 | "committed_at missing from branch --json output" |
| 250 | ) |
| 251 | |
| 252 | def test_committed_at_is_iso8601(self, repo: pathlib.Path) -> None: |
| 253 | import datetime |
| 254 | result = _branch(repo, "--json") |
| 255 | data = json.loads(result.output) |
| 256 | main = next(b for b in data if b["name"] == "main") |
| 257 | ts = main["committed_at"] |
| 258 | assert ts is not None |
| 259 | # Must parse as ISO 8601 |
| 260 | datetime.datetime.fromisoformat(ts) |
| 261 | |
| 262 | def test_committed_at_null_for_empty_branch(self, repo: pathlib.Path) -> None: |
| 263 | """Branches pointing at no commit (empty branch) get null committed_at.""" |
| 264 | # Force an empty branch by writing an empty ref file directly |
| 265 | ref = heads_dir(repo) / "empty-branch" |
| 266 | ref.write_text("") |
| 267 | result = _branch(repo, "--json") |
| 268 | data = json.loads(result.output) |
| 269 | eb = next((b for b in data if b["name"] == "empty-branch"), None) |
| 270 | assert eb is not None |
| 271 | assert eb["committed_at"] is None |
| 272 | |
| 273 | def test_committed_at_schema_complete(self, repo: pathlib.Path) -> None: |
| 274 | result = _branch(repo, "--json") |
| 275 | data = json.loads(result.output) |
| 276 | required = {"name", "current", "commit_id", "committed_at", |
| 277 | "last_message", "upstream"} |
| 278 | missing = required - set(data[0].keys()) |
| 279 | assert not missing, f"branch --json missing fields: {missing}" |
| 280 | |
| 281 | |
| 282 | # --------------------------------------------------------------------------- |
| 283 | # Integration: --sort committeddate actual ordering |
| 284 | # --------------------------------------------------------------------------- |
| 285 | |
| 286 | |
| 287 | class TestSortCommittedDateOrdering: |
| 288 | """--sort committeddate must emit branches newest-first.""" |
| 289 | |
| 290 | def test_newer_branch_first(self, repo: pathlib.Path) -> None: |
| 291 | # Create a branch at initial commit, then add another commit on main |
| 292 | _branch(repo, "older-branch") |
| 293 | (repo / "c.py").write_text("c=3\n") |
| 294 | _commit(repo, "-m", "newer commit") |
| 295 | _branch(repo, "newer-branch") |
| 296 | |
| 297 | result = _branch(repo, "--sort", "committeddate", "--json") |
| 298 | assert result.exit_code == 0 |
| 299 | data = json.loads(result.output) |
| 300 | names = [b["name"] for b in data] |
| 301 | assert names.index("newer-branch") < names.index("older-branch"), ( |
| 302 | f"newer-branch should sort before older-branch; got order: {names}" |
| 303 | ) |
| 304 | |
| 305 | def test_timestamp_order_consistent_on_ties(self, repo: pathlib.Path) -> None: |
| 306 | """Branches at the same commit produce a stable (name-secondary) order.""" |
| 307 | for name in ("z-same", "a-same", "m-same"): |
| 308 | _branch(repo, name) |
| 309 | result = _branch(repo, "--sort", "committeddate", "--json") |
| 310 | data = json.loads(result.output) |
| 311 | assert result.exit_code == 0 |
| 312 | assert len(data) >= 4 |
| 313 | |
| 314 | def test_empty_branches_sorted_last(self, repo: pathlib.Path) -> None: |
| 315 | """Branches with no commit (committed_at=null) come after dated branches.""" |
| 316 | _branch(repo, "real-commit-branch") |
| 317 | # Write an empty ref manually |
| 318 | (heads_dir(repo) / "no-commit").write_text("") |
| 319 | result = _branch(repo, "--sort", "committeddate", "--json") |
| 320 | data = json.loads(result.output) |
| 321 | names_with_ts = [b["name"] for b in data if b.get("committed_at")] |
| 322 | names_without_ts = [b["name"] for b in data if not b.get("committed_at")] |
| 323 | if names_with_ts and names_without_ts: |
| 324 | last_with_ts = data.index(next(b for b in data if b["name"] == names_with_ts[-1])) |
| 325 | first_without_ts = data.index(next(b for b in data if b["name"] == names_without_ts[0])) |
| 326 | assert last_with_ts < first_without_ts, ( |
| 327 | "Branches with no commit should sort after dated branches" |
| 328 | ) |
| 329 | |
| 330 | |
| 331 | # --------------------------------------------------------------------------- |
| 332 | # Integration: remote-tracking branches (-r / -a / -dr) |
| 333 | # --------------------------------------------------------------------------- |
| 334 | |
| 335 | |
| 336 | class TestRemoteTrackingBranches: |
| 337 | def test_list_r_shows_only_remotes(self, repo: pathlib.Path) -> None: |
| 338 | cid = get_head_commit_id(repo, "main") |
| 339 | _make_remote_ref(repo, "origin", "main", cid) |
| 340 | result = _branch(repo, "-r", "--json") |
| 341 | assert result.exit_code == 0 |
| 342 | data = json.loads(result.output) |
| 343 | names = [b["name"] for b in data] |
| 344 | # Remote entries are prefixed with remotes/ |
| 345 | assert all(n.startswith("remotes/") for n in names), ( |
| 346 | f"-r listing must only contain remote entries; got: {names}" |
| 347 | ) |
| 348 | assert "remotes/origin/main" in names |
| 349 | |
| 350 | def test_list_r_excludes_local_branches(self, repo: pathlib.Path) -> None: |
| 351 | cid = get_head_commit_id(repo, "main") |
| 352 | _make_remote_ref(repo, "origin", "main", cid) |
| 353 | _branch(repo, "local-only") |
| 354 | result = _branch(repo, "-r", "--json") |
| 355 | data = json.loads(result.output) |
| 356 | names = [b["name"] for b in data] |
| 357 | assert "local-only" not in names |
| 358 | |
| 359 | def test_list_a_includes_both(self, repo: pathlib.Path) -> None: |
| 360 | cid = get_head_commit_id(repo, "main") |
| 361 | _make_remote_ref(repo, "origin", "dev", cid) |
| 362 | _branch(repo, "local-feat") |
| 363 | result = _branch(repo, "-a", "--json") |
| 364 | assert result.exit_code == 0 |
| 365 | data = json.loads(result.output) |
| 366 | names = [b["name"] for b in data] |
| 367 | assert "main" in names |
| 368 | assert "local-feat" in names |
| 369 | assert "remotes/origin/dev" in names |
| 370 | |
| 371 | def test_list_r_empty_when_no_remotes(self, repo: pathlib.Path) -> None: |
| 372 | result = _branch(repo, "-r", "--json") |
| 373 | assert result.exit_code == 0 |
| 374 | assert json.loads(result.output) == [] |
| 375 | |
| 376 | def test_list_r_nested_remote_branch(self, repo: pathlib.Path) -> None: |
| 377 | cid = get_head_commit_id(repo, "main") |
| 378 | _make_remote_ref(repo, "origin", "feat/task", cid) |
| 379 | result = _branch(repo, "-r", "--json") |
| 380 | data = json.loads(result.output) |
| 381 | names = [b["name"] for b in data] |
| 382 | assert "remotes/origin/feat/task" in names |
| 383 | |
| 384 | def test_list_a_sorted_by_name(self, repo: pathlib.Path) -> None: |
| 385 | cid = get_head_commit_id(repo, "main") |
| 386 | _make_remote_ref(repo, "origin", "z-remote", cid) |
| 387 | _make_remote_ref(repo, "origin", "a-remote", cid) |
| 388 | _branch(repo, "m-local") |
| 389 | result = _branch(repo, "-a", "--json") |
| 390 | data = json.loads(result.output) |
| 391 | names = [b["name"] for b in data] |
| 392 | # Local branches come first (alphabetically), then remotes/ |
| 393 | local_names = [n for n in names if not n.startswith("remotes/")] |
| 394 | assert local_names == sorted(local_names) |
| 395 | |
| 396 | def test_dr_deletes_remote_tracking_ref(self, repo: pathlib.Path) -> None: |
| 397 | cid = get_head_commit_id(repo, "main") |
| 398 | _make_remote_ref(repo, "origin", "stale", cid) |
| 399 | # Verify it's visible |
| 400 | before = json.loads(_branch(repo, "-r", "--json").output) |
| 401 | assert any(b["name"] == "remotes/origin/stale" for b in before) |
| 402 | # Delete it |
| 403 | result = _branch(repo, "-d", "-r", "origin/stale") |
| 404 | assert result.exit_code == 0 |
| 405 | # Gone now |
| 406 | after = json.loads(_branch(repo, "-r", "--json").output) |
| 407 | assert not any(b["name"] == "remotes/origin/stale" for b in after) |
| 408 | |
| 409 | def test_dr_json_schema(self, repo: pathlib.Path) -> None: |
| 410 | cid = get_head_commit_id(repo, "main") |
| 411 | _make_remote_ref(repo, "origin", "old", cid) |
| 412 | result = _branch(repo, "-d", "-r", "origin/old", "--json") |
| 413 | assert result.exit_code == 0 |
| 414 | data = json.loads(result.output) |
| 415 | assert data["action"] == "deleted_remote_tracking" |
| 416 | assert data["remote"] == "origin" |
| 417 | assert data["branch"] == "old" |
| 418 | |
| 419 | def test_dr_remotes_prefix_accepted(self, repo: pathlib.Path) -> None: |
| 420 | """remotes/origin/branch spelling accepted in addition to origin/branch.""" |
| 421 | cid = get_head_commit_id(repo, "main") |
| 422 | _make_remote_ref(repo, "origin", "with-prefix", cid) |
| 423 | result = _branch(repo, "-d", "-r", "remotes/origin/with-prefix") |
| 424 | assert result.exit_code == 0 |
| 425 | |
| 426 | def test_dr_nonexistent_exits_1(self, repo: pathlib.Path) -> None: |
| 427 | result = _branch(repo, "-d", "-r", "origin/ghost") |
| 428 | assert result.exit_code == 1 |
| 429 | |
| 430 | def test_dr_no_slash_exits_1(self, repo: pathlib.Path) -> None: |
| 431 | result = _branch(repo, "-d", "-r", "justaname") |
| 432 | assert result.exit_code == 1 |
| 433 | |
| 434 | |
| 435 | # --------------------------------------------------------------------------- |
| 436 | # Integration: -vv upstream display |
| 437 | # --------------------------------------------------------------------------- |
| 438 | |
| 439 | |
| 440 | class TestVerboseUpstream: |
| 441 | def _set_upstream(self, repo: pathlib.Path, branch: str, |
| 442 | remote: str, remote_branch: str) -> None: |
| 443 | config_path = config_toml_path(repo) |
| 444 | existing = config_path.read_text() if config_path.exists() else "" |
| 445 | existing += ( |
| 446 | f'\n[branch."{branch}"]\n' |
| 447 | f'remote = "{remote}"\n' |
| 448 | f'merge = "refs/heads/{remote_branch}"\n' |
| 449 | ) |
| 450 | config_path.write_text(existing) |
| 451 | |
| 452 | def test_vv_shows_upstream(self, repo: pathlib.Path) -> None: |
| 453 | self._set_upstream(repo, "main", "origin", "main") |
| 454 | result = _branch(repo, "-vv") |
| 455 | assert result.exit_code == 0 |
| 456 | assert "origin/main" in result.output |
| 457 | |
| 458 | def test_vv_upstream_in_brackets(self, repo: pathlib.Path) -> None: |
| 459 | self._set_upstream(repo, "main", "origin", "main") |
| 460 | result = _branch(repo, "-vv") |
| 461 | assert "[origin/main]" in result.output |
| 462 | |
| 463 | def test_v_does_not_show_upstream(self, repo: pathlib.Path) -> None: |
| 464 | self._set_upstream(repo, "main", "origin", "main") |
| 465 | result = _branch(repo, "-v") |
| 466 | # -v shows commit SHA + message but NOT upstream brackets |
| 467 | assert "[origin/main]" not in result.output |
| 468 | |
| 469 | def test_vv_no_upstream_no_brackets(self, repo: pathlib.Path) -> None: |
| 470 | result = _branch(repo, "-vv") |
| 471 | assert "[" not in result.output |
| 472 | |
| 473 | |
| 474 | # --------------------------------------------------------------------------- |
| 475 | # Integration: Diamond-merge DAG correctness for --merged |
| 476 | # --------------------------------------------------------------------------- |
| 477 | |
| 478 | |
| 479 | class TestDiamondMergeDag: |
| 480 | """--merged must handle merge commits with two parents (parent2_commit_id).""" |
| 481 | |
| 482 | def test_merged_branch_included_after_diamond_merge( |
| 483 | self, repo: pathlib.Path |
| 484 | ) -> None: |
| 485 | """ |
| 486 | Build diamond: main ← feat-a, main ← feat-b, then merge feat-a into feat-b. |
| 487 | After merging feat-a into main via feat-b, --merged should include feat-a. |
| 488 | |
| 489 | main ── C1 |
| 490 | \\ |
| 491 | feat-a ── C2 |
| 492 | \\ |
| 493 | main (merged feat-a) ── C3 |
| 494 | """ |
| 495 | # Create and diverge feat-a |
| 496 | _branch(repo, "feat-a") |
| 497 | _invoke(repo, ["checkout", "feat-a"]) |
| 498 | (repo / "fa.py").write_text("fa=1\n") |
| 499 | _commit(repo, "-m", "feat-a commit") |
| 500 | _invoke(repo, ["checkout", "main"]) |
| 501 | # Merge feat-a into main |
| 502 | _invoke(repo, ["merge", "feat-a"]) |
| 503 | # Now --merged on main should include feat-a |
| 504 | result = _branch(repo, "--merged", "--json") |
| 505 | assert result.exit_code == 0 |
| 506 | data = json.loads(result.output) |
| 507 | names = [b["name"] for b in data] |
| 508 | assert "feat-a" in names, f"feat-a should be merged into main; got: {names}" |
| 509 | |
| 510 | def test_unmerged_sibling_excluded_from_diamond( |
| 511 | self, repo: pathlib.Path |
| 512 | ) -> None: |
| 513 | """Two branches from same point; merging one doesn't include the other.""" |
| 514 | _branch(repo, "merged-branch") |
| 515 | _branch(repo, "unmerged-branch") |
| 516 | |
| 517 | _invoke(repo, ["checkout", "merged-branch"]) |
| 518 | (repo / "mb.py").write_text("mb=1\n") |
| 519 | _commit(repo, "-m", "merged commit") |
| 520 | |
| 521 | _invoke(repo, ["checkout", "unmerged-branch"]) |
| 522 | (repo / "ub.py").write_text("ub=1\n") |
| 523 | _commit(repo, "-m", "unmerged commit") |
| 524 | |
| 525 | _invoke(repo, ["checkout", "main"]) |
| 526 | _invoke(repo, ["merge", "merged-branch"]) |
| 527 | |
| 528 | result = _branch(repo, "--merged", "--json") |
| 529 | data = json.loads(result.output) |
| 530 | names = [b["name"] for b in data] |
| 531 | assert "merged-branch" in names |
| 532 | assert "unmerged-branch" not in names |
| 533 | |
| 534 | |
| 535 | # --------------------------------------------------------------------------- |
| 536 | # Data integrity: nested branch cleanup + deep rename |
| 537 | # --------------------------------------------------------------------------- |
| 538 | |
| 539 | |
| 540 | class TestDataIntegrityNested: |
| 541 | def test_delete_nested_cleans_parent_dirs(self, repo: pathlib.Path) -> None: |
| 542 | """Deleting feat/sub/task must remove the now-empty feat/sub/ and feat/ dirs.""" |
| 543 | _branch(repo, "feat/sub/task") |
| 544 | result = _branch(repo, "-D", "feat/sub/task") |
| 545 | assert result.exit_code == 0 |
| 546 | heads = heads_dir(repo) |
| 547 | assert not (heads / "feat").exists(), ( |
| 548 | "feat/ directory should be removed after deleting feat/sub/task" |
| 549 | ) |
| 550 | |
| 551 | def test_delete_nested_keeps_sibling_dir(self, repo: pathlib.Path) -> None: |
| 552 | """Deleting one nested branch must not remove a sibling.""" |
| 553 | _branch(repo, "feat/sub/a") |
| 554 | _branch(repo, "feat/sub/b") |
| 555 | _branch(repo, "-D", "feat/sub/a") |
| 556 | heads = heads_dir(repo) |
| 557 | assert (heads / "feat" / "sub" / "b").exists() |
| 558 | |
| 559 | def test_rename_into_nested_path_creates_dirs(self, repo: pathlib.Path) -> None: |
| 560 | """Renaming a flat branch to a nested path must create intermediate dirs.""" |
| 561 | _branch(repo, "flat-branch") |
| 562 | result = _branch(repo, "-m", "flat-branch", "deep/nested/branch") |
| 563 | assert result.exit_code == 0 |
| 564 | heads = heads_dir(repo) |
| 565 | assert (heads / "deep" / "nested" / "branch").is_file() |
| 566 | |
| 567 | def test_force_rename_preserves_tip(self, repo: pathlib.Path) -> None: |
| 568 | """Force-rename must not lose the commit pointer.""" |
| 569 | cid_before = get_head_commit_id(repo, "main") |
| 570 | _branch(repo, "original") |
| 571 | _branch(repo, "destination") |
| 572 | _branch(repo, "-M", "original", "destination") |
| 573 | cid_after = get_head_commit_id(repo, "destination") |
| 574 | assert cid_after == cid_before |
| 575 | |
| 576 | def test_force_copy_preserves_src_tip(self, repo: pathlib.Path) -> None: |
| 577 | """Force-copy must not modify the source branch.""" |
| 578 | cid_src = get_head_commit_id(repo, "main") |
| 579 | _branch(repo, "src-branch") |
| 580 | (repo / "x.py").write_text("x=99\n") |
| 581 | _commit(repo, "-m", "extra commit") |
| 582 | _branch(repo, "dst-branch") |
| 583 | # Force-copy src-branch (old tip) onto dst-branch |
| 584 | _branch(repo, "-C", "src-branch", "dst-branch") |
| 585 | assert get_head_commit_id(repo, "src-branch") == cid_src |
| 586 | |
| 587 | def test_rename_current_branch_updates_head(self, repo: pathlib.Path) -> None: |
| 588 | """Renaming the currently checked-out branch must update HEAD.""" |
| 589 | _invoke(repo, ["checkout", "-b", "temp-branch"]) |
| 590 | _branch(repo, "-m", "temp-branch", "renamed-branch") |
| 591 | assert read_current_branch(repo) == "renamed-branch" |
| 592 | |
| 593 | |
| 594 | # --------------------------------------------------------------------------- |
| 595 | # Data integrity: JSON error schemas |
| 596 | # --------------------------------------------------------------------------- |
| 597 | |
| 598 | |
| 599 | class TestJsonErrorSchemas: |
| 600 | """Mutation errors must emit structured JSON with error + message keys.""" |
| 601 | |
| 602 | def test_delete_not_found_json_schema(self, repo: pathlib.Path) -> None: |
| 603 | result = _branch(repo, "-d", "ghost", "--json") |
| 604 | assert result.exit_code == 1 |
| 605 | data = _first_json(result) |
| 606 | assert "error" in data |
| 607 | assert "message" in data |
| 608 | |
| 609 | def test_delete_not_merged_json_schema(self, repo: pathlib.Path) -> None: |
| 610 | _branch(repo, "unmerged") |
| 611 | _invoke(repo, ["checkout", "unmerged"]) |
| 612 | (repo / "z.py").write_text("z=1\n") |
| 613 | _commit(repo, "-m", "diverge") |
| 614 | _invoke(repo, ["checkout", "main"]) |
| 615 | result = _branch(repo, "-d", "unmerged", "--json") |
| 616 | assert result.exit_code == 1 |
| 617 | data = _first_json(result) |
| 618 | assert data.get("error") == "not_merged" |
| 619 | assert "hint" in data # must tell user about -D |
| 620 | |
| 621 | def test_delete_current_branch_json_schema(self, repo: pathlib.Path) -> None: |
| 622 | result = _branch(repo, "-d", "main", "--json") |
| 623 | assert result.exit_code == 1 |
| 624 | data = _first_json(result) |
| 625 | assert data.get("error") == "current_branch" |
| 626 | |
| 627 | def test_rename_not_found_json_schema(self, repo: pathlib.Path) -> None: |
| 628 | result = _branch(repo, "-m", "ghost", "new", "--json") |
| 629 | assert result.exit_code == 1 |
| 630 | data = _first_json(result) |
| 631 | assert data.get("error") == "not_found" |
| 632 | |
| 633 | def test_rename_already_exists_json_schema(self, repo: pathlib.Path) -> None: |
| 634 | _branch(repo, "a") |
| 635 | _branch(repo, "b") |
| 636 | result = _branch(repo, "-m", "a", "b", "--json") |
| 637 | assert result.exit_code == 1 |
| 638 | data = _first_json(result) |
| 639 | assert data.get("error") == "already_exists" |
| 640 | assert "hint" in data # must tell user about -M |
| 641 | |
| 642 | def test_copy_not_found_json_schema(self, repo: pathlib.Path) -> None: |
| 643 | result = _branch(repo, "-c", "ghost", "copy", "--json") |
| 644 | assert result.exit_code == 1 |
| 645 | data = _first_json(result) |
| 646 | assert data.get("error") == "not_found" |
| 647 | |
| 648 | def test_create_already_exists_json_schema(self, repo: pathlib.Path) -> None: |
| 649 | result = _branch(repo, "main", "--json") |
| 650 | assert result.exit_code == 1 |
| 651 | data = _first_json(result) |
| 652 | assert data.get("error") == "already_exists" |
| 653 | |
| 654 | |
| 655 | # --------------------------------------------------------------------------- |
| 656 | # Integration: create JSON schema |
| 657 | # --------------------------------------------------------------------------- |
| 658 | |
| 659 | |
| 660 | class TestCreateJsonSchema: |
| 661 | def test_create_json_schema_complete(self, repo: pathlib.Path) -> None: |
| 662 | result = _branch(repo, "new-branch", "--json") |
| 663 | assert result.exit_code == 0 |
| 664 | data = json.loads(result.output) |
| 665 | assert data["action"] == "created" |
| 666 | assert "branch" in data |
| 667 | assert "commit_id" in data |
| 668 | assert "from" in data |
| 669 | |
| 670 | def test_create_commit_id_is_sha256_prefixed(self, repo: pathlib.Path) -> None: |
| 671 | result = _branch(repo, "sha-check", "--json") |
| 672 | data = json.loads(result.output) |
| 673 | cid = data.get("commit_id") |
| 674 | assert cid is not None |
| 675 | assert cid.startswith("sha256:"), f"commit_id should have sha256: prefix; got {cid!r}" |
| 676 | |
| 677 | def test_create_from_is_null_when_no_start_point(self, repo: pathlib.Path) -> None: |
| 678 | result = _branch(repo, "no-sp", "--json") |
| 679 | data = json.loads(result.output) |
| 680 | assert data.get("from") is None |
| 681 | |
| 682 | def test_create_from_set_when_start_point_given(self, repo: pathlib.Path) -> None: |
| 683 | cid = get_head_commit_id(repo, "main") |
| 684 | result = _branch(repo, "from-sp", cid, "--json") |
| 685 | data = json.loads(result.output) |
| 686 | assert data.get("from") == cid |
| 687 | |
| 688 | |
| 689 | # --------------------------------------------------------------------------- |
| 690 | # Security: ANSI in filter flags |
| 691 | # --------------------------------------------------------------------------- |
| 692 | |
| 693 | |
| 694 | class TestSecurityFilterFlags: |
| 695 | def _has_ansi(self, s: str) -> bool: |
| 696 | return "\x1b[" in s |
| 697 | |
| 698 | def test_ansi_in_merged_ref_rejected_or_sanitized(self, repo: pathlib.Path) -> None: |
| 699 | result = _branch(repo, "--merged", "\x1b[31mmalicious\x1b[0m") |
| 700 | assert not self._has_ansi(result.output) |
| 701 | |
| 702 | def test_ansi_in_no_merged_ref(self, repo: pathlib.Path) -> None: |
| 703 | result = _branch(repo, "--no-merged", "\x1b[31mmalicious\x1b[0m") |
| 704 | assert not self._has_ansi(result.output) |
| 705 | |
| 706 | def test_ansi_in_contains_ref(self, repo: pathlib.Path) -> None: |
| 707 | result = _branch(repo, "--contains", "\x1b[31mmalicious\x1b[0m") |
| 708 | assert not self._has_ansi(result.output) |
| 709 | |
| 710 | def test_newline_in_branch_name_rejected(self, repo: pathlib.Path) -> None: |
| 711 | result = _branch(repo, "branch\nmalicious") |
| 712 | assert result.exit_code == 1 |
| 713 | |
| 714 | def test_ansi_in_delete_json_error_sanitized(self, repo: pathlib.Path) -> None: |
| 715 | result = _branch(repo, "-d", "\x1b[31mmalicious\x1b[0m", "--json") |
| 716 | assert result.exit_code == 1 |
| 717 | assert not self._has_ansi(result.output) |
| 718 | |
| 719 | |
| 720 | # --------------------------------------------------------------------------- |
| 721 | # Performance: --sort committeddate with 50 branches |
| 722 | # --------------------------------------------------------------------------- |
| 723 | |
| 724 | |
| 725 | class TestSortCommittedDatePerformance: |
| 726 | def test_50_branches_committeddate_under_3s(self, repo: pathlib.Path) -> None: |
| 727 | for i in range(50): |
| 728 | _branch(repo, f"perf/branch-{i:03d}") |
| 729 | |
| 730 | start = time.monotonic() |
| 731 | result = _branch(repo, "--sort", "committeddate", "--json") |
| 732 | elapsed = time.monotonic() - start |
| 733 | |
| 734 | assert result.exit_code == 0 |
| 735 | data = json.loads(result.output) |
| 736 | assert len(data) == 51 # main + 50 |
| 737 | assert elapsed < 3.0, f"--sort committeddate with 51 branches took {elapsed:.2f}s" |
| 738 | |
| 739 | |
| 740 | # --------------------------------------------------------------------------- |
| 741 | # Docstrings |
| 742 | # --------------------------------------------------------------------------- |
| 743 | |
| 744 | |
| 745 | class TestDocstrings: |
| 746 | def _has_doc(self, obj: _AnyObj) -> bool: |
| 747 | import inspect |
| 748 | doc = inspect.getdoc(obj) |
| 749 | return bool(doc and len(doc.strip()) > 10) |
| 750 | |
| 751 | def test_module_docstring(self) -> None: |
| 752 | import muse.cli.commands.branch as m |
| 753 | assert self._has_doc(m) |
| 754 | |
| 755 | def test_ref_file_docstring(self) -> None: |
| 756 | from muse.cli.commands.branch import _ref_file |
| 757 | assert self._has_doc(_ref_file) |
| 758 | |
| 759 | def test_list_local_branches_docstring(self) -> None: |
| 760 | from muse.cli.commands.branch import _list_local_branches |
| 761 | assert self._has_doc(_list_local_branches) |
| 762 | |
| 763 | def test_list_remotes_docstring(self) -> None: |
| 764 | from muse.cli.commands.branch import _list_remotes |
| 765 | assert self._has_doc(_list_remotes) |
| 766 | |
| 767 | def test_upstream_for_docstring(self) -> None: |
| 768 | from muse.cli.commands.branch import _upstream_for |
| 769 | assert self._has_doc(_upstream_for) |
| 770 | |
| 771 | def test_commit_ancestors_docstring(self) -> None: |
| 772 | from muse.cli.commands.branch import _commit_ancestors |
| 773 | assert self._has_doc(_commit_ancestors) |
| 774 | |
| 775 | def test_is_merged_docstring(self) -> None: |
| 776 | from muse.cli.commands.branch import _is_merged |
| 777 | assert self._has_doc(_is_merged) |
| 778 | |
| 779 | def test_contains_commit_docstring(self) -> None: |
| 780 | from muse.cli.commands.branch import _contains_commit |
| 781 | assert self._has_doc(_contains_commit) |
| 782 | |
| 783 | def test_cleanup_empty_dirs_docstring(self) -> None: |
| 784 | from muse.cli.commands.branch import _cleanup_empty_dirs |
| 785 | assert self._has_doc(_cleanup_empty_dirs) |
| 786 | |
| 787 | def test_resolve_start_point_docstring(self) -> None: |
| 788 | from muse.cli.commands.branch import _resolve_start_point |
| 789 | assert self._has_doc(_resolve_start_point) |
| 790 | |
| 791 | def test_run_docstring(self) -> None: |
| 792 | from muse.cli.commands.branch import run |
| 793 | assert self._has_doc(run) |
File History
1 commit
sha256:f81d1ddcc1eec1eaccc667f22067476a0cf138860691611738c0c799df24a1db
fixing more failing tests
Human
3 days ago