Thursday, 25 February 2021

Run celery tasks concurrently using pytest

I'm trying to integration-test a concurrent Celery task in my Django app. I want the task to actually run concurrently on a worker during the pytest integration test, but I'm having trouble making that work.

Let's say I have the following basic celery task:

import time

from celery import shared_task

@shared_task
def sleep_task(secs):
    print(f"Sleeping for {secs} seconds...")
    for i in range(secs):
        time.sleep(1)  # one second per iteration, secs seconds total
        print(f"\t{i + 1}")
    return "DONE"

Running the task concurrently in a script works fine:

# script
sleep_task.delay(3)
sleep_task.delay(3)

# output
Sleeping for 3 seconds...
Sleeping for 3 seconds...
1
1
2
2
3
3

However, I can't replicate this asynchronous behavior in a test. Within conftest.py I set up the broker and the result backend:

import pytest

pytest_plugins = ("celery.contrib.pytest",)

@pytest.fixture(scope="session")
def celery_config():
    return {"broker_url": "memory://", "result_backend": "rpc://"}

And here is my test. The celery_session_app and celery_session_worker fixtures set up the Celery app and worker used for testing (docs):

def test_concurrent_sleep_tasks(celery_session_app, celery_session_worker):
    sleep_task.delay(3)
    sleep_task.delay(3)

# output
Sleeping for 3 seconds...
1
2
3
Sleeping for 3 seconds...
1
2
3

It appears the tasks are running synchronously. Is there any way to run the tasks asynchronously?
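One avenue worth exploring (a sketch, not a verified fix): the embedded test worker started by the celery.contrib.pytest plugin defaults to the "solo" pool with a concurrency of 1, which would serialize tasks exactly as shown above. Overriding the plugin's celery_worker_pool and celery_worker_parameters fixtures in conftest.py might allow tasks to interleave; the specific pool name and parameter values below are assumptions, not tested here.

import pytest

# Sketch: override the plugin's defaults so the test worker can run
# more than one task at a time (the default is pool="solo", concurrency=1).

@pytest.fixture(scope="session")
def celery_worker_pool():
    # Use a thread-based pool instead of the default "solo" pool.
    return "threads"

@pytest.fixture(scope="session")
def celery_worker_parameters():
    # Extra keyword arguments passed through to the test worker;
    # allow two tasks to execute concurrently.
    return {"concurrency": 2}

It may also help to block on the results inside the test, so the worker isn't torn down before the tasks finish:

def test_concurrent_sleep_tasks(celery_session_app, celery_session_worker):
    r1 = sleep_task.delay(3)
    r2 = sleep_task.delay(3)
    # Wait for both tasks so their output is captured before teardown.
    assert r1.get(timeout=10) == "DONE"
    assert r2.get(timeout=10) == "DONE"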
