gabriel / muse public
test_agent_key_fd.py python
424 lines 15.6 KB
Raw
1 """Tests for fd-based agent key injection — Tier 3.
2
3 MUSE_AGENT_KEY_FD is the only supported env-var mechanism for injecting
4 a sub-seed into an agent subprocess. The old MUSE_AGENT_HD_SEED and
5 MUSE_AGENT_KEY env vars are removed.
6
7 Protocol:
8 1. Parent creates an anonymous pipe (r_fd, w_fd).
9 2. Parent writes exactly 64 bytes of sub-seed to w_fd, closes w_fd.
10 3. Parent sets MUSE_AGENT_KEY_FD=str(r_fd) and spawns child with pass_fds=(r_fd,).
11 4. Child (get_signing_identity) reads exactly 64 bytes from r_fd, closes r_fd.
12 5. Child derives Ed25519 private key from sub_seed via derive_identity_key.
13 6. Secret never appears in /proc/<pid>/environ.
14
15 Coverage
16 --------
17 I get_signing_identity — fd injection
18 I1 MUSE_AGENT_KEY_FD reads 64 bytes, returns valid SigningIdentity
19 I2 derived key is deterministic for the same sub-seed
20 I3 fd is closed after read (cannot be read a second time)
21 I4 MUSE_AGENT_HANDLE sets the identity handle
22 I5 handle defaults to "agent" when MUSE_AGENT_HANDLE is unset
23
24 II Priority and fallback
25 II1 MUSE_AGENT_KEY_FD takes priority over identity store
26 II2 falls through to identity store when MUSE_AGENT_KEY_FD is unset
27
28 III Error handling
29 III1 invalid fd number → falls through (does not crash)
30 III2 fd with wrong byte count → falls through
31 III3 MUSE_AGENT_HD_SEED is no longer recognised
32 III4 MUSE_AGENT_KEY is no longer recognised
33
34 IV Security
  IV1  sub-seed does not appear in os.environ
  IV2  two different sub-seeds produce two different signing keys
  IV3  sub-seed buffer is zeroed in memory after key derivation
37 """
38
from __future__ import annotations

import contextlib
import os
import pathlib

import pytest
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from muse.core.slip010 import DerivedKey
47
# Fixed 12-word mnemonic shared by every test in this module; it is the
# well-known BIP-39 reference phrase for all-zero entropy, so it must never
# be used for real key material.
_TEST_MNEMONIC = (
    "abandon abandon abandon abandon abandon abandon abandon abandon "
    "abandon abandon abandon about"
)
52
53
def _make_sub_seed(account: int = 1) -> bytes:
    """Return a real 64-byte IDENTITY-domain sub-seed for agent *account*.

    The master seed comes from the module's fixed test mnemonic; *account*
    is forwarded as the ``agent_id`` of the derivation.
    """
    from muse.core.bip39 import mnemonic_to_seed
    from muse.core.hdkeys import DOMAIN_IDENTITY, derive_agent_sub_seed

    return derive_agent_sub_seed(
        mnemonic_to_seed(_TEST_MNEMONIC),
        domain=DOMAIN_IDENTITY,
        agent_id=account,
    )
60
61
def _pipe_with_seed(sub_seed: bytes) -> int:
    """Create a pipe pre-loaded with *sub_seed* and return its read end.

    The write end is always closed before returning, so a reader sees EOF
    right after the seed bytes.  The caller owns the returned descriptor
    and is responsible for closing it.

    Args:
        sub_seed: Bytes to place in the pipe.

    Returns:
        The read file descriptor of the pipe.

    Raises:
        OSError, TypeError: Whatever ``os.write`` raises; in that case both
            ends of the pipe are closed before the exception propagates.
    """
    r_fd, w_fd = os.pipe()
    try:
        os.write(w_fd, sub_seed)
    except BaseException:
        # Nobody will ever receive r_fd, so release it here instead of leaking it.
        os.close(r_fd)
        raise
    finally:
        # The write end is never handed out; close it on success and failure alike.
        os.close(w_fd)
    return r_fd
68
69
70 # ---------------------------------------------------------------------------
71 # I get_signing_identity — fd injection
72 # ---------------------------------------------------------------------------
73
74
class TestFdInjectionI:
    """Group I: get_signing_identity reading the sub-seed via MUSE_AGENT_KEY_FD.

    Each test passes the read end of a pipe that already holds a sub-seed.
    The descriptor is closed best-effort in a ``finally`` clause so it is
    released even when the call under test raises or an assertion fails;
    the ``OSError`` from closing an already-closed descriptor is ignored.
    """

    def test_I1_fd_returns_signing_identity(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """I1: MUSE_AGENT_KEY_FD yields a valid SigningIdentity."""
        from muse.cli.config import get_signing_identity

        r_fd = _pipe_with_seed(_make_sub_seed(account=1))
        try:
            monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
            monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False)
            result = get_signing_identity(repo_root=tmp_path)
        finally:
            with contextlib.suppress(OSError):
                os.close(r_fd)

        assert result is not None
        assert isinstance(result.private_key, Ed25519PrivateKey)

    def test_I2_deterministic_key_from_same_seed(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """I2: same sub-seed always produces the same signing key."""
        from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
        from muse.cli.config import get_signing_identity

        sub_seed = _make_sub_seed(account=2)

        # Inject the same seed twice, each time through a fresh pipe.
        results = []
        for _ in range(2):
            r_fd = _pipe_with_seed(sub_seed)
            try:
                monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
                results.append(get_signing_identity(repo_root=tmp_path))
            finally:
                with contextlib.suppress(OSError):
                    os.close(r_fd)

        result1, result2 = results
        assert result1 is not None and result2 is not None
        # Same key material → same public key bytes
        pub1 = result1.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
        pub2 = result2.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
        assert pub1 == pub2

    def test_I3_fd_closed_after_read(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """I3: the fd is closed by get_signing_identity — cannot be read again."""
        from muse.cli.config import get_signing_identity

        r_fd = _pipe_with_seed(_make_sub_seed(account=3))
        try:
            monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
            get_signing_identity(repo_root=tmp_path)

            # Reading from a closed descriptor raises OSError; an fd that was
            # left open would return data or EOF instead.
            with pytest.raises(OSError):
                os.read(r_fd, 1)
        finally:
            # Only does real work when the test fails, i.e. the fd was left open.
            with contextlib.suppress(OSError):
                os.close(r_fd)

    def test_I4_muse_agent_handle_sets_handle(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """I4: MUSE_AGENT_HANDLE sets the identity handle."""
        from muse.cli.config import get_signing_identity

        r_fd = _pipe_with_seed(_make_sub_seed(account=4))
        try:
            monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
            monkeypatch.setenv("MUSE_AGENT_HANDLE", "my-agent-001")
            result = get_signing_identity(repo_root=tmp_path)
        finally:
            with contextlib.suppress(OSError):
                os.close(r_fd)

        assert result is not None
        assert result.handle == "my-agent-001"

    def test_I5_default_handle_is_agent(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """I5: handle defaults to 'agent' when MUSE_AGENT_HANDLE is not set."""
        from muse.cli.config import get_signing_identity

        r_fd = _pipe_with_seed(_make_sub_seed(account=5))
        try:
            monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
            monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False)
            result = get_signing_identity(repo_root=tmp_path)
        finally:
            with contextlib.suppress(OSError):
                os.close(r_fd)

        assert result is not None
        assert result.handle == "agent"
185
186
187 # ---------------------------------------------------------------------------
188 # II Priority and fallback
189 # ---------------------------------------------------------------------------
190
191
class TestPriorityII:
    """Group II: precedence of MUSE_AGENT_KEY_FD relative to the identity store."""

    def test_II1_fd_takes_priority_over_identity_store(
        self,
        monkeypatch: pytest.MonkeyPatch,
        tmp_path: pathlib.Path,
    ) -> None:
        """II1: MUSE_AGENT_KEY_FD takes priority over file-based identity."""
        import muse.core.identity as id_mod

        # Wrap resolve_signing_identity so every call into the identity store
        # is recorded while the real behaviour is preserved.
        # NOTE(review): the wrapper is only seen by callers that look the
        # function up on muse.core.identity at call time; if muse.cli.config
        # bound it with ``from ... import`` before this patch, the check
        # below is vacuous — confirm against muse.cli.config.
        store_calls: list[bool] = []
        real_resolve = id_mod.resolve_signing_identity

        def recording_resolve(*args, **kwargs):
            store_calls.append(True)
            return real_resolve(*args, **kwargs)

        monkeypatch.setattr(id_mod, "resolve_signing_identity", recording_resolve)

        from muse.cli.config import get_signing_identity

        r_fd = _pipe_with_seed(_make_sub_seed(account=6))
        try:
            monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
            result = get_signing_identity(repo_root=tmp_path)
        finally:
            # Best-effort: release the fd even if the call raised; closing an
            # already-closed descriptor raises OSError, which is ignored.
            with contextlib.suppress(OSError):
                os.close(r_fd)

        assert result is not None
        assert not store_calls, (
            "identity store was consulted despite MUSE_AGENT_KEY_FD being set"
        )

    def test_II2_falls_through_to_identity_store_when_unset(
        self,
        monkeypatch: pytest.MonkeyPatch,
        tmp_path: pathlib.Path,
    ) -> None:
        """II2: without MUSE_AGENT_KEY_FD, falls through to identity store (returns None)."""
        from muse.cli.config import get_signing_identity

        monkeypatch.delenv("MUSE_AGENT_KEY_FD", raising=False)
        monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False)

        # No identity configured → returns None
        result = get_signing_identity(repo_root=tmp_path)
        assert result is None
233
234
235 # ---------------------------------------------------------------------------
236 # III Error handling
237 # ---------------------------------------------------------------------------
238
239
class TestErrorHandlingIII:
    """Group III: malformed fd input and removed environment variables.

    Every test expects ``get_signing_identity`` to return ``None`` rather
    than raise or produce an identity.
    """

    def test_III1_invalid_fd_falls_through(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """III1: an invalid fd number falls through gracefully (no crash)."""
        from muse.cli.config import get_signing_identity

        # 9999 is assumed not to be an open descriptor in the test process.
        monkeypatch.setenv("MUSE_AGENT_KEY_FD", "9999")

        # Should not raise — just falls through to identity store (returns None)
        result = get_signing_identity(repo_root=tmp_path)
        assert result is None

    def test_III2_wrong_byte_count_falls_through(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """III2: fewer than 64 bytes in the pipe → falls through."""
        from muse.cli.config import get_signing_identity

        r_fd = _pipe_with_seed(b"\x00" * 32)  # 32 bytes, not 64
        try:
            monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
            result = get_signing_identity(repo_root=tmp_path)
        finally:
            # Best-effort: release the fd even if the call raised; closing an
            # already-closed descriptor raises OSError, which is ignored.
            with contextlib.suppress(OSError):
                os.close(r_fd)

        assert result is None

    def test_III3_muse_agent_hd_seed_not_recognised(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """III3: MUSE_AGENT_HD_SEED is no longer supported — setting it has no effect."""
        from muse.cli.config import get_signing_identity
        from muse.core.types import b64url_encode

        seed_b64 = b64url_encode(_make_sub_seed(account=7))

        monkeypatch.delenv("MUSE_AGENT_KEY_FD", raising=False)
        monkeypatch.setenv("MUSE_AGENT_HD_SEED", seed_b64)

        # Should NOT return a signing identity (env var is removed)
        result = get_signing_identity(repo_root=tmp_path)
        assert result is None, (
            "MUSE_AGENT_HD_SEED should be ignored but returned a signing identity"
        )

    def test_III4_muse_agent_key_not_recognised(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """III4: MUSE_AGENT_KEY (PEM env var) is no longer supported."""
        from cryptography.hazmat.primitives.serialization import (
            Encoding,
            NoEncryption,
            PrivateFormat,
        )
        from muse.cli.config import get_signing_identity

        # A freshly generated, unencrypted PKCS#8 PEM key is placed in the
        # removed variable.
        key = Ed25519PrivateKey.generate()
        pem = key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode()

        monkeypatch.delenv("MUSE_AGENT_KEY_FD", raising=False)
        monkeypatch.setenv("MUSE_AGENT_KEY", pem)

        result = get_signing_identity(repo_root=tmp_path)
        assert result is None, (
            "MUSE_AGENT_KEY should be ignored but returned a signing identity"
        )
313
314
315 # ---------------------------------------------------------------------------
316 # IV Security
317 # ---------------------------------------------------------------------------
318
319
class TestSecurityIV:
    """Group IV: security properties of fd-based sub-seed injection.

    The pipe's read end is closed best-effort in a ``finally`` clause in
    every test so it is released even when the call under test raises.
    """

    def test_IV1_sub_seed_not_in_environ(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """IV1: the sub-seed bytes never appear in os.environ."""
        from muse.cli.config import get_signing_identity
        from muse.core.types import b64url_encode

        sub_seed = _make_sub_seed(account=8)
        r_fd = _pipe_with_seed(sub_seed)
        try:
            monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
            get_signing_identity(repo_root=tmp_path)
        finally:
            with contextlib.suppress(OSError):
                os.close(r_fd)

        # Neither the base64url nor the hex rendering of the seed may show up
        # in any environment value.
        seed_b64 = b64url_encode(sub_seed)
        seed_hex = sub_seed.hex()
        for val in os.environ.values():
            assert seed_b64 not in val, "sub-seed base64 found in os.environ"
            assert seed_hex not in val, "sub-seed hex found in os.environ"

    def test_IV2_different_seeds_different_keys(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """IV2: two different sub-seeds produce two different signing keys."""
        from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
        from muse.cli.config import get_signing_identity

        # Inject one seed per agent account, each through its own pipe.
        results = []
        for account in (9, 10):
            r_fd = _pipe_with_seed(_make_sub_seed(account=account))
            try:
                monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
                results.append(get_signing_identity(repo_root=tmp_path))
            finally:
                with contextlib.suppress(OSError):
                    os.close(r_fd)

        result_a, result_b = results
        assert result_a is not None and result_b is not None
        pub_a = result_a.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
        pub_b = result_b.private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
        assert pub_a != pub_b, "Different sub-seeds produced identical keys"

    def test_IV3_sub_seed_zeroed_after_use(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """IV3: the sub-seed buffer must be zeroed in memory after key derivation.

        CRITICAL-2: a lingering plaintext sub-seed in RAM can be recovered from
        a core dump or via /proc/<pid>/mem.  The buffer must be all-zero before
        get_signing_identity returns.

        Strategy: patch muse.core.hdkeys.derive_identity_key to capture a
        reference to the buffer passed by the caller.  After get_signing_identity
        returns we verify:
          1. The buffer is a bytearray (mutable, so it *can* be zeroed).
          2. Every byte is 0x00 (was zeroed before returning).
        """
        from unittest.mock import patch

        from muse.cli.config import get_signing_identity
        from muse.core import hdkeys as _hdkeys

        captured: list[bytes | bytearray] = []
        original_derive = _hdkeys.derive_identity_key

        def capturing_derive(seed: bytes | bytearray) -> DerivedKey:
            # Keep the very same object (no copy) so that zeroing done in
            # place by the caller afterwards is visible through this list.
            captured.append(seed)
            return original_derive(seed)

        r_fd = _pipe_with_seed(_make_sub_seed(account=99))
        try:
            monkeypatch.setenv("MUSE_AGENT_KEY_FD", str(r_fd))
            monkeypatch.delenv("MUSE_AGENT_HANDLE", raising=False)
            with patch.object(_hdkeys, "derive_identity_key", side_effect=capturing_derive):
                result = get_signing_identity(repo_root=tmp_path)
        finally:
            with contextlib.suppress(OSError):
                os.close(r_fd)

        assert result is not None, "get_signing_identity must still succeed"
        assert len(captured) == 1, "derive_identity_key must be called exactly once"

        buf = captured[0]
        assert isinstance(buf, bytearray), (
            f"sub-seed must be passed as bytearray (got {type(buf).__name__}) "
            "so it can be zeroed after use"
        )
        assert buf == bytearray(64), (
            "sub-seed buffer must be all-zero after get_signing_identity returns"
        )
File History 1 commit