Wednesday, 15 May 2019

Sharing Lock between Processes

I tried following this solution as well as this solution but thus far have been unsuccessful:

When I run the following block of code:

global manager
global lock
manager = Manager()
lock = manager.Lock()

class MyClass(object):

    def get_next_chunk(self, numberlist, chunks):
        for i in range(0, len(numberlist), chunks):
            yield numberlist[i:i + chunks]

    def multi_process(self, numberlist):
        procs = 5
        chunksize = 100
        with Pool(procs) as pool:
            pool.map(self.process_numberlist,
                  self.get_next_chunk(numberlist, chunksize))
        return self.running_total_list

    def process_numberlist(self, numberlist):
        temp_num_list = []
        temp_num_list = self.getnewNumbers()
        logger.debug("temp_num_list length: " + str(len(temp_num_list)))
        try:
            lock.acquire()
        except Exception as e:
            logger.error("Couldn't acquire lock")
            logger.error(e)
            traceback.format_exc()
            logger.error(sys.exc_info()[0])
        self.running_total_list = self.running_total_list + temp
        logger.debug("New running_total_list length: "
                    + str(len(self.running_total_list)))
        lock.release()
        break

The output in my logs look like:

[process_numberlist() ] temp_num_list length: 5
[process_numberlist() ] New running_total_list result set length: 5
[process_numberlist() ] temp_num_list length: 6
[process_numberlist() ] New running_total_list result set length: 6
[process_numberlist() ] temp_num_list length: 4
[process_numberlist() ] New running_total_list result set length: 9

When my expected output I believe should look like:

[process_numberlist() ] temp_num_list length: 5
[process_numberlist() ] New running_total_list result set length: 5
[process_numberlist() ] temp_num_list length: 6
[process_numberlist() ] New running_total_list result set length: 11
[process_numberlist() ] temp_num_list length: 4
[process_numberlist() ] New running_total_list result set length: 15

Edit - Attempt 2

See update based on Aaron's suggestion. Now receiving a 'can only join an iterable' error

global manager
global lock

class MyClass(object):

    def get_next_chunk(self, numberlist, chunks):
        for i in range(0, len(numberlist), chunks):
            yield numberlist[i:i + chunks]

    def multi_process(self, numberlist):
        procs = 5
        chunksize = 100
        manager = Manager()
        lock = manager.Lock()
        with Pool(procs) as pool:
            func = partial(self.process_numberlist, lock)
            pool.map(function,
              self.get_next_chunk(numberlist, chunksize))
        return self.running_total_list

    def process_numberlist(self, numberlist, lock):
        temp_num_list = []
        temp_num_list = self.getnewNumbers()
        logger.debug("temp_num_list length: " + str(len(temp_num_list)))
        try:
             lock.acquire()
             self.running_total_list = self.running_total_list + temp_num_list
             logger.debug("New running_total_list length: "
                + str(len(self.running_total_list)))
             lock.release()
        except Exception as e:
            logger.error("Couldn't acquire lock")
            logger.error(e)
            traceback.format_exc()
            logger.error(sys.exc_info()[0])
        break

EDIT # 3 - getNewNumbers() which is not included in this toy example, simply returns an array of integers. Hope that helps



from Sharing Lock between Processes

No comments:

Post a Comment