Akka.Kafka - Warning - Resuming Partitions

We continuously receive DEBUG messages about resuming partitions for every topic, as shown below. On my server these messages are printed every millisecond.

08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-7
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-6
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-9
08:44:34.850 [default-akka.kafka.default-dispatcher-10] DEBUG o.a.k.clients.consumer.KafkaConsumer - Resuming partition test222-8

This is the code:

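import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

// Despite the names, these point at the Kafka broker (9092 is the default broker port)
val zookeeperHost = "localhost"
val zookeeperPort = "9092"

// Kafka consumer settings
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
  .withBootstrapServers(zookeeperHost + ":" + zookeeperPort)
  .withGroupId(groupName)
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

// Stream the messages from the Kafka topic
Consumer.committableSource(consumerSettings, Subscriptions.topics(topicName))
  .map(msg => consumed(msg.record.value))
  .runWith(Sink.ignore)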

Please help me configure this correctly so that the DEBUG messages stop.

1 answer

It seems that the reactive-kafka code resumes each partition before every fetch:

consumer.assignment().asScala.foreach { tp =>
  if (partitionsToFetch.contains(tp)) consumer.resume(java.util.Collections.singleton(tp))
  else consumer.pause(java.util.Collections.singleton(tp))
}
def tryPoll{...}
checkNoResult(tryPoll(0))

The KafkaConsumer.resume method is a no-op unless the partitions were previously paused, so these repeated calls are harmless and only produce log noise.
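
If the goal is only to stop the messages, one option is to raise the log level of the logger that emits them. The following is a minimal sketch, not something taken from the library or the question: it assumes Logback is the SLF4J backend (the log format above suggests it, but that is a guess), and the object name QuietKafkaConsumerLogging is made up for illustration. The logger name comes from the class shown in the log lines, org.apache.kafka.clients.consumer.KafkaConsumer.

```scala
import ch.qos.logback.classic.{Level, Logger}
import org.slf4j.LoggerFactory

// Hypothetical helper: raises the KafkaConsumer logger to INFO so that
// the "Resuming partition ..." DEBUG lines are no longer printed.
object QuietKafkaConsumerLogging {
  private val KafkaConsumerLogger =
    "org.apache.kafka.clients.consumer.KafkaConsumer"

  def apply(): Unit =
    LoggerFactory.getLogger(KafkaConsumerLogger) match {
      // Only Logback loggers expose setLevel; this is the assumed backend
      case logger: Logger => logger.setLevel(Level.INFO)
      // Any other SLF4J backend: do nothing and configure it in its own way
      case _ => ()
    }
}
```

Calling QuietKafkaConsumerLogging() once at startup, before the stream is run, should be enough. The same effect can usually be achieved declaratively in the logging configuration file, which avoids the cast to a backend-specific class.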


Source: https://habr.com/ru/post/1663109/

