Thursday, 13 July 2023

ThreadPoolExecutor with stateful workers

I'm working with a Backend class which spawns a subprocess to perform the actual CPU-bound work. I have no control over that class, and essentially the only way to interact with it is to create an instance via backend = Backend() and submit work via backend.run(data) (this in turn hands the work to the subprocess and blocks until completion). Because these computations take quite some time, I'd like to perform them in parallel. Since the Backend class already spawns its own subprocess to do the heavy lifting, from the main process's point of view this is an IO-bound situation.

So I thought about using multiple threads, each of which uses its own Backend instance. I could create these threads manually and connect them via queues. The following is an example implementation with a mock Backend class:

import os
import pty
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread


class Backend:
    def __init__(self):
        f, g = pty.openpty()
        self.process = Popen(
            ['bash'],  # example program
            text=True, bufsize=1, stdin=PIPE, stdout=g)
        self.write = self.process.stdin.write
        self.read = os.fdopen(f).readline

    def __enter__(self):
        self.write('sleep 2\n')  # startup work
        return self

    def __exit__(self, *exc):
        self.process.stdin.close()
        self.process.kill()

    def run(self, x):
        self.write(f'sleep {x} && echo "ok"\n')  # perform work
        return self.read().strip()


class Worker(Thread):
    def __init__(self, inq, outq, **kwargs):
        super().__init__(**kwargs)
        self.inq = inq
        self.outq = outq

    def run(self):
        with Backend() as backend:
            while True:
                data = self.inq.get()
                result = backend.run(data)
                self.outq.put((data, result))


task_queue = Queue()
result_queue = Queue()

n_workers = 3
# daemon=True so the program can exit even though the workers block on inq.get() forever
threads = [Worker(task_queue, result_queue, daemon=True) for _ in range(n_workers)]
for thread in threads:
    thread.start()

data = [2]*7
for x in data:
    task_queue.put(x)

for _ in data:
    print(f'Result ready: {result_queue.get()}')

Since the Backend needs to perform some work at startup, I don't want to create a new instance for each task. Hence each Worker creates one Backend instance for its whole life cycle. It's also important that each of the workers has its own backend, so they won't interfere with each other.
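One caveat with the daemon=True approach above: daemon threads are killed abruptly at interpreter exit, so Backend.__exit__ never gets a chance to run. A common refinement (sketched here with a dummy work function standing in for the real Backend, so it's self-contained) is to put one sentinel per worker on the queue and join the threads:

```python
import queue
import threading

_SENTINEL = object()  # marker telling a worker to shut down cleanly

def worker(inq, outq):
    # Per-thread setup (the real Backend() context) would happen once here.
    while True:
        item = inq.get()
        if item is _SENTINEL:
            break  # exit the loop instead of relying on daemon=True
        outq.put(item * 2)  # stand-in for backend.run(item)

inq, outq = queue.Queue(), queue.Queue()
threads = [threading.Thread(target=worker, args=(inq, outq)) for _ in range(3)]
for t in threads:
    t.start()

for x in [1, 2, 3, 4]:
    inq.put(x)
for _ in threads:
    inq.put(_SENTINEL)  # one sentinel per worker
for t in threads:
    t.join()  # every worker sees a sentinel, so this returns

results = sorted(outq.get() for _ in range(4))
```

With this shape, any cleanup placed after the while loop (or in a with block wrapping it) runs before the thread ends.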

Now here's the question: Can I also use concurrent.futures.ThreadPoolExecutor to accomplish this? It looks like the Executor.map method would be the right candidate, but I can't figure out how to ensure that each worker receives its own instance of Backend (which needs to persist between tasks).
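One direction that looks promising (an untested sketch with a stand-in backend rather than the real one): ThreadPoolExecutor accepts an initializer callable (since Python 3.7) that runs once in each pool thread, which can be combined with threading.local to give every thread its own persistent instance:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the real Backend, so the sketch is self-contained.
class DummyBackend:
    def __init__(self):
        # expensive startup work would go here
        self.name = threading.current_thread().name

    def run(self, x):
        return (self.name, x)

_local = threading.local()  # separate attribute storage per thread

def init_worker():
    # Runs once in every pool thread before it picks up tasks, so each
    # thread gets its own backend that persists for the pool's lifetime.
    _local.backend = DummyBackend()

def task(x):
    return _local.backend.run(x)

with ThreadPoolExecutor(max_workers=3, initializer=init_worker) as pool:
    results = list(pool.map(task, [2] * 7))
```

What this doesn't give you is a hook to call Backend.__exit__ when the pool shuts down, so cleanup would still need separate handling.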



