gabriel / musehub public
musehub_social.py python
212 lines 7.3 KB
Raw
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923 fix(issues): use issue number as pagination cursor, not cre… Sonnet 4.6 patch 8 days ago
1 """Service layer for Muse Social domain — feed reads and real-time fan-out.
2
3 Architecture
4 ------------
5 Social state lives in the Muse object store — posts and reactions are
6 content-addressed files in a ``domain_id="social"`` repo. This service
7 layer reads that state for the HTTP feed endpoint and manages in-process
8 SSE subscriber queues for real-time delivery.
9
10 Public functions
11 ----------------
12 :func:`get_social_feed` Cursor-paginated posts feed for a handle.
13 :func:`subscribe_handle` Register a subscriber queue for a handle.
14 :func:`unsubscribe_handle` Remove a subscriber queue.
15 :func:`fan_out_to_subscribers` Push an event to all subscribers for a handle.
16 """
17
18 import asyncio
19 import hashlib
20 import json
21 import logging
22 from datetime import datetime, timezone
23
24 import msgpack
25 from sqlalchemy import select
26 from sqlalchemy.ext.asyncio import AsyncSession
27
28 from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubObject, MusehubRepo, MusehubSnapshot
29 from musehub.types.json_types import JSONObject
30
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# In-process SSE subscriber registry
# Handle → set of asyncio.Queue. Queues are added/removed for the lifetime
# of each active SSE connection. Module-level so all requests share state.
#
# The accessors below (subscribe_handle / unsubscribe_handle /
# fan_out_to_subscribers) touch this dict without awaiting in between, so
# each update is atomic with respect to other coroutines on the same event
# loop and no lock is used.
# NOTE(review): the registry lives in this process's memory only, so an
# event published in one worker process never reaches SSE clients connected
# to another worker — confirm the deployment model if multiple workers run.
# ---------------------------------------------------------------------------

_subscribers: dict[str, set[asyncio.Queue[JSONObject]]] = {}
40
def subscribe_handle(handle: str, *, maxsize: int = 256) -> asyncio.Queue[JSONObject]:
    """Register a new subscriber queue for *handle* and return it.

    The caller is responsible for calling :func:`unsubscribe_handle` when
    the SSE connection closes.

    The queue is bounded by default. :func:`fan_out_to_subscribers` pushes
    with ``put_nowait`` and drops the event (logging a warning) when a queue
    is full. With an unbounded queue that drop path could never trigger, and
    a stalled client would make the queue grow without limit.

    Args:
        handle: MuseHub handle to subscribe to.
        maxsize: Maximum number of undelivered events buffered for this
            subscriber before new events are dropped. ``0`` (or any value
            <= 0) means unbounded, which was the previous behaviour.

    Returns:
        A fresh asyncio.Queue that will receive pushed events.
    """
    q: asyncio.Queue[JSONObject] = asyncio.Queue(maxsize=maxsize)
    _subscribers.setdefault(handle, set()).add(q)
    return q
56
def unsubscribe_handle(handle: str, queue: asyncio.Queue[JSONObject]) -> None:
    """Detach *queue* from the subscribers of *handle*.

    Idempotent: an unknown handle or an already-removed queue is ignored.
    When the last queue for a handle is removed, the handle's entry is
    dropped from the registry so that empty sets do not accumulate.

    Args:
        handle: MuseHub handle the queue was subscribed to.
        queue: The queue previously returned by :func:`subscribe_handle`.
    """
    queues = _subscribers.get(handle)
    if queues is None:
        return
    queues.discard(queue)
    if len(queues) == 0:
        _subscribers.pop(handle)
70
async def fan_out_to_subscribers(handle: str, event: JSONObject) -> None:
    """Deliver *event* to each SSE subscriber currently registered for *handle*.

    Called from the wire push handler when a social-domain commit lands.
    Delivery uses ``put_nowait``, so this never blocks. A subscriber whose
    queue is full simply misses the event, and a warning is logged.
    Having no subscribers is a no-op.

    Args:
        handle: Owner handle whose subscribers should receive the event.
        event: JSON-serialisable dict (e.g. ``{"type": "social_delta", ...}``).
    """
    # Copy first so that (un)subscribes during iteration cannot change the
    # set we are looping over.
    targets = tuple(_subscribers.get(handle, ()))
    for subscriber_queue in targets:
        try:
            subscriber_queue.put_nowait(event)
        except asyncio.QueueFull:
            logger.warning("social SSE queue full for handle=%s — dropping event", handle)
86
87 # ---------------------------------------------------------------------------
88 # Feed
89 # ---------------------------------------------------------------------------
90
91 def _utc_now() -> datetime:
92 return datetime.now(tz=timezone.utc)
93
# Separator between the ``created_at`` and ``post_id`` halves of a feed
# cursor. ``created_at`` comes from user-authored post JSON, while the id is
# the manifest's object id, so cursors are split on the LAST separator.
# NOTE(review): assumes object ids never contain "|" — confirm.
_FEED_CURSOR_SEP = "|"

# Object ids fetched per ``IN (...)`` query when loading post bodies; keeps
# the bound-parameter count well under common database driver limits.
_POST_FETCH_BATCH = 500


def _post_sort_key(post: JSONObject) -> tuple[str, str]:
    """Return the ``(created_at, post_id)`` key that totally orders feed posts.

    Values are coerced to ``str`` so a missing, ``null`` or non-string
    ``created_at`` cannot make the sort raise ``TypeError``. The post_id
    tie-breaker makes the order deterministic across requests.
    """
    return (str(post.get("created_at") or ""), str(post.get("post_id") or ""))


def _encode_feed_cursor(post: JSONObject) -> str:
    """Build the opaque ``next_cursor`` value that resumes just after *post*."""
    created_at, post_id = _post_sort_key(post)
    return f"{created_at}{_FEED_CURSOR_SEP}{post_id}"


def _posts_after_cursor(posts: list[JSONObject], cursor: str) -> list[JSONObject]:
    """Return the posts that follow *cursor* in newest-first order.

    *posts* must already be sorted by :func:`_post_sort_key`, descending.
    The filter compares keys instead of searching for the cursor post. That
    way a page boundary still works after the cursor post has been deleted,
    and posts sharing a ``created_at`` are neither duplicated nor skipped.

    A cursor without the separator is a legacy bare ``created_at`` value.
    It resumes at the first post that is strictly older.
    """
    created_at, sep, post_id = cursor.rpartition(_FEED_CURSOR_SEP)
    if not sep:
        # rpartition returns ("", "", cursor) when the separator is absent.
        legacy_created_at = post_id
        return [p for p in posts if _post_sort_key(p)[0] < legacy_created_at]
    boundary = (created_at, post_id)
    return [p for p in posts if _post_sort_key(p) < boundary]


def _parse_post(object_id: str, raw: str | bytes | bytearray) -> JSONObject | None:
    """Decode one ``posts/*.json`` object into a feed entry.

    Returns ``None`` (after logging a warning) when the content is not valid
    JSON or is not a JSON object.
    """
    try:
        post_data = json.loads(raw)
    except (json.JSONDecodeError, UnicodeDecodeError):
        logger.warning("social: malformed post object %s — skipping", object_id)
        return None
    if not isinstance(post_data, dict):
        logger.warning("social: malformed post object %s — skipping", object_id)
        return None
    return {
        "post_id": object_id,
        "body": post_data.get("body", ""),
        "created_at": post_data.get("created_at", ""),
        "reply_to": post_data.get("reply_to"),
        "quote_of": post_data.get("quote_of"),
    }


async def _load_repo_posts(session: AsyncSession, repo: MusehubRepo) -> list[JSONObject]:
    """Return every decodable post in the latest snapshot of *repo*.

    "Latest" means the commit with the greatest ``timestamp`` among those
    referenced by the repo's ``MusehubCommitRef`` rows. Returns an empty
    list when there is no such commit, snapshot or manifest.
    """
    commit_row = (await session.execute(
        select(MusehubCommit)
        .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(MusehubCommitRef.repo_id == repo.repo_id)
        .order_by(MusehubCommit.timestamp.desc())
        .limit(1)
    )).scalar_one_or_none()
    if commit_row is None or not commit_row.snapshot_id:
        return []

    snap_row = (await session.execute(
        select(MusehubSnapshot).where(
            MusehubSnapshot.snapshot_id == commit_row.snapshot_id
        )
    )).scalar_one_or_none()
    if snap_row is None or not snap_row.manifest_blob:
        return []

    # The manifest maps file path -> object id.
    manifest = msgpack.unpackb(snap_row.manifest_blob, raw=False)
    if not isinstance(manifest, dict):
        logger.warning(
            "social: snapshot %s manifest is not a mapping — skipping",
            snap_row.snapshot_id,
        )
        return []

    post_object_ids = [
        object_id
        for path, object_id in manifest.items()
        if isinstance(path, str) and path.startswith("posts/") and path.endswith(".json")
    ]
    if not post_object_ids:
        return []

    # Fetch all post bodies in a few batched queries instead of one query
    # per post (N+1). dict.fromkeys de-duplicates while keeping order.
    unique_ids = list(dict.fromkeys(post_object_ids))
    content_by_id: dict[str, object] = {}
    for start in range(0, len(unique_ids), _POST_FETCH_BATCH):
        batch = unique_ids[start:start + _POST_FETCH_BATCH]
        rows = (await session.execute(
            select(MusehubObject).where(MusehubObject.object_id.in_(batch))
        )).scalars().all()
        content_by_id.update((row.object_id, row.content_cache) for row in rows)

    posts: list[JSONObject] = []
    for object_id in post_object_ids:
        raw = content_by_id.get(object_id)
        if raw is None:
            # Object row missing, or its content is not cached.
            continue
        post = _parse_post(object_id, raw)
        if post is not None:
            posts.append(post)
    return posts


async def get_social_feed(
    session: AsyncSession,
    handle: str,
    *,
    limit: int = 20,
    cursor: str | None = None,
) -> JSONObject:
    """Return a cursor-paginated posts feed for *handle*.

    For each social-domain repo owned by *handle*, this reads the latest
    commit, decodes its snapshot manifest and loads each ``posts/*.json``
    object. The combined posts are returned newest-first, with ties on
    ``created_at`` broken by ``post_id`` so the order is stable across
    requests.

    Args:
        session: SQLAlchemy async session.
        handle: MuseHub owner handle.
        limit: Maximum posts to return (default 20). Negative values are
            treated as 0.
        cursor: Opaque pagination cursor taken from a previous response's
            ``next_cursor``. ``None`` or ``""`` requests the first page. A
            legacy cursor holding a bare ISO-8601 ``created_at`` is still
            accepted and resumes at the first strictly older post.

    Returns:
        Dict with keys ``handle``, ``posts``, ``total`` and ``next_cursor``.
        ``total`` counts all posts for the handle, independent of the
        cursor. ``next_cursor`` is ``None`` when no further pages exist.
    """
    # 1. Find social repos for this handle.
    repo_rows = (await session.execute(
        select(MusehubRepo).where(
            MusehubRepo.owner == handle,
            MusehubRepo.domain_id == "social",
        )
    )).scalars().all()

    if not repo_rows:
        return {"handle": handle, "posts": [], "total": 0, "next_cursor": None}

    # 2. Gather posts from every repo's latest snapshot.
    all_posts: list[JSONObject] = []
    for repo in repo_rows:
        all_posts.extend(await _load_repo_posts(session, repo))

    # 3. Deterministic newest-first order.
    all_posts.sort(key=_post_sort_key, reverse=True)
    total = len(all_posts)

    # 4. Resume after the cursor, if one was given.
    if cursor:
        all_posts = _posts_after_cursor(all_posts, cursor)

    # 5. Paginate.
    page_size = max(limit, 0)
    page = all_posts[:page_size]
    has_more = len(all_posts) > page_size
    next_cursor: str | None = _encode_feed_cursor(page[-1]) if page and has_more else None

    return {
        "handle": handle,
        "posts": page,
        "total": total,
        "next_cursor": next_cursor,
    }
File History 1 commit
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923 fix(issues): use issue number as pagination cursor, not cre… Sonnet 4.6 patch 8 days ago