Elasticsearch bulk insert with NEST returns es_rejected_execution_exception
Problem description
I am trying to do a bulk insert into Elasticsearch using the .NET API, and this is the error I get while performing the operation:
Error {Type: es_rejected_execution_exception Reason: "rejected execution of org.elasticsearch.transport.TransportService$6@604b47a4 on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51f4f734[Running, pool size = 4, active threads = 4, queued tasks = 50, completed tasks = 164]]" CausedBy: ""} Nest.BulkError
Is this caused by low disk space on my system, or is the bulk insert function itself not working? My NEST version is 5.0 and my Elasticsearch version is also 5.0.
Code for the bulk insert logic:
public void bulkInsert(List<BaseData> recordList, List<String> listOfIndexName) {
    BulkDescriptor descriptor = new BulkDescriptor();
    foreach (var j in Enumerable.Range(0, recordList.Count)) {
        descriptor.Index<BaseData>(op => op.Document(recordList[j])
                                            .Index(listOfIndexName[j]));
    }
    var result = clientConnection.Bulk(descriptor);
}
Recommended answer
As Val said in the comments, you're likely sending more data at a time than your cluster can handle. The error message itself points to this: all 4 threads in the bulk thread pool are active and its queue of 50 is full, so further bulk work is rejected. It looks like you might be trying to send all your documents in one bulk request, which may not work for a large number of documents or for large documents.
With _bulk, you need to send your data to the cluster in several bulk requests and find the optimum number of documents to send in each request, as well as the number of bulk requests that you can send concurrently to your cluster.
There are no hard and fast rules for the optimum size, because it varies with the complexity of your documents, how they are analyzed, the cluster hardware, cluster settings, index settings, and so on.
The best thing to do is start with a reasonable number, say 500 documents in one request (or some number that makes sense in your context), and then go from there. Calculating the total size in bytes of each bulk request is also a good approach. If performance and throughput are insufficient, increase the number of documents, the request byte size, or the number of concurrent requests until you start seeing es_rejected_execution_exception.
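As a rough illustration of splitting the work into fixed-size batches, here is a minimal sketch in plain C#. The `BulkBatching` class and `InBatches` method are hypothetical names introduced for this example; they are not part of NEST, and the batch size of 500 is only the starting point suggested above.

```csharp
using System;
using System.Collections.Generic;

public static class BulkBatching
{
    // Hypothetical helper (not a NEST API): lazily splits a sequence into
    // consecutive batches of at most batchSize items, preserving order.
    public static IEnumerable<List<T>> InBatches<T>(IEnumerable<T> source, int batchSize)
    {
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize));
        }

        var batch = new List<T>(batchSize);
        foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                // Hand the full batch to the caller and start a fresh one.
                yield return batch;
                batch = new List<T>(batchSize);
            }
        }

        // Emit the final, partially filled batch if there is one.
        if (batch.Count > 0)
        {
            yield return batch;
        }
    }
}
```

In the question's bulkInsert method, this would mean building one BulkDescriptor per batch of indices and calling clientConnection.Bulk once per batch, instead of once for the whole list.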
NEST 5.x ships with a handy helper that makes bulk requests much easier, using an IObservable<T> and the Observable design pattern:
void Main()
{
    var client = new ElasticClient();

    // can cancel the operation by calling .Cancel() on this
    var cancellationTokenSource = new CancellationTokenSource();

    // set up the bulk all observable
    var bulkAllObservable = client.BulkAll(GetDocuments(), ba => ba
        // number of concurrent requests
        .MaxDegreeOfParallelism(8)
        // in case of 429 response, how long we should wait before retrying
        .BackOffTime(TimeSpan.FromSeconds(5))
        // in case of 429 response, how many times to retry before failing
        .BackOffRetries(2)
        // number of documents to send in each request
        .Size(500)
        .Index("index-name")
        .RefreshOnCompleted(),
        cancellationTokenSource.Token
    );

    var waitHandle = new ManualResetEvent(false);
    Exception ex = null;

    // what to do on each call, when an exception is thrown, and
    // when the bulk all completes
    var bulkAllObserver = new BulkAllObserver(
        onNext: bulkAllResponse =>
        {
            // do something after each bulk request
        },
        onError: exception =>
        {
            // do something with exception thrown
            ex = exception;
            waitHandle.Set();
        },
        onCompleted: () =>
        {
            // do something when all bulk operations complete
            waitHandle.Set();
        });

    bulkAllObservable.Subscribe(bulkAllObserver);

    // wait for handle to be set.
    waitHandle.WaitOne();

    if (ex != null)
    {
        throw ex;
    }
}

// Getting documents should be lazily enumerated collection ideally
public static IEnumerable<Document> GetDocuments()
{
    return Enumerable.Range(1, 10000).Select(x =>
        new Document
        {
            Id = x,
            Name = $"Document {x}"
        }
    );
}

public class Document
{
    public int Id { get; set; }
    public string Name { get; set; }
}
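The BackOffTime and BackOffRetries comments above describe waiting and retrying after a rejected request. If you issue bulk requests by hand rather than through BulkAll, the same idea might be sketched as below. This is not how NEST implements it; the `BackOff` class, the `Retry` method, and the `attempt` callback are hypothetical names, and the callback is assumed to return false when the request was rejected.

```csharp
using System;
using System.Threading;

public static class BackOff
{
    // Hypothetical helper (not a NEST API): calls attempt() once, then up to
    // backOffRetries more times, sleeping backOffTime between attempts.
    // Returns true as soon as an attempt succeeds, false if all of them fail.
    public static bool Retry(Func<bool> attempt, int backOffRetries, TimeSpan backOffTime)
    {
        for (var tries = 0; ; tries++)
        {
            if (attempt())
            {
                return true;
            }

            if (tries >= backOffRetries)
            {
                return false;
            }

            // Give the cluster time to drain its bulk queue before retrying.
            Thread.Sleep(backOffTime);
        }
    }
}
```

A fixed wait mirrors the settings shown in the BulkAll example; growing the wait on each retry is a common variation when rejections persist.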