如何在没有竞争条件的情况下订阅IObservable序列,强制完成并检索所有数据 [英] How to Subscribe to IObservable Sequence, force completion, and retrieve all data without race conditions

查看:65
本文介绍了如何在没有竞争条件的情况下订阅IObservable序列,强制完成并检索所有数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用可观察物时,我遇到一种模式.

There is a pattern I'm having trouble with when working with observables.

我正在使用蓝牙设备.

I am working with a bluetooth device.

  1. 我向该设备发送一条消息,告诉它执行某项操作,并将结果通知我.
  2. 设备开始发送通知(可能会持续10毫秒或20秒)
  3. 我等待设备完成发送通知.有时这是来自设备的特定消息,有时我只是在一段时间内再也没有收到任何消息.
  4. 我将邮件转换为单个项目或IEnumerable,然后按照自己的喜好进行操作.

示例一:

  1. 我输入带有登录消息和密码的登录命令
  2. 设备会发送成功或失败消息(通常为10毫秒左右)
  3. 我等待消息到来
  4. 我使用该消息告诉用户是否可以继续或是否需要重试密码.

示例二:

  1. 我向蓝牙设备发送了一个命令,请求范围内的所有wifi网络
  2. 设备打开其wifi无线电并发送回未知数量的消息,但在某个时间点停止
  3. 我等待消息停止
  4. 我向用户提供了wifi网络的完整列表

我认为应该以以下方式完成此操作. (我已删除了尽可能多的蓝牙特定代码以帮助专注于Rx):

I think this should be done in this something close the following manner. (I've removed as much bluetooth specific code as possible to help focus on Rx):

//Create a new subject
Subject<int> subject = new Subject<int>();

//Observe the subject until some pre-determined stopping criteria
bool waiting = true;
IObservable<int> sequence = subject.TakeWhile(x => waiting);

//Subscribe to the subject so that I can trigger the stopping criteria
IDisposable subscription = sequence.Subscribe(
                onNext: result =>
                {
                     if (result > 50)
                        waiting = false;
                },
                onCompleted: () =>
                {
                     return;
                });

//fake bluetooth messages
int i = 0;
while (i < 100)
    subject.OnNext(i++);

//Gather them all up once the sequence is complete
//***application hangs here***
List<int> ints = await sequence.ToList() as List<int>;

//This line of code is never run
subscription.Dispose();

我希望Rx家伙可以帮助我理解为什么此ToList()调用挂起.我只是当场为这个问题写了这段代码,所以如果它没有意义,请告诉我,我将对其进行更新.

I'm hoping that an Rx guy can help me understand why this ToList() call hangs. I just wrote this code on the spot for this question so if it doesn't make sense let me know and I'll update it.

这是使用第三方蓝牙库并从蓝牙设备接收项目的实际代码.

Here is the actual code that uses a third-party bluetooth library and receives items from a bluetooth device.

    private static async Task<byte> WritePasswordToPeripheral<P>(P Peripheral, byte[] command) where P : Peripheral, IStatePeripheral
    {
        IGattService service = await Peripheral.RPHDevice.GetKnownService(BleService.Control);
        IGattCharacteristic characteristic = await service.GetKnownCharacteristics(BleCharacteristic.PasswordResult);

        //I know that this TakeWhile isn't necessary here because I'm using FirstAsync() later on
        //In some similar blocks I receive multiple notifications and so I need to decide when to stop listening in this way. 
        //In those situations I would call .ToList() instead of .FirstAsync()
        bool waiting = true;
        await characteristic.EnableNotifications().TakeWhile(x=>waiting);

        IObservable<CharacteristicGattResult> passwordResultSequence = characteristic
            .WhenNotificationReceived();

        IDisposable passwordResultSubscription = passwordResultSequence 
                                                    .Subscribe(
                                                    onNext: result =>
                                                    {
                                                        waiting = false;
                                                    },
                                                    onCompleted: () =>
                                                    {
                                                        return;
                                                    });

        try
        {
            await Peripheral.RPHDevice
                    .WriteCharacteristic(BleService.Control, BleCharacteristic.Password, command)
                    .Timeout(TimeSpan.FromSeconds(10));
        }
        catch (Exception)
        {
            return 0;
        }

        //In this case only one notification ever comes back and so FirstAsync would be nice
        var passwordResult = await passwordResultSequence.FirstAsync();
        await characteristic.DisableNotifications();
        passwordResultSubscription.Dispose();

        return passwordResult.Data[0];
    }

何时收到通知:

    IObservable<CharacteristicGattResult> notifyOb;
    public override IObservable<CharacteristicGattResult> WhenNotificationReceived()
    {
        this.AssertNotify();

        this.notifyOb = this.notifyOb ?? Observable.Create<CharacteristicGattResult>(ob =>
        {
            var handler = new EventHandler<CBCharacteristicEventArgs>((sender, args) =>
            {
                if (!this.Equals(args.Characteristic))
                    return;

                if (args.Error == null)
                    ob.OnNext(new CharacteristicGattResult(this, args.Characteristic.Value?.ToArray()));
                else
                    ob.OnError(new BleException(args.Error.Description));
            });
            this.Peripheral.UpdatedCharacterteristicValue += handler;
            return () => this.Peripheral.UpdatedCharacterteristicValue -= handler;
        })
        .Publish()
        .RefCount();

        return this.notifyOb;
    }

推荐答案

您的代码有很多问题.

首先,Rx是您要尝试同步运行的异步编程模型.呼叫await sequence(以及类似的sequence.Wait())将使您几乎所有时间都感到悲伤.

First up, Rx is an asynchronous programming model that you're trying to run synchronously. Calling await sequence (and similarly sequence.Wait()) will cause you grief almost all of the time.

接下来,您要创建两个对sequence可观察项的订阅-一次使用sequence.Subscribe(...),再一次使用await sequence.ToList().它们是对基础subject的单独订阅,需要将它们视为单独的

Next, you're creating two subscriptions to the sequence observable - once with the sequence.Subscribe(...) and again with the await sequence.ToList(). They are separate subscriptions to the underlying subject and they need to be treated as separate.

最后,您正在将外部状态(bool waiting = true)混合到查询subject.TakeWhile(x => waiting)中.这很不好,因为它本质上是非线程安全的,因此您应该像在多个线程上运行查询一样编写代码.

And finally, you're mixing external state (bool waiting = true) into your query subject.TakeWhile(x => waiting). That's bad as it is inherently non-thread-safe and you should code as if your query is running on multiple threads.

您的代码正在发生的事情是await sequence.ToList()正在预订查询之后,您已经抽出了subject.OnNext(i++)值,因此查询永无止境. 之后 .ToList()不会从值中挤出任何值来触发.TakeWhile(x => waiting)结束可观察对象. .ToList()坐在那里等待OnCompleted永远不会出现.

What is happening with your code is that the await sequence.ToList() is subscribing to your query AFTER you have pumped out your subject.OnNext(i++) values so the query never ends. No value is ever pushed out of the subject AFTER the .ToList() to trigger the .TakeWhile(x => waiting) to end the observable. .ToList() just sits there waiting for the OnCompleted that never comes.

您需要先将await sequence.ToList()移至,然后再抽出这些值-您无法执行此操作,因为它仍会卡住,等待永远不会出现的OnCompleted.

You need to move the await sequence.ToList() to before you pumped out the values - which you can't do because it would still get stuck waiting for for the OnCompleted that never comes.

这就是为什么您需要异步编码.

This is why you need to code asynchronously.

现在,两个订阅也会导致您出现竞争状况. sequence.Subscribe可能在sequence.ToList()获得任何值之前将waiting设置为false.这就是为什么您应该像在多个线程上运行查询一样进行编码.因此,为避免这种情况,您应该只订阅一个.

Now the two subscriptions also causes you a race condition. The sequence.Subscribemight set waiting to false before the sequence.ToList() gets any values. This is why you should code as if your query is running on multiple threads. So to avoid this you should only have one subscription.

您需要丢失.TakeWhile(x => waiting)并将条件推入内部,如下所示:subject.TakeWhile(x => x <= 50);.

You need to lose the .TakeWhile(x => waiting) and push the condition inside like this: subject.TakeWhile(x => x <= 50);.

然后您像这样编写代码:

Then you write your code like this:

//Create a new subject
Subject<int> subject = new Subject<int>();

//Observe the subject until some pre-determined stopping criteria
IObservable<int> sequence = subject.TakeWhile(x => x <= 50);

sequence
    .ToList()
    .Subscribe(list =>
    {
        Console.WriteLine(String.Join(", ", list));
    });

//fake bluetooth messages
int i = 0;
while (i < 100)
    subject.OnNext(i++);

此代码运行并在控制台上生成0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50.

This code runs and produces 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50 on to the console.

不要同步编写Rx代码-丢失await.不要运行可能导致竞争条件的多个订阅.不要在查询中引入外部状态.

Don't write Rx code synchronously - lose the await. Don't run multiple subscriptions that could create race conditions. Don't introduce external state into your queries.

此外,使用WhenNotificationReceived方法,您没有正确完成序列.

Also, with the WhenNotificationReceived method, you're not properly completing the sequence.

您正在使用危险的.Publish().RefCount()运算符对,该运算符对创建的序列在完成后将无法订阅.

You're using the dangerous .Publish().RefCount() operator pair which creates a sequence that can't be subscribed to after it completes.

尝试以下示例:

var query =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Take(3)
        .Publish()
        .RefCount();

var s1 = query.Subscribe(Console.WriteLine);

Thread.Sleep(2500);

var s2 = query.Subscribe(Console.WriteLine);

Thread.Sleep(2500);

s1.Dispose();
s2.Dispose();

var s3 = query.Subscribe(Console.WriteLine);

Thread.Sleep(2500);

s3.Dispose();

这只会产生:


0
1
2
2

s3订阅不产生任何结果.我不认为这就是你所追求的.

The s3 subscription produces nothing. I don't think that this is what you're after.

这篇关于如何在没有竞争条件的情况下订阅IObservable序列,强制完成并检索所有数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆