I have a multiprocessing queue; The end of the queue is signaled by using a SENTINEL value, a string.
aq = Queue()
........................
The instance in the queue are of class A:
class A:
id: str
desc: str
In a function I'm getting elements from the queue aq and process them in chunks. The first element(if is just one) can be a SENTINEL, nothing to process. ....
def process:
chunk_data = []
all = [
item = aq.get()
if not isinstance(item, A):
return
chunk_data.append(item.id)
while item != SENTINEL:
# start process in chunks
# adding elements to the chunk list until is full
while len(chunk_data) < CHUNK_MAX_SIZE: # 50
item = aq.get()
if item == SENTINEL:
break
chunk_data.append(item.id)
# the chunk list is full start processing
chunk_process_ids = process_data(chunk_data) # process chunks
all.extend(chunk_process_ids)
# empty chunk list and start again
chunk_data.clear()
The function works as expected but I consider the code to be convoluted, I'm looking for a simple, clearer version.
from Process elements in chuncks using multiprocessing queues
No comments:
Post a Comment