TLDR: I need to set up a Flask app for multiprocessing so that the API and the stomp queue listener run in separate processes and therefore do not interfere with each other's operations.
Details: I am building a Python Flask app that exposes API endpoints and also creates a message queue listener that connects to an ActiveMQ queue with the stomp package.
I need to implement multiprocessing such that the API and listener do not block each other's operation. That way the API will accept new requests and the listener will continue to listen for new messages and carry out tasks accordingly.
A simplified version of the code is shown below (some details are omitted for brevity).
Problem: The multiprocessing is causing the application to get stuck. The worker's run method is not being called, and therefore the listener never gets created.
# Start the worker as a subprocess -- this is not working -- app gets stuck before the worker's run method is called
m = Manager()
shared_state = m.dict()
worker = CustomWorker(shared_state=shared_state)
worker.start()
After several days of troubleshooting I suspect the problem is due to the multiprocessing not being set up correctly. When I strip out all of the multiprocessing code and call the worker's run method directly, all of the queue management code works correctly: the CustomWorker class creates the listener, creates the message, and picks up the message. This indicates that the queue management code is fine and that the source of the problem is most likely the multiprocessing setup.
# Removing the multiprocessing and calling the worker's run method directly works without getting stuck, so the issue is likely due to multiprocessing not being set up correctly
worker = CustomWorker()
worker.run()
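For context, here is a minimal, standard-library-only illustration (not the app code) of the difference between the two calls: start() creates a child process and invokes run() there, while calling run() directly just executes it in the current process.

from multiprocessing import Process
import os

class Demo(Process):
    def run(self):
        print(f"run() executing in pid {os.getpid()}")

if __name__ == '__main__':
    print(f"parent pid: {os.getpid()}")

    Demo().run()   # no new process is created; run() executes in the parent

    p = Demo()
    p.start()      # a child process is created, which then calls run()
    p.join()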
Here is the code I have so far:
App
This part of the code creates the API and attempts to start a new process that creates the queue listener. The 'custom_worker_utils' module is a custom module that creates the stomp listener in the CustomWorker class's run method.
from flask import Flask, request, make_response, jsonify
from flask_restx import Resource, Api
import sys, os, logging, time

basedir = os.path.dirname(os.getcwd())
sys.path.append('..')
from custom_worker_utils.custom_worker_utils import *
from multiprocessing import Manager

# app.py
def create_app():
    app = Flask(__name__)
    app.config['BASE_DIR'] = basedir
    api = Api(app, version='1.0', title='MPS Worker', description='MPS Common Worker')
    logger = get_logger()

    '''
    This is a placeholder to trigger the sending of a message to the first queue
    '''
    @api.route('/initialapicall', endpoint="initialapicall", methods=['GET', 'POST', 'PUT', 'DELETE'])
    class InitialApiCall(Resource):
        # Sends a message to the queue
        def get(self, *args, **kwargs):
            mqconn = get_mq_connection()
            message = create_queue_message(initial_tracker_file)
            mqconn.send('/queue/test1', message, headers={"persistent": "true"})
            return make_response(jsonify({'message': 'Initial Test Call Worked!'}), 200)

    # Start the worker as a subprocess -- this is not working -- app gets stuck before the worker's run method is called
    m = Manager()
    shared_state = m.dict()
    worker = CustomWorker(shared_state=shared_state)
    worker.start()

    # Removing the multiprocessing and calling the worker's run method directly works without
    # getting stuck, so the issue is likely due to multiprocessing not being set up correctly
    # worker = CustomWorker()
    # worker.run()

    return app
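The entry point is not shown above for brevity; the app is launched roughly like this (run.py is a hypothetical name and the host/port are just example values):

# run.py -- hypothetical entry point, shown only so the flow above is complete
from app import create_app

if __name__ == '__main__':
    app = create_app()   # the Manager/worker startup above happens inside this call
    app.run(host='0.0.0.0', port=5000)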
Custom worker utils
When called, the run() method connects to the queue and creates the listener with the stomp package.
# custom_worker_utils.py
from multiprocessing import Manager, Process
from datetime import datetime
import os, time, json, stomp, requests, logging, random
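
# NOTE: the connection settings below are not part of the original code; they are
# hypothetical placeholders (read from the environment) added only so this listing
# reads as self-contained -- the real values are defined elsewhere and omitted for brevity.
_host = os.getenv('MQ_HOST', 'localhost')
_port = int(os.getenv('MQ_PORT', '61613'))      # 61613 is the default ActiveMQ STOMP port
_user = os.getenv('MQ_USER', 'admin')
_password = os.getenv('MQ_PASSWORD', 'admin')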
'''
The listener
'''
class MyListener(stomp.ConnectionListener):

    def __init__(self, p):
        self.process = p
        self.logger = p.logger
        self.conn = p.mqconn
        self.conn.connect(_user, _password, wait=True)
        self.subscribe_to_queue()

    def on_message(self, headers, message):
        message_data = json.loads(message)
        ticket_id = message_data[constants.TICKET_ID]
        prev_status = message_data[constants.PREVIOUS_STEP_STATUS]
        task_name = message_data[constants.TASK_NAME]
        # Run the service
        if prev_status == "success":
            resp = self.process.do_task(ticket_id, task_name)
        elif hasattr(self, 'revert_task'):
            resp = self.process.revert_task(ticket_id, task_name)
        else:
            resp = True
        if resp:
            self.logger.debug('Acknowledging')
            self.logger.debug(resp)
            self.conn.ack(headers['message-id'], self.process.conn_id)
        else:
            self.conn.nack(headers['message-id'], self.process.conn_id)

    def on_disconnected(self):
        self.conn.connect('admin', 'admin', wait=True)
        self.subscribe_to_queue()

    def subscribe_to_queue(self):
        queue = os.getenv('QUEUE_NAME')
        self.conn.subscribe(destination=queue, id=self.process.conn_id, ack='client-individual')

def get_mq_connection():
    conn = stomp.Connection([(_host, _port)], heartbeats=(4000, 4000))
    conn.connect(_user, _password, wait=True)
    return conn

class CustomWorker(Process):

    def __init__(self, **kwargs):
        super(CustomWorker, self).__init__()
        self.logger = logging.getLogger("Worker Log")
        log_level = os.getenv('LOG_LEVEL', 'WARN')
        self.logger.setLevel(log_level)
        self.mqconn = get_mq_connection()
        self.conn_id = random.randrange(1, 100)
        for k, v in kwargs.items():
            setattr(self, k, v)

    def revert_task(self, ticket_id, task_name):
        # If the subclass does not implement this,
        # then there is nothing to undo so just return True
        return True

    def run(self):
        lst = MyListener(self)
        self.mqconn.set_listener('queue_listener', lst)
        while True:
            pass
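For reference, this is essentially the stripped-down script used for the troubleshooting described above (the file name is arbitrary; it shows the working direct call and, commented out, the subprocess variant that gets stuck):

# troubleshoot_worker.py -- standalone test, names mirror the code above
from multiprocessing import Manager
from custom_worker_utils.custom_worker_utils import CustomWorker

if __name__ == '__main__':
    # Works: run() executes in this process, the listener is created and
    # messages are picked up (this call never returns).
    worker = CustomWorker()
    worker.run()

    # Gets stuck: the worker is started as a subprocess but run() is never entered.
    # m = Manager()
    # shared_state = m.dict()
    # worker = CustomWorker(shared_state=shared_state)
    # worker.start()
    # worker.join()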