将一个 observable 与另一个 observable 的最新相结合 [英] combining one observable with latest from another observable

查看:26
本文介绍了将一个 observable 与另一个 observable 的最新相结合的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试组合两个值共享某个键的 observable.

I'm trying to combine two observables whose values share some key.

每当第一个 observable 产生一个新值时,我想产生一个新值,结合第二个 observable 的最新值,哪个选择取决于第一个 observable 的最新值.

I want to produce a new value whenever the first observable produces a new value, combined with the latest value from a second observable which selection depends on the latest value from the first observable.

伪代码示例:

var obs1 = Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => Tuple.create(SomeKeyThatVaries, x)

var obs2 = Observable.Interval(TimeSpan.FromMilliSeconds(1)).Select(x => Tuple.create(SomeKeyThatVaries, x)

from x in obs1
  let latestFromObs2WhereKeyMatches = …
  select Tuple.create(x, latestFromObs2WhereKeyMatches)

有什么建议吗?

显然,这可以通过订阅第二个 observable 并创建一个字典来实现,该字典具有可通过键索引的最新值.但我正在寻找不同的方法..

Clearly this could be implemented by subcribing to the second observable and creating a dictionary with the latest values indexable by the key. But I'm looking for a different approach..

使用场景:从股票报价流中计算出的一分钟价格柱.在这种情况下,关键是股票代码,字典包含具体股票代码的最新卖价和买价,然后用于计算.

Usage scenario: one minute price bars computed from a stream of stock quotes. In this case the key is the ticker and the dictionary contains latest ask and bid prices for concrete tickers, which are then used in the computation.

(顺便说一句,谢谢 Dave 和 James,这是一次非常富有成效的讨论)

(By the way, thank you Dave and James this has been a very fruitful discussion)

(抱歉格式问题,很难在 iPad 上正确使用..)

(sorry about the formatting, hard to get right on an iPad..)

推荐答案

我想知道这样一个查询的目的.你介意描述一下使用场景吗?

I'd like to know the purpose of a such a query. Would you mind describing the usage scenario a bit?

尽管如此,以下查询似乎可以解决您的问题.如果您已经有某种方法可以识别每个值的来源,则不需要初始预测,但为了概括起见,我将它们包括在内,以与您极其抽象的提问模式保持一致.;-)

Nevertheless, it seems like the following query may solve your problem. The initial projections aren't necessary if you already have some way of identifying the origin of each value, but I've included them for the sake of generalization, to be consistent with your extremely abstract mode of questioning. ;-)

注意:我假设 someKeyThatVaries 不是您展示的共享数据,这就是为什么我还包含术语 anotherKeyThatVaries;否则,整个查询对我来说真的毫无意义.

Note: I'm assuming that someKeyThatVaries is not shared data as you've shown it, which is why I've also included the term anotherKeyThatVaries; otherwise, the entire query really makes no sense to me.

var obs1 = Observable.Interval(TimeSpan.FromSeconds(1))
                     .Select(x => Tuple.Create(someKeyThatVaries, x));
var obs2 = Observable.Interval(TimeSpan.FromSeconds(.25))
                     .Select(x => Tuple.Create(anotherKeyThatVaries, x));

var results = obs1.Select(t => new { Key = t.Item1, Value = t.Item2, Kind = 1 })
                  .Merge(
              obs2.Select(t => new { Key = t.Item1, Value = t.Item2, Kind = 2 }))
                  .GroupBy(t => t.Key, t => new { t.Value, t.Kind })
                  .SelectMany(g =>
                    g.Scan(
                      new { X = -1L, Y = -1L, Yield = false },
                      (acc, cur) => cur.Kind == 1
                                  ? new { X = cur.Value, Y = acc.Y, Yield = true }
                                  : new { X = acc.X, Y = cur.Value, Yield = false })
                      .Where(s => s.Yield)
                      .Select(s => Tuple.Create(s.X, s.Y)));

这篇关于将一个 observable 与另一个 observable 的最新相结合的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆