gabriel / muse public
worktree.py python
491 lines 17.6 KB
Raw
sha256:51116ec824246acde6abf729e6ba854c223dc5173eff31a645520208023b0652 refactor(bridge): comprehensive spec sweep — close all issu… Sonnet 4.6 minor ⚠ breaking 28 days ago
1 """Worktree management — multiple simultaneous branch checkouts.
2
3 A *worktree* is a second (or third, …) checked-out working tree linked to
4 the same ``.muse/`` repository. Each worktree has its own branch and its own
5 working directory, so multiple agents — or multiple human engineers — can
6 work on different branches simultaneously without interfering with each other.
7
8 Layout
9 ------
10 Each linked worktree lives in a sibling directory of the repository root::
11
12 myproject/ ← main worktree (holds .muse/)
13 .muse/
14 worktrees/
15 <name>.json ← metadata for each linked worktree
16 <name>.HEAD ← HEAD ref for the linked worktree
17
18 myproject-<name>/ ← linked worktree directory
19 ← worktree working directory
20
21 The shared ``.muse/`` directory is the single source of truth for commits,
22 snapshots, objects, and branch refs. Each worktree has its own HEAD file
23 stored inside the main ``.muse/worktrees/<name>.HEAD``.
24
25 Security model
26 --------------
27 - **Name validation**: Worktree names pass through ``validate_branch_name``
28 (no path separators, no null bytes, no control characters). This ensures
29 the derived meta path and HEAD path cannot escape ``.muse/worktrees/``.
30 - **Symlink guard on meta files**: ``_load_meta`` rejects a symlink at the
31 meta file path before any read.
32 - **Size cap on meta files**: ``_load_meta`` refuses files larger than
33 ``_MAX_META_BYTES`` to guard against memory exhaustion.
- **Path safety on delete**: ``remove_worktree`` calls ``_safe_delete_path``,
  which refuses to ``rmtree`` a path that is a symlink itself or that
  resolves inside the shared ``.muse/`` store. ``prune_worktrees`` never
  deletes a worktree directory; it only unlinks stale metadata and HEAD files.
37
38 Agent concurrency
39 -----------------
40 Multiple agents can operate on separate worktrees simultaneously. Each
41 worktree's HEAD is independent; commits from one worktree appear immediately
42 in all others (they share the object store).
43 """
44
45 import json
46 import logging
47 import pathlib
48 import shutil
49 from dataclasses import dataclass
50 from typing import TypedDict
51
52 from muse.core.paths import muse_dir as _muse_dir, heads_dir as _heads_dir, ref_path as _ref_path, worktrees_dir
53 from muse.core.types import load_json_file
54 from muse.core.object_store import restore_object
55 from muse.core.io import write_text_atomic
56 from muse.core.refs import (
57 get_head_commit_id,
58 read_current_branch,
59 )
60 from muse.core.commits import read_commit
61 from muse.core.snapshots import read_snapshot
62 from muse.core.validation import contain_path, validate_branch_name
63
logger = logging.getLogger(__name__)


# Upper bound on the size of a worktree metadata file.  A larger file is
# treated as tampered or corrupt and is never read into memory.
_MAX_META_BYTES: int = 4096  # 4 KiB — far more than any legitimate record needs
69
70 # ---------------------------------------------------------------------------
71 # Types
72 # ---------------------------------------------------------------------------
73
class WorktreeRecord(TypedDict):
    """On-disk metadata describing one linked worktree.

    Serialised as JSON into the worktree's ``<name>.json`` meta file.
    """

    name: str    # worktree identifier; also the stem of the meta file
    branch: str  # branch checked out in the worktree
    path: str    # absolute path to the worktree directory
80
81 @dataclass
82 class WorktreeInfo:
83 """Runtime information about a worktree."""
84
85 name: str
86 branch: str
87 path: pathlib.Path
88 head_commit: str | None
89 is_main: bool = False
90
91 class WorktreeStatusResult(TypedDict):
92 """Machine-readable status of a single worktree."""
93
94 name: str
95 branch: str
96 path: str
97 head_commit: str | None
98 present: bool
99 is_main: bool
100
101 # ---------------------------------------------------------------------------
102 # Paths
103 # ---------------------------------------------------------------------------
104
105
def _worktree_meta_path(repo_root: pathlib.Path, name: str) -> pathlib.Path:
    """Return the location of the JSON metadata file for worktree *name*."""
    registry = worktrees_dir(repo_root)
    return registry / f"{name}.json"
108
def _worktree_head_path(repo_root: pathlib.Path, name: str) -> pathlib.Path:
    """Return the location of the HEAD file for worktree *name*."""
    registry = worktrees_dir(repo_root)
    return registry / f"{name}.HEAD"
111
def _worktree_dir(repo_root: pathlib.Path, name: str) -> pathlib.Path:
    """Return the default linked-worktree directory for *name*.

    The default is a sibling of *repo_root* named ``<repo_root.name>-<name>``.
    """
    return repo_root.parent / f"{repo_root.name}-{name}"
117
118 # ---------------------------------------------------------------------------
119 # Internal helpers
120 # ---------------------------------------------------------------------------
121
def _load_meta(repo_root: pathlib.Path, name: str) -> WorktreeRecord | None:
    """Read and validate the metadata file for *name*.

    Safety guards applied before any read:

    - **Symlink check**: a symlink at the meta path could redirect writes to
      arbitrary locations or reads to sensitive files.
    - **Size cap** (``_MAX_META_BYTES``): a tampered or corrupt meta file
      cannot be used to exhaust memory.

    Args:
        repo_root: Main repository root.
        name: Worktree name whose ``<name>.json`` file should be loaded.

    Returns:
        The parsed record, or ``None`` when the file is missing, is a
        symlink, exceeds the size cap, or cannot be turned into a record.
        Every ``None`` case other than "missing" is logged as a warning.
    """
    meta_path = _worktree_meta_path(repo_root, name)
    if not meta_path.exists():
        return None
    if meta_path.is_symlink():
        logger.warning("⚠️ Worktree meta file for %r is a symlink — ignoring", name)
        return None
    try:
        if meta_path.stat().st_size > _MAX_META_BYTES:
            logger.warning(
                "⚠️ Worktree meta file for %r exceeds size cap (%d bytes) — ignoring",
                name,
                _MAX_META_BYTES,
            )
            return None
        raw = load_json_file(meta_path)
        if raw is None:
            logger.warning("⚠️ Could not read worktree metadata for %r", name)
            return None
        return WorktreeRecord(
            name=str(raw["name"]),
            branch=str(raw["branch"]),
            path=str(raw["path"]),
        )
    except (KeyError, TypeError, ValueError, OSError) as exc:
        # TypeError covers a payload whose top-level JSON value is not an
        # object (e.g. a list or a string): subscripting it with a string key
        # must be treated as corrupt metadata rather than crash the caller.
        logger.warning("⚠️ Could not read worktree metadata for %r: %s", name, exc)
        return None
158
def _save_meta(repo_root: pathlib.Path, record: WorktreeRecord) -> None:
    """Serialise *record* as indented JSON and write it to its meta file atomically."""
    destination = _worktree_meta_path(repo_root, record["name"])
    payload = json.dumps(record, indent=2)
    write_text_atomic(destination, payload)
163
def _write_worktree_pointer(wt_dir: pathlib.Path, repo_root: pathlib.Path) -> None:
    """Create the ``.muse`` pointer file that links *wt_dir* to the shared store.

    The pointer is a one-line text file::

        musestore: /absolute/path/to/main/.muse

    It plays the same role as git's ``.git`` worktree file: it lets
    ``find_repo_root`` locate the shared object store from inside a linked
    worktree.

    Nothing is written when ``.muse`` inside *wt_dir* is already a directory
    (a real store must never be clobbered) or when *wt_dir* is the main
    repository itself (the pointer would reference itself).
    """
    pointer_file = _muse_dir(wt_dir)
    if pointer_file.is_dir():
        # A directory here is a real store, not a pointer — leave it alone.
        logger.debug("Skipping pointer write — %s is already a store directory", pointer_file)
        return

    shared_store = _muse_dir(repo_root).resolve()
    if shared_store != pointer_file.resolve():
        write_text_atomic(pointer_file, f"musestore: {shared_store}\n")
        logger.debug("Wrote worktree pointer %s → %s", pointer_file, shared_store)
    else:
        # wt_dir is the main repo root: a pointer would be self-referential.
        logger.debug("Skipping pointer write — wt_dir is the main repo root")
189
def _safe_delete_path(repo_root: pathlib.Path, path: pathlib.Path) -> bool:
    """Delete *path* and its contents, with safety guards.

    Refuses deletion when:

    - *path* is a symlink (could target an unrelated directory).
    - *path* cannot be resolved.
    - *path* resolves to the shared ``.muse/`` store or somewhere inside it —
      deleting it would corrupt the repository.
    - *path* resolves to a directory that *contains* the shared ``.muse/``
      store (the repository root or any of its ancestors) — ``rmtree`` would
      take the store down with it.

    A *path* that does not exist is a no-op.

    Args:
        repo_root: Main repository root (where ``.muse/`` lives).
        path: Directory to delete; typically read from worktree metadata and
            therefore not trusted.

    Returns:
        ``True`` if the directory was deleted or did not exist.
        ``False`` if the deletion was refused for safety reasons.
    """
    if not path.exists():
        return True
    if path.is_symlink():
        logger.warning(
            "⚠️ Refusing to delete worktree path %s — it is a symlink", path
        )
        return False

    store = _muse_dir(repo_root).resolve()
    try:
        target = path.resolve()
    except OSError:
        logger.warning("⚠️ Could not resolve worktree path %s", path)
        return False

    if target.is_relative_to(store):
        # The target is the store itself or lives inside it.
        logger.warning(
            "⚠️ Refusing to delete worktree path %s — it resolves inside .muse/", path
        )
        return False
    if store.is_relative_to(target):
        # The target is an ancestor of the store (e.g. the repo root):
        # removing it recursively would delete .muse/ as well.
        logger.warning(
            "⚠️ Refusing to delete worktree path %s — it contains the shared .muse/ store",
            path,
        )
        return False

    shutil.rmtree(path)
    return True
228
def _read_main_branch(repo_root: pathlib.Path) -> str:
    """Return the branch currently checked out in the main worktree."""
    current_branch = read_current_branch(repo_root)
    return current_branch
231
232 # ---------------------------------------------------------------------------
233 # Public API
234 # ---------------------------------------------------------------------------
235
def _populate_worktree(
    repo_root: pathlib.Path, wt_dir: pathlib.Path, branch: str
) -> None:
    """Restore the files of *branch*'s latest snapshot into *wt_dir*."""
    commit_id = get_head_commit_id(repo_root, branch)
    if not commit_id:
        return  # Branch has no commit yet — the worktree starts empty.

    commit = read_commit(repo_root, commit_id)
    if not commit:
        logger.warning(
            "⚠️ Commit %s of branch %r could not be read — worktree left empty",
            commit_id,
            branch,
        )
        return

    snap = read_snapshot(repo_root, commit.snapshot_id)
    if not snap:
        logger.warning(
            "⚠️ Snapshot %s of branch %r could not be read — worktree left empty",
            commit.snapshot_id,
            branch,
        )
        return

    for rel_path, object_id in snap.manifest.items():
        try:
            # contain_path rejects manifest entries that would land outside wt_dir.
            dest = contain_path(wt_dir, rel_path)
        except ValueError as exc:
            logger.warning("⚠️ Skipping unsafe path %r: %s", rel_path, exc)
            continue
        restore_object(repo_root, object_id, dest)


def add_worktree(
    repo_root: pathlib.Path,
    name: str,
    branch: str,
    path: pathlib.Path | None = None,
) -> pathlib.Path:
    """Create and populate a new linked worktree.

    The worktree directory is created, a ``.muse`` pointer file and the
    worktree HEAD file are written, the branch's latest snapshot is restored
    into the directory, and finally the metadata record is persisted.

    If any step after the directory has been created fails, everything this
    call created (directory, HEAD file, metadata file) is removed again before
    the exception propagates, so a retry is not blocked by leftovers.

    Args:
        repo_root: Main repository root (where ``.muse/`` lives).
        name: Short identifier for the worktree (validated like a branch
            name — no path separators, no null bytes).
        branch: Branch to check out in the new worktree.
        path: Explicit filesystem path for the worktree directory. When
            ``None`` (default) the standard sibling layout is used:
            ``<repo_root.parent>/<repo_root.name>-<name>``.

    Returns:
        The path to the newly created worktree directory.

    Raises:
        ValueError: If the name is invalid, the worktree already exists, or
            the branch does not exist.
    """
    validate_branch_name(name)

    wt_dir = path if path is not None else _worktree_dir(repo_root, name)
    meta_path = _worktree_meta_path(repo_root, name)

    if meta_path.exists():
        raise ValueError(f"Worktree '{name}' already exists.")
    if wt_dir.exists():
        raise ValueError(f"Directory '{wt_dir}' already exists.")

    # Verify the branch exists.
    if not _ref_path(repo_root, branch).exists():
        raise ValueError(f"Branch '{branch}' does not exist.")

    head_path = _worktree_head_path(repo_root, name)

    # No exist_ok: mkdir raises if the directory appeared since the check
    # above, so everything under wt_dir from here on was created by this call
    # and is safe to remove on failure.
    wt_dir.mkdir(parents=True)
    try:
        # Write .muse pointer file so `muse` commands work inside the worktree.
        _write_worktree_pointer(wt_dir, repo_root)

        # Write the worktree HEAD file.
        write_text_atomic(head_path, f"refs/heads/{branch}\n")

        # Populate the worktree from the branch's latest snapshot.
        _populate_worktree(repo_root, wt_dir, branch)

        # Persist metadata last: its presence marks the worktree as registered.
        record: WorktreeRecord = {
            "name": name,
            "branch": branch,
            "path": str(wt_dir),
        }
        _save_meta(repo_root, record)
    except BaseException:
        # Roll back the half-built worktree, then re-raise the original error.
        for leftover in (head_path, meta_path):
            try:
                leftover.unlink(missing_ok=True)
            except OSError as cleanup_exc:
                logger.warning(
                    "⚠️ Could not remove %s during rollback: %s", leftover, cleanup_exc
                )
        shutil.rmtree(wt_dir, ignore_errors=True)
        raise

    logger.info("✅ Worktree '%s' created at %s (branch: %s)", name, wt_dir, branch)
    return wt_dir
310
def list_worktrees(repo_root: pathlib.Path) -> list[WorktreeInfo]:
    """Return all worktrees (main + linked), sorted by name.

    The main worktree is always first; linked worktrees follow in
    lexicographic order of name. Linked worktrees whose metadata cannot be
    loaded are skipped.

    For a linked worktree, ``head_commit`` is the tip of the branch recorded
    in its metadata, or ``None`` when its HEAD file is missing.

    Args:
        repo_root: Main repository root.

    Returns:
        One ``WorktreeInfo`` per worktree, main worktree first.
    """
    # Main worktree.
    main_branch = _read_main_branch(repo_root)
    results: list[WorktreeInfo] = [
        WorktreeInfo(
            name="(main)",
            branch=main_branch,
            path=repo_root,
            head_commit=get_head_commit_id(repo_root, main_branch),
            is_main=True,
        )
    ]

    registry = worktrees_dir(repo_root)
    if not registry.exists():
        return results

    # Sort on the worktree name (the file stem), not on the full path:
    # comparing paths includes the ".json" suffix, which would list "fix-2"
    # before "fix" because "-" sorts before ".".
    for meta_file in sorted(registry.glob("*.json"), key=lambda p: p.stem):
        name = meta_file.stem
        record = _load_meta(repo_root, name)
        if record is None:
            continue
        branch = record["branch"]
        head_path = _worktree_head_path(repo_root, name)
        commit_id = get_head_commit_id(repo_root, branch) if head_path.exists() else None
        results.append(
            WorktreeInfo(
                name=name,
                branch=branch,
                path=pathlib.Path(record["path"]),
                head_commit=commit_id,
            )
        )
    return results
350
def remove_worktree(repo_root: pathlib.Path, name: str, force: bool = False) -> None:
    """Delete a linked worktree's directory, metadata and HEAD file.

    Only the checkout is removed. The branch is left untouched, and commits
    made from the worktree stay in the shared object store.

    Args:
        repo_root: Main repository root.
        name: Name of the worktree to remove.
        force: Accepted for interface compatibility. Currently has no
            effect since Muse does not track working-tree dirtiness
            per-worktree.

    Raises:
        ValueError: If the worktree does not exist, its metadata cannot be
            read, or the recorded directory fails the delete-safety checks.
    """
    validate_branch_name(name)

    meta_file = _worktree_meta_path(repo_root, name)
    if not meta_file.exists():
        raise ValueError(f"Worktree '{name}' does not exist.")

    record = _load_meta(repo_root, name)
    if record is None:
        raise ValueError(f"Could not read metadata for worktree '{name}'.")

    target_dir = pathlib.Path(record["path"])
    deleted = _safe_delete_path(repo_root, target_dir)
    if not deleted:
        raise ValueError(
            f"Refusing to delete worktree path '{target_dir}' — "
            "it is a symlink or resolves inside .muse/."
        )

    # The directory is gone; now drop the bookkeeping kept in the main store.
    meta_file.unlink(missing_ok=True)
    _worktree_head_path(repo_root, name).unlink(missing_ok=True)
    # Belt-and-suspenders: the pointer file normally disappears together with
    # the directory, but remove it explicitly in case it is still there.
    _muse_dir(target_dir).unlink(missing_ok=True)

    logger.info("Worktree '%s' removed.", name)
394
def prune_worktrees(repo_root: pathlib.Path, *, dry_run: bool = False) -> list[str]:
    """Remove bookkeeping for worktrees that are no longer usable.

    A worktree is pruned when its metadata cannot be loaded, or when the
    directory recorded in its metadata no longer exists. Pruning removes both
    the ``<name>.json`` metadata file and the ``<name>.HEAD`` file; worktree
    directories themselves are never touched.

    Args:
        repo_root: Main repository root.
        dry_run: When ``True``, report what would be pruned without
            making any filesystem changes.

    Returns:
        Names of pruned (or would-be-pruned, when *dry_run* is ``True``)
        worktrees, in lexicographic order.
    """
    pruned: list[str] = []
    registry = worktrees_dir(repo_root)
    if not registry.exists():
        return pruned

    # sorted() materialises the listing before any file is unlinked and makes
    # the returned order deterministic.
    for meta_file in sorted(registry.glob("*.json"), key=lambda p: p.stem):
        name = meta_file.stem
        record = _load_meta(repo_root, name)
        if record is not None and pathlib.Path(record["path"]).exists():
            continue  # Healthy worktree — keep it.
        if not dry_run:
            meta_file.unlink(missing_ok=True)
            # Drop the HEAD file in the corrupt-metadata case too; otherwise
            # it is orphaned with no metadata left to lead back to it.
            _worktree_head_path(repo_root, name).unlink(missing_ok=True)
        pruned.append(name)
    return pruned
426
def get_worktree_status(repo_root: pathlib.Path, name: str) -> WorktreeStatusResult:
    """Report the status of one worktree, looked up by name.

    The names ``"(main)"`` and ``"main"`` both select the implicit main
    worktree; any other name is looked up among the linked worktrees.

    Returns:
        A ``WorktreeStatusResult`` with present/absent flag, current branch,
        and HEAD commit — ready for JSON serialisation by the CLI.

    Raises:
        ValueError: If no worktree with *name* exists, or its metadata
            cannot be read.
    """
    if name == "(main)" or name == "main":
        # The main worktree has no metadata file; describe it directly.
        main_branch = _read_main_branch(repo_root)
        return WorktreeStatusResult(
            name="(main)",
            branch=main_branch,
            path=str(repo_root),
            head_commit=get_head_commit_id(repo_root, main_branch),
            present=repo_root.exists(),
            is_main=True,
        )

    validate_branch_name(name)
    if not _worktree_meta_path(repo_root, name).exists():
        raise ValueError(f"Worktree '{name}' does not exist.")

    record = _load_meta(repo_root, name)
    if record is None:
        raise ValueError(f"Could not read metadata for worktree '{name}'.")

    location = pathlib.Path(record["path"])
    recorded_branch = record["branch"]

    # Without a HEAD file the worktree has no resolvable tip commit.
    if _worktree_head_path(repo_root, name).exists():
        tip = get_head_commit_id(repo_root, recorded_branch)
    else:
        tip = None

    # A symlink at the recorded location does not count as a present worktree.
    is_present = location.exists() and not location.is_symlink()

    return WorktreeStatusResult(
        name=name,
        branch=recorded_branch,
        path=str(location),
        head_commit=tip,
        present=is_present,
        is_main=False,
    )
474
def repair_worktree_pointers(repo_root: pathlib.Path) -> list[str]:
    """Rewrite the ``.muse`` pointer file of every registered linked worktree.

    Safe to run repeatedly. Worktrees whose directory is missing are skipped,
    and the main worktree is never touched because it holds the real
    ``.muse/`` store.

    Returns:
        Names of the linked worktrees that were processed, in listing order.
    """
    repaired: list[str] = []
    linked = (wt for wt in list_worktrees(repo_root) if not wt.is_main)
    for wt in linked:
        target_dir = wt.path
        if target_dir.is_dir():
            _write_worktree_pointer(target_dir, repo_root)
            repaired.append(wt.name)
            logger.info("✅ Repaired worktree pointer: %s", _muse_dir(target_dir))
    return repaired
File History 1 commit
sha256:51116ec824246acde6abf729e6ba854c223dc5173eff31a645520208023b0652 refactor(bridge): comprehensive spec sweep — close all issu… Sonnet 4.6 minor 28 days ago