证明我RxJava中的PublishSubject不是线程安全的 [英] Prove me that PublishSubject in RxJava is not thread safe

查看:443
本文介绍了证明我RxJava中的PublishSubject不是线程安全的的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

已声明PublishSubject在RxJava中不是线程安全的.好的.

It is being declared that PublishSubject is not thread safe in RxJava. Ok.

我试图找到任何示例,我试图构建任何示例来模拟竞争条件,这会导致不良结果.但是我不能:(

I'm trying to find any example, I'm trying to construct any example to emulate race condition, that leads to unwanted results. But I can't :(

任何人都可以提供一个示例来证明PublishSubject不是线程安全的吗?

Can anyone provide an example proving that PublishSubject is not thread safe?

推荐答案

我已经找到了证明.我认为这个示例比提供的@akarnokd更明显.

I've found the proof. I think this example more obvious then @akarnokd provided.

    AtomicInteger counter = new AtomicInteger();

    // Thread-safe
    // SerializedSubject<Object, Object> subject = PublishSubject.create().toSerialized();

    // Not Thread Safe
    PublishSubject<Object> subject = PublishSubject.create();

    Action1<Object> print = (x) -> System.out.println(Thread.currentThread().getName() + " " + counter);

    Consumer<Integer> sleep = (s) -> {
        try {
            Thread.sleep(s);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    subject
            .doOnNext(i -> counter.incrementAndGet())
            .doOnNext(i -> counter.decrementAndGet())
            .doOnNext(print)
            .filter(i -> counter.get() != 0)
            .doOnNext(i -> {
                        throw new NullPointerException("Concurrency detected");
                    }
            )
            .subscribe();

    Runnable r = () -> {
        for (int i = 0; i < 100000; i++) {
            sleep.accept(1);
            subject.onNext(i);
        }
    };

    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.execute(r);
    pool.execute(r);

这篇关于证明我RxJava中的PublishSubject不是线程安全的的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆