How to set message size in Kafka?

I am currently using Kafka 0.9.0.1. According to some sources I found, the way to set message sizes is to change the following keys in server.properties:

  • message.max.bytes
  • replica.fetch.max.bytes
  • fetch.message.max.bytes

My server.properties file has these settings.

 message.max.bytes=10485760
 replica.fetch.max.bytes=20971520
 fetch.message.max.bytes=10485760

Below are other options that may be relevant.

 socket.send.buffer.bytes=102400
 socket.receive.buffer.bytes=102400
 socket.request.max.bytes=104857600

However, when I try to send messages with a payload size of 4 to 6 MB, the consumer never receives any messages. The producer appears to send them without raising any exceptions. If I send a smaller payload (for example, 1 MB), the consumer does receive the messages.
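One way to confirm whether these sends actually succeed is to pass a Callback to send() and log any asynchronous failure; a minimal sketch, reusing the producer, key, and val names from the send sample below:

 // Minimal sketch: a Callback surfaces asynchronous send failures
 // (e.g. RecordTooLargeException) that are not thrown by the send() call itself.
 // Requires org.apache.kafka.clients.producer.Callback and RecordMetadata.
 producer.send(new ProducerRecord<>("test", key, val), new Callback() {
     @Override
     public void onCompletion(RecordMetadata metadata, Exception exception) {
         if (exception != null) {
             System.err.println("send failed: " + exception);
         } else {
             System.out.println("sent: partition=" + metadata.partition() + ", offset=" + metadata.offset());
         }
     }
 });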

Any idea on what I am doing wrong in terms of configuration settings?

Here is sample code for sending messages.

 Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps());
 File dir = new File("/path/to/dir");
 for (String s : dir.list()) {
     File f = new File(dir, s);
     byte[] data = Files.readAllBytes(f.toPath());
     Payload payload = new Payload(data); // a simple POJO to store the payload
     String key = String.valueOf(System.currentTimeMillis());
     byte[] val = KryoUtil.toBytes(payload); // custom util that uses Kryo to serialize to byte[]
     producer.send(new ProducerRecord<>("test", key, val));
 }
 producer.close();

Here is sample code for receiving messages.

 KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(getConsumerProps());
 consumer.subscribe(Arrays.asList("test"));
 while (true) {
     ConsumerRecords<String, byte[]> records = consumer.poll(100);
     for (ConsumerRecord<String, byte[]> record : records) {
         long offset = record.offset();
         String key = record.key();
         byte[] val = record.value();
         Payload payload = (Payload) KryoUtil.toObject(val, Payload.class); // custom util that uses Kryo to deserialize back to an object
         System.out.println(String.format("offset=%d, key=%s", offset, key));
     }
 }

Here is how the producer and consumer properties are populated.

 public static Properties getProducerProps() {
     Properties props = new Properties();
     props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
     props.put("acks", "all");
     props.put("retries", 0);
     props.put("batch.size", 16384);
     props.put("linger.ms", 1);
     props.put("buffer.memory", 33554432);
     props.put("compression.type", "snappy");
     props.put("max.request.size", 10485760); // need this
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
     return props;
 }

 public static Properties getConsumerProps() {
     Properties props = new Properties();
     props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("max.partition.fetch.bytes", 10485760); // need this too
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
     return props;
 }
2 answers

Jane, do not use fetch.message.max.bytes. First, that property belongs to the consumer and does not go in the server.properties file; second, it is for the old consumer. Instead, use max.partition.fetch.bytes among the properties you pass when creating the Consumer.
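For illustration, a minimal sketch of creating the consumer with that property set (the broker address and group id are placeholders):

 // Minimal sketch: max.partition.fetch.bytes must be at least as large as the
 // biggest message the consumer should be able to read from a single partition.
 Properties props = new Properties();
 props.put("bootstrap.servers", "qc1:9092"); // placeholder broker address
 props.put("group.id", "test"); // placeholder group id
 props.put("max.partition.fetch.bytes", 10485760); // 10 MB
 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);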


The fetch.max.bytes consumer option is also possible.
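A minimal sketch of how it would be added to the consumer properties (note: fetch.max.bytes was introduced in Kafka 0.10.1.0, so it is not available in 0.9.0.1; the 50 MB value is just an example):

 // Assumes Kafka 0.10.1.0+: fetch.max.bytes caps the total size of one fetch
 // response, while max.partition.fetch.bytes caps the data per partition.
 props.put("fetch.max.bytes", 52428800); // example: 50 MB
 props.put("max.partition.fetch.bytes", 10485760); // 10 MB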


Source: https://habr.com/ru/post/1244079/

