Write data from pyspark to ElasticSearch
Question
I followed this article to send some data to AWS ES, and I used the jar elasticsearch-hadoop. Here is my script:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
if __name__ == "__main__":
conf = SparkConf().setAppName("WriteToES")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
es_conf = {"es.nodes" : "https://search-elasticsearchdomaine.region.es.amazonaws.com/",
"es.port" : "9200","es.nodes.client.only" : "true","es.resource" : "sensor_counts/metrics"}
es_df_p = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("output/part-00000-c353bb29-f189-4189-b35b-f7f1af717355.csv")
es_df_pf= es_df_p.groupBy("network_key")
es_df_pf.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_conf)
Then I run this command line:
spark-submit --jars elasticsearch-spark-20_2.11-5.3.1.jar write_to_es.py
where write_to_es.py is the script above.
Here is the error I got:
17/05/05 17:51:52 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
17/05/05 17:51:52 INFO HadoopRDD: Input split: file:/home/user/spark-2.1.0-bin-hadoop2.7/output/part-00000-c353bb29-f189-4189-b35b-f7f1af717355.csv:0+178633
17/05/05 17:51:52 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1143 bytes result sent to driver
17/05/05 17:51:52 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 11 ms on localhost (executor driver) (1/1)
17/05/05 17:51:52 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/05/05 17:51:52 INFO DAGScheduler: ResultStage 1 (load at NativeMethodAccessorImpl.java:0) finished in 0,011 s
17/05/05 17:51:52 INFO DAGScheduler: Job 1 finished: load at NativeMethodAccessorImpl.java:0, took 0,018727 s
17/05/05 17:51:52 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.1.26:39609 in memory (size: 2.1 KB, free: 366.3 MB)
17/05/05 17:51:52 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 192.168.1.26:39609 in memory (size: 22.9 KB, free: 366.3 MB)
17/05/05 17:51:52 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 192.168.1.26:39609 in memory (size: 2.1 KB, free: 366.3 MB)
Traceback (most recent call last):
File "/home/user/spark-2.1.0-bin-hadoop2.7/write_to_es.py", line 11, in <module>
es_df_pf.saveAsNewAPIHadoopFile(
File "/home/user/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 964, in __getattr__
AttributeError: 'DataFrame' object has no attribute 'saveAsNewAPIHadoopFile'
17/05/05 17:51:53 INFO SparkContext: Invoking stop() from shutdown hook
17/05/05 17:51:53 INFO SparkUI: Stopped Spark web UI at http://192.168.1.26:4040
17/05/05 17:51:53 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/05/05 17:51:53 INFO MemoryStore: MemoryStore cleared
17/05/05 17:51:53 INFO BlockManager: BlockManager stopped
17/05/05 17:51:53 INFO BlockManagerMaster: BlockManagerMaster stopped
17/05/05 17:51:53 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/05/05 17:51:53 INFO SparkContext: Successfully stopped SparkContext
17/05/05 17:51:53 INFO ShutdownHookManager: Shutdown hook called
17/05/05 17:51:53 INFO ShutdownHookManager: Deleting directory /tmp/spark-501c4efa-5402-430e-93c1-aaff4caddef0
17/05/05 17:51:53 INFO ShutdownHookManager: Deleting directory /tmp/spark-501c4efa-5402-430e-93c1-aaff4caddef0/pyspark-52406fa8-e8d1-4aca-bcb6-91748dc87507
How to solve this:
AttributeError: 'DataFrame' object has no attribute 'saveAsNewAPIHadoopFile'
Any help or suggestion is very appreciated.
Solution
I had the same problem.
After reading this article, I found the answer!!!
You have to convert the DataFrame to a PythonRDD first.
It looks like this:
>>> type(df)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> type(df.rdd)
<class 'pyspark.rdd.RDD'>
>>> df.rdd.saveAsNewAPIHadoopFile(...) # Got the same error message
>>> df.printSchema() # My schema
root
|-- id: string (nullable = true)
...
# Let's convert to PythonRDD
>>> python_rdd = df.rdd.map(lambda item: ('key', {
... 'id': item['id'],
...
... }))
>>> python_rdd
PythonRDD[42] at RDD at PythonRDD.scala:43
>>> python_rdd.saveAsNewAPIHadoopFile(...) # Now, success
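Putting the two pieces together, here is a sketch of a corrected write_to_es.py. It reuses the question's connection settings and CSV path; the row_to_doc helper is my own naming (not part of any library), and I drop the groupBy call, since groupBy returns a GroupedData (not a DataFrame) and the script never aggregates it. EsOutputFormat then receives the (key, document) pairs it expects; the key is ignored on the ES side.

```python
def row_to_doc(row):
    """Turn one dict-like CSV row into the (key, document) pair that
    EsOutputFormat expects. The key is a throwaway value; ES ignores it."""
    return ('key', dict(row))


def write_to_es():
    # Run via spark-submit with the elasticsearch-hadoop jar on the classpath;
    # requires pyspark, so the imports live inside the function.
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SQLContext

    conf = SparkConf().setAppName("WriteToES")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    es_conf = {
        "es.nodes": "https://search-elasticsearchdomaine.region.es.amazonaws.com/",
        "es.port": "9200",
        "es.nodes.client.only": "true",
        "es.resource": "sensor_counts/metrics",
    }

    df = (sqlContext.read.format("com.databricks.spark.csv")
          .option("header", "true")
          .load("output/part-00000-c353bb29-f189-4189-b35b-f7f1af717355.csv"))

    # A DataFrame has no saveAsNewAPIHadoopFile; convert to an RDD of
    # (key, document) pairs first. Row.asDict() yields a plain dict.
    pairs = df.rdd.map(lambda row: row_to_doc(row.asDict()))

    pairs.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_conf)


if __name__ == "__main__":
    write_to_es()
```

Launched the same way as in the question: spark-submit --jars elasticsearch-spark-20_2.11-5.3.1.jar write_to_es.py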