gabriel / musehub public
test_attestations_phase1.py python
1,002 lines 38.2 KB
Raw
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor ⚠ breaking 20 days ago
1 """TDD — attestation system Phase 1: claim-type registry, scope, expiry.
2
3 Seven test tiers:
4 T1 Unit — pure functions, no I/O
5 T2 Integration — real DB via db_session fixture
6 T3 End-to-end — FastAPI TestClient round-trips
7 T4 Stress — concurrency / idempotency under load
8 T5 Data integrity — DB invariants, no silent mutation
9 T6 Performance — query latency with index coverage
10 T7 Security — sig replay, cross-protocol, impersonation
11
12 Run a single tier:
13 pytest tests/test_attestations_phase1.py -m tier1 -q --tb=short
14 """
15 from __future__ import annotations
16
17 import asyncio
18 import json
19 import time
20 from datetime import datetime, timedelta, timezone
21 from unittest.mock import AsyncMock, MagicMock
22
23 import pytest
24 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
25 from muse.core.types import encode_pubkey, encode_sig
26 from httpx import AsyncClient
27 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
28
29 from musehub.core.genesis import compute_attestation_id
30 from musehub.models.musehub import AttestationRequest, AttestationResponse
31 from tests.factories import create_profile, create_repo
32
33
34 # ---------------------------------------------------------------------------
35 # Shared helpers
36 # ---------------------------------------------------------------------------
37
38 def _utc() -> datetime:
39 return datetime.now(tz=timezone.utc)
40
41
def _make_keypair() -> tuple[Ed25519PrivateKey, str]:
    """Generate a fresh Ed25519 keypair for signing test attestations.

    Returns:
        The private key and the public key as produced by
        ``encode_pubkey("ed25519", <raw public bytes>)``.
    """
    private_key = Ed25519PrivateKey.generate()
    raw_public = private_key.public_key().public_bytes_raw()
    return private_key, encode_pubkey("ed25519", raw_public)
46
47
def _sign(
    priv: Ed25519PrivateKey,
    attester: str,
    subject: str,
    claim: str,
    ts: str,
    scope: str = "identity",
    scope_ref: str | None = None,
) -> str:
    """Sign the newline-joined attestation message and return the encoded signature.

    The message is ``ATTEST``, attester, subject, claim and timestamp, one per
    line.  ``scope_ref`` is appended as a final line only when the scope is not
    ``"identity"`` and a non-empty ``scope_ref`` was given.
    """
    fields = ["ATTEST", attester, subject, claim, ts]
    if scope_ref and scope != "identity":
        fields = [*fields, scope_ref]
    payload = "\n".join(fields).encode()
    raw_signature = priv.sign(payload)
    return encode_sig("ed25519", raw_signature)
62
63
def _req(
    attester: str,
    subject: str,
    claim: str,
    priv: Ed25519PrivateKey,
    pub: str,
    *,
    scope: str = "identity",
    scope_ref: str | None = None,
    repo_id: str | None = None,
    commit_id: str | None = None,
    expires_at: datetime | None = None,
    issued_at: datetime | None = None,
) -> AttestationRequest:
    """Build a signed ``AttestationRequest`` for use in tests.

    The timestamp that gets signed is ``issued_at`` when supplied, otherwise
    the current UTC time.  The request's ``issued_at`` field is the supplied
    value, or the signed ISO string parsed back into a ``datetime`` so both
    refer to the same instant.
    """
    signed_at = issued_at if issued_at else _utc()
    ts = signed_at.isoformat()
    signature = _sign(
        priv, attester, subject, claim, ts, scope=scope, scope_ref=scope_ref
    )
    request_issued_at = issued_at if issued_at else datetime.fromisoformat(ts)
    return AttestationRequest(
        attester=attester,
        subject=subject,
        claim=claim,
        signature=signature,
        attester_public_key=pub,
        issued_at=request_issued_at,
        scope=scope,
        scope_ref=scope_ref,
        repo_id=repo_id,
        commit_id=commit_id,
        expires_at=expires_at,
    )
93
94
95 # ---------------------------------------------------------------------------
96 # T1 — Unit: claim-type registry
97 # ---------------------------------------------------------------------------
98
@pytest.mark.tier1
class TestClaimTypeRegistry:
    """Registry lookups exercised directly — no DB session or client fixture."""

    def test_known_type_resolves(self) -> None:
        """A seeded key resolves to an entry carrying its key and category."""
        from musehub.services.musehub_attestations import get_claim_type

        entry = get_claim_type("human")
        assert entry is not None
        assert entry["type_key"] == "human"
        assert entry["category"] == "identity"

    def test_unknown_type_raises(self) -> None:
        """Looking up an unregistered key raises ``ValueError``."""
        from musehub.services.musehub_attestations import get_claim_type

        with pytest.raises(ValueError, match="unknown claim type"):
            get_claim_type("nonsense-claim-xyz")

    def test_deprecated_type_raises_on_issue(self) -> None:
        """Deprecated types must be rejected at write time."""
        from musehub.services.musehub_attestations import validate_claim_for_issue

        with pytest.raises(ValueError, match="deprecated"):
            validate_claim_for_issue("__deprecated_test__")

    def test_all_seed_types_present(self) -> None:
        """Every seed claim type is returned by ``list_claim_types``."""
        from musehub.services.musehub_attestations import list_claim_types

        seed_keys = (
            "human", "org", "agent", "spawned-by", "delegate", "trusted",
            "collab", "co-author", "contractor",
            "code:reviewed", "code:approved", "deploy:approved",
            "stems:verified", "mix:approved", "midi:generated", "master:approved",
            "skill:verified",
        )
        registered = {entry["type_key"] for entry in list_claim_types()}
        for seed in seed_keys:
            assert seed in registered, f"seed type '{seed}' missing from registry"

    def test_valid_scopes_per_type(self) -> None:
        """Each claim type advertises the scopes it may be issued under."""
        from musehub.services.musehub_attestations import get_claim_type

        human_scopes = get_claim_type("human")["valid_scopes"]
        assert human_scopes == ["identity"]

        deploy_scopes = get_claim_type("deploy:approved")["valid_scopes"]
        assert "commit" in deploy_scopes
        assert "identity" not in deploy_scopes

        stems_scopes = get_claim_type("stems:verified")["valid_scopes"]
        assert "identity" in stems_scopes
        assert "commit" in stems_scopes
150
151
152 # ---------------------------------------------------------------------------
153 # T1 — Unit: scope_ref parsing
154 # ---------------------------------------------------------------------------
155
@pytest.mark.tier1
class TestScopeRefParsing:
    """``parse_scope_ref`` splits each scope_ref format into handle / repo / commit."""

    def test_identity_scope_ref_is_handle(self) -> None:
        """An identity scope_ref is just a handle; repo and commit stay ``None``."""
        from musehub.services.musehub_attestations import parse_scope_ref

        parsed = parse_scope_ref("identity", "gabriel")
        assert parsed["handle"] == "gabriel"
        assert parsed["repo_slug"] is None
        assert parsed["commit_id"] is None

    def test_repo_scope_ref_parsed(self) -> None:
        """A repo scope_ref ``handle/slug`` yields both parts and no commit."""
        from musehub.services.musehub_attestations import parse_scope_ref

        parsed = parse_scope_ref("repo", "gabriel/musehub")
        assert parsed["handle"] == "gabriel"
        assert parsed["repo_slug"] == "musehub"
        assert parsed["commit_id"] is None

    def test_commit_scope_ref_parsed(self) -> None:
        """A commit scope_ref ``handle/slug@commit`` yields all three parts."""
        from musehub.services.musehub_attestations import parse_scope_ref

        commit = "sha256:" + "a" * 64
        parsed = parse_scope_ref("commit", f"gabriel/musehub@{commit}")
        assert parsed["handle"] == "gabriel"
        assert parsed["repo_slug"] == "musehub"
        assert parsed["commit_id"] == commit

    def test_commit_scope_ref_missing_at_raises(self) -> None:
        """A commit scope_ref without ``@`` is rejected with ``ValueError``."""
        from musehub.services.musehub_attestations import parse_scope_ref

        with pytest.raises(ValueError, match="commit scope_ref must contain '@'"):
            parse_scope_ref("commit", "gabriel/musehub")

    def test_invalid_scope_raises(self) -> None:
        """An unrecognised scope name is rejected with ``ValueError``."""
        from musehub.services.musehub_attestations import parse_scope_ref

        with pytest.raises(ValueError, match="invalid scope"):
            parse_scope_ref("galaxy", "gabriel")
196
197
198 # ---------------------------------------------------------------------------
199 # T1 — Unit: scope enforcement against claim type valid_scopes
200 # ---------------------------------------------------------------------------
201
@pytest.mark.tier1
class TestScopeClaimEnforcement:
    """``validate_scope_for_claim`` accepts or rejects claim-type / scope pairs."""

    def test_human_on_commit_scope_raises(self) -> None:
        """``human`` paired with the ``commit`` scope raises ``ValueError``."""
        from musehub.services.musehub_attestations import validate_scope_for_claim

        expected = "'human' is not valid for scope 'commit'"
        with pytest.raises(ValueError, match=expected):
            validate_scope_for_claim("human", "commit")

    def test_deploy_approved_on_identity_scope_raises(self) -> None:
        """``deploy:approved`` paired with the ``identity`` scope raises ``ValueError``."""
        from musehub.services.musehub_attestations import validate_scope_for_claim

        expected = "'deploy:approved' is not valid for scope 'identity'"
        with pytest.raises(ValueError, match=expected):
            validate_scope_for_claim("deploy:approved", "identity")

    def test_collab_on_commit_scope_passes(self) -> None:
        """``collab`` on ``commit`` is accepted; the call must simply not raise."""
        from musehub.services.musehub_attestations import validate_scope_for_claim

        validate_scope_for_claim("collab", "commit")

    def test_stems_verified_on_identity_passes(self) -> None:
        """``stems:verified`` on ``identity`` is accepted; the call must not raise."""
        from musehub.services.musehub_attestations import validate_scope_for_claim

        validate_scope_for_claim("stems:verified", "identity")
226
227
228 # ---------------------------------------------------------------------------
229 # T1 — Unit: canonical message includes scope_ref
230 # ---------------------------------------------------------------------------
231
@pytest.mark.tier1
class TestCanonicalMessage:
    """Byte layout of the message that attestation signatures cover."""

    def test_identity_scope_message_unchanged(self) -> None:
        """Legacy identity-scope message must stay compatible with existing sigs."""
        from musehub.services.musehub_attestations import build_canonical_message

        expected = b"ATTEST\ngabriel\naria\n{\"type\":\"human\"}\n2026-05-01T00:00:00+00:00"
        actual = build_canonical_message(
            attester="gabriel",
            subject="aria",
            claim='{"type":"human"}',
            issued_at_iso="2026-05-01T00:00:00+00:00",
            scope="identity",
            scope_ref=None,
        )
        assert actual == expected

    def test_commit_scope_appends_scope_ref(self) -> None:
        """For commit scope the scope_ref bytes appear in the message."""
        from musehub.services.musehub_attestations import build_canonical_message

        commit = "sha256:" + "b" * 64
        ref = f"gabriel/musehub@{commit}"
        message = build_canonical_message(
            attester="gabriel",
            subject="gabriel/musehub",
            claim='{"type":"deploy:approved"}',
            issued_at_iso="2026-05-01T00:00:00+00:00",
            scope="commit",
            scope_ref=ref,
        )
        assert ref.encode() in message

    def test_deterministic(self) -> None:
        """Two calls with identical arguments produce identical bytes."""
        from musehub.services.musehub_attestations import build_canonical_message

        arguments = {
            "attester": "a",
            "subject": "b",
            "claim": '{"type":"collab"}',
            "issued_at_iso": "2026-01-01T00:00:00+00:00",
            "scope": "identity",
            "scope_ref": None,
        }
        first = build_canonical_message(**arguments)
        second = build_canonical_message(**arguments)
        assert first == second
272
273
274 # ---------------------------------------------------------------------------
275 # T1 — Unit: AttestationRequest Pydantic validation
276 # ---------------------------------------------------------------------------
277
@pytest.mark.tier1
class TestAttestationRequestModel:
    """Field defaults and validation of the ``AttestationRequest`` model."""

    def test_scope_default_is_identity(self) -> None:
        """Omitting the optional fields yields identity scope, no scope_ref, no expiry."""
        priv, pub = _make_keypair()
        ts = _utc()
        sig = _sign(priv, "a", "b", '{"type":"collab"}', ts.isoformat())
        req = AttestationRequest(
            attester="a", subject="b", claim='{"type":"collab"}',
            signature=sig, attester_public_key=pub, issued_at=ts,
        )
        assert req.scope == "identity"
        assert req.scope_ref is None
        assert req.expires_at is None

    def test_commit_scope_requires_scope_ref(self) -> None:
        """Constructing a commit-scoped request without a scope_ref must fail validation."""
        priv, pub = _make_keypair()
        ts = _utc()
        sig = _sign(priv, "a", "b", '{"type":"deploy:approved"}', ts.isoformat())
        # pydantic's ValidationError derives from ValueError; catching the
        # narrower type keeps unrelated failures (e.g. a TypeError from a bad
        # keyword argument) from making this test pass by accident.
        with pytest.raises(ValueError):
            AttestationRequest(
                attester="a", subject="b", claim='{"type":"deploy:approved"}',
                signature=sig, attester_public_key=pub, issued_at=ts,
                scope="commit", scope_ref=None,  # missing scope_ref for commit
            )

    def test_expires_at_accepted(self) -> None:
        """A supplied ``expires_at`` is stored on the request unchanged."""
        priv, pub = _make_keypair()
        ts = _utc()
        exp = ts + timedelta(days=90)
        sig = _sign(priv, "a", "b", '{"type":"collab"}', ts.isoformat())
        req = AttestationRequest(
            attester="a", subject="b", claim='{"type":"collab"}',
            signature=sig, attester_public_key=pub, issued_at=ts,
            expires_at=exp,
        )
        assert req.expires_at == exp
315
316
317 # ---------------------------------------------------------------------------
318 # T2 — Integration: issue with scope and expiry (real DB)
319 # ---------------------------------------------------------------------------
320
@pytest.mark.tier2
@pytest.mark.asyncio
async def test_i1_issue_identity_scope(db_session: AsyncSession) -> None:
    """Issue an identity-scoped attestation and find it again via the subject listing."""
    from musehub.services.musehub_attestations import get_attestations_for_subject, issue_attestation

    key, pubkey = _make_keypair()
    request = _req("gabriel", "aria", '{"type":"human"}', key, pubkey)
    issued = await issue_attestation(db_session, request)

    assert issued.attester == "gabriel"
    assert issued.subject == "aria"
    assert issued.scope == "identity"
    assert issued.scope_ref is None
    assert issued.expires_at is None
    assert issued.revoked_at is None

    listing = await get_attestations_for_subject(db_session, "aria")
    listed_ids = [item.attestation_id for item in listing.attestations]
    assert issued.attestation_id in listed_ids
340
341
@pytest.mark.tier2
@pytest.mark.asyncio
async def test_i2_issue_commit_scope(db_session: AsyncSession) -> None:
    """A commit-scoped attestation keeps its scope_ref and commit_id and is listed by commit."""
    from musehub.services.musehub_attestations import get_attestations_for_commit, issue_attestation

    commit = "sha256:" + "c" * 64
    ref = f"gabriel/musehub@{commit}"
    key, pubkey = _make_keypair()

    request = _req(
        "gabriel",
        "gabriel/musehub",
        '{"type":"deploy:approved"}',
        key,
        pubkey,
        scope="commit",
        scope_ref=ref,
        commit_id=commit,
    )
    issued = await issue_attestation(db_session, request)

    assert issued.scope == "commit"
    assert issued.scope_ref == ref
    assert issued.commit_id == commit

    listing = await get_attestations_for_commit(db_session, commit)
    listed_ids = [item.attestation_id for item in listing.attestations]
    assert issued.attestation_id in listed_ids
364
365
@pytest.mark.tier2
@pytest.mark.asyncio
async def test_i3_commit_scope_excluded_from_identity_query(db_session: AsyncSession) -> None:
    """Commit-scoped attestations must not appear in identity-scoped profile queries."""
    from musehub.services.musehub_attestations import get_attestations_for_subject, issue_attestation

    commit = "sha256:" + "d" * 64
    key, pubkey = _make_keypair()
    request = _req(
        "gabriel",
        "gabriel/musehub",
        '{"type":"deploy:approved"}',
        key,
        pubkey,
        scope="commit",
        scope_ref=f"gabriel/musehub@{commit}",
        commit_id=commit,
    )
    await issue_attestation(db_session, request)

    # The subject of a commit-scoped attestation is a repo slug rather than a
    # handle, so nothing but identity-scoped rows may come back here.
    listing = await get_attestations_for_subject(db_session, "gabriel/musehub")
    assert not any(item.scope != "identity" for item in listing.attestations)
383
384
@pytest.mark.tier2
@pytest.mark.asyncio
async def test_i4_expired_attestation_excluded_by_default(db_session: AsyncSession) -> None:
    """Attestations past expires_at are excluded from default queries."""
    from musehub.services.musehub_attestations import get_attestations_for_subject, issue_attestation

    key, pubkey = _make_keypair()
    already_expired = _utc() - timedelta(seconds=1)
    request = _req(
        "gabriel", "aria", '{"type":"contractor"}', key, pubkey,
        expires_at=already_expired,
    )
    issued = await issue_attestation(db_session, request)

    listing = await get_attestations_for_subject(db_session, "aria")
    listed_ids = [item.attestation_id for item in listing.attestations]
    assert issued.attestation_id not in listed_ids
398
399
@pytest.mark.tier2
@pytest.mark.asyncio
async def test_i5_expired_attestation_included_with_flag(db_session: AsyncSession) -> None:
    """With ``include_expired=True`` an already-expired attestation is listed."""
    from musehub.services.musehub_attestations import get_attestations_for_subject, issue_attestation

    key, pubkey = _make_keypair()
    already_expired = _utc() - timedelta(seconds=1)
    request = _req(
        "gabriel", "aria", '{"type":"contractor"}', key, pubkey,
        expires_at=already_expired,
    )
    issued = await issue_attestation(db_session, request)

    listing = await get_attestations_for_subject(db_session, "aria", include_expired=True)
    listed_ids = [item.attestation_id for item in listing.attestations]
    assert issued.attestation_id in listed_ids
412
413
@pytest.mark.tier2
@pytest.mark.asyncio
async def test_i6_unknown_claim_type_rejected(db_session: AsyncSession) -> None:
    """Registry enforcement: unknown claim type raises ValueError before DB write."""
    from musehub.services.musehub_attestations import issue_attestation

    key, pubkey = _make_keypair()
    unknown_request = _req("gabriel", "aria", '{"type":"galaxy-brain"}', key, pubkey)

    with pytest.raises(ValueError, match="unknown claim type"):
        await issue_attestation(db_session, unknown_request)
425
426
@pytest.mark.tier2
@pytest.mark.asyncio
async def test_i7_wrong_scope_for_claim_type_rejected(db_session: AsyncSession) -> None:
    """`human` claim type only valid for identity scope — commit scope raises."""
    from musehub.services.musehub_attestations import issue_attestation

    commit = "sha256:" + "e" * 64
    key, pubkey = _make_keypair()
    mismatched_request = _req(
        "gabriel",
        "gabriel/musehub",
        '{"type":"human"}',
        key,
        pubkey,
        scope="commit",
        scope_ref=f"gabriel/musehub@{commit}",
        commit_id=commit,
    )
    with pytest.raises(ValueError, match="not valid for scope"):
        await issue_attestation(db_session, mismatched_request)
441
442
@pytest.mark.tier2
@pytest.mark.asyncio
async def test_i8_idempotent_issue(db_session: AsyncSession) -> None:
    """Same canonical payload issued twice returns existing row, inserts only once."""
    from musehub.services.musehub_attestations import get_attestations_for_subject, issue_attestation

    key, pubkey = _make_keypair()
    fixed_time = _utc()
    request = _req("gabriel", "aria", '{"type":"trusted"}', key, pubkey, issued_at=fixed_time)

    first = await issue_attestation(db_session, request)
    second = await issue_attestation(db_session, request)
    assert first.attestation_id == second.attestation_id

    listing = await get_attestations_for_subject(db_session, "aria")
    occurrences = sum(
        1 for item in listing.attestations if item.attestation_id == first.attestation_id
    )
    assert occurrences == 1
460
461
@pytest.mark.tier2
@pytest.mark.asyncio
async def test_i9_repo_scope_stored_and_retrieved(db_session: AsyncSession) -> None:
    """A repo-scoped attestation is stored with scope ``repo`` and listed by repo."""
    from musehub.services.musehub_attestations import get_attestations_for_repo, issue_attestation

    key, pubkey = _make_keypair()
    request = _req(
        "gabriel",
        "gabriel/musehub",
        '{"type":"code:reviewed"}',
        key,
        pubkey,
        scope="repo",
        scope_ref="gabriel/musehub",
    )
    issued = await issue_attestation(db_session, request)
    assert issued.scope == "repo"

    listing = await get_attestations_for_repo(db_session, "gabriel/musehub")
    listed_ids = [item.attestation_id for item in listing.attestations]
    assert issued.attestation_id in listed_ids
477
478
479 # ---------------------------------------------------------------------------
480 # T2 — Integration: DB-backed claim type registry
481 # ---------------------------------------------------------------------------
482
@pytest.mark.tier2
@pytest.mark.asyncio
async def test_i10_registry_seed_types_in_db(db_session: AsyncSession) -> None:
    """Seed claim types are present in the DB registry after migration."""
    from musehub.services.musehub_attestations import get_claim_type_from_db

    human_entry = await get_claim_type_from_db(db_session, "human")
    assert human_entry is not None
    assert human_entry["category"] == "identity"
492
493
@pytest.mark.tier2
@pytest.mark.asyncio
async def test_i11_add_new_claim_type(db_session: AsyncSession) -> None:
    """New claim types can be added to the DB registry at runtime."""
    from musehub.services.musehub_attestations import add_claim_type, get_claim_type_from_db

    new_key = "test:custom"
    await add_claim_type(
        db_session,
        type_key=new_key,
        category="collab",
        label="Custom Test",
        description="A test-only claim type.",
        valid_scopes=["identity", "commit"],
    )

    stored = await get_claim_type_from_db(db_session, new_key)
    assert stored is not None
    assert stored["label"] == "Custom Test"
512
513
@pytest.mark.tier2
@pytest.mark.asyncio
async def test_i12_deprecated_type_rejected(db_session: AsyncSession) -> None:
    """After deprecating a type, issue_attestation raises ValueError."""
    from musehub.services.musehub_attestations import (
        add_claim_type,
        deprecate_claim_type,
        issue_attestation,
    )

    doomed_key = "test:to-deprecate"
    await add_claim_type(
        db_session,
        type_key=doomed_key,
        category="collab",
        label="Deprecated Soon",
        description="Will be deprecated.",
        valid_scopes=["identity"],
    )
    await deprecate_claim_type(db_session, doomed_key)

    key, pubkey = _make_keypair()
    request = _req("gabriel", "aria", '{"type":"test:to-deprecate"}', key, pubkey)
    with pytest.raises(ValueError, match="deprecated"):
        await issue_attestation(db_session, request)
536
537
538 # ---------------------------------------------------------------------------
539 # T3 — End-to-end: API round-trips via TestClient
540 # ---------------------------------------------------------------------------
541
@pytest.fixture
def real_keypair() -> tuple[Ed25519PrivateKey, str]:
    """Provide a newly generated Ed25519 keypair for signing E2E requests."""
    keypair = _make_keypair()
    return keypair
546
547
@pytest.mark.tier3
class TestAttestationAPIRoundTrip:
    """HTTP round-trips against the attestation endpoints via the ``client`` fixture."""

    @pytest.mark.asyncio
    async def test_e2e_create_and_list_identity_scope(
        self, client: AsyncClient, real_keypair: tuple[Ed25519PrivateKey, str]
    ) -> None:
        """POST /api/profiles/{handle}/attestations → GET lists it."""
        priv, pub = real_keypair
        ts = _utc()
        claim = '{"type":"collab"}'
        sig = _sign(priv, "gabriel", "aria", claim, ts.isoformat())

        resp = await client.post(
            "/api/profiles/aria/attestations",
            json={
                "attester": "gabriel",
                "subject": "aria",
                "claim": claim,
                "signature": sig,
                "attester_public_key": pub,
                "issued_at": ts.isoformat(),
                "scope": "identity",
            },
        )
        assert resp.status_code == 201
        aid = resp.json()["attestationId"]

        resp = await client.get("/api/profiles/aria/attestations")
        assert resp.status_code == 200
        ids = [a["attestationId"] for a in resp.json()["attestations"]]
        assert aid in ids

    @pytest.mark.asyncio
    async def test_e2e_commit_scope_not_in_profile_list(
        self, client: AsyncClient, real_keypair: tuple[Ed25519PrivateKey, str]
    ) -> None:
        """Commit-scoped attestations don't appear in identity profile listing."""
        priv, pub = real_keypair
        ts = _utc()
        cid = "sha256:" + "f" * 64
        scope_ref = f"gabriel/musehub@{cid}"
        claim = '{"type":"deploy:approved"}'
        sig = _sign(
            priv, "gabriel", "gabriel/musehub", claim, ts.isoformat(),
            scope="commit", scope_ref=scope_ref,
        )

        resp = await client.post(
            "/api/profiles/gabriel/attestations",
            json={
                "attester": "gabriel",
                "subject": "gabriel/musehub",
                "claim": claim,
                "signature": sig,
                "attester_public_key": pub,
                "issued_at": ts.isoformat(),
                "scope": "commit",
                "scope_ref": scope_ref,
                "commit_id": cid,
            },
        )
        assert resp.status_code == 201

        resp = await client.get("/api/profiles/gabriel/attestations")
        # Check the status first so an error response fails here rather than
        # as a KeyError on the missing "attestations" key below.
        assert resp.status_code == 200
        for a in resp.json()["attestations"]:
            assert a.get("scope", "identity") == "identity"

    @pytest.mark.asyncio
    async def test_e2e_unknown_type_returns_400(
        self, client: AsyncClient, real_keypair: tuple[Ed25519PrivateKey, str]
    ) -> None:
        """Posting a claim whose type is not registered is answered with HTTP 400."""
        priv, pub = real_keypair
        ts = _utc()
        claim = '{"type":"made-up-claim"}'
        sig = _sign(priv, "gabriel", "aria", claim, ts.isoformat())

        resp = await client.post(
            "/api/profiles/aria/attestations",
            json={
                "attester": "gabriel", "subject": "aria", "claim": claim,
                "signature": sig, "attester_public_key": pub,
                "issued_at": ts.isoformat(), "scope": "identity",
            },
        )
        assert resp.status_code == 400

    @pytest.mark.asyncio
    async def test_e2e_claim_types_endpoint(self, client: AsyncClient) -> None:
        """GET /api/profiles/attestation-types returns the seeded registry."""
        resp = await client.get("/api/profiles/attestation-types")
        assert resp.status_code == 200
        keys = {ct["typeKey"] for ct in resp.json()["claimTypes"]}
        assert "human" in keys
        assert "deploy:approved" in keys

    @pytest.mark.asyncio
    async def test_e2e_revoke_removes_from_listing(
        self, client: AsyncClient, real_keypair: tuple[Ed25519PrivateKey, str]
    ) -> None:
        """After DELETE with a revoker, the attestation is gone from the profile listing."""
        priv, pub = real_keypair
        ts = _utc()
        claim = '{"type":"trusted"}'
        sig = _sign(priv, "gabriel", "aria", claim, ts.isoformat())

        create = await client.post(
            "/api/profiles/aria/attestations",
            json={
                "attester": "gabriel", "subject": "aria", "claim": claim,
                "signature": sig, "attester_public_key": pub,
                "issued_at": ts.isoformat(), "scope": "identity",
            },
        )
        assert create.status_code == 201
        aid = create.json()["attestationId"]

        revoke = await client.delete(
            f"/api/profiles/aria/attestations/{aid}",
            params={"revoker": "gabriel"},
        )
        assert revoke.status_code == 200

        listed = await client.get("/api/profiles/aria/attestations")
        # A failing listing must not be mistaken for "attestation absent".
        assert listed.status_code == 200
        ids = [a["attestationId"] for a in listed.json()["attestations"]]
        assert aid not in ids
660
661
662 # ---------------------------------------------------------------------------
663 # T4 — Stress: idempotency under concurrent writes
664 # ---------------------------------------------------------------------------
665
@pytest.mark.tier4
@pytest.mark.asyncio
async def test_s1_concurrent_identical_issue_is_idempotent(session_factory: async_sessionmaker[AsyncSession]) -> None:
    """50 concurrent issues of the same payload → exactly 1 row."""
    from musehub.services.musehub_attestations import get_attestations_for_subject, issue_attestation

    key, pubkey = _make_keypair()
    fixed_time = _utc()
    request = _req("gabriel", "aria-stress", '{"type":"collab"}', key, pubkey, issued_at=fixed_time)

    async def _issue_once() -> AttestationResponse:
        # Every attempt gets its own session from the factory.
        async with session_factory() as session:
            return await issue_attestation(session, request)

    responses = await asyncio.gather(*(_issue_once() for _ in range(50)))
    distinct_ids = {response.attestation_id for response in responses}
    assert len(distinct_ids) == 1

    async with session_factory() as verify_session:
        listing = await get_attestations_for_subject(verify_session, "aria-stress")
    from_gabriel = [item for item in listing.attestations if item.attester == "gabriel"]
    assert len(from_gabriel) == 1
688
689
@pytest.mark.tier4
@pytest.mark.asyncio
async def test_s2_many_distinct_attestations(session_factory: async_sessionmaker[AsyncSession]) -> None:
    """100 distinct attestations insert without error (max 10 concurrent connections)."""
    from musehub.services.musehub_attestations import get_attestations_for_subject, issue_attestation

    # At most ten issuers hold a session at any one time.
    limiter = asyncio.Semaphore(10)

    async def _issue(i: int) -> None:
        key, pubkey = _make_keypair()
        request = _req(f"attester-{i:04d}", "aria-many", '{"type":"trusted"}', key, pubkey)
        async with limiter, session_factory() as session:
            await issue_attestation(session, request)

    await asyncio.gather(*(_issue(index) for index in range(100)))

    async with session_factory() as verify_session:
        listing = await get_attestations_for_subject(verify_session, "aria-many")
    assert listing.total >= 100
709
710
711 # ---------------------------------------------------------------------------
712 # T5 — Data integrity
713 # ---------------------------------------------------------------------------
714
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_d1_attestation_id_unique_constraint(db_session: AsyncSession) -> None:
    """Duplicate attestation_id raises IntegrityError, not silent overwrite."""
    import sqlalchemy.exc
    from musehub.db.musehub_identity_models import MusehubAttestation

    priv, pub = _make_keypair()
    ts = _utc()
    claim = '{"type":"collab"}'
    aid = compute_attestation_id("gabriel", "aria", claim, ts.isoformat())

    def _new_row() -> MusehubAttestation:
        # Both inserts use exactly the same column values, primary key included.
        return MusehubAttestation(
            attestation_id=aid,
            attester="gabriel",
            subject="aria",
            claim=claim,
            signature=_sign(priv, "gabriel", "aria", claim, ts.isoformat()),
            attester_public_key=pub,
            issued_at=ts,
            scope="identity",
        )

    row = _new_row()
    db_session.add(row)
    await db_session.commit()
    # Detach the first instance so the second one is a brand-new pending
    # object instead of colliding in the session's identity map.
    db_session.expunge(row)

    db_session.add(_new_row())  # same PK
    with pytest.raises(sqlalchemy.exc.IntegrityError):
        await db_session.commit()

    # A failed flush leaves the session's transaction unusable until it is
    # rolled back; do it here so fixture teardown gets a clean session.
    await db_session.rollback()
754
755
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_d2_revoked_at_set_not_deleted(db_session: AsyncSession) -> None:
    """Revocation sets revoked_at; the row is never deleted."""
    from sqlalchemy import select

    from musehub.db.musehub_identity_models import MusehubAttestation
    from musehub.services.musehub_attestations import issue_attestation, revoke_attestation

    key, pubkey = _make_keypair()
    request = _req("gabriel", "aria", '{"type":"trusted"}', key, pubkey)
    issued = await issue_attestation(db_session, request)

    await revoke_attestation(db_session, issued.attestation_id, revoker="gabriel")

    query = select(MusehubAttestation).where(
        MusehubAttestation.attestation_id == issued.attestation_id
    )
    fetched = await db_session.execute(query)
    stored = fetched.scalar_one()  # raises unless exactly one row still exists
    assert stored.revoked_at is not None
    assert stored.attester == "gabriel"  # row intact
777
778
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_d3_scope_columns_non_nullable_defaults(db_session: AsyncSession) -> None:
    """scope column has non-null default of 'identity'; scope_ref and commit_id are nullable."""
    from sqlalchemy import select

    from musehub.db.musehub_identity_models import MusehubAttestation
    from musehub.services.musehub_attestations import issue_attestation

    key, pubkey = _make_keypair()
    request = _req("gabriel", "aria", '{"type":"collab"}', key, pubkey)
    issued = await issue_attestation(db_session, request)

    query = select(MusehubAttestation).where(
        MusehubAttestation.attestation_id == issued.attestation_id
    )
    fetched = await db_session.execute(query)
    stored = fetched.scalar_one()
    assert stored.scope == "identity"
    assert stored.scope_ref is None
    assert stored.commit_id is None
    assert stored.expires_at is None
800
801
@pytest.mark.tier5
@pytest.mark.asyncio
async def test_d4_claim_stored_verbatim(db_session: AsyncSession) -> None:
    """claim JSON is stored byte-for-byte; no normalisation changes claim_type derivation."""
    from sqlalchemy import select

    from musehub.db.musehub_identity_models import MusehubAttestation
    from musehub.services.musehub_attestations import issue_attestation

    submitted_claim = '{"type":"collab","note":"verbatim"}'
    key, pubkey = _make_keypair()
    request = _req("gabriel", "aria", submitted_claim, key, pubkey)
    issued = await issue_attestation(db_session, request)

    query = select(MusehubAttestation).where(
        MusehubAttestation.attestation_id == issued.attestation_id
    )
    fetched = await db_session.execute(query)
    stored = fetched.scalar_one()
    assert stored.claim == submitted_claim
821
822
823 # ---------------------------------------------------------------------------
824 # T6 — Performance
825 # ---------------------------------------------------------------------------
826
@pytest.mark.tier6
@pytest.mark.asyncio
async def test_p1_subject_query_under_50ms_with_500_rows(db_session: AsyncSession) -> None:
    """get_attestations_for_subject with 500 rows completes in < 50ms."""
    from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_subject

    # Seed 500 attestations for one subject, each from a distinct attester.
    for i in range(500):
        priv, pub = _make_keypair()
        req = _req(f"bulk-{i:04d}", "perf-subject", '{"type":"trusted"}', priv, pub)
        await issue_attestation(db_session, req)

    # perf_counter is the clock intended for timing short durations;
    # monotonic can be too coarse for a 50 ms budget on some platforms.
    t0 = time.perf_counter()
    result = await get_attestations_for_subject(db_session, "perf-subject")
    elapsed_ms = (time.perf_counter() - t0) * 1000

    assert result.total >= 500
    assert elapsed_ms < 50, f"query took {elapsed_ms:.1f}ms — expected < 50ms"
847
848
@pytest.mark.tier6
@pytest.mark.asyncio
async def test_p2_commit_query_under_20ms(db_session: AsyncSession) -> None:
    """get_attestations_for_commit over 50 commit-scoped rows returns in < 50ms.

    Seeds 50 ``deploy:approved`` attestations that all reference the same
    commit id, then times a single ``get_attestations_for_commit`` call.

    NOTE(review): the test name says 20ms, but the enforced budget is 50ms.
    The name is kept so existing ``-k`` selections keep working; lower
    ``budget_ms`` if 20ms is the intended target.
    """
    from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_commit

    # Single source of truth for the threshold and its failure message.
    budget_ms = 50

    cid = "sha256:" + "9" * 64
    for i in range(50):
        priv, pub = _make_keypair()
        req = _req(
            f"perf-att-{i:03d}", "gabriel/musehub",
            '{"type":"deploy:approved"}', priv, pub,
            scope="commit", scope_ref=f"gabriel/musehub@{cid}", commit_id=cid,
        )
        await issue_attestation(db_session, req)

    # perf_counter is the highest-resolution clock for short durations;
    # time.monotonic can be too coarse on some platforms for a gate this tight.
    t0 = time.perf_counter()
    result = await get_attestations_for_commit(db_session, cid)
    elapsed_ms = (time.perf_counter() - t0) * 1000

    assert result.total >= 50
    assert elapsed_ms < budget_ms, f"commit query took {elapsed_ms:.1f}ms — expected < {budget_ms}ms"
871
872
873 # ---------------------------------------------------------------------------
874 # T7 — Security
875 # ---------------------------------------------------------------------------
876
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_sec1_cross_protocol_replay_rejected(db_session: AsyncSession) -> None:
    """A signature over a ``MUSE-SIGN-V1``-prefixed message is rejected.

    The key is genuine and matches the presented public key, but the signed
    bytes use the MSign message layout, so ``issue_attestation`` must raise
    ``ValueError``.
    """
    from musehub.services.musehub_attestations import issue_attestation

    signing_key, public_key = _make_keypair()
    issued_at = _utc()
    claim = '{"type":"collab"}'

    # Build the foreign-protocol payload: MSign prefix, then the same fields.
    foreign_fields = ("MUSE-SIGN-V1", "gabriel", "aria", claim, issued_at.isoformat())
    foreign_payload = "\n".join(foreign_fields).encode()
    replayed_sig = encode_sig("ed25519", signing_key.sign(foreign_payload))

    request = AttestationRequest(
        attester="gabriel",
        subject="aria",
        claim=claim,
        signature=replayed_sig,
        attester_public_key=public_key,
        issued_at=issued_at,
    )
    with pytest.raises(ValueError, match="Invalid attestation signature"):
        await issue_attestation(db_session, request)
896
897
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_sec2_attester_impersonation_rejected(db_session: AsyncSession) -> None:
    """A signature from one key presented with another key's public half is rejected.

    The request names ``gabriel`` as attester and carries the victim's public
    key, but the signature was produced by an unrelated private key.
    """
    from musehub.services.musehub_attestations import issue_attestation

    victim_pub = _make_keypair()[1]     # only the public half is presented
    attacker_priv = _make_keypair()[0]  # unrelated key that actually signs

    issued_at = _utc()
    claim = '{"type":"human"}'
    forged_sig = _sign(attacker_priv, "gabriel", "aria", claim, issued_at.isoformat())

    request = AttestationRequest(
        attester="gabriel",
        subject="aria",
        claim=claim,
        signature=forged_sig,
        attester_public_key=victim_pub,
        issued_at=issued_at,
    )
    with pytest.raises(ValueError, match="Invalid attestation signature"):
        await issue_attestation(db_session, request)
918
919
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_sec3_tampered_claim_rejected(db_session: AsyncSession) -> None:
    """A signature made over one claim must not validate a different claim.

    Signs ``{"type":"collab"}`` and submits ``{"type":"human"}`` with that
    signature; ``issue_attestation`` is expected to raise ``ValueError``.
    """
    from musehub.services.musehub_attestations import issue_attestation

    signing_key, public_key = _make_keypair()
    issued_at = _utc()
    signed_claim = '{"type":"collab"}'
    submitted_claim = '{"type":"human"}'  # differs from what was signed
    stale_sig = _sign(signing_key, "gabriel", "aria", signed_claim, issued_at.isoformat())

    request = AttestationRequest(
        attester="gabriel",
        subject="aria",
        claim=submitted_claim,
        signature=stale_sig,
        attester_public_key=public_key,
        issued_at=issued_at,
    )
    with pytest.raises(ValueError, match="Invalid attestation signature"):
        await issue_attestation(db_session, request)
938
939
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_sec4_revoker_impersonation_rejected(db_session: AsyncSession) -> None:
    """Revoking with a revoker other than the attester raises ``PermissionError``."""
    from musehub.services.musehub_attestations import issue_attestation, revoke_attestation

    signing_key, public_key = _make_keypair()
    issued = await issue_attestation(
        db_session,
        _req("gabriel", "aria", '{"type":"trusted"}', signing_key, public_key),
    )

    # "evil-actor" did not issue this attestation.
    with pytest.raises(PermissionError, match="only the attester can revoke"):
        await revoke_attestation(db_session, issued.attestation_id, revoker="evil-actor")
952
953
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_sec5_expired_attestation_excluded_without_flag(db_session: AsyncSession) -> None:
    """An attestation whose ``expires_at`` is in the past is absent from the default listing.

    The attestation is issued with an expiry one hour ago and never revoked;
    the default ``get_attestations_for_subject`` call must not return it.
    """
    from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_subject

    signing_key, public_key = _make_keypair()
    one_hour_ago = _utc() - timedelta(hours=1)
    request = _req(
        "gabriel", "aria-exp", '{"type":"contractor"}', signing_key, public_key,
        expires_at=one_hour_ago,
    )
    issued = await issue_attestation(db_session, request)

    listing = await get_attestations_for_subject(db_session, "aria-exp")
    for entry in listing.attestations:
        assert entry.attestation_id != issued.attestation_id
    # Hidden because it expired, not because it was revoked.
    assert issued.revoked_at is None
968
969
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_sec6_key_rotation_old_attestations_still_verifiable(db_session: AsyncSession) -> None:
    """An attestation issued under an earlier key still verifies after a new key is generated.

    ``verify_stored_attestation`` is given only the session and the
    attestation id, so it cannot be handed the replacement key.
    """
    from musehub.services.musehub_attestations import issue_attestation, verify_stored_attestation

    original_priv, original_pub = _make_keypair()
    issued = await issue_attestation(
        db_session,
        _req("gabriel", "aria", '{"type":"collab"}', original_priv, original_pub),
    )

    # Stand-in for a rotation: a replacement keypair now exists but is never used.
    _rotated_priv, _rotated_pub = _make_keypair()

    verdict = await verify_stored_attestation(db_session, issued.attestation_id)
    ok, reason = verdict
    assert ok, f"old attestation failed verification after key rotation: {reason}"
986
987
@pytest.mark.tier7
@pytest.mark.asyncio
async def test_sec7_sql_metachar_in_claim_stored_safely(db_session: AsyncSession) -> None:
    """A claim containing SQL metacharacters round-trips through issue and list.

    The claim's note embeds a quote, a statement terminator and a
    ``DROP TABLE`` fragment. The attestation must be listed afterwards with
    that text still present in its claim.
    """
    from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_subject

    signing_key, public_key = _make_keypair()
    hostile_claim = '{"type":"collab","note":"Robert\'); DROP TABLE musehub_attestations;--"}'
    issued = await issue_attestation(
        db_session,
        _req("gabriel", "aria-sql", hostile_claim, signing_key, public_key),
    )

    listing = await get_attestations_for_subject(db_session, "aria-sql")
    matches = [
        entry for entry in listing.attestations
        if entry.attestation_id == issued.attestation_id
    ]
    found = matches[0] if matches else None

    assert found is not None
    assert "DROP TABLE" in found.claim  # still present as data in the stored claim
File History 1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 20 days ago