将异步侦听器转换/包装为Observable(RxJava2) [英] Convert/ wrap async listener to Observable (RxJava2)
问题描述
我想将一个真正的侦听器包装到Observable对象. 对于初学者来说,这是一个测试用例,对于他来说,一切都很好.
I want to wrap a real listener to Observable object. For starters here is a test case, with him everything is fine.
@Override
public void onCreate(@Nullable Bundle savedInstanceState) {
getObservablePhoneState()
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer -> Log.i(TAG, "----- subscribe onNext = " + integer));
}
private Flowable<Integer> getObservablePhoneState() {
return Flowable.create(emitter -> {
Log.i(TAG, "Emitting 1");
emitter.onNext(1);
Log.i(TAG, "Emitting 2");
emitter.onNext(2);
}, BackpressureStrategy.BUFFER);
}
*** logcat ***
Emitting 1
Emitting 2
----- subscribe onNext = 1
----- subscribe onNext = 2
此代码会产生错误:
private Flowable<Integer> getObservablePhoneState() {
return Flowable.create(emitter -> {
PhoneStateListener phoneStateListener = new PhoneStateListener() {
@Override
public void onCallStateChanged(int state, String incomingNumber) {
Log.i(TAG, "onCallStateChanged = " + state);
emitter.onNext(state);
}
};
TelephonyManager telephonyManager = (TelephonyManager) getActivity().getSystemService(Context.TELEPHONY_SERVICE);
telephonyManager.listen(phoneStateListener, PhoneStateListener.LISTEN_CALL_STATE);
}, BackpressureStrategy.BUFFER);
}
*** logcat ***
io.reactivex.exceptions.OnErrorNotImplementedException:
Attempt to read from field 'android.os.MessageQueue
android.os.Looper.mQueue' on a null object reference
与Observable.create()相同的错误. 也许是由于以下事实: RxJava2不支持发射空值.
With Observable.create() the same error. Perhaps this is due to the fact that RxJava2 does not support emitting a null value.
如何正确处理?
推荐答案
您应该删除subscribeOn(Schedulers.io())
,以避免在不同的线程中创建PhoneStateListener
,因为在后台试图使用mQueue的Handler发送消息.一片空白.只需致电
You should remove the subscribeOn(Schedulers.io())
to avoid creating the PhoneStateListener
in a different thread, because under the hood is trying to send messages using a Handler which mQueue is null. Just call
getObservablePhoneState()
.subscribe { integer -> Log.i("", "----- subscribe onNext = " + integer) }
这篇关于将异步侦听器转换/包装为Observable(RxJava2)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!