Exception: Java gateway process exited before sending the driver its port number while creating a Spark Session in Python
So, I am trying to create a Spark session in Python 2.7 using the following:
# Initialize SparkSession and SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Create a Spark Session
SpSession = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("V2 Maestros") \
    .config("spark.executor.memory", "1g") \
    .config("spark.cores.max", "2") \
    .config("spark.sql.warehouse.dir", "file:///c:/temp/spark-warehouse") \
    .getOrCreate()

# Get the Spark Context from the Spark Session
SpContext = SpSession.sparkContext
I get the following error, pointing to the python\lib\pyspark.zip\pyspark\java_gateway.py path:
Exception: Java gateway process exited before sending the driver its port number
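This error generally means that spark-submit exited before the JVM could report its port back to Python, so before digging into java_gateway.py it is worth checking the inputs it depends on. The sketch below is my own diagnostic helper, not part of PySpark; the function names are hypothetical:

```python
import os

def find_on_path(name):
    """Best-effort lookup of an executable on PATH (works on Python 2 and 3)."""
    exts = [".exe", ".cmd", ".bat", ""] if os.name == "nt" else [""]
    for d in os.environ.get("PATH", "").split(os.pathsep):
        for ext in exts:
            candidate = os.path.join(d, name + ext)
            if os.path.isfile(candidate):
                return candidate
    return None

def check_spark_prereqs():
    """Check the things launch_gateway() needs before it can start the JVM."""
    problems = []
    spark_home = os.environ.get("SPARK_HOME")
    if not spark_home:
        problems.append("SPARK_HOME is not set")
    elif not os.path.isdir(spark_home):
        problems.append("SPARK_HOME points to a missing directory: %s" % spark_home)
    else:
        # spark-submit(.cmd) is the script launch_gateway() actually runs
        script = os.path.join("bin", "spark-submit.cmd" if os.name == "nt" else "spark-submit")
        if not os.path.exists(os.path.join(spark_home, script)):
            problems.append("%s not found under SPARK_HOME" % script)
    # The gateway is a JVM, so java must be resolvable on PATH or via JAVA_HOME
    if find_on_path("java") is None and "JAVA_HOME" not in os.environ:
        problems.append("java not on PATH and JAVA_HOME not set")
    return problems

print(check_spark_prereqs() or "prerequisites look OK")
```

If this reports a problem, fixing it is usually enough to make the gateway error go away without touching PySpark itself.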
I tried to look into the java_gateway.py file, which has the following contents:
import atexit
import os
import sys
import select
import signal
import shlex
import socket
import platform
from subprocess import Popen, PIPE

if sys.version >= '3':
    xrange = range

from py4j.java_gateway import java_import, JavaGateway, GatewayClient
from py4j.java_collections import ListConverter
from pyspark.serializers import read_int


# patching ListConverter, or it will convert bytearray into Java ArrayList
def can_convert_list(self, obj):
    return isinstance(obj, (list, tuple, xrange))
ListConverter.can_convert = can_convert_list


def launch_gateway():
    if "PYSPARK_GATEWAY_PORT" in os.environ:
        gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
    else:
        SPARK_HOME = os.environ["SPARK_HOME"]
        # Launch the Py4j gateway using Spark's run command so that we pick up the
        # proper classpath and settings from spark-env.sh
        on_windows = platform.system() == "Windows"
        script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
        submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
        if os.environ.get("SPARK_TESTING"):
            submit_args = ' '.join([
                "--conf spark.ui.enabled=false",
                submit_args
            ])
        command = [os.path.join(SPARK_HOME, script)] + shlex.split(submit_args)

        # Start a socket that will be used by PythonGatewayServer to communicate its port to us
        callback_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        callback_socket.bind(('127.0.0.1', 0))
        callback_socket.listen(1)
        callback_host, callback_port = callback_socket.getsockname()
        env = dict(os.environ)
        env['_PYSPARK_DRIVER_CALLBACK_HOST'] = callback_host
        env['_PYSPARK_DRIVER_CALLBACK_PORT'] = str(callback_port)

        # Launch the Java gateway.
        # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
        if not on_windows:
            # Don't send ctrl-c / SIGINT to the Java gateway:
            def preexec_func():
                signal.signal(signal.SIGINT, signal.SIG_IGN)
            proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
        else:
            # preexec_fn not supported on Windows
            proc = Popen(command, stdin=PIPE, env=env)

        gateway_port = None
        # We use select() here in order to avoid blocking indefinitely if the subprocess dies
        # before connecting
        while gateway_port is None and proc.poll() is None:
            timeout = 1  # (seconds)
            readable, _, _ = select.select([callback_socket], [], [], timeout)
            if callback_socket in readable:
                gateway_connection = callback_socket.accept()[0]
                # Determine which ephemeral port the server started on:
                gateway_port = read_int(gateway_connection.makefile(mode="rb"))
                gateway_connection.close()
                callback_socket.close()
        if gateway_port is None:
            raise Exception("Java gateway process exited before sending the driver its port number")

        # In Windows, ensure the Java child processes do not linger after Python has exited.
        # In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
        # the parent process' stdin sends an EOF). In Windows, however, this is not possible
        # because java.lang.Process reads directly from the parent process' stdin, contending
        # with any opportunity to read an EOF from the parent. Note that this is only best
        # effort and will not take effect if the python process is violently terminated.
        if on_windows:
            # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
            # (because the UNIX "exec" command is not available). This means we cannot simply
            # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
            # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
            # child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
            def killChild():
                Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
            atexit.register(killChild)

    # Connect to the gateway
    gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=True)

    # Import the classes used by PySpark
    java_import(gateway.jvm, "org.apache.spark.SparkConf")
    java_import(gateway.jvm, "org.apache.spark.api.java.*")
    java_import(gateway.jvm, "org.apache.spark.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.ml.python.*")
    java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
    # TODO(davies): move into sql
    java_import(gateway.jvm, "org.apache.spark.sql.*")
    java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
    java_import(gateway.jvm, "scala.Tuple2")

    return gateway
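The core of launch_gateway() is a callback-socket handshake: the parent listens on an ephemeral localhost port, and the child process connects back and writes its own port number as a big-endian 4-byte int (which is what pyspark.serializers.read_int decodes). The same pattern can be reproduced in isolation with a thread standing in for the JVM child; this is my own illustrative sketch, not PySpark code, and the port number 25333 is an arbitrary example:

```python
import select
import socket
import struct
import threading

def launch_gateway_sketch(child):
    # Parent: listen on an ephemeral localhost port
    callback_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    callback_socket.bind(('127.0.0.1', 0))
    callback_socket.listen(1)
    host, port = callback_socket.getsockname()
    threading.Thread(target=child, args=(host, port)).start()

    gateway_port = None
    tries = 10
    while gateway_port is None and tries > 0:
        # select() with a timeout, so a dead child cannot block us forever
        readable, _, _ = select.select([callback_socket], [], [], 1)
        tries -= 1
        if callback_socket in readable:
            conn = callback_socket.accept()[0]
            # read_int() in pyspark.serializers is a big-endian 4-byte int
            gateway_port = struct.unpack("!i", conn.makefile(mode="rb").read(4))[0]
            conn.close()
            callback_socket.close()
    if gateway_port is None:
        raise Exception("child exited before sending its port number")
    return gateway_port

def fake_jvm(host, port):
    # Stands in for PythonGatewayServer: connect back and report port 25333
    s = socket.create_connection((host, port))
    s.sendall(struct.pack("!i", 25333))
    s.close()

print(launch_gateway_sketch(fake_jvm))  # → 25333
```

Seen this way, the "exited before sending the driver its port number" exception is simply the branch where the child dies (or never starts) before the callback connection arrives.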
I am pretty new to Spark and PySpark, so I am unable to debug the issue here. I also looked at some other suggestions: Spark + Python - Java gateway process exited before sending the driver its port number? and Pyspark: Exception: Java gateway process exited before sending the driver its port number,
but have been unable to resolve this so far. Please help!
Here is what the Spark environment script looks like:
# This script loads spark-env.sh if it exists, and ensures it is only loaded once.
# spark-env.sh is loaded from SPARK_CONF_DIR if set, or within the current directory's
# conf/ subdirectory.
# Figure out where Spark is installed
if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

if [ -z "$SPARK_ENV_LOADED" ]; then
  export SPARK_ENV_LOADED=1
  # Returns the parent of the directory this script lives in.
  parent_dir="${SPARK_HOME}"
  user_conf_dir="${SPARK_CONF_DIR:-"$parent_dir"/conf}"
  if [ -f "${user_conf_dir}/spark-env.sh" ]; then
    # Promote all variable declarations to environment (exported) variables
    set -a
    . "${user_conf_dir}/spark-env.sh"
    set +a
  fi
fi

# Setting SPARK_SCALA_VERSION if not already set.
if [ -z "$SPARK_SCALA_VERSION" ]; then
  ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
  ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.10"
  if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
    echo -e "Presence of build for both scala versions(SCALA 2.10 and SCALA 2.11) detected." 1>&2
    echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' 1>&2
    exit 1
  fi
  if [ -d "$ASSEMBLY_DIR2" ]; then
    export SPARK_SCALA_VERSION="2.11"
  else
    export SPARK_SCALA_VERSION="2.10"
  fi
fi
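The Scala-version detection at the end of that script can be mirrored in Python when debugging a from-source build; this helper is my own sketch (the directory names are the ones the script itself checks):

```python
import os

def detect_scala_version(spark_home):
    """Mirror of load-spark-env.sh's SPARK_SCALA_VERSION detection."""
    d211 = os.path.join(spark_home, "assembly", "target", "scala-2.11")
    d210 = os.path.join(spark_home, "assembly", "target", "scala-2.10")
    if os.path.isdir(d211) and os.path.isdir(d210):
        # Same error the shell script raises with exit 1
        raise RuntimeError("Builds for both Scala 2.10 and 2.11 present; "
                           "clean one or set SPARK_SCALA_VERSION explicitly")
    return "2.11" if os.path.isdir(d211) else "2.10"
```

Note that for a pre-built binary download (as used in this question) neither assembly directory exists, so the script falls through to the 2.10 default and this logic is not the source of the gateway error.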
Here is how my Spark environment is set up in Python:
import os
import sys
# NOTE: Please change the folder paths to your current setup.
#Windows
if sys.platform.startswith('win'):
    # Where you downloaded the resource bundle
    os.chdir("E:/Udemy - Spark/SparkPythonDoBigDataAnalytics-Resources")
    # Where you installed spark.
    os.environ['SPARK_HOME'] = 'E:/Udemy - Spark/Apache Spark/spark-2.0.0-bin-hadoop2.7'
# other platforms - linux/mac
else:
    os.chdir("/Users/kponnambalam/Dropbox/V2Maestros/Modules/Apache Spark/Python")
    os.environ['SPARK_HOME'] = '/users/kponnambalam/products/spark-2.0.0-bin-hadoop2.7'

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

# Add the following paths to the system path. Please check your installation
# to make sure that these zip files actually exist. The names might change
# as versions change.
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib", "pyspark.zip"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib", "py4j-0.10.1-src.zip"))

# Initialize SparkSession and SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext
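As the comment in that setup code warns, the zip file names change between Spark versions (the py4j version in py4j-0.10.1-src.zip in particular), so it is worth confirming each path exists before trusting the imports. A small hedged helper (my own, not part of the course code) that globs for the py4j zip instead of hard-coding its version:

```python
import glob
import os
import sys

def add_spark_to_path(spark_home):
    """Prepend PySpark's source locations to sys.path, returning anything missing."""
    candidates = [
        os.path.join(spark_home, "python"),
        os.path.join(spark_home, "python", "lib", "pyspark.zip"),
    ]
    # The py4j zip name varies by Spark version, so glob for it instead of hard-coding
    candidates += glob.glob(os.path.join(spark_home, "python", "lib", "py4j-*-src.zip"))
    missing = [p for p in candidates if not os.path.exists(p)]
    for p in candidates:
        if p not in missing and p not in sys.path:
            sys.path.insert(0, p)
    return missing
```

If this returns a non-empty list, the subsequent `from pyspark.sql import SparkSession` would fail (or pick up a stale installation), which is one common path to the gateway error.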
After reading many posts, I finally made Spark work on my Windows laptop. I use Anaconda Python, but I am sure this will work with the standard distribution too.
First, you need to make sure you can run Spark independently. My assumptions are that you have a valid Python path and Java installed. For Java, I had "C:\ProgramData\Oracle\Java\javapath" defined in my Path, which redirects to my Java 8 bin folder.
- Download a pre-built-for-Hadoop version of Spark from https://spark.apache.org/downloads.html and extract it, e.g. to C:\spark-2.2.0-bin-hadoop2.7
- Create an environment variable SPARK_HOME, which you will need later for pyspark to pick up your local Spark installation.
Go to %SPARK_HOME%\bin and try to run pyspark, which is the Python Spark shell. If your environment is like mine, you will see an exception about the inability to find winutils and hadoop. The second exception will be about a missing Hive:
pyspark.sql.utils.IllegalArgumentException: u"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':"
I then found and simply followed https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-tips-and-tricks-running-spark-windows.html Specifically:
- Download winutils and put it into c:\hadoop\bin. Create a HADOOP_HOME environment variable and add %HADOOP_HOME%\bin to PATH.
- Create a directory for Hive, e.g. c:\tmp\hive, and run
winutils.exe chmod -R 777 C:\tmp\hive
in cmd in admin mode.
- Then go to %SPARK_HOME%\bin and make sure that when you run pyspark you see the nice Spark logo in ASCII art. Note that the sc Spark context variable needs to be defined already.
- Well, my main purpose was to have pyspark with auto-completion in my IDE, and that's where SPARK_HOME (Step 2) comes into play. If everything is set up correctly, you should see the following lines working:
Hope that helps and you can enjoy running Spark code locally.