Thursday, 13 April 2023

Process elements in chunks using multiprocessing queues

I have a multiprocessing queue. The end of the queue is signaled by a SENTINEL value, which is a string.

from multiprocessing import Queue

aq = Queue()

........................

The instances in the queue are of class A:

from dataclasses import dataclass

@dataclass
class A:
    id: str
    desc: str
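For context, a producer that fills such a queue might look like the sketch below. It is an assumption, not the asker's code: the SENTINEL value "STOP", the `producer` function and the sample records are hypothetical. The producer puts every `A` instance on the queue and then a single SENTINEL, and the main process reads until it sees that value.

```python
from dataclasses import dataclass
from multiprocessing import Process, Queue

SENTINEL = "STOP"  # hypothetical value; the question only says it is a string


@dataclass
class A:
    id: str
    desc: str


def producer(q, records):
    """Put one A per (id, desc) record on the queue, then the SENTINEL."""
    for rec_id, desc in records:
        q.put(A(rec_id, desc))
    q.put(SENTINEL)  # consumers stop reading when they see this value


if __name__ == "__main__":
    aq = Queue()
    records = [(str(i), f"item {i}") for i in range(3)]
    p = Process(target=producer, args=(aq, records))
    p.start()
    # iter() calls aq.get() repeatedly and stops when it returns SENTINEL
    received = list(iter(aq.get, SENTINEL))
    p.join()  # join only after draining the queue, to avoid a deadlock
    print([a.id for a in received])  # ['0', '1', '2']
```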

In a function, I get elements from the queue aq and process them in chunks. The first element (if there is only one) can be the SENTINEL, in which case there is nothing to process. ...

def process():
    chunk_data = []
    all_ids = []  # renamed from "all", which shadows the built-in all()
    item = aq.get()
    # the first element may be the SENTINEL: nothing to process
    if not isinstance(item, A):
        return all_ids
    chunk_data.append(item.id)
    while item != SENTINEL:
        # start processing in chunks:
        # add elements to the chunk list until it is full or the SENTINEL arrives
        while len(chunk_data) < CHUNK_MAX_SIZE:  # 50
            item = aq.get()
            if item == SENTINEL:
                break
            chunk_data.append(item.id)
        # process the chunk (partial if the SENTINEL was reached);
        # skip it when empty, e.g. after an exactly full final chunk
        if chunk_data:
            chunk_process_ids = process_data(chunk_data)
            all_ids.extend(chunk_process_ids)
        # empty the chunk list and start again
        chunk_data.clear()
    return all_ids

The function works as expected, but I find the code convoluted. I'm looking for a simpler, clearer version.
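One possible simpler version, sketched under assumptions: `iter(aq.get, SENTINEL)` turns the queue into an iterator that stops at the SENTINEL, and `itertools.islice` cuts that iterator into chunks. The nested loops, the manual `clear()` and the special case for a leading SENTINEL all disappear. The SENTINEL value "STOP" and the body of `process_data` are hypothetical stand-ins, and the queue is passed in as a parameter instead of being read from a global.

```python
from dataclasses import dataclass
from itertools import islice
from multiprocessing import Queue

SENTINEL = "STOP"  # hypothetical; the question only says it is a string
CHUNK_MAX_SIZE = 50


@dataclass
class A:
    id: str
    desc: str


def process_data(ids):
    # hypothetical stand-in for the question's process_data()
    return [f"done-{i}" for i in ids]


def process(aq, chunk_size=CHUNK_MAX_SIZE):
    """Read A items from aq until SENTINEL, processing their ids in chunks."""
    results = []
    # iter(callable, sentinel) calls aq.get() until it returns SENTINEL,
    # so a queue whose first element is SENTINEL simply yields nothing
    ids = (item.id for item in iter(aq.get, SENTINEL))
    # islice takes up to chunk_size ids; an empty list means the stream ended,
    # so the last (possibly partial) chunk is processed and no empty one is
    while chunk := list(islice(ids, chunk_size)):
        results.extend(process_data(chunk))
    return results


if __name__ == "__main__":
    aq = Queue()
    for i in range(120):
        aq.put(A(str(i), "desc"))
    aq.put(SENTINEL)
    print(len(process(aq)))  # 120 ids, processed as chunks of 50, 50 and 20
```

The walrus operator needs Python 3.8 or later. On Python 3.12+, `itertools.batched(ids, chunk_size)` yields the same chunks as tuples, so the loop could also be written as `for chunk in batched(ids, chunk_size):`.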


