gabriel / musehub public

test_agent_registration.py file-level

at sha256:3 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
1 """Tests for agent identity provisioning.
2
3 Covers:
4 - AgentRegistrationRequest model validation
5 - register_agent_identity service function (unit tests with mocked DB)
6 - POST /api/identities/agent route (happy path + error cases)
7 - verify_and_authenticate identity_type support
8 - VerifyRequest identity_type field validation
9 """
10 from __future__ import annotations
11
12 import secrets
13 from unittest.mock import AsyncMock, MagicMock, patch
14
15 import pytest
16 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
17 from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
18 from httpx import AsyncClient
19 from muse.core.types import encode_pubkey, long_id, public_key_fingerprint
20 from musehub.core.genesis import compute_identity_id
21
22 from musehub.types.json_types import StrDict
23 from musehub.models.musehub_auth import (
24 AgentRegistrationRequest,
25 AgentRegistrationResponse,
26 VerifyRequest,
27 )
28
29
30 # ---------------------------------------------------------------------------
31 # Helpers
32 # ---------------------------------------------------------------------------
33
34
def _generate_key_material() -> tuple[str, str]:
    """Create a throwaway Ed25519 keypair for use in a test.

    Returns:
        A ``(public_key_b64, fingerprint)`` pair: the results of
        ``encode_pubkey("ed25519", raw)`` and ``public_key_fingerprint(raw)``
        for the raw public-key bytes of a newly generated private key.
    """
    private_key = Ed25519PrivateKey.generate()
    raw_public = private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
    encoded_public = encode_pubkey("ed25519", raw_public)
    fingerprint = public_key_fingerprint(raw_public)
    return encoded_public, fingerprint
40
41
42 # ---------------------------------------------------------------------------
43 # AgentRegistrationRequest validation
44 # ---------------------------------------------------------------------------
45
46
class TestAgentRegistrationRequestValidation:
    """Field-level validation checks for ``AgentRegistrationRequest``."""

    def _valid_payload(self) -> JSONObject:
        """Return a complete registration payload built around a fresh keypair."""
        public_key, fingerprint = _generate_key_material()
        return {
            "handle": "agentception-abc123",
            "public_key_b64": public_key,
            "fingerprint": fingerprint,
            "algorithm": "ed25519",
            "agent_model": "claude-sonnet-4-6",
            "scope": ["push:agentception"],
            "label": "ephemeral/agentception-abc123",
        }

    def test_valid_request_parses(self) -> None:
        """A fully populated payload is accepted and its fields round-trip."""
        request = AgentRegistrationRequest(**self._valid_payload())
        assert request.handle == "agentception-abc123"
        assert request.algorithm == "ed25519"
        assert request.agent_model == "claude-sonnet-4-6"
        assert request.scope == ["push:agentception"]

    def test_handle_normalised_to_lowercase(self) -> None:
        """A mixed-case handle comes back lower-cased on the parsed model."""
        fields = {**self._valid_payload(), "handle": "AgentCeption-ABC"}
        request = AgentRegistrationRequest(**fields)
        assert request.handle == "agentception-abc"

    def test_invalid_handle_rejected(self) -> None:
        """A handle containing spaces makes construction raise."""
        fields = {**self._valid_payload(), "handle": "agent with spaces"}
        with pytest.raises(Exception):  # ValidationError
            AgentRegistrationRequest(**fields)

    def test_fingerprint_must_be_64_hex(self) -> None:
        """A too-short fingerprint makes construction raise."""
        fields = {**self._valid_payload(), "fingerprint": "tooshort"}
        with pytest.raises(Exception):
            AgentRegistrationRequest(**fields)

    def test_invalid_algorithm_rejected(self) -> None:
        """The algorithm value "rsa-2048" makes construction raise."""
        fields = {**self._valid_payload(), "algorithm": "rsa-2048"}
        with pytest.raises(Exception):
            AgentRegistrationRequest(**fields)

    def test_scope_defaults_to_empty_list(self) -> None:
        """Leaving ``scope`` out of the payload yields an empty list."""
        fields = {
            name: value
            for name, value in self._valid_payload().items()
            if name != "scope"
        }
        request = AgentRegistrationRequest(**fields)
        assert request.scope == []

    def test_expires_at_defaults_to_none(self) -> None:
        """``expires_at`` is ``None`` when the payload does not supply it."""
        request = AgentRegistrationRequest(**self._valid_payload())
        assert request.expires_at is None

    def test_expires_at_accepts_iso8601(self) -> None:
        """An ISO-8601 string supplied for ``expires_at`` is kept verbatim."""
        iso_timestamp = "2026-04-06T14:00:00Z"
        fields = {**self._valid_payload(), "expires_at": iso_timestamp}
        request = AgentRegistrationRequest(**fields)
        assert request.expires_at == "2026-04-06T14:00:00Z"
106
107
108 # ---------------------------------------------------------------------------
109 # VerifyRequest identity_type field
110 # ---------------------------------------------------------------------------
111
112
class TestVerifyRequestIdentityType:
    """Checks on the ``identity_type`` field of ``VerifyRequest``."""

    def _base_payload(self) -> JSONObject:
        """Return the ``VerifyRequest`` fields used by every test, without ``identity_type``."""
        placeholder_b64 = "AAEC"
        return {
            "challenge_token": "a" * 64,
            "public_key_b64": placeholder_b64,
            "signature_b64": placeholder_b64,
        }

    def test_default_identity_type_is_human(self) -> None:
        """Omitting ``identity_type`` produces the value "human"."""
        request = VerifyRequest(**self._base_payload())
        assert request.identity_type == "human"

    def test_agent_identity_type_accepted(self) -> None:
        """The value "agent" is accepted and preserved."""
        fields = {**self._base_payload(), "identity_type": "agent"}
        request = VerifyRequest(**fields)
        assert request.identity_type == "agent"

    def test_invalid_identity_type_rejected(self) -> None:
        """The value "robot" makes construction raise."""
        fields = {**self._base_payload(), "identity_type": "robot"}
        with pytest.raises(Exception):
            VerifyRequest(**fields)
132
133
134 # ---------------------------------------------------------------------------
135 # register_agent_identity service — unit tests
136 # ---------------------------------------------------------------------------
137
138
class TestRegisterAgentIdentityService:
    """Unit tests using a mock DB session — no real DB required."""

    @staticmethod
    def _make_created_at() -> MagicMock:
        """Return a stand-in timestamp whose ``isoformat()`` yields a fixed string."""
        timestamp = MagicMock()
        timestamp.isoformat.return_value = "2026-04-06T00:00:00+00:00"
        return timestamp

    def _make_mock_session(
        self, *, key_row_exists: bool = False, identity_row_exists: bool = False
    ) -> AsyncMock:
        """Build an ``AsyncMock`` session with canned ``execute`` results.

        Args:
            key_row_exists: When true, the first ``execute`` result's
                ``scalar_one_or_none()`` returns a mock key row; otherwise
                it returns ``None``.
            identity_row_exists: Only consulted when ``key_row_exists`` is
                true. When set, a second ``execute`` result is queued whose
                ``scalar_one_or_none()`` returns a mock identity row.

        Returns:
            The configured session mock. ``flush`` and ``commit`` are
            ``AsyncMock``s, ``add`` is a ``MagicMock``, and ``refresh`` is a
            coroutine that fills in missing timestamp/default attributes.
        """
        session = AsyncMock()

        # Simulated scalar_one_or_none return for SELECT MusehubAuthKey
        mock_scalar_result = MagicMock()
        if key_row_exists:
            mock_key = MagicMock()
            mock_key.key_id = secrets.token_hex(16)
            mock_key.identity_id = compute_identity_id(secrets.token_bytes(16))
            mock_key.algorithm = "ed25519"
            mock_key.fingerprint = "a" * 64
            mock_key.label = "existing"
            mock_key.created_at = self._make_created_at()
            mock_key.last_used_at = None
            mock_scalar_result.scalar_one_or_none.return_value = mock_key

            if identity_row_exists:
                mock_identity = MagicMock()
                mock_identity.identity_id = mock_key.identity_id
                mock_identity.handle = "agentception-abc"
                # second execute call returns identity
                mock_scalar_result2 = MagicMock()
                mock_scalar_result2.scalar_one_or_none.return_value = mock_identity
                session.execute.side_effect = [
                    mock_scalar_result,
                    mock_scalar_result2,
                ]
            else:
                session.execute.return_value = mock_scalar_result
        else:
            mock_scalar_result.scalar_one_or_none.return_value = None
            session.execute.return_value = mock_scalar_result

        session.flush = AsyncMock()
        session.commit = AsyncMock()
        session.add = MagicMock()

        # refresh populates server_default columns on any ORM object passed to it
        async def _mock_refresh(obj: MagicMock) -> None:
            now = self._make_created_at()
            if not hasattr(obj, "created_at") or obj.created_at is None:
                obj.created_at = now
            if not hasattr(obj, "updated_at") or obj.updated_at is None:
                obj.updated_at = now
            if not hasattr(obj, "default_branch") or obj.default_branch is None:
                obj.default_branch = "main"
            if not hasattr(obj, "last_used_at"):
                obj.last_used_at = None

        session.refresh = _mock_refresh

        return session

    @pytest.mark.asyncio
    async def test_new_agent_registration_creates_identity_and_key(self) -> None:
        """Registering with no existing key row reports a new identity."""
        from musehub.services.musehub_auth import register_agent_identity

        pub_b64, fp = _generate_key_material()
        session = self._make_mock_session(key_row_exists=False)

        result = await register_agent_identity(
            session=session,
            handle="agentception-abc",
            public_key_b64=pub_b64,
            fingerprint=fp,
            algorithm="ed25519",
            spawned_by="gabriel",
            agent_model="claude-sonnet-4-6",
            scope=["push:agentception"],
        )

        assert result.is_new_identity is True
        assert result.spawned_by == "gabriel"
        assert result.handle == "agentception-abc"
        # identity + key + identity repo + branch + commit + commit_ref (from _create_identity_repo)
        assert session.add.call_count == 6
        # commit 1: identity + key; commit 2: after identity repo created
        assert session.commit.call_count == 2

    @pytest.mark.asyncio
    async def test_fingerprint_mismatch_raises_auth_error(self) -> None:
        """A fingerprint that does not belong to the key yields a 422 ``AuthError``."""
        from musehub.services.musehub_auth import AuthError, register_agent_identity

        pub_b64, _ = _generate_key_material()
        wrong_fp = long_id("f" * 64)  # doesn't match the key

        session = self._make_mock_session(key_row_exists=False)

        with pytest.raises(AuthError) as exc_info:
            await register_agent_identity(
                session=session,
                handle="agent-x",
                public_key_b64=pub_b64,
                fingerprint=wrong_fp,
                algorithm="ed25519",
                spawned_by="gabriel",
            )

        assert exc_info.value.status_code == 422
        assert "fingerprint" in exc_info.value.detail.lower()

    @pytest.mark.asyncio
    async def test_invalid_public_key_b64_raises_auth_error(self) -> None:
        """A public key string that is not valid base64 yields a 422 ``AuthError``."""
        from musehub.services.musehub_auth import AuthError, register_agent_identity

        session = self._make_mock_session(key_row_exists=False)

        with pytest.raises(AuthError) as exc_info:
            await register_agent_identity(
                session=session,
                handle="agent-x",
                public_key_b64="!!!not-base64!!!",
                fingerprint=long_id("a" * 64),
                algorithm="ed25519",
                spawned_by="gabriel",
            )

        assert exc_info.value.status_code == 422

    @pytest.mark.asyncio
    async def test_invalid_expires_at_raises_auth_error(self) -> None:
        """An unparseable ``expires_at`` yields a 422 ``AuthError`` naming the field."""
        from musehub.services.musehub_auth import AuthError, register_agent_identity

        pub_b64, fp = _generate_key_material()
        session = self._make_mock_session(key_row_exists=False)

        with pytest.raises(AuthError) as exc_info:
            await register_agent_identity(
                session=session,
                handle="agent-x",
                public_key_b64=pub_b64,
                fingerprint=fp,
                algorithm="ed25519",
                spawned_by="gabriel",
                expires_at="not-a-date",
            )

        assert exc_info.value.status_code == 422
        assert "expires_at" in exc_info.value.detail.lower()

    @pytest.mark.asyncio
    async def test_expires_at_none_is_accepted(self) -> None:
        """Passing ``expires_at=None`` explicitly still registers a new identity."""
        from musehub.services.musehub_auth import register_agent_identity

        pub_b64, fp = _generate_key_material()
        session = self._make_mock_session(key_row_exists=False)

        result = await register_agent_identity(
            session=session,
            handle="agent-no-expiry",
            public_key_b64=pub_b64,
            fingerprint=fp,
            algorithm="ed25519",
            spawned_by="gabriel",
            expires_at=None,
        )
        assert result.is_new_identity is True
308
309
310 # ---------------------------------------------------------------------------
311 # POST /api/identities/agent route — HTTP integration tests
312 # ---------------------------------------------------------------------------
313
314
class TestProvisionAgentRoute:
    """Integration tests using the full FastAPI test client."""

    def _valid_payload(self) -> JSONObject:
        """Return a registration payload with a random handle and a fresh keypair."""
        pub_b64, fp = _generate_key_material()
        return {
            "handle": f"agent-{secrets.token_hex(4)}",
            "public_key_b64": pub_b64,
            "fingerprint": fp,
            "algorithm": "ed25519",
            "agent_model": "claude-sonnet-4-6",
            "scope": ["push"],
            "label": "test-ephemeral",
        }

    @pytest.mark.asyncio
    async def test_provision_agent_happy_path(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """An authenticated POST succeeds and echoes handle, operator and key info."""
        payload = self._valid_payload()
        resp = await client.post(
            "/api/identities/agent",
            json=payload,
            headers=auth_headers,
        )
        assert resp.status_code in (200, 201)
        data = resp.json()
        assert data["handle"] == payload["handle"]
        assert data["spawned_by"] == "testuser"  # from auth_headers fixture
        assert data["is_new_identity"] is True
        assert "key" in data
        assert data["key"]["algorithm"] == "ed25519"

    @pytest.mark.asyncio
    async def test_provision_agent_requires_auth(
        self,
        client: AsyncClient,
    ) -> None:
        """A POST without auth headers is answered with 401."""
        payload = self._valid_payload()
        resp = await client.post(
            "/api/identities/agent",
            json=payload,
            # No auth headers
        )
        assert resp.status_code == 401

    @pytest.mark.asyncio
    async def test_provision_agent_duplicate_handle_409(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Re-using a handle with a different key is answered with 409."""
        payload = self._valid_payload()
        # First registration
        r1 = await client.post("/api/identities/agent", json=payload, headers=auth_headers)
        assert r1.status_code in (200, 201)

        # Same handle with a different key — should 409
        pub_b64_2, fp2 = _generate_key_material()
        payload2 = {**payload, "public_key_b64": pub_b64_2, "fingerprint": fp2}
        r2 = await client.post("/api/identities/agent", json=payload2, headers=auth_headers)
        assert r2.status_code == 409

    @pytest.mark.asyncio
    async def test_provision_agent_idempotent_same_key(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """Registering the same key twice is idempotent (returns 200 on re-register)."""
        payload = self._valid_payload()
        r1 = await client.post("/api/identities/agent", json=payload, headers=auth_headers)
        assert r1.status_code in (200, 201)

        r2 = await client.post("/api/identities/agent", json=payload, headers=auth_headers)
        assert r2.status_code == 200
        data2 = r2.json()
        assert data2["is_new_identity"] is False

    @pytest.mark.asyncio
    async def test_provision_agent_invalid_handle_422(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
    ) -> None:
        """A handle containing spaces is answered with 422."""
        payload = self._valid_payload()
        payload["handle"] = "handle with spaces"
        resp = await client.post("/api/identities/agent", json=payload, headers=auth_headers)
        assert resp.status_code == 422

    @pytest.mark.asyncio
    async def test_provision_agent_spawned_by_matches_operator(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """``spawned_by`` in the response equals the handle behind ``auth_headers``."""
        payload = self._valid_payload()
        resp = await client.post("/api/identities/agent", json=payload, headers=auth_headers)
        assert resp.status_code in (200, 201)
        data = resp.json()
        # _TEST_HANDLE from conftest
        assert data["spawned_by"] == "testuser"

    @pytest.mark.asyncio
    async def test_provision_agent_scope_stored(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """A payload carrying two scope entries is accepted."""
        payload = self._valid_payload()
        payload["scope"] = ["push:agentception", "pull:agentception"]
        resp = await client.post("/api/identities/agent", json=payload, headers=auth_headers)
        # NOTE(review): only the status code is checked here; nothing asserts
        # that the scope was actually stored. Consider asserting on the
        # persisted scope once the response/DB shape is confirmed.
        assert resp.status_code in (200, 201)
436
437
438 # ---------------------------------------------------------------------------
439 # verify_and_authenticate identity_type — unit test
440 # ---------------------------------------------------------------------------
441
442
class TestVerifyAuthenticateIdentityType:
    """Model-level checks of ``VerifyRequest.identity_type``.

    NOTE(review): these tests only construct ``VerifyRequest`` objects; no
    service function is called and no DB row is inspected here.
    """

    @pytest.mark.asyncio
    async def test_identity_type_human_is_default(self) -> None:
        """When identity_type is omitted it defaults to 'human'."""
        fields = {
            "challenge_token": "a" * 64,
            "public_key_b64": "AAEC",
            "signature_b64": "AAEC",
        }
        request = VerifyRequest(**fields)
        assert request.identity_type == "human"

    @pytest.mark.asyncio
    async def test_identity_type_agent_passes_through(self) -> None:
        """An explicit identity_type of 'agent' is preserved on the model."""
        fields = {
            "challenge_token": "a" * 64,
            "public_key_b64": "AAEC",
            "signature_b64": "AAEC",
            "identity_type": "agent",
        }
        request = VerifyRequest(**fields)
        assert request.identity_type == "agent"
464 assert req.identity_type == "agent"