Thursday, 2 December 2021

Python logging with multithreading + multiprocessing

Please take the time to read the full question to understand the exact issue. Thank you.

I have a runner/driver program that listens to a Kafka topic and dispatches tasks using a ThreadPoolExecutor whenever a new message is received on the topic (as shown below):



import json
from concurrent.futures import ThreadPoolExecutor
from kafka import KafkaConsumer

consumer = KafkaConsumer(CONSUMER_TOPIC, group_id='ME2',
                         bootstrap_servers=[f"{KAFKA_SERVER_HOST}:{KAFKA_SERVER_PORT}"],
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                         enable_auto_commit=False,
                         auto_offset_reset='latest',
                         max_poll_records=1,
                         max_poll_interval_ms=300000)

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = []
    for message in consumer:
        futures.append(executor.submit(SOME_FUNCTION, ARG1, ARG2))

There is a bunch of code in between, but it is not relevant here, so I have skipped it.

Now, SOME_FUNCTION comes from another Python script that is imported (in fact, there is a hierarchy of imports that happens in later stages). What matters is that at some point in these scripts I create a multiprocessing Pool, because I need to do parallel processing on data (SIMD: single instruction, multiple data), and I use the apply_async function to do so:

for loop_message_chunk in loop_message_chunks:
    res_list.append(self.pool.apply_async(self.one_matching.match, args=(hash_set, loop_message_chunk, fields)))

Now, I have 2 versions of the runner/driver program :

  1. Kafka based ( the one shown above )

    • This version spawns threads that start multiprocessing

    Listen To Kafka -> Start A Thread -> Start Multiprocessing

  2. REST based ( using flask to achieve same task with a REST call )

    • This version does not start any threads and calls multiprocessing right away

    Listen to REST endpoint -> Start Multiprocessing

Why two runner/driver scripts, you ask? This microservice will be used by multiple teams: some want a synchronous, REST-based interface, while others want a real-time, asynchronous, Kafka-based system.

When I log from the parallelized function (self.one_matching.match in the example above), it works when called through the REST version but not through the Kafka version (basically, when multiprocessing is kicked off by a thread, logging does not work).

Also notice that only the logging from the parallelized function fails. The rest of the scripts in the hierarchy, from the runner down to the script that calls apply_async, including the scripts called from within the thread, all log successfully.

Other details :

  • I configure loggers using yaml file
  • I configure the logger in the runner script itself for either KAFKA or REST version
  • I do a logging.getLogger in every other script called after the runner script to get specific loggers to log to different files

Logger config (values replaced with generic names since I cannot share the exact ones):

version: 1
formatters:
  simple:
    format: '%(asctime)s | %(name)s | %(filename)s : %(funcName)s : %(lineno)d | %(levelname)s :: %(message)s'
  custom1:
    format: '%(asctime)s | %(filename)s :: %(message)s'
  time-message:
    format: '%(asctime)s | %(message)s'
handlers:
  console:
    class: logging.StreamHandler
    level: DEBUG
    formatter: simple
    stream: ext://sys.stdout
  handler1:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 5
    formatter: simple
    level: DEBUG
    filename: logs/logfile1.log
  handler2:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: custom1
    level: INFO
    filename: logs/logfile2.log
  handler3:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: time-message
    level: DEBUG
    filename: logs/logfile3.log
  handler4:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 30
    formatter: time-message
    level: DEBUG
    filename: logs/logfile4.log
  handler5:
    class: logging.handlers.TimedRotatingFileHandler
    when: midnight
    backupCount: 5
    formatter: simple
    level: DEBUG
    filename: logs/logfile5.log
loggers:
  logger1:
    level: DEBUG
    handlers: [console, handler1]
    propagate: no
  logger2:
    level: DEBUG
    handlers: [console, handler5]
    propagate: no
  logger3:
    level: INFO
    handlers: [handler2]
    propagate: no
  logger4:
    level: DEBUG
    handlers: [console, handler3]
    propagate: no
  logger5:
    level: DEBUG
    handlers: [console, handler4]
    propagate: no
  kafka:
    level: WARNING
    handlers: [console]
    propagate: no
root:
  level: INFO
  handlers: [console]
  propagate: no


from Python logging with multithreading + multiprocessing
