Wednesday, 3 March 2021

Problem with Multiprocessing and Deadlocking in Python3

I'm having a problem with my multiprocessing, and I'm afraid it's a rather simple fix that I'm just not implementing correctly. I've been researching what can cause the problem, but all I'm really finding is people recommending the use of a queue to prevent it. That doesn't seem to be stopping it (again, I may just be implementing the queue incorrectly). I've been at this a couple of days now and I was hoping I could get some help. Thanks in advance!
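For reference, the pattern those answers seem to describe is to drain the queue before joining the workers, since a child process can't fully exit until its queue feeder thread has flushed its buffered items. Here is a minimal sketch of that pattern as I understand it (the worker function and counts here are just stand-ins, not my actual code):

import multiprocessing as mp


def worker(n, q):
    # Stand-in worker: puts exactly one result on the queue, like each package does
    q.put(f'worker {n} has completed')


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(i, q)) for i in range(4)]
    for p in procs:
        p.start()
    # Drain the queue BEFORE joining: joining first can deadlock because the
    # children block until their queued data has been consumed
    results = [q.get() for _ in procs]
    for p in procs:
        p.join()
    print(results)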

import csv
import multiprocessing as mp
import os
import queue
import sys
import time

import connections
import packages
import profiles


def execute_extract(package, profiles, q):
    # This is the package execution for the extract
    # It fires fine and will print the starting message below
    started_at = time.monotonic()
    print(f"Starting {package.packageName}")
    # Initialize the connection handles so the finally block can check them
    # safely even if opening a connection raises
    oracle_connection = None
    engine = None
    try:
        oracle_connection = connections.getOracleConnection(profiles['oracle'], 1)
        engine = connections.getSQLConnection(profiles['system'], 1)
        path = os.path.join(os.getcwd(), 'csv_data', package.packageName + '.csv')
        cursor = oracle_connection.cursor()

        if os.path.exists(path):
            os.remove(path)

        f = open(path, 'w')
        chunksize = 100000
        offset = 0
        row_total = 0

        csv_writer = csv.writer(f, delimiter='^', lineterminator='\n')
        # I am having to do some data cleansing.  I know this is not the most efficient way to do this, but currently
        # it is what I am limited to (a helper version of this step is sketched after the function)
        while True:
            cursor.execute(package.query + f'\r\n OFFSET {offset} ROWS\r\n FETCH NEXT {chunksize} ROWS ONLY')
            rows = cursor.fetchmany(chunksize)
            if not rows:
                break
            for row in rows:
                new_row = list(row)
                new_row.append(package.sourceId[0])
                new_row.append('')
                for i, item in enumerate(new_row):
                    if isinstance(item, float):
                        new_row[i] = int(item)
                    elif isinstance(item, str):
                        # encode() returns bytes, which csv would write as b'...';
                        # decode back to get a plain ASCII string
                        new_row[i] = item.encode('ascii', 'replace').decode('ascii')
                csv_writer.writerow(tuple(new_row))
                row_total += 1

            offset += chunksize

        f.close()
        # I know that execution is at least reaching this point.  I can watch the CSV files grow as more and more 
        # rows are added to the for all the packages What I never get are either the success message or error message
        # below, and there are never any entries placed in the tables 
        query = f"BULK INSERT {profiles['system'].database.split('_')[0]}_{profiles['system'].database.split('_')[1]}_test_{profiles['system'].database.split('_')[2]}.{package.destTable} FROM \"{path}\" WITH (FIELDTERMINATOR='^', ROWTERMINATOR='\\n');"
        engine.cursor().execute(query)
        engine.commit()

        end_time = time.monotonic() - started_at
        print(
            f"{package.packageName} has completed.  Total rows inserted: {row_total}.  Total execution time: {end_time} seconds\n")
        os.remove(path)
    except Exception as e:
        print(f'An error has occurred for package {package.packageName}.\r\n {repr(e)}')
    finally:
        # Here is where I am trying to add an item to the queue so the get method in the main def will pick it up
        # and remove it from the queue
        q.put(f'{package.packageName} has completed')
        if oracle_connection:
            oracle_connection.close()
        if engine:
            engine.close()

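As an aside, here is how the cleansing step above could be factored into a helper; clean_row is just a name I made up for this sketch:

def clean_row(row, source_id):
    # Hypothetical helper: append the metadata columns, then normalize floats
    # to ints and force strings down to plain ASCII for the CSV
    out = list(row) + [source_id, '']
    return tuple(
        int(v) if isinstance(v, float)
        else v.encode('ascii', 'replace').decode('ascii') if isinstance(v, str)
        else v
        for v in out
    )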

if __name__ == '__main__':
    # Setting mp creation type
    ctx = mp.get_context('spawn')
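    # 'spawn' starts each child in a fresh interpreter, so everything a worker
    # needs (including this queue) has to be passed in explicitly as arguments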
    q = ctx.Queue()

    # For the ETL I generate a list of class objects that hold relevant information.  profs contains a list of
    # connection objects (credentials, connection strings, etc.); packages contains the information to run the
    # extract (destination tables, query string, package name for logging, etc.)
    profs = profiles.get_conn_vars(sys.argv[1])
    packages = packages.get_etl_packages(profs)

    processes = []
    # I'm trying to track both individual package execution time and overall time so I can get an estimate on rows 
    # per second 
    start_time = time.monotonic()

    sqlConn = connections.getSQLConnection(profs['system'])
    # Here I'm executing a SQL command to truncate all my staging tables to ensure they are empty and will not 
    # generate any key violations 
    sqlConn.execute(
        f"USE [{profs['system'].database.split('_')[0]}_{profs['system'].database.split('_')[1]}"
        f"_test_{profs['system'].database.split('_')[2]}]\r\n"
        "Exec Sp_msforeachtable @command1='Truncate Table ?',"
        "@whereand='and Schema_Id=Schema_id(''my_schema'')'")

    # Here is where I start generating a process per package to try and get all packages to run simultaneously
    for package in packages:
        p = ctx.Process(target=execute_extract, args=(package, profs, q,))
        processes.append(p)
        p.start()

    # Here is my attempt at managing the queue.  This is a monstrosity of fixes I've tried to get this to work
    # (a pool-based alternative is sketched after the listing)
    results = []
    while True:
        try:
            # Block briefly for a result; raises queue.Empty if nothing arrives
            result = q.get(timeout=0.01)
            results.append(result)
        except queue.Empty:
            pass
        # Stop once every worker has exited and the queue is fully drained
        if all(p.exitcode is not None for p in processes) and q.empty():
            break

    for p in processes:
        p.join()

    # Closing out the end time and writing the overall execution time in minutes.
    end_time = time.monotonic() - start_time
    print(f'Total execution time of {end_time / 60} minutes.')
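For comparison, one alternative I've seen suggested is to let a process pool collect the results instead of hand-rolling a queue loop. A minimal sketch of that idea, with worker standing in for execute_extract:

import multiprocessing as mp


def worker(name):
    # Stand-in for execute_extract; returning the status line replaces the q.put
    return f'{name} has completed'


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    names = ['pkg_a', 'pkg_b', 'pkg_c']
    with ctx.Pool(processes=len(names)) as pool:
        # imap_unordered yields each result as soon as its worker finishes
        for result in pool.imap_unordered(worker, names):
            print(result)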


from Problem with Multiprocessing and Deadlocking in Python3
