Thursday, 27 August 2020

Python can't start new thread multiprocessing

I'm trying to use a cluster of computers to run millions of small simulations. To do this I set up two "servers" on my main computer: one that feeds input variables into a queue that is shared over the network, and one that collects the results.

This is the code for putting stuff into the simulation variables queue:

"""This script reads start parameters and calls on run_sim to run the
simulations"""
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass


class MultiComputers(Process):
    def __init__(self, sim_name, queue):
        self.sim_name = sim_name
        self.queue = queue
        super(MultiComputers, self).__init__()

    def get_sim_obj(self):
        """Returns a list of simulation parameter lists from a database
        query (body omitted here)."""

    def handle_queue(self):
        self.sim_nr = 0
        sims = self.get_sim_obj()
        self.total = len(sims)
        while sims:
            # Only top the queue up while it holds fewer than 100 pending
            # jobs, so it never grows without bound
            if self.queue.qsize() < 100:
                self.queue.put(sims.pop(0))
                self.sim_nr += 1
                print(self.sim_nr, round(self.sim_nr / self.total * 100, 2),
                      self.queue.qsize())

    def run(self):
        self.handle_queue()

if __name__ == '__main__':
    freeze_support()
    queue = Queue()
    # Local process that keeps the shared job queue topped up
    w = MultiComputers('seed_1_hundred', queue)
    w.start()
    # Expose the job queue to the other machines via a manager server on port 8001
    QueueManager.register('get_queue', callable=lambda: queue)
    m = QueueManager(address=('', 8001), authkey=b'abracadabra')
    s = m.get_server()
    s.serve_forever()
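
A worker machine gets hold of this queue with the standard BaseManager client pattern, roughly like this (just a sketch; 'main_computer' is a placeholder for the address of the machine running the server above):

from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass


# Register the same name the server exposes; no callable is needed on the client side
QueueManager.register('get_queue')

m = QueueManager(address=('main_computer', 8001), authkey=b'abracadabra')
m.connect()
jobs = m.get_queue()   # proxy for the queue served on port 8001
params = jobs.get()    # blocks until a set of simulation parameters is available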

And this is the script that serves the results queue and saves the simulation results:

__author__ = 'axa'
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.managers import BaseManager
import time


class QueueManager(BaseManager):
    pass


class SaveFromMultiComp(Process):
    def __init__(self, sim_name, queue):
        self.sim_name = sim_name
        self.queue = queue
        super(SaveFromMultiComp, self).__init__()

    def run(self):
        res_got = 0
        # Append results to one file per simulation type
        with open('sim_type1_' + self.sim_name, 'a') as f_1, \
                open('sim_type2_' + self.sim_name, 'a') as f_2:
            while True:
                # Drain everything currently in the queue ...
                while self.queue.qsize() > 0:
                    res = self.queue.get()
                    res_got += 1
                    if res[0] == 1:
                        f_1.write(str(res[1]) + '\n')
                    elif res[0] == 2:
                        f_2.write(str(res[1]) + '\n')
                    print(res_got)
                # ... then wait a moment before polling again
                time.sleep(0.5)


if __name__ == '__main__':
    freeze_support()
    queue = Queue()
    # Local process that drains the results queue and writes to disk
    w = SaveFromMultiComp('seed_1_hundred', queue)
    w.start()
    # Expose the results queue to the other machines via a manager server on port 8002
    QueueManager.register('get_queue', callable=lambda: queue)
    m = QueueManager(address=('', 8002), authkey=b'abracadabra')
    s = m.get_server()
    s.serve_forever()
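
Each worker machine then ties the two servers together: it pulls parameters from the queue on port 8001, runs the simulation, and puts the result on the queue on port 8002. This is only a sketch of what such a worker looks like; 'main_computer' is a placeholder for my main computer's address, and run_sim stands in for the actual simulation function:

from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass


QueueManager.register('get_queue')

# One manager instance per server; connect once and reuse the proxies for all jobs
jobs_mgr = QueueManager(address=('main_computer', 8001), authkey=b'abracadabra')
results_mgr = QueueManager(address=('main_computer', 8002), authkey=b'abracadabra')
jobs_mgr.connect()
results_mgr.connect()
jobs = jobs_mgr.get_queue()
results = results_mgr.get_queue()

while True:
    params = jobs.get()                  # blocks until a job is available
    sim_type, result = run_sim(params)   # run_sim is a stand-in for the real simulation
    results.put((sim_type, result))      # (1, ...) or (2, ...), as SaveFromMultiComp expects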

These two server scripts work as expected for the first ~700-800 simulations; after that I get the following error in the terminal that is running the result-receiving script:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "C:\Python35\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "C:\Python35\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Python35\lib\multiprocessing\managers.py", line 177, in accepter
    t.start()
  File "C:\Python35\lib\threading.py", line 844, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread

Can anyone give some insight into where and how the threads are spawned? Is a new thread spawned every time I call queue.get(), or how does it work? And I would be very glad if someone knows what I can do to avoid this failure. (I'm running the scripts with Python 3.5, 32-bit.)
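
From the traceback, the failing call is in accepter() in multiprocessing/managers.py, which as far as I can tell starts a new daemon thread for every incoming connection, roughly like this (paraphrased and simplified, not the exact source):

# Rough paraphrase of the manager server's accept loop in multiprocessing/managers.py
def accepter(self):
    while True:
        c = self.listener.accept()  # wait for a client to connect
        t = threading.Thread(target=self.handle_request, args=(c,))
        t.daemon = True
        t.start()                   # <- the call that raises "can't start new thread"

So the server-side threads seem to correspond to incoming connections rather than to individual queue.get() calls, which is why I'm unsure whether the problem is in the servers above or in how the workers connect.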



