Rx.NET:按顺序组合可观察对象 [英] Rx.NET: Combine observables in order

查看:87
本文介绍了Rx.NET:按顺序组合可观察对象的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有2个 IConnectableObservable ,其中一个正在重播旧的历史记录消息,另一个正在发出新的当前值:

I have 2 IConnectableObservables where one is replaying old historic messages and the other is emitting fresh current values:

HistoricObservable: - 1 - 2 - 3 - 4 - 5 - 6 - 7 - 8 - ...
CurrentObservable:    - - - - - 5 - 6 - 7 - 8 - 9 - 10 - ...

如何将它们合并为单个可观察对象,这样我就可以从两个可观察对象中获取完整(正确)序列,但是一旦我开始从CurrentObservable中发出值,就删除订阅并在HistoricObservable订阅上调用Dispose.

How can I merge them into a single observable such that I get the full (correct) sequence from both observables, but also drop the subscription and call Dispose on the HistoricObservable subscription once I've started emitting values from CurrentObservable.

MergedObservable: - 1 - 2 - 3 - 4 - 56 - 7 - 8 - 9 - 10 - ...

我的消息由向导标识,因此该解决方案只能使用Equal对其进行比较,并且除了依赖于每个可观察对象发出的顺序之外,不能依赖任何顺序.

My messages are identified by a Guid, so the solution can only compare them using Equal and can't rely on any ordering other than how it's emitted by each of the observables.

简而言之,我正在寻找一种方法:

In short I'm looking to populate a the method:

public static IObservable<T> MergeObservables<T>(
    IObservable<T> historicObservable,
    IObservable<T> currentObservable)
    where T : IEquatable<T>
{
    throw new NotImplementedException();
}

MergedObservable应该保持从HistoricObservable发出值,而无需等待CurrentObservable的第一个值,如果先前已经发出CurrentObservable的第一个值,则MergedObservable应该跳过已发出的CurrentObservable的任何值,并处理对HistoricObservable的订阅,并开始从CurrentObservable获取所有新值.我也不想在CurrentObservable发出第一个对象时立即切换,直到我到达HistoricObservable中的那个点为止,所以我一直很难尝试使用TakeWhile/TakeUntil.我在使用CombineLatest来保存状态方面取得了一些次要的成功,但是我认为可能有更好的方法.

The MergedObservable should keep emitting values from HistoricObservable without waiting for the first value from CurrentObservable, and if the first value from CurrentObservable has already been emitted previously then MergedObservable should skip over any values in CurrentObservable already emitted, dispose of the subscription to HistoricObservable, and start taking all new values from CurrentObservable. I also don't want to immediately switch over when the first object is emitted by the CurrentObservable until I get to that point in the HistoricObservable so I've been having a hard time trying to use TakeWhile/TakeUntil. I've been having some minor success with CombineLatest to save state, but I'm thinking there's probably a better way.

对于以下测试用例,假定每个消息均由GUID表示,如下所示:

For the following test cases assume each message is represented by a GUID as follows:

A = E021ED8F-F0B7-44A1-B099-9878C6400F34
B = 1139570D-8465-4D7D-982F-E83A183619DE
C = 0AA2422E-19D9-49A7-9E8C-C9333FC46C46
D = F77D0714-2A02-4154-A44C-E593FFC16E3F
E = 14570189-4AAD-4D60-8780-BCDC1D23273D
F = B42983F0-5161-4165-A2F7-074698ECCE77
G = D2506881-F8AB-447F-96FA-896AEAAD1D0A
H = 3063CB7F-CD25-4287-85C3-67C609FA5679
I = 91200C69-CC59-4488-9FBA-AD2D181FD276
J = 2BEA364E-BE86-48FF-941C-4894CEF7A257
K = 67375907-8587-4D77-9C58-3E3254666303
L = C37C2259-C81A-4BC6-BF02-C96A34011479
M = E6F709BE-8910-42AD-A100-2801697496B0
N = 8741D0BB-EDA9-4735-BBAF-CE95629E880D

1)如果历史可观察对象永远无法赶上当前可观察对象,那么合并后的可观察对象绝不应该从当前可观察对象发出任何东西

1) If the historic observable never catches up to the current observable then the merged observable should never emit anything from the current observable

Historic: - A - B - C - D - E - F - G - H|
Current:    - - - - - - - - - - - - - - - I - J - K - L - M - N|
Merged:   - A - B - C - D - E - F - G - H|

2)一旦历史可观测值达到当前可观测值发出的第一个值,则合并的可观测值应立即发出当前当前可观测值所发出的所有值,并与历史可观测值断开连接.

2) As soon as the historic observable reaches the first value emitted by the current observable then the merged observable should immediately emit all values previously emitted by current observable and disconnect from the historic observable.

Historic: - A - B - C - D - E - F - G - H - I - J|
Current:  - - - - - - E - F - G - H - I - J|
Merged:   - A - B - C - D - EF-G- H - I - J|

3)解决方案应该能够处理来自当前可观测值的值,而不是历史可观测值.

3) The solution should be able to handle values coming from the current observable before the historic observable.

Historic: - - - - - A - B - C - D - E - F - G - H - I - J|
Current:  - - C - D - E - F - G - H - I - J - K - L - M - N|
Merged:   - - - - - A - B - CDEF-G-H- I - J - K - L - M - N|

4)如果已经发出了当前可观察值,则解决方案应跳过它们,直到发出新值为止.

4) If the values from current observable have already been emitted then the solution should skip over them until a new value is emitted.

Historic: - A - B - C - D - E - F - G - H - I - J|
Current:  - - - - - - - - B - C - D - E - F - G - H - I - J|
Merged:   - A - B - C - D - - - - - - E - F - G - H - I - J|

5)对于我的用例,我保证当前的可观察对象将是历史的可观察子集,但是为了完整起见,我想该解决方案将继续从历史的可观察对象的思维中拉出,即第一个元素将出现在以后点

5) For my use cases I am guaranteed that the current observable will be a subset of historic, but for the sake of completeness I would imagine the solution would continue pulling from the historic observable thinking that the first element will occur at a later point

Historic: - - - - - E - F - G - H - I - J - ... - Z - A|
Current:  - - A - B - C - D - E - F - G - H - I - J|
Merged:   - - - - - E - F - G - H - I - J - ... - Z - ABCDEFGHIJ|

6)我还保证,历史可观察对象在同步后不会与当前可观察对象有所不同,但是如果出于某些原因,它们合并后的可观察对象应该已经与它断开连接并且不会选择任何差异

6) I'm also guaranteed that the historic observable won't differ from the current observable once they're synced up, but if for some reason they do the merged observable should have already disconnected from it and won't pick up any differences

Historic: - A - B - C - D - E - D - C - B - A|
Current:  - - - - - - E - F - G - H - I - J|
Merged:   - A - B - C - D - EF-G- H - I - J|


有关创建有效解决方案的帮助,以下是一些输入数据:


The help with creating a working solution, here's some input data:

var historic = new Subject<int>();
var current = new Subject<int>();

// query & subscription goes here

historic.OnNext(1);
historic.OnNext(2);
current.OnNext(5);
historic.OnNext(3);
current.OnNext(6);
historic.OnNext(4);
current.OnNext(7);
historic.OnNext(5);
current.OnNext(8);
historic.OnNext(6);
current.OnNext(9);
historic.OnNext(7);
current.OnNext(10);

正确的解决方案应产生1到10之间的数字.

A correct solution should produce the numbers from 1 to 10.

推荐答案

回到这一点,我终于在我所要研究的方向上取得了一些进展.在说明中通过了所有测试用例,因此我认为这对于我的用例来说已经足够了.建设性的反馈总是值得赞赏的.

Coming back to this I finally made some progress on the direction that I've been going in. Got it to go through all the test cases in the description so I think this will be good enough for my use cases. Constructive feedback is always appreciated.

public static IObservable<T> CombineObservables<T>(
    IObservable<T> historicObservable,
    IObservable<T> currentObservable)
    where T : IEquatable<T>
{
    var cachedCurrent = currentObservable.Replay();
    cachedCurrent.Connect();

    var firstMessage = cachedCurrent.FirstAsync();

    var emittedHistoryItems = new List<T>();

    var part1 = historicObservable.TakeUntil(firstMessage)
                                  .Do(x => emittedHistoryItems.Add(x));

    var part2 = historicObservable.CombineLatest(firstMessage, Tuple.Create)
                                  .TakeWhile(x =>
                                             {
                                                 var historyItem = x.Item1;
                                                 var first = x.Item2;

                                                 return !emittedHistoryItems.Any(y => y.Equals(first)) && !historyItem.Equals(first);
                                             })
                                  .Select(x => x.Item1)
                                  .Do(x => emittedHistoryItems.Add(x));

    var part3 = cachedCurrent.SkipWhile(x => emittedHistoryItems.Contains(x));

    return part1.Concat(part2).Concat(part3);
}

小提琴示例: https://dotnetfiddle.net/6BqfiW

这篇关于Rx.NET:按顺序组合可观察对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆