I download large datasets and then cache them for reference throughout my code. The code looks something like this:
val conversations = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)
  .option("tempdir", tempDir)
  .option("forward_spark_s3_credentials", "true")
  .option("query", "SELECT * FROM my_table " +
    "WHERE date <= '2017-06-03' " +
    "AND date >= '2017-03-06' ")
  .load()
  .cache()
If I leave off the cache, the code runs quickly because Datasets are evaluated lazily. But if I include cache(), the block takes a long time to run.

On the Spark UI event timeline, it looks like the SQL table is being shipped to the worker nodes and then cached on the worker nodes.
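To double-check that it is the cache() call itself that is slow, rather than the read, I split the block apart and timed each step separately. This is only a rough sketch; timed is a throwaway helper of mine, and jdbcUrl / tempDir are the same placeholders as above:

// Throwaway helper that prints how long a block takes.
def timed[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(f"$label took ${(System.nanoTime() - start) / 1e9}%.1f s")
  result
}

// Build the Dataset without caching.
val base = timed("load") {
  sqlContext.read
    .format("com.databricks.spark.redshift")
    .option("url", jdbcUrl)
    .option("tempdir", tempDir)
    .option("forward_spark_s3_credentials", "true")
    .option("query",
      "SELECT * FROM my_table " +
      "WHERE date <= '2017-06-03' " +
      "AND date >= '2017-03-06' ")
    .load()
}

// Then cache() on its own; in my case this is the step that takes all the time.
val conversations = timed("cache") { base.cache() }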
Why is cache() running eagerly? From the source code, it looks like cache should only flag the Dataset for caching, with the actual work deferred until the data is computed.

The Dataset source code calls this code in CacheManager.scala when cache or persist is invoked:
def cacheQuery(
    query: Dataset[_],
    tableName: Option[String] = None,
    storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
  val planToCache = query.logicalPlan
  if (lookupCachedData(planToCache).nonEmpty) {
    logWarning("Asked to cache already cached data.")
  } else {
    val sparkSession = query.sparkSession
    cachedData.add(CachedData(
      planToCache,
      InMemoryRelation(
        sparkSession.sessionState.conf.useCompression,
        sparkSession.sessionState.conf.columnBatchSize,
        storageLevel,
        sparkSession.sessionState.executePlan(planToCache).executedPlan,
        tableName)))
  }
}
That only marks the query plan for caching; it does not cache the data itself. So I would expect cache() to return immediately, which is also what other Stack Overflow answers suggest.
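Based on that reading, the behaviour I expected looks roughly like this (a small toy Dataset instead of the Redshift table, just for illustration):

import sqlContext.implicits._

// Toy Dataset standing in for the Redshift table.
val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

// I expected this to return almost immediately: per CacheManager.cacheQuery,
// it should only register an InMemoryRelation for the plan.
val cachedDf = df.cache()

// The physical plan should now show an InMemoryTableScan over an
// InMemoryRelation, even though nothing has been computed yet.
cachedDf.explain()

// Only the first action should actually materialize the cached data.
cachedDf.count()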
Has anyone else seen caching happen eagerly, before any action is executed on the Dataset? Why is this happening?