RxJava:InterruptedIOException [英] RxJava: InterruptedIOException
问题描述
我写了一些代码来从服务器下载文件,同时更新进度条.下载代码在 Schedulers.io
线程中运行,更新 ui 代码在 AndroidSchedulers.mainThread
中运行.我的程序在下载开始后终止.这是我的代码:
I wrote some code to download a file from server meanwhile updating progress bar. Downloading code was running in Schedulers.io
thread and updating ui code was running in AndroidSchedulers.mainThread
. My program terminated after download began. Here is my code:
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
Response response = getResponse(url);
if (response != null && response.isSuccessful()) {
InputStream is = response.body().byteStream();
subscriber.onNext(response.body().contentLength()); // init progress
File storedFile = Utils.getStoredFile(context, filePath);
OutputStream os = new FileOutputStream(storedFile);
byte[] buffer = new byte[1024];
int len;
while ((len = is.read(buffer)) != -1) {
// write data
os.write(buffer, 0, len);
count += len;
subscriber.onNext(count); // update progress
}
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
os.close();
is.close();
response.body().close();
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
})
.subscribeOn(Schedulers.io()) // io and network operation
.observeOn(AndroidSchedulers.mainThread()) // UI view update operation
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted -> " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError -> " + e.getMessage());
}
@Override
public void onNext(Long progress) {
Log.d(TAG, "onNext -> " + Thread.currentThread().getName());
Log.d(TAG, "onNext progress -> " + progress);
// here update view in ui thread
}
}
}
这里是错误文本:
java.io.InterruptedIOException: thread interrupted
at okio.Timeout.throwIfReached(Timeout.java:145)
at okio.Okio$2.read(Okio.java:136)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
at okio.RealBufferedSource.read(RealBufferedSource.java:50)
at com.squareup.okhttp.internal.http.HttpConnection$FixedLengthSource.read(HttpConnection.java:418)
at okio.RealBufferedSource$1.read(RealBufferedSource.java:371)
at java.io.InputStream.read(InputStream.java:163)
at com.eldorado.rxfiledownloaddemo.presenter.Presenter$1.call(Presenter.java:74)
at com.eldorado.rxfiledownloaddemo.presenter.Presenter$1.call(Presenter.java:52)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executor at java.util.concurrent.FutureTask.run(FutureTask.java:23 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)
at java.lang.Thread.run(Thread.java:841)
推荐答案
observerOn 适用于 Observable.create 但在内部,您正在另一个线程中创建新的 observable.所以你的管道永远不会把监视器交给主线程.我认为您的代码对于您想要实现的目标来说太复杂了.
The observerOn is apply to the Observable.create but internaly you're creating a new observable in another thread. So your pipeline never give the monitor to the main thread. I think your code it's too much complex for what you want to achieve.
为了帮助你理解调度器的概念
Just in case that help you out to understand the concepts of Scheduler
这篇关于RxJava:InterruptedIOException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!