musehub_sync.py
python
sha256:f3995ec2c05c9c34b0e4d6e96349a811d0117a1c51d78096d757998ccb3c0520
fix: blobs only in S3/mpack — remove commit/snapshot indivi…
Sonnet 4.6
patch
16 days ago
| 1 | """MuseHub sync service — push and pull protocol implementation. |
| 2 | |
| 3 | Implements the two core data-movement operations: |
| 4 | - ``ingest_push``: stores commits and objects from a client push, enforcing |
| 5 | fast-forward semantics and updating the branch head. |
| 6 | - ``compute_pull_delta``: returns commits and objects the client does not yet |
| 7 | have, keyed by their ``have_commits`` / ``have_objects`` exclusion lists. |
| 8 | |
| 9 | Object content is written to the blob store (R2/MinIO) and metadata |
| 10 | (path, size, storage_uri) is persisted to Postgres. |
| 11 | |
| 12 | Boundary rules (same as musehub_repository): |
| 13 | - Must NOT import state stores, SSE queues, or LLM clients. |
| 14 | - May import ORM models from musehub.db domain-specific modules. |
| 15 | - May import Pydantic models from musehub.models.musehub. |
| 16 | """ |
| 17 | |
| 18 | import base64 |
| 19 | import datetime |
| 20 | import hashlib |
| 21 | import logging |
| 22 | |
| 23 | from sqlalchemy import select |
| 24 | from sqlalchemy.ext.asyncio import AsyncSession |
| 25 | |
| 26 | from muse.core.types import blob_id |
| 27 | from musehub.core.genesis import compute_branch_id |
| 28 | from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitRef, MusehubObject, MusehubObjectRef, MusehubRepo, MusehubSnapshot |
| 29 | from musehub.models.musehub import ( |
| 30 | CommitInput, |
| 31 | CommitResponse, |
| 32 | ObjectInput, |
| 33 | ObjectResponse, |
| 34 | PullResponse, |
| 35 | PushResponse, |
| 36 | SnapshotInput, |
| 37 | ) |
| 38 | |
| 39 | logger = logging.getLogger(__name__) |
| 40 | |
| 41 | |
| 42 | # --------------------------------------------------------------------------- |
| 43 | # Internal helpers |
| 44 | # --------------------------------------------------------------------------- |
| 45 | |
| 46 | |
def _to_commit_response(row: MusehubCommit) -> CommitResponse:
    """Translate a stored commit row into the ``CommitResponse`` wire model.

    All fields are copied across unchanged, except that a missing or empty
    ``parent_ids`` value is normalised to a fresh empty list.
    """
    parents = list(row.parent_ids) if row.parent_ids else []
    return CommitResponse(
        commit_id=row.commit_id,
        branch=row.branch,
        parent_ids=parents,
        message=row.message,
        author=row.author,
        timestamp=row.timestamp,
        snapshot_id=row.snapshot_id,
    )
| 57 | |
| 58 | |
async def _to_object_response(row: MusehubObject) -> ObjectResponse:
    """Build an ``ObjectResponse`` whose content is read from the blob store.

    The bytes are looked up by ``row.object_id`` and returned base64-encoded.
    If the store has no entry for the object, a warning is logged and the
    response carries empty content instead of failing the whole pull.
    """
    from musehub.storage.backends import get_backend

    backend = get_backend()
    payload = await backend.get(row.object_id)
    if payload is None:
        logger.warning("⚠️ Object missing from blob store: object_id=%s", row.object_id)
        payload = b""

    encoded = base64.b64encode(payload).decode()
    return ObjectResponse(
        object_id=row.object_id,
        path=row.path,
        content_b64=encoded,
    )
| 71 | |
| 72 | |
| 73 | |
| 74 | def _is_fast_forward( |
| 75 | remote_head: str | None, |
| 76 | head_commit_id: str, |
| 77 | commits: list[CommitInput], |
| 78 | ) -> bool: |
| 79 | """Return True if the push is a fast-forward update. |
| 80 | |
| 81 | A push is fast-forward when: |
| 82 | - the remote branch has no head yet (first push), or |
| 83 | - the new head_commit_id equals the remote head (no-op), or |
| 84 | - the remote head appears somewhere in the ancestry graph of the pushed |
| 85 | commits (meaning the client built on top of the remote head). |
| 86 | |
| 87 | We build a local graph from the pushed commits and walk parents. This |
| 88 | does NOT query the DB for previously stored commits — for MVP the client |
| 89 | is expected to include all commits since the common ancestor. |
| 90 | """ |
| 91 | if remote_head is None: |
| 92 | return True |
| 93 | if head_commit_id == remote_head: |
| 94 | return True |
| 95 | |
| 96 | from muse.core.graph import walk_dag |
| 97 | parent_map = {c.commit_id: c.parent_ids for c in commits} |
| 98 | for cid in walk_dag(head_commit_id, lambda cid: parent_map.get(cid, [])): |
| 99 | if cid == remote_head: |
| 100 | return True |
| 101 | return False |
| 102 | |
| 103 | |
| 104 | # --------------------------------------------------------------------------- |
| 105 | # Public API |
| 106 | # --------------------------------------------------------------------------- |
| 107 | |
| 108 | |
async def ingest_push(
    session: AsyncSession,
    *,
    repo_id: str,
    branch: str,
    head_commit_id: str,
    commits: list[CommitInput],
    snapshots: list[SnapshotInput] | None = None,
    objects: list[ObjectInput],
    force: bool,
    author: str,
) -> PushResponse:
    """Store commits, snapshot manifests, and objects from a push; update the branch head.

    Execution steps (in order):

    1. **Resolve / create branch** — the branch row is fetched, or created on
       first push (its head is then still unset).
    2. **Fast-forward check** — delegated to ``_is_fast_forward``, which walks
       the ``parent_ids`` of the pushed commits. Rejected with
       ``ValueError("non_fast_forward")`` when the current head is not an
       ancestor of the new head and ``force`` is False.
    3. **Store commits** — every parent referenced by a commit that is new to
       this repo must be in the push or already referenced by the repo.
       Commit rows are inserted only when absent from the shared commit
       table, and a per-repo ref is added only when the repo does not
       reference the commit yet.
    4. **Upsert snapshots** — each :class:`SnapshotInput` with a non-empty
       ``snapshot_id`` is handed to ``upsert_snapshot_entries``.
    5. **Store objects** — objects not yet referenced by this repo are written
       via ``_write_object``.
    6. **Update branch head** — the branch row's ``head_commit_id`` is set to
       ``head_commit_id`` and the session is flushed (not committed).

    Args:
        session: Active async SQLAlchemy session.
        repo_id: Repository ID the push targets.
        branch: Target branch name (created on first push).
        head_commit_id: Commit ID of the new branch tip after the push.
        commits: List of :class:`CommitInput` objects to store.
        snapshots: Optional list of :class:`SnapshotInput` objects; ``None``
            or ``[]`` are both treated as "no snapshots in this push".
        objects: Blob payloads to store.
        force: When ``True``, skip the fast-forward check (destructive).
        author: Fallback author recorded on commits that carry no author.

    Returns:
        :class:`PushResponse` with ``ok=True`` and ``remote_head`` set to the
        new branch head commit ID.

    Raises:
        ValueError: ``"non_fast_forward"`` when the update is not a
            fast-forward and ``force`` is False, or
            ``"missing_parent_commits: ..."`` when a new commit references a
            parent that is neither in the push nor known to the repo.
    """
    # ------------------------------------------------------------------
    # 1. Resolve (or create) the branch
    # ------------------------------------------------------------------
    branch_row = await _get_or_create_branch(session, repo_id=repo_id, branch=branch)

    # ------------------------------------------------------------------
    # 2. Fast-forward check
    # ------------------------------------------------------------------
    if not force and not _is_fast_forward(branch_row.head_commit_id, head_commit_id, commits):
        logger.warning(
            "⚠️ Non-fast-forward push rejected for repo=%s branch=%s remote_head=%s new_head=%s",
            repo_id,
            branch,
            branch_row.head_commit_id,
            head_commit_id,
        )
        raise ValueError("non_fast_forward")

    # ------------------------------------------------------------------
    # 3. Store commits
    # ------------------------------------------------------------------
    pushed_commit_ids = [c.commit_id for c in commits]
    # Commits this repo already references (per-repo ref table).
    existing_commit_ids: set[str] = set()
    # Commits whose row already exists in the shared commit table. That table
    # is keyed by commit_id alone, so the row may have been stored on behalf
    # of a different repo.
    stored_commit_ids: set[str] = set()
    if commits:
        ref_stmt = select(MusehubCommitRef.commit_id).where(
            MusehubCommitRef.repo_id == repo_id,
            MusehubCommitRef.commit_id.in_(pushed_commit_ids),
        )
        existing_commit_ids = set((await session.execute(ref_stmt)).scalars().all())

        row_stmt = select(MusehubCommit.commit_id).where(
            MusehubCommit.commit_id.in_(pushed_commit_ids),
        )
        stored_commit_ids = set((await session.execute(row_stmt)).scalars().all())

    # ------------------------------------------------------------------
    # 3a. Parent existence validation (Phase 8 / invariant 8 parity)
    #
    # Every parent_id referenced by an incoming commit must either:
    #   (a) be in this push mpack itself, or
    #   (b) already exist in the DB for this repo.
    #
    # Without this guard, a client can push a commit whose parent is a
    # fabricated or missing ID, leaving history that cannot be walked.
    #
    # Mirrors the same validation in wire_push_unpack_mpack() so the ingest
    # path is equally safe.
    # ------------------------------------------------------------------
    mpack_commit_ids: set[str] = set(pushed_commit_ids)
    external_parent_ids: set[str] = {
        pid
        for c in commits
        if c.commit_id not in existing_commit_ids
        for pid in (c.parent_ids or [])
        if pid and pid not in mpack_commit_ids
    }

    if external_parent_ids:
        db_parent_ids: set[str] = set(
            (await session.execute(
                select(MusehubCommitRef.commit_id).where(
                    MusehubCommitRef.commit_id.in_(external_parent_ids),
                    MusehubCommitRef.repo_id == repo_id,
                )
            )).scalars().all()
        )
        missing_parents = external_parent_ids - db_parent_ids
        if missing_parents:
            short = ", ".join(p[:16] for p in sorted(missing_parents))
            raise ValueError(f"missing_parent_commits: {short}")

    new_commits: list[MusehubCommit] = []
    new_commit_refs: list[MusehubCommitRef] = []
    # Start from what the DB already holds and grow both sets as rows are
    # queued, so an ID repeated inside the push is only inserted once.
    linked_commit_ids: set[str] = set(existing_commit_ids)
    for c in commits:
        if c.commit_id not in stored_commit_ids:
            stored_commit_ids.add(c.commit_id)
            new_commits.append(
                MusehubCommit(
                    commit_id=c.commit_id,
                    branch=branch,
                    parent_ids=c.parent_ids,
                    message=c.message,
                    author=c.author if c.author else author,
                    timestamp=c.timestamp,
                    snapshot_id=c.snapshot_id,
                )
            )
        # Only link commits the repo does not reference yet; re-adding a ref
        # for an already-linked commit would duplicate the ref row.
        if c.commit_id not in linked_commit_ids:
            linked_commit_ids.add(c.commit_id)
            new_commit_refs.append(MusehubCommitRef(repo_id=repo_id, commit_id=c.commit_id))
    if new_commits:
        session.add_all(new_commits)
        logger.info("✅ Ingested %d new commits for repo=%s", len(new_commits), repo_id)
    if new_commit_refs:
        session.add_all(new_commit_refs)

    # ------------------------------------------------------------------
    # 4. Upsert snapshots
    # ------------------------------------------------------------------
    from musehub.services.musehub_snapshot import upsert_snapshot_entries

    stored_snapshots = 0
    for s in snapshots or []:
        if not s.snapshot_id:
            continue
        # A non-dict manifest is stored as an empty manifest.
        manifest = s.manifest if isinstance(s.manifest, dict) else {}
        await upsert_snapshot_entries(session, repo_id, s.snapshot_id, manifest)
        stored_snapshots += 1
    if stored_snapshots:
        logger.info("✅ Ingested %d snapshots for repo=%s", stored_snapshots, repo_id)

    # ------------------------------------------------------------------
    # 5. Store objects (bytes to the blob store, metadata to the DB)
    # ------------------------------------------------------------------
    existing_object_ids: set[str] = set()
    if objects:
        stmt_obj = select(MusehubObjectRef.object_id).where(
            MusehubObjectRef.repo_id == repo_id,
            MusehubObjectRef.object_id.in_([o.object_id for o in objects]),
        )
        res_obj = await session.execute(stmt_obj)
        existing_object_ids = set(res_obj.scalars().all())

    for obj in objects:
        if obj.object_id in existing_object_ids:
            continue
        await _write_object(
            session,
            repo_id=repo_id,
            obj=obj,
        )
        # Remember the ID so a duplicate entry in this push is not written twice.
        existing_object_ids.add(obj.object_id)

    # ------------------------------------------------------------------
    # 6. Update branch head
    # ------------------------------------------------------------------
    branch_row.head_commit_id = head_commit_id
    logger.info(
        "✅ Branch '%s' head updated to %s for repo=%s",
        branch,
        head_commit_id,
        repo_id,
    )

    await session.flush()
    return PushResponse(ok=True, remote_head=head_commit_id)
| 300 | |
| 301 | |
_PULL_OBJECTS_PAGE_SIZE: int = 500  # max objects returned per pull response


async def compute_pull_delta(
    session: AsyncSession,
    *,
    repo_id: str,
    branch: str,
    have_commits: list[str],
    have_objects: list[str],
    cursor: str | None = None,
) -> PullResponse:
    """Compute what a client is missing: commits for ``branch`` and repo objects.

    Commits are returned in full on every call (they are not paginated);
    anything listed in ``have_commits`` is excluded.

    Objects are keyset-paginated by ``object_id`` with at most
    ``_PULL_OBJECTS_PAGE_SIZE`` items per response, so a single pull never
    returns an unbounded payload:

    - first call: ``cursor=None``;
    - while ``has_more`` is True, call again with ``cursor=next_cursor``;
    - ``next_cursor`` is the ``object_id`` of the last object on the page, and
      the following page contains only IDs strictly greater than it.

    ``remote_head`` is the branch's current head, or ``None`` when the branch
    does not exist.
    """
    import asyncio

    branch_row = await _get_branch(session, repo_id=repo_id, branch=branch)
    remote_head = None if not branch_row else branch_row.head_commit_id

    # ------------------------------------------------------------------
    # Commits the client lacks (single, unpaginated query)
    # ------------------------------------------------------------------
    commit_filters = [MusehubCommitRef.repo_id == repo_id, MusehubCommit.branch == branch]
    if have_commits:
        commit_filters.append(MusehubCommit.commit_id.notin_(have_commits))
    commit_query = (
        select(MusehubCommit)
        .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(*commit_filters)
    )
    commit_result = await session.execute(commit_query)
    missing_commits = [_to_commit_response(c) for c in commit_result.scalars().all()]

    # ------------------------------------------------------------------
    # Objects the client lacks — one keyset page
    # ------------------------------------------------------------------
    object_filters = [MusehubObjectRef.repo_id == repo_id]
    if have_objects:
        object_filters.append(MusehubObject.object_id.notin_(have_objects))
    if cursor:
        # Keyset resume: only IDs strictly after the last one already sent.
        object_filters.append(MusehubObject.object_id > cursor)
    object_query = (
        select(MusehubObject)
        .join(MusehubObjectRef, MusehubObject.object_id == MusehubObjectRef.object_id)
        .where(*object_filters)
        .order_by(MusehubObject.object_id)
        # One row beyond the page size reveals whether another page exists.
        .limit(_PULL_OBJECTS_PAGE_SIZE + 1)
    )
    fetched = list((await session.execute(object_query)).scalars().all())
    has_more = len(fetched) > _PULL_OBJECTS_PAGE_SIZE
    page = fetched[:_PULL_OBJECTS_PAGE_SIZE]

    # Fetch all blobs of the page concurrently from the blob store.
    missing_objects = list(await asyncio.gather(*(_to_object_response(o) for o in page)))
    next_cursor = page[-1].object_id if has_more else None

    logger.info(
        "✅ Pull delta: %d commits, %d objects (has_more=%s) for repo=%s branch=%s",
        len(missing_commits),
        len(missing_objects),
        has_more,
        repo_id,
        branch,
    )
    return PullResponse(
        commits=missing_commits,
        objects=missing_objects,
        remote_head=remote_head,
        has_more=has_more,
        next_cursor=next_cursor,
    )
| 386 | |
| 387 | |
| 388 | # --------------------------------------------------------------------------- |
| 389 | # Private helpers |
| 390 | # --------------------------------------------------------------------------- |
| 391 | |
| 392 | |
async def _get_branch(
    session: AsyncSession, *, repo_id: str, branch: str
) -> MusehubBranch | None:
    """Fetch the branch row named ``branch`` in ``repo_id``, or ``None`` if absent.

    Uses ``scalar_one_or_none``, so more than one matching row raises.
    """
    lookup = (
        select(MusehubBranch)
        .where(MusehubBranch.repo_id == repo_id)
        .where(MusehubBranch.name == branch)
    )
    result = await session.execute(lookup)
    return result.scalar_one_or_none()
| 401 | |
| 402 | |
async def _get_or_create_branch(
    session: AsyncSession, *, repo_id: str, branch: str
) -> MusehubBranch:
    """Return the branch row for ``branch`` in ``repo_id``, creating it if needed.

    A newly created row gets its ``branch_id`` from ``compute_branch_id`` and
    is flushed to the session (not committed) before being returned.
    """
    branch_row = await _get_branch(session, repo_id=repo_id, branch=branch)
    if branch_row is None:
        branch_row = MusehubBranch(
            branch_id=compute_branch_id(repo_id, branch),
            repo_id=repo_id,
            name=branch,
        )
        session.add(branch_row)
        await session.flush()
        logger.info("✅ Created branch '%s' for repo=%s", branch, repo_id)
    return branch_row
| 414 | |
| 415 | |
async def _write_object(
    session: AsyncSession,
    *,
    repo_id: str,
    obj: ObjectInput,
) -> None:
    """Store a pushed object and link it to ``repo_id``.

    Object metadata rows are keyed by ``object_id`` alone and shared between
    repos, while ``MusehubObjectRef`` records which repo uses which object.
    So:

    - if no metadata row exists yet, the base64 payload is decoded, written to
      the blob store and a new ``MusehubObject`` row is added;
    - if the row already exists (e.g. another repo pushed the same blob), the
      blob and row are reused instead of inserting a conflicting duplicate.

    In both cases a ``MusehubObjectRef`` for ``repo_id`` is added; the caller
    is responsible for not calling this for objects the repo already
    references.

    Args:
        session: Active async SQLAlchemy session (rows are added, not flushed).
        repo_id: Repository the object is being pushed to.
        obj: Pushed object carrying ``object_id``, ``path`` and ``content_b64``.
    """
    from musehub.storage.backends import get_backend

    already_stored = await session.get(MusehubObject, obj.object_id)
    if already_stored is None:
        # NOTE(review): the client-supplied object_id is trusted as-is; it is
        # not checked against a hash of the decoded content — confirm whether
        # the route layer verifies this before calling in.
        raw = base64.b64decode(obj.content_b64)
        uri = await get_backend().put(obj.object_id, raw)
        session.add(
            MusehubObject(
                object_id=obj.object_id,
                path=obj.path,
                size_bytes=len(raw),
                storage_uri=uri,
            )
        )
        logger.info(
            "✅ Stored object %s (%d bytes) for repo=%s",
            obj.object_id,
            len(raw),
            repo_id,
        )
    else:
        logger.info(
            "✅ Linked existing object %s for repo=%s",
            obj.object_id,
            repo_id,
        )

    session.add(MusehubObjectRef(repo_id=repo_id, object_id=obj.object_id))
| 441 | |
| 442 | |
async def commit_files_to_repo(
    session: AsyncSession,
    *,
    repo_id: str,
    branch: str,
    files: dict[str, bytes],
    message: str,
    author: str,
    delete_paths: list[str] | None = None,
) -> str:
    """Write files to a repo as a single commit and return the new commit_id.

    Creates MusehubObject rows (plus per-repo refs), a snapshot, a
    MusehubCommit, and advances (or creates) the branch head. Used for
    server-side commits that originate inside MuseHub rather than from a
    client push.

    NOTE: the HEAD manifest is only used as a base when ``delete_paths`` is
    non-empty. Without it the new snapshot contains exactly ``files``.

    Args:
        session: Active async SQLAlchemy session (caller owns transaction;
            this function flushes but does not commit).
        repo_id: Target repo ID — must already exist.
        branch: Branch name to commit to (created if absent).
        files: ``{path: raw_bytes}`` mapping — paths to add/update.
        message: Commit message.
        author: Identity handle or ID of the author.
        delete_paths: Paths to remove from the HEAD manifest before committing.
            The current HEAD snapshot is read, the paths are excluded,
            and the resulting manifest is merged with ``files``.

    Returns:
        The new commit_id, as produced by ``blob_id``.

    Raises:
        ValueError: if ``repo_id`` does not exist.
    """
    from musehub.muse_cli.snapshot import compute_snapshot_id
    from musehub.services.musehub_snapshot import upsert_snapshot_entries
    from musehub.storage.backends import get_backend

    repo_row = await session.get(MusehubRepo, repo_id)
    if repo_row is None:
        raise ValueError(f"commit_files_to_repo: repo not found: {repo_id}")
    backend = get_backend()

    # 1a. Build base manifest from HEAD when deletions are requested.
    base_manifest: dict[str, str] = {}
    if delete_paths:
        branch_row_pre = await _get_or_create_branch(session, repo_id=repo_id, branch=branch)
        if branch_row_pre.head_commit_id:
            head_commit = await session.get(MusehubCommit, branch_row_pre.head_commit_id)
            if head_commit and head_commit.snapshot_id:
                import msgpack as _mp

                head_snap = await session.get(MusehubSnapshot, head_commit.snapshot_id)
                if head_snap:
                    base_manifest = _mp.unpackb(head_snap.manifest_blob, raw=False)
                    for p in delete_paths:
                        base_manifest.pop(p, None)

    # 1b. Write each new/updated file as an object.
    manifest: dict[str, str] = dict(base_manifest)
    path_to_oid: dict[str, str] = {path: blob_id(content) for path, content in files.items()}

    # Objects this repo already references; refs for those must not be re-added.
    linked_oids: set[str] = set()
    if path_to_oid:
        ref_stmt = select(MusehubObjectRef.object_id).where(
            MusehubObjectRef.repo_id == repo_id,
            MusehubObjectRef.object_id.in_(list(set(path_to_oid.values()))),
        )
        linked_oids = set((await session.execute(ref_stmt)).scalars().all())

    # Object IDs already handled in this call: two paths with identical
    # content share one object and must not queue the row twice.
    handled_oids: set[str] = set()
    for path, content in files.items():
        oid = path_to_oid[path]
        if oid not in handled_oids:
            handled_oids.add(oid)
            # The object table is shared across repos: only store the blob
            # and its metadata row when nobody has stored it before.
            if await session.get(MusehubObject, oid) is None:
                uri = await backend.put(oid, content)
                session.add(MusehubObject(
                    object_id=oid,
                    path=path,
                    size_bytes=len(content),
                    storage_uri=uri,
                ))
        # Link the object to this repo even when the blob already existed
        # globally; otherwise a pull for this repo would never return it.
        if oid not in linked_oids:
            linked_oids.add(oid)
            session.add(MusehubObjectRef(repo_id=repo_id, object_id=oid))
        manifest[path] = oid

    # 2. Upsert snapshot
    snap_id = compute_snapshot_id(manifest)
    await upsert_snapshot_entries(session, repo_id, snap_id, manifest)

    # 3. Resolve parent commit from existing branch head
    now = datetime.datetime.now(datetime.timezone.utc)
    branch_row = await _get_or_create_branch(session, repo_id=repo_id, branch=branch)
    parent_ids: list[str] = []
    if branch_row.head_commit_id:
        parent_ids = [branch_row.head_commit_id]

    # 4. Derive the commit_id by hashing repo, branch, snapshot, message and
    #    the commit timestamp. Because the timestamp is part of the seed, two
    #    otherwise identical commits get different IDs.
    commit_seed = f"{repo_id}\x00{branch}\x00{snap_id}\x00{message}\x00{now.isoformat()}"
    commit_id = blob_id(commit_seed.encode())

    session.add(MusehubCommit(
        commit_id=commit_id,
        branch=branch,
        parent_ids=parent_ids,
        message=message,
        author=author,
        timestamp=now,
        snapshot_id=snap_id,
    ))
    session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id))

    # 5. Advance branch head
    branch_row.head_commit_id = commit_id
    await session.flush()

    logger.info(
        "✅ commit_files_to_repo: repo=%s branch=%s commit=%s files=%d",
        repo_id, branch, commit_id[:20], len(files),
    )
    return commit_id
File History
2 commits
sha256:f3995ec2c05c9c34b0e4d6e96349a811d0117a1c51d78096d757998ccb3c0520
fix: blobs only in S3/mpack — remove commit/snapshot indivi…
Sonnet 4.6
patch
16 days ago
sha256:e597c0b97ade9c3c52ac4735ceb437ee69d1b6f0db61b8d7caa6467c5866566d
feat(phase2): write commit objects to S3 at all 5 write sit…
Sonnet 4.6
patch
19 days ago