How to save dataframe to Elasticsearch in PySpark?

Problem description

I have a Spark dataframe that I am trying to push to AWS Elasticsearch, but before that I was testing this sample code snippet to push to ES:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ES_indexer').getOrCreate()
df = spark.createDataFrame([{'num': i} for i in xrange(10)])
df = df.drop('_id')
df.write.format(
    'org.elasticsearch.spark.sql'
).option(
    'es.nodes', 'http://spark-data-push-adertadaltdpioy124.us-west-2.es.amazonaws.com'
).option(
    'es.port', 9200
).option(
    'es.resource', '%s/%s' % ('index_name', 'doc_type_name'),
).save()

I get an error saying:

java.lang.ClassNotFoundException: Failed to find data source: org.elasticsearch.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html

Any suggestions would be greatly appreciated.

Error trace:

Traceback (most recent call last):
  File "es_3.py", line 12, in <module>
    'es.resource', '%s/%s' % ('index_name', 'doc_type_name'),
  File "/usr/local/lib/python2.7/site-packages/pyspark/sql/readwriter.py", line 732, in save
    self._jwrite.save()
  File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python2.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o46.save.
: java.lang.ClassNotFoundException: Failed to find data source: org.elasticsearch.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:245)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.spark.sql.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
        ... 12 more

Answer

tl;dr Use pyspark --packages org.elasticsearch:elasticsearch-hadoop:7.2.0 and use format("es") to reference the connector.
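
A minimal sketch of the fixed write, assuming the driver is started with the connector on its classpath (e.g. pyspark --packages org.elasticsearch:elasticsearch-hadoop:7.2.0, or the same --packages flag on spark-submit); the endpoint, index and type names are the placeholders from the question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ES_indexer').getOrCreate()

# Small test dataframe with a single numeric column, as in the question
df = spark.createDataFrame([(i,) for i in range(10)], ['num'])

(df.write
    .format('es')  # short alias for org.elasticsearch.spark.sql
    .option('es.nodes', 'http://spark-data-push-adertadaltdpioy124.us-west-2.es.amazonaws.com')
    .option('es.port', '9200')
    .option('es.resource', 'index_name/doc_type_name')
    .save())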

Quoting Installation from the official documentation of the Elasticsearch for Apache Hadoop product:

Just like other libraries, elasticsearch-hadoop needs to be available in Spark’s classpath.

And later, under Supported Spark SQL versions:

elasticsearch-hadoop supports Spark SQL through two different jars: elasticsearch-spark-1.x-<version>.jar and elasticsearch-hadoop-<version>.jar

elasticsearch-spark-2.0-<version>.jar supports Spark SQL 2.0

That looks like an issue with the documentation (as it uses two different versions of the jar file), but it does mean that you have to use the proper jar file on the CLASSPATH of your Spark application.
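
If the --packages flag cannot be used on the command line, a commonly used alternative (a sketch, not taken from the quoted documentation) is to request the connector programmatically via the spark.jars.packages configuration; it only takes effect if it is set before the SparkContext is created, i.e. not in an already-running shell:

from pyspark.sql import SparkSession

# Ask Spark to resolve and download the connector jar at startup;
# this has no effect on a SparkSession/SparkContext that already exists.
spark = (SparkSession.builder
    .appName('ES_indexer')
    .config('spark.jars.packages', 'org.elasticsearch:elasticsearch-hadoop:7.2.0')
    .getOrCreate())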

And later in the same document:

Spark SQL support is available under org.elasticsearch.spark.sql package.

That simply says that the format (in df.write.format('org.elasticsearch.spark.sql')) is correct.

Further down the document you can find that you could even use an alias df.write.format("es") (!)

I found the Apache Spark section in the project's repository on GitHub more readable and current.
