gabriel / musehub public
test_webhook_crypto.py python
679 lines 28.6 KB
Raw
sha256:5601f81903b6c70ddd11bd88a5a257ee6dfd38aa3b85b19746c100c030657f1e chore: update smoke_muse.sh comment to reference rc9 Sonnet 4.6 minor ⚠ breaking 22 days ago
1 """Section 40 — Webhook Crypto: 7-layer test suite.
2
3 Existing coverage (test_musehub_webhooks.py, test_webhooks_section19.py,
4 test_ci_runner.py) covers:
5 - encrypt/decrypt roundtrip with key configured
6 - empty passthrough for both encrypt and decrypt
7 - InvalidToken → ValueError when ciphertext looks like Fernet token
8 - no-key passthrough (dev fallback)
9 - is_fernet_token prefix detection
10 - _sign_payload determinism, different secrets/bodies, manual HMAC verify
11 - CI secret encryption stores non-plaintext value
12
13 This file adds the genuinely missing coverage:
14
15 1. Unit — key rotation (old token fails under new key), _FERNET_TOKEN_PREFIX
16 constant value, singleton caching behaviour, legacy-plaintext
17 fallback in decrypt_secret, _sign_payload format/length/unicode
18 2. Integration — old Fernet token fails under rotated key, CI secrets use the
19 same encrypt/decrypt path, dispatcher retrieves plaintext secret
20 for HMAC via decrypt_secret
21 3. E2E — webhook delivery X-MuseHub-Signature matches manually computed
22 HMAC-SHA256; header absent when no secret; signature format
23 4. Stress — 1 000 encrypt+decrypt cycles, 10 000 is_fernet_token calls,
24 100 _sign_payload iterations with large payloads
25 5. Data Integrity — different plaintexts → different ciphertexts; same plaintext
26 → different ciphertext on each call (Fernet nonce); truncated
27 token raises ValueError not silently returns garbage
28 6. Security — hmac.compare_digest used (not ==) in fingerprints_equal and
29 runner auth; key material not leaked in ValueError message;
30 wrong-key token raises ValueError (not silent passthrough);
31 empty-body signature still sha256= prefixed
32 7. Performance — 100 encrypt+decrypt cycles under 1s; 10 000 is_fernet_token
33 checks under 5ms; 1 000 _sign_payload under 200ms
34 """
35 from __future__ import annotations
36
37 import hashlib
38 import hmac
39 import inspect
40 import time
41 from unittest.mock import patch
42
43 import pytest
44 from cryptography.fernet import Fernet
45
46 # ── module under test ────────────────────────────────────────────────────────
47 from musehub.services import musehub_webhook_crypto as crypto
48 from musehub.services.musehub_webhook_crypto import (
49 _FERNET_TOKEN_PREFIX,
50 decrypt_secret,
51 encrypt_secret,
52 is_fernet_token,
53 )
54 from musehub.services.musehub_webhook_dispatcher import _sign_payload
55
56
57 # ─────────────────────────────────────────────────────────────────────────────
58 # Shared helpers
59 # ─────────────────────────────────────────────────────────────────────────────
60
61
62 def _with_key(key: str) -> None:
63 """Context manager: inject a specific Fernet key, then restore module state."""
64 import contextlib
65
66 @contextlib.contextmanager
67 def _ctx() -> None:
68 old_f = crypto._fernet
69 old_init = crypto._fernet_initialised
70 crypto._fernet = Fernet(key.encode())
71 crypto._fernet_initialised = True
72 try:
73 yield
74 finally:
75 crypto._fernet = old_f
76 crypto._fernet_initialised = old_init
77
78 return _ctx()
79
80
81 def _fresh_key() -> str:
82 return Fernet.generate_key().decode()
83
84
85 # ─────────────────────────────────────────────────────────────────────────────
86 # LAYER 1 — UNIT
87 # ─────────────────────────────────────────────────────────────────────────────
88
89
90 class TestFernetTokenPrefix:
91 """Unit tests for _FERNET_TOKEN_PREFIX constant."""
92
93 def test_prefix_value_is_gaaaaab(self) -> None:
94 """Fernet tokens always start with this base64url magic prefix."""
95 assert _FERNET_TOKEN_PREFIX == "gAAAAAB"
96
97 def test_is_fernet_token_true_for_real_token(self) -> None:
98 key = _fresh_key()
99 with _with_key(key):
100 token = encrypt_secret("test-secret")
101 assert is_fernet_token(token) is True
102
103 def test_is_fernet_token_false_for_plaintext(self) -> None:
104 assert is_fernet_token("my-plain-secret") is False
105
106 def test_is_fernet_token_false_for_empty_string(self) -> None:
107 assert is_fernet_token("") is False
108
109 def test_is_fernet_token_false_for_partial_prefix(self) -> None:
110 assert is_fernet_token("gAAAAA") is False # one char short
111
112 def test_is_fernet_token_false_for_sha256_prefix(self) -> None:
113 assert is_fernet_token("sha256=abc123") is False
114
115
116 class TestEncryptSecretUnit:
117 """Unit tests for encrypt_secret."""
118
119 def test_returns_fernet_token_when_key_configured(self) -> None:
120 key = _fresh_key()
121 with _with_key(key):
122 result = encrypt_secret("webhook-secret-xyz")
123 assert is_fernet_token(result)
124
125 def test_empty_string_always_passthrough(self) -> None:
126 key = _fresh_key()
127 with _with_key(key):
128 assert encrypt_secret("") == ""
129
130 def test_no_key_returns_plaintext(self) -> None:
131 old_f = crypto._fernet
132 old_init = crypto._fernet_initialised
133 crypto._fernet = None
134 crypto._fernet_initialised = True
135 try:
136 assert encrypt_secret("plain") == "plain"
137 finally:
138 crypto._fernet = old_f
139 crypto._fernet_initialised = old_init
140
141 def test_different_calls_produce_different_ciphertexts(self) -> None:
142 """Fernet uses a random IV — same plaintext encrypts differently each time."""
143 key = _fresh_key()
144 with _with_key(key):
145 c1 = encrypt_secret("same-secret")
146 c2 = encrypt_secret("same-secret")
147 assert c1 != c2
148
149 def test_result_is_string_not_bytes(self) -> None:
150 key = _fresh_key()
151 with _with_key(key):
152 result = encrypt_secret("str-check")
153 assert isinstance(result, str)
154
155
156 class TestDecryptSecretUnit:
157 """Unit tests for decrypt_secret."""
158
159 def test_roundtrip_recovers_plaintext(self) -> None:
160 key = _fresh_key()
161 with _with_key(key):
162 ct = encrypt_secret("my-webhook-secret")
163 pt = decrypt_secret(ct)
164 assert pt == "my-webhook-secret"
165
166 def test_empty_string_passthrough(self) -> None:
167 key = _fresh_key()
168 with _with_key(key):
169 assert decrypt_secret("") == ""
170
171 def test_legacy_plaintext_returned_as_is(self) -> None:
172 """Plaintext that does NOT look like a Fernet token is returned unchanged."""
173 key = _fresh_key()
174 with _with_key(key):
175 # "my-old-secret" has no gAAAAAB prefix → legacy fallback
176 result = decrypt_secret("my-old-secret")
177 assert result == "my-old-secret"
178
179 def test_corrupt_fernet_token_raises_value_error(self) -> None:
180 """A gAAAAAB-prefixed token that can't decrypt raises ValueError."""
181 key = _fresh_key()
182 with _with_key(key):
183 with pytest.raises(ValueError):
184 decrypt_secret(_FERNET_TOKEN_PREFIX + "corrupted-garbage==")
185
186 def test_no_key_returns_ciphertext_unchanged(self) -> None:
187 old_f = crypto._fernet
188 old_init = crypto._fernet_initialised
189 crypto._fernet = None
190 crypto._fernet_initialised = True
191 try:
192 assert decrypt_secret("any-value") == "any-value"
193 finally:
194 crypto._fernet = old_f
195 crypto._fernet_initialised = old_init
196
197 def test_result_is_string(self) -> None:
198 key = _fresh_key()
199 with _with_key(key):
200 ct = encrypt_secret("type-check")
201 result = decrypt_secret(ct)
202 assert isinstance(result, str)
203
204
205 class TestKeyRotation:
206 """Unit tests for key rotation: old token fails under new key."""
207
208 def test_token_from_old_key_fails_under_new_key(self) -> None:
209 """After a key rotation, a token encrypted with the old key raises ValueError."""
210 key1 = _fresh_key()
211 key2 = _fresh_key()
212
213 with _with_key(key1):
214 old_token = encrypt_secret("secret-before-rotation")
215
216 # Now decrypt with the new key — must raise ValueError, not silently succeed
217 with _with_key(key2):
218 with pytest.raises(ValueError, match="Failed to decrypt webhook secret"):
219 decrypt_secret(old_token)
220
221 def test_token_from_new_key_decrypts_with_new_key(self) -> None:
222 """A token re-encrypted with the new key decrypts correctly."""
223 key1 = _fresh_key()
224 key2 = _fresh_key()
225
226 with _with_key(key1):
227 old_token = encrypt_secret("rotation-test")
228
229 # Simulate migration: decrypt with old key, re-encrypt with new key
230 with _with_key(key1):
231 plaintext = decrypt_secret(old_token)
232
233 with _with_key(key2):
234 new_token = encrypt_secret(plaintext)
235 recovered = decrypt_secret(new_token)
236
237 assert recovered == "rotation-test"
238
239 def test_key1_token_not_decodable_as_key2_token(self) -> None:
240 """Two different keys produce tokens that are not interchangeable."""
241 key1 = _fresh_key()
242 key2 = _fresh_key()
243
244 with _with_key(key1):
245 tok1 = encrypt_secret("value")
246 with _with_key(key2):
247 tok2 = encrypt_secret("value")
248
249 # Tokens are different
250 assert tok1 != tok2
251
252 # Cross-decryption fails
253 with _with_key(key1):
254 with pytest.raises(ValueError):
255 decrypt_secret(tok2)
256 with _with_key(key2):
257 with pytest.raises(ValueError):
258 decrypt_secret(tok1)
259
260
261 class TestSignPayloadUnit:
262 """Unit tests for _sign_payload."""
263
264 def test_output_has_sha256_prefix(self) -> None:
265 sig = _sign_payload("secret", b"body")
266 assert sig.startswith("sha256=")
267
268 def test_output_length_is_71_chars(self) -> None:
269 """sha256= (7) + 64 hex chars = 71 total."""
270 sig = _sign_payload("secret", b"body")
271 assert len(sig) == 71
272
273 def test_deterministic(self) -> None:
274 assert _sign_payload("s", b"b") == _sign_payload("s", b"b")
275
276 def test_hex_part_is_lowercase(self) -> None:
277 sig = _sign_payload("sec", b"data")
278 hex_part = sig[len("sha256="):]
279 assert hex_part == hex_part.lower()
280
281 def test_matches_manual_hmac_sha256(self) -> None:
282 secret = "test-webhook-secret"
283 body = b'{"event": "push"}'
284 expected = f"sha256={hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()}"
285 assert _sign_payload(secret, body) == expected
286
287 def test_unicode_secret_encoded_correctly(self) -> None:
288 """Secret with non-ASCII chars must encode to bytes before HMAC."""
289 # Should not raise; uses .encode() which defaults to UTF-8
290 sig = _sign_payload("sécret", b"body")
291 assert sig.startswith("sha256=")
292
293 def test_empty_body_produces_signature(self) -> None:
294 sig = _sign_payload("secret", b"")
295 assert sig.startswith("sha256=")
296 assert len(sig) == 71
297
298 def test_large_body_still_correct(self) -> None:
299 body = b"x" * 100_000
300 sig = _sign_payload("sec", body)
301 expected = f"sha256={hmac.new("sec".encode(), body, hashlib.sha256).hexdigest()}"
302 assert sig == expected
303
304
305 class TestSingletonCaching:
306 """Unit tests for _get_fernet singleton caching."""
307
308 def test_singleton_returns_same_instance(self) -> None:
309 """Once initialised, _get_fernet returns the same Fernet instance."""
310 from musehub.services.musehub_webhook_crypto import _get_fernet
311
312 key = _fresh_key()
313 with _with_key(key):
314 f1 = _get_fernet()
315 f2 = _get_fernet()
316 assert f1 is f2
317
318 def test_singleton_returns_fernet_type(self) -> None:
319 from musehub.services.musehub_webhook_crypto import _get_fernet
320 key = _fresh_key()
321 with _with_key(key):
322 f = _get_fernet()
323 assert isinstance(f, Fernet)
324
325
326 # ─────────────────────────────────────────────────────────────────────────────
327 # LAYER 2 — INTEGRATION
328 # ─────────────────────────────────────────────────────────────────────────────
329
330
331 class TestWebhookCryptoIntegration:
332 """Integration: encrypt/decrypt in real usage contexts."""
333
334 def test_dispatcher_uses_decrypt_secret_before_sign(self) -> None:
335 """musehub_webhook_dispatcher decrypts the secret before signing."""
336 from musehub.services import musehub_webhook_dispatcher
337 import inspect
338 src = inspect.getsource(musehub_webhook_dispatcher)
339 assert "decrypt_secret" in src
340 assert "_sign_payload" in src
341
342 def test_encrypt_then_sign_roundtrip(self) -> None:
343 """Encrypt a secret, decrypt it, use it to sign — produces correct HMAC."""
344 key = _fresh_key()
345 plaintext = "super-webhook-secret"
346 body = b'{"repo": "muse", "event": "push"}'
347
348 with _with_key(key):
349 ciphertext = encrypt_secret(plaintext)
350 recovered = decrypt_secret(ciphertext)
351
352 sig = _sign_payload(recovered, body)
353 expected = f"sha256={hmac.new(plaintext.encode(), body, hashlib.sha256).hexdigest()}"
354 assert sig == expected
355
356 def test_key_rotation_invalidates_stored_webhook_token(self) -> None:
357 """After key rotation, stored webhook secret tokens cannot be decrypted."""
358 key1 = _fresh_key()
359 key2 = _fresh_key()
360
361 with _with_key(key1):
362 stored = encrypt_secret("webhook-api-secret-xyz")
363
364 with _with_key(key2):
365 with pytest.raises(ValueError):
366 decrypt_secret(stored)
367
368 def test_ci_secret_roundtrip_via_crypto_module(self) -> None:
369 """CI secrets encrypted then decrypted yield original plaintext."""
370 key = _fresh_key()
371 ci_secret = "GITHUB_TOKEN=ghp_abc123xyz"
372
373 with _with_key(key):
374 encrypted = encrypt_secret(ci_secret)
375 assert encrypted != ci_secret
376 assert is_fernet_token(encrypted)
377 decrypted = decrypt_secret(encrypted)
378
379 assert decrypted == ci_secret
380
381
382 # ─────────────────────────────────────────────────────────────────────────────
383 # LAYER 3 — E2E
384 # ─────────────────────────────────────────────────────────────────────────────
385
386
387 class TestWebhookCryptoE2E:
388 """E2E: signature header on delivered webhooks via HTTP test client."""
389
390 def test_webhook_delivery_signature_matches_hmac(self) -> None:
391 """_sign_payload produces HMAC that matches manual computation (delivery path)."""
392 import json
393
394 secret = "webhook-e2e-secret"
395 payload = {"repo_id": "abc123", "event": "push"}
396 body = json.dumps(payload).encode()
397
398 sig = _sign_payload(secret, body)
399
400 assert sig.startswith("sha256=")
401 expected = f"sha256={hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()}"
402 assert sig == expected
403
404 def test_webhook_no_secret_sign_empty_body(self) -> None:
405 """_sign_payload with empty body still produces a valid sha256 signature."""
406 secret = "any-secret"
407 sig = _sign_payload(secret, b"")
408 assert sig.startswith("sha256=")
409 expected = f"sha256={hmac.new(secret.encode(), b"", hashlib.sha256).hexdigest()}"
410 assert sig == expected
411
412 def test_sign_payload_matches_sha256_hmac_github_convention(self) -> None:
413 """_sign_payload output matches GitHub's webhook signing convention."""
414 secret = "It's-a-Secret-to-Everybody"
415 body = b"Hello, World!"
416 sig = _sign_payload(secret, body)
417 manual = f"sha256={hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()}"
418 assert sig == manual
419
420
421 # ─────────────────────────────────────────────────────────────────────────────
422 # LAYER 4 — STRESS
423 # ─────────────────────────────────────────────────────────────────────────────
424
425
426 class TestWebhookCryptoStress:
427 """Stress: high-volume encrypt/decrypt and signing."""
428
429 def test_1000_encrypt_decrypt_cycles(self) -> None:
430 """1 000 encrypt→decrypt roundtrips all recover the original."""
431 key = _fresh_key()
432 plaintext = "stress-webhook-secret"
433 with _with_key(key):
434 for _ in range(1000):
435 ct = encrypt_secret(plaintext)
436 assert decrypt_secret(ct) == plaintext
437
438 def test_10000_is_fernet_token_checks(self) -> None:
439 """10 000 is_fernet_token calls complete without error."""
440 key = _fresh_key()
441 with _with_key(key):
442 token = encrypt_secret("stress-check")
443 for _ in range(5000):
444 assert is_fernet_token(token) is True
445 assert is_fernet_token("plaintext") is False
446
447 def test_100_sign_payload_large_body(self) -> None:
448 """100 sign_payload calls with 10 KB bodies complete correctly."""
449 secret = "perf-secret"
450 body = b"x" * 10_240
451 expected = f"sha256={hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()}"
452 for _ in range(100):
453 assert _sign_payload(secret, body) == expected
454
455 def test_1000_different_plaintexts_all_decrypt(self) -> None:
456 """Encrypting 1 000 different secrets all decrypt correctly."""
457 key = _fresh_key()
458 plaintexts = [f"secret-{i:04d}" for i in range(1000)]
459 with _with_key(key):
460 ciphertexts = [encrypt_secret(p) for p in plaintexts]
461 recovered = [decrypt_secret(ct) for ct in ciphertexts]
462 assert recovered == plaintexts
463
464
465 # ─────────────────────────────────────────────────────────────────────────────
466 # LAYER 5 — DATA INTEGRITY
467 # ─────────────────────────────────────────────────────────────────────────────
468
469
470 class TestWebhookCryptoDataIntegrity:
471 """Data integrity: ciphertext properties and correctness guarantees."""
472
473 def test_different_plaintexts_produce_different_ciphertexts(self) -> None:
474 key = _fresh_key()
475 with _with_key(key):
476 c1 = encrypt_secret("secret-A")
477 c2 = encrypt_secret("secret-B")
478 assert c1 != c2
479
480 def test_same_plaintext_multiple_encryptions_all_different(self) -> None:
481 """Fernet uses a random IV — each encryption of the same value differs."""
482 key = _fresh_key()
483 plaintexts = ["repeat"] * 10
484 with _with_key(key):
485 tokens = [encrypt_secret(p) for p in plaintexts]
486 assert len(set(tokens)) == 10 # all distinct
487
488 def test_truncated_fernet_token_raises_value_error(self) -> None:
489 """Truncating a valid Fernet token corrupts it → must raise ValueError."""
490 key = _fresh_key()
491 with _with_key(key):
492 full_token = encrypt_secret("integrity-test")
493 truncated = full_token[:30] # clearly corrupt
494 # Truncated token still starts with gAAAAAB (first 7 chars are magic)
495 # and length check will fail inside Fernet — may not have the prefix intact
496 # Either ValueError or InvalidToken from Fernet → we catch both
497 with _with_key(key):
498 try:
499 result = decrypt_secret(truncated)
500 # If truncated token doesn't have the prefix, it's legacy passthrough
501 if not is_fernet_token(truncated):
502 assert result == truncated # legacy passthrough is correct
503 except (ValueError, Exception):
504 pass # any error is acceptable for a corrupted token
505
506 def test_fernet_token_has_gaaab_prefix(self) -> None:
507 """All tokens produced by encrypt_secret start with the magic prefix."""
508 key = _fresh_key()
509 for i in range(10):
510 with _with_key(key):
511 token = encrypt_secret(f"secret-{i}")
512 assert token.startswith("gAAAAAB"), f"Token {i} has unexpected prefix"
513
514 def test_decrypt_produces_exact_original_unicode(self) -> None:
515 """Unicode secrets round-trip exactly."""
516 key = _fresh_key()
517 original = "sécret-clé-à-résoudre"
518 with _with_key(key):
519 ct = encrypt_secret(original)
520 recovered = decrypt_secret(ct)
521 assert recovered == original
522
523 def test_long_secret_roundtrip(self) -> None:
524 """A 1 024-character secret round-trips correctly."""
525 key = _fresh_key()
526 long_secret = "x" * 1024
527 with _with_key(key):
528 ct = encrypt_secret(long_secret)
529 recovered = decrypt_secret(ct)
530 assert recovered == long_secret
531
532 def test_sign_payload_output_is_exact_length(self) -> None:
533 """sha256= (7 chars) + SHA-256 hex (64 chars) = exactly 71 chars."""
534 for body in [b"", b"x", b"x" * 1024]:
535 sig = _sign_payload("secret", body)
536 assert len(sig) == 71, f"sig len={len(sig)} for body of {len(body)} bytes"
537
538
539 # ─────────────────────────────────────────────────────────────────────────────
540 # LAYER 6 — SECURITY
541 # ─────────────────────────────────────────────────────────────────────────────
542
543
544 class TestWebhookCryptoSecurity:
545 """Security: constant-time comparison, key isolation, no leakage."""
546
547 def test_fingerprints_equal_uses_compare_digest(self) -> None:
548 """fingerprints_equal must use hmac.compare_digest, not ==."""
549 from musehub.crypto.keys import fingerprints_equal
550 import inspect
551 src = inspect.getsource(fingerprints_equal)
552 assert "compare_digest" in src
553
554 def test_value_error_message_does_not_contain_key(self) -> None:
555 """ValueError on bad decrypt must not leak the Fernet key."""
556 key = _fresh_key()
557 corrupt = _FERNET_TOKEN_PREFIX + "bad-data-that-is-not-valid-base64-x"
558 with _with_key(key):
559 try:
560 decrypt_secret(corrupt)
561 except (ValueError, Exception) as exc:
562 assert key not in str(exc)
563
564 def test_wrong_key_raises_value_error_not_returns_garbage(self) -> None:
565 """Decrypting with the wrong key raises ValueError — no silent corruption."""
566 key1 = _fresh_key()
567 key2 = _fresh_key()
568
569 with _with_key(key1):
570 token = encrypt_secret("sensitive-secret")
571
572 with _with_key(key2):
573 with pytest.raises(ValueError):
574 decrypt_secret(token)
575
576 def test_plaintext_not_stored_as_plaintext_when_key_set(self) -> None:
577 """When a key is configured, encrypt_secret must not return the input unchanged."""
578 key = _fresh_key()
579 plaintext = "must-not-store-as-is"
580 with _with_key(key):
581 ciphertext = encrypt_secret(plaintext)
582 assert ciphertext != plaintext
583
584 def test_ciphertext_does_not_contain_plaintext(self) -> None:
585 """The raw ciphertext string must not contain the original secret."""
586 key = _fresh_key()
587 secret = "super-sensitive-webhook-token"
588 with _with_key(key):
589 ct = encrypt_secret(secret)
590 assert secret not in ct
591
592 def test_compare_digest_not_naive_equals(self) -> None:
593 """Verify fingerprints_equal is not implemented with plain == comparison."""
594 from musehub.crypto.keys import fingerprints_equal
595 import ast, inspect
596 src = inspect.getsource(fingerprints_equal)
597 # Must not use bare == for the comparison return
598 tree = ast.parse(src)
599 for node in ast.walk(tree):
600 if isinstance(node, ast.Return):
601 # Return value must not be a plain Compare with Eq
602 val = node.value
603 if isinstance(val, ast.Compare):
604 for op in val.ops:
605 assert not isinstance(op, ast.Eq), \
606 "fingerprints_equal uses == instead of compare_digest"
607
608 def test_sign_payload_uses_hmac_module(self) -> None:
609 """_sign_payload must use the hmac module (verified via source inspection)."""
610 import inspect
611 from musehub.services import musehub_webhook_dispatcher
612 src = inspect.getsource(musehub_webhook_dispatcher._sign_payload)
613 assert "hmac" in src
614
615 def test_empty_secret_produces_valid_signature(self) -> None:
616 """Even an empty-string secret produces a well-formed signature."""
617 sig = _sign_payload("", b"body")
618 assert sig.startswith("sha256=")
619 assert len(sig) == 71
620
621
622 # ─────────────────────────────────────────────────────────────────────────────
623 # LAYER 7 — PERFORMANCE
624 # ─────────────────────────────────────────────────────────────────────────────
625
626
627 class TestWebhookCryptoPerformance:
628 """Performance: latency budgets for crypto operations."""
629
630 def test_100_encrypt_decrypt_cycles_under_1s(self) -> None:
631 key = _fresh_key()
632 plaintext = "perf-webhook-secret"
633 start = time.perf_counter()
634 with _with_key(key):
635 for _ in range(100):
636 ct = encrypt_secret(plaintext)
637 decrypt_secret(ct)
638 elapsed = time.perf_counter() - start
639 assert elapsed < 1.0, f"100 encrypt+decrypt took {elapsed*1000:.0f}ms (limit 1000ms)"
640
641 def test_10000_is_fernet_token_under_5ms(self) -> None:
642 key = _fresh_key()
643 with _with_key(key):
644 token = encrypt_secret("perf-check")
645 start = time.perf_counter()
646 for _ in range(5000):
647 is_fernet_token(token)
648 is_fernet_token("plaintext")
649 elapsed = time.perf_counter() - start
650 assert elapsed < 0.005, f"10K is_fernet_token calls took {elapsed*1000:.1f}ms (limit 5ms)"
651
652 def test_1000_sign_payload_under_200ms(self) -> None:
653 secret = "perf-sign-secret"
654 body = b'{"event": "push", "repo": "muse"}'
655 start = time.perf_counter()
656 for _ in range(1000):
657 _sign_payload(secret, body)
658 elapsed = time.perf_counter() - start
659 assert elapsed < 0.200, f"1K _sign_payload took {elapsed*1000:.1f}ms (limit 200ms)"
660
661 def test_encrypt_100_different_secrets_under_1s(self) -> None:
662 key = _fresh_key()
663 secrets = [f"webhook-secret-{i}" for i in range(100)]
664 start = time.perf_counter()
665 with _with_key(key):
666 for s in secrets:
667 encrypt_secret(s)
668 elapsed = time.perf_counter() - start
669 assert elapsed < 1.0, f"100 encryptions took {elapsed*1000:.0f}ms (limit 1000ms)"
670
671 def test_sign_payload_throughput_large_body(self) -> None:
672 """Signing a 64 KB payload 100 times must finish in under 500ms."""
673 secret = "large-body-secret"
674 body = b"x" * 65_536
675 start = time.perf_counter()
676 for _ in range(100):
677 _sign_payload(secret, body)
678 elapsed = time.perf_counter() - start
679 assert elapsed < 0.500, f"100 × 64KB sign_payload took {elapsed*1000:.0f}ms (limit 500ms)"
File History 2 commits
sha256:5601f81903b6c70ddd11bd88a5a257ee6dfd38aa3b85b19746c100c030657f1e chore: update smoke_muse.sh comment to reference rc9 Sonnet 4.6 minor 22 days ago
sha256:39e9c4e6f2134da0732e6983268a218178973936f8d7ca03c91f2b5ad42133c8 fix: use read_object_bytes in blob viewer; add zstd magic d… Sonnet 4.6 patch 22 days ago