test_cmd_auth_keygen_register.py
python
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
21 days ago
| 1 | """Tests for ``muse auth keygen`` and ``muse auth register``. |
| 2 | |
| 3 | Covers: |
| 4 | - keygen: key generation, file permissions, --force, duplicate key rejection |
| 5 | - keygen: public key / fingerprint format |
| 6 | - register: full challenge-response flow with a mocked hub |
| 7 | - register: token storage in identity.toml |
| 8 | - register: error paths (missing key, network errors, bad challenge token) |
| 9 | - keypair module: sign / verify round-trip, key loading |
| 10 | """ |
| 11 | from __future__ import annotations |
| 12 | |
| 13 | import base64 |
| 14 | import json |
| 15 | import pathlib |
| 16 | import unittest.mock |
| 17 | import urllib.error |
| 18 | import urllib.request |
| 19 | import types |
| 20 | from typing import TypedDict |
| 21 | |
| 22 | import pytest |
| 23 | from tests.cli_test_helper import CliRunner |
| 24 | |
| 25 | from muse.core import keypair as kp_module |
| 26 | from muse.core.types import JsonValue |
| 27 | from muse.core.types import Manifest |
| 28 | |
| 29 | type _AuthPayload = dict[str, str | None] |
| 30 | type _JsonResponse = dict[str, JsonValue] |
| 31 | |
| 32 | |
class _ChallengeResp(TypedDict, total=False):
    """Typed shape of a faked challenge reply; ``total=False`` makes every key optional."""

    challenge_token: str
    is_new_key: bool
    algorithm: str
| 37 | |
| 38 | |
class _VerifyResp(TypedDict, total=False):
    """Typed shape of a faked verify reply; ``total=False`` makes every key optional."""

    token: str
    handle: str
    identity_id: str
    is_new_identity: bool
    auth_method: str
| 45 | |
# ``cli`` is forwarded as the first argument of ``runner.invoke`` in the tests.
# NOTE(review): presumably the project's ``CliRunner`` resolves the real entry
# point itself when handed ``None`` -- confirm in ``tests/cli_test_helper``.
cli = None
# One runner instance shared by every test in this module.
runner = CliRunner()
| 48 | |
| 49 | |
| 50 | # --------------------------------------------------------------------------- |
| 51 | # Helpers |
| 52 | # --------------------------------------------------------------------------- |
| 53 | |
| 54 | |
def _env(tmp_home: pathlib.Path) -> Manifest:
    """Return a mapping whose ``HOME`` entry is ``<tmp_home>/home``.

    The directory is created first (parents included); an already existing
    directory is accepted as-is.
    """
    home_dir = tmp_home.joinpath("home")
    home_dir.mkdir(parents=True, exist_ok=True)
    environment = {"HOME": str(home_dir)}
    return environment
| 60 | |
| 61 | |
def _patch_home(monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> pathlib.Path:
    """Make ``pathlib.Path.home()`` and the muse path constants use ``<tmp_path>/home``.

    Besides ``Path.home`` itself, the module-level constants
    ``kp_module._KEYS_DIR``, ``identity._IDENTITY_DIR`` and
    ``identity._IDENTITY_FILE`` are overridden, because patching
    ``Path.home`` alone does not change values those modules already hold.

    Returns:
        The fake home directory (created if missing).
    """
    home_dir = tmp_path.joinpath("home")
    home_dir.mkdir(parents=True, exist_ok=True)
    muse_dir = home_dir / ".muse"

    monkeypatch.setattr(pathlib.Path, "home", staticmethod(lambda: home_dir))
    monkeypatch.setattr(kp_module, "_KEYS_DIR", muse_dir / "keys")

    # Imported only after ``Path.home`` has been patched, as in the original ordering.
    from muse.core import identity as id_module

    identity_overrides = (
        ("_IDENTITY_DIR", muse_dir),
        ("_IDENTITY_FILE", muse_dir / "identity.toml"),
    )
    for attr_name, replacement in identity_overrides:
        monkeypatch.setattr(id_module, attr_name, replacement)
    return home_dir
| 73 | |
| 74 | |
| 75 | # --------------------------------------------------------------------------- |
| 76 | # keypair module unit tests |
| 77 | # --------------------------------------------------------------------------- |
| 78 | |
| 79 | # Fixed test seeds — deterministic, unique per test scenario. |
# Each seed is 64 repetitions of a single, distinct byte value.
_SEED_A = bytes([0x01]) * 64
_SEED_C = bytes([0x03]) * 64
_SEED_D = bytes([0x04]) * 64
| 83 | |
| 84 | |
class TestKeypairModule:
    """Unit tests for the encoding, signing and derivation helpers of ``kp_module``."""

    def test_public_key_to_b64url_has_algo_prefix(self) -> None:
        """An encoded public key starts with ``ed25519:``."""
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

        private_key = Ed25519PrivateKey.generate()
        encoded = kp_module.public_key_to_b64url(private_key.public_key())
        assert encoded.startswith("ed25519:"), f"Expected 'ed25519:' prefix, got: {encoded!r}"

    def test_public_key_to_b64url_raw_bytes_round_trip(self) -> None:
        """Decoding the encoded public key gives back the raw public bytes."""
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
        from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
        from muse.core.types import decode_pubkey

        public_key = Ed25519PrivateKey.generate().public_key()
        expected_raw = public_key.public_bytes(Encoding.Raw, PublicFormat.Raw)
        # ``decode_pubkey`` returns a pair; only its second element is compared.
        _, decoded_raw = decode_pubkey(kp_module.public_key_to_b64url(public_key))
        assert decoded_raw == expected_raw

    def test_sign_bytes_has_algo_prefix(self) -> None:
        """An encoded signature starts with ``ed25519:``."""
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

        private_key = Ed25519PrivateKey.generate()
        signature = kp_module.sign_bytes(private_key, b"hello")
        assert signature.startswith("ed25519:"), f"Expected 'ed25519:' prefix, got: {signature!r}"

    def test_sign_bytes_verifies_correctly(self) -> None:
        """A signature from ``sign_bytes`` verifies against the matching public key."""
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
        from muse.core.types import decode_sig

        private_key = Ed25519PrivateKey.generate()
        message = b"test message"
        encoded_signature = kp_module.sign_bytes(private_key, message)
        _, raw_signature = decode_sig(encoded_signature)
        # ``verify`` raises on a bad signature, so returning normally is the pass condition.
        private_key.public_key().verify(raw_signature, message)

    def test_fingerprint_matches_sha256(self) -> None:
        """The derived fingerprint equals ``public_key_fingerprint`` of the decoded key."""
        from muse.core.types import decode_pubkey, public_key_fingerprint

        encoded_pub, fingerprint = kp_module.derive_hd_public_info(_SEED_C)
        assert fingerprint.startswith("sha256:")
        # 71 == the 7-character "sha256:" prefix plus 64 further characters.
        assert len(fingerprint) == 71
        # The public key is returned in prefixed form; decode it to raw bytes first.
        _, raw_pub = decode_pubkey(encoded_pub)
        assert fingerprint == public_key_fingerprint(raw_pub)

    def test_derive_hd_public_info_key_has_algo_prefix(self) -> None:
        """The public key returned by ``derive_hd_public_info`` starts with ``ed25519:``."""
        encoded_pub, _ = kp_module.derive_hd_public_info(_SEED_C)
        assert encoded_pub.startswith("ed25519:"), f"Expected 'ed25519:' prefix, got: {encoded_pub!r}"

    def test_different_seeds_produce_different_keys(self) -> None:
        """Two different seeds yield two different public keys."""
        first_pub, _ = kp_module.derive_hd_public_info(_SEED_A)
        second_pub, _ = kp_module.derive_hd_public_info(_SEED_D)
        assert first_pub != second_pub
| 137 | |
| 138 | |
| 139 | # --------------------------------------------------------------------------- |
| 140 | # muse auth keygen CLI tests |
| 141 | # --------------------------------------------------------------------------- |
| 142 | |
| 143 | |
class TestAuthKeygenCLI:
    """CLI tests for ``auth keygen``, run through the module-level ``runner``.

    Each test first calls ``_patch_home`` so that paths derived from the home
    directory point below ``tmp_path``.
    """

    HUB = "https://localhost:1337"

    def test_generates_key_successfully(self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> None:
        """keygen exits 0 and reports the keypair and fingerprint on stderr."""
        _patch_home(monkeypatch, tmp_path)
        result = runner.invoke(cli, ["auth", "keygen", "--hub", self.HUB], catch_exceptions=False)
        assert result.exit_code == 0
        assert "Ed25519 keypair generated" in result.stderr
        assert "Fingerprint" in result.stderr

    def test_no_pem_file_created(self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> None:
        """keygen must not write a PEM file — key is derived from mnemonic at sign time."""
        fake_home = _patch_home(monkeypatch, tmp_path)
        result = runner.invoke(cli, ["auth", "keygen", "--hub", self.HUB], catch_exceptions=False)
        # A keygen run that bails out early also leaves no PEM files behind;
        # without this check the assertion below would pass vacuously.
        assert result.exit_code == 0, f"keygen failed: {result.output}"
        keys_dir = fake_home / ".muse" / "keys"
        pem_files = list(keys_dir.glob("*.pem")) if keys_dir.exists() else []
        assert pem_files == [], f"Unexpected PEM files created: {pem_files}"

    def test_force_flag_still_succeeds(self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> None:
        """--force still works (generates a new mnemonic, overwrites identity entry)."""
        _patch_home(monkeypatch, tmp_path)
        first = runner.invoke(cli, ["auth", "keygen", "--hub", self.HUB], catch_exceptions=False)
        # The ``--force`` run is only meaningful when an earlier keygen actually succeeded.
        assert first.exit_code == 0, f"initial keygen failed: {first.output}"
        second = runner.invoke(cli, ["auth", "keygen", "--hub", self.HUB, "--force"], catch_exceptions=False)
        assert second.exit_code == 0

    def test_no_hub_fails(self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> None:
        """keygen without ``--hub`` and without any hub config exits non-zero."""
        _patch_home(monkeypatch, tmp_path)
        # chdir to a directory with no hub config so get_hub_url(None) returns None.
        # MUSE_REPO_ROOT only affects find_repo_root(), not get_hub_url().
        monkeypatch.chdir(tmp_path)
        result = runner.invoke(cli, ["auth", "keygen"])
        assert result.exit_code != 0

    def test_label_shown_in_output(self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> None:
        """The value given to ``--label`` is echoed on stderr."""
        _patch_home(monkeypatch, tmp_path)
        result = runner.invoke(
            cli,
            ["auth", "keygen", "--hub", self.HUB, "--label", "My Laptop"],
            catch_exceptions=False,
        )
        assert result.exit_code == 0
        assert "My Laptop" in result.stderr
| 183 | |
| 184 | |
| 185 | # --------------------------------------------------------------------------- |
| 186 | # muse auth register CLI tests (hub is mocked) |
| 187 | # --------------------------------------------------------------------------- |
| 188 | |
| 189 | |
# Twelve space-separated words: eleven times "abandon", then "about".
_FIXED_MNEMONIC = " ".join(["abandon"] * 11 + ["about"])
| 194 | |
| 195 | |
class TestAuthRegisterCLI:
    """CLI tests for ``auth register``.

    Hub replies are faked by patching ``muse.cli.commands.auth._json_post_raw``.
    """

    HUB = "https://localhost:1337"

    def _setup_key(self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> tuple[str, str]:
        """Run keygen so an identity entry with hd_path exists, and seed keychain.

        The keychain functions are replaced by an in-memory dict and mnemonic
        generation by ``_FIXED_MNEMONIC``.

        Returns:
            ``(public_key_b64, fingerprint)`` read from the stored identity
            entry; a missing field is returned as ``""``.
        """
        import muse.core.bip39 as bip39_mod
        import muse.core.identity as id_module

        _patch_home(monkeypatch, tmp_path)

        # In-memory stand-in for the keychain.
        _kc: dict[str, str] = {}
        monkeypatch.setattr("muse.core.keychain.is_available", lambda: True)
        monkeypatch.setattr("muse.core.keychain.store", lambda m: _kc.__setitem__("mnemonic", m))
        monkeypatch.setattr("muse.core.keychain.load", lambda: _kc.get("mnemonic"))
        monkeypatch.setattr("muse.cli.commands.auth._stderr_isatty", lambda: False)
        monkeypatch.setattr(bip39_mod, "generate_mnemonic", lambda **kw: _FIXED_MNEMONIC)

        # ``cli`` rather than a literal ``None``, matching every other invoke in this module.
        result = runner.invoke(cli, ["auth", "keygen", "--hub", self.HUB])
        assert result.exit_code == 0, f"keygen setup failed: {result.output}"

        entry = id_module.load_identity(self.HUB)
        assert entry is not None
        return entry.get("public_key_b64", ""), entry.get("fingerprint", "")

    def _mock_hub(
        self,
        monkeypatch: pytest.MonkeyPatch,
        nonce_hex: str,
        challenge_resp: _ChallengeResp | None = None,
        verify_resp: _VerifyResp | None = None,
    ) -> None:
        """Patch ``auth._json_post_raw`` to simulate the hub challenge-response.

        A request whose path contains ``"challenge"`` receives the challenge
        reply; any other path receives the verify reply.

        Args:
            monkeypatch: Fixture used to install the fake.
            nonce_hex: Value used as ``challenge_token`` in the default
                challenge reply.
            challenge_resp: Challenge reply to use instead of the default.
                ``None`` or an empty mapping selects the default.
            verify_resp: Verify reply to use instead of the default.
                ``None`` or an empty mapping selects the default.
        """
        import muse.cli.commands.auth as auth_mod

        _challenge_resp = challenge_resp or {
            "challenge_token": nonce_hex,
            "is_new_key": True,
            "algorithm": "ed25519",
        }
        _verify_resp = verify_resp or {
            "handle": "alice",
            "identity_id": "id-123",
            "is_new_identity": True,
            "auth_method": "ed25519",
        }

        def fake_json_post(base: str, path: str, payload: _AuthPayload) -> _ChallengeResp | _VerifyResp:
            return _challenge_resp if "challenge" in path else _verify_resp

        monkeypatch.setattr(auth_mod, "_json_post_raw", fake_json_post)

    def test_full_registration_stores_identity(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """A successful register run persists the handle and keeps ``hd_path``."""
        self._setup_key(monkeypatch, tmp_path)
        import secrets

        nonce_hex = secrets.token_hex(32)
        self._mock_hub(monkeypatch, nonce_hex)

        result = runner.invoke(
            cli,
            ["auth", "register", "--hub", self.HUB, "--handle", "alice"],
            catch_exceptions=False,
        )
        assert result.exit_code == 0
        assert "alice" in result.stderr

        # Ed25519 identity must be persisted
        from muse.core.identity import load_identity

        entry = load_identity(self.HUB)
        assert entry is not None
        assert entry.get("handle") == "alice"
        assert entry.get("hd_path") is not None
        assert "key_path" not in entry
        assert "token" not in entry

    def test_no_key_fails_gracefully(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """register without a prior keygen exits non-zero with a hint on stderr."""
        _patch_home(monkeypatch, tmp_path)
        result = runner.invoke(
            cli,
            ["auth", "register", "--hub", self.HUB, "--handle", "alice"],
        )
        assert result.exit_code != 0
        assert "keygen" in result.stderr.lower() or "no ed25519 key" in result.stderr.lower()

    def test_new_key_without_handle_fails(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """A key the hub reports as new, registered without ``--handle``, fails and mentions the flag."""
        self._setup_key(monkeypatch, tmp_path)
        import muse.cli.commands.auth as auth_mod

        def fake_json_post(base: str, path: str, payload: _AuthPayload) -> _JsonResponse:
            if "challenge" in path:
                return {
                    "challenge_token": "ab" * 32,
                    "is_new_key": True,
                    "algorithm": "ed25519",
                }
            return {}  # should not reach here

        monkeypatch.setattr(auth_mod, "_json_post_raw", fake_json_post)
        result = runner.invoke(cli, ["auth", "register", "--hub", self.HUB])
        assert result.exit_code != 0
        assert "--handle" in result.stderr

    def test_agent_flag_marks_identity_as_agent(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """Registering with ``--agent`` stores ``type == "agent"`` in the identity entry."""
        self._setup_key(monkeypatch, tmp_path)
        import muse.cli.commands.auth as auth_mod
        import secrets

        nonce_hex = secrets.token_hex(32)

        def fake_json_post(base: str, path: str, payload: _AuthPayload) -> _JsonResponse:
            if "challenge" in path:
                return {"challenge_token": nonce_hex, "is_new_key": False, "algorithm": "ed25519"}
            return {"handle": "bot", "identity_id": "id-bot", "is_new_identity": False, "auth_method": "ed25519"}

        monkeypatch.setattr(auth_mod, "_json_post_raw", fake_json_post)
        runner.invoke(
            cli,
            ["auth", "register", "--hub", self.HUB, "--handle", "bot", "--agent"],
            catch_exceptions=False,
        )

        from muse.core.identity import load_identity

        entry = load_identity(self.HUB)
        assert entry is not None
        assert entry.get("type") == "agent"

    def test_bad_challenge_token_fails(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """A challenge token that is not valid hex makes register exit non-zero."""
        self._setup_key(monkeypatch, tmp_path)
        import muse.cli.commands.auth as auth_mod

        def fake_json_post(base: str, path: str, payload: _AuthPayload) -> _JsonResponse:
            return {"challenge_token": "not-valid-hex!", "is_new_key": False, "algorithm": "ed25519"}

        monkeypatch.setattr(auth_mod, "_json_post_raw", fake_json_post)
        result = runner.invoke(cli, ["auth", "register", "--hub", self.HUB, "--handle", "alice"])
        assert result.exit_code != 0

    def test_empty_verify_response_fails(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """Verify response with no handle and no --handle fallback fails."""
        self._setup_key(monkeypatch, tmp_path)
        import muse.cli.commands.auth as auth_mod
        import secrets

        nonce_hex = secrets.token_hex(32)

        def fake_json_post(base: str, path: str, payload: _AuthPayload) -> _JsonResponse:
            if "challenge" in path:
                return {"challenge_token": nonce_hex, "is_new_key": False, "algorithm": "ed25519"}
            return {}  # No handle field

        monkeypatch.setattr(auth_mod, "_json_post_raw", fake_json_post)
        # No --handle flag, and hub returns no handle → must fail
        result = runner.invoke(cli, ["auth", "register", "--hub", self.HUB])
        assert result.exit_code != 0
File History
4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e
fix: rename objects→blobs in push client and all stale test…
Sonnet 4.6
patch
22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a
fix: repair four test failures from post-migration audit
Sonnet 4.6
patch
28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
29 days ago