Thursday, 29 July 2021

django channels asyncio trouble running task in order

I'm struggling to get some code to work using asyncio. I'm very new to it. So new to it, that I don't know how to properly figure out what I'm doing wrong.

I am using Django Channels to run an AyncJsonWebsocketConsumer which I connect to via websockets from the client application. I used websockets because I need bidirectional communication. I'm creating a printing process where I start a series of long running actions, but I need the ability to pause, stop, etc. I had this all working when I was using asyncio.sleep(x) to mock my long running task (the print_layer method). When I tried adding my RPC to a queue inplace of the asyncio.sleep it stopped working as expected.

class Command(Enum):
    START = 'start_print'
    PAUSE = 'pause_print'
    STOP = 'stop_print'
    CANCEL = 'cancel_print'
    RESET = 'reset_print'

class State(Enum):
    NOT_STARTED = 0
    STARTING = 1
    RUNNING = 2
    PAUSED = 3
    STOPPED = 4
    COMPLETE = 5
    ERROR = 500

class PrintConsumer(AsyncJsonWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.status = State.NOT_STARTED
        self.publisher = Publisher()
        self.print_instruction = None
        self.current_task = None
        self.loop = asyncio.get_event_loop()
        self.loop.set_debug(enabled=True)

    @property
    def running_task(self):
        return self.current_task is not None and not self.current_task.done() and not self.current_task.cancelled()

    async def connect(self):
        await self.accept()

    async def disconnect(self, close_code):
        self.status = State.STOPPED

    async def receive(self, text_data):
        response = json.loads(text_data)
        event_cmd = response.get('event', None)
        
        if Command(event_cmd) == Command.START:
            if self.status == State.NOT_STARTED:
                sting_uuid = response.get('sting_uuid', None)
                try:
                    await asyncio.wait_for(self.initialize_print(sting_uuid), timeout=10)  # WARNING: this is blocking
                except asyncio.TimeoutError:
                    self.status = State.ERROR
            self.status = State.RUNNING
            if not self.running_task:
                # it would only have a running task in this situation already if someone starts/stops it quickly
                self.current_task = asyncio.create_task(self.resume_print())
        elif Command(event_cmd) == Command.PAUSE:
            self.status = State.PAUSED
        elif Command(event_cmd) == Command.STOP:
            if self.running_task:
                self.current_task.cancel()
        elif Command(event_cmd) == Command.RESET:
            self.status = State.NOT_STARTED
            self.print_instruction = None
            if self.running_task:
                self.current_task.cancel()
            self.current_task = None
            await self.send(json.dumps({ 'message': []}))
        
    async def initialize_print(self, uuid):
        stingfile = await get_file(uuid)
        # This is just an iterator that returns the next task to do
        # hence I use "next" in the resume_print method
        self.print_instruction = StingInstruction(stingfile)


    async def resume_print(self):
        try:
            while self.status == State.RUNNING:
                await self.send(json.dumps({ 'message': self.print_instruction.serialized_action_status}))
                await asyncio.sleep(.2) # It works with this here only
                try:
                    action = next(self.print_instruction)  # step through iterator
                except StopIteration:
                    self.status = State.COMPLETE
                    break;

                # can't seem to get this part to work
                await self.print_layer(action)
        except asyncio.CancelledError:
            # TODO: Add logic here that will send a command out to stop pumps and all motors.
            self.status = State.STOPPED
    
    async def print_layer(self, instruction):
        print_command = instruction['instruction']
        # this publishes using RPC, so it holds until I get a response.
        try:
            await asyncio.wait_for(self.publisher.publish('pumps', json.dumps(print_command)), timeout=10)
        except asyncio.TimeoutError:
            self.status = State.ERROR
        # when I used just this in the function, the code worked as expected
        # await asyncio.sleep(1)


I don't know where to begin when showing what I've tried... My "best" attempt, as I see it, was to turn the print_layer method into a thread so that it did not block execution using asyncio.to_thread(print_layer).. but in many of the things I tried, it would not even execute.

The self.print_instruction.serialized_action_status returns the status of each step. My goal is to have it sending this before each long running task. This might look like...

# sending status update for each step to client
# running print_layer for first action
# sending status update for each step to client
# running print_layer for second action
...
# sending final update

Instead, I'm creating every single task at once, and it's sending the updates all at the end when I add the long running task, or a number of issues. I can get the long running task to run in order (seemingly), but the send won't actually send inbetween layer prints. I'd really appreciate some help.. thank you in advance.

Here is some simplified relevant code (doesn't handle connection loss, etc) for my publisher...

class Publisher():
    def on_response(self, ch, method, props, body):
        """when job response set job to inactive"""

    async def publish(routing_key, msg):
        new_corr_id = str(uuid4())
        self.active_jobs[new_corr_id] = False
        self.channel.basic_publish(...)
        white not self.active_jobs[new_corr_id]:
            self._connection.process_data_events()
            sleep(.1)

I found a partial working hack.. if I add await asyncio.sleep(.1) after my send command (i.e. like this)

await self.send(json.dumps({ 'message': self.print_instruction.serialized_action_status}))
await asyncio.sleep(.2)

then it appears to work how I want it to (minus ability to interrupt), and I'm able to still pause/start my process. Obviously I'd rather do this without a hack. Why does this code all of the sudden work where the status updates send out as expected after the .2 asyncio sleep and not without? I also can not interrupt with a STOP command, which I don't understand. I would have expected the django channel to read the stop command, and then cancel the task that was running and force the asyncio.CancelledError in the resume_print method.



from django channels asyncio trouble running task in order

No comments:

Post a Comment