Wednesday, 20 January 2021

Python Flask API with stomp listener multiprocessing

TLDR: I need to setup a flask app for multiprocessing such that the API and stomp queue listener are running in separate processes and therefore not interfering with each other's operations.

Details: I am building a python flask app that has API endpoints and also creates a message queue listener to connect to an activemq queue with the stomp package.

I need to implement multiprocessing such that the API and listener do not block each other's operation. That way the API will accept new requests and the listener will continue to listen for new messages and carry out tasks accordingly.

A simplified version of the code is shown below (some details are omitted for brevity).

Problem: The multiprocessing is causing the application to get stuck. The worker's run method is not called consistently, and therefore the listener never gets created.

# Start the worker as a subprocess -- this is not working -- app gets stuck before the worker's run method is called
m = Manager()
shared_state = m.dict()
worker = MyWorker(shared_state=shared_state)
worker.start()

After several days of troubleshooting I suspect the problem is due to the multiprocessing not being setup correctly. I was able to prove that this is the case because when I stripped out all of the multiprocessing code and called the worker's run method directly, the all of the queue management code is working correctly, the CustomWorker module creates the listener, creates the message, and picks up the message. I think this indicates that the queue management code is working correctly and the source of the problem is most likely due to the multiprocessing.

# Removing the multiprocessing and calling the worker's run method directly works without getting stuck so the issue is likely due to multiprocessing not being setup correctly
worker = MyWorker()
worker.run()

Here is the code I have so far:

App

This part of the code creates the API and attempts to create a new process to create the queue listener. The 'custom_worker_utils' module is a custom module that creates the stomp listener in the CustomWorker() class run method.

from flask import Flask, request, make_response, jsonify
from flask_restx import Resource, Api
import sys, os, logging, time
basedir = os.path.dirname(os.getcwd())

sys.path.append('..')

from custom_worker_utils.custom_worker_utils import *
from multiprocessing import Manager

# app.py
def create_app():
    app = Flask(__name__)
    app.config['BASE_DIR'] = basedir

    api = Api(app, version='1.0', title='MPS Worker', description='MPS Common Worker')
    logger = get_logger()

    '''
    This is a placeholder to trigger the sending of a message to the first queue
    '''
    @api.route('/initialapicall', endpoint="initialapicall", methods=['GET', 'POST', 'PUT', 'DELETE'])
    class InitialApiCall(Resource):
    
        #Sends a message to the queue
        def get(self, *args, **kwargs):
            mqconn = get_mq_connection()
            message = create_queue_message(initial_tracker_file)
            mqconn.send('/queue/test1', message,  headers = {"persistent":"true"})
            return make_response(jsonify({'message': 'Initial Test Call Worked!'}), 200)
    
    # Start the worker as a subprocess -- this is not working -- app gets stuck before the worker's run method is called
    m = Manager()
    shared_state = m.dict()
    worker = MyWorker(shared_state=shared_state)
    worker.start()

    # Removing the multiprocessing and calling the worker's run method directly works without getting stuck so the issue is likely due to multiprocessing not being setup correctly
    #worker = MyWorker()
    #worker.run()
    
    return app

Custom worker utils

The run() method is called, connects to the queue and creates the listener with the stomp package

# custom_worker_utils.py
from multiprocessing import Manager, Process
from _datetime import datetime
import os, time, json, stomp, requests, logging, random

'''
The listener
'''
class MyListener(stomp.ConnectionListener):

    def __init__(self, p):
        self.process = p
        self.logger = p.logger
        self.conn = p.mqconn
        self.conn.connect(_user, _password, wait=True)
        self.subscribe_to_queue()
                
    def on_message(self, headers, message):
        message_data = json.loads(message)
        ticket_id = message_data[constants.TICKET_ID]
        prev_status = message_data[constants.PREVIOUS_STEP_STATUS]
        task_name = message_data[constants.TASK_NAME]
        #Run the service
        if prev_status == "success":
            resp = self.process.do_task(ticket_id, task_name)
        elif hasattr(self, 'revert_task'):
            resp = self.process.revert_task(ticket_id, task_name)
        else:
            resp = True
        if (resp):
            self.logger.debug('Acknowledging')
            self.logger.debug(resp)
            self.conn.ack(headers['message-id'], self.process.conn_id)
        else:
            self.conn.nack(headers['message-id'], self.process.conn_id)

    def on_disconnected(self):
        self.conn.connect('admin', 'admin', wait=True)
        self.subscribe_to_queue()
        
    def subscribe_to_queue(self):
        queue = os.getenv('QUEUE_NAME')
        self.conn.subscribe(destination=queue, id=self.process.conn_id, ack='client-individual')
        

def get_mq_connection():
  conn = stomp.Connection([(_host, _port)], heartbeats=(4000, 4000))
  conn.connect(_user, _password, wait=True)
  return conn

class CustomWorker(Process):
    
    def __init__(self, **kwargs):
        super(CustomWorker, self).__init__()
        self.logger = logging.getLogger("Worker Log")
        log_level = os.getenv('LOG_LEVEL', 'WARN')
        self.logger.setLevel(log_level)
        self.mqconn = get_mq_connection()
        self.conn_id = random.randrange(1,100)
        for k, v in kwargs.items():
            setattr(self, k, v)

    def revert_task(self, ticket_id, task_name):
        # If the subclass does not implement this,
        # then there is nothing to undo so just return True
        return True

    def run(self):
        lst = MyListener(self)
        self.mqconn.set_listener('queue_listener', lst)
        while True:
            pass


from Python Flask API with stomp listener multiprocessing

No comments:

Post a Comment