Wednesday, 18 January 2023

Pytest + SqlAlchemy + FastAPI. Cannot Get data added by sync engine using async engine in tests

TL;DR In my application there are 2 parts. First, part is API which only serves data (has multiple GET endpoints). Second, the event consumer consumes from pubsub and writes to db. I am using sync DB for event consumer and async db for API. My code works perfectly fine but tests are failing. In my tests, I insert data using a sync session, then try to get with an async session. However, I get no data returned when I call API using async client (which uses async db). Can you help me to find out the problem?

Edit: I think I need to start async engine from savepoint that sync engine lastly created after inserting all data. Is there a way to pass savepoint to another fixture?


I read at multiple resources and thought that if I create client,db for sync and async, use recipe for Joining a Session into an External Transaction and add my data using sync db session and use async client with async db to query data I will have proper result. But, it seems like I am missing something. Because after I insert I add pdb.set_trace() and check db, data seems to be inserted. However, after switching to async client db seems empty. Since my query results are empty. You can find the code I used below. I think problem is related to my configuration in conftest, if you need more code or any extra information I will share it right away!

conftest.py

import sqlalchemy as sa
from fastapi.testclient import TestClient
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from httpx import AsyncClient
import pytest

@pytest.fixture(scope="session")
def engine(config: TestConfig):
    return sa.create_engine(config.DATABASE_URL)

@pytest.fixture(scope="session")
def async_engine(config: TestConfig):
    return create_async_engine(
        config.ASYNC_DATABASE_URL, pool_size=10, echo=True, max_overflow=10
    )


@pytest.fixture(scope="session")
def TestingSessionLocal(engine):
    return sessionmaker(
        autocommit=False,
        autoflush=False,
        bind=engine,
    )


@pytest.fixture(scope="session")
def TestingSessionLocalAsync(async_engine):
    return sessionmaker(
        bind=async_engine,
        autoflush=False,
        future=True,
        class_=AsyncSession,
        expire_on_commit=False,
    )


@pytest.fixture
def db(engine, TestingSessionLocal):
    connection = engine.connect()
    transaction = connection.begin()
    session = TestingSessionLocal(bind=connection)
    session.begin_nested()

    @sa.event.listens_for(session, "after_transaction_end")
    def end_savepoint(session, transaction):
        if transaction.nested and not transaction._parent.nested:
            session.begin_nested()

    yield session
    # Rollback the overall transaction, restoring the state before the test ran.
    session.close()
    transaction.rollback()
    connection.close()

@pytest.fixture
async def async_db(async_engine, TestingSessionLocalAsync):

    async with async_engine.connect() as conn:
        await conn.begin()
        await conn.begin_nested()

        async_session = TestingSessionLocalAsync()

        @sa.event.listens_for(async_session.sync_session, "after_transaction_end")
        def end_savepoint(session, transaction):
            if conn.closed:
                return
            if not conn.in_nested_transaction():
                conn.sync_connection.begin_nested()

        yield async_session

    await async_session.close()
    await conn.close()

@pytest.fixture
def app(config):
    return create_app(config)

@pytest.fixture
def client(app, db):
    def override_get_db():
        yield db

    app.dependency_overrides[get_db] = override_get_db
    yield TestClient(app)
    del app.dependency_overrides[get_db]
    
@pytest.fixture()
async def async_client(app, async_db):
    def override_get_async_db():
        yield async_db

    app.dependency_overrides[get_async_db] = override_get_async_db
    async with AsyncClient(
        app=app,
        base_url="http://localhost:8080",
        headers={"Content-Type": "application/json"},
    ) as client:
        yield client

    del app.dependency_overrides[get_async_db]

test.py

@pytest.mark.asyncio
async def test_thing(async_client, db: Session):
    
    thing = Thing()
    db.add(thing)
    db.flush()
    db.refresh(thing)
    # db.commit() # Tried adding db.commit() here result has not changed
    res1 = await async_db.execute(select(thing.name).select_from(thing))
    res2 = db.execute(select(thing.name).select_from(thing))
    pdb.set_trace()

    get_thing = await async_client.get(
        f"/thing/{thing.id}",
    )
    assert len(get_thing) == 1

pdb result

enter image description here

It basically fails in assert line in test.py outputing len(get_thing) is 0.



from Pytest + SqlAlchemy + FastAPI. Cannot Get data added by sync engine using async engine in tests

No comments:

Post a Comment