Can't add jars in PySpark in Jupyter on Google DataProc
Problem description
I have a Jupyter notebook on DataProc and I need a jar to run some job. I know I can edit spark-defaults.conf, or pass --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar when submitting the job from the command line; both work well. However, when I try to add the jar directly from the Jupyter notebook, the methods below all fail.
Method 1:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars gs://spark-lib/bigquery/spark-bigquery-latest.jar pyspark-shell'
Method 2:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Shakespeare WordCount')\
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest.jar')\
    .getOrCreate()
Both fail with the same error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-1-2b7692efb32b> in <module>()
19 # Read BQ data into spark dataframe
20 # This method reads from BQ directly, does not use GCS for intermediate results
---> 21 df = spark.read.format('bigquery').option('table', table).load()
22
23 df.show(5)
/usr/lib/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
170 return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
171 else:
--> 172 return self._df(self._jreader.load())
173
174 @since(1.4)
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o81.load.
: java.lang.ClassNotFoundException: Failed to find data source: bigquery. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: bigquery.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
... 13 more
The task I'm trying to run is very simple:
table = 'publicdata.samples.shakespeare'
df = spark.read.format('bigquery').option('table', table).load()
df.show(5)
I understand there are many similar questions and answers, but they either don't work or don't fit my needs. There are ad-hoc jars I'll need, and I don't want to keep all of them in the default configuration. I'd like to be more flexible and add jars on the go. How can I solve this? Thank you!
Recommended answer
Unfortunately, there isn't a built-in way to do this dynamically; effectively, you have to edit spark-defaults.conf and restart the kernel. There's an open feature request in Spark for this.
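Since the practical route is editing spark-defaults.conf, here is a minimal Python sketch of that edit: it appends a jar URI to the comma-separated spark.jars value in the file's text, and adds a spark.jars line if none exists. The function name is hypothetical, it only handles the whitespace-separated "key value" form, and the file location on Dataproc (commonly /etc/spark/conf/spark-defaults.conf) is an assumption you should verify on your image.

```python
def add_jar_to_spark_defaults(conf_text: str, jar_uri: str) -> str:
    """Return conf_text with jar_uri added to the comma-separated spark.jars value.

    Hypothetical helper. Assumes 'key<whitespace>value' lines; comments (#) and
    blank lines are left untouched. Appends a spark.jars line if none exists.
    Adding a jar that is already listed leaves the value unchanged.
    """
    lines = conf_text.splitlines()
    for i, line in enumerate(lines):
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        parts = stripped.split(None, 1)
        if parts[0] != "spark.jars":
            continue
        # Split the existing comma-separated list, dropping empty entries.
        existing = [j.strip() for j in parts[1].split(",")] if len(parts) > 1 else []
        existing = [j for j in existing if j]
        if jar_uri not in existing:
            existing.append(jar_uri)
        lines[i] = "spark.jars " + ",".join(existing)
        break
    else:
        # No spark.jars entry found: add one at the end.
        lines.append("spark.jars " + jar_uri)
    return "\n".join(lines) + "\n"
```

You would write the returned text back to the file (which typically needs root on the node) and then restart the Jupyter kernel so that a fresh SparkContext reads the new setting.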
Zeppelin has some usability features for adding jars through the UI, but even in Zeppelin you have to restart the interpreter afterwards so that the Spark context picks the jars up in its classloader. Those options also require the jarfiles to already be staged on the local filesystem; you can't just refer to remote file paths or URLs.
One workaround is to create an init action that sets up a systemd service which regularly polls an HDFS directory and syncs new jars from it into one of the existing classpath directories, such as /usr/lib/spark/jars:
#!/bin/bash
# Sets up continuous sync'ing of an HDFS directory into /usr/lib/spark/jars
# Manually copy jars into this HDFS directory to have them sync into
# ${LOCAL_DIR} on all nodes.
HDFS_DROPZONE='hdfs:///usr/lib/jars'
LOCAL_DIR='file:///usr/lib/spark/jars'
readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
if [[ "${ROLE}" == 'Master' ]]; then
hdfs dfs -mkdir -p "${HDFS_DROPZONE}"
fi
SYNC_SCRIPT='/usr/lib/hadoop/libexec/periodic-sync-jars.sh'
cat << EOF > "${SYNC_SCRIPT}"
#!/bin/bash
while true; do
sleep 5
hdfs dfs -ls ${HDFS_DROPZONE}/*.jar 2>/dev/null | grep hdfs: | \
sed 's/.*hdfs:/hdfs:/' | xargs -n 1 basename 2>/dev/null | sort \
> /tmp/hdfs_files.txt
hdfs dfs -ls ${LOCAL_DIR}/*.jar 2>/dev/null | grep file: | \
sed 's/.*file:/file:/' | xargs -n 1 basename 2>/dev/null | sort \
> /tmp/local_files.txt
comm -23 /tmp/hdfs_files.txt /tmp/local_files.txt > /tmp/diff_files.txt
if [ -s /tmp/diff_files.txt ]; then
for FILE in \$(cat /tmp/diff_files.txt); do
echo "\$(date): Copying \${FILE} from ${HDFS_DROPZONE} into ${LOCAL_DIR}"
hdfs dfs -cp "${HDFS_DROPZONE}/\${FILE}" "${LOCAL_DIR}/\${FILE}"
done
fi
done
EOF
chmod 755 "${SYNC_SCRIPT}"
SERVICE_CONF='/usr/lib/systemd/system/sync-jars.service'
cat << EOF > "${SERVICE_CONF}"
[Unit]
Description=Periodic Jar Sync
[Service]
Type=simple
ExecStart=/bin/bash -c '${SYNC_SCRIPT} &>> /var/log/periodic-sync-jars.log'
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
chmod 644 "${SERVICE_CONF}"
systemctl daemon-reload
systemctl enable sync-jars
systemctl restart sync-jars
systemctl status sync-jars
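The core of the sync script is its listing-and-diff step: take the .jar basenames from both listings and copy only the ones present in the HDFS drop zone but missing locally, which is what comm -23 does on the two sorted lists. Here is a small Python sketch of that decision, useful for reasoning about or testing the logic off-cluster. The function names are hypothetical; the sketch parses hdfs dfs -ls text and does not perform any copy.

```python
import posixpath
from typing import List


def jar_names_from_ls(ls_output: str, scheme: str) -> List[str]:
    """Return the sorted .jar basenames found in `hdfs dfs -ls` output.

    Mirrors the script's grep/sed/basename pipeline: keep lines containing
    '<scheme>:', cut from the last occurrence of it (like the greedy sed),
    and reduce the remaining URI to its basename.
    """
    marker = scheme + ":"
    names = set()
    for line in ls_output.splitlines():
        idx = line.rfind(marker)
        if idx == -1:
            continue  # e.g. the "Found N items" header line
        name = posixpath.basename(line[idx:].strip())
        if name.endswith(".jar"):
            names.add(name)
    return sorted(names)


def jars_to_copy(dropzone_ls: str, local_ls: str) -> List[str]:
    """Jar names in the HDFS drop zone but not in the local dir (like comm -23)."""
    local = set(jar_names_from_ls(local_ls, "hdfs" if False else "file"))
    return [n for n in jar_names_from_ls(dropzone_ls, "hdfs") if n not in local]
```

Feeding it the drop-zone listing and the /usr/lib/spark/jars listing yields the same file names that the shell loop would copy on that iteration.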
Then, whenever you need a jarfile to be available everywhere, just copy it into hdfs:///usr/lib/jars; the periodic poller will automatically copy it into /usr/lib/spark/jars on every node, and you simply restart your kernel to pick it up. You can add jars to that HDFS directory either by SSH'ing in and running hdfs dfs -cp directly, or by shelling out from your Jupyter notebook with subprocess:
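import subprocess

# Copy the jar from GCS into the HDFS drop zone watched by the sync service.
sp = subprocess.Popen(
    ['hdfs', 'dfs', '-cp',
     'gs://spark-lib/bigquery/spark-bigquery-latest.jar',
     'hdfs:///usr/lib/jars/spark-bigquery-latest.jar'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)
out, err = sp.communicate()
# communicate() returns bytes; decode them for readable output.
print(out.decode())
print(err.decode())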
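Because the poller only checks every few seconds (the sync script sleeps 5 seconds per iteration), the jar may not be in /usr/lib/spark/jars immediately after the copy. Here is a minimal sketch of a hypothetical helper that waits for the file to appear before you restart the kernel. It only checks the node the notebook runs on, not the workers.

```python
import os
import time


def wait_for_local_jar(path: str, timeout_s: float = 60.0, poll_s: float = 1.0) -> bool:
    """Poll until `path` exists or `timeout_s` elapses; return True if it appeared.

    Hypothetical helper; it only checks the local filesystem of this node.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if os.path.exists(path):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)


# Example (assumes the sync service from the init action is running on this node):
# if wait_for_local_jar('/usr/lib/spark/jars/spark-bigquery-latest.jar'):
#     print('Jar is in place; restart the kernel to pick it up.')
```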