gabriel / musehub public

backfill_structured_delta.py file-level

at sha256:0 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
1 """Backfill structured_delta from local Muse history into musehub_commits column.
2
3 Migration 0021 added structured_delta as a first-class column. Historical rows
4 are NULL because the data was in the now-dropped commit_meta blob. This script
5 reads structured_delta for each commit directly from the local Muse object
6 store (which has the full history) and writes it back to the DB.
7
8 Usage (run from the host, not inside Docker — needs the local muse CLI):
9 python3 deploy/backfill_structured_delta.py --repo ~/ecosystem/musehub --dry-run
10 python3 deploy/backfill_structured_delta.py --repo ~/ecosystem/musehub
11 """
12 from __future__ import annotations
13
14 import argparse
15 import asyncio
16 import json
17 import subprocess
18 import sys
19 import time
20
21 import sqlalchemy as sa
22 from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
23 from sqlalchemy.orm import sessionmaker
24
25 from musehub.db.database import get_database_url
26 from musehub.db.musehub_repo_models import MusehubCommit
27
28
def muse(repo: str, *args: str) -> dict:
    """Run a ``muse`` subcommand against *repo* and return its parsed JSON output.

    The executed command line is ``muse -C <repo> <args...> --json``.

    Raises:
        subprocess.CalledProcessError: if the CLI exits with a non-zero status.
        json.JSONDecodeError: if the CLI's stdout is not valid JSON.
    """
    command = ["muse", "-C", repo]
    command.extend(args)
    # Always request machine-readable output so stdout can be parsed below.
    command.append("--json")
    completed = subprocess.run(command, check=True, capture_output=True, text=True)
    return json.loads(completed.stdout)
35
36
async def run(repo_path: str, dry_run: bool, batch_size: int) -> None:
    """Backfill ``structured_delta`` on DB commit rows from the local Muse history.

    For every commit listed by ``muse log`` in *repo_path*:

    * commits with no matching DB row are counted as "not-in-db";
    * rows whose ``structured_delta`` is already set are skipped;
    * otherwise the commit is read with ``muse read`` and, when it carries a
      dict-valued ``structured_delta``, that value is written to the row.

    Args:
        repo_path: Path to the local Muse repository passed to ``muse -C``.
        dry_run: When true, no UPDATE or COMMIT is issued; counters and
            progress output are still produced.
        batch_size: Number of updates between commits / progress lines.

    Raises:
        ValueError: if *batch_size* is smaller than 1.
        subprocess.CalledProcessError: if the initial ``muse log`` call fails.
    """
    # Reject a non-positive batch size up front; otherwise the modulo below
    # would raise ZeroDivisionError mid-run, after work has already been done.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    engine = create_async_engine(get_database_url(), echo=False)
    try:
        async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

        print(f"Reading commit log from {repo_path}…")
        log = muse(repo_path, "log")
        commits = log["commits"]
        total = len(commits)
        print(f"Found {total} commits in local history.")

        updated = 0
        skipped = 0
        missing = 0
        errors = 0
        t0 = time.monotonic()

        async with async_session() as session:
            for i, entry in enumerate(commits, 1):
                commit_id: str = entry["commit_id"]

                # Check whether this commit exists in the DB and needs backfilling
                row = (await session.execute(
                    sa.select(MusehubCommit.commit_id, MusehubCommit.structured_delta)
                    .where(MusehubCommit.commit_id == commit_id)
                )).one_or_none()

                if row is None:
                    missing += 1
                    continue

                if row.structured_delta is not None:
                    skipped += 1
                    continue

                # Read structured_delta from local Muse object store. A failure
                # for one commit is recorded and must not abort the whole run.
                try:
                    commit_data = muse(repo_path, "read", commit_id)
                except subprocess.CalledProcessError as exc:
                    detail = (exc.stderr or "").strip()
                    print(f" ✗ {commit_id[:16]}… muse read failed: {detail}", file=sys.stderr)
                    errors += 1
                    continue
                except json.JSONDecodeError as exc:
                    print(f" ✗ {commit_id[:16]}… muse read returned invalid JSON: {exc}", file=sys.stderr)
                    errors += 1
                    continue

                # Only a dict payload can carry a delta; anything else is
                # treated as "no delta in local store".
                delta = commit_data.get("structured_delta") if isinstance(commit_data, dict) else None
                if not isinstance(delta, dict):
                    skipped += 1
                    continue

                if not dry_run:
                    await session.execute(
                        sa.update(MusehubCommit)
                        .where(MusehubCommit.commit_id == commit_id)
                        .values(structured_delta=delta)
                    )

                updated += 1

                # Commit and report progress once per full batch of updates.
                if updated % batch_size == 0:
                    if not dry_run:
                        await session.commit()
                    elapsed = time.monotonic() - t0
                    print(f" [{i}/{total}] {updated} updated, {skipped} skipped, {missing} not-in-db — {elapsed:.1f}s")

            # Flush the final, possibly partial, batch.
            if not dry_run:
                await session.commit()

        elapsed = time.monotonic() - t0
        tag = "[DRY RUN] " if dry_run else ""
        print(f"\n{tag}Done in {elapsed:.1f}s:")
        print(f" updated : {updated}")
        print(f" skipped : {skipped} (already had delta or no delta in local store)")
        print(f" not-in-db: {missing} (local commits not yet pushed to server)")
        print(f" errors : {errors}")
    finally:
        # Release the engine's connections even when the run fails part-way.
        await engine.dispose()
110
111
def _positive_int(value: str) -> int:
    """argparse ``type=`` converter that accepts only integers >= 1."""
    number = int(value)
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value!r}")
    return number


def main() -> None:
    """Parse command-line arguments and run the backfill.

    Options:
        --repo        Path to the local Muse repository (required).
        --dry-run     Report what would change without writing to the DB.
        --batch-size  Number of updates per commit/progress line (default 100,
                      must be >= 1).
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's line breaks so the usage examples stay
        # readable in --help instead of being re-wrapped into one paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--repo", required=True, help="Path to local Muse repo (e.g. ~/ecosystem/musehub)")
    parser.add_argument("--dry-run", action="store_true")
    # Reject 0 / negative sizes at the CLI; the batch size is used as a modulus.
    parser.add_argument("--batch-size", type=_positive_int, default=100)
    args = parser.parse_args()

    asyncio.run(run(args.repo, args.dry_run, args.batch_size))
120
121
# Script entry point: run the backfill only when executed directly, not on import.
if __name__ == "__main__":
    main()