gabriel / musehub public
musehub_intel_providers.py python
2,740 lines 109.8 KB
Raw
sha256:d50f9cf9829dfbe35721a23b81ad256c729ddf9dd565a0a9e56d27847e255632 feat(#92): phase 4 — enqueue fetch.mpack.prebuild on push (… Sonnet 4.6 patch 5 days ago
1 """Domain-agnostic intelligence provider registry.
2
3 Every intelligence result written to ``musehub_intel_results`` is produced by
4 an ``IntelProvider``. Providers are keyed by job_type string and registered in
5 ``_PROVIDER_REGISTRY``. The worker imports the registry and dispatches jobs
6 without knowing anything about the underlying domain.
7
8 Adding a new domain means:
9 1. Implement a class that satisfies the ``IntelProvider`` Protocol.
10 2. Add it to ``_PROVIDER_REGISTRY`` under the appropriate ``"intel.<domain>"`` key.
11 3. Update ``job_types_for_push`` to enqueue it for repos in that domain.
12
13 No other files need changes.
14
15 Provider contract
16 -----------------
17 - ``compute`` receives an async session, repo_id, head commit_id, and the raw
18 job payload. It returns a list of ``(intel_type, data_dict)`` tuples.
19 - Each tuple is written to ``musehub_intel_results`` via upsert:
20 ``(repo_id, intel_type)`` is the unique key; every push overwrites the
21 previous result for that type.
22 - ``schema_version`` defaults to 1; bump it inside the provider when the
23 ``data_dict`` schema changes in a backwards-incompatible way so readers can
24 detect stale entries.
25 """
26
27 import asyncio
28 import json
29 import logging
30 from datetime import datetime, timedelta, timezone
31 from typing import Protocol, runtime_checkable
32
33 import sqlalchemy as sa
34 from sqlalchemy import select
35 from sqlalchemy.dialects.postgresql import insert as pg_insert
36 from sqlalchemy.ext.asyncio import AsyncSession
37
38 from muse.core.types import blob_id
39 from muse.plugins.code.ast_parser import parse_symbols
40 from muse.plugins.code._query import language_of
41 from musehub.core.genesis import compute_intel_result_id
42 from musehub.db.musehub_intel_models import (
43 MusehubHashOccurrenceEntry,
44 MusehubIntelApiSurface,
45 MusehubIntelBlastRisk,
46 MusehubIntelBreakageIssue,
47 MusehubIntelBreakageMeta,
48 MusehubIntelClones,
49 MusehubIntelCodemapMeta,
50 MusehubIntelCodemapModule,
51 MusehubIntelCoupling,
52 MusehubIntelDead,
53 MusehubIntelEntangle,
54 MusehubIntelLanguages,
55 MusehubIntelRefactorEvent,
56 MusehubIntelResult,
57 MusehubIntelStable,
58 MusehubIntelType,
59 MusehubIntelVelocity,
60 MusehubSymbolHistoryEntry,
61 MusehubSymbolIntel,
62 )
63 from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubMist, MusehubRepo, MusehubSnapshot
64 from musehub.storage.backends import StorageBackend, get_backend
65 from musehub.types.json_types import IntDict, JSONObject, StrDict
66
67 logger = logging.getLogger(__name__)
68
69 # ---------------------------------------------------------------------------
70 # Types
71 # ---------------------------------------------------------------------------
72
# Batch of provider output, in the order the provider produced it.
IntelResults = list[tuple[str, dict]]  # [(intel_type, data_dict), ...]

# Named type aliases — avoids boundary_dict / dict_of_dict audit violations.
ModuleStatsMap = dict[str, IntDict]  # module_name → {added, removed, modified, ...}
SymbolInfoMap = dict[str, JSONObject]  # address → {body_hash, signature_id, file, kind}
GraphMap = dict[str, list[str]]  # file_path → [imported_file_path, ...]
LangKindsMap = dict[str, IntDict]  # language → {kind → count}
80
@runtime_checkable
class IntelProvider(Protocol):
    """Structural interface implemented by every intelligence provider.

    Any object exposing a matching ``compute`` coroutine satisfies this
    protocol; inheriting from it is not required.
    """

    async def compute(
        self,
        session: AsyncSession,
        repo_id: str,
        ref: str,
        payload: JSONObject,
    ) -> IntelResults:
        """Produce the intelligence results for ``repo_id`` at ``ref``.

        The return value is a list of ``(intel_type, data_dict)`` pairs.
        Each pair is meant to be upserted into ``musehub_intel_results``
        keyed on ``(repo_id, intel_type)``.
        """
        ...
99
100 # ---------------------------------------------------------------------------
101 # Persistence helper
102 # ---------------------------------------------------------------------------
103
async def persist_intel_results(
    session: AsyncSession,
    repo_id: str,
    ref: str,
    results: IntelResults,
) -> None:
    """Upsert a batch of intel results into ``musehub_intel_results``.

    Each ``(intel_type, data_dict)`` tuple becomes one row. On a conflict
    with the ``uq_musehub_intel_results_repo_type`` constraint the existing
    row's ``result_id``, ``ref``, ``data_json`` and ``computed_at`` are
    overwritten; the other columns keep their stored values.

    Parameters
    ----------
    session: Async SQLAlchemy session; statements are executed but this
        function does not commit.
    repo_id: Repository the results belong to.
    ref: Ref the results were computed at.
    results: ``(intel_type, data_dict)`` tuples to persist; ``data_dict``
        must be JSON-serializable.
    """
    # One timestamp for the whole batch so all rows share ``computed_at``.
    now = datetime.now(tz=timezone.utc)
    for intel_type, data in results:
        # The domain is the first dot-separated segment of the intel type.
        domain = intel_type.split(".")[0]
        result_id = compute_intel_result_id(repo_id, intel_type, ref)
        # Serialize once; the same text feeds both the INSERT values and the
        # ON CONFLICT update.
        data_json = json.dumps(data)
        stmt = (
            pg_insert(MusehubIntelResult)
            .values(
                result_id=result_id,
                repo_id=repo_id,
                intel_type=intel_type,
                domain=domain,
                ref=ref,
                data_json=data_json,
                schema_version=1,
                computed_at=now,
            )
            .on_conflict_do_update(
                constraint="uq_musehub_intel_results_repo_type",
                set_={
                    "result_id": result_id,
                    "ref": ref,
                    "data_json": data_json,
                    "computed_at": now,
                },
            )
        )
        await session.execute(stmt)
143
144 # ---------------------------------------------------------------------------
145 # Structural provider — works for ALL domains
146 # ---------------------------------------------------------------------------
147
class StructuralProvider:
    """Derive domain-independent structural intelligence from stored commits.

    Everything is computed from the ``author`` and ``timestamp`` columns of
    the commits linked to the repo; no subprocess is spawned.

    Intel types produced:
    ``structural.velocity`` — commit counts for the last 12 weeks
    (index 0 is the most recent week) plus the total
    number of commit rows
    ``structural.contributors`` — author → commit count, plus the number of
    distinct authors
    """

    async def compute(
        self,
        session: AsyncSession,
        repo_id: str,
        ref: str,
        payload: JSONObject,
    ) -> IntelResults:
        """Return the velocity and contributor results for ``repo_id``.

        ``ref`` and ``payload`` are accepted for protocol compatibility and
        are not read.
        """
        commit_query = (
            select(MusehubCommit.author, MusehubCommit.timestamp)
            .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
            .where(MusehubCommitRef.repo_id == repo_id)
            .order_by(MusehubCommit.timestamp.desc())
        )
        commit_rows = (await session.execute(commit_query)).all()

        reference_time = datetime.now(tz=timezone.utc)
        per_week: list[int] = [0 for _ in range(12)]
        per_author: dict[str, int] = {}

        for author, stamp in commit_rows:
            if author:
                per_author[author] = per_author.get(author, 0) + 1
            if not stamp:
                continue
            # Naive timestamps are interpreted as UTC.
            if not stamp.tzinfo:
                stamp = stamp.replace(tzinfo=timezone.utc)
            weeks_ago = (reference_time - stamp).days // 7
            # Future-dated commits (negative index) and commits older than
            # 12 weeks fall outside the window.
            if 0 <= weeks_ago < 12:
                per_week[weeks_ago] += 1

        velocity = {
            "weekly": per_week,
            "total_commits": len(commit_rows),
        }
        contributors = {
            "authors": per_author,
            "total_authors": len(per_author),
        }
        return [
            ("structural.velocity", velocity),
            ("structural.contributors", contributors),
        ]
204
205 # ---------------------------------------------------------------------------
206 # Code provider — wraps the symbol indexer
207 # ---------------------------------------------------------------------------
208
class CodeProvider:
    """Code-domain provider backed by the symbol indexer.

    All work is delegated to ``build_symbol_index``; whatever that function
    returns is handed back unchanged.

    Intel types produced (per the indexer's documented contract):
    ``code.symbol_history`` — address → op list (provenance)
    ``code.hash_occurrence`` — content_id → address list (clone detection)
    ``code.intel_snapshot`` — full IntelSnapshot (dashboards, health)
    ``code.intel_summary`` — condensed for the repo home page
    ``code.per_symbol_intel``— per-symbol {churn, blast, gravity, …}
    """

    async def compute(
        self,
        session: AsyncSession,
        repo_id: str,
        ref: str,
        payload: JSONObject,
    ) -> IntelResults:
        """Run the symbol indexer for ``repo_id`` at ``ref``.

        ``payload`` is not forwarded to the indexer.
        """
        # Imported at call time rather than at module import.
        from musehub.services.musehub_symbol_indexer import build_symbol_index

        index_results = await build_symbol_index(session, repo_id, ref)
        return index_results
233
234 # ---------------------------------------------------------------------------
235 # Subprocess helper and repo-root resolution for code intel providers
236 # ---------------------------------------------------------------------------
237
async def _run_muse(repo_root: str, *args: str) -> JSONObject | None:
    """Run ``muse -C <repo_root> <args> --json`` and return the parsed JSON.

    Parameters
    ----------
    repo_root: Directory passed to ``muse -C``.
    args: Sub-command and flags; ``--json`` is appended automatically.

    Returns
    -------
    The value decoded from the process's stdout, or ``None`` when the
    process exits non-zero, cannot be started, or emits invalid JSON.
    Failures are logged and never propagated, so callers can return ``[]``
    without crashing the worker.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            "muse", "-C", repo_root, *args, "--json",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            # Include the tail of stderr so a failed run is diagnosable from
            # the log; the tail is bounded to keep log lines small.
            detail = stderr.decode("utf-8", errors="replace").strip()[-2000:]
            logger.warning(
                "muse %s exited %s: %s",
                " ".join(args),
                proc.returncode,
                detail,
            )
            return None
        return json.loads(stdout)
    except Exception:
        # Deliberately broad: spawn errors and JSON decode errors are both
        # reported as "no result".
        logger.exception("_run_muse failed: args=%s", args)
        return None
258
259
260 # ---------------------------------------------------------------------------
261 # MIDI provider — stub for future implementation
262 # ---------------------------------------------------------------------------
263
class MidiProvider:
    """Placeholder provider for the MIDI domain; currently yields nothing.

    Once MIDI intelligence is available (through ``muse`` CLI subprocess
    calls or direct Muse library APIs) this provider is expected to emit
    ``midi.*`` intel types such as:
    ``midi.tags`` — tag frequency, commit coverage
    ``midi.harmony`` — chord progression statistics
    ``midi.tempo_map`` — tempo change history
    """

    async def compute(
        self,
        session: AsyncSession,
        repo_id: str,
        ref: str,
        payload: JSONObject,
    ) -> IntelResults:
        """Return an empty result list; no argument is inspected."""
        no_results: IntelResults = []
        return no_results
283
284 # ---------------------------------------------------------------------------
285 # Mist provider — symbol anchor extraction for mist repos
286 # ---------------------------------------------------------------------------
287
class MistProvider:
    """Mist-domain provider: snapshot anchor index plus per-mist anchors.

    Up to two kinds of result are returned per run:

    ``mist.anchor_index``
    Whatever ``build_mist_anchor_index`` returns for this repo and ref.
    That helper is always invoked, so the index is attempted for
    VCS-only mist repos and API-created mists alike.

    ``mist.anchors``
    Emitted only when a ``MusehubMist`` row exists for this repo.
    Anchors are re-extracted from the stored ``content`` and written
    back to ``MusehubMist.symbol_anchors``; the result carries the mist
    id, filename, artifact type, the anchors and their count.

    NOTE(review): the lookup uses ``scalar_one_or_none``, which raises when
    more than one mist row matches — presumably ``repo_id`` is unique on
    ``MusehubMist``; confirm against the model.
    """

    async def compute(
        self,
        session: AsyncSession,
        repo_id: str,
        ref: str,
        payload: JSONObject,
    ) -> IntelResults:
        """Collect the mist results for ``repo_id``; ``payload`` is unused."""
        from muse.plugins.mist.plugin import extract_mist_symbol_anchors
        from sqlalchemy import select as _select
        from musehub.services.musehub_mist_indexer import build_mist_anchor_index

        # Snapshot-level index — runs for every mist repo.
        collected: list[tuple[str, dict]] = list(
            await build_mist_anchor_index(session, repo_id, ref)
        )

        # Per-mist anchors — only when a MusehubMist row is linked to the repo.
        mist_query = _select(MusehubMist).where(MusehubMist.repo_id == repo_id)
        linked_mist = (await session.execute(mist_query)).scalar_one_or_none()
        if linked_mist is None:
            return collected

        content_bytes = linked_mist.content.encode("utf-8")
        anchors = extract_mist_symbol_anchors(linked_mist.filename, content_bytes)
        # Assigning on the ORM object refreshes the stored anchors.
        linked_mist.symbol_anchors = anchors

        anchor_payload = {
            "mist_id": linked_mist.mist_id,
            "filename": linked_mist.filename,
            "artifact_type": linked_mist.artifact_type,
            "symbol_anchors": anchors,
            "anchor_count": len(anchors),
        }
        collected.append(("mist.anchors", anchor_payload))
        return collected
349
350 # ---------------------------------------------------------------------------
351 # Profile snapshot provider — cross-repo user-level aggregates
352 # ---------------------------------------------------------------------------
353
class ProfileSnapshotProvider:
    """Compute and persist the full profile snapshot for a repo's owner.

    Unlike repo-scoped providers this provider reads ``payload["handle"]`` to
    identify the target user and delegates to
    ``_compute_and_persist_profile_snapshot``, which upserts a single row in
    ``musehub_profile_snapshots`` keyed by handle.

    Always returns ``[]`` — results are written directly to the snapshots
    table, not through ``persist_intel_results``.

    Data computed:
    stats — {repo_count, commit_count, agent_count, avg_health}
    repos — list of repo card dicts (all owned repos)
    heatmap — 52-week daily heatmap with streaks
    agent_fleet — agents that committed to owned repos
    badges — provenance badge dicts
    footprint — top-N touched symbols across repos
    activity_canvas — 6-domain 52×7 activity grid (active domains only)
    """

    async def compute(
        self,
        session: AsyncSession,
        repo_id: str,
        ref: str,
        payload: JSONObject,
    ) -> IntelResults:
        """Refresh the snapshot for ``payload["handle"]``.

        ``repo_id`` and ``ref`` are unused. A missing, ``None`` or empty
        handle is logged and skipped. Any exception raised while computing
        or persisting the snapshot is logged and swallowed (best effort).
        """
        raw_handle = payload.get("handle")
        # Guard before str(): str(None) would yield the truthy string "None"
        # and be treated as a real handle.
        handle = str(raw_handle) if raw_handle else ""
        if not handle:
            logger.warning("profile.snapshot: missing handle in payload, skipping")
            return []

        try:
            await _compute_and_persist_profile_snapshot(session, handle)
        except Exception:
            # logger.exception attaches the active traceback to the record,
            # matching the error-handling style used by _run_muse.
            logger.exception("profile.snapshot: failed for handle=%s", handle)
        return []
395
async def _compute_and_persist_profile_snapshot(
    session: AsyncSession,
    handle: str,
) -> None:
    """Compute all expensive profile data for *handle* and upsert the snapshot row.

    Resolves the identity for ``handle``; when none is found the function
    logs and returns without writing. Otherwise it gathers stats, repos,
    heatmap, agent fleet, badges, symbol footprint and the activity canvas,
    serializes them to JSON and upserts one ``MusehubProfileSnapshot`` row
    keyed on ``handle`` with ``is_stale`` reset to ``False``.

    The statement is executed on ``session``; no commit is issued here.
    """
    # Lazy import to avoid circular dependency at module level.
    from musehub.api.routes.musehub.ui_user_profile import (
        _resolve_identity,
        _compute_provenance_stats,
        _fetch_repos,
        _build_heatmap,
        _fetch_agent_fleet,
        _compute_provenance_badges,
        _fetch_symbol_footprint,
    )
    from musehub.services.musehub_profile import build_activity_canvas

    identity = await _resolve_identity(session, handle)
    if identity is None:
        logger.info("profile.snapshot: no identity found for handle=%s, skipping", handle)
        return

    # Awaited one at a time: every call shares the same session.
    stats = await _compute_provenance_stats(session, identity)
    repos = await _fetch_repos(session, identity)
    heatmap = await _build_heatmap(session, identity)
    agent_fleet = await _fetch_agent_fleet(session, identity)
    badges = await _compute_provenance_badges(session, identity, stats)
    footprint = await _fetch_symbol_footprint(session, identity)

    activity_canvas_raw = await build_activity_canvas(session, handle)
    activity_canvas = [
        {
            "domain": dom.domain,
            # Chunk the flat grid into consecutive 7-element slices.
            "weeks": [dom.grid[i: i + 7] for i in range(0, len(dom.grid), 7)],
            "peak": dom.peak,
            "total": dom.total,
        }
        for dom in activity_canvas_raw
    ]

    snapshot_data = {
        "stats": stats,
        "repos": repos,
        "heatmap": heatmap,
        "agent_fleet": agent_fleet,
        "badges": badges,
        "footprint": footprint,
        "activity_canvas": activity_canvas,
    }
    # Serialize once and reuse the text for both the INSERT values and the
    # ON CONFLICT update. ``default=str`` stringifies any value json cannot
    # encode natively.
    data_json = json.dumps(snapshot_data, default=str)

    from musehub.db.musehub_identity_models import MusehubProfileSnapshot
    now = datetime.now(tz=timezone.utc)
    stmt = (
        pg_insert(MusehubProfileSnapshot)
        .values(
            handle=handle,
            data_json=data_json,
            computed_at=now,
            is_stale=False,
        )
        .on_conflict_do_update(
            index_elements=["handle"],
            set_={
                "data_json": data_json,
                "computed_at": now,
                "is_stale": False,
            },
        )
    )
    await session.execute(stmt)
    logger.info("profile.snapshot: persisted snapshot for handle=%s", handle)
469
470 # ---------------------------------------------------------------------------
471 # Shared commit-walk helper
472 # ---------------------------------------------------------------------------
473
async def _load_commit_walk(
    session: AsyncSession,
    repo_id: str,
    ref: str,
    max_walk: int,
) -> list[str]:
    """Return the commit IDs reachable from ``ref``, capped at ``max_walk``.

    All commits linked to the repo are loaded in a single query and turned
    into a commit → parents map. The traversal itself is performed by
    ``muse.core.graph.walk_dag`` (documented as BFS), and the IDs come back
    in the order it yields them. IDs that have no row for this repo are
    dropped from the result. Returns ``[]`` when the repo has no commits.
    """
    import sqlalchemy as sa
    from muse.core.graph import walk_dag

    commit_query = (
        sa.select(
            MusehubCommit.commit_id,
            MusehubCommit.parent_ids,
        )
        .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(MusehubCommitRef.repo_id == repo_id)
    )
    commit_rows = await session.execute(commit_query)

    parents_by_commit: dict[str, list[str]] = {}
    for row in commit_rows:
        # A NULL parent list is treated as "no parents".
        parents_by_commit[row.commit_id] = row.parent_ids or []
    if not parents_by_commit:
        return []

    def _parents_of(commit_id: str) -> list[str]:
        # Unknown commits have no parents; falsy parent IDs are skipped.
        return [parent for parent in parents_by_commit.get(commit_id, []) if parent]

    reachable = walk_dag(ref, _parents_of, max_nodes=max_walk)
    return [commit_id for commit_id in reachable if commit_id in parents_by_commit]
511
512
513 # ---------------------------------------------------------------------------
514 # Code intel providers — one per normalized intel table
515 # ---------------------------------------------------------------------------
516
class CouplingProvider:
    """Persist co-changing file pairs mined from ``musehub_symbol_history_entries``.

    Intended to mirror ``muse code coupling``: same commit walk, same
    mass-commit exclusion, same minimum co-change threshold.

    This provider works at file level (EntangleProvider is the symbol-level
    counterpart). The file of a history entry is the part of its address
    before the first ``"::"``; an address without ``"::"`` is used as the
    file name as-is.

    Algorithm
    ---------
    1. Walk the commits reachable from ``ref`` (at most ``_MAX_WALK``).
    2. Fetch every history entry of the repo and keep those whose commit is
    in the walk.
    3. Per commit, collect the distinct set of files touched.
    4. Ignore commits touching more than ``_MAX_FILES_PER_COMMIT`` files.
    5. For each unordered file pair (a < b) of a remaining commit, increment
    that pair's co-change counter.
    6. Keep pairs with at least ``_MIN_CO_CHANGES`` co-changes, order them by
    co-changes descending, and keep the first ``_MAX_PAIRS``.
    7. Delete the repo's existing coupling rows and write the fresh set.

    Constants
    ---------
    _MAX_WALK = 10_000 commit walk cap
    _MAX_FILES_PER_COMMIT = 200 mass-commit guard
    _MAX_PAIRS = 200 stored leaderboard size
    _MIN_CO_CHANGES = 2 noise floor
    """

    _MAX_WALK = 10_000
    _MAX_FILES_PER_COMMIT = 200
    _MAX_PAIRS = 200
    _MIN_CO_CHANGES = 2

    async def compute(
        self, session: AsyncSession, repo_id: str, ref: str, payload: JSONObject
    ) -> IntelResults:
        """Recompute and store the file-coupling leaderboard for ``repo_id``.

        Returns ``[]`` without touching the table when the walk is empty or
        no file pair co-occurs in any qualifying commit. Otherwise returns a
        single ``intel.code.coupling`` summary with the stored pair count,
        the number of commits walked and whether the list was truncated.
        ``payload`` is unused.
        """
        import sqlalchemy as sa
        from collections import defaultdict

        # ── Commit walk ───────────────────────────────────────────────────────
        reachable = await _load_commit_walk(session, repo_id, ref, self._MAX_WALK)
        if not reachable:
            return []
        reachable_ids = set(reachable)

        # ── History entries → files touched per commit ────────────────────────
        history_rows = await session.execute(
            sa.select(
                MusehubSymbolHistoryEntry.commit_id,
                MusehubSymbolHistoryEntry.address,
            ).where(MusehubSymbolHistoryEntry.repo_id == repo_id)
        )
        files_by_commit: dict[str, set[str]] = defaultdict(set)
        for commit_id, address in history_rows:
            if commit_id in reachable_ids:
                # Text before the first "::" (the whole address if absent).
                path = address.partition("::")[0]
                if path:
                    files_by_commit[commit_id].add(path)

        # ── Co-change counts per unordered file pair ──────────────────────────
        co_change_counts: dict[tuple[str, str], int] = defaultdict(int)
        for touched in files_by_commit.values():
            if len(touched) > self._MAX_FILES_PER_COMMIT:
                continue
            ordered = sorted(touched)
            for position, first in enumerate(ordered):
                for second in ordered[position + 1:]:
                    co_change_counts[(first, second)] += 1

        if not co_change_counts:
            return []

        # ── Filter, rank, truncate ────────────────────────────────────────────
        ranked = sorted(
            (
                {"file_a": first, "file_b": second, "co_changes": hits}
                for (first, second), hits in co_change_counts.items()
                if hits >= self._MIN_CO_CHANGES
            ),
            key=lambda entry: entry["co_changes"],
            reverse=True,
        )
        truncated = len(ranked) > self._MAX_PAIRS
        kept = ranked[: self._MAX_PAIRS]

        # ── Replace the stored rows ───────────────────────────────────────────
        await session.execute(
            sa.delete(MusehubIntelCoupling).where(
                MusehubIntelCoupling.repo_id == repo_id
            )
        )
        for entry in kept:
            upsert = (
                pg_insert(MusehubIntelCoupling)
                .values(
                    repo_id=repo_id,
                    file_a=entry["file_a"],
                    file_b=entry["file_b"],
                    co_changes=entry["co_changes"],
                    ref=ref,
                )
                .on_conflict_do_update(
                    index_elements=["repo_id", "file_a", "file_b"],
                    set_={"co_changes": entry["co_changes"], "ref": ref},
                )
            )
            await session.execute(upsert)

        summary = {
            "count": len(kept),
            "commits_analysed": len(reachable),
            "truncated": truncated,
        }
        return [("intel.code.coupling", summary)]
634
class EntangleProvider:
    """Persist symbol co-change pairs by mining musehub_symbol_history_entries.

    Intended to mirror ``muse code entangle``: Jaccard-based co-change rate,
    mass-commit exclusion and import-symbol filter.

    Algorithm
    ---------
    1. Walk commits reachable from ``ref`` (cap ``_MAX_WALK``).
    2. Bulk-fetch all history entries for this repo; group by commit_id,
    dropping bare-path addresses (no ``::``) and import pseudo-symbols
    (``::import::`` in address).
    3. Skip commits touching > ``_MAX_SYMBOLS_PER_COMMIT`` distinct symbols.
    4. For each qualifying commit, accumulate:
    - the number of qualifying commits touching each symbol
    - pair_co_changes[(a, b)] → count of commits where BOTH appeared
    (pair key is always canonical: a < b lexicographically)
    5. Filter: co_changes >= ``_MIN_CO_CHANGES``, file_a != file_b.
    6. co_change_rate = co_changes / |commits_a ∪ commits_b| (Jaccard index).
    7. Sort by (rate DESC, co_changes DESC); store top ``_MAX_PAIRS``.
    8. DELETE stale rows then upsert the fresh set.

    Both the per-symbol counts and the pair counts are taken over the same
    set of qualifying commits, so ``count_a + count_b - co_changes`` is
    exactly the size of the union.
    """

    _MAX_WALK = 10_000
    _MAX_SYMBOLS_PER_COMMIT = 500
    _MAX_PAIRS = 500
    _MIN_CO_CHANGES = 2

    @classmethod
    def _rank_pairs(cls, commit_symbols: dict[str, set[str]]) -> list[dict] | None:
        """Score and rank symbol pairs from a commit → symbols map (pure).

        Returns ``None`` when no two symbols co-occur in any qualifying
        commit; otherwise the full list of cross-file pairs that meet the
        co-change floor, ordered by (rate DESC, co_changes DESC). The list
        is not truncated here.
        """
        from collections import defaultdict

        touch_counts: dict[str, int] = defaultdict(int)
        pair_co_changes: dict[tuple[str, str], int] = defaultdict(int)
        for symbols in commit_symbols.values():
            # Mass commits are excluded from BOTH tallies; counting them only
            # on the per-symbol side would inflate the union and deflate the
            # rate.
            if len(symbols) > cls._MAX_SYMBOLS_PER_COMMIT:
                continue
            ordered = sorted(symbols)
            for position, first in enumerate(ordered):
                touch_counts[first] += 1
                for second in ordered[position + 1:]:
                    pair_co_changes[(first, second)] += 1

        if not pair_co_changes:
            return None

        pairs: list[dict] = []
        for (a, b), co_changes in pair_co_changes.items():
            if co_changes < cls._MIN_CO_CHANGES:
                continue
            file_a = a.split("::")[0]
            file_b = b.split("::")[0]
            if file_a == file_b:
                continue
            # |A ∪ B| = |A| + |B| - |A ∩ B|; always >= co_changes > 0 here.
            union_count = touch_counts[a] + touch_counts[b] - co_changes
            pairs.append({
                "symbol_a": a,
                "symbol_b": b,
                "file_a": file_a,
                "file_b": file_b,
                "co_changes": co_changes,
                "commits_both_active": union_count,
                "co_change_rate": round(co_changes / union_count, 6),
                "same_file": False,
                "structurally_linked": False,
                "a_in_test": "test" in file_a.lower(),
                "b_in_test": "test" in file_b.lower(),
            })

        pairs.sort(key=lambda p: (-p["co_change_rate"], -p["co_changes"]))
        return pairs

    async def compute(
        self, session: AsyncSession, repo_id: str, ref: str, payload: JSONObject
    ) -> IntelResults:
        """Recompute and store the symbol entanglement leaderboard.

        Returns ``[]`` without touching the table when the walk is empty or
        no symbol pair co-occurs in a qualifying commit. Otherwise returns a
        single ``intel.code.entangle`` summary with the stored pair count,
        the number of commits walked and whether the list was truncated.
        ``payload`` is unused.
        """
        import sqlalchemy as sa
        from collections import defaultdict

        # ── Step 1: commit walk from ref ──────────────────────────────────────
        walk_order = await _load_commit_walk(session, repo_id, ref, self._MAX_WALK)
        if not walk_order:
            return []

        walk_set = set(walk_order)

        # ── Step 2: bulk-fetch all history entries for this repo ──────────────
        history_result = await session.execute(
            sa.select(
                MusehubSymbolHistoryEntry.commit_id,
                MusehubSymbolHistoryEntry.address,
            ).where(MusehubSymbolHistoryEntry.repo_id == repo_id)
        )

        commit_symbols: dict[str, set[str]] = defaultdict(set)
        for cid, addr in history_result:
            if cid not in walk_set:
                continue
            if "::" not in addr or "::import::" in addr:
                continue
            commit_symbols[cid].add(addr)

        # ── Steps 3-4: score, filter, sort ────────────────────────────────────
        ranked = self._rank_pairs(commit_symbols)
        if ranked is None:
            return []
        truncated = len(ranked) > self._MAX_PAIRS
        pairs = ranked[: self._MAX_PAIRS]

        # ── Step 5: delete stale rows, upsert fresh set ───────────────────────
        await session.execute(
            sa.delete(MusehubIntelEntangle).where(
                MusehubIntelEntangle.repo_id == repo_id
            )
        )
        for p in pairs:
            # Every pair key is a column; the conflict update rewrites all of
            # them except the two that belong to the conflict target.
            update_cols = {
                key: value
                for key, value in p.items()
                if key not in ("symbol_a", "symbol_b")
            }
            update_cols["ref"] = ref
            await session.execute(
                pg_insert(MusehubIntelEntangle)
                .values(repo_id=repo_id, ref=ref, **p)
                .on_conflict_do_update(
                    index_elements=["repo_id", "symbol_a", "symbol_b"],
                    set_=update_cols,
                )
            )

        return [("intel.code.entangle", {
            "count": len(pairs),
            "commits_analysed": len(walk_order),
            "truncated": truncated,
        })]
786
# Symbol kinds considered when looking for dead code.
_DEAD_TRACKED_KINDS = frozenset(
    {"function", "async_function", "method", "async_method", "class"}
)

# Human-readable explanation stored alongside each confidence level.
_DEAD_REASONS: dict[str, str] = {
    "high": "Added once, never modified. Zero blast radius in full history.",
    "medium": "Modified in past but zero blast radius for ≥ 30 days.",
    "low": "Zero blast radius. Recently active — verify before deleting.",
}


class DeadProvider:
    """Persist dead-code candidates derived from ``musehub_symbol_intel``.

    A candidate is a symbol of a tracked kind whose ``blast`` is 0. Each
    candidate is upserted into ``musehub_intel_dead`` keyed on
    ``(repo_id, address)`` with a confidence level and matching reason.
    """

    @staticmethod
    def _confidence(churn: int | None, churn_30d: int | None) -> str:
        """Classify a zero-blast symbol as ``high`` / ``medium`` / ``low``.

        ``high`` — total churn is exactly 1
        ``medium`` — no churn in the last 30 days (0 or missing)
        ``low`` — anything else (recent activity)

        A missing ``churn_30d`` is treated as 0, the same way
        BlastRiskProvider reads that column.
        """
        if churn == 1:
            return "high"
        if not churn_30d:
            return "medium"
        return "low"

    async def compute(
        self, session: AsyncSession, repo_id: str, ref: str, payload: JSONObject
    ) -> IntelResults:
        """Upsert one dead-code row per candidate symbol of ``repo_id``.

        Returns ``[("intel.code.dead", {"count": N})]`` where N is the number
        of rows upserted, or ``[]`` when there are no candidates.
        ``payload`` is unused.
        """
        import sqlalchemy as sa

        rows_result = await session.execute(
            sa.select(MusehubSymbolIntel).where(
                MusehubSymbolIntel.repo_id == repo_id,
                MusehubSymbolIntel.symbol_kind.in_(_DEAD_TRACKED_KINDS),
                MusehubSymbolIntel.blast == 0,
            )
        )
        rows = rows_result.scalars().all()
        if not rows:
            return []

        count = 0
        for row in rows:
            confidence = self._confidence(row.churn, row.churn_30d)
            kind = row.symbol_kind or "unknown"
            reason = _DEAD_REASONS[confidence]

            stmt = (
                pg_insert(MusehubIntelDead)
                .values(
                    repo_id=repo_id,
                    address=row.address,
                    kind=kind,
                    confidence=confidence,
                    reason=reason,
                    ref=ref,
                    dismissed=False,
                )
                .on_conflict_do_update(
                    index_elements=["repo_id", "address"],
                    set_={
                        "kind": kind,
                        "confidence": confidence,
                        "reason": reason,
                        "ref": ref,
                        # Refers to the existing row's column, so a prior
                        # dismissal survives the refresh; NULL becomes False.
                        "dismissed": sa.func.coalesce(
                            MusehubIntelDead.__table__.c.dismissed,
                            False,
                        ),
                    },
                )
            )
            await session.execute(stmt)
            count += 1

        return [("intel.code.dead", {"count": count})]
854
855 _BLAST_RISK_TRACKED_KINDS = frozenset(
856 {"function", "async_function", "method", "async_method", "class"}
857 )
858
859 # Weights must sum to 100.
860 _BLAST_RISK_WEIGHTS = {
861 "impact": 40,
862 "churn": 25,
863 "test_gap": 20,
864 "coupling": 15,
865 }
866
867 def _compute_risk_score(
868 *,
869 impact_score: float,
870 churn_score: float,
871 test_gap_score: float,
872 coupling_score: float,
873 ) -> int:
874 """Return composite risk score 0–100 from four normalized sub-scores.
875
876 Each sub-score must be in [0.0, 1.0]. The result is clamped to [0, 100]
877 so overflow inputs (e.g. blast=9999) never produce an out-of-range value.
878
879 Weights: impact=40, churn=25, test_gap=20, coupling=15.
880 """
881 raw = (
882 impact_score * _BLAST_RISK_WEIGHTS["impact"] +
883 churn_score * _BLAST_RISK_WEIGHTS["churn"] +
884 test_gap_score * _BLAST_RISK_WEIGHTS["test_gap"] +
885 coupling_score * _BLAST_RISK_WEIGHTS["coupling"]
886 )
887 return min(100, max(0, round(raw)))
888
889 def _risk_tier(risk_score: int) -> str:
890 """Map a composite risk score to a named tier.
891
892 critical → score >= 75
893 high → score >= 50
894 medium → score >= 25
895 low → score < 25
896 """
897 if risk_score >= 75:
898 return "critical"
899 if risk_score >= 50:
900 return "high"
901 if risk_score >= 25:
902 return "medium"
903 return "low"
904
class BlastRiskProvider:
    """Persist composite pre-release risk scores derived from ``musehub_symbol_intel``.

    Scores are derived entirely from stored rows, so no local muse
    repository or subprocess is involved.

    Data source
    -----------
    ``musehub_symbol_intel`` columns:
    - ``blast`` → ``impact_score`` = min(blast / 50, 1)
    - ``churn_30d`` → ``churn_score`` = min(churn_30d / 20, 1), NULL as 0
    - ``blast_cross`` → ``coupling_score`` = min(blast_cross / 10, 1), NULL as 0
    - ``test_gap_score`` is fixed at 1.0 (no coverage data yet)

    Only tracked kinds (function / async_function / method / async_method /
    class) with ``blast > 0`` are scored.

    Output
    ------
    Upserts into ``musehub_intel_blast_risk`` keyed on (repo_id, address) and
    returns ``[("intel.code.blast_risk", {"count": N})]``.
    """

    async def compute(
        self, session: AsyncSession, repo_id: str, ref: str, payload: JSONObject
    ) -> IntelResults:
        """Score every candidate symbol of ``repo_id`` and upsert the result.

        Parameters
        ----------
        session: Async SQLAlchemy session.
        repo_id: Target repository ID.
        ref: Ref that triggered this run (stored on every row).
        payload: Raw job payload (unused — kept for protocol compatibility).

        Returns
        -------
        ``[("intel.code.blast_risk", {"count": N})]`` with N the number of
        symbols upserted, or ``[]`` when no candidate exists.

        Side effects
        ------------
        Upserts rows in ``musehub_intel_blast_risk``; ``musehub_symbol_intel``
        is only read.
        """
        import sqlalchemy as sa

        candidates_query = (
            sa.select(MusehubSymbolIntel)
            .where(
                MusehubSymbolIntel.repo_id == repo_id,
                MusehubSymbolIntel.symbol_kind.in_(_BLAST_RISK_TRACKED_KINDS),
                MusehubSymbolIntel.blast > 0,
            )
            # Refresh objects already present in the session's identity map.
            .execution_options(populate_existing=True)
        )
        candidates = (await session.execute(candidates_query)).scalars().all()
        if not candidates:
            return []

        upserted = 0
        for symbol in candidates:
            impact = min(symbol.blast / 50.0, 1.0)
            churn = min((symbol.churn_30d or 0) / 20.0, 1.0)
            test_gap = 1.0  # no coverage data yet — worst case
            coupling = min((symbol.blast_cross or 0) / 10.0, 1.0)

            score = _compute_risk_score(
                impact_score=impact,
                churn_score=churn,
                test_gap_score=test_gap,
                coupling_score=coupling,
            )
            # Columns written on insert and overwritten on conflict alike.
            refreshed = {
                "kind": symbol.symbol_kind or "unknown",
                "risk": _risk_tier(score),
                "risk_score": score,
                "impact_score": impact,
                "churn_score": churn,
                "test_gap_score": test_gap,
                "coupling_score": coupling,
                "ref": ref,
            }
            upsert = (
                pg_insert(MusehubIntelBlastRisk)
                .values(repo_id=repo_id, address=symbol.address, **refreshed)
                .on_conflict_do_update(
                    index_elements=["repo_id", "address"],
                    set_=refreshed,
                )
            )
            await session.execute(upsert)
            upserted += 1

        return [("intel.code.blast_risk", {"count": upserted})]
1011
1012 def _days_stable_from_dt(last_changed: datetime | None) -> int:
1013 """Return whole days elapsed since ``last_changed`` as of now (UTC).
1014
1015 Parameters
1016 ----------
1017 last_changed:
1018 UTC-aware or naive datetime of the symbol's last modification.
1019 Naive datetimes are treated as UTC. ``None`` returns 0.
1020
1021 Returns
1022 -------
1023 Non-negative integer — days elapsed, clamped to zero (future timestamps
1024 return 0 rather than a negative value).
1025
1026 Examples
1027 --------
1028 >>> from datetime import datetime, timezone, timedelta
1029 >>> _days_stable_from_dt(datetime.now(timezone.utc) - timedelta(days=90))
1030 90
1031 >>> _days_stable_from_dt(None)
1032 0
1033 """
1034 if last_changed is None:
1035 return 0
1036 now = datetime.now(timezone.utc)
1037 if last_changed.tzinfo is None:
1038 last_changed = last_changed.replace(tzinfo=timezone.utc)
1039 delta = now - last_changed
1040 return max(0, delta.days)
1041
class StableProvider:
    """Persist stability records derived directly from ``musehub_symbol_intel``.

    A symbol is *stable* when it has not changed in the last 30 or 90 days
    (``churn_30d == 0`` and ``churn_90d == 0``) and has a known ``last_changed``
    timestamp. ``days_stable`` is calendar days elapsed since ``last_changed``.
    ``since_start`` is True when the symbol's lifetime ``churn`` is zero.
    """

    # Rows per INSERT ... ON CONFLICT statement; same batch size the other
    # chunked providers in this module use.
    _CHUNK = 1_000

    # Columns overwritten when a (repo_id, address) row already exists.
    _UPDATE_COLUMNS = (
        "days_stable",
        "since_start",
        "last_changed_commit",
        "symbol_kind",
        "ref",
    )

    async def compute(
        self, session: AsyncSession, repo_id: str, ref: str, payload: JSONObject
    ) -> IntelResults:
        """Upsert one stability row per qualifying symbol of ``repo_id``.

        Parameters
        ----------
        session: Async SQLAlchemy session.
        repo_id: Target repository ID.
        ref:     Ref that triggered this run (stored per-row).
        payload: Raw job payload (not read by this provider).

        Returns
        -------
        ``[("intel.code.stable", {"count": N, "truncated": False})]`` where N
        is the number of distinct symbol addresses upserted, or ``[]`` when no
        symbol qualifies.

        Side effects
        ------------
        Upserts rows in ``musehub_intel_stable`` in batches of ``_CHUNK``.
        """
        result = await session.execute(
            sa.select(MusehubSymbolIntel).where(
                MusehubSymbolIntel.repo_id == repo_id,
                MusehubSymbolIntel.churn_30d == 0,
                MusehubSymbolIntel.churn_90d == 0,
                MusehubSymbolIntel.last_changed.is_not(None),
            )
        )
        qualifying = result.scalars().all()
        if not qualifying:
            return []

        # Keyed by address so a repeated address cannot appear twice in one
        # multi-row INSERT (PostgreSQL rejects ON CONFLICT DO UPDATE touching
        # the same row twice). The last occurrence wins, matching what
        # sequential single-row upserts would have produced.
        by_address: dict[str, dict] = {}
        for sym in qualifying:
            by_address[sym.address] = {
                "repo_id": repo_id,
                "address": sym.address,
                "days_stable": _days_stable_from_dt(sym.last_changed),
                "since_start": sym.churn == 0,
                "last_changed_commit": sym.last_commit_id,
                "symbol_kind": sym.symbol_kind,
                "ref": ref,
            }

        rows = list(by_address.values())
        # ``excluded`` refers to the row proposed for insertion, so one
        # statement can update every conflicting row with its own new values.
        on_conflict_set = {
            column: sa.literal_column(f"excluded.{column}")
            for column in self._UPDATE_COLUMNS
        }
        for start in range(0, len(rows), self._CHUNK):
            stmt = (
                pg_insert(MusehubIntelStable)
                .values(rows[start : start + self._CHUNK])
                .on_conflict_do_update(
                    index_elements=["repo_id", "address"],
                    set_=on_conflict_set,
                )
            )
            await session.execute(stmt)

        return [("intel.code.stable", {"count": len(rows), "truncated": False})]
1098
class VelocityProvider:
    """Persist module growth velocity by mining musehub_symbol_history_entries.

    Mirrors ``muse code velocity`` exactly — same BFS commit walk, same
    two-window (current / prior) structure, same per-module metrics.

    Algorithm
    ---------
    1. Load the commit walk from HEAD ``ref`` via ``_load_commit_walk``,
       capped at _MAX_WALK commits.
    2. Split walk_order into current window (0.._WINDOW-1) and prior window
       (_WINDOW..2*_WINDOW-1).
    3. Fetch the history entries of the commits in those two windows.
    4. Per-commit per-module: accumulate added / removed / modified counts.
       Module is derived as the directory of the file component of each address.
    5. active_commits   = distinct commits in window that touched the module.
       stagnant_commits = commits in window where module's commit-level net == 0.
    6. acceleration = current_net - prior_net.
    7. Sort by active_commits DESC, module name ASC as tie-break; truncate to
       _TOP; DELETE stale; upsert fresh.

    Constants
    ---------
    _MAX_WALK = 10_000   BFS depth cap
    _WINDOW   = 20       commits per analysis window (current + prior each)
    _TOP      = 20       stored leaderboard size
    """

    _MAX_WALK = 10_000
    _WINDOW = 20
    _TOP = 20

    async def compute(
        self, session: AsyncSession, repo_id: str, ref: str, payload: JSONObject
    ) -> IntelResults:
        """Recompute and replace the velocity leaderboard for ``repo_id``.

        Parameters
        ----------
        session: Async SQLAlchemy session.
        repo_id: Target repository ID.
        ref:     Ref the commit walk starts from (also stored per-row).
        payload: Raw job payload (not read by this provider).

        Returns
        -------
        ``[("intel.code.velocity", {"count", "commits_analysed", "truncated"})]``,
        or ``[]`` when the commit walk is empty or no module was touched in
        either window.

        Side effects
        ------------
        Deletes every ``musehub_intel_velocity`` row of the repo, then inserts
        the fresh leaderboard (at most ``_TOP`` rows).
        """
        from collections import defaultdict

        # ── Step 1: commit walk from HEAD, split into two windows ─────────────
        walk_order = await _load_commit_walk(session, repo_id, ref, self._MAX_WALK)
        if not walk_order:
            return []

        current_window = set(walk_order[: self._WINDOW])
        prior_window = set(walk_order[self._WINDOW : self._WINDOW * 2])
        combined_window = current_window | prior_window

        # ── Step 2: fetch history entries for the windowed commits only ───────
        # Only the (at most 2 * _WINDOW) windowed commits contribute to the
        # result, so the filter is applied in SQL instead of loading the
        # repo's entire symbol history.
        history_result = await session.execute(
            sa.select(
                MusehubSymbolHistoryEntry.commit_id,
                MusehubSymbolHistoryEntry.address,
                MusehubSymbolHistoryEntry.op,
            ).where(
                MusehubSymbolHistoryEntry.repo_id == repo_id,
                MusehubSymbolHistoryEntry.commit_id.in_(list(combined_window)),
            )
        )

        def _module(addr: str) -> str:
            # Address shape is "<file>::<symbol>"; the module is the file's
            # directory with a trailing slash. A file without any directory
            # component maps to "<file>/".
            file = addr.split("::")[0] if "::" in addr else addr
            if "/" in file:
                return f"{file.rsplit('/', 1)[0]}/"
            return f"{file}/"

        # per_commit_module[(cid, module)] = {added, removed, modified}
        per_commit_module: dict[tuple[str, str], dict[str, int]] = defaultdict(
            lambda: {"added": 0, "removed": 0, "modified": 0}
        )
        for cid, addr, op in history_result:
            # The membership re-check is cheap and keeps the aggregation
            # correct even if the query hands back rows outside the windows.
            if cid not in combined_window or not addr:
                continue
            bucket = per_commit_module[(cid, _module(addr))]
            if op == "add":
                bucket["added"] += 1
            elif op == "delete":
                bucket["removed"] += 1
            else:  # modify, rename
                bucket["modified"] += 1

        # ── Step 3: aggregate per-module per-window ───────────────────────────
        def _blank() -> IntDict:
            return {"added": 0, "removed": 0, "modified": 0, "net": 0,
                    "active_commits": 0, "stagnant_commits": 0}

        current_stats: ModuleStatsMap = defaultdict(_blank)
        prior_stats: ModuleStatsMap = defaultdict(_blank)

        for (cid, mod), counts in per_commit_module.items():
            commit_net = counts["added"] - counts["removed"]
            target = current_stats[mod] if cid in current_window else prior_stats[mod]
            target["added"] += counts["added"]
            target["removed"] += counts["removed"]
            target["modified"] += counts["modified"]
            target["net"] += commit_net
            # Each (commit, module) key is unique, so this counts distinct commits.
            target["active_commits"] += 1
            if commit_net == 0:
                target["stagnant_commits"] += 1

        all_modules = set(current_stats) | set(prior_stats)
        if not all_modules:
            return []

        # ── Step 4: build final records ───────────────────────────────────────
        records = []
        for mod in all_modules:
            # Indexing the defaultdicts yields an all-zero entry for a module
            # that was only touched in the other window.
            cur = current_stats[mod]
            pri = prior_stats[mod]
            records.append({
                "module": mod,
                "added": cur["added"],
                "removed": cur["removed"],
                "modified": cur["modified"],
                "net": cur["net"],
                "active_commits": cur["active_commits"],
                "stagnant_commits": cur["stagnant_commits"],
                "prior_added": pri["added"],
                "prior_net": pri["net"],
                "prior_modified": pri["modified"],
                "prior_active_commits": pri["active_commits"],
                "acceleration": float(cur["net"] - pri["net"]),
                "window_size": self._WINDOW,
                "commits_analysed": len(walk_order),
            })

        # ``all_modules`` is a set of strings, whose iteration order varies
        # between processes. Breaking ties on the module name makes both the
        # ordering and the _TOP cut-off reproducible from run to run.
        records.sort(key=lambda r: (-r["active_commits"], r["module"]))
        truncated = len(records) > self._TOP
        records = records[: self._TOP]

        # ── Step 5: delete stale rows, upsert fresh set ───────────────────────
        await session.execute(
            sa.delete(MusehubIntelVelocity).where(
                MusehubIntelVelocity.repo_id == repo_id
            )
        )
        for r in records:
            # Everything except the key column is both inserted and, on
            # conflict, overwritten — so a single dict serves both clauses.
            metrics = {name: value for name, value in r.items() if name != "module"}
            metrics["ref"] = ref
            await session.execute(
                pg_insert(MusehubIntelVelocity)
                .values(repo_id=repo_id, module=r["module"], **metrics)
                .on_conflict_do_update(
                    index_elements=["repo_id", "module"],
                    set_=metrics,
                )
            )

        return [("intel.code.velocity", {
            "count": len(records),
            "commits_analysed": len(walk_order),
            "truncated": truncated,
        })]
1278
1279 _EXT_TO_LANGUAGE: dict[str, str] = {
1280 ".py": "Python", ".js": "JavaScript", ".ts": "TypeScript",
1281 ".tsx": "TypeScript", ".jsx": "JavaScript", ".rs": "Rust",
1282 ".go": "Go", ".rb": "Ruby", ".java": "Java", ".kt": "Kotlin",
1283 ".swift": "Swift", ".cpp": "C++", ".c": "C", ".cs": "C#",
1284 ".php": "PHP", ".md": "Markdown", ".toml": "TOML", ".yaml": "YAML",
1285 ".yml": "YAML", ".json": "JSON", ".sh": "Shell", ".sql": "SQL",
1286 }
1287
1288 def _lang_from_address(address: str) -> str:
1289 """Infer language from the file portion of a symbol address."""
1290 file_part = address.split("::")[0] if "::" in address else address
1291 ext = f".{file_part.rsplit('.', 1)[-1]}" if "." in file_part else ""
1292 return _EXT_TO_LANGUAGE.get(ext.lower(), "Unknown")
1293
class ClonesProvider:
    """Persist exact duplicate-code clusters derived from musehub_hash_occurrence_entries.

    Reads ``musehub_hash_occurrence_entries`` — populated by ``CodeProvider``
    on every push — and groups addresses by ``content_id``. Any content_id
    with 2+ distinct addresses is an exact-clone cluster. No subprocess or
    on-disk repo access required: all data comes from the DB.

    Near-clone detection (``signature_id`` grouping) requires ``signature_id``
    to be stored at push time, which is not yet implemented. All clusters
    produced here are tier ``"exact"``.
    """

    # Minimum number of distinct addresses sharing a content_id for the group
    # to count as a clone cluster.
    _MIN_CLUSTER = 2

    async def compute(
        self, session: AsyncSession, repo_id: str, ref: str, payload: JSONObject
    ) -> IntelResults:
        """Upsert one ``musehub_intel_clones`` row per exact-clone cluster.

        Parameters
        ----------
        session: Async SQLAlchemy session.
        repo_id: Target repository ID.
        ref:     Ref that triggered this run (stored per-row).
        payload: Raw job payload (not read by this provider).

        Returns
        -------
        ``[("intel.code.clones", {"count": N})]`` where N is the number of
        clusters upserted, or ``[]`` when the repo has no clone cluster.
        """
        occurrence = MusehubHashOccurrenceEntry

        # content_ids shared by at least _MIN_CLUSTER *distinct* addresses.
        # Counting distinct addresses (rather than rows) keeps repeated rows
        # for one address from being reported as a clone of itself.
        cloned_ids = (
            sa.select(occurrence.content_id)
            .where(occurrence.repo_id == repo_id)
            .group_by(occurrence.content_id)
            .having(
                sa.func.count(sa.distinct(occurrence.address)) >= self._MIN_CLUSTER
            )
            .scalar_subquery()
        )
        rows_result = await session.execute(
            sa.select(
                occurrence.content_id,
                occurrence.address,
                MusehubSymbolIntel.symbol_kind,
            )
            .outerjoin(
                MusehubSymbolIntel,
                sa.and_(
                    MusehubSymbolIntel.repo_id == occurrence.repo_id,
                    MusehubSymbolIntel.address == occurrence.address,
                ),
            )
            .where(
                occurrence.repo_id == repo_id,
                occurrence.content_id.in_(cloned_ids),
            )
            # Deterministic row order → deterministic ``members_json``.
            .order_by(occurrence.content_id, occurrence.address)
        )
        rows = rows_result.all()
        if not rows:
            return []

        # content_id → {address → member}; the inner dict drops repeated
        # addresses (first row wins) while preserving the sorted order.
        clusters: dict[str, dict[str, dict]] = {}
        for content_id, address, symbol_kind in rows:
            clusters.setdefault(content_id, {}).setdefault(address, {
                "address": address,
                "kind": symbol_kind or "unknown",
                "language": _lang_from_address(address),
                "body_hash": content_id,
                # No separate signature hash is stored yet (see class
                # docstring), so the content_id stands in for it.
                "signature_id": content_id,
                "content_id": content_id,
            })

        count = 0
        for content_id, by_address in clusters.items():
            members = list(by_address.values())
            if len(members) < self._MIN_CLUSTER:
                # Defensive: the HAVING clause should already guarantee this.
                continue
            # The cluster key is derived from the content_id alone, so the
            # same clone maps to the same row on every push.
            cluster_hash = blob_id(content_id.encode())
            members_json = json.dumps(members)
            stmt = (
                pg_insert(MusehubIntelClones)
                .values(
                    repo_id=repo_id,
                    cluster_hash=cluster_hash,
                    tier="exact",
                    member_count=len(members),
                    members_json=members_json,
                    ref=ref,
                )
                .on_conflict_do_update(
                    index_elements=["repo_id", "cluster_hash"],
                    set_={
                        "tier": "exact",
                        "member_count": len(members),
                        "members_json": members_json,
                        "ref": ref,
                    },
                )
            )
            await session.execute(stmt)
            count += 1

        return [("intel.code.clones", {"count": count})]
1388
class TypeProvider:
    """Persist per-symbol type-annotation health derived from stored object content.

    Reads the HEAD snapshot manifest from ``musehub_snapshots.manifest_blob``
    (msgpack dict of path → object_id), fetches each Python file's bytes via
    the storage backend, runs ``muse.core.type_analysis.extract_annotations``
    (pure AST — no subprocess, no on-disk repo checkout), then batch-upserts
    into ``musehub_intel_type`` in chunks of 1,000 rows.

    This follows the same DB-only pattern as every other intel provider:
    all data flows from objects stored at push time; nothing requires a
    working muse repository on disk.
    """

    # Rows per INSERT ... ON CONFLICT statement.
    _CHUNK = 1_000

    async def compute(
        self, session: AsyncSession, repo_id: str, ref: str, payload: JSONObject
    ) -> IntelResults:
        """Extract annotation metrics for every ``.py`` file and upsert them.

        Parameters
        ----------
        session: Async SQLAlchemy session.
        repo_id: Target repository ID.
        ref:     Ref that triggered this run (stored per-row).
        payload: Raw job payload; only the presence of ``owner`` and ``slug``
                 is consulted.

        Returns
        -------
        ``[("intel.code.type", {"count": N})]`` where N is the number of
        symbol rows upserted. ``[]`` when the repo, HEAD commit, snapshot
        manifest, Python files or annotated symbols cannot be found.
        """
        import msgpack
        from muse.core.type_analysis import extract_annotations

        # ── 1. Make sure the repo is known ───────────────────────────────────
        # owner/slug are not consumed further down. The lookup is retained
        # because it makes the provider return [] for a repo_id that has no
        # ``musehub_repos`` row when the payload does not carry owner/slug.
        if not (payload.get("owner") and payload.get("slug")):
            repo_lookup = await session.execute(
                select(MusehubRepo.owner, MusehubRepo.slug)
                .where(MusehubRepo.repo_id == repo_id)
                .limit(1)
            )
            if repo_lookup.first() is None:
                return []

        # ── 2. Fetch HEAD commit → snapshot manifest ─────────────────────────
        # NOTE(review): this picks the newest commit by timestamp among all
        # commit refs of the repo; ``ref`` is not part of the filter — confirm
        # that is the intended HEAD resolution.
        commit_row = await session.execute(
            select(MusehubCommit.snapshot_id)
            .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
            .where(MusehubCommitRef.repo_id == repo_id)
            .order_by(MusehubCommit.timestamp.desc())
            .limit(1)
        )
        commit_result = commit_row.first()
        if commit_result is None or commit_result[0] is None:
            return []
        snapshot_id: str = commit_result[0]

        snap_row = await session.execute(
            select(MusehubSnapshot.manifest_blob)
            .where(MusehubSnapshot.snapshot_id == snapshot_id)
            .limit(1)
        )
        snap_result = snap_row.first()
        if snap_result is None or snap_result[0] is None:
            return []

        manifest: dict[str, str] = msgpack.unpackb(bytes(snap_result[0]), raw=False)

        # ── 3. Extract type annotations from each Python file ─────────────────
        from musehub.storage.backends import read_object_bytes as _read_obj_bytes
        from musehub.types.compression import decompress_if_needed as _decompress
        from musehub.db.musehub_repo_models import MusehubObject as _MusehubObject

        py_entries = [(path, oid) for path, oid in manifest.items() if path.endswith(".py")]
        if not py_entries:
            return []

        rows: list[dict] = []
        for path, object_id in py_entries:
            obj_row = await session.get(_MusehubObject, object_id)
            if obj_row is None:
                continue
            raw = await _read_obj_bytes(obj_row, session=session)
            if not raw:
                continue
            src = _decompress(raw)
            try:
                ann_map = extract_annotations(src, path)
            except (SyntaxError, ValueError, RecursionError):
                # Best-effort per file, like the sibling providers: one
                # unparseable file must not abort the whole job. Besides
                # SyntaxError, parsing Python source can fail with ValueError
                # (e.g. NUL bytes or undecodable text on some interpreter
                # versions) or RecursionError (pathologically nested code).
                continue
            for address, ann in ann_map.items():
                rows.append({
                    "repo_id": repo_id,
                    "address": address,
                    "kind": ann.get("kind", "unknown"),
                    "return_is_any": ann.get("return_is_any", False),
                    "params_total": ann.get("params_total", 0),
                    "params_annotated": ann.get("params_annotated", 0),
                    "params_with_any": ann.get("params_with_any", 0),
                    "type_score": ann.get("type_score", 0.0),
                    # Truncate to 256 characters; an empty/missing annotation
                    # is stored as NULL.
                    "return_annotation": (ann.get("return_annotation") or "")[:256] or None,
                    "ref": ref,
                })

        if not rows:
            return []

        # ── 4. Batch-upsert in chunks of 1,000 ───────────────────────────────
        for i in range(0, len(rows), self._CHUNK):
            chunk = rows[i : i + self._CHUNK]
            stmt = (
                pg_insert(MusehubIntelType)
                .values(chunk)
                .on_conflict_do_update(
                    index_elements=["repo_id", "address"],
                    # ``excluded`` is the row proposed for insertion.
                    set_={
                        "kind": sa.literal_column("excluded.kind"),
                        "return_is_any": sa.literal_column("excluded.return_is_any"),
                        "params_total": sa.literal_column("excluded.params_total"),
                        "params_annotated": sa.literal_column("excluded.params_annotated"),
                        "params_with_any": sa.literal_column("excluded.params_with_any"),
                        "type_score": sa.literal_column("excluded.type_score"),
                        "return_annotation": sa.literal_column("excluded.return_annotation"),
                        "ref": sa.literal_column("excluded.ref"),
                    },
                )
            )
            await session.execute(stmt)

        return [("intel.code.type", {"count": len(rows)})]
1511
class ApiSurfaceProvider:
    """Record the public API surface of a repo from its stored snapshot objects.

    The HEAD snapshot manifest (``musehub_snapshots.manifest_blob``, a msgpack
    dict of path → object_id) is decoded, every listed object is read and
    decompressed, and ``parse_symbols`` is run on it (pure AST — no
    subprocess, no on-disk repo checkout). Symbols accepted by ``_is_public``
    — the same predicate as ``muse code api-surface`` — are batch-upserted
    into ``musehub_intel_api_surface`` in chunks of 1,000 rows.

    Like ``TypeProvider`` this is DB-only: all data flows from objects stored
    at push time; nothing requires a working muse repository on disk.

    Parameters
    ----------
    session:
        Active async SQLAlchemy session provided by the push worker.
    repo_id:
        Opaque repo identifier from ``musehub_repos.repo_id``.
    ref:
        Branch or commit ref string at push time (e.g. ``"dev"``); stored on
        every row.
    payload:
        Raw push-event JSON. Only the presence of ``owner`` and ``slug`` is
        consulted; when either is missing the repo row is looked up instead.

    Returns
    -------
    List of ``(event_name, metadata)`` tuples consumed by the telemetry layer.
    Returns ``[("intel.code.api_surface", {"count": N})]`` on success,
    ``[]`` when the snapshot manifest cannot be resolved or no public symbols
    are found.
    """

    _CHUNK = 1_000

    _PUBLIC_KINDS: frozenset[str] = frozenset({
        "function", "async_function", "class", "method", "async_method",
    })

    def _is_public(self, name: str, kind: str) -> bool:
        """True for a tracked kind whose last dotted name part has no leading ``_``."""
        if kind not in self._PUBLIC_KINDS:
            return False
        leaf = name.rpartition(".")[2]
        return not leaf.startswith("_")

    async def compute(
        self, session: AsyncSession, repo_id: str, ref: str, payload: JSONObject
    ) -> IntelResults:
        """Parse the HEAD snapshot and upsert one row per public symbol."""
        import msgpack

        # ── 1. Repo lookup when the payload lacks owner/slug ─────────────────
        # The values themselves are not used below; a missing repo row ends
        # the run with an empty result.
        have_identity = bool(payload.get("owner") and payload.get("slug"))
        if not have_identity:
            repo_lookup = await session.execute(
                select(MusehubRepo.owner, MusehubRepo.slug)
                .where(MusehubRepo.repo_id == repo_id)
                .limit(1)
            )
            if repo_lookup.first() is None:
                return []

        # ── 2. Newest commit → snapshot → manifest ───────────────────────────
        # NOTE(review): the newest commit by timestamp among all commit refs
        # of the repo is used; ``ref`` is not part of the filter — confirm
        # that is the intended HEAD resolution.
        head = (
            await session.execute(
                select(MusehubCommit.snapshot_id)
                .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
                .where(MusehubCommitRef.repo_id == repo_id)
                .order_by(MusehubCommit.timestamp.desc())
                .limit(1)
            )
        ).first()
        if head is None or head[0] is None:
            return []
        snapshot_id: str = head[0]

        snapshot = (
            await session.execute(
                select(MusehubSnapshot.manifest_blob)
                .where(MusehubSnapshot.snapshot_id == snapshot_id)
                .limit(1)
            )
        ).first()
        if snapshot is None or snapshot[0] is None:
            return []

        manifest: dict[str, str] = msgpack.unpackb(bytes(snapshot[0]), raw=False)

        # ── 3. Collect public symbols file by file ───────────────────────────
        from musehub.storage.backends import read_object_bytes as _read_obj_bytes
        from musehub.types.compression import decompress_if_needed as _decompress
        from musehub.db.musehub_repo_models import MusehubObject as _MusehubObject

        rows: list[dict] = []
        for file_path, object_id in manifest.items():
            stored = await session.get(_MusehubObject, object_id)
            if stored is None:
                continue
            blob = await _read_obj_bytes(stored, session=session)
            if not blob:
                continue
            source = _decompress(blob)
            try:
                symbols = parse_symbols(source, file_path)
            except Exception:
                # Best-effort: a file that cannot be parsed is skipped.
                continue
            rows.extend(
                {
                    "repo_id": repo_id,
                    "address": address,
                    "kind": rec["kind"],
                    "signature_id": rec.get("signature_id"),
                    "visibility": "public",
                    "ref": ref,
                }
                for address, rec in symbols.items()
                if self._is_public(rec["name"], rec["kind"])
            )

        if not rows:
            return []

        # ── 4. Upsert in batches of _CHUNK ───────────────────────────────────
        # ``excluded`` is the row proposed for insertion.
        overwrite = {
            "kind": sa.literal_column("excluded.kind"),
            "signature_id": sa.literal_column("excluded.signature_id"),
            "visibility": sa.literal_column("excluded.visibility"),
            "ref": sa.literal_column("excluded.ref"),
        }
        for offset in range(0, len(rows), self._CHUNK):
            batch = rows[offset : offset + self._CHUNK]
            await session.execute(
                pg_insert(MusehubIntelApiSurface)
                .values(batch)
                .on_conflict_do_update(
                    index_elements=["repo_id", "address"],
                    set_=overwrite,
                )
            )

        return [("intel.code.api_surface", {"count": len(rows)})]
1650
class LanguagesProvider:
    """Record the language composition of a repo from its stored snapshot objects.

    The HEAD snapshot manifest (``musehub_snapshots.manifest_blob``, a msgpack
    dict of path → object_id) is decoded and every path is classified with
    ``language_of()`` (pure extension map — no subprocess, no I/O). Each
    file's bytes are then fetched from the storage backend and passed to
    ``parse_symbols()`` (pure AST — no subprocess) to count symbols and
    per-kind totals per language. The aggregated rows are batch-upserted into
    ``musehub_intel_languages`` (language count is typically < 20 rows per
    repo, i.e. a single statement).

    Same DB-only pattern as ``TypeProvider`` and ``ApiSurfaceProvider``: all
    data flows from objects stored at push time; nothing requires a working
    muse repository on disk and no subprocess is ever spawned.

    Parameters
    ----------
    session : AsyncSession
        Active async SQLAlchemy session provided by the push worker.
    repo_id : str
        Opaque repo identifier from ``musehub_repos.repo_id``.
    ref : str
        Branch or commit ref string at push time (e.g. ``"dev"``); stored on
        every row.
    payload : JSONObject
        Raw push-event JSON. Only the presence of ``owner`` and ``slug`` is
        consulted; when either is missing the repo row is looked up instead.

    Returns
    -------
    IntelResults
        ``[("intel.code.languages", {"count": N})]`` on success where N is
        the number of distinct languages found. ``[]`` when the snapshot
        manifest cannot be resolved or the repo has no tracked files.

    Notes
    -----
    Import pseudo-symbols (``kind == "import"``) are excluded from counts to
    match the default behaviour of ``muse code languages``. ``pct`` is
    ``symbol_count / total_symbols * 100`` and is ``0.0`` for languages with
    no parseable symbols (assets, config, docs). Every language present in
    the manifest gets a row — not just those with symbols — so the GUI can
    display the full file-type composition of the repo.
    """

    _CHUNK = 1_000
    _IMPORT_KINDS: frozenset[str] = frozenset({"import"})

    async def compute(
        self, session: AsyncSession, repo_id: str, ref: str, payload: JSONObject
    ) -> IntelResults:
        """Aggregate per-language file and symbol counts and upsert them."""
        import msgpack

        # ── 1. Repo lookup when the payload lacks owner/slug ─────────────────
        # The values themselves are not used below; a missing repo row ends
        # the run with an empty result.
        have_identity = bool(payload.get("owner") and payload.get("slug"))
        if not have_identity:
            repo_lookup = await session.execute(
                select(MusehubRepo.owner, MusehubRepo.slug)
                .where(MusehubRepo.repo_id == repo_id)
                .limit(1)
            )
            if repo_lookup.first() is None:
                return []

        # ── 2. Newest commit → snapshot → manifest ───────────────────────────
        # NOTE(review): the newest commit by timestamp among all commit refs
        # of the repo is used; ``ref`` is not part of the filter — confirm
        # that is the intended HEAD resolution.
        head = (
            await session.execute(
                select(MusehubCommit.snapshot_id)
                .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
                .where(MusehubCommitRef.repo_id == repo_id)
                .order_by(MusehubCommit.timestamp.desc())
                .limit(1)
            )
        ).first()
        if head is None or head[0] is None:
            return []
        snapshot_id: str = head[0]

        snapshot = (
            await session.execute(
                select(MusehubSnapshot.manifest_blob)
                .where(MusehubSnapshot.snapshot_id == snapshot_id)
                .limit(1)
            )
        ).first()
        if snapshot is None or snapshot[0] is None:
            return []

        manifest: dict[str, str] = msgpack.unpackb(bytes(snapshot[0]), raw=False)
        if not manifest:
            return []

        # ── 3. Files per language (dict lookups only, no I/O) ────────────────
        file_counts: dict[str, int] = {}
        for path in manifest:
            language = language_of(path)
            file_counts.setdefault(language, 0)
            file_counts[language] += 1

        # ── 4. Symbols and kind breakdown per language ───────────────────────
        # NOTE(review): bytes come straight from ``backend.get`` here, while
        # TypeProvider / ApiSurfaceProvider go through ``read_object_bytes``
        # plus ``decompress_if_needed`` — confirm both paths yield the same
        # source bytes.
        backend = get_backend()
        symbol_counts: dict[str, int] = {}
        kind_counts: LangKindsMap = {}

        for path, object_id in manifest.items():
            source = await backend.get(object_id)
            if not source:
                continue
            try:
                symbols = parse_symbols(source, path)
            except Exception:
                # Best-effort: a file that cannot be parsed is skipped.
                continue
            language = language_of(path)
            per_kind = kind_counts.setdefault(language, {})
            for rec in symbols.values():
                kind: str = rec["kind"]
                if kind not in self._IMPORT_KINDS:
                    symbol_counts[language] = symbol_counts.get(language, 0) + 1
                    per_kind[kind] = per_kind.get(kind, 0) + 1

        # ── 5. Denominator for pct: symbols across all languages ─────────────
        total_symbols = sum(symbol_counts.values())

        # ── 6. One row per language seen in the manifest ─────────────────────
        rows = []
        for language, file_count in file_counts.items():
            symbol_count = symbol_counts.get(language, 0)
            if total_symbols > 0:
                pct = symbol_count / total_symbols * 100
            else:
                pct = 0.0
            rows.append({
                "repo_id": repo_id,
                "language": language,
                "file_count": file_count,
                "symbol_count": symbol_count,
                "pct": pct,
                # An empty or missing kind breakdown is stored as NULL.
                "kinds_json": kind_counts.get(language) or None,
                "ref": ref,
            })

        # ── 7. Upsert in batches of _CHUNK ───────────────────────────────────
        # ``excluded`` is the row proposed for insertion.
        overwrite = {
            "file_count": sa.literal_column("excluded.file_count"),
            "symbol_count": sa.literal_column("excluded.symbol_count"),
            "pct": sa.literal_column("excluded.pct"),
            "kinds_json": sa.literal_column("excluded.kinds_json"),
            "ref": sa.literal_column("excluded.ref"),
        }
        for offset in range(0, len(rows), self._CHUNK):
            batch = rows[offset : offset + self._CHUNK]
            await session.execute(
                pg_insert(MusehubIntelLanguages)
                .values(batch)
                .on_conflict_do_update(
                    index_elements=["repo_id", "language"],
                    set_=overwrite,
                )
            )

        return [("intel.code.languages", {"count": len(rows)})]
1810
class DetectRefactorProvider:
    """Persist refactoring events derived from snapshot diffs at push time.

    Algorithm
    ---------
    At each push the HEAD commit's snapshot is diffed against the first
    parent commit's snapshot using pure-Python symbol parsing — no subprocess,
    no on-disk muse repository required.

    For every symbol that appears in both HEAD and parent we compare:

    * ``body_hash``    — a change means the *implementation* changed
                         (kind="implementation").
    * ``signature_id`` — a change means the *signature* changed
                         (kind="signature").  Checked only when ``body_hash``
                         is unchanged to avoid double-counting.

    For symbols that are present in parent but absent in HEAD we check whether
    an identical ``body_hash`` exists at an address that is *new in HEAD*
    (i.e. an address the parent snapshot did not have):

    * Destination file differs → kind="move"   (same body, different file)
    * Destination file matches → kind="rename" (same body, different name)

    Addresses that already existed in the parent are never used as a
    destination: deleting one of two identical-bodied symbols is a deletion,
    not a move onto the surviving clone.

    Symbols that vanish with no qualifying body-hash match are silently
    dropped (deleted code produces no event).

    Each event is upserted on ``event_id`` (stable hash of
    ``repo_id:commit_id:address:kind``) so re-runs are idempotent.

    Data flow
    ---------
    push → job → this provider
        → HEAD commit row (musehub_commits)
        → HEAD snapshot manifest (musehub_snapshots.manifest_blob, msgpack)
        → parent snapshot manifest (same table)
        → backend.get(object_id) for each file in both manifests
        → parse_symbols(src, path) → body_hash / signature_id per symbol
        → diff → upsert into musehub_intel_refactor_events
    """

    # Maximum number of event rows sent in a single INSERT statement.
    _CHUNK = 500

    # Symbol kinds that take part in refactor detection.
    _TRACKED_KINDS = frozenset(
        {"function", "async_function", "method", "async_method", "class"}
    )

    # ── helpers ──────────────────────────────────────────────────────────────

    @staticmethod
    def _is_tracked(kind: str) -> bool:
        """True for symbol kinds that undergo meaningful refactoring."""
        return kind in DetectRefactorProvider._TRACKED_KINDS

    @staticmethod
    async def _manifest_symbols(
        backend: StorageBackend,
        manifest: StrDict,
    ) -> SymbolInfoMap:
        """Parse all tracked symbols from a snapshot manifest.

        Returns ``{address: {"body_hash": ..., "signature_id": ...,
        "file": ..., "kind": ...}}`` for every tracked symbol kind.

        Files whose bytes are missing/empty, or that ``parse_symbols`` cannot
        parse, are skipped so a single bad file cannot abort the whole job.
        """
        out: SymbolInfoMap = {}
        for file_path, object_id in manifest.items():
            src = await backend.get(object_id)
            if not src:
                continue
            try:
                tree = parse_symbols(src, file_path)
            except Exception:
                # Deliberate best-effort: an unparseable file contributes no symbols.
                continue
            for address, rec in tree.items():
                kind = rec.get("kind", "")
                if not DetectRefactorProvider._is_tracked(kind):
                    continue
                out[address] = {
                    "body_hash": rec.get("body_hash", ""),
                    "signature_id": rec.get("signature_id", ""),
                    "file": file_path,
                    "kind": kind,
                }
        return out

    @staticmethod
    def _diff_symbols(
        head_syms: SymbolInfoMap,
        parent_syms: SymbolInfoMap,
    ) -> list[dict[str, str]]:
        """Compare two symbol maps and return the detected refactoring events.

        Each event is ``{"kind": ..., "address": ..., "detail": ...}`` where
        ``kind`` is one of ``implementation``, ``signature``, ``move`` or
        ``rename``.  For move/rename events ``address`` is the parent-side
        address and ``detail`` is the destination address in HEAD.
        """
        # Reverse index body_hash → address, restricted to addresses that did
        # not exist in the parent.  A symbol that was already present before
        # the push cannot be where a vanished symbol "went".  When several new
        # symbols share a body the last one iterated wins.
        new_body_to_addr: dict[str, str] = {
            info["body_hash"]: addr
            for addr, info in head_syms.items()
            if info["body_hash"] and addr not in parent_syms
        }

        events: list[dict[str, str]] = []

        # Symbols present on both sides — implementation / signature changes.
        for addr, hdata in head_syms.items():
            pdata = parent_syms.get(addr)
            if pdata is None:
                continue  # new symbol — not a refactoring event
            if hdata["body_hash"] != pdata["body_hash"]:
                events.append({
                    "kind": "implementation",
                    "address": addr,
                    "detail": f"body rewritten ({hdata['kind']})",
                })
            elif hdata["signature_id"] != pdata["signature_id"]:
                events.append({
                    "kind": "signature",
                    "address": addr,
                    "detail": f"signature changed ({hdata['kind']})",
                })

        # Symbols that vanished from HEAD — move / rename detection.
        for addr, pdata in parent_syms.items():
            if addr in head_syms:
                continue  # still present — handled above
            matched_addr = new_body_to_addr.get(pdata["body_hash"])
            if matched_addr is None:
                continue  # deleted — not a refactoring event
            same_file = head_syms[matched_addr]["file"] == pdata["file"]
            events.append({
                "kind": "rename" if same_file else "move",
                "address": addr,
                "detail": matched_addr,  # destination address
            })

        return events

    # ── main entry point ─────────────────────────────────────────────────────

    async def compute(
        self, session: AsyncSession, repo_id: str, ref: str, payload: JSONObject
    ) -> IntelResults:
        """Diff HEAD vs parent snapshot and persist detected refactoring events.

        Returns a single ``("intel.code.detect_refactor", {"count": N})``
        tuple where *N* is the number of events upserted for this push.
        Returns ``[]`` when the repo or HEAD commit cannot be found, when
        there is no parent commit to diff against (initial push), or when the
        snapshot manifests cannot be loaded.
        """
        import msgpack
        import sqlalchemy as sa

        # owner/slug are not used further in this method; the lookup doubles
        # as a check that the repo row exists.
        owner: str | None = payload.get("owner")  # type: ignore[assignment]
        slug: str | None = payload.get("slug")  # type: ignore[assignment]
        if not owner or not slug:
            repo_result = await session.execute(
                sa.select(MusehubRepo.owner, MusehubRepo.slug)
                .where(MusehubRepo.repo_id == repo_id)
            )
            row = repo_result.first()
            if row is None:
                return []
            owner, slug = row.owner, row.slug

        # ── 1. Fetch HEAD commit + its parent ─────────────────────────────────
        head_base = (
            sa.select(
                MusehubCommit.commit_id,
                MusehubCommit.parent_ids,
                MusehubCommit.snapshot_id,
                MusehubCommit.message,
                MusehubCommit.timestamp,
            )
            .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
            .where(MusehubCommitRef.repo_id == repo_id)
        )
        head_result = await session.execute(
            head_base.where(MusehubCommit.commit_id == ref).limit(1)
        )
        head_row = head_result.first()
        if head_row is None:
            # Fallback: ``ref`` did not match a commit id — use the most
            # recent commit for this repo.
            fallback_result = await session.execute(
                head_base.order_by(MusehubCommit.timestamp.desc()).limit(1)
            )
            head_row = fallback_result.first()
        if head_row is None or not head_row.snapshot_id:
            return []

        head_commit_id = head_row.commit_id
        head_message = head_row.message or ""
        head_committed = head_row.timestamp
        parent_ids: list[str] = head_row.parent_ids or []
        head_snap_id = head_row.snapshot_id

        if not parent_ids:
            return []  # initial commit — nothing to diff against

        # Only the first parent is diffed; additional parents are ignored.
        parent_commit_id = parent_ids[0]

        # ── 2. Fetch parent snapshot_id ───────────────────────────────────────
        parent_result = await session.execute(
            sa.select(MusehubCommit.snapshot_id)
            .where(MusehubCommit.commit_id == parent_commit_id)
            .limit(1)
        )
        parent_row = parent_result.first()
        if parent_row is None or not parent_row.snapshot_id:
            return []

        parent_snap_id = parent_row.snapshot_id

        # ── 3. Load both snapshot manifests ───────────────────────────────────
        snap_result = await session.execute(
            sa.select(
                MusehubSnapshot.snapshot_id,
                MusehubSnapshot.manifest_blob,
            ).where(
                MusehubSnapshot.snapshot_id.in_([head_snap_id, parent_snap_id])
            )
        )
        snap_rows = {row.snapshot_id: row.manifest_blob for row in snap_result}

        head_blob = snap_rows.get(head_snap_id)
        parent_blob = snap_rows.get(parent_snap_id)
        if not head_blob or not parent_blob:
            return []

        head_manifest: StrDict = msgpack.unpackb(bytes(head_blob), raw=False)
        parent_manifest: StrDict = msgpack.unpackb(bytes(parent_blob), raw=False)

        # ── 4. Parse symbols from both snapshots ──────────────────────────────
        backend = get_backend()
        head_syms = await self._manifest_symbols(backend, head_manifest)
        parent_syms = await self._manifest_symbols(backend, parent_manifest)

        # ── 5. Diff ────────────────────────────────────────────────────────────
        events = self._diff_symbols(head_syms, parent_syms)
        if not events:
            return [("intel.code.detect_refactor", {"count": 0})]

        # ── 6. Upsert events in chunks ─────────────────────────────────────────
        for i in range(0, len(events), self._CHUNK):
            chunk = events[i : i + self._CHUNK]
            values: list[dict] = []
            for ev in chunk:
                event_id = blob_id(
                    f"{repo_id}:{head_commit_id}:{ev['address']}:{ev['kind']}".encode()
                )
                values.append({
                    "event_id": event_id,
                    "repo_id": repo_id,
                    "kind": ev["kind"],
                    "address": ev["address"],
                    "detail": ev.get("detail"),
                    "commit_id": head_commit_id,
                    "commit_message": head_message,
                    "committed_at": head_committed,
                })
            stmt = (
                pg_insert(MusehubIntelRefactorEvent)
                .values(values)
                .on_conflict_do_update(
                    index_elements=["event_id"],
                    set_={
                        "kind": sa.literal_column("excluded.kind"),
                        "address": sa.literal_column("excluded.address"),
                        "detail": sa.literal_column("excluded.detail"),
                        "commit_id": sa.literal_column("excluded.commit_id"),
                        "commit_message": sa.literal_column("excluded.commit_message"),
                        "committed_at": sa.literal_column("excluded.committed_at"),
                    },
                )
            )
            await session.execute(stmt)

        return [("intel.code.detect_refactor", {"count": len(events)})]
2092
2093 _GRAVITY_TRACKED_KINDS = frozenset(
2094 {"function", "async_function", "method", "async_method", "class"}
2095 )
2096
2097 # ---------------------------------------------------------------------------
2098 # BreakageProvider — stale-import detection
2099 # ---------------------------------------------------------------------------
2100
class BreakageProvider:
    """Flag imports in the HEAD snapshot that point at nothing in the repo.

    Algorithm
    ---------
    An import is considered *stale* when its module does not resolve to a
    tracked file AND its symbol name is not defined by any non-import symbol
    in the snapshot.

    Pass 1 — collect defined names:
        Every manifest file is run through ``parse_symbols()``.  Records with
        ``kind == "import"`` are queued for pass 2; for all other records the
        ``name`` is added to the set of known symbol names.

    Pass 2 — check each queued import:

    * ``qualified_name`` is split on ``"::"``; it is expected to look like
      ``"import::<dotted.module>::<symbol_name>"``.  Records with fewer than
      three segments or an empty symbol name are ignored.
    * If ``CodemapProvider._resolve_import`` maps the module to a tracked
      file, the import is accepted (whether that file actually exports the
      symbol is not checked).
    * Otherwise, if the symbol name is among the known names, the import is
      accepted.
    * Anything left is recorded as a ``stale_import`` issue.

    Issues are keyed on ``blob_id(repo_id:file_path:symbol_name:stale_import)``
    and upserted, so re-running on the same snapshot is idempotent.  A
    ``MusehubIntelBreakageMeta`` row holding aggregate counts is upserted
    after every run that gets as far as parsing the manifest.

    No subprocess is ever spawned.
    """

    # Maximum number of issue rows sent in a single INSERT statement.
    _CHUNK = 1_000

    async def compute(
        self, session: AsyncSession, repo_id: str, ref: str, payload: JSONObject
    ) -> IntelResults:
        """Detect stale imports and persist issues + meta for the HEAD snapshot.

        Returns ``[]`` when the repo row cannot be found, a zero-count result
        when no snapshot manifest is available, and otherwise
        ``[("intel.code.breakage", {"count": N})]`` with *N* distinct issues.
        """
        import msgpack

        # ── 1. Resolve owner/slug ─────────────────────────────────────────────
        # Neither value is read again below; the lookup also serves as a
        # repo-existence check.
        owner: str | None = payload.get("owner")  # type: ignore[assignment]
        slug: str | None = payload.get("slug")  # type: ignore[assignment]
        if not owner or not slug:
            repo_lookup = await session.execute(
                select(MusehubRepo.owner, MusehubRepo.slug)
                .where(MusehubRepo.repo_id == repo_id)
                .limit(1)
            )
            repo_row = repo_lookup.first()
            if repo_row is None:
                return []
            owner, slug = repo_row

        # ── 2. HEAD commit → snapshot manifest ───────────────────────────────
        # HEAD here is the newest commit by timestamp for the repo.
        head_lookup = await session.execute(
            select(MusehubCommit.snapshot_id)
            .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
            .where(MusehubCommitRef.repo_id == repo_id)
            .order_by(MusehubCommit.timestamp.desc())
            .limit(1)
        )
        head_row = head_lookup.first()
        if head_row is None or head_row[0] is None:
            return [("intel.code.breakage", {"count": 0})]
        snapshot_id: str = head_row[0]

        manifest_lookup = await session.execute(
            select(MusehubSnapshot.manifest_blob)
            .where(MusehubSnapshot.snapshot_id == snapshot_id)
            .limit(1)
        )
        manifest_row = manifest_lookup.first()
        if manifest_row is None or manifest_row[0] is None:
            return [("intel.code.breakage", {"count": 0})]

        manifest: dict[str, str] = msgpack.unpackb(bytes(manifest_row[0]), raw=False)
        if not manifest:
            return [("intel.code.breakage", {"count": 0})]

        # ── 3. Pass 1 — collect defined names and queue import records ───────
        backend = get_backend()
        manifest_paths: set[str] = set(manifest.keys())
        defined_names: set[str] = set()
        pending_imports: list[tuple[str, str, dict]] = []  # (file_path, address, rec)

        for path, object_id in manifest.items():
            blob = await backend.get(object_id)
            if not blob:
                continue
            try:
                symbols = parse_symbols(blob, path)
            except Exception:
                continue
            for address, rec in symbols.items():
                if rec.get("kind", "") == "import":
                    pending_imports.append((path, address, rec))
                    continue
                defined = rec.get("name", "")
                if defined:
                    defined_names.add(defined)

        # ── 4. Pass 2 — detect stale imports ─────────────────────────────────
        # Keyed on issue_id so that importing the same symbol name from
        # several modules within one file collapses to a single issue (the
        # colliding entries carry identical field values).
        found: dict[str, dict] = {}
        for path, _address, rec in pending_imports:
            qualified: str = rec.get("qualified_name", "")
            segments = qualified.split("::")
            if len(segments) < 3:
                continue
            symbol_name = segments[2]
            if not symbol_name:
                continue

            # Module maps to a tracked file → not stale.
            if CodemapProvider._resolve_import(qualified, manifest_paths):
                continue

            # Some symbol with this name is defined in the snapshot → not stale.
            if symbol_name in defined_names:
                continue

            issue_id = blob_id(f"{repo_id}:{path}:{symbol_name}:stale_import".encode())
            found[issue_id] = {
                "issue_id": issue_id,
                "repo_id": repo_id,
                "file_path": path,
                "issue_type": "stale_import",
                "description": (
                    f"imports '{symbol_name}' but no symbol or module with that "
                    f"name exists in the HEAD snapshot"
                ),
                "severity": "warning",
                "ref": ref,
            }

        issues = list(found.values())

        # ── 5. Upsert issues in chunks ────────────────────────────────────────
        # NOTE(review): rows are only ever upserted here; issues that no longer
        # apply are not removed by this method — confirm pruning happens elsewhere.
        issue_update_cols = ("file_path", "issue_type", "description", "severity", "ref")
        for start in range(0, len(issues), self._CHUNK):
            batch = issues[start : start + self._CHUNK]
            upsert = (
                pg_insert(MusehubIntelBreakageIssue)
                .values(batch)
                .on_conflict_do_update(
                    index_elements=["issue_id"],
                    set_={
                        col: sa.literal_column(f"excluded.{col}")
                        for col in issue_update_cols
                    },
                )
            )
            await session.execute(upsert)

        # ── 6. Upsert meta row ────────────────────────────────────────────────
        severities = [issue["severity"] for issue in issues]
        warning_count = severities.count("warning")
        error_count = severities.count("error")
        file_count = len({issue["file_path"] for issue in issues})

        meta_update_cols = (
            "total_issues", "warning_count", "error_count", "file_count", "ref",
        )
        meta_upsert = (
            pg_insert(MusehubIntelBreakageMeta)
            .values(
                repo_id=repo_id,
                total_issues=len(issues),
                warning_count=warning_count,
                error_count=error_count,
                file_count=file_count,
                ref=ref,
            )
            .on_conflict_do_update(
                index_elements=["repo_id"],
                set_={
                    col: sa.literal_column(f"excluded.{col}")
                    for col in meta_update_cols
                },
            )
        )
        await session.execute(meta_upsert)

        return [("intel.code.breakage", {"count": len(issues)})]
2297
2298 # ---------------------------------------------------------------------------
2299 # CodemapProvider — structural dependency topology (fan_in, fan_out, cycles)
2300 # ---------------------------------------------------------------------------
2301
class CodemapProvider:
    """Persist structural dependency topology derived from stored snapshot objects.

    Reads the HEAD snapshot manifest, calls ``parse_symbols()`` per file to
    extract import relationships, resolves dotted module names back to tracked
    file paths, computes ``fan_in`` / ``fan_out`` per module, runs Tarjan's SCC
    for cycle detection, and batch-upserts results into
    ``MusehubIntelCodemapModule`` and ``MusehubIntelCodemapMeta``.

    No subprocess is ever spawned.  All file bytes come from
    ``get_backend().get(object_id)``.

    Parameters
    ----------
    session : AsyncSession
        Active async SQLAlchemy session provided by the push worker.
    repo_id : str
        Opaque repo identifier from ``musehub_repos.repo_id``.
    ref : str
        Ref string at push time; stored verbatim on every row written.
    payload : JSONObject
        Raw job payload — ``owner`` / ``slug`` are read from it when present.

    Returns
    -------
    IntelResults
        ``[("intel.code.codemap", {"modules": N, "edges": E, "cycles": C})]``
        on success.  ``[]`` when the snapshot manifest cannot be resolved.

    Notes
    -----
    Import record ``qualified_name`` format from ``parse_symbols()`` is
    ``"import::<dotted.module.path>::<symbol_name>"``.  The middle segment is
    used to resolve the import target to a tracked file path by converting
    ``dotted.module.path`` → ``dotted/module/path.py`` (or ``__init__.py``).

    Imports that cannot be resolved to a tracked file path (stdlib,
    third-party, …) and self-imports are skipped: they contribute to neither
    ``fan_out`` nor ``fan_in``.  Each distinct (importer, importee) pair
    counts once.

    Module rows, edges and cycles are produced in manifest order (with each
    file's targets sorted) so that repeated runs on the same snapshot write
    identical ``cycles_json``.
    """

    # Maximum number of module rows sent in a single INSERT statement.
    _CHUNK = 1_000

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    async def compute(
        self, session: AsyncSession, repo_id: str, ref: str, payload: JSONObject
    ) -> IntelResults:
        """Build the import graph for the HEAD snapshot and persist it."""
        import msgpack

        # ── 1. Resolve owner/slug ─────────────────────────────────────────────
        # Neither value is read again below; the lookup also serves as a
        # repo-existence check.
        owner: str | None = payload.get("owner")  # type: ignore[assignment]
        slug: str | None = payload.get("slug")  # type: ignore[assignment]
        if not (owner and slug):
            row = await session.execute(
                select(MusehubRepo.owner, MusehubRepo.slug)
                .where(MusehubRepo.repo_id == repo_id)
                .limit(1)
            )
            result = row.first()
            if result is None:
                return []
            owner, slug = result

        # ── 2. HEAD commit → snapshot manifest ───────────────────────────────
        # HEAD here is the newest commit by timestamp for the repo.
        commit_row = await session.execute(
            select(MusehubCommit.snapshot_id)
            .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
            .where(MusehubCommitRef.repo_id == repo_id)
            .order_by(MusehubCommit.timestamp.desc())
            .limit(1)
        )
        commit_result = commit_row.first()
        if commit_result is None or commit_result[0] is None:
            return []
        snapshot_id: str = commit_result[0]

        snap_row = await session.execute(
            select(MusehubSnapshot.manifest_blob)
            .where(MusehubSnapshot.snapshot_id == snapshot_id)
            .limit(1)
        )
        snap_result = snap_row.first()
        if snap_result is None or snap_result[0] is None:
            return []

        manifest: dict[str, str] = msgpack.unpackb(bytes(snap_result[0]), raw=False)
        if not manifest:
            return []

        # ── 3. Build a set of tracked paths for import resolution ─────────────
        manifest_paths: set[str] = set(manifest.keys())

        # ── 4. Walk manifest: fetch bytes, parse symbols, accumulate edges ────
        backend = get_backend()
        sym_counts: IntDict = {}  # file_path → non-import symbol count
        fan_out: IntDict = {}  # file_path → resolved import edges
        fan_in: IntDict = {}  # file_path → times imported by others
        edges: list[tuple[str, str]] = []  # (importer, importee) resolved pairs

        _non_import_kinds = frozenset(
            {"function", "async_function", "method", "async_method", "class"}
        )

        for file_path, object_id in manifest.items():
            src = await backend.get(object_id)
            if not src:
                fan_out[file_path] = 0
                sym_counts[file_path] = 0
                continue
            try:
                tree = parse_symbols(src, file_path)
            except Exception:
                # Best-effort: an unparseable file still gets a zeroed row.
                fan_out[file_path] = 0
                sym_counts[file_path] = 0
                continue

            sym_counts[file_path] = sum(
                1 for rec in tree.values()
                if rec.get("kind") in _non_import_kinds
            )

            resolved_targets: set[str] = set()
            for rec in tree.values():
                if rec.get("kind") != "import":
                    continue
                qn: str = rec.get("qualified_name", "")
                target = self._resolve_import(qn, manifest_paths)
                if target and target != file_path:
                    resolved_targets.add(target)

            fan_out[file_path] = len(resolved_targets)
            # Sorted so edge order does not depend on set iteration order.
            for target in sorted(resolved_targets):
                edges.append((file_path, target))
                fan_in[target] = fan_in.get(target, 0) + 1

        # ── 5. Detect import cycles via Tarjan's SCC ──────────────────────────
        # Keys follow manifest order (not set order) for reproducible output.
        graph: dict[str, list[str]] = {f: [] for f in manifest}
        for importer, importee in edges:
            graph[importer].append(importee)

        cycles = self._tarjan_cycles(graph)
        cycle_paths = [[*cycle, cycle[0]] for cycle in cycles]  # closed paths

        # ── 6. Build upsert rows ──────────────────────────────────────────────
        rows = [
            {
                "repo_id": repo_id,
                "file_path": fp,
                "symbol_count": sym_counts.get(fp, 0),
                "fan_in": fan_in.get(fp, 0),
                "fan_out": fan_out.get(fp, 0),
                "language": language_of(fp),
                "ref": ref,
            }
            for fp in manifest
        ]

        # ── 7. Batch-upsert module rows ───────────────────────────────────────
        for i in range(0, len(rows), self._CHUNK):
            chunk = rows[i : i + self._CHUNK]
            stmt = (
                pg_insert(MusehubIntelCodemapModule)
                .values(chunk)
                .on_conflict_do_update(
                    index_elements=["repo_id", "file_path"],
                    set_={
                        "symbol_count": sa.literal_column("excluded.symbol_count"),
                        "fan_in": sa.literal_column("excluded.fan_in"),
                        "fan_out": sa.literal_column("excluded.fan_out"),
                        "language": sa.literal_column("excluded.language"),
                        "ref": sa.literal_column("excluded.ref"),
                    },
                )
            )
            await session.execute(stmt)

        # ── 8. Upsert meta row ────────────────────────────────────────────────
        meta_stmt = (
            pg_insert(MusehubIntelCodemapMeta)
            .values(
                repo_id=repo_id,
                total_modules=len(rows),
                total_edges=len(edges),
                cycle_count=len(cycles),
                cycles_json=cycle_paths or None,
                ref=ref,
            )
            .on_conflict_do_update(
                index_elements=["repo_id"],
                set_={
                    "total_modules": sa.literal_column("excluded.total_modules"),
                    "total_edges": sa.literal_column("excluded.total_edges"),
                    "cycle_count": sa.literal_column("excluded.cycle_count"),
                    "cycles_json": sa.literal_column("excluded.cycles_json"),
                    "ref": sa.literal_column("excluded.ref"),
                },
            )
        )
        await session.execute(meta_stmt)

        return [("intel.code.codemap", {"modules": len(rows), "edges": len(edges), "cycles": len(cycles)})]

    # ------------------------------------------------------------------
    # Import resolution helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _resolve_import(qualified_name: str, manifest_paths: set[str]) -> str | None:
        """Resolve a ``parse_symbols`` import record to a tracked file path.

        The ``qualified_name`` format is ``"import::<dotted.module>::<symbol>"``.
        This method extracts the dotted module segment and tries both
        ``dotted/module.py`` and ``dotted/module/__init__.py`` (in that order)
        against the tracked path set.

        Parameters
        ----------
        qualified_name : str
            Full qualified name from the import record.
        manifest_paths : set[str]
            Set of all tracked file paths from the snapshot manifest.

        Returns
        -------
        str | None
            Resolved repo-relative path, or ``None`` if the name has no module
            segment or neither candidate is in the manifest.
        """
        parts = qualified_name.split("::")
        if len(parts) < 2:
            return None
        dotted = parts[1]
        if not dotted:
            return None
        as_path = dotted.replace(".", "/")
        for candidate in (f"{as_path}.py", f"{as_path}/__init__.py"):
            if candidate in manifest_paths:
                return candidate
        return None

    # ------------------------------------------------------------------
    # Tarjan's SCC — O(V + E) cycle detection
    # ------------------------------------------------------------------

    @staticmethod
    def _tarjan_cycles(graph: GraphMap) -> list[list[str]]:
        """Return all strongly-connected components of size ≥ 2 (import cycles).

        Tarjan's SCC algorithm, driven by an explicit work stack instead of
        recursion so that long import chains cannot hit the interpreter's
        recursion limit.  Runs in O(V + E) time.

        Parameters
        ----------
        graph : dict[str, list[str]]
            Adjacency list: ``{file_path: [imported_file_path, ...]}``.
            Successors that are not themselves keys are treated as nodes with
            no outgoing edges.

        Returns
        -------
        list[list[str]]
            Each inner list is one SCC (cycle) with ≥ 2 nodes, in the order
            the nodes were popped off the Tarjan stack.  Single-node
            components — including self-loops — are excluded.
        """
        next_index = 0
        index: dict[str, int] = {}
        lowlinks: dict[str, int] = {}
        stack: list[str] = []
        on_stack: set[str] = set()
        sccs: list[list[str]] = []

        for root in graph:
            if root in index:
                continue

            index[root] = lowlinks[root] = next_index
            next_index += 1
            stack.append(root)
            on_stack.add(root)
            # Each frame is (node, iterator over its not-yet-visited successors).
            work = [(root, iter(graph.get(root, ())))]

            while work:
                v, successors = work[-1]
                descended = False
                for w in successors:
                    if w not in index:
                        # Tree edge: "recurse" into w by pushing a new frame.
                        index[w] = lowlinks[w] = next_index
                        next_index += 1
                        stack.append(w)
                        on_stack.add(w)
                        work.append((w, iter(graph.get(w, ()))))
                        descended = True
                        break
                    if w in on_stack:
                        # Back/cross edge into the current component.
                        lowlinks[v] = min(lowlinks[v], index[w])
                if descended:
                    continue

                # All successors of v handled — equivalent to returning from
                # the recursive call for v.
                work.pop()
                if lowlinks[v] == index[v]:
                    scc: list[str] = []
                    while True:
                        w = stack.pop()
                        on_stack.discard(w)
                        scc.append(w)
                        if w == v:
                            break
                    if len(scc) >= 2:
                        sccs.append(scc)
                if work:
                    caller = work[-1][0]
                    lowlinks[caller] = min(lowlinks[caller], lowlinks[v])

        return sccs
2612
class GravityProvider:
    """Fill the gravity columns of ``MusehubSymbolIntel`` from its blast columns.

    The original author notes that the formula mirrors
    ``muse/muse/cli/commands/gravity.py``:

        total       = number of tracked-kind symbol rows for this repo
        denom       = max(1, total - 1)
        gravity_pct = round(blast / denom * 100, 1)

    Column mapping:
        gravity_direct_dependents     ← blast_direct
        gravity_transitive_dependents ← blast (``None`` is treated as 0)
        gravity_pct                   ← formula above
        gravity_max_depth             — not derivable; left untouched
        gravity_depth_distribution    — not derivable; left untouched

    No muse CLI call is made; existing rows are updated through the session.
    """

    async def compute(
        self, session: AsyncSession, repo_id: str, ref: str, payload: JSONObject
    ) -> IntelResults:
        """Recompute gravity values for every tracked-kind symbol of the repo.

        Returns ``[]`` when the repo has no tracked-kind symbol rows, otherwise
        ``[("intel.code.gravity", {"count": N})]`` with *N* rows updated.
        """
        import sqlalchemy as sa
        from musehub.services.musehub_symbol_indexer import backfill_symbol_kinds

        def tracked_only() -> tuple:
            # Shared WHERE clause: this repo, tracked symbol kinds only.
            return (
                MusehubSymbolIntel.repo_id == repo_id,
                MusehubSymbolIntel.symbol_kind.in_(_GRAVITY_TRACKED_KINDS),
            )

        # Populate symbol_kind on rows that predate kind extraction, so the
        # kind filter below can see them.
        await backfill_symbol_kinds(session, repo_id)

        # The number of tracked symbols drives the denominator.
        count_stmt = (
            sa.select(sa.func.count())
            .select_from(MusehubSymbolIntel)
            .where(*tracked_only())
        )
        counted = await session.execute(count_stmt)
        total: int = counted.scalar_one()
        if total == 0:
            return []

        denom = max(1, total - 1)

        # Load the same set of rows as ORM objects and update them in place.
        fetched = await session.execute(
            sa.select(MusehubSymbolIntel).where(*tracked_only())
        )
        rows = fetched.scalars().all()

        for symbol_row in rows:
            blast: int = symbol_row.blast or 0
            symbol_row.gravity_pct = round(blast / denom * 100, 1)
            symbol_row.gravity_direct_dependents = symbol_row.blast_direct
            symbol_row.gravity_transitive_dependents = blast
            # gravity_max_depth and gravity_depth_distribution not derivable

        await session.flush()

        return [("intel.code.gravity", {"count": len(rows)})]
2676
2677 # ---------------------------------------------------------------------------
2678 # Registry
2679 # ---------------------------------------------------------------------------
2680
# Maps job_type → provider instance.
# Workers dispatch jobs by looking up job.job_type in this dict.
# Each provider is instantiated once, when this module is imported, and the
# same instance is returned for every lookup of its key.
_PROVIDER_REGISTRY: dict[str, IntelProvider] = {
    "intel.structural": StructuralProvider(),
    "intel.code": CodeProvider(),
    "intel.midi": MidiProvider(),
    "intel.mist": MistProvider(),
    "profile.snapshot": ProfileSnapshotProvider(),
    "intel.code.coupling": CouplingProvider(),
    "intel.code.entangle": EntangleProvider(),
    "intel.code.dead": DeadProvider(),
    "intel.code.blast_risk": BlastRiskProvider(),
    "intel.code.stable": StableProvider(),
    "intel.code.velocity": VelocityProvider(),
    "intel.code.clones": ClonesProvider(),
    "intel.code.type": TypeProvider(),
    "intel.code.api_surface": ApiSurfaceProvider(),
    "intel.code.languages": LanguagesProvider(),
    "intel.code.detect_refactor": DetectRefactorProvider(),
    "intel.code.gravity": GravityProvider(),
    "intel.code.codemap": CodemapProvider(),
    "intel.code.breakage": BreakageProvider(),
}
2704
def get_provider(job_type: str) -> IntelProvider | None:
    """Look up the provider registered for *job_type*.

    Returns the instance stored in ``_PROVIDER_REGISTRY`` under that key, or
    ``None`` when no provider is registered for it.
    """
    provider = _PROVIDER_REGISTRY.get(job_type)
    return provider
2708
def job_types_for_push(domain_id: str | None) -> list[str]:
    """Return the job types to enqueue after a push for the given domain.

    The result always starts with ``intel.structural``,
    ``push.file_last_commits``, ``mpack.index`` and ``fetch.mpack.prebuild``
    and always ends with ``gc``.  In between come the domain-specific jobs,
    chosen by substring match on ``domain_id`` (first match wins):

    * ``None`` or contains ``"code"`` → ``intel.code`` plus every
      ``intel.code.*`` job
    * contains ``"midi"`` → ``intel.midi``
    * contains ``"mist"`` → ``intel.mist``
    * anything else (including ``""``) → no domain-specific jobs
    """
    domain = domain_id or ""

    if domain_id is None or "code" in domain:
        domain_jobs = [
            "intel.code",
            "intel.code.coupling",
            "intel.code.entangle",
            "intel.code.dead",
            "intel.code.blast_risk",
            "intel.code.stable",
            "intel.code.velocity",
            "intel.code.clones",
            "intel.code.type",
            "intel.code.api_surface",
            "intel.code.languages",
            "intel.code.detect_refactor",
            "intel.code.gravity",
            "intel.code.codemap",
            "intel.code.breakage",
        ]
    elif "midi" in domain:
        domain_jobs = ["intel.midi"]
    elif "mist" in domain:
        domain_jobs = ["intel.mist"]
    else:
        domain_jobs = []

    return [
        "intel.structural",
        "push.file_last_commits",
        "mpack.index",
        "fetch.mpack.prebuild",
        *domain_jobs,
        "gc",
    ]
File History 3 commits
sha256:d50f9cf9829dfbe35721a23b81ad256c729ddf9dd565a0a9e56d27847e255632 feat(#92): phase 4 — enqueue fetch.mpack.prebuild on push (… Sonnet 4.6 patch 5 days ago
sha256:65f2fd8d910e1eeb00b7bc8740d3cbf1b2e14dad83b2eb999fbbbc44e97cd936 getting intel jobs to run properly Human minor 14 days ago
sha256:fa68fb6b242d872f0f8f8a300e4e7bad498a4dd17fdc76ddd87245b460e6aa47 fix: enqueue mpack.index after every push (TDD) Sonnet 4.6 patch 16 days ago