I installed kafka on one node and started zookeeper as well as kafka server.I checked it for the internal producer and consumer on the console and it works fine. But when I start the kafka internal user on the console and my user maker does not work.
Below is my producer class
Properties props = new Properties(); props.put("metadata.broker.list", "xx.xx.xx.xx:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "com.example.producer.SimplePartitioner"); props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); KeyedMessage<String, String> data = new KeyedMessage<String, String>( "mails", "xxxx"); producer.send(data);
When the control reaches the manufacturer .send (), it stops after 3 attempts with the following exception.
java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) at kafka.producer.SyncProducer.send(SyncProducer.scala:113) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) at kafka.utils.Utils$.swallow(Utils.scala:172) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.Utils$.swallowError(Utils.scala:45) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) at kafka.producer.Producer.send(Producer.scala:77) at kafka.javaapi.producer.Producer.send(Producer.scala:33)
source share