I am using Flask with Celery and I am trying to lock a specific task so that only one instance of it can run at a time. The Celery docs give an example of doing this (Celery docs, "Ensuring a task is only executed one at a time"). The example given was for Django; however, I am using Flask. I have done my best to convert it to work with Flask, but I still see that myTask1, which has the lock, can be run multiple times.
One thing that is not clear to me is whether I am using the cache correctly. I have never used it before, so all of it is new to me. One thing from the docs that is mentioned but not explained is this:
Doc Notes:
In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.
I'm not truly sure what that means. Should I be using the cache in conjunction with a database, and if so, how would I do that? I am using MongoDB. In my code I just have this setup for the cache: cache = Cache(app, config={'CACHE_TYPE': 'simple'})
as that is what was mentioned in the Flask-Cache docs (Flask-Cache Docs).
Another thing that is not clear to me is whether there is anything different I need to do, given that I am calling myTask1
from within my Flask route task1.
Here is an example of my code that I am using.
from flask import (Flask, render_template, flash, redirect,
url_for, session, logging, request, g, render_template_string, jsonify)
from flask_caching import Cache
from contextlib import contextmanager
from celery import Celery
from Flask_celery import make_celery
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from celery.five import monotonic
from flask_pymongo import PyMongo
from hashlib import md5
import pymongo
import time
# Module-level setup: creates the Flask app, the cache used by memcache_lock,
# the Mongo connection, and the Celery application. Statement order matters
# because later assignments overwrite earlier ones (see notes below).
app = Flask(__name__)
# NOTE(review): 'simple' selects Flask-Caching's in-memory cache. That backend
# is presumably private to each process, so a lock stored in it would not be
# visible to other Celery worker processes -- confirm, and consider a shared
# backend (e.g. memcached or redis) whose add() is atomic.
cache = Cache(app, config={'CACHE_TYPE': 'simple'})
# NOTE(review): hard-coded secret key; presumably only acceptable for a demo.
app.config['SECRET_KEY']= 'super secret key for me123456789987654321'
######################
# MONGODB SETUP
#####################
app.config['MONGO_HOST'] = 'localhost'
app.config['MONGO_DBNAME'] = 'celery-test-db'
app.config["MONGO_URI"] = 'mongodb://localhost:27017/celery-test-db'
mongo = PyMongo(app)
##############################
# CELERY ARGUMENTS
##############################
app.config['CELERY_BROKER_URL'] = 'amqp://localhost//'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb://localhost:27017/celery-test-db'
# NOTE: this second assignment replaces the URL set on the previous line, so
# the effective value of CELERY_RESULT_BACKEND is the bare string 'mongodb'.
app.config['CELERY_RESULT_BACKEND'] = 'mongodb'
app.config['CELERY_MONGODB_BACKEND_SETTINGS'] = {
"host": "localhost",
"port": 27017,
"database": "celery-test-db",
"taskmeta_collection": "celery_jobs",
}
app.config['CELERY_TASK_SERIALIZER'] = 'json'
# NOTE: the Celery instance created on the next line is bound to `celery` only
# momentarily; the line after it rebinds `celery` to whatever make_celery(app)
# returns. make_celery comes from the project's Flask_celery module --
# presumably it builds the Celery app from app.config; verify there.
celery = Celery('task',broker='mongodb://localhost:27017/jobs')
celery = make_celery(app)
# Lifetime, in seconds, passed to cache.add() for the lock key in memcache_lock.
LOCK_EXPIRE = 60 * 2 # Lock expires in 2 minutes
@contextmanager
def memcache_lock(lock_id, oid):
    """Context manager that tries to take a cache-backed lock.

    Stores ``oid`` under ``lock_id`` via ``cache.add`` with a lifetime of
    ``LOCK_EXPIRE`` seconds and yields the value ``cache.add`` returned
    (truthy when the key was stored, i.e. the lock was acquired).

    On exit the key is deleted only when this call acquired the lock and
    fewer than ``LOCK_EXPIRE - 3`` seconds have elapsed since entry.
    """
    # Computed before the add() call so the safety margin also covers the
    # time the cache round-trip itself takes.
    release_deadline = monotonic() + LOCK_EXPIRE - 3
    # add() does not overwrite an existing key, which is what makes it
    # usable as a lock primitive.
    got_lock = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield got_lock
    finally:
        # Release only a lock we actually hold, and only while it cannot yet
        # have expired -- past the deadline the key may already belong to
        # another holder, and deleting it would release their lock.
        if got_lock and monotonic() < release_deadline:
            cache.delete(lock_id)
@celery.task(bind=True, name='app.myTask1')
def myTask1(self):
    """Celery task that does its work only while holding the task-name lock.

    Reports state 'IN TASK', then tries to take the lock keyed by the task's
    own name. If the lock is obtained it reports 'DOING WORK', sleeps for
    90 seconds and returns 'result'. If not, it raises ``self.retry`` with a
    60 second countdown.
    """
    self.update_state(state='IN TASK')

    # The task name is used as the lock key, so the lock is shared by every
    # invocation of this task.
    with memcache_lock(self.name, self.app.oid) as acquired:
        if not acquired:
            # The lock was already in use: redeliver the message to the
            # queue so the work can be done later.
            raise self.retry(countdown=60)

        # We hold the lock: do the work.
        print('acquired is {}'.format(acquired))
        self.update_state(state='DOING WORK')
        time.sleep(90)
        return 'result'
@celery.task(bind=True, name='app.myTask2')
def myTask2(self):
    """Celery task without any locking.

    Prints a start message, reports state 'STARTING', sleeps for 120 seconds
    and prints a completion message. Returns nothing.
    """
    print('you are in task2')
    self.update_state(state='STARTING')

    # Stand-in for the real work.
    time.sleep(120)

    print('task2 done')
@app.route('/', methods=['GET', 'POST'])
def index():
    """Serve the landing page by rendering the 'index.html' template."""
    page = render_template('index.html')
    return page
@app.route('/task1', methods=['GET', 'POST'])
def task1():
    """Queue myTask1 and record its task id in the ``job_task_id`` collection.

    Returns:
        The rendered 'task1.html' template.
    """
    print('running task1')

    # delay() already returns an AsyncResult, so its id can be used directly
    # instead of wrapping it in a second AsyncResult.
    result = myTask1.delay()

    # Store the id so the /status route can look the task up later.
    # insert_one() is used because Collection.insert() is deprecated in
    # PyMongo 3 and removed in PyMongo 4.
    mongo.db.job_task_id.insert_one(
        {'taskid': str(result.id), 'TaskName': 'task1'}
    )

    return render_template('task1.html')
@app.route('/task2', methods=['GET', 'POST'])
def task2():
    """Queue myTask2 and record its task id in the ``job_task_id`` collection.

    Returns:
        The rendered 'task2.html' template.
    """
    print('running task2')

    # delay() already returns an AsyncResult, so its id can be used directly
    # instead of wrapping it in a second AsyncResult.
    result = myTask2.delay()

    # Store the id so the /status route can look the task up later.
    # insert_one() is used because Collection.insert() is deprecated in
    # PyMongo 3 and removed in PyMongo 4.
    mongo.db.job_task_id.insert_one(
        {'taskid': str(result.id), 'TaskName': 'task2'}
    )

    return render_template('task2.html')
@app.route('/status', methods=['GET', 'POST'])
def status():
    """Render the state of every task recorded in ``job_task_id``.

    For each stored document the task state is looked up through
    ``myTask1.AsyncResult``. A document without a 'taskid' key, or a lookup
    that fails, is reported with state 'UNKNOWN'.

    Returns:
        The rendered 'status.html' template, with ``data_list`` set to a list
        of ``(state, task_name)`` tuples, one per stored document.
    """
    rows = []
    for doc in mongo.db.job_task_id.find():
        try:
            task_id = doc['taskid']
        except KeyError:
            print('error with db conneciton in asyncJobStatus')
            task_id = None

        task_name = doc['TaskName']

        # State and name are paired per document. Collecting them in separate
        # lists let the lists drift out of step whenever a document had no
        # 'taskid', so states were shown against the wrong task names.
        if task_id is None:
            state = 'UNKNOWN'
        else:
            # PASS TASK ID TO ASYNC RESULT TO GET TASK RESULT FOR THAT SPECIFIC TASK
            try:
                state = myTask1.AsyncResult(task_id).state
            except Exception:
                # Best effort: a failing result-backend lookup should not
                # break the whole page. Unlike a bare except, this lets
                # KeyboardInterrupt/SystemExit through.
                state = 'UNKNOWN'

        rows.append((state, task_name))

    return render_template('status.html', data_list=rows)
from Flask Celery task locking
No comments:
Post a Comment