"""Tests for ``muse sign`` CLI subcommands. Coverage: - run_header: text output, JSON output, path+hub vs full URL - run_verify: valid → exit 0, invalid sig → exit 1, expired → exit 1, JSON output - run_whoami: env-var source, identity.toml source, JSON output - run_curl: correct curl command format, Authorization header embedded - run_payment: JSON output fields, domain separation from HTTP MSign - _load_signing: exit 1 on missing identity, key-path override """ from __future__ import annotations import argparse import io import json import sys import time import unittest import unittest.mock from collections.abc import Callable from typing import TYPE_CHECKING from unittest.mock import MagicMock, patch from muse.core.types import b64url_decode, b64url_encode if TYPE_CHECKING: from muse.core.transport import SigningIdentity # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- def _make_signing(handle: str = "gabriel") -> "SigningIdentity": """Return a real SigningIdentity with a fresh Ed25519 key.""" from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from muse.core.transport import SigningIdentity return SigningIdentity(handle=handle, private_key=Ed25519PrivateKey.generate()) def _public_key_b64(signing: "SigningIdentity") -> str: """Return URL-safe base64 public key (no padding) for a SigningIdentity.""" pub_raw = signing.private_key.public_key().public_bytes_raw() # type: ignore[attr-defined] return b64url_encode(pub_raw) def _make_header(signing: "SigningIdentity", method: str, url: str, body: bytes = b"", ts: int = 1744000000) -> str: """Build an MSign header value for testing verify subcommand.""" from muse.core.msign import build_msign_header return build_msign_header(signing, method, url, body, ts=ts) def _run_cmd(func: Callable[[argparse.Namespace], None], **kwargs: bool | int | str | None) -> argparse.Namespace: """Build an argparse.Namespace from kwargs and call func(args).""" args = argparse.Namespace(**kwargs) func(args) return args # --------------------------------------------------------------------------- # run_header # --------------------------------------------------------------------------- class TestRunHeader(unittest.TestCase): def setUp(self) -> None: self.signing = _make_signing("gabriel") def _args(self, **kwargs: bool | int | str | None) -> argparse.Namespace: defaults = dict( method="POST", path=None, url="https://hub.example.com/gabriel/muse/push", hub=None, body=None, body_file=None, timestamp=1744000000, key_path=None, agent_id=None, json_out=False, ) defaults.update(kwargs) return argparse.Namespace(**defaults) def test_text_output_is_msign_header(self) -> None: from muse.cli.commands.sign import run_header args = self._args() with patch("muse.cli.commands.sign._load_signing", return_value=self.signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_header(args) output = mock_out.getvalue().strip() assert output.startswith('MSign handle="gabriel"'), f"Unexpected: {output!r}" assert "ts=1744000000" in output assert ' sig="' in output def test_json_output_fields(self) -> None: from muse.cli.commands.sign import run_header args = self._args(json_out=True) with patch("muse.cli.commands.sign._load_signing", return_value=self.signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_header(args) data = json.loads(mock_out.getvalue()) assert data["handle"] == "gabriel" assert data["method"] == "POST" assert data["signing_ts"] == 1744000000 assert data["algorithm"] == "ed25519" assert "signature_b64" in data assert "fingerprint" in data assert "body_sha256" in data assert data["header_value"].startswith("MSign") def test_path_plus_hub_constructs_url(self) -> None: """--path + --hub must construct a full URL for signing.""" from muse.cli.commands.sign import run_header args = self._args( path="/gabriel/muse/push", url=None, hub="https://staging.musehub.ai", json_out=True, ) with patch("muse.cli.commands.sign._load_signing", return_value=self.signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_header(args) data = json.loads(mock_out.getvalue()) assert data["path"] == "/gabriel/muse/push" def test_empty_body_uses_empty_sha256(self) -> None: """No body → body_sha256 == sha256(b'') with sha256: prefix.""" import hashlib from muse.cli.commands.sign import run_header from muse.core.types import blob_id args = self._args(json_out=True) with patch("muse.cli.commands.sign._load_signing", return_value=self.signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_header(args) data = json.loads(mock_out.getvalue()) expected = blob_id(b"") assert data["body_sha256"] == expected def test_inline_body_reflected_in_sha256(self) -> None: """--body flag must be hashed into body_sha256 with sha256: prefix.""" from muse.cli.commands.sign import run_header from muse.core.types import blob_id args = self._args(body="hello world", json_out=True) with patch("muse.cli.commands.sign._load_signing", return_value=self.signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_header(args) data = json.loads(mock_out.getvalue()) expected = blob_id(b"hello world") assert data["body_sha256"] == expected # --------------------------------------------------------------------------- # run_verify # --------------------------------------------------------------------------- class TestRunVerify(unittest.TestCase): def setUp(self) -> None: self.signing = _make_signing("gabriel") self.pub_b64 = _public_key_b64(self.signing) self.url = "https://hub.example.com/gabriel/muse/push" self.ts = int(time.time()) self.header = _make_header(self.signing, "POST", self.url, b"", self.ts) def _args(self, **kwargs: bool | int | str | None) -> argparse.Namespace: defaults = dict( header=self.header, method="POST", url=self.url, public_key_b64=self.pub_b64, body=None, body_file=None, max_age=30, json_out=False, ) defaults.update(kwargs) return argparse.Namespace(**defaults) def test_valid_header_exits_0(self) -> None: from muse.cli.commands.sign import run_verify args = self._args() # Should not raise SystemExit. run_verify(args) def test_valid_header_json_output(self) -> None: from muse.cli.commands.sign import run_verify args = self._args(json_out=True) with patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_verify(args) data = json.loads(mock_out.getvalue()) assert data["valid"] is True assert data["reason"] == "ok" def test_wrong_public_key_exits_1(self) -> None: from muse.cli.commands.sign import run_verify other = _make_signing("other") wrong_pub = _public_key_b64(other) args = self._args(public_key_b64=wrong_pub) with self.assertRaises(SystemExit) as cm: run_verify(args) assert cm.exception.code == 1 def test_wrong_public_key_json_valid_false(self) -> None: from muse.cli.commands.sign import run_verify other = _make_signing("other") wrong_pub = _public_key_b64(other) args = self._args(public_key_b64=wrong_pub, json_out=True) with patch("sys.stdout", new_callable=io.StringIO) as mock_out: with self.assertRaises(SystemExit): run_verify(args) data = json.loads(mock_out.getvalue()) assert data["valid"] is False assert "reason" in data def test_expired_timestamp_exits_1(self) -> None: from muse.cli.commands.sign import run_verify old_ts = int(time.time()) - 9999 old_header = _make_header(self.signing, "POST", self.url, b"", old_ts) args = self._args(header=old_header, max_age=30) with self.assertRaises(SystemExit) as cm: run_verify(args) assert cm.exception.code == 1 def test_malformed_header_exits_1(self) -> None: from muse.cli.commands.sign import run_verify args = self._args(header="Bearer garbage", json_out=True) with patch("sys.stdout", new_callable=io.StringIO) as mock_out: with self.assertRaises(SystemExit) as cm: run_verify(args) assert cm.exception.code == 1 data = json.loads(mock_out.getvalue()) assert data["valid"] is False def test_method_mismatch_exits_1(self) -> None: """A header signed for POST must not verify for GET.""" from muse.cli.commands.sign import run_verify args = self._args(method="GET") with self.assertRaises(SystemExit) as cm: run_verify(args) assert cm.exception.code == 1 def test_body_mismatch_exits_1(self) -> None: """Changing the body after signing must invalidate the header.""" from muse.cli.commands.sign import run_verify args = self._args(body="tampered") with self.assertRaises(SystemExit) as cm: run_verify(args) assert cm.exception.code == 1 def test_custom_max_age_accepted(self) -> None: """A very old timestamp is accepted if max_age is large enough.""" from muse.cli.commands.sign import run_verify old_ts = int(time.time()) - 9999 old_header = _make_header(self.signing, "POST", self.url, b"", old_ts) args = self._args(header=old_header, max_age=99999) # Should not raise. run_verify(args) # --------------------------------------------------------------------------- # run_curl # --------------------------------------------------------------------------- class TestRunCurl(unittest.TestCase): def _args(self, **kwargs: bool | int | str | None) -> argparse.Namespace: defaults = dict( method="POST", url="https://hub.example.com/gabriel/muse/push", hub=None, body=None, body_file=None, content_type="application/json", timestamp=1744000000, key_path=None, agent_id=None, json_out=False, ) defaults.update(kwargs) return argparse.Namespace(**defaults) def test_curl_starts_with_curl(self) -> None: from muse.cli.commands.sign import run_curl signing = _make_signing() args = self._args() with patch("muse.cli.commands.sign._load_signing", return_value=signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_curl(args) output = mock_out.getvalue() assert output.strip().startswith("curl -X POST") def test_authorization_header_embedded(self) -> None: """The curl command must include the MSign Authorization header.""" from muse.cli.commands.sign import run_curl signing = _make_signing() args = self._args() with patch("muse.cli.commands.sign._load_signing", return_value=signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_curl(args) output = mock_out.getvalue() assert "Authorization: MSign" in output def test_url_appears_in_output(self) -> None: from muse.cli.commands.sign import run_curl signing = _make_signing() url = "https://hub.example.com/gabriel/muse/push" args = self._args(url=url) with patch("muse.cli.commands.sign._load_signing", return_value=signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_curl(args) assert url in mock_out.getvalue() def test_body_file_uses_data_binary(self) -> None: """--body-file must produce --data-binary @filename.""" from muse.cli.commands.sign import run_curl signing = _make_signing() args = self._args(body_file="/tmp/payload.bin") with patch("muse.cli.commands.sign._load_signing", return_value=signing), \ patch("muse.cli.commands.sign._read_body", return_value=b"payload"), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_curl(args) output = mock_out.getvalue() assert "--data-binary @/tmp/payload.bin" in output def test_inline_body_uses_data_flag(self) -> None: """--body STRING must produce --data '...'.""" from muse.cli.commands.sign import run_curl signing = _make_signing() args = self._args(body='{"key": "value"}') with patch("muse.cli.commands.sign._load_signing", return_value=signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_curl(args) output = mock_out.getvalue() assert "--data" in output assert '{"key": "value"}' in output def test_get_method(self) -> None: from muse.cli.commands.sign import run_curl signing = _make_signing() args = self._args(method="GET") with patch("muse.cli.commands.sign._load_signing", return_value=signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_curl(args) assert "curl -X GET" in mock_out.getvalue() # --------------------------------------------------------------------------- # run_payment # --------------------------------------------------------------------------- class TestRunPayment(unittest.TestCase): _NONCE = "a" * 64 # 64-char hex nonce def _args(self, **kwargs: bool | int | str | None) -> argparse.Namespace: defaults = dict( from_handle="alice", to_handle="bob", amount=1_000_000, nonce=self._NONCE, currency="nanoMUSE", memo="stem:sha256:abc123", hub=None, key_path=None, agent_id=None, timestamp=1744000000, json_out=True, ) defaults.update(kwargs) return argparse.Namespace(**defaults) def test_json_output_has_all_fields(self) -> None: from muse.cli.commands.sign import run_payment signing = _make_signing("alice") args = self._args() with patch("muse.cli.commands.sign._load_signing", return_value=signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_payment(args) data = json.loads(mock_out.getvalue()) assert data["from_handle"] == "alice" assert data["to_handle"] == "bob" assert data["amount_nano"] == 1_000_000 assert data["currency"] == "nanoMUSE" assert data["nonce_hex"] == self._NONCE assert data["memo"] == "stem:sha256:abc123" assert data["ts"] == 1744000000 assert "signature_b64" in data assert "canonical_message" in data def test_canonical_message_has_mpay_prefix(self) -> None: """Payment canonical message must start with 'MPAY' domain separator.""" from muse.cli.commands.sign import run_payment signing = _make_signing("alice") args = self._args() with patch("muse.cli.commands.sign._load_signing", return_value=signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_payment(args) data = json.loads(mock_out.getvalue()) assert data["canonical_message"].startswith("MPAY\n") def test_signature_is_verifiable(self) -> None: """The payment signature must verify against the signer's public key.""" from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from muse.cli.commands.sign import run_payment from muse.core.transport import SigningIdentity private_key = Ed25519PrivateKey.generate() signing = SigningIdentity(handle="alice", private_key=private_key) args = self._args() with patch("muse.cli.commands.sign._load_signing", return_value=signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_payment(args) data = json.loads(mock_out.getvalue()) sig_bytes = b64url_decode(data["signature_b64"]) msg = data["canonical_message"].encode() # Must not raise InvalidSignature. private_key.public_key().verify(sig_bytes, msg) def test_domain_separation_from_http_msign(self) -> None: """Payment signature must NOT verify against the HTTP MSign canonical message.""" from cryptography.exceptions import InvalidSignature from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from muse.cli.commands.sign import run_payment from muse.core.msign import canonical_message from muse.core.transport import SigningIdentity private_key = Ed25519PrivateKey.generate() signing = SigningIdentity(handle="alice", private_key=private_key) args = self._args() with patch("muse.cli.commands.sign._load_signing", return_value=signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_payment(args) data = json.loads(mock_out.getvalue()) sig_bytes = b64url_decode(data["signature_b64"]) # Try to verify the payment sig against an HTTP canonical message — must fail. http_msg = canonical_message("POST", "/alice/bob", 1744000000, b"", host="hub") with self.assertRaises(InvalidSignature): private_key.public_key().verify(sig_bytes, http_msg) def test_deterministic_at_fixed_timestamp(self) -> None: """Same key + same inputs + same ts → same signature every time.""" from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from muse.cli.commands.sign import run_payment from muse.core.transport import SigningIdentity private_key = Ed25519PrivateKey.generate() signing = SigningIdentity(handle="alice", private_key=private_key) args = self._args() sigs: list[str] = [] for _ in range(3): with patch("muse.cli.commands.sign._load_signing", return_value=signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out: run_payment(args) sigs.append(json.loads(mock_out.getvalue())["signature_b64"]) assert sigs[0] == sigs[1] == sigs[2], "Payment signatures must be deterministic" def test_text_output_prints_signature_to_stderr(self) -> None: """Text mode must print payment info (including signature) to stderr only.""" from muse.cli.commands.sign import run_payment signing = _make_signing("alice") args = self._args(json_out=False) with patch("muse.cli.commands.sign._load_signing", return_value=signing), \ patch("sys.stdout", new_callable=io.StringIO) as mock_out, \ patch("sys.stderr", new_callable=io.StringIO) as mock_err: run_payment(args) # stdout must be empty — signature no longer bleeds to stdout assert mock_out.getvalue() == "", f"stdout must be empty in text mode, got: {mock_out.getvalue()!r}" # stderr must contain the signature stderr = mock_err.getvalue() assert "Signature:" in stderr, f"'Signature:' missing from stderr: {stderr!r}" # --------------------------------------------------------------------------- # _load_signing — identity resolution # --------------------------------------------------------------------------- class TestLoadSigning(unittest.TestCase): def test_exits_1_when_no_identity(self) -> None: """When get_signing_identity returns None, must exit with code 1.""" from muse.cli.commands.sign import _load_signing with patch("muse.cli.commands.sign.get_signing_identity", return_value=None, create=True): # Patch the import inside _load_signing. with patch("muse.cli.config.get_signing_identity", return_value=None): with self.assertRaises(SystemExit) as cm: _load_signing(hub="https://hub.example.com") assert cm.exception.code == 1 # --------------------------------------------------------------------------- # _load_signing call-site signature — stale key_path positional arg # --------------------------------------------------------------------------- class TestLoadSigningCallSites(unittest.TestCase): """run_header / run_curl / run_payment must not pass stale key_path to _load_signing. When --key-path was removed from the parser, three call sites were left passing getattr(args, "key_path", None) as a positional arg. _load_signing(hub, agent_id=None) only accepts 2 params, so passing 3 raises TypeError at runtime. Coverage -------- CS-1 run_header does not TypeError when agent_id is set CS-2 run_curl does not TypeError when agent_id is set CS-3 run_payment does not TypeError when agent_id is set CS-4 agent_id is forwarded correctly by run_header (not lost to key_path slot) """ def _signing(self) -> "SigningIdentity": return _make_signing("gabriel") def test_CS1_run_header_no_type_error(self) -> None: """CS-1: run_header must not raise TypeError when key_path absent from args.""" from muse.cli.commands.sign import run_header args = argparse.Namespace( method="GET", path=None, url="https://hub.example.com/test", hub=None, body=None, body_file=None, timestamp=None, agent_id=None, json_out=False, ) with patch("muse.cli.config.get_signing_identity", return_value=self._signing()), \ patch("sys.stdout", new_callable=io.StringIO): run_header(args) # must not raise TypeError def test_CS2_run_curl_no_type_error(self) -> None: """CS-2: run_curl must not raise TypeError when key_path absent from args.""" from muse.cli.commands.sign import run_curl args = argparse.Namespace( method="GET", url="https://hub.example.com/test", hub=None, body=None, body_file=None, content_type="application/json", timestamp=None, agent_id=None, ) with patch("muse.cli.config.get_signing_identity", return_value=self._signing()), \ patch("sys.stdout", new_callable=io.StringIO): run_curl(args) # must not raise TypeError def test_CS3_run_payment_no_type_error(self) -> None: """CS-3: run_payment must not raise TypeError when key_path absent from args.""" from muse.cli.commands.sign import run_payment args = argparse.Namespace( hub=None, from_handle="alice", to_handle="bob", amount=1000000, nonce="ab" * 32, memo="", currency="nanoMUSE", timestamp=None, agent_id=None, json_out=True, ) with patch("muse.cli.config.get_signing_identity", return_value=self._signing()), \ patch("sys.stdout", new_callable=io.StringIO): run_payment(args) # must not raise TypeError def test_CS4_agent_id_forwarded_by_run_header(self) -> None: """CS-4: agent_id passed in args must reach get_signing_identity.""" from muse.cli.commands.sign import run_header args = argparse.Namespace( method="GET", path=None, url="https://hub.example.com/test", hub="https://hub.example.com", body=None, body_file=None, timestamp=None, agent_id="my-agent", json_out=False, ) captured_kwargs: list[dict] = [] def _fake_get_signing(remote_url: str | None = None, agent_id: str | None = None) -> "SigningIdentity": captured_kwargs.append({"remote_url": remote_url, "agent_id": agent_id}) return _make_signing("my-agent") with patch("muse.cli.config.get_signing_identity", side_effect=_fake_get_signing), \ patch("sys.stdout", new_callable=io.StringIO): run_header(args) assert captured_kwargs, "get_signing_identity was not called" assert captured_kwargs[0]["agent_id"] == "my-agent", ( f"agent_id not forwarded — got {captured_kwargs[0]['agent_id']!r}" ) if __name__ == "__main__": unittest.main()