"""Push Protocol Step 3 (unpack-mpack) — Tiers 4–7: stress, integrity, performance, security. Tier 4 — Stress: concurrent wire_push_unpack_mpack calls all land without errors. Tier 5 — Data integrity: DB invariants on MusehubBackgroundJob and MusehubBranch. Tier 6 — Performance: unpack endpoint latency gate. Tier 7 — Security: malformed keys, oversized keys, unauthenticated access. """ from __future__ import annotations import asyncio import time import msgpack import pytest import pytest_asyncio from httpx import AsyncClient, ASGITransport from sqlalchemy import select, func, text from sqlalchemy.ext.asyncio import AsyncSession from muse.core.mpack import build_wire_mpack from muse.core.types import blob_id, fake_id from musehub.auth.dependencies import require_valid_token from musehub.auth.request_signing import MSignContext from musehub.config import get_settings from musehub.core.genesis import compute_identity_id from musehub.db.database import get_db from musehub.db.musehub_jobs_models import MusehubBackgroundJob import typing from sqlalchemy.ext.asyncio import async_sessionmaker from musehub.db.musehub_repo_models import MusehubBranch, MusehubRepo from musehub.main import app from musehub.services.musehub_repository import create_repo _OWNER = "gabriel" _IDENTITY_ID = compute_identity_id(b"gabriel") _OTHER_IDENTITY_ID = compute_identity_id(b"aria") _REPO_NAME = "step3-tiers-test" _MPACK_BYTES = build_wire_mpack({"blobs": [], "commits": [], "snapshots": []}) _MPACK_KEY = blob_id(_MPACK_BYTES) _HEAD = fake_id("step3-tiers-tip") _AUTH_CTX = MSignContext( handle=_OWNER, identity_id=_IDENTITY_ID, is_agent=False, is_admin=False, ) # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest_asyncio.fixture() async def repo(db_session: AsyncSession) -> MusehubRepo: r = await create_repo( db_session, name=_REPO_NAME, owner=_OWNER, 
owner_user_id=_IDENTITY_ID, visibility="public", initialize=False, ) await db_session.commit() return r @pytest_asyncio.fixture(autouse=True) async def mock_get_mpack() -> None: from unittest.mock import AsyncMock, MagicMock, patch mock_backend = MagicMock() mock_backend.get_mpack = AsyncMock(return_value=_MPACK_BYTES) mock_backend.put = AsyncMock(return_value="s3://muse-objects/objects/fake") mock_backend.put_mpack = AsyncMock(return_value=None) mock_backend.presign_mpack_get = AsyncMock(return_value="https://minio.example.com/mpack?sig=x") # Patch all entry points — module-level imports and the canonical backends module # (deferred imports pick up musehub.storage.backends.get_backend at call time) with patch("musehub.storage.backends.get_backend", return_value=mock_backend), \ patch("musehub.services.musehub_wire.get_backend", return_value=mock_backend), \ patch("musehub.services.musehub_wire_push.get_backend", return_value=mock_backend), \ patch("musehub.services.musehub_wire_fetch.get_backend", return_value=mock_backend): yield mock_backend def _make_client(db_session: AsyncSession, auth_ctx: MSignContext = _AUTH_CTX) -> None: async def _override_db() -> None: yield db_session app.dependency_overrides[get_db] = _override_db app.dependency_overrides[require_valid_token] = lambda: auth_ctx return AsyncClient(transport=ASGITransport(app=app), base_url="https://localhost:1337") def _body( mpack_key: str = _MPACK_KEY, branch: str = "main", head: str = "", commits_count: int = 1, blobs_count: int = 2, ) -> bytes: return msgpack.packb( { "mpack_key": mpack_key, "branch": branch, "head": head, "commits_count": commits_count, "blobs_count": blobs_count, }, use_bin_type=True, ) async def _unpack(client: AsyncClient, repo_name: str = _REPO_NAME, **kwargs: typing.Any) -> int: resp = await client.post( f"/{_OWNER}/{repo_name}/push/unpack-mpack", content=_body(**kwargs), headers={"Content-Type": "application/x-msgpack"}, ) return resp.status_code # 
# ---------------------------------------------------------------------------
# Shared test helpers
# ---------------------------------------------------------------------------

# Module paths whose ``get_backend`` the individual tests below re-patch.
_BACKEND_PATCH_TARGETS = (
    "musehub.storage.backends.get_backend",
    "musehub.services.musehub_wire.get_backend",
    "musehub.services.musehub_wire_push.get_backend",
)


def _stub_backend(
    mpack_bytes: bytes | None,
    *,
    put_url: str = "s3://muse-objects/objects/fake",
    presign_url: str = "https://minio.example.com/mpack?sig=x",
) -> typing.Any:
    """Build a storage-backend double whose ``get_mpack`` returns *mpack_bytes*.

    ``put`` returns *put_url*, ``put_mpack`` returns ``None`` and
    ``presign_mpack_get`` returns *presign_url*. Pass ``mpack_bytes=None`` to
    simulate an mpack that is missing from object storage.
    """
    from unittest.mock import AsyncMock, MagicMock

    backend = MagicMock()
    backend.get_mpack = AsyncMock(return_value=mpack_bytes)
    backend.put = AsyncMock(return_value=put_url)
    backend.put_mpack = AsyncMock(return_value=None)
    backend.presign_mpack_get = AsyncMock(return_value=presign_url)
    return backend


def _patched_backend(**patch_kwargs: typing.Any) -> typing.ContextManager[typing.Any]:
    """Patch ``get_backend`` in every module listed in ``_BACKEND_PATCH_TARGETS``.

    *patch_kwargs* are forwarded to :func:`unittest.mock.patch` (for example
    ``return_value=`` or ``side_effect=``). The patches are applied immediately;
    the returned context manager removes them, in reverse order, on exit. If
    applying one of the patches fails, the ones already applied are undone
    before the error propagates.
    """
    import contextlib
    from unittest.mock import patch

    with contextlib.ExitStack() as stack:
        for target in _BACKEND_PATCH_TARGETS:
            stack.enter_context(patch(target, **patch_kwargs))
        # Transfer ownership of the active patches to the caller's `with`.
        return stack.pop_all()


def _patched_backend_per_task(backend_var: typing.Any) -> typing.ContextManager[typing.Any]:
    """Patch ``get_backend`` so every asyncio task can use its own backend double.

    ``unittest.mock.patch`` rebinds module attributes globally, so entering it
    from several concurrently running tasks races: a task can observe another
    task's backend, and interleaved (non-LIFO) exits can restore a stale mock.
    Instead, patch once around ``asyncio.gather`` and have ``get_backend`` return
    the current value of *backend_var* (a ``contextvars.ContextVar``). Each
    coroutine passed to ``asyncio.gather`` runs in its own Task with a copy of
    the context, so a value set inside one task is invisible to the others.
    """
    return _patched_backend(side_effect=lambda *_args, **_kwargs: backend_var.get())


def _single_blob_mpack(content: bytes) -> tuple[bytes, str]:
    """Return ``(mpack_bytes, mpack_key)`` for a wire mpack holding one blob of *content*.

    The key is the ``blob_id`` of the encoded mpack, so distinct *content*
    gives each caller its own key.
    """
    mpack_bytes = build_wire_mpack(
        {"blobs": [{"object_id": blob_id(content), "content": content}], "commits": [], "snapshots": []}
    )
    return mpack_bytes, blob_id(mpack_bytes)


async def _post_unpack(client: AsyncClient, body: bytes) -> typing.Any:
    """POST raw msgpack *body* to the test repo's unpack-mpack endpoint; return the response."""
    return await client.post(
        f"/{_OWNER}/{_REPO_NAME}/push/unpack-mpack",
        content=body,
        headers={"Content-Type": "application/x-msgpack"},
    )


# ---------------------------------------------------------------------------
# Tier 4 — Stress
# ---------------------------------------------------------------------------


@pytest.mark.tier4
@pytest.mark.asyncio
async def test_t4_concurrent_unpack_calls_all_succeed(
    repo: MusehubRepo,
    session_factory: async_sessionmaker[AsyncSession],
) -> None:
    """20 concurrent wire_push_unpack_mpack calls with distinct mpack_keys all succeed.

    Tests inline processing under concurrency — every call must complete
    without deadlocking. Phase 3: all processing is inline; no background job rows.
    """
    import contextvars

    from musehub.services.musehub_wire import wire_push_unpack_mpack

    n = 20
    backend_var: contextvars.ContextVar[typing.Any] = contextvars.ContextVar("task_backend")

    async def _do(i: int) -> None:
        # Distinct blob content per call gives each call a unique mpack_key, and
        # the task-local backend serves exactly the bytes for that key.
        unique_bytes, unique_key = _single_blob_mpack(f"object-{i}".encode())
        backend_var.set(_stub_backend(unique_bytes))
        async with session_factory() as sess:
            await wire_push_unpack_mpack(
                sess, repo.repo_id, unique_key, _OWNER,
                branch="main", head_commit_id="", commits_count=1, blobs_count=1,
            )
            await sess.commit()

    with _patched_backend_per_task(backend_var):
        # If any call raises, gather re-raises it — no further assertion needed.
        await asyncio.gather(*[_do(i) for i in range(n)])


@pytest.mark.tier4
@pytest.mark.asyncio
async def test_t4_concurrent_branch_updates_for_same_branch(
    repo: MusehubRepo,
    session_factory: async_sessionmaker[AsyncSession],
) -> None:
    """10 concurrent calls advancing the same branch serialise without data corruption.

    The service locks the branch row (SQLAlchemy ``with_for_update()``, i.e.
    SELECT ... FOR UPDATE) — concurrent updates must not produce two branch rows
    for the same name. Pre-create the branch so all concurrent calls hit the
    UPDATE path (avoiding the INSERT race).
    """
    import contextvars

    from musehub.core.genesis import compute_branch_id
    from musehub.services.musehub_wire import wire_push_unpack_mpack

    branch = "concurrent-branch"
    n = 10
    heads = [fake_id(f"commit-{i}") for i in range(n)]

    # Pre-create the branch so concurrent calls all SELECT-for-update and UPDATE.
    async with session_factory() as seed_sess:
        seed_sess.add(MusehubBranch(
            branch_id=compute_branch_id(repo.repo_id, branch),
            repo_id=repo.repo_id,
            name=branch,
            head_commit_id=fake_id("seed-head"),
        ))
        await seed_sess.commit()

    backend_var: contextvars.ContextVar[typing.Any] = contextvars.ContextVar("task_backend")

    async def _do(i: int) -> None:
        unique_bytes, unique_key = _single_blob_mpack(f"seq-{i}".encode())
        backend_var.set(_stub_backend(unique_bytes))
        async with session_factory() as sess:
            await wire_push_unpack_mpack(
                sess, repo.repo_id, unique_key, _OWNER,
                branch=branch, head_commit_id=heads[i],
                commits_count=1, blobs_count=0, force=True,
            )
            await sess.commit()

    with _patched_backend_per_task(backend_var):
        await asyncio.gather(*[_do(i) for i in range(n)])

    async with session_factory() as check_sess:
        result = await check_sess.execute(
            select(MusehubBranch.head_commit_id).where(
                MusehubBranch.repo_id == repo.repo_id,
                MusehubBranch.name == branch,
            )
        )
        head_ids = list(result.scalars().all())

    assert len(head_ids) == 1, f"expected exactly 1 branch row, got {len(head_ids)}"
    # Every call force-pushed its own head, so the surviving head must be one of them.
    assert head_ids[0] in heads, f"branch head {head_ids[0]!r} is not one of the pushed heads"


# ---------------------------------------------------------------------------
# Tier 5 — Data integrity
# ---------------------------------------------------------------------------


@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_unpack_returns_correct_shape(
    db_session: AsyncSession,
    repo: MusehubRepo,
) -> None:
    """Successful inline unpack returns the expected response keys (Phase 3 — no background job)."""
    from musehub.services.musehub_wire import wire_push_unpack_mpack

    result = await wire_push_unpack_mpack(
        db_session, repo.repo_id, _MPACK_KEY, _OWNER,
        branch="main", head_commit_id="", commits_count=3, blobs_count=7,
    )
    assert "head" in result
    assert "branch" in result
    assert result["branch"] == "main"
    assert "blobs_in_mpack" in result
    assert "commits_in_mpack" in result


@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_branch_row_created_when_head_provided(
    db_session: AsyncSession,
    repo: MusehubRepo,
) -> None:
    """Branch row is inserted when head_commit_id is provided and branch doesn't exist."""
    from musehub.services.musehub_wire import wire_push_unpack_mpack

    await wire_push_unpack_mpack(
        db_session, repo.repo_id, _MPACK_KEY, _OWNER,
        branch="feat/new-branch", head_commit_id=_HEAD, commits_count=1, blobs_count=0,
    )

    result = await db_session.execute(
        select(MusehubBranch).where(
            MusehubBranch.repo_id == repo.repo_id,
            MusehubBranch.name == "feat/new-branch",
        )
    )
    branch_row = result.scalar_one_or_none()
    assert branch_row is not None, "branch row was not created"
    assert branch_row.head_commit_id == _HEAD


@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_branch_row_updated_when_branch_exists(
    db_session: AsyncSession,
    repo: MusehubRepo,
) -> None:
    """When branch already exists, unpack updates head_commit_id — no duplicate row."""
    from musehub.services.musehub_wire import wire_push_unpack_mpack

    first_head = fake_id("first-head")
    second_head = fake_id("second-head")

    await wire_push_unpack_mpack(
        db_session, repo.repo_id, _MPACK_KEY, _OWNER,
        branch="main", head_commit_id=first_head, commits_count=1, blobs_count=0,
    )

    # Second push to same branch with a different mpack (different key).
    second_bytes, second_key = _single_blob_mpack(b"second-push-content")
    second_backend = _stub_backend(
        second_bytes,
        put_url="s3://muse-objects/objects/fake2",
        presign_url="https://minio.example.com/mpack2?sig=x",
    )
    with _patched_backend(return_value=second_backend):
        await wire_push_unpack_mpack(
            db_session, repo.repo_id, second_key, _OWNER,
            branch="main", head_commit_id=second_head, commits_count=1, blobs_count=0,
            force=True,
        )

    result = await db_session.execute(
        select(func.count()).select_from(MusehubBranch).where(
            MusehubBranch.repo_id == repo.repo_id,
            MusehubBranch.name == "main",
        )
    )
    assert result.scalar() == 1, "duplicate branch row created"

    branch_result = await db_session.execute(
        select(MusehubBranch.head_commit_id).where(
            MusehubBranch.repo_id == repo.repo_id,
            MusehubBranch.name == "main",
        )
    )
    assert branch_result.scalar() == second_head, "branch head not updated to second push"


@pytest.mark.tier5
@pytest.mark.asyncio
async def test_t5_job_row_not_written_on_mpack_not_found(
    db_session: AsyncSession,
    repo: MusehubRepo,
) -> None:
    """When MinIO returns None, ValueError is raised and no job row is persisted."""
    from musehub.services.musehub_wire import wire_push_unpack_mpack

    with _patched_backend(return_value=_stub_backend(None)):
        with pytest.raises(ValueError):
            await wire_push_unpack_mpack(
                db_session, repo.repo_id, _MPACK_KEY, _OWNER,
                branch="main", head_commit_id="", commits_count=0, blobs_count=0,
            )

    result = await db_session.execute(
        select(func.count()).select_from(MusehubBackgroundJob).where(
            MusehubBackgroundJob.repo_id == repo.repo_id,
            MusehubBackgroundJob.job_type == "mpack.index",
        )
    )
    assert result.scalar() == 0, "job row written despite mpack not found"


# ---------------------------------------------------------------------------
# Tier 6 — Performance
# ---------------------------------------------------------------------------


@pytest.mark.tier6
@pytest.mark.asyncio
async def test_t6_unpack_latency_under_100ms(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """Unpack endpoint (stub MinIO, small mpack) completes in < 100ms."""
    try:
        async with _make_client(db_session) as client:
            # Warm-up — exclude connection setup from the gate.
            await _post_unpack(client, _body())

            t0 = time.perf_counter()
            resp = await _post_unpack(client, _body(commits_count=3, blobs_count=10))
            elapsed_ms = (time.perf_counter() - t0) * 1000
    finally:
        # Always drop the overrides so they cannot leak into later tests.
        app.dependency_overrides.clear()

    assert resp.status_code == 200
    assert elapsed_ms < 100, f"unpack took {elapsed_ms:.1f}ms — gate is 100ms"


@pytest.mark.tier6
@pytest.mark.asyncio
async def test_t6_job_index_query_uses_index(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """EXPLAIN on the pending-job query shows an index-backed plan.

    The intended index is ix_musehub_background_jobs_status_created, but the
    assertion only checks that the plan mentions an index, not which one.
    """
    plan = await db_session.execute(text(
        "EXPLAIN SELECT * FROM musehub_background_jobs "
        "WHERE status = 'pending' "
        "ORDER BY created_at "
        "LIMIT 10"
    ))
    # NOTE(review): assumes EXPLAIN yields one text plan line per row in column 0
    # (PostgreSQL-style output) — confirm against the test database backend.
    plan_text = "\n".join(row[0] for row in plan)
    assert "Index" in plan_text or "index" in plan_text, (
        f"job queue query not using an index:\n{plan_text}"
    )


# ---------------------------------------------------------------------------
# Tier 7 — Security
# ---------------------------------------------------------------------------


@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_malformed_mpack_key_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """mpack_key without sha256: prefix is rejected with 422."""
    body = msgpack.packb(
        {"mpack_key": "not-a-real-key", "branch": "main", "commits_count": 1, "blobs_count": 0},
        use_bin_type=True,
    )
    try:
        async with _make_client(db_session) as client:
            resp = await _post_unpack(client, body)
    finally:
        app.dependency_overrides.clear()

    assert resp.status_code == 422, f"expected 422 for malformed key, got {resp.status_code}"


@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_oversized_key_field_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """A pathologically long mpack_key string is rejected, not stored."""
    body = msgpack.packb(
        {"mpack_key": "sha256:" + "a" * 10_000, "branch": "main"},
        use_bin_type=True,
    )
    try:
        async with _make_client(db_session) as client:
            resp = await _post_unpack(client, body)
    finally:
        app.dependency_overrides.clear()

    assert resp.status_code in (400, 422), (
        f"expected 4xx for oversized key, got {resp.status_code}"
    )


@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_unauthenticated_request_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """No auth header → 401/403 before any job enqueue logic runs."""

    async def _override_db() -> typing.AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_db
    # No require_valid_token override — exercise the real auth enforcement.
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as client:
            resp = await _post_unpack(client, _body())
    finally:
        app.dependency_overrides.clear()

    assert resp.status_code in (401, 403)

    # No job row must exist — auth rejected before any DB write.
    result = await db_session.execute(
        select(func.count()).select_from(MusehubBackgroundJob).where(
            MusehubBackgroundJob.repo_id == repo.repo_id,
            MusehubBackgroundJob.job_type == "mpack.index",
        )
    )
    assert result.scalar() == 0, "job row written for unauthenticated request"


@pytest.mark.tier7
@pytest.mark.asyncio
async def test_t7_commits_count_overflow_rejected(db_session: AsyncSession, repo: MusehubRepo) -> None:
    """commits_count wildly exceeding the limit is rejected with 422 — no job enqueued."""
    settings = get_settings()
    body = msgpack.packb(
        {
            "mpack_key": _MPACK_KEY,
            "branch": "main",
            "commits_count": settings.mpack_max_commits * 100,
            "blobs_count": 0,
        },
        use_bin_type=True,
    )
    try:
        async with _make_client(db_session) as client:
            resp = await _post_unpack(client, body)
    finally:
        app.dependency_overrides.clear()

    assert resp.status_code == 422

    result = await db_session.execute(
        select(func.count()).select_from(MusehubBackgroundJob).where(
            MusehubBackgroundJob.repo_id == repo.repo_id,
        )
    )
    assert result.scalar() == 0, "job row written despite over-limit commits_count"