Spark Session null pointer when processing JSON from Kafka

I am trying to process JSON messages from Kafka. When I iterate over the RDDs in a stream and try to use the SparkSession to read a JSON string, I get a NullPointerException. Can anyone see what is wrong here:

    SparkSession spark = SparkSession
            .builder()
            .master("local[*]")
            .appName("ABC")
            .config("spark.some.config.option", "some-value")
            .getOrCreate();

    JavaStreamingContext jssc = new JavaStreamingContext(
            new StreamingContext(spark.sparkContext(), Durations.seconds(2)));

    // Kafka params code here.....not shown

    JavaInputDStream<ConsumerRecord<String, String>> istream1 = KafkaUtils.createDirectStream(
            jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topic1), kafkaParams)
    );


    istream1.foreachRDD(rdd -> {
        rdd.foreach(consumerRecord -> {
            // NullPointerException is thrown on the next line (see stack trace below)
            Dataset<Row> rawData = spark.read().json(consumerRecord.value());
            rawData.createOrReplaceTempView("sample");
            Dataset<Row> resultsDF = spark.sql("SELECT alert_id, date FROM sample");
            resultsDF.show();
        });
    });

It looks like I cannot use the SparkSession, or the SparkContext that belongs to it, inside the foreachRDD section; any attempt gives a null pointer.
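(For context: the function passed to foreachRDD itself runs on the driver, but the inner rdd.foreach closure is serialized to the executors, where the driver's SparkSession does not exist. A minimal sketch of the driver-side pattern, assuming the spark and istream1 variables from above and the Spark 2.0-era json(JavaRDD<String>) overload, would be:

    istream1.foreachRDD(rdd -> {
        // This lambda runs on the driver, so the SparkSession is usable here
        JavaRDD<String> jsonRdd = rdd.map(ConsumerRecord::value);
        if (!jsonRdd.isEmpty()) {
            // Parse the whole micro-batch of JSON strings at once
            Dataset<Row> rawData = spark.read().json(jsonRdd);
            rawData.createOrReplaceTempView("sample");
            spark.sql("SELECT alert_id, date FROM sample").show();
        }
    });

Note also that spark.read().json(String) interprets its argument as a file path, not as JSON content, so parsing record values one at a time would not work even without the NPE.)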

    java.lang.NullPointerException
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112)
        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)
        at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:535)
        at org.apache.spark.sql.SparkSession.read(SparkSession.scala:595)
        at com.ibm.sifs.evidence.SpoofingEvidence.lambda$1(SpoofingEvidence.java:99)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:350)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:350)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:193)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:875)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:875)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        ... 3 more
