ElasticSearch Indexing 100K documents with BulkRequest API using Java RestHighLevelClient
Problem description
I am reading 100K+ file paths from the index documents_qa using the scroll API. The actual files are available on my local d:\ drive. Using each file path, I read the actual file, convert it to base64, and reindex it with the base64 content (of the file) into another index, document_attachment_qa.
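The question does not show the "read the file and convert it to base64" step. Below is a minimal sketch of it, assuming each file fits in memory, using only java.nio.file.Files and java.util.Base64; the class and method names (Base64FileSketch, encodeFileToBase64) are hypothetical, not from the original code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;

public class Base64FileSketch {

    // Reads the whole file into memory and returns its standard Base64 encoding
    // (RFC 4648 alphabet, '=' padding, no line breaks).
    static String encodeFileToBase64(Path file) throws IOException {
        byte[] bytes = Files.readAllBytes(file);
        return Base64.getEncoder().encodeToString(bytes);
    }

    public static void main(String[] args) throws IOException {
        // A temporary file stands in for one of the files under d:\ from the question.
        Path tmp = Files.createTempFile("sample", ".txt");
        Files.write(tmp, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(encodeFileToBase64(tmp)); // prints aGVsbG8=
        Files.delete(tmp);
    }
}
```

In the question's loop this would be something like `String result = encodeFileToBase64(Paths.get(filePath));`. Base64 output is about 4/3 the size of the file, so holding many encoded files in memory at once (as in Update 1 below) grows quickly.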
My current implementation reads each filePath, converts the file into base64, and indexes the document along with fileContent one by one. This takes a long time: for example, indexing 4,000 documents takes more than 6 hours, and the connection is also terminated due to an IOException.
So now I want to index the documents using the BulkRequest API, but I am using RestHighLevelClient and am not sure how to use BulkRequest together with RestHighLevelClient.
Here is my current implementation, which indexes documents one at a time:
// One IndexRequest (and one HTTP round trip) per document
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("id", doc.getId());
jsonMap.put("app_language", doc.getApp_language());
jsonMap.put("fileContent", result); // result = base64 content of the file
String id = Long.toString(doc.getId());
IndexRequest request = new IndexRequest(ATTACHMENT, "doc", id) // ATTACHMENT is the index name
        .source(jsonMap)           // my single document
        .setPipeline(ATTACHMENT);  // ingest pipeline, here with the same name as the index
IndexResponse response = SearchEngineClient.getInstance3().index(request); // client with increased timeout
I found the following documentation on BulkRequest:
https://www.elastic.co/guide/zh-CN/elasticsearch/client/java-api/current/java-docs-bulk.html
But I am not sure how to implement BulkRequestBuilder bulkRequest = client.prepareBulk(); (the client.prepareBulk() method) when using RestHighLevelClient.
Update 1
I am trying to index all 100K documents in one shot. So I create one JSONArray and put all my JSONObjects into the array one by one. Finally, I try to build a BulkRequest, add all my documents (the JSONArray) as a source to the BulkRequest, and index them.
Here I am not sure how to convert my JSONArray to a List of Strings.
private final static String ATTACHMENT = "document_attachment_qa";
private final static String TYPE = "doc";

JSONArray reqJSONArray = new JSONArray();
while (searchHits != null && searchHits.length > 0) {
    ...
    ...
    jsonMap = new HashMap<String, Object>();
    jsonMap.put("id", doc.getId());
    jsonMap.put("app_language", doc.getApp_language());
    jsonMap.put("fileContent", result);
    reqJSONArray.put(jsonMap); // org.json wraps the Map in a JSONObject
}
// Action/metadata line written before every document in the _bulk body
String actionMetaData = String.format("{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"%s\" } }%n", ATTACHMENT, TYPE);
List<String> bulkData = // not sure how to convert a list of my documents in JSON strings
StringBuilder bulkRequestBody = new StringBuilder();
for (String bulkItem : bulkData) {
    bulkRequestBody.append(actionMetaData);
    bulkRequestBody.append(bulkItem);
    bulkRequestBody.append("\n");
}
HttpEntity entity = new NStringEntity(bulkRequestBody.toString(), ContentType.APPLICATION_JSON);
try {
    // Build the endpoint from the constants (the literal "/ATTACHMENT/TYPE/_bulk" does not substitute them),
    // and pass the same ingest pipeline that the one-by-one version used via setPipeline(ATTACHMENT)
    Response response = SearchEngineClient.getRestClientInstance().performRequest("POST",
            "/" + ATTACHMENT + "/" + TYPE + "/_bulk", Collections.singletonMap("pipeline", ATTACHMENT), entity);
    // 200 only means the bulk request was processed; per-item failures are reported in the body ("errors": true)
    return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
} catch (Exception e) {
    // do something
}
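The missing List<String> is simply one compact JSON string per document; each _bulk entry is an action line followed by that string, with every line ending in a newline. If you stay with org.json, reqJSONArray.getJSONObject(i).toString() yields such a string for element i. Below is a dependency-free sketch of the whole body, assuming flat documents whose values are Strings or finite Numbers (like id, app_language and fileContent). BulkBodySketch, toJson and buildBulkBody are hypothetical names, the hand-rolled serializer is only for illustration (a JSON library is more robust), and, unlike the question's code, it also writes each document's "id" as the bulk "_id" so reindexing the same file overwrites rather than duplicates.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BulkBodySketch {

    // Escapes a Java string as a JSON string literal (quotes, backslashes, control chars).
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // Serializes a flat map (String or finite Number values) into one line of JSON.
    static String toJson(Map<String, Object> doc) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            Object v = e.getValue();
            sb.append(jsonString(e.getKey())).append(':');
            sb.append(v instanceof Number ? v.toString() : jsonString(String.valueOf(v)));
        }
        return sb.append('}').toString();
    }

    // Builds an NDJSON _bulk body: an action line plus a source line per document,
    // each terminated by '\n' (so the body also ends with a newline).
    static String buildBulkBody(String index, String type, List<Map<String, Object>> docs) {
        StringBuilder body = new StringBuilder();
        for (Map<String, Object> doc : docs) {
            body.append("{\"index\":{\"_index\":").append(jsonString(index))
                .append(",\"_type\":").append(jsonString(type))
                .append(",\"_id\":").append(jsonString(String.valueOf(doc.get("id"))))
                .append("}}\n");
            body.append(toJson(doc)).append('\n');
        }
        return body.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("id", 42L);
        doc.put("app_language", "en");
        doc.put("fileContent", "aGVsbG8=");
        List<Map<String, Object>> docs = new ArrayList<>();
        docs.add(doc);
        System.out.print(buildBulkBody("document_attachment_qa", "doc", docs));
        // {"index":{"_index":"document_attachment_qa","_type":"doc","_id":"42"}}
        // {"id":42,"app_language":"en","fileContent":"aGVsbG8="}
    }
}
```

The resulting string can be wrapped in the same NStringEntity and sent with the performRequest call from the question.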
Recommended answer
In addition to @chengpohi's answer, I would like to add the following points:
A BulkRequest can be used to execute multiple index, update and/or delete operations using a single request.
It requires at least one operation to be added to the Bulk request:
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("posts", "doc", "1")
.source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts", "doc", "2")
.source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts", "doc", "3")
.source(XContentType.JSON,"field", "baz"));
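For 100K documents that each carry a base64 file, one giant request is likely to hit memory and timeout limits, so the documents can instead be added to several smaller BulkRequests. Below is a stdlib-only sketch of the splitting step, assuming each document is already a JSON string; partition and the 1,000-document / 5 MB limits in main are illustrative assumptions, not recommended values. Each returned batch would then become one BulkRequest (or one _bulk body). The Elasticsearch Java client also ships a BulkProcessor helper that flushes by action count or byte size, which may be preferable to hand-rolled batching.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchingSketch {

    // Splits JSON documents into consecutive batches. A batch is closed when it already
    // holds maxDocs documents or when the next document would push its estimated size
    // past maxBytes. A single document larger than maxBytes still gets its own batch.
    static List<List<String>> partition(List<String> docs, int maxDocs, long maxBytes) {
        if (maxDocs < 1) {
            throw new IllegalArgumentException("maxDocs must be >= 1");
        }
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentBytes = 0;
        for (String doc : docs) {
            // Rough estimate of the document text only (action lines are ignored);
            // char count equals byte count for ASCII text such as base64.
            long size = doc.length();
            boolean full = current.size() >= maxDocs || currentBytes + size > maxBytes;
            if (!current.isEmpty() && full) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(doc);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> docs = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            docs.add("{\"id\":" + i + "}");
        }
        // Illustrative limits only: at most 1,000 docs or about 5 MB per batch.
        List<List<String>> batches = partition(docs, 1000, 5L * 1024 * 1024);
        System.out.println(batches.size()); // prints 3
    }
}
```

Limiting by size as well as by count matters here because the base64 fileContent dominates the payload, so a fixed document count alone can still produce very large requests.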
Note: The Bulk API supports only documents encoded in JSON or SMILE. Providing documents in any other format will result in an error.
Synchronous Operation:
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
Here client is the RestHighLevelClient instance, and execution is synchronous: the call blocks until the BulkResponse is returned.
Asynchronous Operation (Recommended Approach):
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
The asynchronous execution of a bulk request requires both the BulkRequest instance and an ActionListener instance to be passed to the asynchronous method.
Listener Example:
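ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        // called when the bulk request completes; individual items may still have failed
    }
    @Override
    public void onFailure(Exception e) {
        // called when the whole bulk request fails (e.g. connection or timeout errors)
    }
};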
The returned BulkResponse contains information about the executed operations and allows iterating over each result as follows:
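if (bulkResponse.hasFailures()) {
    // at least one item failed; bulkResponse.buildFailureMessage() summarizes the failures
}
for (BulkItemResponse bulkItemResponse : bulkResponse) {
    if (bulkItemResponse.isFailed()) {
        // getResponse() is null for a failed item, so inspect the failure instead
        BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
        continue;
    }
    DocWriteResponse itemResponse = bulkItemResponse.getResponse();
    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
        IndexResponse indexResponse = (IndexResponse) itemResponse;
    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}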
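When reindexing 100K files, the per-item loop over the BulkResponse can double as a retry list: collect the IDs of failed items so that only those documents are re-sent. Below is a sketch in plain Java; ItemResult is a hypothetical stand-in for the data BulkItemResponse exposes through getId(), isFailed() and getFailureMessage(), used so the logic runs without an Elasticsearch dependency.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RetrySketch {

    // Hypothetical stand-in for one BulkItemResponse (getId(), isFailed(), getFailureMessage()).
    static final class ItemResult {
        final String id;
        final boolean failed;
        final String failureMessage; // null when the item succeeded

        ItemResult(String id, boolean failed, String failureMessage) {
            this.id = id;
            this.failed = failed;
            this.failureMessage = failureMessage;
        }
    }

    // Collects the ids of failed items, in response order, so that only those
    // documents need to be rebuilt and re-sent in a follow-up bulk request.
    static List<String> failedIds(List<ItemResult> results) {
        List<String> ids = new ArrayList<>();
        for (ItemResult r : results) {
            if (r.failed) {
                ids.add(r.id);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<ItemResult> results = Arrays.asList(
                new ItemResult("1", false, null),
                new ItemResult("2", true, "simulated rejection"),
                new ItemResult("3", false, null));
        System.out.println(failedIds(results)); // prints [2]
    }
}
```

In practice you would probably retry only transient failures (for example rejections under load) and log the rest, since a mapping error will fail again on every retry.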
The following arguments can optionally be provided:
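request.timeout(TimeValue.timeValueMinutes(2)); // timeout to wait for the bulk request to be performed, as a TimeValue
request.timeout("2m");                           // the same timeout, given as a String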
I hope this helps.