musehub_social.py
python
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923
fix(issues): use issue number as pagination cursor, not cre…
Sonnet 4.6
patch
8 days ago
| 1 | """Service layer for Muse Social domain — feed reads and real-time fan-out. |
| 2 | |
| 3 | Architecture |
| 4 | ------------ |
| 5 | Social state lives in the Muse object store — posts and reactions are |
| 6 | content-addressed files in a ``domain_id="social"`` repo. This service |
| 7 | layer reads that state for the HTTP feed endpoint and manages in-process |
| 8 | SSE subscriber queues for real-time delivery. |
| 9 | |
| 10 | Public functions |
| 11 | ---------------- |
| 12 | :func:`get_social_feed` Cursor-paginated posts feed for a handle. |
| 13 | :func:`subscribe_handle` Register a subscriber queue for a handle. |
| 14 | :func:`unsubscribe_handle` Remove a subscriber queue. |
| 15 | :func:`fan_out_to_subscribers` Push an event to all subscribers for a handle. |
| 16 | """ |
| 17 | |
| 18 | import asyncio |
| 19 | import hashlib |
| 20 | import json |
| 21 | import logging |
| 22 | from datetime import datetime, timezone |
| 23 | |
| 24 | import msgpack |
| 25 | from sqlalchemy import select |
| 26 | from sqlalchemy.ext.asyncio import AsyncSession |
| 27 | |
| 28 | from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubObject, MusehubRepo, MusehubSnapshot |
| 29 | from musehub.types.json_types import JSONObject |
| 30 | |
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# In-process SSE subscriber registry
# Handle → set of asyncio.Queue. Queues are added/removed for the lifetime
# of each active SSE connection. Module-level so all requests share state.
# NOTE(review): this dict lives in a single interpreter, so a subscriber only
# receives events fanned out by the same process it is connected to — confirm
# this is acceptable for multi-worker deployments.
# ---------------------------------------------------------------------------

_subscribers: dict[str, set[asyncio.Queue[JSONObject]]] = {}
| 40 | |
def subscribe_handle(handle: str, *, maxsize: int = 256) -> asyncio.Queue[JSONObject]:
    """Register a new subscriber queue for *handle* and return it.

    The caller is responsible for calling :func:`unsubscribe_handle` when
    the SSE connection closes.

    The queue is bounded so that a slow or stalled SSE client cannot make
    the process buffer events without limit: once *maxsize* events are
    pending, :func:`fan_out_to_subscribers` drops further events for this
    queue (logging a warning) instead of queueing them.

    Args:
        handle: MuseHub handle to subscribe to.
        maxsize: Maximum number of undelivered events held for this
            subscriber. Values ``<= 0`` give an unbounded queue, following
            :class:`asyncio.Queue` semantics.

    Returns:
        A fresh asyncio.Queue that will receive pushed events.
    """
    q: asyncio.Queue[JSONObject] = asyncio.Queue(maxsize=maxsize)
    _subscribers.setdefault(handle, set()).add(q)
    return q
| 56 | |
def unsubscribe_handle(handle: str, queue: asyncio.Queue[JSONObject]) -> None:
    """Drop *queue* from the registry entry for *handle*.

    Idempotent: an unknown handle, or a queue that was already removed, is
    silently ignored. When the last queue for a handle is removed, the
    handle's entry is deleted so the registry holds no empty sets.

    Args:
        handle: MuseHub handle the queue was registered under.
        queue: Queue previously returned by :func:`subscribe_handle`.
    """
    queues = _subscribers.get(handle)
    if queues is None:
        return
    queues.discard(queue)
    if len(queues) == 0:
        del _subscribers[handle]
| 70 | |
async def fan_out_to_subscribers(handle: str, event: JSONObject) -> None:
    """Deliver *event* to each SSE subscriber currently registered for *handle*.

    Called from the wire push handler when a social-domain commit lands.
    Delivery uses ``put_nowait``, so this never waits on a subscriber. If a
    subscriber's queue is full, the event is dropped for that subscriber
    and a warning is logged. Does nothing when *handle* has no subscribers.

    Args:
        handle: Owner handle whose subscribers should receive the event.
        event: JSON-serialisable dict (e.g. ``{"type": "social_delta", ...}``).
    """
    # Iterate over a snapshot so the registry set may be modified freely
    # without affecting this delivery pass.
    targets = tuple(_subscribers.get(handle, ()))
    for target in targets:
        try:
            target.put_nowait(event)
        except asyncio.QueueFull:
            logger.warning("social SSE queue full for handle=%s — dropping event", handle)
| 86 | |
| 87 | # --------------------------------------------------------------------------- |
| 88 | # Feed |
| 89 | # --------------------------------------------------------------------------- |
| 90 | |
def _utc_now() -> datetime:
    """Return the current instant as a timezone-aware UTC ``datetime``."""
    utc = timezone.utc
    return datetime.now(utc)
| 93 | |
# Separator between the two components of a feed cursor. ISO-8601 timestamps
# never contain it; presumably object ids do not either — TODO confirm.
_CURSOR_SEP = "|"

# Maximum object ids per ``IN (...)`` query, keeping each statement well below
# common database bind-parameter limits.
_OBJECT_FETCH_BATCH = 500


def _post_sort_key(post: JSONObject) -> tuple[str, str]:
    """Return the ``(created_at, post_id)`` key that totally orders feed posts.

    ``post_id`` breaks ties between posts sharing a ``created_at`` value, so
    every post has a unique position that a cursor can name exactly.
    Non-string values (missing, ``null``, numbers) are treated as ``""`` so
    sorting never compares mismatched types.
    """
    created_at = post.get("created_at")
    post_id = post.get("post_id")
    return (
        created_at if isinstance(created_at, str) else "",
        post_id if isinstance(post_id, str) else "",
    )


def _encode_cursor(post: JSONObject) -> str:
    """Build the opaque cursor that resumes the feed immediately after *post*."""
    created_at, post_id = _post_sort_key(post)
    return f"{created_at}{_CURSOR_SEP}{post_id}"


def _decode_cursor(cursor: str) -> tuple[str, str]:
    """Parse *cursor* into a sort key; posts whose key is smaller come next.

    A cursor without the separator is a legacy bare ``created_at`` value. It
    maps to ``(created_at, "")``. ``""`` sorts below every non-empty post id,
    so resuming from it yields only posts strictly older than that timestamp.
    """
    # rpartition: the post id (last component) is system-generated, while
    # created_at comes from user-authored post JSON.
    created_at, sep, post_id = cursor.rpartition(_CURSOR_SEP)
    if not sep:
        return (cursor, "")
    return (created_at, post_id)


async def _load_repo_posts(session: AsyncSession, repo: MusehubRepo) -> list[JSONObject]:
    """Return the posts stored in the newest snapshot of one social repo.

    Returns an empty list when the repo has no commit, snapshot, or manifest.
    Post objects that are missing, have no cached content, or are not a JSON
    object are skipped.
    """
    commit_row = (await session.execute(
        select(MusehubCommit)
        .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(MusehubCommitRef.repo_id == repo.repo_id)
        .order_by(MusehubCommit.timestamp.desc())
        .limit(1)
    )).scalar_one_or_none()
    if commit_row is None or not commit_row.snapshot_id:
        return []

    snap_row = (await session.execute(
        select(MusehubSnapshot).where(
            MusehubSnapshot.snapshot_id == commit_row.snapshot_id
        )
    )).scalar_one_or_none()
    if snap_row is None or not snap_row.manifest_blob:
        return []

    manifest: dict[str, str] = msgpack.unpackb(snap_row.manifest_blob, raw=False)

    # Object ids of posts/*.json entries, in manifest order.
    post_object_ids = [
        object_id
        for path, object_id in manifest.items()
        if path.startswith("posts/") and path.endswith(".json")
    ]
    if not post_object_ids:
        return []

    # Fetch the referenced objects in a few batched queries instead of one
    # database round-trip per post.
    unique_ids = list(dict.fromkeys(post_object_ids))
    objects_by_id: dict[str, MusehubObject] = {}
    for start in range(0, len(unique_ids), _OBJECT_FETCH_BATCH):
        batch = unique_ids[start:start + _OBJECT_FETCH_BATCH]
        rows = (await session.execute(
            select(MusehubObject).where(MusehubObject.object_id.in_(batch))
        )).scalars().all()
        for row in rows:
            objects_by_id[row.object_id] = row

    posts: list[JSONObject] = []
    for object_id in post_object_ids:
        obj_row = objects_by_id.get(object_id)
        if obj_row is None or obj_row.content_cache is None:
            continue

        try:
            post_data = json.loads(obj_row.content_cache)
        except (json.JSONDecodeError, UnicodeDecodeError):
            post_data = None
        # Undecodable content and valid JSON that is not an object (list,
        # string, number) are both unusable as a post.
        if not isinstance(post_data, dict):
            logger.warning("social: malformed post object %s — skipping", object_id)
            continue

        posts.append({
            "post_id": object_id,
            "body": post_data.get("body", ""),
            "created_at": post_data.get("created_at", ""),
            "reply_to": post_data.get("reply_to"),
            "quote_of": post_data.get("quote_of"),
        })
    return posts


async def get_social_feed(
    session: AsyncSession,
    handle: str,
    *,
    limit: int = 20,
    cursor: str | None = None,
) -> JSONObject:
    """Return a cursor-paginated posts feed for *handle*.

    Reads the newest commit of every social-domain repo owned by *handle*,
    decodes each snapshot manifest, loads the ``posts/*.json`` objects, and
    returns them newest-first. Posts are ordered by ``(created_at, post_id)``
    so that posts sharing a timestamp still have a stable, unique position.

    Args:
        session: SQLAlchemy async session.
        handle: MuseHub owner handle.
        limit: Maximum posts to return (default 20). Negative values are
            treated as 0.
        cursor: Opaque pagination cursor. Pass the previous page's
            ``next_cursor`` back verbatim, or ``None`` for the first page.
            A legacy bare ISO-8601 ``created_at`` value is still accepted and
            resumes with posts strictly older than that timestamp.

    Returns:
        Dict with keys ``handle``, ``posts``, ``total`` and ``next_cursor``.
        ``total`` is the number of posts across all pages, ignoring the
        cursor. ``next_cursor`` is ``None`` when no further pages exist.
    """
    # 1. Find social repos for this handle.
    repo_rows = (await session.execute(
        select(MusehubRepo).where(
            MusehubRepo.owner == handle,
            MusehubRepo.domain_id == "social",
        )
    )).scalars().all()

    if not repo_rows:
        return {"handle": handle, "posts": [], "total": 0, "next_cursor": None}

    # 2. Collect posts from the latest snapshot of every repo.
    all_posts: list[JSONObject] = []
    for repo in repo_rows:
        all_posts.extend(await _load_repo_posts(session, repo))

    # 3. Newest first, with post_id as a deterministic tie-breaker.
    all_posts.sort(key=_post_sort_key, reverse=True)
    total = len(all_posts)

    # 4. Resume strictly after the cursor position. Comparing keys, rather
    # than searching for an exact created_at match, has two benefits. A cursor
    # whose post has since disappeared still works. A page boundary inside a
    # run of same-timestamp posts neither repeats posts nor loops forever.
    if cursor is not None:
        cursor_key = _decode_cursor(cursor)
        all_posts = [p for p in all_posts if _post_sort_key(p) < cursor_key]

    # 5. Paginate.
    limit = max(limit, 0)
    page = all_posts[:limit]
    has_more = len(all_posts) > limit
    next_cursor = _encode_cursor(page[-1]) if has_more and page else None

    return {
        "handle": handle,
        "posts": page,
        "total": total,
        "next_cursor": next_cursor,
    }
File History
1 commit
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923
fix(issues): use issue number as pagination cursor, not cre…
Sonnet 4.6
patch
8 days ago