backfill_move_to_address.py
python
sha256:3c58668648c7323bb9f5c6881cfe6a3f14fc93fcb73b537d253732952a5bf8bf
chore: bump version to 0.2.0rc12
Sonnet 4.6
patch
8 days ago
| 1 | """Backfill to_address into DELETE rows that are move-source counterparts. |
| 2 | |
| 3 | When backfill_history_from_snapshots previously ran, move-source paths were |
| 4 | not recorded at all (the delete was suppressed). Now they are recorded with |
| 5 | op_payload.to_address pointing to the new path. |
| 6 | |
| 7 | This script finds DELETE rows that are missing to_address and re-derives |
| 8 | it from the MOVE row in the same commit (same repo, same commit_id, |
| 9 | move.op_payload.from_address == delete.address); missing DELETE rows are created. |
| 10 | |
| 11 | Usage: |
| 12 | docker exec musehub python3 /app/deploy/backfill_move_to_address.py --dry-run |
| 13 | docker exec musehub python3 /app/deploy/backfill_move_to_address.py |
| 14 | docker exec musehub python3 /app/deploy/backfill_move_to_address.py --repo-id <id> |
| 15 | """ |
| 16 | from __future__ import annotations |
| 17 | |
| 18 | import argparse |
| 19 | import asyncio |
| 20 | import time |
| 21 | |
| 22 | import sqlalchemy as sa |
| 23 | from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine |
| 24 | from sqlalchemy.orm import sessionmaker |
| 25 | |
| 26 | from musehub.db.database import get_database_url |
| 27 | from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry |
| 28 | from musehub.db.musehub_repo_models import MusehubCommit |
| 29 | |
| 30 | |
async def run(repo_id: str | None, dry_run: bool) -> None:
    """Run the to_address backfill once and print a one-line summary.

    Args:
        repo_id: Restrict the backfill to this repository; ``None`` means all repos.
        dry_run: When true, count the rows that would change but do not commit.
    """
    engine = create_async_engine(get_database_url(), echo=False)
    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    try:
        async with async_session() as session:
            t0 = time.monotonic()
            count = await backfill_move_to_address(session, repo_id=repo_id, dry_run=dry_run)
            # In dry-run mode nothing is committed; closing the session discards
            # any pending state.
            if not dry_run:
                await session.commit()
            elapsed = time.monotonic() - t0
    finally:
        # Release pooled connections while the event loop is still running;
        # an undisposed AsyncEngine can raise errors when asyncio.run() tears
        # the loop down.
        await engine.dispose()

    verb = "Would update" if dry_run else "Updated"
    scope = f" for repo {repo_id}" if repo_id else " across all repos"
    print(f"{verb} {count} DELETE rows with to_address{scope} in {elapsed:.1f}s")
| 45 | |
| 46 | |
# Upper bound on the number of values placed in a single SQL ``IN (...)`` list.
# Expanding IN parameters are sent as individual bind parameters, and drivers
# such as asyncpg reject statements with more than 32767 of them, so large
# "all repos" runs are split into several queries.
_IN_CHUNK_SIZE = 5_000


def _chunks(items: list[str], size: int) -> list[list[str]]:
    """Split *items* into consecutive slices of at most *size* elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


async def backfill_move_to_address(
    session: AsyncSession,
    repo_id: str | None = None,
    *,
    dry_run: bool = False,
) -> int:
    """Backfill ``to_address`` into DELETE rows that are move sources.

    For every history row whose ``op_payload`` carries ``from_address``, the
    matching DELETE row (same repo, same commit, ``delete.address ==
    from_address``) gets ``op_payload.to_address`` set to the moving row's
    address. DELETE rows that already have ``to_address`` are left untouched.
    When no DELETE row exists for a move source, one is created from the
    commit's timestamp and author; moves whose commit row cannot be found are
    skipped.

    Args:
        session: Open async session. This function never commits; the caller
            decides whether to commit or discard the changes.
        repo_id: Restrict to one repository, or ``None`` for all repositories.
        dry_run: Count the rows that would change without writing anything.

    Returns:
        Number of DELETE rows updated or created (or that would be, in
        dry-run mode).
    """
    import json

    she = MusehubSymbolHistoryEntry

    # Load all rows that carry from_address in op_payload — covers both
    # snapshot-diff MOVE rows and structured_delta PATCH/move rows.
    ref_q = sa.select(she).where(
        she.op_payload["from_address"].as_string() != None  # noqa: E711
    )
    if repo_id is not None:
        ref_q = ref_q.where(she.repo_id == repo_id)
    ref_rows = (await session.execute(ref_q)).scalars().all()

    # (repo_id, commit_id, from_address) -> new address of the moved symbol.
    move_map: dict[tuple[str, str, str], str] = {}
    for row in ref_rows:
        from_addr = (row.op_payload or {}).get("from_address")
        if from_addr:
            move_map[(row.repo_id, row.commit_id, from_addr)] = row.address

    if not move_map:
        return 0

    # Existing DELETE rows for the move-source addresses, keyed the same way
    # as move_map so each move can be matched to its counterpart.
    from_addrs = sorted({key[2] for key in move_map})
    existing_map: dict[tuple[str, str, str], MusehubSymbolHistoryEntry] = {}
    for addr_chunk in _chunks(from_addrs, _IN_CHUNK_SIZE):
        existing_q = sa.select(she).where(
            she.op == "delete",
            she.address.in_(addr_chunk),
        )
        if repo_id is not None:
            existing_q = existing_q.where(she.repo_id == repo_id)
        for r in (await session.execute(existing_q)).scalars():
            existing_map[(r.repo_id, r.commit_id, r.address)] = r

    # Commit metadata is only needed for DELETE rows that must be created.
    missing_commit_ids = sorted({key[1] for key in move_map if key not in existing_map})
    commit_map: dict[str, MusehubCommit] = {}
    for id_chunk in _chunks(missing_commit_ids, _IN_CHUNK_SIZE):
        commit_q = sa.select(MusehubCommit).where(MusehubCommit.commit_id.in_(id_chunk))
        for c in (await session.execute(commit_q)).scalars():
            commit_map[c.commit_id] = c

    updated = 0
    for key, to_addr in move_map.items():
        r_id, c_id, from_addr = key
        existing = existing_map.get(key)

        if existing is not None:
            if (existing.op_payload or {}).get("to_address"):
                continue  # already backfilled
            if not dry_run:
                full_payload = dict(existing.op_payload or {})
                full_payload["to_address"] = to_addr
                # NOTE(review): written with raw SQL and an explicit json CAST,
                # presumably because in-place mutation of the loaded dict would
                # not be tracked by the ORM — confirm against the column type.
                await session.execute(
                    sa.text(
                        "UPDATE musehub_symbol_history_entries"
                        " SET op_payload = CAST(:payload AS json)"
                        " WHERE repo_id = :repo_id"
                        " AND commit_id = :commit_id"
                        " AND address = :address"
                        " AND op = 'delete'"
                    ),
                    {
                        "payload": json.dumps(full_payload),
                        "repo_id": r_id,
                        "commit_id": c_id,
                        "address": from_addr,
                    },
                )
            updated += 1
            continue

        # Row missing entirely — create it from the commit's metadata.
        commit = commit_map.get(c_id)
        if commit is None:
            continue  # no commit row: cannot fill committed_at / author
        payload = {
            "inferred_from": "snapshot_diff",
            "to_address": to_addr,
        }
        if not dry_run:
            session.add(MusehubSymbolHistoryEntry(
                repo_id=r_id,
                address=from_addr,
                commit_id=c_id,
                op="delete",
                op_payload=payload,
                content_id=None,
                committed_at=commit.timestamp,
                author=commit.author or "",
            ))
        updated += 1

    return updated
| 159 | |
| 160 | |
def main() -> None:
    """Parse command-line options and run the backfill."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Preserve the module docstring's line breaks so the Usage examples
        # are not re-wrapped into a single paragraph in --help output.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="report how many rows would change without writing anything",
    )
    parser.add_argument(
        "--repo-id",
        default=None,
        help="restrict the backfill to a single repository",
    )
    parser.add_argument(
        "-q",
        "--quiet",
        action="store_true",
        help="disable logging output (the final summary line is still printed)",
    )
    args = parser.parse_args()

    if args.quiet:
        import logging
        logging.disable(logging.CRITICAL)

    asyncio.run(run(args.repo_id, args.dry_run))


if __name__ == "__main__":
    main()
File History
1 commit
sha256:35d76015db2541686c33edd44343ea2d9f751325b4a5556cc9c4c9c0f84edbbe
chore: bump version to 0.2.0rc12
Sonnet 4.6
patch
6 days ago