使用TPL生产者消费者模式,在.NET 4.0中的任务 [英] Producer Consumer model using TPL, Tasks in .net 4.0

查看:304
本文介绍了使用TPL生产者消费者模式,在.NET 4.0中的任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个相当大的XML文件(1-2GB左右)。

要求是坚持XML数据到数据库中。 目前,这是在3个步骤来实现。

  1. 读取大型文件使用较少的内存足迹尽可能
  2. 从XML数据创建实体
  3. 店铺从创建的实体中的数据,以使用SqlBulkCopy的数据库。

要获得更好的性能我想创造一个生产者创建一组实体的生产者 - 消费者模式说了一批10K并将其添加到队列中。与消费者应采取批次实体从队列和使用SqlBulkCopy的坚持到数据库中。

谢谢, 戈库尔

 无效的主要()
{
    INT ICOUNT = 0;
    字符串文件名= @C:\ DATA \ CatalogIndex.xml;

    日期时间的startTime = DateTime.Now;
    Console.WriteLine(开始时间:{0},的startTime);
    FileInfo的F1 =新的FileInfo(文件名);
    Console.WriteLine(文件大小:{0} MB,fi.Length / 1048576.0);

/ *我想改变这个循环在这里创建一个生产者消费者模式来处理数据并行-LY
* /
     的foreach(在StreamElements VAR元素(文件名,标题))
            {
                ICOUNT ++;
            }

            Console.WriteLine(计数:{0},ICOUNT);
            Console.WriteLine(结束时间:{0},拍摄时间:{1},DateTime.Now,DateTime.Now  - 的startTime);
        }

    私有静态的IEnumerable<的XElement> StreamElements(字符串文件名,字符串的ElementName)
    {
        使用(VAR RDR = XmlReader.Create(文件名))
        {
            rdr.MoveToContent();
            而(!rdr.EOF)
            {
                如果((rdr.NodeType == XmlNodeType.Element)及及(rdr.Name ==的ElementName))
                {
                    变种E = XElement.ReadFrom(RDR)作为的XElement;
                    产量返回电子邮件;
                }
                其他
                {
                    rdr.Read();
                }
            }
            rdr.Close();
        }
    }
 

解决方案

这是你想怎么办?

 无效的主要()
    {
        const int的inputCollectionBufferSize = 1024;
        const int的bulkInsertBufferCapacity = 100;
        const int的bulkInsertConcurrency = 4;

        BlockingCollection<对象> inputCollection =新BlockingCollection<对象>(inputCollectionBufferSize);

        任务loadTask = Task.Factory.StartNew(()=>
        {
            的foreach(在ReadAllElements(对象nextItem ...))
            {
                //如果已经有足够多的项目,这将有可能阻止
                inputCollection.Add(nextItem);
            }

            //标记这个集合为已完成
            inputCollection.CompleteAdding();
        });

        行动parseAction =()=>
        {
            名单<对象> bulkInsertBuffer =新的名单,其中,对象>(bulkInsertBufferCapacity);

            的foreach(对象nextItem在inputCollection.GetConsumingEnumerable())
            {
                如果(bulkInsertBuffer.Length == bulkInsertBufferCapacity)
                {
                    CommitBuffer(bulkInsertBuffer);
                    bulkInsertBuffer.Clear();
                }

                bulkInsertBuffer.Add(nextItem);
            }
        };

        名单<任务> parseTasks =新的名单,其中,任务>(bulkInsertConcurrency);

        的for(int i = 0; I< bulkInsertConcurrency;我++)
        {
            parseTasks.Add(Task.Factory.StartNew(parseAction));
        }

        //退出前等待
        loadTask.Wait();
        Task.WaitAll(parseTasks.ToArray());
    }
 

I have a fairly large XML file(around 1-2GB).

The requirement is to persist the xml data in to database. Currently this is achieved in 3 steps.

  1. Read the large file with less memory foot print as much as possible
  2. Create entities from the xml-data
  3. Store the data from the created entities in to the database using SqlBulkCopy.

To achieve better performance I want to create a Producer-consumer model where the producer creates a set of entities say a batch of 10K and adds it to a Queue. And the consumer should take the batch of entities from the queue and persist to the database using sqlbulkcopy.

Thanks, Gokul

void Main()
{
    int iCount = 0;
    string fileName = @"C:\Data\CatalogIndex.xml";

    DateTime startTime = DateTime.Now;
    Console.WriteLine("Start Time: {0}", startTime);
    FileInfo fi = new FileInfo(fileName);
    Console.WriteLine("File Size:{0} MB", fi.Length / 1048576.0);

/* I want to change this loop to create a producer consumer pattern here to process the data parallel-ly
*/
     foreach (var element in StreamElements(fileName,"title"))
            {
                iCount++;
            }

            Console.WriteLine("Count: {0}", iCount);
            Console.WriteLine("End Time: {0}, Time Taken:{1}", DateTime.Now, DateTime.Now - startTime);
        }

    private static IEnumerable<XElement> StreamElements(string fileName, string elementName)
    { 
        using (var rdr = XmlReader.Create(fileName))
        {
            rdr.MoveToContent();
            while (!rdr.EOF)
            {
                if ((rdr.NodeType == XmlNodeType.Element) && (rdr.Name == elementName))
                {
                    var e = XElement.ReadFrom(rdr) as XElement;
                    yield return e;
                }
                else
                {
                    rdr.Read();
                }
            }
            rdr.Close();
        }
    }

解决方案

Is this what you are trying to do?

    void Main()
    {
        const int inputCollectionBufferSize = 1024;
        const int bulkInsertBufferCapacity = 100;
        const int bulkInsertConcurrency = 4;

        BlockingCollection<object> inputCollection = new BlockingCollection<object>(inputCollectionBufferSize);

        Task loadTask = Task.Factory.StartNew(() =>
        {
            foreach (object nextItem in ReadAllElements(...))
            {
                // this will potentially block if there are already enough items
                inputCollection.Add(nextItem);
            }

            // mark this collection as done
            inputCollection.CompleteAdding();
        });

        Action parseAction = () =>
        {
            List<object> bulkInsertBuffer = new List<object>(bulkInsertBufferCapacity);

            foreach (object nextItem in inputCollection.GetConsumingEnumerable())
            {
                if (bulkInsertBuffer.Length == bulkInsertBufferCapacity)
                {
                    CommitBuffer(bulkInsertBuffer);
                    bulkInsertBuffer.Clear();
                }

                bulkInsertBuffer.Add(nextItem);
            }
        };

        List<Task> parseTasks = new List<Task>(bulkInsertConcurrency);

        for (int i = 0; i < bulkInsertConcurrency; i++)
        {
            parseTasks.Add(Task.Factory.StartNew(parseAction));
        }

        // wait before exiting
        loadTask.Wait();
        Task.WaitAll(parseTasks.ToArray());
    }

这篇关于使用TPL生产者消费者模式,在.NET 4.0中的任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆