"""Regression + integration tests for DB-backed auth challenge storage. These tests verify that: 1. Challenges are persisted to Postgres (not in-memory). 2. A challenge created by one service instance is usable by another (blue-green deploy resilience) — simulated here by using separate DB sessions. 3. Challenges are consumed (deleted) on first use — replay is prevented. 4. Expired challenges are rejected. 5. Unknown (never-issued) challenge tokens are rejected. The in-memory dict (_pending_challenges) no longer exists in the codebase. All state lives in the musehub_auth_challenges table. """ from __future__ import annotations import time from muse.core.types import encode_pubkey, encode_sig, public_key_fingerprint, long_id from datetime import datetime, timedelta, timezone import pytest from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from httpx import AsyncClient from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from musehub.db.musehub_auth_models import MusehubAuthChallenge # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _kp() -> tuple[Ed25519PrivateKey, bytes, str, str]: """Generate (priv, raw_pub, pub_b64, fingerprint).""" priv = Ed25519PrivateKey.generate() raw = priv.public_key().public_bytes_raw() return priv, raw, encode_pubkey("ed25519", raw), public_key_fingerprint(raw) def _sign(priv: Ed25519PrivateKey, nonce_hex: str) -> str: return encode_sig("ed25519", priv.sign(bytes.fromhex(nonce_hex))) # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- async def test_challenge_stored_in_db( client: AsyncClient, db_session: AsyncSession, ) -> None: """POST /api/auth/challenge must insert a row into musehub_auth_challenges.""" _, _, pub_b64, fp = _kp() resp = await client.post("/api/auth/challenge", json={"fingerprint": fp}) assert resp.status_code == 200, resp.text challenge_token = resp.json()["challenge_token"] # The row must exist in the DB — not in any in-memory dict. row: MusehubAuthChallenge | None = ( await db_session.execute( select(MusehubAuthChallenge).where( MusehubAuthChallenge.nonce_hex == challenge_token ) ) ).scalar_one_or_none() assert row is not None, "challenge row was not written to the database" assert row.fingerprint == fp assert row.expires_at > datetime.now(timezone.utc) async def test_challenge_consumed_on_successful_verify( client: AsyncClient, db_session: AsyncSession, ) -> None: """After a successful verify, the challenge row must be deleted (single-use).""" priv, _, pub_b64, fp = _kp() r1 = await client.post("/api/auth/challenge", json={"fingerprint": fp}) assert r1.status_code == 200 ct = r1.json()["challenge_token"] # Confirm row is in DB before verify before: MusehubAuthChallenge | None = ( await db_session.execute( select(MusehubAuthChallenge).where(MusehubAuthChallenge.nonce_hex == ct) ) ).scalar_one_or_none() assert before is not None r2 = await client.post("/api/auth/verify", json={ "challenge_token": ct, "public_key_b64": pub_b64, "signature_b64": _sign(priv, ct), "handle": "testhandle-consumed", }) assert r2.status_code == 200, r2.text # Expire the session cache so we see the committed state db_session.expire_all() # Row must be gone after: MusehubAuthChallenge | None = ( await db_session.execute( select(MusehubAuthChallenge).where(MusehubAuthChallenge.nonce_hex == ct) ) ).scalar_one_or_none() assert after is None, "challenge row was not deleted after successful verify" async def test_challenge_replay_rejected( client: AsyncClient, db_session: AsyncSession, ) -> None: """Re-submitting a consumed challenge token must return 400.""" priv, _, pub_b64, fp = _kp() r1 = await client.post("/api/auth/challenge", json={"fingerprint": fp}) assert r1.status_code == 200 ct = r1.json()["challenge_token"] sig = _sign(priv, ct) # First use — should succeed r2 = await client.post("/api/auth/verify", json={ "challenge_token": ct, "public_key_b64": pub_b64, "signature_b64": sig, "handle": "replay-test-handle", }) assert r2.status_code == 200, r2.text # Replay — must be rejected because the row was deleted r3 = await client.post("/api/auth/verify", json={ "challenge_token": ct, "public_key_b64": pub_b64, "signature_b64": sig, "handle": "replay-test-handle", }) assert r3.status_code == 400, f"replay was not rejected: {r3.text}" assert "expired" in r3.json()["detail"].lower() or "unknown" in r3.json()["detail"].lower() async def test_unknown_challenge_token_rejected( client: AsyncClient, db_session: AsyncSession, ) -> None: """A challenge token that was never issued must return 400.""" priv, _, pub_b64, fp = _kp() # Fabricate a plausible 64-hex nonce that was never inserted fake_token = "deadbeef" * 8 # 64 hex chars r = await client.post("/api/auth/verify", json={ "challenge_token": fake_token, "public_key_b64": pub_b64, "signature_b64": _sign(priv, fake_token), "handle": "unknown-token-handle", }) assert r.status_code == 400, r.text async def test_expired_challenge_rejected( client: AsyncClient, db_session: AsyncSession, ) -> None: """A challenge whose expires_at is in the past must be rejected.""" priv, _, pub_b64, fp = _kp() import secrets # Insert an already-expired challenge directly into the DB nonce_hex = secrets.token_bytes(32).hex() expired_row = MusehubAuthChallenge( nonce_hex=nonce_hex, fingerprint=fp, algorithm="ed25519", expires_at=datetime.now(timezone.utc) - timedelta(seconds=1), ) db_session.add(expired_row) await db_session.commit() r = await client.post("/api/auth/verify", json={ "challenge_token": nonce_hex, "public_key_b64": pub_b64, "signature_b64": _sign(priv, nonce_hex), "handle": "expired-handle", }) assert r.status_code == 400, r.text detail = r.json()["detail"].lower() assert "expired" in detail or "unknown" in detail async def test_nonce_is_primary_key(db_session: AsyncSession) -> None: """nonce_hex must be the PK — challenge_id must not exist on the model.""" import inspect import secrets # challenge_id must not exist assert not hasattr(MusehubAuthChallenge, "challenge_id"), ( "challenge_id should have been dropped; nonce_hex is the PK now" ) nonce_hex = secrets.token_bytes(32).hex() row = MusehubAuthChallenge( nonce_hex=nonce_hex, fingerprint=long_id("a" * 64), algorithm="ed25519", expires_at=datetime.now(timezone.utc) + timedelta(minutes=5), ) db_session.add(row) await db_session.flush() fetched = ( await db_session.execute( select(MusehubAuthChallenge).where(MusehubAuthChallenge.nonce_hex == nonce_hex) ) ).scalar_one() assert fetched.nonce_hex == nonce_hex async def test_blue_green_resilience( client: AsyncClient, db_session: AsyncSession, ) -> None: """Challenge created in one 'process' is verifiable in another. This simulates a blue-green deploy: the green container issues a challenge, then nginx flips to blue. Blue has no in-memory state — but both share Postgres. The verify call must succeed. Simulation: use a fresh AsyncSession (separate from db_session) to call create_challenge, then verify via the HTTP client (which uses its own session). If challenges were in-memory, this would fail because the HTTP handler's session would find no matching nonce. Since they are in Postgres, it works. """ from musehub.db import database as _db # noqa: PLC0415 from musehub.services.musehub_auth import create_challenge as _create_challenge # noqa: PLC0415 priv, _, pub_b64, fp = _kp() # Simulate "green" container creating the challenge in its own session. # _db._async_session_factory is pointed at the test DB by the db_session fixture. async with _db._async_session_factory() as green_session: nonce_hex = await _create_challenge(green_session, fp, "ed25519") # Simulate "blue" container verifying — uses a completely separate session # (the HTTP client gets its own session via the override_get_db fixture) r = await client.post("/api/auth/verify", json={ "challenge_token": nonce_hex, "public_key_b64": pub_b64, "signature_b64": _sign(priv, nonce_hex), "handle": "blue-green-handle", }) assert r.status_code == 200, ( f"Blue-green resilience FAILED — challenge created in one session could not " f"be verified in another: {r.text}" ) data = r.json() assert data["handle"] == "blue-green-handle" assert data["is_new_identity"] is True