.NET中的反应式Rx Zip队列 [英] Reactive Rx zip queue in .Net

查看:40
本文介绍了.NET中的反应式Rx Zip队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我对反应式编程的概念相当陌生。我使用的是Bonsai,它通过c#公开了一些但不是所有的.Net RX命令。

我正在尝试获得如下大理石图所示的行为:

input1: ---1--------2--------3--------4--------5--------6--------7
input2: -------abc----------------------------------def-----------
result: ------------a--------b--------c--------c---------d-------e
基本上,输入2生成应该存储在队列中的事件波。 输入%1充当从该队列发出单个项目的触发器。

当队列为空时,应该发出队列的最后一项。 我尝试了Zip和comineLatest的各种组合,但无法获得所需的行为。

我还尝试了基于this postWithLatestFrom实现,但回想起来,我意识到这也不会产生所需的行为。

public IObservable<Tuple<TSource, TOther>> Process<TSource, TOther>(
            IObservable<TSource> source,
            IObservable<TOther> other)
        {


            // return source1.WithLatestFrom(source2, (xs, ys) => Tuple.Create(xs, ys));
            return source.Publish(os => other.Select(a => os.Select(b => Tuple.Create(b, a))).Switch());
        }

是否有任何运算符或运算符组合会产生此行为?一旦我了解了要使用哪些运算符,我就可以对盆景进行实现。

更新1:2018/05/18

基于Sentinel的帖子,我在Bonsai命名空间中编写了一个新类DiscriminatedUnion。不过,我没有设法指定适当的类型。编译器声明"无法推断Merge的类型参数"(在.Merge(input1.Select...中)。 我应该在哪里添加正确的类型规范?

using System.Reactive.Linq;
using System.ComponentModel;
using System.Collections.Immutable;    
namespace Bonsai.Reactive
{
    [Combinator]
   // [XmlType(Namespace = Constants.XmlNamespace)]
    [Description("Implementation of Discriminated Union")]
    public class DiscriminatedUnion
    {
        public IObservable<int?> Process<TInput1, TInput2>(
           IObservable<TInput1> input1,
            IObservable<TInput2> input2)
        {
            var merged =
                        input2.Select(s2 => Tuple.Create(2, (TInput2)s2))
                        .Merge(input1.Select(s1 => Tuple.Create(1, (TInput1)s1)))
                        .Scan(Tuple.Create((int?)null, new Queue<int>(), 0), (state, val) =>
                        {
                            int? next = state.Item1;
                            if (val.Item1 == 1)
                            {
                                if (state.Item2.Count > 0)
                                {
                                    next = state.Item2.Dequeue();
                                }
                            }
                            else
                            {
                                state.Item2.Enqueue(val.Item2);
                            }
                            return Tuple.Create(next, state.Item2, val.Item1);
                        })
                        .Where(x => (x.Item1 != null && x.Item3 == 1))
                        .Select(x => x.Item1);
            return merged;
        }
    }
}

推荐答案

以下是您的问题的可测试表示(或大理石图),使用NuGet包Microsoft.Reactive.Testing

var scheduler = new TestScheduler();
var input1 = scheduler.CreateColdObservable<int>(
    ReactiveTest.OnNext(1000.Ms(), 1),
    ReactiveTest.OnNext(2000.Ms(), 2),
    ReactiveTest.OnNext(3000.Ms(), 3),
    ReactiveTest.OnNext(4000.Ms(), 4),
    ReactiveTest.OnNext(5000.Ms(), 5),
    ReactiveTest.OnNext(6000.Ms(), 6),
    ReactiveTest.OnNext(7000.Ms(), 7)
);
var input2 = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(1400.Ms(), "a"),
    ReactiveTest.OnNext(1500.Ms(), "b"),
    ReactiveTest.OnNext(1600.Ms(), "c"),
    ReactiveTest.OnNext(5500.Ms(), "d"),
    ReactiveTest.OnNext(5600.Ms(), "e"),
    ReactiveTest.OnNext(5700.Ms(), "f")
);

哪个使用此扩展方法:

public static class TickExtensions
{
    public static long Ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms).Ticks;
    }
}

这个问题基本上是一个状态机问题,涉及两个不同类型的可观测对象。解决这个问题的最好方法是使用Discriminated Union类型,这在C#中是不存在的,所以我们将创建一个。@Sentinel的回答使用了Tuple,这也是可行的:

public class DUnion<T1, T2>
{
    public DUnion(T1 t1) 
    { 
        Type1Item = t1;
        Type2Item = default(T2);
        IsType1 = true;
    }

    public DUnion(T2 t2) 
    { 
        Type2Item = t2;
        Type1Item = default(T1);
        IsType1 = false;
    }

    public bool IsType1 { get; }
    public bool IsType2 => !IsType1;

    public T1 Type1Item { get; }
    public T2 Type2Item { get; }
}
然后,我们可以将两个不同类型的流SelectMerge放入一个有区别的联合流中,在其中我们可以使用Scan管理状态。您的状态逻辑有点棘手,但可行:

  • 如果有号码到达,但队列中没有项目,则不执行任何操作
  • 如果到达一个数字并且队列中有项目,则发出队列中的第一个项目。
    • 如果有多个项目,请从队列中删除最近的发射。
    • 如果队列只有一项,请不要将其移除,并进入"假空"状态。
  • 如果字符串到达,则将其放入队列。
    • 如果队列为‘FAKE-EMPTY’,则弹出最后一个项目并退出‘FAKE-EMPTY’状态。

以下是生成的观察值(使用NuGet包System.Collections.Immutable):

var result = input1.Select(i => new DUnion<int, string>(i))
    .Merge(input2.Select(s => new DUnion<int, string>(s)))
    .Scan((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false), (state, dItem) => dItem.IsType1
        ? state.queue.IsEmpty   
            ? (state.queue, null, false, false)     //Is integer, but empty queue, so don't emit item
            : state.queue.Dequeue().IsEmpty //Is integer, at least one item: dequeue unless only one item, then emit either way
                ? (state.queue,           state.queue.Peek(), true,  true)
                : (state.queue.Dequeue(), state.queue.Peek(), false, true)
        : state.isFakeEmptyState //Is new string, just add to queue, don't emit
            ? (state.queue.Dequeue().Enqueue(dItem.Type2Item), null, false, false) 
            : (state.queue.Enqueue(dItem.Type2Item),   (string)null, false, false) 
    )
    .Where(t => t.emit)
    .Select(t => t.item);

然后可以按如下方式进行测试:

var observer = scheduler.CreateObserver<string>();
result.Subscribe(observer);
scheduler.Start();
observer.Messages.Dump(); //Linqpad. Can replace with Console.Writeline loop.

更新:我稍微考虑了一下这个问题,我认为使用一些运算符绕过区别对待的Union功能是有意义的。这样您就不必显式地处理类型:

public static class DUnionExtensions
{
    public class DUnion<T1, T2>
    {
        public DUnion(T1 t1)
        {
            Type1Item = t1;
            Type2Item = default(T2);
            IsType1 = true;
        }

        public DUnion(T2 t2)
        {
            Type2Item = t2;
            Type1Item = default(T1);
            IsType1 = false;
        }

        public bool IsType1 { get; }
        public bool IsType2 => !IsType1;

        public T1 Type1Item { get; }
        public T2 Type2Item { get; }
    }

    public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
    {
        return a.Select(x => new DUnion<T1, T2>(x))
            .Merge(b.Select(x => new DUnion<T1, T2>(x)));
    }

    public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
            TState initialState,
            Func<TState, T1, TState> type1Handler,
            Func<TState, T2, TState> type2Handler)
        {
            return source.Scan(initialState, (state, u) => u.IsType1
                ? type1Handler(state, u.Type1Item)
                : type2Handler(state, u.Type2Item)
            );
        }
}

使用这些扩展方法,解决方案更改为以下内容,我认为这样读起来更好:

var result = input1
    .Union(input2)
    .ScanUnion((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false), 
        (state, _) => state.queue.IsEmpty
            ? (state.queue, null, false, false)     //empty queue, so don't emit item
            : state.queue.Dequeue().IsEmpty         //At least one item: dequeue unless only one item, then emit either way
                ? (state.queue, state.queue.Peek(), true, true) //maintain last item, enter Fake-EmptyState
                : (state.queue.Dequeue(), state.queue.Peek(), false, true),
        (state, s) => state.isFakeEmptyState 
            ? (state.queue.Dequeue().Enqueue(s), null, false, false)
            : (state.queue.Enqueue(s), (string)null, false, false)
    )
    .Where(t => t.emit)
    .Select(t => t.item); 

如果您在使用命名元组语法时遇到问题,则可以使用旧的元组:

var result = input1
    .Union(input2)
    .ScanUnion(Tuple.Create(ImmutableQueue<string>.Empty, (string)null, false, false),
        (state, _) => state.Item1.IsEmpty
            ? Tuple.Create(state.Item1, (string)null, false, false)     //empty queue, so don't emit item
            : state.Item1.Dequeue().IsEmpty         //At least one item: dequeue unless only one item, then emit either way
                ? Tuple.Create(state.Item1, state.Item1.Peek(), true, true) //maintain last item, enter Fake-EmptyState
                : Tuple.Create(state.Item1.Dequeue(), state.Item1.Peek(), false, true),
        (state, s) => state.Item3
            ? Tuple.Create(state.Item1.Dequeue().Enqueue(s), (string)null, false, false)
            : Tuple.Create(state.Item1.Enqueue(s), (string)null, false, false)
    )
    .Where(t => t.Item4)
    .Select(t => t.Item2);

这篇关于.NET中的反应式Rx Zip队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆