I use Spark for data analysis and machine learning.
After reading some data into trainDF, I build two pipelines that are logically equivalent, except that one of them has a VectorAssembler at the end (with only a single input column) to demonstrate the slowdown:
```scala
scala> val assembler = new VectorAssembler().setInputCols(Array("all_description_features")).setOutputCol("features")
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_a76e6412bc96

scala> val idfDescription = new IDF().setInputCol("all_description_hashed").setOutputCol("all_description_features")
idfDescription: org.apache.spark.ml.feature.IDF = idf_4b504cf08d86

scala> val descriptionArray = Array(tokensDescription, removerDescription, hashingTFDescription, idfDescription, assembler, lr)
descriptionArray: Array[org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable}}}] = Array(regexTok_316674b9209b, stopWords_8ecdf6f09955, hashingTF_48cf3f9cc065, idf_4b504cf08d86, vecAssembler_a76e6412bc96, logreg_f0763c33b304)

scala> val pipeline = new Pipeline().setStages(descriptionArray)
pipeline: org.apache.spark.ml.Pipeline = pipeline_4e462d0ee649

scala> time {pipeline.fit(trainDF)}
16/09/28 13:04:17 WARN Executor: 1 block locks were not released by TID = 9526:
[rdd_38_0]
Elapsed time: 62370646425ns
res94: org.apache.spark.ml.PipelineModel = pipeline_4e462d0ee649

scala> val idfDescription = new IDF().setInputCol("all_description_hashed").setOutputCol("features")
idfDescription: org.apache.spark.ml.feature.IDF = idf_264569f76b23

scala> val descriptionArray = Array(tokensDescription, removerDescription, hashingTFDescription, idfDescription, lr)
descriptionArray: Array[org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable}}}] = Array(regexTok_316674b9209b, stopWords_8ecdf6f09955, hashingTF_48cf3f9cc065, idf_264569f76b23, logreg_f0763c33b304)

scala> val pipeline = new Pipeline().setStages(descriptionArray)
pipeline: org.apache.spark.ml.Pipeline = pipeline_758ec8aa3228

scala> time {pipeline.fit(trainDF)}
Elapsed time: 11092968167ns
res95: org.apache.spark.ml.PipelineModel = pipeline_758ec8aa3228
```
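For completeness, `time` above is not a Spark API but a small REPL helper; a minimal sketch of the kind of wrapper assumed here:

```scala
// Minimal timing helper (sketch, not part of Spark): evaluates a
// by-name block, prints the wall-clock duration from System.nanoTime,
// and returns the block's result unchanged.
def time[R](block: => R): R = {
  val start  = System.nanoTime()
  val result = block
  println(s"Elapsed time: ${System.nanoTime() - start}ns")
  result
}
```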
As you can see, pipeline.fit with the additional VectorAssembler is much slower. This is a toy example, but the real pipeline I use genuinely benefits from a VectorAssembler (whereas here it serves no purpose) and suffers a similar performance hit.
I'm just wondering whether this is expected, or whether I'm using it incorrectly. I also noticed that with the VectorAssembler I get a warning that block locks were not released; could that be related?
Thanks for the help and guidance!
Update # 1
Further analysis shows that the additional time is spent in the logistic regression fitting stage, not in the actual feature assembly. It's unclear why that stage should take longer, since the data it operates on is identical in both cases (I proved this to myself by joining the two data sets before passing them to fit, and checking that the two feature columns match for all ids).
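The equality check I describe looks roughly like this (a sketch: `pipelineModelA`/`pipelineModelB` are the two fitted models from above, and `id` is a hypothetical unique row key in my data):

```scala
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}

// Transform the training data with both fitted pipelines, keeping only
// the row id and the resulting feature vector from each.
val withAssembler = pipelineModelA.transform(trainDF)
  .select(col("id"), col("features").as("featuresA"))
val withoutAssembler = pipelineModelB.transform(trainDF)
  .select(col("id"), col("featuresB"))

// UDF comparing two ml Vectors for exact equality (Vector defines
// a value-based equals, so == works here).
val vecEq = udf((a: Vector, b: Vector) => a == b)

// Join on id and count disagreeing rows; 0 means the feature
// vectors are identical with and without the VectorAssembler.
val mismatches = withAssembler
  .join(withoutAssembler, "id")
  .filter(!vecEq(col("featuresA"), col("featuresB")))
  .count()
```

(Here the second pipeline writes its IDF output to a column I alias as `featuresB`; adjust the column names to whatever your stages produce.)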
Update # 2
Another thing I noticed: if I write the two data sets to disk as Parquet (the one that went through the VectorAssembler and the one that didn't), the one that went through the VectorAssembler is 10x the size, even though they have a seemingly identical schema, row count and data.
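The size comparison can be reproduced with a helper along these lines (a sketch; the paths and `spark` session name are placeholders):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.DataFrame

// Write a DataFrame as Parquet and return its total on-disk size in
// bytes, via the Hadoop FileSystem API. Comparing this number for the
// two transformed data sets is what revealed the ~10x difference.
def parquetSizeBytes(df: DataFrame, path: String): Long = {
  df.write.mode("overwrite").parquet(path)
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  fs.getContentSummary(new Path(path)).getLength
}

val sizeWith    = parquetSizeBytes(transformedWithAssembler, "/tmp/with_assembler")
val sizeWithout = parquetSizeBytes(transformedWithoutAssembler, "/tmp/without_assembler")
```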
Update # 3
OK - so I think I see what is happening. Although the data with/without the VectorAssembler is identical, calling VectorAssembler's transform on my data decorates it with a large amount of (in my case useless) metadata. This causes the bloat in disk size and also, apparently, the much slower regression, due to the need to process all that additional data.
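One way to confirm (and work around) this, as a sketch: rebuild the assembled column with empty metadata before fitting. `Column.as(alias, metadata)` replaces the column's metadata while leaving the values and schema untouched, so the ML attribute metadata the VectorAssembler attached is simply dropped. (Note this is a hedge, not a recommendation for every pipeline: some estimators do make legitimate use of that metadata, e.g. to detect categorical features.)

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.Metadata

// assembled: the DataFrame produced by VectorAssembler.transform.
// Re-alias the features column with Metadata.empty; values are
// unchanged, only the per-element attribute metadata is discarded.
val stripped = assembled.withColumn(
  "features",
  col("features").as("features", Metadata.empty)
)
```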