Wednesday, 20 September 2023

How to trigger message send of Fastapi websocket outside of Fastapi app

I have a WebSocket connection manager like this :

class ConnectionManager:
    def __init__(self) -> None:
        self.connections  = {}
 
    async def connect(self, user_id: str, websocket: WebSocket):
        await websocket.accept()
        self.connections[user_id] = websocket

    async def disconnect(self, user_id):
        websocket: WebSocket = self.connections[user_id]
        await websocket.close()
        del self.connections[user_id]

    async def send_messages(self, user_ids, message):
        for user_id in user_ids:
            websocket: WebSocket = self.connections[user_id]
            await websocket.send_json(message

and a WebSocket route :

@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str, redis :Annotated [Redis, Depends(get_redis)]):
    user_id = redis.get(token)
    if user_id:
        redis.expire(user_id)
    else:
        raise redis_error
    
    try:
        manager.connect(user_id, WebSocket)
    except WebSocketException:
        manager.disconnect(user_id)

I want to store the user connections and when a Redis pubsub message comes through, process the message and then send a WebSocket message to some users. the module that processes the message is not part of the Fastapi app.

i tried to implement this inside of the Fastapi app by implementing threading and asyncio but these to interupted the Fastapi app itself.

how can I trigger the send message of WebSocket objects outside of the Fastapi application?

What I've tried:

redis = Redis(redis_host, redis_port)
pubsub = redis.pubsub()
pubsub.subscribe("channel_signal")

@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str):
    message = await  
    pubsub.get_message(ignore_subscribe_messages=True)
    if message is not None:
         # do something
    try:
        manager.connect(user_id, WebSocket)
    except WebSocketException:
        manager.disconnect(user_id)

but I get pubsub error from redis and it says that i haven't subscribed yet unless i do it like this:


@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str):
    redis = Redis(redis_host, redis_port)
    pubsub = redis.pubsub()
    pubsub.subscribe("channel_signal")

    message = await  
    pubsub.get_message(ignore_subscribe_messages=True)
    if message is not None:
        # do something
    try:
        manager.connect(user_id, WebSocket)
    except WebSocketException:
        manager.disconnect(user_id)

but this creates a redis connection for every user that connects to the websocket, is there anyway to globally define a redis connection for all users?



from How to trigger message send of Fastapi websocket outside of Fastapi app

No comments:

Post a Comment