"""TDD — fetch.mpack.prebuild job handler and wire_fetch_mpack cache (issue #92 Phases 2–5). Test IDs: FMC_07 Unit: mock wire_fetch_mpack, confirm cache rows written for each tip, confirm existing fresh entries are skipped FMC_08 Integration: insert a job row, run the handler, verify MusehubFetchMPackCache row exists with correct repo_id/tip/mpack_id FMC_13 Unit: cache hit returns presigned URL without entering blob-load path FMC_14 Unit: cache miss builds and writes cache row FMC_18 Integration: enqueue_push_intel inserts fetch.mpack.prebuild with branch tips FMC_20 Unit: gc_fetch_mpack_cache deletes expired rows + R2 objects; fresh rows untouched """ from __future__ import annotations import hashlib from datetime import datetime, timedelta, timezone from unittest.mock import AsyncMock, patch import pytest from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from musehub.core.genesis import compute_job_id from musehub.db.musehub_jobs_models import MusehubBackgroundJob from musehub.db.musehub_repo_models import MusehubFetchMPackCache from musehub.services.musehub_gc import gc_fetch_mpack_cache from musehub.services.musehub_jobs import enqueue_push_intel from musehub.services.musehub_wire_fetch import process_fetch_mpack_prebuild_job, wire_fetch_mpack from tests.factories import create_branch, create_repo def _now() -> datetime: return datetime.now(tz=timezone.utc) def _fake_commit_id(seed: str) -> str: return "sha256:" + hashlib.sha256(seed.encode()).hexdigest() def _fake_mpack_id(seed: str) -> str: return "sha256:" + hashlib.sha256(f"mpack-{seed}".encode()).hexdigest() async def _insert_job( session: AsyncSession, repo_id: str, tip_commit_ids: list[str], ) -> str: now = _now() job_id = compute_job_id(repo_id, "fetch.mpack.prebuild", now.isoformat()) session.add(MusehubBackgroundJob( job_id=job_id, repo_id=repo_id, job_type="fetch.mpack.prebuild", payload={"tip_commit_ids": tip_commit_ids}, status="pending", created_at=now, attempt=0, )) await session.flush() return job_id # ── FMC_07 ──────────────────────────────────────────────────────────────────── @pytest.mark.tier2 async def test_fmc_07_builds_each_tip_and_writes_cache(db_session: AsyncSession) -> None: """FMC_07a: handler calls wire_fetch_mpack once per tip and writes cache rows.""" repo = await create_repo(db_session, owner="gabriel", visibility="public") tip_a = _fake_commit_id("tip-a") tip_b = _fake_commit_id("tip-b") mpack_a = _fake_mpack_id("tip-a") mpack_b = _fake_mpack_id("tip-b") job_id = await _insert_job(db_session, repo.repo_id, [tip_a, tip_b]) await db_session.commit() side_effects = [ {"mpack_id": mpack_a, "mpack_url": "https://r2.example/a", "commit_count": 1, "blob_count": 2}, {"mpack_id": mpack_b, "mpack_url": "https://r2.example/b", "commit_count": 1, "blob_count": 3}, ] with patch( "musehub.services.musehub_wire_fetch.wire_fetch_mpack", new_callable=AsyncMock, side_effect=side_effects, ) as mock_build: result = await process_fetch_mpack_prebuild_job(db_session, job_id) await db_session.commit() assert mock_build.call_count == 2 assert result["tips_requested"] == 2 assert result["tips_built"] == 2 assert result["tips_skipped"] == 0 rows = (await db_session.execute( select(MusehubFetchMPackCache) .where(MusehubFetchMPackCache.repo_id == repo.repo_id) .order_by(MusehubFetchMPackCache.tip_commit_id) )).scalars().all() assert len(rows) == 2 mpack_ids = {r.tip_commit_id: r.mpack_id for r in rows} assert mpack_ids[tip_a] == mpack_a assert mpack_ids[tip_b] == mpack_b @pytest.mark.tier2 async def test_fmc_07b_skips_tips_with_fresh_cache(db_session: AsyncSession) -> None: """FMC_07b: tips with a non-expired cache entry are skipped without calling wire_fetch_mpack.""" repo = await create_repo(db_session, owner="gabriel", visibility="public") tip_cached = _fake_commit_id("tip-cached") tip_new = _fake_commit_id("tip-new") existing_mpack = _fake_mpack_id("existing") new_mpack = _fake_mpack_id("new") # Pre-populate a fresh cache entry for tip_cached. cache_id = hashlib.sha256((repo.repo_id + tip_cached).encode()).hexdigest() db_session.add(MusehubFetchMPackCache( cache_id=cache_id, repo_id=repo.repo_id, tip_commit_id=tip_cached, mpack_id=existing_mpack, created_at=_now(), expires_at=_now() + timedelta(days=7), )) job_id = await _insert_job(db_session, repo.repo_id, [tip_cached, tip_new]) await db_session.commit() with patch( "musehub.services.musehub_wire_fetch.wire_fetch_mpack", new_callable=AsyncMock, return_value={"mpack_id": new_mpack, "mpack_url": "https://r2.example/new", "commit_count": 1, "blob_count": 1}, ) as mock_build: result = await process_fetch_mpack_prebuild_job(db_session, job_id) await db_session.commit() # Only the new tip should have triggered a build. assert mock_build.call_count == 1 assert mock_build.call_args[1]["want"] == [tip_new] or mock_build.call_args[0][2] == [tip_new] assert result["tips_built"] == 1 assert result["tips_skipped"] == 1 # The pre-existing cache entry must be unchanged. cached_row = (await db_session.execute( select(MusehubFetchMPackCache) .where(MusehubFetchMPackCache.repo_id == repo.repo_id) .where(MusehubFetchMPackCache.tip_commit_id == tip_cached) )).scalar_one() assert cached_row.mpack_id == existing_mpack @pytest.mark.tier2 async def test_fmc_07c_empty_payload_is_a_noop(db_session: AsyncSession) -> None: """FMC_07c: job with no tip_commit_ids returns zeros without calling wire_fetch_mpack.""" repo = await create_repo(db_session, owner="gabriel", visibility="public") job_id = await _insert_job(db_session, repo.repo_id, []) await db_session.commit() with patch( "musehub.services.musehub_wire_fetch.wire_fetch_mpack", new_callable=AsyncMock, ) as mock_build: result = await process_fetch_mpack_prebuild_job(db_session, job_id) assert mock_build.call_count == 0 assert result["tips_requested"] == 0 assert result["tips_built"] == 0 # ── FMC_08 ──────────────────────────────────────────────────────────────────── @pytest.mark.tier2 async def test_fmc_08_cache_row_has_correct_fields(db_session: AsyncSession) -> None: """FMC_08: after the handler runs, cache row has matching repo_id, tip, and mpack_id.""" repo = await create_repo(db_session, owner="gabriel", visibility="public") tip = _fake_commit_id("integration-tip") mpack = _fake_mpack_id("integration") job_id = await _insert_job(db_session, repo.repo_id, [tip]) await db_session.commit() with patch( "musehub.services.musehub_wire_fetch.wire_fetch_mpack", new_callable=AsyncMock, return_value={"mpack_id": mpack, "mpack_url": "https://r2.example/int", "commit_count": 5, "blob_count": 10}, ): await process_fetch_mpack_prebuild_job(db_session, job_id) await db_session.commit() row = (await db_session.execute( select(MusehubFetchMPackCache) .where(MusehubFetchMPackCache.repo_id == repo.repo_id) .where(MusehubFetchMPackCache.tip_commit_id == tip) )).scalar_one() assert row.repo_id == repo.repo_id assert row.tip_commit_id == tip assert row.mpack_id == mpack assert row.expires_at > _now() # ── FMC_13 ──────────────────────────────────────────────────────────────────── @pytest.mark.tier2 async def test_fmc_13_cache_hit_returns_presigned_url_without_blob_load(db_session: AsyncSession) -> None: """FMC_13: cache hit returns the presigned URL immediately; blob-load path is never entered.""" repo = await create_repo(db_session, owner="gabriel", visibility="public") tip = _fake_commit_id("fmc13-tip") cached_mpack = _fake_mpack_id("fmc13-tip") expected_url = "https://r2.example/cached-fmc13" # Pre-populate a fresh cache entry. cache_id = hashlib.sha256((repo.repo_id + tip).encode()).hexdigest() db_session.add(MusehubFetchMPackCache( cache_id=cache_id, repo_id=repo.repo_id, tip_commit_id=tip, mpack_id=cached_mpack, created_at=_now(), expires_at=_now() + timedelta(days=7), )) await db_session.commit() mock_backend = AsyncMock() mock_backend.presign_mpack_get.return_value = expected_url with patch("musehub.services.musehub_wire_fetch.get_backend", return_value=mock_backend), \ patch("musehub.services.musehub_wire_fetch._walk_commit_delta", new_callable=AsyncMock) as mock_walk: result = await wire_fetch_mpack(db_session, repo.repo_id, want=[tip], have=[]) # The expensive DAG walk must never have been called. mock_walk.assert_not_called() # The returned URL must match the presigned URL for the cached mpack. assert result["mpack_url"] == expected_url assert result["mpack_id"] == cached_mpack # ── FMC_14 ──────────────────────────────────────────────────────────────────── @pytest.mark.tier2 async def test_fmc_14_cache_miss_builds_and_writes_cache_row(db_session: AsyncSession) -> None: """FMC_14: on a cache miss, wire_fetch_mpack builds the mpack and writes a cache row.""" from types import SimpleNamespace repo = await create_repo(db_session, owner="gabriel", visibility="public") tip = _fake_commit_id("fmc14-tip") built_mpack = _fake_mpack_id("fmc14-tip") built_url = "https://r2.example/built-fmc14" # No cache row exists — this is a cold miss. mock_backend = AsyncMock() mock_backend.put_mpack.return_value = None mock_backend.presign_mpack_get.return_value = built_url mock_backend.delete.return_value = None # _walk_commit_delta returns one proxy commit with no snapshot so that # wire_fetch_mpack proceeds to build an empty-but-valid mpack without # needing real commit/snapshot/object rows in the DB. fake_proxy = SimpleNamespace(commit_id=tip, snapshot_id=None, parent_ids=[]) with patch("musehub.services.musehub_wire_fetch.get_backend", return_value=mock_backend), \ patch("musehub.services.musehub_wire_fetch._walk_commit_delta", new_callable=AsyncMock, return_value={tip: fake_proxy}), \ patch("muse.core.mpack.build_wire_mpack", return_value=b"MUSE\x00fake-mpack"), \ patch("musehub.services.musehub_wire_fetch.blob_id", return_value=built_mpack): result = await wire_fetch_mpack(db_session, repo.repo_id, want=[tip], have=[]) await db_session.commit() assert result["mpack_url"] == built_url assert result["mpack_id"] == built_mpack # The cache row must have been written. row = (await db_session.execute( select(MusehubFetchMPackCache) .where(MusehubFetchMPackCache.repo_id == repo.repo_id) .where(MusehubFetchMPackCache.tip_commit_id == tip) )).scalar_one() assert row.mpack_id == built_mpack assert row.expires_at > _now() # ── FMC_18 ──────────────────────────────────────────────────────────────────── @pytest.mark.tier2 async def test_fmc_18_enqueue_push_intel_creates_prebuild_job_with_branch_tips( db_session: AsyncSession, ) -> None: """FMC_18: enqueue_push_intel enqueues fetch.mpack.prebuild with all branch tip commit IDs.""" repo = await create_repo(db_session, owner="gabriel", visibility="public") tip_a = _fake_commit_id("branch-main") tip_b = _fake_commit_id("branch-dev") await create_branch(db_session, repo.repo_id, name="main", head_commit_id=tip_a) await create_branch(db_session, repo.repo_id, name="dev", head_commit_id=tip_b) await enqueue_push_intel( db_session, repo.repo_id, head=tip_a, domain_id=None, branch="main", ) await db_session.commit() job_row = (await db_session.execute( select(MusehubBackgroundJob) .where(MusehubBackgroundJob.repo_id == repo.repo_id) .where(MusehubBackgroundJob.job_type == "fetch.mpack.prebuild") .where(MusehubBackgroundJob.status == "pending") )).scalar_one() tip_ids = set(job_row.payload.get("tip_commit_ids", [])) assert tip_a in tip_ids, f"main tip {tip_a[:20]} missing from payload" assert tip_b in tip_ids, f"dev tip {tip_b[:20]} missing from payload" # ── FMC_20 ──────────────────────────────────────────────────────────────────── @pytest.mark.tier2 async def test_fmc_20_gc_deletes_expired_rows_and_r2_objects_leaves_fresh_untouched( db_session: AsyncSession, ) -> None: """FMC_20: gc_fetch_mpack_cache deletes expired rows + R2 objects; fresh rows survive.""" repo = await create_repo(db_session, owner="gabriel", visibility="public") tip_expired_a = _fake_commit_id("expired-a") tip_expired_b = _fake_commit_id("expired-b") tip_fresh = _fake_commit_id("fresh") mpack_expired_a = _fake_mpack_id("expired-a") mpack_expired_b = _fake_mpack_id("expired-b") mpack_fresh = _fake_mpack_id("fresh") past = _now() - timedelta(days=1) future = _now() + timedelta(days=6) for tip, mpack, exp in [ (tip_expired_a, mpack_expired_a, past), (tip_expired_b, mpack_expired_b, past), (tip_fresh, mpack_fresh, future), ]: cache_id = hashlib.sha256((repo.repo_id + tip).encode()).hexdigest() db_session.add(MusehubFetchMPackCache( cache_id=cache_id, repo_id=repo.repo_id, tip_commit_id=tip, mpack_id=mpack, created_at=_now(), expires_at=exp, )) await db_session.commit() mock_backend = AsyncMock() mock_backend.delete.return_value = None with patch("musehub.services.musehub_gc.get_backend", return_value=mock_backend): n_deleted = await gc_fetch_mpack_cache(db_session, repo.repo_id) await db_session.commit() assert n_deleted == 2 # R2 delete called exactly once for each expired mpack — in any order. deleted_mpack_ids = {call.args[0] for call in mock_backend.delete.call_args_list} assert mpack_expired_a in deleted_mpack_ids assert mpack_expired_b in deleted_mpack_ids assert mpack_fresh not in deleted_mpack_ids # Expired rows gone from DB. remaining = (await db_session.execute( select(MusehubFetchMPackCache) .where(MusehubFetchMPackCache.repo_id == repo.repo_id) )).scalars().all() remaining_tips = {r.tip_commit_id for r in remaining} assert tip_expired_a not in remaining_tips assert tip_expired_b not in remaining_tips # Fresh row survives. assert tip_fresh in remaining_tips