"""TDD — Phase 4: per-user daily byte limits, anomaly detection, /api/caps (issue #49).

Phase 4 invariants:

  4a. Per-user daily byte limit — tracked in musehub_daily_push_bytes; the
      presign endpoint rejects with 429 when the caller's daily total would
      exceed settings.mpack_daily_upload_limit_bytes.

  4b. Anomaly detection — after a successful mpack.index, the per-user
      30-day rolling average is computed; if today's upload is >10× the
      average, a musehub_push_anomaly row is inserted and a structured
      WARNING is logged.  The push is NOT rejected.

  4c. GET /api/caps — public endpoint returning server limits as JSON.
"""
from __future__ import annotations

import datetime
from collections.abc import AsyncGenerator
from typing import TypedDict

import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request
from musehub.db.musehub_abuse_models import MusehubDailyPushBytes, MusehubPushAnomaly
from musehub.db.database import get_db
from musehub.main import app


class _RepoData(TypedDict):
    """Fields of the repo-creation response body that the tests in this file read."""

    owner: str
    name: str
    repoId: str


# Identity injected in place of real request signing for the primary test user.
_AUTH_CTX = MSignContext(
    handle="gabriel",
    identity_id="sha256:" + "a" * 64,
    is_agent=False,
    is_admin=False,
)

# A second, distinct identity used to check that limits are tracked per user.
_AUTH_CTX2 = MSignContext(
    handle="carol",
    identity_id="sha256:" + "b" * 64,
    is_agent=False,
    is_admin=False,
)


# ── fixtures ─────────────────────────────────────────────────────────────────

@pytest_asyncio.fixture()
async def client(db_session: AsyncSession) -> AsyncGenerator[AsyncClient, None]:
    """Yield an in-process HTTP client for ``app`` authenticated as ``_AUTH_CTX``.

    Installs dependency overrides on the global ``app`` so that ``get_db``
    yields the test's ``db_session`` and both signed-request dependencies
    return ``_AUTH_CTX``.

    Args:
        db_session: Session handed to every request via the ``get_db`` override.

    Yields:
        An ``AsyncClient`` talking to ``app`` through ``ASGITransport``.
    """

    async def _override_get_db() -> AsyncGenerator[AsyncSession, None]:
        # Reuse the test's session so requests and direct DB access share state.
        yield db_session

    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX
    app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        # ``app`` is a module-level singleton: always drop the overrides, even
        # if closing the client raises, so they cannot leak into later tests.
        # NOTE: clear() removes *all* overrides, not only the three set here.
        app.dependency_overrides.clear()
@pytest_asyncio.fixture()
async def client2(db_session: AsyncSession) -> AsyncGenerator[AsyncClient, None]:
    """Second user client — verifies limits are per-user, not global.

    Installs the same dependency overrides as ``client`` but authenticates
    as ``_AUTH_CTX2``.  The overrides live on the global ``app``, so if both
    fixtures are active in one test, whichever was set up last decides the
    identity seen by *both* clients.

    Args:
        db_session: Session handed to every request via the ``get_db`` override.

    Yields:
        An ``AsyncClient`` talking to ``app`` through ``ASGITransport``.
    """

    async def _override_get_db() -> AsyncGenerator[AsyncSession, None]:
        yield db_session

    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX2
    app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX2
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        # Always drop the overrides from the shared ``app``, even if closing
        # the client raises.  clear() removes *all* overrides.
        app.dependency_overrides.clear()


@pytest_asyncio.fixture()
async def repo(client: AsyncClient) -> AsyncGenerator[_RepoData, None]:
    """Create a public, uninitialized repo owned by ``gabriel`` and delete it afterwards.

    Yields:
        The JSON body returned by ``POST /api/repos``.
    """
    resp = await client.post(
        "/api/repos",
        json={"owner": "gabriel", "name": "phase4-limits-test", "visibility": "public", "initialize": False},
    )
    assert resp.status_code in (200, 201), resp.text
    data = resp.json()
    yield data
    await client.delete(f"/api/repos/{data['repoId']}")


# ── helpers ───────────────────────────────────────────────────────────────────

def _today() -> datetime.date:
    """Return today's date in the local timezone.

    NOTE(review): if the service keys daily rows by UTC date, lookups using
    this value could miss near midnight — TODO confirm against
    ``record_mpack_bytes_uploaded``.
    """
    return datetime.date.today()


async def _seed_daily_history(
    db_session: AsyncSession,
    identity_id: str,
    days: int,
    bytes_per_day: int,
) -> None:
    """Insert one ``MusehubDailyPushBytes`` row per day for the ``days`` days before today.

    Today itself is not seeded.  The rows are committed before returning.

    Args:
        db_session: Session used to add and commit the rows.
        identity_id: Identity the history belongs to.
        days: Number of consecutive past days to seed (yesterday backwards).
        bytes_per_day: ``bytes_uploaded`` value stored on every seeded row.
    """
    today = _today()
    for offset in range(1, days + 1):
        db_session.add(MusehubDailyPushBytes(
            identity_id=identity_id,
            date=today - datetime.timedelta(days=offset),
            bytes_uploaded=bytes_per_day,
        ))
    await db_session.commit()


# ══════════════════════════════════════════════════════════════════════════════
# 4a — per-user daily byte limit
# ══════════════════════════════════════════════════════════════════════════════

@pytest.mark.asyncio
async def test_daily_push_bytes_table_exists(db_session: AsyncSession) -> None:
    """musehub_daily_push_bytes table is present and queryable."""
    rows = (
        await db_session.execute(select(MusehubDailyPushBytes))
    ).scalars().all()
    assert isinstance(rows, list)


@pytest.mark.asyncio
async def test_record_mpack_bytes_increments_daily_total(db_session: AsyncSession) -> None:
    """record_mpack_bytes_uploaded upserts the per-user daily row."""
    from musehub.services.musehub_wire import record_mpack_bytes_uploaded

    identity_id = _AUTH_CTX.identity_id
    today = _today()

    await record_mpack_bytes_uploaded(db_session, identity_id, 1024)
    await db_session.commit()

    row = (
        await db_session.execute(
            select(MusehubDailyPushBytes).where(
                MusehubDailyPushBytes.identity_id == identity_id,
                MusehubDailyPushBytes.date == today,
            )
        )
    ).scalar_one_or_none()
    assert row is not None
    assert row.bytes_uploaded == 1024


@pytest.mark.asyncio
async def test_record_mpack_bytes_accumulates_across_calls(db_session: AsyncSession) -> None:
    """Two calls on the same day accumulate, not overwrite."""
    from musehub.services.musehub_wire import record_mpack_bytes_uploaded

    identity_id = _AUTH_CTX.identity_id

    await record_mpack_bytes_uploaded(db_session, identity_id, 500)
    await db_session.commit()
    await record_mpack_bytes_uploaded(db_session, identity_id, 300)
    await db_session.commit()

    row = (
        await db_session.execute(
            select(MusehubDailyPushBytes).where(
                MusehubDailyPushBytes.identity_id == identity_id,
                MusehubDailyPushBytes.date == _today(),
            )
        )
    ).scalar_one_or_none()
    assert row is not None
    assert row.bytes_uploaded == 800


@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_daily_limit_allows_push_under_quota(
    client: AsyncClient,
    repo: _RepoData,
) -> None:
    """mpack-presign succeeds when user is under the daily limit."""
    import msgpack

    body = msgpack.packb({"mpack_key": "sha256:" + "c" * 64, "size_bytes": 1024})
    resp = await client.post(
        f"/{repo['owner']}/{repo['name']}/push/mpack-presign",
        content=body,
        headers={"Content-Type": "application/x-msgpack"},
    )
    # 200 or 422 (no backend presign in test) — definitely NOT 429
    assert resp.status_code != 429


@pytest.mark.asyncio
async def test_daily_limit_blocks_push_over_quota(
    client: AsyncClient,
    repo: _RepoData,
    db_session: AsyncSession,
) -> None:
    """mpack-presign returns 429 when daily limit already exceeded."""
    import msgpack

    from musehub.config import get_settings
    from musehub.services.musehub_wire import record_mpack_bytes_uploaded

    limit = get_settings().mpack_daily_upload_limit_bytes

    # Pre-fill the user's daily counter just over the limit
    await record_mpack_bytes_uploaded(db_session, _AUTH_CTX.identity_id, limit + 1)
    await db_session.commit()

    body = msgpack.packb({"mpack_key": "sha256:" + "d" * 64, "size_bytes": 1024})
    resp = await client.post(
        f"/{repo['owner']}/{repo['name']}/push/mpack-presign",
        content=body,
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert resp.status_code == 429
    assert "daily" in resp.text.lower()


@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_daily_limit_is_per_user_not_global(
    client: AsyncClient,
    repo: _RepoData,
    db_session: AsyncSession,
) -> None:
    """User carol's quota being exhausted does NOT block gabriel."""
    import msgpack

    from musehub.config import get_settings
    from musehub.services.musehub_wire import record_mpack_bytes_uploaded

    limit = get_settings().mpack_daily_upload_limit_bytes

    # Exhaust carol's quota
    await record_mpack_bytes_uploaded(db_session, _AUTH_CTX2.identity_id, limit + 1)
    await db_session.commit()

    # Gabriel's presign should still work (not 429)
    body = msgpack.packb({"mpack_key": "sha256:" + "e" * 64, "size_bytes": 512})
    resp = await client.post(
        f"/{repo['owner']}/{repo['name']}/push/mpack-presign",
        content=body,
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert resp.status_code != 429


# ══════════════════════════════════════════════════════════════════════════════
# 4b — anomaly detection
# ══════════════════════════════════════════════════════════════════════════════

@pytest.mark.asyncio
async def test_push_anomaly_table_exists(db_session: AsyncSession) -> None:
    """musehub_push_anomaly table is present and queryable."""
    rows = (
        await db_session.execute(select(MusehubPushAnomaly))
    ).scalars().all()
    assert isinstance(rows, list)


@pytest.mark.asyncio
async def test_check_push_anomaly_does_not_flag_normal_volume(db_session: AsyncSession) -> None:
    """No anomaly row created when today's upload ≤ 10× 30-day average."""
    from musehub.services.musehub_wire import check_push_anomaly

    identity_id = _AUTH_CTX.identity_id

    # Seed 7 days of 1 MB pushes for a 1 MB average
    await _seed_daily_history(db_session, identity_id, days=7, bytes_per_day=1024 * 1024)

    # Today's upload is 5 MB — 5× the average, under the 10× threshold
    flagged = await check_push_anomaly(db_session, identity_id, 5 * 1024 * 1024)
    assert flagged is False

    rows = (
        await db_session.execute(
            select(MusehubPushAnomaly).where(
                MusehubPushAnomaly.identity_id == identity_id
            )
        )
    ).scalars().all()
    assert len(rows) == 0


@pytest.mark.asyncio
async def test_check_push_anomaly_flags_spike(db_session: AsyncSession) -> None:
    """Anomaly row inserted when today's upload is >10× 30-day average."""
    from musehub.services.musehub_wire import check_push_anomaly

    identity_id = _AUTH_CTX.identity_id

    # 30-day average of 1 MB/day
    await _seed_daily_history(db_session, identity_id, days=30, bytes_per_day=1024 * 1024)

    # Today's upload is 11 MB — 11× the average → should flag
    flagged = await check_push_anomaly(db_session, identity_id, 11 * 1024 * 1024)
    assert flagged is True

    await db_session.commit()
    row = (
        await db_session.execute(
            select(MusehubPushAnomaly).where(
                MusehubPushAnomaly.identity_id == identity_id
            )
        )
    ).scalar_one_or_none()
    assert row is not None
    assert row.bytes_today >= 11 * 1024 * 1024
    assert row.rolling_avg_bytes > 0
    assert row.ratio >= 10.0


@pytest.mark.asyncio
async def test_check_push_anomaly_no_history_does_not_flag(db_session: AsyncSession) -> None:
    """First-ever push for a user (no history) is never flagged as anomalous."""
    from musehub.services.musehub_wire import check_push_anomaly

    # An identity that no other test in this file seeds history for.
    identity_id = "sha256:" + "f" * 64
    flagged = await check_push_anomaly(db_session, identity_id, 100 * 1024 * 1024)
    assert flagged is False


@pytest.mark.asyncio
async def test_anomaly_does_not_block_push(db_session: AsyncSession) -> None:
    """check_push_anomaly returns True but raises no exception."""
    from musehub.services.musehub_wire import check_push_anomaly

    identity_id = _AUTH_CTX.identity_id
    await _seed_daily_history(db_session, identity_id, days=10, bytes_per_day=100)

    # 100 MB vs 100 bytes average — far above 10× threshold
    # Should return True but NOT raise
    try:
        result = await check_push_anomaly(db_session, identity_id, 100 * 1024 * 1024)
    except Exception as exc:
        pytest.fail(f"check_push_anomaly raised unexpectedly: {exc}")
    # Checked outside the try: AssertionError is an Exception, so asserting
    # inside it would misreport a wrong return value as an unexpected raise.
    assert result is True


# ══════════════════════════════════════════════════════════════════════════════
# 4c — GET /api/caps
# ══════════════════════════════════════════════════════════════════════════════

@pytest.mark.asyncio
async def test_caps_endpoint_exists(client: AsyncClient) -> None:
    """GET /api/caps returns 200."""
    resp = await client.get("/api/caps")
    assert resp.status_code == 200


@pytest.mark.asyncio
async def test_caps_returns_required_fields(client: AsyncClient) -> None:
    """GET /api/caps body contains all required server limit fields."""
    resp = await client.get("/api/caps")
    assert resp.status_code == 200
    data = resp.json()
    assert "max_mpack_bytes" in data
    assert "daily_upload_limit_bytes" in data
    assert "max_commits_per_push" in data
    assert "max_objects_per_push" in data


@pytest.mark.asyncio
async def test_caps_values_match_settings(client: AsyncClient) -> None:
    """GET /api/caps values match the active settings object."""
    from musehub.config import get_settings

    s = get_settings()
    resp = await client.get("/api/caps")
    assert resp.status_code == 200
    data = resp.json()
    assert data["max_mpack_bytes"] == s.mpack_max_bytes
    assert data["daily_upload_limit_bytes"] == s.mpack_daily_upload_limit_bytes
    assert data["max_commits_per_push"] == s.mpack_max_commits
    assert data["max_objects_per_push"] == s.mpack_max_objects


@pytest.mark.asyncio
async def test_caps_is_public_no_auth_required() -> None:
    """GET /api/caps works without any auth header."""
    # Deliberately does not request the ``client`` fixture, so this test
    # installs no auth overrides of its own.
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="https://localhost:1337",
    ) as c:
        resp = await c.get("/api/caps")
    assert resp.status_code == 200