gabriel / musehub public
musehub_symbol_indexer.py python
1,683 lines 60.7 KB
Raw
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
1 """Symbol index builder — materializes code intelligence for a repo.
2
3 Reads ``structured_delta`` from each commit row and builds three normalized indexes:
4
5 ``musehub_symbol_history_entries``
6 One row per (repo_id, address, commit_id). Powers: symbol search,
7 history, provenance, blame.
8
9 ``musehub_symbol_intel``
10 One row per (repo_id, address) with pre-computed metrics: churn,
11 blast, gravity, weekly activity. Powers: hotspots, gravity ranking,
12 per-symbol intel panels.
13
14 ``musehub_hash_occurrence_entries``
15 One row per (content_id, repo_id, address). Powers: clone detection.
16
17 Two aggregate blobs remain in ``musehub_intel_results`` (small, accessed as a unit):
18
19 code.intel_snapshot — full IntelSnapshot (health score, alerts, panels)
20 code.intel_summary — condensed summary for repo home page
21
22 ``build_symbol_index`` is the public entry point. It returns a list of
23 ``(intel_type, data_dict)`` tuples for the two aggregate blobs only.
24 Normalized rows are written directly inside the function via upsert.
25
26 Boundary rules:
27 - Must NOT import state stores, SSE queues, or LLM clients.
28 - May import ORM models from musehub.db domain-specific modules.
29 - May import Pydantic models from musehub.models.musehub.
30 """
31
32 import logging
33 from collections import defaultdict
34 from datetime import datetime, timedelta, timezone
35 from typing import TypedDict
36
37 import sqlalchemy as sa
38 from sqlalchemy import select
39 from sqlalchemy.dialects.postgresql import insert as pg_insert
40 from sqlalchemy.ext.asyncio import AsyncSession
41
42 from muse.core.types import short_id
43 from musehub.db.musehub_intel_models import (
44 MusehubHashOccurrenceEntry,
45 MusehubIntelResult,
46 MusehubSymbolCoupling,
47 MusehubSymbolHistoryEntry,
48 MusehubSymbolIntel,
49 MusehubSymbolVitals,
50 )
51 from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubSnapshot
52 from musehub.types.json_types import JSONObject, JSONValue, StrDict
53
# In-memory shapes shared by the indexing helpers in this module.
# address → list of per-commit op records (commit_id, committed_at, author,
# op, op_payload, content_id).
type SymbolHistory = dict[str, list[JSONObject]]
# content_id → addresses at which that content hash was observed.
type HashOccurrence = dict[str, list[str]]
# Generic string-keyed counter (used for co-change tallies).
type _CountMap = dict[str, int]
type _KindMap = dict[str, str] # address → symbol_kind
type ManifestByCommit = dict[str, StrDict] # commit_id → {path: object_id}

# Symbol kinds that _extract_kind will return; any other candidate word found
# in an op summary is treated as "kind unknown" (None).
_KNOWN_KINDS = frozenset({
    "function", "async_function", "method", "async_method", "class",
    "variable", "section", "import",
})
64
class SymbolIntel(TypedDict):
    """Per-symbol metrics snapshot stored in musehub_symbol_intel.

    Produced by ``_compute_symbol_intel`` (one value per address) and
    persisted by ``_upsert_symbol_intel``.
    """

    # Total number of history entries (ops) recorded for the symbol.
    churn: int
    # Ops whose timestamp falls within the last 30 / 90 days.
    churn_30d: int
    churn_90d: int
    # Number of distinct other addresses changed in the same commits
    # (blast == blast_direct + blast_cross).
    blast: int
    # Distinct co-changed addresses in the same file.
    blast_direct: int
    # Distinct co-changed addresses in other files.
    blast_cross: int
    # Up to five other files, ranked by co-change count (highest first).
    blast_top: list[str]
    # ISO timestamp string of the newest op; "" when no op had a parseable
    # timestamp. The three fields after it come from that same op.
    last_changed: str
    last_author: str
    last_op: str
    last_commit_id: str
    # Number of distinct non-empty author values across all ops.
    author_count: int
    # This symbol's share of all ops in the history, rounded to 6 decimals.
    gravity: float
    # Twelve weekly buckets of op counts; index 0 is the most recent 7 days.
    weekly: list[int]

# address → metrics for that address.
type SymbolIntelMap = dict[str, SymbolIntel]
84
85 logger = logging.getLogger(__name__)
86
87 # ---------------------------------------------------------------------------
88 # Internal helpers
89 # ---------------------------------------------------------------------------
90
91 def _utc_now() -> datetime:
92 return datetime.now(tz=timezone.utc)
93
94 def _extract_ops(structured_delta: JSONObject | None) -> list[JSONObject]:
95 """Pull a flat list of DomainOps from a commit's structured_delta blob."""
96 if not isinstance(structured_delta, dict):
97 return []
98 top_ops = structured_delta.get("ops")
99 if not isinstance(top_ops, list):
100 return []
101
102 flat: list[JSONObject] = []
103 for op in top_ops:
104 if not isinstance(op, dict) or not op.get("address"):
105 continue
106 flat.append(op)
107 child_ops = op.get("child_ops")
108 if isinstance(child_ops, list):
109 for child in child_ops:
110 if isinstance(child, dict) and child.get("address"):
111 flat.append(child)
112 return flat
113
def _extract_kind(op: JSONObject) -> str | None:
    """Derive the symbol kind from an op's summary text, if recognisable.

    ``new_summary`` is preferred over ``content_summary``. The kind is the
    first word of the summary, e.g. "async_function name (changed) L0-5",
    unless that word is "added" or "removed", in which case the second word
    is used, e.g. "added async_function name L0-5". Returns None when there
    is no summary or the candidate word is not in ``_KNOWN_KINDS``.
    """
    text: str = str(op.get("new_summary") or op.get("content_summary") or "")
    tokens = text.split()
    if not tokens:
        return None
    head = tokens[0]
    if len(tokens) > 1 and head in ("added", "removed"):
        head = tokens[1]
    if head in _KNOWN_KINDS:
        return head
    return None
130
131 _PAYLOAD_STRIP = frozenset({"op", "address", "child_ops"})
132
133 def _op_payload(op: JSONObject) -> JSONObject:
134 """Strip indexing keys from a DomainOp dict; the remainder is the payload."""
135 return {k: v for k, v in op.items() if k not in _PAYLOAD_STRIP}
136
def _compute_symbol_intel(
    symbol_history: SymbolHistory,
) -> SymbolIntelMap:
    """Derive per-address metrics from the merged symbol history.

    For every address this computes churn (total / 30d / 90d), the newest op
    and its author/op/commit, the distinct author count, co-change "blast"
    figures, a 12-week activity histogram and the address's share of all ops
    ("gravity"). Timestamps that are missing or unparseable are ignored for
    the time-based figures; naive timestamps are read as UTC.
    """
    now = _utc_now()
    recent_30 = now - timedelta(days=30)
    recent_90 = now - timedelta(days=90)
    secs_per_week = 7 * 24 * 3600

    def _file_of(address: str) -> str:
        # Part before the first "::"; the whole address when there is none.
        return address.split("::")[0] if "::" in address else address

    def _parse(raw: str) -> datetime | None:
        # ISO string → aware datetime, or None when empty/invalid.
        if not raw:
            return None
        try:
            parsed = datetime.fromisoformat(raw)
        except ValueError:
            return None
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    # commit_id → symbol-level addresses touched by that commit. Bare file
    # addresses and "import" pseudo-symbols do not count as co-change partners.
    changed_together: defaultdict[str, list[str]] = defaultdict(list)
    for address, entries in symbol_history.items():
        if "::" not in address:
            continue
        symbol_part = address.split("::")[1]
        if symbol_part in ("", "import"):
            continue
        for entry in entries:
            commit_id = entry.get("commit_id")
            if commit_id:
                changed_together[commit_id].append(address)

    all_ops = sum(map(len, symbol_history.values()))
    intel: SymbolIntelMap = {}

    for address, entries in symbol_history.items():
        in_30 = 0
        in_90 = 0
        newest: datetime | None = None
        newest_raw = ""
        newest_author = ""
        newest_op = ""
        newest_commit = ""
        who: set[str] = set()
        per_week: list[int] = [0] * 12

        # Single pass over the ops: churn windows, newest op, authors, weekly.
        for entry in entries:
            author = str(entry.get("author") or "")
            if author:
                who.add(author)

            raw = str(entry.get("committed_at") or "")
            when = _parse(raw)
            if when is None:
                continue

            if when >= recent_30:
                in_30 += 1
            if when >= recent_90:
                in_90 += 1
            if newest is None or when > newest:
                newest = when
                newest_raw = raw
                newest_author = author
                newest_op = str(entry.get("op") or "")
                newest_commit = str(entry.get("commit_id") or "")

            age = (now - when).total_seconds()
            if age >= 0:  # future-dated ops are left out of the histogram
                slot = int(age // secs_per_week)
                if slot < 12:
                    per_week[slot] += 1

        # Co-change partners, split by whether they live in the same file.
        own_file = _file_of(address)
        same_file: _CountMap = defaultdict(int)
        other_file: _CountMap = defaultdict(int)
        for entry in entries:
            commit_id = entry.get("commit_id")
            if not commit_id:
                continue
            for neighbour in changed_together.get(str(commit_id), []):
                if neighbour == address:
                    continue
                bucket = same_file if _file_of(neighbour) == own_file else other_file
                bucket[neighbour] += 1

        direct = len(same_file)
        cross = len(other_file)

        # Roll cross-file partners up to file level and keep the five busiest.
        per_file: _CountMap = defaultdict(int)
        for neighbour, hits in other_file.items():
            per_file[_file_of(neighbour)] += hits
        top_files = sorted(per_file, key=per_file.__getitem__, reverse=True)[:5]

        op_total = len(entries)
        share = op_total / all_ops if all_ops > 0 else 0.0

        intel[address] = {
            "churn": op_total,
            "churn_30d": in_30,
            "churn_90d": in_90,
            "blast": direct + cross,
            "blast_direct": direct,
            "blast_cross": cross,
            "blast_top": top_files,
            "last_changed": newest_raw,
            "last_author": newest_author,
            "last_op": newest_op,
            "last_commit_id": newest_commit,
            "author_count": len(who),
            "gravity": round(share, 6),
            "weekly": per_week,
        }

    return intel
257
258 # ---------------------------------------------------------------------------
259 # Commit walking
260 # ---------------------------------------------------------------------------
261
262 def _sort_key(c: MusehubCommit) -> datetime:
263 ts = c.timestamp or datetime.min
264 if ts.tzinfo is None:
265 ts = ts.replace(tzinfo=timezone.utc)
266 return ts
267
async def _walk_new_commits(
    session: AsyncSession,
    repo_id: str,
    new_head: str,
    stop_at: str | None,
) -> list[MusehubCommit]:
    """Collect the commits visited when walking the DAG from ``new_head``.

    All commits referenced by the repo are loaded with a single query and the
    walk then runs purely in memory (no per-commit queries). When ``stop_at``
    is given it is handed to the walker as ``exclude={stop_at}`` (incremental
    build); when it is None the walk is unrestricted (full build). The result
    is ordered by commit timestamp, oldest first.
    """
    from musehub.graph.walk import walk_dag_async

    query = (
        select(MusehubCommit)
        .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(MusehubCommitRef.repo_id == repo_id)
    )
    fetched = await session.execute(query)
    commits_by_id = {commit.commit_id: commit for commit in fetched.scalars()}
    visited: list[MusehubCommit] = []

    async def _parents_of(commit_id: str) -> list[str]:
        # Adjacency callback for the walker; it also records every commit it is
        # asked about, which is how the visited set is gathered.
        commit = commits_by_id.get(commit_id)
        if commit is None:
            return []
        visited.append(commit)
        return commit.parent_ids or []

    if stop_at is None:
        walker = walk_dag_async(new_head, _parents_of, order="dfs")
    else:
        walker = walk_dag_async(new_head, _parents_of, order="dfs", exclude={stop_at})

    # The yielded items are not needed; the callback does the collecting.
    async for _ in walker:
        pass

    return sorted(visited, key=_sort_key)
320
async def _get_prior_ref(session: AsyncSession, repo_id: str) -> str | None:
    """Return the ref stored on the repo's ``code.intel_summary`` row, if any.

    None means no summary has been stored for the repo yet.
    """
    lookup = select(MusehubIntelResult).where(
        MusehubIntelResult.repo_id == repo_id,
        MusehubIntelResult.intel_type == "code.intel_summary",
    )
    summary_row = (await session.execute(lookup)).scalar_one_or_none()
    if summary_row is None:
        return None
    return summary_row.ref
330
331 # ---------------------------------------------------------------------------
332 # Normalized write helpers
333 # ---------------------------------------------------------------------------
334
async def _upsert_history_entries(
    session: AsyncSession,
    repo_id: str,
    commits: list[MusehubCommit],
    symbol_history: SymbolHistory,
) -> None:
    """Insert one history row per (commit, op); rows that conflict are skipped.

    Rows are derived from each commit's ``structured_delta``. ``symbol_history``
    is accepted for signature compatibility and is not read here.
    """
    pending = []
    for commit in commits:
        commit_ops = _extract_ops(commit.structured_delta)

        # A commit without a timestamp is stamped with "now"; naive → UTC.
        stamp = commit.timestamp or _utc_now()
        if stamp.tzinfo is None:
            stamp = stamp.replace(tzinfo=timezone.utc)

        message = (commit.message or "")[:2000] or None
        branch = commit.branch or None

        for op in commit_ops:
            pending.append({
                "repo_id": repo_id,
                "address": op["address"],
                "commit_id": commit.commit_id,
                "committed_at": stamp,
                "author": commit.author,
                "op": op.get("op", ""),
                "op_payload": _op_payload(op),
                "content_id": op.get("new_content_id") or op.get("content_id") or None,
                "message": message,
                "commit_branch": branch,
            })

    if not pending:
        return

    # asyncpg caps query parameters at 32767; 10 columns per row → max 3276 rows/batch.
    chunk = 3000
    for start in range(0, len(pending), chunk):
        insert_stmt = pg_insert(MusehubSymbolHistoryEntry).values(
            pending[start : start + chunk]
        )
        await session.execute(insert_stmt.on_conflict_do_nothing())
374
async def _upsert_symbol_intel(
    session: AsyncSession,
    repo_id: str,
    intel: SymbolIntelMap,
    kind_map: _KindMap | None = None,
) -> None:
    """Upsert one intel row per address, replacing every metric on conflict.

    ``symbol_kind`` is taken from ``kind_map`` (NULL when the address is not
    in it). On conflict the column is set to
    ``COALESCE(new value, stored value)``: a NULL new value leaves the stored
    kind untouched, a non-NULL new value replaces it.
    """
    if not intel:
        return

    kinds = kind_map or {}

    def _as_aware(raw: str) -> datetime | None:
        # ISO string → aware datetime (naive read as UTC); None if empty/invalid.
        if not raw:
            return None
        try:
            parsed = datetime.fromisoformat(raw)
        except ValueError:
            return None
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    payload = []
    for address, metrics in intel.items():
        record: JSONObject = {
            "repo_id": repo_id,
            "address": address,
            "churn": metrics["churn"],
            "churn_30d": metrics["churn_30d"],
            "churn_90d": metrics["churn_90d"],
            "blast": metrics["blast"],
            "blast_direct": metrics["blast_direct"],
            "blast_cross": metrics["blast_cross"],
            "blast_top": metrics["blast_top"],
            "last_changed": _as_aware(metrics["last_changed"]),
            "last_author": metrics["last_author"] or None,
            "op": metrics.get("last_op") or None,
            "last_commit_id": metrics.get("last_commit_id") or None,
            "author_count": metrics["author_count"],
            "gravity": metrics["gravity"],
            "weekly": metrics["weekly"],
            "symbol_kind": kinds.get(address),
        }
        payload.append(record)

    # Columns overwritten verbatim with the incoming value on conflict.
    replaced_columns = (
        "churn", "churn_30d", "churn_90d",
        "blast", "blast_direct", "blast_cross", "blast_top",
        "last_changed", "last_author", "author_count",
        "gravity", "weekly", "op", "last_commit_id",
    )
    stored_kind = MusehubSymbolIntel.__table__.c.symbol_kind

    # asyncpg caps query parameters at 32767; 17 columns per row → max 1927 rows/batch.
    chunk = 1800
    for start in range(0, len(payload), chunk):
        stmt = pg_insert(MusehubSymbolIntel).values(payload[start : start + chunk])
        assignments = {name: stmt.excluded[name] for name in replaced_columns}
        assignments["symbol_kind"] = sa.func.coalesce(
            stmt.excluded.symbol_kind,
            stored_kind,
        )
        await session.execute(
            stmt.on_conflict_do_update(
                index_elements=["repo_id", "address"],
                set_=assignments,
            )
        )
449
async def _upsert_hash_occurrences(
    session: AsyncSession,
    repo_id: str,
    hash_occurrence: HashOccurrence,
) -> None:
    """Insert (content_id, repo_id, address) rows; conflicting rows are skipped."""
    if not hash_occurrence:
        return

    pending = []
    for content_id, addresses in hash_occurrence.items():
        for address in addresses:
            pending.append(
                {"content_id": content_id, "repo_id": repo_id, "address": address}
            )
    if not pending:
        return

    # asyncpg caps query parameters at 32767; 3 columns per row → max 10922 rows/batch.
    chunk = 10000
    for start in range(0, len(pending), chunk):
        insert_stmt = pg_insert(MusehubHashOccurrenceEntry).values(
            pending[start : start + chunk]
        )
        await session.execute(insert_stmt.on_conflict_do_nothing())
473
async def _upsert_symbol_vitals(
    session: AsyncSession,
    repo_id: str,
    full_history: "SymbolHistory",
) -> None:
    """Upsert one MusehubSymbolVitals row per address from the full history.

    Each row carries the earliest ``committed_at`` (``first_introduced``), the
    number of entries, the number of distinct non-empty content ids, and
    per-category op counts. Ops not listed in the category table count as
    "modify". Existing rows are overwritten on (repo_id, address) conflict.
    """
    category_of = {
        "insert": "add", "add": "add",
        "replace": "modify", "patch": "modify", "mutate": "modify", "modify": "modify",
        "delete": "delete",
        "move": "move", "directory_rename": "move", "rename": "move",
    }

    def _timestamp_text(entry: JSONObject) -> str:
        # Ordering key: the ISO timestamp string, "" when absent.
        return entry.get("committed_at") or ""

    payload = []
    for address, entries in full_history.items():
        if not entries:
            continue

        # min() returns the first of equal minima, like a stable sort's [0].
        earliest_text = _timestamp_text(min(entries, key=_timestamp_text))
        introduced: datetime | None = None
        if earliest_text:
            try:
                introduced = datetime.fromisoformat(earliest_text)
                if introduced.tzinfo is None:
                    introduced = introduced.replace(tzinfo=timezone.utc)
            except ValueError:
                pass

        versions: set[str] = set()
        tally = {"add": 0, "modify": 0, "delete": 0, "move": 0}
        for entry in entries:
            if entry.get("content_id"):
                versions.add(entry["content_id"])
            tally[category_of.get(entry.get("op", ""), "modify")] += 1

        payload.append({
            "repo_id": repo_id,
            "address": address,
            "first_introduced": introduced,
            "change_count": len(entries),
            "version_count": len(versions),
            "op_add": tally["add"],
            "op_modify": tally["modify"],
            "op_delete": tally["delete"],
            "op_move": tally["move"],
        })

    if not payload:
        return

    refreshed = (
        "first_introduced", "change_count", "version_count",
        "op_add", "op_modify", "op_delete", "op_move",
    )
    chunk = 2000
    for start in range(0, len(payload), chunk):
        stmt = pg_insert(MusehubSymbolVitals).values(payload[start : start + chunk])
        await session.execute(
            stmt.on_conflict_do_update(
                index_elements=["repo_id", "address"],
                set_={name: stmt.excluded[name] for name in refreshed},
            )
        )
544
545
async def _upsert_symbol_coupling(
    session: AsyncSession,
    repo_id: str,
    full_history: "SymbolHistory",
) -> None:
    """Upsert MusehubSymbolCoupling from the full in-memory history.

    Builds a commit→{addresses} map, counts how many commits every pair of
    addresses shares, and writes the result symmetrically: both (A→B) and
    (B→A) rows, overwriting ``shared_commits`` on conflict.

    A bare file address (no ``::``) is ignored when symbol-level addresses
    exist for the same file. Those addresses are dropped *before* pairs are
    enumerated, so they no longer contribute to the quadratic pair loop only
    to be discarded afterwards; the rows written are the same.
    """
    from itertools import combinations

    # Files for which at least one symbol-level address exists.
    symbol_files = {a.split("::")[0] for a in full_history if "::" in a}

    # commit_id → set of (non-suppressed) addresses that changed in it
    commit_addresses: dict[str, set[str]] = {}
    for address, entries in full_history.items():
        if "::" not in address and address in symbol_files:
            continue  # file-level address superseded by symbol-level ones
        for e in entries:
            cid = e.get("commit_id")
            if cid:
                commit_addresses.setdefault(cid, set()).add(address)

    # Count shared commits for every unordered pair; sorting makes (a, b)
    # canonical (a < b) so the same pair always hits the same key.
    pair_counts: defaultdict[tuple[str, str], int] = defaultdict(int)
    for addresses in commit_addresses.values():
        if len(addresses) < 2:
            continue
        for pair in combinations(sorted(addresses), 2):
            pair_counts[pair] += 1

    if not pair_counts:
        return

    rows = []
    for (a, b), count in pair_counts.items():
        rows.append({"repo_id": repo_id, "address": a, "co_address": b, "shared_commits": count})
        rows.append({"repo_id": repo_id, "address": b, "co_address": a, "shared_commits": count})

    # 4 columns per row → 16000 bind parameters per batch, below asyncpg's 32767 cap.
    _BATCH = 4000
    for i in range(0, len(rows), _BATCH):
        stmt = pg_insert(MusehubSymbolCoupling).values(rows[i : i + _BATCH])
        await session.execute(
            stmt.on_conflict_do_update(
                index_elements=["repo_id", "address", "co_address"],
                set_={"shared_commits": stmt.excluded.shared_commits},
            )
        )
599
async def _update_coupling_counts(session: AsyncSession, repo_id: str) -> None:
    """Refresh ``coupling_count`` on musehub_symbol_vitals for one repo.

    Two statements are issued: the first copies the per-address row count of
    musehub_symbol_coupling onto the matching vitals rows, the second resets
    the count to 0 for vitals rows whose address has no coupling rows. The
    list page can then read the score directly instead of counting per row.
    """
    params = {"repo_id": repo_id}

    set_counts = sa.text("""
        UPDATE musehub_symbol_vitals v
        SET coupling_count = sub.cnt
        FROM (
            SELECT address, COUNT(*) AS cnt
            FROM musehub_symbol_coupling
            WHERE repo_id = :repo_id
            GROUP BY address
        ) sub
        WHERE v.repo_id = :repo_id
          AND v.address = sub.address
    """)

    # Zero out any symbols that have no coupling rows (idempotent after removal)
    clear_orphans = sa.text("""
        UPDATE musehub_symbol_vitals
        SET coupling_count = 0
        WHERE repo_id = :repo_id
          AND address NOT IN (
              SELECT DISTINCT address
              FROM musehub_symbol_coupling
              WHERE repo_id = :repo_id
          )
    """)

    for statement in (set_counts, clear_orphans):
        await session.execute(statement, params)
635
636
async def backfill_coupling(session: AsyncSession, repo_id: str, *, min_shared: int = 2) -> int:
    """Compute symbol coupling entirely in SQL and update vitals.coupling_count.

    Avoids the O(n²) Python loop in _upsert_symbol_coupling by letting Postgres
    do the self-join on musehub_symbol_history_entries.

    Steps:
      1. Upsert canonical pairs (address < co_address) that share at least
         ``min_shared`` commits.
      2. Mirror the canonical rows into their reverse direction.
      3. Refresh ``coupling_count`` on the vitals table.

    Safe to run multiple times. The mirror step reads only canonical rows:
    mirroring *every* row would also copy a stale reverse row back over the
    canonical row refreshed in step 1, leaving the two directions with
    swapped counts whenever a pair's count changed between runs.

    Args:
        session: Open async session; the caller owns the transaction.
        repo_id: Repository whose coupling is rebuilt.
        min_shared: Minimum number of shared commits for a pair to be written.

    Returns:
        The row count reported for step 1 (canonical pairs inserted/updated).
    """
    result = await session.execute(
        sa.text("""
            INSERT INTO musehub_symbol_coupling (repo_id, address, co_address, shared_commits)
            SELECT
                h1.repo_id,
                h1.address,
                h2.address,
                COUNT(*) AS shared_commits
            FROM musehub_symbol_history_entries h1
            JOIN musehub_symbol_history_entries h2
              ON h1.repo_id = h2.repo_id
             AND h1.commit_id = h2.commit_id
             AND h1.address < h2.address
            WHERE h1.repo_id = :repo_id
            GROUP BY h1.repo_id, h1.address, h2.address
            HAVING COUNT(*) >= :min_shared
            ON CONFLICT (repo_id, address, co_address) DO UPDATE
                SET shared_commits = EXCLUDED.shared_commits
        """),
        {"repo_id": repo_id, "min_shared": min_shared},
    )
    pairs = result.rowcount or 0

    # Mirror symmetric rows (A→B written above; now write B→A). Only canonical
    # rows are used as the source so a stale B→A can never overwrite A→B.
    await session.execute(
        sa.text("""
            INSERT INTO musehub_symbol_coupling (repo_id, address, co_address, shared_commits)
            SELECT repo_id, co_address, address, shared_commits
            FROM musehub_symbol_coupling
            WHERE repo_id = :repo_id
              AND address < co_address
            ON CONFLICT (repo_id, address, co_address) DO UPDATE
                SET shared_commits = EXCLUDED.shared_commits
        """),
        {"repo_id": repo_id},
    )

    await _update_coupling_counts(session, repo_id)
    logger.info("backfill_coupling: %d pairs for repo %s", pairs, repo_id)
    return pairs
685
686
687 # ---------------------------------------------------------------------------
688 # Public entry point
689 # ---------------------------------------------------------------------------
690
async def build_symbol_index(
    session: AsyncSession,
    repo_id: str,
    head_commit_id: str,
) -> list[tuple[str, dict]]:
    """Build, or incrementally extend, the code intelligence index of a repo.

    Normalized rows are written straight to the database:
        musehub_symbol_history_entries
        musehub_hash_occurrence_entries
        musehub_symbol_intel
        musehub_symbol_vitals

    The return value holds ``(intel_type, data_dict)`` tuples for the two
    aggregate blobs only:
        code.intel_summary
        code.intel_snapshot

    With no prior ref every commit reachable from the head is processed;
    otherwise only the commits new since the prior ref are. An empty list is
    returned when the index is already at ``head_commit_id``, when there are
    no new commits, or when the new commits carry no symbol ops. If the
    aggregate computation fails, a warning is logged and both blobs are
    returned as empty dicts.
    """
    from time import monotonic as _clock

    started = _clock()

    prior_ref = await _get_prior_ref(session, repo_id)
    logger.info("[intel.code] START repo=%s head=%s prior_ref=%s", short_id(repo_id), short_id(head_commit_id), short_id(prior_ref) if prior_ref else "none")

    if prior_ref == head_commit_id:
        logger.info("[intel.code] already current — skipping")
        return []

    commits = await _walk_new_commits(session, repo_id, head_commit_id, prior_ref)
    logger.info("[intel.code] walk_commits=%.2fs commits=%d", _clock() - started, len(commits))

    if not commits:
        logger.info("[intel.code] no new commits — skipping")
        return []

    # In-memory view of the *new* commits only.
    symbol_history: SymbolHistory = {}
    hash_occurrence: HashOccurrence = {}
    kind_map: _KindMap = {}
    op_count = 0

    for commit in commits:
        stamp_text = commit.timestamp.isoformat() if commit.timestamp else ""

        for op in _extract_ops(commit.structured_delta):
            address: str = op["address"]
            op_type: str = op.get("op", "")
            content_id: str = op.get("new_content_id") or op.get("content_id") or ""

            symbol_history.setdefault(address, []).append({
                "commit_id": commit.commit_id,
                "committed_at": stamp_text,
                "author": commit.author,
                "op": op_type,
                "op_payload": _op_payload(op),
                "content_id": content_id,
            })

            kind = _extract_kind(op)
            if kind:
                kind_map[address] = kind

            # Only content-producing ops feed the hash index.
            if content_id and op_type in ("insert", "replace", "patch", "mutate"):
                seen_at = hash_occurrence.setdefault(content_id, [])
                if address not in seen_at:
                    seen_at.append(address)

            op_count += 1

    logger.info("[intel.code] build_in_memory=%.2fs symbols=%d ops=%d hashes=%d", _clock() - started, len(symbol_history), op_count, len(hash_occurrence))

    if op_count == 0:
        logger.info("[intel.code] no symbol ops — skipping (no structured_delta on commits)")
        return []

    step_started = _clock()
    await _upsert_history_entries(session, repo_id, commits, symbol_history)
    logger.info("[intel.code] upsert_history=%.2fs", _clock() - step_started)

    step_started = _clock()
    await _upsert_hash_occurrences(session, repo_id, hash_occurrence)
    logger.info("[intel.code] upsert_hashes=%.2fs", _clock() - step_started)

    # Metrics are computed over the complete stored history, not just the
    # commits processed in this run.
    step_started = _clock()
    full_history = await _load_full_history_from_db(session, repo_id)
    logger.info("[intel.code] load_full_history=%.2fs entries=%d", _clock() - step_started, sum(len(v) for v in full_history.values()))

    step_started = _clock()
    intel = _compute_symbol_intel(full_history)
    logger.info("[intel.code] compute_intel=%.2fs symbols=%d", _clock() - step_started, len(intel))

    step_started = _clock()
    await _upsert_symbol_intel(session, repo_id, intel, kind_map)
    logger.info("[intel.code] upsert_symbol_intel=%.2fs", _clock() - step_started)

    step_started = _clock()
    await _upsert_symbol_vitals(session, repo_id, full_history)
    logger.info("[intel.code] upsert_symbol_vitals=%.2fs", _clock() - step_started)

    # Coupling is handled by the separate intel.code.coupling job which has
    # mass-commit guards and a size cap. Running it inline here OOM-kills the
    # worker on large repos (O(n²) self-join on symbol_history_entries).

    # Compute aggregate blobs.
    intel_summary: JSONObject = {}
    intel_snapshot: JSONObject = {}
    try:
        from musehub.services.musehub_intel import compute_intel as _ci

        snap = _ci(symbol_history=full_history, recent_breaking_changes=[])
        intel_summary = {
            "health_score": snap.health_score,
            "health_label": snap.health_label,
            "symbol_count": snap.total_symbols,
            "hotspot_count": snap.alert_hotspot_count,
            "dead_symbol_count": snap.alert_dead_count,
        }
        intel_snapshot = snap.as_dict()
        logger.info(
            "✅ Code intel computed for repo %s: %d symbols, %d ops",
            repo_id, len(intel), op_count,
        )
    except Exception as exc:
        # Best effort: the normalized rows are already written at this point.
        logger.warning("Intel computation failed for repo %s: %s", repo_id, exc)

    logger.info(
        "✅ Symbol index built for repo %s @ %s: %d symbols, %d new ops",
        repo_id, head_commit_id, len(full_history), op_count,
    )

    return [
        ("code.intel_summary", intel_summary),
        ("code.intel_snapshot", intel_snapshot),
    ]
828
async def _load_full_history_from_db(
    session: AsyncSession,
    repo_id: str,
) -> SymbolHistory:
    """Read every stored history entry of the repo into an address-keyed dict.

    The entry dicts have the same keys as the in-memory records built by
    ``build_symbol_index``; NULL columns are mapped to "" (or {} for the
    payload) and timestamps are rendered as ISO strings.
    """
    query = select(MusehubSymbolHistoryEntry).where(
        MusehubSymbolHistoryEntry.repo_id == repo_id
    )
    fetched = await session.execute(query)

    by_address: SymbolHistory = {}
    for entry in fetched.scalars().all():
        stamp_text = entry.committed_at.isoformat() if entry.committed_at else ""
        by_address.setdefault(entry.address, []).append({
            "commit_id": entry.commit_id,
            "committed_at": stamp_text,
            "author": entry.author or "",
            "op": entry.op,
            "op_payload": entry.op_payload or {},
            "content_id": entry.content_id or "",
        })
    return by_address
853
854 # ---------------------------------------------------------------------------
855 # Backfill helper — populate symbol_kind from commit history
856 # ---------------------------------------------------------------------------
857
async def backfill_symbol_kinds(session: AsyncSession, repo_id: str) -> int:
    """Fill in ``symbol_kind`` on intel rows where it is still NULL.

    The kind of each address is re-derived from the op summaries of all
    commits referenced by the repo (the last commit seen wins), then applied
    with batched UPDATEs that touch NULL-kind rows only. Returns the number
    of rows updated; returns 0 immediately when no row has a NULL kind or no
    kind could be derived. Safe to call repeatedly.
    """
    missing: int = (await session.execute(
        sa.select(sa.func.count())
        .select_from(MusehubSymbolIntel)
        .where(
            MusehubSymbolIntel.repo_id == repo_id,
            MusehubSymbolIntel.symbol_kind.is_(None),
        )
    )).scalar_one()
    if missing == 0:
        return 0

    commit_query = (
        select(MusehubCommit)
        .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(MusehubCommitRef.repo_id == repo_id)
    )
    repo_commits = (await session.execute(commit_query)).scalars().all()

    kind_by_address: _KindMap = {}
    for commit in repo_commits:
        for op in _extract_ops(commit.structured_delta):
            detected = _extract_kind(op)
            if detected:
                kind_by_address[op["address"]] = detected
    if not kind_by_address:
        return 0

    # Group addresses by kind to minimize UPDATE statements.
    addresses_by_kind: defaultdict[str, list[str]] = defaultdict(list)
    for address, detected in kind_by_address.items():
        addresses_by_kind[detected].append(address)

    chunk = 1000
    updated = 0
    for detected, addresses in addresses_by_kind.items():
        for start in range(0, len(addresses), chunk):
            statement = (
                sa.update(MusehubSymbolIntel)
                .where(
                    MusehubSymbolIntel.repo_id == repo_id,
                    MusehubSymbolIntel.address.in_(addresses[start : start + chunk]),
                    MusehubSymbolIntel.symbol_kind.is_(None),
                )
                .values(symbol_kind=detected)
            )
            outcome = await session.execute(statement)
            updated += outcome.rowcount

    logger.info("backfill_symbol_kinds: updated %d rows for repo %s", updated, repo_id)
    return updated
915
async def backfill_intel_fields(
    session: AsyncSession,
    repo_id: str,
) -> int:
    """Backfill op and last_commit_id on musehub_symbol_intel from history entries.

    Two-pass:
    1. UPDATE existing intel rows with op + last_commit_id from the most-recent
       history entry of each address.
    2. INSERT minimal intel rows for symbols that have history entries but no
       intel row (e.g. move-only symbols that pre-date the intel system).

    Both passes rank entries by ``committed_at DESC NULLS LAST``. PostgreSQL
    sorts NULLs first for DESC unless told otherwise, so without the explicit
    clause pass 1 would treat an entry with a NULL timestamp as the newest,
    disagreeing with pass 2.

    Safe to run multiple times — idempotent.

    Returns:
        Rows updated in pass 1 plus rows inserted in pass 2.
    """
    update_result = await session.execute(
        sa.text("""
            UPDATE musehub_symbol_intel AS i
            SET
                op = h.op,
                last_commit_id = h.commit_id
            FROM (
                SELECT DISTINCT ON (repo_id, address)
                    repo_id,
                    address,
                    op,
                    commit_id
                FROM musehub_symbol_history_entries
                WHERE repo_id = :repo_id
                ORDER BY repo_id, address, committed_at DESC NULLS LAST
            ) AS h
            WHERE i.repo_id = h.repo_id
              AND i.address = h.address
        """),
        {"repo_id": repo_id},
    )
    updated: int = update_result.rowcount or 0

    # Blast, gravity and weekly are inserted as zero/empty placeholders here.
    insert_result = await session.execute(
        sa.text("""
            INSERT INTO musehub_symbol_intel (
                repo_id, address,
                churn, churn_30d, churn_90d,
                blast, blast_direct, blast_cross, blast_top,
                last_changed, last_author, author_count,
                gravity, weekly,
                last_commit_id, op
            )
            SELECT
                h.repo_id,
                h.address,
                COUNT(*) AS churn,
                COUNT(*) FILTER (WHERE h.committed_at >= NOW() - INTERVAL '30 days') AS churn_30d,
                COUNT(*) FILTER (WHERE h.committed_at >= NOW() - INTERVAL '90 days') AS churn_90d,
                0, 0, 0,
                ARRAY[]::text[],
                MAX(h.committed_at),
                (ARRAY_AGG(h.author ORDER BY h.committed_at DESC NULLS LAST))[1],
                COUNT(DISTINCT h.author),
                0.0,
                ARRAY[0,0,0,0,0,0,0,0,0,0,0,0]::integer[],
                (ARRAY_AGG(h.commit_id ORDER BY h.committed_at DESC NULLS LAST))[1],
                (ARRAY_AGG(h.op ORDER BY h.committed_at DESC NULLS LAST))[1]
            FROM musehub_symbol_history_entries h
            WHERE h.repo_id = :repo_id
              AND NOT EXISTS (
                  SELECT 1 FROM musehub_symbol_intel i
                  WHERE i.repo_id = h.repo_id AND i.address = h.address
              )
            GROUP BY h.repo_id, h.address
            ON CONFLICT DO NOTHING
        """),
        {"repo_id": repo_id},
    )
    inserted: int = insert_result.rowcount or 0

    logger.info(
        "backfill_intel_fields: updated %d, inserted %d rows for repo %s",
        updated, inserted, repo_id,
    )
    return updated + inserted
995
996
async def backfill_genesis_ops(
    session: AsyncSession,
    repo_id: str | None = None,
    *,
    dry_run: bool = False,
) -> int:
    """Rewrite the oldest history entry of every symbol to op='add' when it
    currently carries any other op.

    The oldest entry for a symbol is treated as its birth record. When that
    entry says 'modify' (or anything other than 'add'), the indexer never saw
    the insert op because the genesis commit had no structured_delta.
    Correcting it to 'add' makes the provenance timeline show the birth epoch
    correctly.

    Args:
        session: Async SQLAlchemy session.
        repo_id: Limit to one repo. Pass None to backfill all repos.
        dry_run: Count rows without writing. Returns the would-be count.

    Returns:
        Number of rows updated (or that would be updated in dry_run mode).
    """
    entry = MusehubSymbolHistoryEntry
    scoped = repo_id is not None

    # Earliest committed_at for each (repo_id, address).
    earliest_q = sa.select(
        entry.repo_id.label("r"),
        entry.address.label("a"),
        sa.func.min(entry.committed_at).label("min_ts"),
    ).group_by(entry.repo_id, entry.address)
    if scoped:
        earliest_q = earliest_q.where(entry.repo_id == repo_id)
    earliest = earliest_q.subquery()

    # Keys (repo_id, address, commit_id) of birth rows whose op is not 'add'.
    sits_on_earliest_ts = sa.and_(
        entry.repo_id == earliest.c.r,
        entry.address == earliest.c.a,
        entry.committed_at == earliest.c.min_ts,
    )
    birth_keys = (
        sa.select(entry.repo_id, entry.address, entry.commit_id)
        .join(earliest, sits_on_earliest_ts)
        .where(entry.op != "add")
    )
    if scoped:
        birth_keys = birth_keys.where(entry.repo_id == repo_id)

    if dry_run:
        count_stmt = sa.select(sa.func.count()).select_from(birth_keys.subquery())
        would_update: int = (await session.execute(count_stmt)).scalar_one()
        return would_update

    # One UPDATE covering every birth row selected above.
    key_columns = sa.tuple_(entry.repo_id, entry.address, entry.commit_id)
    outcome = await session.execute(
        sa.update(entry).where(key_columns.in_(birth_keys)).values(op="add")
    )
    updated = outcome.rowcount

    scope_note = f" for repo {repo_id}" if repo_id else " across all repos"
    logger.info(
        "backfill_genesis_ops: corrected %d birth entries%s",
        updated,
        scope_note,
    )
    return updated
1075
1076 # ---------------------------------------------------------------------------
1077 # Backfill: content_id from snapshot manifest
1078 # ---------------------------------------------------------------------------
1079
async def backfill_content_ids_from_snapshots(
    session: AsyncSession,
    repo_id: str | None = None,
    *,
    dry_run: bool = False,
) -> int:
    """Fill in missing content_id values on symbol history entries by reading
    the snapshot manifest for each entry's commit.

    When the indexer processes a file-level move/rename op (op_type='patch'
    with from_address), it records no new_content_id at the symbol level.
    The snapshot at that commit, however, does have the file's content_id in
    its manifest. This function closes that gap.

    Only file-level addresses (no '::' separator) are handled here — those map
    directly to a manifest path. Symbol-level addresses ('file.py::Symbol')
    must be backfilled differently (not yet implemented).

    Args:
        session: Async SQLAlchemy session.
        repo_id: Limit to one repo. None = all repos.
        dry_run: Count the rows that would be filled, without writing. Only
            entries whose commit has a decodable manifest containing the
            address are counted, mirroring what a real run would update.

    Returns:
        Number of rows updated (or that would be).
    """
    import msgpack  # type: ignore[import]

    she = MusehubSymbolHistoryEntry
    # Keep IN (...) lists bounded so large repos do not exceed driver
    # bind-parameter limits; same batch size as backfill_raw_ops_from_commits.
    commit_batch_size = 500

    # Find history entries with no content_id, scoped to file-level addresses.
    missing_q = sa.select(she.repo_id, she.address, she.commit_id).where(
        she.content_id.is_(None),
        ~she.address.contains("::"),  # file-level only
    )
    if repo_id is not None:
        missing_q = missing_q.where(she.repo_id == repo_id)

    missing_rows = (await session.execute(missing_q)).all()
    if not missing_rows:
        return 0

    # Group by commit_id so we fetch each snapshot at most once.
    by_commit: dict[str, list[tuple[str, str]]] = {}
    for row in missing_rows:
        by_commit.setdefault(row.commit_id, []).append((row.repo_id, row.address))

    # Batch-fetch snapshot manifests for all relevant commits.
    commit_ids = list(by_commit)
    manifest_by_commit: ManifestByCommit = {}
    for start in range(0, len(commit_ids), commit_batch_size):
        snap_rows = (await session.execute(
            sa.select(
                MusehubCommit.commit_id,
                MusehubSnapshot.manifest_blob,
            )
            .join(MusehubSnapshot, MusehubSnapshot.snapshot_id == MusehubCommit.snapshot_id)
            .where(MusehubCommit.commit_id.in_(commit_ids[start:start + commit_batch_size]))
        )).all()
        for snap_row in snap_rows:
            try:
                manifest = msgpack.unpackb(bytes(snap_row.manifest_blob), raw=False)
            except Exception as exc:
                logger.warning("Could not decode manifest for commit %s: %s", snap_row.commit_id, exc)
                continue
            if not isinstance(manifest, dict):
                # A blob that decodes to something other than a mapping cannot
                # be used for path lookups; treat it like a decode failure.
                logger.warning(
                    "Could not decode manifest for commit %s: %s",
                    snap_row.commit_id,
                    f"expected a mapping, got {type(manifest).__name__}",
                )
                continue
            manifest_by_commit[snap_row.commit_id] = manifest

    updated = 0
    for commit_id, entries in by_commit.items():
        manifest = manifest_by_commit.get(commit_id)
        if not manifest:
            continue
        for r_id, address in entries:
            cid = manifest.get(address)
            if not cid:
                continue
            if dry_run:
                # Each (repo_id, address, commit_id) was selected as one row
                # lacking a content_id, so it counts as one would-be update.
                updated += 1
                continue
            result = await session.execute(
                sa.update(she)
                .where(
                    she.repo_id == r_id,
                    she.address == address,
                    she.commit_id == commit_id,
                )
                .values(content_id=cid)
            )
            updated += result.rowcount

    if dry_run:
        return updated

    logger.info(
        "backfill_content_ids_from_snapshots: filled %d content_id values%s",
        updated,
        f" for repo {repo_id}" if repo_id else " across all repos",
    )
    return updated
1181
async def backfill_raw_ops_from_commits(
    session: AsyncSession,
    repo_id: str | None = None,
    *,
    dry_run: bool = False,
) -> int:
    """Re-index stale coarse-op history entries using the original structured_delta.

    Before migration 0018, the indexer collapsed DomainOp types to four coarse
    values ('add', 'modify', 'rename', 'move'). This function corrects existing
    rows by re-reading each commit's structured_delta column and writing the
    raw op type and full op payload.

    Only rows with coarse values ('add', 'modify', 'rename') are touched.
    'delete' and 'move' were already correct. Rows where no matching address
    is found in the delta are left untouched.

    Args:
        session: Async SQLAlchemy session.
        repo_id: Limit to one repo. None = all repos.
        dry_run: Count the rows that would be rewritten, without writing. Only
            entries with a matching op (and a non-empty op type) in their
            commit's delta are counted, mirroring what a real run would update.

    Returns:
        Number of rows updated (or that would be).
    """
    she = MusehubSymbolHistoryEntry
    _COARSE = ("add", "modify", "rename")
    delta_batch_size = 500

    stale_q = sa.select(she.repo_id, she.address, she.commit_id).where(she.op.in_(_COARSE))
    if repo_id is not None:
        stale_q = stale_q.where(she.repo_id == repo_id)

    stale_rows = (await session.execute(stale_q)).all()
    if not stale_rows:
        return 0

    # Group by commit_id.
    by_commit: dict[str, list[tuple[str, str]]] = {}
    for row in stale_rows:
        by_commit.setdefault(row.commit_id, []).append((row.repo_id, row.address))

    # Batch-load structured_delta for all commit_ids. The id list is built
    # once; each batch is a slice of it.
    commit_ids = list(by_commit)
    delta_map: dict[str, JSONObject | None] = {}
    for start in range(0, len(commit_ids), delta_batch_size):
        batch = commit_ids[start:start + delta_batch_size]
        rows_d = (await session.execute(
            select(MusehubCommit.commit_id, MusehubCommit.structured_delta)
            .where(MusehubCommit.commit_id.in_(batch))
        )).all()
        for r in rows_d:
            delta_map[r.commit_id] = r.structured_delta

    updated = 0
    for commit_id, entries in by_commit.items():
        ops_flat = _extract_ops(delta_map.get(commit_id))
        # When several ops share an address, the last one in the delta wins.
        addr_to_op: dict[str, dict] = {
            op["address"]: op for op in ops_flat if op.get("address")
        }

        for r_id, address in entries:
            raw_op = addr_to_op.get(address)
            if not raw_op:
                continue
            new_op_type = raw_op.get("op", "")
            if not new_op_type:
                continue
            if dry_run:
                # The row was selected with a coarse op, so a real run would
                # update exactly this one row.
                updated += 1
                continue
            result = await session.execute(
                sa.update(she)
                .where(
                    she.repo_id == r_id,
                    she.address == address,
                    she.commit_id == commit_id,
                    she.op.in_(_COARSE),
                )
                .values(op=new_op_type, op_payload=_op_payload(raw_op))
            )
            updated += result.rowcount

    if dry_run:
        return updated

    logger.info(
        "backfill_raw_ops_from_commits: updated %d entries%s",
        updated,
        f" for repo {repo_id}" if repo_id else " across all repos",
    )
    return updated
1273
1274 # ---------------------------------------------------------------------------
1275 # Snapshot-diff backfill
1276 # ---------------------------------------------------------------------------
1277
1278 _Manifest = dict[str, str] # path → object_id
1279
1280 def _diff_manifests(
1281 parent: _Manifest,
1282 child: _Manifest,
1283 ) -> list[tuple[str, str, str | None]]:
1284 """Diff two snapshot manifests and return (address, op, extra|None) tuples.
1285
1286 The ``extra`` field carries different semantics per op:
1287 - move → from_address (where the content came from)
1288 - delete → to_address (where the content moved to, if unambiguous rename)
1289 - insert / replace → None
1290
1291 Rules:
1292 - Path in child only → insert
1293 - Path in both with different object_id → replace
1294 - Path in parent only → delete (plain, extra=None)
1295 - Unambiguous rename (1-to-1 same object_id) → move with from_address
1296 AND delete for the source with to_address pointing to the new path
1297 - Same object_id ambiguous (multiple sources or destinations) → insert + plain delete
1298 """
1299 parent_set = set(parent)
1300 child_set = set(child)
1301
1302 added_paths = child_set - parent_set # candidates: insert or move destination
1303 removed_paths = parent_set - child_set # candidates: delete or move source
1304
1305 # Build content_id → [paths] maps for ambiguity detection.
1306 removed_by_oid: dict[str, list[str]] = {}
1307 for p in removed_paths:
1308 oid = parent[p]
1309 removed_by_oid.setdefault(oid, []).append(p)
1310
1311 added_by_oid: dict[str, list[str]] = {}
1312 for p in added_paths:
1313 oid = child[p]
1314 added_by_oid.setdefault(oid, []).append(p)
1315
1316 # Resolve unambiguous moves: one removed + one added share the same object_id.
1317 move_dest_to_src: dict[str, str] = {}
1318 move_sources: set[str] = set()
1319 for oid, removed in removed_by_oid.items():
1320 added = added_by_oid.get(oid, [])
1321 if len(removed) == 1 and len(added) == 1:
1322 src, dst = removed[0], added[0]
1323 move_dest_to_src[dst] = src
1324 move_sources.add(src)
1325
1326 ops: list[tuple[str, str, str | None]] = []
1327
1328 # Additions (insert or move destination)
1329 for path in added_paths:
1330 if path in move_dest_to_src:
1331 ops.append((path, "move", move_dest_to_src[path]))
1332 else:
1333 ops.append((path, "insert", None))
1334
1335 # Removals: plain delete for ambiguous cases; delete with to_address for move sources
1336 # Build reverse map: move source → destination
1337 move_src_to_dest: dict[str, str] = {src: dst for dst, src in move_dest_to_src.items()}
1338 for path in removed_paths:
1339 to_address = move_src_to_dest.get(path) # None for non-move deletes
1340 ops.append((path, "delete", to_address))
1341
1342 # Replacements (same path, different content)
1343 for path in parent_set & child_set:
1344 if parent[path] != child[path]:
1345 ops.append((path, "replace", None))
1346
1347 return ops
1348
async def backfill_history_from_snapshots(
    session: AsyncSession,
    repo_id: str | None = None,
    *,
    dry_run: bool = False,
) -> int:
    """Create history entries for commits whose files were never covered by
    structured_delta indexing.

    Walks every commit in the repo(s), diffs its snapshot manifest against its
    first parent's, and inserts a history row for each (address, commit_id)
    pair that does not already have one.

    Each created entry carries ``{"inferred_from": "snapshot_diff"}`` in
    ``op_payload`` to distinguish it from semantically richer structured_delta
    entries. Existing entries are never modified.

    A commit is skipped (with a warning) when its first parent is known and
    has a snapshot_id but that snapshot's manifest could not be loaded.
    Diffing against an empty manifest in that case would record every file in
    the tree as an insert, and those rows would never be corrected later.
    Commits with no parent, an unknown parent, or a parent without a
    snapshot_id are diffed against an empty manifest.

    Args:
        session: Async SQLAlchemy session.
        repo_id: Limit to one repo. None = all repos.
        dry_run: Count without writing.

    Returns:
        Number of rows created (or that would be created in dry_run).
    """
    import msgpack  # type: ignore[import]

    she = MusehubSymbolHistoryEntry
    # Keep IN (...) lists bounded so large repos do not exceed driver
    # bind-parameter limits.
    snapshot_batch_size = 500

    # Load all (commit, repo_id) pairs for the target repo(s), oldest-first.
    # Select repo_id from MusehubCommitRef since MusehubCommit is globally addressed.
    commit_q = sa.select(MusehubCommit, MusehubCommitRef.repo_id.label("ref_repo_id")).join(
        MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id
    )
    if repo_id is not None:
        commit_q = commit_q.where(MusehubCommitRef.repo_id == repo_id)
    commit_q = commit_q.order_by(MusehubCommit.timestamp.asc())
    all_rows = (await session.execute(commit_q)).all()
    all_commits_with_repo: list[tuple[MusehubCommit, str]] = [
        (row.MusehubCommit, row.ref_repo_id) for row in all_rows
    ]

    if not all_commits_with_repo:
        return 0

    # Batch-load all snapshot manifests keyed by snapshot_id (ids de-duplicated,
    # first-seen order kept).
    snap_ids = list(dict.fromkeys(
        c.snapshot_id for c, _ in all_commits_with_repo if c.snapshot_id
    ))
    manifest_by_snap: dict[str, _Manifest] = {}
    for start in range(0, len(snap_ids), snapshot_batch_size):
        snap_rows = (await session.execute(
            sa.select(MusehubSnapshot.snapshot_id, MusehubSnapshot.manifest_blob)
            .where(MusehubSnapshot.snapshot_id.in_(snap_ids[start:start + snapshot_batch_size]))
        )).all()
        for snap in snap_rows:
            try:
                manifest_by_snap[snap.snapshot_id] = msgpack.unpackb(
                    bytes(snap.manifest_blob), raw=False
                )
            except Exception as exc:
                logger.warning("backfill_history_from_snapshots: bad manifest %s: %s",
                               snap.snapshot_id, exc)

    # Load existing (repo_id, address, commit_id) pairs to skip.
    existing_q = sa.select(she.repo_id, she.address, she.commit_id)
    if repo_id is not None:
        existing_q = existing_q.where(she.repo_id == repo_id)
    existing_set: set[tuple[str, str, str]] = {
        (r.repo_id, r.address, r.commit_id)
        for r in (await session.execute(existing_q)).all()
    }

    # Index commits by id for parent manifest lookup (snapshot lookup only needs commit, not repo).
    commit_by_id = {c.commit_id: c for c, _ in all_commits_with_repo}

    pending: list[MusehubSymbolHistoryEntry] = []

    for commit, effective_repo_id in all_commits_with_repo:
        if not commit.snapshot_id or commit.snapshot_id not in manifest_by_snap:
            continue

        child_manifest = manifest_by_snap[commit.snapshot_id]

        # Resolve parent manifest (merge commits: use first parent).
        parent_manifest: _Manifest = {}
        first_parent = (commit.parent_ids or [])[:1]
        if first_parent:
            parent_commit = commit_by_id.get(first_parent[0])
            if parent_commit and parent_commit.snapshot_id:
                loaded = manifest_by_snap.get(parent_commit.snapshot_id)
                if loaded is None:
                    logger.warning(
                        "backfill_history_from_snapshots: parent manifest %s unavailable; "
                        "skipping commit %s",
                        parent_commit.snapshot_id, commit.commit_id,
                    )
                    continue
                parent_manifest = loaded

        for address, op, extra in _diff_manifests(parent_manifest, child_manifest):
            key = (effective_repo_id, address, commit.commit_id)
            if key in existing_set:
                continue  # already covered by structured_delta or prior run

            payload: JSONObject = {"inferred_from": "snapshot_diff"}
            if op == "move" and extra:
                payload["from_address"] = extra
            elif op == "delete" and extra:
                payload["to_address"] = extra

            # A deleted path has no content in the child snapshot.
            content_id = child_manifest.get(address) if op != "delete" else None

            pending.append(MusehubSymbolHistoryEntry(
                repo_id=effective_repo_id,
                address=address,
                commit_id=commit.commit_id,
                op=op,
                op_payload=payload,
                content_id=content_id,
                committed_at=commit.timestamp,
                author=commit.author or "",
            ))
            existing_set.add(key)  # prevent duplicate inserts within this run

    if dry_run:
        return len(pending)

    session.add_all(pending)

    logger.info(
        "backfill_history_from_snapshots: created %d entries%s",
        len(pending),
        f" for repo {repo_id}" if repo_id else " across all repos",
    )
    return len(pending)
1472
1473 # ---------------------------------------------------------------------------
1474 # Read helpers — used by API routes
1475 # ---------------------------------------------------------------------------
1476
def _row_to_entry(row: MusehubSymbolHistoryEntry, address_override: str | None = None) -> JSONObject:
    """Serialize one history row into a plain dict for the read helpers.

    Missing values are normalized: a falsy ``committed_at``, ``author`` or
    ``content_id`` becomes ``""`` and a falsy ``op_payload`` becomes ``{}``.
    ``address_override`` replaces the row's own address whenever it is not
    None (an empty string is a valid override).
    """
    committed_at = row.committed_at.isoformat() if row.committed_at else ""
    address = row.address if address_override is None else address_override
    return {
        "commit_id": row.commit_id,
        "committed_at": committed_at,
        "author": row.author or "",
        "op": row.op,
        "op_payload": row.op_payload or {},
        "content_id": row.content_id or "",
        "address": address,
    }
1487
_RowCache = dict[str, list["MusehubSymbolHistoryEntry"]]


async def _resolve_lineage(
    session: AsyncSession,
    repo_id: str,
    address: str,
    cache: _RowCache,
    visited: set[str],
) -> list[JSONObject]:
    """Walk from_address chains to build a full oldest-first lineage for address.

    Uses cache for already-loaded rows; issues a targeted DB query only when a
    lineage hop points to an address not yet in cache (e.g. filtered-out origin
    files). visited prevents infinite loops on malformed data.

    The chain is followed iteratively through the ``from_address`` of each
    address's oldest row, so its length is not bounded by the interpreter's
    recursion limit. Rows fetched here are ordered with NULL ``committed_at``
    first, matching the in-memory sort ``load_symbol_history`` applies to the
    rows it preloads into the cache.

    Args:
        session: Async SQLAlchemy session used for cache misses.
        repo_id: Repo the addresses belong to.
        address: Address whose lineage is requested.
        cache: address → rows (oldest-first); filled in place on cache misses.
        visited: Addresses already walked; updated in place.

    Returns:
        Serialized entries, origin's rows first and ``address``'s own rows
        last. Empty when ``address`` was already visited or has no rows.
    """
    she = MusehubSymbolHistoryEntry

    # Row groups collected newest-address-first; reversed when flattening.
    segments: list[list[MusehubSymbolHistoryEntry]] = []
    current = address
    while current not in visited:
        visited.add(current)

        if current not in cache:
            db_rows = (await session.execute(
                sa.select(she)
                .where(she.repo_id == repo_id, she.address == current)
                .order_by(she.committed_at.asc().nulls_first())
            )).scalars().all()
            cache[current] = list(db_rows)

        rows = cache[current]
        if not rows:
            break
        segments.append(rows)

        # Only the oldest row can name the address this one came from.
        origin: str | None = (rows[0].op_payload or {}).get("from_address")
        if not origin or origin == current:
            break
        current = origin

    lineage: list[JSONObject] = []
    for rows in reversed(segments):
        lineage.extend(_row_to_entry(r) for r in rows)
    return lineage
1528
async def load_symbol_history(
    session: AsyncSession,
    repo_id: str,
    file_path: str | None = None,
) -> SymbolHistory:
    """Load symbol history from normalized rows, following from_address chains.

    Each address whose oldest entry carries a ``from_address`` in its
    ``op_payload`` is traced back until the origin is found. The result is a
    merged, oldest-first timeline keyed on the *current* (final) address;
    intermediate addresses that were loaded are not included as top-level keys.

    When file_path is given, only the file itself and symbols under it
    (``<file_path>::...``) are returned. The path is matched literally:
    ``%`` and ``_`` in it are escaped rather than treated as LIKE wildcards.
    Their lineage (which may span a different origin file path) is still
    included, fetched via a targeted query if not already in the loaded set.

    Args:
        session: Async SQLAlchemy session.
        repo_id: Repo to load history for.
        file_path: Optional file path filter.

    Returns:
        Mapping of address → oldest-first list of serialized history entries.
    """
    she = MusehubSymbolHistoryEntry

    stmt = select(she).where(she.repo_id == repo_id)
    if file_path is not None:
        stmt = stmt.where(
            sa.or_(
                she.address == file_path,
                # autoescape keeps '_' / '%' in real paths from matching
                # unrelated files (e.g. "my_file.py" vs "myXfile.py").
                she.address.startswith(f"{file_path}::", autoescape=True),
            )
        )

    rows = (await session.execute(stmt)).scalars().all()

    # Build address→rows cache, sorted oldest-first; rows without a timestamp
    # sort before everything else.
    epoch = datetime.min.replace(tzinfo=timezone.utc)
    cache: _RowCache = {}
    for row in rows:
        cache.setdefault(row.address, []).append(row)
    for addr_rows in cache.values():
        addr_rows.sort(key=lambda r: r.committed_at or epoch)

    # Determine which loaded addresses are origins (will be folded into a
    # successor's lineage rather than kept as top-level keys).
    origin_addresses: set[str] = set()
    for addr_rows in cache.values():
        from_address = (addr_rows[0].op_payload or {}).get("from_address")
        if from_address and from_address in cache:
            origin_addresses.add(from_address)

    # Build merged timeline for each leaf address.
    # Snapshot keys before iteration — _resolve_lineage may add entries to cache
    # when it fetches origin addresses not present in the initial load.
    history: SymbolHistory = {}
    for address in list(cache):
        if address in origin_addresses:
            continue
        history[address] = await _resolve_lineage(session, repo_id, address, cache, set())

    return history
1585
async def load_hash_occurrence(
    session: AsyncSession,
    repo_id: str,
) -> HashOccurrence:
    """Load the clone detection index for a repo from normalized rows.

    Returns a mapping of content_id → list of addresses, one list item per
    stored occurrence row, in the order the rows were returned.
    """
    entry = MusehubHashOccurrenceEntry
    stmt = select(entry).where(entry.repo_id == repo_id)
    stored = (await session.execute(stmt)).scalars().all()

    by_content: HashOccurrence = {}
    for item in stored:
        by_content.setdefault(item.content_id, []).append(item.address)
    return by_content
1602
async def get_index_meta(
    session: AsyncSession,
    repo_id: str,
) -> JSONObject | None:
    """Return index metadata (ref, built_at, symbol_count) or None.

    The metadata is read from the repo's ``code.intel_summary`` row. None is
    returned when that row does not exist.

    Reading ``symbol_count`` is best-effort: if the stored summary cannot be
    parsed, a warning is logged and ``symbol_count`` falls back to 0 while
    ``ref`` and ``built_at`` are still returned.
    """
    import json

    summary_row = (await session.execute(
        select(MusehubIntelResult).where(
            MusehubIntelResult.repo_id == repo_id,
            MusehubIntelResult.intel_type == "code.intel_summary",
        )
    )).scalar_one_or_none()
    if summary_row is None:
        return None

    symbol_count = 0
    try:
        symbol_count = json.loads(summary_row.data_json).get("symbol_count", 0)
    except Exception as exc:
        # Kept best-effort on purpose, but no longer silent.
        logger.warning(
            "Failed to read symbol_count from code.intel_summary for repo %s: %s",
            repo_id, exc,
        )

    return {
        "ref": summary_row.ref,
        "built_at": summary_row.computed_at.isoformat() if summary_row.computed_at else None,
        "symbol_count": symbol_count,
    }
1627
async def load_intel_snapshot(
    session: AsyncSession,
    repo_id: str,
) -> "IntelSnapshot | None":
    """Load the pre-computed IntelSnapshot stored under code.intel_snapshot.

    Returns None when the repo has no such row, or when the stored JSON cannot
    be turned back into an IntelSnapshot (a warning is logged in that case).
    """
    import json

    from musehub.services.musehub_intel import IntelSnapshot

    stmt = select(MusehubIntelResult).where(
        MusehubIntelResult.repo_id == repo_id,
        MusehubIntelResult.intel_type == "code.intel_snapshot",
    )
    stored = (await session.execute(stmt)).scalar_one_or_none()
    if stored is None:
        return None

    try:
        snapshot = IntelSnapshot.from_dict(json.loads(stored.data_json))
    except Exception as exc:
        logger.warning("Failed to deserialize code.intel_snapshot for repo %s: %s", repo_id, exc)
        return None
    return snapshot
1648
async def lookup_symbol_intel(
    session: AsyncSession,
    repo_id: str,
    addresses: list[str],
) -> SymbolIntelMap:
    """Return per-symbol intel for the requested addresses.

    Issues one query filtered by repo_id and the address list; an empty
    address list short-circuits to an empty result without touching the
    database. Addresses with no intel row are simply absent from the result.
    """
    if not addresses:
        return {}

    intel = MusehubSymbolIntel
    stmt = select(intel).where(
        intel.repo_id == repo_id,
        intel.address.in_(addresses),
    )
    found = (await session.execute(stmt)).scalars().all()

    # Nullable columns are normalized: arrays → [], timestamp / author → "".
    by_address: SymbolIntelMap = {
        item.address: {
            "churn": item.churn,
            "churn_30d": item.churn_30d,
            "churn_90d": item.churn_90d,
            "blast": item.blast,
            "blast_direct": item.blast_direct,
            "blast_cross": item.blast_cross,
            "blast_top": list(item.blast_top or []),
            "last_changed": item.last_changed.isoformat() if item.last_changed else "",
            "last_author": item.last_author or "",
            "author_count": item.author_count,
            "gravity": item.gravity,
            "weekly": list(item.weekly or []),
        }
        for item in found
    }
    return by_address
File History 3 commits
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f fix: use wire_bytes not mpack_bytes_raw in compute_object_b… Sonnet 4.6 patch 9 days ago
sha256:4aed3d8601c8dd3ed37074de35f11f4a9699a0a4b99d43727048fd3f8e6fd13d chore: doc sweep, ignore wrangler build state, misc fixes Sonnet 4.6 minor 12 days ago