Tuesday, 25 October 2022

The garbage collector is trying to clean up connection asyncmy.connection.Connection

I'll try to be as complete as possible in this issue.

I'm using Sanic, an ASGI Python framework, and I built a Database manager on top of this.

This database manager uses the ContextVar to give access to my current db instance everywhere in the code.

Here's the code related to the database:

database.py

# -*- coding:utf-8 -*-

from sqlalchemy import exc, event
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession as SQLAlchemyAsyncSession
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import Pool, QueuePool, NullPool
from sqlalchemy.exc import OperationalError
from contextvars import ContextVar
from sentry_sdk import push_scope, capture_exception
from sanic import Sanic


class EngineNotInitialisedError(Exception):
    """Raised when a database engine is used before ``init_app`` has run."""


class DBSessionContext:
    """Per-request holder for one read-only and one read/write session.

    Sessions are created lazily via the ``read``/``write`` properties, so a
    request that never touches the database never opens a connection.
    """

    def __init__(self, read_session: Session, write_session: Session, commit_on_exit: bool = True) -> None:
        # Session factories (sessionmaker instances), not live sessions.
        self.read_session = read_session
        self.write_session = write_session
        # When False, the write session is rolled back instead of committed
        # on close.  (Fix: the original stored this flag but never used it.)
        self.commit_on_exit = commit_on_exit

        # ContextVar reset token, set by the owning DBSession via set_token().
        self.token = None
        self._read = None
        self._write = None

    def _disable_flush(self, *args, **kwargs):
        """Stand-in for Session.flush on the read session: writes are forbidden."""
        raise NotImplementedError('Unable to flush a read-only session.')

    async def close(self, exc_type=None, exc_value=None, traceback=None):
        """Finalise both sessions.

        The write session is committed only when ``commit_on_exit`` is True
        and the request did not fail (HTTP status > 300); otherwise it is
        rolled back.  ``close()`` failures caused by an already-lost MySQL
        connection (error 2013) are ignored.
        """
        if self._write:
            try:
                if (exc_value and getattr(exc_value, 'status_code', 500) > 300) or not self.commit_on_exit:
                    await self._write.rollback()
                else:
                    await self._write.commit()
            except Exception:
                # Best-effort: a failed commit/rollback must not prevent the
                # session from being closed (and its connection released) below.
                pass

            try:
                await self._write.close()
            except OperationalError as e:
                if e.orig.args[0] != 2013:  # Lost connection to MySQL server during query
                    raise e

        if self._read:
            try:
                await self._read.close()
            except OperationalError as e:
                if e.orig.args[0] != 2013:  # Lost connection to MySQL server during query
                    raise e

    def set_token(self, token):
        # Remember the ContextVar token so DBSession.__aexit__ can reset it.
        self.token = token

    @property
    def read(self) -> Session:
        """Lazily create and memoise the read-only session; flushing disabled."""
        if not self._read:
            self._read = self.read_session()
            self._read.flush = self._disable_flush

        return self._read

    @property
    def write(self) -> Session:
        """Lazily create and memoise the read/write session."""
        if not self._write:
            self._write = self.write_session()

        return self._write


class AsyncSession(SQLAlchemyAsyncSession):
    """AsyncSession variant whose helpers take bind parameters as kwargs."""

    async def execute(self, statement, **parameters):
        # Forward the keyword bind parameters as a single mapping.
        return await super().execute(statement, parameters)

    async def first(self, statement, **parameters):
        """Run *statement* and return the first result row."""
        result = await self.execute(statement, **parameters)
        return result.first()

    async def all(self, statement, **parameters):
        """Run *statement* and return every result row."""
        result = await self.execute(statement, **parameters)
        return result.all()


class DBSession:
    """Application-wide database manager.

    Owns one read engine (QueuePool) and one write engine (NullPool), and
    exposes the current request's sessions through a ContextVar so they are
    reachable anywhere in the code via ``db.read`` / ``db.write``.
    """

    def __init__(self):
        self.app = None
        self.read_engine = None
        self.read_session = None
        self.write_engine = None
        self.write_session = None
        self._session = None
        # Holds the DBSessionContext for the current task/request.
        self.context = ContextVar("context", default=None)
        self.commit_on_exit = True

    def init_app(self, app: Sanic) -> None:
        # Fixed: the annotation referenced undefined `Surge`; the framework
        # class imported at the top of the file is `Sanic`.
        self.app = app
        self.commit_on_exit = self.app.config.get('DATABASE_COMMIT_ON_EXIT', cast=bool, default=True)

        self.read_engine = create_async_engine(
            self.app.config.get('DATABASE_READ_URL'),
            connect_args={
                'connect_timeout': self.app.config.get('DATABASE_CONNECT_TIMEOUT', cast=int, default=3)
            },
            **{
                'echo': self.app.config.get('DATABASE_ECHO', cast=bool, default=False),
                'echo_pool': self.app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
                'poolclass': QueuePool,  # will be used to create a connection pool instance using the connection parameters given in the URL
                # if pool_class is not NullPool:

                # if True will enable the connection pool "pre-ping" feature that tests connections for liveness upon each checkout
                'pool_pre_ping': self.app.config.get('DATABASE_POOL_PRE_PING', cast=bool, default=True),
                # the number of connections to allow in connection pool "overflow"
                'max_overflow': self.app.config.get('DATABASE_MAX_OVERFLOW', cast=int, default=10),
                # the number of connections to keep open inside the connection pool
                'pool_size': self.app.config.get('DATABASE_POOL_SIZE', cast=int, default=100),
                # this setting causes the pool to recycle connections after the given number of seconds has passed
                'pool_recycle': self.app.config.get('DATABASE_POOL_RECYCLE', cast=int, default=3600),
                # number of seconds to wait before giving up on getting a connection from the pool
                'pool_timeout': self.app.config.get('DATABASE_POOL_TIMEOUT', cast=int, default=5),
            }
        )

        # @see https://writeonly.wordpress.com/2009/07/16/simple-read-only-sqlalchemy-sessions/
        self.read_session = sessionmaker(
            bind=self.read_engine,
            expire_on_commit=False,
            class_=AsyncSession,
            autoflush=False,
            autocommit=False
        )

        self.write_engine = create_async_engine(
            self.app.config.get('DATABASE_WRITE_URL'),
            connect_args={
                'connect_timeout': self.app.config.get('DATABASE_CONNECT_TIMEOUT', cast=int, default=3)
            },
            **{
                'echo': self.app.config.get('DATABASE_ECHO', cast=bool, default=False),
                'echo_pool': self.app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
                'poolclass': NullPool,  # will be used to create a connection pool instance using the connection parameters given in the URL
            }
        )

        self.write_session = sessionmaker(
            bind=self.write_engine,
            expire_on_commit=False,
            class_=AsyncSession,
            autoflush=True
        )

    async def __aenter__(self):
        # Create a fresh per-request context and publish it in the ContextVar,
        # keeping the reset token so __aexit__ can restore the previous value.
        session_ctx = DBSessionContext(self.read_session, self.write_session, self.commit_on_exit)
        session_ctx.set_token(self.context.set(session_ctx))

        return session_ctx

    async def __aexit__(self, exc_type, exc_value, traceback):
        session_ctx = self.context.get()
        try:
            # Commit/rollback and close both sessions; close() already ignores
            # lost-connection errors, this guard is a last-resort best-effort.
            await session_ctx.close(exc_type, exc_value, traceback)
        except Exception:
            pass

        self.context.reset(session_ctx.token)

    @property
    def read(self) -> Session:
        """Read-only session of the current request context."""
        return self.context.get().read

    @property
    def write(self) -> Session:
        """Read/write session of the current request context."""
        return self.context.get().write


@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
    '''Listener for Pool checkout events that pings every connection before using.
    Implements pessimistic disconnect handling strategy. See also:
    http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic'''

    cursor = dbapi_con.cursor()
    try:
        cursor.execute("SELECT 1")
    except exc.OperationalError as ex:
        # NOTE(review): dbapi_con is the raw driver connection, so the driver's
        # own OperationalError (not sqlalchemy.exc.OperationalError) may be what
        # is raised here — confirm against the asyncmy driver.
        if ex.args[0] in (2006,   # MySQL server has gone away
                          2013,   # Lost connection to MySQL server during query
                          2055):  # Lost connection to MySQL server at '%s', system error: %d
            raise exc.DisconnectionError()  # caught by pool, which will retry with a new connection
        else:
            raise
    finally:
        # Fixed: always release the cursor; the original leaked it on the
        # exception path because close() only ran after a successful ping.
        cursor.close()


# Module-level singleton: call db.init_app(app) once at startup, then use
# `async with db:` per request to get request-scoped sessions.
db = DBSession()

This configuration allows me to run something like:

from models import User
from database import db

@app.get('/user')
async def get_user(request):
    async with db:
        users = User.find_all()  # Special function in the Model that returns all users
        # Fixed: original had mismatched brackets and a dict comprehension
        # where a list of per-user dicts was intended.
        return json({'items': [{'id': x.id} for x in users]})

The __aenter__ and especially the __aexit__ of the DBSession class (and the underlying DBSessionContext) handle everything when the code exits the async with block, including any exceptions that occurred.

The issue I'm having, is that from time to time, I have the following error reported at Sentry:

The garbage collector is trying to clean up connection <AdaptedConnection <asyncmy.connection.Connection object at 0x7f290c50dd30>>. This feature is unsupported on asyncio dbapis that lack a "terminate" feature, since no IO can be performed at this stage to reset the connection. Please close out all connections when they are no longer used, calling close() or using a context manager to manage their lifetime.

I don't understand why this is happening. Even more odd is that I often get this error on a function call that doesn't use the database at all (the async with db is still present, but the inside doesn't use the database at all).

The content of that function is a simple network call:

import requests

@app.get('/notify')
async def get_user(request):
    async with db:
        # NOTE(review): requests.post is a synchronous HTTP call inside an
        # async handler — it blocks the event loop for the duration of the
        # request; consider an async HTTP client.
        requests.post('https://service.com/notify', data={'some': 'data'})

    return text('ok')

Here are my assumptions, but I'm hoping to have a clearer view on the issue:

  • Assumption 1: Since the read is using a QueuePool, maybe the __aexit__ call to close doesn't really close the connection, and as such, the connection remain open, causing the "The garbage collector is trying to clean up connection" issue later on.
  • Assumption 2: The connection is made at the check_connection and remains open, causing the "garbage collector" issue

Any idea why I'm having that "garbage collector" issue?

I'm using :

  • sanic==22.9.0
  • sqlalchemy[asyncio]==1.4.41
  • asyncmy==0.2.5


from The garbage collector is trying to clean up connection asyncmy.connection.Connection

No comments:

Post a Comment