How To Push a Spark Dataframe to Elastic Search (Pyspark)

Problem Description

Beginner ES question here.

What is the workflow or steps for pushing a Spark Dataframe to Elastic Search?

From research, I believe I need to use the spark.newAPIHadoopFile() method.

However, digging through the Elasticsearch documentation and other Stack Overflow Q&As, I am still a little confused about what format the arguments need to be in, and why.

Note that I am using pyspark, this is a new table for ES (no index exists yet), and the df has 5 columns (2 string types, 2 long types, and 1 list of ints) with ~3.5M rows.

Answer

Managed to find an answer, so I'll share. Spark DataFrames (from pyspark.sql) don't currently support the newAPIHadoopFile() methods; however, df.rdd.saveAsNewAPIHadoopFile() was giving me errors as well. The trick was to convert the df to strings via the following function:

def transform(doc):
    """Turn a row dict into an (id, json_string) pair for EsOutputFormat."""
    import json
    import hashlib

    # Serialize the original document first, so a generated id is
    # stable with respect to the unmodified record
    _json = json.dumps(doc)

    # Drop keys whose values are the strings 'null' or 'None';
    # copy the keys into a list so we can delete while iterating
    for key in list(doc.keys()):
        if doc[key] == 'null' or doc[key] == 'None':
            del doc[key]

    # Reuse an existing id if present, otherwise derive one by hashing
    if 'id' not in doc:
        doc_id = hashlib.sha224(_json.encode('utf-8')).hexdigest()
        doc['id'] = doc_id
    else:
        doc_id = doc['id']

    _json = json.dumps(doc)
    return (doc_id, _json)
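A quick sanity check with a made-up record (the field names and values here are hypothetical): the 'null'-valued key is dropped and an id is derived from the hash of the original JSON.

sample = {'name': 'foo', 'value': 42, 'notes': 'null'}
doc_id, doc_json = transform(sample)
# doc_id   -> sha224 hex digest of the original JSON string
# doc_json -> '{"name": "foo", "value": 42, "id": "<digest>"}'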

So my JSON workflow is:

1: df = spark.read.json('XXX.json')

2: rdd_mapped = df.rdd.map(lambda y: y.asDict())

3: final_rdd = rdd_mapped.map(transform)

4:

final_rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        # es.resource takes the form "index/type" (no spaces)
        "es.resource": "<INDEX>/<TYPE>",
        "es.mapping.id": "id",
        "es.input.json": "true",
        "es.write.operation": "index",
        "es.nodes": "<NODE1>, <NODE2>, <NODE3>...",
        "es.port": "9200",
        "es.nodes.wan.only": "false",
        "es.net.http.auth.user": "elastic",
        "es.net.http.auth.pass": "changeme"
    })

More info on the ES arguments can be found here (scroll down to "Configuration").
