Given a DataFrame in which one column is a sequence of structs, generated by the following code:
    import org.apache.spark.sql.functions.{collect_list, struct}
    import spark.implicits._

    val df = spark.range(10)
      .map(i => (i % 2, util.Random.nextInt(10), util.Random.nextInt(10)))
      .toDF("a", "b", "c")
      .groupBy("a")
      .agg(collect_list(struct($"b", $"c")).as("my_list"))

    df.printSchema
    df.show(false)
Outputs
    root
     |-- a: long (nullable = false)
     |-- my_list: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- b: integer (nullable = false)
     |    |    |-- c: integer (nullable = false)

    +---+-----------------------------------+
    |a  |my_list                            |
    +---+-----------------------------------+
    |0  |[[0,3], [9,5], [3,1], [4,2], [3,3]]|
    |1  |[[1,7], [4,6], [5,9], [6,4], [3,9]]|
    +---+-----------------------------------+
I need to run a function on each list of structs. The prototype is similar to the function below:
    case class DataPoint(b: Int, c: Int)

    def do_something_with_data(data: Seq[DataPoint]): Double = {
      // ...
    }
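The body itself is not important; any implementation that touches the DataPoint fields reproduces the problem. A purely illustrative stand-in:

    def do_something_with_data(data: Seq[DataPoint]): Double =
      data.map(dp => dp.b * dp.c).sum.toDouble / data.length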
I want to save the result of this function to another DataFrame column.
I tried to run
    val my_udf = udf(do_something_with_data(_))
    val df_with_result = df.withColumn("result", my_udf($"my_list"))
    df_with_result.show(false)
and received
    17/07/13 12:33:42 WARN TaskSetManager: Lost task 0.0 in stage 15.0 (TID 225, REDACTED, executor 0): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<struct<b:int,c:int>>) => double)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line27.$read$$iw$$iw$DataPoint
        at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$do_something_with_data$1.apply(<console>:29)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.do_something_with_data(<console>:29)
        at $line32.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)
        at $line32.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:29)
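From the ClassCastException it looks like Spark passes the struct elements into the UDF as generic Row objects rather than as my case class. A variant that accepts Seq[Row] and rebuilds the DataPoint instances by hand does run (a minimal sketch, assuming Spark 2.x and the schema above), but that is exactly the conversion boilerplate I would like to avoid:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.udf

    // Receive the struct elements as generic Rows and convert them manually.
    val my_udf_rows = udf { (rows: Seq[Row]) =>
      do_something_with_data(rows.map(r => DataPoint(r.getAs[Int]("b"), r.getAs[Int]("c"))))
    }

    val df_with_result = df.withColumn("result", my_udf_rows($"my_list"))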
Is it possible to use a UDF like this without first casting my rows into the container struct, while staying with the DataFrame API?
Doing something like:
    case class MyRow(a: Long, my_list: Seq[DataPoint])

    df.as[MyRow].map(r => (r.a, r.my_list, do_something_with_data(r.my_list)))
works using the Dataset API, but I would prefer to stick with the DataFrame API if possible.
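For completeness, getting back to a DataFrame from that detour would presumably look like this (a sketch; the output column names are my own choice):

    import spark.implicits._

    val df_with_result = df.as[MyRow]
      .map(r => (r.a, r.my_list, do_something_with_data(r.my_list)))
      .toDF("a", "my_list", "result")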