如何使用 ConcurrentQueue 处理线程<T> [英] How to work threading with ConcurrentQueue<T>

查看:19
本文介绍了如何使用 ConcurrentQueue 处理线程<T>的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试找出使用队列的最佳方式.我有一个返回 DataTable 的进程.每个 DataTable 依次与前一个 DataTable 合并.有一个问题,在最终 BulkCopy (OutOfMemory) 之前保存的记录太多.

I am trying to figure out what the best way of working with a queue will be. I have a process that returns a DataTable. Each DataTable, in turn, is merged with the previous DataTable. There is one problem, too many records to hold until the final BulkCopy (OutOfMemory).

所以,我决定我应该立即处理每个传入的 DataTable.考虑 ConcurrentQueue<T>...但我不知道 WriteQueuedData() 方法如何知道将表出列并将其写入数据库.

So, I have determined that I should process each incoming DataTable immediately. Thinking about the ConcurrentQueue<T>...but I don't see how the WriteQueuedData() method would know to dequeue a table and write it to the database.

例如:

public class TableTransporter
{
    private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>();

    public TableTransporter()
    {
        tableQueue.OnItemQueued += new EventHandler(WriteQueuedData);   // no events available
    }

    public void ExtractData()
    {
        DataTable table;

        // perform data extraction
        tableQueue.Enqueue(table);
    }

    private void WriteQueuedData(object sender, EventArgs e)
    {
        BulkCopy(e.Table);
    }
}

我的第一个问题是,除了我实际上没有任何要订阅的事件这一事实之外,如果我异步调用 ExtractData() 这是否就是我所需要的全部?其次,对于 ConcurrentQueue<T> 的运行方式以及需要某种形式的触发器来与排队的对象异步工作,我是否遗漏了什么?

My first question is, aside from the fact that I don't actually have any events to subscribe to, if I call ExtractData() asynchronously will this be all that I need? Second, is there something I'm missing about the way ConcurrentQueue<T> functions and needing some form of trigger to work asynchronously with the queued objects?

更新我刚刚从 ConcurrentQueue<T> 派生了一个具有 OnItemQueued 事件处理程序的类.那么:

Update I have just derived a class from ConcurrentQueue<T> that has an OnItemQueued event handler. Then:

new public void Enqueue (DataTable Table)
{
    base.Enqueue(Table);
    OnTableQueued(new TableQueuedEventArgs(Table));
}

public void OnTableQueued(TableQueuedEventArgs table)
{
    EventHandler<TableQueuedEventArgs> handler = TableQueued;

    if (handler != null)
    {
        handler(this, table);
    }
}

对此实现有任何顾虑吗?

Any concerns about this implementation?

推荐答案

根据我对问题的理解,您遗漏了一些东西.

From my understanding of the problem, you are missing a few things.

并发队列是一种数据结构,旨在接受多个线程读取和写入队列,而无需显式锁定数据结构.(所有爵士乐都在幕后处理,或者集合以不需要锁定的方式实现.)

The concurrent queue is a data structure designed to accept multiple threads reading and writing to the queue without you needing to explicitly lock the data structure. (All that jazz is taken care of behind the scenes, or the collection is implemented in such a way that it doesn't need to take a lock.)

考虑到这一点,您尝试使用的模式似乎是生产者/消费者".首先,您有一些任务产生工作(并将项目添加到队列中).其次,您还有第二个任务消耗队列中的东西(并从队列中取出项目).

With that in mind, it looks like the pattern you are trying to use is the "Produce/Consumer". First, you have some tasks producing work (and adding items to the queue). And second you have a second task Consuming things from the queue (and dequeing items).

所以你真的需要两个线程:一个添加项目,第二个删除项目.因为您使用的是并发集合,所以可以有多个线程添加项目和多个线程删除项目.但显然,并发队列上的争用越多,就会越快成为瓶颈.

So really you want two threads: one adding items and a second removing items. Because you are using a concurrent collection, you can have multiple threads adding items and multiple threads removing items. But obviously the more contention you have on the concurrent queue the quicker that will become the bottleneck.

这篇关于如何使用 ConcurrentQueue 处理线程&lt;T&gt;的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆