I am new to Spark and Scala. I am trying to call a function as a Spark UDF, but I am running into an error that I cannot resolve.
I understand that Array and Seq are distinct types in Scala. WrappedArray is a subtype of Seq, and there is an implicit conversion from Array to WrappedArray, but I'm not sure why this conversion does not happen inside the UDF.
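For context, this is the wrapping I had in mind; a minimal plain-Scala sketch (no Spark involved):

// plain Scala: an Array[Int] is implicitly wrapped as a Seq (a WrappedArray)
val arr: Array[Int] = Array(1, 2, 3)
val asSeq: Seq[Int] = arr                      // implicit wrapping via Predef
val backToArray: Array[Int] = asSeq.toArray    // going back to Array is an explicit .toArray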
Any pointers that help me understand and solve this problem are greatly appreciated.
Here is a snippet of the code:
import org.apache.spark.sql.functions.udf
import sqlContext.implicits._

def filterMapKeysWithSet(m: Map[Int, Int], a: Array[Int]): Map[Int, Int] = {
  val seqToArray = a.toArray
  val s = seqToArray.toSet
  m filterKeys s
}

val myUDF = udf((m: Map[Int, Int], a: Array[Int]) => filterMapKeysWithSet(m, a))

case class myType(id: Int, m: Map[Int, Int])
val mapRDD = Seq(myType(1, Map(1 -> 100, 2 -> 200)), myType(2, Map(1 -> 100, 2 -> 200)), myType(3, Map(3 -> 300, 4 -> 400)))
val mapDF = mapRDD.toDF

mapDF: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)

case class myType2(id: Int, a: Array[Int])
val idRDD = Seq(myType2(1, Array(1,2,100,200)), myType2(2, Array(100,200)), myType2(3, Array(1,2)))
val idDF = idRDD.toDF

idDF: org.apache.spark.sql.DataFrame = [id: int, a: array<int>]
root
 |-- id: integer (nullable = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)

val j = mapDF.join(idDF, idDF("id") === mapDF("id")).drop(idDF("id"))
val k = j.withColumn("filteredMap", myUDF(j("m"), j("a")))
k.show
Looking at the DataFrames "j" and "k", the map and array columns have the correct data types:
j: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)

k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- filteredMap: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
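For what it's worth, calling the function directly on plain Scala collections behaves as expected, for example:

filterMapKeysWithSet(Map(1 -> 100, 2 -> 200), Array(1, 2))   // Map(1 -> 100, 2 -> 200)
filterMapKeysWithSet(Map(3 -> 300, 4 -> 400), Array(1, 2))   // Map()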
However, the action on DataFrame "k" that invokes the UDF fails with the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, ip-100-74-42-194.ec2.internal): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [I
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:60)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
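From the ClassCastException, it looks like Spark hands the array column to the UDF as a WrappedArray rather than an Array[Int]. Would declaring the parameter as Seq[Int] be the right way to fix this? Roughly what I have in mind (untested sketch):

def filterMapKeysWithSet(m: Map[Int, Int], a: Seq[Int]): Map[Int, Int] = {
  m filterKeys a.toSet   // a Seq parameter would accept a WrappedArray directly
}
val myUDF = udf((m: Map[Int, Int], a: Seq[Int]) => filterMapKeysWithSet(m, a))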