Thursday 29 July 2021

Joblib and other parallel tasks within Airflow

I've used Joblib and Airflow together in the past without running into this issue. I'm trying to run a job through Airflow that performs a parallel computation using Joblib. When the Airflow job starts, I see the following warning:

UserWarning: Loky-backed parallel loops cannot be called in a multiprocessing, setting n_jobs=1

Tracing the warning back to its source, I see that it is raised by the following method of the LokyBackend class in the joblib package (the MultiprocessingBackend class contains similar logic):

def effective_n_jobs(self, n_jobs):
    """Determine the number of jobs which are going to run in parallel"""
    if n_jobs == 0:
        raise ValueError('n_jobs == 0 in Parallel has no meaning')
    elif mp is None or n_jobs is None:
        # multiprocessing is not available or disabled, fallback
        # to sequential mode
        return 1
    elif mp.current_process().daemon:
        # Daemonic processes cannot have children
        if n_jobs != 1:
            warnings.warn(
                'Loky-backed parallel loops cannot be called in a'
                ' multiprocessing, setting n_jobs=1',
                stacklevel=3)
        return 1
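To make the condition on mp.current_process().daemon above concrete, here is a minimal standard-library sketch of how that flag looks from inside a child process. It is not joblib or Airflow code: the helper names (_report_daemon_flag, daemon_flag_in_child) are hypothetical, and it assumes a POSIX system where the "fork" start method is available.

```python
import multiprocessing as mp


def _report_daemon_flag(conn):
    # Runs in the child: send back the same flag joblib inspects.
    conn.send(mp.current_process().daemon)
    conn.close()


def daemon_flag_in_child(daemon):
    """Start a child process with the given daemon setting and
    return the value of current_process().daemon seen inside it."""
    ctx = mp.get_context("fork")  # assumption: POSIX-only start method
    parent_conn, child_conn = ctx.Pipe(duplex=False)
    proc = ctx.Process(
        target=_report_daemon_flag, args=(child_conn,), daemon=daemon
    )
    proc.start()
    flag = parent_conn.recv()
    proc.join()
    return flag


if __name__ == "__main__":
    print("daemon child sees:", daemon_flag_in_child(True))
    print("non-daemon child sees:", daemon_flag_in_child(False))
```

If the code calling joblib runs inside a process started like the first case, the branch quoted above would apply and n_jobs would be forced to 1.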

The issue is that I've run a similar function with Joblib and Airflow before without triggering this condition, which sets n_jobs to 1. I'm wondering whether this is some kind of versioning issue (I'm using Airflow 2.X and Joblib 1.X) or whether there are settings in Airflow that can fix it. I looked at older versions of Joblib and even downgraded to Joblib 0.4.0, but that didn't solve the issue. I'm more hesitant to downgrade Airflow because of differences in the API, database connections, etc.


Edit:

Here is the code I've been running in Airflow:

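import joblib
from airflow import DAG
from airflow.operators.python import PythonOperator

# DEFAULT_ARGS is not shown here; it is assumed to be defined earlier in the DAG file.

def test_parallel():
    out = joblib.Parallel(n_jobs=-1, backend="loky")(
        joblib.delayed(lambda a: a + 1)(i) for i in range(20)
    )

with DAG("test", default_args=DEFAULT_ARGS, schedule_interval="0 8 * * *") as test:
    run_test = PythonOperator(
        task_id="test",
        python_callable=test_parallel,
    )

    run_test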

And the output in the Airflow logs:

[2021-07-27 10:41:29,890] {logging_mixin.py:104} WARNING - /data01/code/virtualenv/alpha/lib/python3.8/site-packages/joblib/parallel.py:733 UserWarning: Loky-backed parallel loops cannot be called in a multiprocessing, setting n_jobs=1

I launch airflow scheduler and airflow webserver via supervisor. However, the issue persists even if I launch both Airflow processes from the command line. It does not happen when I run the task directly through the Airflow tasks CLI, e.g. airflow tasks test run_test
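One way to compare the two launch modes described above is to log the process context from inside the task itself. The following is a rough diagnostic sketch using only the standard library; describe_process_context and would_force_sequential are hypothetical helper names, not part of Airflow or joblib, and the second helper only mirrors the daemon check quoted from joblib earlier.

```python
import multiprocessing as mp


def describe_process_context():
    """Collect the attributes of the current process that matter
    for joblib's daemon check."""
    proc = mp.current_process()
    return {
        "name": proc.name,
        "pid": proc.pid,
        "daemon": proc.daemon,
    }


def would_force_sequential(context):
    # Mirrors the quoted check: a daemonic process leads to n_jobs=1.
    return bool(context["daemon"])


if __name__ == "__main__":
    ctx = describe_process_context()
    print(ctx)
    print("joblib would fall back to n_jobs=1:", would_force_sequential(ctx))
```

Calling describe_process_context() at the top of test_parallel and printing the result should show whether the scheduler-launched run and the airflow tasks test run differ in the daemon flag.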


