"""MuseHub webhook dispatcher — event-driven HTTP notification delivery. This module is the single point responsible for delivering webhook events to registered subscriber URLs. It is called by route handlers after a state- changing operation completes (push, issue create/close, proposal create/merge, etc.). Delivery contract: - HTTP POST to the subscriber's ``url`` with a JSON payload. - ``Content-Type: application/json`` - ``X-MuseHub-Event: `` header identifying the event. - ``X-MuseHub-Delivery: `` header for idempotency. - ``X-MuseHub-Signature: sha256=`` header when ``secret`` is set. - Retry policy: up to 3 attempts with exponential back-off (1 s, 2 s, 4 s). - Each attempt is logged as a separate ``MusehubWebhookDelivery`` row. Boundary rules (same as all musehub services): - Must NOT import state stores, SSE queues, or LLM clients. - May import ORM models from musehub.db domain-specific modules. - May import Pydantic models from musehub.models.musehub. """ import asyncio import hashlib import hmac import json import logging from datetime import datetime, timezone import httpx from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from musehub.core.genesis import compute_webhook_delivery_id, compute_webhook_id from musehub.db.musehub_webhook_models import MusehubWebhook, MusehubWebhookDelivery from musehub.db.database import AsyncSessionLocal from musehub.types.json_types import StrDict from musehub.models.musehub import ( WebhookDeliveryListResponse, WebhookDeliveryResponse, WebhookEventPayload, WebhookListResponse, WebhookRedeliverResponse, WebhookResponse, ) from musehub.services.musehub_webhook_crypto import decrypt_secret, encrypt_secret logger = logging.getLogger(__name__) # Maximum delivery attempts per dispatch call (initial + 2 retries). _MAX_ATTEMPTS = 3 # Base back-off in seconds; doubled on each retry. _BACKOFF_BASE = 1.0 # HTTP timeout per outbound POST attempt. _REQUEST_TIMEOUT = 10.0 def _utc_now() -> datetime: return datetime.now(tz=timezone.utc) def _sign_payload(secret: str, body: bytes) -> str: """Compute HMAC-SHA256 signature for ``body`` using ``secret``. Returns the hex digest prefixed with ``sha256=``, matching GitHub's webhook signature convention so existing verification libraries work. """ mac = hmac.new(secret.encode(), body, hashlib.sha256) return f"sha256={mac.hexdigest()}" def _to_webhook_response(row: MusehubWebhook) -> WebhookResponse: return WebhookResponse( webhook_id=row.webhook_id, repo_id=row.repo_id, url=row.url, events=list(row.events or []), active=row.active, created_at=row.created_at, ) def _to_delivery_response(row: MusehubWebhookDelivery) -> WebhookDeliveryResponse: return WebhookDeliveryResponse( delivery_id=row.delivery_id, webhook_id=row.webhook_id, event_type=row.event_type, payload=row.payload, attempt=row.attempt, success=row.success, response_status=row.response_status, response_body=row.response_body, delivered_at=row.delivered_at, ) # --------------------------------------------------------------------------- # Webhook CRUD # --------------------------------------------------------------------------- async def create_webhook( session: AsyncSession, *, repo_id: str, url: str, events: list[str], secret: str, ) -> WebhookResponse: """Persist a new webhook subscription and return its wire representation. The webhook is created in active state. The caller must commit the session. """ _wh_now = datetime.now(timezone.utc) webhook = MusehubWebhook( webhook_id=compute_webhook_id(repo_id, url, _wh_now.isoformat()), repo_id=repo_id, url=url, events=events, secret=encrypt_secret(secret), active=True, created_at=_wh_now, updated_at=_wh_now, ) session.add(webhook) await session.flush() await session.refresh(webhook) logger.info("✅ Registered webhook %s for repo %s → %s", webhook.webhook_id, repo_id, url) return _to_webhook_response(webhook) async def list_webhooks( session: AsyncSession, repo_id: str, cursor: str | None = None, limit: int = 20, ) -> WebhookListResponse: """Return webhook subscriptions for a repo with cursor-based keyset pagination. Results are ordered by ``created_at`` ascending. ``cursor`` is the ISO 8601 ``created_at`` of the last seen webhook (opaque to callers — pass ``nextCursor`` from a previous response verbatim). Omit to start from the beginning. """ conditions = [MusehubWebhook.repo_id == repo_id] count_stmt = select(func.count(MusehubWebhook.webhook_id)).where(*conditions) total: int = (await session.execute(count_stmt)).scalar_one() data_conditions = list(conditions) if cursor is not None: data_conditions.append( MusehubWebhook.created_at > datetime.fromisoformat(cursor) ) rows = list( ( await session.execute( select(MusehubWebhook) .where(*data_conditions) .order_by(MusehubWebhook.created_at) .limit(limit + 1) ) ).scalars() ) next_cursor: str | None = None if len(rows) == limit + 1: next_cursor = rows[limit - 1].created_at.isoformat() rows = rows[:limit] return WebhookListResponse( webhooks=[_to_webhook_response(r) for r in rows], total=total, next_cursor=next_cursor, ) async def get_webhook( session: AsyncSession, repo_id: str, webhook_id: str, ) -> WebhookResponse | None: """Return a single webhook by ID, or None if not found in this repo.""" stmt = select(MusehubWebhook).where( MusehubWebhook.repo_id == repo_id, MusehubWebhook.webhook_id == webhook_id, ) row = (await session.execute(stmt)).scalar_one_or_none() if row is None: return None return _to_webhook_response(row) async def delete_webhook( session: AsyncSession, repo_id: str, webhook_id: str, ) -> bool: """Delete a webhook by ID. Returns True if deleted, False if not found. The caller must commit the session after a True result. """ stmt = select(MusehubWebhook).where( MusehubWebhook.repo_id == repo_id, MusehubWebhook.webhook_id == webhook_id, ) row = (await session.execute(stmt)).scalar_one_or_none() if row is None: return False await session.delete(row) await session.flush() logger.info("✅ Deleted webhook %s from repo %s", webhook_id, repo_id) return True async def list_deliveries( session: AsyncSession, webhook_id: str, cursor: str | None = None, limit: int = 50, ) -> WebhookDeliveryListResponse: """Return delivery history for a webhook with cursor-based keyset pagination. Results are ordered by ``delivered_at`` descending (newest first). ``cursor`` is the ISO 8601 ``delivered_at`` of the last seen delivery (opaque to callers — pass ``nextCursor`` from a previous response verbatim). Omit to start from the most recent delivery. """ conditions = [MusehubWebhookDelivery.webhook_id == webhook_id] count_stmt = select(func.count(MusehubWebhookDelivery.delivery_id)).where(*conditions) total: int = (await session.execute(count_stmt)).scalar_one() data_conditions = list(conditions) if cursor is not None: data_conditions.append( MusehubWebhookDelivery.delivered_at < datetime.fromisoformat(cursor) ) rows = list( ( await session.execute( select(MusehubWebhookDelivery) .where(*data_conditions) .order_by(MusehubWebhookDelivery.delivered_at.desc()) .limit(limit + 1) ) ).scalars() ) next_cursor: str | None = None if len(rows) == limit + 1: next_cursor = rows[limit - 1].delivered_at.isoformat() rows = rows[:limit] return WebhookDeliveryListResponse( deliveries=[_to_delivery_response(r) for r in rows], total=total, next_cursor=next_cursor, ) async def get_delivery( session: AsyncSession, webhook_id: str, delivery_id: str, ) -> WebhookDeliveryResponse | None: """Return a single delivery record by ID, or None if not found for this webhook.""" stmt = select(MusehubWebhookDelivery).where( MusehubWebhookDelivery.webhook_id == webhook_id, MusehubWebhookDelivery.delivery_id == delivery_id, ) row = (await session.execute(stmt)).scalar_one_or_none() if row is None: return None return _to_delivery_response(row) async def redeliver_delivery( session: AsyncSession, repo_id: str, webhook_id: str, delivery_id: str, ) -> WebhookRedeliverResponse: """Retry a single past delivery attempt using its stored payload. Fetches the original delivery row to recover the event type and payload, then executes one new delivery attempt (with full retry policy) against the webhook's current URL. Each retry attempt is persisted as a new ``MusehubWebhookDelivery`` row — the original row is never mutated. Raises ``ValueError`` when the delivery or webhook cannot be found, or when the stored payload is empty (delivery predates payload storage). The caller must commit the session after a successful return. """ delivery_stmt = select(MusehubWebhookDelivery).where( MusehubWebhookDelivery.webhook_id == webhook_id, MusehubWebhookDelivery.delivery_id == delivery_id, ) delivery_row = (await session.execute(delivery_stmt)).scalar_one_or_none() if delivery_row is None: raise ValueError(f"Delivery {delivery_id!r} not found for webhook {webhook_id!r}") if not delivery_row.payload: raise ValueError( f"Delivery {delivery_id!r} has no stored payload" "it predates payload storage and cannot be redelivered" ) webhook_stmt = select(MusehubWebhook).where( MusehubWebhook.webhook_id == webhook_id, MusehubWebhook.repo_id == repo_id, ) webhook_row = (await session.execute(webhook_stmt)).scalar_one_or_none() if webhook_row is None: raise ValueError(f"Webhook {webhook_id!r} not found for repo {repo_id!r}") payload_bytes = delivery_row.payload.encode() event_type = delivery_row.event_type final_success = False final_status = 0 final_body = "" async with httpx.AsyncClient() as client: for attempt in range(1, _MAX_ATTEMPTS + 1): delivered_at = _utc_now() new_delivery_id = compute_webhook_delivery_id(webhook_id, event_type, attempt, delivered_at.isoformat()) success, status_code, response_body = await _attempt_delivery( client, webhook=webhook_row, event_type=event_type, payload_bytes=payload_bytes, delivery_id=new_delivery_id, attempt=attempt, ) new_row = MusehubWebhookDelivery( delivery_id=new_delivery_id, webhook_id=webhook_id, event_type=event_type, payload=delivery_row.payload, attempt=attempt, success=success, response_status=status_code, response_body=response_body, delivered_at=delivered_at, ) session.add(new_row) await session.flush() final_success = success final_status = status_code final_body = response_body if success: logger.info( "✅ Redelivery of %s (webhook %s) succeeded on attempt %d (status %d)", delivery_id, webhook_id, attempt, status_code, ) break if attempt < _MAX_ATTEMPTS: backoff = _BACKOFF_BASE * (2 ** (attempt - 1)) logger.warning( "⚠️ Redelivery of %s attempt %d failed (status %d) — retrying in %.1fs", delivery_id, attempt, status_code, backoff, ) await asyncio.sleep(backoff) else: logger.error( "❌ Redelivery of %s failed after %d attempts (last status %d)", delivery_id, _MAX_ATTEMPTS, status_code, ) return WebhookRedeliverResponse( original_delivery_id=delivery_id, webhook_id=webhook_id, event_type=event_type, success=final_success, response_status=final_status, response_body=final_body, ) # --------------------------------------------------------------------------- # Dispatch # --------------------------------------------------------------------------- async def _attempt_delivery( client: httpx.AsyncClient, *, webhook: MusehubWebhook, event_type: str, payload_bytes: bytes, delivery_id: str, attempt: int, ) -> tuple[bool, int, str]: """Execute one HTTP POST attempt and return (success, status_code, body_snippet). Returns (False, 0, error_message) when the request fails at the transport layer (timeout, DNS failure, connection refused). """ headers: StrDict = { "Content-Type": "application/json", "X-MuseHub-Event": event_type, "X-MuseHub-Delivery": delivery_id, "User-Agent": "MuseHub-Webhook/1.0", } if webhook.secret: plaintext_secret = decrypt_secret(webhook.secret) headers["X-MuseHub-Signature"] = _sign_payload(plaintext_secret, payload_bytes) # Defence-in-depth SSRF check with DNS resolution — guards against # DNS rebinding between registration time and delivery time. try: from musehub.security.ssrf import validate_outbound_url await validate_outbound_url(webhook.url) except ValueError as ssrf_exc: return False, 0, f"SSRF blocked: {ssrf_exc}" try: resp = await client.post( webhook.url, content=payload_bytes, headers=headers, timeout=_REQUEST_TIMEOUT, ) success = resp.is_success return success, resp.status_code, resp.text[:512] except httpx.TimeoutException as exc: return False, 0, f"timeout: {exc}" except httpx.RequestError as exc: return False, 0, f"request error: {exc}" async def dispatch_event( session: AsyncSession, *, repo_id: str, event_type: str, payload: WebhookEventPayload, ) -> None: """Dispatch a webhook event to all active subscribers for ``repo_id``. Called by route handlers after a state-changing operation. Does NOT block the caller's HTTP response — this function handles its own retries internally and logs each attempt to ``musehub_webhook_deliveries``. The ``payload`` dict is serialised to camelCase JSON before delivery. It should use snake_case keys; the serialiser converts them automatically via the Pydantic CamelModel aliases so that wire format is consistent with the rest of the MuseHub API. Delivery is best-effort: failures are logged but never raised. """ stmt = select(MusehubWebhook).where( MusehubWebhook.repo_id == repo_id, MusehubWebhook.active.is_(True), ) webhooks = (await session.execute(stmt)).scalars().all() active = [w for w in webhooks if event_type in (w.events or [])] if not active: return payload_bytes = json.dumps(payload, default=str).encode() async with httpx.AsyncClient() as client: for webhook in active: success = False status_code = 0 response_body = "" for attempt in range(1, _MAX_ATTEMPTS + 1): delivered_at = _utc_now() delivery_id = compute_webhook_delivery_id(webhook.webhook_id, event_type, attempt, delivered_at.isoformat()) success, status_code, response_body = await _attempt_delivery( client, webhook=webhook, event_type=event_type, payload_bytes=payload_bytes, delivery_id=delivery_id, attempt=attempt, ) delivery_row = MusehubWebhookDelivery( delivery_id=delivery_id, webhook_id=webhook.webhook_id, event_type=event_type, payload=payload_bytes.decode(), attempt=attempt, success=success, response_status=status_code, response_body=response_body, delivered_at=delivered_at, ) session.add(delivery_row) await session.flush() if success: logger.info( "✅ Webhook %s delivered '%s' on attempt %d (status %d)", webhook.webhook_id, event_type, attempt, status_code, ) break if attempt < _MAX_ATTEMPTS: backoff = _BACKOFF_BASE * (2 ** (attempt - 1)) logger.warning( "⚠️ Webhook %s attempt %d failed (status %d) — retrying in %.1fs", webhook.webhook_id, attempt, status_code, backoff, ) await asyncio.sleep(backoff) else: logger.error( "❌ Webhook %s delivery failed after %d attempts (last status %d)", webhook.webhook_id, _MAX_ATTEMPTS, status_code, ) async def dispatch_event_background( repo_id: str, event_type: str, payload: WebhookEventPayload, ) -> None: """Fire-and-forget webhook dispatch that manages its own DB session. Intended for use with FastAPI ``BackgroundTasks`` so that webhook delivery does not block the HTTP response. Errors are logged but never re-raised. Usage in a route handler:: background_tasks.add_task( dispatch_event_background, repo_id=repo_id, event_type="push", payload={"repoId": repo_id, "branch": branch, ...}, ) """ try: async with AsyncSessionLocal() as session: await dispatch_event(session, repo_id=repo_id, event_type=event_type, payload=payload) await session.commit() except Exception as exc: logger.error( "❌ Background webhook dispatch failed for repo %s event '%s': %s", repo_id, event_type, exc, )