Reproducible error
I tried to reproduce the error in an online REPL here. However, it is not exactly the same implementation (and hence behavior) as my real code, where I do async for response in position_stream() instead of the for position in count() used in the REPL.
More details on my actual implementation
I define somewhere an async generator like so:
async def position(self):
    request = telemetry_pb2.SubscribePositionRequest()
    position_stream = self._stub.SubscribePosition(request)
    try:
        async for response in position_stream:
            yield Position.translate_from_rpc(response)
    finally:
        position_stream.cancel()
where position_stream is infinite (or possibly very long-lasting). I use it from example code like this:
async def print_altitude():
    async for position in drone.telemetry.position():
        print(f"Altitude: {position.relative_altitude_m}")
and print_altitude() is run on the loop with:
asyncio.ensure_future(print_altitude())
asyncio.get_event_loop().run_forever()
That works well. Now, at some point, I'd like to close the stream from the caller. I thought that I could just run asyncio.ensure_future(loop.shutdown_asyncgens()) and wait for my finally clause above to get called, but it doesn't happen.
Instead, I receive a warning about an unretrieved exception:
Task exception was never retrieved
future: <Task finished coro=<print_altitude() done, defined at [...]
Why is that, and how can I make it such that all my async generators actually get closed (and run their finally clause)?
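For illustration, here is a minimal sketch of one way the finally clause can be made to run: keep a reference to the task that iterates the generator, cancel that task, and await it. Cancelling raises CancelledError at the await where the task is suspended, which here is inside the generator, so the exception unwinds through the generator's finally block. This is only a sketch under assumptions: fake_position_stream is a hypothetical stand-in for the gRPC stream (it is not part of the real code), and the closed and seen lists exist only to make the behavior observable.

```python
import asyncio

closed = []  # records that the generator's cleanup ran


async def fake_position_stream():
    """Hypothetical stand-in for the gRPC stream: yields altitudes forever."""
    altitude = 0.0
    while True:
        await asyncio.sleep(0.01)
        altitude += 1.0
        yield altitude


async def position():
    stream = fake_position_stream()
    try:
        async for altitude in stream:
            yield altitude
    finally:
        # The real code would call position_stream.cancel() here.
        closed.append("position")


async def print_altitude(seen):
    async for altitude in position():
        seen.append(altitude)


async def main():
    seen = []
    # Keep a reference to the task so it can be cancelled later.
    task = asyncio.ensure_future(print_altitude(seen))
    await asyncio.sleep(0.05)  # let the stream produce a few values

    # Cancel the consumer; CancelledError propagates through position().
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled on purpose
    return seen


seen = asyncio.run(main())
```

Awaiting the cancelled task also retrieves its exception, so no "Task exception was never retrieved" warning is logged for it in this sketch.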
from Shutdown infinite async generator