backfill_structured_delta.py
python
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32
fix: fall back to DB ancestry check when mpack-only fast-fo…
Sonnet 4.6
patch
6 days ago
| 1 | """Backfill structured_delta from local Muse history into musehub_commits column. |
| 2 | |
| 3 | Migration 0021 added structured_delta as a first-class column. Historical rows |
| 4 | are NULL because the data was in the now-dropped commit_meta blob. This script |
| 5 | reads structured_delta for each commit directly from the local Muse object |
| 6 | store (which has the full history) and writes it back to the DB. |
| 7 | |
| 8 | Usage (run from the host, not inside Docker — needs the local muse CLI): |
| 9 | python3 deploy/backfill_structured_delta.py --repo ~/ecosystem/musehub --dry-run |
| 10 | python3 deploy/backfill_structured_delta.py --repo ~/ecosystem/musehub |
| 11 | """ |
| 12 | from __future__ import annotations |
| 13 | |
| 14 | import argparse |
| 15 | import asyncio |
| 16 | import json |
| 17 | import subprocess |
| 18 | import sys |
| 19 | import time |
| 20 | |
| 21 | import sqlalchemy as sa |
| 22 | from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine |
| 23 | from sqlalchemy.orm import sessionmaker |
| 24 | |
| 25 | from musehub.db.database import get_database_url |
| 26 | from musehub.db.musehub_repo_models import MusehubCommit |
| 27 | |
| 28 | |
def muse(repo: str, *args: str) -> dict:
    """Invoke the ``muse`` CLI against *repo* and decode its JSON output.

    Runs ``muse -C <repo> <args...> --json`` with stdout/stderr captured as
    text, then parses stdout as JSON.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero (the
            captured stderr is available on the exception).
        json.JSONDecodeError: if stdout is not valid JSON.
        OSError: if the ``muse`` executable cannot be launched.
    """
    argv = ["muse", "-C", repo]
    argv.extend(args)
    argv.append("--json")  # always request machine-readable output
    completed = subprocess.run(argv, check=True, text=True, capture_output=True)
    return json.loads(completed.stdout)
| 35 | |
| 36 | |
async def run(repo_path: str, dry_run: bool, batch_size: int) -> None:
    """Copy structured_delta from the local Muse store into musehub_commits.

    Walks every commit reported by ``muse log`` for *repo_path*. For each one
    that exists in the database with a NULL ``structured_delta``, reads the
    commit via ``muse read`` and, if the payload carries a dict-valued
    ``structured_delta``, writes it to that row.

    Args:
        repo_path: Path to the local Muse repository (passed to ``muse -C``).
        dry_run: When true, count what would be updated but issue no UPDATE
            statements and no commits.
        batch_size: Number of updated rows between intermediate commits and
            progress lines. Must be >= 1.

    Raises:
        ValueError: if *batch_size* is less than 1.
        subprocess.CalledProcessError: if ``muse log`` fails.
    """
    if batch_size < 1:
        # `updated % batch_size` below would raise ZeroDivisionError for 0, and
        # a negative batch size is meaningless; fail before touching anything.
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    engine = create_async_engine(get_database_url(), echo=False)
    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    try:
        print(f"Reading commit log from {repo_path}…")
        # NOTE(review): assumes `muse log --json` lists the full history with no
        # default limit — confirm, otherwise older commits are never visited.
        log = muse(repo_path, "log")
        commits = log["commits"]
        total = len(commits)
        print(f"Found {total} commits in local history.")

        updated = 0
        skipped = 0
        missing = 0
        errors = 0
        t0 = time.monotonic()

        async with async_session() as session:
            for i, entry in enumerate(commits, 1):
                commit_id: str = entry["commit_id"]

                # Check whether this commit exists in the DB and needs backfilling
                row = (await session.execute(
                    sa.select(MusehubCommit.commit_id, MusehubCommit.structured_delta)
                    .where(MusehubCommit.commit_id == commit_id)
                )).one_or_none()

                if row is None:
                    missing += 1
                    continue

                if row.structured_delta is not None:
                    skipped += 1
                    continue

                # Read structured_delta from the local Muse object store. Any
                # failure is logged and counted per commit so one bad object
                # does not abort the run and discard the uncommitted batch.
                try:
                    commit_data = muse(repo_path, "read", commit_id)
                except subprocess.CalledProcessError as exc:
                    print(f" ✗ {commit_id[:16]}… muse read failed: {(exc.stderr or '').strip()}", file=sys.stderr)
                    errors += 1
                    continue
                except json.JSONDecodeError as exc:
                    print(f" ✗ {commit_id[:16]}… muse read returned invalid JSON: {exc}", file=sys.stderr)
                    errors += 1
                    continue

                if not isinstance(commit_data, dict):
                    print(
                        f" ✗ {commit_id[:16]}… muse read returned "
                        f"{type(commit_data).__name__}, expected a JSON object",
                        file=sys.stderr,
                    )
                    errors += 1
                    continue

                delta = commit_data.get("structured_delta")
                if not isinstance(delta, dict):
                    # No usable delta in the local store either; nothing to write.
                    skipped += 1
                    continue

                if not dry_run:
                    await session.execute(
                        sa.update(MusehubCommit)
                        .where(MusehubCommit.commit_id == commit_id)
                        .values(structured_delta=delta)
                    )

                updated += 1

                # Checkpoint every `batch_size` updates so progress survives a crash.
                if updated % batch_size == 0:
                    if not dry_run:
                        await session.commit()
                    elapsed = time.monotonic() - t0
                    print(f" [{i}/{total}] {updated} updated, {skipped} skipped, {missing} not-in-db — {elapsed:.1f}s")

            if not dry_run:
                await session.commit()

        elapsed = time.monotonic() - t0
        tag = "[DRY RUN] " if dry_run else ""
        print(f"\n{tag}Done in {elapsed:.1f}s:")
        print(f" updated : {updated}")
        print(f" skipped : {skipped} (already had delta or no delta in local store)")
        print(f" not-in-db: {missing} (local commits not yet pushed to server)")
        print(f" errors : {errors}")
    finally:
        # Release pooled DB connections even if the run fails part-way.
        await engine.dispose()
| 110 | |
| 111 | |
def main() -> None:
    """Parse command-line arguments and run the backfill.

    Exits with status 2 (via argparse) when ``--batch-size`` is not a positive
    integer, rather than failing later with ZeroDivisionError mid-run. A
    leading ``~`` in ``--repo`` is expanded, so quoted paths and
    ``--repo=~/...`` work too.
    """
    import os  # local import: only needed here for expanduser

    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's Usage examples line-for-line in --help;
        # the default formatter re-wraps them into one paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--repo", required=True, help="Path to local Muse repo (e.g. ~/ecosystem/musehub)")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Report what would be updated without writing to the database.",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=100,
        help="Rows to update between intermediate commits and progress reports (default: 100).",
    )
    args = parser.parse_args()

    if args.batch_size < 1:
        parser.error("--batch-size must be a positive integer")

    repo = os.path.expanduser(args.repo)
    asyncio.run(run(repo, args.dry_run, args.batch_size))
| 120 | |
| 121 | |
# Run the backfill only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
File History
1 commit
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32
fix: fall back to DB ancestry check when mpack-only fast-fo…
Sonnet 4.6
patch
6 days ago