I have a use case that consists of loading huge tables from Oracle into Snowflake. The Oracle server sits far away from the Snowflake endpoint, so we run into connection issues when loading tables (views, in fact) bigger than 12 GB with a spool script or cx_Oracle.
I was thinking of using ThreadPoolExecutor with at most 4 threads, as a test, together with SessionPool. That way I get one connection per thread, which is the whole point. This means I would have to split the data fetch into slices, one per thread. My question is: how can I achieve this? Is it correct to do something like "select * from table where rownum between x and y" (not this exact syntax, I know, but you get my point), or should I rely on OFFSET, ...?
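To make the slicing idea concrete, here is a minimal sketch of how the inclusive row ranges for each thread could be computed. The helper name slice_bounds and the sample numbers are hypothetical, not part of my real script. As far as I understand, Oracle 12c and later also accept OFFSET ... ROWS FETCH NEXT ... ROWS ONLY, and whichever form is used, every slice needs the same deterministic ORDER BY, otherwise slices can overlap or skip rows.

```python
def slice_bounds(total_rows, slice_size):
    """Yield inclusive, 1-based (start, end) row positions covering total_rows."""
    for start in range(1, total_rows + 1, slice_size):
        # The last slice is clipped so it never runs past the final row
        yield start, min(start + slice_size - 1, total_rows)


# Hypothetical numbers: 12 rows split into slices of 4 rows each
bounds = list(slice_bounds(12, 4))
print(bounds)  # [(1, 4), (5, 8), (9, 12)]
```

Each (start, end) pair would then be bound to the start_pos and end_pos placeholders of the query, one pair per submitted task.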
My idea was that each thread gets a "slice" of the select, fetches the data in batches, and writes the rows to CSV in batches as well (with writerows), because I would rather have many small files than one huge file to send to Snowflake.
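Roughly, the per-thread part I have in mind looks like the sketch below: loop over fetchmany and write each batch to its own small CSV file. To keep it runnable without a database server, it uses an in-memory sqlite3 table as a stand-in for Oracle (an assumption made only for illustration; fetchmany is part of the common DB-API cursor interface). The function name dump_in_batches, the file prefix, and the batch size are hypothetical.

```python
import csv
import os
import sqlite3
import tempfile


def dump_in_batches(cursor, out_dir, prefix, batch_size):
    """Write the cursor's remaining rows to one CSV file per batch; return the paths."""
    paths = []
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:  # an empty list means the result set is exhausted
            break
        path = os.path.join(out_dir, f"{prefix}_{len(paths):05d}.csv")
        with open(path, "w", newline="") as f:
            csv.writer(f).writerows(rows)
        paths.append(path)
    return paths


# Stand-in data source (sqlite3 instead of Oracle) holding 10 rows
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO t VALUES (?, ?)", [(i, f"row{i}") for i in range(1, 11)])
cur = conn.execute("SELECT id, name FROM t ORDER BY id")

out_dir = tempfile.mkdtemp()
files = dump_in_batches(cur, out_dir, "slice1", batch_size=4)
print(len(files))  # 3
```

With this shape only one batch is held in memory at a time, and each thread can use its own file prefix so the output files never collide.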
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import cx_Oracle

# QUERY, OFFSET, and the config module `c` are defined elsewhere (not shown here).


def query(start_off, pool):
    # Time how long it takes to get a connection from the pool
    start_conn = datetime.now()
    con = pool.acquire()
    end_conn = datetime.now()
    print(f"Conn/Acquire time: {end_conn - start_conn}")
    try:
        with con.cursor() as cur:
            start_exec_ts = datetime.now()
            # Bind the inclusive row range handled by this thread
            cur.execute(QUERY, start_pos=start_off, end_pos=start_off + (OFFSET - 1))
            end_exec_ts = datetime.now()
            rows = cur.fetchall()
            end_fetch_ts = datetime.now()
    finally:
        # Hand the connection back to the pool explicitly
        pool.release(con)
    total_exec_ts = end_exec_ts - start_exec_ts
    total_fetch_ts = end_fetch_ts - end_exec_ts
    print(f"Exec time : {total_exec_ts}")
    print(f"Fetch time : {total_fetch_ts}")
    print(f"Task executed {threading.current_thread().name}, {threading.get_ident()}")
    return rows


def main():
    pool = cx_Oracle.SessionPool(c.oracle_conn['oracle']['username'],
                                 c.oracle_conn['oracle']['password'],
                                 c.oracle_conn['oracle']['dsn'],
                                 min=2, max=4, increment=1,
                                 threaded=True,
                                 getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT
                                 )
    with ThreadPoolExecutor(max_workers=4) as executor:
        # One task per slice; each task starts at its own row offset
        futures = [executor.submit(query, d, pool) for d in range(1, 13, OFFSET)]
        for future in as_completed(futures):
            # process your records from each thread
            print(repr(future.result()))
            # process_records(future.result())


if __name__ == '__main__':
    main()
Also, if I use fetchmany in the query function, how could I send the results back so that I can process each batch as it arrives?
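One possibility I am considering, since future.result() only hands back a value once the whole task has finished, is to have each worker push its batches onto a shared queue.Queue while the main thread consumes them. The sketch below is only an illustration under stated assumptions: FakeCursor is a hypothetical stand-in that mimics fetchmany so the example runs without Oracle, and worker, run, and the sizes are made-up names and values.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


class FakeCursor:
    """Hypothetical stand-in exposing only a DB-API style fetchmany()."""

    def __init__(self, rows):
        self._rows = list(rows)

    def fetchmany(self, size):
        batch, self._rows = self._rows[:size], self._rows[size:]
        return batch


def worker(cursor, batch_size, out_q):
    """Push each fetched batch onto the queue instead of returning all rows."""
    try:
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:
                break
            out_q.put(rows)
    finally:
        out_q.put(None)  # sentinel: this worker is finished


def run(cursors, batch_size):
    out_q = queue.Queue(maxsize=8)  # bounded, so workers wait if the consumer lags
    received = []
    with ThreadPoolExecutor(max_workers=len(cursors)) as executor:
        for cur in cursors:
            executor.submit(worker, cur, batch_size, out_q)
        finished = 0
        while finished < len(cursors):
            batch = out_q.get()
            if batch is None:
                finished += 1
            else:
                received.append(batch)  # process each batch as it arrives
    return received


cursors = [FakeCursor(range(0, 10)), FakeCursor(range(100, 105))]
batches = run(cursors, batch_size=4)
total = sum(len(b) for b in batches)
print(total)  # 15
```

The bounded queue keeps memory use flat when the consumer is slower than the fetches. The simpler alternative would be to do the processing (for example the CSV write) inside the worker itself and return only a file path.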
from cx_oracle ThreadPoolExecutor with SessionPool and distributing loads for each Thread