gabriel / musehub public
test_migrations.py python
973 lines 40.8 KB
Raw
sha256:294f5f41277b624e1107a61b102d88c784a5380c2c436095da176538d74f38ba chore: rename migration 0072 → 0071 (0071 prose backfill wa… Sonnet 4.6 minor ⚠ breaking 6 days ago
1 """Section 36 — Database Migrations / Alembic (7-layer test suite).
2
3 Covers:
    alembic/versions/*.py — every revision in the linear chain, starting with
    0001_consolidated_schema.py and ending at _HEAD (see _ALL_REVISIONS)
6
7 Key design decisions:
8 - Tests require a real PostgreSQL instance (migrations use now(), JSON casts, etc.).
  The server is taken from DATABASE_URL when it is set (inside Docker), otherwise
  postgresql+asyncpg://musehub:musehub@localhost:5434; the test database itself is
  musehub_migration_test_s36.
- The module-scoped `migrated_engine` fixture drops and recreates the test
  database once per module and migrates it to HEAD. Tests that request it share
  that database, so any test that downgrades must upgrade back to HEAD before it
  finishes. A few tests recreate the database themselves via `_fresh_db()`.
- Tests that need the DB at a specific intermediate revision call the
  `_upgrade(url, rev)` / `_downgrade(url, rev)` helpers.
17 - **Never run the full test suite in CI** — these tests connect to the real
18 Postgres container. Run with:
    python -m pytest tests/test_migrations.py -x -q
20
21 Connection note:
22 The postgres container is exposed at localhost:5434 (mapped from 5432).
23 Credentials: musehub / musehub.
24 """
25 from __future__ import annotations
26
27 import asyncio
28 import os
29 import pathlib
30 import secrets
31 import time
32 from collections.abc import AsyncGenerator, Generator
33 from urllib.parse import urlparse as _urlparse
34
35 import datetime
36
37 import pytest
38 import pytest_asyncio
39
40 pytestmark = pytest.mark.migrations
41 from musehub.core.genesis import compute_identity_id, compute_repo_id
42 from sqlalchemy import inspect, text
43 from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine, create_async_engine
44 from sqlalchemy.pool import NullPool
45
# Project root: works whether running locally or inside Docker (/app)
_PROJECT_ROOT = pathlib.Path(__file__).parent.parent

# ── constants ─────────────────────────────────────────────────────────────────


def _pg_base_url(raw_url: str) -> str:
    """Return a ``postgresql+asyncpg://[user[:password]@]host:port`` server URL.

    The result carries no database path; callers append ``/<dbname>``.

    Args:
        raw_url: Normally the ``DATABASE_URL`` environment variable. When it is
            empty, the host-mapped port used for local development is returned.

    Missing URL components are handled explicitly so the result never contains
    the literal text ``None``. Without this, a URL with no port yields
    ``host:None``, and one with no credentials yields ``None:None@``.
    """
    if not raw_url:
        return "postgresql+asyncpg://musehub:musehub@localhost:5434"
    parsed = _urlparse(raw_url)
    userinfo = ""
    if parsed.username is not None:
        userinfo = parsed.username
        if parsed.password is not None:
            userinfo += f":{parsed.password}"
        userinfo += "@"
    host = parsed.hostname or "localhost"
    port = parsed.port or 5432  # PostgreSQL's default port when the URL omits one
    return f"postgresql+asyncpg://{userinfo}{host}:{port}"


# Derive DB host from DATABASE_URL env var (set inside Docker) or fall back to
# the host-mapped port used for local development.
_raw_db_url = os.environ.get("DATABASE_URL", "")
_PG_BASE_URL = _pg_base_url(_raw_db_url)
_TEST_DB = "musehub_migration_test_s36"
_TEST_URL = f"{_PG_BASE_URL}/{_TEST_DB}"
_ADMIN_URL = f"{_PG_BASE_URL}/musehub"  # connect to main DB to create/drop the test DB

# Separate DB for performance tests so _fresh_db() there doesn't kill migrated_engine
_PERF_TEST_DB = "musehub_migration_perf_s36"
_PERF_TEST_URL = f"{_PG_BASE_URL}/{_PERF_TEST_DB}"
66
# Every revision ID in upgrade order: contiguous, zero-padded "0001" … "0066".
# Raise the range's upper bound when a migration is added; _HEAD follows it.
_ALL_REVISIONS = [f"{number:04d}" for number in range(1, 67)]
_HEAD = _ALL_REVISIONS[-1]
77
# Tables that MUST exist after a full upgrade to HEAD (alphabetical).
_REQUIRED_TABLES = {
    "alembic_version",
    "muse_commits",
    "muse_snapshots",
    "musehub_auth_challenges",
    "musehub_auth_keys",
    "musehub_background_jobs",
    "musehub_branches",
    "musehub_bridge_mirrors",
    "musehub_collaborators",
    "musehub_commits",
    "musehub_coord_records",
    "musehub_coord_reservations",
    "musehub_coord_tasks",
    "musehub_domain_installs",
    "musehub_domains",
    "musehub_hash_occurrence_entries",
    "musehub_identities",
    "musehub_issue_events",
    "musehub_issues",
    "musehub_labels",
    "musehub_object_refs",
    "musehub_objects",
    "musehub_proposals",
    "musehub_releases",
    "musehub_repos",
    "musehub_sessions",
    "musehub_snapshot_entries",
    "musehub_snapshots",
    "musehub_symbol_history_entries",
    "musehub_symbol_intel",
    "musehub_wire_tags",
}

# Tables that MUST be absent after a full upgrade (legacy schema, alphabetical).
_LEGACY_TABLES = {
    "muse_access_tokens",
    "muse_users",
    "musehub_issue_milestones",
    "musehub_milestones",
    "musehub_profiles",
}
101
102
103 # ── helpers ───────────────────────────────────────────────────────────────────
104
105
def _run_alembic(db_url: str, *args: str) -> None:
    """Invoke ``python -m alembic <args>`` against *db_url* in a child process.

    A subprocess is used because the in-process lru_cache on
    musehub.config.settings is populated from conftest.py before DATABASE_URL
    is set. A fresh interpreter builds a clean settings instance that reads
    the DATABASE_URL passed here.

    Raises:
        RuntimeError: alembic exited non-zero. The message carries the last
            2000 characters of its stderr.
        subprocess.TimeoutExpired: the command ran longer than 120 seconds.
    """
    import subprocess
    import sys

    command = [sys.executable, "-m", "alembic", *args]
    child_env = dict(
        DATABASE_URL=db_url,
        MUSE_ENV="test",
        PATH="/usr/local/bin:/usr/bin:/bin",
    )
    completed = subprocess.run(
        command,
        cwd=str(_PROJECT_ROOT),
        env=child_env,
        capture_output=True,
        text=True,
        timeout=120,
    )
    if completed.returncode == 0:
        return
    joined_args = " ".join(args)
    raise RuntimeError(
        f"alembic {joined_args} failed:\n{completed.stderr[-2000:]}"
    )
133
134
def _upgrade(db_url: str, revision: str = "head") -> None:
    """Run ``alembic upgrade <revision>`` against *db_url* (HEAD by default)."""
    command = ("upgrade", revision)
    _run_alembic(db_url, *command)
137
138
def _downgrade(db_url: str, revision: str) -> None:
    """Run ``alembic downgrade <revision>`` against *db_url*."""
    command = ("downgrade", revision)
    _run_alembic(db_url, *command)
141
142
def _current_rev(db_url: str) -> str | None:
    """Return the revision alembic has recorded for the database at *db_url*.

    *db_url* may carry the ``+asyncpg`` driver suffix. It is stripped so a
    synchronous engine can be used for the lookup.

    Returns:
        The current revision ID, or None when the database is not stamped
        (e.g. after ``downgrade base``).
    """
    from alembic.runtime.migration import MigrationContext
    from sqlalchemy import create_engine

    engine = create_engine(db_url.replace("+asyncpg", ""))
    try:
        with engine.connect() as conn:
            return MigrationContext.configure(conn).get_current_revision()
    finally:
        # Release the engine's connections even when connect() or the lookup
        # raises; previously dispose() was skipped on error.
        engine.dispose()
154
155
async def _tables(engine: AsyncEngine) -> set[str]:
    """Return the names of every table in the ``public`` schema."""
    query = text(
        "SELECT tablename FROM pg_tables WHERE schemaname='public' ORDER BY tablename"
    )
    async with engine.connect() as conn:
        rows = await conn.execute(query)
        return set(rows.scalars())
165
166
async def _indexes_for(engine: AsyncEngine, table: str) -> set[str]:
    """Return the names of the indexes defined on ``public.<table>``."""
    query = text(
        "SELECT indexname FROM pg_indexes WHERE schemaname='public' AND tablename = :t"
    )
    async with engine.connect() as conn:
        rows = await conn.execute(query, {"t": table})
        return set(rows.scalars())
177
178
async def _columns(engine: AsyncEngine, table: str) -> set[str]:
    """Return the column names of ``public.<table>``.

    A table that does not exist yields an empty set.
    """
    query = text(
        "SELECT column_name FROM information_schema.columns WHERE table_schema='public' AND table_name = :t"
    )
    async with engine.connect() as conn:
        rows = await conn.execute(query, {"t": table})
        return set(rows.scalars())
189
190
def _recreate_database(db_name: str) -> None:
    """Drop *db_name* (terminating its open sessions) and create it empty.

    Connects through _ADMIN_URL in AUTOCOMMIT mode, because PostgreSQL refuses
    to run CREATE/DROP DATABASE inside a transaction block.
    """
    from sqlalchemy import create_engine, text as stext

    engine = create_engine(
        _ADMIN_URL.replace("+asyncpg", ""), isolation_level="AUTOCOMMIT"
    )
    try:
        with engine.connect() as conn:
            # DROP DATABASE fails while other sessions are connected to it,
            # so disconnect them first (never our own admin session).
            conn.execute(
                stext(
                    "SELECT pg_terminate_backend(pid) FROM pg_stat_activity "
                    "WHERE datname = :db AND pid <> pg_backend_pid()"
                ),
                {"db": db_name},
            )
            # Identifiers cannot be bound parameters; db_name is always one of
            # this module's constants, never external input.
            conn.execute(stext(f"DROP DATABASE IF EXISTS {db_name}"))
            conn.execute(stext(f"CREATE DATABASE {db_name}"))
    finally:
        engine.dispose()


def _fresh_db() -> None:
    """Drop and recreate the section-36 test database synchronously."""
    _recreate_database(_TEST_DB)


def _perf_fresh_db() -> None:
    """Drop and recreate the performance test database (separate from _TEST_DB)."""
    _recreate_database(_PERF_TEST_DB)
219
220
221 # ── module-scoped engine fixture ──────────────────────────────────────────────
222
223
@pytest.fixture(scope="module")
def migrated_engine() -> Generator[AsyncEngine, None, None]:
    """Yield an async engine for a freshly created test DB migrated to HEAD.

    Built once per module and shared by every test that requests it. Tests
    that downgrade must upgrade back to HEAD before finishing. NullPool opens
    a new connection per checkout and keeps none idle between tests.
    """
    _fresh_db()
    _upgrade(_TEST_URL)
    async_engine = create_async_engine(_TEST_URL, poolclass=NullPool)
    yield async_engine
    asyncio.run(async_engine.dispose())
236
237
238 # ══════════════════════════════════════════════════════════════════════════════
239 # 1. Unit
240 # ══════════════════════════════════════════════════════════════════════════════
241
242
class TestMigrationUnit:
    """Static analysis of migration files — no DB connection needed.

    Migration modules are imported directly from alembic/versions/ with
    importlib (see the private helpers), without going through alembic.
    """

    @staticmethod
    def _load_migration_modules() -> list[tuple[pathlib.Path, object]]:
        """Import every ``alembic/versions/*.py`` migration file.

        Dunder files such as ``__init__.py`` are skipped, matching
        test_all_migrations_present. Files are visited in sorted order so
        failures are reported deterministically.

        Returns:
            (path, module) pairs, one per migration file.
        """
        import importlib.util

        loaded: list[tuple[pathlib.Path, object]] = []
        versions_dir = _PROJECT_ROOT / "alembic" / "versions"
        for path in sorted(versions_dir.glob("*.py")):
            if path.stem.startswith("__"):
                continue
            spec = importlib.util.spec_from_file_location(path.stem, path)
            assert spec is not None and spec.loader is not None, (
                f"Cannot build an import spec for {path.name}"
            )
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            loaded.append((path, module))
        return loaded

    @classmethod
    def _revision_map(cls) -> dict[str, str | None]:
        """Map each migration's ``revision`` to its ``down_revision``.

        The root migration maps to None. Files without a ``revision``
        attribute are ignored.
        """
        revisions: dict[str, str | None] = {}
        for _path, module in cls._load_migration_modules():
            rev = getattr(module, "revision", None)
            down = getattr(module, "down_revision", None)
            if rev:
                revisions[str(rev)] = str(down) if down else None
        return revisions

    def test_revision_chain_is_linear(self) -> None:
        """One root, every down_revision resolves, and no revision has two children."""
        from collections import Counter

        revisions = self._revision_map()

        # Exactly one migration has no predecessor (the initial one)
        roots = [r for r, d in revisions.items() if d is None]
        assert len(roots) == 1, f"Expected exactly one root migration, got: {roots}"

        # Every other migration's down_revision must exist as a revision
        for rev, down in revisions.items():
            if down is not None:
                assert down in revisions, (
                    f"Migration {rev}: down_revision '{down}' not found"
                )

        # Every link can resolve and the history can still fork: a parent
        # named by two or more migrations is a branch point, so check for it.
        child_counts = Counter(d for d in revisions.values() if d is not None)
        forks = sorted(parent for parent, n in child_counts.items() if n > 1)
        assert not forks, (
            f"Non-linear history: revisions with more than one child: {forks}"
        )

    def test_all_migrations_present(self) -> None:
        versions_dir = _PROJECT_ROOT / "alembic" / "versions"
        files = {f.stem for f in versions_dir.glob("*.py") if not f.stem.startswith("__")}
        assert len(files) == len(_ALL_REVISIONS), (
            f"Expected {len(_ALL_REVISIONS)} migrations, found {len(files)}: {files}"
        )

    def test_head_revision_is_correct(self) -> None:
        """The only revision no other migration points back to must be _HEAD."""
        revisions = self._revision_map()
        parents = {d for d in revisions.values() if d is not None}
        heads = set(revisions) - parents
        assert heads == {_HEAD}, f"Expected head {_HEAD}, got: {heads}"

    def test_each_migration_has_upgrade_and_downgrade(self) -> None:
        for path, module in self._load_migration_modules():
            assert callable(getattr(module, "upgrade", None)), (
                f"{path.name} missing upgrade()"
            )
            assert callable(getattr(module, "downgrade", None)), (
                f"{path.name} missing downgrade()"
            )

    def test_alembic_ini_points_to_correct_script_location(self) -> None:
        import configparser

        parser = configparser.ConfigParser()
        parser.read(str(_PROJECT_ROOT / "alembic.ini"))
        script_location = parser.get("alembic", "script_location", fallback="")
        assert "alembic" in script_location

    def test_env_py_imports_all_model_bases(self) -> None:
        env_text = (_PROJECT_ROOT / "alembic" / "env.py").read_text()
        assert "from musehub.db.database import Base" in env_text
        assert "target_metadata = Base.metadata" in env_text
332
333
334 # ══════════════════════════════════════════════════════════════════════════════
335 # 2. Integration
336 # ══════════════════════════════════════════════════════════════════════════════
337
338
class TestMigrationIntegration:
    """Real DB — run migrations and inspect schema state."""

    def test_upgrade_to_head_succeeds(self) -> None:
        """Full migration chain from empty DB to HEAD completes without error."""
        _fresh_db()
        _upgrade(_TEST_URL)
        rev = _current_rev(_TEST_URL)
        assert rev == _HEAD

    def test_current_revision_tracked_in_alembic_version(
        self, migrated_engine: AsyncEngine
    ) -> None:
        async def _check() -> str | None:
            async with migrated_engine.connect() as conn:
                result = await conn.execute(text("SELECT version_num FROM alembic_version"))
                row = result.fetchone()
                return row[0] if row else None

        rev = asyncio.run(_check())
        assert rev == _HEAD

    def test_required_tables_exist_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        tables = asyncio.run(_tables(migrated_engine))
        missing = _REQUIRED_TABLES - tables
        assert not missing, f"Tables missing after upgrade to HEAD: {missing}"

    def test_legacy_tables_absent_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        tables = asyncio.run(_tables(migrated_engine))
        present_legacy = _LEGACY_TABLES & tables
        assert not present_legacy, f"Legacy tables still present after HEAD: {present_legacy}"

    @pytest.mark.usefixtures("migrated_engine")
    def test_downgrade_then_upgrade_returns_to_head(self) -> None:
        """Step back one revision below HEAD and re-upgrade — must return to HEAD.

        This exercises HEAD's own downgrade()/upgrade() pair.
        test_full_downgrade_to_base_then_re_upgrade covers the full round trip;
        previously both tests did exactly the same thing. Requesting
        migrated_engine guarantees the test DB exists when this test runs on
        its own.
        """
        previous = _ALL_REVISIONS[-2]
        _downgrade(_TEST_URL, previous)
        assert _current_rev(_TEST_URL) == previous
        _upgrade(_TEST_URL, "head")
        assert _current_rev(_TEST_URL) == _HEAD

    @pytest.mark.usefixtures("migrated_engine")
    def test_full_downgrade_to_base_then_re_upgrade(self) -> None:
        """Downgrade all the way to base then re-upgrade — round trip works."""
        _downgrade(_TEST_URL, "base")
        assert _current_rev(_TEST_URL) is None
        _upgrade(_TEST_URL)
        assert _current_rev(_TEST_URL) == _HEAD
388
389
390 # ══════════════════════════════════════════════════════════════════════════════
391 # 3. End-to-End
392 # ══════════════════════════════════════════════════════════════════════════════
393
394
class TestMigrationE2E:
    """Full stack: migrate, insert rows and read them back, and walk the chain step by step."""

    def test_repo_data_persists_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Insert a repo row at HEAD and verify it can be read back."""

        async def _run() -> None:
            now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()
            repo_id = compute_repo_id("testuser", "e2e-repo", "", now_iso)
            async with migrated_engine.connect() as conn:
                await conn.execute(
                    text(
                        "INSERT INTO musehub_repos "
                        "(repo_id, name, slug, owner, owner_user_id, visibility, default_branch, "
                        "description, tags, domain_meta, training_opt_out, created_at, updated_at) "
                        "VALUES (:id, :n, :s, :o, :oid, 'public', 'main', '', "
                        "ARRAY[]::text[], '{}'::json, false, NOW(), NOW())"
                    ),
                    {"id": repo_id, "n": "e2e-repo", "s": "e2e-repo", "o": "testuser", "oid": "testuser"},
                )
                await conn.commit()

            async with migrated_engine.connect() as conn:
                result = await conn.execute(
                    text("SELECT repo_id FROM musehub_repos WHERE repo_id = :id"),
                    {"id": repo_id},
                )
                row = result.fetchone()
                assert row is not None, "Repo row must be readable after insert"

        asyncio.run(_run())

    def test_identity_row_persists_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Insert an identity at HEAD and verify it can be read back."""

        async def _run() -> None:
            identity_id = compute_identity_id(secrets.token_bytes(16))
            async with migrated_engine.connect() as conn:
                await conn.execute(
                    text(
                        "INSERT INTO musehub_identities "
                        "(identity_id, handle, display_name, identity_type, "
                        "agent_capabilities, is_verified, pinned_repo_ids, created_at, updated_at) "
                        "VALUES (:id, :h, :dn, 'human', ARRAY[]::text[], false, ARRAY[]::text[], NOW(), NOW())"
                    ),
                    {"id": identity_id, "h": "e2e-user", "dn": "E2E User"},
                )
                await conn.commit()

            async with migrated_engine.connect() as conn:
                result = await conn.execute(
                    text("SELECT identity_id FROM musehub_identities WHERE identity_id = :id"),
                    {"id": identity_id},
                )
                row = result.fetchone()
                assert row is not None

        asyncio.run(_run())

    @pytest.mark.usefixtures("migrated_engine")
    def test_each_migration_step_is_individually_runnable(self) -> None:
        """Upgrade one step at a time from base to HEAD — each step must succeed.

        Requests migrated_engine so the test DB is guaranteed to exist even
        when this test is selected on its own. The last step leaves the DB at
        the final entry of _ALL_REVISIONS (HEAD).
        """
        _downgrade(_TEST_URL, "base")
        for rev in _ALL_REVISIONS:
            _upgrade(_TEST_URL, rev)
            actual = _current_rev(_TEST_URL)
            assert actual == rev, f"After upgrading to {rev}, got revision {actual}"
465
466
467 # ══════════════════════════════════════════════════════════════════════════════
468 # 4. Stress
469 # ══════════════════════════════════════════════════════════════════════════════
470
471
class TestMigrationStress:
    """Performance and repeated-execution scenarios.

    The whole-chain timing tests run against the dedicated performance DB
    (_PERF_TEST_DB). Recreating it therefore never drops the shared DB behind
    migrated_engine.
    """

    def test_full_upgrade_completes_under_60_seconds(self) -> None:
        """Upgrading an empty DB through the whole chain must take under 60 seconds."""
        _perf_fresh_db()
        start = time.perf_counter()
        _upgrade(_PERF_TEST_URL)
        elapsed = time.perf_counter() - start
        assert elapsed < 60, f"Full upgrade took {elapsed:.1f}s (budget: 60s)"

    def test_full_downgrade_completes_under_60_seconds(self) -> None:
        """Downgrade from HEAD to base must complete in under 60 seconds.

        Builds its own HEAD-state DB instead of relying on the previous test
        having left one behind.
        """
        _perf_fresh_db()
        _upgrade(_PERF_TEST_URL)
        start = time.perf_counter()
        _downgrade(_PERF_TEST_URL, "base")
        elapsed = time.perf_counter() - start
        assert elapsed < 60, f"Full downgrade took {elapsed:.1f}s (budget: 60s)"

    @pytest.mark.usefixtures("migrated_engine")
    def test_three_consecutive_upgrade_to_head_idempotent(self) -> None:
        """Three back-to-back ``upgrade head`` runs on a migrated DB are no-ops."""
        assert _current_rev(_TEST_URL) == _HEAD
        for attempt in range(1, 4):
            _upgrade(_TEST_URL)  # Already at HEAD — must not error
            assert _current_rev(_TEST_URL) == _HEAD, (
                f"Revision changed after redundant upgrade #{attempt}"
            )

    def test_100_identity_inserts_survive_step_cycle(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Insert 100 identity rows in one batch and verify all are readable."""

        async def _run() -> None:
            ids = [secrets.token_hex(16) for _ in range(100)]
            params = [
                {"id": identity_id, "h": f"stress-{i}-{identity_id[:6]}", "dn": f"Stress {i}"}
                for i, identity_id in enumerate(ids)
            ]
            async with migrated_engine.connect() as conn:
                # Passing a list of parameter dicts makes SQLAlchemy execute the
                # statement with executemany semantics instead of 100 calls.
                await conn.execute(
                    text(
                        "INSERT INTO musehub_identities "
                        "(identity_id, handle, display_name, identity_type, "
                        "agent_capabilities, is_verified, pinned_repo_ids, created_at, updated_at) "
                        "VALUES (:id, :h, :dn, 'human', ARRAY[]::text[], false, ARRAY[]::text[], NOW(), NOW())"
                    ),
                    params,
                )
                await conn.commit()

            async with migrated_engine.connect() as conn:
                result = await conn.execute(
                    text("SELECT COUNT(*) FROM musehub_identities WHERE identity_id = ANY(:ids)"),
                    {"ids": ids},
                )
                count = result.scalar()
                assert count == 100, f"Only {count}/100 identity rows readable after insert"

        asyncio.run(_run())
530
531
532 # ══════════════════════════════════════════════════════════════════════════════
533 # 5. Data Integrity
534 # ══════════════════════════════════════════════════════════════════════════════
535
536
class TestMigrationDataIntegrity:
    """Schema correctness — columns, indexes, constraints at HEAD."""

    def test_musehub_repos_has_required_columns(
        self, migrated_engine: AsyncEngine
    ) -> None:
        cols = asyncio.run(
            _columns(migrated_engine, "musehub_repos")
        )
        for col in ("repo_id", "owner", "slug", "visibility", "default_branch"):
            assert col in cols, f"musehub_repos missing column: {col}"

    def test_musehub_identities_has_agent_columns(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_identities must have agent identity columns."""
        cols = asyncio.run(
            _columns(migrated_engine, "musehub_identities")
        )
        for col in ("spawned_by", "scope", "expires_at"):
            assert col in cols, f"musehub_identities missing agent column: {col}"

    def test_musehub_identities_has_no_legacy_user_id(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_identities must not have legacy_user_id."""
        cols = asyncio.run(
            _columns(migrated_engine, "musehub_identities")
        )
        assert "legacy_user_id" not in cols

    def test_musehub_issue_comments_has_no_state_refs(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_issue_comments must not have state_refs column.

        information_schema lists no columns for a table that does not exist,
        so the absence check would pass vacuously. Skip visibly instead.
        """
        cols = asyncio.run(
            _columns(migrated_engine, "musehub_issue_comments")
        )
        if not cols:
            pytest.skip("musehub_issue_comments does not exist at HEAD; nothing to check")
        assert "state_refs" not in cols

    def test_musehub_auth_keys_has_algorithm_column(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_auth_keys must have the algorithm column."""
        cols = asyncio.run(
            _columns(migrated_engine, "musehub_auth_keys")
        )
        assert "algorithm" in cols

    def test_musehub_repos_unique_owner_slug_index_exists(
        self, migrated_engine: AsyncEngine
    ) -> None:
        indexes = asyncio.run(
            _indexes_for(migrated_engine, "musehub_repos")
        )
        assert "uq_musehub_repos_owner_slug" in indexes

    def test_musehub_auth_keys_fingerprint_unique_index(
        self, migrated_engine: AsyncEngine
    ) -> None:
        indexes = asyncio.run(
            _indexes_for(migrated_engine, "musehub_auth_keys")
        )
        assert "uq_musehub_auth_keys_fingerprint" in indexes

    def test_musehub_collaborators_has_identity_handle_column(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """musehub_collaborators must use identity_handle, not user_id."""
        cols = asyncio.run(
            _columns(migrated_engine, "musehub_collaborators")
        )
        assert "identity_handle" in cols
        assert "user_id" not in cols

    def test_wire_tags_table_exists(
        self, migrated_engine: AsyncEngine
    ) -> None:
        tables = asyncio.run(_tables(migrated_engine))
        assert "musehub_wire_tags" in tables

    def test_sessions_table_exists(
        self, migrated_engine: AsyncEngine
    ) -> None:
        tables = asyncio.run(_tables(migrated_engine))
        assert "musehub_sessions" in tables
623
624
625 # ══════════════════════════════════════════════════════════════════════════════
626 # 6. Security
627 # ══════════════════════════════════════════════════════════════════════════════
628
629
class TestMigrationSecurity:
    """Ensure migrations don't introduce security-relevant schema regressions."""

    @staticmethod
    async def _unique_index_names(engine: AsyncEngine, table: str) -> set[str]:
        """Return the names of indexes on ``public.<table>`` that enforce uniqueness.

        Reads pg_index.indisunique, so an index that is merely *named*
        ``uq_*`` but is not UNIQUE is not reported.
        """
        async with engine.connect() as conn:
            result = await conn.execute(
                text(
                    "SELECT i.relname FROM pg_index x "
                    "JOIN pg_class i ON i.oid = x.indexrelid "
                    "JOIN pg_class t ON t.oid = x.indrelid "
                    "JOIN pg_namespace n ON n.oid = t.relnamespace "
                    "WHERE n.nspname = 'public' AND t.relname = :t AND x.indisunique"
                ),
                {"t": table},
            )
            return {row[0] for row in result}

    def test_legacy_auth_tables_dropped_at_head(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """muse_users, muse_access_tokens, musehub_profiles must not exist at HEAD."""
        tables = asyncio.run(_tables(migrated_engine))
        for legacy in _LEGACY_TABLES:
            assert legacy not in tables, f"Legacy auth table '{legacy}' still present at HEAD"

    def test_musehub_auth_keys_has_fingerprint_unique_constraint(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Fingerprint uniqueness prevents duplicate key registration."""
        unique = asyncio.run(
            self._unique_index_names(migrated_engine, "musehub_auth_keys")
        )
        assert "uq_musehub_auth_keys_fingerprint" in unique, (
            f"uq_musehub_auth_keys_fingerprint missing or not UNIQUE; unique indexes: {unique}"
        )

    def test_musehub_identities_handle_unique(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Identity handles must be unique — prevents impersonation via duplicate handle."""
        unique = asyncio.run(
            self._unique_index_names(migrated_engine, "musehub_identities")
        )
        assert "uq_musehub_identities_handle" in unique, (
            f"uq_musehub_identities_handle missing or not UNIQUE; unique indexes: {unique}"
        )

    def test_downgrade_does_not_expose_dropped_columns(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """After a full downgrade and re-upgrade, state_refs stays absent.

        Uses the NullPool-backed migrated_engine, so each query opens a fresh
        connection to the re-created schema. A missing table would make the
        check vacuous, so that case is skipped visibly.
        """
        _downgrade(_TEST_URL, "base")
        _upgrade(_TEST_URL)
        cols = asyncio.run(
            _columns(migrated_engine, "musehub_issue_comments")
        )
        if not cols:
            pytest.skip("musehub_issue_comments does not exist at HEAD; nothing to check")
        assert "state_refs" not in cols

    def test_repos_owner_slug_uniqueness_enforced(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Inserting two repos with the same owner/slug must fail with an integrity error."""
        from sqlalchemy.exc import IntegrityError

        async def _run() -> None:
            rid1 = secrets.token_hex(16)
            rid2 = secrets.token_hex(16)
            # Per-run owner so a DB that already holds rows from an earlier run
            # does not make the *first* insert fail outside pytest.raises.
            owner = f"sec-owner-{secrets.token_hex(4)}"
            _dup_sql = (
                "INSERT INTO musehub_repos "
                "(repo_id, name, slug, owner, owner_user_id, visibility, default_branch, "
                "description, tags, domain_meta, training_opt_out, created_at, updated_at) "
                "VALUES (:id, 'dup', 'dup-slug', :owner, :owner, 'public', 'main', "
                "'', ARRAY[]::text[], '{}'::json, false, NOW(), NOW())"
            )
            async with migrated_engine.connect() as conn:
                await conn.execute(text(_dup_sql), {"id": rid1, "owner": owner})
                await conn.commit()
            async with migrated_engine.connect() as conn:
                with pytest.raises(IntegrityError):
                    await conn.execute(text(_dup_sql), {"id": rid2, "owner": owner})
                    await conn.commit()

        asyncio.run(_run())
697
698
699 # ══════════════════════════════════════════════════════════════════════════════
700 # 7. Performance
701 # ══════════════════════════════════════════════════════════════════════════════
702
703
class TestMigrationPerformance:
    """Latency budgets for migration operations.

    The per-step timing tests recreate and migrate the dedicated performance
    database (_PERF_TEST_DB), so they never drop the shared DB behind
    migrated_engine.
    """

    def test_single_step_upgrade_under_5_seconds(self) -> None:
        """Each individual migration step must complete in under 5 seconds."""
        _perf_fresh_db()  # start from an empty database
        for rev in _ALL_REVISIONS:
            start = time.perf_counter()
            _upgrade(_PERF_TEST_URL, rev)
            elapsed = time.perf_counter() - start
            assert elapsed < 5, (
                f"Migration {rev} upgrade took {elapsed:.2f}s (budget: 5s per step)"
            )

    def test_single_step_downgrade_under_5_seconds(self) -> None:
        """Each individual migration downgrade must complete in under 5 seconds."""
        _perf_fresh_db()
        _upgrade(_PERF_TEST_URL)  # start from a guaranteed-clean HEAD
        # Step down one revision at a time. Failure messages name the migration
        # whose downgrade() ran (``current``), not the revision landed on.
        current = _ALL_REVISIONS[-1]
        for target in reversed(_ALL_REVISIONS[:-1]):
            start = time.perf_counter()
            _downgrade(_PERF_TEST_URL, target)
            elapsed = time.perf_counter() - start
            assert elapsed < 5, (
                f"Migration {current} downgrade took {elapsed:.2f}s (budget: 5s per step)"
            )
            current = target
        # Final step: undo the root migration.
        start = time.perf_counter()
        _downgrade(_PERF_TEST_URL, "base")
        elapsed = time.perf_counter() - start
        assert elapsed < 5, f"Migration {current} downgrade to base took {elapsed:.2f}s"

    def test_schema_introspection_under_500ms(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Listing all tables in the public schema must complete in under 500ms."""
        start = time.perf_counter()
        asyncio.run(_tables(migrated_engine))
        elapsed_ms = (time.perf_counter() - start) * 1000
        assert elapsed_ms < 500, f"Table introspection took {elapsed_ms:.0f}ms (budget: 500ms)"

    def test_index_introspection_under_200ms(
        self, migrated_engine: AsyncEngine
    ) -> None:
        """Listing indexes for a single table must complete in under 200ms."""
        start = time.perf_counter()
        asyncio.run(
            _indexes_for(migrated_engine, "musehub_repos")
        )
        elapsed_ms = (time.perf_counter() - start) * 1000
        assert elapsed_ms < 200, f"Index introspection took {elapsed_ms:.0f}ms (budget: 200ms)"
756
757
758 # ══════════════════════════════════════════════════════════════════════════════
759 # Alembic chain tests (structural, no DB connection required)
760 # ══════════════════════════════════════════════════════════════════════════════
761
762
763 import inspect
764 import stat
765 import types
766
767 from alembic.config import Config
768 from alembic.script import ScriptDirectory
769
# The ScriptDirectory-based checks below resolve paths from the same project
# root and reuse the expectations declared at the top of the module.
_REPO_ROOT = _PROJECT_ROOT
_EXPECTED_HEAD_PREFIX = _HEAD
_EXPECTED_MIGRATION_COUNT = len(_ALL_REVISIONS)
773
774
775 # ---------------------------------------------------------------------------
776 # Helpers
777 # ---------------------------------------------------------------------------
778
def _script_dir() -> ScriptDirectory:
    """Build an alembic ScriptDirectory over this repository's migrations.

    script_location is overridden with an absolute path, so the result does
    not depend on the current working directory.
    """
    ini_path = _REPO_ROOT / "alembic.ini"
    config = Config(str(ini_path))
    config.set_main_option("script_location", str(_REPO_ROOT / "alembic"))
    return ScriptDirectory.from_config(config)
783
784
def _is_stub_downgrade(mod: types.ModuleType) -> bool:
    """Return True if *mod*'s downgrade() does nothing.

    A downgrade counts as a stub when:
    - it is missing, or its source cannot be retrieved; or
    - its body, after an optional docstring, is empty, a single ``pass``, or
      a single ``...``.

    The body is inspected with :mod:`ast` rather than line by line.
    Multi-line docstrings and comments (which the parser discards) therefore
    cannot make a stub look like real code.
    """
    import ast
    import inspect as pyinspect  # stdlib; this file also imports sqlalchemy's ``inspect``
    import textwrap

    try:
        source = pyinspect.getsource(getattr(mod, "downgrade"))
    except (OSError, TypeError, AttributeError):
        return True

    try:
        tree = ast.parse(textwrap.dedent(source))
    except SyntaxError:
        # Source that does not parse standalone is not recognisably a stub.
        return False
    if not tree.body or not isinstance(tree.body[0], (ast.FunctionDef, ast.AsyncFunctionDef)):
        return False

    body = list(tree.body[0].body)
    # Drop a leading docstring, however many lines it spans.
    if (
        body
        and isinstance(body[0], ast.Expr)
        and isinstance(body[0].value, ast.Constant)
        and isinstance(body[0].value.value, str)
    ):
        body = body[1:]
    if not body:
        return True
    if len(body) == 1:
        stmt = body[0]
        if isinstance(stmt, ast.Pass):
            return True
        if (
            isinstance(stmt, ast.Expr)
            and isinstance(stmt.value, ast.Constant)
            and stmt.value.value is Ellipsis
        ):
            return True
    return False
802
803
804 # ---------------------------------------------------------------------------
805 # Linear chain / versioning
806 # ---------------------------------------------------------------------------
807
def test_migration_chain_is_linear() -> None:
    """The alembic revision graph must have a single head (no unmerged branches)."""
    head_revisions = _script_dir().get_heads()
    head_count = len(head_revisions)
    assert head_count == 1, (
        f"Expected single-head chain, got {head_count} heads: {head_revisions}. "
        "Resolve the branch before merging."
    )
815
816
def test_migration_count_matches_expected() -> None:
    """The number of alembic revisions must equal _EXPECTED_MIGRATION_COUNT.

    _EXPECTED_MIGRATION_COUNT is derived from _ALL_REVISIONS. When adding a
    migration, extend _ALL_REVISIONS (and _HEAD) at the top of this file
    rather than editing the derived constant.
    """
    revisions = list(_script_dir().walk_revisions())
    assert len(revisions) == _EXPECTED_MIGRATION_COUNT, (
        f"Expected {_EXPECTED_MIGRATION_COUNT} migrations, found {len(revisions)}. "
        "Update _ALL_REVISIONS / _HEAD at the top of this file."
    )
827
828
def test_head_revision_prefix() -> None:
    """The single head revision must start with _EXPECTED_HEAD_PREFIX.

    _EXPECTED_HEAD_PREFIX is derived from _HEAD. When adding a migration,
    update _HEAD (and _ALL_REVISIONS) at the top of this file.
    """
    heads = _script_dir().get_heads()
    assert len(heads) == 1
    assert heads[0].startswith(_EXPECTED_HEAD_PREFIX), (
        f"Expected head starting with '{_EXPECTED_HEAD_PREFIX}', got '{heads[0]}'. "
        "Update _HEAD / _ALL_REVISIONS at the top of this file when a new migration is added."
    )
837
838
def test_all_migrations_importable() -> None:
    """Alembic must resolve a loaded Python module for every revision."""
    script_directory = _script_dir()
    for script in script_directory.walk_revisions():
        module = script.module
        assert module is not None, (
            f"Revision {script.revision} has no module — check for missing file."
        )
845
846
def test_revision_ids_are_sequential_integers() -> None:
    """Numeric revision IDs must be unique and contiguous.

    Only revisions whose first four characters are digits are considered.
    Duplicate numbers (two migrations sharing the same four-digit prefix) are
    reported separately from gaps, so the failure message names the real
    problem.
    """
    from collections import Counter

    numeric_ids = sorted(
        int(script.revision[:4])
        for script in _script_dir().walk_revisions()
        if script.revision[:4].isdigit()
    )
    if not numeric_ids:
        return  # no numeric revisions yet — nothing to check

    duplicates = sorted(n for n, count in Counter(numeric_ids).items() if count > 1)
    assert not duplicates, f"Duplicate numeric revision IDs: {duplicates}."

    expected = list(range(numeric_ids[0], numeric_ids[-1] + 1))
    missing = sorted(set(expected) - set(numeric_ids))
    assert numeric_ids == expected, (
        f"Numeric revision IDs have gaps: missing {missing}; "
        f"found {numeric_ids}, expected {expected}."
    )
857
858
859 # ---------------------------------------------------------------------------
860 # Downgrade coverage — every forward migration has a real downgrade
861 # ---------------------------------------------------------------------------
862
863 def test_all_migrations_have_downgrade_function() -> None:
864 """Every migration module must define a downgrade() function."""
865 for rev in _script_dir().walk_revisions():
866 mod = rev.module
867 assert mod is not None
868 assert hasattr(mod, "downgrade"), (
869 f"Revision {rev.revision} is missing a downgrade() function."
870 )
871
872
873 def test_no_stub_downgrade_implementations() -> None:
874 """No migration may have a pass-only or ellipsis-only downgrade().
875
876 A stub downgrade makes rollback a silent no-op — forbidden.
877 """
878 stubs = []
879 for rev in _script_dir().walk_revisions():
880 mod = rev.module
881 if mod is not None and _is_stub_downgrade(mod):
882 stubs.append(rev.revision)
883
884 assert not stubs, (
885 f"Migrations with stub (non-functional) downgrade(): {stubs}. "
886 "Implement the actual rollback DDL."
887 )
888
889
890 def test_every_migration_references_tables_in_downgrade() -> None:
891 """Migrations that add a column or table must also reference it in downgrade().
892
893 Heuristic: if upgrade() calls op.add_column / op.create_table for table X,
894 downgrade() must reference X (via drop_column / drop_table).
895 Checked by source inspection — not exhaustive, but catches obvious omissions.
896 """
897 import re
898
899 _ADD_RE = re.compile(r'op\.(add_column|create_table)\(\s*["\'](\w+)["\']')
900 _DROP_RE = re.compile(r'op\.(drop_column|drop_table)\(\s*["\'](\w+)["\']')
901
902 violations = []
903 for rev in _script_dir().walk_revisions():
904 mod = rev.module
905 if mod is None:
906 continue
907 try:
908 up_src = inspect.getsource(getattr(mod, "upgrade"))
909 down_src = inspect.getsource(getattr(mod, "downgrade"))
910 except (OSError, AttributeError):
911 continue
912
913 tables_added = {m.group(2) for m in _ADD_RE.finditer(up_src)}
914 tables_dropped = {m.group(2) for m in _DROP_RE.finditer(down_src)}
915 missing = tables_added - tables_dropped
916 if missing:
917 violations.append(f"{rev.revision}: added {missing} but downgrade() doesn't drop them")
918
919 assert not violations, (
920 f"Migrations with incomplete downgrade():\n{'\n'.join(violations)}"
921 )
922
923
924 # ---------------------------------------------------------------------------
925 # Transaction safety — env.py wraps migrations in a transaction
926 # ---------------------------------------------------------------------------
927
928 def test_env_py_uses_begin_transaction() -> None:
929 """alembic/env.py must call context.begin_transaction() for both offline and online runs.
930
931 This ensures that a failed migration rolls back cleanly instead of leaving
932 the schema in a partially-applied state.
933 """
934 env_path = _REPO_ROOT / "alembic" / "env.py"
935 src = env_path.read_text()
936 count = src.count("context.begin_transaction()")
937 assert count >= 2, (
938 f"Expected at least 2 calls to context.begin_transaction() in env.py "
939 f"(one for offline, one for online), found {count}. "
940 "Wrap both run_migrations_offline() and do_run_migrations() in a transaction."
941 )
942
943
944 # ---------------------------------------------------------------------------
945 # Prod-snapshot migration test script
946 # ---------------------------------------------------------------------------
947
948 def test_migrate_test_script_exists_and_is_executable() -> None:
949 """deploy/migrate-test.sh must exist and be executable.
950
951 This script is the mechanism for testing migrations against a production
952 data snapshot before applying to production (checklist item 5.3.2).
953 """
954 script = _REPO_ROOT / "deploy" / "migrate-test.sh"
955 assert script.exists(), (
956 "deploy/migrate-test.sh is missing. "
957 "This script is required to validate migrations against a prod snapshot."
958 )
959 mode = script.stat().st_mode
960 assert mode & stat.S_IXUSR, (
961 "deploy/migrate-test.sh must be executable (chmod +x)."
962 )
963
964
965 def test_migrate_test_script_contains_round_trip() -> None:
966 """deploy/migrate-test.sh must perform an upgrade→downgrade→upgrade round-trip."""
967 script = (_REPO_ROOT / "deploy" / "migrate-test.sh").read_text()
968 assert "upgrade head" in script, "script must run 'alembic upgrade head'"
969 assert "downgrade" in script, "script must run 'alembic downgrade' step"
970 # Must do upgrade twice (initial + after downgrade round-trip)
971 assert script.count("upgrade head") >= 2, (
972 "script must upgrade to HEAD twice (initial apply + round-trip after downgrade)"
973 )
File History 7 commits
sha256:294f5f41277b624e1107a61b102d88c784a5380c2c436095da176538d74f38ba chore: rename migration 0072 → 0071 (0071 prose backfill wa… Sonnet 4.6 minor 6 days ago
sha256:5dfc96524e3921eb9acb8372241b6bec70b5f3e6598f79099a0ead16ff7cbb75 feat(phase1): add musehub_fetch_mpack_cache table (issue #9… Sonnet 4.6 patch 6 days ago
sha256:d8cbca3a06f39f82f66be6c29de3f41c3dec5f367722958fb5454dcbc007cc15 fix: rc11 test fixes — event→verdict rename, migration coun… Sonnet 4.6 patch 14 days ago
sha256:009b5a222314f47640a58d75ce5a1f428f1624cf0b51384dfcdfbdfab3cc42a4 feat: migration idempotency, file attribution DAG walk, mpa… Sonnet 4.6 minor 15 days ago
sha256:ad616c6113d6c00f4efed6b2993734ca46d3e9b5bee25addd4ce8ae6b57136e5 chore: bump version to 0.2.0rc11; typing audit clean + all … Sonnet 4.6 minor 19 days ago
sha256:f93f34a08d829b55b7324c2dacfa517618560ddfb11c09a1120f325fb6a238af fix: migration downgrade guards and test timing budgets Sonnet 4.6 patch 20 days ago
sha256:ab9eda7b6479e1c35cdba9a54f62bacd2825de8faacec3ba67a9a8ef45914b7d fix: migration and wire protocol alignment Sonnet 4.6 minor 21 days ago