gabriel / muse public
branch.py python
894 lines 37.4 KB
Raw
sha256:e237dc0e8122609f5131d11c9dda9bba480395a5a4355cda0c9fa7e634fddd29 fix(branch): guard -d --dry-run against destructive writes;… Sonnet 4.6 patch 2 days ago
1 """``muse branch`` — list, create, rename, copy, and delete branches.
2
3 Git-idiomatic flags::
4
5 muse branch # list all local branches
6 muse branch <name> # create branch at HEAD
7 muse branch <name> <start-point> # create at commit SHA, SHA prefix, or branch
8 muse branch -d <name> # safe delete (must be merged)
9 muse branch -D <name> # force delete
10 muse branch -dr <remote>/<branch> # delete local remote-tracking ref (no server call)
11 muse branch -Dr <remote>/<branch> # same, force (no merge check)
12 muse branch -m [<old>] <new> # rename (safe)
13 muse branch -M [<old>] <new> # rename (force)
14 muse branch -c [<src>] <dest> # copy (safe)
15 muse branch -C [<src>] <dest> # copy (force)
16 muse branch -v # list with last commit SHA + subject
17 muse branch -vv # also show upstream tracking ref
18 muse branch -r # list remote-tracking branches
19 muse branch -a # list local + remote-tracking branches
20 muse branch --merged [<commit>] # only branches merged into commit
21 muse branch --no-merged [<commit>] # only branches NOT merged into commit
22 muse branch --contains <commit> # only branches that contain commit
23 muse branch --sort name # sort by name (default)
24 muse branch --sort committeddate # sort by date of most recent commit
25
26 To delete a branch on the remote **and** prune the local tracking ref in one
27 step, use ``muse push``::
28
29 muse push <remote> --delete <branch>
30
31 Agents should pass ``--format json`` (or ``--json``) for machine-readable
32 output on all operations. The listing JSON schema is::
33
    [
      {
        "name": "feat/my-thing",
        "current": false,
        "commit_id": "<sha256> | null",
        "committed_at": "2026-03-21T12:00:00+00:00 | null",
        "last_message": "Add feature X | null",
        "upstream": "origin/feat/my-thing | null",
        "intent": "<text> | null",
        "resumable": false,
        "created_by": "<agent id> | null"
      },
      ...
    ]
45
46 Exit codes::
47
48 0 — success
49 1 — invalid branch name, branch not found, attempting to delete checked-out branch
50 """
51
52 import argparse
53 import json
54 import logging
55 import pathlib
56 import sys
57 import tomllib
58 from typing import TypedDict
59
60 from muse.cli.config import delete_branch_meta, get_protected_branches, get_remote_head, is_branch_protected, read_branch_meta, write_branch_meta
61 from muse.core.reflog import append_reflog
62 from muse.core.types import MsgpackDict, short_id
63 from muse.core.paths import ref_path as _ref_path, heads_dir as _heads_dir, remotes_dir as _remotes_dir, config_toml_path as _config_toml_path, reflog_branch_path as _reflog_branch_path, reflog_heads_dir as _reflog_heads_dir
64 from muse.core.envelope import EnvelopeJson, make_envelope
65 from muse.core.timing import start_timer
66 from muse.core.errors import ExitCode
67 from muse.core.repo import require_repo
68 from muse.core.refs import read_ref
69 from muse.core.io import write_text_atomic
70 from muse.core.refs import (
71 get_head_commit_id,
72 read_current_branch,
73 write_branch_ref,
74 write_head_branch,
75 )
76 from muse.core.commits import (
77 read_commit,
78 resolve_commit_ref,
79 )
80 from muse.core.validation import clamp_int, sanitize_display, validate_branch_name
81
82 type _Payload = dict[str, str | None]
83 logger = logging.getLogger(__name__)
84
class _BranchCreateJson(EnvelopeJson):
    """JSON output for ``muse branch <name> --json`` (branch creation).

    ``run`` merges this payload with the timing envelope from
    ``make_envelope`` and an extra ``"from"`` key holding the start point.
    """

    # What was done; the create path sets this to "created".
    action: str
    # Name of the branch that was created.
    branch: str
    # Commit ID the new branch points at, or None when it has no commit yet.
    commit_id: str | None
    # Value of the --intent flag, or None when it was not given.
    intent: str | None
    # True when --resumable was passed on the command line.
    resumable: bool
93
class _BranchEntryJson(TypedDict):
    """One element of the JSON array printed by ``muse branch --json`` in list mode."""

    # Display name; remote-tracking entries carry a "remotes/" prefix.
    name: str
    # True when this entry is the currently checked-out branch.
    current: bool
    # Commit ID at the tip, or None when the ref has no commit.
    commit_id: str | None
    # ISO-8601 timestamp of the tip commit, or None when there is no commit record.
    committed_at: str | None
    # First line of the tip commit message (truncated to 72 chars), or None.
    last_message: str | None
    # Upstream tracking ref as "<remote>/<branch>", or None when unset.
    upstream: str | None
    # Branch intent annotation read from branch metadata, or None.
    intent: str | None
    # True when the branch metadata marks it as resumable.
    resumable: bool
    # agent_id of the tip commit record, or None when absent.
    created_by: str | None
104
class _BranchListJson(EnvelopeJson):
    """JSON output for ``muse branch --json``.

    NOTE(review): the list path in ``run`` prints a bare array of
    ``_BranchEntryJson`` objects rather than this envelope — confirm whether
    this type is still used elsewhere.
    """

    # The listed branches, one entry per branch.
    branches: list[_BranchEntryJson]
109
110
class _PruneConfigJson(TypedDict):
    """JSON output for ``muse branch --prune-config``."""

    # Always "prune_config".
    action: str
    # Number of stale [branch.*] entries found (and removed unless dry_run).
    pruned: int
    # Number of entries whose branch ref still exists.
    kept: int
    # True when --dry-run was given and nothing was written.
    dry_run: bool
    # Names of the stale entries, sorted by name.
    pruned_branches: list[str]
119
120 # ---------------------------------------------------------------------------
121 # ANSI helpers — emitted only when stdout is a TTY.
122 # ---------------------------------------------------------------------------
123
# SGR escape sequences; "\x1b" is the ESC control character.
_RESET = "\x1b[0m"    # clear all attributes
_BOLD = "\x1b[1m"     # bold / increased intensity
_DIM = "\x1b[2m"      # faint / decreased intensity
_GREEN = "\x1b[32m"   # green foreground
_RED = "\x1b[31m"     # red foreground
_YELLOW = "\x1b[33m"  # yellow foreground
_CYAN = "\x1b[36m"    # cyan foreground
131
def _c(text: str, *codes: str, tty: bool) -> str:
    """Colorize *text* for terminal output.

    When *tty* is true, the given ANSI *codes* are concatenated in front of
    *text* and a reset sequence is appended.  Otherwise *text* is returned
    untouched so piped or redirected output stays free of escape codes.
    """
    return f"{''.join(codes)}{text}{_RESET}" if tty else text
137
138 # ---------------------------------------------------------------------------
139 # Internal helpers
140 # ---------------------------------------------------------------------------
141
def _ref_file(root: pathlib.Path, branch: str) -> pathlib.Path:
    """Locate the on-disk ref file of local branch *branch* in the repo at *root*.

    Thin alias over ``muse.core.paths.ref_path``; the path is returned
    whether or not the file exists.
    """
    branch_ref = _ref_path(root, branch)
    return branch_ref
145
def _list_local_branches(root: pathlib.Path) -> list[str]:
    """Return a sorted list of all local branch names.

    The heads directory is walked recursively, so nested names such as
    ``feat/my-thing`` are reported with forward slashes.  Only plain files
    count as branches: directories, symlinks, and hidden files (names
    starting with ``"."``) are silently skipped.

    Returns an empty list when the heads directory does not exist.
    """
    heads_dir = _heads_dir(root)
    if not heads_dir.exists():
        return []
    return sorted(
        p.relative_to(heads_dir).as_posix()
        for p in heads_dir.rglob("*")
        # is_file() follows symlinks, so symlinks must be excluded explicitly
        # to honour the "plain files only" contract.
        if not p.name.startswith(".") and not p.is_symlink() and p.is_file()
    )
160
def _list_remotes(root: pathlib.Path) -> list[str]:
    """Return sorted remote-tracking branch names as ``remote/branch``.

    Each immediate sub-directory of the remotes directory is treated as one
    remote and walked recursively.  Only plain files are reported; symlinks,
    hidden files (names starting with ``"."``), and directories are skipped
    to avoid leaking internal artefacts into the listing.

    Returns an empty list when the remotes directory does not exist.
    """
    remotes_dir = _remotes_dir(root)
    if not remotes_dir.exists():
        return []
    results: list[str] = []
    for remote_dir in sorted(remotes_dir.iterdir()):
        if not remote_dir.is_dir():
            continue
        remote = remote_dir.name
        for ref_file in sorted(remote_dir.rglob("*")):
            if ref_file.name.startswith("."):
                continue
            # is_file() follows symlinks, so reject them first to keep the
            # documented "plain files only" behaviour.
            if ref_file.is_symlink() or not ref_file.is_file():
                continue
            branch_rel = ref_file.relative_to(remote_dir).as_posix()
            results.append(f"{remote}/{branch_rel}")
    return results
180
def _resolve_commit_id(root: pathlib.Path, b: str) -> str:
    """Look up the tip commit ID for the listing entry *b*.

    *b* is a display name.  A name of the form ``remotes/<remote>/<branch>``
    is looked up through ``get_remote_head``; anything else (including a
    ``remotes/`` name with no branch part) is looked up as a local head.
    An empty string is returned when no commit ID is available.
    """
    remote_prefix = "remotes/"
    if b.startswith(remote_prefix):
        remote, _sep, branch_name = b[len(remote_prefix):].partition("/")
        if branch_name:
            remote_tip = get_remote_head(remote, branch_name, root)
            return remote_tip if remote_tip else ""
    local_tip = get_head_commit_id(root, b)
    return local_tip if local_tip else ""
194
def _upstream_for(root: pathlib.Path, branch: str) -> str | None:
    """Return the upstream tracking ref for *branch*, or ``None`` if unset.

    Reads the ``[branch.<branch>]`` table of the repository config and, when
    both ``remote`` and ``merge`` are non-empty strings, returns
    ``"<remote>/<branch>"`` with any ``refs/heads/`` prefix stripped from
    ``merge``.

    This is a best-effort lookup used for display: a missing, unreadable, or
    malformed config yields ``None`` rather than an exception.  Read and
    parse failures are logged at debug level so they are not lost silently.
    """
    config_path = _config_toml_path(root)
    try:
        with config_path.open("rb") as f:
            config = tomllib.load(f)
    except FileNotFoundError:
        # No config file simply means no upstream is configured.
        return None
    except (OSError, ValueError) as exc:
        # ValueError covers tomllib.TOMLDecodeError and invalid UTF-8.
        logger.debug("Could not read upstream from %s: %s", config_path, exc)
        return None

    branches = config.get("branch")
    section = branches.get(branch) if isinstance(branches, dict) else None
    if not isinstance(section, dict):
        return None
    remote = section.get("remote")
    merge_ref = section.get("merge")
    if isinstance(remote, str) and isinstance(merge_ref, str) and remote and merge_ref:
        short = merge_ref.removeprefix("refs/heads/")
        return f"{remote}/{short}"
    return None
212
def _commit_ancestors(root: pathlib.Path, commit_id: str) -> set[str]:
    """Collect every commit ID reachable from *commit_id*, itself included.

    Delegates to ``muse.core.graph.ancestor_ids``; the import is kept local
    to this function, as in the rest of the module's graph usage.
    """
    from muse.core.graph import ancestor_ids as _ancestor_ids

    reachable = _ancestor_ids(root, commit_id)
    return reachable
217
def _is_merged(root: pathlib.Path, branch: str, into: str) -> bool:
    """Tell whether *branch* is fully merged into *into*.

    The answer is ``True`` when the tip commit of *branch* is reachable from
    the tip of *into*.  If either branch has no tip commit the result is
    ``False``.
    """
    candidate = get_head_commit_id(root, branch)
    target = get_head_commit_id(root, into)
    if candidate is not None and target is not None:
        return candidate in _commit_ancestors(root, target)
    return False
225
def _contains_commit(root: pathlib.Path, branch: str, commit_id: str) -> bool:
    """Tell whether *commit_id* is reachable from the tip of *branch*.

    A branch without a tip commit contains nothing, so the result is
    ``False`` in that case.
    """
    head = get_head_commit_id(root, branch)
    return head is not None and commit_id in _commit_ancestors(root, head)
232
def _cleanup_empty_dirs(ref_file: pathlib.Path, heads_dir: pathlib.Path) -> None:
    """Prune empty directories left above *ref_file* after it was removed.

    Walks upward from the file's parent directory, removing each directory
    in turn.  The walk ends on reaching *heads_dir* (which is never removed)
    or at the first directory that cannot be removed, e.g. because it still
    has entries.
    """
    for ancestor in ref_file.parents:
        if ancestor == heads_dir:
            return
        try:
            ancestor.rmdir()
        except OSError:
            # Not empty (or otherwise not removable): nothing above it can be
            # empty either, so stop here.
            return
242
def _resolve_start_point(root: pathlib.Path, current: str, start_point: str) -> str:
    """Turn *start_point* into a commit ID.

    Resolution order:

    1. *start_point* as a local branch name — its tip commit is returned.
    2. *start_point* as a commit reference (``resolve_commit_ref`` handles
       bare hex and ``sha256:``-prefixed IDs, including prefixes).
    3. Otherwise the raw *start_point* string is handed back unchanged; the
       caller is expected to surface the invalid ID.
    """
    try:
        tip_of_branch = get_head_commit_id(root, start_point)
    except ValueError:
        # Strings with characters that are illegal in branch names (such as
        # the ':' of a sha256:-prefixed ID) cannot be branches.
        tip_of_branch = None
    if tip_of_branch is not None:
        return tip_of_branch

    record = resolve_commit_ref(root, current, start_point)
    return start_point if record is None else record.commit_id
266
267 # ---------------------------------------------------------------------------
268 # CLI registration
269 # ---------------------------------------------------------------------------
270
def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
    """Register the ``muse branch`` subcommand and all its flags.

    Adds a ``branch`` parser to *subparsers*, using the module docstring as
    its description, and binds ``run`` as the handler via ``func``.

    The operation flags (``-d``, ``-D``, ``-m``, ``-M``, ``-c``, ``-C``) are
    mutually exclusive and all store a constant into ``op``; ``op`` stays
    ``None`` for create and list.  Positional arguments are collected
    unparsed in ``args`` because their meaning depends on the operation.
    """
    parser = subparsers.add_parser(
        "branch",
        help="List, create, rename, copy, or delete branches.",
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("args", nargs="*", help="Branch name(s) — context-sensitive.")

    # Mutually exclusive operation flags (mirrors git branch).
    ops = parser.add_mutually_exclusive_group()
    ops.add_argument(
        "-d", "--delete", dest="op", action="store_const", const="delete",
        help="Delete a branch (safe — must be fully merged).",
    )
    ops.add_argument(
        "-D", dest="op", action="store_const", const="force_delete",
        help="Force-delete a branch regardless of merge status.",
    )
    ops.add_argument(
        "-m", "--move", dest="op", action="store_const", const="rename",
        help="Rename a branch (safe).",
    )
    ops.add_argument(
        "-M", dest="op", action="store_const", const="force_rename",
        help="Force-rename a branch.",
    )
    ops.add_argument(
        "-c", "--copy", dest="op", action="store_const", const="copy",
        help="Copy a branch (safe).",
    )
    ops.add_argument(
        "-C", dest="op", action="store_const", const="force_copy",
        help="Force-copy a branch.",
    )

    # Listing modifiers.
    parser.add_argument(
        "-v", action="count", default=0, dest="verbose",
        help="Show last commit SHA + subject. Repeat (-vv) to also show upstream.",
    )
    parser.add_argument(
        "-r", "--remotes", action="store_true",
        help="List remote-tracking branches.",
    )
    parser.add_argument(
        "-a", "--all", action="store_true", dest="all_branches",
        help="List both local and remote-tracking branches.",
    )
    # nargs="?" with const="HEAD": a bare --merged / --no-merged means HEAD.
    parser.add_argument(
        "--merged", metavar="COMMIT", nargs="?", const="HEAD",
        help="Only list branches merged into COMMIT (default HEAD).",
    )
    parser.add_argument(
        "--no-merged", metavar="COMMIT", nargs="?", const="HEAD",
        help="Only list branches NOT merged into COMMIT (default HEAD).",
    )
    parser.add_argument(
        "--contains", metavar="COMMIT",
        help="Only list branches that contain COMMIT.",
    )
    parser.add_argument(
        "--sort", default="name", metavar="KEY",
        choices=["name", "committeddate"],
        help="Sort branches by 'name' (default) or 'committeddate'.",
    )
    parser.add_argument(
        "--intent", default=None, metavar="TEXT",
        help="Short description of what this branch is for (stored in config).",
    )
    parser.add_argument(
        "--resumable", action="store_true", default=False,
        help=(
            "On create: mark this branch as a resumable agent checkpoint. "
            "On list (no name): filter to resumable branches only."
        ),
    )
    parser.add_argument(
        "--prune-config", action="store_true", dest="prune_config",
        help=(
            "Remove stale [branch.*] entries from .muse/config.toml — "
            "entries for branches whose ref no longer exists. "
            "Use --dry-run to preview without writing."
        ),
    )
    # The handler honours --dry-run for branch deletion as well as for
    # --prune-config, so the help text names both.
    parser.add_argument(
        "--dry-run", action="store_true", dest="dry_run",
        help="With --prune-config or -d/-D: report what would be removed without writing.",
    )
    parser.add_argument(
        "--json", "-j", action="store_true", dest="json_out",
        help="Emit machine-readable JSON.",
    )
    parser.set_defaults(func=run, op=None, prune_config=False, dry_run=False)
366
367 # ---------------------------------------------------------------------------
368 # Command handler
369 # ---------------------------------------------------------------------------
370
def run(args: argparse.Namespace) -> None:
    """List, create, rename, copy, or delete branches.

    Dispatches on the parsed flags in this order: ``--prune-config``, delete
    (``-d``/``-D``, optionally with ``-r``), rename (``-m``/``-M``), copy
    (``-c``/``-C``), create (a positional name with no operation flag), and
    finally list.  Each path prints its result and returns.

    With ``--json`` the listing is a JSON array; mutation ops (create,
    rename, copy, delete) emit one result object per affected branch with an
    ``"action"`` key.  On a user error a JSON object with an ``"error"`` key
    is printed to stdout (``--json`` only) and/or a human-readable message to
    stderr, then ``SystemExit(ExitCode.USER_ERROR)`` is raised.

    Agent quickstart
    ----------------
    ::

        muse branch --json                    # list all branches
        muse branch --json --resumable        # list resumable branches only
        muse branch feat/thing --json         # create branch
        muse branch -d feat/thing --json      # delete branch

    JSON fields (list mode — top-level is a bare array)
    ----------------------------------------------------
    name          Branch name (``remotes/<remote>/<branch>`` for remote entries).
    current       ``true`` for the currently checked-out local branch.
    commit_id     Commit ID at the tip; ``null`` if the branch has no commit.
    committed_at  ISO-8601 timestamp of the tip commit; ``null`` if none.
    last_message  First line of the tip commit message; ``null`` if none.
    upstream      Upstream tracking ref; ``null`` if none.
    intent        Branch intent annotation (``--intent`` flag).
    resumable     ``true`` if the branch is marked resumable.
    created_by    ``agent_id`` of the tip commit; ``null`` if none.

    JSON fields (mutation mode)
    ---------------------------
    action      What was done: ``"created"``, ``"updated"``, ``"deleted"``,
                ``"deleted_remote_tracking"``, ``"renamed"``, ``"copied"``,
                ``"prune_config"``.
    branch      Branch acted upon (create, update, delete).
    from / to   Source and destination (rename, copy).
    dry_run     ``true`` when ``--dry-run`` suppressed the write (delete,
                prune-config).

    Exit codes
    ----------
    0   Success.
    1   Invalid arguments, branch not found, or operation conflicts.
    2   Not inside a Muse repository.
    """
    elapsed = start_timer()
    positional: list[str] = args.args
    op: str | None = args.op
    verbose: int = clamp_int(args.verbose, 0, 4, 'verbose')
    remotes_only: bool = args.remotes
    all_branches: bool = args.all_branches
    merged_into: str | None = args.merged
    not_merged_into: str | None = args.no_merged
    contains_commit: str | None = args.contains
    sort_key: str = args.sort
    intent: str | None = args.intent
    resumable: bool = args.resumable
    json_out: bool = args.json_out
    tty: bool = sys.stdout.isatty()

    root = require_repo()
    current = read_current_branch(root)
    heads_dir = _heads_dir(root)

    # ------------------------------------------------------------------
    # PRUNE-CONFIG — remove stale [branch.*] entries from config.toml
    # ------------------------------------------------------------------
    if args.prune_config:
        config_path = _config_toml_path(root)
        config: MsgpackDict = {}
        if config_path.exists():
            # Binary mode lets tomllib do the (always UTF-8) decoding itself
            # instead of relying on the locale's default encoding.
            with config_path.open("rb") as config_file:
                config = tomllib.load(config_file)
        branch_sections: MsgpackDict = dict(config.get("branch") or {})
        pruned: list[str] = []
        kept: list[str] = []
        for bname in sorted(branch_sections):
            ref_file = heads_dir / bname
            if ref_file.exists():
                kept.append(bname)
            else:
                pruned.append(bname)
                if not args.dry_run:
                    delete_branch_meta(root, bname)
        prune_result: _PruneConfigJson = {
            "action": "prune_config",
            "pruned": len(pruned),
            "kept": len(kept),
            "dry_run": args.dry_run,
            "pruned_branches": pruned,
        }
        if json_out:
            print(json.dumps(prune_result))
        else:
            prefix = "[dry-run] " if args.dry_run else ""
            print(f"{prefix}Pruned {len(pruned)} stale config entries, kept {len(kept)} live entries.")
            for b in pruned:
                print(f"  - {b}")
        return

    # ------------------------------------------------------------------
    # DELETE / FORCE-DELETE
    # Supports two modes:
    #   muse branch -d|-D <local-branch>          — delete a local branch
    #   muse branch -d|-D -r <remote>/<branch>    — prune a remote-tracking ref
    # ------------------------------------------------------------------
    if op in ("delete", "force_delete"):
        if not positional:
            if json_out:
                print(json.dumps({"error": "usage", "message": "muse branch -d|-D [-r] <branch> …"}))
            print("❌ Usage: muse branch -d|-D [-r] <branch> …", file=sys.stderr)
            raise SystemExit(ExitCode.USER_ERROR)

        # -r flag: delete local remote-tracking refs (no server call).
        if remotes_only:
            from muse.cli.config import delete_remote_head
            for spec in positional:
                # Accept both "remote/branch" and "remotes/remote/branch" spellings.
                clean = spec.removeprefix("remotes/")
                slash = clean.find("/")
                if slash == -1:
                    if json_out:
                        print(json.dumps({"error": "invalid_ref", "ref": spec, "message": "remote-tracking ref must be '<remote>/<branch>'"}))
                    print(
                        f"❌ Remote-tracking ref must be '<remote>/<branch>', got "
                        f"'{sanitize_display(spec)}'.",
                        file=sys.stderr,
                    )
                    raise SystemExit(ExitCode.USER_ERROR)
                remote_name = clean[:slash]
                branch_name = clean[slash + 1:]
                if args.dry_run:
                    # Dry run must not touch the ref: only probe for it.
                    removed = bool(get_remote_head(remote_name, branch_name, root))
                else:
                    removed = delete_remote_head(remote_name, branch_name, root)
                if not removed:
                    if json_out:
                        print(json.dumps({"error": "not_found", "ref": clean, "message": f"remote-tracking ref '{clean}' not found"}))
                    print(
                        f"❌ Remote-tracking ref '{sanitize_display(clean)}' not found.",
                        file=sys.stderr,
                    )
                    raise SystemExit(ExitCode.USER_ERROR)
                if json_out:
                    print(json.dumps({
                        "action": "deleted_remote_tracking",
                        "remote": remote_name,
                        "branch": branch_name,
                        "dry_run": args.dry_run,
                    }))
                else:
                    prefix = "[dry-run] " if args.dry_run else ""
                    print(
                        f"{prefix}Deleted remote-tracking ref "
                        f"{_c(sanitize_display(clean), _RED, tty=tty)}."
                    )
            return

        force = op == "force_delete"
        for branch_name in positional:
            try:
                validate_branch_name(branch_name)
            except ValueError as exc:
                if json_out:
                    print(json.dumps({"error": "invalid_branch_name", "branch": branch_name, "message": str(exc)}))
                print(f"❌ Invalid branch name: {sanitize_display(str(exc))}", file=sys.stderr)
                raise SystemExit(ExitCode.USER_ERROR)
            if branch_name == current:
                if json_out:
                    print(json.dumps({"error": "current_branch", "branch": branch_name, "message": f"cannot delete the currently checked-out branch '{branch_name}'"}))
                print(
                    f"❌ Cannot delete the currently checked-out branch "
                    f"'{sanitize_display(branch_name)}'.",
                    file=sys.stderr,
                )
                raise SystemExit(ExitCode.USER_ERROR)
            rf = _ref_file(root, branch_name)
            if not rf.is_file():
                if json_out:
                    print(json.dumps({"error": "not_found", "branch": branch_name, "message": f"branch '{branch_name}' not found"}))
                print(f"❌ Branch '{sanitize_display(branch_name)}' not found.", file=sys.stderr)
                raise SystemExit(ExitCode.USER_ERROR)
            # Safe delete (-d) requires the branch to be merged into the
            # currently checked-out branch.
            if not force and not _is_merged(root, branch_name, current):
                if json_out:
                    print(json.dumps({"error": "not_merged", "branch": branch_name, "message": f"branch '{branch_name}' is not fully merged", "hint": "use -D to force-delete"}))
                print(
                    f"❌ Branch '{sanitize_display(branch_name)}' is not fully merged.\n"
                    f"   Use -D to force-delete.",
                    file=sys.stderr,
                )
                raise SystemExit(ExitCode.USER_ERROR)
            # Protection applies even to force-delete.
            protected = get_protected_branches(root)
            if is_branch_protected(branch_name, protected):
                if json_out:
                    print(json.dumps({"error": "protected", "branch": branch_name, "message": f"branch '{branch_name}' is protected and cannot be deleted"}))
                else:
                    print(
                        f"❌ Branch '{sanitize_display(branch_name)}' is protected and cannot be deleted.",
                        file=sys.stderr,
                    )
                raise SystemExit(ExitCode.USER_ERROR)
            # Capture the tip before unlinking so it can be reported.
            tip = read_ref(rf) or ""
            if not args.dry_run:
                rf.unlink()
                _cleanup_empty_dirs(rf, heads_dir)
                reflog_file = _reflog_branch_path(root, branch_name)
                reflog_file.unlink(missing_ok=True)
                _cleanup_empty_dirs(reflog_file, _reflog_heads_dir(root))
                delete_branch_meta(root, branch_name)
            if json_out:
                print(json.dumps({"action": "deleted", "branch": branch_name, "was": tip, "dry_run": args.dry_run}))
            else:
                prefix = "[dry-run] " if args.dry_run else ""
                print(
                    f"{prefix}Deleted branch {_c(sanitize_display(branch_name), _RED, tty=tty)} "
                    f"({_c('was ' + (tip or 'unknown'), _DIM, tty=tty)})."
                )
        return

    # ------------------------------------------------------------------
    # RENAME / FORCE-RENAME
    # ------------------------------------------------------------------
    if op in ("rename", "force_rename"):
        force = op == "force_rename"
        # One positional renames the current branch; two rename <old> to <new>.
        if len(positional) == 1:
            old_name, new_name = current, positional[0]
        elif len(positional) == 2:
            old_name, new_name = positional[0], positional[1]
        else:
            if json_out:
                print(json.dumps({"error": "usage", "message": "muse branch -m|-M [<old>] <new>"}))
            print("❌ Usage: muse branch -m|-M [<old>] <new>", file=sys.stderr)
            raise SystemExit(ExitCode.USER_ERROR)
        for n in (old_name, new_name):
            try:
                validate_branch_name(n)
            except ValueError as exc:
                if json_out:
                    print(json.dumps({"error": "invalid_branch_name", "branch": n, "message": str(exc)}))
                print(f"❌ Invalid branch name: {sanitize_display(str(exc))}", file=sys.stderr)
                raise SystemExit(ExitCode.USER_ERROR)
        src = _ref_file(root, old_name)
        dst = _ref_file(root, new_name)
        if not src.is_file():
            if json_out:
                print(json.dumps({"error": "not_found", "branch": old_name, "message": f"branch '{old_name}' not found"}))
            print(f"❌ Branch '{sanitize_display(old_name)}' not found.", file=sys.stderr)
            raise SystemExit(ExitCode.USER_ERROR)
        if dst.is_file() and not force:
            if json_out:
                print(json.dumps({"error": "already_exists", "branch": new_name, "message": f"branch '{new_name}' already exists", "hint": "use -M to force"}))
            print(
                f"❌ Branch '{sanitize_display(new_name)}' already exists. Use -M to force.",
                file=sys.stderr,
            )
            raise SystemExit(ExitCode.USER_ERROR)
        # Force-renaming a branch onto itself must be a no-op: writing the
        # destination and then unlinking the source would delete the branch,
        # because both are the same file.
        if src != dst:
            tip = read_ref(src) or ""
            if tip:
                write_branch_ref(root, new_name, tip)
            else:
                write_text_atomic(dst, "")
            src.unlink()
            _cleanup_empty_dirs(src, heads_dir)
            if old_name == current:
                # Keep HEAD attached to the renamed branch.
                write_head_branch(root, new_name)
        if json_out:
            print(json.dumps({"action": "renamed", "from": old_name, "to": new_name}))
        else:
            print(
                f"Renamed branch "
                f"{_c(sanitize_display(old_name), _YELLOW, tty=tty)} → "
                f"{_c(sanitize_display(new_name), _GREEN, tty=tty)}."
            )
        return

    # ------------------------------------------------------------------
    # COPY / FORCE-COPY
    # ------------------------------------------------------------------
    if op in ("copy", "force_copy"):
        force = op == "force_copy"
        # One positional copies the current branch; two copy <src> to <dest>.
        if len(positional) == 1:
            src_name, dst_name = current, positional[0]
        elif len(positional) == 2:
            src_name, dst_name = positional[0], positional[1]
        else:
            if json_out:
                print(json.dumps({"error": "usage", "message": "muse branch -c|-C [<src>] <dest>"}))
            print("❌ Usage: muse branch -c|-C [<src>] <dest>", file=sys.stderr)
            raise SystemExit(ExitCode.USER_ERROR)
        for n in (src_name, dst_name):
            try:
                validate_branch_name(n)
            except ValueError as exc:
                if json_out:
                    print(json.dumps({"error": "invalid_branch_name", "branch": n, "message": str(exc)}))
                print(f"❌ Invalid branch name: {sanitize_display(str(exc))}", file=sys.stderr)
                raise SystemExit(ExitCode.USER_ERROR)
        src = _ref_file(root, src_name)
        dst = _ref_file(root, dst_name)
        if not src.is_file():
            if json_out:
                print(json.dumps({"error": "not_found", "branch": src_name, "message": f"branch '{src_name}' not found"}))
            print(f"❌ Branch '{sanitize_display(src_name)}' not found.", file=sys.stderr)
            raise SystemExit(ExitCode.USER_ERROR)
        if dst.is_file() and not force:
            if json_out:
                print(json.dumps({"error": "already_exists", "branch": dst_name, "message": f"branch '{dst_name}' already exists", "hint": "use -C to force"}))
            print(
                f"❌ Branch '{sanitize_display(dst_name)}' already exists. Use -C to force.",
                file=sys.stderr,
            )
            raise SystemExit(ExitCode.USER_ERROR)
        tip = read_ref(src) or ""
        if tip:
            write_branch_ref(root, dst_name, tip)
        else:
            write_text_atomic(dst, "")
        if json_out:
            print(json.dumps({"action": "copied", "from": src_name, "to": dst_name}))
        else:
            print(
                f"Copied branch "
                f"{_c(sanitize_display(src_name), _YELLOW, tty=tty)} → "
                f"{_c(sanitize_display(dst_name), _GREEN, tty=tty)}."
            )
        return

    # ------------------------------------------------------------------
    # CREATE
    # ------------------------------------------------------------------
    if op is None and positional:
        new_name = positional[0]
        start_point: str | None = positional[1] if len(positional) > 1 else None
        try:
            validate_branch_name(new_name)
        except ValueError as exc:
            if json_out:
                print(json.dumps({"error": "invalid_branch_name", "branch": new_name, "message": str(exc)}))
            print(f"❌ Invalid branch name: {sanitize_display(str(exc))}", file=sys.stderr)
            raise SystemExit(ExitCode.USER_ERROR)
        rf = _ref_file(root, new_name)
        if rf.is_file():
            # Branch exists.  If --intent or --resumable given (no start_point),
            # treat as a metadata update rather than a failed create.
            if (intent is not None or resumable) and start_point is None:
                write_branch_meta(
                    root,
                    new_name,
                    intent=intent,
                    resumable=resumable if resumable else None,
                )
                meta = read_branch_meta(root, new_name)
                if json_out:
                    print(json.dumps({
                        "action": "updated",
                        "branch": new_name,
                        "intent": meta.get("intent"),
                        "resumable": bool(meta.get("resumable", False)),
                    }))
                else:
                    parts: list[str] = []
                    if intent is not None:
                        parts.append(f"intent={sanitize_display(intent)!r}")
                    if resumable:
                        parts.append("resumable=true")
                    print(
                        f"Updated branch {_c(sanitize_display(new_name), _YELLOW, tty=tty)}"
                        f"{' (' + ', '.join(parts) + ')' if parts else ''}."
                    )
                return
            if json_out:
                print(json.dumps({"error": "already_exists", "branch": new_name, "message": f"branch '{new_name}' already exists"}))
            print(f"❌ Branch '{sanitize_display(new_name)}' already exists.", file=sys.stderr)
            raise SystemExit(ExitCode.USER_ERROR)

        if start_point is not None:
            # Resolve branch names, full SHAs, and abbreviated SHA prefixes.
            sp_tip: str = _resolve_start_point(root, current, start_point)
        else:
            sp_tip = get_head_commit_id(root, current) or ""

        if sp_tip:
            write_branch_ref(root, new_name, sp_tip)
            source_label = start_point or current or "HEAD"
            append_reflog(root, new_name, None, sp_tip, "", f"branch: Created from {source_label}")
        else:
            # No commit to point at yet: create an empty ref file.
            write_text_atomic(rf, "")

        # Persist intent / resumable if supplied.
        if intent is not None or resumable:
            write_branch_meta(
                root,
                new_name,
                intent=intent,
                resumable=resumable if resumable else None,
            )

        if json_out:
            print(json.dumps({
                **make_envelope(elapsed),
                **_BranchCreateJson(
                    action="created",
                    branch=new_name,
                    commit_id=sp_tip or None,
                    intent=intent,
                    resumable=resumable,
                ),
                "from": start_point,
            }))
        else:
            print(f"Created branch {_c(sanitize_display(new_name), _GREEN, tty=tty)}.")
        return

    # ------------------------------------------------------------------
    # LIST
    # ------------------------------------------------------------------
    local_branches = _list_local_branches(root)
    if remotes_only:
        display_branches = [f"remotes/{b}" for b in _list_remotes(root)]
    elif all_branches:
        display_branches = local_branches + [f"remotes/{b}" for b in _list_remotes(root)]
    else:
        display_branches = list(local_branches)

    # --resumable filter: only show branches marked resumable in config.
    if resumable and not positional:
        filtered_resumable: list[str] = []
        for b in display_branches:
            local_b = b.removeprefix("remotes/")
            meta = read_branch_meta(root, local_b)
            if meta.get("resumable") is True:
                filtered_resumable.append(b)
        display_branches = filtered_resumable

    # --merged / --no-merged / --contains filters
    if merged_into or not_merged_into or contains_commit:

        def _resolve_commitish(ref: str) -> str | None:
            """Resolve ``HEAD``, a branch name, or a commit ID/prefix to a commit ID.

            Returns ``None`` when nothing matches.  Branch names win over
            commit IDs, mirroring ``_resolve_start_point``.
            """
            if ref == "HEAD":
                return get_head_commit_id(root, current)
            try:
                branch_tip = get_head_commit_id(root, ref)
            except ValueError:
                # Not a legal branch name (e.g. a sha256:-prefixed ID).
                branch_tip = None
            if branch_tip is not None:
                return branch_tip
            commit_rec = resolve_commit_ref(root, current, ref)
            return commit_rec.commit_id if commit_rec is not None else None

        # Pre-compute ancestor sets once — not once per branch.
        # _commit_ancestors walks the full commit DAG; recomputing it for every
        # branch being checked is O(branches × commits) instead of O(commits).
        _merged_ancestors: set[str] | None = None
        if merged_into:
            _into_tip = _resolve_commitish(merged_into)
            _merged_ancestors = _commit_ancestors(root, _into_tip) if _into_tip else set()

        _not_merged_ancestors: set[str] | None = None
        if not_merged_into:
            _into_tip = _resolve_commitish(not_merged_into)
            _not_merged_ancestors = _commit_ancestors(root, _into_tip) if _into_tip else set()

        # Fall back to the raw argument when it cannot be resolved, so an
        # exact commit ID still compares as before.
        _contains_target: str | None = None
        if contains_commit:
            _contains_target = _resolve_commitish(contains_commit) or contains_commit

        def _passes(b: str) -> bool:
            # _resolve_commit_id understands both local names and
            # "remotes/<remote>/<branch>" entries; "" means "no tip commit".
            tip = _resolve_commit_id(root, b)
            if _merged_ancestors is not None:
                if not tip or tip not in _merged_ancestors:
                    return False
            if _not_merged_ancestors is not None:
                if tip and tip in _not_merged_ancestors:
                    return False
            if _contains_target is not None:
                if not tip or _contains_target not in _commit_ancestors(root, tip):
                    return False
            return True

        display_branches = [b for b in display_branches if _passes(b)]

    # --sort: sort by committed date if requested.
    # Name sort is the default (already applied by _list_local_branches).
    if sort_key == "committeddate":
        def _committed_ts(b: str) -> str:
            cid = _resolve_commit_id(root, b)
            if not cid:
                return ""
            rec = read_commit(root, cid)
            return rec.committed_at.isoformat() if rec else ""

        # Newest first; branches without a commit sort last.
        display_branches = sorted(display_branches, key=_committed_ts, reverse=True)

    if json_out:
        result: list[_BranchEntryJson] = []
        for b in display_branches:
            is_remote_entry = b.startswith("remotes/")
            local_b = b.removeprefix("remotes/")
            commit_id = _resolve_commit_id(root, b)
            rec = read_commit(root, commit_id) if commit_id else None
            last_message: str | None = (
                sanitize_display(rec.message.splitlines()[0][:72]) if rec and rec.message else None
            )
            upstream: str | None = _upstream_for(root, local_b)
            meta = read_branch_meta(root, local_b)
            branch_intent: str | None = meta.get("intent") or None  # type: ignore[assignment]
            branch_intent = sanitize_display(branch_intent) if branch_intent else None
            branch_resumable: bool = bool(meta.get("resumable", False))
            created_by: str | None = (rec.agent_id if rec and rec.agent_id else None)
            result.append({
                "name": b,
                # A remote-tracking entry is never the checked-out branch,
                # matching the "*" marker logic of the text listing.
                "current": local_b == current and not is_remote_entry,
                "commit_id": commit_id or None,
                "committed_at": rec.committed_at.isoformat() if rec else None,
                "last_message": last_message,
                "upstream": upstream,
                "intent": branch_intent,
                "resumable": branch_resumable,
                "created_by": created_by,
            })
        print(json.dumps(result))
        return

    for b in display_branches:
        is_remote_entry = b.startswith("remotes/")
        local_b = b.removeprefix("remotes/")
        is_current = (local_b == current) and not is_remote_entry
        marker = _c("* ", _GREEN, tty=tty) if is_current else "  "
        # Build the display name once; apply sanitization before any coloring
        # so that ANSI codes from _c() are not accidentally re-sanitized.
        safe_name = sanitize_display(b)
        name_str = _c(safe_name, _GREEN, tty=tty) if is_current else safe_name
        if verbose >= 1:
            commit_id = _resolve_commit_id(root, b)
            short = short_id(commit_id) if commit_id else "(empty)"
            rec = read_commit(root, commit_id) if commit_id else None
            msg = sanitize_display(rec.message.splitlines()[0][:48]) if rec and rec.message else ""
            short_str = _c(short, _YELLOW, tty=tty)
            if verbose >= 2:
                upstream = _upstream_for(root, local_b)
                up_str = (
                    f" [{_c(sanitize_display(upstream), _CYAN, tty=tty)}]"
                    if upstream else ""
                )
                print(f"{marker}{name_str}  {short_str}{up_str}  {msg}")
            else:
                print(f"{marker}{name_str}  {short_str}  {msg}")
        else:
            print(f"{marker}{name_str}")
File History 4 commits
sha256:e237dc0e8122609f5131d11c9dda9bba480395a5a4355cda0c9fa7e634fddd29 fix(branch): guard -d --dry-run against destructive writes;… Sonnet 4.6 patch 2 days ago
sha256:42d0a10d093980afe543a88e9ed75c5ad0ac339026e419ee3b07b8a57c73ed5b fix(branch): dry-run flag ignored on branch delete (closes #25) Sonnet 4.6 patch 2 days ago
sha256:99f8eb388d9a9c353e68b9a4e5bebe1b4240a8f511e6f0928e58c0e95153e103 feat: branch --prune-config, fix hub repo delete docstrings… Sonnet 4.6 minor 10 days ago