"""Phase 02 TDD — MuseHub Social API. RED → GREEN cycle. Run before implementation to confirm failures, then implement until all pass. Coverage matrix --------------- TestSocialApiShape — route registration, 200 for known handle, 404 for unknown TestSocialApiFeed — pagination, posts sorted newest-first, cursor, post fields TestSocialApiStream — SSE endpoint returns text/event-stream, heartbeat TestSocialFanOut — push with domain="social" fans out to subscribers TestSocialApiDocstrings — module, route handler docstrings present """ from __future__ import annotations import asyncio import json import secrets from collections.abc import AsyncGenerator from datetime import datetime, timezone, timedelta import anyio import msgpack import pytest import pytest_asyncio from httpx import AsyncClient, ASGITransport from sqlalchemy.ext.asyncio import AsyncSession from musehub.core.genesis import compute_identity_id, compute_repo_id from muse.core.types import blob_id, split_id from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubObject, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef from musehub.main import app # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- _OWNER = "testuser" # matches conftest._TEST_HANDLE # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _utc_now() -> datetime: return datetime.now(tz=timezone.utc) def _post_bytes(body: str, created_at: str) -> bytes: return json.dumps({"body": body, "created_at": created_at}).encode() async def _seed_social_repo( session: AsyncSession, owner: str = _OWNER, posts: list[tuple[str, str]] | None = None, ) -> MusehubRepo: """Create a social-domain repo with an optional set of posts in HEAD. Each entry in *posts* is (body, iso_created_at). Posts are seeded as objects stored via content_cache so no real filesystem is needed. """ slug = f"social-{secrets.token_hex(4)}" created_at = _utc_now() owner_id = compute_identity_id(owner.encode()) repo = MusehubRepo( repo_id=compute_repo_id(owner_id, slug, "social", created_at.isoformat()), name=slug, owner=owner, slug=slug, visibility="public", owner_user_id=owner_id, domain_id="social", created_at=created_at, updated_at=created_at, ) session.add(repo) await session.flush() await session.refresh(repo) # Build manifest: {path: object_id} manifest: dict[str, str] = {} for body, ts in (posts or []): raw = _post_bytes(body, ts) oid = blob_id(raw) algo, hex_digest = split_id(oid) path = f"posts/{algo}/{hex_digest[:16]}.json" manifest[path] = oid # Seed the object (content_cache so no disk needed) obj = MusehubObject( object_id=oid, path=path, size_bytes=len(raw), storage_uri=None, content_cache=raw, ) session.add(obj) blob = msgpack.packb(manifest, use_bin_type=True) snap_id = blob_id(blob) snap = MusehubSnapshot( snapshot_id=snap_id, manifest_blob=blob, entry_count=len(manifest), ) session.add(snap) session.add(MusehubSnapshotRef(repo_id=repo.repo_id, snapshot_id=snap_id)) commit_id = blob_id(f"{repo.repo_id}:{snap_id}".encode()) commit = MusehubCommit( commit_id=commit_id, branch="main", parent_ids=[], message="initial social commit", author=owner, timestamp=created_at, snapshot_id=snap_id, ) session.add(commit) session.add(MusehubCommitRef(repo_id=repo.repo_id, commit_id=commit_id)) await session.flush() await session.commit() return repo # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest_asyncio.fixture() async def client(db_session: AsyncSession) -> AsyncGenerator[AsyncClient, None]: transport = ASGITransport(app=app) # type: ignore[arg-type] async with AsyncClient(transport=transport, base_url="http://test") as ac: yield ac # =========================================================================== # Shape — route registration # =========================================================================== class TestSocialApiShape: @pytest.mark.asyncio async def test_feed_404_for_unknown_handle(self, client: AsyncClient) -> None: r = await client.get("/api/social/nobody-exists-xyz") assert r.status_code == 404 @pytest.mark.asyncio async def test_feed_200_for_known_handle_no_posts( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_social_repo(db_session, posts=[]) r = await client.get(f"/api/social/{_OWNER}") assert r.status_code == 200 @pytest.mark.asyncio async def test_feed_response_has_expected_keys( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_social_repo(db_session, posts=[]) r = await client.get(f"/api/social/{_OWNER}") body = r.json() assert "handle" in body assert "posts" in body assert "total" in body assert "next_cursor" in body @pytest.mark.asyncio async def test_feed_handle_matches_path( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_social_repo(db_session, posts=[]) r = await client.get(f"/api/social/{_OWNER}") assert r.json()["handle"] == _OWNER def test_stream_route_registered(self) -> None: # SSE streaming via BaseHTTPMiddleware cannot be consumed in-process; # verify route registration at the app level instead. from musehub.main import app paths = {getattr(r, "path", "") for r in app.routes} assert "/api/social/{handle}/stream" in paths def test_stream_returns_streaming_response(self) -> None: import inspect from musehub.api.routes.musehub.social import social_stream source = inspect.getsource(social_stream) assert "StreamingResponse" in source assert "SSE_CONTENT_TYPE" in source # =========================================================================== # Feed — pagination, ordering, post fields # =========================================================================== class TestSocialApiFeed: @pytest.mark.asyncio async def test_feed_returns_all_posts( self, client: AsyncClient, db_session: AsyncSession ) -> None: posts = [ ("hello muse", "2026-05-01T00:00:00Z"), ("second post", "2026-05-01T01:00:00Z"), ("third post", "2026-05-01T02:00:00Z"), ] await _seed_social_repo(db_session, posts=posts) r = await client.get(f"/api/social/{_OWNER}") body = r.json() assert body["total"] == 3 assert len(body["posts"]) == 3 @pytest.mark.asyncio async def test_feed_posts_sorted_newest_first( self, client: AsyncClient, db_session: AsyncSession ) -> None: posts = [ ("oldest", "2026-05-01T00:00:00Z"), ("newest", "2026-05-01T02:00:00Z"), ("middle", "2026-05-01T01:00:00Z"), ] await _seed_social_repo(db_session, posts=posts) r = await client.get(f"/api/social/{_OWNER}") returned = [p["body"] for p in r.json()["posts"]] assert returned == ["newest", "middle", "oldest"] @pytest.mark.asyncio async def test_feed_post_has_required_fields( self, client: AsyncClient, db_session: AsyncSession ) -> None: posts = [("test post", "2026-05-01T00:00:00Z")] await _seed_social_repo(db_session, posts=posts) r = await client.get(f"/api/social/{_OWNER}") post = r.json()["posts"][0] assert "post_id" in post assert "body" in post assert "created_at" in post @pytest.mark.asyncio async def test_feed_limit_parameter( self, client: AsyncClient, db_session: AsyncSession ) -> None: posts = [(f"post {i}", f"2026-05-01T0{i}:00:00Z") for i in range(5)] await _seed_social_repo(db_session, posts=posts) r = await client.get(f"/api/social/{_OWNER}?limit=2") body = r.json() assert len(body["posts"]) == 2 assert body["next_cursor"] is not None @pytest.mark.asyncio async def test_feed_cursor_pagination( self, client: AsyncClient, db_session: AsyncSession ) -> None: posts = [(f"post {i}", f"2026-05-01T0{i}:00:00Z") for i in range(4)] await _seed_social_repo(db_session, posts=posts) page1 = (await client.get(f"/api/social/{_OWNER}?limit=2")).json() cursor = page1["next_cursor"] assert cursor is not None page2 = (await client.get(f"/api/social/{_OWNER}?limit=2&cursor={cursor}")).json() # Combined posts should cover all 4 with no duplicates all_ids = {p["post_id"] for p in page1["posts"]} | {p["post_id"] for p in page2["posts"]} assert len(all_ids) == 4 @pytest.mark.asyncio async def test_feed_no_cursor_on_last_page( self, client: AsyncClient, db_session: AsyncSession ) -> None: posts = [("only post", "2026-05-01T00:00:00Z")] await _seed_social_repo(db_session, posts=posts) r = await client.get(f"/api/social/{_OWNER}?limit=10") assert r.json()["next_cursor"] is None @pytest.mark.asyncio async def test_feed_empty_for_no_posts( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_social_repo(db_session, posts=[]) r = await client.get(f"/api/social/{_OWNER}") body = r.json() assert body["posts"] == [] assert body["total"] == 0 # =========================================================================== # Fan-out — push with domain="social" notifies SSE subscribers # =========================================================================== class TestSocialFanOut: @pytest.mark.asyncio async def test_fan_out_delivers_event_to_subscriber(self) -> None: from musehub.services.musehub_social import ( subscribe_handle, unsubscribe_handle, fan_out_to_subscribers, ) q: asyncio.Queue[dict] = subscribe_handle(_OWNER) try: event = {"type": "social_delta", "posts_added": 1} await fan_out_to_subscribers(_OWNER, event) received = q.get_nowait() assert received["type"] == "social_delta" finally: unsubscribe_handle(_OWNER, q) @pytest.mark.asyncio async def test_fan_out_no_subscribers_is_noop(self) -> None: from musehub.services.musehub_social import fan_out_to_subscribers # Should not raise even when no subscribers are registered await fan_out_to_subscribers("handle-with-no-subscribers", {"type": "test"}) @pytest.mark.asyncio async def test_fan_out_multiple_subscribers_all_receive(self) -> None: from musehub.services.musehub_social import ( subscribe_handle, unsubscribe_handle, fan_out_to_subscribers, ) q1: asyncio.Queue[dict] = subscribe_handle(_OWNER) q2: asyncio.Queue[dict] = subscribe_handle(_OWNER) try: await fan_out_to_subscribers(_OWNER, {"type": "social_delta", "n": 1}) assert q1.get_nowait()["type"] == "social_delta" assert q2.get_nowait()["type"] == "social_delta" finally: unsubscribe_handle(_OWNER, q1) unsubscribe_handle(_OWNER, q2) @pytest.mark.asyncio async def test_unsubscribe_stops_delivery(self) -> None: from musehub.services.musehub_social import ( subscribe_handle, unsubscribe_handle, fan_out_to_subscribers, ) q: asyncio.Queue[dict] = subscribe_handle(_OWNER) unsubscribe_handle(_OWNER, q) await fan_out_to_subscribers(_OWNER, {"type": "social_delta"}) assert q.empty() # =========================================================================== # Service layer — get_social_feed unit tests # =========================================================================== class TestSocialFeedService: @pytest.mark.asyncio async def test_get_social_feed_empty_when_no_social_repo( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_social import get_social_feed result = await get_social_feed(db_session, "nonexistent-handle-abc") assert result["posts"] == [] assert result["total"] == 0 @pytest.mark.asyncio async def test_get_social_feed_returns_posts( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_social import get_social_feed posts = [ ("hello service", "2026-05-01T00:00:00Z"), ("second", "2026-05-01T01:00:00Z"), ] await _seed_social_repo(db_session, posts=posts) result = await get_social_feed(db_session, _OWNER) assert result["total"] == 2 @pytest.mark.asyncio async def test_get_social_feed_sorted_newest_first( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_social import get_social_feed posts = [ ("old", "2026-05-01T00:00:00Z"), ("new", "2026-05-01T02:00:00Z"), ] await _seed_social_repo(db_session, posts=posts) result = await get_social_feed(db_session, _OWNER) assert result["posts"][0]["body"] == "new" @pytest.mark.asyncio async def test_get_social_feed_limit( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_social import get_social_feed posts = [(f"post{i}", f"2026-05-01T0{i}:00:00Z") for i in range(5)] await _seed_social_repo(db_session, posts=posts) result = await get_social_feed(db_session, _OWNER, limit=2) assert len(result["posts"]) == 2 assert result["next_cursor"] is not None @pytest.mark.asyncio async def test_get_social_feed_raises_404_not_found( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_social import get_social_feed # No social repo for this handle → should return empty, not raise result = await get_social_feed(db_session, "nobody-xyz-abc") assert result["posts"] == [] # =========================================================================== # Docstrings # =========================================================================== class TestSocialApiDocstrings: def test_route_module_has_docstring(self) -> None: import musehub.api.routes.musehub.social as mod assert mod.__doc__ and len(mod.__doc__.strip()) > 20 def test_service_module_has_docstring(self) -> None: import musehub.services.musehub_social as mod assert mod.__doc__ and len(mod.__doc__.strip()) > 20 def test_feed_handler_has_docstring(self) -> None: from musehub.api.routes.musehub.social import social_feed assert social_feed.__doc__ and len(social_feed.__doc__.strip()) > 10 def test_stream_handler_has_docstring(self) -> None: from musehub.api.routes.musehub.social import social_stream assert social_stream.__doc__ and len(social_stream.__doc__.strip()) > 10 def test_get_social_feed_service_has_docstring(self) -> None: from musehub.services.musehub_social import get_social_feed assert get_social_feed.__doc__ and len(get_social_feed.__doc__.strip()) > 10 def test_fan_out_has_docstring(self) -> None: from musehub.services.musehub_social import fan_out_to_subscribers assert fan_out_to_subscribers.__doc__ and len(fan_out_to_subscribers.__doc__.strip()) > 10