I am using Apache Spark with Scala to build an ML Pipeline. One of the Transformers in my pipeline performs an expensive join operation early in the process. Since I have many parameters in my ParamGrid, the program has to hold this huge, joined DataFrame in memory while it optimizes over each parameter combination in the grid.
To try to solve this problem, I created a custom Transformer that caches this large intermediate DataFrame by writing it out to parquet in S3 and returning a DataFrame that reads back from that parquet. This worked and sped up the model, until I added parameters to the ParamGrid for stages that come before the caching step. When I write the parquet to S3, I use a path defined like this:
class Cacher(override val uid: String) extends Transformer {
  lazy val cachePath = Identifiable.randomUID(uid + "transformer-cache")
I think I don't understand how uid works... I assumed that whenever Spark optimizes over the ParamGrid, it takes all the stages currently in the pipeline, creates new instances of them, and gives each one a new, unique uid to keep track of it. I suspect the caching is going awry because Spark does not give a unique uid to the new Transformer instances it creates, which means the cached parquet is constantly overwritten whenever a new Cacher instance is created. Can someone give me pointers on how to generate a unique, random uid for each instance of the stage that the pipeline creates?
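For reference, here is roughly what the whole stage looks like. This is a simplified sketch rather than my exact code: the transform body is trimmed down, the S3 bucket prefix on the path is omitted, and the no-arg constructor and copy override follow the standard Transformer pattern as I understand it.

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

class Cacher(override val uid: String) extends Transformer {

  // Usual pattern: the no-arg constructor asks Spark for a random uid.
  def this() = this(Identifiable.randomUID("cacher"))

  // The cache path is derived from the instance's uid, so two stages only
  // write to different locations if they end up as distinct instances.
  // (In reality this is prefixed with my S3 bucket; omitted here.)
  lazy val cachePath: String = Identifiable.randomUID(uid + "transformer-cache")

  override def transform(dataset: Dataset[_]): DataFrame = {
    // Persist the expensive intermediate result, then return a DataFrame
    // that reads from the cached parquet instead of recomputing the join.
    dataset.toDF().write.mode("overwrite").parquet(cachePath)
    dataset.sparkSession.read.parquet(cachePath)
  }

  override def transformSchema(schema: StructType): StructType = schema

  // defaultCopy builds the new instance with the *same* uid and then copies
  // the params over, which is where I suspect the path stops being unique.
  override def copy(extra: ParamMap): Cacher = defaultCopy(extra)
}

My expectation was that each combination in the grid would get its own copy of this stage, and therefore its own cachePath, but that does not seem to be what happens once the upstream parameters start varying.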
Cheers!