"""Dead identity recovery — handle exists but has no registered keys. Scenario that breaks aaronrene and edwin: 1. User registered successfully (identity + key created) 2. Key was deleted from musehub_auth_keys (logout, failed rotation, etc.) 3. Identity row remains in musehub_identities — handle is "taken" 4. User tries to authenticate → 401 "No registered keys for identity" 5. User tries to re-register with same handle → 409 "Handle already taken" 6. User is locked out with no recovery path Fix: when a new key attempts to register under a handle that exists but has no keys (dead identity), add the new key to the existing identity instead of raising 409. Coverage -------- D1 Dead identity → auth returns 401 "No registered keys for identity" D2 Dead identity + re-register with same handle → 200 (recovery succeeds) D3 Dead identity recovery → subsequent auth with new key succeeds D4 Active identity (has keys) + different key cannot steal the handle → 409 D5 Active identity (has keys) + same key → 200 (normal login) """ from __future__ import annotations from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from httpx import AsyncClient from sqlalchemy import delete, select from sqlalchemy.ext.asyncio import AsyncSession import pytest from muse.core.types import encode_pubkey, encode_sig, public_key_fingerprint from musehub.core.genesis import compute_identity_id from musehub.db.musehub_auth_models import MusehubAuthKey from musehub.db.musehub_identity_models import MusehubIdentity # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _kp() -> tuple[Ed25519PrivateKey, bytes, str, str]: """Return (priv, raw_pub_bytes, pub_b64, fingerprint).""" priv = Ed25519PrivateKey.generate() raw = priv.public_key().public_bytes_raw() return priv, raw, encode_pubkey("ed25519", raw), public_key_fingerprint(raw) def _sign(priv: Ed25519PrivateKey, nonce_hex: str) -> str: return encode_sig("ed25519", priv.sign(bytes.fromhex(nonce_hex))) async def _register(client: AsyncClient, priv: Ed25519PrivateKey, pub_b64: str, fp: str, handle: str) -> int: r1 = await client.post("/api/auth/challenge", json={"fingerprint": fp}) assert r1.status_code == 200, r1.text ct = r1.json()["challenge_token"] r2 = await client.post("/api/auth/verify", json={ "challenge_token": ct, "public_key_b64": pub_b64, "signature_b64": _sign(priv, ct), "handle": handle, }) return r2.status_code async def _auth(client: AsyncClient, priv: Ed25519PrivateKey, pub_b64: str, fp: str, handle: str) -> int: r1 = await client.post("/api/auth/challenge", json={"fingerprint": fp}) assert r1.status_code == 200, r1.text ct = r1.json()["challenge_token"] r2 = await client.post("/api/auth/verify", json={ "challenge_token": ct, "public_key_b64": pub_b64, "signature_b64": _sign(priv, ct), "handle": handle, }) return r2.status_code async def _make_dead_identity( client: AsyncClient, db_session: AsyncSession, handle: str, ) -> tuple[Ed25519PrivateKey, str, str]: """Register a user then delete their key, leaving a dead identity. Returns (original_priv, original_pub_b64, original_fp). """ priv, _, pub_b64, fp = _kp() code = await _register(client, priv, pub_b64, fp, handle) assert code == 200, f"setup registration failed: {code}" # Delete the key row — simulates logout / failed rotation await db_session.execute( delete(MusehubAuthKey).where(MusehubAuthKey.fingerprint == fp) ) await db_session.commit() db_session.expire_all() # Verify the identity still exists but has no keys identity = (await db_session.execute( select(MusehubIdentity).where(MusehubIdentity.handle == handle) )).scalar_one_or_none() assert identity is not None, "identity should still exist after key deletion" key_count = (await db_session.execute( select(MusehubAuthKey).where(MusehubAuthKey.identity_id == identity.identity_id) )).scalars().all() assert len(key_count) == 0, "dead identity must have zero keys" return priv, pub_b64, fp # --------------------------------------------------------------------------- # D1 Dead identity → re-register same handle with NEW key → 200 (recovery) # --------------------------------------------------------------------------- async def test_D1_dead_identity_blocks_reregister_without_fix( client: AsyncClient, db_session: AsyncSession, ) -> None: """D1: With an active identity that still has keys, a different key → 409. This is the NEGATIVE case — proves the guard still works when there ARE active keys. The recovery path (D2) only fires when there are zero keys. """ handle = "dead-d1-user" priv, _, pub_b64, fp = _kp() code = await _register(client, priv, pub_b64, fp, handle) assert code == 200, f"setup failed: {code}" # Different key, same handle — identity has an active key → must be 409 other_priv, _, other_pub_b64, other_fp = _kp() code2 = await _register(client, other_priv, other_pub_b64, other_fp, handle) assert code2 == 409, ( f"Active identity should not allow a different key: expected 409, got {code2}" ) # --------------------------------------------------------------------------- # D2 Dead identity + re-register same handle → 200 (recovery) # --------------------------------------------------------------------------- async def test_D2_dead_identity_reregister_succeeds( client: AsyncClient, db_session: AsyncSession, ) -> None: """D2: Re-registering a new key under a dead identity's handle → 200.""" handle = "dead-d2-user" await _make_dead_identity(client, db_session, handle) # New key — different from the one that was deleted new_priv, _, new_pub_b64, new_fp = _kp() code = await _register(client, new_priv, new_pub_b64, new_fp, handle) assert code == 200, ( f"Dead identity recovery should return 200, got {code}. " "Server should allow re-keying a handle with no active keys." ) # --------------------------------------------------------------------------- # D3 After recovery, new key authenticates successfully # --------------------------------------------------------------------------- async def test_D3_recovered_identity_auth_succeeds( client: AsyncClient, db_session: AsyncSession, ) -> None: """D3: After dead identity recovery, the new key can authenticate.""" handle = "dead-d3-user" await _make_dead_identity(client, db_session, handle) new_priv, _, new_pub_b64, new_fp = _kp() reg_code = await _register(client, new_priv, new_pub_b64, new_fp, handle) assert reg_code == 200, f"recovery registration failed: {reg_code}" auth_code = await _auth(client, new_priv, new_pub_b64, new_fp, handle) assert auth_code == 200, ( f"Expected 200 for auth after recovery, got {auth_code}" ) # --------------------------------------------------------------------------- # D4 Active identity (has keys) → different key cannot steal handle → 409 # --------------------------------------------------------------------------- async def test_D4_active_identity_cannot_be_stolen( client: AsyncClient, db_session: AsyncSession, ) -> None: """D4: A handle with active keys cannot be claimed by a different key.""" handle = "dead-d4-user" priv, _, pub_b64, fp = _kp() code = await _register(client, priv, pub_b64, fp, handle) assert code == 200, f"setup failed: {code}" # Different key tries to register under the same handle other_priv, _, other_pub_b64, other_fp = _kp() steal_code = await _register(client, other_priv, other_pub_b64, other_fp, handle) assert steal_code == 409, ( f"Expected 409 when handle with active keys is reused by a different key, got {steal_code}" ) # --------------------------------------------------------------------------- # D5 Active identity + same key → 200 (normal login / idempotent re-register) # --------------------------------------------------------------------------- async def test_D5_active_identity_same_key_login( client: AsyncClient, db_session: AsyncSession, ) -> None: """D5: Re-registering with the same key under the same handle → 200 (login).""" handle = "dead-d5-user" priv, _, pub_b64, fp = _kp() code1 = await _register(client, priv, pub_b64, fp, handle) assert code1 == 200, f"first registration failed: {code1}" code2 = await _auth(client, priv, pub_b64, fp, handle) assert code2 == 200, f"second auth (same key) failed: {code2}"