"""Backfill structured_delta from local Muse history into musehub_commits column. Migration 0021 added structured_delta as a first-class column. Historical rows are NULL because the data was in the now-dropped commit_meta blob. This script reads structured_delta for each commit directly from the local Muse object store (which has the full history) and writes it back to the DB. Usage (run from the host, not inside Docker — needs the local muse CLI): python3 deploy/backfill_structured_delta.py --repo ~/ecosystem/musehub --dry-run python3 deploy/backfill_structured_delta.py --repo ~/ecosystem/musehub """ from __future__ import annotations import argparse import asyncio import json import subprocess import sys import time import sqlalchemy as sa from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker from musehub.db.database import get_database_url from musehub.db.musehub_repo_models import MusehubCommit def muse(repo: str, *args: str) -> dict: result = subprocess.run( ["muse", "-C", repo, *args, "--json"], capture_output=True, text=True, check=True, ) return json.loads(result.stdout) async def run(repo_path: str, dry_run: bool, batch_size: int) -> None: engine = create_async_engine(get_database_url(), echo=False) async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) print(f"Reading commit log from {repo_path}…") log = muse(repo_path, "log") commits = log["commits"] print(f"Found {len(commits)} commits in local history.") updated = 0 skipped = 0 missing = 0 errors = 0 t0 = time.monotonic() async with async_session() as session: for i, entry in enumerate(commits, 1): commit_id: str = entry["commit_id"] # Check whether this commit exists in the DB and needs backfilling row = (await session.execute( sa.select(MusehubCommit.commit_id, MusehubCommit.structured_delta) .where(MusehubCommit.commit_id == commit_id) )).one_or_none() if row is None: missing += 1 continue if row.structured_delta is not None: skipped += 1 continue # Read structured_delta from local Muse object store try: commit_data = muse(repo_path, "read", commit_id) delta = commit_data.get("structured_delta") except subprocess.CalledProcessError as exc: print(f" ✗ {commit_id[:16]}… muse read failed: {exc.stderr.strip()}", file=sys.stderr) errors += 1 continue if not isinstance(delta, dict): skipped += 1 continue if not dry_run: await session.execute( sa.update(MusehubCommit) .where(MusehubCommit.commit_id == commit_id) .values(structured_delta=delta) ) updated += 1 if updated % batch_size == 0: if not dry_run: await session.commit() elapsed = time.monotonic() - t0 print(f" [{i}/{len(commits)}] {updated} updated, {skipped} skipped, {missing} not-in-db — {elapsed:.1f}s") if not dry_run: await session.commit() elapsed = time.monotonic() - t0 tag = "[DRY RUN] " if dry_run else "" print(f"\n{tag}Done in {elapsed:.1f}s:") print(f" updated : {updated}") print(f" skipped : {skipped} (already had delta or no delta in local store)") print(f" not-in-db: {missing} (local commits not yet pushed to server)") print(f" errors : {errors}") await engine.dispose() def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--repo", required=True, help="Path to local Muse repo (e.g. ~/ecosystem/musehub)") parser.add_argument("--dry-run", action="store_true") parser.add_argument("--batch-size", type=int, default=100) args = parser.parse_args() asyncio.run(run(args.repo, args.dry_run, args.batch_size)) if __name__ == "__main__": main()