Kafka Consumer Losing State Of Messages After Shutdown
Thanks for taking the time to answer the question. I am using Kafka with a Python consumer. Everything works great when the consumer is up and running and messages get pushed to Kafka.
Solution 1:
What client are you using? You may need to set the start offset for the consumer. Have a look at the seek() function and the auto-commit setting. My code may help, though we may be using different consumer classes (mine: http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html):
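# Legacy kafka-python API (KafkaClient / SimpleConsumer).
from kafka import KafkaClient, SimpleConsumer

def connect(self):
    '''Initialize Kafka Client and Consumer.'''
    try:
        print("Try to init KafkaClient: {}".format(self.Brokers))
        self.__kafka_client = KafkaClient(self.Brokers)
        print("Try to init Kafka Consumer.")
        self.__consumer = SimpleConsumer(
            self.__kafka_client,
            self.GroupID,
            self.Topic,
            auto_commit=True,
            partitions=self.Partitions,
            auto_commit_every_n=100,   # commit after every 100 messages
            auto_commit_every_t=5000,  # or after 5000 ms
            fetch_size_bytes=4096,
            buffer_size=4096,
            max_buffer_size=32768,
            iter_timeout=None,
            auto_offset_reset='largest')
        print("Set the starting offset.")
        # self.OffsetMode is the "whence" value: 0, 1 or 2.
        self.__consumer.seek(0, self.OffsetMode)
    except Exception as exc:
        # The original snippet had no handler; report the error and re-raise.
        print("Kafka connection failed: {}".format(exc))
        raise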
The second argument to seek() selects the starting point:
self.__consumer.seek(0, 0) => start reading from the beginning of the queue.
self.__consumer.seek(0, 1) => start reading from the current offset.
self.__consumer.seek(0, 2) => skip all pending messages and start reading only new messages (**maybe your case**).
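To see why the third mode can look like lost messages after a restart, here is a minimal sketch of how the three modes resolve to a starting offset. It is a simplified model, not kafka-python code: resolve_start_offset, skipped_messages and the partition numbers are hypothetical names and values made up for illustration, and it assumes the "current offset" is where the consumer stopped before shutdown.

```python
# Simplified, hypothetical model of seek(offset, whence) for one partition.
# It is not kafka-python code; it only mirrors the three seek modes.

SEEK_FROM_EARLIEST = 0  # whence=0: relative to the oldest retained message
SEEK_FROM_CURRENT = 1   # whence=1: relative to the consumer's current offset
SEEK_FROM_LATEST = 2    # whence=2: relative to the end of the log


def resolve_start_offset(offset, whence, earliest, current, latest):
    """Return the offset the consumer would read next after a seek."""
    bases = {
        SEEK_FROM_EARLIEST: earliest,
        SEEK_FROM_CURRENT: current,
        SEEK_FROM_LATEST: latest,
    }
    if whence not in bases:
        raise ValueError("whence must be 0, 1 or 2, got %r" % (whence,))
    return bases[whence] + offset


def skipped_messages(start, current):
    """Count messages between the last consumed position and the new start."""
    return max(0, start - current)


# Assumed partition state: the consumer stopped at offset 40, and 60 more
# messages (offsets 40..99) arrived while it was shut down.
EARLIEST, CURRENT, LATEST = 0, 40, 100

for whence in (0, 1, 2):
    start = resolve_start_offset(0, whence, EARLIEST, CURRENT, LATEST)
    print("seek(0, %d) -> start at %d, skipped %d"
          % (whence, start, skipped_messages(start, CURRENT)))
```

Under these assumed numbers, only the whence=2 case skips anything: it jumps past the 60 messages produced during the downtime, which would match the symptom in the question. Resuming with whence=1 continues where the consumer left off.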