使用 Rx 合并历史和实时股票价格数据 [英] Merging historical and live stock price data with Rx

查看:45
本文介绍了使用 Rx 合并历史和实时股票价格数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试 Rx,因为它看起来很适合我们的领域,但学习曲线让我感到惊讶.

I'm trying out Rx because it seems like a good fit for our domain but the learning curve has taken me by surprise.

我需要将历史价格数据与实时价格数据结合起来.

I need to knit together historical price data with live price data.

我正在尝试将执行此操作的常用方法调整为 Rx 语言:

I'm trying to adapt the usual approach to doing this into the language of Rx:

  1. 立即订阅实时价格并开始缓冲我返回的值
  2. 发起对历史价格数据的请求(这需要在订阅实时价格之后发生,这样我们的数据中就没有任何差距)
  3. 发布历史价格回来
  4. 在我们收到所有历史数据后,发布缓冲的实时数据,删除一开始与历史数据重叠的任何值
  5. 继续重放实时价格 Feed 中的数据

我有这个恶心且不正确的稻草人代码,它似乎适用于我编写的幼稚测试用例:

I have this disgusting and incorrect straw man code which seems to work for the naive test cases I've written:

IConnectableObservable<Tick> live = liveService
    .For(symbol)
    .Replay(/* Some appropriate buffer size */);
live.Connect();

IObservable<Tick> historical = historyService.For(since, symbol);

return new[] {historical, live}
    .Concat()
    .Where(TicksAreInChronologicalOrder());

private static Func1<Tick,bool> TicksAreInChronologicalOrder()
{
    // Some stateful predicate comparing the timestamp of this tick 
    // to the timestamp of the last tick we saw
}

这有一些缺点

  1. 不知道合适的重放缓冲区大小.设置无限缓冲区是不可能的 - 这是一个长时间运行的序列.我们真的想要某种一次性缓冲区,在第一次调用 Subscribe 时刷新.如果这在 Rx 中存在,我找不到它.
  2. 即使我们切换到发布实时价格,重播缓冲区也将继续存在.我们此时不需要缓冲区.
  3. 同样,一旦我们跳过了历史价格和实时价格之间的初始重叠,就不需要过滤重叠分时的谓词.我真的很想做这样的事情:live.SkipWhile(tick =>tick.Timestamp </* 延迟获取历史数据中的最后一个时间戳 */).Wait(this IObservable) 在这里有用吗?
  1. The appropriate replay buffer size is not known. Setting an unlimited buffer isn't possible- this is a long-running sequence. Really we want some kind of one-time buffer that flushes on the first call to Subscribe. If this exists in Rx, I can't find it.
  2. The replay buffer will continue to exist even once we've switched to publishing live prices. We don't need the buffer at this point.
  3. Similarly, the predicate to filter out overlapping ticks isn't necessary once we've skipped the initial overlap between historical and live prices. I really want to do something like: live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */). Is Wait(this IObservable<TSource>) useful here?

一定有更好的方法来做到这一点,但我仍在等待我的大脑像 FP 一样理解 Rx.

There must be a better way to do this, but I'm still waiting for my brain to grok Rx like it does FP.

我考虑过的另一个解决方案 1. 是编写我自己的 Rx 扩展,它是一个 ISubject 将消息排队,直到它获得第一个订阅者(然后拒绝订阅者?).也许这就是要走的路?

Another option I've considered to solve 1. is writing my own Rx extension which would be an ISubject that queues messages until it gets its first subscriber (and refuses subscribers after that?). Maybe that's the way to go?

推荐答案

为了记录,这是我最后所做的.我仍然是一个 Rx 学习者,回到 .Net 上次看到它是在 2.0 版.非常感谢收到所有反馈.

For the record, here's what I did in the end. I'm still very much an Rx learner, and returning to .Net having last seen it at version 2.0. All feedback is very gratefully received.

下面使用的 Ticks 对象可能包含一个或多个刻度值.历史数据服务以多个 Tick 返回数据.

The Ticks object used below may contain one or more tick values. The historical data service returns data in several Ticks.

public class HistoricalAndLivePriceFeed : IPriceFeed
{
    private readonly IPriceFeed history;
    private readonly IPriceFeed live;
    private readonly IClock clock;

    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live)
:            this(history, live, new RealClock())
        {
    }
    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live, IClock clock)
    {
        this.history = history;
        this.live = live;
        this.clock = clock;
    }

    public IObservable<Ticks> For(DateTime since, ISymbol symbol)
    {
        return Observable.Create<Ticks>(observer =>
        {
            var liveStream = Buffer<Ticks>.StartBuffering(live.For(since, symbol));

            var definitelyInHistoricalTicks = clock.Now;
            // Sleep to make sure that historical data overlaps our live data
            // If we ever use a data provider with less fresh historical data, we may need to rethink this
            clock.Wait(TimeSpan.FromSeconds(1));

            var liveStreamAfterEndOfHistoricalTicks = liveStream
               .SkipWhile(ticks => ticks.LastTimestamp <= definitelyInHistoricalTicks)
               .Select(ticks => ticks.RemoveBefore(definitelyInHistoricalTicks + 1));

            var subscription = history.For(since, symbol)
               .Select(historicalTicks => historicalTicks.RemoveAtOrAfter(definitelyInHistoricalTicks + 1))
               .Concat(liveStreamAfterEndOfHistoricalTicks)
               .Subscribe(observer);

            return liveStream.And(subscription);
        });
    }
}
public static class CompositeDisposableExtensions
{
    public static CompositeDisposable And(this IDisposable disposable, Action action)
    {
        return And(disposable, Disposable.Create(action));
    }

    public static CompositeDisposable And(this IDisposable disposable, IDisposable other)
    {
        return new CompositeDisposable(disposable, other);
    }
}

使用这个 Rx 代码,我仍然不太信任:

Which uses this Rx code, which I still don't quite trust:

using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

namespace My.Rx
{
    /// <summary>
    /// Buffers values from an underlying observable when no observers are subscribed.
    /// 
    /// On Subscription, any buffered values will be replayed.
    /// 
    /// Only supports one observer for now.
    /// 
    /// Buffer is an ISubject for convenience of implementation but IObserver methods
    /// are hidden. It is not intended that Buffer should be used as an IObserver,
    /// except through StartBuffering() and it is dangerous to do so because none of 
    /// the IObserver methods check whether Buffer has been disposed.
    /// </summary>
    /// <typeparam name="TSource"></typeparam>
    public class Buffer<TSource> : ISubject<TSource>, IDisposable
    {
        private readonly object gate = new object();
        private readonly Queue<TSource> queue = new Queue<TSource>();

        private bool isDisposed;
        private Exception error;
        private bool stopped;
        private IObserver<TSource> observer = null;
        private IDisposable subscription;

        public static Buffer<TSource> StartBuffering(IObservable<TSource> observable)
        {
            return new Buffer<TSource>(observable);
        }

        private Buffer(IObservable<TSource> observable)
        {
            subscription = observable.Subscribe(this);
        }

        void IObserver<TSource>.OnNext(TSource value)
        {
            lock (gate)
            {
                if (stopped) return;
                if (IsBuffering)
                    queue.Enqueue(value);
                else
                    observer.OnNext(value);
            }
        }

        void IObserver<TSource>.OnError(Exception error)
        {
            lock (gate)
            {
                if (stopped) return;
                if (IsBuffering)
                    this.error = error;
                else
                    observer.OnError(error);
                stopped = true;
            }
        }

        void IObserver<TSource>.OnCompleted()
        {
            lock (gate)
            {
                stopped = true;
            }
        }

        public IDisposable Subscribe(IObserver<TSource> observer)
        {
            lock (gate)
            {
                if (isDisposed)
                    throw new ObjectDisposedException(string.Empty);

                if (this.observer != null)
                    throw new NotImplementedException("A Buffer can currently only support one observer at a time");

                while(!queue.IsEmpty())
                {
                    observer.OnNext(queue.Dequeue());
                }

                if (error != null)
                    observer.OnError(error);
                else if (stopped)
                    observer.OnCompleted();

                this.observer = observer;
                return Disposable.Create(() =>
                                             {
                                                 lock (gate)
                                                 {
                                                                             // Go back to buffering
                                                     this.observer = null;
                                                 }
                                             });
            }
        }

        private bool IsBuffering
        {
            get { return observer == null; }
        }


        public void Dispose()
        {
            lock (gate)
            {
                subscription.Dispose();

                isDisposed = true;
                subscription = null;
                observer = null;
            }
        }
    }
}

通过了这些测试(我还没有费心检查线程安全):

Which passes these tests (I haven't bothered checking thread safety yet):

private static readonly Exception exceptionThrownFromUnderlying = new Exception("Hello world");

[Test]
public void ReplaysBufferedValuesToFirstSubscriber()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    underlying.OnNext(1);
    underlying.OnNext(2);

    var observed = new List<int>();

    buffer.Subscribe(Observer.Create<int>(observed.Add));

    Assert.That(observed, Is.EquivalentTo(new []{1,2}));
}

[Test]
public void PassesNewValuesToObserver()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    var observed = new List<int>();
    buffer.Subscribe(Observer.Create<int>(observed.Add));

    underlying.OnNext(1);
    underlying.OnNext(2);

    Assert.That(observed, Is.EquivalentTo(new[] { 1, 2 }));
}


[Test]
public void DisposesOfSubscriptions()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    var observed = new List<int>();

    buffer.Subscribe(Observer.Create<int>(observed.Add))
        .Dispose();

    underlying.OnNext(1);

    Assert.That(observed, Is.Empty);
}

[Test]
public void StartsBufferingAgainWhenSubscriptionIsDisposed()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    // These should be buffered
    underlying.OnNext(1);
    underlying.OnNext(2);

    var firstSubscriptionObserved = new List<int>();
    using (buffer.Subscribe(Observer.Create<int>(firstSubscriptionObserved.Add)))
    {
        // Should be passed through to first subscription
        underlying.OnNext(3);
    }
    Assert.That(firstSubscriptionObserved, Is.EquivalentTo(new[] { 1, 2, 3 }));

    // First subscription has been disposed-
    // we should be back to buffering again
    underlying.OnNext(4);
    underlying.OnNext(5);

    var secondSubscriptionObserved = new List<int>();
    using (buffer.Subscribe(Observer.Create<int>(secondSubscriptionObserved.Add)))
    {
        // Should be passed through to second subscription
        underlying.OnNext(6);
    }
    Assert.That(secondSubscriptionObserved, Is.EquivalentTo(new[] { 4, 5 ,6}));
}

[Test]
public void DoesNotSupportTwoConcurrentObservers()
{
    // Use .Publish() if you need to do this

    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    buffer.Subscribe(Observer.Create<int>(i => { }));

    Assert.Throws<NotImplementedException>(() => buffer.Subscribe(Observer.Create<int>(i => { })));
}

[Test]
public void CannotBeUsedAfterDisposal()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    buffer.Dispose();

    Assert.Throws<ObjectDisposedException>(() => buffer.Subscribe(Observer.Create<int>(i => { })));
}

[Test]
public void ReplaysBufferedError()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    underlying.OnNext(1);
    underlying.OnError(exceptionThrownFromUnderlying);

    var observed = new List<int>();
    Exception foundException = null;
    buffer.Subscribe(
        observed.Add, 
        e => foundException = e);

    Assert.That(observed, Is.EquivalentTo(new []{1}));
    Assert.That(foundException, Is.EqualTo(exceptionThrownFromUnderlying));
}

[Test]
public void ReplaysBufferedCompletion()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    underlying.OnNext(1);
    underlying.OnCompleted();

    var observed = new List<int>();
    var completed = false;
    buffer.Subscribe(
        observed.Add,
        () => completed=true);

    Assert.That(observed, Is.EquivalentTo(new[] { 1 }));
    Assert.True(completed);
}

[Test]
public void ReplaysBufferedErrorToSubsequentObservers()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    underlying.OnNext(1);
    underlying.OnError(exceptionThrownFromUnderlying);

    // Drain value queue
    using (buffer.Subscribe(Observer.Create<int>(i => { }, e => { }))) ;

    var observered = new List<int>();
    Exception exceptionEncountered = null;
    using (buffer.Subscribe(Observer.Create<int>(observered.Add, e=>exceptionEncountered=e)));

    Assert.That(observered, Is.Empty);
    Assert.That(exceptionEncountered, Is.EqualTo(exceptionThrownFromUnderlying));
}

[Test]
public void ReplaysBufferedCompletionToSubsequentObservers()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);

    underlying.OnNext(1);
    underlying.OnCompleted();

    // Drain value queue
    using (buffer.Subscribe(Observer.Create<int>(i => { }))) ;

    var observered = new List<int>();
    var completed = false;
    using (buffer.Subscribe(Observer.Create<int>(observered.Add, ()=>completed=true)));

    Assert.That(observered, Is.Empty);
    Assert.True(completed);
}



[Test]
public void DisposingOfBufferDisposesUnderlyingSubscription()
{
    var underlyingSubscriptionWasDisposed = false;
    var underlying = Observable.Create<int>(observer => Disposable.Create(() => underlyingSubscriptionWasDisposed=  true   ));

    var buffer = Buffer<int>.StartBuffering(underlying);
    buffer.Dispose();

    Assert.True(underlyingSubscriptionWasDisposed);
}

这篇关于使用 Rx 合并历史和实时股票价格数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆