Can't add jars pyspark in jupyter of Google DataProc
Problem Description
I have a Jupyter notebook on DataProc and I need a jar to run some job. I'm aware of editing spark-defaults.conf and using --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar to submit the job from the command line - they both work well. However, if I want to add the jar directly in the Jupyter notebook, the methods I tried below all fail.
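(For reference, what I mean by the two working approaches is roughly the following sketch; the cluster name, region, and job file are placeholders, not from my actual setup:)

# Option A: add the jar in spark-defaults.conf (typically /etc/spark/conf/spark-defaults.conf on the cluster)
spark.jars gs://spark-lib/bigquery/spark-bigquery-latest.jar

# Option B: pass it per job when submitting from the command line
gcloud dataproc jobs submit pyspark my_job.py \
    --cluster=my-cluster \
    --region=us-central1 \
    --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar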
Method 1:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars gs://spark-lib/bigquery/spark-bigquery-latest.jar pyspark-shell'
Method 2:
spark = SparkSession.builder.appName('Shakespeare WordCount') \
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest.jar') \
    .getOrCreate()
They both fail with the same error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-1-2b7692efb32b> in <module>()
19 # Read BQ data into spark dataframe
20 # This method reads from BQ directly, does not use GCS for intermediate results
---> 21 df = spark.read.format('bigquery').option('table', table).load()
22
23 df.show(5)
/usr/lib/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
170 return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
171 else:
--> 172 return self._df(self._jreader.load())
173
174 @since(1.4)
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o81.load.
: java.lang.ClassNotFoundException: Failed to find data source: bigquery. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: bigquery.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
... 13 more
The task I try to run is very simple:
table = 'publicdata.samples.shakespeare'
df = spark.read.format('bigquery').option('table', table).load()
df.show(5)
I understand there are many similar questions and answers, but they are either not working or not fitting my needs. There are ad-hoc jars I'll need and I don't want to keep all of them in the default configurations. I'd like to be more flexible and add jars on-the-go. How can I solve this? Thank you!
Recommended Answer
Unfortunately there isn't a built-in way to do this dynamically without effectively just editing spark-defaults.conf and restarting the kernel. There's an open feature request in Spark for this.
Zeppelin has some usability features for adding jars through the UI but even in Zeppelin you have to restart the interpreter after doing so for the Spark context to pick it up in its classloader. And also those options require the jarfiles to already be staged on the local filesystem; you can't just refer to remote file paths or URLs.
One workaround would be to create an init action which sets up a systemd service that regularly polls some HDFS directory and syncs it into one of the existing classpath directories, like /usr/lib/spark/jars:
#!/bin/bash
# Sets up continuous sync'ing of an HDFS directory into /usr/lib/spark/jars
# Manually copy jars into this HDFS directory to have them sync into
# ${LOCAL_DIR} on all nodes.
HDFS_DROPZONE='hdfs:///usr/lib/jars'
LOCAL_DIR='file:///usr/lib/spark/jars'
readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
if [[ "${ROLE}" == 'Master' ]]; then
hdfs dfs -mkdir -p "${HDFS_DROPZONE}"
fi
SYNC_SCRIPT='/usr/lib/hadoop/libexec/periodic-sync-jars.sh'
cat << EOF > "${SYNC_SCRIPT}"
#!/bin/bash
while true; do
  sleep 5
  # List jars in the HDFS dropzone and in the local dir, then copy anything missing locally.
  hdfs dfs -ls ${HDFS_DROPZONE}/*.jar 2>/dev/null | grep hdfs: | \
    sed 's/.*hdfs:/hdfs:/' | xargs -n 1 basename 2>/dev/null | sort \
    > /tmp/hdfs_files.txt
  hdfs dfs -ls ${LOCAL_DIR}/*.jar 2>/dev/null | grep file: | \
    sed 's/.*file:/file:/' | xargs -n 1 basename 2>/dev/null | sort \
    > /tmp/local_files.txt
  comm -23 /tmp/hdfs_files.txt /tmp/local_files.txt > /tmp/diff_files.txt
  if [ -s /tmp/diff_files.txt ]; then
    for FILE in \$(cat /tmp/diff_files.txt); do
      echo "\$(date): Copying \${FILE} from ${HDFS_DROPZONE} into ${LOCAL_DIR}"
      hdfs dfs -cp "${HDFS_DROPZONE}/\${FILE}" "${LOCAL_DIR}/\${FILE}"
    done
  fi
done
EOF
chmod 755 "${SYNC_SCRIPT}"
SERVICE_CONF='/usr/lib/systemd/system/sync-jars.service'
cat << EOF > "${SERVICE_CONF}"
[Unit]
Description=Periodic Jar Sync
[Service]
Type=simple
ExecStart=/bin/bash -c '${SYNC_SCRIPT} &>> /var/log/periodic-sync-jars.log'
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
chmod a+rw "${SERVICE_CONF}"
systemctl daemon-reload
systemctl enable sync-jars
systemctl restart sync-jars
systemctl status sync-jars
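To use it, you would upload that init action to a GCS bucket and reference it at cluster-creation time, roughly like this sketch (the bucket, script name, cluster name, and region below are placeholders):

gcloud dataproc clusters create my-cluster \
    --region=us-central1 \
    --initialization-actions=gs://my-bucket/sync-jars-init.sh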
Then, whenever you need a jarfile to be available everywhere, you just copy the jarfile into hdfs:///usr/lib/jars, the periodic poller will automatically stick it into /usr/lib/spark/jars, and then you simply restart your kernel to pick it up. You can add jars to that HDFS directory either by SSH'ing in and running hdfs dfs -cp directly, or simply by subprocessing out from your Jupyter notebook:
import subprocess
sp = subprocess.Popen(
    ['hdfs', 'dfs', '-cp',
     'gs://spark-lib/bigquery/spark-bigquery-latest.jar',
     'hdfs:///usr/lib/jars/spark-bigquery-latest.jar'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)
out, err = sp.communicate()
print(out)
print(err)
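The SSH route is just the equivalent one-liner on the master node, using the same jar URL as in the question:

hdfs dfs -cp gs://spark-lib/bigquery/spark-bigquery-latest.jar hdfs:///usr/lib/jars/spark-bigquery-latest.jar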