PySpark 2.x: Programmatically adding Maven JAR Coordinates to Spark

Question

The following is my PySpark startup snippet, which is pretty reliable (I've been using it for a long time). Today I added the two Maven coordinates shown in the spark.jars.packages option (effectively "plugging in" Kafka support). Adding coordinates like that normally triggers a dependency download, performed automatically by Spark:

import sys, os, multiprocessing
from pyspark.sql import DataFrame, DataFrameStatFunctions, DataFrameNaFunctions
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as sFn
from pyspark.sql.types import *
from pyspark.sql.types import Row
  # ------------------------------------------
  # Note: Row() in .../pyspark/sql/types.py
  # isn't included in '__all__' list(), so
  # we must import it by name here.
  # ------------------------------------------
 
num_cpus = multiprocessing.cpu_count()        # Number of CPUs for SPARK Local mode.
os.environ.pop('SPARK_MASTER_HOST', None)     # Since we're using pip/pySpark these three ENVs
os.environ.pop('SPARK_MASTER_PORT', None)     # aren't needed; and we ensure pySpark doesn't
os.environ.pop('SPARK_HOME',        None)     # get confused by them, should they be set.
os.environ.pop('PYTHONSTARTUP',     None)     # Just in case pySpark 2.x attempts to read this.
os.environ['PYSPARK_PYTHON'] = sys.executable # Make SPARK Workers use same Python as Master.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/jre'  # Oracle JAVA for our pip/python3/pySpark 2.4 (CDH's JRE won't work).
JARS_IVY_REPO = '/home/jdoe/SPARK.JARS.REPO.d/'

# ======================================================================
# Maven Coordinates for JARs (and their dependencies) needed to plug
# extra functionality into Spark 2.x (e.g. Kafka SQL and Streaming)
# A one-time internet connection is necessary for Spark to automatically
# download JARs specified by the coordinates (and dependencies).
# ======================================================================
spark_jars_packages = ','.join(['org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0',
                                'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0',])
# ======================================================================
spark_conf = SparkConf()
spark_conf.setAll([('spark.master', 'local[{}]'.format(num_cpus)),
                   ('spark.app.name', 'myApp'),
                   ('spark.submit.deployMode', 'client'),
                   ('spark.ui.showConsoleProgress', 'true'),
                   ('spark.eventLog.enabled', 'false'),
                   ('spark.logConf', 'false'),
                   ('spark.jars.repositories', 'file:/' + JARS_IVY_REPO),
                   ('spark.jars.ivy', JARS_IVY_REPO),
                   ('spark.jars.packages', spark_jars_packages), ])

spark_sesn            = SparkSession.builder.config(conf = spark_conf).getOrCreate()
spark_ctxt            = spark_sesn.sparkContext
spark_reader          = spark_sesn.read
spark_streamReader    = spark_sesn.readStream
spark_ctxt.setLogLevel("WARN")

However, the plugins aren't being downloaded and/or loaded when I run the snippet (e.g. ./python -i init_spark.py), as they should be.
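One quick way to see whether the resolution step ran at all is to look inside the local Ivy repository configured above (spark.jars.ivy). The check below is only a diagnostic sketch, reusing the JARS_IVY_REPO path from the snippet; if the coordinates were resolved, the Kafka jars turn up somewhere under that directory:

import glob, os
JARS_IVY_REPO = '/home/jdoe/SPARK.JARS.REPO.d/'  # same path used in the snippet above
# If Spark resolved the Maven coordinates, the Kafka jars land under this Ivy cache.
hits = glob.glob(os.path.join(JARS_IVY_REPO, '**', '*kafka*.jar'), recursive=True)
print('\n'.join(hits) if hits else 'No Kafka jars found; the coordinates were not resolved.')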

This mechanism used to work, but then stopped. What am I missing?

Thank you in advance!

Answer

This is the kind of post where the QUESTION will be worth more than the ANSWER, because the code above works but isn't anywhere to be found in Spark 2.x documentation or examples.

The above is how I've programmatically added functionality to Spark 2.x by way of Maven Coordinates. I had this working but then it stopped working. Why?

When I ran the above code in a Jupyter notebook, the notebook had, behind the scenes, already run that identical code snippet by way of my PYTHONSTARTUP script. That PYTHONSTARTUP script has the same code as the above, but intentionally omits the Maven coordinates.

Here, then, is how this subtle problem emerges:

spark_sesn = SparkSession.builder.config(conf = spark_conf).getOrCreate()

Because a SparkSession already existed, the above statement simply reused that existing session (.getOrCreate()), which did not have the jars/libraries loaded (again, because my PYTHONSTARTUP script intentionally omits them). This is why it is a good idea to put print statements in PYTHONSTARTUP scripts, which are otherwise silent.
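A couple of print statements right after getOrCreate() make a silently reused session obvious. This is just a sketch of the kind of check I mean, assuming spark_ctxt from the snippet above; it reads the effective configuration of whichever session actually came back:

loaded_pkgs = spark_ctxt.getConf().get('spark.jars.packages', '<not set>')
print('applicationId :', spark_ctxt.applicationId)  # an unchanged id means the old session was reused
print('jars.packages :', loaded_pkgs)               # '<not set>' means the Kafka coordinates never took effect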

In the end, I simply forgot to do this: $ unset PYTHONSTARTUP before starting the JupyterLab / Notebook daemon.
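An alternative to remembering the unset is to have the PYTHONSTARTUP script skip the Spark bootstrap when it is being run by a Jupyter kernel rather than by a plain python -i session. The guard below is only a sketch, and it rests on the assumption that Jupyter kernels export JPY_PARENT_PID into their environment:

import os
# Assumption: Jupyter/JupyterLab kernels set JPY_PARENT_PID; a plain `python -i` shell does not.
if os.environ.get('JPY_PARENT_PID'):
    print('PYTHONSTARTUP: Jupyter kernel detected; skipping the Spark bootstrap.')
else:
    pass  # ... create the SparkSession exactly as in the snippet above ...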

I hope the Question helps others because that's how to programmatically add functionality to Spark 2.x (in this case Kafka). Note that you'll need an internet connection for the one-time download of the specified jars and recursive dependencies from Maven Central.
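For the Kafka case specifically, a quick smoke test is to build (but not start) a Kafka readStream, which should only need the spark-sql-kafka package on the classpath; no streaming query is started here, so no live broker is involved. The broker address and topic below are placeholders, and if the package is missing, Spark typically fails at this point with an error along the lines of "Failed to find data source: kafka":

kafka_df = (spark_streamReader
            .format('kafka')
            .option('kafka.bootstrap.servers', 'localhost:9092')  # placeholder broker address
            .option('subscribe', 'some_topic')                    # placeholder topic name
            .load())
print(kafka_df.isStreaming, '|', kafka_df.schema.simpleString())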
