I have a PySpark program with several independent modules, each of which can process data on its own to meet my various needs, but they can also be chained together to process data in a pipeline. Each of these modules creates a SparkSession and works fine on its own.
However, when I run them sequentially in the same Python process, I run into problems. The moment the second module in the pipeline executes, Spark complains that the SparkContext I am trying to use has been stopped:
py4j.protocol.Py4JJavaError: An error occurred while calling o149.parquet.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
Each module creates a SparkSession at the start of its execution and stops it cleanly at the end of its work. I create and stop the session/context as follows:
from pyspark.sql import SparkSession

session = SparkSession.builder.appName("myApp").getOrCreate()
session.stop()
According to the official documentation, getOrCreate "gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder." But I do not want that behavior (the part where the call grabs an existing session). I cannot find a way to disable it, and I cannot figure out how to destroy the session itself - I only know how to stop its associated SparkContext.
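To illustrate, here is a stripped-down sketch of the sequence I believe is happening (the app name and data are made up for the example); on my setup the second parallelize/collect fails with the stopped-context error shown above:

from pyspark.sql import SparkSession

# First "module" runs and tears down its session.
first = SparkSession.builder.appName("myApp").getOrCreate()
print(first.sparkContext.parallelize([1, 2, 3]).collect())
first.stop()

# Second "module" asks for a session in the same Python process.
# getOrCreate appears to hand back the session whose context was just
# stopped, so the next action raises
# "Cannot call methods on a stopped SparkContext".
second = SparkSession.builder.appName("myApp").getOrCreate()
print(second.sparkContext.parallelize([4, 5, 6]).collect())
second.stop()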
How can I create new SparkSessions in independent modules and run them sequentially in the same Python process without previous sessions interfering with the new ones?
The following is an example project structure:
main.py
import collect
import process

if __name__ == '__main__':
    data = collect.execute()
    process.execute(data)
collect.py
from pyspark.sql import SparkSession

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()
    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    # ... do some work here ...
    result = rdd.collect()
    session.stop()
    return result
process.py
from pyspark.sql import SparkSession

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()
    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    # ... do some work here ...
    result = rdd.collect()
    session.stop()
    return result