How to use a partial function composed with orElse as a UDF in Spark

As the title says, I would like to use a partial function composed with orElse as a UDF in Spark. Here is an example that can be run in the Spark shell:

 val df = sc.parallelize(1 to 15).toDF("num")
 df.show

 // A normal UDF: this works
 val gt5: (Int => String) = num => (num > 5).toString
 val gt5Udf = udf(gt5)
 df.withColumn("gt5", gt5Udf(col("num"))).show

 // Now create a UDF from a partial function composed with orElse
 val baseline: PartialFunction[Int, String] = { case _ => "baseline" }
 val ge3: PartialFunction[Int, String] = { case x if x >= 3 => ">=3" }
 val ge7: PartialFunction[Int, String] = { case x if x >= 7 => ">=7" }
 val ge12: PartialFunction[Int, String] = { case x if x >= 12 => ">=12" }
 val composed: PartialFunction[Int, String] = ge12 orElse ge7 orElse ge3 orElse baseline
 val composedUdf = udf(composed)

 // This fails (but this is what I'd like to do)
 df.withColumn("pf", composedUdf(col("num"))).show

 // A partial function not composed with orElse: this works
 val baselineUdf = udf(baseline)
 df.withColumn("pf", baselineUdf(col("num"))).show

I am currently running this on a three-node standalone cluster with the following configuration:

  • Spark: 1.6.0
  • HDFS: 2.4.1
  • Scala: 2.10.5

I found what I think is a hint in this answer: Why can Scala serialize a function, but not a PartialFunction?

So I tried:

 scala> composed.isInstanceOf[Serializable]
 res: Boolean = false

 scala> composedUdf.isInstanceOf[Serializable]
 res: Boolean = true

 scala> baseline.isInstanceOf[Serializable]
 res: Boolean = true

 scala> baselineUdf.isInstanceOf[Serializable]
 res: Boolean = true

I'm a bit fuzzy on the details here, but it seems that composing partial functions with orElse makes the result non-serializable?

I think the most informative part is the error:

 org.apache.spark.SparkException: Task not serializable
 ...
 Caused by: java.io.NotSerializableException: scala.PartialFunction$OrElse
 ...
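Spark serializes task closures with plain Java serialization, which walks the whole object graph. That is why composedUdf.isInstanceOf[Serializable] can be true while the task still fails: the Serializable marker is on the UserDefinedFunction wrapper, not on the OrElse instance it holds. A more direct check than isInstanceOf is to attempt the serialization yourself. A minimal sketch using only java.io; the object and method names are hypothetical:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper: attempts plain Java serialization (the mechanism Spark
// uses for task closures) and reports whether it succeeded.
object SerializationCheck {
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      // writeObject follows every reachable field, not just the outer type
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

In the question's Scala 2.10.5 shell, isJavaSerializable(composed) should return false with the same root cause as the failing task; the result may differ on newer Scala versions.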

How can I fix this? Or am I completely off base?

Thanks in advance for your help!

2 answers

It should work if you lift it and wrap it in another function:

 val composed: Int => Option[String] =
   x => (ge12 orElse ge7 orElse ge3 orElse baseline).lift.apply(x)
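Why this should help: orElse is now called inside the function body, so the closure only captures the individual partial functions (partial function literals such as baseline are serializable, as the question's checks show), and the non-serializable OrElse chain is rebuilt on each call instead of being shipped with the task. A minimal plain-Scala sketch of the same wrapper that runs without Spark; the object name LiftedWrapper and the serializes check are illustrative additions, not part of the answer:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-alone version of the answer's wrapper (no Spark needed).
// The object extends java.io.Serializable so the check below does not depend
// on whether the compiler makes the lambda capture the enclosing object.
object LiftedWrapper extends java.io.Serializable {
  val baseline: PartialFunction[Int, String] = { case _ => "baseline" }
  val ge3: PartialFunction[Int, String] = { case x if x >= 3 => ">=3" }
  val ge7: PartialFunction[Int, String] = { case x if x >= 7 => ">=7" }
  val ge12: PartialFunction[Int, String] = { case x if x >= 12 => ">=12" }

  // The orElse chain is built inside the lambda body on every call, so no
  // OrElse instance is stored in the closure. lift turns the partial function
  // into a total Int => Option[String].
  val composed: Int => Option[String] =
    x => (ge12 orElse ge7 orElse ge3 orElse baseline).lift.apply(x)

  // Illustrative check: Java-serialize a value and report whether it worked.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

In Spark it would then be used as in the question: val composedUdf = udf(composed) and df.withColumn("pf", composedUdf(col("num"))). Spark should map the Option[String] result to a nullable string column (None becomes null). Since baseline matches every input, lift always returns Some here, so a wrapper returning a plain String, x => (ge12 orElse ge7 orElse ge3 orElse baseline)(x), should work for the same reason.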

Although this does not directly address your problem, I would like to propose an alternative solution using Spark SQL functions.

First you will need to import the necessary functions:

 import org.apache.spark.sql.functions.{when, lit} 

and the SQLContext implicits, for the shorter $"num" column syntax:

 import sqlContext.implicits._ 

Then you can express the same conditions as in your code:

 val baseline = lit("baseline")
 val ge3 = when($"num" >= 3, ">=3")
 val ge7 = when($"num" >= 7, ">=7")
 val ge12 = when($"num" >= 12, ">=12")
 val composed = ge12 otherwise (ge7 otherwise (ge3 otherwise baseline))

In this form it is a little less elegant, but you can easily build an expression like this with the standard collection API (foldLeft / foldRight), and, unlike a UDF, the result can be optimized by the Catalyst optimizer.
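To illustrate the foldRight idea: the nested otherwise chain is a right fold over the (threshold, label) rules, starting from the default. A minimal sketch, with the fold written generically so it runs here with plain Scala functions; chain, rules and classify are hypothetical names. With Spark, the same fold over Columns would be chain(rules, lit("baseline"))((t, label, acc) => when($"num" >= t, label).otherwise(acc)), which builds ge12 otherwise (ge7 otherwise (ge3 otherwise baseline)) as above.

```scala
object FoldChain {
  // Builds branch(t1, l1, branch(t2, l2, ... branch(tn, ln, default))).
  // foldRight makes the first rule the outermost (highest-priority) check.
  def chain[E](rules: Seq[(Int, String)], default: E)(branch: (Int, String, E) => E): E =
    rules.foldRight(default) { case ((threshold, label), acc) => branch(threshold, label, acc) }

  // Rules in priority order, as in ge12 orElse ge7 orElse ge3 orElse baseline.
  val rules: Seq[(Int, String)] = Seq(12 -> ">=12", 7 -> ">=7", 3 -> ">=3")

  // Plain-Scala instance: each step wraps the previous fallback in an if/else.
  val classify: Int => String =
    chain[Int => String](rules, _ => "baseline") { (threshold, label, fallback) =>
      x => if (x >= threshold) label else fallback(x)
    }
}
```

Adding a rule then only means adding a pair to rules, in priority order.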


Source: https://habr.com/ru/post/1257865/

