"""Tests for fd-based agent key injection — Tier 3. MUSE_AGENT_KEY_FD is the only supported env-var mechanism for injecting a sub-seed into an agent subprocess. The old MUSE_AGENT_HD_SEED and MUSE_AGENT_KEY env vars are removed. Protocol: 1. Parent creates an anonymous pipe (r_fd, w_fd). 2. Parent writes exactly 64 bytes of sub-seed to w_fd, closes w_fd. 3. Parent sets MUSE_AGENT_KEY_FD=str(r_fd) and spawns child with pass_fds=(r_fd,). 4. Child (get_signing_identity) reads exactly 64 bytes from r_fd, closes r_fd. 5. Child derives Ed25519 private key from sub_seed via derive_identity_key. 6. Secret never appears in /proc//environ. Coverage -------- I get_signing_identity — fd injection I1 MUSE_AGENT_KEY_FD reads 64 bytes, returns valid SigningIdentity I2 derived key is deterministic for the same sub-seed I3 fd is closed after read (cannot be read a second time) I4 MUSE_AGENT_HANDLE sets the identity handle I5 handle defaults to "agent" when MUSE_AGENT_HANDLE is unset II Priority and fallback II1 MUSE_AGENT_KEY_FD takes priority over identity store II2 falls through to identity store when MUSE_AGENT_KEY_FD is unset III Error handling III1 invalid fd number → falls through (does not crash) III2 fd with wrong byte count → falls through III3 MUSE_AGENT_HD_SEED is no longer recognised III4 MUSE_AGENT_KEY is no longer recognised IV Security IV1 sub-seed does not appear in any log output IV2 two different sub-seeds produce two different signing keys """ from __future__ import annotations import os import pathlib import pytest from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from muse.core.slip010 import DerivedKey _TEST_MNEMONIC = ( "abandon abandon abandon abandon abandon abandon abandon abandon " "abandon abandon abandon about" ) def _make_sub_seed(account: int = 1) -> bytes: """Derive a real 64-byte IDENTITY-domain agent sub-seed.""" from muse.core.bip39 import mnemonic_to_seed from muse.core.hdkeys import DOMAIN_IDENTITY, derive_agent_sub_seed seed = mnemonic_to_seed(_TEST_MNEMONIC) return derive_agent_sub_seed(seed, domain=DOMAIN_IDENTITY, agent_id=account) def _pipe_with_seed(sub_seed: bytes) -> int: """Create a pipe, write sub_seed, close write end, return read fd.""" r_fd, w_fd = os.pipe() os.write(w_fd, sub_seed) os.close(w_fd) return r_fd # --------------------------------------------------------------------------- # I get_signing_identity — fd injection # --------------------------------------------------------------------------- class TestFdInjectionI: def test_I1_fd_returns_signing_identity( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """I1: MUSE_AGENT_KEY_FD yields a valid SigningIdentity.""" from muse.cli.config import get_signing_identity sub_seed = _make_sub_seed(account=1) r_fd = _pipe_with_seed(sub_seed) monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd)) monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False) result = get_signing_identity(repo_root=tmp_path) try: os.close(r_fd) except OSError: pass assert result is not None assert isinstance(result.private_key, Ed25519PrivateKey) def test_I2_deterministic_key_from_same_seed( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """I2: same sub-seed always produces the same signing key.""" from muse.cli.config import get_signing_identity sub_seed = _make_sub_seed(account=2) r_fd1 = _pipe_with_seed(sub_seed) monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd1)) result1 = get_signing_identity(repo_root=tmp_path) try: os.close(r_fd1) except OSError: pass r_fd2 = _pipe_with_seed(sub_seed) monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd2)) result2 = get_signing_identity(repo_root=tmp_path) try: os.close(r_fd2) except OSError: pass assert result1 is not None and result2 is not None # Same key material → same public key bytes from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat pub1 = result1.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) pub2 = result2.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) assert pub1 == pub2 def test_I3_fd_closed_after_read( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """I3: the fd is closed by get_signing_identity — cannot be read again.""" from muse.cli.config import get_signing_identity sub_seed = _make_sub_seed(account=3) r_fd = _pipe_with_seed(sub_seed) monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd)) get_signing_identity(repo_root=tmp_path) # fd must be closed with pytest.raises(OSError): os.read(r_fd, 1) def test_I4_muse_agent_handle_sets_handle( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """I4: MUSE_AGENT_HANDLE sets the identity handle.""" from muse.cli.config import get_signing_identity sub_seed = _make_sub_seed(account=4) r_fd = _pipe_with_seed(sub_seed) monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd)) monkeypatch.setenv("MUSE_AGENT_HANDLE", "my-agent-001") result = get_signing_identity(repo_root=tmp_path) try: os.close(r_fd) except OSError: pass assert result is not None assert result.handle == "my-agent-001" def test_I5_default_handle_is_agent( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """I5: handle defaults to 'agent' when MUSE_AGENT_HANDLE is not set.""" from muse.cli.config import get_signing_identity sub_seed = _make_sub_seed(account=5) r_fd = _pipe_with_seed(sub_seed) monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd)) monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False) result = get_signing_identity(repo_root=tmp_path) try: os.close(r_fd) except OSError: pass assert result is not None assert result.handle == "agent" # --------------------------------------------------------------------------- # II Priority and fallback # --------------------------------------------------------------------------- class TestPriorityII: def test_II1_fd_takes_priority_over_identity_store( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path, ) -> None: """II1: MUSE_AGENT_KEY_FD takes priority over file-based identity.""" import muse.core.identity as id_mod # Patch resolve_signing_identity so we know if it was called called = [] orig = id_mod.resolve_signing_identity monkeypatch.setattr(id_mod, "resolve_signing_identity", lambda *a, **kw: (called.append(True), orig(*a, **kw))[1]) from muse.cli.config import get_signing_identity sub_seed = _make_sub_seed(account=6) r_fd = _pipe_with_seed(sub_seed) monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd)) result = get_signing_identity(repo_root=tmp_path) try: os.close(r_fd) except OSError: pass assert result is not None assert not called, "identity store was consulted despite MUSE_AGENT_KEY_FD being set" def test_II2_falls_through_to_identity_store_when_unset( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path, ) -> None: """II2: without MUSE_AGENT_KEY_FD, falls through to identity store (returns None).""" from muse.cli.config import get_signing_identity monkeypatch.delenv("MUSE_AGENT_KEY_FD", raising=False) monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False) # No identity configured → returns None result = get_signing_identity(repo_root=tmp_path) assert result is None # --------------------------------------------------------------------------- # III Error handling # --------------------------------------------------------------------------- class TestErrorHandlingIII: def test_III1_invalid_fd_falls_through( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """III1: an invalid fd number falls through gracefully (no crash).""" from muse.cli.config import get_signing_identity monkeypatch.setenv("MUSE_AGENT_KEY_FD", "9999") # certainly not open # Should not raise — just falls through to identity store (returns None) result = get_signing_identity(repo_root=tmp_path) assert result is None def test_III2_wrong_byte_count_falls_through( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """III2: fewer than 64 bytes in the pipe → falls through.""" from muse.cli.config import get_signing_identity r_fd, w_fd = os.pipe() os.write(w_fd, b"\x00" * 32) # 32 bytes, not 64 os.close(w_fd) monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd)) result = get_signing_identity(repo_root=tmp_path) try: os.close(r_fd) except OSError: pass assert result is None def test_III3_muse_agent_hd_seed_not_recognised( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """III3: MUSE_AGENT_HD_SEED is no longer supported — setting it has no effect.""" from muse.cli.config import get_signing_identity from muse.core.bip39 import mnemonic_to_seed from muse.core.hdkeys import DOMAIN_IDENTITY, derive_agent_sub_seed from muse.core.types import b64url_encode sub_seed = _make_sub_seed(account=7) seed_b64 = b64url_encode(sub_seed) monkeypatch.delenv("MUSE_AGENT_KEY_FD", raising=False) monkeypatch.setenv("MUSE_AGENT_HD_SEED", seed_b64) # Should NOT return a signing identity (env var is removed) result = get_signing_identity(repo_root=tmp_path) assert result is None, ( "MUSE_AGENT_HD_SEED should be ignored but returned a signing identity" ) def test_III4_muse_agent_key_not_recognised( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """III4: MUSE_AGENT_KEY (PEM env var) is no longer supported.""" from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from cryptography.hazmat.primitives.serialization import ( Encoding, PrivateFormat, NoEncryption, ) from muse.cli.config import get_signing_identity key = Ed25519PrivateKey.generate() pem = key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode() monkeypatch.delenv("MUSE_AGENT_KEY_FD", raising=False) monkeypatch.setenv("MUSE_AGENT_KEY", pem) result = get_signing_identity(repo_root=tmp_path) assert result is None, ( "MUSE_AGENT_KEY should be ignored but returned a signing identity" ) # --------------------------------------------------------------------------- # IV Security # --------------------------------------------------------------------------- class TestSecurityIV: def test_IV1_sub_seed_not_in_environ( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """IV1: the sub-seed bytes never appear in os.environ.""" from muse.cli.config import get_signing_identity sub_seed = _make_sub_seed(account=8) r_fd = _pipe_with_seed(sub_seed) monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd)) get_signing_identity(repo_root=tmp_path) try: os.close(r_fd) except OSError: pass # The raw bytes and any base64 encoding of them must not be in environ from muse.core.types import b64url_encode seed_b64 = b64url_encode(sub_seed) for val in os.environ.values(): assert seed_b64 not in val, "sub-seed base64 found in os.environ" def test_IV2_different_seeds_different_keys( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """IV2: two different sub-seeds produce two different signing keys.""" from muse.cli.config import get_signing_identity from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat sub_seed_a = _make_sub_seed(account=9) sub_seed_b = _make_sub_seed(account=10) r_fd_a = _pipe_with_seed(sub_seed_a) monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd_a)) result_a = get_signing_identity(repo_root=tmp_path) try: os.close(r_fd_a) except OSError: pass r_fd_b = _pipe_with_seed(sub_seed_b) monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd_b)) result_b = get_signing_identity(repo_root=tmp_path) try: os.close(r_fd_b) except OSError: pass assert result_a is not None and result_b is not None pub_a = result_a.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) pub_b = result_b.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) assert pub_a != pub_b, "Different sub-seeds produced identical keys" def test_IV3_sub_seed_zeroed_after_use( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: """IV3: the sub-seed buffer must be zeroed in memory after key derivation. CRITICAL-2: a lingering plaintext sub-seed in RAM can be recovered from a core dump or via /proc//mem. The buffer must be all-zero before get_signing_identity returns. Strategy: patch muse.core.hdkeys.derive_identity_key to capture a reference to the buffer passed by the caller. After get_signing_identity returns we verify: 1. The buffer is a bytearray (mutable, so it *can* be zeroed). 2. Every byte is 0x00 (was zeroed before returning). """ import os from unittest.mock import patch from muse.cli.config import get_signing_identity from muse.core import hdkeys as _hdkeys sub_seed = _make_sub_seed(account=99) r_fd = _pipe_with_seed(sub_seed) monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd)) monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False) captured = [] original_derive = _hdkeys.derive_identity_key def capturing_derive(seed: bytes) -> DerivedKey: captured.append(seed) # keep a reference — will survive zeroing return original_derive(seed) with patch.object(_hdkeys, "derive_identity_key", side_effect=capturing_derive): result = get_signing_identity(repo_root=tmp_path) try: os.close(r_fd) except OSError: pass assert result is not None, "get_signing_identity must still succeed" assert len(captured) == 1, "derive_identity_key must be called exactly once" buf = captured[0] assert isinstance(buf, bytearray), ( f"sub-seed must be passed as bytearray (got {type(buf).__name__}) " "so it can be zeroed after use" ) assert buf == bytearray(64), ( "sub-seed buffer must be all-zero after get_signing_identity returns" )