database.py
python
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32
fix: fall back to DB ancestry check when mpack-only fast-fo…
Sonnet 4.6
patch
7 days ago
| 1 | """Async SQLAlchemy database setup for PostgreSQL.""" |
| 2 | |
| 3 | import logging |
| 4 | import time |
| 5 | from typing import AsyncGenerator |
| 6 | |
| 7 | from sqlalchemy import event |
| 8 | from sqlalchemy.engine import Connection as SyncConnection |
| 9 | |
| 10 | # Opaque DBAPI-specific types for SQLAlchemy cursor-execute event parameters. |
| 11 | # These are driver-dependent and not meaningfully typed by SQLAlchemy stubs. |
| 12 | type _DBAPICursor = str | bytes | int | None |
| 13 | type _ExecParams = tuple[str | int | float | None, ...] | dict[str, str | int | float | None] | None |
| 14 | type _ExecContext = str | int | None |
| 15 | from sqlalchemy.ext.asyncio import ( |
| 16 | AsyncEngine, |
| 17 | AsyncSession, |
| 18 | async_sessionmaker, |
| 19 | create_async_engine, |
| 20 | ) |
| 21 | from sqlalchemy.orm import DeclarativeBase |
| 22 | |
| 23 | from musehub.config import settings as settings |
| 24 | |
# Module logger, named after this module's import path (``__name__``).
logger = logging.getLogger(__name__)
| 26 | |
class Base(DeclarativeBase):
    """Declarative base class for the project's SQLAlchemy ORM models.

    ``init_db()`` compares the metadata registered on this class against the
    live database schema; the schema itself is managed by Alembic.
    """
| 30 | |
# Module-level engine and session factory. Both start as None, are populated
# by init_db(), and are reset to None by close_db(). get_engine(), get_db()
# and AsyncSessionLocal() raise RuntimeError while they are still None.
_engine: AsyncEngine | None = None
_async_session_factory: async_sessionmaker[AsyncSession] | None = None
| 34 | |
def get_database_url() -> str:
    """Return the database URL configured in ``settings.database_url``.

    Raises:
        RuntimeError: if the configured URL is empty or unset.
    """
    configured = settings.database_url
    if configured:
        return configured
    raise RuntimeError(
        "DATABASE_URL must be set. "
        "Example: postgresql+asyncpg://musehub:musehub@localhost:5432/musehub"
    )
| 44 | |
def _install_slow_query_logging(engine: AsyncEngine, threshold_ms: float) -> None:
    """Warn about every statement on *engine* taking at least *threshold_ms*.

    A non-positive threshold disables slow-query logging (no listeners are
    registered).
    """
    if threshold_ms <= 0:
        return

    @event.listens_for(engine.sync_engine, "before_cursor_execute")
    def _before_cursor_execute(
        conn: SyncConnection, cursor: _DBAPICursor, statement: str, parameters: _ExecParams,
        context: _ExecContext, executemany: bool
    ) -> None:
        conn.info["query_start_time"] = time.monotonic()

    @event.listens_for(engine.sync_engine, "after_cursor_execute")
    def _after_cursor_execute(
        conn: SyncConnection, cursor: _DBAPICursor, statement: str, parameters: _ExecParams,
        context: _ExecContext, executemany: bool
    ) -> None:
        # Pop the start mark so it can never be paired with a later statement.
        # If it is missing, skip: measuring against 0 would report the raw
        # monotonic clock reading as the query duration and log a bogus warning.
        started = conn.info.pop("query_start_time", None)
        if started is None:
            return
        elapsed_ms = (time.monotonic() - started) * 1000
        if elapsed_ms >= threshold_ms:
            logger.warning(
                "SLOW QUERY (%.1f ms): %.200s",
                elapsed_ms,
                statement.replace("\n", " "),
            )


async def init_db() -> None:
    """Initialize the database engine and session factory, then check the schema.

    Schema is managed by Alembic (``alembic upgrade head`` runs in the
    container entrypoint *before* the app starts). This function creates the
    async engine and session factory, optionally installs slow-query logging,
    and then verifies that the live schema matches the ORM models.

    Raises:
        RuntimeError: if ``DATABASE_URL`` is not configured.
        Any exception raised by ``assert_schema_matches_orm`` (presumably on
        a schema/ORM mismatch — confirm in ``musehub.db.schema_check``).
    """
    global _engine, _async_session_factory

    database_url = get_database_url()
    # Log only the text after the last "@" so credentials stay out of the logs.
    logger.info("Initializing database: %s", database_url.rpartition("@")[2])

    # pool_size=20/max_overflow=40: handles ~20 concurrent requests without
    # queueing, with headroom for bursty agent traffic.
    _engine = create_async_engine(
        database_url,
        echo=settings.debug,
        pool_size=20,
        max_overflow=40,
        pool_recycle=1800,
        pool_timeout=settings.db_pool_timeout,
        # Verify connections before checkout. If a connection was returned to
        # the pool in a broken transaction state (e.g. after an asyncpg-level
        # error during autobegin), pre_ping discards it and issues a fresh one
        # rather than propagating the "cannot use Connection.transaction() in a
        # manually started transaction" error to the next request.
        pool_pre_ping=True,
        # Server settings applied to every connection: a 30s TCP keepalive
        # idle time (meant to keep idle connections alive during long-running
        # push/fetch operations) and a 60s statement timeout.
        connect_args={
            "server_settings": {
                "tcp_keepalives_idle": "30",
                "statement_timeout": "60000",  # 60s — catches runaway queries
            },
        },
    )

    # Slow query log: warn on any statement exceeding the configured threshold.
    _install_slow_query_logging(_engine, settings.slow_query_threshold_ms)

    _async_session_factory = async_sessionmaker(
        bind=_engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )

    # Import models so their tables and relationships are registered on
    # ``Base`` before the schema check; Alembic still owns the DDL.
    from musehub.db import muse_cli_models  # noqa: F401, PLC0415
    from musehub.db.schema_check import assert_schema_matches_orm  # noqa: PLC0415

    await assert_schema_matches_orm(_engine, Base)
    logger.info("✅ Database initialised — ORM and schema are in sync")
| 118 | |
def get_engine() -> AsyncEngine:
    """Return the module-level async engine created by ``init_db()``.

    Raises:
        RuntimeError: if the engine has not been initialized.
    """
    engine = _engine
    if engine is not None:
        return engine
    raise RuntimeError("Database not initialized. Call init_db() first.")
| 124 | |
async def close_db() -> None:
    """Dispose the engine and clear the module-level engine and session factory.

    Does nothing when no engine has been initialized.
    """
    global _engine, _async_session_factory

    engine = _engine
    if engine is None:
        return

    await engine.dispose()
    _engine = None
    _async_session_factory = None
    logger.info("Database connection closed")
| 134 | |
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a session that is committed on success and rolled back on error.

    Intended as a FastAPI dependency::

        @app.get("/users")
        async def get_users(db: AsyncSession = Depends(get_db)):
            ...

    Raises:
        RuntimeError: if ``init_db()`` has not been called.
    """
    factory = _async_session_factory
    if factory is None:
        raise RuntimeError("Database not initialized. Call init_db() first.")

    async with factory() as session:
        try:
            yield session
            # Only reached when the consumer resumed the generator normally.
            await session.commit()
        except Exception:
            # Covers exceptions from the consumer and from commit() itself.
            await session.rollback()
            raise
| 154 | |
def AsyncSessionLocal() -> AsyncSession:
    """Create a new async session for use outside FastAPI dependencies.

    The caller owns the returned session, typically via::

        async with AsyncSessionLocal() as session:
            ...

    Raises:
        RuntimeError: if ``init_db()`` has not been called.
    """
    if (factory := _async_session_factory) is not None:
        return factory()
    raise RuntimeError("Database not initialized. Call init_db() first.")
File History
1 commit
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32
fix: fall back to DB ancestry check when mpack-only fast-fo…
Sonnet 4.6
patch
7 days ago