与无扩展事件重新排序 [英] Reordering events with Reactive Extensions

查看:109
本文介绍了与无扩展事件重新排序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图重新安排在不同的线程到达的无序事件



是否有可能创造这些大理石图相匹配的反应扩展查询:



  S1 1 2 3 4 

S2 1 3 2 4

结果1 2 3 4

和...

  S1 1 2 3 4 

S2 4 3 2 1

结果1234

这就是:只有发布版本号顺序结果



我得到的最接近是使用加入。打开一个窗口,每次S1蜱S2时到达相同数量只有关闭它。



这样的:

  VAR publishedEvents = events.Publish() .RefCount(); 
publishedEvents.Join(
publishedEvents.Scan(0,(I,O)= I标记+ 1),
expectedVersion => publishedEvents.Any(@event => @event .Version == expectedVersion),
_ => Observable.Never<单元>(),
(@event,expectedVersion)=>新建{@事件,expectedVersion})
。其中(x => x.expectedVersion ==点¯x@ event.Version)
。选择(X =>英寸x @事件)
.Subscribe(坚持);一旦S2与蜱



但不会与图2不2组的工作将全部完成2号,因此前1。



是否有意义?可以用它接收做什么?如果它



修改的?我想这就像重叠的窗口,在那里更高版本的Windows无法关闭之前,所有前面的窗口已经关闭。 。和窗口数量相匹配的事件版本号之前,前面的窗口不会关闭



编辑2 的:



我有这样的事情了,但它不是真正的无功,功能,线程安全的LINQ启示,我希望(请忽略我的事件JObjects现在):

  VAR orderedEvents = Observable.Create< JObject>(观察员=> 
{
VAR nextVersionExpected = 1;
VAR previousEvents =新的List< JObject>();
返回事件
.ObserveOn(Scheduler.CurrentThread)
.Subscribe(@event =>
{
previousEvents.Add(@event);

VAR的版本=(长)@event [版本];
如果(版本= nextVersionExpected!)回报;

的foreach(在previousEvents.OrderBy VAR previousEvent(X =方式>(长)×[版本])了ToList())
{
如果((长)previousEvent [版本] != nextVersionExpected)
中断;

observer.OnNext(previousEvent);
previousEvents.Remove(previousEvent);
nextVersionExpected ++;
}
});
});


解决方案

简介



的关键,这个问题的排序。反正你看它,需要某种形式的缓冲。虽然毫无疑问,运营商的一些复杂的组合可以实现这一点,我觉得这是一个很好的例子,其中 Observable.Create 是一个不错的选择。



一般化的解决方案



我做了一些努力来推广我的方法来接受任何类型的订购关键的。要做到这一点,我希望给予:




  • 用于获取事件的关键一键切换功能,输入的 Func键< TSource,TKEY的>

  • 类型的初始密钥 TKEY的

  • 一个函数来获取序列中的下一个关键的类型, Func键< TKEY的,TKEY的>

  • 系统结果选择生成从源流的配对活动类型 Func键<,结果; TSource,TSource,TSource>



由于我只使用这些被满足了我的测试基于1的整数序列:




  • 的KeySelectors: I =>我

  • firstKey: 1

  • nextKeyFunc: K => K + 1

  • resultSelector:(左,右)= GT;左



排序



下面是我的排序的尝试。它缓冲事件成词典和尽快刷新他们的用户:

 公共静态的IObservable< TSource>排序和LT; TSource,TKEY的> 
(本的IObservable< TSource>源,
Func键< TSource,TKEY的>的KeySelectors,
TKEY的firstKey,
Func键< TKEY的,TKEY的> nextKeyFunc)
{
返回Observable.Create< TSource>(O =>
{
VAR NEXTKEY = firstKey;
变种缓冲=新词典< TKEY的,TSource>();
返回source.Subscribe(I =>
{
如果(的KeySelectors(I).Equals(NEXTKEY))
{
NEXTKEY = nextKeyFunc(NEXTKEY);
○ .OnNext(我);
TSource nextValue;
,而(buffer.TryGetValue(NEXTKEY,出nextValue))
{
buffer.Remove(NEXTKEY);
○ .OnNext(nextValue);
NEXTKEY = nextKeyFunc(NEXTKEY);
}
}
,否则buffer.Add(的KeySelectors(ⅰ),ⅰ);
}) ;
});
}



我不得不说这是一个非常天真的实现。在过去的生产代码,我已经阐述了这一特定的错误处理,一个固定大小的缓冲区和超时,以防止资源泄漏。然而,将用于本实施例做的。 !)



通过这个排序(对不起),我们现在可以看一下处理多个数据流。



合并结果



第一次尝试



我在这第一次尝试是产生已看到事件的无序流所需的次数。这然后可以进行排序。我通过关键分组元素,使用 GroupByUntil 持有每个组,直到两个元素已被抓获做到这一点。每个组是否相同键的结果的流。对于整数事件的简单的例子,我可以只取各组的最后一个元素。但是,我不喜欢这个,因为它是尴尬,其中每个结果流可能是导致一些有用的东西更真实的场景。我包括为感兴趣的缘故的代码。请注意,这样的测试可以在此和我的第二次​​尝试之间共享,我接受了一个未使用的resultSelector参数:

 公共静态的IObservable< ; TSource> OrderedCollect< TSource,TKEY的> 
(本的IObservable< TSource>左,
的IObservable< TSource>右,
Func键< TSource,TKEY的>的KeySelectors,
TKEY的firstKey,
Func键< TKEY的,TKEY的> ; nextKeyFunc
Func键< TSource,TSource,TSource> resultSelector)
{
返回left.Merge(右)
.GroupByUntil(的KeySelectors,X => x.Take(2 ).LastAsync())
.SelectMany(X => x.LastAsync())
的.sort(的KeySelectors,firstKey,nextKeyFunc);
}



旁白:您可以在破解的SelectMany 子句来决定如何挑选结果。一个优点这种解决方案具有在第二次尝试,是在与许多结果情景流很容易地看到如何扩展它挑说,第一三分之二的结果元组到达。



第二次尝试



有关这个方法我每个流分别排序,然后邮编的结果在一起。这不仅是一个更简单的操作看,它也更容易从结果以有趣的方式每个流结合起来。为了保持测试,我的第一个方法兼容,我挑resultSelector功能使用的第一个数据流的事件的结果,但很明显,你必须灵活地做一些有用的东西在您的方案:

 公共静态的IObservable< TSource> OrderedCollect< TSource,TKEY的> 
(本的IObservable< TSource>左,
的IObservable< TSource>右,
Func键< TSource,TKEY的>的KeySelectors,
TKEY的firstKey,
Func键< TKEY的,TKEY的> ; nextKeyFunc,
Func键< TSource,TSource,TSource> resultSelector)
{
返回Observable.Zip(
left.Sort(的KeySelectors,firstKey,nextKeyFunc),
right.Sort(的KeySelectors,firstKey,nextKeyFunc),
resultSelector);
}



旁白:这不是太难看这个代码如何延长至更一般的情况下接受任何数目的输入流,但由于前面提到,使用邮编使得它非常不灵活的以给定的关键阻塞,直到所有流结果是



测试用例



最后,这里是我的测试中回荡你的榜样方案。要运行这些进口的NuGet包 RX-测试 NUnit的,并把实现入一个静态类:

 公共类ReorderingEventsTests:ReactiveTest 
{
[测试]
公共无效ReorderingTest1()
{
无功调度=新T​​estScheduler();

变种S1 = scheduler.CreateColdObservable(
OnNext(100,1),
OnNext(200,2),
OnNext(400,3),
OnNext(500,4));

变种S2 = scheduler.CreateColdObservable(
OnNext(100,1),
OnNext(200,3),
OnNext(300,2),
OnNext(500,4));

VAR的结果= scheduler.CreateObserver< INT>();

s1.OrderedCollect(
右:S2,
的KeySelectors:I = I标记,
firstKey:1,
nextKeyFunc:I =>我+ 1,
resultSelector:(左,右)= GT;左).Subscribe(结果);

scheduler.Start();

results.Messages.AssertEqual(
OnNext(100,1),
OnNext(300,2),
OnNext(400,3),
OnNext(500,4));
}

[测试]
公共无效ReorderingTest2()
{
无功调度=新T​​estScheduler();

变种S1 = scheduler.CreateColdObservable(
OnNext(100,1),
OnNext(200,2),
OnNext(300,3),
OnNext(400,4));

变种S2 = scheduler.CreateColdObservable(
OnNext(100,4),
OnNext(200,3),
OnNext(300,2),
OnNext(400,1));

VAR的结果= scheduler.CreateObserver< INT>();

s1.OrderedCollect(
右:S2,
的KeySelectors:I = I标记,
firstKey:1,
nextKeyFunc:I =>我+ 1,
resultSelector:(左,右)= GT;左).Subscribe(结果);

scheduler.Start();

results.Messages.AssertEqual(
OnNext(400,1),
OnNext(400,2),
OnNext(400,3),
OnNext(400,4));
}
}



钻营,以避免重复



最后的评论,因为我讨厌在代码重复自己,下面是避免重复。我在第二种方法叫排序一个调整。我没有在主体包括以避免混淆读者不熟悉柯里:

 公共静态的IObservable< TSource> OrderedCollect< TSource,TKEY的> 
(本的IObservable< TSource>左,
的IObservable< TSource>右,
Func键< TSource,TKEY的>的KeySelectors,
TKEY的firstKey,
Func键< TKEY的,TKEY的> ; nextKeyFunc,
Func键< TSource,TSource,TSource> resultSelector)
{
Func键<的IObservable< TSource>中的IObservable< TSource>> curriedSort =
事件= GT; events.Sort(的KeySelectors,firstKey,nextKeyFunc);

返回Observable.Zip(
curriedSort(左),
curriedSort(右),
resultSelector);
}


I'm trying to reorder events arriving unordered on different threads.

Is it possible to create a reactive extension query that matches these marble diagrams:

s1          1   2       3   4

s2          1   3   2       4

result      1       2   3   4

and...

s1          1   2   3   4

s2          4   3   2   1

result                  1234

That is: Only publish results in version number order.

The closest I have got is using a Join to open a window each time s1 ticks and only close it when s2 arrives with the same number.

Like this:

var publishedEvents = events.Publish().RefCount();
publishedEvents.Join(
        publishedEvents.Scan(0, (i, o) => i + 1),
        expectedVersion => publishedEvents.Any(@event => @event.Version == expectedVersion),
        _ => Observable.Never<Unit>(),
        (@event, expectedVersion) => new {@event,expectedVersion})
    .Where(x => x.expectedVersion == x.@event.Version)
    .Select(x => x.@event)
    .Subscribe(Persist);

But that won't work with diagram no 2. Group 2 will be completed once s2 ticks with the number 2, and thus before 1.

Does it make sense? Can it be done with Rx? Should it?

EDIT: I guess it's like overlapping windows, where later windows can't close before all preceding windows have closed. And the preceding windows won't close before the window number matches the event version number.

EDIT 2:

I have something like this now, but it's not really the reactive, functional, thread-safe LINQ-revelation, I hoped for (please ignore that my events are JObjects for now):

var orderedEvents = Observable.Create<JObject>(observer =>
{
    var nextVersionExpected = 1;
    var previousEvents = new List<JObject>();
    return events
        .ObserveOn(Scheduler.CurrentThread)
        .Subscribe(@event =>
        {
            previousEvents.Add(@event);

            var version = (long) @event["Version"];
            if (version != nextVersionExpected) return;

            foreach (var previousEvent in previousEvents.OrderBy(x => (long) x["Version"]).ToList())
            {
                if ((long) previousEvent["Version"] != nextVersionExpected)
                    break;

                observer.OnNext(previousEvent);
                previousEvents.Remove(previousEvent);
                nextVersionExpected++;
            }
        });
});

解决方案

Introduction

The key to this problem is the sort. Anyway you look at it, some form of buffering is required. Whilst no doubt some elaborate combination of operators might achieve this, I think this is a good example where Observable.Create is a good choice.

Generalizing the solution

I've made some effort to generalize my approach to accept any type of ordering key. To do this, I expect to be given:

  • A key selector function used to obtain the key of an event, of type Func<TSource,TKey>
  • The initial key of type TKey
  • A function to get the next key in sequence, of type Func<TKey,TKey>
  • A result selector to generate the result from the paired up events in the source streams, of type Func<TSource,TSource,TSource>

Since I'm just using a 1-based integer sequence for my tests these are satisfied by:

  • keySelector: i => i
  • firstKey: 1
  • nextKeyFunc: k => k+1
  • resultSelector: (left,right) => left

Sort

Here is my Sort attempt. It buffers events into a Dictionary and flushes them as soon as possible to the subscriber:

public static IObservable<TSource> Sort<TSource, TKey>
    (this IObservable<TSource> source,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc)
{
    return Observable.Create<TSource>(o =>
    {
        var nextKey = firstKey;
        var buffer = new Dictionary<TKey, TSource>();
        return source.Subscribe(i =>
        {
            if (keySelector(i).Equals(nextKey))
            {
                nextKey = nextKeyFunc(nextKey);
                o.OnNext(i);
                TSource nextValue;
                while (buffer.TryGetValue(nextKey, out nextValue))
                {
                    buffer.Remove(nextKey);
                    o.OnNext(nextValue);
                    nextKey = nextKeyFunc(nextKey);
                }
            }
            else buffer.Add(keySelector(i), i);
        });
    });
}

I have to say this is a pretty naïve implementation. In production code in the past I have elaborated on this with specific error handling, a fixed-size buffer and time-outs to prevent resource leakage. However, it will do for this example. :)

With this sorted (sorry!), we can now look at handling multiple streams.

Combining Results

First Attempt

My first attempt at this is to produce an unordered stream of events that have been seen the required number of times. This could then be sorted. I do this by grouping elements by key, using GroupByUntil to hold each group until two elements had been captured. Each group is then a stream of results of the same key. For the simple example of integer events, I can just take the last element of each group. However, I don't like this because it's awkward for more real-world scenarios where each result stream may be contributing something useful. I include the code for the sake of interest. Note, so that the tests can be shared between this and my second attempt, I accept an unused resultSelector parameter:

public static IObservable<TSource> OrderedCollect<TSource, TKey>
    (this IObservable<TSource> left,
     IObservable<TSource> right,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc
     Func<TSource,TSource,TSource> resultSelector)
{
    return left.Merge(right)
               .GroupByUntil(keySelector, x => x.Take(2).LastAsync())
               .SelectMany(x => x.LastAsync())
               .Sort(keySelector, firstKey, nextKeyFunc);
}

Aside: You can hack on the SelectMany clause to decide how to pick results. One advantage this solution has over the second attempt, is that in scenarios with many result streams it is easier to see how to extend it to pick say, the first two out of three result tuples to arrive.

Second Attempt

For this approach I sort each stream independently, and then Zip the results together. Not only is this a far simpler looking operation, it's also far easier to combine results from each stream in interesting ways. To keep the tests compatible with my first approach, I pick the resultSelector function to use the first stream's events as the results, but obviously you have flexibility to do something useful in your scenario:

public static IObservable<TSource> OrderedCollect<TSource, TKey>
    (this IObservable<TSource> left,
     IObservable<TSource> right,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc,
     Func<TSource, TSource, TSource> resultSelector)
{
    return Observable.Zip(
        left.Sort(keySelector, firstKey, nextKeyFunc),
        right.Sort(keySelector, firstKey, nextKeyFunc),
        resultSelector);
}

Aside: It isn't too hard to see how this code be extended to a more general case accepting any number of input streams, but as alluded to earlier, using Zip makes it is quite inflexible about blocking at a given key until results from all streams are in.

Test Cases

Finally, here are my tests echoing your example scenarios. To run these, import nuget packages rx-testing and nunit and put the implementations above into a static class:

public class ReorderingEventsTests : ReactiveTest
{
    [Test]
    public void ReorderingTest1()
    {
        var scheduler = new TestScheduler();

        var s1 = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(400, 3),
            OnNext(500, 4));

        var s2 = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 3),
            OnNext(300, 2),
            OnNext(500, 4));

        var results = scheduler.CreateObserver<int>();

        s1.OrderedCollect(
            right: s2,
            keySelector: i => i,
            firstKey: 1,
            nextKeyFunc: i => i + 1,
            resultSelector: (left,right) => left).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(300, 2),
            OnNext(400, 3),
            OnNext(500, 4));
    }

    [Test]
    public void ReorderingTest2()
    {
        var scheduler = new TestScheduler();

        var s1 = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3),
            OnNext(400, 4));

        var s2 = scheduler.CreateColdObservable(
            OnNext(100, 4),
            OnNext(200, 3),
            OnNext(300, 2),
            OnNext(400, 1));

        var results = scheduler.CreateObserver<int>();

        s1.OrderedCollect(
            right: s2,
            keySelector: i => i,
            firstKey: 1,
            nextKeyFunc: i => i + 1,
            resultSelector: (left, right) => left).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(400, 1),
            OnNext(400, 2),
            OnNext(400, 3),
            OnNext(400, 4));
    }
}

Currying to avoid repetition

Final comment, because I hate repeating myself in code, here's a tweak that avoids the repetitious way I call Sort in the second approach. I've not included it in the main body to avoid confusing readers unfamiliar with currying:

public static IObservable<TSource> OrderedCollect<TSource, TKey>
    (this IObservable<TSource> left,
        IObservable<TSource> right,
        Func<TSource, TKey> keySelector,
        TKey firstKey,
        Func<TKey, TKey> nextKeyFunc,
        Func<TSource, TSource, TSource> resultSelector)
{
    Func<IObservable<TSource>, IObservable<TSource>> curriedSort =
        events => events.Sort(keySelector, firstKey, nextKeyFunc);

    return Observable.Zip(
        curriedSort(left),
        curriedSort(right),
        resultSelector);
}

这篇关于与无扩展事件重新排序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆