gabriel / muse public
config.py python
1,351 lines 48.5 KB
Raw
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor ⚠ breaking 28 days ago
1 """Muse CLI configuration helpers.
2
3 Reads and writes ``.muse/config.toml`` — the per-repository configuration
4 file. Credentials and user identity are **not** stored here; they live in
5 ``~/.muse/identity.toml`` managed by :mod:`muse.core.identity`.
6
7 Config schema
8 -------------
9 ::
10
11 [hub]
12 url = "https://musehub.ai" # MuseHub fabric endpoint for this repo
13
14 [remotes.origin]
15 url = "https://hub.muse.io/repos/my-repo"
16 branch = "main"
17
18 [domain]
19 # Domain-specific key/value pairs; read by the active domain plugin.
20 # ticks_per_beat = "480"
21
22 Settable via ``muse config set``
23 ---------------------------------
24 - ``hub.url`` (alias: ``muse hub connect <url>``)
25 - ``domain.*``
26
27 Not settable via ``muse config set``
28 --------------------------------------
29 - ``user.*`` — use ``muse auth register`` / ``muse auth whoami``
30 - ``remotes.*`` — use ``muse remote add/remove``
31 - credentials — use ``muse auth register``
32
33 Token resolution
34 ----------------
35 :func:`get_signing_identity` reads the hub URL from this file, then resolves the
36 signing identity from ``~/.muse/identity.toml`` via
37 :func:`muse.core.identity.resolve_token`. The token is **never** logged.
38 """
39
40 import fnmatch
41 import logging
42 import pathlib
43 import shutil
44 import subprocess
45 import tomllib
46 from typing import TypedDict
47
48 from muse.core.types import short_id
49 from muse.core.paths import config_toml_path as _config_toml_path, user_muse_dir as _user_muse_dir, user_config_toml_path as _user_config_toml_path, remote_tracking_dir as _remote_tracking_dir, remote_ref_path as _remote_ref_path
50 from muse.core.refs import read_ref
51 from muse.core.store import write_text_atomic
52
53 logger = logging.getLogger(__name__)
54
55 type RemotesMap = dict[str, RemoteEntry] # remote_name → remote entry
56 type DomainConfig = dict[str, str] # domain key → value
57 type ConfigSection = dict[str, str] # generic flattened section dict
58 type ConfigTree = dict[str, ConfigSection] # section → key → value (for JSON output)
59 type DefaultsMap = dict[str, int] # config key → default int value
60 type _SecurityConfig = dict[str, list[str]] # security section from global config
61
62 # ---------------------------------------------------------------------------
63 # Named configuration types
64 # ---------------------------------------------------------------------------
65
66 class HubConfig(TypedDict, total=False):
67 """``[hub]`` section in ``.muse/config.toml``."""
68
69 url: str
70
71 class RemoteEntry(TypedDict, total=False):
72 """``[remotes.<name>]`` section in ``.muse/config.toml``."""
73
74 url: str
75 branch: str
76 promisor: bool # when False, this remote is not used as a promisor for missing objects
77
78 class LimitsConfig(TypedDict, total=False):
79 """``[limits]`` section in ``.muse/config.toml``.
80
81 All values are optional — defaults are used when absent. Keys map to
82 the ``[limits]`` TOML table::
83
84 [limits]
85 max_walk_commits = 10000 # cap for walk_commits_between / muse log
86 max_ancestors = 50000 # cap for find_merge_base BFS
87 max_graph_commits = 50000 # cap for _collect_all_commits (--graph --all)
88 shard_prefix_length = 2 # object store shard depth: 2 (256 shards)
89 # or 4 (65536 shards) for very large repos
90 """
91
92 max_walk_commits: int
93 max_ancestors: int
94 max_graph_commits: int
95 shard_prefix_length: int
96
97 class CommitConfig(TypedDict, total=False):
98 """``[commit]`` section in ``.muse/config.toml``."""
99
100 sign: bool
101
102
103 class BranchMeta(TypedDict, total=False):
104 """Per-branch metadata stored under ``[branch."<name>"]`` in config.toml.
105
106 Fields written by ``muse branch --intent / --resumable``::
107
108 [branch."feat/my-thing"]
109 intent = "refactor auth layer"
110 resumable = true
111
112 Fields written by ``muse push`` upstream tracking (preserved on read/write)::
113
114 remote = "origin"
115 merge = "refs/heads/feat/my-thing"
116 """
117
118 intent: str # short description of what this branch is for
119 resumable: bool # true when this branch is a resumable agent checkpoint
120 remote: str # upstream remote name (e.g. "origin")
121 merge: str # upstream merge ref (e.g. "refs/heads/main")
122
123 class MuseConfig(TypedDict, total=False):
124 """Structured view of the entire ``.muse/config.toml`` file."""
125
126 hub: HubConfig
127 remotes: RemotesMap
128 domain: DomainConfig
129 limits: LimitsConfig
130 commit: CommitConfig
131 branch: "dict[str, BranchMeta]" # branch_name → per-branch metadata
132 protected_branches: "list[str]" # fnmatch patterns from [protected_branches]
133
134 class RemoteConfig(TypedDict, total=False):
135 """Public-facing remote descriptor returned by :func:`list_remotes`."""
136
137 name: str # always present
138 url: str # always present
139
140 # ---------------------------------------------------------------------------
141 # Internal helpers
142 # ---------------------------------------------------------------------------
143
144 def _config_path(repo_root: pathlib.Path | None) -> pathlib.Path:
145 root = (repo_root or pathlib.Path.cwd()).resolve()
146 return _config_toml_path(root)
147
148 def _load_config(config_path: pathlib.Path) -> MuseConfig:
149 """Load and parse config.toml; return an empty MuseConfig if absent."""
150 if not config_path.is_file():
151 return {}
152
153 try:
154 with config_path.open("rb") as fh:
155 raw = tomllib.load(fh)
156 except Exception as exc: # noqa: BLE001
157 logger.warning("⚠️ Failed to parse %s: %s", config_path, exc)
158 return {}
159
160 config: MuseConfig = {}
161
162 hub_raw = raw.get("hub")
163 if isinstance(hub_raw, dict):
164 hub: HubConfig = {}
165 url_val = hub_raw.get("url")
166 if isinstance(url_val, str):
167 hub["url"] = url_val
168 config["hub"] = hub
169
170 remotes_raw = raw.get("remotes")
171 if isinstance(remotes_raw, dict):
172 remotes: RemotesMap = {}
173 for name, remote_raw in remotes_raw.items():
174 if isinstance(remote_raw, dict):
175 entry: RemoteEntry = {}
176 rurl = remote_raw.get("url")
177 if isinstance(rurl, str):
178 entry["url"] = rurl
179 branch_val = remote_raw.get("branch")
180 if isinstance(branch_val, str):
181 entry["branch"] = branch_val
182 promisor_val = remote_raw.get("promisor")
183 if isinstance(promisor_val, bool):
184 entry["promisor"] = promisor_val
185 remotes[name] = entry
186 config["remotes"] = remotes
187
188 domain_raw = raw.get("domain")
189 if isinstance(domain_raw, dict):
190 domain: DomainConfig = {}
191 for key, val in domain_raw.items():
192 if isinstance(val, str):
193 domain[key] = val
194 config["domain"] = domain
195
196 limits_raw = raw.get("limits")
197 if isinstance(limits_raw, dict):
198 limits: LimitsConfig = {}
199 mwc = limits_raw.get("max_walk_commits")
200 if isinstance(mwc, int) and mwc > 0:
201 limits["max_walk_commits"] = mwc
202 ma = limits_raw.get("max_ancestors")
203 if isinstance(ma, int) and ma > 0:
204 limits["max_ancestors"] = ma
205 mgc = limits_raw.get("max_graph_commits")
206 if isinstance(mgc, int) and mgc > 0:
207 limits["max_graph_commits"] = mgc
208 spl = limits_raw.get("shard_prefix_length")
209 if isinstance(spl, int) and spl in (2, 4):
210 limits["shard_prefix_length"] = spl
211 config["limits"] = limits
212
213 commit_raw = raw.get("commit")
214 if isinstance(commit_raw, dict):
215 commit_cfg: CommitConfig = {}
216 sign_v = commit_raw.get("sign")
217 if isinstance(sign_v, bool):
218 commit_cfg["sign"] = sign_v
219 if commit_cfg:
220 config["commit"] = commit_cfg
221
222 branch_raw = raw.get("branch")
223 if isinstance(branch_raw, dict):
224 branch_map: dict[str, BranchMeta] = {}
225 for bname, bdata in branch_raw.items():
226 if not isinstance(bdata, dict):
227 continue
228 bmeta: BranchMeta = {}
229 intent_v = bdata.get("intent")
230 if isinstance(intent_v, str):
231 bmeta["intent"] = intent_v
232 resumable_v = bdata.get("resumable")
233 if isinstance(resumable_v, bool):
234 bmeta["resumable"] = resumable_v
235 remote_v = bdata.get("remote")
236 if isinstance(remote_v, str):
237 bmeta["remote"] = remote_v
238 merge_v = bdata.get("merge")
239 if isinstance(merge_v, str):
240 bmeta["merge"] = merge_v
241 branch_map[bname] = bmeta
242 if branch_map:
243 config["branch"] = branch_map
244
245 pb_raw = raw.get("protected_branches")
246 if isinstance(pb_raw, dict):
247 branches_val = pb_raw.get("branches")
248 if isinstance(branches_val, list):
249 patterns = [p for p in branches_val if isinstance(p, str)]
250 config["protected_branches"] = patterns
251
252 return config
253
254 def _escape(value: str) -> str:
255 """Escape a TOML basic string value (backslash and double-quote only).
256
257 TOML basic strings allow control characters escaped as ``\\n``, ``\\t``,
258 etc., but we store only printable content — control characters in values
259 are also stripped here so the resulting TOML file remains parseable.
260 """
261 return (
262 value.replace("\\", "\\\\")
263 .replace('"', '\\"')
264 .replace("\n", "\\n")
265 .replace("\r", "\\r")
266 .replace("\0", "")
267 )
268
269 # Characters that are structurally significant in unquoted TOML keys and
270 # table headers. Any of these in a key name would allow injection of
271 # arbitrary TOML sections or key-value pairs.
272 _TOML_KEY_UNSAFE: frozenset[str] = frozenset('\n\r\0][="')
273
274 def _validate_toml_key(key: str, context: str = "key") -> None:
275 """Raise ``ValueError`` if *key* contains TOML-structurally unsafe characters.
276
277 Prevents injection attacks where a crafted key like ``x]\\n[injected`` would
278 break the TOML section structure and allow writing arbitrary sections.
279
280 Args:
281 key: Key string to validate.
282 context: Human-readable label used in the error message (e.g. ``"domain key"``).
283
284 Raises:
285 ValueError: If any character in *key* is in ``_TOML_KEY_UNSAFE``.
286 """
287 bad = _TOML_KEY_UNSAFE & set(key)
288 if bad:
289 chars = ", ".join(sorted(repr(c) for c in bad))
290 raise ValueError(
291 f"Config {context} {key!r} contains characters not allowed in TOML keys: {chars}"
292 )
293
294 def _dump_toml(config: MuseConfig) -> str:
295 """Serialise a MuseConfig to TOML text.
296
297 Section order: ``[hub]``, ``[remotes.*]``, ``[domain]``, ``[limits]``.
298
299 All key names are validated against ``_TOML_KEY_UNSAFE`` before being
300 written, preventing TOML injection via crafted domain keys or remote names.
301 """
302 lines: list[str] = []
303
304 hub = config.get("hub")
305 if hub:
306 lines.append("[hub]")
307 url = hub.get("url", "")
308 if url:
309 lines.append(f'url = "{_escape(url)}"')
310 lines.append("")
311
312 remotes = config.get("remotes") or {}
313 for remote_name in sorted(remotes):
314 # Remote names come from _load_config which parses TOML, so they are
315 # safe at read time. Validate defensively before writing.
316 _validate_toml_key(remote_name, "remote name")
317 entry = remotes[remote_name]
318 lines.append(f"[remotes.{remote_name}]")
319 rurl = entry.get("url", "")
320 if rurl:
321 lines.append(f'url = "{_escape(rurl)}"')
322 branch = entry.get("branch", "")
323 if branch:
324 lines.append(f'branch = "{_escape(branch)}"')
325 if "promisor" in entry:
326 lines.append(f'promisor = {"true" if entry["promisor"] else "false"}')
327 lines.append("")
328
329 domain = config.get("domain") or {}
330 if domain:
331 lines.append("[domain]")
332 for key, val in sorted(domain.items()):
333 _validate_toml_key(key, "domain key")
334 lines.append(f'{key} = "{_escape(val)}"')
335 lines.append("")
336
337 limits = config.get("limits") or {}
338 if limits:
339 lines.append("[limits]")
340 mwc = limits.get("max_walk_commits")
341 if mwc is not None:
342 lines.append(f"max_walk_commits = {mwc}")
343 ma = limits.get("max_ancestors")
344 if ma is not None:
345 lines.append(f"max_ancestors = {ma}")
346 mgc = limits.get("max_graph_commits")
347 if mgc is not None:
348 lines.append(f"max_graph_commits = {mgc}")
349 spl = limits.get("shard_prefix_length")
350 if spl is not None:
351 lines.append(f"shard_prefix_length = {spl}")
352 lines.append("")
353
354 commit_cfg = config.get("commit") or {}
355 if commit_cfg:
356 lines.append("[commit]")
357 if "sign" in commit_cfg:
358 lines.append(f"sign = {'true' if commit_cfg['sign'] else 'false'}")
359 lines.append("")
360
361 branch_sections = config.get("branch") or {}
362 for bname in sorted(branch_sections):
363 _validate_toml_key(bname, "branch name")
364 bmeta = branch_sections[bname]
365 # Skip empty metadata dicts — no section needed.
366 if not bmeta:
367 continue
368 # Branch names require quoted keys (may contain '/' and other chars
369 # that are not valid in bare TOML keys).
370 lines.append(f'[branch."{_escape(bname)}"]')
371 intent = bmeta.get("intent", "")
372 if intent:
373 lines.append(f'intent = "{_escape(intent)}"')
374 if "resumable" in bmeta:
375 lines.append(f"resumable = {'true' if bmeta['resumable'] else 'false'}")
376 remote = bmeta.get("remote", "")
377 if remote:
378 lines.append(f'remote = "{_escape(remote)}"')
379 merge = bmeta.get("merge", "")
380 if merge:
381 lines.append(f'merge = "{_escape(merge)}"')
382 lines.append("")
383
384 return "\n".join(lines)
385
386 # ---------------------------------------------------------------------------
387 # Auth token resolution (via identity store)
388 # ---------------------------------------------------------------------------
389
390 def get_signing_identity(
391 repo_root: pathlib.Path | None = None,
392 remote_url: str | None = None,
393 agent_id: str | None = None,
394 ) -> "object | None":
395 """Return a :class:`~muse.core.transport.SigningIdentity` for a hub, or ``None``.
396
397 Resolution order:
398 1. ``MUSE_AGENT_KEY_FD`` environment variable — integer file descriptor
399 from which exactly 64 bytes of sub-seed are read (then the fd is
400 closed). The Ed25519 identity key is derived via
401 :func:`~muse.core.hdkeys.derive_identity_key`. The handle is taken
402 from ``MUSE_AGENT_HANDLE`` (defaults to *agent_id* if set, else
403 ``"agent"``). This is the only supported env-based injection mechanism;
404 the secret travels through the kernel pipe buffer and never appears in
405 ``/proc/<pid>/environ``.
406 2. Agent-specific entry in ``~/.muse/identity.toml`` keyed by
407 ``"hostname#agent_id"`` — when *agent_id* is provided.
408 3. Human entry in ``~/.muse/identity.toml`` keyed by bare hostname.
409 4. Hub URL from ``[hub] url`` in ``.muse/config.toml`` (fallback lookup
410 URL when *remote_url* is not supplied).
411
412 The private key is **never** logged.
413
414 Args:
415 repo_root: Repository root. Defaults to ``Path.cwd()``.
416 remote_url: URL of the specific remote being contacted.
417 agent_id: Agent handle, e.g. ``"agentception-abc123"``. Used to
418 try an agent-specific key before falling back to the
419 human key.
420
421 Returns:
422 :class:`~muse.core.transport.SigningIdentity` or ``None``.
423 """
424 import os as _os
425 from muse.core.identity import resolve_signing_identity # avoid circular import
426 from muse.core.transport import SigningIdentity
427
428 # 1. MUSE_AGENT_KEY_FD — read 64-byte sub-seed from a pipe fd.
429 # This is the only supported env-var injection mechanism.
430 # The secret travels through the kernel pipe buffer and never appears
431 # in /proc/<pid>/environ.
432 key_fd_str = _os.environ.get("MUSE_AGENT_KEY_FD", "").strip()
433 if key_fd_str:
434 try:
435 key_fd = int(key_fd_str)
436 import os as _os2
437 sub_seed = bytearray(_os2.read(key_fd, 64))
438 _os2.close(key_fd)
439 if len(sub_seed) == 64:
440 from muse.core.hdkeys import derive_identity_key
441 dk = derive_identity_key(sub_seed)
442 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
443 private_key = Ed25519PrivateKey.from_private_bytes(dk.private_bytes)
444 dk.zero()
445 sub_seed[:] = b"\x00" * len(sub_seed)
446 handle = (
447 _os.environ.get("MUSE_AGENT_HANDLE", "").strip()
448 or agent_id
449 or "agent"
450 )
451 logger.debug("✅ Signing identity from MUSE_AGENT_KEY_FD (handle=%s)", handle)
452 return SigningIdentity(handle=handle, private_key=private_key)
453 logger.warning(
454 "⚠️ MUSE_AGENT_KEY_FD fd=%s yielded %d bytes (expected 64) — falling through",
455 key_fd_str, len(sub_seed),
456 )
457 except Exception as exc:
458 logger.warning("⚠️ MUSE_AGENT_KEY_FD could not be read: %s — falling through", exc)
459
460 # 2. Identity store lookup (agent key → human key fallback).
461 lookup_url: str | None = remote_url or get_hub_url(repo_root)
462 if lookup_url is None:
463 logger.debug("⚠️ No hub configured — skipping signing identity lookup")
464 return None
465
466 result = resolve_signing_identity(lookup_url, agent_id=agent_id)
467 if result is None:
468 logger.debug(
469 "⚠️ No signing identity for hub %s — run `muse auth keygen && muse auth register`",
470 lookup_url,
471 )
472 return None
473
474 handle, private_key = result
475 logger.debug("✅ Signing identity resolved for hub %s (handle=%s)", lookup_url, handle)
476 return SigningIdentity(handle=handle, private_key=private_key)
477
478 # ---------------------------------------------------------------------------
479 # Hub helpers
480 # ---------------------------------------------------------------------------
481
482 def get_hub_url(repo_root: pathlib.Path | None = None) -> str | None:
483 """Return the hub URL from ``[hub] url``, or ``None`` if not configured.
484
485 Resolution order:
486 1. ``<repo>/.muse/config.toml`` — repo-local config (highest priority).
487 2. ``~/.muse/config.toml`` — global user config (fallback).
488
489 This fallback ensures ``muse auth whoami`` and other hub-aware commands
490 work without ``--hub`` even when invoked outside a repository, as long as
491 the user has set a default hub in their global config.
492
493 Args:
494 repo_root: Repository root. Defaults to ``Path.cwd()``.
495
496 Returns:
497 URL string, or ``None``.
498 """
499 config = _load_config(_config_path(repo_root))
500 hub = config.get("hub")
501 if hub is not None:
502 url = hub.get("url", "")
503 if url.strip():
504 return url.strip()
505
506 # Fall back to ~/.muse/config.toml so hub-aware commands (e.g. `muse auth
507 # whoami`) work without --hub when invoked outside a repository.
508 global_config = _load_config(_GLOBAL_CONFIG_FILE)
509 global_hub = global_config.get("hub")
510 if global_hub is not None:
511 url = global_hub.get("url", "")
512 if url.strip():
513 return url.strip()
514
515 return None
516
517 def set_hub_url(url: str, repo_root: pathlib.Path | None = None) -> None:
518 """Write ``[hub] url`` to ``.muse/config.toml``.
519
520 Preserves all other sections. Creates the config file if absent.
521 Rejects ``http://`` URLs — Muse never contacts a hub over cleartext HTTP.
522
523 Args:
524 url: Hub URL (must be ``https://``).
525 repo_root: Repository root. Defaults to ``Path.cwd()``.
526
527 Raises:
528 ValueError: If *url* does not use the ``https://`` scheme.
529 """
530 _is_loopback = url.startswith("http://localhost") or url.startswith("http://127.0.0.1") or url.startswith("http://[::1]")
531 if not url.startswith("https://") and not _is_loopback:
532 raise ValueError(
533 f"Hub URL must use HTTPS. Got: {url!r}\n"
534 "Muse never connects to a hub over cleartext HTTP.\n"
535 "(Exception: http://localhost and http://127.0.0.1 are allowed for local development.)"
536 )
537 cp = _config_path(repo_root)
538 cp.parent.mkdir(parents=True, exist_ok=True)
539 config = _load_config(cp)
540 config["hub"] = HubConfig(url=url)
541 write_text_atomic(cp, _dump_toml(config))
542 logger.info("✅ Hub URL set to %s", url)
543
544 def clear_hub_url(repo_root: pathlib.Path | None = None) -> None:
545 """Remove the ``[hub]`` section from ``.muse/config.toml``.
546
547 Args:
548 repo_root: Repository root. Defaults to ``Path.cwd()``.
549 """
550 cp = _config_path(repo_root)
551 config = _load_config(cp)
552 if "hub" in config:
553 del config["hub"]
554 write_text_atomic(cp, _dump_toml(config))
555 logger.info("✅ Hub disconnected")
556
557 # ---------------------------------------------------------------------------
558 # Generic dotted-key helpers
559 # ---------------------------------------------------------------------------
560
561 _BlockedNS = dict[str, str]
562 _BLOCKED_NAMESPACES: _BlockedNS = {
563 "auth": "Use `muse auth keygen` and `muse auth register` to manage credentials.",
564 "remotes": "Use `muse remote add/remove/rename` to manage remotes.",
565 "user": "User identity is managed via `muse auth register`. Run `muse auth whoami` to inspect.",
566 }
567
568 _SETTABLE_NAMESPACES = {"hub", "domain", "limits", "commit"}
569
570 # Default cap values — used when the [limits] section is absent or the key
571 # is not set. These are the same values that were previously hardcoded inside
572 # the individual functions.
573 _DEFAULT_MAX_WALK_COMMITS: int = 10_000
574 _DEFAULT_MAX_ANCESTORS: int = 50_000
575 _DEFAULT_MAX_GRAPH_COMMITS: int = 50_000
576 _DEFAULT_SHARD_PREFIX_LENGTH: int = 2
577
578 def get_limit(key: str, repo_root: pathlib.Path | None = None) -> int:
579 """Return a ``[limits]`` integer cap from config, or its default.
580
581 Args:
582 key: Limit key — one of ``max_walk_commits``, ``max_ancestors``,
583 ``max_graph_commits``.
584 repo_root: Repository root; ``None`` falls back to ``Path.cwd()``.
585
586 Returns:
587 Configured integer value, or the built-in default if not set.
588 """
589 defaults: DefaultsMap = {
590 "max_walk_commits": _DEFAULT_MAX_WALK_COMMITS,
591 "max_ancestors": _DEFAULT_MAX_ANCESTORS,
592 "max_graph_commits": _DEFAULT_MAX_GRAPH_COMMITS,
593 "shard_prefix_length": _DEFAULT_SHARD_PREFIX_LENGTH,
594 }
595 default = defaults.get(key, 10_000)
596 config = _load_config(_config_path(repo_root))
597 limits = config.get("limits") or {}
598 # Explicit key dispatch keeps mypy happy on TypedDict literal-required keys.
599 if key == "max_walk_commits":
600 val: int | None = limits.get("max_walk_commits")
601 elif key == "max_ancestors":
602 val = limits.get("max_ancestors")
603 elif key == "max_graph_commits":
604 val = limits.get("max_graph_commits")
605 elif key == "shard_prefix_length":
606 val = limits.get("shard_prefix_length")
607 else:
608 val = None
609 if isinstance(val, int) and val > 0:
610 return val
611 return default
612
613 def get_config_value(key: str, repo_root: pathlib.Path | None = None) -> str | None:
614 """Get a config value by dotted key (e.g. ``user.handle``, ``hub.url``).
615
616 Returns ``None`` when the key is not set or the namespace is unknown.
617
618 Args:
619 key: Dotted key in ``<namespace>.<subkey>`` form.
620 repo_root: Repository root. Defaults to ``Path.cwd()``.
621
622 Returns:
623 String value, or ``None``.
624 """
625 parts = key.split(".", 1)
626 if len(parts) != 2:
627 return None
628 namespace, subkey = parts
629 config = _load_config(_config_path(repo_root))
630
631 if namespace == "user":
632 # User identity lives in identity.toml, keyed by the configured hub URL.
633 hub_url = (config.get("hub") or {}).get("url", "")
634 if not hub_url:
635 return None
636 try:
637 from muse.core.identity import load_identity, hostname_from_url
638 hostname = hostname_from_url(hub_url)
639 entry = load_identity(hostname)
640 if entry is None:
641 return None
642 if subkey == "handle":
643 return entry.get("handle")
644 if subkey == "type":
645 return entry.get("type")
646 if subkey == "display_name":
647 return entry.get("display_name")
648 if subkey == "email":
649 return entry.get("email")
650 except Exception:
651 pass
652 return None
653
654 if namespace == "hub":
655 hub = config.get("hub") or {}
656 if subkey == "url":
657 return hub.get("url")
658 return None
659
660 if namespace == "domain":
661 domain = config.get("domain") or {}
662 return domain.get(subkey)
663
664 if namespace == "limits":
665 limits = config.get("limits") or {}
666 if subkey == "max_walk_commits":
667 v = limits.get("max_walk_commits")
668 return str(v) if isinstance(v, int) else None
669 if subkey == "max_ancestors":
670 v = limits.get("max_ancestors")
671 return str(v) if isinstance(v, int) else None
672 if subkey == "max_graph_commits":
673 v = limits.get("max_graph_commits")
674 return str(v) if isinstance(v, int) else None
675 if subkey == "shard_prefix_length":
676 v = limits.get("shard_prefix_length")
677 return str(v) if isinstance(v, int) else None
678 return None
679
680 if namespace == "commit":
681 commit = config.get("commit") or {}
682 if subkey == "sign":
683 v = commit.get("sign")
684 if v is True:
685 return "true"
686 if v is False:
687 return "false"
688 return None
689 return None
690
691 return None
692
693 def set_config_value(key: str, value: str, repo_root: pathlib.Path | None = None) -> None:
694 """Set a config value by dotted key (e.g. ``user.handle``, ``domain.ticks_per_beat``).
695
696 Args:
697 key: Dotted key in ``<namespace>.<subkey>`` form.
698 value: New string value.
699 repo_root: Repository root. Defaults to ``Path.cwd()``.
700
701 Raises:
702 ValueError: If the namespace is blocked, unknown, or the subkey is invalid.
703 """
704 parts = key.split(".", 1)
705 if len(parts) != 2:
706 raise ValueError(f"Key must be in 'namespace.subkey' form, got: {key!r}")
707 namespace, subkey = parts
708
709 if namespace in _BLOCKED_NAMESPACES:
710 raise ValueError(_BLOCKED_NAMESPACES[namespace])
711
712 if namespace not in _SETTABLE_NAMESPACES:
713 raise ValueError(
714 f"Unknown config namespace {namespace!r}. "
715 f"Settable namespaces: {', '.join(sorted(_SETTABLE_NAMESPACES))}"
716 )
717
718 cp = _config_path(repo_root)
719 cp.parent.mkdir(parents=True, exist_ok=True)
720 config = _load_config(cp)
721
722 if namespace == "user":
723 set_user_field(subkey, value, repo_root)
724 return
725
726 if namespace == "hub":
727 if subkey != "url":
728 raise ValueError(f"Unknown [hub] config key: {subkey!r}. Valid keys: url")
729 # Route through set_hub_url — it enforces the HTTPS requirement.
730 set_hub_url(value, repo_root)
731 return
732
733 if namespace == "limits":
734 _LIMITS_KEYS = frozenset({
735 "max_walk_commits", "max_ancestors", "max_graph_commits", "shard_prefix_length",
736 })
737 if subkey not in _LIMITS_KEYS:
738 raise ValueError(
739 f"Unknown [limits] config key: {subkey!r}. "
740 f"Valid keys: {', '.join(sorted(_LIMITS_KEYS))}"
741 )
742 try:
743 int_value = int(value)
744 except ValueError as exc:
745 raise ValueError(
746 f"[limits] {subkey} must be an integer, got: {value!r}"
747 ) from exc
748 if int_value <= 0:
749 raise ValueError(f"[limits] {subkey} must be a positive integer, got: {int_value}")
750 if subkey == "shard_prefix_length" and int_value not in (2, 4):
751 raise ValueError("shard_prefix_length must be 2 or 4")
752 limits_section: LimitsConfig = config.get("limits") or {}
753 if subkey == "max_walk_commits":
754 limits_section["max_walk_commits"] = int_value
755 elif subkey == "max_ancestors":
756 limits_section["max_ancestors"] = int_value
757 elif subkey == "max_graph_commits":
758 limits_section["max_graph_commits"] = int_value
759 elif subkey == "shard_prefix_length":
760 limits_section["shard_prefix_length"] = int_value
761 config["limits"] = limits_section
762 write_text_atomic(cp, _dump_toml(config))
763 logger.info("✅ limits.%s = %d", subkey, int_value)
764 return
765
766 if namespace == "commit":
767 _COMMIT_KEYS = frozenset({"sign"})
768 if subkey not in _COMMIT_KEYS:
769 raise ValueError(
770 f"Unknown [commit] config key: {subkey!r}. "
771 f"Valid keys: {', '.join(sorted(_COMMIT_KEYS))}"
772 )
773 if value not in ("true", "false"):
774 raise ValueError(f"[commit] {subkey} must be 'true' or 'false', got: {value!r}")
775 commit_section: CommitConfig = config.get("commit") or {}
776 if subkey == "sign":
777 commit_section["sign"] = value == "true"
778 config["commit"] = commit_section
779 write_text_atomic(cp, _dump_toml(config))
780 logger.info("✅ commit.%s = %s", subkey, value)
781 return
782
783 # namespace == "domain"
784 _validate_toml_key(subkey, "domain key")
785 domain: DomainConfig = config.get("domain") or {}
786 domain[subkey] = value
787 config["domain"] = domain
788 write_text_atomic(cp, _dump_toml(config))
789 logger.info("✅ domain.%s = %r", subkey, value)
790
791 def config_as_dict(repo_root: pathlib.Path | None = None) -> ConfigTree:
792 """Return the full config as a plain ``dict[str, dict[str, str]]`` for JSON output.
793
794 Credentials are never included — the hub section only contains the URL.
795
796 Args:
797 repo_root: Repository root. Defaults to ``Path.cwd()``.
798
799 Returns:
800 Nested dict suitable for ``json.dumps``.
801 """
802 config = _load_config(_config_path(repo_root))
803 result: ConfigTree = {}
804
805 hub = config.get("hub")
806 if hub:
807 hub_url = hub.get("url", "")
808 if hub_url:
809 result["hub"] = {"url": hub_url}
810
811 remotes = config.get("remotes") or {}
812 if remotes:
813 remotes_dict: ConfigSection = {}
814 for rname, entry in sorted(remotes.items()):
815 url = entry.get("url", "")
816 if url:
817 remotes_dict[rname] = url
818 if remotes_dict:
819 result["remotes"] = remotes_dict
820
821 domain = config.get("domain") or {}
822 if domain:
823 result["domain"] = dict(sorted(domain.items()))
824
825 limits = config.get("limits") or {}
826 if limits:
827 limits_dict: ConfigSection = {}
828 for lk in ("max_walk_commits", "max_ancestors", "max_graph_commits", "shard_prefix_length"):
829 lv = limits.get(lk)
830 if lv is not None:
831 limits_dict[lk] = str(lv)
832 if limits_dict:
833 result["limits"] = limits_dict
834
835 return result
836
837 def config_path_for_editor(repo_root: pathlib.Path | None = None) -> pathlib.Path:
838 """Return the config path for the ``config edit`` command."""
839 return _config_path(repo_root)
840
841 # ---------------------------------------------------------------------------
842 # Branch metadata helpers
843 # ---------------------------------------------------------------------------
844
845 def write_branch_meta(
846 repo_root: pathlib.Path,
847 branch_name: str,
848 *,
849 intent: str | None = None,
850 resumable: bool | None = None,
851 ) -> None:
852 """Write per-branch metadata to ``[branch."<name>"]`` in ``.muse/config.toml``.
853
854 Only the supplied keyword arguments are updated; existing fields
855 (``remote``, ``merge``, and previously written ``intent``/``resumable``)
856 are preserved unchanged.
857
858 Args:
859 repo_root: Repository root directory.
860 branch_name: Name of the branch (e.g. ``"feat/my-thing"``).
861 intent: Short description of what this branch is for.
862 resumable: Mark this branch as a resumable agent checkpoint.
863 """
864 _validate_toml_key(branch_name, "branch name")
865 cp = _config_path(repo_root)
866 cp.parent.mkdir(parents=True, exist_ok=True)
867 config = _load_config(cp)
868 branch_map: dict[str, BranchMeta] = dict(config.get("branch") or {})
869 entry: BranchMeta = dict(branch_map.get(branch_name) or {}) # type: ignore[arg-type]
870 if intent is not None:
871 entry["intent"] = intent
872 if resumable is not None:
873 entry["resumable"] = resumable
874 branch_map[branch_name] = entry
875 config["branch"] = branch_map
876 write_text_atomic(cp, _dump_toml(config))
877
878 def delete_branch_meta(repo_root: pathlib.Path, branch_name: str) -> None:
879 """Remove the ``[branch."<name>"]`` section from ``.muse/config.toml``.
880
881 Called by ``muse branch -d/-D`` after a successful branch deletion so
882 stale intent/resumable entries do not accumulate indefinitely. No-op
883 when the branch has no metadata or the config file is absent.
884 """
885 cp = _config_path(repo_root)
886 if not cp.exists():
887 return
888 config = _load_config(cp)
889 branch_map = dict(config.get("branch") or {})
890 if branch_name not in branch_map:
891 return
892 del branch_map[branch_name]
893 config["branch"] = branch_map
894 write_text_atomic(cp, _dump_toml(config))
895
896 def read_branch_meta(
897 repo_root: pathlib.Path,
898 branch_name: str,
899 ) -> BranchMeta:
900 """Return per-branch metadata from ``.muse/config.toml``.
901
902 Returns an empty dict when the branch has no metadata or the config file
903 is absent.
904
905 Args:
906 repo_root: Repository root directory.
907 branch_name: Name of the branch (e.g. ``"feat/my-thing"``).
908
909 Returns:
910 Dict with any of: ``intent`` (str), ``resumable`` (bool),
911 ``remote`` (str), ``merge`` (str).
912 """
913 config = _load_config(_config_path(repo_root))
914 branch_map = config.get("branch") or {}
915 return dict(branch_map.get(branch_name) or {})
916
917 # ---------------------------------------------------------------------------
918 # Protected branches helpers
919 # ---------------------------------------------------------------------------
920
921 def get_protected_branches(repo_root: pathlib.Path | None = None) -> list[str]:
922 """Return the list of protected branch patterns from ``[protected_branches]``.
923
924 Returns an empty list when the section is absent or has no ``branches`` key.
925
926 Args:
927 repo_root: Repository root. Defaults to ``Path.cwd()``.
928 """
929 config = _load_config(_config_path(repo_root))
930 return list(config.get("protected_branches") or [])
931
932 def is_branch_protected(branch: str, patterns: list[str]) -> bool:
933 """Return ``True`` if *branch* matches any pattern in *patterns*.
934
935 Patterns are matched with :func:`fnmatch.fnmatch` (shell-style globs).
936 Matching is case-sensitive, consistent with Python fnmatch behaviour.
937
938 Args:
939 branch: Branch name to test (e.g. ``"release/1.0"``).
940 patterns: List of patterns from ``[protected_branches] branches``.
941 """
942 return any(fnmatch.fnmatch(branch, p) for p in patterns)
943
944 # ---------------------------------------------------------------------------
945 # Remote helpers
946 # ---------------------------------------------------------------------------
947
948 def get_remote(name: str, repo_root: pathlib.Path | None = None) -> str | None:
949 """Return the URL for remote *name*, or ``None`` when not configured.
950
951 Args:
952 name: Remote name (e.g. ``"origin"``).
953 repo_root: Repository root. Defaults to ``Path.cwd()``.
954
955 Returns:
956 URL string, or ``None``.
957 """
958 config = _load_config(_config_path(repo_root))
959 remotes = config.get("remotes")
960 if remotes is None:
961 return None
962 entry = remotes.get(name)
963 if entry is None:
964 return None
965 url = entry.get("url", "")
966 return url.strip() if url.strip() else None
967
968 def set_remote(
969 name: str,
970 url: str,
971 repo_root: pathlib.Path | None = None,
972 ) -> None:
973 """Write ``[remotes.<name>] url`` to ``.muse/config.toml``.
974
975 Preserves all other sections. Creates the file if absent.
976
977 Args:
978 name: Remote name (e.g. ``"origin"``).
979 url: Remote URL.
980 repo_root: Repository root. Defaults to ``Path.cwd()``.
981 """
982 cp = _config_path(repo_root)
983 cp.parent.mkdir(parents=True, exist_ok=True)
984 config = _load_config(cp)
985 existing_remotes = config.get("remotes")
986 remotes: RemotesMap = {}
987 if existing_remotes:
988 remotes.update(existing_remotes)
989 existing_entry = remotes.get(name)
990 entry: RemoteEntry = {}
991 if existing_entry is not None:
992 if "url" in existing_entry:
993 entry["url"] = existing_entry["url"]
994 if "branch" in existing_entry:
995 entry["branch"] = existing_entry["branch"]
996 entry["url"] = url
997 remotes[name] = entry
998 config["remotes"] = remotes
999 write_text_atomic(cp, _dump_toml(config))
1000 logger.info("✅ Remote %r set to %s", name, url)
1001
1002 def remove_remote(
1003 name: str,
1004 repo_root: pathlib.Path | None = None,
1005 ) -> None:
1006 """Remove a named remote and its tracking refs.
1007
1008 Args:
1009 name: Remote name to remove.
1010 repo_root: Repository root. Defaults to ``Path.cwd()``.
1011
1012 Raises:
1013 KeyError: If *name* is not a configured remote.
1014 """
1015 cp = _config_path(repo_root)
1016 config = _load_config(cp)
1017 remotes = config.get("remotes")
1018 if remotes is None or name not in remotes:
1019 raise KeyError(name)
1020 del remotes[name]
1021 config["remotes"] = remotes
1022 write_text_atomic(cp, _dump_toml(config))
1023 logger.info("✅ Remote %r removed from config", name)
1024
1025 root = (repo_root or pathlib.Path.cwd()).resolve()
1026 refs_dir = _remote_tracking_dir(root, name)
1027 if refs_dir.is_symlink():
1028 # Refuse to rmtree a symlink — following a symlink placed by an
1029 # attacker could delete files outside the repository tree.
1030 logger.warning("⚠️ Skipping rmtree: remotes dir %s is a symlink", refs_dir)
1031 elif refs_dir.is_dir():
1032 shutil.rmtree(refs_dir)
1033 logger.debug("✅ Removed tracking refs dir %s", refs_dir)
1034
1035 def rename_remote(
1036 old_name: str,
1037 new_name: str,
1038 repo_root: pathlib.Path | None = None,
1039 ) -> None:
1040 """Rename a remote and move its tracking refs.
1041
1042 Args:
1043 old_name: Current remote name.
1044 new_name: Desired new remote name.
1045 repo_root: Repository root. Defaults to ``Path.cwd()``.
1046
1047 Raises:
1048 KeyError: If *old_name* is not a configured remote.
1049 ValueError: If *new_name* is already configured.
1050 """
1051 cp = _config_path(repo_root)
1052 config = _load_config(cp)
1053 remotes = config.get("remotes")
1054 if remotes is None or old_name not in remotes:
1055 raise KeyError(old_name)
1056 if new_name in remotes:
1057 raise ValueError(new_name)
1058 remotes[new_name] = remotes.pop(old_name)
1059 config["remotes"] = remotes
1060 write_text_atomic(cp, _dump_toml(config))
1061 logger.info("✅ Remote %r renamed to %r", old_name, new_name)
1062
1063 root = (repo_root or pathlib.Path.cwd()).resolve()
1064 old_refs_dir = _remote_tracking_dir(root, old_name)
1065 new_refs_dir = _remote_tracking_dir(root, new_name)
1066 if old_refs_dir.is_dir():
1067 old_refs_dir.rename(new_refs_dir)
1068 logger.debug("✅ Moved tracking refs dir %s → %s", old_refs_dir, new_refs_dir)
1069
1070 def list_remotes(repo_root: pathlib.Path | None = None) -> list[RemoteConfig]:
1071 """Return all configured remotes sorted alphabetically by name.
1072
1073 Args:
1074 repo_root: Repository root. Defaults to ``Path.cwd()``.
1075
1076 Returns:
1077 List of ``{"name": str, "url": str}`` dicts.
1078 """
1079 config = _load_config(_config_path(repo_root))
1080 remotes = config.get("remotes")
1081 if remotes is None:
1082 return []
1083 result: list[RemoteConfig] = []
1084 for remote_name in sorted(remotes):
1085 entry = remotes[remote_name]
1086 url = entry.get("url", "")
1087 if not url.strip():
1088 continue
1089 rc = RemoteConfig(name=remote_name, url=url.strip())
1090 result.append(rc)
1091 return result
1092
1093 # ---------------------------------------------------------------------------
1094 # Remote tracking-head helpers
1095 # ---------------------------------------------------------------------------
1096
1097 def _remote_head_path(
1098 remote_name: str,
1099 branch: str,
1100 repo_root: pathlib.Path | None = None,
1101 ) -> pathlib.Path:
1102 """Return the path to the remote tracking pointer file."""
1103 root = (repo_root or pathlib.Path.cwd()).resolve()
1104 return _remote_ref_path(root, remote_name, branch)
1105
1106 def get_remote_head(
1107 remote_name: str,
1108 branch: str,
1109 repo_root: pathlib.Path | None = None,
1110 ) -> str | None:
1111 """Return the last-known remote commit ID for *remote_name*/*branch*.
1112
1113 Returns ``None`` when the tracking pointer does not exist.
1114
1115 Args:
1116 remote_name: Remote name (e.g. ``"origin"``).
1117 branch: Branch name (e.g. ``"main"``).
1118 repo_root: Repository root. Defaults to ``Path.cwd()``.
1119
1120 Returns:
1121 Commit ID string, or ``None``.
1122 """
1123 return read_ref(_remote_head_path(remote_name, branch, repo_root))
1124
1125 def set_remote_head(
1126 remote_name: str,
1127 branch: str,
1128 commit_id: str,
1129 repo_root: pathlib.Path | None = None,
1130 ) -> None:
1131 """Write the remote tracking pointer for *remote_name*/*branch*.
1132
1133 Args:
1134 remote_name: Remote name (e.g. ``"origin"``).
1135 branch: Branch name.
1136 commit_id: Commit ID to record as the known remote HEAD.
1137 repo_root: Repository root. Defaults to ``Path.cwd()``.
1138 """
1139 pointer = _remote_head_path(remote_name, branch, repo_root)
1140 write_text_atomic(pointer, commit_id)
1141 logger.debug("✅ Remote head %s/%s → %s", remote_name, branch, short_id(commit_id))
1142
1143 def delete_remote_head(
1144 remote_name: str,
1145 branch: str,
1146 repo_root: pathlib.Path | None = None,
1147 ) -> bool:
1148 """Remove the local remote-tracking pointer for *remote_name*/*branch*.
1149
1150 Used after ``muse push --delete`` deletes the branch on the server, or when
1151 pruning stale tracking refs with ``muse branch -dr``.
1152
1153 Args:
1154 remote_name: Remote name (e.g. ``"origin"``).
1155 branch: Branch name (e.g. ``"feat/my-thing"``).
1156 repo_root: Repository root. Defaults to ``Path.cwd()``.
1157
1158 Returns:
1159 ``True`` if the pointer file existed and was removed, ``False`` if it
1160 was already absent (idempotent).
1161 """
1162 pointer = _remote_head_path(remote_name, branch, repo_root)
1163 if not pointer.is_file():
1164 return False
1165 pointer.unlink()
1166 # Remove now-empty parent directories (mirrors _cleanup_empty_dirs in branch.py).
1167 remotes_dir = pointer.parent
1168 while remotes_dir.name != remote_name:
1169 try:
1170 remotes_dir.rmdir()
1171 except OSError:
1172 break
1173 remotes_dir = remotes_dir.parent
1174 logger.debug("🗑 Remote tracking ref %s/%s removed", remote_name, branch)
1175 return True
1176
1177 # ---------------------------------------------------------------------------
1178 # Upstream tracking helpers
1179 # ---------------------------------------------------------------------------
1180
1181 def set_upstream(
1182 branch: str,
1183 remote_name: str,
1184 repo_root: pathlib.Path | None = None,
1185 ) -> None:
1186 """Record *remote_name* as the upstream remote for *branch*.
1187
1188 Args:
1189 branch: Local (and remote) branch name.
1190 remote_name: Remote name.
1191 repo_root: Repository root. Defaults to ``Path.cwd()``.
1192 """
1193 cp = _config_path(repo_root)
1194 cp.parent.mkdir(parents=True, exist_ok=True)
1195 config = _load_config(cp)
1196 existing_remotes = config.get("remotes")
1197 remotes: RemotesMap = {}
1198 if existing_remotes:
1199 remotes.update(existing_remotes)
1200 existing_entry = remotes.get(remote_name)
1201 entry: RemoteEntry = {}
1202 if existing_entry is not None:
1203 if "url" in existing_entry:
1204 entry["url"] = existing_entry["url"]
1205 if "branch" in existing_entry:
1206 entry["branch"] = existing_entry["branch"]
1207 entry["branch"] = branch
1208 remotes[remote_name] = entry
1209 config["remotes"] = remotes
1210 write_text_atomic(cp, _dump_toml(config))
1211 logger.info("✅ Upstream for branch %r set to %s/%r", branch, remote_name, branch)
1212
1213 def get_upstream(
1214 branch: str,
1215 repo_root: pathlib.Path | None = None,
1216 ) -> str | None:
1217 """Return the configured upstream remote name for *branch*, or ``None``.
1218
1219 Args:
1220 branch: Local branch name.
1221 repo_root: Repository root. Defaults to ``Path.cwd()``.
1222
1223 Returns:
1224 Remote name string, or ``None``.
1225 """
1226 config = _load_config(_config_path(repo_root))
1227 remotes = config.get("remotes")
1228 if remotes is None:
1229 return None
1230 for rname, entry in remotes.items():
1231 tracked = entry.get("branch", "")
1232 if tracked.strip() == branch:
1233 return rname
1234 return None
1235
1236 # ---------------------------------------------------------------------------
1237 # Global user config — ~/.muse/config.toml (safe_dirs)
1238 # ---------------------------------------------------------------------------
1239
1240 _GLOBAL_MUSE_DIR = _user_muse_dir()
1241 _GLOBAL_CONFIG_FILE = _user_config_toml_path()
1242
1243 def _load_global_config() -> _SecurityConfig:
1244 """Load ``~/.muse/config.toml`` and return the ``[security]`` section.
1245
1246 Returns a dict with key ``safe_dirs`` mapping to a list of path strings.
1247 Returns ``{"safe_dirs": []}`` when the file is absent or unparseable.
1248 """
1249 if not _GLOBAL_CONFIG_FILE.is_file():
1250 return {"safe_dirs": []}
1251 try:
1252 with _GLOBAL_CONFIG_FILE.open("rb") as fh:
1253 raw = tomllib.load(fh)
1254 except Exception as exc: # noqa: BLE001
1255 logger.warning("⚠️ Failed to parse %s: %s", _GLOBAL_CONFIG_FILE, exc)
1256 return {"safe_dirs": []}
1257 security_raw = raw.get("security")
1258 if not isinstance(security_raw, dict):
1259 return {"safe_dirs": []}
1260 dirs_raw = security_raw.get("safe_dirs")
1261 if not isinstance(dirs_raw, list):
1262 return {"safe_dirs": []}
1263 safe: list[str] = [d for d in dirs_raw if isinstance(d, str) and d.strip()]
1264 return {"safe_dirs": safe}
1265
1266 def _save_global_config(safe_dirs: list[str]) -> None:
1267 """Write ``[security] safe_dirs`` to ``~/.muse/config.toml``.
1268
1269 Preserves any other sections that may exist in the file.
1270 """
1271 import os
1272 _GLOBAL_MUSE_DIR.mkdir(parents=True, exist_ok=True)
1273
1274 # Read existing raw content to preserve other sections.
1275 existing_lines: list[str] = []
1276 if _GLOBAL_CONFIG_FILE.is_file():
1277 try:
1278 existing_lines = _GLOBAL_CONFIG_FILE.read_text("utf-8").splitlines()
1279 except Exception: # noqa: BLE001
1280 existing_lines = []
1281
1282 # Strip any existing [security] section from the file.
1283 filtered: list[str] = []
1284 in_security = False
1285 for line in existing_lines:
1286 stripped = line.strip()
1287 if stripped == "[security]":
1288 in_security = True
1289 continue
1290 if in_security and stripped.startswith("["):
1291 in_security = False
1292 if not in_security:
1293 filtered.append(line)
1294
1295 # Remove trailing blank lines before appending the new section.
1296 while filtered and not filtered[-1].strip():
1297 filtered.pop()
1298
1299 # Append the new [security] section.
1300 filtered.append("")
1301 filtered.append("[security]")
1302 if safe_dirs:
1303 items = ", ".join(f'"{_escape(d)}"' for d in safe_dirs)
1304 filtered.append(f"safe_dirs = [{items}]")
1305 else:
1306 filtered.append("safe_dirs = []")
1307 filtered.append("")
1308
1309 content = "\n".join(filtered)
1310 tmp = _GLOBAL_CONFIG_FILE.with_suffix(".toml.tmp")
1311 tmp.write_text(content, encoding="utf-8")
1312 os.replace(tmp, _GLOBAL_CONFIG_FILE)
1313
1314 def get_global_safe_dirs() -> list[str]:
1315 """Return the ``safe_dirs`` list from ``~/.muse/config.toml``.
1316
1317 Returns an empty list when not configured.
1318 """
1319 return _load_global_config().get("safe_dirs", [])
1320
1321 def add_global_safe_dir(path: str) -> None:
1322 """Add *path* to the ``safe_dirs`` list in ``~/.muse/config.toml``.
1323
1324 Normalises the path (``os.path.abspath``) before storing. Idempotent —
1325 adding the same path twice has no effect.
1326
1327 Args:
1328 path: Absolute or relative path to trust.
1329 """
1330 import os
1331 abs_path = os.path.abspath(path)
1332 current = get_global_safe_dirs()
1333 if abs_path not in current:
1334 current.append(abs_path)
1335 _save_global_config(current)
1336 logger.info("✅ Trusted path added: %s", abs_path)
1337
1338 def remove_global_safe_dir(path: str) -> None:
1339 """Remove *path* from the ``safe_dirs`` list in ``~/.muse/config.toml``.
1340
1341 Normalises the path before matching. No-op when the path is not present.
1342
1343 Args:
1344 path: Absolute or relative path to remove from the trust list.
1345 """
1346 import os
1347 abs_path = os.path.abspath(path)
1348 current = get_global_safe_dirs()
1349 updated = [d for d in current if d != abs_path]
1350 _save_global_config(updated)
1351 logger.info("✅ Trusted path removed: %s", abs_path)
File History 1 commit
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 28 days ago