Monday 28 November 2022

Process a large file using Apache Airflow Task Groups

I need to process a zip file (that contains a text file) using task groups in Airflow. The number of lines can vary from 1 to 50 million. I want to read the text file inside the zip file, process each line, write the processed lines to another text file, zip it, update Postgres tables, and call another DAG to transmit this new zip file to an SFTP server.

Since a single task can take too long to process a file with millions of lines, I would like to process the file using a task group. That is, each task in the task group can process a certain number of lines and transform them. For example, if we receive a file with 15 million lines, six tasks can be called to process 2.5 million lines each.
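
For instance, the [start, stop) offsets for those chunks could be computed up front; here is a rough sketch (make_offsets is just an illustrative helper, not something from my DAG):

def make_offsets(total_lines, chunk_size):
    # islice()'s stop is exclusive, so each pair is a [start, stop) range
    return [(start, min(start + chunk_size, total_lines))
            for start in range(0, total_lines, chunk_size)]


print(make_offsets(15_000_000, 2_500_000))
# [(0, 2500000), (2500000, 5000000), (5000000, 7500000),
#  (7500000, 10000000), (10000000, 12500000), (12500000, 15000000)]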

But I am confused about how to make the task group dynamic and pass the offsets to each task. Below is a sample that I tried with fixed offsets in islice():

import io
import zipfile
from datetime import datetime
from itertools import islice

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

# minimal defaults so the sample runs; adjust for your environment
default_args = {"owner": "airflow", "start_date": datetime(2022, 11, 28)}


def start_task(**context):
    print("starting the Main task...")


def apply_transformation(line):
    return f"{line}_NEW"


def task1(**context):
    data = context['dag_run'].conf
    file_name = data.get("file_name")
    with zipfile.ZipFile(file_name) as zf:
        for name in zf.namelist():
            with io.TextIOWrapper(zf.open(name), encoding="UTF-8") as fp:
                for record in islice(fp, 0, 2000000):  # lines 0..1999999; islice's stop is exclusive
                    apply_transformation(record)


def task2(**context):
    data = context['dag_run'].conf
    file_name = data.get("file_name")
    with zipfile.ZipFile(file_name) as zf:
        for name in zf.namelist():
            with io.TextIOWrapper(zf.open(name), encoding="UTF-8") as fp:
                for record in islice(fp, 2000000, 4000000):  # contiguous with task1's chunk
                    apply_transformation(record)


def task3(**context):
    data = context['dag_run'].conf
    file_name = data.get("file_name")
    with zipfile.ZipFile(file_name) as zf:
        for name in zf.namelist():
            with io.TextIOWrapper(zf.open(name), encoding="UTF-8") as fp:
                for record in islice(fp, 4000000, 6000000):
                    apply_transformation(record)


def task4(**context):
    data = context['dag_run'].conf
    file_name = data.get("file_name")
    with zipfile.ZipFile(file_name) as zf:
        for name in zf.namelist():
            with io.TextIOWrapper(zf.open(name), encoding="UTF-8") as fp:
                for record in islice(fp, 6000000, 8000000):
                    apply_transformation(record)


def task5(**context):
    data = context['dag_run'].conf
    file_name = data.get("file_name")
    with zipfile.ZipFile(file_name) as zf:
        for name in zf.namelist():
            with io.TextIOWrapper(zf.open(name), encoding="UTF-8") as fp:
                for record in islice(fp, 8000000, 10000000):
                    apply_transformation(record)


def final_task(**context):
    print("This is the final task to update postgres tables and call SFTP DAG...")


with DAG("main",
         schedule_interval=None,
         default_args=default_args, catchup=False) as dag:

    st = PythonOperator(
        task_id='start_task',
        dag=dag,
        python_callable=start_task
    )

    with TaskGroup(group_id='task_group_1') as tg1:
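        # the five tasks below have no dependencies on each other,
        # so Airflow can run them in parallel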
        t1 = PythonOperator(
            task_id='task1',
            python_callable=task1,
            dag=dag,
        )

        t2 = PythonOperator(
            task_id='task2',
            python_callable=task2,
            dag=dag,
        )

        t3 = PythonOperator(
            task_id='task3',
            python_callable=task3,
            dag=dag,
        )

        t4 = PythonOperator(
            task_id='task4',
            python_callable=task4,
            dag=dag,
        )

        t5 = PythonOperator(
            task_id='task5',
            python_callable=task5,
            dag=dag,
        )

    ft = PythonOperator(
        task_id='final_task',
        dag=dag,
        python_callable=final_task
    )

    st >> tg1 >> ft

After applying the transformation to each line, I want to collect these transformed lines from the different tasks, merge them into a new file, and do the rest of the operations in final_task.
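
For instance, I was thinking each task could write its transformed lines to a numbered part file, and final_task could stitch the parts together, roughly like this sketch (the /tmp paths and part_N.txt naming are placeholders I made up, and it assumes all tasks share a filesystem):

import glob
import re
import shutil
import zipfile


def final_task(**context):
    out_name = "transformed.txt"  # placeholder output name
    # sort the part files numerically so e.g. part_10 comes after part_2
    parts = sorted(glob.glob("/tmp/part_*.txt"),
                   key=lambda p: int(re.search(r"part_(\d+)", p).group(1)))
    with open(out_name, "w", encoding="UTF-8") as out:
        for part in parts:
            with open(part, encoding="UTF-8") as fp:
                shutil.copyfileobj(fp, out)
    with zipfile.ZipFile("transformed.zip", "w",
                         compression=zipfile.ZIP_DEFLATED) as zf:
        zf.write(out_name)
    # ...then update the Postgres tables and trigger the SFTP DAG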

Or are there any other methods to process large files with millions of lines in parallel?
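
One idea I have come across is Airflow's dynamic task mapping (Airflow 2.3+), where the number of mapped task instances is decided at runtime, so the chunk count could follow the file size instead of being hardcoded. A rough sketch of what I imagine, reusing the imports and apply_transformation from the sample above (I have not verified this end to end):

from airflow.decorators import task


@task
def make_chunks(chunk_size=2_500_000, **context):
    # count the lines once, then return one [start, stop) pair per chunk
    file_name = context["dag_run"].conf["file_name"]
    with zipfile.ZipFile(file_name) as zf:
        name = zf.namelist()[0]
        with io.TextIOWrapper(zf.open(name), encoding="UTF-8") as fp:
            total = sum(1 for _ in fp)
    return [[start, min(start + chunk_size, total)]
            for start in range(0, total, chunk_size)]


@task
def process_chunk(chunk, **context):
    # one mapped task instance per chunk, each reading only its own slice
    start, stop = chunk
    file_name = context["dag_run"].conf["file_name"]
    with zipfile.ZipFile(file_name) as zf:
        name = zf.namelist()[0]
        with io.TextIOWrapper(zf.open(name), encoding="UTF-8") as fp:
            for record in islice(fp, start, stop):
                apply_transformation(record)


# inside the DAG definition, something like:
#     st >> process_chunk.expand(chunk=make_chunks()) >> ft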


