I'm trying to use a cluster of computers to run millions of small simulations. To do this I tried to set up two "servers" on my main computer: one that puts the input variables into a queue shared over the network, and one that takes care of the results.
This is the code for putting stuff into the simulation variables queue:
"""This script reads start parameters and calls on run_sim to run the
simulations"""
import time
from multiprocessing import Process, freeze_support, Manager, Value, Queue, current_process
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
    """Manager type on which the shared input queue is registered as 'get_queue'."""
class MultiComputers(Process):
    """Producer process that feeds simulation inputs into a queue.

    ``__init__`` stores ``sim_name`` and ``queue``.  ``handle_queue`` also
    sets ``sim_nr`` (inputs queued so far) and ``total`` (inputs fetched).
    """

    # The producer waits while the queue already holds this many items, so
    # the queue stays bounded instead of receiving every input at once.
    MAX_PENDING = 100
    # Seconds to wait before re-checking a full queue.
    POLL_INTERVAL = 0.05

    def __init__(self, sim_name, queue):
        """Store the simulation name and the queue to fill."""
        self.sim_name = sim_name
        self.queue = queue
        super(MultiComputers, self).__init__()

    def get_sim_obj(self, offset=0, db=None):
        """returns a list of lists from a database query"""
        # Placeholder: the query is not shown.  The defaults let
        # handle_queue() call this without arguments, as it always did.
        raise NotImplementedError('get_sim_obj() must run the database query')

    def handle_queue(self):
        """Put every simulation input on the queue, at most MAX_PENDING at a time.

        Prints the running count, the percentage done and the current queue
        size after each input is queued.
        """
        self.sim_nr = 0
        sims = self.get_sim_obj()
        self.total = len(sims)
        # Iterating avoids the O(n) cost of deleting the list head per item.
        for sim in sims:
            # Throttle: wait for the consumers while the queue is full.
            while self.queue.qsize() >= self.MAX_PENDING:
                time.sleep(self.POLL_INTERVAL)
            self.queue.put(sim)
            self.sim_nr += 1
            print(self.sim_nr, round(self.sim_nr / self.total * 100, 2), self.queue.qsize())

    def run(self):
        """Process entry point: queue all simulation inputs."""
        self.handle_queue()
if __name__ == '__main__':
    # Needed for frozen Windows executables; called before any process starts.
    freeze_support()
    queue = Queue()
    # Background producer that fills the queue with simulation inputs.
    w = MultiComputers('seed_1_hundred', queue)
    w.start()
    # Expose this same queue to remote clients under the name 'get_queue'.
    QueueManager.register('get_queue', callable=lambda: queue)
    # An empty host string listens on all interfaces; clients must use the
    # same port and authkey.
    m = QueueManager(address=('', 8001), authkey=b'abracadabra')
    s = m.get_server()
    # Blocks in this process while serving client requests.
    s.serve_forever()
And this is the script that runs the queue which takes care of the results of the simulations:
__author__ = 'axa'
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.managers import BaseManager
import time
class QueueManager(BaseManager):
    """Manager type that the __main__ block instantiates to serve the result queue."""
class SaveFromMultiComp(Process):
    """Consumer process that appends queued simulation results to two files.

    Each result is indexed as ``res[0]`` (type tag) and ``res[1]`` (payload).
    Tag 1 goes to ``sim_type1_<sim_name>``, tag 2 to ``sim_type2_<sim_name>``;
    any other tag is counted but not written.
    """

    def __init__(self, sim_name, queue):
        """Store the simulation name (file-name suffix) and the result queue."""
        self.sim_name = sim_name
        self.queue = queue
        super(SaveFromMultiComp, self).__init__()

    def run(self):
        """Drain the queue forever, polling every 0.5 s; never returns."""
        res_got = 0
        with open('sim_type1_' + self.sim_name, 'a') as f_1, \
                open('sim_type2_' + self.sim_name, 'a') as f_2:
            while True:
                if self.queue.qsize() > 0:
                    while self.queue.qsize() > 0:
                        res = self.queue.get()
                        res_got += 1
                        if res[0] == 1:
                            f_1.write(str(res[1]) + '\n')
                        elif res[0] == 2:
                            f_2.write(str(res[1]) + '\n')
                    # The loop never ends, so the files are never closed
                    # normally.  Flush after each batch so results are not
                    # left in the buffer if the process is killed.
                    f_1.flush()
                    f_2.flush()
                    print(res_got)
                time.sleep(0.5)
if __name__ == '__main__':
    # Needed for frozen Windows executables; called before any process starts.
    freeze_support()
    queue = Queue()
    # Background consumer that writes queued results to disk.
    w = SaveFromMultiComp('seed_1_hundred', queue)
    w.start()
    # Without this registration, remote clients cannot obtain the queue.
    QueueManager.register('get_queue', callable=lambda: queue)
    # An empty host string listens on all interfaces; clients must use the
    # same port and authkey.
    m = QueueManager(address=('', 8002), authkey=b'abracadabra')
    s = m.get_server()
    # NOTE(review): the server's accept loop starts a thread for each
    # incoming client connection.  Workers should presumably connect once
    # and reuse the proxy rather than reconnecting per result; confirm
    # against the client code, which is not shown here.
    s.serve_forever()
These scripts work as expected for the first ~700-800 simulations; after that I get the following error in the terminal running the result-receiving script:
Exception in thread Thread-1:
Traceback (most recent call last):
File "C:\Python35\lib\threading.py", line 914, in _bootstrap_inner
self.run()
File "C:\Python35\lib\threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "C:\Python35\lib\multiprocessing\managers.py", line 177, in accepter
t.start()
File "C:\Python35\lib\threading.py", line 844, in start
_start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread
Can anyone give some insight into where and how the threads are spawned? Is a new thread spawned every time I call queue.get(),
or how does it work? I would also be very glad if someone knows what I can do to avoid this failure. (I'm running the script with 32-bit Python 3.5.)
from Python can't start new thread multiprocessing
No comments:
Post a Comment