1出版商和4个并行消费者干扰器的例子 [英] Disruptor example with 1 publisher and 4 parallel consumers

查看:148
本文介绍了1出版商和4个并行消费者干扰器的例子的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在这个例子中 http://stackoverflow.com/a/9980346/93647 这里的why我破坏者的例子是如此之慢?(在问题的末尾)有1该出版商发布项目和1个消费者。

In this example http://stackoverflow.com/a/9980346/93647 and here why my disruptor example is so slow? (at the end of the question) there is 1 publisher which publish items and 1 consumer.

但在我的情况下,消费者的工作更为复杂,需要一定的时间。所以,我想4消费者,并行处理数据

But in my case consumer work is much more complicated and takes some time. So I want 4 consumers that process data in parallel.

因此,举例来说,如果生产商生产的数字:1,2,3,4,5,6,7,8, 9,10,11 ..

So for example if producer produce numbers: 1,2,3,4,5,6,7,8,9,10,11..

我要consumer1赶上1,5,9,... consumer2赶上2,6,10,... consumer3到赶上3,7,11,... consumer4赶上4,8,12 ......(当然不完全是这些数字,这个想法是,数据应并行处理,我不在乎这一定数量的处理上这是因为在实际应用中消费的工作是相当昂贵的消费)

I want consumer1 to catch 1,5,9,... consumer2 to catch 2,6,10,... consumer3 to catch 3,7,11,... consumer4 to catch 4,8,12... (well not exactly these numbers, the idea is that data should be processed in parallel, i don't care which certain number is processed on which consumer)

和记住这需要做并行。我希望消费者能够在不同的线程中执行使用多核系统的能力。

And remember this need to be done parallel because in real application consumer work is pretty expensive. I expect consumers to be executed in different threads to use power of multicore systems.

当然,我可以创建4个ringbuffers并附加1消费1环形缓冲区。这样我可以使用原来的例子。但我觉得它不会是正确的。 。可能这将是正确的创建1发布者(1 ringbuffer)和4的消费者 - 因为这是我需要的。

Of course I can just create 4 ringbuffers and attach 1 consumer to 1 ring-buffer. This way I can use original example. But I feel it wouldn't be correct. Likely it would be correct to create 1 publisher (1 ringbuffer) and 4 consumers - as this is what i need.

添加链接到谷歌群体非常simular问题: https://groups.google.com/forum/#! MSG / LMAX-破坏剂/ -CLapWuwWLU / GHEP4UkxrAEJ

Adding link to a very simular question in google groups: https://groups.google.com/forum/#!msg/lmax-disruptor/-CLapWuwWLU/GHEP4UkxrAEJ

因此,我们有两种选择:

So we have two options:


  • 一圈许多消费者(每个消费者将唤醒每此外,所有的消费者应该有相同的WaitStrategy)

  • 许多一环 - 一个消费者(每个消费者将被唤醒,只对数据,它应该处理。每一个消费者可以拥有自己的WaitStrategy)。

推荐答案

修改:我忘了提代码从部分的常见问题。我不知道,如果这种方法比弗兰克的建议,更好或更差。

EDIT: I forgot to mention the code is partially taken from the FAQ. I have no idea if this approach is better or worse than Frank's suggestion.

该项目严重不足证明,这是一个耻辱,因为它看起来不错。结果
反正尝试以下剪断(根据您的第一个链接) - 单声道测试,似乎是确定:

The project is severely under documented, that's a shame as it looks nice.
Anyway try the following snip (based on your first link) - tested on mono and seems to be OK:

using System;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        public long Value { get; set; }
    }

    public class MyHandler : IEventHandler<ValueEntry>
    {
        private static int _consumers = 0;
        private readonly int _ordinal;

        public MyHandler()
        {
            this._ordinal = _consumers++;
        }

        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {
            if ((sequence % _consumers) == _ordinal)
                Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal);
            else
                Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);                     
        }
    }

    class Program
    {
        private static readonly Random _random = new Random();
        private const int SIZE = 16;  // Must be multiple of 2
        private const int WORKERS = 4; 

        static void Main()
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default);
            for (int i=0; i < WORKERS; i++)
                disruptor.HandleEventsWith(new MyHandler());
            var ringBuffer = disruptor.Start();

            while (true)
            {
                long sequenceNo = ringBuffer.Next();
                ringBuffer[sequenceNo].Value =  _random.Next();;
                ringBuffer.Publish(sequenceNo);
                Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value);
                Console.ReadKey();
            }
        }
    }
}

这篇关于1出版商和4个并行消费者干扰器的例子的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆