How to query an Elasticsearch index using Pyspark and Dataframes


Question

Elasticsearch's documentation only covers loading a complete index to Spark.

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()

How can you perform a query to return data from an Elasticsearch index and load it into Spark as a DataFrame using pyspark?
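One route worth knowing before the accepted answer: the elasticsearch-hadoop Spark SQL data source also accepts an `es.query` option, so a query can be pushed down directly through the DataFrame reader. The sketch below is illustrative; the connection settings and `index/type` resource are placeholders, and the reader call is shown commented out because it needs a running SparkContext and an Elasticsearch node:

```python
import json

# es.query expects a raw JSON string, so build the query as a dict
# and serialize it rather than hand-writing the string.
query = json.dumps({"query": {"match_all": {}}})

# Sketch of the DataFrame-reader path (requires a live Spark + ES setup):
# df = (sqlContext.read.format("org.elasticsearch.spark.sql")
#       .option("es.nodes", "localhost")
#       .option("es.port", "9200")
#       .option("es.query", query)
#       .load("index/type"))
```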

Answer

Here is how I do it.

General environment setup and command:

export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6
export PYSPARK_DRIVER_PYTHON=ipython2

./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar

Code:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

q = """{
  "query": {
    "filtered": {
      "filter": {
        "exists": {
          "field": "label"
        }
      },
      "query": {
        "match_all": {}
      }
    }
  }
}"""
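Since `es.query` is passed as a raw JSON string, a stray comma or quote fails only at job submission time; a quick `json.loads` sanity check (shown here with the same `filtered` query, which is the Elasticsearch 2.x syntax matching the 2.3.1 connector above) catches malformed JSON up front:

```python
import json

q = """{
  "query": {
    "filtered": {
      "filter": {"exists": {"field": "label"}},
      "query": {"match_all": {}}
    }
  }
}"""

# Raises a ValueError if the query string is not valid JSON.
parsed = json.loads(q)
```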

es_read_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : "titanic/passenger",
    "es.query" : q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)

sqlContext.createDataFrame(es_rdd).collect()
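A note on the shape of `es_rdd`: `newAPIHadoopRDD` yields `(document id, document)` pairs, so it is often cleaner to drop the key side first, e.g. `es_rdd.map(lambda kv: kv[1])`, before calling `createDataFrame`. A plain-Python illustration of that mapping (simulated pairs, no Spark needed to run it):

```python
# Simulated (doc_id, document) pairs as EsInputFormat would return them.
pairs = [
    ("1", {"label": "survived", "age": 29}),
    ("2", {"label": "died", "age": 40}),
]

# Keep only the document side, mirroring es_rdd.map(lambda kv: kv[1]).
docs = [doc for _, doc in pairs]
```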

You can also define data-frame columns. Refer here for more info.

Hope it helps!

