"""MuseHub Coordination Persistence Service — Phase 4.1.

Bridges the raw ``musehub_coord_records`` event bus into the materialized
``musehub_coord_reservations`` and ``musehub_coord_tasks`` tables, enabling the
Intelligence Hub UI and MCP tools to query structured coordination state
without parsing raw JSON payloads.

Two layers:

1. **Materialization** — ``materialize_coord_record()`` is called after every
   successful ``coord_push`` to extract and upsert structured rows from raw
   events.
2. **CRUD** — ``list_reservations()``, ``list_tasks()``, ``claim_task()``,
   ``complete_task()``, ``fail_task()``, ``extend_reservation()``, and
   ``conflict_check()`` provide the full query surface for the V2 API and MCP
   tools.

Task state transitions (``claim_task``, ``complete_task``, ``fail_task``) are
written as conditional ``UPDATE ... WHERE status = ...`` statements, so the
status check and the write happen in a single statement instead of a
read-then-write sequence that concurrent callers could interleave.
"""

import logging
from datetime import datetime, timedelta, timezone

from sqlalchemy import and_, select, update
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.db import coord_models as _cm
from musehub.types.json_types import JSONObject, json_list

logger = logging.getLogger(__name__)


# ── Helpers ───────────────────────────────────────────────────────────────────


def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


def _ensure_utc(dt: datetime | None) -> datetime | None:
    """Return *dt* with UTC attached if it is naive; pass aware values and None through.

    Naive values are labelled as UTC (``replace``), not converted.
    """
    if dt is None:
        return None
    return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)


async def _flush_or_rollback(session: AsyncSession, kind: str, key: str) -> None:
    """Flush newly added materialized rows; on failure, log and roll back.

    Materialization is best-effort, so a failed flush is not re-raised.
    Note that ``session.rollback()`` rolls back the session's whole current
    transaction, not only the rows added by the caller.

    Args:
        session: Session holding the pending rows.
        kind: Record kind being materialized (used only for logging).
        key: Identifier of the record (used only for logging).
    """
    try:
        await session.flush()
    except Exception:
        logger.warning(
            "materialize_coord_record: flush failed for kind=%s id=%s; rolling back",
            kind,
            key,
            exc_info=True,
        )
        await session.rollback()


# ── Materialization ───────────────────────────────────────────────────────────


async def materialize_coord_record(
    session: AsyncSession,
    repo_id: str,
    kind: str,
    record_id: str,
    payload: JSONObject,
    expires_at: datetime | None,
) -> None:
    """Extract structured rows from a raw coord record and upsert them.

    Called after every successful coord_push. Idempotent — re-materializing
    the same (repo_id, kind, record_id) is a no-op.

    Supported kinds:
        ``reservation`` → upsert ``MusehubCoordReservation``
        ``release``     → mark matching reservation as released
        ``task``        → upsert ``MusehubCoordTask``
        ``claim``       → mark matching task as claimed

    Other kinds (intent, heartbeat, dependency) are not materialized as
    separate rows — they live only in the raw event bus.

    Errors are logged (with traceback) and swallowed; this function never
    raises to its caller.
    """
    try:
        if kind == "reservation":
            await _materialize_reservation(session, repo_id, record_id, payload, expires_at)
        elif kind == "release":
            await _materialize_release(session, repo_id, payload)
        elif kind == "task":
            await _materialize_task(session, repo_id, record_id, payload, expires_at)
        elif kind == "claim":
            await _materialize_claim(session, repo_id, payload)
        # intent, heartbeat, dependency — live in coord_records only
    except Exception as exc:
        # NOTE(review): the session is not rolled back here; if a release/claim
        # UPDATE failed, the caller may need to roll back before reusing it.
        logger.warning(
            "materialize_coord_record: failed for repo=%s kind=%s record_id=%s: %s",
            repo_id,
            kind,
            record_id,
            exc,
            exc_info=True,
        )


async def _materialize_reservation(
    session: AsyncSession,
    repo_id: str,
    record_id: str,
    payload: JSONObject,
    expires_at: datetime | None,
) -> None:
    """Insert one ``MusehubCoordReservation`` row per reserved address.

    Rows that already exist for ``(reservation_id, address)`` are skipped,
    which makes re-materialization a no-op.
    """
    # payload shape from `muse coord reserve --json`:
    #   {reservation_id, run_id, branch, addresses: [...], created_at,
    #    expires_at, operation, conflicts, depends_on}
    _addrs = payload.get("addresses")
    # dict.fromkeys drops repeated addresses (keeping first-seen order) so the
    # per-address existence check below does not depend on autoflush to notice
    # a row added earlier in this same loop.
    addresses: list[str] = (
        list(dict.fromkeys(str(a) for a in _addrs)) if isinstance(_addrs, list) else []
    )

    _run_id = payload.get("run_id")
    agent_id: str = _run_id if isinstance(_run_id, str) else ""
    _model_id = payload.get("model_id")
    agent_model_id: str = _model_id if isinstance(_model_id, str) else ""

    _ttl = payload.get("ttl_s") or payload.get("ttl_seconds")
    ttl_s: int = int(_ttl) if isinstance(_ttl, (int, float)) else 300

    _cat = payload.get("created_at")
    created_at = _parse_dt(_cat if isinstance(_cat, str) else None) or _utc_now()
    _eat = payload.get("expires_at")
    # Expiry precedence: payload value, then the record-level expiry, then
    # created_at + TTL.
    exp = (
        _parse_dt(_eat if isinstance(_eat, str) else None)
        or expires_at
        or (created_at + timedelta(seconds=ttl_s))
    )

    _res_id = payload.get("reservation_id")
    reservation_id: str = _res_id if isinstance(_res_id, str) else record_id

    for address in addresses:
        existing = await session.get(_cm.MusehubCoordReservation, (reservation_id, address))
        if existing is not None:
            continue
        session.add(
            _cm.MusehubCoordReservation(
                reservation_id=reservation_id,
                repo_id=repo_id,
                symbol_address=address,
                agent_id=agent_id,
                agent_model_id=agent_model_id,
                ttl_s=ttl_s,
                created_at=created_at,
                expires_at=exp,
                released_at=None,
            )
        )
    await _flush_or_rollback(session, "reservation", reservation_id)


async def _materialize_release(
    session: AsyncSession,
    repo_id: str,
    payload: JSONObject,
) -> None:
    """Stamp ``released_at`` on every still-unreleased row of a reservation.

    Rows that are already released are left untouched, so replaying the same
    release is a no-op. Records without a string ``reservation_id`` are ignored.
    """
    reservation_id = payload.get("reservation_id")
    if not isinstance(reservation_id, str) or not reservation_id:
        return
    await session.execute(
        update(_cm.MusehubCoordReservation)
        .where(
            and_(
                _cm.MusehubCoordReservation.reservation_id == reservation_id,
                _cm.MusehubCoordReservation.repo_id == repo_id,
                _cm.MusehubCoordReservation.released_at.is_(None),
            )
        )
        .values(released_at=_utc_now())
    )
    await session.flush()


async def _materialize_task(
    session: AsyncSession,
    repo_id: str,
    record_id: str,
    payload: JSONObject,
    expires_at: datetime | None,
) -> None:
    """Insert a ``MusehubCoordTask`` from a raw task record if it does not exist yet.

    *expires_at* is accepted for signature parity with the other helpers but
    is not stored.
    """
    # payload shape from `muse coord enqueue --json`:
    #   {task_id, title, payload, priority, queue, created_at, created_by,
    #    ttl_seconds, tags}
    _task_id = payload.get("task_id")
    task_id: str = _task_id if isinstance(_task_id, str) else record_id

    existing = await session.get(_cm.MusehubCoordTask, task_id)
    if existing is not None:
        return

    _queue = payload.get("queue")
    queue: str = _queue if isinstance(_queue, str) else "default"
    _pri = payload.get("priority")
    priority: int = int(_pri) if isinstance(_pri, (int, float)) else 50
    _status = payload.get("status")
    status: str = _status if isinstance(_status, str) else "pending"
    _run_id_t = payload.get("created_by") or payload.get("run_id")
    run_id: str | None = _run_id_t if isinstance(_run_id_t, str) else None
    _cat_t = payload.get("created_at")
    created_at = _parse_dt(_cat_t if isinstance(_cat_t, str) else None) or _utc_now()
    _dep = payload.get("depends_on")
    depends_on: list[str] = [str(d) for d in _dep] if isinstance(_dep, list) else []

    session.add(
        _cm.MusehubCoordTask(
            task_id=task_id,
            repo_id=repo_id,
            queue=queue,
            priority=priority,
            # The whole raw record (not only its nested "payload" field) is
            # stored as the task payload.
            payload=dict(payload),
            status=status,
            claimed_by=None,
            claimed_at=None,
            completed_at=None,
            depends_on=depends_on,
            run_id=run_id,
            created_at=created_at,
        )
    )
    await _flush_or_rollback(session, "task", task_id)


async def _materialize_claim(
    session: AsyncSession,
    repo_id: str,
    payload: JSONObject,
) -> None:
    """Mark a pending task as claimed by the agent named in the record.

    Only tasks still in ``pending`` are updated, so replays are no-ops.
    Records without string ``task_id`` and ``agent_id``/``run_id`` are ignored.
    """
    task_id = payload.get("task_id")
    agent_id = payload.get("agent_id") or payload.get("run_id")
    if not (isinstance(task_id, str) and task_id and isinstance(agent_id, str) and agent_id):
        return
    await session.execute(
        update(_cm.MusehubCoordTask)
        .where(
            and_(
                _cm.MusehubCoordTask.task_id == task_id,
                _cm.MusehubCoordTask.repo_id == repo_id,
                _cm.MusehubCoordTask.status == "pending",
            )
        )
        .values(status="claimed", claimed_by=agent_id, claimed_at=_utc_now())
    )
    await session.flush()


# ── Reservations CRUD ─────────────────────────────────────────────────────────


async def list_reservations(
    session: AsyncSession,
    repo_id: str,
    *,
    include_expired: bool = False,
    include_released: bool = False,
    agent_id: str | None = None,
    limit: int = 200,
) -> list[_cm.MusehubCoordReservation]:
    """Return materialized reservations for a repo, newest first.

    By default only rows that are unexpired and unreleased are returned;
    *agent_id* (when non-empty) restricts results to that agent's rows.
    At most *limit* rows are returned.
    """
    stmt = select(_cm.MusehubCoordReservation).where(
        _cm.MusehubCoordReservation.repo_id == repo_id
    )
    if not include_expired:
        stmt = stmt.where(_cm.MusehubCoordReservation.expires_at > _utc_now())
    if not include_released:
        stmt = stmt.where(_cm.MusehubCoordReservation.released_at.is_(None))
    if agent_id:
        stmt = stmt.where(_cm.MusehubCoordReservation.agent_id == agent_id)
    stmt = stmt.order_by(_cm.MusehubCoordReservation.created_at.desc()).limit(limit)
    result = await session.execute(stmt)
    return list(result.scalars().all())


async def extend_reservation(
    session: AsyncSession,
    repo_id: str,
    reservation_id: str,
    extend_by_s: int = 300,
) -> _cm.MusehubCoordReservation | None:
    """Extend a reservation's TTL for all rows sharing this reservation_id.

    The new expiry is ``max(now, current expires_at) + extend_by_s``, so an
    expired-but-unreleased reservation is revived starting from now. ``ttl_s``
    on every row is overwritten with *extend_by_s*. Commits the session.

    Returns:
        One representative row carrying the new expiry, or None when no row
        exists for this repo or the reservation has been released.
    """
    # All rows of a reservation share the same metadata, so any one of them is
    # enough to check existence and released state.
    result = await session.execute(
        select(_cm.MusehubCoordReservation)
        .where(
            and_(
                _cm.MusehubCoordReservation.reservation_id == reservation_id,
                _cm.MusehubCoordReservation.repo_id == repo_id,
            )
        )
        .limit(1)
    )
    row = result.scalar_one_or_none()
    if row is None or row.released_at is not None:
        return None

    now = _utc_now()
    base = max(now, _ensure_utc(row.expires_at) or now)
    new_expires_at = base + timedelta(seconds=extend_by_s)

    # Update every row of this reservation in one statement.
    await session.execute(
        update(_cm.MusehubCoordReservation)
        .where(
            and_(
                _cm.MusehubCoordReservation.reservation_id == reservation_id,
                _cm.MusehubCoordReservation.repo_id == repo_id,
                _cm.MusehubCoordReservation.released_at.is_(None),
            )
        )
        .values(expires_at=new_expires_at, ttl_s=extend_by_s)
    )
    await session.flush()
    await session.commit()
    # Mirror the new values onto the object handed back to the caller.
    row.expires_at = new_expires_at
    row.ttl_s = extend_by_s
    return row


async def conflict_check(
    session: AsyncSession,
    repo_id: str,
    addresses: list[str],
) -> list[JSONObject]:
    """Return active reservations that overlap with the given symbol addresses.

    Used by agents before reserving symbols to detect conflicts. "Active"
    means unexpired and unreleased. An empty *addresses* list returns ``[]``
    without querying.
    """
    if not addresses:
        return []
    stmt = select(_cm.MusehubCoordReservation).where(
        and_(
            _cm.MusehubCoordReservation.repo_id == repo_id,
            _cm.MusehubCoordReservation.symbol_address.in_(addresses),
            _cm.MusehubCoordReservation.expires_at > _utc_now(),
            _cm.MusehubCoordReservation.released_at.is_(None),
        )
    )
    result = await session.execute(stmt)
    return [_reservation_to_dict(r) for r in result.scalars().all()]


# ── Tasks CRUD ────────────────────────────────────────────────────────────────


async def list_tasks(
    session: AsyncSession,
    repo_id: str,
    *,
    queue: str | None = None,
    status: str | None = None,
    limit: int = 100,
) -> list[_cm.MusehubCoordTask]:
    """Return tasks for a repo, optionally filtered by queue/status.

    Results are ordered by ascending priority, then oldest first, and capped
    at *limit* rows.
    """
    stmt = select(_cm.MusehubCoordTask).where(_cm.MusehubCoordTask.repo_id == repo_id)
    if queue:
        stmt = stmt.where(_cm.MusehubCoordTask.queue == queue)
    if status:
        stmt = stmt.where(_cm.MusehubCoordTask.status == status)
    stmt = stmt.order_by(
        _cm.MusehubCoordTask.priority.asc(),
        _cm.MusehubCoordTask.created_at.asc(),
    ).limit(limit)
    result = await session.execute(stmt)
    return list(result.scalars().all())


async def claim_task(
    session: AsyncSession,
    repo_id: str,
    task_id: str,
    agent_id: str,
) -> _cm.MusehubCoordTask | None:
    """Atomically claim a pending task for an agent.

    The claim is one conditional ``UPDATE ... WHERE status = 'pending'``, so
    the pending check and the write cannot be interleaved by another
    claimer; a caller whose UPDATE matches no row gets None. Commits the
    session on success.

    Returns the updated task on success, or None if the task is already
    claimed, doesn't exist, or belongs to a different repo.
    """
    result = await session.execute(
        update(_cm.MusehubCoordTask)
        .where(
            and_(
                _cm.MusehubCoordTask.task_id == task_id,
                _cm.MusehubCoordTask.repo_id == repo_id,
                _cm.MusehubCoordTask.status == "pending",
            )
        )
        .values(status="claimed", claimed_by=agent_id, claimed_at=_utc_now())
        # The row is reloaded explicitly below; skipping ORM session sync keeps
        # a cached copy from being modified when the UPDATE matches nothing.
        .execution_options(synchronize_session=False)
    )
    if result.rowcount == 0:
        return None
    await session.commit()
    # populate_existing forces a fresh SELECT so the returned object reflects
    # the committed claim even if an older copy sits in the identity map.
    return await session.get(_cm.MusehubCoordTask, task_id, populate_existing=True)


async def _finish_claimed_task(
    session: AsyncSession,
    repo_id: str,
    task_id: str,
    agent_id: str,
    *,
    new_status: str,
    payload_updates: JSONObject,
) -> _cm.MusehubCoordTask | None:
    """Move a task claimed by *agent_id* into the terminal *new_status*.

    Shared by ``complete_task`` and ``fail_task``. The transition is a
    conditional UPDATE guarded on ``status = 'claimed'`` and ``claimed_by``,
    so only one terminal transition can succeed. *payload_updates* (when
    non-empty) is merged into a copy of the stored payload. Commits the
    session on success.

    Returns:
        The reloaded task, or None if it does not exist, belongs to another
        repo, is not claimed by *agent_id*, or changed state concurrently.
    """
    row = await session.get(_cm.MusehubCoordTask, task_id)
    if row is None or row.repo_id != repo_id:
        return None
    if row.status != "claimed" or row.claimed_by != agent_id:
        return None

    values: dict[str, object] = {"status": new_status, "completed_at": _utc_now()}
    if payload_updates:
        merged = dict(row.payload or {})
        merged.update(payload_updates)
        values["payload"] = merged

    result = await session.execute(
        update(_cm.MusehubCoordTask)
        .where(
            and_(
                _cm.MusehubCoordTask.task_id == task_id,
                _cm.MusehubCoordTask.repo_id == repo_id,
                _cm.MusehubCoordTask.status == "claimed",
                _cm.MusehubCoordTask.claimed_by == agent_id,
            )
        )
        .values(values)
        .execution_options(synchronize_session=False)
    )
    if result.rowcount == 0:
        return None
    await session.commit()
    return await session.get(_cm.MusehubCoordTask, task_id, populate_existing=True)


async def complete_task(
    session: AsyncSession,
    repo_id: str,
    task_id: str,
    agent_id: str,
    result: JSONObject | None = None,
) -> _cm.MusehubCoordTask | None:
    """Mark a task claimed by *agent_id* as completed.

    A non-empty *result* is stored under ``payload["result"]``. Returns the
    updated task, or None when the task is missing, in another repo, or not
    currently claimed by *agent_id*.
    """
    return await _finish_claimed_task(
        session,
        repo_id,
        task_id,
        agent_id,
        new_status="completed",
        payload_updates={"result": result} if result else {},
    )


async def fail_task(
    session: AsyncSession,
    repo_id: str,
    task_id: str,
    agent_id: str,
    reason: str = "",
) -> _cm.MusehubCoordTask | None:
    """Mark a task claimed by *agent_id* as failed.

    A non-empty *reason* is stored under ``payload["failure_reason"]``.
    Returns the updated task, or None when the task is missing, in another
    repo, or not currently claimed by *agent_id*.
    """
    return await _finish_claimed_task(
        session,
        repo_id,
        task_id,
        agent_id,
        new_status="failed",
        payload_updates={"failure_reason": reason} if reason else {},
    )


# ── Serialization helpers ─────────────────────────────────────────────────────


def _reservation_to_dict(r: _cm.MusehubCoordReservation) -> JSONObject:
    """Serialize a reservation row to a JSON-safe dict (datetimes as ISO strings).

    ``active`` is true when the row is unreleased and its expiry (naive values
    treated as UTC) is still in the future.
    """
    expires = _ensure_utc(r.expires_at)
    return {
        "reservation_id": r.reservation_id,
        "repo_id": r.repo_id,
        "symbol_address": r.symbol_address,
        "agent_id": r.agent_id,
        "agent_model_id": r.agent_model_id,
        "ttl_s": r.ttl_s,
        "created_at": r.created_at.isoformat() if r.created_at else None,
        "expires_at": r.expires_at.isoformat() if r.expires_at else None,
        "released_at": r.released_at.isoformat() if r.released_at else None,
        "active": r.released_at is None and expires is not None and expires > _utc_now(),
    }


def _task_to_dict(t: _cm.MusehubCoordTask) -> JSONObject:
    """Serialize a task row to a JSON-safe dict (datetimes as ISO strings)."""
    return {
        "task_id": t.task_id,
        "repo_id": t.repo_id,
        "queue": t.queue,
        "priority": t.priority,
        "payload": t.payload,
        "status": t.status,
        "claimed_by": t.claimed_by,
        "claimed_at": t.claimed_at.isoformat() if t.claimed_at else None,
        "completed_at": t.completed_at.isoformat() if t.completed_at else None,
        "depends_on": json_list(t.depends_on or []),
        "run_id": t.run_id,
        "created_at": t.created_at.isoformat() if t.created_at else None,
    }


def _parse_dt(raw: str | None) -> datetime | None:
    """Parse an ISO-8601 string into an aware datetime; return None if empty or invalid.

    A ``Z`` suffix is accepted as UTC, and naive results are labelled UTC.
    """
    if not raw:
        return None
    try:
        dt = datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
    except ValueError:
        return None
    return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)