"""Tests for ``muse auth keygen`` and ``muse auth register``. Covers: - keygen: key generation, file permissions, --force, duplicate key rejection - keygen: public key / fingerprint format - register: full challenge-response flow with a mocked hub - register: token storage in identity.toml - register: error paths (missing key, network errors, bad challenge token) - keypair module: sign / verify round-trip, key loading """ from __future__ import annotations import base64 import json import pathlib import unittest.mock import urllib.error import urllib.request import types from typing import TypedDict import pytest from tests.cli_test_helper import CliRunner from muse.core import keypair as kp_module from muse.core.types import JsonValue from muse.core.types import Manifest type _AuthPayload = dict[str, str | None] type _JsonResponse = dict[str, JsonValue] class _ChallengeResp(TypedDict, total=False): challenge_token: str is_new_key: bool algorithm: str class _VerifyResp(TypedDict, total=False): token: str handle: str identity_id: str is_new_identity: bool auth_method: str cli = None runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _env(tmp_home: pathlib.Path) -> Manifest: """Environment that redirects ~/.muse to a temp directory.""" fake_home = tmp_home / "home" fake_home.mkdir(parents=True, exist_ok=True) return {"HOME": str(fake_home)} def _patch_home(monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> pathlib.Path: """Redirect pathlib.Path.home() to a temp dir for this test.""" fake_home = tmp_path / "home" fake_home.mkdir(parents=True, exist_ok=True) monkeypatch.setattr(pathlib.Path, "home", staticmethod(lambda: fake_home)) # Also redirect the module-level constants monkeypatch.setattr(kp_module, "_KEYS_DIR", fake_home / ".muse" / "keys") from muse.core import identity as id_module monkeypatch.setattr(id_module, "_IDENTITY_DIR", fake_home / ".muse") monkeypatch.setattr(id_module, "_IDENTITY_FILE", fake_home / ".muse" / "identity.toml") return fake_home # --------------------------------------------------------------------------- # keypair module unit tests # --------------------------------------------------------------------------- # Fixed test seeds — deterministic, unique per test scenario. _SEED_A = b"\x01" * 64 _SEED_C = b"\x03" * 64 _SEED_D = b"\x04" * 64 class TestKeypairModule: def test_public_key_to_b64url_has_algo_prefix(self) -> None: from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey priv = Ed25519PrivateKey.generate() result = kp_module.public_key_to_b64url(priv.public_key()) assert result.startswith("ed25519:"), f"Expected 'ed25519:' prefix, got: {result!r}" def test_public_key_to_b64url_raw_bytes_round_trip(self) -> None: from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from muse.core.types import decode_pubkey priv = Ed25519PrivateKey.generate() pub = priv.public_key() raw_expected = pub.public_bytes(Encoding.Raw, PublicFormat.Raw) encoded = kp_module.public_key_to_b64url(pub) _, raw_decoded = decode_pubkey(encoded) assert raw_decoded == raw_expected def test_sign_bytes_has_algo_prefix(self) -> None: from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey priv = Ed25519PrivateKey.generate() result = kp_module.sign_bytes(priv, b"hello") assert result.startswith("ed25519:"), f"Expected 'ed25519:' prefix, got: {result!r}" def test_sign_bytes_verifies_correctly(self) -> None: from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from muse.core.types import decode_sig priv = Ed25519PrivateKey.generate() msg = b"test message" sig_str = kp_module.sign_bytes(priv, msg) _, sig_bytes = decode_sig(sig_str) # Verify with the public key — no exception means success priv.public_key().verify(sig_bytes, msg) def test_fingerprint_matches_sha256(self) -> None: from muse.core.types import public_key_fingerprint, decode_pubkey pub_b64, fingerprint = kp_module.derive_hd_public_info(_SEED_C) assert fingerprint.startswith("sha256:") assert len(fingerprint) == 71 # pub_b64 is now canonically prefixed — decode to raw bytes _, raw = decode_pubkey(pub_b64) assert fingerprint == public_key_fingerprint(raw) def test_derive_hd_public_info_key_has_algo_prefix(self) -> None: pub_b64, _ = kp_module.derive_hd_public_info(_SEED_C) assert pub_b64.startswith("ed25519:"), f"Expected 'ed25519:' prefix, got: {pub_b64!r}" def test_different_seeds_produce_different_keys(self) -> None: pub1, _ = kp_module.derive_hd_public_info(_SEED_A) pub2, _ = kp_module.derive_hd_public_info(_SEED_D) assert pub1 != pub2 # --------------------------------------------------------------------------- # muse auth keygen CLI tests # --------------------------------------------------------------------------- class TestAuthKeygenCLI: HUB = "https://localhost:1337" def test_generates_key_successfully(self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> None: _patch_home(monkeypatch, tmp_path) result = runner.invoke(cli, ["auth", "keygen", "--hub", self.HUB], catch_exceptions=False) assert result.exit_code == 0 assert "Ed25519 keypair generated" in result.stderr assert "Fingerprint" in result.stderr def test_no_pem_file_created(self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> None: """keygen must not write a PEM file — key is derived from mnemonic at sign time.""" _patch_home(monkeypatch, tmp_path) runner.invoke(cli, ["auth", "keygen", "--hub", self.HUB], catch_exceptions=False) keys_dir = tmp_path / "home" / ".muse" / "keys" pem_files = list(keys_dir.glob("*.pem")) if keys_dir.exists() else [] assert pem_files == [], f"Unexpected PEM files created: {pem_files}" def test_force_flag_still_succeeds(self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> None: """--force still works (generates a new mnemonic, overwrites identity entry).""" _patch_home(monkeypatch, tmp_path) runner.invoke(cli, ["auth", "keygen", "--hub", self.HUB], catch_exceptions=False) r2 = runner.invoke(cli, ["auth", "keygen", "--hub", self.HUB, "--force"], catch_exceptions=False) assert r2.exit_code == 0 def test_no_hub_fails(self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> None: _patch_home(monkeypatch, tmp_path) # chdir to a directory with no hub config so get_hub_url(None) returns None. # MUSE_REPO_ROOT only affects find_repo_root(), not get_hub_url(). monkeypatch.chdir(tmp_path) result = runner.invoke(cli, ["auth", "keygen"]) assert result.exit_code != 0 def test_label_shown_in_output(self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> None: _patch_home(monkeypatch, tmp_path) result = runner.invoke(cli, ["auth", "keygen", "--hub", self.HUB, "--label", "My Laptop"], catch_exceptions=False) assert result.exit_code == 0 assert "My Laptop" in result.stderr # --------------------------------------------------------------------------- # muse auth register CLI tests (hub is mocked) # --------------------------------------------------------------------------- _FIXED_MNEMONIC = ( "abandon abandon abandon abandon abandon abandon abandon abandon " "abandon abandon abandon about" ) class TestAuthRegisterCLI: HUB = "https://localhost:1337" def _setup_key(self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> tuple[str, str]: """Run keygen so an identity entry with hd_path exists, and seed keychain.""" import muse.core.bip39 as bip39_mod import muse.core.identity as id_module _patch_home(monkeypatch, tmp_path) _kc: dict[str, str] = {} monkeypatch.setattr("muse.core.keychain.is_available", lambda: True) monkeypatch.setattr("muse.core.keychain.store", lambda m: _kc.__setitem__("mnemonic", m)) monkeypatch.setattr("muse.core.keychain.load", lambda: _kc.get("mnemonic")) monkeypatch.setattr("muse.cli.commands.auth._stderr_isatty", lambda: False) monkeypatch.setattr(bip39_mod, "generate_mnemonic", lambda **kw: _FIXED_MNEMONIC) result = runner.invoke(None, ["auth", "keygen", "--hub", self.HUB]) assert result.exit_code == 0, f"keygen setup failed: {result.output}" entry = id_module.load_identity(self.HUB) assert entry is not None return entry.get("public_key_b64", ""), entry.get("fingerprint", "") def _mock_hub( self, monkeypatch: pytest.MonkeyPatch, nonce_hex: str, challenge_resp: _ChallengeResp | None = None, verify_resp: _VerifyResp | None = None, ) -> None: """Patch urllib.request.urlopen to simulate hub challenge-response.""" _challenge_resp = challenge_resp or { "challenge_token": nonce_hex, "is_new_key": True, "algorithm": "ed25519", } _verify_resp = verify_resp or { "handle": "alice", "identity_id": "id-123", "is_new_identity": True, "auth_method": "ed25519", } call_count = 0 class FakeResponse: def __init__(self, data: bytes) -> None: self._data = data def read(self, n: int = -1) -> bytes: return self._data[:n] if n >= 0 else self._data def __enter__(self) -> "FakeResponse": return self def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None) -> None: pass def fake_urlopen(req: urllib.request.Request, timeout: int = 30) -> FakeResponse: nonlocal call_count call_count += 1 if call_count == 1: return FakeResponse(json.dumps(_challenge_resp).encode()) return FakeResponse(json.dumps(_verify_resp).encode()) import muse.cli.commands.auth as auth_mod monkeypatch.setattr(auth_mod, "_json_post_raw", lambda base, path, payload: _challenge_resp if "challenge" in path else _verify_resp) def test_full_registration_stores_identity( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: self._setup_key(monkeypatch, tmp_path) import secrets nonce_hex = secrets.token_hex(32) self._mock_hub(monkeypatch, nonce_hex) result = runner.invoke( cli, ["auth", "register", "--hub", self.HUB, "--handle", "alice"], catch_exceptions=False, ) assert result.exit_code == 0 assert "alice" in result.stderr # Ed25519 identity must be persisted from muse.core.identity import load_identity entry = load_identity(self.HUB) assert entry is not None assert entry.get("handle") == "alice" assert entry.get("hd_path") is not None assert "key_path" not in entry assert "token" not in entry def test_no_key_fails_gracefully( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: _patch_home(monkeypatch, tmp_path) result = runner.invoke( cli, ["auth", "register", "--hub", self.HUB, "--handle", "alice"], ) assert result.exit_code != 0 assert "keygen" in result.stderr.lower() or "no ed25519 key" in result.stderr.lower() def test_new_key_without_handle_fails( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: self._setup_key(monkeypatch, tmp_path) import muse.cli.commands.auth as auth_mod def fake_json_post(base: str, path: str, payload: _AuthPayload) -> _JsonResponse: if "challenge" in path: return { "challenge_token": "ab" * 32, "is_new_key": True, "algorithm": "ed25519", } return {} # should not reach here monkeypatch.setattr(auth_mod, "_json_post_raw", fake_json_post) result = runner.invoke(cli, ["auth", "register", "--hub", self.HUB]) assert result.exit_code != 0 assert "--handle" in result.stderr def test_agent_flag_marks_identity_as_agent( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: self._setup_key(monkeypatch, tmp_path) import muse.cli.commands.auth as auth_mod import secrets nonce_hex = secrets.token_hex(32) def fake_json_post(base: str, path: str, payload: _AuthPayload) -> _JsonResponse: if "challenge" in path: return {"challenge_token": nonce_hex, "is_new_key": False, "algorithm": "ed25519"} return {"handle": "bot", "identity_id": "id-bot", "is_new_identity": False, "auth_method": "ed25519"} monkeypatch.setattr(auth_mod, "_json_post_raw", fake_json_post) runner.invoke(cli, ["auth", "register", "--hub", self.HUB, "--handle", "bot", "--agent"], catch_exceptions=False) from muse.core.identity import load_identity entry = load_identity(self.HUB) assert entry is not None assert entry.get("type") == "agent" def test_bad_challenge_token_fails( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: self._setup_key(monkeypatch, tmp_path) import muse.cli.commands.auth as auth_mod def fake_json_post(base: str, path: str, payload: _AuthPayload) -> _JsonResponse: return {"challenge_token": "not-valid-hex!", "is_new_key": False, "algorithm": "ed25519"} monkeypatch.setattr(auth_mod, "_json_post_raw", fake_json_post) result = runner.invoke(cli, ["auth", "register", "--hub", self.HUB, "--handle", "alice"]) assert result.exit_code != 0 def test_empty_verify_response_fails( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """Verify response with no handle and no --handle fallback fails.""" self._setup_key(monkeypatch, tmp_path) import muse.cli.commands.auth as auth_mod import secrets nonce_hex = secrets.token_hex(32) def fake_json_post(base: str, path: str, payload: _AuthPayload) -> _JsonResponse: if "challenge" in path: return {"challenge_token": nonce_hex, "is_new_key": False, "algorithm": "ed25519"} return {} # No handle field monkeypatch.setattr(auth_mod, "_json_post_raw", fake_json_post) # No --handle flag, and hub returns no handle → must fail result = runner.invoke(cli, ["auth", "register", "--hub", self.HUB]) assert result.exit_code != 0