test_identity_admin_separation.py
python
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923
fix(issues): use issue number as pagination cursor, not cre…
Sonnet 4.6
patch
8 days ago
| 1 | """TDD — identity_type and admin role must be separate concerns. |
| 2 | |
| 3 | Design invariants |
| 4 | ----------------- |
| 5 | identity_type ∈ {"human", "agent", "org"} |
| 6 | Describes what kind of entity an identity IS. |
| 7 | Set at registration and immutable thereafter. |
| 8 | "org" identities are created via POST /api/orgs, not self-registration. |
| 9 | |
| 10 | is_admin (bool column on musehub_identities) |
| 11 | A MuseHub-level privilege flag — describes what a human CAN DO on the hub. |
| 12 | Defaults False. Set only by an out-of-band operator action (migration or |
| 13 | a future admin-grant endpoint), never by self-registration. |
| 14 | |
| 15 | These two concepts must never be conflated. Specifically: |
| 16 | - identity_type="admin" must not be a valid value. |
| 17 | - Self-registration (POST /api/auth/verify) must reject identity_type="admin". |
| 18 | - MSignContext.is_admin must derive from the is_admin column, not identity_type. |
| 19 | - Admin-gated endpoints must check the column, not the type string. |
| 20 | |
| 21 | Tests |
| 22 | ----- |
| 23 | R1 Valid self-registration identity_types — "human" and "agent" are accepted. |
| 24 | R2 "admin" identity_type rejected — POST /api/auth/verify with identity_type="admin" |
| 25 | must return a validation error. |
| 26 | R3 is_admin column exists on MusehubIdentity — ORM model has the field. |
| 27 | R4 MSignContext.is_admin derives from is_admin column — a human identity with |
| 28 | is_admin=True produces is_admin=True in context, not because of identity_type. |
| 29 | R5 identity_type="admin" does NOT grant is_admin — old conflation no longer works. |
| 30 | R6 Admin domain endpoints accept human with is_admin=True. |
| 31 | R7 Admin domain endpoints reject human with is_admin=False. |
| 32 | """ |
| 33 | from __future__ import annotations |
| 34 | |
| 35 | import pytest |
| 36 | from httpx import AsyncClient |
| 37 | from sqlalchemy.ext.asyncio import AsyncSession |
| 38 | |
| 39 | from musehub.auth.request_signing import MSignContext |
| 40 | from musehub.db.musehub_identity_models import MusehubIdentity |
| 41 | from musehub.db.musehub_domain_models import MusehubDomain |
| 42 | from musehub.main import app |
| 43 | from musehub.auth.request_signing import require_signed_request |
| 44 | |
| 45 | |
| 46 | # --------------------------------------------------------------------------- |
| 47 | # Helpers |
| 48 | # --------------------------------------------------------------------------- |
| 49 | |
def _make_identity(
    handle: str,
    identity_type: str = "human",
    is_admin: bool = False,
) -> MusehubIdentity:
    """Build an unsaved ``MusehubIdentity`` whose id is derived from *handle*.

    The handle is right-padded with ``"0"`` to 64 characters (and cut to 64
    if longer); that string is passed to ``long_id`` to obtain ``identity_id``.
    The object is only constructed here — adding it to a session is left to
    the caller.
    """
    from muse.core.types import long_id

    # Pad-then-truncate so the seed handed to long_id is always 64 chars.
    seed = (handle + "0" * 64)[:64]
    return MusehubIdentity(
        identity_id=long_id(seed),
        handle=handle,
        identity_type=identity_type,
        is_admin=is_admin,
    )
| 63 | |
| 64 | |
| 65 | # --------------------------------------------------------------------------- |
| 66 | # R3 — is_admin column exists on MusehubIdentity |
| 67 | # --------------------------------------------------------------------------- |
| 68 | |
def test_r3_is_admin_column_exists_on_orm_model() -> None:
    """R3: the MusehubIdentity ORM model exposes an ``is_admin`` attribute.

    Constructs a model instance with ``is_admin=False`` passed explicitly and
    checks that the attribute exists and holds exactly ``False``.
    """
    fields = {
        "identity_id": "sha256:" + "a" * 64,
        "handle": "testuser",
        "identity_type": "human",
        "is_admin": False,
    }
    row = MusehubIdentity(**fields)

    assert hasattr(row, "is_admin")
    assert row.is_admin is False
| 79 | |
| 80 | |
def test_r3_is_admin_defaults_false() -> None:
    """R3: leaving ``is_admin`` out at construction must yield ``False``.

    A False default guarantees that rows created without the flag are never
    accidentally elevated to admin.
    """
    identity_id = "sha256:" + "b" * 64
    row = MusehubIdentity(
        identity_id=identity_id,
        handle="testuser2",
        identity_type="human",
    )

    # NOTE(review): the check runs on the in-memory object, before any flush,
    # so it relies on the model applying its default at construction time
    # rather than only at INSERT — confirm against the model definition.
    assert row.is_admin is False
| 89 | |
| 90 | |
| 91 | # --------------------------------------------------------------------------- |
| 92 | # R4 — MSignContext.is_admin reads the column, not identity_type |
| 93 | # --------------------------------------------------------------------------- |
| 94 | |
def test_r4_msign_context_is_admin_from_column() -> None:
    """R4: a human row with ``is_admin=True`` yields an admin MSignContext.

    The context is assembled the way the correct derivation does it: the
    admin flag comes from ``identity.is_admin`` (the column), never from
    ``identity_type == 'admin'`` (the old, broken derivation). The agent flag
    is still derived from the type string.
    """
    admin_human = MusehubIdentity(
        identity_id="sha256:" + "c" * 64,
        handle="admin-human",
        identity_type="human",
        is_admin=True,
    )

    # The type string answers "is this an agent?" and nothing else.
    agent_flag = admin_human.identity_type == "agent"

    ctx = MSignContext(
        handle=admin_human.handle,
        identity_id=admin_human.identity_id,
        is_agent=agent_flag,
        is_admin=admin_human.is_admin,  # privilege comes from the column
        scope=admin_human.scope,
    )

    assert ctx.is_admin is True
    assert ctx.is_agent is False
| 117 | |
| 118 | |
def test_r4_human_identity_type_without_is_admin_flag_is_not_admin() -> None:
    """R4 (negative): a human row with ``is_admin=False`` is not admin in context."""
    plain_human = MusehubIdentity(
        identity_id="sha256:" + "d" * 64,
        handle="plain-human",
        identity_type="human",
        is_admin=False,
    )

    context_kwargs = {
        "handle": plain_human.handle,
        "identity_id": plain_human.identity_id,
        "is_agent": False,
        "is_admin": plain_human.is_admin,  # copied straight from the column
        "scope": plain_human.scope,
    }
    ctx = MSignContext(**context_kwargs)

    assert ctx.is_admin is False
| 135 | |
| 136 | |
| 137 | # --------------------------------------------------------------------------- |
| 138 | # R5 — identity_type="admin" no longer grants is_admin |
| 139 | # --------------------------------------------------------------------------- |
| 140 | |
def test_r5_identity_type_admin_string_does_not_grant_is_admin() -> None:
    """R5: the legacy type string ``identity_type == 'admin'`` grants nothing.

    Simulates a stale, pre-migration row: ``identity_type`` still carries the
    invalid value ``"admin"`` while the ``is_admin`` column is False. A context
    built from the column must NOT be admin, even though the old derivation
    (``is_admin = identity_type == "admin"``) would have said otherwise.
    """
    # Stale data: type string says "admin" (bad), privilege column says False.
    # NOTE(review): this builds the ORM object in memory only; it assumes the
    # model does not reject unknown identity_type values at construction —
    # confirm against MusehubIdentity.
    stale = MusehubIdentity(
        identity_id="sha256:" + "e" * 64,
        handle="stale-admin",
        identity_type="admin",
        is_admin=False,
    )

    # Precondition: the OLD derivation would evaluate to True for this row,
    # so the two derivations genuinely disagree and the test is meaningful.
    assert stale.identity_type == "admin"

    ctx = MSignContext(
        handle=stale.handle,
        identity_id=stale.identity_id,
        is_agent=False,
        is_admin=stale.is_admin,  # column, not type string
        scope=None,
    )

    assert ctx.is_admin is False
| 165 | |
| 166 | |
| 167 | # --------------------------------------------------------------------------- |
| 168 | # R1 — Valid self-registration identity_types accepted by VerifyRequest |
| 169 | # --------------------------------------------------------------------------- |
| 170 | |
@pytest.mark.parametrize("itype", ["human", "agent"])
def test_r1_valid_self_registration_identity_types_accepted(itype: str) -> None:
    """R1: ``VerifyRequest`` accepts both self-registration types.

    Runs once for "human" and once for "agent". "org" is deliberately absent:
    org identities are created via POST /api/orgs, not /api/auth/verify.
    """
    from musehub.models.musehub_auth import VerifyRequest

    payload = {
        "challenge_token": "a" * 64,
        "public_key_b64": "dGVzdA==",
        "signature_b64": "dGVzdA==",
        "identity_type": itype,
    }
    request = VerifyRequest(**payload)

    assert request.identity_type == itype
| 185 | |
| 186 | |
| 187 | # --------------------------------------------------------------------------- |
| 188 | # R2 — "admin" identity_type rejected by VerifyRequest |
| 189 | # --------------------------------------------------------------------------- |
| 190 | |
def test_r2_admin_identity_type_rejected_by_verify_request() -> None:
    """R2: ``VerifyRequest`` refuses ``identity_type='admin'``.

    Admin is an operational privilege granted through a separate flow, so a
    registering user must never be able to claim it. Construction has to raise
    a pydantic ``ValidationError`` whose error list mentions ``identity_type``.
    """
    from pydantic import ValidationError
    from musehub.models.musehub_auth import VerifyRequest

    payload = {
        "challenge_token": "a" * 64,
        "public_key_b64": "dGVzdA==",
        "signature_b64": "dGVzdA==",
        "identity_type": "admin",
    }
    with pytest.raises(ValidationError) as exc_info:
        VerifyRequest(**payload)

    errors = exc_info.value.errors()
    # Match on the stringified error entry, so the field name may appear in
    # any part of it.
    offending = [e for e in errors if "identity_type" in str(e)]
    assert bool(offending), (
        f"Expected validation error on identity_type, got: {errors}"
    )
| 211 | |
| 212 | |
def test_r2_arbitrary_identity_type_rejected() -> None:
    """R2: unknown type strings such as 'superuser' fail ``VerifyRequest`` validation."""
    from pydantic import ValidationError
    from musehub.models.musehub_auth import VerifyRequest

    payload = {
        "challenge_token": "a" * 64,
        "public_key_b64": "dGVzdA==",
        "signature_b64": "dGVzdA==",
        "identity_type": "superuser",
    }
    with pytest.raises(ValidationError):
        VerifyRequest(**payload)
| 225 | |
| 226 | |
| 227 | # --------------------------------------------------------------------------- |
| 228 | # R6 — Human with is_admin=True can call domain verify endpoint |
| 229 | # --------------------------------------------------------------------------- |
| 230 | |
@pytest.mark.asyncio
async def test_r6_human_with_is_admin_true_can_verify_domain(
    client: AsyncClient,
    db_session: AsyncSession,
) -> None:
    """R6: a human identity flagged ``is_admin=True`` may verify a domain.

    Seeds an admin human identity and an unverified domain, replaces the
    ``require_signed_request`` dependency with one returning an admin
    ``MSignContext``, then POSTs to the domain verify endpoint and expects
    HTTP 200. The override is removed again whether or not the assertion
    passes.
    """
    handle = "r6-admin-human"

    # --- seed: the acting identity and the domain to be verified ----------
    admin_identity = _make_identity(handle, identity_type="human", is_admin=True)
    db_session.add(admin_identity)

    target_domain = MusehubDomain(
        domain_id="sha256:" + "f" * 64,
        author_slug="r6-target",
        slug="r6domain",
        display_name="R6 Domain",
        is_verified=False,
    )
    db_session.add(target_domain)
    await db_session.commit()

    # --- act as the admin without going through real request signing ------
    admin_ctx = MSignContext(
        handle=handle,
        identity_id=admin_identity.identity_id,
        is_agent=False,
        is_admin=True,
    )

    def _signed_as_admin():
        return admin_ctx

    app.dependency_overrides[require_signed_request] = _signed_as_admin
    try:
        response = await client.post(
            "/api/domains/@r6-target/r6domain/verify",
            headers={"Content-Type": "application/json"},
        )
        assert response.status_code == 200, (
            f"Expected 200, got {response.status_code}: {response.text}"
        )
    finally:
        # Always drop the override so it cannot leak into other tests.
        app.dependency_overrides.pop(require_signed_request, None)
| 266 | |
| 267 | |
| 268 | # --------------------------------------------------------------------------- |
| 269 | # R7 — Human with is_admin=False is blocked from domain verify endpoint |
| 270 | # --------------------------------------------------------------------------- |
| 271 | |
@pytest.mark.asyncio
async def test_r7_human_with_is_admin_false_cannot_verify_domain(
    client: AsyncClient,
    db_session: AsyncSession,
) -> None:
    """R7: a human identity with ``is_admin=False`` (the default) is refused.

    Seeds a non-admin human identity AND the target domain, replaces the
    ``require_signed_request`` dependency with one returning a non-admin
    ``MSignContext``, then POSTs to the domain verify endpoint and expects
    HTTP 403.

    The domain is created on purpose: with an existing, unverified target the
    only thing separating this request from the R6 success case is the admin
    flag, so the 403 can only come from the admin gate and does not depend on
    whether the endpoint checks privileges before or after looking the domain
    up.
    """
    handle = "r7-nonadmin"
    identity = _make_identity(handle, identity_type="human", is_admin=False)
    db_session.add(identity)

    # Mirror the R6 fixture so the request targets a real, unverified domain.
    domain = MusehubDomain(
        domain_id="sha256:" + "7" * 64,
        author_slug="r7-target",
        slug="r7domain",
        display_name="R7 Domain",
        is_verified=False,
    )
    db_session.add(domain)
    await db_session.commit()

    non_admin_ctx = MSignContext(
        handle=handle,
        identity_id=identity.identity_id,
        is_agent=False,
        is_admin=False,
    )
    app.dependency_overrides[require_signed_request] = lambda: non_admin_ctx
    try:
        r = await client.post(
            "/api/domains/@r7-target/r7domain/verify",
            headers={"Content-Type": "application/json"},
        )
        assert r.status_code == 403, f"Expected 403, got {r.status_code}: {r.text}"
    finally:
        # Always drop the override so it cannot leak into other tests.
        app.dependency_overrides.pop(require_signed_request, None)
File History
1 commit
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923
fix(issues): use issue number as pagination cursor, not cre…
Sonnet 4.6
patch
8 days ago