"""Section 30 — Sessions: 7-layer test suite. Covers: musehub/services/musehub_sessions.py — _to_response, upsert_session, list_sessions, get_session musehub/services/musehub_repository.py — create_session, stop_session, list_sessions, get_session musehub/api/routes/musehub/repos.py — POST/GET/GET/{id}/POST/{id}/stop endpoints musehub/db/musehub_models.py — MusehubSession ORM model musehub/models/musehub.py — SessionCreate, SessionStop, SessionResponse HTTP routes (all under /api): POST /repos/{repo_id}/sessions → create_session (201, auth required) GET /repos/{repo_id}/sessions → list_sessions (200, optional auth) GET /repos/{repo_id}/sessions/{sid} → get_session (200, optional auth) POST /repos/{repo_id}/sessions/{sid}/stop → stop_session (200, auth required) """ from __future__ import annotations import secrets import time from datetime import datetime, timezone import pytest from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import long_id from musehub.core.genesis import compute_identity_id, compute_repo_id, compute_session_id from musehub.db.musehub_repo_models import MusehubRepo, MusehubSession from musehub.models.musehub import SessionCreate, SessionResponse, SessionStop from musehub.services import musehub_repository, musehub_sessions from musehub.types.json_types import StrDict # ── helpers ─────────────────────────────────────────────────────────────────── def _uid() -> str: return secrets.token_hex(16) def _now() -> datetime: return datetime.now(tz=timezone.utc) async def _db_repo(session: AsyncSession, *, visibility: str = "public") -> MusehubRepo: slug = f"sess-repo-{_uid()[:8]}" owner_id = compute_identity_id(b"testuser") created_at = datetime.now(tz=timezone.utc) repo = MusehubRepo( repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()), name=slug, slug=slug, owner="testuser", owner_user_id=owner_id, visibility=visibility, created_at=created_at, updated_at=created_at, ) session.add(repo) await session.flush() return repo async def _db_session( session: AsyncSession, repo_id: str, *, is_active: bool = True, participants: list[str] | None = None, ) -> MusehubSession: started_at = _now() author_id = compute_identity_id(b"testuser") s = MusehubSession( session_id=compute_session_id(repo_id, author_id, started_at.isoformat()), repo_id=repo_id, started_at=started_at, participants=participants or [], location="Test Studio", intent="test intent", is_active=is_active, ) session.add(s) await session.flush() return s # ═══════════════════════════════════════════════════════════════════════════════ # Layer 1 — Unit # ═══════════════════════════════════════════════════════════════════════════════ class TestUnitSessions: """Pure logic tests — no DB, no HTTP.""" def test_session_create_defaults(self) -> None: sc = SessionCreate() assert sc.participants == [] assert sc.intent == "" assert sc.location == "" assert sc.started_at is None assert sc.is_active is True def test_session_create_with_data(self) -> None: t = _now() sc = SessionCreate( started_at=t, participants=["alice", "bob"], intent="Write the chorus", location="Abbey Road", ) assert sc.participants == ["alice", "bob"] assert sc.started_at == t def test_session_stop_defaults(self) -> None: ss = SessionStop() assert ss.ended_at is None def test_session_stop_with_time(self) -> None: t = _now() ss = SessionStop(ended_at=t) assert ss.ended_at == t def test_session_response_fields(self) -> None: t = _now() sid = compute_session_id(long_id("a" * 64), compute_identity_id(b"carol"), t.isoformat()) sr = SessionResponse( session_id=sid, started_at=t, ended_at=None, duration_seconds=None, participants=["carol"], commits=[], notes="", intent="jam", location="studio", is_active=True, created_at=t, ) assert sr.session_id == sid assert sr.is_active is True assert sr.duration_seconds is None def test_session_response_duration_none_when_active(self) -> None: t = _now() sid = compute_session_id(long_id("b" * 64), compute_identity_id(b"testuser"), t.isoformat()) sr = SessionResponse( session_id=sid, started_at=t, ended_at=None, duration_seconds=None, participants=[], commits=[], notes="", intent="", location="", is_active=True, created_at=t, ) assert sr.duration_seconds is None def test_session_response_commits_default_empty(self) -> None: t = _now() sid = compute_session_id(long_id("c" * 64), compute_identity_id(b"testuser"), t.isoformat()) sr = SessionResponse( session_id=sid, started_at=t, participants=[], commits=[], notes="", intent="", location="", is_active=False, created_at=t, ) assert sr.commits == [] # ═══════════════════════════════════════════════════════════════════════════════ # Layer 2 — Integration # ═══════════════════════════════════════════════════════════════════════════════ class TestIntegrationSessionService: """Real DB, service-layer calls.""" async def test_create_session_returns_response(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session) await db_session.commit() resp = await musehub_repository.create_session( db_session, repo.repo_id, started_at=None, participants=["alice"], intent="write", location="studio", ) assert isinstance(resp, SessionResponse) assert resp.is_active is True assert resp.session_id is not None async def test_create_session_uses_provided_started_at(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session) await db_session.commit() t = _now() resp = await musehub_repository.create_session( db_session, repo.repo_id, started_at=t, participants=[], intent="", location="", ) # started_at stored as UTC; compare without tz assert resp.started_at.replace(tzinfo=None) == t.replace(tzinfo=None) async def test_list_sessions_empty(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session) await db_session.commit() sessions, total, _ = await musehub_repository.list_sessions(db_session, repo.repo_id) assert sessions == [] assert total == 0 async def test_list_sessions_returns_all(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session) await _db_session(db_session, repo.repo_id) await _db_session(db_session, repo.repo_id) await db_session.commit() sessions, total, _ = await musehub_repository.list_sessions(db_session, repo.repo_id) assert total == 2 assert len(sessions) == 2 async def test_list_sessions_limit(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session) for _ in range(5): await _db_session(db_session, repo.repo_id) await db_session.commit() sessions, total, _ = await musehub_repository.list_sessions(db_session, repo.repo_id, limit=3) assert total == 5 assert len(sessions) == 3 async def test_get_session_found(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session) sess = await _db_session(db_session, repo.repo_id) await db_session.commit() result = await musehub_repository.get_session(db_session, repo.repo_id, sess.session_id) assert result is not None assert result.session_id == sess.session_id async def test_get_session_not_found(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session) await db_session.commit() result = await musehub_repository.get_session(db_session, repo.repo_id, "nonexistent") assert result is None async def test_get_session_wrong_repo_returns_none(self, db_session: AsyncSession) -> None: repo1 = await _db_repo(db_session) repo2 = await _db_repo(db_session) sess = await _db_session(db_session, repo1.repo_id) await db_session.commit() result = await musehub_repository.get_session(db_session, repo2.repo_id, sess.session_id) assert result is None async def test_stop_session_marks_ended(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session) sess = await _db_session(db_session, repo.repo_id, is_active=True) await db_session.commit() result = await musehub_repository.stop_session( db_session, repo.repo_id, sess.session_id, ended_at=None ) assert result.is_active is False assert result.ended_at is not None async def test_stop_session_not_found_returns_none(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session) await db_session.commit() result = await musehub_repository.stop_session( db_session, repo.repo_id, "nonexistent-id", ended_at=None ) assert result is None async def test_stop_session_idempotent(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session) sess = await _db_session(db_session, repo.repo_id, is_active=True) await db_session.commit() t = _now() await musehub_repository.stop_session( db_session, repo.repo_id, sess.session_id, ended_at=t ) # stop again — is_active already False, should not change ended_at result2 = await musehub_repository.stop_session( db_session, repo.repo_id, sess.session_id, ended_at=None ) assert result2.is_active is False async def test_stop_session_duration_computed(self, db_session: AsyncSession) -> None: from datetime import timedelta repo = await _db_repo(db_session) started = datetime(2025, 1, 1, 10, 0, 0) ended = datetime(2025, 1, 1, 11, 30, 0) author_id = compute_identity_id(b"testuser") sess = MusehubSession( session_id=compute_session_id(repo.repo_id, author_id, started.isoformat()), repo_id=repo.repo_id, started_at=started, participants=[], location="", intent="", is_active=True, ) db_session.add(sess) await db_session.commit() result = await musehub_repository.stop_session( db_session, repo.repo_id, sess.session_id, ended_at=ended ) assert result.duration_seconds == 5400.0 # 90 minutes async def test_musehub_sessions_service_upsert(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session) await db_session.commit() sc = SessionCreate(participants=["dave"], intent="jam", location="garage") resp = await musehub_sessions.upsert_session(db_session, repo.repo_id, sc) assert resp.is_active is True assert resp.participants == ["dave"] async def test_musehub_sessions_service_list(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session) await _db_session(db_session, repo.repo_id) await db_session.commit() sessions, total, _ = await musehub_sessions.list_sessions(db_session, repo.repo_id) assert total == 1 assert len(sessions) == 1 async def test_musehub_sessions_service_get(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session) sess = await _db_session(db_session, repo.repo_id) await db_session.commit() result = await musehub_sessions.get_session(db_session, repo.repo_id, sess.session_id) assert result is not None assert result.session_id == sess.session_id async def test_musehub_sessions_service_get_missing(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session) await db_session.commit() result = await musehub_sessions.get_session(db_session, repo.repo_id, "bad-id") assert result is None # ═══════════════════════════════════════════════════════════════════════════════ # Layer 3 — End-to-End # ═══════════════════════════════════════════════════════════════════════════════ class TestE2ESessions: """Full HTTP stack via AsyncClient.""" async def test_create_session_201( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: repo = await _db_repo(db_session) await db_session.commit() resp = await client.post( f"/api/repos/{repo.repo_id}/sessions", json={"participants": ["alice"], "intent": "compose", "location": "home"}, headers=auth_headers, ) assert resp.status_code == 201 data = resp.json() assert data["isActive"] is True assert "sessionId" in data async def test_create_session_repo_not_found( self, client: AsyncClient, auth_headers: StrDict ) -> None: resp = await client.post( "/api/repos/nonexistent/sessions", json={}, headers=auth_headers, ) assert resp.status_code == 404 async def test_list_sessions_empty( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session) await db_session.commit() resp = await client.get(f"/api/repos/{repo.repo_id}/sessions") assert resp.status_code == 200 data = resp.json() assert data["total"] == 0 assert data["sessions"] == [] async def test_list_sessions_returns_created( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: repo = await _db_repo(db_session) await db_session.commit() await client.post( f"/api/repos/{repo.repo_id}/sessions", json={"participants": ["bob"], "intent": "record", "location": "studio"}, headers=auth_headers, ) resp = await client.get(f"/api/repos/{repo.repo_id}/sessions") assert resp.status_code == 200 assert resp.json()["total"] == 1 async def test_list_sessions_limit_param( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session) for _ in range(5): await _db_session(db_session, repo.repo_id) await db_session.commit() resp = await client.get(f"/api/repos/{repo.repo_id}/sessions?limit=3") assert resp.status_code == 200 data = resp.json() assert data["total"] == 5 assert len(data["sessions"]) == 3 async def test_get_session_200( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session) sess = await _db_session(db_session, repo.repo_id) await db_session.commit() resp = await client.get(f"/api/repos/{repo.repo_id}/sessions/{sess.session_id}") assert resp.status_code == 200 data = resp.json() assert data["sessionId"] == sess.session_id async def test_get_session_not_found( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session) await db_session.commit() resp = await client.get(f"/api/repos/{repo.repo_id}/sessions/nonexistent") assert resp.status_code == 404 async def test_stop_session_200( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: repo = await _db_repo(db_session) await db_session.commit() # create via HTTP create_resp = await client.post( f"/api/repos/{repo.repo_id}/sessions", json={"intent": "test"}, headers=auth_headers, ) assert create_resp.status_code == 201 session_id = create_resp.json()["sessionId"] # stop it stop_resp = await client.post( f"/api/repos/{repo.repo_id}/sessions/{session_id}/stop", json={}, headers=auth_headers, ) assert stop_resp.status_code == 200 data = stop_resp.json() assert data["isActive"] is False assert data["endedAt"] is not None async def test_stop_session_not_found( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: repo = await _db_repo(db_session) await db_session.commit() resp = await client.post( f"/api/repos/{repo.repo_id}/sessions/nonexistent/stop", json={}, headers=auth_headers, ) assert resp.status_code == 404 async def test_stop_session_with_explicit_ended_at( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: repo = await _db_repo(db_session) await db_session.commit() create_resp = await client.post( f"/api/repos/{repo.repo_id}/sessions", json={}, headers=auth_headers, ) session_id = create_resp.json()["sessionId"] ended_at = "2025-06-01T12:00:00+00:00" stop_resp = await client.post( f"/api/repos/{repo.repo_id}/sessions/{session_id}/stop", json={"endedAt": ended_at}, headers=auth_headers, ) assert stop_resp.status_code == 200 assert stop_resp.json()["isActive"] is False async def test_list_sessions_invalid_limit( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session) await db_session.commit() resp = await client.get(f"/api/repos/{repo.repo_id}/sessions?limit=999") assert resp.status_code == 422 async def test_list_sessions_repo_not_found(self, client: AsyncClient) -> None: resp = await client.get("/api/repos/nonexistent/sessions") assert resp.status_code == 404 # ═══════════════════════════════════════════════════════════════════════════════ # Layer 4 — Stress # ═══════════════════════════════════════════════════════════════════════════════ class TestStressSessions: async def test_create_many_sessions( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: repo = await _db_repo(db_session) await db_session.commit() n = 15 for _ in range(n): resp = await client.post( f"/api/repos/{repo.repo_id}/sessions", json={"intent": "batch"}, headers=auth_headers, ) assert resp.status_code == 201 list_resp = await client.get(f"/api/repos/{repo.repo_id}/sessions?limit=200") assert list_resp.json()["total"] == n async def test_list_sessions_large_repo( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session) for _ in range(60): await _db_session(db_session, repo.repo_id) await db_session.commit() resp = await client.get(f"/api/repos/{repo.repo_id}/sessions") data = resp.json() assert data["total"] == 60 assert len(data["sessions"]) == 50 # default limit # ═══════════════════════════════════════════════════════════════════════════════ # Layer 5 — Data Integrity # ═══════════════════════════════════════════════════════════════════════════════ class TestDataIntegritySessions: async def test_session_persists_after_commit(self, db_session: AsyncSession) -> None: from sqlalchemy import select repo = await _db_repo(db_session) sess = await _db_session(db_session, repo.repo_id) await db_session.commit() result = await db_session.execute( select(MusehubSession).where(MusehubSession.session_id == sess.session_id) ) row = result.scalar_one_or_none() assert row is not None assert row.repo_id == repo.repo_id async def test_stop_session_updates_is_active_flag(self, db_session: AsyncSession) -> None: from sqlalchemy import select repo = await _db_repo(db_session) sess = await _db_session(db_session, repo.repo_id, is_active=True) await db_session.commit() await musehub_repository.stop_session( db_session, repo.repo_id, sess.session_id, ended_at=None ) await db_session.commit() result = await db_session.execute( select(MusehubSession).where(MusehubSession.session_id == sess.session_id) ) row = result.scalar_one() assert row.is_active is False assert row.ended_at is not None async def test_participants_stored_as_list(self, db_session: AsyncSession) -> None: from sqlalchemy import select repo = await _db_repo(db_session) participants = ["alice", "bob", "carol"] sess = await _db_session(db_session, repo.repo_id, participants=participants) await db_session.commit() result = await db_session.execute( select(MusehubSession).where(MusehubSession.session_id == sess.session_id) ) row = result.scalar_one() assert row.participants == participants async def test_duration_correct_after_stop(self, db_session: AsyncSession) -> None: from datetime import timedelta repo = await _db_repo(db_session) started = datetime(2025, 3, 1, 9, 0, 0) ended = datetime(2025, 3, 1, 10, 0, 0) # 3600 seconds later author_id = compute_identity_id(b"testuser") sess = MusehubSession( session_id=compute_session_id(repo.repo_id, author_id, started.isoformat()), repo_id=repo.repo_id, started_at=started, participants=[], location="", intent="", is_active=True, ) db_session.add(sess) await db_session.commit() result = await musehub_repository.stop_session( db_session, repo.repo_id, sess.session_id, ended_at=ended ) assert result.duration_seconds == 3600.0 async def test_session_scoped_to_repo(self, db_session: AsyncSession) -> None: repo1 = await _db_repo(db_session) repo2 = await _db_repo(db_session) await _db_session(db_session, repo1.repo_id) await _db_session(db_session, repo1.repo_id) await _db_session(db_session, repo2.repo_id) await db_session.commit() sessions1, total1, _ = await musehub_repository.list_sessions(db_session, repo1.repo_id) sessions2, total2, _ = await musehub_repository.list_sessions(db_session, repo2.repo_id) assert total1 == 2 assert total2 == 1 async def test_stop_already_stopped_preserves_ended_at(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session) sess = await _db_session(db_session, repo.repo_id, is_active=True) await db_session.commit() t1 = datetime(2025, 6, 1, 10, 0, 0) await musehub_repository.stop_session(db_session, repo.repo_id, sess.session_id, ended_at=t1) await db_session.commit() # Stop again — is_active is already False, so ended_at must NOT change t2 = datetime(2025, 6, 1, 11, 0, 0) result = await musehub_repository.stop_session(db_session, repo.repo_id, sess.session_id, ended_at=t2) # ended_at stays as t1 (not overwritten because is_active was already False) assert result.ended_at.replace(tzinfo=None) == t1 # ═══════════════════════════════════════════════════════════════════════════════ # Layer 6 — Security # ═══════════════════════════════════════════════════════════════════════════════ class TestSecuritySessions: async def test_create_session_requires_auth( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session) await db_session.commit() resp = await client.post( f"/api/repos/{repo.repo_id}/sessions", json={"intent": "unauth"}, ) assert resp.status_code == 401 async def test_stop_session_requires_auth( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session) sess = await _db_session(db_session, repo.repo_id) await db_session.commit() resp = await client.post( f"/api/repos/{repo.repo_id}/sessions/{sess.session_id}/stop", json={}, ) assert resp.status_code == 401 async def test_list_sessions_public_repo_no_auth( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session, visibility="public") await _db_session(db_session, repo.repo_id) await db_session.commit() resp = await client.get(f"/api/repos/{repo.repo_id}/sessions") assert resp.status_code == 200 async def test_list_sessions_private_repo_no_auth_401( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session, visibility="private") await db_session.commit() resp = await client.get(f"/api/repos/{repo.repo_id}/sessions") assert resp.status_code == 401 async def test_get_session_private_repo_no_auth_401( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session, visibility="private") sess = await _db_session(db_session, repo.repo_id) await db_session.commit() resp = await client.get(f"/api/repos/{repo.repo_id}/sessions/{sess.session_id}") assert resp.status_code == 401 async def test_cannot_stop_other_repos_session( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: """Session from repo1 cannot be stopped via repo2's endpoint.""" repo1 = await _db_repo(db_session) repo2 = await _db_repo(db_session) sess = await _db_session(db_session, repo1.repo_id) await db_session.commit() resp = await client.post( f"/api/repos/{repo2.repo_id}/sessions/{sess.session_id}/stop", json={}, headers=auth_headers, ) assert resp.status_code == 404 # ═══════════════════════════════════════════════════════════════════════════════ # Layer 7 — Performance # ═══════════════════════════════════════════════════════════════════════════════ class TestPerformanceSessions: async def test_create_session_latency( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: repo = await _db_repo(db_session) await db_session.commit() start = time.perf_counter() resp = await client.post( f"/api/repos/{repo.repo_id}/sessions", json={"intent": "perf"}, headers=auth_headers, ) elapsed = time.perf_counter() - start assert resp.status_code == 201 assert elapsed < 0.5 async def test_list_sessions_latency( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session) for _ in range(20): await _db_session(db_session, repo.repo_id) await db_session.commit() start = time.perf_counter() resp = await client.get(f"/api/repos/{repo.repo_id}/sessions") elapsed = time.perf_counter() - start assert resp.status_code == 200 assert elapsed < 0.5 async def test_stop_session_latency( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: repo = await _db_repo(db_session) await db_session.commit() create_resp = await client.post( f"/api/repos/{repo.repo_id}/sessions", json={}, headers=auth_headers, ) session_id = create_resp.json()["sessionId"] start = time.perf_counter() stop_resp = await client.post( f"/api/repos/{repo.repo_id}/sessions/{session_id}/stop", json={}, headers=auth_headers, ) elapsed = time.perf_counter() - start assert stop_resp.status_code == 200 assert elapsed < 0.5