Python spark Dataframe to Elasticsearch


Question

I can't figure out how to write a DataFrame to Elasticsearch using Python from Spark. I followed the steps from here.

Here is my code:

# Read file
df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='true') \
    .load('/vagrant/data/input/input.csv', schema = customSchema)

df.registerTempTable("data")

# KPIs
kpi1 = sqlContext.sql("SELECT * FROM data")

es_conf = {"es.nodes" : "10.10.10.10","es.port" : "9200","es.resource" : "kpi"}
kpi1.rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf)

The above code gives:

Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)

I also started the script with: spark-submit --master spark://aggregator:7077 --jars ../jars/elasticsearch-hadoop-2.4.0/dist/elasticsearch-hadoop-2.4.0.jar /vagrant/scripts/aggregation.py to ensure that elasticsearch-hadoop is loaded.

Answer

For starters, saveAsNewAPIHadoopFile expects an RDD of (key, value) pairs, and in your case this may work only accidentally. The same thing applies to the value format you declare.

I am not familiar with Elastic but just based on the arguments you should probably try something similar to this:

kpi1.rdd.map(lambda row: (None, row.asDict())).saveAsNewAPIHadoopFile(...)
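The idea above can be sketched as a small helper (the name row_to_kv is illustrative, not part of the answer): EsOutputFormat ignores the key, so None is fine, and the value must be a plain dict rather than a pyspark Row, which is what triggers the PickleException shown earlier.

```python
def row_to_kv(row_dict):
    # Pair each record with a None key, matching the (key, value)
    # shape that saveAsNewAPIHadoopFile expects. EsOutputFormat
    # only serializes the value, so the key can be None.
    return (None, row_dict)

# With a live SparkContext this would then be used as, for example:
# kpi1.rdd.map(lambda row: row_to_kv(row.asDict())).saveAsNewAPIHadoopFile(
#     path='-',
#     outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
#     keyClass="org.apache.hadoop.io.NullWritable",
#     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
#     conf=es_conf)
```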

Since elasticsearch-hadoop provides a SQL Data Source, you should also be able to skip that and save the data directly:

df.write.format("org.elasticsearch.spark.sql").save(...)
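A sketch of what filling in that save(...) might look like, reusing the connection settings from es_conf above. The es.* keys are standard elasticsearch-hadoop configuration options, but treat the exact call as an assumption; note that ES-Hadoop 2.x typically expects es.resource in "index/type" form rather than a bare index name.

```python
# Connection settings mirroring the es_conf dict from the question.
es_options = {
    "es.nodes": "10.10.10.10",
    "es.port": "9200",
    "es.resource": "kpi",  # 2.x usually wants "index/type", e.g. "kpi/data"
}

# With a live DataFrame this would be:
# df.write \
#     .format("org.elasticsearch.spark.sql") \
#     .options(**es_options) \
#     .mode("append") \
#     .save()
```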
