"""``muse branch`` — list, create, rename, copy, and delete branches. Git-idiomatic flags:: muse branch # list all local branches muse branch # create branch at HEAD muse branch # create at commit SHA, SHA prefix, or branch muse branch -d # safe delete (must be merged) muse branch -D # force delete muse branch -dr / # delete local remote-tracking ref (no server call) muse branch -Dr / # same, force (no merge check) muse branch -m [] # rename (safe) muse branch -M [] # rename (force) muse branch -c [] # copy (safe) muse branch -C [] # copy (force) muse branch -v # list with last commit SHA + subject muse branch -vv # also show upstream tracking ref muse branch -r # list remote-tracking branches muse branch -a # list local + remote-tracking branches muse branch --merged [] # only branches merged into commit muse branch --no-merged [] # only branches NOT merged into commit muse branch --contains # only branches that contain commit muse branch --sort name # sort by name (default) muse branch --sort committeddate # sort by date of most recent commit To delete a branch on the remote **and** prune the local tracking ref in one step, use ``muse push``:: muse push --delete Agents should pass ``--format json`` (or ``--json``) for machine-readable output on all operations. The listing JSON schema is:: [ { "name": "feat/my-thing", "current": false, "commit_id": " | null", "committed_at": "2026-03-21T12:00:00+00:00 | null", "last_message": "Add feature X", "upstream": "origin/feat/my-thing" }, ... ] Exit codes:: 0 — success 1 — invalid branch name, branch not found, attempting to delete checked-out branch """ import argparse import json import logging import pathlib import sys import tomllib from typing import TypedDict from muse.cli.config import delete_branch_meta, get_protected_branches, get_remote_head, is_branch_protected, read_branch_meta, write_branch_meta from muse.core.reflog import append_reflog from muse.core.types import MsgpackDict, short_id from muse.core.paths import ref_path as _ref_path, heads_dir as _heads_dir, remotes_dir as _remotes_dir, config_toml_path as _config_toml_path, reflog_branch_path as _reflog_branch_path, reflog_heads_dir as _reflog_heads_dir from muse.core.envelope import EnvelopeJson, make_envelope from muse.core.timing import start_timer from muse.core.errors import ExitCode from muse.core.repo import require_repo from muse.core.refs import read_ref from muse.core.io import write_text_atomic from muse.core.refs import ( get_head_commit_id, read_current_branch, write_branch_ref, write_head_branch, ) from muse.core.commits import ( read_commit, resolve_commit_ref, ) from muse.core.validation import clamp_int, sanitize_display, validate_branch_name type _Payload = dict[str, str | None] logger = logging.getLogger(__name__) class _BranchCreateJson(EnvelopeJson): """JSON output for ``muse branch -b --json``.""" action: str branch: str commit_id: str | None intent: str | None resumable: bool class _BranchEntryJson(TypedDict): name: str current: bool commit_id: str | None committed_at: str | None last_message: str | None upstream: str | None intent: str | None resumable: bool created_by: str | None class _BranchListJson(EnvelopeJson): """JSON output for ``muse branch --json``.""" branches: list[_BranchEntryJson] class _PruneConfigJson(TypedDict): """JSON output for ``muse branch --prune-config``.""" action: str pruned: int kept: int dry_run: bool pruned_branches: list[str] # --------------------------------------------------------------------------- # ANSI helpers — emitted only when stdout is a TTY. # --------------------------------------------------------------------------- _RESET = "\033[0m" _BOLD = "\033[1m" _DIM = "\033[2m" _GREEN = "\033[32m" _RED = "\033[31m" _YELLOW = "\033[33m" _CYAN = "\033[36m" def _c(text: str, *codes: str, tty: bool) -> str: """Wrap *text* in ANSI escape *codes* only when writing to a TTY.""" if not tty: return text return "".join(codes) + text + _RESET # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _ref_file(root: pathlib.Path, branch: str) -> pathlib.Path: """Return the ref-file path for a local branch.""" return _ref_path(root, branch) def _list_local_branches(root: pathlib.Path) -> list[str]: """Return a sorted list of all local branch names. Only plain files are considered; directories, symlinks and any file not directly under ``refs/heads/`` (e.g. lock files) are silently skipped. """ heads_dir = _heads_dir(root) if not heads_dir.exists(): return [] return sorted( p.relative_to(heads_dir).as_posix() for p in heads_dir.rglob("*") if p.is_file() and not p.name.startswith(".") ) def _list_remotes(root: pathlib.Path) -> list[str]: """Return sorted remote-tracking branch names as ``remote/branch``. Only plain files are visited; symlinks, hidden files, and directories are skipped to avoid leaking internal artefacts into the listing. """ remotes_dir = _remotes_dir(root) if not remotes_dir.exists(): return [] results: list[str] = [] for remote_dir in sorted(remotes_dir.iterdir()): if not remote_dir.is_dir(): continue remote = remote_dir.name for ref_file in sorted(remote_dir.rglob("*")): if ref_file.is_file() and not ref_file.name.startswith("."): branch_rel = ref_file.relative_to(remote_dir).as_posix() results.append(f"{remote}/{branch_rel}") return results def _resolve_commit_id(root: pathlib.Path, b: str) -> str: """Return the current commit ID for a branch listing entry. *b* is the display name (e.g. ``"main"`` or ``"remotes/origin/dev"``). Remote entries are read from the remote tracking file under ``.muse/remotes/``; local entries use the standard head ref. """ if b.startswith("remotes/"): rest = b.removeprefix("remotes/") remote, _, branch_name = rest.partition("/") if branch_name: return get_remote_head(remote, branch_name, root) or "" return get_head_commit_id(root, b) or "" def _upstream_for(root: pathlib.Path, branch: str) -> str | None: """Return the upstream tracking ref for *branch*, or ``None`` if unset.""" config_path = _config_toml_path(root) if not config_path.exists(): return None try: with config_path.open("rb") as f: config = tomllib.load(f) section = config.get("branch", {}).get(branch, {}) remote: str | None = section.get("remote") merge_ref: str | None = section.get("merge") if remote and merge_ref: short = merge_ref.removeprefix("refs/heads/") return f"{remote}/{short}" except Exception: pass return None def _commit_ancestors(root: pathlib.Path, commit_id: str) -> set[str]: """Return the set of all commit IDs reachable from *commit_id* (inclusive).""" from muse.core.graph import ancestor_ids return ancestor_ids(root, commit_id) def _is_merged(root: pathlib.Path, branch: str, into: str) -> bool: """Return ``True`` if the tip of *branch* is an ancestor of the tip of *into*.""" branch_tip = get_head_commit_id(root, branch) into_tip = get_head_commit_id(root, into) if branch_tip is None or into_tip is None: return False return branch_tip in _commit_ancestors(root, into_tip) def _contains_commit(root: pathlib.Path, branch: str, commit_id: str) -> bool: """Return ``True`` if *commit_id* is reachable from the tip of *branch*.""" tip = get_head_commit_id(root, branch) if tip is None: return False return commit_id in _commit_ancestors(root, tip) def _cleanup_empty_dirs(ref_file: pathlib.Path, heads_dir: pathlib.Path) -> None: """Remove any empty parent directories left behind after unlinking *ref_file*.""" for parent in ref_file.parents: if parent == heads_dir: break try: parent.rmdir() except OSError: break def _resolve_start_point(root: pathlib.Path, current: str, start_point: str) -> str: """Resolve *start_point* to a full commit ID. Accepts branch names, full SHA-256 commit IDs, and abbreviated SHA prefixes (any unambiguous prefix works). Returns the raw *start_point* string unchanged if resolution fails — the caller is responsible for surfacing a meaningful error in that case. """ # Try as branch name first — skip if it contains characters forbidden in # branch names (e.g. ':' in sha256:-prefixed IDs) to avoid ValueError. try: branch_tip = get_head_commit_id(root, start_point) if branch_tip is not None: return branch_tip except ValueError: pass # Not a valid branch name — fall through to SHA resolution. # Fall back to SHA / SHA-prefix resolution. # resolve_commit_ref handles both bare hex and sha256:-prefixed IDs. rec = resolve_commit_ref(root, current, start_point) if rec is not None: return rec.commit_id # Return as-is; the caller's write_branch_ref will expose the invalid ID. return start_point # --------------------------------------------------------------------------- # CLI registration # --------------------------------------------------------------------------- def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None: """Register the ``muse branch`` subcommand and all its flags.""" parser = subparsers.add_parser( "branch", help="List, create, rename, copy, or delete branches.", description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("args", nargs="*", help="Branch name(s) — context-sensitive.") # Mutually exclusive operation flags (mirrors git branch). ops = parser.add_mutually_exclusive_group() ops.add_argument( "-d", "--delete", dest="op", action="store_const", const="delete", help="Delete a branch (safe — must be fully merged).", ) ops.add_argument( "-D", dest="op", action="store_const", const="force_delete", help="Force-delete a branch regardless of merge status.", ) ops.add_argument( "-m", "--move", dest="op", action="store_const", const="rename", help="Rename a branch (safe).", ) ops.add_argument( "-M", dest="op", action="store_const", const="force_rename", help="Force-rename a branch.", ) ops.add_argument( "-c", "--copy", dest="op", action="store_const", const="copy", help="Copy a branch (safe).", ) ops.add_argument( "-C", dest="op", action="store_const", const="force_copy", help="Force-copy a branch.", ) # Listing modifiers. parser.add_argument( "-v", action="count", default=0, dest="verbose", help="Show last commit SHA + subject. Repeat (-vv) to also show upstream.", ) parser.add_argument( "-r", "--remotes", action="store_true", help="List remote-tracking branches.", ) parser.add_argument( "-a", "--all", action="store_true", dest="all_branches", help="List both local and remote-tracking branches.", ) parser.add_argument( "--merged", metavar="COMMIT", nargs="?", const="HEAD", help="Only list branches merged into COMMIT (default HEAD).", ) parser.add_argument( "--no-merged", metavar="COMMIT", nargs="?", const="HEAD", help="Only list branches NOT merged into COMMIT (default HEAD).", ) parser.add_argument( "--contains", metavar="COMMIT", help="Only list branches that contain COMMIT.", ) parser.add_argument( "--sort", default="name", metavar="KEY", choices=["name", "committeddate"], help="Sort branches by 'name' (default) or 'committeddate'.", ) parser.add_argument( "--intent", default=None, metavar="TEXT", help="Short description of what this branch is for (stored in config).", ) parser.add_argument( "--resumable", action="store_true", default=False, help=( "On create: mark this branch as a resumable agent checkpoint. " "On list (no name): filter to resumable branches only." ), ) parser.add_argument( "--prune-config", action="store_true", dest="prune_config", help=( "Remove stale [branch.*] entries from .muse/config.toml — " "entries for branches whose ref no longer exists. " "Use --dry-run to preview without writing." ), ) parser.add_argument( "--dry-run", action="store_true", dest="dry_run", help="With --prune-config: report what would be removed without writing.", ) parser.add_argument( "--json", "-j", action="store_true", dest="json_out", help="Emit machine-readable JSON.", ) parser.set_defaults(func=run, op=None, prune_config=False, dry_run=False) # --------------------------------------------------------------------------- # Command handler # --------------------------------------------------------------------------- def run(args: argparse.Namespace) -> None: """List, create, rename, copy, or delete branches. Without a subcommand flag, lists all local branches. With ``--format json`` the output is a stable JSON array; mutation ops (create, rename, copy, delete) emit a single result object with an ``"action"`` key. Agent quickstart ---------------- :: muse branch --json # list all branches muse branch --json --resumable # list resumable branches only muse branch -b feat/thing --json # create branch muse branch -d feat/thing --json # delete branch JSON fields (list mode — top-level is a bare array) ---------------------------------------------------- name Branch name. current ``true`` for the currently checked-out branch. commit_id Full ``sha256:…`` commit ID at the tip. last_message Commit message at the tip. upstream Upstream tracking ref; ``null`` if none. intent Branch intent annotation (``--intent`` flag). resumable ``true`` if the branch was created with ``--resumable``. JSON fields (mutation mode) --------------------------- action What was done: ``"created"``, ``"deleted"``, ``"renamed"``, etc. name Branch name acted upon. Exit codes ---------- 0 Success. 1 Invalid arguments, branch not found, or operation conflicts. 2 Not inside a Muse repository. """ elapsed = start_timer() positional: list[str] = args.args op: str | None = args.op verbose: int = clamp_int(args.verbose, 0, 4, 'verbose') remotes_only: bool = args.remotes all_branches: bool = args.all_branches merged_into: str | None = args.merged not_merged_into: str | None = args.no_merged contains_commit: str | None = args.contains sort_key: str = args.sort intent: str | None = args.intent resumable: bool = args.resumable json_out: bool = args.json_out tty: bool = sys.stdout.isatty() root = require_repo() current = read_current_branch(root) heads_dir = _heads_dir(root) # ------------------------------------------------------------------ # PRUNE-CONFIG — remove stale [branch.*] entries from config.toml # ------------------------------------------------------------------ if args.prune_config: config_path = _config_toml_path(root) config: MsgpackDict = {} if config_path.exists(): import tomllib as _tomllib config = _tomllib.loads(config_path.read_text()) branch_sections: MsgpackDict = dict(config.get("branch") or {}) pruned: list[str] = [] kept: list[str] = [] for bname in sorted(branch_sections): ref_file = _heads_dir(root) / bname if ref_file.exists(): kept.append(bname) else: pruned.append(bname) if not args.dry_run: delete_branch_meta(root, bname) result: _PruneConfigJson = { "action": "prune_config", "pruned": len(pruned), "kept": len(kept), "dry_run": args.dry_run, "pruned_branches": pruned, } if json_out: print(json.dumps(result)) else: prefix = "[dry-run] " if args.dry_run else "" print(f"{prefix}Pruned {len(pruned)} stale config entries, kept {len(kept)} live entries.") for b in pruned: print(f" - {b}") return # ------------------------------------------------------------------ # DELETE / FORCE-DELETE # Supports two modes: # muse branch -d|-D — delete a local branch # muse branch -d|-D -r / — prune a remote-tracking ref # ------------------------------------------------------------------ if op in ("delete", "force_delete"): if not positional: if json_out: print(json.dumps({"error": "usage", "message": "muse branch -d|-D [-r] …"})) print("❌ Usage: muse branch -d|-D [-r] …", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) # -r flag: delete local remote-tracking refs (no server call). if remotes_only: from muse.cli.config import delete_remote_head for spec in positional: # Accept both "remote/branch" and "remotes/remote/branch" spellings. clean = spec.removeprefix("remotes/") slash = clean.find("/") if slash == -1: if json_out: print(json.dumps({"error": "invalid_ref", "ref": spec, "message": "remote-tracking ref must be '/'"})) print( f"❌ Remote-tracking ref must be '/', got " f"'{sanitize_display(spec)}'.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) remote_name = clean[:slash] branch_name = clean[slash + 1:] removed = delete_remote_head(remote_name, branch_name, root) if not removed: if json_out: print(json.dumps({"error": "not_found", "ref": clean, "message": f"remote-tracking ref '{clean}' not found"})) print( f"❌ Remote-tracking ref '{sanitize_display(clean)}' not found.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) if json_out: print(json.dumps({ "action": "deleted_remote_tracking", "remote": remote_name, "branch": branch_name, })) else: print( f"Deleted remote-tracking ref " f"{_c(sanitize_display(clean), _RED, tty=tty)}." ) return force = op == "force_delete" for branch_name in positional: try: validate_branch_name(branch_name) except ValueError as exc: if json_out: print(json.dumps({"error": "invalid_branch_name", "branch": branch_name, "message": str(exc)})) print(f"❌ Invalid branch name: {sanitize_display(str(exc))}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) if branch_name == current: if json_out: print(json.dumps({"error": "current_branch", "branch": branch_name, "message": f"cannot delete the currently checked-out branch '{branch_name}'"})) print( f"❌ Cannot delete the currently checked-out branch " f"'{sanitize_display(branch_name)}'.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) rf = _ref_file(root, branch_name) if not rf.is_file(): if json_out: print(json.dumps({"error": "not_found", "branch": branch_name, "message": f"branch '{branch_name}' not found"})) print(f"❌ Branch '{sanitize_display(branch_name)}' not found.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) if not force and not _is_merged(root, branch_name, current): if json_out: print(json.dumps({"error": "not_merged", "branch": branch_name, "message": f"branch '{branch_name}' is not fully merged", "hint": "use -D to force-delete"})) print( f"❌ Branch '{sanitize_display(branch_name)}' is not fully merged.\n" f" Use -D to force-delete.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) protected = get_protected_branches(root) if is_branch_protected(branch_name, protected): if json_out: print(json.dumps({"error": "protected", "branch": branch_name, "message": f"branch '{branch_name}' is protected and cannot be deleted"})) else: print( f"❌ Branch '{sanitize_display(branch_name)}' is protected and cannot be deleted.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) tip = read_ref(rf) or "" if not args.dry_run: rf.unlink() _cleanup_empty_dirs(rf, heads_dir) reflog_file = _reflog_branch_path(root, branch_name) reflog_file.unlink(missing_ok=True) _cleanup_empty_dirs(reflog_file, _reflog_heads_dir(root)) delete_branch_meta(root, branch_name) if json_out: print(json.dumps({"action": "deleted", "branch": branch_name, "was": tip, "dry_run": args.dry_run})) else: prefix = "[dry-run] " if args.dry_run else "" print( f"{prefix}Deleted branch {_c(sanitize_display(branch_name), _RED, tty=tty)} " f"({_c('was ' + (tip or 'unknown'), _DIM, tty=tty)})." ) return # ------------------------------------------------------------------ # RENAME / FORCE-RENAME # ------------------------------------------------------------------ if op in ("rename", "force_rename"): force = op == "force_rename" if len(positional) == 1: old_name, new_name = current, positional[0] elif len(positional) == 2: old_name, new_name = positional[0], positional[1] else: if json_out: print(json.dumps({"error": "usage", "message": "muse branch -m|-M [] "})) print("❌ Usage: muse branch -m|-M [] ", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) for n in (old_name, new_name): try: validate_branch_name(n) except ValueError as exc: if json_out: print(json.dumps({"error": "invalid_branch_name", "branch": n, "message": str(exc)})) print(f"❌ Invalid branch name: {sanitize_display(str(exc))}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) src = _ref_file(root, old_name) dst = _ref_file(root, new_name) if not src.is_file(): if json_out: print(json.dumps({"error": "not_found", "branch": old_name, "message": f"branch '{old_name}' not found"})) print(f"❌ Branch '{sanitize_display(old_name)}' not found.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) if dst.is_file() and not force: if json_out: print(json.dumps({"error": "already_exists", "branch": new_name, "message": f"branch '{new_name}' already exists", "hint": "use -M to force"})) print( f"❌ Branch '{sanitize_display(new_name)}' already exists. Use -M to force.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) tip = read_ref(src) or "" if tip: write_branch_ref(root, new_name, tip) else: write_text_atomic(dst, "") src.unlink() _cleanup_empty_dirs(src, heads_dir) if old_name == current: write_head_branch(root, new_name) if json_out: print(json.dumps({"action": "renamed", "from": old_name, "to": new_name})) else: print( f"Renamed branch " f"{_c(sanitize_display(old_name), _YELLOW, tty=tty)} → " f"{_c(sanitize_display(new_name), _GREEN, tty=tty)}." ) return # ------------------------------------------------------------------ # COPY / FORCE-COPY # ------------------------------------------------------------------ if op in ("copy", "force_copy"): force = op == "force_copy" if len(positional) == 1: src_name, dst_name = current, positional[0] elif len(positional) == 2: src_name, dst_name = positional[0], positional[1] else: if json_out: print(json.dumps({"error": "usage", "message": "muse branch -c|-C [] "})) print("❌ Usage: muse branch -c|-C [] ", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) for n in (src_name, dst_name): try: validate_branch_name(n) except ValueError as exc: if json_out: print(json.dumps({"error": "invalid_branch_name", "branch": n, "message": str(exc)})) print(f"❌ Invalid branch name: {sanitize_display(str(exc))}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) src = _ref_file(root, src_name) dst = _ref_file(root, dst_name) if not src.is_file(): if json_out: print(json.dumps({"error": "not_found", "branch": src_name, "message": f"branch '{src_name}' not found"})) print(f"❌ Branch '{sanitize_display(src_name)}' not found.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) if dst.is_file() and not force: if json_out: print(json.dumps({"error": "already_exists", "branch": dst_name, "message": f"branch '{dst_name}' already exists", "hint": "use -C to force"})) print( f"❌ Branch '{sanitize_display(dst_name)}' already exists. Use -C to force.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) tip = read_ref(src) or "" if tip: write_branch_ref(root, dst_name, tip) else: write_text_atomic(dst, "") if json_out: print(json.dumps({"action": "copied", "from": src_name, "to": dst_name})) else: print( f"Copied branch " f"{_c(sanitize_display(src_name), _YELLOW, tty=tty)} → " f"{_c(sanitize_display(dst_name), _GREEN, tty=tty)}." ) return # ------------------------------------------------------------------ # CREATE # ------------------------------------------------------------------ if op is None and positional: new_name = positional[0] start_point: str | None = positional[1] if len(positional) > 1 else None try: validate_branch_name(new_name) except ValueError as exc: if json_out: print(json.dumps({"error": "invalid_branch_name", "branch": new_name, "message": str(exc)})) print(f"❌ Invalid branch name: {sanitize_display(str(exc))}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) rf = _ref_file(root, new_name) if rf.is_file(): # Branch exists. If --intent or --resumable given (no start_point), # treat as a metadata update rather than a failed create. if (intent is not None or resumable) and start_point is None: write_branch_meta( root, new_name, intent=intent, resumable=resumable if resumable else None, ) meta = read_branch_meta(root, new_name) if json_out: print(json.dumps({ "action": "updated", "branch": new_name, "intent": meta.get("intent"), "resumable": bool(meta.get("resumable", False)), })) else: parts: list[str] = [] if intent is not None: parts.append(f"intent={sanitize_display(intent)!r}") if resumable: parts.append("resumable=true") print( f"Updated branch {_c(sanitize_display(new_name), _YELLOW, tty=tty)}" f"{' (' + ', '.join(parts) + ')' if parts else ''}." ) return if json_out: print(json.dumps({"error": "already_exists", "branch": new_name, "message": f"branch '{new_name}' already exists"})) print(f"❌ Branch '{sanitize_display(new_name)}' already exists.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) if start_point is not None: # Resolve branch names, full SHAs, and abbreviated SHA prefixes. sp_tip: str = _resolve_start_point(root, current, start_point) else: sp_tip = get_head_commit_id(root, current) or "" if sp_tip: write_branch_ref(root, new_name, sp_tip) source_label = start_point or current or "HEAD" append_reflog(root, new_name, None, sp_tip, "", f"branch: Created from {source_label}") else: write_text_atomic(rf, "") # Persist intent / resumable if supplied. if intent is not None or resumable: write_branch_meta( root, new_name, intent=intent, resumable=resumable if resumable else None, ) if json_out: print(json.dumps({ **make_envelope(elapsed), **_BranchCreateJson( action="created", branch=new_name, commit_id=sp_tip or None, intent=intent, resumable=resumable, ), "from": start_point, })) else: print(f"Created branch {_c(sanitize_display(new_name), _GREEN, tty=tty)}.") return # ------------------------------------------------------------------ # LIST # ------------------------------------------------------------------ local_branches = _list_local_branches(root) if remotes_only: display_branches = [f"remotes/{b}" for b in _list_remotes(root)] elif all_branches: display_branches = local_branches + [f"remotes/{b}" for b in _list_remotes(root)] else: display_branches = list(local_branches) # --resumable filter: only show branches marked resumable in config. if resumable and not positional: filtered_resumable: list[str] = [] for b in display_branches: local_b = b.removeprefix("remotes/") meta = read_branch_meta(root, local_b) if meta.get("resumable") is True: filtered_resumable.append(b) display_branches = filtered_resumable # --merged / --no-merged / --contains filters if merged_into or not_merged_into or contains_commit: resolved_current = current # Pre-compute ancestor sets once — not once per branch. # _commit_ancestors walks the full commit DAG; recomputing it for every # branch being checked is O(branches × commits) instead of O(commits). _merged_ancestors: set[str] | None = None if merged_into: _into = resolved_current if merged_into == "HEAD" else merged_into _into_tip = get_head_commit_id(root, _into) _merged_ancestors = _commit_ancestors(root, _into_tip) if _into_tip else set() _not_merged_ancestors: set[str] | None = None if not_merged_into: _into = resolved_current if not_merged_into == "HEAD" else not_merged_into _into_tip = get_head_commit_id(root, _into) _not_merged_ancestors = _commit_ancestors(root, _into_tip) if _into_tip else set() def _passes(b: str) -> bool: local_b = b.removeprefix("remotes/") if _merged_ancestors is not None: tip = get_head_commit_id(root, local_b) if tip is None or tip not in _merged_ancestors: return False if _not_merged_ancestors is not None: tip = get_head_commit_id(root, local_b) if tip is not None and tip in _not_merged_ancestors: return False if contains_commit: if not _contains_commit(root, local_b, contains_commit): return False return True display_branches = [b for b in display_branches if _passes(b)] # --sort: sort by committed date if requested. # Name sort is the default (already applied by _list_local_branches). if sort_key == "committeddate": def _committed_ts(b: str) -> str: cid = _resolve_commit_id(root, b) if not cid: return "" rec = read_commit(root, cid) return rec.committed_at.isoformat() if rec else "" display_branches = sorted(display_branches, key=_committed_ts, reverse=True) if json_out: result: list[_BranchEntryJson] = [] for b in display_branches: local_b = b.removeprefix("remotes/") commit_id = _resolve_commit_id(root, b) rec = read_commit(root, commit_id) if commit_id else None last_message: str | None = ( sanitize_display(rec.message.splitlines()[0][:72]) if rec and rec.message else None ) upstream: str | None = _upstream_for(root, local_b) meta = read_branch_meta(root, local_b) branch_intent: str | None = meta.get("intent") or None # type: ignore[assignment] branch_intent = sanitize_display(branch_intent) if branch_intent else None branch_resumable: bool = bool(meta.get("resumable", False)) created_by: str | None = (rec.agent_id if rec and rec.agent_id else None) result.append({ "name": b, "current": local_b == current, "commit_id": commit_id or None, "committed_at": rec.committed_at.isoformat() if rec else None, "last_message": last_message, "upstream": upstream, "intent": branch_intent, "resumable": branch_resumable, "created_by": created_by, }) print(json.dumps(result)) return for b in display_branches: is_remote_entry = b.startswith("remotes/") local_b = b.removeprefix("remotes/") is_current = (local_b == current) and not is_remote_entry marker = _c("* ", _GREEN, tty=tty) if is_current else " " # Build the display name once; apply sanitization before any coloring # so that ANSI codes from _c() are not accidentally re-sanitized. safe_name = sanitize_display(b) name_str = _c(safe_name, _GREEN, tty=tty) if is_current else safe_name if verbose >= 1: commit_id = _resolve_commit_id(root, b) short = short_id(commit_id) if commit_id else "(empty)" rec = read_commit(root, commit_id) if commit_id else None msg = sanitize_display(rec.message.splitlines()[0][:48]) if rec and rec.message else "" short_str = _c(short, _YELLOW, tty=tty) if verbose >= 2: upstream = _upstream_for(root, local_b) up_str = ( f" [{_c(sanitize_display(upstream), _CYAN, tty=tty)}]" if upstream else "" ) print(f"{marker}{name_str} {short_str}{up_str} {msg}") else: print(f"{marker}{name_str} {short_str} {msg}") else: print(f"{marker}{name_str}")