"""TDD tests for get_last_commit_for_file index-lookup optimization. Problem: get_last_commit_for_file scans up to 200 snapshot manifests to find which commit last touched a file — O(N) per page load. Fix: query musehub_symbol_history_entries WHERE repo_id=? AND address=? (or address LIKE 'path::%') ordered by committed_at DESC LIMIT 1. The index ix_symbol_history_repo_address makes this O(1). Fall back to the snapshot scan only when no history entries exist for the file. Test matrix ----------- test_get_last_commit_skips_snapshot_scan_when_history_exists When symbol_history_entries has entries for the file, no snapshot manifests must be fetched at all. test_get_last_commit_returns_most_recent_history_entry Returns the commit whose committed_at is latest among file-level entries. test_get_last_commit_falls_back_to_snapshot_scan_when_no_history When no history entries exist, falls back to snapshot scan (existing behaviour preserved). test_get_last_commit_matches_symbol_entries_for_file Symbol-level addresses (path::Symbol) for the same file also count — if the file path appears as a prefix, the most recent matching entry is used. test_get_last_commit_ignores_entries_for_other_files Entries for a different file path don't pollute the result. """ from __future__ import annotations import secrets from contextlib import asynccontextmanager from datetime import datetime, timezone, timedelta from typing import AsyncGenerator from unittest.mock import AsyncMock, patch import msgpack import pytest from sqlalchemy.ext.asyncio import AsyncSession from musehub.core.genesis import compute_identity_id, compute_repo_id from musehub.db import database as _database from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef from musehub.services.musehub_repository import get_last_commit_for_file from musehub.types.json_types import StrDict from muse.core.types import long_id, now_utc_iso ManifestBatch = dict[str, StrDict] # ── Constants ───────────────────────────────────────────────────────────────── _OWNER_ID = compute_identity_id(b"lcf-index-tester") _FILE = "musehub/core/billing.py" _OTHER = "musehub/core/auth.py" # ── Helpers ─────────────────────────────────────────────────────────────────── def _uid() -> str: return long_id(secrets.token_hex(32)) def _repo_id() -> str: return compute_repo_id( _OWNER_ID, f"lcf-idx-{secrets.token_hex(4)}", "code", now_utc_iso(), ) def _snap_id() -> str: return long_id(secrets.token_hex(32)) def _obj(tag: str) -> str: return long_id(tag.encode().hex().ljust(64, "0")) def _blob(manifest: StrDict) -> bytes: return msgpack.packb(manifest, use_bin_type=True) async def _make_repo(session: AsyncSession) -> str: rid = _repo_id() now = datetime.now(tz=timezone.utc) session.add(MusehubRepo( repo_id=rid, name="lcf-idx-test", owner="lcf-idx-tester", slug="lcf-idx-test", visibility="public", owner_user_id=_OWNER_ID, created_at=now, updated_at=now, )) await session.commit() return rid async def _snap(session: AsyncSession, repo_id: str, manifest: StrDict) -> str: sid = _snap_id() session.add(MusehubSnapshot( snapshot_id=sid, directories=[], manifest_blob=_blob(manifest), entry_count=len(manifest), )) session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=sid)) await session.flush() return sid async def _commit( session: AsyncSession, repo_id: str, snapshot_id: str, branch: str = "main", offset: int = 0, message: str = "feat: change", ) -> str: cid = _uid() now = datetime.now(tz=timezone.utc) + timedelta(seconds=offset) session.add(MusehubCommit( commit_id=cid, branch=branch, parent_ids=[], message=message, author="tester", timestamp=now, snapshot_id=snapshot_id, )) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=cid)) await session.flush() return cid async def _history_entry( session: AsyncSession, repo_id: str, commit_id: str, address: str, offset: int = 0, op: str = "modify", message: str = "feat: change", ) -> None: """Insert a MusehubSymbolHistoryEntry for the given address.""" now = datetime.now(tz=timezone.utc) + timedelta(seconds=offset) session.add(MusehubSymbolHistoryEntry( repo_id=repo_id, address=address, commit_id=commit_id, committed_at=now, author="tester", op=op, message=message, )) await session.flush() @asynccontextmanager async def _fresh_session() -> AsyncGenerator[AsyncSession, None]: async with _database._async_session_factory() as session: yield session # ── Tests ───────────────────────────────────────────────────────────────────── @pytest.mark.anyio async def test_get_last_commit_skips_snapshot_scan_when_history_exists( db_session: AsyncSession, ) -> None: """No snapshot manifests fetched when symbol_history_entries has file entries.""" repo_id = await _make_repo(db_session) s1 = await _snap(db_session, repo_id, {_FILE: _obj("v1")}) c1 = await _commit(db_session, repo_id, s1, offset=0) await _history_entry(db_session, repo_id, c1, _FILE, offset=0) await db_session.commit() batch_calls: list[list[str]] = [] async def _spy_batch(session: AsyncSession, ids: list[str]) -> ManifestBatch: batch_calls.append(ids) return {} with patch( "musehub.services.musehub_repository.get_snapshot_manifests_batch", side_effect=_spy_batch, ): async with _fresh_session() as rs: await get_last_commit_for_file(rs, repo_id, _FILE, c1) assert batch_calls == [], ( f"get_snapshot_manifests_batch called {len(batch_calls)} time(s) " "even though symbol_history_entries has entries for the file" ) @pytest.mark.anyio async def test_get_last_commit_returns_most_recent_history_entry( db_session: AsyncSession, ) -> None: """Returns the commit with the latest committed_at among file-level entries.""" repo_id = await _make_repo(db_session) s1 = await _snap(db_session, repo_id, {_FILE: _obj("v1")}) c1 = await _commit(db_session, repo_id, s1, offset=0, message="init") s2 = await _snap(db_session, repo_id, {_FILE: _obj("v2")}) c2 = await _commit(db_session, repo_id, s2, offset=10, message="feat: v2") s3 = await _snap(db_session, repo_id, {_FILE: _obj("v3")}) c3 = await _commit(db_session, repo_id, s3, offset=20, message="feat: v3") await _history_entry(db_session, repo_id, c1, _FILE, offset=0) await _history_entry(db_session, repo_id, c2, _FILE, offset=10) await _history_entry(db_session, repo_id, c3, _FILE, offset=20) await db_session.commit() async with _fresh_session() as rs: result = await get_last_commit_for_file(rs, repo_id, _FILE, c3) assert result is not None assert result.commit_id == c3, ( f"Expected most recent commit {c3}, got {result.commit_id}" ) @pytest.mark.anyio async def test_get_last_commit_falls_back_to_snapshot_scan_when_no_history( db_session: AsyncSession, ) -> None: """Falls back to snapshot scan when no history entries exist for the file.""" repo_id = await _make_repo(db_session) s1 = await _snap(db_session, repo_id, {_FILE: _obj("v1")}) c1 = await _commit(db_session, repo_id, s1, offset=0, message="init") # No history entries seeded — fallback path must be taken. await db_session.commit() async with _fresh_session() as rs: result = await get_last_commit_for_file(rs, repo_id, _FILE, c1) assert result is not None assert result.commit_id == c1 @pytest.mark.anyio async def test_get_last_commit_matches_symbol_entries_for_file( db_session: AsyncSession, ) -> None: """Symbol-level addresses (path::Symbol) for the file also trigger index path.""" repo_id = await _make_repo(db_session) s1 = await _snap(db_session, repo_id, {_FILE: _obj("v1")}) c1 = await _commit(db_session, repo_id, s1, offset=0) s2 = await _snap(db_session, repo_id, {_FILE: _obj("v2")}) c2 = await _commit(db_session, repo_id, s2, offset=10, message="feat: update fn") # Only a symbol-level entry, no bare file entry. await _history_entry(db_session, repo_id, c2, f"{_FILE}::compute_total", offset=10) await db_session.commit() batch_calls: list[list[str]] = [] async def _spy_batch(session: AsyncSession, ids: list[str]) -> ManifestBatch: batch_calls.append(ids) return {} with patch( "musehub.services.musehub_repository.get_snapshot_manifests_batch", side_effect=_spy_batch, ): async with _fresh_session() as rs: result = await get_last_commit_for_file(rs, repo_id, _FILE, c2) assert batch_calls == [], "snapshot scan triggered despite symbol history entries" assert result is not None assert result.commit_id == c2 @pytest.mark.anyio async def test_get_last_commit_ignores_entries_for_other_files( db_session: AsyncSession, ) -> None: """History entries for a different file don't affect the result.""" repo_id = await _make_repo(db_session) s1 = await _snap(db_session, repo_id, {_FILE: _obj("v1"), _OTHER: _obj("o1")}) c1 = await _commit(db_session, repo_id, s1, offset=0) s2 = await _snap(db_session, repo_id, {_FILE: _obj("v1"), _OTHER: _obj("o2")}) c2 = await _commit(db_session, repo_id, s2, offset=10) # Only _OTHER has history entries; _FILE has none. await _history_entry(db_session, repo_id, c2, _OTHER, offset=10) await db_session.commit() # The index path should NOT be taken for _FILE — must fall back to snapshot scan. async with _fresh_session() as rs: result = await get_last_commit_for_file(rs, repo_id, _FILE, c2) # File exists in both snapshots with same oid → c1 introduced it assert result is not None assert result.commit_id == c1