Kafka Consumer Losing State Of Messages After Shutdown

Thanks for taking the time to answer the question. I am using Kafka with a Python consumer. Everything works great when the consumer is up and running and messages get pushed to Kafka

Solution 1:

Which client are you using? You may need to set the consumer's starting offset explicitly. Have a look at the seek() function and the auto-commit settings. My code may help, though we might be using different consumer classes (mine: http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html):

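from __future__ import print_function  # keeps print() working on Python 2 and 3

from kafka import KafkaClient, SimpleConsumer  # legacy kafka-python API


def connect(self):
    '''Initialize Kafka Client and Consumer (method of the consumer wrapper class).'''
    try:
        print("Try to init KafkaClient:", self.Brokers)
        self.__kafka_client = KafkaClient(self.Brokers)

        print("Try to init Kafka Consumer.")
        self.__consumer = SimpleConsumer(
            self.__kafka_client,
            self.GroupID,
            self.Topic,
            auto_commit=True,            # commit consumed offsets automatically
            partitions=self.Partitions,
            auto_commit_every_n=100,     # commit after every 100 messages ...
            auto_commit_every_t=5000,    # ... or after 5000 ms
            fetch_size_bytes=4096,
            buffer_size=4096,
            max_buffer_size=32768,
            iter_timeout=None,           # no timeout: wait for messages forever
            auto_offset_reset='largest')

        print("Set the starting offset.")
        # seek(offset, whence): whence is 0, 1 or 2, explained below.
        self.__consumer.seek(0, self.OffsetMode)
    except Exception as exc:
        # The original snippet is cut off before its handler; this one is a placeholder.
        print("Kafka connection failed:", exc)
        raise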


self.__consumer.seek(0, 0) => start reading from the beginning of the queue (the earliest offset still available).
self.__consumer.seek(0, 1) => start reading from the current offset.
self.__consumer.seek(0, 2) => skip all the pending messages and start reading only new messages (**maybe your case**).
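
To make the three modes concrete, here is a small sketch that models how seek(offset, whence) picks the new position. It is not kafka-python code: resolve_seek and the offset numbers are made up for illustration, and it only assumes that whence selects the base (earliest, current, or latest offset) to which offset is added.

```python
def resolve_seek(offset, whence, earliest, current, latest):
    """Return the position a seek(offset, whence) call would move to.

    Illustrative model only, not part of kafka-python.
    earliest: oldest offset still available in the partition
    current:  the consumer's current position
    latest:   the offset the next produced message will receive
    """
    if whence == 0:
        base = earliest
    elif whence == 1:
        base = current
    elif whence == 2:
        base = latest
    else:
        raise ValueError("whence must be 0, 1 or 2")
    return base + offset


# Hypothetical partition: offsets 100..499 are retained and the
# consumer was at offset 250 when it shut down.
earliest, current, latest = 100, 250, 500

print(resolve_seek(0, 0, earliest, current, latest))    # replay everything still retained
print(resolve_seek(0, 1, earliest, current, latest))    # resume at the current position
print(resolve_seek(0, 2, earliest, current, latest))    # only messages produced from now on
print(resolve_seek(-10, 2, earliest, current, latest))  # start 10 messages before the end
```

In this model, calling seek(0, 2) on every startup jumps past offsets 250..499, so anything produced while the consumer was down is never read. If that matches what you see, seeking with whence 1 (or not seeking at all and relying on the committed offset) may be what you want.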
