gabriel / musehub public
database.py python
165 lines 5.9 KB
Raw
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32 fix: fall back to DB ancestry check when mpack-only fast-fo… Sonnet 4.6 patch 7 days ago
1 """Async SQLAlchemy database setup for PostgreSQL."""
2
3 import logging
4 import time
5 from typing import AsyncGenerator
6
7 from sqlalchemy import event
8 from sqlalchemy.engine import Connection as SyncConnection
9
10 # Opaque DBAPI-specific types for SQLAlchemy cursor-execute event parameters.
11 # These are driver-dependent and not meaningfully typed by SQLAlchemy stubs.
12 type _DBAPICursor = str | bytes | int | None
13 type _ExecParams = tuple[str | int | float | None, ...] | dict[str, str | int | float | None] | None
14 type _ExecContext = str | int | None
15 from sqlalchemy.ext.asyncio import (
16 AsyncEngine,
17 AsyncSession,
18 async_sessionmaker,
19 create_async_engine,
20 )
21 from sqlalchemy.orm import DeclarativeBase
22
23 from musehub.config import settings as settings
24
25 logger = logging.getLogger(__name__)
26
class Base(DeclarativeBase):
    """Declarative base class shared by the SQLAlchemy models."""
30
# Module-level engine and session factory. Both stay ``None`` until
# ``init_db()`` assigns them; ``close_db()`` resets them to ``None`` again.
_engine: AsyncEngine | None = None
_async_session_factory: async_sessionmaker[AsyncSession] | None = None
34
def get_database_url() -> str:
    """Return the database URL configured in ``settings.database_url``.

    Raises:
        RuntimeError: If the configured URL is empty or unset.
    """
    if configured_url := settings.database_url:
        return configured_url

    raise RuntimeError(
        "DATABASE_URL must be set. "
        "Example: postgresql+asyncpg://musehub:musehub@localhost:5432/musehub"
    )
44
def _redact_database_url(url: str) -> str:
    """Return the part of a database URL that is safe to write to logs.

    Everything up to and including the last ``@`` (the ``user:password``
    section) is dropped, and so is everything from the first ``?`` onwards,
    because query parameters can carry credentials too (``?password=...``).
    A URL containing neither character is returned unchanged.
    """
    without_userinfo = url.rsplit("@", 1)[-1]
    return without_userinfo.split("?", 1)[0]


async def init_db() -> None:
    """Initialize database engine and session factory.

    Schema is managed by Alembic (``alembic upgrade head`` runs in
    the container entrypoint *before* the app starts). This function
    only creates the async engine and session factory, optionally installs
    the slow-query log, and then checks that the ORM matches the schema.

    Raises:
        RuntimeError: If no database URL is configured (raised by
            ``get_database_url()``).

    Any exception raised by ``assert_schema_matches_orm`` propagates to the
    caller; at that point the module-level engine has already been assigned.
    """
    global _engine, _async_session_factory

    database_url = get_database_url()
    # Log only the host/database part: never the userinfo or query string.
    logger.info("Initializing database: %s", _redact_database_url(database_url))

    # pool_size=20/max_overflow=40: handles ~20 concurrent requests without
    # queueing, with headroom for bursty agent traffic.
    engine = create_async_engine(
        database_url,
        echo=settings.debug,
        pool_size=20,
        max_overflow=40,
        pool_recycle=1800,
        pool_timeout=settings.db_pool_timeout,
        # Verify connections before checkout. If a connection was returned to
        # the pool in a broken transaction state (e.g. after an asyncpg-level
        # error during autobegin), pre_ping discards it and issues a fresh one
        # rather than propagating the "cannot use Connection.transaction() in a
        # manually started transaction" error to the next request.
        pool_pre_ping=True,
        # TCP keepalive every 30s — prevents asyncpg idle-connection reaping
        # during long-running push/fetch operations.
        connect_args={
            "server_settings": {
                "tcp_keepalives_idle": "30",
                "statement_timeout": "60000",  # 60s — catches runaway queries
            },
        },
    )
    _engine = engine

    # Slow query log: warn on any statement exceeding the configured threshold.
    # A threshold of zero or less disables the listeners entirely.
    slow_threshold_ms = settings.slow_query_threshold_ms
    if slow_threshold_ms > 0:

        @event.listens_for(engine.sync_engine, "before_cursor_execute")
        def _before_cursor_execute(
            conn: SyncConnection,
            cursor: _DBAPICursor,
            statement: str,
            parameters: _ExecParams,
            context: _ExecContext,
            executemany: bool,
        ) -> None:
            conn.info["query_start_time"] = time.monotonic()

        @event.listens_for(engine.sync_engine, "after_cursor_execute")
        def _after_cursor_execute(
            conn: SyncConnection,
            cursor: _DBAPICursor,
            statement: str,
            parameters: _ExecParams,
            context: _ExecContext,
            executemany: bool,
        ) -> None:
            started_at = conn.info.get("query_start_time")
            if started_at is None:
                # No start time was recorded for this connection. Measuring
                # from 0 would yield a huge bogus duration and a false
                # SLOW QUERY warning, so skip the check instead.
                return
            elapsed_ms = (time.monotonic() - started_at) * 1000
            if elapsed_ms >= slow_threshold_ms:
                logger.warning(
                    "SLOW QUERY (%.1f ms): %.200s",
                    elapsed_ms,
                    statement.replace("\n", " "),
                )

    _async_session_factory = async_sessionmaker(
        bind=engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )

    # Import models so relationships resolve even though Alembic owns DDL.
    from musehub.db import muse_cli_models  # noqa: F401
    from musehub.db.database import Base  # noqa: PLC0415
    from musehub.db.schema_check import assert_schema_matches_orm  # noqa: PLC0415

    await assert_schema_matches_orm(engine, Base)
    logger.info("✅ Database initialised — ORM and schema are in sync")
118
def get_engine() -> AsyncEngine:
    """Return the module-level engine.

    Raises:
        RuntimeError: If no engine is set, i.e. ``init_db()`` has not been
            called (or ``close_db()`` has since reset it).
    """
    engine = _engine
    if engine is not None:
        return engine
    raise RuntimeError("Database not initialized. Call init_db() first.")
124
async def close_db() -> None:
    """Dispose of the engine and reset the module-level database state.

    Does nothing when no engine is currently set.
    """
    global _engine, _async_session_factory

    if not _engine:
        return

    await _engine.dispose()
    _engine, _async_session_factory = None, None
    logger.info("Database connection closed")
134
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session for use as a dependency.

    Usage:
        @app.get("/users")
        async def get_users(db: AsyncSession = Depends(get_db)):
            ...

    The session is committed once the consumer hands control back without an
    error. If an ``Exception`` is raised — by the consumer or by the commit
    itself — the session is rolled back and the exception is re-raised.

    Raises:
        RuntimeError: If the session factory has not been set up by
            ``init_db()``.
    """
    session_factory = _async_session_factory
    if session_factory is None:
        raise RuntimeError("Database not initialized. Call init_db() first.")

    async with session_factory() as db_session:
        try:
            yield db_session
            # Commit stays inside the try so a failed commit is rolled back too.
            await db_session.commit()
        except Exception:
            await db_session.rollback()
            raise
154
def AsyncSessionLocal() -> AsyncSession:
    """Create a new async session directly (for non-FastAPI contexts).

    Usage:
        async with AsyncSessionLocal() as session:
            ...

    Raises:
        RuntimeError: If the session factory has not been set up by
            ``init_db()``.
    """
    session_factory = _async_session_factory
    if session_factory is not None:
        return session_factory()
    raise RuntimeError("Database not initialized. Call init_db() first.")
File History 1 commit
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32 fix: fall back to DB ancestry check when mpack-only fast-fo… Sonnet 4.6 patch 7 days ago