I have code that simply enqueues a message to the broker's queue with pika.
class Publisher:
    """Publish tasks to a RabbitMQ topic exchange over a blocking pika connection.

    The connection and channel are created lazily by ``connect()`` and are
    re-established by ``publish()`` when a publish attempt fails because the
    connection or channel has gone away.
    """

    def __init__(self, config):
        """Store connection settings; no connection is opened here.

        Args:
            config: Object exposing ``RABBITMQ_HOST``, ``RABBITMQ_USER``,
                ``RABBITMQ_PASSWORD`` and ``RABBITMQ_AGENT_EXCHANGE``.
        """
        self._params = ConnectionParameters(
            host=config.RABBITMQ_HOST,
            credentials=PlainCredentials(
                config.RABBITMQ_USER, config.RABBITMQ_PASSWORD
            ),
        )
        self._conn = None
        self._channel = None
        self.exchange_name = config.RABBITMQ_AGENT_EXCHANGE

    def connect(self):
        """Open whatever is missing or closed: connection, then channel.

        A new channel is opened (and the topic exchange declared on it) only
        when there is no usable channel, so repeated calls on a healthy
        connection do not pile up extra channels.
        """
        if self._conn is None or self._conn.is_closed:
            self._conn = BlockingConnection(self._params)
            # A channel belongs to the connection it was opened on, so a new
            # connection always needs a new channel.
            self._channel = None
        if self._channel is None or self._channel.is_closed:
            self._channel = self._conn.channel()
            self._channel.exchange_declare(
                exchange=self.exchange_name, exchange_type='topic'
            )

    def _publish(self, task):
        """Serialize ``task`` as JSON and publish it once, without retrying.

        Args:
            task: Object exposing ``routing_key``, ``has_expiration`` and,
                when ``has_expiration`` is true, ``expiration_ms``. It must be
                serializable by ``TaskEncoder``.
        """
        properties = None
        if task.has_expiration:
            # AMQP carries the per-message TTL as a string, so convert here in
            # case the task stores it as a number.
            properties = BasicProperties(expiration=str(task.expiration_ms))
        self._channel.basic_publish(
            exchange=self.exchange_name,
            routing_key=task.routing_key,
            properties=properties,
            body=dumps(task, cls=TaskEncoder).encode(),
        )
        logging.debug('message sent: %s', task)

    def publish(self, msg):
        """Publish msg, reconnecting once if the connection or channel is lost.

        Args:
            msg: Task object accepted by ``_publish()``.

        Raises:
            pika.exceptions.AMQPError: If reconnecting or the second publish
                attempt fails as well.
        """
        # Imported locally so this class does not depend on which pika
        # exception names the module imports at the top of the file.
        from pika.exceptions import AMQPChannelError, AMQPConnectionError

        try:
            self._publish(msg)
        except (AMQPConnectionError, AMQPChannelError):
            # ConnectionClosed is only one of the ways a dead link surfaces;
            # stream-lost and wrong-state errors share these base classes.
            logging.error('reconnecting to queue', exc_info=True)
            self.connect()
            self._publish(msg)
On a long-running connection, Pika stops enqueuing messages after logging the following messages, and it no longer raises any error:
2021-03-14 12:25:09,981 MainThread-140100212655936 pika.heartbeat [INFO] - Connection is idle, 1 stale byte intervals
2021-03-14 12:25:09,981 MainThread-140100212655936 pika.adapters.utils.io_services_utils [INFO] - Aborting transport connection: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.184.108', 41024), raddr=('10.100.176.158', 5672)>
2021-03-14 12:25:09,981 MainThread-140100212655936 pika.adapters.utils.io_services_utils [INFO] - _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.184.108', 41024), raddr=('10.100.176.158', 5672)
Code usage
# Build the publisher and open the broker connection before the loop starts.
publisher = Publisher(config)
publisher.connect()

# Send the same object once per second, forever.
while True:
    publisher.publish(obj)
    time.sleep(1)
I have 2 questions:
How can I prevent this? Would disabling the heartbeat work in this case?
How can I reproduce/simulate this behavior with a firewall? I tried adding a rule that drops packets on the RMQ port, but with no luck.
Pika version: 1.0.1
RMQ version: 3.8.9
Python: 3.8.6
from Pika heartbeat terminates connection
No comments:
Post a Comment