Saturday 26 September 2020

asyncio queue.get() gets stuck

I am working on a project that needs to do some tasks asynchronously, but I am limited to using the least amount of extra subprocesses, for that I decided to have 2 processes: dispatcher_p and worker_p. Since I am making use of the async library I have two async tasks: async_worker and async_task. The program works as follows:

  1. worker_p starts the event loop with a single task: async_worker
  2. dispatcher_p waits on a queue for incoming data
  3. dispatcher_p adds data to an async_queue via put_nowait()
  4. async_worker which is awaiting the async_queue gets the data and starts a task using async_task and calls task_done()
  5. For simplicity async_task just sleeps for 3 seconds and exits.

For simplicity lets strip out the sub task that async_worker should start, leaving us with the following code:

import multiprocessing as mp
import asyncio

async def task_worker(aq):
    while True:
        task = await aq.get()
        if task is not None:
            await asyncio.sleep(3)
            aq.task_done()
        else:
            break

def dispatcher_p(ev_l, q, async_q):
    asyncio.set_event_loop(ev_l)
    while True:
        task = q.get()
        if task is not None:
            async_q.put_nowait(task)
        else:
            async_q.put(None)
            break

def worker_p(ev_l, aq):
    asyncio.set_event_loop(ev_l)
    ev_l.run_until_complete(asyncio.gather(task_worker(aq)))
    ev_l.close()

q = mp.Queue()

def put_task(data):
    q.put(data)

def init():
    event_loop = asyncio.get_event_loop()
    aq = asyncio.Queue()

    p1 = mp.Process(target=worker_p, args=(event_loop, aq,))
    p2 = mp.Process(target=dispatcher_p, args=(event_loop, q, aq))

    p1.start()
    p2.start()

    # Test
    put_task("hi")
    put_task("bye")
    put_task("test")
    put_task(None)

if __name__ == '__main__':
    init()

The problem is that even though task_worker is running in the event_loop, it freezes at task = await aq.get(). Why does this happen? I still don't understand how asyncio works across several processes.



from asyncio queue.get() gets stuck

No comments:

Post a Comment