I'm struggling to get some code to work using asyncio. I'm very new to it, so new that I don't know how to properly figure out what I'm doing wrong.
I am using Django Channels to run an AsyncJsonWebsocketConsumer, which I connect to via websockets from the client application. I used websockets because I need bidirectional communication. I'm creating a printing process where I start a series of long-running actions, but I need the ability to pause, stop, etc. I had this all working when I was using asyncio.sleep(x) to mock my long-running task (the print_layer method). When I replaced the asyncio.sleep with an RPC call published to a queue, it stopped working as expected.
import asyncio
import json
from enum import Enum

from channels.generic.websocket import AsyncJsonWebsocketConsumer


class Command(Enum):
    START = 'start_print'
    PAUSE = 'pause_print'
    STOP = 'stop_print'
    CANCEL = 'cancel_print'
    RESET = 'reset_print'


class State(Enum):
    NOT_STARTED = 0
    STARTING = 1
    RUNNING = 2
    PAUSED = 3
    STOPPED = 4
    COMPLETE = 5
    ERROR = 500
class PrintConsumer(AsyncJsonWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.status = State.NOT_STARTED
        self.publisher = Publisher()
        self.print_instruction = None
        self.current_task = None
        self.loop = asyncio.get_event_loop()
        self.loop.set_debug(enabled=True)

    @property
    def running_task(self):
        return self.current_task is not None and not self.current_task.done() and not self.current_task.cancelled()

    async def connect(self):
        await self.accept()

    async def disconnect(self, close_code):
        self.status = State.STOPPED

    async def receive(self, text_data):
        response = json.loads(text_data)
        event_cmd = response.get('event', None)

        if Command(event_cmd) == Command.START:
            if self.status == State.NOT_STARTED:
                sting_uuid = response.get('sting_uuid', None)
                try:
                    await asyncio.wait_for(self.initialize_print(sting_uuid), timeout=10)  # WARNING: this is blocking
                except asyncio.TimeoutError:
                    self.status = State.ERROR
            self.status = State.RUNNING
            if not self.running_task:
                # it would only have a running task in this situation already if someone starts/stops it quickly
                self.current_task = asyncio.create_task(self.resume_print())
        elif Command(event_cmd) == Command.PAUSE:
            self.status = State.PAUSED
        elif Command(event_cmd) == Command.STOP:
            if self.running_task:
                self.current_task.cancel()
        elif Command(event_cmd) == Command.RESET:
            self.status = State.NOT_STARTED
            self.print_instruction = None
            if self.running_task:
                self.current_task.cancel()
            self.current_task = None
            await self.send(json.dumps({ 'message': []}))

    async def initialize_print(self, uuid):
        stingfile = await get_file(uuid)
        # This is just an iterator that returns the next task to do
        # hence I use "next" in the resume_print method
        self.print_instruction = StingInstruction(stingfile)

    async def resume_print(self):
        try:
            while self.status == State.RUNNING:
                await self.send(json.dumps({ 'message': self.print_instruction.serialized_action_status}))
                await asyncio.sleep(.2)  # It works with this here only
                try:
                    action = next(self.print_instruction)  # step through iterator
                except StopIteration:
                    self.status = State.COMPLETE
                    break
                # can't seem to get this part to work
                await self.print_layer(action)
        except asyncio.CancelledError:
            # TODO: Add logic here that will send a command out to stop pumps and all motors.
            self.status = State.STOPPED

    async def print_layer(self, instruction):
        print_command = instruction['instruction']
        # this publishes using RPC, so it holds until I get a response.
        try:
            await asyncio.wait_for(self.publisher.publish('pumps', json.dumps(print_command)), timeout=10)
        except asyncio.TimeoutError:
            self.status = State.ERROR
        # when I used just this in the function, the code worked as expected
        # await asyncio.sleep(1)
I don't know where to begin when showing what I've tried. My "best" attempt, as I see it, was to run the print_layer method in a thread with asyncio.to_thread(print_layer) so that it would not block execution, but in many of the things I tried, it would not even execute.
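For reference, asyncio.to_thread expects a plain (non-async) callable plus its arguments. If it is handed a coroutine function such as print_layer, the thread only creates a coroutine object and never runs its body, which may be why nothing executed. Below is a minimal sketch of the thread approach, assuming the RPC wait can be expressed as a synchronous function; blocking_publish, print_layers, and heartbeat are hypothetical names, not part of the code above.

```python
import asyncio
import time


def blocking_publish(layer):
    """Hypothetical stand-in for a synchronous RPC call that blocks until a reply arrives."""
    time.sleep(0.05)
    return f"layer {layer} done"


async def print_layers(count, log):
    for layer in range(count):
        log.append(f"status before layer {layer}")
        # Pass the plain function and its argument; the event loop stays free
        # while the worker thread is blocked.
        result = await asyncio.to_thread(blocking_publish, layer)
        log.append(result)


async def heartbeat(log, stop):
    # Represents everything else the loop must keep doing (e.g. reading websocket frames).
    while not stop.is_set():
        log.append("tick")
        await asyncio.sleep(0.01)


async def main():
    log = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(log, stop))
    await print_layers(2, log)
    stop.set()
    await hb
    return log


log = asyncio.run(main())
print(log)
```

In this sketch the "tick" entries end up interleaved with the layer entries, which suggests the loop keeps running other tasks while a layer is in progress. asyncio.to_thread requires Python 3.9 or newer.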
self.print_instruction.serialized_action_status returns the status of each step. My goal is to send this before each long-running task. The sequence might look like this:
# sending status update for each step to client
# running print_layer for first action
# sending status update for each step to client
# running print_layer for second action
...
# sending final update
Instead, once I add the long-running task, I'm either creating every single task at once and sending all the updates at the end, or running into a number of other issues. I can get the long-running tasks to run in order (seemingly), but the send won't actually go out in between layer prints. I'd really appreciate some help. Thank you in advance.
Here is some simplified relevant code (it doesn't handle connection loss, etc.) for my publisher:
from time import sleep  # assumed: the bare sleep() below is time.sleep
from uuid import uuid4


class Publisher():
    def on_response(self, ch, method, props, body):
        """When the job response arrives, set the job to inactive."""

    async def publish(self, routing_key, msg):
        new_corr_id = str(uuid4())
        self.active_jobs[new_corr_id] = False
        self.channel.basic_publish(...)
        while not self.active_jobs[new_corr_id]:
            self._connection.process_data_events()
            sleep(.1)
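A stripped-down sketch, without Django or a message broker, of how a polling loop like the one in publish behaves when it lives inside an async def but waits with time.sleep. It assumes the bare sleep above is time.sleep; blocking_wait, cooperative_wait, and count_ticks are made-up names for illustration.

```python
import asyncio
import time


async def blocking_wait():
    # Same shape as the polling loop above: time.sleep never yields to the event loop.
    for _ in range(3):
        time.sleep(0.02)


async def cooperative_wait():
    # Same loop, but each wait hands control back to the event loop.
    for _ in range(3):
        await asyncio.sleep(0.02)


async def count_ticks(waiter):
    """Run `waiter` while a background task tries to tick; return how often it ticked."""
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.005)

    task = asyncio.create_task(ticker())
    await waiter()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ticks


blocked_ticks = asyncio.run(count_ticks(blocking_wait))
cooperative_ticks = asyncio.run(count_ticks(cooperative_wait))
print(blocked_ticks, cooperative_ticks)
```

With the blocking version the background task never gets a turn, so blocked_ticks is 0, while the cooperative version lets it tick several times. If the real publisher behaves the same way, that would be consistent with status updates and incoming commands being held up until the RPC wait finishes.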
I found a partially working hack: if I add await asyncio.sleep(.2) after my send command, like this:
    await self.send(json.dumps({ 'message': self.print_instruction.serialized_action_status}))
    await asyncio.sleep(.2)
then it appears to work how I want it to (minus the ability to interrupt), and I'm still able to pause/start my process. Obviously I'd rather do this without a hack. Why does this code all of a sudden work, with the status updates going out as expected, after the .2 asyncio sleep and not without it? I also cannot interrupt with a STOP command, which I don't understand. I would have expected the Django Channels consumer to read the stop command, cancel the running task, and force the asyncio.CancelledError in the resume_print method.
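A minimal sketch of the cancellation behaviour expected from STOP, outside Django Channels. The worker function and its timings are made up for illustration. Task.cancel() only requests cancellation; the CancelledError is raised inside the task at the point where it is suspended on an await, so a task that never yields to the loop cannot be interrupted this way.

```python
import asyncio


async def worker(log):
    try:
        for layer in range(100):
            log.append(f"layer {layer}")
            await asyncio.sleep(0.01)  # suspension point where cancel() can land
    except asyncio.CancelledError:
        log.append("stopped")  # place for cleanup such as stopping pumps and motors
        raise


async def main():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.035)  # let a few layers run
    task.cancel()  # what the STOP branch does
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log, task.cancelled()


log, was_cancelled = asyncio.run(main())
print(log, was_cancelled)
```

Here the worker stops after a few layers and records "stopped", because every layer ends in an await that lets the loop deliver the cancellation.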