Thursday, 4 July 2019

How to multithread AsyncConsumer with Django Channels

I've been working with Django Channels for a week and somethin bugs me with the runworker parallelism.

For example, I have this MQTT client which publishes in the channels when it receives a message, basic.

async def treat_message(msg):
    channel_layer = get_channel_layer()
    payload = json.loads(msg.payload, encoding="utf-8")

    await channel_layer.send("mqtt", {
        "type": "value.change",
        "message": payload
    })

This is sending it well. I can send how much I want, it will be sent to the redis queue. To the channel mqtt.

I then run the worker which will redirect messages in the queue for mqtt with :

python manage.py runworker mqtt
2018-09-12 16:33:42,232 - INFO - runworker - Running worker for channels ['mqtt']

This is where the problem begins. Here is the content of the AsyncConsumer reading the data :

class MQTTConsumer(AsyncConsumer):
    async def value_change(self, event):
        await asyncio.sleep(5)
        print("I received changes : {}".format(event["message"]))

I putted a sleep in order to simulate business of the task. And this is where I'm going : the async consumer is not multi threaded ! When I send two messages to the channel, it takes 10 seconds to the consumer to treat the second message, instead of 5 if it were multi threaded. As shown below.

2018-09-12 16:45:25,271 - INFO - runworker - Running worker for channels ['mqtt']
2018-09-12 16:45:32,559 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:37,561 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:42,563 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:47,565 - INFO - mqtt - I received changes : {'oui': 'non'}

Any intel on the subject would be a great help, thanks in advance !

EDIT: The only way to manage it I found is to make an executor which will contain the workers to do it async. But I'm not sure of its efficiency for deploy purposes

def handle_mqtt(event):
    time.sleep(3)
    logger.info("I received changes : {}".format(event["message"]))


class MQTTConsumer(AsyncConsumer):
    def __init__(self, scope):
        super().__init__(scope)
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    async def value_change(self, event):
        loop = asyncio.get_event_loop()
        future = loop.run_in_executor(self.executor, handle_mqtt, event)



from How to multithread AsyncConsumer with Django Channels

No comments:

Post a Comment