gabriel / musehub public

test_migrations.py file-level

at main · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:9 Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: Assignee sigil… · gabriel · Jun 7, 2026
"""Section 36 — Database Migrations / Alembic (7-layer test suite).

Covers:
    alembic/versions/ — the linear revision chain, starting from the
    consolidated base migration 0001_consolidated_schema.py up to ``_HEAD``.

Key design decisions:
- Tests require a real PostgreSQL instance (migrations use now(), JSON casts, etc.).
  The DB URL is derived from ``DATABASE_URL`` when it is set (e.g. inside
  Docker); otherwise it defaults to:
      postgresql+asyncpg://musehub:musehub@localhost:5434/musehub_migration_test_s36
- ``_fresh_db()`` drops and recreates the test database, so a test can start
  from an empty schema.
- The ``migrated_engine`` fixture is module-scoped: the fresh DB is created and
  upgraded once per module and shared by the tests that use it. Tests that
  downgrade must upgrade back, leaving the DB at HEAD for the next test.
- Tests that need the DB at a specific revision call ``_upgrade(url, rev)`` /
  ``_downgrade(url, rev)``, which run alembic in a subprocess.
- **Do not run these tests as part of the regular CI suite** — they connect to
  a real Postgres container. Run them explicitly with:
      python -m pytest tests/test_migrations.py -x -q

Connection note:
    The postgres container is exposed at localhost:5434 (mapped from 5432).
    Credentials: musehub / musehub.
"""
25 from __future__ import annotations
26
27 import asyncio
28 import os
29 import pathlib
30 import secrets
31 import time
32 from collections.abc import AsyncGenerator, Generator
33 from urllib.parse import urlparse as _urlparse
34
35 import datetime
36
37 import pytest
38 import pytest_asyncio
39
# Apply the ``migrations`` marker to every test in this module, so the whole
# file can be selected or excluded with ``pytest -m``.
pytestmark = pytest.mark.migrations
41 from musehub.core.genesis import compute_identity_id, compute_repo_id
42 from sqlalchemy import inspect, text
43 from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine, create_async_engine
44 from sqlalchemy.pool import NullPool
45
# Project root: works whether running locally or inside Docker (/app)
_PROJECT_ROOT = pathlib.Path(__file__).parent.parent

# ── constants ─────────────────────────────────────────────────────────────────

# Connection base URL (scheme + credentials + host[:port], no database name).
# When DATABASE_URL is set (inside Docker) its network location is reused
# verbatim; otherwise fall back to the host-mapped port used for local
# development. Reusing ``netloc`` rather than re-assembling it from
# ``username``/``password``/``hostname``/``port`` avoids emitting ``:None``
# for a URL without an explicit port or password, keeps IPv6 brackets, and
# leaves percent-encoded credentials untouched. A DATABASE_URL with no host
# (e.g. a sqlite URL) cannot point at Postgres, so the default is kept.
_raw_db_url = os.environ.get("DATABASE_URL", "")
_PG_BASE_URL = "postgresql+asyncpg://musehub:musehub@localhost:5434"
if _raw_db_url:
    _p = _urlparse(_raw_db_url)
    if _p.hostname:
        _PG_BASE_URL = f"postgresql+asyncpg://{_p.netloc}"
_TEST_DB = "musehub_migration_test_s36"
_TEST_URL = f"{_PG_BASE_URL}/{_TEST_DB}"
_ADMIN_URL = f"{_PG_BASE_URL}/musehub"  # connect to main DB to create/drop the test DB

# Separate DB for performance tests so _fresh_db() there doesn't kill migrated_engine
_PERF_TEST_DB = "musehub_migration_perf_s36"
_PERF_TEST_URL = f"{_PG_BASE_URL}/{_PERF_TEST_DB}"

# Latest revision id. Revision ids are zero-padded, contiguous 4-digit numbers
# (checked by test_revision_ids_are_sequential_integers), so the full ordered
# chain "0001" .. _HEAD is derived from it: bump _HEAD when adding a migration.
_HEAD = "0069"
_ALL_REVISIONS = [f"{n:04d}" for n in range(1, int(_HEAD) + 1)]

# Tables that MUST exist after a full upgrade to HEAD
_REQUIRED_TABLES = {
    "alembic_version",
    "muse_commits", "muse_snapshots",
    "musehub_auth_challenges", "musehub_auth_keys", "musehub_background_jobs",
    "musehub_branches", "musehub_collaborators",
    "musehub_commits", "musehub_coord_records",
    "musehub_coord_reservations", "musehub_coord_tasks",
    "musehub_domain_installs", "musehub_domains",
    "musehub_identities", "musehub_issue_events", "musehub_issues", "musehub_labels",
    "musehub_object_refs", "musehub_objects", "musehub_proposals", "musehub_releases",
    "musehub_repos", "musehub_sessions",
    "musehub_snapshot_entries", "musehub_snapshots",
    "musehub_wire_tags",
    "musehub_bridge_mirrors",
    "musehub_symbol_history_entries", "musehub_symbol_intel", "musehub_hash_occurrence_entries",
}

# Tables that MUST be absent after a full upgrade
_LEGACY_TABLES = {
    "muse_users", "muse_access_tokens", "musehub_profiles",
    "musehub_issue_milestones", "musehub_milestones",
}
102
103
104 # ── helpers ───────────────────────────────────────────────────────────────────
105
106
def _run_alembic(db_url: str, *args: str) -> None:
    """Run an alembic command in a subprocess to avoid the settings lru_cache.

    The in-process lru_cache on musehub.config.settings is populated from
    conftest.py before DATABASE_URL is set; running in a subprocess guarantees
    a clean settings instance that picks up our DATABASE_URL.

    Args:
        db_url: Database URL exported to the child process as ``DATABASE_URL``.
        *args: Arguments for ``python -m alembic`` (e.g. ``"upgrade", "head"``).

    Raises:
        RuntimeError: if alembic exits non-zero. The message includes the exit
            code and the tail of stderr, plus the tail of stdout when it is
            non-empty, because alembic/logging output may land on either stream.
        subprocess.TimeoutExpired: if the command runs longer than 120 seconds.
    """
    import subprocess
    import sys

    # Deliberately minimal environment: nothing from the parent process
    # (e.g. a different DATABASE_URL) can leak into the child's settings.
    env = {
        "DATABASE_URL": db_url,
        "MUSE_ENV": "test",
        "PATH": "/usr/local/bin:/usr/bin:/bin",
    }
    result = subprocess.run(
        [sys.executable, "-m", "alembic", *args],
        cwd=str(_PROJECT_ROOT),
        env=env,
        capture_output=True,
        text=True,
        timeout=120,
    )
    if result.returncode != 0:
        details = result.stderr[-2000:]
        if result.stdout.strip():
            details += f"\n--- stdout ---\n{result.stdout[-2000:]}"
        raise RuntimeError(
            f"alembic {' '.join(args)} failed (exit {result.returncode}):\n{details}"
        )
134
135
def _upgrade(db_url: str, revision: str = "head") -> None:
    """Migrate the database at *db_url* forward to *revision* (``head`` by default)."""
    command = ("upgrade", revision)
    _run_alembic(db_url, *command)
138
139
def _downgrade(db_url: str, revision: str) -> None:
    """Migrate the database at *db_url* back to *revision* (e.g. ``base``)."""
    command = ("downgrade", revision)
    _run_alembic(db_url, *command)
142
143
def _current_rev(db_url: str) -> str | None:
    """Return the alembic revision stamped in the database at *db_url*.

    Returns None when no revision is stamped (e.g. after ``downgrade base``).
    A synchronous engine is used: the ``+asyncpg`` suffix is stripped, so
    SQLAlchemy picks its default synchronous PostgreSQL driver.
    The engine is disposed even if connecting or reading the revision fails,
    so a failed check does not leak a connection pool.
    """
    from alembic.runtime.migration import MigrationContext
    from sqlalchemy import create_engine

    sync_url = db_url.replace("+asyncpg", "")
    engine = create_engine(sync_url)
    try:
        with engine.connect() as conn:
            return MigrationContext.configure(conn).get_current_revision()
    finally:
        engine.dispose()
155
156
async def _tables(engine: AsyncEngine) -> set[str]:
    """Return the set of table names in the ``public`` schema."""
    query = text(
        "SELECT tablename FROM pg_tables "
        "WHERE schemaname='public' ORDER BY tablename"
    )
    async with engine.connect() as conn:
        rows = await conn.execute(query)
        return set(rows.scalars())
166
167
async def _indexes_for(engine: AsyncEngine, table: str) -> set[str]:
    """Return the names of all indexes defined on ``public.<table>``."""
    query = text(
        "SELECT indexname FROM pg_indexes "
        "WHERE schemaname='public' AND tablename = :t"
    )
    async with engine.connect() as conn:
        rows = await conn.execute(query, {"t": table})
        return set(rows.scalars())
178
179
async def _columns(engine: AsyncEngine, table: str) -> set[str]:
    """Return the column names of ``public.<table>`` (empty if the table is absent)."""
    query = text(
        "SELECT column_name FROM information_schema.columns "
        "WHERE table_schema='public' AND table_name = :t"
    )
    async with engine.connect() as conn:
        rows = await conn.execute(query, {"t": table})
        return set(rows.scalars())
190
191
def _recreate_database(db_name: str) -> None:
    """Drop (if present) and recreate the database *db_name* synchronously.

    Runs against the admin database (``_ADMIN_URL``). Sessions still connected
    to *db_name* are terminated first, because PostgreSQL refuses to drop a
    database that has active connections. DROP/CREATE DATABASE cannot run
    inside a transaction block, hence AUTOCOMMIT.

    Raises:
        ValueError: if *db_name* is not a plain identifier. Database names
            cannot be bound parameters, so they are interpolated into DDL and
            must be validated first.
    """
    from sqlalchemy import create_engine, text as stext

    if not db_name.isidentifier():
        raise ValueError(f"Refusing to interpolate unsafe database name: {db_name!r}")

    sync_admin = _ADMIN_URL.replace("+asyncpg", "")
    engine = create_engine(sync_admin, isolation_level="AUTOCOMMIT")
    try:
        with engine.connect() as conn:
            conn.execute(
                stext(
                    "SELECT pg_terminate_backend(pid) FROM pg_stat_activity "
                    "WHERE datname = :name"
                ),
                {"name": db_name},
            )
            conn.execute(stext(f"DROP DATABASE IF EXISTS {db_name}"))
            conn.execute(stext(f"CREATE DATABASE {db_name}"))
    finally:
        # Always release the admin connection pool, even if DDL fails.
        engine.dispose()


def _fresh_db() -> None:
    """Drop and recreate the section-36 test database synchronously."""
    _recreate_database(_TEST_DB)


def _perf_fresh_db() -> None:
    """Drop and recreate the performance test database (separate from _TEST_DB)."""
    _recreate_database(_PERF_TEST_DB)
220
221
222 # ── module-scoped engine fixture ──────────────────────────────────────────────
223
224
@pytest.fixture(scope="module")
def migrated_engine() -> Generator[AsyncEngine, None, None]:
    """Yield an async engine bound to a freshly created, fully migrated test DB.

    Module-scoped: the database is rebuilt and upgraded to HEAD once for the
    whole file. Tests that downgrade are responsible for upgrading back to HEAD.
    NullPool keeps no idle connections, which lets the engine be used from the
    separate event loop that each ``asyncio.run`` call creates.
    """
    _fresh_db()
    _upgrade(_TEST_URL)
    shared_engine = create_async_engine(_TEST_URL, poolclass=NullPool)
    yield shared_engine
    # Teardown: dispose on a fresh event loop.
    asyncio.run(shared_engine.dispose())
237
238
239 # ══════════════════════════════════════════════════════════════════════════════
240 # 1. Unit
241 # ══════════════════════════════════════════════════════════════════════════════
242
243
class TestMigrationUnit:
    """Static analysis of migration files — no DB connection needed."""

    @staticmethod
    def _migration_modules() -> list[tuple[pathlib.Path, types.ModuleType]]:
        """Import every migration file in alembic/versions, in filename order.

        Dunder files (``__init__.py`` etc.) are skipped, so every test in this
        class sees the same set of files that test_all_migrations_present counts.

        Returns:
            (path, module) pairs for each executed migration file.
        """
        import importlib.util

        versions_dir = _PROJECT_ROOT / "alembic" / "versions"
        loaded = []
        for path in sorted(versions_dir.glob("*.py")):
            if path.stem.startswith("__"):
                continue
            spec = importlib.util.spec_from_file_location(path.stem, path)
            assert spec is not None and spec.loader is not None, f"Cannot load {path.name}"
            mod = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(mod)
            loaded.append((path, mod))
        return loaded

    def test_revision_chain_is_linear(self) -> None:
        """The chain has one root, no dangling parents, and no branch points."""
        revisions: dict[str, str | None] = {}
        for _path, mod in self._migration_modules():
            rev = getattr(mod, "revision", None)
            down = getattr(mod, "down_revision", None)
            if rev:
                revisions[str(rev)] = str(down) if down else None

        # Exactly one migration has no predecessor (the initial one)
        roots = [r for r, d in revisions.items() if d is None]
        assert len(roots) == 1, f"Expected exactly one root migration, got: {roots}"

        # Every other migration's down_revision must exist as a revision
        for rev, down in revisions.items():
            if down is not None:
                assert down in revisions, (
                    f"Migration {rev}: down_revision '{down}' not found"
                )

        # Two revisions sharing a parent would fork the chain into branches,
        # which the root/parent checks above do not catch.
        children: dict[str, list[str]] = {}
        for rev, down in revisions.items():
            if down is not None:
                children.setdefault(down, []).append(rev)
        branch_points = {parent: sorted(kids) for parent, kids in children.items() if len(kids) > 1}
        assert not branch_points, f"Revision chain branches at: {branch_points}"

    def test_all_migrations_present(self) -> None:
        """The versions directory holds exactly one file per entry in _ALL_REVISIONS."""
        versions_dir = _PROJECT_ROOT / "alembic" / "versions"
        files = {f.stem for f in versions_dir.glob("*.py") if not f.stem.startswith("__")}
        assert len(files) == len(_ALL_REVISIONS), (
            f"Expected {len(_ALL_REVISIONS)} migrations, found {len(files)}: {files}"
        )

    def test_head_revision_is_correct(self) -> None:
        """The only revision no other migration points back to is _HEAD."""
        all_revs: set[str] = set()
        down_revs: set[str] = set()
        for _path, mod in self._migration_modules():
            rev = getattr(mod, "revision", None)
            down = getattr(mod, "down_revision", None)
            if rev:
                all_revs.add(str(rev))
            if down:
                down_revs.add(str(down))
        heads = all_revs - down_revs
        assert heads == {_HEAD}, f"Expected head {_HEAD}, got: {heads}"

    def test_each_migration_has_upgrade_and_downgrade(self) -> None:
        """Every migration file defines callable upgrade() and downgrade()."""
        for path, mod in self._migration_modules():
            assert callable(getattr(mod, "upgrade", None)), (
                f"{path.name} missing upgrade()"
            )
            assert callable(getattr(mod, "downgrade", None)), (
                f"{path.name} missing downgrade()"
            )

    def test_alembic_ini_points_to_correct_script_location(self) -> None:
        """alembic.ini's script_location refers to the alembic directory."""
        import configparser

        # Alembic itself supplies ``here`` (the ini file's directory) for
        # interpolation; without it a value such as ``%(here)s/alembic``
        # makes ConfigParser.get raise InterpolationMissingOptionError.
        parser = configparser.ConfigParser(defaults={"here": str(_PROJECT_ROOT)})
        parser.read(str(_PROJECT_ROOT / "alembic.ini"))
        script_location = parser.get("alembic", "script_location", fallback="")
        assert "alembic" in script_location

    def test_env_py_imports_all_model_bases(self) -> None:
        """alembic/env.py wires autogenerate to the application's Base.metadata."""
        env_text = (_PROJECT_ROOT / "alembic" / "env.py").read_text(encoding="utf-8")
        assert "from musehub.db.database import Base" in env_text
        assert "target_metadata = Base.metadata" in env_text
334
335 # ══════════════════════════════════════════════════════════════════════════════
336 # 2. Integration
337 # ══════════════════════════════════════════════════════════════════════════════
338
339
class TestMigrationIntegration:
    """Real DB — run migrations and inspect schema state."""

    def test_upgrade_to_head_succeeds(self) -> None:
        """Full migration chain from empty DB to HEAD completes without error."""
        _fresh_db()
        _upgrade(_TEST_URL)
        rev = _current_rev(_TEST_URL)
        assert rev == _HEAD

    def test_current_revision_tracked_in_alembic_version(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """alembic_version stores the HEAD revision id."""

        async def _check() -> str | None:
            async with migrated_engine.connect() as conn:
                result = await conn.execute(text("SELECT version_num FROM alembic_version"))
                row = result.fetchone()
                return row[0] if row else None

        rev = asyncio.run(_check())
        assert rev == _HEAD

    def test_required_tables_exist_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Every table in _REQUIRED_TABLES exists at HEAD."""
        tables = asyncio.run(_tables(migrated_engine))
        missing = _REQUIRED_TABLES - tables
        assert not missing, f"Tables missing after upgrade to HEAD: {missing}"

    def test_legacy_tables_absent_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """No table in _LEGACY_TABLES survives to HEAD."""
        tables = asyncio.run(_tables(migrated_engine))
        present_legacy = _LEGACY_TABLES & tables
        assert not present_legacy, f"Legacy tables still present after HEAD: {present_legacy}"

    def test_downgrade_then_upgrade_returns_to_head(self) -> None:
        """Step back one revision and re-upgrade — must return to HEAD cleanly.

        The full base round trip is covered by
        test_full_downgrade_to_base_then_re_upgrade. This test isolates HEAD's
        own downgrade()/upgrade() pair, which was previously only exercised as
        part of the whole chain.
        """
        # The chain is linear and _ALL_REVISIONS is ordered, so HEAD's parent
        # is the second-to-last entry (base/None when HEAD is the only one).
        expected_parent = _ALL_REVISIONS[-2] if len(_ALL_REVISIONS) > 1 else None
        _upgrade(_TEST_URL, "head")  # explicit HEAD precondition; no-op if already there
        _downgrade(_TEST_URL, "-1")
        assert _current_rev(_TEST_URL) == expected_parent
        _upgrade(_TEST_URL, "head")
        assert _current_rev(_TEST_URL) == _HEAD

    def test_full_downgrade_to_base_then_re_upgrade(self) -> None:
        """Downgrade all the way to base then re-upgrade — round trip works."""
        _downgrade(_TEST_URL, "base")
        assert _current_rev(_TEST_URL) is None
        _upgrade(_TEST_URL)
        assert _current_rev(_TEST_URL) == _HEAD
389
390
391 # ══════════════════════════════════════════════════════════════════════════════
392 # 3. End-to-End
393 # ══════════════════════════════════════════════════════════════════════════════
394
395
class TestMigrationE2E:
    """Full stack at HEAD: write rows, read them back, replay the chain step by step."""

    def test_repo_data_persists_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """A repo row inserted at HEAD can be selected back by its id."""
        insert_sql = text(
            "INSERT INTO musehub_repos "
            "(repo_id, name, slug, owner, owner_user_id, visibility, default_branch, "
            "description, tags, domain_meta, training_opt_out, created_at, updated_at) "
            "VALUES (:id, :n, :s, :o, :oid, 'public', 'main', '', "
            "ARRAY[]::text[], '{}'::json, false, NOW(), NOW())"
        )
        select_sql = text("SELECT repo_id FROM musehub_repos WHERE repo_id = :id")

        async def _scenario() -> None:
            created = datetime.datetime.now(datetime.timezone.utc).isoformat()
            new_id = compute_repo_id("testuser", "e2e-repo", "", created)
            values = {
                "id": new_id,
                "n": "e2e-repo",
                "s": "e2e-repo",
                "o": "testuser",
                "oid": "testuser",
            }
            # Write and read on separate connections so the read only sees committed data.
            async with migrated_engine.connect() as writer:
                await writer.execute(insert_sql, values)
                await writer.commit()
            async with migrated_engine.connect() as reader:
                fetched = (await reader.execute(select_sql, {"id": new_id})).first()
            assert fetched is not None, "Repo row must be readable after insert"

        asyncio.run(_scenario())

    def test_identity_row_persists_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """An identity row inserted at HEAD can be selected back by its id."""
        insert_sql = text(
            "INSERT INTO musehub_identities "
            "(identity_id, handle, display_name, identity_type, "
            "agent_capabilities, is_verified, pinned_repo_ids, created_at, updated_at) "
            "VALUES (:id, :h, :dn, 'human', ARRAY[]::text[], false, ARRAY[]::text[], NOW(), NOW())"
        )
        select_sql = text("SELECT identity_id FROM musehub_identities WHERE identity_id = :id")

        async def _scenario() -> None:
            new_id = compute_identity_id(secrets.token_bytes(16))
            async with migrated_engine.connect() as writer:
                await writer.execute(insert_sql, {"id": new_id, "h": "e2e-user", "dn": "E2E User"})
                await writer.commit()
            async with migrated_engine.connect() as reader:
                fetched = (await reader.execute(select_sql, {"id": new_id})).first()
            assert fetched is not None

        asyncio.run(_scenario())

    def test_each_migration_step_is_individually_runnable(self) -> None:
        """Climb from base to HEAD one revision at a time; each step must land exactly."""
        _downgrade(_TEST_URL, "base")
        for target in _ALL_REVISIONS:
            _upgrade(_TEST_URL, target)
            landed = _current_rev(_TEST_URL)
            assert landed == target, f"After upgrading to {target}, got revision {landed}"
466
467
468 # ══════════════════════════════════════════════════════════════════════════════
469 # 4. Stress
470 # ══════════════════════════════════════════════════════════════════════════════
471
472
class TestMigrationStress:
    """Performance and repeated-execution scenarios."""

    def test_full_upgrade_completes_under_60_seconds(self) -> None:
        """Upgrading an empty DB through every revision in _ALL_REVISIONS must take under 60 seconds."""
        _fresh_db()
        start = time.perf_counter()
        _upgrade(_TEST_URL)
        elapsed = time.perf_counter() - start
        assert elapsed < 60, f"Full upgrade took {elapsed:.1f}s (budget: 60s)"

    def test_full_downgrade_completes_under_60_seconds(self) -> None:
        """Downgrade from HEAD to base must complete in under 60 seconds."""
        # Make the HEAD starting point explicit instead of relying on test
        # order; upgrading an already-HEAD database is a no-op.
        _upgrade(_TEST_URL)
        start = time.perf_counter()
        _downgrade(_TEST_URL, "base")
        elapsed = time.perf_counter() - start
        # Restore HEAD *before* asserting, so a blown budget does not leave the
        # shared DB at base and cascade into unrelated failures.
        _upgrade(_TEST_URL)
        assert elapsed < 60, f"Full downgrade took {elapsed:.1f}s (budget: 60s)"

    def test_three_consecutive_upgrade_to_head_idempotent(self) -> None:
        """Running upgrade head three times on an already-migrated DB is a no-op (idempotent)."""
        assert _current_rev(_TEST_URL) == _HEAD
        for attempt in range(1, 4):
            _upgrade(_TEST_URL)  # Already at HEAD — must not error
            assert _current_rev(_TEST_URL) == _HEAD, (
                f"Revision changed after upgrade #{attempt}"
            )

    def test_100_identity_inserts_survive_step_cycle(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Insert 100 identity rows in one batch and verify all are readable.

        NOTE(review): despite the name, no migration step cycle happens here.
        """

        async def _run() -> None:
            ids = [secrets.token_hex(16) for _ in range(100)]
            rows = [
                {"id": identity_id, "h": f"stress-{i}-{identity_id[:6]}", "dn": f"Stress {i}"}
                for i, identity_id in enumerate(ids)
            ]
            async with migrated_engine.connect() as conn:
                # Passing a list of parameter dicts makes SQLAlchemy run the
                # statement as an executemany instead of 100 separate executes.
                await conn.execute(
                    text(
                        "INSERT INTO musehub_identities "
                        "(identity_id, handle, display_name, identity_type, "
                        "agent_capabilities, is_verified, pinned_repo_ids, created_at, updated_at) "
                        "VALUES (:id, :h, :dn, 'human', ARRAY[]::text[], false, ARRAY[]::text[], NOW(), NOW())"
                    ),
                    rows,
                )
                await conn.commit()

            async with migrated_engine.connect() as conn:
                result = await conn.execute(
                    text("SELECT COUNT(*) FROM musehub_identities WHERE identity_id = ANY(:ids)"),
                    {"ids": ids},
                )
                count = result.scalar()
                assert count == 100, f"Only {count}/100 identity rows readable after insert"

        asyncio.run(_run())
531
532
533 # ══════════════════════════════════════════════════════════════════════════════
534 # 5. Data Integrity
535 # ══════════════════════════════════════════════════════════════════════════════
536
537
class TestMigrationDataIntegrity:
    """Schema correctness — columns, indexes, constraints at HEAD."""

    def test_musehub_repos_has_required_columns(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_repos carries the core addressing/visibility columns."""
        cols = asyncio.run(
            _columns(migrated_engine, "musehub_repos")
        )
        for col in ("repo_id", "owner", "slug", "visibility", "default_branch"):
            assert col in cols, f"musehub_repos missing column: {col}"

    def test_musehub_identities_has_agent_columns(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_identities must have agent identity columns."""
        cols = asyncio.run(
            _columns(migrated_engine, "musehub_identities")
        )
        for col in ("spawned_by", "scope", "expires_at"):
            assert col in cols, f"musehub_identities missing agent column: {col}"

    def test_musehub_identities_has_no_legacy_user_id(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_identities must not have legacy_user_id."""
        cols = asyncio.run(
            _columns(migrated_engine, "musehub_identities")
        )
        assert "legacy_user_id" not in cols

    def test_musehub_issue_comments_has_no_state_refs(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_issue_comments must exist and must not have a state_refs column."""
        cols = asyncio.run(
            _columns(migrated_engine, "musehub_issue_comments")
        )
        # _columns returns an empty set for a missing table, which would make
        # the absence check below pass vacuously. The table is not covered by
        # _REQUIRED_TABLES, so check it exists here.
        assert cols, "musehub_issue_comments table not found at HEAD"
        assert "state_refs" not in cols

    def test_musehub_auth_keys_has_algorithm_column(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_auth_keys must have the algorithm column."""
        cols = asyncio.run(
            _columns(migrated_engine, "musehub_auth_keys")
        )
        assert "algorithm" in cols

    def test_musehub_repos_unique_owner_slug_index_exists(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """The owner/slug uniqueness index is present on musehub_repos."""
        indexes = asyncio.run(
            _indexes_for(migrated_engine, "musehub_repos")
        )
        assert "uq_musehub_repos_owner_slug" in indexes

    def test_musehub_auth_keys_fingerprint_unique_index(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """The fingerprint uniqueness index is present on musehub_auth_keys."""
        indexes = asyncio.run(
            _indexes_for(migrated_engine, "musehub_auth_keys")
        )
        assert "uq_musehub_auth_keys_fingerprint" in indexes

    def test_musehub_collaborators_has_identity_handle_column(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_collaborators must use identity_handle, not user_id."""
        cols = asyncio.run(
            _columns(migrated_engine, "musehub_collaborators")
        )
        assert "identity_handle" in cols
        assert "user_id" not in cols

    def test_wire_tags_table_exists(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_wire_tags exists at HEAD."""
        tables = asyncio.run(_tables(migrated_engine))
        assert "musehub_wire_tags" in tables

    def test_sessions_table_exists(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_sessions exists at HEAD."""
        tables = asyncio.run(_tables(migrated_engine))
        assert "musehub_sessions" in tables
624
625
626 # ══════════════════════════════════════════════════════════════════════════════
627 # 6. Security
628 # ══════════════════════════════════════════════════════════════════════════════
629
630
class TestMigrationSecurity:
    """Ensure migrations don't introduce security-relevant schema regressions."""

    @staticmethod
    async def _index_definition(
        engine: AsyncEngine, table: str, index: str
    ) -> str | None:
        """Return pg_indexes.indexdef for ``public.<table>.<index>``, or None if absent.

        indexdef is PostgreSQL's reconstructed ``CREATE [UNIQUE] INDEX ...``
        statement, which tells us whether the index actually enforces uniqueness.
        """
        async with engine.connect() as conn:
            result = await conn.execute(
                text(
                    "SELECT indexdef FROM pg_indexes "
                    "WHERE schemaname='public' AND tablename = :t AND indexname = :i"
                ),
                {"t": table, "i": index},
            )
            return result.scalar()

    def _assert_unique_index(
        self, engine: AsyncEngine, table: str, index: str
    ) -> None:
        """Fail unless *index* exists on *table* and is a UNIQUE index."""
        indexdef = asyncio.run(self._index_definition(engine, table, index))
        assert indexdef is not None, f"Index '{index}' missing on {table}"
        # A same-named non-unique index would otherwise satisfy a name-only check.
        assert indexdef.upper().startswith("CREATE UNIQUE INDEX"), (
            f"Index '{index}' on {table} does not enforce uniqueness: {indexdef}"
        )

    def test_legacy_auth_tables_dropped_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """muse_users, muse_access_tokens, musehub_profiles must not exist at HEAD."""
        tables = asyncio.run(_tables(migrated_engine))
        for legacy in _LEGACY_TABLES:
            assert legacy not in tables, f"Legacy auth table '{legacy}' still present at HEAD"

    def test_musehub_auth_keys_has_fingerprint_unique_constraint(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Fingerprint uniqueness prevents duplicate key registration."""
        self._assert_unique_index(
            migrated_engine, "musehub_auth_keys", "uq_musehub_auth_keys_fingerprint"
        )

    def test_musehub_identities_handle_unique(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Identity handles must be unique — prevents impersonation via duplicate handle."""
        self._assert_unique_index(
            migrated_engine, "musehub_identities", "uq_musehub_identities_handle"
        )

    def test_downgrade_does_not_expose_dropped_columns(self) -> None:
        """After a full downgrade and re-upgrade, state_refs stays absent."""
        _downgrade(_TEST_URL, "base")
        _upgrade(_TEST_URL)
        engine = create_async_engine(_TEST_URL, poolclass=NullPool)
        try:
            cols = asyncio.run(
                _columns(engine, "musehub_issue_comments")
            )
        finally:
            asyncio.run(engine.dispose())
        # An empty set means the table is missing, which would make the
        # absence check pass vacuously.
        assert cols, "musehub_issue_comments table not found after re-upgrade"
        assert "state_refs" not in cols

    def test_repos_owner_slug_uniqueness_enforced(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Inserting two repos with the same owner/slug must fail with an integrity error."""
        from sqlalchemy.exc import IntegrityError

        async def _run() -> None:
            rid1 = secrets.token_hex(16)
            rid2 = secrets.token_hex(16)
            _dup_sql = (
                "INSERT INTO musehub_repos "
                "(repo_id, name, slug, owner, owner_user_id, visibility, default_branch, "
                "description, tags, domain_meta, training_opt_out, created_at, updated_at) "
                "VALUES (:id, 'dup', 'dup-slug', 'sec-owner', 'sec-owner', 'public', 'main', "
                "'', ARRAY[]::text[], '{}'::json, false, NOW(), NOW())"
            )
            async with migrated_engine.connect() as conn:
                await conn.execute(text(_dup_sql), {"id": rid1})
                await conn.commit()
            async with migrated_engine.connect() as conn:
                with pytest.raises(IntegrityError):
                    await conn.execute(text(_dup_sql), {"id": rid2})
                    await conn.commit()

        asyncio.run(_run())
698
699
700 # ══════════════════════════════════════════════════════════════════════════════
701 # 7. Performance
702 # ══════════════════════════════════════════════════════════════════════════════
703
704
class TestMigrationPerformance:
    """Latency budgets for migration operations.

    The step-by-step timing tests run against the dedicated performance
    database (``_PERF_TEST_DB`` via ``_perf_fresh_db``), so recreating it does
    not wipe the shared section-36 database that ``migrated_engine`` uses.
    """

    def test_single_step_upgrade_under_5_seconds(self) -> None:
        """Each individual migration step must complete in under 5 seconds."""
        _perf_fresh_db()  # empty schema; also terminates stale connections to it
        for rev in _ALL_REVISIONS:
            start = time.perf_counter()
            _upgrade(_PERF_TEST_URL, rev)
            elapsed = time.perf_counter() - start
            assert elapsed < 5, (
                f"Migration {rev} upgrade took {elapsed:.2f}s (budget: 5s per step)"
            )

    def test_single_step_downgrade_under_5_seconds(self) -> None:
        """Each individual migration downgrade must complete in under 5 seconds."""
        _perf_fresh_db()
        _upgrade(_PERF_TEST_URL)  # start from a guaranteed-clean HEAD
        for rev in reversed(_ALL_REVISIONS[:-1]):
            start = time.perf_counter()
            _downgrade(_PERF_TEST_URL, rev)
            elapsed = time.perf_counter() - start
            assert elapsed < 5, (
                f"Migration {rev} downgrade took {elapsed:.2f}s (budget: 5s per step)"
            )
        # Final downgrade to base
        start = time.perf_counter()
        _downgrade(_PERF_TEST_URL, "base")
        elapsed = time.perf_counter() - start
        assert elapsed < 5, f"Migration base downgrade took {elapsed:.2f}s"
        # No HEAD restore needed: the shared _TEST_DB was never touched, and
        # nothing else in this file uses the performance database.

    def test_schema_introspection_under_500ms(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Listing all tables in the public schema must complete in under 500ms."""
        start = time.perf_counter()
        asyncio.run(_tables(migrated_engine))
        elapsed_ms = (time.perf_counter() - start) * 1000
        assert elapsed_ms < 500, f"Table introspection took {elapsed_ms:.0f}ms (budget: 500ms)"

    def test_index_introspection_under_200ms(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Listing indexes for a single table must complete in under 200ms."""
        start = time.perf_counter()
        asyncio.run(
            _indexes_for(migrated_engine, "musehub_repos")
        )
        elapsed_ms = (time.perf_counter() - start) * 1000
        assert elapsed_ms < 200, f"Index introspection took {elapsed_ms:.0f}ms (budget: 200ms)"
757
758
759 # ══════════════════════════════════════════════════════════════════════════════
760 # Alembic chain tests (structural, no DB connection required)
761 # ══════════════════════════════════════════════════════════════════════════════
762
763
764 import inspect
765 import stat
766 import types
767
768 from alembic.config import Config
769 from alembic.script import ScriptDirectory
770
# Names used by the alembic ScriptDirectory tests below. They alias the
# module-level constants defined at the top of the file, so revision
# bookkeeping lives in one place.
_REPO_ROOT = _PROJECT_ROOT
_EXPECTED_MIGRATION_COUNT = len(_ALL_REVISIONS)
_EXPECTED_HEAD_PREFIX = _HEAD
774
775
776 # ---------------------------------------------------------------------------
777 # Helpers
778 # ---------------------------------------------------------------------------
779
def _script_dir() -> ScriptDirectory:
    """Build an alembic ScriptDirectory for this repository's migrations.

    script_location is overridden with an absolute path, so the result does
    not depend on the current working directory.
    """
    ini_path = _REPO_ROOT / "alembic.ini"
    config = Config(str(ini_path))
    config.set_main_option("script_location", str(_REPO_ROOT / "alembic"))
    return ScriptDirectory.from_config(config)
784
785
def _is_stub_downgrade(mod: types.ModuleType) -> bool:
    """Return True if *mod*'s downgrade() does nothing.

    downgrade() counts as a stub when any of these holds:
    - it is missing;
    - its source cannot be retrieved (not a Python function);
    - its body, after an optional docstring, contains only ``pass`` and/or
      ``...`` statements.

    The body is examined with :mod:`ast` rather than line by line. A line-based
    scan miscounts multi-line docstrings, trailing comments such as
    ``pass  # nothing to undo``, and multi-line signatures.
    """
    import ast
    import textwrap

    try:
        src = inspect.getsource(getattr(mod, "downgrade"))
    except (OSError, TypeError, AttributeError):
        # Missing attribute, or nothing with Python source to inspect.
        return True

    try:
        tree = ast.parse(textwrap.dedent(src))
    except SyntaxError:
        # Cannot prove it is a stub; avoid a false positive.
        return False

    # ast.walk is breadth-first, so the first def found is downgrade itself.
    func = next(
        (
            node
            for node in ast.walk(tree)
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
        ),
        None,
    )
    if func is None:
        # e.g. ``downgrade = lambda: None`` — not a def we can analyze.
        return False

    body = list(func.body)
    if (
        body
        and isinstance(body[0], ast.Expr)
        and isinstance(body[0].value, ast.Constant)
        and isinstance(body[0].value.value, str)
    ):
        body = body[1:]  # skip the docstring

    def _is_noop(stmt: ast.stmt) -> bool:
        """True for ``pass`` or a bare ``...`` expression statement."""
        if isinstance(stmt, ast.Pass):
            return True
        return (
            isinstance(stmt, ast.Expr)
            and isinstance(stmt.value, ast.Constant)
            and stmt.value.value is Ellipsis
        )

    # A docstring-only body (empty after stripping) is also a stub.
    return all(_is_noop(stmt) for stmt in body)
803
804
805 # ---------------------------------------------------------------------------
806 # Linear chain / versioning
807 # ---------------------------------------------------------------------------
808
def test_migration_chain_is_linear() -> None:
    """The alembic revision graph must resolve to a single head (no branches)."""
    head_ids = _script_dir().get_heads()
    head_count = len(head_ids)
    assert head_count == 1, (
        f"Expected single-head chain, got {head_count} heads: {head_ids}. "
        "Resolve the branch before merging."
    )
816
817
def test_migration_count_matches_expected() -> None:
    """The script directory must contain exactly _EXPECTED_MIGRATION_COUNT revisions.

    _EXPECTED_MIGRATION_COUNT is len(_ALL_REVISIONS). When this fails after
    adding a migration, extend _ALL_REVISIONS / _HEAD at the top of this file
    rather than editing the count directly.
    """
    revisions = list(_script_dir().walk_revisions())
    assert len(revisions) == _EXPECTED_MIGRATION_COUNT, (
        f"Expected {_EXPECTED_MIGRATION_COUNT} migrations, found {len(revisions)}. "
        "Update _HEAD / _ALL_REVISIONS at the top of this file."
    )
828
829
def test_head_revision_prefix() -> None:
    """The single head revision must start with _EXPECTED_HEAD_PREFIX.

    _EXPECTED_HEAD_PREFIX aliases _HEAD, so that is the constant to bump when a
    new migration is added.
    """
    heads = _script_dir().get_heads()
    assert len(heads) == 1, f"Expected a single head, got {heads}"
    assert heads[0].startswith(_EXPECTED_HEAD_PREFIX), (
        f"Expected head starting with '{_EXPECTED_HEAD_PREFIX}', got '{heads[0]}'. "
        "Update _HEAD (and _ALL_REVISIONS) at the top of this file when a new migration is added."
    )
838
839
def test_all_migrations_importable() -> None:
    """Each revision yielded by the script directory must have a loaded module."""
    unloaded = (
        script for script in _script_dir().walk_revisions() if script.module is None
    )
    for script in unloaded:
        assert script.module is not None, (
            f"Revision {script.revision} has no module β€” check for missing file."
        )
846
847
def test_revision_ids_are_sequential_integers() -> None:
    """Numeric revision prefixes must form one contiguous run with no gaps or repeats.

    The first four characters of each revision id are taken. Those that are all
    digits are converted to ints and sorted, and the result must equal
    ``range(first, last + 1)``. Revisions without a numeric prefix are ignored.
    """
    prefixes = (script.revision[:4] for script in _script_dir().walk_revisions())
    numeric_ids = sorted(int(prefix) for prefix in prefixes if prefix.isdigit())
    if numeric_ids:
        expected = list(range(numeric_ids[0], numeric_ids[-1] + 1))
        assert numeric_ids == expected, (
            f"Numeric revision IDs have gaps: found {numeric_ids}, expected {expected}."
        )
858
859
860 # ---------------------------------------------------------------------------
# Downgrade coverage — every forward migration has a real downgrade
862 # ---------------------------------------------------------------------------
863
def test_all_migrations_have_downgrade_function() -> None:
    """Every migration module must define a callable downgrade().

    A module-level ``downgrade`` that is not callable (e.g. ``downgrade = None``)
    counts as missing. All offending revisions are reported together.
    """
    missing = []
    for rev in _script_dir().walk_revisions():
        mod = rev.module
        assert mod is not None, f"Revision {rev.revision} has no module."
        if not callable(getattr(mod, "downgrade", None)):
            missing.append(rev.revision)

    assert not missing, (
        f"Revisions missing a downgrade() function: {missing}."
    )
872
873
def test_no_stub_downgrade_implementations() -> None:
    """Reject migrations whose downgrade() body is only ``pass`` or ``...``.

    Such a downgrade turns a rollback into a silent no-op, which is not allowed.
    Stub detection is delegated to _is_stub_downgrade().
    """
    stubs = [
        rev.revision
        for rev in _script_dir().walk_revisions()
        if (mod := rev.module) is not None and _is_stub_downgrade(mod)
    ]
    assert not stubs, (
        f"Migrations with stub (non-functional) downgrade(): {stubs}. "
        "Implement the actual rollback DDL."
    )
889
890
def test_every_migration_references_tables_in_downgrade() -> None:
    """Migrations that add a column or table must also reference it in downgrade().

    Heuristic: if upgrade() calls ``op.add_column`` / ``op.create_table`` for
    table X, downgrade() must reference X through one of:

    - ``op.drop_column`` / ``op.drop_table``;
    - raw ``DROP TABLE [IF EXISTS] X``, where X may appear in a comma-separated
      list of tables;
    - ``ALTER TABLE [IF EXISTS] X DROP COLUMN``.

    Checked by source inspection — not exhaustive, but catches obvious
    omissions. Revisions that lack upgrade()/downgrade() are skipped, as are
    revisions whose source cannot be retrieved.
    """
    # Import the stdlib module under its own name. The module-level ``inspect``
    # in this file comes from ``from sqlalchemy import inspect``, which has no
    # ``getsource``. Using it here raised AttributeError on every revision, so
    # the check silently skipped everything.
    import inspect as pyinspect
    import re

    add_re = re.compile(r'op\.(add_column|create_table)\(\s*["\'](\w+)["\']')
    drop_re = re.compile(r'op\.(drop_column|drop_table)\(\s*["\'](\w+)["\']')
    # Raw SQL drops: DROP TABLE [IF EXISTS] a[, b, ...]
    # and ALTER TABLE [IF EXISTS] <name> DROP COLUMN.
    sql_drop_table_re = re.compile(
        r'DROP\s+TABLE\s+(?:IF\s+EXISTS\s+)?(\w+(?:\s*,\s*\w+)*)', re.I
    )
    sql_drop_col_re = re.compile(
        r'ALTER\s+TABLE\s+(?:IF\s+EXISTS\s+)?(\w+)\s+DROP\s+COLUMN', re.I
    )

    violations = []
    for rev in _script_dir().walk_revisions():
        mod = rev.module
        if mod is None:
            continue
        upgrade_fn = getattr(mod, "upgrade", None)
        downgrade_fn = getattr(mod, "downgrade", None)
        if upgrade_fn is None or downgrade_fn is None:
            # Nothing to compare. A missing downgrade() is covered by
            # test_all_migrations_have_downgrade_function.
            continue
        try:
            up_src = pyinspect.getsource(upgrade_fn)
            down_src = pyinspect.getsource(downgrade_fn)
        except (OSError, TypeError):
            continue  # source code not retrievable for this object

        tables_added = {m.group(2) for m in add_re.finditer(up_src)}
        tables_dropped = {m.group(2) for m in drop_re.finditer(down_src)}
        for m in sql_drop_table_re.finditer(down_src):
            tables_dropped.update(name.strip() for name in m.group(1).split(","))
        tables_dropped.update(m.group(1) for m in sql_drop_col_re.finditer(down_src))

        missing = tables_added - tables_dropped
        if missing:
            violations.append(
                f"{rev.revision}: added {missing} but downgrade() doesn't drop them"
            )

    # Build the joined text outside the f-string: a backslash inside an
    # f-string expression is a SyntaxError before Python 3.12.
    details = "\n".join(violations)
    assert not violations, f"Migrations with incomplete downgrade():\n{details}"
930
931
932 # ---------------------------------------------------------------------------
# Transaction safety — env.py wraps migrations in a transaction
934 # ---------------------------------------------------------------------------
935
def test_env_py_uses_begin_transaction() -> None:
    """alembic/env.py must call context.begin_transaction() for offline and online runs.

    Wrapping each run in a transaction lets a failed migration roll back cleanly
    instead of leaving the schema partially applied. This is a textual check:
    it counts occurrences of the call in env.py's source and expects at least two.
    """
    env_path = _REPO_ROOT / "alembic" / "env.py"
    # Explicit encoding, so the read does not depend on the platform locale.
    src = env_path.read_text(encoding="utf-8")
    count = src.count("context.begin_transaction()")
    assert count >= 2, (
        f"Expected at least 2 calls to context.begin_transaction() in env.py "
        f"(one for offline, one for online), found {count}. "
        "Wrap both run_migrations_offline() and do_run_migrations() in a transaction."
    )
950
951
952 # ---------------------------------------------------------------------------
953 # Prod-snapshot migration test script
954 # ---------------------------------------------------------------------------
955
def test_migrate_test_script_exists_and_is_executable() -> None:
    """deploy/migrate-test.sh must exist as a regular file and be owner-executable.

    This script is the mechanism for testing migrations against a production
    data snapshot before applying them to production (checklist item 5.3.2).
    """
    # Imported locally so the S_IXUSR check cannot fail with NameError when the
    # module-level imports don't bring in `stat`.
    import stat

    script = _REPO_ROOT / "deploy" / "migrate-test.sh"
    # is_file() rather than exists(): a directory with this name must not pass.
    assert script.is_file(), (
        "deploy/migrate-test.sh is missing. "
        "This script is required to validate migrations against a prod snapshot."
    )
    mode = script.stat().st_mode
    assert mode & stat.S_IXUSR, (
        "deploy/migrate-test.sh must be executable (chmod +x)."
    )
971
972
973 def test_migrate_test_script_contains_round_trip() -> None:
974 """deploy/migrate-test.sh must perform an upgrade→downgrade→upgrade round-trip."""
975 script = (_REPO_ROOT / "deploy" / "migrate-test.sh").read_text()
976 assert "upgrade head" in script, "script must run 'alembic upgrade head'"
977 assert "downgrade" in script, "script must run 'alembic downgrade' step"
978 # Must do upgrade twice (initial + after downgrade round-trip)
979 assert script.count("upgrade head") >= 2, (
980 "script must upgrade to HEAD twice (initial apply + round-trip after downgrade)"
981 )