Can't add jars to PySpark in Jupyter on Google Dataproc

Question

I have a Jupyter notebook on Dataproc and I need a jar to run a job. I'm aware that I can edit spark-defaults.conf, or pass --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar when submitting the job from the command line; both work well. However, I want to add the jar directly from the Jupyter notebook. I tried the two methods below, and both fail.

Method 1:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars gs://spark-lib/bigquery/spark-bigquery-latest.jar pyspark-shell'

Method 2:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Shakespeare WordCount')\
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest.jar')\
    .getOrCreate()

Both produce the same error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-1-2b7692efb32b> in <module>()
     19 # Read BQ data into spark dataframe
     20 # This method reads from BQ directly, does not use GCS for intermediate results
---> 21 df = spark.read.format('bigquery').option('table', table).load()
     22 
     23 df.show(5)

/usr/lib/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    170             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    171         else:
--> 172             return self._df(self._jreader.load())
    173 
    174     @since(1.4)

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o81.load.
: java.lang.ClassNotFoundException: Failed to find data source: bigquery. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: bigquery.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
    ... 13 more

The task I'm trying to run is very simple:

table = 'publicdata.samples.shakespeare'

df = spark.read.format('bigquery').option('table', table).load()

df.show(5)

I understand there are many similar questions and answers, but they either don't work or don't fit my needs. I need ad-hoc jars and I don't want to keep all of them in the default configuration. I'd like to be more flexible and add jars on the go. How can I solve this? Thank you!

Recommended answer

Unfortunately, there isn't a built-in way to do this dynamically without effectively just editing spark-defaults.conf and restarting the kernel. There's an open feature request in Spark for this.
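As a rough illustration of what "editing spark-defaults.conf" amounts to, here is a minimal Python sketch that merges a jar URI into the comma-separated spark.jars entry of a config file's text. The helper name add_jar_to_spark_defaults is hypothetical, and the sketch assumes each entry is a key followed by whitespace or "=" and then the value. It only transforms a string; writing the result back to the cluster's config file and restarting the kernel are left to you.

```python
import re


def add_jar_to_spark_defaults(conf_text, jar_uri):
    """Return conf_text with jar_uri included in its spark.jars entry.

    Hypothetical helper: assumes "key value" or "key=value" lines and
    ignores blank lines and '#' comments.
    """
    lines = conf_text.splitlines()
    for i, line in enumerate(lines):
        stripped = line.strip()
        if not stripped or stripped.startswith('#'):
            continue
        # Split the key from the value on the first run of whitespace or '='.
        parts = re.split(r'[\s=]+', stripped, maxsplit=1)
        if parts[0] == 'spark.jars':
            value = parts[1] if len(parts) > 1 else ''
            jars = [j.strip() for j in value.split(',') if j.strip()]
            if jar_uri not in jars:
                jars.append(jar_uri)
            lines[i] = 'spark.jars ' + ','.join(jars)
            break
    else:
        # No spark.jars entry was found, so append a new one.
        lines.append('spark.jars ' + jar_uri)
    return '\n'.join(lines) + '\n'
```

The function is idempotent, so calling it twice with the same jar leaves the text unchanged. The new value only takes effect for Spark sessions started after the file is updated, which is why the kernel restart is still needed.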

Zeppelin has some usability features for adding jars through the UI, but even in Zeppelin you have to restart the interpreter afterwards so that the Spark context picks the jar up in its classloader. Those options also require the jar files to already be staged on the local filesystem; you can't just refer to remote file paths or URLs.

One workaround is to create an init action that sets up a systemd service which regularly polls an HDFS directory and syncs it into one of the existing classpath directories, such as /usr/lib/spark/jars:

#!/bin/bash
# Sets up continuous sync'ing of an HDFS directory into /usr/lib/spark/jars

# Manually copy jars into this HDFS directory to have them sync into
# ${LOCAL_DIR} on all nodes.
HDFS_DROPZONE='hdfs:///usr/lib/jars'
LOCAL_DIR='file:///usr/lib/spark/jars'

readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
if [[ "${ROLE}" == 'Master' ]]; then
  hdfs dfs -mkdir -p "${HDFS_DROPZONE}"
fi

SYNC_SCRIPT='/usr/lib/hadoop/libexec/periodic-sync-jars.sh'
cat << EOF > "${SYNC_SCRIPT}"
#!/bin/bash
while true; do
  sleep 5
  hdfs dfs -ls ${HDFS_DROPZONE}/*.jar 2>/dev/null | grep hdfs: | \
    sed 's/.*hdfs:/hdfs:/' | xargs -n 1 basename 2>/dev/null | sort \
    > /tmp/hdfs_files.txt
  hdfs dfs -ls ${LOCAL_DIR}/*.jar 2>/dev/null | grep file: | \
    sed 's/.*file:/file:/' | xargs -n 1 basename 2>/dev/null | sort \
    > /tmp/local_files.txt
  comm -23 /tmp/hdfs_files.txt /tmp/local_files.txt > /tmp/diff_files.txt
  if [ -s /tmp/diff_files.txt ]; then
    for FILE in \$(cat /tmp/diff_files.txt); do
      echo "\$(date): Copying \${FILE} from ${HDFS_DROPZONE} into ${LOCAL_DIR}"
      hdfs dfs -cp "${HDFS_DROPZONE}/\${FILE}" "${LOCAL_DIR}/\${FILE}"
    done
  fi
done
EOF

chmod 755 "${SYNC_SCRIPT}"

SERVICE_CONF='/usr/lib/systemd/system/sync-jars.service'
cat << EOF > "${SERVICE_CONF}"
[Unit]
Description=Periodic Jar Sync
[Service]
Type=simple
ExecStart=/bin/bash -c '${SYNC_SCRIPT} &>> /var/log/periodic-sync-jars.log'
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF

chmod a+rw "${SERVICE_CONF}"

systemctl daemon-reload
systemctl enable sync-jars
systemctl restart sync-jars
systemctl status sync-jars
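
The core of the polling loop above is a set difference on file names: list the jars in the drop zone, list the jars already in the local directory, and copy whatever is missing (the comm -23 step). The following Python sketch shows only that comparison. The function name jars_to_copy is hypothetical, and the listings are plain lists of paths rather than real hdfs dfs -ls output.

```python
import posixpath


def jars_to_copy(dropzone_paths, local_paths):
    """Return sorted jar basenames found in the drop zone but not locally."""
    dropzone = {posixpath.basename(p) for p in dropzone_paths
                if p.endswith('.jar')}
    local = {posixpath.basename(p) for p in local_paths
             if p.endswith('.jar')}
    # Same idea as `comm -23`: keep names unique to the first listing.
    return sorted(dropzone - local)
```

Because the comparison uses names only, a jar that is replaced in the drop zone under the same file name is not copied again, and jars removed from the drop zone are never deleted locally. Using a new file name for each new version avoids the first limitation.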

Then, whenever you need a jar file to be available everywhere, just copy it into hdfs:///usr/lib/jars. The periodic poller will automatically copy it into /usr/lib/spark/jars, and you then simply restart your kernel to pick it up. You can add jars to that HDFS directory either by SSH'ing in and running hdfs dfs -cp directly, or by calling out to a subprocess from your Jupyter notebook:

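import subprocess

# Copy the jar from GCS into the HDFS drop zone watched by the poller.
sp = subprocess.Popen(
    ['hdfs', 'dfs', '-cp',
     'gs://spark-lib/bigquery/spark-bigquery-latest.jar',
     'hdfs:///usr/lib/jars/spark-bigquery-latest.jar'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)
out, err = sp.communicate()
# communicate() returns bytes; decode them for readable output.
print(out.decode())
print(err.decode())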
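
Since the poller runs on a timer, the jar may not have reached /usr/lib/spark/jars by the time the copy command returns. A small helper like the one below could be used to wait for it before restarting the kernel. This is only a sketch: wait_for_jar and its parameters are hypothetical, it checks only the node the notebook runs on, and it does not verify that the copy has finished.

```python
import os
import time


def wait_for_jar(jar_name, local_dir='/usr/lib/spark/jars',
                 timeout=60.0, interval=1.0):
    """Poll local_dir until jar_name exists; return False on timeout."""
    path = os.path.join(local_dir, jar_name)
    deadline = time.time() + timeout
    while True:
        if os.path.isfile(path):
            return True
        if time.time() >= deadline:
            return False
        time.sleep(interval)
```

For example, wait_for_jar('spark-bigquery-latest.jar') returns True once the file shows up locally, at which point restarting the kernel should let Spark see it.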
