How to save dataframe to Elasticsearch in PySpark?

Question

I have a spark dataframe that I am trying to push to AWS Elasticsearch, but before that I was testing this sample code snippet to push to ES,

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ES_indexer').getOrCreate()
df = spark.createDataFrame([{'num': i} for i in xrange(10)])
df = df.drop('_id')
df.write.format(
    'org.elasticsearch.spark.sql'
).option(
    'es.nodes', 'http://spark-data-push-adertadaltdpioy124.us-west-2.es.amazonaws.com'
).option(
    'es.port', 9200
).option(
    'es.resource', '%s/%s' % ('index_name', 'doc_type_name'),
).save()

I get an error saying,

java.lang.ClassNotFoundException: Failed to find data source: org.elasticsearch.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html

Any suggestions would be greatly appreciated.

Error trace:

Traceback (most recent call last):
  File "es_3.py", line 12, in <module>
    'es.resource', '%s/%s' % ('index_name', 'doc_type_name'),
  File "/usr/local/lib/python2.7/site-packages/pyspark/sql/readwriter.py", line 732, in save
    self._jwrite.save()
  File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python2.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o46.save.
: java.lang.ClassNotFoundException: Failed to find data source: org.elasticsearch.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:245)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.spark.sql.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
        ... 12 more

Answer

tl;dr Use pyspark --packages org.elasticsearch:elasticsearch-hadoop:7.2.0 and use format("es") to reference the connector.
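
Assuming pyspark (or spark-submit) was launched with that --packages flag so the connector jar is on the classpath, the snippet from the question only needs its format string changed. A minimal sketch follows; the endpoint is the placeholder host from the question, and es.nodes.wan.only is an assumption on my part that is often needed for hosted/AWS Elasticsearch domains, not something the answer itself states:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ES_indexer').getOrCreate()
df = spark.createDataFrame([(i,) for i in range(10)], ['num'])

# 'es' is the short alias for org.elasticsearch.spark.sql
df.write.format('es') \
    .option('es.nodes', 'http://spark-data-push-adertadaltdpioy124.us-west-2.es.amazonaws.com') \
    .option('es.port', '9200') \
    .option('es.nodes.wan.only', 'true') \
    .save('index_name/doc_type_name')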

Quoting Installation from the official documentation of the Elasticsearch for Apache Hadoop product:

Just like other libraries, elasticsearch-hadoop needs to be available in Spark’s classpath.

And later, in Supported Spark SQL versions:

elasticsearch-hadoop supports both version Spark SQL 1.3-1.6 and Spark SQL 2.0 through two different jars: elasticsearch-spark-1.x-<version>.jar and elasticsearch-hadoop-<version>.jar

elasticsearch-spark-2.0-<version>.jar supports Spark SQL 2.0

That looks like an issue with the document (as they use two different versions of the jar file), but it does mean that you have to use the proper jar file on the CLASSPATH of your Spark application.
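
As a hedged sketch of getting the right jar onto that classpath from PySpark itself (rather than on the command line), the dependency can be declared before the session is created. The Maven coordinates below assume Spark 2.x built against Scala 2.11, and the settings only take effect if they reach the JVM before it starts, so in an already-running pyspark shell you still need the --packages flag:

from pyspark.sql import SparkSession

# Resolve the connector from Maven Central when the session starts
# (assumes Spark 2.x on Scala 2.11; adjust the artifact for other builds).
spark = (SparkSession.builder
         .appName('ES_indexer')
         .config('spark.jars.packages', 'org.elasticsearch:elasticsearch-hadoop:7.2.0')
         .getOrCreate())

# Alternatively, point Spark at a jar downloaded by hand (path is hypothetical):
# spark = (SparkSession.builder
#          .config('spark.jars', '/path/to/elasticsearch-spark-20_2.11-7.2.0.jar')
#          .getOrCreate())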

Later in the same document:

Spark SQL support is available under org.elasticsearch.spark.sql package.

That simply says that the format (in df.write.format('org.elasticsearch.spark.sql')) is correct.

Further down the document you can find that you can even use the alias df.write.format("es") (!)
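
For illustration only, the same alias also works on the read side; this sketch reuses the index and type names from the question:

# Reading back through the same alias (index/type names reused from the question)
df = (spark.read.format('es')
      .option('es.nodes', 'http://spark-data-push-adertadaltdpioy124.us-west-2.es.amazonaws.com')
      .option('es.port', '9200')
      .load('index_name/doc_type_name'))
df.show()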

I found the Apache Spark section in the project's repository on GitHub more readable and up to date.
