I am working on a project that needs to do some tasks asynchronously, but I want to keep the number of extra subprocesses to a minimum, so I decided to use two processes: dispatcher_p and worker_p. Since I am using the asyncio library, I have two async tasks: async_worker and async_task. The program works as follows:
- worker_p starts the event loop with a single task: async_worker
- dispatcher_p waits on a queue for incoming data
- dispatcher_p adds data to an async queue via put_nowait()
- async_worker, which is awaiting the async queue, gets the data, starts a task using async_task, and calls task_done()
- For simplicity, async_task just sleeps for 3 seconds and exits.

For simplicity, let's strip out the sub-task that async_worker should start, leaving us with the following code:
import multiprocessing as mp
import asyncio

async def task_worker(aq):
    while True:
        task = await aq.get()
        if task is not None:
            await asyncio.sleep(3)
            aq.task_done()
        else:
            break

def dispatcher_p(ev_l, q, async_q):
    asyncio.set_event_loop(ev_l)
    while True:
        task = q.get()
        if task is not None:
            async_q.put_nowait(task)
        else:
            async_q.put(None)
            break

def worker_p(ev_l, aq):
    asyncio.set_event_loop(ev_l)
    ev_l.run_until_complete(asyncio.gather(task_worker(aq)))
    ev_l.close()
q = mp.Queue()

def put_task(data):
    q.put(data)

def init():
    event_loop = asyncio.get_event_loop()
    aq = asyncio.Queue()
    p1 = mp.Process(target=worker_p, args=(event_loop, aq,))
    p2 = mp.Process(target=dispatcher_p, args=(event_loop, q, aq))
    p1.start()
    p2.start()

    # Test
    put_task("hi")
    put_task("bye")
    put_task("test")
    put_task(None)

if __name__ == '__main__':
    init()
The problem is that even though task_worker is running in the event loop, it freezes at task = await aq.get(). Why does this happen? I still don't understand how asyncio works across several processes.
Source: asyncio queue.get() gets stuck