Saturday, 4 November 2023

Making function single threaded and running in background

I am trying to implement in memory queue like kafka using python using concepts like re entrant locks and threads. I am new to python threading.

I have consumer which will subscribe to the topic and read msg from it. So far it's working fine but I have doubts related to threading.

I am trying to make the consumerRunner process single threaded. When checking the output I am seeing MainThread and Thread-1. If the function is single threaded should not it be displaying the message single time by the thread which is executing this function. Message2 and Message4 has been consumed twice. With the single thread they should be consumed only once. I know main Thread is the default Thread but is it correct to have msg printed twice? Sorry if I am asking silly question.

Output:

(py3_8) ninjakx@Kritis-MacBook-Pro kafka % python queueDemo.py
Msg: message1 has been published to topic: topic1

Msg: message2 has been published to topic: topic1

Msg: message3 has been published to topic: topic2

Msg: message4 has been published to topic: topic1

Msg: message1 has been consumed by consumer: consumer1 at offset: 0 with current thread: MainThread

Msg: message3 has been consumed by consumer: consumer1 at offset: 0 with current thread: MainThread

Msg: message2 has been consumed by consumer: consumer1 at offset: 1 with current thread: Thread-1

Msg: message2 has been consumed by consumer: consumer1 at offset: 1 with current thread: MainThread

Msg: message4 has been consumed by consumer: consumer1 at offset: 2 with current thread: Thread-1

Msg: message4 has been consumed by consumer: consumer1 at offset: 2 with current thread: MainThread

ConsumerImpl.py

import zope.interface
from ..interface.iConsumer import iConsumer
from collections import OrderedDict
from mediator.QueueMediatorImpl import QueueMediatorImpl

import threading 
from threading import Thread
import time
 
@zope.interface.implementer(iConsumer)
class ConsumerImpl:
    # will keep all the topic it has subscribed to and their offset

    def __init__(self, consumerName:str):
        self.__consumerName = consumerName
        self.__topicList = []
        self.__topicVsOffset = OrderedDict()
        self.__queueMediator = QueueMediatorImpl()
        self.threadInit()


    def threadInit(self):
        thread = Thread(target = self._consumerRunner)
        thread.start()
        # thread.join()
        # print("thread finished...exiting")        

    def __getConsumerName(self):
        return self.__consumerName

    def __getQueueMediator(self):
        return self.__queueMediator

    def __getSubscribedTopics(self)->list:
        return self.__topicList

    def __setTopicOffset(self, topicName:str, offset:int)->int:
        self.__topicVsOffset[topicName] = offset

    def __getTopicOffset(self, topicName:str)->int:
        return self.__topicVsOffset[topicName]

    def __addToTopicList(self, topicName:str)->None:
        self.__topicList.append(topicName)

    def _subToTopic(self, topicName:str):
        self.__addToTopicList(topicName)
        self.__topicVsOffset[topicName] = 0

    def __consumeMsg(self, msg:str, offset:int):
        print(f"Msg: {msg} has been consumed by consumer: {self.__getConsumerName()} at offset: {offset} with current thread: {threading.current_thread().name}\n")


    # pull based mechanism
    # running on single thread
    def _consumerRunner(self):
        while(True):
            for topicName in self.__getSubscribedTopics():
                curOffset = self.__getTopicOffset(topicName)
                qmd = self.__getQueueMediator()
                msg = qmd._readMsgIfPresent(topicName, curOffset)
                if msg is not None:
                    self.__consumeMsg(msg._getMessage(), curOffset)
                    curOffset += 1
                # update offset
                self.__setTopicOffset(topicName, curOffset)

            try:
                #sleep for 100 milliseconds
                #thread sleep
                # "sleep() makes the calling thread sleep until seconds seconds have elapsed or a signal arrives which is not ignored."
                time.sleep(0.1)
            except Exception as e:
                print(f"Error: {e}")

QueueDemo.py

from service.QueueServiceImpl import QueueServiceImpl

if __name__ == "__main__":
    queueService = QueueServiceImpl()
    producer1 = queueService._createProducer("producer1")
    producer2 = queueService._createProducer("producer2")
    producer3 = queueService._createProducer("producer3")
    producer4 = queueService._createProducer("producer4")

    consumer1 = queueService._createConsumer("consumer1")
    consumer2 = queueService._createConsumer("consumer2")
    consumer3 = queueService._createConsumer("consumer3")
    
    producer1._publishToTopic("topic1", "message1")
    producer1._publishToTopic("topic1", "message2")
    producer2._publishToTopic("topic2", "message3")
    producer1._publishToTopic("topic1", "message4")

    consumer1._subToTopic("topic1")
    consumer1._subToTopic("topic2")

    consumer1._consumerRunner()        
    
    
    
    
        


from Making function single threaded and running in background

No comments:

Post a Comment