I have a running Airflow setup with Celery and Redis. By default this sends each DAG's tasks to the Celery workers. I want to run a custom Celery task from Python code inside one of the DAG's tasks.
In tasks.py I have the following code.
from airflow.configuration import conf
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
from celery import Celery
from celery import shared_task

if conf.has_option('celery', 'celery_config_options'):
    celery_configuration = conf.getimport('celery', 'celery_config_options')
else:
    celery_configuration = DEFAULT_CELERY_CONFIG

app = Celery(conf.get('celery', 'CELERY_APP_NAME'),
             config_source=celery_configuration,
             include=["dags.tasks"])
app.autodiscover_tasks(force=True)

print("here")
print(conf.get('celery', 'CELERY_APP_NAME'))
print(celery_configuration)
print(app)

@app.task(name='maximum')
def maximum(x=10, y=11):
    # print("here")
    print(x)
    if x > y:
        return x
    else:
        return y

tasks = app.tasks.keys()
print(tasks)
I am calling this from one of the DAG's tasks:
max = maximum.apply_async(kwargs={'x': 5, 'y': 4})
print(max)
print(max.get(timeout=5))
I am getting:
File "/home/airflow/.local/lib/python3.7/site-packages/celery/result.py", line 336, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "/home/airflow/.local/lib/python3.7/site-packages/celery/result.py", line 329, in throw
self.on_ready.throw(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/vine/promises.py", line 234, in throw
reraise(type(exc), exc, tb)
File "/home/airflow/.local/lib/python3.7/site-packages/vine/utils.py", line 30, in reraise
raise value
celery.exceptions.NotRegistered: 'maximum'
When I print the registered tasks as above:
tasks = app.tasks.keys()
print(tasks)
the output is:
dict_keys(['celery.chunks', 'airflow.executors.celery_executor.execute_command', **'maximum'**, 'celery.backend_cleanup', 'celery.chord_unlock', 'celery.group', 'celery.map', 'celery.accumulate', 'celery.chain', 'celery.starmap', 'celery.chord'])
So 'maximum' is there in the registered tasks, yet the call still fails with NotRegistered.
from Run a custom task asynchronously in airflow using existing celery