test_agent_key_fd.py
python
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
7 days ago
| 1 | """Tests for fd-based agent key injection — Tier 3. |
| 2 | |
| 3 | MUSE_AGENT_KEY_FD is the only supported env-var mechanism for injecting |
| 4 | a sub-seed into an agent subprocess. The old MUSE_AGENT_HD_SEED and |
| 5 | MUSE_AGENT_KEY env vars are removed. |
| 6 | |
| 7 | Protocol: |
| 8 | 1. Parent creates an anonymous pipe (r_fd, w_fd). |
| 9 | 2. Parent writes exactly 64 bytes of sub-seed to w_fd, closes w_fd. |
| 10 | 3. Parent sets MUSE_AGENT_KEY_FD=str(r_fd) and spawns child with pass_fds=(r_fd,). |
| 11 | 4. Child (get_signing_identity) reads exactly 64 bytes from r_fd, closes r_fd. |
| 12 | 5. Child derives Ed25519 private key from sub_seed via derive_identity_key. |
| 13 | 6. Secret never appears in /proc/<pid>/environ. |
| 14 | |
| 15 | Coverage |
| 16 | -------- |
| 17 | I get_signing_identity — fd injection |
| 18 | I1 MUSE_AGENT_KEY_FD reads 64 bytes, returns valid SigningIdentity |
| 19 | I2 derived key is deterministic for the same sub-seed |
| 20 | I3 fd is closed after read (cannot be read a second time) |
| 21 | I4 MUSE_AGENT_HANDLE sets the identity handle |
| 22 | I5 handle defaults to "agent" when MUSE_AGENT_HANDLE is unset |
| 23 | |
| 24 | II Priority and fallback |
| 25 | II1 MUSE_AGENT_KEY_FD takes priority over identity store |
| 26 | II2 falls through to identity store when MUSE_AGENT_KEY_FD is unset |
| 27 | |
| 28 | III Error handling |
| 29 | III1 invalid fd number → falls through (does not crash) |
| 30 | III2 fd with wrong byte count → falls through |
| 31 | III3 MUSE_AGENT_HD_SEED is no longer recognised |
| 32 | III4 MUSE_AGENT_KEY is no longer recognised |
| 33 | |
| 34 | IV Security |
| 35 | IV1 sub-seed does not appear in os.environ |
| 36 | IV2 two different sub-seeds produce two different signing keys |
| 37 | """ |
| 38 | |
| 39 | from __future__ import annotations |
| 40 | |
| 41 | import os |
| 42 | import pathlib |
| 43 | |
| 44 | import pytest |
| 45 | from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey |
| 46 | from muse.core.slip010 import DerivedKey |
| 47 | |
| 48 | _TEST_MNEMONIC = ( |
| 49 | "abandon abandon abandon abandon abandon abandon abandon abandon " |
| 50 | "abandon abandon abandon about" |
| 51 | ) |
| 52 | |
| 53 | |
def _make_sub_seed(account: int = 1) -> bytes:
    """Return the IDENTITY-domain agent sub-seed for agent number *account*.

    The sub-seed is derived from the fixed test mnemonic ``_TEST_MNEMONIC``;
    the tests in this module treat it as the 64-byte payload written to the
    key pipe.
    """
    # Imported lazily, like every other muse import in this module.
    from muse.core import hdkeys
    from muse.core.bip39 import mnemonic_to_seed

    master_seed = mnemonic_to_seed(_TEST_MNEMONIC)
    sub_seed = hdkeys.derive_agent_sub_seed(
        master_seed,
        domain=hdkeys.DOMAIN_IDENTITY,
        agent_id=account,
    )
    return sub_seed
| 60 | |
| 61 | |
def _pipe_with_seed(sub_seed: bytes) -> int:
    """Create a pipe pre-loaded with *sub_seed* and return its read fd.

    The write end is always closed before returning, so a reader sees the
    payload followed by EOF.  The caller owns the returned read fd and is
    responsible for closing it.

    Raises:
        Whatever ``os.write`` raises (e.g. ``TypeError`` for a non-bytes
        payload, ``OSError`` on a failed write).  In that case both ends of
        the pipe are closed before the exception propagates, so no file
        descriptor is leaked.
    """
    r_fd, w_fd = os.pipe()
    try:
        os.write(w_fd, sub_seed)
    except BaseException:
        # Nobody will ever receive r_fd, so release it here.
        os.close(r_fd)
        raise
    finally:
        # Closing the write end is what lets the reader observe EOF.
        os.close(w_fd)
    return r_fd
| 68 | |
| 69 | |
| 70 | # --------------------------------------------------------------------------- |
| 71 | # I get_signing_identity — fd injection |
| 72 | # --------------------------------------------------------------------------- |
| 73 | |
| 74 | |
class TestFdInjectionI:
    """I: get_signing_identity builds its result from the MUSE_AGENT_KEY_FD pipe."""

    @staticmethod
    def _identity_from_seed(
        monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path, sub_seed: bytes
    ):
        """Inject *sub_seed* through a pipe fd and return get_signing_identity()'s result.

        The read end is released in a ``finally`` block so it cannot leak when
        get_signing_identity raises or returns without closing it.
        """
        from muse.cli.config import get_signing_identity

        r_fd = _pipe_with_seed(sub_seed)
        try:
            monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
            return get_signing_identity(repo_root=tmp_path)
        finally:
            # Best-effort: the callee is expected to have closed the fd already
            # (see test_I3), in which case os.close raises OSError.
            try:
                os.close(r_fd)
            except OSError:
                pass

    def test_I1_fd_returns_signing_identity(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """I1: MUSE_AGENT_KEY_FD yields a valid SigningIdentity."""
        monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False)

        result = self._identity_from_seed(
            monkeypatch, tmp_path, _make_sub_seed(account=1)
        )

        assert result is not None
        assert isinstance(result.private_key, Ed25519PrivateKey)

    def test_I2_deterministic_key_from_same_seed(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """I2: same sub-seed always produces the same signing key."""
        from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat

        sub_seed = _make_sub_seed(account=2)

        # Each call needs its own pipe: the payload is consumed by the read.
        result1 = self._identity_from_seed(monkeypatch, tmp_path, sub_seed)
        result2 = self._identity_from_seed(monkeypatch, tmp_path, sub_seed)

        assert result1 is not None and result2 is not None
        # Same key material → same public key bytes
        pub1 = result1.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
        pub2 = result2.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
        assert pub1 == pub2

    def test_I3_fd_closed_after_read(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """I3: the fd is closed by get_signing_identity — cannot be read again."""
        from muse.cli.config import get_signing_identity

        r_fd = _pipe_with_seed(_make_sub_seed(account=3))
        monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))

        try:
            get_signing_identity(repo_root=tmp_path)

            # fd must be closed: reading a closed descriptor raises OSError
            with pytest.raises(OSError):
                os.read(r_fd, 1)
        finally:
            # Only does real work when the test fails (fd left open);
            # on the passing path the fd is already closed.
            try:
                os.close(r_fd)
            except OSError:
                pass

    def test_I4_muse_agent_handle_sets_handle(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """I4: MUSE_AGENT_HANDLE sets the identity handle."""
        monkeypatch.setenv("MUSE_AGENT_HANDLE", "my-agent-001")

        result = self._identity_from_seed(
            monkeypatch, tmp_path, _make_sub_seed(account=4)
        )

        assert result is not None
        assert result.handle == "my-agent-001"

    def test_I5_default_handle_is_agent(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """I5: handle defaults to 'agent' when MUSE_AGENT_HANDLE is not set."""
        monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False)

        result = self._identity_from_seed(
            monkeypatch, tmp_path, _make_sub_seed(account=5)
        )

        assert result is not None
        assert result.handle == "agent"
| 185 | |
| 186 | |
| 187 | # --------------------------------------------------------------------------- |
| 188 | # II Priority and fallback |
| 189 | # --------------------------------------------------------------------------- |
| 190 | |
| 191 | |
class TestPriorityII:
    """II: precedence of MUSE_AGENT_KEY_FD relative to the identity store."""

    def test_II1_fd_takes_priority_over_identity_store(
        self,
        monkeypatch: pytest.MonkeyPatch,
        tmp_path: pathlib.Path,
    ) -> None:
        """II1: MUSE_AGENT_KEY_FD takes priority over file-based identity."""
        import muse.core.identity as id_mod

        # Spy on resolve_signing_identity so we know whether it was called.
        store_calls: list[bool] = []
        real_resolve = id_mod.resolve_signing_identity

        def spy_resolve(*args, **kwargs):
            store_calls.append(True)
            return real_resolve(*args, **kwargs)

        monkeypatch.setattr(id_mod, "resolve_signing_identity", spy_resolve)

        # Imported only after the patch is installed, as in the original test.
        # NOTE(review): the spy is only effective if muse.cli.config looks the
        # function up through muse.core.identity — confirm.
        from muse.cli.config import get_signing_identity

        r_fd = _pipe_with_seed(_make_sub_seed(account=6))
        try:
            monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
            result = get_signing_identity(repo_root=tmp_path)
        finally:
            # Best-effort: OSError here just means the fd was already closed.
            try:
                os.close(r_fd)
            except OSError:
                pass

        assert result is not None
        assert not store_calls, "identity store was consulted despite MUSE_AGENT_KEY_FD being set"

    def test_II2_falls_through_to_identity_store_when_unset(
        self,
        monkeypatch: pytest.MonkeyPatch,
        tmp_path: pathlib.Path,
    ) -> None:
        """II2: without MUSE_AGENT_KEY_FD, falls through to identity store (returns None)."""
        from muse.cli.config import get_signing_identity

        monkeypatch.delenv("MUSE_AGENT_KEY_FD", raising=False)
        monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False)

        # No identity configured → returns None
        # NOTE(review): assumes nothing outside tmp_path supplies an identity
        # (e.g. a user-level store) — confirm.
        result = get_signing_identity(repo_root=tmp_path)
        assert result is None
| 233 | |
| 234 | |
| 235 | # --------------------------------------------------------------------------- |
| 236 | # III Error handling |
| 237 | # --------------------------------------------------------------------------- |
| 238 | |
| 239 | |
class TestErrorHandlingIII:
    """III: bad fd input and removed env vars must not yield a signing identity."""

    def test_III1_invalid_fd_falls_through(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """III1: an invalid fd number falls through gracefully (no crash)."""
        from muse.cli.config import get_signing_identity

        # Chosen as a descriptor number that is not expected to be open here.
        monkeypatch.setenv("MUSE_AGENT_KEY_FD", "9999")

        # Should not raise — just falls through to identity store (returns None)
        result = get_signing_identity(repo_root=tmp_path)
        assert result is None

    def test_III2_wrong_byte_count_falls_through(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """III2: fewer than 64 bytes in the pipe → falls through."""
        from muse.cli.config import get_signing_identity

        # 32 bytes, not 64; the shared helper also closes the write end.
        r_fd = _pipe_with_seed(b"\x00" * 32)
        try:
            monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
            result = get_signing_identity(repo_root=tmp_path)
        finally:
            # Best-effort: OSError here just means the fd was already closed.
            try:
                os.close(r_fd)
            except OSError:
                pass

        assert result is None

    def test_III3_muse_agent_hd_seed_not_recognised(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """III3: MUSE_AGENT_HD_SEED is no longer supported — setting it has no effect."""
        from muse.cli.config import get_signing_identity
        from muse.core.types import b64url_encode

        seed_b64 = b64url_encode(_make_sub_seed(account=7))

        monkeypatch.delenv("MUSE_AGENT_KEY_FD", raising=False)
        monkeypatch.setenv("MUSE_AGENT_HD_SEED", seed_b64)

        # Should NOT return a signing identity (env var is removed)
        result = get_signing_identity(repo_root=tmp_path)
        assert result is None, (
            "MUSE_AGENT_HD_SEED should be ignored but returned a signing identity"
        )

    def test_III4_muse_agent_key_not_recognised(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """III4: MUSE_AGENT_KEY (PEM env var) is no longer supported."""
        from cryptography.hazmat.primitives.serialization import (
            Encoding,
            NoEncryption,
            PrivateFormat,
        )
        from muse.cli.config import get_signing_identity

        # A freshly generated, unencrypted PKCS8 PEM key.
        key = Ed25519PrivateKey.generate()
        pem = key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode()

        monkeypatch.delenv("MUSE_AGENT_KEY_FD", raising=False)
        monkeypatch.setenv("MUSE_AGENT_KEY", pem)

        result = get_signing_identity(repo_root=tmp_path)
        assert result is None, (
            "MUSE_AGENT_KEY should be ignored but returned a signing identity"
        )
| 313 | |
| 314 | |
| 315 | # --------------------------------------------------------------------------- |
| 316 | # IV Security |
| 317 | # --------------------------------------------------------------------------- |
| 318 | |
| 319 | |
class TestSecurityIV:
    """IV: the injected sub-seed must not leak and must determine the key."""

    @staticmethod
    def _identity_from_seed(
        monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path, sub_seed: bytes
    ):
        """Inject *sub_seed* through a pipe fd and return get_signing_identity()'s result.

        The read end is released in a ``finally`` block so it cannot leak when
        get_signing_identity raises or returns without closing it.
        """
        from muse.cli.config import get_signing_identity

        r_fd = _pipe_with_seed(sub_seed)
        try:
            monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
            return get_signing_identity(repo_root=tmp_path)
        finally:
            # Best-effort: OSError here just means the fd was already closed.
            try:
                os.close(r_fd)
            except OSError:
                pass

    def test_IV1_sub_seed_not_in_environ(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """IV1: the sub-seed bytes never appear in os.environ."""
        import base64

        from muse.core.types import b64url_encode

        sub_seed = _make_sub_seed(account=8)
        self._identity_from_seed(monkeypatch, tmp_path, sub_seed)

        # os.environ values are text, so look for the seed in its common
        # text encodings: base64url, standard base64 and hex.
        seed_b64url = b64url_encode(sub_seed)
        seed_b64 = base64.b64encode(sub_seed).decode("ascii").rstrip("=")
        seed_hex = sub_seed.hex()
        for val in os.environ.values():
            assert seed_b64url not in val, "sub-seed base64 found in os.environ"
            assert seed_b64 not in val, "sub-seed standard base64 found in os.environ"
            assert seed_hex not in val.lower(), "sub-seed hex found in os.environ"

    def test_IV2_different_seeds_different_keys(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """IV2: two different sub-seeds produce two different signing keys."""
        from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat

        result_a = self._identity_from_seed(
            monkeypatch, tmp_path, _make_sub_seed(account=9)
        )
        result_b = self._identity_from_seed(
            monkeypatch, tmp_path, _make_sub_seed(account=10)
        )

        assert result_a is not None and result_b is not None
        pub_a = result_a.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
        pub_b = result_b.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
        assert pub_a != pub_b, "Different sub-seeds produced identical keys"

    def test_IV3_sub_seed_zeroed_after_use(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """IV3: the sub-seed buffer must be zeroed in memory after key derivation.

        CRITICAL-2: a lingering plaintext sub-seed in RAM can be recovered from
        a core dump or via /proc/<pid>/mem. The buffer must be all-zero before
        get_signing_identity returns.

        Strategy: patch muse.core.hdkeys.derive_identity_key to capture a
        reference to the buffer passed by the caller. After get_signing_identity
        returns we verify:
        1. The buffer is a bytearray (mutable, so it *can* be zeroed).
        2. Every byte is 0x00 (was zeroed before returning).
        """
        from unittest.mock import patch

        # muse.cli.config is imported before the patch is installed, as in the
        # original test, so the mock cannot be bound at import time.
        from muse.cli.config import get_signing_identity
        from muse.core import hdkeys as _hdkeys

        r_fd = _pipe_with_seed(_make_sub_seed(account=99))
        monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
        monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False)

        captured: list[bytes | bytearray] = []
        original_derive = _hdkeys.derive_identity_key

        def capturing_derive(seed: bytes) -> DerivedKey:
            captured.append(seed)  # keep a reference — will survive zeroing
            return original_derive(seed)

        try:
            with patch.object(_hdkeys, "derive_identity_key", side_effect=capturing_derive):
                result = get_signing_identity(repo_root=tmp_path)
        finally:
            # Best-effort: OSError here just means the fd was already closed.
            try:
                os.close(r_fd)
            except OSError:
                pass

        assert result is not None, "get_signing_identity must still succeed"
        assert len(captured) == 1, "derive_identity_key must be called exactly once"

        buf = captured[0]
        assert isinstance(buf, bytearray), (
            f"sub-seed must be passed as bytearray (got {type(buf).__name__}) "
            "so it can be zeroed after use"
        )
        # bytearray(64) is 64 zero bytes.
        assert buf == bytearray(64), (
            "sub-seed buffer must be all-zero after get_signing_identity returns"
        )
File History
1 commit
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
7 days ago