Wednesday, 22 May 2019

Robust way to manage and kill any process

I am writing code to run experiments in parallel. I don't have control over what the experiments do; they might use subprocess.Popen or check_output to run one or more additional child processes.

I have two requirements: I want to be able to kill experiments that exceed a timeout, and I want to kill experiments upon KeyboardInterrupt.

Most ways of terminating a process don't make sure that all of its subprocesses are killed as well. This is obviously a problem if hundreds of experiments are run one after the other but they all spawn child processes that stay alive after the timeout occurred and the experiment was supposedly killed.

The way I am dealing with this now is to include code that stores experiment configurations in a database, to generate code that loads and runs experiments from the command line, and then to call these commands via subprocess.Popen(cmd, shell=True, start_new_session=True) and kill them using os.killpg on timeout.

My main question then is: calling these experiments via the command line feels cumbersome, so is there a way to call code directly via multiprocessing.Process(target=fn) and achieve the same effect as start_new_session=True + os.killpg upon timeout and KeyboardInterrupt?
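A minimal sketch of what this could look like, assuming a POSIX system and the fork start method: the child calls os.setsid() before running the target, which makes it the leader of a new session and process group, so os.killpg(proc.pid, ...) reaches it and every descendant that stays in that group. The names _in_new_session, kill_group and run_with_timeout are hypothetical helpers for illustration, not part of multiprocessing.

```python
import multiprocessing as mp
import os
import signal

# Assumption: POSIX only. The "fork" start method is chosen here so that
# the target does not have to be picklable or importable from __main__.
_ctx = mp.get_context("fork")


def _in_new_session(fn, args):
    # Runs in the child: become session and process-group leader, so the
    # group id equals our pid and is inherited by every child we start.
    os.setsid()
    fn(*args)


def kill_group(proc, sig=signal.SIGKILL):
    """Signal the whole process group led by proc (hypothetical helper)."""
    try:
        os.killpg(proc.pid, sig)
    except ProcessLookupError:
        # No such group: the child is already gone or has not reached
        # os.setsid() yet. Fall back to killing the child itself.
        if proc.is_alive():
            proc.kill()


def run_with_timeout(fn, args=(), timeout=None):
    """Run fn(*args) in its own session; return True if it finished in time."""
    proc = _ctx.Process(target=_in_new_session, args=(fn, args))
    proc.start()
    try:
        proc.join(timeout)
    except KeyboardInterrupt:
        # The child left the terminal's foreground group, so Ctrl-C only
        # reaches the parent; forward the kill to the child's group.
        kill_group(proc)
        proc.join()
        raise
    finished = not proc.is_alive()
    if not finished:
        kill_group(proc)
        proc.join()
    return finished
```

A call such as run_with_timeout(run_exp, (config,), timeout=60) would then stand in for the command-line round trip. Two caveats of this sketch: a descendant that calls setsid itself leaves the group and is not reached by os.killpg, and the return value of fn is discarded, so results would still need to be saved by the experiment or sent back through a pipe or queue.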

<file1>
import sys

def run_exp(config):
    <do work>
    return result

if __name__ == "__main__":
    save_exp(run_exp(load_config(sys.argv)))

<file2>
import os
import signal
import subprocess

def monitor(queue):
    active = set()  # ids of the process groups that are still running
    while True:
        msg = queue.get()
        if msg == "sentinel":
            <loop over active ids and kill them with os.killpg>
        else:
            <add or remove id from active set>


def worker(args):
    c_id, queue = args  # c_id: id of a stored experiment configuration
    command = f"python <file1> {c_id}"
    with subprocess.Popen(command, shell=True, ..., start_new_session=True) as process:
        try:
            queue.put(f"start {process.pid}")
            process.communicate(timeout=timeout)
        except subprocess.TimeoutExpired:
            os.killpg(process.pid, signal.SIGINT)  # send signal to the whole process group
            process.communicate()
        finally:
            queue.put(f"done {process.pid}")
def main():
    <save configs => c_ids>
    queue = manager.Queue()
    process = Process(target=monitor, args=(queue,))
    process.start()

    def clean_exit():
        queue.put("sentinel")
        <terminate pool and monitor process>

    r = pool.map_async(worker, [(c_id, queue) for c_id in c_ids])
    atexit.register(clean_exit)
    r.wait()
    <terminate pool and monitor process>

The skeleton above details the approach of starting processes via the command line and killing them. An additional complication of this approach is that when the KeyboardInterrupt arrives, the queue has already been terminated (for lack of a better word), so communicating with the monitor process is impossible (the sentinel message never arrives). Instead I have to resort to writing process ids to a file and reading the file back in the main process to kill the processes that are still running. If you know a way to work around this queue issue, I'd be eager to learn about it.


