gabriel / musehub public
musehub_sync.py python
546 lines 20.8 KB
Raw
sha256:f3995ec2c05c9c34b0e4d6e96349a811d0117a1c51d78096d757998ccb3c0520 fix: blobs only in S3/mpack — remove commit/snapshot indivi… Sonnet 4.6 patch 16 days ago
1 """MuseHub sync service — push and pull protocol implementation.
2
3 Implements the two core data-movement operations:
4 - ``ingest_push``: stores commits and objects from a client push, enforcing
5 fast-forward semantics and updating the branch head.
6 - ``compute_pull_delta``: returns commits and objects the client does not yet
7 have, keyed by their ``have_commits`` / ``have_objects`` exclusion lists.
8
9 Object content is written to the blob store (R2/MinIO) and metadata
10 (path, size, storage_uri) is persisted to Postgres.
11
12 Boundary rules (same as musehub_repository):
13 - Must NOT import state stores, SSE queues, or LLM clients.
14 - May import ORM models from musehub.db domain-specific modules.
15 - May import Pydantic models from musehub.models.musehub.
16 """
17
18 import base64
19 import datetime
20 import hashlib
21 import logging
22
23 from sqlalchemy import select
24 from sqlalchemy.ext.asyncio import AsyncSession
25
26 from muse.core.types import blob_id
27 from musehub.core.genesis import compute_branch_id
28 from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitRef, MusehubObject, MusehubObjectRef, MusehubRepo, MusehubSnapshot
29 from musehub.models.musehub import (
30 CommitInput,
31 CommitResponse,
32 ObjectInput,
33 ObjectResponse,
34 PullResponse,
35 PushResponse,
36 SnapshotInput,
37 )
38
39 logger = logging.getLogger(__name__)
40
41
42 # ---------------------------------------------------------------------------
43 # Internal helpers
44 # ---------------------------------------------------------------------------
45
46
def _to_commit_response(row: MusehubCommit) -> CommitResponse:
    """Map a stored commit row onto the wire-level ``CommitResponse`` model.

    A falsy ``parent_ids`` value on the row is normalised to an empty list;
    otherwise the parents are copied into a fresh list.
    """
    stored_parents = row.parent_ids
    parents = list(stored_parents) if stored_parents else []
    fields = {
        "commit_id": row.commit_id,
        "branch": row.branch,
        "parent_ids": parents,
        "message": row.message,
        "author": row.author,
        "timestamp": row.timestamp,
        "snapshot_id": row.snapshot_id,
    }
    return CommitResponse(**fields)
57
58
async def _to_object_response(row: MusehubObject) -> ObjectResponse:
    """Load an object's bytes from the blob store and wrap them for transport.

    The payload is returned base64-encoded. When the backend has no bytes for
    ``row.object_id`` a warning is logged and an empty payload is sent instead
    of failing the whole response.
    """
    from musehub.storage.backends import get_backend

    store = get_backend()
    payload = await store.get(row.object_id)
    if payload is None:
        logger.warning("⚠️ Object missing from blob store: object_id=%s", row.object_id)
        payload = b""

    encoded = base64.b64encode(payload).decode()
    return ObjectResponse(object_id=row.object_id, path=row.path, content_b64=encoded)
71
72
73
74 def _is_fast_forward(
75 remote_head: str | None,
76 head_commit_id: str,
77 commits: list[CommitInput],
78 ) -> bool:
79 """Return True if the push is a fast-forward update.
80
81 A push is fast-forward when:
82 - the remote branch has no head yet (first push), or
83 - the new head_commit_id equals the remote head (no-op), or
84 - the remote head appears somewhere in the ancestry graph of the pushed
85 commits (meaning the client built on top of the remote head).
86
87 We build a local graph from the pushed commits and walk parents. This
88 does NOT query the DB for previously stored commits — for MVP the client
89 is expected to include all commits since the common ancestor.
90 """
91 if remote_head is None:
92 return True
93 if head_commit_id == remote_head:
94 return True
95
96 from muse.core.graph import walk_dag
97 parent_map = {c.commit_id: c.parent_ids for c in commits}
98 for cid in walk_dag(head_commit_id, lambda cid: parent_map.get(cid, [])):
99 if cid == remote_head:
100 return True
101 return False
102
103
104 # ---------------------------------------------------------------------------
105 # Public API
106 # ---------------------------------------------------------------------------
107
108
async def ingest_push(
    session: AsyncSession,
    *,
    repo_id: str,
    branch: str,
    head_commit_id: str,
    commits: list[CommitInput],
    snapshots: list[SnapshotInput] | None = None,
    objects: list[ObjectInput],
    force: bool,
    author: str,
) -> PushResponse:
    """Store commits, snapshot manifests, and objects from a push; update the branch head.

    Execution steps (in order):

    1. **Resolve / create branch** — the branch row is fetched, or created on
       the first push to that branch name.
    2. **Fast-forward check** — delegated to ``_is_fast_forward``, which walks
       ``parent_ids`` of the pushed commits. Raises
       ``ValueError("non_fast_forward")`` when the current head is not reachable
       from the new head and ``force`` is False.
    3. **Validate parents** — every parent of a commit that is new to this repo
       must be part of the push or already referenced by the repo.
    4. **Store commits** — a commit row is inserted only when no row with that
       ``commit_id`` exists yet; a per-repo ref is inserted only when this repo
       does not already reference the commit. Duplicate IDs inside one push
       are processed once.
    5. **Upsert snapshots** — each snapshot with a non-empty ``snapshot_id`` is
       handed to ``upsert_snapshot_entries``.
    6. **Store objects** — objects this repo does not yet reference are passed
       to ``_write_object``.
    7. **Update branch head** — ``head_commit_id`` is written to the branch row
       and the session is flushed (not committed).

    Args:
        session: Active async SQLAlchemy session.
        repo_id: Repository ID the push targets.
        branch: Target branch name (created on first push).
        head_commit_id: ID of the new branch tip after the push.
        commits: :class:`CommitInput` objects to store.
        snapshots: Optional list of :class:`SnapshotInput` objects; ``None``
            or ``[]`` are both treated as "no snapshots in this push".
        objects: Blob payloads to store.
        force: When ``True``, skip the fast-forward check (destructive).
        author: Fallback author for commits that carry no author of their own.

    Returns:
        :class:`PushResponse` with ``ok=True`` and ``remote_head`` set to the
        new branch head commit ID.

    Raises:
        ValueError: ``"non_fast_forward"`` when the update is not a
            fast-forward and ``force`` is False, or
            ``"missing_parent_commits: ..."`` when a new commit references a
            parent that is neither in the push nor known to the repo.
    """
    # ------------------------------------------------------------------
    # 1. Resolve (or create) the branch
    # ------------------------------------------------------------------
    branch_row = await _get_or_create_branch(session, repo_id=repo_id, branch=branch)

    # ------------------------------------------------------------------
    # 2. Fast-forward check
    # ------------------------------------------------------------------
    if not force and not _is_fast_forward(branch_row.head_commit_id, head_commit_id, commits):
        logger.warning(
            "⚠️ Non-fast-forward push rejected for repo=%s branch=%s remote_head=%s new_head=%s",
            repo_id,
            branch,
            branch_row.head_commit_id,
            head_commit_id,
        )
        raise ValueError("non_fast_forward")

    # ------------------------------------------------------------------
    # 3. Look up which pushed commits are already known
    #
    # ``existing_commit_ids``: commits this repo already references.
    # ``stored_commit_ids``:   commits whose row exists at all. Commit rows
    #                          are looked up by commit_id alone elsewhere in
    #                          this module, so a row may exist without a ref
    #                          for this repo.
    # ------------------------------------------------------------------
    pushed_commit_ids = [c.commit_id for c in commits]
    existing_commit_ids: set[str] = set()
    stored_commit_ids: set[str] = set()
    if commits:
        ref_stmt = select(MusehubCommitRef.commit_id).where(
            MusehubCommitRef.repo_id == repo_id,
            MusehubCommitRef.commit_id.in_(pushed_commit_ids),
        )
        existing_commit_ids = set((await session.execute(ref_stmt)).scalars().all())

        row_stmt = select(MusehubCommit.commit_id).where(
            MusehubCommit.commit_id.in_(pushed_commit_ids),
        )
        stored_commit_ids = set((await session.execute(row_stmt)).scalars().all())

    # ------------------------------------------------------------------
    # 3a. Parent existence validation
    #
    # Every parent_id referenced by an incoming commit must either:
    #   (a) be in this push itself, or
    #   (b) already be referenced by this repo in the DB.
    #
    # Without this guard a client can push a commit whose parent is a
    # fabricated or missing ID, leaving a history that cannot be walked.
    # ------------------------------------------------------------------
    mpack_commit_ids: set[str] = set(pushed_commit_ids)
    external_parent_ids: set[str] = set()
    for c in commits:
        if c.commit_id in existing_commit_ids:
            continue
        for pid in (c.parent_ids or []):
            if pid and pid not in mpack_commit_ids:
                external_parent_ids.add(pid)

    if external_parent_ids:
        parent_stmt = select(MusehubCommitRef.commit_id).where(
            MusehubCommitRef.commit_id.in_(external_parent_ids),
            MusehubCommitRef.repo_id == repo_id,
        )
        db_parent_ids: set[str] = set((await session.execute(parent_stmt)).scalars().all())
        missing_parents = external_parent_ids - db_parent_ids
        if missing_parents:
            short = ", ".join(p[:16] for p in sorted(missing_parents))
            raise ValueError(f"missing_parent_commits: {short}")

    # ------------------------------------------------------------------
    # 4. Store commits and per-repo refs
    # ------------------------------------------------------------------
    new_commits: list[MusehubCommit] = []
    new_commit_refs: list[MusehubCommitRef] = []
    # Seeded with the already-referenced commits so they are skipped entirely:
    # adding a second ref for the same (repo, commit) pair would duplicate it.
    # Also absorbs duplicate IDs within this push.
    handled_commit_ids: set[str] = set(existing_commit_ids)
    for c in commits:
        if c.commit_id in handled_commit_ids:
            continue
        handled_commit_ids.add(c.commit_id)
        if c.commit_id not in stored_commit_ids:
            new_commits.append(
                MusehubCommit(
                    commit_id=c.commit_id,
                    branch=branch,
                    parent_ids=c.parent_ids,
                    message=c.message,
                    author=c.author if c.author else author,
                    timestamp=c.timestamp,
                    snapshot_id=c.snapshot_id,
                )
            )
        new_commit_refs.append(MusehubCommitRef(repo_id=repo_id, commit_id=c.commit_id))
    if new_commits:
        session.add_all(new_commits)
        logger.info("✅ Ingested %d new commits for repo=%s", len(new_commits), repo_id)
    if new_commit_refs:
        session.add_all(new_commit_refs)

    # ------------------------------------------------------------------
    # 5. Upsert snapshots (always writes entries)
    # ------------------------------------------------------------------
    from musehub.services.musehub_snapshot import upsert_snapshot_entries

    ingest_snapshots = snapshots or []
    for s in ingest_snapshots:
        if not s.snapshot_id:
            continue
        # A non-dict manifest is stored as an empty manifest.
        manifest = s.manifest if isinstance(s.manifest, dict) else {}
        await upsert_snapshot_entries(session, repo_id, s.snapshot_id, manifest)
    if ingest_snapshots:
        logger.info("✅ Ingested %d snapshots for repo=%s", len(ingest_snapshots), repo_id)

    # ------------------------------------------------------------------
    # 6. Store objects (bytes to the blob store, metadata to the DB)
    # ------------------------------------------------------------------
    existing_object_ids: set[str] = set()
    if objects:
        stmt_obj = select(MusehubObjectRef.object_id).where(
            MusehubObjectRef.repo_id == repo_id,
            MusehubObjectRef.object_id.in_([o.object_id for o in objects]),
        )
        res_obj = await session.execute(stmt_obj)
        existing_object_ids = set(res_obj.scalars().all())

    for obj in objects:
        if obj.object_id in existing_object_ids:
            continue
        # Remember the ID so a duplicate later in the same push is skipped.
        existing_object_ids.add(obj.object_id)
        await _write_object(
            session,
            repo_id=repo_id,
            obj=obj,
        )

    # ------------------------------------------------------------------
    # 7. Update branch head
    # ------------------------------------------------------------------
    branch_row.head_commit_id = head_commit_id
    logger.info(
        "✅ Branch '%s' head updated to %s for repo=%s",
        branch,
        head_commit_id,
        repo_id,
    )

    await session.flush()
    return PushResponse(ok=True, remote_head=head_commit_id)
300
301
_PULL_OBJECTS_PAGE_SIZE: int = 500  # upper bound on objects in one pull response


async def compute_pull_delta(
    session: AsyncSession,
    *,
    repo_id: str,
    branch: str,
    have_commits: list[str],
    have_objects: list[str],
    cursor: str | None = None,
) -> PullResponse:
    """Compute what a client is missing relative to the server.

    Commits are returned in one shot: every commit referenced by the repo on
    ``branch`` whose ID is not in ``have_commits``.

    Objects are served in pages of at most ``_PULL_OBJECTS_PAGE_SIZE`` items,
    ordered by ``object_id`` and excluding anything in ``have_objects``:

    - Start with ``cursor=None``.
    - While the response has ``has_more=True``, call again with
      ``cursor=response.next_cursor``.
    - ``next_cursor`` is the ``object_id`` of the last object on the page; the
      following page contains only IDs strictly greater than it.

    ``remote_head`` in the response is the branch's current head, or ``None``
    when the branch does not exist.
    """
    import asyncio

    head_row = await _get_branch(session, repo_id=repo_id, branch=branch)
    if head_row:
        remote_head = head_row.head_commit_id
    else:
        remote_head = None

    # ------------------------------------------------------------------
    # Commits the client lacks (returned unpaginated)
    # ------------------------------------------------------------------
    commit_filters = [
        MusehubCommitRef.repo_id == repo_id,
        MusehubCommit.branch == branch,
    ]
    if have_commits:
        commit_filters.append(MusehubCommit.commit_id.notin_(have_commits))
    commit_query = (
        select(MusehubCommit)
        .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(*commit_filters)
    )
    commit_result = await session.execute(commit_query)
    missing_commits = [_to_commit_response(found) for found in commit_result.scalars().all()]

    # ------------------------------------------------------------------
    # Objects the client lacks (keyset pagination on object_id)
    # ------------------------------------------------------------------
    object_filters = [MusehubObjectRef.repo_id == repo_id]
    if have_objects:
        object_filters.append(MusehubObject.object_id.notin_(have_objects))
    if cursor:
        # Continue strictly after the last object_id the client received.
        object_filters.append(MusehubObject.object_id > cursor)
    object_query = (
        select(MusehubObject)
        .join(MusehubObjectRef, MusehubObject.object_id == MusehubObjectRef.object_id)
        .where(*object_filters)
        .order_by(MusehubObject.object_id)
        .limit(_PULL_OBJECTS_PAGE_SIZE + 1)  # one extra row reveals a further page
    )
    object_result = await session.execute(object_query)
    page = list(object_result.scalars().all())

    has_more = len(page) > _PULL_OBJECTS_PAGE_SIZE
    page = page[:_PULL_OBJECTS_PAGE_SIZE]  # drop the probe row, if any

    fetches = [_to_object_response(found) for found in page]
    missing_objects = list(await asyncio.gather(*fetches))

    next_cursor = None
    if has_more and page:
        next_cursor = page[-1].object_id

    logger.info(
        "✅ Pull delta: %d commits, %d objects (has_more=%s) for repo=%s branch=%s",
        len(missing_commits),
        len(missing_objects),
        has_more,
        repo_id,
        branch,
    )
    return PullResponse(
        commits=missing_commits,
        objects=missing_objects,
        remote_head=remote_head,
        has_more=has_more,
        next_cursor=next_cursor,
    )
386
387
388 # ---------------------------------------------------------------------------
389 # Private helpers
390 # ---------------------------------------------------------------------------
391
392
async def _get_branch(
    session: AsyncSession, *, repo_id: str, branch: str
) -> MusehubBranch | None:
    """Look up the branch row named ``branch`` in ``repo_id``; ``None`` if absent."""
    query = (
        select(MusehubBranch)
        .where(MusehubBranch.repo_id == repo_id)
        .where(MusehubBranch.name == branch)
    )
    outcome = await session.execute(query)
    return outcome.scalar_one_or_none()
401
402
async def _get_or_create_branch(
    session: AsyncSession, *, repo_id: str, branch: str
) -> MusehubBranch:
    """Return the branch row for ``branch``, inserting and flushing it when missing.

    A newly created row gets its ``branch_id`` from ``compute_branch_id`` and
    is flushed before it is returned.
    """
    branch_row = await _get_branch(session, repo_id=repo_id, branch=branch)
    if branch_row is None:
        branch_row = MusehubBranch(
            branch_id=compute_branch_id(repo_id, branch),
            repo_id=repo_id,
            name=branch,
        )
        session.add(branch_row)
        await session.flush()
        logger.info("✅ Created branch '%s' for repo=%s", branch, repo_id)
    return branch_row
414
415
async def _write_object(
    session: AsyncSession,
    *,
    repo_id: str,
    obj: ObjectInput,
) -> None:
    """Store a pushed object and link it to ``repo_id``.

    If no ``MusehubObject`` row with ``obj.object_id`` exists yet, the base64
    payload is decoded, written to the blob store and a metadata row is added.
    If a row already exists (the blob was stored earlier, e.g. through another
    repo), the upload and the metadata insert are skipped so the existing row
    is not inserted a second time.

    In both cases a ``MusehubObjectRef`` for ``repo_id`` is added; the caller
    is responsible for not calling this for objects the repo already
    references.

    Args:
        session: Active async SQLAlchemy session; nothing is flushed here.
        repo_id: Repository that should reference the object.
        obj: Pushed object (ID, path and base64-encoded content).
    """
    from musehub.storage.backends import get_backend

    # NOTE(review): the decoded payload is not checked against obj.object_id —
    # confirm whether IDs are verified upstream.
    already_stored = await session.get(MusehubObject, obj.object_id)
    if already_stored is None:
        raw = base64.b64decode(obj.content_b64)
        uri = await get_backend().put(obj.object_id, raw)
        session.add(
            MusehubObject(
                object_id=obj.object_id,
                path=obj.path,
                size_bytes=len(raw),
                storage_uri=uri,
            )
        )
        logger.info(
            "✅ Stored object %s (%d bytes) for repo=%s",
            obj.object_id,
            len(raw),
            repo_id,
        )
    else:
        logger.info(
            "✅ Linked existing object %s to repo=%s",
            obj.object_id,
            repo_id,
        )

    session.add(MusehubObjectRef(repo_id=repo_id, object_id=obj.object_id))
441
442
async def commit_files_to_repo(
    session: AsyncSession,
    *,
    repo_id: str,
    branch: str,
    files: dict[str, bytes],
    message: str,
    author: str,
    delete_paths: list[str] | None = None,
) -> str:
    """Write files to a repo as a single commit and return the new commit_id.

    Creates MusehubObject rows (plus per-repo refs), a snapshot, a
    MusehubCommit, and advances (or creates) the branch head. Used for
    server-side commits that originate inside MuseHub rather than from a
    client push.

    Args:
        session: Active async SQLAlchemy session (flushed, not committed here).
        repo_id: Target repo ID — must already exist.
        branch: Branch name to commit to (created if absent).
        files: ``{path: raw_bytes}`` mapping — paths to add/update.
        message: Commit message.
        author: Identity handle or ID of the author.
        delete_paths: Paths to remove from the HEAD manifest before committing.
            Only when this is non-empty is the current HEAD snapshot read; the
            paths are excluded and the remaining manifest is merged with
            ``files``. Otherwise the new manifest consists of ``files`` alone.

    Returns:
        The new commit_id as produced by ``blob_id``.

    Raises:
        ValueError: If ``repo_id`` does not exist.
    """
    from musehub.muse_cli.snapshot import compute_snapshot_id
    from musehub.services.musehub_snapshot import upsert_snapshot_entries
    from musehub.storage.backends import get_backend

    repo_row = await session.get(MusehubRepo, repo_id)
    if repo_row is None:
        raise ValueError(f"commit_files_to_repo: repo not found: {repo_id}")
    backend = get_backend()

    # 1a. Build base manifest from HEAD when deletions are requested.
    base_manifest: dict[str, str] = {}
    if delete_paths:
        branch_row_pre = await _get_or_create_branch(session, repo_id=repo_id, branch=branch)
        if branch_row_pre.head_commit_id:
            head_commit = await session.get(MusehubCommit, branch_row_pre.head_commit_id)
            if head_commit and head_commit.snapshot_id:
                import msgpack as _mp

                head_snap = await session.get(MusehubSnapshot, head_commit.snapshot_id)
                if head_snap:
                    base_manifest = _mp.unpackb(head_snap.manifest_blob, raw=False)
        for p in delete_paths:
            base_manifest.pop(p, None)

    # 1b. Store each new/updated file as an object and link it to this repo.
    manifest: dict[str, str] = {**base_manifest}
    oid_by_path: dict[str, str] = {path: blob_id(content) for path, content in files.items()}

    # Objects this repo already references; those need no new ref row.
    linked_oids: set[str] = set()
    if oid_by_path:
        ref_stmt = select(MusehubObjectRef.object_id).where(
            MusehubObjectRef.repo_id == repo_id,
            MusehubObjectRef.object_id.in_(list(set(oid_by_path.values()))),
        )
        linked_oids = set((await session.execute(ref_stmt)).scalars().all())

    # Identical content under several paths maps to one object; stage it once.
    staged_oids: set[str] = set()
    for path, content in files.items():
        oid = oid_by_path[path]
        if oid not in staged_oids:
            staged_oids.add(oid)
            existing = await session.get(MusehubObject, oid)
            if existing is None:
                uri = await backend.put(oid, content)
                session.add(MusehubObject(
                    object_id=oid,
                    path=path,
                    size_bytes=len(content),
                    storage_uri=uri,
                ))
            # The ref is independent of whether the blob row was just created:
            # a blob stored earlier still has to be linked to this repo.
            if oid not in linked_oids:
                session.add(MusehubObjectRef(repo_id=repo_id, object_id=oid))
        manifest[path] = oid

    # 2. Upsert snapshot
    snap_id = compute_snapshot_id(manifest)
    await upsert_snapshot_entries(session, repo_id, snap_id, manifest)

    # 3. Resolve parent commit from existing branch head
    now = datetime.datetime.now(datetime.timezone.utc)
    branch_row = await _get_or_create_branch(session, repo_id=repo_id, branch=branch)
    parent_ids: list[str] = []
    if branch_row.head_commit_id:
        parent_ids = [branch_row.head_commit_id]

    # 4. Derive the commit_id from repo, branch, snapshot, message and the
    #    current timestamp. Because the timestamp is part of the seed, two
    #    calls with identical content still produce different commit IDs.
    commit_seed = f"{repo_id}\x00{branch}\x00{snap_id}\x00{message}\x00{now.isoformat()}"
    commit_id = blob_id(commit_seed.encode())

    session.add(MusehubCommit(
        commit_id=commit_id,
        branch=branch,
        parent_ids=parent_ids,
        message=message,
        author=author,
        timestamp=now,
        snapshot_id=snap_id,
    ))
    session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id))

    # 5. Advance branch head
    branch_row.head_commit_id = commit_id
    await session.flush()

    logger.info(
        "✅ commit_files_to_repo: repo=%s branch=%s commit=%s files=%d",
        repo_id, branch, commit_id[:20], len(files),
    )
    return commit_id
File History 2 commits
sha256:f3995ec2c05c9c34b0e4d6e96349a811d0117a1c51d78096d757998ccb3c0520 fix: blobs only in S3/mpack — remove commit/snapshot indivi… Sonnet 4.6 patch 16 days ago
sha256:e597c0b97ade9c3c52ac4735ceb437ee69d1b6f0db61b8d7caa6467c5866566d feat(phase2): write commit objects to S3 at all 5 write sit… Sonnet 4.6 patch 19 days ago