"""Integration tests for Phase 3: Auth as Code. Tests: - Expired identity returns 401 at auth time - Agent identity scope is propagated to MSignContext - require_scope() grants access to matching-scope agents - require_scope() blocks agents missing the required scope - Human identities (scope=None) bypass scope checks unconditionally - require_scope() blocks with 403 (not 401) on scope failure - 403 response includes the required scope name in detail Run targeted: docker compose exec musehub pytest tests/test_identity_integration.py -v """ from __future__ import annotations import secrets import time from datetime import datetime, timedelta, timezone from unittest.mock import AsyncMock, MagicMock import pytest from fastapi import HTTPException from httpx import AsyncClient from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from sqlalchemy.ext.asyncio import AsyncSession from musehub.auth.dependencies import require_scope as dep_require_scope, TokenClaims from musehub.auth.request_signing import MSignContext, _verify_msign, build_canonical_message, require_scope from muse.core.types import encode_pubkey from musehub.crypto.keys import b64url_encode, key_fingerprint from musehub.db.musehub_auth_models import MusehubAuthKey from musehub.db.musehub_identity_models import MusehubIdentity # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _uid() -> str: return secrets.token_hex(16) def _keypair() -> tuple[Ed25519PrivateKey, bytes]: priv = Ed25519PrivateKey.generate() pub = priv.public_key().public_bytes_raw() return priv, pub def _msign_header( priv: Ed25519PrivateKey, handle: str, method: str, path: str, body: bytes = b"", ts: int | None = None, host: str = "test", ) -> str: ts = ts if ts is not None else int(time.time()) canonical = build_canonical_message(method, path, ts, body, host=host) sig_bytes = priv.sign(canonical) sig_b64 = b64url_encode(sig_bytes) return f'MSign handle="{handle}" alg="ed25519" ts={ts} sig="{sig_b64}"' async def _seed_identity( session: AsyncSession, handle: str, priv: Ed25519PrivateKey, pub: bytes, *, identity_type: str = "human", scope: list[str] | None = None, expires_at: datetime | None = None, ) -> MusehubIdentity: """Create a MusehubIdentity + MusehubAuthKey pair in the test DB.""" identity = MusehubIdentity( identity_id=_uid(), handle=handle, identity_type=identity_type, display_name=handle, scope=scope, expires_at=expires_at, ) session.add(identity) await session.flush() key_row = MusehubAuthKey( key_id=_uid(), identity_id=identity.identity_id, algorithm="ed25519", public_key_b64=encode_pubkey("ed25519", pub), fingerprint=key_fingerprint(pub), label="test-key", ) session.add(key_row) await session.commit() await session.refresh(identity) return identity # --------------------------------------------------------------------------- # 1. Expiry enforcement (E2E via HTTP client) # --------------------------------------------------------------------------- async def test_expired_agent_returns_401(client: AsyncClient, db_session: AsyncSession) -> None: """An expired identity is rejected with 401 regardless of key validity.""" priv, pub = _keypair() handle = f"expired-bot-{_uid()[:8]}" await _seed_identity( db_session, handle, priv, pub, identity_type="agent", expires_at=datetime.now(timezone.utc) - timedelta(hours=1), ) auth = _msign_header(priv, handle, "GET", "/api/repos") resp = await client.get("/api/repos", headers={"Authorization": auth}) assert resp.status_code == 401 assert "expired" in resp.json().get("detail", "").lower() async def test_not_yet_expired_agent_passes_auth(client: AsyncClient, db_session: AsyncSession) -> None: """An agent whose expires_at is in the future passes the auth check.""" priv, pub = _keypair() handle = f"fresh-bot-{_uid()[:8]}" await _seed_identity( db_session, handle, priv, pub, identity_type="agent", scope=["issue:write"], expires_at=datetime.now(timezone.utc) + timedelta(hours=2), ) auth = _msign_header(priv, handle, "GET", "/api/repos") resp = await client.get("/api/repos", headers={"Authorization": auth}) # Any non-401 means the auth check passed (could be 200 or scope-gated 403) assert resp.status_code != 401 async def test_human_without_expiry_passes_auth(client: AsyncClient, db_session: AsyncSession) -> None: """Human identities with no expires_at are not rejected.""" priv, pub = _keypair() handle = f"human-noexp-{_uid()[:8]}" await _seed_identity(db_session, handle, priv, pub, identity_type="human") auth = _msign_header(priv, handle, "GET", "/api/repos") resp = await client.get("/api/repos", headers={"Authorization": auth}) assert resp.status_code != 401 # --------------------------------------------------------------------------- # 2. Scope propagation (service layer — _verify_msign directly) # --------------------------------------------------------------------------- async def test_agent_scope_propagated_to_msign_context(db_session: AsyncSession) -> None: """scope list from MusehubIdentity is set on MSignContext after verification.""" priv, pub = _keypair() handle = f"scoped-agent-{_uid()[:8]}" await _seed_identity( db_session, handle, priv, pub, identity_type="agent", scope=["issue:write", "proposal:write"], ) method = "GET" path = "/test" ts = int(time.time()) canonical = build_canonical_message(method, path, ts, b"", host="") sig_bytes = priv.sign(canonical) sig_b64 = b64url_encode(sig_bytes) auth_header = f'MSign handle="{handle}" alg="ed25519" ts={ts} sig="{sig_b64}"' request = MagicMock() request.headers.get.return_value = auth_header request.method = method request.url.path = path request.url.query = "" request.url.hostname = "" request.url.port = None request.url.scheme = "http" request.body = AsyncMock(return_value=b"") ctx = await _verify_msign(request, db_session, required=True) assert ctx is not None assert ctx.scope == ["issue:write", "proposal:write"] assert ctx.is_agent is True async def test_human_scope_is_none_in_context(db_session: AsyncSession) -> None: """Human identity with no scope column → MSignContext.scope is None.""" priv, pub = _keypair() handle = f"human-noscope-{_uid()[:8]}" await _seed_identity(db_session, handle, priv, pub, identity_type="human") method = "GET" path = "/test" ts = int(time.time()) canonical = build_canonical_message(method, path, ts, b"", host="") sig_bytes = priv.sign(canonical) sig_b64 = b64url_encode(sig_bytes) auth_header = f'MSign handle="{handle}" alg="ed25519" ts={ts} sig="{sig_b64}"' request = MagicMock() request.headers.get.return_value = auth_header request.method = method request.url.path = path request.url.query = "" request.url.hostname = "" request.url.port = None request.url.scheme = "http" request.body = AsyncMock(return_value=b"") ctx = await _verify_msign(request, db_session, required=True) assert ctx is not None assert ctx.scope is None assert ctx.is_agent is False # --------------------------------------------------------------------------- # 3. require_scope() unit logic (pure — no DB needed) # --------------------------------------------------------------------------- async def test_require_scope_human_scope_none_passes() -> None: """require_scope() passes when claims.scope is None (human identity).""" ctx = MSignContext(handle="human", identity_id="x", is_agent=False, is_admin=False, scope=None) inner_dep = dep_require_scope("issue:write") result = await inner_dep(claims=ctx) assert result is ctx async def test_require_scope_agent_matching_scope_passes() -> None: """require_scope() passes when agent scope contains the required value.""" ctx = MSignContext( handle="bot", identity_id="x", is_agent=True, is_admin=False, scope=["issue:write", "proposal:write"], ) inner_dep = dep_require_scope("issue:write") result = await inner_dep(claims=ctx) assert result is ctx async def test_require_scope_agent_missing_scope_raises_403() -> None: """require_scope() raises HTTP 403 when agent scope lacks the required value.""" ctx = MSignContext( handle="bot", identity_id="x", is_agent=True, is_admin=False, scope=["label:read"], ) inner_dep = dep_require_scope("issue:write") with pytest.raises(HTTPException) as exc_info: await inner_dep(claims=ctx) assert exc_info.value.status_code == 403 assert "issue:write" in exc_info.value.detail async def test_require_scope_empty_scope_list_blocks_everything() -> None: """Agent with scope=[] (empty list) is blocked from any scoped operation.""" ctx = MSignContext( handle="bot", identity_id="x", is_agent=True, is_admin=False, scope=[], ) for required in ("issue:write", "proposal:write", "label:read", "release:write"): inner_dep = dep_require_scope(required) with pytest.raises(HTTPException) as exc_info: await inner_dep(claims=ctx) assert exc_info.value.status_code == 403 async def test_require_scope_returns_callable() -> None: """require_scope() factory returns an awaitable callable.""" import inspect dep = require_scope("issue:write") assert inspect.iscoroutinefunction(dep) # --------------------------------------------------------------------------- # 4. Scope enforcement via HTTP (route-level) # --------------------------------------------------------------------------- async def test_agent_missing_issue_write_scope_gets_403_on_issue_create( client: AsyncClient, db_session: AsyncSession ) -> None: """Agent without issue:write scope receives 403 when creating an issue.""" import json as _json from tests.factories import create_repo priv, pub = _keypair() handle = f"bot-no-issue-{_uid()[:8]}" await _seed_identity( db_session, handle, priv, pub, identity_type="agent", scope=["label:read"], # intentionally missing issue:write ) repo = await create_repo(db_session, owner=handle, visibility="public") path = f"/api/repos/{repo.repo_id}/issues" request_body = _json.dumps({"title": "Forbidden", "body": "body"}).encode() auth = _msign_header(priv, handle, "POST", path, body=request_body) resp = await client.post( path, content=request_body, headers={"Authorization": auth, "Content-Type": "application/json"}, ) assert resp.status_code == 403 assert "issue:write" in resp.json().get("detail", "") async def test_agent_with_issue_write_scope_passes_scope_check( client: AsyncClient, db_session: AsyncSession ) -> None: """Agent with issue:write scope is not rejected by scope check on issue creation.""" import json as _json from tests.factories import create_repo priv, pub = _keypair() handle = f"bot-issue-w-{_uid()[:8]}" await _seed_identity( db_session, handle, priv, pub, identity_type="agent", scope=["issue:write", "issue:read"], ) repo = await create_repo(db_session, owner=handle, visibility="public") path = f"/api/repos/{repo.repo_id}/issues" request_body = _json.dumps({"title": "Agent issue", "body": "agent body"}).encode() auth = _msign_header(priv, handle, "POST", path, body=request_body) resp = await client.post( path, content=request_body, headers={"Authorization": auth, "Content-Type": "application/json"}, ) # Scope check passes → 201 created (or a validation error, but NOT 403) assert resp.status_code != 403 async def test_agent_missing_proposal_write_scope_gets_403( client: AsyncClient, db_session: AsyncSession ) -> None: """Agent without proposal:write scope receives 403 when creating a proposal.""" import json as _json from tests.factories import create_repo priv, pub = _keypair() handle = f"bot-no-prop-{_uid()[:8]}" await _seed_identity( db_session, handle, priv, pub, identity_type="agent", scope=["issue:write", "issue:read"], # no proposal:write ) repo = await create_repo(db_session, owner=handle, visibility="public") path = f"/api/repos/{repo.repo_id}/proposals" request_body = _json.dumps({"title": "Test", "from_branch": "feat/x", "to_branch": "dev"}).encode() auth = _msign_header(priv, handle, "POST", path, body=request_body) resp = await client.post( path, content=request_body, headers={"Authorization": auth, "Content-Type": "application/json"}, ) assert resp.status_code == 403 assert "proposal:write" in resp.json().get("detail", "") async def test_human_passes_scope_check_on_issue_create( client: AsyncClient, db_session: AsyncSession ) -> None: """Human identity (scope=None) bypasses scope enforcement and can create issues.""" import json as _json from tests.factories import create_repo priv, pub = _keypair() handle = f"human-full-{_uid()[:8]}" await _seed_identity(db_session, handle, priv, pub, identity_type="human") repo = await create_repo(db_session, owner=handle, visibility="public") path = f"/api/repos/{repo.repo_id}/issues" request_body = _json.dumps({"title": "Human issue", "body": "body"}).encode() auth = _msign_header(priv, handle, "POST", path, body=request_body) resp = await client.post( path, content=request_body, headers={"Authorization": auth, "Content-Type": "application/json"}, ) # Human should not be blocked by scope check assert resp.status_code != 403