"""Backfill to_address into DELETE rows that are move-source counterparts. When backfill_history_from_snapshots previously ran, move-source paths were not recorded at all (the delete was suppressed). Now they are recorded with op_payload.to_address pointing to the new path. This script finds existing DELETE rows that are missing to_address and re-derives it by matching the corresponding MOVE row in the same commit (same repo, same commit_id, move.op_payload.from_address == delete.address). Usage: docker exec musehub python3 /app/deploy/backfill_move_to_address.py --dry-run docker exec musehub python3 /app/deploy/backfill_move_to_address.py docker exec musehub python3 /app/deploy/backfill_move_to_address.py --repo-id """ from __future__ import annotations import argparse import asyncio import time import sqlalchemy as sa from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker from musehub.db.database import get_database_url from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry from musehub.db.musehub_repo_models import MusehubCommit async def run(repo_id: str | None, dry_run: bool) -> None: engine = create_async_engine(get_database_url(), echo=False) async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) async with async_session() as session: t0 = time.monotonic() count = await backfill_move_to_address(session, repo_id=repo_id, dry_run=dry_run) if not dry_run: await session.commit() elapsed = time.monotonic() - t0 verb = "Would update" if dry_run else "Updated" scope = f" for repo {repo_id}" if repo_id else " across all repos" print(f"{verb} {count} DELETE rows with to_address{scope} in {elapsed:.1f}s") async def backfill_move_to_address( session: AsyncSession, repo_id: str | None = None, *, dry_run: bool = False, ) -> int: """For each MOVE row, find its corresponding DELETE row (same commit, same repo, delete.address == move.op_payload.from_address) and write to_address into the DELETE's op_payload. Skips DELETE rows that already have to_address set. Also creates DELETE rows that are completely missing (old backfill run suppressed them entirely). """ she = MusehubSymbolHistoryEntry # Load all rows that carry from_address in op_payload — covers both # snapshot-diff MOVE rows and structured_delta PATCH/move rows. ref_q = sa.select(she).where( she.op_payload["from_address"].as_string() != None # noqa: E711 ) if repo_id is not None: ref_q = ref_q.where(she.repo_id == repo_id) ref_rows = (await session.execute(ref_q)).scalars().all() if not ref_rows: return 0 # Build (repo_id, commit_id, from_address) → new_address map move_map: dict[tuple[str, str, str], str] = {} for row in ref_rows: from_addr = (row.op_payload or {}).get("from_address") if from_addr: move_map[(row.repo_id, row.commit_id, from_addr)] = row.address if not move_map: return 0 # Load existing DELETE rows for those (repo_id, commit_id, address) keys # to determine which need updating vs. creating. from_addrs = list({k[2] for k in move_map}) existing_q = sa.select(she).where( she.op == "delete", she.address.in_(from_addrs), ) if repo_id is not None: existing_q = existing_q.where(she.repo_id == repo_id) existing_rows = (await session.execute(existing_q)).scalars().all() existing_map: dict[tuple[str, str, str], she] = { # type: ignore[valid-type] (r.repo_id, r.commit_id, r.address): r for r in existing_rows } # Also load commit metadata for creating missing rows commit_ids = list({k[1] for k in move_map}) commit_q = sa.select(MusehubCommit).where( MusehubCommit.commit_id.in_(commit_ids) ) commit_map = {c.commit_id: c for c in (await session.execute(commit_q)).scalars().all()} updated = 0 for (r_id, c_id, from_addr), to_addr in move_map.items(): key = (r_id, c_id, from_addr) existing = existing_map.get(key) if existing is not None: if (existing.op_payload or {}).get("to_address"): continue # already set if not dry_run: import json as _json full_payload = dict(existing.op_payload or {}) full_payload["to_address"] = to_addr await session.execute( sa.text( "UPDATE musehub_symbol_history_entries" " SET op_payload = CAST(:payload AS json)" " WHERE repo_id = :repo_id" " AND commit_id = :commit_id" " AND address = :address" " AND op = 'delete'" ), { "payload": _json.dumps(full_payload), "repo_id": r_id, "commit_id": c_id, "address": from_addr, }, ) updated += 1 else: # Row missing entirely — create it commit = commit_map.get(c_id) if commit is None: continue payload = { "inferred_from": "snapshot_diff", "to_address": to_addr, } if not dry_run: session.add(MusehubSymbolHistoryEntry( repo_id=r_id, address=from_addr, commit_id=c_id, op="delete", op_payload=payload, content_id=None, committed_at=commit.timestamp, author=commit.author or "", )) updated += 1 return updated def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--dry-run", action="store_true") parser.add_argument("--repo-id", default=None) parser.add_argument("-q", "--quiet", action="store_true") args = parser.parse_args() if args.quiet: import logging logging.disable(logging.CRITICAL) asyncio.run(run(args.repo_id, args.dry_run)) if __name__ == "__main__": main()