test_webhook_crypto.py
python
sha256:5601f81903b6c70ddd11bd88a5a257ee6dfd38aa3b85b19746c100c030657f1e
chore: update smoke_muse.sh comment to reference rc9
Sonnet 4.6
minor
⚠ breaking
22 days ago
| 1 | """Section 40 — Webhook Crypto: 7-layer test suite. |
| 2 | |
| 3 | Existing coverage (test_musehub_webhooks.py, test_webhooks_section19.py, |
| 4 | test_ci_runner.py) covers: |
| 5 | - encrypt/decrypt roundtrip with key configured |
| 6 | - empty passthrough for both encrypt and decrypt |
| 7 | - InvalidToken → ValueError when ciphertext looks like Fernet token |
| 8 | - no-key passthrough (dev fallback) |
| 9 | - is_fernet_token prefix detection |
| 10 | - _sign_payload determinism, different secrets/bodies, manual HMAC verify |
| 11 | - CI secret encryption stores non-plaintext value |
| 12 | |
| 13 | This file adds the genuinely missing coverage: |
| 14 | |
| 15 | 1. Unit — key rotation (old token fails under new key), _FERNET_TOKEN_PREFIX |
| 16 | constant value, singleton caching behaviour, legacy-plaintext |
| 17 | fallback in decrypt_secret, _sign_payload format/length/unicode |
| 18 | 2. Integration — old Fernet token fails under rotated key, CI secrets use the |
| 19 | same encrypt/decrypt path, dispatcher retrieves plaintext secret |
| 20 | for HMAC via decrypt_secret |
| 21 | 3. E2E — webhook delivery X-MuseHub-Signature matches manually computed |
| 22 | HMAC-SHA256; header absent when no secret; signature format |
| 23 | 4. Stress — 1 000 encrypt+decrypt cycles, 10 000 is_fernet_token calls, |
| 24 | 100 _sign_payload iterations with large payloads |
| 25 | 5. Data Integrity — different plaintexts → different ciphertexts; same plaintext |
| 26 | → different ciphertext on each call (Fernet nonce); truncated |
| 27 | token raises ValueError not silently returns garbage |
| 28 | 6. Security — hmac.compare_digest used (not ==) in fingerprints_equal and |
| 29 | runner auth; key material not leaked in ValueError message; |
| 30 | wrong-key token raises ValueError (not silent passthrough); |
| 31 | empty-body signature still sha256= prefixed |
| 32 | 7. Performance — 100 encrypt+decrypt cycles under 1s; 10 000 is_fernet_token |
| 33 | checks under 5ms; 1 000 _sign_payload under 200ms |
| 34 | """ |
| 35 | from __future__ import annotations |
| 36 | |
import contextlib
import hashlib
import hmac
import inspect
import time
from collections.abc import Iterator
from unittest.mock import patch
| 42 | |
| 43 | import pytest |
| 44 | from cryptography.fernet import Fernet |
| 45 | |
| 46 | # ── module under test ──────────────────────────────────────────────────────── |
| 47 | from musehub.services import musehub_webhook_crypto as crypto |
| 48 | from musehub.services.musehub_webhook_crypto import ( |
| 49 | _FERNET_TOKEN_PREFIX, |
| 50 | decrypt_secret, |
| 51 | encrypt_secret, |
| 52 | is_fernet_token, |
| 53 | ) |
| 54 | from musehub.services.musehub_webhook_dispatcher import _sign_payload |
| 55 | |
| 56 | |
| 57 | # ───────────────────────────────────────────────────────────────────────────── |
| 58 | # Shared helpers |
| 59 | # ───────────────────────────────────────────────────────────────────────────── |
| 60 | |
| 61 | |
@contextlib.contextmanager
def _with_key(key: str) -> Iterator[None]:
    """Temporarily install a Fernet built from *key* as the crypto module's cached instance.

    On entry, ``crypto._fernet`` is replaced by ``Fernet(key.encode())`` and
    ``crypto._fernet_initialised`` is set to ``True``.  On exit the previous
    values of both attributes are restored, even if the ``with`` body raises.

    Args:
        key: Fernet key as text (e.g. the value returned by ``_fresh_key()``).

    Yields:
        None. Intended use is ``with _with_key(key): ...``.
    """
    # Snapshot module state first so the ``finally`` clause can restore it.
    saved_fernet = crypto._fernet
    saved_initialised = crypto._fernet_initialised
    crypto._fernet = Fernet(key.encode())
    crypto._fernet_initialised = True
    try:
        yield
    finally:
        crypto._fernet = saved_fernet
        crypto._fernet_initialised = saved_initialised
| 79 | |
| 80 | |
def _fresh_key() -> str:
    """Return a newly generated Fernet key decoded from bytes to ``str``."""
    raw_key = Fernet.generate_key()
    return raw_key.decode()
| 83 | |
| 84 | |
| 85 | # ───────────────────────────────────────────────────────────────────────────── |
| 86 | # LAYER 1 — UNIT |
| 87 | # ───────────────────────────────────────────────────────────────────────────── |
| 88 | |
| 89 | |
class TestFernetTokenPrefix:
    """Checks on the ``_FERNET_TOKEN_PREFIX`` constant and ``is_fernet_token``."""

    def test_prefix_value_is_gaaaaab(self) -> None:
        """The prefix constant is pinned to the literal ``gAAAAAB``."""
        pinned_prefix = "gAAAAAB"
        assert _FERNET_TOKEN_PREFIX == pinned_prefix

    def test_is_fernet_token_true_for_real_token(self) -> None:
        """A token produced by ``encrypt_secret`` under a key is recognised."""
        with _with_key(_fresh_key()):
            encrypted = encrypt_secret("test-secret")
        assert is_fernet_token(encrypted) is True

    def test_is_fernet_token_false_for_plaintext(self) -> None:
        """An ordinary secret string is not classified as a token."""
        verdict = is_fernet_token("my-plain-secret")
        assert verdict is False

    def test_is_fernet_token_false_for_empty_string(self) -> None:
        """The empty string is not classified as a token."""
        verdict = is_fernet_token("")
        assert verdict is False

    def test_is_fernet_token_false_for_partial_prefix(self) -> None:
        """A string one character shorter than the prefix is rejected."""
        almost_prefix = "gAAAAA"  # one char short
        assert is_fernet_token(almost_prefix) is False

    def test_is_fernet_token_false_for_sha256_prefix(self) -> None:
        """A signature-style ``sha256=`` string is not classified as a token."""
        verdict = is_fernet_token("sha256=abc123")
        assert verdict is False
| 114 | |
| 115 | |
class TestEncryptSecretUnit:
    """Unit-level behaviour of ``encrypt_secret``."""

    def test_returns_fernet_token_when_key_configured(self) -> None:
        """With a key installed the output is recognised by ``is_fernet_token``."""
        with _with_key(_fresh_key()):
            encrypted = encrypt_secret("webhook-secret-xyz")
        assert is_fernet_token(encrypted)

    def test_empty_string_always_passthrough(self) -> None:
        """An empty input comes back as an empty string even with a key installed."""
        with _with_key(_fresh_key()):
            outcome = encrypt_secret("")
            assert outcome == ""

    def test_no_key_returns_plaintext(self) -> None:
        """With the cached Fernet forced to ``None`` the input is returned unchanged."""
        # Snapshot the module-level cache so it can be restored afterwards.
        saved_fernet = crypto._fernet
        saved_initialised = crypto._fernet_initialised
        crypto._fernet = None
        crypto._fernet_initialised = True
        try:
            outcome = encrypt_secret("plain")
            assert outcome == "plain"
        finally:
            crypto._fernet = saved_fernet
            crypto._fernet_initialised = saved_initialised

    def test_different_calls_produce_different_ciphertexts(self) -> None:
        """Encrypting the same plaintext twice yields two different tokens."""
        with _with_key(_fresh_key()):
            first = encrypt_secret("same-secret")
            second = encrypt_secret("same-secret")
        assert first != second

    def test_result_is_string_not_bytes(self) -> None:
        """The return value is a ``str``."""
        with _with_key(_fresh_key()):
            encrypted = encrypt_secret("str-check")
        assert isinstance(encrypted, str)
| 154 | |
| 155 | |
class TestDecryptSecretUnit:
    """Unit-level behaviour of ``decrypt_secret``."""

    def test_roundtrip_recovers_plaintext(self) -> None:
        """Encrypting then decrypting under one key returns the original text."""
        with _with_key(_fresh_key()):
            token = encrypt_secret("my-webhook-secret")
            recovered = decrypt_secret(token)
        assert recovered == "my-webhook-secret"

    def test_empty_string_passthrough(self) -> None:
        """An empty input comes back as an empty string even with a key installed."""
        with _with_key(_fresh_key()):
            outcome = decrypt_secret("")
            assert outcome == ""

    def test_legacy_plaintext_returned_as_is(self) -> None:
        """A value without the gAAAAAB prefix is returned unchanged."""
        legacy_value = "my-old-secret"  # no gAAAAAB prefix → legacy fallback
        with _with_key(_fresh_key()):
            outcome = decrypt_secret(legacy_value)
        assert outcome == "my-old-secret"

    def test_corrupt_fernet_token_raises_value_error(self) -> None:
        """A prefixed value that cannot be decrypted raises ``ValueError``."""
        bogus_token = _FERNET_TOKEN_PREFIX + "corrupted-garbage=="
        with _with_key(_fresh_key()):
            with pytest.raises(ValueError):
                decrypt_secret(bogus_token)

    def test_no_key_returns_ciphertext_unchanged(self) -> None:
        """With the cached Fernet forced to ``None`` the input is returned unchanged."""
        # Snapshot the module-level cache so it can be restored afterwards.
        saved_fernet = crypto._fernet
        saved_initialised = crypto._fernet_initialised
        crypto._fernet = None
        crypto._fernet_initialised = True
        try:
            outcome = decrypt_secret("any-value")
            assert outcome == "any-value"
        finally:
            crypto._fernet = saved_fernet
            crypto._fernet_initialised = saved_initialised

    def test_result_is_string(self) -> None:
        """The return value is a ``str``."""
        with _with_key(_fresh_key()):
            token = encrypt_secret("type-check")
            recovered = decrypt_secret(token)
        assert isinstance(recovered, str)
| 203 | |
| 204 | |
class TestKeyRotation:
    """Key rotation: tokens are only readable under the key that produced them."""

    def test_token_from_old_key_fails_under_new_key(self) -> None:
        """A token made under the first key raises ValueError under the second."""
        previous_key, current_key = _fresh_key(), _fresh_key()

        with _with_key(previous_key):
            stale_token = encrypt_secret("secret-before-rotation")

        # Decrypting under the rotated key must fail loudly, not succeed silently.
        with _with_key(current_key):
            with pytest.raises(ValueError, match="Failed to decrypt webhook secret"):
                decrypt_secret(stale_token)

    def test_token_from_new_key_decrypts_with_new_key(self) -> None:
        """A secret migrated to the second key round-trips under that key."""
        previous_key, current_key = _fresh_key(), _fresh_key()

        with _with_key(previous_key):
            stale_token = encrypt_secret("rotation-test")

        # Simulated migration step 1: read the value back under the old key.
        with _with_key(previous_key):
            migrated_plaintext = decrypt_secret(stale_token)

        # Step 2: re-encrypt under the new key and read it back.
        with _with_key(current_key):
            fresh_token = encrypt_secret(migrated_plaintext)
            roundtripped = decrypt_secret(fresh_token)

        assert roundtripped == "rotation-test"

    def test_key1_token_not_decodable_as_key2_token(self) -> None:
        """Tokens from two different keys differ and cannot be cross-decrypted."""
        key_a, key_b = _fresh_key(), _fresh_key()

        with _with_key(key_a):
            token_a = encrypt_secret("value")
        with _with_key(key_b):
            token_b = encrypt_secret("value")

        # The two tokens are not the same string.
        assert token_a != token_b

        # Each key rejects the token produced under the other key.
        with _with_key(key_a):
            with pytest.raises(ValueError):
                decrypt_secret(token_b)
        with _with_key(key_b):
            with pytest.raises(ValueError):
                decrypt_secret(token_a)
| 259 | |
| 260 | |
class TestSignPayloadUnit:
    """Unit tests for the format and value of ``_sign_payload`` output."""

    def test_output_has_sha256_prefix(self) -> None:
        """The signature starts with ``sha256=``."""
        sig = _sign_payload("secret", b"body")
        assert sig.startswith("sha256=")

    def test_output_length_is_71_chars(self) -> None:
        """sha256= (7) + 64 hex chars = 71 total."""
        sig = _sign_payload("secret", b"body")
        assert len(sig) == 71

    def test_deterministic(self) -> None:
        """Identical inputs produce identical signatures."""
        assert _sign_payload("s", b"b") == _sign_payload("s", b"b")

    def test_hex_part_is_lowercase(self) -> None:
        """The hex digest after the prefix contains no uppercase characters."""
        sig = _sign_payload("sec", b"data")
        hex_part = sig[len("sha256="):]
        assert hex_part == hex_part.lower()

    def test_matches_manual_hmac_sha256(self) -> None:
        """The signature equals an HMAC-SHA256 computed directly with ``hmac``."""
        secret = "test-webhook-secret"
        body = b'{"event": "push"}'
        digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
        assert _sign_payload(secret, body) == f"sha256={digest}"

    def test_unicode_secret_encoded_correctly(self) -> None:
        """A secret containing non-ASCII characters signs without raising."""
        sig = _sign_payload("sécret", b"body")
        assert sig.startswith("sha256=")

    def test_empty_body_produces_signature(self) -> None:
        """An empty body still yields a full-length prefixed signature."""
        sig = _sign_payload("secret", b"")
        assert sig.startswith("sha256=")
        assert len(sig) == 71

    def test_large_body_still_correct(self) -> None:
        """A 100 000-byte body signs to the same value as a direct HMAC."""
        body = b"x" * 100_000
        sig = _sign_payload("sec", body)
        # Compute the digest outside the f-string: reusing the enclosing quote
        # character inside a replacement field is a SyntaxError before
        # Python 3.12, which would stop this whole module from being collected.
        digest = hmac.new("sec".encode(), body, hashlib.sha256).hexdigest()
        assert sig == f"sha256={digest}"
| 303 | |
| 304 | |
class TestSingletonCaching:
    """Behaviour of ``_get_fernet`` while a key is installed via ``_with_key``."""

    def test_singleton_returns_same_instance(self) -> None:
        """Two consecutive calls hand back the very same object."""
        from musehub.services.musehub_webhook_crypto import _get_fernet

        with _with_key(_fresh_key()):
            first_call = _get_fernet()
            second_call = _get_fernet()
        assert first_call is second_call

    def test_singleton_returns_fernet_type(self) -> None:
        """The returned object is a ``Fernet`` instance."""
        from musehub.services.musehub_webhook_crypto import _get_fernet

        with _with_key(_fresh_key()):
            cached = _get_fernet()
        assert isinstance(cached, Fernet)
| 324 | |
| 325 | |
| 326 | # ───────────────────────────────────────────────────────────────────────────── |
| 327 | # LAYER 2 — INTEGRATION |
| 328 | # ───────────────────────────────────────────────────────────────────────────── |
| 329 | |
| 330 | |
class TestWebhookCryptoIntegration:
    """Integration: the crypto helpers combined the way callers use them."""

    def test_dispatcher_uses_decrypt_secret_before_sign(self) -> None:
        """The dispatcher module's source mentions both helper names."""
        from musehub.services import musehub_webhook_dispatcher

        dispatcher_source = inspect.getsource(musehub_webhook_dispatcher)
        for required_name in ("decrypt_secret", "_sign_payload"):
            assert required_name in dispatcher_source

    def test_encrypt_then_sign_roundtrip(self) -> None:
        """A secret that went through encrypt/decrypt signs like the original."""
        original_secret = "super-webhook-secret"
        payload_bytes = b'{"repo": "muse", "event": "push"}'

        with _with_key(_fresh_key()):
            roundtripped = decrypt_secret(encrypt_secret(original_secret))

        reference_digest = hmac.new(
            original_secret.encode(), payload_bytes, hashlib.sha256
        ).hexdigest()
        assert _sign_payload(roundtripped, payload_bytes) == f"sha256={reference_digest}"

    def test_key_rotation_invalidates_stored_webhook_token(self) -> None:
        """A token stored under one key raises ValueError under another key."""
        original_key, rotated_key = _fresh_key(), _fresh_key()

        with _with_key(original_key):
            stored_token = encrypt_secret("webhook-api-secret-xyz")

        with _with_key(rotated_key):
            with pytest.raises(ValueError):
                decrypt_secret(stored_token)

    def test_ci_secret_roundtrip_via_crypto_module(self) -> None:
        """A CI-style secret is stored as a token and decrypts to the original."""
        ci_value = "GITHUB_TOKEN=ghp_abc123xyz"

        with _with_key(_fresh_key()):
            stored = encrypt_secret(ci_value)
            assert stored != ci_value
            assert is_fernet_token(stored)
            restored = decrypt_secret(stored)

        assert restored == ci_value
| 380 | |
| 381 | |
| 382 | # ───────────────────────────────────────────────────────────────────────────── |
| 383 | # LAYER 3 — E2E |
| 384 | # ───────────────────────────────────────────────────────────────────────────── |
| 385 | |
| 386 | |
class TestWebhookCryptoE2E:
    """Delivery-path checks: ``_sign_payload`` output versus a manual HMAC-SHA256.

    These tests call ``_sign_payload`` directly; no HTTP client or request is
    involved.
    """

    def test_webhook_delivery_signature_matches_hmac(self) -> None:
        """A JSON-encoded payload signs to the manually computed HMAC."""
        import json

        secret = "webhook-e2e-secret"
        payload = {"repo_id": "abc123", "event": "push"}
        body = json.dumps(payload).encode()

        sig = _sign_payload(secret, body)

        assert sig.startswith("sha256=")
        digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
        assert sig == f"sha256={digest}"

    def test_webhook_no_secret_sign_empty_body(self) -> None:
        """An empty body still produces a valid, prefixed sha256 signature."""
        secret = "any-secret"
        sig = _sign_payload(secret, b"")
        assert sig.startswith("sha256=")
        # Compute the digest outside the f-string: a nested ``b""`` inside a
        # double-quoted f-string is a SyntaxError before Python 3.12.
        digest = hmac.new(secret.encode(), b"", hashlib.sha256).hexdigest()
        assert sig == f"sha256={digest}"

    def test_sign_payload_matches_sha256_hmac_github_convention(self) -> None:
        """Output has the ``sha256=<hex HMAC-SHA256>`` shape for a sample input."""
        secret = "It's-a-Secret-to-Everybody"
        body = b"Hello, World!"
        sig = _sign_payload(secret, body)
        digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
        assert sig == f"sha256={digest}"
| 419 | |
| 420 | |
| 421 | # ───────────────────────────────────────────────────────────────────────────── |
| 422 | # LAYER 4 — STRESS |
| 423 | # ───────────────────────────────────────────────────────────────────────────── |
| 424 | |
| 425 | |
class TestWebhookCryptoStress:
    """Stress: many repeated encrypt/decrypt, classification and signing calls."""

    def test_1000_encrypt_decrypt_cycles(self) -> None:
        """1 000 encrypt→decrypt roundtrips each return the original text."""
        message = "stress-webhook-secret"
        with _with_key(_fresh_key()):
            for _ in range(1000):
                token = encrypt_secret(message)
                assert decrypt_secret(token) == message

    def test_10000_is_fernet_token_checks(self) -> None:
        """5 000 iterations of two ``is_fernet_token`` calls (10 000 total)."""
        with _with_key(_fresh_key()):
            real_token = encrypt_secret("stress-check")
        remaining = 5000
        while remaining:
            assert is_fernet_token(real_token) is True
            assert is_fernet_token("plaintext") is False
            remaining -= 1

    def test_100_sign_payload_large_body(self) -> None:
        """100 signatures over a 10 240-byte body all match the manual HMAC."""
        signing_secret = "perf-secret"
        payload_bytes = b"x" * 10_240
        reference_digest = hmac.new(
            signing_secret.encode(), payload_bytes, hashlib.sha256
        ).hexdigest()
        reference = f"sha256={reference_digest}"
        for _ in range(100):
            assert _sign_payload(signing_secret, payload_bytes) == reference

    def test_1000_different_plaintexts_all_decrypt(self) -> None:
        """1 000 distinct secrets are all encrypted first, then all decrypted."""
        originals = [f"secret-{i:04d}" for i in range(1000)]
        with _with_key(_fresh_key()):
            tokens = []
            for original in originals:
                tokens.append(encrypt_secret(original))
            decrypted = []
            for token in tokens:
                decrypted.append(decrypt_secret(token))
        assert decrypted == originals
| 463 | |
| 464 | |
| 465 | # ───────────────────────────────────────────────────────────────────────────── |
| 466 | # LAYER 5 — DATA INTEGRITY |
| 467 | # ───────────────────────────────────────────────────────────────────────────── |
| 468 | |
| 469 | |
class TestWebhookCryptoDataIntegrity:
    """Data integrity: ciphertext properties and correctness guarantees."""

    def test_different_plaintexts_produce_different_ciphertexts(self) -> None:
        """Two different secrets never encrypt to the same token."""
        key = _fresh_key()
        with _with_key(key):
            c1 = encrypt_secret("secret-A")
            c2 = encrypt_secret("secret-B")
        assert c1 != c2

    def test_same_plaintext_multiple_encryptions_all_different(self) -> None:
        """Ten encryptions of the same value yield ten distinct tokens."""
        key = _fresh_key()
        with _with_key(key):
            tokens = [encrypt_secret("repeat") for _ in range(10)]
        assert len(set(tokens)) == 10  # all distinct

    def test_truncated_fernet_token_raises_value_error(self) -> None:
        """Truncating a valid token corrupts it → decrypt must raise ValueError.

        The previous version wrapped the call in ``try/except Exception: pass``
        and therefore could never fail; this version asserts the behaviour the
        test name promises.
        """
        key = _fresh_key()
        with _with_key(key):
            full_token = encrypt_secret("integrity-test")
            truncated = full_token[:30]  # clearly corrupt
            # The 7-character prefix survives a 30-character cut, so the value
            # is still treated as a token rather than legacy plaintext.
            assert is_fernet_token(truncated)
            with pytest.raises(ValueError):
                decrypt_secret(truncated)

    def test_fernet_token_has_gaaab_prefix(self) -> None:
        """All tokens produced by encrypt_secret start with the magic prefix."""
        key = _fresh_key()
        for i in range(10):
            with _with_key(key):
                token = encrypt_secret(f"secret-{i}")
            assert token.startswith("gAAAAAB"), f"Token {i} has unexpected prefix"

    def test_decrypt_produces_exact_original_unicode(self) -> None:
        """Unicode secrets round-trip exactly."""
        key = _fresh_key()
        original = "sécret-clé-à-résoudre"
        with _with_key(key):
            ct = encrypt_secret(original)
            recovered = decrypt_secret(ct)
        assert recovered == original

    def test_long_secret_roundtrip(self) -> None:
        """A 1 024-character secret round-trips correctly."""
        key = _fresh_key()
        long_secret = "x" * 1024
        with _with_key(key):
            ct = encrypt_secret(long_secret)
            recovered = decrypt_secret(ct)
        assert recovered == long_secret

    def test_sign_payload_output_is_exact_length(self) -> None:
        """sha256= (7 chars) + SHA-256 hex (64 chars) = exactly 71 chars."""
        for body in (b"", b"x", b"x" * 1024):
            sig = _sign_payload("secret", body)
            assert len(sig) == 71, f"sig len={len(sig)} for body of {len(body)} bytes"
| 537 | |
| 538 | |
| 539 | # ───────────────────────────────────────────────────────────────────────────── |
| 540 | # LAYER 6 — SECURITY |
| 541 | # ───────────────────────────────────────────────────────────────────────────── |
| 542 | |
| 543 | |
class TestWebhookCryptoSecurity:
    """Security: constant-time comparison, key isolation, no leakage."""

    def test_fingerprints_equal_uses_compare_digest(self) -> None:
        """The source of ``fingerprints_equal`` mentions ``compare_digest``."""
        from musehub.crypto.keys import fingerprints_equal

        src = inspect.getsource(fingerprints_equal)
        assert "compare_digest" in src

    def test_value_error_message_does_not_contain_key(self) -> None:
        """ValueError on bad decrypt must not leak the Fernet key.

        ``pytest.raises`` is used so the test fails if no exception is raised
        at all; the previous ``try/except`` form passed vacuously in that case.
        """
        key = _fresh_key()
        corrupt = _FERNET_TOKEN_PREFIX + "bad-data-that-is-not-valid-base64-x"
        with _with_key(key):
            with pytest.raises(ValueError) as excinfo:
                decrypt_secret(corrupt)
        assert key not in str(excinfo.value)

    def test_wrong_key_raises_value_error_not_returns_garbage(self) -> None:
        """Decrypting with the wrong key raises ValueError — no silent corruption."""
        key1 = _fresh_key()
        key2 = _fresh_key()

        with _with_key(key1):
            token = encrypt_secret("sensitive-secret")

        with _with_key(key2):
            with pytest.raises(ValueError):
                decrypt_secret(token)

    def test_plaintext_not_stored_as_plaintext_when_key_set(self) -> None:
        """When a key is configured, encrypt_secret must not return the input unchanged."""
        key = _fresh_key()
        plaintext = "must-not-store-as-is"
        with _with_key(key):
            ciphertext = encrypt_secret(plaintext)
        assert ciphertext != plaintext

    def test_ciphertext_does_not_contain_plaintext(self) -> None:
        """The raw ciphertext string must not contain the original secret."""
        key = _fresh_key()
        secret = "super-sensitive-webhook-token"
        with _with_key(key):
            ct = encrypt_secret(secret)
        assert secret not in ct

    def test_compare_digest_not_naive_equals(self) -> None:
        """No ``return`` in ``fingerprints_equal`` is a bare ``==`` comparison."""
        import ast

        from musehub.crypto.keys import fingerprints_equal

        tree = ast.parse(inspect.getsource(fingerprints_equal))
        for node in ast.walk(tree):
            # Only ``return <comparison>`` statements are of interest.
            if not isinstance(node, ast.Return):
                continue
            if not isinstance(node.value, ast.Compare):
                continue
            for op in node.value.ops:
                assert not isinstance(op, ast.Eq), \
                    "fingerprints_equal uses == instead of compare_digest"

    def test_sign_payload_uses_hmac_module(self) -> None:
        """The source of ``_sign_payload`` mentions ``hmac``."""
        from musehub.services import musehub_webhook_dispatcher

        src = inspect.getsource(musehub_webhook_dispatcher._sign_payload)
        assert "hmac" in src

    def test_empty_secret_produces_valid_signature(self) -> None:
        """Even an empty-string secret produces a well-formed signature."""
        sig = _sign_payload("", b"body")
        assert sig.startswith("sha256=")
        assert len(sig) == 71
| 620 | |
| 621 | |
| 622 | # ───────────────────────────────────────────────────────────────────────────── |
| 623 | # LAYER 7 — PERFORMANCE |
| 624 | # ───────────────────────────────────────────────────────────────────────────── |
| 625 | |
| 626 | |
class TestWebhookCryptoPerformance:
    """Performance: wall-clock budgets for the crypto helpers."""

    def test_100_encrypt_decrypt_cycles_under_1s(self) -> None:
        """100 encrypt→decrypt roundtrips, key installation included, take < 1 s."""
        fernet_key = _fresh_key()
        message = "perf-webhook-secret"
        began = time.perf_counter()
        with _with_key(fernet_key):
            for _ in range(100):
                token = encrypt_secret(message)
                decrypt_secret(token)
        elapsed = time.perf_counter() - began
        assert elapsed < 1.0, f"100 encrypt+decrypt took {elapsed*1000:.0f}ms (limit 1000ms)"

    def test_10000_is_fernet_token_under_5ms(self) -> None:
        """5 000 iterations of two ``is_fernet_token`` calls take < 5 ms."""
        fernet_key = _fresh_key()
        with _with_key(fernet_key):
            real_token = encrypt_secret("perf-check")
            # Token creation is deliberately outside the timed window.
            began = time.perf_counter()
            for _ in range(5000):
                is_fernet_token(real_token)
                is_fernet_token("plaintext")
            elapsed = time.perf_counter() - began
        assert elapsed < 0.005, f"10K is_fernet_token calls took {elapsed*1000:.1f}ms (limit 5ms)"

    def test_1000_sign_payload_under_200ms(self) -> None:
        """1 000 signatures over a small JSON body take < 200 ms."""
        signing_secret = "perf-sign-secret"
        payload_bytes = b'{"event": "push", "repo": "muse"}'
        began = time.perf_counter()
        for _ in range(1000):
            _sign_payload(signing_secret, payload_bytes)
        elapsed = time.perf_counter() - began
        assert elapsed < 0.200, f"1K _sign_payload took {elapsed*1000:.1f}ms (limit 200ms)"

    def test_encrypt_100_different_secrets_under_1s(self) -> None:
        """Encrypting 100 distinct secrets, key installation included, takes < 1 s."""
        fernet_key = _fresh_key()
        batch = [f"webhook-secret-{i}" for i in range(100)]
        began = time.perf_counter()
        with _with_key(fernet_key):
            for item in batch:
                encrypt_secret(item)
        elapsed = time.perf_counter() - began
        assert elapsed < 1.0, f"100 encryptions took {elapsed*1000:.0f}ms (limit 1000ms)"

    def test_sign_payload_throughput_large_body(self) -> None:
        """Signing a 64 KB payload 100 times must finish in under 500ms."""
        signing_secret = "large-body-secret"
        payload_bytes = b"x" * 65_536
        began = time.perf_counter()
        for _ in range(100):
            _sign_payload(signing_secret, payload_bytes)
        elapsed = time.perf_counter() - began
        assert elapsed < 0.500, f"100 × 64KB sign_payload took {elapsed*1000:.0f}ms (limit 500ms)"
File History
2 commits
sha256:5601f81903b6c70ddd11bd88a5a257ee6dfd38aa3b85b19746c100c030657f1e
chore: update smoke_muse.sh comment to reference rc9
Sonnet 4.6
minor
⚠
22 days ago
sha256:39e9c4e6f2134da0732e6983268a218178973936f8d7ca03c91f2b5ad42133c8
fix: use read_object_bytes in blob viewer; add zstd magic d…
Sonnet 4.6
patch
22 days ago