gabriel / musehub public

test_identity_integration.py file-level

at sha256:3 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
1 """Integration tests for Phase 3: Auth as Code.
2
3 Tests:
4 - Expired identity returns 401 at auth time
5 - Agent identity scope is propagated to MSignContext
6 - require_scope() grants access to matching-scope agents
7 - require_scope() blocks agents missing the required scope
8 - Human identities (scope=None) bypass scope checks unconditionally
9 - require_scope() blocks with 403 (not 401) on scope failure
10 - 403 response includes the required scope name in detail
11
12 Run targeted:
13 docker compose exec musehub pytest tests/test_identity_integration.py -v
14 """
15 from __future__ import annotations
16
17 import secrets
18 import time
19 from datetime import datetime, timedelta, timezone
20 from unittest.mock import AsyncMock, MagicMock
21
22 import pytest
23 from fastapi import HTTPException
24 from httpx import AsyncClient
25 from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
26 from sqlalchemy.ext.asyncio import AsyncSession
27
28 from musehub.auth.dependencies import require_scope as dep_require_scope, TokenClaims
29 from musehub.auth.request_signing import MSignContext, _verify_msign, build_canonical_message, require_scope
30 from muse.core.types import encode_pubkey
31 from musehub.crypto.keys import b64url_encode, key_fingerprint
32 from musehub.db.musehub_auth_models import MusehubAuthKey
33 from musehub.db.musehub_identity_models import MusehubIdentity
34
35
36 # ---------------------------------------------------------------------------
37 # Helpers
38 # ---------------------------------------------------------------------------
39
40 def _uid() -> str:
41 return secrets.token_hex(16)
42
43
44 def _keypair() -> tuple[Ed25519PrivateKey, bytes]:
45 priv = Ed25519PrivateKey.generate()
46 pub = priv.public_key().public_bytes_raw()
47 return priv, pub
48
49
50 def _msign_header(
51 priv: Ed25519PrivateKey,
52 handle: str,
53 method: str,
54 path: str,
55 body: bytes = b"",
56 ts: int | None = None,
57 host: str = "test",
58 ) -> str:
59 ts = ts if ts is not None else int(time.time())
60 canonical = build_canonical_message(method, path, ts, body, host=host)
61 sig_bytes = priv.sign(canonical)
62 sig_b64 = b64url_encode(sig_bytes)
63 return f'MSign handle="{handle}" alg="ed25519" ts={ts} sig="{sig_b64}"'
64
65
66 async def _seed_identity(
67 session: AsyncSession,
68 handle: str,
69 priv: Ed25519PrivateKey,
70 pub: bytes,
71 *,
72 identity_type: str = "human",
73 scope: list[str] | None = None,
74 expires_at: datetime | None = None,
75 ) -> MusehubIdentity:
76 """Create a MusehubIdentity + MusehubAuthKey pair in the test DB."""
77 identity = MusehubIdentity(
78 identity_id=_uid(),
79 handle=handle,
80 identity_type=identity_type,
81 display_name=handle,
82 scope=scope,
83 expires_at=expires_at,
84 )
85 session.add(identity)
86 await session.flush()
87
88 key_row = MusehubAuthKey(
89 key_id=_uid(),
90 identity_id=identity.identity_id,
91 algorithm="ed25519",
92 public_key_b64=encode_pubkey("ed25519", pub),
93 fingerprint=key_fingerprint(pub),
94 label="test-key",
95 )
96 session.add(key_row)
97 await session.commit()
98 await session.refresh(identity)
99 return identity
100
101
102 # ---------------------------------------------------------------------------
103 # 1. Expiry enforcement (E2E via HTTP client)
104 # ---------------------------------------------------------------------------
105
106
107 async def test_expired_agent_returns_401(client: AsyncClient, db_session: AsyncSession) -> None:
108 """An expired identity is rejected with 401 regardless of key validity."""
109 priv, pub = _keypair()
110 handle = f"expired-bot-{_uid()[:8]}"
111 await _seed_identity(
112 db_session, handle, priv, pub,
113 identity_type="agent",
114 expires_at=datetime.now(timezone.utc) - timedelta(hours=1),
115 )
116 auth = _msign_header(priv, handle, "GET", "/api/repos")
117 resp = await client.get("/api/repos", headers={"Authorization": auth})
118 assert resp.status_code == 401
119 assert "expired" in resp.json().get("detail", "").lower()
120
121
122 async def test_not_yet_expired_agent_passes_auth(client: AsyncClient, db_session: AsyncSession) -> None:
123 """An agent whose expires_at is in the future passes the auth check."""
124 priv, pub = _keypair()
125 handle = f"fresh-bot-{_uid()[:8]}"
126 await _seed_identity(
127 db_session, handle, priv, pub,
128 identity_type="agent",
129 scope=["issue:write"],
130 expires_at=datetime.now(timezone.utc) + timedelta(hours=2),
131 )
132 auth = _msign_header(priv, handle, "GET", "/api/repos")
133 resp = await client.get("/api/repos", headers={"Authorization": auth})
134 # Any non-401 means the auth check passed (could be 200 or scope-gated 403)
135 assert resp.status_code != 401
136
137
138 async def test_human_without_expiry_passes_auth(client: AsyncClient, db_session: AsyncSession) -> None:
139 """Human identities with no expires_at are not rejected."""
140 priv, pub = _keypair()
141 handle = f"human-noexp-{_uid()[:8]}"
142 await _seed_identity(db_session, handle, priv, pub, identity_type="human")
143 auth = _msign_header(priv, handle, "GET", "/api/repos")
144 resp = await client.get("/api/repos", headers={"Authorization": auth})
145 assert resp.status_code != 401
146
147
148 # ---------------------------------------------------------------------------
149 # 2. Scope propagation (service layer — _verify_msign directly)
150 # ---------------------------------------------------------------------------
151
152
153 async def test_agent_scope_propagated_to_msign_context(db_session: AsyncSession) -> None:
154 """scope list from MusehubIdentity is set on MSignContext after verification."""
155 priv, pub = _keypair()
156 handle = f"scoped-agent-{_uid()[:8]}"
157 await _seed_identity(
158 db_session, handle, priv, pub,
159 identity_type="agent",
160 scope=["issue:write", "proposal:write"],
161 )
162
163 method = "GET"
164 path = "/test"
165 ts = int(time.time())
166 canonical = build_canonical_message(method, path, ts, b"", host="")
167 sig_bytes = priv.sign(canonical)
168 sig_b64 = b64url_encode(sig_bytes)
169 auth_header = f'MSign handle="{handle}" alg="ed25519" ts={ts} sig="{sig_b64}"'
170
171 request = MagicMock()
172 request.headers.get.return_value = auth_header
173 request.method = method
174 request.url.path = path
175 request.url.query = ""
176 request.url.hostname = ""
177 request.url.port = None
178 request.url.scheme = "http"
179 request.body = AsyncMock(return_value=b"")
180
181 ctx = await _verify_msign(request, db_session, required=True)
182 assert ctx is not None
183 assert ctx.scope == ["issue:write", "proposal:write"]
184 assert ctx.is_agent is True
185
186
187 async def test_human_scope_is_none_in_context(db_session: AsyncSession) -> None:
188 """Human identity with no scope column → MSignContext.scope is None."""
189 priv, pub = _keypair()
190 handle = f"human-noscope-{_uid()[:8]}"
191 await _seed_identity(db_session, handle, priv, pub, identity_type="human")
192
193 method = "GET"
194 path = "/test"
195 ts = int(time.time())
196 canonical = build_canonical_message(method, path, ts, b"", host="")
197 sig_bytes = priv.sign(canonical)
198 sig_b64 = b64url_encode(sig_bytes)
199 auth_header = f'MSign handle="{handle}" alg="ed25519" ts={ts} sig="{sig_b64}"'
200
201 request = MagicMock()
202 request.headers.get.return_value = auth_header
203 request.method = method
204 request.url.path = path
205 request.url.query = ""
206 request.url.hostname = ""
207 request.url.port = None
208 request.url.scheme = "http"
209 request.body = AsyncMock(return_value=b"")
210
211 ctx = await _verify_msign(request, db_session, required=True)
212 assert ctx is not None
213 assert ctx.scope is None
214 assert ctx.is_agent is False
215
216
217 # ---------------------------------------------------------------------------
218 # 3. require_scope() unit logic (pure — no DB needed)
219 # ---------------------------------------------------------------------------
220
221
222 async def test_require_scope_human_scope_none_passes() -> None:
223 """require_scope() passes when claims.scope is None (human identity)."""
224 ctx = MSignContext(handle="human", identity_id="x", is_agent=False, is_admin=False, scope=None)
225 inner_dep = dep_require_scope("issue:write")
226 result = await inner_dep(claims=ctx)
227 assert result is ctx
228
229
230 async def test_require_scope_agent_matching_scope_passes() -> None:
231 """require_scope() passes when agent scope contains the required value."""
232 ctx = MSignContext(
233 handle="bot", identity_id="x", is_agent=True, is_admin=False,
234 scope=["issue:write", "proposal:write"],
235 )
236 inner_dep = dep_require_scope("issue:write")
237 result = await inner_dep(claims=ctx)
238 assert result is ctx
239
240
241 async def test_require_scope_agent_missing_scope_raises_403() -> None:
242 """require_scope() raises HTTP 403 when agent scope lacks the required value."""
243 ctx = MSignContext(
244 handle="bot", identity_id="x", is_agent=True, is_admin=False,
245 scope=["label:read"],
246 )
247 inner_dep = dep_require_scope("issue:write")
248 with pytest.raises(HTTPException) as exc_info:
249 await inner_dep(claims=ctx)
250 assert exc_info.value.status_code == 403
251 assert "issue:write" in exc_info.value.detail
252
253
254 async def test_require_scope_empty_scope_list_blocks_everything() -> None:
255 """Agent with scope=[] (empty list) is blocked from any scoped operation."""
256 ctx = MSignContext(
257 handle="bot", identity_id="x", is_agent=True, is_admin=False,
258 scope=[],
259 )
260 for required in ("issue:write", "proposal:write", "label:read", "release:write"):
261 inner_dep = dep_require_scope(required)
262 with pytest.raises(HTTPException) as exc_info:
263 await inner_dep(claims=ctx)
264 assert exc_info.value.status_code == 403
265
266
267 async def test_require_scope_returns_callable() -> None:
268 """require_scope() factory returns an awaitable callable."""
269 import inspect
270 dep = require_scope("issue:write")
271 assert inspect.iscoroutinefunction(dep)
272
273
274 # ---------------------------------------------------------------------------
275 # 4. Scope enforcement via HTTP (route-level)
276 # ---------------------------------------------------------------------------
277
278
279 async def test_agent_missing_issue_write_scope_gets_403_on_issue_create(
280 client: AsyncClient, db_session: AsyncSession
281 ) -> None:
282 """Agent without issue:write scope receives 403 when creating an issue."""
283 import json as _json
284 from tests.factories import create_repo
285
286 priv, pub = _keypair()
287 handle = f"bot-no-issue-{_uid()[:8]}"
288 await _seed_identity(
289 db_session, handle, priv, pub,
290 identity_type="agent",
291 scope=["label:read"], # intentionally missing issue:write
292 )
293 repo = await create_repo(db_session, owner=handle, visibility="public")
294
295 path = f"/api/repos/{repo.repo_id}/issues"
296 request_body = _json.dumps({"title": "Forbidden", "body": "body"}).encode()
297 auth = _msign_header(priv, handle, "POST", path, body=request_body)
298 resp = await client.post(
299 path,
300 content=request_body,
301 headers={"Authorization": auth, "Content-Type": "application/json"},
302 )
303 assert resp.status_code == 403
304 assert "issue:write" in resp.json().get("detail", "")
305
306
307 async def test_agent_with_issue_write_scope_passes_scope_check(
308 client: AsyncClient, db_session: AsyncSession
309 ) -> None:
310 """Agent with issue:write scope is not rejected by scope check on issue creation."""
311 import json as _json
312 from tests.factories import create_repo
313
314 priv, pub = _keypair()
315 handle = f"bot-issue-w-{_uid()[:8]}"
316 await _seed_identity(
317 db_session, handle, priv, pub,
318 identity_type="agent",
319 scope=["issue:write", "issue:read"],
320 )
321 repo = await create_repo(db_session, owner=handle, visibility="public")
322
323 path = f"/api/repos/{repo.repo_id}/issues"
324 request_body = _json.dumps({"title": "Agent issue", "body": "agent body"}).encode()
325 auth = _msign_header(priv, handle, "POST", path, body=request_body)
326 resp = await client.post(
327 path,
328 content=request_body,
329 headers={"Authorization": auth, "Content-Type": "application/json"},
330 )
331 # Scope check passes → 201 created (or a validation error, but NOT 403)
332 assert resp.status_code != 403
333
334
335 async def test_agent_missing_proposal_write_scope_gets_403(
336 client: AsyncClient, db_session: AsyncSession
337 ) -> None:
338 """Agent without proposal:write scope receives 403 when creating a proposal."""
339 import json as _json
340 from tests.factories import create_repo
341
342 priv, pub = _keypair()
343 handle = f"bot-no-prop-{_uid()[:8]}"
344 await _seed_identity(
345 db_session, handle, priv, pub,
346 identity_type="agent",
347 scope=["issue:write", "issue:read"], # no proposal:write
348 )
349 repo = await create_repo(db_session, owner=handle, visibility="public")
350
351 path = f"/api/repos/{repo.repo_id}/proposals"
352 request_body = _json.dumps({"title": "Test", "from_branch": "feat/x", "to_branch": "dev"}).encode()
353 auth = _msign_header(priv, handle, "POST", path, body=request_body)
354 resp = await client.post(
355 path,
356 content=request_body,
357 headers={"Authorization": auth, "Content-Type": "application/json"},
358 )
359 assert resp.status_code == 403
360 assert "proposal:write" in resp.json().get("detail", "")
361
362
363 async def test_human_passes_scope_check_on_issue_create(
364 client: AsyncClient, db_session: AsyncSession
365 ) -> None:
366 """Human identity (scope=None) bypasses scope enforcement and can create issues."""
367 import json as _json
368 from tests.factories import create_repo
369
370 priv, pub = _keypair()
371 handle = f"human-full-{_uid()[:8]}"
372 await _seed_identity(db_session, handle, priv, pub, identity_type="human")
373 repo = await create_repo(db_session, owner=handle, visibility="public")
374
375 path = f"/api/repos/{repo.repo_id}/issues"
376 request_body = _json.dumps({"title": "Human issue", "body": "body"}).encode()
377 auth = _msign_header(priv, handle, "POST", path, body=request_body)
378 resp = await client.post(
379 path,
380 content=request_body,
381 headers={"Authorization": auth, "Content-Type": "application/json"},
382 )
383 # Human should not be blocked by scope check
384 assert resp.status_code != 403