I'll try to be as complete as possible in this issue.
I'm using Sanic, an ASGI Python framework, and I built a database manager on top of it. This database manager uses a ContextVar to give access to the current db instance anywhere in the code.
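For reference, the pattern in isolation looks like this (a minimal, self-contained sketch with illustrative names, not my actual code):

    from contextvars import ContextVar

    context = ContextVar('context', default=None)

    def handle_request():
        # set() returns a token that allows restoring the previous value later
        token = context.set({'db': 'per-request session object'})  # illustrative payload
        try:
            print(context.get())  # any code called from here sees the same object
        finally:
            context.reset(token)  # restore the previous value

    handle_request()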
Here's the code related to the database:
database.py
# -*- coding:utf-8 -*-
from sqlalchemy import exc, event
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession as SQLAlchemyAsyncSession
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import Pool, QueuePool, NullPool
from sqlalchemy.exc import OperationalError
from contextvars import ContextVar
from sentry_sdk import push_scope, capture_exception
from sanic import Sanic

class EngineNotInitialisedError(Exception):
    pass

class DBSessionContext:
    def __init__(self, read_session: Session, write_session: Session, commit_on_exit: bool = True) -> None:
        self.read_session = read_session
        self.write_session = write_session
        self.commit_on_exit = commit_on_exit
        self.token = None
        self._read = None
        self._write = None

    def _disable_flush(self, *args, **kwargs):
        raise NotImplementedError('Unable to flush a read-only session.')

    async def close(self, exc_type=None, exc_value=None, traceback=None):
        if self._write:
            try:
                if exc_value and getattr(exc_value, 'status_code', 500) > 300:
                    await self._write.rollback()
                else:
                    await self._write.commit()
            except Exception:
                pass
            try:
                await self._write.close()
            except OperationalError as e:
                if e.orig.args[0] != 2013:  # Lost connection to MySQL server during query
                    raise e

        if self._read:
            try:
                await self._read.close()
            except OperationalError as e:
                if e.orig.args[0] != 2013:  # Lost connection to MySQL server during query
                    raise e

    def set_token(self, token):
        self.token = token

    @property
    def read(self) -> Session:
        if not self._read:
            self._read = self.read_session()
            self._read.flush = self._disable_flush
        return self._read

    @property
    def write(self) -> Session:
        if not self._write:
            self._write = self.write_session()
        return self._write

class AsyncSession(SQLAlchemyAsyncSession):
    async def execute(self, statement, **parameters):
        return await super().execute(statement, parameters)

    async def first(self, statement, **parameters):
        executed = await self.execute(statement, **parameters)
        return executed.first()

    async def all(self, statement, **parameters):
        executed = await self.execute(statement, **parameters)
        return executed.all()

class DBSession:
    def __init__(self):
        self.app = None
        self.read_engine = None
        self.read_session = None
        self.write_engine = None
        self.write_session = None
        self._session = None
        self.context = ContextVar("context", default=None)
        self.commit_on_exit = True

    def init_app(self, app: Sanic) -> None:
        self.app = app
        self.commit_on_exit = self.app.config.get('DATABASE_COMMIT_ON_EXIT', cast=bool, default=True)

        self.read_engine = create_async_engine(
            self.app.config.get('DATABASE_READ_URL'),
            connect_args={
                'connect_timeout': self.app.config.get('DATABASE_CONNECT_TIMEOUT', cast=int, default=3)
            },
            **{
                'echo': self.app.config.get('DATABASE_ECHO', cast=bool, default=False),
                'echo_pool': self.app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
                # used to create a connection pool instance using the connection parameters given in the URL
                'poolclass': QueuePool,
                # if the pool class is not NullPool and this is True, enables the connection pool
                # "pre-ping" feature that tests connections for liveness upon each checkout
                'pool_pre_ping': self.app.config.get('DATABASE_POOL_PRE_PING', cast=bool, default=True),
                # the number of connections to allow in connection pool "overflow"
                'max_overflow': self.app.config.get('DATABASE_MAX_OVERFLOW', cast=int, default=10),
                # the number of connections to keep open inside the connection pool
                'pool_size': self.app.config.get('DATABASE_POOL_SIZE', cast=int, default=100),
                # recycle connections after the given number of seconds has passed
                'pool_recycle': self.app.config.get('DATABASE_POOL_RECYCLE', cast=int, default=3600),
                # number of seconds to wait before giving up on getting a connection from the pool
                'pool_timeout': self.app.config.get('DATABASE_POOL_TIMEOUT', cast=int, default=5),
            }
        )

        # @see https://writeonly.wordpress.com/2009/07/16/simple-read-only-sqlalchemy-sessions/
        self.read_session = sessionmaker(
            bind=self.read_engine,
            expire_on_commit=False,
            class_=AsyncSession,
            autoflush=False,
            autocommit=False
        )

        self.write_engine = create_async_engine(
            self.app.config.get('DATABASE_WRITE_URL'),
            connect_args={
                'connect_timeout': self.app.config.get('DATABASE_CONNECT_TIMEOUT', cast=int, default=3)
            },
            **{
                'echo': self.app.config.get('DATABASE_ECHO', cast=bool, default=False),
                'echo_pool': self.app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
                'poolclass': NullPool,  # no pooling for writes: a fresh connection per checkout
            }
        )

        self.write_session = sessionmaker(
            bind=self.write_engine,
            expire_on_commit=False,
            class_=AsyncSession,
            autoflush=True
        )
    async def __aenter__(self):
        session_ctx = DBSessionContext(self.read_session, self.write_session, self.commit_on_exit)
        session_ctx.set_token(self.context.set(session_ctx))
        return session_ctx

    async def __aexit__(self, exc_type, exc_value, traceback):
        session_ctx = self.context.get()
        try:
            await session_ctx.close(exc_type, exc_value, traceback)
        except Exception:
            pass
        self.context.reset(session_ctx.token)

    @property
    def read(self) -> Session:
        return self.context.get().read

    @property
    def write(self) -> Session:
        return self.context.get().write

@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
    '''Listener for Pool checkout events that pings every connection before using.
    Implements pessimistic disconnect handling strategy. See also:
    http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic'''
    cursor = dbapi_con.cursor()
    try:
        cursor.execute("SELECT 1")
    except exc.OperationalError as ex:
        if ex.args[0] in (2006,   # MySQL server has gone away
                          2013,   # Lost connection to MySQL server during query
                          2055):  # Lost connection to MySQL server at '%s', system error: %d
            # caught by the pool, which will retry with a new connection
            raise exc.DisconnectionError()
        else:
            raise
    cursor.close()

db = DBSession()
This configuration allows me to run something like:
from sanic.response import json

from models import User
from database import db

@app.get('/user')
async def get_user(request):
    async with db:
        users = User.find_all()  # special function in the Model that returns all users

    return json({'items': [{'id': x.id} for x in users]})
The __aenter__ and, mostly, the __aexit__ of the DBSession class (and the underlying DBSessionContext) handle everything when the code leaves the async with block, including any exceptions that occurred.
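In other words, __aexit__ always runs on the way out of the block and receives the exception info, which close() uses to decide between commit and rollback. A minimal, self-contained illustration of the protocol (unrelated to my actual classes):

    import asyncio

    class Demo:
        async def __aenter__(self):
            print('enter: context created, ContextVar set')
            return self

        async def __aexit__(self, exc_type, exc_value, traceback):
            # runs whether or not the body raised; exc_* carry the exception info
            print('exit:', exc_type)
            return False  # do not suppress exceptions

    async def main():
        async with Demo():
            print('body')

    asyncio.run(main())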
The issue I'm having is that, from time to time, the following error is reported to Sentry:
The garbage collector is trying to clean up connection <AdaptedConnection <asyncmy.connection.Connection object at 0x7f290c50dd30>>. This feature is unsupported on unsupported on asyncio dbapis that lack a "terminate" feature, since no IO can be performed at this stage to reset the connection. Please close out all connections when they are no longer used, calling close() or using a context manager to manage their lifetime.
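As far as I understand, the message asks for connections to be scoped explicitly, along these lines (a sketch with a placeholder DSN, not my actual code):

    import asyncio
    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import create_async_engine

    async def main():
        engine = create_async_engine('mysql+asyncmy://user:pass@localhost/db')  # placeholder DSN
        # the context manager returns the connection to the pool on exit,
        # so nothing is left for the garbage collector to clean up
        async with engine.connect() as conn:
            await conn.execute(text('SELECT 1'))
        await engine.dispose()  # closes pooled connections before the event loop goes away

    asyncio.run(main())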
I don't understand why this is happening. Even more odd: I often get this error on a function that doesn't use the database at all (the async with db is still present, but nothing inside the block touches the database). The body of that function is just a network call:
import requests
from sanic.response import text

@app.get('/notify')
async def get_user(request):
    async with db:
        # synchronous HTTP call; nothing here touches the database
        requests.post('https://service.com/notify', data={'some': 'data'})

    return text('ok')
Here are my assumptions, but I'm hoping to have a clearer view on the issue:
- Assumption 1: Since the read engine uses a QueuePool, maybe the __aexit__ call to close doesn't really close the connection, and as such the connection remains open, causing the "The garbage collector is trying to clean up connection" issue later on (see the sketch after this list).
- Assumption 2: The connection is opened by the check_connection checkout listener and remains open, causing the "garbage collector" issue.
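To illustrate Assumption 1, here is a minimal sketch of what I believe close() does with a QueuePool (placeholder DSN, not my actual code): the connection is checked back into the pool rather than closed, and only dispose() really closes it.

    import asyncio
    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

    async def main():
        # async engines use a queue-based pool by default
        engine = create_async_engine('mysql+asyncmy://user:pass@localhost/db')  # placeholder DSN
        session = AsyncSession(bind=engine)
        await session.execute(text('SELECT 1'))
        await session.close()  # checks the connection back into the pool; it stays open
        print(engine.sync_engine.pool.checkedin())  # 1: the DBAPI connection is still alive
        await engine.dispose()  # this is what actually closes pooled connections

    asyncio.run(main())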
Any idea why I'm having that "garbage collector" issue?
I'm using:
- sanic==22.9.0
- sqlalchemy[asyncio]==1.4.41
- asyncmy==0.2.5