Saturday, 19 December 2020

Completing threads and interacting with results at a different rate

I'd like to get some feedback on an approach for receiving data from multiple threads in a concurrent.futures.ThreadPoolExecutor and iterating over the results. Given the scenario a ThreadPoolExecutor has future thread results appended to a buffer container and a secondary / decoupled operation read and withdraw from the same buffer container.

Thread Manager Workflow

                    /|-> Thread 1 > results \
ThreadPoolExecutor --|-> Thread 2 > results --> Queue [1,2,3] (end) 
                    \|-> Thread 3 > results /
             

Now we have results from the threads in a First-In-First-Out queue container - which needs to be thread-safe. Now the above process is done and results (str|int|bool|list|dict|any) are in the container awaiting processing by the next step: Communicate the gathered results.

Communication Workflow

                                           /|-> Terminal Print
Queue [1,2,3] < Listener > Communicate --|-> Speech Engine Say 
                                           \|-> Write to Log / File

The Communicate class needs to be "listening" on the Queue for new entries, and processing each as they come in at it's own speed (the rate of speech using a text to speech module - Producer-Consumer Problem) and potentially any number of other outputs, so this really can't be invoked from the top-down. If, the Thread Manager calls directly or lets each thread call the Communicate class directly to invoke the Speech Engine we will hear stuttered speech as the speech engine will override itself with each invocation. Thus, we need to decouple the Thread Manager workflow from the Communicate workflow but have them write & read with an In/Out type buffer or Queue and need for a "listener" concept.

I've found references for a structure like the following running as a daemon thread, but the while loop makes me cringe and consumes too much cpu, so I still need a non-blocking approach, where self.pipeline is a queue.Queue object:

    while True :
        try :
            if not self.pipeline.empty ( ) :    
                task = self.pipeline.get ( timeout=1 )
                if task :       
                    self.serve ( task, )
        except queue.Empty :
            continue

Again, in need of something other than a while loop for this...



from Completing threads and interacting with results at a different rate

No comments:

Post a Comment