为什么接收缓冲区时连续缓冲区不包含的项目执行方法? [英] Why does Rx buffer continuously perform method when buffer contains no items?

查看:113
本文介绍了为什么接收缓冲区时连续缓冲区不包含的项目执行方法?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个接收观测充当缓冲区。现在,它执行的订阅方法或者当它获得10个项目,或经过100毫秒,以先到者为准。

I have a Rx Observable that acts as a buffer. Right now it performs the method in Subscribe either when it gets 10 items, or after 100 milliseconds, whichever comes first.

我注意到,我的方法是不断地被称为每100毫秒,即使有在缓冲区中没有任何项目,这让我吃惊。这是很简单,只是让我的方法立即返回,如果它收到从缓冲区中没有产品,但我认为这件事很奇怪,它只是翻腾走在这样的背景。

I noticed that my method is continuously being called every 100 ms, even when there are no items in the buffer, which surprised me. It's simple enough to just make my method return immediately if it receives no items from the buffer, but I thought it was weird that it's just churning away in the background like that.

这是为什么?你怎么推荐我这个最划算?我是一个完整的新手到RX,所以也许我在做一些奇怪的。下面是我的代码的简化版本:

Why is this? How do you recommend I best deal with this? I am a complete newbie to Rx, so maybe I'm doing something weird. Here's a simplified version of my code:

private Subject<KeyValuePair<int, Action<MyData>>> serverRequests;

public MyBufferClass(IMyServer server, IScheduler scheduler)
{
    this.serverRequests = new Subject<KeyValuePair<int, Action<MyData>>>();

    this.serverRequests
        .Buffer(TimeSpan.FromMilliseconds(100), 10, scheduler)
        .Subscribe(buffer => GetMultipleItemsFromServer(buffer));
}   

public void GetSingleItemFromServer(int id, Action<MyData> callback)
{
    this.serverRequests.OnNext(new KeyValuePair<int, Action<MyData>>(id, callback));
}

public void GetMultipleItemsFromServer(IEnumerable<KeyValuePair<int, Action<MyData>>> idsWithCallbacks)
{
    if (idsWithCallbacks.IsNullOrEmpty()) return;

    this.server.GetMultipleItems(idsWithCallbacks)
}



在我的测试,如果我叫GetSingleItemFromServer 5倍,用1000毫秒推进我TestScheduler,我以为GetMultipleItemsFromServer只会被调用一次,但它被调用10次。

In my tests, if I call GetSingleItemFromServer 5 times and then advance my TestScheduler by 1000 ms, I thought GetMultipleItemsFromServer would only be called once, but it gets called 10 times.

推荐答案

在这样的情况下,一个优雅的解决方案可以使用Where运算符缓冲后直接过滤掉所有空的结果。事情是这样的:

In situations like this an elegant solution can be to use the Where operator straight after the Buffer to filter out any empty results. Something like this:

            stream
            .Buffer (...)
            .Where (x => x.Any())
            .Subscribe (x => {...}, ex => {...});



至于为什么缓冲作用就像这样,我想这是更好地表面的空收藏并允许消费者选择什么用它做,而不是吞下它,并拒绝这样的机会。

As to why Buffer acts like this, I suppose it's better to surface an empty collection and allow the consumer to choose what to do with it, than to swallow it and deny that opportunity.

在一个单独的说明,我不会有订阅块中服务器的呼叫。我认为这是一个更好的主意有任何异步操作的接收流组成本身的一部分,并限制订阅行动到最终的结果,即更新UI,登录成功/失败等事情处理的轻量级操作像这样的:

On a separate note, I wouldn't have your server call within the subscribe block. I think it's a better idea to have any asynchronous operations as a part of the Rx stream composition itself, and to restrict the Subscribe action to any lightweight operations that deal with the final result, i.e. updating the UI, logging success/failure etc. Something like this:

(from request in serverRequests
            .Buffer (TimeSpan.FromMinutes (1))
            .Where (x => x.Any())
from response in Observable.Start(server.GetMultipleItems(...))
select response)
.Subscribe (x => {}, ex => {});



优点情况包括:

Advantages to this include:

- 能够在你的服务器上调用,如超时(),重试(),捕捉()等进一步使用接收的运营商。

-Being able to use further Rx operators on your server call, such as Timeout(), Retry(), Catch(), etc.

-Being能够处理任何管道在订阅()过载

-Being able to handle any pipeline errors within the Subscribe() overload

不依赖管道和SubscribeOn()/ ObserveOn()的订阅行动计划。

-Independent scheduling of the pipeline and the Subscribe action with SubscribeOn()/ObserveOn().

这篇关于为什么接收缓冲区时连续缓冲区不包含的项目执行方法?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆