I am using the new Apache Spark 1.4.0 DataFrames API to extract information from Twitter's Status JSON, mostly from the Entities object. The relevant part for this question is shown below:
```json
{
  ...
  ...
  "entities": {
    "hashtags": [],
    "trends": [],
    "urls": [],
    "user_mentions": [
      {
        "screen_name": "linobocchini",
        "name": "Lino Bocchini",
        "id": 187356243,
        "id_str": "187356243",
        "indices": [3, 16]
      },
      {
        "screen_name": "jeanwyllys_real",
        "name": "Jean Wyllys",
        "id": 111123176,
        "id_str": "111123176",
        "indices": [79, 95]
      }
    ],
    "symbols": []
  },
  ...
  ...
}
```
There are several examples of how to extract primitive types such as `string` and `integer`, but I could not find anything about how to handle this kind of complex structure.
I tried the code below, but it does not work; it throws an exception:
```scala
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val tweets = sqlContext.read.json("tweets.json")

// this UDF just filters out empty entities.user_mentions[] nodes,
// since some tweets don't contain any mentions
import org.apache.spark.sql.functions.udf
val isEmpty = udf((value: List[Any]) => value.isEmpty)

import org.apache.spark.sql._
import sqlContext.implicits._

case class UserMention(
  id: Long,
  idStr: String,
  indices: Array[Long],
  name: String,
  screenName: String)

val mentions = tweets.select("entities.user_mentions").
  filter(!isEmpty($"user_mentions")).
  explode($"user_mentions") {
    case Row(arr: Array[Row]) => arr.map { elem =>
      UserMention(
        elem.getAs[Long]("id"),
        elem.getAs[String]("is_str"),
        elem.getAs[Array[Long]]("indices"),
        elem.getAs[String]("name"),
        elem.getAs[String]("screen_name"))
    }
  }

mentions.first
```
The exception thrown when calling `mentions.first`:
```
scala> mentions.first
15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
scala.MatchError: [List([187356243,187356243,List(3, 16),Lino Bocchini,linobocchini], [111123176,111123176,List(79, 95),Jean Wyllys,jeanwyllys_real])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
	at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
	at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
	at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)
```
What is wrong here? I understand this is related to types, but I still cannot figure it out.
For additional context, here is the schema Spark inferred automatically:
```
scala> mentions.printSchema
root
 |-- user_mentions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screen_name: string (nullable = true)
```
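From what I can tell (this is my own reading of the error, so treat it as an assumption), Spark materializes JSON array columns as a `scala.collection.Seq` (a `WrappedArray` under the hood) rather than a JVM `Array`, which would explain why the `case Row(arr: Array[Row])` pattern never matches and a `MatchError` surfaces instead. A minimal pure-Scala sketch of the type mismatch, with no Spark dependency:

```scala
// Stand-in for what a Row getter returns for an array column:
// Array(...).toSeq produces a WrappedArray, which is a Seq but NOT an Array.
val fromSpark: Any = Array(1L, 2L).toSeq

val label = fromSpark match {
  case _: Array[_] => "array" // never taken: WrappedArray is not an Array
  case _: Seq[_]   => "seq"   // this branch matches
  case _           => "other"
}
```

If this reading is right, matching on `Seq[Row]` instead of `Array[Row]` in the `explode` block should avoid the `MatchError`.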
NOTE 1: I know this can be solved using HiveQL, but I would like to use DataFrames, given how much momentum there is around them.
```sql
SELECT explode(entities.user_mentions) AS mentions FROM tweets
```
NOTE 2: The UDF `val isEmpty = udf((value: List[Any]) => value.isEmpty)` is an ugly hack and I am probably missing something here, but it was the only way I found to avoid an NPE.
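For what it's worth, the predicate behind that hack can at least be made null-safe, which is what guards against the NPE on tweets with no `user_mentions` node. A minimal sketch of the check itself, written over `Seq` rather than `List` (an assumption on my part, since Spark seems to hand array columns to UDFs as `Seq`), with no Spark dependency:

```scala
// Null-safe emptiness check for an array column's value.
// Typing the parameter as Seq[Any] (not List[Any]) is deliberate:
// the runtime value is a WrappedArray, which is a Seq but not a List.
def isEmptyMentions(value: Seq[Any]): Boolean =
  value == null || value.isEmpty
```

Wrapped in `udf(...)`, this would replace the `List[Any]` version while also covering the null case explicitly.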