"""muse remote — manage remote repository connections. Subcommands ----------- muse remote [-v] [--json] List configured remotes muse remote add [--json] Register a new remote muse remote get-url [--json] Print a remote's URL muse remote remove [--json] Remove a remote and its tracking refs muse remote rename [--json] Rename a remote muse remote set-url [--json] Update a remote's URL muse remote status [--json] Check reachability and last-known refs All remote URLs and tracking data are stored in ``.muse/config.toml`` and ``.muse/remotes//`` — no network calls are made except for ``muse remote status`` which pings the remote's health endpoint. JSON schema (subcommand-specific) ---------------------------------- ``muse remote [--json]``:: { "remotes": [{"name": "", "url": "", "tracking": "/", "head": ""}] } ``muse remote add|remove|rename|set-url [--json]``:: {"status": "ok", "name": "", "url": "|null", "old_name": "|null", "new_name": "|null"} ``muse remote get-url [--json]``:: {"name": "", "url": ""} ``muse remote status [--json]``:: {"remote": "", "url": "", "server_root": "", "reachable": true|false, "http_status": |null, "message": "", "tracked_refs": {"": ""}} Exit codes ---------- 0 — success 1 — user error (unknown remote, duplicate remote, invalid URL scheme, invalid name) 2 — not inside a Muse repository 5 — remote unreachable (status subcommand) """ from __future__ import annotations import argparse import json import logging import sys import urllib.error import urllib.request from typing import TYPE_CHECKING, TypedDict from urllib.parse import urlparse from muse.cli.config import ( get_remote, get_remote_head, get_upstream, list_remotes, remove_remote, rename_remote, set_remote, ) from muse.core.errors import ExitCode from muse.core.repo import require_repo from muse.core.validation import sanitize_display if TYPE_CHECKING: import pathlib type _RefMap = dict[str, str] logger = logging.getLogger(__name__) # Remote name: alphanumeric, dash, underscore, dot. No slashes, no spaces. _VALID_REMOTE_NAME_CHARS = frozenset( "abcdefghijklmnopqrstuvwxyz" "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "0123456789-_." ) # Only allow http and https — no file://, ftp://, data://, etc. _ALLOWED_URL_SCHEMES = frozenset({"http", "https"}) # Prevent unbounded writes to config.toml. _MAX_REMOTE_NAME_LEN = 100 _MAX_URL_LEN = 2048 # ── TypedDicts ──────────────────────────────────────────────────────────────── class _RemoteEntryJson(TypedDict): """Single remote entry in ``muse remote --json`` list output.""" name: str url: str tracking: str # "/" or empty head: str # last-known HEAD sha8 or empty class _RemoteListJson(TypedDict): """JSON schema for ``muse remote [--json]``.""" remotes: list[_RemoteEntryJson] class _RemoteMutationJson(TypedDict): """JSON schema for add / remove / rename / set-url subcommands.""" status: str # "ok" | "error" name: str # primary remote name (new name for rename) url: str | None # applicable URL, null for remove/rename old_url: str | None # set-url only: the URL before the update old_name: str | None # rename only new_name: str | None # rename only class _RemoteGetUrlJson(TypedDict): """JSON schema for ``muse remote get-url``.""" name: str url: str class _RemoteStatusJson(TypedDict): """JSON schema for ``muse remote status``.""" remote: str url: str server_root: str reachable: bool http_status: int | None message: str tracked_refs: _RefMap # ── Validation helpers ──────────────────────────────────────────────────────── def _validate_remote_name(name: str) -> str | None: """Return an error message if *name* is not a valid remote name, else None.""" if not name: return "Remote name must not be empty." if len(name) > _MAX_REMOTE_NAME_LEN: return f"Remote name is too long ({len(name)} chars); maximum is {_MAX_REMOTE_NAME_LEN}." invalid = {c for c in name if c not in _VALID_REMOTE_NAME_CHARS} if invalid: shown = ", ".join(repr(c) for c in sorted(invalid)) return f"Remote name contains invalid characters: {shown}" return None def _validate_url_scheme(url: str) -> str | None: """Return an error message if *url* does not use an allowed scheme, else None.""" scheme = urlparse(url).scheme.lower() if scheme not in _ALLOWED_URL_SCHEMES: allowed = ", ".join(sorted(_ALLOWED_URL_SCHEMES)) return f"URL scheme '{sanitize_display(scheme)}' is not allowed. Use one of: {allowed}" return None def _collect_tracked_refs(remotes_dir: "pathlib.Path") -> _RefMap: """Walk *remotes_dir* recursively and return branch → sha8 mapping. Handles nested branch names (e.g. ``feat/ui`` stored as ``remotes_dir/feat/ui``). Symlinks are skipped to prevent path-traversal attacks — the same guard applied in ``muse fetch``. """ refs: _RefMap = {} if not remotes_dir.exists(): return refs _walk_refs(remotes_dir, remotes_dir, refs) return refs def _walk_refs(base: "pathlib.Path", current: "pathlib.Path", acc: _RefMap) -> None: """Recursively populate *acc* with branch_name → sha8 from *current*.""" for entry in sorted(current.iterdir()): if entry.is_symlink(): logger.debug("⚠️ Skipping symlink in remotes dir: %s", entry) continue if entry.is_dir(): _walk_refs(base, entry, acc) elif entry.is_file(): branch = str(entry.relative_to(base)) sha = entry.read_text().strip() acc[branch] = sha[:8] if sha else "(empty)" # ── register ────────────────────────────────────────────────────────────────── def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None: """Register the ``muse remote`` subcommand tree and all its flags. Every subcommand accepts ``--json`` for machine-readable output. All diagnostic messages (errors, hints) go to stderr; success JSON or plain output goes to stdout. """ parser = subparsers.add_parser( "remote", help="Manage remote repository connections.", description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "-v", "--verbose", action="store_true", help="Show URLs and last-known HEAD with each remote (like git remote -v).", ) parser.add_argument( "--json", action="store_true", dest="json_output", default=False, help="Emit JSON to stdout instead of human-readable text.", ) subs = parser.add_subparsers(dest="subcommand", metavar="SUBCOMMAND") # ── add ────────────────────────────────────────────────────────────────── add_p = subs.add_parser( "add", help="Register a new remote repository connection.", description=( "Register a new named remote in .muse/config.toml.\n\n" "Remote name rules:\n" " - Alphanumeric characters, dash (-), underscore (_), and dot (.) only\n" " - No slashes, spaces, or control characters\n" f" - Maximum {_MAX_REMOTE_NAME_LEN} characters\n\n" "URL rules:\n" " - http:// or https:// only — file://, ftp://, data://, etc. are blocked\n" f" - Maximum {_MAX_URL_LEN} characters\n" " - Leading/trailing whitespace is stripped automatically\n\n" "Agent quickstart:\n" " muse remote add origin https://musehub.ai/gabriel/my-repo\n" " muse remote add origin https://musehub.ai/gabriel/my-repo --json\n" " muse remote add upstream https://musehub.ai/upstream/my-repo\n\n" "Exit codes:\n" " 0 Remote added successfully\n" " 1 Invalid name, invalid URL scheme, or remote already exists\n" " 2 Not inside a Muse repository" ), formatter_class=argparse.RawDescriptionHelpFormatter, ) add_p.add_argument("name", help="Name for the new remote (e.g. origin).") add_p.add_argument("url", help="URL of the remote repository (http/https only).") add_p.add_argument( "--json", "-j", action="store_true", dest="json_output", default=False, help="Emit JSON to stdout.", ) add_p.set_defaults(func=run_add) # ── get-url ─────────────────────────────────────────────────────────────── get_url_p = subs.add_parser( "get-url", help="Print the URL of a remote.", description=( "Print the URL of a named remote.\n\n" "In text mode the bare URL is written to stdout — designed for shell\n" "composition without extra quoting or parsing:\n" " URL=$(muse remote get-url origin)\n" " muse push $URL\n\n" "In JSON mode a structured object is emitted to stdout:\n" " {\"name\": \"origin\", \"url\": \"https://...\"}\n\n" "Agent quickstart:\n" " muse remote get-url origin\n" " muse remote get-url origin --json\n" " muse remote get-url origin -j # same, short flag\n" " muse remote get-url origin --json | jq -r '.url'\n\n" "Exit codes:\n" " 0 URL printed to stdout\n" " 1 Invalid remote name, or remote does not exist\n" " 2 Not inside a Muse repository" ), formatter_class=argparse.RawDescriptionHelpFormatter, ) get_url_p.add_argument("name", help="Remote name.") get_url_p.add_argument( "--json", "-j", action="store_true", dest="json_output", default=False, help="Emit JSON to stdout.", ) get_url_p.set_defaults(func=run_get_url) # ── remove ─────────────────────────────────────────────────────────────── remove_p = subs.add_parser( "remove", help="Remove a remote and all its tracking refs.", description=( "Remove a named remote from .muse/config.toml and delete its\n" "tracking refs directory (.muse/remotes//).\n\n" "Both the config entry and the tracking refs are deleted atomically\n" "— if the tracking refs directory does not exist, the command still\n" "succeeds as long as the config entry is present.\n\n" "Agent quickstart:\n" " muse remote remove origin\n" " muse remote remove origin --json\n" " muse remote remove origin -j # same, short flag\n\n" "The --json response includes the removed URL so agents can confirm\n" "or undo the operation:\n" " {\"status\": \"ok\", \"name\": \"origin\", \"url\": \"https://...\", ...}\n\n" "Exit codes:\n" " 0 Remote removed successfully\n" " 1 Remote does not exist, or name is invalid\n" " 2 Not inside a Muse repository" ), formatter_class=argparse.RawDescriptionHelpFormatter, ) remove_p.add_argument("name", help="Name of the remote to remove.") remove_p.add_argument( "--json", "-j", action="store_true", dest="json_output", default=False, help="Emit JSON to stdout.", ) remove_p.set_defaults(func=run_remove) # ── rename ─────────────────────────────────────────────────────────────── rename_p = subs.add_parser( "rename", help="Rename a remote and move its tracking refs.", description=( "Rename a remote in .muse/config.toml and move its tracking refs\n" "directory from .muse/remotes// to .muse/remotes//.\n\n" "Both and are validated against remote name rules\n" "(alphanumeric + dash, underscore, dot; max 100 chars) before any write.\n\n" "Agent quickstart:\n" " muse remote rename origin upstream\n" " muse remote rename origin upstream --json\n" " muse remote rename origin upstream -j # same, short flag\n\n" "The --json response includes the URL so agents can verify the rename:\n" " {\"status\": \"ok\", \"name\": \"upstream\",\n" " \"url\": \"https://...\", \"old_name\": \"origin\", \"new_name\": \"upstream\"}\n\n" "Exit codes:\n" " 0 Remote renamed successfully\n" " 1 Invalid name, old name does not exist, or new name already taken\n" " 2 Not inside a Muse repository" ), formatter_class=argparse.RawDescriptionHelpFormatter, ) rename_p.add_argument("old_name", help="Current remote name.") rename_p.add_argument("new_name", help="New remote name.") rename_p.add_argument( "--json", "-j", action="store_true", dest="json_output", default=False, help="Emit JSON to stdout.", ) rename_p.set_defaults(func=run_rename) # ── set-url ─────────────────────────────────────────────────────────────── set_url_p = subs.add_parser( "set-url", help="Update the URL of an existing remote.", description=( "Update the URL of an existing named remote in .muse/config.toml.\n\n" "Remote name rules:\n" " - Alphanumeric characters, dash (-), underscore (_), and dot (.) only\n" " - No slashes, spaces, or control characters\n" f" - Maximum {_MAX_REMOTE_NAME_LEN} characters\n\n" "URL rules:\n" " - http:// or https:// only — file://, ftp://, data://, etc. are blocked\n" f" - Maximum {_MAX_URL_LEN} characters\n" " - Leading/trailing whitespace is stripped automatically\n\n" "Agent quickstart:\n" " muse remote set-url origin https://musehub.ai/gabriel/new-repo\n" " muse remote set-url origin https://musehub.ai/gabriel/new-repo --json\n" " muse remote set-url origin https://musehub.ai/gabriel/new-repo -j\n\n" "The --json response includes old_url so agents can confirm or undo:\n" " {\"status\": \"ok\", \"name\": \"origin\",\n" " \"url\": \"https://...(new)\", \"old_url\": \"https://...(old)\", ...}\n\n" "Exit codes:\n" " 0 URL updated successfully\n" " 1 Invalid name, invalid URL scheme, oversized URL, or remote does not exist\n" " 2 Not inside a Muse repository" ), formatter_class=argparse.RawDescriptionHelpFormatter, ) set_url_p.add_argument("name", help="Remote name.") set_url_p.add_argument("url", help="New URL for the remote (http/https only).") set_url_p.add_argument( "--json", "-j", action="store_true", dest="json_output", default=False, help="Emit JSON to stdout.", ) set_url_p.set_defaults(func=run_set_url) # ── status ──────────────────────────────────────────────────────────────── status_p = subs.add_parser( "status", help="Check reachability and last-known refs for a remote (read-only, no fetch).", description=( "Ping a remote's /health endpoint and show locally cached tracking refs.\n\n" "This command is READ-ONLY — it never fetches, writes, or modifies local state.\n" "Use it to verify a hub is reachable before running 'muse push' or 'muse fetch'.\n\n" "The tracking refs shown are cached from previous fetch/push operations and are\n" "only as current as the last 'muse fetch'. An empty refs list does not mean the\n" "remote is empty — it means no fetch has been run yet.\n\n" "Agent quickstart:\n" " muse remote status origin\n" " muse remote status origin --json\n" " muse remote status origin -j # same, short flag\n" " muse remote status origin --json --timeout 10\n" " muse remote status origin --json | jq '.reachable'\n\n" "JSON schema:\n" " {\"remote\": \"origin\", \"url\": \"https://...\", \"server_root\": \"https://...\",\n" " \"reachable\": true|false, \"http_status\": |null, \"message\": \"...\",\n" " \"tracked_refs\": {\"main\": \"\", \"feat/ui\": \"\"}}\n\n" "Exit codes:\n" " 0 Remote is reachable\n" " 1 Invalid remote name, or remote does not exist\n" " 2 Not inside a Muse repository\n" " 5 Remote is unreachable (network error, timeout, or non-2xx response)" ), formatter_class=argparse.RawDescriptionHelpFormatter, ) status_p.add_argument("name", help="Remote name.") status_p.add_argument( "--json", "-j", action="store_true", dest="json_output", default=False, help="Emit JSON to stdout instead of human-readable output.", ) status_p.add_argument( "--timeout", dest="timeout", type=float, default=6.0, help="HTTP connect timeout in seconds (default: 6).", ) status_p.set_defaults(func=run_status) parser.set_defaults(func=run) # ── list (no subcommand) ───────────────────────────────────────────────────── def run(args: argparse.Namespace) -> None: """List configured remotes. With no flags prints bare names (one per line). With ``-v``/``--verbose`` prints fetch and push lines with URL and last-known HEAD (mirroring ``git remote -v``). With ``--json`` emits a :class:`_RemoteListJson` object on stdout; all other output goes to stderr. """ verbose: bool = args.verbose json_output: bool = args.json_output root = require_repo() remotes = list_remotes(root) if json_output: entries: list[_RemoteEntryJson] = [] for r in remotes: upstream = get_upstream(r["name"], root) head = get_remote_head(r["name"], upstream or "main", root) if upstream else None entries.append({ "name": r["name"], "url": r["url"], "tracking": f"{r['name']}/{upstream}" if upstream else "", "head": head[:8] if head else "", }) out: _RemoteListJson = {"remotes": entries} print(json.dumps(out)) return if not remotes: print("No remotes configured. Use 'muse remote add '.", file=sys.stderr) return name_width = max(len(r["name"]) for r in remotes) for r in remotes: if verbose: upstream = get_upstream(r["name"], root) head = get_remote_head(r["name"], upstream or "main", root) head_str = f" @ {head[:8]}" if head else "" tracking = f" -> {r['name']}/{upstream}" if upstream else "" label = f"{r['name']:<{name_width}}" print(f"{sanitize_display(label)}\t{sanitize_display(r['url'])}{tracking}{head_str} (fetch)") print(f"{sanitize_display(label)}\t{sanitize_display(r['url'])}{tracking}{head_str} (push)") else: print(r["name"]) # ── add ─────────────────────────────────────────────────────────────────────── def run_add(args: argparse.Namespace) -> None: """Register a new remote repository connection. Validates all inputs before any write: - Remote name: alphanumeric + ``-_.``, no slashes/spaces/control chars, max :data:`_MAX_REMOTE_NAME_LEN` characters. - URL: ``http`` or ``https`` scheme only; leading/trailing whitespace is stripped before validation so pasted URLs with trailing newlines work correctly. Max :data:`_MAX_URL_LEN` characters. - Duplicate check: exits with a hint to use ``muse remote set-url`` when the remote already exists. Diagnostics go to stderr; ``--json`` emits :class:`_RemoteMutationJson` to stdout. Exit codes: 0 Remote written to ``.muse/config.toml``. 1 Invalid name, invalid/oversized URL, or remote already exists. 2 Not inside a Muse repository. """ name: str = args.name url: str = args.url.strip() # strip whitespace — pasted URLs often have trailing newlines json_output: bool = args.json_output if err := _validate_remote_name(name): print(f"❌ Invalid remote name '{sanitize_display(name)}': {err}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) if len(url) > _MAX_URL_LEN: print( f"❌ URL is too long ({len(url)} chars); maximum is {_MAX_URL_LEN}.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) if err := _validate_url_scheme(url): print(f"❌ {err}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) root = require_repo() existing = get_remote(name, root) if existing is not None: print( f"❌ Remote '{sanitize_display(name)}' already exists: {sanitize_display(existing)}", file=sys.stderr, ) print( f" Use 'muse remote set-url {sanitize_display(name)} ' to update it.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) set_remote(name, url, root) if json_output: result: _RemoteMutationJson = { "status": "ok", "name": name, "url": url, "old_url": None, "old_name": None, "new_name": None, } print(json.dumps(result)) else: print(f"✅ Remote '{sanitize_display(name)}' added: {sanitize_display(url)}", file=sys.stderr) # ── remove ──────────────────────────────────────────────────────────────────── def run_remove(args: argparse.Namespace) -> None: """Remove a remote and all its tracking refs. Validates the remote name format first (same rules as ``muse remote add``) so invalid-looking names produce a clear format error rather than a misleading "does not exist" message. The removed URL is captured before deletion and included in the ``--json`` response, giving agents enough information to confirm the correct remote was removed or to undo the operation with ``muse remote add``. The tracking refs directory (``.muse/remotes//``) is removed with ``shutil.rmtree`` if present. If that path is a symlink the deletion is skipped and a warning is logged — following a symlink could delete files outside the repository tree. Exit codes: 0 Remote removed from config and tracking refs cleaned up. 1 Invalid name, or remote does not exist. 2 Not inside a Muse repository. """ name: str = args.name json_output: bool = args.json_output if err := _validate_remote_name(name): print(f"❌ Invalid remote name '{sanitize_display(name)}': {err}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) root = require_repo() # Capture the URL before removal so it can be returned in JSON output. removed_url: str | None = get_remote(name, root) if removed_url is None: print(f"❌ Remote '{sanitize_display(name)}' does not exist.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) try: remove_remote(name, root) except KeyError: print(f"❌ Remote '{sanitize_display(name)}' does not exist.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) if json_output: result: _RemoteMutationJson = { "status": "ok", "name": name, "url": removed_url, "old_url": None, "old_name": None, "new_name": None, } print(json.dumps(result)) else: print(f"✅ Remote '{sanitize_display(name)}' removed.", file=sys.stderr) # ── rename ──────────────────────────────────────────────────────────────────── def run_rename(args: argparse.Namespace) -> None: """Rename a remote and move its tracking refs. Both *old_name* and *new_name* are validated against remote name rules before any repo or filesystem access, so invalid-looking names produce a clear format error rather than a confusing "does not exist" message. The URL is looked up before the rename and included in the ``--json`` response so agents can verify which remote was renamed. The tracking refs directory (``.muse/remotes//``) is moved to ``.muse/remotes//`` via an atomic ``os.rename`` if it exists. Exit codes: 0 Remote renamed in config and tracking refs moved. 1 Invalid name, old remote does not exist, or new name already taken. 2 Not inside a Muse repository. """ old_name: str = args.old_name new_name: str = args.new_name json_output: bool = args.json_output if err := _validate_remote_name(old_name): print(f"❌ Invalid remote name '{sanitize_display(old_name)}': {err}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) if err := _validate_remote_name(new_name): print(f"❌ Invalid remote name '{sanitize_display(new_name)}': {err}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) root = require_repo() # Capture the URL before renaming so it can be returned in JSON output. renamed_url: str | None = get_remote(old_name, root) try: rename_remote(old_name, new_name, root) except KeyError: print(f"❌ Remote '{sanitize_display(old_name)}' does not exist.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) except ValueError: print(f"❌ Remote '{sanitize_display(new_name)}' already exists.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) if json_output: result: _RemoteMutationJson = { "status": "ok", "name": new_name, "url": renamed_url, "old_url": None, "old_name": old_name, "new_name": new_name, } print(json.dumps(result)) else: print( f"✅ Remote '{sanitize_display(old_name)}' renamed to '{sanitize_display(new_name)}'.", file=sys.stderr, ) # ── get-url ─────────────────────────────────────────────────────────────────── def run_get_url(args: argparse.Namespace) -> None: """Print the URL of a remote. Validates the remote name format before any repo or filesystem access so an invalid-looking name produces a clear format error rather than a misleading "does not exist" message. In text mode the bare URL is printed to stdout via :func:`~muse.core.validation.sanitize_display` so ANSI escape codes that might have been placed in ``config.toml`` by direct editing cannot inject terminal control sequences. For shell composition the sanitized URL is virtually always identical to the stored one — valid URLs contain no ANSI. In JSON mode a :class:`_RemoteGetUrlJson` object is emitted to stdout; JSON string encoding neutralises any control characters in the value. Exit codes: 0 URL printed to stdout. 1 Invalid remote name, or remote does not exist. 2 Not inside a Muse repository. """ name: str = args.name json_output: bool = args.json_output if err := _validate_remote_name(name): print(f"❌ Invalid remote name '{sanitize_display(name)}': {err}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) root = require_repo() url = get_remote(name, root) if url is None: print(f"❌ Remote '{sanitize_display(name)}' does not exist.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) if json_output: out: _RemoteGetUrlJson = {"name": name, "url": url} print(json.dumps(out)) else: # Bare URL on stdout — intended for shell composition: $(muse remote get-url origin) # sanitize_display strips ANSI/control chars that might appear in a hand-edited config. print(sanitize_display(url)) # ── set-url ─────────────────────────────────────────────────────────────────── def run_set_url(args: argparse.Namespace) -> None: """Update the URL of an existing remote. Validates all inputs before any write: - Remote name: alphanumeric + ``-_.``, no slashes/spaces/control chars, max :data:`_MAX_REMOTE_NAME_LEN` characters. Validated before :func:`~muse.core.repo.require_repo` so invalid-looking names produce a clear format error rather than a "does not exist" message. - URL: ``http`` or ``https`` scheme only; leading/trailing whitespace is stripped before validation so pasted URLs with trailing newlines work correctly. Max :data:`_MAX_URL_LEN` characters. The previous URL is captured before the write and included in the ``--json`` response as ``old_url``, giving agents enough information to confirm the correct remote was updated or to undo the operation with another ``muse remote set-url``. Diagnostics go to stderr; ``--json`` emits :class:`_RemoteMutationJson` to stdout. Exit codes: 0 URL updated in ``.muse/config.toml``. 1 Invalid name, invalid/oversized URL, or remote does not exist. 2 Not inside a Muse repository. """ name: str = args.name url: str = args.url.strip() # strip whitespace — pasted URLs often have trailing newlines json_output: bool = args.json_output if err := _validate_remote_name(name): print(f"❌ Invalid remote name '{sanitize_display(name)}': {err}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) if len(url) > _MAX_URL_LEN: print( f"❌ URL is too long ({len(url)} chars); maximum is {_MAX_URL_LEN}.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) if err := _validate_url_scheme(url): print(f"❌ {err}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) root = require_repo() old_url = get_remote(name, root) if old_url is None: print(f"❌ Remote '{sanitize_display(name)}' does not exist.", file=sys.stderr) print( f" Use 'muse remote add {sanitize_display(name)} ' to create it.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) set_remote(name, url, root) if json_output: result: _RemoteMutationJson = { "status": "ok", "name": name, "url": url, "old_url": old_url, "old_name": None, "new_name": None, } print(json.dumps(result)) else: print( f"✅ Remote '{sanitize_display(name)}' URL updated: {sanitize_display(url)}", file=sys.stderr, ) # ── _ping_url ───────────────────────────────────────────────────────────────── def _ping_url(base_url: str, timeout: float) -> tuple[bool, int | None, str]: """Ping ``/health``. Returns ``(reachable, http_status, message)``. Only ``http`` and ``https`` schemes are accepted — any other scheme is treated as unreachable without making a network request to prevent SSRF via ``file://`` or ``ftp://`` URLs stored in config. """ scheme = urlparse(base_url).scheme.lower() if scheme not in _ALLOWED_URL_SCHEMES: return False, None, f"Unsupported URL scheme '{sanitize_display(scheme)}'" health_url = base_url.rstrip("/") + "/health" try: req = urllib.request.Request(health_url, method="GET") with urllib.request.urlopen(req, timeout=timeout) as resp: # noqa: S310 return True, resp.status, f"HTTP {resp.status} OK" except urllib.error.HTTPError as exc: return False, exc.code, f"HTTP {exc.code} {exc.reason}" except urllib.error.URLError as exc: return False, None, str(exc.reason) except TimeoutError: return False, None, f"timed out after {timeout}s" except OSError as exc: return False, None, str(exc) # ── status ──────────────────────────────────────────────────────────────────── def run_status(args: argparse.Namespace) -> None: """Check reachability and last-known tracking refs for a remote. This command is **read-only** — it does not fetch, write, or modify any local state. Use it to inspect a remote before running ``muse push`` or ``muse fetch``:: muse remote status origin muse remote status origin --json muse remote status origin --json --timeout 10 Validates the remote name format before any repo or filesystem access so invalid-looking names produce a clear format error rather than a misleading "does not exist" message. Pings ``/health`` (derived from the stored URL) and reports locally cached tracking data from previous fetch/push operations. The tracking data is only as current as the last ``muse fetch``. Tracking refs are collected recursively so that nested branch names like ``feat/ui`` (stored as ``remotes/origin/feat/ui``) are shown correctly. Symlinks inside the remotes directory are skipped to prevent path traversal. JSON output (``--json``) goes to stdout; all human-readable text goes to stderr. Exit codes: 0 Remote reachable and status printed. 1 Invalid remote name, or remote does not exist. 2 Not inside a Muse repository. 5 Remote is unreachable (network error, timeout, or non-2xx response). """ name: str = args.name json_output: bool = args.json_output timeout: float = args.timeout if err := _validate_remote_name(name): print(f"❌ Invalid remote name '{sanitize_display(name)}': {err}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) root = require_repo() url = get_remote(name, root) if url is None: print(f"❌ Remote '{sanitize_display(name)}' does not exist.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) # Extract the server root URL: http://host[:port]/owner/repo → http://host[:port] parsed = urlparse(url) server_root = f"{parsed.scheme}://{parsed.netloc}" reachable, http_status, message = _ping_url(server_root, timeout) # Recursively collect tracking refs — handles nested branch names like # "feat/ui" and skips symlinks (path-traversal guard). remotes_dir = root / ".muse" / "remotes" / name tracked_refs = _collect_tracked_refs(remotes_dir) if json_output: out: _RemoteStatusJson = { "remote": name, "url": url, "server_root": server_root, "reachable": reachable, "http_status": http_status, "message": message, "tracked_refs": tracked_refs, } print(json.dumps(out)) else: status_icon = "✅" if reachable else "❌" print(f"\n Remote: {sanitize_display(name)}", file=sys.stderr) print(f" URL: {sanitize_display(url)}", file=sys.stderr) print(f" Server: {sanitize_display(server_root)}", file=sys.stderr) print(f" Ping: {status_icon} {sanitize_display(message)}", file=sys.stderr) if tracked_refs: print(" Tracked refs (from last fetch/push):", file=sys.stderr) for branch, sha in sorted(tracked_refs.items()): print( f" {sanitize_display(name)}/{sanitize_display(branch):<30} {sha}", file=sys.stderr, ) else: print(" Tracked refs: (none — run 'muse fetch' first)", file=sys.stderr) print("", file=sys.stderr) if not reachable: raise SystemExit(ExitCode.REMOTE_ERROR)