How can I tear down a SparkSession and create a new one within a single application?

I have a PySpark program with several independent modules, each of which can process data on its own to satisfy my various needs. They can also be chained together to process data as a pipeline. Each of these modules creates a SparkSession and works fine on its own.

However, when I try to run them sequentially within the same Python process, I run into problems. At the moment the second module in the pipeline executes, Spark complains that the SparkContext I am trying to use has been stopped:

py4j.protocol.Py4JJavaError: An error occurred while calling o149.parquet.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

Each of these modules creates a SparkSession at the start of execution and stops it when its processing is done. I create and stop the session/context like this:

session = SparkSession.builder.appName("myApp").getOrCreate()
session.stop()

According to the official documentation, getOrCreate "gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder." But I do not want this behavior (the behavior where the process tries to get an existing session). I cannot find any way to disable it, and I cannot figure out how to destroy the session; I only know how to stop its associated SparkContext.
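
A condensed version of what I am running into (the master("local[*]") setting is added here only so the snippet is self-contained):

from pyspark.sql import SparkSession

# First module's lifecycle: create, use, stop.
first = SparkSession.builder.master("local[*]").appName("myApp").getOrCreate()
first.sparkContext.parallelize([1, 2, 3]).collect()
first.stop()

# Second module's lifecycle: on the PySpark versions I am using,
# getOrCreate() hands back the previous session, whose SparkContext is
# already stopped, and the collect() below fails with
# "Cannot call methods on a stopped SparkContext".
second = SparkSession.builder.master("local[*]").appName("myApp").getOrCreate()
second.sparkContext.parallelize([4, 5, 6]).collect()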

How can I create new SparkSessions in independent modules and execute them sequentially in the same Python process without previous sessions interfering with new ones?

The following is an example project structure:

main.py

import collect
import process

if __name__ == '__main__':
    data = collect.execute()
    process.execute(data)

collect.py

from pyspark.sql import SparkSession

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()

    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    # ... do some work here ...
    result = rdd.collect()
    session.stop()
    return result

process.py

from pyspark.sql import SparkSession

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()
    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    # ... do some work here ...
    result = rdd.collect()
    session.stop()
    return result
+4

In short, Spark (including PySpark) is not designed to handle multiple contexts in a single application. If you are interested in the JVM side of the story, I would recommend reading SPARK-2243 (resolved as Won't Fix).

There are a number of design decisions in PySpark that reflect this, including, but not limited to, the singleton Py4J gateway. Effectively, you cannot have multiple SparkContexts in a single application. Beyond that, SparkSession is tied not only to a SparkContext but also introduces issues of its own, such as handling the local (standalone, if used) Hive metastore. Moreover, some functions use SparkSession.builder.getOrCreate internally and depend on exactly the behavior you are seeing now; a notable example is UDF registration. Other functions can behave unexpectedly when multiple SQL contexts exist (for example, RDD.toDF).
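
To illustrate the getOrCreate dependency with UDFs, here is a small sketch (the app name, master setting and toy lambda are only for the demo):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.master("local[*]").appName("udf-demo").getOrCreate()

# The udf wrapper never takes a session argument; when the UDF is used,
# PySpark resolves the session through SparkSession.builder.getOrCreate(),
# i.e. through the very singleton behaviour discussed above.
double = udf(lambda x: x * 2, IntegerType())

spark.range(5).select(double("id").alias("doubled")).show()
spark.stop()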

Multiple contexts are not only unsupported, but in my opinion they also violate the single responsibility principle: your business logic should not have to care about all the setup, cleanup and configuration details.

My recommendations are as follows:

  • If the application consists of multiple coherent modules that can be composed and that benefit from a single execution environment, with caching and a common metastore, then initialize all the required contexts in the application entry point and pass them down to the individual pipelines when needed:

    • main.py:

      from pyspark.sql import SparkSession
      
      import collect
      import process
      
      if __name__ == "__main__":
          spark: SparkSession = ...
      
          # Pass data between modules
          collected = collect.execute(spark)
          processed = process.execute(spark, data=collected)
          ...
          spark.stop()
      
    • collect.py/process.py:

      from pyspark.sql import SparkSession
      
      def execute(spark: SparkSession, data=None):
          ...
      
  • Otherwise (and judging by your description this seems to be your case), design the entry point to execute a single pipeline, and use an external workflow manager (such as Apache Airflow or Toil) to schedule the jobs.

    This is not only cleaner, but also allows much more flexible fault recovery and scheduling.

    The same thing can of course be done with builders, but, as a smart person once said: explicit is better than implicit.

    • main.py

      import argparse
      
      from pyspark.sql import SparkSession
      
      import collect
      import process
      
      pipelines = {"collect": collect, "process": process}
      
      if __name__ == "__main__":
          parser = argparse.ArgumentParser()
          parser.add_argument('--pipeline')
          args = parser.parse_args()
      
          spark: SparkSession = ...
      
          # Execute a single pipeline only for side effects
          pipelines[args.pipeline].execute(spark)
          spark.stop()
      
    • collect.py / process.py stay as above, with execute accepting the session instead of creating one (see the sketch below).
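
      A sketch of collect.py under this layout (datagetter and the placeholder work are carried over from the question; the session is created and stopped only in main.py):

      from pyspark.sql import SparkSession

      import datagetter

      def execute(spark: SparkSession, data=None):
          # Use the session that main.py passes in; never create or stop one here.
          data = data if data else datagetter.get()
          rdd = spark.sparkContext.parallelize(data)
          # ... do some work here ...
          return rdd.collect()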

One way or another, I would keep one and only one entry point that is responsible for setup and teardown.

+3

As far as I can tell, this behavior comes down to how PySpark caches the session:

If you look at the SparkSession source, its __init__ contains the following (with some code omitted):

_instantiatedContext = None

def __init__(self, sparkContext, jsparkSession=None):
    self._sc = sparkContext
    if SparkSession._instantiatedContext is None:
        SparkSession._instantiatedContext = self

Apparently, _instantiatedContext is not reset to None when you call session.stop(). As a result, getOrCreate() keeps returning the old _instantiatedContext, whose sparkContext has already been stopped.

I am not sure whether that is intentional, but it explains why stopping the session is not enough to get a genuinely new one from getOrCreate().
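
If you need to work around it, one option is to clear that cached reference by hand after stopping the session. Note that this touches a private attribute whose name varies across PySpark releases (_instantiatedContext in the version quoted above, _instantiatedSession in later ones, and recent releases reset it inside stop() themselves), so treat it as a version-specific hack rather than an API:

from pyspark.sql import SparkSession

session = SparkSession.builder.master("local[*]").appName("myApp").getOrCreate()
# ... do some work here ...
session.stop()

# Drop the cached singleton so that the next getOrCreate() really builds a
# fresh session instead of returning the stopped one. The attribute name
# depends on the PySpark version, hence the hasattr checks.
for attr in ("_instantiatedContext", "_instantiatedSession"):
    if hasattr(SparkSession, attr):
        setattr(SparkSession, attr, None)

fresh = SparkSession.builder.master("local[*]").appName("myApp").getOrCreate()
fresh.sparkContext.parallelize([1, 2, 3]).collect()
fresh.stop()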

+2

Why not pass a single SparkSession instance through the several stages of your pipeline? You can use the builder pattern. It sounds like you collect a result set at the end of each stage and then hand that data over to the next stage. Consider leaving the data in the cluster, within the same session, and passing a reference to the session and a reference to the result from stage to stage until your application completes.

In other words, put

session = SparkSession.builder...

... in your main method.
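
A minimal sketch of that idea (the stage functions and sample data here are made up for illustration; the point is that there is exactly one session and the data stays in the cluster until the very end):

from pyspark.sql import SparkSession, DataFrame

def collect_stage(spark: SparkSession) -> DataFrame:
    # Build the data set but keep it distributed; no collect() here.
    return spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "value"])

def process_stage(spark: SparkSession, df: DataFrame) -> DataFrame:
    # Transform the previous stage's result without materializing it.
    return df.where(df.id > 1)

if __name__ == "__main__":
    # The one and only session, created in the main method.
    session = SparkSession.builder.master("local[*]").appName("myApp").getOrCreate()
    result = process_stage(session, collect_stage(session))
    result.show()  # materialize only at the very end
    session.stop()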

0