"""TDD: Proposer Ed25519 signature on merge proposals. Every merge proposal must carry: - proposer_signature — ed25519: over the canonical PROPOSE message - proposer_public_key — ed25519: of the signing key Canonical PROPOSE message (UTF-8, LF line endings): PROPOSE\\n proposal_id: sha256:\\n repo_id: sha256:\\n from_branch: \\n to_branch: \\n author: \\n created_at: Acceptance criteria ------------------- T1 POST /proposals with a valid signature stores both fields, returns them in the response body as proposerSignature + proposerPublicKey. T2 GET /proposals/{id} returns proposerSignature + proposerPublicKey. T3 POST /proposals with a signature that doesn't verify against the supplied public key returns 422. T4 POST /proposals with a public key that doesn't match the authenticated identity's registered key returns 403. T5 POST /proposals without a signature still creates the proposal (nullable — existing flows must not break); both fields are null in the response. T6 canonical_propose_message() is deterministic — same inputs always produce the same bytes. T7 verify_proposer_signature() accepts a valid sig and raises SignatureError on a tampered message. """ from __future__ import annotations import base64 from datetime import datetime, timezone import pytest from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from musehub.core.genesis import compute_identity_id, compute_key_id, compute_repo_id from musehub.crypto.keys import key_fingerprint from musehub.db.musehub_auth_models import MusehubAuthKey from musehub.db.musehub_identity_models import MusehubIdentity from musehub.db.musehub_repo_models import MusehubBranch from musehub.proposals.signing import ( canonical_propose_message, verify_proposer_signature, ) from musehub.types.json_types import StrDict from muse.core.types import b64url_encode, b64url_decode # Identity ID that matches the conftest `auth_headers` fixture (testuser) _TEST_IDENTITY_ID = compute_identity_id(b"testuser") # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_keypair() -> tuple[Ed25519PrivateKey, str, bytes]: """Return (private_key, 'ed25519:', pub_bytes) for tests.""" priv = Ed25519PrivateKey.generate() pub_bytes = priv.public_key().public_bytes_raw() pub_str = f"ed25519:{b64url_encode(pub_bytes)}" return priv, pub_str, pub_bytes async def _register_key(db: AsyncSession, identity_id: str, pub_bytes: bytes) -> str: """Insert a MusehubAuthKey row and return the 'ed25519:' public key string.""" key_b64 = b64url_encode(pub_bytes) key_id = compute_key_id(identity_id, key_b64) fingerprint = key_fingerprint(pub_bytes) row = MusehubAuthKey( key_id=key_id, identity_id=identity_id, public_key_b64=key_b64, fingerprint=fingerprint, algorithm="ed25519", label="test-key", ) db.add(row) await db.commit() return f"ed25519:{key_b64}" def _sign_propose(priv: Ed25519PrivateKey, message: bytes) -> str: """Return 'ed25519:' signature over message.""" sig_bytes = priv.sign(message) return f"ed25519:{b64url_encode(sig_bytes)}" async def _create_repo(client: AsyncClient, auth_headers: StrDict, name: str) -> str: r = await client.post( "/api/repos", json={"name": name, "owner": "testuser", "initialize": False}, headers=auth_headers, ) assert r.status_code == 201 return str(r.json()["repoId"]) async def _push_branch(db: AsyncSession, repo_id: str, branch_name: str) -> None: from musehub.core.genesis import compute_branch_id branch = MusehubBranch( branch_id=compute_branch_id(repo_id, branch_name), repo_id=repo_id, name=branch_name, head_commit_id=None, ) db.add(branch) await db.commit() # --------------------------------------------------------------------------- # T6 — canonical_propose_message determinism (unit, no DB) # --------------------------------------------------------------------------- def test_canonical_propose_message_is_deterministic() -> None: """Same inputs always produce identical bytes.""" kwargs = dict( proposal_id="sha256:" + "a" * 64, repo_id="sha256:" + "b" * 64, from_branch="feat/x", to_branch="dev", author="gabriel", created_at=datetime(2026, 5, 8, 19, 30, 34, tzinfo=timezone.utc), ) assert canonical_propose_message(**kwargs) == canonical_propose_message(**kwargs) def test_canonical_propose_message_format() -> None: """Message starts with PROPOSE domain prefix and contains all fields.""" created_at = datetime(2026, 5, 8, 19, 30, 34, tzinfo=timezone.utc) msg = canonical_propose_message( proposal_id="sha256:" + "a" * 64, repo_id="sha256:" + "b" * 64, from_branch="feat/identity-v2", to_branch="dev", author="gabriel", created_at=created_at, ) text = msg.decode("utf-8") assert text.startswith("PROPOSE\n") assert "proposal_id: sha256:" in text assert "from_branch: feat/identity-v2" in text assert "author: gabriel" in text assert "created_at: 2026-05-08T19:30:34+00:00" in text # --------------------------------------------------------------------------- # T7 — verify_proposer_signature (unit, no DB) # --------------------------------------------------------------------------- def test_verify_proposer_signature_accepts_valid() -> None: priv, pub_str, _ = _make_keypair() msg = b"PROPOSE\ntest message" sig_str = _sign_propose(priv, msg) verify_proposer_signature(message=msg, signature=sig_str, public_key=pub_str) def test_verify_proposer_signature_rejects_tampered_message() -> None: from musehub.crypto.keys import SignatureError priv, pub_str, _ = _make_keypair() msg = b"PROPOSE\ntest message" sig_str = _sign_propose(priv, msg) with pytest.raises(SignatureError): verify_proposer_signature( message=b"PROPOSE\ntampered message", signature=sig_str, public_key=pub_str, ) def test_verify_proposer_signature_rejects_wrong_key() -> None: from musehub.crypto.keys import SignatureError priv, _, _ = _make_keypair() _, pub_str_other, _ = _make_keypair() msg = b"PROPOSE\ntest message" sig_str = _sign_propose(priv, msg) with pytest.raises(SignatureError): verify_proposer_signature(message=msg, signature=sig_str, public_key=pub_str_other) # --------------------------------------------------------------------------- # T5 — proposal without signature still creates (backwards compat) # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_create_proposal_without_signature_succeeds( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: repo_id = await _create_repo(client, auth_headers, "propose-no-sig-repo") await _push_branch(db_session, repo_id, "feat/no-sig") r = await client.post( f"/api/repos/{repo_id}/proposals", json={"title": "No sig proposal", "fromBranch": "feat/no-sig", "toBranch": "main"}, headers=auth_headers, ) assert r.status_code == 201 body = r.json() assert body.get("proposerSignature") is None assert body.get("proposerPublicKey") is None # --------------------------------------------------------------------------- # T1 — POST with valid signature stores and returns both fields # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_create_proposal_with_valid_signature( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: repo_id = await _create_repo(client, auth_headers, "propose-sig-repo") await _push_branch(db_session, repo_id, "feat/signed") priv, _, pub_bytes = _make_keypair() pub_str = await _register_key(db_session, _TEST_IDENTITY_ID, pub_bytes) import datetime as dt client_ts = dt.datetime.now(tz=dt.timezone.utc).isoformat() pre_image = canonical_propose_message( repo_id=repo_id, from_branch="feat/signed", to_branch="main", author="testuser", created_at=dt.datetime.fromisoformat(client_ts), ) sig_str = _sign_propose(priv, pre_image) r = await client.post( f"/api/repos/{repo_id}/proposals", json={ "title": "Signed proposal", "fromBranch": "feat/signed", "toBranch": "main", "proposerPublicKey": pub_str, "proposerSignature": sig_str, "proposerTimestamp": client_ts, }, headers=auth_headers, ) assert r.status_code == 201 body = r.json() assert body["proposerSignature"] == sig_str assert body["proposerPublicKey"] == pub_str # --------------------------------------------------------------------------- # T2 — GET returns stored signature fields # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_get_proposal_returns_signature_fields( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: repo_id = await _create_repo(client, auth_headers, "propose-get-sig-repo") await _push_branch(db_session, repo_id, "feat/get-sig") priv, _, pub_bytes = _make_keypair() pub_str = await _register_key(db_session, _TEST_IDENTITY_ID, pub_bytes) import datetime as dt client_ts = dt.datetime.now(tz=dt.timezone.utc).isoformat() pre_image = canonical_propose_message( repo_id=repo_id, from_branch="feat/get-sig", to_branch="main", author="testuser", created_at=dt.datetime.fromisoformat(client_ts), ) sig_str = _sign_propose(priv, pre_image) create_r = await client.post( f"/api/repos/{repo_id}/proposals", json={ "title": "Get sig test", "fromBranch": "feat/get-sig", "toBranch": "main", "proposerPublicKey": pub_str, "proposerSignature": sig_str, "proposerTimestamp": client_ts, }, headers=auth_headers, ) assert create_r.status_code == 201 proposal_id = create_r.json()["proposalId"] get_r = await client.get( f"/api/repos/{repo_id}/proposals/{proposal_id}", headers=auth_headers, ) assert get_r.status_code == 200 body = get_r.json() assert body["proposerSignature"] == sig_str assert body["proposerPublicKey"] == pub_str # --------------------------------------------------------------------------- # T3 — signature that doesn't verify returns 422 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_create_proposal_bad_signature_returns_422( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: repo_id = await _create_repo(client, auth_headers, "propose-badsig-repo") await _push_branch(db_session, repo_id, "feat/bad-sig") priv, _, pub_bytes = _make_keypair() pub_str = await _register_key(db_session, _TEST_IDENTITY_ID, pub_bytes) import datetime as dt client_ts = dt.datetime.now(tz=dt.timezone.utc).isoformat() # Sign the wrong message — key is registered but sig won't verify wrong_sig = _sign_propose(priv, b"PROPOSE\nwrong content") r = await client.post( f"/api/repos/{repo_id}/proposals", json={ "title": "Bad sig", "fromBranch": "feat/bad-sig", "toBranch": "main", "proposerPublicKey": pub_str, "proposerSignature": wrong_sig, "proposerTimestamp": client_ts, }, headers=auth_headers, ) assert r.status_code == 422 # --------------------------------------------------------------------------- # T4 — public key not matching registered identity key returns 403 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_create_proposal_unregistered_key_returns_403( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: repo_id = await _create_repo(client, auth_headers, "propose-unreg-key-repo") await _push_branch(db_session, repo_id, "feat/unreg-key") # Generate a key that is NOT registered to testuser priv, pub_str, _ = _make_keypair() import datetime as dt client_ts = dt.datetime.now(tz=dt.timezone.utc).isoformat() pre_image = ( f"PROPOSE\nrepo_id: {repo_id}\nfrom_branch: feat/unreg-key\n" f"to_branch: main\nauthor: testuser\ncreated_at: {client_ts}" ).encode("utf-8") sig_str = _sign_propose(priv, pre_image) r = await client.post( f"/api/repos/{repo_id}/proposals", json={ "title": "Unregistered key", "fromBranch": "feat/unreg-key", "toBranch": "main", "proposerPublicKey": pub_str, "proposerSignature": sig_str, "proposerTimestamp": client_ts, }, headers=auth_headers, ) assert r.status_code == 403