Using a Spark UDF with a sequence of structs

Given a DataFrame in which one column is a sequence of structs, generated by the following code:

    val df = spark
      .range(10)
      .map((i) => (i % 2, util.Random.nextInt(10), util.Random.nextInt(10)))
      .toDF("a","b","c")
      .groupBy("a")
      .agg(collect_list(struct($"b",$"c")).as("my_list"))

    df.printSchema
    df.show(false)

Outputs

    root
     |-- a: long (nullable = false)
     |-- my_list: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- b: integer (nullable = false)
     |    |    |-- c: integer (nullable = false)

    +---+-----------------------------------+
    |a  |my_list                            |
    +---+-----------------------------------+
    |0  |[[0,3], [9,5], [3,1], [4,2], [3,3]]|
    |1  |[[1,7], [4,6], [5,9], [6,4], [3,9]]|
    +---+-----------------------------------+

I need to run a function on each list of structs. Its signature is similar to the following:

    case class DataPoint(b: Int, c: Int)

    def do_something_with_data(data: Seq[DataPoint]): Double = {
      // This is an example. I don't actually want the sum
      data.map(data_point => data_point.b + data_point.c).sum
    }

I want to save the result of this function to another DataFrame column.

I tried to run

    val my_udf = udf(do_something_with_data(_))
    val df_with_result = df.withColumn("result", my_udf($"my_list"))
    df_with_result.show(false)

and received

    17/07/13 12:33:42 WARN TaskSetManager: Lost task 0.0 in stage 15.0 (TID 225, REDACTED, executor 0): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<struct<b:int,c:int>>) => double)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line27.$read$$iw$$iw$DataPoint
        at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$do_something_with_data$1.apply(<console>:29)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.do_something_with_data(<console>:29)
        at $line32.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)
        at $line32.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)

Is it possible to use a UDF like this without first casting my rows to a container struct with the DataFrame API?

Doing something like:

    case class MyRow(a: Long, my_list: Seq[DataPoint])

    df.as[MyRow].map(r => (r.a, r.my_list, do_something_with_data(r.my_list)))

using the Dataset API works, but I would prefer to stick with the DataFrame API if possible.

1 answer

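You cannot use a case class as an input argument to your UDF (but you can return case classes from a UDF). The elements of a struct array reach the UDF as Row objects, which is why the cast to DataPoint fails. To map over an array of structs, have the UDF accept a Seq[Row]: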

    import org.apache.spark.sql.Row

    val my_udf = udf((data: Seq[Row]) => {
      // This is an example. I don't actually want the sum
      data.map{case Row(x:Int,y:Int) => x+y}.sum
    })

    df.withColumn("result", my_udf($"my_list")).show

    +---+--------------------+------+
    |  a|             my_list|result|
    +---+--------------------+------+
    |  0|[[0,3], [5,5], [3...|    41|
    |  1|[[0,9], [4,9], [6...|    54|
    +---+--------------------+------+
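
If the goal is to keep the original function that works on DataPoint, one option is to convert each Row back into the case class inside the UDF. The following is a minimal sketch, not a definitive implementation: the names StructUdfs, toDataPoint, Summary, summarize and summary_udf are made up for illustration, and the positional match assumes the struct fields are ordered (b, c) as in the question. It also shows the other half of the point above, a UDF that returns a case class.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Same shape as the case class in the question.
case class DataPoint(b: Int, c: Int)

// Hypothetical result type, used only to show a UDF returning a case class.
case class Summary(total: Int, count: Int)

object StructUdfs {
  // The question's function (the sum is only a placeholder computation).
  def do_something_with_data(data: Seq[DataPoint]): Double =
    data.map(p => p.b + p.c).sum.toDouble

  // Hypothetical helper: rebuild a DataPoint from a struct element, which
  // reaches the UDF as a Row. It matches by position, so it assumes the
  // struct fields are ordered (b, c); any other shape throws a MatchError.
  def toDataPoint(row: Row): DataPoint = row match {
    case Row(b: Int, c: Int) => DataPoint(b, c)
  }

  // Accept Seq[Row] at the UDF boundary, then delegate to the typed function.
  val my_udf = udf((rows: Seq[Row]) => do_something_with_data(rows.map(toDataPoint)))

  // A case class return value is allowed; it becomes a struct column.
  def summarize(rows: Seq[Row]): Summary = {
    val points = rows.map(toDataPoint)
    Summary(points.map(p => p.b + p.c).sum, points.size)
  }

  val summary_udf = udf((rows: Seq[Row]) => summarize(rows))
}
```

With the df from the question, this would be applied as df.withColumn("result", StructUdfs.my_udf($"my_list")). The column produced by summary_udf is a struct, so its fields can be selected with dotted paths such as $"summary.total", assuming the new column is named summary.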

Source: https://habr.com/ru/post/1269790/

