The method below runs a multithreaded batch download over a pandas DataFrame input. It normally takes 23-29 seconds on average to download 1000 files, but sometimes the code gets stuck in a deadlock, and I have not been able to pinpoint how to solve this. There is a second issue on longer runs: the code leaks memory. I try to free memory by running the garbage collector whenever total memory usage crosses 40%, but the memory is never released and keeps piling up.
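For reference, the threshold-triggered cleanup described above is roughly the following (a minimal sketch that assumes psutil is used to read the memory percentage; the real check may differ in detail):

import gc
import psutil

def free_memory_if_needed(threshold_percent: float = 40.0) -> None:
    # Force a garbage collection pass once overall system memory usage
    # crosses the configured threshold
    if psutil.virtual_memory().percent > threshold_percent:
        gc.collect()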
def multithread_download_batch(self, download_meta_data: pd.DataFrame, object_col_name: str = 'pdf',
                               bucket_name: str = None, local_directory_path: Path = None,
                               unique_col_name: str = None, thread_count: int = 20,
                               include_prefix_path: bool = False, thread_join_average_time: int = None) -> str:
    """
    Use for multithreaded downloading, where the number of threads can be tuned to the system,
    network and AWS bucket configuration. The right thread count has to be found by trial and error
    to get the best performance: download performance is mostly limited by I/O, which is slow
    compared to the network speed, so pick the thread count that suits your system configuration.

    Parameters
    ----------
    download_meta_data : pd.DataFrame
        Dataframe with download information
    object_col_name : str, optional
        Column name where the object key is stored in the dataframe, by default 'pdf'
    bucket_name : str, optional
        Name of the AWS S3 bucket, by default None
    local_directory_path : Path, optional
        Directory path where files need to be downloaded, by default None
    unique_col_name : str, optional
        Column used to give each row its own unique download sub-folder, by default None
    thread_count : int, optional
        Number of threads that will be used for downloading, by default 20

    Returns
    -------
    str
        Time taken to download this dataframe request
    """
    import queue
    import threading
    # Reading value from config file if None is given
    if bucket_name is None:
        bucket_name = self.config_dict['source_bucket_name']
    # If the local directory path is custom
    custom_local_directory_path_flag = False
    if unique_col_name is not None:
        custom_local_directory_path_flag = True
    else:
        include_prefix_path = True
    # Downloading the current batch of groups
    start_time = time.time()
    # Creating the download queue
    download_queue = queue.Queue()
    # Variable denoting the run state of the download operation
    running = False

    def download_queue_worker():
        while running:
            try:
                if (time.time() - start_time) > 60:
                    break
                # If the queue does not have any element it will
                # raise a queue.Empty exception
                task_parameters = download_queue.get(timeout=0.01)
                if task_parameters is None:
                    continue
                try:
                    self.download_file(*task_parameters, include_prefix_path=include_prefix_path)
                finally:
                    download_queue.task_done()
            except queue.Empty:
                pass
            except Exception:
                log.logger_fw.info("Error while processing item")

    # Starting condition for the download threads
    running = True
    # Creating download threads
    worker_threads = []
    for _ in range(thread_count):
        # Create and manage the thread
        thread = threading.Thread(target=download_queue_worker)
        worker_threads.append(thread)
        # Start the thread
        thread.start()
    # Populating the download queue
    for idx, row in download_meta_data.iterrows():
        # Setting the local directory path
        if custom_local_directory_path_flag:
            local_directory_path = self.config_dict['download_path'] / \
                str(row[unique_col_name])
        # Creating the task parameters
        task_parameters = (row[object_col_name],
                           bucket_name, local_directory_path)
        download_queue.put(task_parameters)
    # Waiting for all items to finish processing.
    # .task_done() is used to signal to .join() that the processing is done.
    # NOTE If you use .join() and don't call .task_done() for every processed item,
    # the script will hang forever.
    download_queue.join()
    # Stopping condition for the download threads
    running = False
    # Close the worker threads
    for thread in worker_threads:
        thread.join()
    # Free up the memory
    for thread in worker_threads:
        del thread
    del download_queue
    del worker_threads
    msg = f"Download Time Taken: {time.time() - start_time} seconds\n"
    print(msg)
    return msg
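For context, a typical call to this method looks something like the following (a sketch only: the downloader instance, the column names and the bucket name are placeholders, not values from the real code):

# Hypothetical metadata: one S3 object key per row in the 'pdf' column
download_meta_data = pd.DataFrame({
    'pdf': ['reports/2021/file_001.pdf', 'reports/2021/file_002.pdf'],
    'doc_id': [101, 102],
})
downloader.multithread_download_batch(
    download_meta_data,
    object_col_name='pdf',
    bucket_name='my-source-bucket',   # placeholder bucket name
    unique_col_name='doc_id',         # each row gets its own download sub-folder
    thread_count=20,
)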
The following method downloads a single file; you only need to pass the object name and the bucket name.
def download_file(self, object_name: str, bucket_name: str = None, local_directory_path: Path = None,
                  config=None, include_prefix_path: bool = True, minimal_logging: bool = True):
    """Download an S3 object to a file.

    Parameters
    ----------
    object_name : str
        Key of the S3 object to download
    bucket_name : str
        S3 bucket name
    local_directory_path : Path
        Directory where the file needs to be downloaded
    """
    # Creating the S3 client with our access key
    if self.s3_client is None:
        self.config_dict = self._config_read_utility(read_aws=True, read_audit=True)
        self.client_config = botocore.config.Config(max_pool_connections=50)
        self.s3_client = boto3.client('s3', aws_access_key_id=self.config_dict['aws_access_key_id'],
                                      aws_secret_access_key=self.config_dict['aws_secret_access_key'],
                                      config=self.client_config)
    # Reading values from the config file if None is given
    if bucket_name is None:
        bucket_name = self.config_dict['source_bucket_name']
    if local_directory_path is None:
        local_directory_path = self.config_dict['download_path']
    # Type check
    if isinstance(local_directory_path, str):
        local_directory_path = Path(local_directory_path)
    # Folder check to make sure the local directory exists;
    # if the local directory does not exist then create it
    if not local_directory_path.exists():
        local_directory_path.mkdir(parents=True, exist_ok=True)
        log.logger_fw.info(f'S3_Download_Files: Local folder created: {str(local_directory_path)}')
    # Downloading the file
    try:
        if include_prefix_path:
            file_path = local_directory_path / Path(object_name)
        else:
            file_path = local_directory_path / Path(object_name).name
        # Making the download directory structure based on the object name
        if not os.path.exists(str(file_path.parent)):
            os.makedirs(file_path.parent)
        # Checking if the file name already exists; if so, append an increasing index
        unique_file_idx = 0
        original_stem = file_path.stem
        while os.path.exists(str(file_path)):
            file_name = original_stem + f'_{unique_file_idx}' + file_path.suffix
            file_path = file_path.parent / file_name
            unique_file_idx += 1
        self.s3_client.download_file(
            bucket_name, object_name, str(file_path), Config=config)
        if not minimal_logging:
            msg = f"Download Complete: ['s3://{bucket_name}/{object_name}']"
            log.logger_fw.info(msg)
    except ClientError as e:
        msg = f"Download failed: ['s3://{bucket_name}/{object_name}']"
        log.logger_fw.info(msg)
        return (False, e)
    return True
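Used on its own, the call looks roughly like this (again a sketch; the bucket name, object key and path are placeholders):

result = downloader.download_file(
    object_name='reports/2021/file_001.pdf',   # placeholder S3 key
    bucket_name='my-source-bucket',            # placeholder bucket name
    local_directory_path=Path('/tmp/downloads'),
    include_prefix_path=False,
)
if result is not True:
    # On failure the method returns a (False, ClientError) tuple
    print(f"Download failed: {result[1]}")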
I think the main reason for the deadlock is the running flag that the worker threads check: any anomaly around it can leave the code deadlocked. I have made some modifications, which I am currently testing:
stop_event = threading.Event()

def download_queue_worker(download_queue, start_time, include_prefix_path, stop_event):
    while not stop_event.is_set():
        try:
            if (time.time() - start_time) > 60:
                # Setting the stop event flag to True for the rest of the threads
                stop_event.set()
                # Drain the remaining items so that .join() can still return
                while download_queue.qsize() > 0:
                    try:
                        download_queue.get()
                    except queue.Empty:
                        pass
                    download_queue.task_done()
                # Forcefully closing the while loop for the current thread
                break
            # If the queue does not have any element it will
            # raise a queue.Empty exception
            task_parameters = download_queue.get(timeout=0.01)
            if task_parameters is None:
                continue
            try:
                self.download_file(*task_parameters, include_prefix_path=include_prefix_path)
            finally:
                download_queue.task_done()
        except queue.Empty:
            pass
        except TimeoutError:
            break
        except Exception:
            log.logger_fw.info("Error while processing item")

download_queue.join()
# Stop the workers
stop_event.set()
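For comparison, a sentinel-based shutdown removes the need for a run flag or stop event altogether: each worker exits when it receives a None sentinel, and because the sentinel is also acknowledged with task_done(), queue.join() can never be left waiting on an item that no thread will ever process. A rough, untested sketch of that pattern (not the code above; download_fn stands in for self.download_file):

import queue
import threading

def download_worker(download_queue: queue.Queue, download_fn) -> None:
    # Worker exits when it receives the None sentinel, so no run flag
    # or stop event is needed
    while True:
        task_parameters = download_queue.get()
        try:
            if task_parameters is None:
                break
            download_fn(*task_parameters)
        except Exception as exc:
            # Log and keep going so one bad object cannot kill the worker
            print(f"Error while processing item: {exc}")
        finally:
            download_queue.task_done()

def run_batch(tasks, download_fn, thread_count: int = 20) -> None:
    download_queue = queue.Queue()
    workers = [threading.Thread(target=download_worker,
                                args=(download_queue, download_fn))
               for _ in range(thread_count)]
    for worker in workers:
        worker.start()
    for task in tasks:
        download_queue.put(task)
    # One sentinel per worker guarantees every thread wakes up and exits
    for _ in range(thread_count):
        download_queue.put(None)
    download_queue.join()
    for worker in workers:
        worker.join()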