"""MuseHub file blob viewer page — extracted from ui.py for Phase 1 redesign.
GET /{owner}/{repo_slug}/blob/{ref}/{path:path}
Phase 1: commit provenance pill + sem-ver badge
Phase 2: symbol outline panel
Phase 3: file provenance timeline
Phase 4: intelligence signal pills (hotspot / cold / blast) in header
"""
import asyncio
import logging
import re
from pathlib import Path
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi import status as http_status
from sqlalchemy import desc, func, select as sa_select
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.responses import Response
from musehub.api.routes.musehub._templates import templates
from musehub.api.validation import BranchParam, FilePathParam, SlugParam
from musehub.auth.dependencies import TokenClaims, optional_token
from musehub.api.routes.musehub._ui_helpers import _resolve_ref_and_path, _resolve_repo
from musehub.api.routes.musehub.json_alternate import json_or_html
from musehub.db import get_db
from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry
from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubObject
from musehub.services import musehub_repository
from musehub.services.musehub_snapshot import get_snapshot_manifest, get_snapshot_manifests_batch
from musehub.services.musehub_symbol_indexer import load_symbol_history, SymbolHistory
from musehub.storage.backends import read_object_bytes as _read_object_bytes
from musehub.types.compression import decompress_if_needed as _decompress_if_needed
from musehub.types.json_types import JSONObject, StrDict
# Symbol name → (start_line, end_line) tuple; used while enriching index entries.
type _RangeMap = dict[str, tuple[int, int]]
# Symbol name → [start_line, end_line] list; the shape of the symbol line maps
# that end up in the page context.
type _LineMap = dict[str, list[int]]
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router that carries the blob viewer route; no prefix, tagged "musehub-ui".
router = APIRouter(prefix="", tags=["musehub-ui"])
# ── Markdown rendering ────────────────────────────────────────────────────────
# Matches one rendered heading element without attributes, e.g. ``<h2>…</h2>``.
# Group 1 is the heading level, group 2 the inner HTML.
_HEADING_TAG_RE = re.compile(r"<h([1-6])>(.*?)</h\1>", re.DOTALL)


def _add_heading_anchors(html: str) -> str:
    """Return *html* with an ``id`` slug injected into every bare heading tag.

    The slug is derived from the heading's text: inline tags are removed, the
    text is lower-cased, characters other than word characters, whitespace and
    hyphens are dropped, and runs of whitespace/hyphens collapse to a single
    hyphen.  Headings whose slug would be empty are left untouched so no
    ``id=""`` attribute is emitted.
    """

    def _anchor(m: re.Match[str]) -> str:
        level = m.group(1)
        inner = m.group(2)
        slug = re.sub(r"<[^>]+>", "", inner)  # strip any inline HTML
        slug = slug.lower().strip()
        slug = re.sub(r"[^\w\s-]", "", slug)
        slug = re.sub(r"[-\s]+", "-", slug)
        if not slug:
            return m.group(0)
        # The slug only contains word characters and hyphens at this point,
        # so it is safe to place inside a double-quoted attribute.
        return f'<h{level} id="{slug}">{inner}</h{level}>'

    return _HEADING_TAG_RE.sub(_anchor, html)


def _render_markdown(content: str) -> str:
    """Render Markdown *content* to HTML with deep-linkable heading anchors.

    Headings receive an ``id`` attribute so they can be targeted with
    ``#heading-slug`` fragments.

    NOTE(review): ``mistune.html`` is understood to pass raw HTML embedded in
    the Markdown through unescaped — confirm the result is sanitised before it
    is rendered as trusted markup.
    """
    import mistune  # noqa: PLC0415

    return _add_heading_anchors(mistune.html(content))
# ── Phase 2: Symbol outline data ─────────────────────────────────────────────
def _symbol_history_to_outline(sh: SymbolHistory) -> list[JSONObject]:
    """Build outline-panel rows from a loaded symbol history mapping.

    Each address with at least one recorded op yields one row describing its
    most recent op (by ``committed_at``) and how many ops it has in total.
    Rows come back newest-first and are capped at 50.
    """

    def _when(record):
        return record.get("committed_at", "")

    outline: list[JSONObject] = []
    for addr, history in sh.items():
        if history:
            newest = sorted(history, key=_when, reverse=True)[0]
            # "path::symbol" addresses show the symbol; plain paths show the
            # final path component.
            if "::" in addr:
                label = addr.split("::")[-1]
            else:
                label = addr.split("/")[-1]
            outline.append(
                {
                    "address": addr,
                    "display_name": label,
                    "last_op": newest.get("op", ""),
                    "last_op_time": _when(newest),
                    "last_commit_id": newest.get("commit_id", "") or "",
                    "touch_count": len(history),
                }
            )
    ranked = sorted(outline, key=lambda item: item["last_op_time"], reverse=True)
    return ranked[:50]
async def _fetch_file_symbols(
    session: AsyncSession, repo_id: str, file_path: str
) -> list[JSONObject]:
    """Load the symbol history for *file_path* and return outline entries.

    The outline is best-effort: any failure while loading the history yields
    an empty list rather than an error, but the failure is logged so it does
    not disappear silently.
    """
    try:
        sh = await load_symbol_history(session, repo_id, file_path=file_path)
    except Exception:
        # Deliberately broad: the outline panel must never break the page.
        logger.warning(
            "Symbol history lookup failed for %s in repo %s",
            file_path,
            repo_id,
            exc_info=True,
        )
        return []
    return _symbol_history_to_outline(sh)
async def _fetch_file_symbols_from_history(
    session: AsyncSession, repo_id: str, file_path: str, sh: SymbolHistory
) -> list[JSONObject]:
    """Return outline entries for an already-loaded symbol history.

    Counterpart of ``_fetch_file_symbols`` for callers that hold the history
    mapping already; ``session``, ``repo_id`` and ``file_path`` are accepted
    but not used.
    """
    outline = _symbol_history_to_outline(sh)
    return outline
def _parse_symbols_from_content(file_path: str, content: str) -> list[JSONObject]:
    """Extract symbols directly from file content when the symbol index has no entries.

    Only Python files (``.py``) are supported; they are parsed with ``ast``.
    Any other path, and any Python source that cannot be parsed, yields an
    empty list.

    Returns entries in the same shape as ``_fetch_file_symbols`` plus ``kind``,
    ``lineno`` and ``end_lineno``, sorted by starting line.  There is no commit
    history — last_op / last_op_time / last_commit_id are left empty and
    touch_count is 0.  Nested definitions (methods, inner functions) are
    included under their bare name.
    """
    if not file_path.endswith(".py"):
        return []
    import ast  # noqa: PLC0415

    try:
        tree = ast.parse(content)
    except (SyntaxError, ValueError, RecursionError):
        # The content is arbitrary repository data: besides plain syntax
        # errors, null bytes (ValueError on older interpreters) and extremely
        # deep nesting (RecursionError) must not take the page down.
        return []

    symbols: list[JSONObject] = []
    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef):
            kind = "class"
        elif isinstance(node, ast.AsyncFunctionDef):
            kind = "async_function"
        elif isinstance(node, ast.FunctionDef):
            kind = "function"
        else:
            continue
        symbols.append({
            "address": f"{file_path}::{node.name}",
            "display_name": node.name,
            "last_op": "",
            "last_op_time": "",
            "last_commit_id": "",
            "touch_count": 0,
            "kind": kind,
            "lineno": node.lineno,
            "end_lineno": node.end_lineno,
        })
    symbols.sort(key=lambda s: s["lineno"])
    return symbols
def _enrich_with_linenos(symbols: list[JSONObject], file_path: str, content: str) -> None:
    """Add ``lineno`` / ``end_lineno`` to symbol entries that lack them via AST.

    Mutates ``symbols`` in place.  Only runs for Python files; entries that
    already carry ``lineno`` and entries whose ``display_name`` is not defined
    in the file are left untouched.  When several definitions share a name,
    the last one visited by ``ast.walk`` wins.  Content that cannot be parsed
    leaves ``symbols`` unchanged.
    """
    if not file_path.endswith(".py"):
        return
    import ast  # noqa: PLC0415

    try:
        tree = ast.parse(content)
    except (SyntaxError, ValueError, RecursionError):
        # Arbitrary repository content: tolerate null bytes (ValueError on
        # older interpreters) and pathological nesting as well as bad syntax.
        return

    name_to_range: _RangeMap = {
        node.name: (node.lineno, node.end_lineno)
        for node in ast.walk(tree)
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef))
    }
    for sym in symbols:
        if "lineno" in sym:
            continue
        span = name_to_range.get(str(sym.get("display_name", "")))
        if span is not None:
            sym["lineno"], sym["end_lineno"] = span
def _symbol_line_map(symbols: list[JSONObject]) -> _LineMap:
    """Map each symbol's ``display_name`` to its ``[start, end]`` line pair.

    Entries without a truthy ``lineno`` are skipped.  A missing or falsy
    ``end_lineno`` makes the symbol a single-line range (end equals start).
    Later entries overwrite earlier ones that share a display name.
    """
    mapping: _LineMap = {}
    for entry in symbols:
        first = entry.get("lineno")
        if first:
            first_line = int(first)
            last = entry.get("end_lineno")
            last_line = int(last) if last else first_line
            mapping[str(entry["display_name"])] = [first_line, last_line]
    return mapping
def _symbol_line_map_from_content(file_path: str, content: str) -> _LineMap:
    """Derive the ``{name: [start, end]}`` map straight from raw file content.

    Feeds ``symbolLines`` in the blob page_json for ``#S:`` deep links.  The
    map is built from whatever ``_parse_symbols_from_content`` extracts, so it
    does not depend on the state of the symbol index.
    """
    parsed = _parse_symbols_from_content(file_path, content)
    return _symbol_line_map(parsed)
# ── Phase 3: File provenance timeline ────────────────────────────────────────
def _commit_history_entry(commit: MusehubCommit) -> JSONObject:
    """Shape one commit row as a provenance-timeline entry."""
    model_id = commit.model_id or ""
    return {
        "commit_id": commit.commit_id,
        "commit_id_full": commit.commit_id,
        "message": (commit.message.split("\n")[0] if commit.message else ""),
        "author": commit.author or "",
        "timestamp": commit.timestamp,
        "is_agent": bool(commit.agent_id),
        "model_label": _model_label(model_id) if model_id else "",
        "sem_ver_bump": commit.sem_ver_bump or "none",
        "breaking": bool(commit.breaking_changes),
    }


async def _fetch_file_history(
    session: AsyncSession,
    repo_id: str,
    file_path: str,
    head_commit_id: str,
    limit: int = 20,
) -> list[JSONObject]:
    """Return up to *limit* commits that changed *file_path*, newest first.

    Fast path: looks up ``MusehubSymbolHistoryEntry`` rows whose address is the
    file itself or one of its ``path::symbol`` addresses.  When no such rows
    exist, falls back to scanning the snapshot manifests of up to 300 commits
    on the head commit's branch.

    In the fallback a commit counts as having changed the file when the file's
    object id in its snapshot differs from the object id in the next older
    scanned commit (or the file is absent there).  The oldest scanned commit
    that contains the file is always reported, because nothing older is
    available to compare it against.

    Returns an empty list when the head commit is unknown, has no snapshot, or
    does not contain the file.
    """
    from typing import Any  # noqa: PLC0415

    norm = file_path.lstrip("/")

    # ── Fast path: history index ──────────────────────────────────────────────
    she = MusehubSymbolHistoryEntry
    index_stmt = (
        sa_select(she.commit_id, she.committed_at)
        .where(
            she.repo_id == repo_id,
            # autoescape keeps "_" and "%" inside the path literal instead of
            # letting them act as LIKE wildcards.
            (she.address == norm)
            | she.address.startswith(f"{norm}::", autoescape=True),
        )
        .distinct()
        .order_by(desc(she.committed_at))
        .limit(limit)
    )
    index_rows = (await session.execute(index_stmt)).all()
    if index_rows:
        # Deduplicate by commit_id, keeping the latest committed_at per commit.
        seen: dict[str, Any] = {}
        for row in index_rows:
            known = seen.get(row.commit_id)
            if known is None or row.committed_at > known.committed_at:
                seen[row.commit_id] = row
        deduped = sorted(seen.values(), key=lambda r: r.committed_at, reverse=True)[:limit]
        commits = (
            await session.execute(
                sa_select(MusehubCommit).where(
                    MusehubCommit.commit_id.in_([r.commit_id for r in deduped])
                )
            )
        ).scalars().all()
        commit_map = {c.commit_id: c for c in commits}
        # Index rows whose commit row is missing are skipped.
        return [
            _commit_history_entry(commit_map[row.commit_id])
            for row in deduped
            if row.commit_id in commit_map
        ]

    # ── Fallback: snapshot manifest scan ─────────────────────────────────────
    head = await session.get(MusehubCommit, head_commit_id)
    if head is None or head.snapshot_id is None:
        return []
    stmt = (
        sa_select(MusehubCommit)
        .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(
            MusehubCommitRef.repo_id == repo_id,
            MusehubCommit.branch == (head.branch or "main"),
            MusehubCommit.timestamp <= head.timestamp,
        )
        .order_by(desc(MusehubCommit.timestamp))
        .limit(300)
    )
    rows = (await session.execute(stmt)).scalars().all()

    # Manifests are fetched in batches of 100 snapshot ids.
    snapshot_ids = [c.snapshot_id for c in rows if c.snapshot_id]
    manifests: dict = {}
    for start in range(0, len(snapshot_ids), 100):
        manifests.update(
            await get_snapshot_manifests_batch(session, snapshot_ids[start:start + 100])
        )

    if norm not in manifests.get(head.snapshot_id, {}):
        return []

    # Newest-first list of commits with a snapshot and the file's object id
    # (None when the file is absent from that snapshot).
    scanned = [c for c in rows if c.snapshot_id is not None]
    oids = [manifests.get(c.snapshot_id, {}).get(norm) for c in scanned]

    history_fb: list[JSONObject] = []
    for idx, (commit, oid) in enumerate(zip(scanned, oids)):
        if oid is None:
            continue
        older_oid = oids[idx + 1] if idx + 1 < len(oids) else None
        if oid != older_oid:
            # This commit introduced the content: its older neighbour holds
            # different content, lacks the file, or does not exist.
            history_fb.append(_commit_history_entry(commit))
            if len(history_fb) >= limit:
                break
    return history_fb
# ── Phase 4: Intelligence signals ────────────────────────────────────────────
# Neutral signal set used when no symbol history is available.  Treat it as a
# template: hand out copies, never the dict itself.
_INTEL_EMPTY: JSONObject = {
    "is_hotspot": False, "hotspot_count": 0,
    "has_dead": False, "dead_count": 0,
    "blast_risk": False, "blast_count": 0,
    "health_score": 100, "health_label": "Excellent",
}


def _compute_intel_from_history(file_sh: SymbolHistory) -> JSONObject:
    """Compute hotspot/dead/blast signals from a pre-loaded symbol history dict.

    Per symbol (one entry of *file_sh*):

    - hotspot: more than 10 recorded ops;
    - dead: the most recent parseable timestamp is more than 90 days old;
    - blast: ops span more than 20 distinct commit ids.

    The health score starts at 100 and loses 8 per hotspot (at most 30),
    10 per dead symbol (at most 30) and 6 per blast symbol (at most 20).

    Timestamps are read from ``committed_at``, then ``timestamp``, then ``ts``
    and may be ISO-8601 strings or ``datetime`` objects.  Values without a
    UTC offset are taken to be UTC; unparseable values are ignored.

    Always returns a fresh dict, so callers may mutate the result.
    """
    if not file_sh:
        return dict(_INTEL_EMPTY)
    from datetime import datetime, timezone  # noqa: PLC0415

    hotspot_threshold = 10
    dead_cold_days = 90
    blast_threshold = 20
    now = datetime.now(tz=timezone.utc)

    def _parse_ts(raw: object) -> datetime | None:
        """Return *raw* as an aware datetime, or None when it is unusable."""
        if isinstance(raw, datetime):
            ts = raw
        elif isinstance(raw, str) and raw:
            try:
                ts = datetime.fromisoformat(raw.replace("Z", "+00:00"))
            except ValueError:
                return None
        else:
            return None
        if ts.tzinfo is None:
            # Naive and aware datetimes cannot be compared or subtracted;
            # normalise so the age computation below cannot raise.
            ts = ts.replace(tzinfo=timezone.utc)
        return ts

    hotspot_count = 0
    dead_count = 0
    blast_count = 0
    for ops in file_sh.values():
        if len(ops) > hotspot_threshold:
            hotspot_count += 1

        last_ts: datetime | None = None
        for op in ops:
            ts = _parse_ts(op.get("committed_at") or op.get("timestamp") or op.get("ts", ""))
            if ts is not None and (last_ts is None or ts > last_ts):
                last_ts = ts
        if last_ts is not None and (now - last_ts).days > dead_cold_days:
            dead_count += 1

        unique_commits = {op.get("commit_id") for op in ops if op.get("commit_id")}
        if len(unique_commits) > blast_threshold:
            blast_count += 1

    score = 100
    score -= min(hotspot_count * 8, 30)
    score -= min(dead_count * 10, 30)
    score -= min(blast_count * 6, 20)
    score = max(0, min(score, 100))

    for floor, label in ((90, "Excellent"), (75, "Good"), (55, "Fair"), (35, "Poor")):
        if score >= floor:
            health_label = label
            break
    else:
        health_label = "Critical"

    return {
        "is_hotspot": hotspot_count > 0,
        "hotspot_count": hotspot_count,
        "has_dead": dead_count > 0,
        "dead_count": dead_count,
        "blast_risk": blast_count > 0,
        "blast_count": blast_count,
        "health_score": score,
        "health_label": health_label,
    }
async def _fetch_file_intel(
    session: AsyncSession, repo_id: str, file_path: str
) -> JSONObject:
    """Compute hotspot / dead / blast signals for *file_path*.

    Best-effort: when the symbol history cannot be loaded the failure is
    logged and a fresh copy of the neutral signal set is returned.
    """
    try:
        file_sh = await load_symbol_history(session, repo_id, file_path=file_path)
    except Exception:
        # Deliberately broad: the signal pills must never break the page.
        logger.warning(
            "Symbol history lookup failed for %s in repo %s",
            file_path,
            repo_id,
            exc_info=True,
        )
        # Copy so callers cannot mutate the shared module-level template.
        return dict(_INTEL_EMPTY)
    return _compute_intel_from_history(file_sh)
async def _fetch_file_intel_from_history(
    session: AsyncSession, repo_id: str, file_path: str, sh: SymbolHistory
) -> JSONObject:
    """Return intelligence signals for an already-loaded symbol history.

    Counterpart of ``_fetch_file_intel`` for callers that hold the history
    mapping already; ``session``, ``repo_id`` and ``file_path`` are accepted
    but not used.
    """
    signals = _compute_intel_from_history(sh)
    return signals
# ── File classification helpers ───────────────────────────────────────────────
# Lower-case file extensions the blob viewer treats as binary: for these the
# route does not read the object bytes and renders no text lines.
# NOTE(review): ".svg" is a text format but is listed here — presumably so it
# is handled like the other image types; confirm.
_BLOB_BINARY_TYPES: frozenset[str] = frozenset(
[".webp", ".png", ".jpg", ".jpeg", ".gif", ".svg"]
)
# Lower-case extension (dot included) → language identifier handed to the
# template as ``lang``.
_LANG_MAP: StrDict = {
    ".py": "python",
    ".js": "javascript",
    ".ts": "typescript",
    ".json": "json",
    ".yaml": "yaml",
    ".yml": "yaml",
    ".md": "markdown",
    ".txt": "text",
    ".xml": "xml",
    ".html": "html",
    ".css": "css",
    ".sh": "bash",
    ".toml": "toml",
    ".bats": "bash",
}
# Dotfiles carry no extension, so they are matched on the exact filename.
_FILENAME_LANG_MAP: StrDict = {
    ".museattributes": "toml",
    ".museignore": "toml",
}


def _detect_language(path: str) -> str:
    """Return the language identifier for *path*, or ``""`` when unknown.

    Lookup order: exact filename, then the underscore-prefixed no-extension
    convention, then the lower-cased file extension.
    """
    leaf = Path(path)
    base = leaf.name
    by_name = _FILENAME_LANG_MAP.get(base)
    if by_name is not None:
        return by_name
    # Underscore-prefixed names without any dot (e.g. completions/_muse) are
    # shell completion files and are highlighted as bash.
    if base.startswith("_") and "." not in base:
        return "bash"
    extension = leaf.suffix.lower()
    return _LANG_MAP.get(extension, "")
def _model_label(model_id: str | None) -> str:
    """Turn a model id into a short display label.

    ``claude-sonnet-4-6`` → ``Sonnet 4.6``: a leading ``claude-`` is dropped,
    the first hyphen-separated part is capitalised, and the remaining parts
    are joined with dots.  A missing or empty id gives ``"unknown"``.
    """
    if not model_id:
        return "unknown"
    family, _, version_part = model_id.removeprefix("claude-").partition("-")
    label = f"{family.capitalize()} {version_part.replace('-', '.')}"
    return label.strip()
async def _empty_list() -> list[JSONObject]:
    """Awaitable stand-in that resolves to a new empty list.

    Lets ``blob_page`` keep a fixed slot in ``asyncio.gather`` when there is
    no head commit to fetch history for.
    """
    nothing: list[JSONObject] = []
    return nothing
# ── Route ─────────────────────────────────────────────────────────────────────
@router.get(
    "/{owner}/{repo_slug}/blob/{ref}/{path:path}",
    summary="MuseHub file blob viewer — content-addressed file rendering",
)
async def blob_page(
    request: Request,
    owner: SlugParam,
    repo_slug: SlugParam,
    ref: str,
    path: FilePathParam,
    db: AsyncSession = Depends(get_db),
    claims: TokenClaims | None = Depends(optional_token),
) -> Response:
    """Render the Muse-native file blob viewer.

    Resolution: ref → commit → snapshot manifest → object_id → storage backend.

    Rendering modes:
    - Text/code → SSR line-numbered table + JS syntax highlighting
    - Binary / >1 MB → download link only

    When the file does not exist at the ref, the same template is rendered
    with ``blob_found`` set to False and neutral values for everything else.

    Phase 1 context additions:
    - commit_agent_id, commit_model_id, commit_model_label — for provenance pill
    - commit_sem_ver_bump, commit_breaking, commit_is_agent — for sem-ver badge

    Phases 2–4 add ``file_symbols`` / ``has_outline`` / ``symbol_line_map``,
    ``file_history`` and ``file_intel`` respectively.
    """
    # Files larger than this are treated as binary and not rendered inline.
    max_inline_bytes = 1_000_000

    repo_id, base_url, nav_ctx = await _resolve_repo(owner, repo_slug, db, claims)
    # Resolve branch names containing slashes (e.g. feat/my-branch).
    ref, path = await _resolve_ref_and_path(db, repo_id, f"{ref}/{path}")
    norm_path = path.lstrip("/")
    filename = norm_path.split("/")[-1] if norm_path else ""
    lang = _detect_language(norm_path)
    ext = Path(norm_path).suffix.lower()
    is_binary = ext in _BLOB_BINARY_TYPES

    file_meta = await musehub_repository.get_file_at_ref(db, repo_id, ref, norm_path)

    # ── Not found: render the page shell with neutral values ──────────────
    if not file_meta:
        ctx_404: JSONObject = {
            "owner": owner, "repo_slug": repo_slug, "repo_id": repo_id,
            "ref": ref, "file_path": norm_path, "filename": filename,
            "base_url": base_url, "current_page": "tree",
            "blob_found": False, "lang": lang, "is_binary": False,
            "size_bytes": 0, "lines": [],
            "line_count": 0, "object_id": "", "snapshot_id": "",
            "commit_is_agent": False, "commit_agent_id": "",
            "commit_model_id": "", "commit_model_label": "",
            "commit_sem_ver_bump": "none", "commit_breaking": False,
            "file_symbols": [], "has_outline": False, "symbol_line_map": {},
            "file_history": [],
            # Copy so the shared module-level template is never exposed.
            "file_intel": dict(_INTEL_EMPTY),
        }
        ctx_404.update(nav_ctx)
        return json_or_html(
            request,
            lambda: templates.TemplateResponse(request, "musehub/pages/blob.html", ctx_404),
            ctx_404,
        )

    object_id = str(file_meta["object_id"])
    snapshot_id = str(file_meta["snapshot_id"])
    head_cid = str(file_meta["commit_id"])

    # ── Load content (skipped for extensions known to be binary) ──────────
    content_bytes: bytes | None = None
    if not is_binary:
        obj_row = await db.get(MusehubObject, object_id)
        if obj_row is not None:
            raw = await _read_object_bytes(obj_row, session=db)
            if raw:
                content_bytes = _decompress_if_needed(raw)

    size_bytes: int = len(content_bytes) if content_bytes else 0
    content: str | None = None
    if content_bytes is not None:
        if size_bytes > max_inline_bytes:
            is_binary = True
        else:
            content = content_bytes.decode("utf-8", errors="replace")

    is_markdown = ext == ".md"
    md_html: str = _render_markdown(content) if (is_markdown and content) else ""
    lines: list[str] = content.splitlines() if content else []
    line_count = len(lines)

    # Last-modified commit for this file
    last_commit = await musehub_repository.get_last_commit_for_file(
        db, repo_id, norm_path, head_cid
    )

    # ── Phase 1: extract provenance from first-class columns ──────────────
    commit_agent_id: str = (last_commit.agent_id or "") if last_commit else ""
    commit_model_id: str = (last_commit.model_id or "") if last_commit else ""
    commit_sem_ver_bump: str = (last_commit.sem_ver_bump or "none") if last_commit else "none"
    commit_breaking: bool = bool(last_commit.breaking_changes) if last_commit else False
    commit_is_agent: bool = bool(commit_agent_id)

    # ── Phases 2/3/4: load symbol history once, run phases concurrently ───
    try:
        file_sh = await load_symbol_history(db, repo_id, file_path=norm_path)
    except Exception:
        # Deliberately broad: the outline and signal pills are best-effort
        # and must not break the page, but the failure is still recorded.
        logger.warning(
            "Symbol history load failed for %s in repo %s",
            norm_path,
            repo_id,
            exc_info=True,
        )
        file_sh = {}
    file_symbols, file_history, file_intel = await asyncio.gather(
        _fetch_file_symbols_from_history(db, repo_id, norm_path, file_sh),
        _fetch_file_history(db, repo_id, norm_path, head_cid) if head_cid else _empty_list(),
        _fetch_file_intel_from_history(db, repo_id, norm_path, file_sh),
    )

    # ── Phase 2 post-processing ────────────────────────────────────────────
    if not file_symbols and content:
        file_symbols = _parse_symbols_from_content(norm_path, content)
    elif file_symbols and content:
        # Symbol index entries lack lineno — enrich from AST when content is available.
        _enrich_with_linenos(file_symbols, norm_path, content)
    has_outline = bool(file_symbols)

    # Build symbolLines from a full AST parse so every symbol in the file is
    # reachable via #S: deep links, regardless of whether it appears in the index.
    symbol_line_map = (
        _symbol_line_map_from_content(norm_path, content)
        if content
        else _symbol_line_map(file_symbols)
    )

    # Breadcrumb path segments: (label, tree URL) for every parent directory.
    path_segments: list[tuple[str, str]] = []
    accumulated = ""
    for seg in norm_path.split("/")[:-1]:
        accumulated = f"{accumulated}/{seg}" if accumulated else seg
        path_segments.append((seg, f"{base_url}/tree/{ref}/{accumulated}"))

    # A commit row may carry no message; fall back to "" instead of failing,
    # matching how the file-history entries are built.
    last_commit_msg = (
        last_commit.message.split("\n")[0]
        if last_commit and last_commit.message
        else ""
    )

    ctx: JSONObject = {
        "owner": owner,
        "repo_slug": repo_slug,
        "repo_id": repo_id,
        "ref": ref,
        "file_path": norm_path,
        "filename": filename,
        "path_segments": path_segments,
        "base_url": base_url,
        "current_page": "tree",
        "lang": lang,
        "is_binary": is_binary,
        "is_markdown": is_markdown,
        "md_html": md_html,
        "size_bytes": size_bytes,
        "lines": lines,
        "line_count": line_count,
        "blob_found": True,
        "object_id": object_id,
        # Same value as object_id; no truncation happens here.
        "object_id_short": object_id,
        "snapshot_id": snapshot_id,
        "last_commit": last_commit.commit_id if last_commit else "",
        "last_commit_full": last_commit.commit_id if last_commit else "",
        "last_commit_msg": last_commit_msg,
        "last_commit_time": last_commit.timestamp if last_commit else None,
        "last_commit_author": last_commit.author if last_commit else "",
        # Phase 1: provenance
        "commit_agent_id": commit_agent_id,
        "commit_model_id": commit_model_id,
        "commit_model_label": _model_label(commit_model_id) if commit_model_id else "",
        "commit_sem_ver_bump": commit_sem_ver_bump,
        "commit_breaking": commit_breaking,
        "commit_is_agent": commit_is_agent,
        # Phase 2: outline panel + symbol→lineno map for JS deep linking
        "file_symbols": file_symbols,
        "has_outline": has_outline,
        "symbol_line_map": symbol_line_map,
        # Phase 3: provenance timeline
        "file_history": file_history,
        # Phase 4: intelligence signals
        "file_intel": file_intel,
    }
    ctx.update(nav_ctx)
    return json_or_html(
        request,
        lambda: templates.TemplateResponse(request, "musehub/pages/blob.html", ctx),
        ctx,
    )