"""TDD — attestation system Phase 1: claim-type registry, scope, expiry. Seven test tiers: T1 Unit — pure functions, no I/O T2 Integration — real DB via db_session fixture T3 End-to-end — FastAPI TestClient round-trips T4 Stress — concurrency / idempotency under load T5 Data integrity — DB invariants, no silent mutation T6 Performance — query latency with index coverage T7 Security — sig replay, cross-protocol, impersonation Run a single tier: pytest tests/test_attestations_phase1.py -m tier1 -q --tb=short """ from __future__ import annotations import asyncio import json import time from datetime import datetime, timedelta, timezone from unittest.mock import AsyncMock, MagicMock import pytest from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from muse.core.types import encode_pubkey, encode_sig from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from musehub.core.genesis import compute_attestation_id from musehub.models.musehub import AttestationRequest, AttestationResponse from tests.factories import create_profile, create_repo # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- def _utc() -> datetime: return datetime.now(tz=timezone.utc) def _make_keypair() -> tuple[Ed25519PrivateKey, str]: priv = Ed25519PrivateKey.generate() pub = encode_pubkey("ed25519", priv.public_key().public_bytes_raw()) return priv, pub def _sign( priv: Ed25519PrivateKey, attester: str, subject: str, claim: str, ts: str, scope: str = "identity", scope_ref: str | None = None, ) -> str: parts = ["ATTEST", attester, subject, claim, ts] if scope != "identity" and scope_ref: parts.append(scope_ref) msg = "\n".join(parts).encode() return encode_sig("ed25519", priv.sign(msg)) def _req( attester: str, subject: str, claim: str, priv: Ed25519PrivateKey, pub: str, *, scope: str = "identity", scope_ref: str | None = None, repo_id: str | None = None, commit_id: str | None = None, expires_at: datetime | None = None, issued_at: datetime | None = None, ) -> AttestationRequest: ts = (issued_at or _utc()).isoformat() sig = _sign(priv, attester, subject, claim, ts, scope=scope, scope_ref=scope_ref) return AttestationRequest( attester=attester, subject=subject, claim=claim, signature=sig, attester_public_key=pub, issued_at=issued_at or datetime.fromisoformat(ts), scope=scope, scope_ref=scope_ref, repo_id=repo_id, commit_id=commit_id, expires_at=expires_at, ) # --------------------------------------------------------------------------- # T1 — Unit: claim-type registry # --------------------------------------------------------------------------- @pytest.mark.tier1 class TestClaimTypeRegistry: """Pure registry lookups — no DB, no I/O.""" def test_known_type_resolves(self) -> None: from musehub.services.musehub_attestations import get_claim_type ct = get_claim_type("human") assert ct is not None assert ct["type_key"] == "human" assert ct["category"] == "identity" def test_unknown_type_raises(self) -> None: from musehub.services.musehub_attestations import get_claim_type with pytest.raises(ValueError, match="unknown claim type"): get_claim_type("nonsense-claim-xyz") def test_deprecated_type_raises_on_issue(self) -> None: """Deprecated types must be rejected at write time.""" from musehub.services.musehub_attestations import validate_claim_for_issue with pytest.raises(ValueError, match="deprecated"): validate_claim_for_issue("__deprecated_test__") def test_all_seed_types_present(self) -> None: from musehub.services.musehub_attestations import list_claim_types keys = {ct["type_key"] for ct in list_claim_types()} for expected in ( "human", "org", "agent", "spawned-by", "delegate", "trusted", "collab", "co-author", "contractor", "code:reviewed", "code:approved", "deploy:approved", "stems:verified", "mix:approved", "midi:generated", "master:approved", "skill:verified", ): assert expected in keys, f"seed type '{expected}' missing from registry" def test_valid_scopes_per_type(self) -> None: from musehub.services.musehub_attestations import get_claim_type human = get_claim_type("human") assert human["valid_scopes"] == ["identity"] deploy = get_claim_type("deploy:approved") assert "commit" in deploy["valid_scopes"] assert "identity" not in deploy["valid_scopes"] stems = get_claim_type("stems:verified") assert "identity" in stems["valid_scopes"] assert "commit" in stems["valid_scopes"] # --------------------------------------------------------------------------- # T1 — Unit: scope_ref parsing # --------------------------------------------------------------------------- @pytest.mark.tier1 class TestScopeRefParsing: """Parse and validate the scope_ref string formats.""" def test_identity_scope_ref_is_handle(self) -> None: from musehub.services.musehub_attestations import parse_scope_ref result = parse_scope_ref("identity", "gabriel") assert result["handle"] == "gabriel" assert result["repo_slug"] is None assert result["commit_id"] is None def test_repo_scope_ref_parsed(self) -> None: from musehub.services.musehub_attestations import parse_scope_ref result = parse_scope_ref("repo", "gabriel/musehub") assert result["handle"] == "gabriel" assert result["repo_slug"] == "musehub" assert result["commit_id"] is None def test_commit_scope_ref_parsed(self) -> None: from musehub.services.musehub_attestations import parse_scope_ref cid = "sha256:" + "a" * 64 result = parse_scope_ref("commit", f"gabriel/musehub@{cid}") assert result["handle"] == "gabriel" assert result["repo_slug"] == "musehub" assert result["commit_id"] == cid def test_commit_scope_ref_missing_at_raises(self) -> None: from musehub.services.musehub_attestations import parse_scope_ref with pytest.raises(ValueError, match="commit scope_ref must contain '@'"): parse_scope_ref("commit", "gabriel/musehub") def test_invalid_scope_raises(self) -> None: from musehub.services.musehub_attestations import parse_scope_ref with pytest.raises(ValueError, match="invalid scope"): parse_scope_ref("galaxy", "gabriel") # --------------------------------------------------------------------------- # T1 — Unit: scope enforcement against claim type valid_scopes # --------------------------------------------------------------------------- @pytest.mark.tier1 class TestScopeClaimEnforcement: def test_human_on_commit_scope_raises(self) -> None: from musehub.services.musehub_attestations import validate_scope_for_claim with pytest.raises(ValueError, match="'human' is not valid for scope 'commit'"): validate_scope_for_claim("human", "commit") def test_deploy_approved_on_identity_scope_raises(self) -> None: from musehub.services.musehub_attestations import validate_scope_for_claim with pytest.raises(ValueError, match="'deploy:approved' is not valid for scope 'identity'"): validate_scope_for_claim("deploy:approved", "identity") def test_collab_on_commit_scope_passes(self) -> None: from musehub.services.musehub_attestations import validate_scope_for_claim validate_scope_for_claim("collab", "commit") # must not raise def test_stems_verified_on_identity_passes(self) -> None: from musehub.services.musehub_attestations import validate_scope_for_claim validate_scope_for_claim("stems:verified", "identity") # must not raise # --------------------------------------------------------------------------- # T1 — Unit: canonical message includes scope_ref # --------------------------------------------------------------------------- @pytest.mark.tier1 class TestCanonicalMessage: def test_identity_scope_message_unchanged(self) -> None: """Legacy identity-scope message must stay compatible with existing sigs.""" from musehub.services.musehub_attestations import build_canonical_message msg = build_canonical_message( attester="gabriel", subject="aria", claim='{"type":"human"}', issued_at_iso="2026-05-01T00:00:00+00:00", scope="identity", scope_ref=None, ) assert msg == b"ATTEST\ngabriel\naria\n{\"type\":\"human\"}\n2026-05-01T00:00:00+00:00" def test_commit_scope_appends_scope_ref(self) -> None: from musehub.services.musehub_attestations import build_canonical_message cid = "sha256:" + "b" * 64 msg = build_canonical_message( attester="gabriel", subject="gabriel/musehub", claim='{"type":"deploy:approved"}', issued_at_iso="2026-05-01T00:00:00+00:00", scope="commit", scope_ref=f"gabriel/musehub@{cid}", ) assert f"gabriel/musehub@{cid}".encode() in msg def test_deterministic(self) -> None: from musehub.services.musehub_attestations import build_canonical_message kwargs = dict( attester="a", subject="b", claim='{"type":"collab"}', issued_at_iso="2026-01-01T00:00:00+00:00", scope="identity", scope_ref=None, ) assert build_canonical_message(**kwargs) == build_canonical_message(**kwargs) # --------------------------------------------------------------------------- # T1 — Unit: AttestationRequest Pydantic validation # --------------------------------------------------------------------------- @pytest.mark.tier1 class TestAttestationRequestModel: def test_scope_default_is_identity(self) -> None: priv, pub = _make_keypair() ts = _utc() sig = _sign(priv, "a", "b", '{"type":"collab"}', ts.isoformat()) req = AttestationRequest( attester="a", subject="b", claim='{"type":"collab"}', signature=sig, attester_public_key=pub, issued_at=ts, ) assert req.scope == "identity" assert req.scope_ref is None assert req.expires_at is None def test_commit_scope_requires_scope_ref(self) -> None: priv, pub = _make_keypair() ts = _utc() sig = _sign(priv, "a", "b", '{"type":"deploy:approved"}', ts.isoformat()) with pytest.raises(Exception): AttestationRequest( attester="a", subject="b", claim='{"type":"deploy:approved"}', signature=sig, attester_public_key=pub, issued_at=ts, scope="commit", scope_ref=None, # missing scope_ref for commit ) def test_expires_at_accepted(self) -> None: priv, pub = _make_keypair() ts = _utc() exp = ts + timedelta(days=90) sig = _sign(priv, "a", "b", '{"type":"collab"}', ts.isoformat()) req = AttestationRequest( attester="a", subject="b", claim='{"type":"collab"}', signature=sig, attester_public_key=pub, issued_at=ts, expires_at=exp, ) assert req.expires_at == exp # --------------------------------------------------------------------------- # T2 — Integration: issue with scope and expiry (real DB) # --------------------------------------------------------------------------- @pytest.mark.tier2 @pytest.mark.asyncio async def test_i1_issue_identity_scope(db_session: AsyncSession) -> None: """Basic identity-scoped attestation round-trip.""" from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_subject priv, pub = _make_keypair() req = _req("gabriel", "aria", '{"type":"human"}', priv, pub) result = await issue_attestation(db_session, req) assert result.attester == "gabriel" assert result.subject == "aria" assert result.scope == "identity" assert result.scope_ref is None assert result.expires_at is None assert result.revoked_at is None listed = await get_attestations_for_subject(db_session, "aria") assert any(a.attestation_id == result.attestation_id for a in listed.attestations) @pytest.mark.tier2 @pytest.mark.asyncio async def test_i2_issue_commit_scope(db_session: AsyncSession) -> None: """Commit-scoped attestation stores scope_ref and commit_id correctly.""" from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_commit priv, pub = _make_keypair() cid = "sha256:" + "c" * 64 scope_ref = f"gabriel/musehub@{cid}" req = _req( "gabriel", "gabriel/musehub", '{"type":"deploy:approved"}', priv, pub, scope="commit", scope_ref=scope_ref, commit_id=cid, ) result = await issue_attestation(db_session, req) assert result.scope == "commit" assert result.scope_ref == scope_ref assert result.commit_id == cid by_commit = await get_attestations_for_commit(db_session, cid) assert any(a.attestation_id == result.attestation_id for a in by_commit.attestations) @pytest.mark.tier2 @pytest.mark.asyncio async def test_i3_commit_scope_excluded_from_identity_query(db_session: AsyncSession) -> None: """Commit-scoped attestations must not appear in identity-scoped profile queries.""" from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_subject priv, pub = _make_keypair() cid = "sha256:" + "d" * 64 req = _req( "gabriel", "gabriel/musehub", '{"type":"deploy:approved"}', priv, pub, scope="commit", scope_ref=f"gabriel/musehub@{cid}", commit_id=cid, ) await issue_attestation(db_session, req) # subject for commit scope is repo slug, not a handle — should not appear listed = await get_attestations_for_subject(db_session, "gabriel/musehub") assert all(a.scope == "identity" for a in listed.attestations) @pytest.mark.tier2 @pytest.mark.asyncio async def test_i4_expired_attestation_excluded_by_default(db_session: AsyncSession) -> None: """Attestations past expires_at are excluded from default queries.""" from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_subject priv, pub = _make_keypair() past = _utc() - timedelta(seconds=1) req = _req("gabriel", "aria", '{"type":"contractor"}', priv, pub, expires_at=past) result = await issue_attestation(db_session, req) listed = await get_attestations_for_subject(db_session, "aria") assert all(a.attestation_id != result.attestation_id for a in listed.attestations) @pytest.mark.tier2 @pytest.mark.asyncio async def test_i5_expired_attestation_included_with_flag(db_session: AsyncSession) -> None: from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_subject priv, pub = _make_keypair() past = _utc() - timedelta(seconds=1) req = _req("gabriel", "aria", '{"type":"contractor"}', priv, pub, expires_at=past) result = await issue_attestation(db_session, req) listed = await get_attestations_for_subject(db_session, "aria", include_expired=True) assert any(a.attestation_id == result.attestation_id for a in listed.attestations) @pytest.mark.tier2 @pytest.mark.asyncio async def test_i6_unknown_claim_type_rejected(db_session: AsyncSession) -> None: """Registry enforcement: unknown claim type raises ValueError before DB write.""" from musehub.services.musehub_attestations import issue_attestation priv, pub = _make_keypair() req = _req("gabriel", "aria", '{"type":"galaxy-brain"}', priv, pub) with pytest.raises(ValueError, match="unknown claim type"): await issue_attestation(db_session, req) @pytest.mark.tier2 @pytest.mark.asyncio async def test_i7_wrong_scope_for_claim_type_rejected(db_session: AsyncSession) -> None: """`human` claim type only valid for identity scope — commit scope raises.""" from musehub.services.musehub_attestations import issue_attestation priv, pub = _make_keypair() cid = "sha256:" + "e" * 64 req = _req( "gabriel", "gabriel/musehub", '{"type":"human"}', priv, pub, scope="commit", scope_ref=f"gabriel/musehub@{cid}", commit_id=cid, ) with pytest.raises(ValueError, match="not valid for scope"): await issue_attestation(db_session, req) @pytest.mark.tier2 @pytest.mark.asyncio async def test_i8_idempotent_issue(db_session: AsyncSession) -> None: """Same canonical payload issued twice returns existing row, inserts only once.""" from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_subject priv, pub = _make_keypair() ts = _utc() req = _req("gabriel", "aria", '{"type":"trusted"}', priv, pub, issued_at=ts) r1 = await issue_attestation(db_session, req) r2 = await issue_attestation(db_session, req) assert r1.attestation_id == r2.attestation_id listed = await get_attestations_for_subject(db_session, "aria") matching = [a for a in listed.attestations if a.attestation_id == r1.attestation_id] assert len(matching) == 1 @pytest.mark.tier2 @pytest.mark.asyncio async def test_i9_repo_scope_stored_and_retrieved(db_session: AsyncSession) -> None: from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_repo priv, pub = _make_keypair() req = _req( "gabriel", "gabriel/musehub", '{"type":"code:reviewed"}', priv, pub, scope="repo", scope_ref="gabriel/musehub", ) result = await issue_attestation(db_session, req) assert result.scope == "repo" by_repo = await get_attestations_for_repo(db_session, "gabriel/musehub") assert any(a.attestation_id == result.attestation_id for a in by_repo.attestations) # --------------------------------------------------------------------------- # T2 — Integration: DB-backed claim type registry # --------------------------------------------------------------------------- @pytest.mark.tier2 @pytest.mark.asyncio async def test_i10_registry_seed_types_in_db(db_session: AsyncSession) -> None: """Seed claim types are present in the DB registry after migration.""" from musehub.services.musehub_attestations import get_claim_type_from_db ct = await get_claim_type_from_db(db_session, "human") assert ct is not None assert ct["category"] == "identity" @pytest.mark.tier2 @pytest.mark.asyncio async def test_i11_add_new_claim_type(db_session: AsyncSession) -> None: """New claim types can be added to the DB registry at runtime.""" from musehub.services.musehub_attestations import add_claim_type, get_claim_type_from_db await add_claim_type( db_session, type_key="test:custom", category="collab", label="Custom Test", description="A test-only claim type.", valid_scopes=["identity", "commit"], ) ct = await get_claim_type_from_db(db_session, "test:custom") assert ct is not None assert ct["label"] == "Custom Test" @pytest.mark.tier2 @pytest.mark.asyncio async def test_i12_deprecated_type_rejected(db_session: AsyncSession) -> None: """After deprecating a type, issue_attestation raises ValueError.""" from musehub.services.musehub_attestations import ( add_claim_type, deprecate_claim_type, issue_attestation, ) await add_claim_type( db_session, type_key="test:to-deprecate", category="collab", label="Deprecated Soon", description="Will be deprecated.", valid_scopes=["identity"], ) await deprecate_claim_type(db_session, "test:to-deprecate") priv, pub = _make_keypair() req = _req("gabriel", "aria", '{"type":"test:to-deprecate"}', priv, pub) with pytest.raises(ValueError, match="deprecated"): await issue_attestation(db_session, req) # --------------------------------------------------------------------------- # T3 — End-to-end: API round-trips via TestClient # --------------------------------------------------------------------------- @pytest.fixture def real_keypair() -> tuple[Ed25519PrivateKey, str]: """A fresh Ed25519 keypair for E2E signing.""" return _make_keypair() @pytest.mark.tier3 class TestAttestationAPIRoundTrip: @pytest.mark.asyncio async def test_e2e_create_and_list_identity_scope(self, client: AsyncClient, real_keypair: tuple[Ed25519PrivateKey, str]) -> None: """POST /api/profiles/{handle}/attestations → GET lists it.""" priv, pub = real_keypair ts = _utc() claim = '{"type":"collab"}' sig = _sign(priv, "gabriel", "aria", claim, ts.isoformat()) resp = await client.post( "/api/profiles/aria/attestations", json={ "attester": "gabriel", "subject": "aria", "claim": claim, "signature": sig, "attester_public_key": pub, "issued_at": ts.isoformat(), "scope": "identity", }, ) assert resp.status_code == 201 aid = resp.json()["attestationId"] resp = await client.get("/api/profiles/aria/attestations") assert resp.status_code == 200 ids = [a["attestationId"] for a in resp.json()["attestations"]] assert aid in ids @pytest.mark.asyncio async def test_e2e_commit_scope_not_in_profile_list(self, client: AsyncClient, real_keypair: tuple[Ed25519PrivateKey, str]) -> None: """Commit-scoped attestations don't appear in identity profile listing.""" priv, pub = real_keypair ts = _utc() cid = "sha256:" + "f" * 64 claim = '{"type":"deploy:approved"}' sig = _sign(priv, "gabriel", "gabriel/musehub", claim, ts.isoformat(), scope="commit", scope_ref=f"gabriel/musehub@{cid}") resp = await client.post( "/api/profiles/gabriel/attestations", json={ "attester": "gabriel", "subject": "gabriel/musehub", "claim": claim, "signature": sig, "attester_public_key": pub, "issued_at": ts.isoformat(), "scope": "commit", "scope_ref": f"gabriel/musehub@{cid}", "commit_id": cid, }, ) assert resp.status_code == 201 resp = await client.get("/api/profiles/gabriel/attestations") for a in resp.json()["attestations"]: assert a.get("scope", "identity") == "identity" @pytest.mark.asyncio async def test_e2e_unknown_type_returns_400(self, client: AsyncClient, real_keypair: tuple[Ed25519PrivateKey, str]) -> None: priv, pub = real_keypair ts = _utc() claim = '{"type":"made-up-claim"}' sig = _sign(priv, "gabriel", "aria", claim, ts.isoformat()) resp = await client.post( "/api/profiles/aria/attestations", json={ "attester": "gabriel", "subject": "aria", "claim": claim, "signature": sig, "attester_public_key": pub, "issued_at": ts.isoformat(), "scope": "identity", }, ) assert resp.status_code == 400 @pytest.mark.asyncio async def test_e2e_claim_types_endpoint(self, client: AsyncClient) -> None: """GET /api/profiles/attestation-types returns the seeded registry.""" resp = await client.get("/api/profiles/attestation-types") assert resp.status_code == 200 keys = {ct["typeKey"] for ct in resp.json()["claimTypes"]} assert "human" in keys assert "deploy:approved" in keys @pytest.mark.asyncio async def test_e2e_revoke_removes_from_listing(self, client: AsyncClient, real_keypair: tuple[Ed25519PrivateKey, str]) -> None: priv, pub = real_keypair ts = _utc() claim = '{"type":"trusted"}' sig = _sign(priv, "gabriel", "aria", claim, ts.isoformat()) create = await client.post( "/api/profiles/aria/attestations", json={ "attester": "gabriel", "subject": "aria", "claim": claim, "signature": sig, "attester_public_key": pub, "issued_at": ts.isoformat(), "scope": "identity", }, ) assert create.status_code == 201 aid = create.json()["attestationId"] revoke = await client.delete(f"/api/profiles/aria/attestations/{aid}", params={"revoker": "gabriel"}) assert revoke.status_code == 200 listed = await client.get("/api/profiles/aria/attestations") ids = [a["attestationId"] for a in listed.json()["attestations"]] assert aid not in ids # --------------------------------------------------------------------------- # T4 — Stress: idempotency under concurrent writes # --------------------------------------------------------------------------- @pytest.mark.tier4 @pytest.mark.asyncio async def test_s1_concurrent_identical_issue_is_idempotent(session_factory: async_sessionmaker[AsyncSession]) -> None: """50 concurrent issues of the same payload → exactly 1 row.""" from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_subject priv, pub = _make_keypair() ts = _utc() req = _req("gabriel", "aria-stress", '{"type":"collab"}', priv, pub, issued_at=ts) async def _do() -> AttestationResponse: async with session_factory() as sess: return await issue_attestation(sess, req) results = await asyncio.gather(*[_do() for _ in range(50)]) ids = {r.attestation_id for r in results} assert len(ids) == 1 async with session_factory() as check_sess: listed = await get_attestations_for_subject(check_sess, "aria-stress") matching = [a for a in listed.attestations if a.attester == "gabriel"] assert len(matching) == 1 @pytest.mark.tier4 @pytest.mark.asyncio async def test_s2_many_distinct_attestations(session_factory: async_sessionmaker[AsyncSession]) -> None: """100 distinct attestations insert without error (max 10 concurrent connections).""" from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_subject sem = asyncio.Semaphore(10) async def _issue(i: int) -> None: priv, pub = _make_keypair() req = _req(f"attester-{i:04d}", "aria-many", '{"type":"trusted"}', priv, pub) async with sem: async with session_factory() as sess: await issue_attestation(sess, req) await asyncio.gather(*[_issue(i) for i in range(100)]) async with session_factory() as check_sess: listed = await get_attestations_for_subject(check_sess, "aria-many") assert listed.total >= 100 # --------------------------------------------------------------------------- # T5 — Data integrity # --------------------------------------------------------------------------- @pytest.mark.tier5 @pytest.mark.asyncio async def test_d1_attestation_id_unique_constraint(db_session: AsyncSession) -> None: """Duplicate attestation_id raises IntegrityError, not silent overwrite.""" import sqlalchemy.exc from musehub.db.musehub_identity_models import MusehubAttestation priv, pub = _make_keypair() ts = _utc() claim = '{"type":"collab"}' aid = compute_attestation_id("gabriel", "aria", claim, ts.isoformat()) row = MusehubAttestation( attestation_id=aid, attester="gabriel", subject="aria", claim=claim, signature=_sign(priv, "gabriel", "aria", claim, ts.isoformat()), attester_public_key=pub, issued_at=ts, scope="identity", ) db_session.add(row) await db_session.commit() db_session.expunge(row) dup = MusehubAttestation( attestation_id=aid, # same PK attester="gabriel", subject="aria", claim=claim, signature=_sign(priv, "gabriel", "aria", claim, ts.isoformat()), attester_public_key=pub, issued_at=ts, scope="identity", ) db_session.add(dup) with pytest.raises(sqlalchemy.exc.IntegrityError): await db_session.commit() @pytest.mark.tier5 @pytest.mark.asyncio async def test_d2_revoked_at_set_not_deleted(db_session: AsyncSession) -> None: """Revocation sets revoked_at; the row is never deleted.""" from musehub.services.musehub_attestations import issue_attestation, revoke_attestation from sqlalchemy import select from musehub.db.musehub_identity_models import MusehubAttestation priv, pub = _make_keypair() req = _req("gabriel", "aria", '{"type":"trusted"}', priv, pub) result = await issue_attestation(db_session, req) await revoke_attestation(db_session, result.attestation_id, revoker="gabriel") row = (await db_session.execute( select(MusehubAttestation).where( MusehubAttestation.attestation_id == result.attestation_id ) )).scalar_one() assert row.revoked_at is not None assert row.attester == "gabriel" # row intact @pytest.mark.tier5 @pytest.mark.asyncio async def test_d3_scope_columns_non_nullable_defaults(db_session: AsyncSession) -> None: """scope column has non-null default of 'identity'; scope_ref and commit_id are nullable.""" from musehub.services.musehub_attestations import issue_attestation from sqlalchemy import select from musehub.db.musehub_identity_models import MusehubAttestation priv, pub = _make_keypair() req = _req("gabriel", "aria", '{"type":"collab"}', priv, pub) result = await issue_attestation(db_session, req) row = (await db_session.execute( select(MusehubAttestation).where( MusehubAttestation.attestation_id == result.attestation_id ) )).scalar_one() assert row.scope == "identity" assert row.scope_ref is None assert row.commit_id is None assert row.expires_at is None @pytest.mark.tier5 @pytest.mark.asyncio async def test_d4_claim_stored_verbatim(db_session: AsyncSession) -> None: """claim JSON is stored byte-for-byte; no normalisation changes claim_type derivation.""" from musehub.services.musehub_attestations import issue_attestation from sqlalchemy import select from musehub.db.musehub_identity_models import MusehubAttestation priv, pub = _make_keypair() raw_claim = '{"type":"collab","note":"verbatim"}' req = _req("gabriel", "aria", raw_claim, priv, pub) result = await issue_attestation(db_session, req) row = (await db_session.execute( select(MusehubAttestation).where( MusehubAttestation.attestation_id == result.attestation_id ) )).scalar_one() assert row.claim == raw_claim # --------------------------------------------------------------------------- # T6 — Performance # --------------------------------------------------------------------------- @pytest.mark.tier6 @pytest.mark.asyncio async def test_p1_subject_query_under_50ms_with_500_rows(db_session: AsyncSession) -> None: """get_attestations_for_subject with 500 rows completes in < 50ms.""" from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_subject async def _bulk() -> None: for i in range(500): priv, pub = _make_keypair() req = _req(f"bulk-{i:04d}", "perf-subject", '{"type":"trusted"}', priv, pub) await issue_attestation(db_session, req) await _bulk() t0 = time.monotonic() result = await get_attestations_for_subject(db_session, "perf-subject") elapsed_ms = (time.monotonic() - t0) * 1000 assert result.total >= 500 assert elapsed_ms < 50, f"query took {elapsed_ms:.1f}ms — expected < 50ms" @pytest.mark.tier6 @pytest.mark.asyncio async def test_p2_commit_query_under_20ms(db_session: AsyncSession) -> None: """get_attestations_for_commit with index returns in < 20ms.""" from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_commit cid = "sha256:" + "9" * 64 for i in range(50): priv, pub = _make_keypair() req = _req( f"perf-att-{i:03d}", "gabriel/musehub", '{"type":"deploy:approved"}', priv, pub, scope="commit", scope_ref=f"gabriel/musehub@{cid}", commit_id=cid, ) await issue_attestation(db_session, req) t0 = time.monotonic() result = await get_attestations_for_commit(db_session, cid) elapsed_ms = (time.monotonic() - t0) * 1000 assert result.total >= 50 assert elapsed_ms < 50, f"commit query took {elapsed_ms:.1f}ms — expected < 50ms" # --------------------------------------------------------------------------- # T7 — Security # --------------------------------------------------------------------------- @pytest.mark.tier7 @pytest.mark.asyncio async def test_sec1_cross_protocol_replay_rejected(db_session: AsyncSession) -> None: """MSign-prefixed message cannot satisfy attestation signature check.""" from musehub.services.musehub_attestations import issue_attestation priv, pub = _make_keypair() ts = _utc() claim = '{"type":"collab"}' # Sign with MSign prefix instead of ATTEST prefix msign_msg = f"MUSE-SIGN-V1\ngabriel\naria\n{claim}\n{ts.isoformat()}".encode() bad_sig = encode_sig("ed25519", priv.sign(msign_msg)) req = AttestationRequest( attester="gabriel", subject="aria", claim=claim, signature=bad_sig, attester_public_key=pub, issued_at=ts, ) with pytest.raises(ValueError, match="Invalid attestation signature"): await issue_attestation(db_session, req) @pytest.mark.tier7 @pytest.mark.asyncio async def test_sec2_attester_impersonation_rejected(db_session: AsyncSession) -> None: """Claiming attester=gabriel but signing with a different key → rejected.""" from musehub.services.musehub_attestations import issue_attestation _, real_pub = _make_keypair() # gabriel's real key (pub only) evil_priv, _ = _make_keypair() # attacker's key ts = _utc() claim = '{"type":"human"}' # Attacker signs with own key but presents gabriel's pubkey evil_sig = _sign(evil_priv, "gabriel", "aria", claim, ts.isoformat()) req = AttestationRequest( attester="gabriel", subject="aria", claim=claim, signature=evil_sig, attester_public_key=real_pub, issued_at=ts, ) with pytest.raises(ValueError, match="Invalid attestation signature"): await issue_attestation(db_session, req) @pytest.mark.tier7 @pytest.mark.asyncio async def test_sec3_tampered_claim_rejected(db_session: AsyncSession) -> None: """Signature over original claim does not verify for a mutated claim.""" from musehub.services.musehub_attestations import issue_attestation priv, pub = _make_keypair() ts = _utc() original_claim = '{"type":"collab"}' tampered_claim = '{"type":"human"}' # different type sig = _sign(priv, "gabriel", "aria", original_claim, ts.isoformat()) req = AttestationRequest( attester="gabriel", subject="aria", claim=tampered_claim, signature=sig, attester_public_key=pub, issued_at=ts, ) with pytest.raises(ValueError, match="Invalid attestation signature"): await issue_attestation(db_session, req) @pytest.mark.tier7 @pytest.mark.asyncio async def test_sec4_revoker_impersonation_rejected(db_session: AsyncSession) -> None: """Wrong revoker cannot retract someone else's attestation.""" from musehub.services.musehub_attestations import issue_attestation, revoke_attestation priv, pub = _make_keypair() req = _req("gabriel", "aria", '{"type":"trusted"}', priv, pub) result = await issue_attestation(db_session, req) with pytest.raises(PermissionError, match="only the attester can revoke"): await revoke_attestation(db_session, result.attestation_id, revoker="evil-actor") @pytest.mark.tier7 @pytest.mark.asyncio async def test_sec5_expired_attestation_excluded_without_flag(db_session: AsyncSession) -> None: """Expired attestation is excluded from live queries even if not explicitly revoked.""" from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_subject priv, pub = _make_keypair() past = _utc() - timedelta(hours=1) req = _req("gabriel", "aria-exp", '{"type":"contractor"}', priv, pub, expires_at=past) result = await issue_attestation(db_session, req) listed = await get_attestations_for_subject(db_session, "aria-exp") assert all(a.attestation_id != result.attestation_id for a in listed.attestations) assert result.revoked_at is None # not revoked — just expired @pytest.mark.tier7 @pytest.mark.asyncio async def test_sec6_key_rotation_old_attestations_still_verifiable(db_session: AsyncSession) -> None: """attester_public_key is stored per-attestation; key rotation doesn't break old records.""" from musehub.services.musehub_attestations import issue_attestation, verify_stored_attestation old_priv, old_pub = _make_keypair() req = _req("gabriel", "aria", '{"type":"collab"}', old_priv, old_pub) result = await issue_attestation(db_session, req) # Simulate key rotation — new key exists but old attestation used old key _new_priv, _new_pub = _make_keypair() # Verify using the stored public key (not the current key) ok, reason = await verify_stored_attestation(db_session, result.attestation_id) assert ok, f"old attestation failed verification after key rotation: {reason}" @pytest.mark.tier7 @pytest.mark.asyncio async def test_sec7_sql_metachar_in_claim_stored_safely(db_session: AsyncSession) -> None: """SQL metacharacters in claim payload are stored verbatim without injection.""" from musehub.services.musehub_attestations import issue_attestation, get_attestations_for_subject priv, pub = _make_keypair() evil_claim = '{"type":"collab","note":"Robert\'); DROP TABLE musehub_attestations;--"}' req = _req("gabriel", "aria-sql", evil_claim, priv, pub) result = await issue_attestation(db_session, req) listed = await get_attestations_for_subject(db_session, "aria-sql") found = next((a for a in listed.attestations if a.attestation_id == result.attestation_id), None) assert found is not None assert "DROP TABLE" in found.claim # stored verbatim, not executed