gabriel / musehub public
backfill_move_to_address.py python
176 lines 6.3 KB
Raw
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923 fix(issues): use issue number as pagination cursor, not cre… Sonnet 4.6 patch 8 days ago
1 """Backfill to_address into DELETE rows that are move-source counterparts.
2
3 When backfill_history_from_snapshots previously ran, move-source paths were
4 not recorded at all (the delete was suppressed). Now they are recorded with
5 op_payload.to_address pointing to the new path.
6
7 This script finds existing DELETE rows that are missing to_address and
8 re-derives it by matching the corresponding MOVE row in the same commit
9 (same repo, same commit_id, move.op_payload.from_address == delete.address).
10
11 Usage:
12 docker exec musehub python3 /app/deploy/backfill_move_to_address.py --dry-run
13 docker exec musehub python3 /app/deploy/backfill_move_to_address.py
14 docker exec musehub python3 /app/deploy/backfill_move_to_address.py --repo-id <id>
15 """
16 from __future__ import annotations
17
18 import argparse
19 import asyncio
20 import time
21
22 import sqlalchemy as sa
23 from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
24 from sqlalchemy.orm import sessionmaker
25
26 from musehub.db.database import get_database_url
27 from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry
28 from musehub.db.musehub_repo_models import MusehubCommit
29
30
async def run(repo_id: str | None, dry_run: bool) -> None:
    """Open a database session, run the backfill, and print a one-line summary.

    Args:
        repo_id: Only backfill rows belonging to this repository; ``None``
            processes every repository.
        dry_run: When true, report how many rows would change without
            committing anything.
    """
    engine = create_async_engine(get_database_url(), echo=False)
    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    try:
        async with async_session() as session:
            t0 = time.monotonic()
            count = await backfill_move_to_address(session, repo_id=repo_id, dry_run=dry_run)
            # backfill_move_to_address does not commit; persisting is our job.
            if not dry_run:
                await session.commit()
            elapsed = time.monotonic() - t0
    finally:
        # Close pooled connections while the event loop is still alive;
        # otherwise they are garbage-collected after asyncio.run() has shut
        # the loop down.
        await engine.dispose()

    verb = "Would update" if dry_run else "Updated"
    scope = f" for repo {repo_id}" if repo_id else " across all repos"
    # The count covers both rewritten and newly created DELETE rows.
    print(f"{verb} {count} DELETE rows with to_address{scope} in {elapsed:.1f}s")
45
46
# Maximum number of values bound into a single SQL ``IN (...)`` list.
# SQLAlchemy renders each list element as its own bind parameter, and database
# drivers cap the number of parameters per statement, so an all-repos run could
# otherwise fail on one oversized query.
_IN_CHUNK_SIZE = 1000


def _chunked(values: list[str], size: int = _IN_CHUNK_SIZE) -> list[list[str]]:
    """Split *values* into consecutive slices of at most *size* items."""
    return [values[start : start + size] for start in range(0, len(values), size)]


async def _load_move_map(
    session: AsyncSession, repo_id: str | None
) -> dict[tuple[str, str, str], str]:
    """Map (repo_id, commit_id, from_address) -> new address for every history
    row whose op_payload carries a from_address."""
    she = MusehubSymbolHistoryEntry
    # Covers both snapshot-diff MOVE rows and structured_delta PATCH/move rows.
    ref_q = sa.select(she).where(
        she.op_payload["from_address"].as_string().is_not(None)
    )
    if repo_id is not None:
        ref_q = ref_q.where(she.repo_id == repo_id)

    move_map: dict[tuple[str, str, str], str] = {}
    for row in (await session.execute(ref_q)).scalars():
        from_addr = (row.op_payload or {}).get("from_address")
        if from_addr:  # the SQL filter still admits empty strings
            move_map[(row.repo_id, row.commit_id, from_addr)] = row.address
    return move_map


async def _load_existing_deletes(
    session: AsyncSession, addresses: set[str], repo_id: str | None
) -> dict[tuple[str, str, str], MusehubSymbolHistoryEntry]:
    """Return DELETE rows at any of *addresses*, keyed by (repo_id, commit_id, address)."""
    she = MusehubSymbolHistoryEntry
    existing_map: dict[tuple[str, str, str], MusehubSymbolHistoryEntry] = {}
    for chunk in _chunked(sorted(addresses)):
        q = sa.select(she).where(she.op == "delete", she.address.in_(chunk))
        if repo_id is not None:
            q = q.where(she.repo_id == repo_id)
        for r in (await session.execute(q)).scalars():
            existing_map[(r.repo_id, r.commit_id, r.address)] = r
    return existing_map


async def _load_commits(
    session: AsyncSession, commit_ids: set[str]
) -> dict[str, MusehubCommit]:
    """Return the MusehubCommit rows for *commit_ids*, keyed by commit_id."""
    commit_map: dict[str, MusehubCommit] = {}
    for chunk in _chunked(sorted(commit_ids)):
        q = sa.select(MusehubCommit).where(MusehubCommit.commit_id.in_(chunk))
        for c in (await session.execute(q)).scalars():
            commit_map[c.commit_id] = c
    return commit_map


async def backfill_move_to_address(
    session: AsyncSession,
    repo_id: str | None = None,
    *,
    dry_run: bool = False,
) -> int:
    """Write op_payload.to_address onto the DELETE row paired with each move.

    A "move" is any history row whose op_payload has ``from_address``. Its
    DELETE counterpart is the row with ``op == "delete"`` in the same repo and
    commit whose ``address`` equals that ``from_address``.

    * DELETE rows that already have a truthy ``to_address`` are skipped.
    * DELETE rows lacking ``to_address`` get it merged into their existing
      op_payload.
    * DELETE rows missing entirely (an older backfill suppressed them) are
      created from the commit's timestamp and author. If the commit row cannot
      be found, nothing is created for that move.

    The session is never committed here; the caller decides whether to commit.

    Args:
        session: Open async session used for every query and write.
        repo_id: Restrict the backfill to one repository; ``None`` means all.
        dry_run: Count what would change without issuing any writes.

    Returns:
        Number of DELETE rows updated plus rows created (or that would be,
        when ``dry_run`` is true).
    """
    she = MusehubSymbolHistoryEntry

    move_map = await _load_move_map(session, repo_id)
    if not move_map:
        return 0

    existing_map = await _load_existing_deletes(
        session, {from_addr for _, _, from_addr in move_map}, repo_id
    )
    # Commit metadata is only needed for rows that have to be created.
    commit_map = await _load_commits(
        session, {key[1] for key in move_map if key not in existing_map}
    )

    updated = 0
    for key, to_addr in move_map.items():
        r_id, c_id, from_addr = key
        existing = existing_map.get(key)

        if existing is not None:
            payload = dict(existing.op_payload or {})
            if payload.get("to_address"):
                continue  # already set
            if not dry_run:
                payload["to_address"] = to_addr
                # Rewrite the payload on every DELETE row with this key; the
                # mapped JSON column type handles serialization.
                await session.execute(
                    sa.update(she)
                    .where(
                        she.repo_id == r_id,
                        she.commit_id == c_id,
                        she.address == from_addr,
                        she.op == "delete",
                    )
                    .values(op_payload=payload)
                    .execution_options(synchronize_session=False)
                )
            updated += 1
            continue

        # Row missing entirely — create it from the commit's metadata.
        commit = commit_map.get(c_id)
        if commit is None:
            continue
        if not dry_run:
            session.add(
                MusehubSymbolHistoryEntry(
                    repo_id=r_id,
                    address=from_addr,
                    commit_id=c_id,
                    op="delete",
                    op_payload={
                        "inferred_from": "snapshot_diff",
                        "to_address": to_addr,
                    },
                    content_id=None,
                    committed_at=commit.timestamp,
                    author=commit.author or "",
                )
            )
        updated += 1

    return updated
159
160
def main() -> None:
    """Command-line entry point: parse flags, then drive ``run`` under asyncio."""
    cli = argparse.ArgumentParser(description=__doc__)
    # Each entry is (flag names, keyword options for add_argument).
    flag_specs = (
        (("--dry-run",), {"action": "store_true"}),
        (("--repo-id",), {"default": None}),
        (("-q", "--quiet"), {"action": "store_true"}),
    )
    for names, options in flag_specs:
        cli.add_argument(*names, **options)
    opts = cli.parse_args()

    if opts.quiet:
        # -q silences all logging, including CRITICAL records; the final
        # summary is emitted with print() and is not affected.
        import logging

        logging.disable(logging.CRITICAL)

    asyncio.run(run(opts.repo_id, opts.dry_run))


if __name__ == "__main__":
    main()
File History 1 commit
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923 fix(issues): use issue number as pagination cursor, not cre… Sonnet 4.6 patch 8 days ago