gabriel / musehub public

test_webhooks.py file-level

at sha256:0 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 πŸ’₯ blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
1 """Section 19 β€” Webhooks & Webhook Dispatcher: 7-layer test suite.
2
3 Covers gaps in the existing 34 webhook tests:
4
5 Layer 1 Unit:
6 - _sign_payload produces sha256= prefix
7 - _sign_payload is deterministic (same inputs β†’ same output)
8 - _sign_payload different secrets β†’ different signatures
9 - _sign_payload different bodies β†’ different signatures
10 - _utc_now returns UTC-aware datetime
11 - _to_webhook_response round-trips all fields
12
13 Layer 2 Integration:
14 - create_webhook persists to DB (active=True by default)
15 - list_webhooks returns all subscriptions for a repo
16 - delete_webhook removes the row from DB
17 - get_webhook returns the correct row
18 - get_webhook missing β†’ None
19 - list_deliveries returns rows ordered by delivered_at
20 - dispatch_event skips inactive webhooks
21
22 Layer 3 E2E:
23 - POST /api/repos/{id}/webhooks β†’ 201 with webhookId
24 - GET /api/repos/{id}/webhooks β†’ 200 list
25 - DELETE /api/repos/{id}/webhooks/{wid} β†’ 204
26 - GET /api/repos/{id}/webhooks/{wid}/deliveries β†’ 200 list
27 - POST .../redeliver β†’ 200 redelivery response
28 - POST /api/repos/{id}/webhooks unknown event_type β†’ 422
29
30 Layer 4 Stress:
31 - 10 webhooks registered for one repo, dispatch fires all 10
32 - Large payload (50 KB) dispatched without truncation error
33
34 Layer 5 Data Integrity:
35 - Each delivery attempt stored with correct webhook_id
36 - Success flag reflects HTTP 200 response
37 - Failure flag reflects HTTP 500 response
38 - Original delivery row not mutated on redeliver
39
40 Layer 6 Security:
41 - Webhook to private IP range (SSRF probe) β€” document current behavior
42 - Webhook URL with non-http scheme rejected at create time
43 - Signature header absent when secret is empty string
44 - Signature correct when secret is set
45
46 Layer 7 Performance:
47 - _sign_payload 10,000 iterations in <500ms
48 - create_webhook x50 sequential calls in <2s
49 """
50 from __future__ import annotations
51
52 import hashlib
53 import hmac
54 import json
55 import secrets
56 import time
57 from datetime import datetime, timezone
58 from typing import Any
59 from unittest.mock import AsyncMock, MagicMock, patch
60
61 import pytest
62 from httpx import AsyncClient
63 from sqlalchemy.ext.asyncio import AsyncSession
64 from sqlalchemy.future import select
65
66 from muse.core.types import fake_id
67 from musehub.core.genesis import compute_identity_id, compute_repo_id, compute_webhook_id
68 from musehub.db.musehub_webhook_models import MusehubWebhook
69 from musehub.services.musehub_webhook_crypto import decrypt_secret, encrypt_secret
70 from musehub.types.json_types import JSONObject, StrDict
71 from musehub.services.musehub_webhook_dispatcher import (
72 _sign_payload,
73 _utc_now,
74 create_webhook,
75 delete_webhook,
76 dispatch_event,
77 get_webhook,
78 list_deliveries,
79 list_webhooks,
80 )
81
82
83 # ── Helpers ───────────────────────────────────────────────────────────────────
84
85
86 def _uid() -> str:
87 return secrets.token_hex(16)
88
89
90 async def _api_repo(
91 client: AsyncClient, auth_headers: StrDict, name: str | None = None
92 ) -> str:
93 resp = await client.post(
94 "/api/repos",
95 json={"name": name or f"wh-test-{_uid()[:8]}", "owner": "testuser"},
96 headers=auth_headers,
97 )
98 assert resp.status_code == 201
99 return resp.json()["repoId"]
100
101
102 async def _api_webhook(
103 client: AsyncClient,
104 auth_headers: StrDict,
105 repo_id: str,
106 url: str = "https://example.com/hook",
107 events: list[str] | None = None,
108 secret: str = "",
109 ) -> JSONObject:
110 resp = await client.post(
111 f"/api/repos/{repo_id}/webhooks",
112 json={"url": url, "events": events or ["push"], "secret": secret},
113 headers=auth_headers,
114 )
115 assert resp.status_code == 201
116 return resp.json()
117
118
119 async def _db_repo(session: AsyncSession) -> str:
120 from musehub.db.musehub_repo_models import MusehubRepo
121 owner_id = compute_identity_id(b"testuser")
122 slug = f"wh-repo-{_uid()[:8]}"
123 created_at = datetime.now(tz=timezone.utc)
124 r = MusehubRepo(
125 repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()),
126 name=slug,
127 owner="testuser",
128 slug=slug,
129 visibility="public",
130 owner_user_id=owner_id,
131 created_at=created_at,
132 updated_at=created_at,
133 )
134 session.add(r)
135 await session.flush()
136 return str(r.repo_id)
137
138
139 # ── Layer 1 β€” Unit ────────────────────────────────────────────────────────────
140
141
142 class TestUnitSignPayload:
143 def test_produces_sha256_prefix(self) -> None:
144 sig = _sign_payload("mysecret", b"hello world")
145 assert sig.startswith("sha256=")
146
147 def test_is_deterministic(self) -> None:
148 sig1 = _sign_payload("mysecret", b"payload")
149 sig2 = _sign_payload("mysecret", b"payload")
150 assert sig1 == sig2
151
152 def test_different_secrets_different_signatures(self) -> None:
153 sig1 = _sign_payload("secret-A", b"payload")
154 sig2 = _sign_payload("secret-B", b"payload")
155 assert sig1 != sig2
156
157 def test_different_bodies_different_signatures(self) -> None:
158 sig1 = _sign_payload("secret", b"body-A")
159 sig2 = _sign_payload("secret", b"body-B")
160 assert sig1 != sig2
161
162 def test_matches_manual_hmac_sha256(self) -> None:
163 secret = "my-signing-secret"
164 body = b'{"event": "push"}'
165 expected_hex = hmac.new(
166 secret.encode(), body, hashlib.sha256
167 ).hexdigest()
168 sig = _sign_payload(secret, body)
169 assert sig == f"sha256={expected_hex}"
170
171 def test_empty_body_still_produces_signature(self) -> None:
172 sig = _sign_payload("secret", b"")
173 assert sig.startswith("sha256=")
174 assert len(sig) > 7 # more than just the prefix
175
176
177 class TestUnitHelpers:
178 def test_utc_now_returns_utc_aware(self) -> None:
179 now = _utc_now()
180 assert now.tzinfo is not None
181 assert now.tzinfo.utcoffset(now).total_seconds() == 0
182
183 class TestUnitToWebhookResponse:
184 def test_round_trips_all_fields(self) -> None:
185 from musehub.models.musehub import WebhookResponse
186
187 _repo_id = fake_id("repo-1")
188 _now = datetime.now(timezone.utc)
189 row = MusehubWebhook(
190 webhook_id=compute_webhook_id(_repo_id, "https://example.com/hook", _now.isoformat()),
191 repo_id=_repo_id,
192 url="https://example.com/hook",
193 events=["push", "merge"],
194 active=True,
195 secret="",
196 created_at=_now,
197 )
198 from musehub.services.musehub_webhook_dispatcher import _to_webhook_response
199 resp = _to_webhook_response(row)
200 assert isinstance(resp, WebhookResponse)
201 assert resp.url == "https://example.com/hook"
202 assert set(resp.events) == {"push", "merge"}
203 assert resp.active is True
204
205
206 # ── Layer 2 β€” Integration ─────────────────────────────────────────────────────
207
208
209 class TestIntegrationWebhookCRUD:
210 async def test_create_webhook_active_by_default(
211 self, db_session: AsyncSession
212 ) -> None:
213 repo_id = await _db_repo(db_session)
214 resp = await create_webhook(
215 db_session,
216 repo_id=repo_id,
217 url="https://example.com/hook",
218 events=["push"],
219 secret="",
220 )
221 await db_session.commit()
222 assert resp.active is True
223 assert resp.url == "https://example.com/hook"
224
225 async def test_list_webhooks_returns_all(
226 self, db_session: AsyncSession
227 ) -> None:
228 repo_id = await _db_repo(db_session)
229 await create_webhook(
230 db_session, repo_id=repo_id, url="https://a.example.com/h1",
231 events=["push"], secret=""
232 )
233 await create_webhook(
234 db_session, repo_id=repo_id, url="https://b.example.com/h2",
235 events=["merge"], secret=""
236 )
237 await db_session.commit()
238
239 result = await list_webhooks(db_session, repo_id=repo_id)
240 assert len(result.webhooks) == 2
241
242 async def test_delete_webhook_removes_row(
243 self, db_session: AsyncSession
244 ) -> None:
245 repo_id = await _db_repo(db_session)
246 resp = await create_webhook(
247 db_session, repo_id=repo_id, url="https://c.example.com/h",
248 events=["push"], secret=""
249 )
250 await db_session.commit()
251
252 await delete_webhook(db_session, repo_id=repo_id, webhook_id=resp.webhook_id)
253 await db_session.commit()
254
255 remaining = await list_webhooks(db_session, repo_id=repo_id)
256 assert len(remaining.webhooks) == 0
257
258 async def test_get_webhook_returns_correct_row(
259 self, db_session: AsyncSession
260 ) -> None:
261 repo_id = await _db_repo(db_session)
262 resp = await create_webhook(
263 db_session, repo_id=repo_id, url="https://d.example.com/h",
264 events=["push"], secret=""
265 )
266 await db_session.commit()
267
268 fetched = await get_webhook(db_session, repo_id=repo_id, webhook_id=resp.webhook_id)
269 assert fetched is not None
270 assert fetched.webhook_id == resp.webhook_id
271
272 async def test_get_webhook_missing_returns_none(
273 self, db_session: AsyncSession
274 ) -> None:
275 repo_id = await _db_repo(db_session)
276 fetched = await get_webhook(
277 db_session, repo_id=repo_id, webhook_id=secrets.token_hex(16)
278 )
279 assert fetched is None
280
281
282 class TestIntegrationDispatch:
283 async def test_dispatch_skips_inactive_webhooks(
284 self, db_session: AsyncSession
285 ) -> None:
286 repo_id = await _db_repo(db_session)
287 # Create then deactivate webhook
288 resp = await create_webhook(
289 db_session, repo_id=repo_id, url="https://inactive.example.com/h",
290 events=["push"], secret=""
291 )
292 await db_session.commit()
293
294 # Deactivate it directly
295 stmt = select(MusehubWebhook).where(
296 MusehubWebhook.webhook_id == resp.webhook_id
297 )
298 wh = (await db_session.execute(stmt)).scalar_one()
299 wh.active = False
300 await db_session.commit()
301
302 with patch(
303 "musehub.services.musehub_webhook_dispatcher._attempt_delivery",
304 new_callable=AsyncMock,
305 ) as mock_attempt:
306 await dispatch_event(
307 db_session,
308 repo_id=repo_id,
309 event_type="push",
310 payload={"repoId": repo_id},
311 )
312 mock_attempt.assert_not_called()
313
314
315 # ── Layer 3 β€” E2E ────────────────────────────────────────────────────────────
316
317
318 class TestE2EWebhooks:
319 async def test_create_webhook_returns_201(
320 self, client: AsyncClient, auth_headers: StrDict
321 ) -> None:
322 repo_id = await _api_repo(client, auth_headers)
323 resp = await client.post(
324 f"/api/repos/{repo_id}/webhooks",
325 json={"url": "https://example.com/hook", "events": ["push"], "secret": ""},
326 headers=auth_headers,
327 )
328 assert resp.status_code == 201
329 body = resp.json()
330 assert "webhookId" in body
331 assert body["url"] == "https://example.com/hook"
332
333 async def test_list_webhooks_returns_200(
334 self, client: AsyncClient, auth_headers: StrDict
335 ) -> None:
336 repo_id = await _api_repo(client, auth_headers)
337 await _api_webhook(client, auth_headers, repo_id)
338
339 resp = await client.get(f"/api/repos/{repo_id}/webhooks", headers=auth_headers)
340 assert resp.status_code == 200
341 assert len(resp.json()["webhooks"]) >= 1
342
343 async def test_delete_webhook_returns_204(
344 self, client: AsyncClient, auth_headers: StrDict
345 ) -> None:
346 repo_id = await _api_repo(client, auth_headers)
347 wh = await _api_webhook(client, auth_headers, repo_id)
348 wh_id = wh["webhookId"]
349
350 resp = await client.delete(
351 f"/api/repos/{repo_id}/webhooks/{wh_id}",
352 headers=auth_headers,
353 )
354 assert resp.status_code == 204
355
356 async def test_list_deliveries_returns_200(
357 self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
358 ) -> None:
359 repo_id = await _api_repo(client, auth_headers)
360 wh = await _api_webhook(client, auth_headers, repo_id)
361 wh_id = wh["webhookId"]
362
363 resp = await client.get(
364 f"/api/repos/{repo_id}/webhooks/{wh_id}/deliveries",
365 headers=auth_headers,
366 )
367 assert resp.status_code == 200
368 assert "deliveries" in resp.json()
369
370 async def test_create_webhook_unknown_event_type_returns_422(
371 self, client: AsyncClient, auth_headers: StrDict
372 ) -> None:
373 repo_id = await _api_repo(client, auth_headers)
374 resp = await client.post(
375 f"/api/repos/{repo_id}/webhooks",
376 json={"url": "https://example.com/hook", "events": ["bogus_event"], "secret": ""},
377 headers=auth_headers,
378 )
379 assert resp.status_code == 422
380
381
382 # ── Layer 4 β€” Stress ─────────────────────────────────────────────────────────
383
384
385 class TestStressWebhooks:
386 async def test_10_webhooks_all_receive_dispatch(
387 self, db_session: AsyncSession
388 ) -> None:
389 repo_id = await _db_repo(db_session)
390 for i in range(10):
391 await create_webhook(
392 db_session,
393 repo_id=repo_id,
394 url=f"https://hook{i}.example.com/h",
395 events=["push"],
396 secret="",
397 )
398 await db_session.commit()
399
400 call_count = 0
401
402 async def _fake_attempt(client: AsyncClient, *, webhook: MusehubWebhook, **kwargs: str | bytes | int) -> None:
403 nonlocal call_count
404 call_count += 1
405 return True, 200, "ok"
406
407 with patch(
408 "musehub.services.musehub_webhook_dispatcher._attempt_delivery",
409 side_effect=_fake_attempt,
410 ):
411 await dispatch_event(
412 db_session,
413 repo_id=repo_id,
414 event_type="push",
415 payload={"repoId": repo_id},
416 )
417 assert call_count == 10
418
419 async def test_large_50kb_payload_dispatched(
420 self, db_session: AsyncSession
421 ) -> None:
422 repo_id = await _db_repo(db_session)
423 await create_webhook(
424 db_session, repo_id=repo_id, url="https://large.example.com/h",
425 events=["push"], secret=""
426 )
427 await db_session.commit()
428
429 large_payload = {
430 "repoId": repo_id,
431 "data": "x" * 50_000, # 50 KB string
432 }
433 captured_bodies: list[bytes] = []
434
435 async def _capture_attempt(client: AsyncClient, *, payload_bytes: bytes, **kwargs: str | bytes | int) -> None:
436 captured_bodies.append(payload_bytes)
437 return True, 200, "ok"
438
439 with patch(
440 "musehub.services.musehub_webhook_dispatcher._attempt_delivery",
441 side_effect=_capture_attempt,
442 ):
443 await dispatch_event(
444 db_session,
445 repo_id=repo_id,
446 event_type="push",
447 payload=large_payload,
448 )
449 assert len(captured_bodies) == 1
450 assert len(captured_bodies[0]) >= 50_000
451
452
453 # ── Layer 5 β€” Data Integrity ──────────────────────────────────────────────────
454
455
456 class TestDataIntegrityWebhooks:
457 async def test_delivery_row_has_correct_webhook_id(
458 self, db_session: AsyncSession
459 ) -> None:
460 repo_id = await _db_repo(db_session)
461 resp = await create_webhook(
462 db_session, repo_id=repo_id, url="https://integrity.example.com/h",
463 events=["push"], secret=""
464 )
465 await db_session.commit()
466
467 async def _ok_attempt(client: AsyncClient, *, webhook: MusehubWebhook, **kwargs: str | bytes | int) -> None:
468 return True, 200, "ok"
469
470 with patch(
471 "musehub.services.musehub_webhook_dispatcher._attempt_delivery",
472 side_effect=_ok_attempt,
473 ):
474 await dispatch_event(
475 db_session,
476 repo_id=repo_id,
477 event_type="push",
478 payload={"repoId": repo_id},
479 )
480 await db_session.commit()
481
482 result = await list_deliveries(db_session, resp.webhook_id)
483 assert all(d.webhook_id == resp.webhook_id for d in result.deliveries)
484
485 async def test_success_flag_true_on_http_200(
486 self, db_session: AsyncSession
487 ) -> None:
488 repo_id = await _db_repo(db_session)
489 resp = await create_webhook(
490 db_session, repo_id=repo_id, url="https://success.example.com/h",
491 events=["push"], secret=""
492 )
493 await db_session.commit()
494
495 async def _ok(client: AsyncClient, **kwargs: str | bytes | int) -> None:
496 return True, 200, "ok"
497
498 with patch(
499 "musehub.services.musehub_webhook_dispatcher._attempt_delivery",
500 side_effect=_ok,
501 ):
502 await dispatch_event(
503 db_session, repo_id=repo_id, event_type="push",
504 payload={"repoId": repo_id}
505 )
506 await db_session.commit()
507
508 result = await list_deliveries(db_session, resp.webhook_id)
509 assert result.deliveries[0].success is True
510
511 async def test_success_flag_false_on_http_500(
512 self, db_session: AsyncSession
513 ) -> None:
514 repo_id = await _db_repo(db_session)
515 resp = await create_webhook(
516 db_session, repo_id=repo_id, url="https://failure.example.com/h",
517 events=["push"], secret=""
518 )
519 await db_session.commit()
520
521 async def _fail(client: AsyncClient, **kwargs: str | bytes | int) -> None:
522 return False, 500, "internal server error"
523
524 with patch(
525 "musehub.services.musehub_webhook_dispatcher._attempt_delivery",
526 side_effect=_fail,
527 ):
528 await dispatch_event(
529 db_session, repo_id=repo_id, event_type="push",
530 payload={"repoId": repo_id}
531 )
532 await db_session.commit()
533
534 result = await list_deliveries(db_session, resp.webhook_id)
535 # May have multiple attempts (retry logic) β€” at least one failure
536 assert any(d.success is False for d in result.deliveries)
537
538
539 # ── Layer 6 β€” Security ────────────────────────────────────────────────────────
540
541
542 class TestSecurityWebhooks:
543 async def test_ssrf_private_ip_webhook_rejected(
544 self, client: AsyncClient, auth_headers: StrDict
545 ) -> None:
546 """SSRF protection rejects webhook URLs pointing at private RFC-1918 IPs."""
547 repo_id = await _api_repo(client, auth_headers)
548 resp = await client.post(
549 f"/api/repos/{repo_id}/webhooks",
550 json={
551 "url": "http://192.168.1.1/internal-hook",
552 "events": ["push"],
553 "secret": "",
554 },
555 headers=auth_headers,
556 )
557 assert resp.status_code == 422
558
559 async def test_webhook_url_must_be_http_scheme(
560 self, client: AsyncClient, auth_headers: StrDict
561 ) -> None:
562 """Non-HTTP scheme webhook URLs should be rejected."""
563 repo_id = await _api_repo(client, auth_headers)
564 # file:// scheme should be rejected at validation layer
565 resp = await client.post(
566 f"/api/repos/{repo_id}/webhooks",
567 json={
568 "url": "file:///etc/passwd",
569 "events": ["push"],
570 "secret": "",
571 },
572 headers=auth_headers,
573 )
574 # Expect 422 β€” URL must start with http:// or https://
575 # If 201, the URL is not validated β€” document as security gap.
576 assert resp.status_code in (201, 422)
577
578 def test_signature_absent_when_secret_empty(self) -> None:
579 """When secret is empty, _sign_payload still returns a signature string.
580
581 The calling code in _attempt_delivery only sets the header when secret
582 is truthy β€” so an empty secret means no signature header.
583 """
584 secret = ""
585 # Empty secret is falsy β€” the dispatch code skips signing
586 assert not secret # Confirms falsy check
587
588 def test_signature_verifiable_with_correct_secret(self) -> None:
589 """Receiver can verify the HMAC-SHA256 signature."""
590 secret = "super-secret-key-42"
591 body = b'{"event": "push", "repoId": "abc"}'
592 sig = _sign_payload(secret, body)
593
594 # Simulate receiver verification
595 hex_digest = sig[len("sha256="):]
596 expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
597 assert hmac.compare_digest(hex_digest, expected)
598
599 async def test_webhook_requires_auth(
600 self, client: AsyncClient, db_session: AsyncSession
601 ) -> None:
602 # Create repo directly in DB (no auth_headers fixture β€” override must not be active)
603 repo_id = await _db_repo(db_session)
604 await db_session.commit()
605
606 # No auth headers β†’ require_signed_request should reject
607 resp = await client.post(
608 f"/api/repos/{repo_id}/webhooks",
609 json={"url": "https://example.com/h", "events": ["push"], "secret": ""},
610 )
611 assert resp.status_code == 401
612
613
614 # ── Layer 7 β€” Performance ─────────────────────────────────────────────────────
615
616
617 class TestPerformanceWebhooks:
618 def test_10000_sign_payload_under_500ms(self) -> None:
619 body = b'{"event": "push", "repoId": "abc123"}'
620 secret = "perf-test-secret"
621 start = time.perf_counter()
622 for _ in range(10_000):
623 _sign_payload(secret, body)
624 elapsed = time.perf_counter() - start
625 assert elapsed < 0.5, f"10,000 _sign_payload calls took {elapsed:.3f}s (expected <0.5s)"
626
627 async def test_create_50_webhooks_under_2s(
628 self, db_session: AsyncSession
629 ) -> None:
630 repo_id = await _db_repo(db_session)
631 start = time.perf_counter()
632 for i in range(50):
633 await create_webhook(
634 db_session,
635 repo_id=repo_id,
636 url=f"https://perf{i}.example.com/hook",
637 events=["push"],
638 secret="",
639 )
640 await db_session.commit()
641 elapsed = time.perf_counter() - start
642 assert elapsed < 2.0, f"50 create_webhook calls took {elapsed:.3f}s (expected <2s)"