Monday, 27 November 2023

Deadlock with asyncio.Semaphore

I have asyncio code that sometimes freezes in a deadlock, which in my opinion should not be possible. As reality always wins over theory, I must be missing something. Can somebody spot a problem in the following code and explain how it can run into a deadlock at all?

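import asyncio

#   jobs, do_the_work and handle_result come from the rest of the program
#   and are not shown here.


async def main():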
    sem = asyncio.Semaphore(8)
    loop = asyncio.get_running_loop()
    tasks = []

    #   Wrapper around the 'do_the_work' function to make sure that the
    #   semaphore is released in every case. In my opinion it should be
    #   impossible to leave this code without releasing the semaphore.
    #
    #   But as I can observe a deadlock in real life, I must be missing
    #   something!?
    async def task(**params):
        try:
            return await do_the_work(**params)
        finally:
            #   Whatever happens in do_the_work (that does not crash the whole
            #   interpreter), the semaphore should be released.
            sem.release()

    for params in jobs:
        #   Without the wait_for my code freezes at some point. The do_the_work
        #   function does not take long, so the 10 min timeout is
        #   unrealistically high and just a plausibility check to "prove" the
        #   deadlock.
        try:
            await asyncio.wait_for(sem.acquire(), 60*10)
        except asyncio.TimeoutError as e:
            raise RuntimeError("Deadlock?") from e

        #   Start the task. Due to the semaphore there can only be 8 tasks
        #   running at the same time.
        tasks.append(loop.create_task(task(**params)))

        #   Check tasks which are already done for an exception. If there was
        #   one just stop immediately and raise it.
        for t in tasks:
            if t.done():
                e = t.exception()
                if e:
                    raise e

    #   If I reach this point, all tasks were scheduled and the results are
    #   ready to be consumed.
    for result in await asyncio.gather(*tasks):
        handle_result(result)
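
A stripped-down, runnable version of the same acquire-in-the-loop, release-in-finally pattern can help check the semaphore bookkeeping in isolation. This is a sketch under stated assumptions, not the original program: `do_the_work` is a hypothetical stub that sleeps briefly and fails on purpose for some inputs, `jobs` is a made-up list of 50 parameter dicts, `handle_result` and the early-exit exception check are left out, and failures are collected with `asyncio.gather(..., return_exceptions=True)` instead of stopping at the first one. The harness records the peak number of concurrently running tasks and counts how many permits are free once everything has finished.

```python
import asyncio
import random


# Hypothetical stand-in for the real do_the_work: a short random sleep,
# and every job with n % 7 == 3 raises on purpose so that the
# finally/release path is exercised as well.
async def do_the_work(n):
    await asyncio.sleep(random.random() / 100)
    if n % 7 == 3:
        raise ValueError(f"job {n} failed")
    return n * n


async def run(jobs, limit=8):
    sem = asyncio.Semaphore(limit)
    tasks = []
    running = 0  # tasks currently between start and release
    peak = 0     # highest value 'running' ever reached

    async def task(**params):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        try:
            return await do_the_work(**params)
        finally:
            # Same idea as in the question: release in every case.
            running -= 1
            sem.release()

    for params in jobs:
        try:
            # Short timeout for the harness instead of 10 minutes.
            await asyncio.wait_for(sem.acquire(), 5)
        except asyncio.TimeoutError as e:
            raise RuntimeError("Deadlock?") from e
        tasks.append(asyncio.create_task(task(**params)))

    # Collect failures as values instead of stopping at the first one.
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Count the permits that can be taken without waiting; if every task
    # released its permit, this equals 'limit'.
    free = 0
    while not sem.locked():
        await sem.acquire()
        free += 1
    return results, peak, free


if __name__ == "__main__":
    jobs = [{"n": i} for i in range(50)]
    results, peak, free = asyncio.run(run(jobs))
    failures = [r for r in results if isinstance(r, Exception)]
    print(f"{len(results)} jobs, {len(failures)} failed, "
          f"peak concurrency {peak}, free permits {free}")
```

With these stubs the run should report 50 jobs, 7 failures and 8 free permits; the peak concurrency depends on timing but can never exceed 8.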

