Monday, 16 July 2018

Celery tasks through view (Django) returned as pending, but okay via Terminal

This issue has been discussed before, but after looking through numerous posts I still cannot find a solution. I'm new to Celery, so my learning curve is still fairly steep. Below are my current scripts:

myapp.__init__.py

from __future__ import absolute_import, unicode_literals
from .celery_main import app as celery_app # Ensures app is always imported when Django starts so that shared_task will use this app.

__all__ = ['celery_app']

myapp.celery_main.py

from __future__ import absolute_import
from celery import Celery
from django.apps import apps

# Initialise the app
app = Celery()                          
app.config_from_object('myapp.celeryconfig')  # WORKS WHEN CALLED THROUGH VIEW/DJANGO: Tell Celery instance to use celeryconfig module
#app.config_from_object('celeryconfig')  # WORKS WHEN CALLED THROUGH TERMINAL

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: [n.name for n in apps.get_app_configs()])

myapp.celeryconfig.py

from __future__ import absolute_import, unicode_literals
from datetime import timedelta

## List of modules to import when celery starts.
CELERY_IMPORTS = ('celery_tasks',)

## Message Broker (RabbitMQ) settings.
BROKER_URL = 'amqp://'
BROKER_PORT = 5672

## Result store settings.
CELERY_RESULT_BACKEND = 'rpc://'

## Misc
#CELERY_IGNORE_RESULT = False
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_ENABLE_UTC = True

CELERYBEAT_SCHEDULE = {
    'doctor-every-10-seconds': {
        'task': 'celery_tasks.fav_doctor',
        'schedule': timedelta(seconds=3),
    },
}

myapp.celery_tasks.py

from __future__ import absolute_import
from celery.task import task

suf = lambda n: "%d%s" % (n, {1: "st", 2: "nd", 3: "rd"}.get(n if n < 20 else n % 10, "th"))
@task
def fav_doctor():
    # Stuff happened here
    pass

@task
def reverse(string):
    return string[::-1]

@task
def send_email(user_id):
    # Stuff happened here
    pass

@task
def add(x, y):
    return x+y

anotherapp.settings.py

INSTALLED_APPS = [
    ...
    'kombu.transport.django',
]

myapp.views.admin_scripts.py

from django.contrib.auth.decorators import login_required
from celery.result import AsyncResult
from myapp.celery_tasks import fav_doctor, reverse, send_email, add
from myapp.celery_main import app

@login_required
def admin_script_dashboard(request):
    if request.method == 'POST':
        form = Admin_Script(request.POST)
        if form.is_valid():
            # Results
            async_result = add.delay(2, 5)
            task_id = async_result.task_id
            res = AsyncResult(async_result)
            res_1 = add.AsyncResult(async_result)
            res_2 = add.AsyncResult(async_result.id)
            print("async_result: {0}\ntask_id: {1}\nres: {2}\nres_1: {3}\nres_2: {4}".format(async_result, task_id, res, res_1, res_2))

            # Backend: Make sure the client is configured with the right backend
            print("Backend check: {0}".format(async_result.backend))

            # States/statuses
            task_state = res.state
            A = async_result.status
            B = res.status
            print("task_state: {0}\nA: {1}\nB: {2}".format(task_state, A, B))

The results when triggering the Celery tasks through my Django application (these correspond to the print statements in myapp.views.admin_scripts.py):

async_result: 00d7ec84-ebdb-4968-9ea6-f20ca2a793b7
task_id: 00d7ec84-ebdb-4968-9ea6-f20ca2a793b7
res: 00d7ec84-ebdb-4968-9ea6-f20ca2a793b7
res_1: 00d7ec84-ebdb-4968-9ea6-f20ca2a793b7
res_2: 00d7ec84-ebdb-4968-9ea6-f20ca2a793b7
Backend check: <celery.backends.rpc.RPCBackend object at 0x106e308d0>
task_state: PENDING
A: PENDING
B: PENDING

Output this triggers in the Terminal where the worker is running:

[2018-07-15 21:41:47,015: ERROR/MainProcess] Received unregistered task of type 'MyApp.celery_tasks.add'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see <link> for more information.

The full contents of the message body was:
{'task': 'MyApp.celery_tasks.add', 'id': 'b21ffa43-d1f1-4767-9ab8-e58afec3ea0f', 'args': [2, 5], 'kwargs': {}, 'retries': 0, 'eta': None, 'expires': None, 'utc': True, 'callbacks': None, 'errbacks': None, 'timelimit': [None, None], 'taskset': None, 'chord': None} (266b)
Traceback (most recent call last):
  File "/Users/My_MBP/anaconda3/lib/python3.6/site-packages/celery/worker/consumer.py", line 465, in on_task_received
    strategies[type_](message, body,
KeyError: 'MyApp.celery_tasks.add'

I have several questions:

1. I can trigger the expected results by using commands in Terminal:

celery -A celery_tasks worker -l info

Then in the Python shell:

from celery_tasks import *
add.delay(2,3) 

Which succeeds:

[2018-07-13 10:12:14,943: INFO/MainProcess] Received task: celery_tasks.add[c100ad91-2f94-40b1-bb0e-9bc2990ff3bc]
[2018-07-13 10:12:14,961: INFO/MainProcess] Task celery_tasks.add[c100ad91-2f94-40b1-bb0e-9bc2990ff3bc] succeeded in 0.017578680999577045s: 54

So executing the tasks from Terminal works, but not from my view in Django. Why not?
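
To make the difference between the two logs concrete, here is a minimal sketch in plain Python (it is not Celery's actual code). It assumes the worker looks tasks up by a name built from the module's import path plus the function name, which is what the task names in the logs above suggest. The registry dict and the default_task_name and dispatch helpers are hypothetical stand-ins made up for illustration.

```python
# Hypothetical stand-in for a worker's task registry, keyed by task name.
registry = {}


def default_task_name(module_name, func_name):
    """Build a task name from the import path of the module and the function name."""
    return ".".join([module_name, func_name])


def dispatch(task_name, *args):
    """Run the handler registered under task_name, or report that none exists."""
    handler = registry.get(task_name)
    if handler is None:
        return "unregistered: " + task_name
    return handler(*args)


# Worker started with "-A celery_tasks": the module is imported as "celery_tasks".
registry[default_task_name("celery_tasks", "add")] = lambda x, y: x + y

# The Python shell imports the module under the same name, so the lookup succeeds.
via_shell = dispatch(default_task_name("celery_tasks", "add"), 2, 3)

# The Django view imports it through the package, so the name carries a prefix.
via_django = dispatch(default_task_name("MyApp.celery_tasks", "add"), 2, 5)

print(via_shell)
print(via_django)
```

If this assumption holds, the task sent from the view and the task registered by the worker simply have different names, which would match the KeyError in the worker log.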

2. Perhaps related to 1: annoyingly, I have to change the app.config_from_object call in myapp.celery_main.py depending on whether I want to test via Django or via Terminal. As you can see, I pass the celeryconfig module name either with the myapp prefix or without it; otherwise, an error is thrown. I suspect some kind of circular import is causing this (though I could be wrong), but I don't know why or where. How can I overcome this?
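
One possible explanation, which I have not confirmed for this project, is that the two ways of starting things put different directories on sys.path. The sketch below builds a throwaway package and shows that the same config module resolves under a prefixed name from the project root and under a bare name from inside the package. The names demo_pkg, demo_config and importable_names are hypothetical and exist only for this illustration.

```python
import importlib
import os
import sys
import tempfile

# Build a throwaway layout: <tmp>/demo_pkg/__init__.py and <tmp>/demo_pkg/demo_config.py
root = tempfile.mkdtemp()
pkg_dir = os.path.join(root, "demo_pkg")
os.makedirs(pkg_dir)
open(os.path.join(pkg_dir, "__init__.py"), "w").close()
with open(os.path.join(pkg_dir, "demo_config.py"), "w") as fh:
    fh.write("BROKER_URL = 'amqp://'\n")


def importable_names(path_entry):
    """Return which dotted names can be imported when path_entry is on sys.path."""
    found = []
    sys.path.insert(0, path_entry)
    try:
        for name in ("demo_pkg.demo_config", "demo_config"):
            # Forget earlier imports so every lookup starts from scratch.
            for cached in ("demo_pkg", "demo_pkg.demo_config", "demo_config"):
                sys.modules.pop(cached, None)
            importlib.invalidate_caches()
            try:
                importlib.import_module(name)
                found.append(name)
            except ImportError:
                pass
    finally:
        sys.path.remove(path_entry)
    return found


# Project root on the path (assumed to resemble starting from the Django project).
from_project_root = importable_names(root)
# Package directory on the path (assumed to resemble starting celery inside it).
from_inside_pkg = importable_names(pkg_dir)

print(from_project_root)
print(from_inside_pkg)
```

If that is what happens here, then always starting the worker from the same directory as Django should let a single prefixed name work in both cases, but this is a guess.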

3. In my settings.py file (not celeryconfig.py) I have configured in INSTALLED_APPS: 'kombu.transport.django'. Is this necessary? I'm using celery 3.1.26.post2 (Cipater)

4. In all my files I have at the top:

from __future__ import absolute_import, unicode_literals

What exactly is this for, and is it required for 3.1.26?
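
As a quick check, the standard library's __future__ module records the release in which each feature became the default. The sketch below reads that for the two features imported above and compares it with the running interpreter. The traceback earlier shows a python3.6 path, so it assumes Python 3; the variable names are just for illustration.

```python
import __future__
import sys

features = ("absolute_import", "unicode_literals")

# (major, minor) of the release in which each feature became mandatory.
mandatory_since = {
    name: getattr(__future__, name).getMandatoryRelease()[:2]
    for name in features
}

# True when the running interpreter already behaves this way without the import.
already_default = all(
    sys.version_info[:2] >= release for release in mandatory_since.values()
)

print(mandatory_since)
print(already_default)
```

On an interpreter where already_default is true, the import line changes nothing, so it would only matter if the same files also had to run on Python 2.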

5. I read here that you need to ensure the client is configured with the right backend, but I'm not sure exactly what this means. My printout is (as per myapp.views.admin_scripts.py):

Backend check: <celery.backends.rpc.RPCBackend object at 0x106e308d0>

If you spot anything else wrong in my code, please feel free to let me know.


