How to read JSON data from a Kafka topic using Scala in Apache Spark

I am new to Spark. Could you let me know how to read JSON data from a Kafka topic into Apache Spark using Scala?

Thanks.

2 answers

The easiest way is to use the DataFrame abstraction that comes with Spark.

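import kafka.serializer.StringDecoder
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.KafkaUtils

// sc (SparkContext), ssc (StreamingContext) and kafkaParams are assumed to exist already
val sqlContext = new SQLContext(sc)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("myTopicName"))

stream.foreachRDD { rdd =>
  // Each record is a (key, value) pair; the value holds the JSON string
  val dataFrame = sqlContext.read.json(rdd.map(_._2)) // converts JSON to a DataFrame
  // Do your operations on this DataFrame. You won't even need a model class.
}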
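
For context, here is a rough end-to-end sketch of how the snippet above might be wired into a complete application. It assumes the Spark 1.x streaming API with the matching spark-streaming-kafka artifact on the classpath. The object name, the `toDataFrame` helper, the local master, the 5-second batch interval and the broker address `localhost:9092` are all placeholders chosen for illustration, not part of the original answer.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaJsonToDataFrame {
  // Hypothetical helper: converts one batch of JSON strings into a DataFrame.
  // Spark infers the schema from the JSON itself.
  def toDataFrame(sqlContext: SQLContext, json: RDD[String]): DataFrame =
    sqlContext.read.json(json)

  def main(args: Array[String]): Unit = {
    // Assumed values: local master, 5-second batches, broker on localhost:9092
    val conf = new SparkConf().setAppName("KafkaJsonToDataFrame").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val sqlContext = new SQLContext(ssc.sparkContext)
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("myTopicName"))

    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) { // skip batches that contain no messages
        val df = toDataFrame(sqlContext, rdd.map(_._2))
        df.printSchema()
        df.show()
      }
    }

    ssc.start()            // start consuming from Kafka
    ssc.awaitTermination() // block until the job is stopped
  }
}
```

Keeping the conversion in a small helper makes it possible to try it on a local RDD of JSON strings without a running Kafka broker.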

I use the Play Framework JSON library. You can add it to your project as a standalone module. It is used as follows:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import play.api.libs.json._

// Model class matching the fields of the JSON messages
case class MyClass(field1: String,
                   field2: Int)

// Derives the JSON reader/writer for MyClass
implicit val myClassFormat = Json.format[MyClass]

val kafkaParams = Map[String, String](...here are your params...)
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("myTopicName"))
  .map(m => Json.parse(m._2).as[MyClass]) // m._2 is the message value (the JSON string)
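
Note that `Json.parse` throws on text that is not valid JSON, and `.as[MyClass]` throws when fields are missing or have the wrong type, so a single bad message can fail the batch. A minimal sketch of a more tolerant variant, assuming you would rather drop such messages, might look like this. The `SafeJsonParsing` object and the `parseMessage` helper are hypothetical names introduced here for illustration.

```scala
import play.api.libs.json._
import scala.util.Try

object SafeJsonParsing {
  case class MyClass(field1: String, field2: Int)

  // Explicit type annotation so the implicit resolves regardless of definition order
  implicit val myClassFormat: Format[MyClass] = Json.format[MyClass]

  // Returns None when the text is not valid JSON
  // or when the JSON does not match the shape of MyClass
  def parseMessage(raw: String): Option[MyClass] =
    Try(Json.parse(raw)).toOption.flatMap(_.validate[MyClass].asOpt)
}
```

With this helper, the `.map(...)` call on the stream could be swapped for something like `.flatMap(m => SafeJsonParsing.parseMessage(m._2))`, which silently skips messages that fail to parse. Whether dropping or logging bad messages is appropriate depends on your use case.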

Source: https://habr.com/ru/post/1628924/

