Friday, 14 January 2022

Use a trio nursery as a generator for Server-Sent Events with FastAPI?

I'm trying to build a Server Sent Events endpoint with FastAPI but I'm unsure if what I'm trying to accomplish is possible or how I would go about doing it.

Introduction to the problem

Say I have an async function run_task(limit, task) that sends a request, makes a transaction, or does something similar, and that for each task it can return some JSON data.

I'd like to run multiple tasks (several run_task(limit, task) calls) concurrently. To do so, I'm using trio nurseries like so:

async with trio.open_nursery() as nursery:
    limit = trio.CapacityLimiter(10)
    for task in tasks:
        nursery.start_soon(run_task, limit, task)

Finally, I want to return the results of each task via a FastAPI endpoint.

At first I simply created an object containing a list and passed that object (by reference) to each run_task. When a task finished, it pushed its JSON data as a dict onto the object, and the endpoint returned the whole object once all the tasks were done.

This works, but I find it inefficient: the client has to wait for every task to finish before it can display anything. Some tasks can be quite slow, so the data already fetched by the faster tasks just sits there unused until the slowest one completes.

What I would like to accomplish

Whenever a task finishes, I'd like the API to send that task's data immediately (the data I would previously have added to the object) so that the client can display it in real time.

That's when I discovered Server-Sent Events and WebSockets. Server-Sent Events seemed like the appropriate solution to my problem, as I don't need bidirectional communication.
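As far as I understand the text/event-stream format, each event is just a few "field: value" lines followed by a blank line, sent from server to client over one long-lived HTTP response. A rough sketch of that framing is below; the helper name format_sse and the event name task_done are hypothetical, and sse-starlette does this serialization itself, so this is only to show what ends up on the wire.

```python
import json


def format_sse(data, event=None):
    """Serialize one event in the text/event-stream wire format."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # Each line of the payload gets its own "data:" field.
    for line in json.dumps(data).splitlines():
        lines.append(f"data: {line}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"


if __name__ == "__main__":
    print(format_sse({"task": 1}, event="task_done"), end="")
```

Each finished task would map to one such event, which is why the one-directional model fits my use case.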

Since FastAPI is built on Starlette, I decided to use sse-starlette to build an endpoint with Server-Sent Events. To do so, I need to build an endpoint like so:

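from fastapi import Request
from sse_starlette.sse import EventSourceResponse

# router is an APIRouter defined elsewhere in the app
@router.get('/stream')
async def runTasks(
        param1: str,
        request: Request
):
    # status_event_generator is the async generator I still need to write
    event_generator = status_event_generator(request, param1)
    return EventSourceResponse(event_generator)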

The actual problem

As the name status_event_generator implies, sse-starlette's EventSourceResponse needs to be given an event generator, and that's where I'm stuck. I'd want the generator to yield the data of a task as soon as that task finishes (so that the client can receive the data of each task in real time). However, the tasks run inside the async trio nursery, so I'm unsure how to proceed.

As per "Is yielding from inside a nursery in an asynchronous generator function bad?", it seems (if I understand correctly) that I can't just put a yield in run_task(limit, task) and expect it to work.


