Monday, 3 December 2018

Shutdown infinite async generator

Reproducible error

I tried to reproduce the error in an online REPL. The implementation there is not exactly the same as my real code, so the behavior may differ: my real code does async for response in position_stream(), whereas the REPL version does for position in count().

More details on my actual implementation

Somewhere in my code I define an async generator like so:

async def position(self):
    request = telemetry_pb2.SubscribePositionRequest()
    position_stream = self._stub.SubscribePosition(request)

    try:
        async for response in position_stream:
            yield Position.translate_from_rpc(response)
    finally:
        position_stream.cancel()

where position_stream is infinite (or at least very long-lived). I consume it from example code like this:

async def print_altitude():
    async for position in drone.telemetry.position():
        print(f"Altitude: {position.relative_altitude_m}")

and print_altitude() is run on the loop with:

asyncio.ensure_future(print_altitude())
asyncio.get_event_loop().run_forever()

That works well. Now, at some point, I would like to close the stream from the caller. I thought I could just run asyncio.ensure_future(loop.shutdown_asyncgens()) and wait for the finally clause above to be called, but that does not happen.

Instead, I receive a warning about an unretrieved exception:

Task exception was never retrieved
future: <Task finished coro=<print_altitude() done, defined at [...]
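
The warning itself hides which exception ended the task. As a minimal sketch of how one might surface it, the snippet below attaches a done callback that retrieves the exception from the task. The names failing_consumer, on_done and retrieved are hypothetical and only stand in for print_altitude() and whatever it actually raised.

```python
import asyncio


async def failing_consumer():
    # Hypothetical stand-in for print_altitude(): it simply fails.
    raise RuntimeError("stream closed unexpectedly")


async def main():
    retrieved = []

    def on_done(task):
        # Calling task.exception() marks the exception as retrieved,
        # so asyncio no longer logs the warning for this task.
        if not task.cancelled() and task.exception() is not None:
            retrieved.append(task.exception())

    task = asyncio.ensure_future(failing_consumer())
    task.add_done_callback(on_done)
    await asyncio.wait([task])  # wait for completion without re-raising
    return retrieved


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With the exception in hand, it becomes easier to tell whether the task died from the shutdown attempt or from something unrelated.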

Why is that, and how can I make sure that all my async generators actually get closed (and run their finally clauses)?
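
One possible direction, offered as a sketch rather than a definitive answer: keep a reference to the task that consumes the generator and cancel that task, instead of calling loop.shutdown_asyncgens() while the generator is still being iterated. When the task is cancelled while it is suspended inside the generator, the CancelledError propagates through the generator's try/finally, so the cleanup runs. In the snippet below, fake_position_stream(), the log list and the started event are assumptions introduced for illustration; they replace the gRPC stream and position_stream.cancel() from the real code.

```python
import asyncio


async def fake_position_stream():
    # Hypothetical stand-in for the gRPC position stream: never ends on its own.
    altitude = 0
    while True:
        await asyncio.sleep(0.01)
        yield altitude
        altitude += 1


async def position(log):
    stream = fake_position_stream()
    try:
        async for response in stream:
            yield response
    finally:
        # Stands in for position_stream.cancel() in the real code.
        log.append("finally ran")


async def print_altitude(log, started):
    async for altitude in position(log):
        log.append(f"Altitude: {altitude}")
        started.set()


async def main():
    log = []
    started = asyncio.Event()
    task = asyncio.ensure_future(print_altitude(log, started))
    await started.wait()  # let at least one position through
    task.cancel()         # raises CancelledError inside the pending await
    try:
        await task
    except asyncio.CancelledError:
        pass              # expected: we cancelled the task ourselves
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

This relies on the task being suspended inside the generator when the cancellation arrives. If the consumer awaits something else between iterations, the generator may instead be left suspended at its yield, in which case closing it explicitly with aclose() from the consumer is likely the more predictable option.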
