Can't add jars pyspark in jupyter of Google DataProc

Problem description

I have a Jupyter notebook on DataProc and I need a jar to run some jobs. I'm aware of editing spark-defaults.conf and of passing --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar when submitting the job from the command line - both work well. However, if I want to add the jar directly in the Jupyter notebook, the methods I tried below all fail.
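
For context, the command-line route that does work looks roughly like this (the cluster name and job file are placeholders):

# Submitting with the jar attached works; my-cluster and my_job.py are placeholders.
gcloud dataproc jobs submit pyspark my_job.py \
    --cluster=my-cluster \
    --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar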

Method 1:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars gs://spark-lib/bigquery/spark-bigquery-latest.jar pyspark-shell'

Method 2:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('Shakespeare WordCount') \
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest.jar') \
    .getOrCreate()

Both fail with the same error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-1-2b7692efb32b> in <module>()
     19 # Read BQ data into spark dataframe
     20 # This method reads from BQ directly, does not use GCS for intermediate results
---> 21 df = spark.read.format('bigquery').option('table', table).load()
     22 
     23 df.show(5)

/usr/lib/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    170             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    171         else:
--> 172             return self._df(self._jreader.load())
    173 
    174     @since(1.4)

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o81.load.
: java.lang.ClassNotFoundException: Failed to find data source: bigquery. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: bigquery.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
    ... 13 more

The task I'm trying to run is very simple:

table = 'publicdata.samples.shakespeare'

df = spark.read.format('bigquery').option('table', table).load()

df.show(5)

I understand there are many similar questions and answers, but they either don't work or don't fit my needs. There are ad-hoc jars I'll need, and I don't want to keep all of them in the default configuration. I'd like to be more flexible and add jars on the go. How can I solve this? Thank you!

Answer

Unfortunately there isn't a built-in way to do this dynamically without effectively just editing spark-defaults.conf and restarting the kernel. There's an open feature request in Spark for this.
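
For completeness, a minimal sketch of that heavyweight route, assuming the usual /etc/spark/conf layout on Dataproc images; run it on the master node and restart the Jupyter kernel afterwards:

# Append the jar to the Spark defaults (assumes /etc/spark/conf/spark-defaults.conf exists),
# then restart the Jupyter kernel so a fresh SparkSession picks it up.
echo 'spark.jars gs://spark-lib/bigquery/spark-bigquery-latest.jar' \
    | sudo tee -a /etc/spark/conf/spark-defaults.conf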

Zeppelin has some usability features for adding jars through the UI, but even in Zeppelin you have to restart the interpreter afterwards for the Spark context to pick them up in its classloader. Those options also require the jar files to already be staged on the local filesystem; you can't just refer to remote file paths or URLs.

One workaround is to create an init action that sets up a systemd service which periodically polls an HDFS directory and syncs new jars into one of the existing classpath directories, such as /usr/lib/spark/jars:

#!/bin/bash
# Sets up continuous sync'ing of an HDFS directory into /usr/lib/spark/jars

# Manually copy jars into this HDFS directory to have them sync into
# ${LOCAL_DIR} on all nodes.
HDFS_DROPZONE='hdfs:///usr/lib/jars'
LOCAL_DIR='file:///usr/lib/spark/jars'

readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
if [[ "${ROLE}" == 'Master' ]]; then
  hdfs dfs -mkdir -p "${HDFS_DROPZONE}"
fi

SYNC_SCRIPT='/usr/lib/hadoop/libexec/periodic-sync-jars.sh'
cat << EOF > "${SYNC_SCRIPT}"
#!/bin/bash
while true; do
  sleep 5
  hdfs dfs -ls ${HDFS_DROPZONE}/*.jar 2>/dev/null | grep hdfs: | \
    sed 's/.*hdfs:/hdfs:/' | xargs -n 1 basename 2>/dev/null | sort \
    > /tmp/hdfs_files.txt
  hdfs dfs -ls ${LOCAL_DIR}/*.jar 2>/dev/null | grep file: | \
    sed 's/.*file:/file:/' | xargs -n 1 basename 2>/dev/null | sort \
    > /tmp/local_files.txt
  comm -23 /tmp/hdfs_files.txt /tmp/local_files.txt > /tmp/diff_files.txt
  if [ -s /tmp/diff_files.txt ]; then
    for FILE in \$(cat /tmp/diff_files.txt); do
      echo "\$(date): Copying \${FILE} from ${HDFS_DROPZONE} into ${LOCAL_DIR}"
      hdfs dfs -cp "${HDFS_DROPZONE}/\${FILE}" "${LOCAL_DIR}/\${FILE}"
    done
  fi
done
EOF

chmod 755 "${SYNC_SCRIPT}"

SERVICE_CONF='/usr/lib/systemd/system/sync-jars.service'
cat << EOF > "${SERVICE_CONF}"
[Unit]
Description=Periodic Jar Sync
[Service]
Type=simple
ExecStart=/bin/bash -c '${SYNC_SCRIPT} &>> /var/log/periodic-sync-jars.log'
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF

chmod a+rw "${SERVICE_CONF}"

systemctl daemon-reload
systemctl enable sync-jars
systemctl restart sync-jars
systemctl status sync-jars
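
To wire this up, you'd stage the script in GCS and pass it as an init action when creating the cluster; the bucket and cluster names below are placeholders:

# Stage the init action and create a cluster that runs it on every node.
gsutil cp sync-jars-init.sh gs://my-bucket/sync-jars-init.sh

gcloud dataproc clusters create my-cluster \
    --initialization-actions=gs://my-bucket/sync-jars-init.sh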

Then, whenever you need a jar file to be available everywhere, you just copy it into hdfs:///usr/lib/jars; the periodic poller will automatically drop it into /usr/lib/spark/jars, and then you simply restart your kernel to pick it up. You can add jars to that HDFS directory either by SSH'ing in and running hdfs dfs -cp directly, or simply by shelling out with subprocess from your Jupyter notebook:

import subprocess
sp = subprocess.Popen(
    ['hdfs', 'dfs', '-cp',
     'gs://spark-lib/bigquery/spark-bigquery-latest.jar',
     'hdfs:///usr/lib/jars/spark-bigquery-latest.jar'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)
out, err = sp.communicate()
print(out)
print(err)
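
The same copy can also be run directly from an SSH session on the cluster; either way, give the poller a cycle (it sleeps 5 seconds between scans) and then restart the kernel so the new jar lands on the driver's classpath:

# Equivalent copy from a shell on the cluster; the poller then syncs it to /usr/lib/spark/jars.
hdfs dfs -cp \
    gs://spark-lib/bigquery/spark-bigquery-latest.jar \
    hdfs:///usr/lib/jars/spark-bigquery-latest.jar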
