I have asyncio code that sometimes freezes in a deadlock, which in my opinion should not be possible. Since reality always wins over theory, I must obviously be missing something. Can somebody spot the problem in the following code and tell me how it can run into a deadlock at all?
import asyncio

async def main():
    sem = asyncio.Semaphore(8)
    loop = asyncio.get_running_loop()
    tasks = []

    # Wrapper around the 'do_the_work' function to make sure that the
    # semaphore is released in every case. In my opinion it should be
    # impossible to leave this code without releasing the semaphore.
    #
    # But as I can observe a deadlock in real life, I must be missing
    # something!?
    async def task(**params):
        try:
            return await do_the_work(**params)
        finally:
            # Whatever happens in do_the_work (that does not crash the whole
            # interpreter), the semaphore should be released.
            sem.release()

    for params in jobs:
        # Without the wait_for my code freezes at some point. The do_the_work
        # function does not take too long, so the 10 min timeout is
        # unrealistically high and just a plausibility check to "prove" the
        # deadlock.
        try:
            await asyncio.wait_for(sem.acquire(), 60*10)
        # asyncio.TimeoutError is the builtin TimeoutError only from
        # Python 3.11 on, so catch the asyncio name to cover older versions.
        except asyncio.TimeoutError as e:
            raise RuntimeError("Deadlock?") from e

        # Start the task. Due to the semaphore there can only be 8 tasks
        # running at the same time.
        tasks.append(loop.create_task(task(**params)))

        # Check tasks which are already done for an exception. If there was
        # one just stop immediately and raise it.
        for t in tasks:
            if t.done():
                e = t.exception()
                if e:
                    raise e

    # If I reach this point, all tasks were scheduled and the results are
    # ready to be consumed.
    for result in await asyncio.gather(*tasks):
        handle_result(result)
from Deadlock with asyncio.Semaphore