I've used Joblib and Airflow in the past and haven't run into this issue. I'm trying to run a job through Airflow that runs a parallel computation using Joblib. When the Airflow job starts up, I see the following warning:
UserWarning: Loky-backed parallel loops cannot be called in a multiprocessing, setting n_jobs=1
Tracing the warning back to its source, I see that it comes from the following method of the LokyBackend class in the joblib package (similar logic is also in the MultiprocessingBackend class):
def effective_n_jobs(self, n_jobs):
    """Determine the number of jobs which are going to run in parallel"""
    if n_jobs == 0:
        raise ValueError('n_jobs == 0 in Parallel has no meaning')
    elif mp is None or n_jobs is None:
        # multiprocessing is not available or disabled, fallback
        # to sequential mode
        return 1
    elif mp.current_process().daemon:
        # Daemonic processes cannot have children
        if n_jobs != 1:
            warnings.warn(
                'Loky-backed parallel loops cannot be called in a'
                ' multiprocessing, setting n_jobs=1',
                stacklevel=3)
        return 1
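To look at that daemon branch in isolation, here is a minimal sketch that uses only the standard library. It is not joblib's code, and the helper names report_daemon_flag and daemon_flag_in_child are made up for illustration. It starts a child process with and without the daemon flag and reports what mp.current_process().daemon returns inside the child, which is the value the check above reads:

```python
import multiprocessing as mp


def report_daemon_flag(queue):
    # Hypothetical helper: runs in the child and reports the child's own
    # daemon flag, the same attribute effective_n_jobs inspects.
    queue.put(mp.current_process().daemon)


def daemon_flag_in_child(daemon):
    # Hypothetical helper: start one child with the requested daemon
    # setting and return the flag that child observed for itself.
    queue = mp.Queue()
    child = mp.Process(target=report_daemon_flag, args=(queue,), daemon=daemon)
    child.start()
    flag = queue.get(timeout=30)
    child.join()
    return flag


if __name__ == "__main__":
    print("parent daemon flag:", mp.current_process().daemon)
    print("daemonic child:", daemon_flag_in_child(True))
    print("non-daemonic child:", daemon_flag_in_child(False))
```

If the process running the Airflow task behaves like the daemonic child here, any Parallel call made inside it would take the branch that returns 1.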
The issue is that I've run a similar function in Joblib and Airflow before and didn't trigger this condition, which sets n_jobs equal to 1. I'm wondering whether this is some type of versioning issue (I'm using Airflow 2.X and Joblib 1.X) or whether there are settings in Airflow that can fix this. I looked at older versions of Joblib and even downgraded to Joblib 0.4.0, but that didn't solve the issue. I'm more hesitant to downgrade Airflow because of differences in the API, database connections, etc.
Edit:
Here is the code I've been running in Airflow:
def test_parallel():
    out=joblib.Parallel(n_jobs=-1, backend="loky")(joblib.delayed(lambda a: a+1)(i) for i in range(20))

with DAG("test", default_args=DEFAULT_ARGS, schedule_interval="0 8 * * *",) as test:
    run_test = PythonOperator(
        task_id="test",
        python_callable=test_parallel,
    )
    run_test
And the output in the Airflow logs:
[2021-07-27 10:41:29,890] {logging_mixin.py:104} WARNING - /data01/code/virtualenv/alpha/lib/python3.8/site-packages/joblib/parallel.py:733 UserWarning: Loky-backed parallel loops cannot be called in a multiprocessing, setting n_jobs=1
I launch airflow scheduler and airflow webserver via supervisor. However, even if I launch both Airflow processes from the command line, the issue still persists. It doesn't happen, however, when I just run the task via the Airflow task API, e.g. airflow tasks test run_test.
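One way to compare the two situations would be to log, from inside the task, the facts that joblib's check depends on. The following is only a diagnostic sketch: describe_current_process is a hypothetical helper of my own, not part of Airflow or joblib, and it relies only on the standard library.

```python
import multiprocessing as mp
import os


def describe_current_process():
    """Collect the process details that the daemon check relies on."""
    proc = mp.current_process()
    return {
        "name": proc.name,            # e.g. "MainProcess" for a top-level process
        "pid": os.getpid(),
        "parent_pid": os.getppid(),
        "daemon": proc.daemon,        # the flag read by effective_n_jobs
    }


if __name__ == "__main__":
    print(describe_current_process())
```

Calling print(describe_current_process()) at the top of test_parallel, once for a scheduler-launched run and once under airflow tasks test, should show whether the daemon flag differs between the two. If it is True only for the scheduled run, that would be consistent with the warning appearing there and not in the test run.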