I have a WebSocket connection manager like this:
class ConnectionManager:
    """Registry of accepted WebSocket connections, keyed by user id."""

    def __init__(self) -> None:
        # Maps user_id -> the WebSocket most recently registered for that user.
        self.connections = {}

    # The annotation is quoted so the class can be defined without requiring
    # the WebSocket name to be resolvable at definition time.
    async def connect(self, user_id: str, websocket: "WebSocket"):
        """Accept ``websocket`` and store it under ``user_id``.

        A later call with the same ``user_id`` replaces the stored socket.
        """
        await websocket.accept()
        self.connections[user_id] = websocket

    async def disconnect(self, user_id):
        """Forget and close the connection stored for ``user_id``, if any.

        Unknown user ids are ignored, so this is safe to call from cleanup
        paths where ``connect`` may have failed before registering the socket.
        """
        # Remove the entry first so it is dropped even if close() raises.
        websocket = self.connections.pop(user_id, None)
        if websocket is None:
            return
        await websocket.close()

    async def send_messages(self, user_ids, message):
        """Send ``message`` as JSON to each user in ``user_ids`` that is registered.

        User ids without a stored connection are skipped so that one absent
        user does not prevent delivery to the remaining ones.
        """
        for user_id in user_ids:
            websocket = self.connections.get(user_id)
            if websocket is None:
                continue
            await websocket.send_json(message)
and a WebSocket route:
@router.websocket("/ws/{token}")
async def ws(
    websocket: WebSocket,
    token: str,
    redis: Annotated[Redis, Depends(get_redis)],
):
    """Look up the user for ``token`` in Redis and register the socket with ``manager``.

    Raises ``redis_error`` when the token does not resolve to a user id.
    """
    # NOTE(review): if ``Redis`` is the asyncio client, get()/expire() return
    # coroutines and need ``await`` - confirm which client get_redis yields.
    user_id = redis.get(token)
    if user_id:
        # NOTE(review): expire() is called without a TTL and on ``user_id``
        # rather than ``token`` - confirm the intended key and timeout.
        redis.expire(user_id)
    else:
        raise redis_error
    try:
        # Pass the connection instance (not the WebSocket class) and await the
        # coroutine; otherwise nothing is accepted or stored.
        await manager.connect(user_id, websocket)
    except WebSocketException:
        await manager.disconnect(user_id)
    # NOTE(review): the handler returns right after connect(); presumably a
    # receive loop is needed to keep the connection open - confirm.
I want to store the user connections and, when a Redis pub/sub message comes through, process the message and then send a WebSocket message to some users. The module that processes the message is not part of the FastAPI app.
I tried to implement this inside the FastAPI app using threading and asyncio, but both interrupted the FastAPI app itself.
How can I trigger the sending of messages on the WebSocket objects from outside the FastAPI application?
What I've tried:
# One Redis client and pub/sub object created at import time, shared by all
# WebSocket handlers.
redis = Redis(redis_host, redis_port)
pubsub = redis.pubsub()
# NOTE(review): get_message() is awaited below, so this is presumably the
# asyncio client, where subscribe() is a coroutine; called here without
# ``await`` it would never run, which would match the "not subscribed"
# error - confirm.
pubsub.subscribe("channel_signal")


@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str):
    """Poll the shared pub/sub once, then register the socket with ``manager``."""
    message = await pubsub.get_message(ignore_subscribe_messages=True)
    if message is not None:
        # do something
        pass
    try:
        # NOTE(review): ``user_id`` is not defined in this snippet; presumably
        # it is resolved from ``token`` as in the first route - confirm.
        # Pass the connection instance (not the WebSocket class) and await.
        await manager.connect(user_id, websocket)
    except WebSocketException:
        await manager.disconnect(user_id)
But I get a pub/sub error from Redis saying that I haven't subscribed yet, unless I do it like this:
@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str):
    """Create a pub/sub subscription per connection, poll it once, then register the socket."""
    # A new Redis client and pub/sub object are created on every call, i.e.
    # once per WebSocket connection handled by this route.
    redis = Redis(redis_host, redis_port)
    pubsub = redis.pubsub()
    # NOTE(review): with the asyncio client subscribe() is a coroutine and
    # would need ``await`` - confirm which client is in use.
    pubsub.subscribe("channel_signal")
    message = await pubsub.get_message(ignore_subscribe_messages=True)
    if message is not None:
        # do something
        pass
    try:
        # NOTE(review): ``user_id`` is not defined in this snippet; presumably
        # it is resolved from ``token`` as in the first route - confirm.
        # Pass the connection instance (not the WebSocket class) and await.
        await manager.connect(user_id, websocket)
    except WebSocketException:
        await manager.disconnect(user_id)
But this creates a Redis connection for every user that connects to the WebSocket. Is there any way to define a single Redis connection globally for all users?
from How to trigger message send of Fastapi websocket outside of Fastapi app
No comments:
Post a Comment