I tried following this solution as well as this solution but thus far have been unsuccessful:
When I run the following block of code:
global manager
global lock
manager = Manager()
lock = manager.Lock()
class MyClass(object):
def get_next_chunk(self, numberlist, chunks):
for i in range(0, len(numberlist), chunks):
yield numberlist[i:i + chunks]
def multi_process(self, numberlist):
procs = 5
chunksize = 100
with Pool(procs) as pool:
pool.map(self.process_numberlist,
self.get_next_chunk(numberlist, chunksize))
return self.running_total_list
def process_numberlist(self, numberlist):
temp_num_list = []
temp_num_list = self.getnewNumbers()
logger.debug("temp_num_list length: " + str(len(temp_num_list)))
try:
lock.acquire()
except Exception as e:
logger.error("Couldn't acquire lock")
logger.error(e)
traceback.format_exc()
logger.error(sys.exc_info()[0])
self.running_total_list = self.running_total_list + temp
logger.debug("New running_total_list length: "
+ str(len(self.running_total_list)))
lock.release()
break
The output in my logs look like:
[process_numberlist() ] temp_num_list length: 5
[process_numberlist() ] New running_total_list result set length: 5
[process_numberlist() ] temp_num_list length: 6
[process_numberlist() ] New running_total_list result set length: 6
[process_numberlist() ] temp_num_list length: 4
[process_numberlist() ] New running_total_list result set length: 9
When my expected output I believe should look like:
[process_numberlist() ] temp_num_list length: 5
[process_numberlist() ] New running_total_list result set length: 5
[process_numberlist() ] temp_num_list length: 6
[process_numberlist() ] New running_total_list result set length: 11
[process_numberlist() ] temp_num_list length: 4
[process_numberlist() ] New running_total_list result set length: 15
Edit - Attempt 2
See update based on Aaron's suggestion. Now receiving a 'can only join an iterable' error
global manager
global lock
class MyClass(object):
def get_next_chunk(self, numberlist, chunks):
for i in range(0, len(numberlist), chunks):
yield numberlist[i:i + chunks]
def multi_process(self, numberlist):
procs = 5
chunksize = 100
manager = Manager()
lock = manager.Lock()
with Pool(procs) as pool:
func = partial(self.process_numberlist, lock)
pool.map(function,
self.get_next_chunk(numberlist, chunksize))
return self.running_total_list
def process_numberlist(self, numberlist, lock):
temp_num_list = []
temp_num_list = self.getnewNumbers()
logger.debug("temp_num_list length: " + str(len(temp_num_list)))
try:
lock.acquire()
self.running_total_list = self.running_total_list + temp_num_list
logger.debug("New running_total_list length: "
+ str(len(self.running_total_list)))
lock.release()
except Exception as e:
logger.error("Couldn't acquire lock")
logger.error(e)
traceback.format_exc()
logger.error(sys.exc_info()[0])
break
EDIT # 3 - getNewNumbers() which is not included in this toy example, simply returns an array of integers. Hope that helps
from Sharing Lock between Processes
No comments:
Post a Comment