"""muse auth — identity management. Muse has two primary user types: **humans** and **agents**. Both are first-class identities authenticated via Ed25519 key-pair challenge-response. This command manages the identity lifecycle: keygen, register, whoami, logout. Why not ``muse config set`` for credentials? --------------------------------------------- Credentials belong to the machine, not the repository. Storing credentials inside ``.muse/config.toml`` means they could be committed to version control, shared across repos accidentally, or tied to a single repo when the identity is global. Instead: - Credentials live in ``~/.muse/identity.toml`` (mode 0o600, never read by the snapshot engine). - ``config.toml`` records *where* the hub is (``[hub] url``), not *who you are*. - This command owns the identity lifecycle: keygen, register, whoami, logout. Authentication flow -------------------- :: # Step 1: generate an Ed25519 key pair (private key stored in ~/.muse/keys/) muse auth keygen --hub https://musehub.ai # Step 2: register the public key with the hub via challenge-response muse auth register --hub https://musehub.ai --handle alice # Inspect stored identity: muse auth whoami Security model -------------- - ``_json_post`` validates the URL scheme (``http``/``https`` only) before making any network request — prevents SSRF from a tampered hub URL. - All diagnostic messages (progress, warnings, errors) go to **stderr**. **stdout** is reserved for machine-readable output and the interactive prompt (get-url returns a bare URL; --json returns a JSON object). Subcommands ----------- :: muse auth keygen [--hub HUB] [--label LABEL] [--force] [--json] muse auth register [--hub HUB] [--handle HANDLE] [--label LABEL] [--agent] [--json] muse auth whoami [--hub HUB] [--all] [--json] muse auth logout [--hub HUB] [--all] [--json] JSON schemas ------------ ``muse auth keygen --json``:: {"status": "ok", "hub": "", "hostname": "", "public_key_b64": "", "fingerprint": "sha256:<64-hex>", "hd_path": ""} ``muse auth register --json``:: {"status": "registered"|"authenticated", "hub": "", "handle": "", "identity_type": "human"|"agent", "fingerprint": "sha256:<64-hex>", "identity_path": ""} ``muse auth whoami --json``:: {"hub": "", "type": "", "handle": "", "key_set": true, "capabilities": []} ``muse auth logout --json``:: {"status": "ok"|"nothing_to_do", "hubs": ["", ...], "count": } """ import argparse import json import logging import os import pathlib import sys import urllib.error import urllib.parse import urllib.request from typing import TypedDict from muse.cli.config import get_hub_url from muse.core.types import DEFAULT_SIGN_ALGO, blob_id from muse.core.envelope import EnvelopeJson, make_envelope from muse.core.errors import ExitCode from muse.core.timing import start_timer from muse.core.identity import ( IdentityEntry, clear_all_identities, clear_identity, get_identity_path, hostname_from_url, list_all_identities, load_identity, save_identity, ) from muse.core.validation import sanitize_display logger = logging.getLogger(__name__) type _DerivedPaths = dict[str, str] # Module-level hooks so tests can monkeypatch without importing getpass at module load. def _isatty() -> bool: return sys.stdin.isatty() def _stderr_isatty() -> bool: return sys.stderr.isatty() def _getpass(prompt: str = "") -> str: import getpass as _gp return _gp.getpass(prompt) def _read_mnemonic_securely(fd: int | None = None) -> str: """Read a BIP39 mnemonic without exposing it in process args or shell history. Input is accepted through three channels in priority order: 1. *fd* (``--mnemonic-fd N``) — read one line from file descriptor *N*, close it immediately. Used by orchestrators that pass secrets via pipe. 2. Non-TTY stdin — read one line from ``sys.stdin``. Triggered by piped input (``echo "..." | muse auth recover``) or heredoc redirects. 3. TTY stdin — prompt via :func:`getpass.getpass` with echo disabled. The prompt and the phrase itself are never written to the terminal buffer that shell history reads. Args: fd: File descriptor number to read from, or ``None`` to use stdin / TTY. Returns: The stripped mnemonic phrase. Raises: SystemExit(1): If the fd is invalid, unreadable, or the input is empty. """ phrase: str if fd is not None: if fd < 3: print( f"❌ --mnemonic-fd {fd} is invalid: " "fd 0 (stdin), 1 (stdout), and 2 (stderr) are reserved and must not be used.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) try: with os.fdopen(fd, "r", encoding="utf-8") as fh: phrase = fh.readline().strip() except OSError as exc: print(f"muse auth: cannot read from fd {fd}: {exc}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) elif not sys.stdin.isatty(): phrase = sys.stdin.readline().strip() else: import getpass try: phrase = getpass.getpass("Enter BIP39 mnemonic: ").strip() except (KeyboardInterrupt, EOFError): print("", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) if not phrase: print("muse auth: mnemonic input was empty.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) return phrase def _resolve_passphrase(args: "argparse.Namespace") -> str: """Return the BIP-39 passphrase using only safe delivery channels. Priority (highest to lowest): 1. ``--passphrase-fd N`` — read from pipe fd; never visible in ``ps``/procfs. 2. ``MUSE_BIP39_PASSPHRASE`` env var — visible to process owner in ``/proc/pid/environ`` but not world-readable. 3. Interactive ``getpass`` prompt — TTY only, no echo. 4. Empty string — standard BIP-39 behaviour (no passphrase). The passphrase is never stored — callers must supply it at every derivation. """ # 1. --passphrase-fd passphrase_fd: int | None = getattr(args, "passphrase_fd", None) if passphrase_fd is not None: if passphrase_fd < 3: print( f"❌ --passphrase-fd {passphrase_fd} is invalid: " "fd 0 (stdin), 1 (stdout), and 2 (stderr) are reserved and must not be used.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) try: raw = os.read(passphrase_fd, 4096) os.close(passphrase_fd) except OSError as exc: print(f"muse auth: cannot read passphrase from fd {passphrase_fd}: {exc}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) return raw.decode("utf-8").rstrip("\n") # 2. env var env_val = os.environ.get("MUSE_BIP39_PASSPHRASE") if env_val is not None: logger.warning( "MUSE_BIP39_PASSPHRASE is set: passphrase is visible to the process owner " "in /proc/pid/environ. Prefer --passphrase-fd N (pipe fd) for production use — " "it never appears in the process environment." ) return env_val # 3. interactive TTY prompt if _isatty(): try: return _getpass("BIP-39 passphrase (leave blank for none): ") except (KeyboardInterrupt, EOFError): print("", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) # 4. empty — non-interactive, no env var, no fd return "" # Auth endpoints on the hub (relative to the hub base URL). _CHALLENGE_PATH = "/api/auth/challenge" _VERIFY_PATH = "/api/auth/verify" # Hard cap on response size to prevent OOM from a compromised hub. _MAX_RESPONSE_BYTES = 1 * 1024 * 1024 # 1 MiB # Only allow http and https — no file://, ftp://, data://, etc. _ALLOWED_SCHEMES = frozenset({"http", "https"}) # ── TypedDicts ──────────────────────────────────────────────────────────────── class _KeygenJson(EnvelopeJson, total=False): """JSON schema for ``muse auth keygen --json`` (human key).""" status: str # "ok" hub: str # hub URL hostname: str # extracted hostname public_key_b64: str # base64url-encoded public key fingerprint: str # sha256:<64-hex> fingerprint hd_path: str # SLIP-0010 derivation path mnemonic_word_count: int # number of BIP39 mnemonic words class _AgentKeygenJson(EnvelopeJson): """JSON schema for ``muse auth keygen --agent-id --json``.""" status: str hub: str hostname: str agent_id: str public_key_b64: str fingerprint: str hd_path: str slot: int provisioned_by_fingerprint: str class _RegisterJson(EnvelopeJson): """JSON schema for ``muse auth register --json``.""" status: str # "registered" | "authenticated" hub: str # hub URL handle: str # registered username identity_id: str # hub-assigned ID identity_type: str # "human" | "agent" fingerprint: str # SHA-256 hex fingerprint of the public key token_stored: bool # always true on success identity_path: str # path to ~/.muse/identity.toml class _WhoamiJson(TypedDict, total=False): """JSON schema for a single identity entry (used in list output).""" hub: str # hostname key type: str # "human" | "agent" | "" handle: str # registered handle fingerprint: str # SHA-256 hex of the public key key_set: bool # true if an Ed25519 key is stored capabilities: list[str] provisioned_by: str # agent only: handle of the human who provisioned this key hd_path: str # HD keys only: SLIP-0010 derivation path class _WhoamiSingleJson(EnvelopeJson, total=False): """JSON schema for ``muse auth whoami --json`` (single-hub output).""" hub: str type: str handle: str fingerprint: str key_set: bool capabilities: list[str] provisioned_by: str hd_path: str class _WhoamiAllJson(EnvelopeJson): """JSON schema for ``muse auth whoami --json`` (multi-hub listing).""" identities: list[_WhoamiJson] class _ShowJson(TypedDict, total=False): """JSON schema for a single identity detail entry (used in show output).""" hub: str handle: str type: str fingerprint: str algorithm: str # "ed25519" | "ml-dsa-65" provisioned_by: str # agent only: handle of provisioning human provisioned_by_fingerprint: str # agent only: fingerprint of provisioning key hd_path: str mnemonic_word_count: int derived_paths: _DerivedPaths avax_c_chain_address: str class _ShowOutputJson(EnvelopeJson, total=False): """JSON schema for ``muse auth show --json``.""" hub: str handle: str type: str fingerprint: str algorithm: str provisioned_by: str provisioned_by_fingerprint: str hd_path: str mnemonic_word_count: int derived_paths: _DerivedPaths avax_c_chain_address: str class _LogoutJson(EnvelopeJson): """JSON schema for ``muse auth logout --json``.""" status: str # "ok" | "nothing_to_do" hubs: list[str] # hostnames logged out from count: int # number of identities removed class _RecoverJson(EnvelopeJson, total=False): """JSON schema for ``muse auth recover --json``.""" status: str hub: str hostname: str public_key_b64: str fingerprint: str hd_path: str agent_id: str class _RotateJson(EnvelopeJson): """JSON schema for ``muse auth rotate --json``.""" status: str hub: str hostname: str fingerprint: str hd_path: str rotation_index: int class _CleanupKeysJson(EnvelopeJson): """JSON schema for ``muse auth cleanup-keys --json``.""" destroyed: list[str] # absolute paths of PEM files overwritten and deleted count: int class _SecurityCheckJson(EnvelopeJson): """JSON schema for ``muse auth security-check --json``.""" mnemonic_in_keychain: bool no_pem_files: bool no_key_path_in_identity: bool fingerprint_matches_mnemonic: bool pem_files_found: list[str] key_path_entries: list[str] ok: bool class _JsonPayload(TypedDict, total=False): """Generic JSON request payload for hub auth endpoints (all fields optional str).""" fingerprint: str algorithm: str challenge_token: str public_key_b64: str signature_b64: str handle: str label: str class _ChallengeResp(TypedDict, total=False): """Parsed response from the hub challenge endpoint (snake_case canonical).""" challenge_token: str is_new_key: bool expires_in: int algorithm: str class _VerifyResp(TypedDict, total=False): """Parsed response from the hub verify/add-key endpoint (snake_case canonical).""" handle: str identity_id: str is_new_identity: bool auth_method: str # ── HTTP helpers ────────────────────────────────────────────────────────────── def _hub_base_url(hub_url: str) -> str: """Extract the base URL (scheme + host + port) from a full hub URL. Examples:: "https://musehub.ai/gabriel/muse" → "https://musehub.ai" "https://localhost:1337" → "https://localhost:1337" Raises: SystemExit: If the URL scheme is not ``http`` or ``https``. """ parsed = urllib.parse.urlparse(hub_url) if parsed.scheme.lower() not in _ALLOWED_SCHEMES: print( f"❌ Hub URL scheme '{sanitize_display(parsed.scheme)}' is not allowed. " f"Use http or https.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) port_str = f":{parsed.port}" if parsed.port else "" return f"{parsed.scheme}://{parsed.hostname}{port_str}" def _json_post_raw( base_url: str, path: str, payload: _JsonPayload, extra_headers: "dict[str, str] | None" = None, ) -> _JsonPayload: """POST *payload* as JSON and return the raw parsed response dict. Private implementation — call :func:`_post_challenge` or :func:`_post_verify` instead. Validates the URL scheme before any network I/O to prevent SSRF. Args: base_url: Hub base URL (e.g. ``"https://musehub.ai"``). path: Endpoint path (e.g. ``"/api/auth/challenge"``). payload: Dict to serialise as the JSON body (``None`` values omitted). extra_headers: Optional additional HTTP headers (e.g. ``Authorization`` for MSign-authenticated endpoints). Returns: Parsed JSON response body as a dict. Raises: SystemExit: On invalid URL scheme, HTTP error, or network failure. """ scheme = urllib.parse.urlparse(base_url).scheme.lower() if scheme not in _ALLOWED_SCHEMES: print( f"❌ Hub URL scheme '{sanitize_display(scheme)}' is not allowed. " f"Use http or https.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) url = f"{base_url.rstrip('/')}{path}" clean_payload = {k: v for k, v in payload.items() if v is not None} body_bytes = json.dumps(clean_payload).encode("utf-8") headers: dict[str, str] = { "Content-Type": "application/json", "Accept": "application/json", } if extra_headers: headers.update(extra_headers) req = urllib.request.Request( url=url, data=body_bytes, headers=headers, method="POST", ) # For localhost HTTPS use the bundled self-signed CA cert so that # muse auth register works against the local dev hub without requiring # the cert to be installed system-wide. ssl_ctx: "ssl.SSLContext | None" = None parsed_url = urllib.parse.urlparse(base_url) if parsed_url.scheme == "https" and parsed_url.hostname in ("localhost", "127.0.0.1"): import ssl as _ssl from muse.core.transport import _mkcert_ca ca = _mkcert_ca() if ca is not None: ssl_ctx = _ssl.create_default_context(cafile=str(ca)) try: with urllib.request.urlopen(req, timeout=30, context=ssl_ctx) as resp: # noqa: S310 raw: bytes = resp.read(_MAX_RESPONSE_BYTES + 1) except urllib.error.HTTPError as exc: try: err_body = exc.read().decode("utf-8", errors="replace")[:400] except Exception: # noqa: BLE001 err_body = "" print( f"❌ HTTP {exc.code} from {sanitize_display(url)}: {sanitize_display(err_body)}", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) from exc except urllib.error.URLError as exc: print( f"❌ Network error contacting {sanitize_display(url)}: " f"{sanitize_display(str(exc.reason))}", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) from exc if len(raw) > _MAX_RESPONSE_BYTES: print(f"❌ Response from {sanitize_display(url)} exceeds size limit.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) parsed = json.loads(raw) if not isinstance(parsed, dict): print(f"❌ Unexpected response shape from {sanitize_display(url)}.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) return parsed def _hub_delete(url: str, auth_header: str, ssl_ctx: "object | None" = None) -> None: """Send a signed DELETE request to a hub endpoint. Raises ``urllib.error.HTTPError`` / ``urllib.error.URLError`` on failure. Extracted so tests can spy on ``muse.cli.commands.auth._hub_delete`` instead of reaching into ``urllib.request``. """ req = urllib.request.Request( url, headers={"Authorization": auth_header, "Accept": "application/json"}, method="DELETE", ) with urllib.request.urlopen(req, timeout=10, context=ssl_ctx) as _resp: # noqa: S310 pass def _post_challenge(base_url: str, payload: _JsonPayload) -> _ChallengeResp: """POST to the challenge endpoint and return a typed response.""" raw = _json_post_raw(base_url, _CHALLENGE_PATH, payload) return _ChallengeResp( challenge_token=str(raw.get("challenge_token") or ""), is_new_key=bool(raw.get("is_new_key")), expires_in=int(raw.get("expires_in") or 300), algorithm=str(raw.get("algorithm") or ""), ) def _post_verify(base_url: str, payload: _JsonPayload) -> _VerifyResp: """POST to the verify endpoint and return a typed response.""" raw = _json_post_raw(base_url, _VERIFY_PATH, payload) return _VerifyResp( handle=str(raw.get("handle") or ""), identity_id=str(raw.get("identity_id") or ""), is_new_identity=bool(raw.get("is_new_identity")), auth_method=str(raw.get("auth_method") or ""), ) # ── Helpers ─────────────────────────────────────────────────────────────────── def _resolve_hub(hub_opt: str | None, repo_root: pathlib.Path | None = None) -> str | None: """Return the hub URL: explicit option → repo config → None.""" if hub_opt: return hub_opt return get_hub_url(repo_root) def _display_entry(hostname: str, entry: IdentityEntry, *, json_output: bool, elapsed: "Callable[[], float] | None" = None) -> None: """Print an identity entry. JSON → stdout; human-readable → stderr.""" from muse.core.timing import start_timer if elapsed is None: elapsed = start_timer() itype = entry.get("type") or "" handle = entry.get("handle") or "" fingerprint = entry.get("fingerprint") or "" caps = list(entry.get("capabilities") or []) provisioned_by = entry.get("provisioned_by") or "" hd_path = entry.get("hd_path") or "" if json_output: out: _WhoamiSingleJson = _WhoamiSingleJson( **make_envelope(elapsed), hub=hostname, type=itype, handle=handle, fingerprint=fingerprint, key_set=bool(hd_path), capabilities=caps, ) if provisioned_by: out["provisioned_by"] = provisioned_by if hd_path: out["hd_path"] = hd_path print(json.dumps(out)) else: print("", file=sys.stderr) print(f" Hub: {sanitize_display(hostname)}", file=sys.stderr) print(f" Type: {itype or 'unknown'}", file=sys.stderr) print(f" Handle: {sanitize_display(handle) or '—'}", file=sys.stderr) print(f" Fingerprint: {fingerprint or '—'}", file=sys.stderr) print(f" Key: {'keychain' if hd_path else 'not set — run muse auth keygen'}", file=sys.stderr) if hd_path: print(f" HD path: {hd_path}", file=sys.stderr) if caps: print(f" Caps: {' '.join(caps)}", file=sys.stderr) print("", file=sys.stderr) # ── register ────────────────────────────────────────────────────────────────── def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None: """Register the ``muse auth`` subcommand tree and all its flags. Every subcommand accepts ``--json`` for machine-readable output on stdout. All progress and diagnostic messages go to stderr. """ parser = subparsers.add_parser( "auth", help="Identity management.", description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) subs = parser.add_subparsers(dest="subcommand", metavar="SUBCOMMAND") subs.required = True # ── keygen ─────────────────────────────────────────────────────────────── keygen_p = subs.add_parser( "keygen", help="Generate a new Ed25519 keypair for public-key authentication.", description=( "Generates a fresh Ed25519 keypair and stores the private key at\n" "~/.muse/keys/{hostname}.pem (mode 0o600).\n" "The public key fingerprint is printed to stderr for verification.\n" "Run 'muse auth register' afterward to register the key with the hub." ), formatter_class=argparse.RawDescriptionHelpFormatter, ) keygen_p.add_argument("--hub", default=None, metavar="URL", help="Hub URL (e.g. https://musehub.ai). Falls back to [hub] url in config.toml.") keygen_p.add_argument("--agent-id", default=None, metavar="AGENT_ID", dest="agent_id", help=( "Generate a dedicated keypair for this agent handle " "(stored at ~/.muse/keys/{hostname}__{agent_id}.pem). " "Omit for the human (operator) key." )) keygen_p.add_argument("--label", default=None, metavar="LABEL", help='Friendly key label, e.g. "MacBook Pro" or "CI agent". Stored locally.') keygen_p.add_argument("--force", action="store_true", help="Overwrite an existing identity entry for this hub without prompting.") keygen_p.add_argument("--destroy-mnemonic", action="store_true", dest="destroy_mnemonic", help=( "Generate fresh entropy and overwrite the existing keychain mnemonic. " "IRREVERSIBLE: all previously derived keys become unrecoverable. " "Requires --force." )) keygen_p.add_argument("--json", "-j", action="store_true", dest="json_out", default=False, help="Emit a JSON object to stdout on success.") keygen_p.add_argument( "--strength", type=int, default=256, dest="hd_strength", metavar="BITS", help=( "BIP39 entropy strength in bits: 128 (12 words), 160 (15), " "192 (18), 224 (21), or 256 (24 words, default)." ), ) keygen_p.add_argument( "--language", default="english", dest="hd_language", metavar="LANG", help=( "BIP39 wordlist language for the generated mnemonic " "(e.g. english, spanish, japanese). Default: english." ), ) keygen_p.add_argument( "--passphrase-fd", type=int, default=None, dest="passphrase_fd", metavar="N", help=( "Read BIP-39 extension passphrase ('25th word') from file descriptor N. " "Never stored; supply at every derivation. " "Falls back to MUSE_BIP39_PASSPHRASE env var, then interactive prompt on a TTY." ), ) keygen_p.set_defaults(func=run_keygen) # ── recover ────────────────────────────────────────────────────────────── recover_p = subs.add_parser( "recover", help="Re-derive and overwrite key(s) from a BIP39 mnemonic.", description=( "Re-derives the Ed25519 identity key from a BIP39 mnemonic and\n" "overwrites the local PEM file. Use this to restore keys after\n" "losing the PEM file (e.g. new machine, disk failure).\n\n" "The resulting fingerprint will match the original registration\n" "exactly — no re-registration is needed." ), formatter_class=argparse.RawDescriptionHelpFormatter, ) recover_p.add_argument("--hub", default=None, metavar="URL", help="Hub URL. Falls back to [hub] url in config.toml.") recover_p.add_argument("--mnemonic-fd", type=int, default=None, metavar="N", dest="mnemonic_fd", help="Read BIP39 mnemonic from file descriptor N (for scripted use). " "If omitted: reads from stdin if piped, or prompts interactively.") recover_p.add_argument("--agent-id", default=None, metavar="AGENT_ID", dest="agent_id", help="Recover an agent key (derives from the operator mnemonic at the agent's slot).") recover_p.add_argument("--force", action="store_true", help="Overwrite an existing PEM without prompting.") recover_p.add_argument("--json", "-j", action="store_true", dest="json_out", default=False, help="Emit a JSON object to stdout on success.") recover_p.add_argument( "--passphrase-fd", type=int, default=None, dest="passphrase_fd", metavar="N", help=( "Read BIP-39 extension passphrase from file descriptor N. " "Must match the one used at keygen. " "Falls back to MUSE_BIP39_PASSPHRASE env var, then interactive prompt on a TTY." ), ) recover_p.set_defaults(func=run_recover) # ── rotate ─────────────────────────────────────────────────────────────── rotate_p = subs.add_parser( "rotate", help="Rotate the identity key to the next HD derivation index.", description=( "Derives a new Ed25519 identity key at rotation_index + 1,\n" "overwrites the local PEM, and updates identity.toml.\n\n" "After rotation, re-register the new fingerprint with the hub:\n" " muse auth register --hub --handle " ), formatter_class=argparse.RawDescriptionHelpFormatter, ) rotate_p.add_argument("--hub", default=None, metavar="URL", help="Hub URL. Falls back to [hub] url in config.toml.") rotate_p.add_argument("--mnemonic-fd", type=int, default=None, metavar="N", dest="mnemonic_fd", help="Read BIP39 mnemonic from file descriptor N. " "If omitted: reads from stdin if piped, else prompts.") rotate_p.add_argument( "--passphrase-fd", type=int, default=None, dest="passphrase_fd", metavar="N", help="Read BIP-39 extension passphrase from file descriptor N. " "Must match the one used at keygen. " "Falls back to MUSE_BIP39_PASSPHRASE env var, then interactive prompt on a TTY.", ) rotate_p.add_argument("--json", "-j", action="store_true", dest="json_out", default=False, help="Emit a JSON object to stdout on success.") rotate_p.set_defaults(func=run_rotate) # ── register ───────────────────────────────────────────────────────────── register_p = subs.add_parser( "register", help="Register the local Ed25519 key with a MuseHub instance (challenge-response).", description=( "Performs the Ed25519 challenge-response flow with the hub:\n" " 1. POST /api/auth/challenge (fingerprint → nonce)\n" " 2. Sign the nonce with the local private key\n" " 3. POST /api/auth/verify (signed nonce → session token)\n\n" "The resulting session token is saved to ~/.muse/identity.toml (0o600).\n" "Use this command for initial registration AND to refresh an expired token." ), formatter_class=argparse.RawDescriptionHelpFormatter, ) register_p.add_argument("--hub", default=None, metavar="URL", help="Hub URL (e.g. https://musehub.ai). Falls back to [hub] url in config.toml.") register_p.add_argument("--handle", default=None, metavar="HANDLE", help=( "Desired username (required for first-time registration; " "ignored if the key is already registered)." )) register_p.add_argument("--label", default=None, metavar="LABEL", help='Friendly key label, e.g. "MacBook Pro" or "CI agent".') register_p.add_argument("--agent", action="store_true", help="Mark this identity as an agent (default: human).") register_p.add_argument("--agent-id", default=None, metavar="AGENT_ID", dest="agent_id", help=( "Agent handle whose dedicated key should be registered. " "Loads ~/.muse/keys/{hostname}__{agent_id}.pem and stores " "the identity under the compound key 'hostname#agent_id'. " "Implies --agent." )) register_p.add_argument("--provisioned-by", default=None, metavar="HANDLE", dest="provisioned_by", help=( "Handle of the human who is provisioning this agent key. " "Recorded in identity.toml as the trust-chain root. " "Required when --agent-id is used." )) register_p.add_argument("--json", "-j", action="store_true", dest="json_out", default=False, help="Emit a JSON object to stdout on success.") register_p.set_defaults(func=run_register) # ── whoami ──────────────────────────────────────────────────────────────── whoami_p = subs.add_parser( "whoami", help="Show the current identity stored in ~/.muse/identity.toml.", description=( "Print the identity stored in ~/.muse/identity.toml for a hub.\n" "Exits non-zero when no identity is stored — useful for agent branching:\n\n" " muse auth whoami --json || muse auth register --hub --handle --agent\n\n" "With --all --json, emits a single JSON array (one object per hub):\n\n" " muse auth whoami --all --json | jq '.[] | select(.type == \"agent\")'" ), formatter_class=argparse.RawDescriptionHelpFormatter, ) whoami_p.add_argument("--hub", default=None, metavar="URL", help="Hub URL to inspect. Defaults to the repo's configured hub.") whoami_p.add_argument("--all", "-a", action="store_true", dest="all_hubs", help="Show identities for all configured hubs.") whoami_p.add_argument( "--type", default=None, metavar="TYPE", dest="identity_type", help=( 'Filter by identity type: "human" or "agent". ' 'Only valid with --all. ' 'Exits non-zero when no matching identity is found.' ), ) whoami_p.add_argument("--json", "-j", action="store_true", dest="json_out", help="Emit JSON to stdout. With --all, emits a JSON array.") whoami_p.set_defaults(func=run_whoami) # ── logout ──────────────────────────────────────────────────────────────── logout_p = subs.add_parser( "logout", help="Remove stored credentials for a hub.", description=( "Remove the signing identity stored in ~/.muse/identity.toml for a hub.\n" "Operation is idempotent — logging out when no identity is stored exits 0.\n\n" "Agent quickstart:\n" " muse auth logout --json # single hub, JSON result\n" " muse auth logout --all --json # remove all hubs, sorted list\n\n" "JSON output shape (stdout):\n" " {\"status\": \"ok\" | \"nothing_to_do\",\n" " \"hub\": \"\", # single-hub only\n" " \"hubs\": [\"\", ...], # --all only (sorted)\n" " \"count\": } # --all only\n\n" "Exit codes: 0 success (incl. nothing to do), 1 bad arguments, 3 internal error." ), formatter_class=argparse.RawDescriptionHelpFormatter, ) logout_p.add_argument("--hub", default=None, metavar="URL", help="Hub URL to log out from. Defaults to the repo's configured hub.") logout_p.add_argument("--all", "-a", action="store_true", dest="all_hubs", help="Remove credentials for ALL configured hubs.") logout_p.add_argument("--json", "-j", action="store_true", dest="json_out", default=False, help="Emit a JSON object to stdout on completion.") logout_p.set_defaults(func=run_logout) # ── show ────────────────────────────────────────────────────────────────── show_p = subs.add_parser( "show", help="Display full identity details including HD derivation paths and AVAX address.", description=( "Print detailed identity information stored in ~/.muse/identity.toml.\n" "For HD identities this includes:\n" " - Mnemonic word count\n" " - All six-level Muse derivation paths (MSign, MPay, AVAX …)\n" " - AVAX C-Chain address derived from the BIP39 mnemonic\n\n" "JSON output (--json):\n" " {\"hub\": \"\", \"handle\": \"\",\n" " \"mnemonic_word_count\": 12,\n" " \"derived_paths\": {\"identity_msign\": \"m/…\", …},\n" " \"avax_c_chain_address\": \"0x…\"}" ), formatter_class=argparse.RawDescriptionHelpFormatter, ) show_p.add_argument("--hub", default=None, metavar="URL", help="Hub URL to inspect. Defaults to the repo's configured hub.") show_p.add_argument("--json", "-j", action="store_true", dest="json_out", default=False, help="Emit a JSON object to stdout.") show_p.set_defaults(func=run_show) # ── cleanup-keys ────────────────────────────────────────────────────────── cleanup_p = subs.add_parser( "cleanup-keys", help="Securely overwrite and delete stale PEM files from ~/.muse/keys/.", description=( "Overwrite every *.pem file in ~/.muse/keys/ with random bytes, then\n" "delete it. Keys are now derived from the mnemonic in the OS keychain\n" "at sign time — PEM files are vestigial and represent unprotected key\n" "material. This command removes them permanently.\n\n" "WARNING: ensure your mnemonic is safely stored in the OS keychain\n" "(run `muse auth security-check`) before running this command.\n\n" "JSON output (--json):\n" " {\"destroyed\": [\"\", ...], \"count\": }\n\n" "Exit codes: 0 success, 3 I/O error." ), formatter_class=argparse.RawDescriptionHelpFormatter, ) cleanup_p.add_argument("--json", "-j", action="store_true", dest="json_out", default=False, help="Emit a JSON object to stdout.") cleanup_p.set_defaults(func=run_cleanup_keys) # ── security-check ──────────────────────────────────────────────────────── seccheck_p = subs.add_parser( "security-check", help="Verify the local identity is in a clean, PEM-free state.", description=( "Run a set of security invariant checks against the local identity:\n" " 1. Mnemonic is present in the OS keychain\n" " 2. No PEM files exist in ~/.muse/keys/\n" " 3. identity.toml has no key_path fields\n" " 4. Fingerprint in identity.toml matches mnemonic derivation\n\n" "Exits 0 when all checks pass. Exits 1 if any check fails.\n\n" "JSON output (--json):\n" " {\"ok\": true|false, \"mnemonic_in_keychain\": true|false, ...}" ), formatter_class=argparse.RawDescriptionHelpFormatter, ) seccheck_p.add_argument("--hub", default=None, metavar="URL", help="Hub to check fingerprint against. Defaults to the repo's configured hub.") seccheck_p.add_argument("--json", "-j", action="store_true", dest="json_out", default=False, help="Emit a JSON object to stdout.") seccheck_p.set_defaults(func=run_security_check) # ── keygen ──────────────────────────────────────────────────────────────────── def run_keygen(args: argparse.Namespace) -> None: """Generate a new Ed25519 keypair for this hub (or for a specific agent). All keys are derived from a BIP39 mnemonic via SLIP-0010 Ed25519 HD derivation (no JBOK mode). Human keygen produces a fresh 24-word mnemonic and prints it **once** to stderr. Agent keygen (``--agent-id``) derives a sub-seed from the operator's existing mnemonic — no new mnemonic is generated. Agent quickstart ---------------- :: muse auth keygen --hub https://localhost:1337 --json muse auth keygen --hub https://localhost:1337 --agent-id mybot --json JSON fields ----------- status ``"ok"`` on success. hub Hub URL used. hostname Hostname extracted from the hub URL. public_key_b64 Base64url-encoded 32-byte Ed25519 public key. fingerprint ``sha256:<64-hex>`` fingerprint of the public key. hd_path SLIP-0010 derivation path string. mnemonic_word_count Number of BIP39 mnemonic words (human keys only). Exit codes ---------- 0 Keypair generated successfully. 1 No hub URL, existing key without ``--force``, derivation error. """ elapsed = start_timer() from muse.core.keypair import derive_hd_public_info from muse.core.bip39 import ( generate_mnemonic, mnemonic_to_seed, word_count, Bip39Error, _WORDS_FOR_STRENGTH, SUPPORTED_LANGUAGES, ) from muse.core.hdkeys import ( muse_path, agent_id_to_slot, derive_agent_sub_seed, DOMAIN_IDENTITY, ENTITY_HUMAN, ENTITY_AGENT, ROLE_SIGN, ) hub: str | None = args.hub agent_id: str | None = getattr(args, "agent_id", None) label: str | None = args.label force: bool = args.force destroy_mnemonic: bool = getattr(args, "destroy_mnemonic", False) json_out: bool = args.json_out hd_strength: int = getattr(args, "hd_strength", 256) hd_language: str = getattr(args, "hd_language", "english") passphrase: str = _resolve_passphrase(args) hub_url = _resolve_hub(hub) if hub_url is None: print( "❌ No hub URL provided.\n" " Pass --hub , or first run: muse hub connect ", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) hub_url = _hub_base_url(hub_url) hostname = hostname_from_url(hub_url) # Guard: reject if an identity already exists for this hub (human or agent) unless --force. _existing_entry = load_identity(hub_url, agent_id=agent_id) if _existing_entry is not None and not force: subject = f"{hostname}/{agent_id}" if agent_id else hostname print( f"❌ An identity already exists for {subject}.\n" " Pass --force to overwrite it.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) if agent_id: # ── Agent key: derived from operator's mnemonic ─────────────────────── operator_entry = load_identity(hub_url) if operator_entry is None: print( "❌ No operator identity found for this hub.\n" f" Run 'muse auth keygen --hub {hub_url}' first to establish the operator key.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) operator_mnemonic = operator_entry.get("mnemonic") if not operator_mnemonic: print( "❌ Operator mnemonic is not available.\n" " Agent keys are derived from the operator's BIP39 mnemonic.\n" " Ensure the mnemonic is stored in the OS keychain, or re-key with:\n" f" muse auth keygen --hub {hub_url} --force", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) operator_seed = mnemonic_to_seed(operator_mnemonic, passphrase) slot = agent_id_to_slot(agent_id) agent_sub_seed = derive_agent_sub_seed(operator_seed, DOMAIN_IDENTITY, slot) pub_b64, fingerprint = derive_hd_public_info(agent_sub_seed) hd_path_str = muse_path(DOMAIN_IDENTITY, ENTITY_AGENT, slot) provisional_entry: IdentityEntry = { "type": "agent", "handle": "", "algorithm": DEFAULT_SIGN_ALGO, "fingerprint": fingerprint, "hd_path": hd_path_str, "provisioned_by_fingerprint": operator_entry["fingerprint"], } try: save_identity(hub_url, provisional_entry, agent_id=agent_id) except OSError as exc: print(f"❌ Could not persist agent identity: {exc}", file=sys.stderr) raise SystemExit(ExitCode.INTERNAL_ERROR) from exc if json_out: print(json.dumps(_AgentKeygenJson( **make_envelope(elapsed), status="ok", hub=hub_url, hostname=hostname, agent_id=agent_id, public_key_b64=pub_b64, fingerprint=fingerprint, hd_path=hd_path_str, slot=slot, provisioned_by_fingerprint=operator_entry["fingerprint"], ))) else: register_flags = ( f"--hub {hub_url} --agent-id {agent_id} " f"--handle {agent_id} --provisioned-by " ) print( f"✅ Agent Ed25519 key derived (agent: {agent_id})\n" f" Public key (b64url): {pub_b64}\n" f" Fingerprint (SHA-256): {fingerprint}\n" f" HD path: {hd_path_str} (slot {slot})\n" f" Derived from operator: {operator_entry['fingerprint'][:16]}…\n\n" f" Next step: muse auth register {register_flags}", file=sys.stderr, ) return # ── Human key: generate fresh mnemonic ──────────────────────────────────── if hd_strength not in _WORDS_FOR_STRENGTH: print( f"❌ Unsupported --strength {hd_strength}. " f"Must be one of {sorted(_WORDS_FOR_STRENGTH)}.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) if hd_language not in SUPPORTED_LANGUAGES: print( f"❌ Unsupported --language {sanitize_display(hd_language)!r}. " f"Supported: {sorted(SUPPORTED_LANGUAGES)}.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) # Reuse the existing keychain mnemonic when available. # Generating fresh entropy irreversibly destroys the old mnemonic and all # keys derived from it — requires --destroy-mnemonic to be explicit. from muse.core.keychain import load as kc_load, is_available as kc_avail existing_mnemonic: str | None = kc_load() if kc_avail() else None if existing_mnemonic and destroy_mnemonic and not force: print( "❌ --destroy-mnemonic requires --force.\n" " This is a double-confirmation requirement — both flags must be\n" " present to overwrite the existing mnemonic and identity.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) if existing_mnemonic and destroy_mnemonic: mnemonic_to_generate = True elif existing_mnemonic: mnemonic_to_generate = False else: mnemonic_to_generate = True # no existing mnemonic — always generate if mnemonic_to_generate: try: mnemonic = generate_mnemonic(strength=hd_strength, language=hd_language) except Bip39Error as exc: print(f"❌ Mnemonic generation failed: {exc}", file=sys.stderr) raise SystemExit(ExitCode.INTERNAL_ERROR) else: mnemonic = existing_mnemonic # type: ignore[assignment] seed = mnemonic_to_seed(mnemonic, passphrase) pub_b64, fingerprint = derive_hd_public_info(seed) hd_path_str = muse_path(DOMAIN_IDENTITY, ENTITY_HUMAN) n_words = len(mnemonic.strip().split()) # Preserve the registered handle from an existing entry so that --force # re-keying (e.g. key rotation) does not silently clear the username and # break subsequent MSign authentication. _existing_handle = (_existing_entry or {}).get("handle") or "" provisional_entry: IdentityEntry = { "type": "human", "handle": _existing_handle, "algorithm": DEFAULT_SIGN_ALGO, "fingerprint": fingerprint, "hd_path": hd_path_str, } try: save_identity(hub_url, provisional_entry, mnemonic=mnemonic) except OSError as exc: print(f"❌ Could not persist HD provenance: {exc}", file=sys.stderr) raise SystemExit(ExitCode.INTERNAL_ERROR) from exc # Never print the mnemonic to the terminal — it would persist in scrollback # and screen recordings. The mnemonic is stored in the OS keychain; users # retrieve it on demand via the platform keychain CLI and pipe directly to # their password manager, so it never touches the terminal display at all. action = "generated and stored" if mnemonic_to_generate else "already stored" print( f"\n✅ {n_words}-word BIP-39 mnemonic {action} in your OS keychain.\n" " Back it up now — pipe directly to your password manager so it\n" " never appears in your terminal scrollback:\n\n" " macOS: security find-generic-password -s muse -a mnemonic -w | pbcopy\n" " Linux: secret-tool lookup service muse account mnemonic | xclip -selection clipboard\n", file=sys.stderr, ) if json_out: print(json.dumps(_KeygenJson( **make_envelope(elapsed), status="ok", hub=hub_url, hostname=hostname, public_key_b64=pub_b64, fingerprint=fingerprint, hd_path=hd_path_str, mnemonic_word_count=n_words, ))) else: label_note = f" (label: {label!r})" if label else "" print( f"✅ Ed25519 keypair generated{label_note}\n" f" Public key (b64url): {pub_b64}\n" f" Fingerprint (SHA-256): {fingerprint}\n" f" HD path: {hd_path_str}\n" f" Mnemonic: {n_words} words ({hd_language})\n\n" f" Next step: muse auth register --hub {hub_url} --handle ", file=sys.stderr, ) # ── recover ─────────────────────────────────────────────────────────────────── def run_recover(args: argparse.Namespace) -> None: """Re-derive the Ed25519 identity from a BIP39 mnemonic and store it in the keychain. The resulting fingerprint is identical to the original — no re-registration with the hub is needed. No PEM file is written; the key is derived on demand from the mnemonic stored in the OS keychain. Agent quickstart ---------------- :: muse auth recover --hub https://localhost:1337 --json muse auth recover --hub https://localhost:1337 --agent-id mybot --json JSON fields ----------- status ``"ok"`` on success. hub Hub URL used. hostname Hostname extracted from the hub URL. fingerprint SHA-256 hex fingerprint (matches original registration). hd_path SLIP-0010 derivation path string. Exit codes ---------- 0 Key recovered successfully. 1 Invalid mnemonic, no hub URL, or existing entry without ``--force``. 3 I/O error writing identity.toml. """ elapsed = start_timer() from muse.core.keypair import derive_hd_public_info from muse.core.bip39 import validate_mnemonic, mnemonic_to_seed from muse.core.hdkeys import ( muse_path, agent_id_to_slot, derive_agent_sub_seed, DOMAIN_IDENTITY, ENTITY_HUMAN, ENTITY_AGENT, ) from muse.core.keychain import store as kc_store hub: str | None = args.hub mnemonic_fd: int | None = getattr(args, "mnemonic_fd", None) mnemonic: str = _read_mnemonic_securely(fd=mnemonic_fd) agent_id: str | None = getattr(args, "agent_id", None) force: bool = args.force json_out: bool = args.json_out passphrase: str = _resolve_passphrase(args) hub_url = _resolve_hub(hub) if hub_url is None: print( "❌ No hub URL provided.\n" " Pass --hub , or first run: muse hub connect ", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) if not validate_mnemonic(mnemonic): print("❌ Invalid mnemonic: phrase did not pass BIP39 validation.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) hostname = hostname_from_url(hub_url) existing = load_identity(hub_url, agent_id) if existing is not None and not force: subject = f"{sanitize_display(hostname)}#{sanitize_display(agent_id)}" if agent_id else sanitize_display(hostname) print( f"⚠️ An identity already exists for {subject}.\n" " Pass --force to overwrite it.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) operator_seed = mnemonic_to_seed(mnemonic, passphrase) if agent_id: slot = agent_id_to_slot(agent_id) agent_sub_seed = derive_agent_sub_seed(operator_seed, DOMAIN_IDENTITY, slot) pub_b64, fingerprint = derive_hd_public_info(agent_sub_seed) agent_sub_seed.zero() hd_path_str = muse_path(DOMAIN_IDENTITY, ENTITY_AGENT, slot) else: pub_b64, fingerprint = derive_hd_public_info(operator_seed) hd_path_str = muse_path(DOMAIN_IDENTITY, ENTITY_HUMAN) operator_seed[:] = b"\x00" * len(operator_seed) handle = existing.get("handle", "") if existing else "" entry: IdentityEntry = { "type": "agent" if agent_id else "human", "handle": handle, "algorithm": DEFAULT_SIGN_ALGO, "fingerprint": fingerprint, "hd_path": hd_path_str, } try: save_identity( hub_url, entry, agent_id=agent_id, mnemonic=mnemonic if not agent_id else None, ) except OSError as exc: print(f"❌ Could not persist identity: {exc}", file=sys.stderr) raise SystemExit(ExitCode.INTERNAL_ERROR) from exc # Store the mnemonic in the OS keychain for future key derivations. if not agent_id: kc_store(mnemonic) if json_out: out: _RecoverJson = _RecoverJson( **make_envelope(elapsed), status="ok", hub=hub_url, hostname=hostname, public_key_b64=pub_b64, fingerprint=fingerprint, hd_path=hd_path_str, ) if agent_id: out["agent_id"] = agent_id print(json.dumps(out)) else: subject = f"agent '{agent_id}'" if agent_id else "human" print( f"✅ Key recovered for {subject}\n" f" Fingerprint (SHA-256): {fingerprint}\n" f" HD path: {hd_path_str}", file=sys.stderr, ) # ── rotate ──────────────────────────────────────────────────────────────────── def _parse_rotation_index(hd_path: str) -> int: """Return the rotation index (6th path component) from an hd_path string. E.g. ``"m/1075233755'/0'/0'/0'/0'/2'"`` → ``2``. Returns ``0`` if the path cannot be parsed (safe default — index 0 is current). """ try: parts = hd_path.rstrip("/").split("/") # parts[0] = "m", parts[1..6] = the six levels return int(parts[-1].rstrip("'")) except (IndexError, ValueError): return 0 def _compute_key_id(identity_id: str, public_key_b64: str) -> str: """Return the hub key_id for a registered key. Mirrors musehub.core.genesis.compute_key_id: key_id = sha256(identity_id NUL public_key_b64) The NUL separator prevents collision between an identity_id that ends with a public_key_b64 prefix. """ payload = f"{identity_id}\x00{public_key_b64}".encode("utf-8") return blob_id(payload) def run_rotate(args: argparse.Namespace) -> None: """Rotate the Ed25519 identity key to the next HD derivation index. The full atomic sequence: 1. Derive the new key at index+1 from the OS keychain mnemonic. 2. Register the new key with the hub via challenge-response (using the old key's signing identity so the request is authenticated). 3. Deregister the old key from the hub via ``DELETE /api/auth/keys/{handle}/{key_id}`` (signed with the old key before identity.toml is updated — the old key is still valid at this point because the hub now has both). 4. Update identity.toml with the new key. If hub registration fails (step 2), identity.toml is NOT updated — local and remote stay in sync. No PEM file is written. Exit codes ---------- 0 Success. 1 No existing identity, no mnemonic in keychain, or hub registration fails. """ elapsed = start_timer() from muse.core.keypair import derive_hd_public_info from muse.core.bip39 import mnemonic_to_seed from muse.core.hdkeys import ( muse_path, DOMAIN_IDENTITY, ENTITY_HUMAN, ROLE_SIGN, ) from muse.core.keychain import load as kc_load, is_available as kc_avail hub: str | None = args.hub json_out: bool = args.json_out passphrase: str = _resolve_passphrase(args) hub_url = _resolve_hub(hub) if hub_url is None: print( "❌ No hub URL provided.\n" " Pass --hub , or first run: muse hub connect ", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) hostname = hostname_from_url(hub_url) # Must have an existing identity to rotate from. existing = load_identity(hub_url) if existing is None: print( f"❌ No identity found for {sanitize_display(hub_url)}.\n" " Run 'muse auth keygen' first to establish an identity before rotating.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) # Read mnemonic from the OS keychain — no stdin prompt needed. mnemonic: str | None = kc_load() if kc_avail() else None if not mnemonic: print( "❌ No mnemonic found in OS keychain.\n" " Run 'muse auth recover' to restore the mnemonic first.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) current_hd_path = existing.get("hd_path") or muse_path(DOMAIN_IDENTITY) current_index = _parse_rotation_index(current_hd_path) new_index = current_index + 1 seed = mnemonic_to_seed(mnemonic, passphrase) # Derive the OLD key (needed to compute old_key_id and to sign the DELETE). old_pub_b64, old_fingerprint = derive_hd_public_info(seed, rotation_index=current_index) # old fingerprint == sha256(old_public_key_bytes) == the hub's identity_id for this user. old_identity_id = old_fingerprint # sha256(pub_key_bytes) ≡ fingerprint old_key_id = _compute_key_id(old_identity_id, old_pub_b64) new_hd_path = muse_path(DOMAIN_IDENTITY, ENTITY_HUMAN, role=ROLE_SIGN, index=new_index) new_pub_b64, new_fingerprint = derive_hd_public_info(seed, rotation_index=new_index) seed[:] = b"\x00" * len(seed) handle = existing.get("handle", "") # ── Step 2: register the new key with the hub ───────────────────────────── # Uses POST /api/auth/keys (MSign-authenticated with the OLD key to prove # account ownership + challenge-response with the NEW key to prove new key # ownership). This is the correct rotation endpoint — POST /api/auth/verify # only works for fresh registrations. print(f" → Registering new key with {sanitize_display(hub_url)} …", file=sys.stderr) from muse.core.msign import build_msign_header as _build_msign from muse.core.transport import SigningIdentity as _SigningIdentity from muse.core.hdkeys import derive_identity_key as _derive_key from muse.core.keypair import sign_bytes as _sign_bytes from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey as _Ed25519PrivKey # Derive both keys from the mnemonic once each; zero seeds immediately. old_seed = mnemonic_to_seed(mnemonic, passphrase) old_dk = _derive_key(old_seed, index=current_index) old_seed[:] = b"\x00" * len(old_seed) old_priv_key = _Ed25519PrivKey.from_private_bytes(old_dk.private_bytes) old_dk.zero() new_seed = mnemonic_to_seed(mnemonic, passphrase) new_dk = _derive_key(new_seed, index=new_index) new_seed[:] = b"\x00" * len(new_seed) new_priv_key = _Ed25519PrivKey.from_private_bytes(new_dk.private_bytes) new_dk.zero() # Get a challenge nonce for the new key's fingerprint. try: challenge_resp = _post_challenge(hub_url, { "fingerprint": new_fingerprint, "algorithm": DEFAULT_SIGN_ALGO, }) except SystemExit: print("❌ Hub registration failed — identity.toml unchanged.", file=sys.stderr) raise token = challenge_resp.get("challenge_token", "") # Hub issues hex nonces; sign the raw bytes. try: nonce_bytes = bytes.fromhex(token) except ValueError: nonce_bytes = token.encode("utf-8") new_key_sig = _sign_bytes(new_priv_key, nonce_bytes) # POST /api/auth/keys — MSign with old key proves account ownership; # new key signature in body proves new key ownership. parsed_url = urllib.parse.urlparse(hub_url) server_root = f"{parsed_url.scheme}://{parsed_url.netloc}" add_key_path = "/api/auth/keys" add_key_url = f"{server_root}{add_key_path}" ssl_ctx: "object | None" = None if parsed_url.hostname in ("localhost", "127.0.0.1"): import ssl as _ssl from muse.core.transport import _mkcert_ca ca = _mkcert_ca() if ca is not None: ssl_ctx = _ssl.create_default_context(cafile=str(ca)) # MSign must cover the actual request body — precompute the bytes before signing. add_key_payload = { "challenge_token": token, "public_key_b64": new_pub_b64, "signature_b64": new_key_sig, "label": f"rotation-index-{new_index}", } add_key_body = json.dumps(add_key_payload).encode("utf-8") old_signing = _SigningIdentity(handle=handle, private_key=old_priv_key) add_key_auth = _build_msign(old_signing, "POST", add_key_url, add_key_body) try: _json_post_raw( hub_url, add_key_path, add_key_payload, extra_headers={"Authorization": add_key_auth}, ) except SystemExit: print("❌ Hub registration failed — identity.toml unchanged.", file=sys.stderr) raise # ── Step 3: deregister the old key from the hub ─────────────────────────── # Sign the DELETE with the OLD key (still valid; hub now has both keys). # A failure here is non-fatal — the new key is already registered. delete_path = f"/api/auth/keys/{urllib.parse.quote(handle)}/{urllib.parse.quote(old_key_id)}" delete_url = f"{server_root}{delete_path}" auth_header = _build_msign(old_signing, "DELETE", delete_url, None) try: _hub_delete(delete_url, auth_header, ssl_ctx) print(" → Old key deregistered from hub.", file=sys.stderr) except (urllib.error.HTTPError, urllib.error.URLError) as exc: # Non-fatal: warn but proceed — new key is registered, old key # may have already been deregistered or not been registered. _reason = getattr(exc, "code", None) or getattr(exc, "reason", exc) print( f"⚠️ Could not deregister old key from hub ({_reason}) — " "it may already be revoked or was never registered.", file=sys.stderr, ) # ── Step 4: persist the new identity locally ────────────────────────────── entry: IdentityEntry = { "type": "human", "handle": handle, "algorithm": DEFAULT_SIGN_ALGO, "fingerprint": new_fingerprint, "hd_path": new_hd_path, } try: save_identity(hub_url, entry, mnemonic=mnemonic) except OSError as exc: print(f"❌ Could not persist rotated identity: {exc}", file=sys.stderr) raise SystemExit(ExitCode.INTERNAL_ERROR) from exc if json_out: print(json.dumps(_RotateJson( **make_envelope(elapsed), status="ok", hub=hub_url, hostname=hostname, fingerprint=new_fingerprint, hd_path=new_hd_path, rotation_index=new_index, ))) else: print( f"✅ Key rotated (index {current_index} → {new_index})\n" f" New fingerprint: {new_fingerprint}\n" f" HD path: {new_hd_path}", file=sys.stderr, ) # ── register ────────────────────────────────────────────────────────────────── def run_register(args: argparse.Namespace) -> None: """Register the local Ed25519 key with a MuseHub instance. Performs the full Ed25519 challenge-response flow: request nonce → sign with local private key → submit signature → store returned identity in ``~/.muse/identity.toml``. The private key never leaves the local machine. Agent quickstart ---------------- :: muse auth register --hub https://localhost:1337 --handle gabriel --json muse auth register --hub https://localhost:1337 --handle mybot --agent-id mybot --provisioned-by gabriel --json JSON fields ----------- status ``"registered"`` (new key) or ``"authenticated"`` (re-auth). hub Hub URL used. handle Registered username confirmed by the hub. identity_id Hub-assigned ID for this identity. identity_type ``"human"`` or ``"agent"``. fingerprint SHA-256 hex fingerprint of the public key. token_stored Always ``true`` on success. identity_path Path to ``~/.muse/identity.toml``. Exit codes ---------- 0 Registered or re-authenticated successfully. 1 No hub URL, no local key, hub rejected the signature, or missing ``--handle``. 3 I/O error writing identity file. """ elapsed = start_timer() from muse.core.keypair import ( public_key_fingerprint, public_key_to_b64url, sign_bytes, ) hub: str | None = args.hub handle: str | None = args.handle label: str | None = args.label agent: bool = args.agent agent_id: str | None = getattr(args, "agent_id", None) provisioned_by: str | None = getattr(args, "provisioned_by", None) json_out: bool = args.json_out # --agent-id implies --agent if agent_id: agent = True hub_url = _resolve_hub(hub) if hub_url is None: print( "❌ No hub URL provided.\n" " Pass --hub , or first run: muse hub connect ", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) if agent_id and not provisioned_by: print( "❌ --agent-id requires --provisioned-by .\n" " Example: muse auth register --agent-id agentception-abc123 " "--handle agentception-abc123 --provisioned-by gabriel", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) hostname = hostname_from_url(hub_url) base_url = _hub_base_url(hub_url) # Derive the private key from the mnemonic stored in the OS keychain. # The hd_path was written by `muse auth keygen` into the identity entry. keygen_entry = load_identity(hub_url, agent_id=agent_id) hd_path_for_register = keygen_entry.get("hd_path") if keygen_entry else None private_key = None if hd_path_for_register: from muse.core.keychain import load as kc_load from muse.core.bip39 import mnemonic_to_seed from muse.core.slip010 import derive_path, to_ed25519_private_key mnemonic = kc_load() if mnemonic: seed = mnemonic_to_seed(mnemonic) dk = derive_path(seed, hd_path_for_register) try: private_key = to_ed25519_private_key(dk) finally: dk.zero() if private_key is None: keygen_flags = f"--hub {hub_url}" if agent_id: keygen_flags += f" --agent-id {agent_id}" print( f"❌ No Ed25519 key found for {sanitize_display(hostname)}" f"{'#' + sanitize_display(agent_id) if agent_id else ''}.\n" f" Generate one first: muse auth keygen {keygen_flags}", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) public_key = private_key.public_key() fingerprint = public_key_fingerprint(public_key) pub_b64 = public_key_to_b64url(public_key) # Step 1: request a challenge nonce. print(f" → Requesting challenge from {sanitize_display(base_url)} …", file=sys.stderr) challenge_resp = _post_challenge(base_url, { "fingerprint": fingerprint, "algorithm": DEFAULT_SIGN_ALGO, }) challenge_token = challenge_resp.get("challenge_token") or "" if not challenge_token: print( "❌ Hub returned an invalid challenge response (missing challenge_token).", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) is_new_key = challenge_resp.get("is_new_key") or False if is_new_key and not handle: print( "❌ This key is not yet registered. " "Pass --handle to register it.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) nonce_hex = challenge_token if not nonce_hex or not all(c in "0123456789abcdef" for c in nonce_hex): print(f"❌ Hub returned an invalid challenge (expected hex nonce).", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) try: nonce_bytes = bytes.fromhex(nonce_hex) except ValueError as exc: print(f"❌ Could not decode nonce: {sanitize_display(str(exc))}", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) from exc # Step 2: sign the nonce. signature_b64 = sign_bytes(private_key, nonce_bytes) # Step 3: submit the signed nonce. print(f" → Submitting signature to {sanitize_display(base_url)} …", file=sys.stderr) verify_resp = _post_verify(base_url, { "challenge_token": challenge_token, "public_key_b64": pub_b64, "signature_b64": signature_b64, "handle": handle, "label": label, }) returned_handle = verify_resp.get("handle") or handle or "" identity_id = verify_resp.get("identity_id") or "" is_new_identity = verify_resp.get("is_new_identity") or False if not returned_handle: print("❌ Hub returned an invalid verify response (missing handle).", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) identity_type = "agent" if agent else "human" # Build the identity entry — no key_path (mnemonic lives in the OS keychain). # For agent keys, store under the compound key "hostname#agent_id" so that # human and agent entries coexist in identity.toml without collision. entry: IdentityEntry = { "type": identity_type, "handle": returned_handle, "algorithm": DEFAULT_SIGN_ALGO, "fingerprint": fingerprint, } if provisioned_by: entry["provisioned_by"] = provisioned_by # Preserve HD derivation path written by `muse auth keygen`. # keygen_entry was already loaded above to derive the private key. if hd_path_for_register: entry["hd_path"] = hd_path_for_register try: save_identity(hub_url, entry, agent_id=agent_id) except OSError as exc: print(f"❌ Could not write credentials: {exc}", file=sys.stderr) raise SystemExit(ExitCode.INTERNAL_ERROR) from exc action = "registered" if is_new_identity else "authenticated" identity_path = str(get_identity_path()) if json_out: print(json.dumps(_RegisterJson( **make_envelope(elapsed), status=action, hub=hub_url, handle=returned_handle, identity_id=identity_id, identity_type=identity_type, fingerprint=fingerprint, token_stored=False, identity_path=identity_path, ))) else: action_cap = action.capitalize() prov_line = f"\n Provisioned by: {provisioned_by}" if provisioned_by else "" print( f"\n✅ {action_cap} as '{returned_handle}' on {hub_url}{prov_line}\n" f" Identity ID: {identity_id}\n" f" Auth method: ed25519 (key fingerprint: {fingerprint[:16]}…)\n" f" HD path: {hd_path_for_register or 'not set'}\n" f" Identity stored in: {identity_path}", file=sys.stderr, ) # ── whoami ──────────────────────────────────────────────────────────────────── def run_whoami(args: argparse.Namespace) -> None: """Show the identity stored in ``~/.muse/identity.toml`` for a hub. Exits non-zero when no identity is stored — useful for agent branching on authentication status. With ``--all``, shows every configured hub. Agent quickstart ---------------- :: muse auth whoami --json muse auth whoami --hub https://localhost:1337 --json muse auth whoami --all --json muse auth whoami --all --type agent --json JSON fields ----------- hub Hub hostname. type ``"human"`` or ``"agent"``. handle Registered handle. fingerprint SHA-256 hex fingerprint of the public key. key_set ``true`` if an Ed25519 key PEM is stored. capabilities List of capability strings (may be empty). With ``--all``, the response is a list of the above objects. Exit codes ---------- 0 Identity found and printed. 1 No hub configured, or no identity stored for that hub. """ elapsed = start_timer() hub: str | None = args.hub all_hubs: bool = args.all_hubs json_out: bool = args.json_out identity_type: str | None = getattr(args, "identity_type", None) # Validate --type value early _VALID_TYPES = ("human", "agent") if identity_type is not None: if identity_type not in _VALID_TYPES: print( f"❌ Invalid --type value: {sanitize_display(identity_type)!r}\n" f" Valid values: {', '.join(_VALID_TYPES)}", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) if all_hubs: identities = list_all_identities() if not identities: print("No identities stored. Run `muse auth keygen` + `muse auth register`.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) if identity_type is not None: identities = { hostname: e for hostname, e in identities.items() if e.get("type") == identity_type } if not identities: print( f"No identities of type {identity_type!r} found.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) if json_out: entries_out: list[_WhoamiJson] = [] for hostname, e in sorted(identities.items()): entry_dict: _WhoamiJson = { "hub": hostname, "type": e.get("type") or "", "handle": e.get("handle") or "", "fingerprint": e.get("fingerprint") or "", "key_set": bool(e.get("hd_path")), "capabilities": list(e.get("capabilities") or []), } pb = e.get("provisioned_by") or "" if pb: entry_dict["provisioned_by"] = pb hdp = e.get("hd_path") or "" if hdp: entry_dict["hd_path"] = hdp entries_out.append(entry_dict) print(json.dumps(_WhoamiAllJson(**make_envelope(elapsed), identities=entries_out))) else: for hostname, stored_entry in sorted(identities.items()): _display_entry(hostname, stored_entry, json_output=False) return hub_url = _resolve_hub(hub) if hub_url is None: # No hub in args or repo config — fall back to showing all identities. identities = list_all_identities() if not identities: print("No identities stored. Run `muse auth keygen` + `muse auth register`.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) if json_out: entries_out = [] for hostname, e in sorted(identities.items()): entry_dict = { "hub": hostname, "type": e.get("type") or "", "handle": e.get("handle") or "", "fingerprint": e.get("fingerprint") or "", "key_set": bool(e.get("hd_path")), "capabilities": list(e.get("capabilities") or []), } pb = e.get("provisioned_by") or "" if pb: entry_dict["provisioned_by"] = pb hdp = e.get("hd_path") or "" if hdp: entry_dict["hd_path"] = hdp entries_out.append(entry_dict) print(json.dumps(_WhoamiAllJson(**make_envelope(elapsed), identities=entries_out))) else: for hostname, stored_entry in sorted(identities.items()): _display_entry(hostname, stored_entry, json_output=False) return single_entry = load_identity(hub_url) if single_entry is None: print( f"No identity stored for {sanitize_display(hub_url)}.\n" f"Run: muse auth keygen --hub {hub_url} && muse auth register --hub {hub_url} --handle ", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) _display_entry(hostname_from_url(hub_url), single_entry, json_output=json_out, elapsed=elapsed) # ── logout ──────────────────────────────────────────────────────────────────── def run_logout(args: argparse.Namespace) -> None: """Remove stored credentials for one hub or all hubs. Deletes entries from ``~/.muse/identity.toml``. Idempotent — logging out when no identity is stored exits 0 with ``status: "nothing_to_do"``. The hub URL in ``.muse/config.toml`` is not touched; use ``muse hub disconnect`` to remove the hub association from the repo. Agent quickstart ---------------- :: muse auth logout --json muse auth logout --hub https://localhost:1337 --json muse auth logout --all --json JSON fields ----------- status ``"ok"`` (credentials removed) or ``"nothing_to_do"`` (none found). hubs Sorted list of hostnames logged out from; empty on nothing_to_do. count Number of identities removed; ``0`` on nothing_to_do. Exit codes ---------- 0 Success (credentials removed or nothing to do). 1 No hub URL could be resolved. """ elapsed = start_timer() hub: str | None = args.hub all_hubs: bool = args.all_hubs json_out: bool = args.json_out if all_hubs: removed_hubs = clear_all_identities() if not removed_hubs: if json_out: print(json.dumps(_LogoutJson(**make_envelope(elapsed), status="nothing_to_do", hubs=[], count=0))) else: print("No identities stored.", file=sys.stderr) return if json_out: print(json.dumps(_LogoutJson( **make_envelope(elapsed), status="ok", hubs=removed_hubs, # already sorted by clear_all_identities count=len(removed_hubs), ))) else: hub_list = ", ".join(sanitize_display(h) for h in removed_hubs) print( f"✅ Logged out from {len(removed_hubs)} hub(s): {hub_list}", file=sys.stderr, ) return hub_url = _resolve_hub(hub) if hub_url is None: print( "❌ No hub URL provided.\n" " Pass --hub , or first run: muse hub connect ", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) removed = clear_identity(hub_url) hub_display = hostname_from_url(hub_url) if json_out: if removed: print(json.dumps(_LogoutJson( **make_envelope(elapsed), status="ok", hubs=[sanitize_display(hub_display)], count=1, ))) else: print(json.dumps(_LogoutJson(**make_envelope(elapsed), status="nothing_to_do", hubs=[], count=0))) else: if removed: print(f"✅ Logged out from {sanitize_display(hub_display)}.", file=sys.stderr) else: print( f"No identity stored for {sanitize_display(hub_display)} — nothing to do.", file=sys.stderr, ) # ── show ────────────────────────────────────────────────────────────────────── def _show_identity_detail( hostname: str, entry: IdentityEntry, *, json_output: bool, ) -> None: """Print detailed identity info including HD derivation paths. For HD identities the mnemonic is read from the entry only to derive the AVAX C-Chain address — it is **never** printed. All diagnostic text goes to stderr; ``--json`` output goes to stdout. Args: hostname: Hub hostname (used as the display / JSON ``hub`` key). entry: Identity entry loaded from ``~/.muse/identity.toml``. json_output: When ``True``, emit a JSON object to stdout. """ elapsed = start_timer() from muse.core.hdkeys import ( DOMAIN_IDENTITY, DOMAIN_PAYMENTS, ENTITY_AGENT, ENTITY_HUMAN, muse_path, ) mnemonic = entry.get("mnemonic") or "" hd_path = entry.get("hd_path") or "" handle = entry.get("handle") or "" itype = entry.get("type") or "" fingerprint = entry.get("fingerprint") or "" algorithm = entry.get("algorithm") or "" provisioned_by = entry.get("provisioned_by") or "" provisioned_by_fingerprint = entry.get("provisioned_by_fingerprint") or "" derived_paths: _DerivedPaths[str, str] = { "identity_msign": muse_path(DOMAIN_IDENTITY, ENTITY_HUMAN), "payments_mpay": muse_path(DOMAIN_PAYMENTS, ENTITY_HUMAN), "avax_c_chain": "m/44'/60'/0'/0/0", "agent_slot_0": muse_path(DOMAIN_IDENTITY, ENTITY_AGENT, entity_id=0), } avax_address: str | None = None mnemonic_word_count: int = 0 if hd_path and mnemonic: from muse.core.bip39 import mnemonic_to_seed from muse.core.secp256k1_sign import avax_c_chain_address, derive_avax_key words = mnemonic.strip().split() mnemonic_word_count = len(words) try: seed = mnemonic_to_seed(mnemonic) avax_key = derive_avax_key(seed) avax_address = avax_c_chain_address(avax_key.public_key) except Exception: pass # non-fatal — omit address rather than crashing if json_output: out: _ShowOutputJson = _ShowOutputJson( **make_envelope(elapsed), hub=hostname, handle=handle, type=itype, fingerprint=fingerprint, ) if algorithm: out["algorithm"] = algorithm if provisioned_by: out["provisioned_by"] = provisioned_by if provisioned_by_fingerprint: out["provisioned_by_fingerprint"] = provisioned_by_fingerprint if hd_path: out["hd_path"] = hd_path out["mnemonic_word_count"] = mnemonic_word_count out["derived_paths"] = derived_paths if avax_address: out["avax_c_chain_address"] = avax_address print(json.dumps(out)) else: print("", file=sys.stderr) print(f" Hub: {sanitize_display(hostname)}", file=sys.stderr) print(f" Handle: {sanitize_display(handle) or '—'}", file=sys.stderr) print(f" Type: {itype or 'unknown'}", file=sys.stderr) print(f" Fingerprint: {fingerprint[:16] + '…' if fingerprint else '—'}", file=sys.stderr) if hd_path: print(f" HD path: {hd_path}", file=sys.stderr) print(f" Mnemonic: {mnemonic_word_count} words (phrase not shown)", file=sys.stderr) print("", file=sys.stderr) print(" Derived paths:", file=sys.stderr) for name, path in derived_paths.items(): print(f" {name:<20} {path}", file=sys.stderr) if avax_address: print(f" AVAX C-Chain: {avax_address}", file=sys.stderr) print("", file=sys.stderr) def run_show(args: argparse.Namespace) -> None: """Display full identity details including HD derivation paths and AVAX address. For HD identities, prints all six-level Muse derivation paths and the AVAX C-Chain address. The mnemonic phrase itself is **never** printed — only the word count is shown. Agent quickstart ---------------- :: muse auth show --hub https://localhost:1337 --json muse auth show --json # all stored hubs JSON fields ----------- hub Hub hostname. handle Registered handle. type ``"human"`` or ``"agent"``. fingerprint ``sha256:<64-hex>`` fingerprint of the public key. hd_path SLIP-0010 derivation path (HD identities only). mnemonic_word_count Number of BIP39 mnemonic words (HD identities only). derived_paths Dict of named paths: identity_msign, payments_mpay, etc. (HD only). avax_c_chain_address AVAX C-Chain address (HD identities only). Exit codes ---------- 0 Identity found and displayed. 1 No hub configured and no identities stored. """ elapsed = start_timer() hub: str | None = args.hub json_out: bool = args.json_out hub_url = _resolve_hub(hub) if hub_url is None: identities = list_all_identities() if not identities: print("No identities stored. Run `muse auth keygen` + `muse auth register`.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR) for hostname, stored_entry in sorted(identities.items()): _show_identity_detail(hostname, stored_entry, json_output=json_out) return entry = load_identity(hub_url) if entry is None: print( f"No identity stored for {sanitize_display(hub_url)}.\n" f"Run: muse auth keygen --hub {hub_url} && muse auth register --hub {hub_url} --handle ", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR) _show_identity_detail(hostname_from_url(hub_url), entry, json_output=json_out) # ── cleanup-keys ────────────────────────────────────────────────────────────── def run_cleanup_keys(args: argparse.Namespace) -> None: """Securely overwrite and delete stale PEM files from ~/.muse/keys/. Every ``*.pem`` file is overwritten with cryptographically random bytes of the same length, then unlinked. After Phases 1–4 of the key-material security migration, private keys are derived from the OS-keychain mnemonic at sign time — PEM files are vestigial unprotected key material. Agent quickstart ---------------- :: muse auth cleanup-keys --json JSON fields ----------- destroyed List of absolute paths overwritten and deleted. count Number of files destroyed. Exit codes ---------- 0 All PEM files destroyed (or none existed). 3 I/O error during overwrite or unlink. """ elapsed = start_timer() json_out: bool = args.json_out from muse.core.keypair import _KEYS_DIR destroyed: list[str] = [] pem_files = sorted(_KEYS_DIR.glob("*.pem")) if _KEYS_DIR.exists() else [] for pem_path in pem_files: try: size = pem_path.stat().st_size with pem_path.open("r+b") as fh: fh.write(os.urandom(size)) fh.flush() os.fsync(fh.fileno()) pem_path.unlink() destroyed.append(str(pem_path)) except OSError as exc: print(f"❌ Could not destroy {pem_path}: {exc}", file=sys.stderr) raise SystemExit(ExitCode.INTERNAL_ERROR) from exc if json_out: print(json.dumps(_CleanupKeysJson( **make_envelope(elapsed), destroyed=destroyed, count=len(destroyed), ))) else: if destroyed: for path in destroyed: print(f" 🔥 Destroyed: {path}", file=sys.stderr) print( f"\n✅ {len(destroyed)} PEM file(s) securely overwritten and deleted.\n" f" Key material now derived exclusively from the OS-keychain mnemonic.", file=sys.stderr, ) else: print("✅ No PEM files found — nothing to clean up.", file=sys.stderr) # ── security-check ──────────────────────────────────────────────────────────── def run_security_check(args: argparse.Namespace) -> None: """Verify the local identity is in a clean, PEM-free state. Runs four invariant checks: 1. Mnemonic is present in the OS keychain. 2. No PEM files exist in ``~/.muse/keys/``. 3. ``identity.toml`` has no ``key_path`` fields. 4. Fingerprint in ``identity.toml`` matches derivation from the keychain mnemonic. Exits 0 when all checks pass. Exits 1 if any check fails. Agent quickstart ---------------- :: muse auth security-check --json muse auth security-check --hub https://localhost:1337 --json JSON fields ----------- ok ``true`` when all four checks pass. mnemonic_in_keychain Mnemonic is present in OS keychain. no_pem_files No ``*.pem`` files found in ``~/.muse/keys/``. no_key_path_in_identity No ``key_path`` field in any identity entry. fingerprint_matches_mnemonic Stored fingerprint matches mnemonic derivation. pem_files_found List of PEM paths found (empty on success). key_path_entries List of hostnames whose entries still have ``key_path``. Exit codes ---------- 0 All checks pass. 1 One or more checks fail. """ elapsed = start_timer() hub: str | None = args.hub json_out: bool = args.json_out from muse.core.keypair import _KEYS_DIR from muse.core.keychain import load as kc_load, is_available as kc_avail from muse.core.identity import list_all_identities # Check 1: mnemonic in keychain mnemonic_in_keychain = False if kc_avail(): mnemonic_in_keychain = bool(kc_load()) # Check 2: no PEM files pem_files_found: list[str] = [] if _KEYS_DIR.exists(): pem_files_found = [str(p) for p in sorted(_KEYS_DIR.glob("*.pem"))] no_pem_files = len(pem_files_found) == 0 # Check 3: no key_path in any identity entry key_path_entries: list[str] = [] all_ids = list_all_identities() for hostname, entry in all_ids.items(): if entry.get("key_path"): key_path_entries.append(hostname) no_key_path_in_identity = len(key_path_entries) == 0 # Check 4: fingerprint matches mnemonic derivation fingerprint_matches_mnemonic = False if mnemonic_in_keychain: hub_url = _resolve_hub(hub) if hub_url is None and all_ids: # Pick the first stored identity if no hub specified first_hostname = next(iter(all_ids)) hub_url = f"https://{first_hostname}" if hub_url: from muse.core.identity import load_identity from muse.core.bip39 import mnemonic_to_seed from muse.core.keypair import derive_hd_public_info entry = load_identity(hub_url) if entry and entry.get("hd_path"): mnemonic = kc_load() if mnemonic: seed = mnemonic_to_seed(mnemonic) rotation = _parse_rotation_index(entry["hd_path"]) _, expected_fp = derive_hd_public_info(seed, rotation_index=rotation) stored_fp = entry.get("fingerprint", "") fingerprint_matches_mnemonic = (stored_fp == expected_fp) ok = ( mnemonic_in_keychain and no_pem_files and no_key_path_in_identity and fingerprint_matches_mnemonic ) if json_out: print(json.dumps(_SecurityCheckJson( **make_envelope(elapsed), mnemonic_in_keychain=mnemonic_in_keychain, no_pem_files=no_pem_files, no_key_path_in_identity=no_key_path_in_identity, fingerprint_matches_mnemonic=fingerprint_matches_mnemonic, pem_files_found=pem_files_found, key_path_entries=key_path_entries, ok=ok, ))) else: def _line(label: str, passed: bool, detail: str = "") -> None: icon = "✅" if passed else "❌" suffix = f" — {detail}" if detail else "" print(f" {icon} {label}{suffix}", file=sys.stderr) print("\nmuse auth security-check\n", file=sys.stderr) _line("Mnemonic in OS keychain", mnemonic_in_keychain, "" if mnemonic_in_keychain else "run muse auth recover to restore") _line("No PEM files on disk", no_pem_files, ", ".join(pem_files_found) if pem_files_found else "") _line("No key_path in identity.toml", no_key_path_in_identity, ", ".join(key_path_entries) if key_path_entries else "") _line("Fingerprint matches mnemonic", fingerprint_matches_mnemonic, "" if fingerprint_matches_mnemonic else "re-register to fix") print(f"\n{'✅ All checks passed.' if ok else '❌ One or more checks failed.'}\n", file=sys.stderr) if not ok: raise SystemExit(ExitCode.USER_ERROR)