"""Postgres DB realization""" from sqlalchemy.ext import asyncio from server.config import AppConfig from server.infra.db.abc import AbstractDB, ConnectError class AsyncDB(AbstractDB): engine: asyncio.AsyncEngine async_session: asyncio.async_sessionmaker[asyncio.AsyncSession] def __init__(self, cnf: AppConfig): con_arg = {} if "postgresql+asyncpg" in str(cnf.db_uri): con_arg = {"server_settings": {"search_path": cnf.db_search_path}} self.engine = asyncio.create_async_engine( str(cnf.db_uri), echo=bool(cnf.app_debug), connect_args=con_arg, pool_recycle=1800 ) # self.engine.execution_options(stream_results=True) if self.engine is None: raise ConnectError session = asyncio.async_sessionmaker(self.engine, expire_on_commit=False) if session is None: raise ConnectError self.async_session = session async def connect(self): """connect with DB""" pass @property def session(self): return asyncio.async_sessionmaker(self.engine, expire_on_commit=False) def new_session(self): return asyncio.async_sessionmaker(self.engine, expire_on_commit=False)()