"""Push Protocol Step 1 — Tiers 4–7: stress, integrity, performance, security. Tier 4 — Stress: concurrent presign requests don't corrupt the quota counter. Tier 5 — Data integrity: DB schema invariants on musehub_daily_push_bytes. Tier 6 — Performance: presign endpoint latency gate with index coverage. Tier 7 — Security: malformed keys, cross-user quota isolation, unauthenticated access. """ from __future__ import annotations import asyncio import datetime import time import msgpack import pytest import pytest_asyncio from httpx import AsyncClient, ASGITransport from sqlalchemy import select, func, text from sqlalchemy.ext.asyncio import AsyncSession from muse.core.mpack import build_presign_payload from muse.core.types import blob_id from musehub.auth.dependencies import require_valid_token from musehub.auth.request_signing import MSignContext from musehub.config import get_settings from musehub.core.genesis import compute_identity_id import typing from sqlalchemy.ext.asyncio import async_sessionmaker from musehub.db.database import get_db from musehub.db.musehub_abuse_models import MusehubDailyPushBytes from musehub.db.musehub_repo_models import MusehubRepo from musehub.main import app from musehub.services.musehub_repository import create_repo _FAKE_UPLOAD_URL = "https://minio.example.com/mpacks/sha256:fake?sig=presigned" _OWNER = "gabriel" _IDENTITY_ID = compute_identity_id(b"gabriel") _OTHER_IDENTITY_ID = compute_identity_id(b"aria") _REPO_NAME = "step1-tiers-test" _AUTH_CTX = MSignContext( handle=_OWNER, identity_id=_IDENTITY_ID, is_agent=False, is_admin=False, ) _OTHER_AUTH_CTX = MSignContext( handle="aria", identity_id=_OTHER_IDENTITY_ID, is_agent=False, is_admin=False, ) # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest_asyncio.fixture() async def repo(db_session: AsyncSession) -> MusehubRepo: r = await create_repo( db_session, name=_REPO_NAME, owner=_OWNER, owner_user_id=_IDENTITY_ID, visibility="public", initialize=False, ) await db_session.commit() return r @pytest_asyncio.fixture(autouse=True) async def mock_presign_put() -> None: from unittest.mock import AsyncMock, MagicMock, patch mock_backend = MagicMock() mock_backend.presign_mpack_put = AsyncMock(return_value=_FAKE_UPLOAD_URL) with patch("musehub.services.musehub_wire.get_backend", return_value=mock_backend), \ patch("musehub.services.musehub_wire_push.get_backend", return_value=mock_backend): yield mock_backend def _make_client(db_session: AsyncSession, auth_ctx: MSignContext = _AUTH_CTX) -> None: async def _override_db() -> None: yield db_session app.dependency_overrides[get_db] = _override_db app.dependency_overrides[require_valid_token] = lambda: auth_ctx return AsyncClient(transport=ASGITransport(app=app), base_url="https://localhost:1337") def _body(mpack_bytes: bytes) -> bytes: return msgpack.packb(build_presign_payload(mpack_bytes), use_bin_type=True) async def _presign(client: AsyncClient, mpack_bytes: bytes) -> int: resp = await client.post( f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign", content=_body(mpack_bytes), headers={"Content-Type": "application/x-msgpack"}, ) return resp.status_code # --------------------------------------------------------------------------- # Tier 4 — Stress # --------------------------------------------------------------------------- @pytest.mark.tier4 @pytest.mark.asyncio async def test_t4_concurrent_quota_writes_not_corrupted( repo: MusehubRepo, session_factory: async_sessionmaker[AsyncSession], ) -> None: """20 concurrent record_mpack_bytes_uploaded calls → quota row equals sum of all sizes. Tests the upsert concurrency directly — this is the critical DB operation. """ from musehub.services.musehub_wire import record_mpack_bytes_uploaded settings = get_settings() if settings.mpack_daily_upload_limit_bytes <= 0: pytest.skip("daily quota disabled") chunk = 1000 n = 20 async def _do() -> None: async with session_factory() as sess: await record_mpack_bytes_uploaded(sess, _IDENTITY_ID, chunk) await sess.commit() await asyncio.gather(*[_do() for _ in range(n)]) today = datetime.date.today() async with session_factory() as check_sess: result = await check_sess.execute( select(func.coalesce(func.sum(MusehubDailyPushBytes.bytes_uploaded), 0)).where( MusehubDailyPushBytes.identity_id == _IDENTITY_ID, MusehubDailyPushBytes.date == today, ) ) recorded = int(result.scalar()) expected = n * chunk assert recorded == expected, f"concurrent quota corrupted: got {recorded}, expected {expected}" @pytest.mark.tier4 @pytest.mark.asyncio async def test_t4_concurrent_distinct_quota_writes_all_land( repo: MusehubRepo, session_factory: async_sessionmaker[AsyncSession], ) -> None: """20 concurrent quota writes for different identities all land without error.""" from musehub.services.musehub_wire import record_mpack_bytes_uploaded n = 20 async def _do(i: int) -> None: identity = compute_identity_id(f"stress-user-{i}".encode()) async with session_factory() as sess: await record_mpack_bytes_uploaded(sess, identity, 512) await sess.commit() await asyncio.gather(*[_do(i) for i in range(n)]) today = datetime.date.today() async with session_factory() as check_sess: result = await check_sess.execute( select(func.count()).select_from(MusehubDailyPushBytes).where( MusehubDailyPushBytes.date == today, MusehubDailyPushBytes.bytes_uploaded == 512, ) ) count = result.scalar() assert count >= n, f"expected {n} rows, got {count}" # --------------------------------------------------------------------------- # Tier 5 — Data integrity # --------------------------------------------------------------------------- @pytest.mark.tier5 @pytest.mark.asyncio async def test_t5_quota_pk_is_identity_and_date(db_session: AsyncSession, repo: MusehubRepo) -> None: """(identity_id, date) is the PK — two rows for the same identity+date raise IntegrityError.""" import sqlalchemy.exc today = datetime.date.today() now = datetime.datetime.now(datetime.timezone.utc) row_a = MusehubDailyPushBytes( identity_id=_IDENTITY_ID, date=today, bytes_uploaded=100, updated_at=now, ) row_b = MusehubDailyPushBytes( identity_id=_IDENTITY_ID, date=today, bytes_uploaded=200, updated_at=now, ) db_session.add(row_a) await db_session.commit() db_session.expunge(row_a) # remove from identity map so adding row_b with same PK doesn't warn db_session.add(row_b) with pytest.raises(sqlalchemy.exc.IntegrityError): await db_session.commit() @pytest.mark.tier5 @pytest.mark.asyncio async def test_t5_different_dates_different_rows(db_session: AsyncSession, repo: MusehubRepo) -> None: """Same identity on two different dates produces two separate rows.""" now = datetime.datetime.now(datetime.timezone.utc) today = datetime.date.today() yesterday = today - datetime.timedelta(days=1) db_session.add(MusehubDailyPushBytes( identity_id=_IDENTITY_ID, date=today, bytes_uploaded=100, updated_at=now, )) db_session.add(MusehubDailyPushBytes( identity_id=_IDENTITY_ID, date=yesterday, bytes_uploaded=50, updated_at=now, )) await db_session.commit() result = await db_session.execute( select(func.count()).select_from(MusehubDailyPushBytes).where( MusehubDailyPushBytes.identity_id == _IDENTITY_ID, ) ) assert result.scalar() == 2 @pytest.mark.tier5 @pytest.mark.asyncio async def test_t5_different_identities_isolated(db_session: AsyncSession, repo: MusehubRepo) -> None: """Two identities have independent quota rows — one does not bleed into the other.""" now = datetime.datetime.now(datetime.timezone.utc) today = datetime.date.today() db_session.add(MusehubDailyPushBytes( identity_id=_IDENTITY_ID, date=today, bytes_uploaded=1000, updated_at=now, )) db_session.add(MusehubDailyPushBytes( identity_id=_OTHER_IDENTITY_ID, date=today, bytes_uploaded=500, updated_at=now, )) await db_session.commit() res_a = await db_session.execute( select(MusehubDailyPushBytes.bytes_uploaded).where( MusehubDailyPushBytes.identity_id == _IDENTITY_ID, MusehubDailyPushBytes.date == today, ) ) res_b = await db_session.execute( select(MusehubDailyPushBytes.bytes_uploaded).where( MusehubDailyPushBytes.identity_id == _OTHER_IDENTITY_ID, MusehubDailyPushBytes.date == today, ) ) assert res_a.scalar() == 1000 assert res_b.scalar() == 500 @pytest.mark.tier5 @pytest.mark.asyncio async def test_t5_upsert_accumulates_not_overwrites(db_session: AsyncSession, repo: MusehubRepo) -> None: """record_mpack_bytes_uploaded upserts — repeated calls accumulate, not overwrite.""" from musehub.services.musehub_wire import record_mpack_bytes_uploaded today = datetime.date.today() await record_mpack_bytes_uploaded(db_session, _IDENTITY_ID, 300) await db_session.commit() await record_mpack_bytes_uploaded(db_session, _IDENTITY_ID, 200) await db_session.commit() result = await db_session.execute( select(MusehubDailyPushBytes.bytes_uploaded).where( MusehubDailyPushBytes.identity_id == _IDENTITY_ID, MusehubDailyPushBytes.date == today, ) ) assert result.scalar() == 500 # --------------------------------------------------------------------------- # Tier 6 — Performance # --------------------------------------------------------------------------- @pytest.mark.tier6 @pytest.mark.asyncio async def test_t6_presign_latency_under_50ms(db_session: AsyncSession, repo: MusehubRepo) -> None: """Presign endpoint (no MinIO round-trip, stub backend) completes in < 50ms.""" async with _make_client(db_session) as client: # Warm-up — exclude connection setup from gate await client.post( f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign", content=_body(b"warmup"), headers={"Content-Type": "application/x-msgpack"}, ) t0 = time.perf_counter() resp = await client.post( f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign", content=_body(b"perf-mpack-bytes" * 64), headers={"Content-Type": "application/x-msgpack"}, ) elapsed_ms = (time.perf_counter() - t0) * 1000 app.dependency_overrides.clear() assert resp.status_code == 200 assert elapsed_ms < 50, f"presign took {elapsed_ms:.1f}ms — gate is 50ms" @pytest.mark.tier6 @pytest.mark.asyncio async def test_t6_quota_query_uses_index(db_session: AsyncSession, repo: MusehubRepo) -> None: """EXPLAIN on the quota SUM query references ix_daily_push_bytes_identity_date.""" today = datetime.date.today() plan = await db_session.execute(text( "EXPLAIN SELECT COALESCE(SUM(bytes_uploaded), 0) " "FROM musehub_daily_push_bytes " f"WHERE identity_id = '{_IDENTITY_ID}' AND date = '{today}'" )) plan_text = "\n".join(row[0] for row in plan) assert "Index" in plan_text or "index" in plan_text, ( f"quota query not using an index:\n{plan_text}" ) # --------------------------------------------------------------------------- # Tier 7 — Security # --------------------------------------------------------------------------- @pytest.mark.tier7 @pytest.mark.asyncio async def test_t7_malformed_mpack_key_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None: """mpack_key without sha256: prefix is rejected with 422.""" body = msgpack.packb( {"mpack_key": "not-a-real-key", "size_bytes": 100}, use_bin_type=True, ) async with _make_client(db_session) as client: resp = await client.post( f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign", content=body, headers={"Content-Type": "application/x-msgpack"}, ) app.dependency_overrides.clear() assert resp.status_code == 422, f"expected 422 for malformed key, got {resp.status_code}" @pytest.mark.tier7 @pytest.mark.asyncio async def test_t7_cross_user_quota_isolation(db_session: AsyncSession, repo: MusehubRepo) -> None: """User A's presign does not consume User B's quota.""" settings = get_settings() if settings.mpack_daily_upload_limit_bytes <= 0: pytest.skip("daily quota disabled") today = datetime.date.today() mpack_bytes = b"y" * 2048 async with _make_client(db_session, _AUTH_CTX) as client: resp = await client.post( f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign", content=_body(mpack_bytes), headers={"Content-Type": "application/x-msgpack"}, ) app.dependency_overrides.clear() assert resp.status_code == 200 result = await db_session.execute( select(func.coalesce(func.sum(MusehubDailyPushBytes.bytes_uploaded), 0)).where( MusehubDailyPushBytes.identity_id == _OTHER_IDENTITY_ID, MusehubDailyPushBytes.date == today, ) ) aria_bytes = int(result.scalar()) assert aria_bytes == 0, f"aria's quota was touched: {aria_bytes} bytes" @pytest.mark.tier7 @pytest.mark.asyncio async def test_t7_unauthenticated_request_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None: """No auth header → 401 before any quota or presign logic runs.""" async def _override_db() -> None: yield db_session app.dependency_overrides[get_db] = _override_db # no require_valid_token override — real enforcement today = datetime.date.today() async with AsyncClient( transport=ASGITransport(app=app), base_url="https://localhost:1337", ) as client: resp = await client.post( f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign", content=_body(b"should not reach quota logic"), headers={"Content-Type": "application/x-msgpack"}, ) app.dependency_overrides.clear() assert resp.status_code in (401, 403) # Quota row must not exist — auth rejected before any DB write result = await db_session.execute( select(func.count()).select_from(MusehubDailyPushBytes).where( MusehubDailyPushBytes.identity_id == _IDENTITY_ID, MusehubDailyPushBytes.date == today, ) ) assert result.scalar() == 0, "quota row written for unauthenticated request" @pytest.mark.tier7 @pytest.mark.asyncio async def test_t7_oversized_key_field_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None: """A pathologically long mpack_key string is rejected, not stored.""" body = msgpack.packb( {"mpack_key": "sha256:" + "a" * 10_000, "size_bytes": 100}, use_bin_type=True, ) async with _make_client(db_session) as client: resp = await client.post( f"/{_OWNER}/{_REPO_NAME}/push/mpack-presign", content=body, headers={"Content-Type": "application/x-msgpack"}, ) app.dependency_overrides.clear() assert resp.status_code in (400, 422), ( f"expected 4xx for oversized key, got {resp.status_code}" )