tux.database.client
¶
Classes:
Name | Description |
---|---|
DatabaseClient | Singleton wrapper around an async SQLAlchemy engine / session factory. |
Classes¶
DatabaseClient()
¶
Singleton wrapper around an async SQLAlchemy engine / session factory.
This class provides a clean async interface for database operations using SQLModel and SQLAlchemy. All interactions go through an :pyclass:~sqlalchemy.ext.asyncio.AsyncSession
.
Methods:
Name | Description |
---|---|
is_connected | Return True if the engine/metadata are initialised. |
connect | Initialise the async engine and create all tables. |
disconnect | Dispose the engine and tear-down the connection pool. |
session | Return an async SQLAlchemy session context-manager. |
transaction | Synonym for :pyfunc: |
Source code in tux/database/client.py
Functions¶
is_connected() -> bool
¶
connect(database_url: str | None = None, *, echo: bool = False) -> None
async
¶
Initialise the async engine and create all tables.
The first call performs initialisation - every subsequent call is a no-op (but will log a warning).
Source code in tux/database/client.py
async def connect(self, database_url: str | None = None, *, echo: bool = False) -> None:
"""Initialise the async engine and create all tables.
The *first* call performs initialisation - every subsequent call is a
no-op (but will log a warning).
"""
if self.is_connected():
logger.warning("Database engine already connected - reusing existing engine")
return
database_url = database_url or os.getenv("DATABASE_URL")
if not database_url:
error_msg = "DATABASE_URL environment variable must be set before connecting to the DB"
raise RuntimeError(error_msg)
# SQLAlchemy async engines expect an async driver (e.g. asyncpg for Postgres)
# If the user provided a sync URL, we attempt to coerce it to async-pg URL.
if database_url.startswith("postgresql://") and "+asyncpg" not in database_url:
database_url = database_url.replace("postgresql://", "postgresql+asyncpg://", 1)
logger.debug(f"Creating async SQLAlchemy engine (echo={echo})")
self._engine = create_async_engine(database_url, echo=echo, future=True)
self._session_factory = async_sessionmaker(self._engine, expire_on_commit=False, class_=AsyncSession)
# Create tables (auto-create helps during development & tests, migrations
# are handled by Alembic).
async with self._engine.begin() as conn: # type: ignore[attr-defined]
await conn.run_sync(SQLModel.metadata.create_all) # type: ignore[attr-defined]
logger.info("Successfully connected to database via SQLModel/SQLAlchemy")
disconnect() -> None
async
¶
Dispose the engine and tear-down the connection pool.
Source code in tux/database/client.py
async def disconnect(self) -> None:
"""Dispose the engine and tear-down the connection pool."""
if not self.is_connected():
logger.warning("Database engine not connected - nothing to disconnect")
return
assert self._engine is not None # mypy
await self._engine.dispose() # type: ignore[attr-defined]
self._engine = None
self._session_factory = None
logger.info("Disconnected from database")
session() -> AsyncGenerator[AsyncSession]
async
¶
Return an async SQLAlchemy session context-manager.
Source code in tux/database/client.py
@asynccontextmanager
async def session(self) -> AsyncGenerator[AsyncSession]:
"""Return an async SQLAlchemy session context-manager."""
if not self.is_connected():
error_msg = "Database engine not initialised - call connect() first"
raise RuntimeError(error_msg)
assert self._session_factory is not None # mypy
async with self._session_factory() as sess: # type: ignore[attr-defined]
try:
yield sess
await sess.commit() # type: ignore[attr-defined]
except Exception:
await sess.rollback() # type: ignore[attr-defined]
raise