"""TDD tests for blob page performance fixes. Problem 1: _fetch_file_history scans 300 snapshot manifests every page load. Fix: use musehub_symbol_history_entries as fast path (same index as get_last_commit_for_file). Fall back to snapshot scan only when no entries exist. Problem 2: _fetch_file_symbols and _fetch_file_intel each call load_symbol_history independently — two identical DB queries per page load. Fix: call load_symbol_history once in blob_page, pass result to both phases. Test matrix ----------- test_fetch_file_history_skips_snapshot_scan_when_history_exists No snapshot manifests fetched when symbol_history_entries has entries for the file. test_fetch_file_history_returns_commits_from_history_index Returns correct commits ordered newest-first from the index. test_fetch_file_history_falls_back_when_no_history_entries When no history entries exist, falls back to snapshot scan. test_fetch_file_history_deduplicates_commits Multiple addresses for the same file (path::SymA, path::SymB) in the same commit produce one history entry, not one per symbol. test_blob_page_calls_load_symbol_history_once load_symbol_history is called at most once per blob_page request, never twice. """ from __future__ import annotations import secrets from contextlib import asynccontextmanager from datetime import datetime, timezone, timedelta from typing import AsyncGenerator from unittest.mock import AsyncMock, patch, call import msgpack import pytest from sqlalchemy.ext.asyncio import AsyncSession from musehub.core.genesis import compute_identity_id, compute_repo_id from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef from musehub.db import database as _database from musehub.types.json_types import JSONObject, StrDict from muse.core.types import long_id, now_utc_iso ManifestBatch = dict[str, StrDict] # ── Constants ───────────────────────────────────────────────────────────────── _OWNER_ID = compute_identity_id(b"blob-perf-tester") _FILE = "src/billing.py" _OTHER = "src/auth.py" # ── Helpers ─────────────────────────────────────────────────────────────────── def _uid() -> str: return long_id(secrets.token_hex(32)) def _repo_id() -> str: return compute_repo_id( _OWNER_ID, f"bp-{secrets.token_hex(4)}", "code", now_utc_iso(), ) def _snap_id() -> str: return long_id(secrets.token_hex(32)) def _obj(tag: str) -> str: return long_id(tag.encode().hex().ljust(64, "0")) def _blob(manifest: StrDict) -> bytes: return msgpack.packb(manifest, use_bin_type=True) async def _make_repo(session: AsyncSession) -> str: rid = _repo_id() now = datetime.now(tz=timezone.utc) session.add(MusehubRepo( repo_id=rid, name="bp-test", owner="bp-tester", slug="bp-test", visibility="public", owner_user_id=_OWNER_ID, created_at=now, updated_at=now, )) await session.commit() return rid async def _snap(session: AsyncSession, repo_id: str, manifest: StrDict) -> str: sid = _snap_id() now = datetime.now(tz=timezone.utc) session.add(MusehubSnapshot( snapshot_id=sid, directories=[], manifest_blob=_blob(manifest), entry_count=len(manifest), created_at=now, )) session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=sid, created_at=now)) await session.flush() return sid async def _commit( session: AsyncSession, repo_id: str, snapshot_id: str, offset: int = 0, message: str = "feat: change", author: str = "tester", ) -> MusehubCommit: cid = _uid() now = datetime.now(tz=timezone.utc) + timedelta(seconds=offset) row = MusehubCommit( commit_id=cid, branch="main", parent_ids=[], message=message, author=author, timestamp=now, snapshot_id=snapshot_id, ) session.add(row) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=cid)) await session.flush() return row async def _history_entry( session: AsyncSession, repo_id: str, commit_id: str, address: str, offset: int = 0, op: str = "modify", message: str = "feat: change", ) -> None: now = datetime.now(tz=timezone.utc) + timedelta(seconds=offset) session.add(MusehubSymbolHistoryEntry( repo_id=repo_id, address=address, commit_id=commit_id, committed_at=now, author="tester", op=op, message=message, )) await session.flush() @asynccontextmanager async def _fresh_session() -> AsyncGenerator[AsyncSession, None]: async with _database._async_session_factory() as session: yield session # ── Tests: _fetch_file_history fast path ────────────────────────────────────── @pytest.mark.anyio async def test_fetch_file_history_skips_snapshot_scan_when_history_exists( db_session: AsyncSession, ) -> None: """No snapshot manifests fetched when symbol_history_entries has file entries.""" import musehub.api.routes.musehub.ui_blob as _blob_mod repo_id = await _make_repo(db_session) s1 = await _snap(db_session, repo_id, {_FILE: _obj("v1")}) c1 = await _commit(db_session, repo_id, s1, offset=0) await _history_entry(db_session, repo_id, c1.commit_id, _FILE, offset=0) await db_session.commit() batch_calls: list[list[str]] = [] async def _spy_batch(session: AsyncSession, ids: list[str]) -> ManifestBatch: batch_calls.append(ids) return {} with patch( "musehub.api.routes.musehub.ui_blob.get_snapshot_manifests_batch", side_effect=_spy_batch, ): async with _fresh_session() as rs: result = await _blob_mod._fetch_file_history( rs, repo_id, _FILE, c1.commit_id ) assert batch_calls == [], ( f"get_snapshot_manifests_batch called {len(batch_calls)} time(s) " "even though symbol_history_entries has entries" ) assert len(result) == 1 assert result[0]["commit_id"] == c1.commit_id @pytest.mark.anyio async def test_fetch_file_history_returns_commits_from_history_index( db_session: AsyncSession, ) -> None: """Returns commits ordered newest-first, drawn from the history index.""" import musehub.api.routes.musehub.ui_blob as _blob_mod repo_id = await _make_repo(db_session) s1 = await _snap(db_session, repo_id, {_FILE: _obj("v1")}) c1 = await _commit(db_session, repo_id, s1, offset=0, message="init") s2 = await _snap(db_session, repo_id, {_FILE: _obj("v2")}) c2 = await _commit(db_session, repo_id, s2, offset=10, message="feat: v2") s3 = await _snap(db_session, repo_id, {_FILE: _obj("v3")}) c3 = await _commit(db_session, repo_id, s3, offset=20, message="feat: v3") await _history_entry(db_session, repo_id, c1.commit_id, _FILE, offset=0) await _history_entry(db_session, repo_id, c2.commit_id, _FILE, offset=10) await _history_entry(db_session, repo_id, c3.commit_id, _FILE, offset=20) await db_session.commit() async with _fresh_session() as rs: result = await _blob_mod._fetch_file_history( rs, repo_id, _FILE, c3.commit_id ) assert [r["commit_id"] for r in result] == [c3.commit_id, c2.commit_id, c1.commit_id] @pytest.mark.anyio async def test_fetch_file_history_falls_back_when_no_history_entries( db_session: AsyncSession, ) -> None: """Falls back to snapshot scan when no history entries exist.""" import musehub.api.routes.musehub.ui_blob as _blob_mod repo_id = await _make_repo(db_session) s1 = await _snap(db_session, repo_id, {_FILE: _obj("v1")}) c1 = await _commit(db_session, repo_id, s1, offset=0) # No history entries. await db_session.commit() async with _fresh_session() as rs: result = await _blob_mod._fetch_file_history( rs, repo_id, _FILE, c1.commit_id ) assert len(result) == 1 assert result[0]["commit_id"] == c1.commit_id @pytest.mark.anyio async def test_fetch_file_history_deduplicates_commits( db_session: AsyncSession, ) -> None: """Multiple symbol addresses from the same commit produce one history entry.""" import musehub.api.routes.musehub.ui_blob as _blob_mod repo_id = await _make_repo(db_session) s1 = await _snap(db_session, repo_id, {_FILE: _obj("v1")}) c1 = await _commit(db_session, repo_id, s1, offset=0) # Three addresses all pointing to the same commit. await _history_entry(db_session, repo_id, c1.commit_id, _FILE, offset=0) await _history_entry(db_session, repo_id, c1.commit_id, f"{_FILE}::compute", offset=0) await _history_entry(db_session, repo_id, c1.commit_id, f"{_FILE}::validate", offset=0) await db_session.commit() async with _fresh_session() as rs: result = await _blob_mod._fetch_file_history( rs, repo_id, _FILE, c1.commit_id ) assert len(result) == 1, ( f"Expected 1 deduplicated entry, got {len(result)}" ) assert result[0]["commit_id"] == c1.commit_id # ── Tests: load_symbol_history called once per request ──────────────────────── @pytest.mark.anyio async def test_blob_page_calls_load_symbol_history_once( db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch, ) -> None: """load_symbol_history is called at most once per blob_page request.""" import musehub.api.routes.musehub.ui_blob as _blob_mod call_count = 0 async def _counting_load(session: AsyncSession, repo_id: str, *, file_path: str | None = None) -> None: nonlocal call_count call_count += 1 return {} monkeypatch.setattr(_blob_mod, "load_symbol_history", _counting_load, raising=False) repo_id = await _make_repo(db_session) s1 = await _snap(db_session, repo_id, {_FILE: _obj("v1")}) c1 = await _commit(db_session, repo_id, s1, offset=0) await db_session.commit() # Call both phases the way blob_page does. import asyncio async with _fresh_session() as rs: sh = await _counting_load(rs, repo_id, file_path=_FILE) await asyncio.gather( _blob_mod._fetch_file_symbols_from_history(rs, repo_id, _FILE, sh), _blob_mod._fetch_file_intel_from_history(rs, repo_id, _FILE, sh), ) assert call_count == 1, ( f"load_symbol_history called {call_count} time(s), expected 1" )