如何使用Pyspark和Dataframes查询Elasticsearch索引 [英] How to query an Elasticsearch index using Pyspark and Dataframes
本文介绍了如何使用Pyspark和Dataframes查询Elasticsearch索引的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
Elasticsaerch的文档仅涉及将完整索引加载到Spark.
Elasticsaerch's documentation only covers loading a complete index to Spark.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()
如何执行查询以从Elasticsearch索引返回数据,并使用pyspark将它们作为DataFrame加载到Spark?
How can you perform a query to return data from an Elasticsearch index and load them to Spark as a DataFrame using pyspark?
推荐答案
下面是我的操作方法.
常规环境设置和命令:
export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6
export PYSPARK_DRIVER_PYTHON=ipython2
./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar
代码:
from pyspark import SparkConf
from pyspark.sql import SQLContext
conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
q ="""{
"query": {
"filtered": {
"filter": {
"exists": {
"field": "label"
}
},
"query": {
"match_all": {}
}
}
}
}"""
es_read_conf = {
"es.nodes" : "localhost",
"es.port" : "9200",
"es.resource" : "titanic/passenger",
"es.query" : q
}
es_rdd = sc.newAPIHadoopRDD(
inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_read_conf)
sqlContext.createDataFrame(es_rdd).collect()
您还可以定义数据框列.有关更多信息,请参考此处.
You can also define data-frame columns. Refer Here for more info.
希望有帮助!
这篇关于如何使用Pyspark和Dataframes查询Elasticsearch索引的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文