I am seeing a long delay (about 3 hours) in processing data pushed from a websocket server to my client.py. (EDIT: the delay is brief at first and then grows throughout the day.) I know the delay is not caused by the server.
For example, every 5 seconds I see the keep_alive log event with its timestamp, so that part runs smoothly. But when a data frame shows up as processed in the logs, it is actually 3 hours after the server sent it. Am I doing something to delay this process?
Am I calling my coroutine 'keep_alive' correctly? keep_alive just sends a message to the server to keep the connection alive, and the server echoes the message back. Also, am I logging too much? Could that be delaying the processing? (I don't think so, since I see the logging events occur right away.)
    import asyncio
    import json

    import websockets

    # user, pswd, rotating_logger and convert_and_store are defined elsewhere in client.py.


    async def keep_alive(websocket):
        """
        This only needs to happen every 30 minutes. I currently have it set to every 5 seconds.
        """
        await websocket.send('Hello')
        await asyncio.sleep(5)


    async def open_connection_test():
        """
        Establishes web socket (WSS). Receives data and then stores in csv.
        """
        async with websockets.connect(
                'wss://{}:{}@localhost.urlname.com/ws'.format(user, pswd), ssl=True) as websocket:
            while True:
                # Handle message from server.
                message = await websocket.recv()
                if message.isdigit():
                    # now = datetime.datetime.now()
                    rotating_logger.info('Keep alive message: {}'.format(str(message)))
                else:
                    jasonified_message = json.loads(message)
                    for key in jasonified_message:
                        rotating_logger.info('{}: \n\t{}\n'.format(key, jasonified_message[key]))
                    # Store in a csv file.
                    try:
                        convert_and_store(jasonified_message)
                    except PermissionError:
                        convert_and_store(jasonified_message, divert=True)
                # Keep connection alive.
                await keep_alive(websocket)


    # Logs any exceptions in logs file.
    try:
        asyncio.get_event_loop().run_until_complete(open_connection_test())
    except Exception as e:
        rotating_logger.info(e)
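One thing worth checking: `await keep_alive(websocket)` sits inside the `while True` loop, so the loop sleeps 5 seconds after every received message and can read at most one message per 5 seconds. Below is a minimal sketch of one alternative, assuming the keep-alive can run as its own task next to the receive loop (Python 3.7+). `receive_loop`, `handle`, `main` and `FakeWebSocket` are hypothetical names made up for this illustration and are not part of the `websockets` library; the fake only exists so the sketch runs without a server, and a real connection from `websockets.connect` would take its place.

```python
import asyncio


async def keep_alive(websocket, interval=5):
    """Send a keep-alive message every `interval` seconds, forever."""
    while True:
        await websocket.send('Hello')
        await asyncio.sleep(interval)


async def receive_loop(websocket, handle):
    """Read messages as they arrive; there is no sleep between reads."""
    while True:
        message = await websocket.recv()
        if message is None:  # hypothetical end-of-stream marker from the fake below
            return
        handle(message)


class FakeWebSocket:
    """Hypothetical stand-in for a real connection (illustration only)."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.sent = []

    async def send(self, data):
        self.sent.append(data)

    async def recv(self):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return self._messages.pop(0) if self._messages else None


async def main(websocket, handle):
    # The keep-alive runs in the background, so its sleep cannot throttle recv().
    pinger = asyncio.create_task(keep_alive(websocket))
    try:
        await receive_loop(websocket, handle)
    finally:
        pinger.cancel()
```

With this shape, something like `asyncio.run(main(FakeWebSocket(['1', '2', '3']), print))` drains all three messages right away while the keep-alive sleeps in the background, instead of taking 5 seconds per message.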
EDIT: From the documentation (quoted below). I think this might have something to do with it, but I haven't connected the dots yet.
The max_queue parameter sets the maximum length of the queue that holds incoming messages. The default value is 32. 0 disables the limit. Messages are added to an in-memory queue when they’re received; then recv() pops from that queue. In order to prevent excessive memory consumption when messages are received faster than they can be processed, the queue must be bounded. If the queue fills up, the protocol stops processing incoming data until recv() is called. In this situation, various receive buffers (at least in asyncio and in the OS) will fill up, then the TCP receive window will shrink, slowing down transmission to avoid packet loss.
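To make the "received faster than they can be processed" case concrete, here is a rough back-of-the-envelope sketch. It assumes messages arrive at a fixed interval and that the client handles at most one message per 5 seconds (the sleep in keep_alive). The function name `processing_delay` and the 4-second arrival interval used below are assumptions for illustration, not measurements from my logs, and the model ignores the bounded queue and the TCP buffers the documentation mentions.

```python
def processing_delay(wall_clock_s, arrival_interval_s, service_interval_s=5.0):
    """Estimate how stale the message being processed at `wall_clock_s` is.

    Simplified model (an assumption, not the library's behaviour):
    message n is sent at n * arrival_interval_s and processed at
    n * service_interval_s, so the lag grows linearly with time.
    """
    if arrival_interval_s >= service_interval_s:
        # The client keeps up, so no backlog builds.
        return 0.0
    return wall_clock_s * (1.0 - arrival_interval_s / service_interval_s)
```

Under those assumed numbers, `processing_delay(15 * 3600, 4.0)` comes out to roughly 10800 seconds, i.e. about 3 hours of lag after 15 hours of running, which would at least be consistent with a delay that starts small and grows throughout the day.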
from Long delay in using asyncio and websockets in Python 3