Passing a map with a struct key to Spark UDF

I want to write Spark 1.6 UDF, which takes the following map:

case class MyRow(mapping: Map[(Int, Int), Double]) val data = Seq( MyRow(Map((1, 1) -> 1.0)) ) val df = sc.parallelize(data).toDF() df.printSchema() root |-- mapping: map (nullable = true) | |-- key: struct | |-- value: double (valueContainsNull = false) | | |-- _1: integer (nullable = false) | | |-- _2: integer (nullable = false) 

(As a note: I think the above output looks weird, since the key type is printed under the value type, why?)

Now I define my UDF as:

 val myUDF = udf((inputMapping: Map[(Int,Int), Double]) => inputMapping.map { case ((i1, i2), value) => ((i1 + i2), value) } ) df .withColumn("udfResult", myUDF($"mapping")) .show() 

But it gives me:

 java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2 

So, I tried replacing (Int,Int) with a custom case class , because this is how I usually do it if I want to pass a struct to UDF:

 case class MyTuple2(i1: Int, i2: Int) val myUDF = udf((inputMapping: Map[MyTuple2, Double]) => inputMapping.map { case (MyTuple2(i1, i2), value) => ((i1 + i2), value) } ) 

This strangely gives:

 org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(mapping)' due to data type mismatch: argument 1 requires map<struct<i1:int,i2:int>,double> type, however, 'mapping' is of map<struct<_1:int,_2:int>,double> type. 

I do not understand the above exception because the types match.

The only (ugly) solution I found was to pass to org.apache.spark.sql.Row and then β€œextract” the structure elements:

 val myUDF = udf((inputMapping: Map[Row, Double]) => inputMapping .map { case (key, value) => ((key.getInt(0), key.getInt(1)), value) } // extract Row into Tuple2 .map { case ((i1, i2), value) => ((i1 + i2), value) } ) 
+5
source share
1 answer

As far as I know, in this context one can’t get around using Row : the tuple (or case class) used in the map (or another tuple / class case / array ...) is a nested structure, and as such it will be represented as Row when transfer to UDF.

The only improvement I can offer is to use Row.unapply to simplify the code:

 val myUDF = udf((inputMapping: Map[Row, Double]) => inputMapping .map { case (Row(i1: Int, i2: Int), value) => (i1 + i2, value) } ) 
+4
source

Source: https://habr.com/ru/post/1263204/


All Articles