I am trying to implement in memory queue like kafka using python using concepts like re entrant locks and threads. I am new to python threading.
I have consumer which will subscribe to the topic and read msg from it. So far it's working fine but I have doubts related to threading.
I am trying to make the consumerRunner process single threaded. When checking the output I am seeing MainThread and Thread-1. If the function is single threaded should not it be displaying the message single time by the thread which is executing this function. Message2 and Message4 has been consumed twice. With the single thread they should be consumed only once. I know main Thread is the default Thread but is it correct to have msg printed twice? Sorry if I am asking silly question.
Output:
(py3_8) ninjakx@Kritis-MacBook-Pro kafka % python queueDemo.py
Msg: message1 has been published to topic: topic1
Msg: message2 has been published to topic: topic1
Msg: message3 has been published to topic: topic2
Msg: message4 has been published to topic: topic1
Msg: message1 has been consumed by consumer: consumer1 at offset: 0 with current thread: MainThread
Msg: message3 has been consumed by consumer: consumer1 at offset: 0 with current thread: MainThread
Msg: message2 has been consumed by consumer: consumer1 at offset: 1 with current thread: Thread-1
Msg: message2 has been consumed by consumer: consumer1 at offset: 1 with current thread: MainThread
Msg: message4 has been consumed by consumer: consumer1 at offset: 2 with current thread: Thread-1
Msg: message4 has been consumed by consumer: consumer1 at offset: 2 with current thread: MainThread
ConsumerImpl.py
import zope.interface
from ..interface.iConsumer import iConsumer
from collections import OrderedDict
from mediator.QueueMediatorImpl import QueueMediatorImpl
import threading
from threading import Thread
import time
@zope.interface.implementer(iConsumer)
class ConsumerImpl:
# will keep all the topic it has subscribed to and their offset
def __init__(self, consumerName:str):
self.__consumerName = consumerName
self.__topicList = []
self.__topicVsOffset = OrderedDict()
self.__queueMediator = QueueMediatorImpl()
self.threadInit()
def threadInit(self):
thread = Thread(target = self._consumerRunner)
thread.start()
# thread.join()
# print("thread finished...exiting")
def __getConsumerName(self):
return self.__consumerName
def __getQueueMediator(self):
return self.__queueMediator
def __getSubscribedTopics(self)->list:
return self.__topicList
def __setTopicOffset(self, topicName:str, offset:int)->int:
self.__topicVsOffset[topicName] = offset
def __getTopicOffset(self, topicName:str)->int:
return self.__topicVsOffset[topicName]
def __addToTopicList(self, topicName:str)->None:
self.__topicList.append(topicName)
def _subToTopic(self, topicName:str):
self.__addToTopicList(topicName)
self.__topicVsOffset[topicName] = 0
def __consumeMsg(self, msg:str, offset:int):
print(f"Msg: {msg} has been consumed by consumer: {self.__getConsumerName()} at offset: {offset} with current thread: {threading.current_thread().name}\n")
# pull based mechanism
# running on single thread
def _consumerRunner(self):
while(True):
for topicName in self.__getSubscribedTopics():
curOffset = self.__getTopicOffset(topicName)
qmd = self.__getQueueMediator()
msg = qmd._readMsgIfPresent(topicName, curOffset)
if msg is not None:
self.__consumeMsg(msg._getMessage(), curOffset)
curOffset += 1
# update offset
self.__setTopicOffset(topicName, curOffset)
try:
#sleep for 100 milliseconds
#thread sleep
# "sleep() makes the calling thread sleep until seconds seconds have elapsed or a signal arrives which is not ignored."
time.sleep(0.1)
except Exception as e:
print(f"Error: {e}")
QueueDemo.py
from service.QueueServiceImpl import QueueServiceImpl
if __name__ == "__main__":
queueService = QueueServiceImpl()
producer1 = queueService._createProducer("producer1")
producer2 = queueService._createProducer("producer2")
producer3 = queueService._createProducer("producer3")
producer4 = queueService._createProducer("producer4")
consumer1 = queueService._createConsumer("consumer1")
consumer2 = queueService._createConsumer("consumer2")
consumer3 = queueService._createConsumer("consumer3")
producer1._publishToTopic("topic1", "message1")
producer1._publishToTopic("topic1", "message2")
producer2._publishToTopic("topic2", "message3")
producer1._publishToTopic("topic1", "message4")
consumer1._subToTopic("topic1")
consumer1._subToTopic("topic2")
consumer1._consumerRunner()
from Making function single threaded and running in background
No comments:
Post a Comment