Write data from pyspark to ElasticSearch


Problem description

I followed this article to send some data to AWS ES, using the elasticsearch-hadoop jar. Here is my script:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
if __name__ == "__main__":
    conf = SparkConf().setAppName("WriteToES")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)
    es_conf = {"es.nodes" : "https://search-elasticsearchdomaine.region.es.amazonaws.com/",
    "es.port" : "9200","es.nodes.client.only" : "true","es.resource" : "sensor_counts/metrics"}
    es_df_p = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("output/part-00000-c353bb29-f189-4189-b35b-f7f1af717355.csv")
    es_df_pf= es_df_p.groupBy("network_key")
    es_df_pf.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf)

Then I run this command:

spark-submit --jars elasticsearch-spark-20_2.11-5.3.1.jar write_to_es.py

where write_to_es.py is the script above.

Here is the error I got:

17/05/05 17:51:52 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
17/05/05 17:51:52 INFO HadoopRDD: Input split: file:/home/user/spark-2.1.0-bin-hadoop2.7/output/part-00000-c353bb29-f189-4189-b35b-f7f1af717355.csv:0+178633
17/05/05 17:51:52 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1143 bytes result sent to driver
17/05/05 17:51:52 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 11 ms on localhost (executor driver) (1/1)
17/05/05 17:51:52 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
17/05/05 17:51:52 INFO DAGScheduler: ResultStage 1 (load at NativeMethodAccessorImpl.java:0) finished in 0,011 s
17/05/05 17:51:52 INFO DAGScheduler: Job 1 finished: load at NativeMethodAccessorImpl.java:0, took 0,018727 s
17/05/05 17:51:52 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.1.26:39609 in memory (size: 2.1 KB, free: 366.3 MB)
17/05/05 17:51:52 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 192.168.1.26:39609 in memory (size: 22.9 KB, free: 366.3 MB)
17/05/05 17:51:52 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 192.168.1.26:39609 in memory (size: 2.1 KB, free: 366.3 MB)
Traceback (most recent call last):
  File "/home/user/spark-2.1.0-bin-hadoop2.7/write_to_es.py", line 11, in <module>
    es_df_pf.saveAsNewAPIHadoopFile(
  File "/home/user/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 964, in __getattr__
AttributeError: 'DataFrame' object has no attribute 'saveAsNewAPIHadoopFile'
17/05/05 17:51:53 INFO SparkContext: Invoking stop() from shutdown hook
17/05/05 17:51:53 INFO SparkUI: Stopped Spark web UI at http://192.168.1.26:4040
17/05/05 17:51:53 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/05/05 17:51:53 INFO MemoryStore: MemoryStore cleared
17/05/05 17:51:53 INFO BlockManager: BlockManager stopped
17/05/05 17:51:53 INFO BlockManagerMaster: BlockManagerMaster stopped
17/05/05 17:51:53 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/05/05 17:51:53 INFO SparkContext: Successfully stopped SparkContext
17/05/05 17:51:53 INFO ShutdownHookManager: Shutdown hook called
17/05/05 17:51:53 INFO ShutdownHookManager: Deleting directory /tmp/spark-501c4efa-5402-430e-93c1-aaff4caddef0
17/05/05 17:51:53 INFO ShutdownHookManager: Deleting directory /tmp/spark-501c4efa-5402-430e-93c1-aaff4caddef0/pyspark-52406fa8-e8d1-4aca-bcb6-91748dc87507

How to solve this:

 AttributeError: 'DataFrame' object has no attribute 'saveAsNewAPIHadoopFile'

Any help or suggestions would be greatly appreciated.

Solution

I had the same problem.

After reading this article, I found the answer!!!

You have to convert the DataFrame to a PythonRDD first, like this:

>>> type(df)
<class 'pyspark.sql.dataframe.DataFrame'>

>>> type(df.rdd)
<class 'pyspark.rdd.RDD'>

>>> df.rdd.saveAsNewAPIHadoopFile(...) # Got the same error message

>>> df.printSchema() # My schema
root
 |-- id: string (nullable = true)
 ...

# Let's convert to PythonRDD
>>> python_rdd = df.map(lambda item: ('key', {
... 'id': item['id'],
    ...
... }))

>>> python_rdd
PythonRDD[42] at RDD at PythonRDD.scala:43

>>> python_rdd.saveAsNewAPIHadoopFile(...) # Now, success
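
Putting the answer and the original script together, one way to adapt write_to_es.py is to go through df.rdd and map each Row to a (key, document-dict) pair before calling saveAsNewAPIHadoopFile. The sketch below is an untested adaptation, not the accepted answer verbatim: it reuses the es_conf and CSV path from the question, uses Row.asDict() to build each document, and drops the bare groupBy("network_key") call, since groupBy returns a GroupedData object (which has no save method either) and would need an aggregation such as .count() before writing.

# Minimal sketch, assuming the same elasticsearch-spark jar is passed via --jars
# and the CSV file exists at the path used in the question.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

if __name__ == "__main__":
    conf = SparkConf().setAppName("WriteToES")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    es_conf = {
        "es.nodes": "https://search-elasticsearchdomaine.region.es.amazonaws.com/",
        "es.port": "9200",
        "es.nodes.client.only": "true",
        "es.resource": "sensor_counts/metrics",
    }

    es_df_p = sqlContext.read.format("com.databricks.spark.csv") \
        .option("header", "true") \
        .load("output/part-00000-c353bb29-f189-4189-b35b-f7f1af717355.csv")

    # saveAsNewAPIHadoopFile is an RDD method, not a DataFrame method,
    # so convert via .rdd and map each Row to a (key, dict) pair.
    es_rdd = es_df_p.rdd.map(lambda row: ('key', row.asDict()))

    es_rdd.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_conf)

As a design note, the same elasticsearch-spark connector also exposes a DataFrame writer through the org.elasticsearch.spark.sql data source, which avoids the RDD conversion entirely; check the exact options against the es-hadoop documentation for your connector version and treat this as an alternative sketch rather than a confirmed recipe:

es_df_p.write.format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "https://search-elasticsearchdomaine.region.es.amazonaws.com/") \
    .option("es.port", "9200") \
    .option("es.nodes.client.only", "true") \
    .mode("append") \
    .save("sensor_counts/metrics")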
