"""Tests for agent identity provisioning. Covers: - AgentRegistrationRequest model validation - register_agent_identity service function (unit tests with mocked DB) - POST /api/identities/agent route (happy path + error cases) - verify_and_authenticate identity_type support - VerifyRequest identity_type field validation """ from __future__ import annotations import secrets from unittest.mock import AsyncMock, MagicMock, patch import pytest from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from httpx import AsyncClient from muse.core.types import encode_pubkey, long_id, public_key_fingerprint from musehub.core.genesis import compute_identity_id from musehub.types.json_types import StrDict from musehub.models.musehub_auth import ( AgentRegistrationRequest, AgentRegistrationResponse, VerifyRequest, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _generate_key_material() -> tuple[str, str]: """Generate a fresh Ed25519 keypair and return (public_key_b64, fingerprint).""" key = Ed25519PrivateKey.generate() pub_raw = key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) return encode_pubkey("ed25519", pub_raw), public_key_fingerprint(pub_raw) # --------------------------------------------------------------------------- # AgentRegistrationRequest validation # --------------------------------------------------------------------------- class TestAgentRegistrationRequestValidation: def _valid_payload(self) -> JSONObject: pub_b64, fp = _generate_key_material() return { "handle": "agentception-abc123", "public_key_b64": pub_b64, "fingerprint": fp, "algorithm": "ed25519", "agent_model": "claude-sonnet-4-6", "scope": ["push:agentception"], "label": "ephemeral/agentception-abc123", } def test_valid_request_parses(self) -> None: req = AgentRegistrationRequest(**self._valid_payload()) assert req.handle == "agentception-abc123" assert req.algorithm == "ed25519" assert req.agent_model == "claude-sonnet-4-6" assert req.scope == ["push:agentception"] def test_handle_normalised_to_lowercase(self) -> None: payload = self._valid_payload() payload["handle"] = "AgentCeption-ABC" req = AgentRegistrationRequest(**payload) assert req.handle == "agentception-abc" def test_invalid_handle_rejected(self) -> None: payload = self._valid_payload() payload["handle"] = "agent with spaces" with pytest.raises(Exception): # ValidationError AgentRegistrationRequest(**payload) def test_fingerprint_must_be_64_hex(self) -> None: payload = self._valid_payload() payload["fingerprint"] = "tooshort" with pytest.raises(Exception): AgentRegistrationRequest(**payload) def test_invalid_algorithm_rejected(self) -> None: payload = self._valid_payload() payload["algorithm"] = "rsa-2048" with pytest.raises(Exception): AgentRegistrationRequest(**payload) def test_scope_defaults_to_empty_list(self) -> None: payload = self._valid_payload() del payload["scope"] req = AgentRegistrationRequest(**payload) assert req.scope == [] def test_expires_at_defaults_to_none(self) -> None: req = AgentRegistrationRequest(**self._valid_payload()) assert req.expires_at is None def test_expires_at_accepts_iso8601(self) -> None: payload = self._valid_payload() payload["expires_at"] = "2026-04-06T14:00:00Z" req = AgentRegistrationRequest(**payload) assert req.expires_at == "2026-04-06T14:00:00Z" # --------------------------------------------------------------------------- # VerifyRequest identity_type field # --------------------------------------------------------------------------- class TestVerifyRequestIdentityType: def _base_payload(self) -> JSONObject: return { "challenge_token": "a" * 64, "public_key_b64": "AAEC", "signature_b64": "AAEC", } def test_default_identity_type_is_human(self) -> None: req = VerifyRequest(**self._base_payload()) assert req.identity_type == "human" def test_agent_identity_type_accepted(self) -> None: req = VerifyRequest(**self._base_payload(), identity_type="agent") assert req.identity_type == "agent" def test_invalid_identity_type_rejected(self) -> None: with pytest.raises(Exception): VerifyRequest(**self._base_payload(), identity_type="robot") # --------------------------------------------------------------------------- # register_agent_identity service — unit tests # --------------------------------------------------------------------------- class TestRegisterAgentIdentityService: """Unit tests using a mock DB session — no real DB required.""" @staticmethod def _make_created_at() -> None: from datetime import datetime, timezone t = MagicMock() t.isoformat.return_value = "2026-04-06T00:00:00+00:00" return t def _make_mock_session( self, *, key_row_exists: bool = False, identity_row_exists: bool = False ) -> AsyncMock: session = AsyncMock() # Simulated scalar_one_or_none return for SELECT MusehubAuthKey mock_scalar_result = MagicMock() if key_row_exists: mock_key = MagicMock() mock_key.key_id = secrets.token_hex(16) mock_key.identity_id = compute_identity_id(secrets.token_bytes(16)) mock_key.algorithm = "ed25519" mock_key.fingerprint = "a" * 64 mock_key.label = "existing" mock_key.created_at = self._make_created_at() mock_key.last_used_at = None mock_scalar_result.scalar_one_or_none.return_value = mock_key if identity_row_exists: mock_identity = MagicMock() mock_identity.identity_id = mock_key.identity_id mock_identity.handle = "agentception-abc" # second execute call returns identity mock_scalar_result2 = MagicMock() mock_scalar_result2.scalar_one_or_none.return_value = mock_identity session.execute.side_effect = [ mock_scalar_result, mock_scalar_result2, ] else: session.execute.return_value = mock_scalar_result else: mock_scalar_result.scalar_one_or_none.return_value = None session.execute.return_value = mock_scalar_result session.flush = AsyncMock() session.commit = AsyncMock() session.add = MagicMock() # refresh populates server_default columns on any ORM object passed to it async def _mock_refresh(obj: MagicMock) -> None: now = self._make_created_at() if not hasattr(obj, "created_at") or obj.created_at is None: obj.created_at = now if not hasattr(obj, "updated_at") or obj.updated_at is None: obj.updated_at = now if not hasattr(obj, "default_branch") or obj.default_branch is None: obj.default_branch = "main" if not hasattr(obj, "last_used_at"): obj.last_used_at = None session.refresh = _mock_refresh return session @pytest.mark.asyncio async def test_new_agent_registration_creates_identity_and_key(self) -> None: from musehub.services.musehub_auth import register_agent_identity pub_b64, fp = _generate_key_material() session = self._make_mock_session(key_row_exists=False) result = await register_agent_identity( session=session, handle="agentception-abc", public_key_b64=pub_b64, fingerprint=fp, algorithm="ed25519", spawned_by="gabriel", agent_model="claude-sonnet-4-6", scope=["push:agentception"], ) assert result.is_new_identity is True assert result.spawned_by == "gabriel" assert result.handle == "agentception-abc" # identity + key + identity repo + branch + commit + commit_ref (from _create_identity_repo) assert session.add.call_count == 6 # commit 1: identity + key; commit 2: after identity repo created assert session.commit.call_count == 2 @pytest.mark.asyncio async def test_fingerprint_mismatch_raises_auth_error(self) -> None: from musehub.services.musehub_auth import AuthError, register_agent_identity pub_b64, _ = _generate_key_material() wrong_fp = long_id("f" * 64) # doesn't match the key session = self._make_mock_session(key_row_exists=False) with pytest.raises(AuthError) as exc_info: await register_agent_identity( session=session, handle="agent-x", public_key_b64=pub_b64, fingerprint=wrong_fp, algorithm="ed25519", spawned_by="gabriel", ) assert exc_info.value.status_code == 422 assert "fingerprint" in exc_info.value.detail.lower() @pytest.mark.asyncio async def test_invalid_public_key_b64_raises_auth_error(self) -> None: from musehub.services.musehub_auth import AuthError, register_agent_identity session = self._make_mock_session(key_row_exists=False) with pytest.raises(AuthError) as exc_info: await register_agent_identity( session=session, handle="agent-x", public_key_b64="!!!not-base64!!!", fingerprint=long_id("a" * 64), algorithm="ed25519", spawned_by="gabriel", ) assert exc_info.value.status_code == 422 @pytest.mark.asyncio async def test_invalid_expires_at_raises_auth_error(self) -> None: from musehub.services.musehub_auth import AuthError, register_agent_identity pub_b64, fp = _generate_key_material() session = self._make_mock_session(key_row_exists=False) with pytest.raises(AuthError) as exc_info: await register_agent_identity( session=session, handle="agent-x", public_key_b64=pub_b64, fingerprint=fp, algorithm="ed25519", spawned_by="gabriel", expires_at="not-a-date", ) assert exc_info.value.status_code == 422 assert "expires_at" in exc_info.value.detail.lower() @pytest.mark.asyncio async def test_expires_at_none_is_accepted(self) -> None: from musehub.services.musehub_auth import register_agent_identity pub_b64, fp = _generate_key_material() session = self._make_mock_session(key_row_exists=False) result = await register_agent_identity( session=session, handle="agent-no-expiry", public_key_b64=pub_b64, fingerprint=fp, algorithm="ed25519", spawned_by="gabriel", expires_at=None, ) assert result.is_new_identity is True # --------------------------------------------------------------------------- # POST /api/identities/agent route — HTTP integration tests # --------------------------------------------------------------------------- class TestProvisionAgentRoute: """Integration tests using the full FastAPI test client.""" def _valid_payload(self) -> JSONObject: pub_b64, fp = _generate_key_material() return { "handle": f"agent-{secrets.token_hex(4)}", "public_key_b64": pub_b64, "fingerprint": fp, "algorithm": "ed25519", "agent_model": "claude-sonnet-4-6", "scope": ["push"], "label": "test-ephemeral", } @pytest.mark.asyncio async def test_provision_agent_happy_path( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: payload = self._valid_payload() resp = await client.post( "/api/identities/agent", json=payload, headers=auth_headers, ) assert resp.status_code in (200, 201) data = resp.json() assert data["handle"] == payload["handle"] assert data["spawned_by"] == "testuser" # from auth_headers fixture assert data["is_new_identity"] is True assert "key" in data assert data["key"]["algorithm"] == "ed25519" @pytest.mark.asyncio async def test_provision_agent_requires_auth( self, client: AsyncClient, ) -> None: payload = self._valid_payload() resp = await client.post( "/api/identities/agent", json=payload, # No auth headers ) assert resp.status_code == 401 @pytest.mark.asyncio async def test_provision_agent_duplicate_handle_409( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: payload = self._valid_payload() # First registration r1 = await client.post("/api/identities/agent", json=payload, headers=auth_headers) assert r1.status_code in (200, 201) # Same handle with a different key — should 409 _, fp2 = _generate_key_material() pub_b64_2, fp2 = _generate_key_material() payload2 = {**payload, "public_key_b64": pub_b64_2, "fingerprint": fp2} r2 = await client.post("/api/identities/agent", json=payload2, headers=auth_headers) assert r2.status_code == 409 @pytest.mark.asyncio async def test_provision_agent_idempotent_same_key( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Registering the same key twice is idempotent (returns 200 on re-register).""" payload = self._valid_payload() r1 = await client.post("/api/identities/agent", json=payload, headers=auth_headers) assert r1.status_code in (200, 201) r2 = await client.post("/api/identities/agent", json=payload, headers=auth_headers) assert r2.status_code == 200 data2 = r2.json() assert data2["is_new_identity"] is False @pytest.mark.asyncio async def test_provision_agent_invalid_handle_422( self, client: AsyncClient, auth_headers: StrDict, ) -> None: payload = self._valid_payload() payload["handle"] = "handle with spaces" resp = await client.post("/api/identities/agent", json=payload, headers=auth_headers) assert resp.status_code == 422 @pytest.mark.asyncio async def test_provision_agent_spawned_by_matches_operator( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: payload = self._valid_payload() resp = await client.post("/api/identities/agent", json=payload, headers=auth_headers) assert resp.status_code in (200, 201) data = resp.json() # _TEST_HANDLE from conftest assert data["spawned_by"] == "testuser" @pytest.mark.asyncio async def test_provision_agent_scope_stored( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: payload = self._valid_payload() payload["scope"] = ["push:agentception", "pull:agentception"] resp = await client.post("/api/identities/agent", json=payload, headers=auth_headers) assert resp.status_code in (200, 201) # --------------------------------------------------------------------------- # verify_and_authenticate identity_type — unit test # --------------------------------------------------------------------------- class TestVerifyAuthenticateIdentityType: """Unit test the identity_type param is propagated to the DB row.""" @pytest.mark.asyncio async def test_identity_type_human_is_default(self) -> None: """When identity_type is omitted it defaults to 'human'.""" req = VerifyRequest( challenge_token="a" * 64, public_key_b64="AAEC", signature_b64="AAEC", ) assert req.identity_type == "human" @pytest.mark.asyncio async def test_identity_type_agent_passes_through(self) -> None: req = VerifyRequest( challenge_token="a" * 64, public_key_b64="AAEC", signature_b64="AAEC", identity_type="agent", ) assert req.identity_type == "agent"