config.py
python
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠ breaking
28 days ago
| 1 | """Muse CLI configuration helpers. |
| 2 | |
| 3 | Reads and writes ``.muse/config.toml`` — the per-repository configuration |
| 4 | file. Credentials and user identity are **not** stored here; they live in |
| 5 | ``~/.muse/identity.toml`` managed by :mod:`muse.core.identity`. |
| 6 | |
| 7 | Config schema |
| 8 | ------------- |
| 9 | :: |
| 10 | |
| 11 | [hub] |
| 12 | url = "https://musehub.ai" # MuseHub fabric endpoint for this repo |
| 13 | |
| 14 | [remotes.origin] |
| 15 | url = "https://hub.muse.io/repos/my-repo" |
| 16 | branch = "main" |
| 17 | |
| 18 | [domain] |
| 19 | # Domain-specific key/value pairs; read by the active domain plugin. |
| 20 | # ticks_per_beat = "480" |
| 21 | |
| 22 | Settable via ``muse config set`` |
| 23 | --------------------------------- |
| 24 | - ``hub.url`` (alias: ``muse hub connect <url>``) |
| 25 | - ``domain.*`` |
| 26 | |
| 27 | Not settable via ``muse config set`` |
| 28 | -------------------------------------- |
| 29 | - ``user.*`` — use ``muse auth register`` / ``muse auth whoami`` |
| 30 | - ``remotes.*`` — use ``muse remote add/remove`` |
| 31 | - credentials — use ``muse auth register`` |
| 32 | |
| 33 | Token resolution |
| 34 | ---------------- |
| 35 | :func:`get_signing_identity` reads the hub URL from this file, then resolves the |
| 36 | signing identity from ``~/.muse/identity.toml`` via |
| 37 | :func:`muse.core.identity.resolve_token`. The token is **never** logged. |
| 38 | """ |
| 39 | |
| 40 | import fnmatch |
| 41 | import logging |
| 42 | import pathlib |
| 43 | import shutil |
| 44 | import subprocess |
| 45 | import tomllib |
| 46 | from typing import TypedDict |
| 47 | |
| 48 | from muse.core.types import short_id |
| 49 | from muse.core.paths import config_toml_path as _config_toml_path, user_muse_dir as _user_muse_dir, user_config_toml_path as _user_config_toml_path, remote_tracking_dir as _remote_tracking_dir, remote_ref_path as _remote_ref_path |
| 50 | from muse.core.refs import read_ref |
| 51 | from muse.core.store import write_text_atomic |
| 52 | |
| 53 | logger = logging.getLogger(__name__) |
| 54 | |
| 55 | type RemotesMap = dict[str, RemoteEntry] # remote_name → remote entry |
| 56 | type DomainConfig = dict[str, str] # domain key → value |
| 57 | type ConfigSection = dict[str, str] # generic flattened section dict |
| 58 | type ConfigTree = dict[str, ConfigSection] # section → key → value (for JSON output) |
| 59 | type DefaultsMap = dict[str, int] # config key → default int value |
| 60 | type _SecurityConfig = dict[str, list[str]] # security section from global config |
| 61 | |
| 62 | # --------------------------------------------------------------------------- |
| 63 | # Named configuration types |
| 64 | # --------------------------------------------------------------------------- |
| 65 | |
| 66 | class HubConfig(TypedDict, total=False): |
| 67 | """``[hub]`` section in ``.muse/config.toml``.""" |
| 68 | |
| 69 | url: str |
| 70 | |
| 71 | class RemoteEntry(TypedDict, total=False): |
| 72 | """``[remotes.<name>]`` section in ``.muse/config.toml``.""" |
| 73 | |
| 74 | url: str |
| 75 | branch: str |
| 76 | promisor: bool # when False, this remote is not used as a promisor for missing objects |
| 77 | |
| 78 | class LimitsConfig(TypedDict, total=False): |
| 79 | """``[limits]`` section in ``.muse/config.toml``. |
| 80 | |
| 81 | All values are optional — defaults are used when absent. Keys map to |
| 82 | the ``[limits]`` TOML table:: |
| 83 | |
| 84 | [limits] |
| 85 | max_walk_commits = 10000 # cap for walk_commits_between / muse log |
| 86 | max_ancestors = 50000 # cap for find_merge_base BFS |
| 87 | max_graph_commits = 50000 # cap for _collect_all_commits (--graph --all) |
| 88 | shard_prefix_length = 2 # object store shard depth: 2 (256 shards) |
| 89 | # or 4 (65536 shards) for very large repos |
| 90 | """ |
| 91 | |
| 92 | max_walk_commits: int |
| 93 | max_ancestors: int |
| 94 | max_graph_commits: int |
| 95 | shard_prefix_length: int |
| 96 | |
| 97 | class CommitConfig(TypedDict, total=False): |
| 98 | """``[commit]`` section in ``.muse/config.toml``.""" |
| 99 | |
| 100 | sign: bool |
| 101 | |
| 102 | |
| 103 | class BranchMeta(TypedDict, total=False): |
| 104 | """Per-branch metadata stored under ``[branch."<name>"]`` in config.toml. |
| 105 | |
| 106 | Fields written by ``muse branch --intent / --resumable``:: |
| 107 | |
| 108 | [branch."feat/my-thing"] |
| 109 | intent = "refactor auth layer" |
| 110 | resumable = true |
| 111 | |
| 112 | Fields written by ``muse push`` upstream tracking (preserved on read/write):: |
| 113 | |
| 114 | remote = "origin" |
| 115 | merge = "refs/heads/feat/my-thing" |
| 116 | """ |
| 117 | |
| 118 | intent: str # short description of what this branch is for |
| 119 | resumable: bool # true when this branch is a resumable agent checkpoint |
| 120 | remote: str # upstream remote name (e.g. "origin") |
| 121 | merge: str # upstream merge ref (e.g. "refs/heads/main") |
| 122 | |
| 123 | class MuseConfig(TypedDict, total=False): |
| 124 | """Structured view of the entire ``.muse/config.toml`` file.""" |
| 125 | |
| 126 | hub: HubConfig |
| 127 | remotes: RemotesMap |
| 128 | domain: DomainConfig |
| 129 | limits: LimitsConfig |
| 130 | commit: CommitConfig |
| 131 | branch: "dict[str, BranchMeta]" # branch_name → per-branch metadata |
| 132 | protected_branches: "list[str]" # fnmatch patterns from [protected_branches] |
| 133 | |
| 134 | class RemoteConfig(TypedDict, total=False): |
| 135 | """Public-facing remote descriptor returned by :func:`list_remotes`.""" |
| 136 | |
| 137 | name: str # always present |
| 138 | url: str # always present |
| 139 | |
| 140 | # --------------------------------------------------------------------------- |
| 141 | # Internal helpers |
| 142 | # --------------------------------------------------------------------------- |
| 143 | |
| 144 | def _config_path(repo_root: pathlib.Path | None) -> pathlib.Path: |
| 145 | root = (repo_root or pathlib.Path.cwd()).resolve() |
| 146 | return _config_toml_path(root) |
| 147 | |
| 148 | def _load_config(config_path: pathlib.Path) -> MuseConfig: |
| 149 | """Load and parse config.toml; return an empty MuseConfig if absent.""" |
| 150 | if not config_path.is_file(): |
| 151 | return {} |
| 152 | |
| 153 | try: |
| 154 | with config_path.open("rb") as fh: |
| 155 | raw = tomllib.load(fh) |
| 156 | except Exception as exc: # noqa: BLE001 |
| 157 | logger.warning("⚠️ Failed to parse %s: %s", config_path, exc) |
| 158 | return {} |
| 159 | |
| 160 | config: MuseConfig = {} |
| 161 | |
| 162 | hub_raw = raw.get("hub") |
| 163 | if isinstance(hub_raw, dict): |
| 164 | hub: HubConfig = {} |
| 165 | url_val = hub_raw.get("url") |
| 166 | if isinstance(url_val, str): |
| 167 | hub["url"] = url_val |
| 168 | config["hub"] = hub |
| 169 | |
| 170 | remotes_raw = raw.get("remotes") |
| 171 | if isinstance(remotes_raw, dict): |
| 172 | remotes: RemotesMap = {} |
| 173 | for name, remote_raw in remotes_raw.items(): |
| 174 | if isinstance(remote_raw, dict): |
| 175 | entry: RemoteEntry = {} |
| 176 | rurl = remote_raw.get("url") |
| 177 | if isinstance(rurl, str): |
| 178 | entry["url"] = rurl |
| 179 | branch_val = remote_raw.get("branch") |
| 180 | if isinstance(branch_val, str): |
| 181 | entry["branch"] = branch_val |
| 182 | promisor_val = remote_raw.get("promisor") |
| 183 | if isinstance(promisor_val, bool): |
| 184 | entry["promisor"] = promisor_val |
| 185 | remotes[name] = entry |
| 186 | config["remotes"] = remotes |
| 187 | |
| 188 | domain_raw = raw.get("domain") |
| 189 | if isinstance(domain_raw, dict): |
| 190 | domain: DomainConfig = {} |
| 191 | for key, val in domain_raw.items(): |
| 192 | if isinstance(val, str): |
| 193 | domain[key] = val |
| 194 | config["domain"] = domain |
| 195 | |
| 196 | limits_raw = raw.get("limits") |
| 197 | if isinstance(limits_raw, dict): |
| 198 | limits: LimitsConfig = {} |
| 199 | mwc = limits_raw.get("max_walk_commits") |
| 200 | if isinstance(mwc, int) and mwc > 0: |
| 201 | limits["max_walk_commits"] = mwc |
| 202 | ma = limits_raw.get("max_ancestors") |
| 203 | if isinstance(ma, int) and ma > 0: |
| 204 | limits["max_ancestors"] = ma |
| 205 | mgc = limits_raw.get("max_graph_commits") |
| 206 | if isinstance(mgc, int) and mgc > 0: |
| 207 | limits["max_graph_commits"] = mgc |
| 208 | spl = limits_raw.get("shard_prefix_length") |
| 209 | if isinstance(spl, int) and spl in (2, 4): |
| 210 | limits["shard_prefix_length"] = spl |
| 211 | config["limits"] = limits |
| 212 | |
| 213 | commit_raw = raw.get("commit") |
| 214 | if isinstance(commit_raw, dict): |
| 215 | commit_cfg: CommitConfig = {} |
| 216 | sign_v = commit_raw.get("sign") |
| 217 | if isinstance(sign_v, bool): |
| 218 | commit_cfg["sign"] = sign_v |
| 219 | if commit_cfg: |
| 220 | config["commit"] = commit_cfg |
| 221 | |
| 222 | branch_raw = raw.get("branch") |
| 223 | if isinstance(branch_raw, dict): |
| 224 | branch_map: dict[str, BranchMeta] = {} |
| 225 | for bname, bdata in branch_raw.items(): |
| 226 | if not isinstance(bdata, dict): |
| 227 | continue |
| 228 | bmeta: BranchMeta = {} |
| 229 | intent_v = bdata.get("intent") |
| 230 | if isinstance(intent_v, str): |
| 231 | bmeta["intent"] = intent_v |
| 232 | resumable_v = bdata.get("resumable") |
| 233 | if isinstance(resumable_v, bool): |
| 234 | bmeta["resumable"] = resumable_v |
| 235 | remote_v = bdata.get("remote") |
| 236 | if isinstance(remote_v, str): |
| 237 | bmeta["remote"] = remote_v |
| 238 | merge_v = bdata.get("merge") |
| 239 | if isinstance(merge_v, str): |
| 240 | bmeta["merge"] = merge_v |
| 241 | branch_map[bname] = bmeta |
| 242 | if branch_map: |
| 243 | config["branch"] = branch_map |
| 244 | |
| 245 | pb_raw = raw.get("protected_branches") |
| 246 | if isinstance(pb_raw, dict): |
| 247 | branches_val = pb_raw.get("branches") |
| 248 | if isinstance(branches_val, list): |
| 249 | patterns = [p for p in branches_val if isinstance(p, str)] |
| 250 | config["protected_branches"] = patterns |
| 251 | |
| 252 | return config |
| 253 | |
| 254 | def _escape(value: str) -> str: |
| 255 | """Escape a TOML basic string value (backslash and double-quote only). |
| 256 | |
| 257 | TOML basic strings allow control characters escaped as ``\\n``, ``\\t``, |
| 258 | etc., but we store only printable content — control characters in values |
| 259 | are also stripped here so the resulting TOML file remains parseable. |
| 260 | """ |
| 261 | return ( |
| 262 | value.replace("\\", "\\\\") |
| 263 | .replace('"', '\\"') |
| 264 | .replace("\n", "\\n") |
| 265 | .replace("\r", "\\r") |
| 266 | .replace("\0", "") |
| 267 | ) |
| 268 | |
| 269 | # Characters that are structurally significant in unquoted TOML keys and |
| 270 | # table headers. Any of these in a key name would allow injection of |
| 271 | # arbitrary TOML sections or key-value pairs. |
| 272 | _TOML_KEY_UNSAFE: frozenset[str] = frozenset('\n\r\0][="') |
| 273 | |
| 274 | def _validate_toml_key(key: str, context: str = "key") -> None: |
| 275 | """Raise ``ValueError`` if *key* contains TOML-structurally unsafe characters. |
| 276 | |
| 277 | Prevents injection attacks where a crafted key like ``x]\\n[injected`` would |
| 278 | break the TOML section structure and allow writing arbitrary sections. |
| 279 | |
| 280 | Args: |
| 281 | key: Key string to validate. |
| 282 | context: Human-readable label used in the error message (e.g. ``"domain key"``). |
| 283 | |
| 284 | Raises: |
| 285 | ValueError: If any character in *key* is in ``_TOML_KEY_UNSAFE``. |
| 286 | """ |
| 287 | bad = _TOML_KEY_UNSAFE & set(key) |
| 288 | if bad: |
| 289 | chars = ", ".join(sorted(repr(c) for c in bad)) |
| 290 | raise ValueError( |
| 291 | f"Config {context} {key!r} contains characters not allowed in TOML keys: {chars}" |
| 292 | ) |
| 293 | |
| 294 | def _dump_toml(config: MuseConfig) -> str: |
| 295 | """Serialise a MuseConfig to TOML text. |
| 296 | |
| 297 | Section order: ``[hub]``, ``[remotes.*]``, ``[domain]``, ``[limits]``. |
| 298 | |
| 299 | All key names are validated against ``_TOML_KEY_UNSAFE`` before being |
| 300 | written, preventing TOML injection via crafted domain keys or remote names. |
| 301 | """ |
| 302 | lines: list[str] = [] |
| 303 | |
| 304 | hub = config.get("hub") |
| 305 | if hub: |
| 306 | lines.append("[hub]") |
| 307 | url = hub.get("url", "") |
| 308 | if url: |
| 309 | lines.append(f'url = "{_escape(url)}"') |
| 310 | lines.append("") |
| 311 | |
| 312 | remotes = config.get("remotes") or {} |
| 313 | for remote_name in sorted(remotes): |
| 314 | # Remote names come from _load_config which parses TOML, so they are |
| 315 | # safe at read time. Validate defensively before writing. |
| 316 | _validate_toml_key(remote_name, "remote name") |
| 317 | entry = remotes[remote_name] |
| 318 | lines.append(f"[remotes.{remote_name}]") |
| 319 | rurl = entry.get("url", "") |
| 320 | if rurl: |
| 321 | lines.append(f'url = "{_escape(rurl)}"') |
| 322 | branch = entry.get("branch", "") |
| 323 | if branch: |
| 324 | lines.append(f'branch = "{_escape(branch)}"') |
| 325 | if "promisor" in entry: |
| 326 | lines.append(f'promisor = {"true" if entry["promisor"] else "false"}') |
| 327 | lines.append("") |
| 328 | |
| 329 | domain = config.get("domain") or {} |
| 330 | if domain: |
| 331 | lines.append("[domain]") |
| 332 | for key, val in sorted(domain.items()): |
| 333 | _validate_toml_key(key, "domain key") |
| 334 | lines.append(f'{key} = "{_escape(val)}"') |
| 335 | lines.append("") |
| 336 | |
| 337 | limits = config.get("limits") or {} |
| 338 | if limits: |
| 339 | lines.append("[limits]") |
| 340 | mwc = limits.get("max_walk_commits") |
| 341 | if mwc is not None: |
| 342 | lines.append(f"max_walk_commits = {mwc}") |
| 343 | ma = limits.get("max_ancestors") |
| 344 | if ma is not None: |
| 345 | lines.append(f"max_ancestors = {ma}") |
| 346 | mgc = limits.get("max_graph_commits") |
| 347 | if mgc is not None: |
| 348 | lines.append(f"max_graph_commits = {mgc}") |
| 349 | spl = limits.get("shard_prefix_length") |
| 350 | if spl is not None: |
| 351 | lines.append(f"shard_prefix_length = {spl}") |
| 352 | lines.append("") |
| 353 | |
| 354 | commit_cfg = config.get("commit") or {} |
| 355 | if commit_cfg: |
| 356 | lines.append("[commit]") |
| 357 | if "sign" in commit_cfg: |
| 358 | lines.append(f"sign = {'true' if commit_cfg['sign'] else 'false'}") |
| 359 | lines.append("") |
| 360 | |
| 361 | branch_sections = config.get("branch") or {} |
| 362 | for bname in sorted(branch_sections): |
| 363 | _validate_toml_key(bname, "branch name") |
| 364 | bmeta = branch_sections[bname] |
| 365 | # Skip empty metadata dicts — no section needed. |
| 366 | if not bmeta: |
| 367 | continue |
| 368 | # Branch names require quoted keys (may contain '/' and other chars |
| 369 | # that are not valid in bare TOML keys). |
| 370 | lines.append(f'[branch."{_escape(bname)}"]') |
| 371 | intent = bmeta.get("intent", "") |
| 372 | if intent: |
| 373 | lines.append(f'intent = "{_escape(intent)}"') |
| 374 | if "resumable" in bmeta: |
| 375 | lines.append(f"resumable = {'true' if bmeta['resumable'] else 'false'}") |
| 376 | remote = bmeta.get("remote", "") |
| 377 | if remote: |
| 378 | lines.append(f'remote = "{_escape(remote)}"') |
| 379 | merge = bmeta.get("merge", "") |
| 380 | if merge: |
| 381 | lines.append(f'merge = "{_escape(merge)}"') |
| 382 | lines.append("") |
| 383 | |
| 384 | return "\n".join(lines) |
| 385 | |
| 386 | # --------------------------------------------------------------------------- |
| 387 | # Auth token resolution (via identity store) |
| 388 | # --------------------------------------------------------------------------- |
| 389 | |
| 390 | def get_signing_identity( |
| 391 | repo_root: pathlib.Path | None = None, |
| 392 | remote_url: str | None = None, |
| 393 | agent_id: str | None = None, |
| 394 | ) -> "object | None": |
| 395 | """Return a :class:`~muse.core.transport.SigningIdentity` for a hub, or ``None``. |
| 396 | |
| 397 | Resolution order: |
| 398 | 1. ``MUSE_AGENT_KEY_FD`` environment variable — integer file descriptor |
| 399 | from which exactly 64 bytes of sub-seed are read (then the fd is |
| 400 | closed). The Ed25519 identity key is derived via |
| 401 | :func:`~muse.core.hdkeys.derive_identity_key`. The handle is taken |
| 402 | from ``MUSE_AGENT_HANDLE`` (defaults to *agent_id* if set, else |
| 403 | ``"agent"``). This is the only supported env-based injection mechanism; |
| 404 | the secret travels through the kernel pipe buffer and never appears in |
| 405 | ``/proc/<pid>/environ``. |
| 406 | 2. Agent-specific entry in ``~/.muse/identity.toml`` keyed by |
| 407 | ``"hostname#agent_id"`` — when *agent_id* is provided. |
| 408 | 3. Human entry in ``~/.muse/identity.toml`` keyed by bare hostname. |
| 409 | 4. Hub URL from ``[hub] url`` in ``.muse/config.toml`` (fallback lookup |
| 410 | URL when *remote_url* is not supplied). |
| 411 | |
| 412 | The private key is **never** logged. |
| 413 | |
| 414 | Args: |
| 415 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 416 | remote_url: URL of the specific remote being contacted. |
| 417 | agent_id: Agent handle, e.g. ``"agentception-abc123"``. Used to |
| 418 | try an agent-specific key before falling back to the |
| 419 | human key. |
| 420 | |
| 421 | Returns: |
| 422 | :class:`~muse.core.transport.SigningIdentity` or ``None``. |
| 423 | """ |
| 424 | import os as _os |
| 425 | from muse.core.identity import resolve_signing_identity # avoid circular import |
| 426 | from muse.core.transport import SigningIdentity |
| 427 | |
| 428 | # 1. MUSE_AGENT_KEY_FD — read 64-byte sub-seed from a pipe fd. |
| 429 | # This is the only supported env-var injection mechanism. |
| 430 | # The secret travels through the kernel pipe buffer and never appears |
| 431 | # in /proc/<pid>/environ. |
| 432 | key_fd_str = _os.environ.get("MUSE_AGENT_KEY_FD", "").strip() |
| 433 | if key_fd_str: |
| 434 | try: |
| 435 | key_fd = int(key_fd_str) |
| 436 | import os as _os2 |
| 437 | sub_seed = bytearray(_os2.read(key_fd, 64)) |
| 438 | _os2.close(key_fd) |
| 439 | if len(sub_seed) == 64: |
| 440 | from muse.core.hdkeys import derive_identity_key |
| 441 | dk = derive_identity_key(sub_seed) |
| 442 | from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey |
| 443 | private_key = Ed25519PrivateKey.from_private_bytes(dk.private_bytes) |
| 444 | dk.zero() |
| 445 | sub_seed[:] = b"\x00" * len(sub_seed) |
| 446 | handle = ( |
| 447 | _os.environ.get("MUSE_AGENT_HANDLE", "").strip() |
| 448 | or agent_id |
| 449 | or "agent" |
| 450 | ) |
| 451 | logger.debug("✅ Signing identity from MUSE_AGENT_KEY_FD (handle=%s)", handle) |
| 452 | return SigningIdentity(handle=handle, private_key=private_key) |
| 453 | logger.warning( |
| 454 | "⚠️ MUSE_AGENT_KEY_FD fd=%s yielded %d bytes (expected 64) — falling through", |
| 455 | key_fd_str, len(sub_seed), |
| 456 | ) |
| 457 | except Exception as exc: |
| 458 | logger.warning("⚠️ MUSE_AGENT_KEY_FD could not be read: %s — falling through", exc) |
| 459 | |
| 460 | # 2. Identity store lookup (agent key → human key fallback). |
| 461 | lookup_url: str | None = remote_url or get_hub_url(repo_root) |
| 462 | if lookup_url is None: |
| 463 | logger.debug("⚠️ No hub configured — skipping signing identity lookup") |
| 464 | return None |
| 465 | |
| 466 | result = resolve_signing_identity(lookup_url, agent_id=agent_id) |
| 467 | if result is None: |
| 468 | logger.debug( |
| 469 | "⚠️ No signing identity for hub %s — run `muse auth keygen && muse auth register`", |
| 470 | lookup_url, |
| 471 | ) |
| 472 | return None |
| 473 | |
| 474 | handle, private_key = result |
| 475 | logger.debug("✅ Signing identity resolved for hub %s (handle=%s)", lookup_url, handle) |
| 476 | return SigningIdentity(handle=handle, private_key=private_key) |
| 477 | |
| 478 | # --------------------------------------------------------------------------- |
| 479 | # Hub helpers |
| 480 | # --------------------------------------------------------------------------- |
| 481 | |
| 482 | def get_hub_url(repo_root: pathlib.Path | None = None) -> str | None: |
| 483 | """Return the hub URL from ``[hub] url``, or ``None`` if not configured. |
| 484 | |
| 485 | Resolution order: |
| 486 | 1. ``<repo>/.muse/config.toml`` — repo-local config (highest priority). |
| 487 | 2. ``~/.muse/config.toml`` — global user config (fallback). |
| 488 | |
| 489 | This fallback ensures ``muse auth whoami`` and other hub-aware commands |
| 490 | work without ``--hub`` even when invoked outside a repository, as long as |
| 491 | the user has set a default hub in their global config. |
| 492 | |
| 493 | Args: |
| 494 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 495 | |
| 496 | Returns: |
| 497 | URL string, or ``None``. |
| 498 | """ |
| 499 | config = _load_config(_config_path(repo_root)) |
| 500 | hub = config.get("hub") |
| 501 | if hub is not None: |
| 502 | url = hub.get("url", "") |
| 503 | if url.strip(): |
| 504 | return url.strip() |
| 505 | |
| 506 | # Fall back to ~/.muse/config.toml so hub-aware commands (e.g. `muse auth |
| 507 | # whoami`) work without --hub when invoked outside a repository. |
| 508 | global_config = _load_config(_GLOBAL_CONFIG_FILE) |
| 509 | global_hub = global_config.get("hub") |
| 510 | if global_hub is not None: |
| 511 | url = global_hub.get("url", "") |
| 512 | if url.strip(): |
| 513 | return url.strip() |
| 514 | |
| 515 | return None |
| 516 | |
| 517 | def set_hub_url(url: str, repo_root: pathlib.Path | None = None) -> None: |
| 518 | """Write ``[hub] url`` to ``.muse/config.toml``. |
| 519 | |
| 520 | Preserves all other sections. Creates the config file if absent. |
| 521 | Rejects ``http://`` URLs — Muse never contacts a hub over cleartext HTTP. |
| 522 | |
| 523 | Args: |
| 524 | url: Hub URL (must be ``https://``). |
| 525 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 526 | |
| 527 | Raises: |
| 528 | ValueError: If *url* does not use the ``https://`` scheme. |
| 529 | """ |
| 530 | _is_loopback = url.startswith("http://localhost") or url.startswith("http://127.0.0.1") or url.startswith("http://[::1]") |
| 531 | if not url.startswith("https://") and not _is_loopback: |
| 532 | raise ValueError( |
| 533 | f"Hub URL must use HTTPS. Got: {url!r}\n" |
| 534 | "Muse never connects to a hub over cleartext HTTP.\n" |
| 535 | "(Exception: http://localhost and http://127.0.0.1 are allowed for local development.)" |
| 536 | ) |
| 537 | cp = _config_path(repo_root) |
| 538 | cp.parent.mkdir(parents=True, exist_ok=True) |
| 539 | config = _load_config(cp) |
| 540 | config["hub"] = HubConfig(url=url) |
| 541 | write_text_atomic(cp, _dump_toml(config)) |
| 542 | logger.info("✅ Hub URL set to %s", url) |
| 543 | |
| 544 | def clear_hub_url(repo_root: pathlib.Path | None = None) -> None: |
| 545 | """Remove the ``[hub]`` section from ``.muse/config.toml``. |
| 546 | |
| 547 | Args: |
| 548 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 549 | """ |
| 550 | cp = _config_path(repo_root) |
| 551 | config = _load_config(cp) |
| 552 | if "hub" in config: |
| 553 | del config["hub"] |
| 554 | write_text_atomic(cp, _dump_toml(config)) |
| 555 | logger.info("✅ Hub disconnected") |
| 556 | |
| 557 | # --------------------------------------------------------------------------- |
| 558 | # Generic dotted-key helpers |
| 559 | # --------------------------------------------------------------------------- |
| 560 | |
| 561 | _BlockedNS = dict[str, str] |
| 562 | _BLOCKED_NAMESPACES: _BlockedNS = { |
| 563 | "auth": "Use `muse auth keygen` and `muse auth register` to manage credentials.", |
| 564 | "remotes": "Use `muse remote add/remove/rename` to manage remotes.", |
| 565 | "user": "User identity is managed via `muse auth register`. Run `muse auth whoami` to inspect.", |
| 566 | } |
| 567 | |
| 568 | _SETTABLE_NAMESPACES = {"hub", "domain", "limits", "commit"} |
| 569 | |
| 570 | # Default cap values — used when the [limits] section is absent or the key |
| 571 | # is not set. These are the same values that were previously hardcoded inside |
| 572 | # the individual functions. |
| 573 | _DEFAULT_MAX_WALK_COMMITS: int = 10_000 |
| 574 | _DEFAULT_MAX_ANCESTORS: int = 50_000 |
| 575 | _DEFAULT_MAX_GRAPH_COMMITS: int = 50_000 |
| 576 | _DEFAULT_SHARD_PREFIX_LENGTH: int = 2 |
| 577 | |
| 578 | def get_limit(key: str, repo_root: pathlib.Path | None = None) -> int: |
| 579 | """Return a ``[limits]`` integer cap from config, or its default. |
| 580 | |
| 581 | Args: |
| 582 | key: Limit key — one of ``max_walk_commits``, ``max_ancestors``, |
| 583 | ``max_graph_commits``. |
| 584 | repo_root: Repository root; ``None`` falls back to ``Path.cwd()``. |
| 585 | |
| 586 | Returns: |
| 587 | Configured integer value, or the built-in default if not set. |
| 588 | """ |
| 589 | defaults: DefaultsMap = { |
| 590 | "max_walk_commits": _DEFAULT_MAX_WALK_COMMITS, |
| 591 | "max_ancestors": _DEFAULT_MAX_ANCESTORS, |
| 592 | "max_graph_commits": _DEFAULT_MAX_GRAPH_COMMITS, |
| 593 | "shard_prefix_length": _DEFAULT_SHARD_PREFIX_LENGTH, |
| 594 | } |
| 595 | default = defaults.get(key, 10_000) |
| 596 | config = _load_config(_config_path(repo_root)) |
| 597 | limits = config.get("limits") or {} |
| 598 | # Explicit key dispatch keeps mypy happy on TypedDict literal-required keys. |
| 599 | if key == "max_walk_commits": |
| 600 | val: int | None = limits.get("max_walk_commits") |
| 601 | elif key == "max_ancestors": |
| 602 | val = limits.get("max_ancestors") |
| 603 | elif key == "max_graph_commits": |
| 604 | val = limits.get("max_graph_commits") |
| 605 | elif key == "shard_prefix_length": |
| 606 | val = limits.get("shard_prefix_length") |
| 607 | else: |
| 608 | val = None |
| 609 | if isinstance(val, int) and val > 0: |
| 610 | return val |
| 611 | return default |
| 612 | |
| 613 | def get_config_value(key: str, repo_root: pathlib.Path | None = None) -> str | None: |
| 614 | """Get a config value by dotted key (e.g. ``user.handle``, ``hub.url``). |
| 615 | |
| 616 | Returns ``None`` when the key is not set or the namespace is unknown. |
| 617 | |
| 618 | Args: |
| 619 | key: Dotted key in ``<namespace>.<subkey>`` form. |
| 620 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 621 | |
| 622 | Returns: |
| 623 | String value, or ``None``. |
| 624 | """ |
| 625 | parts = key.split(".", 1) |
| 626 | if len(parts) != 2: |
| 627 | return None |
| 628 | namespace, subkey = parts |
| 629 | config = _load_config(_config_path(repo_root)) |
| 630 | |
| 631 | if namespace == "user": |
| 632 | # User identity lives in identity.toml, keyed by the configured hub URL. |
| 633 | hub_url = (config.get("hub") or {}).get("url", "") |
| 634 | if not hub_url: |
| 635 | return None |
| 636 | try: |
| 637 | from muse.core.identity import load_identity, hostname_from_url |
| 638 | hostname = hostname_from_url(hub_url) |
| 639 | entry = load_identity(hostname) |
| 640 | if entry is None: |
| 641 | return None |
| 642 | if subkey == "handle": |
| 643 | return entry.get("handle") |
| 644 | if subkey == "type": |
| 645 | return entry.get("type") |
| 646 | if subkey == "display_name": |
| 647 | return entry.get("display_name") |
| 648 | if subkey == "email": |
| 649 | return entry.get("email") |
| 650 | except Exception: |
| 651 | pass |
| 652 | return None |
| 653 | |
| 654 | if namespace == "hub": |
| 655 | hub = config.get("hub") or {} |
| 656 | if subkey == "url": |
| 657 | return hub.get("url") |
| 658 | return None |
| 659 | |
| 660 | if namespace == "domain": |
| 661 | domain = config.get("domain") or {} |
| 662 | return domain.get(subkey) |
| 663 | |
| 664 | if namespace == "limits": |
| 665 | limits = config.get("limits") or {} |
| 666 | if subkey == "max_walk_commits": |
| 667 | v = limits.get("max_walk_commits") |
| 668 | return str(v) if isinstance(v, int) else None |
| 669 | if subkey == "max_ancestors": |
| 670 | v = limits.get("max_ancestors") |
| 671 | return str(v) if isinstance(v, int) else None |
| 672 | if subkey == "max_graph_commits": |
| 673 | v = limits.get("max_graph_commits") |
| 674 | return str(v) if isinstance(v, int) else None |
| 675 | if subkey == "shard_prefix_length": |
| 676 | v = limits.get("shard_prefix_length") |
| 677 | return str(v) if isinstance(v, int) else None |
| 678 | return None |
| 679 | |
| 680 | if namespace == "commit": |
| 681 | commit = config.get("commit") or {} |
| 682 | if subkey == "sign": |
| 683 | v = commit.get("sign") |
| 684 | if v is True: |
| 685 | return "true" |
| 686 | if v is False: |
| 687 | return "false" |
| 688 | return None |
| 689 | return None |
| 690 | |
| 691 | return None |
| 692 | |
| 693 | def set_config_value(key: str, value: str, repo_root: pathlib.Path | None = None) -> None: |
| 694 | """Set a config value by dotted key (e.g. ``user.handle``, ``domain.ticks_per_beat``). |
| 695 | |
| 696 | Args: |
| 697 | key: Dotted key in ``<namespace>.<subkey>`` form. |
| 698 | value: New string value. |
| 699 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 700 | |
| 701 | Raises: |
| 702 | ValueError: If the namespace is blocked, unknown, or the subkey is invalid. |
| 703 | """ |
| 704 | parts = key.split(".", 1) |
| 705 | if len(parts) != 2: |
| 706 | raise ValueError(f"Key must be in 'namespace.subkey' form, got: {key!r}") |
| 707 | namespace, subkey = parts |
| 708 | |
| 709 | if namespace in _BLOCKED_NAMESPACES: |
| 710 | raise ValueError(_BLOCKED_NAMESPACES[namespace]) |
| 711 | |
| 712 | if namespace not in _SETTABLE_NAMESPACES: |
| 713 | raise ValueError( |
| 714 | f"Unknown config namespace {namespace!r}. " |
| 715 | f"Settable namespaces: {', '.join(sorted(_SETTABLE_NAMESPACES))}" |
| 716 | ) |
| 717 | |
| 718 | cp = _config_path(repo_root) |
| 719 | cp.parent.mkdir(parents=True, exist_ok=True) |
| 720 | config = _load_config(cp) |
| 721 | |
| 722 | if namespace == "user": |
| 723 | set_user_field(subkey, value, repo_root) |
| 724 | return |
| 725 | |
| 726 | if namespace == "hub": |
| 727 | if subkey != "url": |
| 728 | raise ValueError(f"Unknown [hub] config key: {subkey!r}. Valid keys: url") |
| 729 | # Route through set_hub_url — it enforces the HTTPS requirement. |
| 730 | set_hub_url(value, repo_root) |
| 731 | return |
| 732 | |
| 733 | if namespace == "limits": |
| 734 | _LIMITS_KEYS = frozenset({ |
| 735 | "max_walk_commits", "max_ancestors", "max_graph_commits", "shard_prefix_length", |
| 736 | }) |
| 737 | if subkey not in _LIMITS_KEYS: |
| 738 | raise ValueError( |
| 739 | f"Unknown [limits] config key: {subkey!r}. " |
| 740 | f"Valid keys: {', '.join(sorted(_LIMITS_KEYS))}" |
| 741 | ) |
| 742 | try: |
| 743 | int_value = int(value) |
| 744 | except ValueError as exc: |
| 745 | raise ValueError( |
| 746 | f"[limits] {subkey} must be an integer, got: {value!r}" |
| 747 | ) from exc |
| 748 | if int_value <= 0: |
| 749 | raise ValueError(f"[limits] {subkey} must be a positive integer, got: {int_value}") |
| 750 | if subkey == "shard_prefix_length" and int_value not in (2, 4): |
| 751 | raise ValueError("shard_prefix_length must be 2 or 4") |
| 752 | limits_section: LimitsConfig = config.get("limits") or {} |
| 753 | if subkey == "max_walk_commits": |
| 754 | limits_section["max_walk_commits"] = int_value |
| 755 | elif subkey == "max_ancestors": |
| 756 | limits_section["max_ancestors"] = int_value |
| 757 | elif subkey == "max_graph_commits": |
| 758 | limits_section["max_graph_commits"] = int_value |
| 759 | elif subkey == "shard_prefix_length": |
| 760 | limits_section["shard_prefix_length"] = int_value |
| 761 | config["limits"] = limits_section |
| 762 | write_text_atomic(cp, _dump_toml(config)) |
| 763 | logger.info("✅ limits.%s = %d", subkey, int_value) |
| 764 | return |
| 765 | |
| 766 | if namespace == "commit": |
| 767 | _COMMIT_KEYS = frozenset({"sign"}) |
| 768 | if subkey not in _COMMIT_KEYS: |
| 769 | raise ValueError( |
| 770 | f"Unknown [commit] config key: {subkey!r}. " |
| 771 | f"Valid keys: {', '.join(sorted(_COMMIT_KEYS))}" |
| 772 | ) |
| 773 | if value not in ("true", "false"): |
| 774 | raise ValueError(f"[commit] {subkey} must be 'true' or 'false', got: {value!r}") |
| 775 | commit_section: CommitConfig = config.get("commit") or {} |
| 776 | if subkey == "sign": |
| 777 | commit_section["sign"] = value == "true" |
| 778 | config["commit"] = commit_section |
| 779 | write_text_atomic(cp, _dump_toml(config)) |
| 780 | logger.info("✅ commit.%s = %s", subkey, value) |
| 781 | return |
| 782 | |
| 783 | # namespace == "domain" |
| 784 | _validate_toml_key(subkey, "domain key") |
| 785 | domain: DomainConfig = config.get("domain") or {} |
| 786 | domain[subkey] = value |
| 787 | config["domain"] = domain |
| 788 | write_text_atomic(cp, _dump_toml(config)) |
| 789 | logger.info("✅ domain.%s = %r", subkey, value) |
| 790 | |
| 791 | def config_as_dict(repo_root: pathlib.Path | None = None) -> ConfigTree: |
| 792 | """Return the full config as a plain ``dict[str, dict[str, str]]`` for JSON output. |
| 793 | |
| 794 | Credentials are never included — the hub section only contains the URL. |
| 795 | |
| 796 | Args: |
| 797 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 798 | |
| 799 | Returns: |
| 800 | Nested dict suitable for ``json.dumps``. |
| 801 | """ |
| 802 | config = _load_config(_config_path(repo_root)) |
| 803 | result: ConfigTree = {} |
| 804 | |
| 805 | hub = config.get("hub") |
| 806 | if hub: |
| 807 | hub_url = hub.get("url", "") |
| 808 | if hub_url: |
| 809 | result["hub"] = {"url": hub_url} |
| 810 | |
| 811 | remotes = config.get("remotes") or {} |
| 812 | if remotes: |
| 813 | remotes_dict: ConfigSection = {} |
| 814 | for rname, entry in sorted(remotes.items()): |
| 815 | url = entry.get("url", "") |
| 816 | if url: |
| 817 | remotes_dict[rname] = url |
| 818 | if remotes_dict: |
| 819 | result["remotes"] = remotes_dict |
| 820 | |
| 821 | domain = config.get("domain") or {} |
| 822 | if domain: |
| 823 | result["domain"] = dict(sorted(domain.items())) |
| 824 | |
| 825 | limits = config.get("limits") or {} |
| 826 | if limits: |
| 827 | limits_dict: ConfigSection = {} |
| 828 | for lk in ("max_walk_commits", "max_ancestors", "max_graph_commits", "shard_prefix_length"): |
| 829 | lv = limits.get(lk) |
| 830 | if lv is not None: |
| 831 | limits_dict[lk] = str(lv) |
| 832 | if limits_dict: |
| 833 | result["limits"] = limits_dict |
| 834 | |
| 835 | return result |
| 836 | |
| 837 | def config_path_for_editor(repo_root: pathlib.Path | None = None) -> pathlib.Path: |
| 838 | """Return the config path for the ``config edit`` command.""" |
| 839 | return _config_path(repo_root) |
| 840 | |
| 841 | # --------------------------------------------------------------------------- |
| 842 | # Branch metadata helpers |
| 843 | # --------------------------------------------------------------------------- |
| 844 | |
| 845 | def write_branch_meta( |
| 846 | repo_root: pathlib.Path, |
| 847 | branch_name: str, |
| 848 | *, |
| 849 | intent: str | None = None, |
| 850 | resumable: bool | None = None, |
| 851 | ) -> None: |
| 852 | """Write per-branch metadata to ``[branch."<name>"]`` in ``.muse/config.toml``. |
| 853 | |
| 854 | Only the supplied keyword arguments are updated; existing fields |
| 855 | (``remote``, ``merge``, and previously written ``intent``/``resumable``) |
| 856 | are preserved unchanged. |
| 857 | |
| 858 | Args: |
| 859 | repo_root: Repository root directory. |
| 860 | branch_name: Name of the branch (e.g. ``"feat/my-thing"``). |
| 861 | intent: Short description of what this branch is for. |
| 862 | resumable: Mark this branch as a resumable agent checkpoint. |
| 863 | """ |
| 864 | _validate_toml_key(branch_name, "branch name") |
| 865 | cp = _config_path(repo_root) |
| 866 | cp.parent.mkdir(parents=True, exist_ok=True) |
| 867 | config = _load_config(cp) |
| 868 | branch_map: dict[str, BranchMeta] = dict(config.get("branch") or {}) |
| 869 | entry: BranchMeta = dict(branch_map.get(branch_name) or {}) # type: ignore[arg-type] |
| 870 | if intent is not None: |
| 871 | entry["intent"] = intent |
| 872 | if resumable is not None: |
| 873 | entry["resumable"] = resumable |
| 874 | branch_map[branch_name] = entry |
| 875 | config["branch"] = branch_map |
| 876 | write_text_atomic(cp, _dump_toml(config)) |
| 877 | |
| 878 | def delete_branch_meta(repo_root: pathlib.Path, branch_name: str) -> None: |
| 879 | """Remove the ``[branch."<name>"]`` section from ``.muse/config.toml``. |
| 880 | |
| 881 | Called by ``muse branch -d/-D`` after a successful branch deletion so |
| 882 | stale intent/resumable entries do not accumulate indefinitely. No-op |
| 883 | when the branch has no metadata or the config file is absent. |
| 884 | """ |
| 885 | cp = _config_path(repo_root) |
| 886 | if not cp.exists(): |
| 887 | return |
| 888 | config = _load_config(cp) |
| 889 | branch_map = dict(config.get("branch") or {}) |
| 890 | if branch_name not in branch_map: |
| 891 | return |
| 892 | del branch_map[branch_name] |
| 893 | config["branch"] = branch_map |
| 894 | write_text_atomic(cp, _dump_toml(config)) |
| 895 | |
| 896 | def read_branch_meta( |
| 897 | repo_root: pathlib.Path, |
| 898 | branch_name: str, |
| 899 | ) -> BranchMeta: |
| 900 | """Return per-branch metadata from ``.muse/config.toml``. |
| 901 | |
| 902 | Returns an empty dict when the branch has no metadata or the config file |
| 903 | is absent. |
| 904 | |
| 905 | Args: |
| 906 | repo_root: Repository root directory. |
| 907 | branch_name: Name of the branch (e.g. ``"feat/my-thing"``). |
| 908 | |
| 909 | Returns: |
| 910 | Dict with any of: ``intent`` (str), ``resumable`` (bool), |
| 911 | ``remote`` (str), ``merge`` (str). |
| 912 | """ |
| 913 | config = _load_config(_config_path(repo_root)) |
| 914 | branch_map = config.get("branch") or {} |
| 915 | return dict(branch_map.get(branch_name) or {}) |
| 916 | |
| 917 | # --------------------------------------------------------------------------- |
| 918 | # Protected branches helpers |
| 919 | # --------------------------------------------------------------------------- |
| 920 | |
| 921 | def get_protected_branches(repo_root: pathlib.Path | None = None) -> list[str]: |
| 922 | """Return the list of protected branch patterns from ``[protected_branches]``. |
| 923 | |
| 924 | Returns an empty list when the section is absent or has no ``branches`` key. |
| 925 | |
| 926 | Args: |
| 927 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 928 | """ |
| 929 | config = _load_config(_config_path(repo_root)) |
| 930 | return list(config.get("protected_branches") or []) |
| 931 | |
| 932 | def is_branch_protected(branch: str, patterns: list[str]) -> bool: |
| 933 | """Return ``True`` if *branch* matches any pattern in *patterns*. |
| 934 | |
| 935 | Patterns are matched with :func:`fnmatch.fnmatch` (shell-style globs). |
| 936 | Matching is case-sensitive, consistent with Python fnmatch behaviour. |
| 937 | |
| 938 | Args: |
| 939 | branch: Branch name to test (e.g. ``"release/1.0"``). |
| 940 | patterns: List of patterns from ``[protected_branches] branches``. |
| 941 | """ |
| 942 | return any(fnmatch.fnmatch(branch, p) for p in patterns) |
| 943 | |
| 944 | # --------------------------------------------------------------------------- |
| 945 | # Remote helpers |
| 946 | # --------------------------------------------------------------------------- |
| 947 | |
| 948 | def get_remote(name: str, repo_root: pathlib.Path | None = None) -> str | None: |
| 949 | """Return the URL for remote *name*, or ``None`` when not configured. |
| 950 | |
| 951 | Args: |
| 952 | name: Remote name (e.g. ``"origin"``). |
| 953 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 954 | |
| 955 | Returns: |
| 956 | URL string, or ``None``. |
| 957 | """ |
| 958 | config = _load_config(_config_path(repo_root)) |
| 959 | remotes = config.get("remotes") |
| 960 | if remotes is None: |
| 961 | return None |
| 962 | entry = remotes.get(name) |
| 963 | if entry is None: |
| 964 | return None |
| 965 | url = entry.get("url", "") |
| 966 | return url.strip() if url.strip() else None |
| 967 | |
| 968 | def set_remote( |
| 969 | name: str, |
| 970 | url: str, |
| 971 | repo_root: pathlib.Path | None = None, |
| 972 | ) -> None: |
| 973 | """Write ``[remotes.<name>] url`` to ``.muse/config.toml``. |
| 974 | |
| 975 | Preserves all other sections. Creates the file if absent. |
| 976 | |
| 977 | Args: |
| 978 | name: Remote name (e.g. ``"origin"``). |
| 979 | url: Remote URL. |
| 980 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 981 | """ |
| 982 | cp = _config_path(repo_root) |
| 983 | cp.parent.mkdir(parents=True, exist_ok=True) |
| 984 | config = _load_config(cp) |
| 985 | existing_remotes = config.get("remotes") |
| 986 | remotes: RemotesMap = {} |
| 987 | if existing_remotes: |
| 988 | remotes.update(existing_remotes) |
| 989 | existing_entry = remotes.get(name) |
| 990 | entry: RemoteEntry = {} |
| 991 | if existing_entry is not None: |
| 992 | if "url" in existing_entry: |
| 993 | entry["url"] = existing_entry["url"] |
| 994 | if "branch" in existing_entry: |
| 995 | entry["branch"] = existing_entry["branch"] |
| 996 | entry["url"] = url |
| 997 | remotes[name] = entry |
| 998 | config["remotes"] = remotes |
| 999 | write_text_atomic(cp, _dump_toml(config)) |
| 1000 | logger.info("✅ Remote %r set to %s", name, url) |
| 1001 | |
| 1002 | def remove_remote( |
| 1003 | name: str, |
| 1004 | repo_root: pathlib.Path | None = None, |
| 1005 | ) -> None: |
| 1006 | """Remove a named remote and its tracking refs. |
| 1007 | |
| 1008 | Args: |
| 1009 | name: Remote name to remove. |
| 1010 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 1011 | |
| 1012 | Raises: |
| 1013 | KeyError: If *name* is not a configured remote. |
| 1014 | """ |
| 1015 | cp = _config_path(repo_root) |
| 1016 | config = _load_config(cp) |
| 1017 | remotes = config.get("remotes") |
| 1018 | if remotes is None or name not in remotes: |
| 1019 | raise KeyError(name) |
| 1020 | del remotes[name] |
| 1021 | config["remotes"] = remotes |
| 1022 | write_text_atomic(cp, _dump_toml(config)) |
| 1023 | logger.info("✅ Remote %r removed from config", name) |
| 1024 | |
| 1025 | root = (repo_root or pathlib.Path.cwd()).resolve() |
| 1026 | refs_dir = _remote_tracking_dir(root, name) |
| 1027 | if refs_dir.is_symlink(): |
| 1028 | # Refuse to rmtree a symlink — following a symlink placed by an |
| 1029 | # attacker could delete files outside the repository tree. |
| 1030 | logger.warning("⚠️ Skipping rmtree: remotes dir %s is a symlink", refs_dir) |
| 1031 | elif refs_dir.is_dir(): |
| 1032 | shutil.rmtree(refs_dir) |
| 1033 | logger.debug("✅ Removed tracking refs dir %s", refs_dir) |
| 1034 | |
| 1035 | def rename_remote( |
| 1036 | old_name: str, |
| 1037 | new_name: str, |
| 1038 | repo_root: pathlib.Path | None = None, |
| 1039 | ) -> None: |
| 1040 | """Rename a remote and move its tracking refs. |
| 1041 | |
| 1042 | Args: |
| 1043 | old_name: Current remote name. |
| 1044 | new_name: Desired new remote name. |
| 1045 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 1046 | |
| 1047 | Raises: |
| 1048 | KeyError: If *old_name* is not a configured remote. |
| 1049 | ValueError: If *new_name* is already configured. |
| 1050 | """ |
| 1051 | cp = _config_path(repo_root) |
| 1052 | config = _load_config(cp) |
| 1053 | remotes = config.get("remotes") |
| 1054 | if remotes is None or old_name not in remotes: |
| 1055 | raise KeyError(old_name) |
| 1056 | if new_name in remotes: |
| 1057 | raise ValueError(new_name) |
| 1058 | remotes[new_name] = remotes.pop(old_name) |
| 1059 | config["remotes"] = remotes |
| 1060 | write_text_atomic(cp, _dump_toml(config)) |
| 1061 | logger.info("✅ Remote %r renamed to %r", old_name, new_name) |
| 1062 | |
| 1063 | root = (repo_root or pathlib.Path.cwd()).resolve() |
| 1064 | old_refs_dir = _remote_tracking_dir(root, old_name) |
| 1065 | new_refs_dir = _remote_tracking_dir(root, new_name) |
| 1066 | if old_refs_dir.is_dir(): |
| 1067 | old_refs_dir.rename(new_refs_dir) |
| 1068 | logger.debug("✅ Moved tracking refs dir %s → %s", old_refs_dir, new_refs_dir) |
| 1069 | |
| 1070 | def list_remotes(repo_root: pathlib.Path | None = None) -> list[RemoteConfig]: |
| 1071 | """Return all configured remotes sorted alphabetically by name. |
| 1072 | |
| 1073 | Args: |
| 1074 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 1075 | |
| 1076 | Returns: |
| 1077 | List of ``{"name": str, "url": str}`` dicts. |
| 1078 | """ |
| 1079 | config = _load_config(_config_path(repo_root)) |
| 1080 | remotes = config.get("remotes") |
| 1081 | if remotes is None: |
| 1082 | return [] |
| 1083 | result: list[RemoteConfig] = [] |
| 1084 | for remote_name in sorted(remotes): |
| 1085 | entry = remotes[remote_name] |
| 1086 | url = entry.get("url", "") |
| 1087 | if not url.strip(): |
| 1088 | continue |
| 1089 | rc = RemoteConfig(name=remote_name, url=url.strip()) |
| 1090 | result.append(rc) |
| 1091 | return result |
| 1092 | |
| 1093 | # --------------------------------------------------------------------------- |
| 1094 | # Remote tracking-head helpers |
| 1095 | # --------------------------------------------------------------------------- |
| 1096 | |
| 1097 | def _remote_head_path( |
| 1098 | remote_name: str, |
| 1099 | branch: str, |
| 1100 | repo_root: pathlib.Path | None = None, |
| 1101 | ) -> pathlib.Path: |
| 1102 | """Return the path to the remote tracking pointer file.""" |
| 1103 | root = (repo_root or pathlib.Path.cwd()).resolve() |
| 1104 | return _remote_ref_path(root, remote_name, branch) |
| 1105 | |
| 1106 | def get_remote_head( |
| 1107 | remote_name: str, |
| 1108 | branch: str, |
| 1109 | repo_root: pathlib.Path | None = None, |
| 1110 | ) -> str | None: |
| 1111 | """Return the last-known remote commit ID for *remote_name*/*branch*. |
| 1112 | |
| 1113 | Returns ``None`` when the tracking pointer does not exist. |
| 1114 | |
| 1115 | Args: |
| 1116 | remote_name: Remote name (e.g. ``"origin"``). |
| 1117 | branch: Branch name (e.g. ``"main"``). |
| 1118 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 1119 | |
| 1120 | Returns: |
| 1121 | Commit ID string, or ``None``. |
| 1122 | """ |
| 1123 | return read_ref(_remote_head_path(remote_name, branch, repo_root)) |
| 1124 | |
| 1125 | def set_remote_head( |
| 1126 | remote_name: str, |
| 1127 | branch: str, |
| 1128 | commit_id: str, |
| 1129 | repo_root: pathlib.Path | None = None, |
| 1130 | ) -> None: |
| 1131 | """Write the remote tracking pointer for *remote_name*/*branch*. |
| 1132 | |
| 1133 | Args: |
| 1134 | remote_name: Remote name (e.g. ``"origin"``). |
| 1135 | branch: Branch name. |
| 1136 | commit_id: Commit ID to record as the known remote HEAD. |
| 1137 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 1138 | """ |
| 1139 | pointer = _remote_head_path(remote_name, branch, repo_root) |
| 1140 | write_text_atomic(pointer, commit_id) |
| 1141 | logger.debug("✅ Remote head %s/%s → %s", remote_name, branch, short_id(commit_id)) |
| 1142 | |
| 1143 | def delete_remote_head( |
| 1144 | remote_name: str, |
| 1145 | branch: str, |
| 1146 | repo_root: pathlib.Path | None = None, |
| 1147 | ) -> bool: |
| 1148 | """Remove the local remote-tracking pointer for *remote_name*/*branch*. |
| 1149 | |
| 1150 | Used after ``muse push --delete`` deletes the branch on the server, or when |
| 1151 | pruning stale tracking refs with ``muse branch -dr``. |
| 1152 | |
| 1153 | Args: |
| 1154 | remote_name: Remote name (e.g. ``"origin"``). |
| 1155 | branch: Branch name (e.g. ``"feat/my-thing"``). |
| 1156 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 1157 | |
| 1158 | Returns: |
| 1159 | ``True`` if the pointer file existed and was removed, ``False`` if it |
| 1160 | was already absent (idempotent). |
| 1161 | """ |
| 1162 | pointer = _remote_head_path(remote_name, branch, repo_root) |
| 1163 | if not pointer.is_file(): |
| 1164 | return False |
| 1165 | pointer.unlink() |
| 1166 | # Remove now-empty parent directories (mirrors _cleanup_empty_dirs in branch.py). |
| 1167 | remotes_dir = pointer.parent |
| 1168 | while remotes_dir.name != remote_name: |
| 1169 | try: |
| 1170 | remotes_dir.rmdir() |
| 1171 | except OSError: |
| 1172 | break |
| 1173 | remotes_dir = remotes_dir.parent |
| 1174 | logger.debug("🗑 Remote tracking ref %s/%s removed", remote_name, branch) |
| 1175 | return True |
| 1176 | |
| 1177 | # --------------------------------------------------------------------------- |
| 1178 | # Upstream tracking helpers |
| 1179 | # --------------------------------------------------------------------------- |
| 1180 | |
| 1181 | def set_upstream( |
| 1182 | branch: str, |
| 1183 | remote_name: str, |
| 1184 | repo_root: pathlib.Path | None = None, |
| 1185 | ) -> None: |
| 1186 | """Record *remote_name* as the upstream remote for *branch*. |
| 1187 | |
| 1188 | Args: |
| 1189 | branch: Local (and remote) branch name. |
| 1190 | remote_name: Remote name. |
| 1191 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 1192 | """ |
| 1193 | cp = _config_path(repo_root) |
| 1194 | cp.parent.mkdir(parents=True, exist_ok=True) |
| 1195 | config = _load_config(cp) |
| 1196 | existing_remotes = config.get("remotes") |
| 1197 | remotes: RemotesMap = {} |
| 1198 | if existing_remotes: |
| 1199 | remotes.update(existing_remotes) |
| 1200 | existing_entry = remotes.get(remote_name) |
| 1201 | entry: RemoteEntry = {} |
| 1202 | if existing_entry is not None: |
| 1203 | if "url" in existing_entry: |
| 1204 | entry["url"] = existing_entry["url"] |
| 1205 | if "branch" in existing_entry: |
| 1206 | entry["branch"] = existing_entry["branch"] |
| 1207 | entry["branch"] = branch |
| 1208 | remotes[remote_name] = entry |
| 1209 | config["remotes"] = remotes |
| 1210 | write_text_atomic(cp, _dump_toml(config)) |
| 1211 | logger.info("✅ Upstream for branch %r set to %s/%r", branch, remote_name, branch) |
| 1212 | |
| 1213 | def get_upstream( |
| 1214 | branch: str, |
| 1215 | repo_root: pathlib.Path | None = None, |
| 1216 | ) -> str | None: |
| 1217 | """Return the configured upstream remote name for *branch*, or ``None``. |
| 1218 | |
| 1219 | Args: |
| 1220 | branch: Local branch name. |
| 1221 | repo_root: Repository root. Defaults to ``Path.cwd()``. |
| 1222 | |
| 1223 | Returns: |
| 1224 | Remote name string, or ``None``. |
| 1225 | """ |
| 1226 | config = _load_config(_config_path(repo_root)) |
| 1227 | remotes = config.get("remotes") |
| 1228 | if remotes is None: |
| 1229 | return None |
| 1230 | for rname, entry in remotes.items(): |
| 1231 | tracked = entry.get("branch", "") |
| 1232 | if tracked.strip() == branch: |
| 1233 | return rname |
| 1234 | return None |
| 1235 | |
| 1236 | # --------------------------------------------------------------------------- |
| 1237 | # Global user config — ~/.muse/config.toml (safe_dirs) |
| 1238 | # --------------------------------------------------------------------------- |
| 1239 | |
| 1240 | _GLOBAL_MUSE_DIR = _user_muse_dir() |
| 1241 | _GLOBAL_CONFIG_FILE = _user_config_toml_path() |
| 1242 | |
| 1243 | def _load_global_config() -> _SecurityConfig: |
| 1244 | """Load ``~/.muse/config.toml`` and return the ``[security]`` section. |
| 1245 | |
| 1246 | Returns a dict with key ``safe_dirs`` mapping to a list of path strings. |
| 1247 | Returns ``{"safe_dirs": []}`` when the file is absent or unparseable. |
| 1248 | """ |
| 1249 | if not _GLOBAL_CONFIG_FILE.is_file(): |
| 1250 | return {"safe_dirs": []} |
| 1251 | try: |
| 1252 | with _GLOBAL_CONFIG_FILE.open("rb") as fh: |
| 1253 | raw = tomllib.load(fh) |
| 1254 | except Exception as exc: # noqa: BLE001 |
| 1255 | logger.warning("⚠️ Failed to parse %s: %s", _GLOBAL_CONFIG_FILE, exc) |
| 1256 | return {"safe_dirs": []} |
| 1257 | security_raw = raw.get("security") |
| 1258 | if not isinstance(security_raw, dict): |
| 1259 | return {"safe_dirs": []} |
| 1260 | dirs_raw = security_raw.get("safe_dirs") |
| 1261 | if not isinstance(dirs_raw, list): |
| 1262 | return {"safe_dirs": []} |
| 1263 | safe: list[str] = [d for d in dirs_raw if isinstance(d, str) and d.strip()] |
| 1264 | return {"safe_dirs": safe} |
| 1265 | |
| 1266 | def _save_global_config(safe_dirs: list[str]) -> None: |
| 1267 | """Write ``[security] safe_dirs`` to ``~/.muse/config.toml``. |
| 1268 | |
| 1269 | Preserves any other sections that may exist in the file. |
| 1270 | """ |
| 1271 | import os |
| 1272 | _GLOBAL_MUSE_DIR.mkdir(parents=True, exist_ok=True) |
| 1273 | |
| 1274 | # Read existing raw content to preserve other sections. |
| 1275 | existing_lines: list[str] = [] |
| 1276 | if _GLOBAL_CONFIG_FILE.is_file(): |
| 1277 | try: |
| 1278 | existing_lines = _GLOBAL_CONFIG_FILE.read_text("utf-8").splitlines() |
| 1279 | except Exception: # noqa: BLE001 |
| 1280 | existing_lines = [] |
| 1281 | |
| 1282 | # Strip any existing [security] section from the file. |
| 1283 | filtered: list[str] = [] |
| 1284 | in_security = False |
| 1285 | for line in existing_lines: |
| 1286 | stripped = line.strip() |
| 1287 | if stripped == "[security]": |
| 1288 | in_security = True |
| 1289 | continue |
| 1290 | if in_security and stripped.startswith("["): |
| 1291 | in_security = False |
| 1292 | if not in_security: |
| 1293 | filtered.append(line) |
| 1294 | |
| 1295 | # Remove trailing blank lines before appending the new section. |
| 1296 | while filtered and not filtered[-1].strip(): |
| 1297 | filtered.pop() |
| 1298 | |
| 1299 | # Append the new [security] section. |
| 1300 | filtered.append("") |
| 1301 | filtered.append("[security]") |
| 1302 | if safe_dirs: |
| 1303 | items = ", ".join(f'"{_escape(d)}"' for d in safe_dirs) |
| 1304 | filtered.append(f"safe_dirs = [{items}]") |
| 1305 | else: |
| 1306 | filtered.append("safe_dirs = []") |
| 1307 | filtered.append("") |
| 1308 | |
| 1309 | content = "\n".join(filtered) |
| 1310 | tmp = _GLOBAL_CONFIG_FILE.with_suffix(".toml.tmp") |
| 1311 | tmp.write_text(content, encoding="utf-8") |
| 1312 | os.replace(tmp, _GLOBAL_CONFIG_FILE) |
| 1313 | |
| 1314 | def get_global_safe_dirs() -> list[str]: |
| 1315 | """Return the ``safe_dirs`` list from ``~/.muse/config.toml``. |
| 1316 | |
| 1317 | Returns an empty list when not configured. |
| 1318 | """ |
| 1319 | return _load_global_config().get("safe_dirs", []) |
| 1320 | |
| 1321 | def add_global_safe_dir(path: str) -> None: |
| 1322 | """Add *path* to the ``safe_dirs`` list in ``~/.muse/config.toml``. |
| 1323 | |
| 1324 | Normalises the path (``os.path.abspath``) before storing. Idempotent — |
| 1325 | adding the same path twice has no effect. |
| 1326 | |
| 1327 | Args: |
| 1328 | path: Absolute or relative path to trust. |
| 1329 | """ |
| 1330 | import os |
| 1331 | abs_path = os.path.abspath(path) |
| 1332 | current = get_global_safe_dirs() |
| 1333 | if abs_path not in current: |
| 1334 | current.append(abs_path) |
| 1335 | _save_global_config(current) |
| 1336 | logger.info("✅ Trusted path added: %s", abs_path) |
| 1337 | |
| 1338 | def remove_global_safe_dir(path: str) -> None: |
| 1339 | """Remove *path* from the ``safe_dirs`` list in ``~/.muse/config.toml``. |
| 1340 | |
| 1341 | Normalises the path before matching. No-op when the path is not present. |
| 1342 | |
| 1343 | Args: |
| 1344 | path: Absolute or relative path to remove from the trust list. |
| 1345 | """ |
| 1346 | import os |
| 1347 | abs_path = os.path.abspath(path) |
| 1348 | current = get_global_safe_dirs() |
| 1349 | updated = [d for d in current if d != abs_path] |
| 1350 | _save_global_config(updated) |
| 1351 | logger.info("✅ Trusted path removed: %s", abs_path) |
File History
1 commit
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
28 days ago