I have a custom Aggregator that does count-min sketch aggregation. It works, but it is slow (code below). I get similarly slow performance with a custom UDAF based on the UserDefinedAggregateFunction class (sketched at the end of this post).
It is much faster if I use the Dataset mapPartitions API to aggregate within each partition and then reduce the per-partition sketches (sketched below).
The question is: the slowness of the UDAF and Aggregator APIs appears to be caused by the serialization/deserialization (encoding) that happens on every row. Are the UDAF and Aggregator APIs simply not intended for non-trivial aggregation buffers such as a count-min sketch? And is the mapPartitions approach the recommended way to do this kind of aggregation?
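For reference, the mapPartitions version I'm comparing against looks roughly like this (a sketch, not my exact code; it assumes the same df, ArrDelay column, and sketch parameters as the Aggregator code below):

    import org.apache.spark.sql.Encoders
    import org.apache.spark.util.sketch.CountMinSketch

    // The kryo encoder is only invoked once per partition output,
    // not once per input row.
    implicit val cmsEncoder = Encoders.kryo[CountMinSketch]

    val sketch = df
      .mapPartitions { rows =>
        // Build one sketch per partition, adding rows locally.
        val partSketch = CountMinSketch.create(500, 4, 2429)
        rows.foreach(row => partSketch.add(row.getAs[Any]("ArrDelay")))
        Iterator.single(partSketch)
      }
      .reduce(_ mergeInPlace _) // merge the per-partition sketches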
Aggregator example code:
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{Encoder, Row, SparkSession}
    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.util.sketch.CountMinSketch

    object AggTest extends App {

      val input = "2008.csv"

      val conf = new SparkConf().setMaster("local[4]").setAppName("tester")
      val sqlContext = SparkSession.builder().config(conf).getOrCreate().sqlContext
      val df = sqlContext.read.format("csv").option("header", "true").load(input)

      // Kryo encoder for the aggregation buffer and output; this is what
      // appears to run for every row during the aggregation.
      implicit val sketchEncoder = org.apache.spark.sql.Encoders.kryo[CountMinSketch]

      case class SketchAgg(col: String) extends Aggregator[Row, CountMinSketch, CountMinSketch] {

        def zero: CountMinSketch = CountMinSketch.create(500, 4, 2429)

        def reduce(sketch: CountMinSketch, row: Row) = {
          val a = row.getAs[Any](col)
          sketch.add(a)
          sketch
        }

        def merge(sketch1: CountMinSketch, sketch2: CountMinSketch) = {
          sketch1.mergeInPlace(sketch2)
        }

        def finish(sketch: CountMinSketch) = sketch

        def bufferEncoder: Encoder[CountMinSketch] = sketchEncoder

        def outputEncoder: Encoder[CountMinSketch] = sketchEncoder
      }

      val sketch = df.agg(SketchAgg("ArrDelay")
        .toColumn
        .alias("sketch"))
        .select("sketch")
        .as[CountMinSketch]
        .first()
    }
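And for completeness, the UDAF variant I tried was along these lines (a rough sketch, not my exact code; the SketchUDAF name is illustrative). Because the buffer is a BinaryType column, the sketch goes through a full serialize/deserialize round trip on every update:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._
    import org.apache.spark.util.sketch.CountMinSketch

    class SketchUDAF extends UserDefinedAggregateFunction {

      def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
      def bufferSchema: StructType = StructType(StructField("sketch", BinaryType) :: Nil)
      def dataType: DataType = BinaryType
      def deterministic: Boolean = true

      private def toBytes(s: CountMinSketch): Array[Byte] = {
        val out = new ByteArrayOutputStream()
        s.writeTo(out)
        out.toByteArray
      }

      private def fromBytes(b: Array[Byte]): CountMinSketch =
        CountMinSketch.readFrom(new ByteArrayInputStream(b))

      def initialize(buffer: MutableAggregationBuffer): Unit =
        buffer(0) = toBytes(CountMinSketch.create(500, 4, 2429))

      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        // Full deserialize/serialize round trip for every input row.
        val s = fromBytes(buffer.getAs[Array[Byte]](0))
        s.add(input.getAs[Any](0))
        buffer(0) = toBytes(s)
      }

      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        val s1 = fromBytes(buffer1.getAs[Array[Byte]](0))
        val s2 = fromBytes(buffer2.getAs[Array[Byte]](0))
        buffer1(0) = toBytes(s1.mergeInPlace(s2))
      }

      def evaluate(buffer: Row): Any = buffer.getAs[Array[Byte]](0)
    }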