为什么我的Disruptor程序没有充分利用环形缓冲区 [英] Why my Disruptor program don't take full advantage of the ringbuffer

查看:188
本文介绍了为什么我的Disruptor程序没有充分利用环形缓冲区的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Disruptor github地址为: https://github.com/LMAX-Exchange/disruptor

Disruptor github address is: https://github.com/LMAX-Exchange/disruptor

我对此进行了一个简单的测试:

I've a simple test for it as below:

public class DisruptorMain {
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static void main(String[] args) throws Exception {
        class Element {

            private int value;

            public int get() {
                return value;
            }

            public void set(int value) {
                this.value = value;
            }

        }

        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread");
            }
        };

        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };

        EventHandler<Element> handler = new EventHandler<Element>() {
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch) {
                try {
                    Thread.sleep(1000 * sequence);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("Element: " + element.get());
            }
        };

        BlockingWaitStrategy strategy = new BlockingWaitStrategy();

        int bufferSize = 4;

        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

        disruptor.handleEventsWith(handler);

        disruptor.start();

        RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();

        for (int l = 0; l < 8; l++) {
            long sequence = ringBuffer.next();
            System.out.println("sequence:" + sequence);

            try {
                Element event = ringBuffer.get(sequence);
                event.set(l);
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }
}

结果是: 顺序:0 顺序:1 顺序:2 顺序:3 元素:0 元素:1 元素:2 元素:3 顺序:4 顺序:5 顺序:6 顺序:7 元素:4 元素:5 元素:6 元素:7

The result is: sequence:0 sequence:1 sequence:2 sequence:3 Element: 0 Element: 1 Element: 2 Element: 3 sequence:4 sequence:5 sequence:6 sequence:7 Element: 4 Element: 5 Element: 6 Element: 7

在我的测试中,我定义了一个大小为4的环形缓冲区,我有一个生产者为其创建8个任务,我的问题是,当生产者将4个任务放入环形缓冲区中时,使用者开始执行任务从环形缓冲区开始工作后,任务1完成后,环形缓冲区应为任务5提供一个空白空间,但结果表明,只有在环形缓冲区中所有任务均已完成后,环形缓冲区才能接受新任务,为什么?

in my test, I've defined a ringbuffer size of 4, and I have a producer to create 8 tasks for it, my question is, when the producer have put 4 tasks in the ringbuffer, the consumer begins to take task from the ringbuffer to work, after task 1 finish, the ringbuffer should have an empty space for the task 5, but the result shows that, only if all of the tasks have been finished in the ringbuffer, the ringbuffer can accept the new task, why?

推荐答案

这是因为Disruptor将在事件处理程序中进行批处理.如果事件处理程序速度慢或环形缓冲区较小,则批处理大小通常可以是环形缓冲区的大小. Disruptor将仅更新该事件处理程序的已处理序列,直到批处理完成为止.这减少了需要对发布者用来确定空间是否可用的序列变量进行的更新次数.如果需要使空间比默认值更早可用,则可以使用SequenceReportingEventHandler.

This is because the Disruptor will batch in the event handler. If the event handler is slow or the ring buffer is small the batch size can often be the size of the ring buffer. The Disruptor will only update the processed sequence for that event handler until the batch is complete. This reduces the number of updates that it needs to make to the sequence variable used by the publisher to determine if space is available. If you need to make space available earlier than the default then you can do that using a SequenceReportingEventHandler.

public class MyEventHandler implements SequenceReportingEventHandler<Element> {
    Sequence processedSequence;

    public void setSequenceCallback(Sequence s) {
        processedSequence = s;
    }

    public void onEvent(Element e, long sequence, boolean endOfBatch) {
        // Do stuff
        processedSequence.set(sequence);
    }
}

这篇关于为什么我的Disruptor程序没有充分利用环形缓冲区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆