Using local variables in closure when accessing Spark RDD

I have a question regarding the use of local variables in a closure when building Spark RDDs. The problem I would like to solve is as follows:

I have a list of text files that need to be read into RDDs. Before combining them, I need to attach additional information to the RDD created from each individual text file; this information is extracted from the file name. The per-file RDDs are then merged into one large RDD using union().

    from pyspark import SparkConf, SparkContext

    spark_conf = SparkConf().setAppName("SparkTest")
    spark_context = SparkContext(conf=spark_conf)

    list_of_filenames = ['file_from_Ernie.txt', 'file_from_Bert.txt']
    rdd_list = []
    for filename in list_of_filenames:
        tmp_rdd = spark_context.textFile(filename)
        # extract_file_info('file_from_Owner.txt') == 'Owner'
        file_owner = extract_file_info(filename)
        tmp_rdd = tmp_rdd.map(lambda x: (x, file_owner))
        rdd_list.append(tmp_rdd)

    overall_content_rdd = spark_context.union(rdd_list)
    # ...do something...
    overall_content_rdd.collect()

    # However, this does not work:
    # Bert always ends up as the owner, never Ernie.

The problem is that the map() call inside the loop does not use the "correct" owner for each file. Instead, every lambda refers to the most recent value of file_owner. On my local machine, I managed to work around the problem by calling cache() on each individual RDD:

    # ..
    tmp_rdd = tmp_rdd.map(lambda x: (x, file_owner))
    tmp_rdd.cache()
    # ..

My question is: is using cache() the right solution for my problem? Are there any alternatives?

Thank you very much!

4 answers

So the cache() call that you are doing will not necessarily work 100% of the time: it works provided that no nodes fail and no partitions need to be recomputed. A simple solution is to make a function that "captures" the value of file_owner. Here is a small illustration in the pyspark shell of a potential solution:

    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 1.2.0-SNAPSHOT
          /_/

    Using Python version 2.7.6 (default, Mar 22 2014 22:59:56)
    SparkContext available as sc.
    >>> hi = "hi"
    >>> sc.parallelize(["panda"])
    ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:365
    >>> r = sc.parallelize(["panda"])
    >>> meeps = r.map(lambda x: x + hi)
    >>> hi = "by"
    >>> meeps.collect()
    ['pandaby']
    >>> hi = "hi"
    >>> def makeGreetFunction(param):
    ...     return (lambda x: x + param)
    ...
    >>> f = makeGreetFunction(hi)
    >>> hi = "by"
    >>> meeps = r.map(f)
    >>> meeps.collect()
    ['pandahi']
    >>>
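For reference, a minimal sketch of how that factory approach could be applied to the loop from the question (the helper name make_owner_tagger is just for illustration; list_of_filenames, spark_context and extract_file_info are assumed to be defined as in the question):

    def make_owner_tagger(owner):
        # owner is bound when the factory is called, not when the lambda later runs
        return lambda line: (line, owner)

    rdd_list = []
    for filename in list_of_filenames:
        tmp_rdd = spark_context.textFile(filename)
        # call the factory now, so each lambda carries the owner of its own file
        tagger = make_owner_tagger(extract_file_info(filename))
        rdd_list.append(tmp_rdd.map(tagger))

    overall_content_rdd = spark_context.union(rdd_list)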

This is not a Spark phenomenon, but plain Python behaviour:

    >>> fns = []
    >>> for i in range(3):
    ...     fns.append(lambda: i)
    ...
    >>> for fn in fns:
    ...     print fn()
    ...
    2
    2
    2

One way to avoid this is to declare the function with a default argument. The default value is evaluated at definition time:

    >>> fns = []
    >>> for i in range(3):
    ...     def f(i=i):
    ...         return i
    ...     fns.append(f)
    ...
    >>> for fn in fns:
    ...     print fn()
    ...
    0
    1
    2
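Applied to the Spark code from the question, the same default-argument trick might look like this (a sketch, assuming the loop and helpers from the question):

    rdd_list = []
    for filename in list_of_filenames:
        tmp_rdd = spark_context.textFile(filename)
        file_owner = extract_file_info(filename)
        # owner=file_owner is evaluated here, at definition time,
        # so each lambda keeps the owner of its own file
        tmp_rdd = tmp_rdd.map(lambda x, owner=file_owner: (x, owner))
        rdd_list.append(tmp_rdd)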

This comes up a lot; see the many related questions on Python closures and late binding.


You could build a list of file owners up front and use it in the map transformation:

    file_owner[i] = extract_file_info(filename)
    tmp_rdd = tmp_rdd.map(lambda x: (x, file_owner[i]))
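For illustration, the list could be built up front like this (a sketch; note that the loop index i still has to be bound per iteration, for example as a default argument, or the same late-binding problem reappears):

    file_owner = [extract_file_info(name) for name in list_of_filenames]

    rdd_list = []
    for i, filename in enumerate(list_of_filenames):
        tmp_rdd = spark_context.textFile(filename)
        # i=i freezes the current index for this lambda
        rdd_list.append(tmp_rdd.map(lambda x, i=i: (x, file_owner[i])))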

As others have explained, the problem with your lambda is that it evaluates file_owner only when it is actually called. To force it to be evaluated during each iteration of the for loop, you have to construct a function and execute it immediately. Here is how to do that with lambdas:

    # ...
    file_owner = extract_file_info(filename)
    tmp_rdd = tmp_rdd.map((lambda owner: lambda line: (line, owner))(file_owner))
    # ...
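The same pattern can be checked in plain Python, independent of Spark: the outer lambda is called immediately, so the value is frozen per iteration:

    >>> fs = [(lambda v: lambda: v)(i) for i in range(3)]
    >>> [f() for f in fs]
    [0, 1, 2]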
