scala.collection.mutable.WrappedArray$ofRef cannot be cast to [I (Array[Int])

I am new to Spark and Scala. I am trying to call a function as a Spark UDF, but I run into an error that I cannot solve.

I understand that Array and Seq are distinct types in Scala. WrappedArray is a subtype of Seq, and there are implicit conversions between WrappedArray and Array, but I am not sure why that conversion does not kick in in the case of the UDF.
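For context, outside of Spark the wrapping behaviour looks like this (a minimal plain-Scala sketch; the printed class name assumes Scala 2.11/2.12, where an Array assigned to a Seq is wrapped in a WrappedArray):

    val arr: Array[Int] = Array(1, 2, 3)

    // Assigning an Array to a Seq triggers the implicit wrapping into a WrappedArray.
    val seq: Seq[Int] = arr
    println(seq.getClass.getName) // scala.collection.mutable.WrappedArray$ofInt

    // Converting back explicitly is fine: toArray copies the elements.
    val copied: Array[Int] = seq.toArray

    // A runtime cast, however, fails because the object really is a WrappedArray:
    // seq.asInstanceOf[Array[Int]] // java.lang.ClassCastException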

Any pointers that help me understand and solve this problem are greatly appreciated.

Here is a snippet of the code:

    import sqlContext.implicits._ /* Hive context is exposed as sqlContext */
    import org.apache.spark.sql.functions.udf

    def filterMapKeysWithSet(m: Map[Int, Int], a: Array[Int]): Map[Int, Int] = {
      val seqToArray = a.toArray
      val s = seqToArray.toSet
      m filterKeys s
    }

    val myUDF = udf((m: Map[Int, Int], a: Array[Int]) => filterMapKeysWithSet(m, a))

    case class myType(id: Int, m: Map[Int, Int])
    val mapRDD = Seq(myType(1, Map(1 -> 100, 2 -> 200)), myType(2, Map(1 -> 100, 2 -> 200)), myType(3, Map(3 -> 300, 4 -> 400)))
    val mapDF = mapRDD.toDF

    mapDF: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>]
    root
     |-- id: integer (nullable = false)
     |-- m: map (nullable = true)
     |    |-- key: integer
     |    |-- value: integer (valueContainsNull = false)

    case class myType2(id: Int, a: Array[Int])
    val idRDD = Seq(myType2(1, Array(1,2,100,200)), myType2(2, Array(100,200)), myType2(3, Array(1,2)))
    val idDF = idRDD.toDF

    idDF: org.apache.spark.sql.DataFrame = [id: int, a: array<int>]
    root
     |-- id: integer (nullable = false)
     |-- a: array (nullable = true)
     |    |-- element: integer (containsNull = false)

    val j = mapDF.join(idDF, idDF("id") === mapDF("id")).drop(idDF("id"))
    val k = j.withColumn("filteredMap", myUDF(j("m"), j("a")))
    k.show

Looking at the DataFrames "j" and "k", the map and array columns have the correct data types.

    j: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>]
    root
     |-- id: integer (nullable = false)
     |-- m: map (nullable = true)
     |    |-- key: integer
     |    |-- value: integer (valueContainsNull = false)
     |-- a: array (nullable = true)
     |    |-- element: integer (containsNull = false)

    k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
    root
     |-- id: integer (nullable = false)
     |-- m: map (nullable = true)
     |    |-- key: integer
     |    |-- value: integer (valueContainsNull = false)
     |-- a: array (nullable = true)
     |    |-- element: integer (containsNull = false)
     |-- filteredMap: map (nullable = true)
     |    |-- key: integer
     |    |-- value: integer (valueContainsNull = false)

However, the action on the DataFrame "k" that invokes the UDF fails with the following error:

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, ip-100-74-42-194.ec2.internal): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [I
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:60)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
        at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
1 answer

Changing the parameter type from Array[Int] to Seq[Int] in the filterMapKeysWithSet function fixes the problem.

    def filterMapKeysWithSet(m: Map[Int, Int], a: Seq[Int]): Map[Int, Int] = {
      val seqToArray = a.toArray
      val s = seqToArray.toSet
      m filterKeys s
    }

    val myUDF = udf((m: Map[Int, Int], a: Seq[Int]) => filterMapKeysWithSet(m, a))

    k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
    root
     |-- id: integer (nullable = false)
     |-- m: map (nullable = true)
     |    |-- key: integer
     |    |-- value: integer (valueContainsNull = false)
     |-- a: array (nullable = true)
     |    |-- element: integer (containsNull = false)
     |-- filteredMap: map (nullable = true)
     |    |-- key: integer
     |    |-- value: integer (valueContainsNull = false)

    +---+--------------------+----------------+--------------------+
    | id|                   m|               a|         filteredMap|
    +---+--------------------+----------------+--------------------+
    |  1|Map(1 -> 100, 2 -...|[1, 2, 100, 200]|Map(1 -> 100, 2 -...|
    |  2|Map(1 -> 100, 2 -...|      [100, 200]|               Map()|
    |  3|Map(3 -> 300, 4 -...|          [1, 2]|               Map()|
    +---+--------------------+----------------+--------------------+

So it looks like the ArrayType column in the DataFrame "idDF" is handed to the UDF as a WrappedArray, not an Array. The call to filterMapKeysWithSet therefore failed because it expected an Array but received a WrappedArray/Seq instead (which does not implicitly convert to an Array in Scala 2.8 and above).
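For reference, the same fix can be written more compactly, since the Seq the UDF receives can be turned into a key set directly (a sketch assuming the same DataFrame j and column names as above):

    // Declare the UDF parameter as Seq[Int]: Spark hands ArrayType columns to
    // Scala UDFs as a WrappedArray, which is a Seq.
    val myUDF = udf((m: Map[Int, Int], a: Seq[Int]) => m.filterKeys(a.toSet))
    val k = j.withColumn("filteredMap", myUDF(j("m"), j("a")))
    k.show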

