OnNext中的异步代码将破坏Rx合约 [英] Asynchronous code in OnNext will break the Rx contract

查看:75
本文介绍了OnNext中的异步代码将破坏Rx合约的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在订阅执行异步代码时,我无法找到保持订阅同步的异步方式。


基本上,我想使用和组合Rx很多(标准)异步/等待代码,我真的希望有一个一流的方法来做到这一点。 Console.WriteLine和Thread.Sleep的例子很不错,但现在Rx如何真正适应.NET 4.5 ???


因为我确定可以使用Task.WaitAll,但是失败了我的代码的异步性,最终会导致过多的线程被阻塞的旧问题!线程执行被阻塞的线程应该做的工作。


这里有一些代码来展示我正在尝试做什么。它产生2个观察值,间隔1秒,每个需要2秒钟的处理,在观察物品时以及开始/停止处理时写入控制台。



static void Main(string [] args){
$
    Task .WaitAll(TestObservable());

}

静态异步任务TestObservable(){

    var stopwatch = new秒表();

    Action< string,long> writeline = (格式,价值)=>


     Console.WriteLine(" {1,6:0.00}:" + format,value,stopwatch.Elapsed.TotalSeconds);
$
    stopwatch.Start();

    var subject = new主题< long>();

    Observable.Interval(TimeSpan.FromSeconds(1))

       .Do(value => writeline(" Emitting {0}",value))

   ;     .Take(2)

       .Subscribe(_ => subject.OnNext(_));

    var observable = subject.AsObservable();

    using(observable.Subscribe(async value => {

       writeline(" Start Handling {0}",value);

       ;  await Task.Delay(TimeSpan.FromSeconds(2));

       writeline(" Stop Handling {0}",value) ;

   })){

&nb sp;   等待Task.Run(()=> Console.ReadKey(true));

   }

}


以下是它写入控制台的内容:


  1.13:发放0

  1.13:开始处理0

  2.13:发出1 $
  2.13:开始处理1

  3.14:停止处理0

  4.14:停止处理1


在阅读有关Rx合同的同时,我预计会有一种方法可以产生以下输出(参见Reactive Extensions Design Guidelines 4.2)


  1.15:发送0

  1.16:开始处理0

  3.16:停止处理0

  3.16:发出1 $
  3.16:开始处理1

  5.16:停止处理1


即使我不介意(或多或少地预期)第二个发射线在秒表开始后的2秒附近发生。


解决方案

我看到你在这里想做什么,我想我理解你的困惑。 TPL和Rx的交集是一个棘手的问题(我必须承认一个我还没有完全掌握的)。


为了减少你的困惑,首先要清理一些事情。 / p>

1。您的样本中不需要主题。让我们删除它。

 // LINQPad查询示例
void Main()
{
Task.WaitAll(TestObservable()) ;
}

//在这里定义其他方法和类
static async任务TestObservable(){
var stopwatch = new Stopwatch();
Action< string,long> writeline =(format,value)=>
Console.WriteLine(" {1,6:0.00}:" + format,value,stopwatch.Elapsed.TotalSeconds);
stopwatch.Start();
var observable = Observable.Interval(TimeSpan.FromSeconds(1))
.Do(value => writeline(" Emitting {0}",value))
.Take( 2);

使用(observable.Subscribe(async value => {
writeline("Start Handling {0}",value);
await Task.Delay(TimeSpan。 FromSeconds(2));
writeline("Stop Handling {0}",value);
})){
await Task.Run(()=> Console.Read ());
}
}

2。接下来让我们删除subscribe方法中的混合隐喻。由于OnNext调用是序列化的,因此在其中添加async会破坏该范例。这里我们使OnNext处理程序同步

 void Main()
{
Task.WaitAll(TestObservable());
}

//在这里定义其他方法和类
static async任务TestObservable(){
var stopwatch = new Stopwatch();
Action< string,long> writeline =(format,value)=>
Console.WriteLine(" {1,6:0.00}:" + format,value,stopwatch.Elapsed.TotalSeconds);
stopwatch.Start();
var observable = Observable.Interval(TimeSpan.FromSeconds(1))
.Do(value => writeline(" Emitting {0}",value))
.Take( 2);

使用(observable.Subscribe(value => {
writeline(" Start Handling {0}",value);
Task.Delay(TimeSpan.FromSeconds( 2));
writeline("Stop Handling {0}",value);
})){
await Task.Run(()=> Console.Read() );
}
}

现在我们得到你正在寻找的输出,但我们阻止而不是很好地利用TPL。在这里,我认为您希望通过ToObservable()运算符桥接到Rx。


3。利用ToObservable和SelectMany来序列化但不刻录线程。


这里我还添加了一个我发现在我的应用程序中非常有用的Log扩展方法的变体。抱歉,它会搞砸你的输出。

 void Main()
{
Task.WaitAll(TestObservable()) ;
}

//在这里定义其他方法和类
static async任务TestObservable(){
var stopwatch = new Stopwatch();
Action< string,long> writeline =(format,value)=>
Console.WriteLine(" {1,6:0.00}:" + format,value,stopwatch.Elapsed.TotalSeconds);
stopwatch.Start();
var observable = Observable.Interval(TimeSpan.FromSeconds(1))
.Do(value => writeline(" Emitting {0}",value))
.Take( 2)
.SelectMany(i => Task.Delay(TimeSpan.FromSeconds(2))
.ToObservable()
.Log(string.Format(" Task.Delay {0 }",i))的);

使用(observable.Subscribe(value => {
writeline(" done {0}",0);
}))
{
await Task.Run(()=> Console.Read());
}
}
公共静态类LoggerExtensions
{
public static IObservable< T> Log< T>(此IObservable< T>源,字符串名称)
{
return Observable.Using(
()=> new Timer(name),
timer = > Observable.Create< T>(
o =>
{
Console.WriteLine(" {0} .Subscribe()",name);
var subscription = source
.Do(
i => Console.WriteLine(" {0} .OnNext({1})",name,i),
ex => Console。 WriteLine(" {0} .OnError({1})",name,ex),
()=> Console.WriteLine(" {0} .OnCompleted()",name))
。订阅(o);
var dispos = Disposable.Create(()=> Console.WriteLine(" {0} .Dispose()",name));
返回new CompositeDisposable(订阅,处理);
})
);
}

私人密封类计时器:IDisposable
{
private readonly string _name;
private readonly秒表_stopwatch;

public Timer(字符串名称)
{
_name = name;
_stopwatch = Stopwatch.StartNew();
}

public void Dispose()
{
_stopwatch.Stop();
Console.WriteLine(" {0} take {1}",_ name,_stopwatch.Elapsed);
}
}
}

输出:

 1.02:发出0 
Task.Delay 0.Subscribe()
2.02:发送1
Task.Delay 1.Subscribe()
Task.Delay 0.OnNext(())
3.03:done 0
Task.Delay 0.OnCompleted()
Task.Delay 0.Dispose()
Task.Delay 0花了00:00:02.0006715
Task.Delay 1.OnNext(())
4.03:done 0
Task.Delay 1.OnCompleted()
Task.Delay 1.Dispose()
Task.Delay 1占用00:00:02.0124837

这里我们将处理从订阅中删除并使其成为序列的一部分。这允许我们保持在异步序列范例内,或保持在'Mondad'内。


我们这样做是因为我们可以将值的处理视为异步过程。可以将此Async进程视为Task或Single value Observable Sequence。由于Rx擅长编写可观察的
序列,我们利用了这一点。我们还利用Rx接触,确保顺序交付顺序。



我希望这会有所帮助


Lee Campbell


www.IntroToRx.com







I can't find an asynchronous way to keep a subscription synchronized while the subscription executes asynchronous code.

Essentially, I want to use and combine Rx with a lot of (standard) async/await code and I really hope there is a first-class way to do it. Examples with Console.WriteLine and Thread.Sleep are nice, but now how does Rx really fit in with .NET 4.5???

Because sure I can use Task.WaitAll, but that defeats the asynchronicity of my code and will in the end lead to the old problem of having too many threads blocking for !another! thread doing the work that the blocked thread should have been doing.

Here's some code to show about what I'm trying to do. It yields 2 observations, 1 seconds apart and runs processing on each that takes 2 seconds, writing to the console when items are observed and when starting/stopping processing.

static void Main(string[] args) {
   Task.WaitAll(TestObservable());
}
static async Task TestObservable() {
   var stopwatch = new Stopwatch();
   Action<string, long> writeline = (format, value) =>
     Console.WriteLine("{1,6:0.00}: " + format, value, stopwatch.Elapsed.TotalSeconds);
   stopwatch.Start();
   var subject = new Subject<long>();
   Observable.Interval(TimeSpan.FromSeconds(1))
      .Do(value => writeline("Emitting {0}", value))
      .Take(2)
      .Subscribe(_ => subject.OnNext(_));
   var observable = subject.AsObservable();
   using (observable.Subscribe(async value => {
      writeline("Start Handling {0}", value);
      await Task.Delay(TimeSpan.FromSeconds(2));
      writeline("Stop Handling {0}", value);
   })) {
    await Task.Run(() => Console.ReadKey(true));
   }
}

Here's what it writes to the console:

  1.13: Emitting 0
  1.13: Start Handling 0
  2.13: Emitting 1
  2.13: Start Handling 1
  3.14: Stop Handling 0
  4.14: Stop Handling 1

While reading about the Rx contract, I'd have expected there was a way to produce the following output instead (see Reactive Extensions Design Guidelines 4.2)

  1.15: Emitting 0
  1.16: Start Handling 0
  3.16: Stop Handling 0
  3.16: Emitting 1
  3.16: Start Handling 1
  5.16: Stop Handling 1

Even though I wouldn't mind (and more or less expected) the second emitting line to happen somewhere near 2 seconds after the stopwatch started.

解决方案

I see what you are trying to do here and I think I understand your confusion. The intersection of TPL and Rx is a tricky one (and I must confess one that I have not fully mastered yet either).

To reduce your confusion, first lets clear up some things.

1. No need for the subject in your sample. Lets remove that.

//LINQPad query sample
void Main()
{
	Task.WaitAll(TestObservable());
}

// Define other methods and classes here
static async Task TestObservable() {
   var stopwatch = new Stopwatch();
   Action<string, long> writeline = (format, value) => 
     Console.WriteLine("{1,6:0.00}: " + format, value, stopwatch.Elapsed.TotalSeconds);
   stopwatch.Start();
   var observable = Observable.Interval(TimeSpan.FromSeconds(1))
      .Do(value => writeline("Emitting {0}", value))
      .Take(2);
   
   using (observable.Subscribe(async value => {
      writeline("Start Handling {0}", value);
      await Task.Delay(TimeSpan.FromSeconds(2));
      writeline("Stop Handling {0}", value);
   })) {
    await Task.Run(() => Console.Read());
   }
}

2. Next let's remove the mixed metaphor in the subscribe method. As the OnNext calls are serialized, adding async in there will break that paradigm. Here we make the OnNext handler syncho

void Main()
{
	Task.WaitAll(TestObservable());
}

// Define other methods and classes here
static async Task TestObservable() {
   var stopwatch = new Stopwatch();
   Action<string, long> writeline = (format, value) => 
     Console.WriteLine("{1,6:0.00}: " + format, value, stopwatch.Elapsed.TotalSeconds);
   stopwatch.Start();
   var observable = Observable.Interval(TimeSpan.FromSeconds(1))
      .Do(value => writeline("Emitting {0}", value))
      .Take(2);
   
   using (observable.Subscribe(value => {
      writeline("Start Handling {0}", value);
      Task.Delay(TimeSpan.FromSeconds(2));
      writeline("Stop Handling {0}", value);
   })) {
    await Task.Run(() => Console.Read());
   }
}

Right now we get the output you are looking for, but we are blocking and not leveraging the TPL very well. Here I think you want to bridge to Rx via the ToObservable() operator.

3. Leverage ToObservable and SelectMany to serialize but not burn threads.

Here I also add a variation of a Log extension method I find extremely useful in my applications. Sorry that it screws up your output though.

void Main()
{
	Task.WaitAll(TestObservable());
}

// Define other methods and classes here
static async Task TestObservable() {
   var stopwatch = new Stopwatch();
   Action<string, long> writeline = (format, value) => 
     Console.WriteLine("{1,6:0.00}: " + format, value, stopwatch.Elapsed.TotalSeconds);
   stopwatch.Start();
   var observable = Observable.Interval(TimeSpan.FromSeconds(1))
      .Do(value => writeline("Emitting {0}", value))
      .Take(2)
      .SelectMany(i=>Task.Delay(TimeSpan.FromSeconds(2))
		 		    .ToObservable()
		 		    .Log(string.Format("Task.Delay {0}",i)));
   
   using (observable.Subscribe(value => {
      writeline("done {0}", 0);
   })) 
   {
    await Task.Run(() => Console.Read());
   }
}
public static class LoggerExtensions
{
   public static IObservable<T> Log<T>(this IObservable<T> source, string name)
   {
       return Observable.Using(
           ()=> new Timer(name),
           timer=> Observable.Create<T>(
               o =>
               {
                   Console.WriteLine("{0}.Subscribe()", name);
                   var subscription = source
                       .Do(
                           i => Console.WriteLine("{0}.OnNext({1})", name, i),
                           ex => Console.WriteLine("{0}.OnError({1})", name, ex),
                           () => Console.WriteLine("{0}.OnCompleted()", name))
                       .Subscribe(o);
                   var disposal = Disposable.Create(() => Console.WriteLine("{0}.Dispose()", name));
                   return new CompositeDisposable(subscription, disposal);
               })
           );
   }

   private sealed class Timer : IDisposable
   {
       private readonly string _name;
       private readonly Stopwatch _stopwatch;

       public Timer(string name)
       {
           _name = name;
           _stopwatch = Stopwatch.StartNew();
       }

       public void Dispose()
       {
           _stopwatch.Stop();
           Console.WriteLine("{0} took {1}", _name, _stopwatch.Elapsed);
       }
   }
}

Output:

  1.02: Emitting 0
Task.Delay 0.Subscribe()
  2.02: Emitting 1
Task.Delay 1.Subscribe()
Task.Delay 0.OnNext(())
  3.03: done 0
Task.Delay 0.OnCompleted()
Task.Delay 0.Dispose()
Task.Delay 0 took 00:00:02.0006715
Task.Delay 1.OnNext(())
  4.03: done 0
Task.Delay 1.OnCompleted()
Task.Delay 1.Dispose()
Task.Delay 1 took 00:00:02.0124837

Here we take the processing out of the Subscription and make it part of the sequence. This allows us to stay within the asynchronous sequence paradigm, or staying within the 'Mondad'.

We do this because we can think of the processing of the value as an Async process. This Async processes can be thought of either as a Task or a Single value Observable Sequence. As Rx excels at composing observable sequences we take advantage of that. We also take advantage of the Rx contact that ensures that sequences are guaranteed to be delivered in order.

I hope this helps

Lee Campbell

www.IntroToRx.com




这篇关于OnNext中的异步代码将破坏Rx合约的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆