订阅具有异步功能的可观察序列 [英] Subscribing to observable sequence with async function

查看:72
本文介绍了订阅具有异步功能的可观察序列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 asnyc 函数,我想以 IObservable 顺序在每次观察中调用该函数,从而一次只能将事件传递给一个事件.消费者期望在飞行中不超过一条消息.如果我正确理解的话,这也是RX合同.

I have an asnyc function that I want to invoke on every observation in an IObservable sequence, limiting delivery to one event at a time. The consumer expects no more than one message in flight; and this is also the RX contract, if I understand it correctly.

请考虑以下示例:

static void Main() {
  var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
  //var d = ob.Subscribe(async x => await Consume(x));  // Does not rate-limit.
  var d = ob.Subscribe(x => Consume(x).Wait());
  Thread.Sleep(10000);
  d.Dispose();
}

static async Task<Unit> Consume(long count) {
  Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
  await Task.Delay(750);
  Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
  return Unit.Default;
}

Consume 函数伪造了750毫秒的处理时间,而 ob 每100毫秒产生一次事件.上面的代码有效,但是在随机线程上调用 task.Wait().如果我改为在注释行3中订阅,则以 ob 产生事件的速率调用 Consume (而且我什至无法查看的重载情况)> Subscribe 我正在此注释语句中使用,因此可能是无稽之谈.

The Consume function fakes a 750 ms processing time, and ob produces events every 100 ms. The code above works, but calls task.Wait() on a random thread. If I instead subscribe as in the commented out line 3, then Consume is invoked at the same rate at which ob produces events (and I cannot even grok what overload of Subscribe I am using in this commented statement, so it is probably nonsense).

那么我如何一次将可观察序列中的一个事件正确地传递给 async 函数?

So how do I correctly deliver one event at a time from an observable sequence to an async function?

推荐答案

订户不应长时间运行,因此不支持在订阅处理程序中执行长时间运行的异步方法.

Subscribers are not supposed to be long running, and therefore there isn't support for executing long running async methods in the Subscribe handlers.

相反,应将异步方法视为可从另一个序列获取值的单个值可观察序列.现在您可以编写序列,这是Rx设计要做的.

Instead, consider your async method to be a single value observable sequence that takes a value from another sequence. Now you can compose sequences, which is what Rx was designed to do.

现在,您已经取得了飞跃,您可能会拥有类似@Reijher在

Now that you have made that leap, you will probably have something like what @Reijher creates in Howto call back async function from rx subscribe?.

他的代码分解如下.

//The input sequence. Produces values potentially quicker than consumer
Observable.Interval(TimeSpan.FromSeconds(1))
      //Project the event you receive, into the result of the async method
      .Select(l => Observable.FromAsync(() => asyncMethod(l)))
      //Ensure that the results are serialized
      .Concat()
      //do what you will here with the results of the async method calls
      .Subscribe();

在这种情况下,您正在创建隐式队列.在生产者比消费者快的任何问题中,都需要在等待时使用队列来收集值.就我个人而言,我更喜欢通过将数据放入队列来使其明确.另外,您可以显式地使用Scheduler来发出信号,该信号应该是应该吸收松弛的线程模型.

In this scenario, you are creating implicit queues. In any problem where the producer is faster than the consumer, a queue will need to be used to collect values while waiting. Personally I prefer to make this explicit by putting data into a queue. Alternatively you could explicitly use a Scheduler to signal that is the threading model that should be picking up the slack.

对于Rx新手来说,这似乎是一个流行的障碍(在订阅处理程序中执行异步).出于许多原因,指南不将其放入您的订户中,例如:1.打破错误模型2.您正在混合异步模型(rx在这里,任务在那里)3.订阅是异步序列组成的使用者.异步方法只是单个值序列,因此,由于该视图不能成为序列的结尾,因此结果可能是

This seems to be a popular hurdle (executing async in a subscribe handler) for Rx newcomers. There are many reasons that the guidance is to not put them in your subscriber, for example: 1. you break the error model 2. you are mixing async models (rx here, task there) 3. subscribe is the consumer of a composition of async sequences. An async method is just a single value sequence, so by that view cant be the end of the sequence, it's result might be though.

更新

为了说明有关破坏错误模型的注释,这里是对OP示例的更新.

To illustrate the comment about breaking the error model here is an update of the OP sample.

void Main()
{
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
    var d = ob.Subscribe(
        x => ConsumeThrows(x).Wait(),
        ex=> Console.WriteLine("I will not get hit"));

    Thread.Sleep(10000);
    d.Dispose();
}

static async Task<Unit> ConsumeThrows(long count)
{
    return await Task.FromException<Unit>(new Exception("some failure"));
    //this will have the same effect of bringing down the application.
    //throw new Exception("some failure");
}

在这里我们可以看到,如果要抛出 OnNext 处理程序,那么我们将不受Rx OnError 处理程序的保护.该异常将无法处理,很可能会导致应用程序崩溃.

Here we can see that if the OnNext handler was to throw, then we are not protected by our Rx OnError handler. The exception would be unhandled and most likely bring down the application.

这篇关于订阅具有异步功能的可观察序列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆