ElasticSearch Indexing 100K documents with BulkRequest API using Java RestHighLevelClient

Problem description

I am reading 100K+ file paths from the index documents_qa using the scroll API. The actual files are available on my local D:\ drive. Using each file path, I read the actual file, convert it to base64, and reindex it with the base64 content (of the file) into another index, document_attachment_qa.
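The file-to-base64 step described above needs only the JDK. A minimal sketch, assuming each file is small enough to be loaded into memory at once; the class name FileToBase64 is hypothetical:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Base64;

public class FileToBase64 {

    /**
     * Reads the whole file at filePath and returns its content as a
     * standard (basic) base64 string, e.g. for the "fileContent" field.
     */
    public static String encodeFile(String filePath) throws IOException {
        Path path = Paths.get(filePath);
        byte[] bytes = Files.readAllBytes(path); // loads the entire file into memory
        return Base64.getEncoder().encodeToString(bytes);
    }
}
```

Usage, with a hypothetical path: String result = FileToBase64.encodeFile("D:\\docs\\file1.pdf"); The basic encoder returned by Base64.getEncoder() inserts no line breaks, so the result can be placed directly into a single-line JSON document.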

In my current implementation, I read the filePath, convert the file to base64, and index each document along with its fileContent one by one. This takes a long time: for example, indexing 4,000 documents takes more than 6 hours, and the connection is also being terminated due to an IOException.

So now I want to index the documents using the BulkRequest API, but I am using RestHighLevelClient and I am not sure how to use the BulkRequest API together with RestHighLevelClient.

Here is my current implementation, which indexes the documents one by one:

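import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;

Map<String, Object> jsonMap = new HashMap<String, Object>();
jsonMap.put("id", doc.getId());
jsonMap.put("app_language", doc.getApp_language());
jsonMap.put("fileContent", result); // base64-encoded file content

String id = Long.toString(doc.getId());

IndexRequest request = new IndexRequest(ATTACHMENT, "doc", id) // ATTACHMENT is the index name
        .source(jsonMap)          // my single document
        .setPipeline(ATTACHMENT); // ingest pipeline to run on the document

// One HTTP request per document (timeout already increased)
IndexResponse response = SearchEngineClient.getInstance3().index(request);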

I found the following link for BulkRequest:

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk.html

But I am not sure how to implement BulkRequestBuilder bulkRequest = client.prepareBulk(); (the client.prepareBulk() method) when using RestHighLevelClient.

Update 1

I am trying to index all 100K documents in one shot. So I create one JSONArray and put all my JSONObjects into it one by one. Finally, I try to build a BulkRequest, add all my documents (the JSONArray) to it as the source, and index them.

Here I am not sure how to convert my JSONArray to a List of Strings.

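import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.Response;
import org.json.JSONArray;

private final static String ATTACHMENT = "document_attachment_qa";
private final static String TYPE = "doc";
JSONArray reqJSONArray = new JSONArray();

while (searchHits != null && searchHits.length > 0) {
...
...
    Map<String, Object> jsonMap = new HashMap<String, Object>();
    jsonMap.put("id", doc.getId());
    jsonMap.put("app_language", doc.getApp_language());
    jsonMap.put("fileContent", result);

    reqJSONArray.put(jsonMap);
}

// Bulk bodies are newline-delimited JSON: use "\n" (%n would be "\r\n" on Windows)
String actionMetaData = String.format("{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\" } }\n", ATTACHMENT, TYPE);
List<String> bulkData = new ArrayList<>(); // not sure how to fill this with my documents as JSON strings
StringBuilder bulkRequestBody = new StringBuilder();
for (String bulkItem : bulkData) {
    bulkRequestBody.append(actionMetaData);
    bulkRequestBody.append(bulkItem); // each document must be single-line JSON
    bulkRequestBody.append("\n");
}

HttpEntity entity = new NStringEntity(bulkRequestBody.toString(), ContentType.APPLICATION_JSON);
try {
    // Index and type are named in every metadata line, so the plain /_bulk endpoint is used;
    // the pipeline parameter runs the same ingest pipeline as the single-document version
    Response response = SearchEngineClient.getRestClientInstance().performRequest(
            "POST", "/_bulk", Collections.singletonMap("pipeline", ATTACHMENT), entity);
    return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
} catch (Exception e) {
    // log the exception: the bulk request failed
    return false;
}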
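For the missing conversion, assuming reqJSONArray is org.json's JSONArray, each element can be turned into a compact single-line JSON string with a loop such as for (int i = 0; i < reqJSONArray.length(); i++) bulkData.add(reqJSONArray.getJSONObject(i).toString()); The remaining step, assembling the newline-delimited _bulk body, needs only plain Java. A minimal sketch; the class BulkBodyBuilder is hypothetical and expects the documents to be single-line JSON already:

```java
import java.util.List;

public class BulkBodyBuilder {

    /**
     * Builds a newline-delimited (NDJSON) _bulk body: for every document,
     * one action/metadata line followed by the document source on the next line.
     */
    public static String build(String index, String type, List<String> jsonDocs) {
        // "\n" explicitly; String.format("%n") would emit "\r\n" on Windows
        String actionMetaData = String.format(
                "{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\" } }\n", index, type);
        StringBuilder body = new StringBuilder();
        for (String doc : jsonDocs) {
            if (doc.indexOf('\n') >= 0) {
                // a raw newline inside a document would split it into two bulk lines
                throw new IllegalArgumentException("bulk documents must be single-line JSON");
            }
            body.append(actionMetaData);
            body.append(doc).append('\n'); // the body must also end with a newline
        }
        return body.toString();
    }
}
```

The metadata line here carries no "_id", so Elasticsearch generates one per document; adding "_id" to each metadata line (for example the document's id field) would make re-running the reindex overwrite documents instead of duplicating them.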


Recommended answer

In addition to @chengpohi's answer, I would like to add the following points:

A BulkRequest can be used to execute multiple index, update and/or delete operations using a single request.

It requires at least one operation to be added to the Bulk request:

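import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

// Three index operations, sent to Elasticsearch as a single request
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("posts", "doc", "1")
        .source(XContentType.JSON, "field", "foo"));
request.add(new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "field", "bar"));
request.add(new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "field", "baz"));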
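Rather than putting all 100K documents into one BulkRequest, the documents are usually sent in batches, one BulkRequest per batch. A minimal sketch of the splitting step in plain Java; the class Batches, the batch size of 500 and the docs list in the comment are hypothetical, and the comment shows how each batch could be turned into a BulkRequest with the calls used above:

```java
import java.util.ArrayList;
import java.util.List;

public class Batches {

    /**
     * Splits items into consecutive batches of at most batchSize elements,
     * preserving order. The last batch may be smaller.
     */
    public static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy so each batch does not depend on the source list afterwards
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    // Each batch would then become one BulkRequest (6.x high-level client), e.g.:
    //
    //   for (List<Map<String, Object>> batch : Batches.partition(docs, 500)) {
    //       BulkRequest bulk = new BulkRequest();
    //       for (Map<String, Object> doc : batch) {
    //           bulk.add(new IndexRequest(ATTACHMENT, "doc", doc.get("id").toString())
    //                   .source(doc)
    //                   .setPipeline(ATTACHMENT));
    //       }
    //       BulkResponse response = client.bulk(bulk, RequestOptions.DEFAULT);
    //   }
}
```

Since every document here carries a whole base64-encoded file, a single request with all 100K documents would likely exceed Elasticsearch's http.max_content_length (100mb by default), so the batch size is best tuned by request size in bytes rather than by document count. With the scroll API, another option is to send each scroll page (searchHits) as its own BulkRequest, which also avoids holding all 100K encoded files in memory at once.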

Note: The Bulk API supports only documents encoded in JSON or SMILE. Providing documents in any other format will result in an error.

Synchronous operation:

BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

Here, client is the RestHighLevelClient, and the execution is synchronous: the call blocks until the BulkResponse is returned.

Asynchronous operation (recommended approach):

client.bulkAsync(request, RequestOptions.DEFAULT, listener);

The asynchronous execution of a bulk request requires both the BulkRequest instance and an ActionListener instance to be passed to the asynchronous method.

Listener Example:

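import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        // Called when the bulk request completes; individual items may still
        // have failed, so check bulkResponse.hasFailures() here
    }

    @Override
    public void onFailure(Exception e) {
        // Called when the whole bulk request fails (e.g. an IOException)
    }
};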
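When many bulkAsync calls are in flight (for example one per batch), the caller usually has to wait until all of them have called back before, say, clearing the scroll or exiting. A minimal sketch of that wait using java.util.concurrent.CountDownLatch; to keep it self-contained, Listener is a hypothetical stand-in with the same two callbacks as ActionListener, and in real code the same countDown() calls would go inside the ActionListener<BulkResponse> shown above:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallbackTracker<R> {

    /** Hypothetical stand-in with the same two callbacks as ActionListener. */
    public interface Listener<T> {
        void onResponse(T response);

        void onFailure(Exception e);
    }

    private final CountDownLatch latch;
    private final AtomicInteger failures = new AtomicInteger();

    /** expectedCalls is the number of asynchronous requests that will call back. */
    public CallbackTracker(int expectedCalls) {
        this.latch = new CountDownLatch(expectedCalls);
    }

    /** Returns a listener that counts the latch down once per callback. */
    public Listener<R> listener() {
        return new Listener<R>() {
            @Override
            public void onResponse(R response) {
                // Request completed; with a real BulkResponse, also check hasFailures()
                latch.countDown();
            }

            @Override
            public void onFailure(Exception e) {
                failures.incrementAndGet(); // the whole request failed
                latch.countDown();
            }
        };
    }

    /** Waits until every request has called back; false if the timeout expired first. */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    /** Number of requests that ended in onFailure. */
    public int failureCount() {
        return failures.get();
    }
}
```

The latch is created with the number of requests up front, so the count of batches has to be known (or counted) before the asynchronous calls are sent.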

The returned BulkResponse contains information about the executed operations and allows you to iterate over each result as follows:

for (BulkItemResponse bulkItemResponse : bulkResponse) {
    if (bulkItemResponse.isFailed()) {
        // Failed items carry a Failure instead of a DocWriteResponse
        BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
        continue; // e.g. log failure.getMessage() and retry this document
    }
    DocWriteResponse itemResponse = bulkItemResponse.getResponse();

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
        IndexResponse indexResponse = (IndexResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}

The following arguments can optionally be provided:

request.timeout(TimeValue.timeValueMinutes(2)); // timeout as a TimeValue
request.timeout("2m");                          // the same timeout as a String

I hope this helps.
