Thread safety for DataTable


Problem Description


I have read this answer, ADO.NET DataTable/DataRow Thread Safety, but I can't understand some things. In particular, I can't understand point [2]. What kind of wrapper do I need to use? Can anyone give an example?

Also, I can't understand what the author means when talking about cascading locks and full locks. Please give an example of those too.

Solution

DataTable is simply not designed or intended for concurrent usage (in particular where there is any form of mutation involved). The advisable "wrapper" here would, in my view, be either:

  • remove the need to work on the DataTable concurrently (when involving mutation), or:
  • remove the DataTable, instead using a data-structure that either directly supports what you need (for example a concurrent collection), or which is much simpler and can be trivially synchronized (either exclusive or reader/writer)

Basically: change the problem.
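As a sketch of the second option above (removing the DataTable in favor of a concurrent collection): a ConcurrentBag of plain row objects absorbs writes from Parallel.ForEach without any explicit locking. The ParsedRow type and the comma-splitting parse here are illustrative stand-ins, not from the question:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Illustrative row type standing in for whatever the DataTable columns held
public sealed class ParsedRow
{
    public int Id;
    public string Text;
    public ParsedRow(int id, string text) { Id = id; Text = text; }
}

public static class Demo
{
    public static ConcurrentBag<ParsedRow> ParseAll(string[] lines)
    {
        var rows = new ConcurrentBag<ParsedRow>();
        Parallel.ForEach(lines, line =>
        {
            // ConcurrentBag<T>.Add is thread-safe; no lock(table) needed
            var parts = line.Split(',');
            rows.Add(new ParsedRow(int.Parse(parts[0]), parts[1]));
        });
        return rows;
    }

    public static void Main()
    {
        var rows = ParseAll(new[] { "1,a", "2,b", "3,c" });
        Console.WriteLine(rows.Count);          // 3
        Console.WriteLine(rows.Sum(r => r.Id)); // 6
    }
}
```

Note that a ConcurrentBag makes no ordering guarantee either; it simply moves the synchronization inside the collection.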


From comments:

The code looks like:

Parallel.ForEach(strings, str =>
{
    DataRow row;
    lock (table) {
        row = table.NewRow();
    }
    MyParser.Parse(str, out row);
    lock (table) {
        table.Rows.Add(row);
    }
});

I can only hope that out row is a typo here, since as written it won't actually populate the row created via NewRow(); but: if you absolutely have to use that approach, you can't use NewRow, as the pending row is effectively shared. Your best bet would be:

Parallel.ForEach(strings, str => {
    object[] values = MyParser.Parse(str);
    lock (table) {
        table.Rows.Add(values);
    }
});

The important change in the above is that the lock covers the entire new row process. Note that you will have no guarantee of order when using Parallel.ForEach like this, so it is important that the final order does not need to match exactly (which shouldn't be a problem if the data includes a time component).
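If the final order does matter, one workaround (my own addition, not part of the answer above) is to capture each item's source position using the indexed Parallel.ForEach overload and sort once at the end; the parse still happens outside the lock, as in the answer's version:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class OrderedParse
{
    // Parses in parallel, recording each input's source index so the
    // results can be restored to the original order afterwards.
    public static List<string> ParseInOrder(string[] strings, Func<string, string> parse)
    {
        var indexed = new List<(long Index, string Value)>();
        Parallel.ForEach(strings, (str, state, index) =>
        {
            var value = parse(str); // the expensive work stays outside the lock
            lock (indexed)
            {
                indexed.Add((index, value));
            }
        });
        return indexed.OrderBy(p => p.Index).Select(p => p.Value).ToList();
    }

    public static void Main()
    {
        var result = ParseInOrder(new[] { "a", "b", "c", "d" }, s => s.ToUpper());
        Console.WriteLine(string.Join(",", result)); // A,B,C,D
    }
}
```

The final sort costs O(n log n), which is usually negligible next to the parsing itself.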

However! I still think you are approaching this the wrong way: for parallelism to be relevant, it must be non-trivial data. If you have non-trivial data, you really don't want to have to buffer it all in memory. I strongly suggest doing something like the following, which will work fine on a single thread:

using(var bcp = new SqlBulkCopy(connectionString)) // SqlBulkCopy needs a connection or connection string
using(var reader = ObjectReader.Create(ParseFile(path))) // ObjectReader is from the FastMember NuGet package
{
    bcp.DestinationTable = "MyLog";
    bcp.WriteToServer(reader);    
}
...
static IEnumerable<LogRow> ParseFile(string path)
{
    using(var reader = File.OpenText(path))
    {
        string line;
        while((line = reader.ReadLine()) != null)
        {
            yield return new LogRow {
                // TODO: populate the row from line here
            };
        }
    }
}
...
public sealed class LogRow {
    /* define your schema here */
}

Advantages:

  • no buffering - this is a fully streaming operation (yield return does not put things into a list or similar)
  • for that reason, the rows can start streaming immediately without needing to wait for the entire file to be pre-processed first
  • no memory saturation issues
  • no threading complications / overheads
  • you get to preserve the original order (not usually critical, but nice)
  • you are only constrained by how fast you can read the original file, which is typically faster on a single thread than it is from multiple threads (contention on a single IO device is just overhead)
  • avoids all the overheads of DataTable, which is overkill here - because it is so flexible it has significant overheads
  • read (from the log file) and write (to the database) are now concurrent rather than sequential
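The "no buffering" point above is a property of iterator methods in general: a yield return sequence produces each element only when the consumer asks for it. A minimal sketch (separate from the log-parsing code, with an instrumentation counter added purely for illustration):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class StreamingDemo
{
    // Counts how many items have actually been created (for illustration only)
    public static int Produced;

    public static IEnumerable<int> Numbers(int n)
    {
        for (int i = 0; i < n; i++)
        {
            Produced++;      // only runs when the consumer pulls the next item
            yield return i;
        }
    }

    public static void Main()
    {
        var seq = Numbers(1_000_000);  // nothing produced yet: deferred execution
        Console.WriteLine(Produced);   // 0
        var firstThree = seq.Take(3).ToList();
        Console.WriteLine(Produced);   // 3 - only what was consumed
    }
}
```

This is exactly why ParseFile above can feed SqlBulkCopy while the file is still being read: no list of a million rows ever exists in memory.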

I do a lot of things like ^^^ in my own work, and from experience it is usually at least twice as fast as populating a DataTable in memory first.


And finally - here's an example of an IEnumerable<T> implementation that accepts concurrent readers and writers without requiring everything to be buffered in memory - which would allow multiple threads to parse the data (calling Add and finally Close) with a single thread for SqlBulkCopy via the IEnumerable<T> API:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Acts as a container for concurrent read/write flushing (for example, parsing a
/// file while concurrently uploading the contents); supports any number of concurrent
/// writers and readers, but note that each item will only be returned once (and once
/// fetched, is discarded). It is necessary to Close() the bucket after adding the last
/// of the data, otherwise any iterators will never finish
/// </summary>
class ThreadSafeBucket<T> : IEnumerable<T>
{
    private readonly Queue<T> queue = new Queue<T>();

    public void Add(T value)
    {
        lock (queue)
        {
            if (closed) // no more data once closed
                throw new InvalidOperationException("The bucket has been marked as closed");

            queue.Enqueue(value);
            if (queue.Count == 1)
            { // someone may be waiting for data
                Monitor.PulseAll(queue);
            }
        }
    }

    public void Close()
    {
        lock (queue)
        {
            closed = true;
            Monitor.PulseAll(queue);
        }
    }
    private bool closed;

    public IEnumerator<T> GetEnumerator()
    {
        while (true)
        {
            T value;
            lock (queue)
            {
                if (queue.Count == 0)
                {
                    // no data; should we expect any?
                    if (closed) yield break; // nothing more ever coming

                    // else wait to be woken, and redo from start
                    Monitor.Wait(queue);
                    continue;
                }
                value = queue.Dequeue();
            }
            // yield it **outside** of the lock
            yield return value;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

static class Program
{
    static void Main()
    {
        var bucket = new ThreadSafeBucket<int>();
        int expectedTotal = 0;
        ThreadPool.QueueUserWorkItem(delegate
        {
            int count = 0, sum = 0;
            foreach(var item in bucket)
            {
                count++;
                sum += item;
                if ((count % 100) == 0)
                    Console.WriteLine("After {0}: {1}", count, sum);
            }
            Console.WriteLine("Total over {0}: {1}", count, sum);
        });
        Parallel.For(0, 5000,
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            i => {
                bucket.Add(i);
                Interlocked.Add(ref expectedTotal, i);
            }
        );
        Console.WriteLine("all data added; closing bucket");
        bucket.Close();
        Thread.Sleep(100);
        Console.WriteLine("expecting total: {0}",
            Interlocked.CompareExchange(ref expectedTotal, 0, 0));
        Console.ReadLine();


    }

}
