backfill_intel_jobs.py
file-level
1
files
1
commits
0
hotspots
0
🧊 dead
0
💥 blast risk
| 1 | """One-shot backfill: enqueue intel jobs for specified repos. |
| 2 | |
| 3 | Usage: |
| 4 | python deploy/backfill_intel_jobs.py gabriel/muse gabriel/musehub |
| 5 | """ |
| 6 | from __future__ import annotations |
| 7 | |
| 8 | import asyncio |
| 9 | import sys |
| 10 | import os |
| 11 | |
| 12 | # Ensure app root is on path when run inside the container |
| 13 | sys.path.insert(0, "/app") |
| 14 | |
| 15 | from sqlalchemy import select, text |
| 16 | from musehub.db.database import AsyncSessionLocal, init_db |
| 17 | from musehub.db.musehub_repo_models import MusehubRepo, MusehubCommit, MusehubCommitRef |
| 18 | from musehub.services.musehub_jobs import enqueue_job |
| 19 | |
| 20 | |
async def backfill(slugs: list[str]) -> None:
    """Enqueue intel jobs for the newest commit on every branch of each repo.

    For each ``owner/name`` slug:

    1. Look up the repo by owner and name.
    2. Collect its commits (via ``MusehubCommitRef``), newest first by
       ``MusehubCommit.timestamp``.
    3. Keep the first commit seen per non-empty branch.
    4. Enqueue ``intel.structural``, ``intel.code`` and
       ``push.file_last_commits`` jobs for that commit.

    All enqueued jobs are committed in a single ``session.commit()`` after
    every slug has been processed.

    Args:
        slugs: Repo identifiers of the form ``"owner/name"``. The following
            are reported on stdout and skipped; they do not abort the run:
            malformed slugs, repos that are not found, repos with no
            commits, and repos with no commit on a named branch.
    """
    await init_db()
    job_types = ("intel.structural", "intel.code", "push.file_last_commits")
    async with AsyncSessionLocal() as session:
        for slug in slugs:
            # Validate up front with partition() (which never raises).
            # Tuple-unpacking split() raises ValueError on a slug with no "/".
            # Since the commit happens only once at the end, that exception
            # would discard every job already enqueued for earlier slugs.
            owner, sep, name = slug.partition("/")
            if not sep or not owner or not name:
                print(f" ✗ {slug} — expected owner/name")
                continue

            repo_id: str | None = (await session.execute(
                select(MusehubRepo.repo_id).where(
                    MusehubRepo.owner == owner,
                    MusehubRepo.name == name,
                ).limit(1)
            )).scalar_one_or_none()

            if not repo_id:
                print(f" ✗ {slug} — not found")
                continue

            # Every commit referenced by this repo, newest first, so the
            # first row seen for a branch is treated as that branch's head.
            rows = (await session.execute(
                select(MusehubCommit.branch, MusehubCommit.commit_id)
                .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
                .where(MusehubCommitRef.repo_id == repo_id)
                .order_by(MusehubCommit.timestamp.desc())
            )).all()

            if not rows:
                print(f" ✗ {slug} — no commits")
                continue

            # Dedupe: setdefault keeps the first (newest) commit per branch.
            # Dict insertion order preserves the newest-first ordering.
            # Rows with an empty/NULL branch are ignored.
            heads: dict[str, str] = {}
            for branch, commit_id in rows:
                if branch:
                    heads.setdefault(branch, commit_id)

            if not heads:
                print(f" ✗ {slug} — no commits on a named branch")
                continue

            for branch, head in heads.items():
                payload = {"head": head, "branch": branch}
                for job_type in job_types:
                    job_id = await enqueue_job(session, repo_id, job_type, payload)
                    # A falsy job_id is reported as "already pending".
                    status = job_id[:16] + "…" if job_id else "already pending"
                    print(f" ✓ {slug} [{branch}] {job_type} → {status}")

        await session.commit()
        print("\nAll jobs committed — worker will pick them up.")
| 67 | |
| 68 | |
if __name__ == "__main__":
    # Slugs come from the command line; with none given, fall back to the
    # default pair of repos.
    cli_slugs = sys.argv[1:]
    if not cli_slugs:
        cli_slugs = ["gabriel/muse", "gabriel/musehub"]
    asyncio.run(backfill(cli_slugs))