Why is my Disruptor example so slow?

Question

I took the code example from the Stack Overflow question "Disruptor.NET example" (https://stackoverflow.com/questions/8860684/disruptor-net-example) and modified it to measure time. The full listing is below:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public long Value { get; set; }

        public ValueEntry()
        {
            Console.WriteLine("New ValueEntry created");
        }
    }

    public class ValueAdditionHandler : IEventHandler<ValueEntry>
    {
        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {
            Program.sw.Stop();
            long microseconds = Program.sw.ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
            Console.WriteLine("elapsed microseconds = " + microseconds);
            Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence);
        }
    }

    class Program
    {
        public static Stopwatch sw = Stopwatch.StartNew();

        private static readonly Random _random = new Random();
        private static readonly int _ringSize = 16;  // Must be a power of 2

        static void Main(string[] args)
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);

            disruptor.HandleEventsWith(new ValueAdditionHandler());

            var ringBuffer = disruptor.Start();

            while (true)
            {
                var valueToSet = _random.Next();
                long sequenceNo = ringBuffer.Next();

                ValueEntry entry = ringBuffer[sequenceNo];

                entry.Value = valueToSet;

                sw.Restart();
                ringBuffer.Publish(sequenceNo);

                Console.WriteLine("Published entry {0}, value {1}", sequenceNo, entry.Value);

                Thread.Sleep(1000);
            }
        }
    }
}

The output is:

New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
New ValueEntry created
Published entry 0, value 1510145842
elapsed microseconds = 2205
Event handled: Value = 1510145842 (processed event 0
Published entry 1, value 1718075893
elapsed microseconds = 85
Event handled: Value = 1718075893 (processed event 1
Published entry 2, value 1675907645
elapsed microseconds = 32
Event handled: Value = 1675907645 (processed event 2
Published entry 3, value 1563009446
elapsed microseconds = 75
Event handled: Value = 1563009446 (processed event 3
Published entry 4, value 1782914062
elapsed microseconds = 34
Event handled: Value = 1782914062 (processed event 4
Published entry 5, value 1516398244
elapsed microseconds = 50
Event handled: Value = 1516398244 (processed event 5
Published entry 6, value 76829327
elapsed microseconds = 50
Event handled: Value = 76829327 (processed event 6

So it takes about 50 microseconds to pass data from one thread to another. But that is not fast at all! "The current version of the Disruptor can do ~50 ns between threads at a rate of 1 million messages per second." So my results are 1000 times slower than expected.

What's wrong with my example, and how do I achieve 50 ns?

I've modified the program above and now get a delay of about 1 microsecond, which is much better. However, I am still waiting for a response from Disruptor pattern experts. I'm looking for an example that proves I can actually pass data in 50 ns.

I also wrote the same test using BlockingCollection and got 14 microseconds on average, which shows that the Disruptor is faster:

Using BlockingCollection:

average = 14 minimum = 0 0-5 = 890558, 5-10 = 1773781, 10-30 = 6900128, >30 = 435433

Using Disruptor:

average = 0 minimum = 0 0-5 = 9908469, 5-10 = 64464, 10-30 = 19902, >30 = 7065
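
The average of 0 in the Disruptor run is a rounding effect rather than a zero-cost handoff: each sample is stored as a whole number of microseconds, so anything below 1 microsecond is recorded as 0. The conversion in the listings also divides `Stopwatch.Frequency` by 1,000,000 using integer division, which truncates the divisor whenever the frequency is not an exact multiple of 1,000,000. The sketch below prints the timer resolution and compares the two conversions. The class and method names are made up for illustration, and the frequency of 2,500,000 ticks per second is a hypothetical value chosen only to make the truncation visible.

```csharp
using System;
using System.Diagnostics;

public static class TickConversion
{
    // Converts Stopwatch ticks to microseconds. Multiplying before dividing
    // keeps the fractional part, so sub-microsecond samples do not collapse to 0.
    public static double ToMicroseconds(long ticks, long frequency)
    {
        return ticks * 1000000.0 / frequency;
    }

    // The conversion used in the listings: the divisor is truncated by
    // integer division before it is applied.
    public static long ToMicrosecondsTruncated(long ticks, long frequency)
    {
        return ticks / (frequency / (1000L * 1000L));
    }

    public static void Main()
    {
        // Actual timer properties on this machine.
        Console.WriteLine("High resolution timer: " + Stopwatch.IsHighResolution);
        Console.WriteLine("Ticks per second: " + Stopwatch.Frequency);
        Console.WriteLine("Nanoseconds per tick: " + 1000000000.0 / Stopwatch.Frequency);

        // Hypothetical frequency, used only to demonstrate the truncation.
        long frequency = 2500000;
        long ticks = 250; // exactly 100 microseconds at that frequency

        Console.WriteLine(ToMicroseconds(ticks, frequency));          // 100
        Console.WriteLine(ToMicrosecondsTruncated(ticks, frequency)); // 125
    }
}
```

If the reported tick length is longer than 50 ns, a single 50 ns handoff cannot be resolved by timing one message at a time, whichever conversion is used.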

BlockingCollection code:

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public int Value { get; set; }

        public ValueEntry()
        {
            //   Console.WriteLine("New ValueEntry created");
        }
    }

    //public class ValueAdditionHandler : IEventHandler<ValueEntry>
    //{
    //    public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
    //    {

    //        long microseconds = Program.sw[data.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
    //        Program.results[data.Value] = microseconds;
    //        //Console.WriteLine("elapsed microseconds = " + microseconds);
    //        //Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence);
    //    }
    //}

    class Program
    {
        public const int length = 10000000;
        public static Stopwatch[] sw = new Stopwatch[length];
        public static long[] results = new long[length];

        static BlockingCollection<ValueEntry> dataItems = new BlockingCollection<ValueEntry>(150);

        static void Main(string[] args)
        {
            for (int i = 0; i < length; i++)
            {
                sw[i] = Stopwatch.StartNew();
            }

            // A simple blocking consumer with no cancellation.
            Task.Factory.StartNew(() =>
            {
                while (!dataItems.IsCompleted)
                {

                    ValueEntry ve = null;
                    try
                    {
                        ve = dataItems.Take();
                        long microseconds = sw[ve.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
                        results[ve.Value] = microseconds;

                        //Console.WriteLine("elapsed microseconds = " + microseconds);
                        //Console.WriteLine("Event handled: Value = {0} (processed event {1}", ve.Value, ve.Value);
                    }
                    catch (InvalidOperationException) { }
                }
            }, TaskCreationOptions.LongRunning);

            for (int i = 0; i < length; i++)
            {
                var valueToSet = i;

                ValueEntry entry = new ValueEntry();
                entry.Value = valueToSet;

                sw[i].Restart();
                dataItems.Add(entry);

                //Console.WriteLine("Published entry {0}, value {1}", valueToSet, entry.Value);
                //Thread.Sleep(1000);
            }

            // Wait until all events are delivered
            Thread.Sleep(5000);

            long average = 0;
            long minimum = 10000000000;
            int firstFive = 0;
            int fiveToTen = 0;
            int tenToThirty = 0;
            int moreThenThirty = 0;

            // Do not count first 100 items because they could be extremely slow
            for (int i = 100; i < length; i++)
            {
                average += results[i];
                if (results[i] < minimum)
                {
                    minimum = results[i];
                }
                if (results[i] < 5)
                {
                    firstFive++;
                }
                else if (results[i] < 10)
                {
                    fiveToTen++;
                }
                else if (results[i] < 30)
                {
                    tenToThirty++;
                } else
                {
                    moreThenThirty++;
                }
            }
            average /= (length - 100);
            Console.WriteLine("average = {0} minimum = {1} 0-5 = {2}, 5-10 = {3}, 10-30 = {4}, >30 = {5}", average, minimum, firstFive, fiveToTen, tenToThirty, moreThenThirty);
        }
    }
}

Disruptor code:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public int Value { get; set; }

        public ValueEntry()
        {
            //   Console.WriteLine("New ValueEntry created");
        }
    }

    public class ValueAdditionHandler : IEventHandler<ValueEntry>
    {
        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {

            long microseconds = Program.sw[data.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
            Program.results[data.Value] = microseconds;
            //Console.WriteLine("elapsed microseconds = " + microseconds);
            //Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence);
        }
    }

    class Program
    {
        public const int length = 10000000;
        public static Stopwatch[] sw = new Stopwatch[length];
        public static long[] results = new long[length];

        private static readonly Random _random = new Random();
        private static readonly int _ringSize = 1024;  // Must be a power of 2

        static void Main(string[] args)
        {
            for (int i = 0; i < length; i++)
            {
                sw[i] = Stopwatch.StartNew();
            }

            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);

            disruptor.HandleEventsWith(new ValueAdditionHandler());

            var ringBuffer = disruptor.Start();

            for (int i = 0; i < length; i++)
            {
                var valueToSet = i;
                long sequenceNo = ringBuffer.Next();

                ValueEntry entry = ringBuffer[sequenceNo];

                entry.Value = valueToSet;

                sw[i].Restart();
                ringBuffer.Publish(sequenceNo);

                //Console.WriteLine("Published entry {0}, value {1}", sequenceNo, entry.Value);

                //Thread.Sleep(1000);
            }

            // wait until all events are delivered
            Thread.Sleep(5000);

            long average = 0;
            long minimum = 10000000000;
            int firstFive = 0;
            int fiveToTen = 0;
            int tenToThirty = 0;
            int moreThenThirty = 0;

            // Do not count first 100 items because they could be extremely slow
            for (int i = 100; i < length; i++)
            {
                average += results[i];
                if (results[i] < minimum)
                {
                    minimum = results[i];
                }
                if (results[i] < 5)
                {
                    firstFive++;
                }
                else if (results[i] < 10)
                {
                    fiveToTen++;
                }
                else if (results[i] < 30)
                {
                    tenToThirty++;
                }
                else
                {
                    moreThenThirty++;
                }
            }
            average /= (length - 100);
            Console.WriteLine("average = {0} minimum = {1} 0-5 = {2}, 5-10 = {3}, 10-30 = {4}, >30 = {5}", average, minimum, firstFive, fiveToTen, tenToThirty, moreThenThirty);
        }
    }
}

Recommended answer

Here, I fixed your code:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public int Value { get; set; }

        public ValueEntry()
        {
         //   Console.WriteLine("New ValueEntry created");
        }
    }

    class Program
    {
        public const int length = 1000000;
        public static Stopwatch sw;

        private static readonly Random _random = new Random();
        private static readonly int _ringSize = 1024;  // Must be a power of 2

        static void Main(string[] args)
        {
            sw = Stopwatch.StartNew();

            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), _ringSize, TaskScheduler.Default);

            var ringBuffer = disruptor.Start();

            for (int i = 0; i < length; i++)
            {
                var valueToSet = i;
                long sequenceNo = ringBuffer.Next();

                ValueEntry entry = ringBuffer[sequenceNo];

                entry.Value = valueToSet;

                ringBuffer.Publish(sequenceNo);

                //Console.WriteLine("Published entry {0}, value {1}", sequenceNo, entry.Value);

                //Thread.Sleep(1000);
            }

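            // Total time spent publishing all items, in milliseconds
            double elapsed = sw.Elapsed.TotalMilliseconds;
            // wait until all events are delivered
            Thread.Sleep(10000);

            // Average time per published item, in milliseconds
            double average = elapsed / (double)length;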
            Console.WriteLine("average = " + average);
        }
    }
}

This should correctly measure how long each item takes.
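
The idea behind the fix is to time the whole run with one Stopwatch and divide by the number of items, instead of timing each item separately. A minimal sketch of that pattern follows, with no Disruptor dependency. The `AverageNanoseconds` helper and the counter workload are placeholders for illustration, not part of the Disruptor API; in the program above, the timed operation would be the `Next`/`Publish` pair.

```csharp
using System;
using System.Diagnostics;

public static class AmortizedTiming
{
    // Times `iterations` calls of `operation` with a single Stopwatch and
    // returns the average cost per call in nanoseconds.
    public static double AverageNanoseconds(Action operation, int iterations)
    {
        if (operation == null) throw new ArgumentNullException("operation");
        if (iterations <= 0) throw new ArgumentOutOfRangeException("iterations");

        // One warm-up call so JIT compilation is not part of the measurement.
        operation();

        Stopwatch sw = Stopwatch.StartNew();
        for (int i = 0; i < iterations; i++)
        {
            operation();
        }
        sw.Stop();

        // Milliseconds -> nanoseconds, then divide by the number of timed calls.
        return sw.Elapsed.TotalMilliseconds * 1000000.0 / iterations;
    }

    public static void Main()
    {
        long counter = 0;
        // Placeholder workload standing in for the publish step.
        double ns = AverageNanoseconds(() => counter++, 1000000);
        Console.WriteLine("average = " + ns + " ns per call");
    }
}
```

Note that this reports the average cost of the timed operation (a throughput-style figure), not the end-to-end latency between publishing an item and handling it on another thread.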
