worktree.py
python
sha256:51116ec824246acde6abf729e6ba854c223dc5173eff31a645520208023b0652
refactor(bridge): comprehensive spec sweep — close all issu…
Sonnet 4.6
minor
⚠ breaking
28 days ago
| 1 | """Worktree management — multiple simultaneous branch checkouts. |
| 2 | |
| 3 | A *worktree* is a second (or third, …) checked-out working tree linked to |
| 4 | the same ``.muse/`` repository. Each worktree has its own branch and its own |
| 5 | working directory, so multiple agents — or multiple human engineers — can |
| 6 | work on different branches simultaneously without interfering with each other. |
| 7 | |
| 8 | Layout |
| 9 | ------ |
| 10 | Each linked worktree lives in a sibling directory of the repository root:: |
| 11 | |
| 12 | myproject/ ← main worktree (holds .muse/) |
| 13 | .muse/ |
| 14 | worktrees/ |
| 15 | <name>.json ← metadata for each linked worktree |
| 16 | <name>.HEAD ← HEAD ref for the linked worktree |
| 17 | |
| 18 | myproject-<name>/ ← linked worktree directory |
| 19 | ← worktree working directory |
| 20 | |
| 21 | The shared ``.muse/`` directory is the single source of truth for commits, |
| 22 | snapshots, objects, and branch refs. Each worktree has its own HEAD file |
| 23 | stored inside the main ``.muse/worktrees/<name>.HEAD``. |
| 24 | |
| 25 | Security model |
| 26 | -------------- |
| 27 | - **Name validation**: Worktree names pass through ``validate_branch_name`` |
| 28 | (no path separators, no null bytes, no control characters). This ensures |
| 29 | the derived meta path and HEAD path cannot escape ``.muse/worktrees/``. |
| 30 | - **Symlink guard on meta files**: ``_load_meta`` rejects a symlink at the |
| 31 | meta file path before any read. |
| 32 | - **Size cap on meta files**: ``_load_meta`` refuses files larger than |
| 33 | ``_MAX_META_BYTES`` to guard against memory exhaustion. |
| 34 | - **Path safety on delete**: ``remove_worktree`` calls ``_safe_delete_path``, |
| 35 | which refuses to ``rmtree`` a path that is a symlink itself or that resolves |
| 36 | inside the shared ``.muse/`` store. ``prune_worktrees`` only unlinks metadata. |
| 37 | |
| 38 | Agent concurrency |
| 39 | ----------------- |
| 40 | Multiple agents can operate on separate worktrees simultaneously. Each |
| 41 | worktree's HEAD is independent; commits from one worktree appear immediately |
| 42 | in all others (they share the object store). |
| 43 | """ |
| 44 | |
| 45 | import json |
| 46 | import logging |
| 47 | import pathlib |
| 48 | import shutil |
| 49 | from dataclasses import dataclass |
| 50 | from typing import TypedDict |
| 51 | |
| 52 | from muse.core.paths import muse_dir as _muse_dir, heads_dir as _heads_dir, ref_path as _ref_path, worktrees_dir |
| 53 | from muse.core.types import load_json_file |
| 54 | from muse.core.object_store import restore_object |
| 55 | from muse.core.io import write_text_atomic |
| 56 | from muse.core.refs import ( |
| 57 | get_head_commit_id, |
| 58 | read_current_branch, |
| 59 | ) |
| 60 | from muse.core.commits import read_commit |
| 61 | from muse.core.snapshots import read_snapshot |
| 62 | from muse.core.validation import contain_path, validate_branch_name |
| 63 | |
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


# Upper bound on the size of a worktree meta file. ``_load_meta`` refuses to
# parse anything larger, treating it as tampered or corrupt.
_MAX_META_BYTES: int = 4096  # 4 KiB — more than enough for any meta record
| 69 | |
| 70 | # --------------------------------------------------------------------------- |
| 71 | # Types |
| 72 | # --------------------------------------------------------------------------- |
| 73 | |
class WorktreeRecord(TypedDict):
    """On-disk metadata describing one linked worktree.

    This is the shape serialised to ``.muse/worktrees/<name>.json``.

    Keys:
        name:   Identifier of the worktree.
        branch: Branch recorded for the worktree.
        path:   Location of the worktree directory, stored as a string.
    """

    name: str
    branch: str
    path: str  # absolute path to the worktree directory
| 80 | |
| 81 | @dataclass |
| 82 | class WorktreeInfo: |
| 83 | """Runtime information about a worktree.""" |
| 84 | |
| 85 | name: str |
| 86 | branch: str |
| 87 | path: pathlib.Path |
| 88 | head_commit: str | None |
| 89 | is_main: bool = False |
| 90 | |
| 91 | class WorktreeStatusResult(TypedDict): |
| 92 | """Machine-readable status of a single worktree.""" |
| 93 | |
| 94 | name: str |
| 95 | branch: str |
| 96 | path: str |
| 97 | head_commit: str | None |
| 98 | present: bool |
| 99 | is_main: bool |
| 100 | |
| 101 | # --------------------------------------------------------------------------- |
| 102 | # Paths |
| 103 | # --------------------------------------------------------------------------- |
| 104 | |
| 105 | |
def _worktree_meta_path(repo_root: pathlib.Path, name: str) -> pathlib.Path:
    """Return the path of the ``<name>.json`` metadata file for worktree *name*."""
    meta_filename = f"{name}.json"
    return worktrees_dir(repo_root).joinpath(meta_filename)
| 108 | |
def _worktree_head_path(repo_root: pathlib.Path, name: str) -> pathlib.Path:
    """Return the path of the ``<name>.HEAD`` file for worktree *name*."""
    head_filename = f"{name}.HEAD"
    return worktrees_dir(repo_root).joinpath(head_filename)
| 111 | |
def _worktree_dir(repo_root: pathlib.Path, name: str) -> pathlib.Path:
    """Return the default linked-worktree directory for *name*.

    The directory is a sibling of *repo_root* called
    ``<repo_root.name>-<name>``.
    """
    sibling_name = f"{repo_root.name}-{name}"
    return repo_root.parent.joinpath(sibling_name)
| 117 | |
| 118 | # --------------------------------------------------------------------------- |
| 119 | # Internal helpers |
| 120 | # --------------------------------------------------------------------------- |
| 121 | |
def _load_meta(repo_root: pathlib.Path, name: str) -> WorktreeRecord | None:
    """Read and validate the metadata file for *name*.

    Safety guards applied before any read:

    - **Symlink check**: a symlink at the meta path could redirect writes to
      arbitrary locations or reads to sensitive files. The check runs before
      the existence test so that a dangling symlink is also flagged.
    - **Size cap** (``_MAX_META_BYTES``): a tampered or corrupt meta file
      cannot be used to exhaust memory.

    Args:
        repo_root: Main repository root.
        name:      Worktree name whose ``<name>.json`` file is read.

    Returns:
        A ``WorktreeRecord`` whose three fields are coerced to ``str``, or
        ``None`` when the file is absent, is a symlink, exceeds the size
        cap, cannot be loaded, or lacks a required key. Every rejection
        other than plain absence is logged as a warning.
    """
    meta_path = _worktree_meta_path(repo_root, name)
    # ``exists()`` follows symlinks, so test for a link first; otherwise a
    # dangling symlink would be silently treated as "no such worktree".
    if meta_path.is_symlink():
        logger.warning("⚠️ Worktree meta file for %r is a symlink — ignoring", name)
        return None
    if not meta_path.exists():
        return None
    try:
        if meta_path.stat().st_size > _MAX_META_BYTES:
            logger.warning(
                "⚠️ Worktree meta file for %r exceeds size cap (%d bytes) — ignoring",
                name,
                _MAX_META_BYTES,
            )
            return None
        raw = load_json_file(meta_path)
        if raw is None:
            logger.warning("⚠️ Could not read worktree metadata for %r", name)
            return None
        return WorktreeRecord(
            name=str(raw["name"]),
            branch=str(raw["branch"]),
            path=str(raw["path"]),
        )
    # TypeError covers a payload that is valid JSON but not an object
    # (list, string, number): subscripting it with a key raises TypeError.
    except (KeyError, TypeError, ValueError, OSError) as exc:
        logger.warning("⚠️ Could not read worktree metadata for %r: %s", name, exc)
        return None
| 158 | |
def _save_meta(repo_root: pathlib.Path, record: WorktreeRecord) -> None:
    """Serialise *record* as indented JSON into its ``<name>.json`` meta file.

    The target file is chosen from ``record["name"]`` and written through
    ``write_text_atomic``.
    """
    destination = _worktree_meta_path(repo_root, record["name"])
    serialized = json.dumps(record, indent=2)
    write_text_atomic(destination, serialized)
| 163 | |
def _write_worktree_pointer(wt_dir: pathlib.Path, repo_root: pathlib.Path) -> None:
    """Create the ``.muse`` pointer file that links *wt_dir* to the shared store.

    The pointer is a one-line text file::

        musestore: /absolute/path/to/main/.muse

    where the path is the resolved ``.muse`` location of *repo_root*.

    Nothing is written when:

    - ``.muse`` inside *wt_dir* is already a directory (a real store must
      never be clobbered), or
    - the pointer location resolves to the shared store itself, i.e.
      *wt_dir* is the main repository.
    """
    pointer = _muse_dir(wt_dir)
    if pointer.is_dir():
        # A directory here is an actual store, not a pointer — leave it alone.
        logger.debug("Skipping pointer write — %s is already a store directory", pointer)
        return

    shared_store = _muse_dir(repo_root).resolve()
    if pointer.resolve() != shared_store:
        write_text_atomic(pointer, f"musestore: {shared_store}\n")
        logger.debug("Wrote worktree pointer %s → %s", pointer, shared_store)
        return

    # Pointer and store are the same location: a pointer would reference itself.
    logger.debug("Skipping pointer write — wt_dir is the main repo root")
| 189 | |
def _safe_delete_path(repo_root: pathlib.Path, path: pathlib.Path) -> bool:
    """Delete *path* and its contents, with safety guards.

    Refuses deletion when:

    - *path* is a symlink (could target an unrelated directory).
    - *path* resolves to be inside the shared ``.muse/`` store — deleting it
      would corrupt the repository.
    - *path* resolves to a directory that *contains* the shared ``.muse/``
      store (the repository root or one of its ancestors) — deleting it
      would destroy the store along with everything around it.

    A *path* that does not exist is a no-op and returns ``True`` so callers
    can proceed.

    Args:
        repo_root: Main repository root (where ``.muse/`` lives).
        path:      Directory to remove, typically taken from worktree
                   metadata and therefore not fully trusted.

    Returns:
        ``True`` if the directory was deleted or did not exist.
        ``False`` if the deletion was refused for safety reasons.
    """
    if not path.exists():
        return True
    if path.is_symlink():
        logger.warning(
            "⚠️ Refusing to delete worktree path %s — it is a symlink", path
        )
        return False
    muse_dir = _muse_dir(repo_root).resolve()
    try:
        resolved = path.resolve()
    except OSError:
        logger.warning("⚠️ Could not resolve worktree path %s", path)
        return False
    if resolved.is_relative_to(muse_dir):
        # Target is the store itself or something within it.
        logger.warning(
            "⚠️ Refusing to delete worktree path %s — it resolves inside .muse/", path
        )
        return False
    if muse_dir.is_relative_to(resolved):
        # Target is the repo root or an ancestor: rmtree would take the
        # store down with it.
        logger.warning(
            "⚠️ Refusing to delete worktree path %s — it contains the shared .muse/ store",
            path,
        )
        return False
    shutil.rmtree(path)
    return True
| 228 | |
def _read_main_branch(repo_root: pathlib.Path) -> str:
    """Return the current branch of *repo_root*, as reported by ``read_current_branch``."""
    current_branch = read_current_branch(repo_root)
    return current_branch
| 231 | |
| 232 | # --------------------------------------------------------------------------- |
| 233 | # Public API |
| 234 | # --------------------------------------------------------------------------- |
| 235 | |
def _populate_worktree(
    repo_root: pathlib.Path, wt_dir: pathlib.Path, branch: str
) -> None:
    """Restore every file of *branch*'s head snapshot into *wt_dir*.

    Does nothing when the branch has no head commit, or when the commit or
    its snapshot cannot be read. Manifest paths rejected by ``contain_path``
    are skipped with a warning instead of being written.
    """
    commit_id = get_head_commit_id(repo_root, branch)
    if not commit_id:
        return
    commit = read_commit(repo_root, commit_id)
    if not commit:
        return
    snap = read_snapshot(repo_root, commit.snapshot_id)
    if not snap:
        return
    for rel_path, object_id in snap.manifest.items():
        try:
            dest = contain_path(wt_dir, rel_path)
        except ValueError as exc:
            logger.warning("⚠️ Skipping unsafe path %r: %s", rel_path, exc)
            continue
        restore_object(repo_root, object_id, dest)


def add_worktree(
    repo_root: pathlib.Path,
    name: str,
    branch: str,
    path: pathlib.Path | None = None,
) -> pathlib.Path:
    """Create and populate a new linked worktree.

    Steps, in order: create the directory, write the ``.muse`` pointer
    file, write the worktree HEAD file, restore the branch's latest
    snapshot, and finally persist the metadata record. If any step after
    directory creation fails, everything created so far is removed again
    and the original exception propagates, so a retry is not blocked by
    leftovers.

    Args:
        repo_root: Main repository root (where ``.muse/`` lives).
        name:      Short identifier for the worktree (validated like a branch
                   name — no path separators, no null bytes).
        branch:    Branch to check out in the new worktree.
        path:      Explicit filesystem path for the worktree directory. When
                   ``None`` (default) the standard sibling layout is used:
                   ``<repo_root.parent>/<repo_root.name>-<name>``.

    Returns:
        The path to the newly created worktree directory.

    Raises:
        ValueError: If the name is invalid, the worktree already exists, or
                    the branch does not exist.
    """
    validate_branch_name(name)

    wt_dir = path if path is not None else _worktree_dir(repo_root, name)
    meta_path = _worktree_meta_path(repo_root, name)
    head_path = _worktree_head_path(repo_root, name)

    if meta_path.exists():
        raise ValueError(f"Worktree '{name}' already exists.")
    if wt_dir.exists():
        raise ValueError(f"Directory '{wt_dir}' already exists.")

    # Verify the branch exists.
    if not _ref_path(repo_root, branch).exists():
        raise ValueError(f"Branch '{branch}' does not exist.")

    # Create the worktree directory. This raises if the path already exists,
    # so everything under wt_dir from here on was created by this call.
    wt_dir.mkdir(parents=True)

    completed = False
    try:
        # Write .muse pointer file so `muse` commands work inside the worktree.
        _write_worktree_pointer(wt_dir, repo_root)

        # Write the worktree HEAD file.
        write_text_atomic(head_path, f"refs/heads/{branch}\n")

        # Populate the worktree from the branch's latest snapshot.
        _populate_worktree(repo_root, wt_dir, branch)

        # Persist metadata last, so a record only exists for a complete
        # worktree. The path is stored absolute (without resolving
        # symlinks) so later operations do not depend on the process's
        # working directory.
        record: WorktreeRecord = {
            "name": name,
            "branch": branch,
            "path": str(wt_dir.absolute()),
        }
        _save_meta(repo_root, record)
        completed = True
    finally:
        if not completed:
            # Roll back the partial worktree; the in-flight exception
            # continues to propagate after this block.
            meta_path.unlink(missing_ok=True)
            head_path.unlink(missing_ok=True)
            shutil.rmtree(wt_dir, ignore_errors=True)

    logger.info("✅ Worktree '%s' created at %s (branch: %s)", name, wt_dir, branch)
    return wt_dir
| 310 | |
def list_worktrees(repo_root: pathlib.Path) -> list[WorktreeInfo]:
    """Return all worktrees (main + linked), sorted by name.

    The main worktree is always first; linked worktrees follow in
    lexicographic order of name. Linked worktrees whose metadata cannot be
    loaded are omitted.

    For a linked worktree, ``head_commit`` is looked up from the branch
    recorded in its metadata and is ``None`` when its ``<name>.HEAD`` file
    is missing.
    """
    # Main worktree.
    main_branch = _read_main_branch(repo_root)
    results: list[WorktreeInfo] = [
        WorktreeInfo(
            name="(main)",
            branch=main_branch,
            path=repo_root,
            head_commit=get_head_commit_id(repo_root, main_branch),
            is_main=True,
        )
    ]

    wt_dir = worktrees_dir(repo_root)
    if not wt_dir.exists():
        return results

    # Sort on the stem, not the whole filename: with the ".json" suffix
    # included, "a-b.json" would sort before "a.json" ('-' < '.'), which is
    # not lexicographic order of the names "a" and "a-b".
    for meta_file in sorted(wt_dir.glob("*.json"), key=lambda p: p.stem):
        name = meta_file.stem
        record = _load_meta(repo_root, name)
        if record is None:
            continue
        branch = record["branch"]
        head_path = _worktree_head_path(repo_root, name)
        commit_id = get_head_commit_id(repo_root, branch) if head_path.exists() else None
        results.append(WorktreeInfo(
            name=name,
            branch=branch,
            path=pathlib.Path(record["path"]),
            head_commit=commit_id,
        ))
    return results
| 350 | |
def remove_worktree(repo_root: pathlib.Path, name: str, force: bool = False) -> None:
    """Delete a linked worktree's directory and its bookkeeping files.

    Only the working directory, the ``<name>.json`` metadata, the
    ``<name>.HEAD`` file and the ``.muse`` pointer are removed. The branch
    is left untouched, and commits made from the worktree stay in the
    shared object store.

    Args:
        repo_root: Main repository root.
        name:      Name of the worktree to remove.
        force:     Accepted for interface compatibility. Currently has no
                   effect since Muse does not track working-tree dirtiness
                   per-worktree.

    Raises:
        ValueError: If the worktree does not exist, its metadata cannot be
                    read, or ``_safe_delete_path`` refuses to delete the
                    recorded directory.
    """
    validate_branch_name(name)

    meta_file = _worktree_meta_path(repo_root, name)
    if not meta_file.exists():
        raise ValueError(f"Worktree '{name}' does not exist.")

    meta = _load_meta(repo_root, name)
    if meta is None:
        raise ValueError(f"Could not read metadata for worktree '{name}'.")

    target_dir = pathlib.Path(meta["path"])
    deleted = _safe_delete_path(repo_root, target_dir)
    if not deleted:
        raise ValueError(
            f"Refusing to delete worktree path '{target_dir}' — "
            "it is a symlink or resolves inside .muse/."
        )

    # The directory is gone; now drop the bookkeeping entries.
    meta_file.unlink(missing_ok=True)
    _worktree_head_path(repo_root, name).unlink(missing_ok=True)
    # Belt-and-suspenders: the pointer file lived inside the directory that
    # was just deleted, so this normally finds nothing to remove.
    _muse_dir(target_dir).unlink(missing_ok=True)

    logger.info("Worktree '%s' removed.", name)
| 394 | |
def prune_worktrees(repo_root: pathlib.Path, *, dry_run: bool = False) -> list[str]:
    """Remove metadata for worktrees whose directories no longer exist.

    An entry is pruned when its metadata cannot be loaded or when the
    directory it records is missing. Pruning removes both the
    ``<name>.json`` metadata file and the matching ``<name>.HEAD`` file;
    worktree directories themselves are never touched.

    Args:
        repo_root: Main repository root.
        dry_run:   When ``True``, report what would be pruned without
                   making any filesystem changes.

    Returns:
        Names of pruned (or would-be-pruned, when *dry_run* is ``True``)
        worktrees, in lexicographic order.
    """
    pruned: list[str] = []
    wt_dir = worktrees_dir(repo_root)
    if not wt_dir.exists():
        return pruned
    # sorted() snapshots the directory listing before any file is unlinked
    # and makes the result order deterministic.
    for meta_file in sorted(wt_dir.glob("*.json"), key=lambda p: p.stem):
        name = meta_file.stem
        record = _load_meta(repo_root, name)
        if record is not None and pathlib.Path(record["path"]).exists():
            continue  # Metadata is readable and the directory is still there.
        if not dry_run:
            meta_file.unlink(missing_ok=True)
            # Drop the HEAD file in both cases (unreadable metadata or
            # missing directory) so no orphaned <name>.HEAD is left behind.
            _worktree_head_path(repo_root, name).unlink(missing_ok=True)
        pruned.append(name)
    return pruned
| 426 | |
def get_worktree_status(repo_root: pathlib.Path, name: str) -> WorktreeStatusResult:
    """Report the status of one worktree, looked up by name.

    The names ``"(main)"`` and ``"main"`` both select the implicit main
    worktree; any other name is treated as a linked worktree and must have
    a metadata file.

    Returns:
        A ``WorktreeStatusResult`` carrying the branch, HEAD commit and a
        present/absent flag — ready for JSON serialisation by the CLI. For
        a linked worktree, ``present`` is ``False`` when the recorded path
        is missing or is a symlink, and ``head_commit`` is ``None`` when
        the worktree's HEAD file is missing.

    Raises:
        ValueError: If no worktree with *name* exists, or its metadata
                    cannot be read.
    """
    main_aliases = {"(main)", "main"}
    if name in main_aliases:
        # The main worktree has no metadata file; describe it from repo_root.
        main_branch = _read_main_branch(repo_root)
        main_head = get_head_commit_id(repo_root, main_branch)
        main_status: WorktreeStatusResult = {
            "name": "(main)",
            "branch": main_branch,
            "path": str(repo_root),
            "head_commit": main_head,
            "present": repo_root.exists(),
            "is_main": True,
        }
        return main_status

    validate_branch_name(name)
    if not _worktree_meta_path(repo_root, name).exists():
        raise ValueError(f"Worktree '{name}' does not exist.")

    meta = _load_meta(repo_root, name)
    if meta is None:
        raise ValueError(f"Could not read metadata for worktree '{name}'.")

    location = pathlib.Path(meta["path"])
    linked_branch = meta["branch"]
    if _worktree_head_path(repo_root, name).exists():
        linked_head = get_head_commit_id(repo_root, linked_branch)
    else:
        linked_head = None

    # A symlink standing in for the worktree directory does not count as present.
    is_present = location.exists()
    if is_present and location.is_symlink():
        is_present = False

    linked_status: WorktreeStatusResult = {
        "name": name,
        "branch": linked_branch,
        "path": str(location),
        "head_commit": linked_head,
        "present": is_present,
        "is_main": False,
    }
    return linked_status
| 474 | |
def repair_worktree_pointers(repo_root: pathlib.Path) -> list[str]:
    """Write missing ``.muse`` pointer files for all registered worktrees.

    Idempotent — safe to run multiple times. The main worktree and
    worktrees whose directory is absent are skipped.

    Returns:
        Names of the worktrees that were repaired (pointer file written or
        overwritten). A worktree for which ``_write_worktree_pointer``
        declined to write is not reported.
    """
    repaired: list[str] = []
    for wt in list_worktrees(repo_root):
        if wt.is_main:
            continue  # Main worktree has a real .muse/ store — never needs a pointer.
        wt_path = wt.path
        if not wt_path.is_dir():
            continue
        _write_worktree_pointer(wt_path, repo_root)
        pointer_path = _muse_dir(wt_path)
        # _write_worktree_pointer returns nothing and silently skips e.g. a
        # worktree whose .muse is a real directory. Only a regular file at
        # the pointer location shows that a pointer is in place.
        if not pointer_path.is_file():
            continue
        repaired.append(wt.name)
        logger.info("✅ Repaired worktree pointer: %s", pointer_path)
    return repaired
File History
1 commit
sha256:51116ec824246acde6abf729e6ba854c223dc5173eff31a645520208023b0652
refactor(bridge): comprehensive spec sweep — close all issu…
Sonnet 4.6
minor
⚠
28 days ago