Wednesday 16 August 2023

How can I properly return my Node.js app to idle and to wait for the next Cron Schedule

I have built a Node.js app to query a couple of API endpoints to gather data on a certain stocks and act as a stock screener/websocket and then upload those final objects to my database for analysis at the end of the day. The app works fine and the objects are successfully inserted but I have to manually shut the app down and restart it after insertion because the websocket client keeps trying to reconnect after the lists are cleared at 8 pm. It will do so until it either crashes or the next cron schedule happens at 6 am the next weekday.

I haven't found a proper way to shutdown the process after the lists are cleared at the end of the day, so I manually restart the app with "npm run start" which puts the app in waiting state for the next cron schedule each night, but it defeats the purpose of trying to automate my routine, doesn't it.

Please forgive any oversight or structural errors as I am a self-taught amateur.

Q: How can I restructure my code so that when the lists are cleared and the isFirstCall flag is returned to "true", my app waits for the next cron schedule?

I have tried calling the main() method after clearing the lists with no success, and I can't call the ws.close() function from the cron schedule at the end of the day because the websocket is initiliazed inside the main() method only after the first lists are populated.

server.js code:

const express = require("express");
const schedule = require("node-schedule");
const mysql = require("mysql");
const fs = require("fs");
const { // Dependencies} = require("./utilities");
const { //  Dependencies } = require("./databaseHandler");
const { // Dependencies} = require("./apihandler");

const cors = require("cors");
const WebSocket = require("ws");
const os = require("os");
require("dotenv").config();
const cron = require("node-cron");
const { v4: uuidv4 } = require("uuid");
const { log } = require("console");
const port = 5000;

// Start API and Websocket server
const app = express();
const wss = new WebSocket.Server({ port: 8080 });

// Define global variables
let stockScreenerList = [];
let batchQuoteList = [];
let premarketRunners = [];
let isFirstCall = true;

let subscriptionList = [];
let authenticated = false;


/**
* MYSQL CONNECTION SECTION
*/
const connection = mysql.createConnection({
  // MYSQL database params
});

// EOD CRON SCHEDULE
cron.schedule(
  "5 20 * * 1-5",
  async () => {
    try {
      if (premarketRunners.length > 0) {
        for (let i = 0; i < premarketRunners.length; i++) {
          const stockItem = premarketRunners[i];
          const stockSymbol = premarketRunners[i].symbol;
        }

        const isInserted = await insertGappersDb(premarketRunners);
        if (isInserted) {
          console.log("Clearing lists.");
          premarketRunners = [];
          stockScreenerList = [];
          batchQuoteList = [];
          subscriptionList = [];
          isFirstCall = true;
        }
      }
    } catch (error) {
      console.error("Error:", error);
    }
  },
  {
    timezone: "America/New_York",
  }
);


// Populate premarket runners list
async function getGappers() {
  let tempList = [];

  if (isFirstCall) {
// populating lists first time of the day



    console.log("First PremarketRunnersList populated...");
    isFirstCall = false;

    const endTime = new Date(); // record end time
    const duration = endTime - startTime; // calculate duration
    console.log(`First api call took ${duration} milliseconds to execute`);

    return gappersList;
  } else {
    console.log("Not the first api call, Checking for new stocks");
    //Logic to populate and return only new stocks to optimize efficiency
    return tempList;
  }
}

var connectWS = function () {
  const apiKey = process.env.WSAPIKEY;
  const loginMessage = JSON.stringify({
    event: "login",
    data: { apiKey },
  });
  var reconnectInterval = 1000 * 10;
  const ws = new WebSocket("wss://websockets.financialmodelingprep.com");

  const login = () => {
    if (!authenticated) {
      console.log("Logging in...");
      ws.send(loginMessage);
    }
  };

  const subscribe = () => {
    if (authenticated) {
      const subscribeMessages = premarketRunners
        .filter(
          (ticker) => !subscriptionList.includes(ticker.symbol.toLowerCase())
        )
        .map((ticker) =>
          JSON.stringify({
            event: "subscribe",
            data: { ticker: ticker.symbol.toLowerCase() },
          })
        );
      subscribeMessages.forEach((message) => ws.send(message));
      subscribeMessages.forEach((message) => console.log(message));
    }
  };

  ws.on("open", () => {
    console.log("WebSocket opened");
    login();
    subscribe();
  });

  ws.on("message", (data) => {
    const message = JSON.parse(data);
    if (message.event === "login") {
      if (message.status !== 200) {
        console.error(`Login failed: ${message.message}`);
        setTimeout(login, 5000);
      } else {
        console.log("Authenticated");
        authenticated = true;
        subscribe();
      }
    }

    if (message.event === "subscribe") {
      if (message.status !== 200) {
        console.error(`Subscription failed: ${message.message}`);
        setTimeout(subscribe, 5000);
      } else {
        const stockSymbol = extractStockSymbol(message.message);
        subscriptionList.push(stockSymbol);
        console.log(`Successfully Subbed to: ${stockSymbol}`);
      }
    }

    if (message.type === "T") {
      if (premarketRunners) {
        //premarket runners list is populated, broadcasting messages.
        const foundStock = premarketRunners.find(
          (item) => item.symbol.toUpperCase() === message.s.toUpperCase()
        );
        if (foundStock) {
          foundStock.premarketPrice = message.lp;
          wss.clients.forEach((client) => {
            if (client.readyState === WebSocket.OPEN) {
              client.send(JSON.stringify(premarketRunners));
            }
          });
        }
      }
    }
  });

  ws.on("error", (err) => {
    console.error(`WebSocket error: ${err.message}`);
  });

  ws.on("close", (code) => {
    console.log(`WebSocket closed with code ${code}`);
    authenticated = false;
    subscriptionList = [];
    setTimeout(connectWS, reconnectInterval);
  });
};

/**
* MAIN SECTION OF THE SERVER
*/

async function main() {
  const startTime = Date.now();
  const todaysDate = new Date(startTime).toLocaleDateString("en-US");

  console.log("Checktime called");
  const timeRight = checkTime();
  const marketStatus = await checkMarketStatus();
  //const market = await isMarketOpen();
  const isPremarket = marketStatus[0];
  const nasdaqStatus = marketStatus[1];
  const serverDate = marketStatus[3];

  if (timeRight && isPremarket && nasdaqStatus !== "open") {
    //console.log('first call');
    premarketRunners = await getGappers();

    connectWS();

    wss.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(JSON.stringify(premarketRunners));
      }
    });

    setInterval(async () => {
      const timeRight = checkTime();
      const marketStatus = await checkMarketStatus();

      const isPremarket = marketStatus[0];
      const nasdaqStatus = marketStatus[1];
      const serverDate = marketStatus[3];


      if (timeRight && isPremarket && nasdaqStatus !== "open") {
        // GetGappers already ran once, list ins't empty so just check for new runners.
        console.log(
          `premarketRunners contains ${premarketRunners.length} stocks, \nsubscriptionList contains: ${subscriptionList.length} symbols.. \nChecking for new tickers`
        );

        const newTickers = await getGappers();

        if (newTickers.length > 0) {
          // Filter out newTickers that are already subscribed to
          const filteredTickers = newTickers.filter(
            (ticker) => !subscriptionList.includes(ticker.symbol.toLowerCase())
          );

          filteredTickers.forEach((item) => {
            console.log(`Found new Ticker: ${item.symbol}... Subbing.`);
          });
          //subscribe to websocket info for each new stock that's not already subscribed to
          const subscribeMessages = filteredTickers.map((ticker) =>
            JSON.stringify({
              event: "subscribe",
              data: { ticker: ticker.symbol.toLowerCase() },
            })
          );

          newTickers.forEach((item) => premarketRunners.push(item));
          // send payload to connected users
          wss.clients.forEach((client) => {
            if (client.readyState === WebSocket.OPEN) {
              client.send(JSON.stringify(premarketRunners));
            }
          });
        } else {
          console.log("No new tickers found");
          // Logic to update existing items then sending them

          console.log("Done updating... Sending Updated payload...");

          wss.clients.forEach((client) => {
            if (client.readyState === WebSocket.OPEN) {
              client.send(JSON.stringify(premarketRunners));
            }
          });
        }
      }
    }, 45 * 1000); // Check for new stocks delay
  }
}

/**
* Run The main function everyday of the week at 6 am EST
*/
cron.schedule(
  "0 6 * * 1-5",
  () => {
    main();
  },
  {
    timezone: "America/New_York",
  }
);

Thanks!



from How can I properly return my Node.js app to idle and to wait for the next Cron Schedule

No comments:

Post a Comment