Rx在不同线程上产生和消耗 [英] Rx produce and consume on different threads

查看:87
本文介绍了Rx在不同线程上产生和消耗的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图通过此处的示例代码来简化我的问题。我有一个生产者线程不断地抽取数据,并且我试图在批处理之间延迟时间来批处理它,以便UI有时间来呈现它。但是结果却不是预期的那样,农产品和消费者似乎在同一线程上。



我不希望批处理缓冲区在该线程上休眠生产。尝试 SubscribeOn 并没有太大帮助。我在这里做错什么,如何在生产者和使用者线程上打印不同的线程ID。

  static void Main (string [] args)
{
var stream = new ReplaySubject< int>();

Task.Factory.StartNew(()=>
{
int种子= 1;
而(true)
{
Console.WriteLine(线程{0}生产{1},
Thread.CurrentThread.ManagedThreadId,种子);

stream.OnNext(seed);
seed ++;

Thread.Sleep(TimeSpan.FromMilliseconds(500));
}
});

stream.Buffer(5).Do(x =>
{
Console.WriteLine(线程{0}休眠以在批处理之间创建时间间隔,
Thread.CurrentThread.ManagedThreadId);

Thread.Sleep(TimeSpan.FromSeconds(2));
})
.SubscribeOn(NewThreadScheduler.Default).Subscribe(items) =>
{
foreach(项目中的可变项目)
{
Console.WriteLine(线程{0}消耗{1},
Thread.CurrentThread .ManagedThreadId,item);
}
});
Console.Read();
}


解决方案

了解<$之间的区别c $ c> ObserveOn 和 SubscribeOn 是关键。请参阅- ObserveOn和SubscribeOn-工作所在的位置

此外,您绝对不希望使用 Thread.Sleep 在您的Rx中。或任何地方。曾经 Do 几乎是邪恶的,但是 Thead.Sleep 几乎总是邪恶的。缓冲区具有您要使用的服务器重载-这些重载包括基于时间的重载和接受计数限制时间限制的重载,并且在达到其中任何一个时都将返回缓冲区。基于时间的缓冲将在生产者和使用者之间引入必要的并发性,即在与生产者不同的线程上将缓冲区传递给其订阅者。



在保持消费者响应能力方面进行了很好讨论的问题和答案(在此处使用WPF,但这些要点通常适用)。





上面的最后一个问题专门使用时间基于的缓冲区重载。就像我说的那样,在呼叫链中使用 Buffer ObserveOn 可以使生产者和消费者之间增加并发性。您仍然需要注意缓冲区的处理速度仍然足够快,以至于不会在缓冲区订阅者上建立队列。



如果确实建立了队列上,您需要考虑施加反压,删除更新和/或合并更新的方法。这些主题太广泛了,无法在此处进行深入讨论-但基本上您也可以:





首先查看适当的缓冲是否有帮助,然后考虑在源处对事件进行限制/合并(一个UI只能显示大量信息),然后考虑更智能的合并,因为这可能变得非常复杂。 https://github.com/AdaptiveConsulting/ReactiveTrader 是使用高级合并的项目的一个很好的例子技术。


I have tried to simplify my issue by a sample code here. I have a producer thread constantly pumping in data and I am trying to batch it with a time delay between batches so that the UI has time to render it. But the result is not as expected, the produce and consumer seems to be on the same thread.

I don't want the batch buffer to sleep on the thread that is producing. Tried SubscribeOn did not help much. What am I doing wrong here, how do I get this to print different thread Ids on producer and consumer thread.

static void Main(string[] args)
{
    var stream = new ReplaySubject<int>();

    Task.Factory.StartNew(() =>
    {
        int seed = 1;
        while (true)
        {
            Console.WriteLine("Thread {0} Producing {1}",
                Thread.CurrentThread.ManagedThreadId, seed);

            stream.OnNext(seed);
            seed++;

            Thread.Sleep(TimeSpan.FromMilliseconds(500));
         }
    });

    stream.Buffer(5).Do(x =>
    {
        Console.WriteLine("Thread {0} sleeping to create time gap between batches",
            Thread.CurrentThread.ManagedThreadId);

        Thread.Sleep(TimeSpan.FromSeconds(2));
    })
    .SubscribeOn(NewThreadScheduler.Default).Subscribe(items =>
    {
        foreach (var item in items)
        {
            Console.WriteLine("Thread {0} Consuming {1}",
                Thread.CurrentThread.ManagedThreadId, item);
        }
    });
    Console.Read();
}

解决方案

Understanding the difference between ObserveOn and SubscribeOn is key here. See - ObserveOn and SubscribeOn - where the work is being done for an in depth explanation of these.

Also, you absolutely don't want to use a Thread.Sleep in your Rx. Or anywhere. Ever. Do is almost as evil, but Thead.Sleep is almost always totally evil. Buffer has serveral overloads you want to use instead - these include a time based overload and an overload that accepts a count limit and a time-limit, returning a buffer when either of these are reached. A time-based buffering will introduce the necessary concurrency between producer and consumer - that is, deliver the buffer to it's subscriber on a separate thread from the producer.

Also see these questions and answers which have good discussions on keeping consumers responsive (in the context of WPF here, but the points are generally applicable).

The last question above specifically uses the time-based buffer overload. As I said, using Buffer or ObserveOn in your call chain will allow you to add concurrency between producer and consumer. You still need to take care that the processing of a buffer is still fast enough that you don't get a queue building up on the buffer subscriber.

If queues do build up, you'll need to think about means of applying backpressure, dropping updates and/or conflating the updates. These is a big topic too broad for in depth discussion here - but basically you either:

See if proper buffering helps first, then think about throttling/conflating events at the source as (a UI can only show so much infomation anway) - then consider smarter conflation as this can get quite complex. https://github.com/AdaptiveConsulting/ReactiveTrader is a good example of a project using some advanced conflation techniques.

这篇关于Rx在不同线程上产生和消耗的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆