如何等待已结束的IObserver呼叫,包括观察订户呼叫? [英] How to await finished IObserver call including observing subscriber calls?

查看:74
本文介绍了如何等待已结束的IObserver呼叫,包括观察订户呼叫?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当调用onNext并且订阅者在另一个线程/任务上时,如何等待订阅者方法完成?我想念像await onNext()之类的东西主题...

How to wait for subscribers method completion when calling onNext and the subscriber is on another thread/task? I miss something like await onNext() e.g. for Subject...

例如,在同一线程上运行时:

For example when running on the same Thread:

    [Test, Explicit]
    public void TestSyncOnSameTask()
    {
        Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
        using (var subject = new Subject<int>())
        using (subject
            .Subscribe(
                o => Console.WriteLine("Received {1} on threadId:{0}",
                    Thread.CurrentThread.ManagedThreadId,
                    o),
                () => Console.WriteLine("OnCompleted on threadId:{0}",
                    Thread.CurrentThread.ManagedThreadId)))
        {
            subject.OnNext(3);
            subject.OnNext(2);
            subject.OnNext(1);
            subject.OnCompleted();
        }
    }

将打印输出:

Starting on threadId:10
Received 3 on threadId:10
Received 2 on threadId:10
Received 1 on threadId:10
OnCompleted on threadId:10

在单独的线程上运行时(例如,卸载UI线程)

When running on separate Threads (e.g. to offload the UI-Thread)

    [Test, Explicit]
    public void TestObserveOnSeparateTask()
    {
        Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
        using (var subject = new Subject<int>())
        using (subject.ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(
                o => { Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                         Task.Delay(1000 * o);
                },
                () => Console.WriteLine("OnCompleted on threadId:{0}",
                    Thread.CurrentThread.ManagedThreadId)))
        {
            subject.OnNext(3);
            subject.OnNext(2);
            subject.OnNext(1);
            subject.OnCompleted();
        }
    }

将会显示以下结果:

Starting on threadId:10
Received 3 on threadId:11

如您所见... using块保留下来是因为主题调用OnNext()立即返回并且不等待订阅者Task的完成.

As you can see... the using block is left cause the subject call OnNext() returns immediately and does not wait for the completion of the subscriber Task.

如何等待"订户呼叫?订单得到保证吗?我尚未找到有关Synchronize()方法以及该方法真正同步内容的好的文档.

How to "await" the subscriber call? Is the order guaranteed? I have not found good documentation for the Synchronize() method and what this method really synchronizes.

我正在寻找类似的东西

await subject.OnNext(3);
await subject.OnNext(2);

在老派方式"中,我将BlockingCollection用作队列和

In the "old school way" I have used a BlockingCollection as queue and

    public async Task<IResponse> Request(IRequest request)
    {
        var taskCompletionSource = new TaskCompletionSource<IResponse>();
        Task<IResponse> tsk = taskCompletionSource.Task;
        Action<IResponse> responseHandler = taskCompletionSource.SetResult; 
        this.requestProcessor.Process(request, responseHandler);
        return await tsk.ConfigureAwait(false);
    }

和requestprocessor类似:

and the requestprocessor is something like:

        while (true)
        {
            //blockingCollection is shared between caller and runner
            blockingCollection.TryTake(out request, Timeout.Infinite);
            // call callback when finished to return to awaited method
        }

现在,我想改用RX/Observables,但是这个卸载话题让我感到震惊...它更多地是关于使用Reactive Extensions发布/订阅,但是我认为我在这里需要诸如request/response之类的东西.用RX可以做到这一点吗?

Now I want to use RX/Observables instead but this offloading topic hits me... It is more about publish/subscribe with Reactive Extensions but I think I need something like request/response here. Any way to do this with RX?

推荐答案

要完成您想做的事情,您可以这样做:

To get what you want done, you could do this:

Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var are = new AutoResetEvent(false);
using (var subject = new Subject<int>())
{
    using (
        subject
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(o =>
            {
                Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                Task.Delay(1000 * o);
            }, () =>
            {
                Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                are.Set();
            }))
        {
            subject.OnNext(3);
            subject.OnNext(2);
            subject.OnNext(1);
            subject.OnCompleted();
            are.WaitOne();
        }
}
Console.WriteLine("Done.");

这篇关于如何等待已结束的IObserver呼叫,包括观察订户呼叫?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆