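The easiest way is to use the DataFrame abstraction that comes with Spark: inside `foreachRDD`, pass the message values of each RDD to `sqlContext.read.json` to get a DataFrame.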
// SQL entry point built on the SparkContext `sc` (defined outside this snippet);
// it is used below to parse JSON strings.
val sqlContext = new SQLContext(sc)

// Direct Kafka stream over the topic "myTopicName", typed with String keys and
// String values, both decoded by StringDecoder. `ssc` and `kafkaParams` are
// defined outside this snippet.
val stream =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc,
    kafkaParams,
    Set("myTopicName")
  )

// For each RDD the stream produces, keep only the second element of every
// record tuple (presumably the Kafka message value -- confirm against the
// Kafka integration docs) and parse those strings as JSON.
stream.foreachRDD { batch =>
  val jsonStrings = batch.map(record => record._2)
  val messagesDf = sqlContext.read.json(jsonStrings)
}
source
share