gabriel / muse public
test_cmd_sign_hardening.py python
482 lines 18.5 KB
Raw
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 6 days ago
1 """Hardening tests for ``muse sign``.
2
3 Gaps closed
4 -----------
5 1. ``duration_ms`` + ``exit_code`` absent from ALL JSON output paths
6 (``header``, ``verify``, ``payment``).
7 2. ``_emit()`` is a dead stub — text-mode branch does nothing; stub removed.
8 3. ``run_verify`` valid/invalid JSON must carry the envelope.
9 4. ``run_payment`` text mode prints 6 lines to stderr then sig to stdout —
10 inconsistent; text mode should use stderr only.
11 5. Module docstring JSON schemas missing ``duration_ms`` / ``exit_code``.
12 """
13
14 from __future__ import annotations
15
16 import argparse
17 import io
18 import json
19 import time
20 import unittest
21 from typing import TYPE_CHECKING
22 from unittest.mock import patch
23
24 from muse.core.types import b64url_encode
25
26 if TYPE_CHECKING:
27 from muse.core.transport import SigningIdentity
28
29
30 # ---------------------------------------------------------------------------
31 # Shared helpers (mirrors test_cmd_sign.py pattern)
32 # ---------------------------------------------------------------------------
33
34
def _make_signing(handle: str = "gabriel") -> "SigningIdentity":
    """Return a ``SigningIdentity`` for *handle* backed by a newly generated Ed25519 key."""
    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
    from muse.core.transport import SigningIdentity

    # Each call generates its own key, so two identities never share one.
    fresh_key = Ed25519PrivateKey.generate()
    return SigningIdentity(handle=handle, private_key=fresh_key)
40
41
def _public_key_b64(signing: "SigningIdentity") -> str:
    """Return the ``b64url_encode``-d raw public key bytes of *signing*."""
    verifying_key = signing.private_key.public_key()
    raw_public = verifying_key.public_bytes_raw()  # type: ignore[attr-defined]
    return b64url_encode(raw_public)
45
46
def _make_header(signing: "SigningIdentity", method: str, url: str,
                 body: bytes = b"", ts: int | None = None) -> str:
    """Build a signed header value via ``build_msign_header``.

    Args:
        signing: Identity whose key signs the request.
        method: HTTP method passed through to ``build_msign_header``.
        url: Request URL passed through to ``build_msign_header``.
        body: Request body bytes; defaults to empty.
        ts: Signing timestamp. ``None`` means "use the current time";
            any integer, including ``0``, is passed through unchanged.

    Returns:
        Whatever ``build_msign_header`` returns for these inputs.
    """
    from muse.core.msign import build_msign_header

    # Test against None explicitly: ``ts or now`` would treat an explicit
    # ts=0 as missing and silently sign with the current time instead.
    signing_ts = int(time.time()) if ts is None else ts
    return build_msign_header(signing, method, url, body, ts=signing_ts)
51
52
53 # ---------------------------------------------------------------------------
54 # TestElapsedAndExitCode — every JSON output path must carry the envelope
55 # ---------------------------------------------------------------------------
56
57
class TestElapsedAndExitCode(unittest.TestCase):
    """Check that every JSON output path carries ``duration_ms`` and ``exit_code``.

    Exercises ``run_header``, ``run_verify`` (with a matching and with a
    mismatching public key) and ``run_payment`` in ``json_out=True`` mode
    while ``sys.stdout`` is captured.
    """

    # Patch target replaced so the commands sign with the test identity.
    _LOAD_SIGNING = "muse.cli.commands.sign._load_signing"

    def setUp(self) -> None:
        signer = _make_signing("gabriel")
        self.signing = signer
        self.pub_b64 = _public_key_b64(signer)
        self.url = "https://hub.example.com/gabriel/muse/push"
        self.ts = int(time.time())
        self.header_value = _make_header(signer, "POST", self.url, ts=self.ts)

    # ── header ────────────────────────────────────────────────────────────────

    def _header_args(self, **extra: bool | int | str | None) -> argparse.Namespace:
        """Return ``run_header`` arguments in JSON mode; *extra* overrides defaults."""
        defaults = {
            "method": "POST",
            "path": None,
            "url": self.url,
            "hub": None,
            "body": None,
            "body_file": None,
            "timestamp": self.ts,
            "key_path": None,
            "agent_id": None,
            "json_out": True,
        }
        return argparse.Namespace(**{**defaults, **extra})

    def _header_stdout(self) -> str:
        """Run ``run_header`` with the signing loader patched; return captured stdout."""
        from muse.cli.commands.sign import run_header

        with patch(self._LOAD_SIGNING, return_value=self.signing):
            with patch("sys.stdout", new_callable=io.StringIO) as captured:
                run_header(self._header_args())
                return captured.getvalue()

    def test_header_json_has_duration_ms(self) -> None:
        data = json.loads(self._header_stdout())
        assert "duration_ms" in data, f"'duration_ms' missing: {list(data)}"

    def test_header_json_duration_ms_is_float(self) -> None:
        duration = json.loads(self._header_stdout())["duration_ms"]
        assert isinstance(duration, float)
        assert duration >= 0.0

    def test_header_json_has_exit_code_zero(self) -> None:
        data = json.loads(self._header_stdout())
        assert "exit_code" in data, f"'exit_code' missing: {list(data)}"
        assert data["exit_code"] == 0

    # ── verify (valid) ────────────────────────────────────────────────────────

    def _verify_args(self, **extra: bool | int | str | None) -> argparse.Namespace:
        """Return ``run_verify`` arguments in JSON mode; *extra* overrides defaults."""
        defaults = {
            "header": self.header_value,
            "method": "POST",
            "url": self.url,
            "public_key_b64": self.pub_b64,
            "body": None,
            "body_file": None,
            "max_age": 300,  # generous window to avoid flaky tests
            "json_out": True,
        }
        return argparse.Namespace(**{**defaults, **extra})

    def _impostor_verify_args(self) -> argparse.Namespace:
        """Return verify arguments carrying the public key of a different identity."""
        impostor = _make_signing("impostor")
        return self._verify_args(public_key_b64=_public_key_b64(impostor))

    def _verify_stdout(self, args: argparse.Namespace, *, may_exit: bool = False) -> str:
        """Run ``run_verify`` on *args* and return captured stdout.

        With ``may_exit=True`` a ``SystemExit`` raised by the command is
        swallowed so the output written before it can still be inspected.
        """
        from muse.cli.commands.sign import run_verify

        with patch("sys.stdout", new_callable=io.StringIO) as captured:
            if may_exit:
                try:
                    run_verify(args)
                except SystemExit:
                    pass
            else:
                run_verify(args)
            return captured.getvalue()

    def test_verify_valid_json_has_duration_ms(self) -> None:
        data = json.loads(self._verify_stdout(self._verify_args()))
        assert "duration_ms" in data, f"'duration_ms' missing: {list(data)}"

    def test_verify_valid_json_has_exit_code_zero(self) -> None:
        data = json.loads(self._verify_stdout(self._verify_args()))
        assert "exit_code" in data, f"'exit_code' missing: {list(data)}"
        assert data["exit_code"] == 0

    def test_verify_valid_json_duration_ms_is_float(self) -> None:
        duration = json.loads(self._verify_stdout(self._verify_args()))["duration_ms"]
        assert isinstance(duration, float)
        assert duration >= 0.0

    def test_verify_invalid_json_has_duration_ms(self) -> None:
        """Even failed verification carries the envelope."""
        raw = self._verify_stdout(self._impostor_verify_args(), may_exit=True)
        data = json.loads(raw)
        assert "duration_ms" in data, f"'duration_ms' missing from invalid verify JSON: {list(data)}"

    def test_verify_invalid_json_has_exit_code_nonzero(self) -> None:
        raw = self._verify_stdout(self._impostor_verify_args(), may_exit=True)
        data = json.loads(raw)
        assert "exit_code" in data
        assert data["exit_code"] != 0

    # ── payment ───────────────────────────────────────────────────────────────

    def _payment_args(self, **extra: bool | int | str | None) -> argparse.Namespace:
        """Return ``run_payment`` arguments in JSON mode; *extra* overrides defaults."""
        defaults = {
            "from_handle": "gabriel",
            "to_handle": "alice",
            "amount": 1_000_000,
            "nonce": "a" * 64,
            "currency": "nanoMUSE",
            "memo": "stem:sha256:deadbeef",
            "hub": None,
            "key_path": None,
            "agent_id": None,
            "timestamp": self.ts,
            "json_out": True,
        }
        return argparse.Namespace(**{**defaults, **extra})

    def _payment_stdout(self) -> str:
        """Run ``run_payment`` with the signing loader patched; return captured stdout."""
        from muse.cli.commands.sign import run_payment

        with patch(self._LOAD_SIGNING, return_value=self.signing):
            with patch("sys.stdout", new_callable=io.StringIO) as captured:
                run_payment(self._payment_args())
                return captured.getvalue()

    def test_payment_json_has_duration_ms(self) -> None:
        data = json.loads(self._payment_stdout())
        assert "duration_ms" in data, f"'duration_ms' missing: {list(data)}"

    def test_payment_json_duration_ms_is_float(self) -> None:
        duration = json.loads(self._payment_stdout())["duration_ms"]
        assert isinstance(duration, float)
        assert duration >= 0.0

    def test_payment_json_has_exit_code_zero(self) -> None:
        data = json.loads(self._payment_stdout())
        assert "exit_code" in data, f"'exit_code' missing: {list(data)}"
        assert data["exit_code"] == 0
232
233
234 # ---------------------------------------------------------------------------
235 # TestHeaderJsonSchema — all documented fields present + envelope
236 # ---------------------------------------------------------------------------
237
238
class TestHeaderJsonSchema(unittest.TestCase):
    """Check that ``run_header`` JSON output contains every key in ``REQUIRED_KEYS``."""

    REQUIRED_KEYS = {
        "handle", "hub", "method", "path", "signing_ts",
        "body_sha256", "signature_b64", "header_value",
        "algorithm", "fingerprint",
        "duration_ms", "exit_code",
    }

    def test_all_required_keys_present(self) -> None:
        from muse.cli.commands.sign import run_header

        signer = _make_signing("gabriel")
        now = int(time.time())
        cli_fields = {
            "method": "POST",
            "path": None,
            "url": "https://hub.example.com/gabriel/muse/push",
            "hub": "https://hub.example.com",
            "body": None,
            "body_file": None,
            "timestamp": now,
            "key_path": None,
            "agent_id": None,
            "json_out": True,
        }
        args = argparse.Namespace(**cli_fields)

        # Sign with the local identity and capture what the command prints.
        with patch("muse.cli.commands.sign._load_signing", return_value=signer):
            with patch("sys.stdout", new_callable=io.StringIO) as captured:
                run_header(args)
        payload = json.loads(captured.getvalue())

        absent = self.REQUIRED_KEYS - set(payload)
        assert not absent, f"Missing JSON keys: {absent}"
270
271
272 # ---------------------------------------------------------------------------
273 # TestVerifyJsonSchema — valid and invalid paths both have full schema
274 # ---------------------------------------------------------------------------
275
276
class TestVerifyJsonSchema(unittest.TestCase):
    """Check that ``run_verify`` JSON has every ``REQUIRED_KEYS`` entry.

    Both the matching-key path and the mismatching-key path are covered.
    """

    REQUIRED_KEYS = {"valid", "reason", "duration_ms", "exit_code"}

    def setUp(self) -> None:
        signer = _make_signing("gabriel")
        self.signing = signer
        self.pub_b64 = _public_key_b64(signer)
        self.url = "https://hub.example.com/gabriel/muse/push"
        self.ts = int(time.time())
        self.header_value = _make_header(signer, "POST", self.url, ts=self.ts)

    def _args_for_key(self, public_key_b64: str) -> argparse.Namespace:
        """Return JSON-mode verify arguments checking the header against *public_key_b64*."""
        return argparse.Namespace(
            header=self.header_value,
            method="POST",
            url=self.url,
            public_key_b64=public_key_b64,
            body=None,
            body_file=None,
            max_age=300,
            json_out=True,
        )

    def test_valid_has_all_required_keys(self) -> None:
        from muse.cli.commands.sign import run_verify

        args = self._args_for_key(self.pub_b64)
        with patch("sys.stdout", new_callable=io.StringIO) as captured:
            run_verify(args)
        payload = json.loads(captured.getvalue())

        absent = self.REQUIRED_KEYS - set(payload)
        assert not absent, f"Missing JSON keys: {absent}"

    def test_invalid_has_all_required_keys(self) -> None:
        from muse.cli.commands.sign import run_verify

        impostor = _make_signing("impostor")
        args = self._args_for_key(_public_key_b64(impostor))
        with patch("sys.stdout", new_callable=io.StringIO) as captured:
            try:
                run_verify(args)
            except SystemExit:
                # The command may exit on failure; the JSON is checked anyway.
                pass
        payload = json.loads(captured.getvalue())

        absent = self.REQUIRED_KEYS - set(payload)
        assert not absent, f"Missing JSON keys in invalid-verify JSON: {absent}"
328
329
330 # ---------------------------------------------------------------------------
331 # TestPaymentJsonSchema — all documented fields present + envelope
332 # ---------------------------------------------------------------------------
333
334
class TestPaymentJsonSchema(unittest.TestCase):
    """Check that ``run_payment`` JSON output contains every key in ``REQUIRED_KEYS``."""

    REQUIRED_KEYS = {
        "from_handle", "to_handle", "amount_nano", "currency",
        "nonce_hex", "memo", "ts", "signature_b64", "canonical_message",
        "duration_ms", "exit_code",
    }

    def test_all_required_keys_present(self) -> None:
        from muse.cli.commands.sign import run_payment

        signer = _make_signing("gabriel")
        now = int(time.time())
        cli_fields = {
            "from_handle": "gabriel",
            "to_handle": "alice",
            "amount": 1_000_000,
            "nonce": "b" * 64,
            "currency": "nanoMUSE",
            "memo": "",
            "hub": None,
            "key_path": None,
            "agent_id": None,
            "timestamp": now,
            "json_out": True,
        }
        args = argparse.Namespace(**cli_fields)

        # Sign with the local identity and capture what the command prints.
        with patch("muse.cli.commands.sign._load_signing", return_value=signer):
            with patch("sys.stdout", new_callable=io.StringIO) as captured:
                run_payment(args)
        payload = json.loads(captured.getvalue())

        absent = self.REQUIRED_KEYS - set(payload)
        assert not absent, f"Missing JSON keys: {absent}"
366
367
368 # ---------------------------------------------------------------------------
369 # TestPaymentTextMode — text mode must not bleed to stdout
370 # ---------------------------------------------------------------------------
371
372
class TestPaymentTextMode(unittest.TestCase):
    """Check which streams ``run_payment`` writes to when ``json_out`` is False."""

    def setUp(self) -> None:
        self.signing = _make_signing("gabriel")

    def _args(self) -> argparse.Namespace:
        """Return text-mode (``json_out=False``) payment arguments."""
        cli_fields = {
            "from_handle": "gabriel",
            "to_handle": "alice",
            "amount": 1_000_000,
            "nonce": "c" * 64,
            "currency": "nanoMUSE",
            "memo": "",
            "hub": None,
            "key_path": None,
            "agent_id": None,
            "timestamp": int(time.time()),
            "json_out": False,
        }
        return argparse.Namespace(**cli_fields)

    def _run_capturing(self, stream: str) -> str:
        """Run ``run_payment`` with *stream* (a patch target) replaced; return its text."""
        from muse.cli.commands.sign import run_payment

        with patch("muse.cli.commands.sign._load_signing", return_value=self.signing):
            with patch(stream, new_callable=io.StringIO) as captured:
                run_payment(self._args())
        return captured.getvalue()

    def test_text_mode_stdout_is_empty(self) -> None:
        """Text mode must not write anything to stdout."""
        written = self._run_capturing("sys.stdout")
        assert written == "", (
            f"Text mode should not write to stdout, got: {written!r}"
        )

    def test_text_mode_stderr_has_content(self) -> None:
        """Text mode must write payment info to stderr."""
        written = self._run_capturing("sys.stderr")
        assert written.strip(), "Text mode must write to stderr"
411
412
413 # ---------------------------------------------------------------------------
414 # TestAlgorithmDowngradeProtection — canonical_message binds the algorithm
415 # ---------------------------------------------------------------------------
416
417
class TestAlgorithmDowngradeProtection(unittest.TestCase):
    """Check that ``canonical_message()`` puts the algorithm on its first line.

    Binding the algorithm into the signed bytes guards against downgrade
    attacks: a server using a weaker algorithm cannot accept a signature
    computed under a stronger one.
    """

    def test_algorithm_is_first_line(self) -> None:
        from muse.core.msign import canonical_message

        signed = canonical_message("POST", "/path", 1744000000, b"", host="example.com")
        # Everything before the first newline is the algorithm line.
        first_line, _, _ = signed.decode().partition("\n")
        assert first_line == "ed25519", (
            f"First line of canonical_message must be the algorithm, got: {first_line!r}"
        )

    def test_custom_algorithm_is_bound(self) -> None:
        from muse.core.msign import canonical_message

        signed = canonical_message(
            "POST", "/path", 1744000000, b"", host="example.com", algorithm="ed448"
        )
        first_line, _, _ = signed.decode().partition("\n")
        assert first_line == "ed448"

    def test_different_algorithms_produce_different_messages(self) -> None:
        from muse.core.msign import canonical_message

        # Identical request, differing only in the algorithm keyword.
        with_ed25519 = canonical_message("GET", "/x", 1, b"", host="h.io", algorithm="ed25519")
        with_ed448 = canonical_message("GET", "/x", 1, b"", host="h.io", algorithm="ed448")
        assert with_ed25519 != with_ed448

    def test_host_is_in_canonical_message(self) -> None:
        """Host must appear in signed bytes so signature is host-bound."""
        from muse.core.msign import canonical_message

        # Identical request, differing only in the host keyword.
        for_prod = canonical_message("GET", "/", 1, b"", host="musehub.ai")
        for_staging = canonical_message("GET", "/", 1, b"", host="staging.musehub.ai")
        assert for_prod != for_staging
457
458
class TestRegisterFlags(unittest.TestCase):
    """Check how ``sign header`` command-line flags populate ``json_out``."""

    def _parser(self) -> argparse.ArgumentParser:
        """Return a fresh parser with the ``sign`` commands registered on it."""
        # ``argparse`` is already imported at module level, so the
        # function-local re-import the original carried is dropped.
        from muse.cli.commands.sign import register

        parser = argparse.ArgumentParser()
        register(parser.add_subparsers())
        return parser

    def _parse(self, *flags: str) -> argparse.Namespace:
        """Parse ``sign header --path /test`` followed by any extra *flags*."""
        return self._parser().parse_args(["sign", "header", "--path", "/test", *flags])

    def test_default_json_out_is_false(self) -> None:
        assert self._parse().json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        assert self._parse("--json").json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        assert self._parse("-j").json_out is True
479
480
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
File History 1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 6 days ago