Camel Kafka Integration

I recently noticed that Camel now has its own component for Kafka, so I decided to give it a whirl.

I decided to try a nice simple file -> Kafka topic route as follows ...

    <route>
      <from uri="file:///tmp/input" />
      <setHeader headerName="kafka.PARTITION_KEY">
        <constant>Test</constant>
      </setHeader>
      <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1" />
    </route>

It seems simple enough; however, when I run this I get ...

    java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
        at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:78)

And when checking the Camel code, it does the following ...

    String msg = exchange.getIn().getBody(String.class);
    KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg);
    producer.send(data);

Obviously this is a serialization issue: the String body ends up in an encoder that expects a byte array. I'm just not sure whether there is a workaround, or whether this is essentially a bug in the existing implementation. (Or, hopefully, just my misunderstanding.)

Any suggestions? Thanks, J

2 answers

Never mind, I got there ... I hope this helps someone else: you need to set the serializer in the endpoint options.

    <route>
      <from uri="file:///tmp/input" />
      <setHeader headerName="kafka.PARTITION_KEY">
        <constant>Test</constant>
      </setHeader>
      <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1&amp;serializerClass=kafka.serializer.StringEncoder" />
    </route>
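For anyone wondering why the serializer option makes a difference, here is a rough sketch of what I believe is going on. It does not use the real Kafka classes: `Encoder`, `PassThroughEncoder` and `StringToBytesEncoder` are hypothetical stand-ins I made up to mimic an encoder that assumes the payload is already a byte array versus one that converts a String (UTF-8 is assumed here).

```java
import java.nio.charset.StandardCharsets;

public class EncoderSketch {

    /** Simplified stand-in for an encoder contract (NOT the real kafka.serializer.Encoder). */
    public interface Encoder {
        byte[] toBytes(Object value);
    }

    /** Hypothetical pass-through encoder: assumes the payload is already a byte[]. */
    public static class PassThroughEncoder implements Encoder {
        @Override
        public byte[] toBytes(Object value) {
            // A String payload fails here with ClassCastException,
            // which is the same kind of failure as in the stack trace.
            return (byte[]) value;
        }
    }

    /** Hypothetical string encoder: converts the String payload to bytes (UTF-8 assumed). */
    public static class StringToBytesEncoder implements Encoder {
        @Override
        public byte[] toBytes(Object value) {
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Returns true if encoding the payload throws a ClassCastException. */
    public static boolean failsWithClassCast(Encoder encoder, Object payload) {
        try {
            encoder.toBytes(payload);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        String body = "hello";
        System.out.println("pass-through fails: "
                + failsWithClassCast(new PassThroughEncoder(), body));
        System.out.println("string encoder fails: "
                + failsWithClassCast(new StringToBytesEncoder(), body));
    }
}
```

So the producer hands over a String, and the encoder has to be one that knows what to do with a String; that is what the `serializerClass` option selects.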

Found a good example showing how to install and run Apache Kafka and configure the Camel endpoint to send a message to a Kafka topic:

    @Override
    public void configure() throws Exception {
        String topicName = "topic=javainuse-topic";
        String kafkaServer = "kafka:localhost:9092";
        String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181";
        String serializerClass = "serializerClass=kafka.serializer.StringEncoder";
        String toKafka = new StringBuilder().append(kafkaServer).append("?").append(topicName).append("&")
                .append(zooKeeperHost).append("&").append(serializerClass).toString();
        from("file:C:/inbox?noop=true").split().tokenize("\n").to(toKafka);
    }

Link: Apache Camel + Kafka Integration Example
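If the endpoint options keep growing, the hand-rolled concatenation gets hard to read. As a possible alternative, here is a minimal sketch that joins the options from an ordered map. `KafkaUriSketch` and `buildUri` are names I invented for illustration and are not part of Camel. Note that in Java the separator is a plain `&`, while the XML DSL needs it escaped as `&amp;`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class KafkaUriSketch {

    /** Joins key=value options onto a base endpoint URI, keeping insertion order. */
    public static String buildUri(String base, Map<String, String> options) {
        if (options.isEmpty()) {
            return base;
        }
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return base + "?" + query;
    }

    public static void main(String[] args) {
        // Same option values as in the answer above; LinkedHashMap preserves their order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("topic", "javainuse-topic");
        options.put("zookeeperHost", "localhost");
        options.put("zookeeperPort", "2181");
        options.put("serializerClass", "kafka.serializer.StringEncoder");

        System.out.println(buildUri("kafka:localhost:9092", options));
    }
}
```

The resulting string can be passed to `to(...)` in the route exactly like `toKafka` in the example. This sketch does no URL encoding, so it only suits option values without special characters.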


Source: https://habr.com/ru/post/1202089/

