如何使用rxjava替换使用易失性变量的线程间通信? [英] How can I replace inter thread communication using volatile variables with rxjava?

查看:114
本文介绍了如何使用rxjava替换使用易失性变量的线程间通信?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个应用程序,通过让一个线程在另一个线程检查的某个对象上设置一个volatile变量来在线程之间进行大量通信.我发现这很容易出错,我想尝试使用RxJava替换它,但在某些情况下我不知道如何进行转换.

I've got an application that does a lot of communication between threads by having one thread set a volatile variable on some object which another thread checks. I find this to be very error prone, and I want to try and replace it using RxJava bu there are some cases I can't figure out how to convert.

我现在正在苦苦挣扎的情况是,我有两个线程,让我们一个叫控制器,另一个叫测量器.测量员的工作是每100ms记录一些数量.控制器要与应用程序的各个部分进行大量的工作,并且每隔一段时间就会告诉测量者更改其测量内容.现在,它通过设置一个volatile变量来做到这一点,并且在测量程序循环的每次迭代中,它都会检查该变量以查看要测量的内容.

The case I'm struggling with right now is where I have two threads, lets call one the controller and the other the measurer. The measurer's job is to record some quantity every 100ms. The controller does a lot of work talking to various pieces of the app and every so often it will tell the measurer to change what it's measuring. Right now it does that by setting a volatile variable, and every iteration of the measurer's loop it checks that variable to see what to measure.

由于测量需要时间,因此测量者不能与控制器位于同一线程中,并且控制器不能延迟其正在进行的其他工作.

The measurer can't be in the same thread as the controller as measuring takes time and the controller can't delay the other work it's doing.

感觉解决方案就像使控制器成为可观察的对象,每当需要更新测量者的指令时,它都会发出一个项目,但我看到的唯一方法是在收到事件时使测量者改变其行为.让这些事件的订阅者像以前一样设置volatile变量,然后我一无所获.

It feels like the solution is something like making the controller an observable which will emit an item whenever the instructions to the measurer need updating but the only way I can see for the measurer to change it's behaviour when an event is received is to have a subscriber to these events setting the volatile variable like before, and then I haven't got anywhere.

我想知道是否可以以某种方式获取控制器发出的项目流,并将其转换为一个重复一遍又一遍的流,直到控制器发出不同的项目,然后我可以在测量器,它将在每次接收到一个测量值时进行测量.这是正确的方法吗?如果是,那么如何将控制器发出的项目转换为重复的项目流?

I was wondering if I could somehow take the stream of items emitted by the controller and convert it into a stream that repeats each item over and over until the controller emits a different item, then I can subscribe to these repeated items in the measurer which would do a measurement every time it receives one. Is this the right approach, and if it is, how can I transform the items emitted by the controller into a repeated stream of items?

推荐答案

我对Rx比较陌生,但是我会使用BehaviorSubject.您可以使用distinctUntilChanged(),或将其与计时器Observable组合:

I'm relatively new to Rx, but I would use a BehaviorSubject. You can use distinctUntilChanged(), or combine it with a timer Observable:

    public enum Stat { FOO, BAR }

    public class Controller
    {
        private Subject<Stat> statSubject;

        public Controller()
        {
            statSubject = BehaviorSubject.<Stat>create().toSerialized();
        }

        public Observable<Stat> getStatChange()
        {
            return statSubject.distinctUntilChanged();
        }

        public void setStat( Stat stat )
        {
            statSubject.onNext( stat );
        }
    }

    public class Measurer
    {
        public Measurer( Controller controller )
        {
            Observable.timer( 1, TimeUnit.SECONDS, Schedulers.newThread() )
                .repeat()
                .withLatestFrom(
                        controller.getStatChange(),
                        ( __, stat ) -> stat ) // ignore the Long emitted by timer
                .subscribe( this::measureStat );
        }

        private void measureStat( Stat stat )
        {
            switch( stat )
            {
            case FOO:
                measureFoo();
                break;

            default:
                measureBar();
                break;
            }
        }

        private void measureBar()
        {
            System.out.println( "Measuring Bar" );
        }

        private void measureFoo()
        {
            System.out.println( "Measuring Foo" );
        }
    }

    @Test
    public void testMeasureStats() throws InterruptedException
    {
        Controller controller = new Controller();
        controller.setStat( Stat.BAR );

        @SuppressWarnings( "unused" )
        Measurer measurer = new Measurer( controller );

        Thread.sleep( 5000 );

        controller.setStat( Stat.FOO );

        Thread.sleep( 5000 );

        controller.setStat( Stat.BAR );

        Thread.sleep( 5000 );
    }

输出:

Measuring Bar
Measuring Bar
Measuring Bar
Measuring Bar
Measuring Foo
Measuring Foo
Measuring Foo
Measuring Foo
Measuring Foo
Measuring Bar
Measuring Bar
Measuring Bar
Measuring Bar
Measuring Bar

这篇关于如何使用rxjava替换使用易失性变量的线程间通信?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆