How can I tear down a SparkSession and create a new one within one application?


Problem description


I have a pyspark program with multiple independent modules that can each independently process data to meet my various needs. But they can also be chained together to process data in a pipeline. Each of these modules builds a SparkSession and executes perfectly on its own.

However, when I try to run them serially within the same Python process, I run into issues. When the second module in the pipeline executes, Spark complains that the SparkContext I am attempting to use has been stopped:

py4j.protocol.Py4JJavaError: An error occurred while calling o149.parquet.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

Each of these modules builds a SparkSession at the beginning of execution and stops its SparkContext at the end of its process. I build and stop sessions/contexts like so:

session = SparkSession.builder.appName("myApp").getOrCreate()
session.stop()

According to the official documentation, getOrCreate "gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder." But I don't want this behavior (where the builder silently reuses an existing session). I can't find any way to disable it, and I can't figure out how to destroy the session -- I only know how to stop its associated SparkContext.
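
Here is a minimal, self-contained sketch of the behavior I am describing (the local master is only for illustration, and exactly what the builder does after stop() appears to depend on the Spark version):

from pyspark.sql import SparkSession

# Both calls go through the builder; the second returns the session that
# already exists in this process instead of building a new one.
s1 = SparkSession.builder.master("local[1]").appName("myApp").getOrCreate()
s2 = SparkSession.builder.appName("myApp").getOrCreate()
print(s1 is s2)  # True while the first session is still active

s1.stop()
# Depending on the Spark version, getOrCreate() may still hand back the cached
# session backed by the now-stopped SparkContext (which is what produces
# "Cannot call methods on a stopped SparkContext"), or it may detect the
# stopped context and build a fresh session instead.
s3 = SparkSession.builder.appName("myApp").getOrCreate()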

How can I build new SparkSessions in independent modules, and execute them in sequence in the same Python process without previous sessions interfering with the newly created ones?

The following is an example of the project structure:

main.py

import collect
import process

if __name__ == '__main__':
    data = collect.execute()
    process.execute(data)

collect.py

from pyspark.sql import SparkSession

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()

    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    # [... do some work here ...]
    result = rdd.collect()
    session.stop()
    return result

process.py

from pyspark.sql import SparkSession

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()
    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    # [... do some work here ...]
    result = rdd.collect()
    session.stop()
    return result

Solution

Long story short, Spark (including PySpark) is not designed to handle multiple contexts in a single application. If you're interested in the JVM side of the story, I would recommend reading SPARK-2243 (resolved as Won't Fix).

There are a number of design decisions made in PySpark that reflect this, including, but not limited to, a singleton Py4J gateway. Effectively, you cannot have multiple SparkContexts in a single application. SparkSession is not only bound to a SparkContext but also introduces problems of its own, like handling the local (standalone) Hive metastore if one is used. Moreover, some functions use SparkSession.builder.getOrCreate internally and depend on the behavior you see right now; a notable example is UDF registration. Other functions may exhibit unexpected behavior if multiple SQL contexts are present (for example RDD.toDF).
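
To illustrate the last point, here is a small sketch of one such implicit dependency: RDD.toDF is only attached to RDDs once a SparkSession (or SQLContext) has been created in the process, and it looks the session up implicitly rather than taking it as an argument (the local master and toy data below are just for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("toDF-demo").getOrCreate()

rdd = spark.sparkContext.parallelize([(1, "a"), (2, "b")])
df = rdd.toDF(["id", "value"])  # no session is passed in; it is resolved behind the scenes
df.show()

spark.stop()
# If the session that toDF resolves to has already been stopped, calls like
# this are exactly where the stopped-SparkContext errors surface.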

Multiple contexts are not only unsupported but also, in my personal opinion, violate the single responsibility principle. Your business logic shouldn't be concerned with all the setup, cleanup and configuration details.

My personal recommendations are as follows:

  • If the application consists of multiple coherent modules that can be composed and that benefit from a single execution environment with caching and a common metastore, initialize all required contexts in the application entry point and pass them down to the individual pipelines when necessary:

    • main.py:

      from pyspark.sql import SparkSession
      
      import collect
      import process
      
      if __name__ == "__main__":
          spark: SparkSession = ...
      
          # Pass data between modules
          collected = collect.execute(spark)
          processed = process.execute(spark, data=collected)
          ...
          spark.stop()
      

    • collect.py / process.py:

      from pyspark.sql import SparkSession
      
      def execute(spark: SparkSession, data=None):
          ...
      

  • Otherwise (which seems to be the case here, based on your description), I would design the entry point to execute a single pipeline and use an external workflow manager (like Apache Airflow or Toil) to handle the execution; a sketch of such an Airflow DAG is shown after this list.

    It is not only cleaner but also allows for much more flexible fault recovery and scheduling.

    The same thing can of course be done with builders, but as a smart person once said: explicit is better than implicit.

    • main.py

      import argparse
      
      from pyspark.sql import SparkSession
      
      import collect
      import process
      
      pipelines = {"collect": collect, "process": process}
      
      if __name__ == "__main__":
          parser = argparse.ArgumentParser()
          parser.add_argument('--pipeline')
          args = parser.parse_args()
      
          spark: SparkSession = ...
      
          # Execute a single pipeline only for side effects
          pipelines[args.pipeline].execute(spark)
          spark.stop()
      

    • collect.py / process.py as in the previous point.
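
To make the second recommendation concrete, a minimal, hypothetical Airflow DAG that drives the two pipelines as separate spark-submit jobs could look like the sketch below (the DAG id, schedule, file paths and spark-submit invocation are assumptions, and the import and parameter names follow recent Airflow 2.x releases):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Each task is an independent Spark application, so every pipeline gets a
# fresh SparkSession and there is nothing to tear down inside the modules.
with DAG(
    dag_id="spark_pipelines",
    start_date=datetime(2024, 1, 1),
    schedule=None,  # triggered manually; adjust as needed
    catchup=False,
) as dag:
    collect = BashOperator(
        task_id="collect",
        bash_command="spark-submit /opt/app/main.py --pipeline collect",
    )
    process = BashOperator(
        task_id="process",
        bash_command="spark-submit /opt/app/main.py --pipeline process",
    )

    collect >> process  # either stage can be retried or rescheduled on its own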

One way or another, I would keep one and only one place where the context is set up, and one and only one place where it is torn down.
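
If you want to make that single place explicit in code, one option (sketched here with illustrative names) is a small context manager owned by the entry point:

from contextlib import contextmanager

from pyspark.sql import SparkSession

@contextmanager
def spark_session(app_name):
    # The only place where the session is created and stopped.
    spark = SparkSession.builder.appName(app_name).getOrCreate()
    try:
        yield spark
    finally:
        spark.stop()

# Usage from main.py:
# with spark_session("myApp") as spark:
#     collected = collect.execute(spark)
#     process.execute(spark, data=collected)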

