gabriel / musehub public

test_proposal_proposer_signature.py file-level

at sha256:3 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
1 """TDD: Proposer Ed25519 signature on merge proposals.
2
3 Every merge proposal must carry:
4 - proposer_signature — ed25519:<base64url> over the canonical PROPOSE message
5 - proposer_public_key — ed25519:<base64url> of the signing key
6
7 Canonical PROPOSE message (UTF-8, LF line endings):
8 PROPOSE\\n
9 proposal_id: sha256:<hex>\\n
10 repo_id: sha256:<hex>\\n
11 from_branch: <name>\\n
12 to_branch: <name>\\n
13 author: <handle>\\n
14 created_at: <ISO-8601 UTC>
15
16 Acceptance criteria
17 -------------------
18 T1 POST /proposals with a valid signature stores both fields, returns them in
19 the response body as proposerSignature + proposerPublicKey.
20 T2 GET /proposals/{id} returns proposerSignature + proposerPublicKey.
21 T3 POST /proposals with a signature that doesn't verify against the supplied
22 public key returns 422.
23 T4 POST /proposals with a public key that doesn't match the authenticated
24 identity's registered key returns 403.
25 T5 POST /proposals without a signature still creates the proposal (nullable —
26 existing flows must not break); both fields are null in the response.
27 T6 canonical_propose_message() is deterministic — same inputs always produce
28 the same bytes.
29 T7 verify_proposer_signature() accepts a valid sig and raises SignatureError
30 on a tampered message.
31 """
32 from __future__ import annotations
33
34 import base64
35 from datetime import datetime, timezone
36
37 import pytest
38 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
39 from httpx import AsyncClient
40 from sqlalchemy.ext.asyncio import AsyncSession
41
42 from musehub.core.genesis import compute_identity_id, compute_key_id, compute_repo_id
43 from musehub.crypto.keys import key_fingerprint
44 from musehub.db.musehub_auth_models import MusehubAuthKey
45 from musehub.db.musehub_identity_models import MusehubIdentity
46 from musehub.db.musehub_repo_models import MusehubBranch
47 from musehub.proposals.signing import (
48 canonical_propose_message,
49 verify_proposer_signature,
50 )
51 from musehub.types.json_types import StrDict
52 from muse.core.types import b64url_encode, b64url_decode
53
54
55 # Identity ID that matches the conftest `auth_headers` fixture (testuser)
56 _TEST_IDENTITY_ID = compute_identity_id(b"testuser")
57
58
59 # ---------------------------------------------------------------------------
60 # Helpers
61 # ---------------------------------------------------------------------------
62
63 def _make_keypair() -> tuple[Ed25519PrivateKey, str, bytes]:
64 """Return (private_key, 'ed25519:<pub_b64>', pub_bytes) for tests."""
65 priv = Ed25519PrivateKey.generate()
66 pub_bytes = priv.public_key().public_bytes_raw()
67 pub_str = f"ed25519:{b64url_encode(pub_bytes)}"
68 return priv, pub_str, pub_bytes
69
70
71 async def _register_key(db: AsyncSession, identity_id: str, pub_bytes: bytes) -> str:
72 """Insert a MusehubAuthKey row and return the 'ed25519:<b64>' public key string."""
73 key_b64 = b64url_encode(pub_bytes)
74 key_id = compute_key_id(identity_id, key_b64)
75 fingerprint = key_fingerprint(pub_bytes)
76 row = MusehubAuthKey(
77 key_id=key_id,
78 identity_id=identity_id,
79 public_key_b64=key_b64,
80 fingerprint=fingerprint,
81 algorithm="ed25519",
82 label="test-key",
83 )
84 db.add(row)
85 await db.commit()
86 return f"ed25519:{key_b64}"
87
88
89 def _sign_propose(priv: Ed25519PrivateKey, message: bytes) -> str:
90 """Return 'ed25519:<base64url>' signature over message."""
91 sig_bytes = priv.sign(message)
92 return f"ed25519:{b64url_encode(sig_bytes)}"
93
94
95 async def _create_repo(client: AsyncClient, auth_headers: StrDict, name: str) -> str:
96 r = await client.post(
97 "/api/repos",
98 json={"name": name, "owner": "testuser", "initialize": False},
99 headers=auth_headers,
100 )
101 assert r.status_code == 201
102 return str(r.json()["repoId"])
103
104
105 async def _push_branch(db: AsyncSession, repo_id: str, branch_name: str) -> None:
106 from musehub.core.genesis import compute_branch_id
107 branch = MusehubBranch(
108 branch_id=compute_branch_id(repo_id, branch_name),
109 repo_id=repo_id,
110 name=branch_name,
111 head_commit_id=None,
112 )
113 db.add(branch)
114 await db.commit()
115
116
117 # ---------------------------------------------------------------------------
118 # T6 — canonical_propose_message determinism (unit, no DB)
119 # ---------------------------------------------------------------------------
120
121 def test_canonical_propose_message_is_deterministic() -> None:
122 """Same inputs always produce identical bytes."""
123 kwargs = dict(
124 proposal_id="sha256:" + "a" * 64,
125 repo_id="sha256:" + "b" * 64,
126 from_branch="feat/x",
127 to_branch="dev",
128 author="gabriel",
129 created_at=datetime(2026, 5, 8, 19, 30, 34, tzinfo=timezone.utc),
130 )
131 assert canonical_propose_message(**kwargs) == canonical_propose_message(**kwargs)
132
133
134 def test_canonical_propose_message_format() -> None:
135 """Message starts with PROPOSE domain prefix and contains all fields."""
136 created_at = datetime(2026, 5, 8, 19, 30, 34, tzinfo=timezone.utc)
137 msg = canonical_propose_message(
138 proposal_id="sha256:" + "a" * 64,
139 repo_id="sha256:" + "b" * 64,
140 from_branch="feat/identity-v2",
141 to_branch="dev",
142 author="gabriel",
143 created_at=created_at,
144 )
145 text = msg.decode("utf-8")
146 assert text.startswith("PROPOSE\n")
147 assert "proposal_id: sha256:" in text
148 assert "from_branch: feat/identity-v2" in text
149 assert "author: gabriel" in text
150 assert "created_at: 2026-05-08T19:30:34+00:00" in text
151
152
153 # ---------------------------------------------------------------------------
154 # T7 — verify_proposer_signature (unit, no DB)
155 # ---------------------------------------------------------------------------
156
157 def test_verify_proposer_signature_accepts_valid() -> None:
158 priv, pub_str, _ = _make_keypair()
159 msg = b"PROPOSE\ntest message"
160 sig_str = _sign_propose(priv, msg)
161 verify_proposer_signature(message=msg, signature=sig_str, public_key=pub_str)
162
163
164 def test_verify_proposer_signature_rejects_tampered_message() -> None:
165 from musehub.crypto.keys import SignatureError
166 priv, pub_str, _ = _make_keypair()
167 msg = b"PROPOSE\ntest message"
168 sig_str = _sign_propose(priv, msg)
169 with pytest.raises(SignatureError):
170 verify_proposer_signature(
171 message=b"PROPOSE\ntampered message",
172 signature=sig_str,
173 public_key=pub_str,
174 )
175
176
177 def test_verify_proposer_signature_rejects_wrong_key() -> None:
178 from musehub.crypto.keys import SignatureError
179 priv, _, _ = _make_keypair()
180 _, pub_str_other, _ = _make_keypair()
181 msg = b"PROPOSE\ntest message"
182 sig_str = _sign_propose(priv, msg)
183 with pytest.raises(SignatureError):
184 verify_proposer_signature(message=msg, signature=sig_str, public_key=pub_str_other)
185
186
187 # ---------------------------------------------------------------------------
188 # T5 — proposal without signature still creates (backwards compat)
189 # ---------------------------------------------------------------------------
190
191 @pytest.mark.asyncio
192 async def test_create_proposal_without_signature_succeeds(
193 client: AsyncClient,
194 auth_headers: StrDict,
195 db_session: AsyncSession,
196 ) -> None:
197 repo_id = await _create_repo(client, auth_headers, "propose-no-sig-repo")
198 await _push_branch(db_session, repo_id, "feat/no-sig")
199
200 r = await client.post(
201 f"/api/repos/{repo_id}/proposals",
202 json={"title": "No sig proposal", "fromBranch": "feat/no-sig", "toBranch": "main"},
203 headers=auth_headers,
204 )
205 assert r.status_code == 201
206 body = r.json()
207 assert body.get("proposerSignature") is None
208 assert body.get("proposerPublicKey") is None
209
210
211 # ---------------------------------------------------------------------------
212 # T1 — POST with valid signature stores and returns both fields
213 # ---------------------------------------------------------------------------
214
215 @pytest.mark.asyncio
216 async def test_create_proposal_with_valid_signature(
217 client: AsyncClient,
218 auth_headers: StrDict,
219 db_session: AsyncSession,
220 ) -> None:
221 repo_id = await _create_repo(client, auth_headers, "propose-sig-repo")
222 await _push_branch(db_session, repo_id, "feat/signed")
223
224 priv, _, pub_bytes = _make_keypair()
225 pub_str = await _register_key(db_session, _TEST_IDENTITY_ID, pub_bytes)
226
227 import datetime as dt
228 client_ts = dt.datetime.now(tz=dt.timezone.utc).isoformat()
229
230 pre_image = canonical_propose_message(
231 repo_id=repo_id,
232 from_branch="feat/signed",
233 to_branch="main",
234 author="testuser",
235 created_at=dt.datetime.fromisoformat(client_ts),
236 )
237 sig_str = _sign_propose(priv, pre_image)
238
239 r = await client.post(
240 f"/api/repos/{repo_id}/proposals",
241 json={
242 "title": "Signed proposal",
243 "fromBranch": "feat/signed",
244 "toBranch": "main",
245 "proposerPublicKey": pub_str,
246 "proposerSignature": sig_str,
247 "proposerTimestamp": client_ts,
248 },
249 headers=auth_headers,
250 )
251 assert r.status_code == 201
252 body = r.json()
253 assert body["proposerSignature"] == sig_str
254 assert body["proposerPublicKey"] == pub_str
255
256
257 # ---------------------------------------------------------------------------
258 # T2 — GET returns stored signature fields
259 # ---------------------------------------------------------------------------
260
261 @pytest.mark.asyncio
262 async def test_get_proposal_returns_signature_fields(
263 client: AsyncClient,
264 auth_headers: StrDict,
265 db_session: AsyncSession,
266 ) -> None:
267 repo_id = await _create_repo(client, auth_headers, "propose-get-sig-repo")
268 await _push_branch(db_session, repo_id, "feat/get-sig")
269
270 priv, _, pub_bytes = _make_keypair()
271 pub_str = await _register_key(db_session, _TEST_IDENTITY_ID, pub_bytes)
272
273 import datetime as dt
274 client_ts = dt.datetime.now(tz=dt.timezone.utc).isoformat()
275 pre_image = canonical_propose_message(
276 repo_id=repo_id,
277 from_branch="feat/get-sig",
278 to_branch="main",
279 author="testuser",
280 created_at=dt.datetime.fromisoformat(client_ts),
281 )
282 sig_str = _sign_propose(priv, pre_image)
283
284 create_r = await client.post(
285 f"/api/repos/{repo_id}/proposals",
286 json={
287 "title": "Get sig test",
288 "fromBranch": "feat/get-sig",
289 "toBranch": "main",
290 "proposerPublicKey": pub_str,
291 "proposerSignature": sig_str,
292 "proposerTimestamp": client_ts,
293 },
294 headers=auth_headers,
295 )
296 assert create_r.status_code == 201
297 proposal_id = create_r.json()["proposalId"]
298
299 get_r = await client.get(
300 f"/api/repos/{repo_id}/proposals/{proposal_id}",
301 headers=auth_headers,
302 )
303 assert get_r.status_code == 200
304 body = get_r.json()
305 assert body["proposerSignature"] == sig_str
306 assert body["proposerPublicKey"] == pub_str
307
308
309 # ---------------------------------------------------------------------------
310 # T3 — signature that doesn't verify returns 422
311 # ---------------------------------------------------------------------------
312
313 @pytest.mark.asyncio
314 async def test_create_proposal_bad_signature_returns_422(
315 client: AsyncClient,
316 auth_headers: StrDict,
317 db_session: AsyncSession,
318 ) -> None:
319 repo_id = await _create_repo(client, auth_headers, "propose-badsig-repo")
320 await _push_branch(db_session, repo_id, "feat/bad-sig")
321
322 priv, _, pub_bytes = _make_keypair()
323 pub_str = await _register_key(db_session, _TEST_IDENTITY_ID, pub_bytes)
324 import datetime as dt
325 client_ts = dt.datetime.now(tz=dt.timezone.utc).isoformat()
326
327 # Sign the wrong message — key is registered but sig won't verify
328 wrong_sig = _sign_propose(priv, b"PROPOSE\nwrong content")
329
330 r = await client.post(
331 f"/api/repos/{repo_id}/proposals",
332 json={
333 "title": "Bad sig",
334 "fromBranch": "feat/bad-sig",
335 "toBranch": "main",
336 "proposerPublicKey": pub_str,
337 "proposerSignature": wrong_sig,
338 "proposerTimestamp": client_ts,
339 },
340 headers=auth_headers,
341 )
342 assert r.status_code == 422
343
344
345 # ---------------------------------------------------------------------------
346 # T4 — public key not matching registered identity key returns 403
347 # ---------------------------------------------------------------------------
348
349 @pytest.mark.asyncio
350 async def test_create_proposal_unregistered_key_returns_403(
351 client: AsyncClient,
352 auth_headers: StrDict,
353 db_session: AsyncSession,
354 ) -> None:
355 repo_id = await _create_repo(client, auth_headers, "propose-unreg-key-repo")
356 await _push_branch(db_session, repo_id, "feat/unreg-key")
357
358 # Generate a key that is NOT registered to testuser
359 priv, pub_str, _ = _make_keypair()
360 import datetime as dt
361 client_ts = dt.datetime.now(tz=dt.timezone.utc).isoformat()
362 pre_image = (
363 f"PROPOSE\nrepo_id: {repo_id}\nfrom_branch: feat/unreg-key\n"
364 f"to_branch: main\nauthor: testuser\ncreated_at: {client_ts}"
365 ).encode("utf-8")
366 sig_str = _sign_propose(priv, pre_image)
367
368 r = await client.post(
369 f"/api/repos/{repo_id}/proposals",
370 json={
371 "title": "Unregistered key",
372 "fromBranch": "feat/unreg-key",
373 "toBranch": "main",
374 "proposerPublicKey": pub_str,
375 "proposerSignature": sig_str,
376 "proposerTimestamp": client_ts,
377 },
378 headers=auth_headers,
379 )
380 assert r.status_code == 403