"""Tier 5 — State integrity tests for musehub_intel_clones (issue #17). Validates that the DB table invariants hold under normal and pathological conditions: JSON parseability, tier enum, count/content agreement, upsert idempotency, stale-row update, and CASCADE delete behavior. Cases: SI01 All DB rows have parseable members_json SI02 All tier values are exactly "exact" or "near" SI03 member_count matches len(json.loads(members_json)) for each row SI04 ClonesProvider upsert is idempotent — running twice = same row count SI05 ClonesProvider upsert updates existing row on re-run with new count SI06 CASCADE delete — deleting repo removes all clones rows """ from __future__ import annotations import json from unittest.mock import AsyncMock, patch import pytest import pytest_asyncio import sqlalchemy as sa from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from musehub.db.musehub_intel_models import MusehubIntelClones from musehub.db.musehub_repo_models import MusehubRepo from muse.core.types import long_id from tests.factories import create_repo _REF = long_id("a" * 64) def _make_members(n: int, single_file: bool = False) -> str: return json.dumps([ { "address": f"src/{'a' if single_file else chr(97+i%4)}.py::fn_{i}", "kind": "function", "language": "Python", "body_hash": long_id("a" * 64), "signature_id": long_id("b" * 64), "content_id": long_id("a" * 64), } for i in range(n) ]) async def _insert( session: AsyncSession, repo_id: str, cluster_hash: str, tier: str = "exact", member_count: int = 2, members_json: str | None = None, ) -> None: mj = members_json if members_json is not None else _make_members(member_count) await session.execute( pg_insert(MusehubIntelClones) .values( repo_id=repo_id, cluster_hash=cluster_hash, tier=tier, member_count=member_count, members_json=mj, ref=_REF, ) .on_conflict_do_update( index_elements=["repo_id", "cluster_hash"], set_={"tier": tier, "member_count": member_count, "members_json": mj}, ) ) await session.commit() @pytest_asyncio.fixture async def repo(db_session: AsyncSession) -> MusehubRepo: return await create_repo(db_session, owner="siuser", slug="state-integrity") class TestClonesStateIntegrity: @pytest.mark.asyncio async def test_SI01_all_rows_parseable_members_json( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Every members_json stored in the DB must deserialise without error.""" for i in range(5): await _insert( db_session, str(repo.repo_id), cluster_hash=f"sha256:si01{str(i).zfill(60)}", member_count=i + 2, members_json=_make_members(i + 2), ) result = await db_session.execute( sa.select(MusehubIntelClones).where( MusehubIntelClones.repo_id == str(repo.repo_id) ) ) rows = result.scalars().all() for row in rows: try: parsed = json.loads(row.members_json) assert isinstance(parsed, list) except json.JSONDecodeError as exc: pytest.fail(f"Unparseable members_json for {row.cluster_hash}: {exc}") @pytest.mark.asyncio async def test_SI02_tier_values_in_valid_set( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """All tier values must be exactly 'exact' or 'near'.""" for tier in ("exact", "near", "exact"): await _insert( db_session, str(repo.repo_id), cluster_hash=f"sha256:si02{tier[:1]}{str(id(tier)).zfill(59)}", tier=tier, ) result = await db_session.execute( sa.select(MusehubIntelClones.tier).where( MusehubIntelClones.repo_id == str(repo.repo_id) ) ) for (tier,) in result.all(): assert tier in ("exact", "near"), f"Unexpected tier value: {tier!r}" @pytest.mark.asyncio async def test_SI03_member_count_matches_json_length( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """member_count must equal the number of entries in members_json.""" for n in (2, 5, 10): await _insert( db_session, str(repo.repo_id), cluster_hash=f"sha256:si03n{n}{str(n).zfill(58)}", member_count=n, members_json=_make_members(n), ) result = await db_session.execute( sa.select(MusehubIntelClones).where( MusehubIntelClones.repo_id == str(repo.repo_id) ) ) for row in result.scalars().all(): actual = len(json.loads(row.members_json)) assert actual == row.member_count, ( f"{row.cluster_hash}: member_count={row.member_count} " f"but members_json has {actual} entries" ) @pytest.mark.asyncio async def test_SI04_upsert_is_idempotent( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Inserting the same cluster twice leaves exactly one row.""" h = long_id("4" * 64) mj = _make_members(3) for _ in range(2): await _insert( db_session, str(repo.repo_id), cluster_hash=h, member_count=3, members_json=mj, ) count_result = await db_session.execute( sa.select(sa.func.count()) .select_from(MusehubIntelClones) .where( MusehubIntelClones.repo_id == str(repo.repo_id), MusehubIntelClones.cluster_hash == h, ) ) assert count_result.scalar_one() == 1 @pytest.mark.asyncio async def test_SI05_upsert_updates_existing_row( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Re-running with a new member_count updates the existing row.""" h = long_id("5" * 64) await _insert(db_session, str(repo.repo_id), cluster_hash=h, member_count=2) new_mj = _make_members(7) await _insert( db_session, str(repo.repo_id), cluster_hash=h, member_count=7, members_json=new_mj, ) result = await db_session.execute( sa.select(MusehubIntelClones).where( MusehubIntelClones.repo_id == str(repo.repo_id), MusehubIntelClones.cluster_hash == h, ) ) row = result.scalar_one() assert row.member_count == 7 assert len(json.loads(row.members_json)) == 7 @pytest.mark.asyncio async def test_SI06_cascade_delete_removes_clones( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Deleting the repo cascades and removes all associated clone rows.""" for i in range(3): await _insert( db_session, str(repo.repo_id), cluster_hash=f"sha256:si06{str(i).zfill(60)}", ) # Verify rows exist before = await db_session.execute( sa.select(sa.func.count()) .select_from(MusehubIntelClones) .where(MusehubIntelClones.repo_id == str(repo.repo_id)) ) assert before.scalar_one() == 3 # Delete the repo await db_session.execute( sa.delete(MusehubRepo).where( MusehubRepo.repo_id == str(repo.repo_id) ) ) await db_session.commit() # Clones rows must be gone after = await db_session.execute( sa.select(sa.func.count()) .select_from(MusehubIntelClones) .where(MusehubIntelClones.repo_id == str(repo.repo_id)) ) assert after.scalar_one() == 0