RX - 拉链输出一个意想不到的结果 [英] RX - Zip outputs an unexpected result

查看:95
本文介绍了RX - 拉链输出一个意想不到的结果的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

请帮我明白了一个现象:



为什么为X不等于可观察的项目指数

$ B?
$ b

例如建筑板块:

 公共类EcgSample 
{
公共EcgSample(int y)对
{
Y = Y;
}

公众诠释X {搞定;组; }
公众诠释Ÿ{搞定;组; }
}

私人无效打印(元组LT; EcgSample,INT> S)
{
的Debug.WriteLine(X:{0},Y:{1 }指数:{2},s.Item1.X,s.Item1.Y,s.Item2);
}

私人列表< EcgSample> CreateSamples()
{
变种testSamples =新的List< EcgSample>();

为(简称I = 0; I< 1400;我++)
{
testSamples.Add(新EcgSample(I));
}

返回testSamples;
}



例如观察到的:(其输出期望的结果)

  //(1)创建从收藏。 
的IObservable< EcgSample> 。sampleObservable = CreateSamples()ToObservable(新EventLoopScheduler());

//(2)重复
的IObservable< EcgSample>反复= sampleObservable.Repeat();

//(3)索引
的IObservable<元组LT; EcgSample,INT>>指数= repeated.Select((项指数)=>
{
item.X =指数;
返回新的记录< EcgSample,INT>(项指标);
});

//(4)缓冲
的IObservable<&IList的LT元组LT; EcgSample,INT>>>缓冲= indexed.Buffer(250);

//(5)和的SelectMany打印。
_disposable = buffered.SelectMany(BUF => BUF).Subscribe(打印);



OUTPUT:这是观测序列的期望输出



  [8384] X:0,Y:0,指数:0 
[8384] X:1,Y:1,指标:1
〔8384〕X:2,Y:2,指数:2
[8384] X:3,Y:3,指数:3
[8384] X:4,Y:4,指标: 4

修改(不输出联合国预期结果)



现在我wan't每个缓冲区采取每间隔:

  //(5)创建从定时器可观察到的。 
的IObservable< ElapsedEventArgs> timerObservable = Observable.Create< ElapsedEventArgs>(
观察者=>
{
变种定时器=新的Timer();
timer.Interval = 250;
定时器。经过+ =(S,E)=> observer.OnNext(E);
timer.Start();
返回Disposable.Create(()=>
{
timer.Stop();
});
});

//(6)与缓冲观察到
的IObservable<邮编; IList的<元组LT; EcgSample,INT>>>压缩= timerObservable.Zip(缓冲,(T,B)=> B);

//(7)和的SelectMany打印。
_disposable = zipped.SelectMany(BUF => BUF).Subscribe(打印);



输出:此输出了意想不到的结果:的通知是X不等于指数

  [9708] X:187600,Y:0,指数:0 
[9708] X: 187601,Y:1,指标:1
[9708] X:187602,Y:2,指数:2
[9708] X:187603,Y:3,指数:3

任何想法,为什么X启动以187600 (不用说,我每次运行时该值是不同的?我的程序)。



编辑:



我只需在最后突出解决了问题,但我还是想知道为什么第一个问题出现。

 列表< EcgSample>清单= CreateSamples(); 

无功循环=新EventLoopScheduler();
VAR sampleObservable = list.ToObservable(循环);

的IObservable< EcgSample> reapeted = sampleObservable.Repeat();

的IObservable<&IList的LT; EcgSample>>缓冲= reapeted.Buffer(250);

的IObservable< ElapsedEventArgs> timerObservable = Observable.Create< ElapsedEventArgs>(
观察者=>
{
变种定时器=新的Timer();
timer.Interval = 250;
定时器。经过+ =(S,E)=> observer.OnNext(E);
timer.Start();
返回Disposable.Create(()=>
{
timer.Stop();
});
});

的IObservable<&IList的LT; EcgSample>>压缩= timerObservable.Zip(缓冲,(T,B)=> B);

_disposable = zipped.SelectMany(BUF => BUF)。选择((项指数)=>
{
item.X =指数;
返回新行< EcgSample,INT>(项指标);

})订阅(打印)。


解决方案

您的回答显示了一个单一的东西,你可以改变得到行为你想要的,但它不是真的,为什么它没有工作,你所期望的方式的原因。



如果你想在观测的每个条目与多家关联,你实际上应该是与一些关联。你这样做了,还有就是流和数字中的每个元素之间没有实际的连接。你只是修复,确保您处理每一个项目下一个来通过之前,因此数恰好是在合适的值。但是,这是一个非常片状的局面。



如果你只是想你在做什么项目到流上运行计数,看看的选择的,让你的索引超载

  stream.Select((项指数)=>新建项目{,指数})
.Subscribe(数据=> ;的Debug.WriteLine(索引项{0} {1},data.index,data.item))

另外,如果你想要的东西是不同的只是在流项目的数量,你可以这样做:

  stream.Select(项目=>新建{项指数=<一些价值你计算>})
...

这样,你的对象和它的指数是绑在一起。你可以在任何未来的某个时刻使用该项目的索引,并且还知道它的指数。而你的代码依赖于获取到下一个处理之前的每个项目。



为了解决编辑你的问题



首先,看一看的 Observable.Interval 。它做你想要什么你的计时器做,但更容易。



其次,看看下面的例子中抄录你在做什么你的问题。运行该代码产生正确的输出:

  VAR项目= Enumerable.Range(65,26)
。选择( I =>(char)的我)
.Repeat();

VAR observableItems = items.ToObservable()
。选择((C,I)=>新建{字符= C,指数= I});

VAR间隔= Observable.Interval(TimeSpan.FromSeconds(0.25));

变种缓冲= observableItems.Buffer(10);
变种压缩= buffered.Zip(间隔,(缓冲液,_)=>缓冲液);

zipped.SelectMany(缓冲=>缓冲区)使用.dump();

您可以运行在LinqPad的代码,这是探索接收(和其他部件一个非常有用的工具。净的)



最后 - 我想这是一个简单的运动,试图找出发生的事情在您的情况。它看起来像你可能试图应付与推比要处理更多的更新的传感器数据。使用带有拉链的间隔也不会有多大效果这一点。你会减慢数据的到达率,但它只是建立数据等待通过邮编得到的越来越大的队列。



如果你想获得一个数据点每250毫秒,看样品。如果你想获得250微秒一次读数,看的缓冲区,需要一个时间跨度,而不是数过载。


Please help me understand a phenomenon :

Why is X NOT equal to index in the Observable's items ?

Building blocks for example :

        public class EcgSample
        {               
            public EcgSample(int y)
            {
                Y = y;   
            } 

            public int X { get; set; }
            public int Y { get; set; }  
        }

        private void Print(Tuple<EcgSample, int> s)
        {
              Debug.WriteLine("X : {0} , Y : {1} , Index : {2}", s.Item1.X, s.Item1.Y, s.Item2);
        }

        private List<EcgSample> CreateSamples()
        {
            var testSamples = new List<EcgSample>();

            for (short i = 0; i < 1400; i++)
            {
               testSamples.Add(new EcgSample(i));   
            }

            return testSamples;
        }

Example observable : (Which outputs an expected result)

       // (1) Create From Collection .
       IObservable<EcgSample> sampleObservable = CreateSamples().ToObservable(new EventLoopScheduler());

       // (2) Repeat 
       IObservable<EcgSample> repeated = sampleObservable.Repeat();

       // (3) Indexed 
       IObservable<Tuple<EcgSample,int>> indexed = repeated.Select((item, index) =>
       {
           item.X = index;
           return new Tuple<EcgSample, int>(item, index);
       }); 

       // (4) Buffered 
       IObservable<IList<Tuple<EcgSample, int>>> buffered = indexed.Buffer(250); 

       // (5) SelectMany and Print .
       _disposable = buffered.SelectMany(buf => buf).Subscribe(Print);

OUTPUT : This is the expected output of the Observable sequence .

       [8384] X : 0 , Y : 0 , Index : 0 
       [8384] X : 1 , Y : 1 , Index : 1 
       [8384] X : 2 , Y : 2 , Index : 2 
       [8384] X : 3 , Y : 3 , Index : 3 
       [8384] X : 4 , Y : 4 , Index : 4 

Modification : (Which DOES NOT outputs an UN-Expected result)

Now i wan't each buffer to be taken every interval :

     // (5) Create an Observable from a Timer. 
     IObservable<ElapsedEventArgs> timerObservable = Observable.Create<ElapsedEventArgs>(
            observer =>
            {
                var timer = new Timer();
                timer.Interval = 250;
                timer.Elapsed += (s, e) => observer.OnNext(e);
                timer.Start();
                return Disposable.Create(() =>
                {
                    timer.Stop();
                });
            });

        // (6) Zip with the buffer observable 
        IObservable<IList<Tuple<EcgSample, int>>> zipped = timerObservable.Zip(buffered, (t, b) => b);

        // (7) SelectMany and Print .
        _disposable = zipped.SelectMany(buf => buf).Subscribe(Print);

OUTPUT : This outputs an unexpected result : notice that X is not equal to index.

   [9708] X : 187600 , Y : 0 , Index : 0 
   [9708] X : 187601 , Y : 1 , Index : 1 
   [9708] X : 187602 , Y : 2 , Index : 2 
   [9708] X : 187603 , Y : 3 , Index : 3 

Any ideas why X starts at 187600 ( Needless to say this value is different every time i run my program) ..?

EDIT :

I Solved the issue by simply projecting at the end , but i would still like to know why the first issue occurs .

        List<EcgSample> list = CreateSamples();     

        var loop = new EventLoopScheduler();
        var sampleObservable = list.ToObservable(loop);

        IObservable<EcgSample> reapeted = sampleObservable.Repeat();

        IObservable<IList<EcgSample>> buffered = reapeted.Buffer(250);

        IObservable<ElapsedEventArgs> timerObservable = Observable.Create<ElapsedEventArgs>(
            observer =>
            {
                var timer = new Timer();
                timer.Interval = 250;
                timer.Elapsed += (s, e) => observer.OnNext(e);
                timer.Start();
                return Disposable.Create(() =>
                {
                    timer.Stop();
                });
            });

        IObservable<IList<EcgSample>> zipped = timerObservable.Zip(buffered, (t, b) => b);

        _disposable = zipped.SelectMany(buf => buf).Select((item, index) =>
        {
            item.X = index;
            return new Tuple<EcgSample, int>(item, index);

        }).Subscribe(Print);

解决方案

Your answer shows a single thing you can change to get the behaviour you want, but it's not really the reason why it didn't work the way you expected.

If you want to associate each entry in the Observable with a number, you should actually associate it with a number. The way you're doing it, there's no actual connection between each element in the stream and the number. Your fix just ensures that you handle each item before the next one comes through, so the number happens to be at the right value. But that's a very flaky situation.

If you just want a running count of what item you're up to on the stream, have a look at the overload of Select that gives you the index:

stream.Select((item, index) => new { item, index })
      .Subscribe(data => Debug.WriteLine("Item at index {0} is {1}", data.index, data.item))

Alternately, if you want something that's different from just a count of items on the stream, you could do something like:

stream.Select(item => new { item, index = <some value you calculate> })
...

This way your object and its index are tied together. You can use the item's index at any future point and still know what its index was. Whereas your code relies on getting to each item before the next one is processed.

To address the edits in your question

Firstly, have a look at Observable.Interval. It does what you're trying to do with your timer, but much more easily.

Secondly, have a look at the below example which reproduces what you're doing in your question. Running this code produces the correct output:

var items = Enumerable.Range(65, 26)
                      .Select(i => (char)i)
                      .Repeat();

var observableItems = items.ToObservable()
                           .Select((c, i) => new { Char = c, Index = i });

var interval = Observable.Interval(TimeSpan.FromSeconds(0.25));

var buffered = observableItems.Buffer(10);
var zipped = buffered.Zip(interval, (buffer, _) => buffer);

zipped.SelectMany(buffer => buffer).Dump();

You can run that code in LinqPad, which is a very useful tool for exploring Rx (and other parts of .Net).

Lastly - I assume this is a simplified exercise to try to work out what's happening in your situation. It looks like you're possibly trying to cope with sensor data that pushes more updates than you want to handle. Using Zip with an interval won't help much with that. You'll slow the rate of arrival of data, but it will just build up a bigger and bigger queue of data waiting to get through Zip.

If you want to get a data point every 250 milliseconds, look at Sample. If you want to get 250 milliseconds worth of readings at a time, look at the overload of Buffer that takes a timespan instead of a count.

这篇关于RX - 拉链输出一个意想不到的结果的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆