I recently noticed that Camel now has its own component for Kafka, so I decided to give it a whirl.
I started with a simple file -> Kafka route, as follows ...
<route>
    <from uri="file:///tmp/input" />
    <setHeader headerName="kafka.PARTITION_KEY">
        <constant>Test</constant>
    </setHeader>
    <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1" />
</route>
It seems simple enough; however, when I run this I get ...
java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
    at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:78)
Checking the Camel code, KafkaProducer does the following ...
String msg = exchange.getIn().getBody(String.class);
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg);
producer.send(data);
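As far as I can tell, the failure boils down to a type mismatch: the stack trace shows DefaultEncoder casting the payload to [B (that is, byte[]), while Camel hands it a String. Here is a minimal sketch of that mismatch with no Kafka on the classpath. The class and method names are hypothetical stand-ins of my own, not Kafka's or Camel's API, and the UTF-8 charset is an assumption.

```java
import java.nio.charset.StandardCharsets;

public class EncoderCastSketch {

    // Hypothetical stand-in for an encoder that assumes the payload is already byte[].
    public static byte[] passThroughEncode(Object value) {
        return (byte[]) value; // throws ClassCastException when value is a String
    }

    // Hypothetical stand-in for a string-aware encoder (UTF-8 is an assumption).
    public static byte[] stringEncode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Returns true if the pass-through encoder rejects the value with a ClassCastException.
    public static boolean failsWithClassCast(Object value) {
        try {
            passThroughEncode(value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        String msg = "hello";
        System.out.println(failsWithClassCast(msg));   // prints "true"
        System.out.println(stringEncode(msg).length);  // prints "5"
    }
}
```

If this reading is right, either the body has to reach the producer as byte[], or the producer needs an encoder that accepts String.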
Obviously this is a serialization issue. I'm just not sure whether there is a workaround, or whether this is essentially a bug in the existing implementation (or, hopefully, just my misunderstanding).
Any suggestions? Thanks, J