Wednesday, 12 July 2023

Data is duplicated three times when inserting into a MySQL DB using the multiprocessing package Billiard

I am running an Airflow job to load data into a table. The task is:

  • query a database -> get the results in a pandas DataFrame -> pass the result set to worker processes -> each worker process processes its rows and loads the data into a different database.

The following is a simplified version of the DAG file:

from process import process  # the task module shown below
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.python import PythonOperator

LOADING = PythonOperator(
    task_id='LOADING',
    python_callable=process,
    op_kwargs={
        'source_DB': MySqlHook(mysql_conn_id='source_DB'),
        'destination_DB': MySqlHook(mysql_conn_id='destination_DB')
    },
    dag=dag,
)

start >> LOADING >> end

This is the code of the task:

import os
import logging
import billiard as mp

CUR_DIR = os.path.abspath(os.path.dirname(__file__))

def process(source_DB, destination_DB):

    with open(f"{CUR_DIR}/path/to/get_data.sql") as f:
        get_data = f.read()

    data = source_DB.get_pandas_df(
        sql=get_data,
        parameters={}
    )

    # One worker per CPU; each worker opens its own connection in init_worker.
    with mp.Pool(processes=mp.cpu_count(), initializer=init_worker, initargs=(destination_DB,)) as pool:
        items = list(data.iterrows())
        pool.map(load_data, items)


def init_worker(destination_DB):
    # Each pool worker opens its own connection to the destination DB.
    global conn
    conn = destination_DB.get_conn()


def load_data(args):
    index, data = args
    with open(f"{CUR_DIR}/path/to/insert.sql") as f:
        insert_sql = f.read()

    conn.autocommit(True)
    destination_DB_cur = conn.cursor()

    params = {
        'para1': data['para1'],
        'para2': data['para2']
    }
    # Substitute each {placeholder} token in the SQL template.
    for word, replacement in params.items():
        insert_sql = insert_sql.replace(
            '{' + str(word) + '}', str(replacement))

    try:
        destination_DB_cur.execute(insert_sql)
    except Exception as e:
        print(e)
    finally:
        destination_DB_cur.close()
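For what it's worth, I know the placeholder replacement could also be done through the driver's own parameter binding instead of editing the SQL string. Roughly like this, assuming the .sql file uses %(para1)s-style placeholders:

def load_data(args):
    index, data = args
    with open(f"{CUR_DIR}/path/to/insert.sql") as f:
        insert_sql = f.read()  # e.g. INSERT ... VALUES (%(para1)s, %(para2)s)

    destination_DB_cur = conn.cursor()
    # The MySQL driver substitutes and escapes the parameters itself.
    destination_DB_cur.execute(insert_sql, {
        'para1': data['para1'],
        'para2': data['para2'],
    })
    destination_DB_cur.close()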

The job runs without any errors, but I have noticed that the loaded data is sometimes duplicated three times.

I did some research: some say it has to do with the billiard library, while others say I have to use connection pooling to ensure synchronization and coordination.
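If I understand it correctly, "connection pooling" would mean sharing a capped set of connections instead of opening a fresh one per worker. A rough sketch of that with mysql.connector (the table name and credentials below are placeholders, not my real setup, and each billiard worker process would still hold its own pool object):

import mysql.connector.pooling

# Placeholder credentials -- stand-ins for the real destination_DB.
pool = mysql.connector.pooling.MySQLConnectionPool(
    pool_name="dest_pool",
    pool_size=4,           # hard cap on simultaneous connections
    host="destination-host",
    user="loader",
    password="secret",
    database="destination_db",
)

def insert_row(para1, para2):
    conn = pool.get_connection()  # borrow a connection from the pool
    try:
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO target_table (col1, col2) VALUES (%s, %s)",
            (para1, para2),
        )
        conn.commit()
        cur.close()
    finally:
        conn.close()  # hands the connection back to the pool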

Can someone please help me understand exactly what the issue is and what I can do to prevent it from happening?
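One idea I had for narrowing this down is to tag every inserted row with the worker's process ID, so the duplicates would show whether the same item is being handled by several pool workers or the whole task is simply running more than once. A sketch, assuming a hypothetical worker_pid column on the target table (conn is the per-worker connection from init_worker):

import os

def load_data(args):
    index, data = args
    destination_DB_cur = conn.cursor()
    # Record which worker process inserted the row.
    destination_DB_cur.execute(
        "INSERT INTO target_table (col1, col2, worker_pid) "
        "VALUES (%s, %s, %s)",
        (data['para1'], data['para2'], os.getpid()),
    )
    destination_DB_cur.close()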


