gabriel / musehub public
test_migrations.py python
982 lines 41.3 KB
Raw
sha256:294f5f41277b624e1107a61b102d88c784a5380c2c436095da176538d74f38ba chore: rename migration 0072 → 0071 (0071 prose backfill wa… Sonnet 4.6 minor ⚠ breaking 9 hours ago
1 """Section 36 — Database Migrations / Alembic (7-layer test suite).
2
Covers:
    alembic/versions/0001_consolidated_schema.py and the revisions chained
    after it (0001 … 0072, see `_ALL_REVISIONS` / `_HEAD` below)

Key design decisions:
- Tests require a real PostgreSQL instance (migrations use now(), JSON casts, etc.).
  Unless DATABASE_URL overrides host and credentials, the DB URL is:
      postgresql+asyncpg://musehub:musehub@localhost:5434/musehub_migration_test_s36
- Tests that need a clean slate call `_fresh_db()` (drop + create) so they do
  not depend on state left behind by earlier tests.
- The `migrated_engine` fixture is module-scoped: the fresh DB is created and
  migrated once per module; individual test groups share it and must leave the
  DB at a known revision (HEAD) after each test.
- Tests that need the DB at a specific intermediate revision use the
  `_upgrade(db_url, rev)` / `_downgrade(db_url, rev)` helpers.
- **Never run the full test suite in CI** — these tests connect to the real
  Postgres container. Run with:
      python -m pytest tests/test_migrations.py -x -q
20
21 Connection note:
22 The postgres container is exposed at localhost:5434 (mapped from 5432).
23 Credentials: musehub / musehub.
24 """
25 from __future__ import annotations
26
27 import asyncio
28 import os
29 import pathlib
30 import secrets
31 import time
32 from collections.abc import AsyncGenerator, Generator
33 from urllib.parse import urlparse as _urlparse
34
35 import datetime
36
37 import pytest
38 import pytest_asyncio
39
40 pytestmark = pytest.mark.migrations
41 from musehub.core.genesis import compute_identity_id, compute_repo_id
42 from sqlalchemy import inspect, text
43 from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine, create_async_engine
44 from sqlalchemy.pool import NullPool
45
# Project root: works whether running locally or inside Docker (/app)
_PROJECT_ROOT = pathlib.Path(__file__).parent.parent

# ── constants ─────────────────────────────────────────────────────────────────

# Host-mapped port used for local development when DATABASE_URL is not set.
_DEFAULT_PG_BASE_URL = "postgresql+asyncpg://musehub:musehub@localhost:5434"


def _pg_base_url(raw_db_url: str) -> str:
    """Return the asyncpg server URL (no database name) derived from *raw_db_url*.

    Only the ``user:password@host[:port]`` part of the input is reused; scheme
    and database name are discarded.  The network location is copied verbatim,
    so a URL without an explicit port no longer produces ``host:None`` and an
    IPv6 host keeps its brackets.  An empty value, or one without a host,
    falls back to the local development default.
    """
    if not raw_db_url:
        return _DEFAULT_PG_BASE_URL
    parsed = _urlparse(raw_db_url)
    if not parsed.hostname:
        return _DEFAULT_PG_BASE_URL
    return f"postgresql+asyncpg://{parsed.netloc}"


# Derive DB host from DATABASE_URL env var (set inside Docker) or fall back to
# the host-mapped port used for local development.
_raw_db_url = os.environ.get("DATABASE_URL", "")
_PG_BASE_URL = _pg_base_url(_raw_db_url)
_TEST_DB = "musehub_migration_test_s36"
_TEST_URL = f"{_PG_BASE_URL}/{_TEST_DB}"
_ADMIN_URL = f"{_PG_BASE_URL}/musehub"  # connect to main DB to create/drop the test DB

# Separate DB for performance tests so _fresh_db() there doesn't kill migrated_engine
_PERF_TEST_DB = "musehub_migration_perf_s36"
_PERF_TEST_URL = f"{_PG_BASE_URL}/{_PERF_TEST_DB}"
66
# Every revision id in upgrade order.  Ids are consecutive, zero-padded
# four-digit integers: "0001", "0002", …, "0072".
_ALL_REVISIONS = [f"{number:04d}" for number in range(1, 73)]
# The revision a fully upgraded database must report.
_HEAD = "0072"
78
# Tables that MUST exist after a full upgrade to HEAD (alphabetical order).
_REQUIRED_TABLES = {
    "alembic_version",
    "muse_commits",
    "muse_snapshots",
    "musehub_auth_challenges",
    "musehub_auth_keys",
    "musehub_background_jobs",
    "musehub_branches",
    "musehub_bridge_mirrors",
    "musehub_collaborators",
    "musehub_commits",
    "musehub_coord_records",
    "musehub_coord_reservations",
    "musehub_coord_tasks",
    "musehub_domain_installs",
    "musehub_domains",
    "musehub_fetch_mpack_cache",
    "musehub_hash_occurrence_entries",
    "musehub_identities",
    "musehub_issue_events",
    "musehub_issues",
    "musehub_labels",
    "musehub_object_refs",
    "musehub_objects",
    "musehub_proposals",
    "musehub_releases",
    "musehub_repos",
    "musehub_sessions",
    "musehub_snapshot_entries",
    "musehub_snapshots",
    "musehub_symbol_history_entries",
    "musehub_symbol_intel",
    "musehub_wire_tags",
}

# Tables that MUST be absent after a full upgrade
_LEGACY_TABLES = {
    "muse_access_tokens",
    "muse_users",
    "musehub_issue_milestones",
    "musehub_milestones",
    "musehub_profiles",
}
103
104
105 # ── helpers ───────────────────────────────────────────────────────────────────
106
107
def _run_alembic(db_url: str, *args: str) -> None:
    """Invoke ``python -m alembic <args>`` in a child process against *db_url*.

    A subprocess is used on purpose: the in-process lru_cache on
    musehub.config.settings is populated from conftest.py before DATABASE_URL
    is set, whereas a child process builds a clean settings instance that
    picks up the DATABASE_URL passed here.

    The child runs from the project root with a minimal environment
    (DATABASE_URL, MUSE_ENV, PATH) and a 120 second timeout.

    Raises:
        RuntimeError: if alembic exits with a non-zero status; the message
            carries the last 2000 characters of its stderr.
    """
    import subprocess
    import sys

    command = [sys.executable, "-m", "alembic", *args]
    child_env = {
        "DATABASE_URL": db_url,
        "MUSE_ENV": "test",
        "PATH": "/usr/local/bin:/usr/bin:/bin",
    }
    completed = subprocess.run(
        command,
        cwd=str(_PROJECT_ROOT),
        env=child_env,
        capture_output=True,
        text=True,
        timeout=120,
    )
    if completed.returncode == 0:
        return
    raise RuntimeError(
        f"alembic {' '.join(args)} failed:\n{completed.stderr[-2000:]}"
    )
135
136
def _upgrade(db_url: str, revision: str = "head") -> None:
    """Run ``alembic upgrade <revision>`` against *db_url* (default: ``head``)."""
    command = ("upgrade", revision)
    _run_alembic(db_url, *command)
139
140
def _downgrade(db_url: str, revision: str) -> None:
    """Run ``alembic downgrade <revision>`` against *db_url*."""
    command = ("downgrade", revision)
    _run_alembic(db_url, *command)
143
144
def _current_rev(db_url: str) -> str | None:
    """Return the Alembic revision currently recorded in the database at *db_url*.

    The ``+asyncpg`` driver suffix is stripped so a synchronous engine can be
    used for this one-off lookup.  Returns ``None`` when no revision is
    recorded (for example after a downgrade to base).
    """
    from alembic.runtime.migration import MigrationContext
    from sqlalchemy import create_engine

    sync_url = db_url.replace("+asyncpg", "")
    engine = create_engine(sync_url)
    try:
        with engine.connect() as conn:
            return MigrationContext.configure(conn).get_current_revision()
    finally:
        # Dispose even when connecting or reading the revision fails, so a
        # broken test does not leave a pooled connection behind.
        engine.dispose()
156
157
async def _tables(engine: AsyncEngine) -> set[str]:
    """Return the names of all tables in the ``public`` schema."""
    query = text(
        "SELECT tablename FROM pg_tables "
        "WHERE schemaname='public' ORDER BY tablename"
    )
    async with engine.connect() as conn:
        rows = await conn.execute(query)
        return {name for (name,) in rows}
167
168
async def _indexes_for(engine: AsyncEngine, table: str) -> set[str]:
    """Return the names of all indexes defined on *table* in the ``public`` schema."""
    query = text(
        "SELECT indexname FROM pg_indexes "
        "WHERE schemaname='public' AND tablename = :t"
    )
    async with engine.connect() as conn:
        rows = await conn.execute(query, {"t": table})
        return {name for (name,) in rows}
179
180
async def _columns(engine: AsyncEngine, table: str) -> set[str]:
    """Return the column names of *table* in the ``public`` schema.

    A table that does not exist yields an empty set rather than an error.
    """
    query = text(
        "SELECT column_name FROM information_schema.columns "
        "WHERE table_schema='public' AND table_name = :t"
    )
    async with engine.connect() as conn:
        rows = await conn.execute(query, {"t": table})
        return {name for (name,) in rows}
191
192
def _recreate_database(db_name: str) -> None:
    """Terminate sessions on *db_name*, then drop and recreate it.

    Connects to the admin database in AUTOCOMMIT mode, because
    DROP/CREATE DATABASE cannot run inside a transaction block.
    *db_name* must be a trusted identifier (a module constant): it is
    interpolated into the DROP/CREATE statements, which cannot take bind
    parameters for the database name.
    """
    from sqlalchemy import create_engine, text as stext

    sync_admin = _ADMIN_URL.replace("+asyncpg", "")
    engine = create_engine(sync_admin, isolation_level="AUTOCOMMIT")
    try:
        with engine.connect() as conn:
            # Kick out any session still attached to the database, otherwise
            # DROP DATABASE is refused.
            conn.execute(
                stext(
                    "SELECT pg_terminate_backend(pid) FROM pg_stat_activity "
                    "WHERE datname = :name"
                ),
                {"name": db_name},
            )
            conn.execute(stext(f"DROP DATABASE IF EXISTS {db_name}"))
            conn.execute(stext(f"CREATE DATABASE {db_name}"))
    finally:
        # Release the admin connection even if one of the statements fails.
        engine.dispose()


def _fresh_db() -> None:
    """Drop and recreate the section-36 test database synchronously."""
    _recreate_database(_TEST_DB)


def _perf_fresh_db() -> None:
    """Drop and recreate the performance test database (separate from _TEST_DB)."""
    _recreate_database(_PERF_TEST_DB)
221
222
223 # ── module-scoped engine fixture ──────────────────────────────────────────────
224
225
@pytest.fixture(scope="module")
def migrated_engine() -> Generator[AsyncEngine, None, None]:
    """Yield an async engine bound to a freshly created, fully migrated test DB.

    Module-scoped: the database is dropped, recreated and upgraded to HEAD once
    for all tests in this file.  Tests that downgrade must re-upgrade so the DB
    is back at HEAD when they finish.  The engine uses ``NullPool``, so no
    connection is kept open between uses.
    """
    _fresh_db()
    _upgrade(_TEST_URL)
    async_engine = create_async_engine(_TEST_URL, poolclass=NullPool)
    yield async_engine
    # Teardown: dispose() is a coroutine, so run it on a throwaway event loop.
    asyncio.run(async_engine.dispose())
238
239
240 # ══════════════════════════════════════════════════════════════════════════════
241 # 1. Unit
242 # ══════════════════════════════════════════════════════════════════════════════
243
244
class TestMigrationUnit:
    """Static analysis of migration files — no DB connection needed."""

    @staticmethod
    def _migration_files() -> list[pathlib.Path]:
        """Return the migration scripts in alembic/versions, sorted by file name.

        Dunder files such as ``__init__.py`` are not migrations and are skipped,
        so every test in this class looks at the same set of files.
        """
        versions_dir = _PROJECT_ROOT / "alembic" / "versions"
        return sorted(
            path
            for path in versions_dir.glob("*.py")
            if not path.stem.startswith("__")
        )

    @staticmethod
    def _load_migration(path: pathlib.Path) -> types.ModuleType:
        """Import the migration script at *path* and return its module object."""
        import importlib.util

        spec = importlib.util.spec_from_file_location(path.stem, path)
        assert spec is not None and spec.loader is not None, (
            f"Cannot build an import spec for {path.name}"
        )
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module

    @classmethod
    def _revision_graph(cls) -> dict[str, str | None]:
        """Map every revision id to its down_revision (``None`` for the root)."""
        graph: dict[str, str | None] = {}
        for path in cls._migration_files():
            module = cls._load_migration(path)
            rev = getattr(module, "revision", None)
            down = getattr(module, "down_revision", None)
            if rev:
                graph[str(rev)] = str(down) if down else None
        return graph

    def test_revision_chain_is_linear(self) -> None:
        """Every migration except the initial one has a down_revision pointing to its predecessor."""
        from collections import Counter

        revisions = self._revision_graph()

        # Exactly one migration has no predecessor (the initial one)
        roots = [r for r, d in revisions.items() if d is None]
        assert len(roots) == 1, f"Expected exactly one root migration, got: {roots}"

        # Every other migration's down_revision must exist as a revision
        for rev, down in revisions.items():
            if down is not None:
                assert down in revisions, (
                    f"Migration {rev}: down_revision '{down}' not found"
                )

        # Linear means no fork: no revision may be the predecessor of two others.
        parent_counts = Counter(d for d in revisions.values() if d is not None)
        forks = sorted(parent for parent, n in parent_counts.items() if n > 1)
        assert not forks, f"Revision chain branches after: {forks}"

    def test_all_migrations_present(self) -> None:
        """The number of migration files matches the number of known revisions."""
        files = {f.stem for f in self._migration_files()}
        assert len(files) == len(_ALL_REVISIONS), (
            f"Expected {len(_ALL_REVISIONS)} migrations, found {len(files)}: {files}"
        )

    def test_head_revision_is_correct(self) -> None:
        """The only revision nobody descends from must be ``_HEAD``."""
        revisions = self._revision_graph()
        # The head is the revision not referenced as any down_revision
        down_revs = {d for d in revisions.values() if d is not None}
        heads = set(revisions) - down_revs
        assert heads == {_HEAD}, f"Expected head {_HEAD}, got: {heads}"

    def test_each_migration_has_upgrade_and_downgrade(self) -> None:
        """Every migration script defines callable upgrade() and downgrade()."""
        for path in self._migration_files():
            module = self._load_migration(path)
            assert callable(getattr(module, "upgrade", None)), (
                f"{path.name} missing upgrade()"
            )
            assert callable(getattr(module, "downgrade", None)), (
                f"{path.name} missing downgrade()"
            )

    def test_alembic_ini_points_to_correct_script_location(self) -> None:
        """alembic.ini's script_location must mention the alembic directory."""
        import configparser

        parser = configparser.ConfigParser()
        parser.read(str(_PROJECT_ROOT / "alembic.ini"))
        script_location = parser.get("alembic", "script_location", fallback="")
        assert "alembic" in script_location

    def test_env_py_imports_all_model_bases(self) -> None:
        """env.py must import Base and use its metadata as the autogenerate target."""
        env_text = (_PROJECT_ROOT / "alembic" / "env.py").read_text(encoding="utf-8")
        assert "from musehub.db.database import Base" in env_text
        assert "target_metadata = Base.metadata" in env_text
334
335
336 # ══════════════════════════════════════════════════════════════════════════════
337 # 2. Integration
338 # ══════════════════════════════════════════════════════════════════════════════
339
340
class TestMigrationIntegration:
    """Real DB — run migrations and inspect schema state."""

    def test_upgrade_to_head_succeeds(self) -> None:
        """Full migration chain from empty DB to HEAD completes without error."""
        _fresh_db()
        _upgrade(_TEST_URL)
        rev = _current_rev(_TEST_URL)
        assert rev == _HEAD

    def test_current_revision_tracked_in_alembic_version(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """The alembic_version table holds HEAD after a full upgrade."""

        async def _check() -> str | None:
            async with migrated_engine.connect() as conn:
                result = await conn.execute(text("SELECT version_num FROM alembic_version"))
                row = result.fetchone()
                return row[0] if row else None

        rev = asyncio.run(_check())
        assert rev == _HEAD

    def test_required_tables_exist_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Every table in _REQUIRED_TABLES is present at HEAD."""
        tables = asyncio.run(_tables(migrated_engine))
        missing = _REQUIRED_TABLES - tables
        assert not missing, f"Tables missing after upgrade to HEAD: {missing}"

    def test_legacy_tables_absent_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """No table in _LEGACY_TABLES survives the upgrade to HEAD."""
        tables = asyncio.run(_tables(migrated_engine))
        present_legacy = _LEGACY_TABLES & tables
        assert not present_legacy, f"Legacy tables still present after HEAD: {present_legacy}"

    def test_downgrade_then_upgrade_returns_to_head(self) -> None:
        """Downgrade one step below HEAD and re-upgrade — must return to HEAD cleanly.

        This used to be identical to the full round trip in
        test_full_downgrade_to_base_then_re_upgrade; it now covers the
        single-step case ("-1" is Alembic's relative identifier for one
        revision back).
        """
        _downgrade(_TEST_URL, "-1")
        assert _current_rev(_TEST_URL) == _ALL_REVISIONS[-2]
        _upgrade(_TEST_URL, "head")
        assert _current_rev(_TEST_URL) == _HEAD

    def test_full_downgrade_to_base_then_re_upgrade(self) -> None:
        """Downgrade all the way to base then re-upgrade — round trip works."""
        _downgrade(_TEST_URL, "base")
        assert _current_rev(_TEST_URL) is None
        _upgrade(_TEST_URL)
        assert _current_rev(_TEST_URL) == _HEAD
390
391
392 # ══════════════════════════════════════════════════════════════════════════════
393 # 3. End-to-End
394 # ══════════════════════════════════════════════════════════════════════════════
395
396
class TestMigrationE2E:
    """End-to-end checks: write and read rows at HEAD, and walk the chain step by step."""

    def test_repo_data_persists_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """A repo row written at HEAD can be read back by its id."""
        insert_repo = text(
            "INSERT INTO musehub_repos "
            "(repo_id, name, slug, owner, owner_user_id, visibility, default_branch, "
            "description, tags, domain_meta, training_opt_out, created_at, updated_at) "
            "VALUES (:id, :n, :s, :o, :oid, 'public', 'main', '', "
            "ARRAY[]::text[], '{}'::json, false, NOW(), NOW())"
        )
        select_repo = text("SELECT repo_id FROM musehub_repos WHERE repo_id = :id")

        async def _roundtrip() -> None:
            created_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()
            repo_id = compute_repo_id("testuser", "e2e-repo", "", created_iso)
            params = {
                "id": repo_id,
                "n": "e2e-repo",
                "s": "e2e-repo",
                "o": "testuser",
                "oid": "testuser",
            }
            # Write on one connection, read on another.
            async with migrated_engine.connect() as writer:
                await writer.execute(insert_repo, params)
                await writer.commit()

            async with migrated_engine.connect() as reader:
                found = (await reader.execute(select_repo, {"id": repo_id})).fetchone()
            assert found is not None, "Repo row must be readable after insert"

        asyncio.run(_roundtrip())

    def test_identity_row_persists_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """An identity row written at HEAD can be read back by its id."""
        insert_identity = text(
            "INSERT INTO musehub_identities "
            "(identity_id, handle, display_name, identity_type, "
            "agent_capabilities, is_verified, pinned_repo_ids, created_at, updated_at) "
            "VALUES (:id, :h, :dn, 'human', ARRAY[]::text[], false, ARRAY[]::text[], NOW(), NOW())"
        )
        select_identity = text(
            "SELECT identity_id FROM musehub_identities WHERE identity_id = :id"
        )

        async def _roundtrip() -> None:
            identity_id = compute_identity_id(secrets.token_bytes(16))
            async with migrated_engine.connect() as writer:
                await writer.execute(
                    insert_identity,
                    {"id": identity_id, "h": "e2e-user", "dn": "E2E User"},
                )
                await writer.commit()

            async with migrated_engine.connect() as reader:
                found = (
                    await reader.execute(select_identity, {"id": identity_id})
                ).fetchone()
            assert found is not None

        asyncio.run(_roundtrip())

    def test_each_migration_step_is_individually_runnable(self) -> None:
        """Starting from base, upgrading to each revision in turn lands exactly on it."""
        _downgrade(_TEST_URL, "base")
        for target in _ALL_REVISIONS:
            _upgrade(_TEST_URL, target)
            reached = _current_rev(_TEST_URL)
            assert reached == target, f"After upgrading to {target}, got revision {reached}"
467
468
469 # ══════════════════════════════════════════════════════════════════════════════
470 # 4. Stress
471 # ══════════════════════════════════════════════════════════════════════════════
472
473
class TestMigrationStress:
    """Performance and repeated-execution scenarios."""

    def test_full_upgrade_completes_under_60_seconds(self) -> None:
        """All migrations on an empty DB must complete in under 60 seconds."""
        _fresh_db()
        start = time.perf_counter()
        _upgrade(_TEST_URL)
        elapsed = time.perf_counter() - start
        assert elapsed < 60, f"Full upgrade took {elapsed:.1f}s (budget: 60s)"

    def test_full_downgrade_completes_under_60_seconds(self) -> None:
        """Downgrade from HEAD to base must complete in under 60 seconds."""
        # DB is at HEAD from previous test
        start = time.perf_counter()
        _downgrade(_TEST_URL, "base")
        elapsed = time.perf_counter() - start
        assert elapsed < 60, f"Full downgrade took {elapsed:.1f}s (budget: 60s)"
        # Restore HEAD for subsequent tests
        _upgrade(_TEST_URL)

    def test_three_consecutive_upgrade_to_head_idempotent(self) -> None:
        """Running upgrade head three times on an already-migrated DB is a no-op each time."""
        assert _current_rev(_TEST_URL) == _HEAD
        for _ in range(3):
            _upgrade(_TEST_URL)  # Already at HEAD — must not error
            assert _current_rev(_TEST_URL) == _HEAD

    def test_100_identity_inserts_survive_step_cycle(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Insert 100 identity rows and verify all are readable."""

        async def _run() -> None:
            ids = [secrets.token_hex(16) for _ in range(100)]
            insert_identity = text(
                "INSERT INTO musehub_identities "
                "(identity_id, handle, display_name, identity_type, "
                "agent_capabilities, is_verified, pinned_repo_ids, created_at, updated_at) "
                "VALUES (:id, :h, :dn, 'human', ARRAY[]::text[], false, ARRAY[]::text[], NOW(), NOW())"
            )
            async with migrated_engine.connect() as conn:
                for i, identity_id in enumerate(ids):
                    # The id prefix keeps handles distinct within this batch.
                    await conn.execute(
                        insert_identity,
                        {"id": identity_id, "h": f"stress-{i}-{identity_id[:6]}", "dn": f"Stress {i}"},
                    )
                await conn.commit()

            async with migrated_engine.connect() as conn:
                result = await conn.execute(
                    text("SELECT COUNT(*) FROM musehub_identities WHERE identity_id = ANY(:ids)"),
                    {"ids": ids},
                )
                count = result.scalar()
            assert count == 100, f"Only {count}/100 identity rows readable after insert"

        asyncio.run(_run())
532
533
534 # ══════════════════════════════════════════════════════════════════════════════
535 # 5. Data Integrity
536 # ══════════════════════════════════════════════════════════════════════════════
537
538
class TestMigrationDataIntegrity:
    """Schema correctness at HEAD: expected columns, indexes and tables."""

    def test_musehub_repos_has_required_columns(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_repos exposes its core identifying columns."""
        present = asyncio.run(_columns(migrated_engine, "musehub_repos"))
        expected = ("repo_id", "owner", "slug", "visibility", "default_branch")
        for name in expected:
            assert name in present, f"musehub_repos missing column: {name}"

    def test_musehub_identities_has_agent_columns(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_identities carries the agent identity columns."""
        present = asyncio.run(_columns(migrated_engine, "musehub_identities"))
        expected = ("spawned_by", "scope", "expires_at")
        for name in expected:
            assert name in present, f"musehub_identities missing agent column: {name}"

    def test_musehub_identities_has_no_legacy_user_id(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_identities no longer has a legacy_user_id column."""
        present = asyncio.run(_columns(migrated_engine, "musehub_identities"))
        assert "legacy_user_id" not in present

    def test_musehub_issue_comments_has_no_state_refs(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_issue_comments has no state_refs column."""
        # NOTE(review): _columns() yields an empty set for a missing table, so
        # this also passes if musehub_issue_comments does not exist at HEAD.
        present = asyncio.run(_columns(migrated_engine, "musehub_issue_comments"))
        assert "state_refs" not in present

    def test_musehub_auth_keys_has_algorithm_column(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_auth_keys has the algorithm column."""
        present = asyncio.run(_columns(migrated_engine, "musehub_auth_keys"))
        assert "algorithm" in present

    def test_musehub_repos_unique_owner_slug_index_exists(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """The owner/slug unique index exists on musehub_repos."""
        index_names = asyncio.run(_indexes_for(migrated_engine, "musehub_repos"))
        assert "uq_musehub_repos_owner_slug" in index_names

    def test_musehub_auth_keys_fingerprint_unique_index(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """The fingerprint unique index exists on musehub_auth_keys."""
        index_names = asyncio.run(_indexes_for(migrated_engine, "musehub_auth_keys"))
        assert "uq_musehub_auth_keys_fingerprint" in index_names

    def test_musehub_collaborators_has_identity_handle_column(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_collaborators uses identity_handle and has no user_id column."""
        present = asyncio.run(_columns(migrated_engine, "musehub_collaborators"))
        assert "identity_handle" in present
        assert "user_id" not in present

    def test_wire_tags_table_exists(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_wire_tags exists at HEAD."""
        table_names = asyncio.run(_tables(migrated_engine))
        assert "musehub_wire_tags" in table_names

    def test_sessions_table_exists(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_sessions exists at HEAD."""
        table_names = asyncio.run(_tables(migrated_engine))
        assert "musehub_sessions" in table_names
625
626
627 # ══════════════════════════════════════════════════════════════════════════════
628 # 6. Security
629 # ══════════════════════════════════════════════════════════════════════════════
630
631
class TestMigrationSecurity:
    """Guards against security-relevant schema regressions introduced by migrations."""

    def test_legacy_auth_tables_dropped_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """None of the legacy tables listed in _LEGACY_TABLES may exist at HEAD."""
        existing = asyncio.run(_tables(migrated_engine))
        for legacy in _LEGACY_TABLES:
            assert legacy not in existing, f"Legacy auth table '{legacy}' still present at HEAD"

    def test_musehub_auth_keys_has_fingerprint_unique_constraint(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Fingerprint uniqueness prevents duplicate key registration."""
        index_names = asyncio.run(_indexes_for(migrated_engine, "musehub_auth_keys"))
        assert "uq_musehub_auth_keys_fingerprint" in index_names

    def test_musehub_identities_handle_unique(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Identity handles must be unique — prevents impersonation via duplicate handle."""
        index_names = asyncio.run(_indexes_for(migrated_engine, "musehub_identities"))
        assert "uq_musehub_identities_handle" in index_names

    def test_downgrade_does_not_expose_dropped_columns(self) -> None:
        """After a full downgrade and re-upgrade, state_refs stays absent."""
        _downgrade(_TEST_URL, "base")
        _upgrade(_TEST_URL)
        # A private engine is used here and always disposed, pass or fail.
        probe = create_async_engine(_TEST_URL, poolclass=NullPool)
        try:
            present = asyncio.run(_columns(probe, "musehub_issue_comments"))
        finally:
            asyncio.run(probe.dispose())
        assert "state_refs" not in present

    def test_repos_owner_slug_uniqueness_enforced(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Inserting two repos with the same owner/slug must fail with an integrity error."""
        from sqlalchemy.exc import IntegrityError

        insert_sql = (
            "INSERT INTO musehub_repos "
            "(repo_id, name, slug, owner, owner_user_id, visibility, default_branch, "
            "description, tags, domain_meta, training_opt_out, created_at, updated_at) "
            "VALUES (:id, 'dup', 'dup-slug', 'sec-owner', 'sec-owner', 'public', 'main', "
            "'', ARRAY[]::text[], '{}'::json, false, NOW(), NOW())"
        )

        async def _insert_twice() -> None:
            first_id = secrets.token_hex(16)
            second_id = secrets.token_hex(16)
            # The first insert must succeed and be committed …
            async with migrated_engine.connect() as conn:
                await conn.execute(text(insert_sql), {"id": first_id})
                await conn.commit()
            # … so the second one, same owner/slug, violates the constraint.
            async with migrated_engine.connect() as conn:
                with pytest.raises(IntegrityError):
                    await conn.execute(text(insert_sql), {"id": second_id})
                    await conn.commit()

        asyncio.run(_insert_twice())
699
700
701 # ══════════════════════════════════════════════════════════════════════════════
702 # 7. Performance
703 # ══════════════════════════════════════════════════════════════════════════════
704
705
class TestMigrationPerformance:
    """Latency budgets for migration operations.

    The step-by-step tests drop and recreate a database, so they run against
    the dedicated performance database (``_PERF_TEST_DB``) instead of the one
    behind ``migrated_engine``.
    """

    def test_single_step_upgrade_under_5_seconds(self) -> None:
        """Each individual migration step must complete in under 5 seconds."""
        _perf_fresh_db()  # start from an empty database
        for rev in _ALL_REVISIONS:
            start = time.perf_counter()
            _upgrade(_PERF_TEST_URL, rev)
            elapsed = time.perf_counter() - start
            assert elapsed < 5, (
                f"Migration {rev} upgrade took {elapsed:.2f}s (budget: 5s per step)"
            )

    def test_single_step_downgrade_under_5_seconds(self) -> None:
        """Each individual migration downgrade must complete in under 5 seconds."""
        _perf_fresh_db()
        _upgrade(_PERF_TEST_URL)  # start from a guaranteed-clean HEAD
        # Walk down one revision at a time: HEAD → HEAD-1 → … → first revision.
        for rev in reversed(_ALL_REVISIONS[:-1]):
            start = time.perf_counter()
            _downgrade(_PERF_TEST_URL, rev)
            elapsed = time.perf_counter() - start
            assert elapsed < 5, (
                f"Migration {rev} downgrade took {elapsed:.2f}s (budget: 5s per step)"
            )
        # Final downgrade to base
        start = time.perf_counter()
        _downgrade(_PERF_TEST_URL, "base")
        elapsed = time.perf_counter() - start
        assert elapsed < 5, f"Migration base downgrade took {elapsed:.2f}s"
        # Leave the performance database at HEAD for any later user
        _upgrade(_PERF_TEST_URL)

    def test_schema_introspection_under_500ms(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Listing all tables in the public schema must complete in under 500ms."""
        start = time.perf_counter()
        asyncio.run(_tables(migrated_engine))
        elapsed_ms = (time.perf_counter() - start) * 1000
        assert elapsed_ms < 500, f"Table introspection took {elapsed_ms:.0f}ms (budget: 500ms)"

    def test_index_introspection_under_200ms(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Listing indexes for a single table must complete in under 200ms."""
        start = time.perf_counter()
        asyncio.run(
            _indexes_for(migrated_engine, "musehub_repos")
        )
        elapsed_ms = (time.perf_counter() - start) * 1000
        assert elapsed_ms < 200, f"Index introspection took {elapsed_ms:.0f}ms (budget: 200ms)"
758
759
760 # ══════════════════════════════════════════════════════════════════════════════
761 # Alembic chain tests (structural, no DB connection required)
762 # ══════════════════════════════════════════════════════════════════════════════
763
764
765 import inspect
766 import stat
767 import types
768
769 from alembic.config import Config
770 from alembic.script import ScriptDirectory
771
# Parent of the directory that contains this test module.
_REPO_ROOT = pathlib.Path(__file__).parent.parent
# Both expectations follow the revision list declared near the top of the file.
_EXPECTED_HEAD_PREFIX, _EXPECTED_MIGRATION_COUNT = _HEAD, len(_ALL_REVISIONS)
775
776
777 # ---------------------------------------------------------------------------
778 # Helpers
779 # ---------------------------------------------------------------------------
780
def _script_dir() -> ScriptDirectory:
    """Build an Alembic ScriptDirectory for this repository.

    The config is read from ``alembic.ini`` at the repo root, and
    ``script_location`` is overridden with the absolute path of the
    ``alembic`` directory.
    """
    ini_path = _REPO_ROOT / "alembic.ini"
    scripts_path = _REPO_ROOT / "alembic"
    alembic_cfg = Config(str(ini_path))
    alembic_cfg.set_main_option("script_location", str(scripts_path))
    return ScriptDirectory.from_config(alembic_cfg)
785
786
def _is_stub_downgrade(mod: types.ModuleType) -> bool:
    """Return True if ``mod.downgrade`` is missing or does nothing.

    A downgrade counts as a stub when, after an optional docstring, its body
    consists only of ``pass`` and/or ``...`` statements.  The check works on
    the parsed syntax tree, so comments, multi-line docstrings and multi-line
    signatures do not confuse it.

    Returns True as well when ``downgrade`` does not exist or its source
    cannot be retrieved.  Returns False when the source is not a plain
    function definition (for example a lambda), because it cannot be shown to
    be a stub.
    """
    import ast
    import textwrap

    try:
        src = inspect.getsource(getattr(mod, "downgrade"))
    except (OSError, AttributeError, TypeError):
        # No downgrade attribute, no retrievable source, or not a function.
        return True

    try:
        tree = ast.parse(textwrap.dedent(src))
    except SyntaxError:
        return False

    func = next(
        (
            node
            for node in tree.body
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
        ),
        None,
    )
    if func is None:
        return False

    statements = list(func.body)
    if ast.get_docstring(func, clean=False) is not None:
        # The docstring is always the first statement; it does not count as work.
        statements = statements[1:]

    for stmt in statements:
        if isinstance(stmt, ast.Pass):
            continue
        is_ellipsis = (
            isinstance(stmt, ast.Expr)
            and isinstance(stmt.value, ast.Constant)
            and stmt.value.value is Ellipsis
        )
        if not is_ellipsis:
            return False
    return True
804
805
806 # ---------------------------------------------------------------------------
807 # Linear chain / versioning
808 # ---------------------------------------------------------------------------
809
def test_migration_chain_is_linear() -> None:
    """The migration graph has exactly one head, i.e. no unmerged branches."""
    heads = _script_dir().get_heads()
    head_count = len(heads)
    assert head_count == 1, (
        f"Expected single-head chain, got {head_count} heads: {heads}. "
        "Resolve the branch before merging."
    )
817
818
def test_migration_count_matches_expected() -> None:
    """The number of revisions Alembic finds equals _EXPECTED_MIGRATION_COUNT.

    _EXPECTED_MIGRATION_COUNT is the length of _ALL_REVISIONS, so adding a
    migration means extending that list.
    """
    found = sum(1 for _ in _script_dir().walk_revisions())
    assert found == _EXPECTED_MIGRATION_COUNT, (
        f"Expected {_EXPECTED_MIGRATION_COUNT} migrations, found {found}. "
        "Update _EXPECTED_MIGRATION_COUNT in this file."
    )
829
830
def test_head_revision_prefix() -> None:
    """The single head revision id starts with _EXPECTED_HEAD_PREFIX."""
    heads = _script_dir().get_heads()
    assert len(heads) == 1
    head = heads[0]
    assert head.startswith(_EXPECTED_HEAD_PREFIX), (
        f"Expected head starting with '{_EXPECTED_HEAD_PREFIX}', got '{head}'. "
        "Update _EXPECTED_HEAD_PREFIX when a new migration is added."
    )
839
840
def test_all_migrations_importable() -> None:
    """Every revision in the chain resolves to a loaded module object."""
    for script in _script_dir().walk_revisions():
        module = script.module
        assert module is not None, (
            f"Revision {script.revision} has no module — check for missing file."
        )
847
848
def test_revision_ids_are_sequential_integers() -> None:
    """Numeric revision prefixes form a gap-free run of consecutive integers.

    Only revisions whose first four characters are all digits take part;
    with no such revision the test passes trivially.
    """
    prefixes = (script.revision[:4] for script in _script_dir().walk_revisions())
    numeric_ids = sorted(int(prefix) for prefix in prefixes if prefix.isdigit())
    if numeric_ids:
        lowest, highest = numeric_ids[0], numeric_ids[-1]
        expected = list(range(lowest, highest + 1))
        assert numeric_ids == expected, (
            f"Numeric revision IDs have gaps: found {numeric_ids}, expected {expected}."
        )
859
860
861 # ---------------------------------------------------------------------------
862 # Downgrade coverage — every forward migration has a real downgrade
863 # ---------------------------------------------------------------------------
864
def test_all_migrations_have_downgrade_function() -> None:
    """Each revision's module must expose a ``downgrade`` attribute."""
    for script in _script_dir().walk_revisions():
        migration_module = script.module
        assert migration_module is not None
        has_downgrade = hasattr(migration_module, "downgrade")
        assert has_downgrade, (
            f"Revision {script.revision} is missing a downgrade() function."
        )
873
874
def test_no_stub_downgrade_implementations() -> None:
    """Reject migrations whose downgrade() is only ``pass`` or ``...``.

    A stub downgrade makes rollback a silent no-op — forbidden.
    """
    stubs = []
    for script in _script_dir().walk_revisions():
        migration_module = script.module
        if migration_module is None:
            # Revisions without a module are not inspected here.
            continue
        if _is_stub_downgrade(migration_module):
            stubs.append(script.revision)

    assert not stubs, (
        f"Migrations with stub (non-functional) downgrade(): {stubs}. "
        "Implement the actual rollback DDL."
    )
890
891
def test_every_migration_references_tables_in_downgrade() -> None:
    """Migrations that add a column or table must also reference it in downgrade().

    Heuristic: if upgrade() calls op.add_column / op.create_table for table X,
    downgrade() must reference X — via op.drop_column / op.drop_table, or via
    raw SQL ``DROP TABLE [IF EXISTS] X`` / ``ALTER TABLE [IF EXISTS] X DROP COLUMN``.
    Checked by source inspection — not exhaustive, but catches obvious omissions.

    Revisions with no module, with no upgrade()/downgrade() attribute, or whose
    source text cannot be retrieved are skipped.
    """
    # This file's header imports ``inspect`` from sqlalchemy, and that callable
    # has no ``getsource``. Bind the stdlib module under a private alias so the
    # source lookup works regardless of what the module-level ``inspect`` name
    # refers to, instead of failing with an AttributeError that would silently
    # skip every revision.
    import inspect as _stdlib_inspect
    import re

    # op.add_column("table", ...) / op.create_table("table", ...) — group 2 is the table name.
    add_re = re.compile(r'op\.(add_column|create_table)\(\s*["\'](\w+)["\']')
    # op.drop_column("table", ...) / op.drop_table("table") — group 2 is the table name.
    drop_re = re.compile(r'op\.(drop_column|drop_table)\(\s*["\'](\w+)["\']')
    # Also accept raw SQL drops: DROP TABLE [IF EXISTS] <name> and
    # ALTER TABLE [IF EXISTS] <name> DROP COLUMN — group 1 is the table name.
    sql_drop_table_re = re.compile(r'DROP\s+TABLE\s+(?:IF\s+EXISTS\s+)?(\w+)', re.I)
    sql_drop_col_re = re.compile(r'ALTER\s+TABLE\s+(?:IF\s+EXISTS\s+)?(\w+)\s+DROP\s+COLUMN', re.I)

    violations = []
    for rev in _script_dir().walk_revisions():
        mod = rev.module
        if mod is None:
            continue

        upgrade_fn = getattr(mod, "upgrade", None)
        downgrade_fn = getattr(mod, "downgrade", None)
        if upgrade_fn is None or downgrade_fn is None:
            # Missing functions are skipped by this heuristic.
            continue

        try:
            up_src = _stdlib_inspect.getsource(upgrade_fn)
            down_src = _stdlib_inspect.getsource(downgrade_fn)
        except OSError:
            # Source text is unavailable for this module.
            continue

        tables_added = {m.group(2) for m in add_re.finditer(up_src)}
        tables_dropped = (
            {m.group(2) for m in drop_re.finditer(down_src)}
            | {m.group(1) for m in sql_drop_table_re.finditer(down_src)}
            | {m.group(1) for m in sql_drop_col_re.finditer(down_src)}
        )
        missing = tables_added - tables_dropped
        if missing:
            violations.append(f"{rev.revision}: added {missing} but downgrade() doesn't drop them")

    # Join outside the f-string: a backslash inside an f-string replacement
    # field is a SyntaxError on Python versions before 3.12.
    details = "\n".join(violations)
    assert not violations, (
        f"Migrations with incomplete downgrade():\n{details}"
    )
931
932
933 # ---------------------------------------------------------------------------
934 # Transaction safety — env.py wraps migrations in a transaction
935 # ---------------------------------------------------------------------------
936
def test_env_py_uses_begin_transaction() -> None:
    """alembic/env.py must call context.begin_transaction() for both offline and online runs.

    This ensures that a failed migration rolls back cleanly instead of leaving
    the schema in a partially-applied state.

    The check is textual: it counts occurrences of the literal
    ``context.begin_transaction()`` in the file and requires at least two.
    """
    env_path = _REPO_ROOT / "alembic" / "env.py"
    # Decode as UTF-8 (the default Python source encoding) rather than the
    # platform locale, so non-ASCII characters in env.py cannot break the read.
    src = env_path.read_text(encoding="utf-8")
    count = src.count("context.begin_transaction()")
    assert count >= 2, (
        f"Expected at least 2 calls to context.begin_transaction() in env.py "
        f"(one for offline, one for online), found {count}. "
        "Wrap both run_migrations_offline() and do_run_migrations() in a transaction."
    )
951
952
953 # ---------------------------------------------------------------------------
954 # Prod-snapshot migration test script
955 # ---------------------------------------------------------------------------
956
def test_migrate_test_script_exists_and_is_executable() -> None:
    """deploy/migrate-test.sh must be present with the owner-execute bit set.

    This script is the mechanism for testing migrations against a production
    data snapshot before applying to production (checklist item 5.3.2).
    """
    script_path = _REPO_ROOT / "deploy" / "migrate-test.sh"
    is_present = script_path.exists()
    assert is_present, (
        "deploy/migrate-test.sh is missing. "
        "This script is required to validate migrations against a prod snapshot."
    )
    # Test the owner-execute permission bit of the file mode.
    owner_exec_bit = script_path.stat().st_mode & stat.S_IXUSR
    assert owner_exec_bit, (
        "deploy/migrate-test.sh must be executable (chmod +x)."
    )
972
973
def test_migrate_test_script_contains_round_trip() -> None:
    """deploy/migrate-test.sh must perform an upgrade→downgrade→upgrade round-trip.

    The check is textual: the script must contain ``upgrade head`` at least
    twice and the word ``downgrade`` at least once.
    """
    script_path = _REPO_ROOT / "deploy" / "migrate-test.sh"
    # Decode explicitly as UTF-8 so the result does not depend on the platform
    # locale's default encoding.
    script = script_path.read_text(encoding="utf-8")
    assert "upgrade head" in script, "script must run 'alembic upgrade head'"
    assert "downgrade" in script, "script must run 'alembic downgrade' step"
    # Must do upgrade twice (initial + after downgrade round-trip)
    assert script.count("upgrade head") >= 2, (
        "script must upgrade to HEAD twice (initial apply + round-trip after downgrade)"
    )
File History 7 commits
sha256:294f5f41277b624e1107a61b102d88c784a5380c2c436095da176538d74f38ba chore: rename migration 0072 → 0071 (0071 prose backfill wa… Sonnet 4.6 minor 9 hours ago
sha256:5dfc96524e3921eb9acb8372241b6bec70b5f3e6598f79099a0ead16ff7cbb75 feat(phase1): add musehub_fetch_mpack_cache table (issue #9… Sonnet 4.6 patch 10 hours ago
sha256:d8cbca3a06f39f82f66be6c29de3f41c3dec5f367722958fb5454dcbc007cc15 fix: rc11 test fixes — event→verdict rename, migration coun… Sonnet 4.6 patch 8 days ago
sha256:009b5a222314f47640a58d75ce5a1f428f1624cf0b51384dfcdfbdfab3cc42a4 feat: migration idempotency, file attribution DAG walk, mpa… Sonnet 4.6 minor 9 days ago
sha256:ad616c6113d6c00f4efed6b2993734ca46d3e9b5bee25addd4ce8ae6b57136e5 chore: bump version to 0.2.0rc11; typing audit clean + all … Sonnet 4.6 minor 13 days ago
sha256:f93f34a08d829b55b7324c2dacfa517618560ddfb11c09a1120f325fb6a238af fix: migration downgrade guards and test timing budgets Sonnet 4.6 patch 14 days ago
sha256:ab9eda7b6479e1c35cdba9a54f62bacd2825de8faacec3ba67a9a8ef45914b7d fix: migration and wire protocol alignment Sonnet 4.6 minor 15 days ago