Thread.Sleep blocking parallel execution of tasks

Question

I'm calling a worker method that calls to the database that then iterates and yield returns values for parallel processing. To prevent it from hammering the database, I have a Thread.Sleep in there to pause the execution to the DB. However, this appears to be blocking executions that are still occurring in the Parallel.ForEach. What is the best way to achieve this to prevent blocking?

private void ProcessWorkItems()
{
    _cancellation = new CancellationTokenSource();
    _cancellation.Token.Register(() => WorkItemRepository.ResetAbandonedWorkItems());

    Task.Factory.StartNew(() =>
        Parallel.ForEach(GetWorkItems().AsParallel().WithDegreeOfParallelism(10), workItem =>
        {
            var x = ItemFactory(workItem);
            x.doWork();
        }), _cancellation.Token);
}

private IEnumerable<IAnalysisServiceWorkItem> GetWorkItems()
{
    while (!_cancellation.IsCancellationRequested)
    {
        var workItems = WorkItemRepository.GetItemList(); //database call

        workItems.ForEach(item =>
        {
            item.QueueWorkItem(WorkItemRepository);
        });

        foreach (var item in workItems)
        {
            yield return item;
        }

        if (workItems.Count == 0)
        {
            Thread.Sleep(30000); //sleep this thread for 30 seconds if no work items.
        }
    }

    yield break;
}



Edit:

I changed it to include the answer and it's still not working as I'm expecting. I added the .AsParallel().WithDegreeOfParallelism(10) to the GetWorkItems() call. Are my expectations incorrect when I think that Parallel should continue to execute even though the base thread is sleeping?

Example: I have 15 items, it iterates and grabs 10 items and starts them. As each one finishes, it asks for another one from GetWorkItems until it tries to ask for a 16th item. At that point it should stop trying to grab more items but should continue processing items 11-15 until those are complete. Is that how parallel should be working? Because it's not currently doing that. What it's currently doing is when it completes 6, it locks the subsequent 10 still being run in the Parallel.ForEach.

Recommended answer

I would suggest that you create a BlockingCollection (a queue) of work items, and a timer that calls the database every 30 seconds to populate it. Something like:

BlockingCollection<WorkItem> WorkItems = new BlockingCollection<WorkItem>();

And on initialization:

System.Threading.Timer WorkItemTimer = new Timer((s) =>
    {
        var items = WorkItemRepository.GetItemList(); //database call
        foreach (var item in items)
        {
            WorkItems.Add(item);
        }
    }, null, 30000, 30000);

That will query the database for items every 30 seconds.

For scheduling the work items to be processed, you have a number of different solutions. The closest to what you have would be this:

WorkItem item;

while (WorkItems.TryTake(out item, Timeout.Infinite, _cancellation.Token))
{
    Task.Factory.StartNew((s) =>
        {
            var myItem = (WorkItem)s;
            // process here
        }, item);
}

That eliminates blocking in any of the threads, and lets the TPL decide how best to allocate the parallel tasks.

Edit:

Actually, closer to what you have is:

foreach (var item in WorkItems.GetConsumingEnumerable(_cancellation.Token))
{
    // start task to process item
}

You might be able to use:

Parallel.ForEach(WorkItems.GetConsumingEnumerable(_cancellation.Token).AsParallel ...

I don't know if that will work or how well. Might be worth a try . . .
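One thing worth checking if you try the Parallel.ForEach route: as far as I know, the default partitioner pulls items from an IEnumerable in chunks, so a worker that blocks while fetching its next item can hold up items that were already available. That would be consistent with the stall described in the question, though I have not verified it against that exact code. A minimal sketch, assuming int work items and a hypothetical ConsumingParallelLoop.Drain helper, that asks for one item at a time via EnumerablePartitionerOptions.NoBuffering (available from .NET 4.5):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumingParallelLoop
{
    // Hypothetical helper: drains `queue` with at most `maxParallel`
    // concurrent workers and returns the number of items processed.
    // It returns once the collection is marked complete and empty.
    public static int Drain(BlockingCollection<int> queue, int maxParallel,
                            CancellationToken token)
    {
        int processed = 0;

        // NoBuffering makes the partitioner hand out one item at a time
        // instead of reading chunks from the enumerable ahead of the workers.
        var source = Partitioner.Create(
            queue.GetConsumingEnumerable(token),
            EnumerablePartitionerOptions.NoBuffering);

        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxParallel,
            CancellationToken = token
        };

        Parallel.ForEach(source, options, item =>
        {
            // Placeholder for the real work, e.g. ItemFactory(item).doWork().
            Interlocked.Increment(ref processed);
        });

        return processed;
    }
}
```

Note that this drops the AsParallel() call: ParallelOptions.MaxDegreeOfParallelism already caps the number of workers, and running a PLINQ query in front of Parallel.ForEach adds a second layer of buffering.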

In general, what I'm suggesting is that you treat this as a producer/consumer application, with the producer being the thread that queries the database periodically for new items. My example queries the database once every N (30 in this case) seconds, which will work well if, on average, you can empty your work queue every 30 seconds. That will give an average latency of less than a minute from the time an item is posted to the database until you have the results.
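To make the producer/consumer shape concrete, here is a rough sketch that puts the pieces together. The PollingPipeline class, the fetch delegate (standing in for the database call) and the process delegate (standing in for doWork()) are all names I made up for illustration; string work items are an assumption too.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class PollingPipeline : IDisposable
{
    private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
    private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
    private readonly Func<IList<string>> _fetch;  // hypothetical stand-in for the database call
    private readonly Action<string> _process;     // hypothetical stand-in for doWork()
    private readonly Timer _timer;
    private readonly Task _consumer;

    public PollingPipeline(Func<IList<string>> fetch, Action<string> process, TimeSpan interval)
    {
        _fetch = fetch;
        _process = process;
        _consumer = Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
        // Producer: poll immediately, then once per interval.
        _timer = new Timer(Produce, null, TimeSpan.Zero, interval);
    }

    // Timer callback. Callbacks can overlap if a poll takes longer than the interval.
    private void Produce(object state)
    {
        if (_cancellation.IsCancellationRequested) return;
        foreach (var item in _fetch())
        {
            _queue.Add(item);
        }
    }

    // Consumer: blocks only while the queue is empty, never while items are running.
    private void Consume()
    {
        try
        {
            foreach (var item in _queue.GetConsumingEnumerable(_cancellation.Token))
            {
                var current = item; // copy for the closure (matters before C# 5)
                Task.Factory.StartNew(() => _process(current));
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown path.
        }
    }

    public void Dispose()
    {
        _timer.Dispose();        // stop scheduling new polls
        _cancellation.Cancel();  // unblock the consumer loop
        _consumer.Wait();
    }
}
```

The point of the design is that waiting for new work happens in one dedicated consumer loop, so an empty database never stalls items that are already being processed.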

You can increase the polling frequency (and thus reduce the latency), but that will cause more database traffic.

You can get fancier with it, too. For example, if you poll the database after 30 seconds and you get a huge number of items, then it's likely that you'll be getting more soon, and you'll want to poll again in 15 seconds (or less). Conversely, if you poll the database after 30 seconds and get nothing, then you can probably wait longer before you poll again.

You can set up that kind of adaptive polling using a one-shot timer. That is, you specify -1 (Timeout.Infinite) for the last parameter (the period) when you create the timer, which causes it to fire only once. Your timer callback figures out how long to wait before the next poll and calls Timer.Change to re-arm the timer with the new value.
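A sketch of that one-shot pattern might look like the following. The AdaptivePoller class, the poll delegate (assumed to run the database query and return how many items it fetched), and the thresholds and bounds are all illustrative assumptions, not values from the question.

```csharp
using System;
using System.Threading;

public sealed class AdaptivePoller : IDisposable
{
    // Illustrative values only; tune them for your workload.
    public const int MinDelayMs = 5000;
    public const int MaxDelayMs = 120000;
    public const int BusyThreshold = 100;

    private readonly Func<int> _poll; // hypothetical: queries the DB, returns item count
    private readonly Timer _timer;
    private int _delayMs;

    public AdaptivePoller(Func<int> poll, int initialDelayMs)
    {
        _poll = poll;
        _delayMs = initialDelayMs;
        // Create the timer disarmed, then arm it once. A period of
        // Timeout.Infinite (-1) means it fires a single time per Change call.
        _timer = new Timer(OnTick, null, Timeout.Infinite, Timeout.Infinite);
        _timer.Change(initialDelayMs, Timeout.Infinite);
    }

    // Halve the wait after a busy poll, double it after an empty one,
    // keep it otherwise, and clamp the result to [MinDelayMs, MaxDelayMs].
    public static int NextDelay(int currentMs, int itemCount)
    {
        int next = currentMs;
        if (itemCount >= BusyThreshold) next = currentMs / 2;
        else if (itemCount == 0) next = currentMs * 2;
        return Math.Max(MinDelayMs, Math.Min(MaxDelayMs, next));
    }

    private void OnTick(object state)
    {
        int fetched = _poll();
        _delayMs = NextDelay(_delayMs, fetched);
        try
        {
            // Re-arm the one-shot timer with the newly computed delay.
            _timer.Change(_delayMs, Timeout.Infinite);
        }
        catch (ObjectDisposedException)
        {
            // The poller was disposed while a poll was in flight.
        }
    }

    public void Dispose()
    {
        _timer.Dispose();
    }
}
```

Because the timer is re-armed only after the poll finishes, callbacks cannot overlap, which a fixed-period timer does not guarantee when a query runs long.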