PySpark 2.x: Programmatically adding Maven JAR Coordinates to Spark


Question

The following is my PySpark startup snippet, which has been quite reliable (I've used it for a long time). Today I added the two Maven coordinates shown in the spark.jars.packages option (effectively "plugging in" Kafka support), which normally triggers dependency downloads (performed automatically by Spark):

import sys, os, multiprocessing
from pyspark.sql import DataFrame, DataFrameStatFunctions, DataFrameNaFunctions
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as sFn
from pyspark.sql.types import *
from pyspark.sql.types import Row
  # ------------------------------------------
  # Note: Row() in .../pyspark/sql/types.py
  # isn't included in '__all__' list(), so
  # we must import it by name here.
  # ------------------------------------------

num_cpus = multiprocessing.cpu_count()        # Number of CPUs for SPARK Local mode.
os.environ.pop('SPARK_MASTER_HOST', None)     # Since we're using pip/pySpark these three ENVs
os.environ.pop('SPARK_MASTER_PORT', None)     # aren't needed; and we ensure pySpark doesn't
os.environ.pop('SPARK_HOME',        None)     # get confused by them, should they be set.
os.environ.pop('PYTHONSTARTUP',     None)     # Just in case pySpark 2.x attempts to read this.
os.environ['PYSPARK_PYTHON'] = sys.executable # Make SPARK Workers use same Python as Master.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/jre'  # Oracle JAVA for our pip/python3/pySpark 2.4 (CDH's JRE won't work).
JARS_IVE_REPO = '/home/jdoe/SPARK.JARS.REPO.d/'

# ======================================================================
# Maven Coordinates for JARs (and their dependencies) needed to plug
# extra functionality into Spark 2.x (e.g. Kafka SQL and Streaming)
# A one-time internet connection is necessary for Spark to automatically
# download JARs specified by the coordinates (and dependencies).
# ======================================================================
spark_jars_packages = ','.join(['org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0',
                                'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0',])
# ======================================================================
spark_conf = SparkConf()
spark_conf.setAll([('spark.master', 'local[{}]'.format(num_cpus)),
                   ('spark.app.name', 'myApp'),
                   ('spark.submit.deployMode', 'client'),
                   ('spark.ui.showConsoleProgress', 'true'),
                   ('spark.eventLog.enabled', 'false'),
                   ('spark.logConf', 'false'),
                   ('spark.jars.repositories', 'file://' + JARS_IVE_REPO),
                   ('spark.jars.ivy', JARS_IVE_REPO),
                   ('spark.jars.packages', spark_jars_packages), ])

spark_sesn            = SparkSession.builder.config(conf = spark_conf).getOrCreate()
spark_ctxt            = spark_sesn.sparkContext
spark_reader          = spark_sesn.read
spark_streamReader    = spark_sesn.readStream
spark_ctxt.setLogLevel("WARN")

However, the plugins aren't downloading and/or loading when I run the snippet (e.g. ./python -i init_spark.py), as they should.

This mechanism used to work, but then stopped. What am I missing?

Thanks in advance!

Answer

This is the kind of post where the QUESTION will be worth more than the ANSWER, because the code above works but isn't anywhere to be found in Spark 2.x documentation or examples.

The above is how I've programmatically added functionality to Spark 2.x by way of Maven Coordinates. I had this working but then it stopped working. Why?

When I ran the above code in a Jupyter notebook, the notebook had -- behind the scenes -- already run that identical code snippet by way of my PYTHONSTARTUP script. That PYTHONSTARTUP script has the same code as the above, but intentionally omits the Maven coordinates.

Here, then, is how this subtle problem emerges:

spark_sesn = SparkSession.builder.config(conf = spark_conf).getOrCreate()

Because a Spark Session already existed, the above statement simply reused that existing session (.getOrCreate()), which did not have the jars/libraries loaded (again, because my PYTHONSTARTUP script intentionally omits them). This is why it is a good idea to put print statements in PYTHONSTARTUP scripts (which are otherwise silent).
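The reuse trap can be sketched without Spark at all. The toy class below is NOT Spark's real implementation, just a minimal illustration of the getOrCreate() contract: the first caller's configuration wins, and config passed by later callers is silently ignored.

```python
# Toy sketch (not Spark's implementation) of getOrCreate() caching:
# once a "session" exists, later callers get it back unchanged.
class SessionBuilder:
    _active = None                      # process-wide "existing session"

    def __init__(self):
        self.conf = {}

    def config(self, key, value):
        self.conf[key] = value
        return self

    def getOrCreate(self):
        if SessionBuilder._active is None:
            SessionBuilder._active = dict(self.conf)   # first caller wins
        return SessionBuilder._active

# PYTHONSTARTUP builds the session first, without the Kafka packages...
first = SessionBuilder().config('spark.jars.packages', '').getOrCreate()
# ...so the notebook's builder gets that same session back; its packages
# setting never takes effect.
second = SessionBuilder().config('spark.jars.packages', 'kafka-jars').getOrCreate()
print(second)  # → {'spark.jars.packages': ''}
```

Real PySpark behaves the same way: SparkSession.builder.config(...).getOrCreate() returns the already-running session, and the new spark.jars.packages value never reaches it.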

In the end, I simply forgot to do this: $ unset PYTHONSTARTUP before starting the JupyterLab / Notebook daemon.

I hope the Question helps others because that's how to programmatically add functionality to Spark 2.x (in this case Kafka). Note that you'll need an internet connection for the one-time download of the specified jars and recursive dependencies from Maven Central.
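One quick sanity check on that one-time download (a sketch, reusing the JARS_IVE_REPO path from the snippet above): with spark.jars.ivy pointed at a directory, Spark caches the resolved artifacts under a jars/ subdirectory there, so counting them shows whether the download ever happened.

```python
import glob
import os

JARS_IVE_REPO = '/home/jdoe/SPARK.JARS.REPO.d/'   # same path as in the snippet

# With spark.jars.ivy set to this directory, resolved jars (the Kafka
# packages plus their recursive dependencies) land in <repo>/jars/.
cached = glob.glob(os.path.join(JARS_IVE_REPO, 'jars', '*.jar'))
print(f'{len(cached)} jar(s) cached locally')
```

Zero jars after a supposedly successful startup is a strong hint that, as above, getOrCreate() reused a session that never saw the coordinates.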
