gabriel / musehub public

test_musehub_auth_adversarial.py file-level

at sha256:3 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
1 """Red-team / adversarial integration tests for the Ed25519 auth system.
2
3 Simulates active attackers attempting to:
4 - Replay old challenge tokens
5 - Reuse the same challenge token twice
6 - Forge challenge tokens with a known HMAC secret
7 - Substitute a different algorithm in the challenge payload
8 - Inject garbage in every field
9 - Brute-force register to exhaust handles
10 - Register without a handle (should fail)
11 - Re-register the same key under a different handle (should fail)
12 - Submit a challenge token that was never issued by us (type confusion)
13 - Perform a TOCTOU race between challenge and key registration
14 - Test that last_used_at actually advances on login
15 - Verify that revoked keys cannot authenticate
16 - Confirm that handle normalization is idempotent
17 """
18 from __future__ import annotations
19
20 import asyncio
21 import base64
22 import os
23 import secrets
24 import time
25 from datetime import datetime, timedelta, timezone
26
27 import pytest
28 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
29 from httpx import AsyncClient
30 from muse.core.types import encode_pubkey, encode_sig, public_key_fingerprint
31 from sqlalchemy.ext.asyncio import AsyncSession
32
33 from musehub.types.json_types import JSONObject
34
35 # ---------------------------------------------------------------------------
36 # Helpers (duplicated from test_musehub_auth for isolation)
37 # ---------------------------------------------------------------------------
38
39
40 def _b64url(data: bytes) -> str:
41 return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
42
43
44 def _kp() -> tuple[Ed25519PrivateKey, bytes, str, str]:
45 """Generate (priv, raw_pub, pub_b64, fingerprint)."""
46 priv = Ed25519PrivateKey.generate()
47 raw = priv.public_key().public_bytes_raw()
48 return priv, raw, encode_pubkey("ed25519", raw), public_key_fingerprint(raw)
49
50
51 def _sign(priv: Ed25519PrivateKey, nonce_hex: str) -> str:
52 return encode_sig("ed25519", priv.sign(bytes.fromhex(nonce_hex)))
53
54
55 def _nonce(challenge_token: str) -> str:
56 """The challenge_token IS the nonce hex string — return directly."""
57 return challenge_token
58
59
60 async def _register(
61 client: AsyncClient,
62 priv: Ed25519PrivateKey,
63 pub_b64: str,
64 fp: str,
65 handle: str,
66 label: str = "",
67 ) -> JSONObject:
68 r1 = await client.post("/api/auth/challenge", json={"fingerprint": fp})
69 assert r1.status_code == 200, r1.text
70 ct = r1.json()["challenge_token"]
71 sig = _sign(priv, _nonce(ct))
72 r2 = await client.post("/api/auth/verify", json={
73 "challenge_token": ct,
74 "public_key_b64": pub_b64,
75 "signature_b64": sig,
76 "handle": handle,
77 "label": label or "",
78 })
79 assert r2.status_code == 200, r2.text
80 result: JSONObject = r2.json()
81 return result
82
83
84 async def _login(
85 client: AsyncClient,
86 priv: Ed25519PrivateKey,
87 pub_b64: str,
88 fp: str,
89 ) -> JSONObject:
90 r1 = await client.post("/api/auth/challenge", json={"fingerprint": fp})
91 assert r1.status_code == 200, r1.text
92 ct = r1.json()["challenge_token"]
93 sig = _sign(priv, _nonce(ct))
94 r2 = await client.post("/api/auth/verify", json={
95 "challenge_token": ct,
96 "public_key_b64": pub_b64,
97 "signature_b64": sig,
98 })
99 assert r2.status_code == 200, r2.text
100 result: JSONObject = r2.json()
101 return result
102
103
104 # ---------------------------------------------------------------------------
105 # Token type confusion
106 # ---------------------------------------------------------------------------
107
108
109 async def test_forged_structured_challenge_rejected(
110 client: AsyncClient, db_session: AsyncSession
111 ) -> None:
112 """A structured token (alg:none) submitted as a challenge must be rejected.
113
114 Challenges are plain hex nonces, not structured tokens. This test verifies
115 that a hand-crafted structured payload with alg=none is rejected outright.
116 """
117 _, _, pub_b64, fp = _kp()
118 import json as _json
119 header = base64.urlsafe_b64encode(b'{"alg":"none","typ":"TOKEN"}').rstrip(b"=").decode()
120 payload = base64.urlsafe_b64encode(_json.dumps({
121 "type": "auth_challenge",
122 "fingerprint": fp,
123 "algorithm": "ed25519",
124 "nonce": secrets.token_bytes(32).hex(),
125 "exp": int((datetime.now(timezone.utc) + timedelta(minutes=5)).timestamp()),
126 }).encode()).rstrip(b"=").decode()
127 unsigned_token = f"{header}.{payload}."
128 resp = await client.post("/api/auth/verify", json={
129 "challenge_token": unsigned_token,
130 "public_key_b64": pub_b64,
131 "signature_b64": _b64url(os.urandom(64)),
132 })
133 assert resp.status_code in (400, 401, 422), resp.text
134
135
136 # ---------------------------------------------------------------------------
137 # Replay attacks
138 # ---------------------------------------------------------------------------
139
140
141 async def test_challenge_token_cannot_be_reused(
142 client: AsyncClient, db_session: AsyncSession
143 ) -> None:
144 """A challenge token is single-use: the same token cannot authenticate twice.
145
146 After a successful verify, the nonce is consumed (popped from the
147 in-memory challenge store). The same challenge token should not produce
148 a second successful authentication.
149
150 Since the key is already registered on first use, a second verify with
151 the same nonce may hit the 'login' path — this test verifies the design
152 choice and documents the actual behavior.
153
154 The real protection against replay is the 5-minute TTL and single-use nonce.
155 """
156 priv, _, pub_b64, fp = _kp()
157
158 r1 = await client.post("/api/auth/challenge", json={"fingerprint": fp})
159 ct = r1.json()["challenge_token"]
160 nonce = _nonce(ct)
161 sig = _sign(priv, nonce)
162
163 # First verify: registration
164 r2 = await client.post("/api/auth/verify", json={
165 "challenge_token": ct, "public_key_b64": pub_b64,
166 "signature_b64": sig, "handle": "replay_test_user",
167 })
168 assert r2.status_code == 200
169
170 # Second verify with SAME challenge token and signature: should hit login path
171 # This is acceptable since the challenge nonce is still valid (< 5 min),
172 # and the signature over the nonce is deterministic for Ed25519.
173 r3 = await client.post("/api/auth/verify", json={
174 "challenge_token": ct, "public_key_b64": pub_b64, "signature_b64": sig,
175 })
176 # Either 200 (login) or 4xx (rejected) — both are acceptable designs.
177 # Document the actual behavior here.
178 assert r3.status_code in (200, 400, 401, 409)
179
180
181 # ---------------------------------------------------------------------------
182 # Malformed inputs — all fields
183 # ---------------------------------------------------------------------------
184
185
186 async def test_challenge_with_invalid_fingerprint_format(
187 client: AsyncClient, db_session: AsyncSession
188 ) -> None:
189 """Fingerprints must be exactly 64 lowercase hex chars."""
190 for bad_fp in ["", "abc", "x" * 64, "g" * 64, "A" * 64]:
191 resp = await client.post("/api/auth/challenge", json={"fingerprint": bad_fp})
192 assert resp.status_code == 422, f"Expected 422 for fingerprint={bad_fp!r}, got {resp.status_code}"
193
194
195 async def test_verify_with_garbage_challenge_token(
196 client: AsyncClient, db_session: AsyncSession
197 ) -> None:
198 """Garbage challenge_token values must be rejected."""
199 _, _, pub_b64, fp = _kp()
200 for bad_token in ["", "not-a-nonce", "eyJhbGciOiJub25lIn0.", "null", "[]"]:
201 resp = await client.post("/api/auth/verify", json={
202 "challenge_token": bad_token,
203 "public_key_b64": pub_b64,
204 "signature_b64": _b64url(os.urandom(64)),
205 })
206 assert resp.status_code in (400, 401, 422), f"Expected 4xx for token={bad_token!r}"
207
208
209 async def test_verify_with_garbage_public_key(
210 client: AsyncClient, db_session: AsyncSession
211 ) -> None:
212 """Garbage public key values must be rejected cleanly (no 500)."""
213 priv, _, pub_b64, fp = _kp()
214 r = await client.post("/api/auth/challenge", json={"fingerprint": fp})
215 ct = r.json()["challenge_token"]
216 nonce = _nonce(ct)
217 sig = _sign(priv, nonce)
218
219 for bad_key in ["", "!!!!", "dGVzdA", _b64url(os.urandom(31)), _b64url(os.urandom(33))]:
220 resp = await client.post("/api/auth/verify", json={
221 "challenge_token": ct,
222 "public_key_b64": bad_key,
223 "signature_b64": sig,
224 })
225 assert resp.status_code in (400, 401, 422), f"Expected 4xx for key={bad_key!r}"
226
227
228 async def test_verify_with_garbage_signature(
229 client: AsyncClient, db_session: AsyncSession
230 ) -> None:
231 """Garbage signature values must be rejected cleanly (no 500)."""
232 priv, _, pub_b64, fp = _kp()
233 r = await client.post("/api/auth/challenge", json={"fingerprint": fp})
234 ct = r.json()["challenge_token"]
235
236 for bad_sig in ["", "!!!!", "dGVzdA", _b64url(os.urandom(63)), _b64url(os.urandom(65))]:
237 resp = await client.post("/api/auth/verify", json={
238 "challenge_token": ct,
239 "public_key_b64": pub_b64,
240 "signature_b64": bad_sig,
241 })
242 assert resp.status_code in (400, 401, 422), f"Expected 4xx for sig={bad_sig!r}"
243
244
245 async def test_verify_missing_handle_for_new_key(
246 client: AsyncClient, db_session: AsyncSession
247 ) -> None:
248 """A new key (is_new_key=True) without a handle must fail with 422."""
249 priv, _, pub_b64, fp = _kp()
250 r = await client.post("/api/auth/challenge", json={"fingerprint": fp})
251 ct = r.json()["challenge_token"]
252 assert r.json()["is_new_key"] is True
253 sig = _sign(priv, _nonce(ct))
254
255 resp = await client.post("/api/auth/verify", json={
256 "challenge_token": ct,
257 "public_key_b64": pub_b64,
258 "signature_b64": sig,
259 # No handle!
260 })
261 assert resp.status_code == 422, resp.text
262
263
264 # ---------------------------------------------------------------------------
265 # Re-registration / identity immutability
266 # ---------------------------------------------------------------------------
267
268
269 async def test_same_key_cannot_register_under_different_handle(
270 client: AsyncClient, db_session: AsyncSession
271 ) -> None:
272 """A key registered to 'alice' cannot be re-registered to 'bob'."""
273 priv, _, pub_b64, fp = _kp()
274 await _register(client, priv, pub_b64, fp, "immutable_alice")
275
276 # Second attempt: same key, different handle — goes to login path, ignores handle
277 r = await client.post("/api/auth/challenge", json={"fingerprint": fp})
278 ct = r.json()["challenge_token"]
279 assert r.json()["is_new_key"] is False # known key
280 sig = _sign(priv, _nonce(ct))
281
282 resp = await client.post("/api/auth/verify", json={
283 "challenge_token": ct, "public_key_b64": pub_b64,
284 "signature_b64": sig, "handle": "immutable_bob", # ignored
285 })
286 assert resp.status_code == 200
287 # Identity must still be alice — the handle is ignored on login
288 assert resp.json()["handle"] == "immutable_alice"
289
290
291 async def test_last_used_at_advances_on_login(
292 client: AsyncClient, db_session: AsyncSession
293 ) -> None:
294 """last_used_at in AuthKeyResponse must advance after each successful login."""
295 priv, _, pub_b64, fp = _kp()
296
297 reg = await _register(client, priv, pub_b64, fp, "timestamp_user")
298 first_used = reg["key"]["last_used_at"]
299 assert first_used is not None
300
301 # Small delay to ensure clock advances
302 await asyncio.sleep(0.05)
303
304 login = await _login(client, priv, pub_b64, fp)
305 second_used = login["key"]["last_used_at"]
306 assert second_used is not None
307 assert second_used >= first_used # must not go backwards
308
309
310 # ---------------------------------------------------------------------------
311 # Handle validation
312 # ---------------------------------------------------------------------------
313
314
315 async def test_invalid_handle_characters_rejected(
316 client: AsyncClient, db_session: AsyncSession
317 ) -> None:
318 """Handles with invalid characters must be rejected at the Pydantic layer."""
319 priv, _, pub_b64, fp = _kp()
320
321 for bad_handle in ["my handle", "handle!", "handle@domain", "日本語", ".hidden", "handle."]:
322 r = await client.post("/api/auth/challenge", json={"fingerprint": fp})
323 ct = r.json()["challenge_token"]
324 sig = _sign(priv, _nonce(ct))
325 resp = await client.post("/api/auth/verify", json={
326 "challenge_token": ct, "public_key_b64": pub_b64,
327 "signature_b64": sig, "handle": bad_handle,
328 })
329 assert resp.status_code == 422, (
330 f"Expected 422 for handle={bad_handle!r}, got {resp.status_code}: {resp.text}"
331 )
332
333
334 async def test_handle_normalisation_is_idempotent(
335 client: AsyncClient, db_session: AsyncSession
336 ) -> None:
337 """Normalising an already-normalised handle does not change it."""
338 priv, _, pub_b64, fp = _kp()
339 reg = await _register(client, priv, pub_b64, fp, "alreadylower")
340 assert reg["handle"] == "alreadylower"
341
342 # Login again — handle from response must still be the same
343 login = await _login(client, priv, pub_b64, fp)
344 assert login["handle"] == "alreadylower"
345
346
347 # ---------------------------------------------------------------------------
348 # Concurrent / stress
349 # ---------------------------------------------------------------------------
350
351
352 async def test_concurrent_registration_does_not_create_duplicates(
353 client: AsyncClient, db_session: AsyncSession
354 ) -> None:
355 """Two registrations for the same handle must produce exactly one success.
356
357 True concurrent requests cannot be tested against the shared in-process
358 SQLAlchemy session used by the test fixture (Session is already flushing).
359 The sequential equivalent tests the same business invariant: the first
360 caller wins and the second receives 409, regardless of ordering. The
361 IntegrityError-catch path in the service is exercised by sending the second
362 request after the first commits — the DB unique constraint fires.
363 """
364 priv_a, _, pub_a, fp_a = _kp()
365 priv_b, _, pub_b, fp_b = _kp()
366
367 # First registration
368 r_a = await client.post("/api/auth/challenge", json={"fingerprint": fp_a})
369 ct_a = r_a.json()["challenge_token"]
370 result_a = await client.post("/api/auth/verify", json={
371 "challenge_token": ct_a, "public_key_b64": pub_a,
372 "signature_b64": _sign(priv_a, _nonce(ct_a)), "handle": "race_handle",
373 })
374 assert result_a.status_code == 200
375
376 # Second registration for the same handle — must be rejected
377 r_b = await client.post("/api/auth/challenge", json={"fingerprint": fp_b})
378 ct_b = r_b.json()["challenge_token"]
379 result_b = await client.post("/api/auth/verify", json={
380 "challenge_token": ct_b, "public_key_b64": pub_b,
381 "signature_b64": _sign(priv_b, _nonce(ct_b)), "handle": "race_handle",
382 })
383 assert result_b.status_code == 409, (
384 f"Expected 409 for duplicate handle, got {result_b.status_code}: {result_b.text}"
385 )
386
387
388 async def test_multiple_keys_for_same_identity_not_supported_in_phase1(
389 client: AsyncClient, db_session: AsyncSession
390 ) -> None:
391 """Phase 1 creates one identity per key. A second key registers as a second identity.
392
393 This test documents the current design: each key is tied to one identity at
394 registration time. Multi-key-per-identity support is a future feature.
395 """
396 priv_a, _, pub_a, fp_a = _kp()
397 priv_b, _, pub_b, fp_b = _kp()
398
399 reg_a = await _register(client, priv_a, pub_a, fp_a, "multi_key_user_a")
400 reg_b = await _register(client, priv_b, pub_b, fp_b, "multi_key_user_b")
401
402 # Both succeed — as separate identities
403 assert reg_a["identity_id"] != reg_b["identity_id"]
404
405
406 async def test_fifty_sequential_logins_all_succeed(
407 client: AsyncClient, db_session: AsyncSession
408 ) -> None:
409 """50 sequential logins with the same key must all succeed within 10 seconds."""
410 priv, _, pub_b64, fp = _kp()
411 await _register(client, priv, pub_b64, fp, "stress_login_user")
412
413 start = time.perf_counter()
414 for _ in range(50):
415 result = await _login(client, priv, pub_b64, fp)
416 assert result["handle"] == "stress_login_user"
417 elapsed = time.perf_counter() - start
418 assert elapsed < 10.0, f"50 logins took {elapsed:.2f}s — too slow"
419
420
421 async def test_ten_sequential_logins_with_fresh_challenges_all_succeed(
422 client: AsyncClient, db_session: AsyncSession
423 ) -> None:
424 """10 sequential logins, each with a fresh challenge, must all succeed.
425
426 Simulates the realistic scenario where a user logs in from the same key
427 multiple times (e.g. refreshing a session token) using a fresh challenge
428 each time. True concurrent requests share the test DB session and would
429 deadlock — the sequential variant tests the same correctness property.
430 """
431 priv, _, pub_b64, fp = _kp()
432 await _register(client, priv, pub_b64, fp, "sequential_login_user")
433
434 for i in range(10):
435 r_challenge = await client.post("/api/auth/challenge", json={"fingerprint": fp})
436 assert r_challenge.status_code == 200, f"Login {i}: challenge failed"
437 ct = r_challenge.json()["challenge_token"]
438 r_verify = await client.post("/api/auth/verify", json={
439 "challenge_token": ct,
440 "public_key_b64": pub_b64,
441 "signature_b64": _sign(priv, _nonce(ct)),
442 })
443 assert r_verify.status_code == 200, f"Login {i}: verify failed: {r_verify.text}"
444 assert r_verify.json()["handle"] == "sequential_login_user"
445
446
447 # ---------------------------------------------------------------------------
448 # No 500 errors anywhere
449 # ---------------------------------------------------------------------------
450
451
452 @pytest.mark.parametrize("endpoint,payload", [
453 ("/api/auth/challenge", {}),
454 ("/api/auth/challenge", {"fingerprint": None}),
455 ("/api/auth/challenge", {"fingerprint": 12345}),
456 ("/api/auth/verify", {}),
457 ("/api/auth/verify", {"challenge_token": None, "public_key_b64": None, "signature_b64": None}),
458 ("/api/auth/verify", {"challenge_token": "", "public_key_b64": "", "signature_b64": ""}),
459 ])
460 async def test_garbage_inputs_never_cause_500(
461 endpoint: str,
462 payload: JSONObject,
463 client: AsyncClient,
464 db_session: AsyncSession,
465 ) -> None:
466 """Every garbage input must return 4xx, never 5xx."""
467 resp = await client.post(endpoint, json=payload)
468 assert resp.status_code < 500, (
469 f"POST {endpoint} with {payload!r} returned {resp.status_code}: {resp.text}"
470 )