Tuesday, 26 January 2021

Slow Socket IO response when using Docker

I have a Web App built in Flask where tweets are captured (using Tweepy library) and displayed on the front-end. I used Socket IO to display the tweets live on the front-end.

My code works fine when I run this locally. The tweets appear instantly.

However, when i Dockerized the web app, the front-end doesn't update immediately. It takes some time to show the changes (sometimes I think tweets are lost due to the slowness)

Below are code extracts from my website:

fortsocket.js

$(document).ready(function () {



/************************************/
  /*********** My Functions ***********/
  /************************************/
  function stream_active_setup() {
    $("#favicon").attr("href", "/static/icons/fortnite-active.png");
    $("#stream-status-ic").attr("src", "/static/icons/stream-active.png");
    $("#stream-status-text").text("Live stream active");
  }

  function stream_inactive_setup() {
    $("#favicon").attr("href", "/static/icons/fortnite-inactive.png");
    $("#stream-status-ic").attr("src", "/static/icons/stream-inactive.png");
    $("#stream-status-text").text("Live stream inactive");
  }



  /*********************************/
  /*********** My Events ***********/
  /*********************************/

  // Socket connection to server

  // Prometheus
  //var socket = io.connect('http://104.131.173.145:8083');

  // Local
  var socket = io.connect(window.location.protocol + '//' + document.domain + ':' + location.port);

  // Heroku
  //var socket = io.connect('https://fortweet.herokuapp.com/');

  // Send a hello to know
  // if a stream is already active
  socket.on('connect', () => {
    socket.emit('hello-stream', 'hello-stream');
  });

  // Listene for reply from hello
  socket.on('hello-reply', function (bool) {
    if (bool == true) {
      stream_active_setup()
    } else {
      stream_inactive_setup()
    }
  });

  // Listens for tweets
  socket.on('stream-results', function (results) {

    // Insert tweets in divs
    $('#live-tweet-container').prepend(`
    <div class="row justify-content-md-center mt-3">
      <div class="col-md-2">
          <img width="56px" height="56px"  src="${results.profile_pic !== "" ? results.profile_pic : "/static/icons/profile-pic.png"}" class="mx-auto d-block rounded"  alt="">
      </div>
      <div class="col-md-8 my-auto">
        <div><b>${results.author}</b></div>
        <div>${results.message}</div>
      </div>
    </div>
    `);
  });

  // Listener for when a stream of tweets starts
  socket.on('stream-started', function (bool) {
    if (bool == true) {
      stream_active_setup()
    }
  });

  // Listener for when a stream of tweets ends
  socket.on('stream-ended', function (bool) {
    if (bool == true) {
      stream_inactive_setup()
    }
  });

});

init.py

# Create the app
app = create_app()

# JWT Configurations
jwt = JWTManager(app)

# Socket IO
socketio = SocketIO(app, cors_allowed_origins="*")

# CORS
CORS(app)
app.config["CORS_HEADERS"] = "Content-Type"

# Creates default admins and insert in db
create_default_admin()

# Main error handlers
@app.errorhandler(404)  # Handling HTTP 404 NOT FOUND
def page_not_found(e):
    return Err.ERROR_NOT_FOUND


# Listen for hello emit data
# from client
@socketio.on("hello-stream")
def is_stream_active(hello_stream):
    emit("hello-reply", streamer.StreamerInit.is_stream_active(), broadcast=True)

streamer.py

import time
import tweepy
import threading as Coroutine
import app.messages.constants as Const
import app.setup.settings as settings_mod
import app.models.tweet as tweet_mod
import app.services.logger as logger
import app


class FStreamListener(tweepy.StreamListener):
    def __init__(self):
        self.start_time = time.time()
        self.limit = settings_mod.TwitterSettings.get_instance().stream_time

        logger.get_logger().debug("Live capture has started")

        # Notify client that a live capture will start
        app.socketio.emit(
            "stream-started", True, broadcast=True,
        )

        super(FStreamListener, self).__init__()

    def on_status(self, status):
        if (time.time() - self.start_time) < self.limit:

            # Create tweet object
            forttweet = tweet_mod.TweetModel(
                status.source,
                status.user.name,
                status.user.profile_background_image_url_https,
                status.text,
                status.created_at,
                status.user.location,
            )

            # Emit to socket
            app.socketio.emit(
                "stream-results",
                {
                    "profile_pic": forttweet.profile_pic,
                    "author": forttweet.author,
                    "message": forttweet.message,
                },
                broadcast=True,
            )

            # Add to database
            forttweet.insert()

            return True
        else:
            logger.get_logger().debug("Live capture has ended")

            # Notify client that a live capture has ended
            app.socketio.emit(
                "stream-ended", True, broadcast=True,
            )

            # Stop the loop of streaming
            return False

    def on_error(self, status):
        logger.get_logger().debug(f"An error occurred while fetching tweets: {status}")
        raise Exception(f"An error occurred while fetching tweets: {status}")


class StreamerInit:

# [Private] Twitter configurations
def __twitterInstantiation(self):
    # Get settings instance
    settings = settings_mod.TwitterSettings.get_instance()
    # Auths
    auth = tweepy.OAuthHandler(settings.consumer_key, settings.consumer_secret,)
    auth.set_access_token(
        settings.access_token, settings.access_token_secret,
    )
    # Get API
    api = tweepy.API(auth)
    # Live Tweets Streaming
    myStreamListener = FStreamListener()
    myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener)
    myStream.filter(track=settings.filters)

def start(self):
    for coro in Coroutine.enumerate():
        if coro.name == Const.FLAG_TWEETS_LIVE_CAPTURE:
            return False

    stream = Coroutine.Thread(target=self.__twitterInstantiation)
    stream.setName(Const.FLAG_TWEETS_LIVE_CAPTURE)
    stream.start()

    return True

@staticmethod
def is_stream_active():
    for coro in Coroutine.enumerate():
        if coro.name == Const.FLAG_TWEETS_LIVE_CAPTURE:
            return True

    return False

The streamer.py is called on a button click

Dockerfile

# Using python 3.7 in Alpine
FROM python:3.6.5-stretch

# Set the working directory to /app
WORKDIR /app

# Copy the current directory contents into the container at /app
ADD . /app

RUN apt-get update -y && apt-get upgrade -y && pip install -r requirements.txt

# Run the command
ENTRYPOINT ["uwsgi", "app.ini"]

#ENTRYPOINT ["./entry.sh"]

docker-compose.yml

version: "3.8"

services:
  fortweet:
    container_name: fortweet
    image: mervin16/fortweet:dev
    build: ./
    env_file:
      - secret.env
    networks:
      plutusnet:
        ipv4_address: 172.16.0.10
    expose:
      - 8083
    restart: always

  nginx_fortweet:
    image: nginx
    container_name: nginx_fortweet
    ports:
      - "8083:80"
    networks:
      plutusnet:
        ipv4_address: 172.16.0.100
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/conf.d/default.conf
    depends_on:
      - fortweet
    restart: always

networks:
  plutusnet:
    name: plutus_network
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 172.16.0.0/24
          gateway: 172.16.0.1

app.ini

[uwsgi]

wsgi-file = run.py
callable = app
socket = :8083
processes = 4
threads = 2
master = true
chmod-socket = 660
http-websockets = true
vacuum = true
die-on-term = true

For a full, updated code, you can find it here under the branch dev/mervin

Any help is appreciated.



from Slow Socket IO response when using Docker

No comments:

Post a Comment