test_hd_keygen_unified.py
python
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
6 days ago
| 1 | """Unified TDD tests for HD-only keygen architecture. |
| 2 | |
| 3 | This file validates: |
| 4 | - agent_id_to_slot: stable, deterministic, BIP32-safe slot mapping |
| 5 | - Human keygen: fresh mnemonic, no --hd flag needed (HD is the only mode) |
| 6 | - Agent keygen: derived from operator's mnemonic via derive_agent_sub_seed |
| 7 | - run_recover: re-derives same fingerprint from stored mnemonic |
| 8 | - No JBOK: generate_keypair must not exist |
| 9 | - Integration flow: keygen → agent keygen → recover round-trip |
| 10 | """ |
| 11 | |
| 12 | from __future__ import annotations |
| 13 | |
| 14 | import base64 |
| 15 | import hashlib |
| 16 | import json |
| 17 | import pathlib |
| 18 | |
| 19 | import pytest |
| 20 | |
| 21 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 22 | from muse.core import keypair as kp_module |
| 23 | from muse.core import identity as id_module |
| 24 | from muse.core.bip39 import mnemonic_to_seed, validate_mnemonic |
| 25 | from muse.core.hdkeys import ( |
| 26 | DOMAIN_IDENTITY, |
| 27 | ENTITY_AGENT, |
| 28 | ENTITY_HUMAN, |
| 29 | MUSE_PURPOSE, |
| 30 | ROLE_SIGN, |
| 31 | agent_id_to_slot, |
| 32 | derive_agent_sub_seed, |
| 33 | derive_identity_key, |
| 34 | muse_path, |
| 35 | ) |
| 36 | |
| 37 | runner = CliRunner() |
| 38 | |
| 39 | _HUB = "https://localhost:1337" |
| 40 | _HOSTNAME = "localhost:1337" |
| 41 | # A well-known BIP39 test mnemonic (abandon × 11 + about) |
| 42 | _TEST_MNEMONIC_12 = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about" |
| 43 | |
| 44 | |
| 45 | # --------------------------------------------------------------------------- |
| 46 | # Helpers |
| 47 | # --------------------------------------------------------------------------- |
| 48 | |
| 49 | |
| 50 | def _patch_home(monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> pathlib.Path: |
| 51 | fake_home = tmp_path / "home" |
| 52 | fake_home.mkdir(parents=True, exist_ok=True) |
| 53 | monkeypatch.setattr(pathlib.Path, "home", staticmethod(lambda: fake_home)) |
| 54 | monkeypatch.setattr(kp_module, "_KEYS_DIR", fake_home / ".muse" / "keys") |
| 55 | monkeypatch.setattr(id_module, "_IDENTITY_DIR", fake_home / ".muse") |
| 56 | monkeypatch.setattr(id_module, "_IDENTITY_FILE", fake_home / ".muse" / "identity.toml") |
| 57 | return fake_home |
| 58 | |
| 59 | |
| 60 | type _KcStore = dict[str, str] |
| 61 | _kc_store: _KcStore = {} |
| 62 | |
| 63 | |
| 64 | def _patch_keychain(monkeypatch: pytest.MonkeyPatch) -> _KcStore: |
| 65 | """Isolate the keychain from the real OS keychain.""" |
| 66 | _kc_store.clear() |
| 67 | monkeypatch.setattr("muse.core.keychain.is_available", lambda: True) |
| 68 | monkeypatch.setattr("muse.core.keychain.load", lambda: _kc_store.get("mnemonic")) |
| 69 | monkeypatch.setattr("muse.core.keychain.store", lambda m: _kc_store.__setitem__("mnemonic", m)) |
| 70 | monkeypatch.setattr("muse.core.keychain.delete", lambda: _kc_store.pop("mnemonic", None)) |
| 71 | return _kc_store |
| 72 | |
| 73 | |
| 74 | def _keygen(monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path, |
| 75 | extra_args: list[str] | None = None) -> tuple[pathlib.Path, InvokeResult]: |
| 76 | """Run ``muse auth keygen --hub <HUB>`` and return (fake_home, result). |
| 77 | |
| 78 | Patches both the home directory and the keychain so tests are hermetic. |
| 79 | """ |
| 80 | fake_home = _patch_home(monkeypatch, tmp_path) |
| 81 | _patch_keychain(monkeypatch) |
| 82 | args = ["auth", "keygen", "--hub", _HUB] + (extra_args or []) |
| 83 | result = runner.invoke(None, args) |
| 84 | return fake_home, result |
| 85 | |
| 86 | |
| 87 | # --------------------------------------------------------------------------- |
| 88 | # agent_id_to_slot — unit tests |
| 89 | # --------------------------------------------------------------------------- |
| 90 | |
| 91 | |
| 92 | class TestAgentIdToSlot: |
| 93 | """agent_id_to_slot must map handle strings to stable, valid BIP32 indices.""" |
| 94 | |
| 95 | def test_returns_int(self) -> None: |
| 96 | slot = agent_id_to_slot("my-agent") |
| 97 | assert isinstance(slot, int) |
| 98 | |
| 99 | def test_in_valid_bip32_range(self) -> None: |
| 100 | """All slots must be in [0, 2^31 - 1] (hardened offset applied by caller).""" |
| 101 | for handle in ["alpha", "beta", "gamma-007", "a" * 100]: |
| 102 | slot = agent_id_to_slot(handle) |
| 103 | assert 0 <= slot <= 0x7FFF_FFFF, f"slot={slot} out of range for {handle!r}" |
| 104 | |
| 105 | def test_deterministic(self) -> None: |
| 106 | """Same handle must always produce the same slot.""" |
| 107 | handle = "agentception-abc123" |
| 108 | assert agent_id_to_slot(handle) == agent_id_to_slot(handle) |
| 109 | |
| 110 | def test_distinct_handles_likely_distinct_slots(self) -> None: |
| 111 | """Different handles should not collide (SHA-256 collision resistance).""" |
| 112 | handles = ["alice", "bob", "carol", "dave", "eve", "frank"] |
| 113 | slots = [agent_id_to_slot(h) for h in handles] |
| 114 | assert len(set(slots)) == len(slots), f"Unexpected slot collision: {slots}" |
| 115 | |
| 116 | def test_known_vector(self) -> None: |
| 117 | """Verify the slot for 'agentception' against a manually computed value.""" |
| 118 | import hashlib as _hashlib |
| 119 | handle = "agentception" |
| 120 | digest = _hashlib.sha256(handle.encode()).digest() |
| 121 | expected = int.from_bytes(digest[:4], "big") & 0x7FFF_FFFF |
| 122 | assert agent_id_to_slot(handle) == expected |
| 123 | |
| 124 | def test_empty_string_handled(self) -> None: |
| 125 | """Edge case: empty string handle should not crash.""" |
| 126 | slot = agent_id_to_slot("") |
| 127 | assert 0 <= slot <= 0x7FFF_FFFF |
| 128 | |
| 129 | def test_unicode_handle(self) -> None: |
| 130 | """Unicode agent handles should produce valid slots.""" |
| 131 | slot = agent_id_to_slot("音楽エージェント") |
| 132 | assert 0 <= slot <= 0x7FFF_FFFF |
| 133 | |
| 134 | |
| 135 | # --------------------------------------------------------------------------- |
| 136 | # No JBOK — generate_keypair must not exist |
| 137 | # --------------------------------------------------------------------------- |
| 138 | |
| 139 | |
| 140 | class TestNoJbok: |
| 141 | """JBOK mode is deleted. generate_keypair must not exist anywhere.""" |
| 142 | |
| 143 | def test_generate_keypair_not_in_module(self) -> None: |
| 144 | import importlib |
| 145 | kp = importlib.import_module("muse.core.keypair") |
| 146 | assert not hasattr(kp, "generate_keypair"), \ |
| 147 | "generate_keypair still exists — JBOK was not fully removed" |
| 148 | |
| 149 | def test_generate_keypair_not_importable(self) -> None: |
| 150 | with pytest.raises(ImportError): |
| 151 | from muse.core.keypair import generate_keypair # noqa: F401 |
| 152 | |
| 153 | |
| 154 | # --------------------------------------------------------------------------- |
| 155 | # Human keygen — no --hd flag, 24-word default |
| 156 | # --------------------------------------------------------------------------- |
| 157 | |
| 158 | |
| 159 | class TestHumanKeygen: |
| 160 | """Human keygen: HD is the only mode. No --hd flag required.""" |
| 161 | |
| 162 | def test_exits_zero( |
| 163 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 164 | ) -> None: |
| 165 | _, result = _keygen(monkeypatch, tmp_path) |
| 166 | assert result.exit_code == 0, result.output |
| 167 | |
| 168 | def test_no_pem_written( |
| 169 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 170 | ) -> None: |
| 171 | """Keygen must NOT write any PEM file — keys live in keychain only.""" |
| 172 | fake_home, result = _keygen(monkeypatch, tmp_path) |
| 173 | keys_dir = fake_home / ".muse" / "keys" |
| 174 | pem_files = list(keys_dir.glob("*.pem")) if keys_dir.exists() else [] |
| 175 | assert pem_files == [], f"Unexpected PEM files written: {pem_files}" |
| 176 | |
| 177 | def test_hd_path_in_identity_toml( |
| 178 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 179 | ) -> None: |
| 180 | """Identity entry must contain hd_path (derivation anchor) and no key_path.""" |
| 181 | import tomllib |
| 182 | fake_home, result = _keygen(monkeypatch, tmp_path) |
| 183 | assert result.exit_code == 0 |
| 184 | data = tomllib.loads((fake_home / ".muse" / "identity.toml").read_text()) |
| 185 | entry = data[_HOSTNAME] |
| 186 | assert "hd_path" in entry, "hd_path missing from identity.toml" |
| 187 | assert "key_path" not in entry, "key_path must not be written" |
| 188 | |
| 189 | def test_default_24_word_mnemonic( |
| 190 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 191 | ) -> None: |
| 192 | """Default strength=256 produces a 24-word mnemonic stored in keychain.""" |
| 193 | _, result = _keygen(monkeypatch, tmp_path) |
| 194 | assert result.exit_code == 0 |
| 195 | # Mnemonic is never printed; read it from the in-memory keychain store. |
| 196 | mnemonic = _kc_store.get("mnemonic") |
| 197 | assert mnemonic is not None, "mnemonic not stored in keychain" |
| 198 | assert len(mnemonic.split()) == 24, f"Expected 24 words, got: {mnemonic!r}" |
| 199 | assert validate_mnemonic(mnemonic) |
| 200 | |
| 201 | def test_json_no_mnemonic_in_stdout( |
| 202 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 203 | ) -> None: |
| 204 | """Mnemonic is sensitive — must never appear in JSON stdout.""" |
| 205 | _, result = _keygen(monkeypatch, tmp_path, ["--json"]) |
| 206 | payload = json.loads(result.output.splitlines()[0]) |
| 207 | assert "mnemonic" not in payload |
| 208 | |
| 209 | def test_json_mnemonic_word_count_24( |
| 210 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 211 | ) -> None: |
| 212 | _, result = _keygen(monkeypatch, tmp_path, ["--json"]) |
| 213 | payload = json.loads(result.output.splitlines()[0]) |
| 214 | assert payload.get("mnemonic_word_count") == 24 |
| 215 | |
| 216 | def test_identity_toml_has_no_key_source( |
| 217 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 218 | ) -> None: |
| 219 | import tomllib |
| 220 | fake_home, result = _keygen(monkeypatch, tmp_path) |
| 221 | assert result.exit_code == 0 |
| 222 | data = tomllib.loads((fake_home / ".muse" / "identity.toml").read_text()) |
| 223 | assert "key_source" not in data[_HOSTNAME] |
| 224 | |
| 225 | def test_force_overwrites_existing( |
| 226 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 227 | ) -> None: |
| 228 | _keygen(monkeypatch, tmp_path) |
| 229 | _, result = _keygen(monkeypatch, tmp_path, ["--force"]) |
| 230 | assert result.exit_code == 0 |
| 231 | |
| 232 | def test_no_force_rejects_existing( |
| 233 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 234 | ) -> None: |
| 235 | _keygen(monkeypatch, tmp_path) |
| 236 | _, result = _keygen(monkeypatch, tmp_path) # second time, no --force |
| 237 | assert result.exit_code != 0 |
| 238 | |
| 239 | def test_strength_128_gives_12_words( |
| 240 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 241 | ) -> None: |
| 242 | _, result = _keygen(monkeypatch, tmp_path, ["--strength", "128", "--json"]) |
| 243 | assert result.exit_code == 0 |
| 244 | payload = json.loads(result.output.splitlines()[0]) |
| 245 | assert payload["mnemonic_word_count"] == 12 |
| 246 | |
| 247 | |
| 248 | # --------------------------------------------------------------------------- |
| 249 | # Agent keygen — derived from operator's mnemonic |
| 250 | # --------------------------------------------------------------------------- |
| 251 | |
| 252 | |
| 253 | class TestAgentKeygen: |
| 254 | """Agent keys must be derived from the operator's HD mnemonic.""" |
| 255 | |
| 256 | def _setup_operator( |
| 257 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 258 | ) -> pathlib.Path: |
| 259 | """Generate a human (operator) key first, with isolated home + keychain.""" |
| 260 | fake_home = _patch_home(monkeypatch, tmp_path) |
| 261 | _patch_keychain(monkeypatch) |
| 262 | result = runner.invoke(None, ["auth", "keygen", "--hub", _HUB]) |
| 263 | assert result.exit_code == 0, f"Operator keygen failed:\n{result.output}" |
| 264 | return fake_home |
| 265 | |
| 266 | def test_agent_keygen_exits_zero( |
| 267 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 268 | ) -> None: |
| 269 | self._setup_operator(monkeypatch, tmp_path) |
| 270 | result = runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha"]) |
| 271 | assert result.exit_code == 0, result.output |
| 272 | |
| 273 | def test_agent_no_pem_written( |
| 274 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 275 | ) -> None: |
| 276 | """Agent keygen must not write any PEM file.""" |
| 277 | fake_home = self._setup_operator(monkeypatch, tmp_path) |
| 278 | runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha"]) |
| 279 | keys_dir = fake_home / ".muse" / "keys" |
| 280 | pem_files = list(keys_dir.glob("*.pem")) if keys_dir.exists() else [] |
| 281 | assert pem_files == [], f"Unexpected PEM files written: {pem_files}" |
| 282 | |
| 283 | def test_agent_hd_path_in_identity_toml( |
| 284 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 285 | ) -> None: |
| 286 | """Agent entry must contain hd_path and no key_path.""" |
| 287 | import tomllib |
| 288 | fake_home = self._setup_operator(monkeypatch, tmp_path) |
| 289 | runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha"]) |
| 290 | data = tomllib.loads((fake_home / ".muse" / "identity.toml").read_text()) |
| 291 | agent_key = f"{_HOSTNAME}#bot-alpha" |
| 292 | assert agent_key in data, f"No entry for {agent_key} in identity.toml" |
| 293 | entry = data[agent_key] |
| 294 | assert "hd_path" in entry, "hd_path missing from agent identity entry" |
| 295 | assert "key_path" not in entry, "key_path must not be written" |
| 296 | |
| 297 | def test_agent_json_has_hd_path( |
| 298 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 299 | ) -> None: |
| 300 | self._setup_operator(monkeypatch, tmp_path) |
| 301 | result = runner.invoke( |
| 302 | None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha", "--json"] |
| 303 | ) |
| 304 | assert result.exit_code == 0, result.output |
| 305 | payload = json.loads(result.output.splitlines()[0]) |
| 306 | assert "hd_path" in payload |
| 307 | assert str(MUSE_PURPOSE) in payload["hd_path"] |
| 308 | |
| 309 | def test_agent_json_has_provisioned_by( |
| 310 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 311 | ) -> None: |
| 312 | self._setup_operator(monkeypatch, tmp_path) |
| 313 | result = runner.invoke( |
| 314 | None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha", "--json"] |
| 315 | ) |
| 316 | payload = json.loads(result.output.splitlines()[0]) |
| 317 | assert "provisioned_by_fingerprint" in payload |
| 318 | assert payload["provisioned_by_fingerprint"].startswith("sha256:") |
| 319 | |
| 320 | def test_agent_key_different_from_human_key( |
| 321 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 322 | ) -> None: |
| 323 | """Agent fingerprint must differ from the operator fingerprint.""" |
| 324 | self._setup_operator(monkeypatch, tmp_path) |
| 325 | op_result = runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--json"]) |
| 326 | # operator already exists — get fingerprint from identity.toml via agent keygen output |
| 327 | result = runner.invoke( |
| 328 | None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha", "--json"] |
| 329 | ) |
| 330 | assert result.exit_code == 0, result.output |
| 331 | agent_payload = json.loads(result.output.splitlines()[0]) |
| 332 | human_payload = json.loads(op_result.output.splitlines()[0]) if op_result.exit_code == 0 else None |
| 333 | |
| 334 | if human_payload: |
| 335 | assert agent_payload["fingerprint"] != human_payload["fingerprint"], \ |
| 336 | "Agent and human keys must be distinct" |
| 337 | # Also verify provisioned_by differs from the agent fingerprint |
| 338 | assert agent_payload["fingerprint"] != agent_payload["provisioned_by_fingerprint"] |
| 339 | |
| 340 | def test_two_agents_have_distinct_keys( |
| 341 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 342 | ) -> None: |
| 343 | """Different agent handles must produce different fingerprints.""" |
| 344 | self._setup_operator(monkeypatch, tmp_path) |
| 345 | result_a = runner.invoke( |
| 346 | None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha", "--json"] |
| 347 | ) |
| 348 | result_b = runner.invoke( |
| 349 | None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-beta", "--json"] |
| 350 | ) |
| 351 | assert result_a.exit_code == 0, result_a.output |
| 352 | assert result_b.exit_code == 0, result_b.output |
| 353 | fp_a = json.loads(result_a.output.splitlines()[0])["fingerprint"] |
| 354 | fp_b = json.loads(result_b.output.splitlines()[0])["fingerprint"] |
| 355 | assert fp_a != fp_b, "Different agent handles must produce different keys" |
| 356 | |
| 357 | def test_agent_keygen_without_operator_exits_nonzero( |
| 358 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 359 | ) -> None: |
| 360 | """Attempt to derive agent key before operator key is set up.""" |
| 361 | _patch_home(monkeypatch, tmp_path) |
| 362 | result = runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha"]) |
| 363 | assert result.exit_code != 0 |
| 364 | |
| 365 | def test_agent_key_deterministic( |
| 366 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 367 | ) -> None: |
| 368 | """Same operator mnemonic + same agent handle = same agent key.""" |
| 369 | fake_home = self._setup_operator(monkeypatch, tmp_path) |
| 370 | result1 = runner.invoke( |
| 371 | None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha", "--json"] |
| 372 | ) |
| 373 | fp1 = json.loads(result1.output.splitlines()[0])["fingerprint"] |
| 374 | |
| 375 | # Re-derive: force-overwrite the agent key (same operator mnemonic on disk) |
| 376 | result2 = runner.invoke( |
| 377 | None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "bot-alpha", "--force", "--json"] |
| 378 | ) |
| 379 | fp2 = json.loads(result2.output.splitlines()[0])["fingerprint"] |
| 380 | assert fp1 == fp2, "Agent key not deterministic given same operator mnemonic + handle" |
| 381 | |
| 382 | |
| 383 | # --------------------------------------------------------------------------- |
| 384 | # derive_agent_sub_seed — unit tests (no CLI) |
| 385 | # --------------------------------------------------------------------------- |
| 386 | |
| 387 | |
| 388 | class TestDeriveAgentSubSeed: |
| 389 | """derive_agent_sub_seed must produce stable, domain-isolated sub-seeds.""" |
| 390 | |
| 391 | def test_returns_64_bytes(self) -> None: |
| 392 | seed = mnemonic_to_seed(_TEST_MNEMONIC_12) |
| 393 | slot = agent_id_to_slot("bot-alpha") |
| 394 | sub_seed = derive_agent_sub_seed(seed, DOMAIN_IDENTITY, slot) |
| 395 | assert len(sub_seed) == 64 |
| 396 | |
| 397 | def test_deterministic(self) -> None: |
| 398 | seed = mnemonic_to_seed(_TEST_MNEMONIC_12) |
| 399 | slot = agent_id_to_slot("bot-alpha") |
| 400 | s1 = derive_agent_sub_seed(seed, DOMAIN_IDENTITY, slot) |
| 401 | s2 = derive_agent_sub_seed(seed, DOMAIN_IDENTITY, slot) |
| 402 | assert s1 == s2 |
| 403 | |
| 404 | def test_different_slots_different_sub_seeds(self) -> None: |
| 405 | seed = mnemonic_to_seed(_TEST_MNEMONIC_12) |
| 406 | slot_a = agent_id_to_slot("bot-alpha") |
| 407 | slot_b = agent_id_to_slot("bot-beta") |
| 408 | assert slot_a != slot_b |
| 409 | sub_a = derive_agent_sub_seed(seed, DOMAIN_IDENTITY, slot_a) |
| 410 | sub_b = derive_agent_sub_seed(seed, DOMAIN_IDENTITY, slot_b) |
| 411 | assert sub_a != sub_b |
| 412 | |
| 413 | def test_different_domains_different_sub_seeds(self) -> None: |
| 414 | seed = mnemonic_to_seed(_TEST_MNEMONIC_12) |
| 415 | slot = agent_id_to_slot("bot-alpha") |
| 416 | DOMAIN_PAYMENTS = 1 |
| 417 | sub_id = derive_agent_sub_seed(seed, DOMAIN_IDENTITY, slot) |
| 418 | sub_pay = derive_agent_sub_seed(seed, DOMAIN_PAYMENTS, slot) |
| 419 | assert sub_id != sub_pay |
| 420 | |
| 421 | def test_sub_seed_differs_from_parent_seed(self) -> None: |
| 422 | seed = mnemonic_to_seed(_TEST_MNEMONIC_12) |
| 423 | slot = agent_id_to_slot("bot-alpha") |
| 424 | sub = derive_agent_sub_seed(seed, DOMAIN_IDENTITY, slot) |
| 425 | assert sub != seed |
| 426 | |
| 427 | |
| 428 | # --------------------------------------------------------------------------- |
| 429 | # run_recover — re-derive from mnemonic |
| 430 | # --------------------------------------------------------------------------- |
| 431 | |
| 432 | |
| 433 | class TestRunRecover: |
| 434 | """muse auth recover must re-derive the exact same key from the mnemonic.""" |
| 435 | |
| 436 | def _do_recover( |
| 437 | self, |
| 438 | monkeypatch: pytest.MonkeyPatch, |
| 439 | tmp_path: pathlib.Path, |
| 440 | mnemonic: str, |
| 441 | extra_args: list[str] | None = None, |
| 442 | ) -> tuple[pathlib.Path, InvokeResult]: |
| 443 | fake_home = _patch_home(monkeypatch, tmp_path) |
| 444 | _patch_keychain(monkeypatch) |
| 445 | args = ["auth", "recover", "--hub", _HUB] + (extra_args or []) |
| 446 | return fake_home, runner.invoke(None, args, input=mnemonic) |
| 447 | |
| 448 | def test_recover_exits_zero( |
| 449 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 450 | ) -> None: |
| 451 | _, result = self._do_recover(monkeypatch, tmp_path, _TEST_MNEMONIC_12, ["--force"]) |
| 452 | assert result.exit_code == 0, result.output |
| 453 | |
| 454 | def test_recover_writes_no_pem( |
| 455 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 456 | ) -> None: |
| 457 | fake_home, result = self._do_recover(monkeypatch, tmp_path, _TEST_MNEMONIC_12, ["--force"]) |
| 458 | assert result.exit_code == 0 |
| 459 | keys_dir = fake_home / ".muse" / "keys" |
| 460 | pem_files = list(keys_dir.glob("*.pem")) if keys_dir.exists() else [] |
| 461 | assert pem_files == [], f"PEM files found after recover: {pem_files}" |
| 462 | |
| 463 | def test_recover_produces_same_fingerprint_as_keygen( |
| 464 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 465 | ) -> None: |
| 466 | """Key recovered from mnemonic must match the original keygen fingerprint.""" |
| 467 | import muse.core.bip39 as bip39_mod |
| 468 | |
| 469 | fixed_mnemonic = _TEST_MNEMONIC_12 |
| 470 | monkeypatch.setattr(bip39_mod, "generate_mnemonic", lambda **kw: fixed_mnemonic) |
| 471 | |
| 472 | # Keygen |
| 473 | fake_home = _patch_home(monkeypatch, tmp_path) |
| 474 | _patch_keychain(monkeypatch) |
| 475 | keygen_result = runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--json"]) |
| 476 | assert keygen_result.exit_code == 0, keygen_result.output |
| 477 | keygen_fp = json.loads(keygen_result.output.splitlines()[0])["fingerprint"] |
| 478 | |
| 479 | # Recover into same tmpdir (--force to overwrite) |
| 480 | recover_result = runner.invoke( |
| 481 | None, |
| 482 | ["auth", "recover", "--hub", _HUB, "--force", "--json"], |
| 483 | input=fixed_mnemonic, |
| 484 | ) |
| 485 | assert recover_result.exit_code == 0, recover_result.output |
| 486 | recover_fp = json.loads(recover_result.output.splitlines()[0])["fingerprint"] |
| 487 | |
| 488 | assert keygen_fp == recover_fp, \ |
| 489 | f"Recovered fingerprint {recover_fp} != original {keygen_fp}" |
| 490 | |
| 491 | def test_recover_invalid_mnemonic_exits_nonzero( |
| 492 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 493 | ) -> None: |
| 494 | _, result = self._do_recover(monkeypatch, tmp_path, "not valid mnemonic words here ok", ["--force"]) |
| 495 | assert result.exit_code != 0 |
| 496 | |
| 497 | def test_recover_json_has_fingerprint( |
| 498 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 499 | ) -> None: |
| 500 | _, result = self._do_recover(monkeypatch, tmp_path, _TEST_MNEMONIC_12, ["--force", "--json"]) |
| 501 | assert result.exit_code == 0 |
| 502 | payload = json.loads(result.output.splitlines()[0]) |
| 503 | assert "fingerprint" in payload |
| 504 | assert payload["fingerprint"].startswith("sha256:") |
| 505 | |
| 506 | |
| 507 | # --------------------------------------------------------------------------- |
| 508 | # Integration — full operator → agent → recover flow |
| 509 | # --------------------------------------------------------------------------- |
| 510 | |
| 511 | |
| 512 | class TestIntegrationFlow: |
| 513 | """Full flow: human keygen → agent keygen → recover → fingerprints match.""" |
| 514 | |
| 515 | def test_operator_then_agent_then_recover( |
| 516 | self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path |
| 517 | ) -> None: |
| 518 | import muse.core.bip39 as bip39_mod |
| 519 | # Use a fixed mnemonic so we can recover without reading from the keychain. |
| 520 | fixed_mnemonic = _TEST_MNEMONIC_12 |
| 521 | monkeypatch.setattr(bip39_mod, "generate_mnemonic", lambda **kw: fixed_mnemonic) |
| 522 | |
| 523 | _patch_home(monkeypatch, tmp_path) |
| 524 | |
| 525 | # 1. Operator keygen |
| 526 | r1 = runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--json"]) |
| 527 | assert r1.exit_code == 0, r1.output |
| 528 | op_payload = json.loads(r1.output.splitlines()[0]) |
| 529 | op_fp = op_payload["fingerprint"] |
| 530 | |
| 531 | # 2. Agent keygen derives from the operator's mnemonic in keychain / ephemeral store |
| 532 | r2 = runner.invoke( |
| 533 | None, ["auth", "keygen", "--hub", _HUB, "--agent-id", "worker-1", "--json"] |
| 534 | ) |
| 535 | assert r2.exit_code == 0, r2.output |
| 536 | agent_payload = json.loads(r2.output.splitlines()[0]) |
| 537 | agent_fp = agent_payload["fingerprint"] |
| 538 | assert agent_fp != op_fp, "Agent fingerprint must differ from operator" |
| 539 | |
| 540 | # 3. Recover operator key via stdin pipe (--force since PEM already exists) |
| 541 | r3 = runner.invoke( |
| 542 | None, |
| 543 | ["auth", "recover", "--hub", _HUB, "--force", "--json"], |
| 544 | input=fixed_mnemonic, |
| 545 | ) |
| 546 | assert r3.exit_code == 0, r3.output |
| 547 | recovered_fp = json.loads(r3.output.splitlines()[0])["fingerprint"] |
| 548 | assert recovered_fp == op_fp, \ |
| 549 | f"Recovered operator fp {recovered_fp!r} != original {op_fp!r}" |
| 550 | |
| 551 | def test_slot_stability_across_keygen_invocations(self) -> None: |
| 552 | """agent_id_to_slot must return the same value before and after any keygen.""" |
| 553 | handle = "production-agent-42" |
| 554 | slot_before = agent_id_to_slot(handle) |
| 555 | # Simulate "after keygen" by just calling again — slot is a pure function |
| 556 | slot_after = agent_id_to_slot(handle) |
| 557 | assert slot_before == slot_after |
File History
1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
6 days ago