Observable to batch like LMAX Disruptor


Question


Those who are familiar with the LMAX ring buffer (Disruptor) know that one of the biggest advantages of that data structure is that it batches incoming events. When we have a consumer that can take advantage of batching, the system automatically adjusts to the load: the more events you throw at it, the better.

I wonder whether we could achieve the same effect with an Observable (targeting the batching feature). I've tried Observable.buffer, but it is very different: buffer waits and does not emit the batch until the expected number of events has arrived. What we want is quite different.

Given that the subscriber is waiting for a batch from Observable<Collection<Event>>: when a single item arrives on the stream, it emits a single-element batch, which gets processed by the subscriber. While it is processing, other elements arrive and get collected into the next batch. As soon as the subscriber finishes executing, it receives the next batch with as many events as arrived since it last started processing...

So as a result, if our subscriber is fast enough, it will process one event at a time; if the load gets higher, it will still process at the same frequency, but with more events each time (thus solving the backpressure problem)... unlike buffer, which will stall and wait for the batch to fill up.
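The behavior described above (drain everything that arrived since the last run, so batch size adapts to load) can be sketched in plain Java without RxJava; the class and method names here are illustrative, not from any library:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the desired behavior: the consumer takes everything that
// arrived since its last run, so batch size grows with load while the
// processing frequency stays the same.
class AdaptiveBatcher<T> {
    private final Queue<T> pending = new ConcurrentLinkedQueue<>();

    /** Producer side: enqueue a single event (non-blocking). */
    void publish(T event) {
        pending.add(event);
    }

    /** Consumer side: drain everything currently queued as one batch. */
    List<T> nextBatch() {
        List<T> batch = new ArrayList<>();
        T item;
        while ((item = pending.poll()) != null) {
            batch.add(item);
        }
        return batch; // empty list when nothing arrived
    }
}
```

Under light load each call returns a single-element batch; under heavy load the same call returns everything that piled up during the previous processing run.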

Any suggestions? Or shall I go with the ring buffer?

Solution

RxJava and Disruptor represent two different programming approaches.

I'm not experienced with Disruptor, but based on video talks, it is basically a large buffer where producers emit data like a firehose and consumers spin/yield/block until data is available.

RxJava, on the other hand, aims at non-blocking event delivery. We too have ring buffers, notably in observeOn, which acts as the async boundary between producers and consumers, but these are much smaller, and we avoid buffer overflows and buffer bloat by applying the co-routines approach. Co-routines boil down to callbacks sent to your callbacks, so you can call back our callbacks to have data sent to you at your own pace. The frequency of such requests determines the pacing.
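The request-driven pacing described above can be illustrated with a minimal plain-Java sketch; these are not RxJava's real interfaces, just an assumed shape showing how the consumer's request frequency, not the producer, sets the pace:

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch of request-based pacing: the consumer asks for n
// items and the source emits at most n, then waits for the next request.
class PacedSource<T> {
    private final Iterator<T> items;
    private final Consumer<T> onNext;

    PacedSource(List<T> data, Consumer<T> onNext) {
        this.items = data.iterator();
        this.onNext = onNext;
    }

    /** Deliver at most n items; the consumer calls this again when ready. */
    void request(long n) {
        for (long i = 0; i < n && items.hasNext(); i++) {
            onNext.accept(items.next());
        }
    }
}
```

A slow consumer simply calls request less often; nothing is pushed at it in between, so no unbounded buffer builds up.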

There are data sources that don't support such cooperative streaming and require one of the onBackpressureXXX operators, which will buffer/drop values if the downstream doesn't request fast enough.
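What a drop-style backpressure operator does internally can be sketched as a bounded queue that discards overflow; this is an illustrative model, not RxJava's actual onBackpressureDrop implementation:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of a drop-style backpressure buffer: values arriving while the
// bounded buffer is full are discarded rather than blocking the producer.
class DroppingBuffer<T> {
    private final BlockingQueue<T> buffer;
    private long dropped;

    DroppingBuffer(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    /** Producer side: offer never blocks; a full buffer means a drop. */
    void onNext(T value) {
        if (!buffer.offer(value)) {
            dropped++;
        }
    }

    /** Downstream pulls when ready; null when nothing is buffered. */
    T poll() {
        return buffer.poll();
    }

    long droppedCount() {
        return dropped;
    }
}
```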

If you think you can process data in batches more efficiently than one-by-one, you can use the buffer operator, which has overloads to specify a time duration for the buffers: you can have, for example, 10 ms worth of data, regardless of how many values arrive in that duration.
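To make concrete what a time-duration buffer produces, here is a deterministic plain-Java sketch that groups already-timestamped events into 10 ms windows; real buffer(10, TimeUnit.MILLISECONDS) works on wall-clock arrival time, so explicit timestamps are an assumption made here for reproducibility:

```java
import java.util.ArrayList;
import java.util.List;

// Groups events by which fixed-size time window their arrival timestamp
// falls into, mimicking a time-based buffer operator deterministically.
class TimeWindowBatcher {
    static List<List<Integer>> batchByWindow(long[] arrivalMillis,
                                             int[] values,
                                             long windowMillis) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long windowStart = arrivalMillis.length > 0 ? arrivalMillis[0] : 0;
        for (int i = 0; i < values.length; i++) {
            // Close windows until this event's timestamp fits the open one.
            while (arrivalMillis[i] >= windowStart + windowMillis) {
                batches.add(current);
                current = new ArrayList<>();
                windowStart += windowMillis;
            }
            current.add(values[i]);
        }
        batches.add(current);
        return batches;
    }
}
```

Note that a window's batch can hold one value or many, and an idle window emits an empty batch, matching the "10 ms worth of data, regardless of how many values arrive" description.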

Controlling the batch size via request frequency is tricky and may have unforeseen consequences. The problem, generally, is that if you request(n) from a batching source, you indicate you can process n elements, but the source now has to create n buffers of size 1 (because the type is Observable<List<T>>). In contrast, if no request is called, the operator buffers the data, resulting in longer buffers. These behaviors introduce extra overhead in the processing if you really could keep up, and you also have to turn the cold source into a firehose (because otherwise what you have is essentially buffer(1)), which itself can now lead to buffer bloat.

