A typical suggestion is an exception from sql for this, but if you want to stay in SQL, here is the answer I received from the request in the mailing list (nabble does not show the answer for some reason):
From Michael Armbrost
( HiveContext), , , , , , jsonRDD json- ( ) .
val myJson = sqlContext.jsonRDD(sc.parallelize("""{"foo":[{"bar":1},{"baz":2}]}""" :: Nil))
myJson.registerTempTable("JsonTest")
val result = sql("SELECT f.bar FROM JsonTest LATERAL VIEW explode(foo) a AS f").collect()
myJson: org.apache.spark.sql.DataFrame = [foo: array<struct<bar:bigint,baz:bigint>>]
result: Array[org.apache.spark.sql.Row] = Array([1], [null])
Spark 1.3 jsonRDD, , json, Maps ( ) structs, JSON.
import org.apache.spark.sql.types._
val schema =
StructType(
StructField("foo", ArrayType(MapType(StringType, IntegerType))) :: Nil)
sqlContext.jsonRDD(sc.parallelize("""{"foo":[{"bar":1},{"baz":2}]}""" :: Nil), schema).registerTempTable("jsonTest")
val withSql = sql("SELECT a FROM jsonTest LATERAL VIEW explode(foo) a AS a WHERE a['bar'] IS NOT NULL").collect()
val withSpark = sql("SELECT a FROM jsonTest LATERAL VIEW explode(foo) a AS a").rdd.filter {
case Row(a: Map[String, Int]) if a.contains("bar") => true
case _: Row => false
}.collect()
schema: org.apache.spark.sql.types.StructType = StructType(StructField(foo,ArrayType(MapType(StringType,IntegerType,true),true),true))
withSql: Array[org.apache.spark.sql.Row] = Array([Map(bar -> 1)])
withSpark: Array[org.apache.spark.sql.Row] = Array([Map(bar -> 1)])