"""Phase 2 — Key rotation commits to the identity repo. TDD regression suite: every test starts RED and turns GREEN as the feature is implemented. Their permanent role is to prevent regressions. What this phase covers: - add_key_for_identity() creates a new commit on the identity repo - The new commit updates identities/{handle}.json with the rotated pubkey - The commit history grows (2 commits: initial registration + rotation) - A second rotation produces a third commit (each rotation is its own commit) - The commit message follows the "identity: rotate key for {handle}" pattern """ from __future__ import annotations import json import pytest from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from muse.core.types import encode_pubkey, encode_sig from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from musehub.crypto.keys import b64url_encode, key_fingerprint from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubObject, MusehubRepo, MusehubSnapshot from musehub.services.musehub_auth import ( add_key_for_identity, create_challenge, verify_and_authenticate, ) from musehub.types.json_types import JSONObject # ── helpers ─────────────────────────────────────────────────────────────────── def _keypair() -> tuple[Ed25519PrivateKey, bytes]: priv = Ed25519PrivateKey.generate() pub = priv.public_key().public_bytes_raw() return priv, pub def _sign_nonce(priv: Ed25519PrivateKey, nonce_hex: str) -> str: sig_bytes = priv.sign(bytes.fromhex(nonce_hex)) return encode_sig("ed25519", sig_bytes) async def _register( session: AsyncSession, handle: str, priv: Ed25519PrivateKey, pub: bytes, ) -> str: """Full challenge → verify flow; returns identity_id.""" public_key_b64 = encode_pubkey("ed25519", pub) fp = key_fingerprint(pub) nonce = await create_challenge(session, fingerprint=fp, algorithm="ed25519") sig_b64 = _sign_nonce(priv, nonce) result = await verify_and_authenticate( session=session, challenge_token=nonce, public_key_b64=public_key_b64, signature_b64=sig_b64, handle=handle, display_name=handle, label="initial-key", ) return result.identity_id async def _rotate( session: AsyncSession, identity_id: str, new_priv: Ed25519PrivateKey, new_pub: bytes, ) -> None: """Full challenge → add_key_for_identity flow for key rotation.""" public_key_b64 = encode_pubkey("ed25519", new_pub) fp = key_fingerprint(new_pub) nonce = await create_challenge(session, fingerprint=fp, algorithm="ed25519") sig_b64 = _sign_nonce(new_priv, nonce) await add_key_for_identity( session=session, identity_id=identity_id, challenge_token=nonce, public_key_b64=public_key_b64, signature_b64=sig_b64, label="rotated-key", ) async def _get_identity_commits(session: AsyncSession, handle: str) -> list[MusehubCommit]: """Return all commits on the identity repo's main branch, ordered by timestamp.""" repo_result = await session.execute( select(MusehubRepo).where( MusehubRepo.owner == handle, MusehubRepo.slug == "identity", ) ) repo = repo_result.scalar_one() commits_result = await session.execute( select(MusehubCommit) .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id) .where( MusehubCommitRef.repo_id == repo.repo_id, MusehubCommit.branch == "main", ).order_by(MusehubCommit.timestamp) ) return list(commits_result.scalars().all()) async def _read_identity_record_from_commit( session: AsyncSession, commit: MusehubCommit ) -> JSONObject: """Read the identity record from a specific commit's snapshot.""" import msgpack from muse.plugins.identity.records import identity_path as _ip snap_result = await session.execute( select(MusehubSnapshot).where( MusehubSnapshot.snapshot_id == commit.snapshot_id ) ) snap = snap_result.scalar_one() manifest: JSONObject = msgpack.unpackb(snap.manifest_blob, raw=False) # identity_path uses the handle embedded in the commit's repo — derive from manifest file_path = next(k for k in manifest if k.startswith("identities/") and k.endswith(".json")) object_id = manifest[file_path] obj_result = await session.execute( select(MusehubObject).where(MusehubObject.object_id == object_id) ) obj = obj_result.scalar_one() from musehub.storage.backends import read_object_bytes raw = await read_object_bytes(obj) if raw is None: return None return json.loads(raw) # ═══════════════════════════════════════════════════════════════════════════════ # 1. Rotation creates a new commit # ═══════════════════════════════════════════════════════════════════════════════ class TestRotationCreatesCommit: async def test_rotation_adds_second_commit_to_identity_repo( self, db_session: AsyncSession ) -> None: """add_key_for_identity must commit a new revision to the identity repo.""" priv, pub = _keypair() identity_id = await _register(db_session, "alice2", priv, pub) new_priv, new_pub = _keypair() await _rotate(db_session, identity_id, new_priv, new_pub) commits = await _get_identity_commits(db_session, "alice2") assert len(commits) == 2, ( f"Expected 2 commits on identity repo after one rotation, got {len(commits)}." ) async def test_second_rotation_adds_third_commit( self, db_session: AsyncSession ) -> None: """Each rotation produces its own commit — history grows linearly.""" priv, pub = _keypair() identity_id = await _register(db_session, "bob2", priv, pub) new_priv1, new_pub1 = _keypair() await _rotate(db_session, identity_id, new_priv1, new_pub1) new_priv2, new_pub2 = _keypair() await _rotate(db_session, identity_id, new_priv2, new_pub2) commits = await _get_identity_commits(db_session, "bob2") assert len(commits) == 3, ( f"Expected 3 commits after two rotations, got {len(commits)}." ) async def test_rotation_commit_message_identifies_handle( self, db_session: AsyncSession ) -> None: """Rotation commit message must contain the handle for audit readability.""" priv, pub = _keypair() identity_id = await _register(db_session, "carol2", priv, pub) new_priv, new_pub = _keypair() await _rotate(db_session, identity_id, new_priv, new_pub) commits = await _get_identity_commits(db_session, "carol2") rotation_commit = commits[1] assert "carol2" in rotation_commit.message, ( f"Rotation commit message {rotation_commit.message!r} must contain the handle 'carol2'." ) # ═══════════════════════════════════════════════════════════════════════════════ # 2. Rotation updates pubkey in the identity record # ═══════════════════════════════════════════════════════════════════════════════ class TestRotationUpdatesPubkey: async def test_rotation_updates_pubkey_in_identity_record( self, db_session: AsyncSession ) -> None: """The identity record after rotation must reflect the new public key.""" priv, pub = _keypair() identity_id = await _register(db_session, "dave2", priv, pub) new_priv, new_pub = _keypair() new_pubkey_b64 = encode_pubkey("ed25519", new_pub) await _rotate(db_session, identity_id, new_priv, new_pub) commits = await _get_identity_commits(db_session, "dave2") latest_record = await _read_identity_record_from_commit(db_session, commits[-1]) assert latest_record["pubkey"] == new_pubkey_b64, ( f"After rotation, identity record pubkey should be {new_pubkey_b64!r}, " f"got {latest_record['pubkey']!r}." ) async def test_initial_record_pubkey_unchanged_in_history( self, db_session: AsyncSession ) -> None: """The initial commit must still hold the original pubkey (immutable history).""" priv, pub = _keypair() original_pubkey_b64 = encode_pubkey("ed25519", pub) identity_id = await _register(db_session, "eve2", priv, pub) new_priv, new_pub = _keypair() await _rotate(db_session, identity_id, new_priv, new_pub) commits = await _get_identity_commits(db_session, "eve2") initial_record = await _read_identity_record_from_commit(db_session, commits[0]) assert initial_record["pubkey"] == original_pubkey_b64, ( "The initial commit must be immutable — original pubkey must still be present " f"in commit[0], got {initial_record['pubkey']!r}." ) async def test_rotation_preserves_handle_and_type( self, db_session: AsyncSession ) -> None: """Rotation must not clobber handle or type in the identity record.""" priv, pub = _keypair() identity_id = await _register(db_session, "frank2", priv, pub) new_priv, new_pub = _keypair() await _rotate(db_session, identity_id, new_priv, new_pub) commits = await _get_identity_commits(db_session, "frank2") latest_record = await _read_identity_record_from_commit(db_session, commits[-1]) assert latest_record["handle"] == "frank2" assert latest_record["type"] == "human" async def test_second_rotation_reflects_latest_pubkey( self, db_session: AsyncSession ) -> None: """After two rotations, HEAD must have the second rotation's pubkey.""" priv, pub = _keypair() identity_id = await _register(db_session, "grace2", priv, pub) new_priv1, new_pub1 = _keypair() await _rotate(db_session, identity_id, new_priv1, new_pub1) new_priv2, new_pub2 = _keypair() final_pubkey_b64 = encode_pubkey("ed25519", new_pub2) await _rotate(db_session, identity_id, new_priv2, new_pub2) commits = await _get_identity_commits(db_session, "grace2") latest_record = await _read_identity_record_from_commit(db_session, commits[-1]) assert latest_record["pubkey"] == final_pubkey_b64, ( f"After two rotations, identity record pubkey should be the second " f"rotation key {final_pubkey_b64!r}, got {latest_record['pubkey']!r}." )