I am running an Airflow job to load data into a table. The task is:
- query a database -> get the results in a pandas DataFrame -> pass the result set to worker processes -> each worker processes its rows and loads the data into a different database.
The following is a simplified version of the DAG file:
from process import process  # process.py defines the task callable shown below
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.python import PythonOperator

LOADING = PythonOperator(
    task_id='LOADING',
    python_callable=process,
    op_kwargs={
        'source_DB': MySqlHook(mysql_conn_id='source_DB'),
        'destination_DB': MySqlHook(mysql_conn_id='destination_DB')
    },
    dag=dag,
)

start >> LOADING >> end
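Side note on the DAG snippet: MySqlHook(...) is called at the top level of the file, so the hooks are instantiated on every scheduler parse, not only when the task actually runs. A minimal sketch of the usual alternative, where only the connection ID strings go through op_kwargs and the hooks are created inside the callable (the process signature is adjusted to match):

def process(source_conn_id, destination_conn_id):
    # hooks are created here, at task run time, not at DAG parse time
    source_DB = MySqlHook(mysql_conn_id=source_conn_id)
    destination_DB = MySqlHook(mysql_conn_id=destination_conn_id)
    ...

LOADING = PythonOperator(
    task_id='LOADING',
    python_callable=process,
    op_kwargs={
        'source_conn_id': 'source_DB',
        'destination_conn_id': 'destination_DB',
    },
    dag=dag,
)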
This is the code of the task, in process.py:
import os
import logging
import billiard as mp

CUR_DIR = os.path.abspath(os.path.dirname(__file__))


def process(source_DB, destination_DB):
    get_data = open(f"{CUR_DIR}/path/to/get_data.sql").read()
    data = source_DB.get_pandas_df(
        sql=get_data,
        parameters={}
    )
    with mp.Pool(processes=mp.cpu_count(), initializer=init_worker,
                 initargs=(destination_DB,)) as pool:
        items = [(idx, row) for idx, row in data.iterrows()]
        pool.map(load_data, items)


def init_worker(destination_DB):
    # each worker process opens its own connection to the destination database
    global conn
    conn = destination_DB.get_conn()


def load_data(args):
    index, data = args
    insert_sql = open(f"{CUR_DIR}/path/to/insert.sql").read()
    conn.autocommit(True)
    destination_DB_cur = conn.cursor()
    params = {
        'para1': data['para1'],
        'para2': data['para2']
    }
    # substitute each {word} token in the SQL template with its value
    for word, replacement in params.items():
        insert_sql = insert_sql.replace('{' + str(word) + '}', str(replacement))
    try:
        destination_DB_cur.execute(insert_sql)
    except Exception as e:
        print(e)
    destination_DB_cur.close()
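A note on the token substitution in load_data: the MySQLdb connection that MySqlHook.get_conn() returns also supports parameterized queries, which sidesteps quoting and injection problems. A minimal sketch, assuming insert.sql is rewritten with pyformat placeholders such as %(para1)s instead of the {para1} tokens (target_table and the column names below are placeholders):

def load_data(args):
    index, data = args
    # insert.sql rewritten with pyformat placeholders, e.g.:
    #   INSERT INTO target_table (col1, col2) VALUES (%(para1)s, %(para2)s)
    insert_sql = open(f"{CUR_DIR}/path/to/insert.sql").read()
    conn.autocommit(True)
    destination_DB_cur = conn.cursor()
    try:
        # the driver quotes and escapes the values itself
        destination_DB_cur.execute(insert_sql, {
            'para1': data['para1'],
            'para2': data['para2'],
        })
    except Exception as e:
        print(e)
    finally:
        destination_DB_cur.close()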
The job runs without any errors, but I have noticed that the loaded data is sometimes duplicated three times.
I did some research: some say it has to do with the billiard library, others say I have to use connection pooling to ensure synchronization and coordination.
Can someone please help me understand what exactly the issue is and what to do to prevent it from happening?
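For context on the duplication itself: whatever the trigger turns out to be (a retried task, a re-run of the whole pool, or billiard forking more workers than expected), making the insert idempotent guarantees that replays cannot multiply rows. A minimal sketch using MySQL's INSERT ... ON DUPLICATE KEY UPDATE (INSERT IGNORE works similarly); target_table, the key, and the columns are made-up names:

# one-time schema change, assuming para1 uniquely identifies a row:
#   ALTER TABLE target_table ADD UNIQUE KEY uq_para1 (para1);
insert_sql = """
    INSERT INTO target_table (para1, para2)
    VALUES (%(para1)s, %(para2)s)
    ON DUPLICATE KEY UPDATE para2 = VALUES(para2)
"""
destination_DB_cur.execute(insert_sql, {
    'para1': data['para1'],
    'para2': data['para2'],
})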