Sunday 31 January 2021

Python JoinableQueue call task_done in other process need twice

I have implemented a WorkerManager based on multiprocessing.Process and JoinableQueue. While I try to handle the process exceptions like timeout or un-handle exceptions after proc.join(timeout), and evaluate proc.exitcode to determine how to handle, and then call in_queue.task_done() to notify the job has done with the exception-handle logic. However it need to invoke twice. I have no idea why it should be called twice. Is there anyone could figure it out the reason here.

The whole code snippet:

# -*- coding=utf-8 -*-

import time
import threading
from queue import Empty
from multiprocessing import Event, Process, JoinableQueue, cpu_count, current_process

TIMEOUT = 3


class WorkersManager(object):

    def __init__(self, jobs, processes_num):
        self._processes_num = processes_num if processes_num else cpu_count()
        self._workers_num = processes_num
        self._in_queue, self._run_queue, self._out_queue = JoinableQueue(), JoinableQueue(), JoinableQueue()
        self._spawned_procs = []
        self._total = 0
        self._stop_event = Event()
        self._jobs_on_procs = {}

        self._wk_kwargs = dict(
            in_queue=self._in_queue, run_queue=self._run_queue, out_queue=self._out_queue,
            stop_event=self._stop_event
        )

        self._in_stream = [j for j in jobs]
        self._out_stream = []
        self._total = len(self._in_stream)

    def run(self):
        # Spawn Worker
        worker_processes = [
            WorkerProcess(i, **self._wk_kwargs) for i in range(self._processes_num)
        ]
        self._spawned_procs = [
            Process(target=process.run, args=tuple())
            for process in worker_processes
        ]

        for p in self._spawned_procs:
            p.start()

        self._serve()

        monitor = threading.Thread(target=self._monitor, args=tuple())
        monitor.start()

        collector = threading.Thread(target=self._collect, args=tuple())
        collector.start()

        self._join_workers()
        # TODO: Terminiate threads
        monitor.join(TIMEOUT)
        collector.join(TIMEOUT)

        self._in_queue.join()
        self._out_queue.join()
        return self._out_stream

    def _join_workers(self):
        for p in self._spawned_procs:
            p.join(TIMEOUT)

            if p.is_alive():
                p.terminate()
                job = self._jobs_on_procs.get(p.name)
                print('Process TIMEOUT: {0} {1}'.format(p.name, job))
                result = {
                    "status": "failed"
                }

                self._out_queue.put(result)
                for _ in range(2):
                    # NOTE: Call task_done twice
                    # Guessing:
                    # 1st time to swtich process?
                    # 2nd time to notify task has done?
                    # TODO: figure it out why?
                    self._in_queue.task_done()
            else:
                if p.exitcode == 0:
                    print("{} exit with code:{}".format(p, p.exitcode))
                else:
                    job = self._jobs_on_procs.get(p.name)
                    if p.exitcode > 0:
                        print("{} with code:{} {}".format(p, p.exitcode, job))
                    else:
                        print("{} been killed with code:{} {}".format(p, p.exitcode, job))

                    result = {
                        "status": "failed"
                    }

                    self._out_queue.put(result)
                    for _ in range(2):
                        # NOTE: Call task_done twice
                        # Guessing:
                        # 1st time to swtich process?
                        # 2nd time to notify task has done?
                        # TODO: figure it out why?
                        self._in_queue.task_done()

    def _collect(self):
        # TODO: Spawn a collector proc
        while True:
            try:
                r = self._out_queue.get()
                self._out_stream.append(r)
                self._out_queue.task_done()

                if len(self._out_stream) >= self._total:
                    print("Total {} jobs done.".format(len(self._out_stream)))
                    self._stop_event.set()
                    break
            except Empty:
                continue

    def _serve(self):
        for job in self._in_stream:
            self._in_queue.put(job)

        for _ in range(self._workers_num):
            self._in_queue.put(None)

    def _monitor(self):
        running = 0
        while True:
            proc_name, job = self._run_queue.get()
            running += 1
            self._jobs_on_procs.update({proc_name: job})
            self._run_queue.task_done()
            if running == self._total:
                break


class WorkerProcess(object):

    def __init__(self, worker_id, in_queue, run_queue, out_queue, stop_event):
        self._worker_id = worker_id
        self._in_queue = in_queue
        self._run_queue = run_queue
        self._out_queue = out_queue
        self._stop_event = stop_event

    def run(self):
        self._work()
        print('worker - {} quit'.format(self._worker_id))

    def _work(self):
        print("worker - {0} start to work".format(self._worker_id))
        job = {}
        while not self._stop_event.is_set():
            try:
                job = self._in_queue.get(timeout=.01)
            except Empty:
                continue

            if not job:
                self._in_queue.task_done()
                break

            try:
                proc = current_process()
                self._run_queue.put((proc.name, job))
                r = self._run_job(job)
                self._out_queue.put(r)
            except Exception as err:
                print('Unhandle exception: {0}'.format(err), exc_info=True)
                result = {"status": 'failed'}
                self._out_queue.put(result)
            finally:
                self._in_queue.task_done()

    def _run_job(self, job):
        time.sleep(job)
        return {
            'status': 'succeed'
        }


def main():

    jobs = [3, 4, 5, 6, 7]
    procs_num = 3
    m = WorkersManager(jobs, procs_num)
    m.run()


if __name__ == "__main__":
    main()

And the issue code as following:

   self._out_queue.put(result)
                    for _ in range(2):
                        # ISSUE HERE !!!
                        # NOTE: Call task_done twice
                        # Guessing:
                        # 1st time to swtich process?
                        # 2nd time to notify task has done?
                        # TODO: figure it out why?
                        self._in_queue.task_done()

I need to invoke the self._in_queue.task_done() twice to notify the JoinableQueue the job has done by the exception-handle logic.

I guess whether task_done() call 1st time was to switch process context? or anything else. according to the testing. the 2nd task_done() has effect.

worker - 0 start to work
worker - 1 start to work
worker - 2 start to work

Process TIMEOUT: Process-1 5
Process TIMEOUT: Process-2 6
Process TIMEOUT: Process-3 7
Total 5 jobs done.

If you call task_done() once, and it will block forever and not to finish.



from Python JoinableQueue call task_done in other process need twice

No comments:

Post a Comment