"""E2E — Push Protocol Step 1: POST /{owner}/{slug}/push/mpack-presign. Exercises the entire Step 1 pseudocode flow end-to-end against the real ASGI app with a real DB session and a mocked MinIO backend. Every pseudocode step is logged so failures are easy to locate. Pseudocode under test --------------------- Client computes mpack_id = "sha256:" + sha256(mpack_bytes).hexdigest() Client sends: { mpack_key: mpack_id, size_bytes: len(mpack_bytes) } Server: authenticate(request) # MSign → claims.identity_id, claims.handle validate: mpack_key present → 422 if missing size_bytes ≤ mpack_max_bytes → 413 if exceeded if mpack_daily_upload_limit_bytes > 0: today_bytes = SUM(musehub_daily_push_bytes WHERE identity_id=me AND date=today) if today_bytes >= daily_limit → 429 INSERT INTO musehub_daily_push_bytes (...) COMMIT upload_url = MinIO.presign_put("mpacks/" + mpack_key, ttl=3600s) respond: { upload_url, mpack_key } Tests ----- S1E2E-1 Happy path: valid payload → 200, upload_url and mpack_key in response. S1E2E-2 Missing mpack_key → 422. S1E2E-3 size_bytes exceeds mpack_max_bytes → 413. S1E2E-4 Daily quota exhausted → 429. S1E2E-5 Quota row written to musehub_daily_push_bytes after successful presign. S1E2E-6 Quota is cumulative: second presign adds to existing daily row. S1E2E-7 Unauthenticated request → 401/403. """ from __future__ import annotations import datetime import logging import msgpack import pytest import pytest_asyncio from httpx import AsyncClient, ASGITransport from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from muse.core.mpack import build_presign_payload from muse.core.types import blob_id from musehub.auth.dependencies import require_valid_token from musehub.auth.request_signing import MSignContext from musehub.config import get_settings from musehub.db.database import get_db from musehub.db.musehub_abuse_models import MusehubDailyPushBytes from musehub.main import app from musehub.services.musehub_repository import create_repo from musehub.core.genesis import compute_identity_id logger = logging.getLogger(__name__) _FAKE_UPLOAD_URL = "https://minio.example.com/mpacks/sha256:fake?sig=presigned" _OWNER = "gabriel" _IDENTITY_ID = compute_identity_id(b"gabriel") _REPO_NAME = "step1-e2e-test" _AUTH_CTX = MSignContext( handle=_OWNER, identity_id=_IDENTITY_ID, is_agent=False, is_admin=False, ) # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest_asyncio.fixture() async def client(db_session: AsyncSession) -> None: async def _override_db() -> None: yield db_session app.dependency_overrides[get_db] = _override_db app.dependency_overrides[require_valid_token] = lambda: _AUTH_CTX async with AsyncClient( transport=ASGITransport(app=app), base_url="https://localhost:1337", ) as c: yield c app.dependency_overrides.clear() @pytest_asyncio.fixture() async def repo(db_session: AsyncSession) -> MusehubRepo: r = await create_repo( db_session, name=_REPO_NAME, owner=_OWNER, owner_user_id=_IDENTITY_ID, visibility="public", initialize=False, ) await db_session.commit() return r @pytest_asyncio.fixture(autouse=True) async def mock_presign_put() -> None: """Stub MinIO presign_put so tests don't need a live object store.""" from unittest.mock import AsyncMock, MagicMock, patch mock_backend = MagicMock() mock_backend.presign_mpack_put = AsyncMock(return_value=_FAKE_UPLOAD_URL) with patch("musehub.services.musehub_wire.get_backend", return_value=mock_backend), \ patch("musehub.services.musehub_wire_push.get_backend", return_value=mock_backend): yield mock_backend def _presign_body(mpack_bytes: bytes) -> bytes: payload = build_presign_payload(mpack_bytes) logger.info( "[step1] CLIENT: computed mpack_key=%s size_bytes=%d", payload["mpack_key"][:27], payload["size_bytes"], ) return msgpack.packb(payload, use_bin_type=True) # --------------------------------------------------------------------------- # S1E2E-1 — Happy path # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_s1e2e1_happy_path(client: AsyncClient, repo: MusehubRepo) -> None: """Full Step 1 happy path: valid payload → 200, upload_url + mpack_key returned.""" mpack_bytes = b"commits+snapshots+objects" * 100 expected_key = blob_id(mpack_bytes) logger.info("[step1] CLIENT: building presign payload") body = _presign_body(mpack_bytes) logger.info("[step1] CLIENT: POST /%s/%s/push/mpack-presign", _OWNER, _REPO_NAME) resp = await client.post( f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign", content=body, headers={"Content-Type": "application/x-msgpack"}, ) logger.info("[step1] SERVER: responded HTTP %d", resp.status_code) assert resp.status_code == 200, f"expected 200, got {resp.status_code}: {resp.text}" data = resp.json() logger.info("[step1] SERVER: upload_url=%s mpack_key=%s", str(data.get("upload_url", ""))[:40], str(data.get("mpack_key", ""))[:27]) logger.info("[step1] ASSERT: upload_url is present and non-empty") assert data.get("upload_url"), "upload_url missing from response" logger.info("[step1] ASSERT: mpack_key echoed back matches client-computed key") assert data["mpack_key"] == expected_key # --------------------------------------------------------------------------- # S1E2E-2 — Missing mpack_key → 422 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_s1e2e2_missing_mpack_key_returns_422(client: AsyncClient, repo: MusehubRepo) -> None: """Server validates mpack_key present → 422 when absent.""" body = msgpack.packb({"size_bytes": 1024}, use_bin_type=True) logger.info("[step1] CLIENT: sending body WITHOUT mpack_key") resp = await client.post( f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign", content=body, headers={"Content-Type": "application/x-msgpack"}, ) logger.info("[step1] SERVER: responded HTTP %d (expected 422)", resp.status_code) assert resp.status_code == 422, f"expected 422, got {resp.status_code}" # --------------------------------------------------------------------------- # S1E2E-3 — size_bytes exceeds limit → 413 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_s1e2e3_oversized_payload_returns_413(client: AsyncClient, repo: MusehubRepo) -> None: """Server rejects size_bytes > mpack_max_bytes with 413.""" settings = get_settings() over_limit = settings.mpack_max_bytes + 1 mpack_bytes = b"x" payload = {"mpack_key": blob_id(mpack_bytes), "size_bytes": over_limit} body = msgpack.packb(payload, use_bin_type=True) logger.info( "[step1] CLIENT: sending size_bytes=%d (limit=%d)", over_limit, settings.mpack_max_bytes, ) resp = await client.post( f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign", content=body, headers={"Content-Type": "application/x-msgpack"}, ) logger.info("[step1] SERVER: responded HTTP %d (expected 413)", resp.status_code) assert resp.status_code == 413, f"expected 413, got {resp.status_code}" # --------------------------------------------------------------------------- # S1E2E-4 — Daily quota exhausted → 429 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_s1e2e4_daily_quota_exhausted_returns_429( client: AsyncClient, repo: MusehubRepo, db_session: AsyncSession ) -> None: """When today_bytes >= daily_limit, server returns 429 before presigning.""" settings = get_settings() if settings.mpack_daily_upload_limit_bytes <= 0: pytest.skip("daily quota disabled in this environment") today = datetime.date.today() logger.info( "[step1] SETUP: seeding daily_push_bytes row at limit (%d bytes)", settings.mpack_daily_upload_limit_bytes, ) from sqlalchemy.dialects.postgresql import insert as pg_insert await db_session.execute( pg_insert(MusehubDailyPushBytes).values( identity_id=_IDENTITY_ID, date=today, bytes_uploaded=settings.mpack_daily_upload_limit_bytes, updated_at=datetime.datetime.now(datetime.timezone.utc), ).on_conflict_do_update( index_elements=["identity_id", "date"], set_={"bytes_uploaded": settings.mpack_daily_upload_limit_bytes}, ) ) await db_session.commit() mpack_bytes = b"small mpack" body = _presign_body(mpack_bytes) logger.info("[step1] CLIENT: POST presign after quota exhausted") resp = await client.post( f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign", content=body, headers={"Content-Type": "application/x-msgpack"}, ) logger.info("[step1] SERVER: responded HTTP %d (expected 429)", resp.status_code) assert resp.status_code == 429, f"expected 429, got {resp.status_code}" # --------------------------------------------------------------------------- # S1E2E-5 — Quota row written after successful presign # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_s1e2e5_quota_row_written_after_presign( client: AsyncClient, repo: MusehubRepo, db_session: AsyncSession ) -> None: """After a successful presign, musehub_daily_push_bytes has a row for today.""" settings = get_settings() if settings.mpack_daily_upload_limit_bytes <= 0: pytest.skip("daily quota disabled in this environment") mpack_bytes = b"x" * 2048 body = _presign_body(mpack_bytes) logger.info("[step1] CLIENT: POST presign (quota tracking enabled)") resp = await client.post( f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign", content=body, headers={"Content-Type": "application/x-msgpack"}, ) logger.info("[step1] SERVER: responded HTTP %d", resp.status_code) assert resp.status_code == 200 logger.info("[step1] DB: querying musehub_daily_push_bytes for identity_id=%s", _IDENTITY_ID[:20]) today = datetime.date.today() result = await db_session.execute( select(func.coalesce(func.sum(MusehubDailyPushBytes.bytes_uploaded), 0)).where( MusehubDailyPushBytes.identity_id == _IDENTITY_ID, MusehubDailyPushBytes.date == today, ) ) recorded = int(result.scalar()) logger.info("[step1] DB: bytes_uploaded=%d (expected=%d)", recorded, len(mpack_bytes)) assert recorded == len(mpack_bytes), ( f"quota row not written correctly: got {recorded}, expected {len(mpack_bytes)}" ) # --------------------------------------------------------------------------- # S1E2E-6 — Quota is cumulative across calls # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_s1e2e6_quota_is_cumulative( client: AsyncClient, repo: MusehubRepo, db_session: AsyncSession ) -> None: """Second presign call adds to the existing daily row — does not reset it.""" settings = get_settings() if settings.mpack_daily_upload_limit_bytes <= 0: pytest.skip("daily quota disabled in this environment") mpack_a = b"a" * 1000 mpack_b = b"b" * 500 logger.info("[step1] CLIENT: first presign (%d bytes)", len(mpack_a)) resp_a = await client.post( f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign", content=_presign_body(mpack_a), headers={"Content-Type": "application/x-msgpack"}, ) logger.info("[step1] SERVER: first presign → HTTP %d", resp_a.status_code) assert resp_a.status_code == 200 logger.info("[step1] CLIENT: second presign (%d bytes)", len(mpack_b)) resp_b = await client.post( f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign", content=_presign_body(mpack_b), headers={"Content-Type": "application/x-msgpack"}, ) logger.info("[step1] SERVER: second presign → HTTP %d", resp_b.status_code) assert resp_b.status_code == 200 today = datetime.date.today() result = await db_session.execute( select(func.coalesce(func.sum(MusehubDailyPushBytes.bytes_uploaded), 0)).where( MusehubDailyPushBytes.identity_id == _IDENTITY_ID, MusehubDailyPushBytes.date == today, ) ) recorded = int(result.scalar()) expected = len(mpack_a) + len(mpack_b) logger.info("[step1] DB: cumulative bytes_uploaded=%d (expected=%d)", recorded, expected) assert recorded == expected, f"cumulative quota wrong: got {recorded}, expected {expected}" # --------------------------------------------------------------------------- # S1E2E-7 — Unauthenticated request → 401/403 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_s1e2e7_unauthenticated_returns_401_or_403( db_session: AsyncSession, repo: MusehubRepo ) -> None: """Without auth override, missing credentials → 401 or 403.""" async def _override_db() -> None: yield db_session app.dependency_overrides[get_db] = _override_db # do NOT override require_valid_token — real auth enforcement mpack_bytes = b"should be rejected" body = _presign_body(mpack_bytes) logger.info("[step1] CLIENT: POST presign with NO auth header") async with AsyncClient( transport=ASGITransport(app=app), base_url="https://localhost:1337", ) as c: resp = await c.post( f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign", content=body, headers={"Content-Type": "application/x-msgpack"}, ) logger.info("[step1] SERVER: responded HTTP %d (expected 401 or 403)", resp.status_code) assert resp.status_code in (401, 403), ( f"expected 401 or 403 for unauthenticated request, got {resp.status_code}" ) app.dependency_overrides.clear()