I am pulling data from Flume channels into a Kafka cluster using the Kafka sink. The data does show up in the expected topic, but at the same time I frequently see the following exception in the Kafka broker logs:
[2017-03-21 16:47:56,250] WARN Unexpected error from /10.XXX; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 30662099 larger than 30662028)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:91)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
    at kafka.network.Processor.run(SocketServer.scala:413)
    at java.lang.Thread.run(Thread.java:745)
An initial analysis led me to my Flume logs, where I found this exception:
21 Mar 2017 16:25:32,560 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
    at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
    at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:229)
    ... 3 more
From the first stack trace, it appears that Flume is trying to push a request of 30662099 bytes, while the Kafka broker only allows requests up to 30662028 bytes.
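For context, the limit the `InvalidReceiveException` checks against is the broker's maximum request size, which is a separate setting from the per-message limit. A minimal sketch of the settings involved on each side, assuming Flume 1.7-style Kafka sink property names and using placeholder agent/sink names (`a1`, `k1`) and the sizes from my logs:

```properties
# --- Kafka broker: server.properties ---
# Maximum size of a single request the broker's socket server will read;
# the InvalidReceiveException above fires when a request exceeds this.
socket.request.max.bytes=30662028
# Maximum size of a single message the broker will accept.
message.max.bytes=30662028

# --- Flume agent config (hypothetical names, producer properties
# passed through to the Kafka producer via the kafka.producer.* prefix) ---
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.producer.max.request.size = 30662028
```

This is what I have configured; the property names are my assumption about which knobs govern the two sides of the size check.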
I have set the same maximum size, 30662028 bytes, for both sending and receiving messages, i.e. on the producer (Flume) and on the broker (Kafka). What puzzles me is this: if my Flume producer sends only 30662028 bytes, what additional bytes accumulate on top of the message from my producers to form the final request of 30662099 bytes and trigger this error?
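My working assumption is that the extra bytes are not payload at all but protocol overhead the producer adds when it wraps records into a produce request (request header fields such as the API key, version, correlation id, and client id, plus record batch framing), so the producer-side limit would need to be set lower than the broker-side limit rather than equal to it. A quick sanity check of the arithmetic from the two numbers in the broker log line:

```python
# Sizes taken verbatim from the broker log line above.
request_size = 30662099   # size the broker actually received
broker_limit = 30662028   # broker's configured maximum request size

# The gap is the overhead added on top of the record payload.
overhead = request_size - broker_limit
print(overhead)  # 71 bytes of non-payload framing
```

Whether those 71 bytes are exactly the request header for my client id is an assumption on my part, but it would explain why equal limits on both sides still overflow.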
Any help would be really appreciated.