Tuesday, 12 October 2021

How to make Python multithreaded pipeline utilize 90% of available memory?

My Python script is running in a GCE instance, with Pub/Sub as both input (async pull subscription) and output.

As far as I understand, the approach below lets me control the number of concurrent threads and, therefore, limit the amount of memory used. However, if I set max_messages to 100, my script eventually runs out of memory.

from google.cloud import pubsub_v1
from concurrent import futures
import threading

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_name)

def callback(message):
    # Log the payload and the worker thread handling it, then ack.
    print(str(message.data) + " " + str(threading.current_thread()))
    message.ack()

# Flow control caps how many messages may be outstanding at once;
# the executor caps how many callbacks run concurrently.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
executor = futures.ThreadPoolExecutor(max_workers=5)
policy = pubsub_v1.subscriber.policy.thread.Policy(
    subscriber, subscription_path, executor=executor, flow_control=flow_control)
policy.open(callback)

It seems to me that hardcoding the number of workers and messages is a primitive way of controlling memory utilization. Is there a better way to make my script allocate as many threads as the VM's resources allow, so that it is utilized as efficiently as possible?
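One possible direction (a sketch, not from the original question): derive the flow-control limit from the memory actually available on the VM instead of hardcoding it. The helper below is hypothetical, and per_message_bytes is an assumed estimate of the peak memory one in-flight message consumes, which you would have to measure for your own workload:

    def max_messages_for_budget(available_bytes, per_message_bytes,
                                fraction=0.9, floor=1, ceiling=1000):
        # Size the flow-control window so that all in-flight messages
        # together stay within `fraction` of the available memory,
        # clamped to a sane [floor, ceiling] range.
        budget = int(available_bytes * fraction)
        return max(floor, min(ceiling, budget // per_message_bytes))

    # Example with assumed numbers: 4 GiB available, ~50 MiB peak per message.
    limit = max_messages_for_budget(4 * 1024**3, 50 * 1024**2)
    # The resulting limit would then be passed to flow control, e.g.:
    #   flow_control = pubsub_v1.types.FlowControl(max_messages=limit)

Note that FlowControl also accepts a max_bytes argument in recent versions of the google-cloud-pubsub library, which bounds the total size of outstanding message payloads directly; that only covers payload bytes, though, not the working memory your callback allocates per message.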



