Exception: Java gateway process exited before sending the driver its port number while creating a Spark Session in Python

Problem description

    So, I am trying to create a Spark session in Python 2.7 using the following:

    #Initialize SparkSession and SparkContext
    from pyspark.sql import SparkSession  
    from pyspark import SparkContext
    
    #Create a Spark Session
    SpSession = SparkSession \
        .builder \
        .master("local[2]") \
        .appName("V2 Maestros") \
        .config("spark.executor.memory", "1g") \
        .config("spark.cores.max","2") \
        .config("spark.sql.warehouse.dir", "file:///c:/temp/spark-warehouse")\
        .getOrCreate()
    
    #Get the Spark Context from Spark Session    
    SpContext = SpSession.sparkContext
    

    I get the following error pointing to the python\lib\pyspark.zip\pyspark\java_gateway.py path:

    Exception: Java gateway process exited before sending the driver its port number
    

    Tried to look into the java_gateway.py file, with the following contents:

    import atexit
    import os
    import sys
    import select
    import signal
    import shlex
    import socket
    import platform
    from subprocess import Popen, PIPE
    
    if sys.version >= '3':
        xrange = range
    
    from py4j.java_gateway import java_import, JavaGateway, GatewayClient
    from py4j.java_collections import ListConverter
    
    from pyspark.serializers import read_int
    
    
    # patching ListConverter, or it will convert bytearray into Java ArrayList
    def can_convert_list(self, obj):
        return isinstance(obj, (list, tuple, xrange))
    
    ListConverter.can_convert = can_convert_list
    
    
    def launch_gateway():
        if "PYSPARK_GATEWAY_PORT" in os.environ:
            gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
        else:
            SPARK_HOME = os.environ["SPARK_HOME"]
            # Launch the Py4j gateway using Spark's run command so that we pick up the
            # proper classpath and settings from spark-env.sh
            on_windows = platform.system() == "Windows"
            script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
            submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
            if os.environ.get("SPARK_TESTING"):
                submit_args = ' '.join([
                    "--conf spark.ui.enabled=false",
                    submit_args
                ])
            command = [os.path.join(SPARK_HOME, script)] + shlex.split(submit_args)
    
            # Start a socket that will be used by PythonGatewayServer to communicate its port to us
            callback_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            callback_socket.bind(('127.0.0.1', 0))
            callback_socket.listen(1)
            callback_host, callback_port = callback_socket.getsockname()
            env = dict(os.environ)
            env['_PYSPARK_DRIVER_CALLBACK_HOST'] = callback_host
            env['_PYSPARK_DRIVER_CALLBACK_PORT'] = str(callback_port)
    
            # Launch the Java gateway.
            # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
            if not on_windows:
                # Don't send ctrl-c / SIGINT to the Java gateway:
                def preexec_func():
                    signal.signal(signal.SIGINT, signal.SIG_IGN)
                proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
            else:
                # preexec_fn not supported on Windows
                proc = Popen(command, stdin=PIPE, env=env)
    
            gateway_port = None
            # We use select() here in order to avoid blocking indefinitely if the subprocess dies
            # before connecting
            while gateway_port is None and proc.poll() is None:
                timeout = 1  # (seconds)
                readable, _, _ = select.select([callback_socket], [], [], timeout)
                if callback_socket in readable:
                    gateway_connection = callback_socket.accept()[0]
                    # Determine which ephemeral port the server started on:
                    gateway_port = read_int(gateway_connection.makefile(mode="rb"))
                    gateway_connection.close()
                    callback_socket.close()
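            # If the loop finishes with gateway_port still None, the spark-submit
            # child process exited before reporting its port back over the callback
            # socket, and the exception below is raised.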
            if gateway_port is None:
                raise Exception("Java gateway process exited before sending the driver its port number")
    
            # In Windows, ensure the Java child processes do not linger after Python has exited.
            # In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
            # the parent process' stdin sends an EOF). In Windows, however, this is not possible
            # because java.lang.Process reads directly from the parent process' stdin, contending
            # with any opportunity to read an EOF from the parent. Note that this is only best
            # effort and will not take effect if the python process is violently terminated.
            if on_windows:
                # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
                # (because the UNIX "exec" command is not available). This means we cannot simply
                # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
                # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
                # child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
                def killChild():
                    Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
                atexit.register(killChild)
    
        # Connect to the gateway
        gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=True)
    
        # Import the classes used by PySpark
        java_import(gateway.jvm, "org.apache.spark.SparkConf")
        java_import(gateway.jvm, "org.apache.spark.api.java.*")
        java_import(gateway.jvm, "org.apache.spark.api.python.*")
        java_import(gateway.jvm, "org.apache.spark.ml.python.*")
        java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
        # TODO(davies): move into sql
        java_import(gateway.jvm, "org.apache.spark.sql.*")
        java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
        java_import(gateway.jvm, "scala.Tuple2")
    
        return gateway
    

    I am pretty new to Spark and Pyspark, hence unable to debug the issue here. I also tried to look at some other suggestions: Spark + Python - Java gateway process exited before sending the driver its port number? and Pyspark: Exception: Java gateway process exited before sending the driver its port number

    but unable to resolve this so far. Please help!

    Here is what the Spark environment script looks like:

    # This script loads spark-env.sh if it exists, and ensures it is only loaded once.
    # spark-env.sh is loaded from SPARK_CONF_DIR if set, or within the current directory's
    # conf/ subdirectory.
    
    # Figure out where Spark is installed
    if [ -z "${SPARK_HOME}" ]; then
      export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
    fi
    
    if [ -z "$SPARK_ENV_LOADED" ]; then
      export SPARK_ENV_LOADED=1
    
      # Returns the parent of the directory this script lives in.
      parent_dir="${SPARK_HOME}"
    
      user_conf_dir="${SPARK_CONF_DIR:-"$parent_dir"/conf}"
    
      if [ -f "${user_conf_dir}/spark-env.sh" ]; then
        # Promote all variable declarations to environment (exported) variables
        set -a
        . "${user_conf_dir}/spark-env.sh"
        set +a
      fi
    fi
    
    # Setting SPARK_SCALA_VERSION if not already set.
    
    if [ -z "$SPARK_SCALA_VERSION" ]; then
    
      ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
      ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.10"
    
      if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
        echo -e "Presence of build for both scala versions(SCALA 2.10 and SCALA 2.11) detected." 1>&2
        echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' 1>&2
        exit 1
      fi
    
      if [ -d "$ASSEMBLY_DIR2" ]; then
        export SPARK_SCALA_VERSION="2.11"
      else
        export SPARK_SCALA_VERSION="2.10"
      fi
    fi
    

    Here is how my Spark environment is set up in Python:

    import os
    import sys
    
    # NOTE: Please change the folder paths to your current setup.
    #Windows
    if sys.platform.startswith('win'):
        #Where you downloaded the resource bundle
        os.chdir("E:/Udemy - Spark/SparkPythonDoBigDataAnalytics-Resources")
        #Where you installed spark.    
        os.environ['SPARK_HOME'] = 'E:/Udemy - Spark/Apache Spark/spark-2.0.0-bin-hadoop2.7'
    #other platforms - linux/mac
    else:
        os.chdir("/Users/kponnambalam/Dropbox/V2Maestros/Modules/Apache Spark/Python")
        os.environ['SPARK_HOME'] = '/users/kponnambalam/products/spark-2.0.0-bin-hadoop2.7'
    
    os.curdir
    
    # Create a variable for our root path
    SPARK_HOME = os.environ['SPARK_HOME']
    
    #Add the following paths to the system path. Please check your installation
    #to make sure that these zip files actually exist. The names might change
    #as versions change.
    sys.path.insert(0,os.path.join(SPARK_HOME,"python"))
    sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib"))
    sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","pyspark.zip"))
    sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","py4j-0.10.1-src.zip"))
    
    #Initialize SparkSession and SparkContext
    from pyspark.sql import SparkSession  
    from pyspark import SparkContext
    

    Solution

    After reading many posts I finally made Spark work on my Windows laptop. I use Anaconda Python, but I am sure this will work with the standard distribution too.

    So, you need to make sure you can run Spark independently. My assumptions are that you have a valid Python path and Java installed. For Java, I had "C:\ProgramData\Oracle\Java\javapath" defined in my Path, which redirects to my Java 8 bin folder.

    1. Download a pre-built-for-Hadoop version of Spark from https://spark.apache.org/downloads.html and extract it, e.g. to C:\spark-2.2.0-bin-hadoop2.7
    2. Create the SPARK_HOME environment variable, which you will need later for pyspark to pick up your local Spark installation.
    3. Go to %SPARK_HOME%\bin and try to run pyspark, which is the Python Spark shell. If your environment is like mine, you will see an exception about not being able to find winutils and Hadoop. The second exception will be about missing Hive:

      pyspark.sql.utils.IllegalArgumentException: u"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':"

    4. I then found and simply followed https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-tips-and-tricks-running-spark-windows.html, specifically:

    5. Download winutils and put it in c:\hadoop\bin. Create a HADOOP_HOME environment variable and add %HADOOP_HOME%\bin to PATH.
    6. Create a directory for Hive, e.g. c:\tmp\hive, and run winutils.exe chmod -R 777 C:\tmp\hive from a cmd prompt opened in admin mode.
    7. Then go to %SPARK_HOME%\bin and make sure that when you run pyspark you see the Spark ASCII-art logo. Note that the sc Spark context variable needs to be defined already.
    8. Well, my main purpose was to have pyspark with auto-completion in my IDE, and that's when SPARK_HOME (step 2) comes into play. If everything is set up correctly, you should see lines like the ones sketched right after this list working:
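
    To tie steps 2, 5 and 8 together, here is a minimal sketch of the kind of lines I mean. The folder paths and the py4j zip file name below are assumptions, so substitute whatever matches your own Spark download:

    import os
    import sys

    # Assumed locations -- point these at your own Spark extract (step 1)
    # and winutils folder (step 5).
    os.environ['SPARK_HOME'] = 'C:/spark-2.2.0-bin-hadoop2.7'
    os.environ['HADOOP_HOME'] = 'C:/hadoop'

    # Make the bundled pyspark and py4j importable; the py4j zip name
    # changes between Spark versions, so check python/lib first.
    SPARK_HOME = os.environ['SPARK_HOME']
    sys.path.insert(0, os.path.join(SPARK_HOME, "python"))
    sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib", "py4j-0.10.4-src.zip"))

    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .master("local[2]") \
        .appName("local-test") \
        .getOrCreate()

    # A tiny job just to confirm the Java gateway actually came up
    print(spark.range(10).count())
    spark.stop()

    If SPARK_HOME or Java is still wrong, getOrCreate() is exactly where the "Java gateway process exited before sending the driver its port number" exception shows up again, so this also doubles as a quick check of the setup.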

    Hope that helps and you can enjoy running Spark code locally.
