I have a question regarding the use of local variables in closures when accessing Spark RDDs. The problem I would like to solve is as follows:
I have a list of text files that need to be read into RDDs. However, before combining them I need to attach additional information to the RDD created from each text file; this information is extracted from the file name. The per-file RDDs are then combined into one large RDD using union().
from pyspark import SparkConf, SparkContext

spark_conf = SparkConf().setAppName("SparkTest")
spark_context = SparkContext(conf=spark_conf)

list_of_filenames = ['file_from_Ernie.txt', 'file_from_Bert.txt']
rdd_list = []
for filename in list_of_filenames:
    tmp_rdd = spark_context.textFile(filename)
    # the owner ("Ernie", "Bert", ...) is extracted from the file name; details omitted
    file_owner = ...
    # tag every line of the file with its owner
    tmp_rdd = tmp_rdd.map(lambda x: (x, file_owner))
    rdd_list.append(tmp_rdd)
overall_rdd = spark_context.union(rdd_list)
The problem is that the map() call inside the loop does not apply to the "correct" file. Instead, each lambda ends up referring to the most recent value of file_owner, since the variable is captured by reference rather than by value. On my local machine I managed to work around the problem by calling cache() on each individual RDD:
# ...
tmp_rdd = tmp_rdd.map(lambda x: (x, file_owner))
tmp_rdd.cache()
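For reference, here is a minimal plain-Python sketch (no Spark involved) of the late-binding behaviour I suspect is the cause; the names are made up for illustration:

# closures created in a loop capture the variable itself, not its current value
taggers = []
for owner in ['Ernie', 'Bert']:
    taggers.append(lambda line: (line, owner))

# both lambdas see the final value of 'owner'
print(taggers[0]('hello'))  # ('hello', 'Bert'), not ('hello', 'Ernie')
print(taggers[1]('hello'))  # ('hello', 'Bert')

Since Spark evaluates the map() lazily, the lambdas only run after the loop has finished, so all of them see the last file_owner.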
My question is: is using cache() the right solution for my problem? Are there any alternatives?
Thank you very much!