Friday 5 March 2021

Multiprocessing: optimize CPU usage for concurrent HTTP async requests

I need to download a list of sites/URLs (which can vary over time), and I currently use a multiprocessing.Manager().Queue() to submit and update that list.
I have to check each URL/task every second, so each task basically never ends (until a specific condition is met, such as the user interrupting). I thought that multiprocessing.Process() combined with asyncio and a good async HTTP client would solve the problem. Unfortunately, I still see very high CPU usage after submitting 50 or more URLs. You will notice the difference yourself between tasks that do no requests at all - running mock_request() - and tasks that do - running do_request().

Here is an example to reproduce each case (press CTRL+C to end it gracefully at any time).

import asyncio, os, sys, time, httpx
import multiprocessing
import queue as Queue

class ExitHandler(object):
    def __init__(self, manager, queue, processes):
        self.manager = manager
        self.queue = queue
        self.processes = processes
        
    def set_exit_handler(self):
        if os.name == "nt":
            try:
                import win32api
                win32api.SetConsoleCtrlHandler(self.on_exit, True)
            except ImportError:
                version = ".".join(map(str, sys.version_info[:2]))
                raise Exception("pywin32 not installed for Python " + version)
        else:
            import signal
            signal.signal(signal.SIGINT, self.on_exit)
            #signal.signal(signal.CTRL_C_EVENT, func)
            signal.signal(signal.SIGTERM, self.on_exit)

    def on_exit(self, sig, func=None):
        print('[Main process]: exit triggered, terminating all workers')
        STOP_WAIT_SECS = 5
        for _ in range(N_WORKERS):
            self.queue.put('END')
        
        try:
            end_time = time.time() + STOP_WAIT_SECS
            # wait up to STOP_WAIT_SECS for all processes to complete
            for proc in self.processes:
                join_secs = max(0.0, min(end_time - time.time(), STOP_WAIT_SECS))
                proc.join(join_secs)

            # clear the procs list and _terminate_ any procs that have not yet exited
            while self.processes:
                proc = self.processes.pop()
                if proc.is_alive():
                    proc.terminate()
            
            self.manager.shutdown()

            # finally, exit the main process immediately
            os._exit(0)
        except Exception:
            pass

async def mock_request(url):

    # we won't do any request here, it's just an example of how much less CPU
    # each process consumes when not doing requests

    x = 0
    while True:
        try:
            x += 1
            print('Finished downloading {}'.format(url))
            await asyncio.sleep(1)
        except asyncio.CancelledError:
            return

async def do_request(url):

    while True:
        try:
            # I use httpx (https://github.com/encode/httpx/) as async client for its simplicity
            # feel free to use your preferred library (e.g. aiohttp)
            async with httpx.AsyncClient() as s:
                await s.get(url)
                print('Finished downloading {}'.format(url))
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            return

def worker(queue):
    
    try:
        event_loop = asyncio.get_event_loop()
        event_loop.run_until_complete(request_worker(queue))
    except KeyboardInterrupt:
        pass

async def request_worker(queue):
    
    p = multiprocessing.current_process()
    loop = asyncio.get_event_loop()
    
    while True:
        try:
            task = await loop.run_in_executor(None, queue.get)
            
            if task == 'END':
                break
            
            elif task['action'] == 'DOWNLOAD':
                print('Worker {}: Received new task'.format(p.name))
                f = loop.create_task(do_request(task['url'])) # high CPU usage
                # f = loop.create_task(mock_request(task['url'])) # low (almost none) CPU usage

        except KeyboardInterrupt:
            pass
        except Queue.Empty:
            pass

    print('Task Worker {}: ending'.format(p.name))

def run_workers(queue, processes):

    print('Starting workers')

    for _ in range(N_WORKERS):
        processes.append(multiprocessing.Process(target=worker, args=(queue,)))

    task = {
        'action': 'DOWNLOAD',
        'url': 'https://google.com'
    }
    
    # this is just an example forcing the same URL 100 times, while in reality
    # it will be 1 different URL per task
    for _ in range(100):
        queue.put(task)

    for p in processes:
        p.start()

    for p in processes:
        p.join()
    
    return True

if __name__ == "__main__":
    processes = []
    N_WORKERS = 8 # processes to spawn
    manager = multiprocessing.Manager()
    q = manager.Queue() # main queue to send URLs to

    # just a useful clean exit handler (press CTRL+C to terminate)
    exit_handler = ExitHandler(manager, q, processes) 
    exit_handler.set_exit_handler()

    # start the workers
    run_workers(q, processes)

Here's just an example of how much CPU each process consumes when doing requests simultaneously:

[screenshot: per-process CPU usage]
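
If you want to reproduce that measurement without a task manager, a small monitoring helper can print the CPU share of each worker. This is only a sketch and is not part of the script above; it assumes the third-party psutil package is installed and that you pass it the PID of the main script:

# Hypothetical helper (not in the script above): prints the CPU usage of each
# worker process, assuming the `psutil` package is installed.
import time
import psutil

def monitor_workers(parent_pid, interval=1.0):
    parent = psutil.Process(parent_pid)
    while True:
        for child in parent.children(recursive=True):
            # cpu_percent() measures against the previous call, so the first
            # reading for each process is 0.0
            print('{} (pid {}): {:.1f}% CPU'.format(
                child.name(), child.pid, child.cpu_percent()))
        time.sleep(interval)

# usage (from a separate script/shell): monitor_workers(<pid of the main script>)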

Any solution that significantly reduces CPU usage (while keeping the same number of requests per second) is accepted, whether it uses multiprocessing or not. The only requirement for me is the async pattern.
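
To clarify what "keeping the async pattern" means, here is the kind of variant I would still accept: the same do_request() loop, but reusing a single httpx.AsyncClient (and its connection pool) across iterations instead of creating a new client every second. This is a hypothetical tweak for illustration only; I have not measured whether it changes the CPU profile:

# Hypothetical variant of do_request() that reuses one AsyncClient per task;
# shown only to illustrate the async pattern, untested with respect to CPU usage.
async def do_request_shared_client(url):
    async with httpx.AsyncClient() as s:  # client created once, connections pooled
        while True:
            try:
                await s.get(url)
                print('Finished downloading {}'.format(url))
                await asyncio.sleep(1)
            except asyncio.CancelledError:
                return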



from Multiprocessing: optimize CPU usage for concurrent HTTP async requests
