branch.py
python
sha256:e237dc0e8122609f5131d11c9dda9bba480395a5a4355cda0c9fa7e634fddd29
fix(branch): guard -d --dry-run against destructive writes;…
Sonnet 4.6
patch
2 days ago
| 1 | """``muse branch`` — list, create, rename, copy, and delete branches. |
| 2 | |
| 3 | Git-idiomatic flags:: |
| 4 | |
| 5 | muse branch # list all local branches |
| 6 | muse branch <name> # create branch at HEAD |
| 7 | muse branch <name> <start-point> # create at commit SHA, SHA prefix, or branch |
| 8 | muse branch -d <name> # safe delete (must be merged) |
| 9 | muse branch -D <name> # force delete |
| 10 | muse branch -dr <remote>/<branch> # delete local remote-tracking ref (no server call) |
| 11 | muse branch -Dr <remote>/<branch> # same, force (no merge check) |
| 12 | muse branch -m [<old>] <new> # rename (safe) |
| 13 | muse branch -M [<old>] <new> # rename (force) |
| 14 | muse branch -c [<src>] <dest> # copy (safe) |
| 15 | muse branch -C [<src>] <dest> # copy (force) |
| 16 | muse branch -v # list with last commit SHA + subject |
| 17 | muse branch -vv # also show upstream tracking ref |
| 18 | muse branch -r # list remote-tracking branches |
| 19 | muse branch -a # list local + remote-tracking branches |
| 20 | muse branch --merged [<commit>] # only branches merged into commit |
| 21 | muse branch --no-merged [<commit>] # only branches NOT merged into commit |
| 22 | muse branch --contains <commit> # only branches that contain commit |
| 23 | muse branch --sort name # sort by name (default) |
| 24 | muse branch --sort committeddate # sort by date of most recent commit |
| 25 | |
| 26 | To delete a branch on the remote **and** prune the local tracking ref in one |
| 27 | step, use ``muse push``:: |
| 28 | |
| 29 | muse push <remote> --delete <branch> |
| 30 | |
| 31 | Agents should pass ``--format json`` (or ``--json``) for machine-readable |
| 32 | output on all operations. The listing JSON schema is:: |
| 33 | |
| 34 | [ |
| 35 | { |
| 36 | "name": "feat/my-thing", |
| 37 | "current": false, |
| 38 | "commit_id": "<sha256> | null", |
| 39 | "committed_at": "2026-03-21T12:00:00+00:00 | null", |
| 40 | "last_message": "Add feature X", |
| 41 | "upstream": "origin/feat/my-thing" |
| 42 | }, |
| 43 | ... |
| 44 | ] |
| 45 | |
| 46 | Exit codes:: |
| 47 | |
| 48 | 0 — success |
| 49 | 1 — invalid branch name, branch not found, attempting to delete checked-out branch |
| 50 | """ |
| 51 | |
| 52 | import argparse |
| 53 | import json |
| 54 | import logging |
| 55 | import pathlib |
| 56 | import sys |
| 57 | import tomllib |
| 58 | from typing import TypedDict |
| 59 | |
| 60 | from muse.cli.config import delete_branch_meta, get_protected_branches, get_remote_head, is_branch_protected, read_branch_meta, write_branch_meta |
| 61 | from muse.core.reflog import append_reflog |
| 62 | from muse.core.types import MsgpackDict, short_id |
| 63 | from muse.core.paths import ref_path as _ref_path, heads_dir as _heads_dir, remotes_dir as _remotes_dir, config_toml_path as _config_toml_path, reflog_branch_path as _reflog_branch_path, reflog_heads_dir as _reflog_heads_dir |
| 64 | from muse.core.envelope import EnvelopeJson, make_envelope |
| 65 | from muse.core.timing import start_timer |
| 66 | from muse.core.errors import ExitCode |
| 67 | from muse.core.repo import require_repo |
| 68 | from muse.core.refs import read_ref |
| 69 | from muse.core.io import write_text_atomic |
| 70 | from muse.core.refs import ( |
| 71 | get_head_commit_id, |
| 72 | read_current_branch, |
| 73 | write_branch_ref, |
| 74 | write_head_branch, |
| 75 | ) |
| 76 | from muse.core.commits import ( |
| 77 | read_commit, |
| 78 | resolve_commit_ref, |
| 79 | ) |
| 80 | from muse.core.validation import clamp_int, sanitize_display, validate_branch_name |
| 81 | |
# Alias for a flat mapping of string keys to string-or-``None`` values.
type _Payload = dict[str, str | None]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
| 84 | |
class _BranchCreateJson(EnvelopeJson):
    """JSON output for ``muse branch <name> --json`` (branch creation).

    Adds the details of the newly created branch on top of the fields
    inherited from ``EnvelopeJson``.
    """

    # Operation tag; ``run`` builds this type with ``"created"``.
    action: str
    # Name of the branch that was created.
    branch: str
    # Commit the new branch points at, or ``None`` when no tip was resolved.
    commit_id: str | None
    # Value passed via ``--intent``; ``None`` when the flag was omitted.
    intent: str | None
    # Whether ``--resumable`` was passed on creation.
    resumable: bool
| 93 | |
class _BranchEntryJson(TypedDict):
    """One branch entry in the ``muse branch`` JSON listing."""

    # Branch name.
    name: str
    # ``True`` for the currently checked-out branch.
    current: bool
    # Full commit ID at the branch tip, or ``None``.
    commit_id: str | None
    # Timestamp of the tip commit (the module docstring shows an ISO-8601
    # value with a UTC offset), or ``None``.
    committed_at: str | None
    # Commit message at the tip.
    last_message: str | None
    # Upstream tracking ref such as ``origin/feat/my-thing``; ``None`` if unset.
    upstream: str | None
    # Branch intent annotation (``--intent`` flag).
    intent: str | None
    # ``True`` if the branch was marked with ``--resumable``.
    resumable: bool
    # NOTE(review): presumably identifies who created the branch — it is not
    # populated anywhere in the visible part of this module; confirm.
    created_by: str | None
| 104 | |
class _BranchListJson(EnvelopeJson):
    """JSON output for ``muse branch --json``.

    NOTE(review): other docstrings in this module describe the listing as a
    bare JSON array, whereas this type wraps the entries in an envelope under
    ``branches`` — confirm which shape the listing code actually emits.
    """

    # One entry per listed branch.
    branches: list[_BranchEntryJson]
| 109 | |
| 110 | |
class _PruneConfigJson(TypedDict):
    """JSON output for ``muse branch --prune-config``."""

    # Operation tag; ``run`` sets this to ``"prune_config"``.
    action: str
    # Number of stale ``[branch.*]`` config entries (no matching ref file).
    pruned: int
    # Number of config entries whose branch ref still exists.
    kept: int
    # ``True`` when ``--dry-run`` was given and nothing was written.
    dry_run: bool
    # Names of the stale branches, in sorted order.
    pruned_branches: list[str]
| 119 | |
| 120 | # --------------------------------------------------------------------------- |
| 121 | # ANSI helpers — emitted only when stdout is a TTY. |
| 122 | # --------------------------------------------------------------------------- |
| 123 | |
_RESET = "\033[0m"
_BOLD = "\033[1m"
_DIM = "\033[2m"
_GREEN = "\033[32m"
_RED = "\033[31m"
_YELLOW = "\033[33m"
_CYAN = "\033[36m"


def _c(text: str, *codes: str, tty: bool) -> str:
    """Colourise *text* for terminal output.

    When *tty* is true the given escape *codes* are concatenated in front of
    *text* and a reset sequence is appended; otherwise *text* is returned
    untouched so that piped or redirected output stays free of escapes.
    """
    return f"{''.join(codes)}{text}{_RESET}" if tty else text
| 137 | |
| 138 | # --------------------------------------------------------------------------- |
| 139 | # Internal helpers |
| 140 | # --------------------------------------------------------------------------- |
| 141 | |
def _ref_file(root: pathlib.Path, branch: str) -> pathlib.Path:
    """Locate the on-disk ref file of local branch *branch* in repo *root*.

    Thin wrapper that delegates to the shared ``ref_path`` helper.
    """
    location = _ref_path(root, branch)
    return location
| 145 | |
def _list_local_branches(root: pathlib.Path) -> list[str]:
    """Return a sorted list of all local branch names.

    The heads directory is walked recursively, so namespaced branches such as
    ``feat/x`` are reported by their POSIX-style path relative to it.
    Directories, symbolic links, and hidden (dot-prefixed) files are skipped.

    Returns an empty list when the heads directory does not exist.
    """
    heads_dir = _heads_dir(root)
    if not heads_dir.exists():
        return []
    return sorted(
        p.relative_to(heads_dir).as_posix()
        for p in heads_dir.rglob("*")
        # ``is_file()`` follows symlinks, so links must be rejected explicitly
        # to honour the "symlinks are skipped" contract.
        if not p.name.startswith(".") and not p.is_symlink() and p.is_file()
    )
| 160 | |
def _list_remotes(root: pathlib.Path) -> list[str]:
    """Return remote-tracking branch names as ``remote/branch``.

    Entries are grouped by remote (remotes visited in sorted order) and, within
    each remote, ordered by ref path. Each remote directory is walked
    recursively so namespaced branches keep their full ``a/b`` name.

    Only plain files are reported: symbolic links, hidden (dot-prefixed)
    files, and directories are skipped to avoid leaking internal artefacts
    into the listing. Returns an empty list when the remotes directory does
    not exist.
    """
    remotes_dir = _remotes_dir(root)
    if not remotes_dir.exists():
        return []
    results: list[str] = []
    for remote_dir in sorted(remotes_dir.iterdir()):
        if not remote_dir.is_dir():
            continue
        remote = remote_dir.name
        results.extend(
            f"{remote}/{ref_file.relative_to(remote_dir).as_posix()}"
            for ref_file in sorted(remote_dir.rglob("*"))
            # ``is_file()`` follows symlinks, so links are rejected explicitly.
            if not ref_file.name.startswith(".")
            and not ref_file.is_symlink()
            and ref_file.is_file()
        )
    return results
| 180 | |
def _resolve_commit_id(root: pathlib.Path, b: str) -> str:
    """Look up the tip commit ID for one branch-listing entry.

    *b* is the display name used in listings: a plain local branch name such
    as ``"main"``, or ``"remotes/<remote>/<branch>"`` for a remote-tracking
    entry. Remote entries are resolved through ``get_remote_head``; everything
    else (including a ``remotes/`` name with no branch part) goes through the
    local head lookup. An empty string is returned when no commit ID is found.
    """
    marker = "remotes/"
    if b.startswith(marker):
        remote, _sep, branch_name = b[len(marker):].partition("/")
        if branch_name:
            remote_tip = get_remote_head(remote, branch_name, root)
            return remote_tip if remote_tip else ""
    local_tip = get_head_commit_id(root, b)
    return local_tip if local_tip else ""
| 194 | |
def _upstream_for(root: pathlib.Path, branch: str) -> str | None:
    """Return the upstream tracking ref for *branch*, or ``None`` if unset.

    Reads the ``remote`` and ``merge`` keys of the ``[branch.<branch>]`` table
    in the repository's config TOML and combines them as ``<remote>/<name>``,
    where ``<name>`` is ``merge`` with a leading ``refs/heads/`` removed.

    This is a best-effort lookup used for display: a missing, unreadable, or
    malformed config yields ``None`` instead of raising. Read and parse
    failures are logged at debug level so they are not silently lost.
    """
    config_path = _config_toml_path(root)
    try:
        with config_path.open("rb") as f:
            config = tomllib.load(f)
    except FileNotFoundError:
        return None
    except (OSError, ValueError) as exc:
        # ValueError covers tomllib.TOMLDecodeError and UnicodeDecodeError.
        logger.debug("Could not read upstream config %s: %s", config_path, exc)
        return None

    # Validate the shape explicitly rather than relying on a swallowed
    # AttributeError when a table turns out to be a scalar.
    branches = config.get("branch")
    section = branches.get(branch) if isinstance(branches, dict) else None
    if not isinstance(section, dict):
        return None
    remote = section.get("remote")
    merge_ref = section.get("merge")
    if isinstance(remote, str) and isinstance(merge_ref, str) and remote and merge_ref:
        return f"{remote}/{merge_ref.removeprefix('refs/heads/')}"
    return None
| 212 | |
def _commit_ancestors(root: pathlib.Path, commit_id: str) -> set[str]:
    """Collect every commit ID reachable from *commit_id* (inclusive).

    Delegates to ``muse.core.graph.ancestor_ids``; the import happens at call
    time rather than at module import.
    """
    from muse.core.graph import ancestor_ids as _walk_ancestors

    reachable = _walk_ancestors(root, commit_id)
    return reachable
| 217 | |
def _is_merged(root: pathlib.Path, branch: str, into: str) -> bool:
    """Tell whether *branch* is fully merged into *into*.

    The answer is ``True`` when the tip commit of *branch* is among the
    commits reachable from the tip of *into*. If either branch has no tip
    commit the result is ``False``.
    """
    candidate = get_head_commit_id(root, branch)
    target = get_head_commit_id(root, into)
    if candidate is not None and target is not None:
        return candidate in _commit_ancestors(root, target)
    return False
| 225 | |
def _contains_commit(root: pathlib.Path, branch: str, commit_id: str) -> bool:
    """Tell whether *branch* contains *commit_id*.

    ``True`` when *commit_id* is among the commits reachable from the branch
    tip; ``False`` when it is not, or when the branch has no tip commit.
    """
    head = get_head_commit_id(root, branch)
    return head is not None and commit_id in _commit_ancestors(root, head)
| 232 | |
def _cleanup_empty_dirs(ref_file: pathlib.Path, heads_dir: pathlib.Path) -> None:
    """Remove empty parent directories left behind after unlinking *ref_file*.

    Walks upward from the directory that held *ref_file*, removing each
    directory that is now empty, and stops at the first directory that cannot
    be removed (typically because it still has entries) or on reaching
    *heads_dir*, which is never removed.

    If *heads_dir* is not an ancestor of *ref_file* nothing is removed:
    without that boundary the walk would have no stopping point inside the
    ref tree and could delete unrelated empty directories.
    """
    ancestors = ref_file.parents
    if heads_dir not in ancestors:
        # No boundary to stop at — refuse to touch anything.
        return
    for parent in ancestors:
        if parent == heads_dir:
            break
        try:
            parent.rmdir()
        except OSError:
            # Not empty (or not removable): every ancestor above it must stay.
            break
| 242 | |
def _resolve_start_point(root: pathlib.Path, current: str, start_point: str) -> str:
    """Turn *start_point* into a full commit ID where possible.

    Resolution order:

    1. As a branch name — its tip commit wins if the branch has one.
    2. As a commit reference via ``resolve_commit_ref`` (full IDs and
       abbreviated prefixes, with or without the ``sha256:`` prefix).

    When neither succeeds the original *start_point* string is handed back
    unchanged; surfacing a meaningful error is left to the caller.
    """
    try:
        tip = get_head_commit_id(root, start_point)
    except ValueError:
        # Characters that are illegal in branch names (e.g. the ':' of a
        # sha256:-prefixed ID) mean this cannot be a branch — try it as a SHA.
        tip = None
    if tip is not None:
        return tip

    record = resolve_commit_ref(root, current, start_point)
    # Unresolvable input is passed through as-is for the caller to reject.
    return start_point if record is None else record.commit_id
| 266 | |
| 267 | # --------------------------------------------------------------------------- |
| 268 | # CLI registration |
| 269 | # --------------------------------------------------------------------------- |
| 270 | |
def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
    """Register the ``muse branch`` subcommand and all its flags.

    Adds a ``branch`` parser to *subparsers* whose description is this
    module's docstring, and wires ``run`` as its handler via the ``func``
    default. The six operation flags (``-d/-D/-m/-M/-c/-C``) are mutually
    exclusive and all store into ``op``; with none of them given, ``op`` is
    ``None`` and ``run`` decides between create and list from the positionals.
    """
    parser = subparsers.add_parser(
        "branch",
        help="List, create, rename, copy, or delete branches.",
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("args", nargs="*", help="Branch name(s) — context-sensitive.")

    # Mutually exclusive operation flags (mirrors git branch).
    operations: tuple[tuple[tuple[str, ...], str, str], ...] = (
        (("-d", "--delete"), "delete", "Delete a branch (safe — must be fully merged)."),
        (("-D",), "force_delete", "Force-delete a branch regardless of merge status."),
        (("-m", "--move"), "rename", "Rename a branch (safe)."),
        (("-M",), "force_rename", "Force-rename a branch."),
        (("-c", "--copy"), "copy", "Copy a branch (safe)."),
        (("-C",), "force_copy", "Force-copy a branch."),
    )
    ops = parser.add_mutually_exclusive_group()
    for flags, const, help_text in operations:
        ops.add_argument(*flags, dest="op", action="store_const", const=const, help=help_text)

    # Listing modifiers.
    parser.add_argument(
        "-v", action="count", default=0, dest="verbose",
        help="Show last commit SHA + subject. Repeat (-vv) to also show upstream.",
    )
    parser.add_argument(
        "-r", "--remotes", action="store_true",
        help=(
            "List remote-tracking branches. "
            "With -d/-D: delete local remote-tracking refs instead of local branches."
        ),
    )
    parser.add_argument(
        "-a", "--all", action="store_true", dest="all_branches",
        help="List both local and remote-tracking branches.",
    )
    parser.add_argument(
        "--merged", metavar="COMMIT", nargs="?", const="HEAD",
        help="Only list branches merged into COMMIT (default HEAD).",
    )
    parser.add_argument(
        "--no-merged", metavar="COMMIT", nargs="?", const="HEAD",
        help="Only list branches NOT merged into COMMIT (default HEAD).",
    )
    parser.add_argument(
        "--contains", metavar="COMMIT",
        help="Only list branches that contain COMMIT.",
    )
    parser.add_argument(
        "--sort", default="name", metavar="KEY",
        choices=["name", "committeddate"],
        help="Sort branches by 'name' (default) or 'committeddate'.",
    )
    parser.add_argument(
        "--intent", default=None, metavar="TEXT",
        help="Short description of what this branch is for (stored in config).",
    )
    parser.add_argument(
        "--resumable", action="store_true", default=False,
        help=(
            "On create: mark this branch as a resumable agent checkpoint. "
            "On list (no name): filter to resumable branches only."
        ),
    )
    parser.add_argument(
        "--prune-config", action="store_true", dest="prune_config",
        help=(
            "Remove stale [branch.*] entries from .muse/config.toml — "
            "entries for branches whose ref no longer exists. "
            "Use --dry-run to preview without writing."
        ),
    )
    # NOTE(review): run() honours dry_run for --prune-config and for local
    # -d/-D, but the -r (remote-tracking) delete path does not consult it.
    parser.add_argument(
        "--dry-run", action="store_true", dest="dry_run",
        help=(
            "With --prune-config or -d/-D on local branches: "
            "report what would be removed without writing."
        ),
    )
    parser.add_argument(
        "--json", "-j", action="store_true", dest="json_out",
        help="Emit machine-readable JSON.",
    )
    parser.set_defaults(func=run, op=None, prune_config=False, dry_run=False)
| 366 | |
| 367 | # --------------------------------------------------------------------------- |
| 368 | # Command handler |
| 369 | # --------------------------------------------------------------------------- |
| 370 | |
| 371 | def run(args: argparse.Namespace) -> None: |
| 372 | """List, create, rename, copy, or delete branches. |
| 373 | |
| 374 | Without a subcommand flag, lists all local branches. With ``--format json`` |
| 375 | the output is a stable JSON array; mutation ops (create, rename, copy, |
| 376 | delete) emit a single result object with an ``"action"`` key. |
| 377 | |
| 378 | Agent quickstart |
| 379 | ---------------- |
| 380 | :: |
| 381 | |
| 382 | muse branch --json # list all branches |
| 383 | muse branch --json --resumable # list resumable branches only |
| 384 | muse branch feat/thing --json # create branch |
| 385 | muse branch -d feat/thing --json # delete branch |
| 386 | |
| 387 | JSON fields (list mode — top-level is a bare array) |
| 388 | ---------------------------------------------------- |
| 389 | name Branch name. |
| 390 | current ``true`` for the currently checked-out branch. |
| 391 | commit_id Full ``sha256:…`` commit ID at the tip. |
| 392 | last_message Commit message at the tip. |
| 393 | upstream Upstream tracking ref; ``null`` if none. |
| 394 | intent Branch intent annotation (``--intent`` flag). |
| 395 | resumable ``true`` if the branch was created with ``--resumable``. |
| 396 | |
| 397 | JSON fields (mutation mode) |
| 398 | --------------------------- |
| 399 | action What was done: ``"created"``, ``"deleted"``, ``"renamed"``, etc. |
| 400 | branch Branch name acted upon (rename and copy report ``from`` and ``to`` instead). |
| 401 | |
| 402 | Exit codes |
| 403 | ---------- |
| 404 | 0 Success. |
| 405 | 1 Invalid arguments, branch not found, or operation conflicts. |
| 406 | 2 Not inside a Muse repository. |
| 407 | """ |
| 408 | elapsed = start_timer() |
| 409 | positional: list[str] = args.args |
| 410 | op: str | None = args.op |
| 411 | verbose: int = clamp_int(args.verbose, 0, 4, 'verbose') |
| 412 | remotes_only: bool = args.remotes |
| 413 | all_branches: bool = args.all_branches |
| 414 | merged_into: str | None = args.merged |
| 415 | not_merged_into: str | None = args.no_merged |
| 416 | contains_commit: str | None = args.contains |
| 417 | sort_key: str = args.sort |
| 418 | intent: str | None = args.intent |
| 419 | resumable: bool = args.resumable |
| 420 | json_out: bool = args.json_out |
| 421 | tty: bool = sys.stdout.isatty() |
| 422 | |
| 423 | root = require_repo() |
| 424 | current = read_current_branch(root) |
| 425 | heads_dir = _heads_dir(root) |
| 426 | |
| 427 | # ------------------------------------------------------------------ |
| 428 | # PRUNE-CONFIG — remove stale [branch.*] entries from config.toml |
| 429 | # ------------------------------------------------------------------ |
| 430 | if args.prune_config: |
| 431 | config_path = _config_toml_path(root) |
| 432 | config: MsgpackDict = {} |
| 433 | if config_path.exists(): |
| 434 | import tomllib as _tomllib |
| 435 | config = _tomllib.loads(config_path.read_text()) |
| 436 | branch_sections: MsgpackDict = dict(config.get("branch") or {}) |
| 437 | pruned: list[str] = [] |
| 438 | kept: list[str] = [] |
| 439 | for bname in sorted(branch_sections): |
| 440 | ref_file = _heads_dir(root) / bname |
| 441 | if ref_file.exists(): |
| 442 | kept.append(bname) |
| 443 | else: |
| 444 | pruned.append(bname) |
| 445 | if not args.dry_run: |
| 446 | delete_branch_meta(root, bname) |
| 447 | result: _PruneConfigJson = { |
| 448 | "action": "prune_config", |
| 449 | "pruned": len(pruned), |
| 450 | "kept": len(kept), |
| 451 | "dry_run": args.dry_run, |
| 452 | "pruned_branches": pruned, |
| 453 | } |
| 454 | if json_out: |
| 455 | print(json.dumps(result)) |
| 456 | else: |
| 457 | prefix = "[dry-run] " if args.dry_run else "" |
| 458 | print(f"{prefix}Pruned {len(pruned)} stale config entries, kept {len(kept)} live entries.") |
| 459 | for b in pruned: |
| 460 | print(f" - {b}") |
| 461 | return |
| 462 | |
| 463 | # ------------------------------------------------------------------ |
| 464 | # DELETE / FORCE-DELETE |
| 465 | # Supports two modes: |
| 466 | # muse branch -d|-D <local-branch> — delete a local branch |
| 467 | # muse branch -d|-D -r <remote>/<branch> — prune a remote-tracking ref |
| 468 | # ------------------------------------------------------------------ |
| 469 | if op in ("delete", "force_delete"): |
| 470 | if not positional: |
| 471 | if json_out: |
| 472 | print(json.dumps({"error": "usage", "message": "muse branch -d|-D [-r] <branch> …"})) |
| 473 | print("❌ Usage: muse branch -d|-D [-r] <branch> …", file=sys.stderr) |
| 474 | raise SystemExit(ExitCode.USER_ERROR) |
| 475 | |
| 476 | # -r flag: delete local remote-tracking refs (no server call). |
| 477 | if remotes_only: |
| 478 | from muse.cli.config import delete_remote_head |
| 479 | for spec in positional: |
| 480 | # Accept both "remote/branch" and "remotes/remote/branch" spellings. |
| 481 | clean = spec.removeprefix("remotes/") |
| 482 | slash = clean.find("/") |
| 483 | if slash == -1: |
| 484 | if json_out: |
| 485 | print(json.dumps({"error": "invalid_ref", "ref": spec, "message": "remote-tracking ref must be '<remote>/<branch>'"})) |
| 486 | print( |
| 487 | f"❌ Remote-tracking ref must be '<remote>/<branch>', got " |
| 488 | f"'{sanitize_display(spec)}'.", |
| 489 | file=sys.stderr, |
| 490 | ) |
| 491 | raise SystemExit(ExitCode.USER_ERROR) |
| 492 | remote_name = clean[:slash] |
| 493 | branch_name = clean[slash + 1:] |
| 494 | removed = delete_remote_head(remote_name, branch_name, root) |
| 495 | if not removed: |
| 496 | if json_out: |
| 497 | print(json.dumps({"error": "not_found", "ref": clean, "message": f"remote-tracking ref '{clean}' not found"})) |
| 498 | print( |
| 499 | f"❌ Remote-tracking ref '{sanitize_display(clean)}' not found.", |
| 500 | file=sys.stderr, |
| 501 | ) |
| 502 | raise SystemExit(ExitCode.USER_ERROR) |
| 503 | if json_out: |
| 504 | print(json.dumps({ |
| 505 | "action": "deleted_remote_tracking", |
| 506 | "remote": remote_name, |
| 507 | "branch": branch_name, |
| 508 | })) |
| 509 | else: |
| 510 | print( |
| 511 | f"Deleted remote-tracking ref " |
| 512 | f"{_c(sanitize_display(clean), _RED, tty=tty)}." |
| 513 | ) |
| 514 | return |
| 515 | |
| 516 | force = op == "force_delete" |
| 517 | for branch_name in positional: |
| 518 | try: |
| 519 | validate_branch_name(branch_name) |
| 520 | except ValueError as exc: |
| 521 | if json_out: |
| 522 | print(json.dumps({"error": "invalid_branch_name", "branch": branch_name, "message": str(exc)})) |
| 523 | print(f"❌ Invalid branch name: {sanitize_display(str(exc))}", file=sys.stderr) |
| 524 | raise SystemExit(ExitCode.USER_ERROR) |
| 525 | if branch_name == current: |
| 526 | if json_out: |
| 527 | print(json.dumps({"error": "current_branch", "branch": branch_name, "message": f"cannot delete the currently checked-out branch '{branch_name}'"})) |
| 528 | print( |
| 529 | f"❌ Cannot delete the currently checked-out branch " |
| 530 | f"'{sanitize_display(branch_name)}'.", |
| 531 | file=sys.stderr, |
| 532 | ) |
| 533 | raise SystemExit(ExitCode.USER_ERROR) |
| 534 | rf = _ref_file(root, branch_name) |
| 535 | if not rf.is_file(): |
| 536 | if json_out: |
| 537 | print(json.dumps({"error": "not_found", "branch": branch_name, "message": f"branch '{branch_name}' not found"})) |
| 538 | print(f"❌ Branch '{sanitize_display(branch_name)}' not found.", file=sys.stderr) |
| 539 | raise SystemExit(ExitCode.USER_ERROR) |
| 540 | if not force and not _is_merged(root, branch_name, current): |
| 541 | if json_out: |
| 542 | print(json.dumps({"error": "not_merged", "branch": branch_name, "message": f"branch '{branch_name}' is not fully merged", "hint": "use -D to force-delete"})) |
| 543 | print( |
| 544 | f"❌ Branch '{sanitize_display(branch_name)}' is not fully merged.\n" |
| 545 | f" Use -D to force-delete.", |
| 546 | file=sys.stderr, |
| 547 | ) |
| 548 | raise SystemExit(ExitCode.USER_ERROR) |
| 549 | protected = get_protected_branches(root) |
| 550 | if is_branch_protected(branch_name, protected): |
| 551 | if json_out: |
| 552 | print(json.dumps({"error": "protected", "branch": branch_name, "message": f"branch '{branch_name}' is protected and cannot be deleted"})) |
| 553 | else: |
| 554 | print( |
| 555 | f"❌ Branch '{sanitize_display(branch_name)}' is protected and cannot be deleted.", |
| 556 | file=sys.stderr, |
| 557 | ) |
| 558 | raise SystemExit(ExitCode.USER_ERROR) |
| 559 | tip = read_ref(rf) or "" |
| 560 | if not args.dry_run: |
| 561 | rf.unlink() |
| 562 | _cleanup_empty_dirs(rf, heads_dir) |
| 563 | reflog_file = _reflog_branch_path(root, branch_name) |
| 564 | reflog_file.unlink(missing_ok=True) |
| 565 | _cleanup_empty_dirs(reflog_file, _reflog_heads_dir(root)) |
| 566 | delete_branch_meta(root, branch_name) |
| 567 | if json_out: |
| 568 | print(json.dumps({"action": "deleted", "branch": branch_name, "was": tip, "dry_run": args.dry_run})) |
| 569 | else: |
| 570 | prefix = "[dry-run] " if args.dry_run else "" |
| 571 | print( |
| 572 | f"{prefix}Deleted branch {_c(sanitize_display(branch_name), _RED, tty=tty)} " |
| 573 | f"({_c('was ' + (tip or 'unknown'), _DIM, tty=tty)})." |
| 574 | ) |
| 575 | return |
| 576 | |
| 577 | # ------------------------------------------------------------------ |
| 578 | # RENAME / FORCE-RENAME |
| 579 | # ------------------------------------------------------------------ |
| 580 | if op in ("rename", "force_rename"): |
| 581 | force = op == "force_rename" |
| 582 | if len(positional) == 1: |
| 583 | old_name, new_name = current, positional[0] |
| 584 | elif len(positional) == 2: |
| 585 | old_name, new_name = positional[0], positional[1] |
| 586 | else: |
| 587 | if json_out: |
| 588 | print(json.dumps({"error": "usage", "message": "muse branch -m|-M [<old>] <new>"})) |
| 589 | print("❌ Usage: muse branch -m|-M [<old>] <new>", file=sys.stderr) |
| 590 | raise SystemExit(ExitCode.USER_ERROR) |
| 591 | for n in (old_name, new_name): |
| 592 | try: |
| 593 | validate_branch_name(n) |
| 594 | except ValueError as exc: |
| 595 | if json_out: |
| 596 | print(json.dumps({"error": "invalid_branch_name", "branch": n, "message": str(exc)})) |
| 597 | print(f"❌ Invalid branch name: {sanitize_display(str(exc))}", file=sys.stderr) |
| 598 | raise SystemExit(ExitCode.USER_ERROR) |
| 599 | src = _ref_file(root, old_name) |
| 600 | dst = _ref_file(root, new_name) |
| 601 | if not src.is_file(): |
| 602 | if json_out: |
| 603 | print(json.dumps({"error": "not_found", "branch": old_name, "message": f"branch '{old_name}' not found"})) |
| 604 | print(f"❌ Branch '{sanitize_display(old_name)}' not found.", file=sys.stderr) |
| 605 | raise SystemExit(ExitCode.USER_ERROR) |
| 606 | if dst.is_file() and not force: |
| 607 | if json_out: |
| 608 | print(json.dumps({"error": "already_exists", "branch": new_name, "message": f"branch '{new_name}' already exists", "hint": "use -M to force"})) |
| 609 | print( |
| 610 | f"❌ Branch '{sanitize_display(new_name)}' already exists. Use -M to force.", |
| 611 | file=sys.stderr, |
| 612 | ) |
| 613 | raise SystemExit(ExitCode.USER_ERROR) |
| 614 | tip = read_ref(src) or "" |
| 615 | if tip: |
| 616 | write_branch_ref(root, new_name, tip) |
| 617 | else: |
| 618 | write_text_atomic(dst, "") |
| 619 | src.unlink() |
| 620 | _cleanup_empty_dirs(src, heads_dir) |
| 621 | if old_name == current: |
| 622 | write_head_branch(root, new_name) |
| 623 | if json_out: |
| 624 | print(json.dumps({"action": "renamed", "from": old_name, "to": new_name})) |
| 625 | else: |
| 626 | print( |
| 627 | f"Renamed branch " |
| 628 | f"{_c(sanitize_display(old_name), _YELLOW, tty=tty)} → " |
| 629 | f"{_c(sanitize_display(new_name), _GREEN, tty=tty)}." |
| 630 | ) |
| 631 | return |
| 632 | |
| 633 | # ------------------------------------------------------------------ |
| 634 | # COPY / FORCE-COPY |
| 635 | # ------------------------------------------------------------------ |
| 636 | if op in ("copy", "force_copy"): |
| 637 | force = op == "force_copy" |
| 638 | if len(positional) == 1: |
| 639 | src_name, dst_name = current, positional[0] |
| 640 | elif len(positional) == 2: |
| 641 | src_name, dst_name = positional[0], positional[1] |
| 642 | else: |
| 643 | if json_out: |
| 644 | print(json.dumps({"error": "usage", "message": "muse branch -c|-C [<src>] <dest>"})) |
| 645 | print("❌ Usage: muse branch -c|-C [<src>] <dest>", file=sys.stderr) |
| 646 | raise SystemExit(ExitCode.USER_ERROR) |
| 647 | for n in (src_name, dst_name): |
| 648 | try: |
| 649 | validate_branch_name(n) |
| 650 | except ValueError as exc: |
| 651 | if json_out: |
| 652 | print(json.dumps({"error": "invalid_branch_name", "branch": n, "message": str(exc)})) |
| 653 | print(f"❌ Invalid branch name: {sanitize_display(str(exc))}", file=sys.stderr) |
| 654 | raise SystemExit(ExitCode.USER_ERROR) |
| 655 | src = _ref_file(root, src_name) |
| 656 | dst = _ref_file(root, dst_name) |
| 657 | if not src.is_file(): |
| 658 | if json_out: |
| 659 | print(json.dumps({"error": "not_found", "branch": src_name, "message": f"branch '{src_name}' not found"})) |
| 660 | print(f"❌ Branch '{sanitize_display(src_name)}' not found.", file=sys.stderr) |
| 661 | raise SystemExit(ExitCode.USER_ERROR) |
| 662 | if dst.is_file() and not force: |
| 663 | if json_out: |
| 664 | print(json.dumps({"error": "already_exists", "branch": dst_name, "message": f"branch '{dst_name}' already exists", "hint": "use -C to force"})) |
| 665 | print( |
| 666 | f"❌ Branch '{sanitize_display(dst_name)}' already exists. Use -C to force.", |
| 667 | file=sys.stderr, |
| 668 | ) |
| 669 | raise SystemExit(ExitCode.USER_ERROR) |
| 670 | tip = read_ref(src) or "" |
| 671 | if tip: |
| 672 | write_branch_ref(root, dst_name, tip) |
| 673 | else: |
| 674 | write_text_atomic(dst, "") |
| 675 | if json_out: |
| 676 | print(json.dumps({"action": "copied", "from": src_name, "to": dst_name})) |
| 677 | else: |
| 678 | print( |
| 679 | f"Copied branch " |
| 680 | f"{_c(sanitize_display(src_name), _YELLOW, tty=tty)} → " |
| 681 | f"{_c(sanitize_display(dst_name), _GREEN, tty=tty)}." |
| 682 | ) |
| 683 | return |
| 684 | |
| 685 | # ------------------------------------------------------------------ |
| 686 | # CREATE |
| 687 | # ------------------------------------------------------------------ |
| 688 | if op is None and positional: |
| 689 | new_name = positional[0] |
| 690 | start_point: str | None = positional[1] if len(positional) > 1 else None |
| 691 | try: |
| 692 | validate_branch_name(new_name) |
| 693 | except ValueError as exc: |
| 694 | if json_out: |
| 695 | print(json.dumps({"error": "invalid_branch_name", "branch": new_name, "message": str(exc)})) |
| 696 | print(f"❌ Invalid branch name: {sanitize_display(str(exc))}", file=sys.stderr) |
| 697 | raise SystemExit(ExitCode.USER_ERROR) |
| 698 | rf = _ref_file(root, new_name) |
| 699 | if rf.is_file(): |
| 700 | # Branch exists. If --intent or --resumable given (no start_point), |
| 701 | # treat as a metadata update rather than a failed create. |
| 702 | if (intent is not None or resumable) and start_point is None: |
| 703 | write_branch_meta( |
| 704 | root, |
| 705 | new_name, |
| 706 | intent=intent, |
| 707 | resumable=resumable if resumable else None, |
| 708 | ) |
| 709 | meta = read_branch_meta(root, new_name) |
| 710 | if json_out: |
| 711 | print(json.dumps({ |
| 712 | "action": "updated", |
| 713 | "branch": new_name, |
| 714 | "intent": meta.get("intent"), |
| 715 | "resumable": bool(meta.get("resumable", False)), |
| 716 | })) |
| 717 | else: |
| 718 | parts: list[str] = [] |
| 719 | if intent is not None: |
| 720 | parts.append(f"intent={sanitize_display(intent)!r}") |
| 721 | if resumable: |
| 722 | parts.append("resumable=true") |
| 723 | print( |
| 724 | f"Updated branch {_c(sanitize_display(new_name), _YELLOW, tty=tty)}" |
| 725 | f"{' (' + ', '.join(parts) + ')' if parts else ''}." |
| 726 | ) |
| 727 | return |
| 728 | if json_out: |
| 729 | print(json.dumps({"error": "already_exists", "branch": new_name, "message": f"branch '{new_name}' already exists"})) |
| 730 | print(f"❌ Branch '{sanitize_display(new_name)}' already exists.", file=sys.stderr) |
| 731 | raise SystemExit(ExitCode.USER_ERROR) |
| 732 | |
| 733 | if start_point is not None: |
| 734 | # Resolve branch names, full SHAs, and abbreviated SHA prefixes. |
| 735 | sp_tip: str = _resolve_start_point(root, current, start_point) |
| 736 | else: |
| 737 | sp_tip = get_head_commit_id(root, current) or "" |
| 738 | |
| 739 | if sp_tip: |
| 740 | write_branch_ref(root, new_name, sp_tip) |
| 741 | source_label = start_point or current or "HEAD" |
| 742 | append_reflog(root, new_name, None, sp_tip, "", f"branch: Created from {source_label}") |
| 743 | else: |
| 744 | write_text_atomic(rf, "") |
| 745 | |
| 746 | # Persist intent / resumable if supplied. |
| 747 | if intent is not None or resumable: |
| 748 | write_branch_meta( |
| 749 | root, |
| 750 | new_name, |
| 751 | intent=intent, |
| 752 | resumable=resumable if resumable else None, |
| 753 | ) |
| 754 | |
| 755 | if json_out: |
| 756 | print(json.dumps({ |
| 757 | **make_envelope(elapsed), |
| 758 | **_BranchCreateJson( |
| 759 | action="created", |
| 760 | branch=new_name, |
| 761 | commit_id=sp_tip or None, |
| 762 | intent=intent, |
| 763 | resumable=resumable, |
| 764 | ), |
| 765 | "from": start_point, |
| 766 | })) |
| 767 | else: |
| 768 | print(f"Created branch {_c(sanitize_display(new_name), _GREEN, tty=tty)}.") |
| 769 | return |
| 770 | |
| 771 | # ------------------------------------------------------------------ |
| 772 | # LIST |
| 773 | # ------------------------------------------------------------------ |
| 774 | local_branches = _list_local_branches(root) |
| 775 | if remotes_only: |
| 776 | display_branches = [f"remotes/{b}" for b in _list_remotes(root)] |
| 777 | elif all_branches: |
| 778 | display_branches = local_branches + [f"remotes/{b}" for b in _list_remotes(root)] |
| 779 | else: |
| 780 | display_branches = list(local_branches) |
| 781 | |
| 782 | # --resumable filter: only show branches marked resumable in config. |
| 783 | if resumable and not positional: |
| 784 | filtered_resumable: list[str] = [] |
| 785 | for b in display_branches: |
| 786 | local_b = b.removeprefix("remotes/") |
| 787 | meta = read_branch_meta(root, local_b) |
| 788 | if meta.get("resumable") is True: |
| 789 | filtered_resumable.append(b) |
| 790 | display_branches = filtered_resumable |
| 791 | |
| 792 | # --merged / --no-merged / --contains filters |
| 793 | if merged_into or not_merged_into or contains_commit: |
| 794 | resolved_current = current |
| 795 | |
| 796 | # Pre-compute ancestor sets once — not once per branch. |
| 797 | # _commit_ancestors walks the full commit DAG; recomputing it for every |
| 798 | # branch being checked is O(branches × commits) instead of O(commits). |
| 799 | _merged_ancestors: set[str] | None = None |
| 800 | if merged_into: |
| 801 | _into = resolved_current if merged_into == "HEAD" else merged_into |
| 802 | _into_tip = get_head_commit_id(root, _into) |
| 803 | _merged_ancestors = _commit_ancestors(root, _into_tip) if _into_tip else set() |
| 804 | |
| 805 | _not_merged_ancestors: set[str] | None = None |
| 806 | if not_merged_into: |
| 807 | _into = resolved_current if not_merged_into == "HEAD" else not_merged_into |
| 808 | _into_tip = get_head_commit_id(root, _into) |
| 809 | _not_merged_ancestors = _commit_ancestors(root, _into_tip) if _into_tip else set() |
| 810 | |
def _passes(b: str) -> bool:
    """Report whether display entry *b* survives every active list filter.

    Closure over the enclosing listing state: ``root``, ``contains_commit``
    and the pre-computed ``_merged_ancestors`` / ``_not_merged_ancestors``
    sets, where ``None`` means the corresponding filter is switched off.

    Args:
        b: Branch name as displayed; a leading ``remotes/`` is stripped
            before any lookup.

    Returns:
        ``False`` as soon as one active filter rejects the branch,
        ``True`` otherwise.
    """
    ref_name = b.removeprefix("remotes/")

    # --merged: keep only branches whose tip exists and is in the
    # pre-computed ancestor set.
    if _merged_ancestors is not None:
        merged_tip = get_head_commit_id(root, ref_name)
        if merged_tip is None or merged_tip not in _merged_ancestors:
            return False

    # --no-merged: reject only when a tip exists and is in the set;
    # a branch without a tip passes this filter.
    if _not_merged_ancestors is not None:
        unmerged_tip = get_head_commit_id(root, ref_name)
        if not (unmerged_tip is None or unmerged_tip not in _not_merged_ancestors):
            return False

    # --contains: consulted only when a commit was supplied.
    if contains_commit and not _contains_commit(root, ref_name, contains_commit):
        return False
    return True
| 825 | |
| 826 | display_branches = [b for b in display_branches if _passes(b)] |
| 827 | |
| 828 | # --sort: sort by committed date if requested. |
| 829 | # Name sort is the default (already applied by _list_local_branches). |
| 830 | if sort_key == "committeddate": |
def _committed_ts(b: str) -> str:
    """Return the sort key used for ``--sort committeddate``.

    Args:
        b: Branch name as displayed; passed unchanged to
            ``_resolve_commit_id``.

    Returns:
        ``committed_at.isoformat()`` of the commit the branch resolves to,
        or ``""`` when the branch resolves to no commit or ``read_commit``
        yields no record.
    """
    tip_id = _resolve_commit_id(root, b)
    tip_rec = read_commit(root, tip_id) if tip_id else None
    if tip_rec:
        return tip_rec.committed_at.isoformat()
    # "" is smaller than any non-empty string, so entries without a
    # readable commit get the lowest key.
    return ""
| 837 | |
| 838 | display_branches = sorted(display_branches, key=_committed_ts, reverse=True) |
| 839 | |
| 840 | if json_out: |
| 841 | result: list[_BranchEntryJson] = [] |
| 842 | for b in display_branches: |
| 843 | local_b = b.removeprefix("remotes/") |
| 844 | commit_id = _resolve_commit_id(root, b) |
| 845 | rec = read_commit(root, commit_id) if commit_id else None |
| 846 | last_message: str | None = ( |
| 847 | sanitize_display(rec.message.splitlines()[0][:72]) if rec and rec.message else None |
| 848 | ) |
| 849 | upstream: str | None = _upstream_for(root, local_b) |
| 850 | meta = read_branch_meta(root, local_b) |
| 851 | branch_intent: str | None = meta.get("intent") or None # type: ignore[assignment] |
| 852 | branch_intent = sanitize_display(branch_intent) if branch_intent else None |
| 853 | branch_resumable: bool = bool(meta.get("resumable", False)) |
| 854 | created_by: str | None = (rec.agent_id if rec and rec.agent_id else None) |
| 855 | result.append({ |
| 856 | "name": b, |
| 857 | "current": local_b == current, |
| 858 | "commit_id": commit_id or None, |
| 859 | "committed_at": rec.committed_at.isoformat() if rec else None, |
| 860 | "last_message": last_message, |
| 861 | "upstream": upstream, |
| 862 | "intent": branch_intent, |
| 863 | "resumable": branch_resumable, |
| 864 | "created_by": created_by, |
| 865 | }) |
| 866 | print(json.dumps(result)) |
| 867 | return |
| 868 | |
| 869 | for b in display_branches: |
| 870 | is_remote_entry = b.startswith("remotes/") |
| 871 | local_b = b.removeprefix("remotes/") |
| 872 | is_current = (local_b == current) and not is_remote_entry |
| 873 | marker = _c("* ", _GREEN, tty=tty) if is_current else " " |
| 874 | # Build the display name once; apply sanitization before any coloring |
| 875 | # so that ANSI codes from _c() are not accidentally re-sanitized. |
| 876 | safe_name = sanitize_display(b) |
| 877 | name_str = _c(safe_name, _GREEN, tty=tty) if is_current else safe_name |
| 878 | if verbose >= 1: |
| 879 | commit_id = _resolve_commit_id(root, b) |
| 880 | short = short_id(commit_id) if commit_id else "(empty)" |
| 881 | rec = read_commit(root, commit_id) if commit_id else None |
| 882 | msg = sanitize_display(rec.message.splitlines()[0][:48]) if rec and rec.message else "" |
| 883 | short_str = _c(short, _YELLOW, tty=tty) |
| 884 | if verbose >= 2: |
| 885 | upstream = _upstream_for(root, local_b) |
| 886 | up_str = ( |
| 887 | f" [{_c(sanitize_display(upstream), _CYAN, tty=tty)}]" |
| 888 | if upstream else "" |
| 889 | ) |
| 890 | print(f"{marker}{name_str} {short_str}{up_str} {msg}") |
| 891 | else: |
| 892 | print(f"{marker}{name_str} {short_str} {msg}") |
| 893 | else: |
| 894 | print(f"{marker}{name_str}") |
File History
4 commits
sha256:e237dc0e8122609f5131d11c9dda9bba480395a5a4355cda0c9fa7e634fddd29
fix(branch): guard -d --dry-run against destructive writes;…
Sonnet 4.6
patch
2 days ago
sha256:42d0a10d093980afe543a88e9ed75c5ad0ac339026e419ee3b07b8a57c73ed5b
fix(branch): dry-run flag ignored on branch delete (closes #25)
Sonnet 4.6
patch
2 days ago
sha256:8860dea10c653956b613a814cc752a6d34cb3986cdf16749a49172affdabf045
fix tests
Human
minor
⚠
9 days ago
sha256:99f8eb388d9a9c353e68b9a4e5bebe1b4240a8f511e6f0928e58c0e95153e103
feat: branch --prune-config, fix hub repo delete docstrings…
Sonnet 4.6
minor
⚠
10 days ago