"""Hardening tests for ``muse sign``. Gaps closed ----------- 1. ``duration_ms`` + ``exit_code`` absent from ALL JSON output paths (``header``, ``verify``, ``payment``). 2. ``_emit()`` is a dead stub — text-mode branch does nothing; stub removed. 3. ``run_verify`` valid/invalid JSON must carry the envelope. 4. ``run_payment`` text mode prints 6 lines to stderr then sig to stdout — inconsistent; text mode should use stderr only. 5. Module docstring JSON schemas missing ``duration_ms`` / ``exit_code``. """ from __future__ import annotations import argparse import io import json import time import unittest from typing import TYPE_CHECKING from unittest.mock import patch from muse.core.types import b64url_encode if TYPE_CHECKING: from muse.core.transport import SigningIdentity # --------------------------------------------------------------------------- # Shared helpers (mirrors test_cmd_sign.py pattern) # --------------------------------------------------------------------------- def _make_signing(handle: str = "gabriel") -> "SigningIdentity": from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from muse.core.transport import SigningIdentity return SigningIdentity(handle=handle, private_key=Ed25519PrivateKey.generate()) def _public_key_b64(signing: "SigningIdentity") -> str: pub_raw = signing.private_key.public_key().public_bytes_raw() # type: ignore[attr-defined] return b64url_encode(pub_raw) def _make_header(signing: "SigningIdentity", method: str, url: str, body: bytes = b"", ts: int | None = None) -> str: from muse.core.msign import build_msign_header return build_msign_header(signing, method, url, body, ts=ts or int(time.time())) # --------------------------------------------------------------------------- # TestElapsedAndExitCode — every JSON output path must carry the envelope # --------------------------------------------------------------------------- class TestElapsedAndExitCode(unittest.TestCase): def setUp(self) -> None: self.signing = _make_signing("gabriel") self.pub_b64 = _public_key_b64(self.signing) self.url = "https://hub.example.com/gabriel/muse/push" self.ts = int(time.time()) self.header_value = _make_header(self.signing, "POST", self.url, ts=self.ts) # ── header ──────────────────────────────────────────────────────────────── def _header_args(self, **extra: bool | int | str | None) -> argparse.Namespace: base = dict( method="POST", path=None, url=self.url, hub=None, body=None, body_file=None, timestamp=self.ts, key_path=None, agent_id=None, json_out=True, ) base.update(extra) return argparse.Namespace(**base) def test_header_json_has_duration_ms(self) -> None: from muse.cli.commands.sign import run_header with patch("muse.cli.commands.sign._load_signing", return_value=self.signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_header(self._header_args()) data = json.loads(mock_out.getvalue()) assert "duration_ms" in data, f"'duration_ms' missing: {list(data)}" def test_header_json_duration_ms_is_float(self) -> None: from muse.cli.commands.sign import run_header with patch("muse.cli.commands.sign._load_signing", return_value=self.signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_header(self._header_args()) data = json.loads(mock_out.getvalue()) assert isinstance(data["duration_ms"], float) assert data["duration_ms"] >= 0.0 def test_header_json_has_exit_code_zero(self) -> None: from muse.cli.commands.sign import run_header with patch("muse.cli.commands.sign._load_signing", return_value=self.signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_header(self._header_args()) data = json.loads(mock_out.getvalue()) assert "exit_code" in data, f"'exit_code' missing: {list(data)}" assert data["exit_code"] == 0 # ── verify (valid) ──────────────────────────────────────────────────────── def _verify_args(self, **extra: bool | int | str | None) -> argparse.Namespace: base = dict( header=self.header_value, method="POST", url=self.url, public_key_b64=self.pub_b64, body=None, body_file=None, max_age=300, # generous window to avoid flaky tests json_out=True, ) base.update(extra) return argparse.Namespace(**base) def test_verify_valid_json_has_duration_ms(self) -> None: from muse.cli.commands.sign import run_verify with patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_verify(self._verify_args()) data = json.loads(mock_out.getvalue()) assert "duration_ms" in data, f"'duration_ms' missing: {list(data)}" def test_verify_valid_json_has_exit_code_zero(self) -> None: from muse.cli.commands.sign import run_verify with patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_verify(self._verify_args()) data = json.loads(mock_out.getvalue()) assert "exit_code" in data, f"'exit_code' missing: {list(data)}" assert data["exit_code"] == 0 def test_verify_valid_json_duration_ms_is_float(self) -> None: from muse.cli.commands.sign import run_verify with patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_verify(self._verify_args()) data = json.loads(mock_out.getvalue()) assert isinstance(data["duration_ms"], float) assert data["duration_ms"] >= 0.0 def test_verify_invalid_json_has_duration_ms(self) -> None: """Even failed verification carries the envelope.""" from muse.cli.commands.sign import run_verify other_signing = _make_signing("impostor") bad_pub_b64 = _public_key_b64(other_signing) args = self._verify_args(public_key_b64=bad_pub_b64) with patch("sys.stdout", new_callable=io.StringIO) as mock_out: try: run_verify(args) except SystemExit: pass data = json.loads(mock_out.getvalue()) assert "duration_ms" in data, f"'duration_ms' missing from invalid verify JSON: {list(data)}" def test_verify_invalid_json_has_exit_code_nonzero(self) -> None: from muse.cli.commands.sign import run_verify other_signing = _make_signing("impostor") bad_pub_b64 = _public_key_b64(other_signing) args = self._verify_args(public_key_b64=bad_pub_b64) with patch("sys.stdout", new_callable=io.StringIO) as mock_out: try: run_verify(args) except SystemExit: pass data = json.loads(mock_out.getvalue()) assert "exit_code" in data assert data["exit_code"] != 0 # ── payment ─────────────────────────────────────────────────────────────── def _payment_args(self, **extra: bool | int | str | None) -> argparse.Namespace: base = dict( from_handle="gabriel", to_handle="alice", amount=1_000_000, nonce="a" * 64, currency="nanoMUSE", memo="stem:sha256:deadbeef", hub=None, key_path=None, agent_id=None, timestamp=self.ts, json_out=True, ) base.update(extra) return argparse.Namespace(**base) def test_payment_json_has_duration_ms(self) -> None: from muse.cli.commands.sign import run_payment with patch("muse.cli.commands.sign._load_signing", return_value=self.signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_payment(self._payment_args()) data = json.loads(mock_out.getvalue()) assert "duration_ms" in data, f"'duration_ms' missing: {list(data)}" def test_payment_json_duration_ms_is_float(self) -> None: from muse.cli.commands.sign import run_payment with patch("muse.cli.commands.sign._load_signing", return_value=self.signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_payment(self._payment_args()) data = json.loads(mock_out.getvalue()) assert isinstance(data["duration_ms"], float) assert data["duration_ms"] >= 0.0 def test_payment_json_has_exit_code_zero(self) -> None: from muse.cli.commands.sign import run_payment with patch("muse.cli.commands.sign._load_signing", return_value=self.signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_payment(self._payment_args()) data = json.loads(mock_out.getvalue()) assert "exit_code" in data, f"'exit_code' missing: {list(data)}" assert data["exit_code"] == 0 # --------------------------------------------------------------------------- # TestHeaderJsonSchema — all documented fields present + envelope # --------------------------------------------------------------------------- class TestHeaderJsonSchema(unittest.TestCase): REQUIRED_KEYS = { "handle", "hub", "method", "path", "signing_ts", "body_sha256", "signature_b64", "header_value", "algorithm", "fingerprint", "duration_ms", "exit_code", } def test_all_required_keys_present(self) -> None: from muse.cli.commands.sign import run_header signing = _make_signing("gabriel") ts = int(time.time()) args = argparse.Namespace( method="POST", path=None, url="https://hub.example.com/gabriel/muse/push", hub="https://hub.example.com", body=None, body_file=None, timestamp=ts, key_path=None, agent_id=None, json_out=True, ) with patch("muse.cli.commands.sign._load_signing", return_value=signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_header(args) data = json.loads(mock_out.getvalue()) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing JSON keys: {missing}" # --------------------------------------------------------------------------- # TestVerifyJsonSchema — valid and invalid paths both have full schema # --------------------------------------------------------------------------- class TestVerifyJsonSchema(unittest.TestCase): REQUIRED_KEYS = {"valid", "reason", "duration_ms", "exit_code"} def setUp(self) -> None: self.signing = _make_signing("gabriel") self.pub_b64 = _public_key_b64(self.signing) self.url = "https://hub.example.com/gabriel/muse/push" self.ts = int(time.time()) self.header_value = _make_header(self.signing, "POST", self.url, ts=self.ts) def test_valid_has_all_required_keys(self) -> None: from muse.cli.commands.sign import run_verify args = argparse.Namespace( header=self.header_value, method="POST", url=self.url, public_key_b64=self.pub_b64, body=None, body_file=None, max_age=300, json_out=True, ) with patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_verify(args) data = json.loads(mock_out.getvalue()) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing JSON keys: {missing}" def test_invalid_has_all_required_keys(self) -> None: from muse.cli.commands.sign import run_verify other = _make_signing("impostor") args = argparse.Namespace( header=self.header_value, method="POST", url=self.url, public_key_b64=_public_key_b64(other), body=None, body_file=None, max_age=300, json_out=True, ) with patch("sys.stdout", new_callable=io.StringIO) as mock_out: try: run_verify(args) except SystemExit: pass data = json.loads(mock_out.getvalue()) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing JSON keys in invalid-verify JSON: {missing}" # --------------------------------------------------------------------------- # TestPaymentJsonSchema — all documented fields present + envelope # --------------------------------------------------------------------------- class TestPaymentJsonSchema(unittest.TestCase): REQUIRED_KEYS = { "from_handle", "to_handle", "amount_nano", "currency", "nonce_hex", "memo", "ts", "signature_b64", "canonical_message", "duration_ms", "exit_code", } def test_all_required_keys_present(self) -> None: from muse.cli.commands.sign import run_payment signing = _make_signing("gabriel") ts = int(time.time()) args = argparse.Namespace( from_handle="gabriel", to_handle="alice", amount=1_000_000, nonce="b" * 64, currency="nanoMUSE", memo="", hub=None, key_path=None, agent_id=None, timestamp=ts, json_out=True, ) with patch("muse.cli.commands.sign._load_signing", return_value=signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_payment(args) data = json.loads(mock_out.getvalue()) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing JSON keys: {missing}" # --------------------------------------------------------------------------- # TestPaymentTextMode — text mode must not bleed to stdout # --------------------------------------------------------------------------- class TestPaymentTextMode(unittest.TestCase): def setUp(self) -> None: self.signing = _make_signing("gabriel") def _args(self) -> argparse.Namespace: return argparse.Namespace( from_handle="gabriel", to_handle="alice", amount=1_000_000, nonce="c" * 64, currency="nanoMUSE", memo="", hub=None, key_path=None, agent_id=None, timestamp=int(time.time()), json_out=False, ) def test_text_mode_stdout_is_empty(self) -> None: """Text mode must not write anything to stdout.""" from muse.cli.commands.sign import run_payment with patch("muse.cli.commands.sign._load_signing", return_value=self.signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_payment(self._args()) assert mock_out.getvalue() == "", ( f"Text mode should not write to stdout, got: {mock_out.getvalue()!r}" ) def test_text_mode_stderr_has_content(self) -> None: """Text mode must write payment info to stderr.""" from muse.cli.commands.sign import run_payment with patch("muse.cli.commands.sign._load_signing", return_value=self.signing), \ patch("sys.stderr", new_callable=io.StringIO) as mock_err: run_payment(self._args()) assert mock_err.getvalue().strip(), "Text mode must write to stderr" # --------------------------------------------------------------------------- # TestAlgorithmDowngradeProtection — canonical_message binds the algorithm # --------------------------------------------------------------------------- class TestAlgorithmDowngradeProtection(unittest.TestCase): """``canonical_message()`` must include the algorithm as the first line. This guards against downgrade attacks: a server using a weaker algorithm cannot accept a signature computed under a stronger one. """ def test_algorithm_is_first_line(self) -> None: from muse.core.msign import canonical_message msg = canonical_message("POST", "/path", 1744000000, b"", host="example.com") first_line = msg.decode().split("\n")[0] assert first_line == "ed25519", ( f"First line of canonical_message must be the algorithm, got: {first_line!r}" ) def test_custom_algorithm_is_bound(self) -> None: from muse.core.msign import canonical_message msg = canonical_message( "POST", "/path", 1744000000, b"", host="example.com", algorithm="ed448" ) first_line = msg.decode().split("\n")[0] assert first_line == "ed448" def test_different_algorithms_produce_different_messages(self) -> None: from muse.core.msign import canonical_message msg_25519 = canonical_message("GET", "/x", 1, b"", host="h.io", algorithm="ed25519") msg_448 = canonical_message("GET", "/x", 1, b"", host="h.io", algorithm="ed448") assert msg_25519 != msg_448 def test_host_is_in_canonical_message(self) -> None: """Host must appear in signed bytes so signature is host-bound.""" from muse.core.msign import canonical_message msg_prod = canonical_message("GET", "/", 1, b"", host="musehub.ai") msg_staging = canonical_message("GET", "/", 1, b"", host="staging.musehub.ai") assert msg_prod != msg_staging class TestRegisterFlags(unittest.TestCase): def _parser(self) -> "argparse.ArgumentParser": import argparse from muse.cli.commands.sign import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) return p def test_default_json_out_is_false(self) -> None: args = self._parser().parse_args(["sign", "header", "--path", "/test"]) assert args.json_out is False def test_json_flag_sets_json_out(self) -> None: args = self._parser().parse_args(["sign", "header", "--path", "/test", "--json"]) assert args.json_out is True def test_j_shorthand_sets_json_out(self) -> None: args = self._parser().parse_args(["sign", "header", "--path", "/test", "-j"]) assert args.json_out is True if __name__ == "__main__": unittest.main()