ValveSubject:排队主题为接收具有内置缓冲,打开/关闭操作 [英] ValveSubject: a queuing subject for Rx with built-in buffering, open/close operations

查看:114
本文介绍了ValveSubject:排队主题为接收具有内置缓冲,打开/关闭操作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我经常碰到,我需要某种形式的阀门结构来控制反应管道的流量情况。通常,在基于网络的应用,本人不得不打开/根据连接状态关闭请求流的要求。

I have often run into situations where I need some sort of valve construct to control the flow of a reactive pipeline. Typically, in a network-based application I have had the requirement to open/close a request stream according to the connection state.

这阀主体应该支持打开/关闭该流,并且在FIFO顺序输出交付。当阀门关闭时的输入值应该进行缓冲。

This valve subject should support opening/closing the stream, and output delivery in FIFO order. Input values should be buffered when the valve is closed.

A ConcurrentQueue BlockingCollection 通常用在这样的情况下,但立即引入穿入图象。我一直在寻找一个纯粹的反应解决了这一问题。

A ConcurrentQueue or BlockingCollection are typically used in such scenarios, but that immediately introduces threading into the picture. I was looking for a purely reactive solution to this problem.

推荐答案

下面是主要依据的实施缓冲() BehaviorSubject 。行为主体跟踪阀门的开/关状态。阀的开口开始缓冲的窗户,并且阀的倒闭关闭这些窗口。缓冲算的输出是重新注入到输入(这样,即使观察者本身可以关闭阀门):

Here's an implementation mainly based on Buffer() and BehaviorSubject. The behavior subject tracks the open/close state of the valve. Openings of the valve start buffering windows, and closings of the valve close those windows. Output of the buffer operator is "re-injected" onto the input (so that even observers themselves can close the valve):

/// <summary>
/// Subject offering Open() and Close() methods, with built-in buffering.
/// Note that closing the valve in the observer is supported.
/// </summary>
/// <remarks>As is the case with other Rx subjects, this class is not thread-safe, in that
/// order of elements in the output is indeterministic in the case of concurrent operation 
/// of Open()/Close()/OnNext()/OnError(). To guarantee strict order of delivery even in the 
/// case of concurrent access, <see cref="ValveSubjectExtensions.Synchronize{T}(NEXThink.Finder.Utils.Rx.IValveSubject{T})"/> can be used.</remarks>
/// <typeparam name="T">Elements type</typeparam>
public class ValveSubject<T> : IValveSubject<T>
{
    private enum Valve
    {
        Open,
        Closed
    }

    private readonly Subject<T> input = new Subject<T>();
    private readonly BehaviorSubject<Valve> valveSubject = new BehaviorSubject<Valve>(Valve.Open);
    private readonly Subject<T> output = new Subject<T>();

    public ValveSubject()
    {
        var valveOperations = valveSubject.DistinctUntilChanged();
        input.Buffer(
            bufferOpenings: valveOperations.Where(v => v == Valve.Closed),
            bufferClosingSelector: _ => valveOperations.Where(v => v == Valve.Open))
            .SelectMany(t => t).Subscribe(input);
        input.Where(t => valveSubject.Value == Valve.Open).Subscribe(output);
    }

    public bool IsOpen
    {
        get { return valveSubject.Value == Valve.Open; }
    }

    public bool IsClosed
    {
        get { return valveSubject.Value == Valve.Closed; }
    }

    public void OnNext(T value)
    {
        input.OnNext(value);
    }

    public void OnError(Exception error)
    {
        input.OnError(error);
    }

    public void OnCompleted()
    {
        output.OnCompleted();
        input.OnCompleted();
        valveSubject.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return output.Subscribe(observer);
    }

    public void Open()
    {
        valveSubject.OnNext(Valve.Open);
    }

    public void Close()
    {
        valveSubject.OnNext(Valve.Closed);
    }
}

public interface IValveSubject<T>:ISubject<T>
{
    void Open();

    void Close();
}



冲洗出油阀的其他方法可以在时间,例如有用消除剩余的请求时,他们不再相关。下面是建立在一个先例实施的适配器型的:

/// <summary>
/// Subject with same semantics as <see cref="ValveSubject{T}"/>, but adding flushing out capability 
/// which allows clearing the valve of any remaining elements before closing.
/// </summary>
/// <typeparam name="T">Elements type</typeparam>
public class FlushableValveSubject<T> : IFlushableValveSubject<T>
{
    private readonly BehaviorSubject<ValveSubject<T>> valvesSubject = new BehaviorSubject<ValveSubject<T>>(new ValveSubject<T>());

    private ValveSubject<T> CurrentValve
    {
        get { return valvesSubject.Value; }
    }

    public bool IsOpen
    {
        get { return CurrentValve.IsOpen; }
    }

    public bool IsClosed
    {
        get { return CurrentValve.IsClosed; }
    }

    public void OnNext(T value)
    {
        CurrentValve.OnNext(value);
    }

    public void OnError(Exception error)
    {
        CurrentValve.OnError(error);
    }

    public void OnCompleted()
    {
        CurrentValve.OnCompleted();
        valvesSubject.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return valvesSubject.Switch().Subscribe(observer);
    }

    public void Open()
    {
        CurrentValve.Open();
    }

    public void Close()
    {
        CurrentValve.Close();
    }

    /// <summary>
    /// Discards remaining elements in the valve and reset the valve into a closed state
    /// </summary>
    /// <returns>Replayable observable with any remaining elements</returns>
    public IObservable<T> FlushAndClose()
    {
        var previousValve = CurrentValve;
        valvesSubject.OnNext(CreateClosedValve());
        var remainingElements = new ReplaySubject<T>();
        previousValve.Subscribe(remainingElements);
        previousValve.Open();
        return remainingElements;
    }

    private static ValveSubject<T> CreateClosedValve()
    {
        var valve = new ValveSubject<T>();
        valve.Close();
        return valve;
    }
}

public interface IFlushableValveSubject<T> : IValveSubject<T>
{
    IObservable<T> FlushAndClose();
}



由于在评论中提到,这些问题都没有线程安全这个意义上,传递顺序并发操作的情况下,不再保证。以类似的方式为标准Rx 主题存在什么 Subject.Synchronize() https://msdn.microsoft.com/en-us/library/ hh211643%28V = vs.103%29.aspx ),我们可以介绍一些扩展,并提供阀周围的锁定:

As mentioned in the comment, these subjects are not "thread-safe" in the sense that order of delivery is no longer guaranteed in the case of concurrent operation. In a similar fashion as what exists for the standard Rx Subject, Subject.Synchronize() (https://msdn.microsoft.com/en-us/library/hh211643%28v=vs.103%29.aspx) we can introduce some extensions which provide locking around the valve:

public static class ValveSubjectExtensions
{
    public static IValveSubject<T> Synchronize<T>(this IValveSubject<T> valve)
    {
        return Synchronize(valve, new object());
    }

    public static IValveSubject<T> Synchronize<T>(this IValveSubject<T> valve, object gate)
    {
        return new SynchronizedValveAdapter<T>(valve, gate);
    }

    public static IFlushableValveSubject<T> Synchronize<T>(this IFlushableValveSubject<T> valve)
    {
        return Synchronize(valve, new object());
    }

    public static IFlushableValveSubject<T> Synchronize<T>(this IFlushableValveSubject<T> valve, object gate)
    {
        return new SynchronizedFlushableValveAdapter<T>(valve, gate);
    }
}

internal class SynchronizedValveAdapter<T> : IValveSubject<T>
{
    private readonly object gate;
    private readonly IValveSubject<T> valve;

    public SynchronizedValveAdapter(IValveSubject<T> valve, object gate)
    {
        this.valve = valve;
        this.gate = gate;
    }

    public void OnNext(T value)
    {
        lock (gate)
        {
            valve.OnNext(value);    
        }
    }

    public void OnError(Exception error)
    {
        lock (gate)
        {
            valve.OnError(error);
        }
    }

    public void OnCompleted()
    {
        lock (gate)
        {
            valve.OnCompleted();
        }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return valve.Subscribe(observer);
    }

    public void Open()
    {
        lock (gate)
        {
            valve.Open();
        }
    }

    public void Close()
    {
        lock (gate)
        {
            valve.Close();
        }
    }
 }

 internal class SynchronizedFlushableValveAdapter<T> : SynchronizedValveAdapter<T>, IFlushableValveSubject<T>
 {
    private readonly object gate;
    private readonly IFlushableValveSubject<T> valve;

    public SynchronizedFlushableValveAdapter(IFlushableValveSubject<T> valve, object gate)
        : base(valve, gate)
    {
        this.valve = valve;
        this.gate = gate;
    }

    public IObservable<T> FlushAndClose()
    {
        lock (gate)
        {
            return valve.FlushAndClose();
        }
    }
} 

这篇关于ValveSubject:排队主题为接收具有内置缓冲,打开/关闭操作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆