Saturday, 21 October 2023

How to fetch data asynchronously from a multiprocessing spawn process in Python?

I have a FastAPI app with a single endpoint, /generate. This endpoint takes in requests and puts them into a global input_queue. A background process called worker (started with the spawn method) reads the data from input_queue, performs some calculations on it in real time, and stores the output in a global dictionary, output_store.

It takes some time for the worker to generate the output and store it in the global output_store. How can I make /generate wait for the output value asynchronously, without using a while statement to check whether the output has appeared in output_store?

# server.py

import uuid
import asyncio
import multiprocessing as mp
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel


class RAGRequest(BaseModel):
    """Request body accepted by the /generate endpoint."""
    # Message text of the request; gen() forwards it to the worker as "msg".
    msg: str 

async def _bg_process(item, delay=10):
    """Simulate the long-running calculation for one queued request.

    Args:
        item: Mapping with the keys 'chat_id' (request identifier) and
            'msg' (the text to process).
        delay: Seconds to sleep in place of the real calculation. Defaults
            to 10, the duration that used to be hard-coded.

    Returns:
        The message with "_output" appended.

    Raises:
        KeyError: If 'chat_id' or 'msg' is missing from item.
    """
    chat_id, msg = item['chat_id'], item['msg']
    print(f"{chat_id} is running...")
    # Stand-in for the real work; awaiting keeps other tasks running meanwhile.
    await asyncio.sleep(delay)
    print(f"{chat_id} is done.")
    return f"{msg}_output"

# Background job for a single request
async def bg_process(item, output_store):
    """Run the calculation for item and publish the result.

    Args:
        item: Mapping with at least the key 'chat_id'; it is passed on
            unchanged to _bg_process.
        output_store: Mapping that receives the result under the request's
            chat_id, as {'chat_id': ..., 'output': ...}.
    """
    result = await _bg_process(item)
    request_id = item['chat_id']
    # Publish the finished result in the shared store, keyed by request id.
    output_store[request_id] = dict(chat_id=request_id, output=result)

async def _worker(input_queue, output_store):
    """Turn every queued request into a background task, forever.

    Args:
        input_queue: Queue of request items; checked with empty() and read
            with get().
        output_store: Mapping handed to each bg_process task for its result.
    """
    # All spawned tasks live inside one task group so they run concurrently.
    async with asyncio.TaskGroup() as group:
        while True:
            if input_queue.empty():
                # Nothing to do: pause briefly so running tasks can progress.
                await asyncio.sleep(0.1)
                continue
            request = input_queue.get()
            group.create_task(bg_process(request, output_store))

# Listener
# Entry point of the background process: listens for incoming requests and
# processes them.
def worker(input_queue, output_store):
    """Run the asynchronous _worker loop on a fresh event loop.

    Args:
        input_queue: Queue the requests are read from.
        output_store: Mapping the results are written to.
    """
    asyncio.run(_worker(input_queue, output_store))

# FastAPI application object; the /generate route is registered on it.
app = FastAPI()

@app.post("/generate")
async def gen(request: RAGRequest):
    """Queue the request for the worker process and wait for its output.

    The request is tagged with a fresh chat_id and put on the module-level
    input_queue. The worker later stores the result in the module-level
    output_store under that chat_id; this handler waits for the entry to
    appear and returns its 'output' value.

    Args:
        request: Parsed request body carrying the message in request.msg.

    Returns:
        The 'output' value the worker stored for this request.
    """
    chat_id = str(uuid.uuid1())
    input_queue.put({'chat_id': chat_id, "msg": request.msg})

    # The result is written by another process some time later, so indexing
    # output_store right away would raise KeyError. Poll for the entry instead;
    # awaiting asyncio.sleep() hands control back to the event loop between
    # checks, so other requests are served while this one waits.
    while chat_id not in output_store:
        await asyncio.sleep(0.05)

    # Remove the entry once it has been read so output_store does not grow
    # with every request served.
    return output_store.pop(chat_id)['output']


if __name__ == "__main__":
    # Start the worker process, then serve the API until uvicorn returns.
    mp.set_start_method("spawn")
    input_queue = mp.Queue()            # passing inputs to listener process
    output_store = mp.Manager().dict()  # stores all outputs from the listener process

    p = mp.Process(target=worker, args=(input_queue, output_store))
    p.start()

    try:
        uvicorn.run(app, host="0.0.0.0", port=3000)
    finally:
        # worker() loops forever, so it never exits on its own. Stop and reap
        # it explicitly, otherwise the non-daemon child keeps the interpreter
        # from exiting once the server has shut down.
        p.terminate()
        p.join()

You can use the code below to send requests to the server:

# send_requests.py

import aiohttp, asyncio

# Endpoint of the local server. 0.0.0.0 is the address the server binds to,
# not one to connect to, so the client uses localhost.
url = "http://localhost:3000/generate"

# Request payloads: each prompt text is wrapped as {"msg": <text>}, the JSON
# body that send_req posts to the server.
prompts = [
    {"msg": text}
    for text in (
        "Leading Causes of heart diseases in atleast 1000 words",
        "Give me a brief history of India in atleast 1000 words",
        "Explain to me all the Harry potter books in atleast 1000 words",
        "Explain the War of Roses in atleast 1000 words",
        "Explain all the major events that happened in the World war 2 in atleast 1000 words",
        "Explain General relativity ELI5 in atleaset 1000 words",
        "Explain Evolution ELI5 in atleast 1000 words",
        "What are the big questions in the Universe in atleast 1000 words?",
        "What happens after death in atleast 1000 words",
        "Explain the story of One Piece so far in atleast 1000 words",
        "Write a story about a character who discovers a hidden talent for painting in atleast 1000 words.",
        "Imagine a world where gravity doesn't exist and create a narrative about how people adapt to this new reality in atleast 1000 words.",
        "Write a letter to your future self, ten years from now in atleast 1000 words.",
        "Describe a dream you had last night and explain its significance in atleast 1000 words.",
        "Imagine you are an alien visiting Earth for the first time and write a journal entry about your observations in atleast 1000 words.",
        "Write a script for a movie trailer about a protagonist who must save the world from an evil villain in atleast 1000 words.",
        "Imagine you have been given the power to change one thing about the world. What would it be and why? in atleast 1000 words",
        "Write a poem about your favorite season and the emotions it evokes in atleast 1000 words.",
        "Create a character who is a master of time travel and write a story about their adventures in atleast 1000 words.",
        "Imagine you are a detective solving a crime that took place in a parallel universe. How do you go about solving the case? in atleast 1000 words",
        "Write a monologue for a character who has just discovered a deep secret about their family's past in atleast 1000 words",
        "Imagine you are a superhero with the power to control the elements. Write a scene where you must use your powers to save a city from a natural disaster in atleast 1000 words.",
        "Write a story about a character who is struggling to overcome a personal defeat and find a new sense of purpose in atleast 1000 words.",
        "Create a world where memories can be transferred from one person to another. Write a story about a character who discovers this technology and the implications it has on their life in atleast 1000 words.",
        "Imagine you are a ghost haunting an old mansion. Write a journal entry about your experiences and the secrets you have uncovered in atleast 1000 words.",
    )
]

async def send_req(session, url, payload, _id):
    """POST one payload as JSON and print the response body.

    Args:
        session: Client session whose post() is used as an async context
            manager (created with aiohttp.ClientSession by send_multiple).
        url: Address the request is posted to.
        payload: Object sent as the JSON body.
        _id: Request number, used only in the progress message.
    """
    print(f"Sent req number {_id}")
    async with session.post(url, json=payload) as response:
        # Read the body as text while the response is still open.
        body = await response.text()
    print(body)

async def send_multiple(url, prompts):
    """Send every payload to url concurrently over one shared session.

    Args:
        url: Address each request is posted to.
        prompts: A list of payloads, or a single payload, which is treated
            as a one-element list.
    """
    payloads = prompts if isinstance(prompts, list) else [prompts]
    async with aiohttp.ClientSession() as session:
        # Requests are numbered from 1 for the progress output of send_req.
        await asyncio.gather(
            *(
                send_req(session, url, payload, number)
                for number, payload in enumerate(payloads, start=1)
            )
        )

if __name__ == "__main__":
    # asyncio.run() creates, runs and closes the event loop itself; fetching
    # a loop with asyncio.get_event_loop() when none is running is deprecated.

    # To send a single request instead, pass prompts[0] rather than prompts.

    # sends 25 requests
    asyncio.run(send_multiple(url, prompts))



from How to fetch data asynchronously from a multiprocessing spawn process in Python?

No comments:

Post a Comment