Python Spark DataFrame to Elasticsearch
Question
I can't figure out how to write a DataFrame to Elasticsearch using Python from Spark. I followed the steps from here.
Here is my code:
# Read file
df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='true') \
    .load('/vagrant/data/input/input.csv', schema=customSchema)
df.registerTempTable("data")

# KPIs
kpi1 = sqlContext.sql("SELECT * FROM data")

es_conf = {"es.nodes": "10.10.10.10", "es.port": "9200", "es.resource": "kpi"}

kpi1.rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf)
The above code gives:
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
I also launched the script with:

spark-submit --master spark://aggregator:7077 --jars ../jars/elasticsearch-hadoop-2.4.0/dist/elasticsearch-hadoop-2.4.0.jar /vagrant/scripts/aggregation.py

to ensure that elasticsearch-hadoop is loaded.
Answer
For starters, saveAsNewAPIHadoopFile expects an RDD of (key, value) pairs, and in your case this may happen only accidentally. The same thing applies to the value format you declare.
I am not familiar with Elastic, but just based on the arguments you should probably try something similar to this:
kpi1.rdd.map(lambda row: (None, row.asDict())).saveAsNewAPIHadoopFile(...)
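Putting that together with the es_conf from the question, a minimal sketch (not tested against Elastic) might look like this:

# Map each Row to a (key, value) pair. EsOutputFormat ignores the key,
# so None is fine; asDict() turns the Row into a plain dict that the
# connector can serialize.
kpi1.rdd \
    .map(lambda row: (None, row.asDict())) \
    .saveAsNewAPIHadoopFile(
        path='-',  # unused by EsOutputFormat but required by the API
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_conf)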
Since elasticsearch-hadoop provides a SQL Data Source, you should also be able to skip that step and save the data directly:
df.write.format("org.elasticsearch.spark.sql").save(...)
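For example, a minimal sketch, assuming ES-Hadoop 2.4 where a resource is written as index/type (the "kpi/data" name here is a hypothetical placeholder), with the connection settings passed as data source options:

# A sketch of the Data Source path, reusing the cluster settings from
# the question; the "kpi/data" index/type resource is hypothetical.
df.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "10.10.10.10") \
    .option("es.port", "9200") \
    .mode("append") \
    .save("kpi/data")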