The Confluent Kafka library (the Python version in this case) has a produce() method that takes a delivery callback function:
kafka_producer.produce(topic=topic,
                       key=key,
                       value=value,
                       on_delivery=delivery_callback)
This callback is called whether the message was successfully delivered or not:
def delivery_callback(err, msg):
    ...  # err is None on success, otherwise an error object
I don't have any retry logic in this function for failed messages, because the docs say producing is asynchronous.
Instead, every 100 messages or so, I rely on flush() to tell me whether any messages weren't successfully produced:
messages_outstanding = kafka_producer.flush()
if messages_outstanding == 0:
    pass  # continue to the next batch of 100
else:
    pass  # produce the batch again
Will flush() account for any messages that failed to produce (those reported as errors in delivery_callback)?
In other words, can I be sure flush() won't return zero if any messages failed?
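For context, as far as I can tell from the documentation, flush() returns the number of messages still waiting in the producer's queue. A message whose delivery failed has already had its callback invoked and is no longer queued, so a zero return does not by itself appear to guarantee that every message succeeded. One way to avoid depending on the return value alone is to record failures in the callback. The following is a minimal sketch under that assumption; DeliveryTracker and produce_batch are hypothetical names introduced for illustration, and `producer` is assumed to be an already configured confluent_kafka Producer.

```python
class DeliveryTracker:
    """Hypothetical helper: collects failures reported via on_delivery."""

    def __init__(self):
        self.failed = []  # (key, value, error) for each failed message

    def callback(self, err, msg):
        # Same (err, msg) signature as delivery_callback in the question.
        # err is None when the message was delivered successfully.
        if err is not None:
            self.failed.append((msg.key(), msg.value(), err))

    def drain(self):
        """Return the recorded failures and reset the tracker."""
        failed, self.failed = self.failed, []
        return failed


def produce_batch(producer, topic, batch, tracker):
    """Produce (key, value) pairs, flush, and report what needs resending.

    Returns (outstanding, failed): the value returned by flush() and the
    (key, value) pairs whose delivery callback reported an error.
    """
    for key, value in batch:
        producer.produce(topic=topic, key=key, value=value,
                         on_delivery=tracker.callback)
    # flush() serves the pending delivery callbacks before returning.
    outstanding = producer.flush()
    failed = [(key, value) for key, value, _err in tracker.drain()]
    return outstanding, failed
```

With this shape, the caller would move on to the next batch only when both `outstanding == 0` and `failed` is empty, and would otherwise produce the returned pairs again. Note that keys and values come back from the message object as bytes.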
from Prevent Confluent Kafka from losing messages when producing