退订后,我如何等待一切按Rx可观察的顺序完成? [英] How can I await that everything is done in a Rx observable sequence after unsubscribe?

查看:53
本文介绍了退订后,我如何等待一切按Rx可观察的顺序完成?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

简介

在WPF C#.NET应用程序中,我使用反应性扩展(Rx)订阅事件,而且我经常不得不从数据库中重新加载某些内容以获取更新UI所需的值,因为事件对象通常仅包含ID和一些元数据.

In my WPF C# .NET application I use the reactive extensions (Rx) to subscribe to events and I often have to reload something from the DB to get the values I need to update the UI, because the event objects often only contains IDs and some meta data.

我使用Rx调度在后台加载数据并更新调度程序上的UI.我在Rx序列内部混合"Task.Run"时遇到了一些不好的经验(当使用"SelectMany"时,将不再保证顺序,并且很难在UnitTests中控制调度).另请参阅:

I use the Rx scheduling to load the data in the background and update the UI on the dispatcher. I have made some bad experience with mixing "Task.Run" inside of a Rx sequence (when using "SelectMany" the order is no longer guaranteed and it is hard to control the scheduling in UnitTests). See also: Executing TPL code in a reactive pipeline and controlling execution via test scheduler

我的问题

如果我关闭我的应用程序(或关闭选项卡),我想取消订阅,然后等待DB调用(从Rx选择"调用),该调用在"subscription.Dispose"之后仍然可以运行.到目前为止,我还没有找到任何好的工具或简便的方法来实现这一目标.

If I shutdown my app (or close a tab) I want to unsubscribe and then await the DB call (which is called from a Rx "Select") that still can be running after "subscription.Dispose". Until now I haven't found any good utility or easy way to do that.

问题

是否有任何框架支持等待一切仍在Rx链中运行?

Is there any framework support to await everything still running in a Rx chain?

如果没有,您对如何制作易于使用的实用程序有什么好主意吗?

If not, do you have any good ideas how to make a easy to use utility?

有没有其他好的方法可以达到相同的目的?

Are there any good alternative ways to achieve the same?

示例

public async Task AwaitEverythingInARxChain()
{
    // In real life this is a hot observable event sequence which never completes
    IObservable<int> eventSource = Enumerable.Range(1, int.MaxValue).ToObservable();

    IDisposable subscription = eventSource
        // Load data in the background
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))

        // Update UI on the dispatcher
        .ObserveOn(DispatcherScheduler.Current)
        .SubscribeOn(Scheduler.Default) // In real life the source produces the event values on a background thread.
        .Subscribe(loadedData => UpdateUi(loadedData));

    Thread.Sleep(TimeSpan.FromSeconds(10));
// In real life I want to cancel (unsubscribe) here because the user has closed the Application or closed the tab and return a task which completes when everything is done.

    // Unsubscribe just guarantees that no "OnNext" is called anymore, but it doesn't wait until all operations in the sequence are finished (for example "LoadFromDatabase(id)" can still be runnig here.
    subscription.Dispose();

    await ?; // I need to await here, so that i can be sure that no "LoadFromDatabase(id)" is running anymore.

    ShutDownDatabase();
}

我已经尝试过(但没有成功)

  • 使用最终"运算符来设置TaskCompletionSource的结果. 这种方法的问题:退订后最终直接被调用,并且"LoadFromDatabase"仍然可以运行
  • Using the "Finally" operator to set the result of a TaskCompletionSource. The problem with this approach: Finally gets called directly after unsubscribing and "LoadFromDatabase" can still be running

更新:带有控制台输出和TakeUntil

public async Task Main()
{
    Observable
        .Timer(TimeSpan.FromSeconds(5.0))
        .Subscribe(x =>
        {
            Console.WriteLine("Cancel started");
            _shuttingDown.OnNext(Unit.Default);
        });

    await AwaitEverythingInARxChain();
    Console.WriteLine("Cancel finished");
    ShutDownDatabase();
    Thread.Sleep(TimeSpan.FromSeconds(3));
}

private Subject<Unit> _shuttingDown = new Subject<Unit>();

public async Task AwaitEverythingInARxChain()
{
    IObservable<int> eventSource = Observable.Range(0, 10);

    await eventSource
        .ObserveOn(Scheduler.Default)
        .Select(id => LoadFromDatabase(id))
        .ObserveOn(Scheduler.Default)
        .TakeUntil(_shuttingDown)
        .Do(loadedData => UpdateUi(loadedData));
}

public int LoadFromDatabase(int x)
{
    Console.WriteLine("Start LoadFromDatabase: " + x);
    Thread.Sleep(1000);
    Console.WriteLine("Finished LoadFromDatabase: " + x);

    return x;
}

public void UpdateUi(int x)
{
    Console.WriteLine("UpdateUi: " + x);
}

public void ShutDownDatabase()
{
    Console.WriteLine("ShutDownDatabase");
}

输出(实际):

Start LoadFromDatabase: 0
Finished LoadFromDatabase: 0
Start LoadFromDatabase: 1
UpdateUi: 0
Finished LoadFromDatabase: 1
Start LoadFromDatabase: 2
UpdateUi: 1
Finished LoadFromDatabase: 2
Start LoadFromDatabase: 3
UpdateUi: 2
Finished LoadFromDatabase: 3
Start LoadFromDatabase: 4
UpdateUi: 3
Cancel started
Cancel finished
ShutDownDatabase
Finished LoadFromDatabase: 4
Start LoadFromDatabase: 5
Finished LoadFromDatabase: 5
Start LoadFromDatabase: 6
Finished LoadFromDatabase: 6
Start LoadFromDatabase: 7

预期: 我想保证以下是最后的输出:

Expected: I want to have a guarantee that following are the last Outputs:

Cancel finished
ShutDownDatabase

推荐答案

我终于找到了自己的解决方案. 您可以使用TakeWhile实现它. TakeUntil不起作用,因为当第二个可观察序列产生第一个值时,主要可观察序列立即完成.

I finally found a solution myself. You can use TakeWhile to achive it. TakeUntil does not work, because the main observable sequence immediately completes when the second observable sequence produces the first value.

以下是工作解决方案的示例:

Here is a example of the working solution:

     public async Task Main_Solution()
    {
        CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

        Observable
            .Timer(TimeSpan.FromSeconds(4))
            .Subscribe(x =>
            {
                Console.WriteLine("Cancel startedthread='{0}'", Thread.CurrentThread.ManagedThreadId);
                cancellationTokenSource.Cancel();
            });

        await AwaitEverythingInARxChain(cancellationTokenSource.Token);
        Console.WriteLine("Cancel finished thread='{0}'", Thread.CurrentThread.ManagedThreadId);
        ShutDownDatabase();
        Thread.Sleep(TimeSpan.FromSeconds(10));
    }

    public async Task AwaitEverythingInARxChain(CancellationToken token)
    {
        IObservable<int> eventSource = Observable.Range(0, 10);

        await eventSource
            .ObserveOn(Scheduler.Default)
            .Select(id => LoadFromDatabase(id))
            .TakeWhile(_ => !token.IsCancellationRequested)
            .ObserveOn(Scheduler.Default) // Dispatcher in real life
            .Do(loadedData => UpdateUi(loadedData)).LastOrDefaultAsync();
    }

    public int LoadFromDatabase(int x)
    {
        Console.WriteLine("Start LoadFromDatabase: {0} thread='{1}'", x, Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(TimeSpan.FromSeconds(3));
        Console.WriteLine("Finished LoadFromDatabase: {0} thread='{1}'", x, Thread.CurrentThread.ManagedThreadId);

        return x;
    }

    public void UpdateUi(int x)
    {
        Console.WriteLine("UpdateUi: '{0}' thread='{1}'", x, Thread.CurrentThread.ManagedThreadId);
    }

    public void ShutDownDatabase()
    {
        Console.WriteLine("ShutDownDatabase thread='{0}'", Thread.CurrentThread.ManagedThreadId);
    }

输出:

Start LoadFromDatabase: 0 thread='9'
Finished LoadFromDatabase: 0 thread='9'
Start LoadFromDatabase: 1 thread='9'
UpdateUi: '0' thread='10'
Cancel startedthread='4'
Finished LoadFromDatabase: 1 thread='9'
Cancel finished thread='10'
ShutDownDatabase thread='10'

请注意,"ShutDownDatabase"是最后的输出(按预期).它等待直到"LoadFromDatabase"完成第二个值,即使其产生的值未得到进一步处理也是如此.这正是我想要的.

Note that "ShutDownDatabase" is the last output (as expected). It waits until "LoadFromDatabase" is finished for the second value, even if its produced value is not further processed. This is exactly what I want.

这篇关于退订后,我如何等待一切按Rx可观察的顺序完成?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆