Tuesday, 14 September 2021

cx_Oracle ThreadPoolExecutor with SessionPool and distributing loads for each thread

I have a use case that consists of loading huge tables from Oracle to Snowflake. The Oracle server sits far away from the Snowflake endpoint, so we run into connection issues when loading tables (views, in fact) larger than 12 GB, whether through a spool script or cx_Oracle.

I was thinking of using ThreadPoolExecutor with at most 4 threads, as a test, together with a SessionPool. That way I get one connection per thread, which is the whole point. But it also means I have to distribute the data fetch across the threads in batches. My question is: how can I achieve this? Is it correct to do something like "SELECT * FROM table WHERE rownum BETWEEN x AND y" (not this exact syntax, I know, but you get my point), or should I rely on OFFSET, ...?
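On the ROWNUM question: a predicate such as `WHERE ROWNUM BETWEEN x AND y` returns no rows once x is greater than 1, because ROWNUM is assigned as rows pass the filter and the first candidate row always gets 1. The usual workaround numbers the rows in an inner query and filters them outside; on Oracle 12c and later the `OFFSET ... FETCH NEXT` clause does the same job. Either way, slices are only stable with an ORDER BY on a unique key. The sketch below is a minimal example, assuming a view called `my_view` with a unique column `id` (both hypothetical), of a QUERY that fits the `:start_pos`/`:end_pos` binds used by the question's `query()` function, plus a hypothetical `slice_bounds()` helper that turns a row count into one bind range per task.

```python
# Hypothetical names: replace my_view and its unique key column id with your own.

# Classic ROWNUM pagination (any Oracle version). A predicate like ROWNUM >= 5
# never matches in the same query block (the first candidate row always gets
# ROWNUM 1), so rows are ordered innermost, numbered in the middle query and
# filtered in the outer query.
QUERY = """
SELECT *
  FROM (SELECT t.*, ROWNUM AS rn
          FROM (SELECT * FROM my_view ORDER BY id) t
         WHERE ROWNUM <= :end_pos)
 WHERE rn >= :start_pos
"""

# Oracle 12c+ row-limiting clause; bind skip_rows = start_pos - 1 and
# num_rows = end_pos - start_pos + 1.
QUERY_12C = """
SELECT *
  FROM my_view
 ORDER BY id
OFFSET :skip_rows ROWS FETCH NEXT :num_rows ROWS ONLY
"""


def slice_bounds(total_rows, slice_size):
    """Return 1-based, inclusive (start_pos, end_pos) pairs covering total_rows."""
    return [(start, min(start + slice_size - 1, total_rows))
            for start in range(1, total_rows + 1, slice_size)]


if __name__ == "__main__":
    # Each pair would become one executor task, e.g.
    # cur.execute(QUERY, start_pos=s, end_pos=e)
    for start_pos, end_pos in slice_bounds(10, 4):
        print(start_pos, end_pos)
```

One trade-off worth checking: each slice re-runs the full ORDER BY, so with a 12 GB view every task may sort the whole view before discarding most of it. If the key is indexed and roughly evenly distributed, filtering on key ranges (for example `WHERE id BETWEEN :lo AND :hi`) may be cheaper than positional paging.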

My idea was that each thread gets a "slice" of the SELECT, fetches the data in batches, and writes the rows to CSV in batches as well, because I'd rather send many small files than one huge file to Snowflake.
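For the "small files" part, one option is to write every fetched batch to its own CSV file with the standard `csv` module. The hypothetical `write_batches()` function below accepts any iterable of row lists, such as successive `cursor.fetchmany()` results, and returns the paths it wrote. The file naming scheme and the `prefix` parameter are assumptions for illustration, not part of the original script.

```python
import csv
import os
import tempfile


def write_batches(batches, out_dir, prefix="slice"):
    """Write each batch of rows to its own CSV file and return the file paths.

    `batches` is any iterable of row lists (for example, successive
    cursor.fetchmany() results); `prefix` is a hypothetical naming scheme.
    """
    paths = []
    for i, rows in enumerate(batches, start=1):
        path = os.path.join(out_dir, f"{prefix}_{i:04d}.csv")
        # newline="" lets the csv module control line endings itself
        with open(path, "w", newline="", encoding="utf-8") as f:
            csv.writer(f).writerows(rows)
        paths.append(path)
    return paths


if __name__ == "__main__":
    demo = [[(1, "a"), (2, "b")], [(3, "c")]]
    with tempfile.TemporaryDirectory() as d:
        for p in write_batches(demo, d):
            print(os.path.basename(p))
```

Giving each slice its own prefix (for example `prefix=f"slice_{start_pos}"`) keeps threads from overwriting each other's files. Column headers, if needed, could be taken from `cursor.description` and written as the first row of each file.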

import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import cx_Oracle

# QUERY, OFFSET and c (the module holding the connection settings) are
# assumed to be defined elsewhere in the script.


def query(start_off, pool):
    start_conn = datetime.now()
    con = pool.acquire()
    end_conn = datetime.now()
    print(f"Conn/Acquire time: {end_conn - start_conn}")

    try:
        with con.cursor() as cur:
            start_exec_ts = datetime.now()
            # Fetch the rows numbered start_off .. start_off + OFFSET - 1
            cur.execute(QUERY, start_pos=start_off, end_pos=start_off + (OFFSET - 1))
            end_exec_ts = datetime.now()
            rows = cur.fetchall()
            end_fetch_ts = datetime.now()
            total_exec_ts = end_exec_ts - start_exec_ts
            total_fetch_ts = end_fetch_ts - end_exec_ts
            print(f"Exec time : {total_exec_ts}")
            print(f"Fetch time : {total_fetch_ts}")
            print(f"Task executed {threading.current_thread().name}, {threading.get_ident()}")
    finally:
        # Return the session to the pool so other tasks can reuse it
        pool.release(con)
    return rows


def main():
    pool = cx_Oracle.SessionPool(c.oracle_conn['oracle']['username'],
                                 c.oracle_conn['oracle']['password'],
                                 c.oracle_conn['oracle']['dsn'],
                                 min=2, max=4, increment=1,
                                 threaded=True,
                                 getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT)

    # One task per slice: row numbers start at 1 and advance by OFFSET
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(query, d, pool) for d in range(1, 13, OFFSET)]

        for future in as_completed(futures):
            # process your records from each thread
            print(repr(future.result()))
            # process_records(future.result())


if __name__ == '__main__':
    main()

Also, if I use fetchmany() in the query function, how can I send the results back so I can process each batch as it arrives?
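With `fetchall()`, `future.result()` only delivers a slice after all of its rows are in memory. If the `query()` function switches to `fetchmany()`, one approach is to wrap the fetch loop in a generator and have each worker push batches onto a shared `queue.Queue` that the main thread drains as they arrive. A minimal sketch follows; `iter_batches`, `worker`, `consume` and `FakeCursor` are hypothetical names, and `FakeCursor` only stands in for a cx_Oracle cursor so the example runs without a database. In the real script the worker would acquire a pooled connection and execute QUERY for its slice before iterating.

```python
import queue
import threading

_DONE = object()  # sentinel: one worker has finished its slice


class FakeCursor:
    """Stand-in for a cx_Oracle cursor; only fetchmany() is implemented."""

    def __init__(self, rows):
        self._rows = list(rows)

    def fetchmany(self, num_rows):
        batch, self._rows = self._rows[:num_rows], self._rows[num_rows:]
        return batch


def iter_batches(cursor, batch_size):
    """Yield lists of rows from cursor.fetchmany() until it returns an empty list."""
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows


def worker(cursor, batch_size, out_q):
    """Push each batch onto the shared queue as soon as it is fetched."""
    try:
        for rows in iter_batches(cursor, batch_size):
            out_q.put(rows)
    finally:
        out_q.put(_DONE)  # always signal completion, even after an error


def consume(out_q, n_workers, handle_batch):
    """Call handle_batch() for every batch until all workers have finished."""
    finished = 0
    while finished < n_workers:
        item = out_q.get()
        if item is _DONE:
            finished += 1
        else:
            handle_batch(item)


if __name__ == "__main__":
    q = queue.Queue(maxsize=8)  # bounded, so a slow consumer applies backpressure
    cursors = [FakeCursor(range(0, 5)), FakeCursor(range(5, 9))]
    threads = [threading.Thread(target=worker, args=(cur, 2, q)) for cur in cursors]
    for t in threads:
        t.start()
    received = []
    consume(q, len(threads), received.extend)
    for t in threads:
        t.join()
    print(sorted(received))
```

The bounded queue is a design choice: if the main thread falls behind, workers block on `put()` instead of piling batches up in memory. A simpler alternative is to do the processing (for example, the CSV writing) inside each worker and return only a summary, such as the list of file paths, through `future.result()`.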
