Elasticsearch indexing by BulkRequestBuilder slowing down


Problem description

Hi all Elasticsearch masters.

I have millions of records to index through the Elasticsearch Java API. The cluster has three nodes (1 master + 2 data nodes).

My code snippet is as follows.

Settings settings = ImmutableSettings.settingsBuilder()
     .put("cluster.name", "MyClusterName").build();

TransportClient client = new TransportClient(settings);
String hostname = "myhost ip";
int port = 9300; 
client.addTransportAddress(new InetSocketTransportAddress(hostname, port));

BulkRequestBuilder bulkBuilder = client.prepareBulk();
BufferedReader br = new BufferedReader(new InputStreamReader(new DataInputStream(new FileInputStream("my_file_path"))));
long bulkBuilderLength = 0;
String readLine = "";
String index = "my_index_name";
String type = "my_type_name";
String id = "";

while((readLine = br.readLine()) != null){

    id = somefunction(readLine);
    String json = new ObjectMapper().writeValueAsString(readLine);
    bulkBuilder.add(client.prepareIndex(index, type, id)
        .setSource(json));
    bulkBuilderLength++;
    if(bulkBuilderLength % 1000 == 0){
        logger.info("##### " + bulkBuilderLength + " data indexed.");
        BulkResponse bulkRes = bulkBuilder.execute().actionGet();
        if(bulkRes.hasFailures()){
            logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
        }
    }
}

br.close();

if(bulkBuilder.numberOfActions() > 0){
    logger.info("##### " + bulkBuilderLength + " data indexed.");
    BulkResponse bulkRes = bulkBuilder.execute().actionGet();
    if(bulkRes.hasFailures()){
        logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
    }
    bulkBuilder = client.prepareBulk();
}

It works fine, but the performance slows down rapidly after a few thousand documents.

I've already tried changing the setting refresh_interval to -1 and number_of_replicas to 0. However, the performance degrades in the same way.
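
For reference, this is roughly how those two settings can be applied through the same Java client. It is only a minimal sketch, assuming the index my_index_name already exists and the same pre-2.x API used in the snippets above (ImmutableSettings, TransportClient):

// Sketch: relax refresh and replicas on my_index_name before the bulk load
// (index.refresh_interval and index.number_of_replicas are the standard setting names).
client.admin().indices().prepareUpdateSettings("my_index_name")
    .setSettings(ImmutableSettings.settingsBuilder()
        .put("index.refresh_interval", "-1")      // disable periodic refresh during the load
        .put("index.number_of_replicas", 0)       // no replicas while indexing
        .build())
    .execute().actionGet();

// After the bulk load finishes, restore the values you want, e.g.:
client.admin().indices().prepareUpdateSettings("my_index_name")
    .setSettings(ImmutableSettings.settingsBuilder()
        .put("index.refresh_interval", "1s")
        .put("index.number_of_replicas", 1)
        .build())
    .execute().actionGet();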

If I monitor the status of my cluster using bigdesk, the GC value reaches 1 every second, as in the screenshot below.

Can anyone help me?

Thanks in advance.

=================== UPDATE ===================

Finally, I've solved this problem (see the answer).

The cause of the problem was that I missed recreating a new BulkRequestBuilder after each bulk execution. The performance degradation no longer occurs after I changed my code snippet as below.

Thank you very much.

Settings settings = ImmutableSettings.settingsBuilder()
     .put("cluster.name", "MyClusterName").build();

TransportClient client = new TransportClient(settings);
String hostname = "myhost ip";
int port = 9300; 
client.addTransportAddress(new InetSocketTransportAddress(hostname, port));

BulkRequestBuilder bulkBuilder = client.prepareBulk();
BufferedReader br = new BufferedReader(new InputStreamReader(new DataInputStream(new FileInputStream("my_file_path"))));
long bulkBuilderLength = 0;
String readLine = "";
String index = "my_index_name";
String type = "my_type_name";
String id = "";

while((readLine = br.readLine()) != null){

    id = somefunction(readLine);
    String json = new ObjectMapper().writeValueAsString(readLine);
    bulkBuilder.add(client.prepareIndex(index, type, id)
        .setSource(json));
    bulkBuilderLength++;
    if(bulkBuilderLength % 1000 == 0){
        logger.info("##### " + bulkBuilderLength + " data indexed.");
        BulkResponse bulkRes = bulkBuilder.execute().actionGet();
        if(bulkRes.hasFailures()){
            logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
        }
        bulkBuilder = client.prepareBulk();  // This line is my mistake and the solution !!!
    }
}

br.close();

if(bulkBuilder.numberOfActions() > 0){
    logger.info("##### " + bulkBuilderLength + " data indexed.");
    BulkResponse bulkRes = bulkBuilder.execute().actionGet();
    if(bulkRes.hasFailures()){
        logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
    }
    bulkBuilder = client.prepareBulk();
}


Recommended answer

The problem here is that you don't recreate a new bulk request after each execution. It means that you are re-indexing the same first data again and again.

BTW, take a look at the BulkProcessor class. It is definitely better to use; see the sketch below.
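
For completeness, here is a minimal sketch of what BulkProcessor usage could look like with the same client. Class and method names follow the 1.x Java client; the batch size of 1000 and the logging are just placeholders mirroring the question's code, not a definitive implementation:

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;

// BulkProcessor batches and flushes requests for you,
// so there is no builder to forget to recreate.
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        logger.info("##### executing bulk of " + request.numberOfActions() + " actions");
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {
            logger.error("##### Bulk Request failure with error: " + response.buildFailureMessage());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.error("##### Bulk Request failed entirely", failure);
    }
})
.setBulkActions(1000)        // flush every 1000 documents, like the manual batching above
.setConcurrentRequests(1)    // allow one bulk to execute while the next one is being filled
.build();

// Inside the read loop, simply add index requests; flushing happens automatically.
bulkProcessor.add(new IndexRequest(index, type, id).source(json));

// When the whole file has been read, close the processor to flush any remaining documents.
bulkProcessor.close();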

