gabriel / musehub public
wire.py python
937 lines 35.7 KB
Raw
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 2 days ago
"""Wire protocol endpoints — Muse CLI push/fetch transport.

URL pattern mirrors Git's Smart HTTP protocol:

    muse remote add origin https://musehub.ai/gabriel/muse

Active endpoints:

    GET    /{owner}/{slug}/refs                — branch heads + domain metadata (pre-flight)
    POST   /{owner}/{slug}/push/mpack-presign  — get presigned PUT URL for whole mpack
    POST   /{owner}/{slug}/push/unpack-mpack   — server indexes mpack from R2
    POST   /{owner}/{slug}/fetch               — compute fetch delta, return mpack URL
    POST   /{owner}/{slug}/fetch/mpack         — server builds fetch mpack, returns presigned GET URL
    POST   /{owner}/{slug}/fetch/presign       — presigned GET URLs for large fetches
    POST   /{owner}/{slug}/fetch/objects       — raw content for a list of object IDs
    POST   /{owner}/{slug}/repair-object       — replace a corrupt stored object (owner/write)
    POST   /{owner}/{slug}/repair-snapshot     — replace a corrupt snapshot manifest (owner/write)
    POST   /{owner}/{slug}/releases            — push a release
    DELETE /{owner}/{slug}/releases/{tag}      — retract a release
    DELETE /{owner}/{slug}/branches/{branch}   — delete a branch
    POST   /{owner}/{slug}/tags                — push lightweight wire tags
    GET    /o/{object_id}                      — content-addressed object CDN

These routes MUST be registered before the wildcard UI router in main.py
(``/{owner}/{repo_slug}/...``) so FastAPI matches the concrete third-segment
paths first.
"""
19
20 import json
21 import logging
22
23 import msgpack
24 import pydantic
25 from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, status
26 from fastapi.responses import Response
27 from sqlalchemy.ext.asyncio import AsyncSession
28
29 from sqlalchemy import select
30 from musehub.auth.dependencies import optional_token, require_valid_token, TokenClaims
31 from musehub.config import get_settings
32 from musehub.db.database import get_db as get_session
33 from musehub.db.musehub_abuse_models import MusehubDailyPushBytes
34 from musehub.db.musehub_collaborator_models import MusehubCollaborator
35 from musehub.db.musehub_repo_models import MusehubBranch, MusehubObject, MusehubRepo
36 from musehub.models.wire import WireFetchRequest
37 from musehub.types.json_types import JSONObject
38
39 from musehub.api.validation import BranchParam, SlugParam
40 from musehub.rate_limits import limiter, WIRE_PUSH_LIMIT, WIRE_FETCH_LIMIT, OBJECT_LIMIT, REPAIR_LIMIT
41 from musehub.services.musehub_repository import get_repo_row_by_owner_slug
42 from musehub.services.musehub_wire import (
43 FetchCommitNotFound,
44 FetchNotIndexedError,
45 FetchNotReady,
46 MPackValidationError,
47 ObjectHashMismatch,
48 record_mpack_bytes_uploaded,
49 wire_fetch,
50 wire_fetch_mpack,
51 wire_fetch_objects,
52 wire_push_mpack_presign,
53 wire_push_unpack_mpack,
54 wire_refs,
55 wire_repair_object,
56 wire_repair_snapshot,
57 )
58 from musehub.storage import get_backend
59
# Module-level logger used by every wire endpoint below for request
# diagnostics and failure reports.
logger = logging.getLogger(__name__)

# Router carrying all wire endpoints in this module. Per the module docstring
# it must be mounted before the wildcard UI router in main.py.
router = APIRouter(tags=["Wire Protocol"])
63
64 # ── helpers ────────────────────────────────────────────────────────────────────
65
async def _resolve_repo(
    session: AsyncSession,
    owner: SlugParam,
    slug: SlugParam,
) -> MusehubRepo:
    """Look up the repo row addressed by ``owner/slug``.

    Raises:
        HTTPException: 404 when no repo matches the owner/slug pair.
    """
    found = await get_repo_row_by_owner_slug(session, owner, slug)
    if found is not None:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"repo '{owner}/{slug}' not found",
    )
79
async def _resolve_repo_id(
    session: AsyncSession,
    owner: SlugParam,
    slug: SlugParam,
) -> str:
    """Return just the ``repo_id`` for ``owner/slug``, raising 404 if absent.

    No visibility or permission check happens here; the push path performs
    its own authorization downstream.
    """
    repo_row = await _resolve_repo(session, owner, slug)
    return repo_row.repo_id
87
async def _assert_readable(
    repo: MusehubRepo,
    claims: TokenClaims | None,
    session: AsyncSession,
) -> None:
    """Permit a read of *repo* or raise 404.

    Reads are allowed when the repo is public, when the caller owns it, or
    when the caller is a collaborator whose invitation has been accepted.
    Every other caller gets 404 (not 403) so private repos cannot be
    discovered by probing.
    """
    if repo.visibility == "public":
        return

    handle: str | None = claims.handle if claims else None
    if handle == repo.owner:
        return

    if handle:
        # Any accepted collaborator may read; pending invitations do not count.
        membership_query = select(MusehubCollaborator).where(
            MusehubCollaborator.repo_id == repo.repo_id,
            MusehubCollaborator.identity_handle == handle,
            MusehubCollaborator.accepted_at.isnot(None),
        )
        membership = (await session.execute(membership_query)).scalar_one_or_none()
        if membership is not None:
            return

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="repo not found",
    )
117
118 # ── Wire helpers ─────────────────────────────────────────────────────────────
119
def _mpack_response(data: JSONObject, request: Request) -> Response:
    """Serialize *data* according to the request's ``Accept`` header.

    When the header mentions ``application/x-msgpack`` the payload is packed
    with ``use_bin_type=True``, so ``bytes`` values survive as msgpack bin.
    Any other Accept value falls back to JSON.
    """
    wants_msgpack = "application/x-msgpack" in request.headers.get("accept", "")
    if not wants_msgpack:
        # NOTE(review): json.dumps raises TypeError if *data* contains bytes,
        # so the JSON fallback only suits payloads without binary values.
        return Response(content=json.dumps(data), media_type="application/json")
    packed = msgpack.packb(data, use_bin_type=True)
    return Response(content=packed, media_type="application/x-msgpack")
134
def _decode_request_body(raw: bytes, content_type: str) -> JSONObject:
    """Decode an HTTP request body into a mapping.

    A *content_type* naming ``application/x-msgpack`` or
    ``application/x-muse-mpack`` selects msgpack. Anything else is parsed as
    JSON, which is kept as a compatibility fallback. Media types are matched
    case-insensitively, as HTTP requires.

    Args:
        raw: The raw request body.
        content_type: The request's ``Content-Type`` header value (may be empty).

    Returns:
        A plain ``dict`` copy of the decoded top-level mapping.

    Raises:
        ValueError: If the body cannot be decoded, or decodes to something
            other than a mapping. Every decoder failure is normalised to
            ``ValueError`` (chained to the original error) so callers only
            need to catch one exception type.
    """
    media = (content_type or "").lower()
    is_msgpack = "application/x-msgpack" in media or "application/x-muse-mpack" in media
    try:
        decoded = msgpack.unpackb(raw, raw=False) if is_msgpack else json.loads(raw)
    except ValueError:
        raise
    except Exception as exc:
        # Decoders can raise types outside ValueError, e.g. msgpack's
        # UnpackException subclasses or RecursionError on deeply nested JSON.
        raise ValueError(f"request body could not be decoded: {exc}") from exc
    if not isinstance(decoded, dict):
        if is_msgpack:
            raise ValueError("msgpack body must be a mapping")
        raise ValueError("JSON body must be a mapping")
    return dict(decoded)
150
151 # ── wire endpoints ─────────────────────────────────────────────────────────────
152
@router.get(
    "/{owner}/{slug}/refs",
    summary="Get branch heads (muse pull / muse push pre-flight)",
    response_description="Repo metadata and current branch heads",
)
@limiter.limit(WIRE_FETCH_LIMIT)
async def get_refs(
    request: Request,
    owner: SlugParam,
    slug: SlugParam,
    _claims: TokenClaims | None = Depends(optional_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Return branch heads and domain metadata for a repo.

    ``muse push`` and ``muse pull`` call this as a pre-flight to learn what
    the remote already has. It is the counterpart of Git's
    ``GET /owner/repo/info/refs?service=git-upload-pack``.

    Private repos are readable only by their owner and accepted
    collaborators. Any other caller receives 404, which is indistinguishable
    from a missing repo so that private repos are not revealed.

    Response:
    ```json
    {
      "repo_id": "...",
      "domain": "code",
      "default_branch": "main",
      "branch_heads": {"main": "sha...", "dev": "sha..."}
    }
    ```
    """
    repo = await _resolve_repo(session, owner, slug)
    await _assert_readable(repo, _claims, session)
    refs = await wire_refs(session, repo.repo_id)
    if refs is not None:
        return _mpack_response(refs.model_dump(), request)
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="repo not found")
192
@router.post(
    "/{owner}/{slug}/push/mpack-presign",
    summary="Return one presigned PUT URL for a whole mpack",
    status_code=status.HTTP_200_OK,
)
@limiter.limit(OBJECT_LIMIT)
async def push_mpack_presign(
    request: Request,
    owner: SlugParam,
    slug: SlugParam,
    claims: TokenClaims = Depends(require_valid_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Return one presigned PUT URL so the client can upload the entire mpack.

    Request body (msgpack; JSON accepted as a fallback):
        mpack_key   str — ``sha256:<64 hex chars>`` of the mpack bytes
        size_bytes  int — advisory, non-negative byte count

    Response (msgpack or JSON, per ``Accept``):
        upload_url  str — presigned PUT URL valid for 1 hour
        mpack_key   str — echoed back for the client to pass to unpack-mpack

    Errors:
        400 — body is not a decodable msgpack/JSON mapping
        413 — ``size_bytes`` exceeds ``mpack_max_bytes``
        422 — missing/malformed ``mpack_key`` or invalid ``size_bytes``
        429 — the caller has already reached today's upload byte budget
    """
    import datetime as _dt

    from sqlalchemy import func as _func

    hex_digits = frozenset("0123456789abcdefABCDEF")

    raw = await request.body()
    ct = request.headers.get("Content-Type", "")
    try:
        data = _decode_request_body(raw, ct)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Request body must be a valid msgpack or JSON object",
        ) from exc

    mpack_key = str(data.get("mpack_key", "") or "")
    try:
        size_bytes = int(data.get("size_bytes") or 0)
    except (TypeError, ValueError) as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="size_bytes must be a non-negative integer",
        ) from exc
    logger.info(
        "[mpack-presign] received mpack_key=%s size_bytes=%d content_type=%r",
        mpack_key,
        size_bytes,
        ct,
    )
    if not mpack_key:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="mpack-presign requires mpack_key",
        )
    # mpack_key must be "sha256:<64-hex>". Anything else will never match
    # what the client PUT to storage and will fail the integrity check in
    # unpack-mpack. Checking the digits themselves, not just the length, also
    # keeps characters such as "/" out of the key. The key is presumably used
    # to derive the storage object name (TODO confirm in
    # wire_push_mpack_presign).
    digest = mpack_key.removeprefix("sha256:")
    if (
        not mpack_key.startswith("sha256:")
        or len(digest) != 64
        or not hex_digits.issuperset(digest)
    ):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="mpack_key must be 'sha256:<64-hex>'",
        )
    # A negative size would be credited to the caller's daily total below,
    # letting a client wipe out its own upload quota.
    if size_bytes < 0:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="size_bytes must be a non-negative integer",
        )

    settings = get_settings()
    if size_bytes > settings.mpack_max_bytes:
        raise HTTPException(
            status_code=status.HTTP_413_CONTENT_TOO_LARGE,
            detail=(
                f"mpack size {size_bytes:,} bytes exceeds limit "
                f"{settings.mpack_max_bytes:,} bytes"
            ),
        )

    # 4a — per-user daily byte limit. This is a soft limit: the caller is
    # refused only once the total already recorded for today reaches the
    # limit. The size of this upload is not added before comparing.
    daily_limit = settings.mpack_daily_upload_limit_bytes
    if daily_limit > 0:
        today = _dt.date.today()
        total_stmt = select(
            _func.coalesce(_func.sum(MusehubDailyPushBytes.bytes_uploaded), 0)
        ).where(
            MusehubDailyPushBytes.identity_id == claims.identity_id,
            MusehubDailyPushBytes.date == today,
        )
        daily_total = int((await session.execute(total_stmt)).scalar() or 0)
        if daily_total >= daily_limit:
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail=(
                    f"daily upload limit of {daily_limit:,} bytes reached; "
                    "try again tomorrow"
                ),
            )

    # Record the advertised size against today's per-identity total, which
    # the daily-limit check above sums.
    await record_mpack_bytes_uploaded(session, claims.identity_id, size_bytes)
    await session.commit()

    result = await wire_push_mpack_presign(mpack_key, size_bytes)
    return _mpack_response(result, request)
269
270
@router.post(
    "/{owner}/{slug}/push/unpack-mpack",
    summary="Read an mpack from storage, index all contents into PG",
    status_code=status.HTTP_200_OK,
)
@limiter.limit(WIRE_PUSH_LIMIT)
async def push_unpack_mpack(
    request: Request,
    owner: SlugParam,
    slug: SlugParam,
    claims: TokenClaims = Depends(require_valid_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Read a previously uploaded mpack from object storage and index it.

    Request body (msgpack; JSON accepted as a fallback):
        mpack_key      str  — ``sha256:<64 hex>`` used when the client called mpack-presign
        branch         str  — target branch (default ``"main"``)
        head           str  — head commit ID, forwarded as ``head_commit_id``
        commits_count  int  — advisory count, checked against ``mpack_max_commits``
        blobs_count    int  — advisory count, checked against ``mpack_max_objects``
        force          bool — forwarded as ``force`` (presumably permits a
                              non-fast-forward update)

    Response (msgpack or JSON, per ``Accept``):
        commits_written    int
        snapshots_written  int
        blobs_written      int

    Errors:
        400 — body is not a decodable msgpack/JSON mapping
        404 — repo not found
        409 — the service raised ``NonFastForwardError``
        422 — missing/malformed fields, limits exceeded, or the service
              raised ``ValueError``

    After a successful push, follow-up jobs are enqueued on a best-effort
    basis. A failure there is logged and does not fail the push.
    """
    from musehub.services.musehub_wire import NonFastForwardError as _NonFastForwardError

    hex_digits = frozenset("0123456789abcdefABCDEF")

    raw = await request.body()
    ct = request.headers.get("Content-Type", "")
    try:
        data = _decode_request_body(raw, ct)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Request body must be a valid msgpack or JSON object",
        ) from exc

    mpack_key = data.get("mpack_key", "")
    logger.info(
        "[unpack-mpack] received mpack_key=%s raw_body_len=%d content_type=%r",
        mpack_key,
        len(raw),
        ct,
    )
    if not mpack_key:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="unpack-mpack requires mpack_key",
        )
    # Apply the same key format that mpack-presign enforces, so that a client
    # cannot skip presign and hand the service an arbitrary storage key.
    if not isinstance(mpack_key, str) or not mpack_key.startswith("sha256:"):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="mpack_key must be 'sha256:<64-hex>'",
        )
    digest = mpack_key.removeprefix("sha256:")
    if len(digest) != 64 or not hex_digits.issuperset(digest):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="mpack_key must be 'sha256:<64-hex>'",
        )

    branch = str(data.get("branch") or "main")
    head_commit_id = str(data.get("head") or "")
    try:
        commits_count = int(data.get("commits_count") or 0)
        blobs_count = int(data.get("blobs_count") or 0)
    except (TypeError, ValueError) as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="commits_count and blobs_count must be integers",
        ) from exc
    force = bool(data.get("force") or False)

    settings = get_settings()
    if commits_count > settings.mpack_max_commits:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail=f"commits_count {commits_count:,} exceeds limit {settings.mpack_max_commits:,}",
        )
    if blobs_count > settings.mpack_max_objects:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail=f"blobs_count {blobs_count:,} exceeds limit {settings.mpack_max_objects:,}",
        )

    repo_id = await _resolve_repo_id(session, owner, slug)
    try:
        # claims.handle is passed through; write authorization is performed
        # inside the service.
        result = await wire_push_unpack_mpack(
            session, repo_id, mpack_key, claims.handle,
            branch=branch, head_commit_id=head_commit_id,
            commits_count=commits_count, blobs_count=blobs_count,
            force=force,
        )
    except _NonFastForwardError as exc:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=str(exc),
        ) from exc
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail=str(exc),
        ) from exc

    # Enqueue intel + file-last-commits + gc + profile.snapshot after every push.
    # Best effort: a failure here is logged and the push result still returned.
    try:
        from musehub.services.musehub_jobs import enqueue_push_intel as _enqueue_push_intel
        repo_row = await session.get(MusehubRepo, repo_id)
        domain_id = repo_row.domain_id if repo_row else None
        head = result.get("head", head_commit_id)
        await _enqueue_push_intel(
            session,
            repo_id,
            head,
            domain_id=domain_id,
            branch=branch,
            owner=owner,
            mpack_key=mpack_key,
        )
        await session.commit()
    except Exception:
        logger.exception(
            "enqueue_push_intel failed for repo=%s — push succeeded, intel skipped",
            repo_id[:16],
        )

    return _mpack_response(result, request)
363
364
@router.post(
    "/{owner}/{slug}/repair-object",
    summary="Replace a corrupt stored object with verified correct bytes (owner/write only)",
    status_code=status.HTTP_200_OK,
)
@limiter.limit(REPAIR_LIMIT)
async def repair_object(
    request: Request,
    owner: SlugParam,
    slug: SlugParam,
    claims: TokenClaims = Depends(require_valid_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Replace a stored object's bytes with correct content, verified by SHA-256.

    Intended for operators repairing objects that were stored with wrong
    bytes, for example objects produced by a failed delta reconstruction
    where the base was zlib-compressed and the delta result is garbage.

    Request body (msgpack):
        object_id  str   — bare 64-char hex or "sha256:<hex>"
        content    bytes — the correct raw bytes for this object

    The service verifies SHA-256(content) == object_id before writing. Only
    the repo owner or a write/admin collaborator may call this.

    Errors:
        400 — body is not a decodable msgpack/JSON mapping
        403 — the service raised ``PermissionError``
        404 — the service raised ``ValueError``
        422 — missing/mistyped fields, or ``ObjectHashMismatch``
    """
    raw = await request.body()
    ct = request.headers.get("Content-Type", "")
    try:
        data = _decode_request_body(raw, ct)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Request body must be a valid msgpack or JSON object",
        ) from exc

    object_id = data.get("object_id", "")
    content = data.get("content", b"")
    # Enforce the types the error message promises; a JSON body can never
    # carry bytes, so repairs effectively require msgpack.
    if not object_id or not isinstance(object_id, str) or not isinstance(content, bytes):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="repair-object requires object_id (str) and content (bytes)",
        )

    repo_id = await _resolve_repo_id(session, owner, slug)
    caller_id: str | None = claims.handle
    try:
        result = await wire_repair_object(session, repo_id, object_id, content, caller_id)
    except PermissionError as exc:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc)) from exc
    except ObjectHashMismatch as exc:
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail=str(exc)) from exc
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
    return _mpack_response(result, request)
412
@router.post(
    "/{owner}/{slug}/repair-snapshot",
    summary="Replace a corrupt snapshot manifest with verified correct content (owner/write only)",
    status_code=status.HTTP_200_OK,
)
@limiter.limit(REPAIR_LIMIT)
async def repair_snapshot(
    request: Request,
    owner: SlugParam,
    slug: SlugParam,
    claims: TokenClaims = Depends(require_valid_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Replace a stored snapshot's manifest with correct content, verified by snapshot_id.

    Intended for operators repairing snapshots stored with an empty or
    corrupted manifest_blob, for example snapshots affected by the R2
    empty-object bug. It force-overwrites, unlike the push path which uses
    ON CONFLICT DO NOTHING, so an existing row with wrong content is
    corrected in place.

    Request body (msgpack):
        snapshot_id  str            — bare 64-char hex snapshot ID
        manifest     dict[str, str] — {path: object_id} mapping
        directories  list[str]      — directory paths (may be empty or omitted)

    The service recomputes compute_snapshot_id(manifest, directories) and
    verifies it matches snapshot_id before writing. Only the repo owner or a
    write/admin collaborator may call this.

    Errors:
        400 — body is not a decodable msgpack/JSON mapping
        403 — the service raised ``PermissionError``
        404 — the service raised ``ValueError``
        422 — missing/mistyped fields, or ``ObjectHashMismatch``
    """
    raw = await request.body()
    ct = request.headers.get("Content-Type", "")
    try:
        data = _decode_request_body(raw, ct)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Request body must be a valid msgpack or JSON object",
        ) from exc

    snapshot_id = data.get("snapshot_id", "")
    manifest: JSONObject = data.get("manifest", {})
    directories = data.get("directories")
    if directories is None:
        # An explicit null is treated the same as an omitted field.
        directories = []

    if not snapshot_id or not isinstance(snapshot_id, str) or not isinstance(manifest, dict):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="repair-snapshot requires snapshot_id (str) and manifest (dict)",
        )
    if not all(isinstance(path, str) and isinstance(oid, str) for path, oid in manifest.items()):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="repair-snapshot manifest must map str paths to str object IDs",
        )
    if not isinstance(directories, list) or not all(isinstance(d, str) for d in directories):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="repair-snapshot directories must be a list of str",
        )

    repo_id = await _resolve_repo_id(session, owner, slug)
    caller_id: str | None = claims.handle
    try:
        result = await wire_repair_snapshot(
            session, repo_id, snapshot_id, manifest, directories, caller_id
        )
    except PermissionError as exc:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc)) from exc
    except ObjectHashMismatch as exc:
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail=str(exc)) from exc
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
    return _mpack_response(result, request)
466
@router.post(
    "/{owner}/{slug}/fetch/objects",
    summary="Fetch raw content for a list of object IDs",
    status_code=status.HTTP_200_OK,
)
@limiter.limit(OBJECT_LIMIT)
async def fetch_objects(
    request: Request,
    owner: SlugParam,
    slug: SlugParam,
    _claims: TokenClaims | None = Depends(optional_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Return raw bytes for each requested object ID as concatenated msgpack frames.

    Request body (msgpack):
        object_ids  list[str] — canonical sha256:<hex> IDs to fetch

    Response body: concatenated msgpack dicts (no enclosing array), one per
    object returned by the service:
        {"object_id": str, "content": bytes}

    Objects not found are silently omitted. Private repos are subject to the
    same read check as the other fetch endpoints (404 for non-readers).

    Errors:
        400 — body is not a decodable msgpack/JSON mapping
        422 — object_ids is not a list
    """
    raw = await request.body()
    ct = request.headers.get("Content-Type", "")
    try:
        data = _decode_request_body(raw, ct)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Request body must be a valid msgpack or JSON object",
        ) from exc

    object_ids = data.get("object_ids", [])
    if not isinstance(object_ids, list):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="fetch/objects requires object_ids (list)",
        )

    repo = await _resolve_repo(session, owner, slug)
    await _assert_readable(repo, _claims, session)
    objects = await wire_fetch_objects(session, repo.repo_id, [str(o) for o in object_ids])
    # One independent msgpack frame per object, back to back.
    body = b"".join(msgpack.packb(obj, use_bin_type=True) for obj in objects)
    return Response(content=body, media_type="application/x-msgpack")
505
@router.post(
    "/{owner}/{slug}/fetch/presign",
    summary="Wire — presigned fetch for large repos",
    status_code=status.HTTP_200_OK,
)
@limiter.limit(WIRE_FETCH_LIMIT)
async def fetch_presign(
    request: Request,
    owner: SlugParam,
    slug: SlugParam,
    _claims: TokenClaims | None = Depends(optional_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Return per-object presigned GET URLs for a large fetch delta.

    For large fetches (≥ 500 objects or ≥ 50 MB), Cloudflare's origin
    timeout kills the response before the client receives everything. This
    endpoint instead generates one presigned R2 GET URL per needed object,
    so no object bytes are read server-side. The client downloads all
    objects in parallel directly from R2, bypassing Cloudflare entirely.

    **Request body** (always decoded as msgpack):
        msgpack ``{"want": [str], "have": [str], "depth": int|null, "ttl_seconds": int}``
        ``ttl_seconds`` defaults to 3600 when omitted or null and must be positive.

    **Response body** (``Content-Type: application/x-msgpack``):
        msgpack ``{"presign": bool, "blob_urls": {oid: url}, "commits": [...],
        "snapshots": [...], "branch_heads": {...}, "repo_id": str,
        "domain": str, "default_branch": str,
        "expires_at": str|null, "commit_count": int, "blob_count": int}``

    When ``presign=False`` the delta is below the threshold or the backend
    does not support presigned URLs, and the client should fall back to
    ``fetch/mpack``.

    Errors:
        400 — body is not a decodable msgpack mapping
        422 — invalid ``ttl_seconds`` or a body that fails ``WireFetchRequest`` validation
    """
    from musehub.services.musehub_wire import wire_fetch_presign

    raw = await request.body()
    try:
        # The body is decoded as msgpack regardless of the Content-Type header.
        data = _decode_request_body(raw, "application/x-msgpack")
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Request body must be a valid msgpack object",
        ) from exc

    # Pop before validation so the model only sees the fetch fields.
    ttl_raw = data.pop("ttl_seconds", None)
    try:
        ttl_seconds = 3600 if ttl_raw is None else int(ttl_raw)
    except (TypeError, ValueError) as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="ttl_seconds must be a positive integer",
        ) from exc
    if ttl_seconds <= 0:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="ttl_seconds must be a positive integer",
        )

    try:
        body = WireFetchRequest.model_validate(data)
    except pydantic.ValidationError as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail=str(exc),
        ) from exc

    repo = await _resolve_repo(session, owner, slug)
    await _assert_readable(repo, _claims, session)
    result = await wire_fetch_presign(session, repo.repo_id, body, ttl_seconds)
    return _mpack_response(result, request)
548
549
@router.post(
    "/{owner}/{slug}/fetch",
    summary="Wire — fetch protocol step 1 (issue #68)",
    status_code=status.HTTP_200_OK,
)
@limiter.limit(WIRE_FETCH_LIMIT)
async def fetch(
    request: Request,
    owner: SlugParam,
    slug: SlugParam,
    _claims: TokenClaims | None = Depends(optional_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Compute the fetch delta and return a presigned GET URL for the mpack.

    **Request body** (always decoded as msgpack):
        ``{"want": [sha256:...], "have": [sha256:...]}``

    **Response body** (``Content-Type: application/x-msgpack``):
        ``{"mpack_id": "sha256:...", "mpack_url": "...", "commit_count": N, "object_count": N}``

    Returns 400 if the body is not a decodable msgpack mapping.
    Returns 422 if want is empty, want/have are not lists, or entries are malformed.
    Returns 404 if any want commit_id is unknown.
    Returns 503 with Retry-After if needed objects are not yet indexed.
    """
    raw = await request.body()
    try:
        data = _decode_request_body(raw, "application/x-msgpack")
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Request body must be a valid msgpack object",
        ) from exc

    want = data.get("want") or []
    have = data.get("have") or []
    if not want:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="want must be non-empty",
        )
    # A bare string would otherwise be iterated character by character downstream.
    if not isinstance(want, list) or not isinstance(have, list):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="want and have must be lists",
        )

    repo = await _resolve_repo(session, owner, slug)
    await _assert_readable(repo, _claims, session)

    try:
        result = await wire_fetch(session, repo.repo_id, want, have)
    except MPackValidationError as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail=str(exc)
        ) from exc
    except FetchCommitNotFound as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
    except FetchNotReady as exc:
        body = msgpack.packb({"error": str(exc), "retry_after": 60}, use_bin_type=True)
        return Response(
            content=body,
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            headers={"Retry-After": "60"},
            media_type="application/x-msgpack",
        )

    return _mpack_response(result, request)
602
603
@router.post(
    "/{owner}/{slug}/fetch/mpack",
    summary="Wire — single-mpack fetch (issue #47)",
    status_code=status.HTTP_200_OK,
)
@limiter.limit(WIRE_FETCH_LIMIT)
async def fetch_mpack(
    request: Request,
    owner: SlugParam,
    slug: SlugParam,
    _claims: TokenClaims | None = Depends(optional_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Return the fetch delta as a single content-addressed mpack.

    Phase 2 protocol: the server assembles the fetch mpack, stores it
    ephemerally in object storage, and returns a presigned GET URL. The
    client GETs the mpack directly, verifies sha256, then calls
    apply_mpack().

    **Request body** (always decoded as msgpack):
        msgpack ``{"want": [str], "have": [str], "ttl_seconds": int}``
        ``ttl_seconds`` defaults to 3600 when omitted or null and must be positive.

    **Response body** (``Content-Type: application/x-msgpack``):
        msgpack ``{"mpack_url": str|null, "mpack_id": str|null,
        "commit_count": int, "blob_count": int}``

    Returns 400 if the body is not a decodable msgpack mapping, and 422 for
    an invalid ``ttl_seconds`` or non-list want/have. Returns 503 with
    ``Retry-After: 60`` (plain-text body) if needed objects are not yet
    indexed, i.e. the mpack.index background job is still running.
    """
    raw = await request.body()
    try:
        data = _decode_request_body(raw, "application/x-msgpack")
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Request body must be a valid msgpack object",
        ) from exc

    ttl_raw = data.pop("ttl_seconds", None)
    try:
        ttl_seconds = 3600 if ttl_raw is None else int(ttl_raw)
    except (TypeError, ValueError) as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="ttl_seconds must be a positive integer",
        ) from exc
    if ttl_seconds <= 0:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="ttl_seconds must be a positive integer",
        )

    want = data.get("want") or []
    have = data.get("have") or []
    if not isinstance(want, list) or not isinstance(have, list):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="want and have must be lists",
        )

    repo = await _resolve_repo(session, owner, slug)
    await _assert_readable(repo, _claims, session)
    try:
        result = await wire_fetch_mpack(session, repo.repo_id, want, have, ttl_seconds)
    except FetchNotIndexedError as exc:
        return Response(
            content=f"Objects not yet indexed: {exc.missing_count} missing. Retry shortly.",
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            headers={"Retry-After": "60"},
            media_type="text/plain",
        )
    return _mpack_response(result, request)
649
650
651 # ── release wire endpoints ─────────────────────────────────────────────────────
652
@router.post(
    "/{owner}/{slug}/releases",
    summary="Push a release from muse CLI",
    status_code=status.HTTP_201_CREATED,
)
@limiter.limit(WIRE_PUSH_LIMIT)
async def wire_create_release(
    request: Request,
    owner: SlugParam,
    slug: SlugParam,
    background_tasks: BackgroundTasks,
    claims: TokenClaims = Depends(require_valid_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Accept a ``ReleaseDict`` payload from ``muse release push``.

    The body is a mapping (JSON object or msgpack map, chosen by
    ``Content-Type``) matching the CLI ``ReleaseRecord.to_dict()`` shape.
    The endpoint returns as soon as the server has assigned a
    ``release_id``. Semantic analysis runs as a background task, so the push
    is never blocked by analysis time.

    Only the repo owner may push releases. This matches the owner-only rule
    of ``DELETE /{owner}/{slug}/releases/{tag}``.

    Errors:
        400 — body is not a decodable msgpack/JSON mapping
        403 — caller is not the repo owner
        404 — repo not found
        409 — the release service raised ``ValueError``
    """
    from musehub.services import musehub_releases as rel_svc
    from musehub.services.release_analysis import analyse_release_background

    repo = await _resolve_repo(session, owner, slug)
    # create_release_from_dict never receives the caller's identity, so the
    # authorization check has to happen here.
    if claims.handle != repo.owner:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="only the repo owner may push releases",
        )
    repo_id = repo.repo_id

    raw = await request.body()
    ct = request.headers.get("Content-Type", "")
    try:
        data = _decode_request_body(raw, ct)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Request body must be a valid msgpack or JSON object",
        ) from exc

    try:
        response = await rel_svc.create_release_from_dict(session, repo_id, data)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=str(exc),
        ) from exc

    await session.commit()
    logger.info("✅ wire: release %s pushed for %s/%s", response.tag, owner, slug)

    # Fire semantic analysis after the response is sent. Opens its own
    # session so it is independent of the request lifecycle.
    background_tasks.add_task(
        analyse_release_background, repo_id, response.release_id
    )

    # NOTE(review): returning a Response directly bypasses the decorator's
    # status_code, so clients currently receive 200 rather than 201.
    return _mpack_response({"release_id": response.release_id}, request)
704
@router.delete(
    "/{owner}/{slug}/branches/{branch_name:path}",
    summary="Delete a branch from MuseHub",
    status_code=status.HTTP_200_OK,
)
@limiter.limit(WIRE_PUSH_LIMIT)
async def wire_delete_branch(
    request: Request,
    owner: SlugParam,
    slug: SlugParam,
    branch_name: str,
    claims: TokenClaims = Depends(require_valid_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Remove a branch ref that was pushed with ``muse push``.

    This is the MuseHub counterpart of ``git push origin --delete <branch>``.
    Only the ref row is deleted; commits and objects stay in place.

    Errors:
        403 — caller is not the repo owner
        404 — repo or branch not found
        409 — the branch is the repo's default branch (``"main"`` if unset)
    """
    from sqlalchemy import delete as sql_delete

    repo = await _resolve_repo(session, owner, slug)

    if claims.handle != repo.owner:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="only the repo owner may delete branches",
        )

    protected_branch: str = getattr(repo, "default_branch", None) or "main"
    if branch_name == protected_branch:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"cannot delete the default branch '{protected_branch}'",
        )

    # RETURNING tells us whether a row actually matched without a second query.
    delete_stmt = (
        sql_delete(MusehubBranch)
        .where(
            MusehubBranch.repo_id == repo.repo_id,
            MusehubBranch.name == branch_name,
        )
        .returning(MusehubBranch.name)
    )
    removed: str | None = (await session.execute(delete_stmt)).scalar_one_or_none()
    if removed is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"branch '{branch_name}' not found",
        )

    await session.commit()
    logger.info("✅ wire: branch %s deleted from %s/%s", branch_name, owner, slug)
    return _mpack_response({"deleted": branch_name}, request)
761
@router.delete(
    "/{owner}/{slug}/releases/{tag:path}",
    summary="Retract a release from MuseHub",
    status_code=status.HTTP_200_OK,
)
@limiter.limit(WIRE_PUSH_LIMIT)
async def wire_delete_release(
    request: Request,
    owner: SlugParam,
    slug: SlugParam,
    tag: str,
    claims: TokenClaims = Depends(require_valid_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Retract a release that was pushed with ``muse release push``.

    Only the release label is removed. The commits and snapshots it pointed
    at remain in the content-addressed object store and are still reachable
    by their SHA-256.

    Errors:
        403 — caller is not the repo owner
        404 — repo or release not found
    """
    from musehub.services import musehub_releases as rel_svc

    repo = await _resolve_repo(session, owner, slug)

    if claims.handle != repo.owner:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="only the repo owner may retract releases",
        )

    if not await rel_svc.delete_release_by_tag(session, repo.repo_id, tag):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"release '{tag}' not found",
        )

    await session.commit()
    logger.info("✅ wire: release %s retracted from %s/%s", tag, owner, slug)
    return _mpack_response({"retracted": tag}, request)
805
806 # ── wire-tag endpoints ─────────────────────────────────────────────────────────
807
@router.post(
    "/{owner}/{slug}/tags",
    summary="Push lightweight wire tags from muse CLI",
    status_code=status.HTTP_200_OK,
)
@limiter.limit(WIRE_PUSH_LIMIT)
async def wire_push_tags(
    request: Request,
    owner: SlugParam,
    slug: SlugParam,
    claims: TokenClaims = Depends(require_valid_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Upsert a batch of lightweight semantic tags pushed from the Muse CLI.

    The body must be a msgpack (or JSON) object with a ``tags`` key holding
    a list of ``WireTag`` dicts. The server upserts them, so pushing the same
    tag label twice for the same repo is a no-op apart from refreshing
    ``commit_id``. Non-dict list items are skipped, and non-string fields are
    stored as empty strings.

    Only the repo owner or an accepted collaborator may push tags.

    Returns ``{"stored": <count>}`` as msgpack or JSON.

    Errors:
        400 — body is not a decodable msgpack/JSON mapping
        403 — caller is neither the owner nor an accepted collaborator
        404 — repo not found
        422 — ``tags`` is not a list
    """
    from musehub.models.musehub import WireTagInput
    from musehub.services import musehub_wire_tags as tag_svc

    raw = await request.body()
    ct = request.headers.get("Content-Type", "")
    try:
        data = _decode_request_body(raw, ct)
    except Exception as exc:  # decoders may raise types beyond ValueError
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Request body must be a valid msgpack or JSON object",
        ) from exc

    repo = await _resolve_repo(session, owner, slug)
    repo_id = repo.repo_id

    # store_wire_tags never receives the caller's identity, so authorization
    # has to happen here. Otherwise any authenticated user could rewrite
    # another repo's tags.
    # NOTE(review): this admits any accepted collaborator; tighten it to
    # write/admin once the collaborator permission column is confirmed.
    caller = claims.handle
    if not caller or caller != repo.owner:
        membership = None
        if caller:
            membership = await session.scalar(
                select(MusehubCollaborator)
                .where(
                    MusehubCollaborator.repo_id == repo_id,
                    MusehubCollaborator.identity_handle == caller,
                    MusehubCollaborator.accepted_at.isnot(None),
                )
                .limit(1)
            )
        if membership is None:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="only the repo owner or a collaborator may push tags",
            )

    tags_raw = data.get("tags", [])
    if not isinstance(tags_raw, list):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="'tags' must be a list",
        )

    def _text(value: object) -> str:
        # Non-string fields are stored as empty strings instead of being rejected.
        return str(value) if isinstance(value, str) else ""

    tags: list[WireTagInput] = [
        WireTagInput(
            tag_id=_text(item.get("tag_id", "")),
            commit_id=_text(item.get("commit_id", "")),
            tag=_text(item.get("tag", "")),
            created_at=_text(item.get("created_at", "")),
        )
        for item in tags_raw
        if isinstance(item, dict)
    ]

    stored = await tag_svc.store_wire_tags(session, repo_id, tags)
    await session.commit()
    logger.info("✅ wire: %d tag(s) pushed for %s/%s", stored, owner, slug)
    return _mpack_response({"stored": stored}, request)
872
873 # ── content-addressed CDN ──────────────────────────────────────────────────────
874
@router.get(
    "/o/{object_id:path}",
    summary="Content-addressed object CDN endpoint",
    response_description="Raw binary blob",
    tags=["Objects"],
)
@limiter.limit(OBJECT_LIMIT)
async def get_object(
    request: Request,
    object_id: str,
    repo_id: str | None = None,
    _claims: TokenClaims | None = Depends(optional_token),
    session: AsyncSession = Depends(get_session),
) -> Response:
    """Serve a content-addressed binary object.

    The object is read from the storage backend first. If that misses, the
    bytes come from the ``MusehubObject`` row via ``read_object_bytes``.

    Objects are immutable (the ID is derived from the content hash), so
    intact responses carry ``Cache-Control: public, max-age=31536000,
    immutable`` and an ETag, which makes them safe to cache at a CDN forever.
    If the bytes do NOT hash to ``object_id``, the corruption is logged and
    the bytes are served with ``Cache-Control: no-store`` and no ETag. This
    keeps a CDN from pinning corrupt content for a year and lets a later
    repair-object take effect.

    ``repo_id`` is used only in diagnostic logs.
    NOTE(review): ``_claims`` is not consulted, so objects are served
    regardless of the owning repo's visibility.

    Errors:
        400 — the backend rejected ``object_id`` as a path (``ValueError``)
        404 — object not found in the backend or the database
    """
    import hashlib as _hashlib

    from musehub.storage.backends import read_object_bytes

    backend = get_backend()
    backend_name = type(backend).__name__
    try:
        raw = await backend.get(object_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="invalid object path"
        ) from exc
    if raw is None:
        obj_row = await session.scalar(
            select(MusehubObject).where(MusehubObject.object_id == object_id).limit(1)
        )
        if obj_row is not None:
            raw = await read_object_bytes(obj_row, session=session)
    if raw is None:
        logger.error(
            "OBJECT_404 object_id=%s repo_id=%s backend=%s",
            object_id,
            repo_id,
            backend_name,
        )
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="object not found")

    headers = {
        "Cache-Control": "public, max-age=31536000, immutable",
        "ETag": f'"{object_id}"',
    }

    # Integrity check: verify stored bytes match the content-addressed ID.
    actual_hash = _hashlib.sha256(raw).hexdigest()
    expected_hex = object_id.removeprefix("sha256:")
    if actual_hash != expected_hex:
        logger.error(
            "OBJECT_CORRUPT object_id=%s actual_hash=sha256:%s size=%d backend=%s",
            object_id,
            actual_hash,
            len(raw),
            backend_name,
        )
        # Never advertise corrupt bytes as the immutable, cacheable content
        # of this ID.
        headers = {"Cache-Control": "no-store"}

    return Response(
        content=raw,
        media_type="application/octet-stream",
        headers=headers,
    )
937
File History 3 commits
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 2 days ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f fix: use wire_bytes not mpack_bytes_raw in compute_object_b… Sonnet 4.6 patch 11 days ago
sha256:4aed3d8601c8dd3ed37074de35f11f4a9699a0a4b99d43727048fd3f8e6fd13d chore: doc sweep, ignore wrangler build state, misc fixes Sonnet 4.6 minor 13 days ago