gabriel / muse public

test_agent_key_fd.py file-level

at sha256:d · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 πŸ’₯ blast risk
sha256:4 Merge branch 'dev' into main · gabriel · Jun 17, 2026
1 """Tests for fd-based agent key injection β€” Tier 3.
2
3 MUSE_AGENT_KEY_FD is the only supported env-var mechanism for injecting
4 a sub-seed into an agent subprocess. The old MUSE_AGENT_HD_SEED and
5 MUSE_AGENT_KEY env vars are removed.
6
7 Protocol:
8 1. Parent creates an anonymous pipe (r_fd, w_fd).
9 2. Parent writes exactly 64 bytes of sub-seed to w_fd, closes w_fd.
10 3. Parent sets MUSE_AGENT_KEY_FD=str(r_fd) and spawns child with pass_fds=(r_fd,).
11 4. Child (get_signing_identity) reads exactly 64 bytes from r_fd, closes r_fd.
12 5. Child derives Ed25519 private key from sub_seed via derive_identity_key.
13 6. Secret never appears in /proc/<pid>/environ.
14
15 Coverage
16 --------
17 I get_signing_identity β€” fd injection
18 I1 MUSE_AGENT_KEY_FD reads 64 bytes, returns valid SigningIdentity
19 I2 derived key is deterministic for the same sub-seed
20 I3 fd is closed after read (cannot be read a second time)
21 I4 MUSE_AGENT_HANDLE sets the identity handle
22 I5 handle defaults to "agent" when MUSE_AGENT_HANDLE is unset
23
24 II Priority and fallback
25 II1 MUSE_AGENT_KEY_FD takes priority over identity store
26 II2 falls through to identity store when MUSE_AGENT_KEY_FD is unset
27
28 III Error handling
29 III1 invalid fd number β†’ falls through (does not crash)
30 III2 fd with wrong byte count β†’ falls through
31 III3 MUSE_AGENT_HD_SEED is no longer recognised
32 III4 MUSE_AGENT_KEY is no longer recognised
33
34 IV Security
35 IV1 sub-seed does not appear in any log output
36 IV2 two different sub-seeds produce two different signing keys
37 """
38
39 from __future__ import annotations
40
41 import os
42 import pathlib
43
44 import pytest
45 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
46 from muse.core.slip010 import DerivedKey
47
48 _TEST_MNEMONIC = (
49 "abandon abandon abandon abandon abandon abandon abandon abandon "
50 "abandon abandon abandon about"
51 )
52
53
54 def _make_sub_seed(account: int = 1) -> bytes:
55 """Derive a real 64-byte IDENTITY-domain agent sub-seed."""
56 from muse.core.bip39 import mnemonic_to_seed
57 from muse.core.hdkeys import DOMAIN_IDENTITY, derive_agent_sub_seed
58 seed = mnemonic_to_seed(_TEST_MNEMONIC)
59 return derive_agent_sub_seed(seed, domain=DOMAIN_IDENTITY, agent_id=account)
60
61
62 def _pipe_with_seed(sub_seed: bytes) -> int:
63 """Create a pipe, write sub_seed, close write end, return read fd."""
64 r_fd, w_fd = os.pipe()
65 os.write(w_fd, sub_seed)
66 os.close(w_fd)
67 return r_fd
68
69
70 # ---------------------------------------------------------------------------
71 # I get_signing_identity β€” fd injection
72 # ---------------------------------------------------------------------------
73
74
75 class TestFdInjectionI:
76 def test_I1_fd_returns_signing_identity(
77 self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
78 ) -> None:
79 """I1: MUSE_AGENT_KEY_FD yields a valid SigningIdentity."""
80 from muse.cli.config import get_signing_identity
81
82 sub_seed = _make_sub_seed(account=1)
83 r_fd = _pipe_with_seed(sub_seed)
84
85 monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
86 monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False)
87
88 result = get_signing_identity(repo_root=tmp_path)
89 try:
90 os.close(r_fd)
91 except OSError:
92 pass
93
94 assert result is not None
95 assert isinstance(result.private_key, Ed25519PrivateKey)
96
97 def test_I2_deterministic_key_from_same_seed(
98 self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
99 ) -> None:
100 """I2: same sub-seed always produces the same signing key."""
101 from muse.cli.config import get_signing_identity
102
103 sub_seed = _make_sub_seed(account=2)
104
105 r_fd1 = _pipe_with_seed(sub_seed)
106 monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd1))
107 result1 = get_signing_identity(repo_root=tmp_path)
108 try:
109 os.close(r_fd1)
110 except OSError:
111 pass
112
113 r_fd2 = _pipe_with_seed(sub_seed)
114 monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd2))
115 result2 = get_signing_identity(repo_root=tmp_path)
116 try:
117 os.close(r_fd2)
118 except OSError:
119 pass
120
121 assert result1 is not None and result2 is not None
122 # Same key material β†’ same public key bytes
123 from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
124 pub1 = result1.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
125 pub2 = result2.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
126 assert pub1 == pub2
127
128 def test_I3_fd_closed_after_read(
129 self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
130 ) -> None:
131 """I3: the fd is closed by get_signing_identity β€” cannot be read again."""
132 from muse.cli.config import get_signing_identity
133
134 sub_seed = _make_sub_seed(account=3)
135 r_fd = _pipe_with_seed(sub_seed)
136
137 monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
138 get_signing_identity(repo_root=tmp_path)
139
140 # fd must be closed
141 with pytest.raises(OSError):
142 os.read(r_fd, 1)
143
144 def test_I4_muse_agent_handle_sets_handle(
145 self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
146 ) -> None:
147 """I4: MUSE_AGENT_HANDLE sets the identity handle."""
148 from muse.cli.config import get_signing_identity
149
150 sub_seed = _make_sub_seed(account=4)
151 r_fd = _pipe_with_seed(sub_seed)
152
153 monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
154 monkeypatch.setenv("MUSE_AGENT_HANDLE", "my-agent-001")
155
156 result = get_signing_identity(repo_root=tmp_path)
157 try:
158 os.close(r_fd)
159 except OSError:
160 pass
161
162 assert result is not None
163 assert result.handle == "my-agent-001"
164
165 def test_I5_default_handle_is_agent(
166 self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
167 ) -> None:
168 """I5: handle defaults to 'agent' when MUSE_AGENT_HANDLE is not set."""
169 from muse.cli.config import get_signing_identity
170
171 sub_seed = _make_sub_seed(account=5)
172 r_fd = _pipe_with_seed(sub_seed)
173
174 monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
175 monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False)
176
177 result = get_signing_identity(repo_root=tmp_path)
178 try:
179 os.close(r_fd)
180 except OSError:
181 pass
182
183 assert result is not None
184 assert result.handle == "agent"
185
186
187 # ---------------------------------------------------------------------------
188 # II Priority and fallback
189 # ---------------------------------------------------------------------------
190
191
192 class TestPriorityII:
193 def test_II1_fd_takes_priority_over_identity_store(
194 self,
195 monkeypatch: pytest.MonkeyPatch,
196 tmp_path: pathlib.Path,
197 ) -> None:
198 """II1: MUSE_AGENT_KEY_FD takes priority over file-based identity."""
199 import muse.core.identity as id_mod
200 # Patch resolve_signing_identity so we know if it was called
201 called = []
202 orig = id_mod.resolve_signing_identity
203 monkeypatch.setattr(id_mod, "resolve_signing_identity",
204 lambda *a, **kw: (called.append(True), orig(*a, **kw))[1])
205
206 from muse.cli.config import get_signing_identity
207 sub_seed = _make_sub_seed(account=6)
208 r_fd = _pipe_with_seed(sub_seed)
209 monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
210
211 result = get_signing_identity(repo_root=tmp_path)
212 try:
213 os.close(r_fd)
214 except OSError:
215 pass
216
217 assert result is not None
218 assert not called, "identity store was consulted despite MUSE_AGENT_KEY_FD being set"
219
220 def test_II2_falls_through_to_identity_store_when_unset(
221 self,
222 monkeypatch: pytest.MonkeyPatch,
223 tmp_path: pathlib.Path,
224 ) -> None:
225 """II2: without MUSE_AGENT_KEY_FD, falls through to identity store (returns None)."""
226 from muse.cli.config import get_signing_identity
227 monkeypatch.delenv("MUSE_AGENT_KEY_FD", raising=False)
228 monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False)
229
230 # No identity configured β†’ returns None
231 result = get_signing_identity(repo_root=tmp_path)
232 assert result is None
233
234
235 # ---------------------------------------------------------------------------
236 # III Error handling
237 # ---------------------------------------------------------------------------
238
239
240 class TestErrorHandlingIII:
241 def test_III1_invalid_fd_falls_through(
242 self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
243 ) -> None:
244 """III1: an invalid fd number falls through gracefully (no crash)."""
245 from muse.cli.config import get_signing_identity
246 monkeypatch.setenv("MUSE_AGENT_KEY_FD", "9999") # certainly not open
247
248 # Should not raise β€” just falls through to identity store (returns None)
249 result = get_signing_identity(repo_root=tmp_path)
250 assert result is None
251
252 def test_III2_wrong_byte_count_falls_through(
253 self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
254 ) -> None:
255 """III2: fewer than 64 bytes in the pipe β†’ falls through."""
256 from muse.cli.config import get_signing_identity
257
258 r_fd, w_fd = os.pipe()
259 os.write(w_fd, b"\x00" * 32) # 32 bytes, not 64
260 os.close(w_fd)
261
262 monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
263
264 result = get_signing_identity(repo_root=tmp_path)
265 try:
266 os.close(r_fd)
267 except OSError:
268 pass
269
270 assert result is None
271
272 def test_III3_muse_agent_hd_seed_not_recognised(
273 self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
274 ) -> None:
275 """III3: MUSE_AGENT_HD_SEED is no longer supported β€” setting it has no effect."""
276 from muse.cli.config import get_signing_identity
277 from muse.core.bip39 import mnemonic_to_seed
278 from muse.core.hdkeys import DOMAIN_IDENTITY, derive_agent_sub_seed
279 from muse.core.types import b64url_encode
280
281 sub_seed = _make_sub_seed(account=7)
282 seed_b64 = b64url_encode(sub_seed)
283
284 monkeypatch.delenv("MUSE_AGENT_KEY_FD", raising=False)
285 monkeypatch.setenv("MUSE_AGENT_HD_SEED", seed_b64)
286
287 # Should NOT return a signing identity (env var is removed)
288 result = get_signing_identity(repo_root=tmp_path)
289 assert result is None, (
290 "MUSE_AGENT_HD_SEED should be ignored but returned a signing identity"
291 )
292
293 def test_III4_muse_agent_key_not_recognised(
294 self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
295 ) -> None:
296 """III4: MUSE_AGENT_KEY (PEM env var) is no longer supported."""
297 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
298 from cryptography.hazmat.primitives.serialization import (
299 Encoding, PrivateFormat, NoEncryption,
300 )
301 from muse.cli.config import get_signing_identity
302
303 key = Ed25519PrivateKey.generate()
304 pem = key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode()
305
306 monkeypatch.delenv("MUSE_AGENT_KEY_FD", raising=False)
307 monkeypatch.setenv("MUSE_AGENT_KEY", pem)
308
309 result = get_signing_identity(repo_root=tmp_path)
310 assert result is None, (
311 "MUSE_AGENT_KEY should be ignored but returned a signing identity"
312 )
313
314
315 # ---------------------------------------------------------------------------
316 # IV Security
317 # ---------------------------------------------------------------------------
318
319
320 class TestSecurityIV:
321 def test_IV1_sub_seed_not_in_environ(
322 self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
323 ) -> None:
324 """IV1: the sub-seed bytes never appear in os.environ."""
325 from muse.cli.config import get_signing_identity
326
327 sub_seed = _make_sub_seed(account=8)
328 r_fd = _pipe_with_seed(sub_seed)
329 monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
330
331 get_signing_identity(repo_root=tmp_path)
332 try:
333 os.close(r_fd)
334 except OSError:
335 pass
336
337 # The raw bytes and any base64 encoding of them must not be in environ
338 from muse.core.types import b64url_encode
339 seed_b64 = b64url_encode(sub_seed)
340 for val in os.environ.values():
341 assert seed_b64 not in val, "sub-seed base64 found in os.environ"
342
343 def test_IV2_different_seeds_different_keys(
344 self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
345 ) -> None:
346 """IV2: two different sub-seeds produce two different signing keys."""
347 from muse.cli.config import get_signing_identity
348 from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
349
350 sub_seed_a = _make_sub_seed(account=9)
351 sub_seed_b = _make_sub_seed(account=10)
352
353 r_fd_a = _pipe_with_seed(sub_seed_a)
354 monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd_a))
355 result_a = get_signing_identity(repo_root=tmp_path)
356 try:
357 os.close(r_fd_a)
358 except OSError:
359 pass
360
361 r_fd_b = _pipe_with_seed(sub_seed_b)
362 monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd_b))
363 result_b = get_signing_identity(repo_root=tmp_path)
364 try:
365 os.close(r_fd_b)
366 except OSError:
367 pass
368
369 assert result_a is not None and result_b is not None
370 pub_a = result_a.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
371 pub_b = result_b.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
372 assert pub_a != pub_b, "Different sub-seeds produced identical keys"
373
374 def test_IV3_sub_seed_zeroed_after_use(
375 self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
376 ) -> None:
377 """IV3: the sub-seed buffer must be zeroed in memory after key derivation.
378
379 CRITICAL-2: a lingering plaintext sub-seed in RAM can be recovered from
380 a core dump or via /proc/<pid>/mem. The buffer must be all-zero before
381 get_signing_identity returns.
382
383 Strategy: patch muse.core.hdkeys.derive_identity_key to capture a
384 reference to the buffer passed by the caller. After get_signing_identity
385 returns we verify:
386 1. The buffer is a bytearray (mutable, so it *can* be zeroed).
387 2. Every byte is 0x00 (was zeroed before returning).
388 """
389 import os
390 from unittest.mock import patch
391 from muse.cli.config import get_signing_identity
392 from muse.core import hdkeys as _hdkeys
393
394 sub_seed = _make_sub_seed(account=99)
395 r_fd = _pipe_with_seed(sub_seed)
396 monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
397 monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False)
398
399 captured = []
400 original_derive = _hdkeys.derive_identity_key
401
402 def capturing_derive(seed: bytes) -> DerivedKey:
403 captured.append(seed) # keep a reference β€” will survive zeroing
404 return original_derive(seed)
405
406 with patch.object(_hdkeys, "derive_identity_key", side_effect=capturing_derive):
407 result = get_signing_identity(repo_root=tmp_path)
408
409 try:
410 os.close(r_fd)
411 except OSError:
412 pass
413
414 assert result is not None, "get_signing_identity must still succeed"
415 assert len(captured) == 1, "derive_identity_key must be called exactly once"
416
417 buf = captured[0]
418 assert isinstance(buf, bytearray), (
419 f"sub-seed must be passed as bytearray (got {type(buf).__name__}) "
420 "so it can be zeroed after use"
421 )
422 assert buf == bytearray(64), (
423 "sub-seed buffer must be all-zero after get_signing_identity returns"
424 )