How to use elasticsearch.helpers.streaming_bulk


Question


Can someone advise how to use the function elasticsearch.helpers.streaming_bulk instead of elasticsearch.helpers.bulk for indexing data into Elasticsearch?

If I simply swap in streaming_bulk in place of bulk, nothing gets indexed, so I guess it needs to be used in a different form.

The code below creates the index and type, and indexes data from a CSV file into Elasticsearch in chunks of 500 elements. It is working properly, but I am wondering whether it is possible to increase performance. That's why I want to try out the streaming_bulk function.

Currently I need 10 minutes to index 1 million rows from a 200 MB CSV document. I use two machines, CentOS 6.6 with 8 CPUs, x86_64, CPU MHz: 2499.902, Mem: 15.574 GB total. I'm not sure whether it can go any faster.

import csv
import json

import elasticsearch
from elasticsearch import helpers

es = elasticsearch.Elasticsearch([{'host': 'uxmachine-test', 'port': 9200}])
index_name = 'new_index'
type_name = 'new_type'
mapping = json.loads(open(config["index_mapping"]).read()) #read mapping from json file

es.indices.create(index_name)
es.indices.put_mapping(index=index_name, doc_type=type_name, body=mapping)

with open(file_to_index, 'rb') as csvfile:
    reader = csv.reader(csvfile)        #read documents for indexing from CSV file, more than million rows
    content = {"_index": index_name, "_type": type_name}
    batch_chunks = []
    iterator = 0

    for row in reader:
        var = transform_row_for_indexing(row,fields, index_name, type_name,id_name,id_increment)
        id_increment = id_increment + 1
        #var = transform_row_for_indexing(row,fields, index_name, type_name)
        batch_chunks.append(var)
        if iterator % 500 == 0:
            helpers.bulk(es,batch_chunks)
            del batch_chunks[:]
            print "ispucalo batch"
        iterator = iterator + 1
    # indexing of last batch_chunk
    if len(batch_chunks) != 0:
        helpers.bulk(es,batch_chunks)

Solution

So streaming_bulk returns an iterator, which means nothing will happen until you start iterating over it. The code for the 'bulk' function looks like this:

success, failed = 0, 0

# list of errors to be collected if not stats_only
errors = []

for ok, item in streaming_bulk(client, actions, **kwargs):
    # go through request-response pairs and detect failures
    if not ok:
        if not stats_only:
            errors.append(item)
        failed += 1
    else:
        success += 1

return success, failed if stats_only else errors

So basically, just calling streaming_bulk(client, actions, **kwargs) won't actually do anything. It's not until you iterate over it, as is done in this for loop, that the indexing actually starts to happen.

So, in your code: you are welcome to change 'bulk' to 'streaming_bulk', however you need to iterate over the results of streaming_bulk in order to actually have anything indexed.
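
For example, applied to the indexing loop from the question, a minimal sketch could look like the following. It assumes the same es client, transform_row_for_indexing helper, and variables (fields, id_name, id_increment, file_to_index) from the question's code, and chunk_size=500 simply mirrors the original batch size; streaming_bulk groups the actions into chunks itself, so no manual batch_chunks handling is needed:

def generate_actions(reader, start_id):
    # lazily yield one action dict per CSV row; streaming_bulk chunks them internally
    doc_id = start_id
    for row in reader:
        yield transform_row_for_indexing(row, fields, index_name, type_name,
                                         id_name, doc_id)
        doc_id += 1

with open(file_to_index, 'rb') as csvfile:
    reader = csv.reader(csvfile)
    # streaming_bulk is lazy: this for loop is what actually drives the indexing
    for ok, result in helpers.streaming_bulk(es, generate_actions(reader, id_increment),
                                             chunk_size=500):
        if not ok:
            print(result)  # a failed action; inspect or collect it

Note that the plain bulk helper already performs exactly this loop for you (as shown above), so streaming_bulk mainly gives you per-item control over the responses rather than a guaranteed speed-up.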
