gabriel / musehub public
musehub_coord_server.py python
461 lines 16.2 KB
Raw
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2 feat: add repair-commit wire endpoint (API parity with repa… Opus 4.8 minor ⚠ breaking 1 day ago
1 """MuseHub Coordination Persistence Service — Phase 4.1.
2
3 Bridges the raw ``musehub_coord_records`` event bus into the materialized
4 ``musehub_coord_reservations`` and ``musehub_coord_tasks`` tables, enabling
5 the Intelligence Hub UI and MCP tools to query structured coordination state
6 without parsing raw JSON payloads.
7
8 Two layers:
9
10 1. **Materialization** — ``materialize_coord_record()`` is called after every
11 successful ``coord_push`` to extract and upsert structured rows from raw
12 events.
13
14 2. **CRUD** — ``list_reservations()``, ``list_tasks()``, ``claim_task()``,
15 ``complete_task()``, ``fail_task()``, ``extend_reservation()``, and
16 ``conflict_check()`` provide the full query surface for the V2 API and
17 MCP tools.
18 """
19
20 import logging
21 from datetime import datetime, timedelta, timezone
22
23 from sqlalchemy import and_, select, update
24 from sqlalchemy.ext.asyncio import AsyncSession
25
26 from musehub.db import coord_models as _cm
27 from musehub.types.json_types import JSONObject, json_list
28
29 logger = logging.getLogger(__name__)
30
31 # ── Helpers ───────────────────────────────────────────────────────────────────
32
def _utc_now() -> datetime:
    """Return the current instant as a timezone-aware datetime in UTC."""
    utc = timezone.utc
    return datetime.now(utc)
35
36 def _ensure_utc(dt: datetime | None) -> datetime | None:
37 if dt is None:
38 return None
39 return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
40
41 # ── Materialization ───────────────────────────────────────────────────────────
42
43 async def materialize_coord_record(
44 session: AsyncSession,
45 repo_id: str,
46 kind: str,
47 record_id: str,
48 payload: JSONObject,
49 expires_at: datetime | None,
50 ) -> None:
51 """Extract structured rows from a raw coord record and upsert them.
52
53 Called after every successful coord_push. Idempotent — re-materializing
54 the same (repo_id, kind, record_id) is a no-op.
55
56 Supported kinds:
57 ``reservation`` → upsert ``MusehubCoordReservation``
58 ``release`` → mark matching reservation as released
59 ``task`` → upsert ``MusehubCoordTask``
60 ``claim`` → mark matching task as claimed
61 Other kinds (intent, heartbeat, dependency) are not materialized as
62 separate rows — they live only in the raw event bus.
63 """
64 try:
65 if kind == "reservation":
66 await _materialize_reservation(session, repo_id, record_id, payload, expires_at)
67 elif kind == "release":
68 await _materialize_release(session, repo_id, payload)
69 elif kind == "task":
70 await _materialize_task(session, repo_id, record_id, payload, expires_at)
71 elif kind == "claim":
72 await _materialize_claim(session, repo_id, payload)
73 # intent, heartbeat, dependency — live in coord_records only
74 except Exception as exc:
75 logger.warning(
76 "materialize_coord_record: failed for repo=%s kind=%s record_id=%s: %s",
77 repo_id, kind, record_id, exc,
78 )
79
async def _materialize_reservation(
    session: AsyncSession,
    repo_id: str,
    record_id: str,
    payload: JSONObject,
    expires_at: datetime | None,
) -> None:
    """Insert ``MusehubCoordReservation`` rows for a raw reservation record.

    One row is created per symbol address; rows that already exist for the
    ``(reservation_id, address)`` key are left untouched.

    Args:
        session: Async session the rows are added to and flushed on.
        repo_id: Repository the reservation belongs to.
        record_id: Raw record id, used as the reservation id when the payload
            carries no string ``reservation_id``.
        payload: Raw record payload.  Shape from ``muse coord reserve --json``:
            ``{reservation_id, run_id, branch, addresses: [...], created_at,
            expires_at, operation, conflicts, depends_on}``.
        expires_at: Fallback expiry used when the payload has no parseable
            ``expires_at``; a naive value is treated as UTC.

    A flush failure is logged and the session is rolled back; the error is
    not re-raised.
    """
    raw_addresses = payload.get("addresses")
    # De-duplicate (order-preserving): a repeated address would otherwise queue
    # two rows with the same key and make the flush fail for the whole
    # reservation.
    addresses: list[str] = (
        list(dict.fromkeys(str(a) for a in raw_addresses))
        if isinstance(raw_addresses, list)
        else []
    )

    raw_run_id = payload.get("run_id")
    agent_id: str = raw_run_id if isinstance(raw_run_id, str) else ""
    raw_model_id = payload.get("model_id")
    agent_model_id: str = raw_model_id if isinstance(raw_model_id, str) else ""

    raw_ttl = payload.get("ttl_s") or payload.get("ttl_seconds")
    ttl_s: int = int(raw_ttl) if isinstance(raw_ttl, (int, float)) else 300

    raw_created = payload.get("created_at")
    created_at = (
        _parse_dt(raw_created if isinstance(raw_created, str) else None)
        or _utc_now()
    )

    # Expiry precedence: payload value, then the caller-supplied fallback,
    # then created_at + ttl.
    raw_expires = payload.get("expires_at")
    exp = (
        _parse_dt(raw_expires if isinstance(raw_expires, str) else None)
        or _ensure_utc(expires_at)
        or (created_at + timedelta(seconds=ttl_s))
    )

    raw_res_id = payload.get("reservation_id")
    reservation_id: str = raw_res_id if isinstance(raw_res_id, str) else record_id

    for address in addresses:
        existing = await session.get(
            _cm.MusehubCoordReservation, (reservation_id, address)
        )
        if existing is not None:
            continue
        session.add(
            _cm.MusehubCoordReservation(
                reservation_id=reservation_id,
                repo_id=repo_id,
                symbol_address=address,
                agent_id=agent_id,
                agent_model_id=agent_model_id,
                ttl_s=ttl_s,
                created_at=created_at,
                expires_at=exp,
                released_at=None,
            )
        )

    try:
        await session.flush()
    except Exception:
        # Best-effort: keep the failure visible instead of swallowing it.
        logger.warning(
            "_materialize_reservation: flush failed for repo=%s reservation_id=%s; "
            "rolling back session",
            repo_id, reservation_id,
            exc_info=True,
        )
        await session.rollback()
128
async def _materialize_release(
    session: AsyncSession,
    repo_id: str,
    payload: JSONObject,
) -> None:
    """Stamp ``released_at`` on the not-yet-released rows of a reservation.

    The reservation is identified by ``payload["reservation_id"]`` within
    *repo_id*; a missing or falsy id makes this a no-op.
    """
    reservation_id = payload.get("reservation_id")
    if reservation_id:
        model = _cm.MusehubCoordReservation
        released_at = _utc_now()
        still_held = and_(
            model.reservation_id == reservation_id,
            model.repo_id == repo_id,
            model.released_at.is_(None),
        )
        stmt = update(model).where(still_held).values(released_at=released_at)
        await session.execute(stmt)
        await session.flush()
151
async def _materialize_task(
    session: AsyncSession,
    repo_id: str,
    record_id: str,
    payload: JSONObject,
    expires_at: datetime | None,
) -> None:
    """Insert a ``MusehubCoordTask`` row for a raw task record if absent.

    Args:
        session: Async session the row is added to and flushed on.
        repo_id: Repository the task belongs to.
        record_id: Raw record id, used as the task id when the payload carries
            no string ``task_id``.
        payload: Raw record payload.  Shape from ``muse coord enqueue --json``:
            ``{task_id, title, payload, priority, queue, created_at,
            created_by, ttl_seconds, tags}``.  A copy of the whole payload is
            stored on the row.
        expires_at: Accepted for signature parity with the reservation
            handler; not used here.

    An already-existing task id is left untouched.  A flush failure is logged
    and the session is rolled back; the error is not re-raised.
    """
    raw_task_id = payload.get("task_id")
    task_id: str = raw_task_id if isinstance(raw_task_id, str) else record_id

    # Already materialized — nothing to do, so skip parsing the rest.
    if await session.get(_cm.MusehubCoordTask, task_id) is not None:
        return

    raw_queue = payload.get("queue")
    queue: str = raw_queue if isinstance(raw_queue, str) else "default"
    raw_priority = payload.get("priority")
    priority: int = int(raw_priority) if isinstance(raw_priority, (int, float)) else 50
    raw_status = payload.get("status")
    status: str = raw_status if isinstance(raw_status, str) else "pending"
    raw_run_id = payload.get("created_by") or payload.get("run_id")
    run_id: str | None = raw_run_id if isinstance(raw_run_id, str) else None
    raw_created = payload.get("created_at")
    created_at = (
        _parse_dt(raw_created if isinstance(raw_created, str) else None)
        or _utc_now()
    )
    raw_deps = payload.get("depends_on")
    depends_on: list[str] = (
        [str(d) for d in raw_deps] if isinstance(raw_deps, list) else []
    )

    session.add(
        _cm.MusehubCoordTask(
            task_id=task_id,
            repo_id=repo_id,
            queue=queue,
            priority=priority,
            payload=dict(payload),
            status=status,
            claimed_by=None,
            claimed_at=None,
            completed_at=None,
            depends_on=depends_on,
            run_id=run_id,
            created_at=created_at,
        )
    )
    try:
        await session.flush()
    except Exception:
        # Best-effort: keep the failure visible instead of swallowing it.
        logger.warning(
            "_materialize_task: flush failed for repo=%s task_id=%s; "
            "rolling back session",
            repo_id, task_id,
            exc_info=True,
        )
        await session.rollback()
202
async def _materialize_claim(
    session: AsyncSession,
    repo_id: str,
    payload: JSONObject,
) -> None:
    """Flip a pending task to ``claimed`` from a raw claim record.

    The claimant is ``payload["agent_id"]``, falling back to
    ``payload["run_id"]``.  Nothing happens when either the task id or the
    claimant is missing/falsy; only rows still in ``pending`` are updated.
    """
    task_id = payload.get("task_id")
    agent_id = payload.get("agent_id") or payload.get("run_id")
    if not (task_id and agent_id):
        return

    model = _cm.MusehubCoordTask
    claimed_at = _utc_now()
    claimable = and_(
        model.task_id == task_id,
        model.repo_id == repo_id,
        model.status == "pending",
    )
    stmt = (
        update(model)
        .where(claimable)
        .values(status="claimed", claimed_by=agent_id, claimed_at=claimed_at)
    )
    await session.execute(stmt)
    await session.flush()
226
227 # ── Reservations CRUD ─────────────────────────────────────────────────────────
228
async def list_reservations(
    session: AsyncSession,
    repo_id: str,
    *,
    include_expired: bool = False,
    include_released: bool = False,
    agent_id: str | None = None,
    limit: int = 200,
) -> list[_cm.MusehubCoordReservation]:
    """Return materialized reservation rows for *repo_id*, newest first.

    By default only rows that are neither expired nor released are returned;
    the two ``include_*`` flags lift those filters.  A truthy *agent_id*
    restricts the result to that agent.  At most *limit* rows are returned.
    """
    now = _utc_now()
    model = _cm.MusehubCoordReservation

    criteria = [model.repo_id == repo_id]
    if not include_expired:
        criteria.append(model.expires_at > now)
    if not include_released:
        criteria.append(model.released_at.is_(None))
    if agent_id:
        criteria.append(model.agent_id == agent_id)

    stmt = (
        select(model)
        .where(*criteria)
        .order_by(model.created_at.desc())
        .limit(limit)
    )
    rows = (await session.execute(stmt)).scalars().all()
    return list(rows)
254
async def extend_reservation(
    session: AsyncSession,
    repo_id: str,
    reservation_id: str,
    extend_by_s: int = 300,
) -> _cm.MusehubCoordReservation | None:
    """Extend the expiry of every unreleased row sharing *reservation_id*.

    The new expiry is ``max(now, current expiry) + extend_by_s`` seconds, so an
    already-expired reservation is extended from "now" rather than from its
    stale expiry.  ``ttl_s`` on the rows is set to *extend_by_s*.  The change
    is committed before returning.

    Returns:
        One row of the reservation, reloaded from the database after the
        commit, or ``None`` when no row matches or the sampled row is already
        released.  Which row is sampled is unspecified (``LIMIT 1`` without
        ordering); the code assumes all rows of a reservation share the same
        metadata.
    """
    model = _cm.MusehubCoordReservation

    # Fetch any row to check existence and released state.
    lookup = await session.execute(
        select(model)
        .where(
            and_(
                model.reservation_id == reservation_id,
                model.repo_id == repo_id,
            )
        )
        .limit(1)
    )
    row = lookup.scalar_one_or_none()
    if row is None or row.released_at is not None:
        return None

    now = _utc_now()
    base = max(now, _ensure_utc(row.expires_at) or now)
    new_expires_at = base + timedelta(seconds=extend_by_s)

    # Update ALL unreleased rows for this reservation_id in one statement.
    await session.execute(
        update(model)
        .where(
            and_(
                model.reservation_id == reservation_id,
                model.repo_id == repo_id,
                model.released_at.is_(None),
            )
        )
        .values(expires_at=new_expires_at, ttl_s=extend_by_s)
    )
    # commit() flushes pending state itself; no separate flush is needed.
    await session.commit()
    # Reload instead of assigning attributes by hand: post-commit assignment
    # would mark the row dirty again, and under expire_on_commit the remaining
    # attributes would be left expired on the returned object.
    await session.refresh(row)
    return row
296
async def conflict_check(
    session: AsyncSession,
    repo_id: str,
    addresses: list[str],
) -> list[JSONObject]:
    """Return active reservations overlapping the given symbol addresses.

    "Active" means not released and not yet expired at call time.  An empty
    *addresses* list short-circuits to ``[]`` without querying.  Each match is
    serialized with ``_reservation_to_dict``.

    Used by agents before reserving symbols to detect conflicts.
    """
    if not addresses:
        return []

    now = _utc_now()
    model = _cm.MusehubCoordReservation
    overlapping_and_live = and_(
        model.repo_id == repo_id,
        model.symbol_address.in_(addresses),
        model.expires_at > now,
        model.released_at.is_(None),
    )
    outcome = await session.execute(select(model).where(overlapping_and_live))
    return [_reservation_to_dict(hit) for hit in outcome.scalars().all()]
320
321 # ── Tasks CRUD ────────────────────────────────────────────────────────────────
322
async def list_tasks(
    session: AsyncSession,
    repo_id: str,
    *,
    queue: str | None = None,
    status: str | None = None,
    limit: int = 100,
) -> list[_cm.MusehubCoordTask]:
    """Return up to *limit* tasks for *repo_id*.

    Rows are ordered by ascending ``priority`` and then ascending
    ``created_at``.  Truthy *queue* / *status* values add equality filters.
    """
    model = _cm.MusehubCoordTask

    criteria = [model.repo_id == repo_id]
    if queue:
        criteria.append(model.queue == queue)
    if status:
        criteria.append(model.status == status)

    stmt = (
        select(model)
        .where(*criteria)
        .order_by(model.priority.asc(), model.created_at.asc())
        .limit(limit)
    )
    rows = (await session.execute(stmt)).scalars().all()
    return list(rows)
347
async def claim_task(
    session: AsyncSession,
    repo_id: str,
    task_id: str,
    agent_id: str,
) -> _cm.MusehubCoordTask | None:
    """Atomically claim a pending task for an agent.

    The claim is a single conditional ``UPDATE … WHERE status = 'pending'``,
    so the pending check and the state change happen in one statement instead
    of a read followed by a write; two agents racing for the same task cannot
    both succeed.

    Returns:
        The claimed task (reloaded after commit) on success, or ``None`` when
        no row was updated — the task doesn't exist, belongs to a different
        repo, or is no longer pending.  Nothing is committed in that case.
    """
    model = _cm.MusehubCoordTask
    outcome = await session.execute(
        update(model)
        .where(
            and_(
                model.task_id == task_id,
                model.repo_id == repo_id,
                model.status == "pending",
            )
        )
        .values(status="claimed", claimed_by=agent_id, claimed_at=_utc_now())
    )
    if outcome.rowcount == 0:
        return None

    await session.commit()
    # populate_existing forces a reload so the returned object reflects the
    # committed row rather than whatever was cached in the identity map.
    return await session.get(model, task_id, populate_existing=True)
370
async def complete_task(
    session: AsyncSession,
    repo_id: str,
    task_id: str,
    agent_id: str,
    result: JSONObject | None = None,
) -> _cm.MusehubCoordTask | None:
    """Mark a claimed task as completed and commit.

    Only the agent recorded in ``claimed_by`` may complete the task.  A truthy
    *result* is stored under the ``"result"`` key of a copy of the task
    payload.

    Returns:
        The updated task, or ``None`` when the task is missing, belongs to a
        different repo, is not in ``claimed`` state, or is claimed by someone
        else.
    """
    task = await session.get(_cm.MusehubCoordTask, task_id)
    if task is None or task.repo_id != repo_id:
        return None

    held_by_caller = task.status == "claimed" and task.claimed_by == agent_id
    if not held_by_caller:
        return None

    task.status = "completed"
    task.completed_at = _utc_now()
    if result:
        # Assign a fresh dict rather than mutating the stored one in place.
        merged = dict(task.payload)
        merged.update({"result": result})
        task.payload = merged

    await session.flush()
    await session.commit()
    return task
393
async def fail_task(
    session: AsyncSession,
    repo_id: str,
    task_id: str,
    agent_id: str,
    reason: str = "",
) -> _cm.MusehubCoordTask | None:
    """Mark a claimed task as failed and commit.

    Only the agent recorded in ``claimed_by`` may fail the task.  A non-empty
    *reason* is stored under the ``"failure_reason"`` key of a copy of the
    task payload.  ``completed_at`` is stamped with the failure time.

    Returns:
        The updated task, or ``None`` when the task is missing, belongs to a
        different repo, is not in ``claimed`` state, or is claimed by someone
        else.
    """
    task = await session.get(_cm.MusehubCoordTask, task_id)
    if task is None or task.repo_id != repo_id:
        return None

    held_by_caller = task.status == "claimed" and task.claimed_by == agent_id
    if not held_by_caller:
        return None

    task.status = "failed"
    task.completed_at = _utc_now()
    if reason:
        # Assign a fresh dict rather than mutating the stored one in place.
        merged = dict(task.payload)
        merged.update({"failure_reason": reason})
        task.payload = merged

    await session.flush()
    await session.commit()
    return task
416
417 # ── Serialization helpers ─────────────────────────────────────────────────────
418
def _reservation_to_dict(r: _cm.MusehubCoordReservation) -> JSONObject:
    """Serialize a reservation row to a JSON-friendly dict.

    Timestamps become ISO-8601 strings (``None`` when unset).  ``active`` is
    true only when the row is unreleased and its expiry — a naive value being
    read as UTC — lies in the future at call time.
    """
    def _iso(stamp: datetime | None) -> str | None:
        # ISO-8601 text for a set timestamp, otherwise None.
        return stamp.isoformat() if stamp else None

    created_iso = _iso(r.created_at)
    expires_iso = _iso(r.expires_at)
    released_iso = _iso(r.released_at)

    is_active = False
    if r.released_at is None and r.expires_at is not None:
        deadline = _ensure_utc(r.expires_at) or _utc_now()
        is_active = deadline > _utc_now()

    return {
        "reservation_id": r.reservation_id,
        "repo_id": r.repo_id,
        "symbol_address": r.symbol_address,
        "agent_id": r.agent_id,
        "agent_model_id": r.agent_model_id,
        "ttl_s": r.ttl_s,
        "created_at": created_iso,
        "expires_at": expires_iso,
        "released_at": released_iso,
        "active": is_active,
    }
436
def _task_to_dict(t: _cm.MusehubCoordTask) -> JSONObject:
    """Serialize a task row to a JSON-friendly dict.

    Timestamps become ISO-8601 strings (``None`` when unset) and a missing
    ``depends_on`` is emitted as an empty list.
    """
    def _iso(stamp: datetime | None) -> str | None:
        # ISO-8601 text for a set timestamp, otherwise None.
        return stamp.isoformat() if stamp else None

    claimed_iso = _iso(t.claimed_at)
    completed_iso = _iso(t.completed_at)
    dependencies = json_list(t.depends_on or [])
    created_iso = _iso(t.created_at)

    return {
        "task_id": t.task_id,
        "repo_id": t.repo_id,
        "queue": t.queue,
        "priority": t.priority,
        "payload": t.payload,
        "status": t.status,
        "claimed_by": t.claimed_by,
        "claimed_at": claimed_iso,
        "completed_at": completed_iso,
        "depends_on": dependencies,
        "run_id": t.run_id,
        "created_at": created_iso,
    }
452
453 def _parse_dt(raw: str | None) -> datetime | None:
454 if not raw:
455 return None
456 try:
457 s = str(raw).replace("Z", "+00:00")
458 dt = datetime.fromisoformat(s)
459 return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
460 except Exception:
461 return None
File History 1 commit
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2 feat: add repair-commit wire endpoint (API parity with repa… Opus 4.8 minor 1 day ago