"""Belt-and-suspenders tests for the identity plugin implementation (phases 1–6). Covers gaps not addressed by test_identity_repo_phase{1-6}.py: 1. _commit_key_rotation_to_identity_repo — direct unit/integration tests - no-op when identity_id is not in the DB - no-op when identity has no identity repo - correctly writes new pubkey to HEAD - preserves all other record fields after rotation 2. State integrity - pubkey in identity repo HEAD and registered MusehubAuthKey fingerprint are consistent with each other (sha256 of raw key bytes matches) - after key rotation the GET /api/identities/{handle} response is in sync with the auth key table 3. Concurrent stress - 50 concurrent read_object_bytes calls on a content-cached object all return the same bytes - mixed concurrent reads (cached + disk) all return correct bytes 4. Security edge cases - resolve_handle_to_fingerprint returns None when the identity record contains a malformed pubkey string (no "ed25519:" prefix) - resolve_handle_to_fingerprint returns None when the identity record JSON is corrupted - read_object_bytes returns None rather than raising when the blob store has no object (exception containment guarantee) """ from __future__ import annotations import asyncio import base64 import json import time import msgpack import pytest from datetime import datetime, timezone from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from muse.core.types import long_id, blob_id, encode_pubkey, public_key_fingerprint from musehub.core.genesis import ( compute_identity_id, compute_repo_id, compute_branch_id, ) from musehub.types.json_types import JSONObject # ── key material ────────────────────────────────────────────────────────────── _KEY_A_BYTES = b"\xaa" * 32 _KEY_A_FP = public_key_fingerprint(_KEY_A_BYTES) _KEY_A_B64 = encode_pubkey("ed25519", _KEY_A_BYTES) _KEY_B_BYTES = b"\xbb" * 32 _KEY_B_FP = public_key_fingerprint(_KEY_B_BYTES) _KEY_B_B64 = encode_pubkey("ed25519", _KEY_B_BYTES) _NOW = datetime.now(timezone.utc) _COUNTER: list[int] = [0] def _uid(tag: str = "") -> str: _COUNTER[0] += 1 return f"bns{tag}{_COUNTER[0]}" # ── DB row factories ────────────────────────────────────────────────────────── def _make_identity(handle: str, identity_id: str | None = None) -> None: from musehub.db.musehub_identity_models import MusehubIdentity return MusehubIdentity( identity_id=identity_id or compute_identity_id(handle.encode()), handle=handle, identity_type="human", agent_capabilities=[], pinned_repo_ids=[], is_verified=False, created_at=_NOW, updated_at=_NOW, ) def _make_auth_key(identity_id: str, fingerprint: str, pubkey_b64: str) -> None: from musehub.db.musehub_auth_models import MusehubAuthKey return MusehubAuthKey( key_id=fingerprint, identity_id=identity_id, algorithm="ed25519", public_key_b64=pubkey_b64, fingerprint=fingerprint, label="test key", created_at=_NOW, ) async def _seed_identity_repo( session: AsyncSession, handle: str, identity_id: str, pubkey_b64: str, extra_fields: JSONObject | None = None, ) -> str: """Create a minimal identity repo whose HEAD IdentityRecord has the given pubkey. Returns the repo_id. """ from musehub.db.musehub_repo_models import ( MusehubRepo, MusehubObject, MusehubObjectRef, MusehubSnapshot, MusehubSnapshotRef, MusehubCommit, MusehubCommitRef, MusehubBranch, ) repo_id = compute_repo_id(identity_id, "identity", "identity", _NOW.isoformat()) repo = MusehubRepo( repo_id=repo_id, name="identity", owner=handle, slug="identity", visibility="private", owner_user_id=identity_id, domain_id="identity", ) session.add(repo) record: JSONObject = { "handle": handle, "type": "human", "pubkey": pubkey_b64, "quorum": None, "registered_at": _NOW.isoformat(), "metadata": {}, **(extra_fields or {}), } content = json.dumps(record).encode() file_path = f"identities/{handle}.json" obj_id = blob_id(content) snap_id = blob_id(f"snap:{repo_id}:{handle}".encode()) cmt_id = blob_id(f"cmt:{repo_id}:{handle}".encode()) session.add(MusehubObject( object_id=obj_id, path=file_path, size_bytes=len(content), storage_uri=f"s3://muse-objects/objects/{obj_id}", content_cache=content, )) session.add(MusehubObjectRef(object_id=obj_id, repo_id=repo_id)) session.add(MusehubSnapshot( snapshot_id=snap_id, directories=[], manifest_blob=msgpack.packb({file_path: obj_id}, use_bin_type=True), entry_count=1, created_at=_NOW, )) session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=snap_id)) session.add(MusehubCommit( commit_id=cmt_id, branch="main", parent_ids=[], message=f"identity: register {handle}", author=identity_id, timestamp=_NOW, snapshot_id=snap_id, )) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=cmt_id)) session.add(MusehubBranch( branch_id=compute_branch_id(repo_id, "main"), repo_id=repo_id, name="main", head_commit_id=cmt_id, )) await session.flush() return repo_id async def _read_head_record(session: AsyncSession, handle: str) -> JSONObject | None: """Read and return the parsed IdentityRecord from the identity repo HEAD, or None.""" from musehub.db.musehub_repo_models import ( MusehubRepo, MusehubBranch, MusehubCommit, MusehubSnapshot, MusehubObject, ) from musehub.storage.backends import read_object_bytes repo_row = (await session.execute( select(MusehubRepo).where( MusehubRepo.owner == handle, MusehubRepo.slug == "identity", ) )).scalar_one_or_none() if repo_row is None: return None branch = (await session.execute( select(MusehubBranch).where( MusehubBranch.repo_id == repo_row.repo_id, MusehubBranch.name == "main", ) )).scalar_one_or_none() if branch is None or branch.head_commit_id is None: return None # Expire cached state so we see the latest write from commit_files_to_repo. await session.commit() await session.refresh(branch) commit = await session.get(MusehubCommit, branch.head_commit_id) if commit is None or commit.snapshot_id is None: return None snap = await session.get(MusehubSnapshot, commit.snapshot_id) if snap is None: return None manifest: dict[str, str] = msgpack.unpackb(snap.manifest_blob, raw=False) obj_id = manifest.get(f"identities/{handle}.json") if obj_id is None: return None obj = await session.get(MusehubObject, obj_id) if obj is None: return None raw = await read_object_bytes(obj) if raw is None: return None try: return json.loads(raw) except Exception: return None # ═══════════════════════════════════════════════════════════════════════════════ # 1. _commit_key_rotation_to_identity_repo — direct tests # ═══════════════════════════════════════════════════════════════════════════════ class TestCommitKeyRotationToIdentityRepo: """Direct tests for _commit_key_rotation_to_identity_repo. Phase 5 and Phase 6 tests only verify the effects transitively (via check_quorum or manually-constructed repo states). These tests call the function directly and verify its concrete behaviour. """ async def test_noop_when_identity_id_not_found( self, db_session: AsyncSession ) -> None: """No crash and no state change when the identity_id does not exist in the DB.""" from musehub.services.musehub_auth import _commit_key_rotation_to_identity_repo await _commit_key_rotation_to_identity_repo( db_session, identity_id=long_id("ff" * 32), # non-existent new_public_key_b64=_KEY_B_B64, ) # If we reach here without an exception the no-op guard works. async def test_noop_when_identity_has_no_identity_repo( self, db_session: AsyncSession ) -> None: """No crash and no state change when the identity exists but has no identity repo.""" from musehub.services.musehub_auth import _commit_key_rotation_to_identity_repo handle = _uid("noirepo") identity_id = compute_identity_id(handle.encode()) db_session.add(_make_identity(handle, identity_id)) await db_session.commit() await _commit_key_rotation_to_identity_repo( db_session, identity_id=identity_id, new_public_key_b64=_KEY_B_B64, ) # Still no identity repo and no exception. async def test_updates_pubkey_in_head_record( self, db_session: AsyncSession ) -> None: """After rotation the HEAD record carries the new pubkey.""" from musehub.services.musehub_auth import _commit_key_rotation_to_identity_repo handle = _uid("rot") identity_id = compute_identity_id(handle.encode()) db_session.add(_make_identity(handle, identity_id)) await db_session.flush() await _seed_identity_repo(db_session, handle, identity_id, _KEY_A_B64) await db_session.commit() await _commit_key_rotation_to_identity_repo( db_session, identity_id=identity_id, new_public_key_b64=_KEY_B_B64, ) record = await _read_head_record(db_session, handle) assert record is not None, "HEAD record must exist after rotation" assert record["pubkey"] == _KEY_B_B64, ( f"Expected new pubkey {_KEY_B_B64!r}, got {record.get('pubkey')!r}" ) async def test_preserves_other_fields_after_rotation( self, db_session: AsyncSession ) -> None: """Rotation rewrites only pubkey; handle, type, metadata, etc. are unchanged.""" from musehub.services.musehub_auth import _commit_key_rotation_to_identity_repo handle = _uid("preserve") identity_id = compute_identity_id(handle.encode()) db_session.add(_make_identity(handle, identity_id)) await db_session.flush() await _seed_identity_repo( db_session, handle, identity_id, _KEY_A_B64, extra_fields={"metadata": {"display_name": "Test User", "custom": "value"}}, ) await db_session.commit() await _commit_key_rotation_to_identity_repo( db_session, identity_id=identity_id, new_public_key_b64=_KEY_B_B64, ) record = await _read_head_record(db_session, handle) assert record is not None assert record["handle"] == handle assert record["type"] == "human" assert record["pubkey"] == _KEY_B_B64 # Metadata from original record must survive the rotation. assert record.get("metadata", {}).get("display_name") == "Test User" async def test_second_rotation_overwrites_first( self, db_session: AsyncSession ) -> None: """Two consecutive rotations produce a HEAD with the second pubkey.""" from musehub.services.musehub_auth import _commit_key_rotation_to_identity_repo handle = _uid("double") identity_id = compute_identity_id(handle.encode()) db_session.add(_make_identity(handle, identity_id)) await db_session.flush() await _seed_identity_repo(db_session, handle, identity_id, _KEY_A_B64) await db_session.commit() await _commit_key_rotation_to_identity_repo( db_session, identity_id=identity_id, new_public_key_b64=_KEY_B_B64, ) _KEY_C_BYTES = b"\xcc" * 32 _KEY_C_B64 = encode_pubkey("ed25519", _KEY_C_BYTES) await _commit_key_rotation_to_identity_repo( db_session, identity_id=identity_id, new_public_key_b64=_KEY_C_B64, ) record = await _read_head_record(db_session, handle) assert record is not None assert record["pubkey"] == _KEY_C_B64 # ═══════════════════════════════════════════════════════════════════════════════ # 2. State integrity # ═══════════════════════════════════════════════════════════════════════════════ class TestStateIntegrity: """Verify that identity repo HEAD and MusehubAuthKey fingerprint are consistent. The system has two sources of truth for a key: - MusehubAuthKey.fingerprint — sha256 of raw public key bytes - Identity repo HEAD record — "pubkey": "ed25519:" These must always agree. A mismatch would mean the quorum resolver and the auth verifier see different keys for the same identity. """ async def test_registered_key_fingerprint_matches_identity_repo_pubkey( self, db_session: AsyncSession ) -> None: """Key fingerprint in MusehubAuthKey == sha256(raw_bytes) from identity repo pubkey.""" from musehub.crypto.keys import key_fingerprint from muse.core.types import decode_pubkey handle = _uid("integ") identity_id = compute_identity_id(handle.encode()) db_session.add(_make_identity(handle, identity_id)) await db_session.flush() db_session.add(_make_auth_key(identity_id, _KEY_A_FP, _KEY_A_B64)) await _seed_identity_repo(db_session, handle, identity_id, _KEY_A_B64) await db_session.commit() record = await _read_head_record(db_session, handle) assert record is not None repo_pubkey = record["pubkey"] _, raw_bytes = decode_pubkey(repo_pubkey) repo_fp = key_fingerprint(raw_bytes) assert repo_fp == _KEY_A_FP, ( f"Fingerprint mismatch: auth table has {_KEY_A_FP!r}, " f"identity repo HEAD yields {repo_fp!r}" ) async def test_get_identity_pubkey_consistent_with_auth_key_fingerprint( self, client: AsyncClient, db_session: AsyncSession ) -> None: """GET /api/identities/{handle} pubkey decodes to the registered key fingerprint.""" from musehub.crypto.keys import key_fingerprint from muse.core.types import decode_pubkey handle = _uid("getinteg") identity_id = compute_identity_id(handle.encode()) db_session.add(_make_identity(handle, identity_id)) await db_session.flush() db_session.add(_make_auth_key(identity_id, _KEY_A_FP, _KEY_A_B64)) await _seed_identity_repo(db_session, handle, identity_id, _KEY_A_B64) await db_session.commit() r = await client.get(f"/api/identities/{handle}") assert r.status_code == 200, r.text pubkey_str = r.json()["pubkey"] assert pubkey_str is not None, "pubkey must be present when identity repo exists" _, raw_bytes = decode_pubkey(pubkey_str) computed_fp = key_fingerprint(raw_bytes) assert computed_fp == _KEY_A_FP, ( f"GET response pubkey yields fingerprint {computed_fp!r}, " f"expected {_KEY_A_FP!r} from the registered auth key" ) async def test_after_rotation_get_response_and_auth_key_agree( self, client: AsyncClient, db_session: AsyncSession ) -> None: """After rotation, GET response pubkey agrees with the new auth key fingerprint.""" from musehub.crypto.keys import key_fingerprint from muse.core.types import decode_pubkey from musehub.services.musehub_auth import _commit_key_rotation_to_identity_repo handle = _uid("rotinteg") identity_id = compute_identity_id(handle.encode()) db_session.add(_make_identity(handle, identity_id)) await db_session.flush() db_session.add(_make_auth_key(identity_id, _KEY_A_FP, _KEY_A_B64)) await _seed_identity_repo(db_session, handle, identity_id, _KEY_A_B64) await db_session.commit() # Simulate key rotation: add new key to auth table + update identity repo. db_session.add(_make_auth_key(identity_id, _KEY_B_FP, _KEY_B_B64)) await _commit_key_rotation_to_identity_repo( db_session, identity_id=identity_id, new_public_key_b64=_KEY_B_B64, ) # Commit so the HTTP client's fresh DB session sees the rotated state. await db_session.commit() r = await client.get(f"/api/identities/{handle}") assert r.status_code == 200, r.text pubkey_str = r.json()["pubkey"] assert pubkey_str is not None _, raw_bytes = decode_pubkey(pubkey_str) computed_fp = key_fingerprint(raw_bytes) assert computed_fp == _KEY_B_FP, ( f"After rotation, GET pubkey should yield fingerprint {_KEY_B_FP!r}, " f"got {computed_fp!r}" ) # ═══════════════════════════════════════════════════════════════════════════════ # 3. Concurrent stress — read_object_bytes # ═══════════════════════════════════════════════════════════════════════════════ class TestReadObjectBytesStress: """read_object_bytes must be safe under high concurrency with all storage paths.""" def _cached_obj(self, data: bytes) -> None: from types import SimpleNamespace return SimpleNamespace( object_id=long_id("aa" * 32), content_cache=data, storage_uri="", ) def _s3_obj(self, object_id: str) -> None: from types import SimpleNamespace return SimpleNamespace( object_id=object_id, content_cache=None, storage_uri=f"s3://muse-objects/{object_id}", ) async def test_50_concurrent_reads_on_cached_object(self) -> None: from musehub.storage.backends import read_object_bytes expected = b"shared cached data " * 100 obj = self._cached_obj(expected) results = await asyncio.gather(*[read_object_bytes(obj) for _ in range(50)]) assert all(r == expected for r in results), ( "At least one concurrent cached read returned incorrect bytes" ) async def test_50_concurrent_reads_on_s3_object(self) -> None: from musehub.storage.backends import read_object_bytes, get_backend data = b"shared s3 data " * 100 oid = blob_id(data) await get_backend().put(oid, data) obj = self._s3_obj(oid) results = await asyncio.gather(*[read_object_bytes(obj) for _ in range(50)]) assert all(r == data for r in results), ( "At least one concurrent s3 read returned incorrect bytes" ) async def test_mixed_concurrent_reads_cached_and_s3(self) -> None: """50 concurrent reads across cached and s3 objects all return correct bytes.""" from musehub.storage.backends import read_object_bytes, get_backend cached_data = b"cached payload" s3_data = b"s3 payload" oid = blob_id(s3_data) await get_backend().put(oid, s3_data) cached = self._cached_obj(cached_data) on_s3 = self._s3_obj(oid) objs = [cached if i % 2 == 0 else on_s3 for i in range(50)] results = await asyncio.gather(*[read_object_bytes(o) for o in objs]) for i, result in enumerate(results): expected = cached_data if i % 2 == 0 else s3_data assert result == expected, ( f"Read {i}: expected {expected!r}, got {result!r}" ) async def test_concurrent_reads_complete_under_budget(self) -> None: """100 concurrent in-memory reads must complete in under 0.5 s.""" from musehub.storage.backends import read_object_bytes obj = self._cached_obj(b"x" * 4096) start = time.perf_counter() await asyncio.gather(*[read_object_bytes(obj) for _ in range(100)]) elapsed = time.perf_counter() - start assert elapsed < 0.5, ( f"100 concurrent in-memory reads took {elapsed:.3f}s; " "expected < 0.5s" ) # ═══════════════════════════════════════════════════════════════════════════════ # 4. Security edge cases # ═══════════════════════════════════════════════════════════════════════════════ class TestSecurityEdgeCases: """Robustness and containment guarantees for identity service security paths.""" # ── read_object_bytes ────────────────────────────────────────────────────── async def test_read_object_bytes_returns_none_when_blob_store_missing(self) -> None: """read_object_bytes never raises — returns None when blob store has no object.""" from musehub.storage.backends import read_object_bytes from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, patch oid = long_id("cc" * 32) obj = SimpleNamespace( object_id=oid, content_cache=None, storage_uri=f"s3://muse-objects/objects/{oid}", ) mock_backend = MagicMock() mock_backend.get = AsyncMock(return_value=None) with patch("musehub.storage.backends.get_backend", return_value=mock_backend): result = await read_object_bytes(obj) assert result is None async def test_read_object_bytes_returns_none_on_empty_paths(self) -> None: """read_object_bytes returns None when no cache and no URI are set.""" from musehub.storage.backends import read_object_bytes from types import SimpleNamespace obj = SimpleNamespace( object_id=long_id("dd" * 32), content_cache=None, storage_uri="", ) assert await read_object_bytes(obj) is None # ── resolve_handle_to_fingerprint ───────────────────────────────────────── async def test_resolve_handle_malformed_pubkey_returns_none( self, db_session: AsyncSession ) -> None: """A pubkey string without the 'ed25519:' prefix is rejected without raising.""" from musehub.services.musehub_governance import resolve_handle_to_fingerprint handle = _uid("malkey") identity_id = compute_identity_id(handle.encode()) db_session.add(_make_identity(handle, identity_id)) await db_session.flush() # Seed identity repo with a pubkey that has no canonical prefix. bad_pubkey = base64.urlsafe_b64encode(b"\xaa" * 32).rstrip(b"=").decode() await _seed_identity_repo(db_session, handle, identity_id, bad_pubkey) await db_session.commit() result = await resolve_handle_to_fingerprint(db_session, handle) assert result is None, ( f"Expected None for malformed pubkey, got {result!r}" ) async def test_resolve_handle_empty_pubkey_returns_none( self, db_session: AsyncSession ) -> None: """An empty pubkey string in the identity record must return None.""" from musehub.services.musehub_governance import resolve_handle_to_fingerprint handle = _uid("emptypk") identity_id = compute_identity_id(handle.encode()) db_session.add(_make_identity(handle, identity_id)) await db_session.flush() await _seed_identity_repo(db_session, handle, identity_id, "") await db_session.commit() result = await resolve_handle_to_fingerprint(db_session, handle) assert result is None async def test_resolve_handle_no_identity_repo_returns_none( self, db_session: AsyncSession ) -> None: """A handle that exists in the DB but has no identity repo returns None.""" from musehub.services.musehub_governance import resolve_handle_to_fingerprint handle = _uid("norepo") db_session.add(_make_identity(handle)) await db_session.commit() result = await resolve_handle_to_fingerprint(db_session, handle) assert result is None async def test_resolve_unknown_handle_returns_none( self, db_session: AsyncSession ) -> None: """A handle that does not exist in the DB at all returns None without raising.""" from musehub.services.musehub_governance import resolve_handle_to_fingerprint result = await resolve_handle_to_fingerprint(db_session, "nobody-bns-zzz") assert result is None # ── _read_identity_record_from_repo ─────────────────────────────────────── async def test_read_identity_record_corrupted_json_returns_none( self, db_session: AsyncSession ) -> None: """Corrupted (non-JSON) bytes in the identity object return None, not an exception.""" from musehub.db.musehub_repo_models import ( MusehubRepo, MusehubObject, MusehubObjectRef, MusehubSnapshot, MusehubSnapshotRef, MusehubCommit, MusehubCommitRef, MusehubBranch, ) from musehub.api.routes.api.identities import _read_identity_record_from_repo handle = _uid("corrupt") identity_id = compute_identity_id(handle.encode()) repo_id = compute_repo_id(identity_id, "identity", "identity", _NOW.isoformat()) bad_content = b"\xff\xfe not json at all" file_path = f"identities/{handle}.json" obj_id = blob_id(bad_content) snap_id = blob_id(f"snap:corrupt:{handle}".encode()) cmt_id = blob_id(f"cmt:corrupt:{handle}".encode()) db_session.add(MusehubRepo( repo_id=repo_id, name="identity", owner=handle, slug="identity", visibility="private", owner_user_id=identity_id, domain_id="identity", )) db_session.add(MusehubObject( object_id=obj_id, path=file_path, size_bytes=len(bad_content), storage_uri=f"s3://muse-objects/objects/{obj_id}", content_cache=bad_content, )) db_session.add(MusehubObjectRef(object_id=obj_id, repo_id=repo_id)) db_session.add(MusehubSnapshot( snapshot_id=snap_id, directories=[], manifest_blob=msgpack.packb({file_path: obj_id}, use_bin_type=True), entry_count=1, created_at=_NOW, )) db_session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=snap_id)) db_session.add(MusehubCommit( commit_id=cmt_id, branch="main", parent_ids=[], message="corrupt init", author=identity_id, timestamp=_NOW, snapshot_id=snap_id, )) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id=cmt_id)) db_session.add(MusehubBranch( branch_id=compute_branch_id(repo_id, "main"), repo_id=repo_id, name="main", head_commit_id=cmt_id, )) await db_session.commit() result = await _read_identity_record_from_repo(db_session, handle) assert result is None, ( "Expected None for corrupted identity record, got a parsed dict" )