gabriel / musehub public
backfill_intel_jobs.py python
71 lines 2.5 KB
Raw
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2 feat: add repair-commit wire endpoint (API parity with repa… Opus 4.8 minor ⚠ breaking 1 day ago
1 """One-shot backfill: enqueue intel jobs for specified repos.
2
3 Usage:
4 python deploy/backfill_intel_jobs.py gabriel/muse gabriel/musehub
5 """
6 from __future__ import annotations
7
8 import asyncio
9 import sys
10 import os
11
12 # Ensure app root is on path when run inside the container
13 sys.path.insert(0, "/app")
14
15 from sqlalchemy import select, text
16 from musehub.db.database import AsyncSessionLocal, init_db
17 from musehub.db.musehub_repo_models import MusehubRepo, MusehubCommit, MusehubCommitRef
18 from musehub.services.musehub_jobs import enqueue_job
19
20
async def backfill(slugs: list[str]) -> None:
    """Enqueue intel jobs for the newest commit of every branch of each repo.

    For every ``owner/name`` slug this looks up the repo's ``repo_id``. It then
    selects the repo's commits (joined through ``MusehubCommitRef`` rows for
    that repo), keeps the newest one per branch by ``MusehubCommit.timestamp``,
    and enqueues three jobs for each branch head: ``intel.structural``,
    ``intel.code`` and ``push.file_last_commits``. Each job gets the payload
    ``{"head": <commit_id>, "branch": <branch>}``.

    All enqueues go through one session, which is committed once after every
    slug has been processed.

    Args:
        slugs: Repository identifiers of the form ``"owner/name"``. Malformed
            slugs, unknown repos and repos without branch commits are reported
            on stdout and skipped; they do not abort the run.
    """
    job_types = ("intel.structural", "intel.code", "push.file_last_commits")

    await init_db()
    async with AsyncSessionLocal() as session:
        for slug in slugs:
            # partition() instead of tuple-unpacking split("/", 1): a slug with
            # no "/" used to raise ValueError and abort the run before the
            # final commit, silently discarding jobs enqueued for earlier repos.
            owner, sep, name = slug.partition("/")
            if not (sep and owner and name):
                print(f" ✗ {slug} — invalid slug (expected owner/name)")
                continue

            repo_id: str | None = (await session.execute(
                select(MusehubRepo.repo_id).where(
                    MusehubRepo.owner == owner,
                    MusehubRepo.name == name,
                ).limit(1)
            )).scalar_one_or_none()

            if not repo_id:
                print(f" ✗ {slug} — not found")
                continue

            # Newest first. commit_id breaks timestamp ties so the head picked
            # for a branch is deterministic across runs.
            rows = (await session.execute(
                select(MusehubCommit.branch, MusehubCommit.commit_id)
                .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
                .where(MusehubCommitRef.repo_id == repo_id)
                .order_by(MusehubCommit.timestamp.desc(), MusehubCommit.commit_id.desc())
            )).all()

            if not rows:
                print(f" ✗ {slug} — no commits")
                continue

            # Keep the first (i.e. newest) commit seen per branch. Rows with an
            # empty/NULL branch are skipped. Dicts preserve insertion order, so
            # branches come out in the order they first appear.
            heads: dict[str, str] = {}
            for branch, commit_id in rows:
                if branch:
                    heads.setdefault(branch, commit_id)

            if not heads:
                print(f" ✗ {slug} — no commits on a named branch")
                continue

            for branch, head in heads.items():
                payload = {"head": head, "branch": branch}
                for job_type in job_types:
                    job_id = await enqueue_job(session, repo_id, job_type, payload)
                    # A falsy return is reported as "already pending";
                    # presumably enqueue_job dedupes pending jobs — confirm.
                    status = job_id[:16] + "…" if job_id else "already pending"
                    print(f" ✓ {slug} [{branch}] {job_type} → {status}")

        await session.commit()
        print("\nAll jobs committed — worker will pick them up.")
67
68
if __name__ == "__main__":
    # Repo slugs come from the command line; with none given, fall back to
    # the two default repos.
    cli_slugs = sys.argv[1:]
    if not cli_slugs:
        cli_slugs = ["gabriel/muse", "gabriel/musehub"]
    asyncio.run(backfill(cli_slugs))
File History 1 commit
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2 feat: add repair-commit wire endpoint (API parity with repa… Opus 4.8 minor 1 day ago