Tuesday 26 July 2022

Implement concurrent data fetch from Azure Redis Cache in Python

I am building a low-latency model inference API with FastAPI. We use Azure Redis Cache (Standard tier) to fetch features and an ONNX model for fast inference. I use aioredis to read from Redis concurrently. Each request makes two Redis calls: one for the user ID, which returns a single string, and one for the products, which returns a list of strings. I then parse these into lists of floats with JSON.

A single request takes 70-80 ms overall, but with more than 10 concurrent requests Redis takes more than 400 ms to return results. That is far too slow, and during load testing it appears to grow linearly with the number of concurrent users.

The code for getting data from Redis is:

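import json

import aioredis
import numpy as np

from ..Helpers.helper import curt_giver, milsec_calc

# One shared client (and its connection pool) for the whole process.
r = aioredis.from_url("redis://user:host", decode_responses=True)


async def get_user(user: str) -> str:
    # Single GET: the user key maps to one feature string.
    user_data = await r.get(user)
    return user_data


async def get_products(product: list) -> list:
    # Single MGET: one feature string per product key.
    product_data = await r.mget(product)
    return product_data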

async def get_features(inputs: dict) -> list:
    
    st = curt_giver()
    user_data = await get_user(inputs['userId'])
    online_user_data = [json.loads(json.loads(user_data))]
    end = curt_giver()
    print("Time to get user features: ", milsec_calc(st,end))
    
    st = curt_giver()
    product_data = await get_products(inputs['productIds'])
    online_product_data = []
    for i in product_data:
        online_product_data.append(json.loads(json.loads(i)))
    end = curt_giver()
    print("Time to get product features: ", milsec_calc(st,end))

    user_outputs = np.asarray(online_user_data,dtype=object)
    product_outputs = np.asarray(online_product_data,dtype=object)
    # Repeat the user row once per product, then join user and product columns.
    output = np.concatenate(
        [np.concatenate([user_outputs] * product_outputs.shape[0]), product_outputs],
        axis=1,
    )
    return output.tolist()
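
To make the shape of the result concrete: the NumPy step above pairs the single user vector with every product vector, giving one row per product. Below is a minimal plain-Python sketch of that same layout. The function name build_feature_rows and the sample values are made up for illustration, and it assumes, as the double json.loads above suggests, that each Redis value is a JSON string that was itself JSON-encoded.

```python
import json


def build_feature_rows(user_raw: str, products_raw: list) -> list:
    """Return one row per product: user features followed by product features."""
    # Each value is decoded twice because it is assumed to be a JSON-encoded JSON string.
    user_row = json.loads(json.loads(user_raw))
    return [user_row + json.loads(json.loads(p)) for p in products_raw]


# Hypothetical sample values, double-encoded the way the cache is assumed to hold them.
user_raw = json.dumps(json.dumps([0.1, 0.2]))
products_raw = [json.dumps(json.dumps([1.0, 2.0])), json.dumps(json.dumps([3.0, 4.0]))]

rows = build_feature_rows(user_raw, products_raw)
print(rows)  # [[0.1, 0.2, 1.0, 2.0], [0.1, 0.2, 3.0, 4.0]]
```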

curt_giver() returns the current time in milliseconds. The code from the main file is:

    from fastapi import FastAPI
    from v1.redis_conn.get_features import get_features
    
    from model_scoring.score_onnx import score_features
    from v1.post_processing.sort_results import sort_results
    
    from v1.api_models.input_models import Ranking_Input
    from v1.api_models.output_models import Ranking_Output
    from v1.Helpers.helper import curt_giver, milsec_calc
    import numpy as np
    
    
    app = FastAPI()
    
    # Sending user and product ids through body, 
    # Hence a POST request is well suited for this, GET has unexpected behaviour
    @app.post("/predict", response_model = Ranking_Output)
    async def rank_products(inp_req: Ranking_Input):
      beg = curt_giver()
      reqids = inp_req.dict()
      st = curt_giver()
      features = await get_features(reqids)
      end = curt_giver()
    
      print("Total Redis duration ( user + products fetch): ", milsec_calc(st,end))
    
      data = np.asarray(features,dtype=np.float32,order=None)
      
      st = curt_giver()
      scores = score_features(data)
      end = curt_giver()
    
      print("ONNX model duration: ", milsec_calc(st,end))
    
      Ranking_results = sort_results(scores, list(reqids["productIds"]))
      end = curt_giver()
      print("Total time for API: ",milsec_calc(beg,end))
      resp_json = {"requestId": inp_req.requestId,
      "ranking": Ranking_results,
      "zipCode": inp_req.zipCode}
    
      return resp_json    
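
The helper module behind curt_giver and milsec_calc is not shown here. A minimal sketch of what such helpers could look like, assuming curt_giver returns a millisecond timestamp and milsec_calc returns the difference between two of them. The bodies below are a guess for illustration, not the actual helper code:

```python
import time


def curt_giver() -> int:
    """Current time in whole milliseconds (assumed behaviour)."""
    # A monotonic clock avoids jumps if the system clock is adjusted.
    return time.monotonic_ns() // 1_000_000


def milsec_calc(start: int, end: int) -> int:
    """Elapsed milliseconds between two curt_giver() readings (assumed behaviour)."""
    return end - start


st = curt_giver()
time.sleep(0.05)  # stand-in for the work being timed
end = curt_giver()
print("Elapsed ms:", milsec_calc(st, end))
```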

From the timings I can see that a single request takes very little time, but under concurrent load the time to fetch product data keeps increasing linearly. Timings for a single request (all values in milliseconds):

Time to get user features:  1
Time to get product features:  47
Total Redis duration ( user + products fetch):  53
ONNX model duration:  2
Total time for API:  60

Timings with more than 10 concurrent requests:

Time to get user features:  151
Time to get user features:  150
Time to get user features:  151
Time to get user features:  52
Time to get user features:  51
Time to get product features:  187
Total Redis duration ( user + products fetch):  433
ONNX model duration:  2
Total time for API:  440
INFO:     127.0.0.1:60646 - "POST /predict HTTP/1.0" 200 OK
Time to get product features:  239
Total Redis duration ( user + products fetch):  488
ONNX model duration:  2
Total time for API:  495
INFO:     127.0.0.1:60644 - "POST /predict HTTP/1.0" 200 OK
Time to get product features:  142
Total Redis duration ( user + products fetch):  297
ONNX model duration:  2
Total time for API:  303
INFO:     127.0.0.1:60648 - "POST /predict HTTP/1.0" 200 OK
Time to get product features:  188
Total Redis duration ( user + products fetch):  342
ONNX model duration:  2
Total time for API:  348

It keeps increasing with more load, reaching 900 ms or more to fetch both sets of data from Redis. Is there a way to fetch the data concurrently with low latency, so that raising the number of concurrent requests to around 500 does not affect the latency? My target is under 300 ms for 300 concurrent requests per second.
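
One variation I am considering is to start the user and product lookups together rather than awaiting them one after the other. The sketch below shows the idea with asyncio.gather. FakeRedis is a made-up in-memory stand-in so the snippet runs without a server, and fetch_features is a hypothetical name. With the real client, r from the code above would be passed in instead. I have not measured whether this helps under load.

```python
import asyncio


class FakeRedis:
    """Hypothetical in-memory stand-in exposing async get/mget like the real client."""

    def __init__(self, data: dict):
        self._data = data

    async def get(self, key):
        await asyncio.sleep(0.01)  # simulated network round trip
        return self._data.get(key)

    async def mget(self, keys):
        await asyncio.sleep(0.01)  # simulated network round trip
        return [self._data.get(k) for k in keys]


async def fetch_features(client, user_key: str, product_keys: list):
    # Both coroutines are scheduled at once, so the two round trips overlap
    # instead of running back to back.
    user_data, product_data = await asyncio.gather(
        client.get(user_key),
        client.mget(product_keys),
    )
    return user_data, product_data


client = FakeRedis({"u1": "[0.1]", "p1": "[1.0]", "p2": "[2.0]"})
user_data, product_data = asyncio.run(fetch_features(client, "u1", ["p1", "p2"]))
print(user_data, product_data)
```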

I am stuck at this point and would be very grateful for any help.


