"""TDD tests for the profile snapshot pre-computation pipeline. Covers: - test_snapshot_table_is_queryable — MusehubProfileSnapshot ORM model works - test_compute_and_persist_snapshot — _compute_and_persist_profile_snapshot writes a row - test_snapshot_is_read_by_profile_route — GET /handle serves data from snapshot (no live queries) - test_stale_snapshot_triggers_fallback — is_stale=True causes live fallback - test_missing_snapshot_triggers_fallback — missing row causes live fallback - test_enqueue_profile_snapshot — enqueue_profile_snapshot inserts a pending job - test_push_enqueues_profile_snapshot — musehub_wire enqueues profile.snapshot on push - test_profile_snapshot_provider_returns_empty — ProfileSnapshotProvider.compute() returns [] """ from __future__ import annotations import json from datetime import datetime, timezone import pytest from httpx import AsyncClient from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import now_utc_iso from musehub.db.musehub_identity_models import MusehubIdentity, MusehubProfileSnapshot from musehub.db.musehub_jobs_models import MusehubBackgroundJob from musehub.db.musehub_repo_models import MusehubRepo from musehub.types.json_types import JSONObject # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- async def _seed_identity( db: AsyncSession, *, handle: str = "snapuser", user_id: str = "snap-user-001", ) -> MusehubIdentity: identity = MusehubIdentity( identity_id=user_id, handle=handle, identity_type="human", bio="Snapshot test bio", avatar_url=None, ) db.add(identity) await db.commit() await db.refresh(identity) return identity async def _seed_repo( db: AsyncSession, *, owner: str = "snapuser", owner_user_id: str = "snap-user-001", slug: str = "snap-repo", ) -> MusehubRepo: from musehub.core.genesis import compute_repo_id repo_id = compute_repo_id(owner_user_id, slug, "code", now_utc_iso()) repo = MusehubRepo( repo_id=repo_id, name=slug, owner=owner, slug=slug, visibility="public", owner_user_id=owner_user_id, ) db.add(repo) await db.commit() await db.refresh(repo) return repo async def _seed_snapshot( db: AsyncSession, *, handle: str = "snapuser", stats: JSONObject | None = None, is_stale: bool = False, ) -> MusehubProfileSnapshot: data = { "stats": stats or {"repo_count": 3, "commit_count": 42, "agent_count": 2, "avg_health": None}, "repos": [], "heatmap": {"days": [], "total": 0, "longest_streak": 0, "current_streak": 0}, "agent_fleet": [], "badges": [], "footprint": [], "activity_canvas": [], } snap = MusehubProfileSnapshot( handle=handle, data_json=json.dumps(data), computed_at=datetime.now(tz=timezone.utc), is_stale=is_stale, ) db.add(snap) await db.commit() await db.refresh(snap) return snap # --------------------------------------------------------------------------- # Phase 1 — ORM model # --------------------------------------------------------------------------- async def test_snapshot_table_is_queryable(db_session: AsyncSession) -> None: """MusehubProfileSnapshot ORM model persists and queries correctly.""" snap = MusehubProfileSnapshot( handle="tabletest", data_json='{"stats": {"repo_count": 1}}', computed_at=datetime.now(tz=timezone.utc), is_stale=False, ) db_session.add(snap) await db_session.commit() result = await db_session.execute( select(MusehubProfileSnapshot).where(MusehubProfileSnapshot.handle == "tabletest") ) row = result.scalar_one_or_none() assert row is not None assert row.handle == "tabletest" assert row.is_stale is False data = json.loads(row.data_json) assert data["stats"]["repo_count"] == 1 # --------------------------------------------------------------------------- # Phase 2 — ProfileSnapshotProvider # --------------------------------------------------------------------------- async def test_profile_snapshot_provider_returns_empty(db_session: AsyncSession) -> None: """ProfileSnapshotProvider.compute() always returns [] (writes directly to table).""" from musehub.services.musehub_intel_providers import ProfileSnapshotProvider from musehub.core.genesis import compute_repo_id await _seed_identity(db_session) repo = await _seed_repo(db_session) provider = ProfileSnapshotProvider() result = await provider.compute( db_session, repo.repo_id, "", {"handle": "snapuser"}, ) assert result == [] async def test_compute_and_persist_snapshot(db_session: AsyncSession) -> None: """_compute_and_persist_profile_snapshot writes a row to musehub_profile_snapshots.""" from musehub.services.musehub_intel_providers import _compute_and_persist_profile_snapshot await _seed_identity(db_session) await _seed_repo(db_session) await _compute_and_persist_profile_snapshot(db_session, "snapuser") await db_session.commit() result = await db_session.execute( select(MusehubProfileSnapshot).where(MusehubProfileSnapshot.handle == "snapuser") ) row = result.scalar_one_or_none() assert row is not None assert row.is_stale is False data = json.loads(row.data_json) assert "stats" in data assert "repos" in data assert "heatmap" in data assert "badges" in data assert "activity_canvas" in data async def test_compute_and_persist_snapshot_upserts(db_session: AsyncSession) -> None: """Re-running _compute_and_persist_profile_snapshot overwrites the existing row.""" from musehub.services.musehub_intel_providers import _compute_and_persist_profile_snapshot await _seed_identity(db_session) await _seed_repo(db_session) # First write await _compute_and_persist_profile_snapshot(db_session, "snapuser") await db_session.commit() # Second write — should not raise and should overwrite await _compute_and_persist_profile_snapshot(db_session, "snapuser") await db_session.commit() result = await db_session.execute( select(MusehubProfileSnapshot).where(MusehubProfileSnapshot.handle == "snapuser") ) rows = result.scalars().all() assert len(rows) == 1 # upsert, not insert async def test_compute_and_persist_snapshot_missing_identity(db_session: AsyncSession) -> None: """_compute_and_persist_profile_snapshot is a no-op for unknown handles.""" from musehub.services.musehub_intel_providers import _compute_and_persist_profile_snapshot # No identity seeded — should not raise await _compute_and_persist_profile_snapshot(db_session, "nobody-exists-xyz") await db_session.commit() result = await db_session.execute( select(MusehubProfileSnapshot).where(MusehubProfileSnapshot.handle == "nobody-exists-xyz") ) assert result.scalar_one_or_none() is None # --------------------------------------------------------------------------- # Phase 3 — enqueue_profile_snapshot # --------------------------------------------------------------------------- async def test_enqueue_profile_snapshot(db_session: AsyncSession) -> None: """enqueue_profile_snapshot inserts a pending profile.snapshot job.""" from musehub.services.musehub_jobs import enqueue_profile_snapshot await _seed_identity(db_session) repo = await _seed_repo(db_session) job_id = await enqueue_profile_snapshot(db_session, repo.repo_id, "snapuser") await db_session.commit() assert job_id is not None result = await db_session.execute( select(MusehubBackgroundJob).where(MusehubBackgroundJob.job_id == job_id) ) job = result.scalar_one_or_none() assert job is not None assert job.job_type == "profile.snapshot" assert job.status == "pending" payload = job.payload or {} assert payload.get("handle") == "snapuser" async def test_enqueue_profile_snapshot_is_idempotent(db_session: AsyncSession) -> None: """enqueue_profile_snapshot returns None if a pending job already exists.""" from musehub.services.musehub_jobs import enqueue_profile_snapshot await _seed_identity(db_session) repo = await _seed_repo(db_session) first = await enqueue_profile_snapshot(db_session, repo.repo_id, "snapuser") await db_session.commit() second = await enqueue_profile_snapshot(db_session, repo.repo_id, "snapuser") await db_session.commit() assert first is not None assert second is None # idempotent — no duplicate # --------------------------------------------------------------------------- # Phase 4 — SSR snapshot fast-path # --------------------------------------------------------------------------- async def test_snapshot_is_read_by_profile_route( client: AsyncClient, db_session: AsyncSession, ) -> None: """GET /handle serves stats from the pre-computed snapshot when present.""" await _seed_identity(db_session, handle="snapuser2", user_id="snap-user-002") await _seed_snapshot( db_session, handle="snapuser2", stats={"repo_count": 99, "commit_count": 777, "agent_count": 5, "avg_health": None}, ) resp = await client.get("/snapuser2?format=json") assert resp.status_code == 200 body = resp.json() # The JSON route serialises from the stats dict assert body["repoCount"] == 99 assert body["commitCount"] == 777 async def test_stale_snapshot_triggers_fallback( client: AsyncClient, db_session: AsyncSession, ) -> None: """is_stale=True snapshot is ignored; live computation runs instead.""" await _seed_identity(db_session, handle="staleuser", user_id="stale-001") await _seed_snapshot( db_session, handle="staleuser", stats={"repo_count": 999, "commit_count": 9999, "agent_count": 0, "avg_health": None}, is_stale=True, ) resp = await client.get("/staleuser?format=json") assert resp.status_code == 200 body = resp.json() # Live computation returns 0 repos (no repos seeded), not the stale 999 assert body["repoCount"] == 0 async def test_missing_snapshot_falls_back_to_live( client: AsyncClient, db_session: AsyncSession, ) -> None: """When no snapshot exists the route computes live and returns valid data.""" await _seed_identity(db_session, handle="nosnapuser", user_id="nosnap-001") resp = await client.get("/nosnapuser?format=json") assert resp.status_code == 200 body = resp.json() assert body["handle"] == "nosnapuser" assert body["repoCount"] == 0 async def test_snapshot_html_route_serves_200( client: AsyncClient, db_session: AsyncSession, ) -> None: """Profile HTML route works with a pre-computed snapshot.""" await _seed_identity(db_session, handle="htmlsnap", user_id="htmlsnap-001") await _seed_snapshot(db_session, handle="htmlsnap") resp = await client.get("/htmlsnap") assert resp.status_code == 200 assert "text/html" in resp.headers["content-type"]