"""Unified TDD tests for HD-only keygen architecture. This file validates: - agent_id_to_slot: stable, deterministic, BIP32-safe slot mapping - Human keygen: fresh mnemonic, no --hd flag needed (HD is the only mode) - Agent keygen: derived from operator's mnemonic via derive_agent_sub_seed - run_recover: re-derives same fingerprint from stored mnemonic - No JBOK: generate_keypair must not exist - Integration flow: keygen → agent keygen → recover round-trip """ from __future__ import annotations import base64 import hashlib import json import pathlib import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core import keypair as kp_module from muse.core import identity as id_module from muse.core.bip39 import mnemonic_to_seed, validate_mnemonic from muse.core.hdkeys import ( DOMAIN_IDENTITY, ENTITY_AGENT, ENTITY_HUMAN, MUSE_PURPOSE, ROLE_SIGN, agent_id_to_slot, derive_agent_sub_seed, derive_identity_key, muse_path, ) runner = CliRunner() _HUB = "https://localhost:1337" _HOSTNAME = "localhost:1337" # A well-known BIP39 test mnemonic (abandon × 11 + about) _TEST_MNEMONIC_12 = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about" # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _patch_home(monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> pathlib.Path: fake_home = tmp_path / "home" fake_home.mkdir(parents=True, exist_ok=True) monkeypatch.setattr(pathlib.Path, "home", staticmethod(lambda: fake_home)) monkeypatch.setattr(kp_module, "_KEYS_DIR", fake_home / ".muse" / "keys") monkeypatch.setattr(id_module, "_IDENTITY_DIR", fake_home / ".muse") monkeypatch.setattr(id_module, "_IDENTITY_FILE", fake_home / ".muse" / "identity.toml") return fake_home type _KcStore = dict[str, str] _kc_store: _KcStore = {} def _patch_keychain(monkeypatch: pytest.MonkeyPatch) -> _KcStore: """Isolate the keychain from the real OS keychain.""" _kc_store.clear() monkeypatch.setattr("muse.core.keychain.is_available", lambda: True) monkeypatch.setattr("muse.core.keychain.load", lambda: _kc_store.get("mnemonic")) monkeypatch.setattr("muse.core.keychain.store", lambda m: _kc_store.__setitem__("mnemonic", m)) monkeypatch.setattr("muse.core.keychain.delete", lambda: _kc_store.pop("mnemonic", None)) return _kc_store def _keygen(monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path, extra_args: list[str] | None = None) -> tuple[pathlib.Path, InvokeResult]: """Run ``muse auth keygen --hub `` and return (fake_home, result). Patches both the home directory and the keychain so tests are hermetic. """ fake_home = _patch_home(monkeypatch, tmp_path) _patch_keychain(monkeypatch) args = ["auth", "keygen", "--hub", _HUB] + (extra_args or []) result = runner.invoke(None, args) return fake_home, result # --------------------------------------------------------------------------- # agent_id_to_slot — unit tests # --------------------------------------------------------------------------- class TestAgentIdToSlot: """agent_id_to_slot must map handle strings to stable, valid BIP32 indices.""" def test_returns_int(self) -> None: slot = agent_id_to_slot("my-agent") assert isinstance(slot, int) def test_in_valid_bip32_range(self) -> None: """All slots must be in [0, 2^31 - 1] (hardened offset applied by caller).""" for handle in ["alpha", "beta", "gamma-007", "a" * 100]: slot = agent_id_to_slot(handle) assert 0 <= slot <= 0x7FFF_FFFF, f"slot={slot} out of range for {handle!r}" def test_deterministic(self) -> None: """Same handle must always produce the same slot.""" handle = "agentception-abc123" assert agent_id_to_slot(handle) == agent_id_to_slot(handle) def test_distinct_handles_likely_distinct_slots(self) -> None: """Different handles should not collide (SHA-256 collision resistance).""" handles = ["alice", "bob", "carol", "dave", "eve", "frank"] slots = [agent_id_to_slot(h) for h in handles] assert len(set(slots)) == len(slots), f"Unexpected slot collision: {slots}" def test_known_vector(self) -> None: """Verify the slot for 'agentception' against a manually computed value.""" import hashlib as _hashlib handle = "agentception" digest = _hashlib.sha256(handle.encode()).digest() expected = int.from_bytes(digest[:4], "big") & 0x7FFF_FFFF assert agent_id_to_slot(handle) == expected def test_empty_string_handled(self) -> None: """Edge case: empty string handle should not crash.""" slot = agent_id_to_slot("") assert 0 <= slot <= 0x7FFF_FFFF def test_unicode_handle(self) -> None: """Unicode agent handles should produce valid slots.""" slot = agent_id_to_slot("音楽エージェント") assert 0 <= slot <= 0x7FFF_FFFF # --------------------------------------------------------------------------- # No JBOK — generate_keypair must not exist # --------------------------------------------------------------------------- class TestNoJbok: """JBOK mode is deleted. generate_keypair must not exist anywhere.""" def test_generate_keypair_not_in_module(self) -> None: import importlib kp = importlib.import_module("muse.core.keypair") assert not hasattr(kp, "generate_keypair"), \ "generate_keypair still exists — JBOK was not fully removed" def test_generate_keypair_not_importable(self) -> None: with pytest.raises(ImportError): from muse.core.keypair import generate_keypair # noqa: F401 # --------------------------------------------------------------------------- # Human keygen — no --hd flag, 24-word default # --------------------------------------------------------------------------- class TestHumanKeygen: """Human keygen: HD is the only mode. No --hd flag required.""" def test_exits_zero( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: _, result = _keygen(monkeypatch, tmp_path) assert result.exit_code == 0, result.output def test_no_pem_written( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """Keygen must NOT write any PEM file — keys live in keychain only.""" fake_home, result = _keygen(monkeypatch, tmp_path) keys_dir = fake_home / ".muse" / "keys" pem_files = list(keys_dir.glob("*.pem")) if keys_dir.exists() else [] assert pem_files == [], f"Unexpected PEM files written: {pem_files}" def test_hd_path_in_identity_toml( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """Identity entry must contain hd_path (derivation anchor) and no key_path.""" import tomllib fake_home, result = _keygen(monkeypatch, tmp_path) assert result.exit_code == 0 data = tomllib.loads((fake_home / ".muse" / "identity.toml").read_text()) entry = data[_HOSTNAME] assert "hd_path" in entry, "hd_path missing from identity.toml" assert "key_path" not in entry, "key_path must not be written" def test_default_24_word_mnemonic( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """Default strength=256 produces a 24-word mnemonic stored in keychain.""" _, result = _keygen(monkeypatch, tmp_path) assert result.exit_code == 0 # Mnemonic is never printed; read it from the in-memory keychain store. mnemonic = _kc_store.get("mnemonic") assert mnemonic is not None, "mnemonic not stored in keychain" assert len(mnemonic.split()) == 24, f"Expected 24 words, got: {mnemonic!r}" assert validate_mnemonic(mnemonic) def test_json_no_mnemonic_in_stdout( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """Mnemonic is sensitive — must never appear in JSON stdout.""" _, result = _keygen(monkeypatch, tmp_path, ["--json"]) payload = json.loads(result.output.splitlines()[0]) assert "mnemonic" not in payload def test_json_mnemonic_word_count_24( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: _, result = _keygen(monkeypatch, tmp_path, ["--json"]) payload = json.loads(result.output.splitlines()[0]) assert payload.get("mnemonic_word_count") == 24 def test_identity_toml_has_no_key_source( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: import tomllib fake_home, result = _keygen(monkeypatch, tmp_path) assert result.exit_code == 0 data = tomllib.loads((fake_home / ".muse" / "identity.toml").read_text()) assert "key_source" not in data[_HOSTNAME] def test_force_overwrites_existing( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: _keygen(monkeypatch, tmp_path) _, result = _keygen(monkeypatch, tmp_path, ["--force"]) assert result.exit_code == 0 def test_no_force_rejects_existing( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: _keygen(monkeypatch, tmp_path) _, result = _keygen(monkeypatch, tmp_path) # second time, no --force assert result.exit_code != 0 def test_strength_128_gives_12_words( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: _, result = _keygen(monkeypatch, tmp_path, ["--strength", "128", "--json"]) assert result.exit_code == 0 payload = json.loads(result.output.splitlines()[0]) assert payload["mnemonic_word_count"] == 12 # --------------------------------------------------------------------------- # Agent keygen — derived from operator's mnemonic # --------------------------------------------------------------------------- class TestAgentKeygen: """Agent keys must be derived from the operator's HD mnemonic.""" def _setup_operator( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> pathlib.Path: """Generate a human (operator) key first, with isolated home + keychain.""" fake_home = _patch_home(monkeypatch, tmp_path) _patch_keychain(monkeypatch) result = runner.invoke(None, ["auth", "keygen", "--hub", _HUB]) assert result.exit_code == 0, f"Operator keygen failed:\n{result.output}" return fake_home def test_agent_keygen_exits_zero( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: self._setup_operator(monkeypatch, tmp_path) result = runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha"]) assert result.exit_code == 0, result.output def test_agent_no_pem_written( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """Agent keygen must not write any PEM file.""" fake_home = self._setup_operator(monkeypatch, tmp_path) runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha"]) keys_dir = fake_home / ".muse" / "keys" pem_files = list(keys_dir.glob("*.pem")) if keys_dir.exists() else [] assert pem_files == [], f"Unexpected PEM files written: {pem_files}" def test_agent_hd_path_in_identity_toml( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """Agent entry must contain hd_path and no key_path.""" import tomllib fake_home = self._setup_operator(monkeypatch, tmp_path) runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha"]) data = tomllib.loads((fake_home / ".muse" / "identity.toml").read_text()) agent_key = f"{_HOSTNAME}#bot-alpha" assert agent_key in data, f"No entry for {agent_key} in identity.toml" entry = data[agent_key] assert "hd_path" in entry, "hd_path missing from agent identity entry" assert "key_path" not in entry, "key_path must not be written" def test_agent_json_has_hd_path( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: self._setup_operator(monkeypatch, tmp_path) result = runner.invoke( None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha", "--json"] ) assert result.exit_code == 0, result.output payload = json.loads(result.output.splitlines()[0]) assert "hd_path" in payload assert str(MUSE_PURPOSE) in payload["hd_path"] def test_agent_json_has_provisioned_by( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: self._setup_operator(monkeypatch, tmp_path) result = runner.invoke( None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha", "--json"] ) payload = json.loads(result.output.splitlines()[0]) assert "provisioned_by_fingerprint" in payload assert payload["provisioned_by_fingerprint"].startswith("sha256:") def test_agent_key_different_from_human_key( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """Agent fingerprint must differ from the operator fingerprint.""" self._setup_operator(monkeypatch, tmp_path) op_result = runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--json"]) # operator already exists — get fingerprint from identity.toml via agent keygen output result = runner.invoke( None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha", "--json"] ) assert result.exit_code == 0, result.output agent_payload = json.loads(result.output.splitlines()[0]) human_payload = json.loads(op_result.output.splitlines()[0]) if op_result.exit_code == 0 else None if human_payload: assert agent_payload["fingerprint"] != human_payload["fingerprint"], \ "Agent and human keys must be distinct" # Also verify provisioned_by differs from the agent fingerprint assert agent_payload["fingerprint"] != agent_payload["provisioned_by_fingerprint"] def test_two_agents_have_distinct_keys( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """Different agent handles must produce different fingerprints.""" self._setup_operator(monkeypatch, tmp_path) result_a = runner.invoke( None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha", "--json"] ) result_b = runner.invoke( None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-beta", "--json"] ) assert result_a.exit_code == 0, result_a.output assert result_b.exit_code == 0, result_b.output fp_a = json.loads(result_a.output.splitlines()[0])["fingerprint"] fp_b = json.loads(result_b.output.splitlines()[0])["fingerprint"] assert fp_a != fp_b, "Different agent handles must produce different keys" def test_agent_keygen_without_operator_exits_nonzero( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """Attempt to derive agent key before operator key is set up.""" _patch_home(monkeypatch, tmp_path) result = runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha"]) assert result.exit_code != 0 def test_agent_key_deterministic( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """Same operator mnemonic + same agent handle = same agent key.""" fake_home = self._setup_operator(monkeypatch, tmp_path) result1 = runner.invoke( None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha", "--json"] ) fp1 = json.loads(result1.output.splitlines()[0])["fingerprint"] # Re-derive: force-overwrite the agent key (same operator mnemonic on disk) result2 = runner.invoke( None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha", "--force", "--json"] ) fp2 = json.loads(result2.output.splitlines()[0])["fingerprint"] assert fp1 == fp2, "Agent key not deterministic given same operator mnemonic + handle" # --------------------------------------------------------------------------- # derive_agent_sub_seed — unit tests (no CLI) # --------------------------------------------------------------------------- class TestDeriveAgentSubSeed: """derive_agent_sub_seed must produce stable, domain-isolated sub-seeds.""" def test_returns_64_bytes(self) -> None: seed = mnemonic_to_seed(_TEST_MNEMONIC_12) slot = agent_id_to_slot("bot-alpha") sub_seed = derive_agent_sub_seed(seed, DOMAIN_IDENTITY, slot) assert len(sub_seed) == 64 def test_deterministic(self) -> None: seed = mnemonic_to_seed(_TEST_MNEMONIC_12) slot = agent_id_to_slot("bot-alpha") s1 = derive_agent_sub_seed(seed, DOMAIN_IDENTITY, slot) s2 = derive_agent_sub_seed(seed, DOMAIN_IDENTITY, slot) assert s1 == s2 def test_different_slots_different_sub_seeds(self) -> None: seed = mnemonic_to_seed(_TEST_MNEMONIC_12) slot_a = agent_id_to_slot("bot-alpha") slot_b = agent_id_to_slot("bot-beta") assert slot_a != slot_b sub_a = derive_agent_sub_seed(seed, DOMAIN_IDENTITY, slot_a) sub_b = derive_agent_sub_seed(seed, DOMAIN_IDENTITY, slot_b) assert sub_a != sub_b def test_different_domains_different_sub_seeds(self) -> None: seed = mnemonic_to_seed(_TEST_MNEMONIC_12) slot = agent_id_to_slot("bot-alpha") DOMAIN_PAYMENTS = 1 sub_id = derive_agent_sub_seed(seed, DOMAIN_IDENTITY, slot) sub_pay = derive_agent_sub_seed(seed, DOMAIN_PAYMENTS, slot) assert sub_id != sub_pay def test_sub_seed_differs_from_parent_seed(self) -> None: seed = mnemonic_to_seed(_TEST_MNEMONIC_12) slot = agent_id_to_slot("bot-alpha") sub = derive_agent_sub_seed(seed, DOMAIN_IDENTITY, slot) assert sub != seed # --------------------------------------------------------------------------- # run_recover — re-derive from mnemonic # --------------------------------------------------------------------------- class TestRunRecover: """muse auth recover must re-derive the exact same key from the mnemonic.""" def _do_recover( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path, mnemonic: str, extra_args: list[str] | None = None, ) -> tuple[pathlib.Path, InvokeResult]: fake_home = _patch_home(monkeypatch, tmp_path) _patch_keychain(monkeypatch) args = ["auth", "recover", "--hub", _HUB] + (extra_args or []) return fake_home, runner.invoke(None, args, input=mnemonic) def test_recover_exits_zero( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: _, result = self._do_recover(monkeypatch, tmp_path, _TEST_MNEMONIC_12, ["--force"]) assert result.exit_code == 0, result.output def test_recover_writes_no_pem( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: fake_home, result = self._do_recover(monkeypatch, tmp_path, _TEST_MNEMONIC_12, ["--force"]) assert result.exit_code == 0 keys_dir = fake_home / ".muse" / "keys" pem_files = list(keys_dir.glob("*.pem")) if keys_dir.exists() else [] assert pem_files == [], f"PEM files found after recover: {pem_files}" def test_recover_produces_same_fingerprint_as_keygen( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """Key recovered from mnemonic must match the original keygen fingerprint.""" import muse.core.bip39 as bip39_mod fixed_mnemonic = _TEST_MNEMONIC_12 monkeypatch.setattr(bip39_mod, "generate_mnemonic", lambda **kw: fixed_mnemonic) # Keygen fake_home = _patch_home(monkeypatch, tmp_path) _patch_keychain(monkeypatch) keygen_result = runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--json"]) assert keygen_result.exit_code == 0, keygen_result.output keygen_fp = json.loads(keygen_result.output.splitlines()[0])["fingerprint"] # Recover into same tmpdir (--force to overwrite) recover_result = runner.invoke( None, ["auth", "recover", "--hub", _HUB, "--force", "--json"], input=fixed_mnemonic, ) assert recover_result.exit_code == 0, recover_result.output recover_fp = json.loads(recover_result.output.splitlines()[0])["fingerprint"] assert keygen_fp == recover_fp, \ f"Recovered fingerprint {recover_fp} != original {keygen_fp}" def test_recover_invalid_mnemonic_exits_nonzero( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: _, result = self._do_recover(monkeypatch, tmp_path, "not valid mnemonic words here ok", ["--force"]) assert result.exit_code != 0 def test_recover_json_has_fingerprint( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: _, result = self._do_recover(monkeypatch, tmp_path, _TEST_MNEMONIC_12, ["--force", "--json"]) assert result.exit_code == 0 payload = json.loads(result.output.splitlines()[0]) assert "fingerprint" in payload assert payload["fingerprint"].startswith("sha256:") # --------------------------------------------------------------------------- # Integration — full operator → agent → recover flow # --------------------------------------------------------------------------- class TestIntegrationFlow: """Full flow: human keygen → agent keygen → recover → fingerprints match.""" def test_operator_then_agent_then_recover( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: import muse.core.bip39 as bip39_mod # Use a fixed mnemonic so we can recover without reading from the keychain. fixed_mnemonic = _TEST_MNEMONIC_12 monkeypatch.setattr(bip39_mod, "generate_mnemonic", lambda **kw: fixed_mnemonic) _patch_home(monkeypatch, tmp_path) # 1. Operator keygen r1 = runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--json"]) assert r1.exit_code == 0, r1.output op_payload = json.loads(r1.output.splitlines()[0]) op_fp = op_payload["fingerprint"] # 2. Agent keygen derives from the operator's mnemonic in keychain / ephemeral store r2 = runner.invoke( None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "worker-1", "--json"] ) assert r2.exit_code == 0, r2.output agent_payload = json.loads(r2.output.splitlines()[0]) agent_fp = agent_payload["fingerprint"] assert agent_fp != op_fp, "Agent fingerprint must differ from operator" # 3. Recover operator key via stdin pipe (--force since PEM already exists) r3 = runner.invoke( None, ["auth", "recover", "--hub", _HUB, "--force", "--json"], input=fixed_mnemonic, ) assert r3.exit_code == 0, r3.output recovered_fp = json.loads(r3.output.splitlines()[0])["fingerprint"] assert recovered_fp == op_fp, \ f"Recovered operator fp {recovered_fp!r} != original {op_fp!r}" def test_slot_stability_across_keygen_invocations(self) -> None: """agent_id_to_slot must return the same value before and after any keygen.""" handle = "production-agent-42" slot_before = agent_id_to_slot(handle) # Simulate "after keygen" by just calling again — slot is a pure function slot_after = agent_id_to_slot(handle) assert slot_before == slot_after