Multithreading within a thread lock

Problem description

I am working on speeding up some processes that publish large bulk sets of records (mostly in the millions) to Elasticsearch. In my C# code I have already implemented a multi-threaded solution using TPL Dataflow, scaffolded below:

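var fetchRecords = new TransformBlock<string, List<?>>(start => { ... });
var sendRecordsToElastic = new ActionBlock<List<?>>(records => sendBulkRequest(records));

fetchRecords.LinkTo(sendRecordsToElastic, new DataflowLinkOptions { PropagateCompletion = true });

fetchRecords.Post("Start");
fetchRecords.Complete();                  // signal that no more input will be posted
sendRecordsToElastic.Completion.Wait();   // wait for the last bulk request to finish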
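For reference, here is one way the scaffold above could be filled in so that it runs on its own. It is a sketch under assumptions that are not in the question: records are plain int ids, the fetch logic is replaced by Enumerable.Range, the bulk request is replaced by a counter, and the single "Start" message is fanned out into pages with TransformManyBlock (instead of TransformBlock) so that each page becomes its own message. The page size of 500 and MaxDegreeOfParallelism of 4 are illustrative values, and Enumerable.Chunk needs .NET 6 or later. The point of the shape is that concurrency comes from the ActionBlock's own options rather than from threads started inside the delegate.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Fetch step (hypothetical): the single "Start" message fans out into pages of
// 500 record ids; TransformManyBlock emits each page as its own message.
var fetchRecords = new TransformManyBlock<string, List<int>>(start =>
    Enumerable.Range(0, 10_000)
              .Chunk(500)                       // .NET 6+: split into arrays of 500
              .Select(page => page.ToList()));

int sent = 0;

// Send step (hypothetical): one simulated "bulk request" per page, which just counts.
// MaxDegreeOfParallelism lets Dataflow run up to 4 pages at once, so the delegate
// itself needs no extra threads and no lock.
var sendRecordsToElastic = new ActionBlock<List<int>>(
    page => Interlocked.Add(ref sent, page.Count),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

fetchRecords.LinkTo(sendRecordsToElastic, new DataflowLinkOptions { PropagateCompletion = true });

fetchRecords.Post("Start");
fetchRecords.Complete();                  // no more input will arrive
sendRecordsToElastic.Completion.Wait();   // completion propagates from fetch to send

Console.WriteLine($"Sent {sent} records");
```

Swapping the counter for a real bulk call keeps the same structure: each invocation of the send delegate handles one page, and the block decides how many invocations run concurrently.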

Then there is the bulk-send call I want to implement:

public IBulkResponse sendBulkRequest(List<?> records)
{
    lock(SomeStaticObject)
    {
       // Execute several new threads to send records in bulk
    }
}

My question is about the practicality of executing additional threads within a lock that is part of a Dataflow pipeline.

Is this OK? Could I run into any problems with performance, execution, cache/memory misses, etc.?
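To see what the lock in sendBulkRequest does to the pipeline, the sketch below is a simulation (not Elasticsearch code) that posts 20 fake bulk requests to an ActionBlock allowing four concurrent workers and records the peak number of requests in flight. The names gate, MaxInFlight and SimulatedBulkRequest are hypothetical; gate stands in for SomeStaticObject and Thread.Sleep stands in for network latency. With the lock the peak is always 1, because every worker queues on the same monitor; without it the peak can reach 4.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

object gate = new object(); // stands in for SomeStaticObject

// Posts 20 simulated bulk requests to an ActionBlock that allows 4 concurrent
// workers and returns the peak number of requests that were in flight at once.
int MaxInFlight(bool useLock)
{
    int inFlight = 0, peak = 0;

    void SimulatedBulkRequest()
    {
        int now = Interlocked.Increment(ref inFlight);
        // Raise the recorded peak if this request pushed concurrency higher.
        int seen;
        while (now > (seen = Volatile.Read(ref peak)) &&
               Interlocked.CompareExchange(ref peak, now, seen) != seen) { }
        Thread.Sleep(20); // simulated network latency
        Interlocked.Decrement(ref inFlight);
    }

    var sender = new ActionBlock<int>(
        _ =>
        {
            if (useLock) { lock (gate) { SimulatedBulkRequest(); } }
            else { SimulatedBulkRequest(); }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

    for (int i = 0; i < 20; i++) sender.Post(i);
    sender.Complete();
    sender.Completion.Wait();
    return peak;
}

Console.WriteLine($"peak with lock:    {MaxInFlight(useLock: true)}");
Console.WriteLine($"peak without lock: {MaxInFlight(useLock: false)}");
```

In other words, a static lock turns a parallel block into a serial one, and threads started inside it only add overhead on top of that. If those inner threads try to acquire SomeStaticObject themselves while the outer call holds it and waits for them, the call deadlocks, because a Monitor lock is only re-entrant on the thread that owns it. If the goal is to bound the number of concurrent bulk requests, MaxDegreeOfParallelism on the ActionBlock expresses that directly.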

Any insight would be gladly accepted.

Recommended answer

You may want to use BulkAll from the NEST client here, which implements the observable pattern to make concurrent bulk requests to Elasticsearch. Here's an example:

void Main()
{   
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
    var connectionSettings = new ConnectionSettings(pool);

    var client = new ElasticClient(connectionSettings);
    var indexName = "bulk-index";

    if (client.IndexExists(indexName).Exists)
        client.DeleteIndex(indexName);

    client.CreateIndex(indexName, c => c
        .Settings(s => s
            .NumberOfShards(3)
            .NumberOfReplicas(0)
        )
        .Mappings(m => m
            .Map<DeviceStatus>(p => p.AutoMap())
        )
    );

    var size = 500;

    // set up the observable
    var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
        .Index(indexName)
        .MaxDegreeOfParallelism(4)
        .RefreshOnCompleted()
        .Size(size)
    );

    var countdownEvent = new CountdownEvent(1);

    Exception exception = null;

    // set up an observer. Delegates passed are:
    // 1. onNext
    // 2. onError
    // 3. onCompleted
    var bulkAllObserver = new BulkAllObserver(
        response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries"),
        ex => 
        {
            // capture exception for throwing outside Observer.
            // You may decide to do something different here
            exception = ex;
            countdownEvent.Signal();
        },
        () => 
        {
            Console.WriteLine("Finished");
            countdownEvent.Signal();
        });

    // subscribe to the observable          
    bulkAllObservable.Subscribe(bulkAllObserver);

    // wait indefinitely for it to finish. May want to put a
    // max timeout on this  
    countdownEvent.Wait();

    if (exception != null) 
    {
        throw exception;
    }
}

// lazily enumerated collection
private static IEnumerable<DeviceStatus> GetDeviceStatus()
{
    for (var i = 0; i < DocumentCount; i++)
        yield return new DeviceStatus(i); 
}

private const int DocumentCount = 20000;

public class DeviceStatus
{
    public DeviceStatus(int id) => Id = id;
    public int Id {get;set;}
}
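The observer in the example above follows a common pattern for blocking on a callback-based API: a CountdownEvent(1) signalled by whichever terminal callback (onError or onCompleted) fires, plus an exception captured on the background thread and rethrown on the caller's thread. The sketch below reproduces just that pattern without Elasticsearch. StartFakeBulkAll and RunAndWait are hypothetical names; StartFakeBulkAll is a stand-in for BulkAll plus BulkAllObserver, not NEST's implementation. It also applies the maximum timeout that the example's comment suggests.

```csharp
using System;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for BulkAll + BulkAllObserver: reports each page on a
// background thread, then calls exactly one of onError / onCompleted.
void StartFakeBulkAll(int pages, int failAtPage,
                      Action<int> onNext, Action<Exception> onError, Action onCompleted)
{
    Task.Run(() =>
    {
        for (int page = 0; page < pages; page++)
        {
            if (page == failAtPage)
            {
                onError(new InvalidOperationException($"page {page} failed"));
                return;
            }
            onNext(page);
        }
        onCompleted();
    });
}

// Same synchronization as the answer's example: one signal from either terminal
// callback, and any captured exception rethrown on the calling thread.
int RunAndWait(int pages, int failAtPage, TimeSpan timeout)
{
    var done = new CountdownEvent(1);
    Exception? error = null;
    int seen = 0;

    StartFakeBulkAll(pages, failAtPage,
        onNext: _ => Interlocked.Increment(ref seen),
        onError: ex => { error = ex; done.Signal(); },
        onCompleted: () => done.Signal());

    if (!done.Wait(timeout))
        throw new TimeoutException("bulk indexing did not finish in time");
    if (error != null)
        ExceptionDispatchInfo.Capture(error).Throw(); // rethrow, keeping the original stack trace
    return seen;
}

Console.WriteLine(RunAndWait(pages: 10, failAtPage: -1, timeout: TimeSpan.FromSeconds(5)));
```

ExceptionDispatchInfo is used instead of a plain throw of the captured exception so that the rethrown exception keeps the stack trace from the background thread.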

If you don't need to do anything special in the observer, you can call the .Wait() method on the observable instead, which blocks until indexing finishes or the given timeout elapses:

var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
    .Index(indexName)
    .MaxDegreeOfParallelism(4)
    .RefreshOnCompleted()
    .Size(size)
)
.Wait(
    TimeSpan.FromHours(1), 
    response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries")
);

BulkAll, ScrollAll and Reindex all have observable methods like this. (There is also ReindexOnServer, which reindexes within Elasticsearch and maps to the Reindex API; the Reindex method predates it.)
