"""Section 36 — Database Migrations / Alembic (7-layer test suite).

Covers:
  alembic/versions/   The migration chain listed in ``_ALL_REVISIONS``
                      (starting at 0001_consolidated_schema.py and ending
                      at ``_HEAD``), plus alembic.ini, alembic/env.py and
                      deploy/migrate-test.sh.

Key design decisions:
  - Tests require a real PostgreSQL instance (migrations use now(), JSON
    casts, etc.).  Unless DATABASE_URL overrides host and credentials, the
    DB URL is:
      postgresql+asyncpg://musehub:musehub@localhost:5434/musehub_migration_test_s36
  - The test database is dropped and recreated by ``_fresh_db()``.  The main
    ``musehub`` database is only used as the admin connection that issues
    the DROP/CREATE DATABASE statements.
  - The ``migrated_engine`` fixture is module-scoped: the fresh DB is created
    once per module; individual test groups share it but leave the DB at a
    known revision (HEAD) after each test.
  - Tests that need the DB at a specific intermediate revision use the
    ``_upgrade(db_url, rev)`` / ``_downgrade(db_url, rev)`` helpers.
  - **Never run the full test suite in CI** — these tests connect to the
    real Postgres container.  Run with:
      python -m pytest tests/test_migrations_section36.py -x -q

Connection note:
  The postgres container is exposed at localhost:5434 (mapped from 5432).
  Credentials: musehub / musehub.
"""
from __future__ import annotations

import asyncio
import os
import pathlib
import secrets
import time
from collections.abc import AsyncGenerator, Generator
from urllib.parse import urlparse as _urlparse
import datetime

import pytest
import pytest_asyncio

pytestmark = pytest.mark.migrations

from musehub.core.genesis import compute_identity_id, compute_repo_id
from sqlalchemy import inspect, text
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine, create_async_engine
from sqlalchemy.pool import NullPool

# Project root: works whether running locally or inside Docker (/app)
_PROJECT_ROOT = pathlib.Path(__file__).parent.parent

# ── constants ─────────────────────────────────────────────────────────────────
# Derive DB host from DATABASE_URL env var (set inside Docker) or fall back to
# the host-mapped port used for local development.
_raw_db_url = os.environ.get("DATABASE_URL", "")
if _raw_db_url:
    _p = _urlparse(_raw_db_url)
    # urlparse() reports port=None when the URL omits it; fall back to the
    # PostgreSQL default rather than producing an unusable "host:None".
    _PG_BASE_URL = (
        f"postgresql+asyncpg://{_p.username}:{_p.password}"
        f"@{_p.hostname}:{_p.port or 5432}"
    )
else:
    _PG_BASE_URL = "postgresql+asyncpg://musehub:musehub@localhost:5434"

_TEST_DB = "musehub_migration_test_s36"
_TEST_URL = f"{_PG_BASE_URL}/{_TEST_DB}"
_ADMIN_URL = f"{_PG_BASE_URL}/musehub"  # connect to main DB to create/drop the test DB

# Separate DB for performance tests so _fresh_db() there doesn't kill migrated_engine
_PERF_TEST_DB = "musehub_migration_perf_s36"
_PERF_TEST_URL = f"{_PG_BASE_URL}/{_PERF_TEST_DB}"

# Bump _HEAD when a migration is added; the full revision list ("0001" … _HEAD)
# is derived from it so the two can never drift apart.
_HEAD = "0071"
_ALL_REVISIONS = [f"{n:04d}" for n in range(1, int(_HEAD) + 1)]

# Tables that MUST exist after a full upgrade to HEAD
_REQUIRED_TABLES = {
    "alembic_version",
    "muse_commits",
    "muse_snapshots",
    "musehub_auth_challenges",
    "musehub_auth_keys",
    "musehub_background_jobs",
    "musehub_branches",
    "musehub_collaborators",
    "musehub_commits",
    "musehub_coord_records",
    "musehub_coord_reservations",
    "musehub_coord_tasks",
    "musehub_domain_installs",
    "musehub_domains",
    "musehub_identities",
    "musehub_issue_events",
    "musehub_issues",
    "musehub_labels",
    "musehub_object_refs",
    "musehub_objects",
    "musehub_proposals",
    "musehub_releases",
    "musehub_repos",
    "musehub_sessions",
    "musehub_snapshot_entries",
    "musehub_snapshots",
    "musehub_wire_tags",
    "musehub_bridge_mirrors",
    "musehub_symbol_history_entries",
    "musehub_symbol_intel",
    "musehub_hash_occurrence_entries",
    "musehub_fetch_mpack_cache",
}

# Tables that MUST be absent after a full upgrade
_LEGACY_TABLES = {
    "muse_users",
    "muse_access_tokens",
    "musehub_profiles",
    "musehub_issue_milestones",
    "musehub_milestones",
}


# ── helpers ───────────────────────────────────────────────────────────────────


def _run_alembic(db_url: str, *args: str) -> None:
    """Run an alembic command in a subprocess to avoid the settings lru_cache.

    The in-process lru_cache on musehub.config.settings is populated from
    conftest.py before DATABASE_URL is set; running in a subprocess guarantees
    a clean settings instance that picks up our DATABASE_URL.

    Args:
        db_url: Value exported to the child process as DATABASE_URL.
        *args: Arguments passed verbatim to ``python -m alembic``.

    Raises:
        RuntimeError: If alembic exits non-zero; carries the tail of stderr.
        subprocess.TimeoutExpired: If the command runs longer than 120 seconds.
    """
    import subprocess
    import sys

    # Deliberately minimal environment: nothing is inherited from the parent
    # process except what is spelled out here.
    env = {
        "DATABASE_URL": db_url,
        "MUSE_ENV": "test",
        "PATH": "/usr/local/bin:/usr/bin:/bin",
    }
    result = subprocess.run(
        [sys.executable, "-m", "alembic", *args],
        cwd=str(_PROJECT_ROOT),
        env=env,
        capture_output=True,
        text=True,
        timeout=120,
    )
    if result.returncode != 0:
        raise RuntimeError(
            f"alembic {' '.join(args)} failed:\n{result.stderr[-2000:]}"
        )


def _upgrade(db_url: str, revision: str = "head") -> None:
    """Run ``alembic upgrade <revision>`` against *db_url*."""
    _run_alembic(db_url, "upgrade", revision)


def _downgrade(db_url: str, revision: str) -> None:
    """Run ``alembic downgrade <revision>`` against *db_url*."""
    _run_alembic(db_url, "downgrade", revision)


def _current_rev(db_url: str) -> str | None:
    """Return the revision recorded in the database, or None when unversioned.

    Uses a short-lived synchronous engine (the ``+asyncpg`` driver suffix is
    stripped from *db_url*) that is always disposed, even if connecting fails.
    """
    from alembic.runtime.migration import MigrationContext
    from sqlalchemy import create_engine

    engine = create_engine(db_url.replace("+asyncpg", ""))
    try:
        with engine.connect() as conn:
            return MigrationContext.configure(conn).get_current_revision()
    finally:
        engine.dispose()


async def _tables(engine: AsyncEngine) -> set[str]:
    """Return the names of all tables in the ``public`` schema."""
    async with engine.connect() as conn:
        result = await conn.execute(
            text(
                "SELECT tablename FROM pg_tables "
                "WHERE schemaname='public' ORDER BY tablename"
            )
        )
        return {row[0] for row in result}


async def _indexes_for(engine: AsyncEngine, table: str) -> set[str]:
    """Return the index names defined on ``public.<table>``."""
    async with engine.connect() as conn:
        result = await conn.execute(
            text(
                "SELECT indexname FROM pg_indexes "
                "WHERE schemaname='public' AND tablename = :t"
            ),
            {"t": table},
        )
        return {row[0] for row in result}


async def _columns(engine: AsyncEngine, table: str) -> set[str]:
    """Return the column names of ``public.<table>`` (empty set if it does not exist)."""
    async with engine.connect() as conn:
        result = await conn.execute(
            text(
                "SELECT column_name FROM information_schema.columns "
                "WHERE table_schema='public' AND table_name = :t"
            ),
            {"t": table},
        )
        return {row[0] for row in result}


def _recreate_db(db_name: str) -> None:
    """Terminate sessions on *db_name*, then drop and recreate it.

    *db_name* must be one of this module's constants: DROP/CREATE DATABASE
    cannot take bound parameters, so the identifier is interpolated.
    """
    from sqlalchemy import create_engine, text as stext

    # AUTOCOMMIT is required: DROP/CREATE DATABASE cannot run in a transaction.
    engine = create_engine(
        _ADMIN_URL.replace("+asyncpg", ""), isolation_level="AUTOCOMMIT"
    )
    try:
        with engine.connect() as conn:
            conn.execute(
                stext(
                    "SELECT pg_terminate_backend(pid) FROM pg_stat_activity "
                    "WHERE datname = :name"
                ),
                {"name": db_name},
            )
            conn.execute(stext(f"DROP DATABASE IF EXISTS {db_name}"))
            conn.execute(stext(f"CREATE DATABASE {db_name}"))
    finally:
        engine.dispose()


def _fresh_db() -> None:
    """Drop and recreate the section-36 test database synchronously."""
    _recreate_db(_TEST_DB)


def _perf_fresh_db() -> None:
    """Drop and recreate the performance test database (separate from _TEST_DB)."""
    # NOTE(review): no caller of this helper is visible in this module.
    _recreate_db(_PERF_TEST_DB)


def _load_migration_modules() -> list[tuple[pathlib.Path, types.ModuleType]]:
    """Import every migration file under alembic/versions and return (path, module) pairs.

    Dunder files (``__init__.py`` and friends) are skipped so that every unit
    test looks at the same set of files.
    """
    import importlib.util

    versions_dir = _PROJECT_ROOT / "alembic" / "versions"
    loaded = []
    for path in sorted(versions_dir.glob("*.py")):
        if path.stem.startswith("__"):
            continue
        spec = importlib.util.spec_from_file_location(path.stem, path)
        assert spec is not None and spec.loader is not None, (
            f"Cannot build an import spec for {path.name}"
        )
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        loaded.append((path, module))
    return loaded


# ── module-scoped engine fixture ──────────────────────────────────────────────


@pytest.fixture(scope="module")
def migrated_engine() -> Generator[AsyncEngine, None, None]:
    """Create a fresh DB, run all migrations to HEAD, yield an async engine.

    Module-scoped: created once for all tests in this file. The DB is left at
    HEAD after each test (tests that downgrade must re-upgrade).
    """
    _fresh_db()
    _upgrade(_TEST_URL)
    # NullPool: no connection is kept open between uses of the engine.
    engine = create_async_engine(_TEST_URL, poolclass=NullPool)
    yield engine
    asyncio.run(engine.dispose())


# ══════════════════════════════════════════════════════════════════════════════
# 1. Unit
# ══════════════════════════════════════════════════════════════════════════════


class TestMigrationUnit:
    """Static analysis of migration files — no DB connection needed."""

    def test_revision_chain_is_linear(self) -> None:
        """Every migration except the initial one has a down_revision pointing to its predecessor."""
        revisions: dict[str, str | None] = {}
        for _path, mod in _load_migration_modules():
            rev = getattr(mod, "revision", None)
            down = getattr(mod, "down_revision", None)
            if rev:
                revisions[str(rev)] = str(down) if down else None

        # Exactly one migration has no predecessor (the initial one)
        roots = [r for r, d in revisions.items() if d is None]
        assert len(roots) == 1, f"Expected exactly one root migration, got: {roots}"

        # Every other migration's down_revision must exist as a revision
        for rev, down in revisions.items():
            if down is not None:
                assert down in revisions, (
                    f"Migration {rev}: down_revision '{down}' not found"
                )

    def test_all_migrations_present(self) -> None:
        """The number of migration files equals the number of known revisions."""
        versions_dir = _PROJECT_ROOT / "alembic" / "versions"
        files = {f.stem for f in versions_dir.glob("*.py") if not f.stem.startswith("__")}
        assert len(files) == len(_ALL_REVISIONS), (
            f"Expected {len(_ALL_REVISIONS)} migrations, found {len(files)}: {files}"
        )

    def test_head_revision_is_correct(self) -> None:
        """The only revision that nothing points back to is ``_HEAD``."""
        all_revs: set[str] = set()
        down_revs: set[str] = set()
        for _path, mod in _load_migration_modules():
            rev = getattr(mod, "revision", None)
            down = getattr(mod, "down_revision", None)
            if rev:
                all_revs.add(str(rev))
            if down:
                down_revs.add(str(down))

        # The head is the revision not referenced as any down_revision
        heads = all_revs - down_revs
        assert heads == {_HEAD}, f"Expected head {_HEAD}, got: {heads}"

    def test_each_migration_has_upgrade_and_downgrade(self) -> None:
        """Every migration module exposes callable upgrade() and downgrade()."""
        for path, mod in _load_migration_modules():
            assert callable(getattr(mod, "upgrade", None)), (
                f"{path.name} missing upgrade()"
            )
            assert callable(getattr(mod, "downgrade", None)), (
                f"{path.name} missing downgrade()"
            )

    def test_alembic_ini_points_to_correct_script_location(self) -> None:
        """alembic.ini's ``script_location`` mentions the alembic directory."""
        import configparser

        parser = configparser.ConfigParser()
        parser.read(str(_PROJECT_ROOT / "alembic.ini"))
        script_location = parser.get("alembic", "script_location", fallback="")
        assert "alembic" in script_location

    def test_env_py_imports_all_model_bases(self) -> None:
        """alembic/env.py imports Base and uses its metadata as the target."""
        env_text = (_PROJECT_ROOT / "alembic" / "env.py").read_text()
        assert "from musehub.db.database import Base" in env_text
        assert "target_metadata = Base.metadata" in env_text
# ══════════════════════════════════════════════════════════════════════════════
# 2. Integration
# ══════════════════════════════════════════════════════════════════════════════


class TestMigrationIntegration:
    """Real DB — run migrations and inspect schema state."""

    def test_upgrade_to_head_succeeds(self) -> None:
        """Full migration chain from empty DB to HEAD completes without error."""
        _fresh_db()
        _upgrade(_TEST_URL)
        assert _current_rev(_TEST_URL) == _HEAD

    def test_current_revision_tracked_in_alembic_version(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """The alembic_version table records HEAD as the current revision."""

        async def _read_version() -> str | None:
            query = text("SELECT version_num FROM alembic_version")
            async with migrated_engine.connect() as conn:
                first = (await conn.execute(query)).fetchone()
            if not first:
                return None
            return first[0]

        assert asyncio.run(_read_version()) == _HEAD

    def test_required_tables_exist_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Every table in _REQUIRED_TABLES is present after upgrading to HEAD."""
        existing = asyncio.run(_tables(migrated_engine))
        missing = _REQUIRED_TABLES - existing
        assert not missing, f"Tables missing after upgrade to HEAD: {missing}"

    def test_legacy_tables_absent_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """No table in _LEGACY_TABLES is present after upgrading to HEAD."""
        existing = asyncio.run(_tables(migrated_engine))
        present_legacy = _LEGACY_TABLES & existing
        assert not present_legacy, f"Legacy tables still present after HEAD: {present_legacy}"

    def test_downgrade_then_upgrade_returns_to_head(self) -> None:
        """Downgrade to base and re-upgrade — must return to HEAD cleanly."""
        _downgrade(_TEST_URL, "base")
        after_down = _current_rev(_TEST_URL)
        assert after_down is None

        _upgrade(_TEST_URL, "head")
        after_up = _current_rev(_TEST_URL)
        assert after_up == _HEAD

    def test_full_downgrade_to_base_then_re_upgrade(self) -> None:
        """Downgrade all the way to base then re-upgrade — round trip works."""
        _downgrade(_TEST_URL, "base")
        after_down = _current_rev(_TEST_URL)
        assert after_down is None

        _upgrade(_TEST_URL)
        after_up = _current_rev(_TEST_URL)
        assert after_up == _HEAD
# ══════════════════════════════════════════════════════════════════════════════
# 3. End-to-End
# ══════════════════════════════════════════════════════════════════════════════


class TestMigrationE2E:
    """Full stack: migrate to HEAD, insert rows, read them back, step through revisions."""

    def test_repo_data_persists_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Insert a repo row at HEAD and verify it can be read back."""

        async def _run() -> None:
            now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()
            repo_id = compute_repo_id("testuser", "e2e-repo", "", now_iso)
            async with migrated_engine.connect() as conn:
                await conn.execute(
                    text(
                        "INSERT INTO musehub_repos "
                        "(repo_id, name, slug, owner, owner_user_id, visibility, default_branch, "
                        "description, tags, domain_meta, training_opt_out, created_at, updated_at) "
                        "VALUES (:id, :n, :s, :o, :oid, 'public', 'main', '', "
                        "ARRAY[]::text[], '{}'::json, false, NOW(), NOW())"
                    ),
                    {
                        "id": repo_id,
                        "n": "e2e-repo",
                        "s": "e2e-repo",
                        "o": "testuser",
                        "oid": "testuser",
                    },
                )
                await conn.commit()

            # Read back on a new connection so only committed data is visible.
            async with migrated_engine.connect() as conn:
                result = await conn.execute(
                    text("SELECT repo_id FROM musehub_repos WHERE repo_id = :id"),
                    {"id": repo_id},
                )
                row = result.fetchone()
            assert row is not None, "Repo row must be readable after insert"

        asyncio.run(_run())

    def test_identity_row_persists_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Insert an identity at HEAD and verify it can be read back."""

        async def _run() -> None:
            identity_id = compute_identity_id(secrets.token_bytes(16))
            async with migrated_engine.connect() as conn:
                await conn.execute(
                    text(
                        "INSERT INTO musehub_identities "
                        "(identity_id, handle, display_name, identity_type, "
                        "agent_capabilities, is_verified, pinned_repo_ids, created_at, updated_at) "
                        "VALUES (:id, :h, :dn, 'human', ARRAY[]::text[], false, ARRAY[]::text[], NOW(), NOW())"
                    ),
                    {"id": identity_id, "h": "e2e-user", "dn": "E2E User"},
                )
                await conn.commit()

            async with migrated_engine.connect() as conn:
                result = await conn.execute(
                    text("SELECT identity_id FROM musehub_identities WHERE identity_id = :id"),
                    {"id": identity_id},
                )
                row = result.fetchone()
            assert row is not None

        asyncio.run(_run())

    def test_each_migration_step_is_individually_runnable(self) -> None:
        """Upgrade one step at a time from base to HEAD — each step must succeed."""
        _downgrade(_TEST_URL, "base")
        for rev in _ALL_REVISIONS:
            _upgrade(_TEST_URL, rev)
            actual = _current_rev(_TEST_URL)
            assert actual == rev, f"After upgrading to {rev}, got revision {actual}"


# ══════════════════════════════════════════════════════════════════════════════
# 4. Stress
# ══════════════════════════════════════════════════════════════════════════════


class TestMigrationStress:
    """Performance and repeated-execution scenarios."""

    def test_full_upgrade_completes_under_60_seconds(self) -> None:
        """The full migration chain on an empty DB must complete in under 60 seconds."""
        _fresh_db()
        start = time.perf_counter()
        _upgrade(_TEST_URL)
        elapsed = time.perf_counter() - start
        assert elapsed < 60, f"Full upgrade took {elapsed:.1f}s (budget: 60s)"

    def test_full_downgrade_completes_under_60_seconds(self) -> None:
        """Downgrade from HEAD to base must complete in under 60 seconds."""
        # DB is at HEAD from previous test
        start = time.perf_counter()
        try:
            _downgrade(_TEST_URL, "base")
            elapsed = time.perf_counter() - start
        finally:
            # Restore HEAD for subsequent tests even when the downgrade fails,
            # so one failure here does not cascade through the shared database.
            _upgrade(_TEST_URL)
        assert elapsed < 60, f"Full downgrade took {elapsed:.1f}s (budget: 60s)"

    def test_three_consecutive_upgrade_to_head_idempotent(self) -> None:
        """Re-running upgrade head on an already-migrated DB is a no-op (idempotent)."""
        assert _current_rev(_TEST_URL) == _HEAD
        _upgrade(_TEST_URL)  # Already at HEAD — must not error
        assert _current_rev(_TEST_URL) == _HEAD
        _upgrade(_TEST_URL)
        assert _current_rev(_TEST_URL) == _HEAD

    def test_100_identity_inserts_survive_step_cycle(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Insert 100 identity rows and verify all are readable."""

        async def _run() -> None:
            ids = [secrets.token_hex(16) for _ in range(100)]
            async with migrated_engine.connect() as conn:
                for i, identity_id in enumerate(ids):
                    await conn.execute(
                        text(
                            "INSERT INTO musehub_identities "
                            "(identity_id, handle, display_name, identity_type, "
                            "agent_capabilities, is_verified, pinned_repo_ids, created_at, updated_at) "
                            "VALUES (:id, :h, :dn, 'human', ARRAY[]::text[], false, ARRAY[]::text[], NOW(), NOW())"
                        ),
                        {
                            "id": identity_id,
                            # The id prefix keeps handles distinct across runs.
                            "h": f"stress-{i}-{identity_id[:6]}",
                            "dn": f"Stress {i}",
                        },
                    )
                await conn.commit()

            async with migrated_engine.connect() as conn:
                result = await conn.execute(
                    text("SELECT COUNT(*) FROM musehub_identities WHERE identity_id = ANY(:ids)"),
                    {"ids": ids},
                )
                count = result.scalar()
            assert count == 100, f"Only {count}/100 identity rows readable after insert"

        asyncio.run(_run())


# ══════════════════════════════════════════════════════════════════════════════
# 5. Data Integrity
# ══════════════════════════════════════════════════════════════════════════════


class TestMigrationDataIntegrity:
    """Schema correctness — columns, indexes, constraints at HEAD."""

    def test_musehub_repos_has_required_columns(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_repos must expose its core identifying columns."""
        cols = asyncio.run(_columns(migrated_engine, "musehub_repos"))
        for col in ("repo_id", "owner", "slug", "visibility", "default_branch"):
            assert col in cols, f"musehub_repos missing column: {col}"

    def test_musehub_identities_has_agent_columns(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_identities must have agent identity columns."""
        cols = asyncio.run(_columns(migrated_engine, "musehub_identities"))
        for col in ("spawned_by", "scope", "expires_at"):
            assert col in cols, f"musehub_identities missing agent column: {col}"

    def test_musehub_identities_has_no_legacy_user_id(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_identities must not have legacy_user_id."""
        cols = asyncio.run(_columns(migrated_engine, "musehub_identities"))
        assert "legacy_user_id" not in cols

    def test_musehub_issue_comments_has_no_state_refs(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_issue_comments must not have state_refs column."""
        # NOTE(review): _columns() yields an empty set for a missing table, so
        # this also passes when musehub_issue_comments does not exist.
        cols = asyncio.run(_columns(migrated_engine, "musehub_issue_comments"))
        assert "state_refs" not in cols

    def test_musehub_auth_keys_has_algorithm_column(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_auth_keys must have the algorithm column."""
        cols = asyncio.run(_columns(migrated_engine, "musehub_auth_keys"))
        assert "algorithm" in cols

    def test_musehub_repos_unique_owner_slug_index_exists(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_repos must carry the uq_musehub_repos_owner_slug index."""
        indexes = asyncio.run(_indexes_for(migrated_engine, "musehub_repos"))
        assert "uq_musehub_repos_owner_slug" in indexes

    def test_musehub_auth_keys_fingerprint_unique_index(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_auth_keys must carry the uq_musehub_auth_keys_fingerprint index."""
        indexes = asyncio.run(_indexes_for(migrated_engine, "musehub_auth_keys"))
        assert "uq_musehub_auth_keys_fingerprint" in indexes

    def test_musehub_collaborators_has_identity_handle_column(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_collaborators must use identity_handle, not user_id."""
        cols = asyncio.run(_columns(migrated_engine, "musehub_collaborators"))
        assert "identity_handle" in cols
        assert "user_id" not in cols

    def test_wire_tags_table_exists(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_wire_tags must exist at HEAD."""
        tables = asyncio.run(_tables(migrated_engine))
        assert "musehub_wire_tags" in tables

    def test_sessions_table_exists(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_sessions must exist at HEAD."""
        tables = asyncio.run(_tables(migrated_engine))
        assert "musehub_sessions" in tables
# ══════════════════════════════════════════════════════════════════════════════
# 6. Security
# ══════════════════════════════════════════════════════════════════════════════


class TestMigrationSecurity:
    """Ensure migrations don't introduce security-relevant schema regressions."""

    def test_legacy_auth_tables_dropped_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """muse_users, muse_access_tokens, musehub_profiles must not exist at HEAD."""
        tables = asyncio.run(_tables(migrated_engine))
        for legacy in _LEGACY_TABLES:
            assert legacy not in tables, f"Legacy auth table '{legacy}' still present at HEAD"

    def test_musehub_auth_keys_has_fingerprint_unique_constraint(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Fingerprint uniqueness prevents duplicate key registration."""
        indexes = asyncio.run(_indexes_for(migrated_engine, "musehub_auth_keys"))
        assert "uq_musehub_auth_keys_fingerprint" in indexes

    def test_musehub_identities_handle_unique(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Identity handles must be unique — prevents impersonation via duplicate handle."""
        indexes = asyncio.run(_indexes_for(migrated_engine, "musehub_identities"))
        assert "uq_musehub_identities_handle" in indexes

    def test_downgrade_does_not_expose_dropped_columns(self) -> None:
        """After a full downgrade and re-upgrade, state_refs stays absent."""
        _downgrade(_TEST_URL, "base")
        _upgrade(_TEST_URL)
        engine = create_async_engine(_TEST_URL, poolclass=NullPool)
        try:
            cols = asyncio.run(_columns(engine, "musehub_issue_comments"))
        finally:
            asyncio.run(engine.dispose())
        assert "state_refs" not in cols

    def test_repos_owner_slug_uniqueness_enforced(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Inserting two repos with the same owner/slug must fail with an integrity error."""
        from sqlalchemy.exc import IntegrityError

        async def _run() -> None:
            rid1 = secrets.token_hex(16)
            rid2 = secrets.token_hex(16)
            # Same owner/slug in both inserts; only repo_id differs.
            _dup_sql = (
                "INSERT INTO musehub_repos "
                "(repo_id, name, slug, owner, owner_user_id, visibility, default_branch, "
                "description, tags, domain_meta, training_opt_out, created_at, updated_at) "
                "VALUES (:id, 'dup', 'dup-slug', 'sec-owner', 'sec-owner', 'public', 'main', "
                "'', ARRAY[]::text[], '{}'::json, false, NOW(), NOW())"
            )
            async with migrated_engine.connect() as conn:
                await conn.execute(text(_dup_sql), {"id": rid1})
                await conn.commit()

            async with migrated_engine.connect() as conn:
                with pytest.raises(IntegrityError):
                    await conn.execute(text(_dup_sql), {"id": rid2})
                    await conn.commit()

        asyncio.run(_run())


# ══════════════════════════════════════════════════════════════════════════════
# 7. Performance
# ══════════════════════════════════════════════════════════════════════════════


class TestMigrationPerformance:
    """Latency budgets for migration operations."""

    def test_single_step_upgrade_under_5_seconds(self) -> None:
        """Each individual migration step must complete in under 5 seconds."""
        _fresh_db()  # start from an empty database so every step is really applied
        for rev in _ALL_REVISIONS:
            start = time.perf_counter()
            _upgrade(_TEST_URL, rev)
            elapsed = time.perf_counter() - start
            assert elapsed < 5, (
                f"Migration {rev} upgrade took {elapsed:.2f}s (budget: 5s per step)"
            )

    def test_single_step_downgrade_under_5_seconds(self) -> None:
        """Each individual migration downgrade must complete in under 5 seconds."""
        _fresh_db()
        _upgrade(_TEST_URL)  # start from a guaranteed-clean HEAD
        try:
            for rev in reversed(_ALL_REVISIONS[:-1]):
                start = time.perf_counter()
                _downgrade(_TEST_URL, rev)
                elapsed = time.perf_counter() - start
                assert elapsed < 5, (
                    f"Migration {rev} downgrade took {elapsed:.2f}s (budget: 5s per step)"
                )
            # Final downgrade to base
            start = time.perf_counter()
            _downgrade(_TEST_URL, "base")
            elapsed = time.perf_counter() - start
            assert elapsed < 5, f"Migration base downgrade took {elapsed:.2f}s"
        finally:
            # Restore HEAD for any remaining tests, including when a step
            # above fails part-way down the chain.
            _upgrade(_TEST_URL)

    def test_schema_introspection_under_500ms(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Listing all tables in the public schema must complete in under 500ms."""
        start = time.perf_counter()
        asyncio.run(_tables(migrated_engine))
        elapsed_ms = (time.perf_counter() - start) * 1000
        assert elapsed_ms < 500, f"Table introspection took {elapsed_ms:.0f}ms (budget: 500ms)"

    def test_index_introspection_under_200ms(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Listing indexes for a single table must complete in under 200ms."""
        start = time.perf_counter()
        asyncio.run(_indexes_for(migrated_engine, "musehub_repos"))
        elapsed_ms = (time.perf_counter() - start) * 1000
        assert elapsed_ms < 200, f"Index introspection took {elapsed_ms:.0f}ms (budget: 200ms)"


# ══════════════════════════════════════════════════════════════════════════════
# Alembic chain tests (structural, no DB connection required)
# ══════════════════════════════════════════════════════════════════════════════

# NOTE: from here on the module-level name ``inspect`` is the standard-library
# module; it rebinds the ``inspect`` imported from sqlalchemy at the top of the file.
import inspect
import stat
import types

from alembic.config import Config
from alembic.script import ScriptDirectory

_REPO_ROOT = pathlib.Path(__file__).parent.parent

_EXPECTED_HEAD_PREFIX = _HEAD
_EXPECTED_MIGRATION_COUNT = len(_ALL_REVISIONS)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _script_dir() -> ScriptDirectory:
    """Build an Alembic ScriptDirectory for this repository's alembic/ folder."""
    cfg = Config(str(_REPO_ROOT / "alembic.ini"))
    # Absolute path so the result does not depend on the working directory.
    cfg.set_main_option("script_location", str(_REPO_ROOT / "alembic"))
    return ScriptDirectory.from_config(cfg)


def _is_stub_downgrade(mod: types.ModuleType) -> bool:
    """Return True if downgrade() is a pass-only or ellipsis-only stub.

    The function source is parsed with ``ast`` so that comments, a leading
    docstring of any length, and repeated ``pass`` / ``...`` statements are
    all recognised as "does nothing".

    Returns:
        True when downgrade() is missing, its source cannot be retrieved, or
        its body contains no statement other than a docstring, ``pass`` or
        ``...``.  False otherwise, including when the source cannot be parsed
        (a stub cannot be proven in that case).
    """
    import ast
    import textwrap

    try:
        src = inspect.getsource(getattr(mod, "downgrade"))
    except (OSError, AttributeError, TypeError):
        # TypeError: ``downgrade`` exists but is not a Python-level function.
        return True

    try:
        tree = ast.parse(textwrap.dedent(src))
    except SyntaxError:
        return False

    func = next(
        (
            node
            for node in tree.body
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
        ),
        None,
    )
    if func is None:
        return False

    def _is_str_expr(stmt: ast.stmt) -> bool:
        return (
            isinstance(stmt, ast.Expr)
            and isinstance(stmt.value, ast.Constant)
            and isinstance(stmt.value.value, str)
        )

    def _is_noop(stmt: ast.stmt) -> bool:
        if isinstance(stmt, ast.Pass):
            return True
        return (
            isinstance(stmt, ast.Expr)
            and isinstance(stmt.value, ast.Constant)
            and stmt.value.value is Ellipsis
        )

    body = list(func.body)
    if body and _is_str_expr(body[0]):
        body = body[1:]  # a docstring is not behaviour
    return all(_is_noop(stmt) for stmt in body)


# ---------------------------------------------------------------------------
# Linear chain / versioning
# ---------------------------------------------------------------------------


def test_migration_chain_is_linear() -> None:
    """Migration graph must have exactly one head (no branches)."""
    heads = _script_dir().get_heads()
    assert len(heads) == 1, (
        f"Expected single-head chain, got {len(heads)} heads: {heads}. "
        "Resolve the branch before merging."
    )


def test_migration_count_matches_expected() -> None:
    """Migration count must equal _EXPECTED_MIGRATION_COUNT.

    Update _EXPECTED_MIGRATION_COUNT here when adding a new migration.
    """
    revisions = list(_script_dir().walk_revisions())
    assert len(revisions) == _EXPECTED_MIGRATION_COUNT, (
        f"Expected {_EXPECTED_MIGRATION_COUNT} migrations, found {len(revisions)}. "
        "Update _EXPECTED_MIGRATION_COUNT in this file."
    )


def test_head_revision_prefix() -> None:
    """Head must start with the expected revision prefix."""
    heads = _script_dir().get_heads()
    assert len(heads) == 1
    assert heads[0].startswith(_EXPECTED_HEAD_PREFIX), (
        f"Expected head starting with '{_EXPECTED_HEAD_PREFIX}', got '{heads[0]}'. "
        "Update _EXPECTED_HEAD_PREFIX when a new migration is added."
    )


def test_all_migrations_importable() -> None:
    """Every migration module must be importable without errors."""
    for rev in _script_dir().walk_revisions():
        assert rev.module is not None, (
            f"Revision {rev.revision} has no module — check for missing file."
        )


def test_revision_ids_are_sequential_integers() -> None:
    """Numeric revision ID prefixes must form a gap-free integer sequence.

    Only revisions whose first four characters are digits are considered.
    """
    numeric_ids = sorted(
        int(r.revision[:4])
        for r in _script_dir().walk_revisions()
        if r.revision[:4].isdigit()
    )
    if not numeric_ids:
        return  # no numeric revisions yet — nothing to check
    expected = list(range(numeric_ids[0], numeric_ids[-1] + 1))
    assert numeric_ids == expected, (
        f"Numeric revision IDs have gaps: found {numeric_ids}, expected {expected}."
    )


# ---------------------------------------------------------------------------
# Downgrade coverage — every forward migration has a real downgrade
# ---------------------------------------------------------------------------


def test_all_migrations_have_downgrade_function() -> None:
    """Every migration module must define a downgrade() function."""
    for rev in _script_dir().walk_revisions():
        mod = rev.module
        assert mod is not None
        assert hasattr(mod, "downgrade"), (
            f"Revision {rev.revision} is missing a downgrade() function."
        )


def test_no_stub_downgrade_implementations() -> None:
    """No migration may have a pass-only or ellipsis-only downgrade().

    A stub downgrade makes rollback a silent no-op — forbidden.
    """
    stubs = [
        rev.revision
        for rev in _script_dir().walk_revisions()
        if rev.module is not None and _is_stub_downgrade(rev.module)
    ]
    assert not stubs, (
        f"Migrations with stub (non-functional) downgrade(): {stubs}. "
        "Implement the actual rollback DDL."
    )


def test_every_migration_references_tables_in_downgrade() -> None:
    """Migrations that add a column or table must also reference it in downgrade().

    Heuristic: if upgrade() calls op.add_column / op.create_table for table X,
    downgrade() must reference X (via drop_column / drop_table). Checked by
    source inspection — not exhaustive, but catches obvious omissions.
    """
    import re

    _ADD_RE = re.compile(r'op\.(add_column|create_table)\(\s*["\'](\w+)["\']')
    _DROP_RE = re.compile(r'op\.(drop_column|drop_table)\(\s*["\'](\w+)["\']')
    # Also accept raw SQL drops: DROP TABLE [IF EXISTS] and ALTER TABLE [IF EXISTS] DROP COLUMN
    _SQL_DROP_TABLE_RE = re.compile(r'DROP\s+TABLE\s+(?:IF\s+EXISTS\s+)?(\w+)', re.I)
    _SQL_DROP_COL_RE = re.compile(r'ALTER\s+TABLE\s+(?:IF\s+EXISTS\s+)?(\w+)\s+DROP\s+COLUMN', re.I)

    violations = []
    for rev in _script_dir().walk_revisions():
        mod = rev.module
        if mod is None:
            continue
        try:
            up_src = inspect.getsource(getattr(mod, "upgrade"))
            down_src = inspect.getsource(getattr(mod, "downgrade"))
        except (OSError, AttributeError):
            continue

        tables_added = {m.group(2) for m in _ADD_RE.finditer(up_src)}
        tables_dropped = (
            {m.group(2) for m in _DROP_RE.finditer(down_src)}
            | {m.group(1) for m in _SQL_DROP_TABLE_RE.finditer(down_src)}
            | {m.group(1) for m in _SQL_DROP_COL_RE.finditer(down_src)}
        )
        missing = tables_added - tables_dropped
        if missing:
            violations.append(f"{rev.revision}: added {missing} but downgrade() doesn't drop them")

    # Joined outside the f-string: a backslash inside an f-string expression
    # is a SyntaxError on Python versions before 3.12.
    details = "\n".join(violations)
    assert not violations, (
        f"Migrations with incomplete downgrade():\n{details}"
    )


# ---------------------------------------------------------------------------
# Transaction safety — env.py wraps migrations in a transaction
# ---------------------------------------------------------------------------


def test_env_py_uses_begin_transaction() -> None:
    """alembic/env.py must call context.begin_transaction() for both offline and online runs.

    This ensures that a failed migration rolls back cleanly instead of leaving
    the schema in a partially-applied state.
    """
    env_path = _REPO_ROOT / "alembic" / "env.py"
    src = env_path.read_text()
    count = src.count("context.begin_transaction()")
    assert count >= 2, (
        f"Expected at least 2 calls to context.begin_transaction() in env.py "
        f"(one for offline, one for online), found {count}. "
        "Wrap both run_migrations_offline() and do_run_migrations() in a transaction."
    )


# ---------------------------------------------------------------------------
# Prod-snapshot migration test script
# ---------------------------------------------------------------------------


def test_migrate_test_script_exists_and_is_executable() -> None:
    """deploy/migrate-test.sh must exist and be executable.

    This script is the mechanism for testing migrations against a production
    data snapshot before applying to production (checklist item 5.3.2).
    """
    script = _REPO_ROOT / "deploy" / "migrate-test.sh"
    assert script.exists(), (
        "deploy/migrate-test.sh is missing. "
        "This script is required to validate migrations against a prod snapshot."
    )
    mode = script.stat().st_mode
    assert mode & stat.S_IXUSR, (
        "deploy/migrate-test.sh must be executable (chmod +x)."
    )


def test_migrate_test_script_contains_round_trip() -> None:
    """deploy/migrate-test.sh must perform an upgrade→downgrade→upgrade round-trip."""
    script = (_REPO_ROOT / "deploy" / "migrate-test.sh").read_text()
    assert "upgrade head" in script, "script must run 'alembic upgrade head'"
    assert "downgrade" in script, "script must run 'alembic downgrade' step"
    # Must do upgrade twice (initial + after downgrade round-trip)
    assert script.count("upgrade head") >= 2, (
        "script must upgrade to HEAD twice (initial apply + round-trip after downgrade)"
    )