I would like to write an integration test that checks the connection between a Python script and an Azure Service Bus queue. The test should:
- send a message to a queue,
- confirm that the message landed in the queue.
The test looks like this:
import pytest
from azure.servicebus import ServiceBusClient, ServiceBusMessage, ServiceBusSender

CONNECTION_STRING = <some connection string>
QUEUE = <queue name>


def send_message_to_service_bus(sender: ServiceBusSender, msg: str) -> None:
    message = ServiceBusMessage(msg)
    sender.send_message(message)


class TestConnectionWithQueue:
    def test_message_is_sent_to_queue_and_received(self):
        msg = "test message sent to queue"
        expected_message = ServiceBusMessage(msg)
        servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STRING, logging_enable=True)
        with servicebus_client:
            sender = servicebus_client.get_queue_sender(queue_name=QUEUE)
            with sender:
                send_message_to_service_bus(sender, expected_message)
            receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE)
            with receiver:
                messages_in_queue = receiver.receive_messages(max_message_count=10, max_wait_time=20)
                assert any(expected_message == str(actual_message) for actual_message in messages_in_queue)
The test occasionally passes, but more often than not it fails. No other messages are sent to the queue at the same time. When I debugged a failing run, the variable messages_in_queue was just an empty list.
Why doesn't the code work every time, and what should be done to fix it?
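An empty messages_in_queue on failing runs would be consistent with a single receive_messages call returning before the message reaches the receiver, although nothing in the post confirms that this is the cause. The following is a minimal sketch of a polling check under that assumption. The names wait_for_message, receive_batch, and body_of are hypothetical and not part of the azure-servicebus API. The helper takes plain callables, so it can be exercised without a real queue. It compares a string with a string, whereas the original assertion compares a ServiceBusMessage object with the result of str(...).

```python
import time
from typing import Callable, Iterable, Optional


def wait_for_message(
    receive_batch: Callable[[], Iterable[object]],
    expected_body: str,
    body_of: Callable[[object], str] = str,
    timeout: float = 30.0,
    poll_interval: float = 1.0,
) -> Optional[object]:
    """Poll receive_batch until a message with the expected body shows up.

    receive_batch is a hypothetical callable that returns one batch of
    messages per call (possibly empty). body_of turns a message into the
    string that is compared against expected_body.
    Returns the matching message, or None if the timeout elapses first.
    """
    deadline = time.monotonic() + timeout
    while True:
        # Inspect every message in the current batch.
        for message in receive_batch():
            if body_of(message) == expected_body:
                return message
        # Give up once the deadline has passed; otherwise wait and retry.
        if time.monotonic() >= deadline:
            return None
        time.sleep(poll_interval)
```

In the test above, receive_batch could be something like `lambda: receiver.receive_messages(max_message_count=10, max_wait_time=5)`, with the assertion becoming `assert wait_for_message(...) is not None`. Whether this removes the flakiness depends on the actual cause, which this sketch does not diagnose.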
from Test of sending & receiving message for Azure Service Bus Queue