如何等待已结束的IObserver呼叫,包括观察订户呼叫? [英] How to await finished IObserver call including observing subscriber calls?
问题描述
当调用onNext并且订阅者在另一个线程/任务上时,如何等待订阅者方法完成?我想念像await onNext()之类的东西主题...
How to wait for subscribers method completion when calling onNext and the subscriber is on another thread/task? I miss something like await onNext() e.g. for Subject...
例如,在同一线程上运行时:
For example when running on the same Thread:
[Test, Explicit]
public void TestSyncOnSameTask()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
using (var subject = new Subject<int>())
using (subject
.Subscribe(
o => Console.WriteLine("Received {1} on threadId:{0}",
Thread.CurrentThread.ManagedThreadId,
o),
() => Console.WriteLine("OnCompleted on threadId:{0}",
Thread.CurrentThread.ManagedThreadId)))
{
subject.OnNext(3);
subject.OnNext(2);
subject.OnNext(1);
subject.OnCompleted();
}
}
将打印输出:
Starting on threadId:10
Received 3 on threadId:10
Received 2 on threadId:10
Received 1 on threadId:10
OnCompleted on threadId:10
在单独的线程上运行时(例如,卸载UI线程)
When running on separate Threads (e.g. to offload the UI-Thread)
[Test, Explicit]
public void TestObserveOnSeparateTask()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
using (var subject = new Subject<int>())
using (subject.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(
o => { Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
Task.Delay(1000 * o);
},
() => Console.WriteLine("OnCompleted on threadId:{0}",
Thread.CurrentThread.ManagedThreadId)))
{
subject.OnNext(3);
subject.OnNext(2);
subject.OnNext(1);
subject.OnCompleted();
}
}
将会显示以下结果:
Starting on threadId:10
Received 3 on threadId:11
如您所见... using块保留下来是因为主题调用OnNext()立即返回并且不等待订阅者Task的完成.
As you can see... the using block is left cause the subject call OnNext() returns immediately and does not wait for the completion of the subscriber Task.
如何等待"订户呼叫?订单得到保证吗?我尚未找到有关Synchronize()方法以及该方法真正同步内容的好的文档.
How to "await" the subscriber call? Is the order guaranteed? I have not found good documentation for the Synchronize() method and what this method really synchronizes.
我正在寻找类似的东西
await subject.OnNext(3);
await subject.OnNext(2);
在老派方式"中,我将BlockingCollection用作队列和
In the "old school way" I have used a BlockingCollection as queue and
public async Task<IResponse> Request(IRequest request)
{
var taskCompletionSource = new TaskCompletionSource<IResponse>();
Task<IResponse> tsk = taskCompletionSource.Task;
Action<IResponse> responseHandler = taskCompletionSource.SetResult;
this.requestProcessor.Process(request, responseHandler);
return await tsk.ConfigureAwait(false);
}
和requestprocessor类似:
and the requestprocessor is something like:
while (true)
{
//blockingCollection is shared between caller and runner
blockingCollection.TryTake(out request, Timeout.Infinite);
// call callback when finished to return to awaited method
}
现在,我想改用RX/Observables,但是这个卸载话题让我感到震惊...它更多地是关于使用Reactive Extensions发布/订阅,但是我认为我在这里需要诸如request/response之类的东西.用RX可以做到这一点吗?
Now I want to use RX/Observables instead but this offloading topic hits me... It is more about publish/subscribe with Reactive Extensions but I think I need something like request/response here. Any way to do this with RX?
推荐答案
要完成您想做的事情,您可以这样做:
To get what you want done, you could do this:
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var are = new AutoResetEvent(false);
using (var subject = new Subject<int>())
{
using (
subject
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
Task.Delay(1000 * o);
}, () =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
{
subject.OnNext(3);
subject.OnNext(2);
subject.OnNext(1);
subject.OnCompleted();
are.WaitOne();
}
}
Console.WriteLine("Done.");
这篇关于如何等待已结束的IObserver呼叫,包括观察订户呼叫?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!