"""Tests for checklist section 5.1 — Database integrity.

Covers:
- updated_at present on critical tables
- FK constraints enforced (PostgreSQL)
- Orphan object scan (scan and delete)
- ingest_push parent-commit validation
"""
from __future__ import annotations

import pytest
from muse.core.types import fake_id
from sqlalchemy.ext.asyncio import AsyncSession

from tests.factories import create_repo

# NOTE(review): the async tests below carry no explicit asyncio marker —
# presumably the project runs pytest-asyncio in auto mode; confirm against
# the pytest configuration.
#
# Project modules under test are imported inside each test function, matching
# the file's existing convention.


# ── updated_at on critical tables ─────────────────────────────────────────────


def test_musehub_repo_has_updated_at() -> None:
    """The MusehubRepo table must define an ``updated_at`` column."""
    from musehub.db.musehub_repo_models import MusehubRepo

    cols = {c.name for c in MusehubRepo.__table__.columns}
    assert "updated_at" in cols, "MusehubRepo is missing updated_at column"


def test_musehub_proposal_has_updated_at() -> None:
    """The MusehubProposal table must define an ``updated_at`` column."""
    from musehub.db.musehub_social_models import MusehubProposal

    cols = {c.name for c in MusehubProposal.__table__.columns}
    assert "updated_at" in cols, "MusehubProposal is missing updated_at column"


def test_musehub_webhook_has_updated_at() -> None:
    """The MusehubWebhook table must define an ``updated_at`` column."""
    from musehub.db.musehub_webhook_models import MusehubWebhook

    cols = {c.name for c in MusehubWebhook.__table__.columns}
    assert "updated_at" in cols, "MusehubWebhook is missing updated_at column"


def test_musehub_release_has_updated_at() -> None:
    """The MusehubRelease table must define an ``updated_at`` column."""
    from musehub.db.musehub_release_models import MusehubRelease

    cols = {c.name for c in MusehubRelease.__table__.columns}
    assert "updated_at" in cols, "MusehubRelease is missing updated_at column"


def test_existing_critical_tables_have_updated_at() -> None:
    """Spot-check that pre-existing updated_at columns are still present."""
    from musehub.db.musehub_social_models import MusehubIssue, MusehubIssueComment

    for model in (MusehubIssue, MusehubIssueComment):
        cols = {c.name for c in model.__table__.columns}
        assert "updated_at" in cols, f"{model.__name__} is missing updated_at"


async def test_repo_updated_at_is_populated_on_create(
    db_session: AsyncSession,
) -> None:
    """A freshly created repo must have a non-null updated_at."""
    repo = await create_repo(db_session, slug="updated-at-test", owner="testuser")
    assert repo.updated_at is not None, "updated_at must be set on repo creation"


# ── Orphan object scan ────────────────────────────────────────────────────────


async def test_orphan_scan_returns_empty_when_no_orphans(
    db_session: AsyncSession,
) -> None:
    """scan_orphan_objects must return empty when all objects have at least one ref."""
    from musehub.maintenance.orphan_scan import scan_orphan_objects
    from musehub.db.musehub_repo_models import MusehubObject, MusehubObjectRef

    repo = await create_repo(db_session, slug="orphan-scan-clean", owner="testuser")
    oid = fake_id("clean-object")

    obj = MusehubObject(
        object_id=oid,
        path="test.bin",
        size_bytes=4,
        storage_uri=f"s3://muse-objects/objects/{oid}",
    )
    db_session.add(obj)
    # The ref row ties the object to the repo, so the scan must not report it.
    db_session.add(MusehubObjectRef(repo_id=repo.repo_id, object_id=oid))
    await db_session.commit()

    result = await scan_orphan_objects(db_session)
    assert result.ok
    assert result.count == 0


async def test_orphan_scan_detects_objects_with_no_refs(
    db_session: AsyncSession,
) -> None:
    """scan_orphan_objects must find objects that have no row in musehub_object_refs."""
    from musehub.maintenance.orphan_scan import scan_orphan_objects
    from musehub.db.musehub_repo_models import MusehubObject

    orphan_obj_id = fake_id("orphan-object")

    # Insert an object with no corresponding ref row.
    obj = MusehubObject(
        object_id=orphan_obj_id,
        path="orphan.bin",
        size_bytes=4,
        storage_uri=f"s3://muse-objects/objects/{orphan_obj_id}",
    )
    db_session.add(obj)
    await db_session.commit()

    result = await scan_orphan_objects(db_session)
    assert not result.ok, "Orphan scan should detect the unreferenced object"
    assert orphan_obj_id in result.orphaned_object_ids


# ── ingest_push parent validation (Phase 8 / invariant 8 parity) ──────────────


async def test_ingest_push_rejects_missing_external_parent(
    db_session: AsyncSession,
) -> None:
    """ingest_push() must raise ValueError when a parent commit is not in DB.

    A client pushing a commit that references a fabricated or missing parent_id
    must be rejected. Without this guard, history becomes unreadable because
    muse log walks off the end when it tries to fetch the non-existent parent.
    """
    from musehub.models.musehub import CommitInput
    from musehub.services.musehub_sync import ingest_push
    from muse.core.types import blob_id

    repo = await create_repo(db_session, slug="parent-val-test", owner="testuser")
    repo_id = str(repo.repo_id)

    # The parent id is never pushed or stored, so it cannot be resolved.
    ghost_parent_id = blob_id(b"nonexistent parent")
    commit_id = blob_id(b"orphan commit")

    commits = [
        CommitInput(
            commit_id=commit_id,
            branch="main",
            parent_ids=[ghost_parent_id],
            message="commit with bogus parent",
            author="tester",
            timestamp="2026-01-01T00:00:00Z",
            snapshot_id=None,
        )
    ]

    with pytest.raises(ValueError, match="missing_parent_commits"):
        await ingest_push(
            db_session,
            repo_id=repo_id,
            branch="main",
            head_commit_id=commit_id,
            commits=commits,
            snapshots=[],
            objects=[],
            force=False,
            author="tester",
        )


async def test_ingest_push_accepts_parent_in_same_mpack(
    db_session: AsyncSession,
) -> None:
    """ingest_push() must accept a commit whose parent is in the same push mpack."""
    from musehub.models.musehub import CommitInput
    from musehub.services.musehub_sync import ingest_push
    from muse.core.types import blob_id

    repo = await create_repo(db_session, slug="mpack-parent-test", owner="testuser")
    repo_id = str(repo.repo_id)

    first_id = blob_id(b"first commit")
    second_id = blob_id(b"second commit")

    # Both commits travel in one push; the second names the first as parent.
    commits = [
        CommitInput(
            commit_id=first_id,
            branch="main",
            parent_ids=[],
            message="genesis",
            author="tester",
            timestamp="2026-01-01T00:00:00Z",
            snapshot_id=None,
        ),
        CommitInput(
            commit_id=second_id,
            branch="main",
            parent_ids=[first_id],
            message="second",
            author="tester",
            timestamp="2026-01-01T00:01:00Z",
            snapshot_id=None,
        ),
    ]

    result = await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=second_id,
        commits=commits,
        snapshots=[],
        objects=[],
        force=False,
        author="tester",
    )
    assert result.ok
    assert result.remote_head == second_id


async def test_ingest_push_accepts_parent_already_in_db(
    db_session: AsyncSession,
) -> None:
    """ingest_push() must accept a commit whose parent is already stored in the DB."""
    from musehub.models.musehub import CommitInput
    from musehub.services.musehub_sync import ingest_push
    from muse.core.types import blob_id

    repo = await create_repo(db_session, slug="db-parent-test", owner="testuser")
    repo_id = str(repo.repo_id)

    first_id = blob_id(b"db-stored first commit")

    # Push the first commit to establish it in DB.
    await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=first_id,
        commits=[
            CommitInput(
                commit_id=first_id,
                branch="main",
                parent_ids=[],
                message="genesis",
                author="tester",
                timestamp="2026-01-01T00:00:00Z",
                snapshot_id=None,
            )
        ],
        snapshots=[],
        objects=[],
        force=False,
        author="tester",
    )

    # Now push a second commit that references the DB-stored first commit.
    second_id = blob_id(b"db-stored second commit")
    result = await ingest_push(
        db_session,
        repo_id=repo_id,
        branch="main",
        head_commit_id=second_id,
        commits=[
            CommitInput(
                commit_id=second_id,
                branch="main",
                parent_ids=[first_id],
                message="incremental push",
                author="tester",
                timestamp="2026-01-01T00:01:00Z",
                snapshot_id=None,
            )
        ],
        snapshots=[],
        objects=[],
        force=False,
        author="tester",
    )
    assert result.ok
    assert result.remote_head == second_id


async def test_ingest_push_genesis_commit_no_parent_accepted(
    db_session: AsyncSession,
) -> None:
    """ingest_push() must accept a genesis commit with an empty parent_ids list."""
    from musehub.models.musehub import CommitInput
    from musehub.services.musehub_sync import ingest_push
    from muse.core.types import blob_id

    repo = await create_repo(db_session, slug="genesis-test", owner="testuser")
    genesis_id = blob_id(b"genesis commit fresh")

    result = await ingest_push(
        db_session,
        repo_id=str(repo.repo_id),
        branch="main",
        head_commit_id=genesis_id,
        commits=[
            CommitInput(
                commit_id=genesis_id,
                branch="main",
                parent_ids=[],
                message="initial commit",
                author="tester",
                timestamp="2026-01-01T00:00:00Z",
                snapshot_id=None,
            )
        ],
        snapshots=[],
        objects=[],
        force=False,
        author="tester",
    )
    assert result.ok
    assert result.remote_head == genesis_id