As the title says, I would like to use a partial function composed with orElse as a UDF in Spark. Here is an example that can be run in the spark-shell:
import org.apache.spark.sql.functions.{col, udf}

val df = sc.parallelize(1 to 15).toDF("num")
df.show

// Testing out a normal udf - this works
val gt5: (Int => String) = num => (num > 5).toString
val gt5Udf = udf(gt5)
df.withColumn("gt5", gt5Udf(col("num"))).show

// Now create a udf of a partial function composed with orElse
val baseline: PartialFunction[Int, String] = { case _ => "baseline" }
val ge3: PartialFunction[Int, String] = { case x if x >= 3 => ">=3" }
val ge7: PartialFunction[Int, String] = { case x if x >= 7 => ">=7" }
val ge12: PartialFunction[Int, String] = { case x if x >= 12 => ">=12" }
val composed: PartialFunction[Int, String] = ge12 orElse ge7 orElse ge3 orElse baseline
val composedUdf = udf(composed)

// This fails (but this is what I'd like to do)
df.withColumn("pf", composedUdf(col("num"))).show

// Use a partial function not composed with orElse - this works
val baselineUdf = udf(baseline)
df.withColumn("pf", baselineUdf(col("num"))).show
I am currently running this on a three-node standalone cluster with the following configuration:
- Spark: 1.6.0
- HDFS: 2.4.1
- Scala: 2.10.5
I found what I think is a hint in this answer: Why can Scala serialize a function, but not a PartialFunction?
So I tried:
scala> composed.isInstanceOf[Serializable]
res: Boolean = false

scala> composedUdf.isInstanceOf[Serializable]
res: Boolean = true

scala> baseline.isInstanceOf[Serializable]
res: Boolean = true

scala> baselineUdf.isInstanceOf[Serializable]
res: Boolean = true
I'm a bit lost here, but it seems that composing partial functions with orElse makes the result non-serializable?
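Note that `isInstanceOf[Serializable]` only checks for the marker interface; actually serializing an object also walks all of its fields. A minimal sketch for testing this outside Spark, assuming plain Java serialization is a fair stand-in for Spark's default closure serializer (the names `SerializabilityCheck` and `isJavaSerializable` are hypothetical):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializabilityCheck {
  // Tries to Java-serialize `obj` into an in-memory buffer.
  // Returns false if some object reachable from `obj` is not Serializable.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  val baseline: PartialFunction[Int, String] = { case _ => "baseline" }
  val ge3: PartialFunction[Int, String] = { case x if x >= 3 => ">=3" }
  val composed: PartialFunction[Int, String] = ge3 orElse baseline

  def main(args: Array[String]): Unit = {
    println(s"baseline serializable: ${isJavaSerializable(baseline)}")
    // On Scala 2.10 this is expected to print false, matching the
    // NotSerializableException for scala.PartialFunction$OrElse;
    // other Scala versions may behave differently.
    println(s"composed serializable: ${isJavaSerializable(composed)}")
  }
}
```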
I think the most informative part is the error:
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: scala.PartialFunction$OrElse
...
How can I fix this? Or am I completely off base?
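One possible workaround, sketched under the assumption that each individual case (`ge12`, `ge7`, `ge3`, `baseline`) is serializable on its own, as `baseline.isInstanceOf[Serializable]` suggests: replace `orElse` with a hand-written combinator whose result is itself `Serializable`. `SerializableOrElse` and `PfCompose.orElseAll` are hypothetical names, not Scala or Spark APIs.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical helper: behaves like `first orElse second`, but the combined
// object is Serializable, so it only fails to serialize if one of its parts does.
final class SerializableOrElse[A, B](first: PartialFunction[A, B], second: PartialFunction[A, B])
    extends PartialFunction[A, B] with Serializable {
  def isDefinedAt(x: A): Boolean = first.isDefinedAt(x) || second.isDefinedAt(x)
  def apply(x: A): B = if (first.isDefinedAt(x)) first(x) else second(x)
}

object PfCompose {
  // Chains the cases left to right; earlier partial functions take priority,
  // like `ge12 orElse ge7 orElse ge3 orElse baseline`.
  def orElseAll[A, B](pfs: PartialFunction[A, B]*): PartialFunction[A, B] = {
    require(pfs.nonEmpty, "need at least one partial function")
    pfs.reduceLeft[PartialFunction[A, B]]((acc, pf) => new SerializableOrElse[A, B](acc, pf))
  }

  val baseline: PartialFunction[Int, String] = { case _ => "baseline" }
  val ge3: PartialFunction[Int, String] = { case x if x >= 3 => ">=3" }
  val ge7: PartialFunction[Int, String] = { case x if x >= 7 => ">=7" }
  val ge12: PartialFunction[Int, String] = { case x if x >= 12 => ">=12" }

  val composed: PartialFunction[Int, String] = orElseAll(ge12, ge7, ge3, baseline)

  def main(args: Array[String]): Unit = {
    // Throws NotSerializableException if any piece is not serializable.
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(composed)
    (1 to 15).foreach(n => println(s"$n -> ${composed(n)}"))
  }
}
```

The idea would be to build `composedUdf = udf(composed)` from this value instead of the `orElse` chain. Classes defined directly in the spark-shell are nested inside REPL wrapper objects, so compiling the helper into a jar (for example passed with `--jars`) may be more reliable; whether this resolves the error on Spark 1.6 with Scala 2.10 is an untested assumption.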
Thanks in advance for your help!