I am currently using Kafka 0.9.0.1. According to some sources I found, a way to set message sizes is to change the following key values ββin server.properties
.
- message.max.bytes
- replica.fetch.max.bytes
- fetch.message.max.bytes
My server.properties
file has these settings.
message.max.bytes=10485760 replica.fetch.max.bytes=20971520 fetch.message.max.bytes=10485760
Below are other options that may be relevant.
socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600
However, when I try to send messages with a payload size of 4 to 6 MB, the consumer never receives any messages. It seems that the manufacturer is sending messages without any exceptions. If I send a smaller payload (for example, 1 MB), then the consumer really receives the messages.
Any idea on what I am doing wrong in terms of configuration settings?
Here is a sample code to send a message.
Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps()); File dir = new File("/path/to/dir"); for(String s : dir.list()) { File f = new File(dir, s); byte[] data = Files.readAllBytes(f.toPath()); Payload payload = new Payload(data);
Here is a sample code to receive a message.
KafkaConsumer consumer = new KafkaConsumer<>(getConsumerProps()); consumer.subscribe(Arrays.asList("test")); while(true) { ConsumerRecord<String, byte[]> records = consumer.poll(100); for(ConsumerRecord<String, byte[]> record : records) { long offset = record.offset(); String key = record.key(); byte[] val = record.value(); Payload payload = (Payload)KryoUtil.toObject(val, Payload.class);
Here are ways to populate the property files for the producer and consumer.
public static Properties getProducerProps() { Properties props = new Properties(); props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("compression.type", "snappy"); props.put("max.request.size", 10485760); //need this props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); return props; } public static Properties getConsumerProps() { Properties props = new Properties(); props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("max.partition.fetch.bytes", 10485760); //need this too props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); return props; }