Elasticsearch bulk insert with NEST returns es_rejected_execution_exception


Question

I am trying to do a bulk insert into Elasticsearch using the .NET API, and this is the error I get when performing the operation:

Error   {Type: es_rejected_execution_exception Reason: "rejected execution of org.elasticsearch.transport.TransportService$6@604b47a4 on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51f4f734[Running, pool size = 4, active threads = 4, queued tasks = 50, completed tasks = 164]]" CausedBy: ""}   Nest.BulkError

Is this due to low disk space on my system, or is the bulk insert function itself not working? My NEST version is 5.0 and my Elasticsearch version is also 5.0.

Bulk insert logic:

public void bulkInsert(List<BaseData> recordList, List<String> listOfIndexName) {
    BulkDescriptor descriptor = new BulkDescriptor();            
    foreach (var j in Enumerable.Range(0, recordList.Count)) {
        descriptor.Index<BaseData>(op => op.Document(recordList[j])
                                           .Index(listOfIndexName[j]));
    }
    var result = clientConnection.Bulk(descriptor);
}

Recommended answer

As Val said in the comments, you're likely sending more data at a time than your cluster can handle. The error itself points this way: the bulk thread pool reports all 4 threads active and its queue of 50 already full, so further bulk work is rejected. It looks like you might be trying to send all your documents in one bulk request, which may not work for a large number of documents or for large documents.

With _bulk, you need to send your data to the cluster in several bulk requests and find the optimum number of documents to send in each request, as well as the number of bulk requests you can send to your cluster concurrently.
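As a minimal sketch of splitting one large insert into several smaller requests, the helper below lazily cuts any sequence into batches of a fixed size. The names `BulkBatching` and `Batch` are hypothetical and not part of NEST; the batch size is something you would tune for your own cluster.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of NEST.
public static class BulkBatching
{
    // Lazily splits a sequence into consecutive batches of at most batchSize items.
    public static IEnumerable<List<T>> Batch<T>(IEnumerable<T> source, int batchSize)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        return BatchIterator(source, batchSize);
    }

    private static IEnumerable<List<T>> BatchIterator<T>(IEnumerable<T> source, int batchSize)
    {
        var batch = new List<T>(batchSize);
        foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                // Hand out the full batch and start a fresh one.
                yield return batch;
                batch = new List<T>(batchSize);
            }
        }

        // Emit the final, partially filled batch, if any.
        if (batch.Count > 0) yield return batch;
    }
}
```

In the question's bulkInsert method, each batch would get its own BulkDescriptor and its own call to clientConnection.Bulk, instead of one descriptor for the whole list. Because that method keeps index names in a parallel list, the documents and index names would need to be paired up before batching.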

There are no hard and fast rules here for the optimum size because it can vary depending on the complexity of your documents, how they are analyzed, the cluster hardware, cluster settings, index settings, etc.

The best thing to do is start with a reasonable number, say 500 documents (or some number that makes sense in your context) in one request, and then go from there. Calculating the total size in bytes of each bulk request is also a good approach to take. If performance and throughput are insufficient, increase the number of documents, the request byte size, or the number of concurrent requests until you start seeing es_rejected_execution_exception.
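To illustrate the byte-size approach, here is a rough sketch that caps each batch by both a document count and an estimated payload size. `BulkSizing`, `BatchBySize` and `Utf8Size` are hypothetical names, and the size estimate assumes you already have each document as a serialized JSON string. The real request is somewhat larger because of the bulk action metadata lines.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper, not part of NEST.
public static class BulkSizing
{
    // Groups items so that each batch holds at most maxDocs items and, where
    // possible, at most maxBytes of estimated payload. A single item larger
    // than maxBytes still gets a batch of its own.
    public static IEnumerable<List<T>> BatchBySize<T>(
        IEnumerable<T> source, int maxDocs, long maxBytes, Func<T, long> estimateBytes)
    {
        var batch = new List<T>();
        long batchBytes = 0;

        foreach (var item in source)
        {
            long size = estimateBytes(item);
            bool wouldOverflow = batch.Count > 0 &&
                (batch.Count >= maxDocs || batchBytes + size > maxBytes);

            if (wouldOverflow)
            {
                // Close the current batch before adding this item.
                yield return batch;
                batch = new List<T>();
                batchBytes = 0;
            }

            batch.Add(item);
            batchBytes += size;
        }

        if (batch.Count > 0) yield return batch;
    }

    // Rough estimate: UTF-8 byte count of an already serialized JSON document.
    public static long Utf8Size(string json) => Encoding.UTF8.GetByteCount(json);
}
```

Starting with small limits and raising them while watching for rejections gives a way to look for the point where the cluster stops keeping up.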

NEST 5.x ships with a handy helper to make bulk requests much easier, using an IObservable<T> and the Observable design pattern:

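// namespaces needed when running this outside LINQPad
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Nest;

void Main()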
{
    var client = new ElasticClient();

    // can cancel the operation by calling .Cancel() on this
    var cancellationTokenSource = new CancellationTokenSource();

    // set up the bulk all observable
    var bulkAllObservable = client.BulkAll(GetDocuments(), ba => ba
        // number of concurrent requests
        .MaxDegreeOfParallelism(8)
        // in case of 429 response, how long we should wait before retrying
        .BackOffTime(TimeSpan.FromSeconds(5))
        // in case of 429 response, how many times to retry before failing
        .BackOffRetries(2)
        // number of documents to send in each request
        .Size(500)
        .Index("index-name")
        .RefreshOnCompleted(),
        cancellationTokenSource.Token
    );

    var waitHandle = new ManualResetEvent(false);
    Exception ex = null;

    // what to do on each call, when an exception is thrown, and 
    // when the bulk all completes
    var bulkAllObserver = new BulkAllObserver(
        onNext: bulkAllResponse =>
        {
            // do something after each bulk request
        },
        onError: exception =>
        {
            // do something with exception thrown
            ex = exception;
            waitHandle.Set();
        },
        onCompleted: () =>
        {
            // do something when all bulk operations complete
            waitHandle.Set();
        });

    bulkAllObservable.Subscribe(bulkAllObserver);

    // wait for handle to be set.
    waitHandle.WaitOne();

    if (ex != null)
    {
        throw ex;
    }
}

// Ideally, the documents should come from a lazily enumerated collection
public static IEnumerable<Document> GetDocuments()
{
    return Enumerable.Range(1, 10000).Select(x =>
        new Document
        {
            Id = x,
            Name = $"Document {x}" 
        }
    );
}

public class Document
{
    public int Id { get; set; }
    public string Name { get; set; }
}
