Tuesday, 4 July 2023

multiprocessing.pool.ThreadPool stuck - Databricks Notebook

I have an API which is rather slow per single request, but I can scale out and send up to 70 requests per second. Thus, to download all data I split it into multiple intervals [(interval_1_lowest_id, interval_1_highest_id), ..., (interval_n_lowest_id, interval_n_highest_id)].
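
For illustration, this is roughly how such intervals can be built (a minimal sketch; the helper name and chunk size are made up for this example, not my real code):

def split_into_intervals(lowest_id, highest_id, chunk_size):
    # Split the inclusive id range [lowest_id, highest_id] into
    # consecutive (low, high) intervals of at most chunk_size ids each.
    intervals = []
    low = lowest_id
    while low <= highest_id:
        high = min(low + chunk_size - 1, highest_id)
        intervals.append((low, high))
        low = high + 1
    return intervals

# split_into_intervals(1, 10, 4) -> [(1, 4), (5, 8), (9, 10)]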

I have a single thread that determines these ranges. The list of ranges is then handed to a ThreadPool to download all the data. Each task writes to its own file, so threads never write to the same file or anything similar. For some reason I do not understand, I usually end up in a situation where 99% of the work is done but 1-5 ranges never load successfully. The ThreadPool then just seems to do nothing. It looks as if those threads failed, but silently, even though I retrieve the results.

Some very reduced pseudo-code showing what I am doing is below.

The behaviour reproduces very reliably when I run it over all the data; for small amounts it works. It always gets stuck with a small number of unfinished tasks and then does not seem to be doing anything. I even let it run over the weekend, just to be sure it was not a very slow connection or similar, but no further progress happened.

Environment: Databricks Notebook (11.3 LTS (includes Apache Spark 3.3.0, Scala 2.12))

python: 3.9.5

I have some understanding of concurrency, so I wonder whether some kind of silent failure of threads or a deadlock is happening. Do you have a good idea how to debug such an issue? I am trying to get better logging in place right now, but I still wonder what is going on. Maybe it is something with ipykernel and threading in notebooks?
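
One way I am trying to debug this (a sketch using only the standard library, nothing Databricks-specific): when the pool looks stuck, dump the stack of every live thread to see what the workers are actually blocked on.

import faulthandler
import sys
import traceback

def dump_all_thread_stacks():
    # Print the current stack of every live thread. If the pool is
    # deadlocked, this shows whether workers are blocked in a network
    # call, a lock, or somewhere inside dbutils.
    for thread_id, frame in sys._current_frames().items():
        print(f"--- thread {thread_id} ---")
        traceback.print_stack(frame)

# Alternative: have CPython dump all thread tracebacks to stderr if the
# program is still running after 300 seconds (does not kill the process).
faulthandler.dump_traceback_later(300, exit=False)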

Update: It seems that dbutils.fs.put is the problem. I can now reproduce the hang quite reliably. At some point it goes into a deadlock when using it, even when only uploading a very small amount of data. Any idea what could cause dbutils to behave like this? It also happens when using real processes via multiprocessing instead of threads.
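
One workaround I am experimenting with (a sketch, assuming the cluster exposes the usual /dbfs FUSE mount; the path handling is illustrative): write the response with plain Python file I/O instead of dbutils.fs.put, so the worker threads never touch dbutils at all.

import os

def save_text(dbfs_path, text):
    # Map a dbfs:/... URI to the local FUSE mount, e.g.
    # dbfs:/raw/file_1.json -> /dbfs/raw/file_1.json (assumed layout).
    local_path = "/dbfs/" + dbfs_path.removeprefix("dbfs:/")
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    with open(local_path, "w", encoding="utf-8") as f:
        f.write(text)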

Latest insight: I suspect there is a memory leak somewhere that silently kills some worker processes without a proper exception being raised, and this leaves the pool stuck. I could build a very simple example with two processes, which I ran in a single cell of a Databricks notebook on a small cluster with 14 GB RAM. I simply create thousands of sessions to overwhelm the RAM. What happens for me is that the second process initially does something but then silently crashes; in the end only the first process finished when running the code below. I would expect some sort of notification on the main process/thread, but this does not seem to be the case?

Code example to reproduce the issue on Databricks:

import multiprocessing
import logging
import time
import requests
import psutil

def execute_single_task(task):

    session = []
    max_s = 1000000
    print(f"before:{multiprocessing.current_process().pid} {task}")
    while len(session) < max_s:
        # dumb example just to cause a memory issue
        session.append(requests.Session())

        if len(session) % 1000 == 0:
            print(len(session))
            print(psutil.virtual_memory())

    print(f"after:{multiprocessing.current_process().pid} {task}")
    time.sleep(2)
    return True

def execute_all_tasks(tasks):

    PARALLELIZATION = 2
    print(PARALLELIZATION)

    with multiprocessing.Pool(processes=PARALLELIZATION) as pool:

        print("with pool")
        for result in pool.imap_unordered(execute_single_task, tasks):
            if result:
                logging.info("Task succeeded!")
            else:
                logging.info("Task failed!")
                raise Exception("The entire main process failed.")

        logging.info("All tasks succeeded!")




if __name__ == "__main__":

    print(f"if name:{__name__}")
    tasks = [1,2]
    execute_all_tasks(tasks)
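
If the workers really are being killed by the OOM killer, one thing worth noting (a sketch; I have not fully verified it on Databricks): concurrent.futures.ProcessPoolExecutor raises BrokenProcessPool in the parent when a worker process terminates abruptly, instead of leaving the pool waiting forever, so at least the failure becomes visible.

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool

def run_all(tasks):
    # Same workload as execute_single_task above, but a dead worker
    # surfaces as BrokenProcessPool in the parent instead of a hang.
    with ProcessPoolExecutor(max_workers=2) as executor:
        try:
            for result in executor.map(execute_single_task, tasks):
                print(f"finished: {result}")
        except BrokenProcessPool as exc:
            print(f"a worker process died unexpectedly: {exc}")
            raise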

This is the code example for the initial question:

import copy
import logging
import requests
from dataclasses import dataclass
from multiprocessing.pool import ThreadPool

@dataclass
class Task:
    id: int
    range: tuple[int, int]

def download_and_save(range):
    resp = requests.post(....)
    # I wonder if this is my problem. Can I get some race condition here on Databricks dbutils.fs?
    dbutils.fs.put(some_path, resp.text)

def download_range(task):
    logging.info("some info here and there")
    download_and_save(task.range)
    return task

def determine_ranges() -> list[Task]:  # id, range
    # ....
    return list_of_tasks

def main():

    logging.basicConfig(level=..., format=...)
    tasks = determine_ranges()  # Task(id, range)
    remaining_tasks = copy.deepcopy(tasks)
    with ThreadPool(processes=70) as pool:

        for result in pool.imap_unordered(download_range, tasks):
            if isinstance(result, Task):
                print(f"Task with id: {result.id} done")
                # remove task from remaining
                # ....
                logging.info(f"remaining: {remaining_tasks}")
            elif isinstance(result, Exception):
                logging.exception(result)
                raise result
            else:
                raise ValueError(f"unexpected result: {type(result)}")

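To at least detect the hang instead of waiting forever, one option (a sketch on top of the code above; the timeout value is arbitrary) is to pull results through the iterator's next() with a timeout, which raises multiprocessing.TimeoutError when no task finishes in time:

import multiprocessing

def download_all_with_timeout(tasks, timeout_per_result=600):
    with ThreadPool(processes=70) as pool:
        it = pool.imap_unordered(download_range, tasks)
        finished = 0
        while finished < len(tasks):
            try:
                # next(timeout=...) raises multiprocessing.TimeoutError
                # if no result arrives within timeout_per_result seconds.
                result = it.next(timeout=timeout_per_result)
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                logging.error("no result within %s s; remaining tasks look stuck",
                              timeout_per_result)
                break
            finished += 1
            logging.info(f"Task with id: {result.id} done")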

