Tuesday, 11 May 2021

How to stop the execution of a long process if something changes in the db?

I have a view that sends a message to a RabbitMQ queue.

message = {'origin': 'Bytes CSV',
           'data': {'csv_key': str(csv_entry.key),
                    'csv_fields': csv_fields
                    'order_by': order_by,
                    'filters': filters}}

...

queue_service.send(message=message, headers={}, exchange_name=EXCHANGE_IN_NAME,
                   routing_key=MESSAGES_ROUTING_KEY.replace('#', 'bytes_counting.create'))

On my consumer, I have a long process to generate a CSV.

def create(self, data):
    csv_obj = self._get_object(key=data['csv_key'])
    if csv_obj.status == CSVRequestStatus.CANCELED:
        self.logger.info(f'CSV {csv_obj.key} was canceled by the user')
        return

    result = self.generate_result_data(filters=data['filters'], order_by=data['order_by'], csv_obj=csv_obj)
    csv_data = self._generate_csv(result=result, csv_fields=data['csv_fields'], csv_obj=csv_obj)
    file_key = self._post_csv(csv_data=csv_data, csv_obj=csv_obj)

    csv_obj.status = CSVRequestStatus.READY
    csv_obj.status_additional = CSVRequestStatusAdditional.SUCCESS
    csv_obj.file_key = file_key
    csv_obj.ready_at = timezone.now()
    csv_obj.save(update_fields=['status', 'status_additional', 'ready_at', 'file_key'])

    self.logger.info(f'CSV {csv_obj.name} created')

The long proccess happens inside self._generate_csv, because self.generate_result_data returns a queryset, which is lazy.

As you can see, if a user changes the status of the csv_request through an endpoint BEFORE the message starts to be consumed the proccess will not be evaluated. My goal is to let this happen during the execution of self._generate_csv.

So far I tried to use Threading, but unsuccessfully.

How can I achive my goal?

Thanks a lot!



from How to stop the execution of a long process if something changes in the db?

No comments:

Post a Comment