"""Phase 5 — Quorum resolution via identity handles. TDD regression suite: every test starts RED and turns GREEN as the feature is implemented. Their permanent role is to prevent regressions. What this phase covers: - check_quorum resolves handle-based members via identity repo HEAD pubkey - Backward compat: sha256:... entries in members still match fingerprints directly - Key rotation propagates: after rotation the handle still resolves to new key - Handle with no identity repo does not count toward quorum - Handle whose identity repo has a different pubkey than the reviewer's key does not match """ from __future__ import annotations import base64 import json import msgpack import pytest from datetime import datetime, timezone from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import blob_id, encode_pubkey, public_key_fingerprint from musehub.core.genesis import ( compute_identity_id, compute_proposal_id, compute_repo_id, compute_review_id, compute_branch_id, ) # ── fixed fake key material ──────────────────────────────────────────────────── # 32 zero bytes = a valid-length Ed25519 key (fake, not cryptographically useful) _KEY_A_BYTES = b"\xaa" * 32 _KEY_A_FP = public_key_fingerprint(_KEY_A_BYTES) _KEY_A_B64 = encode_pubkey("ed25519", _KEY_A_BYTES) _KEY_B_BYTES = b"\xbb" * 32 _KEY_B_FP = public_key_fingerprint(_KEY_B_BYTES) _KEY_B_B64 = encode_pubkey("ed25519", _KEY_B_BYTES) _KEY_C_BYTES = b"\xcc" * 32 _KEY_C_FP = public_key_fingerprint(_KEY_C_BYTES) _KEY_C_B64 = encode_pubkey("ed25519", _KEY_C_BYTES) _NOW = datetime.now(timezone.utc) _COUNTER: list[int] = [0] # ── helpers ─────────────────────────────────────────────────────────────────── def _uid(tag: str) -> str: """Return a unique handle/slug for this test run.""" _COUNTER[0] += 1 return f"p5{tag}{_COUNTER[0]}" def _make_identity(handle: str) -> None: from musehub.db.musehub_identity_models import MusehubIdentity return MusehubIdentity( identity_id=compute_identity_id(handle.encode()), handle=handle, identity_type="human", agent_capabilities=[], pinned_repo_ids=[], is_verified=False, created_at=_NOW, updated_at=_NOW, ) def _make_auth_key(identity_id: str, fingerprint: str, pubkey_b64: str) -> None: from musehub.db.musehub_auth_models import MusehubAuthKey return MusehubAuthKey( key_id=fingerprint, identity_id=identity_id, algorithm="ed25519", public_key_b64=pubkey_b64, fingerprint=fingerprint, label="test key", created_at=_NOW, ) def _make_repo(owner: str, slug: str, identity_id: str) -> None: from musehub.db.musehub_repo_models import MusehubRepo return MusehubRepo( repo_id=compute_repo_id(identity_id, slug, "muse/generic", _NOW.isoformat()), name=slug, owner=owner, slug=slug, visibility="public", owner_user_id=identity_id, ) def _make_proposal(repo_id: str, identity_id: str) -> None: from musehub.db.musehub_social_models import MusehubProposal _COUNTER[0] += 1 proposal_id = compute_proposal_id( repo_id, identity_id, "feat/x", "main", _NOW.isoformat() ) return MusehubProposal( proposal_id=proposal_id, repo_id=repo_id, proposal_number=_COUNTER[0], title="Test proposal", body="", from_branch="feat/x", to_branch="main", state="open", author="p5user", created_at=_NOW, updated_at=_NOW, ), proposal_id def _make_review(proposal_id: str, reviewer: str, state: str = "approved") -> None: from musehub.db.musehub_social_models import MusehubProposalReview review_id = compute_review_id( proposal_id, compute_identity_id(reviewer.encode()), _NOW.isoformat() ) return MusehubProposalReview( review_id=review_id, proposal_id=proposal_id, reviewer_username=reviewer, state=state, submitted_at=_NOW, created_at=_NOW, ) async def _create_identity_repo_with_pubkey( session: AsyncSession, handle: str, identity_id: str, pubkey_b64: str, ) -> None: """Persist a minimal identity repo whose HEAD IdentityRecord has pubkey_b64.""" from musehub.db.musehub_repo_models import ( MusehubRepo, MusehubObject, MusehubObjectRef, MusehubSnapshot, MusehubSnapshotRef, MusehubCommit, MusehubCommitRef, MusehubBranch, ) repo_id = compute_repo_id(identity_id, "identity", "identity", _NOW.isoformat()) repo = MusehubRepo( repo_id=repo_id, name="identity", owner=handle, slug="identity", visibility="private", owner_user_id=identity_id, domain_id="identity", ) session.add(repo) record = { "handle": handle, "type": "human", "pubkey": pubkey_b64, "quorum": None, "registered_at": _NOW.isoformat(), "metadata": {}, } content = json.dumps(record).encode() file_path = f"identities/{handle}.json" obj_id = blob_id(content) snap_id = blob_id(f"snap:{repo_id}:{handle}".encode()) commit_id = blob_id(f"commit:{repo_id}:{handle}".encode()) obj = MusehubObject( object_id=obj_id, path=file_path, size_bytes=len(content), storage_uri=f"s3://muse-objects/objects/{obj_id}", content_cache=content, ) session.add(obj) session.add(MusehubObjectRef(object_id=obj_id, repo_id=repo_id)) manifest = {file_path: obj_id} snap = MusehubSnapshot( snapshot_id=snap_id, directories=[], manifest_blob=msgpack.packb(manifest, use_bin_type=True), entry_count=1, created_at=_NOW, ) session.add(snap) session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=snap_id)) commit = MusehubCommit( commit_id=commit_id, branch="main", parent_ids=[], message=f"identity: register {handle}", author=identity_id, timestamp=_NOW, snapshot_id=snap_id, ) session.add(commit) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id)) branch = MusehubBranch( branch_id=compute_branch_id(repo_id, "main"), repo_id=repo_id, name="main", head_commit_id=commit_id, ) session.add(branch) await session.flush() # ═══════════════════════════════════════════════════════════════════════════════ # 1. Handle-based member resolves to fingerprint via identity repo HEAD # ═══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_handle_member_resolves_to_fingerprint(db_session: AsyncSession) -> None: """resolve_handle_to_fingerprint reads identity repo HEAD and returns FP.""" from musehub.services.musehub_governance import resolve_handle_to_fingerprint handle = _uid("alice") identity_id = compute_identity_id(handle.encode()) identity = _make_identity(handle) db_session.add(identity) await db_session.flush() await _create_identity_repo_with_pubkey(db_session, handle, identity_id, _KEY_A_B64) await db_session.commit() fp = await resolve_handle_to_fingerprint(db_session, handle) assert fp == _KEY_A_FP, f"Expected {_KEY_A_FP!r}, got {fp!r}" @pytest.mark.asyncio async def test_handle_with_no_identity_repo_returns_none(db_session: AsyncSession) -> None: """resolve_handle_to_fingerprint returns None when no identity repo exists.""" from musehub.services.musehub_governance import resolve_handle_to_fingerprint handle = _uid("ghost") identity = _make_identity(handle) db_session.add(identity) await db_session.commit() fp = await resolve_handle_to_fingerprint(db_session, handle) assert fp is None # ═══════════════════════════════════════════════════════════════════════════════ # 2. check_quorum — handle-based members # ═══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_check_quorum_handle_member_counts_when_key_matches( db_session: AsyncSession, ) -> None: """check_quorum counts a handle member whose identity repo pubkey matches reviewer's key.""" from musehub.services.musehub_governance import check_quorum handle = _uid("bob") identity_id = compute_identity_id(handle.encode()) identity = _make_identity(handle) db_session.add(identity) await db_session.flush() # Identity repo: handle → KEY_A await _create_identity_repo_with_pubkey(db_session, handle, identity_id, _KEY_A_B64) # Auth key: reviewer registered with KEY_A db_session.add(_make_auth_key(identity_id, _KEY_A_FP, _KEY_A_B64)) # Governance repo + proposal repo = _make_repo(handle, "proj", identity_id) db_session.add(repo) proposal, proposal_id = _make_proposal(repo.repo_id, identity_id) db_session.add(proposal) db_session.add(_make_review(proposal_id, handle)) await db_session.commit() governance = { "schema": 1, "quorum": {"threshold": 1, "members": [handle]}, } met, found, threshold = await check_quorum( db_session, repo.repo_id, proposal_id, governance ) assert met, f"Expected quorum met but found={found} threshold={threshold}" assert found == 1 @pytest.mark.asyncio async def test_check_quorum_handle_member_not_counted_when_key_differs( db_session: AsyncSession, ) -> None: """check_quorum does not count a handle member when reviewer has a different key.""" from musehub.services.musehub_governance import check_quorum handle = _uid("carol") identity_id = compute_identity_id(handle.encode()) identity = _make_identity(handle) db_session.add(identity) await db_session.flush() # Identity repo: handle → KEY_A await _create_identity_repo_with_pubkey(db_session, handle, identity_id, _KEY_A_B64) # But reviewer's registered key is KEY_B (different) db_session.add(_make_auth_key(identity_id, _KEY_B_FP, _KEY_B_B64)) repo = _make_repo(handle, "proj2", identity_id) db_session.add(repo) proposal, proposal_id = _make_proposal(repo.repo_id, identity_id) db_session.add(proposal) db_session.add(_make_review(proposal_id, handle)) await db_session.commit() governance = { "schema": 1, "quorum": {"threshold": 1, "members": [handle]}, } met, found, threshold = await check_quorum( db_session, repo.repo_id, proposal_id, governance ) assert not met, f"Expected quorum NOT met but found={found}" assert found == 0 @pytest.mark.asyncio async def test_check_quorum_handle_with_no_identity_repo_not_counted( db_session: AsyncSession, ) -> None: """check_quorum skips handle members who have no identity repo.""" from musehub.services.musehub_governance import check_quorum handle = _uid("dave") identity_id = compute_identity_id(handle.encode()) identity = _make_identity(handle) db_session.add(identity) await db_session.flush() # No identity repo created — resolver returns None db_session.add(_make_auth_key(identity_id, _KEY_A_FP, _KEY_A_B64)) repo = _make_repo(handle, "proj3", identity_id) db_session.add(repo) proposal, proposal_id = _make_proposal(repo.repo_id, identity_id) db_session.add(proposal) db_session.add(_make_review(proposal_id, handle)) await db_session.commit() governance = { "schema": 1, "quorum": {"threshold": 1, "members": [handle]}, } met, found, _ = await check_quorum( db_session, repo.repo_id, proposal_id, governance ) assert not met assert found == 0 # ═══════════════════════════════════════════════════════════════════════════════ # 3. Backward compatibility — sha256: entries still match fingerprints directly # ═══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_check_quorum_fp_member_still_works(db_session: AsyncSession) -> None: """sha256:... entries in members continue to match reviewer fingerprints directly.""" from musehub.services.musehub_governance import check_quorum handle = _uid("eve") identity_id = compute_identity_id(handle.encode()) identity = _make_identity(handle) db_session.add(identity) await db_session.flush() db_session.add(_make_auth_key(identity_id, _KEY_A_FP, _KEY_A_B64)) repo = _make_repo(handle, "proj4", identity_id) db_session.add(repo) proposal, proposal_id = _make_proposal(repo.repo_id, identity_id) db_session.add(proposal) db_session.add(_make_review(proposal_id, handle)) await db_session.commit() # Old format: fingerprint directly in members governance = { "schema": 1, "quorum": {"threshold": 1, "members": [_KEY_A_FP]}, } met, found, _ = await check_quorum( db_session, repo.repo_id, proposal_id, governance ) assert met assert found == 1 # ═══════════════════════════════════════════════════════════════════════════════ # 4. Key rotation propagates automatically via identity repo # ═══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_key_rotation_propagates_via_identity_repo( db_session: AsyncSession, ) -> None: """After key rotation, handle-based member still satisfies quorum with the new key.""" from musehub.services.musehub_governance import check_quorum handle = _uid("frank") identity_id = compute_identity_id(handle.encode()) identity = _make_identity(handle) db_session.add(identity) await db_session.flush() # Identity repo HEAD shows KEY_B (rotated) await _create_identity_repo_with_pubkey(db_session, handle, identity_id, _KEY_B_B64) # Reviewer's current registered key is KEY_B db_session.add(_make_auth_key(identity_id, _KEY_B_FP, _KEY_B_B64)) repo = _make_repo(handle, "proj5", identity_id) db_session.add(repo) proposal, proposal_id = _make_proposal(repo.repo_id, identity_id) db_session.add(proposal) db_session.add(_make_review(proposal_id, handle)) await db_session.commit() governance = { "schema": 1, "quorum": {"threshold": 1, "members": [handle]}, } met, found, _ = await check_quorum( db_session, repo.repo_id, proposal_id, governance ) assert met, f"Quorum should be met after key rotation, found={found}" # ═══════════════════════════════════════════════════════════════════════════════ # 5. Mixed handle + fingerprint member list # ═══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_check_quorum_mixed_members_both_resolve( db_session: AsyncSession, ) -> None: """Mixed member list (one handle, one fingerprint) counts both correctly.""" from musehub.services.musehub_governance import check_quorum handle_g = _uid("grace") handle_h = _uid("hank") id_g = compute_identity_id(handle_g.encode()) id_h = compute_identity_id(handle_h.encode()) for handle, iid in ((handle_g, id_g), (handle_h, id_h)): db_session.add(_make_identity(handle)) await db_session.flush() # grace → identity repo with KEY_A await _create_identity_repo_with_pubkey(db_session, handle_g, id_g, _KEY_A_B64) db_session.add(_make_auth_key(id_g, _KEY_A_FP, _KEY_A_B64)) # hank → fingerprint directly (old-style), registered with KEY_C db_session.add(_make_auth_key(id_h, _KEY_C_FP, _KEY_C_B64)) # Use grace's repo for the proposal repo = _make_repo(handle_g, "proj6", id_g) db_session.add(repo) proposal, proposal_id = _make_proposal(repo.repo_id, id_g) db_session.add(proposal) # Both approve db_session.add(_make_review(proposal_id, handle_g)) db_session.add(_make_review(proposal_id, handle_h)) await db_session.commit() governance = { "schema": 1, "quorum": { "threshold": 2, "members": [handle_g, _KEY_C_FP], # grace by handle, hank by fingerprint }, } met, found, threshold = await check_quorum( db_session, repo.repo_id, proposal_id, governance ) assert met, f"Expected quorum met, found={found} threshold={threshold}" assert found == 2