"""MuseHub file blob viewer page — extracted from ui.py for Phase 1 redesign. GET /{owner}/{repo_slug}/blob/{ref}/{path:path} Phase 1: commit provenance pill + sem-ver badge Phase 2: symbol outline panel Phase 3: file provenance timeline Phase 4: intelligence signal pills (hotspot / cold / blast) in header """ import asyncio import logging import re from pathlib import Path from fastapi import APIRouter, Depends, HTTPException, Request from fastapi import status as http_status from sqlalchemy import desc, func, select as sa_select from sqlalchemy.ext.asyncio import AsyncSession from starlette.responses import Response from musehub.api.routes.musehub._templates import templates from musehub.api.validation import BranchParam, FilePathParam, SlugParam from musehub.auth.dependencies import TokenClaims, optional_token from musehub.api.routes.musehub._ui_helpers import _resolve_ref_and_path, _resolve_repo from musehub.api.routes.musehub.json_alternate import json_or_html from musehub.db import get_db from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubObject from musehub.services import musehub_repository from musehub.services.musehub_snapshot import get_snapshot_manifest, get_snapshot_manifests_batch from musehub.services.musehub_symbol_indexer import load_symbol_history, SymbolHistory from musehub.storage.backends import read_object_bytes as _read_object_bytes from musehub.types.compression import decompress_if_needed as _decompress_if_needed from musehub.types.json_types import JSONObject, StrDict type _RangeMap = dict[str, tuple[int, int]] type _LineMap = dict[str, list[int]] logger = logging.getLogger(__name__) router = APIRouter(prefix="", tags=["musehub-ui"]) # ── Markdown rendering ──────────────────────────────────────────────────────── def _render_markdown(content: str) -> str: import mistune # noqa: PLC0415 html = mistune.html(content) # Inject id= anchors on headings for deep-linking via #heading-slug def _anchor(m: re.Match[str]) -> str: level = m.group(1) inner = m.group(2) slug = re.sub(r"<[^>]+>", "", inner) # strip any inline HTML slug = slug.lower().strip() slug = re.sub(r"[^\w\s-]", "", slug) slug = re.sub(r"[-\s]+", "-", slug) return f'{inner}' return re.sub(r"(.*?)", _anchor, html, flags=re.DOTALL) # ── Phase 2: Symbol outline data ───────────────────────────────────────────── def _symbol_history_to_outline(sh: SymbolHistory) -> list[JSONObject]: """Convert a loaded symbol history dict to outline entries.""" entries_out: list[JSONObject] = [] for address, ops in sh.items(): if not ops: continue sorted_ops = sorted(ops, key=lambda e: e.get("committed_at", ""), reverse=True) last = sorted_ops[0] entries_out.append({ "address": address, "display_name": address.split("::")[-1] if "::" in address else address.split("/")[-1], "last_op": last.get("op", ""), "last_op_time": last.get("committed_at", ""), "last_commit_id": last.get("commit_id", "") or "", "touch_count": len(ops), }) entries_out.sort(key=lambda e: e["last_op_time"], reverse=True) return entries_out[:50] async def _fetch_file_symbols( session: AsyncSession, repo_id: str, file_path: str ) -> list[JSONObject]: try: sh = await load_symbol_history(session, repo_id, file_path=file_path) except Exception: return [] return _symbol_history_to_outline(sh) async def _fetch_file_symbols_from_history( session: AsyncSession, repo_id: str, file_path: str, sh: SymbolHistory ) -> list[JSONObject]: """Like _fetch_file_symbols but accepts a pre-loaded history dict.""" return _symbol_history_to_outline(sh) def _parse_symbols_from_content(file_path: str, content: str) -> list[JSONObject]: """Extract symbols directly from file content when the symbol index has no entries. Supports Python (ast) and a generic line-scan fallback for other text files. Returns entries in the same shape as ``_fetch_file_symbols`` but with no commit history — last_op / last_op_time / last_commit_id are left empty. """ symbols: list[JSONObject] = [] if file_path.endswith(".py"): import ast # noqa: PLC0415 try: tree = ast.parse(content) for node in ast.walk(tree): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): name = node.name address = f"{file_path}::{name}" kind = "class" if isinstance(node, ast.ClassDef) else "async_function" if isinstance(node, ast.AsyncFunctionDef) else "function" symbols.append({ "address": address, "display_name": name, "last_op": "", "last_op_time": "", "last_commit_id": "", "touch_count": 0, "kind": kind, "lineno": node.lineno, "end_lineno": node.end_lineno, }) except SyntaxError: pass symbols.sort(key=lambda s: s.get("lineno", 0)) return symbols def _enrich_with_linenos(symbols: list[JSONObject], file_path: str, content: str) -> None: """Add ``lineno`` / ``end_lineno`` to symbol entries that lack them via AST. Mutates ``symbols`` in place. Only runs for Python files. """ if not file_path.endswith(".py"): return import ast # noqa: PLC0415 try: tree = ast.parse(content) except SyntaxError: return name_to_range: _RangeMap = {} for node in ast.walk(tree): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): name_to_range[node.name] = (node.lineno, node.end_lineno) for sym in symbols: if "lineno" not in sym: name = str(sym.get("display_name", "")) if name in name_to_range: sym["lineno"], sym["end_lineno"] = name_to_range[name] def _symbol_line_map(symbols: list[JSONObject]) -> _LineMap: """Return a ``{display_name: [start, end]}`` map for symbols that have lineno. Both start and end are 1-based inclusive line numbers. When end_lineno is absent the symbol is treated as a single line (end == start). """ result: _LineMap = {} for s in symbols: if not s.get("lineno"): continue start = int(s["lineno"]) end = int(s["end_lineno"]) if s.get("end_lineno") else start result[str(s["display_name"])] = [start, end] return result def _symbol_line_map_from_content(file_path: str, content: str) -> _LineMap: """Build a complete ``{name: [start, end]}`` map by AST-parsing raw content. Used to populate ``symbolLines`` in the blob page_json for ``#S:`` deep links. Always covers every symbol in the file regardless of index state. """ return _symbol_line_map(_parse_symbols_from_content(file_path, content)) # ── Phase 3: File provenance timeline ──────────────────────────────────────── async def _fetch_file_history( session: AsyncSession, repo_id: str, file_path: str, head_commit_id: str, limit: int = 20, ) -> list[JSONObject]: """Return up to *limit* commits that changed *file_path*, newest first. Fast path: queries musehub_symbol_history_entries by (repo_id, address) — O(1) via index. Falls back to snapshot manifest scan when no history entries exist for the file. """ norm = file_path.lstrip("/") # ── Fast path: history index ────────────────────────────────────────────── she = MusehubSymbolHistoryEntry index_stmt = ( sa_select(she.commit_id, she.committed_at) .where( she.repo_id == repo_id, (she.address == norm) | she.address.like(f"{norm}::%"), ) .distinct() .order_by(desc(she.committed_at)) .limit(limit) ) index_rows = (await session.execute(index_stmt)).all() if index_rows: # Deduplicate by commit_id, keeping the latest committed_at per commit. seen: dict[str, any] = {} for row in index_rows: if row.commit_id not in seen or row.committed_at > seen[row.commit_id].committed_at: seen[row.commit_id] = row deduped = sorted(seen.values(), key=lambda r: r.committed_at, reverse=True)[:limit] commit_ids = [r.commit_id for r in deduped] commits = ( await session.execute( sa_select(MusehubCommit).where( MusehubCommit.commit_id.in_(commit_ids) ) ) ).scalars().all() commit_map = {c.commit_id: c for c in commits} history: list[JSONObject] = [] for row in deduped: c = commit_map.get(row.commit_id) if c is None: continue agent_id = c.agent_id or "" model_id = c.model_id or "" history.append({ "commit_id": c.commit_id, "commit_id_full": c.commit_id, "message": (c.message.split("\n")[0] if c.message else ""), "author": c.author or "", "timestamp": c.timestamp, "is_agent": bool(agent_id), "model_label": _model_label(model_id) if model_id else "", "sem_ver_bump": c.sem_ver_bump or "none", "breaking": bool(c.breaking_changes), }) return history # ── Fallback: snapshot manifest scan ───────────────────────────────────── head = await session.get(MusehubCommit, head_commit_id) if head is None or head.snapshot_id is None: return [] stmt = ( sa_select(MusehubCommit) .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id) .where( MusehubCommitRef.repo_id == repo_id, MusehubCommit.branch == (head.branch or "main"), MusehubCommit.timestamp <= head.timestamp, ) .order_by(desc(MusehubCommit.timestamp)) .limit(300) ) rows = (await session.execute(stmt)).scalars().all() snapshot_ids = [c.snapshot_id for c in rows if c.snapshot_id] manifests = await get_snapshot_manifests_batch(session, snapshot_ids[:100]) if len(snapshot_ids) > 100: manifests.update(await get_snapshot_manifests_batch(session, snapshot_ids[100:200])) if len(snapshot_ids) > 200: manifests.update(await get_snapshot_manifests_batch(session, snapshot_ids[200:300])) head_manifest = manifests.get(head.snapshot_id, {}) if norm not in head_manifest: return [] history_fb: list[JSONObject] = [] prev_oid: str | None = None for commit in rows: if commit.snapshot_id is None: continue oid = manifests.get(commit.snapshot_id, {}).get(norm) if oid != prev_oid and oid is not None: agent_id = commit.agent_id or "" model_id = commit.model_id or "" history_fb.append({ "commit_id": commit.commit_id, "commit_id_full": commit.commit_id, "message": (commit.message.split("\n")[0] if commit.message else ""), "author": commit.author or "", "timestamp": commit.timestamp, "is_agent": bool(agent_id), "model_label": _model_label(model_id) if model_id else "", "sem_ver_bump": commit.sem_ver_bump or "none", "breaking": bool(commit.breaking_changes), }) if len(history_fb) >= limit: break prev_oid = oid return history_fb # ── Phase 4: Intelligence signals ──────────────────────────────────────────── _INTEL_EMPTY: JSONObject = { "is_hotspot": False, "hotspot_count": 0, "has_dead": False, "dead_count": 0, "blast_risk": False, "blast_count": 0, "health_score": 100, "health_label": "Excellent", } def _compute_intel_from_history(file_sh: SymbolHistory) -> JSONObject: """Compute hotspot/dead/blast signals from a pre-loaded symbol history dict.""" if not file_sh: return _INTEL_EMPTY from datetime import datetime, timezone # noqa: PLC0415 _HOTSPOT_THRESHOLD = 10 _DEAD_COLD_DAYS = 90 _BLAST_THRESHOLD = 20 now = datetime.now(tz=timezone.utc) hotspot_count = 0 dead_count = 0 blast_count = 0 for ops in file_sh.values(): n = len(ops) if n > _HOTSPOT_THRESHOLD: hotspot_count += 1 last_ts: datetime | None = None for op in ops: raw = op.get("committed_at") or op.get("timestamp") or op.get("ts", "") if raw: try: ts = datetime.fromisoformat(raw.replace("Z", "+00:00")) if last_ts is None or ts > last_ts: last_ts = ts except Exception: pass if last_ts and (now - last_ts).days > _DEAD_COLD_DAYS: dead_count += 1 unique_commits = len({op.get("commit_id") for op in ops if op.get("commit_id")}) if unique_commits > _BLAST_THRESHOLD: blast_count += 1 score = 100 score -= min(hotspot_count * 8, 30) score -= min(dead_count * 10, 30) score -= min(blast_count * 6, 20) score = max(0, min(score, 100)) if score >= 90: health_label = "Excellent" elif score >= 75: health_label = "Good" elif score >= 55: health_label = "Fair" elif score >= 35: health_label = "Poor" else: health_label = "Critical" return { "is_hotspot": hotspot_count > 0, "hotspot_count": hotspot_count, "has_dead": dead_count > 0, "dead_count": dead_count, "blast_risk": blast_count > 0, "blast_count": blast_count, "health_score": score, "health_label": health_label, } async def _fetch_file_intel( session: AsyncSession, repo_id: str, file_path: str ) -> JSONObject: """Compute hotspot / dead / blast signals for *file_path*.""" try: file_sh = await load_symbol_history(session, repo_id, file_path=file_path) except Exception: return _INTEL_EMPTY return _compute_intel_from_history(file_sh) async def _fetch_file_intel_from_history( session: AsyncSession, repo_id: str, file_path: str, sh: SymbolHistory ) -> JSONObject: """Like _fetch_file_intel but accepts a pre-loaded history dict.""" return _compute_intel_from_history(sh) # ── File classification helpers ─────────────────────────────────────────────── _BLOB_BINARY_TYPES: frozenset[str] = frozenset( [".webp", ".png", ".jpg", ".jpeg", ".gif", ".svg"] ) _LANG_MAP: StrDict = { ".py": "python", ".js": "javascript", ".ts": "typescript", ".json": "json", ".yaml": "yaml", ".yml": "yaml", ".md": "markdown", ".txt": "text", ".xml": "xml", ".html": "html", ".css": "css", ".sh": "bash", ".toml": "toml", ".bats": "bash", } # Dotfiles have no extension — map by exact filename. _FILENAME_LANG_MAP: StrDict = { ".museattributes": "toml", ".museignore": "toml", } def _detect_language(path: str) -> str: filename = Path(path).name if filename in _FILENAME_LANG_MAP: return _FILENAME_LANG_MAP[filename] # Zsh completion files: _ with no extension (e.g. completions/_muse) if filename.startswith("_") and "." not in filename: return "bash" return _LANG_MAP.get(Path(path).suffix.lower(), "") def _model_label(model_id: str | None) -> str: """``claude-sonnet-4-6`` → ``Sonnet 4.6``.""" if not model_id: return "unknown" stripped = model_id.removeprefix("claude-") parts = stripped.split("-") name = parts[0].capitalize() if parts else model_id version = ".".join(parts[1:]) if len(parts) > 1 else "" return f"{name} {version}".strip() async def _empty_list() -> list[JSONObject]: return [] # ── Route ───────────────────────────────────────────────────────────────────── @router.get( "/{owner}/{repo_slug}/blob/{ref}/{path:path}", summary="MuseHub file blob viewer — content-addressed file rendering", ) async def blob_page( request: Request, owner: SlugParam, repo_slug: SlugParam, ref: str, path: FilePathParam, db: AsyncSession = Depends(get_db), claims: TokenClaims | None = Depends(optional_token), ) -> Response: """Render the Muse-native file blob viewer. Resolution: ref → commit → snapshot manifest → object_id → storage backend. Rendering modes: - Text/code → SSR line-numbered table + JS syntax highlighting - Binary / >1 MB → download link only Phase 1 context additions: - commit_agent_id, commit_model_id, commit_model_label — for provenance pill - commit_sem_ver_bump, commit_breaking, commit_is_agent — for sem-ver badge """ repo_id, base_url, nav_ctx = await _resolve_repo(owner, repo_slug, db, claims) # Resolve branch names containing slashes (e.g. feat/my-branch). ref, path = await _resolve_ref_and_path(db, repo_id, f"{ref}/{path}") norm_path = path.lstrip("/") filename = norm_path.split("/")[-1] if norm_path else "" lang = _detect_language(norm_path) ext = Path(norm_path).suffix.lower() is_binary = ext in _BLOB_BINARY_TYPES file_meta = await musehub_repository.get_file_at_ref(db, repo_id, ref, norm_path) content_bytes: bytes | None = None object_id: str = "" snapshot_id: str = "" blob_found = False if file_meta: object_id = str(file_meta["object_id"]) snapshot_id = str(file_meta["snapshot_id"]) blob_found = True if not is_binary: obj_row = await db.get(MusehubObject, object_id) if obj_row is not None: raw = await _read_object_bytes(obj_row, session=db) if raw: content_bytes = _decompress_if_needed(raw) if not blob_found: ctx_404: JSONObject = { "owner": owner, "repo_slug": repo_slug, "repo_id": repo_id, "ref": ref, "file_path": norm_path, "filename": filename, "base_url": base_url, "current_page": "tree", "blob_found": False, "lang": lang, "is_binary": False, "size_bytes": 0, "lines": [], "line_count": 0, "object_id": "", "snapshot_id": "", "commit_is_agent": False, "commit_agent_id": "", "commit_model_id": "", "commit_model_label": "", "commit_sem_ver_bump": "none", "commit_breaking": False, "file_symbols": [], "has_outline": False, "symbol_line_map": {}, "file_history": [], "file_intel": {"is_hotspot": False, "has_dead": False, "blast_risk": False, "hotspot_count": 0, "dead_count": 0, "blast_count": 0, "health_score": 100, "health_label": "Excellent"}, } ctx_404.update(nav_ctx) return json_or_html( request, lambda: templates.TemplateResponse(request, "musehub/pages/blob.html", ctx_404), ctx_404, ) # Decode text content content: str | None = None if content_bytes is not None and not is_binary: if len(content_bytes) > 1_000_000: is_binary = True else: content = content_bytes.decode("utf-8", errors="replace") size_bytes: int = len(content_bytes) if content_bytes else 0 if size_bytes > 1_000_000: is_binary = True content = None is_markdown = ext == ".md" md_html: str = _render_markdown(content) if (is_markdown and content) else "" lines: list[str] = content.splitlines() if content else [] line_count = len(lines) # Last-modified commit for this file last_commit = ( await musehub_repository.get_last_commit_for_file( db, repo_id, norm_path, str(file_meta["commit_id"]) if file_meta else ref ) if file_meta else None ) # ── Phase 1: extract provenance from first-class columns ────────────── commit_agent_id: str = (last_commit.agent_id or "") if last_commit else "" commit_model_id: str = (last_commit.model_id or "") if last_commit else "" commit_sem_ver_bump: str = (last_commit.sem_ver_bump or "none") if last_commit else "none" commit_breaking: bool = bool(last_commit.breaking_changes) if last_commit else False commit_is_agent: bool = bool(commit_agent_id) # ── Phases 2/3/4: load symbol history once, run phases concurrently ─── head_cid = str(file_meta["commit_id"]) if file_meta else "" try: file_sh = await load_symbol_history(db, repo_id, file_path=norm_path) except Exception: file_sh = {} file_symbols, file_history, file_intel = await asyncio.gather( _fetch_file_symbols_from_history(db, repo_id, norm_path, file_sh), _fetch_file_history(db, repo_id, norm_path, head_cid) if head_cid else _empty_list(), _fetch_file_intel_from_history(db, repo_id, norm_path, file_sh), ) # ── Phase 2 post-processing ──────────────────────────────────────────── if not file_symbols and content: file_symbols = _parse_symbols_from_content(norm_path, content) elif file_symbols and content: # Symbol index entries lack lineno — enrich from AST when content is available. _enrich_with_linenos(file_symbols, norm_path, content) has_outline = bool(file_symbols) # Build symbolLines from a full AST parse so every symbol in the file is # reachable via #S: deep links, regardless of whether it appears in the index. symbol_line_map = ( _symbol_line_map_from_content(norm_path, content) if content else _symbol_line_map(file_symbols) ) # Breadcrumb path segments path_segments: list[tuple[str, str]] = [] accumulated = "" for seg in norm_path.split("/")[:-1]: accumulated = f"{accumulated}/{seg}" if accumulated else seg path_segments.append((seg, f"{base_url}/tree/{ref}/{accumulated}")) ctx: JSONObject = { "owner": owner, "repo_slug": repo_slug, "repo_id": repo_id, "ref": ref, "file_path": norm_path, "filename": filename, "path_segments": path_segments, "base_url": base_url, "current_page": "tree", "lang": lang, "is_binary": is_binary, "is_markdown": is_markdown, "md_html": md_html, "size_bytes": size_bytes, "lines": lines, "line_count": line_count, "blob_found": blob_found, "object_id": object_id, "object_id_short": object_id if object_id else "", "snapshot_id": snapshot_id, "last_commit": last_commit.commit_id if last_commit else "", "last_commit_full": last_commit.commit_id if last_commit else "", "last_commit_msg": (last_commit.message.split("\n")[0] if last_commit else ""), "last_commit_time": last_commit.timestamp if last_commit else None, "last_commit_author": last_commit.author if last_commit else "", # Phase 1: provenance "commit_agent_id": commit_agent_id, "commit_model_id": commit_model_id, "commit_model_label": _model_label(commit_model_id) if commit_model_id else "", "commit_sem_ver_bump": commit_sem_ver_bump, "commit_breaking": commit_breaking, "commit_is_agent": commit_is_agent, # Phase 2: outline panel + symbol→lineno map for JS deep linking "file_symbols": file_symbols, "has_outline": has_outline, "symbol_line_map": symbol_line_map, # Phase 3: provenance timeline "file_history": file_history, # Phase 4: intelligence signals "file_intel": file_intel, } ctx.update(nav_ctx) return json_or_html( request, lambda: templates.TemplateResponse(request, "musehub/pages/blob.html", ctx), ctx, )