Thursday, 2 January 2020

Launching celery task_monitor in django

Looking at the celery docs i can see that the task monitor is launched in a script (see below). In an implementation of django (as is my understanding), this won't be the case, as (in my understanding) I'll have to launch the task monitor in a thread.

Currently I'm launching the monitor the first time i run a job, then checking its state each subsequent time i run a job (see further below). This seems like a bad way to do this.

What is the correct way to instantiate the task monitor for celery in a django project?

It seems I'm missing something really obvious.

Edit: am i just being basic and we should launch this in a sub process?

# docs example
from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')

    # LAUNCHED HERE
    my_monitor(app)
# my current implementation
# If the celery_monitor is not instantiated, set it up
app = Celery('scheduler',
             broker=rabbit_url,  # Rabbit-MQ
             backend=redis_url,  # Redis
             include=tasks
             )

celery_monitor = Thread(target=build_monitor, args=[app], name='monitor-global', daemon=True)

# import celery_monitor into another module
global celery_monitor

if not celery_monitor.is_alive():
    try:
        celery_monitor.start()
        logger.debug('Celery Monitor - Thread Started (monitor-retry) ')
    except RuntimeError as e:  # occurs if thread is dead
        # create new instance if thread is dead
        logger.debug('Celery Monitor - Error restarting thread (monitor-rety): {}'.format(e))
        celery_monitor = Thread(target=build_monitor, args=[app], name='monitor-retry', daemon=True)
        celery_monitor.start()  # start thread
        logger.debug('Celery Monitor - Thread Re-Started (monitor-retry) ')
else:
    logger.debug('Celery Monitor - Thread is already alive. Dont do anything.')


from Launching celery task_monitor in django

No comments:

Post a Comment