Get the topic of a Kafka message in Spark

In our Spark Streaming job, we read messages from Kafka.

To do this, we use the KafkaUtils.createDirectStream API, which returns a JavaPairInputDStream.

Messages are read from Kafka (from three topics: test1, test2, test3) as follows:

    private static final String TOPICS = "test1,test2,test3";

    HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", BROKERS); // BROKERS is defined elsewhere (not shown)

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            streamingContext,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
    );

We want to process messages from each topic differently, and for this we need to know the topic name of each message.

So we do the following:

 JavaDStream<String> lines = messages.map(new SplitToLinesFunction()); 

Here is the implementation of SplitToLinesFunction:

    public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            System.out.println(tuple2._1);
            return tuple2._2();
        }
    }

The problem is that tuple2._1 is null. We assumed that tuple2._1 would contain some metadata, such as the name of the topic/partition the message came from, but when we print tuple2._1, it is null.
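A note on why `tuple2._1` is null: with this `createDirectStream` overload, each element of the `JavaPairInputDStream` is a (key, value) pair where the first element is the Kafka message key set by the producer, not topic metadata. If the producer sends messages without a key, that element is null. The plain-Java sketch below models only this behaviour so it runs without Spark or Kafka; `KafkaRecordStub` and `DefaultPairing.toPair` are hypothetical stand-ins, not library classes.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical stand-in for a Kafka message (not a Kafka class): topic, key, payload.
final class KafkaRecordStub {
    final String topic;
    final String key;   // null when the producer sent the message without a key
    final String value;

    KafkaRecordStub(String topic, String key, String value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

final class DefaultPairing {
    // Models what the pair-returning createDirectStream emits per message:
    // (message key, message value). The topic is not part of the pair.
    static Map.Entry<String, String> toPair(KafkaRecordStub record) {
        return new SimpleEntry<>(record.key, record.value);
    }
}
```

If you control the producer, one workaround is to put the topic name into the key (with the Kafka Java client, `new ProducerRecord<>(topic, topic, value)`). Keep in mind that Kafka's default partitioner also uses the key to choose the partition, so a constant key per topic sends all of that topic's messages to a single partition.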

Our question: is there a way to send the topic name to Kafka so that tuple2._1 in our Spark Streaming code contains it (and is not null)?

Note that we also tried to get the topic names from the DStream, as shown in the Spark Streaming + Kafka Integration Guide.

But that returns ALL the topics passed to KafkaUtils.createDirectStream, not the specific topic that the messages in the current RDD came from.

So this did not help us determine which topic the messages in an RDD were read from.
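The offset ranges can still identify the topic per record. For the direct stream, the Spark Kafka integration guide describes a one-to-one mapping between Kafka partitions and RDD partitions, so `offsetRanges[i]` describes RDD partition `i`. That mapping holds only on the RDD produced directly by the stream, before any shuffle or repartition. Inside a task, the partition index is available through `mapPartitionsWithIndex` or `TaskContext.get().partitionId()`. The plain-Java sketch below models only the lookup; `OffsetRangeStub` is a hypothetical stand-in for Spark's `OffsetRange`, and `tagWithTopic` mimics what a `mapPartitionsWithIndex` function would do with one partition's records.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for org.apache.spark.streaming.kafka.OffsetRange.
final class OffsetRangeStub {
    final String topic;
    final int partition;
    final long fromOffset;
    final long untilOffset;

    OffsetRangeStub(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }
}

final class TopicByPartition {
    // RDD partition i of a direct stream is backed by offsetRanges[i],
    // so every record in that partition comes from offsetRanges[i].topic.
    static String topicFor(OffsetRangeStub[] offsetRanges, int rddPartitionIndex) {
        return offsetRanges[rddPartitionIndex].topic;
    }

    // Models a mapPartitionsWithIndex function: prefix each record of one partition with its topic.
    static List<String> tagWithTopic(OffsetRangeStub[] offsetRanges, int rddPartitionIndex,
                                     Iterator<String> records) {
        String topic = topicFor(offsetRanges, rddPartitionIndex);
        List<String> tagged = new ArrayList<>();
        while (records.hasNext()) {
            tagged.add(topic + ": " + records.next());
        }
        return tagged;
    }
}
```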

EDIT

In response to David's answer, I tried using MessageAndMetadata as follows:

    Map<TopicAndPartition, Long> topicAndPartition = new HashMap<>();
    topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
    topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
    topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);

    class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String> {
        @Override
        public String call(MessageAndMetadata<String, String> v1) throws Exception {
            // nothing is printed here
            System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
            return v1.topic();
        }
    }

    JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext,
            String.class, String.class, StringDecoder.class, StringDecoder.class, String.class,
            kafkaParams, topicAndPartition, new MessageAndMetadataFunction());

    messages.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> rdd) throws Exception {
            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            // here all the topics Kafka listens to are printed, but that doesn't help
            for (OffsetRange offset : offsets) {
                System.out.println(offset.topic() + " " + offset.partition() + " "
                        + offset.fromOffset() + " " + offset.untilOffset());
            }
        }
    });

The problem is that nothing was printed in the MessageAndMetadataFunction.call method. What should I fix to get the topic for this RDD inside the MessageAndMetadataFunction.call method?

2 answers

Use one of the overloads of createDirectStream that takes a messageHandler function as a parameter. This is what I do:

    val messages = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (String, Array[Byte])](
      ssc,
      kafkaParams,
      // getPartitionsAndOffsets is the author's own helper (not shown)
      getPartitionsAndOffsets(topics).map(t => (t._1, t._2._1)).toMap,
      (msg: MessageAndMetadata[Array[Byte], Array[Byte]]) => { (msg.topic, msg.message) }
    )

The important part is this:

 (msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)} 

If you are not familiar with Scala: all this function does is return a tuple containing msg.topic and msg.message. Your function needs to return both of them so that you can use them downstream. You could instead return the entire MessageAndMetadata object, which gives you a couple of other interesting fields. But if you only need the topic and the message, use the code above.
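In Java, the same idea means passing a handler of type `Function<MessageAndMetadata<String, String>, Tuple2<String, String>>` that returns `new Tuple2<>(m.topic(), m.message())`. Every downstream record then carries its topic, so you can branch on it, which is what the question needs (process each topic differently). The sketch below models that flow in plain Java so it runs without Spark; `MessageStub` is a hypothetical stand-in for `MessageAndMetadata`, and the per-topic processors in the test are illustrative only.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for kafka.message.MessageAndMetadata (topic and payload only).
final class MessageStub {
    private final String topic;
    private final String message;

    MessageStub(String topic, String message) {
        this.topic = topic;
        this.message = message;
    }

    String topic() { return topic; }
    String message() { return message; }
}

final class TopicRouting {
    // Models the messageHandler: keep the topic next to the payload as a (topic, message) pair.
    static final Function<MessageStub, Map.Entry<String, String>> HANDLER =
            m -> new SimpleEntry<>(m.topic(), m.message());

    // Applies the handler, then processes each payload with the processor registered
    // for its topic; topics without a processor pass through unchanged.
    static List<String> process(List<MessageStub> batch, Map<String, UnaryOperator<String>> processors) {
        List<String> results = new ArrayList<>();
        for (MessageStub m : batch) {
            Map.Entry<String, String> pair = HANDLER.apply(m);
            UnaryOperator<String> processor =
                    processors.getOrDefault(pair.getKey(), UnaryOperator.identity());
            results.add(processor.apply(pair.getValue()));
        }
        return results;
    }
}
```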


At the bottom of the Kafka Integration Guide there is an example that extracts a topic from messages.

Relevant code in Java:

    // Hold a reference to the current offset ranges, so it can be used downstream
    final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

    directKafkaStream.transformToPair(
        new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
            @Override
            public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                offsetRanges.set(offsets);
                return rdd;
            }
        }
    ).map(
        ...
    ).foreachRDD(
        new Function<JavaPairRDD<String, String>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, String> rdd) throws IOException {
                for (OffsetRange o : offsetRanges.get()) {
                    System.out.println(
                        o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
                    );
                }
                ...
                return null;
            }
        }
    );

This can probably be condensed into something more compact that only asks for the topic and nothing else.
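One compact variant, if all you need is which topics actually contributed messages to the current batch: the offset ranges cover every subscribed topic-partition, but a range with `fromOffset == untilOffset` delivered no messages, so keeping only ranges with `untilOffset > fromOffset` leaves the topics present in this RDD. This tells you which topics are in the batch, not which topic a given record came from; for per-record topics, a messageHandler as in the first answer is the direct route. The plain-Java sketch below assumes a hypothetical `BatchTopics.Range` stand-in for Spark's `OffsetRange`.

```java
import java.util.Set;
import java.util.TreeSet;

final class BatchTopics {
    // Hypothetical stand-in for org.apache.spark.streaming.kafka.OffsetRange.
    static final class Range {
        final String topic;
        final long fromOffset;
        final long untilOffset;

        Range(String topic, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }
    }

    // Returns the topics whose ranges are non-empty, i.e. that delivered at least one message.
    static Set<String> topicsWithData(Range[] offsetRanges) {
        Set<String> topics = new TreeSet<>();
        for (Range r : offsetRanges) {
            if (r.untilOffset > r.fromOffset) {
                topics.add(r.topic);
            }
        }
        return topics;
    }
}
```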


Source: https://habr.com/ru/post/1247623/

