How to save dataframe to Elasticsearch in PySpark?
Question
I have a Spark DataFrame that I am trying to push to AWS Elasticsearch, but before that I was testing this sample code snippet to push to ES:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ES_indexer').getOrCreate()
df = spark.createDataFrame([{'num': i} for i in xrange(10)])
df = df.drop('_id')
df.write.format('org.elasticsearch.spark.sql') \
    .option('es.nodes', 'http://spark-data-push-adertadaltdpioy124.us-west-2.es.amazonaws.com') \
    .option('es.port', 9200) \
    .option('es.resource', '%s/%s' % ('index_name', 'doc_type_name')) \
    .save()
I get this error message:
java.lang.ClassNotFoundException: Failed to find data source: org.elasticsearch.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html
Any suggestions would be greatly appreciated.
Error trace:
Traceback (most recent call last):
File "es_3.py", line 12, in <module>
'es.resource', '%s/%s' % ('index_name', 'doc_type_name'),
File "/usr/local/lib/python2.7/site-packages/pyspark/sql/readwriter.py", line 732, in save
self._jwrite.save()
File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/local/lib/python2.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o46.save.
: java.lang.ClassNotFoundException: Failed to find data source: org.elasticsearch.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.spark.sql.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
... 12 more
Answer
tl;dr Use pyspark --packages org.elasticsearch:elasticsearch-hadoop:7.2.0 and use format("es") to reference the connector.
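Concretely, that flag can be passed either to the interactive shell or to a batch submission (a sketch; 7.2.0 is the version from the tl;dr, and es_3.py is the script name taken from the traceback in the question):

```shell
# Interactive shell: resolve the connector from Maven Central at startup
pyspark --packages org.elasticsearch:elasticsearch-hadoop:7.2.0

# The same flag works for a batch job
spark-submit --packages org.elasticsearch:elasticsearch-hadoop:7.2.0 es_3.py
```

Either way, the downloaded jar ends up on the driver and executor classpaths, which is exactly what the ClassNotFoundException is complaining about.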
Quoting Installation from the official documentation of the Elasticsearch for Apache Hadoop product:
Just like other libraries, elasticsearch-hadoop needs to be available in Spark's classpath.
And the subsequent Supported Spark SQL versions:

elasticsearch-hadoop supports both version Spark SQL 1.3-1.6 and Spark SQL 2.0 through two different jars: elasticsearch-spark-1.x-<version>.jar and elasticsearch-hadoop-<version>.jar

elasticsearch-spark-2.0-<version>.jar supports Spark SQL 2.0
That looks like an issue with the documentation (as they use two different versions of the jar file), but it does mean that you have to use the proper jar file on the CLASSPATH of your Spark application.
And then, in the same document:

Spark SQL support is available under the org.elasticsearch.spark.sql package.
That simply says that the format (in df.write.format('org.elasticsearch.spark.sql')) is correct.
Further down the document you can find that you could even use an alias, df.write.format("es") (!)
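Putting the two pieces together, the write from the question can be expressed with the "es" alias. This is a minimal sketch, assuming the connector jar is already on the classpath (e.g. via the --packages flag above); the endpoint and resource names are the question's own values, and the helper name write_to_es is mine, not from the connector:

```python
# Connector options from the question; the es.* keys are the documented
# elasticsearch-hadoop settings.
ES_OPTIONS = {
    "es.nodes": "http://spark-data-push-adertadaltdpioy124.us-west-2.es.amazonaws.com",
    "es.port": "9200",                          # option values passed as strings
    "es.resource": "index_name/doc_type_name",  # <index>/<doc_type>
}

def write_to_es(df):
    """Write a Spark DataFrame to Elasticsearch via the 'es' format alias."""
    writer = df.write.format("es")  # alias for org.elasticsearch.spark.sql
    for key, value in ES_OPTIONS.items():
        writer = writer.option(key, value)
    # save() requires the connector jar on the classpath and a reachable cluster.
    writer.save()
```

A DataFrame built as in the question's snippet could then be pushed with write_to_es(df).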
I found the Apache Spark section in the project's repository on GitHub more readable and current.