Thursday, 1 September 2022

Run a custom task asynchronously in Airflow using the existing Celery setup

I have a running Airflow deployment that uses Celery with Redis. By default it sends each DAG task to a Celery worker. I want to dispatch a custom Celery task from Python code inside one of the DAG's tasks.

In tasks.py I have the following code:

from airflow.configuration import conf
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
from celery import Celery

# Reuse the same Celery configuration that Airflow's CeleryExecutor uses
if conf.has_option('celery', 'celery_config_options'):
    celery_configuration = conf.getimport('celery', 'celery_config_options')
else:
    celery_configuration = DEFAULT_CELERY_CONFIG

app = Celery(
    conf.get('celery', 'CELERY_APP_NAME'),
    config_source=celery_configuration,
    include=["dags.tasks"],
)
app.autodiscover_tasks(force=True)

# Debug output
print("here")
print(conf.get('celery', 'CELERY_APP_NAME'))
print(celery_configuration)
print(app)


@app.task(name='maximum')
def maximum(x=10, y=11):
    """Return the larger of x and y."""
    print(x)
    if x > y:
        return x
    else:
        return y


# List the task names registered on this app
tasks = app.tasks.keys()
print(tasks)
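
The `@app.task(name='maximum')` decorator registers the function in `app.tasks` under the explicit name `'maximum'`, which is why that key shows up when the registry is printed. The toy registry below mimics only that bookkeeping in plain Python; `TaskRegistry` and its `task` method are hypothetical stand-ins, not Celery's API.

```python
from typing import Callable, Dict


class TaskRegistry:
    """Hypothetical stand-in for a Celery app's task registry (app.tasks)."""

    def __init__(self) -> None:
        self.tasks: Dict[str, Callable] = {}

    def task(self, name: str) -> Callable[[Callable], Callable]:
        """Decorator that stores the function under an explicit task name."""
        def register(func: Callable) -> Callable:
            self.tasks[name] = func
            return func
        return register


app = TaskRegistry()


@app.task(name="maximum")
def maximum(x: int = 10, y: int = 11) -> int:
    # Same logic as the task in tasks.py
    return x if x > y else y


print(list(app.tasks.keys()))  # ['maximum']
```

In this model, as in Celery, the entry only exists in a process that has actually imported the module defining the task.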

I call it from one of the DAG's tasks:

    result = maximum.apply_async(kwargs={'x': 5, 'y': 4})
    print(result)
    print(result.get(timeout=5))
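
`apply_async` returns an `AsyncResult` right away, and `get(timeout=5)` then blocks until a result is stored, re-raising any exception recorded for the task. As a rough analogy only (this uses the standard library's `concurrent.futures`, not Celery), the same submit-then-wait pattern looks like this:

```python
from concurrent.futures import ThreadPoolExecutor


def maximum(x: int = 10, y: int = 11) -> int:
    return x if x > y else y


with ThreadPoolExecutor(max_workers=1) as pool:
    # submit() returns a Future immediately, much like apply_async() returns an AsyncResult
    future = pool.submit(maximum, x=5, y=4)
    # result(timeout=5) waits up to 5 seconds, like AsyncResult.get(timeout=5),
    # and re-raises any exception raised inside maximum()
    value = future.result(timeout=5)

print(value)  # 5
```

The important difference is that a Celery call crosses a process boundary: only the task name and its arguments travel through the broker.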

When I run it, I get the following error:

  File "/home/airflow/.local/lib/python3.7/site-packages/celery/result.py", line 336, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/home/airflow/.local/lib/python3.7/site-packages/celery/result.py", line 329, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/vine/promises.py", line 234, in throw
    reraise(type(exc), exc, tb)
  File "/home/airflow/.local/lib/python3.7/site-packages/vine/utils.py", line 30, in reraise
    raise value
celery.exceptions.NotRegistered: 'maximum'

Yet when I list the registered tasks with the code above:

tasks = app.tasks.keys()
print(tasks)

the output is:

dict_keys(['celery.chunks', 'airflow.executors.celery_executor.execute_command', 'maximum', 'celery.backend_cleanup', 'celery.chord_unlock', 'celery.group', 'celery.map', 'celery.accumulate', 'celery.chain', 'celery.starmap', 'celery.chord'])

So 'maximum' is among the registered tasks, yet calling it raises NotRegistered.
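
One plausible explanation, assuming the worker is the one started by Airflow (`airflow celery worker`): `app.tasks` is a per-process registry. The keys printed above come from the process that imported tasks.py, while the worker that receives the message resolves the name `'maximum'` in its own registry, which only holds what that worker imported (such as `execute_command`). The toy model below uses hypothetical names (`NotRegistered`, `send`, `worker_consume`, `get_result`), not Celery's API, to show how that yields an error that only surfaces when the caller reads the result.

```python
from typing import Any, Callable, Dict


class NotRegistered(KeyError):
    """Hypothetical stand-in for celery.exceptions.NotRegistered."""


def maximum(x: int = 10, y: int = 11) -> int:
    return x if x > y else y


def execute_command(command: list) -> None:
    """Placeholder for the task Airflow's own worker knows about."""


# Each process builds its own registry from the modules it has imported.
client_registry: Dict[str, Callable] = {"maximum": maximum}
# Assumption: the worker never imported tasks.py, so 'maximum' is missing here.
worker_registry: Dict[str, Callable] = {"execute_command": execute_command}

# Stand-in for the result backend: task id -> stored value or exception.
result_backend: Dict[str, Any] = {}


def send(task_id: str, name: str, kwargs: dict) -> dict:
    """Producer side: only the task name and arguments go into the message."""
    return {"id": task_id, "task": name, "kwargs": kwargs}


def worker_consume(message: dict, registry: Dict[str, Callable]) -> None:
    """Worker side: look the name up locally and store a result or an error."""
    func = registry.get(message["task"])
    if func is None:
        # Unknown name: record the failure so the caller sees it later.
        result_backend[message["id"]] = NotRegistered(message["task"])
        return
    try:
        result_backend[message["id"]] = func(**message["kwargs"])
    except Exception as exc:
        result_backend[message["id"]] = exc


def get_result(task_id: str) -> Any:
    """Caller side: return the stored value, re-raising a stored exception."""
    outcome = result_backend[task_id]
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


print("maximum" in client_registry)  # True: what the DAG task printed

worker_consume(send("task-1", "maximum", {"x": 5, "y": 4}), worker_registry)
try:
    get_result("task-1")
except NotRegistered as exc:
    print("NotRegistered:", exc)  # NotRegistered: 'maximum'

# A worker whose registry includes the task (it imported tasks.py) succeeds.
worker_consume(send("task-2", "maximum", {"x": 5, "y": 4}),
               {**worker_registry, **client_registry})
print(get_result("task-2"))  # 5
```

If this is what is happening, note that `include=["dags.tasks"]` only configures the app created in tasks.py, not the app Airflow's worker runs. One option would be to add Celery's `imports` setting to a custom configuration dict referenced by Airflow's `[celery] celery_config_options` option (the same option tasks.py reads), provided `dags.tasks` is importable on the worker.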


