I am trying to process a JSON message from Kafka. When I iterate over each RDD in the stream and use the SparkSession to read a JSON string, I get a NullPointerException. Can anyone see what is wrong here:

    SparkSession spark = SparkSession
            .builder().master("local[*]")
            .appName("ABC")
            .config("spark.some.config.option", "some-value")
            .getOrCreate();

    JavaStreamingContext jssc = new JavaStreamingContext(
            new StreamingContext(spark.sparkContext(), Durations.seconds(2)));

    JavaInputDStream<ConsumerRecord<String, String>> istream1 = KafkaUtils.createDirectStream(
            jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topic1), kafkaParams)
    );

    istream1.foreachRDD(rdd -> {
        rdd.foreach(consumerRecord -> {
            Dataset<Row> rawData = spark.read().json(consumerRecord.value());
            rawData.createOrReplaceTempView("sample");
            Dataset<Row> resultsDF = spark.sql("SELECT alert_id, date from sample");
            resultsDF.show();
        });
    });

It looks like I cannot use the SparkSession (or the context that belongs to it) inside the rdd.foreach lambda within foreachRDD. Calling spark.read() there throws a NullPointerException:

    java.lang.NullPointerException
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112)
        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)
        at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:535)
        at org.apache.spark.sql.SparkSession.read(SparkSession.scala:595)
        at com.ibm.sifs.evidence.SpoofingEvidence.lambda$1(SpoofingEvidence.java:99)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:350)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:350)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:193)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:875)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:875)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        ... 3
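
For what it is worth, the trace fails while SparkSession lazily computes sessionState inside the task. That would fit the session being serialized along with the rdd.foreach lambda and arriving without its driver-only state, since SparkSession keeps its SparkContext in a transient field. The sketch below reproduces only that general mechanism in plain Java, with no Spark involved. SessionLike, TransientFieldDemo, roundTrip and describeContext are hypothetical names made up for the illustration, not Spark APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a session object. This is NOT Spark's SparkSession.
final class SessionLike implements Serializable {
    private static final long serialVersionUID = 1L;

    final String appName;
    // Transient fields are skipped by Java serialization,
    // so this is null in every deserialized copy.
    final transient Object context;

    SessionLike(String appName, Object context) {
        this.appName = appName;
        this.context = context;
    }

    // Mimics a method that dereferences the driver-only field.
    String describeContext() {
        return appName + ":" + context.toString(); // throws NPE when context is null
    }
}

final class TransientFieldDemo {
    // Serializes and deserializes the object, roughly what happens to
    // anything captured by a lambda that is shipped to a task.
    static SessionLike roundTrip(SessionLike original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SessionLike) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SessionLike driverSide = new SessionLike("ABC", new Object());
        SessionLike taskSide = roundTrip(driverSide);

        System.out.println("driver copy has context: " + (driverSide.context != null));
        System.out.println("task copy has context: " + (taskSide.context != null));
        try {
            taskSide.describeContext();
        } catch (NullPointerException e) {
            System.out.println("NullPointerException on the deserialized copy");
        }
    }
}
```

If that is what is happening here, one direction to try would be to keep every spark.read() and spark.sql() call directly inside foreachRDD, which runs on the driver, and pass it an RDD of the record values (for example rdd.map(ConsumerRecord::value)) rather than calling it per record inside rdd.foreach. Note also that DataFrameReader.json(String) treats its argument as a file path, not as JSON text, so the overload that accepts a JavaRDD<String> may be closer to what is intended. I have not verified either point against this exact setup.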