In our Spark Streaming job, we read messages from Kafka.
To do this, we use the KafkaUtils.createDirectStream API, which returns a JavaPairInputDStream.
Messages are read from Kafka (from three topics: test1, test2, test3) as follows:
```java
private static final String TOPICS = "test1,test2,test3";

HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        streamingContext,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
);
```
We want to process messages from each topic differently, so we need to know which topic each message came from.
We therefore do the following:
```java
JavaDStream<String> lines = messages.map(new SplitToLinesFunction());
```
Here is the implementation of SplitToLinesFunction:
```java
public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        System.out.println(tuple2._1);
        return tuple2._2();
    }
}
```
The problem is that tuple2._1 is null. We assumed that tuple2._1 would contain some metadata, such as the name of the topic or partition the message came from, but when we print tuple2._1, it is null.
Our question: is there a way to send the topic name to Kafka so that tuple2._1 in our Spark Streaming code contains it (rather than null)?
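For context: with this overload of createDirectStream, each tuple is (message key, message value), decoded by the two StringDecoder classes, so tuple2._1 is the Kafka message key rather than topic metadata, and it is null whenever producers send messages without a key. If the producers can be changed, one workaround is to have them set the key to the topic name; another, sketched below, is to embed the topic in the value itself. The tab-separated envelope format and the TopicEnvelope class are hypothetical choices for illustration, not part of Kafka or Spark.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

/**
 * Hypothetical envelope format: "<topic>\t<payload>".
 * Producers would write values in this form; the Spark side splits them back.
 */
public class TopicEnvelope {
    private static final char SEP = '\t';

    // Producer side (assumption): prefix the payload with the topic it is sent to.
    static String wrap(String topic, String payload) {
        if (topic.indexOf(SEP) >= 0) {
            throw new IllegalArgumentException("topic must not contain a tab");
        }
        return topic + SEP + payload;
    }

    // Consumer side, e.g. called on tuple2._2() inside a Spark Function:
    // split the value back into (topic, payload) at the FIRST tab only.
    static Map.Entry<String, String> unwrap(String value) {
        int i = value.indexOf(SEP);
        if (i < 0) {
            throw new IllegalArgumentException("no topic prefix in: " + value);
        }
        return new SimpleEntry<>(value.substring(0, i), value.substring(i + 1));
    }

    public static void main(String[] args) {
        Map.Entry<String, String> e = unwrap(wrap("test2", "hello"));
        System.out.println(e.getKey() + " -> " + e.getValue());
    }
}
```

Splitting at the first tab keeps payloads that themselves contain tabs intact, and since Kafka topic names may only use letters, digits, '.', '_' and '-', the prefix cannot be ambiguous.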
Note that we also tried to get the topic names from the DStream, as described in the Spark Streaming + Kafka Integration Guide (reading the offset ranges of each RDD).
But this returns ALL the topics passed to KafkaUtils.createDirectStream, not the specific topic from which the messages in the current RDD were received.
So this did not help us determine which topic the messages in the RDD came from.
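As far as we understand the integration guide, the offset ranges describe every Kafka partition in the batch, not a single topic per RDD: an RDD from the direct stream has one partition per Kafka topic-partition, and RDD partition i covers exactly offsetRanges[i]. So instead of looping over all ranges, the topic of a message can be looked up from the index of the RDD partition it lives in (in Spark, presumably TaskContext.get().partitionId() inside foreachPartition, and only before any shuffle or repartition breaks the one-to-one mapping). The plain-Java sketch below only models that mapping; OffsetRangeModel and the sample data are hypothetical stand-ins, not Spark classes.

```java
import java.util.List;

/**
 * Stdlib-only model of one direct-stream batch:
 * RDD partition i holds exactly the messages described by ranges[i].
 */
public class PartitionTopicModel {
    // Hypothetical stand-in for Spark's OffsetRange (topic, partition, from, until).
    record OffsetRangeModel(String topic, int partition, long fromOffset, long untilOffset) {}

    // In Spark, rddPartitionId would come from TaskContext.get().partitionId().
    static String topicForPartition(OffsetRangeModel[] ranges, int rddPartitionId) {
        return ranges[rddPartitionId].topic();
    }

    public static void main(String[] args) {
        OffsetRangeModel[] ranges = {
            new OffsetRangeModel("test1", 0, 0, 5),
            new OffsetRangeModel("test2", 0, 0, 3),
            new OffsetRangeModel("test3", 0, 10, 12),
        };
        // One list of message values per RDD partition, aligned with ranges
        // (sizes match untilOffset - fromOffset).
        List<List<String>> rddPartitions = List.of(
            List.of("a", "b", "c", "d", "e"),
            List.of("x", "y", "z"),
            List.of("p", "q"));
        for (int pid = 0; pid < rddPartitions.size(); pid++) {
            String topic = topicForPartition(ranges, pid);
            for (String msg : rddPartitions.get(pid)) {
                System.out.println(topic + ": " + msg);
            }
        }
    }
}
```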
EDIT
In response to David's answer, I tried using MessageAndMetadata as follows:
```java
Map<TopicAndPartition, Long> topicAndPartition = new HashMap<>();
topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);

class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String> {
    @Override
    public String call(MessageAndMetadata<String, String> v1) throws Exception {
        // ...
    }
}
```
The problem is that nothing is printed in the MessageAndMetadataFunction.call method. What should I change to get the correct topic for this RDD inside MessageAndMetadataFunction.call?
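For reference, the messageHandler receives one MessageAndMetadata per message, and MessageAndMetadata exposes topic(), partition(), offset(), key() and message(), so a handler that returns the topic together with the message (for example as a Tuple2<String, String>, with the recordClass argument of createDirectStream set to match) would give every downstream record its topic. One thing that may be worth checking: the handler runs on the executors, so its System.out output would typically appear in the executor logs rather than the driver console unless the job runs in local mode. The plain-Java sketch below models only the data flow; MessageMeta is a hypothetical stand-in for MessageAndMetadata, and the per-topic processors are illustrative placeholders.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Stdlib-only model of a messageHandler that keeps the topic with each record. */
public class TopicAwareHandler {
    // Hypothetical stand-in for kafka.message.MessageAndMetadata<String, String>.
    record MessageMeta(String topic, int partition, long offset, String key, String message) {}

    // Models the handler: emit (topic, message) instead of the possibly-null key.
    static Map.Entry<String, String> handle(MessageMeta m) {
        return new SimpleEntry<>(m.topic(), m.message());
    }

    // Hypothetical per-topic processing, selected by the topic carried with each record.
    static final Map<String, Function<String, String>> PROCESSORS = Map.of(
        "test1", s -> s.toUpperCase(),
        "test2", s -> new StringBuilder(s).reverse().toString(),
        "test3", s -> s + "!");

    static String process(Map.Entry<String, String> record) {
        Function<String, String> f = PROCESSORS.get(record.getKey());
        if (f == null) {
            throw new IllegalArgumentException("unexpected topic: " + record.getKey());
        }
        return f.apply(record.getValue());
    }

    public static void main(String[] args) {
        List<MessageMeta> batch = List.of(
            new MessageMeta("test1", 0, 1, null, "abc"),
            new MessageMeta("test2", 0, 1, null, "abc"),
            new MessageMeta("test3", 0, 1, null, "abc"));
        List<String> out = new ArrayList<>();
        for (MessageMeta m : batch) {
            out.add(process(handle(m)));
        }
        System.out.println(out); // [ABC, cba, abc!]
    }
}
```

Keeping the topic in the record itself, rather than looking it up per RDD, means the per-topic logic still works after transformations that would break the partition-to-offset-range mapping.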