How to use an external (custom) package in pyspark?

I am trying to reproduce the example given here https://www.cloudera.com/documentation/enterprise/5-7-x/topics/spark_python.html for importing external packages into pyspark, but it fails.

My code is:

spark_distro.py

    from pyspark import SparkContext, SparkConf

    def import_my_special_package(x):
        from external_package import external
        return external.fun(x)

    conf = SparkConf()
    sc = SparkContext()
    int_rdd = sc.parallelize([1, 2, 3, 4])
    int_rdd.map(lambda x: import_my_special_package(x)).collect()

external_package.py

    class external:
        def __init__(self, n):
            self.n = n

        def fun(self, n):
            return self.n * 3

The spark-submit command:

    spark-submit \
      --master yarn \
      /path to script/spark_distro.py \
      --py-files /path to script/external_package.py \
      1000

Actual error:

    vs = list(itertools.islice(iterator, batch))
      File "/home/gsurapur/pyspark_examples/spark_distro.py", line 13, in <lambda>
      File "/home/gsurapur/pyspark_examples/spark_distro.py", line 6, in import_my_special_package
    ImportError: No module named external_package

Expected Result:

 [3,6,9,12] 

I also tried the sc.addPyFile option and it fails with the same problem. Please help me find the issue. Thanks in advance.

2 answers

I know that in retrospect this sounds silly, but the order of the spark-submit arguments is generally not interchangeable: all Spark-related arguments, including --py-files, must come before the script to be executed:

    # your case:
    spark-submit --master yarn-client /home/ctsats/scripts/SO/spark_distro.py --py-files /home/ctsats/scripts/SO/external_package.py
    [...]
    ImportError: No module named external_package

    # correct usage:
    spark-submit --master yarn-client --py-files /home/ctsats/scripts/SO/external_package.py /home/ctsats/scripts/SO/spark_distro.py
    [...]
    [3, 6, 9, 12]
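Applied to the command from the question (placeholder paths left untouched, and assuming the trailing 1000 argument is still meant for the script), the corrected ordering would presumably look like this:

    spark-submit \
      --master yarn \
      --py-files /path to script/external_package.py \
      /path to script/spark_distro.py \
      1000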

Tested with your scripts, modified as follows:

spark_distro.py

    from pyspark import SparkContext, SparkConf

    def import_my_special_package(x):
        from external_package import external
        return external(x)

    conf = SparkConf()
    sc = SparkContext()
    int_rdd = sc.parallelize([1, 2, 3, 4])
    print int_rdd.map(lambda x: import_my_special_package(x)).collect()

external_package.py

    def external(x):
        return x * 3

with changes that arguably do not alter the essence of the question ...
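If one wanted to keep the class-based external_package.py from the question instead of the simplified function above, a minimal hypothetical sketch (assuming the intent is to multiply the stored value by 3) might look like this, with the mapper instantiating the class rather than calling it directly:

    # external_package.py -- hypothetical class-based variant, close to the question's original
    class external(object):
        def __init__(self, n):
            self.n = n

        def fun(self):
            return self.n * 3

    # in spark_distro.py the mapper would then become:
    #     return external(x).fun()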


Here is the situation regarding sc.addPyFile:

spark_distro2.py

    from pyspark import SparkContext, SparkConf

    def import_my_special_package(x):
        from external_package import external
        return external(x)

    conf = SparkConf()
    sc = SparkContext()
    sc.addPyFile("/home/ctsats/scripts/SO/external_package.py")  # added
    int_rdd = sc.parallelize([1, 2, 3, 4])
    print int_rdd.map(lambda x: import_my_special_package(x)).collect()

Test:

    spark-submit --master yarn-client /home/ctsats/scripts/SO/spark_distro2.py
    [...]
    [3, 6, 9, 12]
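As a side note, and purely as an assumption not shown in the answer above, one way to double-check on the driver that a file shipped via addPyFile is actually available is pyspark's SparkFiles helper:

    from pyspark import SparkContext, SparkFiles

    sc = SparkContext()
    sc.addPyFile("/home/ctsats/scripts/SO/external_package.py")

    # addPyFile also registers the file with SparkFiles, so its local copy can be located:
    print SparkFiles.get("external_package.py")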


