"""Comprehensive tests for muse.core.msign — MSign signing primitives. Coverage -------- Unit - canonical_message: format, body hash, empty body, query string, host, alg - build_msign_header: structure, format, alg field, host extraction - parse_msign_header: valid headers, all error paths, alg field - verify_msign_header: valid round-trip, tampered body, expired timestamp, wrong key, bad public key, bad signature, replay window edge cases - build_payment_claim: structure, canonical message format, chain linkage Data integrity - Ed25519 is deterministic: same key+message+ts → same signature (RFC 8032) - canonical_message is byte-exact (regression against known test vectors) - Empty body → SHA-256 of b"" (not of "null", "{}", or anything else) - Body hash covers raw bytes, not re-serialized JSON - Path includes query string when present - Host is included and normalised (standard ports stripped) - Algorithm is the first field in canonical message Performance - 10 000 sequential build_msign_header calls in < 2 s - build_msign_header has no I/O on the hot path Security - verify rejects tampered body - verify rejects expired timestamp (> max_age seconds) - verify rejects future timestamp (> max_age seconds ahead) - verify rejects signature from a different key - verify rejects truncated signature - verify rejects garbage header - different host → different signature (host is in canonical) """ from __future__ import annotations import hashlib import time from typing import NamedTuple from muse.core.types import b64url_decode, b64url_encode import pytest from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey # --------------------------------------------------------------------------- # Fixtures and helpers # --------------------------------------------------------------------------- class _Identity(NamedTuple): handle: str private_key: Ed25519PrivateKey public_key_b64: str def _make_identity(handle: str = "testuser") -> _Identity: pk = Ed25519PrivateKey.generate() pub_bytes = pk.public_key().public_bytes_raw() pub_b64 = b64url_encode(pub_bytes) return _Identity(handle=handle, private_key=pk, public_key_b64=pub_b64) # Fixed seed for determinism tests — do NOT change. _KNOWN_SEED = bytes(range(32)) def _known_identity() -> _Identity: pk = Ed25519PrivateKey.from_private_bytes(_KNOWN_SEED) pub_bytes = pk.public_key().public_bytes_raw() pub_b64 = b64url_encode(pub_bytes) return _Identity(handle="gabriel", private_key=pk, public_key_b64=pub_b64) # --------------------------------------------------------------------------- # Unit: canonical_message # --------------------------------------------------------------------------- class TestCanonicalMessage: def test_format(self) -> None: from muse.core.msign import canonical_message msg = canonical_message( "POST", "/gabriel/muse/push", 1744000000, b"hello", host="staging.musehub.ai", ) body_hash = "sha256:" + hashlib.sha256(b"hello").hexdigest() expected = f"ed25519\nPOST\nstaging.musehub.ai\n/gabriel/muse/push\n1744000000\n{body_hash}" assert msg == expected.encode() def test_empty_body_uses_empty_sha256(self) -> None: from muse.core.msign import canonical_message, EMPTY_BODY_HASH msg = canonical_message("GET", "/x", 1, b"", host="hub.example.com") assert EMPTY_BODY_HASH in msg.decode() assert "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" in msg.decode() def test_query_string_included(self) -> None: from muse.core.msign import canonical_message msg = canonical_message( "GET", "/search?q=foo&page=2", 1, b"", host="hub.example.com" ) assert b"/search?q=foo&page=2" in msg def test_method_uppercase(self) -> None: from muse.core.msign import canonical_message msg = canonical_message("DELETE", "/x", 1, b"", host="hub.example.com") parts = msg.decode().split("\n") assert parts[1] == "DELETE" def test_body_hash_covers_raw_bytes_not_repr(self) -> None: from muse.core.msign import canonical_message body = b'{"key": "value"}' expected = "sha256:" + hashlib.sha256(body).hexdigest() msg = canonical_message("POST", "/x", 1, body, host="h") assert expected in msg.decode() assert "sha256:" + hashlib.sha256(repr(body).encode()).hexdigest() not in msg.decode() def test_six_newline_separated_fields(self) -> None: from muse.core.msign import canonical_message msg = canonical_message("POST", "/path", 12345, b"body", host="localhost:1337") parts = msg.decode().split("\n") assert len(parts) == 6 assert parts[0] == "ed25519" assert parts[1] == "POST" assert parts[2] == "localhost:1337" assert parts[3] == "/path" assert parts[4] == "12345" assert parts[5] == "sha256:" + hashlib.sha256(b"body").hexdigest() def test_algorithm_is_first_field(self) -> None: from muse.core.msign import canonical_message msg = canonical_message("GET", "/x", 1, b"", host="h") assert msg.startswith(b"ed25519\n") def test_host_in_canonical(self) -> None: from muse.core.msign import canonical_message msg = canonical_message("GET", "/x", 1, b"", host="staging.musehub.ai") assert b"staging.musehub.ai" in msg def test_different_hosts_different_output(self) -> None: from muse.core.msign import canonical_message m1 = canonical_message("GET", "/x", 1, b"", host="host-a.com") m2 = canonical_message("GET", "/x", 1, b"", host="host-b.com") assert m1 != m2 def test_custom_algorithm_field(self) -> None: from muse.core.msign import canonical_message msg = canonical_message("GET", "/x", 1, b"", host="h", algorithm="ed25519-v2") assert msg.startswith(b"ed25519-v2\n") # --------------------------------------------------------------------------- # Unit: build_msign_header # --------------------------------------------------------------------------- class TestBuildMsignHeader: def test_format(self) -> None: from muse.core.msign import build_msign_header identity = _make_identity("gabriel") url = "https://staging.musehub.ai/gabriel/muse/push" header = build_msign_header(identity, "POST", url, b"body", ts=1744000000) assert header.startswith('MSign handle="gabriel" alg="ed25519" ts=1744000000 sig="') assert header.endswith('"') def test_contains_all_components(self) -> None: from muse.core.msign import build_msign_header identity = _make_identity("alice") header = build_msign_header(identity, "GET", "https://hub.example.com/x", ts=9999) assert 'handle="alice"' in header assert 'alg="ed25519"' in header assert "ts=9999" in header assert 'sig="' in header def test_sig_is_base64url_no_padding(self) -> None: from muse.core.msign import build_msign_header identity = _make_identity() header = build_msign_header(identity, "POST", "https://hub.example.com/x", b"", ts=1) sig = header.split('sig="')[1].rstrip('"') assert "=" not in sig assert "+" not in sig assert "/" not in sig decoded = b64url_decode(sig) assert len(decoded) == 64 def test_none_body_treated_as_empty(self) -> None: from muse.core.msign import build_msign_header identity = _make_identity() h1 = build_msign_header(identity, "GET", "https://hub.example.com/x", None, ts=1) h2 = build_msign_header(identity, "GET", "https://hub.example.com/x", b"", ts=1) assert h1 == h2 def test_url_query_string_included_in_signing(self) -> None: from muse.core.msign import build_msign_header identity = _make_identity() h1 = build_msign_header(identity, "GET", "https://hub.example.com/x?a=1", ts=1) h2 = build_msign_header(identity, "GET", "https://hub.example.com/x?a=2", ts=1) assert h1 != h2 def test_standard_https_port_stripped(self) -> None: """Port 443 on https must be stripped from host in canonical message.""" from muse.core.msign import build_msign_header identity = _make_identity() # With explicit :443 and without should produce identical signatures. h1 = build_msign_header(identity, "GET", "https://hub.example.com:443/x", ts=1) h2 = build_msign_header(identity, "GET", "https://hub.example.com/x", ts=1) assert h1 == h2 def test_standard_http_port_stripped(self) -> None: from muse.core.msign import build_msign_header identity = _make_identity() h1 = build_msign_header(identity, "GET", "http://hub.example.com:80/x", ts=1) h2 = build_msign_header(identity, "GET", "http://hub.example.com/x", ts=1) assert h1 == h2 def test_nonstandard_port_kept(self) -> None: """localhost:1337 is non-standard — the port must stay in the canonical host.""" from muse.core.msign import build_msign_header identity = _make_identity() h1 = build_msign_header(identity, "GET", "https://localhost:1337/x", ts=1) h2 = build_msign_header(identity, "GET", "http://localhost/x", ts=1) assert h1 != h2 def test_different_hosts_produce_different_sigs(self) -> None: from muse.core.msign import build_msign_header identity = _make_identity() h1 = build_msign_header(identity, "GET", "https://host-a.example.com/x", ts=1) h2 = build_msign_header(identity, "GET", "https://host-b.example.com/x", ts=1) assert h1 != h2 # --------------------------------------------------------------------------- # Data integrity: Ed25519 determinism (RFC 8032) # --------------------------------------------------------------------------- class TestDeterminism: def test_same_inputs_same_signature(self) -> None: from muse.core.msign import build_msign_header identity = _known_identity() url = "https://staging.musehub.ai/path?q=1" body = b"fixed body" results = [ build_msign_header(identity, "POST", url, body, ts=1000) for _ in range(10) ] assert len(set(results)) == 1, "Ed25519 must be deterministic" def test_different_ts_different_signature(self) -> None: from muse.core.msign import build_msign_header identity = _known_identity() h1 = build_msign_header(identity, "POST", "https://hub.example.com/x", b"", ts=1) h2 = build_msign_header(identity, "POST", "https://hub.example.com/x", b"", ts=2) assert h1 != h2 def test_different_body_different_signature(self) -> None: from muse.core.msign import build_msign_header identity = _known_identity() h1 = build_msign_header(identity, "POST", "https://hub.example.com/x", b"aaa", ts=1) h2 = build_msign_header(identity, "POST", "https://hub.example.com/x", b"bbb", ts=1) assert h1 != h2 def test_known_vector(self) -> None: """Regression: known seed → known signature (catches canonical_message drift).""" import urllib.parse from muse.core.msign import build_msign_header, canonical_message, _normalise_host identity = _known_identity() ts = 1744000000 url = "https://staging.musehub.ai/gabriel/muse/push" body = b"" parsed = urllib.parse.urlparse(url) path = parsed.path host = _normalise_host(parsed) msg = canonical_message("POST", path, ts, body, host=host) sig_bytes = identity.private_key.sign(msg) expected_sig = b64url_encode(sig_bytes) header = build_msign_header(identity, "POST", url, body, ts=ts) actual_sig = header.split('sig="')[1].rstrip('"') assert actual_sig == expected_sig # --------------------------------------------------------------------------- # Unit: parse_msign_header # --------------------------------------------------------------------------- class TestParseMsignHeader: def test_valid_header(self) -> None: from muse.core.msign import parse_msign_header h = 'MSign handle="gabriel" alg="ed25519" ts=1744000000 sig="aBcDeFg"' parsed = parse_msign_header(h) assert parsed["handle"] == "gabriel" assert parsed["alg"] == "ed25519" assert parsed["ts"] == 1744000000 assert parsed["sig"] == "aBcDeFg" def test_full_authorization_prefix(self) -> None: from muse.core.msign import parse_msign_header h = 'Authorization: MSign handle="alice" alg="ed25519" ts=1 sig="xyz"' parsed = parse_msign_header(h) assert parsed["handle"] == "alice" assert parsed["alg"] == "ed25519" def test_invalid_raises_value_error(self) -> None: from muse.core.msign import parse_msign_header with pytest.raises(ValueError, match="Not a valid MSign header"): parse_msign_header("Bearer token123") def test_old_four_field_format_raises(self) -> None: """Old format without alg must be rejected.""" from muse.core.msign import parse_msign_header with pytest.raises(ValueError): parse_msign_header('MSign handle="x" ts=42 sig="s"') def test_empty_raises_value_error(self) -> None: from muse.core.msign import parse_msign_header with pytest.raises(ValueError): parse_msign_header("") def test_ts_is_int(self) -> None: from muse.core.msign import parse_msign_header parsed = parse_msign_header('MSign handle="x" alg="ed25519" ts=42 sig="s"') assert isinstance(parsed["ts"], int) def test_alg_field_present(self) -> None: from muse.core.msign import parse_msign_header parsed = parse_msign_header('MSign handle="x" alg="ed25519" ts=1 sig="abc"') assert "alg" in parsed assert parsed["alg"] == "ed25519" def test_roundtrip_with_build(self) -> None: from muse.core.msign import build_msign_header, parse_msign_header identity = _make_identity("gabriel") header = build_msign_header( identity, "POST", "https://staging.musehub.ai/gabriel/muse/push", b"body", ts=1744000000, ) parsed = parse_msign_header(header) assert parsed["handle"] == "gabriel" assert parsed["alg"] == "ed25519" assert parsed["ts"] == 1744000000 # --------------------------------------------------------------------------- # Unit: verify_msign_header # --------------------------------------------------------------------------- class TestVerifyMsignHeader: def _sign_and_header( self, identity: _Identity, method: str = "POST", url: str = "https://hub.example.com/path", body: bytes = b"", ts: int = 1744000000, ) -> str: from muse.core.msign import build_msign_header return build_msign_header(identity, method, url, body, ts=ts) def test_valid_round_trip(self) -> None: from muse.core.msign import verify_msign_header identity = _make_identity("gabriel") url = "https://staging.musehub.ai/gabriel/muse/push" body = b"some body content" ts = int(time.time()) header = self._sign_and_header(identity, "POST", url, body, ts=ts) ok, reason = verify_msign_header( header, "POST", url, body, identity.public_key_b64, max_age=300, now=ts, ) assert ok, f"Expected valid, got: {reason}" assert reason == "ok" def test_tampered_body_rejected(self) -> None: from muse.core.msign import verify_msign_header identity = _make_identity() url = "https://hub.example.com/x" ts = int(time.time()) header = self._sign_and_header(identity, "POST", url, b"original", ts=ts) ok, reason = verify_msign_header( header, "POST", url, b"TAMPERED", identity.public_key_b64, max_age=300, now=ts, ) assert not ok assert "signature" in reason.lower() or "failed" in reason.lower() def test_expired_timestamp_rejected(self) -> None: from muse.core.msign import verify_msign_header identity = _make_identity() url = "https://hub.example.com/x" ts = 1000 header = self._sign_and_header(identity, "POST", url, b"", ts=ts) ok, reason = verify_msign_header( header, "POST", url, b"", identity.public_key_b64, max_age=30, now=ts + 60, ) assert not ok assert "replay window" in reason def test_future_timestamp_rejected(self) -> None: from muse.core.msign import verify_msign_header identity = _make_identity() url = "https://hub.example.com/x" ts = 9_000_000_000 header = self._sign_and_header(identity, "POST", url, b"", ts=ts) ok, reason = verify_msign_header( header, "POST", url, b"", identity.public_key_b64, max_age=30, now=int(time.time()), ) assert not ok assert "replay window" in reason def test_timestamp_at_edge_of_window_accepted(self) -> None: from muse.core.msign import verify_msign_header identity = _make_identity() url = "https://hub.example.com/x" ts = 1000 header = self._sign_and_header(identity, "POST", url, b"", ts=ts) ok, _ = verify_msign_header( header, "POST", url, b"", identity.public_key_b64, max_age=30, now=ts + 30, ) assert ok def test_wrong_key_rejected(self) -> None: from muse.core.msign import verify_msign_header signer = _make_identity("alice") verifier = _make_identity("bob") url = "https://hub.example.com/x" ts = int(time.time()) header = self._sign_and_header(signer, "POST", url, b"", ts=ts) ok, reason = verify_msign_header( header, "POST", url, b"", verifier.public_key_b64, max_age=300, now=ts, ) assert not ok assert "signature" in reason.lower() or "failed" in reason.lower() def test_bad_public_key_rejected(self) -> None: from muse.core.msign import verify_msign_header identity = _make_identity() url = "https://hub.example.com/x" ts = int(time.time()) header = self._sign_and_header(identity, "POST", url, b"", ts=ts) ok, reason = verify_msign_header( header, "POST", url, b"", "not-valid-base64!!!", max_age=300, now=ts, ) assert not ok assert "public key" in reason.lower() or "invalid" in reason.lower() def test_garbage_header_rejected(self) -> None: from muse.core.msign import verify_msign_header identity = _make_identity() ok, reason = verify_msign_header( "Bearer totally-not-msign", "POST", "https://hub.example.com/x", b"", identity.public_key_b64, max_age=30, ) assert not ok assert "Not a valid MSign header" in reason def test_none_body_same_as_empty(self) -> None: from muse.core.msign import build_msign_header, verify_msign_header identity = _make_identity() url = "https://hub.example.com/x" ts = int(time.time()) header = build_msign_header(identity, "GET", url, None, ts=ts) ok, _ = verify_msign_header( header, "GET", url, None, identity.public_key_b64, max_age=300, now=ts, ) assert ok def test_different_method_rejected(self) -> None: from muse.core.msign import verify_msign_header identity = _make_identity() url = "https://hub.example.com/x" ts = int(time.time()) header = self._sign_and_header(identity, "POST", url, b"", ts=ts) ok, _ = verify_msign_header( header, "GET", url, b"", identity.public_key_b64, max_age=300, now=ts, ) assert not ok def test_different_url_rejected(self) -> None: from muse.core.msign import verify_msign_header identity = _make_identity() ts = int(time.time()) header = self._sign_and_header(identity, "POST", "https://hub.example.com/push", b"", ts=ts) ok, _ = verify_msign_header( header, "POST", "https://hub.example.com/pull", b"", identity.public_key_b64, max_age=300, now=ts, ) assert not ok def test_different_host_rejected(self) -> None: """Signature for host-a must not verify against host-b.""" from muse.core.msign import verify_msign_header identity = _make_identity() ts = int(time.time()) header = self._sign_and_header( identity, "POST", "https://host-a.example.com/x", b"", ts=ts ) ok, _ = verify_msign_header( header, "POST", "https://host-b.example.com/x", b"", identity.public_key_b64, max_age=300, now=ts, ) assert not ok def test_localhost_nonstandard_port_in_canonical(self) -> None: """localhost:1337 must be in canonical — round-trip must pass.""" from muse.core.msign import verify_msign_header identity = _make_identity("gabriel") url = "https://localhost:1337/gabriel/muse/push" ts = int(time.time()) header = self._sign_and_header(identity, "POST", url, b"payload", ts=ts) ok, reason = verify_msign_header( header, "POST", url, b"payload", identity.public_key_b64, max_age=300, now=ts, ) assert ok, f"Expected valid, got: {reason}" # --------------------------------------------------------------------------- # Unit: build_payment_claim # --------------------------------------------------------------------------- class TestBuildPaymentClaim: def test_structure(self) -> None: from muse.core.msign import build_payment_claim identity = _make_identity("gabriel") claim = build_payment_claim( identity, from_handle="gabriel", to_handle="stori-node-1", amount_nano=1_000_000, currency="nanoMUSE", nonce_hex="a" * 64, memo="stem:sha256:abc123", ts=1744000000, ) assert claim["from_handle"] == "gabriel" assert claim["to_handle"] == "stori-node-1" assert claim["amount_nano"] == 1_000_000 assert claim["currency"] == "nanoMUSE" assert claim["nonce_hex"] == "a" * 64 assert claim["memo"] == "stem:sha256:abc123" assert claim["ts"] == 1744000000 assert "signature_b64" in claim assert "canonical_message" in claim def test_canonical_message_format(self) -> None: from muse.core.msign import build_payment_claim identity = _make_identity() claim = build_payment_claim( identity, "alice", "bob", 500, "nanoMUSE", "ff" * 32, "memo", ts=1 ) msg = claim["canonical_message"] assert msg.startswith("MPAY\n") parts = msg.split("\n") assert parts[0] == "MPAY" assert parts[1] == "alice" assert parts[2] == "bob" assert parts[3] == "500" assert parts[4] == "nanoMUSE" assert parts[6] == "memo" assert parts[7] == "1" def test_domain_separation_from_http_signing(self) -> None: """MPAY canonical message must differ from MSign canonical message.""" from muse.core.msign import build_msign_header, build_payment_claim identity = _make_identity() ts = 1744000000 http_header = build_msign_header( identity, "POST", "https://hub.example.com/alice", b"", ts=ts ) claim = build_payment_claim( identity, "alice", "bob", 100, "nanoMUSE", "00" * 32, "", ts=ts ) http_sig = http_header.split('sig="')[1].rstrip('"') pay_sig = claim["signature_b64"] assert http_sig != pay_sig def test_deterministic(self) -> None: from muse.core.msign import build_payment_claim identity = _known_identity() claims = [ build_payment_claim(identity, "a", "b", 1, "nanoMUSE", "0" * 64, "", ts=42) for _ in range(5) ] sigs = [c["signature_b64"] for c in claims] assert len(set(sigs)) == 1 def test_chain_linkage(self) -> None: from muse.core.msign import build_payment_claim identity = _make_identity() claim1 = build_payment_claim( identity, "gabriel", "node1", 100, "nanoMUSE", "0" * 64, "first", ts=1 ) nonce2 = hashlib.sha256(claim1["signature_b64"].encode()).hexdigest() claim2 = build_payment_claim( identity, "gabriel", "node1", 100, "nanoMUSE", nonce2, "second", ts=2 ) assert claim2["nonce_hex"] == nonce2 assert claim2["signature_b64"] != claim1["signature_b64"] # --------------------------------------------------------------------------- # Performance # --------------------------------------------------------------------------- class TestPerformance: def test_10000_sequential_header_calls_under_2s(self) -> None: from muse.core.msign import build_msign_header identity = _known_identity() url = "https://staging.musehub.ai/gabriel/muse/push" body = b"benchmark body" start = time.perf_counter() for i in range(10_000): build_msign_header(identity, "POST", url, body, ts=i) elapsed = time.perf_counter() - start assert elapsed < 2.0, ( f"10 000 build_msign_header calls took {elapsed:.2f}s — expected < 2s. " "Check for unexpected I/O or import overhead on the hot path." )