Wednesday, 5 June 2019

Airflow - creating dynamic Tasks from XCOM

I'm attempting to generate a set of dynamic tasks from a XCOM variable. In the XCOM I'm storing a list and I want to use each element of the list to dynamically create a downstream task.

My use case is that I have an upstream operator that checks a sftp server for files and returns a list of file names matching specific criteria. I want to create dynamic downstream tasks for each of the file names returned.

I've simplified it to the below, and while it works I feel like its not an idiomatic airflow solution. In my use case, I would write a python function that's called from a python operator that pulls the value from xcom and returns it, instead of using the pusher function.

I understand that while I can create a custom operator that combines both I don't think creating a throwaway operator is good practice and I'm hoping there's another solution.

from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    "owner": "test",
    "depends_on_past": False,
    "start_date": datetime(2018, 10, 27),
    "email": ["test@mctest.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "email_on_success": False,
    "retries": 0,
    "provide_context": True
}

dag = DAG("test",  default_args=default_args, schedule_interval="@daily", catchup=False)


def pusher(**context):
    return ['a', 'b', 'c', 'd', 'e']

pusher_task = PythonOperator(
    task_id='pusher_task',
    dag=dag,
    python_callable=pusher  
)

def bash_wrapper(task, **context):
    return BashOperator(
        task_id='dynamic'+task,
        dag=dag,
        bash_command='date'
    )

end = BashOperator(task_id='end', dag=dag, bash_command='echo task has ended')


pusher_task >> [bash_wrapper(task) for task in pusher()] >> end



from Airflow - creating dynamic Tasks from XCOM

No comments:

Post a Comment